diff --git a/vnpy/app/algo_trading/engine.py b/vnpy/app/algo_trading/engine.py index c2b0a213..77f110f0 100644 --- a/vnpy/app/algo_trading/engine.py +++ b/vnpy/app/algo_trading/engine.py @@ -382,7 +382,7 @@ class AlgoEngine(BaseEngine): algo_logger = self.algo_loggers.get(algo_name, None) if not algo_logger: log_path = get_folder_path('log') - log_filename = os.path.abspath(os.path.join(log_path, str(algo_name))) + log_filename = str(log_path.joinpath(str(algo_name))) print(u'create logger:{}'.format(log_filename)) self.algo_loggers[algo_name] = setup_logger( file_name=log_filename, diff --git a/vnpy/app/cta_crypto/engine.py b/vnpy/app/cta_crypto/engine.py index 3130e026..ade1323c 100644 --- a/vnpy/app/cta_crypto/engine.py +++ b/vnpy/app/cta_crypto/engine.py @@ -1,11 +1,16 @@ """ 数字货币CTA策略运行引擎 +华富资产: """ import importlib import os import sys import traceback +import json +import pickle +import bz2 + from collections import defaultdict from pathlib import Path from typing import Any, Callable @@ -168,6 +173,7 @@ class CtaEngine(BaseEngine): """ self.main_engine.get_strategy_status = self.get_strategy_status self.main_engine.get_strategy_pos = self.get_strategy_pos + self.main_engine.compare_pos = self.compare_pos self.main_engine.add_strategy = self.add_strategy self.main_engine.init_strategy = self.init_strategy self.main_engine.start_strategy = self.start_strategy @@ -179,16 +185,17 @@ class CtaEngine(BaseEngine): # 注册到远程服务调用 if self.main_engine.rpc_service: - self.main_engine.rpc_service.register(self.main_engine.get_strategy_status) - self.main_engine.rpc_service.register(self.main_engine.get_strategy_pos) - self.main_engine.rpc_service.register(self.main_engine.add_strategy) - self.main_engine.rpc_service.register(self.main_engine.init_strategy) - self.main_engine.rpc_service.register(self.main_engine.start_strategy) - self.main_engine.rpc_service.register(self.main_engine.stop_strategy) - self.main_engine.rpc_service.register(self.main_engine.remove_strategy) - self.main_engine.rpc_service.register(self.main_engine.reload_strategy) - self.main_engine.rpc_service.register(self.main_engine.save_strategy_data) - self.main_engine.rpc_service.register(self.main_engine.save_strategy_snapshot) + self.main_engine.rpc_service.register(self.main_engine.get_strategy_status) + self.main_engine.rpc_service.register(self.main_engine.get_strategy_pos) + self.main_engine.rpc_service.register(self.main_engine.compare_pos) + self.main_engine.rpc_service.register(self.main_engine.add_strategy) + self.main_engine.rpc_service.register(self.main_engine.init_strategy) + self.main_engine.rpc_service.register(self.main_engine.start_strategy) + self.main_engine.rpc_service.register(self.main_engine.stop_strategy) + self.main_engine.rpc_service.register(self.main_engine.remove_strategy) + self.main_engine.rpc_service.register(self.main_engine.reload_strategy) + self.main_engine.rpc_service.register(self.main_engine.save_strategy_data) + self.main_engine.rpc_service.register(self.main_engine.save_strategy_snapshot) def process_timer_event(self, event: Event): """ 处理定时器事件""" @@ -316,8 +323,7 @@ class CtaEngine(BaseEngine): trade_dict.update({'idx_price': trade_dict.get('price')}) if strategy_name is not None: - trade_file = os.path.abspath( - os.path.join(get_folder_path('data'), '{}_trade.csv'.format(strategy_name))) + trade_file = str(get_folder_path('data').joinpath('{}_trade.csv'.format(strategy_name))) append_data(file_name=trade_file, dict_data=trade_dict) except Exception as ex: self.write_error(u'写入交易记录csv出错:{},{}'.format(str(ex), traceback.format_exc())) @@ -338,6 +344,7 @@ class CtaEngine(BaseEngine): holding = self.get_position_holding(position.vt_symbol, position.gateway_name) holding.update_position(position) + def check_unsubscribed_symbols(self): """检查未订阅合约""" @@ -941,15 +948,17 @@ class CtaEngine(BaseEngine): Add a new strategy. """ if strategy_name in self.strategies: - self.write_log(msg=f"创建策略失败,存在重名{strategy_name}", + msg = f"创建策略失败,存在重名{strategy_name}" + self.write_log(msg=msg, level=logging.CRITICAL) - return + return False, msg strategy_class = self.classes.get(class_name, None) if not strategy_class: - self.write_log(msg=f"创建策略失败,找不到策略类{class_name}", + msg = f"创建策略失败,找不到策略类{class_name}" + self.write_log(msg=msg, level=logging.CRITICAL) - return + return False, msg self.write_log(f'开始添加策略类{class_name},实例名:{strategy_name}') strategy = strategy_class(self, strategy_name, vt_symbol, setting) @@ -971,6 +980,8 @@ class CtaEngine(BaseEngine): if auto_init: self.init_strategy(strategy_name, auto_start=auto_start) + return True, f'成功添加{strategy_name}' + def init_strategy(self, strategy_name: str, auto_start: bool = False): """ Init a strategy. @@ -1020,19 +1031,21 @@ class CtaEngine(BaseEngine): """ strategy = self.strategies[strategy_name] if not strategy.inited: - self.write_error(f"策略{strategy.strategy_name}启动失败,请先初始化") - return False + msg = f"策略{strategy.strategy_name}启动失败,请先初始化" + self.write_error(msg) + return False, msg if strategy.trading: - self.write_error(f"{strategy_name}已经启动,请勿重复操作") - return False + msg = f"{strategy_name}已经启动,请勿重复操作" + self.write_error(msg) + return False, msg self.call_strategy_func(strategy, strategy.on_start) strategy.trading = True self.put_strategy_event(strategy) - return True + return True, f'成功启动策略{strategy_name}' def stop_strategy(self, strategy_name: str): """ @@ -1040,8 +1053,9 @@ class CtaEngine(BaseEngine): """ strategy = self.strategies[strategy_name] if not strategy.trading: - self.write_log(f'{strategy_name}策略实例已处于停止交易状态') - return False + msg = f'{strategy_name}策略实例已处于停止交易状态' + self.write_log(msg) + return False, msg # Call on_stop function of the strategy self.write_log(f'调用{strategy_name}的on_stop,停止交易') @@ -1060,8 +1074,7 @@ class CtaEngine(BaseEngine): # Update GUI self.put_strategy_event(strategy) - - return True + return True, f'成功停止策略{strategy_name}' def edit_strategy(self, strategy_name: str, setting: dict): """ @@ -1083,8 +1096,9 @@ class CtaEngine(BaseEngine): """ strategy = self.strategies[strategy_name] if strategy.trading: - self.write_error(f"策略{strategy.strategy_name}移除失败,请先停止") - return False + err_msg = f"策略{strategy.strategy_name}移除失败,请先停止" + self.write_error(err_msg) + return False, err_msg # Remove setting self.remove_strategy_setting(strategy_name) @@ -1109,7 +1123,7 @@ class CtaEngine(BaseEngine): self.write_log(f'移除{strategy_name}策略实例') self.strategies.pop(strategy_name) - return True + return True, f'成功移除{strategy_name}策略实例' def reload_strategy(self, strategy_name: str, vt_symbol: str = '', setting: dict = {}): """ @@ -1123,8 +1137,9 @@ class CtaEngine(BaseEngine): # 优先判断重启的策略,是否已经加载 if strategy_name not in self.strategies or strategy_name not in self.strategy_setting: - self.write_error(f"{strategy_name}不在运行策略中,不能重启") - return False + err_msg = f"{strategy_name}不在运行策略中,不能重启" + self.write_error(err_msg) + return False, err_msg # 从本地配置文件中读取 if len(setting) == 0: @@ -1142,7 +1157,9 @@ class CtaEngine(BaseEngine): module_name = self.class_module_map[class_name] # 重新load class module if not self.load_strategy_class_from_module(module_name): - return False + err_msg = f'不能加载模块:{module_name}' + self.write_error(err_msg) + return False, err_msg # 停止当前策略实例的运行,撤单 self.stop_strategy(strategy_name) @@ -1158,8 +1175,9 @@ class CtaEngine(BaseEngine): auto_init=old_strategy_config.get('auto_init', False), auto_start=old_strategy_config.get('auto_start', False)) - self.write_log(f'重新运行策略{strategy_name}执行完毕') - return True + msg = f'成功重载策略{strategy_name}' + self.write_log(msg) + return True, msg def save_strategy_data(self, select_name: str): """ save strategy data""" @@ -1241,6 +1259,11 @@ class CtaEngine(BaseEngine): return # 剩下工作:保存本地文件/数据库 + snapshot_folder = get_folder_path(f'data/snapshots/{strategy_name}') + snapshot_file = snapshot_folder.joinpath('{}.pkb2'.format(datetime.now().strftime('%Y%m%d_%H%M%S'))) + with bz2.BZ2File(str(snapshot_file), 'wb') as f: + pickle.dump(snapshot, f) + self.write_log(u'切片保存成功:{}'.format(str(snapshot_file))) except Exception as ex: self.write_error(u'获取策略{}切片数据异常:'.format(strategy_name, str(ex))) @@ -1417,6 +1440,9 @@ class CtaEngine(BaseEngine): d['date'] = dt.strftime('%Y%m%d') d['hour'] = dt.hour d['datetime'] = datetime.now() + strategy = self.strategies.get(strategy_name) + d['inited'] = strategy.inited + d['trading'] = strategy.trading try: d['pos'] = self.get_strategy_pos(name=strategy_name) except Exception as ex: @@ -1451,6 +1477,146 @@ class CtaEngine(BaseEngine): d.update(strategy.get_parameters()) return d + def compare_pos(self): + """ + 对比账号&策略的持仓,不同的话则发出微信提醒 + :return: + """ + # 当前没有接入网关 + if len(self.main_engine.gateways) == 0: + return False, u'当前没有接入网关' + + self.write_log(u'开始对比账号&策略的持仓') + + # 获取当前策略得持仓 + strategy_pos_list = self.get_all_strategy_pos() + self.write_log(u'策略持仓清单:{}'.format(strategy_pos_list)) + + # 需要进行对比得合约集合(来自策略持仓/账号持仓) + vt_symbols = set() + + # 账号的持仓处理 => account_pos + + compare_pos = dict() # vt_symbol: {'账号多单': xx, '账号空单':xxx, '策略空单':[], '策略多单':[]} + + for holding_key in list(self.holdings.keys()): + # gateway_name.symbol.exchange => symbol.exchange + vt_symbol = '.'.join(holding_key.split('.')[-2:]) + + vt_symbols.add(vt_symbol) + holding = self.holdings.get(holding_key, None) + if holding is None: + continue + + compare_pos[vt_symbol] = OrderedDict( + { + "账号空单": holding.short_pos, + '账号多单': holding.long_pos, + '策略空单': 0, + '策略多单': 0, + '空单策略': [], + '多单策略': [] + } + ) + + # 逐一根据策略仓位,与Account_pos进行处理比对 + for strategy_pos in strategy_pos_list: + for pos in strategy_pos.get('pos', []): + vt_symbol = pos.get('vt_symbol') + if not vt_symbol: + continue + vt_symbols.add(vt_symbol) + symbol_pos = compare_pos.get(vt_symbol, None) + if symbol_pos is None: + self.write_log(u'账号持仓信息获取不到{},创建一个'.format(vt_symbol)) + symbol_pos = OrderedDict( + { + "账号空单": 0, + '账号多单': 0, + '策略空单': 0, + '策略多单': 0, + '空单策略': [], + '多单策略': [] + } + ) + + if pos.get('direction') == 'short': + symbol_pos.update({'策略空单': symbol_pos.get('策略空单', 0) + abs(pos.get('volume', 0))}) + symbol_pos['空单策略'].append( + u'{}({})'.format(strategy_pos['strategy_name'], abs(pos.get('volume', 0)))) + self.write_log(u'更新{}策略持空仓=>{}'.format(vt_symbol, symbol_pos.get('策略空单', 0))) + if pos.get('direction') == 'long': + symbol_pos.update({'策略多单': symbol_pos.get('策略多单', 0) + abs(pos.get('volume', 0))}) + symbol_pos['多单策略'].append( + u'{}({})'.format(strategy_pos['strategy_name'], abs(pos.get('volume', 0)))) + self.write_log(u'更新{}策略持多仓=>{}'.format(vt_symbol, symbol_pos.get('策略多单', 0))) + + pos_compare_result = '' + # 精简输出 + compare_info = '' + for vt_symbol in sorted(vt_symbols): + # 发送不一致得结果 + symbol_pos = compare_pos.pop(vt_symbol) + d_long = { + 'account_id': self.engine_config.get('account_id', '-'), + 'vt_symbol': vt_symbol, + 'direction': Direction.LONG.value, + 'strategy_list': symbol_pos.get('多单策略', [])} + + d_short = { + 'account_id': self.engine_config.get('account_id', '-'), + 'vt_symbol': vt_symbol, + 'direction': Direction.SHORT.value, + 'strategy_list': symbol_pos.get('多单策略', [])} + + # 多空都一致 + if round(symbol_pos['账号空单'], 7) == round(symbol_pos['策略空单'], 7) and \ + round(symbol_pos['账号多单'], 7) == round(symbol_pos['策略多单'], 7): + msg = u'{}多空都一致.{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False)) + self.write_log(msg) + compare_info += msg + else: + pos_compare_result += '\n{}: '.format(vt_symbol) + # 多单不一致 + if round(symbol_pos['策略多单'], 7) != round(symbol_pos['账号多单'], 7): + msg = '{}多单[账号({}), 策略{},共({})], ' \ + .format(vt_symbol, + symbol_pos['账号多单'], + symbol_pos['多单策略'], + symbol_pos['策略多单']) + + pos_compare_result += msg + self.write_error(u'{}不一致:{}'.format(vt_symbol, msg)) + compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg) + # 空单不一致 + if round(symbol_pos['策略空单'], 7) != round(symbol_pos['账号空单'], 7): + msg = '{}空单[账号({}), 策略{},共({})], ' \ + .format(vt_symbol, + symbol_pos['账号空单'], + symbol_pos['空单策略'], + symbol_pos['策略空单']) + pos_compare_result += msg + self.write_error(u'{}不一致:{}'.format(vt_symbol, msg)) + compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg) + + # 不匹配,输入到stdErr通道 + if pos_compare_result != '': + msg = u'账户{}持仓不匹配: {}' \ + .format(self.engine_config.get('account_id', '-'), + pos_compare_result) + try: + from vnpy.trader.util_wechat import send_wx_msg + send_wx_msg(content=msg) + except Exception as ex: + pass + ret_msg = u'持仓不匹配: {}' \ + .format(pos_compare_result) + self.write_error(ret_msg) + return True, compare_info + ret_msg + else: + self.write_log(u'账户持仓与策略一致') + return True, compare_info + def init_all_strategies(self): """ """ @@ -1559,13 +1725,16 @@ class CtaEngine(BaseEngine): strategy_logger = self.strategy_loggers.get(strategy_name, None) if not strategy_logger: log_path = get_folder_path('log') - log_filename = os.path.abspath(os.path.join(log_path, str(strategy_name))) + log_filename = str(log_path.joinpath(str(strategy_name))) print(u'create logger:{}'.format(log_filename)) self.strategy_loggers[strategy_name] = setup_logger(file_name=log_filename, name=str(strategy_name)) strategy_logger = self.strategy_loggers.get(strategy_name) if strategy_logger: strategy_logger.log(level, msg) + else: + if self.logger: + self.logger.log(level, msg) # 如果日志数据异常,错误和告警,输出至sys.stderr if level in [logging.CRITICAL, logging.ERROR, logging.WARNING]: diff --git a/vnpy/app/cta_crypto/template.py b/vnpy/app/cta_crypto/template.py index 1152f141..511182b8 100644 --- a/vnpy/app/cta_crypto/template.py +++ b/vnpy/app/cta_crypto/template.py @@ -662,7 +662,7 @@ class CtaFutureTemplate(CtaTemplate): 'price': g.open_price}) if self.cur_datetime and (datetime.now() - self.cur_datetime).total_seconds() < 10: - self.write_log(u'当前持仓:{}'.format(pos_list)) + self.write_log(u'{}当前持仓:{}'.format(self.strategy_name, pos_list)) return pos_list def on_trade(self, trade: TradeData): diff --git a/vnpy/app/cta_strategy_pro/engine.py b/vnpy/app/cta_strategy_pro/engine.py index b4cc5136..ebc5a1df 100644 --- a/vnpy/app/cta_strategy_pro/engine.py +++ b/vnpy/app/cta_strategy_pro/engine.py @@ -1,13 +1,20 @@ -"""""" +""" +CTA策略运行引擎增强版 +华富资产 +""" import importlib import os import sys import traceback +import json +import pickle +import bz2 + from collections import defaultdict from pathlib import Path from typing import Any, Callable -from datetime import datetime +from datetime import datetime, timedelta from collections import OrderedDict from concurrent.futures import ThreadPoolExecutor from copy import copy @@ -94,7 +101,11 @@ class CtaEngine(BaseEngine): engine_filename = "cta_strategy_pro_config.json" def __init__(self, main_engine: MainEngine, event_engine: EventEngine): - """""" + """ + 构造函数 + :param main_engine: 主引擎 + :param event_engine: 事件引擎 + """ super().__init__(main_engine, event_engine, APP_NAME) self.engine_config = {} @@ -138,10 +149,12 @@ class CtaEngine(BaseEngine): def init_engine(self): """ """ - self.load_strategy_class() - self.load_strategy_setting() self.register_event() self.register_funcs() + + self.load_strategy_class() + self.load_strategy_setting() + self.write_log("CTA策略引擎初始化成功") def close(self): @@ -164,6 +177,7 @@ class CtaEngine(BaseEngine): """ self.main_engine.get_strategy_status = self.get_strategy_status self.main_engine.get_strategy_pos = self.get_strategy_pos + self.main_engine.compare_pos = self.compare_pos self.main_engine.add_strategy = self.add_strategy self.main_engine.init_strategy = self.init_strategy self.main_engine.start_strategy = self.start_strategy @@ -177,6 +191,7 @@ class CtaEngine(BaseEngine): if self.main_engine.rpc_service: self.main_engine.rpc_service.register(self.main_engine.get_strategy_status) self.main_engine.rpc_service.register(self.main_engine.get_strategy_pos) + self.main_engine.rpc_service.register(self.main_engine.compare_pos) self.main_engine.rpc_service.register(self.main_engine.add_strategy) self.main_engine.rpc_service.register(self.main_engine.init_strategy) self.main_engine.rpc_service.register(self.main_engine.start_strategy) @@ -310,8 +325,7 @@ class CtaEngine(BaseEngine): trade_dict.update({'idx_price': trade_dict.get('price')}) if strategy_name is not None: - trade_file = os.path.abspath( - os.path.join(get_folder_path('data'), '{}_trade.csv'.format(strategy_name))) + trade_file = str(get_folder_path('data').joinpath('{}_trade.csv'.format(strategy_name))) append_data(file_name=trade_file, dict_data=trade_dict) except Exception as ex: self.write_error(u'写入交易记录csv出错:{},{}'.format(str(ex), traceback.format_exc())) @@ -856,15 +870,17 @@ class CtaEngine(BaseEngine): Add a new strategy. """ if strategy_name in self.strategies: - self.write_log(msg=f"创建策略失败,存在重名{strategy_name}", + msg = f"创建策略失败,存在重名{strategy_name}" + self.write_log(msg=msg, level=logging.CRITICAL) - return + return False, msg strategy_class = self.classes.get(class_name, None) if not strategy_class: - self.write_log(msg=f"创建策略失败,找不到策略类{class_name}", + msg = f"创建策略失败,找不到策略类{class_name}" + self.write_log(msg=msg, level=logging.CRITICAL) - return + return False, msg self.write_log(f'开始添加策略类{class_name},实例名:{strategy_name}') strategy = strategy_class(self, strategy_name, vt_symbol, setting) @@ -886,6 +902,8 @@ class CtaEngine(BaseEngine): if auto_init: self.init_strategy(strategy_name, auto_start=auto_start) + return True, f'成功添加{strategy_name}' + def init_strategy(self, strategy_name: str, auto_start: bool = False): """ Init a strategy. @@ -935,26 +953,31 @@ class CtaEngine(BaseEngine): """ strategy = self.strategies[strategy_name] if not strategy.inited: - self.write_error(f"策略{strategy.strategy_name}启动失败,请先初始化") - return + msg = f"策略{strategy.strategy_name}启动失败,请先初始化" + self.write_error(msg) + return False, msg if strategy.trading: - self.write_error(f"{strategy_name}已经启动,请勿重复操作") - return + msg = f"{strategy_name}已经启动,请勿重复操作" + self.write_error(msg) + return False, msg self.call_strategy_func(strategy, strategy.on_start) strategy.trading = True self.put_strategy_event(strategy) + return True, f'成功启动策略{strategy_name}' + def stop_strategy(self, strategy_name: str): """ Stop a strategy. """ strategy = self.strategies[strategy_name] if not strategy.trading: - self.write_log(f'{strategy_name}策略实例已处于停止交易状态') - return + msg = f'{strategy_name}策略实例已处于停止交易状态' + self.write_log(msg) + return False, msg # Call on_stop function of the strategy self.write_log(f'调用{strategy_name}的on_stop,停止交易') @@ -973,6 +996,7 @@ class CtaEngine(BaseEngine): # Update GUI self.put_strategy_event(strategy) + return True, f'成功停止策略{strategy_name}' def edit_strategy(self, strategy_name: str, setting: dict): """ @@ -994,8 +1018,9 @@ class CtaEngine(BaseEngine): """ strategy = self.strategies[strategy_name] if strategy.trading: - self.write_error(f"策略{strategy.strategy_name}移除失败,请先停止") - return + err_msg = f"策略{strategy.strategy_name}移除失败,请先停止" + self.write_error(err_msg) + return False, err_msg # Remove setting self.remove_strategy_setting(strategy_name) @@ -1020,7 +1045,7 @@ class CtaEngine(BaseEngine): self.write_log(f'移除{strategy_name}策略实例') self.strategies.pop(strategy_name) - return True + return True, f'成功移除{strategy_name}策略实例' def reload_strategy(self, strategy_name: str, vt_symbol: str = '', setting: dict = {}): """ @@ -1034,8 +1059,9 @@ class CtaEngine(BaseEngine): # 优先判断重启的策略,是否已经加载 if strategy_name not in self.strategies or strategy_name not in self.strategy_setting: - self.write_error(f"{strategy_name}不在运行策略中,不能重启") - return False + err_msg = f"{strategy_name}不在运行策略中,不能重启" + self.write_error(err_msg) + return False, err_msg # 从本地配置文件中读取 if len(setting) == 0: @@ -1053,7 +1079,9 @@ class CtaEngine(BaseEngine): module_name = self.class_module_map[class_name] # 重新load class module if not self.load_strategy_class_from_module(module_name): - return False + err_msg = f'不能加载模块:{module_name}' + self.write_error(err_msg) + return False, err_msg # 停止当前策略实例的运行,撤单 self.stop_strategy(strategy_name) @@ -1069,8 +1097,9 @@ class CtaEngine(BaseEngine): auto_init=old_strategy_config.get('auto_init', False), auto_start=old_strategy_config.get('auto_start', False)) - self.write_log(f'重新运行策略{strategy_name}执行完毕') - return True + msg = f'成功重载策略{strategy_name}' + self.write_log(msg) + return True, msg def save_strategy_data(self, select_name: str): """ save strategy data""" @@ -1152,6 +1181,11 @@ class CtaEngine(BaseEngine): return # 剩下工作:保存本地文件/数据库 + snapshot_folder = get_folder_path(f'data/snapshots/{strategy_name}') + snapshot_file = snapshot_folder.joinpath('{}.pkb2'.format(datetime.now().strftime('%Y%m%d_%H%M%S'))) + with bz2.BZ2File(str(snapshot_file), 'wb') as f: + pickle.dump(snapshot, f) + self.write_log(u'切片保存成功:{}'.format(str(snapshot_file))) except Exception as ex: self.write_error(u'获取策略{}切片数据异常:'.format(strategy_name, str(ex))) @@ -1236,12 +1270,11 @@ class CtaEngine(BaseEngine): def get_strategy_status(self): """ - return strategy name list with inited/trading status - :param : + return strategy inited/trading status + :param strategy_name: :return: """ - return [{k: {'inited': v.inited, 'trading': v.trading}} for k, v in self.strategies.items()] - + return {k: {'inited': v.inited, 'trading': v.trading} for k, v in self.strategies.items()} def get_strategy_pos(self, name, strategy=None): """ @@ -1408,6 +1441,146 @@ class CtaEngine(BaseEngine): d.update(strategy.get_parameters()) return d + def compare_pos(self): + """ + 对比账号&策略的持仓,不同的话则发出微信提醒 + :return: + """ + # 当前没有接入网关 + if len(self.main_engine.gateways) == 0: + return False, u'当前没有接入网关' + + self.write_log(u'开始对比账号&策略的持仓') + + # 获取当前策略得持仓 + strategy_pos_list = self.get_all_strategy_pos() + self.write_log(u'策略持仓清单:{}'.format(strategy_pos_list)) + + # 需要进行对比得合约集合(来自策略持仓/账号持仓) + vt_symbols = set() + + # 账号的持仓处理 => account_pos + + compare_pos = dict() # vt_symbol: {'账号多单': xx, '账号空单':xxx, '策略空单':[], '策略多单':[]} + + for holding_key in list(self.offset_converter.holdings.keys()): + # gateway_name.symbol.exchange => symbol.exchange + vt_symbol = '.'.join(holding_key.split('.')[-2:]) + + vt_symbols.add(vt_symbol) + holding = self.offset_converter.holdings.get(holding_key, None) + if holding is None: + continue + + compare_pos[vt_symbol] = OrderedDict( + { + "账号空单": holding.short_pos, + '账号多单': holding.long_pos, + '策略空单': 0, + '策略多单': 0, + '空单策略': [], + '多单策略': [] + } + ) + + # 逐一根据策略仓位,与Account_pos进行处理比对 + for strategy_pos in strategy_pos_list: + for pos in strategy_pos.get('pos', []): + vt_symbol = pos.get('vt_symbol') + if not vt_symbol: + continue + vt_symbols.add(vt_symbol) + symbol_pos = compare_pos.get(vt_symbol, None) + if symbol_pos is None: + self.write_log(u'账号持仓信息获取不到{},创建一个'.format(vt_symbol)) + symbol_pos = OrderedDict( + { + "账号空单": 0, + '账号多单': 0, + '策略空单': 0, + '策略多单': 0, + '空单策略': [], + '多单策略': [] + } + ) + + if pos.get('direction') == 'short': + symbol_pos.update({'策略空单': symbol_pos.get('策略空单', 0) + abs(pos.get('volume', 0))}) + symbol_pos['空单策略'].append( + u'{}({})'.format(strategy_pos['strategy_name'], abs(pos.get('volume', 0)))) + self.write_log(u'更新{}策略持空仓=>{}'.format(vt_symbol, symbol_pos.get('策略空单', 0))) + if pos.get('direction') == 'long': + symbol_pos.update({'策略多单': symbol_pos.get('策略多单', 0) + abs(pos.get('volume', 0))}) + symbol_pos['多单策略'].append( + u'{}({})'.format(strategy_pos['strategy_name'], abs(pos.get('volume', 0)))) + self.write_log(u'更新{}策略持多仓=>{}'.format(vt_symbol, symbol_pos.get('策略多单', 0))) + + pos_compare_result = '' + # 精简输出 + compare_info = '' + for vt_symbol in sorted(vt_symbols): + # 发送不一致得结果 + symbol_pos = compare_pos.pop(vt_symbol) + d_long = { + 'account_id': self.engine_config.get('account_id', '-'), + 'vt_symbol': vt_symbol, + 'direction': Direction.LONG.value, + 'strategy_list': symbol_pos.get('多单策略', [])} + + d_short = { + 'account_id': self.engine_config.get('account_id', '-'), + 'vt_symbol': vt_symbol, + 'direction': Direction.SHORT.value, + 'strategy_list': symbol_pos.get('多单策略', [])} + + # 多空都一致 + if round(symbol_pos['账号空单'], 7) == round(symbol_pos['策略空单'], 7) and \ + round(symbol_pos['账号多单'], 7) == round(symbol_pos['策略多单'], 7): + msg = u'{}多空都一致.{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False)) + self.write_log(msg) + compare_info += msg + else: + pos_compare_result += '\n{}: '.format(vt_symbol) + # 多单不一致 + if round(symbol_pos['策略多单'], 7) != round(symbol_pos['账号多单'], 7): + msg = '{}多单[账号({}), 策略{},共({})], ' \ + .format(vt_symbol, + symbol_pos['账号多单'], + symbol_pos['多单策略'], + symbol_pos['策略多单']) + + pos_compare_result += msg + self.write_error(u'{}不一致:{}'.format(vt_symbol, msg)) + compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg) + # 空单不一致 + if round(symbol_pos['策略空单'], 7) != round(symbol_pos['账号空单'], 7): + msg = '{}空单[账号({}), 策略{},共({})], ' \ + .format(vt_symbol, + symbol_pos['账号空单'], + symbol_pos['空单策略'], + symbol_pos['策略空单']) + pos_compare_result += msg + self.write_error(u'{}不一致:{}'.format(vt_symbol, msg)) + compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg) + + # 不匹配,输入到stdErr通道 + if pos_compare_result != '': + msg = u'账户{}持仓不匹配: {}' \ + .format(self.engine_config.get('account_id', '-'), + pos_compare_result) + try: + from vnpy.trader.util_wechat import send_wx_msg + send_wx_msg(content=msg) + except Exception as ex: + pass + ret_msg = u'持仓不匹配: {}' \ + .format(pos_compare_result) + self.write_error(ret_msg) + return True, compare_info + ret_msg + else: + self.write_log(u'账户持仓与策略一致') + return True, compare_info + def init_all_strategies(self): """ """ @@ -1516,13 +1689,16 @@ class CtaEngine(BaseEngine): strategy_logger = self.strategy_loggers.get(strategy_name, None) if not strategy_logger: log_path = get_folder_path('log') - log_filename = os.path.abspath(os.path.join(log_path, str(strategy_name))) + log_filename = str(log_path.joinpath(str(strategy_name))) print(u'create logger:{}'.format(log_filename)) self.strategy_loggers[strategy_name] = setup_logger(file_name=log_filename, name=str(strategy_name)) strategy_logger = self.strategy_loggers.get(strategy_name) if strategy_logger: strategy_logger.log(level, msg) + else: + if self.logger: + self.logger.log(level, msg) # 如果日志数据异常,错误和告警,输出至sys.stderr if level in [logging.CRITICAL, logging.ERROR, logging.WARNING]: diff --git a/vnpy/app/cta_strategy_pro/portfolio_testing.py b/vnpy/app/cta_strategy_pro/portfolio_testing.py index b8491d05..ce031e75 100644 --- a/vnpy/app/cta_strategy_pro/portfolio_testing.py +++ b/vnpy/app/cta_strategy_pro/portfolio_testing.py @@ -248,13 +248,13 @@ class PortfolioTestingEngine(BackTestingEngine): bar.high_price = float(bar_data['high']) bar.low_price = float(bar_data['low']) bar.volume = int(bar_data['volume']) - bar.date = dt.strftime('%Y-%m-%d') - bar.time = dt.strftime('%H:%M:%S') + bar.date = bar_datetime.strftime('%Y-%m-%d') + bar.time = bar_datetime.strftime('%H:%M:%S') str_td = str(bar_data.get('trading_day', '')) if len(str_td) == 8: bar.trading_day = str_td[0:4] + '-' + str_td[4:6] + '-' + str_td[6:8] else: - bar.trading_day = get_trading_date(dt) + bar.trading_day = get_trading_date(bar_datetime) if last_trading_day != bar.trading_day: self.output(u'回测数据日期:{},资金:{}'.format(bar.trading_day, self.net_capital)) diff --git a/vnpy/component/cta_grid_trade.py b/vnpy/component/cta_grid_trade.py index 24a861ff..58afe8b6 100644 --- a/vnpy/component/cta_grid_trade.py +++ b/vnpy/component/cta_grid_trade.py @@ -185,7 +185,7 @@ class CtaGridTrade(CtaComponent): self.min_dn_open_price = 0.0 # 下网格最小开仓价 # 网格json文件的路径 - self.json_file_path = os.path.join(get_folder_path('data'), f'{self.json_name}_Grids.json') + self.json_file_path = str(get_folder_path('data').joinpath(f'{self.json_name}_Grids.json')) def get_volume_rate(self, idx: int = 0): """获取网格索引对应的开仓数量比例""" @@ -887,7 +887,7 @@ class CtaGridTrade(CtaComponent): self.json_name = self.strategy.strategy_name # 新版网格持久化文件 - grid_json_file = os.path.join(grids_save_path, u'{}_Grids.json'.format(self.json_name)) + grid_json_file = str(grids_save_path.joinpath(u'{}_Grids.json'.format(self.json_name))) self.json_file_path = grid_json_file data = {} @@ -921,7 +921,7 @@ class CtaGridTrade(CtaComponent): self.json_name = self.strategy.strategy_name # 若json文件不存在,就保存一个;若存在,就优先使用数据文件 - grid_json_file = os.path.join(grids_save_path, u'{}_Grids.json'.format(self.json_name)) + grid_json_file = str(grids_save_path.joinpath(u'{}_Grids.json'.format(self.json_name))) if not os.path.exists(grid_json_file): data['up_grids'] = [] data['dn_grids'] = [] @@ -981,7 +981,7 @@ class CtaGridTrade(CtaComponent): self.json_name = new_name # 旧文件 - old_json_file = os.path.join(data_folder, u'{0}_Grids.json'.format(old_name)) + old_json_file = str(data_folder.joinpath(u'{0}_Grids.json'.format(old_name))) if os.path.isfile(old_json_file): # 新文件若存在,移除 try: diff --git a/vnpy/component/cta_policy.py b/vnpy/component/cta_policy.py index 18dd7a4a..91c9c3a5 100644 --- a/vnpy/component/cta_policy.py +++ b/vnpy/component/cta_policy.py @@ -72,8 +72,7 @@ class CtaPolicy(CtaComponent): 从持久化文件中获取 :return: """ - json_file = os.path.abspath( - os.path.join(get_folder_path('data'), u'{}_Policy.json'.format(self.strategy.strategy_name))) + json_file = str(get_folder_path('data').joinpath(u'{}_Policy.json'.format(self.strategy.strategy_name))) json_data = {} if os.path.exists(json_file): @@ -93,8 +92,7 @@ class CtaPolicy(CtaComponent): 保存至持久化文件 :return: """ - json_file = os.path.abspath( - os.path.join(get_folder_path('data'), u'{}_Policy.json'.format(self.strategy.strategy_name))) + json_file = str(get_folder_path('data').joinpath(u'{}_Policy.json'.format(self.strategy.strategy_name))) try: # 修改为:回测时不保存 diff --git a/vnpy/gateway/binancef/binancef_gateway.py b/vnpy/gateway/binancef/binancef_gateway.py index 41e88c90..a0e1e497 100644 --- a/vnpy/gateway/binancef/binancef_gateway.py +++ b/vnpy/gateway/binancef/binancef_gateway.py @@ -110,6 +110,7 @@ class BinancefGateway(BaseGateway): def __init__(self, event_engine: EventEngine, gateway_name="BINANCEF"): """Constructor""" super().__init__(event_engine, gateway_name) + self.count = 0 self.trade_ws_api = BinancefTradeWebsocketApi(self) self.market_ws_api = BinancefDataWebsocketApi(self) @@ -168,9 +169,19 @@ class BinancefGateway(BaseGateway): and self.status.get('mdws_con', False): self.status.update({'con': True}) + self.count += 1 + if self.count < 2: + return + self.count = 0 + + func = self.query_functions.pop(0) + func() + self.query_functions.append(func) + def get_order(self, orderid: str): return self.rest_api.get_order(orderid) + class BinancefRestApi(RestClient): """ BINANCE REST API @@ -201,6 +212,7 @@ class BinancefRestApi(RestClient): self.orders = {} + def sign(self, request: Request) -> Request: """ Generate BINANCE signature. @@ -280,7 +292,8 @@ class BinancefRestApi(RestClient): self.gateway.write_log("REST API启动成功") self.gateway.status.update({'td_con': True, 'td_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) - + if self.gateway.status.get('md_con', False): + self.gateway.status.update({'con': True}) self.query_time() self.query_account() self.query_position() @@ -290,6 +303,10 @@ class BinancefRestApi(RestClient): self.start_user_stream() + # 添加到定时查询队列中 + self.gateway.query_functions = [self.query_account, self.query_position] + + def query_time(self) -> Request: """""" data = { @@ -516,7 +533,7 @@ class BinancefRestApi(RestClient): short_position = PositionData( symbol=d["symbol"], exchange=Exchange.BINANCE, - direction=Direction.LONG, + direction=Direction.SHORT, volume=0, price=0, pnl=0, @@ -805,6 +822,8 @@ class BinancefTradeWebsocketApi(WebsocketClient): """""" self.gateway.write_log("交易Websocket API连接成功") self.gateway.status.update({'tdws_con': True, 'tdws_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) + if self.gateway.status.get('td_con', False): + self.gateway.status.update({'con': True}) def on_packet(self, packet: dict) -> None: # type: (dict)->None """""" @@ -923,6 +942,8 @@ class BinancefDataWebsocketApi(WebsocketClient): """""" self.gateway.write_log("行情Websocket API连接刷新") self.gateway.status.update({'md_con': True, 'md_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) + if self.gateway.status.get('td_con', False): + self.gateway.status.update({'con': True}) def subscribe(self, req: SubscribeRequest) -> None: """""" diff --git a/vnpy/trader/engine.py b/vnpy/trader/engine.py index 843de526..29e99b25 100644 --- a/vnpy/trader/engine.py +++ b/vnpy/trader/engine.py @@ -295,6 +295,7 @@ class BaseEngine(ABC): self.engine_name = engine_name self.logger = None + self.create_logger(engine_name) def create_logger(self, logger_name: str = 'base_engine'): """ @@ -303,7 +304,7 @@ class BaseEngine(ABC): :return: """ log_path = get_folder_path("log") - log_filename = os.path.abspath(os.path.join(log_path, logger_name)) + log_filename = str(log_path.joinpath(logger_name)) print(u'create logger:{}'.format(log_filename)) self.logger = setup_logger(file_name=log_filename, name=logger_name, log_level=SETTINGS.get('log.level', logging.DEBUG)) diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index 62bb7b9b..70675dd3 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -97,13 +97,15 @@ class BaseGateway(ABC): self.klines = {} self.status = {'name': gateway_name, 'con': False} + self.query_functions = [] + def create_logger(self): """ 创建engine独有的日志 :return: """ log_path = get_folder_path("log") - log_filename = os.path.abspath(os.path.join(log_path, self.gateway_name)) + log_filename = str(log_path.joinpath(self.gateway_name)) print(u'create logger:{}'.format(log_filename)) from vnpy.trader.setting import SETTINGS self.logger = setup_logger(file_name=log_filename, name=self.gateway_name, diff --git a/vnpy/trader/ui/kline/kline.py b/vnpy/trader/ui/kline/kline.py index 26e1d54b..92e68030 100644 --- a/vnpy/trader/ui/kline/kline.py +++ b/vnpy/trader/ui/kline/kline.py @@ -1417,9 +1417,11 @@ class KLineWidget(KeyWraper): class GridKline(QtWidgets.QWidget): """多kline""" - def __init__(self, parent=None, kline_settings={}): + def __init__(self, parent=None, kline_settings={}, title=''): self.parent = parent super(GridKline, self).__init__(parent) + if title: + self.setWindowTitle(title) self.kline_settings = kline_settings self.kline_names = list(self.kline_settings.keys()) @@ -1479,11 +1481,14 @@ class GridKline(QtWidgets.QWidget): continue # 加载K线 - data_file = kline_setting.get('data_file', None) - if not data_file: - continue - df = pd.read_csv(data_file) - df = df.set_index(pd.DatetimeIndex(df['datetime'])) + if 'data_frame' in kline_setting: + df = kline_setting['data_frame'] + else: + data_file = kline_setting.get('data_file', None) + if not data_file: + continue + df = pd.read_csv(data_file) + df = df.set_index(pd.DatetimeIndex(df['datetime'])) canvas.loadData(df, main_indicators=kline_setting.get('main_indicators', []), sub_indicators=kline_setting.get('sub_indicators', []) diff --git a/vnpy/trader/ui/kline/ui_snapshot.py b/vnpy/trader/ui/kline/ui_snapshot.py new file mode 100644 index 00000000..40cdb9b2 --- /dev/null +++ b/vnpy/trader/ui/kline/ui_snapshot.py @@ -0,0 +1,72 @@ +# flake8: noqa +""" +多周期显示K线切片, +华富资产 +""" + +import sys +import os +import ctypes +import bz2 +import pickle +import zlib +import pandas as pd + +from vnpy.trader.ui.kline.crosshair import Crosshair +from vnpy.trader.ui.kline.kline import * + +class UiSnapshot(object): + """查看切片""" + def __init__(self): + + pass + + def show(self, snapshot_file: str): + + if not os.path.exists(snapshot_file): + print(f'{snapshot_file}不存在', file=sys.stderr) + return + + d = None + with bz2.BZ2File(snapshot_file, 'rb') as f: + d = pickle.load(f) + + use_zlib = d.get('zlib', False) + klines = d.pop('klines', None) + + # 如果使用压缩,则解压 + if use_zlib and klines: + print('use zlib decompress klines') + klines = pickle.loads(zlib.decompress(klines)) + + kline_settings = {} + for k, v in klines.items(): + # 获取bar各种数据/指标列表 + data_list = v.pop('data_list', None) + if data_list is None: + continue + + # 主图指标 / 附图指标清单 + main_indicators = v.get('main_indicators', []) + sub_indicators = v.get('sub_indicators', []) + + df = pd.DataFrame(data_list) + df = df.set_index(pd.DatetimeIndex(df['datetime'])) + + kline_settings.update( + { + k: + { + "data_frame": df, + "main_indicators": [x.get('name') for x in main_indicators], + "sub_indicators": [x.get('name') for x in sub_indicators] + } + } + ) + # K线界面 + try: + w = GridKline(kline_settings=kline_settings, title=d.get('strategy','')) + w.showMaximized() + + except Exception as ex: + print(u'exception:{},trace:{}'.format(str(ex), traceback.format_exc())) diff --git a/vnpy/trader/util_monitor.py b/vnpy/trader/util_monitor.py new file mode 100644 index 00000000..40b1150c --- /dev/null +++ b/vnpy/trader/util_monitor.py @@ -0,0 +1,186 @@ +# encoding: UTF-8 +# 华富资产 + +import os +from collections import OrderedDict +from typing import Any, Dict + +from .utility import get_folder_path +from .util_logger import setup_logger +from .event import ( + EVENT_TRADE, + EVENT_ORDER, + EVENT_POSITION, + EVENT_ACCOUNT, + EVENT_LOG +) + + +######################################################################## +class BasicMonitor(object): + """ + 基础监控 + + headers中的值对应的字典格式如下 + {'display': u'中文名', 'cell': ""} + + """ + event_type: str = "" + data_key: str = "" + headers: Dict[str, dict] = {} + + # ---------------------------------------------------------------------- + def __init__(self, event_engine=None, monitor_name='BasicMonitor'): + self.event_engine = event_engine + + self.logger = None + + self.create_logger(monitor_name) + self.register_event() + + # ---------------------------------------------------------------------- + def register_event(self): + if self.event_type: + self.event_engine.register(self.event_type, self.update_event) + + # ---------------------------------------------------------------------- + def update_event(self, event): + """收到事件更新""" + data = event.data + self.update_data(data) + + # ---------------------------------------------------------------------- + def update_data(self, data): + """将数据更新到表格中""" + s = [] + for header, value in self.headers.items(): + v = getattr(data, header) + s.append('%s: %s' % (value['display'], str(v))) + if self.logger is not None: + self.logger.info(' '.join(s)) + + def create_logger(self, monitor_name): + """创建日志写入""" + filename = str(get_folder_path('log').joinpath(monitor_name)) + print(u'create logger:{}'.format(filename)) + self.logger = setup_logger(file_name=filename, name=monitor_name) + + +class LogMonitor(BasicMonitor): + """ + Monitor for log data. + """ + + event_type = EVENT_LOG + data_key = "" + + headers = { + "time": {"display": "时间", "update": False}, + "msg": {"display": "信息", "update": False}, + "gateway_name": {"display": "接口", "update": False}, + } + + def __init__(self, event_engine=None, monitor_name='LogMonitor'): + super().__init__(event_engine, monitor_name) + +class TradeMonitor(BasicMonitor): + """ + Monitor for trade data. + """ + + event_type = EVENT_TRADE + data_key = "" + sorting = True + + headers: Dict[str, dict] = { + "tradeid": {"display": "成交号 ", "update": False}, + "orderid": {"display": "委托号", "update": False}, + "symbol": {"display": "代码", "update": False}, + "exchange": {"display": "交易所", "update": False}, + "direction": {"display": "方向", "update": False}, + "offset": {"display": "开平", "update": False}, + "price": {"display": "价格", "update": False}, + "volume": {"display": "数量", "update": False}, + "time": {"display": "时间", "update": False}, + "gateway_name": {"display": "接口", "update": False}, + } + + def __init__(self, event_engine=None, monitor_name='TradeMonitor'): + super().__init__(event_engine, monitor_name) + +class OrderMonitor(BasicMonitor): + """ + Monitor for order data. + """ + + event_type = EVENT_ORDER + data_key = "vt_orderid" + sorting = True + + headers: Dict[str, dict] = { + "orderid": {"display": "委托号", "update": False}, + "symbol": {"display": "代码", "update": False}, + "exchange": {"display": "交易所", "update": False}, + "type": {"display": "类型", "update": False}, + "direction": {"display": "方向", "update": False}, + "offset": {"display": "开平", "update": False}, + "price": {"display": "价格", "update": False}, + "volume": {"display": "总数量", "update": True}, + "traded": {"display": "已成交", "update": True}, + "status": {"display": "状态", "update": True}, + "time": {"display": "时间", "update": True}, + "gateway_name": {"display": "接口", "update": False}, + } + + def __init__(self, event_engine=None, monitor_name='OrderMonitor'): + super().__init__(event_engine, monitor_name) + +class PositionMonitor(BasicMonitor): + """ + Monitor for position data. + """ + + event_type = EVENT_POSITION + data_key = "vt_positionid" + sorting = True + + headers = { + "symbol": {"display": "代码", "update": False}, + "exchange": {"display": "交易所", "update": False}, + "direction": {"display": "方向", "update": False}, + "volume": {"display": "数量", "update": True}, + "yd_volume": {"display": "昨仓", "update": True}, + "frozen": {"display": "冻结", "update": True}, + "price": {"display": "均价", "update": True}, + "pnl": {"display": "盈亏", "update": True}, + "gateway_name": {"display": "接口", "update": False}, + } + + def __init__(self, event_engine=None, monitor_name='PositionMonitor'): + super().__init__(event_engine, monitor_name) + + +class AccountMonitor(BasicMonitor): + """ + Monitor for account data. + """ + + event_type = EVENT_ACCOUNT + data_key = "vt_accountid" + sorting = True + + headers = { + "accountid": {"display": "账号", "update": False}, + "pre_balance": {"display": "昨净值", "update": False}, + "balance": {"display": "净值", "update": True}, + "frozen": {"display": "冻结", "update": True}, + "margin": {"display": "保证金", "update": True}, + "available": {"display": "可用", "update": True}, + "commission": {"display": "手续费", "update": True}, + "close_profit": {"display": "平仓收益", "update": True}, + "holding_profit": {"display": "持仓收益", "update": True}, + "gateway_name": {"display": "接口", "update": False}, + } + + def __init__(self, event_engine=None, monitor_name='AccountMonitor'): + super().__init__(event_engine, monitor_name) diff --git a/vnpy/trader/util_pid.py b/vnpy/trader/util_pid.py index 1ee619bc..dee0a54c 100644 --- a/vnpy/trader/util_pid.py +++ b/vnpy/trader/util_pid.py @@ -8,7 +8,7 @@ import os # 一般使用与运行服务,且唯一进程 # 日志文件路径 -logs_path = os.path.abspath(os.path.join(os.getcwd(), 'logs')) +logs_path = os.path.abspath(os.path.join(os.getcwd(), 'log')) if not os.path.isdir(logs_path): os.mkdir(logs_path) assert os.path.isdir(logs_path) @@ -16,7 +16,6 @@ assert os.path.isdir(logs_path) # 记录pid得文件 pid_file = os.path.abspath(os.path.join(logs_path, 'pid.txt')) - def _check_pid(pid): """ 检查pid是否与当前进程pid一致 diff --git a/vnpy/trader/utility.py b/vnpy/trader/utility.py index 4933b873..45cbc0c5 100644 --- a/vnpy/trader/utility.py +++ b/vnpy/trader/utility.py @@ -233,7 +233,8 @@ def get_folder_path(folder_name: str) -> Path: """ folder_path = TEMP_DIR.joinpath(folder_name) if not folder_path.exists(): - folder_path.mkdir() + os.makedirs(str(folder_path)) + #folder_path.mkdir() return folder_path