[Mod] complete test of QuoteApi

This commit is contained in:
vn.py 2019-06-07 17:33:19 +08:00
parent 260e270c22
commit 18b4162f4f
5 changed files with 378 additions and 218 deletions

View File

@ -1,31 +1,31 @@
from vnpy.event import EventEngine from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp from vnpy.trader.ui import MainWindow, create_qapp
from vnpy.gateway.bitmex import BitmexGateway # from vnpy.gateway.bitmex import BitmexGateway
from vnpy.gateway.futu import FutuGateway # from vnpy.gateway.futu import FutuGateway
from vnpy.gateway.ib import IbGateway # from vnpy.gateway.ib import IbGateway
from vnpy.gateway.ctp import CtpGateway # from vnpy.gateway.ctp import CtpGateway
# from vnpy.gateway.ctptest import CtptestGateway # # from vnpy.gateway.ctptest import CtptestGateway
from vnpy.gateway.femas import FemasGateway # from vnpy.gateway.femas import FemasGateway
from vnpy.gateway.tiger import TigerGateway # from vnpy.gateway.tiger import TigerGateway
from vnpy.gateway.oes import OesGateway # from vnpy.gateway.oes import OesGateway
from vnpy.gateway.okex import OkexGateway # from vnpy.gateway.okex import OkexGateway
from vnpy.gateway.huobi import HuobiGateway # from vnpy.gateway.huobi import HuobiGateway
from vnpy.gateway.bitfinex import BitfinexGateway # from vnpy.gateway.bitfinex import BitfinexGateway
from vnpy.gateway.onetoken import OnetokenGateway # from vnpy.gateway.onetoken import OnetokenGateway
from vnpy.gateway.okexf import OkexfGateway # from vnpy.gateway.okexf import OkexfGateway
from vnpy.gateway.xtp import XtpGateway # from vnpy.gateway.xtp import XtpGateway
from vnpy.gateway.hbdm import HbdmGateway # from vnpy.gateway.hbdm import HbdmGateway
from vnpy.gateway.tap import TapGateway
from vnpy.app.cta_strategy import CtaStrategyApp # from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.csv_loader import CsvLoaderApp # from vnpy.app.csv_loader import CsvLoaderApp
from vnpy.app.algo_trading import AlgoTradingApp # from vnpy.app.algo_trading import AlgoTradingApp
from vnpy.app.cta_backtester import CtaBacktesterApp # from vnpy.app.cta_backtester import CtaBacktesterApp
from vnpy.app.data_recorder import DataRecorderApp # from vnpy.app.data_recorder import DataRecorderApp
from vnpy.app.risk_manager import RiskManagerApp # from vnpy.app.risk_manager import RiskManagerApp
def main(): def main():
@ -35,28 +35,29 @@ def main():
event_engine = EventEngine() event_engine = EventEngine()
main_engine = MainEngine(event_engine) main_engine = MainEngine(event_engine)
main_engine.add_gateway(XtpGateway) # main_engine.add_gateway(XtpGateway)
main_engine.add_gateway(CtpGateway) # main_engine.add_gateway(CtpGateway)
# main_engine.add_gateway(CtptestGateway) # # main_engine.add_gateway(CtptestGateway)
main_engine.add_gateway(FemasGateway) # main_engine.add_gateway(FemasGateway)
main_engine.add_gateway(IbGateway) # main_engine.add_gateway(IbGateway)
main_engine.add_gateway(FutuGateway) # main_engine.add_gateway(FutuGateway)
main_engine.add_gateway(BitmexGateway) # main_engine.add_gateway(BitmexGateway)
main_engine.add_gateway(TigerGateway) # main_engine.add_gateway(TigerGateway)
main_engine.add_gateway(OesGateway) # main_engine.add_gateway(OesGateway)
main_engine.add_gateway(OkexGateway) # main_engine.add_gateway(OkexGateway)
main_engine.add_gateway(HuobiGateway) # main_engine.add_gateway(HuobiGateway)
main_engine.add_gateway(BitfinexGateway) # main_engine.add_gateway(BitfinexGateway)
main_engine.add_gateway(OnetokenGateway) # main_engine.add_gateway(OnetokenGateway)
main_engine.add_gateway(OkexfGateway) # main_engine.add_gateway(OkexfGateway)
main_engine.add_gateway(HbdmGateway) # main_engine.add_gateway(HbdmGateway)
main_engine.add_gateway(TapGateway)
main_engine.add_app(CtaStrategyApp) # main_engine.add_app(CtaStrategyApp)
main_engine.add_app(CtaBacktesterApp) # main_engine.add_app(CtaBacktesterApp)
main_engine.add_app(CsvLoaderApp) # main_engine.add_app(CsvLoaderApp)
main_engine.add_app(AlgoTradingApp) # main_engine.add_app(AlgoTradingApp)
main_engine.add_app(DataRecorderApp) # main_engine.add_app(DataRecorderApp)
main_engine.add_app(RiskManagerApp) # main_engine.add_app(RiskManagerApp)
main_window = MainWindow(main_engine, event_engine) main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized() main_window.showMaximized()

Binary file not shown.

0
vnpy/api/tap/__init__.py Normal file
View File

View File

@ -138,7 +138,7 @@ class CtpGateway(BaseGateway):
def __init__(self, event_engine): def __init__(self, event_engine):
"""Constructor""" """Constructor"""
super(CtpGateway, self).__init__(event_engine, "CTP") super().__init__(event_engine, "CTP")
self.td_api = CtpTdApi(self) self.td_api = CtpTdApi(self)
self.md_api = CtpMdApi(self) self.md_api = CtpMdApi(self)

View File

@ -3,7 +3,8 @@ from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from typing import Any, Dict, Optional, Tuple from typing import Any, Dict, Optional, Tuple
from vnpy.api.tap.vntap import (APIYNFLAG_NO, AsyncDispatchException, CreateTapQuoteAPI, from vnpy.api.tap.vntap import (
APIYNFLAG_NO, AsyncDispatchException, CreateTapQuoteAPI,
FreeTapQuoteAPI, ITapQuoteAPI, ITapQuoteAPINotify, FreeTapQuoteAPI, ITapQuoteAPI, ITapQuoteAPINotify,
TAPIERROR_SUCCEED, TAPI_CALLPUT_FLAG_NONE, TAPIERROR_SUCCEED, TAPI_CALLPUT_FLAG_NONE,
TAPI_COMMODITY_TYPE_FUTURES, TAPI_COMMODITY_TYPE_INDEX, TAPI_COMMODITY_TYPE_FUTURES, TAPI_COMMODITY_TYPE_INDEX,
@ -11,9 +12,18 @@ from vnpy.api.tap.vntap import (APIYNFLAG_NO, AsyncDispatchException, CreateTapQ
TAPI_COMMODITY_TYPE_STOCK, TapAPIApplicationInfo, TapAPIContract, TAPI_COMMODITY_TYPE_STOCK, TapAPIApplicationInfo, TapAPIContract,
TapAPIQuotLoginRspInfo, TapAPIQuoteCommodityInfo, TapAPIQuotLoginRspInfo, TapAPIQuoteCommodityInfo,
TapAPIQuoteContractInfo, TapAPIQuoteLoginAuth, TapAPIQuoteWhole, TapAPIQuoteContractInfo, TapAPIQuoteLoginAuth, TapAPIQuoteWhole,
set_async_callback_exception_handler) set_async_callback_exception_handler,
CreateITapTradeAPI
)
from vnpy.api.tap.vntap.ITapTrade import (
ITapTradeAPINotify, ITapTradeAPI,
TapAPITradeLoginRspInfo,
TapAPIApplicationInfo as TapTradeAPIApplicationInfo,
TapAPITradeLoginAuth, TapAPIAccQryReq, TapAPIFundReq,
TapAPIAccountInfo, TapAPIFundData
)
from vnpy.api.tap.error_codes import error_map from vnpy.api.tap.error_codes import error_map
from vnpy.event import EventEngine from vnpy.event import EventEngine
from vnpy.trader.constant import Exchange, Product from vnpy.trader.constant import Exchange, Product
from vnpy.trader.gateway import BaseGateway from vnpy.trader.gateway import BaseGateway
@ -38,124 +48,212 @@ EXCHANGE_TAP2VT = {
'APEX': Exchange.APEX, 'APEX': Exchange.APEX,
'NYMEX': Exchange.NYMEX, 'NYMEX': Exchange.NYMEX,
'LME': Exchange.LME, 'LME': Exchange.LME,
'COMEX': Exchange.COMEX, # verify: I added this exchange in vnpy, is this correct? 'COMEX': Exchange.COMEX,
'CBOT': Exchange.CBOT, 'CBOT': Exchange.CBOT,
'HKEX': Exchange.HKFE, # verify: is this correct?, 'HKEX': Exchange.HKFE,
'CME': Exchange.CME, 'CME': Exchange.CME,
} }
EXCHANGE_VT2TAP = {v: k for k, v in EXCHANGE_TAP2VT.items()} EXCHANGE_VT2TAP = {v: k for k, v in EXCHANGE_TAP2VT.items()}
def parse_datetime(dt_str: str): commodity_infos = {}
# todo: 我不知道这个时间所用的是哪种时间 extra_infos = {}
# yyyy-MM-dd hh:nn:ss.xxx
return datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S.%f")
def error_to_str(err_code: int) -> str: class TapGateway(BaseGateway):
try:
return error_map[err_code]
except KeyError:
return f"Unknown error({err_code})"
@dataclass()
class ContractExtraInfo:
""" """
缓存一些ITAP API不直接提供的信息而且仅仅保存这些信息Symbol之类的作为键就不保存在这里面了 VN Trader gateway for Esunny 9.0.
""" """
name: str
commodity_type: int default_setting = {
commodity_no: str "quote_username": "",
"quote_password": "",
"quote_host": "",
"quote_port": 0,
"trade_username": "",
"trade_password": "",
"trade_host": "",
"trade_port": 0,
"auth_code": ""
}
exchanges = list(EXCHANGE_VT2TAP.keys())
def __init__(self, event_engine: EventEngine):
""""""
super().__init__(event_engine, "TAP")
self.quote_api = QuoteApi(self)
self.trade_api = TradeApi(self)
set_async_callback_exception_handler(
self._async_callback_exception_handler)
def connect(self, setting: dict):
""""""
quote_username = setting["quote_username"]
quote_password = setting["quote_password"]
quote_host = setting["quote_host"]
quote_port = setting["quote_port"]
trade_username = setting["trade_username"]
trade_password = setting["trade_password"]
trade_host = setting["trade_host"]
trade_port = setting["trade_port"]
auth_code = setting["auth_code"]
self.trade_api.connect(
trade_username,
trade_password,
trade_host,
trade_port,
auth_code
)
self.quote_api.connect(
quote_username,
quote_password,
quote_host,
quote_port,
auth_code
)
def close(self):
""""""
self.trade_api.close()
self.quote_api.close()
def subscribe(self, req: SubscribeRequest):
""""""
self.quote_api.subscribe(req)
def send_order(self, req: OrderRequest) -> str:
""""""
return self.trade_api.send_order(req)
def cancel_order(self, req: CancelRequest):
""""""
self.trade_api.cancel_order(req)
def query_account(self):
""""""
self.trade_api.query_account()
def query_position(self):
""""""
self.trade_api.query_position()
def _async_callback_exception_handler(self, e: AsyncDispatchException):
""""""
error_str = (f"发生内部错误,位置:\n{e.instance}.{e.function_name},详细信息:\n"
f"{e.what}\n"
)
print(error_str)
self.write_log(error_str)
return True
class QuoteNotify(ITapQuoteAPINotify): class QuoteApi(ITapQuoteAPINotify):
""""""
def __init__(self, gateway: "TapGateway"): def __init__(self, gateway: TapGateway):
""""""
super().__init__() super().__init__()
self.gateway = gateway self.gateway = gateway
self.api = None
@property def OnRspLogin(self, errorCode: int, info: TapAPIQuotLoginRspInfo):
def api(self) -> ITapQuoteAPI: """"""
return self.gateway.api if errorCode != TAPIERROR_SUCCEED:
def OnRspLogin(self, errorCode: int, info: TapAPIQuotLoginRspInfo) -> Any:
if self.gateway.if_error_write_log(errorCode, "OnRspQryCommodity"):
self.gateway.write_log("行情服务器登录失败") self.gateway.write_log("行情服务器登录失败")
return else:
self.gateway.write_log("行情服务器登录成功") self.gateway.write_log("行情服务器登录成功")
def OnAPIReady(self) -> Any: def OnAPIReady(self):
print("OnApiReady") """"""
error_code, sessionId = self.api.QryCommodity() self.api.QryCommodity()
if self.gateway.if_error_write_log(error_code, "api.QryCommodity"):
return
def OnDisconnect(self, reasonCode: int) -> Any: def OnDisconnect(self, reasonCode: int):
print(f"OnDisconnect : {error_to_str(reasonCode)}") """"""
self.gateway.write_log(f"行情服务器连接断开,原因:{reasonCode}")
def OnRspQryCommodity( def OnRspQryCommodity(
self, self,
sessionID: int, sessionID: int,
errorCode: int, errorCode: int,
isLast: int, isLast: str,
info: TapAPIQuoteCommodityInfo, info: TapAPIQuoteCommodityInfo,
) -> Any: ):
if self.gateway.if_error_write_log(errorCode, "OnRspQryCommodity"): """"""
if errorCode != TAPIERROR_SUCCEED:
self.gateway.write_log("查询交易品种信息失败")
return return
error_code, session_id = self.api.QryContract(info.Commodity) commodity_info = CommodityInfo(
if self.gateway.if_error_write_log(error_code, "api.QryContract"): name=info.CommodityName,
return size=info.ContractSize, # value is 0 in sim environment
pricetick=info.CommodityTickSize
)
commodity_infos[info.Commodity.CommodityNo] = commodity_info
self.api.QryContract(info.Commodity)
if isLast == "Y":
self.gateway.write_log("查询交易品种信息成功")
def OnRspQryContract( def OnRspQryContract(
self, sessionID: int, errorCode: int, isLast: int, info: TapAPIQuoteContractInfo self, sessionID: int, errorCode: int, isLast: str, info: TapAPIQuoteContractInfo
) -> Any: ):
if self.gateway.if_error_write_log(errorCode, "OnRspQryContract"): """"""
if errorCode != TAPIERROR_SUCCEED:
self.gateway.write_log("查询交易合约信息失败")
return return
if info is not None: # isLast == True ==> info is None
symbol = info.Contract.ContractNo1 # what's the different between No1 and No2? if not info:
exchange = EXCHANGE_TAP2VT[info.Contract.Commodity.ExchangeNo] return
name = info.ContractName
contract_data = ContractData( commodity_info = commodity_infos[info.Contract.Commodity.CommodityNo]
gateway_name=self.gateway.gateway_name, symbol = info.Contract.Commodity.CommodityNo + info.Contract.ContractNo1
contract = ContractData(
symbol=symbol, symbol=symbol,
exchange=exchange, exchange=EXCHANGE_TAP2VT[info.Contract.Commodity.ExchangeNo],
name=name, name=symbol,
product=PRODUCT_TYPE_TAP2VT.get(info.ContractType, Product.EQUITY), product=Product.FUTURES,
size=1, # verify: no key for this size=commodity_info.size,
pricetick=1, # verify: no key for this pricetick=commodity_info.pricetick,
min_volume=1, # verify: no key for this gateway_name=self.gateway.gateway_name
# ...
) )
contract_extra_info = ContractExtraInfo( self.gateway.on_contract(contract)
name=name,
extra_info = ExtraInfo(
name=contract.name,
commodity_type=info.Contract.Commodity.CommodityType, commodity_type=info.Contract.Commodity.CommodityType,
commodity_no=info.Contract.Commodity.CommodityNo, commodity_no=info.Contract.Commodity.CommodityNo,
) )
self.gateway.contract_extra_infos[(symbol, exchange)] = contract_extra_info extra_infos[(contract.symbol, contract.exchange)] = extra_info
self.gateway.on_contract(contract_data)
def OnRspSubscribeQuote( def OnRspSubscribeQuote(
self, sessionID: int, errorCode: int, isLast: int, info: TapAPIQuoteWhole self, sessionID: int, errorCode: int, isLast: str, info: TapAPIQuoteWhole
) -> Any: ):
print("OnRspSubscribeQuote") if errorCode != TAPIERROR_SUCCEED:
if self.gateway.if_error_write_log(errorCode, "OnRspSubscribeQuote"): self.gateway.write_log("订阅行情失败")
return else:
self.update_tick(info)
def OnRspUnSubscribeQuote( def OnRtnQuote(self, info: TapAPIQuoteWhole):
self, sessionID: int, errorCode: int, isLast: int, info: TapAPIContract """"""
) -> Any: self.update_tick(info)
print("OnRspUnSubscribeQuote")
if self.gateway.if_error_write_log(errorCode, "OnRspUnSubscribeQuote"):
return
def OnRtnQuote(self, info: TapAPIQuoteWhole) -> Any: def update_tick(self, info: TapAPIQuoteWhole):
""""""
symbol = info.Contract.ContractNo1 symbol = info.Contract.ContractNo1
exchange = EXCHANGE_TAP2VT[info.Contract.Commodity.ExchangeNo] exchange = EXCHANGE_TAP2VT[info.Contract.Commodity.ExchangeNo]
extra_info = self.gateway.contract_extra_infos[(symbol, exchange)]
extra_info = extra_infos.get((symbol, exchange), None)
if not extra_info:
return
tick = TickData( tick = TickData(
gateway_name=self.gateway.gateway_name,
symbol=symbol, symbol=symbol,
exchange=exchange, exchange=exchange,
datetime=parse_datetime(info.DateTimeStamp), datetime=parse_datetime(info.DateTimeStamp),
@ -189,113 +287,174 @@ class QuoteNotify(ITapQuoteAPINotify):
ask_volume_3=info.QAskQty[2], ask_volume_3=info.QAskQty[2],
ask_volume_4=info.QAskQty[3], ask_volume_4=info.QAskQty[3],
ask_volume_5=info.QAskQty[4], ask_volume_5=info.QAskQty[4],
gateway_name=self.gateway.gateway_name,
) )
self.gateway.on_tick(tick=tick) self.gateway.on_tick(tick)
def connect(self, username: str, password: str, host: str, port: int, auth_code: str):
""""""
# Create API object
info = TapAPIApplicationInfo()
info.AuthCode = auth_code
class TapGateway(BaseGateway): self.api, iResult = CreateTapQuoteAPI(info)
default_setting = { if not self.api:
"auth_code": "", self.gateway.write_log("行情API初始化失败")
"quote_host": "123.15.58.21",
"quote_port": 7171,
"quote_username": "",
"quote_password": "",
}
exchanges = list(EXCHANGE_VT2TAP.keys())
def __init__(self, event_engine: EventEngine):
super().__init__(event_engine, "ITAP")
self.api: Optional[ITapQuoteAPI] = None
self.quote_notify = QuoteNotify(self)
# [symbol, exchange] : [CommodityType, CommodityNo]
self.contract_extra_infos: Dict[Tuple[str, Exchange], ContractExtraInfo] = {}
set_async_callback_exception_handler(self._async_callback_exception_handler)
def connect(self, setting: dict):
auth_code = setting["auth_code"]
quote_host = setting["quote_host"]
quote_port = setting["quote_port"]
quote_username = setting["quote_username"]
quote_password = setting["quote_password"]
# print("quote version:", GetTapQuoteAPIVersion())
# print("trade version:", GetITapTradeAPIVersion())
ai = TapAPIApplicationInfo()
ai.AuthCode = auth_code
# iResult是输出参数Python中只能用返回值作为输出。
# 具体输出了哪些可以看pyi的注释
api, iResult = CreateTapQuoteAPI(ai)
if api is None:
self.if_error_write_log(iResult, "CreateTapQuoteAPI")
return return
assert self.api is None # Set server address and port
self.api = api self.api.SetAPINotify(self)
self.api.SetHostAddress(host, port)
api.SetAPINotify(self.quote_notify) # Start connection
error_code: int = api.SetHostAddress(quote_host, quote_port) login_auth = TapAPIQuoteLoginAuth()
if self.if_error_write_log(error_code, "SetHostAddress"): login_auth.UserNo = username
return login_auth.Password = password
login_auth.ISDDA = APIYNFLAG_NO
login_auth.ISModifyPassword = APIYNFLAG_NO
# login self.api.Login(login_auth)
la = TapAPIQuoteLoginAuth()
la.UserNo = quote_username
la.Password = quote_password
la.ISDDA = APIYNFLAG_NO
la.ISModifyPassword = APIYNFLAG_NO
error_code: int = api.Login(la) # async
if self.if_error_write_log(error_code, "api.Login"):
return
def close(self): def close(self):
""""""
self.api.SetAPINotify(None) self.api.SetAPINotify(None)
self.quote_notify = None
FreeTapQuoteAPI(self.api) FreeTapQuoteAPI(self.api)
self.api = None self.api = None
pass
def subscribe(self, req: SubscribeRequest): def subscribe(self, req: SubscribeRequest):
contract = TapAPIContract() """"""
extra_info = self.contract_extra_infos[(req.symbol, req.exchange)] extra_info = extra_infos.get((req.symbol, req.exchange), None)
contract.Commodity.ExchangeNo = EXCHANGE_VT2TAP[req.exchange] if not extra_info:
contract.Commodity.CommodityType = extra_info.commodity_type self.gateway.write_log(
contract.Commodity.CommodityNo = extra_info.commodity_no f"找不到匹配的合约:{req.symbol}{req.exchange.value}")
contract.ContractNo1 = req.symbol
contract.CallOrPutFlag1 = TAPI_CALLPUT_FLAG_NONE
contract.CallOrPutFlag2 = TAPI_CALLPUT_FLAG_NONE
error_code, session_id = self.api.SubscribeQuote(contract)
if self.if_error_write_log(error_code, "api.SubscribeQuote"):
return return
def send_order(self, req: OrderRequest) -> str: tap_contract = TapAPIContract()
tap_contract.Commodity.ExchangeNo = EXCHANGE_VT2TAP[req.exchange]
tap_contract.Commodity.CommodityType = extra_info.commodity_type
tap_contract.Commodity.CommodityNo = extra_info.commodity_no
tap_contract.ContractNo1 = req.symbol
tap_contract.CallOrPutFlag1 = TAPI_CALLPUT_FLAG_NONE
tap_contract.CallOrPutFlag2 = TAPI_CALLPUT_FLAG_NONE
self.api.SubscribeQuote(tap_contract)
class TradeApi(ITapTradeAPINotify):
""""""
def __init__(self, gateway: TapGateway):
""""""
super().__init__()
self.gateway = gateway
self.api = None
def OnConnect(self):
""""""
self.gateway.write_log("交易服务器连接成功")
def OnRspLogin(self, errorCode: int, info: TapAPITradeLoginRspInfo):
""""""
if errorCode != TAPIERROR_SUCCEED:
error_msg = error_to_str(errorCode)
self.gateway.write_log(f"交易服务器登录失败:{error_msg}")
else:
self.gateway.write_log("交易服务器登录成功")
def OnRspQryAccount(
self,
sessionID: int,
errorCode: int,
isLast: str,
info: TapAPIFundData
):
req = TapAPIFundReq()
req.AccountNo = info.AccountNo
self.api.QryFund(req)
def OnRspQryFund(
self,
sessionID: int,
errorCode: int,
isLast: str,
info: TapAPIAccountInfo
):
self.update_account(info)
def update_account(self, info: TapAPIAccountInfo):
""""""
account = AccountData()
def connect(self, username: str, password: str, host: str, port: int, auth_code: str):
""""""
# Create API object
info = TapTradeAPIApplicationInfo()
info.AuthCode = auth_code
self.api, iResult = CreateITapTradeAPI(info)
if not self.api:
self.gateway.write_log("交易API初始化失败")
return
# Set server address and port
self.api.SetAPINotify(self)
self.api.SetHostAddress(host, port, False)
# Start connection
login_auth = TapAPITradeLoginAuth()
login_auth.UserNo = username
login_auth.Password = password
login_auth.ISModifyPassword = APIYNFLAG_NO
self.api.Login(login_auth)
def send_order(self, req: OrderRequest):
""""""
pass pass
def cancel_order(self, req: CancelRequest): def cancel_order(self, req: CancelRequest):
""""""
pass pass
def query_account(self): def query_account(self):
pass """"""
req = TapAPIAccQryReq()
self.api.QryAccount(req)
def query_position(self): def query_position(self):
""""""
pass pass
def _async_callback_exception_handler(self, e: AsyncDispatchException):
error_str = (f"发生内部错误,位置:\n{e.instance}.{e.function_name},详细信息:\n"
f"{e.what}\n"
)
print(error_str, file=sys.stderr, flush=True)
self.write_log(error_str)
return True
def if_error_write_log(self, error_code: int, function: str): def parse_datetime(dt_str: str):
""" """"""
检查返回值如果发生错误调用write_log报告该错误并返回True try:
:return: 若有错误发生返回True dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S.%f")
""" except ValueError:
if TAPIERROR_SUCCEED != error_code: dt = datetime(1970, 1, 1)
error_msg = f"调用{function}时出错:\n{error_to_str(error_code)}" return dt
self.write_log(error_msg)
print(error_msg, file=sys.stderr)
return True def error_to_str(err_code: int) -> str:
""""""
try:
return error_map[err_code]
except KeyError:
return f"Unknown error({err_code})"
@dataclass
class ExtraInfo:
""""""
name: str
commodity_type: int
commodity_no: str
@dataclass
class CommodityInfo:
""""""
name: str
size: int
pricetick: float