diff --git a/vnpy/gateway/oes/oes_td.py b/vnpy/gateway/oes/oes_td.py index 649aef7f..a7e8f435 100644 --- a/vnpy/gateway/oes/oes_td.py +++ b/vnpy/gateway/oes/oes_td.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timedelta, timezone from gettext import gettext as _ from threading import Lock, Thread # noinspection PyUnresolvedReferences @@ -7,15 +7,14 @@ from typing import Any, Callable, Dict, Tuple from vnpy.api.oes.vnoes import OesApiClientEnvT, OesApi_DestoryAll, OesApi_InitAllByConvention, \ OesApi_IsValidOrdChannel, OesApi_IsValidQryChannel, OesApi_IsValidRptChannel, OesApi_LogoutAll, \ - OesApi_QueryCashAsset, OesApi_QueryOptHolding, \ - OesApi_QueryOption, OesApi_QueryOrder, OesApi_QueryStkHolding, OesApi_QueryStock, \ - OesApi_SendOrderCancelReq, OesApi_SendOrderReq, OesApi_WaitReportMsg, OesOrdCancelReqT, \ + OesApi_QueryCashAsset, OesApi_QueryOptHolding, OesApi_QueryOption, OesApi_QueryOrder, \ + OesApi_QueryStkHolding, OesApi_QueryStock, OesApi_SendOrderCancelReq, OesApi_SendOrderReq, \ + OesApi_SetThreadPassword, OesApi_SetThreadUsername, OesApi_WaitReportMsg, OesOrdCancelReqT, \ OesOrdCnfmT, OesOrdRejectT, OesOrdReqT, OesQryCashAssetFilterT, OesQryCursorT, \ - OesQryOptionFilterT, OesQryOrdFilterT, OesQryStkHoldingFilterT, \ - OesQryStockFilterT, OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, SGeneralClientChannelT, \ - SMSG_PROTO_BINARY, SMsgHeadT, SPlatform_IsNegEpipe, SPlatform_IsNegEtimeout, cast, \ - eOesBuySellTypeT, eOesMarketIdT, eOesMsgTypeT, eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT, \ - OesApi_SetThreadUsername, OesApi_SetThreadPassword + OesQryOptionFilterT, OesQryOrdFilterT, OesQryStkHoldingFilterT, OesQryStockFilterT, \ + OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, SGeneralClientChannelT, SMSG_PROTO_BINARY, \ + SMsgHeadT, SPlatform_IsNegEpipe, cast, eOesBuySellTypeT, eOesMarketIdT, \ + eOesMsgTypeT, eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT from vnpy.gateway.oes.error_code import error_to_str from vnpy.trader.constant import Direction, Exchange, Offset, PriceType, Product, Status @@ -86,6 +85,8 @@ STATUS_OES2VT = { eOesOrdStatusT.OES_ORD_STATUS_INVALID_SZ_TRY_AGAIN: Status.REJECTED, } +bjtz = timezone(timedelta(hours=8)) + @dataclass class InternalOrder: @@ -93,6 +94,20 @@ class InternalOrder: vt_order: OrderData = None +def parse_oes_datetime(date: int, time: int): + # YYYYMMDD + year = int(date / 10000) + month = int((date % 10000) / 100) + day = int(date % 100) + + # HHMMSSsss + hour = int(time / 10000000) + minute = int((time % 10000000) / 100000) + sec = int((time % 100000) / 1000) + mill = int(time % 1000) + return datetime(year, month, day, hour, minute, sec, mill * 1000, tzinfo=bjtz) + + class OrderManager: def __init__(self): @@ -122,12 +137,6 @@ class OrderManager: def get_from_order_id(self, id: int): return self._orders[id] - def get_from_oes_data(self, data): - return data.clSeqNo - - def get_order_id_from_data(self, data): - return self.get_from_order_id(self.get_from_oes_data(data)) - class OesTdMessageLoop: @@ -138,12 +147,12 @@ class OesTdMessageLoop: td: "OesTdApi" ): self.gateway = gateway - self.env = env - self.order_manager = order_manager - self.td = td + self._td = td + self._env = env + self._order_manager = order_manager - self.alive = False - self.th = Thread(target=self.message_loop) + self._alive = False + self._th = Thread(target=self.message_loop) self.message_handlers: Dict[int, Callable[[dict], None]] = { eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_order_rejected, @@ -153,15 +162,20 @@ class OesTdMessageLoop: eOesMsgTypeT.OESMSG_RPT_STOCK_HOLDING_VARIATION: self.on_stock_holding, eOesMsgTypeT.OESMSG_RPT_OPTION_HOLDING_VARIATION: self.on_option_holding, eOesMsgTypeT.OESMSG_RPT_CASH_ASSET_VARIATION: self.on_cash, - eOesMsgTypeT.OESMSG_SESS_HEARTBEAT: lambda x: x, + + eOesMsgTypeT.OESMSG_RPT_REPORT_SYNCHRONIZATION: lambda x: 1, + eOesMsgTypeT.OESMSG_SESS_HEARTBEAT: lambda x: 1, } def start(self): - self.alive = True - self.th.start() + self._alive = True + self._th.start() + + def stop(self): + self._alive = False def join(self): - self.th.join() + self._th.join() def on_message(self, session_info: SGeneralClientChannelT, head: SMsgHeadT, @@ -179,15 +193,14 @@ class OesTdMessageLoop: def reconnect(self): self.gateway.write_log(_("正在尝试重新连接到交易服务器。")) - self.td.connect() + self._td.connect() def message_loop(self): - rpt_channel = self.env.rptChannel + rpt_channel = self._env.rptChannel timeout_ms = 1000 - is_timeout = SPlatform_IsNegEtimeout is_disconnected = SPlatform_IsNegEpipe - while self.alive: + while self._alive: ret = OesApi_WaitReportMsg(rpt_channel, timeout_ms, self.on_message) @@ -195,7 +208,9 @@ class OesTdMessageLoop: # if is_timeout(ret): # pass # just no message if is_disconnected(ret): - self.reconnect() + self.gateway.write_log(_("与交易服务器的连接已断开。")) + while not self.reconnect() and self._alive: + pass return def on_order_rejected(self, d: OesRspMsgBodyT): @@ -203,7 +218,7 @@ class OesTdMessageLoop: error_string = error_to_str(error_code) data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp if not data.origClSeqNo: - i = self.order_manager.get_from_order_id(data.clSeqNo) + i = self._order_manager.get_from_order_id(data.clSeqNo) vt_order = i.vt_order if vt_order == Status.ALLTRADED: @@ -215,33 +230,41 @@ class OesTdMessageLoop: self.gateway.write_log( f"Order: {vt_order.vt_symbol}-{vt_order.vt_orderid} Code: {error_code} Rejected: {error_string}") else: - self.gateway.write_log(f"Failed to cancel Order, id: {data.origClSeqNo}") + self.gateway.write_log(f"撤单失败,订单号: {data.origClSeqNo}。原因:{error_string}") def on_order_inserted(self, d: OesRspMsgBodyT): data = d.rptMsg.rptBody.ordInsertRsp - i = self.order_manager.get_from_oes_data(data) + if not data.origClSeqNo: + i = self._order_manager.get_from_order_id(data.clSeqNo) + else: + i = self._order_manager.get_from_order_id(data.origClSeqNo) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty vt_order.traded = data.cumQty + vt_order.time = parse_oes_datetime(data.ordDate, data.ordTime) self.gateway.on_order(vt_order) def on_order_report(self, d: OesRspMsgBodyT): data: OesOrdCnfmT = d.rptMsg.rptBody.ordCnfm - i = self.order_manager.get_from_oes_data(data) + if not data.origClSeqNo: + i = self._order_manager.get_from_order_id(data.clSeqNo) + else: + i = self._order_manager.get_from_order_id(data.origClSeqNo) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty vt_order.traded = data.cumQty + vt_order.time = parse_oes_datetime(data.ordDate, data.ordCnfmTime) self.gateway.on_order(vt_order) def on_trade_report(self, d: OesRspMsgBodyT): data: OesTrdCnfmT = d.rptMsg.rptBody.trdCnfm - i = self.order_manager.get_from_oes_data(data) + i = self._order_manager.get_from_order_id(data.clSeqNo) vt_order = i.vt_order # vt_order.status = STATUS_OES2VT[data.ordStatus] @@ -255,7 +278,7 @@ class OesTdMessageLoop: offset=vt_order.offset, price=data.trdPrice / 10000, volume=data.trdQty, - time=datetime.utcnow().isoformat() # strict + time=parse_oes_datetime(data.trdDate, data.trdTime) ) self.gateway.on_trade(trade) @@ -268,7 +291,7 @@ class OesTdMessageLoop: # Oes have no async call to query order only. # And calling sync function here will slow down vnpy. # So we queue it into another thread. - self.td.schedule_query_order(i) + self._td.schedule_query_order(i) def on_option_holding(self, d: OesRspMsgBodyT): pass @@ -305,9 +328,6 @@ class OesTdMessageLoop: self.gateway.on_account(account) return 1 - def stop(self): - self.alive = False - class OesTdApi: @@ -571,8 +591,6 @@ class OesTdApi: oes_req = OesOrdCancelReqT() order_id = int(vt_req.orderid) - internal_order = self._order_manager.get_from_order_id(order_id) - oes_req.origClOrdId = internal_order.order_id oes_req.mktId = EXCHANGE_VT2OES[vt_req.exchange] oes_req.clSeqNo = seq_id @@ -603,7 +621,8 @@ class OesTdApi: body: Any, cursor: OesQryCursorT): data: OesOrdCnfmT = cast.toOesOrdItemT(body) - i = self._order_manager.get_from_oes_data(data) + + i = self._order_manager.get_from_order_id(data.clSeqNo) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty - data.canceledQty @@ -631,7 +650,7 @@ class OesTdApi: ): data: OesOrdCnfmT = cast.toOesOrdItemT(body) try: - i = self._order_manager.get_from_oes_data(data) + i = self._order_manager.get_from_order_id(data.clSeqNo) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty - data.canceledQty @@ -639,7 +658,7 @@ class OesTdApi: self.gateway.on_order(vt_order) except KeyError: # order_id = self.order_manager.new_remote_id() - order_id = self._order_manager.get_order_id_from_data(data) + order_id = data.clSeqNo if data.bsType == eOesBuySellTypeT.OES_BS_TYPE_BUY: offset = Offset.OPEN