diff --git a/tests/trader/run.py b/tests/trader/run.py index 6d38ac31..2702f92b 100644 --- a/tests/trader/run.py +++ b/tests/trader/run.py @@ -7,8 +7,8 @@ from vnpy.trader.ui import MainWindow, create_qapp from vnpy.gateway.bitmex import BitmexGateway from vnpy.gateway.futu import FutuGateway from vnpy.gateway.ib import IbGateway -# from vnpy.gateway.ctp import CtpGateway -from vnpy.gateway.femas import FemasGateway +from vnpy.gateway.ctp import CtpGateway +#from vnpy.gateway.femas import FemasGateway from vnpy.gateway.tiger import TigerGateway from vnpy.gateway.oes import OesGateway from vnpy.gateway.okex import OkexGateway @@ -16,6 +16,7 @@ from vnpy.gateway.huobi import HuobiGateway from vnpy.gateway.bitfinex import BitfinexGateway from vnpy.gateway.onetoken import OnetokenGateway from vnpy.gateway.okexf import OkexfGateway +from vnpy.gateway.xtp import XtpGateway from vnpy.app.cta_strategy import CtaStrategyApp from vnpy.app.csv_loader import CsvLoaderApp @@ -30,8 +31,9 @@ def main(): event_engine = EventEngine() main_engine = MainEngine(event_engine) - # main_engine.add_gateway(CtpGateway) - main_engine.add_gateway(FemasGateway) + main_engine.add_gateway(XtpGateway) + main_engine.add_gateway(CtpGateway) + # main_engine.add_gateway(FemasGateway) main_engine.add_gateway(IbGateway) main_engine.add_gateway(FutuGateway) main_engine.add_gateway(BitmexGateway) diff --git a/vnpy/gateway/xtp/xtp_gateway.py b/vnpy/gateway/xtp/xtp_gateway.py index 91b72d3f..c3d8ac85 100644 --- a/vnpy/gateway/xtp/xtp_gateway.py +++ b/vnpy/gateway/xtp/xtp_gateway.py @@ -1,184 +1,94 @@ from typing import Any, Sequence -from vnpy.api.xtp.vnxtp import (OrderBookStruct, XTP, XTPMarketDataStruct, XTPQuoteStaticInfo, - XTPRspInfoStruct, XTPSpecificTickerStruct, XTPTickByTickStruct, - XTPTickerPriceInfo, XTP_EXCHANGE_TYPE, XTP_LOG_LEVEL, - XTP_PROTOCOL_TYPE) +from vnpy.api.xtp.vnxtp import ( + OrderBookStruct, + XTP, + XTPMarketDataStruct, + XTPQuoteStaticInfo, + XTPRspInfoStruct, + XTPSpecificTickerStruct, + XTPTickByTickStruct, + XTPTickerPriceInfo, + XTP_EXCHANGE_TYPE, + XTP_LOG_LEVEL, + XTP_PROTOCOL_TYPE, + XTP_TICKER_TYPE_STOCK, + XTP_TICKER_TYPE_INDEX, + XTP_TICKER_TYPE_FUND, + XTP_TICKER_TYPE_BOND, + XTP_TICKER_TYPE_OPTION, + XTP_PROTOCOL_TCP, + XTP_PROTOCOL_UDP +) from vnpy.event import EventEngine -from vnpy.trader.constant import Exchange +from vnpy.trader.constant import Exchange, Product from vnpy.trader.gateway import BaseGateway -from vnpy.trader.object import CancelRequest, OrderRequest, SubscribeRequest +from vnpy.trader.object import (CancelRequest, OrderRequest, SubscribeRequest, + TickData, ContractData, OrderData, TradeData, + PositionData, AccountData) +from vnpy.trader.utility import get_folder_path API = XTP.API + EXCHANGE_XTP2VT = { XTP_EXCHANGE_TYPE.XTP_EXCHANGE_SH: Exchange.SSE, XTP_EXCHANGE_TYPE.XTP_EXCHANGE_SZ: Exchange.SZSE, } EXCHANGE_VT2XTP = {v: k for k, v in EXCHANGE_XTP2VT.items()} +PRODUCT_XTP2VT = { + XTP_TICKER_TYPE_STOCK: Product.EQUITY, + XTP_TICKER_TYPE_INDEX: Product.INDEX, + XTP_TICKER_TYPE_FUND: Product.FUND, + XTP_TICKER_TYPE_BOND: Product.BOND, + XTP_TICKER_TYPE_OPTION: Product.OPTION +} -class QuoteSpi(API.QuoteSpi): - def OnDisconnected(self, reason: int) -> Any: - print("OnDisconnected") - return super().OnDisconnected(reason) - - def OnError(self, error_info: XTPRspInfoStruct) -> Any: - return super().OnError(error_info) - - def OnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - print("OnSubMarketData") - return super().OnSubMarketData(ticker, error_info, is_last) - - def OnUnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - return super().OnUnSubMarketData(ticker, error_info, is_last) - - def OnDepthMarketData(self, market_data: XTPMarketDataStruct, bid1_qty: Sequence[int], - bid1_count: int, max_bid1_count: int, ask1_qty: Sequence[int], - ask1_count: int, max_ask1_count: int) -> Any: - print("OnDepthMarketData") - return super().OnDepthMarketData(market_data, bid1_qty, bid1_count, max_bid1_count, - ask1_qty, ask1_count, max_ask1_count) - - def OnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - return super().OnSubOrderBook(ticker, error_info, is_last) - - def OnUnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - return super().OnUnSubOrderBook(ticker, error_info, is_last) - - def OnOrderBook(self, order_book: OrderBookStruct) -> Any: - return super().OnOrderBook(order_book) - - def OnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - return super().OnSubTickByTick(ticker, error_info, is_last) - - def OnUnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - return super().OnUnSubTickByTick(ticker, error_info, is_last) - - def OnTickByTick(self, tbt_data: XTPTickByTickStruct) -> Any: - return super().OnTickByTick(tbt_data) - - def OnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnSubscribeAllMarketData(exchange_id, error_info) - - def OnUnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnUnSubscribeAllMarketData(exchange_id, error_info) - - def OnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnSubscribeAllOrderBook(exchange_id, error_info) - - def OnUnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnUnSubscribeAllOrderBook(exchange_id, error_info) - - def OnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnSubscribeAllTickByTick(exchange_id, error_info) - - def OnUnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnUnSubscribeAllTickByTick(exchange_id, error_info) - - def OnQueryAllTickers(self, ticker_info: XTPQuoteStaticInfo, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - return super().OnQueryAllTickers(ticker_info, error_info, is_last) - - def OnQueryTickersPriceInfo(self, ticker_info: XTPTickerPriceInfo, error_info: XTPRspInfoStruct, - is_last: bool) -> Any: - return super().OnQueryTickersPriceInfo(ticker_info, error_info, is_last) - - def OnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnSubscribeAllOptionMarketData(exchange_id, error_info) - - def OnUnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnUnSubscribeAllOptionMarketData(exchange_id, error_info) - - def OnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnSubscribeAllOptionOrderBook(exchange_id, error_info) - - def OnUnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnUnSubscribeAllOptionOrderBook(exchange_id, error_info) - - def OnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnSubscribeAllOptionTickByTick(exchange_id, error_info) - - def OnUnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, - error_info: XTPRspInfoStruct) -> Any: - return super().OnUnSubscribeAllOptionTickByTick(exchange_id, error_info) +symbol_name_map = {} class XtpGateway(BaseGateway): - def __init__(self, event_engine: "EventEngine"): - self.client_id: int = 1 - self.quote_api = API.QuoteApi.CreateQuoteApi( - self.client_id, # todo: change client id - "log", # todo: use vnpy temp path - XTP_LOG_LEVEL.XTP_LOG_LEVEL_TRACE - ) - self.quote_spi = QuoteSpi() - super().__init__(event_engine, "XTP") - default_setting = { - "client_id": "0", - "quote_server_ip": "", - "quote_server_port": "", - "quote_server_protocol": ["TCP", "UDP"], - "quote_userid": "", - "quote_password": "", + "账号": "", + "密码": "", + "客户号": 1, + "行情地址": "", + "行情端口": 0, + "交易地址": "", + "交易端口": 0, + "行情协议": ["TCP", "UDP"] } + def __init__(self, event_engine: EventEngine): + """""" + super().__init__(event_engine, "XTP") + + self.quote_api = XtpQuoteApi(self) + def connect(self, setting: dict): - self.client_id = int(setting['client_id']) - quote_server_ip = setting['quote_server_ip'] - quote_server_port = int(setting['quote_server_port']) - quote_server_protocol = setting['quote_server_protocol'] - quote_userid = setting['quote_userid'] - quote_password = setting['quote_password'] + """""" + userid = setting['账号'] + password = setting['密码'] + client_id = setting['客户号'] + quote_ip = setting['行情地址'] + quote_port = setting['行情端口'] + trade_ip = setting['交易地址'] + trade_port = setting['交易端口'] + quote_protocol = setting["行情协议"] - quote_protocol = XTP_PROTOCOL_TYPE.XTP_PROTOCOL_TCP if quote_server_protocol == 'TCP' else 'UDP' - - self.quote_api.RegisterSpi(self.quote_spi) - # self.quote_api.SetHeartBeatInterval(60) - - ret = self.quote_api.Login( - quote_server_ip, - quote_server_port, - quote_userid, - quote_password, - quote_protocol - ) - if ret == 0: - # login succeed - self.write_log("Login succeed.") - pass + self.quote_api.connect(userid, password, client_id, + quote_ip, quote_port, quote_protocol) def close(self): + """""" pass def subscribe(self, req: SubscribeRequest): - ret = self.quote_api.SubscribeMarketData( - [req.symbol], - EXCHANGE_VT2XTP[req.exchange], - ) - if ret != 0: - print("订阅行情失败") # improve: return True or False, or raise with reason - pass + """""" + self.quote_api.subscrbie(req) def send_order(self, req: OrderRequest) -> str: pass @@ -191,3 +101,268 @@ class XtpGateway(BaseGateway): def query_position(self): pass + + +class XtpQuoteApi(API.QuoteSpi): + + def __init__(self, gateway: BaseGateway): + """""" + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.userid = "" + self.password = "" + self.client_id = "" + self.server_ip = "" + self.server_port = "" + self.server_protocol = "" + + self.api = None + + def connect( + self, + userid: str, + password: str, + client_id: str, + server_ip: str, + server_port: str, + quote_protocol: str + ): + """""" + if self.api: + return + + self.userid = userid + self.password = password + self.client_id = client_id + self.server_ip = server_ip + self.server_port = server_port + + if quote_protocol == "CTP": + self.quote_protocol = XTP_PROTOCOL_TCP + else: + self.quote_protocol = XTP_PROTOCOL_UDP + + # Create API object + path = str(get_folder_path(self.gateway_name.lower())) + + self.api = API.QuoteApi.CreateQuoteApi( + self.client_id, + path, + XTP_LOG_LEVEL.XTP_LOG_LEVEL_TRACE + ) + + self.api.RegisterSpi(self) + self.gateway.write_log("行情接口初始化成功") + + # Login to server + ret = self.api.Login( + self.server_ip, + self.server_port, + self.userid, + self.password, + self.quote_protocol + ) + + if not ret: + msg = "行情服务器登录成功" + + self.query_contract() + else: + msg = f"行情服务器登录失败,原因:{ret}" + self.gateway.write_log(msg) + + def subscrbie(self, req: SubscribeRequest): + """""" + xtp_exchange = EXCHANGE_VT2XTP.get(req.exchange, "") + self.api.SubscribeMarketData([req.symbol], xtp_exchange) + + def query_contract(self): + """""" + for exchange_id in EXCHANGE_XTP2VT.keys(): + self.api.QueryAllTickers(exchange_id) + + def check_error(self, func_name: str, error_info: XTPRspInfoStruct): + """""" + if error_info.error_id: + msg = f"{func_name}发生错误, 代码:{error_info.error_id},信息:{error_info.error_msg}" + self.gateway.write_log(msg) + return True + else: + return False + + def OnDisconnected(self, reason: int) -> Any: + """""" + self.gateway.write_log("行情服务器连接断开") + + def OnError(self, error_info: XTPRspInfoStruct) -> Any: + """""" + self.check_error("行情接口", error_info) + + def OnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + self.check_error("订阅行情", error_info) + + def OnUnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + pass + + def OnDepthMarketData(self, market_data: XTPMarketDataStruct, bid1_qty: Sequence[int], + bid1_count: int, max_bid1_count: int, ask1_qty: Sequence[int], + ask1_count: int, max_ask1_count: int) -> Any: + """""" + timestamp = market_data.date_time + + tick = TickData( + symbol=market_data.ticker, + exchange=EXCHANGE_XTP2VT[market_data.exchange_id], + datetime=timestamp, + volume=market_data.qty, + last_price=market_data.last_price, + limit_up=market_data.upper_limit_price, + limit_down=market_data.lower_limit_price, + open_price=market_data.open_price, + high_price=market_data.high_price, + low_price=market_data.low_price, + pre_close=market_data.pre_close_price, + bid_price_1=market_data.bid[0], + bid_price_2=market_data.bid[1], + bid_price_3=market_data.bid[2], + bid_price_4=market_data.bid[3], + bid_price_5=market_data.bid[4], + ask_price_1=market_data.ask[0], + ask_price_2=market_data.ask[1], + ask_price_3=market_data.ask[2], + ask_price_4=market_data.ask[3], + ask_price_5=market_data.ask[4], + bid_volume_1=market_data.bid_qty[0], + bid_volume_2=market_data.bid_qty[1], + bid_volume_3=market_data.bid_qty[2], + bid_volume_4=market_data.bid_qty[3], + bid_volume_5=market_data.bid_qty[4], + ask_volume_1=market_data.ask_qty[0], + ask_volume_2=market_data.ask_qty[1], + ask_volume_3=market_data.ask_qty[2], + ask_volume_4=market_data.ask_qty[3], + ask_volume_5=market_data.ask_qty[4], + gateway_name=self.gateway_name + ) + tick.name = symbol_name_map.get(tick.vt_symbol, tick.symbol) + + self.gateway.on_tick(tick) + + def OnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + pass + + def OnUnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + pass + + def OnOrderBook(self, order_book: OrderBookStruct) -> Any: + """""" + pass + + def OnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + pass + + def OnUnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + pass + + def OnTickByTick(self, tbt_data: XTPTickByTickStruct) -> Any: + """""" + pass + + def OnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnQueryAllTickers(self, ticker_info: XTPQuoteStaticInfo, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + return + # if self.check_error("查询合约", error_info): + # return + + # contract = ContractData( + # symbol=ticker_info.ticker, + # exchange=EXCHANGE_XTP2VT[ticker_info.exchange_id], + # name=ticker_info.ticker_name, + # product=PRODUCT_XTP2VT[ticker_info.ticker_type], + # size=1, + # pricetick=ticker_info.pricetick, + # min_volume=ticker_info.buy_qty_unit, + # gateway_name=self.gateway_name + # ) + # self.gateway.on_contract(contract) + + # symbol_name_map[contract.vt_symbol] = contract.name + + def OnQueryTickersPriceInfo(self, ticker_info: XTPTickerPriceInfo, error_info: XTPRspInfoStruct, + is_last: bool) -> Any: + """""" + pass + + def OnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass + + def OnUnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE, + error_info: XTPRspInfoStruct) -> Any: + """""" + pass