[Mod]complete XtpQuoteApi

This commit is contained in:
vn.py 2019-04-25 11:40:02 +08:00
parent 18d483e417
commit 72dd05dcb5
2 changed files with 333 additions and 156 deletions

View File

@ -7,8 +7,8 @@ from vnpy.trader.ui import MainWindow, create_qapp
from vnpy.gateway.bitmex import BitmexGateway
from vnpy.gateway.futu import FutuGateway
from vnpy.gateway.ib import IbGateway
# from vnpy.gateway.ctp import CtpGateway
from vnpy.gateway.femas import FemasGateway
from vnpy.gateway.ctp import CtpGateway
#from vnpy.gateway.femas import FemasGateway
from vnpy.gateway.tiger import TigerGateway
from vnpy.gateway.oes import OesGateway
from vnpy.gateway.okex import OkexGateway
@ -16,6 +16,7 @@ from vnpy.gateway.huobi import HuobiGateway
from vnpy.gateway.bitfinex import BitfinexGateway
from vnpy.gateway.onetoken import OnetokenGateway
from vnpy.gateway.okexf import OkexfGateway
from vnpy.gateway.xtp import XtpGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.csv_loader import CsvLoaderApp
@ -30,8 +31,9 @@ def main():
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
# main_engine.add_gateway(CtpGateway)
main_engine.add_gateway(FemasGateway)
main_engine.add_gateway(XtpGateway)
main_engine.add_gateway(CtpGateway)
# main_engine.add_gateway(FemasGateway)
main_engine.add_gateway(IbGateway)
main_engine.add_gateway(FutuGateway)
main_engine.add_gateway(BitmexGateway)

View File

@ -1,184 +1,94 @@
from typing import Any, Sequence
from vnpy.api.xtp.vnxtp import (OrderBookStruct, XTP, XTPMarketDataStruct, XTPQuoteStaticInfo,
XTPRspInfoStruct, XTPSpecificTickerStruct, XTPTickByTickStruct,
XTPTickerPriceInfo, XTP_EXCHANGE_TYPE, XTP_LOG_LEVEL,
XTP_PROTOCOL_TYPE)
from vnpy.api.xtp.vnxtp import (
OrderBookStruct,
XTP,
XTPMarketDataStruct,
XTPQuoteStaticInfo,
XTPRspInfoStruct,
XTPSpecificTickerStruct,
XTPTickByTickStruct,
XTPTickerPriceInfo,
XTP_EXCHANGE_TYPE,
XTP_LOG_LEVEL,
XTP_PROTOCOL_TYPE,
XTP_TICKER_TYPE_STOCK,
XTP_TICKER_TYPE_INDEX,
XTP_TICKER_TYPE_FUND,
XTP_TICKER_TYPE_BOND,
XTP_TICKER_TYPE_OPTION,
XTP_PROTOCOL_TCP,
XTP_PROTOCOL_UDP
)
from vnpy.event import EventEngine
from vnpy.trader.constant import Exchange
from vnpy.trader.constant import Exchange, Product
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import CancelRequest, OrderRequest, SubscribeRequest
from vnpy.trader.object import (CancelRequest, OrderRequest, SubscribeRequest,
TickData, ContractData, OrderData, TradeData,
PositionData, AccountData)
from vnpy.trader.utility import get_folder_path
API = XTP.API
EXCHANGE_XTP2VT = {
XTP_EXCHANGE_TYPE.XTP_EXCHANGE_SH: Exchange.SSE,
XTP_EXCHANGE_TYPE.XTP_EXCHANGE_SZ: Exchange.SZSE,
}
EXCHANGE_VT2XTP = {v: k for k, v in EXCHANGE_XTP2VT.items()}
PRODUCT_XTP2VT = {
XTP_TICKER_TYPE_STOCK: Product.EQUITY,
XTP_TICKER_TYPE_INDEX: Product.INDEX,
XTP_TICKER_TYPE_FUND: Product.FUND,
XTP_TICKER_TYPE_BOND: Product.BOND,
XTP_TICKER_TYPE_OPTION: Product.OPTION
}
class QuoteSpi(API.QuoteSpi):
def OnDisconnected(self, reason: int) -> Any:
print("OnDisconnected")
return super().OnDisconnected(reason)
def OnError(self, error_info: XTPRspInfoStruct) -> Any:
return super().OnError(error_info)
def OnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
print("OnSubMarketData")
return super().OnSubMarketData(ticker, error_info, is_last)
def OnUnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
return super().OnUnSubMarketData(ticker, error_info, is_last)
def OnDepthMarketData(self, market_data: XTPMarketDataStruct, bid1_qty: Sequence[int],
bid1_count: int, max_bid1_count: int, ask1_qty: Sequence[int],
ask1_count: int, max_ask1_count: int) -> Any:
print("OnDepthMarketData")
return super().OnDepthMarketData(market_data, bid1_qty, bid1_count, max_bid1_count,
ask1_qty, ask1_count, max_ask1_count)
def OnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
return super().OnSubOrderBook(ticker, error_info, is_last)
def OnUnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
return super().OnUnSubOrderBook(ticker, error_info, is_last)
def OnOrderBook(self, order_book: OrderBookStruct) -> Any:
return super().OnOrderBook(order_book)
def OnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
return super().OnSubTickByTick(ticker, error_info, is_last)
def OnUnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
return super().OnUnSubTickByTick(ticker, error_info, is_last)
def OnTickByTick(self, tbt_data: XTPTickByTickStruct) -> Any:
return super().OnTickByTick(tbt_data)
def OnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnSubscribeAllMarketData(exchange_id, error_info)
def OnUnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnUnSubscribeAllMarketData(exchange_id, error_info)
def OnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnSubscribeAllOrderBook(exchange_id, error_info)
def OnUnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnUnSubscribeAllOrderBook(exchange_id, error_info)
def OnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnSubscribeAllTickByTick(exchange_id, error_info)
def OnUnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnUnSubscribeAllTickByTick(exchange_id, error_info)
def OnQueryAllTickers(self, ticker_info: XTPQuoteStaticInfo, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
return super().OnQueryAllTickers(ticker_info, error_info, is_last)
def OnQueryTickersPriceInfo(self, ticker_info: XTPTickerPriceInfo, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
return super().OnQueryTickersPriceInfo(ticker_info, error_info, is_last)
def OnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnSubscribeAllOptionMarketData(exchange_id, error_info)
def OnUnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnUnSubscribeAllOptionMarketData(exchange_id, error_info)
def OnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnSubscribeAllOptionOrderBook(exchange_id, error_info)
def OnUnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnUnSubscribeAllOptionOrderBook(exchange_id, error_info)
def OnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnSubscribeAllOptionTickByTick(exchange_id, error_info)
def OnUnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
return super().OnUnSubscribeAllOptionTickByTick(exchange_id, error_info)
symbol_name_map = {}
class XtpGateway(BaseGateway):
def __init__(self, event_engine: "EventEngine"):
self.client_id: int = 1
self.quote_api = API.QuoteApi.CreateQuoteApi(
self.client_id, # todo: change client id
"log", # todo: use vnpy temp path
XTP_LOG_LEVEL.XTP_LOG_LEVEL_TRACE
)
self.quote_spi = QuoteSpi()
super().__init__(event_engine, "XTP")
default_setting = {
"client_id": "0",
"quote_server_ip": "",
"quote_server_port": "",
"quote_server_protocol": ["TCP", "UDP"],
"quote_userid": "",
"quote_password": "",
"账号": "",
"密码": "",
"客户号": 1,
"行情地址": "",
"行情端口": 0,
"交易地址": "",
"交易端口": 0,
"行情协议": ["TCP", "UDP"]
}
def __init__(self, event_engine: EventEngine):
""""""
super().__init__(event_engine, "XTP")
self.quote_api = XtpQuoteApi(self)
def connect(self, setting: dict):
self.client_id = int(setting['client_id'])
quote_server_ip = setting['quote_server_ip']
quote_server_port = int(setting['quote_server_port'])
quote_server_protocol = setting['quote_server_protocol']
quote_userid = setting['quote_userid']
quote_password = setting['quote_password']
""""""
userid = setting['账号']
password = setting['密码']
client_id = setting['客户号']
quote_ip = setting['行情地址']
quote_port = setting['行情端口']
trade_ip = setting['交易地址']
trade_port = setting['交易端口']
quote_protocol = setting["行情协议"]
quote_protocol = XTP_PROTOCOL_TYPE.XTP_PROTOCOL_TCP if quote_server_protocol == 'TCP' else 'UDP'
self.quote_api.RegisterSpi(self.quote_spi)
# self.quote_api.SetHeartBeatInterval(60)
ret = self.quote_api.Login(
quote_server_ip,
quote_server_port,
quote_userid,
quote_password,
quote_protocol
)
if ret == 0:
# login succeed
self.write_log("Login succeed.")
pass
self.quote_api.connect(userid, password, client_id,
quote_ip, quote_port, quote_protocol)
def close(self):
""""""
pass
def subscribe(self, req: SubscribeRequest):
ret = self.quote_api.SubscribeMarketData(
[req.symbol],
EXCHANGE_VT2XTP[req.exchange],
)
if ret != 0:
print("订阅行情失败") # improve: return True or False, or raise with reason
pass
""""""
self.quote_api.subscrbie(req)
def send_order(self, req: OrderRequest) -> str:
pass
@ -191,3 +101,268 @@ class XtpGateway(BaseGateway):
def query_position(self):
pass
class XtpQuoteApi(API.QuoteSpi):
def __init__(self, gateway: BaseGateway):
""""""
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.userid = ""
self.password = ""
self.client_id = ""
self.server_ip = ""
self.server_port = ""
self.server_protocol = ""
self.api = None
def connect(
self,
userid: str,
password: str,
client_id: str,
server_ip: str,
server_port: str,
quote_protocol: str
):
""""""
if self.api:
return
self.userid = userid
self.password = password
self.client_id = client_id
self.server_ip = server_ip
self.server_port = server_port
if quote_protocol == "CTP":
self.quote_protocol = XTP_PROTOCOL_TCP
else:
self.quote_protocol = XTP_PROTOCOL_UDP
# Create API object
path = str(get_folder_path(self.gateway_name.lower()))
self.api = API.QuoteApi.CreateQuoteApi(
self.client_id,
path,
XTP_LOG_LEVEL.XTP_LOG_LEVEL_TRACE
)
self.api.RegisterSpi(self)
self.gateway.write_log("行情接口初始化成功")
# Login to server
ret = self.api.Login(
self.server_ip,
self.server_port,
self.userid,
self.password,
self.quote_protocol
)
if not ret:
msg = "行情服务器登录成功"
self.query_contract()
else:
msg = f"行情服务器登录失败,原因:{ret}"
self.gateway.write_log(msg)
def subscrbie(self, req: SubscribeRequest):
""""""
xtp_exchange = EXCHANGE_VT2XTP.get(req.exchange, "")
self.api.SubscribeMarketData([req.symbol], xtp_exchange)
def query_contract(self):
""""""
for exchange_id in EXCHANGE_XTP2VT.keys():
self.api.QueryAllTickers(exchange_id)
def check_error(self, func_name: str, error_info: XTPRspInfoStruct):
""""""
if error_info.error_id:
msg = f"{func_name}发生错误, 代码:{error_info.error_id},信息:{error_info.error_msg}"
self.gateway.write_log(msg)
return True
else:
return False
def OnDisconnected(self, reason: int) -> Any:
""""""
self.gateway.write_log("行情服务器连接断开")
def OnError(self, error_info: XTPRspInfoStruct) -> Any:
""""""
self.check_error("行情接口", error_info)
def OnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
self.check_error("订阅行情", error_info)
def OnUnSubMarketData(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
pass
def OnDepthMarketData(self, market_data: XTPMarketDataStruct, bid1_qty: Sequence[int],
bid1_count: int, max_bid1_count: int, ask1_qty: Sequence[int],
ask1_count: int, max_ask1_count: int) -> Any:
""""""
timestamp = market_data.date_time
tick = TickData(
symbol=market_data.ticker,
exchange=EXCHANGE_XTP2VT[market_data.exchange_id],
datetime=timestamp,
volume=market_data.qty,
last_price=market_data.last_price,
limit_up=market_data.upper_limit_price,
limit_down=market_data.lower_limit_price,
open_price=market_data.open_price,
high_price=market_data.high_price,
low_price=market_data.low_price,
pre_close=market_data.pre_close_price,
bid_price_1=market_data.bid[0],
bid_price_2=market_data.bid[1],
bid_price_3=market_data.bid[2],
bid_price_4=market_data.bid[3],
bid_price_5=market_data.bid[4],
ask_price_1=market_data.ask[0],
ask_price_2=market_data.ask[1],
ask_price_3=market_data.ask[2],
ask_price_4=market_data.ask[3],
ask_price_5=market_data.ask[4],
bid_volume_1=market_data.bid_qty[0],
bid_volume_2=market_data.bid_qty[1],
bid_volume_3=market_data.bid_qty[2],
bid_volume_4=market_data.bid_qty[3],
bid_volume_5=market_data.bid_qty[4],
ask_volume_1=market_data.ask_qty[0],
ask_volume_2=market_data.ask_qty[1],
ask_volume_3=market_data.ask_qty[2],
ask_volume_4=market_data.ask_qty[3],
ask_volume_5=market_data.ask_qty[4],
gateway_name=self.gateway_name
)
tick.name = symbol_name_map.get(tick.vt_symbol, tick.symbol)
self.gateway.on_tick(tick)
def OnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
pass
def OnUnSubOrderBook(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
pass
def OnOrderBook(self, order_book: OrderBookStruct) -> Any:
""""""
pass
def OnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
pass
def OnUnSubTickByTick(self, ticker: XTPSpecificTickerStruct, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
pass
def OnTickByTick(self, tbt_data: XTPTickByTickStruct) -> Any:
""""""
pass
def OnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnUnSubscribeAllMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnUnSubscribeAllOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnUnSubscribeAllTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnQueryAllTickers(self, ticker_info: XTPQuoteStaticInfo, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
return
# if self.check_error("查询合约", error_info):
# return
# contract = ContractData(
# symbol=ticker_info.ticker,
# exchange=EXCHANGE_XTP2VT[ticker_info.exchange_id],
# name=ticker_info.ticker_name,
# product=PRODUCT_XTP2VT[ticker_info.ticker_type],
# size=1,
# pricetick=ticker_info.pricetick,
# min_volume=ticker_info.buy_qty_unit,
# gateway_name=self.gateway_name
# )
# self.gateway.on_contract(contract)
# symbol_name_map[contract.vt_symbol] = contract.name
def OnQueryTickersPriceInfo(self, ticker_info: XTPTickerPriceInfo, error_info: XTPRspInfoStruct,
is_last: bool) -> Any:
""""""
pass
def OnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnUnSubscribeAllOptionMarketData(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnUnSubscribeAllOptionOrderBook(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass
def OnUnSubscribeAllOptionTickByTick(self, exchange_id: XTP_EXCHANGE_TYPE,
error_info: XTPRspInfoStruct) -> Any:
""""""
pass