[Mod]complete test of XtpGateway

This commit is contained in:
vn.py 2019-04-26 16:39:12 +08:00
parent fc774499c2
commit 7941af5be8
3 changed files with 133 additions and 120 deletions

View File

@ -8,7 +8,7 @@ from vnpy.gateway.bitmex import BitmexGateway
from vnpy.gateway.futu import FutuGateway from vnpy.gateway.futu import FutuGateway
from vnpy.gateway.ib import IbGateway from vnpy.gateway.ib import IbGateway
from vnpy.gateway.ctp import CtpGateway from vnpy.gateway.ctp import CtpGateway
#from vnpy.gateway.femas import FemasGateway # from vnpy.gateway.femas import FemasGateway
from vnpy.gateway.tiger import TigerGateway from vnpy.gateway.tiger import TigerGateway
from vnpy.gateway.oes import OesGateway from vnpy.gateway.oes import OesGateway
from vnpy.gateway.okex import OkexGateway from vnpy.gateway.okex import OkexGateway

View File

@ -299,8 +299,6 @@ class TigerGateway(BaseGateway):
def on_order_change(self, tiger_account: str, data: list): def on_order_change(self, tiger_account: str, data: list):
"""""" """"""
data = dict(data) data = dict(data)
print("委托推送", data["origin_symbol"],
data["order_id"], data["filled"], data["status"])
symbol, exchange = convert_symbol_tiger2vt(data["origin_symbol"]) symbol, exchange = convert_symbol_tiger2vt(data["origin_symbol"])
status = PUSH_STATUS_TIGER2VT[data["status"]] status = PUSH_STATUS_TIGER2VT[data["status"]]
@ -368,8 +366,6 @@ class TigerGateway(BaseGateway):
self.ID_VT2TIGER[local_id] = str(order.order_id) self.ID_VT2TIGER[local_id] = str(order.order_id)
self.trade_client.place_order(order) self.trade_client.place_order(order)
print("发单:", order.contract.symbol,
order.order_id, order.quantity, order.status)
except: # noqa except: # noqa
traceback.print_exc() traceback.print_exc()
@ -551,8 +547,6 @@ class TigerGateway(BaseGateway):
self.on_order(order) self.on_order(order)
self.ID_VT2TIGER = {v: k for k, v in self.ID_TIGER2VT.items()} self.ID_VT2TIGER = {v: k for k, v in self.ID_TIGER2VT.items()}
print("原始委托字典", self.ID_TIGER2VT)
print("原始反向字典", self.ID_VT2TIGER)
def process_deal(self, data): def process_deal(self, data):
""" """

View File

@ -1,19 +1,18 @@
from typing import Any, Sequence from typing import Any, Sequence
from datetime import datetime from datetime import datetime
from threading import Thread
from vnpy.api.xtp.vnxtp import ( from vnpy.api.xtp.vnxtp import (
OrderBookStruct,
XTP, XTP,
set_async_callback_exception_handler, set_async_callback_exception_handler,
AsyncDispatchException, AsyncDispatchException,
OrderBookStruct,
XTPMarketDataStruct, XTPMarketDataStruct,
XTPQuoteStaticInfo, XTPQuoteStaticInfo,
XTPRspInfoStruct, XTPRspInfoStruct,
XTPSpecificTickerStruct, XTPSpecificTickerStruct,
XTPTickByTickStruct, XTPTickByTickStruct,
XTPTickerPriceInfo, XTPTickerPriceInfo,
XTPQueryOrderReq,
XTPQueryTraderReq,
XTPOrderInsertInfo, XTPOrderInsertInfo,
XTPOrderInfo, XTPOrderInfo,
XTPTradeReport, XTPTradeReport,
@ -29,29 +28,18 @@ from vnpy.api.xtp.vnxtp import (
XTPQueryOptionAuctionInfoRsp, XTPQueryOptionAuctionInfoRsp,
XTP_EXCHANGE_TYPE, XTP_EXCHANGE_TYPE,
XTP_LOG_LEVEL, XTP_LOG_LEVEL,
XTP_TICKER_TYPE_STOCK, XTP_PROTOCOL_TYPE,
XTP_TICKER_TYPE_INDEX, XTP_TE_RESUME_TYPE,
XTP_TICKER_TYPE_FUND,
XTP_TICKER_TYPE_BOND,
XTP_TICKER_TYPE_OPTION,
XTP_PROTOCOL_TCP,
XTP_PROTOCOL_UDP,
XTP_TERT_RESTART,
XTP_SIDE_BUY, XTP_SIDE_BUY,
XTP_SIDE_SELL, XTP_SIDE_SELL,
XTP_PRICE_LIMIT, XTP_BUSINESS_TYPE,
XTP_PRICE_BEST5_OR_CANCEL, XTP_TICKER_TYPE,
XTP_BUSINESS_TYPE_CASH, XTP_MARKET_TYPE,
XTP_ORDER_STATUS_INIT, XTP_PRICE_TYPE,
XTP_ORDER_STATUS_ALLTRADED, XTP_ORDER_STATUS_TYPE
XTP_ORDER_STATUS_PARTTRADEDQUEUEING,
XTP_ORDER_STATUS_PARTTRADEDNOTQUEUEING,
XTP_ORDER_STATUS_NOTRADEQUEUEING,
XTP_ORDER_STATUS_CANCELED,
XTP_ORDER_STATUS_REJECTED
) )
from vnpy.event import EventEngine from vnpy.event import EventEngine
from vnpy.trader.event import EVENT_TIMER
from vnpy.trader.constant import Exchange, Product, Direction, OrderType, Status from vnpy.trader.constant import Exchange, Product, Direction, OrderType, Status
from vnpy.trader.gateway import BaseGateway from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (CancelRequest, OrderRequest, SubscribeRequest, from vnpy.trader.object import (CancelRequest, OrderRequest, SubscribeRequest,
@ -59,20 +47,28 @@ from vnpy.trader.object import (CancelRequest, OrderRequest, SubscribeRequest,
PositionData, AccountData) PositionData, AccountData)
from vnpy.trader.utility import get_folder_path from vnpy.trader.utility import get_folder_path
API = XTP.API API = XTP.API
EXCHANGE_XTP2VT = { EXCHANGE_XTP2VT = {
XTP_EXCHANGE_TYPE.XTP_EXCHANGE_SH: Exchange.SSE, XTP_EXCHANGE_TYPE.XTP_EXCHANGE_SH: Exchange.SSE,
XTP_EXCHANGE_TYPE.XTP_EXCHANGE_SZ: Exchange.SZSE, XTP_EXCHANGE_TYPE.XTP_EXCHANGE_SZ: Exchange.SZSE,
} }
EXCHANGE_VT2XTP = {v: k for k, v in EXCHANGE_XTP2VT.items()} EXCHANGE_VT2XTP = {v: k for k, v in EXCHANGE_XTP2VT.items()}
MARKET_XTP2VT = {
XTP_MARKET_TYPE.XTP_MKT_SH_A: Exchange.SSE,
XTP_MARKET_TYPE.XTP_MKT_SZ_A: Exchange.SZSE
}
MARKET_VT2XTP = {v: k for k, v in MARKET_XTP2VT.items()}
PRODUCT_XTP2VT = { PRODUCT_XTP2VT = {
XTP_TICKER_TYPE_STOCK: Product.EQUITY, XTP_TICKER_TYPE.XTP_TICKER_TYPE_STOCK: Product.EQUITY,
XTP_TICKER_TYPE_INDEX: Product.INDEX, XTP_TICKER_TYPE.XTP_TICKER_TYPE_INDEX: Product.INDEX,
XTP_TICKER_TYPE_FUND: Product.FUND, XTP_TICKER_TYPE.XTP_TICKER_TYPE_FUND: Product.FUND,
XTP_TICKER_TYPE_BOND: Product.BOND, XTP_TICKER_TYPE.XTP_TICKER_TYPE_BOND: Product.BOND,
XTP_TICKER_TYPE_OPTION: Product.OPTION XTP_TICKER_TYPE.XTP_TICKER_TYPE_OPTION: Product.OPTION
} }
DIRECTION_VT2XTP = { DIRECTION_VT2XTP = {
@ -82,19 +78,19 @@ DIRECTION_VT2XTP = {
DIRECTION_XTP2VT = {v: k for k, v in DIRECTION_VT2XTP.items()} DIRECTION_XTP2VT = {v: k for k, v in DIRECTION_VT2XTP.items()}
ORDERTYPE_VT2XTP = { ORDERTYPE_VT2XTP = {
OrderType.LIMIT: XTP_PRICE_LIMIT, OrderType.LIMIT: XTP_PRICE_TYPE.XTP_PRICE_LIMIT,
OrderType.MARKET: XTP_PRICE_BEST5_OR_CANCEL OrderType.MARKET: XTP_PRICE_TYPE.XTP_PRICE_BEST5_OR_CANCEL
} }
ORDERTYPE_XTP2VT = {v: k for k, v in ORDERTYPE_VT2XTP.items()} ORDERTYPE_XTP2VT = {v: k for k, v in ORDERTYPE_VT2XTP.items()}
STATUS_XTP2VT = { STATUS_XTP2VT = {
XTP_ORDER_STATUS_INIT: Status.SUBMITTING, XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_INIT: Status.SUBMITTING,
XTP_ORDER_STATUS_ALLTRADED: Status.ALLTRADED, XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_ALLTRADED: Status.ALLTRADED,
XTP_ORDER_STATUS_PARTTRADEDQUEUEING: Status.PARTTRADED, XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_PARTTRADEDQUEUEING: Status.PARTTRADED,
XTP_ORDER_STATUS_PARTTRADEDNOTQUEUEING: Status.PARTTRADED, XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_PARTTRADEDNOTQUEUEING: Status.CANCELLED,
XTP_ORDER_STATUS_NOTRADEQUEUEING: Status.NOTTRADED, XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_NOTRADEQUEUEING: Status.NOTTRADED,
XTP_ORDER_STATUS_CANCELED: Status.CANCELLED, XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_CANCELED: Status.CANCELLED,
XTP_ORDER_STATUS_REJECTED: Status.REJECTED, XTP_ORDER_STATUS_TYPE.XTP_ORDER_STATUS_REJECTED: Status.REJECTED,
} }
@ -111,7 +107,8 @@ class XtpGateway(BaseGateway):
"行情端口": 0, "行情端口": 0,
"交易地址": "", "交易地址": "",
"交易端口": 0, "交易端口": 0,
"行情协议": ["TCP", "UDP"] "行情协议": ["TCP", "UDP"],
"授权码": ""
} }
def __init__(self, event_engine: EventEngine): def __init__(self, event_engine: EventEngine):
@ -134,11 +131,13 @@ class XtpGateway(BaseGateway):
trader_ip = setting['交易地址'] trader_ip = setting['交易地址']
trader_port = int(setting['交易端口']) trader_port = int(setting['交易端口'])
quote_protocol = setting["行情协议"] quote_protocol = setting["行情协议"]
software_key = setting["授权码"]
# self.quote_api.connect(userid, password, client_id, self.quote_api.connect(userid, password, client_id,
# quote_ip, quote_port, quote_protocol) quote_ip, quote_port, quote_protocol)
self.trader_api.connect(userid, password, client_id, self.trader_api.connect(userid, password, client_id,
trader_ip, trader_port) trader_ip, trader_port, software_key)
self.init_query()
def close(self): def close(self):
"""""" """"""
@ -165,6 +164,23 @@ class XtpGateway(BaseGateway):
"""""" """"""
self.trader_api.query_position() self.trader_api.query_position()
def process_timer_event(self, event):
""""""
self.count += 1
if self.count < 2:
return
self.count = 0
func = self.query_functions.pop(0)
func()
self.query_functions.append(func)
def init_query(self):
""""""
self.count = 0
self.query_functions = [self.query_account, self.query_position]
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def _async_callback_exception_handler(self, e: AsyncDispatchException): def _async_callback_exception_handler(self, e: AsyncDispatchException):
error_str = f"发生内部错误:\n" f"位置:{e.instance}.{e.function_name}" f"详细信息:{e.what}" error_str = f"发生内部错误:\n" f"位置:{e.instance}.{e.function_name}" f"详细信息:{e.what}"
print(error_str) print(error_str)
@ -208,9 +224,9 @@ class XtpQuoteApi(API.QuoteSpi):
self.server_port = server_port self.server_port = server_port
if quote_protocol == "CTP": if quote_protocol == "CTP":
self.quote_protocol = XTP_PROTOCOL_TCP self.quote_protocol = XTP_PROTOCOL_TYPE.XTP_PROTOCOL_TCP
else: else:
self.quote_protocol = XTP_PROTOCOL_UDP self.quote_protocol = XTP_PROTOCOL_TYPE.XTP_PROTOCOL_UDP
# Create API object # Create API object
path = str(get_folder_path(self.gateway_name.lower())) path = str(get_folder_path(self.gateway_name.lower()))
@ -225,6 +241,10 @@ class XtpQuoteApi(API.QuoteSpi):
self.gateway.write_log("行情接口初始化成功") self.gateway.write_log("行情接口初始化成功")
# Login to server # Login to server
Thread(target=self.login).start()
def login(self):
""""""
ret = self.api.Login( ret = self.api.Login(
self.server_ip, self.server_ip,
self.server_port, self.server_port,
@ -459,6 +479,7 @@ class XtpTraderApi(API.TraderSpi):
self.client_id = "" self.client_id = ""
self.server_ip = "" self.server_ip = ""
self.server_port = "" self.server_port = ""
self.software_key = ""
self.api = None self.api = None
self.session_id = 0 self.session_id = 0
@ -471,6 +492,7 @@ class XtpTraderApi(API.TraderSpi):
client_id: int, client_id: int,
server_ip: str, server_ip: str,
server_port: int, server_port: int,
software_key: str
): ):
"""""" """"""
if self.api: if self.api:
@ -481,6 +503,7 @@ class XtpTraderApi(API.TraderSpi):
self.client_id = client_id self.client_id = client_id
self.server_ip = server_ip self.server_ip = server_ip
self.server_port = server_port self.server_port = server_port
self.software_key = software_key
# Create API object # Create API object
path = str(get_folder_path(self.gateway_name.lower())) path = str(get_folder_path(self.gateway_name.lower()))
@ -492,25 +515,29 @@ class XtpTraderApi(API.TraderSpi):
) )
self.api.RegisterSpi(self) self.api.RegisterSpi(self)
self.api.SetSoftwareKey("vnpy_test") self.api.SetSoftwareKey(self.software_key)
self.api.SubscribePublicTopic(XTP_TERT_RESTART) self.api.SubscribePublicTopic(XTP_TE_RESUME_TYPE.XTP_TERT_RESTART)
self.gateway.write_log("交易接口初始化成功") self.gateway.write_log("交易接口初始化成功")
# Login to server # Login to server
Thread(target=self.login).start()
def login(self):
""""""
self.session_id = self.api.Login( self.session_id = self.api.Login(
self.server_ip, self.server_ip,
self.server_port, self.server_port,
self.userid, self.userid,
self.password, self.password,
XTP_PROTOCOL_TCP XTP_PROTOCOL_TYPE.XTP_PROTOCOL_TCP
) )
if self.session_id: if self.session_id:
msg = "交易服务器登录成功" msg = "交易服务器登录成功"
else: else:
reason = self.api.GetApiLastError() error = self.api.GetApiLastError()
msg = f"交易服务器登录失败,原因({reason.error_id}){reason.error_msg}" msg = f"交易服务器登录失败,原因{error.error_msg}"
self.gateway.write_log(msg) self.gateway.write_log(msg)
@ -522,19 +549,26 @@ class XtpTraderApi(API.TraderSpi):
def send_order(self, req: OrderRequest) -> str: def send_order(self, req: OrderRequest) -> str:
"""""" """"""
if req.exchange not in MARKET_VT2XTP:
self.gateway.write_log(f"委托失败,不支持的交易所{req.exchange.value}")
return ""
if req.type not in ORDERTYPE_VT2XTP:
self.gateway.write_log(f"委托失败,不支持的委托类型{req.type.value}")
return ""
xtp_req = XTPOrderInsertInfo() xtp_req = XTPOrderInsertInfo()
xtp_req.ticker = req.symbol xtp_req.ticker = req.symbol
xtp_req.market = EXCHANGE_XTP2VT[req.exchange] xtp_req.market = MARKET_VT2XTP[req.exchange]
xtp_req.price = req.price xtp_req.price = req.price
xtp_req.quantity = req.volume xtp_req.quantity = int(req.volume)
xtp_req.order_client_id = self.client_id xtp_req.side = DIRECTION_VT2XTP[req.direction]
xtp_req.side = DIRECTION_XTP2VT.get(req.direction, "") xtp_req.price_type = ORDERTYPE_VT2XTP[req.type]
xtp_req.price_type = ORDERTYPE_XTP2VT.get(req.type, "") xtp_req.business_type = XTP_BUSINESS_TYPE.XTP_BUSINESS_TYPE_CASH
xtp_req.business_type = XTP_BUSINESS_TYPE_CASH
orderid = self.api.InsertOrder(xtp_req, self.session_id) orderid = self.api.InsertOrder(xtp_req, self.session_id)
order = req.create_order_data(str(orderid)) order = req.create_order_data(str(orderid), self.gateway_name)
self.gateway.on_order(order) self.gateway.on_order(order)
return order.vt_orderid return order.vt_orderid
@ -545,24 +579,20 @@ class XtpTraderApi(API.TraderSpi):
def query_account(self): def query_account(self):
"""""" """"""
if not self.api:
return
self.reqid += 1 self.reqid += 1
self.api.QueryAsset(self.session_id, self.reqid) self.api.QueryAsset(self.session_id, self.reqid)
def query_position(self): def query_position(self):
"""""" """"""
if not self.api:
return
self.reqid += 1 self.reqid += 1
self.api.QueryPosition("", self.session_id, self.reqid) self.api.QueryPosition("", self.session_id, self.reqid)
def query_order(self):
""""""
self.reqid += 1
self.api.QueryOrders(XTPQueryOrderReq(), self.session_id, self.reqid)
def query_trade(self):
""""""
self.reqid += 1
self.api.QueryTrades(XTPQueryTraderReq(), self.session_id, self.reqid)
def check_error(self, func_name: str, error_info: XTPRspInfoStruct): def check_error(self, func_name: str, error_info: XTPRspInfoStruct):
"""""" """"""
if error_info and error_info.error_id: if error_info and error_info.error_id:
@ -583,16 +613,39 @@ class XtpTraderApi(API.TraderSpi):
def OnOrderEvent(self, order_info: XTPOrderInfo, error_info: XTPRspInfoStruct, def OnOrderEvent(self, order_info: XTPOrderInfo, error_info: XTPRspInfoStruct,
session_id: int) -> Any: session_id: int) -> Any:
"""""" """"""
print("on orde event") self.check_error("委托下单", error_info)
if self.check_error("委托下单", error_info):
return
self.updateOrder(order_info) order = OrderData(
symbol=order_info.ticker,
exchange=MARKET_XTP2VT[order_info.market],
orderid=str(order_info.order_xtp_id),
type=ORDERTYPE_XTP2VT[order_info.price_type],
direction=DIRECTION_XTP2VT[order_info.side],
price=order_info.price,
volume=order_info.quantity,
traded=order_info.qty_traded,
status=STATUS_XTP2VT[order_info.order_status],
time=order_info.insert_time,
gateway_name=self.gateway_name
)
self.gateway.on_order(order)
def OnTradeEvent(self, trade_info: XTPTradeReport, session_id: int) -> Any: def OnTradeEvent(self, trade_info: XTPTradeReport, session_id: int) -> Any:
"""""" """"""
print("on trade event") trade = TradeData(
self.updateTrade(trade_info) symbol=trade_info.ticker,
exchange=MARKET_XTP2VT[trade_info.market],
orderid=str(trade_info.order_xtp_id),
tradeid=str(trade_info.exec_id),
direction=DIRECTION_XTP2VT[trade_info.side],
price=trade_info.price,
volume=trade_info.quantity,
time=trade_info.trade_time,
gateway_name=self.gateway_name
)
self.gateway.on_trade(trade)
def OnCancelOrderError(self, cancel_info: XTPOrderCancelInfo, error_info: XTPRspInfoStruct, def OnCancelOrderError(self, cancel_info: XTPOrderCancelInfo, error_info: XTPRspInfoStruct,
session_id: int) -> Any: session_id: int) -> Any:
@ -604,7 +657,7 @@ class XtpTraderApi(API.TraderSpi):
"""""" """"""
if self.check_error("查询委托", error_info): if self.check_error("查询委托", error_info):
return return
self.updateOrder(order_info) self.updateOrder(order_info)
if is_last: if is_last:
@ -615,18 +668,18 @@ class XtpTraderApi(API.TraderSpi):
"""""" """"""
if self.check_error("查询成交", error_info): if self.check_error("查询成交", error_info):
return return
self.updateTrade(trade_info) self.updateTrade(trade_info)
if is_last: if is_last:
self.gateway.write_log("查询成交信息成功") self.gateway.write_log("查询成交信息成功")
def OnQueryPosition(self, xtp_position: XTPQueryStkPositionRsp, error_info: XTPRspInfoStruct, def OnQueryPosition(self, xtp_position: XTPQueryStkPositionRsp, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any: request_id: int, is_last: bool, session_id: int) -> Any:
"""""" """"""
position = PositionData( position = PositionData(
symbol=xtp_position.ticker, symbol=xtp_position.ticker,
exchange=EXCHANGE_XTP2VT[xtp_position.market], exchange=MARKET_XTP2VT[xtp_position.market],
direction=Direction.NET, direction=Direction.NET,
volume=xtp_position.total_qty, volume=xtp_position.total_qty,
frozen=xtp_position.locked_position, frozen=xtp_position.locked_position,
@ -638,12 +691,12 @@ class XtpTraderApi(API.TraderSpi):
self.gateway.on_position(position) self.gateway.on_position(position)
def OnQueryAsset(self, asset: XTPQueryAssetRsp, error_info: XTPRspInfoStruct, def OnQueryAsset(self, asset: XTPQueryAssetRsp, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any: request_id: int, is_last: bool, session_id: int) -> Any:
"""""" """"""
account = AccountData( account = AccountData(
accountid=self.userid, accountid=self.userid,
balance=asset.banlance, balance=asset.buying_power,
frozen=(asset.frozen_margin + asset.frozen_exec_cash + asset.frozen_exec_fee), frozen=asset.withholding_amount,
gateway_name=self.gateway_name gateway_name=self.gateway_name
) )
self.gateway.on_account(account) self.gateway.on_account(account)
@ -686,37 +739,3 @@ class XtpTraderApi(API.TraderSpi):
is_last: bool, session_id: int) -> Any: is_last: bool, session_id: int) -> Any:
"""""" """"""
pass pass
def updateOrder(self, order_info: XTPOrderInfo):
""""""
order = OrderData(
symbol=order_info.ticker,
exchange=EXCHANGE_XTP2VT[order_info.market],
orderid=str(order_info.order_xtp_id),
type=ORDERTYPE_XTP2VT[order_info.price_type],
direction=DIRECTION_XTP2VT[order_info.side],
price=order_info.price,
volume=order_info.quantity,
traded=order_info.qty_traded,
status=STATUS_XTP2VT[order_info.order_status],
time=order_info.insert_time,
gateway_name=self.gateway_name
)
self.gateway.on_order(order)
def updateTrade(self, trade_info: XTPTradeReport):
""""""
trade = TradeData(
symbol=trade_info.ticker,
exchange=EXCHANGE_XTP2VT[trade_info.market],
orderid=str(trade_info.order_xtp_id),
tradeid=str(trade_info.exec_id),
direction=DIRECTION_XTP2VT[trade_info.side],
price=trade_info.price,
volume=trade_info.quantity,
time=trade_info.trade_time,
gateway_name=self.gateway_name
)
self.gateway.on_trade(trade)