[Add] OesGateway: User can supply a hdd serial number.

[Fix] OesGateway: Optimized reconnection of td api: reconnect if connection of ord_channel is lost.
This commit is contained in:
nanoric 2019-03-14 23:03:14 -04:00
parent 9b4f360bef
commit 5f5bf06e53
2 changed files with 96 additions and 58 deletions

View File

@ -28,6 +28,7 @@ class OesGateway(BaseGateway):
"md_qry_server": "",
"username": "",
"password": "",
"hdd_serial": "",
}
def __init__(self, event_engine):
@ -76,6 +77,7 @@ class OesGateway(BaseGateway):
self.td_api.ord_server = setting['td_ord_server']
self.td_api.rpt_server = setting['td_rpt_server']
self.td_api.qry_server = setting['td_qry_server']
self.td_api.hdd_serial = setting['hdd_serial']
Thread(target=self._connect_td_sync, args=(config_path, username, password)).start()
def _connect_td_sync(self, config_path, username, password):

View File

@ -7,16 +7,15 @@ from typing import Any, Callable, Dict
from vnpy.api.oes.vnoes import OesApiClientEnvT, OesApiSubscribeInfoT, OesApi_DestoryAll, \
OesApi_InitLogger, OesApi_InitOrdChannel2, OesApi_InitQryChannel2, OesApi_InitRptChannel2, \
OesApi_IsValidOrdChannel, OesApi_IsValidQryChannel, OesApi_LogoutAll, OesApi_QueryCashAsset, \
OesApi_IsValidOrdChannel, OesApi_LogoutAll, OesApi_QueryCashAsset, \
OesApi_QueryOptHolding, OesApi_QueryOption, OesApi_QueryOrder, OesApi_QueryStkHolding, \
OesApi_QueryStock, OesApi_SendOrderCancelReq, OesApi_SendOrderReq, OesApi_SetThreadPassword, \
OesApi_SetThreadUsername, OesApi_WaitReportMsg, OesOrdCancelReqT, OesOrdCnfmT, OesOrdRejectT, \
OesOrdReqT, OesQryCashAssetFilterT, OesQryCursorT, OesQryOptionFilterT, OesQryOrdFilterT, \
OesQryStkHoldingFilterT, OesQryStockFilterT, OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, \
SGeneralClientChannelT, SMSG_PROTO_BINARY, SMsgHeadT, SPlatform_IsNegEpipe, cast, \
eOesBuySellTypeT, eOesMarketIdT, eOesMsgTypeT, eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT, \
eOesSubscribeReportTypeT, OesApi_SetCustomizedDriverId, OesApi_GetCustomizedDriverId, \
OesApi_InitAllByConvention
OesApi_QueryStock, OesApi_SendOrderCancelReq, OesApi_SendOrderReq, OesApi_SetCustomizedDriverId, \
OesApi_SetThreadPassword, OesApi_SetThreadUsername, OesApi_WaitReportMsg, OesOrdCancelReqT, \
OesOrdCnfmT, OesOrdRejectT, OesOrdReqT, OesQryCashAssetFilterT, OesQryCursorT, \
OesQryOptionFilterT, OesQryOrdFilterT, OesQryStkHoldingFilterT, OesQryStockFilterT, \
OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, SGeneralClientChannelT, SMSG_PROTO_BINARY, \
SMsgHeadT, SPlatform_IsNegEpipe, cast, eOesBuySellTypeT, eOesMarketIdT, eOesMsgTypeT, \
eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT, eOesSubscribeReportTypeT
from vnpy.gateway.oes.error_code import error_to_str
from vnpy.gateway.oes.utils import create_remote_config
@ -65,7 +64,6 @@ BUY_SELL_TYPE_VT2OES = {
(Exchange.SHFE, Offset.CLOSE, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_BUY_CLOSE,
(Exchange.SHFE, Offset.CLOSE, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_SELL_CLOSE,
(Exchange.SHFE, Offset.CLOSE, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_BUY_CLOSE,
# todo: eOesBuySellTypeT.OES_BS_TYPE_OPTION_EXERCISE == 行权
}
STATUS_OES2VT = {
@ -202,8 +200,8 @@ class OesTdMessageLoop:
if not data.origClSeqNo:
try:
i = self._td.get_order(data.clSeqNo)
except KeyError: # todo: check why KeyError at startup
# maybe I should find a way to disable subscription of orders at startup
except KeyError:
# todo: maybe I should find a way to disable subscription of orders at startup
return
vt_order = i.vt_order
@ -290,7 +288,7 @@ class OesTdMessageLoop:
exchange=EXCHANGE_OES2VT[data.mktId],
direction=Direction.NET,
volume=data.sumHld,
frozen=data.lockHld, # todo: to verify
frozen=data.lockHld,
price=data.costPrice / 10000,
# pnl=data.costPrice - data.originalCostAmt,
pnl=0,
@ -321,11 +319,13 @@ class OesTdApi:
def __init__(self, gateway: BaseGateway):
""""""
self.config_path: str = ''
self.username: str = ''
self.ord_server: str = ''
self.qry_server: str = ''
self.rpt_server: str = ''
self.username: str = ''
self.password: str = ''
self.hdd_serial: str = ''
self.gateway = gateway
self._env = OesApiClientEnvT()
@ -336,6 +336,7 @@ class OesTdApi:
self._last_seq_lock = Lock()
self._last_seq_index = 1000000 # 0 has special manning for oes
self._reconnect_lock = Lock()
self._orders: Dict[int, InternalOrder] = {}
@ -344,50 +345,17 @@ class OesTdApi:
:note set config_path before calling this function
"""
OesApi_InitLogger(self.config_path, 'log')
OesApi_SetThreadUsername(self.username)
OesApi_SetThreadPassword(self.password)
hdd_id = "" # todo: get hdd_id if necessary
OesApi_SetCustomizedDriverId(hdd_id)
OesApi_SetCustomizedDriverId(self.hdd_serial)
OesApi_InitAllByConvention(self._env, self.config_path, -1, 0)
if (not OesApi_InitOrdChannel2(self._env.ordChannel,
create_remote_config(self.ord_server,
self.username,
self.password),
0)
or not OesApi_IsValidOrdChannel(self._env.ordChannel)):
if not self._connect_ord_channel():
self.gateway.write_log(_("无法初始化交易下单通道(td_ord_server)"))
return False
self._last_seq_index = max(self._last_seq_index, self._env.ordChannel.lastOutMsgSeq + 1)
if (not OesApi_InitQryChannel2(self._env.qryChannel,
create_remote_config(self.qry_server,
self.username,
self.password))
or not OesApi_IsValidQryChannel(self._env.qryChannel)):
self.gateway.write_log(_("无法初始化交易查询通道(td_qry_server)"))
return False
subscribe_info = OesApiSubscribeInfoT()
subscribe_info.clEnvId = 0
subscribe_info.rptTypes = (eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_BUSINESS_REJECT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_ORDER_INSERT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_ORDER_REPORT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_TRADE_REPORT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_FUND_TRSF_REPORT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_CASH_ASSET_VARIATION
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_HOLDING_VARIATION
)
if (not OesApi_InitRptChannel2(self._env.rptChannel,
create_remote_config(self.rpt_server,
self.username,
self.password),
subscribe_info,
0)
or not OesApi_IsValidQryChannel(self._env.qryChannel)):
if not self._connect_qry_channel():
self.gateway.write_log(_("无法初始化交易查询通道(td_qry_server)"))
if not self._connect_rpt_channel():
self.gateway.write_log(_("无法初始化交易查询通道(td_qry_server)"))
return False
return True
def start(self):
@ -411,6 +379,63 @@ class OesTdApi:
self._last_seq_index += 1
return index
def _connect_qry_channel(self):
OesApi_SetThreadUsername(self.username)
OesApi_SetThreadPassword(self.password)
return OesApi_InitQryChannel2(self._env.qryChannel,
create_remote_config(self.qry_server,
self.username,
self.password))
def _connect_ord_channel(self):
OesApi_SetThreadUsername(self.username)
OesApi_SetThreadPassword(self.password)
if not OesApi_InitOrdChannel2(self._env.ordChannel,
create_remote_config(self.ord_server,
self.username,
self.password),
0):
return False
self._last_seq_index = max(self._last_seq_index, self._env.ordChannel.lastOutMsgSeq + 1)
return True
def _connect_rpt_channel(self):
OesApi_SetThreadUsername(self.username)
OesApi_SetThreadPassword(self.password)
subscribe_info = OesApiSubscribeInfoT()
subscribe_info.clEnvId = 0
subscribe_info.rptTypes = (eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_BUSINESS_REJECT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_ORDER_INSERT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_ORDER_REPORT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_TRADE_REPORT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_FUND_TRSF_REPORT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_CASH_ASSET_VARIATION
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_HOLDING_VARIATION
)
return OesApi_InitRptChannel2(self._env.rptChannel,
create_remote_config(self.rpt_server,
self.username,
self.password),
subscribe_info,
0)
def _reconnect_ord_channel(self):
with self._reconnect_lock: # prevent spawning multiple reconnect thread
if OesApi_IsValidOrdChannel(self._env.ordChannel):
return
self.gateway.write_log(_("正在重新连接到交易下单通道"))
while not OesApi_IsValidOrdChannel(self._env.ordChannel):
self._connect_ord_channel()
self.gateway.write_log(_("成功重新连接到交易下单通道"))
def _schedule_reconnect_ord_channel(self):
with self._reconnect_lock:
Thread(target=self._reconnect_ord_channel, ).start()
def query_account(self):
""""""
OesApi_QueryCashAsset(self._env.qryChannel,
@ -521,7 +546,7 @@ class OesTdApi:
frozen=data.lockHld,
price=data.costPrice / 10000,
# pnl=data.costPrice - data.originalCostAmt,
pnl=0, # todo: oes只提供日初持仓价格信息不提供最初持仓价格信息所以pnl只有当日的
pnl=0,
yd_volume=data.originalHld,
)
self.gateway.on_position(position)
@ -614,9 +639,14 @@ class OesTdApi:
)
if ret >= 0:
self.gateway.on_order(order)
order.status = Status.SUBMITTING
else:
self.gateway.write_log("Failed to send_order!")
order.status = Status.REJECTED
self.gateway.write_log(_("下单失败")) # todo: can I stringify error?
if ret == -108: # is here any other ret code indicating connection lost?
self.gateway.write_log(_("下单时连接发现连接已断开,正在尝试重连"))
self._schedule_reconnect_ord_channel()
self.gateway.on_order(order)
return order.vt_orderid
@ -632,8 +662,14 @@ class OesTdApi:
oes_req.origClSeqNo = order_id
oes_req.invAcctId = ""
oes_req.securityId = vt_req.symbol
OesApi_SendOrderCancelReq(self._env.ordChannel,
oes_req)
ret = OesApi_SendOrderCancelReq(self._env.ordChannel,
oes_req)
if ret < 0:
self.gateway.write_log(_("撤单失败")) # todo: can I stringify error?
if ret == -108: # is here any other ret code indicating connection lost?
self.gateway.write_log(_("撤单时连接发现连接已断开,正在尝试重连"))
self._schedule_reconnect_ord_channel()
def query_order(self, internal_order: InternalOrder) -> bool:
""""""