diff --git a/vnpy/gateway/oes/oes_gateway.py b/vnpy/gateway/oes/oes_gateway.py index 78701b05..8918cb00 100644 --- a/vnpy/gateway/oes/oes_gateway.py +++ b/vnpy/gateway/oes/oes_gateway.py @@ -94,11 +94,11 @@ class OesGateway(BaseGateway): self.td_api.password = password if self.td_api.connect(): self.write_log(_("成功连接到交易服务器")) - self.td_api.query_account() self.td_api.query_contracts() + # self.td_api.query_account() self.write_log("合约信息查询成功") - self.td_api.query_position() - self.td_api.query_orders() + # self.td_api.query_position() + # self.td_api.query_orders() self.td_api.start() else: self.write_log(_("无法连接到交易服务器,请检查你的配置")) diff --git a/vnpy/gateway/oes/oes_md.py b/vnpy/gateway/oes/oes_md.py index 19f3a38f..3b87a63f 100644 --- a/vnpy/gateway/oes/oes_md.py +++ b/vnpy/gateway/oes/oes_md.py @@ -40,8 +40,8 @@ class OesMdMessageLoop: self.message_handlers: Dict[eMdsMsgTypeT, Callable[[MdsMktRspMsgBodyT], int]] = { # tick & orderbook - eMdsMsgTypeT.MDS_MSGTYPE_MARKET_DATA_SNAPSHOT_FULL_REFRESH: self.on_market_full_refresh, - eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_DATA_SNAPSHOT: self.on_l2_market_data_snapshot, + eMdsMsgTypeT.MDS_MSGTYPE_MARKET_DATA_SNAPSHOT_FULL_REFRESH: self.on_init_tick, + eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_DATA_SNAPSHOT: self.on_l2_tick, eMdsMsgTypeT.MDS_MSGTYPE_L2_ORDER: self.on_l2_order, eMdsMsgTypeT.MDS_MSGTYPE_L2_TRADE: self.on_l2_trade, @@ -132,7 +132,7 @@ class OesMdMessageLoop: time.sleep(1) return - def on_l2_market_data_snapshot(self, d: MdsMktRspMsgBodyT): + def on_l2_tick(self, d: MdsMktRspMsgBodyT): """""" data: MdsL2StockSnapshotBodyT = d.mktDataSnapshot.l2Stock symbol = str(data.SecurityID) @@ -148,7 +148,7 @@ class OesMdMessageLoop: tick.__dict__['ask_price_' + str(i + 1)] = data.OfferLevels[i].Price / 10000 self.gateway.on_tick(copy(tick)) - def on_market_full_refresh(self, d: MdsMktRspMsgBodyT): + def on_init_tick(self, d: MdsMktRspMsgBodyT): """""" data: MdsStockSnapshotBodyT = d.mktDataSnapshot.stock symbol = data.SecurityID diff --git a/vnpy/gateway/oes/oes_td.py b/vnpy/gateway/oes/oes_td.py index e6aef6fa..53608767 100644 --- a/vnpy/gateway/oes/oes_td.py +++ b/vnpy/gateway/oes/oes_td.py @@ -120,13 +120,15 @@ class OesTdMessageLoop: def __init__(self, gateway: BaseGateway, env: OesApiClientEnvT, - td: "OesTdApi" + td: "OesTdApi", + order_manager: "OrderManager", ): """""" self.gateway = gateway self._env = env self._td = td + self._order_manager = order_manager self._alive = False self._th = Thread(target=self._message_loop) @@ -203,7 +205,7 @@ class OesTdMessageLoop: data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp if not data.origClSeqNo: try: - i = self._td.get_order(data.clSeqNo) + i = self._order_manager.get_order(data.clSeqNo) except KeyError: return # rejected order created by others, don't need to care. @@ -223,46 +225,22 @@ class OesTdMessageLoop: """""" data = d.rptMsg.rptBody.ordInsertRsp - if not data.origClSeqNo: - # normal order - i = self._td.get_order(data.clSeqNo) - else: - # data.ordStatus == eOesOrdStatusT.OES_ORD_STATUS_CANCEL_DONE: - i = self._td.get_order(data.origClSeqNo) - vt_order = i.vt_order - vt_order.status = STATUS_OES2VT[data.ordStatus] - vt_order.volume = data.ordQty - vt_order.traded = data.cumQty - vt_order.time = parse_oes_datetime(data.ordDate, data.ordTime) - + vt_order = self._order_manager.oes_order_to_vt(data) self.gateway.on_order(copy(vt_order)) def on_order_report(self, d: OesRspMsgBodyT): """""" data: OesOrdCnfmT = d.rptMsg.rptBody.ordCnfm - if not data.origClSeqNo: - # normal order - i = self._td.get_order(data.clSeqNo) - else: - # data.ordStatus == eOesOrdStatusT.OES_ORD_STATUS_CANCEL_DONE: - i = self._td.get_order(data.origClSeqNo) - vt_order = i.vt_order - - vt_order.status = STATUS_OES2VT[data.ordStatus] - vt_order.volume = data.ordQty - vt_order.traded = data.cumQty - vt_order.time = parse_oes_datetime(data.ordDate, data.ordCnfmTime) - + vt_order = self._order_manager.oes_order_to_vt(data) self.gateway.on_order(copy(vt_order)) def on_trade_report(self, d: OesRspMsgBodyT): """""" data: OesTrdCnfmT = d.rptMsg.rptBody.trdCnfm - i = self._td.get_order(data.clSeqNo) + i = self._order_manager.get_order(data.clSeqNo) vt_order = i.vt_order - # vt_order.status = STATUS_OES2VT[data.ordStatus] trade = TradeData( gateway_name=self.gateway.gateway_name, @@ -337,16 +315,17 @@ class OesTdApi: self._env = OesApiClientEnvT() + self._order_manager: "OrderManager" = OrderManager(self.gateway.gateway_name) self._message_loop = OesTdMessageLoop(gateway, self._env, - self) + self, + self._order_manager + ) self._last_seq_lock = Lock() self._last_seq_index = 1000000 # 0 has special manning for oes self._ord_reconnect_lock = Lock() - self._orders: Dict[int, InternalOrder] = {} - def connect(self): """Connect to trading server. :note set config_path before calling this function @@ -636,7 +615,7 @@ class OesTdApi: order = vt_req.create_order_data(str(order_id), self.gateway.gateway_name) order.direction = Direction.NET # fix direction into NET: stock only - self.save_order(order_id, order) + self._order_manager.save_order(order_id, order) ret = OesApi_SendOrderReq(self._env.ordChannel, oes_req @@ -694,7 +673,7 @@ class OesTdApi: """""" data: OesOrdCnfmT = cast.toOesOrdItemT(body) - i = self.get_order(data.clSeqNo) + i = self._order_manager.get_order(data.clSeqNo) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty @@ -719,24 +698,35 @@ class OesTdApi: ): """""" data: OesOrdCnfmT = cast.toOesOrdItemT(body) + vt_order = self._order_manager.oes_order_to_vt(data) + self.gateway.on_order(vt_order) + return 1 + + +class OrderManager: + def __init__(self, gateway_name: str): + self._orders: Dict[int, InternalOrder] = {} + self.gateway_name = gateway_name + + def oes_order_to_vt(self, data): + order_id = data.clSeqNo + if hasattr(data, "origClSeqNo") and data.origClSeqNo: + order_id = data.origClSeqNo try: - i = self.get_order(data.clSeqNo) + i = self.get_order(order_id) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty vt_order.traded = data.cumQty - self.gateway.on_order(copy(vt_order)) + vt_order.time = parse_oes_datetime(data.ordDate, data.ordTime) except KeyError: - # order_id = self.order_manager.new_remote_id() - order_id = data.clSeqNo - if data.bsType == eOesBuySellTypeT.OES_BS_TYPE_BUY: offset = Offset.OPEN else: offset = Offset.CLOSE vt_order = OrderData( - gateway_name=self.gateway.gateway_name, + gateway_name=self.gateway_name, symbol=data.securityId, exchange=EXCHANGE_OES2VT[data.mktId], orderid=str(order_id if order_id else data.origClSeqNo), # generated id @@ -744,17 +734,16 @@ class OesTdApi: direction=Direction.NET, offset=offset, price=data.ordPrice / 10000, - volume=data.ordQty - data.canceledQty, + volume=data.ordQty, traded=data.cumQty, status=STATUS_OES2VT[ data.ordStatus], # this time should be generated automatically or by a static function - time=datetime.utcnow().isoformat(), + time=parse_oes_datetime(data.ordDate, data.ordCnfmTime).isoformat(), ) self.save_order(order_id, vt_order) - self.gateway.on_order(copy(vt_order)) - return 1 + return vt_order def save_order(self, order_id: int, order: OrderData): """"""