[增强功能] 增加定时检查策略是否停止

This commit is contained in:
msincenselee 2020-06-15 16:45:33 +08:00
parent 42e952b2cd
commit 3c503d5958
4 changed files with 43 additions and 15 deletions

View File

@ -12,6 +12,7 @@ from .template import (
Status, Status,
Color, Color,
Interval, Interval,
OrderType,
TickData, TickData,
BarData, BarData,
TradeData, TradeData,

View File

@ -1627,7 +1627,7 @@ class BackTestingEngine(object):
# 计算每个策略实例的持仓盈亏 # 计算每个策略实例的持仓盈亏
strategy_pnl.update({longpos.strategy_name: strategy_pnl.get(longpos.strategy_name, 0) + holding_profit}) strategy_pnl.update({longpos.strategy_name: strategy_pnl.get(longpos.strategy_name, 0) + holding_profit})
positionMsg += "{},long,p={},v={},m={};".format(symbol, longpos.price, longpos.volume, holding_profit) positionMsg += "\n[{},long,p={},v={},m={}]".format(symbol, longpos.price, longpos.volume, round(holding_profit,2))
data['net'] = c + today_holding_profit # 当日净值(含持仓盈亏) data['net'] = c + today_holding_profit # 当日净值(含持仓盈亏)
data['rate'] = (c + today_holding_profit) / self.init_capital data['rate'] = (c + today_holding_profit) / self.init_capital

View File

@ -144,7 +144,9 @@ class CtaEngine(BaseEngine):
self.positions = {} self.positions = {}
self.last_minute = None self.last_minute = datetime.now().strftime('%H%M')
self.health_status = {}
def init_engine(self): def init_engine(self):
""" """
@ -203,19 +205,17 @@ class CtaEngine(BaseEngine):
def process_timer_event(self, event: Event): def process_timer_event(self, event: Event):
""" 处理定时器事件""" """ 处理定时器事件"""
all_trading = True
# 触发每个策略的定时接口
for strategy in list(self.strategies.values()):
strategy.on_timer()
if not strategy.trading:
all_trading = False
dt = datetime.now() dt = datetime.now()
# 每分钟执行的逻辑 # 每分钟执行的逻辑
if self.last_minute != dt.minute: if self.last_minute != dt.strftime('%H%M'):
self.last_minute = dt.minute self.last_minute = dt.strftime('%H%M')
if all_trading: # 检测策略的交易情况
self.health_check()
# 在国内股市开盘期间做检查
if '0930' < self.last_minute < '1530':
# 主动获取所有策略得持仓信息 # 主动获取所有策略得持仓信息
all_strategy_pos = self.get_all_strategy_pos() all_strategy_pos = self.get_all_strategy_pos()
@ -724,6 +724,9 @@ class CtaEngine(BaseEngine):
def cancel_order(self, strategy: CtaTemplate, vt_orderid: str): def cancel_order(self, strategy: CtaTemplate, vt_orderid: str):
""" """
""" """
if vt_orderid is None or len(vt_orderid) == 0:
return False
if vt_orderid.startswith(STOPORDER_PREFIX): if vt_orderid.startswith(STOPORDER_PREFIX):
return self.cancel_local_stop_order(strategy, vt_orderid) return self.cancel_local_stop_order(strategy, vt_orderid)
else: else:
@ -1631,6 +1634,29 @@ class CtaEngine(BaseEngine):
self.write_log(u'账户持仓与策略一致') self.write_log(u'账户持仓与策略一致')
return True, compare_info return True, compare_info
def health_check(self):
"""策略健康检查"""
for strategy_name in list(self.strategies.keys()):
# 获取策略
strategy = self.strategies.get(strategy_name)
if not strategy:
continue
health_count = self.health_status.get(strategy_name, 0)
# 取消计数器
if health_count > 0 and strategy.trading:
self.health_status.pop(strategy_name, None)
continue
# 增加计数器,如果超时十次,发出告警信息
if not strategy.trading:
health_count += 1
if health_count > 10:
self.send_wechat(f'{strategy_name}交易状态停止超过10次检查周期,请检查')
health_count = 0
self.health_status.update({strategy_name: health_count})
def init_all_strategies(self): def init_all_strategies(self):
""" """
""" """

View File

@ -434,7 +434,7 @@ class CtaStockTemplate(CtaTemplate):
self.klines = {} # K线组件字典: kline_name: kline self.klines = {} # K线组件字典: kline_name: kline
self.positions = {} # 策略内持仓记录, vt_symbol: PositionData self.positions = {} # 策略内持仓记录, vt_symbol: PositionData
self.order_type = OrderType.LIMIT self.order_type = OrderType.LIMIT
self.cancel_seconds = 10 # 撤单时间(秒) self.cancel_seconds = 120 # 撤单时间(秒)
# 资金相关 # 资金相关
self.max_invest_rate = 0.1 # 最大仓位(0~1) self.max_invest_rate = 0.1 # 最大仓位(0~1)
@ -781,6 +781,7 @@ class CtaStockTemplate(CtaTemplate):
self.gt.save() self.gt.save()
# 在策略得活动订单中,移除 # 在策略得活动订单中,移除
self.write_log(f'移除活动订单:{order.vt_orderid}')
self.active_orders.pop(order.vt_orderid, None) self.active_orders.pop(order.vt_orderid, None)
def on_order_open_canceled(self, order: OrderData): def on_order_open_canceled(self, order: OrderData):
@ -1142,7 +1143,7 @@ class CtaStockTemplate(CtaTemplate):
buy_volume = max_buy_volume buy_volume = max_buy_volume
# 实盘运行时,要加入市场买卖量的判断 # 实盘运行时,要加入市场买卖量的判断
if not self.backtesting: if not self.backtesting and 'market' in ordering_grid.snapshot:
symbol_tick = self.cta_engine.get_tick(vt_symbol) symbol_tick = self.cta_engine.get_tick(vt_symbol)
# 根据市场计算前5档买单数量 # 根据市场计算前5档买单数量
if all([symbol_tick.ask_volume_1, symbol_tick.ask_volume_2, symbol_tick.ask_volume_3, if all([symbol_tick.ask_volume_1, symbol_tick.ask_volume_2, symbol_tick.ask_volume_3,
@ -1156,7 +1157,7 @@ class CtaStockTemplate(CtaTemplate):
buy_volume = min(market_bid_volumes / 4, market_ask_volumes / 4, buy_volume) buy_volume = min(market_bid_volumes / 4, market_ask_volumes / 4, buy_volume)
buy_volume = max(buy_volume - buy_volume % min_trade_volume, min_trade_volume) buy_volume = max(buy_volume - buy_volume % min_trade_volume, min_trade_volume)
buy_price = cur_price + self.cta_engine.get_price_tick(vt_symbol) buy_price = cur_price + self.cta_engine.get_price_tick(vt_symbol) * 10
vt_orderids = self.buy( vt_orderids = self.buy(
vt_symbol=vt_symbol, vt_symbol=vt_symbol,
@ -1168,7 +1169,7 @@ class CtaStockTemplate(CtaTemplate):
self.write_error(f'委托买入失败,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}') self.write_error(f'委托买入失败,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}')
return return
else: else:
self.write_error(f'已委托买入,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}') self.write_error(f'{vt_orderids},已委托买入,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}')
def tns_finish_buy_grid(self, grid): def tns_finish_buy_grid(self, grid):
""" """