diff --git a/vnpy/gateway/xtp/xtp_gateway.py b/vnpy/gateway/xtp/xtp_gateway.py index 0ffb7cf4..c4a66ba2 100644 --- a/vnpy/gateway/xtp/xtp_gateway.py +++ b/vnpy/gateway/xtp/xtp_gateway.py @@ -1,8 +1,11 @@ from typing import Any, Sequence +from datetime import datetime from vnpy.api.xtp.vnxtp import ( OrderBookStruct, XTP, + set_async_callback_exception_handler, + AsyncDispatchException, XTPMarketDataStruct, XTPQuoteStaticInfo, XTPRspInfoStruct, @@ -12,9 +15,20 @@ from vnpy.api.xtp.vnxtp import ( XTPQueryOrderReq, XTPQueryTraderReq, XTPOrderInsertInfo, + XTPOrderInfo, + XTPTradeReport, + XTPOrderCancelInfo, + XTPQueryStkPositionRsp, + XTPQueryAssetRsp, + XTPStructuredFundInfo, + XTPFundTransferNotice, + XTPQueryETFBaseRsp, + XTPQueryETFComponentRsp, + XTPQueryIPOTickerRsp, + XTPQueryIPOQuotaRsp, + XTPQueryOptionAuctionInfoRsp, XTP_EXCHANGE_TYPE, XTP_LOG_LEVEL, - XTP_PROTOCOL_TYPE, XTP_TICKER_TYPE_STOCK, XTP_TICKER_TYPE_INDEX, XTP_TICKER_TYPE_FUND, @@ -27,11 +41,18 @@ from vnpy.api.xtp.vnxtp import ( XTP_SIDE_SELL, XTP_PRICE_LIMIT, XTP_PRICE_BEST5_OR_CANCEL, - XTP_BUSINESS_TYPE_CASH + XTP_BUSINESS_TYPE_CASH, + XTP_ORDER_STATUS_INIT, + XTP_ORDER_STATUS_ALLTRADED, + XTP_ORDER_STATUS_PARTTRADEDQUEUEING, + XTP_ORDER_STATUS_PARTTRADEDNOTQUEUEING, + XTP_ORDER_STATUS_NOTRADEQUEUEING, + XTP_ORDER_STATUS_CANCELED, + XTP_ORDER_STATUS_REJECTED ) from vnpy.event import EventEngine -from vnpy.trader.constant import Exchange, Product, Direction, OrderType +from vnpy.trader.constant import Exchange, Product, Direction, OrderType, Status from vnpy.trader.gateway import BaseGateway from vnpy.trader.object import (CancelRequest, OrderRequest, SubscribeRequest, TickData, ContractData, OrderData, TradeData, @@ -66,6 +87,16 @@ ORDERTYPE_VT2XTP = { } ORDERTYPE_XTP2VT = {v: k for k, v in ORDERTYPE_VT2XTP.items()} +STATUS_XTP2VT = { + XTP_ORDER_STATUS_INIT: Status.SUBMITTING, + XTP_ORDER_STATUS_ALLTRADED: Status.ALLTRADED, + XTP_ORDER_STATUS_PARTTRADEDQUEUEING: Status.PARTTRADED, + XTP_ORDER_STATUS_PARTTRADEDNOTQUEUEING: Status.PARTTRADED, + XTP_ORDER_STATUS_NOTRADEQUEUEING: Status.NOTTRADED, + XTP_ORDER_STATUS_CANCELED: Status.CANCELLED, + XTP_ORDER_STATUS_REJECTED: Status.REJECTED, +} + symbol_name_map = {} @@ -90,6 +121,9 @@ class XtpGateway(BaseGateway): self.quote_api = XtpQuoteApi(self) self.trader_api = XtpTraderApi(self) + set_async_callback_exception_handler( + self._async_callback_exception_handler) + def connect(self, setting: dict): """""" userid = setting['账号'] @@ -101,8 +135,8 @@ class XtpGateway(BaseGateway): trader_port = setting['交易端口'] quote_protocol = setting["行情协议"] - self.quote_api.connect(userid, password, client_id, - quote_ip, quote_port, quote_protocol) + # self.quote_api.connect(userid, password, client_id, + # quote_ip, quote_port, quote_protocol) self.trader_api.connect(userid, password, client_id, trader_ip, trader_port) @@ -131,6 +165,10 @@ class XtpGateway(BaseGateway): """""" self.trader_api.query_position() + def _async_callback_exception_handler(self, e: AsyncDispatchException): + error_str = f"发生内部错误:\n" f"位置:{e.instance}.{e.function_name}" f"详细信息:{e.what}" + print(error_str) + class XtpQuoteApi(API.QuoteSpi): @@ -220,7 +258,7 @@ class XtpQuoteApi(API.QuoteSpi): def check_error(self, func_name: str, error_info: XTPRspInfoStruct): """""" - if error_info.error_id: + if error_info and error_info.error_id: msg = f"{func_name}发生错误, 代码:{error_info.error_id},信息:{error_info.error_msg}" self.gateway.write_log(msg) return True @@ -249,12 +287,13 @@ class XtpQuoteApi(API.QuoteSpi): bid1_count: int, max_bid1_count: int, ask1_qty: Sequence[int], ask1_count: int, max_ask1_count: int) -> Any: """""" - timestamp = market_data.date_time + timestamp = str(market_data.data_time) + dt = datetime.strptime(timestamp, "%Y%m%d%H%M%S%f") tick = TickData( symbol=market_data.ticker, exchange=EXCHANGE_XTP2VT[market_data.exchange_id], - datetime=timestamp, + datetime=dt, volume=market_data.qty, last_price=market_data.last_price, limit_up=market_data.upper_limit_price, @@ -359,7 +398,7 @@ class XtpQuoteApi(API.QuoteSpi): name=ticker_info.ticker_name, product=PRODUCT_XTP2VT[ticker_info.ticker_type], size=1, - pricetick=ticker_info.pricetick, + pricetick=ticker_info.price_tick, min_volume=ticker_info.buy_qty_unit, gateway_name=self.gateway_name ) @@ -367,6 +406,9 @@ class XtpQuoteApi(API.QuoteSpi): symbol_name_map[contract.vt_symbol] = contract.name + if is_last: + self.gateway.write_log(f"{contract.exchange.value}合约信息查询成功") + def OnQueryTickersPriceInfo(self, ticker_info: XTPTickerPriceInfo, error_info: XTPRspInfoStruct, is_last: bool) -> Any: """""" @@ -420,9 +462,7 @@ class XtpTraderApi(API.TraderSpi): self.api = None self.session_id = 0 - self.reqid = 0 - self.orderid = 0 def connect( self, @@ -452,8 +492,9 @@ class XtpTraderApi(API.TraderSpi): ) self.api.RegisterSpi(self) + self.api.SetSoftwareKey("vnpy_test") self.api.SubscribePublicTopic(XTP_TERT_RESTART) - + self.gateway.write_log("交易接口初始化成功") # Login to server @@ -464,7 +505,7 @@ class XtpTraderApi(API.TraderSpi): self.password, XTP_PROTOCOL_TCP ) - + if self.session_id: msg = "交易服务器登录成功" else: @@ -481,29 +522,26 @@ class XtpTraderApi(API.TraderSpi): def send_order(self, req: OrderRequest) -> str: """""" - self.orderid += 1 - xtp_req = XTPOrderInsertInfo() xtp_req.ticker = req.symbol xtp_req.market = EXCHANGE_XTP2VT[req.exchange] xtp_req.price = req.price xtp_req.quantity = req.volume xtp_req.order_client_id = self.client_id - xtp_req.order_xtp_id = self.orderid xtp_req.side = DIRECTION_XTP2VT.get(req.direction, "") xtp_req.price_type = ORDERTYPE_XTP2VT.get(req.type, "") xtp_req.business_type = XTP_BUSINESS_TYPE_CASH - self.api.InsertOrder(xtp_req, self.session_id) + orderid = self.api.InsertOrder(xtp_req, self.session_id) - order = req.create_order_data(str(self.orderid)) + order = req.create_order_data(str(orderid)) self.gateway.on_order(order) return order.vt_orderid def cancel_order(self, req: CancelRequest): """""" - self.api.CancelOrder(req.orderid, self.session_id) + self.api.CancelOrder(int(req.orderid), self.session_id) def query_account(self): """""" @@ -524,3 +562,161 @@ class XtpTraderApi(API.TraderSpi): """""" self.reqid += 1 self.api.QueryTrades(XTPQueryTraderReq(), self.session_id, self.reqid) + + def check_error(self, func_name: str, error_info: XTPRspInfoStruct): + """""" + if error_info and error_info.error_id: + msg = f"{func_name}发生错误, 代码:{error_info.error_id},信息:{error_info.error_msg}" + self.gateway.write_log(msg) + return True + else: + return False + + def OnDisconnected(self, session_id: int, reason: int) -> Any: + """""" + self.gateway.write_log("交易服务器连接断开") + + def OnError(self, error_info: XTPRspInfoStruct) -> Any: + """""" + self.check_error("交易接口", error_info) + + def OnOrderEvent(self, order_info: XTPOrderInfo, error_info: XTPRspInfoStruct, + session_id: int) -> Any: + """""" + print("on orde event") + if self.check_error("委托下单", error_info): + return + + self.updateOrder(order_info) + + def OnTradeEvent(self, trade_info: XTPTradeReport, session_id: int) -> Any: + """""" + print("on trade event") + self.updateTrade(trade_info) + + def OnCancelOrderError(self, cancel_info: XTPOrderCancelInfo, error_info: XTPRspInfoStruct, + session_id: int) -> Any: + """""" + self.check_error("委托撤单", error_info) + + def OnQueryOrder(self, order_info: XTPOrderInfo, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + if self.check_error("查询委托", error_info): + return + + self.updateOrder(order_info) + + if is_last: + self.gateway.write_log("查询委托信息成功") + + def OnQueryTrade(self, trade_info: XTPTradeReport, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + if self.check_error("查询成交", error_info): + return + + self.updateTrade(trade_info) + + if is_last: + self.gateway.write_log("查询成交信息成功") + + def OnQueryPosition(self, xtp_position: XTPQueryStkPositionRsp, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + position = PositionData( + symbol=xtp_position.ticker, + exchange=EXCHANGE_XTP2VT[xtp_position.market], + direction=Direction.NET, + volume=xtp_position.total_qty, + frozen=xtp_position.locked_position, + price=xtp_position.avg_price, + pnl=xtp_position.unrealized_pnl, + yd_volume=xtp_position.yesterday_position, + gateway_name=self.gateway_name + ) + self.gateway.on_position(position) + + def OnQueryAsset(self, asset: XTPQueryAssetRsp, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + account = AccountData( + accountid=self.userid, + balance=asset.banlance, + frozen=(asset.frozen_margin + asset.frozen_exec_cash + asset.frozen_exec_fee), + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) + + def OnQueryStructuredFund(self, fund_info: XTPStructuredFundInfo, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + pass + + def OnQueryFundTransfer(self, fund_transfer_info: XTPFundTransferNotice, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + pass + + def OnFundTransfer(self, fund_transfer_info: XTPFundTransferNotice, session_id: int) -> Any: + """""" + pass + + def OnQueryETF(self, etf_info: XTPQueryETFBaseRsp, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + pass + + def OnQueryETFBasket(self, etf_component_info: XTPQueryETFComponentRsp, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + pass + + def OnQueryIPOInfoList(self, ipo_info: XTPQueryIPOTickerRsp, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + pass + + def OnQueryIPOQuotaInfo(self, quota_info: XTPQueryIPOQuotaRsp, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + pass + + def OnQueryOptionAuctionInfo(self, option_info: XTPQueryOptionAuctionInfoRsp, error_info: XTPRspInfoStruct, + is_last: bool, session_id: int) -> Any: + """""" + pass + + def updateOrder(self, order_info: XTPOrderInfo): + """""" + order = OrderData( + symbol=order_info.ticker, + exchange=EXCHANGE_XTP2VT[order_info.market], + orderid=str(order_info.order_xtp_id), + type=ORDERTYPE_XTP2VT[order_info.price_type], + direction=DIRECTION_XTP2VT[order_info.side], + price=order_info.price, + volume=order_info.quantity, + traded=order_info.qty_traded, + status=STATUS_XTP2VT[order_info.order_status], + time=order_info.insert_time, + gateway_name=self.gateway_name + ) + + self.gateway.on_order(order) + + def updateTrade(self, trade_info: XTPTradeReport): + """""" + trade = TradeData( + symbol=trade_info.ticker, + exchange=EXCHANGE_XTP2VT[trade_info.market], + orderid=str(trade_info.order_xtp_id), + tradeid=str(trade_info.exec_id), + direction=DIRECTION_XTP2VT[trade_info.side], + price=trade_info.price, + volume=trade_info.quantity, + time=trade_info.trade_time, + gateway_name=self.gateway_name + ) + + self.gateway.on_trade(trade) \ No newline at end of file