[Mod]add more function of XtpTraderApi

This commit is contained in:
vn.py 2019-04-25 20:14:40 +08:00
parent 3853b47e47
commit e0ea44e286

View File

@ -1,8 +1,11 @@
from typing import Any, Sequence
from datetime import datetime
from vnpy.api.xtp.vnxtp import (
OrderBookStruct,
XTP,
set_async_callback_exception_handler,
AsyncDispatchException,
XTPMarketDataStruct,
XTPQuoteStaticInfo,
XTPRspInfoStruct,
@ -12,9 +15,20 @@ from vnpy.api.xtp.vnxtp import (
XTPQueryOrderReq,
XTPQueryTraderReq,
XTPOrderInsertInfo,
XTPOrderInfo,
XTPTradeReport,
XTPOrderCancelInfo,
XTPQueryStkPositionRsp,
XTPQueryAssetRsp,
XTPStructuredFundInfo,
XTPFundTransferNotice,
XTPQueryETFBaseRsp,
XTPQueryETFComponentRsp,
XTPQueryIPOTickerRsp,
XTPQueryIPOQuotaRsp,
XTPQueryOptionAuctionInfoRsp,
XTP_EXCHANGE_TYPE,
XTP_LOG_LEVEL,
XTP_PROTOCOL_TYPE,
XTP_TICKER_TYPE_STOCK,
XTP_TICKER_TYPE_INDEX,
XTP_TICKER_TYPE_FUND,
@ -27,11 +41,18 @@ from vnpy.api.xtp.vnxtp import (
XTP_SIDE_SELL,
XTP_PRICE_LIMIT,
XTP_PRICE_BEST5_OR_CANCEL,
XTP_BUSINESS_TYPE_CASH
XTP_BUSINESS_TYPE_CASH,
XTP_ORDER_STATUS_INIT,
XTP_ORDER_STATUS_ALLTRADED,
XTP_ORDER_STATUS_PARTTRADEDQUEUEING,
XTP_ORDER_STATUS_PARTTRADEDNOTQUEUEING,
XTP_ORDER_STATUS_NOTRADEQUEUEING,
XTP_ORDER_STATUS_CANCELED,
XTP_ORDER_STATUS_REJECTED
)
from vnpy.event import EventEngine
from vnpy.trader.constant import Exchange, Product, Direction, OrderType
from vnpy.trader.constant import Exchange, Product, Direction, OrderType, Status
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (CancelRequest, OrderRequest, SubscribeRequest,
TickData, ContractData, OrderData, TradeData,
@ -66,6 +87,16 @@ ORDERTYPE_VT2XTP = {
}
ORDERTYPE_XTP2VT = {v: k for k, v in ORDERTYPE_VT2XTP.items()}
STATUS_XTP2VT = {
XTP_ORDER_STATUS_INIT: Status.SUBMITTING,
XTP_ORDER_STATUS_ALLTRADED: Status.ALLTRADED,
XTP_ORDER_STATUS_PARTTRADEDQUEUEING: Status.PARTTRADED,
XTP_ORDER_STATUS_PARTTRADEDNOTQUEUEING: Status.PARTTRADED,
XTP_ORDER_STATUS_NOTRADEQUEUEING: Status.NOTTRADED,
XTP_ORDER_STATUS_CANCELED: Status.CANCELLED,
XTP_ORDER_STATUS_REJECTED: Status.REJECTED,
}
symbol_name_map = {}
@ -90,6 +121,9 @@ class XtpGateway(BaseGateway):
self.quote_api = XtpQuoteApi(self)
self.trader_api = XtpTraderApi(self)
set_async_callback_exception_handler(
self._async_callback_exception_handler)
def connect(self, setting: dict):
""""""
userid = setting['账号']
@ -101,8 +135,8 @@ class XtpGateway(BaseGateway):
trader_port = setting['交易端口']
quote_protocol = setting["行情协议"]
self.quote_api.connect(userid, password, client_id,
quote_ip, quote_port, quote_protocol)
# self.quote_api.connect(userid, password, client_id,
# quote_ip, quote_port, quote_protocol)
self.trader_api.connect(userid, password, client_id,
trader_ip, trader_port)
@ -131,6 +165,10 @@ class XtpGateway(BaseGateway):
""""""
self.trader_api.query_position()
def _async_callback_exception_handler(self, e: AsyncDispatchException):
error_str = f"发生内部错误:\n" f"位置:{e.instance}.{e.function_name}" f"详细信息:{e.what}"
print(error_str)
class XtpQuoteApi(API.QuoteSpi):
@ -220,7 +258,7 @@ class XtpQuoteApi(API.QuoteSpi):
def check_error(self, func_name: str, error_info: XTPRspInfoStruct):
""""""
if error_info.error_id:
if error_info and error_info.error_id:
msg = f"{func_name}发生错误, 代码:{error_info.error_id},信息:{error_info.error_msg}"
self.gateway.write_log(msg)
return True
@ -249,12 +287,13 @@ class XtpQuoteApi(API.QuoteSpi):
bid1_count: int, max_bid1_count: int, ask1_qty: Sequence[int],
ask1_count: int, max_ask1_count: int) -> Any:
""""""
timestamp = market_data.date_time
timestamp = str(market_data.data_time)
dt = datetime.strptime(timestamp, "%Y%m%d%H%M%S%f")
tick = TickData(
symbol=market_data.ticker,
exchange=EXCHANGE_XTP2VT[market_data.exchange_id],
datetime=timestamp,
datetime=dt,
volume=market_data.qty,
last_price=market_data.last_price,
limit_up=market_data.upper_limit_price,
@ -359,7 +398,7 @@ class XtpQuoteApi(API.QuoteSpi):
name=ticker_info.ticker_name,
product=PRODUCT_XTP2VT[ticker_info.ticker_type],
size=1,
pricetick=ticker_info.pricetick,
pricetick=ticker_info.price_tick,
min_volume=ticker_info.buy_qty_unit,
gateway_name=self.gateway_name
)
@ -367,6 +406,9 @@ class XtpQuoteApi(API.QuoteSpi):
symbol_name_map[contract.vt_symbol] = contract.name
if is_last:
self.gateway.write_log(f"{contract.exchange.value}合约信息查询成功")
def OnQueryTickersPriceInfo(self, ticker_info: XTPTickerPriceInfo, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
@ -420,9 +462,7 @@ class XtpTraderApi(API.TraderSpi):
self.api = None
self.session_id = 0
self.reqid = 0
self.orderid = 0
def connect(
self,
@ -452,8 +492,9 @@ class XtpTraderApi(API.TraderSpi):
)
self.api.RegisterSpi(self)
self.api.SetSoftwareKey("vnpy_test")
self.api.SubscribePublicTopic(XTP_TERT_RESTART)
self.gateway.write_log("交易接口初始化成功")
# Login to server
@ -464,7 +505,7 @@ class XtpTraderApi(API.TraderSpi):
self.password,
XTP_PROTOCOL_TCP
)
if self.session_id:
msg = "交易服务器登录成功"
else:
@ -481,29 +522,26 @@ class XtpTraderApi(API.TraderSpi):
def send_order(self, req: OrderRequest) -> str:
""""""
self.orderid += 1
xtp_req = XTPOrderInsertInfo()
xtp_req.ticker = req.symbol
xtp_req.market = EXCHANGE_XTP2VT[req.exchange]
xtp_req.price = req.price
xtp_req.quantity = req.volume
xtp_req.order_client_id = self.client_id
xtp_req.order_xtp_id = self.orderid
xtp_req.side = DIRECTION_XTP2VT.get(req.direction, "")
xtp_req.price_type = ORDERTYPE_XTP2VT.get(req.type, "")
xtp_req.business_type = XTP_BUSINESS_TYPE_CASH
self.api.InsertOrder(xtp_req, self.session_id)
orderid = self.api.InsertOrder(xtp_req, self.session_id)
order = req.create_order_data(str(self.orderid))
order = req.create_order_data(str(orderid))
self.gateway.on_order(order)
return order.vt_orderid
def cancel_order(self, req: CancelRequest):
""""""
self.api.CancelOrder(req.orderid, self.session_id)
self.api.CancelOrder(int(req.orderid), self.session_id)
def query_account(self):
""""""
@ -524,3 +562,161 @@ class XtpTraderApi(API.TraderSpi):
""""""
self.reqid += 1
self.api.QueryTrades(XTPQueryTraderReq(), self.session_id, self.reqid)
def check_error(self, func_name: str, error_info: XTPRspInfoStruct):
""""""
if error_info and error_info.error_id:
msg = f"{func_name}发生错误, 代码:{error_info.error_id},信息:{error_info.error_msg}"
self.gateway.write_log(msg)
return True
else:
return False
def OnDisconnected(self, session_id: int, reason: int) -> Any:
""""""
self.gateway.write_log("交易服务器连接断开")
def OnError(self, error_info: XTPRspInfoStruct) -> Any:
""""""
self.check_error("交易接口", error_info)
def OnOrderEvent(self, order_info: XTPOrderInfo, error_info: XTPRspInfoStruct,
session_id: int) -> Any:
""""""
print("on orde event")
if self.check_error("委托下单", error_info):
return
self.updateOrder(order_info)
def OnTradeEvent(self, trade_info: XTPTradeReport, session_id: int) -> Any:
""""""
print("on trade event")
self.updateTrade(trade_info)
def OnCancelOrderError(self, cancel_info: XTPOrderCancelInfo, error_info: XTPRspInfoStruct,
session_id: int) -> Any:
""""""
self.check_error("委托撤单", error_info)
def OnQueryOrder(self, order_info: XTPOrderInfo, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
if self.check_error("查询委托", error_info):
return
self.updateOrder(order_info)
if is_last:
self.gateway.write_log("查询委托信息成功")
def OnQueryTrade(self, trade_info: XTPTradeReport, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
if self.check_error("查询成交", error_info):
return
self.updateTrade(trade_info)
if is_last:
self.gateway.write_log("查询成交信息成功")
def OnQueryPosition(self, xtp_position: XTPQueryStkPositionRsp, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
position = PositionData(
symbol=xtp_position.ticker,
exchange=EXCHANGE_XTP2VT[xtp_position.market],
direction=Direction.NET,
volume=xtp_position.total_qty,
frozen=xtp_position.locked_position,
price=xtp_position.avg_price,
pnl=xtp_position.unrealized_pnl,
yd_volume=xtp_position.yesterday_position,
gateway_name=self.gateway_name
)
self.gateway.on_position(position)
def OnQueryAsset(self, asset: XTPQueryAssetRsp, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
account = AccountData(
accountid=self.userid,
balance=asset.banlance,
frozen=(asset.frozen_margin + asset.frozen_exec_cash + asset.frozen_exec_fee),
gateway_name=self.gateway_name
)
self.gateway.on_account(account)
def OnQueryStructuredFund(self, fund_info: XTPStructuredFundInfo, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
pass
def OnQueryFundTransfer(self, fund_transfer_info: XTPFundTransferNotice, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
pass
def OnFundTransfer(self, fund_transfer_info: XTPFundTransferNotice, session_id: int) -> Any:
""""""
pass
def OnQueryETF(self, etf_info: XTPQueryETFBaseRsp, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
pass
def OnQueryETFBasket(self, etf_component_info: XTPQueryETFComponentRsp, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
pass
def OnQueryIPOInfoList(self, ipo_info: XTPQueryIPOTickerRsp, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
pass
def OnQueryIPOQuotaInfo(self, quota_info: XTPQueryIPOQuotaRsp, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
pass
def OnQueryOptionAuctionInfo(self, option_info: XTPQueryOptionAuctionInfoRsp, error_info: XTPRspInfoStruct,
is_last: bool, session_id: int) -> Any:
""""""
pass
def updateOrder(self, order_info: XTPOrderInfo):
""""""
order = OrderData(
symbol=order_info.ticker,
exchange=EXCHANGE_XTP2VT[order_info.market],
orderid=str(order_info.order_xtp_id),
type=ORDERTYPE_XTP2VT[order_info.price_type],
direction=DIRECTION_XTP2VT[order_info.side],
price=order_info.price,
volume=order_info.quantity,
traded=order_info.qty_traded,
status=STATUS_XTP2VT[order_info.order_status],
time=order_info.insert_time,
gateway_name=self.gateway_name
)
self.gateway.on_order(order)
def updateTrade(self, trade_info: XTPTradeReport):
""""""
trade = TradeData(
symbol=trade_info.ticker,
exchange=EXCHANGE_XTP2VT[trade_info.market],
orderid=str(trade_info.order_xtp_id),
tradeid=str(trade_info.exec_id),
direction=DIRECTION_XTP2VT[trade_info.side],
price=trade_info.price,
volume=trade_info.quantity,
time=trade_info.trade_time,
gateway_name=self.gateway_name
)
self.gateway.on_trade(trade)