From eb5aac2595d350e84b55149c93da2a846eaf3c7e Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 8 Mar 2019 07:26:04 -0400 Subject: [PATCH] =?UTF-8?q?[Mod]=20=E7=BB=9F=E4=B8=80=E6=A0=BC=E5=BC=8F?= =?UTF-8?q?=EF=BC=8C=E9=9A=90=E8=97=8F=E5=86=85=E9=83=A8=E5=8F=98=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/gateway/oes/oes_gateway.py | 2 + vnpy/gateway/oes/oes_md.py | 79 ++++++++++++++++++++++----------- vnpy/gateway/oes/oes_td.py | 65 +++++++++++++++++++++------ 3 files changed, 106 insertions(+), 40 deletions(-) diff --git a/vnpy/gateway/oes/oes_gateway.py b/vnpy/gateway/oes/oes_gateway.py index 3f7ad72e..2cdd0f58 100644 --- a/vnpy/gateway/oes/oes_gateway.py +++ b/vnpy/gateway/oes/oes_gateway.py @@ -38,6 +38,7 @@ class OesGateway(BaseGateway): self.td_api = OesTdApi(self) def connect(self, setting: dict): + """""" return self._connect_async(setting) def _connect_sync(self, setting: dict): @@ -89,6 +90,7 @@ class OesGateway(BaseGateway): self.write_log(_("无法连接到交易服务器,请检查你的配置")) def _connect_async(self, setting: dict): + """""" Thread(target=self._connect_sync, args=(setting,)).start() def subscribe(self, req: SubscribeRequest): diff --git a/vnpy/gateway/oes/oes_md.py b/vnpy/gateway/oes/oes_md.py index 75d41432..1d45e56f 100644 --- a/vnpy/gateway/oes/oes_md.py +++ b/vnpy/gateway/oes/oes_md.py @@ -9,7 +9,7 @@ from vnpy.api.oes.vnoes import MdsApiClientEnvT, MdsApi_DestoryAll, MdsApi_InitA MdsApi_IsValidQryChannel, MdsApi_IsValidTcpChannel, MdsApi_LogoutAll, MdsApi_SetThreadPassword, \ MdsApi_SetThreadUsername, MdsApi_SubscribeMarketData, MdsApi_WaitOnMsg, MdsL2StockSnapshotBodyT, \ MdsMktDataRequestEntryT, MdsMktDataRequestReqT, MdsMktRspMsgBodyT, MdsStockSnapshotBodyT, \ - SGeneralClientChannelT, SMsgHeadT, SPlatform_IsNegEpipe, SPlatform_IsNegEtimeout, cast, \ + SGeneralClientChannelT, SMsgHeadT, SPlatform_IsNegEpipe, cast, \ eMdsExchangeIdT, eMdsMktSubscribeFlagT, eMdsMsgTypeT, eMdsSecurityTypeT, eMdsSubscribeDataTypeT, \ eMdsSubscribeModeT, eMdsSubscribedTickExpireTypeT, eSMsgProtocolTypeT @@ -28,11 +28,12 @@ EXCHANGE_VT2MDS = {v: k for k, v in EXCHANGE_MDS2VT.items()} class OesMdMessageLoop: def __init__(self, gateway: BaseGateway, md: "OesMdApi", env: MdsApiClientEnvT): + """""" self.gateway = gateway self.env = env self.alive = False self.md = md - self.th = Thread(target=self.message_loop) + self.th = Thread(target=self._message_loop) self.message_handlers: Dict[int, Callable[[dict], None]] = { # tick & orderbook @@ -58,9 +59,29 @@ class OesMdMessageLoop: self.symbol_to_exchange: Dict[str, Exchange] = {} def register_symbol(self, symbol: str, exchange: Exchange): + """""" self.symbol_to_exchange[symbol] = exchange - def get_last_tick(self, symbol): + def start(self): + """""" + self.alive = True + self.th.start() + + def stop(self): + """""" + self.alive = False + + def join(self): + """""" + self.th.join() + + def reconnect(self): + """""" + self.gateway.write_log(_("正在尝试重新连接到行情服务器。")) + return self.md.connect() + + def _get_last_tick(self, symbol): + """""" try: return self.last_tick[symbol] except KeyError: @@ -68,25 +89,15 @@ class OesMdMessageLoop: gateway_name=self.gateway.gateway_name, symbol=symbol, exchange=self.symbol_to_exchange[symbol], - # todo: use cache of something else to resolve exchange datetime=datetime.utcnow() ) self.last_tick[symbol] = tick return tick - def start(self): - self.alive = True - self.th.start() - - def stop(self): - self.alive = False - - def join(self): - self.th.join() - - def on_message(self, session_info: SGeneralClientChannelT, - head: SMsgHeadT, - body: Any): + def _on_message(self, session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any): + """""" if session_info.protocolType == eSMsgProtocolTypeT.SMSG_PROTO_BINARY: b = cast.toMdsMktRspMsgBodyT(body) if head.msgId in self.message_handlers: @@ -100,19 +111,16 @@ class OesMdMessageLoop: self.gateway.write_log(f"unknown prototype : {session_info.protocolType}") return 1 - def reconnect(self): - self.gateway.write_log(_("正在尝试重新连接到行情服务器。")) - return self.md.connect() - - def message_loop(self): + def _message_loop(self): + """""" tcp_channel = self.env.tcpChannel timeout_ms = 1000 - is_timeout = SPlatform_IsNegEtimeout + # is_timeout = SPlatform_IsNegEtimeout is_disconnected = SPlatform_IsNegEpipe while self.alive: ret = MdsApi_WaitOnMsg(tcp_channel, timeout_ms, - self.on_message) + self._on_message) if ret < 0: # if is_timeout(ret): # pass # just no message @@ -123,9 +131,10 @@ class OesMdMessageLoop: return def on_l2_market_data_snapshot(self, d: MdsMktRspMsgBodyT): + """""" data: MdsL2StockSnapshotBodyT = d.mktDataSnapshot.l2Stock symbol = str(data.SecurityID) - tick = self.get_last_tick(symbol) + tick = self._get_last_tick(symbol) tick.open_price = data.OpenPx / 10000 tick.pre_close = data.ClosePx / 10000 tick.high_price = data.HighPx / 10000 @@ -138,9 +147,10 @@ class OesMdMessageLoop: self.gateway.on_tick(tick) def on_market_full_refresh(self, d: MdsMktRspMsgBodyT): + """""" data: MdsStockSnapshotBodyT = d.mktDataSnapshot.stock symbol = data.SecurityID - tick = self.get_last_tick(symbol) + tick = self._get_last_tick(symbol) tick.open_price = data.OpenPx / 10000 tick.pre_close = data.ClosePx / 10000 tick.high_price = data.HighPx / 10000 @@ -153,42 +163,52 @@ class OesMdMessageLoop: self.gateway.on_tick(tick) def on_l2_trade(self, d: MdsMktRspMsgBodyT): + """""" data = d.trade symbol = data.SecurityID - tick = self.get_last_tick(symbol) + tick = self._get_last_tick(symbol) tick.datetime = datetime.utcnow() tick.volume = data.TradeQty tick.last_price = data.TradePrice / 10000 self.gateway.on_tick(tick) def on_market_data_request(self, d: MdsMktRspMsgBodyT): + """""" pass def on_trading_session_status(self, d: MdsMktRspMsgBodyT): + """""" pass def on_l2_market_overview(self, d: MdsMktRspMsgBodyT): + """""" pass def on_index_snapshot_full_refresh(self, d: MdsMktRspMsgBodyT): + """""" pass def on_option_snapshot_ful_refresh(self, d: MdsMktRspMsgBodyT): + """""" pass def on_best_orders_snapshot(self, d: MdsMktRspMsgBodyT): + """""" pass def on_l2_order(self, d: MdsMktRspMsgBodyT): + """""" pass def on_security_status(self, d: MdsMktRspMsgBodyT): + """""" pass class OesMdApi: def __init__(self, gateway: BaseGateway): + """""" self.gateway = gateway self.config_path: str = '' self.username: str = '' @@ -198,6 +218,7 @@ class OesMdApi: self._message_loop = OesMdMessageLoop(gateway, self, self._env) def connect(self) -> bool: + """""" """Connect to trading server. :note set config_path before calling this function """ @@ -214,18 +235,22 @@ class OesMdApi: return True def start(self): + """""" self._message_loop.start() def stop(self): + """""" self._message_loop.stop() MdsApi_LogoutAll(self._env, True) MdsApi_DestoryAll(self._env) def join(self): + """""" self._message_loop.join() # why isn't arg a ContractData? def subscribe(self, req: SubscribeRequest): + """""" mds_req = MdsMktDataRequestReqT() entry = MdsMktDataRequestEntryT() mds_req.subMode = eMdsSubscribeModeT.MDS_SUB_MODE_APPEND diff --git a/vnpy/gateway/oes/oes_td.py b/vnpy/gateway/oes/oes_td.py index 2cde3db6..e399cc3b 100644 --- a/vnpy/gateway/oes/oes_td.py +++ b/vnpy/gateway/oes/oes_td.py @@ -3,7 +3,7 @@ from datetime import datetime, timedelta, timezone from gettext import gettext as _ from threading import Lock, Thread # noinspection PyUnresolvedReferences -from typing import Any, Callable, Dict, Tuple +from typing import Any, Callable, Dict from vnpy.api.oes.vnoes import OesApiClientEnvT, OesApi_DestoryAll, OesApi_InitAllByConvention, \ OesApi_IsValidOrdChannel, OesApi_IsValidQryChannel, OesApi_IsValidRptChannel, OesApi_LogoutAll, \ @@ -95,6 +95,7 @@ class InternalOrder: def parse_oes_datetime(date: int, time: int): + """convert oes datetime to python datetime""" # YYYYMMDD year = int(date / 10000) month = int((date % 10000) / 100) @@ -115,13 +116,14 @@ class OesTdMessageLoop: env: OesApiClientEnvT, td: "OesTdApi" ): + """""" self.gateway = gateway self._env = env self._td = td self._alive = False - self._th = Thread(target=self.message_loop) + self._th = Thread(target=self._message_loop) self.message_handlers: Dict[int, Callable[[dict], None]] = { eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_order_rejected, @@ -137,18 +139,27 @@ class OesTdMessageLoop: } def start(self): + """""" self._alive = True self._th.start() def stop(self): + """""" self._alive = False def join(self): + """""" self._th.join() - def on_message(self, session_info: SGeneralClientChannelT, - head: SMsgHeadT, - body: Any): + def reconnect(self): + """""" + self.gateway.write_log(_("正在尝试重新连接到交易服务器。")) + self._td.connect() + + def _on_message(self, session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any): + """""" if session_info.protocolType == SMSG_PROTO_BINARY: b = cast.toOesRspMsgBodyT(body) if head.msgId in self.message_handlers: @@ -160,11 +171,8 @@ class OesTdMessageLoop: self.gateway.write_log(f"unknown prototype : {session_info.protocolType}") return 1 - def reconnect(self): - self.gateway.write_log(_("正在尝试重新连接到交易服务器。")) - self._td.connect() - - def message_loop(self): + def _message_loop(self): + """""" rpt_channel = self._env.rptChannel timeout_ms = 1000 is_disconnected = SPlatform_IsNegEpipe @@ -172,7 +180,7 @@ class OesTdMessageLoop: while self._alive: ret = OesApi_WaitReportMsg(rpt_channel, timeout_ms, - self.on_message) + self._on_message) if ret < 0: # if is_timeout(ret): # pass # just no message @@ -183,6 +191,7 @@ class OesTdMessageLoop: return def on_order_rejected(self, d: OesRspMsgBodyT): + """""" error_code = d.rptMsg.rptHead.ordRejReason error_string = error_to_str(error_code) data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp @@ -202,6 +211,7 @@ class OesTdMessageLoop: self.gateway.write_log(f"撤单失败,订单号: {data.origClSeqNo}。原因:{error_string}") def on_order_inserted(self, d: OesRspMsgBodyT): + """""" data = d.rptMsg.rptBody.ordInsertRsp if not data.origClSeqNo: @@ -217,6 +227,7 @@ class OesTdMessageLoop: self.gateway.on_order(vt_order) def on_order_report(self, d: OesRspMsgBodyT): + """""" data: OesOrdCnfmT = d.rptMsg.rptBody.ordCnfm if not data.origClSeqNo: @@ -233,6 +244,7 @@ class OesTdMessageLoop: self.gateway.on_order(vt_order) def on_trade_report(self, d: OesRspMsgBodyT): + """""" data: OesTrdCnfmT = d.rptMsg.rptBody.trdCnfm i = self._td.get_order(data.clSeqNo) @@ -258,9 +270,11 @@ class OesTdMessageLoop: self.gateway.on_order(vt_order) def on_option_holding(self, d: OesRspMsgBodyT): + """""" pass def on_stock_holding(self, d: OesRspMsgBodyT): + """""" data = d.rptMsg.rptBody.stkHoldingRpt position = PositionData( gateway_name=self.gateway.gateway_name, @@ -277,6 +291,7 @@ class OesTdMessageLoop: self.gateway.on_position(position) def on_cash(self, d: OesRspMsgBodyT): + """""" data = d.rptMsg.rptBody.cashAssetRpt balance = data.currentTotalBal @@ -296,6 +311,7 @@ class OesTdMessageLoop: class OesTdApi: def __init__(self, gateway: BaseGateway): + """""" self.config_path: str = None self.username: str = '' self.password: str = '' @@ -309,7 +325,7 @@ class OesTdApi: self._last_seq_lock = Lock() self._last_seq_index = 1000000 # 0 has special manning for oes - + self._orders: Dict[int, InternalOrder] = {} def connect(self): @@ -333,14 +349,17 @@ class OesTdApi: return True def start(self): + """""" self._message_loop.start() def stop(self): + """""" self._message_loop.stop() OesApi_LogoutAll(self._env, True) OesApi_DestoryAll(self._env) def join(self): + """""" self._message_loop.join() def _get_new_seq_index(self): @@ -351,6 +370,7 @@ class OesTdApi: return index def query_account(self): + """""" OesApi_QueryCashAsset(self._env.qryChannel, OesQryCashAssetFilterT(), self.on_query_asset @@ -362,6 +382,7 @@ class OesTdApi: body: Any, cursor: OesQryCursorT, ): + """""" data = cast.toOesCashAssetItemT(body) balance = data.currentTotalBal / 10000 availiable = data.currentAvailableBal / 10000 @@ -377,10 +398,12 @@ class OesTdApi: return 1 def query_stock(self, ) -> bool: + """""" # Thread(target=self._query_stock, ).start() return self._query_stock() def _query_stock(self, ) -> bool: + """""" f = OesQryStockFilterT() ret = OesApi_QueryStock(self._env.qryChannel, f, self.on_query_stock) return ret >= 0 @@ -391,6 +414,7 @@ class OesTdApi: body: Any, cursor: OesQryCursorT, ): + """""" data: OesStockBaseInfoT = cast.toOesStockItemT(body) contract = ContractData( gateway_name=self.gateway.gateway_name, @@ -405,6 +429,7 @@ class OesTdApi: return 1 def query_option(self) -> bool: + """""" f = OesQryOptionFilterT() ret = OesApi_QueryOption(self._env.qryChannel, f, @@ -418,6 +443,7 @@ class OesTdApi: body: Any, cursor: OesQryCursorT, ): + """""" data = cast.toOesOptionItemT(body) contract = ContractData( gateway_name=self.gateway.gateway_name, @@ -432,6 +458,7 @@ class OesTdApi: return 1 def query_stock_holding(self) -> bool: + """""" f = OesQryStkHoldingFilterT() ret = OesApi_QueryStkHolding(self._env.qryChannel, f, @@ -445,6 +472,7 @@ class OesTdApi: body: Any, cursor: OesQryCursorT, ): + """""" data = cast.toOesStkHoldingItemT(body) position = PositionData( @@ -463,6 +491,7 @@ class OesTdApi: return 1 def query_option_holding(self) -> bool: + """""" f = OesQryStkHoldingFilterT() f.mktId = eOesMarketIdT.OES_MKT_ID_UNDEFINE f.userInfo = 0 @@ -478,6 +507,7 @@ class OesTdApi: body: Any, cursor: OesQryCursorT, ): + """""" data = cast.toOesOptHoldingItemT(body) # 权利 @@ -513,15 +543,18 @@ class OesTdApi: return 1 def query_contracts(self): + """""" self.query_stock() # self.query_option() # self.query_issue() def query_position(self): + """""" self.query_stock_holding() self.query_option_holding() def send_order(self, vt_req: OrderRequest): + """""" seq_id = self._get_new_seq_index() order_id = seq_id @@ -551,6 +584,7 @@ class OesTdApi: return order.vt_orderid def cancel_order(self, vt_req: CancelRequest): + """""" seq_id = self._get_new_seq_index() oes_req = OesOrdCancelReqT() @@ -565,11 +599,13 @@ class OesTdApi: oes_req) def schedule_query_order(self, internal_order: InternalOrder) -> Thread: + """""" th = Thread(target=self.query_order, args=(internal_order,)) th.start() return th def query_order(self, internal_order: InternalOrder) -> bool: + """""" f = OesQryOrdFilterT() f.mktId = EXCHANGE_VT2OES[internal_order.vt_order.exchange] f.clSeqNo = internal_order.order_id @@ -584,6 +620,7 @@ class OesTdApi: head: SMsgHeadT, body: Any, cursor: OesQryCursorT): + """""" data: OesOrdCnfmT = cast.toOesOrdItemT(body) i = self._td.get_order(data.clSeqNo) @@ -612,6 +649,7 @@ class OesTdApi: body: Any, cursor: OesQryCursorT, ): + """""" data: OesOrdCnfmT = cast.toOesOrdItemT(body) try: i = self.get_order(data.clSeqNo) @@ -649,11 +687,12 @@ class OesTdApi: return 1 def save_order(self, order_id: int, order: OrderData): + """""" self._orders[order_id] = InternalOrder( order_id=order_id, vt_order=order, ) def get_order(self, order_id: int): + """""" return self._orders[order_id] -