From 7941af5be8d14afa4ffea8e56584d46bb5d33404 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Fri, 26 Apr 2019 16:39:12 +0800 Subject: [PATCH] [Mod]complete test of XtpGateway --- tests/trader/run.py | 2 +- vnpy/gateway/tiger/tiger_gateway.py | 6 - vnpy/gateway/xtp/xtp_gateway.py | 245 +++++++++++++++------------- 3 files changed, 133 insertions(+), 120 deletions(-) diff --git a/tests/trader/run.py b/tests/trader/run.py index 2702f92b..bb940a5f 100644 --- a/tests/trader/run.py +++ b/tests/trader/run.py @@ -8,7 +8,7 @@ from vnpy.gateway.bitmex import BitmexGateway from vnpy.gateway.futu import FutuGateway from vnpy.gateway.ib import IbGateway from vnpy.gateway.ctp import CtpGateway -#from vnpy.gateway.femas import FemasGateway +# from vnpy.gateway.femas import FemasGateway from vnpy.gateway.tiger import TigerGateway from vnpy.gateway.oes import OesGateway from vnpy.gateway.okex import OkexGateway diff --git a/vnpy/gateway/tiger/tiger_gateway.py b/vnpy/gateway/tiger/tiger_gateway.py index 997d3f02..1b17ca2f 100644 --- a/vnpy/gateway/tiger/tiger_gateway.py +++ b/vnpy/gateway/tiger/tiger_gateway.py @@ -299,8 +299,6 @@ class TigerGateway(BaseGateway): def on_order_change(self, tiger_account: str, data: list): """""" data = dict(data) - print("委托推送", data["origin_symbol"], - data["order_id"], data["filled"], data["status"]) symbol, exchange = convert_symbol_tiger2vt(data["origin_symbol"]) status = PUSH_STATUS_TIGER2VT[data["status"]] @@ -368,8 +366,6 @@ class TigerGateway(BaseGateway): self.ID_VT2TIGER[local_id] = str(order.order_id) self.trade_client.place_order(order) - print("发单:", order.contract.symbol, - order.order_id, order.quantity, order.status) except: # noqa traceback.print_exc() @@ -551,8 +547,6 @@ class TigerGateway(BaseGateway): self.on_order(order) self.ID_VT2TIGER = {v: k for k, v in self.ID_TIGER2VT.items()} - print("原始委托字典", self.ID_TIGER2VT) - print("原始反向字典", self.ID_VT2TIGER) def process_deal(self, data): """ diff --git a/vnpy/gateway/xtp/xtp_gateway.py b/vnpy/gateway/xtp/xtp_gateway.py index 4249bb43..726bf41e 100644 --- a/vnpy/gateway/xtp/xtp_gateway.py +++ b/vnpy/gateway/xtp/xtp_gateway.py @@ -1,19 +1,18 @@ from typing import Any, Sequence from datetime import datetime +from threading import Thread from vnpy.api.xtp.vnxtp import ( - OrderBookStruct, XTP, set_async_callback_exception_handler, AsyncDispatchException, + OrderBookStruct, XTPMarketDataStruct, XTPQuoteStaticInfo, XTPRspInfoStruct, XTPSpecificTickerStruct, XTPTickByTickStruct, XTPTickerPriceInfo, - XTPQueryOrderReq, - XTPQueryTraderReq, XTPOrderInsertInfo, XTPOrderInfo, XTPTradeReport, @@ -29,29 +28,18 @@ from vnpy.api.xtp.vnxtp import ( XTPQueryOptionAuctionInfoRsp, XTP_EXCHANGE_TYPE, XTP_LOG_LEVEL, - XTP_TICKER_TYPE_STOCK, - XTP_TICKER_TYPE_INDEX, - XTP_TICKER_TYPE_FUND, - XTP_TICKER_TYPE_BOND, - XTP_TICKER_TYPE_OPTION, - XTP_PROTOCOL_TCP, - XTP_PROTOCOL_UDP, - XTP_TERT_RESTART, + XTP_PROTOCOL_TYPE, + XTP_TE_RESUME_TYPE, XTP_SIDE_BUY, XTP_SIDE_SELL, - XTP_PRICE_LIMIT, - XTP_PRICE_BEST5_OR_CANCEL, - XTP_BUSINESS_TYPE_CASH, - XTP_ORDER_STATUS_INIT, - XTP_ORDER_STATUS_ALLTRADED, - XTP_ORDER_STATUS_PARTTRADEDQUEUEING, - XTP_ORDER_STATUS_PARTTRADEDNOTQUEUEING, - XTP_ORDER_STATUS_NOTRADEQUEUEING, - XTP_ORDER_STATUS_CANCELED, - XTP_ORDER_STATUS_REJECTED + XTP_BUSINESS_TYPE, + XTP_TICKER_TYPE, + XTP_MARKET_TYPE, + XTP_PRICE_TYPE, + XTP_ORDER_STATUS_TYPE ) - from vnpy.event import EventEngine +from vnpy.trader.event import EVENT_TIMER from vnpy.trader.constant import Exchange, Product, Direction, OrderType, Status from vnpy.trader.gateway import BaseGateway from vnpy.trader.object import (CancelRequest, OrderRequest, SubscribeRequest, @@ -59,20 +47,28 @@ from vnpy.trader.object import (CancelRequest, OrderRequest, SubscribeRequest, PositionData, AccountData) from vnpy.trader.utility import get_folder_path + API = XTP.API + EXCHANGE_XTP2VT = { XTP_EXCHANGE_TYPE.XTP_EXCHANGE_SH: Exchange.SSE, XTP_EXCHANGE_TYPE.XTP_EXCHANGE_SZ: Exchange.SZSE, } EXCHANGE_VT2XTP = {v: k for k, v in EXCHANGE_XTP2VT.items()} +MARKET_XTP2VT = { + XTP_MARKET_TYPE.XTP_MKT_SH_A: Exchange.SSE, + XTP_MARKET_TYPE.XTP_MKT_SZ_A: Exchange.SZSE +} +MARKET_VT2XTP = {v: k for k, v in MARKET_XTP2VT.items()} + PRODUCT_XTP2VT = { - XTP_TICKER_TYPE_STOCK: Product.EQUITY, - XTP_TICKER_TYPE_INDEX: Product.INDEX, - XTP_TICKER_TYPE_FUND: Product.FUND, - XTP_TICKER_TYPE_BOND: Product.BOND, - XTP_TICKER_TYPE_OPTION: Product.OPTION + XTP_TICKER_TYPE.XTP_TICKER_TYPE_STOCK: Product.EQUITY, + XTP_TICKER_TYPE.XTP_TICKER_TYPE_INDEX: Product.INDEX, + XTP_TICKER_TYPE.XTP_TICKER_TYPE_FUND: Product.FUND, + XTP_TICKER_TYPE.XTP_TICKER_TYPE_BOND: Product.BOND, + XTP_TICKER_TYPE.XTP_TICKER_TYPE_OPTION: Product.OPTION } DIRECTION_VT2XTP = { @@ -82,19 +78,19 @@ DIRECTION_VT2XTP = { DIRECTION_XTP2VT = {v: k for k, v in DIRECTION_VT2XTP.items()} ORDERTYPE_VT2XTP = { - OrderType.LIMIT: XTP_PRICE_LIMIT, - OrderType.MARKET: XTP_PRICE_BEST5_OR_CANCEL + OrderType.LIMIT: XTP_PRICE_TYPE.XTP_PRICE_LIMIT, + OrderType.MARKET: XTP_PRICE_TYPE.XTP_PRICE_BEST5_OR_CANCEL } ORDERTYPE_XTP2VT = {v: k for k, v in ORDERTYPE_VT2XTP.items()} STATUS_XTP2VT = { - XTP_ORDER_STATUS_INIT: Status.SUBMITTING, - XTP_ORDER_STATUS_ALLTRADED: Status.ALLTRADED, - XTP_ORDER_STATUS_PARTTRADEDQUEUEING: Status.PARTTRADED, - XTP_ORDER_STATUS_PARTTRADEDNOTQUEUEING: Status.PARTTRADED, - XTP_ORDER_STATUS_NOTRADEQUEUEING: Status.NOTTRADED, - XTP_ORDER_STATUS_CANCELED: Status.CANCELLED, - XTP_ORDER_STATUS_REJECTED: Status.REJECTED, + XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_INIT: Status.SUBMITTING, + XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_ALLTRADED: Status.ALLTRADED, + XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_PARTTRADEDQUEUEING: Status.PARTTRADED, + XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_PARTTRADEDNOTQUEUEING: Status.CANCELLED, + XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_NOTRADEQUEUEING: Status.NOTTRADED, + XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_CANCELED: Status.CANCELLED, + XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_REJECTED: Status.REJECTED, } @@ -111,7 +107,8 @@ class XtpGateway(BaseGateway): "行情端口": 0, "交易地址": "", "交易端口": 0, - "行情协议": ["TCP", "UDP"] + "行情协议": ["TCP", "UDP"], + "授权码": "" } def __init__(self, event_engine: EventEngine): @@ -134,11 +131,13 @@ class XtpGateway(BaseGateway): trader_ip = setting['交易地址'] trader_port = int(setting['交易端口']) quote_protocol = setting["行情协议"] + software_key = setting["授权码"] - # self.quote_api.connect(userid, password, client_id, - # quote_ip, quote_port, quote_protocol) + self.quote_api.connect(userid, password, client_id, + quote_ip, quote_port, quote_protocol) self.trader_api.connect(userid, password, client_id, - trader_ip, trader_port) + trader_ip, trader_port, software_key) + self.init_query() def close(self): """""" @@ -165,6 +164,23 @@ class XtpGateway(BaseGateway): """""" self.trader_api.query_position() + def process_timer_event(self, event): + """""" + self.count += 1 + if self.count < 2: + return + self.count = 0 + + func = self.query_functions.pop(0) + func() + self.query_functions.append(func) + + def init_query(self): + """""" + self.count = 0 + self.query_functions = [self.query_account, self.query_position] + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + def _async_callback_exception_handler(self, e: AsyncDispatchException): error_str = f"发生内部错误:\n" f"位置:{e.instance}.{e.function_name}" f"详细信息:{e.what}" print(error_str) @@ -208,9 +224,9 @@ class XtpQuoteApi(API.QuoteSpi): self.server_port = server_port if quote_protocol == "CTP": - self.quote_protocol = XTP_PROTOCOL_TCP + self.quote_protocol = XTP_PROTOCOL_TYPE.XTP_PROTOCOL_TCP else: - self.quote_protocol = XTP_PROTOCOL_UDP + self.quote_protocol = XTP_PROTOCOL_TYPE.XTP_PROTOCOL_UDP # Create API object path = str(get_folder_path(self.gateway_name.lower())) @@ -225,6 +241,10 @@ class XtpQuoteApi(API.QuoteSpi): self.gateway.write_log("行情接口初始化成功") # Login to server + Thread(target=self.login).start() + + def login(self): + """""" ret = self.api.Login( self.server_ip, self.server_port, @@ -459,6 +479,7 @@ class XtpTraderApi(API.TraderSpi): self.client_id = "" self.server_ip = "" self.server_port = "" + self.software_key = "" self.api = None self.session_id = 0 @@ -471,6 +492,7 @@ class XtpTraderApi(API.TraderSpi): client_id: int, server_ip: str, server_port: int, + software_key: str ): """""" if self.api: @@ -481,6 +503,7 @@ class XtpTraderApi(API.TraderSpi): self.client_id = client_id self.server_ip = server_ip self.server_port = server_port + self.software_key = software_key # Create API object path = str(get_folder_path(self.gateway_name.lower())) @@ -492,25 +515,29 @@ class XtpTraderApi(API.TraderSpi): ) self.api.RegisterSpi(self) - self.api.SetSoftwareKey("vnpy_test") - self.api.SubscribePublicTopic(XTP_TERT_RESTART) - + self.api.SetSoftwareKey(self.software_key) + self.api.SubscribePublicTopic(XTP_TE_RESUME_TYPE.XTP_TERT_RESTART) + self.gateway.write_log("交易接口初始化成功") # Login to server + Thread(target=self.login).start() + + def login(self): + """""" self.session_id = self.api.Login( self.server_ip, self.server_port, self.userid, self.password, - XTP_PROTOCOL_TCP + XTP_PROTOCOL_TYPE.XTP_PROTOCOL_TCP ) - + if self.session_id: msg = "交易服务器登录成功" else: - reason = self.api.GetApiLastError() - msg = f"交易服务器登录失败,原因({reason.error_id}):{reason.error_msg}" + error = self.api.GetApiLastError() + msg = f"交易服务器登录失败,原因:{error.error_msg}" self.gateway.write_log(msg) @@ -522,19 +549,26 @@ class XtpTraderApi(API.TraderSpi): def send_order(self, req: OrderRequest) -> str: """""" + if req.exchange not in MARKET_VT2XTP: + self.gateway.write_log(f"委托失败,不支持的交易所{req.exchange.value}") + return "" + + if req.type not in ORDERTYPE_VT2XTP: + self.gateway.write_log(f"委托失败,不支持的委托类型{req.type.value}") + return "" + xtp_req = XTPOrderInsertInfo() xtp_req.ticker = req.symbol - xtp_req.market = EXCHANGE_XTP2VT[req.exchange] + xtp_req.market = MARKET_VT2XTP[req.exchange] xtp_req.price = req.price - xtp_req.quantity = req.volume - xtp_req.order_client_id = self.client_id - xtp_req.side = DIRECTION_XTP2VT.get(req.direction, "") - xtp_req.price_type = ORDERTYPE_XTP2VT.get(req.type, "") - xtp_req.business_type = XTP_BUSINESS_TYPE_CASH + xtp_req.quantity = int(req.volume) + xtp_req.side = DIRECTION_VT2XTP[req.direction] + xtp_req.price_type = ORDERTYPE_VT2XTP[req.type] + xtp_req.business_type = XTP_BUSINESS_TYPE.XTP_BUSINESS_TYPE_CASH orderid = self.api.InsertOrder(xtp_req, self.session_id) - order = req.create_order_data(str(orderid)) + order = req.create_order_data(str(orderid), self.gateway_name) self.gateway.on_order(order) return order.vt_orderid @@ -545,24 +579,20 @@ class XtpTraderApi(API.TraderSpi): def query_account(self): """""" + if not self.api: + return + self.reqid += 1 self.api.QueryAsset(self.session_id, self.reqid) def query_position(self): """""" + if not self.api: + return + self.reqid += 1 self.api.QueryPosition("", self.session_id, self.reqid) - def query_order(self): - """""" - self.reqid += 1 - self.api.QueryOrders(XTPQueryOrderReq(), self.session_id, self.reqid) - - def query_trade(self): - """""" - self.reqid += 1 - self.api.QueryTrades(XTPQueryTraderReq(), self.session_id, self.reqid) - def check_error(self, func_name: str, error_info: XTPRspInfoStruct): """""" if error_info and error_info.error_id: @@ -583,16 +613,39 @@ class XtpTraderApi(API.TraderSpi): def OnOrderEvent(self, order_info: XTPOrderInfo, error_info: XTPRspInfoStruct, session_id: int) -> Any: """""" - print("on orde event") - if self.check_error("委托下单", error_info): - return + self.check_error("委托下单", error_info) - self.updateOrder(order_info) + order = OrderData( + symbol=order_info.ticker, + exchange=MARKET_XTP2VT[order_info.market], + orderid=str(order_info.order_xtp_id), + type=ORDERTYPE_XTP2VT[order_info.price_type], + direction=DIRECTION_XTP2VT[order_info.side], + price=order_info.price, + volume=order_info.quantity, + traded=order_info.qty_traded, + status=STATUS_XTP2VT[order_info.order_status], + time=order_info.insert_time, + gateway_name=self.gateway_name + ) + + self.gateway.on_order(order) def OnTradeEvent(self, trade_info: XTPTradeReport, session_id: int) -> Any: """""" - print("on trade event") - self.updateTrade(trade_info) + trade = TradeData( + symbol=trade_info.ticker, + exchange=MARKET_XTP2VT[trade_info.market], + orderid=str(trade_info.order_xtp_id), + tradeid=str(trade_info.exec_id), + direction=DIRECTION_XTP2VT[trade_info.side], + price=trade_info.price, + volume=trade_info.quantity, + time=trade_info.trade_time, + gateway_name=self.gateway_name + ) + + self.gateway.on_trade(trade) def OnCancelOrderError(self, cancel_info: XTPOrderCancelInfo, error_info: XTPRspInfoStruct, session_id: int) -> Any: @@ -604,7 +657,7 @@ class XtpTraderApi(API.TraderSpi): """""" if self.check_error("查询委托", error_info): return - + self.updateOrder(order_info) if is_last: @@ -615,18 +668,18 @@ class XtpTraderApi(API.TraderSpi): """""" if self.check_error("查询成交", error_info): return - + self.updateTrade(trade_info) if is_last: self.gateway.write_log("查询成交信息成功") def OnQueryPosition(self, xtp_position: XTPQueryStkPositionRsp, error_info: XTPRspInfoStruct, - is_last: bool, session_id: int) -> Any: + request_id: int, is_last: bool, session_id: int) -> Any: """""" position = PositionData( symbol=xtp_position.ticker, - exchange=EXCHANGE_XTP2VT[xtp_position.market], + exchange=MARKET_XTP2VT[xtp_position.market], direction=Direction.NET, volume=xtp_position.total_qty, frozen=xtp_position.locked_position, @@ -638,12 +691,12 @@ class XtpTraderApi(API.TraderSpi): self.gateway.on_position(position) def OnQueryAsset(self, asset: XTPQueryAssetRsp, error_info: XTPRspInfoStruct, - is_last: bool, session_id: int) -> Any: + request_id: int, is_last: bool, session_id: int) -> Any: """""" account = AccountData( accountid=self.userid, - balance=asset.banlance, - frozen=(asset.frozen_margin + asset.frozen_exec_cash + asset.frozen_exec_fee), + balance=asset.buying_power, + frozen=asset.withholding_amount, gateway_name=self.gateway_name ) self.gateway.on_account(account) @@ -686,37 +739,3 @@ class XtpTraderApi(API.TraderSpi): is_last: bool, session_id: int) -> Any: """""" pass - - def updateOrder(self, order_info: XTPOrderInfo): - """""" - order = OrderData( - symbol=order_info.ticker, - exchange=EXCHANGE_XTP2VT[order_info.market], - orderid=str(order_info.order_xtp_id), - type=ORDERTYPE_XTP2VT[order_info.price_type], - direction=DIRECTION_XTP2VT[order_info.side], - price=order_info.price, - volume=order_info.quantity, - traded=order_info.qty_traded, - status=STATUS_XTP2VT[order_info.order_status], - time=order_info.insert_time, - gateway_name=self.gateway_name - ) - - self.gateway.on_order(order) - - def updateTrade(self, trade_info: XTPTradeReport): - """""" - trade = TradeData( - symbol=trade_info.ticker, - exchange=EXCHANGE_XTP2VT[trade_info.market], - orderid=str(trade_info.order_xtp_id), - tradeid=str(trade_info.exec_id), - direction=DIRECTION_XTP2VT[trade_info.side], - price=trade_info.price, - volume=trade_info.quantity, - time=trade_info.trade_time, - gateway_name=self.gateway_name - ) - - self.gateway.on_trade(trade) \ No newline at end of file