[bug fix] 币安合约行情连续频繁订阅导致策略初始化失败

This commit is contained in:
msincenselee 2021-05-10 12:26:15 +08:00
parent 56cc6a9a62
commit 5ddbf6541a
2 changed files with 75 additions and 50 deletions

View File

@ -951,47 +951,60 @@ class CtaEngine(BaseEngine):
"""
Add a new strategy.
"""
if strategy_name in self.strategies:
msg = f"创建策略失败,存在重名{strategy_name}"
self.write_log(msg=msg,
level=logging.CRITICAL)
try:
if strategy_name in self.strategies:
msg = f"{strategy_name} => 创建策略失败,存在重名"
self.write_log(msg=msg,
level=logging.CRITICAL)
return False, msg
strategy_class = self.classes.get(class_name, None)
if not strategy_class:
msg = f"{strategy_name} => 创建策略失败,找不到策略类{class_name}"
self.write_log(msg=msg,
level=logging.CRITICAL)
return False, msg
self.write_log(f'{strategy_name} => 实例化策略类{class_name}')
strategy = strategy_class(self, strategy_name, vt_symbol, setting)
self.strategies[strategy_name] = strategy
if len(vt_symbol) > 0:
self.write_log(f'{strategy_name} => 建立 {vt_symbol}行情绑定关系')
# Add vt_symbol to strategy map.
strategies = self.symbol_strategy_map[vt_symbol]
strategies.append(strategy)
subscribe_symbol_set = self.strategy_symbol_map[strategy_name]
subscribe_symbol_set.add(vt_symbol)
# Update to setting file.
self.write_log(f'{strategy_name} => 更新策略配置 => Cta配置文件')
self.update_strategy_setting(strategy_name, setting, auto_init, auto_start)
self.write_log(f'{strategy_name} => 推送事件')
self.put_strategy_event(strategy)
# 判断设置中是否由自动初始化和自动启动项目
if auto_init:
self.write_log(f'{strategy_name} => 自动初始化与启动{auto_start}')
self.init_strategy(strategy_name, auto_start=auto_start)
return True, f'{strategy_name} => 成功添加至引擎'
except Exception as ex:
msg = f'{strategy_name} => 添加策略异常:{str(ex)}'
self.write_log(msg)
self.write_error(traceback.format_exc())
return False, msg
strategy_class = self.classes.get(class_name, None)
if not strategy_class:
msg = f"创建策略失败,找不到策略类{class_name}"
self.write_log(msg=msg,
level=logging.CRITICAL)
return False, msg
self.write_log(f'开始添加策略类{class_name},实例名:{strategy_name}')
strategy = strategy_class(self, strategy_name, vt_symbol, setting)
self.strategies[strategy_name] = strategy
# Add vt_symbol to strategy map.
strategies = self.symbol_strategy_map[vt_symbol]
strategies.append(strategy)
subscribe_symbol_set = self.strategy_symbol_map[strategy_name]
subscribe_symbol_set.add(vt_symbol)
# Update to setting file.
self.update_strategy_setting(strategy_name, setting, auto_init, auto_start)
self.put_strategy_event(strategy)
# 判断设置中是否由自动初始化和自动启动项目
if auto_init:
self.init_strategy(strategy_name, auto_start=auto_start)
return True, f'成功添加{strategy_name}'
def init_strategy(self, strategy_name: str, auto_start: bool = False):
"""
Init a strategy.
"""
task = self.thread_executor.submit(self._init_strategy, strategy_name, auto_start)
self.thread_tasks.append(task)
return True
def _init_strategy(self, strategy_name: str, auto_start: bool = False):
"""
@ -1001,10 +1014,10 @@ class CtaEngine(BaseEngine):
strategy = self.strategies[strategy_name]
if strategy.inited:
self.write_error(f"{strategy_name}已经完成初始化,禁止重复操作")
self.write_error(f"{strategy_name} => 已经完成初始化,禁止重复操作")
return
self.write_log(f"{strategy_name}开始执行初始化")
self.write_log(f"{strategy_name} => 开始执行初始化")
# Call on_init function of strategy
self.call_strategy_func(strategy, strategy.on_init)
@ -1019,24 +1032,26 @@ class CtaEngine(BaseEngine):
# setattr(strategy, name, value)
# Subscribe market data 订阅缺省的vt_symbol, 如果有其他合约需要订阅,由策略内部初始化时提交订阅即可。
self.subscribe_symbol(strategy_name, vt_symbol=strategy.vt_symbol)
if len(strategy.vt_symbol) > 0:
self.write_log(f'{strategy_name} => 订阅行情{strategy.vt_symbol}')
self.subscribe_symbol(strategy_name, vt_symbol=strategy.vt_symbol)
# Put event to update init completed status.
strategy.inited = True
self.put_strategy_event(strategy)
self.write_log(f"{strategy_name}初始化完成")
self.write_log(f"{strategy_name} => 初始化完成")
# 初始化后,自动启动策略交易
if auto_start:
self.write_log(f'{strategy_name} => 启动交易')
self.start_strategy(strategy_name)
except Exception as ex:
msg = f'{strategy_name}执行on_init异常:{str(ex)}'
msg = f'{strategy_name} => 执行on_init异常:{str(ex)}'
self.write_error(ex)
self.send_wechat(msg)
self.write_error(traceback.format_exc())
def start_strategy(self, strategy_name: str):
"""
Start a strategy.
@ -1044,12 +1059,12 @@ class CtaEngine(BaseEngine):
try:
strategy = self.strategies[strategy_name]
if not strategy.inited:
msg = f"策略{strategy.strategy_name}启动失败,请先初始化"
msg = f"{strategy.strategy_name} => 策略启动失败,请先初始化"
self.write_error(msg)
return False, msg
if strategy.trading:
msg = f"{strategy_name}已经启动,请勿重复操作"
msg = f"{strategy_name} => 已经启动,请勿重复操作"
self.write_error(msg)
return False, msg
@ -1058,10 +1073,10 @@ class CtaEngine(BaseEngine):
self.put_strategy_event(strategy)
return True, f'成功启动策略{strategy_name}'
return True, f'{strategy_name} => 成功启动'
except Exception as ex:
msg = f'{strategy_name}执行on_start异常:{str(ex)}'
msg = f'{strategy_name} => 执行on_start异常:{str(ex)}'
self.write_error(ex)
self.send_wechat(msg)
self.write_error(traceback.format_exc())
@ -1073,19 +1088,19 @@ class CtaEngine(BaseEngine):
try:
strategy = self.strategies[strategy_name]
if not strategy.trading:
msg = f'{strategy_name}策略实例已处于停止交易状态'
msg = f'{strategy_name} => 策略实例已处于停止交易状态'
self.write_log(msg)
return False, msg
# Call on_stop function of the strategy
self.write_log(f'调用{strategy_name}的on_stop,停止交易')
self.write_log(f'{strategy_name} => 调用的on_stop,停止交易')
self.call_strategy_func(strategy, strategy.on_stop)
# Change trading status of strategy to False
strategy.trading = False
# Cancel all orders of the strategy
self.write_log(f'撤销{strategy_name}所有委托')
self.write_log(f'{strategy_name} => 撤销所有委托')
self.cancel_all(strategy)
# Sync strategy variables to data file
@ -1094,9 +1109,9 @@ class CtaEngine(BaseEngine):
# Update GUI
self.put_strategy_event(strategy)
return True, f'成功停止策略{strategy_name}'
return True, f'{strategy_name}=> 成功停止'
except Exception as ex:
msg = f'执行stop_strategy({strategy_name})异常:{str(ex)}'
msg = f'{strategy_name} => 执行stop_strategy()异常:{str(ex)}'
self.write_error(ex)
self.send_wechat(msg)
self.write_error(traceback.format_exc())

View File

@ -1015,6 +1015,11 @@ class BinancefDataWebsocketApi(WebsocketClient):
self.gateway.write_log(f"找不到该合约代码{req.symbol}")
return
if req.symbol.lower() in self.ticks:
self.gateway.write_log(f'{req.symbol}已订阅过,不重复订阅')
return
self.gateway.write_log(f'开始订阅合约{req.symbol}')
# Create tick buf data
tick = TickData(
symbol=req.symbol,
@ -1027,8 +1032,13 @@ class BinancefDataWebsocketApi(WebsocketClient):
# Close previous connection
if self._active:
self.stop()
self.join()
try:
self.stop()
from time import sleep
sleep(0.1)
self.join()
except Exception as ex:
self.gateway.write_error(f'订阅合约{req.symbol},关闭前一个ws连接异常:{str(ex)}')
# Create new connection
channels = []