Merge pull request #1520 from nanoric/oes_use_push

[Mod] OesGateway: 使用推送的初始化信息,而不是主动查询
This commit is contained in:
vn.py 2019-03-25 14:45:51 +08:00 committed by GitHub
commit a97a41c645
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 51 deletions

View File

@ -94,11 +94,11 @@ class OesGateway(BaseGateway):
self.td_api.password = password
if self.td_api.connect():
self.write_log(_("成功连接到交易服务器"))
self.td_api.query_account()
self.td_api.query_contracts()
# self.td_api.query_account()
self.write_log("合约信息查询成功")
self.td_api.query_position()
self.td_api.query_orders()
# self.td_api.query_position()
# self.td_api.query_orders()
self.td_api.start()
else:
self.write_log(_("无法连接到交易服务器,请检查你的配置"))

View File

@ -40,8 +40,8 @@ class OesMdMessageLoop:
self.message_handlers: Dict[eMdsMsgTypeT, Callable[[MdsMktRspMsgBodyT], int]] = {
# tick & orderbook
eMdsMsgTypeT.MDS_MSGTYPE_MARKET_DATA_SNAPSHOT_FULL_REFRESH: self.on_market_full_refresh,
eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_DATA_SNAPSHOT: self.on_l2_market_data_snapshot,
eMdsMsgTypeT.MDS_MSGTYPE_MARKET_DATA_SNAPSHOT_FULL_REFRESH: self.on_init_tick,
eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_DATA_SNAPSHOT: self.on_l2_tick,
eMdsMsgTypeT.MDS_MSGTYPE_L2_ORDER: self.on_l2_order,
eMdsMsgTypeT.MDS_MSGTYPE_L2_TRADE: self.on_l2_trade,
@ -132,7 +132,7 @@ class OesMdMessageLoop:
time.sleep(1)
return
def on_l2_market_data_snapshot(self, d: MdsMktRspMsgBodyT):
def on_l2_tick(self, d: MdsMktRspMsgBodyT):
""""""
data: MdsL2StockSnapshotBodyT = d.mktDataSnapshot.l2Stock
symbol = str(data.SecurityID)
@ -148,7 +148,7 @@ class OesMdMessageLoop:
tick.__dict__['ask_price_' + str(i + 1)] = data.OfferLevels[i].Price / 10000
self.gateway.on_tick(copy(tick))
def on_market_full_refresh(self, d: MdsMktRspMsgBodyT):
def on_init_tick(self, d: MdsMktRspMsgBodyT):
""""""
data: MdsStockSnapshotBodyT = d.mktDataSnapshot.stock
symbol = data.SecurityID

View File

@ -120,13 +120,15 @@ class OesTdMessageLoop:
def __init__(self,
gateway: BaseGateway,
env: OesApiClientEnvT,
td: "OesTdApi"
td: "OesTdApi",
order_manager: "OrderManager",
):
""""""
self.gateway = gateway
self._env = env
self._td = td
self._order_manager = order_manager
self._alive = False
self._th = Thread(target=self._message_loop)
@ -203,7 +205,7 @@ class OesTdMessageLoop:
data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp
if not data.origClSeqNo:
try:
i = self._td.get_order(data.clSeqNo)
i = self._order_manager.get_order(data.clSeqNo)
except KeyError:
return # rejected order created by others, don't need to care.
@ -223,46 +225,22 @@ class OesTdMessageLoop:
""""""
data = d.rptMsg.rptBody.ordInsertRsp
if not data.origClSeqNo:
# normal order
i = self._td.get_order(data.clSeqNo)
else:
# data.ordStatus == eOesOrdStatusT.OES_ORD_STATUS_CANCEL_DONE:
i = self._td.get_order(data.origClSeqNo)
vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.volume = data.ordQty
vt_order.traded = data.cumQty
vt_order.time = parse_oes_datetime(data.ordDate, data.ordTime)
vt_order = self._order_manager.oes_order_to_vt(data)
self.gateway.on_order(copy(vt_order))
def on_order_report(self, d: OesRspMsgBodyT):
""""""
data: OesOrdCnfmT = d.rptMsg.rptBody.ordCnfm
if not data.origClSeqNo:
# normal order
i = self._td.get_order(data.clSeqNo)
else:
# data.ordStatus == eOesOrdStatusT.OES_ORD_STATUS_CANCEL_DONE:
i = self._td.get_order(data.origClSeqNo)
vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.volume = data.ordQty
vt_order.traded = data.cumQty
vt_order.time = parse_oes_datetime(data.ordDate, data.ordCnfmTime)
vt_order = self._order_manager.oes_order_to_vt(data)
self.gateway.on_order(copy(vt_order))
def on_trade_report(self, d: OesRspMsgBodyT):
""""""
data: OesTrdCnfmT = d.rptMsg.rptBody.trdCnfm
i = self._td.get_order(data.clSeqNo)
i = self._order_manager.get_order(data.clSeqNo)
vt_order = i.vt_order
# vt_order.status = STATUS_OES2VT[data.ordStatus]
trade = TradeData(
gateway_name=self.gateway.gateway_name,
@ -337,16 +315,17 @@ class OesTdApi:
self._env = OesApiClientEnvT()
self._order_manager: "OrderManager" = OrderManager(self.gateway.gateway_name)
self._message_loop = OesTdMessageLoop(gateway,
self._env,
self)
self,
self._order_manager
)
self._last_seq_lock = Lock()
self._last_seq_index = 1000000 # 0 has special manning for oes
self._ord_reconnect_lock = Lock()
self._orders: Dict[int, InternalOrder] = {}
def connect(self):
"""Connect to trading server.
:note set config_path before calling this function
@ -636,7 +615,7 @@ class OesTdApi:
order = vt_req.create_order_data(str(order_id), self.gateway.gateway_name)
order.direction = Direction.NET # fix direction into NET: stock only
self.save_order(order_id, order)
self._order_manager.save_order(order_id, order)
ret = OesApi_SendOrderReq(self._env.ordChannel,
oes_req
@ -694,7 +673,7 @@ class OesTdApi:
""""""
data: OesOrdCnfmT = cast.toOesOrdItemT(body)
i = self.get_order(data.clSeqNo)
i = self._order_manager.get_order(data.clSeqNo)
vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.volume = data.ordQty
@ -719,24 +698,35 @@ class OesTdApi:
):
""""""
data: OesOrdCnfmT = cast.toOesOrdItemT(body)
vt_order = self._order_manager.oes_order_to_vt(data)
self.gateway.on_order(vt_order)
return 1
class OrderManager:
def __init__(self, gateway_name: str):
self._orders: Dict[int, InternalOrder] = {}
self.gateway_name = gateway_name
def oes_order_to_vt(self, data):
order_id = data.clSeqNo
if hasattr(data, "origClSeqNo") and data.origClSeqNo:
order_id = data.origClSeqNo
try:
i = self.get_order(data.clSeqNo)
i = self.get_order(order_id)
vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.volume = data.ordQty
vt_order.traded = data.cumQty
self.gateway.on_order(copy(vt_order))
vt_order.time = parse_oes_datetime(data.ordDate, data.ordTime).isoformat()
except KeyError:
# order_id = self.order_manager.new_remote_id()
order_id = data.clSeqNo
if data.bsType == eOesBuySellTypeT.OES_BS_TYPE_BUY:
offset = Offset.OPEN
else:
offset = Offset.CLOSE
vt_order = OrderData(
gateway_name=self.gateway.gateway_name,
gateway_name=self.gateway_name,
symbol=data.securityId,
exchange=EXCHANGE_OES2VT[data.mktId],
orderid=str(order_id if order_id else data.origClSeqNo), # generated id
@ -744,17 +734,16 @@ class OesTdApi:
direction=Direction.NET,
offset=offset,
price=data.ordPrice / 10000,
volume=data.ordQty - data.canceledQty,
volume=data.ordQty,
traded=data.cumQty,
status=STATUS_OES2VT[
data.ordStatus],
# this time should be generated automatically or by a static function
time=datetime.utcnow().isoformat(),
time=parse_oes_datetime(data.ordDate, data.ordCnfmTime).isoformat(),
)
self.save_order(order_id, vt_order)
self.gateway.on_order(copy(vt_order))
return 1
return vt_order
def save_order(self, order_id: int, order: OrderData):
""""""