From a280758809e20ba075024662cdd74516f98510a9 Mon Sep 17 00:00:00 2001 From: nanoric Date: Thu, 7 Mar 2019 23:13:18 -0400 Subject: [PATCH 01/13] dev --- vnpy/gateway/oes/md.py | 4 - vnpy/gateway/oes/oes_gateway.py | 37 ++--- vnpy/gateway/oes/td.py | 251 +++++++++----------------------- 3 files changed, 77 insertions(+), 215 deletions(-) diff --git a/vnpy/gateway/oes/md.py b/vnpy/gateway/oes/md.py index 8fbdf8df..e256f97d 100644 --- a/vnpy/gateway/oes/md.py +++ b/vnpy/gateway/oes/md.py @@ -114,8 +114,6 @@ class OesMdMessageLoop: data: MdsL2StockSnapshotBodyT = d.mktDataSnapshot.l2Stock symbol = str(data.SecurityID) tick = self.get_last_tick(symbol) - tick.limit_up = data.HighPx / 10000 - tick.limit_down = data.LowPx / 10000 tick.open_price = data.OpenPx / 10000 tick.pre_close = data.ClosePx / 10000 tick.high_price = data.HighPx / 10000 @@ -131,8 +129,6 @@ class OesMdMessageLoop: data: MdsStockSnapshotBodyT = d.mktDataSnapshot.stock symbol = data.SecurityID tick = self.get_last_tick(symbol) - tick.limit_up = data.HighPx / 10000 - tick.limit_down = data.LowPx / 10000 tick.open_price = data.OpenPx / 10000 tick.pre_close = data.ClosePx / 10000 tick.high_price = data.HighPx / 10000 diff --git a/vnpy/gateway/oes/oes_gateway.py b/vnpy/gateway/oes/oes_gateway.py index 83f9b492..9ef4d41b 100644 --- a/vnpy/gateway/oes/oes_gateway.py +++ b/vnpy/gateway/oes/oes_gateway.py @@ -19,30 +19,12 @@ class OesGateway(BaseGateway): VN Trader Gateway for BitMEX connection. """ - def on_tick(self, tick: TickData): - super().on_tick(tick) - - def on_trade(self, trade: TradeData): - super().on_trade(trade) - - def on_order(self, order: OrderData): - super().on_order(order) - - def on_position(self, position: PositionData): - super().on_position(position) - - def on_account(self, account: AccountData): - super().on_account(account) - - def on_contract(self, contract: ContractData): - super().on_contract(contract) - default_setting = { - "td_ord_server": "tcp://106.15.58.119:6101", - "td_rpt_server": "tcp://106.15.58.119:6301", - "td_qry_server": "tcp://106.15.58.119:6401", - "md_tcp_server": "tcp://139.196.228.232:5103", - "md_qry_server": "tcp://139.196.228.232:5203", + "td_ord_server": "", + "td_rpt_server": "", + "td_qry_server": "", + "md_tcp_server": "", + "md_qry_server": "", "username": "", "password": "", } @@ -81,10 +63,12 @@ class OesGateway(BaseGateway): f.write(content) self.td_api.connect(str(config_path)) + self.td_api.query_account() self.td_api.query_contracts() self.td_api.query_position() self.td_api.init_query_orders() + self.td_api.start() self.md_api.connect(str(config_path)) @@ -103,18 +87,17 @@ class OesGateway(BaseGateway): def cancel_order(self, req: CancelRequest): """""" - return self.td_api.cancel_order(req) + self.td_api.cancel_order(req) def query_account(self): """""" - return self.td_api.query_account() + self.td_api.query_account() def query_position(self): """""" - return self.query_position() + self.td_api.query_position() def close(self): """""" self.md_api.stop() self.td_api.stop() - pass diff --git a/vnpy/gateway/oes/td.py b/vnpy/gateway/oes/td.py index 37408d25..a74e7897 100644 --- a/vnpy/gateway/oes/td.py +++ b/vnpy/gateway/oes/td.py @@ -89,8 +89,6 @@ STATUS_OES2VT = { class InternalOrder: order_id: int = None vt_order: OrderData = None - req_data: OesOrdReqT = None - rpt_data: OesOrdCnfmT = None class OrderManager: @@ -99,9 +97,6 @@ class OrderManager: self.last_order_id = 100000000 self._orders: Dict[int, InternalOrder] = {} - # key tuple: seqNo, ordId, envId, userInfo - self._remote_created_orders: Dict[Tuple[int, int, int, int], InternalOrder] = {} - @staticmethod def hash_remote_order(data): key = (data.origClSeqNo, data.origClOrdId, data.origClEnvId, data.userInfo) @@ -112,59 +107,24 @@ class OrderManager: key = (data.clSeqNo, data.clOrdId, data.clEnvId, data.userInfo) return key - def new_local_id(self): - id = self.last_order_id - self.last_order_id += 1 - return id - - def new_remote_id(self): - id = self.last_order_id - self.last_order_id += 1 - return id - - def save_local_created(self, order_id: int, order: OrderData, oes_req: OesOrdReqT): + def save_local_created(self, order_id: int, order: OrderData): + print(f"saved order, id:{order_id}") self._orders[order_id] = InternalOrder( order_id=order_id, vt_order=order, - req_data=oes_req ) - def save_remote_created(self, order_id: int, vt_order: OrderData, data: OesOrdCnfmT): - internal_order = InternalOrder( - order_id=order_id, - vt_order=vt_order, - rpt_data=data - ) - self._orders[order_id] = internal_order - key = self.hash_remote_order(data) - self._remote_created_orders[key] = internal_order + def save_remote_created(self, order_id: int, vt_order: OrderData): + return self.save_local_created(order_id, vt_order) def get_from_order_id(self, id: int): return self._orders[id] - def get_remote_created_order_from_oes_data(self, data): - """ - :return: internal_order if succeed else None, will check only remote created order - """ - try: - key = self.hash_remote_order(data) - except AttributeError: - key = self.hash_remote_trade(data) - try: - return self._remote_created_orders[key] - except KeyError: - return None - def get_from_oes_data(self, data): - try: - key = self.hash_remote_order(data) - except AttributeError: - key = self.hash_remote_trade(data) - try: - return self._remote_created_orders[key] - except KeyError: - order_id = key[3] - return self._orders[order_id] + return data.clSeqNo + + def get_order_id_from_data(self, data): + return self.get_from_order_id(self.get_from_oes_data(data)) class OesTdMessageLoop: @@ -184,7 +144,7 @@ class OesTdMessageLoop: self.th = Thread(target=self.message_loop) self.message_handlers: Dict[int, Callable[[dict], None]] = { - eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_reject, + eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_order_rejected, eOesMsgTypeT.OESMSG_RPT_ORDER_INSERT: self.on_order_inserted, eOesMsgTypeT.OESMSG_RPT_ORDER_REPORT: self.on_order_report, eOesMsgTypeT.OESMSG_RPT_TRADE_REPORT: self.on_trade_report, @@ -216,13 +176,13 @@ class OesTdMessageLoop: return 1 def message_loop(self): - rtp_channel = self.env.rptChannel + rpt_channel = self.env.rptChannel timeout_ms = 1000 is_timeout = SPlatform_IsNegEtimeout is_disconnected = SPlatform_IsNegEpipe while self.alive: - ret = OesApi_WaitReportMsg(rtp_channel, + ret = OesApi_WaitReportMsg(rpt_channel, timeout_ms, self.on_message) if ret < 0: @@ -232,24 +192,26 @@ class OesTdMessageLoop: # todo: handle disconnected self.alive = False break - pass return - def on_reject(self, d: OesRspMsgBodyT): + def on_order_rejected(self, d: OesRspMsgBodyT): error_code = d.rptMsg.rptHead.ordRejReason error_string = error_to_str(error_code) data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp - i = self.order_manager.get_from_oes_data(data) - vt_order = i.vt_order + if not data.origClSeqNo: + i = self.order_manager.get_from_order_id(data.clSeqNo) + vt_order = i.vt_order - if vt_order == Status.ALLTRADED: - return + if vt_order == Status.ALLTRADED: + return - vt_order.status = Status.REJECTED + vt_order.status = Status.REJECTED - self.gateway.on_order(vt_order) - self.gateway.write_log( - f"Order: {vt_order.vt_symbol}-{vt_order.vt_orderid} Code: {error_code} Rejected: {error_string}") + self.gateway.on_order(vt_order) + self.gateway.write_log( + f"Order: {vt_order.vt_symbol}-{vt_order.vt_orderid} Code: {error_code} Rejected: {error_string}") + else: + self.gateway.write_log(f"Failed to cancel Order, id: {data.origClSeqNo}") def on_order_inserted(self, d: OesRspMsgBodyT): data = d.rptMsg.rptBody.ordInsertRsp @@ -257,7 +219,7 @@ class OesTdMessageLoop: i = self.order_manager.get_from_oes_data(data) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] - vt_order.volume = data.ordQty - data.canceledQty + vt_order.volume = data.ordQty vt_order.traded = data.cumQty self.gateway.on_order(vt_order) @@ -268,7 +230,7 @@ class OesTdMessageLoop: i = self.order_manager.get_from_oes_data(data) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] - vt_order.volume = data.ordQty - data.canceledQty + vt_order.volume = data.ordQty vt_order.traded = data.cumQty self.gateway.on_order(vt_order) @@ -279,6 +241,7 @@ class OesTdMessageLoop: vt_order = i.vt_order # vt_order.status = STATUS_OES2VT[data.ordStatus] + trade = TradeData( gateway_name=self.gateway.gateway_name, symbol=data.securityId, @@ -315,10 +278,10 @@ class OesTdMessageLoop: exchange=EXCHANGE_OES2VT[data.mktId], direction=Direction.NET, volume=data.sumHld, - frozen=data.lockHld, + frozen=data.lockHld, # todo: to verify price=data.costPrice / 10000, # pnl=data.costPrice - data.originalCostAmt, - pnl=0, # todo: oes只提供日初持仓价格信息,不提供最初持仓价格信息,所以pnl只有当日的 + pnl=0, yd_volume=data.originalHld, ) self.gateway.on_position(position) @@ -329,7 +292,7 @@ class OesTdMessageLoop: balance = data.currentTotalBal availiable = data.currentAvailableBal # drawable = data.currentDrawableBal - account_id = data.custId + account_id = data.cashAcctId account = AccountData( gateway_name=self.gateway.gateway_name, accountid=account_id, @@ -356,12 +319,19 @@ class OesTdApi: self) self.account_id = None - self.last_seq_index = 1 # 0 has special manning for oes + self.last_seq_index = 1000000 # 0 has special manning for oes + + def get_new_seq_index(self): + """note: not thread safe currently""" + # todo: add lock + index = self.last_seq_index + self.last_seq_index += 1 + return index def connect(self, config_path: str): if not OesApi_InitAllByConvention(self.env, config_path, -1, self.last_seq_index): pass - self.last_seq_index = self.env.ordChannel.lastOutMsgSeq + 1 + self.last_seq_index = max(self.last_seq_index, self.env.ordChannel.lastOutMsgSeq + 1) if not OesApi_IsValidOrdChannel(self.env.ordChannel): pass @@ -383,15 +353,11 @@ class OesTdApi: def join(self): self.message_loop.join() - def query_account(self) -> bool: - return self.query_cash_asset() - - def query_cash_asset(self) -> bool: - ret = OesApi_QueryCashAsset(self.env.qryChannel, + def query_account(self): + OesApi_QueryCashAsset(self.env.qryChannel, OesQryCashAssetFilterT(), self.on_query_asset ) - return ret >= 0 def on_query_asset(self, session_info: SGeneralClientChannelT, @@ -403,7 +369,7 @@ class OesTdApi: balance = data.currentTotalBal / 10000 availiable = data.currentAvailableBal / 10000 # drawable = data.currentDrawableBal - account_id = data.custId + account_id = data.cashAcctId account = AccountData( gateway_name=self.gateway.gateway_name, accountid=account_id, @@ -469,60 +435,6 @@ class OesTdApi: self.gateway.on_contract(contract) return 1 - def query_issue(self) -> bool: - f = OesQryIssueFilterT() - ret = OesApi_QueryIssue(self.env.qryChannel, - f, - self.on_query_issue - ) - return ret >= 0 - - def on_query_issue(self, - session_info: SGeneralClientChannelT, - head: SMsgHeadT, - body: Any, - cursor: OesQryCursorT, - ): - data = cast.toOesIssueItemT(body) - contract = ContractData( - gateway_name=self.gateway.gateway_name, - symbol=data.securityId, - exchange=EXCHANGE_OES2VT[data.mktId], - name=data.securityName, - product=PRODUCT_OES2VT[data.mktId], - size=data.qtyUnit, - pricetick=1, - ) - self.gateway.on_contract(contract) - return 1 - - def query_etf(self) -> bool: - f = OesQryEtfFilterT() - ret = OesApi_QueryEtf(self.env.qryChannel, - f, - self.on_query_etf - ) - return ret >= 0 - - def on_query_etf(self, - session_info: SGeneralClientChannelT, - head: SMsgHeadT, - body: Any, - cursor: OesQryCursorT, - ): - data = cast.toOesEtfItemT(body) - contract = ContractData( - gateway_name=self.gateway.gateway_name, - symbol=data.securityId, - exchange=EXCHANGE_OES2VT[data.mktId], - name=data.securityId, - product=PRODUCT_OES2VT[data.mktId], - size=data.creRdmUnit, # todo: to verify! creRdmUnit : 每个篮子 (最小申购、赎回单位) 对应的ETF份数 - pricetick=1, - ) - self.gateway.on_contract(contract) - return 1 - def query_stock_holding(self) -> bool: f = OesQryStkHoldingFilterT() ret = OesApi_QueryStkHolding(self.env.qryChannel, @@ -608,16 +520,14 @@ class OesTdApi: self.query_stock() # self.query_option() # self.query_issue() - pass def query_position(self): self.query_stock_holding() self.query_option_holding() def send_order(self, vt_req: OrderRequest): - seq_id = self.last_seq_index - self.last_seq_index += 1 # note: thread un-safe here, conflict with on_query_order - order_id = self.order_manager.new_local_id() + seq_id = self.get_new_seq_index() + order_id = seq_id oes_req = OesOrdReqT() oes_req.clSeqNo = seq_id @@ -628,16 +538,16 @@ class OesTdApi: oes_req.securityId = vt_req.symbol oes_req.ordQty = int(vt_req.volume) oes_req.ordPrice = int(vt_req.price * 10000) - oes_req.userInfo = order_id + oes_req.origClOrdId = order_id + order = vt_req.create_order_data(str(order_id), self.gateway.gateway_name) + order.direction = Direction.NET # fix direction into NET: stock only + self.order_manager.save_local_created(order_id, order) ret = OesApi_SendOrderReq(self.env.ordChannel, oes_req ) - order = vt_req.create_order_data(str(order_id), self.gateway.gateway_name) - order.direction = Direction.NET # fix direction into NET: stock only if ret >= 0: - self.order_manager.save_local_created(order_id, order, oes_req) self.gateway.on_order(order) else: self.gateway.write_log("Failed to send_order!") @@ -645,40 +555,21 @@ class OesTdApi: return order.vt_orderid def cancel_order(self, vt_req: CancelRequest): - seq_id = self.last_seq_index - self.last_seq_index += 1 # note: thread un-safe here + seq_id = self.get_new_seq_index() oes_req = OesOrdCancelReqT() order_id = int(vt_req.orderid) internal_order = self.order_manager.get_from_order_id(order_id) - if internal_order.rpt_data: - data = internal_order.rpt_data - # oes_req.origClSeqNo = self.local_id_to_sys_id[int(order_id)] - oes_req.origClOrdId = data.clOrdId - oes_req.origClSeqNo = data.clSeqNo - oes_req.origClEnvId = data.origClEnvId - oes_req.mktId = data.mktId - # oes_req.invAcctId = data.invAcctId - # oes_req.mktId = data.mktId - # oes_req.securityId = data.securityId - else: - data = internal_order.req_data - oes_req.origClSeqNo = data.clSeqNo - oes_req.mktId = internal_order.req_data.mktId + oes_req.origClOrdId = internal_order.order_id + oes_req.mktId = EXCHANGE_VT2OES[vt_req.exchange] oes_req.clSeqNo = seq_id + oes_req.origClSeqNo = order_id oes_req.invAcctId = "" oes_req.securityId = vt_req.symbol - oes_req.userInfo = order_id - ret = OesApi_SendOrderCancelReq(self.env.ordChannel, + OesApi_SendOrderCancelReq(self.env.ordChannel, oes_req) - if ret >= 0: - pass - else: - pass - return - def schedule_query_order(self, internal_order: InternalOrder) -> Thread: th = Thread(target=self.query_order, args=(internal_order,)) th.start() @@ -686,16 +577,8 @@ class OesTdApi: def query_order(self, internal_order: InternalOrder) -> bool: f = OesQryOrdFilterT() - if internal_order.req_data: - f.clSeqNo = internal_order.req_data.clSeqNo - f.mktId = internal_order.req_data.mktId - f.invAcctId = internal_order.req_data.invAcctId - else: - f.clSeqNo = internal_order.rpt_data.origClSeqNo - f.clOrdId = internal_order.rpt_data.origClOrdId - f.clEnvId = internal_order.rpt_data.origClEnvId - f.mktId = internal_order.rpt_data.mktId - f.invAcctId = internal_order.rpt_data.invAcctId + f.mktId = EXCHANGE_VT2OES[internal_order.vt_order.exchange] + f.clSeqNo = internal_order.order_id ret = OesApi_QueryOrder(self.env.qryChannel, f, self.on_query_order @@ -735,9 +618,16 @@ class OesTdApi: cursor: OesQryCursorT, ): data: OesOrdCnfmT = cast.toOesOrdItemT(body) - i = self.order_manager.get_remote_created_order_from_oes_data(data) - if not i: - order_id = self.order_manager.new_remote_id() + try: + i = self.order_manager.get_from_oes_data(data) + vt_order = i.vt_order + vt_order.status = STATUS_OES2VT[data.ordStatus] + vt_order.volume = data.ordQty - data.canceledQty + vt_order.traded = data.cumQty + self.gateway.on_order(vt_order) + except KeyError: + # order_id = self.order_manager.new_remote_id() + order_id = self.order_manager.get_order_id_from_data(data) if data.bsType == eOesBuySellTypeT.OES_BS_TYPE_BUY: offset = Offset.OPEN @@ -748,7 +638,7 @@ class OesTdApi: gateway_name=self.gateway.gateway_name, symbol=data.securityId, exchange=EXCHANGE_OES2VT[data.mktId], - orderid=order_id if order_id else data.userInfo, # generated id + orderid=order_id if order_id else data.origClSeqNo, # generated id direction=Direction.NET, offset=offset, price=data.ordPrice / 10000, @@ -759,13 +649,6 @@ class OesTdApi: # this time should be generated automatically or by a static function time=datetime.utcnow().isoformat(), ) - self.order_manager.save_remote_created(order_id, vt_order, data) + self.order_manager.save_remote_created(order_id, vt_order) self.gateway.on_order(vt_order) - return 1 - else: - vt_order = i.vt_order - vt_order.status = STATUS_OES2VT[data.ordStatus] - vt_order.volume = data.ordQty - data.canceledQty - vt_order.traded = data.cumQty - self.gateway.on_order(vt_order) - return 1 + return 1 From cad96b95fcee78478e18743ad99cc4d2b795c849 Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 8 Mar 2019 05:04:46 -0400 Subject: [PATCH 02/13] [Add] gateway.oes: reconnect --- vnpy/gateway/oes/config_template.ini | 6 -- vnpy/gateway/oes/oes_gateway.py | 48 +++++---- vnpy/gateway/oes/{md.py => oes_md.py} | 70 +++++++------ vnpy/gateway/oes/{td.py => oes_td.py} | 140 ++++++++++++++------------ 4 files changed, 149 insertions(+), 115 deletions(-) rename vnpy/gateway/oes/{md.py => oes_md.py} (84%) rename vnpy/gateway/oes/{td.py => oes_td.py} (86%) diff --git a/vnpy/gateway/oes/config_template.ini b/vnpy/gateway/oes/config_template.ini index c7db390f..367a169c 100644 --- a/vnpy/gateway/oes/config_template.ini +++ b/vnpy/gateway/oes/config_template.ini @@ -11,9 +11,6 @@ ordServer = 1 {td_ord_server} rptServer = 1 {td_rpt_server} qryServer = 1 {td_qry_server} -username = {username} -# 密码支持明文和MD5两种格式 (如 txt:XXX 或 md5:XXX..., 不带前缀则默认为明文) -password = {password} heartBtInt = 30 # 客户端环境号, 用于区分不同客户端实例上报的委托申报, 取值由客户端自行分配 @@ -58,9 +55,6 @@ keepCnt = 9 tcpServer = {md_tcp_server} qryServer = {md_qry_server} -username = {username} -# 密码支持明文和MD5两种格式 (如 txt:XXX 或 md5:XXX..., 不带前缀则默认为明文) -password = {password} heartBtInt = 30 sse.stock.enable = false diff --git a/vnpy/gateway/oes/oes_gateway.py b/vnpy/gateway/oes/oes_gateway.py index 9ef4d41b..3f7ad72e 100644 --- a/vnpy/gateway/oes/oes_gateway.py +++ b/vnpy/gateway/oes/oes_gateway.py @@ -3,14 +3,15 @@ """ import hashlib import os +from gettext import gettext as _ from threading import Thread from vnpy.trader.gateway import BaseGateway -from vnpy.trader.object import (AccountData, CancelRequest, ContractData, OrderData, OrderRequest, - PositionData, SubscribeRequest, TickData, TradeData) +from vnpy.trader.object import (CancelRequest, OrderRequest, + SubscribeRequest) from vnpy.trader.utility import get_file_path -from .md import OesMdApi -from .td import OesTdApi +from .oes_md import OesMdApi +from .oes_td import OesTdApi from .utils import config_template @@ -44,7 +45,10 @@ class OesGateway(BaseGateway): if not setting['password'].startswith("md5:"): setting['password'] = "md5:" + hashlib.md5(setting['password'].encode()).hexdigest() - config_path = get_file_path("vnoes.ini") + username = setting['username'] + password = setting['password'] + + config_path = str(get_file_path("vnoes.ini")) with open(config_path, "wt") as f: if 'test' in setting: log_level = 'DEBUG' @@ -54,7 +58,7 @@ class OesGateway(BaseGateway): log_mode = 'file' log_dir = get_file_path('oes') log_path = os.path.join(log_dir, 'log.log') - if os.path.exists(log_dir): + if not os.path.exists(log_dir): os.mkdir(log_dir) content = config_template.format(**setting, log_level=log_level, @@ -62,20 +66,30 @@ class OesGateway(BaseGateway): log_path=log_path) f.write(content) - self.td_api.connect(str(config_path)) + self.md_api.config_path = config_path + self.md_api.username = username + self.md_api.password = password + if self.md_api.connect(): + self.md_api.start() + else: + self.write_log(_("无法连接到行情服务器,请检查你的配置")) - self.td_api.query_account() - self.td_api.query_contracts() - self.td_api.query_position() - self.td_api.init_query_orders() - - self.td_api.start() - - self.md_api.connect(str(config_path)) - self.md_api.start() + self.td_api.config_path = config_path + self.td_api.username = username + self.td_api.password = password + if self.td_api.connect(): + self.write_log(_("成功连接到交易服务器")) + self.td_api.query_account() + self.td_api.query_contracts() + self.write_log("合约信息查询成功") + self.td_api.query_position() + self.td_api.init_query_orders() + self.td_api.start() + else: + self.write_log(_("无法连接到交易服务器,请检查你的配置")) def _connect_async(self, setting: dict): - Thread(target=self._connect_sync, args=(setting, )).start() + Thread(target=self._connect_sync, args=(setting,)).start() def subscribe(self, req: SubscribeRequest): """""" diff --git a/vnpy/gateway/oes/md.py b/vnpy/gateway/oes/oes_md.py similarity index 84% rename from vnpy/gateway/oes/md.py rename to vnpy/gateway/oes/oes_md.py index e256f97d..9d76e1ae 100644 --- a/vnpy/gateway/oes/md.py +++ b/vnpy/gateway/oes/oes_md.py @@ -1,4 +1,6 @@ +import time from datetime import datetime +from gettext import gettext as _ from threading import Thread # noinspection PyUnresolvedReferences from typing import Any, Callable, Dict @@ -9,7 +11,8 @@ from vnpy.api.oes.vnoes import MdsApiClientEnvT, MdsApi_DestoryAll, MdsApi_InitA MdsMktDataRequestReqT, MdsMktRspMsgBodyT, MdsStockSnapshotBodyT, SGeneralClientChannelT, \ SMsgHeadT, SPlatform_IsNegEpipe, SPlatform_IsNegEtimeout, cast, eMdsExchangeIdT, \ eMdsMktSubscribeFlagT, eMdsMsgTypeT, eMdsSecurityTypeT, eMdsSubscribeDataTypeT, \ - eMdsSubscribeModeT, eMdsSubscribedTickExpireTypeT, eSMsgProtocolTypeT + eMdsSubscribeModeT, eMdsSubscribedTickExpireTypeT, eSMsgProtocolTypeT, MdsApi_SetThreadUsername, \ + MdsApi_SetThreadPassword from vnpy.trader.constant import Exchange from vnpy.trader.gateway import BaseGateway @@ -25,10 +28,11 @@ EXCHANGE_VT2MDS = {v: k for k, v in EXCHANGE_MDS2VT.items()} class OesMdMessageLoop: - def __init__(self, gateway: BaseGateway, env: MdsApiClientEnvT): + def __init__(self, gateway: BaseGateway, md: "OesMdApi", env: MdsApiClientEnvT): self.gateway = gateway self.env = env self.alive = False + self.md = md self.th = Thread(target=self.message_loop) self.message_handlers: Dict[int, Callable[[dict], None]] = { @@ -91,6 +95,10 @@ class OesMdMessageLoop: self.gateway.write_log(f"unknown prototype : {session_info.protocolType}") return 1 + def reconnect(self): + self.gateway.write_log(_("正在尝试重新连接到行情服务器。")) + return self.md.connect() + def message_loop(self): tcp_channel = self.env.tcpChannel timeout_ms = 1000 @@ -101,13 +109,12 @@ class OesMdMessageLoop: timeout_ms, self.on_message) if ret < 0: - if is_timeout(ret): - pass + # if is_timeout(ret): + # pass # just no message if is_disconnected(ret): - # todo: handle disconnected - self.alive = False - break - pass + self.gateway.write_log(_("与行情服务器的连接已断开。")) + while not self.reconnect(): + time.sleep(1) return def on_l2_market_data_snapshot(self, d: MdsMktRspMsgBodyT): @@ -139,7 +146,6 @@ class OesMdMessageLoop: for i in range(5): tick.__dict__['ask_price_' + str(i + 1)] = data.OfferLevels[i].Price / 10000 self.gateway.on_tick(tick) - pass def on_l2_trade(self, d: MdsMktRspMsgBodyT): data = d.trade @@ -182,30 +188,39 @@ class OesMdApi: def __init__(self, gateway: BaseGateway): self.gateway = gateway - self.env = MdsApiClientEnvT() - self.message_loop = OesMdMessageLoop(gateway, self.env) + self.config_path: str = '' + self.username: str = '' + self.password: str = '' - def connect(self, config_path: str): - if not MdsApi_InitAllByConvention(self.env, config_path): - pass + self._env = MdsApiClientEnvT() + self._message_loop = OesMdMessageLoop(gateway, self, self._env) - if not MdsApi_IsValidTcpChannel(self.env.tcpChannel): - pass - if not MdsApi_IsValidQryChannel(self.env.qryChannel): - pass + def connect(self) -> bool: + """Connect to trading server. + :note set config_path before calling this function + """ + MdsApi_SetThreadUsername(self.username) + MdsApi_SetThreadPassword(self.password) + + config_path = self.config_path + if not MdsApi_InitAllByConvention(self._env, config_path): + return False + if not MdsApi_IsValidTcpChannel(self._env.tcpChannel): + return False + if not MdsApi_IsValidQryChannel(self._env.qryChannel): + return False + return True def start(self): - self.message_loop.start() + self._message_loop.start() def stop(self): - self.message_loop.stop() - if not MdsApi_LogoutAll(self.env, True): - pass # doc for this function is error - if not MdsApi_DestoryAll(self.env): - pass # doc for this function is error + self._message_loop.stop() + MdsApi_LogoutAll(self._env, True) + MdsApi_DestoryAll(self._env) def join(self): - self.message_loop.join() + self._message_loop.join() # why isn't arg a ContractData? def subscribe(self, req: SubscribeRequest): @@ -236,12 +251,11 @@ class OesMdApi: entry.securityType = eMdsSecurityTypeT.MDS_SECURITY_TYPE_STOCK # todo: option and others entry.instrId = int(req.symbol) - self.message_loop.register_symbol(req.symbol, req.exchange) + self._message_loop.register_symbol(req.symbol, req.exchange) ret = MdsApi_SubscribeMarketData( - self.env.tcpChannel, + self._env.tcpChannel, mds_req, entry) if not ret: self.gateway.write_log( f"MdsApi_SubscribeByString failed with {ret}:{error_to_str(ret)}") - pass diff --git a/vnpy/gateway/oes/td.py b/vnpy/gateway/oes/oes_td.py similarity index 86% rename from vnpy/gateway/oes/td.py rename to vnpy/gateway/oes/oes_td.py index a74e7897..649aef7f 100644 --- a/vnpy/gateway/oes/td.py +++ b/vnpy/gateway/oes/oes_td.py @@ -1,19 +1,21 @@ from dataclasses import dataclass from datetime import datetime -from threading import Thread +from gettext import gettext as _ +from threading import Lock, Thread # noinspection PyUnresolvedReferences from typing import Any, Callable, Dict, Tuple from vnpy.api.oes.vnoes import OesApiClientEnvT, OesApi_DestoryAll, OesApi_InitAllByConvention, \ OesApi_IsValidOrdChannel, OesApi_IsValidQryChannel, OesApi_IsValidRptChannel, OesApi_LogoutAll, \ - OesApi_QueryCashAsset, OesApi_QueryEtf, OesApi_QueryIssue, OesApi_QueryOptHolding, \ + OesApi_QueryCashAsset, OesApi_QueryOptHolding, \ OesApi_QueryOption, OesApi_QueryOrder, OesApi_QueryStkHolding, OesApi_QueryStock, \ OesApi_SendOrderCancelReq, OesApi_SendOrderReq, OesApi_WaitReportMsg, OesOrdCancelReqT, \ - OesOrdCnfmT, OesOrdRejectT, OesOrdReqT, OesQryCashAssetFilterT, OesQryCursorT, OesQryEtfFilterT, \ - OesQryIssueFilterT, OesQryOptionFilterT, OesQryOrdFilterT, OesQryStkHoldingFilterT, \ + OesOrdCnfmT, OesOrdRejectT, OesOrdReqT, OesQryCashAssetFilterT, OesQryCursorT, \ + OesQryOptionFilterT, OesQryOrdFilterT, OesQryStkHoldingFilterT, \ OesQryStockFilterT, OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, SGeneralClientChannelT, \ SMSG_PROTO_BINARY, SMsgHeadT, SPlatform_IsNegEpipe, SPlatform_IsNegEtimeout, cast, \ - eOesBuySellTypeT, eOesMarketIdT, eOesMsgTypeT, eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT + eOesBuySellTypeT, eOesMarketIdT, eOesMsgTypeT, eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT, \ + OesApi_SetThreadUsername, OesApi_SetThreadPassword from vnpy.gateway.oes.error_code import error_to_str from vnpy.trader.constant import Direction, Exchange, Offset, PriceType, Product, Status @@ -175,6 +177,10 @@ class OesTdMessageLoop: self.gateway.write_log(f"unknown prototype : {session_info.protocolType}") return 1 + def reconnect(self): + self.gateway.write_log(_("正在尝试重新连接到交易服务器。")) + self.td.connect() + def message_loop(self): rpt_channel = self.env.rptChannel timeout_ms = 1000 @@ -186,12 +192,10 @@ class OesTdMessageLoop: timeout_ms, self.on_message) if ret < 0: - if is_timeout(ret): - pass + # if is_timeout(ret): + # pass # just no message if is_disconnected(ret): - # todo: handle disconnected - self.alive = False - break + self.reconnect() return def on_order_rejected(self, d: OesRspMsgBodyT): @@ -241,7 +245,6 @@ class OesTdMessageLoop: vt_order = i.vt_order # vt_order.status = STATUS_OES2VT[data.ordStatus] - trade = TradeData( gateway_name=self.gateway.gateway_name, symbol=data.securityId, @@ -309,55 +312,65 @@ class OesTdMessageLoop: class OesTdApi: def __init__(self, gateway: BaseGateway): + self.config_path: str = None + self.username: str = '' + self.password: str = '' self.gateway = gateway - self.env = OesApiClientEnvT() - self.order_manager = OrderManager() - self.message_loop = OesTdMessageLoop(gateway, - self.env, - self.order_manager, - self) + self._env = OesApiClientEnvT() - self.account_id = None - self.last_seq_index = 1000000 # 0 has special manning for oes + self._order_manager = OrderManager() + self._message_loop = OesTdMessageLoop(gateway, + self._env, + self._order_manager, + self) - def get_new_seq_index(self): - """note: not thread safe currently""" - # todo: add lock - index = self.last_seq_index - self.last_seq_index += 1 - return index + self._last_seq_lock = Lock() + self._last_seq_index = 1000000 # 0 has special manning for oes - def connect(self, config_path: str): - if not OesApi_InitAllByConvention(self.env, config_path, -1, self.last_seq_index): - pass - self.last_seq_index = max(self.last_seq_index, self.env.ordChannel.lastOutMsgSeq + 1) + def connect(self): + """Connect to trading server. + :note set config_path before calling this function + """ + OesApi_SetThreadUsername(self.username) + OesApi_SetThreadPassword(self.password) - if not OesApi_IsValidOrdChannel(self.env.ordChannel): - pass - if not OesApi_IsValidQryChannel(self.env.qryChannel): - pass - if not OesApi_IsValidRptChannel(self.env.rptChannel): - pass + config_path = self.config_path + if not OesApi_InitAllByConvention(self._env, config_path, -1, self._last_seq_index): + return False + self._last_seq_index = max(self._last_seq_index, self._env.ordChannel.lastOutMsgSeq + 1) + + if not OesApi_IsValidOrdChannel(self._env.ordChannel): + return False + if not OesApi_IsValidQryChannel(self._env.qryChannel): + return False + if not OesApi_IsValidRptChannel(self._env.rptChannel): + return False + return True def start(self): - self.message_loop.start() + self._message_loop.start() def stop(self): - self.message_loop.stop() - if not OesApi_LogoutAll(self.env, True): - pass # doc for this function is error - if not OesApi_DestoryAll(self.env): - pass # doc for this function is error + self._message_loop.stop() + OesApi_LogoutAll(self._env, True) + OesApi_DestoryAll(self._env) def join(self): - self.message_loop.join() + self._message_loop.join() + + def _get_new_seq_index(self): + """""" + with self._last_seq_lock: + index = self._last_seq_index + self._last_seq_index += 1 + return index def query_account(self): - OesApi_QueryCashAsset(self.env.qryChannel, - OesQryCashAssetFilterT(), - self.on_query_asset - ) + OesApi_QueryCashAsset(self._env.qryChannel, + OesQryCashAssetFilterT(), + self.on_query_asset + ) def on_query_asset(self, session_info: SGeneralClientChannelT, @@ -376,7 +389,6 @@ class OesTdApi: balance=balance, frozen=balance - availiable, ) - self.account_id = account_id self.gateway.on_account(account) return 1 @@ -386,7 +398,7 @@ class OesTdApi: def _query_stock(self, ) -> bool: f = OesQryStockFilterT() - ret = OesApi_QueryStock(self.env.qryChannel, f, self.on_query_stock) + ret = OesApi_QueryStock(self._env.qryChannel, f, self.on_query_stock) return ret >= 0 def on_query_stock(self, @@ -410,7 +422,7 @@ class OesTdApi: def query_option(self) -> bool: f = OesQryOptionFilterT() - ret = OesApi_QueryOption(self.env.qryChannel, + ret = OesApi_QueryOption(self._env.qryChannel, f, self.on_query_option ) @@ -437,7 +449,7 @@ class OesTdApi: def query_stock_holding(self) -> bool: f = OesQryStkHoldingFilterT() - ret = OesApi_QueryStkHolding(self.env.qryChannel, + ret = OesApi_QueryStkHolding(self._env.qryChannel, f, self.on_query_stock_holding ) @@ -470,7 +482,7 @@ class OesTdApi: f = OesQryStkHoldingFilterT() f.mktId = eOesMarketIdT.OES_MKT_ID_UNDEFINE f.userInfo = 0 - ret = OesApi_QueryOptHolding(self.env.qryChannel, + ret = OesApi_QueryOptHolding(self._env.qryChannel, f, self.on_query_holding ) @@ -526,7 +538,7 @@ class OesTdApi: self.query_option_holding() def send_order(self, vt_req: OrderRequest): - seq_id = self.get_new_seq_index() + seq_id = self._get_new_seq_index() order_id = seq_id oes_req = OesOrdReqT() @@ -542,8 +554,8 @@ class OesTdApi: order = vt_req.create_order_data(str(order_id), self.gateway.gateway_name) order.direction = Direction.NET # fix direction into NET: stock only - self.order_manager.save_local_created(order_id, order) - ret = OesApi_SendOrderReq(self.env.ordChannel, + self._order_manager.save_local_created(order_id, order) + ret = OesApi_SendOrderReq(self._env.ordChannel, oes_req ) @@ -555,11 +567,11 @@ class OesTdApi: return order.vt_orderid def cancel_order(self, vt_req: CancelRequest): - seq_id = self.get_new_seq_index() + seq_id = self._get_new_seq_index() oes_req = OesOrdCancelReqT() order_id = int(vt_req.orderid) - internal_order = self.order_manager.get_from_order_id(order_id) + internal_order = self._order_manager.get_from_order_id(order_id) oes_req.origClOrdId = internal_order.order_id oes_req.mktId = EXCHANGE_VT2OES[vt_req.exchange] @@ -567,8 +579,8 @@ class OesTdApi: oes_req.origClSeqNo = order_id oes_req.invAcctId = "" oes_req.securityId = vt_req.symbol - OesApi_SendOrderCancelReq(self.env.ordChannel, - oes_req) + OesApi_SendOrderCancelReq(self._env.ordChannel, + oes_req) def schedule_query_order(self, internal_order: InternalOrder) -> Thread: th = Thread(target=self.query_order, args=(internal_order,)) @@ -579,7 +591,7 @@ class OesTdApi: f = OesQryOrdFilterT() f.mktId = EXCHANGE_VT2OES[internal_order.vt_order.exchange] f.clSeqNo = internal_order.order_id - ret = OesApi_QueryOrder(self.env.qryChannel, + ret = OesApi_QueryOrder(self._env.qryChannel, f, self.on_query_order ) @@ -591,7 +603,7 @@ class OesTdApi: body: Any, cursor: OesQryCursorT): data: OesOrdCnfmT = cast.toOesOrdItemT(body) - i = self.order_manager.get_from_oes_data(data) + i = self._order_manager.get_from_oes_data(data) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty - data.canceledQty @@ -605,7 +617,7 @@ class OesTdApi: :return: """ f = OesQryOrdFilterT() - ret = OesApi_QueryOrder(self.env.qryChannel, + ret = OesApi_QueryOrder(self._env.qryChannel, f, self.on_init_query_orders ) @@ -619,7 +631,7 @@ class OesTdApi: ): data: OesOrdCnfmT = cast.toOesOrdItemT(body) try: - i = self.order_manager.get_from_oes_data(data) + i = self._order_manager.get_from_oes_data(data) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty - data.canceledQty @@ -627,7 +639,7 @@ class OesTdApi: self.gateway.on_order(vt_order) except KeyError: # order_id = self.order_manager.new_remote_id() - order_id = self.order_manager.get_order_id_from_data(data) + order_id = self._order_manager.get_order_id_from_data(data) if data.bsType == eOesBuySellTypeT.OES_BS_TYPE_BUY: offset = Offset.OPEN @@ -649,6 +661,6 @@ class OesTdApi: # this time should be generated automatically or by a static function time=datetime.utcnow().isoformat(), ) - self.order_manager.save_remote_created(order_id, vt_order) + self._order_manager.save_remote_created(order_id, vt_order) self.gateway.on_order(vt_order) return 1 From ee3731f21c94dfe4d5cccb40e39956fff345c1cc Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 8 Mar 2019 06:25:10 -0400 Subject: [PATCH 03/13] =?UTF-8?q?[Mod]=20=E5=8E=BB=E9=99=A4=E4=B8=8D?= =?UTF-8?q?=E5=BF=85=E8=A6=81=E7=9A=84=E8=AE=A2=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/gateway/oes/config_template.ini | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vnpy/gateway/oes/config_template.ini b/vnpy/gateway/oes/config_template.ini index 367a169c..2a3358c6 100644 --- a/vnpy/gateway/oes/config_template.ini +++ b/vnpy/gateway/oes/config_template.ini @@ -30,7 +30,7 @@ rpt.subcribeEnvId = 0 # 比如想订阅所有委托、成交相关的回报消息,可以使用如下两种方式: # - rpt.subcribeRptTypes = 1,4,8 # - 或等价的: rpt.subcribeRptTypes = 0x0D -rpt.subcribeRptTypes = 0 +rpt.subcribeRptTypes = 1,2,4,8,0x10,0x20,0x40 # 服务器集群的集群类型 (1: 基于复制集的高可用集群, 2: 基于对等节点的服务器集群, 0: 默认为基于复制集的高可用集群) clusterType = 0 @@ -87,7 +87,7 @@ mktData.tickExpireType = 0 # 0x400:指数行情, 0x800:期权行情) # 要订阅多个数据种类, 可以用逗号或空格分隔, 或者设置为并集值, 如: # "mktData.dataTypes = 1,2,4" 或等价的 "mktData.dataTypes = 0x07" -mktData.dataTypes = 0 +mktData.dataTypes = 1,2,4,8,0x10 # 请求订阅的行情数据的起始时间 (格式: HHMMSS 或 HHMMSSsss) # (-1: 从头开始获取, 0: 从最新位置开始获取实时行情, 大于0: 从指定的起始时间开始获取) From e2e93cef3bf463cbc9a1782449b1ed30ecc89525 Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 8 Mar 2019 06:26:17 -0400 Subject: [PATCH 04/13] =?UTF-8?q?[Fix]=20oes.MdApi:=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E9=80=80=E5=87=BA=E6=97=B6=E8=BF=98=E5=9C=A8=E9=87=8D=E8=BF=9E?= =?UTF-8?q?=E7=9A=84BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/gateway/oes/oes_md.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/vnpy/gateway/oes/oes_md.py b/vnpy/gateway/oes/oes_md.py index 9d76e1ae..75d41432 100644 --- a/vnpy/gateway/oes/oes_md.py +++ b/vnpy/gateway/oes/oes_md.py @@ -6,13 +6,12 @@ from threading import Thread from typing import Any, Callable, Dict from vnpy.api.oes.vnoes import MdsApiClientEnvT, MdsApi_DestoryAll, MdsApi_InitAllByConvention, \ - MdsApi_IsValidQryChannel, MdsApi_IsValidTcpChannel, MdsApi_LogoutAll, \ - MdsApi_SubscribeMarketData, MdsApi_WaitOnMsg, MdsL2StockSnapshotBodyT, MdsMktDataRequestEntryT, \ - MdsMktDataRequestReqT, MdsMktRspMsgBodyT, MdsStockSnapshotBodyT, SGeneralClientChannelT, \ - SMsgHeadT, SPlatform_IsNegEpipe, SPlatform_IsNegEtimeout, cast, eMdsExchangeIdT, \ - eMdsMktSubscribeFlagT, eMdsMsgTypeT, eMdsSecurityTypeT, eMdsSubscribeDataTypeT, \ - eMdsSubscribeModeT, eMdsSubscribedTickExpireTypeT, eSMsgProtocolTypeT, MdsApi_SetThreadUsername, \ - MdsApi_SetThreadPassword + MdsApi_IsValidQryChannel, MdsApi_IsValidTcpChannel, MdsApi_LogoutAll, MdsApi_SetThreadPassword, \ + MdsApi_SetThreadUsername, MdsApi_SubscribeMarketData, MdsApi_WaitOnMsg, MdsL2StockSnapshotBodyT, \ + MdsMktDataRequestEntryT, MdsMktDataRequestReqT, MdsMktRspMsgBodyT, MdsStockSnapshotBodyT, \ + SGeneralClientChannelT, SMsgHeadT, SPlatform_IsNegEpipe, SPlatform_IsNegEtimeout, cast, \ + eMdsExchangeIdT, eMdsMktSubscribeFlagT, eMdsMsgTypeT, eMdsSecurityTypeT, eMdsSubscribeDataTypeT, \ + eMdsSubscribeModeT, eMdsSubscribedTickExpireTypeT, eSMsgProtocolTypeT from vnpy.trader.constant import Exchange from vnpy.trader.gateway import BaseGateway @@ -36,11 +35,13 @@ class OesMdMessageLoop: self.th = Thread(target=self.message_loop) self.message_handlers: Dict[int, Callable[[dict], None]] = { + # tick & orderbook eMdsMsgTypeT.MDS_MSGTYPE_MARKET_DATA_SNAPSHOT_FULL_REFRESH: self.on_market_full_refresh, eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_DATA_SNAPSHOT: self.on_l2_market_data_snapshot, eMdsMsgTypeT.MDS_MSGTYPE_L2_ORDER: self.on_l2_order, eMdsMsgTypeT.MDS_MSGTYPE_L2_TRADE: self.on_l2_trade, + # others eMdsMsgTypeT.MDS_MSGTYPE_QRY_SECURITY_STATUS: self.on_security_status, eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_DATA_INCREMENTAL: lambda x: 1, eMdsMsgTypeT.MDS_MSGTYPE_L2_BEST_ORDERS_SNAPSHOT: self.on_best_orders_snapshot, @@ -50,6 +51,7 @@ class OesMdMessageLoop: eMdsMsgTypeT.MDS_MSGTYPE_TRADING_SESSION_STATUS: self.on_trading_session_status, eMdsMsgTypeT.MDS_MSGTYPE_SECURITY_STATUS: self.on_security_status, eMdsMsgTypeT.MDS_MSGTYPE_MARKET_DATA_REQUEST: self.on_market_data_request, + eMdsMsgTypeT.MDS_MSGTYPE_HEARTBEAT: lambda x: 1, } self.last_tick: Dict[str, TickData] = {} @@ -76,6 +78,9 @@ class OesMdMessageLoop: self.alive = True self.th.start() + def stop(self): + self.alive = False + def join(self): self.th.join() @@ -113,7 +118,7 @@ class OesMdMessageLoop: # pass # just no message if is_disconnected(ret): self.gateway.write_log(_("与行情服务器的连接已断开。")) - while not self.reconnect(): + while not self.reconnect() and self.alive: time.sleep(1) return @@ -180,9 +185,6 @@ class OesMdMessageLoop: def on_security_status(self, d: MdsMktRspMsgBodyT): pass - def stop(self): - self.alive = False - class OesMdApi: From 897e7f037299ba5b7d8653fa69fdcca6abe6ed21 Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 8 Mar 2019 06:28:47 -0400 Subject: [PATCH 05/13] =?UTF-8?q?[Fix]=20OesTdApi:=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E9=80=80=E5=87=BA=E6=97=B6=E8=BF=98=E5=9C=A8=E9=87=8D=E8=BF=9E?= =?UTF-8?q?=E7=9A=84Bug=20[Fix]=20=E4=BF=AE=E5=A4=8D=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E6=92=A4=E9=94=80=E9=9D=9E=E6=9C=AC=E4=BC=9A=E8=AF=9D=E8=AE=A2?= =?UTF-8?q?=E5=8D=95=E7=9A=84BUG=20[Fix]=20=E5=A1=AB=E5=85=85order&trade?= =?UTF-8?q?=E4=B8=AD=E7=9A=84time=E5=AD=97=E6=AE=B5=20[Fix]=20=E5=86=BB?= =?UTF-8?q?=E7=BB=93=E8=B5=84=E9=87=91=E4=B8=8D=E5=86=8D=E6=98=AF=E8=AE=A1?= =?UTF-8?q?=E7=AE=97=E5=BE=97=E5=87=BA=EF=BC=8C=E8=80=8C=E6=98=AF=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E7=B2=BE=E7=A1=AE=E7=9A=84=E6=8C=82=E5=8D=95=E5=86=BB?= =?UTF-8?q?=E7=BB=93=E8=B5=84=E9=87=91=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/gateway/oes/oes_td.py | 107 ++++++++++++++++++++++--------------- 1 file changed, 63 insertions(+), 44 deletions(-) diff --git a/vnpy/gateway/oes/oes_td.py b/vnpy/gateway/oes/oes_td.py index 649aef7f..a7e8f435 100644 --- a/vnpy/gateway/oes/oes_td.py +++ b/vnpy/gateway/oes/oes_td.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timedelta, timezone from gettext import gettext as _ from threading import Lock, Thread # noinspection PyUnresolvedReferences @@ -7,15 +7,14 @@ from typing import Any, Callable, Dict, Tuple from vnpy.api.oes.vnoes import OesApiClientEnvT, OesApi_DestoryAll, OesApi_InitAllByConvention, \ OesApi_IsValidOrdChannel, OesApi_IsValidQryChannel, OesApi_IsValidRptChannel, OesApi_LogoutAll, \ - OesApi_QueryCashAsset, OesApi_QueryOptHolding, \ - OesApi_QueryOption, OesApi_QueryOrder, OesApi_QueryStkHolding, OesApi_QueryStock, \ - OesApi_SendOrderCancelReq, OesApi_SendOrderReq, OesApi_WaitReportMsg, OesOrdCancelReqT, \ + OesApi_QueryCashAsset, OesApi_QueryOptHolding, OesApi_QueryOption, OesApi_QueryOrder, \ + OesApi_QueryStkHolding, OesApi_QueryStock, OesApi_SendOrderCancelReq, OesApi_SendOrderReq, \ + OesApi_SetThreadPassword, OesApi_SetThreadUsername, OesApi_WaitReportMsg, OesOrdCancelReqT, \ OesOrdCnfmT, OesOrdRejectT, OesOrdReqT, OesQryCashAssetFilterT, OesQryCursorT, \ - OesQryOptionFilterT, OesQryOrdFilterT, OesQryStkHoldingFilterT, \ - OesQryStockFilterT, OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, SGeneralClientChannelT, \ - SMSG_PROTO_BINARY, SMsgHeadT, SPlatform_IsNegEpipe, SPlatform_IsNegEtimeout, cast, \ - eOesBuySellTypeT, eOesMarketIdT, eOesMsgTypeT, eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT, \ - OesApi_SetThreadUsername, OesApi_SetThreadPassword + OesQryOptionFilterT, OesQryOrdFilterT, OesQryStkHoldingFilterT, OesQryStockFilterT, \ + OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, SGeneralClientChannelT, SMSG_PROTO_BINARY, \ + SMsgHeadT, SPlatform_IsNegEpipe, cast, eOesBuySellTypeT, eOesMarketIdT, \ + eOesMsgTypeT, eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT from vnpy.gateway.oes.error_code import error_to_str from vnpy.trader.constant import Direction, Exchange, Offset, PriceType, Product, Status @@ -86,6 +85,8 @@ STATUS_OES2VT = { eOesOrdStatusT.OES_ORD_STATUS_INVALID_SZ_TRY_AGAIN: Status.REJECTED, } +bjtz = timezone(timedelta(hours=8)) + @dataclass class InternalOrder: @@ -93,6 +94,20 @@ class InternalOrder: vt_order: OrderData = None +def parse_oes_datetime(date: int, time: int): + # YYYYMMDD + year = int(date / 10000) + month = int((date % 10000) / 100) + day = int(date % 100) + + # HHMMSSsss + hour = int(time / 10000000) + minute = int((time % 10000000) / 100000) + sec = int((time % 100000) / 1000) + mill = int(time % 1000) + return datetime(year, month, day, hour, minute, sec, mill * 1000, tzinfo=bjtz) + + class OrderManager: def __init__(self): @@ -122,12 +137,6 @@ class OrderManager: def get_from_order_id(self, id: int): return self._orders[id] - def get_from_oes_data(self, data): - return data.clSeqNo - - def get_order_id_from_data(self, data): - return self.get_from_order_id(self.get_from_oes_data(data)) - class OesTdMessageLoop: @@ -138,12 +147,12 @@ class OesTdMessageLoop: td: "OesTdApi" ): self.gateway = gateway - self.env = env - self.order_manager = order_manager - self.td = td + self._td = td + self._env = env + self._order_manager = order_manager - self.alive = False - self.th = Thread(target=self.message_loop) + self._alive = False + self._th = Thread(target=self.message_loop) self.message_handlers: Dict[int, Callable[[dict], None]] = { eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_order_rejected, @@ -153,15 +162,20 @@ class OesTdMessageLoop: eOesMsgTypeT.OESMSG_RPT_STOCK_HOLDING_VARIATION: self.on_stock_holding, eOesMsgTypeT.OESMSG_RPT_OPTION_HOLDING_VARIATION: self.on_option_holding, eOesMsgTypeT.OESMSG_RPT_CASH_ASSET_VARIATION: self.on_cash, - eOesMsgTypeT.OESMSG_SESS_HEARTBEAT: lambda x: x, + + eOesMsgTypeT.OESMSG_RPT_REPORT_SYNCHRONIZATION: lambda x: 1, + eOesMsgTypeT.OESMSG_SESS_HEARTBEAT: lambda x: 1, } def start(self): - self.alive = True - self.th.start() + self._alive = True + self._th.start() + + def stop(self): + self._alive = False def join(self): - self.th.join() + self._th.join() def on_message(self, session_info: SGeneralClientChannelT, head: SMsgHeadT, @@ -179,15 +193,14 @@ class OesTdMessageLoop: def reconnect(self): self.gateway.write_log(_("正在尝试重新连接到交易服务器。")) - self.td.connect() + self._td.connect() def message_loop(self): - rpt_channel = self.env.rptChannel + rpt_channel = self._env.rptChannel timeout_ms = 1000 - is_timeout = SPlatform_IsNegEtimeout is_disconnected = SPlatform_IsNegEpipe - while self.alive: + while self._alive: ret = OesApi_WaitReportMsg(rpt_channel, timeout_ms, self.on_message) @@ -195,7 +208,9 @@ class OesTdMessageLoop: # if is_timeout(ret): # pass # just no message if is_disconnected(ret): - self.reconnect() + self.gateway.write_log(_("与交易服务器的连接已断开。")) + while not self.reconnect() and self._alive: + pass return def on_order_rejected(self, d: OesRspMsgBodyT): @@ -203,7 +218,7 @@ class OesTdMessageLoop: error_string = error_to_str(error_code) data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp if not data.origClSeqNo: - i = self.order_manager.get_from_order_id(data.clSeqNo) + i = self._order_manager.get_from_order_id(data.clSeqNo) vt_order = i.vt_order if vt_order == Status.ALLTRADED: @@ -215,33 +230,41 @@ class OesTdMessageLoop: self.gateway.write_log( f"Order: {vt_order.vt_symbol}-{vt_order.vt_orderid} Code: {error_code} Rejected: {error_string}") else: - self.gateway.write_log(f"Failed to cancel Order, id: {data.origClSeqNo}") + self.gateway.write_log(f"撤单失败,订单号: {data.origClSeqNo}。原因:{error_string}") def on_order_inserted(self, d: OesRspMsgBodyT): data = d.rptMsg.rptBody.ordInsertRsp - i = self.order_manager.get_from_oes_data(data) + if not data.origClSeqNo: + i = self._order_manager.get_from_order_id(data.clSeqNo) + else: + i = self._order_manager.get_from_order_id(data.origClSeqNo) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty vt_order.traded = data.cumQty + vt_order.time = parse_oes_datetime(data.ordDate, data.ordTime) self.gateway.on_order(vt_order) def on_order_report(self, d: OesRspMsgBodyT): data: OesOrdCnfmT = d.rptMsg.rptBody.ordCnfm - i = self.order_manager.get_from_oes_data(data) + if not data.origClSeqNo: + i = self._order_manager.get_from_order_id(data.clSeqNo) + else: + i = self._order_manager.get_from_order_id(data.origClSeqNo) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty vt_order.traded = data.cumQty + vt_order.time = parse_oes_datetime(data.ordDate, data.ordCnfmTime) self.gateway.on_order(vt_order) def on_trade_report(self, d: OesRspMsgBodyT): data: OesTrdCnfmT = d.rptMsg.rptBody.trdCnfm - i = self.order_manager.get_from_oes_data(data) + i = self._order_manager.get_from_order_id(data.clSeqNo) vt_order = i.vt_order # vt_order.status = STATUS_OES2VT[data.ordStatus] @@ -255,7 +278,7 @@ class OesTdMessageLoop: offset=vt_order.offset, price=data.trdPrice / 10000, volume=data.trdQty, - time=datetime.utcnow().isoformat() # strict + time=parse_oes_datetime(data.trdDate, data.trdTime) ) self.gateway.on_trade(trade) @@ -268,7 +291,7 @@ class OesTdMessageLoop: # Oes have no async call to query order only. # And calling sync function here will slow down vnpy. # So we queue it into another thread. - self.td.schedule_query_order(i) + self._td.schedule_query_order(i) def on_option_holding(self, d: OesRspMsgBodyT): pass @@ -305,9 +328,6 @@ class OesTdMessageLoop: self.gateway.on_account(account) return 1 - def stop(self): - self.alive = False - class OesTdApi: @@ -571,8 +591,6 @@ class OesTdApi: oes_req = OesOrdCancelReqT() order_id = int(vt_req.orderid) - internal_order = self._order_manager.get_from_order_id(order_id) - oes_req.origClOrdId = internal_order.order_id oes_req.mktId = EXCHANGE_VT2OES[vt_req.exchange] oes_req.clSeqNo = seq_id @@ -603,7 +621,8 @@ class OesTdApi: body: Any, cursor: OesQryCursorT): data: OesOrdCnfmT = cast.toOesOrdItemT(body) - i = self._order_manager.get_from_oes_data(data) + + i = self._order_manager.get_from_order_id(data.clSeqNo) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty - data.canceledQty @@ -631,7 +650,7 @@ class OesTdApi: ): data: OesOrdCnfmT = cast.toOesOrdItemT(body) try: - i = self._order_manager.get_from_oes_data(data) + i = self._order_manager.get_from_order_id(data.clSeqNo) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty - data.canceledQty @@ -639,7 +658,7 @@ class OesTdApi: self.gateway.on_order(vt_order) except KeyError: # order_id = self.order_manager.new_remote_id() - order_id = self._order_manager.get_order_id_from_data(data) + order_id = data.clSeqNo if data.bsType == eOesBuySellTypeT.OES_BS_TYPE_BUY: offset = Offset.OPEN From 637dabd4060fb7067b12f81e8b87f510d00e97f2 Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 8 Mar 2019 07:15:01 -0400 Subject: [PATCH 06/13] [Mod] remove OrderManager --- vnpy/gateway/oes/oes_td.py | 86 +++++++++++++------------------------- 1 file changed, 30 insertions(+), 56 deletions(-) diff --git a/vnpy/gateway/oes/oes_td.py b/vnpy/gateway/oes/oes_td.py index a7e8f435..2cde3db6 100644 --- a/vnpy/gateway/oes/oes_td.py +++ b/vnpy/gateway/oes/oes_td.py @@ -108,48 +108,17 @@ def parse_oes_datetime(date: int, time: int): return datetime(year, month, day, hour, minute, sec, mill * 1000, tzinfo=bjtz) -class OrderManager: - - def __init__(self): - self.last_order_id = 100000000 - self._orders: Dict[int, InternalOrder] = {} - - @staticmethod - def hash_remote_order(data): - key = (data.origClSeqNo, data.origClOrdId, data.origClEnvId, data.userInfo) - return key - - @staticmethod - def hash_remote_trade(data: OesTrdCnfmT): - key = (data.clSeqNo, data.clOrdId, data.clEnvId, data.userInfo) - return key - - def save_local_created(self, order_id: int, order: OrderData): - print(f"saved order, id:{order_id}") - self._orders[order_id] = InternalOrder( - order_id=order_id, - vt_order=order, - ) - - def save_remote_created(self, order_id: int, vt_order: OrderData): - return self.save_local_created(order_id, vt_order) - - def get_from_order_id(self, id: int): - return self._orders[id] - - class OesTdMessageLoop: def __init__(self, gateway: BaseGateway, env: OesApiClientEnvT, - order_manager: OrderManager, td: "OesTdApi" ): self.gateway = gateway - self._td = td + self._env = env - self._order_manager = order_manager + self._td = td self._alive = False self._th = Thread(target=self.message_loop) @@ -218,7 +187,7 @@ class OesTdMessageLoop: error_string = error_to_str(error_code) data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp if not data.origClSeqNo: - i = self._order_manager.get_from_order_id(data.clSeqNo) + i = self._td.get_order(data.clSeqNo) vt_order = i.vt_order if vt_order == Status.ALLTRADED: @@ -236,9 +205,9 @@ class OesTdMessageLoop: data = d.rptMsg.rptBody.ordInsertRsp if not data.origClSeqNo: - i = self._order_manager.get_from_order_id(data.clSeqNo) + i = self._td.get_order(data.clSeqNo) else: - i = self._order_manager.get_from_order_id(data.origClSeqNo) + i = self._td.get_order(data.origClSeqNo) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty @@ -251,20 +220,22 @@ class OesTdMessageLoop: data: OesOrdCnfmT = d.rptMsg.rptBody.ordCnfm if not data.origClSeqNo: - i = self._order_manager.get_from_order_id(data.clSeqNo) + i = self._td.get_order(data.clSeqNo) else: - i = self._order_manager.get_from_order_id(data.origClSeqNo) + i = self._td.get_order(data.origClSeqNo) vt_order = i.vt_order + vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty vt_order.traded = data.cumQty vt_order.time = parse_oes_datetime(data.ordDate, data.ordCnfmTime) + self.gateway.on_order(vt_order) def on_trade_report(self, d: OesRspMsgBodyT): data: OesTrdCnfmT = d.rptMsg.rptBody.trdCnfm - i = self._order_manager.get_from_order_id(data.clSeqNo) + i = self._td.get_order(data.clSeqNo) vt_order = i.vt_order # vt_order.status = STATUS_OES2VT[data.ordStatus] @@ -280,18 +251,11 @@ class OesTdMessageLoop: volume=data.trdQty, time=parse_oes_datetime(data.trdDate, data.trdTime) ) + vt_order.status = STATUS_OES2VT[data.ordStatus] + vt_order.traded = data.origOrdQty + data.trdQty + vt_order.time = parse_oes_datetime(data.trdDate, data.trdTime) self.gateway.on_trade(trade) - - # hack : - # Sometimes order_report is not received after a trade is received. - # (only trade msg but no order msg) - # This cause a problem that vt_order.traded stay 0 after a trade, which is a error state. - # So we have to query new status of order for every receiving of trade. - # BUT - # Oes have no async call to query order only. - # And calling sync function here will slow down vnpy. - # So we queue it into another thread. - self._td.schedule_query_order(i) + self.gateway.on_order(vt_order) def on_option_holding(self, d: OesRspMsgBodyT): pass @@ -339,14 +303,14 @@ class OesTdApi: self._env = OesApiClientEnvT() - self._order_manager = OrderManager() self._message_loop = OesTdMessageLoop(gateway, self._env, - self._order_manager, self) self._last_seq_lock = Lock() self._last_seq_index = 1000000 # 0 has special manning for oes + + self._orders: Dict[int, InternalOrder] = {} def connect(self): """Connect to trading server. @@ -574,7 +538,7 @@ class OesTdApi: order = vt_req.create_order_data(str(order_id), self.gateway.gateway_name) order.direction = Direction.NET # fix direction into NET: stock only - self._order_manager.save_local_created(order_id, order) + self.save_order(order_id, order) ret = OesApi_SendOrderReq(self._env.ordChannel, oes_req ) @@ -622,7 +586,7 @@ class OesTdApi: cursor: OesQryCursorT): data: OesOrdCnfmT = cast.toOesOrdItemT(body) - i = self._order_manager.get_from_order_id(data.clSeqNo) + i = self._td.get_order(data.clSeqNo) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty - data.canceledQty @@ -650,7 +614,7 @@ class OesTdApi: ): data: OesOrdCnfmT = cast.toOesOrdItemT(body) try: - i = self._order_manager.get_from_order_id(data.clSeqNo) + i = self.get_order(data.clSeqNo) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty - data.canceledQty @@ -680,6 +644,16 @@ class OesTdApi: # this time should be generated automatically or by a static function time=datetime.utcnow().isoformat(), ) - self._order_manager.save_remote_created(order_id, vt_order) + self.save_order(order_id, vt_order) self.gateway.on_order(vt_order) return 1 + + def save_order(self, order_id: int, order: OrderData): + self._orders[order_id] = InternalOrder( + order_id=order_id, + vt_order=order, + ) + + def get_order(self, order_id: int): + return self._orders[order_id] + From eb5aac2595d350e84b55149c93da2a846eaf3c7e Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 8 Mar 2019 07:26:04 -0400 Subject: [PATCH 07/13] =?UTF-8?q?[Mod]=20=E7=BB=9F=E4=B8=80=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=EF=BC=8C=E9=9A=90=E8=97=8F=E5=86=85=E9=83=A8=E5=8F=98?= =?UTF-8?q?=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/gateway/oes/oes_gateway.py | 2 + vnpy/gateway/oes/oes_md.py | 79 ++++++++++++++++++++++----------- vnpy/gateway/oes/oes_td.py | 65 +++++++++++++++++++++------ 3 files changed, 106 insertions(+), 40 deletions(-) diff --git a/vnpy/gateway/oes/oes_gateway.py b/vnpy/gateway/oes/oes_gateway.py index 3f7ad72e..2cdd0f58 100644 --- a/vnpy/gateway/oes/oes_gateway.py +++ b/vnpy/gateway/oes/oes_gateway.py @@ -38,6 +38,7 @@ class OesGateway(BaseGateway): self.td_api = OesTdApi(self) def connect(self, setting: dict): + """""" return self._connect_async(setting) def _connect_sync(self, setting: dict): @@ -89,6 +90,7 @@ class OesGateway(BaseGateway): self.write_log(_("无法连接到交易服务器,请检查你的配置")) def _connect_async(self, setting: dict): + """""" Thread(target=self._connect_sync, args=(setting,)).start() def subscribe(self, req: SubscribeRequest): diff --git a/vnpy/gateway/oes/oes_md.py b/vnpy/gateway/oes/oes_md.py index 75d41432..1d45e56f 100644 --- a/vnpy/gateway/oes/oes_md.py +++ b/vnpy/gateway/oes/oes_md.py @@ -9,7 +9,7 @@ from vnpy.api.oes.vnoes import MdsApiClientEnvT, MdsApi_DestoryAll, MdsApi_InitA MdsApi_IsValidQryChannel, MdsApi_IsValidTcpChannel, MdsApi_LogoutAll, MdsApi_SetThreadPassword, \ MdsApi_SetThreadUsername, MdsApi_SubscribeMarketData, MdsApi_WaitOnMsg, MdsL2StockSnapshotBodyT, \ MdsMktDataRequestEntryT, MdsMktDataRequestReqT, MdsMktRspMsgBodyT, MdsStockSnapshotBodyT, \ - SGeneralClientChannelT, SMsgHeadT, SPlatform_IsNegEpipe, SPlatform_IsNegEtimeout, cast, \ + SGeneralClientChannelT, SMsgHeadT, SPlatform_IsNegEpipe, cast, \ eMdsExchangeIdT, eMdsMktSubscribeFlagT, eMdsMsgTypeT, eMdsSecurityTypeT, eMdsSubscribeDataTypeT, \ eMdsSubscribeModeT, eMdsSubscribedTickExpireTypeT, eSMsgProtocolTypeT @@ -28,11 +28,12 @@ EXCHANGE_VT2MDS = {v: k for k, v in EXCHANGE_MDS2VT.items()} class OesMdMessageLoop: def __init__(self, gateway: BaseGateway, md: "OesMdApi", env: MdsApiClientEnvT): + """""" self.gateway = gateway self.env = env self.alive = False self.md = md - self.th = Thread(target=self.message_loop) + self.th = Thread(target=self._message_loop) self.message_handlers: Dict[int, Callable[[dict], None]] = { # tick & orderbook @@ -58,9 +59,29 @@ class OesMdMessageLoop: self.symbol_to_exchange: Dict[str, Exchange] = {} def register_symbol(self, symbol: str, exchange: Exchange): + """""" self.symbol_to_exchange[symbol] = exchange - def get_last_tick(self, symbol): + def start(self): + """""" + self.alive = True + self.th.start() + + def stop(self): + """""" + self.alive = False + + def join(self): + """""" + self.th.join() + + def reconnect(self): + """""" + self.gateway.write_log(_("正在尝试重新连接到行情服务器。")) + return self.md.connect() + + def _get_last_tick(self, symbol): + """""" try: return self.last_tick[symbol] except KeyError: @@ -68,25 +89,15 @@ class OesMdMessageLoop: gateway_name=self.gateway.gateway_name, symbol=symbol, exchange=self.symbol_to_exchange[symbol], - # todo: use cache of something else to resolve exchange datetime=datetime.utcnow() ) self.last_tick[symbol] = tick return tick - def start(self): - self.alive = True - self.th.start() - - def stop(self): - self.alive = False - - def join(self): - self.th.join() - - def on_message(self, session_info: SGeneralClientChannelT, - head: SMsgHeadT, - body: Any): + def _on_message(self, session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any): + """""" if session_info.protocolType == eSMsgProtocolTypeT.SMSG_PROTO_BINARY: b = cast.toMdsMktRspMsgBodyT(body) if head.msgId in self.message_handlers: @@ -100,19 +111,16 @@ class OesMdMessageLoop: self.gateway.write_log(f"unknown prototype : {session_info.protocolType}") return 1 - def reconnect(self): - self.gateway.write_log(_("正在尝试重新连接到行情服务器。")) - return self.md.connect() - - def message_loop(self): + def _message_loop(self): + """""" tcp_channel = self.env.tcpChannel timeout_ms = 1000 - is_timeout = SPlatform_IsNegEtimeout + # is_timeout = SPlatform_IsNegEtimeout is_disconnected = SPlatform_IsNegEpipe while self.alive: ret = MdsApi_WaitOnMsg(tcp_channel, timeout_ms, - self.on_message) + self._on_message) if ret < 0: # if is_timeout(ret): # pass # just no message @@ -123,9 +131,10 @@ class OesMdMessageLoop: return def on_l2_market_data_snapshot(self, d: MdsMktRspMsgBodyT): + """""" data: MdsL2StockSnapshotBodyT = d.mktDataSnapshot.l2Stock symbol = str(data.SecurityID) - tick = self.get_last_tick(symbol) + tick = self._get_last_tick(symbol) tick.open_price = data.OpenPx / 10000 tick.pre_close = data.ClosePx / 10000 tick.high_price = data.HighPx / 10000 @@ -138,9 +147,10 @@ class OesMdMessageLoop: self.gateway.on_tick(tick) def on_market_full_refresh(self, d: MdsMktRspMsgBodyT): + """""" data: MdsStockSnapshotBodyT = d.mktDataSnapshot.stock symbol = data.SecurityID - tick = self.get_last_tick(symbol) + tick = self._get_last_tick(symbol) tick.open_price = data.OpenPx / 10000 tick.pre_close = data.ClosePx / 10000 tick.high_price = data.HighPx / 10000 @@ -153,42 +163,52 @@ class OesMdMessageLoop: self.gateway.on_tick(tick) def on_l2_trade(self, d: MdsMktRspMsgBodyT): + """""" data = d.trade symbol = data.SecurityID - tick = self.get_last_tick(symbol) + tick = self._get_last_tick(symbol) tick.datetime = datetime.utcnow() tick.volume = data.TradeQty tick.last_price = data.TradePrice / 10000 self.gateway.on_tick(tick) def on_market_data_request(self, d: MdsMktRspMsgBodyT): + """""" pass def on_trading_session_status(self, d: MdsMktRspMsgBodyT): + """""" pass def on_l2_market_overview(self, d: MdsMktRspMsgBodyT): + """""" pass def on_index_snapshot_full_refresh(self, d: MdsMktRspMsgBodyT): + """""" pass def on_option_snapshot_ful_refresh(self, d: MdsMktRspMsgBodyT): + """""" pass def on_best_orders_snapshot(self, d: MdsMktRspMsgBodyT): + """""" pass def on_l2_order(self, d: MdsMktRspMsgBodyT): + """""" pass def on_security_status(self, d: MdsMktRspMsgBodyT): + """""" pass class OesMdApi: def __init__(self, gateway: BaseGateway): + """""" self.gateway = gateway self.config_path: str = '' self.username: str = '' @@ -198,6 +218,7 @@ class OesMdApi: self._message_loop = OesMdMessageLoop(gateway, self, self._env) def connect(self) -> bool: + """""" """Connect to trading server. :note set config_path before calling this function """ @@ -214,18 +235,22 @@ class OesMdApi: return True def start(self): + """""" self._message_loop.start() def stop(self): + """""" self._message_loop.stop() MdsApi_LogoutAll(self._env, True) MdsApi_DestoryAll(self._env) def join(self): + """""" self._message_loop.join() # why isn't arg a ContractData? def subscribe(self, req: SubscribeRequest): + """""" mds_req = MdsMktDataRequestReqT() entry = MdsMktDataRequestEntryT() mds_req.subMode = eMdsSubscribeModeT.MDS_SUB_MODE_APPEND diff --git a/vnpy/gateway/oes/oes_td.py b/vnpy/gateway/oes/oes_td.py index 2cde3db6..e399cc3b 100644 --- a/vnpy/gateway/oes/oes_td.py +++ b/vnpy/gateway/oes/oes_td.py @@ -3,7 +3,7 @@ from datetime import datetime, timedelta, timezone from gettext import gettext as _ from threading import Lock, Thread # noinspection PyUnresolvedReferences -from typing import Any, Callable, Dict, Tuple +from typing import Any, Callable, Dict from vnpy.api.oes.vnoes import OesApiClientEnvT, OesApi_DestoryAll, OesApi_InitAllByConvention, \ OesApi_IsValidOrdChannel, OesApi_IsValidQryChannel, OesApi_IsValidRptChannel, OesApi_LogoutAll, \ @@ -95,6 +95,7 @@ class InternalOrder: def parse_oes_datetime(date: int, time: int): + """convert oes datetime to python datetime""" # YYYYMMDD year = int(date / 10000) month = int((date % 10000) / 100) @@ -115,13 +116,14 @@ class OesTdMessageLoop: env: OesApiClientEnvT, td: "OesTdApi" ): + """""" self.gateway = gateway self._env = env self._td = td self._alive = False - self._th = Thread(target=self.message_loop) + self._th = Thread(target=self._message_loop) self.message_handlers: Dict[int, Callable[[dict], None]] = { eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_order_rejected, @@ -137,18 +139,27 @@ class OesTdMessageLoop: } def start(self): + """""" self._alive = True self._th.start() def stop(self): + """""" self._alive = False def join(self): + """""" self._th.join() - def on_message(self, session_info: SGeneralClientChannelT, - head: SMsgHeadT, - body: Any): + def reconnect(self): + """""" + self.gateway.write_log(_("正在尝试重新连接到交易服务器。")) + self._td.connect() + + def _on_message(self, session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any): + """""" if session_info.protocolType == SMSG_PROTO_BINARY: b = cast.toOesRspMsgBodyT(body) if head.msgId in self.message_handlers: @@ -160,11 +171,8 @@ class OesTdMessageLoop: self.gateway.write_log(f"unknown prototype : {session_info.protocolType}") return 1 - def reconnect(self): - self.gateway.write_log(_("正在尝试重新连接到交易服务器。")) - self._td.connect() - - def message_loop(self): + def _message_loop(self): + """""" rpt_channel = self._env.rptChannel timeout_ms = 1000 is_disconnected = SPlatform_IsNegEpipe @@ -172,7 +180,7 @@ class OesTdMessageLoop: while self._alive: ret = OesApi_WaitReportMsg(rpt_channel, timeout_ms, - self.on_message) + self._on_message) if ret < 0: # if is_timeout(ret): # pass # just no message @@ -183,6 +191,7 @@ class OesTdMessageLoop: return def on_order_rejected(self, d: OesRspMsgBodyT): + """""" error_code = d.rptMsg.rptHead.ordRejReason error_string = error_to_str(error_code) data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp @@ -202,6 +211,7 @@ class OesTdMessageLoop: self.gateway.write_log(f"撤单失败,订单号: {data.origClSeqNo}。原因:{error_string}") def on_order_inserted(self, d: OesRspMsgBodyT): + """""" data = d.rptMsg.rptBody.ordInsertRsp if not data.origClSeqNo: @@ -217,6 +227,7 @@ class OesTdMessageLoop: self.gateway.on_order(vt_order) def on_order_report(self, d: OesRspMsgBodyT): + """""" data: OesOrdCnfmT = d.rptMsg.rptBody.ordCnfm if not data.origClSeqNo: @@ -233,6 +244,7 @@ class OesTdMessageLoop: self.gateway.on_order(vt_order) def on_trade_report(self, d: OesRspMsgBodyT): + """""" data: OesTrdCnfmT = d.rptMsg.rptBody.trdCnfm i = self._td.get_order(data.clSeqNo) @@ -258,9 +270,11 @@ class OesTdMessageLoop: self.gateway.on_order(vt_order) def on_option_holding(self, d: OesRspMsgBodyT): + """""" pass def on_stock_holding(self, d: OesRspMsgBodyT): + """""" data = d.rptMsg.rptBody.stkHoldingRpt position = PositionData( gateway_name=self.gateway.gateway_name, @@ -277,6 +291,7 @@ class OesTdMessageLoop: self.gateway.on_position(position) def on_cash(self, d: OesRspMsgBodyT): + """""" data = d.rptMsg.rptBody.cashAssetRpt balance = data.currentTotalBal @@ -296,6 +311,7 @@ class OesTdMessageLoop: class OesTdApi: def __init__(self, gateway: BaseGateway): + """""" self.config_path: str = None self.username: str = '' self.password: str = '' @@ -309,7 +325,7 @@ class OesTdApi: self._last_seq_lock = Lock() self._last_seq_index = 1000000 # 0 has special manning for oes - + self._orders: Dict[int, InternalOrder] = {} def connect(self): @@ -333,14 +349,17 @@ class OesTdApi: return True def start(self): + """""" self._message_loop.start() def stop(self): + """""" self._message_loop.stop() OesApi_LogoutAll(self._env, True) OesApi_DestoryAll(self._env) def join(self): + """""" self._message_loop.join() def _get_new_seq_index(self): @@ -351,6 +370,7 @@ class OesTdApi: return index def query_account(self): + """""" OesApi_QueryCashAsset(self._env.qryChannel, OesQryCashAssetFilterT(), self.on_query_asset @@ -362,6 +382,7 @@ class OesTdApi: body: Any, cursor: OesQryCursorT, ): + """""" data = cast.toOesCashAssetItemT(body) balance = data.currentTotalBal / 10000 availiable = data.currentAvailableBal / 10000 @@ -377,10 +398,12 @@ class OesTdApi: return 1 def query_stock(self, ) -> bool: + """""" # Thread(target=self._query_stock, ).start() return self._query_stock() def _query_stock(self, ) -> bool: + """""" f = OesQryStockFilterT() ret = OesApi_QueryStock(self._env.qryChannel, f, self.on_query_stock) return ret >= 0 @@ -391,6 +414,7 @@ class OesTdApi: body: Any, cursor: OesQryCursorT, ): + """""" data: OesStockBaseInfoT = cast.toOesStockItemT(body) contract = ContractData( gateway_name=self.gateway.gateway_name, @@ -405,6 +429,7 @@ class OesTdApi: return 1 def query_option(self) -> bool: + """""" f = OesQryOptionFilterT() ret = OesApi_QueryOption(self._env.qryChannel, f, @@ -418,6 +443,7 @@ class OesTdApi: body: Any, cursor: OesQryCursorT, ): + """""" data = cast.toOesOptionItemT(body) contract = ContractData( gateway_name=self.gateway.gateway_name, @@ -432,6 +458,7 @@ class OesTdApi: return 1 def query_stock_holding(self) -> bool: + """""" f = OesQryStkHoldingFilterT() ret = OesApi_QueryStkHolding(self._env.qryChannel, f, @@ -445,6 +472,7 @@ class OesTdApi: body: Any, cursor: OesQryCursorT, ): + """""" data = cast.toOesStkHoldingItemT(body) position = PositionData( @@ -463,6 +491,7 @@ class OesTdApi: return 1 def query_option_holding(self) -> bool: + """""" f = OesQryStkHoldingFilterT() f.mktId = eOesMarketIdT.OES_MKT_ID_UNDEFINE f.userInfo = 0 @@ -478,6 +507,7 @@ class OesTdApi: body: Any, cursor: OesQryCursorT, ): + """""" data = cast.toOesOptHoldingItemT(body) # 权利 @@ -513,15 +543,18 @@ class OesTdApi: return 1 def query_contracts(self): + """""" self.query_stock() # self.query_option() # self.query_issue() def query_position(self): + """""" self.query_stock_holding() self.query_option_holding() def send_order(self, vt_req: OrderRequest): + """""" seq_id = self._get_new_seq_index() order_id = seq_id @@ -551,6 +584,7 @@ class OesTdApi: return order.vt_orderid def cancel_order(self, vt_req: CancelRequest): + """""" seq_id = self._get_new_seq_index() oes_req = OesOrdCancelReqT() @@ -565,11 +599,13 @@ class OesTdApi: oes_req) def schedule_query_order(self, internal_order: InternalOrder) -> Thread: + """""" th = Thread(target=self.query_order, args=(internal_order,)) th.start() return th def query_order(self, internal_order: InternalOrder) -> bool: + """""" f = OesQryOrdFilterT() f.mktId = EXCHANGE_VT2OES[internal_order.vt_order.exchange] f.clSeqNo = internal_order.order_id @@ -584,6 +620,7 @@ class OesTdApi: head: SMsgHeadT, body: Any, cursor: OesQryCursorT): + """""" data: OesOrdCnfmT = cast.toOesOrdItemT(body) i = self._td.get_order(data.clSeqNo) @@ -612,6 +649,7 @@ class OesTdApi: body: Any, cursor: OesQryCursorT, ): + """""" data: OesOrdCnfmT = cast.toOesOrdItemT(body) try: i = self.get_order(data.clSeqNo) @@ -649,11 +687,12 @@ class OesTdApi: return 1 def save_order(self, order_id: int, order: OrderData): + """""" self._orders[order_id] = InternalOrder( order_id=order_id, vt_order=order, ) def get_order(self, order_id: int): + """""" return self._orders[order_id] - From 7eaa0c19f66542c83ef3d4c040212966c32aeb83 Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 8 Mar 2019 07:28:50 -0400 Subject: [PATCH 08/13] [Del] remove useless code --- vnpy/gateway/oes/oes_td.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/vnpy/gateway/oes/oes_td.py b/vnpy/gateway/oes/oes_td.py index e399cc3b..77184c32 100644 --- a/vnpy/gateway/oes/oes_td.py +++ b/vnpy/gateway/oes/oes_td.py @@ -398,11 +398,6 @@ class OesTdApi: return 1 def query_stock(self, ) -> bool: - """""" - # Thread(target=self._query_stock, ).start() - return self._query_stock() - - def _query_stock(self, ) -> bool: """""" f = OesQryStockFilterT() ret = OesApi_QueryStock(self._env.qryChannel, f, self.on_query_stock) @@ -598,12 +593,6 @@ class OesTdApi: OesApi_SendOrderCancelReq(self._env.ordChannel, oes_req) - def schedule_query_order(self, internal_order: InternalOrder) -> Thread: - """""" - th = Thread(target=self.query_order, args=(internal_order,)) - th.start() - return th - def query_order(self, internal_order: InternalOrder) -> bool: """""" f = OesQryOrdFilterT() @@ -623,7 +612,7 @@ class OesTdApi: """""" data: OesOrdCnfmT = cast.toOesOrdItemT(body) - i = self._td.get_order(data.clSeqNo) + i = self.get_order(data.clSeqNo) vt_order = i.vt_order vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.volume = data.ordQty - data.canceledQty From 9b694fce0764457fa6be4896f8313ebab001e715 Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 8 Mar 2019 07:33:36 -0400 Subject: [PATCH 09/13] =?UTF-8?q?[Mod]=20md,=20td=E5=B9=B6=E8=A1=8C?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=EF=BC=8C=E5=8A=A0=E5=BF=ABtd=E5=93=8D?= =?UTF-8?q?=E5=BA=94=E9=80=9F=E5=BA=A6=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/gateway/oes/oes_gateway.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/vnpy/gateway/oes/oes_gateway.py b/vnpy/gateway/oes/oes_gateway.py index 2cdd0f58..9901e755 100644 --- a/vnpy/gateway/oes/oes_gateway.py +++ b/vnpy/gateway/oes/oes_gateway.py @@ -38,10 +38,6 @@ class OesGateway(BaseGateway): self.td_api = OesTdApi(self) def connect(self, setting: dict): - """""" - return self._connect_async(setting) - - def _connect_sync(self, setting: dict): """""" if not setting['password'].startswith("md5:"): setting['password'] = "md5:" + hashlib.md5(setting['password'].encode()).hexdigest() @@ -67,14 +63,10 @@ class OesGateway(BaseGateway): log_path=log_path) f.write(content) - self.md_api.config_path = config_path - self.md_api.username = username - self.md_api.password = password - if self.md_api.connect(): - self.md_api.start() - else: - self.write_log(_("无法连接到行情服务器,请检查你的配置")) + Thread(target=self._connect_md_sync, args=(config_path, username, password)).start() + Thread(target=self._connect_td_sync, args=(config_path, username, password)).start() + def _connect_td_sync(self, config_path, username, password): self.td_api.config_path = config_path self.td_api.username = username self.td_api.password = password @@ -89,9 +81,14 @@ class OesGateway(BaseGateway): else: self.write_log(_("无法连接到交易服务器,请检查你的配置")) - def _connect_async(self, setting: dict): - """""" - Thread(target=self._connect_sync, args=(setting,)).start() + def _connect_md_sync(self, config_path, username, password): + self.md_api.config_path = config_path + self.md_api.username = username + self.md_api.password = password + if self.md_api.connect(): + self.md_api.start() + else: + self.write_log(_("无法连接到行情服务器,请检查你的配置")) def subscribe(self, req: SubscribeRequest): """""" From 34bce32dc0f7ffc4ea56348d9c615ee4ba43969a Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 8 Mar 2019 07:40:54 -0400 Subject: [PATCH 10/13] =?UTF-8?q?[Fix]=20=E4=BF=AE=E5=A4=8Dstop()=E4=B9=8B?= =?UTF-8?q?=E5=90=8E=E8=BF=98=E9=87=8D=E8=BF=9E=E4=B8=80=E6=AC=A1=E7=9A=84?= =?UTF-8?q?bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/gateway/oes/oes_md.py | 21 +++++++++++---------- vnpy/gateway/oes/oes_td.py | 2 +- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/vnpy/gateway/oes/oes_md.py b/vnpy/gateway/oes/oes_md.py index 1d45e56f..476799df 100644 --- a/vnpy/gateway/oes/oes_md.py +++ b/vnpy/gateway/oes/oes_md.py @@ -31,9 +31,10 @@ class OesMdMessageLoop: """""" self.gateway = gateway self.env = env - self.alive = False - self.md = md - self.th = Thread(target=self._message_loop) + + self._alive = False + self._md = md + self._th = Thread(target=self._message_loop) self.message_handlers: Dict[int, Callable[[dict], None]] = { # tick & orderbook @@ -64,21 +65,21 @@ class OesMdMessageLoop: def start(self): """""" - self.alive = True - self.th.start() + self._alive = True + self._th.start() def stop(self): """""" - self.alive = False + self._alive = False def join(self): """""" - self.th.join() + self._th.join() def reconnect(self): """""" self.gateway.write_log(_("正在尝试重新连接到行情服务器。")) - return self.md.connect() + return self._md.connect() def _get_last_tick(self, symbol): """""" @@ -117,7 +118,7 @@ class OesMdMessageLoop: timeout_ms = 1000 # is_timeout = SPlatform_IsNegEtimeout is_disconnected = SPlatform_IsNegEpipe - while self.alive: + while self._alive: ret = MdsApi_WaitOnMsg(tcp_channel, timeout_ms, self._on_message) @@ -126,7 +127,7 @@ class OesMdMessageLoop: # pass # just no message if is_disconnected(ret): self.gateway.write_log(_("与行情服务器的连接已断开。")) - while not self.reconnect() and self.alive: + while self._alive and not self.reconnect(): time.sleep(1) return diff --git a/vnpy/gateway/oes/oes_td.py b/vnpy/gateway/oes/oes_td.py index 77184c32..f0f8f9a1 100644 --- a/vnpy/gateway/oes/oes_td.py +++ b/vnpy/gateway/oes/oes_td.py @@ -186,7 +186,7 @@ class OesTdMessageLoop: # pass # just no message if is_disconnected(ret): self.gateway.write_log(_("与交易服务器的连接已断开。")) - while not self.reconnect() and self._alive: + while self._alive and not self.reconnect(): pass return From 0daffb3e5fbed8f977c2602bf9cdfaac1c13055b Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 8 Mar 2019 08:50:45 -0400 Subject: [PATCH 11/13] [Mod] OesTdApi: uniform naming. --- vnpy/gateway/oes/oes_td.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/vnpy/gateway/oes/oes_td.py b/vnpy/gateway/oes/oes_td.py index f0f8f9a1..d58c4011 100644 --- a/vnpy/gateway/oes/oes_td.py +++ b/vnpy/gateway/oes/oes_td.py @@ -567,6 +567,7 @@ class OesTdApi: order = vt_req.create_order_data(str(order_id), self.gateway.gateway_name) order.direction = Direction.NET # fix direction into NET: stock only self.save_order(order_id, order) + ret = OesApi_SendOrderReq(self._env.ordChannel, oes_req ) @@ -620,24 +621,21 @@ class OesTdApi: self.gateway.on_order(vt_order) return 1 - def init_query_orders(self) -> bool: - """ - :note: this function can be called only before calling send_order - :return: - """ + def query_orders(self) -> bool: + """""" f = OesQryOrdFilterT() ret = OesApi_QueryOrder(self._env.qryChannel, f, - self.on_init_query_orders + self.on_query_orders ) return ret >= 0 - def on_init_query_orders(self, - session_info: SGeneralClientChannelT, - head: SMsgHeadT, - body: Any, - cursor: OesQryCursorT, - ): + def on_query_orders(self, + session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any, + cursor: OesQryCursorT, + ): """""" data: OesOrdCnfmT = cast.toOesOrdItemT(body) try: From 212b864c6933edbbc99e3212595a7acfe0d314db Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 8 Mar 2019 08:56:05 -0400 Subject: [PATCH 12/13] [Add] Add lock to make public methods of gateway thread-safe --- vnpy/gateway/oes/oes_gateway.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/vnpy/gateway/oes/oes_gateway.py b/vnpy/gateway/oes/oes_gateway.py index 9901e755..7a3f97a7 100644 --- a/vnpy/gateway/oes/oes_gateway.py +++ b/vnpy/gateway/oes/oes_gateway.py @@ -4,7 +4,7 @@ import hashlib import os from gettext import gettext as _ -from threading import Thread +from threading import Thread, Lock from vnpy.trader.gateway import BaseGateway from vnpy.trader.object import (CancelRequest, OrderRequest, @@ -37,6 +37,12 @@ class OesGateway(BaseGateway): self.md_api = OesMdApi(self) self.td_api = OesTdApi(self) + self._lock_subscribe = Lock() + self._lock_send_order = Lock() + self._lock_cancel_order = Lock() + self._lock_query_position = Lock() + self._lock_query_account = Lock() + def connect(self, setting: dict): """""" if not setting['password'].startswith("md5:"): @@ -76,7 +82,7 @@ class OesGateway(BaseGateway): self.td_api.query_contracts() self.write_log("合约信息查询成功") self.td_api.query_position() - self.td_api.init_query_orders() + self.td_api.query_orders() self.td_api.start() else: self.write_log(_("无法连接到交易服务器,请检查你的配置")) @@ -92,23 +98,28 @@ class OesGateway(BaseGateway): def subscribe(self, req: SubscribeRequest): """""" - self.md_api.subscribe(req) + with self._lock_subscribe: + self.md_api.subscribe(req) def send_order(self, req: OrderRequest): """""" - return self.td_api.send_order(req) + with self._lock_send_order: + return self.td_api.send_order(req) def cancel_order(self, req: CancelRequest): """""" - self.td_api.cancel_order(req) + with self._lock_cancel_order: + self.td_api.cancel_order(req) def query_account(self): """""" - self.td_api.query_account() + with self._lock_query_account: + self.td_api.query_account() def query_position(self): """""" - self.td_api.query_position() + with self._lock_query_position: + self.td_api.query_position() def close(self): """""" From d1306dd2375fb947e5607afd43bc22b3bc80439a Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 8 Mar 2019 08:56:47 -0400 Subject: [PATCH 13/13] =?UTF-8?q?[Fix]=20=E4=BF=AE=E6=AD=A3=E6=88=90?= =?UTF-8?q?=E4=BA=A4=E5=8F=91=E7=94=9F=E6=97=B6=E8=AE=A2=E5=8D=95=E6=88=90?= =?UTF-8?q?=E4=BA=A4=E9=87=8F=E9=94=99=E8=AF=AF=E7=9A=84BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/gateway/oes/oes_td.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vnpy/gateway/oes/oes_td.py b/vnpy/gateway/oes/oes_td.py index d58c4011..ea6ae7e7 100644 --- a/vnpy/gateway/oes/oes_td.py +++ b/vnpy/gateway/oes/oes_td.py @@ -264,7 +264,7 @@ class OesTdMessageLoop: time=parse_oes_datetime(data.trdDate, data.trdTime) ) vt_order.status = STATUS_OES2VT[data.ordStatus] - vt_order.traded = data.origOrdQty + data.trdQty + vt_order.traded = data.cumQty vt_order.time = parse_oes_datetime(data.trdDate, data.trdTime) self.gateway.on_trade(trade) self.gateway.on_order(vt_order)