diff --git a/vnpy/gateway/xtp/xtp_gateway.py b/vnpy/gateway/xtp/xtp_gateway.py index 57f0b1de..0ffb7cf4 100644 --- a/vnpy/gateway/xtp/xtp_gateway.py +++ b/vnpy/gateway/xtp/xtp_gateway.py @@ -1,4 +1,3 @@ -import sys from typing import Any, Sequence from vnpy.api.xtp.vnxtp import ( @@ -10,24 +9,33 @@ from vnpy.api.xtp.vnxtp import ( XTPSpecificTickerStruct, XTPTickByTickStruct, XTPTickerPriceInfo, + XTPQueryOrderReq, + XTPQueryTraderReq, + XTPOrderInsertInfo, XTP_EXCHANGE_TYPE, XTP_LOG_LEVEL, XTP_PROTOCOL_TYPE, - set_async_callback_exception_handler, - AsyncDispatchException, - XTP_TICKER_TYPE, + XTP_TICKER_TYPE_STOCK, + XTP_TICKER_TYPE_INDEX, + XTP_TICKER_TYPE_FUND, + XTP_TICKER_TYPE_BOND, + XTP_TICKER_TYPE_OPTION, + XTP_PROTOCOL_TCP, + XTP_PROTOCOL_UDP, + XTP_TERT_RESTART, + XTP_SIDE_BUY, + XTP_SIDE_SELL, + XTP_PRICE_LIMIT, + XTP_PRICE_BEST5_OR_CANCEL, + XTP_BUSINESS_TYPE_CASH ) from vnpy.event import EventEngine -from vnpy.trader.constant import Exchange, Product +from vnpy.trader.constant import Exchange, Product, Direction, OrderType from vnpy.trader.gateway import BaseGateway -from vnpy.trader.object import ( - CancelRequest, - OrderRequest, - SubscribeRequest, - TickData, - ContractData, -) +from vnpy.trader.object import (CancelRequest, OrderRequest, SubscribeRequest, + TickData, ContractData, OrderData, TradeData, + PositionData, AccountData) from vnpy.trader.utility import get_folder_path API = XTP.API @@ -39,17 +47,31 @@ EXCHANGE_XTP2VT = { EXCHANGE_VT2XTP = {v: k for k, v in EXCHANGE_XTP2VT.items()} PRODUCT_XTP2VT = { - XTP_TICKER_TYPE.XTP_TICKER_TYPE_STOCK: Product.EQUITY, - XTP_TICKER_TYPE.XTP_TICKER_TYPE_INDEX: Product.INDEX, - XTP_TICKER_TYPE.XTP_TICKER_TYPE_FUND: Product.FUND, - XTP_TICKER_TYPE.XTP_TICKER_TYPE_BOND: Product.BOND, - XTP_TICKER_TYPE.XTP_TICKER_TYPE_OPTION: Product.OPTION, + XTP_TICKER_TYPE_STOCK: Product.EQUITY, + XTP_TICKER_TYPE_INDEX: Product.INDEX, + XTP_TICKER_TYPE_FUND: Product.FUND, + XTP_TICKER_TYPE_BOND: Product.BOND, + XTP_TICKER_TYPE_OPTION: Product.OPTION } +DIRECTION_VT2XTP = { + Direction.LONG: XTP_SIDE_BUY, + Direction.SHORT: XTP_SIDE_SELL +} +DIRECTION_XTP2VT = {v: k for k, v in DIRECTION_VT2XTP.items()} + +ORDERTYPE_VT2XTP = { + OrderType.LIMIT: XTP_PRICE_LIMIT, + OrderType.MARKET: XTP_PRICE_BEST5_OR_CANCEL +} +ORDERTYPE_XTP2VT = {v: k for k, v in ORDERTYPE_VT2XTP.items()} + + symbol_name_map = {} class XtpGateway(BaseGateway): + default_setting = { "账号": "", "密码": "", @@ -58,7 +80,7 @@ class XtpGateway(BaseGateway): "行情端口": 0, "交易地址": "", "交易端口": 0, - "行情协议": ["TCP", "UDP"], + "行情协议": ["TCP", "UDP"] } def __init__(self, event_engine: EventEngine): @@ -66,56 +88,56 @@ class XtpGateway(BaseGateway): super().__init__(event_engine, "XTP") self.quote_api = XtpQuoteApi(self) - - set_async_callback_exception_handler(self._async_callback_exception_handler) - pass + self.trader_api = XtpTraderApi(self) def connect(self, setting: dict): """""" - userid = setting["账号"] - password = setting["密码"] - client_id = int(setting["客户号"]) - quote_ip = setting["行情地址"] - quote_port = int(setting["行情端口"]) - trade_ip = setting["交易地址"] - trade_port = setting["交易端口"] + userid = setting['账号'] + password = setting['密码'] + client_id = setting['客户号'] + quote_ip = setting['行情地址'] + quote_port = setting['行情端口'] + trader_ip = setting['交易地址'] + trader_port = setting['交易端口'] quote_protocol = setting["行情协议"] - self.quote_api.connect( - userid, password, client_id, quote_ip, quote_port, quote_protocol - ) + self.quote_api.connect(userid, password, client_id, + quote_ip, quote_port, quote_protocol) + self.trader_api.connect(userid, password, client_id, + trader_ip, trader_port) def close(self): """""" - pass + self.quote_api.close() + self.trader_api.close() def subscribe(self, req: SubscribeRequest): """""" self.quote_api.subscrbie(req) def send_order(self, req: OrderRequest) -> str: - pass + """""" + return self.trader_api.send_order(req) def cancel_order(self, req: CancelRequest): - pass + """""" + self.trader_api.cancel_order(req) def query_account(self): - pass + """""" + self.trader_api.query_account() def query_position(self): - pass - - def _async_callback_exception_handler(self, e: AsyncDispatchException): - error_str = f"发生内部错误:\n" f"位置:{e.instance}.{e.function_name}" f"详细信息:{e.what}" - print(error_str, file=sys.stderr) - - self.write_log(error_str) # write_error function? + """""" + self.trader_api.query_position() class XtpQuoteApi(API.QuoteSpi): + def __init__(self, gateway: BaseGateway): """""" - super(XtpQuoteApi, self).__init__() + super().__init__() + self.gateway = gateway self.gateway_name = gateway.gateway_name @@ -132,10 +154,10 @@ class XtpQuoteApi(API.QuoteSpi): self, userid: str, password: str, - client_id: int, + client_id: str, server_ip: str, - server_port: int, - quote_protocol: str, + server_port: str, + quote_protocol: str ): """""" if self.api: @@ -148,15 +170,17 @@ class XtpQuoteApi(API.QuoteSpi): self.server_port = server_port if quote_protocol == "CTP": - self.quote_protocol = XTP_PROTOCOL_TYPE.XTP_PROTOCOL_TCP + self.quote_protocol = XTP_PROTOCOL_TCP else: - self.quote_protocol = XTP_PROTOCOL_TYPE.XTP_PROTOCOL_UDP + self.quote_protocol = XTP_PROTOCOL_UDP # Create API object path = str(get_folder_path(self.gateway_name.lower())) self.api = API.QuoteApi.CreateQuoteApi( - self.client_id, path, XTP_LOG_LEVEL.XTP_LOG_LEVEL_TRACE + self.client_id, + path, + XTP_LOG_LEVEL.XTP_LOG_LEVEL_TRACE ) self.api.RegisterSpi(self) @@ -168,17 +192,22 @@ class XtpQuoteApi(API.QuoteSpi): self.server_port, self.userid, self.password, - self.quote_protocol, + self.quote_protocol ) if not ret: msg = "行情服务器登录成功" - self.query_contract() else: msg = f"行情服务器登录失败,原因:{ret}" self.gateway.write_log(msg) + def close(self): + """""" + if self.api: + self.api.RegisterSpi(None) + self.api.Release() + def subscrbie(self, req: SubscribeRequest): """""" xtp_exchange = EXCHANGE_VT2XTP.get(req.exchange, "") @@ -187,13 +216,11 @@ class XtpQuoteApi(API.QuoteSpi): def query_contract(self): """""" for exchange_id in EXCHANGE_XTP2VT.keys(): - ret = self.api.QueryAllTickers(exchange_id) - if ret != 0: - self.gateway.write_log("订阅合约失败") + self.api.QueryAllTickers(exchange_id) def check_error(self, func_name: str, error_info: XTPRspInfoStruct): """""" - if error_info and error_info.error_id: + if error_info.error_id: msg = f"{func_name}发生错误, 代码:{error_info.error_id},信息:{error_info.error_msg}" self.gateway.write_log(msg) return True @@ -208,37 +235,21 @@ class XtpQuoteApi(API.QuoteSpi): """""" self.check_error("行情接口", error_info) - def OnSubMarketData( - self, - ticker: XTPSpecificTickerStruct, - error_info: XTPRspInfoStruct, - is_last: bool, - ) -> Any: + def OnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: """""" self.check_error("订阅行情", error_info) - return super().OnSubMarketData(ticker, error_info, is_last) - def OnUnSubMarketData( - self, - ticker: XTPSpecificTickerStruct, - error_info: XTPRspInfoStruct, - is_last: bool, - ) -> Any: + def OnUnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: """""" pass - def OnDepthMarketData( - self, - market_data: XTPMarketDataStruct, - bid1_qty: Sequence[int], - bid1_count: int, - max_bid1_count: int, - ask1_qty: Sequence[int], - ask1_count: int, - max_ask1_count: int, - ) -> Any: + def OnDepthMarketData(self, market_data: XTPMarketDataStruct, bid1_qty: Sequence[int], + bid1_count: int, max_bid1_count: int, ask1_qty: Sequence[int], + ask1_count: int, max_ask1_count: int) -> Any: """""" - timestamp = market_data.data_time + timestamp = market_data.date_time tick = TickData( symbol=market_data.ticker, @@ -272,27 +283,19 @@ class XtpQuoteApi(API.QuoteSpi): ask_volume_3=market_data.ask_qty[2], ask_volume_4=market_data.ask_qty[3], ask_volume_5=market_data.ask_qty[4], - gateway_name=self.gateway_name, + gateway_name=self.gateway_name ) tick.name = symbol_name_map.get(tick.vt_symbol, tick.symbol) self.gateway.on_tick(tick) - def OnSubOrderBook( - self, - ticker: XTPSpecificTickerStruct, - error_info: XTPRspInfoStruct, - is_last: bool, - ) -> Any: + def OnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: """""" pass - def OnUnSubOrderBook( - self, - ticker: XTPSpecificTickerStruct, - error_info: XTPRspInfoStruct, - is_last: bool, - ) -> Any: + def OnUnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: """""" pass @@ -300,21 +303,13 @@ class XtpQuoteApi(API.QuoteSpi): """""" pass - def OnSubTickByTick( - self, - ticker: XTPSpecificTickerStruct, - error_info: XTPRspInfoStruct, - is_last: bool, - ) -> Any: + def OnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: """""" pass - def OnUnSubTickByTick( - self, - ticker: XTPSpecificTickerStruct, - error_info: XTPRspInfoStruct, - is_last: bool, - ) -> Any: + def OnUnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: """""" pass @@ -322,48 +317,39 @@ class XtpQuoteApi(API.QuoteSpi): """""" pass - def OnSubscribeAllMarketData( - self, exchange_id: XTP_EXCHANGE_TYPE, error_info: XTPRspInfoStruct - ) -> Any: + def OnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: """""" pass - def OnUnSubscribeAllMarketData( - self, exchange_id: XTP_EXCHANGE_TYPE, error_info: XTPRspInfoStruct - ) -> Any: + def OnUnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: """""" pass - def OnSubscribeAllOrderBook( - self, exchange_id: XTP_EXCHANGE_TYPE, error_info: XTPRspInfoStruct - ) -> Any: + def OnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: """""" pass - def OnUnSubscribeAllOrderBook( - self, exchange_id: XTP_EXCHANGE_TYPE, error_info: XTPRspInfoStruct - ) -> Any: + def OnUnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: """""" pass - def OnSubscribeAllTickByTick( - self, exchange_id: XTP_EXCHANGE_TYPE, error_info: XTPRspInfoStruct - ) -> Any: + def OnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: """""" pass - def OnUnSubscribeAllTickByTick( - self, exchange_id: XTP_EXCHANGE_TYPE, error_info: XTPRspInfoStruct - ) -> Any: + def OnUnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: """""" pass - def OnQueryAllTickers( - self, - ticker_info: XTPQuoteStaticInfo, - error_info: XTPRspInfoStruct, - is_last: bool, - ) -> Any: + def OnQueryAllTickers(self, ticker_info: XTPQuoteStaticInfo, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" if self.check_error("查询合约", error_info): return @@ -375,53 +361,166 @@ class XtpQuoteApi(API.QuoteSpi): size=1, pricetick=ticker_info.pricetick, min_volume=ticker_info.buy_qty_unit, - gateway_name=self.gateway_name, + gateway_name=self.gateway_name ) self.gateway.on_contract(contract) symbol_name_map[contract.vt_symbol] = contract.name - def OnQueryTickersPriceInfo( + def OnQueryTickersPriceInfo(self, ticker_info: XTPTickerPriceInfo, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + pass + + def OnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + +class XtpTraderApi(API.TraderSpi): + + def __init__(self, gateway: BaseGateway): + """""" + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.userid = "" + self.password = "" + self.client_id = "" + self.server_ip = "" + self.server_port = "" + + self.api = None + self.session_id = 0 + + self.reqid = 0 + self.orderid = 0 + + def connect( self, - ticker_info: XTPTickerPriceInfo, - error_info: XTPRspInfoStruct, - is_last: bool, - ) -> Any: + userid: str, + password: str, + client_id: str, + server_ip: str, + server_port: str + ): """""" - pass + if self.api: + return - def OnSubscribeAllOptionMarketData( - self, exchange_id: XTP_EXCHANGE_TYPE, error_info: XTPRspInfoStruct - ) -> Any: - """""" - pass + self.userid = userid + self.password = password + self.client_id = client_id + self.server_ip = server_ip + self.server_port = server_port - def OnUnSubscribeAllOptionMarketData( - self, exchange_id: XTP_EXCHANGE_TYPE, error_info: XTPRspInfoStruct - ) -> Any: - """""" - pass + # Create API object + path = str(get_folder_path(self.gateway_name.lower())) - def OnSubscribeAllOptionOrderBook( - self, exchange_id: XTP_EXCHANGE_TYPE, error_info: XTPRspInfoStruct - ) -> Any: - """""" - pass + self.api = API.TraderApi.CreateTraderApi( + self.client_id, + path, + XTP_LOG_LEVEL.XTP_LOG_LEVEL_TRACE + ) - def OnUnSubscribeAllOptionOrderBook( - self, exchange_id: XTP_EXCHANGE_TYPE, error_info: XTPRspInfoStruct - ) -> Any: - """""" - pass + self.api.RegisterSpi(self) + self.api.SubscribePublicTopic(XTP_TERT_RESTART) - def OnSubscribeAllOptionTickByTick( - self, exchange_id: XTP_EXCHANGE_TYPE, error_info: XTPRspInfoStruct - ) -> Any: - """""" - pass + self.gateway.write_log("交易接口初始化成功") - def OnUnSubscribeAllOptionTickByTick( - self, exchange_id: XTP_EXCHANGE_TYPE, error_info: XTPRspInfoStruct - ) -> Any: + # Login to server + self.session_id = self.api.Login( + self.server_ip, + self.server_port, + self.userid, + self.password, + XTP_PROTOCOL_TCP + ) + + if self.session_id: + msg = "交易服务器登录成功" + else: + reason = self.api.GetApiLastError() + msg = f"交易服务器登录失败,原因:{reason}" + + self.gateway.write_log(msg) + + def close(self): """""" - pass + if self.api: + self.api.RegisterSpi(None) + self.api.Release() + + def send_order(self, req: OrderRequest) -> str: + """""" + self.orderid += 1 + + xtp_req = XTPOrderInsertInfo() + xtp_req.ticker = req.symbol + xtp_req.market = EXCHANGE_XTP2VT[req.exchange] + xtp_req.price = req.price + xtp_req.quantity = req.volume + xtp_req.order_client_id = self.client_id + xtp_req.order_xtp_id = self.orderid + xtp_req.side = DIRECTION_XTP2VT.get(req.direction, "") + xtp_req.price_type = ORDERTYPE_XTP2VT.get(req.type, "") + xtp_req.business_type = XTP_BUSINESS_TYPE_CASH + + self.api.InsertOrder(xtp_req, self.session_id) + + order = req.create_order_data(str(self.orderid)) + self.gateway.on_order(order) + + return order.vt_orderid + + def cancel_order(self, req: CancelRequest): + """""" + self.api.CancelOrder(req.orderid, self.session_id) + + def query_account(self): + """""" + self.reqid += 1 + self.api.QueryAsset(self.session_id, self.reqid) + + def query_position(self): + """""" + self.reqid += 1 + self.api.QueryPosition("", self.session_id, self.reqid) + + def query_order(self): + """""" + self.reqid += 1 + self.api.QueryOrders(XTPQueryOrderReq(), self.session_id, self.reqid) + + def query_trade(self): + """""" + self.reqid += 1 + self.api.QueryTrades(XTPQueryTraderReq(), self.session_id, self.reqid)