[Fix] OesGateway:修复一个BUG:多个打开多个vnpy实例时重连会导致重连的服务器混乱的问题

[Fix] OesGateway:修复成交委托号为0的BUG

Windows version fix only. Linux not supported yet.
This commit is contained in:
nanoric 2019-03-14 07:37:23 -04:00
parent cc129b3e2c
commit 9b4f360bef
6 changed files with 570 additions and 272 deletions

Binary file not shown.

File diff suppressed because it is too large Load Diff

View File

@ -69,7 +69,13 @@ class OesGateway(BaseGateway):
log_path=log_path) log_path=log_path)
f.write(content) f.write(content)
self.md_api.tcp_server = setting['md_tcp_server']
self.md_api.qry_server = setting['md_qry_server']
Thread(target=self._connect_md_sync, args=(config_path, username, password)).start() Thread(target=self._connect_md_sync, args=(config_path, username, password)).start()
self.td_api.ord_server = setting['td_ord_server']
self.td_api.rpt_server = setting['td_rpt_server']
self.td_api.qry_server = setting['td_qry_server']
Thread(target=self._connect_td_sync, args=(config_path, username, password)).start() Thread(target=self._connect_td_sync, args=(config_path, username, password)).start()
def _connect_td_sync(self, config_path, username, password): def _connect_td_sync(self, config_path, username, password):

View File

@ -5,14 +5,15 @@ from threading import Thread
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
from typing import Any, Callable, Dict from typing import Any, Callable, Dict
from vnpy.api.oes.vnoes import MdsApiClientEnvT, MdsApi_DestoryAll, MdsApi_InitAllByConvention, \ from vnpy.api.oes.vnoes import MdsApiClientEnvT, MdsApi_DestoryAll, MdsApi_InitLogger, \
MdsApi_IsValidQryChannel, MdsApi_IsValidTcpChannel, MdsApi_LogoutAll, MdsApi_SetThreadPassword, \ MdsApi_InitTcpChannel2, MdsApi_IsValidTcpChannel, MdsApi_LogoutAll, MdsApi_SetThreadPassword, \
MdsApi_SetThreadUsername, MdsApi_SubscribeMarketData, MdsApi_WaitOnMsg, MdsL2StockSnapshotBodyT, \ MdsApi_SetThreadUsername, MdsApi_SubscribeMarketData, MdsApi_WaitOnMsg, MdsL2StockSnapshotBodyT, \
MdsMktDataRequestEntryT, MdsMktDataRequestReqT, MdsMktRspMsgBodyT, MdsStockSnapshotBodyT, \ MdsMktDataRequestEntryT, MdsMktDataRequestReqBufT, MdsMktDataRequestReqT, MdsMktRspMsgBodyT, \
SGeneralClientChannelT, SMsgHeadT, SPlatform_IsNegEpipe, cast, \ MdsStockSnapshotBodyT, SGeneralClientChannelT, SMsgHeadT, SPlatform_IsNegEpipe, cast, \
eMdsExchangeIdT, eMdsMktSubscribeFlagT, eMdsMsgTypeT, eMdsSecurityTypeT, eMdsSubscribeDataTypeT, \ eMdsExchangeIdT, eMdsMktSubscribeFlagT, eMdsMsgTypeT, eMdsSecurityTypeT, eMdsSubscribeDataTypeT, \
eMdsSubscribeModeT, eMdsSubscribedTickExpireTypeT, eSMsgProtocolTypeT eMdsSubscribeModeT, eMdsSubscribedTickExpireTypeT, eMdsSubscribedTickTypeT, eSMsgProtocolTypeT
from vnpy.gateway.oes.utils import create_remote_config
from vnpy.trader.constant import Exchange from vnpy.trader.constant import Exchange
from vnpy.trader.gateway import BaseGateway from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import SubscribeRequest, TickData from vnpy.trader.object import SubscribeRequest, TickData
@ -36,7 +37,7 @@ class OesMdMessageLoop:
self._md = md self._md = md
self._th = Thread(target=self._message_loop) self._th = Thread(target=self._message_loop)
self.message_handlers: Dict[int, Callable[[dict], None]] = { self.message_handlers: Dict[eMdsMsgTypeT, Callable[[MdsMktRspMsgBodyT], int]] = {
# tick & orderbook # tick & orderbook
eMdsMsgTypeT.MDS_MSGTYPE_MARKET_DATA_SNAPSHOT_FULL_REFRESH: self.on_market_full_refresh, eMdsMsgTypeT.MDS_MSGTYPE_MARKET_DATA_SNAPSHOT_FULL_REFRESH: self.on_market_full_refresh,
eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_DATA_SNAPSHOT: self.on_l2_market_data_snapshot, eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_DATA_SNAPSHOT: self.on_l2_market_data_snapshot,
@ -65,6 +66,7 @@ class OesMdMessageLoop:
def start(self): def start(self):
"""""" """"""
if not self._alive: # not thread-safe
self._alive = True self._alive = True
self._th.start() self._th.start()
@ -212,6 +214,9 @@ class OesMdApi:
"""""" """"""
self.gateway = gateway self.gateway = gateway
self.config_path: str = '' self.config_path: str = ''
self.tcp_server: str = ''
self.qry_server: str = ''
self.username: str = '' self.username: str = ''
self.password: str = '' self.password: str = ''
@ -223,16 +228,32 @@ class OesMdApi:
"""Connect to trading server. """Connect to trading server.
:note set config_path before calling this function :note set config_path before calling this function
""" """
MdsApi_InitLogger(self.config_path, "log")
MdsApi_SetThreadUsername(self.username) MdsApi_SetThreadUsername(self.username)
MdsApi_SetThreadPassword(self.password) MdsApi_SetThreadPassword(self.password)
config_path = self.config_path info = MdsMktDataRequestReqBufT()
if not MdsApi_InitAllByConvention(self._env, config_path): info.mktDataRequestReq.subMode = eMdsSubscribeModeT.MDS_SUB_MODE_SET
info.mktDataRequestReq.tickType = eMdsSubscribedTickTypeT.MDS_TICK_TYPE_LATEST_SIMPLIFIED
info.mktDataRequestReq.isRequireInitialMktData = True
info.mktDataRequestReq.tickExpireType = eMdsSubscribedTickExpireTypeT.MDS_TICK_EXPIRE_TYPE_TIMELY
info.mktDataRequestReq.dataTypes = (eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L1_SNAPSHOT
| eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L2_SNAPSHOT
| eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L2_BEST_ORDERS
| eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L2_TRADE
| eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L2_ORDER
)
info.mktDataRequestReq.beginTime = 0
info.mktDataRequestReq.subSecurityCnt = 0
if not MdsApi_InitTcpChannel2(self._env.tcpChannel,
create_remote_config(server=self.tcp_server,
username=self.username,
password=self.password),
info):
return False return False
if not MdsApi_IsValidTcpChannel(self._env.tcpChannel): if not MdsApi_IsValidTcpChannel(self._env.tcpChannel):
return False return False
if not MdsApi_IsValidQryChannel(self._env.qryChannel):
return False
return True return True
def start(self): def start(self):

View File

@ -5,18 +5,21 @@ from threading import Lock, Thread
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
from typing import Any, Callable, Dict from typing import Any, Callable, Dict
from vnpy.api.oes.vnoes import OesApiClientEnvT, OesApi_DestoryAll, OesApi_InitAllByConvention, \ from vnpy.api.oes.vnoes import OesApiClientEnvT, OesApiSubscribeInfoT, OesApi_DestoryAll, \
OesApi_IsValidOrdChannel, OesApi_IsValidQryChannel, OesApi_IsValidRptChannel, OesApi_LogoutAll, \ OesApi_InitLogger, OesApi_InitOrdChannel2, OesApi_InitQryChannel2, OesApi_InitRptChannel2, \
OesApi_QueryCashAsset, OesApi_QueryOptHolding, OesApi_QueryOption, OesApi_QueryOrder, \ OesApi_IsValidOrdChannel, OesApi_IsValidQryChannel, OesApi_LogoutAll, OesApi_QueryCashAsset, \
OesApi_QueryStkHolding, OesApi_QueryStock, OesApi_SendOrderCancelReq, OesApi_SendOrderReq, \ OesApi_QueryOptHolding, OesApi_QueryOption, OesApi_QueryOrder, OesApi_QueryStkHolding, \
OesApi_SetThreadPassword, OesApi_SetThreadUsername, OesApi_WaitReportMsg, OesOrdCancelReqT, \ OesApi_QueryStock, OesApi_SendOrderCancelReq, OesApi_SendOrderReq, OesApi_SetThreadPassword, \
OesOrdCnfmT, OesOrdRejectT, OesOrdReqT, OesQryCashAssetFilterT, OesQryCursorT, \ OesApi_SetThreadUsername, OesApi_WaitReportMsg, OesOrdCancelReqT, OesOrdCnfmT, OesOrdRejectT, \
OesQryOptionFilterT, OesQryOrdFilterT, OesQryStkHoldingFilterT, OesQryStockFilterT, \ OesOrdReqT, OesQryCashAssetFilterT, OesQryCursorT, OesQryOptionFilterT, OesQryOrdFilterT, \
OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, SGeneralClientChannelT, SMSG_PROTO_BINARY, \ OesQryStkHoldingFilterT, OesQryStockFilterT, OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, \
SMsgHeadT, SPlatform_IsNegEpipe, cast, eOesBuySellTypeT, eOesMarketIdT, \ SGeneralClientChannelT, SMSG_PROTO_BINARY, SMsgHeadT, SPlatform_IsNegEpipe, cast, \
eOesMsgTypeT, eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT eOesBuySellTypeT, eOesMarketIdT, eOesMsgTypeT, eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT, \
eOesSubscribeReportTypeT, OesApi_SetCustomizedDriverId, OesApi_GetCustomizedDriverId, \
OesApi_InitAllByConvention
from vnpy.gateway.oes.error_code import error_to_str from vnpy.gateway.oes.error_code import error_to_str
from vnpy.gateway.oes.utils import create_remote_config
from vnpy.trader.constant import Direction, Exchange, Offset, PriceType, Product, Status from vnpy.trader.constant import Direction, Exchange, Offset, PriceType, Product, Status
from vnpy.trader.gateway import BaseGateway from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import AccountData, CancelRequest, ContractData, OrderData, OrderRequest, \ from vnpy.trader.object import AccountData, CancelRequest, ContractData, OrderData, OrderRequest, \
@ -125,7 +128,7 @@ class OesTdMessageLoop:
self._alive = False self._alive = False
self._th = Thread(target=self._message_loop) self._th = Thread(target=self._message_loop)
self.message_handlers: Dict[int, Callable[[dict], None]] = { self.message_handlers: Dict[eOesMsgTypeT, Callable[[OesRspMsgBodyT], int]] = {
eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_order_rejected, eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_order_rejected,
eOesMsgTypeT.OESMSG_RPT_ORDER_INSERT: self.on_order_inserted, eOesMsgTypeT.OESMSG_RPT_ORDER_INSERT: self.on_order_inserted,
eOesMsgTypeT.OESMSG_RPT_ORDER_REPORT: self.on_order_report, eOesMsgTypeT.OESMSG_RPT_ORDER_REPORT: self.on_order_report,
@ -140,6 +143,7 @@ class OesTdMessageLoop:
def start(self): def start(self):
"""""" """"""
if not self._alive: # not thread-safe
self._alive = True self._alive = True
self._th.start() self._th.start()
@ -196,7 +200,11 @@ class OesTdMessageLoop:
error_string = error_to_str(error_code) error_string = error_to_str(error_code)
data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp
if not data.origClSeqNo: if not data.origClSeqNo:
try:
i = self._td.get_order(data.clSeqNo) i = self._td.get_order(data.clSeqNo)
except KeyError: # todo: check why KeyError at startup
# maybe I should find a way to disable subscription of orders at startup
return
vt_order = i.vt_order vt_order = i.vt_order
if vt_order == Status.ALLTRADED: if vt_order == Status.ALLTRADED:
@ -255,13 +263,13 @@ class OesTdMessageLoop:
gateway_name=self.gateway.gateway_name, gateway_name=self.gateway.gateway_name,
symbol=data.securityId, symbol=data.securityId,
exchange=EXCHANGE_OES2VT[data.mktId], exchange=EXCHANGE_OES2VT[data.mktId],
orderid=data.userInfo, orderid=str(data.clSeqNo),
tradeid=data.exchTrdNum, tradeid=str(data.exchTrdNum),
direction=vt_order.direction, direction=vt_order.direction,
offset=vt_order.offset, offset=vt_order.offset,
price=data.trdPrice / 10000, price=data.trdPrice / 10000,
volume=data.trdQty, volume=data.trdQty,
time=parse_oes_datetime(data.trdDate, data.trdTime) time=parse_oes_datetime(data.trdDate, data.trdTime).isoformat()
) )
vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.traded = data.cumQty vt_order.traded = data.cumQty
@ -312,8 +320,11 @@ class OesTdApi:
def __init__(self, gateway: BaseGateway): def __init__(self, gateway: BaseGateway):
"""""" """"""
self.config_path: str = None self.config_path: str = ''
self.username: str = '' self.username: str = ''
self.ord_server: str = ''
self.qry_server: str = ''
self.rpt_server: str = ''
self.password: str = '' self.password: str = ''
self.gateway = gateway self.gateway = gateway
@ -332,19 +343,50 @@ class OesTdApi:
"""Connect to trading server. """Connect to trading server.
:note set config_path before calling this function :note set config_path before calling this function
""" """
OesApi_InitLogger(self.config_path, 'log')
OesApi_SetThreadUsername(self.username) OesApi_SetThreadUsername(self.username)
OesApi_SetThreadPassword(self.password) OesApi_SetThreadPassword(self.password)
config_path = self.config_path hdd_id = "" # todo: get hdd_id if necessary
if not OesApi_InitAllByConvention(self._env, config_path, -1, self._last_seq_index): OesApi_SetCustomizedDriverId(hdd_id)
OesApi_InitAllByConvention(self._env, self.config_path, -1, 0)
if (not OesApi_InitOrdChannel2(self._env.ordChannel,
create_remote_config(self.ord_server,
self.username,
self.password),
0)
or not OesApi_IsValidOrdChannel(self._env.ordChannel)):
self.gateway.write_log(_("无法初始化交易下单通道(td_ord_server)"))
return False return False
self._last_seq_index = max(self._last_seq_index, self._env.ordChannel.lastOutMsgSeq + 1) self._last_seq_index = max(self._last_seq_index, self._env.ordChannel.lastOutMsgSeq + 1)
if (not OesApi_InitQryChannel2(self._env.qryChannel,
create_remote_config(self.qry_server,
self.username,
self.password))
or not OesApi_IsValidQryChannel(self._env.qryChannel)):
self.gateway.write_log(_("无法初始化交易查询通道(td_qry_server)"))
return False
if not OesApi_IsValidOrdChannel(self._env.ordChannel): subscribe_info = OesApiSubscribeInfoT()
return False subscribe_info.clEnvId = 0
if not OesApi_IsValidQryChannel(self._env.qryChannel): subscribe_info.rptTypes = (eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_BUSINESS_REJECT
return False | eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_ORDER_INSERT
if not OesApi_IsValidRptChannel(self._env.rptChannel): | eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_ORDER_REPORT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_TRADE_REPORT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_FUND_TRSF_REPORT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_CASH_ASSET_VARIATION
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_HOLDING_VARIATION
)
if (not OesApi_InitRptChannel2(self._env.rptChannel,
create_remote_config(self.rpt_server,
self.username,
self.password),
subscribe_info,
0)
or not OesApi_IsValidQryChannel(self._env.qryChannel)):
self.gateway.write_log(_("无法初始化交易查询通道(td_qry_server)"))
return False return False
return True return True
@ -489,7 +531,6 @@ class OesTdApi:
"""""" """"""
f = OesQryStkHoldingFilterT() f = OesQryStkHoldingFilterT()
f.mktId = eOesMarketIdT.OES_MKT_ID_UNDEFINE f.mktId = eOesMarketIdT.OES_MKT_ID_UNDEFINE
f.userInfo = 0
ret = OesApi_QueryOptHolding(self._env.qryChannel, ret = OesApi_QueryOptHolding(self._env.qryChannel,
f, f,
self.on_query_holding self.on_query_holding
@ -658,7 +699,7 @@ class OesTdApi:
gateway_name=self.gateway.gateway_name, gateway_name=self.gateway.gateway_name,
symbol=data.securityId, symbol=data.securityId,
exchange=EXCHANGE_OES2VT[data.mktId], exchange=EXCHANGE_OES2VT[data.mktId],
orderid=order_id if order_id else data.origClSeqNo, # generated id orderid=str(order_id if order_id else data.origClSeqNo), # generated id
direction=Direction.NET, direction=Direction.NET,
offset=offset, offset=offset,
price=data.ordPrice / 10000, price=data.ordPrice / 10000,

View File

@ -1,6 +1,30 @@
import os import os
from vnpy.api.oes.vnoes import SGeneralClientRemoteCfgT, SGeneralClientAddrInfoT
mydir = os.path.dirname(__file__) mydir = os.path.dirname(__file__)
config_template_path = os.path.join(mydir, "config_template.ini") config_template_path = os.path.join(mydir, "config_template.ini")
with open(config_template_path, "rt", encoding='utf-8') as f: with open(config_template_path, "rt", encoding='utf-8') as f:
config_template = f.read() config_template = f.read()
def create_remote_config(server: str, username: str, password: str):
cfg = SGeneralClientRemoteCfgT()
cfg.username = username
cfg.password = password
cfg.addrCnt = 1
cfg.addrList[0].uri = server
cfg.clEnvId = 0
cfg.clusterType = 0
cfg.socketOpt.soRcvbuf = 8192
cfg.socketOpt.soSndbuf = 8192
cfg.socketOpt.connTimeoutMs = 5000
cfg.socketOpt.tcpNodelay = 1
cfg.socketOpt.quickAck = 1
cfg.socketOpt.keepalive = 1
cfg.socketOpt.keepIdle = 60
cfg.socketOpt.keepIntvl = 5
cfg.socketOpt.keepCnt = 9
return cfg