Merge pull request #1489 from nanoric/oes_optimize

Oes optimize
This commit is contained in:
vn.py 2019-03-19 13:45:46 +08:00 committed by GitHub
commit 244bd8cec0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 665 additions and 308 deletions

Binary file not shown.

File diff suppressed because it is too large Load Diff

View File

@ -4,7 +4,7 @@
import hashlib import hashlib
import os import os
from gettext import gettext as _ from gettext import gettext as _
from threading import Thread, Lock from threading import Lock, Thread
from vnpy.trader.gateway import BaseGateway from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (CancelRequest, OrderRequest, from vnpy.trader.object import (CancelRequest, OrderRequest,
@ -17,7 +17,14 @@ from .utils import config_template
class OesGateway(BaseGateway): class OesGateway(BaseGateway):
""" """
VN Trader Gateway for BitMEX connection. VN Trader Gateway for OES
Because the design of OES API, multiple gateway instance with a same account is currently
not supported.
running multiple gateway instance with the same account will make send_order and
cancel_order fail frequently, because:
* seq_index is not unique between instances
* value range of client_id is too small to create a unique hash for different client.
""" """
default_setting = { default_setting = {
@ -28,6 +35,7 @@ class OesGateway(BaseGateway):
"md_qry_server": "", "md_qry_server": "",
"username": "", "username": "",
"password": "", "password": "",
"hdd_serial": "",
} }
def __init__(self, event_engine): def __init__(self, event_engine):
@ -69,7 +77,14 @@ class OesGateway(BaseGateway):
log_path=log_path) log_path=log_path)
f.write(content) f.write(content)
self.md_api.tcp_server = setting['md_tcp_server']
self.md_api.qry_server = setting['md_qry_server']
Thread(target=self._connect_md_sync, args=(config_path, username, password)).start() Thread(target=self._connect_md_sync, args=(config_path, username, password)).start()
self.td_api.ord_server = setting['td_ord_server']
self.td_api.rpt_server = setting['td_rpt_server']
self.td_api.qry_server = setting['td_qry_server']
self.td_api.hdd_serial = setting['hdd_serial']
Thread(target=self._connect_td_sync, args=(config_path, username, password)).start() Thread(target=self._connect_td_sync, args=(config_path, username, password)).start()
def _connect_td_sync(self, config_path, username, password): def _connect_td_sync(self, config_path, username, password):
@ -92,6 +107,7 @@ class OesGateway(BaseGateway):
self.md_api.username = username self.md_api.username = username
self.md_api.password = password self.md_api.password = password
if self.md_api.connect(): if self.md_api.connect():
self.write_log(_("成功连接到行情服务器"))
self.md_api.start() self.md_api.start()
else: else:
self.write_log(_("无法连接到行情服务器,请检查你的配置")) self.write_log(_("无法连接到行情服务器,请检查你的配置"))

View File

@ -1,18 +1,20 @@
import time import time
from copy import copy
from datetime import datetime from datetime import datetime
from gettext import gettext as _ from gettext import gettext as _
from threading import Thread from threading import Thread
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
from typing import Any, Callable, Dict from typing import Any, Callable, Dict
from vnpy.api.oes.vnoes import MdsApiClientEnvT, MdsApi_DestoryAll, MdsApi_InitAllByConvention, \ from vnpy.api.oes.vnoes import MdsApiClientEnvT, MdsApi_DestoryAll, MdsApi_InitLogger, \
MdsApi_IsValidQryChannel, MdsApi_IsValidTcpChannel, MdsApi_LogoutAll, MdsApi_SetThreadPassword, \ MdsApi_InitTcpChannel2, MdsApi_LogoutAll, MdsApi_SetThreadPassword, \
MdsApi_SetThreadUsername, MdsApi_SubscribeMarketData, MdsApi_WaitOnMsg, MdsL2StockSnapshotBodyT, \ MdsApi_SetThreadUsername, MdsApi_SubscribeMarketData, MdsApi_WaitOnMsg, MdsL2StockSnapshotBodyT, \
MdsMktDataRequestEntryT, MdsMktDataRequestReqT, MdsMktRspMsgBodyT, MdsStockSnapshotBodyT, \ MdsMktDataRequestEntryT, MdsMktDataRequestReqBufT, MdsMktDataRequestReqT, MdsMktRspMsgBodyT, \
SGeneralClientChannelT, SMsgHeadT, SPlatform_IsNegEpipe, cast, \ MdsStockSnapshotBodyT, SGeneralClientChannelT, SMsgHeadT, cast, \
eMdsExchangeIdT, eMdsMktSubscribeFlagT, eMdsMsgTypeT, eMdsSecurityTypeT, eMdsSubscribeDataTypeT, \ eMdsExchangeIdT, eMdsMktSubscribeFlagT, eMdsMsgTypeT, eMdsSecurityTypeT, eMdsSubscribeDataTypeT, \
eMdsSubscribeModeT, eMdsSubscribedTickExpireTypeT, eSMsgProtocolTypeT eMdsSubscribeModeT, eMdsSubscribedTickExpireTypeT, eMdsSubscribedTickTypeT, eSMsgProtocolTypeT
from vnpy.gateway.oes.utils import create_remote_config, is_disconnected
from vnpy.trader.constant import Exchange from vnpy.trader.constant import Exchange
from vnpy.trader.gateway import BaseGateway from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import SubscribeRequest, TickData from vnpy.trader.object import SubscribeRequest, TickData
@ -36,7 +38,7 @@ class OesMdMessageLoop:
self._md = md self._md = md
self._th = Thread(target=self._message_loop) self._th = Thread(target=self._message_loop)
self.message_handlers: Dict[int, Callable[[dict], None]] = { self.message_handlers: Dict[eMdsMsgTypeT, Callable[[MdsMktRspMsgBodyT], int]] = {
# tick & orderbook # tick & orderbook
eMdsMsgTypeT.MDS_MSGTYPE_MARKET_DATA_SNAPSHOT_FULL_REFRESH: self.on_market_full_refresh, eMdsMsgTypeT.MDS_MSGTYPE_MARKET_DATA_SNAPSHOT_FULL_REFRESH: self.on_market_full_refresh,
eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_DATA_SNAPSHOT: self.on_l2_market_data_snapshot, eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_DATA_SNAPSHOT: self.on_l2_market_data_snapshot,
@ -65,8 +67,9 @@ class OesMdMessageLoop:
def start(self): def start(self):
"""""" """"""
self._alive = True if not self._alive: # not thread-safe
self._th.start() self._alive = True
self._th.start()
def stop(self): def stop(self):
"""""" """"""
@ -79,7 +82,7 @@ class OesMdMessageLoop:
def reconnect(self): def reconnect(self):
"""""" """"""
self.gateway.write_log(_("正在尝试重新连接到行情服务器。")) self.gateway.write_log(_("正在尝试重新连接到行情服务器。"))
return self._md.connect() return self._md.connect_tcp_channel()
def _get_last_tick(self, symbol): def _get_last_tick(self, symbol):
"""""" """"""
@ -116,8 +119,6 @@ class OesMdMessageLoop:
"""""" """"""
tcp_channel = self.env.tcpChannel tcp_channel = self.env.tcpChannel
timeout_ms = 1000 timeout_ms = 1000
# is_timeout = SPlatform_IsNegEtimeout
is_disconnected = SPlatform_IsNegEpipe
while self._alive: while self._alive:
ret = MdsApi_WaitOnMsg(tcp_channel, ret = MdsApi_WaitOnMsg(tcp_channel,
timeout_ms, timeout_ms,
@ -145,7 +146,7 @@ class OesMdMessageLoop:
tick.__dict__['bid_price_' + str(i + 1)] = data.BidLevels[i].Price / 10000 tick.__dict__['bid_price_' + str(i + 1)] = data.BidLevels[i].Price / 10000
for i in range(min(data.OfferPriceLevel, 5)): for i in range(min(data.OfferPriceLevel, 5)):
tick.__dict__['ask_price_' + str(i + 1)] = data.OfferLevels[i].Price / 10000 tick.__dict__['ask_price_' + str(i + 1)] = data.OfferLevels[i].Price / 10000
self.gateway.on_tick(tick) self.gateway.on_tick(copy(tick))
def on_market_full_refresh(self, d: MdsMktRspMsgBodyT): def on_market_full_refresh(self, d: MdsMktRspMsgBodyT):
"""""" """"""
@ -161,7 +162,7 @@ class OesMdMessageLoop:
tick.__dict__['bid_price_' + str(i + 1)] = data.BidLevels[i].Price / 10000 tick.__dict__['bid_price_' + str(i + 1)] = data.BidLevels[i].Price / 10000
for i in range(5): for i in range(5):
tick.__dict__['ask_price_' + str(i + 1)] = data.OfferLevels[i].Price / 10000 tick.__dict__['ask_price_' + str(i + 1)] = data.OfferLevels[i].Price / 10000
self.gateway.on_tick(tick) self.gateway.on_tick(copy(tick))
def on_l2_trade(self, d: MdsMktRspMsgBodyT): def on_l2_trade(self, d: MdsMktRspMsgBodyT):
"""""" """"""
@ -171,7 +172,7 @@ class OesMdMessageLoop:
tick.datetime = datetime.utcnow() tick.datetime = datetime.utcnow()
tick.volume = data.TradeQty tick.volume = data.TradeQty
tick.last_price = data.TradePrice / 10000 tick.last_price = data.TradePrice / 10000
self.gateway.on_tick(tick) self.gateway.on_tick(copy(tick))
def on_market_data_request(self, d: MdsMktRspMsgBodyT): def on_market_data_request(self, d: MdsMktRspMsgBodyT):
"""""" """"""
@ -212,6 +213,9 @@ class OesMdApi:
"""""" """"""
self.gateway = gateway self.gateway = gateway
self.config_path: str = '' self.config_path: str = ''
self.tcp_server: str = ''
self.qry_server: str = ''
self.username: str = '' self.username: str = ''
self.password: str = '' self.password: str = ''
@ -223,17 +227,8 @@ class OesMdApi:
"""Connect to trading server. """Connect to trading server.
:note set config_path before calling this function :note set config_path before calling this function
""" """
MdsApi_SetThreadUsername(self.username) MdsApi_InitLogger(self.config_path, "log")
MdsApi_SetThreadPassword(self.password) return self.connect_tcp_channel()
config_path = self.config_path
if not MdsApi_InitAllByConvention(self._env, config_path):
return False
if not MdsApi_IsValidTcpChannel(self._env.tcpChannel):
return False
if not MdsApi_IsValidQryChannel(self._env.qryChannel):
return False
return True
def start(self): def start(self):
"""""" """"""
@ -249,7 +244,32 @@ class OesMdApi:
"""""" """"""
self._message_loop.join() self._message_loop.join()
# why isn't arg a ContractData? def connect_tcp_channel(self):
""""""
MdsApi_SetThreadUsername(self.username)
MdsApi_SetThreadPassword(self.password)
info = MdsMktDataRequestReqBufT()
info.mktDataRequestReq.subMode = eMdsSubscribeModeT.MDS_SUB_MODE_SET
info.mktDataRequestReq.tickType = eMdsSubscribedTickTypeT.MDS_TICK_TYPE_LATEST_SIMPLIFIED
info.mktDataRequestReq.isRequireInitialMktData = True
info.mktDataRequestReq.tickExpireType = eMdsSubscribedTickExpireTypeT.MDS_TICK_EXPIRE_TYPE_TIMELY
info.mktDataRequestReq.dataTypes = (eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L1_SNAPSHOT
| eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L2_SNAPSHOT
| eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L2_BEST_ORDERS
| eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L2_TRADE
| eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L2_ORDER
)
info.mktDataRequestReq.beginTime = 0
info.mktDataRequestReq.subSecurityCnt = 0
if not MdsApi_InitTcpChannel2(self._env.tcpChannel,
create_remote_config(server=self.tcp_server,
username=self.username,
password=self.password),
info):
return False
return True
def subscribe(self, req: SubscribeRequest): def subscribe(self, req: SubscribeRequest):
"""""" """"""
mds_req = MdsMktDataRequestReqT() mds_req = MdsMktDataRequestReqT()

View File

@ -1,3 +1,5 @@
import time
from copy import copy
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from gettext import gettext as _ from gettext import gettext as _
@ -5,18 +7,20 @@ from threading import Lock, Thread
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
from typing import Any, Callable, Dict from typing import Any, Callable, Dict
from vnpy.api.oes.vnoes import OesApiClientEnvT, OesApi_DestoryAll, OesApi_InitAllByConvention, \ from vnpy.api.oes.vnoes import OesApiClientEnvT, OesApiSubscribeInfoT, OesApi_DestoryAll, \
OesApi_IsValidOrdChannel, OesApi_IsValidQryChannel, OesApi_IsValidRptChannel, OesApi_LogoutAll, \ OesApi_InitLogger, OesApi_InitOrdChannel2, OesApi_InitQryChannel2, OesApi_InitRptChannel2, \
OesApi_QueryCashAsset, OesApi_QueryOptHolding, OesApi_QueryOption, OesApi_QueryOrder, \ OesApi_LogoutAll, OesApi_QueryCashAsset, \
OesApi_QueryStkHolding, OesApi_QueryStock, OesApi_SendOrderCancelReq, OesApi_SendOrderReq, \ OesApi_QueryOptHolding, OesApi_QueryOption, OesApi_QueryOrder, OesApi_QueryStkHolding, \
OesApi_QueryStock, OesApi_SendOrderCancelReq, OesApi_SendOrderReq, OesApi_SetCustomizedDriverId, \
OesApi_SetThreadPassword, OesApi_SetThreadUsername, OesApi_WaitReportMsg, OesOrdCancelReqT, \ OesApi_SetThreadPassword, OesApi_SetThreadUsername, OesApi_WaitReportMsg, OesOrdCancelReqT, \
OesOrdCnfmT, OesOrdRejectT, OesOrdReqT, OesQryCashAssetFilterT, OesQryCursorT, \ OesOrdCnfmT, OesOrdRejectT, OesOrdReqT, OesQryCashAssetFilterT, OesQryCursorT, \
OesQryOptionFilterT, OesQryOrdFilterT, OesQryStkHoldingFilterT, OesQryStockFilterT, \ OesQryOptionFilterT, OesQryOrdFilterT, OesQryStkHoldingFilterT, OesQryStockFilterT, \
OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, SGeneralClientChannelT, SMSG_PROTO_BINARY, \ OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, SGeneralClientChannelT, SMSG_PROTO_BINARY, \
SMsgHeadT, SPlatform_IsNegEpipe, cast, eOesBuySellTypeT, eOesMarketIdT, \ SMsgHeadT, cast, eOesBuySellTypeT, eOesMarketIdT, eOesMsgTypeT, \
eOesMsgTypeT, eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT, eOesSubscribeReportTypeT
from vnpy.gateway.oes.error_code import error_to_str from vnpy.gateway.oes.error_code import error_to_str
from vnpy.gateway.oes.utils import create_remote_config, is_disconnected
from vnpy.trader.constant import Direction, Exchange, Offset, PriceType, Product, Status from vnpy.trader.constant import Direction, Exchange, Offset, PriceType, Product, Status
from vnpy.trader.gateway import BaseGateway from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import AccountData, CancelRequest, ContractData, OrderData, OrderRequest, \ from vnpy.trader.object import AccountData, CancelRequest, ContractData, OrderData, OrderRequest, \
@ -62,7 +66,6 @@ BUY_SELL_TYPE_VT2OES = {
(Exchange.SHFE, Offset.CLOSE, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_BUY_CLOSE, (Exchange.SHFE, Offset.CLOSE, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_BUY_CLOSE,
(Exchange.SHFE, Offset.CLOSE, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_SELL_CLOSE, (Exchange.SHFE, Offset.CLOSE, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_SELL_CLOSE,
(Exchange.SHFE, Offset.CLOSE, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_BUY_CLOSE, (Exchange.SHFE, Offset.CLOSE, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_BUY_CLOSE,
# todo: eOesBuySellTypeT.OES_BS_TYPE_OPTION_EXERCISE == 行权
} }
STATUS_OES2VT = { STATUS_OES2VT = {
@ -125,7 +128,7 @@ class OesTdMessageLoop:
self._alive = False self._alive = False
self._th = Thread(target=self._message_loop) self._th = Thread(target=self._message_loop)
self.message_handlers: Dict[int, Callable[[dict], None]] = { self.message_handlers: Dict[eOesMsgTypeT, Callable[[OesRspMsgBodyT], int]] = {
eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_order_rejected, eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_order_rejected,
eOesMsgTypeT.OESMSG_RPT_ORDER_INSERT: self.on_order_inserted, eOesMsgTypeT.OESMSG_RPT_ORDER_INSERT: self.on_order_inserted,
eOesMsgTypeT.OESMSG_RPT_ORDER_REPORT: self.on_order_report, eOesMsgTypeT.OESMSG_RPT_ORDER_REPORT: self.on_order_report,
@ -140,8 +143,9 @@ class OesTdMessageLoop:
def start(self): def start(self):
"""""" """"""
self._alive = True if not self._alive: # not thread-safe
self._th.start() self._alive = True
self._th.start()
def stop(self): def stop(self):
"""""" """"""
@ -154,7 +158,7 @@ class OesTdMessageLoop:
def reconnect(self): def reconnect(self):
"""""" """"""
self.gateway.write_log(_("正在尝试重新连接到交易服务器。")) self.gateway.write_log(_("正在尝试重新连接到交易服务器。"))
self._td.connect() self._td.connect_rpt_channel()
def _on_message(self, session_info: SGeneralClientChannelT, def _on_message(self, session_info: SGeneralClientChannelT,
head: SMsgHeadT, head: SMsgHeadT,
@ -175,7 +179,6 @@ class OesTdMessageLoop:
"""""" """"""
rpt_channel = self._env.rptChannel rpt_channel = self._env.rptChannel
timeout_ms = 1000 timeout_ms = 1000
is_disconnected = SPlatform_IsNegEpipe
while self._alive: while self._alive:
ret = OesApi_WaitReportMsg(rpt_channel, ret = OesApi_WaitReportMsg(rpt_channel,
@ -196,15 +199,18 @@ class OesTdMessageLoop:
error_string = error_to_str(error_code) error_string = error_to_str(error_code)
data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp
if not data.origClSeqNo: if not data.origClSeqNo:
i = self._td.get_order(data.clSeqNo) try:
vt_order = i.vt_order i = self._td.get_order(data.clSeqNo)
except KeyError:
return # rejected order created by others, don't need to care.
vt_order = i.vt_order
if vt_order == Status.ALLTRADED: if vt_order == Status.ALLTRADED:
return return
vt_order.status = Status.REJECTED vt_order.status = Status.REJECTED
self.gateway.on_order(vt_order) self.gateway.on_order(copy(vt_order))
self.gateway.write_log( self.gateway.write_log(
f"Order: {vt_order.vt_symbol}-{vt_order.vt_orderid} Code: {error_code} Rejected: {error_string}") f"Order: {vt_order.vt_symbol}-{vt_order.vt_orderid} Code: {error_code} Rejected: {error_string}")
else: else:
@ -215,8 +221,10 @@ class OesTdMessageLoop:
data = d.rptMsg.rptBody.ordInsertRsp data = d.rptMsg.rptBody.ordInsertRsp
if not data.origClSeqNo: if not data.origClSeqNo:
# normal order
i = self._td.get_order(data.clSeqNo) i = self._td.get_order(data.clSeqNo)
else: else:
# data.ordStatus == eOesOrdStatusT.OES_ORD_STATUS_CANCEL_DONE:
i = self._td.get_order(data.origClSeqNo) i = self._td.get_order(data.origClSeqNo)
vt_order = i.vt_order vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.status = STATUS_OES2VT[data.ordStatus]
@ -224,15 +232,17 @@ class OesTdMessageLoop:
vt_order.traded = data.cumQty vt_order.traded = data.cumQty
vt_order.time = parse_oes_datetime(data.ordDate, data.ordTime) vt_order.time = parse_oes_datetime(data.ordDate, data.ordTime)
self.gateway.on_order(vt_order) self.gateway.on_order(copy(vt_order))
def on_order_report(self, d: OesRspMsgBodyT): def on_order_report(self, d: OesRspMsgBodyT):
"""""" """"""
data: OesOrdCnfmT = d.rptMsg.rptBody.ordCnfm data: OesOrdCnfmT = d.rptMsg.rptBody.ordCnfm
if not data.origClSeqNo: if not data.origClSeqNo:
# normal order
i = self._td.get_order(data.clSeqNo) i = self._td.get_order(data.clSeqNo)
else: else:
# data.ordStatus == eOesOrdStatusT.OES_ORD_STATUS_CANCEL_DONE:
i = self._td.get_order(data.origClSeqNo) i = self._td.get_order(data.origClSeqNo)
vt_order = i.vt_order vt_order = i.vt_order
@ -241,7 +251,7 @@ class OesTdMessageLoop:
vt_order.traded = data.cumQty vt_order.traded = data.cumQty
vt_order.time = parse_oes_datetime(data.ordDate, data.ordCnfmTime) vt_order.time = parse_oes_datetime(data.ordDate, data.ordCnfmTime)
self.gateway.on_order(vt_order) self.gateway.on_order(copy(vt_order))
def on_trade_report(self, d: OesRspMsgBodyT): def on_trade_report(self, d: OesRspMsgBodyT):
"""""" """"""
@ -255,19 +265,19 @@ class OesTdMessageLoop:
gateway_name=self.gateway.gateway_name, gateway_name=self.gateway.gateway_name,
symbol=data.securityId, symbol=data.securityId,
exchange=EXCHANGE_OES2VT[data.mktId], exchange=EXCHANGE_OES2VT[data.mktId],
orderid=data.userInfo, orderid=str(data.clSeqNo),
tradeid=data.exchTrdNum, tradeid=str(data.exchTrdNum),
direction=vt_order.direction, direction=vt_order.direction,
offset=vt_order.offset, offset=vt_order.offset,
price=data.trdPrice / 10000, price=data.trdPrice / 10000,
volume=data.trdQty, volume=data.trdQty,
time=parse_oes_datetime(data.trdDate, data.trdTime) time=parse_oes_datetime(data.trdDate, data.trdTime).isoformat()
) )
vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.traded = data.cumQty vt_order.traded = data.cumQty
vt_order.time = parse_oes_datetime(data.trdDate, data.trdTime) vt_order.time = parse_oes_datetime(data.trdDate, data.trdTime)
self.gateway.on_trade(trade) self.gateway.on_trade(trade)
self.gateway.on_order(vt_order) self.gateway.on_order(copy(vt_order))
def on_option_holding(self, d: OesRspMsgBodyT): def on_option_holding(self, d: OesRspMsgBodyT):
"""""" """"""
@ -282,7 +292,7 @@ class OesTdMessageLoop:
exchange=EXCHANGE_OES2VT[data.mktId], exchange=EXCHANGE_OES2VT[data.mktId],
direction=Direction.NET, direction=Direction.NET,
volume=data.sumHld, volume=data.sumHld,
frozen=data.lockHld, # todo: to verify frozen=data.lockHld,
price=data.costPrice / 10000, price=data.costPrice / 10000,
# pnl=data.costPrice - data.originalCostAmt, # pnl=data.costPrice - data.originalCostAmt,
pnl=0, pnl=0,
@ -312,9 +322,14 @@ class OesTdApi:
def __init__(self, gateway: BaseGateway): def __init__(self, gateway: BaseGateway):
"""""" """"""
self.config_path: str = None self.config_path: str = ''
self.ord_server: str = ''
self.qry_server: str = ''
self.rpt_server: str = ''
self.username: str = '' self.username: str = ''
self.password: str = '' self.password: str = ''
self.hdd_serial: str = ''
self.gateway = gateway self.gateway = gateway
self._env = OesApiClientEnvT() self._env = OesApiClientEnvT()
@ -325,6 +340,7 @@ class OesTdApi:
self._last_seq_lock = Lock() self._last_seq_lock = Lock()
self._last_seq_index = 1000000 # 0 has special manning for oes self._last_seq_index = 1000000 # 0 has special manning for oes
self._ord_reconnect_lock = Lock()
self._orders: Dict[int, InternalOrder] = {} self._orders: Dict[int, InternalOrder] = {}
@ -332,20 +348,18 @@ class OesTdApi:
"""Connect to trading server. """Connect to trading server.
:note set config_path before calling this function :note set config_path before calling this function
""" """
OesApi_SetThreadUsername(self.username) OesApi_InitLogger(self.config_path, 'log')
OesApi_SetThreadPassword(self.password)
config_path = self.config_path OesApi_SetCustomizedDriverId(self.hdd_serial)
if not OesApi_InitAllByConvention(self._env, config_path, -1, self._last_seq_index):
return False
self._last_seq_index = max(self._last_seq_index, self._env.ordChannel.lastOutMsgSeq + 1)
if not OesApi_IsValidOrdChannel(self._env.ordChannel): if not self._connect_ord_channel():
return False self.gateway.write_log(_("无法初始化交易下单通道(td_ord_server)"))
if not OesApi_IsValidQryChannel(self._env.qryChannel):
return False if not self._connect_qry_channel():
if not OesApi_IsValidRptChannel(self._env.rptChannel): self.gateway.write_log(_("无法初始化交易查询通道(td_qry_server)"))
return False
if not self.connect_rpt_channel():
self.gateway.write_log(_("无法初始化交易查询通道(td_qry_server)"))
return True return True
def start(self): def start(self):
@ -369,6 +383,60 @@ class OesTdApi:
self._last_seq_index += 1 self._last_seq_index += 1
return index return index
def _connect_qry_channel(self):
OesApi_SetThreadUsername(self.username)
OesApi_SetThreadPassword(self.password)
return OesApi_InitQryChannel2(self._env.qryChannel,
create_remote_config(self.qry_server,
self.username,
self.password))
def _connect_ord_channel(self):
OesApi_SetThreadUsername(self.username)
OesApi_SetThreadPassword(self.password)
if not OesApi_InitOrdChannel2(self._env.ordChannel,
create_remote_config(self.ord_server,
self.username,
self.password),
0):
return False
self._last_seq_index = max(self._last_seq_index, self._env.ordChannel.lastOutMsgSeq + 1)
return True
def connect_rpt_channel(self):
OesApi_SetThreadUsername(self.username)
OesApi_SetThreadPassword(self.password)
subscribe_info = OesApiSubscribeInfoT()
subscribe_info.clEnvId = 0
subscribe_info.rptTypes = (eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_BUSINESS_REJECT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_ORDER_INSERT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_ORDER_REPORT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_TRADE_REPORT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_FUND_TRSF_REPORT
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_CASH_ASSET_VARIATION
| eOesSubscribeReportTypeT.OES_SUB_RPT_TYPE_HOLDING_VARIATION
)
return OesApi_InitRptChannel2(self._env.rptChannel,
create_remote_config(self.rpt_server,
self.username,
self.password),
subscribe_info,
0)
def _reconnect_ord_channel(self):
with self._ord_reconnect_lock: # prevent spawning multiple reconnect thread
self.gateway.write_log(_("正在重新连接到交易下单通道"))
while not self._connect_ord_channel():
time.sleep(1)
self.gateway.write_log(_("成功重新连接到交易下单通道"))
def _schedule_reconnect_ord_channel(self):
Thread(target=self._reconnect_ord_channel, ).start()
def query_account(self): def query_account(self):
"""""" """"""
OesApi_QueryCashAsset(self._env.qryChannel, OesApi_QueryCashAsset(self._env.qryChannel,
@ -479,7 +547,7 @@ class OesTdApi:
frozen=data.lockHld, frozen=data.lockHld,
price=data.costPrice / 10000, price=data.costPrice / 10000,
# pnl=data.costPrice - data.originalCostAmt, # pnl=data.costPrice - data.originalCostAmt,
pnl=0, # todo: oes只提供日初持仓价格信息不提供最初持仓价格信息所以pnl只有当日的 pnl=0,
yd_volume=data.originalHld, yd_volume=data.originalHld,
) )
self.gateway.on_position(position) self.gateway.on_position(position)
@ -489,7 +557,6 @@ class OesTdApi:
"""""" """"""
f = OesQryStkHoldingFilterT() f = OesQryStkHoldingFilterT()
f.mktId = eOesMarketIdT.OES_MKT_ID_UNDEFINE f.mktId = eOesMarketIdT.OES_MKT_ID_UNDEFINE
f.userInfo = 0
ret = OesApi_QueryOptHolding(self._env.qryChannel, ret = OesApi_QueryOptHolding(self._env.qryChannel,
f, f,
self.on_query_holding self.on_query_holding
@ -573,9 +640,14 @@ class OesTdApi:
) )
if ret >= 0: if ret >= 0:
self.gateway.on_order(order) order.status = Status.SUBMITTING
else: else:
self.gateway.write_log("Failed to send_order!") order.status = Status.REJECTED
self.gateway.write_log(_("下单失败")) # todo: can I stringify error?
if is_disconnected(ret):
self.gateway.write_log(_("下单时连接发现连接已断开,正在尝试重连"))
self._schedule_reconnect_ord_channel()
self.gateway.on_order(order)
return order.vt_orderid return order.vt_orderid
@ -591,8 +663,14 @@ class OesTdApi:
oes_req.origClSeqNo = order_id oes_req.origClSeqNo = order_id
oes_req.invAcctId = "" oes_req.invAcctId = ""
oes_req.securityId = vt_req.symbol oes_req.securityId = vt_req.symbol
OesApi_SendOrderCancelReq(self._env.ordChannel,
oes_req) ret = OesApi_SendOrderCancelReq(self._env.ordChannel,
oes_req)
if ret < 0:
self.gateway.write_log(_("撤单失败")) # todo: can I stringify error?
if is_disconnected(ret): # is here any other ret code indicating connection lost?
self.gateway.write_log(_("撤单时连接发现连接已断开,正在尝试重连"))
self._schedule_reconnect_ord_channel()
def query_order(self, internal_order: InternalOrder) -> bool: def query_order(self, internal_order: InternalOrder) -> bool:
"""""" """"""
@ -616,9 +694,9 @@ class OesTdApi:
i = self.get_order(data.clSeqNo) i = self.get_order(data.clSeqNo)
vt_order = i.vt_order vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.volume = data.ordQty - data.canceledQty vt_order.volume = data.ordQty
vt_order.traded = data.cumQty vt_order.traded = data.cumQty
self.gateway.on_order(vt_order) self.gateway.on_order(copy(vt_order))
return 1 return 1
def query_orders(self) -> bool: def query_orders(self) -> bool:
@ -642,9 +720,9 @@ class OesTdApi:
i = self.get_order(data.clSeqNo) i = self.get_order(data.clSeqNo)
vt_order = i.vt_order vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus] vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.volume = data.ordQty - data.canceledQty vt_order.volume = data.ordQty
vt_order.traded = data.cumQty vt_order.traded = data.cumQty
self.gateway.on_order(vt_order) self.gateway.on_order(copy(vt_order))
except KeyError: except KeyError:
# order_id = self.order_manager.new_remote_id() # order_id = self.order_manager.new_remote_id()
order_id = data.clSeqNo order_id = data.clSeqNo
@ -658,7 +736,7 @@ class OesTdApi:
gateway_name=self.gateway.gateway_name, gateway_name=self.gateway.gateway_name,
symbol=data.securityId, symbol=data.securityId,
exchange=EXCHANGE_OES2VT[data.mktId], exchange=EXCHANGE_OES2VT[data.mktId],
orderid=order_id if order_id else data.origClSeqNo, # generated id orderid=str(order_id if order_id else data.origClSeqNo), # generated id
direction=Direction.NET, direction=Direction.NET,
offset=offset, offset=offset,
price=data.ordPrice / 10000, price=data.ordPrice / 10000,
@ -670,7 +748,7 @@ class OesTdApi:
time=datetime.utcnow().isoformat(), time=datetime.utcnow().isoformat(),
) )
self.save_order(order_id, vt_order) self.save_order(order_id, vt_order)
self.gateway.on_order(vt_order) self.gateway.on_order(copy(vt_order))
return 1 return 1
def save_order(self, order_id: int, order: OrderData): def save_order(self, order_id: int, order: OrderData):

View File

@ -1,6 +1,43 @@
import os import os
from vnpy.api.oes.vnoes import SGeneralClientRemoteCfgT
mydir = os.path.dirname(__file__) mydir = os.path.dirname(__file__)
config_template_path = os.path.join(mydir, "config_template.ini") config_template_path = os.path.join(mydir, "config_template.ini")
with open(config_template_path, "rt", encoding='utf-8') as f: with open(config_template_path, "rt", encoding='utf-8') as f:
config_template = f.read() config_template = f.read()
def create_remote_config(server: str, username: str, password: str):
"""
create a SGeneralClientRemoteCfgT.
"""
cfg = SGeneralClientRemoteCfgT()
cfg.username = username
cfg.password = password
cfg.addrCnt = 1
cfg.addrList[0].uri = server
cfg.clEnvId = 0
cfg.clusterType = 0
cfg.socketOpt.soRcvbuf = 8192
cfg.socketOpt.soSndbuf = 8192
cfg.socketOpt.connTimeoutMs = 5000
cfg.socketOpt.tcpNodelay = 1
cfg.socketOpt.quickAck = 1
cfg.socketOpt.keepalive = 1
cfg.socketOpt.keepIdle = 60
cfg.socketOpt.keepIntvl = 5
cfg.socketOpt.keepCnt = 9
return cfg
def is_disconnected(ret: int):
"""
check whether connection is lost by return value of OesApi/MdsApi
106 : ECONNABORTED
107 : ECONNREFUSED
108 : ECONNRESET
maybe there is more than there error codes indicating a disconnected state
"""
return ret == -106 or ret == -107 or ret == -108