[增强功能] 一般更新

This commit is contained in:
msincenselee 2020-07-02 17:28:11 +08:00
parent 08175de6d0
commit 3a086af423
6 changed files with 120 additions and 78 deletions

View File

@ -519,6 +519,8 @@ class AccountRecorder(BaseEngine):
data = event.data
dt = data.get('datetime')
# db.today_strategy_pos.createIndex({'account_id':1,'strategy_group':1,'strategy_name':1,'date':1},{'name':'accountid_strategy_group_strategy_name_date'})
account_id = data.get('accountid')
fld = {
'account_id': account_id,

View File

@ -859,7 +859,8 @@ class CtaEngine(BaseEngine):
vt_symbol: str,
days: int,
interval: Interval,
callback: Callable[[BarData], None]
callback: Callable[[BarData], None],
interval_num: int = 1
):
""""""
symbol, exchange = extract_vt_symbol(vt_symbol)
@ -875,6 +876,7 @@ class CtaEngine(BaseEngine):
symbol=symbol,
exchange=exchange,
interval=interval,
interval_num=interval_num,
start=start,
end=end
)

View File

@ -109,8 +109,15 @@ class CtaEngine(BaseEngine):
"""
super().__init__(main_engine, event_engine, APP_NAME)
# 股票引擎得配置,包括
# "accountid" : "xxxx", 资金账号,一般用于推送消息时附带
# "strategy_group": "cta_strategy_pro", # 当前实例名。多个实例时,区分开
# "trade_2_wx": true # 是否交易记录转发至微信通知
# "event_log: false # 是否转发日志到event bus显示在图形界面
# "snapshot2file": false # 是否保存切片到文件
self.engine_config = {}
# 是否激活 write_log写入event bus(比较耗资源)
self.event_log = False
self.strategy_setting = {} # strategy_name: dict
self.strategy_data = {} # strategy_name: dict
@ -222,8 +229,8 @@ class CtaEngine(BaseEngine):
# 每5分钟检查一次
if dt.minute % 10 == 0:
# 比对仓位,使用上述获取得持仓信息,不用重复获取
#self.compare_pos(strategy_pos_list=copy(all_strategy_pos))
pass
self.compare_pos(strategy_pos_list=copy(all_strategy_pos))
# 推送到事件
self.put_all_strategy_pos_event(all_strategy_pos)
@ -1301,6 +1308,7 @@ class CtaEngine(BaseEngine):
self.write_log(f'{strategy_name}返回得K线切片数据为空')
return
if self.engine_config.get('snapshot2file', False):
# 剩下工作:保存本地文件/数据库
snapshot_folder = get_folder_path(f'data/snapshots/{strategy_name}')
snapshot_file = snapshot_folder.joinpath('{}.pkb2'.format(datetime.now().strftime('%Y%m%d_%H%M%S')))
@ -1681,8 +1689,13 @@ class CtaEngine(BaseEngine):
Load setting file.
"""
# 读取引擎得配置
# "accountid" : "xxxx", 资金账号,一般用于推送消息时附带
# "strategy_group": "cta_strategy_pro", # 当前实例名。多个实例时,区分开
# "trade_2_wx": true # 是否交易记录转发至微信通知
# "event_log: false # 是否转发日志到event bus显示在图形界面
self.engine_config = load_json(self.engine_filename)
# 是否产生event log 日志一般GUI界面才产生而且比好消耗资源)
self.event_log = self.engine_config.get('event_log', False)
# 读取策略得配置
self.strategy_setting = load_json(self.setting_filename)
@ -1754,6 +1767,7 @@ class CtaEngine(BaseEngine):
"""
Create cta engine log event.
"""
if self.event_log:
# 推送至全局CTA_LOG Event
log = LogData(msg=f"{strategy_name}: {msg}" if strategy_name else msg,
gateway_name="CtaStrategy",

View File

@ -565,7 +565,7 @@ class CtaStockTemplate(CtaTemplate):
"""初始化Policy"""
self.write_log(u'init_policy(),初始化执行逻辑')
self.policy.load()
self.write_log('{}'.format(json.dumps(self.policy.to_json(),indent=2, ensure_ascii=True)))
self.write_log('{}'.format(json.dumps(self.policy.to_json(),indent=2, ensure_ascii=False)))
def init_position(self):
"""
@ -637,7 +637,7 @@ class CtaStockTemplate(CtaTemplate):
return pos
def compare_pos(self):
def compare_pos(self,auto_balance=False):
"""比较仓位"""
for vt_symbol, position in self.positions.items():
name = self.cta_engine.get_name(vt_symbol)
@ -752,13 +752,13 @@ class CtaStockTemplate(CtaTemplate):
if len(grid.order_ids) > 0:
self.write_log(f'剩余委托单号:{grid.order_ids}')
"""
# 网格的所有委托单已经执行完毕
if grid.volume <= grid.traded_volume:
grid.order_status = False
if grid.volume < grid.traded_volume:
self.write_error(f'{order.vt_symbol} 已成交总量:{grid.traded_volume}超出{grid.volume}, 更新=>{grid.traded_volume}')
grid.volume = grid.traded_volume
grid.traded_volume = 0
# 卖出完毕sell
if order.direction != Direction.LONG:
@ -774,10 +774,11 @@ class CtaStockTemplate(CtaTemplate):
# 开仓完毕( buy)
else:
grid.open_status = True
grid.traded_volume = 0
grid.open_time = self.cur_datetime
self.write_log(f'买入{order.vt_symbol}完毕,总量:{grid.volume},最后一笔委托价:{order.price}'
+ f',成交:{order.volume}')
"""
self.gt.save()
# 在策略得活动订单中,移除
@ -872,6 +873,7 @@ class CtaStockTemplate(CtaTemplate):
return
remove_gids = []
changed = False
# 多单网格逐一止损/止盈检查:
long_grids = self.gt.get_opened_grids(direction=Direction.LONG)
for lg in long_grids:
@ -896,7 +898,7 @@ class CtaStockTemplate(CtaTemplate):
lg.close_price,
lg.open_price,
lg.volume))
changed = True
if lg.traded_volume > 0:
lg.volume -= lg.traded_volume
lg.traded_volume = 0
@ -910,6 +912,7 @@ class CtaStockTemplate(CtaTemplate):
lg.order_status = True
lg.close_status = True
self.write_log(f'{lg.vt_symbol}[{cn_name}] 数量:{lg.volume},准备卖出')
continue
# 止损
@ -925,7 +928,7 @@ class CtaStockTemplate(CtaTemplate):
lg.stop_price,
lg.open_price,
lg.volume))
changed = True
if lg.traded_volume > 0:
lg.volume -= lg.traded_volume
lg.traded_volume = 0
@ -940,6 +943,7 @@ class CtaStockTemplate(CtaTemplate):
lg.close_status = True
self.write_log(f'{lg.vt_symbol}[{cn_name}] 数量:{lg.volume},准备卖出')
if changed:
if len(remove_gids) > 0:
self.gt.remove_grids_by_ids(direction=Direction.LONG, ids=remove_gids)
self.gt.save()
@ -993,7 +997,7 @@ class CtaStockTemplate(CtaTemplate):
vt_symbol=ordering_grid.vt_symbol,
direction=Direction.NET)
if acc_symbol_pos is None:
self.write_error(u'当前{}持仓查询不到'.format(ordering_grid.vt_symbol))
self.write_error(f'{self.strategy_name}当前{ordering_grid.vt_symbol}持仓查询不到, 无法执行卖出')
return
vt_symbol = ordering_grid.vt_symbol
@ -1038,10 +1042,10 @@ class CtaStockTemplate(CtaTemplate):
order_time=self.cur_datetime,
grid=ordering_grid)
if vt_orderids is None or len(vt_orderids) == 0:
self.write_error(f'委托卖出失败,{vt_symbol} 委托价:{sell_price} 数量:{sell_volume}')
self.write_error(f'{vt_symbol} 委托卖出失败,委托价:{sell_price} 数量:{sell_volume}')
return
else:
self.write_log(f'已委托卖出,{sell_volume},委托价:{sell_price}, 数量:{sell_volume}')
self.write_log(f'{vt_symbol} 已委托卖出,{sell_volume},委托价:{sell_price}, 数量:{sell_volume}')
def tns_finish_sell_grid(self, grid):
@ -1177,7 +1181,7 @@ class CtaStockTemplate(CtaTemplate):
self.write_error(f'委托买入失败,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}')
return
else:
self.write_error(f'{vt_orderids},已委托买入,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}')
self.write_log(f'{self.strategy_name}, {vt_orderids},已委托买入,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}')
def tns_finish_buy_grid(self, grid):
"""
@ -1185,7 +1189,7 @@ class CtaStockTemplate(CtaTemplate):
:return:
"""
self.write_log(u'事务完成买入网格:{},计划数量:{},计划价格:{},实际数量:{}'
.format(grid.type, grid.volume, grid.openPrice, grid.traded_volume))
.format(grid.type, grid.volume, grid.open_price, grid.traded_volume))
if grid.volume != grid.traded_volume:
grid.volume = grid.traded_volume
grid.traded_volume = 0
@ -1276,15 +1280,15 @@ class CtaStockTemplate(CtaTemplate):
name = self.cta_engine.get_name(grid.vt_symbol)
if not grid.open_status and grid.order_status:
opening_info += f'网格{grid.type},买入状态:{name}[{grid.vt_symbol}], [已买入:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n'
opening_info += f'网格{grid.type},买入状态:{name}[{grid.vt_symbol}], 买入价:{grid.open_price} [已买入:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n'
continue
if grid.open_status and not grid.close_status:
holding_info += f'网格{grid.type},持有状态:{name}[{grid.vt_symbol}],[数量:{grid.volume}, 开仓时间:{grid.open_time}]\n'
holding_info += f'网格{grid.type},持有状态:{name}[{grid.vt_symbol}],买入价:{grid.open_price} [数量:{grid.volume}, 开仓时间:{grid.open_time}]\n'
continue
if grid.open_status and grid.close_status:
closing_info += f'网格{grid.type},卖出状态:{name}[{grid.vt_symbol}], [已卖出:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n'
closing_info += f'网格{grid.type},卖出状态:{name}[{grid.vt_symbol}],卖出价:{grid.close_price} [已卖出:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n'
if len(opening_info) > 0:
self.write_log(opening_info)

View File

@ -114,8 +114,15 @@ class CtaEngine(BaseEngine):
:param event_engine: 事件引擎
"""
super().__init__(main_engine, event_engine, APP_NAME)
# 增强策略引擎得特殊参数配置
# "accountid" : "xxxx", 资金账号,一般用于推送消息时附带
# "strategy_group": "cta_strategy_pro", # 当前实例名。多个实例时,区分开
# "trade_2_wx": true # 是否交易记录转发至微信通知
# "event_log: false # 是否转发日志到event bus显示在图形界面
# "snapshot2file": false # 是否保存切片到文件
self.engine_config = {}
# 是否激活 write_log写入event bus(比较耗资源)
self.event_log = False
self.strategy_setting = {} # strategy_name: dict
self.strategy_data = {} # strategy_name: dict
@ -812,8 +819,8 @@ class CtaEngine(BaseEngine):
"""查询价格最小跳动"""
contract = self.main_engine.get_contract(vt_symbol)
if contract is None:
self.write_error(f'查询不到{vt_symbol}合约信息,缺省使用0.1作为价格跳动')
return 0.1
self.write_error(f'查询不到{vt_symbol}合约信息,缺省使用1作为价格跳动')
return 1
return contract.pricetick
@ -1313,6 +1320,7 @@ class CtaEngine(BaseEngine):
self.write_log(f'{strategy_name}返回得K线切片数据为空')
return
if self.engine_config.get('snapshot2file', False):
# 剩下工作:保存本地文件/数据库
snapshot_folder = get_folder_path(f'data/snapshots/{strategy_name}')
snapshot_file = snapshot_folder.joinpath('{}.pkb2'.format(datetime.now().strftime('%Y%m%d_%H%M%S')))
@ -1498,7 +1506,7 @@ class CtaEngine(BaseEngine):
spd_vt_symbol = pos.get('vt_symbol', None)
if spd_vt_symbol is not None and spd_vt_symbol.endswith('SPD'):
spd_symbol, spd_exchange = extract_vt_symbol(spd_vt_symbol)
spd_setting = self.main_engine.get_all_custom_contracts().get(spd_symbol, None)
spd_setting = self.main_engine.get_all_custom_contracts(rtn_setting=True).get(spd_symbol, None)
if spd_setting is None:
self.write_error(u'获取不到:{}得设置信息,检查自定义合约配置文件'.format(spd_symbol))
@ -1511,13 +1519,13 @@ class CtaEngine(BaseEngine):
leg1_pos = {}
leg1_pos.update({'symbol': spd_setting.get('leg1_symbol')})
leg1_pos.update({'vt_symbol': spd_setting.get('leg1_symbol')})
leg1_pos.update({'vt_symbol': '{}.{}'.format(spd_setting.get('leg1_symbol'), spd_setting.get('leg1_exchange'))})
leg1_pos.update({'direction': leg1_direction})
leg1_pos.update({'volume': spd_setting.get('leg1_ratio', 1) * spd_volume})
leg2_pos = {}
leg2_pos.update({'symbol': spd_setting.get('leg2_symbol')})
leg2_pos.update({'vt_symbol': spd_setting.get('leg2_symbol')})
leg2_pos.update({'vt_symbol': '{}.{}'.format(spd_setting.get('leg2_symbol'), spd_setting.get('leg2_exchange'))})
leg2_pos.update({'direction': leg2_direction})
leg2_pos.update({'volume': spd_setting.get('leg2_ratio', 1) * spd_volume})
@ -1649,7 +1657,7 @@ class CtaEngine(BaseEngine):
continue
if holding.exchange == Exchange.SPD:
continue
if '&' in holding.vt_symbol and (holding.vt_symbol.startswith('SP') or holding.vt_symbol.startswith('STG')):
if '&' in holding.vt_symbol and (holding.vt_symbol.startswith('SP') or holding.vt_symbol.startswith('STG') or holding.vt_symbol.startswith('PRT')):
continue
compare_pos[vt_symbol] = OrderedDict(
@ -1672,7 +1680,7 @@ class CtaEngine(BaseEngine):
vt_symbols.add(vt_symbol)
symbol_pos = compare_pos.get(vt_symbol, None)
if symbol_pos is None:
self.write_log(u'账号持仓信息获取不到{},创建一个'.format(vt_symbol))
# self.write_log(u'账号持仓信息获取不到{},创建一个'.format(vt_symbol))
symbol_pos = OrderedDict(
{
"账号空单": 0,
@ -1700,9 +1708,8 @@ class CtaEngine(BaseEngine):
compare_info = ''
for vt_symbol in sorted(vt_symbols):
# 发送不一致得结果
symbol_pos = compare_pos.pop(vt_symbol, None)
if symbol_pos is None:
continue
symbol_pos = compare_pos.pop(vt_symbol, {})
d_long = {
'account_id': self.engine_config.get('accountid', '-'),
'vt_symbol': vt_symbol,
@ -1716,21 +1723,21 @@ class CtaEngine(BaseEngine):
'strategy_list': symbol_pos.get('空单策略', [])}
# 多空都一致
if round(symbol_pos['账号空单'], 7) == round(symbol_pos['策略空单'], 7) and \
round(symbol_pos['账号多单'], 7) == round(symbol_pos['策略多单'], 7):
if round(symbol_pos.get('账号空单',0), 7) == round(symbol_pos.get('策略空单',0), 7) and \
round(symbol_pos.get('账号多单',0), 7) == round(symbol_pos.get('策略多单',0), 7):
msg = u'{}多空都一致.{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False))
self.write_log(msg)
compare_info += msg
else:
pos_compare_result += '\n{}: '.format(vt_symbol)
# 判断是多单不一致?
diff_long_volume = round(symbol_pos['账号多单'], 7) - round(symbol_pos['策略多单'], 7)
diff_long_volume = round(symbol_pos.get('账号多单',0), 7) - round(symbol_pos.get('策略多单',0), 7)
if diff_long_volume != 0:
msg = '{}多单[账号({}), 策略{},共({})], ' \
.format(vt_symbol,
symbol_pos['账号多单'],
symbol_pos['多单策略'],
symbol_pos['策略多单'])
symbol_pos.get('账号多单'),
symbol_pos.get('多单策略'),
symbol_pos.get('策略多单'))
pos_compare_result += msg
self.write_error(u'{}不一致:{}'.format(vt_symbol, msg))
@ -1739,14 +1746,14 @@ class CtaEngine(BaseEngine):
self.balance_pos(vt_symbol, Direction.LONG, diff_long_volume)
# 判断是空单不一致:
diff_short_volume = round(symbol_pos['账号空单'], 7) - round(symbol_pos['策略空单'], 7)
diff_short_volume = round(symbol_pos.get('账号空单',0), 7) - round(symbol_pos.get('策略空单',0), 7)
if diff_short_volume != 0:
msg = '{}空单[账号({}), 策略{},共({})], ' \
.format(vt_symbol,
symbol_pos['账号空单'],
symbol_pos['空单策略'],
symbol_pos['策略空单'])
symbol_pos.get('账号空单'),
symbol_pos.get('空单策略'),
symbol_pos.get('策略空单'))
pos_compare_result += msg
self.write_error(u'{}不一致:{}'.format(vt_symbol, msg))
compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg)
@ -1841,6 +1848,8 @@ class CtaEngine(BaseEngine):
"""
# 读取引擎得配置
self.engine_config = load_json(self.engine_filename)
# 是否产生event log 日志一般GUI界面才产生而且比好消耗资源)
self.event_log = self.engine_config.get('event_log', False)
# 读取策略得配置
self.strategy_setting = load_json(self.setting_filename)
@ -1913,6 +1922,7 @@ class CtaEngine(BaseEngine):
"""
Create cta engine log event.
"""
if self.event_log:
# 推送至全局CTA_LOG Event
log = LogData(msg=f"{strategy_name}: {msg}" if strategy_name else msg,
gateway_name="CtaStrategy",

View File

@ -384,6 +384,7 @@ class CtaTemplate(ABC):
days: int,
interval: Interval = Interval.MINUTE,
callback: Callable = None,
interval_num: int = 1
):
"""
Load historical bar data for initializing strategy.
@ -391,7 +392,7 @@ class CtaTemplate(ABC):
if not callback:
callback = self.on_bar
self.cta_engine.load_bar(self.vt_symbol, days, interval, callback)
self.cta_engine.load_bar(self.vt_symbol, days, interval, callback, interval_num)
def load_tick(self, days: int):
"""
@ -1059,7 +1060,7 @@ class CtaProTemplate(CtaTemplate):
def save_tns(self, tns_data):
"""
保存多空事务记录=csv文件,便于后续分析
:param tns_data:
:param tns_data: {"datetime":xxx, "direction":"long"或者"short", "price":xxx}
:return:
"""
if self.backtesting:
@ -1770,6 +1771,7 @@ class CtaProFutureTemplate(CtaProTemplate):
if self.activate_today_lock:
self.write_log(u'昨仓多单:{},没有今仓,满足条件,直接平昨仓'.format(grid_pos.long_yd))
sell_price = self.cta_engine.get_price(sell_symbol)
if sell_price is None:
self.write_error(f'暂时不能获取{sell_symbol}价格,不能平仓')
@ -1780,6 +1782,10 @@ class CtaProFutureTemplate(CtaProTemplate):
grid.volume -= grid.traded_volume
grid.traded_volume = 0
if grid_pos.long_pos <grid.volume:
self.write_error(f'账号{sell_symbol}多单持仓:{grid_pos.long_pos}不满足平仓:{grid.volume}要求:')
return False
vt_orderids = self.sell(price=sell_price,
volume=grid.volume,
vt_symbol=sell_symbol,
@ -1870,6 +1876,10 @@ class CtaProFutureTemplate(CtaProTemplate):
grid.volume -= grid.traded_volume
grid.traded_volume = 0
if grid_pos.short_pos < grid.volume:
self.write_error(f'账号{cover_symbol}多单持仓:{grid_pos.short_pos}不满足平仓:{grid.volume}要求:')
return False
vt_orderids = self.cover(price=cover_price,
volume=grid.volume,
vt_symbol=cover_symbol,
@ -1997,7 +2007,7 @@ class CtaProFutureTemplate(CtaProTemplate):
target_long_grid = None
remove_long_grid_ids = []
for g in sorted(locked_long_grids, key=lambda grid: grid.volume):
if g.orderStatus or len(g.orderRef) > 0:
if g.order_status or len(g.orderRef) > 0:
continue
if target_long_grid is None:
target_long_grid = g
@ -2133,7 +2143,7 @@ class CtaProFutureTemplate(CtaProTemplate):
vt_symbol = g.snapshot.get('mi_symbol', self.vt_symbol)
volume = g.volume - g.traded_volume
locked_long_dict.update({vt_symbol: locked_long_dict.get(vt_symbol, 0) + volume})
if g.orderStatus or g.order_ids:
if g.order_status or g.order_ids:
self.write_log(u'当前对锁格:{}存在委托,不进行解锁'.format(g.to_json()))
return
@ -2150,7 +2160,7 @@ class CtaProFutureTemplate(CtaProTemplate):
vt_symbol = g.snapshot.get('mi_symbol', self.vt_symbol)
volume = g.volume - g.traded_volume
locked_short_dict.update({vt_symbol: locked_short_dict.get(vt_symbol, 0) + volume})
if g.orderStatus or g.order_ids:
if g.order_status or g.order_ids:
self.write_log(u'当前对锁格:{}存在委托,不进行解锁'.format(g.to_json()))
return
@ -2238,7 +2248,7 @@ class CtaProFutureTemplate(CtaProTemplate):
for g in long_grids:
# 满足离场条件,或者碰到止损价格
if g.stop_price > 0 and g.stop_price > self.cur_99_price \
and g.openStatus and not g.orderStatus:
and g.open_status and not g.order_status:
dist_record = dict()
dist_record['datetime'] = self.cur_datetime
dist_record['symbol'] = self.idx_symbol
@ -2262,7 +2272,7 @@ class CtaProFutureTemplate(CtaProTemplate):
short_grids = self.gt.get_opened_grids_without_types(direction=Direction.SHORT, types=[LOCK_GRID])
for g in short_grids:
if g.stop_price > 0 and g.stop_price < self.cur_99_price \
and g.openStatus and not g.orderStatus:
and g.open_status and not g.order_status:
dist_record = dict()
dist_record['datetime'] = self.cur_datetime
dist_record['symbol'] = self.idx_symbol