From 3496ee92b016ac858117f9eae1f9f12cd6bc5b99 Mon Sep 17 00:00:00 2001 From: msincenselee Date: Wed, 22 Jul 2020 21:55:35 +0800 Subject: [PATCH] [bug fix] --- vnpy/app/cta_strategy_pro/template.py | 20 ++- vnpy/gateway/pb/pb_gateway.py | 189 +++++++++++++++++++++++++- vnpy/trader/util_wechat.py | 4 +- 3 files changed, 204 insertions(+), 9 deletions(-) diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py index 4edc30e8..c30ff6d9 100644 --- a/vnpy/app/cta_strategy_pro/template.py +++ b/vnpy/app/cta_strategy_pro/template.py @@ -197,6 +197,9 @@ class CtaTemplate(ABC): if self.is_upper_limit(vt_symbol): self.write_error(u'涨停价不做FAK/FOK委托') return [] + if volume == 0: + self.write_error(f'委托数量有误,必须大于0,{vt_symbol}, price:{price}') + return [] return self.send_order(vt_symbol=vt_symbol, direction=Direction.LONG, offset=Offset.OPEN, @@ -218,6 +221,9 @@ class CtaTemplate(ABC): if self.is_lower_limit(vt_symbol): self.write_error(u'跌停价不做FAK/FOK sell委托') return [] + if volume == 0: + self.write_error(f'委托数量有误,必须大于0,{vt_symbol}, price:{price}') + return [] return self.send_order(vt_symbol=vt_symbol, direction=Direction.SHORT, offset=Offset.CLOSE, @@ -239,6 +245,9 @@ class CtaTemplate(ABC): if self.is_lower_limit(vt_symbol): self.write_error(u'跌停价不做FAK/FOK short委托') return [] + if volume == 0: + self.write_error(f'委托数量有误,必须大于0,{vt_symbol}, price:{price}') + return [] return self.send_order(vt_symbol=vt_symbol, direction=Direction.SHORT, offset=Offset.OPEN, @@ -260,6 +269,9 @@ class CtaTemplate(ABC): if self.is_upper_limit(vt_symbol): self.write_error(u'涨停价不做FAK/FOK cover委托') return [] + if volume == 0: + self.write_error(f'委托数量有误,必须大于0,{vt_symbol}, price:{price}') + return [] return self.send_order(vt_symbol=vt_symbol, direction=Direction.LONG, offset=Offset.CLOSE, @@ -2021,7 +2033,7 @@ class CtaProFutureTemplate(CtaProTemplate): target_long_grid = None remove_long_grid_ids = [] for g in sorted(locked_long_grids, key=lambda grid: grid.volume): - if g.order_status or len(g.orderRef) > 0: + if g.order_status or len(g.order_ids) > 0: continue if target_long_grid is None: target_long_grid = g @@ -2035,7 +2047,7 @@ class CtaProFutureTemplate(CtaProTemplate): remain_grid = copy(g) g.volume = open_volume remain_grid.volume -= open_volume - remain_grid.id = uuid.uuid1() + remain_grid.id = str(uuid.uuid1()) self.gt.dn_grids.append(remain_grid) self.write_log(u'添加剩余仓位到新多单网格:g.volume:{}' .format(remain_grid.volume)) @@ -2081,7 +2093,7 @@ class CtaProFutureTemplate(CtaProTemplate): remain_grid = copy(g) g.volume = open_volume remain_grid.volume -= open_volume - remain_grid.id = uuid.uuid1() + remain_grid.id = str(uuid.uuid1()) self.gt.up_grids.append(remain_grid) self.write_log(u'添加剩余仓位到新空单网格:g.volume:{}' .format(remain_grid.volume)) @@ -2146,6 +2158,8 @@ class CtaProFutureTemplate(CtaProTemplate): # 正在委托时,不处理 if self.entrust != 0: return + if not self.activate_today_lock: + return # 多单得对锁格 locked_long_grids = self.gt.get_opened_grids_within_types(direction=Direction.LONG, types=[LOCK_GRID]) diff --git a/vnpy/gateway/pb/pb_gateway.py b/vnpy/gateway/pb/pb_gateway.py index e0ec75e6..90667e12 100644 --- a/vnpy/gateway/pb/pb_gateway.py +++ b/vnpy/gateway/pb/pb_gateway.py @@ -16,7 +16,7 @@ from time import sleep from functools import lru_cache from collections import OrderedDict from multiprocessing.dummy import Pool - +from threading import Thread from vnpy.event import EventEngine from vnpy.trader.event import EVENT_TIMER from vnpy.trader.constant import ( @@ -25,10 +25,12 @@ from vnpy.trader.constant import ( Direction, OrderType, Status, - Offset + Offset, + Interval ) from vnpy.trader.gateway import BaseGateway, LocalOrderManager from vnpy.trader.object import ( + BarData, CancelRequest, OrderRequest, SubscribeRequest, @@ -37,7 +39,8 @@ from vnpy.trader.object import ( OrderData, TradeData, PositionData, - AccountData + AccountData, + HistoryRequest ) from vnpy.trader.utility import get_folder_path, print_dict, extract_vt_symbol, get_stock_exchange, append_data from vnpy.data.tdx.tdx_common import get_stock_type_sz, get_stock_type_sh @@ -47,6 +50,14 @@ symbol_name_map: Dict[str, str] = {} # 代码 <=> 交易所 symbol_exchange_map: Dict[str, Exchange] = {} +# 时间戳对齐 +TIME_GAP = 8 * 60 * 60 * 1000000000 +INTERVAL_VT2TQ = { + Interval.MINUTE: 60, + Interval.HOUR: 60 * 60, + Interval.DAILY: 60 * 60 * 24, +} + # 功能<->文件对应 PB_FILE_NAMES = { 'send_order': 'XHPT_WT', # 通用接口_委托 @@ -323,6 +334,7 @@ class PbGateway(BaseGateway): self.md_api = PbMdApi(self) self.td_api = PbTdApi(self) + self.tq_api = None self.tdx_connected = False # 通达信行情API得连接状态 @@ -359,6 +371,8 @@ class PbGateway(BaseGateway): product_id=product_id, unit_id=unit_id, holder_ids=holder_ids) + self.tq_api = TqMdApi(self) + self.tq_api.connect() self.init_query() def close(self) -> None: @@ -368,7 +382,10 @@ class PbGateway(BaseGateway): def subscribe(self, req: SubscribeRequest) -> None: """""" - self.md_api.subscribe(req) + if self.tq_api and self.tq_api.is_connected: + self.tq_api.subscribe(req) + else: + self.md_api.subscribe(req) def send_order(self, req: OrderRequest) -> str: """""" @@ -2012,3 +2029,167 @@ class PbTdApi(object): def cancel_all_csv(self): pass + + +class TqMdApi(): + """天勤行情API""" + + def __init__(self, gateway): + """""" + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.api = None + self.is_connected = False + self.subscribe_array = [] + # 行情对象列表 + self.quote_objs = [] + + # 数据更新线程 + self.update_thread = None + # 所有的合约 + self.all_instruments = [] + + self.ticks = {} + + def connect(self, setting = {}): + """""" + try: + from tqsdk import TqApi + self.api = TqApi(_stock=True) + except Exception as e: + self.gateway.write_log(f'天勤股票行情API接入异常:'.format(str(e))) + self.gateway.write_log(traceback.format_exc()) + if self.api: + self.is_connected = True + self.gateway.write_log(f'天勤股票行情API已连接') + self.update_thread = Thread(target=self.update) + self.update_thread.start() + + def generate_tick_from_quote(self, vt_symbol, quote) -> TickData: + """ + 生成TickData + """ + # 清洗 nan + quote = {k: 0 if v != v else v for k, v in quote.items()} + symbol, exchange = extract_vt_symbol(vt_symbol) + return TickData( + symbol=symbol, + exchange=exchange, + datetime=datetime.strptime(quote["datetime"], "%Y-%m-%d %H:%M:%S.%f"), + name=symbol, + volume=quote["volume"], + open_interest=quote["open_interest"], + last_price=quote["last_price"], + limit_up=quote["upper_limit"], + limit_down=quote["lower_limit"], + open_price=quote["open"], + high_price=quote["highest"], + low_price=quote["lowest"], + pre_close=quote["pre_close"], + bid_price_1=quote["bid_price1"], + bid_price_2=quote["bid_price2"], + bid_price_3=quote["bid_price3"], + bid_price_4=quote["bid_price4"], + bid_price_5=quote["bid_price5"], + ask_price_1=quote["ask_price1"], + ask_price_2=quote["ask_price2"], + ask_price_3=quote["ask_price3"], + ask_price_4=quote["ask_price4"], + ask_price_5=quote["ask_price5"], + bid_volume_1=quote["bid_volume1"], + bid_volume_2=quote["bid_volume2"], + bid_volume_3=quote["bid_volume3"], + bid_volume_4=quote["bid_volume4"], + bid_volume_5=quote["bid_volume5"], + ask_volume_1=quote["ask_volume1"], + ask_volume_2=quote["ask_volume2"], + ask_volume_3=quote["ask_volume3"], + ask_volume_4=quote["ask_volume4"], + ask_volume_5=quote["ask_volume5"], + gateway_name=self.gateway_name + ) + + def update(self) -> None: + """ + 更新行情/委托/账户/持仓 + """ + while self.api.wait_update(): + + # 更新行情信息 + for vt_symbol, quote in self.quote_objs: + if self.api.is_changing(quote): + tick = self.generate_tick_from_quote(vt_symbol, quote) + tick and self.gateway.on_tick(tick) and self.gateway.on_custom_tick(tick) + + def subscribe(self, req: SubscribeRequest) -> None: + """ + 订阅行情 + """ + if req.vt_symbol not in self.subscribe_array: + symbol, exchange = extract_vt_symbol(req.vt_symbol) + try: + quote = self.api.get_quote(f'{exchange.value}.{symbol}') + self.quote_objs.append((req.vt_symbol, quote)) + self.subscribe_array.append(req.vt_symbol) + except Exception as ex: + self.gateway.write_log('订阅天勤行情异常:{}'.format(str(ex))) + + def query_history(self, req: HistoryRequest) -> List[BarData]: + """ + 获取历史数据 + """ + symbol = req.symbol + exchange = req.exchange + interval = req.interval + start = req.start + end = req.end + # 天勤需要的数据 + tq_symbol = f'{exchange.value}.{symbol}' + tq_interval = INTERVAL_VT2TQ.get(interval) + end += timedelta(1) + total_days = end - start + # 一次最多只能下载 8964 根Bar + min_length = min(8964, total_days.days * 500) + df = self.api.get_kline_serial(tq_symbol, tq_interval, min_length).sort_values( + by=["datetime"] + ) + + # 时间戳对齐 + df["datetime"] = pd.to_datetime(df["datetime"] + TIME_GAP) + + # 过滤开始结束时间 + df = df[(df["datetime"] >= start - timedelta(days=1)) & (df["datetime"] < end)] + + data: List[BarData] = [] + if df is not None: + for ix, row in df.iterrows(): + bar = BarData( + symbol=symbol, + exchange=exchange, + interval=interval, + datetime=row["datetime"].to_pydatetime(), + open_price=row["open"], + high_price=row["high"], + low_price=row["low"], + close_price=row["close"], + volume=row["volume"], + open_interest=row.get("close_oi", 0), + gateway_name=self.gateway_name, + ) + data.append(bar) + return data + + def close(self) -> None: + """""" + try: + if self.api and self.api.wait_update(): + self.api.close() + self.is_connected = False + if self.update_thread: + self.update_thread.join() + except Exception as e: + self.gateway.write_log('退出天勤行情api异常:{}'.format(str(e))) + diff --git a/vnpy/trader/util_wechat.py b/vnpy/trader/util_wechat.py index b39368e8..356e96db 100644 --- a/vnpy/trader/util_wechat.py +++ b/vnpy/trader/util_wechat.py @@ -5,7 +5,7 @@ http://wxpusher.zjiecode.com/ 开通步骤: 1、关注公众号,注册 2、通过公众号,获取UID -3、通过网站=》应用列表=》新建应用,如vnpy2,并获得APP_TOOKEN +3、通过网站打开管理后台:http://wxpusher.zjiecode.com/admin/ 使用微信扫码登录,=》应用列表=》新建应用,如vnpy2,并获得APP_TOOKEN 4、应用列表=》应用(vnpy2)=》 关注. ''' @@ -20,7 +20,7 @@ global wechat_lock wechat_lock = Lock() # 这里可以设置UIDS, 多个人可同时接收 -UIDS = ['UID_kZguGPBQPWn41Ni9FK4CgPts2Kj'] +UIDS = ['UID_kZguGPBQPWn41Ni9FK4CgPts2KjU'] APP_TOKEN = 'AT_aDuiQu41dmAQV2vUMXOaaTDrWyhKJN2z'