[update] 增加查询所有持仓,一些bug修复

This commit is contained in:
msincenselee 2021-03-13 22:23:43 +08:00
parent 2aa264f30a
commit c439195056
12 changed files with 274 additions and 156 deletions

View File

@ -23,7 +23,7 @@ conda create -name py37=python3.7
解决: 解决:
sudo find / -name libta_lib.so.0 sudo find / -name libta_lib.so.0
/home/ai/eco-ta/ta-lib/src/.libs/libta_lib.so.0 /home/trade/ta-lib/src/.libs/libta_lib.so.0
/usr/local/lib/libta_lib.so.0 /usr/local/lib/libta_lib.so.0
vi /etc/profile vi /etc/profile
添加 添加

View File

@ -13,7 +13,6 @@ matplotlib
seaborn seaborn
futu-api futu-api
tigeropen tigeropen
rqdatac
ta-lib ta-lib
ibapi ibapi
deap deap
@ -22,4 +21,4 @@ QScintilla
PySocks PySocks
pykalman pykalman
cython cython
tqsdk

View File

@ -1841,7 +1841,7 @@ class BackTestingEngine(object):
self.daily_max_drawdown_rate = drawdown_rate self.daily_max_drawdown_rate = drawdown_rate
self.max_drawdown_rate_time = data['date'] self.max_drawdown_rate_time = data['date']
msg = u'{}: net={}, capital={} max={} margin={} commission={} pos: {}' \ msg = u'{}: net={}, capital={} max={} holding_profit={} commission={} pos: \n{}' \
.format(data['date'], .format(data['date'],
data['net'], c, m, data['net'], c, m,
today_holding_profit, today_holding_profit,
@ -1922,6 +1922,7 @@ class BackTestingEngine(object):
d = {} d = {}
d['init_capital'] = self.init_capital d['init_capital'] = self.init_capital
d['profit'] = self.cur_capital - self.init_capital d['profit'] = self.cur_capital - self.init_capital
d['net_capital'] = self.net_capital
d['max_capital'] = self.max_net_capital # 取消原 maxCapital d['max_capital'] = self.max_net_capital # 取消原 maxCapital
if len(self.pnl_list) == 0: if len(self.pnl_list) == 0:
@ -2002,8 +2003,11 @@ class BackTestingEngine(object):
result_info.update({u'期初资金': d['init_capital']}) result_info.update({u'期初资金': d['init_capital']})
self.output(u'期初资金:\t%s' % format_number(d['init_capital'])) self.output(u'期初资金:\t%s' % format_number(d['init_capital']))
result_info.update({u'总盈亏': d['profit']}) result_info.update({u'期末资金': d['net_capital']})
self.output(u'总盈亏:\t%s' % format_number(d['profit'])) self.output(u'期末资金:\t%s' % format_number(d['net_capital']))
result_info.update({u'平仓盈亏': d['profit']})
self.output(u'平仓盈亏:\t%s' % format_number(d['profit']))
result_info.update({u'资金最高净值': d['max_capital']}) result_info.update({u'资金最高净值': d['max_capital']})
self.output(u'资金最高净值:\t%s' % format_number(d['max_capital'])) self.output(u'资金最高净值:\t%s' % format_number(d['max_capital']))

View File

@ -858,6 +858,10 @@ class CtaEngine(BaseEngine):
vt_position_id = f"{gateway_name}.{vt_symbol}.{direction.value}" vt_position_id = f"{gateway_name}.{vt_symbol}.{direction.value}"
return self.main_engine.get_position(vt_position_id) return self.main_engine.get_position(vt_position_id)
def get_all_positions(self):
"""查询当前帐号得所有持仓合约"""
return self.main_engine.get_all_positions()
def get_engine_type(self): def get_engine_type(self):
"""""" """"""
return self.engine_type return self.engine_type
@ -993,88 +997,110 @@ class CtaEngine(BaseEngine):
""" """
Init strategies in queue. Init strategies in queue.
""" """
strategy = self.strategies[strategy_name] try:
strategy = self.strategies[strategy_name]
if strategy.inited: if strategy.inited:
self.write_error(f"{strategy_name}已经完成初始化,禁止重复操作") self.write_error(f"{strategy_name}已经完成初始化,禁止重复操作")
return return
self.write_log(f"{strategy_name}开始执行初始化") self.write_log(f"{strategy_name}开始执行初始化")
# Call on_init function of strategy # Call on_init function of strategy
self.call_strategy_func(strategy, strategy.on_init) self.call_strategy_func(strategy, strategy.on_init)
# Restore strategy data(variables) # Restore strategy data(variables)
# Pro 版本不使用自动恢复除了内部数据功能,由策略自身初始化时完成 # Pro 版本不使用自动恢复除了内部数据功能,由策略自身初始化时完成
# data = self.strategy_data.get(strategy_name, None) # data = self.strategy_data.get(strategy_name, None)
# if data: # if data:
# for name in strategy.variables: # for name in strategy.variables:
# value = data.get(name, None) # value = data.get(name, None)
# if value: # if value:
# setattr(strategy, name, value) # setattr(strategy, name, value)
# Subscribe market data 订阅缺省的vt_symbol, 如果有其他合约需要订阅,由策略内部初始化时提交订阅即可。 # Subscribe market data 订阅缺省的vt_symbol, 如果有其他合约需要订阅,由策略内部初始化时提交订阅即可。
self.subscribe_symbol(strategy_name, vt_symbol=strategy.vt_symbol) self.subscribe_symbol(strategy_name, vt_symbol=strategy.vt_symbol)
# Put event to update init completed status. # Put event to update init completed status.
strategy.inited = True strategy.inited = True
self.put_strategy_event(strategy) self.put_strategy_event(strategy)
self.write_log(f"{strategy_name}初始化完成") self.write_log(f"{strategy_name}初始化完成")
# 初始化后,自动启动策略交易
if auto_start:
self.start_strategy(strategy_name)
except Exception as ex:
msg = f'{strategy_name}执行on_init异常:{str(ex)}'
self.write_error(ex)
self.send_wechat(msg)
self.write_error(traceback.format_exc())
# 初始化后,自动启动策略交易
if auto_start:
self.start_strategy(strategy_name)
def start_strategy(self, strategy_name: str): def start_strategy(self, strategy_name: str):
""" """
Start a strategy. Start a strategy.
""" """
strategy = self.strategies[strategy_name] try:
if not strategy.inited: strategy = self.strategies[strategy_name]
msg = f"策略{strategy.strategy_name}启动失败,请先初始化" if not strategy.inited:
self.write_error(msg) msg = f"策略{strategy.strategy_name}启动失败,请先初始化"
return False, msg self.write_error(msg)
return False, msg
if strategy.trading: if strategy.trading:
msg = f"{strategy_name}已经启动,请勿重复操作" msg = f"{strategy_name}已经启动,请勿重复操作"
self.write_error(msg) self.write_error(msg)
return False, msg return False, msg
self.call_strategy_func(strategy, strategy.on_start) self.call_strategy_func(strategy, strategy.on_start)
strategy.trading = True strategy.trading = True
self.put_strategy_event(strategy) self.put_strategy_event(strategy)
return True, f'成功启动策略{strategy_name}' return True, f'成功启动策略{strategy_name}'
except Exception as ex:
msg = f'{strategy_name}执行on_start异常:{str(ex)}'
self.write_error(ex)
self.send_wechat(msg)
self.write_error(traceback.format_exc())
def stop_strategy(self, strategy_name: str): def stop_strategy(self, strategy_name: str):
""" """
Stop a strategy. Stop a strategy.
""" """
strategy = self.strategies[strategy_name] try:
if not strategy.trading: strategy = self.strategies[strategy_name]
msg = f'{strategy_name}策略实例已处于停止交易状态' if not strategy.trading:
self.write_log(msg) msg = f'{strategy_name}策略实例已处于停止交易状态'
return False, msg self.write_log(msg)
return False, msg
# Call on_stop function of the strategy # Call on_stop function of the strategy
self.write_log(f'调用{strategy_name}的on_stop,停止交易') self.write_log(f'调用{strategy_name}的on_stop,停止交易')
self.call_strategy_func(strategy, strategy.on_stop) self.call_strategy_func(strategy, strategy.on_stop)
# Change trading status of strategy to False # Change trading status of strategy to False
strategy.trading = False strategy.trading = False
# Cancel all orders of the strategy # Cancel all orders of the strategy
self.write_log(f'撤销{strategy_name}所有委托') self.write_log(f'撤销{strategy_name}所有委托')
self.cancel_all(strategy) self.cancel_all(strategy)
# Sync strategy variables to data file # Sync strategy variables to data file
# 取消此功能,由策略自身完成数据的持久化 # 取消此功能,由策略自身完成数据的持久化
# self.sync_strategy_data(strategy) # self.sync_strategy_data(strategy)
# Update GUI # Update GUI
self.put_strategy_event(strategy) self.put_strategy_event(strategy)
return True, f'成功停止策略{strategy_name}' return True, f'成功停止策略{strategy_name}'
except Exception as ex:
msg = f'执行stop_strategy({strategy_name})异常:{str(ex)}'
self.write_error(ex)
self.send_wechat(msg)
self.write_error(traceback.format_exc())
return False, f'停止策略失败{strategy_name},异常:{str(ex)}'
def edit_strategy(self, strategy_name: str, setting: dict): def edit_strategy(self, strategy_name: str, setting: dict):
""" """
@ -1094,36 +1120,45 @@ class CtaEngine(BaseEngine):
""" """
Remove a strategy. Remove a strategy.
""" """
strategy = self.strategies[strategy_name] try:
if strategy.trading:
err_msg = f"策略{strategy.strategy_name}移除失败,请先停止"
self.write_error(err_msg)
return False, err_msg
# Remove setting strategy = self.strategies[strategy_name]
self.remove_strategy_setting(strategy_name) if strategy.trading:
err_msg = f"策略{strategy.strategy_name}移除失败,请先停止"
self.write_error(err_msg)
return False, err_msg
# 移除订阅合约与策略的关联关系 # Remove setting
for vt_symbol in self.strategy_symbol_map[strategy_name]: self.remove_strategy_setting(strategy_name)
# Remove from symbol strategy map
self.write_log(f'移除{vt_symbol}《=》{strategy_name}的订阅关系')
strategies = self.symbol_strategy_map[vt_symbol]
strategies.remove(strategy)
# Remove from active orderid map # 移除订阅合约与策略的关联关系
if strategy_name in self.strategy_orderid_map: for vt_symbol in self.strategy_symbol_map[strategy_name]:
vt_orderids = self.strategy_orderid_map.pop(strategy_name) # Remove from symbol strategy map
self.write_log(f'移除{strategy_name}的所有委托订单映射关系') self.write_log(f'移除{vt_symbol}《=》{strategy_name}的订阅关系')
# Remove vt_orderid strategy map strategies = self.symbol_strategy_map[vt_symbol]
for vt_orderid in vt_orderids: strategies.remove(strategy)
if vt_orderid in self.orderid_strategy_map:
self.orderid_strategy_map.pop(vt_orderid)
# Remove from strategies # Remove from active orderid map
self.write_log(f'移除{strategy_name}策略实例') if strategy_name in self.strategy_orderid_map:
self.strategies.pop(strategy_name) vt_orderids = self.strategy_orderid_map.pop(strategy_name)
self.write_log(f'移除{strategy_name}的所有委托订单映射关系')
# Remove vt_orderid strategy map
for vt_orderid in vt_orderids:
if vt_orderid in self.orderid_strategy_map:
self.orderid_strategy_map.pop(vt_orderid)
return True, f'成功移除{strategy_name}策略实例' # Remove from strategies
self.write_log(f'移除{strategy_name}策略实例')
self.strategies.pop(strategy_name)
return True, f'成功移除{strategy_name}策略实例'
except Exception as ex:
msg = f'执行remove_strategy({strategy_name})异常:{str(ex)}'
self.write_error(ex)
self.send_wechat(msg)
self.write_error(traceback.format_exc())
return False, f'移除策略失败{strategy_name},异常:{str(ex)}'
def reload_strategy(self, strategy_name: str, vt_symbol: str = '', setting: dict = {}): def reload_strategy(self, strategy_name: str, vt_symbol: str = '', setting: dict = {}):
""" """
@ -1133,63 +1168,70 @@ class CtaEngine(BaseEngine):
:param setting: :param setting:
:return: :return:
""" """
self.write_log(f'开始重新加载策略{strategy_name}') try:
self.write_log(f'开始重新加载策略{strategy_name}')
# 优先判断重启的策略,是否已经加载 # 优先判断重启的策略,是否已经加载
if strategy_name not in self.strategies or strategy_name not in self.strategy_setting: if strategy_name not in self.strategies or strategy_name not in self.strategy_setting:
err_msg = f"{strategy_name}不在运行策略中,不能重启" err_msg = f"{strategy_name}不在运行策略中,不能重启"
self.write_error(err_msg)
return False, err_msg
# 从本地配置文件中读取
if len(setting) == 0:
strategies_setting = load_json(self.setting_filename)
old_strategy_config = strategies_setting.get(strategy_name, {})
else:
old_strategy_config = copy(self.strategy_setting[strategy_name])
class_name = old_strategy_config.get('class_name')
if len(vt_symbol) == 0:
vt_symbol = old_strategy_config.get('vt_symbol')
if len(setting) == 0:
setting = old_strategy_config.get('setting')
module_name = self.class_module_map[class_name]
# 重新load class module
#if not self.load_strategy_class_from_module(module_name):
# err_msg = f'不能加载模块:{module_name}'
# self.write_error(err_msg)
# return False, err_msg
if module_name:
new_class_name = module_name + '.' + class_name
self.write_log(u'转换策略为全路径:{}'.format(new_class_name))
strategy_class = import_module_by_str(new_class_name)
if strategy_class is None:
err_msg = u'加载策略模块失败:{}'.format(class_name)
self.write_error(err_msg) self.write_error(err_msg)
return False, err_msg return False, err_msg
self.write_log(f'重新加载模块成功,使用新模块:{new_class_name}') # 从本地配置文件中读取
self.classes[class_name] = strategy_class if len(setting) == 0:
strategies_setting = load_json(self.setting_filename)
old_strategy_config = strategies_setting.get(strategy_name, {})
else:
old_strategy_config = copy(self.strategy_setting[strategy_name])
# 停止当前策略实例的运行,撤单 class_name = old_strategy_config.get('class_name')
self.stop_strategy(strategy_name) if len(vt_symbol) == 0:
vt_symbol = old_strategy_config.get('vt_symbol')
if len(setting) == 0:
setting = old_strategy_config.get('setting')
# 移除运行中的策略实例 module_name = self.class_module_map[class_name]
self.remove_strategy(strategy_name) # 重新load class module
#if not self.load_strategy_class_from_module(module_name):
# err_msg = f'不能加载模块:{module_name}'
# self.write_error(err_msg)
# return False, err_msg
if module_name:
new_class_name = module_name + '.' + class_name
self.write_log(u'转换策略为全路径:{}'.format(new_class_name))
# 重新添加策略 strategy_class = import_module_by_str(new_class_name)
self.add_strategy(class_name=class_name, if strategy_class is None:
strategy_name=strategy_name, err_msg = u'加载策略模块失败:{}'.format(class_name)
vt_symbol=vt_symbol, self.write_error(err_msg)
setting=setting, return False, err_msg
auto_init=old_strategy_config.get('auto_init', False),
auto_start=old_strategy_config.get('auto_start', False))
msg = f'成功重载策略{strategy_name}' self.write_log(f'重新加载模块成功,使用新模块:{new_class_name}')
self.write_log(msg) self.classes[class_name] = strategy_class
return True, msg
# 停止当前策略实例的运行,撤单
self.stop_strategy(strategy_name)
# 移除运行中的策略实例
self.remove_strategy(strategy_name)
# 重新添加策略
self.add_strategy(class_name=class_name,
strategy_name=strategy_name,
vt_symbol=vt_symbol,
setting=setting,
auto_init=old_strategy_config.get('auto_init', False),
auto_start=old_strategy_config.get('auto_start', False))
msg = f'成功重载策略{strategy_name}'
self.write_log(msg)
return True, msg
except Exception as ex:
msg = f'执行reload_strategy({strategy_name})异常:{str(ex)}'
self.write_error(ex)
self.send_wechat(msg)
self.write_error(traceback.format_exc())
return False, f'重启策略失败{strategy_name},异常:{str(ex)}'
def save_strategy_data(self, select_name: str = 'ALL'): def save_strategy_data(self, select_name: str = 'ALL'):
""" save strategy data""" """ save strategy data"""

View File

@ -975,7 +975,7 @@ class CtaFutureTemplate(CtaTemplate):
else: else:
self.write_error(u'委托空单平仓失败') self.write_error(u'委托空单平仓失败')
def grid_buy(self, grid): def grid_buy(self, grid, **kwargs):
""" """
事务开多仓 事务开多仓
:return: :return:
@ -1002,7 +1002,7 @@ class CtaFutureTemplate(CtaTemplate):
.format(grid.type, grid.open_price, grid.volume, grid.close_price)) .format(grid.type, grid.open_price, grid.volume, grid.close_price))
return False return False
def grid_short(self, grid): def grid_short(self, grid, **kwargs):
""" """
事务开空仓 事务开空仓
:return: :return:
@ -1029,7 +1029,7 @@ class CtaFutureTemplate(CtaTemplate):
.format(grid.type, grid.open_price, grid.volume, grid.close_price)) .format(grid.type, grid.open_price, grid.volume, grid.close_price))
return False return False
def grid_sell(self, grid): def grid_sell(self, grid, **kwargs):
""" """
事务平多单仓位 事务平多单仓位
1.来源自止损止盈平仓 1.来源自止损止盈平仓
@ -1090,7 +1090,7 @@ class CtaFutureTemplate(CtaTemplate):
return True return True
def grid_cover(self, grid): def grid_cover(self, grid, **kwargs):
""" """
事务平空单仓位 事务平空单仓位
1.来源自止损止盈平仓 1.来源自止损止盈平仓
@ -1179,13 +1179,13 @@ class CtaFutureTemplate(CtaTemplate):
# 只处理未成交的限价委托单 # 只处理未成交的限价委托单
if order_status in [Status.SUBMITTING, Status.NOTTRADED] and order_type == OrderType.LIMIT: if order_status in [Status.SUBMITTING, Status.NOTTRADED] and order_type == OrderType.LIMIT:
if over_seconds > self.cancel_seconds or force: # 超过设置的时间还未成交 if over_seconds > self.cancel_seconds or force: # 超过设置的时间还未成交
self.write_log(u'超时{}秒未成交取消委托单vt_orderid:{},order:{}' self.write_log(u'{}超时{}秒未成交取消委托单vt_orderid:{},order:{}'
.format(over_seconds, vt_orderid, order_info)) .format(order_vt_symbol, over_seconds, vt_orderid, order_info))
order_info.update({'status': Status.CANCELLING}) order_info.update({'status': Status.CANCELLING})
self.active_orders.update({vt_orderid: order_info}) self.active_orders.update({vt_orderid: order_info})
ret = self.cancel_order(str(vt_orderid)) ret = self.cancel_order(str(vt_orderid))
if not ret: if not ret:
self.write_log(u'撤单失败,更新状态为撤单成功') self.write_log(f'{order_vt_symbol}撤单失败,更新状态为撤单成功')
order_info.update({'status': Status.CANCELLED}) order_info.update({'status': Status.CANCELLED})
self.active_orders.update({vt_orderid: order_info}) self.active_orders.update({vt_orderid: order_info})
if order_grid and vt_orderid in order_grid.order_ids: if order_grid and vt_orderid in order_grid.order_ids:

View File

@ -212,11 +212,12 @@ class BackTestingEngine(object):
self.test_setting = None # 回测设置 self.test_setting = None # 回测设置
self.strategy_setting = None # 所有回测策略得设置 self.strategy_setting = None # 所有回测策略得设置
def create_fund_kline(self, name, use_renko=False): def create_fund_kline(self, name, use_renko=False, extra_setting = {}):
""" """
创建资金曲线 创建资金曲线
:param name: 账号名或者策略名 :param name: 账号名或者策略名
:param use_renko: :param use_renko:是否使用砖图
:param extra_setting: 扩展得k线设置例如macd等
:return: :return:
""" """
setting = {} setting = {}
@ -228,7 +229,9 @@ class BackTestingEngine(object):
setting['price_tick'] = 0.01 setting['price_tick'] = 0.01
setting['underlying_symbol'] = 'fund' setting['underlying_symbol'] = 'fund'
setting['is_7x24'] = self.is_7x24 setting['is_7x24'] = self.is_7x24
for k,v in extra_setting.items():
if k not in setting:
setting.update({k:v})
if use_renko: if use_renko:
# 使用砖图,高度是资金的千分之一 # 使用砖图,高度是资金的千分之一
setting['height'] = self.init_capital * 0.001 setting['height'] = self.init_capital * 0.001
@ -1255,7 +1258,7 @@ class BackTestingEngine(object):
"""更新持仓信息,把今仓=>昨仓""" """更新持仓信息,把今仓=>昨仓"""
for k, v in self.positions.items(): for k, v in self.positions.items():
if v.volume > 0: if v.volume != v.yd_volume:
self.write_log(f'调整{v.vt_symbol}持仓: 昨仓{v.yd_volume} => {v.volume}') self.write_log(f'调整{v.vt_symbol}持仓: 昨仓{v.yd_volume} => {v.volume}')
v.yd_volume = v.volume v.yd_volume = v.volume

View File

@ -598,20 +598,29 @@ class CtaStockTemplate(CtaTemplate):
self.write_log(f'清除委托单:{lg.order_ids}') self.write_log(f'清除委托单:{lg.order_ids}')
[self.cta_engine.cancel_order(self, vt_orderid) for vt_orderid in lg.order_ids] [self.cta_engine.cancel_order(self, vt_orderid) for vt_orderid in lg.order_ids]
lg.order_ids = [] lg.order_ids = []
if lg.open_status and not lg.close_status and not lg.order_status:
# 持仓
if lg.open_status and not lg.close_status and not lg.order_status and lg.volume > 0:
pos = self.get_position(lg.vt_symbol) pos = self.get_position(lg.vt_symbol)
pos.price = round((pos.price * pos.volume + lg.open_price * lg.volume) / (pos.volume + lg.volume),3)
pos.volume += lg.volume pos.volume += lg.volume
lg.traded_volume = 0 lg.traded_volume = 0
self.write_log(u'持仓状态,加载持仓多单[{},价格:{},数量:{}手, 开仓时间:{}' self.write_log(u'持仓状态,加载持仓多单[{},价格:{},数量:{}手, 开仓时间:{}'
.format(lg.vt_symbol, lg.open_price, lg.volume, lg.open_time)) .format(lg.vt_symbol, lg.open_price, lg.volume, lg.open_time))
self.positions.update({lg.vt_symbol: pos}) self.positions.update({lg.vt_symbol: pos})
# 部分开仓
elif lg.order_status and not lg.open_status and not lg.close_status and lg.traded_volume > 0: elif lg.order_status and not lg.open_status and not lg.close_status and lg.traded_volume > 0:
pos = self.get_position(lg.vt_symbol) pos = self.get_position(lg.vt_symbol)
pos.price = round(
(pos.price * pos.volume + lg.open_price * lg.traded_volume) / (pos.volume + lg.traded_volume), 3)
pos.volume += lg.traded_volume pos.volume += lg.traded_volume
self.write_log(u'开仓状态,加载部分持仓多单[{},价格:{},数量:{}手, 开仓时间:{}' self.write_log(u'开仓状态,加载部分持仓多单[{},价格:{},数量:{}手, 开仓时间:{}'
.format(lg.vt_symbol, lg.open_price, lg.traded_volume, lg.open_time)) .format(lg.vt_symbol, lg.open_price, lg.traded_volume, lg.open_time))
self.positions.update({lg.vt_symbol: pos}) self.positions.update({lg.vt_symbol: pos})
elif lg.order_status and lg.open_status and lg.close_status:
# 正在平仓 =>
elif lg.order_status and lg.open_status and lg.close_status and lg.volume > 0:
if lg.traded_volume > 0: if lg.traded_volume > 0:
old_volume = lg.volume old_volume = lg.volume
lg.volume -= lg.traded_volume lg.volume -= lg.traded_volume
@ -619,6 +628,8 @@ class CtaStockTemplate(CtaTemplate):
lg.traded_volume = 0 lg.traded_volume = 0
pos = self.get_position(lg.vt_symbol) pos = self.get_position(lg.vt_symbol)
pos.price = round(
(pos.price * pos.volume + lg.open_price * lg.volume) / (pos.volume + lg.volume), 3)
pos.volume += lg.volume pos.volume += lg.volume
self.write_log(u'卖出状态,加载持仓多单[{},价格:{},数量:{}手, 开仓时间:{}' self.write_log(u'卖出状态,加载持仓多单[{},价格:{},数量:{}手, 开仓时间:{}'
.format(lg.vt_symbol, lg.open_price, lg.volume, lg.open_time)) .format(lg.vt_symbol, lg.open_price, lg.volume, lg.open_time))

View File

@ -212,11 +212,12 @@ class BackTestingEngine(object):
self.test_setting = None # 回测设置 self.test_setting = None # 回测设置
self.strategy_setting = None # 所有回测策略得设置 self.strategy_setting = None # 所有回测策略得设置
def create_fund_kline(self, name, use_renko=False): def create_fund_kline(self, name, use_renko=False, extra_setting = {}):
""" """
创建资金曲线 创建资金曲线
:param name: 账号名或者策略名 :param name: 账号名或者策略名
:param use_renko: :param use_renko:
:param extra_setting: 扩展得k线设置例如macd等
:return: :return:
""" """
setting = {} setting = {}
@ -232,6 +233,10 @@ class BackTestingEngine(object):
setting['height'] = self.init_capital * 0.001 setting['height'] = self.init_capital * 0.001
setting['use_renko'] = True setting['use_renko'] = True
for k, v in extra_setting.items():
if k not in setting:
setting.update({k: v})
fund_kline = FundKline(cta_engine=self, setting=setting) fund_kline = FundKline(cta_engine=self, setting=setting)
self.fund_kline_dict.update({name: fund_kline}) self.fund_kline_dict.update({name: fund_kline})
return fund_kline return fund_kline
@ -254,6 +259,36 @@ class BackTestingEngine(object):
else: else:
return None return None
# todo wj
def save_fund_kline(self, name: str = None):
# 没有指定账号并且存在一个或多个资金K线
if len(self.fund_kline_dict) > 0:
# 优先找vt_setting中配置了strategy_groud的资金K线
kline = self.fund_kline_dict.get(name, None)
# 找不到,返回第一个
if kline is None:
kline = self.fund_kline_dict.values()[0]
kline_file = str(os.path.join(self.get_data_path(), 'fund_{}.csv'.format(name)))
# 如果数据文件存在,则删除数据
if os.path.exists(kline_file):
os.remove(kline_file)
# 设置 kline的输出文件
kline.kline.export_filename = kline_file
kline.kline.export_fields = [
{'name': 'datetime', 'source': 'bar', 'attr': 'datetime', 'type_': 'datetime'},
{'name': 'open', 'source': 'bar', 'attr': 'open_price', 'type_': 'float'},
{'name': 'high', 'source': 'bar', 'attr': 'high_price', 'type_': 'float'},
{'name': 'low', 'source': 'bar', 'attr': 'low_price', 'type_': 'float'},
{'name': 'close', 'source': 'bar', 'attr': 'close_price', 'type_': 'float'},
{'name': 'turnover', 'source': 'bar', 'attr': 'turnover', 'type_': 'float'},
{'name': 'volume', 'source': 'bar', 'attr': 'volume', 'type_': 'float'},
{'name': 'open_interest', 'source': 'bar', 'attr': 'open_interest', 'type_': 'float'}
]
kline.save()
def get_account(self, vt_accountid: str = ""): def get_account(self, vt_accountid: str = ""):
"""返回账号的实时权益,可用资金,仓位比例,投资仓位比例上限""" """返回账号的实时权益,可用资金,仓位比例,投资仓位比例上限"""
if self.net_capital == 0.0: if self.net_capital == 0.0:
@ -2199,6 +2234,11 @@ class BackTestingEngine(object):
def show_backtesting_result(self): def show_backtesting_result(self):
"""显示回测结果""" """显示回测结果"""
# 导出资金曲线
if self.active_fund_kline:
for key in self.fund_kline_dict.keys():
self.save_fund_kline(key)
d, daily_net_capital, daily_capital = self.get_result() d, daily_net_capital, daily_capital = self.get_result()
if len(d) == 0: if len(d) == 0:

View File

@ -220,14 +220,20 @@ class CtaEngine(BaseEngine):
def process_timer_event(self, event: Event): def process_timer_event(self, event: Event):
""" 处理定时器事件""" """ 处理定时器事件"""
all_trading = True all_trading = True
dt = datetime.now()
# 触发每个策略的定时接口 # 触发每个策略的定时接口
for strategy in list(self.strategies.values()): for strategy in list(self.strategies.values()):
strategy.on_timer() strategy.on_timer()
if not strategy.trading: if not strategy.trading:
all_trading = False all_trading = False
dt = datetime.now() # 临近夜晚收盘前,强制发出撤单
if dt.hour == 2 and dt.minute == 59 and dt.second >= 55:
self.cancel_all(strategy)
# 每分钟执行的逻辑 # 每分钟执行的逻辑
if self.last_minute != dt.minute: if self.last_minute != dt.minute:
self.last_minute = dt.minute self.last_minute = dt.minute

View File

@ -904,6 +904,11 @@ class CtaProTemplate(CtaTemplate):
order_time = order_info.get('order_time') order_time = order_info.get('order_time')
over_ms = (dt - order_time).total_seconds() over_ms = (dt - order_time).total_seconds()
# 白天开盘或许有指数与真实tick的时间延迟这个时刻不做撤单功能
if f'{dt.hour}:{dt.minute}' in ['10:30', '13:30']:
continue
if (over_ms > self.cancel_seconds) \ if (over_ms > self.cancel_seconds) \
or force: # 超过设置的时间还未成交 or force: # 超过设置的时间还未成交
self.write_log(f'{dt}, 超时{over_ms}秒未成交,取消委托单:{order_info}') self.write_log(f'{dt}, 超时{over_ms}秒未成交,取消委托单:{order_info}')

View File

@ -4,6 +4,7 @@ import sys
import traceback import traceback
import json import json
from datetime import datetime, timedelta from datetime import datetime, timedelta
from time import time
from copy import copy, deepcopy from copy import copy, deepcopy
from functools import lru_cache from functools import lru_cache
from typing import List from typing import List
@ -2055,8 +2056,9 @@ class TqMdApi():
""" """
更新行情/委托/账户/持仓 更新行情/委托/账户/持仓
""" """
while self.api.wait_update(): while self.is_connected:
deadline = time() + 5
self.api.wait_update(deadline=deadline)
# 更新行情信息 # 更新行情信息
for vt_symbol, quote in self.quote_objs: for vt_symbol, quote in self.quote_objs:
if self.api.is_changing(quote): if self.api.is_changing(quote):

View File

@ -357,6 +357,10 @@ class RohonGateway(BaseGateway):
return False return False
if not self.td_api.connect_status or self.md_api.connect_status: if not self.td_api.connect_status or self.md_api.connect_status:
if not self.td_api.connect_status:
self.write_error(f'交易服务器连接断开')
if not self.md_api.connect_status:
self.write_error(f'行情服务器连接断开')
return False return False
return True return True
@ -572,7 +576,8 @@ class RohonMdApi(MdApi):
Callback when front server is disconnected. Callback when front server is disconnected.
""" """
self.login_status = False self.login_status = False
self.gateway.write_log(f"行情服务器连接断开,原因{reason}") self.connect_status = False
self.gateway.write_error(f"行情服务器连接断开,原因{reason}")
self.gateway.status.update({'md_con': False, 'md_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) self.gateway.status.update({'md_con': False, 'md_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool): def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool):
@ -690,6 +695,7 @@ class RohonMdApi(MdApi):
self.init() self.init()
self.connect_status = True self.connect_status = True
# If already connected, then login immediately. # If already connected, then login immediately.
elif not self.login_status: elif not self.login_status:
self.login() self.login()
@ -773,8 +779,8 @@ class RohonTdApi(TdApi):
def onFrontDisconnected(self, reason: int): def onFrontDisconnected(self, reason: int):
"""""" """"""
self.login_status = False self.login_status = False
self.gateway.write_log(f"交易服务器连接断开,原因{reason}") self.gateway.write_error(f"交易服务器连接断开,原因{reason}")
self.gateway.status.update({'td_con': True, 'td_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) self.gateway.status.update({'td_con': False, 'td_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool): def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool):
"""""" """"""