diff --git a/vnpy/app/cta_option/engine.py b/vnpy/app/cta_option/engine.py index 527ce519..c7e3c6de 100644 --- a/vnpy/app/cta_option/engine.py +++ b/vnpy/app/cta_option/engine.py @@ -182,6 +182,7 @@ class CtaOptionEngine(BaseEngine): self.vt_tradeids = set() # for filtering duplicate trade self.active_orders = {} self.internal_orderids = set() + self.single_execute_volume = 1 self.net_pos_target = {} # 净仓目标, vt_symbol: {pos: 正负数} self.net_pos_holding = {} # 净仓持有, vt_symbol: {pos: 正负数} @@ -222,6 +223,7 @@ class CtaOptionEngine(BaseEngine): def init_engine(self): """ + 初始化引擎 """ self.register_event() self.register_funcs() @@ -234,6 +236,10 @@ class CtaOptionEngine(BaseEngine): self.write_log("CTA策略引擎初始化成功") + if self.engine_config.get('single_execute_volume',0) > 0: + self.single_execute_volume = self.engine_config.get('single_execute_volume',1) + self.write_log(f'使用配置得单笔下仓数量:{self.single_execute_volume}') + if self.engine_config.get('get_pos_from_db', False): self.write_log(f'激活数据库策略仓位比对模式') self.init_mongo_data() @@ -318,6 +324,7 @@ class CtaOptionEngine(BaseEngine): if dt.hour == 2 and dt.minute == 59 and dt.second >= 55: self.cancel_all(strategy) + # 每分钟执行的逻辑 if self.last_minute != dt.minute: self.last_minute = dt.minute @@ -338,9 +345,9 @@ class CtaOptionEngine(BaseEngine): (datetime.now() - order.datetime).total_seconds() > 60 and \ order.status in [Status.NOTTRADED, Status.PARTTRADED]: self.write_log( - f'内部活动订单{order.orderid}, {order.vt_symbol}[{order.name}], {order.direction.value}, {order.offset.value}超时') + f'内部活动订单{order.orderid}, {order.vt_symbol}[{order.name}], {order.direction.value}, {order.offset.value},超时.发出撤单') req = order.create_cancel_request() - return self.main_engine.cancel_order(req, order.gateway_name) + self.main_engine.cancel_order(req, order.gateway_name) for vt_symbol in set(self.net_pos_target.keys()).union(set(self.net_pos_holding.keys())): self.execute_pos_target(vt_symbol) @@ -394,20 +401,28 @@ class CtaOptionEngine(BaseEngine): self.call_strategy_func(strategy, strategy.on_bar, {bar.vt_symbol: bar}) def process_order_event(self, event: Event): - """""" + """ + 委托更新事件处理 + :param event: + :return: + """ order = event.data strategy = self.orderid_strategy_map.get(order.vt_orderid, None) if not strategy: - # self.write_log(f'委托单没有对应的策略设置:order:{order.__dict__}') - # self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}') - if order.type != OrderType.STOP: - if order.status in [Status.ALLTRADED, Status.CANCELLED, Status.REJECTED]: - self.active_orders.pop(order.vt_orderid, None) - - elif order.status in [Status.SUBMITTING, Status.NOTTRADED, Status.PARTTRADED]: - self.active_orders.update({order.vt_orderid: copy(order)}) + if order.vt_orderid in self.internal_orderids: + self.write_log(f'委托更新 => 内部仓位: {print_dict(order.__dict__)}') + # self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}') + if order.type != OrderType.STOP: + if order.status in [Status.ALLTRADED, Status.CANCELLED, Status.REJECTED]: + self.write_log(f'委托更新 => 内部仓位 => 移除活动订单') + self.active_orders.pop(order.vt_orderid, None) + elif order.status in [Status.SUBMITTING, Status.NOTTRADED, Status.PARTTRADED]: + self.write_log(f'委托更新 => 内部仓位 => 更新活动订单') + self.active_orders.update({order.vt_orderid: copy(order)}) + else: + self.write_log(f'委托更新 => 系统账号 => {print_dict(order.__dict__)}') return self.write_log(f'委托更新:{order.vt_orderid} => 策略:{strategy.strategy_name}') # Remove vt_orderid if order is no longer active. @@ -434,33 +449,44 @@ class CtaOptionEngine(BaseEngine): self.call_strategy_func(strategy, strategy.on_order, order) def process_trade_event(self, event: Event): - """""" + """ + 成交更新事件处理 + :param event: + :return: + """ trade = event.data # Filter duplicate trade push if trade.vt_tradeid in self.vt_tradeids: - self.write_log(f'成交单的交易编号{trade.vt_tradeid}已处理完毕,不再处理') + self.write_log(f'成交更新 => 交易编号{trade.vt_tradeid}已处理完毕,不再处理') return self.vt_tradeids.add(trade.vt_tradeid) strategy = self.orderid_strategy_map.get(trade.vt_orderid, None) + + # 该成交得单子,不属于策略,可能是内部,或者其他实例得成交 if not strategy: + + # 属于内部单子 if trade.vt_orderid in self.internal_orderids: - cur_pos = self.net_pos_holding.get(trade.vt_symbol,0) + cur_pos = self.net_pos_holding.get(trade.vt_symbol, 0) if trade.direction == Direction.LONG: new_pos = cur_pos + trade.volume else: new_pos = cur_pos - trade.volume - self.write_log(f'内部委托单成交更新,{trade.vt_symbol}[{trade.name}]: {cur_pos} => {new_pos}') + self.write_log(f'成交更新 => 内部订单 {trade.vt_symbol}[{trade.name}]: {cur_pos} => {new_pos}') self.write_log(f'成交单:trade:{print_dict(trade.__dict__)}') self.net_pos_holding.update({trade.vt_symbol: new_pos}) self.save_internal_data() + + # 可能是其他实例得 else: - self.write_log(f'成交单没有对应的策略设置:trade:{trade.__dict__}') - self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}') + self.write_log(f'成交更新 => 没有对应的策略设置:trade:{trade.__dict__}') + self.write_log(f'成交更新 => 当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}') + self.write_log(f'成交更新 => 当前内部订单清单:{self.internal_orderids}') return - self.write_log(f'成交更新:{trade.vt_orderid} => 策略:{strategy.strategy_name}') + self.write_log(f'成交更新 =>:{trade.vt_orderid} => 策略:{strategy.strategy_name}') # Update strategy pos before calling on_trade method # 取消外部干预策略pos,由策略自行完成更新 @@ -885,7 +911,7 @@ class CtaOptionEngine(BaseEngine): for vt_orderid in copy(vt_orderids): self.cancel_order(strategy_name, vt_orderid) - def handel_internal_order(self, **kwargs) -> str: + def handel_internal_order(self, **kwargs): """ 处理内部订单: 策略 => 内部订单 => 产生内部订单号 => 登记内部处理逻辑 => 添加后续异步task @@ -996,9 +1022,9 @@ class CtaOptionEngine(BaseEngine): j = load_json(f_name,auto_save=True) self.net_pos_target = j.get('net_pos_target', {}) self.net_pos_holding = j.get('net_pos_holding', {}) - self.write_log('恢复内部持仓目标:{}'.format( + self.write_log('恢复内部目标持仓:{}'.format( ';'.join([f'{k}[{self.get_name(k)}]:{v}' for k,v in self.net_pos_target.items()]))) - self.write_log('恢复内部持仓:{}'.format( + self.write_log('恢复内部现有持仓:{}'.format( ';'.join([f'{k}[{self.get_name(k)}]:{v}' for k, v in self.net_pos_target.items()]))) except Exception as ex: @@ -1025,10 +1051,10 @@ class CtaOptionEngine(BaseEngine): :param vt_symbol: :return: """ - target_pos = self.net_pos_target.get(vt_symbol, 0) - holding_pos = self.net_pos_holding.get(vt_symbol, 0) + target_pos = self.net_pos_target.get(vt_symbol, 0) # 该合约内部得目标持仓 + holding_pos = self.net_pos_holding.get(vt_symbol, 0) # 该合约内部得现有持仓 - diff_pos = target_pos - holding_pos + diff_pos = target_pos - holding_pos # 找出差异 if diff_pos == 0: return # 获取最新价 @@ -1051,26 +1077,29 @@ class CtaOptionEngine(BaseEngine): return price_tick = self.get_price_tick(vt_symbol) - # 需要买入 + # 需要增加仓位( buy or cover) if diff_pos > 0: - # 账号得多、空 + # 账号得多、空仓位 acc_long_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.LONG) acc_long_pos = 0 if acc_long_position is None else acc_long_position.volume-acc_long_position.frozen acc_short_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.SHORT) acc_short_pos = 0 if acc_short_position is None else acc_short_position.volume-acc_short_position.frozen - cover_pos = 0 - buy_pos = 0 + + if diff_pos > self.single_execute_volume: + self.write_log(f'内部仓位 => 执行{vt_symbol} => 降低交易头寸: {diff_pos} -> {self.single_execute_volume}') + diff_pos = self.single_execute_volume + # 仅平仓 if acc_short_pos > 0: # 优先平空单 - cover_pos = min(diff_pos,acc_short_pos) + cover_pos = min(diff_pos, acc_short_pos) buy_pos = diff_pos - cover_pos else: # 仅开仓 cover_pos = 0 buy_pos = diff_pos - self.write_log(f'{self.engine_name}仓位执行{vt_symbol}[{self.get_name(vt_symbol)}]: ' + + self.write_log(f'内部仓位 => 执行{vt_symbol}[{self.get_name(vt_symbol)}]: ' + f'[账号多单:{acc_long_pos},空单:{acc_short_pos}]' + f'[holding:{holding_pos} =>target:{target_pos} ] => cover:{cover_pos} + buy:{buy_pos}') if cover_pos > 0: @@ -1087,6 +1116,7 @@ class CtaOptionEngine(BaseEngine): internal=False ) if len(vt_orderids) > 0: + self.write_log(f'内部仓位 => 执行 => cover 登记委托编号:{vt_orderids}') self.internal_orderids =self.internal_orderids.union(vt_orderids) if buy_pos > 0: @@ -1103,16 +1133,21 @@ class CtaOptionEngine(BaseEngine): internal=False ) if len(vt_orderids) > 0: + self.write_log(f'内部仓位 => 执行 => buy 登记委托编号:{vt_orderids}') self.internal_orderids= self.internal_orderids.union(vt_orderids) # 需要卖出 ( diff_pos < 0) else: - # 账号得多、空 + # 账号得多、空单 acc_long_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.LONG) acc_long_pos = 0 if acc_long_position is None else acc_long_position.volume - acc_long_position.frozen acc_short_position = self.get_position(vt_symbol=vt_symbol, direction=Direction.SHORT) acc_short_pos = 0 if acc_short_position is None else acc_short_position.volume - acc_short_position.frozen - # 如果持有多单,优先平掉多单 + if abs(diff_pos) > self.single_execute_volume: + self.write_log(f'内部仓位 => 执行{vt_symbol} => 降低交易头寸: {abs(diff_pos)} -> {self.single_execute_volume}') + diff_pos = -self.single_execute_volume + + # 如果账号持有多单,优先平掉账号多单 if acc_long_pos > 0: sell_pos = min(abs(diff_pos), acc_long_pos) short_pos = abs(diff_pos) - sell_pos @@ -1121,7 +1156,7 @@ class CtaOptionEngine(BaseEngine): sell_pos = 0 short_pos = abs(diff_pos) - self.write_log(f'{self.engine_name}仓位执行{vt_symbol}[{self.get_name(vt_symbol)}]' + + self.write_log(f'内部仓位 => 执行{vt_symbol}[{self.get_name(vt_symbol)}]' + f'[账号多单:{acc_long_pos},空单:{acc_short_pos}],' + f'[holding:{holding_pos} => target:{target_pos}] => sell:{sell_pos}, short:{short_pos}') @@ -1139,6 +1174,7 @@ class CtaOptionEngine(BaseEngine): internal=False ) if len(vt_orderids) > 0: + self.write_log(f'内部仓位 => 执行 => sell 登记委托编号:{vt_orderids}') self.internal_orderids = self.internal_orderids.union(vt_orderids) if short_pos > 0: if not self.exist_order(vt_symbol, direction=Direction.SHORT, offset=Offset.OPEN): @@ -1147,13 +1183,14 @@ class CtaOptionEngine(BaseEngine): vt_symbol=vt_symbol, price=cur_price, volume=short_pos, - direction=Direction.LONG, + direction=Direction.SHORT, offset=Offset.OPEN, order_type=OrderType.LIMIT, stop=False, internal=False ) if len(vt_orderids) > 0: + self.write_log(f'内部仓位 => 执行 => short 登记委托编号:{vt_orderids}') self.internal_orderids = self.internal_orderids.union(vt_orderids) def exist_order(self, vt_symbol, direction, offset): @@ -1174,7 +1211,11 @@ class CtaOptionEngine(BaseEngine): continue if order.vt_symbol == vt_symbol and order.direction == direction and order.offset == offset: - self.write_log(f'引擎存在相同得内部活动订单:{order.name}') + self.write_log(f'引擎存在相同的内部活动订单:{order.name}') + return True + + if order.vt_symbol == vt_symbol and order.direction != direction and order.offset != offset: + self.write_log(f'引擎存在可能自成交的内部活动订单:{order.name}') return True return False diff --git a/vnpy/app/cta_strategy_pro/back_testing.py b/vnpy/app/cta_strategy_pro/back_testing.py index eb2fd2ae..7c9ad349 100644 --- a/vnpy/app/cta_strategy_pro/back_testing.py +++ b/vnpy/app/cta_strategy_pro/back_testing.py @@ -415,7 +415,7 @@ class BackTestingEngine(object): @lru_cache() def get_slippage(self, vt_symbol: str): - """获取滑点""" + """获取滑点点数""" return self.slippage.get(vt_symbol, 0) def set_size(self, vt_symbol: str, size: int): diff --git a/vnpy/app/cta_strategy_pro/engine.py b/vnpy/app/cta_strategy_pro/engine.py index adb06c2b..ee6e2d98 100644 --- a/vnpy/app/cta_strategy_pro/engine.py +++ b/vnpy/app/cta_strategy_pro/engine.py @@ -226,7 +226,7 @@ class CtaEngine(BaseEngine): # 触发每个策略的定时接口 for strategy in list(self.strategies.values()): - strategy.on_timer() + self.call_strategy_func(strategy, strategy.on_timer) if not strategy.trading: all_trading = False diff --git a/vnpy/component/cta_line_bar.py b/vnpy/component/cta_line_bar.py index 9533a673..842746e7 100644 --- a/vnpy/component/cta_line_bar.py +++ b/vnpy/component/cta_line_bar.py @@ -93,12 +93,12 @@ class CtaLineBar(object): lineMSetting = {} lineMSetting['name'] = u'M1' lineMSetting['interval'] = Interval.MINUTE - lineMSetting['bar_interval'] = 60 # 1分钟对应60秒 + lineMSetting['bar_interval'] = 1 # 1分钟对应60秒 lineMSetting['para_ema1_len'] = 7 # EMA线1的周期 lineMSetting['para_ema2_len'] = 21 # EMA线2的周期 lineMSetting['para_boll_len'] = 20 # 布林特线周期 lineMSetting['para_boll_std_rate'] = 2 # 布林特线标准差 - lineMSetting['price_tick'] = self.price_tick # 最小条 + lineMSetting['price_tick'] = self.price_tick # 最小跳 lineMSetting['underlying_symbol'] = self.underlying_symbol #商品短号 self.lineM = CtaLineBar(self, self.onBar, lineMSetting) @@ -5876,6 +5876,47 @@ class CtaLineBar(object): return False + def is_duan_divergence(self, direction, user_macd=False): + """ + 判断 两个线段是否背驰 + :param direction: 1,-1 或者 Direction.LONG(判断是否顶背离), Direction.SHORT(判断是否底背离) + :param user_macd: + :return: + """ + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + + if self.tre_duan is None: + return False + + # 获取对比的两个线段 + if self.cur_duan.direction == direction: + cur_duan = self.cur_duan + tre_duan = self.tre_duan + else: + if len(self.duan_list) < 4: + return False + cur_duan = self.duan_list[-2] + tre_duan = self.duan_list[-4] + + # 判断dif值是否背驰 + is_dif_div = False + if user_macd: + cur_dif = self.get_dif_by_dt(cur_duan.end) + tre_dif = self.get_dif_by_dt(tre_duan.end) + if (cur_dif > tre_dif and direction == -1) or (cur_dif < tre_dif and direction == 1): + is_dif_div = True + else: + is_dif_div = True + + # 判断 高度,斜率,dif值 + if cur_duan.height <= tre_duan.height and cur_duan.atan < tre_duan.atan and is_dif_div: + if (cur_duan.low < tre_duan.low and cur_duan.high < tre_duan.high and direction == -1) \ + or (cur_duan.high > tre_duan.high and cur_duan.low > tre_duan.high and direction == 1): + return True + + return False + def is_fx_macd_divergence(self, direction, cur_duan=None, use_macd=False): """ 分型的macd背离 @@ -6267,7 +6308,7 @@ class CtaLineBar(object): 'end': self.cur_bi.end, 'price': price, 'signal': signal}) - if len(xt_signals) > 200: + if len(xt_signals) > 20: del xt_signals[0] if cur_signal is not None and self.export_xt_filename: self.append_data( @@ -6295,6 +6336,9 @@ class CtaLineBar(object): 'end': self.cur_bi.end, 'price': price, 'signal': qsbc_2nd}) + if len(self.xt_2nd_signals) > 20: + del self.xt_2nd_signals[0] + if cur_signal is not None and self.export_xt_filename: self.append_data( file_name=self.export_xt_filename.replace('_n_', f'_2nd_'), @@ -6309,7 +6353,7 @@ class CtaLineBar(object): """ 获取n笔形态/信号的倒x笔结果 :param n: - :param x: 倒x笔,如倒1笔 + :param x: 倒x笔,如倒1笔, 倒0笔 :return: {} """ xt_signals = getattr(self, xt_name) diff --git a/vnpy/data/binance/binance_future_data.py b/vnpy/data/binance/binance_future_data.py index f6c7f8ef..ec2e0a6d 100644 --- a/vnpy/data/binance/binance_future_data.py +++ b/vnpy/data/binance/binance_future_data.py @@ -259,7 +259,6 @@ class BinanceFutureData(RestClient): contracts = load_json(f, auto_save=False) return contracts - def save_contracts(self): """保存合约配置""" contracts = self.get_contracts() diff --git a/vnpy/gateway/pb/pb_gateway.py b/vnpy/gateway/pb/pb_gateway.py index 4cf977cf..8a1a0b24 100644 --- a/vnpy/gateway/pb/pb_gateway.py +++ b/vnpy/gateway/pb/pb_gateway.py @@ -392,6 +392,11 @@ class PbGateway(BaseGateway): def send_order(self, req: OrderRequest) -> str: """""" + k = f'{req.vt_symbol}_{req.direction.value}_{req.offset.value}' + if len(self.rejected_orders.get(k, [])) > 5: + self.write_error(f'该合约相同请求已经被拒单五次,不能再发单:{print_dict(req.__dict__)}') + return "" + return self.td_api.send_order(req) def cancel_order(self, req: CancelRequest) -> None: diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index 61b2de52..6c79a127 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -32,7 +32,8 @@ from .object import ( CancelRequest, SubscribeRequest, HistoryRequest, - Exchange + Exchange, + Status ) from vnpy.trader.utility import get_folder_path, round_to, get_underlying_symbol, get_real_symbol_by_exchange @@ -103,6 +104,8 @@ class BaseGateway(ABC): self.query_functions = [] + self.rejected_orders = {} # 当日被拒单得订单, vt_symbol_direction_offset:[orders] + def create_logger(self): """ 创建engine独有的日志 @@ -155,6 +158,13 @@ class BaseGateway(ABC): Order event push. Order event of a specific vt_orderid is also pushed. """ + # 如果是拒单,进行登记 + if order.status == Status.REJECTED: + k = f'{order.vt_symbol}_{order.direction.value}_{order.offset.value}' + orders = self.rejected_orders.get(k,[]) + orders.append(deepcopy(order)) + self.rejected_orders.update({k:orders}) + self.on_event(EVENT_ORDER, order) # self.on_event(EVENT_ORDER + order.vt_orderid, order) @@ -782,6 +792,8 @@ class LocalOrderManager: Keep an order buf before pushing it to gateway. """ self.orders[order.orderid] = copy(order) + + self.gateway.on_order(order) def cancel_order(self, req: CancelRequest) -> None: diff --git a/vnpy/trader/utility.py b/vnpy/trader/utility.py index 014bfa92..e31e3c33 100644 --- a/vnpy/trader/utility.py +++ b/vnpy/trader/utility.py @@ -95,7 +95,7 @@ def get_stock_exchange(code, vn=True): market_id = 0 # 缺省深圳 code = str(code) - if code[0] in ['5', '6', '9'] or code[:3] in ["009", "126", "110", "201", "202", "203", "204"]: + if code[0] in ['5', '6', '9'] or code[:3] in ["009", "126", "110", "201", "202", "203", "204","113"]: market_id = 1 # 上海 try: from vnpy.trader.constant import Exchange