diff --git a/vnpy/gateway/oes/md.py b/vnpy/gateway/oes/md.py index 8fbdf8df..e256f97d 100644 --- a/vnpy/gateway/oes/md.py +++ b/vnpy/gateway/oes/md.py @@ -114,8 +114,6 @@ class OesMdMessageLoop: data: MdsL2StockSnapshotBodyT = d.mktDataSnapshot.l2Stock symbol = str(data.SecurityID) tick = self.get_last_tick(symbol) - tick.limit_up = data.HighPx / 10000 - tick.limit_down = data.LowPx / 10000 tick.open_price = data.OpenPx / 10000 tick.pre_close = data.ClosePx / 10000 tick.high_price = data.HighPx / 10000 @@ -131,8 +129,6 @@ class OesMdMessageLoop: data: MdsStockSnapshotBodyT = d.mktDataSnapshot.stock symbol = data.SecurityID tick = self.get_last_tick(symbol) - tick.limit_up = data.HighPx / 10000 - tick.limit_down = data.LowPx / 10000 tick.open_price = data.OpenPx / 10000 tick.pre_close = data.ClosePx / 10000 tick.high_price = data.HighPx / 10000 diff --git a/vnpy/gateway/oes/oes_gateway.py b/vnpy/gateway/oes/oes_gateway.py index 83f9b492..9ef4d41b 100644 --- a/vnpy/gateway/oes/oes_gateway.py +++ b/vnpy/gateway/oes/oes_gateway.py @@ -19,30 +19,12 @@ class OesGateway(BaseGateway): VN Trader Gateway for BitMEX connection. """ - def on_tick(self, tick: TickData): - super().on_tick(tick) - - def on_trade(self, trade: TradeData): - super().on_trade(trade) - - def on_order(self, order: OrderData): - super().on_order(order) - - def on_position(self, position: PositionData): - super().on_position(position) - - def on_account(self, account: AccountData): - super().on_account(account) - - def on_contract(self, contract: ContractData): - super().on_contract(contract) - default_setting = { - "td_ord_server": "tcp://106.15.58.119:6101", - "td_rpt_server": "tcp://106.15.58.119:6301", - "td_qry_server": "tcp://106.15.58.119:6401", - "md_tcp_server": "tcp://139.196.228.232:5103", - "md_qry_server": "tcp://139.196.228.232:5203", + "td_ord_server": "", + "td_rpt_server": "", + "td_qry_server": "", + "md_tcp_server": "", + "md_qry_server": "", "username": "", "password": "", } @@ -81,10 +63,12 @@ class OesGateway(BaseGateway): f.write(content) self.td_api.connect(str(config_path)) + self.td_api.query_account() self.td_api.query_contracts() self.td_api.query_position() self.td_api.init_query_orders() + self.td_api.start() self.md_api.connect(str(config_path)) @@ -103,18 +87,17 @@ class OesGateway(BaseGateway): def cancel_order(self, req: CancelRequest): """""" - return self.td_api.cancel_order(req) + self.td_api.cancel_order(req) def query_account(self): """""" - return self.td_api.query_account() + self.td_api.query_account() def query_position(self): """""" - return self.query_position() + self.td_api.query_position() def close(self): """""" self.md_api.stop() self.td_api.stop() - pass diff --git a/vnpy/gateway/oes/td.py b/vnpy/gateway/oes/td.py index 37408d25..a74e7897 100644 --- a/vnpy/gateway/oes/td.py +++ b/vnpy/gateway/oes/td.py @@ -89,8 +89,6 @@ STATUS_OES2VT = { class InternalOrder: order_id: int = None vt_order: OrderData = None - req_data: OesOrdReqT = None - rpt_data: OesOrdCnfmT = None class OrderManager: @@ -99,9 +97,6 @@ class OrderManager: self.last_order_id = 100000000 self._orders: Dict[int, InternalOrder] = {} - # key tuple: seqNo, ordId, envId, userInfo - self._remote_created_orders: Dict[Tuple[int, int, int, int], InternalOrder] = {} - @staticmethod def hash_remote_order(data): key = (data.origClSeqNo, data.origClOrdId, data.origClEnvId, data.userInfo) @@ -112,59 +107,24 @@ class OrderManager: key = (data.clSeqNo, data.clOrdId, data.clEnvId, data.userInfo) return key - def new_local_id(self): - id = self.last_order_id - self.last_order_id += 1 - return id - - def new_remote_id(self): - id = self.last_order_id - self.last_order_id += 1 - return id - - def save_local_created(self, order_id: int, order: OrderData, oes_req: OesOrdReqT): + def save_local_created(self, order_id: int, order: OrderData): + print(f"saved order, id:{order_id}") self._orders[order_id] = InternalOrder( order_id=order_id, vt_order=order, - req_data=oes_req ) - def save_remote_created(self, order_id: int, vt_order: OrderData, data: OesOrdCnfmT): - internal_order = InternalOrder( - order_id=order_id, - vt_order=vt_order, - rpt_data=data - ) - self._orders[order_id] = internal_order - key = self.hash_remote_order(data) - self._remote_created_orders[key] = internal_order + def save_remote_created(self, order_id: int, vt_order: OrderData): + return self.save_local_created(order_id, vt_order) def get_from_order_id(self, id: int): return self._orders[id] - def get_remote_created_order_from_oes_data(self, data): - """ - :return: internal_order if succeed else None, will check only remote created order - """ - try: - key = self.hash_remote_order(data) - except AttributeError: - key = self.hash_remote_trade(data) - try: - return self._remote_created_orders[key] - except KeyError: - return None - def get_from_oes_data(self, data): - try: - key = self.hash_remote_order(data) - except AttributeError: - key = self.hash_remote_trade(data) - try: - return self._remote_created_orders[key] - except KeyError: - order_id = key[3] - return self._orders[order_id] + return data.clSeqNo + + def get_order_id_from_data(self, data): + return self.get_from_order_id(self.get_from_oes_data(data)) class OesTdMessageLoop: @@ -184,7 +144,7 @@ class OesTdMessageLoop: self.th = Thread(target=self.message_loop) self.message_handlers: Dict[int, Callable[[dict], None]] = { - eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_reject, + eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_order_rejected, eOesMsgTypeT.OESMSG_RPT_ORDER_INSERT: self.on_order_inserted, eOesMsgTypeT.OESMSG_RPT_ORDER_REPORT: self.on_order_report, eOesMsgTypeT.OESMSG_RPT_TRADE_REPORT: self.on_trade_report, @@ -216,13 +176,13 @@ class OesTdMessageLoop: return 1 def message_loop(self): - rtp_channel = self.env.rptChannel + rpt_channel = self.env.rptChannel timeout_ms = 1000 is_timeout = SPlatform_IsNegEtimeout is_disconnected = SPlatform_IsNegEpipe while self.alive: - ret = OesApi_WaitReportMsg(rtp_channel, + ret = OesApi_WaitReportMsg(rpt_channel, timeout_ms, self.on_message) if ret < 0: @@ -232,24 +192,26 @@ class OesTdMessageLoop: # todo: handle disconnected self.alive = False break - pass return - def on_reject(self, d: OesRspMsgBodyT): + def on_order_rejected(self, d: OesRspMsgBodyT): error_code = d.rptMsg.rptHead.ordRejReason error_string = error_to_str(error_code) data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp - i = self.order_manager.get_from_oes_data(data) - vt_order = i.vt_order + if not data.origClSeqNo: + i = self.order_manager.get_from_order_id(data.clSeqNo) + vt_order = i.vt_order - if vt_order == Status.ALLTRADED: - return + if vt_order == Status.ALLTRADED: + return - vt_order.status = Status.REJECTED + vt_order.status = Status.REJECTED - self.gateway.on_order(vt_order) - self.gateway.write_log( - f"Order: {vt_order.vt_symbol}-{vt_order.vt_orderid} Code: {error_code} Rejected: {error_string}") + self.gateway.on_order(vt_order) + self.gateway.write_log( + f"Order: {vt_order.vt_symbol}-{vt_order.vt_orderid} Code: {error_code} Rejected: {error_string}") + else: + self.gateway.write_log(f"Failed to cancel Order, id: {data.origClSeqNo}") def on_order_inserted(self, d: OesRspMsgBodyT): data = d.rptMsg.rptBody.ordInsertRsp @@ -257,7 +219,7 @@ class OesTdMessageLoop: i = self.order_manager.get_from_oes_data(data) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] - vt_order.volume = data.ordQty - data.canceledQty + vt_order.volume = data.ordQty vt_order.traded = data.cumQty self.gateway.on_order(vt_order) @@ -268,7 +230,7 @@ class OesTdMessageLoop: i = self.order_manager.get_from_oes_data(data) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] - vt_order.volume = data.ordQty - data.canceledQty + vt_order.volume = data.ordQty vt_order.traded = data.cumQty self.gateway.on_order(vt_order) @@ -279,6 +241,7 @@ class OesTdMessageLoop: vt_order = i.vt_order # vt_order.status = STATUS_OES2VT[data.ordStatus] + trade = TradeData( gateway_name=self.gateway.gateway_name, symbol=data.securityId, @@ -315,10 +278,10 @@ class OesTdMessageLoop: exchange=EXCHANGE_OES2VT[data.mktId], direction=Direction.NET, volume=data.sumHld, - frozen=data.lockHld, + frozen=data.lockHld, # todo: to verify price=data.costPrice / 10000, # pnl=data.costPrice - data.originalCostAmt, - pnl=0, # todo: oes只提供日初持仓价格信息,不提供最初持仓价格信息,所以pnl只有当日的 + pnl=0, yd_volume=data.originalHld, ) self.gateway.on_position(position) @@ -329,7 +292,7 @@ class OesTdMessageLoop: balance = data.currentTotalBal availiable = data.currentAvailableBal # drawable = data.currentDrawableBal - account_id = data.custId + account_id = data.cashAcctId account = AccountData( gateway_name=self.gateway.gateway_name, accountid=account_id, @@ -356,12 +319,19 @@ class OesTdApi: self) self.account_id = None - self.last_seq_index = 1 # 0 has special manning for oes + self.last_seq_index = 1000000 # 0 has special manning for oes + + def get_new_seq_index(self): + """note: not thread safe currently""" + # todo: add lock + index = self.last_seq_index + self.last_seq_index += 1 + return index def connect(self, config_path: str): if not OesApi_InitAllByConvention(self.env, config_path, -1, self.last_seq_index): pass - self.last_seq_index = self.env.ordChannel.lastOutMsgSeq + 1 + self.last_seq_index = max(self.last_seq_index, self.env.ordChannel.lastOutMsgSeq + 1) if not OesApi_IsValidOrdChannel(self.env.ordChannel): pass @@ -383,15 +353,11 @@ class OesTdApi: def join(self): self.message_loop.join() - def query_account(self) -> bool: - return self.query_cash_asset() - - def query_cash_asset(self) -> bool: - ret = OesApi_QueryCashAsset(self.env.qryChannel, + def query_account(self): + OesApi_QueryCashAsset(self.env.qryChannel, OesQryCashAssetFilterT(), self.on_query_asset ) - return ret >= 0 def on_query_asset(self, session_info: SGeneralClientChannelT, @@ -403,7 +369,7 @@ class OesTdApi: balance = data.currentTotalBal / 10000 availiable = data.currentAvailableBal / 10000 # drawable = data.currentDrawableBal - account_id = data.custId + account_id = data.cashAcctId account = AccountData( gateway_name=self.gateway.gateway_name, accountid=account_id, @@ -469,60 +435,6 @@ class OesTdApi: self.gateway.on_contract(contract) return 1 - def query_issue(self) -> bool: - f = OesQryIssueFilterT() - ret = OesApi_QueryIssue(self.env.qryChannel, - f, - self.on_query_issue - ) - return ret >= 0 - - def on_query_issue(self, - session_info: SGeneralClientChannelT, - head: SMsgHeadT, - body: Any, - cursor: OesQryCursorT, - ): - data = cast.toOesIssueItemT(body) - contract = ContractData( - gateway_name=self.gateway.gateway_name, - symbol=data.securityId, - exchange=EXCHANGE_OES2VT[data.mktId], - name=data.securityName, - product=PRODUCT_OES2VT[data.mktId], - size=data.qtyUnit, - pricetick=1, - ) - self.gateway.on_contract(contract) - return 1 - - def query_etf(self) -> bool: - f = OesQryEtfFilterT() - ret = OesApi_QueryEtf(self.env.qryChannel, - f, - self.on_query_etf - ) - return ret >= 0 - - def on_query_etf(self, - session_info: SGeneralClientChannelT, - head: SMsgHeadT, - body: Any, - cursor: OesQryCursorT, - ): - data = cast.toOesEtfItemT(body) - contract = ContractData( - gateway_name=self.gateway.gateway_name, - symbol=data.securityId, - exchange=EXCHANGE_OES2VT[data.mktId], - name=data.securityId, - product=PRODUCT_OES2VT[data.mktId], - size=data.creRdmUnit, # todo: to verify! creRdmUnit : 每个篮子 (最小申购、赎回单位) 对应的ETF份数 - pricetick=1, - ) - self.gateway.on_contract(contract) - return 1 - def query_stock_holding(self) -> bool: f = OesQryStkHoldingFilterT() ret = OesApi_QueryStkHolding(self.env.qryChannel, @@ -608,16 +520,14 @@ class OesTdApi: self.query_stock() # self.query_option() # self.query_issue() - pass def query_position(self): self.query_stock_holding() self.query_option_holding() def send_order(self, vt_req: OrderRequest): - seq_id = self.last_seq_index - self.last_seq_index += 1 # note: thread un-safe here, conflict with on_query_order - order_id = self.order_manager.new_local_id() + seq_id = self.get_new_seq_index() + order_id = seq_id oes_req = OesOrdReqT() oes_req.clSeqNo = seq_id @@ -628,16 +538,16 @@ class OesTdApi: oes_req.securityId = vt_req.symbol oes_req.ordQty = int(vt_req.volume) oes_req.ordPrice = int(vt_req.price * 10000) - oes_req.userInfo = order_id + oes_req.origClOrdId = order_id + order = vt_req.create_order_data(str(order_id), self.gateway.gateway_name) + order.direction = Direction.NET # fix direction into NET: stock only + self.order_manager.save_local_created(order_id, order) ret = OesApi_SendOrderReq(self.env.ordChannel, oes_req ) - order = vt_req.create_order_data(str(order_id), self.gateway.gateway_name) - order.direction = Direction.NET # fix direction into NET: stock only if ret >= 0: - self.order_manager.save_local_created(order_id, order, oes_req) self.gateway.on_order(order) else: self.gateway.write_log("Failed to send_order!") @@ -645,40 +555,21 @@ class OesTdApi: return order.vt_orderid def cancel_order(self, vt_req: CancelRequest): - seq_id = self.last_seq_index - self.last_seq_index += 1 # note: thread un-safe here + seq_id = self.get_new_seq_index() oes_req = OesOrdCancelReqT() order_id = int(vt_req.orderid) internal_order = self.order_manager.get_from_order_id(order_id) - if internal_order.rpt_data: - data = internal_order.rpt_data - # oes_req.origClSeqNo = self.local_id_to_sys_id[int(order_id)] - oes_req.origClOrdId = data.clOrdId - oes_req.origClSeqNo = data.clSeqNo - oes_req.origClEnvId = data.origClEnvId - oes_req.mktId = data.mktId - # oes_req.invAcctId = data.invAcctId - # oes_req.mktId = data.mktId - # oes_req.securityId = data.securityId - else: - data = internal_order.req_data - oes_req.origClSeqNo = data.clSeqNo - oes_req.mktId = internal_order.req_data.mktId + oes_req.origClOrdId = internal_order.order_id + oes_req.mktId = EXCHANGE_VT2OES[vt_req.exchange] oes_req.clSeqNo = seq_id + oes_req.origClSeqNo = order_id oes_req.invAcctId = "" oes_req.securityId = vt_req.symbol - oes_req.userInfo = order_id - ret = OesApi_SendOrderCancelReq(self.env.ordChannel, + OesApi_SendOrderCancelReq(self.env.ordChannel, oes_req) - if ret >= 0: - pass - else: - pass - return - def schedule_query_order(self, internal_order: InternalOrder) -> Thread: th = Thread(target=self.query_order, args=(internal_order,)) th.start() @@ -686,16 +577,8 @@ class OesTdApi: def query_order(self, internal_order: InternalOrder) -> bool: f = OesQryOrdFilterT() - if internal_order.req_data: - f.clSeqNo = internal_order.req_data.clSeqNo - f.mktId = internal_order.req_data.mktId - f.invAcctId = internal_order.req_data.invAcctId - else: - f.clSeqNo = internal_order.rpt_data.origClSeqNo - f.clOrdId = internal_order.rpt_data.origClOrdId - f.clEnvId = internal_order.rpt_data.origClEnvId - f.mktId = internal_order.rpt_data.mktId - f.invAcctId = internal_order.rpt_data.invAcctId + f.mktId = EXCHANGE_VT2OES[internal_order.vt_order.exchange] + f.clSeqNo = internal_order.order_id ret = OesApi_QueryOrder(self.env.qryChannel, f, self.on_query_order @@ -735,9 +618,16 @@ class OesTdApi: cursor: OesQryCursorT, ): data: OesOrdCnfmT = cast.toOesOrdItemT(body) - i = self.order_manager.get_remote_created_order_from_oes_data(data) - if not i: - order_id = self.order_manager.new_remote_id() + try: + i = self.order_manager.get_from_oes_data(data) + vt_order = i.vt_order + vt_order.status = STATUS_OES2VT[data.ordStatus] + vt_order.volume = data.ordQty - data.canceledQty + vt_order.traded = data.cumQty + self.gateway.on_order(vt_order) + except KeyError: + # order_id = self.order_manager.new_remote_id() + order_id = self.order_manager.get_order_id_from_data(data) if data.bsType == eOesBuySellTypeT.OES_BS_TYPE_BUY: offset = Offset.OPEN @@ -748,7 +638,7 @@ class OesTdApi: gateway_name=self.gateway.gateway_name, symbol=data.securityId, exchange=EXCHANGE_OES2VT[data.mktId], - orderid=order_id if order_id else data.userInfo, # generated id + orderid=order_id if order_id else data.origClSeqNo, # generated id direction=Direction.NET, offset=offset, price=data.ordPrice / 10000, @@ -759,13 +649,6 @@ class OesTdApi: # this time should be generated automatically or by a static function time=datetime.utcnow().isoformat(), ) - self.order_manager.save_remote_created(order_id, vt_order, data) + self.order_manager.save_remote_created(order_id, vt_order) self.gateway.on_order(vt_order) - return 1 - else: - vt_order = i.vt_order - vt_order.status = STATUS_OES2VT[data.ordStatus] - vt_order.volume = data.ordQty - data.canceledQty - vt_order.traded = data.cumQty - self.gateway.on_order(vt_order) - return 1 + return 1