Merge pull request #1468 from 1122455801/tiger_gateway_version_05

[Mod] tiger_gateway
This commit is contained in:
vn.py 2019-03-11 22:16:29 +08:00 committed by GitHub
commit 6e985e5492
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -4,19 +4,15 @@ Please install tiger-api before use.
pip install tigeropen
"""
import time
from copy import copy
from threading import Thread
from datetime import datetime
from multiprocessing.dummy import Pool
from queue import Empty, Queue
import functools
import pandas as pd
from pandas import DataFrame
from tigeropen.tiger_open_config import TigerOpenClientConfig
from tigeropen.common.consts import Language, Currency, Market
from tigeropen.quote.quote_client import QuoteClient
@ -39,7 +35,6 @@ from vnpy.trader.object import (
CancelRequest,
)
PRODUCT_VT2TIGER = {
Product.EQUITY: "STK",
Product.OPTION: "OPT",
@ -47,7 +42,7 @@ PRODUCT_VT2TIGER = {
Product.WARRANT: "IOPT",
Product.FUTURES: "FUT",
Product.OPTION: "FOP",
Product.FOREX: "CASH"
Product.FOREX: "CASH",
}
DIRECTION_VT2TIGER = {
@ -75,7 +70,7 @@ STATUS_TIGER2VT = {
ORDER_STATUS.CANCELLED: Status.CANCELLED,
ORDER_STATUS.PENDING_CANCEL: Status.CANCELLED,
ORDER_STATUS.REJECTED: Status.REJECTED,
ORDER_STATUS.EXPIRED: Status.NOTTRADED
ORDER_STATUS.EXPIRED: Status.NOTTRADED,
}
PUSH_STATUS_TIGER2VT = {
@ -86,26 +81,23 @@ PUSH_STATUS_TIGER2VT = {
"Submitted": Status.SUBMITTING,
"PendingSubmit": Status.SUBMITTING,
"Filled": Status.ALLTRADED,
"Inactive": Status.REJECTED
"Inactive": Status.REJECTED,
}
# "private_key": "MIICXQIBAAKBgQC1amZa5YsGTklry7DAsUBOwXJCgrsZZtB21PImw/yLmrbqRfsS3vawvMigLWcCwIDnHa+hpdpeze0eHIwbZzJzUDGvRALYK9t3D8pwPVxpwX1OF8RfHCM7YQvSOvPPnHHuVQvKaR7NNm1/WmvGXC9kVJdkYQ7kCmh52siFoy1MLQIDAQABAoGAVabcmIHTt7ByncBXvUJymDxhE+HhMEcImXJEueTCca8kOUu9FNXMJvmax3VoMzZsJbIwX+OMTEJxd0wHIlEA0gECjDwFK4Q42q+ptO4QABJQVSC6I+dOt2OIY28uvT3rkenOO8KRIDt4F52PFd71ZdB1aaXixORORq1MdSLi8EkCQQDiviAB+L5R/HVxwxvqZfJ530OtFd5IipZC9YZlY1CtXWCmu89LK7UUlEuNXyGsOxyz5jLqFuNRsie5AC23tfEPAkEAzNMCa8axJWfPZIH4tGrbZ1F3I41BQdgp2zBmR7AyUMBDkli86OzmJ7QUCJA/PJxK43/IYUWm4OU5Q+SvXCr3AwJBAJTBj1Y7zwES1CpSitn5EF+MbmX71t1YrsQ3OHkD80YJ4QMCbDkw75gUwox5QSoxjd8ow3Z4laJfc1gYGeZQ41kCQQCCiQwm8cceBq3W6To+iUdw7itWngRz2Ta7uXnFwFYgvpeR4jnq3GfF7+9AkeWrVBQqLtrem0xCUfQP/+N+gudPAkBFLbt78/MpQGEDc7jyu/KE5Mp4wMMDQQwch9VLvsAZwWLysB6rZWpo3jIfp9zZ7c3zOYGNMWAZjtMmNkRJ8COH",
class TigerGateway(BaseGateway):
""""""
default_setting = {
"tiger_id": "20150008",
"account": "DU575568",
"standard_account": "DU575568"
"tiger_id": "",
"account": "",
"standard_account": "",
}
def __init__(self, event_engine):
"""Constructor"""
super(TigerGateway, self).__init__(event_engine, "TIGER")
self.private_key = "MIICXQIBAAKBgQC1amZa5YsGTklry7DAsUBOwXJCgrsZZtB21PImw/yLmrbqRfsS3vawvMigLWcCwIDnHa+hpdpeze0eHIwbZzJzUDGvRALYK9t3D8pwPVxpwX1OF8RfHCM7YQvSOvPPnHHuVQvKaR7NNm1/WmvGXC9kVJdkYQ7kCmh52siFoy1MLQIDAQABAoGAVabcmIHTt7ByncBXvUJymDxhE+HhMEcImXJEueTCca8kOUu9FNXMJvmax3VoMzZsJbIwX+OMTEJxd0wHIlEA0gECjDwFK4Q42q+ptO4QABJQVSC6I+dOt2OIY28uvT3rkenOO8KRIDt4F52PFd71ZdB1aaXixORORq1MdSLi8EkCQQDiviAB+L5R/HVxwxvqZfJ530OtFd5IipZC9YZlY1CtXWCmu89LK7UUlEuNXyGsOxyz5jLqFuNRsie5AC23tfEPAkEAzNMCa8axJWfPZIH4tGrbZ1F3I41BQdgp2zBmR7AyUMBDkli86OzmJ7QUCJA/PJxK43/IYUWm4OU5Q+SvXCr3AwJBAJTBj1Y7zwES1CpSitn5EF+MbmX71t1YrsQ3OHkD80YJ4QMCbDkw75gUwox5QSoxjd8ow3Z4laJfc1gYGeZQ41kCQQCCiQwm8cceBq3W6To+iUdw7itWngRz2Ta7uXnFwFYgvpeR4jnq3GfF7+9AkeWrVBQqLtrem0xCUfQP/+N+gudPAkBFLbt78/MpQGEDc7jyu/KE5Mp4wMMDQQwch9VLvsAZwWLysB6rZWpo3jIfp9zZ7c3zOYGNMWAZjtMmNkRJ8COH"
self.tiger_key_2005 = "MIICXAIBAAKBgQC2s9fGSfp86pYpK/9FFtdzZXcpncxDMaWww9WPPn2EnZC9zqIamz4nUewDGgya33VgoHNL7a3iGNCe4zqivhr8k1ACG68psElaRjALl1UzdAMv4xwnrxpceTCgA9AZM8x+BmVXvO5cfgIfGdoahtdxjMNjIYDkx+HORGJ1cFcmrQIDAQABAoGBAIwf8uYJ5yvXX8PEEsyScDv5HiO0+uyuLz4bdLegXfRQRKrOyFVPq6PMmQ7n87L0n7m0VbluWWaHUboK3PXkiBzTsmx0aFS3aNyr203QGXXwp9hxF2WS968/6K2zSikaDrmSkWps5dVVqhnkJ6STj7cvM6ZGYIHWPC7W79qTYHihAkEA3FvFSznaTTajvZpHq83rrCh3wmI2ggeh1M1i89HAv0EfTLkWweyNM8qO39qeaGzB/TZiOal0LR8Mk7HbGPOTVwJBANRA4LyhwczHEdwH16n5QPcrogoSsPM6uq9ZL8zYwaMTcHvEJUhW6hUMQPyWcNtenH9mwcgRF78TFGLqIH9s95sCP1bv3ebP7FCKPg+Pzrb5hwFk9dq65MZoPHC4l1Gab3EFQFQEsfXQXeURBU1L8zM/tUkxK4+US0GB/nRGtyog7wJBALU4a2lCpqgDc4EshPsP4GLosyHskX4qL4hVGpXIn5NvnoNdlgNsidHMs5O1ksgJwI6aGmuKBH9Ud/x4L6T8UW8CQEbrCa3/vIv5mHzGe9G7ZsK5VaPx3VETSeRbDUai8KGpcMXX7nFUnhsBd7YvehOSwRSd5SCWrZuejhIdn5V7hYM="
self.private_key = ""
self.tiger_id = ""
self.account = ""
self.standard_account = ""
@ -116,6 +108,8 @@ class TigerGateway(BaseGateway):
self.quote_client = None
self.push_client = None
self.tradeid = 0
self.active = False
self.queue = Queue()
self.pool = None
@ -129,7 +123,7 @@ class TigerGateway(BaseGateway):
""""""
while self.active:
try:
func, arg = self.queue.get(timeout=1)
func, arg = self.queue.get(timeout=0.1)
if arg:
func(arg)
else:
@ -156,14 +150,14 @@ class TigerGateway(BaseGateway):
self.pool.apply_async(self.run)
# Put connect task into quque.
self.get_client_config()
self.init_client_config()
self.add_task(self.connect_quote)
self.add_task(self.connect_trade)
self.add_task(self.connect_push)
self.thread.start()
# self.thread.start()
def get_client_config(self, sandbox=True):
def init_client_config(self, sandbox=True):
""""""
self.client_config = TigerOpenClientConfig(sandbox_debug=sandbox)
self.client_config.private_key = self.private_key
@ -172,7 +166,6 @@ class TigerGateway(BaseGateway):
self.client_config.standard_account = self.standard_account
self.client_config.paper_account = self.paper_account
self.client_config.language = self.language
return self.client_config
def connect_quote(self):
"""
@ -186,9 +179,9 @@ class TigerGateway(BaseGateway):
return
if self.symbol_names:
self.write_log("行情接口连接成功")
self.add_task(self.query_contract)
self.write_log("行情接口连接成功")
def connect_trade(self):
"""
@ -202,12 +195,12 @@ class TigerGateway(BaseGateway):
return
if data:
self.write_log("交易接口连接成功")
self.add_task(self.query_order)
self.add_task(self.query_position)
self.add_task(self.query_account)
self.write_log("交易接口连接成功")
def connect_push(self):
"""
Connect to push server.
@ -217,157 +210,127 @@ class TigerGateway(BaseGateway):
self.push_client.connect(self.client_config.tiger_id, self.client_config.private_key)
self.push_client.quote_changed = self.on_quote_change
self.push_client.subscribe_asset()
self.push_client.asset_changed = self.on_asset_changed
self.push_client.subscribe_position()
self.push_client.position_changed = self.on_position_changed
self.push_client.subscribe_order()
self.push_client.order_changed = self.on_order_changed
self.push_client.asset_changed = self.on_asset_change
self.push_client.position_changed = self.on_position_change
self.push_client.order_changed = self.on_order_change
self.write_log("推送接口连接成功")
def subscribe(self, req: SubscribeRequest):
""""""
symbol = convert_symbol_vt2tiger(req.symbol, req.exchange)
self.push_client.subscribe_quote([req.symbol])
# self.push_client.subscribe_asset()
# self.push_client.subscribe_position()
# self.push_client.subscribe_order()
self.push_client.subscribe_asset()
self.push_client.subscribe_position()
self.push_client.subscribe_order()
def on_quote_change(self, symbol: str, data: list, trading: bool):
symbol, exchange = convert_symbol_tiger2vt(symbol)
name = self.symbol_names[symbol]
def on_quote_change(self, tiger_symbol: str, data: list, trading: bool):
""""""
data = dict(data)
symbol, exchange = convert_symbol_tiger2vt(tiger_symbol)
tick = self.ticks.get(symbol, None)
if not tick:
tick = TickData(
symbol=symbol,
exchange=exchange,
datetime=None,
gateway_name=self.gateway_name,
name=name,
datetime=datetime.now(),
name=self.symbol_names[symbol],
)
self.ticks[symbol] = tick
tick.datetime = datetime.now()
tick.pre_close = data.get("prev_close",0)
tick.last_price = data.get("latest_price",0)
tick.datetime = datetime.fromtimestamp(data["latest_time"] / 1000)
tick.pre_close = data.get("prev_close", 0)
tick.last_price = data.get("latest_price", 0)
tick.volume = data.get("volume", 0)
tick.open_price = data.get("open", 0)
tick.open_price = data.get("open", 0) #美股无
tick.high_price = data.get("high", 0) # 美股无
tick.low_price = data.get("low", 0) # 美股无
tick.ask_price_1=data.get("ask_price", 0) # A股/港股无
tick.bid_price_1=data.get("bid_price", 0) # A股/港股无
tick.ask_volume_1=data.get("ask_size", 0) # A股/港股无
tick.bid_volume_1=data.get("bid_size", 0) # A股/港股无
tick.open_price = data.get("open", 0)
tick.high_price = data.get("high", 0)
tick.low_price = data.get("low", 0)
tick.ask_price_1 = data.get("ask_price", 0)
tick.bid_price_1 = data.get("bid_price", 0)
tick.ask_volume_1 = data.get("ask_size", 0)
tick.bid_volume_1 = data.get("bid_size", 0)
self.on_tick(copy(tick))
def on_asset_changed(self, account:str, data:list):
def on_asset_change(self, tiger_account: str, data: list):
""""""
#print("账号", data)
data = dict(data)
account = AccountData(
accountid=account,
accountid=tiger_account,
balance=data["net_liquidation"],
frozen=0.0,
gateway_name=self.gateway_name,
)
self.on_account(account)
def on_position_changed(self, account:str, data:list):
def on_position_change(self, tiger_account: str, data: list):
""""""
#print ("持仓", data)
data = dict(data)
symbol = data["origin_symbol"]
volume = data["quantity"]
# 判断方向
if volume > 0:
direction = Direction.LONG
else:
direction = Direction.SHORT
symbol, exchange = convert_symbol_tiger2vt(symbol)
symbol, exchange = convert_symbol_tiger2vt(data["origin_symbol"])
pos = PositionData(
symbol=symbol,
exchange=exchange,
direction=direction,
volume=volume,
direction=Direction.NET,
volume=data["quantity"],
frozen=0.0,
price=data["average_cost"],
pnl=data["unrealized_pnl"],
gateway_name=self.gateway_name,
)
self.on_position(pos)
def on_order_changed(self, account: str, data: list):
def on_order_change(self, tiger_account: str, data: list):
""""""
#print("委托", data)
print("委托", data)
data = dict(data)
symbol = data["origin_symbol"]
volume = data["quantity"]
symbol, exchange = convert_symbol_tiger2vt(symbol)
if data["order_type"] == "LMT":
price = data["limit_price"]
else:
price = 0
symbol, exchange = convert_symbol_tiger2vt(data["origin_symbol"])
status = PUSH_STATUS_TIGER2VT[data["status"]]
order = OrderData(
symbol=symbol,
exchange=exchange,
orderid=data["order_id"],
direction=DIRECTION_TIGER2VT[data["action"]],
price=price,
volume=volume,
direction=Direction.NET,
price=data.get("limit_price", 0),
volume=data["quantity"],
traded=data["filled"],
status=PUSH_STATUS_TIGER2VT[data["status"]],
time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(data["order_time"] / 1000)).split(" ")[-1],
status=status,
time=datetime.fromtimestamp(data["order_time"] / 1000).strftime("%H:%M:%S"),
gateway_name=self.gateway_name,
)
self.on_order(order)
trade = TradeData(
symbol=symbol,
exchange=exchange,
direction=DIRECTION_TIGER2VT[data["action"]],
tradeid=data["order_id"],
orderid=data["order_id"],
price=data["avg_fill_price"],
volume=data["filled"],
time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(data["trade_time"] / 1000)).split(" ")[-1],
gateway_name=self.gateway_name,
)
self.on_trade(trade)
if status == Status.ALLTRADED:
self.tradeid += 1
trade = TradeData(
symbol=symbol,
exchange=exchange,
direction=Direction.NET,
tradeid=self.tradeid,
orderid=data["order_id"],
price=data["avg_fill_price"],
volume=data["filled"],
time=datetime.fromtimestamp(data["trade_time"] / 1000).strftime("%H:%M:%S"),
gateway_name=self.gateway_name,
)
self.on_trade(trade)
def send_order(self, req: OrderRequest):
""""""
symbol = convert_symbol_vt2tiger(req.symbol, req.exchange)
self.add_task(self._send_order, req)
def _send_order(self, req: OrderRequest):
""""""
currency = config_symbol_currency(req.symbol)
order_type = PRICETYPE_VT2TIGER[req.price_type]
# first, get contract
try:
contract = self.trade_client.get_contracts(symbol=symbol, currency=currency)[0]
contract = self.trade_client.get_contracts(symbol=req.symbol, currency=currency)[0]
except ApiException:
self.write_log("获取合约对象失败")
return
@ -378,10 +341,10 @@ class TigerGateway(BaseGateway):
account=self.account,
contract=contract,
action=DIRECTION_VT2TIGER[req.direction],
order_type=order_type,
quantity=req.volume,
limit_price=req.price
)
order_type=PRICETYPE_VT2TIGER[req.price_type],
quantity=int(req.volume),
limit_price=req.price,
)
except ApiException:
self.write_log("创建订单失败")
return
@ -401,11 +364,15 @@ class TigerGateway(BaseGateway):
return order.vt_orderid
def cancel_order(self, req: CancelRequest):
""""""
self.add_task(self._cancel_order, req)
def _cancel_order(self, req: CancelRequest):
""""""
try:
data = self.trade_client.cancel_order(order_id=req.orderid)
except ApiException:
self.write_log("撤单失败")
self.write_log(f"撤单失败{req.orderid}")
return
if not data:
@ -498,7 +465,7 @@ class TigerGateway(BaseGateway):
for i in assets:
account = AccountData(
accountid=self.account,
balance=float(i.summary.net_liquidation),
balance=i.summary.net_liquidation,
frozen=0.0,
gateway_name=self.gateway_name,
)
@ -514,22 +481,15 @@ class TigerGateway(BaseGateway):
return
for i in position:
symbol = i.contract.symbol
symbol, exchange = convert_symbol_tiger2vt(symbol)
volume = float(i.quantity)
# 判断方向
if volume > 0:
direction = Direction.LONG
else:
direction = Direction.SHORT
symbol, exchange = convert_symbol_tiger2vt(i.contract.symbol)
pos = PositionData(
symbol=symbol,
exchange=exchange,
direction=direction,
volume=volume,
direction=Direction.NET,
volume=i.quantity,
frozen=0.0,
price=float(i.average_cost),
price=i.average_cost,
pnl=float(i.unrealized_pnl),
gateway_name=self.gateway_name,
)
@ -547,40 +507,28 @@ class TigerGateway(BaseGateway):
self.process_order(data)
self.process_deal(data)
def query_trade(self):
""""""
pass
def close(self):
""""""
self.active = False
self.queue.join()
if self.push_client:
self.push_client.disconnect()
def process_order(self, data):
""""""
for i in data:
symbol = str(i.contract)
symbol, exchange = convert_symbol_tiger2vt(symbol)
time_local = time.localtime(i.order_time / 1000)
if i.order_type == "LMT":
price = i.limit_price
else:
price = 0
for i in data:
symbol, exchange = convert_symbol_tiger2vt(str(i.contract))
order = OrderData(
symbol=symbol,
exchange=exchange,
orderid=str(i.order_id),
direction=DIRECTION_TIGER2VT[i.action],
price=float(price),
volume=float(i.quantity),
traded=float(i.filled),
direction=Direction.NET,
price=i.limit_price if i.limit_price else 0.0,
volume=i.quantity,
traded=i.filled,
status=STATUS_TIGER2VT[i.status],
time=time.strftime("%Y-%m-%d %H:%M:%S", time_local).split(" ")[-1],
time=datetime.fromtimestamp(i.order_time / 1000).strftime("%H:%M:%S"),
gateway_name=self.gateway_name,
)
@ -590,27 +538,27 @@ class TigerGateway(BaseGateway):
"""
Process trade data for both query and update.
"""
for i in data:
for i in reversed(data):
if i.status == ORDER_STATUS.PARTIALLY_FILLED or i.status == ORDER_STATUS.FILLED:
symbol = str(i.contract)
symbol, exchange = convert_symbol_tiger2vt(symbol)
time_local = time.localtime(i.trade_time / 1000)
symbol, exchange = convert_symbol_tiger2vt(str(i.contract))
self.tradeid += 1
trade = TradeData(
symbol=symbol,
exchange=exchange,
direction=DIRECTION_TIGER2VT[i.action],
tradeid=i.order_id,
direction=Direction.NET,
tradeid=self.tradeid,
orderid=i.order_id,
price=float(i.avg_fill_price),
volume=float(i.filled),
time=time.strftime("%Y-%m-%d %H:%M:%S", time_local).split(" ")[-1],
price=i.avg_fill_price,
volume=i.filled,
time=datetime.fromtimestamp(i.trade_time / 1000).strftime("%H:%M:%S"),
gateway_name=self.gateway_name,
)
self.on_trade(trade)
@functools.lru_cache()
def convert_symbol_tiger2vt(symbol):
"""
Convert symbol from vt to tiger.
@ -620,7 +568,7 @@ def convert_symbol_tiger2vt(symbol):
else:
if len(symbol) < 6:
exchange = Exchange.SEHK
elif symbol.startswith("6"):
elif symbol.startswith("6"):
exchange = Exchange.SSE
elif symbol.endswith(".SH"):
exchange = Exchange.SSE
@ -630,6 +578,7 @@ def convert_symbol_tiger2vt(symbol):
return symbol, exchange
@functools.lru_cache()
def convert_symbol_vt2tiger(symbol, exchange):
"""
Convert symbol from vt to tiger.
@ -641,6 +590,7 @@ def convert_symbol_vt2tiger(symbol, exchange):
return symbol
@functools.lru_cache()
def config_symbol_currency(symbol):
"""
Config symbol to corresponding currency
@ -652,5 +602,4 @@ def config_symbol_currency(symbol):
currency = Currency.HKD
else:
currency = Currency.CNH
return currency
return currency