[增强] gateway增加状态,增加noui下得日志监控,切片数据重放

This commit is contained in:
msincenselee 2020-03-22 22:14:56 +08:00
parent d653da2097
commit 152fc1d83b
15 changed files with 720 additions and 90 deletions

View File

@ -382,7 +382,7 @@ class AlgoEngine(BaseEngine):
algo_logger = self.algo_loggers.get(algo_name, None)
if not algo_logger:
log_path = get_folder_path('log')
log_filename = os.path.abspath(os.path.join(log_path, str(algo_name)))
log_filename = str(log_path.joinpath(str(algo_name)))
print(u'create logger:{}'.format(log_filename))
self.algo_loggers[algo_name] = setup_logger(
file_name=log_filename,

View File

@ -1,11 +1,16 @@
"""
数字货币CTA策略运行引擎
华富资产
"""
import importlib
import os
import sys
import traceback
import json
import pickle
import bz2
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable
@ -168,6 +173,7 @@ class CtaEngine(BaseEngine):
"""
self.main_engine.get_strategy_status = self.get_strategy_status
self.main_engine.get_strategy_pos = self.get_strategy_pos
self.main_engine.compare_pos = self.compare_pos
self.main_engine.add_strategy = self.add_strategy
self.main_engine.init_strategy = self.init_strategy
self.main_engine.start_strategy = self.start_strategy
@ -179,16 +185,17 @@ class CtaEngine(BaseEngine):
# 注册到远程服务调用
if self.main_engine.rpc_service:
self.main_engine.rpc_service.register(self.main_engine.get_strategy_status)
self.main_engine.rpc_service.register(self.main_engine.get_strategy_pos)
self.main_engine.rpc_service.register(self.main_engine.add_strategy)
self.main_engine.rpc_service.register(self.main_engine.init_strategy)
self.main_engine.rpc_service.register(self.main_engine.start_strategy)
self.main_engine.rpc_service.register(self.main_engine.stop_strategy)
self.main_engine.rpc_service.register(self.main_engine.remove_strategy)
self.main_engine.rpc_service.register(self.main_engine.reload_strategy)
self.main_engine.rpc_service.register(self.main_engine.save_strategy_data)
self.main_engine.rpc_service.register(self.main_engine.save_strategy_snapshot)
self.main_engine.rpc_service.register(self.main_engine.get_strategy_status)
self.main_engine.rpc_service.register(self.main_engine.get_strategy_pos)
self.main_engine.rpc_service.register(self.main_engine.compare_pos)
self.main_engine.rpc_service.register(self.main_engine.add_strategy)
self.main_engine.rpc_service.register(self.main_engine.init_strategy)
self.main_engine.rpc_service.register(self.main_engine.start_strategy)
self.main_engine.rpc_service.register(self.main_engine.stop_strategy)
self.main_engine.rpc_service.register(self.main_engine.remove_strategy)
self.main_engine.rpc_service.register(self.main_engine.reload_strategy)
self.main_engine.rpc_service.register(self.main_engine.save_strategy_data)
self.main_engine.rpc_service.register(self.main_engine.save_strategy_snapshot)
def process_timer_event(self, event: Event):
""" 处理定时器事件"""
@ -316,8 +323,7 @@ class CtaEngine(BaseEngine):
trade_dict.update({'idx_price': trade_dict.get('price')})
if strategy_name is not None:
trade_file = os.path.abspath(
os.path.join(get_folder_path('data'), '{}_trade.csv'.format(strategy_name)))
trade_file = str(get_folder_path('data').joinpath('{}_trade.csv'.format(strategy_name)))
append_data(file_name=trade_file, dict_data=trade_dict)
except Exception as ex:
self.write_error(u'写入交易记录csv出错{},{}'.format(str(ex), traceback.format_exc()))
@ -338,6 +344,7 @@ class CtaEngine(BaseEngine):
holding = self.get_position_holding(position.vt_symbol, position.gateway_name)
holding.update_position(position)
def check_unsubscribed_symbols(self):
"""检查未订阅合约"""
@ -941,15 +948,17 @@ class CtaEngine(BaseEngine):
Add a new strategy.
"""
if strategy_name in self.strategies:
self.write_log(msg=f"创建策略失败,存在重名{strategy_name}",
msg = f"创建策略失败,存在重名{strategy_name}"
self.write_log(msg=msg,
level=logging.CRITICAL)
return
return False, msg
strategy_class = self.classes.get(class_name, None)
if not strategy_class:
self.write_log(msg=f"创建策略失败,找不到策略类{class_name}",
msg = f"创建策略失败,找不到策略类{class_name}"
self.write_log(msg=msg,
level=logging.CRITICAL)
return
return False, msg
self.write_log(f'开始添加策略类{class_name},实例名:{strategy_name}')
strategy = strategy_class(self, strategy_name, vt_symbol, setting)
@ -971,6 +980,8 @@ class CtaEngine(BaseEngine):
if auto_init:
self.init_strategy(strategy_name, auto_start=auto_start)
return True, f'成功添加{strategy_name}'
def init_strategy(self, strategy_name: str, auto_start: bool = False):
"""
Init a strategy.
@ -1020,19 +1031,21 @@ class CtaEngine(BaseEngine):
"""
strategy = self.strategies[strategy_name]
if not strategy.inited:
self.write_error(f"策略{strategy.strategy_name}启动失败,请先初始化")
return False
msg = f"策略{strategy.strategy_name}启动失败,请先初始化"
self.write_error(msg)
return False, msg
if strategy.trading:
self.write_error(f"{strategy_name}已经启动,请勿重复操作")
return False
msg = f"{strategy_name}已经启动,请勿重复操作"
self.write_error(msg)
return False, msg
self.call_strategy_func(strategy, strategy.on_start)
strategy.trading = True
self.put_strategy_event(strategy)
return True
return True, f'成功启动策略{strategy_name}'
def stop_strategy(self, strategy_name: str):
"""
@ -1040,8 +1053,9 @@ class CtaEngine(BaseEngine):
"""
strategy = self.strategies[strategy_name]
if not strategy.trading:
self.write_log(f'{strategy_name}策略实例已处于停止交易状态')
return False
msg = f'{strategy_name}策略实例已处于停止交易状态'
self.write_log(msg)
return False, msg
# Call on_stop function of the strategy
self.write_log(f'调用{strategy_name}的on_stop,停止交易')
@ -1060,8 +1074,7 @@ class CtaEngine(BaseEngine):
# Update GUI
self.put_strategy_event(strategy)
return True
return True, f'成功停止策略{strategy_name}'
def edit_strategy(self, strategy_name: str, setting: dict):
"""
@ -1083,8 +1096,9 @@ class CtaEngine(BaseEngine):
"""
strategy = self.strategies[strategy_name]
if strategy.trading:
self.write_error(f"策略{strategy.strategy_name}移除失败,请先停止")
return False
err_msg = f"策略{strategy.strategy_name}移除失败,请先停止"
self.write_error(err_msg)
return False, err_msg
# Remove setting
self.remove_strategy_setting(strategy_name)
@ -1109,7 +1123,7 @@ class CtaEngine(BaseEngine):
self.write_log(f'移除{strategy_name}策略实例')
self.strategies.pop(strategy_name)
return True
return True, f'成功移除{strategy_name}策略实例'
def reload_strategy(self, strategy_name: str, vt_symbol: str = '', setting: dict = {}):
"""
@ -1123,8 +1137,9 @@ class CtaEngine(BaseEngine):
# 优先判断重启的策略,是否已经加载
if strategy_name not in self.strategies or strategy_name not in self.strategy_setting:
self.write_error(f"{strategy_name}不在运行策略中,不能重启")
return False
err_msg = f"{strategy_name}不在运行策略中,不能重启"
self.write_error(err_msg)
return False, err_msg
# 从本地配置文件中读取
if len(setting) == 0:
@ -1142,7 +1157,9 @@ class CtaEngine(BaseEngine):
module_name = self.class_module_map[class_name]
# 重新load class module
if not self.load_strategy_class_from_module(module_name):
return False
err_msg = f'不能加载模块:{module_name}'
self.write_error(err_msg)
return False, err_msg
# 停止当前策略实例的运行,撤单
self.stop_strategy(strategy_name)
@ -1158,8 +1175,9 @@ class CtaEngine(BaseEngine):
auto_init=old_strategy_config.get('auto_init', False),
auto_start=old_strategy_config.get('auto_start', False))
self.write_log(f'重新运行策略{strategy_name}执行完毕')
return True
msg = f'成功重载策略{strategy_name}'
self.write_log(msg)
return True, msg
def save_strategy_data(self, select_name: str):
""" save strategy data"""
@ -1241,6 +1259,11 @@ class CtaEngine(BaseEngine):
return
# 剩下工作:保存本地文件/数据库
snapshot_folder = get_folder_path(f'data/snapshots/{strategy_name}')
snapshot_file = snapshot_folder.joinpath('{}.pkb2'.format(datetime.now().strftime('%Y%m%d_%H%M%S')))
with bz2.BZ2File(str(snapshot_file), 'wb') as f:
pickle.dump(snapshot, f)
self.write_log(u'切片保存成功:{}'.format(str(snapshot_file)))
except Exception as ex:
self.write_error(u'获取策略{}切片数据异常:'.format(strategy_name, str(ex)))
@ -1417,6 +1440,9 @@ class CtaEngine(BaseEngine):
d['date'] = dt.strftime('%Y%m%d')
d['hour'] = dt.hour
d['datetime'] = datetime.now()
strategy = self.strategies.get(strategy_name)
d['inited'] = strategy.inited
d['trading'] = strategy.trading
try:
d['pos'] = self.get_strategy_pos(name=strategy_name)
except Exception as ex:
@ -1451,6 +1477,146 @@ class CtaEngine(BaseEngine):
d.update(strategy.get_parameters())
return d
def compare_pos(self):
"""
对比账号&策略的持仓,不同的话则发出微信提醒
:return:
"""
# 当前没有接入网关
if len(self.main_engine.gateways) == 0:
return False, u'当前没有接入网关'
self.write_log(u'开始对比账号&策略的持仓')
# 获取当前策略得持仓
strategy_pos_list = self.get_all_strategy_pos()
self.write_log(u'策略持仓清单:{}'.format(strategy_pos_list))
# 需要进行对比得合约集合(来自策略持仓/账号持仓)
vt_symbols = set()
# 账号的持仓处理 => account_pos
compare_pos = dict() # vt_symbol: {'账号多单': xx, '账号空单':xxx, '策略空单':[], '策略多单':[]}
for holding_key in list(self.holdings.keys()):
# gateway_name.symbol.exchange => symbol.exchange
vt_symbol = '.'.join(holding_key.split('.')[-2:])
vt_symbols.add(vt_symbol)
holding = self.holdings.get(holding_key, None)
if holding is None:
continue
compare_pos[vt_symbol] = OrderedDict(
{
"账号空单": holding.short_pos,
'账号多单': holding.long_pos,
'策略空单': 0,
'策略多单': 0,
'空单策略': [],
'多单策略': []
}
)
# 逐一根据策略仓位与Account_pos进行处理比对
for strategy_pos in strategy_pos_list:
for pos in strategy_pos.get('pos', []):
vt_symbol = pos.get('vt_symbol')
if not vt_symbol:
continue
vt_symbols.add(vt_symbol)
symbol_pos = compare_pos.get(vt_symbol, None)
if symbol_pos is None:
self.write_log(u'账号持仓信息获取不到{},创建一个'.format(vt_symbol))
symbol_pos = OrderedDict(
{
"账号空单": 0,
'账号多单': 0,
'策略空单': 0,
'策略多单': 0,
'空单策略': [],
'多单策略': []
}
)
if pos.get('direction') == 'short':
symbol_pos.update({'策略空单': symbol_pos.get('策略空单', 0) + abs(pos.get('volume', 0))})
symbol_pos['空单策略'].append(
u'{}({})'.format(strategy_pos['strategy_name'], abs(pos.get('volume', 0))))
self.write_log(u'更新{}策略持空仓=>{}'.format(vt_symbol, symbol_pos.get('策略空单', 0)))
if pos.get('direction') == 'long':
symbol_pos.update({'策略多单': symbol_pos.get('策略多单', 0) + abs(pos.get('volume', 0))})
symbol_pos['多单策略'].append(
u'{}({})'.format(strategy_pos['strategy_name'], abs(pos.get('volume', 0))))
self.write_log(u'更新{}策略持多仓=>{}'.format(vt_symbol, symbol_pos.get('策略多单', 0)))
pos_compare_result = ''
# 精简输出
compare_info = ''
for vt_symbol in sorted(vt_symbols):
# 发送不一致得结果
symbol_pos = compare_pos.pop(vt_symbol)
d_long = {
'account_id': self.engine_config.get('account_id', '-'),
'vt_symbol': vt_symbol,
'direction': Direction.LONG.value,
'strategy_list': symbol_pos.get('多单策略', [])}
d_short = {
'account_id': self.engine_config.get('account_id', '-'),
'vt_symbol': vt_symbol,
'direction': Direction.SHORT.value,
'strategy_list': symbol_pos.get('多单策略', [])}
# 多空都一致
if round(symbol_pos['账号空单'], 7) == round(symbol_pos['策略空单'], 7) and \
round(symbol_pos['账号多单'], 7) == round(symbol_pos['策略多单'], 7):
msg = u'{}多空都一致.{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False))
self.write_log(msg)
compare_info += msg
else:
pos_compare_result += '\n{}: '.format(vt_symbol)
# 多单不一致
if round(symbol_pos['策略多单'], 7) != round(symbol_pos['账号多单'], 7):
msg = '{}多单[账号({}), 策略{},共({})], ' \
.format(vt_symbol,
symbol_pos['账号多单'],
symbol_pos['多单策略'],
symbol_pos['策略多单'])
pos_compare_result += msg
self.write_error(u'{}不一致:{}'.format(vt_symbol, msg))
compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg)
# 空单不一致
if round(symbol_pos['策略空单'], 7) != round(symbol_pos['账号空单'], 7):
msg = '{}空单[账号({}), 策略{},共({})], ' \
.format(vt_symbol,
symbol_pos['账号空单'],
symbol_pos['空单策略'],
symbol_pos['策略空单'])
pos_compare_result += msg
self.write_error(u'{}不一致:{}'.format(vt_symbol, msg))
compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg)
# 不匹配输入到stdErr通道
if pos_compare_result != '':
msg = u'账户{}持仓不匹配: {}' \
.format(self.engine_config.get('account_id', '-'),
pos_compare_result)
try:
from vnpy.trader.util_wechat import send_wx_msg
send_wx_msg(content=msg)
except Exception as ex:
pass
ret_msg = u'持仓不匹配: {}' \
.format(pos_compare_result)
self.write_error(ret_msg)
return True, compare_info + ret_msg
else:
self.write_log(u'账户持仓与策略一致')
return True, compare_info
def init_all_strategies(self):
"""
"""
@ -1559,13 +1725,16 @@ class CtaEngine(BaseEngine):
strategy_logger = self.strategy_loggers.get(strategy_name, None)
if not strategy_logger:
log_path = get_folder_path('log')
log_filename = os.path.abspath(os.path.join(log_path, str(strategy_name)))
log_filename = str(log_path.joinpath(str(strategy_name)))
print(u'create logger:{}'.format(log_filename))
self.strategy_loggers[strategy_name] = setup_logger(file_name=log_filename,
name=str(strategy_name))
strategy_logger = self.strategy_loggers.get(strategy_name)
if strategy_logger:
strategy_logger.log(level, msg)
else:
if self.logger:
self.logger.log(level, msg)
# 如果日志数据异常错误和告警输出至sys.stderr
if level in [logging.CRITICAL, logging.ERROR, logging.WARNING]:

View File

@ -662,7 +662,7 @@ class CtaFutureTemplate(CtaTemplate):
'price': g.open_price})
if self.cur_datetime and (datetime.now() - self.cur_datetime).total_seconds() < 10:
self.write_log(u'当前持仓:{}'.format(pos_list))
self.write_log(u'{}当前持仓:{}'.format(self.strategy_name, pos_list))
return pos_list
def on_trade(self, trade: TradeData):

View File

@ -1,13 +1,20 @@
""""""
"""
CTA策略运行引擎增强版
华富资产
"""
import importlib
import os
import sys
import traceback
import json
import pickle
import bz2
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable
from datetime import datetime
from datetime import datetime, timedelta
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from copy import copy
@ -94,7 +101,11 @@ class CtaEngine(BaseEngine):
engine_filename = "cta_strategy_pro_config.json"
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
"""
构造函数
:param main_engine: 主引擎
:param event_engine: 事件引擎
"""
super().__init__(main_engine, event_engine, APP_NAME)
self.engine_config = {}
@ -138,10 +149,12 @@ class CtaEngine(BaseEngine):
def init_engine(self):
"""
"""
self.load_strategy_class()
self.load_strategy_setting()
self.register_event()
self.register_funcs()
self.load_strategy_class()
self.load_strategy_setting()
self.write_log("CTA策略引擎初始化成功")
def close(self):
@ -164,6 +177,7 @@ class CtaEngine(BaseEngine):
"""
self.main_engine.get_strategy_status = self.get_strategy_status
self.main_engine.get_strategy_pos = self.get_strategy_pos
self.main_engine.compare_pos = self.compare_pos
self.main_engine.add_strategy = self.add_strategy
self.main_engine.init_strategy = self.init_strategy
self.main_engine.start_strategy = self.start_strategy
@ -177,6 +191,7 @@ class CtaEngine(BaseEngine):
if self.main_engine.rpc_service:
self.main_engine.rpc_service.register(self.main_engine.get_strategy_status)
self.main_engine.rpc_service.register(self.main_engine.get_strategy_pos)
self.main_engine.rpc_service.register(self.main_engine.compare_pos)
self.main_engine.rpc_service.register(self.main_engine.add_strategy)
self.main_engine.rpc_service.register(self.main_engine.init_strategy)
self.main_engine.rpc_service.register(self.main_engine.start_strategy)
@ -310,8 +325,7 @@ class CtaEngine(BaseEngine):
trade_dict.update({'idx_price': trade_dict.get('price')})
if strategy_name is not None:
trade_file = os.path.abspath(
os.path.join(get_folder_path('data'), '{}_trade.csv'.format(strategy_name)))
trade_file = str(get_folder_path('data').joinpath('{}_trade.csv'.format(strategy_name)))
append_data(file_name=trade_file, dict_data=trade_dict)
except Exception as ex:
self.write_error(u'写入交易记录csv出错{},{}'.format(str(ex), traceback.format_exc()))
@ -856,15 +870,17 @@ class CtaEngine(BaseEngine):
Add a new strategy.
"""
if strategy_name in self.strategies:
self.write_log(msg=f"创建策略失败,存在重名{strategy_name}",
msg = f"创建策略失败,存在重名{strategy_name}"
self.write_log(msg=msg,
level=logging.CRITICAL)
return
return False, msg
strategy_class = self.classes.get(class_name, None)
if not strategy_class:
self.write_log(msg=f"创建策略失败,找不到策略类{class_name}",
msg = f"创建策略失败,找不到策略类{class_name}"
self.write_log(msg=msg,
level=logging.CRITICAL)
return
return False, msg
self.write_log(f'开始添加策略类{class_name},实例名:{strategy_name}')
strategy = strategy_class(self, strategy_name, vt_symbol, setting)
@ -886,6 +902,8 @@ class CtaEngine(BaseEngine):
if auto_init:
self.init_strategy(strategy_name, auto_start=auto_start)
return True, f'成功添加{strategy_name}'
def init_strategy(self, strategy_name: str, auto_start: bool = False):
"""
Init a strategy.
@ -935,26 +953,31 @@ class CtaEngine(BaseEngine):
"""
strategy = self.strategies[strategy_name]
if not strategy.inited:
self.write_error(f"策略{strategy.strategy_name}启动失败,请先初始化")
return
msg = f"策略{strategy.strategy_name}启动失败,请先初始化"
self.write_error(msg)
return False, msg
if strategy.trading:
self.write_error(f"{strategy_name}已经启动,请勿重复操作")
return
msg = f"{strategy_name}已经启动,请勿重复操作"
self.write_error(msg)
return False, msg
self.call_strategy_func(strategy, strategy.on_start)
strategy.trading = True
self.put_strategy_event(strategy)
return True, f'成功启动策略{strategy_name}'
def stop_strategy(self, strategy_name: str):
"""
Stop a strategy.
"""
strategy = self.strategies[strategy_name]
if not strategy.trading:
self.write_log(f'{strategy_name}策略实例已处于停止交易状态')
return
msg = f'{strategy_name}策略实例已处于停止交易状态'
self.write_log(msg)
return False, msg
# Call on_stop function of the strategy
self.write_log(f'调用{strategy_name}的on_stop,停止交易')
@ -973,6 +996,7 @@ class CtaEngine(BaseEngine):
# Update GUI
self.put_strategy_event(strategy)
return True, f'成功停止策略{strategy_name}'
def edit_strategy(self, strategy_name: str, setting: dict):
"""
@ -994,8 +1018,9 @@ class CtaEngine(BaseEngine):
"""
strategy = self.strategies[strategy_name]
if strategy.trading:
self.write_error(f"策略{strategy.strategy_name}移除失败,请先停止")
return
err_msg = f"策略{strategy.strategy_name}移除失败,请先停止"
self.write_error(err_msg)
return False, err_msg
# Remove setting
self.remove_strategy_setting(strategy_name)
@ -1020,7 +1045,7 @@ class CtaEngine(BaseEngine):
self.write_log(f'移除{strategy_name}策略实例')
self.strategies.pop(strategy_name)
return True
return True, f'成功移除{strategy_name}策略实例'
def reload_strategy(self, strategy_name: str, vt_symbol: str = '', setting: dict = {}):
"""
@ -1034,8 +1059,9 @@ class CtaEngine(BaseEngine):
# 优先判断重启的策略,是否已经加载
if strategy_name not in self.strategies or strategy_name not in self.strategy_setting:
self.write_error(f"{strategy_name}不在运行策略中,不能重启")
return False
err_msg = f"{strategy_name}不在运行策略中,不能重启"
self.write_error(err_msg)
return False, err_msg
# 从本地配置文件中读取
if len(setting) == 0:
@ -1053,7 +1079,9 @@ class CtaEngine(BaseEngine):
module_name = self.class_module_map[class_name]
# 重新load class module
if not self.load_strategy_class_from_module(module_name):
return False
err_msg = f'不能加载模块:{module_name}'
self.write_error(err_msg)
return False, err_msg
# 停止当前策略实例的运行,撤单
self.stop_strategy(strategy_name)
@ -1069,8 +1097,9 @@ class CtaEngine(BaseEngine):
auto_init=old_strategy_config.get('auto_init', False),
auto_start=old_strategy_config.get('auto_start', False))
self.write_log(f'重新运行策略{strategy_name}执行完毕')
return True
msg = f'成功重载策略{strategy_name}'
self.write_log(msg)
return True, msg
def save_strategy_data(self, select_name: str):
""" save strategy data"""
@ -1152,6 +1181,11 @@ class CtaEngine(BaseEngine):
return
# 剩下工作:保存本地文件/数据库
snapshot_folder = get_folder_path(f'data/snapshots/{strategy_name}')
snapshot_file = snapshot_folder.joinpath('{}.pkb2'.format(datetime.now().strftime('%Y%m%d_%H%M%S')))
with bz2.BZ2File(str(snapshot_file), 'wb') as f:
pickle.dump(snapshot, f)
self.write_log(u'切片保存成功:{}'.format(str(snapshot_file)))
except Exception as ex:
self.write_error(u'获取策略{}切片数据异常:'.format(strategy_name, str(ex)))
@ -1236,12 +1270,11 @@ class CtaEngine(BaseEngine):
def get_strategy_status(self):
"""
return strategy name list with inited/trading status
:param :
return strategy inited/trading status
:param strategy_name:
:return:
"""
return [{k: {'inited': v.inited, 'trading': v.trading}} for k, v in self.strategies.items()]
return {k: {'inited': v.inited, 'trading': v.trading} for k, v in self.strategies.items()}
def get_strategy_pos(self, name, strategy=None):
"""
@ -1408,6 +1441,146 @@ class CtaEngine(BaseEngine):
d.update(strategy.get_parameters())
return d
def compare_pos(self):
"""
对比账号&策略的持仓,不同的话则发出微信提醒
:return:
"""
# 当前没有接入网关
if len(self.main_engine.gateways) == 0:
return False, u'当前没有接入网关'
self.write_log(u'开始对比账号&策略的持仓')
# 获取当前策略得持仓
strategy_pos_list = self.get_all_strategy_pos()
self.write_log(u'策略持仓清单:{}'.format(strategy_pos_list))
# 需要进行对比得合约集合(来自策略持仓/账号持仓)
vt_symbols = set()
# 账号的持仓处理 => account_pos
compare_pos = dict() # vt_symbol: {'账号多单': xx, '账号空单':xxx, '策略空单':[], '策略多单':[]}
for holding_key in list(self.offset_converter.holdings.keys()):
# gateway_name.symbol.exchange => symbol.exchange
vt_symbol = '.'.join(holding_key.split('.')[-2:])
vt_symbols.add(vt_symbol)
holding = self.offset_converter.holdings.get(holding_key, None)
if holding is None:
continue
compare_pos[vt_symbol] = OrderedDict(
{
"账号空单": holding.short_pos,
'账号多单': holding.long_pos,
'策略空单': 0,
'策略多单': 0,
'空单策略': [],
'多单策略': []
}
)
# 逐一根据策略仓位与Account_pos进行处理比对
for strategy_pos in strategy_pos_list:
for pos in strategy_pos.get('pos', []):
vt_symbol = pos.get('vt_symbol')
if not vt_symbol:
continue
vt_symbols.add(vt_symbol)
symbol_pos = compare_pos.get(vt_symbol, None)
if symbol_pos is None:
self.write_log(u'账号持仓信息获取不到{},创建一个'.format(vt_symbol))
symbol_pos = OrderedDict(
{
"账号空单": 0,
'账号多单': 0,
'策略空单': 0,
'策略多单': 0,
'空单策略': [],
'多单策略': []
}
)
if pos.get('direction') == 'short':
symbol_pos.update({'策略空单': symbol_pos.get('策略空单', 0) + abs(pos.get('volume', 0))})
symbol_pos['空单策略'].append(
u'{}({})'.format(strategy_pos['strategy_name'], abs(pos.get('volume', 0))))
self.write_log(u'更新{}策略持空仓=>{}'.format(vt_symbol, symbol_pos.get('策略空单', 0)))
if pos.get('direction') == 'long':
symbol_pos.update({'策略多单': symbol_pos.get('策略多单', 0) + abs(pos.get('volume', 0))})
symbol_pos['多单策略'].append(
u'{}({})'.format(strategy_pos['strategy_name'], abs(pos.get('volume', 0))))
self.write_log(u'更新{}策略持多仓=>{}'.format(vt_symbol, symbol_pos.get('策略多单', 0)))
pos_compare_result = ''
# 精简输出
compare_info = ''
for vt_symbol in sorted(vt_symbols):
# 发送不一致得结果
symbol_pos = compare_pos.pop(vt_symbol)
d_long = {
'account_id': self.engine_config.get('account_id', '-'),
'vt_symbol': vt_symbol,
'direction': Direction.LONG.value,
'strategy_list': symbol_pos.get('多单策略', [])}
d_short = {
'account_id': self.engine_config.get('account_id', '-'),
'vt_symbol': vt_symbol,
'direction': Direction.SHORT.value,
'strategy_list': symbol_pos.get('多单策略', [])}
# 多空都一致
if round(symbol_pos['账号空单'], 7) == round(symbol_pos['策略空单'], 7) and \
round(symbol_pos['账号多单'], 7) == round(symbol_pos['策略多单'], 7):
msg = u'{}多空都一致.{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False))
self.write_log(msg)
compare_info += msg
else:
pos_compare_result += '\n{}: '.format(vt_symbol)
# 多单不一致
if round(symbol_pos['策略多单'], 7) != round(symbol_pos['账号多单'], 7):
msg = '{}多单[账号({}), 策略{},共({})], ' \
.format(vt_symbol,
symbol_pos['账号多单'],
symbol_pos['多单策略'],
symbol_pos['策略多单'])
pos_compare_result += msg
self.write_error(u'{}不一致:{}'.format(vt_symbol, msg))
compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg)
# 空单不一致
if round(symbol_pos['策略空单'], 7) != round(symbol_pos['账号空单'], 7):
msg = '{}空单[账号({}), 策略{},共({})], ' \
.format(vt_symbol,
symbol_pos['账号空单'],
symbol_pos['空单策略'],
symbol_pos['策略空单'])
pos_compare_result += msg
self.write_error(u'{}不一致:{}'.format(vt_symbol, msg))
compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg)
# 不匹配输入到stdErr通道
if pos_compare_result != '':
msg = u'账户{}持仓不匹配: {}' \
.format(self.engine_config.get('account_id', '-'),
pos_compare_result)
try:
from vnpy.trader.util_wechat import send_wx_msg
send_wx_msg(content=msg)
except Exception as ex:
pass
ret_msg = u'持仓不匹配: {}' \
.format(pos_compare_result)
self.write_error(ret_msg)
return True, compare_info + ret_msg
else:
self.write_log(u'账户持仓与策略一致')
return True, compare_info
def init_all_strategies(self):
"""
"""
@ -1516,13 +1689,16 @@ class CtaEngine(BaseEngine):
strategy_logger = self.strategy_loggers.get(strategy_name, None)
if not strategy_logger:
log_path = get_folder_path('log')
log_filename = os.path.abspath(os.path.join(log_path, str(strategy_name)))
log_filename = str(log_path.joinpath(str(strategy_name)))
print(u'create logger:{}'.format(log_filename))
self.strategy_loggers[strategy_name] = setup_logger(file_name=log_filename,
name=str(strategy_name))
strategy_logger = self.strategy_loggers.get(strategy_name)
if strategy_logger:
strategy_logger.log(level, msg)
else:
if self.logger:
self.logger.log(level, msg)
# 如果日志数据异常错误和告警输出至sys.stderr
if level in [logging.CRITICAL, logging.ERROR, logging.WARNING]:

View File

@ -248,13 +248,13 @@ class PortfolioTestingEngine(BackTestingEngine):
bar.high_price = float(bar_data['high'])
bar.low_price = float(bar_data['low'])
bar.volume = int(bar_data['volume'])
bar.date = dt.strftime('%Y-%m-%d')
bar.time = dt.strftime('%H:%M:%S')
bar.date = bar_datetime.strftime('%Y-%m-%d')
bar.time = bar_datetime.strftime('%H:%M:%S')
str_td = str(bar_data.get('trading_day', ''))
if len(str_td) == 8:
bar.trading_day = str_td[0:4] + '-' + str_td[4:6] + '-' + str_td[6:8]
else:
bar.trading_day = get_trading_date(dt)
bar.trading_day = get_trading_date(bar_datetime)
if last_trading_day != bar.trading_day:
self.output(u'回测数据日期:{},资金:{}'.format(bar.trading_day, self.net_capital))

View File

@ -185,7 +185,7 @@ class CtaGridTrade(CtaComponent):
self.min_dn_open_price = 0.0 # 下网格最小开仓价
# 网格json文件的路径
self.json_file_path = os.path.join(get_folder_path('data'), f'{self.json_name}_Grids.json')
self.json_file_path = str(get_folder_path('data').joinpath(f'{self.json_name}_Grids.json'))
def get_volume_rate(self, idx: int = 0):
"""获取网格索引对应的开仓数量比例"""
@ -887,7 +887,7 @@ class CtaGridTrade(CtaComponent):
self.json_name = self.strategy.strategy_name
# 新版网格持久化文件
grid_json_file = os.path.join(grids_save_path, u'{}_Grids.json'.format(self.json_name))
grid_json_file = str(grids_save_path.joinpath(u'{}_Grids.json'.format(self.json_name)))
self.json_file_path = grid_json_file
data = {}
@ -921,7 +921,7 @@ class CtaGridTrade(CtaComponent):
self.json_name = self.strategy.strategy_name
# 若json文件不存在就保存一个若存在就优先使用数据文件
grid_json_file = os.path.join(grids_save_path, u'{}_Grids.json'.format(self.json_name))
grid_json_file = str(grids_save_path.joinpath(u'{}_Grids.json'.format(self.json_name)))
if not os.path.exists(grid_json_file):
data['up_grids'] = []
data['dn_grids'] = []
@ -981,7 +981,7 @@ class CtaGridTrade(CtaComponent):
self.json_name = new_name
# 旧文件
old_json_file = os.path.join(data_folder, u'{0}_Grids.json'.format(old_name))
old_json_file = str(data_folder.joinpath(u'{0}_Grids.json'.format(old_name)))
if os.path.isfile(old_json_file): # 新文件若存在,移除
try:

View File

@ -72,8 +72,7 @@ class CtaPolicy(CtaComponent):
从持久化文件中获取
:return:
"""
json_file = os.path.abspath(
os.path.join(get_folder_path('data'), u'{}_Policy.json'.format(self.strategy.strategy_name)))
json_file = str(get_folder_path('data').joinpath(u'{}_Policy.json'.format(self.strategy.strategy_name)))
json_data = {}
if os.path.exists(json_file):
@ -93,8 +92,7 @@ class CtaPolicy(CtaComponent):
保存至持久化文件
:return:
"""
json_file = os.path.abspath(
os.path.join(get_folder_path('data'), u'{}_Policy.json'.format(self.strategy.strategy_name)))
json_file = str(get_folder_path('data').joinpath(u'{}_Policy.json'.format(self.strategy.strategy_name)))
try:
# 修改为:回测时不保存

View File

@ -110,6 +110,7 @@ class BinancefGateway(BaseGateway):
def __init__(self, event_engine: EventEngine, gateway_name="BINANCEF"):
"""Constructor"""
super().__init__(event_engine, gateway_name)
self.count = 0
self.trade_ws_api = BinancefTradeWebsocketApi(self)
self.market_ws_api = BinancefDataWebsocketApi(self)
@ -168,9 +169,19 @@ class BinancefGateway(BaseGateway):
and self.status.get('mdws_con', False):
self.status.update({'con': True})
self.count += 1
if self.count < 2:
return
self.count = 0
func = self.query_functions.pop(0)
func()
self.query_functions.append(func)
def get_order(self, orderid: str):
return self.rest_api.get_order(orderid)
class BinancefRestApi(RestClient):
"""
BINANCE REST API
@ -201,6 +212,7 @@ class BinancefRestApi(RestClient):
self.orders = {}
def sign(self, request: Request) -> Request:
"""
Generate BINANCE signature.
@ -280,7 +292,8 @@ class BinancefRestApi(RestClient):
self.gateway.write_log("REST API启动成功")
self.gateway.status.update({'td_con': True, 'td_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
if self.gateway.status.get('md_con', False):
self.gateway.status.update({'con': True})
self.query_time()
self.query_account()
self.query_position()
@ -290,6 +303,10 @@ class BinancefRestApi(RestClient):
self.start_user_stream()
# 添加到定时查询队列中
self.gateway.query_functions = [self.query_account, self.query_position]
def query_time(self) -> Request:
""""""
data = {
@ -516,7 +533,7 @@ class BinancefRestApi(RestClient):
short_position = PositionData(
symbol=d["symbol"],
exchange=Exchange.BINANCE,
direction=Direction.LONG,
direction=Direction.SHORT,
volume=0,
price=0,
pnl=0,
@ -805,6 +822,8 @@ class BinancefTradeWebsocketApi(WebsocketClient):
""""""
self.gateway.write_log("交易Websocket API连接成功")
self.gateway.status.update({'tdws_con': True, 'tdws_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
if self.gateway.status.get('td_con', False):
self.gateway.status.update({'con': True})
def on_packet(self, packet: dict) -> None: # type: (dict)->None
""""""
@ -923,6 +942,8 @@ class BinancefDataWebsocketApi(WebsocketClient):
""""""
self.gateway.write_log("行情Websocket API连接刷新")
self.gateway.status.update({'md_con': True, 'md_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
if self.gateway.status.get('td_con', False):
self.gateway.status.update({'con': True})
def subscribe(self, req: SubscribeRequest) -> None:
""""""

View File

@ -295,6 +295,7 @@ class BaseEngine(ABC):
self.engine_name = engine_name
self.logger = None
self.create_logger(engine_name)
def create_logger(self, logger_name: str = 'base_engine'):
"""
@ -303,7 +304,7 @@ class BaseEngine(ABC):
:return:
"""
log_path = get_folder_path("log")
log_filename = os.path.abspath(os.path.join(log_path, logger_name))
log_filename = str(log_path.joinpath(logger_name))
print(u'create logger:{}'.format(log_filename))
self.logger = setup_logger(file_name=log_filename, name=logger_name,
log_level=SETTINGS.get('log.level', logging.DEBUG))

View File

@ -97,13 +97,15 @@ class BaseGateway(ABC):
self.klines = {}
self.status = {'name': gateway_name, 'con': False}
self.query_functions = []
def create_logger(self):
"""
创建engine独有的日志
:return:
"""
log_path = get_folder_path("log")
log_filename = os.path.abspath(os.path.join(log_path, self.gateway_name))
log_filename = str(log_path.joinpath(self.gateway_name))
print(u'create logger:{}'.format(log_filename))
from vnpy.trader.setting import SETTINGS
self.logger = setup_logger(file_name=log_filename, name=self.gateway_name,

View File

@ -1417,9 +1417,11 @@ class KLineWidget(KeyWraper):
class GridKline(QtWidgets.QWidget):
"""多kline"""
def __init__(self, parent=None, kline_settings={}):
def __init__(self, parent=None, kline_settings={}, title=''):
self.parent = parent
super(GridKline, self).__init__(parent)
if title:
self.setWindowTitle(title)
self.kline_settings = kline_settings
self.kline_names = list(self.kline_settings.keys())
@ -1479,11 +1481,14 @@ class GridKline(QtWidgets.QWidget):
continue
# 加载K线
data_file = kline_setting.get('data_file', None)
if not data_file:
continue
df = pd.read_csv(data_file)
df = df.set_index(pd.DatetimeIndex(df['datetime']))
if 'data_frame' in kline_setting:
df = kline_setting['data_frame']
else:
data_file = kline_setting.get('data_file', None)
if not data_file:
continue
df = pd.read_csv(data_file)
df = df.set_index(pd.DatetimeIndex(df['datetime']))
canvas.loadData(df,
main_indicators=kline_setting.get('main_indicators', []),
sub_indicators=kline_setting.get('sub_indicators', [])

View File

@ -0,0 +1,72 @@
# flake8: noqa
"""
多周期显示K线切片
华富资产
"""
import sys
import os
import ctypes
import bz2
import pickle
import zlib
import pandas as pd
from vnpy.trader.ui.kline.crosshair import Crosshair
from vnpy.trader.ui.kline.kline import *
class UiSnapshot(object):
"""查看切片"""
def __init__(self):
pass
def show(self, snapshot_file: str):
if not os.path.exists(snapshot_file):
print(f'{snapshot_file}不存在', file=sys.stderr)
return
d = None
with bz2.BZ2File(snapshot_file, 'rb') as f:
d = pickle.load(f)
use_zlib = d.get('zlib', False)
klines = d.pop('klines', None)
# 如果使用压缩,则解压
if use_zlib and klines:
print('use zlib decompress klines')
klines = pickle.loads(zlib.decompress(klines))
kline_settings = {}
for k, v in klines.items():
# 获取bar各种数据/指标列表
data_list = v.pop('data_list', None)
if data_list is None:
continue
# 主图指标 / 附图指标清单
main_indicators = v.get('main_indicators', [])
sub_indicators = v.get('sub_indicators', [])
df = pd.DataFrame(data_list)
df = df.set_index(pd.DatetimeIndex(df['datetime']))
kline_settings.update(
{
k:
{
"data_frame": df,
"main_indicators": [x.get('name') for x in main_indicators],
"sub_indicators": [x.get('name') for x in sub_indicators]
}
}
)
# K线界面
try:
w = GridKline(kline_settings=kline_settings, title=d.get('strategy',''))
w.showMaximized()
except Exception as ex:
print(u'exception:{},trace:{}'.format(str(ex), traceback.format_exc()))

186
vnpy/trader/util_monitor.py Normal file
View File

@ -0,0 +1,186 @@
# encoding: UTF-8
# 华富资产
import os
from collections import OrderedDict
from typing import Any, Dict
from .utility import get_folder_path
from .util_logger import setup_logger
from .event import (
EVENT_TRADE,
EVENT_ORDER,
EVENT_POSITION,
EVENT_ACCOUNT,
EVENT_LOG
)
########################################################################
class BasicMonitor(object):
"""
基础监控
headers中的值对应的字典格式如下
{'display': u'中文名', 'cell': ""}
"""
event_type: str = ""
data_key: str = ""
headers: Dict[str, dict] = {}
# ----------------------------------------------------------------------
def __init__(self, event_engine=None, monitor_name='BasicMonitor'):
self.event_engine = event_engine
self.logger = None
self.create_logger(monitor_name)
self.register_event()
# ----------------------------------------------------------------------
def register_event(self):
if self.event_type:
self.event_engine.register(self.event_type, self.update_event)
# ----------------------------------------------------------------------
def update_event(self, event):
"""收到事件更新"""
data = event.data
self.update_data(data)
# ----------------------------------------------------------------------
def update_data(self, data):
"""将数据更新到表格中"""
s = []
for header, value in self.headers.items():
v = getattr(data, header)
s.append('%s: %s' % (value['display'], str(v)))
if self.logger is not None:
self.logger.info(' '.join(s))
def create_logger(self, monitor_name):
"""创建日志写入"""
filename = str(get_folder_path('log').joinpath(monitor_name))
print(u'create logger:{}'.format(filename))
self.logger = setup_logger(file_name=filename, name=monitor_name)
class LogMonitor(BasicMonitor):
"""
Monitor for log data.
"""
event_type = EVENT_LOG
data_key = ""
headers = {
"time": {"display": "时间", "update": False},
"msg": {"display": "信息", "update": False},
"gateway_name": {"display": "接口", "update": False},
}
def __init__(self, event_engine=None, monitor_name='LogMonitor'):
super().__init__(event_engine, monitor_name)
class TradeMonitor(BasicMonitor):
"""
Monitor for trade data.
"""
event_type = EVENT_TRADE
data_key = ""
sorting = True
headers: Dict[str, dict] = {
"tradeid": {"display": "成交号 ", "update": False},
"orderid": {"display": "委托号", "update": False},
"symbol": {"display": "代码", "update": False},
"exchange": {"display": "交易所", "update": False},
"direction": {"display": "方向", "update": False},
"offset": {"display": "开平", "update": False},
"price": {"display": "价格", "update": False},
"volume": {"display": "数量", "update": False},
"time": {"display": "时间", "update": False},
"gateway_name": {"display": "接口", "update": False},
}
def __init__(self, event_engine=None, monitor_name='TradeMonitor'):
super().__init__(event_engine, monitor_name)
class OrderMonitor(BasicMonitor):
"""
Monitor for order data.
"""
event_type = EVENT_ORDER
data_key = "vt_orderid"
sorting = True
headers: Dict[str, dict] = {
"orderid": {"display": "委托号", "update": False},
"symbol": {"display": "代码", "update": False},
"exchange": {"display": "交易所", "update": False},
"type": {"display": "类型", "update": False},
"direction": {"display": "方向", "update": False},
"offset": {"display": "开平", "update": False},
"price": {"display": "价格", "update": False},
"volume": {"display": "总数量", "update": True},
"traded": {"display": "已成交", "update": True},
"status": {"display": "状态", "update": True},
"time": {"display": "时间", "update": True},
"gateway_name": {"display": "接口", "update": False},
}
def __init__(self, event_engine=None, monitor_name='OrderMonitor'):
super().__init__(event_engine, monitor_name)
class PositionMonitor(BasicMonitor):
"""
Monitor for position data.
"""
event_type = EVENT_POSITION
data_key = "vt_positionid"
sorting = True
headers = {
"symbol": {"display": "代码", "update": False},
"exchange": {"display": "交易所", "update": False},
"direction": {"display": "方向", "update": False},
"volume": {"display": "数量", "update": True},
"yd_volume": {"display": "昨仓", "update": True},
"frozen": {"display": "冻结", "update": True},
"price": {"display": "均价", "update": True},
"pnl": {"display": "盈亏", "update": True},
"gateway_name": {"display": "接口", "update": False},
}
def __init__(self, event_engine=None, monitor_name='PositionMonitor'):
super().__init__(event_engine, monitor_name)
class AccountMonitor(BasicMonitor):
"""
Monitor for account data.
"""
event_type = EVENT_ACCOUNT
data_key = "vt_accountid"
sorting = True
headers = {
"accountid": {"display": "账号", "update": False},
"pre_balance": {"display": "昨净值", "update": False},
"balance": {"display": "净值", "update": True},
"frozen": {"display": "冻结", "update": True},
"margin": {"display": "保证金", "update": True},
"available": {"display": "可用", "update": True},
"commission": {"display": "手续费", "update": True},
"close_profit": {"display": "平仓收益", "update": True},
"holding_profit": {"display": "持仓收益", "update": True},
"gateway_name": {"display": "接口", "update": False},
}
def __init__(self, event_engine=None, monitor_name='AccountMonitor'):
super().__init__(event_engine, monitor_name)

View File

@ -8,7 +8,7 @@ import os
# 一般使用与运行服务,且唯一进程
# 日志文件路径
logs_path = os.path.abspath(os.path.join(os.getcwd(), 'logs'))
logs_path = os.path.abspath(os.path.join(os.getcwd(), 'log'))
if not os.path.isdir(logs_path):
os.mkdir(logs_path)
assert os.path.isdir(logs_path)
@ -16,7 +16,6 @@ assert os.path.isdir(logs_path)
# 记录pid得文件
pid_file = os.path.abspath(os.path.join(logs_path, 'pid.txt'))
def _check_pid(pid):
"""
检查pid是否与当前进程pid一致

View File

@ -233,7 +233,8 @@ def get_folder_path(folder_name: str) -> Path:
"""
folder_path = TEMP_DIR.joinpath(folder_name)
if not folder_path.exists():
folder_path.mkdir()
os.makedirs(str(folder_path))
#folder_path.mkdir()
return folder_path