From 2ed5301b6a13f37be27cb589cd96181c26793215 Mon Sep 17 00:00:00 2001 From: msincenselee Date: Wed, 19 May 2021 16:23:15 +0800 Subject: [PATCH] =?UTF-8?q?[update]=20json=E4=B8=8Enumpy=E5=85=BC=E5=AE=B9?= =?UTF-8?q?=E9=97=AE=E9=A2=98,=E8=82=A1=E7=A5=A8=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/api/websocket/websocket_client.py | 2 +- vnpy/app/cta_crypto/engine.py | 26 +++++++--- vnpy/app/cta_crypto/template.py | 4 +- vnpy/app/cta_strategy_pro/template.py | 2 +- vnpy/app/trade_copy/__init__.py | 18 +++++++ vnpy/component/cta_policy.py | 21 +++++++- vnpy/gateway/ctp/ctp_gateway.py | 58 ++++++++++++++++++++- vnpy/gateway/gj/gj_gateway.py | 2 +- vnpy/gateway/pb/pb_gateway.py | 2 +- vnpy/gateway/rohon/rohon_gateway.py | 2 +- vnpy/gateway/stockrpc/stock_rpc_gateway.py | 59 ++++++++++++++++++++-- vnpy/trader/gateway.py | 6 +++ 12 files changed, 182 insertions(+), 20 deletions(-) create mode 100644 vnpy/app/trade_copy/__init__.py diff --git a/vnpy/api/websocket/websocket_client.py b/vnpy/api/websocket/websocket_client.py index 6cda0ac7..1bf0cc3e 100644 --- a/vnpy/api/websocket/websocket_client.py +++ b/vnpy/api/websocket/websocket_client.py @@ -212,7 +212,7 @@ class WebsocketClient: recv_data = gzip.decompress(recv_data) data = self.unpack_data(recv_data) except ValueError as e: - print("websocket unable to parse data: " + recv_data) + print("websocket unable to parse data: " + recv_data, file=sys.stderr) raise e self._log('recv data: %s', data) diff --git a/vnpy/app/cta_crypto/engine.py b/vnpy/app/cta_crypto/engine.py index e7880282..c455473c 100644 --- a/vnpy/app/cta_crypto/engine.py +++ b/vnpy/app/cta_crypto/engine.py @@ -204,11 +204,13 @@ class CtaEngine(BaseEngine): def process_timer_event(self, event: Event): """ 处理定时器事件""" all_trading = True + untrading_strategies = [] # 触发每个策略的定时接口 for strategy in list(self.strategies.values()): strategy.on_timer() if not strategy.trading: all_trading = False + untrading_strategies.append(strategy.strategy_name) dt = datetime.now() @@ -225,6 +227,12 @@ class CtaEngine(BaseEngine): # 推送到事件 self.put_all_strategy_pos_event(all_strategy_pos) + else: + if len(untrading_strategies) > 0: + account_id = self.engine_config.get('accountid', '-') + msg = f'异常,{account_id}/策略{untrading_strategies}处于停止交易状态,不能推送持仓对比' + self.write_error(msg) + self.send_wechat(msg) def process_tick_event(self, event: Event): """处理tick到达事件""" @@ -1002,6 +1010,7 @@ class CtaEngine(BaseEngine): """ Init a strategy. """ + self.write_log(f'创建独立线程执行{strategy_name} on_init()') task = self.thread_executor.submit(self._init_strategy, strategy_name, auto_start) self.thread_tasks.append(task) return True @@ -1011,14 +1020,13 @@ class CtaEngine(BaseEngine): Init strategies in queue. """ try: + self.write_log(f"{strategy_name} => 开始执行初始化") strategy = self.strategies[strategy_name] if strategy.inited: self.write_error(f"{strategy_name} => 已经完成初始化,禁止重复操作") return - self.write_log(f"{strategy_name} => 开始执行初始化") - # Call on_init function of strategy self.call_strategy_func(strategy, strategy.on_init) @@ -1048,7 +1056,7 @@ class CtaEngine(BaseEngine): except Exception as ex: msg = f'{strategy_name} => 执行on_init异常:{str(ex)}' - self.write_error(ex) + self.write_error(msg) self.send_wechat(msg) self.write_error(traceback.format_exc()) @@ -1077,7 +1085,7 @@ class CtaEngine(BaseEngine): except Exception as ex: msg = f'{strategy_name} => 执行on_start异常:{str(ex)}' - self.write_error(ex) + self.write_error(msg) self.send_wechat(msg) self.write_error(traceback.format_exc()) @@ -1112,7 +1120,7 @@ class CtaEngine(BaseEngine): return True, f'{strategy_name}=> 成功停止' except Exception as ex: msg = f'{strategy_name} => 执行stop_strategy()异常:{str(ex)}' - self.write_error(ex) + self.write_error(msg) self.send_wechat(msg) self.write_error(traceback.format_exc()) return False, f'停止策略失败{strategy_name},异常:{str(ex)}' @@ -1170,7 +1178,7 @@ class CtaEngine(BaseEngine): except Exception as ex: msg = f'执行remove_strategy({strategy_name})异常:{str(ex)}' - self.write_error(ex) + self.write_error(msg) self.send_wechat(msg) self.write_error(traceback.format_exc()) return False, f'移除策略失败{strategy_name},异常:{str(ex)}' @@ -1243,7 +1251,7 @@ class CtaEngine(BaseEngine): return True, msg except Exception as ex: msg = f'执行reload_strategy({strategy_name})异常:{str(ex)}' - self.write_error(ex) + self.write_error(msg) self.send_wechat(msg) self.write_error(traceback.format_exc()) return False, f'重启策略失败{strategy_name},异常:{str(ex)}' @@ -1281,7 +1289,9 @@ class CtaEngine(BaseEngine): # 保存策略数据 strategy.sync_data() except Exception as ex: - self.write_error(u'保存策略{}数据异常:'.format(strategy_name, str(ex))) + msg = u'保存策略{}数据异常:'.format(strategy_name, str(ex)) + self.write_error(msg) + self.send_wechat(msg) self.write_error(traceback.format_exc()) def clean_strategy_cache(self, strategy_name): diff --git a/vnpy/app/cta_crypto/template.py b/vnpy/app/cta_crypto/template.py index 65658dad..b28d1491 100644 --- a/vnpy/app/cta_crypto/template.py +++ b/vnpy/app/cta_crypto/template.py @@ -513,6 +513,7 @@ class CtaFutureTemplate(CtaTemplate): self.write_log(u'保存policy数据') self.policy.save() + def save_klines_to_cache(self, kline_names: list = []): """ 保存K线数据到缓存 @@ -600,9 +601,10 @@ class CtaFutureTemplate(CtaTemplate): return {} def init_policy(self): - self.write_log(u'init_policy(),初始化执行逻辑') + self.write_log(f'{self.strategy_name} => init_policy(),初始化执行逻辑') if self.policy: self.policy.load() + self.write_log(f'{self.strategy_name} => init_policy(),初始化执行逻辑完成') def init_position(self): """ diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py index f6be43c8..e736bb0c 100644 --- a/vnpy/app/cta_strategy_pro/template.py +++ b/vnpy/app/cta_strategy_pro/template.py @@ -1251,7 +1251,7 @@ class CtaProFutureTemplate(CtaProTemplate): dist_record['symbol'] = trade.vt_symbol # 处理股指锁单 - if trade.exchange == Exchange.CFFEX: + if trade.exchange == Exchange.CFFEX and not self.backtesting: if trade.direction == Direction.LONG: if abs(self.position.short_pos) >= trade.volume: self.position.short_pos += trade.volume diff --git a/vnpy/app/trade_copy/__init__.py b/vnpy/app/trade_copy/__init__.py new file mode 100644 index 00000000..ec29013b --- /dev/null +++ b/vnpy/app/trade_copy/__init__.py @@ -0,0 +1,18 @@ +from pathlib import Path + +from vnpy.trader.app import BaseApp +from vnpy.trader.constant import Direction +from vnpy.trader.object import TickData, BarData, TradeData, OrderData +from vnpy.trader.utility import BarGenerator, ArrayManager + +from .engine import TradeCopyEngine, APP_NAME + +class TradeCopyApp(BaseApp): + """""" + app_name = APP_NAME + app_module = __module__ + app_path = Path(__file__).parent + display_name = "跟单软件" + engine_class = TradeCopyEngine + widget_name = "TcManager" + icon_name = "tc.ico" diff --git a/vnpy/component/cta_policy.py b/vnpy/component/cta_policy.py index 2f55bac2..1f8d994d 100644 --- a/vnpy/component/cta_policy.py +++ b/vnpy/component/cta_policy.py @@ -13,6 +13,25 @@ TNS_STATUS_ORDERING = 'ordering' TNS_STATUS_OPENED = 'opened' TNS_STATUS_CLOSED = 'closed' +import numpy as np + + +class MyEncoder(json.JSONEncoder): + """ + 自定义转换器,处理np,datetime等不能被json转换得问题 + """ + def default(self, obj): + if isinstance(obj, np.integer): + return int(obj) + elif isinstance(obj, np.floating): + return float(obj) + elif isinstance(obj, np.ndarray): + return obj.tolist() + elif isinstance(obj, datetime): + return obj.strftime('%Y-%m-%d %H:%M:%S') + else: + return super(MyEncoder, self).default(obj) + class CtaPolicy(CtaComponent): """ @@ -103,7 +122,7 @@ class CtaPolicy(CtaComponent): json_data = self.to_json() json_data['save_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') with open(json_file, 'w', encoding='utf8') as f: - data = json.dumps(json_data, indent=4, ensure_ascii=False) + data = json.dumps(json_data, indent=4, ensure_ascii=False, cls=MyEncoder) f.write(data) except IOError as ex: diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index f8e2d41a..a3796693 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -376,13 +376,17 @@ class CtpGateway(BaseGateway): def check_status(self): """检查状态""" + # 检查交易接口、行情接口的连接状态 if self.td_api.connect_status and self.md_api.connect_status: self.status.update({'con': True}) + # 检查通达信行情接口(直接连通达信) if self.tdx_api: self.tdx_api.check_status() - if self.tdx_api is None or self.md_api is None: - return False + + # 检查天勤行情接口 + if self.tq_api: + self.tq_api.check_status() if not self.td_api.connect_status or self.md_api.connect_status: return False @@ -1600,6 +1604,10 @@ class TdxMdApi(): self.check_status() def check_status(self): + """ + 检查通达信直连状态 + :return: + """ # self.write_log(u'检查tdx接口状态') if len(self.registered_symbol_set) == 0: return @@ -1826,6 +1834,8 @@ class SubMdApi(): self.symbol_tick_dict = {} # 合约与最后一个Tick得字典 self.registed_symbol_set = set() # 订阅的合约记录集 + self.last_tick_dt = None + self.sub = None self.setting = {} self.connect_status = False @@ -1852,6 +1862,42 @@ class SubMdApi(): self.gateway.write_error(traceback.format_exc()) self.connect_status = False + def check_status(self): + """接口状态的健康检查""" + + # 订阅的合约 + d = {'sub_symbols': sorted(self.symbol_tick_dict.keys())} + + # 合约的最后时间 + if self.last_tick_dt: + d.update({"sub_tick_time": self.last_tick_dt.strftime('%Y-%m-%d %H:%M:%S')}) + + if len(self.symbol_tick_dict) > 0: + dt_now = datetime.now() + hh_mm = dt_now.hour * 100 + dt_now.minute + # 期货交易时间内 + if 900 <= hh_mm <= 1130 or 1300 <= hh_mm <= 1500 or hh_mm < 230 or hh_mm >= 2100: + # 未有数据到达 + if self.last_tick_dt is None: + d.update({"sub_status": False, "sub_error": u"rabbitmq未有行情数据到达"}) + else: # 有数据 + + # 超时15分钟以上 + if (dt_now - self.last_tick_dt).total_seconds() > 60 * 15: + d.update({"sub_status": False, + "sub_error": u"{}rabbitmq行情数据超时15分钟以上".format(hh_mm)}) + else: + d.update({"sub_status": True}) + self.gateway.status.pop("sub_error", None) + + # 非交易时间 + else: + self.gateway.status.pop("sub_status", None) + self.gateway.status.pop("sub_error", None) + + # 更新到gateway的状态中去 + self.gateway.status.update(d) + def on_message(self, chan, method_frame, _header_frame, body, userdata=None): # print(" [x] %r" % body) try: @@ -1874,6 +1920,7 @@ class SubMdApi(): exchange=Exchange(d.get('exchange')), symbol=symbol, datetime=dt) + d.pop('exchange', None) d.pop('symbol', None) tick.__dict__.update(d) @@ -1887,6 +1934,8 @@ class SubMdApi(): if tick.last_price > pre_tick.last_price * 1.2 or tick.last_price < pre_tick.last_price * 0.8: return + self.last_tick_dt = tick.datetime + self.gateway.on_tick(tick) self.gateway.on_custom_tick(tick) @@ -2016,6 +2065,11 @@ class TqMdApi(): self.update_thread = Thread(target=self.update) self.update_thread.start() + def check_status(self): + """检查接口状态""" + + pass + def generate_tick_from_quote(self, vt_symbol, quote) -> TickData: """ 生成TickData diff --git a/vnpy/gateway/gj/gj_gateway.py b/vnpy/gateway/gj/gj_gateway.py index 163d8f66..ae29d541 100644 --- a/vnpy/gateway/gj/gj_gateway.py +++ b/vnpy/gateway/gj/gj_gateway.py @@ -1167,7 +1167,7 @@ class TqMdApi(): return try: from tqsdk import TqApi - self.api = TqApi(_stock=True, url="wss://u.shinnytech.com/t/nfmd/front/mobile") + self.api = TqApi(_stock=True, url="wss://api.shinnytech.com/t/nfmd/front/mobile") except Exception as e: self.gateway.write_log(f'天勤股票行情API接入异常:'.format(str(e))) self.gateway.write_log(traceback.format_exc()) diff --git a/vnpy/gateway/pb/pb_gateway.py b/vnpy/gateway/pb/pb_gateway.py index 3aed71b3..04344e78 100644 --- a/vnpy/gateway/pb/pb_gateway.py +++ b/vnpy/gateway/pb/pb_gateway.py @@ -2105,7 +2105,7 @@ class TqMdApi(): return try: from tqsdk import TqApi - self.api = TqApi(_stock=True, url="wss://u.shinnytech.com/t/nfmd/front/mobile") + self.api = TqApi(_stock=True, url="wss://api.shinnytech.com/t/nfmd/front/mobile") except Exception as e: self.gateway.write_log(f'天勤股票行情API接入异常:'.format(str(e))) self.gateway.write_log(traceback.format_exc()) diff --git a/vnpy/gateway/rohon/rohon_gateway.py b/vnpy/gateway/rohon/rohon_gateway.py index 472cc8be..247e3671 100644 --- a/vnpy/gateway/rohon/rohon_gateway.py +++ b/vnpy/gateway/rohon/rohon_gateway.py @@ -1088,7 +1088,7 @@ class RohonTdApi(TdApi): price=data["LimitPrice"], volume=data["VolumeTotalOriginal"], traded=data["VolumeTraded"], - status=STATUS_ROHON2VT[data["OrderStatus"]], + status=STATUS_ROHON2VT.get(data["OrderStatus"],Status.UNKNOWN), time=data["InsertTime"], gateway_name=self.gateway_name ) diff --git a/vnpy/gateway/stockrpc/stock_rpc_gateway.py b/vnpy/gateway/stockrpc/stock_rpc_gateway.py index 87cf6b0a..45805143 100644 --- a/vnpy/gateway/stockrpc/stock_rpc_gateway.py +++ b/vnpy/gateway/stockrpc/stock_rpc_gateway.py @@ -1,5 +1,6 @@ import traceback import json +from copy import deepcopy from uuid import uuid1 from datetime import datetime, timedelta from threading import Thread @@ -84,6 +85,16 @@ class StockRpcGateway(BaseGateway): self.query_all() + def check_status(self): + + if self.client: + pass + + if self.rabbit_api: + self.rabbit_api.check_status() + + return True + def subscribe(self, req: SubscribeRequest): """行情订阅""" self.write_log(f'创建订阅任务=> rabbitMQ') @@ -114,6 +125,8 @@ class StockRpcGateway(BaseGateway): task.close() # gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "") # self.client.subscribe(req, gateway_name) + if self.rabbit_api: + self.rabbit_api.registed_symbol_set.add(req.vt_symbol) def send_order(self, req: OrderRequest): """ @@ -161,7 +174,8 @@ class StockRpcGateway(BaseGateway): for position in positions: position.gateway_name = self.gateway_name # 更换 vt_positionid得gateway前缀 - position.vt_positionid = position.vt_positionid.replace(f'{position.gateway_name}.', f'{self.gateway_name}.') + position.vt_positionid = position.vt_positionid.replace(f'{position.gateway_name}.', + f'{self.gateway_name}.') # 更换 vt_accountid得gateway前缀 position.vt_accountid = position.vt_accountid.replace(f'{position.gateway_name}.', f'{self.gateway_name}.') @@ -205,6 +219,8 @@ class StockRpcGateway(BaseGateway): if event.type == EVENT_TICK: return + event = deepcopy(event) + data = event.data if hasattr(data, "gateway_name"): @@ -225,7 +241,7 @@ class StockRpcGateway(BaseGateway): if hasattr(data, 'vt_positionid'): data.vt_positionid = data.vt_positionid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.') - if event.type in [EVENT_ORDER,EVENT_TRADE]: + if event.type in [EVENT_ORDER, EVENT_TRADE]: self.write_log(f'{self.remote_gw_name} => {self.gateway_name} event:{data.__dict__}') self.event_engine.put(event) @@ -242,12 +258,48 @@ class SubMdApi(): self.symbol_tick_dict = {} # 合约与最后一个Tick得字典 self.registed_symbol_set = set() # 订阅的合约记录集 - + self.last_tick_dt = None self.sub = None self.setting = {} self.connect_status = False self.thread = None # 用线程运行所有行情接收 + def check_status(self): + """接口状态的健康检查""" + + # 订阅的合约 + d = {'sub_symbols': sorted(self.symbol_tick_dict.keys())} + + # 合约的最后时间 + if self.last_tick_dt: + d.update({"sub_tick_time": self.last_tick_dt.strftime('%Y-%m-%d %H:%M:%S')}) + + if len(self.symbol_tick_dict) > 0: + dt_now = datetime.now() + hh_mm = dt_now.hour * 100 + dt_now.minute + # 期货交易时间内 + if 930 <= hh_mm <= 1130 or 1301 <= hh_mm <= 1500: + # 未有数据到达 + if self.last_tick_dt is None: + d.update({"sub_status": False, "sub_error": u"rabbitmq未有行情数据到达"}) + else: # 有数据 + + # 超时5分钟以上 + if (dt_now - self.last_tick_dt).total_seconds() > 60 * 5: + d.update({"sub_status": False, + "sub_error": u"{}rabbitmq行情数据超时5分钟以上".format(hh_mm)}) + else: + d.update({"sub_status": True}) + self.gateway.status.pop("sub_error", None) + + # 非交易时间 + else: + self.gateway.status.pop("sub_status", None) + self.gateway.status.pop("sub_error", None) + + # 更新到gateway的状态中去 + self.gateway.status.update(d) + def connect(self, setting={}): """连接""" self.setting = setting @@ -295,6 +347,7 @@ class SubMdApi(): self.symbol_tick_dict[symbol] = tick self.gateway.on_tick(tick) + self.last_tick_dt = tick.datetime except Exception as ex: self.gateway.write_error(u'RabbitMQ on_message 异常:{}'.format(str(ex))) diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index 5afb6125..a5a72191 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -209,6 +209,12 @@ class BaseGateway(ABC): self.write_log(msg, level=ERROR, on_log=True) print(msg, file=sys.stderr) + def check_status(self) -> bool: + """ + check gateway connection or market data status. + """ + return False + @abstractmethod def connect(self, setting: dict) -> None: """