[update] CTA期权引擎支持账号基本轧差;一般更新

This commit is contained in:
msincenselee 2021-12-15 13:59:01 +08:00
parent 7070140197
commit cba14c9b94
7 changed files with 513 additions and 112 deletions

View File

@ -26,6 +26,8 @@ from uuid import uuid1
from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.object import (
OrderData,
TradeData,
OrderRequest,
SubscribeRequest,
LogData,
@ -65,7 +67,8 @@ from vnpy.trader.utility import (
get_underlying_symbol,
append_data,
import_module_by_str,
get_csv_last_dt)
print_dict,
get_csv_last_dt)
from vnpy.trader.util_logger import setup_logger, logging
from vnpy.trader.util_wechat import send_wx_msg
@ -177,6 +180,12 @@ class CtaOptionEngine(BaseEngine):
self.thread_tasks = []
self.vt_tradeids = set() # for filtering duplicate trade
self.active_orders = {}
self.internal_orderids = set()
self.net_pos_target = {} # 净仓目标, vt_symbol: {pos: 正负数}
self.net_pos_holding = {} # 净仓持有, vt_symbol: {pos: 正负数}
self.int_orderid_count = 1
self.last_minute = None
self.symbol_bar_dict = {} # vt_symbol: bar(一分钟bar)
@ -217,12 +226,15 @@ class CtaOptionEngine(BaseEngine):
self.register_event()
self.register_funcs()
# 恢复内部数据
self.load_internal_data()
self.load_strategy_class()
self.load_strategy_setting()
self.write_log("CTA策略引擎初始化成功")
if self.engine_config.get('get_pos_from_db',False):
if self.engine_config.get('get_pos_from_db', False):
self.write_log(f'激活数据库策略仓位比对模式')
self.init_mongo_data()
@ -296,7 +308,9 @@ class CtaOptionEngine(BaseEngine):
# 触发每个策略的定时接口
for strategy in list(self.strategies.values()):
strategy.on_timer()
if strategy and strategy.inited:
self.call_strategy_func(strategy, strategy.on_timer)
if not strategy.trading:
all_trading = False
@ -308,6 +322,32 @@ class CtaOptionEngine(BaseEngine):
if self.last_minute != dt.minute:
self.last_minute = dt.minute
# 内部订单超时处理
for vt_orderid in list(self.active_orders.keys()):
if vt_orderid not in self.internal_orderids:
self.write_log(f'{vt_orderid}不在内部活动订单中,不撤单')
continue
order = self.active_orders.get(vt_orderid, None)
if order is None:
self.write_error(f'找不到内部活动订单,不撤单')
continue
# 检查超时
if order.datetime and \
(datetime.now() - order.datetime).total_seconds() > 60 and \
order.status in [Status.NOTTRADED, Status.PARTTRADED]:
self.write_log(
f'内部活动订单{order.orderid}, {order.vt_symbol}[{order.name}], {order.direction.value}, {order.offset.value}超时')
req = order.create_cancel_request()
return self.main_engine.cancel_order(req, order.gateway_name)
for vt_symbol in set(self.net_pos_target.keys()).union(set(self.net_pos_holding.keys())):
self.execute_pos_target(vt_symbol)
# 保存内部数据
self.save_internal_data()
if all_trading:
# 主动获取所有策略得持仓信息
all_strategy_pos = self.get_all_strategy_pos()
@ -320,12 +360,6 @@ class CtaOptionEngine(BaseEngine):
# 推送到事件
self.put_all_strategy_pos_event(all_strategy_pos)
for strategy_name in list(self.strategies.keys()):
strategy = self.strategies.get(strategy_name, None)
if strategy and strategy.inited:
self.call_strategy_func(strategy, strategy.on_timer)
def process_tick_event(self, event: Event):
"""处理tick到达事件"""
tick = event.data
@ -344,7 +378,7 @@ class CtaOptionEngine(BaseEngine):
for strategy in strategies:
if strategy.inited:
self.call_strategy_func(strategy, strategy.on_tick, {tick.vt_symbol:tick})
self.call_strategy_func(strategy, strategy.on_tick, {tick.vt_symbol: tick})
def process_bar_event(self, event: Event):
"""处理bar到达事件"""
@ -365,8 +399,15 @@ class CtaOptionEngine(BaseEngine):
strategy = self.orderid_strategy_map.get(order.vt_orderid, None)
if not strategy:
self.write_log(f'委托单没有对应的策略设置:order:{order.__dict__}')
self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}')
# self.write_log(f'委托单没有对应的策略设置:order:{order.__dict__}')
# self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}')
if order.type != OrderType.STOP:
if order.status in [Status.ALLTRADED, Status.CANCELLED, Status.REJECTED]:
self.active_orders.pop(order.vt_orderid, None)
elif order.status in [Status.SUBMITTING, Status.NOTTRADED, Status.PARTTRADED]:
self.active_orders.update({order.vt_orderid: copy(order)})
return
self.write_log(f'委托更新:{order.vt_orderid} => 策略:{strategy.strategy_name}')
# Remove vt_orderid if order is no longer active.
@ -404,8 +445,19 @@ class CtaOptionEngine(BaseEngine):
strategy = self.orderid_strategy_map.get(trade.vt_orderid, None)
if not strategy:
self.write_log(f'成交单没有对应的策略设置:trade:{trade.__dict__}')
self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}')
if trade.vt_orderid in self.internal_orderids:
cur_pos = self.net_pos_holding.get(trade.vt_symbol,0)
if trade.direction == Direction.LONG:
new_pos = cur_pos + trade.volume
else:
new_pos = cur_pos - trade.volume
self.write_log(f'内部委托单成交更新,{trade.vt_symbol}[{trade.name}]: {cur_pos} => {new_pos}')
self.write_log(f'成交单:trade:{print_dict(trade.__dict__)}')
self.net_pos_holding.update({trade.vt_symbol: new_pos})
self.save_internal_data()
else:
self.write_log(f'成交单没有对应的策略设置:trade:{trade.__dict__}')
self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}')
return
self.write_log(f'成交更新:{trade.vt_orderid} => 策略:{strategy.strategy_name}')
@ -418,7 +470,8 @@ class CtaOptionEngine(BaseEngine):
# strategy.pos -= trade.volume
# 根据策略名称,写入 data\straetgy_name_trade.csv文件
strategy_name = getattr(strategy, 'strategy_name')
trade_fields = ['datetime', 'symbol', 'exchange', 'vt_symbol', 'name','tradeid', 'vt_tradeid', 'orderid', 'vt_orderid',
trade_fields = ['datetime', 'symbol', 'exchange', 'vt_symbol', 'name', 'tradeid', 'vt_tradeid', 'orderid',
'vt_orderid',
'direction', 'offset', 'price', 'volume']
trade_dict = OrderedDict()
try:
@ -554,7 +607,7 @@ class CtaOptionEngine(BaseEngine):
def send_server_order(
self,
strategy: CtaTemplate,
strategy_name: str,
contract: ContractData,
direction: Direction,
offset: Offset,
@ -575,7 +628,7 @@ class CtaOptionEngine(BaseEngine):
type=type,
price=price,
volume=volume,
strategy_name=strategy.strategy_name
strategy_name=strategy_name
)
# 如果没有指定网关,则使用合约信息内的网关
@ -599,14 +652,16 @@ class CtaOptionEngine(BaseEngine):
vt_orderids.append(vt_orderid)
# Save relationship between orderid and strategy.
self.orderid_strategy_map[vt_orderid] = strategy
self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid)
strategy = self.strategies.get(strategy_name, None)
if strategy:
self.orderid_strategy_map[vt_orderid] = strategy
self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid)
return vt_orderids
def send_limit_order(
self,
strategy: CtaTemplate,
strategy_name: str,
contract: ContractData,
direction: Direction,
offset: Offset,
@ -618,7 +673,7 @@ class CtaOptionEngine(BaseEngine):
Send a limit order to server.
"""
return self.send_server_order(
strategy,
strategy_name,
contract,
direction,
offset,
@ -630,7 +685,7 @@ class CtaOptionEngine(BaseEngine):
def send_fak_order(
self,
strategy: CtaTemplate,
strategy_name: str,
contract: ContractData,
direction: Direction,
offset: Offset,
@ -642,7 +697,7 @@ class CtaOptionEngine(BaseEngine):
Send a limit order to server.
"""
return self.send_server_order(
strategy,
strategy_name,
contract,
direction,
offset,
@ -654,7 +709,7 @@ class CtaOptionEngine(BaseEngine):
def send_server_stop_order(
self,
strategy: CtaTemplate,
strategy_name: str,
contract: ContractData,
direction: Direction,
offset: Offset,
@ -669,7 +724,7 @@ class CtaOptionEngine(BaseEngine):
on the trading server.
"""
return self.send_server_order(
strategy,
strategy_name,
contract,
direction,
offset,
@ -681,7 +736,7 @@ class CtaOptionEngine(BaseEngine):
def send_local_stop_order(
self,
strategy: CtaTemplate,
strategy_name: str,
vt_symbol: str,
direction: Direction,
offset: Offset,
@ -702,42 +757,43 @@ class CtaOptionEngine(BaseEngine):
price=price,
volume=volume,
stop_orderid=stop_orderid,
strategy_name=strategy.strategy_name,
strategy_name=strategy_name,
gateway_name=gateway_name
)
self.stop_orders[stop_orderid] = stop_order
vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
vt_orderids = self.strategy_orderid_map[strategy_name]
vt_orderids.add(stop_orderid)
self.call_strategy_func(strategy, strategy.on_stop_order, stop_order)
strategy = self.strategies.get(strategy_name, None)
if strategy:
self.call_strategy_func(strategy, strategy.on_stop_order, stop_order)
self.put_stop_order_event(stop_order)
return [stop_orderid]
def cancel_server_order(self, strategy: CtaTemplate, vt_orderid: str):
def cancel_server_order(self, strategy_name: str, vt_orderid: str):
"""
Cancel existing order by vt_orderid.
"""
order = self.main_engine.get_order(vt_orderid)
if not order:
self.write_log(msg=f"撤单失败,找不到委托{vt_orderid}",
strategy_name=strategy.strategy_name,
strategy_name=strategy_name,
level=logging.ERROR)
return False
req = order.create_cancel_request()
return self.main_engine.cancel_order(req, order.gateway_name)
def cancel_local_stop_order(self, strategy: CtaTemplate, stop_orderid: str):
def cancel_local_stop_order(self, strategy_name: str, stop_orderid: str):
"""
Cancel a local stop order.
"""
stop_order = self.stop_orders.get(stop_orderid, None)
if not stop_order:
return False
strategy = self.strategies[stop_order.strategy_name]
strategy = self.strategies[strategy_name]
# Remove from relation map.
self.stop_orders.pop(stop_orderid)
@ -755,7 +811,7 @@ class CtaOptionEngine(BaseEngine):
def send_order(
self,
strategy: CtaTemplate,
strategy_name: str,
vt_symbol: str,
direction: Direction,
offset: Offset,
@ -763,15 +819,17 @@ class CtaOptionEngine(BaseEngine):
volume: float,
stop: bool,
order_type: OrderType = OrderType.LIMIT,
gateway_name: str = None
gateway_name: str = None,
internal=False
):
"""
该方法供策略使用发送委托
该方法供策略引擎使用发送委托
internal: True,引擎内部使用,执行自动轧差; False 直接使用
"""
contract = self.main_engine.get_contract(vt_symbol)
if not contract:
self.write_log(msg=f"委托失败,找不到合约:{vt_symbol}",
strategy_name=strategy.strategy_name,
strategy_name=strategy_name,
level=logging.ERROR)
return ""
if contract.gateway_name and not gateway_name:
@ -779,37 +837,347 @@ class CtaOptionEngine(BaseEngine):
# Round order price and volume to nearest incremental value
price = round_to(price, contract.pricetick)
volume = round_to(volume, contract.min_volume)
if volume <= 0:
self.write_error(msg=f"委托失败,合约:{vt_symbol},委托数量{volume}不符合正数",
strategy_name=strategy_name,
level=logging.ERROR)
return ""
if stop:
if contract.stop_supported:
return self.send_server_stop_order(strategy, contract, direction, offset, price, volume,
return self.send_server_stop_order(strategy_name, contract, direction, offset, price, volume,
gateway_name)
else:
return self.send_local_stop_order(strategy, vt_symbol, direction, offset, price, volume,
return self.send_local_stop_order(strategy_name, vt_symbol, direction, offset, price, volume,
gateway_name)
if order_type == OrderType.FAK:
return self.send_fak_order(strategy, contract, direction, offset, price, volume, gateway_name)
else:
return self.send_limit_order(strategy, contract, direction, offset, price, volume, gateway_name)
# 内部订单
if internal:
return self.handel_internal_order(
strategy_name=strategy_name,
vt_symbol=vt_symbol,
direction=direction,
offset=offset,
price=price,
volume=volume,
gateway_name=gateway_name)
def cancel_order(self, strategy: CtaTemplate, vt_orderid: str):
# 直接调用主引擎
if order_type == OrderType.FAK:
return self.send_fak_order(strategy_name, contract, direction, offset, price, volume, gateway_name)
else:
return self.send_limit_order(strategy_name, contract, direction, offset, price, volume, gateway_name)
def cancel_order(self, strategy_name: str, vt_orderid: str):
"""
"""
if vt_orderid.startswith(STOPORDER_PREFIX):
return self.cancel_local_stop_order(strategy, vt_orderid)
return self.cancel_local_stop_order(strategy_name, vt_orderid)
else:
return self.cancel_server_order(strategy, vt_orderid)
return self.cancel_server_order(strategy_name, vt_orderid)
def cancel_all(self, strategy: CtaTemplate):
def cancel_all(self, strategy_name: str):
"""
Cancel all active orders of a strategy.
"""
vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
vt_orderids = self.strategy_orderid_map[strategy_name]
if not vt_orderids:
return
for vt_orderid in copy(vt_orderids):
self.cancel_order(strategy, vt_orderid)
self.cancel_order(strategy_name, vt_orderid)
def handel_internal_order(self, **kwargs) -> str:
"""
处理内部订单
策略 => 内部订单 => 产生内部订单号 => 登记内部处理逻辑 => 添加后续异步task
:param kwargs:
:return:
"""
self.write_log(f'内部订单 => 开始处理')
vt_symbol = kwargs.get('vt_symbol')
symbol, exchange = extract_vt_symbol(vt_symbol)
orderid = f'o_{self.int_orderid_count}'
strategy_name = kwargs.get('strategy_name', "")
self.int_orderid_count += 1
order = OrderData(
symbol=symbol,
exchange=exchange,
name=self.get_name(vt_symbol),
orderid=orderid,
direction=kwargs.get('direction'),
offset=kwargs.get('offset'),
type=OrderType.LIMIT,
price=kwargs.get('price'),
volume=kwargs.get('volume'),
datetime=datetime.now(),
gateway_name=kwargs.get('gateway_name', "")
)
self.write_log(f'内部订单 => 生成 \n{print_dict(order.__dict__)}')
strategy = self.strategies.get(strategy_name, None)
if strategy:
self.write_log(f'内部订单 => 绑定 {order.vt_orderid} <=>策略{strategy_name}')
self.orderid_strategy_map[order.vt_orderid] = strategy
self.strategy_orderid_map[strategy.strategy_name].add(order.vt_orderid)
task = self.thread_executor.submit(self._handle_internal_order, order, strategy_name)
self.thread_tasks.append(task)
return [order.vt_orderid]
def _handle_internal_order(self, order: OrderData, strategy_name: str):
"""
线程执行内部订单
:param order:
:return:
"""
self.write_log(f'内部订单 => 异步处理')
vt_symbol = order.vt_symbol
# 发送委托更新
order.sys_orderid = order.orderid
order.status = Status.ALLTRADED
order.traded = order.volume
order.time = order.datetime.strftime("%H:%M:%S")
# 制作假的成交单
trade = TradeData(
symbol=order.symbol,
exchange=order.exchange,
direction=order.direction,
offset=order.offset,
name=order.name,
strategy_name=strategy_name,
orderid=order.orderid,
tradeid=f't_{order.orderid}',
price=order.price,
volume=order.volume,
datetime=datetime.now(),
time=order.time,
gateway_name=order.gateway_name
)
self.write_log(f'内部订单 => 生成成交单 \n {print_dict(trade.__dict__)}')
# 发出委托更新、订单更新
self.event_engine.put(Event(type=EVENT_ORDER, data=order))
self.event_engine.put(Event(type=EVENT_TRADE, data=trade))
target_pos = self.net_pos_target.get(vt_symbol, 0)
if order.direction == Direction.LONG:
new_target_pos = target_pos + order.volume
else:
new_target_pos = target_pos - order.volume
self.net_pos_target.update({vt_symbol: new_target_pos})
self.write_log(
f'{strategy_name} {order.direction.value} {order.offset.value}: net_pos_target: {target_pos} => {new_target_pos}')
# 记录日志
append_data(
file_name=os.path.abspath(os.path.join(self.get_data_path(), 'cta_option_internal_orders.csv')),
dict_data=OrderedDict({
'datetime': order.datetime.strftime('%Y-%m-%d %H:%M:%S'),
'strategy_name': strategy_name,
'vt_symbol': order.volume,
'name': order.name,
'direction': order.direction.value,
'offset': order.offset.value,
'price': order.price,
'volume': order.volume,
'old_target': target_pos,
'new_target': new_target_pos
}))
def load_internal_data(self):
"""
加载内部数据
:return:
"""
f_name = os.path.abspath(os.path.join(self.get_data_path(),f'{self.engine_name}_datas.json'))
try:
j = load_json(f_name,auto_save=True)
self.net_pos_target = j.get('net_pos_target', {})
self.net_pos_holding = j.get('net_pos_holding', {})
self.write_log('恢复内部持仓目标:{}'.format(
';'.join([f'{k}[{self.get_name(k)}]:{v}' for k,v in self.net_pos_target.items()])))
self.write_log('恢复内部持仓:{}'.format(
';'.join([f'{k}[{self.get_name(k)}]:{v}' for k, v in self.net_pos_target.items()])))
except Exception as ex:
self.write_error(f'恢复内部数据异常:{str(ex)}')
def save_internal_data(self):
"""
保存内部数据
:return:
"""
f_name = os.path.abspath(os.path.join(self.get_data_path(), f'{self.engine_name}_datas.json'))
try:
d = {
"net_pos_target":self.net_pos_target,
"net_pos_holding":self.net_pos_holding
}
save_json(f_name,d)
except Exception as ex:
self.write_error(f'保存内部数据异常:{str(ex)}')
def execute_pos_target(self, vt_symbol):
"""
执行仓位目标
:param vt_symbol:
:return:
"""
target_pos = self.net_pos_target.get(vt_symbol, 0)
holding_pos = self.net_pos_holding.get(vt_symbol, 0)
diff_pos = target_pos - holding_pos
if diff_pos == 0:
return
# 获取最新价
cur_price = self.get_price(vt_symbol)
if not cur_price:
self.write_log(f'仓位目标执行 => 订阅{vt_symbol}行情')
contract = self.main_engine.get_contract(vt_symbol)
if contract:
gateway_name = ""
if contract.gateway_name:
gateway_name = contract.gateway_name
req = SubscribeRequest(
symbol=contract.symbol, exchange=contract.exchange)
self.main_engine.subscribe(req, gateway_name)
return
# 获取最新tick
cur_tick = self.get_tick(vt_symbol)
if cur_tick is None:
return
price_tick = self.get_price_tick(vt_symbol)
# 需要买入
if diff_pos > 0:
# 账号得多、空
acc_long_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.LONG)
acc_long_pos = 0 if acc_long_position is None else acc_long_position.volume-acc_long_position.frozen
acc_short_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.SHORT)
acc_short_pos = 0 if acc_short_position is None else acc_short_position.volume-acc_short_position.frozen
cover_pos = 0
buy_pos = 0
# 仅平仓
if acc_short_pos > 0:
# 优先平空单
cover_pos = min(diff_pos,acc_short_pos)
buy_pos = diff_pos - cover_pos
else:
# 仅开仓
cover_pos = 0
buy_pos = diff_pos
self.write_log(f'{self.engine_name}仓位执行{vt_symbol}[{self.get_name(vt_symbol)}]: ' +
f'[账号多单:{acc_long_pos},空单:{acc_short_pos}]' +
f'[holding:{holding_pos} =>target:{target_pos} ] => cover:{cover_pos} + buy:{buy_pos}')
if cover_pos > 0:
if not self.exist_order(vt_symbol, direction=Direction.LONG, offset=Offset.CLOSE):
vt_orderids = self.send_order(
strategy_name="",
vt_symbol=vt_symbol,
price=cur_price,
volume=cover_pos,
direction=Direction.LONG,
offset=Offset.CLOSE,
order_type=OrderType.LIMIT,
stop=False,
internal=False
)
if len(vt_orderids) > 0:
self.internal_orderids =self.internal_orderids.union(vt_orderids)
if buy_pos > 0:
if not self.exist_order(vt_symbol, direction=Direction.LONG, offset=Offset.OPEN):
vt_orderids = self.send_order(
strategy_name="",
vt_symbol=vt_symbol,
price=cur_price,
volume=buy_pos,
direction=Direction.LONG,
offset=Offset.OPEN,
order_type=OrderType.LIMIT,
stop=False,
internal=False
)
if len(vt_orderids) > 0:
self.internal_orderids= self.internal_orderids.union(vt_orderids)
# 需要卖出 ( diff_pos < 0)
else:
# 账号得多、空
acc_long_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.LONG)
acc_long_pos = 0 if acc_long_position is None else acc_long_position.volume - acc_long_position.frozen
acc_short_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.SHORT)
acc_short_pos = 0 if acc_short_position is None else acc_short_position.volume - acc_short_position.frozen
# 如果持有多单,优先平掉多单
if acc_long_pos > 0:
sell_pos = min(abs(diff_pos), acc_long_pos)
short_pos = abs(diff_pos) - sell_pos
else:
# 仅开仓
sell_pos = 0
short_pos = abs(diff_pos)
self.write_log(f'{self.engine_name}仓位执行{vt_symbol}[{self.get_name(vt_symbol)}]' +
f'[账号多单:{acc_long_pos},空单:{acc_short_pos}]' +
f'[holding:{holding_pos} => target:{target_pos}] => sell:{sell_pos}, short:{short_pos}')
if sell_pos > 0:
if not self.exist_order(vt_symbol, direction=Direction.SHORT, offset=Offset.CLOSE):
vt_orderids = self.send_order(
strategy_name="",
vt_symbol=vt_symbol,
price=cur_price,
volume=sell_pos,
direction=Direction.SHORT,
offset=Offset.CLOSE,
order_type=OrderType.LIMIT,
stop=False,
internal=False
)
if len(vt_orderids) > 0:
self.internal_orderids = self.internal_orderids.union(vt_orderids)
if short_pos > 0:
if not self.exist_order(vt_symbol, direction=Direction.SHORT, offset=Offset.OPEN):
vt_orderids = self.send_order(
strategy_name="",
vt_symbol=vt_symbol,
price=cur_price,
volume=short_pos,
direction=Direction.LONG,
offset=Offset.OPEN,
order_type=OrderType.LIMIT,
stop=False,
internal=False
)
if len(vt_orderids) > 0:
self.internal_orderids = self.internal_orderids.union(vt_orderids)
def exist_order(self, vt_symbol, direction, offset):
"""
是否存在相同得委托
:param vt_symbol:
:param direction:
:param offset:
:return:
"""
if len(self.active_orders) == 0:
self.write_log(f'内部活动订单中,数量为零. 查询{vt_symbol},方向:{direction.value}, 开平:{offset.value}')
return False
for vt_orderid in list(self.active_orders.keys()):
order = self.active_orders.get(vt_orderid, None)
if order is None:
continue
if order.vt_symbol == vt_symbol and order.direction == direction and order.offset == offset:
self.write_log(f'引擎存在相同得内部活动订单:{order.name}')
return True
return False
def subscribe_symbol(self, strategy_name: str, vt_symbol: str, gateway_name: str = '', is_bar: bool = False):
"""订阅合约"""
@ -1215,7 +1583,8 @@ class CtaOptionEngine(BaseEngine):
# 复权转换
adj_list = self.adjust_factor_dict.get(vt_symbol, [])
# 按照结束日期,裁剪复权记录
adj_list = [row for row in adj_list if row['dividOperateDate'].replace('-', '') <= end.strftime('%Y%m%d')]
adj_list = [row for row in adj_list if
row['dividOperateDate'].replace('-', '') <= end.strftime('%Y%m%d')]
if len(adj_list) > 0:
self.write_log(f'需要对{vt_symbol}进行前复权处理')
@ -1223,13 +1592,14 @@ class CtaOptionEngine(BaseEngine):
row.update({'dividOperateDate': row.get('dividOperateDate')[:10] + ' 09:30:00'})
# list -> dataframe, 转换复权日期格式
adj_data = pd.DataFrame(adj_list)
adj_data["dividOperateDate"] = pd.to_datetime(adj_data["dividOperateDate"], format="%Y-%m-%d %H:%M:%S")
adj_data["dividOperateDate"] = pd.to_datetime(adj_data["dividOperateDate"],
format="%Y-%m-%d %H:%M:%S")
adj_data = adj_data.set_index("dividOperateDate")
# 调用转换方法对open,high,low,close, volume进行复权, fore, 前复权, 其他,后复权
symbol_df = stock_to_adj(symbol_df, adj_data, adj_type='fore')
for dt, bar_data in symbol_df.iterrows():
bar_datetime = dt #- timedelta(seconds=bar_interval_seconds)
bar_datetime = dt # - timedelta(seconds=bar_interval_seconds)
bar = BarData(
gateway_name='backtesting',
@ -1266,7 +1636,6 @@ class CtaOptionEngine(BaseEngine):
return bars
def resample_bars(self, df, x_min=None, x_hour=None, to_day=False):
"""
重建x分钟K线或日线
@ -1917,7 +2286,7 @@ class CtaOptionEngine(BaseEngine):
self.init_mongo_data()
if self.mongo_data and self.mongo_data.db_has_connected:
filter = {'account_id':self.engine_config.get('accountid','-')}
filter = {'account_id': self.engine_config.get('accountid', '-')}
pos_list = self.mongo_data.db_query(
db_name='Account',
@ -2013,19 +2382,19 @@ class CtaOptionEngine(BaseEngine):
for pos in self.main_engine.get_all_positions():
vt_symbols.add(pos.vt_symbol)
vt_symbol_pos = compare_pos.get(pos.vt_symbol, {
"账号空单": 0,
'账号多单': 0,
'策略空单': 0,
'策略多单': 0,
'空单策略': [],
'多单策略': []
})
"账号空单": 0,
'账号多单': 0,
'策略空单': 0,
'策略多单': 0,
'空单策略': [],
'多单策略': []
})
if pos.direction == Direction.LONG:
vt_symbol_pos['账号多单'] = vt_symbol_pos['账号多单'] + pos.volume
else:
vt_symbol_pos['账号空单'] = vt_symbol_pos['账号空单'] + pos.volume
compare_pos.update({pos.vt_symbol:vt_symbol_pos})
compare_pos.update({pos.vt_symbol: vt_symbol_pos})
# 逐一根据策略仓位与Account_pos进行处理比对
for strategy_pos in strategy_pos_list:
@ -2081,33 +2450,31 @@ class CtaOptionEngine(BaseEngine):
'direction': Direction.SHORT.value,
'strategy_list': symbol_pos.get('空单策略', [])}
# 股指期货: 帐号多/空轧差, vs 策略多空轧差 是否一致;
# 其他期货:帐号多单 vs 除了多单, 空单 vs 空单
if vt_symbol.endswith(".CFFEX"):
diff_match = (symbol_pos.get('账号多单', 0) - symbol_pos.get('账号空单', 0)) == (
symbol_pos.get('策略多单', 0) - symbol_pos.get('策略空单', 0))
pos_match = symbol_pos.get('账号空单', 0) == symbol_pos.get('策略空单', 0) and \
symbol_pos.get('账号多单', 0) == symbol_pos.get('策略多单', 0)
match = diff_match
# 轧差一致,帐号/策略持仓不一致
if diff_match and not pos_match:
if symbol_pos.get('账号多单', 0) > symbol_pos.get('策略多单', 0):
self.write_log('{}轧差持仓:多:{},空:{} 大于 策略持仓 多:{},空:{}'.format(
vt_symbol,
symbol_pos.get('账号多单', 0),
symbol_pos.get('账号空单', 0),
symbol_pos.get('策略多单', 0),
symbol_pos.get('策略空单', 0)
))
diff_pos_dict.update({vt_symbol: {"long": symbol_pos.get('账号多单', 0) - symbol_pos.get('策略多单', 0),
"short": symbol_pos.get('账号空单', 0) - symbol_pos.get('策略空单',
0)}})
else:
match = round(symbol_pos.get('账号空单', 0), 7) == round(symbol_pos.get('策略空单', 0), 7) and \
round(symbol_pos.get('账号多单', 0), 7) == round(symbol_pos.get('策略多单', 0), 7)
# 帐号多/空轧差, vs 策略多空轧差 是否一致;
diff_match = (symbol_pos.get('账号多单', 0) - symbol_pos.get('账号空单', 0)) == (
symbol_pos.get('策略多单', 0) - symbol_pos.get('策略空单', 0))
pos_match = symbol_pos.get('账号空单', 0) == symbol_pos.get('策略空单', 0) and \
symbol_pos.get('账号多单', 0) == symbol_pos.get('策略多单', 0)
match = diff_match
# 轧差一致,帐号/策略持仓不一致
if diff_match and not pos_match:
if symbol_pos.get('账号多单', 0) > symbol_pos.get('策略多单', 0):
self.write_log('{}轧差持仓:多:{},空:{} 大于 策略持仓 多:{},空:{}'.format(
vt_symbol,
symbol_pos.get('账号多单', 0),
symbol_pos.get('账号空单', 0),
symbol_pos.get('策略多单', 0),
symbol_pos.get('策略空单', 0)
))
diff_pos_dict.update({vt_symbol: {"long": symbol_pos.get('账号多单', 0) - symbol_pos.get('策略多单', 0),
"short": symbol_pos.get('账号空单', 0) - symbol_pos.get('策略空单',
0)}})
# 多空都一致
if match:
msg = u'{}多空都一致.{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False))
msg = u'{}[{}]多空都一致.{}\n'.format(vt_symbol, self.get_name(vt_symbol),
json.dumps(symbol_pos, indent=2, ensure_ascii=False))
self.write_log(msg)
compare_info += msg
else:
@ -2131,14 +2498,15 @@ class CtaOptionEngine(BaseEngine):
diff_short_volume = round(symbol_pos.get('账号空单', 0), 7) - round(symbol_pos.get('策略空单', 0), 7)
if diff_short_volume != 0:
msg = '{}空单[账号({}), 策略{},共({})], ' \
msg = '{}[{}] 空单[账号({}), 策略{},共({})], ' \
.format(vt_symbol,
self.get_name(vt_symbol),
symbol_pos.get('账号空单'),
symbol_pos.get('空单策略'),
symbol_pos.get('策略空单'))
pos_compare_result += msg
self.write_error(u'{}不一致:{}'.format(vt_symbol, msg))
compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg)
self.write_error(u'{}[{}]不一致:{}'.format(vt_symbol, self.get_name(vt_symbol), msg))
compare_info += u'{}[{}]不一致:{}\n'.format(vt_symbol, self.get_name(vt_symbol), msg)
if auto_balance:
self.balance_pos(vt_symbol, Direction.SHORT, diff_short_volume)
@ -2374,6 +2742,6 @@ class CtaOptionEngine(BaseEngine):
if strategy:
subject = f"{strategy.strategy_name}"
else:
subject = "CTAOPtion引擎"
subject = "CTA Option引擎"
send_wx_msg(content=f'{subject}:{msg}')

View File

@ -194,7 +194,7 @@ class CtaTemplate(ABC):
pos = PositionData(
gateway_name=contract.gateway_name if contract else '',
symbol=symbol,
name=contract.name,
name=contract.name if contract else symbol,
exchange=exchange,
direction=direction
)
@ -405,14 +405,15 @@ class CtaTemplate(ABC):
return []
vt_orderids = self.cta_engine.send_order(
strategy=self,
strategy_name=self.strategy_name,
vt_symbol=vt_symbol,
direction=direction,
offset=offset,
price=price,
volume=volume,
stop=stop,
order_type=order_type
order_type=order_type,
internal=True
)
if len(vt_orderids) == 0:
self.write_error(f'{self.strategy_name}调用cta_engine.send_order委托返回失败,vt_symbol:{vt_symbol}')
@ -452,7 +453,7 @@ class CtaTemplate(ABC):
Cancel an existing order.
"""
if self.trading:
return self.cta_engine.cancel_order(self, vt_orderid)
return self.cta_engine.cancel_order(self.strategy_name, vt_orderid)
return False
@ -461,7 +462,7 @@ class CtaTemplate(ABC):
Cancel all orders sent by strategy.
"""
if self.trading:
self.cta_engine.cancel_all(self)
self.cta_engine.cancel_all(self.strategy_name)
def is_upper_limit(self, symbol):
"""是否涨停"""
@ -539,7 +540,11 @@ class CtaTemplate(ABC):
self.cta_engine.sync_strategy_data(self)
class CtaOptionTemplate(CtaTemplate):
"""期权交易增强版模板"""
"""
期权交易增强版模板
使用target_pos得方式开平仓时只需要更新self.policy.target_pos{}
"""
# 逻辑过程日志
@ -595,7 +600,6 @@ class CtaOptionTemplate(CtaTemplate):
self.write_log('当前policy:\n{}'.format(print_dict(self.policy.to_json())))
def sync_data(self):
"""同步更新数据"""
if not self.backtesting:

View File

@ -1078,6 +1078,8 @@ class CtaStockTemplate(CtaTemplate):
ordering_grid = None
for grid in self.gt.dn_grids:
cn_name = self.cta_engine.get_name(grid.vt_symbol)
# 只扫描vt_symbol 匹配的网格
if vt_symbol and vt_symbol != grid.vt_symbol:
continue
@ -1088,10 +1090,15 @@ class CtaStockTemplate(CtaTemplate):
# 排除存在委托单号的网格
if len(grid.order_ids) > 0:
self.write_log(f'网格{grid.vt_symbol}[{cn_name}]存在委托单号:{grid.order_ids}')
continue
if grid.volume <= grid.traded_volume:
self.write_log(u'网格计划卖出:{},已成交:{}'.format(grid.volume, grid.traded_volume))
self.write_log(u'{}[{}]网格计划卖出:{},已成交:{}'.format(
grid.vt_symbol,
cn_name,
grid.volume,
grid.traded_volume))
self.tns_finish_sell_grid(grid)
continue
@ -1103,7 +1110,7 @@ class CtaStockTemplate(CtaTemplate):
direction=Direction.NET)
vt_symbol = ordering_grid.vt_symbol
cn_name = self.cta_engine.get_name(ordering_grid.vt_symbol)
sell_volume = ordering_grid.volume - ordering_grid.traded_volume
if acc_symbol_pos is None:
@ -1122,11 +1129,12 @@ class CtaStockTemplate(CtaTemplate):
sell_volume = acc_symbol_pos.volume
if sell_volume == 0:
self.write_log(f'账号{vt_symbol}持仓{acc_symbol_pos.volume},卖出目标:{sell_volume}=0 不执行')
self.write_log(f'账号{vt_symbol}[{cn_name}]持仓{acc_symbol_pos.volume},卖出目标:{sell_volume}=0 不执行')
continue
cur_price = self.cta_engine.get_price(vt_symbol)
if not cur_price:
self.write_log(f'获取不到{vt_symbol}[{cn_name}]价格,发出订阅')
self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=vt_symbol)
continue

View File

@ -871,7 +871,9 @@ class CtaProTemplate(CtaTemplate):
for g in self.gt.get_opened_grids(direction=Direction.LONG):
vt_symbol = g.snapshot.get('mi_symbol', g.vt_symbol if g.vt_symbol and '99' not in g.vt_symbol else self.vt_symbol)
open_price = g.snapshot.get('open_price', g.open_price)
name = self.cta_engine.get_name(vt_symbol)
pos_list.append({'vt_symbol': vt_symbol,
'name':name,
'direction': 'long',
'volume': g.volume - g.traded_volume,
'price': open_price})
@ -880,7 +882,9 @@ class CtaProTemplate(CtaTemplate):
for g in self.gt.get_opened_grids(direction=Direction.SHORT):
vt_symbol = g.snapshot.get('mi_symbol', g.vt_symbol if g.vt_symbol and '99' not in g.vt_symbol else self.vt_symbol)
open_price = g.snapshot.get('open_price', g.open_price)
name = self.cta_engine.get_name(vt_symbol)
pos_list.append({'vt_symbol': vt_symbol,
'name': name,
'direction': 'short',
'volume': abs(g.volume - g.traded_volume),
'price': open_price})
@ -1787,7 +1791,7 @@ class CtaProFutureTemplate(CtaProTemplate):
# order_price = order_info['price']
# order_direction = order_info['direction']
# order_offset = order_info['offset']
order_grid = order_info['grid']
order_grid = order_info.get('grid',None)
order_status = order_info.get('status', Status.NOTTRADED)
order_type = order_info.get('order_type', OrderType.LIMIT)
over_seconds = (dt - order_time).total_seconds()
@ -1827,7 +1831,7 @@ class CtaProFutureTemplate(CtaProTemplate):
if order_info['direction'] == Direction.SHORT:
cur_price = self.cta_engine.get_price(order_vt_symbol)
short_price = cur_price - self.price_tick
if order_grid.volume != order_volume and order_volume > 0:
if order_grid and order_grid.volume != order_volume and order_volume > 0:
self.write_log(
u'网格volume:{},order_volume:{}不一致,修正'.format(order_grid.volume, order_volume))
order_grid.volume = order_volume
@ -1842,13 +1846,14 @@ class CtaProFutureTemplate(CtaProTemplate):
if len(vt_orderids) > 0:
self.write_log(u'委托成功orderid:{}'.format(vt_orderids))
order_grid.snapshot.update({'open_price': short_price})
if order_grid:
order_grid.snapshot.update({'open_price': short_price})
else:
self.write_error(u'撤单后,重新委托开空仓失败')
else:
cur_price = self.cta_engine.get_price(order_vt_symbol)
buy_price = cur_price + self.price_tick
if order_grid.volume != order_volume and order_volume > 0:
if order_grid and order_grid.volume != order_volume and order_volume > 0:
self.write_log(
u'网格volume:{},order_volume:{}不一致,修正'.format(order_grid.volume, order_volume))
order_grid.volume = order_volume
@ -1863,7 +1868,8 @@ class CtaProFutureTemplate(CtaProTemplate):
if len(vt_orderids) > 0:
self.write_log(u'委托成功orderids:{}'.format(vt_orderids))
order_grid.snapshot.update({'open_price': buy_price})
if order_grid:
order_grid.snapshot.update({'open_price': buy_price})
else:
self.write_error(u'撤单后,重新委托开多仓失败')
else:

View File

@ -132,6 +132,8 @@ class IndexTickPublisherV2(BaseEngine):
if len(self.selected_underly_symbols) > 0 and underly_symbol not in self.selected_underly_symbols:
continue
self.write_log(f'定时检查{underly_symbol}')
# 日盘数据,夜盘期间不订阅
if dt_now.hour < 4 or dt_now.hour > 20:
if underly_symbol in MARKET_DAY_ONLY:
@ -139,12 +141,17 @@ class IndexTickPublisherV2(BaseEngine):
# 获取当前所有的合约列表
symbols = info.get('symbols', {})
total_oi = 0
if len(symbols) > 0:
total_oi = sum([v for v in symbols.values()])
# 获取交易所
exchange = info.get('exchange', 'LOCAL')
# 获取本地记录的tick dict
tick_dict = self.ticks.get(underly_symbol, {})
for symbol in symbols.keys():
for symbol in list(symbols.keys()):
# 全路径合约 => 标准合约 ,如 ZC2109 => ZC109, RB2110 => rb2110
vn_symbol = get_real_symbol_by_exchange(symbol, Exchange(exchange))
@ -152,6 +159,14 @@ class IndexTickPublisherV2(BaseEngine):
self.write_log(f'移除早于当月的合约{symbol}')
symbols.pop(symbol, None)
continue
cur_oi = symbols.get(symbol,0)
if cur_oi < max(total_oi * 0.03,100):
self.write_log(f'{symbol} 上一交易日持仓量:{cur_oi} 小于合约总持仓量{total_oi}得3% {max(total_oi * 0.03,100)},不纳入指数计算范围')
symbols.pop(symbol, None)
continue
# 生成带交易所信息的合约
vt_symbol = f'{vn_symbol}.{exchange}'
# symbol_exchange_map是全局变量ctp md api会使用到所以需要更新其 合约与交易所的关系

View File

@ -256,8 +256,8 @@ class ChanSignals(Enum):
# 趋势类买卖点(9~13笔分析结果
Q1L0 = "Q1L0~趋势类一买"
Q2L0 = "Q2L0~趋势类二买"
Q3L0 = "Q2L0~趋势类三买"
Q3L0 = "Q3L0~趋势类三买"
Q1S0 = "Q1S0~趋势类一卖"
Q2S0 = "Q2S0~趋势类二卖"
Q3S0 = "Q2S0~趋势类三卖"
Q3S0 = "Q3S0~趋势类三卖"

View File

@ -337,7 +337,7 @@ def get_digits(value: float) -> int:
def print_dict(d: dict):
"""返回dict的字符串类型"""
max_key_len = max([len(str(k)) for k in d.keys()])
max_key_len = max([len(str(k)) for k in d.keys()]) if len(d.keys()) > 0 else 10
return '\n'.join([str(key) + (max_key_len-len(str(key))) * " " + f': {d[key]}' for key in sorted(d.keys())])