[Fix] OesTdApi:修复退出时还在重连的Bug

[Fix] 修复无法撤销非本会话订单的BUG
[Fix] 填充order&trade中的time字段
[Fix] 冻结资金不再是计算得出,而是使用精确的挂单冻结资金。
This commit is contained in:
nanoric 2019-03-08 06:28:47 -04:00
parent e2e93cef3b
commit 897e7f0372

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timedelta, timezone
from gettext import gettext as _
from threading import Lock, Thread
# noinspection PyUnresolvedReferences
@ -7,15 +7,14 @@ from typing import Any, Callable, Dict, Tuple
from vnpy.api.oes.vnoes import OesApiClientEnvT, OesApi_DestoryAll, OesApi_InitAllByConvention, \
OesApi_IsValidOrdChannel, OesApi_IsValidQryChannel, OesApi_IsValidRptChannel, OesApi_LogoutAll, \
OesApi_QueryCashAsset, OesApi_QueryOptHolding, \
OesApi_QueryOption, OesApi_QueryOrder, OesApi_QueryStkHolding, OesApi_QueryStock, \
OesApi_SendOrderCancelReq, OesApi_SendOrderReq, OesApi_WaitReportMsg, OesOrdCancelReqT, \
OesApi_QueryCashAsset, OesApi_QueryOptHolding, OesApi_QueryOption, OesApi_QueryOrder, \
OesApi_QueryStkHolding, OesApi_QueryStock, OesApi_SendOrderCancelReq, OesApi_SendOrderReq, \
OesApi_SetThreadPassword, OesApi_SetThreadUsername, OesApi_WaitReportMsg, OesOrdCancelReqT, \
OesOrdCnfmT, OesOrdRejectT, OesOrdReqT, OesQryCashAssetFilterT, OesQryCursorT, \
OesQryOptionFilterT, OesQryOrdFilterT, OesQryStkHoldingFilterT, \
OesQryStockFilterT, OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, SGeneralClientChannelT, \
SMSG_PROTO_BINARY, SMsgHeadT, SPlatform_IsNegEpipe, SPlatform_IsNegEtimeout, cast, \
eOesBuySellTypeT, eOesMarketIdT, eOesMsgTypeT, eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT, \
OesApi_SetThreadUsername, OesApi_SetThreadPassword
OesQryOptionFilterT, OesQryOrdFilterT, OesQryStkHoldingFilterT, OesQryStockFilterT, \
OesRspMsgBodyT, OesStockBaseInfoT, OesTrdCnfmT, SGeneralClientChannelT, SMSG_PROTO_BINARY, \
SMsgHeadT, SPlatform_IsNegEpipe, cast, eOesBuySellTypeT, eOesMarketIdT, \
eOesMsgTypeT, eOesOrdStatusT, eOesOrdTypeShT, eOesOrdTypeSzT
from vnpy.gateway.oes.error_code import error_to_str
from vnpy.trader.constant import Direction, Exchange, Offset, PriceType, Product, Status
@ -86,6 +85,8 @@ STATUS_OES2VT = {
eOesOrdStatusT.OES_ORD_STATUS_INVALID_SZ_TRY_AGAIN: Status.REJECTED,
}
bjtz = timezone(timedelta(hours=8))
@dataclass
class InternalOrder:
@ -93,6 +94,20 @@ class InternalOrder:
vt_order: OrderData = None
def parse_oes_datetime(date: int, time: int):
# YYYYMMDD
year = int(date / 10000)
month = int((date % 10000) / 100)
day = int(date % 100)
# HHMMSSsss
hour = int(time / 10000000)
minute = int((time % 10000000) / 100000)
sec = int((time % 100000) / 1000)
mill = int(time % 1000)
return datetime(year, month, day, hour, minute, sec, mill * 1000, tzinfo=bjtz)
class OrderManager:
def __init__(self):
@ -122,12 +137,6 @@ class OrderManager:
def get_from_order_id(self, id: int):
return self._orders[id]
def get_from_oes_data(self, data):
return data.clSeqNo
def get_order_id_from_data(self, data):
return self.get_from_order_id(self.get_from_oes_data(data))
class OesTdMessageLoop:
@ -138,12 +147,12 @@ class OesTdMessageLoop:
td: "OesTdApi"
):
self.gateway = gateway
self.env = env
self.order_manager = order_manager
self.td = td
self._td = td
self._env = env
self._order_manager = order_manager
self.alive = False
self.th = Thread(target=self.message_loop)
self._alive = False
self._th = Thread(target=self.message_loop)
self.message_handlers: Dict[int, Callable[[dict], None]] = {
eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_order_rejected,
@ -153,15 +162,20 @@ class OesTdMessageLoop:
eOesMsgTypeT.OESMSG_RPT_STOCK_HOLDING_VARIATION: self.on_stock_holding,
eOesMsgTypeT.OESMSG_RPT_OPTION_HOLDING_VARIATION: self.on_option_holding,
eOesMsgTypeT.OESMSG_RPT_CASH_ASSET_VARIATION: self.on_cash,
eOesMsgTypeT.OESMSG_SESS_HEARTBEAT: lambda x: x,
eOesMsgTypeT.OESMSG_RPT_REPORT_SYNCHRONIZATION: lambda x: 1,
eOesMsgTypeT.OESMSG_SESS_HEARTBEAT: lambda x: 1,
}
def start(self):
self.alive = True
self.th.start()
self._alive = True
self._th.start()
def stop(self):
self._alive = False
def join(self):
self.th.join()
self._th.join()
def on_message(self, session_info: SGeneralClientChannelT,
head: SMsgHeadT,
@ -179,15 +193,14 @@ class OesTdMessageLoop:
def reconnect(self):
self.gateway.write_log(_("正在尝试重新连接到交易服务器。"))
self.td.connect()
self._td.connect()
def message_loop(self):
rpt_channel = self.env.rptChannel
rpt_channel = self._env.rptChannel
timeout_ms = 1000
is_timeout = SPlatform_IsNegEtimeout
is_disconnected = SPlatform_IsNegEpipe
while self.alive:
while self._alive:
ret = OesApi_WaitReportMsg(rpt_channel,
timeout_ms,
self.on_message)
@ -195,7 +208,9 @@ class OesTdMessageLoop:
# if is_timeout(ret):
# pass # just no message
if is_disconnected(ret):
self.reconnect()
self.gateway.write_log(_("与交易服务器的连接已断开。"))
while not self.reconnect() and self._alive:
pass
return
def on_order_rejected(self, d: OesRspMsgBodyT):
@ -203,7 +218,7 @@ class OesTdMessageLoop:
error_string = error_to_str(error_code)
data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp
if not data.origClSeqNo:
i = self.order_manager.get_from_order_id(data.clSeqNo)
i = self._order_manager.get_from_order_id(data.clSeqNo)
vt_order = i.vt_order
if vt_order == Status.ALLTRADED:
@ -215,33 +230,41 @@ class OesTdMessageLoop:
self.gateway.write_log(
f"Order: {vt_order.vt_symbol}-{vt_order.vt_orderid} Code: {error_code} Rejected: {error_string}")
else:
self.gateway.write_log(f"Failed to cancel Order, id: {data.origClSeqNo}")
self.gateway.write_log(f"撤单失败,订单号: {data.origClSeqNo}。原因:{error_string}")
def on_order_inserted(self, d: OesRspMsgBodyT):
data = d.rptMsg.rptBody.ordInsertRsp
i = self.order_manager.get_from_oes_data(data)
if not data.origClSeqNo:
i = self._order_manager.get_from_order_id(data.clSeqNo)
else:
i = self._order_manager.get_from_order_id(data.origClSeqNo)
vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.volume = data.ordQty
vt_order.traded = data.cumQty
vt_order.time = parse_oes_datetime(data.ordDate, data.ordTime)
self.gateway.on_order(vt_order)
def on_order_report(self, d: OesRspMsgBodyT):
data: OesOrdCnfmT = d.rptMsg.rptBody.ordCnfm
i = self.order_manager.get_from_oes_data(data)
if not data.origClSeqNo:
i = self._order_manager.get_from_order_id(data.clSeqNo)
else:
i = self._order_manager.get_from_order_id(data.origClSeqNo)
vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.volume = data.ordQty
vt_order.traded = data.cumQty
vt_order.time = parse_oes_datetime(data.ordDate, data.ordCnfmTime)
self.gateway.on_order(vt_order)
def on_trade_report(self, d: OesRspMsgBodyT):
data: OesTrdCnfmT = d.rptMsg.rptBody.trdCnfm
i = self.order_manager.get_from_oes_data(data)
i = self._order_manager.get_from_order_id(data.clSeqNo)
vt_order = i.vt_order
# vt_order.status = STATUS_OES2VT[data.ordStatus]
@ -255,7 +278,7 @@ class OesTdMessageLoop:
offset=vt_order.offset,
price=data.trdPrice / 10000,
volume=data.trdQty,
time=datetime.utcnow().isoformat() # strict
time=parse_oes_datetime(data.trdDate, data.trdTime)
)
self.gateway.on_trade(trade)
@ -268,7 +291,7 @@ class OesTdMessageLoop:
# Oes have no async call to query order only.
# And calling sync function here will slow down vnpy.
# So we queue it into another thread.
self.td.schedule_query_order(i)
self._td.schedule_query_order(i)
def on_option_holding(self, d: OesRspMsgBodyT):
pass
@ -305,9 +328,6 @@ class OesTdMessageLoop:
self.gateway.on_account(account)
return 1
def stop(self):
self.alive = False
class OesTdApi:
@ -571,8 +591,6 @@ class OesTdApi:
oes_req = OesOrdCancelReqT()
order_id = int(vt_req.orderid)
internal_order = self._order_manager.get_from_order_id(order_id)
oes_req.origClOrdId = internal_order.order_id
oes_req.mktId = EXCHANGE_VT2OES[vt_req.exchange]
oes_req.clSeqNo = seq_id
@ -603,7 +621,8 @@ class OesTdApi:
body: Any,
cursor: OesQryCursorT):
data: OesOrdCnfmT = cast.toOesOrdItemT(body)
i = self._order_manager.get_from_oes_data(data)
i = self._order_manager.get_from_order_id(data.clSeqNo)
vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.volume = data.ordQty - data.canceledQty
@ -631,7 +650,7 @@ class OesTdApi:
):
data: OesOrdCnfmT = cast.toOesOrdItemT(body)
try:
i = self._order_manager.get_from_oes_data(data)
i = self._order_manager.get_from_order_id(data.clSeqNo)
vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.volume = data.ordQty - data.canceledQty
@ -639,7 +658,7 @@ class OesTdApi:
self.gateway.on_order(vt_order)
except KeyError:
# order_id = self.order_manager.new_remote_id()
order_id = self._order_manager.get_order_id_from_data(data)
order_id = data.clSeqNo
if data.bsType == eOesBuySellTypeT.OES_BS_TYPE_BUY:
offset = Offset.OPEN