diff --git a/vnpy/app/cta_crypto/engine.py b/vnpy/app/cta_crypto/engine.py index aba67c94..e7880282 100644 --- a/vnpy/app/cta_crypto/engine.py +++ b/vnpy/app/cta_crypto/engine.py @@ -951,47 +951,60 @@ class CtaEngine(BaseEngine): """ Add a new strategy. """ - if strategy_name in self.strategies: - msg = f"创建策略失败,存在重名{strategy_name}" - self.write_log(msg=msg, - level=logging.CRITICAL) + try: + if strategy_name in self.strategies: + msg = f"{strategy_name} => 创建策略失败,存在重名" + self.write_log(msg=msg, + level=logging.CRITICAL) + return False, msg + + strategy_class = self.classes.get(class_name, None) + if not strategy_class: + msg = f"{strategy_name} => 创建策略失败,找不到策略类{class_name}" + self.write_log(msg=msg, + level=logging.CRITICAL) + return False, msg + + self.write_log(f'{strategy_name} => 实例化策略类{class_name}') + strategy = strategy_class(self, strategy_name, vt_symbol, setting) + self.strategies[strategy_name] = strategy + + if len(vt_symbol) > 0: + self.write_log(f'{strategy_name} => 建立 {vt_symbol}行情绑定关系') + # Add vt_symbol to strategy map. + strategies = self.symbol_strategy_map[vt_symbol] + strategies.append(strategy) + + subscribe_symbol_set = self.strategy_symbol_map[strategy_name] + subscribe_symbol_set.add(vt_symbol) + + # Update to setting file. + self.write_log(f'{strategy_name} => 更新策略配置 => Cta配置文件') + self.update_strategy_setting(strategy_name, setting, auto_init, auto_start) + + self.write_log(f'{strategy_name} => 推送事件') + self.put_strategy_event(strategy) + + # 判断设置中是否由自动初始化和自动启动项目 + if auto_init: + self.write_log(f'{strategy_name} => 自动初始化与启动{auto_start}') + self.init_strategy(strategy_name, auto_start=auto_start) + + return True, f'{strategy_name} => 成功添加至引擎' + + except Exception as ex: + msg = f'{strategy_name} => 添加策略异常:{str(ex)}' + self.write_log(msg) + self.write_error(traceback.format_exc()) return False, msg - strategy_class = self.classes.get(class_name, None) - if not strategy_class: - msg = f"创建策略失败,找不到策略类{class_name}" - self.write_log(msg=msg, - level=logging.CRITICAL) - return False, msg - - self.write_log(f'开始添加策略类{class_name},实例名:{strategy_name}') - strategy = strategy_class(self, strategy_name, vt_symbol, setting) - self.strategies[strategy_name] = strategy - - # Add vt_symbol to strategy map. - strategies = self.symbol_strategy_map[vt_symbol] - strategies.append(strategy) - - subscribe_symbol_set = self.strategy_symbol_map[strategy_name] - subscribe_symbol_set.add(vt_symbol) - - # Update to setting file. - self.update_strategy_setting(strategy_name, setting, auto_init, auto_start) - - self.put_strategy_event(strategy) - - # 判断设置中是否由自动初始化和自动启动项目 - if auto_init: - self.init_strategy(strategy_name, auto_start=auto_start) - - return True, f'成功添加{strategy_name}' - def init_strategy(self, strategy_name: str, auto_start: bool = False): """ Init a strategy. """ task = self.thread_executor.submit(self._init_strategy, strategy_name, auto_start) self.thread_tasks.append(task) + return True def _init_strategy(self, strategy_name: str, auto_start: bool = False): """ @@ -1001,10 +1014,10 @@ class CtaEngine(BaseEngine): strategy = self.strategies[strategy_name] if strategy.inited: - self.write_error(f"{strategy_name}已经完成初始化,禁止重复操作") + self.write_error(f"{strategy_name} => 已经完成初始化,禁止重复操作") return - self.write_log(f"{strategy_name}开始执行初始化") + self.write_log(f"{strategy_name} => 开始执行初始化") # Call on_init function of strategy self.call_strategy_func(strategy, strategy.on_init) @@ -1019,24 +1032,26 @@ class CtaEngine(BaseEngine): # setattr(strategy, name, value) # Subscribe market data 订阅缺省的vt_symbol, 如果有其他合约需要订阅,由策略内部初始化时提交订阅即可。 - self.subscribe_symbol(strategy_name, vt_symbol=strategy.vt_symbol) + if len(strategy.vt_symbol) > 0: + self.write_log(f'{strategy_name} => 订阅行情{strategy.vt_symbol}') + self.subscribe_symbol(strategy_name, vt_symbol=strategy.vt_symbol) # Put event to update init completed status. strategy.inited = True self.put_strategy_event(strategy) - self.write_log(f"{strategy_name}初始化完成") + self.write_log(f"{strategy_name} => 初始化完成") # 初始化后,自动启动策略交易 if auto_start: + self.write_log(f'{strategy_name} => 启动交易') self.start_strategy(strategy_name) except Exception as ex: - msg = f'{strategy_name}执行on_init异常:{str(ex)}' + msg = f'{strategy_name} => 执行on_init异常:{str(ex)}' self.write_error(ex) self.send_wechat(msg) self.write_error(traceback.format_exc()) - def start_strategy(self, strategy_name: str): """ Start a strategy. @@ -1044,12 +1059,12 @@ class CtaEngine(BaseEngine): try: strategy = self.strategies[strategy_name] if not strategy.inited: - msg = f"策略{strategy.strategy_name}启动失败,请先初始化" + msg = f"{strategy.strategy_name} => 策略启动失败,请先初始化" self.write_error(msg) return False, msg if strategy.trading: - msg = f"{strategy_name}已经启动,请勿重复操作" + msg = f"{strategy_name} => 已经启动,请勿重复操作" self.write_error(msg) return False, msg @@ -1058,10 +1073,10 @@ class CtaEngine(BaseEngine): self.put_strategy_event(strategy) - return True, f'成功启动策略{strategy_name}' + return True, f'{strategy_name} => 成功启动' except Exception as ex: - msg = f'{strategy_name}执行on_start异常:{str(ex)}' + msg = f'{strategy_name} => 执行on_start异常:{str(ex)}' self.write_error(ex) self.send_wechat(msg) self.write_error(traceback.format_exc()) @@ -1073,19 +1088,19 @@ class CtaEngine(BaseEngine): try: strategy = self.strategies[strategy_name] if not strategy.trading: - msg = f'{strategy_name}策略实例已处于停止交易状态' + msg = f'{strategy_name} => 策略实例已处于停止交易状态' self.write_log(msg) return False, msg # Call on_stop function of the strategy - self.write_log(f'调用{strategy_name}的on_stop,停止交易') + self.write_log(f'{strategy_name} => 调用的on_stop,停止交易') self.call_strategy_func(strategy, strategy.on_stop) # Change trading status of strategy to False strategy.trading = False # Cancel all orders of the strategy - self.write_log(f'撤销{strategy_name}所有委托') + self.write_log(f'{strategy_name} => 撤销所有委托') self.cancel_all(strategy) # Sync strategy variables to data file @@ -1094,9 +1109,9 @@ class CtaEngine(BaseEngine): # Update GUI self.put_strategy_event(strategy) - return True, f'成功停止策略{strategy_name}' + return True, f'{strategy_name}=> 成功停止' except Exception as ex: - msg = f'执行stop_strategy({strategy_name})异常:{str(ex)}' + msg = f'{strategy_name} => 执行stop_strategy()异常:{str(ex)}' self.write_error(ex) self.send_wechat(msg) self.write_error(traceback.format_exc()) diff --git a/vnpy/gateway/binancef/binancef_gateway.py b/vnpy/gateway/binancef/binancef_gateway.py index 60fa7ace..f27cc724 100644 --- a/vnpy/gateway/binancef/binancef_gateway.py +++ b/vnpy/gateway/binancef/binancef_gateway.py @@ -1015,6 +1015,11 @@ class BinancefDataWebsocketApi(WebsocketClient): self.gateway.write_log(f"找不到该合约代码{req.symbol}") return + if req.symbol.lower() in self.ticks: + self.gateway.write_log(f'{req.symbol}已订阅过,不重复订阅') + return + + self.gateway.write_log(f'开始订阅合约{req.symbol}') # Create tick buf data tick = TickData( symbol=req.symbol, @@ -1027,8 +1032,13 @@ class BinancefDataWebsocketApi(WebsocketClient): # Close previous connection if self._active: - self.stop() - self.join() + try: + self.stop() + from time import sleep + sleep(0.1) + self.join() + except Exception as ex: + self.gateway.write_error(f'订阅合约{req.symbol},关闭前一个ws连接异常:{str(ex)}') # Create new connection channels = []