From 5f5bf06e5321beeb71d0852c21163ba8a14d9ee0 Mon Sep 17 00:00:00 2001 From: nanoric Date: Thu, 14 Mar 2019 23:03:14 -0400 Subject: [PATCH] [Add] OesGateway: User can supply a hdd serial number. [Fix] OesGateway: Optimized reconnection of td api: reconnect if connection of ord_channel is lost. --- vnpy/gateway/oes/oes_gateway.py | 2 + vnpy/gateway/oes/oes_td.py | 152 ++++++++++++++++++++------------ 2 files changed, 96 insertions(+), 58 deletions(-) diff --git a/vnpy/gateway/oes/oes_gateway.py b/vnpy/gateway/oes/oes_gateway.py index 2a64cf55..e28a32fa 100644 --- a/vnpy/gateway/oes/oes_gateway.py +++ b/vnpy/gateway/oes/oes_gateway.py @@ -28,6 +28,7 @@ class OesGateway(BaseGateway): "md_qry_server": "", "username": "", "password": "", + "hdd_serial": "", } def __init__(self, event_engine): @@ -76,6 +77,7 @@ class OesGateway(BaseGateway): self.td_api.ord_server = setting['td_ord_server'] self.td_api.rpt_server = setting['td_rpt_server'] self.td_api.qry_server = setting['td_qry_server'] + self.td_api.hdd_serial = setting['hdd_serial'] Thread(target=self._connect_td_sync, args=(config_path, username, password)).start() def _connect_td_sync(self, config_path, username, password): diff --git a/vnpy/gateway/oes/oes_td.py b/vnpy/gateway/oes/oes_td.py index 70ca38a9..0d7f40ec 100644 --- a/vnpy/gateway/oes/oes_td.py +++ b/vnpy/gateway/oes/oes_td.py @@ -7,16 +7,15 @@ from typing import Any, Callable, Dict from vnpy.api.oes.vnoes import OesApiClientEnvT, OesApiSubscribeInfoT, OesApi_DestoryAll, \ OesApi_InitLogger, OesApi_InitOrdChannel2, OesApi_InitQryChannel2, OesApi_InitRptChannel2, \ - OesApi_IsValidOrdChannel, OesApi_IsValidQryChannel, OesApi_LogoutAll, OesApi_QueryCashAsset, \ + OesApi_IsValidOrdChannel, OesApi_LogoutAll, OesApi_QueryCashAsset, \ OesApi_QueryOptHolding, OesApi_QueryOption, OesApi_QueryOrder, OesApi_QueryStkHolding, \ - OesApi_QueryStock, OesApi_SendOrderCancelReq, OesApi_SendOrderReq, OesApi_SetThreadPassword, \ - OesApi_SetThreadUsername, OesApi_WaitReportMsg, OesOrdCancelReqT, OesOrdCnfmT, OesOrdRejectT, \ - OesOrdReqT, OesQryCashAssetFilterT, OesQryCursorT, OesQryOptionFilterT, OesQryOrdFilterT, \ - OesQryStkHoldingFilterT, OesQryStockFilterT, OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, \ - SGeneralClientChannelT, SMSG_PROTO_BINARY, SMsgHeadT, SPlatform_IsNegEpipe, cast, \ - eOesBuySellTypeT, eOesMarketIdT, eOesMsgTypeT, eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT, \ - eOesSubscribeReportTypeT, OesApi_SetCustomizedDriverId, OesApi_GetCustomizedDriverId, \ - OesApi_InitAllByConvention + OesApi_QueryStock, OesApi_SendOrderCancelReq, OesApi_SendOrderReq, OesApi_SetCustomizedDriverId, \ + OesApi_SetThreadPassword, OesApi_SetThreadUsername, OesApi_WaitReportMsg, OesOrdCancelReqT, \ + OesOrdCnfmT, OesOrdRejectT, OesOrdReqT, OesQryCashAssetFilterT, OesQryCursorT, \ + OesQryOptionFilterT, OesQryOrdFilterT, OesQryStkHoldingFilterT, OesQryStockFilterT, \ + OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, SGeneralClientChannelT, SMSG_PROTO_BINARY, \ + SMsgHeadT, SPlatform_IsNegEpipe, cast, eOesBuySellTypeT, eOesMarketIdT, eOesMsgTypeT, \ + eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT, eOesSubscribeReportTypeT from vnpy.gateway.oes.error_code import error_to_str from vnpy.gateway.oes.utils import create_remote_config @@ -65,7 +64,6 @@ BUY_SELL_TYPE_VT2OES = { (Exchange.SHFE, Offset.CLOSE, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_BUY_CLOSE, (Exchange.SHFE, Offset.CLOSE, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_SELL_CLOSE, (Exchange.SHFE, Offset.CLOSE, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_BUY_CLOSE, - # todo: eOesBuySellTypeT.OES_BS_TYPE_OPTION_EXERCISE == 行权 } STATUS_OES2VT = { @@ -202,8 +200,8 @@ class OesTdMessageLoop: if not data.origClSeqNo: try: i = self._td.get_order(data.clSeqNo) - except KeyError: # todo: check why KeyError at startup - # maybe I should find a way to disable subscription of orders at startup + except KeyError: + # todo: maybe I should find a way to disable subscription of orders at startup return vt_order = i.vt_order @@ -290,7 +288,7 @@ class OesTdMessageLoop: exchange=EXCHANGE_OES2VT[data.mktId], direction=Direction.NET, volume=data.sumHld, - frozen=data.lockHld, # todo: to verify + frozen=data.lockHld, price=data.costPrice / 10000, # pnl=data.costPrice - data.originalCostAmt, pnl=0, @@ -321,11 +319,13 @@ class OesTdApi: def __init__(self, gateway: BaseGateway): """""" self.config_path: str = '' - self.username: str = '' self.ord_server: str = '' self.qry_server: str = '' self.rpt_server: str = '' + self.username: str = '' self.password: str = '' + self.hdd_serial: str = '' + self.gateway = gateway self._env = OesApiClientEnvT() @@ -336,6 +336,7 @@ class OesTdApi: self._last_seq_lock = Lock() self._last_seq_index = 1000000 # 0 has special manning for oes + self._reconnect_lock = Lock() self._orders: Dict[int, InternalOrder] = {} @@ -344,50 +345,17 @@ class OesTdApi: :note set config_path before calling this function """ OesApi_InitLogger(self.config_path, 'log') - OesApi_SetThreadUsername(self.username) - OesApi_SetThreadPassword(self.password) - hdd_id = "" # todo: get hdd_id if necessary - OesApi_SetCustomizedDriverId(hdd_id) + OesApi_SetCustomizedDriverId(self.hdd_serial) - OesApi_InitAllByConvention(self._env, self.config_path, -1, 0) - - if (not OesApi_InitOrdChannel2(self._env.ordChannel, - create_remote_config(self.ord_server, - self.username, - self.password), - 0) - or not OesApi_IsValidOrdChannel(self._env.ordChannel)): + if not self._connect_ord_channel(): self.gateway.write_log(_("无法初始化交易下单通道(td_ord_server)")) - return False - self._last_seq_index = max(self._last_seq_index, self._env.ordChannel.lastOutMsgSeq + 1) - if (not OesApi_InitQryChannel2(self._env.qryChannel, - create_remote_config(self.qry_server, - self.username, - self.password)) - or not OesApi_IsValidQryChannel(self._env.qryChannel)): - self.gateway.write_log(_("无法初始化交易查询通道(td_qry_server)")) - return False - subscribe_info = OesApiSubscribeInfoT() - subscribe_info.clEnvId = 0 - subscribe_info.rptTypes = (eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_BUSINESS_REJECT - | eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_ORDER_INSERT - | eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_ORDER_REPORT - | eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_TRADE_REPORT - | eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_FUND_TRSF_REPORT - | eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_CASH_ASSET_VARIATION - | eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_HOLDING_VARIATION - ) - if (not OesApi_InitRptChannel2(self._env.rptChannel, - create_remote_config(self.rpt_server, - self.username, - self.password), - subscribe_info, - 0) - or not OesApi_IsValidQryChannel(self._env.qryChannel)): + if not self._connect_qry_channel(): + self.gateway.write_log(_("无法初始化交易查询通道(td_qry_server)")) + + if not self._connect_rpt_channel(): self.gateway.write_log(_("无法初始化交易查询通道(td_qry_server)")) - return False return True def start(self): @@ -411,6 +379,63 @@ class OesTdApi: self._last_seq_index += 1 return index + def _connect_qry_channel(self): + OesApi_SetThreadUsername(self.username) + OesApi_SetThreadPassword(self.password) + + return OesApi_InitQryChannel2(self._env.qryChannel, + create_remote_config(self.qry_server, + self.username, + self.password)) + + def _connect_ord_channel(self): + OesApi_SetThreadUsername(self.username) + OesApi_SetThreadPassword(self.password) + + if not OesApi_InitOrdChannel2(self._env.ordChannel, + create_remote_config(self.ord_server, + self.username, + self.password), + 0): + return False + self._last_seq_index = max(self._last_seq_index, self._env.ordChannel.lastOutMsgSeq + 1) + return True + + def _connect_rpt_channel(self): + OesApi_SetThreadUsername(self.username) + OesApi_SetThreadPassword(self.password) + + subscribe_info = OesApiSubscribeInfoT() + subscribe_info.clEnvId = 0 + subscribe_info.rptTypes = (eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_BUSINESS_REJECT + | eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_ORDER_INSERT + | eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_ORDER_REPORT + | eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_TRADE_REPORT + | eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_FUND_TRSF_REPORT + | eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_CASH_ASSET_VARIATION + | eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_HOLDING_VARIATION + ) + return OesApi_InitRptChannel2(self._env.rptChannel, + create_remote_config(self.rpt_server, + self.username, + self.password), + subscribe_info, + 0) + + def _reconnect_ord_channel(self): + with self._reconnect_lock: # prevent spawning multiple reconnect thread + if OesApi_IsValidOrdChannel(self._env.ordChannel): + return + + self.gateway.write_log(_("正在重新连接到交易下单通道")) + while not OesApi_IsValidOrdChannel(self._env.ordChannel): + self._connect_ord_channel() + self.gateway.write_log(_("成功重新连接到交易下单通道")) + + def _schedule_reconnect_ord_channel(self): + with self._reconnect_lock: + Thread(target=self._reconnect_ord_channel, ).start() + def query_account(self): """""" OesApi_QueryCashAsset(self._env.qryChannel, @@ -521,7 +546,7 @@ class OesTdApi: frozen=data.lockHld, price=data.costPrice / 10000, # pnl=data.costPrice - data.originalCostAmt, - pnl=0, # todo: oes只提供日初持仓价格信息,不提供最初持仓价格信息,所以pnl只有当日的 + pnl=0, yd_volume=data.originalHld, ) self.gateway.on_position(position) @@ -614,9 +639,14 @@ class OesTdApi: ) if ret >= 0: - self.gateway.on_order(order) + order.status = Status.SUBMITTING else: - self.gateway.write_log("Failed to send_order!") + order.status = Status.REJECTED + self.gateway.write_log(_("下单失败")) # todo: can I stringify error? + if ret == -108: # is here any other ret code indicating connection lost? + self.gateway.write_log(_("下单时连接发现连接已断开,正在尝试重连")) + self._schedule_reconnect_ord_channel() + self.gateway.on_order(order) return order.vt_orderid @@ -632,8 +662,14 @@ class OesTdApi: oes_req.origClSeqNo = order_id oes_req.invAcctId = "" oes_req.securityId = vt_req.symbol - OesApi_SendOrderCancelReq(self._env.ordChannel, - oes_req) + + ret = OesApi_SendOrderCancelReq(self._env.ordChannel, + oes_req) + if ret < 0: + self.gateway.write_log(_("撤单失败")) # todo: can I stringify error? + if ret == -108: # is here any other ret code indicating connection lost? + self.gateway.write_log(_("撤单时连接发现连接已断开,正在尝试重连")) + self._schedule_reconnect_ord_channel() def query_order(self, internal_order: InternalOrder) -> bool: """"""