diff --git a/vnpy/app/cta_option/engine.py b/vnpy/app/cta_option/engine.py index c8adfc0a..527ce519 100644 --- a/vnpy/app/cta_option/engine.py +++ b/vnpy/app/cta_option/engine.py @@ -26,6 +26,8 @@ from uuid import uuid1 from vnpy.event import Event, EventEngine from vnpy.trader.engine import BaseEngine, MainEngine from vnpy.trader.object import ( + OrderData, + TradeData, OrderRequest, SubscribeRequest, LogData, @@ -65,7 +67,8 @@ from vnpy.trader.utility import ( get_underlying_symbol, append_data, import_module_by_str, -get_csv_last_dt) + print_dict, + get_csv_last_dt) from vnpy.trader.util_logger import setup_logger, logging from vnpy.trader.util_wechat import send_wx_msg @@ -177,6 +180,12 @@ class CtaOptionEngine(BaseEngine): self.thread_tasks = [] self.vt_tradeids = set() # for filtering duplicate trade + self.active_orders = {} + self.internal_orderids = set() + + self.net_pos_target = {} # 净仓目标, vt_symbol: {pos: 正负数} + self.net_pos_holding = {} # 净仓持有, vt_symbol: {pos: 正负数} + self.int_orderid_count = 1 self.last_minute = None self.symbol_bar_dict = {} # vt_symbol: bar(一分钟bar) @@ -217,12 +226,15 @@ class CtaOptionEngine(BaseEngine): self.register_event() self.register_funcs() + # 恢复内部数据 + self.load_internal_data() + self.load_strategy_class() self.load_strategy_setting() self.write_log("CTA策略引擎初始化成功") - if self.engine_config.get('get_pos_from_db',False): + if self.engine_config.get('get_pos_from_db', False): self.write_log(f'激活数据库策略仓位比对模式') self.init_mongo_data() @@ -296,7 +308,9 @@ class CtaOptionEngine(BaseEngine): # 触发每个策略的定时接口 for strategy in list(self.strategies.values()): - strategy.on_timer() + if strategy and strategy.inited: + self.call_strategy_func(strategy, strategy.on_timer) + if not strategy.trading: all_trading = False @@ -308,6 +322,32 @@ class CtaOptionEngine(BaseEngine): if self.last_minute != dt.minute: self.last_minute = dt.minute + # 内部订单超时处理 + for vt_orderid in list(self.active_orders.keys()): + if vt_orderid not in self.internal_orderids: + self.write_log(f'{vt_orderid}不在内部活动订单中,不撤单') + continue + + order = self.active_orders.get(vt_orderid, None) + if order is None: + self.write_error(f'找不到内部活动订单,不撤单') + continue + + # 检查超时 + if order.datetime and \ + (datetime.now() - order.datetime).total_seconds() > 60 and \ + order.status in [Status.NOTTRADED, Status.PARTTRADED]: + self.write_log( + f'内部活动订单{order.orderid}, {order.vt_symbol}[{order.name}], {order.direction.value}, {order.offset.value}超时') + req = order.create_cancel_request() + return self.main_engine.cancel_order(req, order.gateway_name) + + for vt_symbol in set(self.net_pos_target.keys()).union(set(self.net_pos_holding.keys())): + self.execute_pos_target(vt_symbol) + + # 保存内部数据 + self.save_internal_data() + if all_trading: # 主动获取所有策略得持仓信息 all_strategy_pos = self.get_all_strategy_pos() @@ -320,12 +360,6 @@ class CtaOptionEngine(BaseEngine): # 推送到事件 self.put_all_strategy_pos_event(all_strategy_pos) - for strategy_name in list(self.strategies.keys()): - strategy = self.strategies.get(strategy_name, None) - if strategy and strategy.inited: - self.call_strategy_func(strategy, strategy.on_timer) - - def process_tick_event(self, event: Event): """处理tick到达事件""" tick = event.data @@ -344,7 +378,7 @@ class CtaOptionEngine(BaseEngine): for strategy in strategies: if strategy.inited: - self.call_strategy_func(strategy, strategy.on_tick, {tick.vt_symbol:tick}) + self.call_strategy_func(strategy, strategy.on_tick, {tick.vt_symbol: tick}) def process_bar_event(self, event: Event): """处理bar到达事件""" @@ -365,8 +399,15 @@ class CtaOptionEngine(BaseEngine): strategy = self.orderid_strategy_map.get(order.vt_orderid, None) if not strategy: - self.write_log(f'委托单没有对应的策略设置:order:{order.__dict__}') - self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}') + # self.write_log(f'委托单没有对应的策略设置:order:{order.__dict__}') + # self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}') + if order.type != OrderType.STOP: + if order.status in [Status.ALLTRADED, Status.CANCELLED, Status.REJECTED]: + self.active_orders.pop(order.vt_orderid, None) + + elif order.status in [Status.SUBMITTING, Status.NOTTRADED, Status.PARTTRADED]: + self.active_orders.update({order.vt_orderid: copy(order)}) + return self.write_log(f'委托更新:{order.vt_orderid} => 策略:{strategy.strategy_name}') # Remove vt_orderid if order is no longer active. @@ -404,8 +445,19 @@ class CtaOptionEngine(BaseEngine): strategy = self.orderid_strategy_map.get(trade.vt_orderid, None) if not strategy: - self.write_log(f'成交单没有对应的策略设置:trade:{trade.__dict__}') - self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}') + if trade.vt_orderid in self.internal_orderids: + cur_pos = self.net_pos_holding.get(trade.vt_symbol,0) + if trade.direction == Direction.LONG: + new_pos = cur_pos + trade.volume + else: + new_pos = cur_pos - trade.volume + self.write_log(f'内部委托单成交更新,{trade.vt_symbol}[{trade.name}]: {cur_pos} => {new_pos}') + self.write_log(f'成交单:trade:{print_dict(trade.__dict__)}') + self.net_pos_holding.update({trade.vt_symbol: new_pos}) + self.save_internal_data() + else: + self.write_log(f'成交单没有对应的策略设置:trade:{trade.__dict__}') + self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}') return self.write_log(f'成交更新:{trade.vt_orderid} => 策略:{strategy.strategy_name}') @@ -418,7 +470,8 @@ class CtaOptionEngine(BaseEngine): # strategy.pos -= trade.volume # 根据策略名称,写入 data\straetgy_name_trade.csv文件 strategy_name = getattr(strategy, 'strategy_name') - trade_fields = ['datetime', 'symbol', 'exchange', 'vt_symbol', 'name','tradeid', 'vt_tradeid', 'orderid', 'vt_orderid', + trade_fields = ['datetime', 'symbol', 'exchange', 'vt_symbol', 'name', 'tradeid', 'vt_tradeid', 'orderid', + 'vt_orderid', 'direction', 'offset', 'price', 'volume'] trade_dict = OrderedDict() try: @@ -554,7 +607,7 @@ class CtaOptionEngine(BaseEngine): def send_server_order( self, - strategy: CtaTemplate, + strategy_name: str, contract: ContractData, direction: Direction, offset: Offset, @@ -575,7 +628,7 @@ class CtaOptionEngine(BaseEngine): type=type, price=price, volume=volume, - strategy_name=strategy.strategy_name + strategy_name=strategy_name ) # 如果没有指定网关,则使用合约信息内的网关 @@ -599,14 +652,16 @@ class CtaOptionEngine(BaseEngine): vt_orderids.append(vt_orderid) # Save relationship between orderid and strategy. - self.orderid_strategy_map[vt_orderid] = strategy - self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid) + strategy = self.strategies.get(strategy_name, None) + if strategy: + self.orderid_strategy_map[vt_orderid] = strategy + self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid) return vt_orderids def send_limit_order( self, - strategy: CtaTemplate, + strategy_name: str, contract: ContractData, direction: Direction, offset: Offset, @@ -618,7 +673,7 @@ class CtaOptionEngine(BaseEngine): Send a limit order to server. """ return self.send_server_order( - strategy, + strategy_name, contract, direction, offset, @@ -630,7 +685,7 @@ class CtaOptionEngine(BaseEngine): def send_fak_order( self, - strategy: CtaTemplate, + strategy_name: str, contract: ContractData, direction: Direction, offset: Offset, @@ -642,7 +697,7 @@ class CtaOptionEngine(BaseEngine): Send a limit order to server. """ return self.send_server_order( - strategy, + strategy_name, contract, direction, offset, @@ -654,7 +709,7 @@ class CtaOptionEngine(BaseEngine): def send_server_stop_order( self, - strategy: CtaTemplate, + strategy_name: str, contract: ContractData, direction: Direction, offset: Offset, @@ -669,7 +724,7 @@ class CtaOptionEngine(BaseEngine): on the trading server. """ return self.send_server_order( - strategy, + strategy_name, contract, direction, offset, @@ -681,7 +736,7 @@ class CtaOptionEngine(BaseEngine): def send_local_stop_order( self, - strategy: CtaTemplate, + strategy_name: str, vt_symbol: str, direction: Direction, offset: Offset, @@ -702,42 +757,43 @@ class CtaOptionEngine(BaseEngine): price=price, volume=volume, stop_orderid=stop_orderid, - strategy_name=strategy.strategy_name, + strategy_name=strategy_name, gateway_name=gateway_name ) self.stop_orders[stop_orderid] = stop_order - vt_orderids = self.strategy_orderid_map[strategy.strategy_name] + vt_orderids = self.strategy_orderid_map[strategy_name] vt_orderids.add(stop_orderid) - - self.call_strategy_func(strategy, strategy.on_stop_order, stop_order) + strategy = self.strategies.get(strategy_name, None) + if strategy: + self.call_strategy_func(strategy, strategy.on_stop_order, stop_order) self.put_stop_order_event(stop_order) return [stop_orderid] - def cancel_server_order(self, strategy: CtaTemplate, vt_orderid: str): + def cancel_server_order(self, strategy_name: str, vt_orderid: str): """ Cancel existing order by vt_orderid. """ order = self.main_engine.get_order(vt_orderid) if not order: self.write_log(msg=f"撤单失败,找不到委托{vt_orderid}", - strategy_name=strategy.strategy_name, + strategy_name=strategy_name, level=logging.ERROR) return False req = order.create_cancel_request() return self.main_engine.cancel_order(req, order.gateway_name) - def cancel_local_stop_order(self, strategy: CtaTemplate, stop_orderid: str): + def cancel_local_stop_order(self, strategy_name: str, stop_orderid: str): """ Cancel a local stop order. """ stop_order = self.stop_orders.get(stop_orderid, None) if not stop_order: return False - strategy = self.strategies[stop_order.strategy_name] + strategy = self.strategies[strategy_name] # Remove from relation map. self.stop_orders.pop(stop_orderid) @@ -755,7 +811,7 @@ class CtaOptionEngine(BaseEngine): def send_order( self, - strategy: CtaTemplate, + strategy_name: str, vt_symbol: str, direction: Direction, offset: Offset, @@ -763,15 +819,17 @@ class CtaOptionEngine(BaseEngine): volume: float, stop: bool, order_type: OrderType = OrderType.LIMIT, - gateway_name: str = None + gateway_name: str = None, + internal=False ): """ - 该方法供策略使用,发送委托。 + 该方法供策略、引擎使用,发送委托。 + internal: True,引擎内部使用,执行自动轧差; False 直接使用 """ contract = self.main_engine.get_contract(vt_symbol) if not contract: self.write_log(msg=f"委托失败,找不到合约:{vt_symbol}", - strategy_name=strategy.strategy_name, + strategy_name=strategy_name, level=logging.ERROR) return "" if contract.gateway_name and not gateway_name: @@ -779,37 +837,347 @@ class CtaOptionEngine(BaseEngine): # Round order price and volume to nearest incremental value price = round_to(price, contract.pricetick) volume = round_to(volume, contract.min_volume) - + if volume <= 0: + self.write_error(msg=f"委托失败,合约:{vt_symbol},委托数量{volume}不符合正数", + strategy_name=strategy_name, + level=logging.ERROR) + return "" if stop: if contract.stop_supported: - return self.send_server_stop_order(strategy, contract, direction, offset, price, volume, + return self.send_server_stop_order(strategy_name, contract, direction, offset, price, volume, gateway_name) else: - return self.send_local_stop_order(strategy, vt_symbol, direction, offset, price, volume, + return self.send_local_stop_order(strategy_name, vt_symbol, direction, offset, price, volume, gateway_name) - if order_type == OrderType.FAK: - return self.send_fak_order(strategy, contract, direction, offset, price, volume, gateway_name) - else: - return self.send_limit_order(strategy, contract, direction, offset, price, volume, gateway_name) + # 内部订单 + if internal: + return self.handel_internal_order( + strategy_name=strategy_name, + vt_symbol=vt_symbol, + direction=direction, + offset=offset, + price=price, + volume=volume, + gateway_name=gateway_name) - def cancel_order(self, strategy: CtaTemplate, vt_orderid: str): + # 直接调用主引擎 + if order_type == OrderType.FAK: + return self.send_fak_order(strategy_name, contract, direction, offset, price, volume, gateway_name) + else: + return self.send_limit_order(strategy_name, contract, direction, offset, price, volume, gateway_name) + + def cancel_order(self, strategy_name: str, vt_orderid: str): """ """ if vt_orderid.startswith(STOPORDER_PREFIX): - return self.cancel_local_stop_order(strategy, vt_orderid) + return self.cancel_local_stop_order(strategy_name, vt_orderid) else: - return self.cancel_server_order(strategy, vt_orderid) + return self.cancel_server_order(strategy_name, vt_orderid) - def cancel_all(self, strategy: CtaTemplate): + def cancel_all(self, strategy_name: str): """ Cancel all active orders of a strategy. """ - vt_orderids = self.strategy_orderid_map[strategy.strategy_name] + vt_orderids = self.strategy_orderid_map[strategy_name] if not vt_orderids: return for vt_orderid in copy(vt_orderids): - self.cancel_order(strategy, vt_orderid) + self.cancel_order(strategy_name, vt_orderid) + + def handel_internal_order(self, **kwargs) -> str: + """ + 处理内部订单: + 策略 => 内部订单 => 产生内部订单号 => 登记内部处理逻辑 => 添加后续异步task + :param kwargs: + :return: + """ + self.write_log(f'内部订单 => 开始处理') + vt_symbol = kwargs.get('vt_symbol') + symbol, exchange = extract_vt_symbol(vt_symbol) + orderid = f'o_{self.int_orderid_count}' + strategy_name = kwargs.get('strategy_name', "") + + self.int_orderid_count += 1 + order = OrderData( + symbol=symbol, + exchange=exchange, + name=self.get_name(vt_symbol), + orderid=orderid, + direction=kwargs.get('direction'), + offset=kwargs.get('offset'), + type=OrderType.LIMIT, + price=kwargs.get('price'), + volume=kwargs.get('volume'), + datetime=datetime.now(), + gateway_name=kwargs.get('gateway_name', "") + ) + self.write_log(f'内部订单 => 生成 \n{print_dict(order.__dict__)}') + strategy = self.strategies.get(strategy_name, None) + if strategy: + self.write_log(f'内部订单 => 绑定 {order.vt_orderid} <=>策略{strategy_name}') + self.orderid_strategy_map[order.vt_orderid] = strategy + self.strategy_orderid_map[strategy.strategy_name].add(order.vt_orderid) + + task = self.thread_executor.submit(self._handle_internal_order, order, strategy_name) + self.thread_tasks.append(task) + + return [order.vt_orderid] + + def _handle_internal_order(self, order: OrderData, strategy_name: str): + """ + 线程执行内部订单 + :param order: + :return: + """ + self.write_log(f'内部订单 => 异步处理') + vt_symbol = order.vt_symbol + # 发送委托更新 + order.sys_orderid = order.orderid + order.status = Status.ALLTRADED + order.traded = order.volume + order.time = order.datetime.strftime("%H:%M:%S") + + # 制作假的成交单 + trade = TradeData( + symbol=order.symbol, + exchange=order.exchange, + direction=order.direction, + offset=order.offset, + name=order.name, + strategy_name=strategy_name, + orderid=order.orderid, + tradeid=f't_{order.orderid}', + price=order.price, + volume=order.volume, + datetime=datetime.now(), + time=order.time, + gateway_name=order.gateway_name + ) + self.write_log(f'内部订单 => 生成成交单 \n {print_dict(trade.__dict__)}') + # 发出委托更新、订单更新 + self.event_engine.put(Event(type=EVENT_ORDER, data=order)) + self.event_engine.put(Event(type=EVENT_TRADE, data=trade)) + + target_pos = self.net_pos_target.get(vt_symbol, 0) + + if order.direction == Direction.LONG: + new_target_pos = target_pos + order.volume + else: + new_target_pos = target_pos - order.volume + + self.net_pos_target.update({vt_symbol: new_target_pos}) + + self.write_log( + f'{strategy_name} {order.direction.value} {order.offset.value}: net_pos_target: {target_pos} => {new_target_pos}') + # 记录日志 + append_data( + file_name=os.path.abspath(os.path.join(self.get_data_path(), 'cta_option_internal_orders.csv')), + dict_data=OrderedDict({ + 'datetime': order.datetime.strftime('%Y-%m-%d %H:%M:%S'), + 'strategy_name': strategy_name, + 'vt_symbol': order.volume, + 'name': order.name, + 'direction': order.direction.value, + 'offset': order.offset.value, + 'price': order.price, + 'volume': order.volume, + 'old_target': target_pos, + 'new_target': new_target_pos + })) + + def load_internal_data(self): + """ + 加载内部数据 + :return: + """ + f_name = os.path.abspath(os.path.join(self.get_data_path(),f'{self.engine_name}_datas.json')) + try: + j = load_json(f_name,auto_save=True) + self.net_pos_target = j.get('net_pos_target', {}) + self.net_pos_holding = j.get('net_pos_holding', {}) + self.write_log('恢复内部持仓目标:{}'.format( + ';'.join([f'{k}[{self.get_name(k)}]:{v}' for k,v in self.net_pos_target.items()]))) + self.write_log('恢复内部持仓:{}'.format( + ';'.join([f'{k}[{self.get_name(k)}]:{v}' for k, v in self.net_pos_target.items()]))) + + except Exception as ex: + self.write_error(f'恢复内部数据异常:{str(ex)}') + + def save_internal_data(self): + """ + 保存内部数据 + :return: + """ + f_name = os.path.abspath(os.path.join(self.get_data_path(), f'{self.engine_name}_datas.json')) + try: + d = { + "net_pos_target":self.net_pos_target, + "net_pos_holding":self.net_pos_holding + } + save_json(f_name,d) + except Exception as ex: + self.write_error(f'保存内部数据异常:{str(ex)}') + + def execute_pos_target(self, vt_symbol): + """ + 执行仓位目标 + :param vt_symbol: + :return: + """ + target_pos = self.net_pos_target.get(vt_symbol, 0) + holding_pos = self.net_pos_holding.get(vt_symbol, 0) + + diff_pos = target_pos - holding_pos + if diff_pos == 0: + return + # 获取最新价 + cur_price = self.get_price(vt_symbol) + if not cur_price: + self.write_log(f'仓位目标执行 => 订阅{vt_symbol}行情') + contract = self.main_engine.get_contract(vt_symbol) + if contract: + gateway_name = "" + if contract.gateway_name: + gateway_name = contract.gateway_name + req = SubscribeRequest( + symbol=contract.symbol, exchange=contract.exchange) + self.main_engine.subscribe(req, gateway_name) + + return + # 获取最新tick + cur_tick = self.get_tick(vt_symbol) + if cur_tick is None: + return + + price_tick = self.get_price_tick(vt_symbol) + # 需要买入 + if diff_pos > 0: + # 账号得多、空 + acc_long_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.LONG) + acc_long_pos = 0 if acc_long_position is None else acc_long_position.volume-acc_long_position.frozen + acc_short_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.SHORT) + acc_short_pos = 0 if acc_short_position is None else acc_short_position.volume-acc_short_position.frozen + cover_pos = 0 + buy_pos = 0 + # 仅平仓 + if acc_short_pos > 0: + # 优先平空单 + cover_pos = min(diff_pos,acc_short_pos) + buy_pos = diff_pos - cover_pos + else: + # 仅开仓 + cover_pos = 0 + buy_pos = diff_pos + + self.write_log(f'{self.engine_name}仓位执行{vt_symbol}[{self.get_name(vt_symbol)}]: ' + + f'[账号多单:{acc_long_pos},空单:{acc_short_pos}]' + + f'[holding:{holding_pos} =>target:{target_pos} ] => cover:{cover_pos} + buy:{buy_pos}') + if cover_pos > 0: + if not self.exist_order(vt_symbol, direction=Direction.LONG, offset=Offset.CLOSE): + vt_orderids = self.send_order( + strategy_name="", + vt_symbol=vt_symbol, + price=cur_price, + volume=cover_pos, + direction=Direction.LONG, + offset=Offset.CLOSE, + order_type=OrderType.LIMIT, + stop=False, + internal=False + ) + if len(vt_orderids) > 0: + self.internal_orderids =self.internal_orderids.union(vt_orderids) + + if buy_pos > 0: + if not self.exist_order(vt_symbol, direction=Direction.LONG, offset=Offset.OPEN): + vt_orderids = self.send_order( + strategy_name="", + vt_symbol=vt_symbol, + price=cur_price, + volume=buy_pos, + direction=Direction.LONG, + offset=Offset.OPEN, + order_type=OrderType.LIMIT, + stop=False, + internal=False + ) + if len(vt_orderids) > 0: + self.internal_orderids= self.internal_orderids.union(vt_orderids) + # 需要卖出 ( diff_pos < 0) + else: + # 账号得多、空 + acc_long_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.LONG) + acc_long_pos = 0 if acc_long_position is None else acc_long_position.volume - acc_long_position.frozen + acc_short_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.SHORT) + acc_short_pos = 0 if acc_short_position is None else acc_short_position.volume - acc_short_position.frozen + + # 如果持有多单,优先平掉多单 + if acc_long_pos > 0: + sell_pos = min(abs(diff_pos), acc_long_pos) + short_pos = abs(diff_pos) - sell_pos + else: + # 仅开仓 + sell_pos = 0 + short_pos = abs(diff_pos) + + self.write_log(f'{self.engine_name}仓位执行{vt_symbol}[{self.get_name(vt_symbol)}]' + + f'[账号多单:{acc_long_pos},空单:{acc_short_pos}],' + + f'[holding:{holding_pos} => target:{target_pos}] => sell:{sell_pos}, short:{short_pos}') + + if sell_pos > 0: + if not self.exist_order(vt_symbol, direction=Direction.SHORT, offset=Offset.CLOSE): + vt_orderids = self.send_order( + strategy_name="", + vt_symbol=vt_symbol, + price=cur_price, + volume=sell_pos, + direction=Direction.SHORT, + offset=Offset.CLOSE, + order_type=OrderType.LIMIT, + stop=False, + internal=False + ) + if len(vt_orderids) > 0: + self.internal_orderids = self.internal_orderids.union(vt_orderids) + if short_pos > 0: + if not self.exist_order(vt_symbol, direction=Direction.SHORT, offset=Offset.OPEN): + vt_orderids = self.send_order( + strategy_name="", + vt_symbol=vt_symbol, + price=cur_price, + volume=short_pos, + direction=Direction.LONG, + offset=Offset.OPEN, + order_type=OrderType.LIMIT, + stop=False, + internal=False + ) + if len(vt_orderids) > 0: + self.internal_orderids = self.internal_orderids.union(vt_orderids) + + def exist_order(self, vt_symbol, direction, offset): + """ + 是否存在相同得委托 + :param vt_symbol: + :param direction: + :param offset: + :return: + """ + if len(self.active_orders) == 0: + self.write_log(f'内部活动订单中,数量为零. 查询{vt_symbol},方向:{direction.value}, 开平:{offset.value}') + return False + + for vt_orderid in list(self.active_orders.keys()): + order = self.active_orders.get(vt_orderid, None) + if order is None: + continue + + if order.vt_symbol == vt_symbol and order.direction == direction and order.offset == offset: + self.write_log(f'引擎存在相同得内部活动订单:{order.name}') + return True + + return False def subscribe_symbol(self, strategy_name: str, vt_symbol: str, gateway_name: str = '', is_bar: bool = False): """订阅合约""" @@ -1215,7 +1583,8 @@ class CtaOptionEngine(BaseEngine): # 复权转换 adj_list = self.adjust_factor_dict.get(vt_symbol, []) # 按照结束日期,裁剪复权记录 - adj_list = [row for row in adj_list if row['dividOperateDate'].replace('-', '') <= end.strftime('%Y%m%d')] + adj_list = [row for row in adj_list if + row['dividOperateDate'].replace('-', '') <= end.strftime('%Y%m%d')] if len(adj_list) > 0: self.write_log(f'需要对{vt_symbol}进行前复权处理') @@ -1223,13 +1592,14 @@ class CtaOptionEngine(BaseEngine): row.update({'dividOperateDate': row.get('dividOperateDate')[:10] + ' 09:30:00'}) # list -> dataframe, 转换复权日期格式 adj_data = pd.DataFrame(adj_list) - adj_data["dividOperateDate"] = pd.to_datetime(adj_data["dividOperateDate"], format="%Y-%m-%d %H:%M:%S") + adj_data["dividOperateDate"] = pd.to_datetime(adj_data["dividOperateDate"], + format="%Y-%m-%d %H:%M:%S") adj_data = adj_data.set_index("dividOperateDate") # 调用转换方法,对open,high,low,close, volume进行复权, fore, 前复权, 其他,后复权 symbol_df = stock_to_adj(symbol_df, adj_data, adj_type='fore') for dt, bar_data in symbol_df.iterrows(): - bar_datetime = dt #- timedelta(seconds=bar_interval_seconds) + bar_datetime = dt # - timedelta(seconds=bar_interval_seconds) bar = BarData( gateway_name='backtesting', @@ -1266,7 +1636,6 @@ class CtaOptionEngine(BaseEngine): return bars - def resample_bars(self, df, x_min=None, x_hour=None, to_day=False): """ 重建x分钟K线(或日线) @@ -1917,7 +2286,7 @@ class CtaOptionEngine(BaseEngine): self.init_mongo_data() if self.mongo_data and self.mongo_data.db_has_connected: - filter = {'account_id':self.engine_config.get('accountid','-')} + filter = {'account_id': self.engine_config.get('accountid', '-')} pos_list = self.mongo_data.db_query( db_name='Account', @@ -2013,19 +2382,19 @@ class CtaOptionEngine(BaseEngine): for pos in self.main_engine.get_all_positions(): vt_symbols.add(pos.vt_symbol) vt_symbol_pos = compare_pos.get(pos.vt_symbol, { - "账号空单": 0, - '账号多单': 0, - '策略空单': 0, - '策略多单': 0, - '空单策略': [], - '多单策略': [] - }) + "账号空单": 0, + '账号多单': 0, + '策略空单': 0, + '策略多单': 0, + '空单策略': [], + '多单策略': [] + }) if pos.direction == Direction.LONG: vt_symbol_pos['账号多单'] = vt_symbol_pos['账号多单'] + pos.volume else: vt_symbol_pos['账号空单'] = vt_symbol_pos['账号空单'] + pos.volume - compare_pos.update({pos.vt_symbol:vt_symbol_pos}) + compare_pos.update({pos.vt_symbol: vt_symbol_pos}) # 逐一根据策略仓位,与Account_pos进行处理比对 for strategy_pos in strategy_pos_list: @@ -2081,33 +2450,31 @@ class CtaOptionEngine(BaseEngine): 'direction': Direction.SHORT.value, 'strategy_list': symbol_pos.get('空单策略', [])} - # 股指期货: 帐号多/空轧差, vs 策略多空轧差 是否一致; - # 其他期货:帐号多单 vs 除了多单, 空单 vs 空单 - if vt_symbol.endswith(".CFFEX"): - diff_match = (symbol_pos.get('账号多单', 0) - symbol_pos.get('账号空单', 0)) == ( - symbol_pos.get('策略多单', 0) - symbol_pos.get('策略空单', 0)) - pos_match = symbol_pos.get('账号空单', 0) == symbol_pos.get('策略空单', 0) and \ - symbol_pos.get('账号多单', 0) == symbol_pos.get('策略多单', 0) - match = diff_match - # 轧差一致,帐号/策略持仓不一致 - if diff_match and not pos_match: - if symbol_pos.get('账号多单', 0) > symbol_pos.get('策略多单', 0): - self.write_log('{}轧差持仓:多:{},空:{} 大于 策略持仓 多:{},空:{}'.format( - vt_symbol, - symbol_pos.get('账号多单', 0), - symbol_pos.get('账号空单', 0), - symbol_pos.get('策略多单', 0), - symbol_pos.get('策略空单', 0) - )) - diff_pos_dict.update({vt_symbol: {"long": symbol_pos.get('账号多单', 0) - symbol_pos.get('策略多单', 0), - "short": symbol_pos.get('账号空单', 0) - symbol_pos.get('策略空单', - 0)}}) - else: - match = round(symbol_pos.get('账号空单', 0), 7) == round(symbol_pos.get('策略空单', 0), 7) and \ - round(symbol_pos.get('账号多单', 0), 7) == round(symbol_pos.get('策略多单', 0), 7) + # 帐号多/空轧差, vs 策略多空轧差 是否一致; + + diff_match = (symbol_pos.get('账号多单', 0) - symbol_pos.get('账号空单', 0)) == ( + symbol_pos.get('策略多单', 0) - symbol_pos.get('策略空单', 0)) + pos_match = symbol_pos.get('账号空单', 0) == symbol_pos.get('策略空单', 0) and \ + symbol_pos.get('账号多单', 0) == symbol_pos.get('策略多单', 0) + match = diff_match + # 轧差一致,帐号/策略持仓不一致 + if diff_match and not pos_match: + if symbol_pos.get('账号多单', 0) > symbol_pos.get('策略多单', 0): + self.write_log('{}轧差持仓:多:{},空:{} 大于 策略持仓 多:{},空:{}'.format( + vt_symbol, + symbol_pos.get('账号多单', 0), + symbol_pos.get('账号空单', 0), + symbol_pos.get('策略多单', 0), + symbol_pos.get('策略空单', 0) + )) + diff_pos_dict.update({vt_symbol: {"long": symbol_pos.get('账号多单', 0) - symbol_pos.get('策略多单', 0), + "short": symbol_pos.get('账号空单', 0) - symbol_pos.get('策略空单', + 0)}}) + # 多空都一致 if match: - msg = u'{}多空都一致.{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False)) + msg = u'{}[{}]多空都一致.{}\n'.format(vt_symbol, self.get_name(vt_symbol), + json.dumps(symbol_pos, indent=2, ensure_ascii=False)) self.write_log(msg) compare_info += msg else: @@ -2131,14 +2498,15 @@ class CtaOptionEngine(BaseEngine): diff_short_volume = round(symbol_pos.get('账号空单', 0), 7) - round(symbol_pos.get('策略空单', 0), 7) if diff_short_volume != 0: - msg = '{}空单[账号({}), 策略{},共({})], ' \ + msg = '{}[{}] 空单[账号({}), 策略{},共({})], ' \ .format(vt_symbol, + self.get_name(vt_symbol), symbol_pos.get('账号空单'), symbol_pos.get('空单策略'), symbol_pos.get('策略空单')) pos_compare_result += msg - self.write_error(u'{}不一致:{}'.format(vt_symbol, msg)) - compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg) + self.write_error(u'{}[{}]不一致:{}'.format(vt_symbol, self.get_name(vt_symbol), msg)) + compare_info += u'{}[{}]不一致:{}\n'.format(vt_symbol, self.get_name(vt_symbol), msg) if auto_balance: self.balance_pos(vt_symbol, Direction.SHORT, diff_short_volume) @@ -2374,6 +2742,6 @@ class CtaOptionEngine(BaseEngine): if strategy: subject = f"{strategy.strategy_name}" else: - subject = "CTAOPtion引擎" + subject = "CTA Option引擎" send_wx_msg(content=f'{subject}:{msg}') diff --git a/vnpy/app/cta_option/template.py b/vnpy/app/cta_option/template.py index 98ff2a24..51d892ee 100644 --- a/vnpy/app/cta_option/template.py +++ b/vnpy/app/cta_option/template.py @@ -194,7 +194,7 @@ class CtaTemplate(ABC): pos = PositionData( gateway_name=contract.gateway_name if contract else '', symbol=symbol, - name=contract.name, + name=contract.name if contract else symbol, exchange=exchange, direction=direction ) @@ -405,14 +405,15 @@ class CtaTemplate(ABC): return [] vt_orderids = self.cta_engine.send_order( - strategy=self, + strategy_name=self.strategy_name, vt_symbol=vt_symbol, direction=direction, offset=offset, price=price, volume=volume, stop=stop, - order_type=order_type + order_type=order_type, + internal=True ) if len(vt_orderids) == 0: self.write_error(f'{self.strategy_name}调用cta_engine.send_order委托返回失败,vt_symbol:{vt_symbol}') @@ -452,7 +453,7 @@ class CtaTemplate(ABC): Cancel an existing order. """ if self.trading: - return self.cta_engine.cancel_order(self, vt_orderid) + return self.cta_engine.cancel_order(self.strategy_name, vt_orderid) return False @@ -461,7 +462,7 @@ class CtaTemplate(ABC): Cancel all orders sent by strategy. """ if self.trading: - self.cta_engine.cancel_all(self) + self.cta_engine.cancel_all(self.strategy_name) def is_upper_limit(self, symbol): """是否涨停""" @@ -539,7 +540,11 @@ class CtaTemplate(ABC): self.cta_engine.sync_strategy_data(self) class CtaOptionTemplate(CtaTemplate): - """期权交易增强版模板""" + """ + 期权交易增强版模板 + 使用target_pos得方式,开平仓时,只需要更新self.policy.target_pos{} + + """ # 逻辑过程日志 @@ -595,7 +600,6 @@ class CtaOptionTemplate(CtaTemplate): self.write_log('当前policy:\n{}'.format(print_dict(self.policy.to_json()))) - def sync_data(self): """同步更新数据""" if not self.backtesting: diff --git a/vnpy/app/cta_stock/template.py b/vnpy/app/cta_stock/template.py index d72ceada..46bf93c9 100644 --- a/vnpy/app/cta_stock/template.py +++ b/vnpy/app/cta_stock/template.py @@ -1078,6 +1078,8 @@ class CtaStockTemplate(CtaTemplate): ordering_grid = None for grid in self.gt.dn_grids: + cn_name = self.cta_engine.get_name(grid.vt_symbol) + # 只扫描vt_symbol 匹配的网格 if vt_symbol and vt_symbol != grid.vt_symbol: continue @@ -1088,10 +1090,15 @@ class CtaStockTemplate(CtaTemplate): # 排除存在委托单号的网格 if len(grid.order_ids) > 0: + self.write_log(f'网格{grid.vt_symbol}[{cn_name}]存在委托单号:{grid.order_ids}') continue if grid.volume <= grid.traded_volume: - self.write_log(u'网格计划卖出:{},已成交:{}'.format(grid.volume, grid.traded_volume)) + self.write_log(u'{}[{}]网格计划卖出:{},已成交:{}'.format( + grid.vt_symbol, + cn_name, + grid.volume, + grid.traded_volume)) self.tns_finish_sell_grid(grid) continue @@ -1103,7 +1110,7 @@ class CtaStockTemplate(CtaTemplate): direction=Direction.NET) vt_symbol = ordering_grid.vt_symbol - cn_name = self.cta_engine.get_name(ordering_grid.vt_symbol) + sell_volume = ordering_grid.volume - ordering_grid.traded_volume if acc_symbol_pos is None: @@ -1122,11 +1129,12 @@ class CtaStockTemplate(CtaTemplate): sell_volume = acc_symbol_pos.volume if sell_volume == 0: - self.write_log(f'账号{vt_symbol}持仓{acc_symbol_pos.volume},卖出目标:{sell_volume}=0 不执行') + self.write_log(f'账号{vt_symbol}[{cn_name}]持仓{acc_symbol_pos.volume},卖出目标:{sell_volume}=0 不执行') continue cur_price = self.cta_engine.get_price(vt_symbol) if not cur_price: + self.write_log(f'获取不到{vt_symbol}[{cn_name}]价格,发出订阅') self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=vt_symbol) continue diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py index 5cac1637..c2a38490 100644 --- a/vnpy/app/cta_strategy_pro/template.py +++ b/vnpy/app/cta_strategy_pro/template.py @@ -871,7 +871,9 @@ class CtaProTemplate(CtaTemplate): for g in self.gt.get_opened_grids(direction=Direction.LONG): vt_symbol = g.snapshot.get('mi_symbol', g.vt_symbol if g.vt_symbol and '99' not in g.vt_symbol else self.vt_symbol) open_price = g.snapshot.get('open_price', g.open_price) + name = self.cta_engine.get_name(vt_symbol) pos_list.append({'vt_symbol': vt_symbol, + 'name':name, 'direction': 'long', 'volume': g.volume - g.traded_volume, 'price': open_price}) @@ -880,7 +882,9 @@ class CtaProTemplate(CtaTemplate): for g in self.gt.get_opened_grids(direction=Direction.SHORT): vt_symbol = g.snapshot.get('mi_symbol', g.vt_symbol if g.vt_symbol and '99' not in g.vt_symbol else self.vt_symbol) open_price = g.snapshot.get('open_price', g.open_price) + name = self.cta_engine.get_name(vt_symbol) pos_list.append({'vt_symbol': vt_symbol, + 'name': name, 'direction': 'short', 'volume': abs(g.volume - g.traded_volume), 'price': open_price}) @@ -1787,7 +1791,7 @@ class CtaProFutureTemplate(CtaProTemplate): # order_price = order_info['price'] # order_direction = order_info['direction'] # order_offset = order_info['offset'] - order_grid = order_info['grid'] + order_grid = order_info.get('grid',None) order_status = order_info.get('status', Status.NOTTRADED) order_type = order_info.get('order_type', OrderType.LIMIT) over_seconds = (dt - order_time).total_seconds() @@ -1827,7 +1831,7 @@ class CtaProFutureTemplate(CtaProTemplate): if order_info['direction'] == Direction.SHORT: cur_price = self.cta_engine.get_price(order_vt_symbol) short_price = cur_price - self.price_tick - if order_grid.volume != order_volume and order_volume > 0: + if order_grid and order_grid.volume != order_volume and order_volume > 0: self.write_log( u'网格volume:{},order_volume:{}不一致,修正'.format(order_grid.volume, order_volume)) order_grid.volume = order_volume @@ -1842,13 +1846,14 @@ class CtaProFutureTemplate(CtaProTemplate): if len(vt_orderids) > 0: self.write_log(u'委托成功,orderid:{}'.format(vt_orderids)) - order_grid.snapshot.update({'open_price': short_price}) + if order_grid: + order_grid.snapshot.update({'open_price': short_price}) else: self.write_error(u'撤单后,重新委托开空仓失败') else: cur_price = self.cta_engine.get_price(order_vt_symbol) buy_price = cur_price + self.price_tick - if order_grid.volume != order_volume and order_volume > 0: + if order_grid and order_grid.volume != order_volume and order_volume > 0: self.write_log( u'网格volume:{},order_volume:{}不一致,修正'.format(order_grid.volume, order_volume)) order_grid.volume = order_volume @@ -1863,7 +1868,8 @@ class CtaProFutureTemplate(CtaProTemplate): if len(vt_orderids) > 0: self.write_log(u'委托成功,orderids:{}'.format(vt_orderids)) - order_grid.snapshot.update({'open_price': buy_price}) + if order_grid: + order_grid.snapshot.update({'open_price': buy_price}) else: self.write_error(u'撤单后,重新委托开多仓失败') else: diff --git a/vnpy/app/index_tick_publisher/engine.py b/vnpy/app/index_tick_publisher/engine.py index 3592dfff..73cffd04 100644 --- a/vnpy/app/index_tick_publisher/engine.py +++ b/vnpy/app/index_tick_publisher/engine.py @@ -132,6 +132,8 @@ class IndexTickPublisherV2(BaseEngine): if len(self.selected_underly_symbols) > 0 and underly_symbol not in self.selected_underly_symbols: continue + self.write_log(f'定时检查{underly_symbol}') + # 日盘数据,夜盘期间不订阅 if dt_now.hour < 4 or dt_now.hour > 20: if underly_symbol in MARKET_DAY_ONLY: @@ -139,12 +141,17 @@ class IndexTickPublisherV2(BaseEngine): # 获取当前所有的合约列表 symbols = info.get('symbols', {}) + + total_oi = 0 + if len(symbols) > 0: + total_oi = sum([v for v in symbols.values()]) + # 获取交易所 exchange = info.get('exchange', 'LOCAL') # 获取本地记录的tick dict tick_dict = self.ticks.get(underly_symbol, {}) - for symbol in symbols.keys(): + for symbol in list(symbols.keys()): # 全路径合约 => 标准合约 ,如 ZC2109 => ZC109, RB2110 => rb2110 vn_symbol = get_real_symbol_by_exchange(symbol, Exchange(exchange)) @@ -152,6 +159,14 @@ class IndexTickPublisherV2(BaseEngine): self.write_log(f'移除早于当月的合约{symbol}') symbols.pop(symbol, None) continue + + cur_oi = symbols.get(symbol,0) + + if cur_oi < max(total_oi * 0.03,100): + self.write_log(f'{symbol} 上一交易日持仓量:{cur_oi} 小于合约总持仓量{total_oi}得3% {max(total_oi * 0.03,100)},不纳入指数计算范围') + symbols.pop(symbol, None) + continue + # 生成带交易所信息的合约 vt_symbol = f'{vn_symbol}.{exchange}' # symbol_exchange_map是全局变量,ctp md api会使用到,所以需要更新其 合约与交易所的关系 diff --git a/vnpy/trader/constant.py b/vnpy/trader/constant.py index 2887beac..303bdec9 100644 --- a/vnpy/trader/constant.py +++ b/vnpy/trader/constant.py @@ -256,8 +256,8 @@ class ChanSignals(Enum): # 趋势类买卖点(9~13笔分析结果) Q1L0 = "Q1L0~趋势类一买" Q2L0 = "Q2L0~趋势类二买" - Q3L0 = "Q2L0~趋势类三买" + Q3L0 = "Q3L0~趋势类三买" Q1S0 = "Q1S0~趋势类一卖" Q2S0 = "Q2S0~趋势类二卖" - Q3S0 = "Q2S0~趋势类三卖" + Q3S0 = "Q3S0~趋势类三卖" diff --git a/vnpy/trader/utility.py b/vnpy/trader/utility.py index 1affbfb1..014bfa92 100644 --- a/vnpy/trader/utility.py +++ b/vnpy/trader/utility.py @@ -337,7 +337,7 @@ def get_digits(value: float) -> int: def print_dict(d: dict): """返回dict的字符串类型""" - max_key_len = max([len(str(k)) for k in d.keys()]) + max_key_len = max([len(str(k)) for k in d.keys()]) if len(d.keys()) > 0 else 10 return '\n'.join([str(key) + (max_key_len-len(str(key))) * " " + f': {d[key]}' for key in sorted(d.keys())])