From c4391950566eaca3551e56a88db53cb51f5402b0 Mon Sep 17 00:00:00 2001 From: msincenselee Date: Sat, 13 Mar 2021 22:23:43 +0800 Subject: [PATCH] =?UTF-8?q?[update]=20=E5=A2=9E=E5=8A=A0=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=89=80=E6=9C=89=E6=8C=81=E4=BB=93=EF=BC=8C=E4=B8=80=E4=BA=9B?= =?UTF-8?q?bug=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- centos_setup.md | 2 +- requirements.txt | 3 +- vnpy/app/cta_crypto/back_testing.py | 10 +- vnpy/app/cta_crypto/engine.py | 302 ++++++++++++---------- vnpy/app/cta_crypto/template.py | 14 +- vnpy/app/cta_stock/back_testing.py | 11 +- vnpy/app/cta_stock/template.py | 15 +- vnpy/app/cta_strategy_pro/back_testing.py | 42 ++- vnpy/app/cta_strategy_pro/engine.py | 8 +- vnpy/app/cta_strategy_pro/template.py | 5 + vnpy/gateway/ctp/ctp_gateway.py | 6 +- vnpy/gateway/rohon/rohon_gateway.py | 12 +- 12 files changed, 274 insertions(+), 156 deletions(-) diff --git a/centos_setup.md b/centos_setup.md index c61eb307..7ffd26ec 100644 --- a/centos_setup.md +++ b/centos_setup.md @@ -23,7 +23,7 @@ conda create -name py37=python3.7 解决: sudo find / -name libta_lib.so.0 - /home/ai/eco-ta/ta-lib/src/.libs/libta_lib.so.0 + /home/trade/ta-lib/src/.libs/libta_lib.so.0 /usr/local/lib/libta_lib.so.0 vi /etc/profile 添加 diff --git a/requirements.txt b/requirements.txt index 3aec27aa..6255c9e2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,7 +13,6 @@ matplotlib seaborn futu-api tigeropen -rqdatac ta-lib ibapi deap @@ -22,4 +21,4 @@ QScintilla PySocks pykalman cython -tqsdk + diff --git a/vnpy/app/cta_crypto/back_testing.py b/vnpy/app/cta_crypto/back_testing.py index bdb91af2..6ba774a8 100644 --- a/vnpy/app/cta_crypto/back_testing.py +++ b/vnpy/app/cta_crypto/back_testing.py @@ -1841,7 +1841,7 @@ class BackTestingEngine(object): self.daily_max_drawdown_rate = drawdown_rate self.max_drawdown_rate_time = data['date'] - msg = u'{}: net={}, capital={} max={} margin={} commission={}, pos: {}' \ + msg = u'{}: net={}, capital={} max={} holding_profit={} commission={}, pos: \n{}' \ .format(data['date'], data['net'], c, m, today_holding_profit, @@ -1922,6 +1922,7 @@ class BackTestingEngine(object): d = {} d['init_capital'] = self.init_capital d['profit'] = self.cur_capital - self.init_capital + d['net_capital'] = self.net_capital d['max_capital'] = self.max_net_capital # 取消原 maxCapital if len(self.pnl_list) == 0: @@ -2002,8 +2003,11 @@ class BackTestingEngine(object): result_info.update({u'期初资金': d['init_capital']}) self.output(u'期初资金:\t%s' % format_number(d['init_capital'])) - result_info.update({u'总盈亏': d['profit']}) - self.output(u'总盈亏:\t%s' % format_number(d['profit'])) + result_info.update({u'期末资金': d['net_capital']}) + self.output(u'期末资金:\t%s' % format_number(d['net_capital'])) + + result_info.update({u'平仓盈亏': d['profit']}) + self.output(u'平仓盈亏:\t%s' % format_number(d['profit'])) result_info.update({u'资金最高净值': d['max_capital']}) self.output(u'资金最高净值:\t%s' % format_number(d['max_capital'])) diff --git a/vnpy/app/cta_crypto/engine.py b/vnpy/app/cta_crypto/engine.py index 11021057..561678f8 100644 --- a/vnpy/app/cta_crypto/engine.py +++ b/vnpy/app/cta_crypto/engine.py @@ -858,6 +858,10 @@ class CtaEngine(BaseEngine): vt_position_id = f"{gateway_name}.{vt_symbol}.{direction.value}" return self.main_engine.get_position(vt_position_id) + def get_all_positions(self): + """查询当前帐号得所有持仓合约""" + return self.main_engine.get_all_positions() + def get_engine_type(self): """""" return self.engine_type @@ -993,88 +997,110 @@ class CtaEngine(BaseEngine): """ Init strategies in queue. """ - strategy = self.strategies[strategy_name] + try: + strategy = self.strategies[strategy_name] - if strategy.inited: - self.write_error(f"{strategy_name}已经完成初始化,禁止重复操作") - return + if strategy.inited: + self.write_error(f"{strategy_name}已经完成初始化,禁止重复操作") + return - self.write_log(f"{strategy_name}开始执行初始化") + self.write_log(f"{strategy_name}开始执行初始化") - # Call on_init function of strategy - self.call_strategy_func(strategy, strategy.on_init) + # Call on_init function of strategy + self.call_strategy_func(strategy, strategy.on_init) - # Restore strategy data(variables) - # Pro 版本不使用自动恢复除了内部数据功能,由策略自身初始化时完成 - # data = self.strategy_data.get(strategy_name, None) - # if data: - # for name in strategy.variables: - # value = data.get(name, None) - # if value: - # setattr(strategy, name, value) + # Restore strategy data(variables) + # Pro 版本不使用自动恢复除了内部数据功能,由策略自身初始化时完成 + # data = self.strategy_data.get(strategy_name, None) + # if data: + # for name in strategy.variables: + # value = data.get(name, None) + # if value: + # setattr(strategy, name, value) - # Subscribe market data 订阅缺省的vt_symbol, 如果有其他合约需要订阅,由策略内部初始化时提交订阅即可。 - self.subscribe_symbol(strategy_name, vt_symbol=strategy.vt_symbol) + # Subscribe market data 订阅缺省的vt_symbol, 如果有其他合约需要订阅,由策略内部初始化时提交订阅即可。 + self.subscribe_symbol(strategy_name, vt_symbol=strategy.vt_symbol) - # Put event to update init completed status. - strategy.inited = True - self.put_strategy_event(strategy) - self.write_log(f"{strategy_name}初始化完成") + # Put event to update init completed status. + strategy.inited = True + self.put_strategy_event(strategy) + self.write_log(f"{strategy_name}初始化完成") + + # 初始化后,自动启动策略交易 + if auto_start: + self.start_strategy(strategy_name) + + except Exception as ex: + msg = f'{strategy_name}执行on_init异常:{str(ex)}' + self.write_error(ex) + self.send_wechat(msg) + self.write_error(traceback.format_exc()) - # 初始化后,自动启动策略交易 - if auto_start: - self.start_strategy(strategy_name) def start_strategy(self, strategy_name: str): """ Start a strategy. """ - strategy = self.strategies[strategy_name] - if not strategy.inited: - msg = f"策略{strategy.strategy_name}启动失败,请先初始化" - self.write_error(msg) - return False, msg + try: + strategy = self.strategies[strategy_name] + if not strategy.inited: + msg = f"策略{strategy.strategy_name}启动失败,请先初始化" + self.write_error(msg) + return False, msg - if strategy.trading: - msg = f"{strategy_name}已经启动,请勿重复操作" - self.write_error(msg) - return False, msg + if strategy.trading: + msg = f"{strategy_name}已经启动,请勿重复操作" + self.write_error(msg) + return False, msg - self.call_strategy_func(strategy, strategy.on_start) - strategy.trading = True + self.call_strategy_func(strategy, strategy.on_start) + strategy.trading = True - self.put_strategy_event(strategy) + self.put_strategy_event(strategy) - return True, f'成功启动策略{strategy_name}' + return True, f'成功启动策略{strategy_name}' + + except Exception as ex: + msg = f'{strategy_name}执行on_start异常:{str(ex)}' + self.write_error(ex) + self.send_wechat(msg) + self.write_error(traceback.format_exc()) def stop_strategy(self, strategy_name: str): """ Stop a strategy. """ - strategy = self.strategies[strategy_name] - if not strategy.trading: - msg = f'{strategy_name}策略实例已处于停止交易状态' - self.write_log(msg) - return False, msg + try: + strategy = self.strategies[strategy_name] + if not strategy.trading: + msg = f'{strategy_name}策略实例已处于停止交易状态' + self.write_log(msg) + return False, msg - # Call on_stop function of the strategy - self.write_log(f'调用{strategy_name}的on_stop,停止交易') - self.call_strategy_func(strategy, strategy.on_stop) + # Call on_stop function of the strategy + self.write_log(f'调用{strategy_name}的on_stop,停止交易') + self.call_strategy_func(strategy, strategy.on_stop) - # Change trading status of strategy to False - strategy.trading = False + # Change trading status of strategy to False + strategy.trading = False - # Cancel all orders of the strategy - self.write_log(f'撤销{strategy_name}所有委托') - self.cancel_all(strategy) + # Cancel all orders of the strategy + self.write_log(f'撤销{strategy_name}所有委托') + self.cancel_all(strategy) - # Sync strategy variables to data file - # 取消此功能,由策略自身完成数据的持久化 - # self.sync_strategy_data(strategy) + # Sync strategy variables to data file + # 取消此功能,由策略自身完成数据的持久化 + # self.sync_strategy_data(strategy) - # Update GUI - self.put_strategy_event(strategy) - return True, f'成功停止策略{strategy_name}' + # Update GUI + self.put_strategy_event(strategy) + return True, f'成功停止策略{strategy_name}' + except Exception as ex: + msg = f'执行stop_strategy({strategy_name})异常:{str(ex)}' + self.write_error(ex) + self.send_wechat(msg) + self.write_error(traceback.format_exc()) + return False, f'停止策略失败{strategy_name},异常:{str(ex)}' def edit_strategy(self, strategy_name: str, setting: dict): """ @@ -1094,36 +1120,45 @@ class CtaEngine(BaseEngine): """ Remove a strategy. """ - strategy = self.strategies[strategy_name] - if strategy.trading: - err_msg = f"策略{strategy.strategy_name}移除失败,请先停止" - self.write_error(err_msg) - return False, err_msg + try: - # Remove setting - self.remove_strategy_setting(strategy_name) + strategy = self.strategies[strategy_name] + if strategy.trading: + err_msg = f"策略{strategy.strategy_name}移除失败,请先停止" + self.write_error(err_msg) + return False, err_msg - # 移除订阅合约与策略的关联关系 - for vt_symbol in self.strategy_symbol_map[strategy_name]: - # Remove from symbol strategy map - self.write_log(f'移除{vt_symbol}《=》{strategy_name}的订阅关系') - strategies = self.symbol_strategy_map[vt_symbol] - strategies.remove(strategy) + # Remove setting + self.remove_strategy_setting(strategy_name) - # Remove from active orderid map - if strategy_name in self.strategy_orderid_map: - vt_orderids = self.strategy_orderid_map.pop(strategy_name) - self.write_log(f'移除{strategy_name}的所有委托订单映射关系') - # Remove vt_orderid strategy map - for vt_orderid in vt_orderids: - if vt_orderid in self.orderid_strategy_map: - self.orderid_strategy_map.pop(vt_orderid) + # 移除订阅合约与策略的关联关系 + for vt_symbol in self.strategy_symbol_map[strategy_name]: + # Remove from symbol strategy map + self.write_log(f'移除{vt_symbol}《=》{strategy_name}的订阅关系') + strategies = self.symbol_strategy_map[vt_symbol] + strategies.remove(strategy) - # Remove from strategies - self.write_log(f'移除{strategy_name}策略实例') - self.strategies.pop(strategy_name) + # Remove from active orderid map + if strategy_name in self.strategy_orderid_map: + vt_orderids = self.strategy_orderid_map.pop(strategy_name) + self.write_log(f'移除{strategy_name}的所有委托订单映射关系') + # Remove vt_orderid strategy map + for vt_orderid in vt_orderids: + if vt_orderid in self.orderid_strategy_map: + self.orderid_strategy_map.pop(vt_orderid) - return True, f'成功移除{strategy_name}策略实例' + # Remove from strategies + self.write_log(f'移除{strategy_name}策略实例') + self.strategies.pop(strategy_name) + + return True, f'成功移除{strategy_name}策略实例' + + except Exception as ex: + msg = f'执行remove_strategy({strategy_name})异常:{str(ex)}' + self.write_error(ex) + self.send_wechat(msg) + self.write_error(traceback.format_exc()) + return False, f'移除策略失败{strategy_name},异常:{str(ex)}' def reload_strategy(self, strategy_name: str, vt_symbol: str = '', setting: dict = {}): """ @@ -1133,63 +1168,70 @@ class CtaEngine(BaseEngine): :param setting: :return: """ - self.write_log(f'开始重新加载策略{strategy_name}') + try: + self.write_log(f'开始重新加载策略{strategy_name}') - # 优先判断重启的策略,是否已经加载 - if strategy_name not in self.strategies or strategy_name not in self.strategy_setting: - err_msg = f"{strategy_name}不在运行策略中,不能重启" - self.write_error(err_msg) - return False, err_msg - - # 从本地配置文件中读取 - if len(setting) == 0: - strategies_setting = load_json(self.setting_filename) - old_strategy_config = strategies_setting.get(strategy_name, {}) - else: - old_strategy_config = copy(self.strategy_setting[strategy_name]) - - class_name = old_strategy_config.get('class_name') - if len(vt_symbol) == 0: - vt_symbol = old_strategy_config.get('vt_symbol') - if len(setting) == 0: - setting = old_strategy_config.get('setting') - - module_name = self.class_module_map[class_name] - # 重新load class module - #if not self.load_strategy_class_from_module(module_name): - # err_msg = f'不能加载模块:{module_name}' - # self.write_error(err_msg) - # return False, err_msg - if module_name: - new_class_name = module_name + '.' + class_name - self.write_log(u'转换策略为全路径:{}'.format(new_class_name)) - - strategy_class = import_module_by_str(new_class_name) - if strategy_class is None: - err_msg = u'加载策略模块失败:{}'.format(class_name) + # 优先判断重启的策略,是否已经加载 + if strategy_name not in self.strategies or strategy_name not in self.strategy_setting: + err_msg = f"{strategy_name}不在运行策略中,不能重启" self.write_error(err_msg) return False, err_msg - self.write_log(f'重新加载模块成功,使用新模块:{new_class_name}') - self.classes[class_name] = strategy_class + # 从本地配置文件中读取 + if len(setting) == 0: + strategies_setting = load_json(self.setting_filename) + old_strategy_config = strategies_setting.get(strategy_name, {}) + else: + old_strategy_config = copy(self.strategy_setting[strategy_name]) - # 停止当前策略实例的运行,撤单 - self.stop_strategy(strategy_name) + class_name = old_strategy_config.get('class_name') + if len(vt_symbol) == 0: + vt_symbol = old_strategy_config.get('vt_symbol') + if len(setting) == 0: + setting = old_strategy_config.get('setting') - # 移除运行中的策略实例 - self.remove_strategy(strategy_name) + module_name = self.class_module_map[class_name] + # 重新load class module + #if not self.load_strategy_class_from_module(module_name): + # err_msg = f'不能加载模块:{module_name}' + # self.write_error(err_msg) + # return False, err_msg + if module_name: + new_class_name = module_name + '.' + class_name + self.write_log(u'转换策略为全路径:{}'.format(new_class_name)) - # 重新添加策略 - self.add_strategy(class_name=class_name, - strategy_name=strategy_name, - vt_symbol=vt_symbol, - setting=setting, - auto_init=old_strategy_config.get('auto_init', False), - auto_start=old_strategy_config.get('auto_start', False)) + strategy_class = import_module_by_str(new_class_name) + if strategy_class is None: + err_msg = u'加载策略模块失败:{}'.format(class_name) + self.write_error(err_msg) + return False, err_msg - msg = f'成功重载策略{strategy_name}' - self.write_log(msg) - return True, msg + self.write_log(f'重新加载模块成功,使用新模块:{new_class_name}') + self.classes[class_name] = strategy_class + + # 停止当前策略实例的运行,撤单 + self.stop_strategy(strategy_name) + + # 移除运行中的策略实例 + self.remove_strategy(strategy_name) + + # 重新添加策略 + self.add_strategy(class_name=class_name, + strategy_name=strategy_name, + vt_symbol=vt_symbol, + setting=setting, + auto_init=old_strategy_config.get('auto_init', False), + auto_start=old_strategy_config.get('auto_start', False)) + + msg = f'成功重载策略{strategy_name}' + self.write_log(msg) + return True, msg + except Exception as ex: + msg = f'执行reload_strategy({strategy_name})异常:{str(ex)}' + self.write_error(ex) + self.send_wechat(msg) + self.write_error(traceback.format_exc()) + return False, f'重启策略失败{strategy_name},异常:{str(ex)}' def save_strategy_data(self, select_name: str = 'ALL'): """ save strategy data""" diff --git a/vnpy/app/cta_crypto/template.py b/vnpy/app/cta_crypto/template.py index 905ecd5f..65658dad 100644 --- a/vnpy/app/cta_crypto/template.py +++ b/vnpy/app/cta_crypto/template.py @@ -975,7 +975,7 @@ class CtaFutureTemplate(CtaTemplate): else: self.write_error(u'委托空单平仓失败') - def grid_buy(self, grid): + def grid_buy(self, grid, **kwargs): """ 事务开多仓 :return: @@ -1002,7 +1002,7 @@ class CtaFutureTemplate(CtaTemplate): .format(grid.type, grid.open_price, grid.volume, grid.close_price)) return False - def grid_short(self, grid): + def grid_short(self, grid, **kwargs): """ 事务开空仓 :return: @@ -1029,7 +1029,7 @@ class CtaFutureTemplate(CtaTemplate): .format(grid.type, grid.open_price, grid.volume, grid.close_price)) return False - def grid_sell(self, grid): + def grid_sell(self, grid, **kwargs): """ 事务平多单仓位 1.来源自止损止盈平仓 @@ -1090,7 +1090,7 @@ class CtaFutureTemplate(CtaTemplate): return True - def grid_cover(self, grid): + def grid_cover(self, grid, **kwargs): """ 事务平空单仓位 1.来源自止损止盈平仓 @@ -1179,13 +1179,13 @@ class CtaFutureTemplate(CtaTemplate): # 只处理未成交的限价委托单 if order_status in [Status.SUBMITTING, Status.NOTTRADED] and order_type == OrderType.LIMIT: if over_seconds > self.cancel_seconds or force: # 超过设置的时间还未成交 - self.write_log(u'超时{}秒未成交,取消委托单:vt_orderid:{},order:{}' - .format(over_seconds, vt_orderid, order_info)) + self.write_log(u'{}超时{}秒未成交,取消委托单:vt_orderid:{},order:{}' + .format(order_vt_symbol, over_seconds, vt_orderid, order_info)) order_info.update({'status': Status.CANCELLING}) self.active_orders.update({vt_orderid: order_info}) ret = self.cancel_order(str(vt_orderid)) if not ret: - self.write_log(u'撤单失败,更新状态为撤单成功') + self.write_log(f'{order_vt_symbol}撤单失败,更新状态为撤单成功') order_info.update({'status': Status.CANCELLED}) self.active_orders.update({vt_orderid: order_info}) if order_grid and vt_orderid in order_grid.order_ids: diff --git a/vnpy/app/cta_stock/back_testing.py b/vnpy/app/cta_stock/back_testing.py index ff8a2489..3ec531c4 100644 --- a/vnpy/app/cta_stock/back_testing.py +++ b/vnpy/app/cta_stock/back_testing.py @@ -212,11 +212,12 @@ class BackTestingEngine(object): self.test_setting = None # 回测设置 self.strategy_setting = None # 所有回测策略得设置 - def create_fund_kline(self, name, use_renko=False): + def create_fund_kline(self, name, use_renko=False, extra_setting = {}): """ 创建资金曲线 :param name: 账号名,或者策略名 - :param use_renko: + :param use_renko:是否使用砖图 + :param extra_setting: 扩展得k线设置,例如macd等 :return: """ setting = {} @@ -228,7 +229,9 @@ class BackTestingEngine(object): setting['price_tick'] = 0.01 setting['underlying_symbol'] = 'fund' setting['is_7x24'] = self.is_7x24 - + for k,v in extra_setting.items(): + if k not in setting: + setting.update({k:v}) if use_renko: # 使用砖图,高度是资金的千分之一 setting['height'] = self.init_capital * 0.001 @@ -1255,7 +1258,7 @@ class BackTestingEngine(object): """更新持仓信息,把今仓=>昨仓""" for k, v in self.positions.items(): - if v.volume > 0: + if v.volume != v.yd_volume: self.write_log(f'调整{v.vt_symbol}持仓: 昨仓{v.yd_volume} => {v.volume}') v.yd_volume = v.volume diff --git a/vnpy/app/cta_stock/template.py b/vnpy/app/cta_stock/template.py index f574e8b8..604d4752 100644 --- a/vnpy/app/cta_stock/template.py +++ b/vnpy/app/cta_stock/template.py @@ -598,20 +598,29 @@ class CtaStockTemplate(CtaTemplate): self.write_log(f'清除委托单:{lg.order_ids}') [self.cta_engine.cancel_order(self, vt_orderid) for vt_orderid in lg.order_ids] lg.order_ids = [] - if lg.open_status and not lg.close_status and not lg.order_status: + + # 持仓 + if lg.open_status and not lg.close_status and not lg.order_status and lg.volume > 0: pos = self.get_position(lg.vt_symbol) + pos.price = round((pos.price * pos.volume + lg.open_price * lg.volume) / (pos.volume + lg.volume),3) pos.volume += lg.volume lg.traded_volume = 0 self.write_log(u'持仓状态,加载持仓多单[{},价格:{},数量:{}手, 开仓时间:{}' .format(lg.vt_symbol, lg.open_price, lg.volume, lg.open_time)) self.positions.update({lg.vt_symbol: pos}) + + # 部分开仓 elif lg.order_status and not lg.open_status and not lg.close_status and lg.traded_volume > 0: pos = self.get_position(lg.vt_symbol) + pos.price = round( + (pos.price * pos.volume + lg.open_price * lg.traded_volume) / (pos.volume + lg.traded_volume), 3) pos.volume += lg.traded_volume self.write_log(u'开仓状态,加载部分持仓多单[{},价格:{},数量:{}手, 开仓时间:{}' .format(lg.vt_symbol, lg.open_price, lg.traded_volume, lg.open_time)) self.positions.update({lg.vt_symbol: pos}) - elif lg.order_status and lg.open_status and lg.close_status: + + # 正在平仓 => + elif lg.order_status and lg.open_status and lg.close_status and lg.volume > 0: if lg.traded_volume > 0: old_volume = lg.volume lg.volume -= lg.traded_volume @@ -619,6 +628,8 @@ class CtaStockTemplate(CtaTemplate): lg.traded_volume = 0 pos = self.get_position(lg.vt_symbol) + pos.price = round( + (pos.price * pos.volume + lg.open_price * lg.volume) / (pos.volume + lg.volume), 3) pos.volume += lg.volume self.write_log(u'卖出状态,加载持仓多单[{},价格:{},数量:{}手, 开仓时间:{}' .format(lg.vt_symbol, lg.open_price, lg.volume, lg.open_time)) diff --git a/vnpy/app/cta_strategy_pro/back_testing.py b/vnpy/app/cta_strategy_pro/back_testing.py index 91acbc3e..ec989edd 100644 --- a/vnpy/app/cta_strategy_pro/back_testing.py +++ b/vnpy/app/cta_strategy_pro/back_testing.py @@ -212,11 +212,12 @@ class BackTestingEngine(object): self.test_setting = None # 回测设置 self.strategy_setting = None # 所有回测策略得设置 - def create_fund_kline(self, name, use_renko=False): + def create_fund_kline(self, name, use_renko=False, extra_setting = {}): """ 创建资金曲线 :param name: 账号名,或者策略名 :param use_renko: + :param extra_setting: 扩展得k线设置,例如macd等 :return: """ setting = {} @@ -232,6 +233,10 @@ class BackTestingEngine(object): setting['height'] = self.init_capital * 0.001 setting['use_renko'] = True + for k, v in extra_setting.items(): + if k not in setting: + setting.update({k: v}) + fund_kline = FundKline(cta_engine=self, setting=setting) self.fund_kline_dict.update({name: fund_kline}) return fund_kline @@ -254,6 +259,36 @@ class BackTestingEngine(object): else: return None + # todo wj + def save_fund_kline(self, name: str = None): + # 没有指定账号,并且存在一个或多个资金K线 + if len(self.fund_kline_dict) > 0: + # 优先找vt_setting中,配置了strategy_groud的资金K线 + kline = self.fund_kline_dict.get(name, None) + # 找不到,返回第一个 + if kline is None: + kline = self.fund_kline_dict.values()[0] + + kline_file = str(os.path.join(self.get_data_path(), 'fund_{}.csv'.format(name))) + # 如果数据文件存在,则删除数据 + if os.path.exists(kline_file): + os.remove(kline_file) + + # 设置 kline的输出文件 + kline.kline.export_filename = kline_file + kline.kline.export_fields = [ + {'name': 'datetime', 'source': 'bar', 'attr': 'datetime', 'type_': 'datetime'}, + {'name': 'open', 'source': 'bar', 'attr': 'open_price', 'type_': 'float'}, + {'name': 'high', 'source': 'bar', 'attr': 'high_price', 'type_': 'float'}, + {'name': 'low', 'source': 'bar', 'attr': 'low_price', 'type_': 'float'}, + {'name': 'close', 'source': 'bar', 'attr': 'close_price', 'type_': 'float'}, + {'name': 'turnover', 'source': 'bar', 'attr': 'turnover', 'type_': 'float'}, + {'name': 'volume', 'source': 'bar', 'attr': 'volume', 'type_': 'float'}, + {'name': 'open_interest', 'source': 'bar', 'attr': 'open_interest', 'type_': 'float'} + ] + + kline.save() + def get_account(self, vt_accountid: str = ""): """返回账号的实时权益,可用资金,仓位比例,投资仓位比例上限""" if self.net_capital == 0.0: @@ -2199,6 +2234,11 @@ class BackTestingEngine(object): def show_backtesting_result(self): """显示回测结果""" + # 导出资金曲线 + if self.active_fund_kline: + for key in self.fund_kline_dict.keys(): + self.save_fund_kline(key) + d, daily_net_capital, daily_capital = self.get_result() if len(d) == 0: diff --git a/vnpy/app/cta_strategy_pro/engine.py b/vnpy/app/cta_strategy_pro/engine.py index abeb798b..32f63cc9 100644 --- a/vnpy/app/cta_strategy_pro/engine.py +++ b/vnpy/app/cta_strategy_pro/engine.py @@ -220,14 +220,20 @@ class CtaEngine(BaseEngine): def process_timer_event(self, event: Event): """ 处理定时器事件""" + all_trading = True + dt = datetime.now() + # 触发每个策略的定时接口 for strategy in list(self.strategies.values()): strategy.on_timer() if not strategy.trading: all_trading = False - dt = datetime.now() + # 临近夜晚收盘前,强制发出撤单 + if dt.hour == 2 and dt.minute == 59 and dt.second >= 55: + self.cancel_all(strategy) + # 每分钟执行的逻辑 if self.last_minute != dt.minute: self.last_minute = dt.minute diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py index 94dd338e..f6be43c8 100644 --- a/vnpy/app/cta_strategy_pro/template.py +++ b/vnpy/app/cta_strategy_pro/template.py @@ -904,6 +904,11 @@ class CtaProTemplate(CtaTemplate): order_time = order_info.get('order_time') over_ms = (dt - order_time).total_seconds() + + # 白天开盘或许有指数与真实tick的时间延迟,这个时刻不做撤单功能 + if f'{dt.hour}:{dt.minute}' in ['10:30', '13:30']: + continue + if (over_ms > self.cancel_seconds) \ or force: # 超过设置的时间还未成交 self.write_log(f'{dt}, 超时{over_ms}秒未成交,取消委托单:{order_info}') diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index 4e54024f..246f2f8a 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -4,6 +4,7 @@ import sys import traceback import json from datetime import datetime, timedelta +from time import time from copy import copy, deepcopy from functools import lru_cache from typing import List @@ -2055,8 +2056,9 @@ class TqMdApi(): """ 更新行情/委托/账户/持仓 """ - while self.api.wait_update(): - + while self.is_connected: + deadline = time() + 5 + self.api.wait_update(deadline=deadline) # 更新行情信息 for vt_symbol, quote in self.quote_objs: if self.api.is_changing(quote): diff --git a/vnpy/gateway/rohon/rohon_gateway.py b/vnpy/gateway/rohon/rohon_gateway.py index 66398d45..472cc8be 100644 --- a/vnpy/gateway/rohon/rohon_gateway.py +++ b/vnpy/gateway/rohon/rohon_gateway.py @@ -357,6 +357,10 @@ class RohonGateway(BaseGateway): return False if not self.td_api.connect_status or self.md_api.connect_status: + if not self.td_api.connect_status: + self.write_error(f'交易服务器连接断开') + if not self.md_api.connect_status: + self.write_error(f'行情服务器连接断开') return False return True @@ -572,7 +576,8 @@ class RohonMdApi(MdApi): Callback when front server is disconnected. """ self.login_status = False - self.gateway.write_log(f"行情服务器连接断开,原因{reason}") + self.connect_status = False + self.gateway.write_error(f"行情服务器连接断开,原因{reason}") self.gateway.status.update({'md_con': False, 'md_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool): @@ -690,6 +695,7 @@ class RohonMdApi(MdApi): self.init() self.connect_status = True + # If already connected, then login immediately. elif not self.login_status: self.login() @@ -773,8 +779,8 @@ class RohonTdApi(TdApi): def onFrontDisconnected(self, reason: int): """""" self.login_status = False - self.gateway.write_log(f"交易服务器连接断开,原因{reason}") - self.gateway.status.update({'td_con': True, 'td_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) + self.gateway.write_error(f"交易服务器连接断开,原因{reason}") + self.gateway.status.update({'td_con': False, 'td_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool): """"""