[update] 期权轧差、网关拒单风控

This commit is contained in:
msincenselee 2021-12-28 13:23:06 +08:00
parent cba14c9b94
commit 55b207f5db
8 changed files with 145 additions and 44 deletions

View File

@ -182,6 +182,7 @@ class CtaOptionEngine(BaseEngine):
self.vt_tradeids = set() # for filtering duplicate trade
self.active_orders = {}
self.internal_orderids = set()
self.single_execute_volume = 1
self.net_pos_target = {} # 净仓目标, vt_symbol: {pos: 正负数}
self.net_pos_holding = {} # 净仓持有, vt_symbol: {pos: 正负数}
@ -222,6 +223,7 @@ class CtaOptionEngine(BaseEngine):
def init_engine(self):
"""
初始化引擎
"""
self.register_event()
self.register_funcs()
@ -234,6 +236,10 @@ class CtaOptionEngine(BaseEngine):
self.write_log("CTA策略引擎初始化成功")
if self.engine_config.get('single_execute_volume',0) > 0:
self.single_execute_volume = self.engine_config.get('single_execute_volume',1)
self.write_log(f'使用配置得单笔下仓数量:{self.single_execute_volume}')
if self.engine_config.get('get_pos_from_db', False):
self.write_log(f'激活数据库策略仓位比对模式')
self.init_mongo_data()
@ -318,6 +324,7 @@ class CtaOptionEngine(BaseEngine):
if dt.hour == 2 and dt.minute == 59 and dt.second >= 55:
self.cancel_all(strategy)
# 每分钟执行的逻辑
if self.last_minute != dt.minute:
self.last_minute = dt.minute
@ -338,9 +345,9 @@ class CtaOptionEngine(BaseEngine):
(datetime.now() - order.datetime).total_seconds() > 60 and \
order.status in [Status.NOTTRADED, Status.PARTTRADED]:
self.write_log(
f'内部活动订单{order.orderid}, {order.vt_symbol}[{order.name}], {order.direction.value}, {order.offset.value}超时')
f'内部活动订单{order.orderid}, {order.vt_symbol}[{order.name}], {order.direction.value}, {order.offset.value},超时.发出撤单')
req = order.create_cancel_request()
return self.main_engine.cancel_order(req, order.gateway_name)
self.main_engine.cancel_order(req, order.gateway_name)
for vt_symbol in set(self.net_pos_target.keys()).union(set(self.net_pos_holding.keys())):
self.execute_pos_target(vt_symbol)
@ -394,20 +401,28 @@ class CtaOptionEngine(BaseEngine):
self.call_strategy_func(strategy, strategy.on_bar, {bar.vt_symbol: bar})
def process_order_event(self, event: Event):
""""""
"""
委托更新事件处理
:param event:
:return:
"""
order = event.data
strategy = self.orderid_strategy_map.get(order.vt_orderid, None)
if not strategy:
# self.write_log(f'委托单没有对应的策略设置:order:{order.__dict__}')
if order.vt_orderid in self.internal_orderids:
self.write_log(f'委托更新 => 内部仓位: {print_dict(order.__dict__)}')
# self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}')
if order.type != OrderType.STOP:
if order.status in [Status.ALLTRADED, Status.CANCELLED, Status.REJECTED]:
self.write_log(f'委托更新 => 内部仓位 => 移除活动订单')
self.active_orders.pop(order.vt_orderid, None)
elif order.status in [Status.SUBMITTING, Status.NOTTRADED, Status.PARTTRADED]:
self.write_log(f'委托更新 => 内部仓位 => 更新活动订单')
self.active_orders.update({order.vt_orderid: copy(order)})
else:
self.write_log(f'委托更新 => 系统账号 => {print_dict(order.__dict__)}')
return
self.write_log(f'委托更新:{order.vt_orderid} => 策略:{strategy.strategy_name}')
# Remove vt_orderid if order is no longer active.
@ -434,33 +449,44 @@ class CtaOptionEngine(BaseEngine):
self.call_strategy_func(strategy, strategy.on_order, order)
def process_trade_event(self, event: Event):
""""""
"""
成交更新事件处理
:param event:
:return:
"""
trade = event.data
# Filter duplicate trade push
if trade.vt_tradeid in self.vt_tradeids:
self.write_log(f'成交单的交易编号{trade.vt_tradeid}已处理完毕,不再处理')
self.write_log(f'成交更新 => 交易编号{trade.vt_tradeid}已处理完毕,不再处理')
return
self.vt_tradeids.add(trade.vt_tradeid)
strategy = self.orderid_strategy_map.get(trade.vt_orderid, None)
# 该成交得单子,不属于策略,可能是内部,或者其他实例得成交
if not strategy:
# 属于内部单子
if trade.vt_orderid in self.internal_orderids:
cur_pos = self.net_pos_holding.get(trade.vt_symbol,0)
cur_pos = self.net_pos_holding.get(trade.vt_symbol, 0)
if trade.direction == Direction.LONG:
new_pos = cur_pos + trade.volume
else:
new_pos = cur_pos - trade.volume
self.write_log(f'内部委托单成交更新,{trade.vt_symbol}[{trade.name}]: {cur_pos} => {new_pos}')
self.write_log(f'成交更新 => 内部订单 {trade.vt_symbol}[{trade.name}]: {cur_pos} => {new_pos}')
self.write_log(f'成交单:trade:{print_dict(trade.__dict__)}')
self.net_pos_holding.update({trade.vt_symbol: new_pos})
self.save_internal_data()
# 可能是其他实例得
else:
self.write_log(f'成交单没有对应的策略设置:trade:{trade.__dict__}')
self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}')
self.write_log(f'成交更新 => 没有对应的策略设置:trade:{trade.__dict__}')
self.write_log(f'成交更新 => 当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}')
self.write_log(f'成交更新 => 当前内部订单清单:{self.internal_orderids}')
return
self.write_log(f'成交更新:{trade.vt_orderid} => 策略:{strategy.strategy_name}')
self.write_log(f'成交更新 =>:{trade.vt_orderid} => 策略:{strategy.strategy_name}')
# Update strategy pos before calling on_trade method
# 取消外部干预策略pos由策略自行完成更新
@ -885,7 +911,7 @@ class CtaOptionEngine(BaseEngine):
for vt_orderid in copy(vt_orderids):
self.cancel_order(strategy_name, vt_orderid)
def handel_internal_order(self, **kwargs) -> str:
def handel_internal_order(self, **kwargs):
"""
处理内部订单
策略 => 内部订单 => 产生内部订单号 => 登记内部处理逻辑 => 添加后续异步task
@ -996,9 +1022,9 @@ class CtaOptionEngine(BaseEngine):
j = load_json(f_name,auto_save=True)
self.net_pos_target = j.get('net_pos_target', {})
self.net_pos_holding = j.get('net_pos_holding', {})
self.write_log('恢复内部持仓目标{}'.format(
self.write_log('恢复内部目标持仓:{}'.format(
';'.join([f'{k}[{self.get_name(k)}]:{v}' for k,v in self.net_pos_target.items()])))
self.write_log('恢复内部持仓:{}'.format(
self.write_log('恢复内部现有持仓:{}'.format(
';'.join([f'{k}[{self.get_name(k)}]:{v}' for k, v in self.net_pos_target.items()])))
except Exception as ex:
@ -1025,10 +1051,10 @@ class CtaOptionEngine(BaseEngine):
:param vt_symbol:
:return:
"""
target_pos = self.net_pos_target.get(vt_symbol, 0)
holding_pos = self.net_pos_holding.get(vt_symbol, 0)
target_pos = self.net_pos_target.get(vt_symbol, 0) # 该合约内部得目标持仓
holding_pos = self.net_pos_holding.get(vt_symbol, 0) # 该合约内部得现有持仓
diff_pos = target_pos - holding_pos
diff_pos = target_pos - holding_pos # 找出差异
if diff_pos == 0:
return
# 获取最新价
@ -1051,26 +1077,29 @@ class CtaOptionEngine(BaseEngine):
return
price_tick = self.get_price_tick(vt_symbol)
# 需要买入
# 需要增加仓位( buy or cover)
if diff_pos > 0:
# 账号得多、空
# 账号得多、空仓位
acc_long_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.LONG)
acc_long_pos = 0 if acc_long_position is None else acc_long_position.volume-acc_long_position.frozen
acc_short_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.SHORT)
acc_short_pos = 0 if acc_short_position is None else acc_short_position.volume-acc_short_position.frozen
cover_pos = 0
buy_pos = 0
if diff_pos > self.single_execute_volume:
self.write_log(f'内部仓位 => 执行{vt_symbol} => 降低交易头寸: {diff_pos} -> {self.single_execute_volume}')
diff_pos = self.single_execute_volume
# 仅平仓
if acc_short_pos > 0:
# 优先平空单
cover_pos = min(diff_pos,acc_short_pos)
cover_pos = min(diff_pos, acc_short_pos)
buy_pos = diff_pos - cover_pos
else:
# 仅开仓
cover_pos = 0
buy_pos = diff_pos
self.write_log(f'{self.engine_name}仓位执行{vt_symbol}[{self.get_name(vt_symbol)}]: ' +
self.write_log(f'内部仓位 => 执行{vt_symbol}[{self.get_name(vt_symbol)}]: ' +
f'[账号多单:{acc_long_pos},空单:{acc_short_pos}]' +
f'[holding:{holding_pos} =>target:{target_pos} ] => cover:{cover_pos} + buy:{buy_pos}')
if cover_pos > 0:
@ -1087,6 +1116,7 @@ class CtaOptionEngine(BaseEngine):
internal=False
)
if len(vt_orderids) > 0:
self.write_log(f'内部仓位 => 执行 => cover 登记委托编号:{vt_orderids}')
self.internal_orderids =self.internal_orderids.union(vt_orderids)
if buy_pos > 0:
@ -1103,16 +1133,21 @@ class CtaOptionEngine(BaseEngine):
internal=False
)
if len(vt_orderids) > 0:
self.write_log(f'内部仓位 => 执行 => buy 登记委托编号:{vt_orderids}')
self.internal_orderids= self.internal_orderids.union(vt_orderids)
# 需要卖出 ( diff_pos < 0)
else:
# 账号得多、空
# 账号得多、空
acc_long_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.LONG)
acc_long_pos = 0 if acc_long_position is None else acc_long_position.volume - acc_long_position.frozen
acc_short_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.SHORT)
acc_short_pos = 0 if acc_short_position is None else acc_short_position.volume - acc_short_position.frozen
# 如果持有多单,优先平掉多单
if abs(diff_pos) > self.single_execute_volume:
self.write_log(f'内部仓位 => 执行{vt_symbol} => 降低交易头寸: {abs(diff_pos)} -> {self.single_execute_volume}')
diff_pos = -self.single_execute_volume
# 如果账号持有多单,优先平掉账号多单
if acc_long_pos > 0:
sell_pos = min(abs(diff_pos), acc_long_pos)
short_pos = abs(diff_pos) - sell_pos
@ -1121,7 +1156,7 @@ class CtaOptionEngine(BaseEngine):
sell_pos = 0
short_pos = abs(diff_pos)
self.write_log(f'{self.engine_name}仓位执行{vt_symbol}[{self.get_name(vt_symbol)}]' +
self.write_log(f'内部仓位 => 执行{vt_symbol}[{self.get_name(vt_symbol)}]' +
f'[账号多单:{acc_long_pos},空单:{acc_short_pos}]' +
f'[holding:{holding_pos} => target:{target_pos}] => sell:{sell_pos}, short:{short_pos}')
@ -1139,6 +1174,7 @@ class CtaOptionEngine(BaseEngine):
internal=False
)
if len(vt_orderids) > 0:
self.write_log(f'内部仓位 => 执行 => sell 登记委托编号:{vt_orderids}')
self.internal_orderids = self.internal_orderids.union(vt_orderids)
if short_pos > 0:
if not self.exist_order(vt_symbol, direction=Direction.SHORT, offset=Offset.OPEN):
@ -1147,13 +1183,14 @@ class CtaOptionEngine(BaseEngine):
vt_symbol=vt_symbol,
price=cur_price,
volume=short_pos,
direction=Direction.LONG,
direction=Direction.SHORT,
offset=Offset.OPEN,
order_type=OrderType.LIMIT,
stop=False,
internal=False
)
if len(vt_orderids) > 0:
self.write_log(f'内部仓位 => 执行 => short 登记委托编号:{vt_orderids}')
self.internal_orderids = self.internal_orderids.union(vt_orderids)
def exist_order(self, vt_symbol, direction, offset):
@ -1174,7 +1211,11 @@ class CtaOptionEngine(BaseEngine):
continue
if order.vt_symbol == vt_symbol and order.direction == direction and order.offset == offset:
self.write_log(f'引擎存在相同得内部活动订单:{order.name}')
self.write_log(f'引擎存在相同的内部活动订单:{order.name}')
return True
if order.vt_symbol == vt_symbol and order.direction != direction and order.offset != offset:
self.write_log(f'引擎存在可能自成交的内部活动订单:{order.name}')
return True
return False

View File

@ -415,7 +415,7 @@ class BackTestingEngine(object):
@lru_cache()
def get_slippage(self, vt_symbol: str):
"""获取滑点"""
"""获取滑点点数"""
return self.slippage.get(vt_symbol, 0)
def set_size(self, vt_symbol: str, size: int):

View File

@ -226,7 +226,7 @@ class CtaEngine(BaseEngine):
# 触发每个策略的定时接口
for strategy in list(self.strategies.values()):
strategy.on_timer()
self.call_strategy_func(strategy, strategy.on_timer)
if not strategy.trading:
all_trading = False

View File

@ -93,12 +93,12 @@ class CtaLineBar(object):
lineMSetting = {}
lineMSetting['name'] = u'M1'
lineMSetting['interval'] = Interval.MINUTE
lineMSetting['bar_interval'] = 60 # 1分钟对应60秒
lineMSetting['bar_interval'] = 1 # 1分钟对应60秒
lineMSetting['para_ema1_len'] = 7 # EMA线1的周期
lineMSetting['para_ema2_len'] = 21 # EMA线2的周期
lineMSetting['para_boll_len'] = 20 # 布林特线周期
lineMSetting['para_boll_std_rate'] = 2 # 布林特线标准差
lineMSetting['price_tick'] = self.price_tick # 最小
lineMSetting['price_tick'] = self.price_tick # 最小
lineMSetting['underlying_symbol'] = self.underlying_symbol #商品短号
self.lineM = CtaLineBar(self, self.onBar, lineMSetting)
@ -5876,6 +5876,47 @@ class CtaLineBar(object):
return False
def is_duan_divergence(self, direction, user_macd=False):
"""
判断 两个线段是否背驰
:param direction: 1-1 或者 Direction.LONG判断是否顶背离, Direction.SHORT判断是否底背离
:param user_macd:
:return:
"""
if isinstance(direction, Direction):
direction = 1 if direction == Direction.LONG else -1
if self.tre_duan is None:
return False
# 获取对比的两个线段
if self.cur_duan.direction == direction:
cur_duan = self.cur_duan
tre_duan = self.tre_duan
else:
if len(self.duan_list) < 4:
return False
cur_duan = self.duan_list[-2]
tre_duan = self.duan_list[-4]
# 判断dif值是否背驰
is_dif_div = False
if user_macd:
cur_dif = self.get_dif_by_dt(cur_duan.end)
tre_dif = self.get_dif_by_dt(tre_duan.end)
if (cur_dif > tre_dif and direction == -1) or (cur_dif < tre_dif and direction == 1):
is_dif_div = True
else:
is_dif_div = True
# 判断 高度斜率dif值
if cur_duan.height <= tre_duan.height and cur_duan.atan < tre_duan.atan and is_dif_div:
if (cur_duan.low < tre_duan.low and cur_duan.high < tre_duan.high and direction == -1) \
or (cur_duan.high > tre_duan.high and cur_duan.low > tre_duan.high and direction == 1):
return True
return False
def is_fx_macd_divergence(self, direction, cur_duan=None, use_macd=False):
"""
分型的macd背离
@ -6267,7 +6308,7 @@ class CtaLineBar(object):
'end': self.cur_bi.end,
'price': price,
'signal': signal})
if len(xt_signals) > 200:
if len(xt_signals) > 20:
del xt_signals[0]
if cur_signal is not None and self.export_xt_filename:
self.append_data(
@ -6295,6 +6336,9 @@ class CtaLineBar(object):
'end': self.cur_bi.end,
'price': price,
'signal': qsbc_2nd})
if len(self.xt_2nd_signals) > 20:
del self.xt_2nd_signals[0]
if cur_signal is not None and self.export_xt_filename:
self.append_data(
file_name=self.export_xt_filename.replace('_n_', f'_2nd_'),
@ -6309,7 +6353,7 @@ class CtaLineBar(object):
"""
获取n笔形态/信号的倒x笔结果
:param n:
:param x: 倒x笔如倒1笔
:param x: 倒x笔如倒1笔, 倒0笔
:return: {}
"""
xt_signals = getattr(self, xt_name)

View File

@ -259,7 +259,6 @@ class BinanceFutureData(RestClient):
contracts = load_json(f, auto_save=False)
return contracts
def save_contracts(self):
"""保存合约配置"""
contracts = self.get_contracts()

View File

@ -392,6 +392,11 @@ class PbGateway(BaseGateway):
def send_order(self, req: OrderRequest) -> str:
""""""
k = f'{req.vt_symbol}_{req.direction.value}_{req.offset.value}'
if len(self.rejected_orders.get(k, [])) > 5:
self.write_error(f'该合约相同请求已经被拒单五次,不能再发单:{print_dict(req.__dict__)}')
return ""
return self.td_api.send_order(req)
def cancel_order(self, req: CancelRequest) -> None:

View File

@ -32,7 +32,8 @@ from .object import (
CancelRequest,
SubscribeRequest,
HistoryRequest,
Exchange
Exchange,
Status
)
from vnpy.trader.utility import get_folder_path, round_to, get_underlying_symbol, get_real_symbol_by_exchange
@ -103,6 +104,8 @@ class BaseGateway(ABC):
self.query_functions = []
self.rejected_orders = {} # 当日被拒单得订单, vt_symbol_direction_offset:[orders]
def create_logger(self):
"""
创建engine独有的日志
@ -155,6 +158,13 @@ class BaseGateway(ABC):
Order event push.
Order event of a specific vt_orderid is also pushed.
"""
# 如果是拒单,进行登记
if order.status == Status.REJECTED:
k = f'{order.vt_symbol}_{order.direction.value}_{order.offset.value}'
orders = self.rejected_orders.get(k,[])
orders.append(deepcopy(order))
self.rejected_orders.update({k:orders})
self.on_event(EVENT_ORDER, order)
# self.on_event(EVENT_ORDER + order.vt_orderid, order)
@ -782,6 +792,8 @@ class LocalOrderManager:
Keep an order buf before pushing it to gateway.
"""
self.orders[order.orderid] = copy(order)
self.gateway.on_order(order)
def cancel_order(self, req: CancelRequest) -> None:

View File

@ -95,7 +95,7 @@ def get_stock_exchange(code, vn=True):
market_id = 0 # 缺省深圳
code = str(code)
if code[0] in ['5', '6', '9'] or code[:3] in ["009", "126", "110", "201", "202", "203", "204"]:
if code[0] in ['5', '6', '9'] or code[:3] in ["009", "126", "110", "201", "202", "203", "204","113"]:
market_id = 1 # 上海
try:
from vnpy.trader.constant import Exchange