diff --git a/vnpy/app/cta_stock/engine.py b/vnpy/app/cta_stock/engine.py index 07fe1ec6..f341a112 100644 --- a/vnpy/app/cta_stock/engine.py +++ b/vnpy/app/cta_stock/engine.py @@ -237,8 +237,9 @@ class CtaEngine(BaseEngine): # 推送到事件 self.put_all_strategy_pos_event(all_strategy_pos) - for strategy in self.strategies.values(): - if strategy.inited: + for strategy_name in list(self.strategies.keys()): + strategy = self.strategies.get(strategy_name, None) + if strategy and strategy.inited: self.call_strategy_func(strategy, strategy.on_timer) diff --git a/vnpy/app/cta_stock/template.py b/vnpy/app/cta_stock/template.py index 98745a7d..4b0e784a 100644 --- a/vnpy/app/cta_stock/template.py +++ b/vnpy/app/cta_stock/template.py @@ -993,67 +993,62 @@ class CtaStockTemplate(CtaTemplate): self.tns_finish_sell_grid(grid) continue - # 定位到首个满足条件的网格,跳出循环 + # 定位到首个满足条件的网格 ordering_grid = grid - break - # 没有满足条件的网格 - if ordering_grid is None: - return + acc_symbol_pos = self.cta_engine.get_position( + vt_symbol=ordering_grid.vt_symbol, + direction=Direction.NET) + if acc_symbol_pos is None: + self.write_error(f'{self.strategy_name}当前{ordering_grid.vt_symbol}持仓查询不到, 无法执行卖出') + continue - acc_symbol_pos = self.cta_engine.get_position( - vt_symbol=ordering_grid.vt_symbol, - direction=Direction.NET) - if acc_symbol_pos is None: - self.write_error(f'{self.strategy_name}当前{ordering_grid.vt_symbol}持仓查询不到, 无法执行卖出') - return + vt_symbol = ordering_grid.vt_symbol + sell_volume = ordering_grid.volume - ordering_grid.traded_volume - vt_symbol = ordering_grid.vt_symbol - sell_volume = ordering_grid.volume - ordering_grid.traded_volume + if sell_volume > acc_symbol_pos.volume: + self.write_error(u'账号{}持仓{},不满足减仓目标:{}' + .format(vt_symbol, acc_symbol_pos.volume, sell_volume)) + continue - if sell_volume > acc_symbol_pos.volume: - self.write_error(u'账号{}持仓{},不满足减仓目标:{}' - .format(vt_symbol, acc_symbol_pos.volume, sell_volume)) - return + # 实盘运行时,要加入市场买卖量的判断 + if not self.backtesting: + symbol_tick = self.cta_engine.get_tick(vt_symbol) + if symbol_tick is None: + self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=vt_symbol) + self.write_log(f'获取不到{vt_symbol}得tick,无法根据市场深度进行计算') + continue - # 实盘运行时,要加入市场买卖量的判断 - if not self.backtesting: - symbol_tick = self.cta_engine.get_tick(vt_symbol) - if symbol_tick is None: - self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=vt_symbol) - self.write_log(f'获取不到{vt_symbol}得tick,无法根据市场深度进行计算') - return + symbol_volume_tick = self.cta_engine.get_volume_tick(vt_symbol) + # 根据市场计算,前5档买单数量 + if all([symbol_tick.ask_volume_1, symbol_tick.ask_volume_2, symbol_tick.ask_volume_3, + symbol_tick.ask_volume_4, symbol_tick.ask_volume_5]) \ + and all( + [symbol_tick.bid_volume_1, symbol_tick.bid_volume_2, symbol_tick.bid_volume_3, symbol_tick.bid_volume_4, + symbol_tick.bid_volume_5]): + market_ask_volumes = symbol_tick.ask_volume_1 + symbol_tick.ask_volume_2 + symbol_tick.ask_volume_3 + symbol_tick.ask_volume_4 + symbol_tick.ask_volume_5 + market_bid_volumes = symbol_tick.bid_volume_1 + symbol_tick.bid_volume_2 + symbol_tick.bid_volume_3 + symbol_tick.bid_volume_4 + symbol_tick.bid_volume_5 + org_sell_volume = sell_volume + if market_bid_volumes > 0 and market_ask_volumes > 0 and org_sell_volume >= 2 * symbol_volume_tick: + sell_volume = min(market_bid_volumes / 4, market_ask_volumes / 4, sell_volume) + sell_volume = max(round_to(value=sell_volume, target=symbol_volume_tick), symbol_volume_tick) + if org_sell_volume != sell_volume: + self.write_log(u'修正批次卖出{}数量:{}=>{}'.format(vt_symbol, org_sell_volume, sell_volume)) - symbol_volume_tick = self.cta_engine.get_volume_tick(vt_symbol) - # 根据市场计算,前5档买单数量 - if all([symbol_tick.ask_volume_1, symbol_tick.ask_volume_2, symbol_tick.ask_volume_3, - symbol_tick.ask_volume_4, symbol_tick.ask_volume_5]) \ - and all( - [symbol_tick.bid_volume_1, symbol_tick.bid_volume_2, symbol_tick.bid_volume_3, symbol_tick.bid_volume_4, - symbol_tick.bid_volume_5]): - market_ask_volumes = symbol_tick.ask_volume_1 + symbol_tick.ask_volume_2 + symbol_tick.ask_volume_3 + symbol_tick.ask_volume_4 + symbol_tick.ask_volume_5 - market_bid_volumes = symbol_tick.bid_volume_1 + symbol_tick.bid_volume_2 + symbol_tick.bid_volume_3 + symbol_tick.bid_volume_4 + symbol_tick.bid_volume_5 - org_sell_volume = sell_volume - if market_bid_volumes > 0 and market_ask_volumes > 0 and org_sell_volume >= 2 * symbol_volume_tick: - sell_volume = min(market_bid_volumes / 4, market_ask_volumes / 4, sell_volume) - sell_volume = max(round_to(value=sell_volume, target=symbol_volume_tick), symbol_volume_tick) - if org_sell_volume != sell_volume: - self.write_log(u'修正批次卖出{}数量:{}=>{}'.format(vt_symbol, org_sell_volume, sell_volume)) - - # 获取当前价格 - sell_price = self.cta_engine.get_price(vt_symbol) - self.cta_engine.get_price_tick(vt_symbol) - # 发出委托卖出 - vt_orderids = self.sell( - vt_symbol=vt_symbol, - price=sell_price, - volume=sell_volume, - order_time=self.cur_datetime, - grid=ordering_grid) - if vt_orderids is None or len(vt_orderids) == 0: - self.write_error(f'{vt_symbol} 委托卖出失败,委托价:{sell_price} 数量:{sell_volume}') - return - else: - self.write_log(f'{vt_symbol} 已委托卖出,{sell_volume},委托价:{sell_price}, 数量:{sell_volume}') + # 获取当前价格 + sell_price = self.cta_engine.get_price(vt_symbol) - self.cta_engine.get_price_tick(vt_symbol) + # 发出委托卖出 + vt_orderids = self.sell( + vt_symbol=vt_symbol, + price=sell_price, + volume=sell_volume, + order_time=self.cur_datetime, + grid=ordering_grid) + if vt_orderids is None or len(vt_orderids) == 0: + self.write_error(f'{vt_symbol} 委托卖出失败,委托价:{sell_price} 数量:{sell_volume}') + continue + else: + self.write_log(f'{vt_symbol} 已委托卖出,{sell_volume},委托价:{sell_price}, 数量:{sell_volume}') def tns_finish_sell_grid(self, grid): @@ -1128,68 +1123,63 @@ class CtaStockTemplate(CtaTemplate): self.tns_finish_buy_grid(grid) return - # 定位到首个满足条件的网格,跳出循环 + # 定位到首个满足条件的网格, ordering_grid = grid - break - # 没有满足条件的网格 - if ordering_grid is None: - return + balance, availiable, _, _ = self.cta_engine.get_account() + if availiable <= 0: + self.write_error(u'当前可用资金不足'.format(availiable)) + continue + vt_symbol = ordering_grid.vt_symbol + cur_price = self.cta_engine.get_price(vt_symbol) + if cur_price is None: + self.write_error(f'暂时不能获取{vt_symbol}最新价格') + continue - balance, availiable, _, _ = self.cta_engine.get_account() - if availiable <= 0: - self.write_error(u'当前可用资金不足'.format(availiable)) - return - vt_symbol = ordering_grid.vt_symbol - cur_price = self.cta_engine.get_price(vt_symbol) - if cur_price is None: - self.write_error(f'暂时不能获取{vt_symbol}最新价格') - return + buy_volume = ordering_grid.volume - ordering_grid.traded_volume + min_trade_volume = self.cta_engine.get_volume_tick(vt_symbol) + if availiable < buy_volume * cur_price: + self.write_error(f'可用资金{availiable},不满足买入{vt_symbol},数量:{buy_volume} X价格{cur_price}') + max_buy_volume = int(availiable / cur_price) + max_buy_volume = max_buy_volume - max_buy_volume % min_trade_volume + if max_buy_volume <= min_trade_volume: + continue + # 计划买入数量,与可用资金买入数量的差别 + diff_volume = buy_volume - max_buy_volume + # 降低计划买入数量 + self.write_log(f'总计划{vt_symbol}买入数量:{ordering_grid.volume}=>{ordering_grid.volume - diff_volume}') + ordering_grid.volume -= diff_volume + self.gt.save() + buy_volume = max_buy_volume - buy_volume = ordering_grid.volume - ordering_grid.traded_volume - min_trade_volume = self.cta_engine.get_volume_tick(vt_symbol) - if availiable < buy_volume * cur_price: - self.write_error(f'可用资金{availiable},不满足买入{vt_symbol},数量:{buy_volume} X价格{cur_price}') - max_buy_volume = int(availiable / cur_price) - max_buy_volume = max_buy_volume - max_buy_volume % min_trade_volume - if max_buy_volume <= min_trade_volume: - return - # 计划买入数量,与可用资金买入数量的差别 - diff_volume = buy_volume - max_buy_volume - # 降低计划买入数量 - self.write_log(f'总计划{vt_symbol}买入数量:{ordering_grid.volume}=>{ordering_grid.volume - diff_volume}') - ordering_grid.volume -= diff_volume - self.gt.save() - buy_volume = max_buy_volume + # 实盘运行时,要加入市场买卖量的判断 + if not self.backtesting and 'market' in ordering_grid.snapshot: + symbol_tick = self.cta_engine.get_tick(vt_symbol) + # 根据市场计算,前5档买单数量 + if all([symbol_tick.ask_volume_1, symbol_tick.ask_volume_2, symbol_tick.ask_volume_3, + symbol_tick.ask_volume_4, symbol_tick.ask_volume_5]) \ + and all( + [symbol_tick.bid_volume_1, symbol_tick.bid_volume_2, symbol_tick.bid_volume_3, symbol_tick.bid_volume_4, + symbol_tick.bid_volume_5]): + market_ask_volumes = symbol_tick.ask_volume_1 + symbol_tick.ask_volume_2 + symbol_tick.ask_volume_3 + symbol_tick.ask_volume_4 + symbol_tick.ask_volume_5 + market_bid_volumes = symbol_tick.bid_volume_1 + symbol_tick.bid_volume_2 + symbol_tick.bid_volume_3 + symbol_tick.bid_volume_4 + symbol_tick.bid_volume_5 + if market_bid_volumes > 0 and market_ask_volumes > 0: + buy_volume = min(market_bid_volumes / 4, market_ask_volumes / 4, buy_volume) + buy_volume = max(buy_volume - buy_volume % min_trade_volume, min_trade_volume) - # 实盘运行时,要加入市场买卖量的判断 - if not self.backtesting and 'market' in ordering_grid.snapshot: - symbol_tick = self.cta_engine.get_tick(vt_symbol) - # 根据市场计算,前5档买单数量 - if all([symbol_tick.ask_volume_1, symbol_tick.ask_volume_2, symbol_tick.ask_volume_3, - symbol_tick.ask_volume_4, symbol_tick.ask_volume_5]) \ - and all( - [symbol_tick.bid_volume_1, symbol_tick.bid_volume_2, symbol_tick.bid_volume_3, symbol_tick.bid_volume_4, - symbol_tick.bid_volume_5]): - market_ask_volumes = symbol_tick.ask_volume_1 + symbol_tick.ask_volume_2 + symbol_tick.ask_volume_3 + symbol_tick.ask_volume_4 + symbol_tick.ask_volume_5 - market_bid_volumes = symbol_tick.bid_volume_1 + symbol_tick.bid_volume_2 + symbol_tick.bid_volume_3 + symbol_tick.bid_volume_4 + symbol_tick.bid_volume_5 - if market_bid_volumes > 0 and market_ask_volumes > 0: - buy_volume = min(market_bid_volumes / 4, market_ask_volumes / 4, buy_volume) - buy_volume = max(buy_volume - buy_volume % min_trade_volume, min_trade_volume) + buy_price = cur_price + self.cta_engine.get_price_tick(vt_symbol) * 10 - buy_price = cur_price + self.cta_engine.get_price_tick(vt_symbol) * 10 - - vt_orderids = self.buy( - vt_symbol=vt_symbol, - price=buy_price, - volume=buy_volume, - order_time=self.cur_datetime, - grid=ordering_grid) - if vt_orderids is None or len(vt_orderids) == 0: - self.write_error(f'委托买入失败,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}') - return - else: - self.write_log(f'{self.strategy_name}, {vt_orderids},已委托买入,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}') + vt_orderids = self.buy( + vt_symbol=vt_symbol, + price=buy_price, + volume=buy_volume, + order_time=self.cur_datetime, + grid=ordering_grid) + if vt_orderids is None or len(vt_orderids) == 0: + self.write_error(f'委托买入失败,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}') + continue + else: + self.write_log(f'{self.strategy_name}, {vt_orderids},已委托买入,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}') def tns_finish_buy_grid(self, grid): """ diff --git a/vnpy/app/cta_strategy_pro/engine.py b/vnpy/app/cta_strategy_pro/engine.py index ab42ae35..578b82fe 100644 --- a/vnpy/app/cta_strategy_pro/engine.py +++ b/vnpy/app/cta_strategy_pro/engine.py @@ -237,7 +237,7 @@ class CtaEngine(BaseEngine): all_strategy_pos = self.get_all_strategy_pos() # 每5分钟检查一次 - if dt.minute % 5 == 0: + if dt.minute % 5 == 0 and self.engine_config.get('compare_pos',True): # 比对仓位,使用上述获取得持仓信息,不用重复获取 self.compare_pos(strategy_pos_list=copy(all_strategy_pos)) @@ -859,7 +859,10 @@ class CtaEngine(BaseEngine): tick = self.main_engine.get_tick(vt_symbol) if tick: - return tick.last_price + if '&' in tick.symbol: + return (tick.ask_price_1 + tick.bid_price_1) / 2 + else: + return tick.last_price return None diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index a8338330..74628c47 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -383,6 +383,7 @@ class CtpGateway(BaseGateway): # 增加映射( leg1 对应的合成器列表映射) leg1_symbol = setting.get('leg1_symbol') + leg1_exchange = Exchange(setting.get('leg1_exchange')) combiner_list = self.tick_combiner_map.get(leg1_symbol, []) if combiner not in combiner_list: self.write_log(u'添加Leg1:{}与合成器得映射'.format(leg1_symbol)) @@ -391,6 +392,7 @@ class CtpGateway(BaseGateway): # 增加映射( leg2 对应的合成器列表映射) leg2_symbol = setting.get('leg2_symbol') + leg2_exchange = Exchange(setting.get('leg2_exchange')) combiner_list = self.tick_combiner_map.get(leg2_symbol, []) if combiner not in combiner_list: self.write_log(u'添加Leg2:{}与合成器得映射'.format(leg2_symbol)) @@ -400,14 +402,14 @@ class CtpGateway(BaseGateway): self.write_log(u'订阅leg1:{}'.format(leg1_symbol)) leg1_req = SubscribeRequest( symbol=leg1_symbol, - exchange=symbol_exchange_map.get(leg1_symbol, Exchange.LOCAL) + exchange=leg1_exchange ) self.subscribe(leg1_req) self.write_log(u'订阅leg2:{}'.format(leg2_symbol)) leg2_req = SubscribeRequest( symbol=leg2_symbol, - exchange=symbol_exchange_map.get(leg1_symbol, Exchange.LOCAL) + exchange=leg2_exchange ) self.subscribe(leg2_req) @@ -646,6 +648,10 @@ class CtpMdApi(MdApi): gateway_name=self.gateway_name ) + # 处理一下标准套利合约的last_price + if '&' in symbol: + tick.last_price = (tick.ask_price_1 + tick.bid_price_1)/2 + if data["BidVolume2"] or data["AskVolume2"]: tick.bid_price_2 = adjust_price(data["BidPrice2"]) tick.bid_price_3 = adjust_price(data["BidPrice3"]) diff --git a/vnpy/gateway/ctptest/ctptest_gateway.py b/vnpy/gateway/ctptest/ctptest_gateway.py index 472a9d4b..9447e674 100644 --- a/vnpy/gateway/ctptest/ctptest_gateway.py +++ b/vnpy/gateway/ctptest/ctptest_gateway.py @@ -137,9 +137,9 @@ class CtptestGateway(BaseGateway): exchanges = list(EXCHANGE_CTP2VT.values()) - def __init__(self, event_engine): + def __init__(self, event_engine, gateway_name="CTPTEST"): """Constructor""" - super().__init__(event_engine, "CTPTEST") + super().__init__(event_engine, gateway_name) self.td_api = CtpTdApi(self) self.md_api = CtpMdApi(self) @@ -166,8 +166,9 @@ class CtptestGateway(BaseGateway): and (not md_address.startswith("ssl://")) ): md_address = "tcp://" + md_address - + print(f'连接交易前置机:{td_address}') self.td_api.connect(td_address, userid, password, brokerid, auth_code, appid, product_info) + print(f'连接行情前置机:{md_address}') self.md_api.connect(md_address, userid, password, brokerid) self.init_query() @@ -772,10 +773,10 @@ class CtpTdApi(TdApi): ctp_req = { "InstrumentID": req.symbol, - "ExchangeID": req.exchange, + "ExchangeID": req.exchange.value, "OrderRef": order_ref, "FrontID": int(frontid), - "SessionID": int(sessionid), + "SessionID": abs(int(sessionid)), "ActionFlag": THOST_FTDC_AF_Delete, "BrokerID": self.brokerid, "InvestorID": self.userid diff --git a/vnpy/gateway/ctptest/test.py b/vnpy/gateway/ctptest/test.py index 472c3dd4..0b5be720 100644 --- a/vnpy/gateway/ctptest/test.py +++ b/vnpy/gateway/ctptest/test.py @@ -12,7 +12,7 @@ if vnpy_root not in sys.path: from vnpy.gateway.ctptest import CtptestGateway from vnpy.event import EventEngine -from vnpy.trader.constant import Exchange +from vnpy.trader.constant import Exchange,OrderType from vnpy.trader.event import ( EVENT_TICK, EVENT_ORDER, @@ -22,17 +22,17 @@ from vnpy.trader.event import ( EVENT_LOG, ) from vnpy.trader.object import ( - SubscribeRequest, + SubscribeRequest,OrderRequest,Direction,Offset,CancelRequest ) # 这里放期货公司需要你连接的测试系统的相关信息 ctp_setting = { - "用户名": "xxx", - "密码": "xxx", - "经纪商代码": "9999", - "交易服务器": "tcp://180.168.146.187:10100", - "行情服务器": "tcp://180.168.146.187:10110", - "产品名称": "simnow_client_test", - "授权编码": "0000000000000000", + "用户名": "12000430", + "密码": "11112222w", + "经纪商代码": "0187", + "交易服务器": "tcp://110.87.99.14:61209", + "行情服务器": "tcp://110.87.99.14:61219", + "产品名称": "client_huafu_2.0.0", + "授权编码": "BON2HDZHJBKLXKUK", "产品信息": "" } @@ -63,10 +63,11 @@ def test(): event_engine.start() gateway = CtptestGateway(event_engine) + print(f'开始接入仿真测试:{ctp_setting}') gateway.connect(ctp_setting) # gateway.connect() - auto_subscribe_symbols = ['rb2010'] + auto_subscribe_symbols = ['rb2101'] for symbol in auto_subscribe_symbols: print(u'自动订阅合约:{}'.format(symbol)) sub = SubscribeRequest(symbol=symbol, exchange=Exchange.SHFE) @@ -81,6 +82,46 @@ def test(): sleep(1) couter -= 1 + for i in range(5): + print(f'发出rb2101的买入委托{i+1}') + order_req = OrderRequest( + strategy_name='', + symbol='rb2101', + exchange=Exchange.SHFE, + direction=Direction.LONG, + offset=Offset.OPEN, + type=OrderType.LIMIT, + price=3800, + volume=i+1 + ) + gateway.send_order(order_req) + + + for i in range(5): + print(f'发出rb2101的平仓委托{i+1}') + order_req = OrderRequest( + strategy_name='', + symbol='rb2101', + exchange=Exchange.SHFE, + direction=Direction.LONG, + offset=Offset.CLOSETODAY, + type=OrderType.LIMIT, + price=3801, + volume=i+1 + ) + gateway.send_order(order_req) + + # + for i in range(5): + print(f'发出rb2101的撤单委托{i + 1}') + cancel_req = CancelRequest( + orderid=f'5_-78969411_{i+1}', + symbol='rb2101', + exchange=Exchange.SHFE + ) + gateway.cancel_order(cancel_req) + + sys.exit(app.exec_()) diff --git a/vnpy/gateway/pb/pb_gateway.py b/vnpy/gateway/pb/pb_gateway.py index 4f206375..91e3e56a 100644 --- a/vnpy/gateway/pb/pb_gateway.py +++ b/vnpy/gateway/pb/pb_gateway.py @@ -2023,14 +2023,14 @@ class PbTdApi(object): sys_orderid = str(data.wtxh) if order_status in [Status.NOTTRADED] and len(sys_orderid) > 0: - self.gateway.write_log(f'撤单:{data.__dict__}') + self.gateway.write_log(f'撤单:{data.wtxh}') cancel_data = (int(sys_orderid), None, None, None, None, None, None, None) cancel_table.append(cancel_data) orders_table.close() cancel_table.close() - except Exchange as ex: + except Exception as ex: self.gateway.write_error(f'dbf全委托撤单异常:{str(ex)}') self.gateway.write_error(traceback.format_exc()) return False diff --git a/vnpy/trader/engine.py b/vnpy/trader/engine.py index 90c3dd11..54ece4af 100644 --- a/vnpy/trader/engine.py +++ b/vnpy/trader/engine.py @@ -532,7 +532,7 @@ class OmsEngine(BaseEngine): tick = event.data self.ticks[tick.vt_symbol] = tick - if tick.last_price: + if '&' not in tick.symbol and tick.last_price: self.prices[tick.vt_symbol] = tick.last_price def process_order_event(self, event: Event) -> None: