Update tiger_gateway.py

This commit is contained in:
1122455801 2019-03-11 16:16:08 +08:00
parent 5e753f5003
commit 4cfab5b9d9

View File

@ -5,12 +5,13 @@ pip install tigeropen
"""
from copy import copy
from threading import Thread
from time import sleep
import time
from datetime import datetime
from multiprocessing.dummy import Pool
from queue import Empty, Queue
import functools
import pandas as pd
from pandas import DataFrame
from datetime import datetime
from tigeropen.tiger_open_config import TigerOpenClientConfig
from tigeropen.common.consts import Language, Currency, Market
@ -21,9 +22,6 @@ from tigeropen.push.push_client import PushClient
from tigeropen.common.exceptions import ApiException
from vnpy.trader.constant import Direction, Product, Status, PriceType, Exchange
from vnpy.trader.event import EVENT_TIMER
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (
TickData,
@ -37,7 +35,6 @@ from vnpy.trader.object import (
CancelRequest,
)
PRODUCT_VT2TIGER = {
Product.EQUITY: "STK",
Product.OPTION: "OPT",
@ -45,7 +42,7 @@ PRODUCT_VT2TIGER = {
Product.WARRANT: "IOPT",
Product.FUTURES: "FUT",
Product.OPTION: "FOP",
Product.FOREX: "CASH"
Product.FOREX: "CASH",
}
DIRECTION_VT2TIGER = {
@ -54,9 +51,9 @@ DIRECTION_VT2TIGER = {
}
DIRECTION_TIGER2VT = {
"BUY":Direction.LONG,
"BUY": Direction.LONG,
"SELL": Direction.SHORT,
"sell":Direction.SHORT,
"sell": Direction.SHORT,
}
PRICETYPE_VT2TIGER = {
@ -73,42 +70,34 @@ STATUS_TIGER2VT = {
ORDER_STATUS.CANCELLED: Status.CANCELLED,
ORDER_STATUS.PENDING_CANCEL: Status.CANCELLED,
ORDER_STATUS.REJECTED: Status.REJECTED,
ORDER_STATUS.EXPIRED: Status.NOTTRADED
ORDER_STATUS.EXPIRED: Status.NOTTRADED,
}
PUSH_STATUS_TIGER2VT = {
"Invalid":Status.REJECTED,
"Initial":Status.SUBMITTING,
"PendingCancel":Status.CANCELLED,
"Cancelled":Status.CANCELLED,
"Submitted":Status.SUBMITTING,
"PendingSubmit":Status.SUBMITTING,
"Filled":Status.ALLTRADED,
"Inactive":Status.REJECTED,
"Invalid": Status.REJECTED,
"Initial": Status.SUBMITTING,
"PendingCancel": Status.CANCELLED,
"Cancelled": Status.CANCELLED,
"Submitted": Status.SUBMITTING,
"PendingSubmit": Status.SUBMITTING,
"Filled": Status.ALLTRADED,
"Inactive": Status.REJECTED,
}
# "private_key": "MIICXQIBAAKBgQC1amZa5YsGTklry7DAsUBOwXJCgrsZZtB21PImw/yLmrbqRfsS3vawvMigLWcCwIDnHa+hpdpeze0eHIwbZzJzUDGvRALYK9t3D8pwPVxpwX1OF8RfHCM7YQvSOvPPnHHuVQvKaR7NNm1/WmvGXC9kVJdkYQ7kCmh52siFoy1MLQIDAQABAoGAVabcmIHTt7ByncBXvUJymDxhE+HhMEcImXJEueTCca8kOUu9FNXMJvmax3VoMzZsJbIwX+OMTEJxd0wHIlEA0gECjDwFK4Q42q+ptO4QABJQVSC6I+dOt2OIY28uvT3rkenOO8KRIDt4F52PFd71ZdB1aaXixORORq1MdSLi8EkCQQDiviAB+L5R/HVxwxvqZfJ530OtFd5IipZC9YZlY1CtXWCmu89LK7UUlEuNXyGsOxyz5jLqFuNRsie5AC23tfEPAkEAzNMCa8axJWfPZIH4tGrbZ1F3I41BQdgp2zBmR7AyUMBDkli86OzmJ7QUCJA/PJxK43/IYUWm4OU5Q+SvXCr3AwJBAJTBj1Y7zwES1CpSitn5EF+MbmX71t1YrsQ3OHkD80YJ4QMCbDkw75gUwox5QSoxjd8ow3Z4laJfc1gYGeZQ41kCQQCCiQwm8cceBq3W6To+iUdw7itWngRz2Ta7uXnFwFYgvpeR4jnq3GfF7+9AkeWrVBQqLtrem0xCUfQP/+N+gudPAkBFLbt78/MpQGEDc7jyu/KE5Mp4wMMDQQwch9VLvsAZwWLysB6rZWpo3jIfp9zZ7c3zOYGNMWAZjtMmNkRJ8COH",
class TigerGateway(BaseGateway):
""""""
default_setting = {
default_setting = {
"tiger_id": "20150008",
"account": "DU575568",
"standard_account": "DU575568",
"standard_account": "DU575568"
}
def __init__(self, event_engine):
"""Constructor"""
super(TigerGateway, self).__init__(event_engine, "TIGER")
self.private_key = "MIICXQIBAAKBgQC1amZa5YsGTklry7DAsUBOwXJCgrsZZtB21PImw/yLmrbqRfsS3vawvMigLWcCwIDnHa+hpdpeze0eHIwbZzJzUDGvRALYK9t3D8pwPVxpwX1OF8RfHCM7YQvSOvPPnHHuVQvKaR7NNm1/WmvGXC9kVJdkYQ7kCmh52siFoy1MLQIDAQABAoGAVabcmIHTt7ByncBXvUJymDxhE+HhMEcImXJEueTCca8kOUu9FNXMJvmax3VoMzZsJbIwX+OMTEJxd0wHIlEA0gECjDwFK4Q42q+ptO4QABJQVSC6I+dOt2OIY28uvT3rkenOO8KRIDt4F52PFd71ZdB1aaXixORORq1MdSLi8EkCQQDiviAB+L5R/HVxwxvqZfJ530OtFd5IipZC9YZlY1CtXWCmu89LK7UUlEuNXyGsOxyz5jLqFuNRsie5AC23tfEPAkEAzNMCa8axJWfPZIH4tGrbZ1F3I41BQdgp2zBmR7AyUMBDkli86OzmJ7QUCJA/PJxK43/IYUWm4OU5Q+SvXCr3AwJBAJTBj1Y7zwES1CpSitn5EF+MbmX71t1YrsQ3OHkD80YJ4QMCbDkw75gUwox5QSoxjd8ow3Z4laJfc1gYGeZQ41kCQQCCiQwm8cceBq3W6To+iUdw7itWngRz2Ta7uXnFwFYgvpeR4jnq3GfF7+9AkeWrVBQqLtrem0xCUfQP/+N+gudPAkBFLbt78/MpQGEDc7jyu/KE5Mp4wMMDQQwch9VLvsAZwWLysB6rZWpo3jIfp9zZ7c3zOYGNMWAZjtMmNkRJ8COH"
self.tiger_key_2005 = "MIICXAIBAAKBgQC2s9fGSfp86pYpK/9FFtdzZXcpncxDMaWww9WPPn2EnZC9zqIamz4nUewDGgya33VgoHNL7a3iGNCe4zqivhr8k1ACG68psElaRjALl1UzdAMv4xwnrxpceTCgA9AZM8x+BmVXvO5cfgIfGdoahtdxjMNjIYDkx+HORGJ1cFcmrQIDAQABAoGBAIwf8uYJ5yvXX8PEEsyScDv5HiO0+uyuLz4bdLegXfRQRKrOyFVPq6PMmQ7n87L0n7m0VbluWWaHUboK3PXkiBzTsmx0aFS3aNyr203QGXXwp9hxF2WS968/6K2zSikaDrmSkWps5dVVqhnkJ6STj7cvM6ZGYIHWPC7W79qTYHihAkEA3FvFSznaTTajvZpHq83rrCh3wmI2ggeh1M1i89HAv0EfTLkWweyNM8qO39qeaGzB/TZiOal0LR8Mk7HbGPOTVwJBANRA4LyhwczHEdwH16n5QPcrogoSsPM6uq9ZL8zYwaMTcHvEJUhW6hUMQPyWcNtenH9mwcgRF78TFGLqIH9s95sCP1bv3ebP7FCKPg+Pzrb5hwFk9dq65MZoPHC4l1Gab3EFQFQEsfXQXeURBU1L8zM/tUkxK4+US0GB/nRGtyog7wJBALU4a2lCpqgDc4EshPsP4GLosyHskX4qL4hVGpXIn5NvnoNdlgNsidHMs5O1ksgJwI6aGmuKBH9Ud/x4L6T8UW8CQEbrCa3/vIv5mHzGe9G7ZsK5VaPx3VETSeRbDUai8KGpcMXX7nFUnhsBd7YvehOSwRSd5SCWrZuejhIdn5V7hYM="
self.private_key = ""
self.tiger_id = ""
self.account = ""
self.standard_account = ""
@ -119,13 +108,32 @@ class TigerGateway(BaseGateway):
self.quote_client = None
self.push_client = None
self.tradeid = 0
self.active = False
self.queue = Queue()
self.pool = None
self.ticks = {}
self.trades = set()
self.contracts = {}
self.symbol_names = {}
self.thread = Thread(target=self.query_data)
def run(self):
""""""
while self.active:
try:
func, arg = self.queue.get(timeout=0.1)
if arg:
func(arg)
else:
func()
except Empty:
pass
def add_task(self, func, arg=None):
""""""
self.queue.put((func, arg))
def connect(self, setting: dict):
""""""
@ -136,25 +144,20 @@ class TigerGateway(BaseGateway):
self.paper_account = setting["account"]
self.languege = Language.zh_CN
self.get_client_config()
self.connect_quote()
self.connect_trade()
self.connect_push()
# Start thread pool for REST call
self.active = True
self.pool = Pool(5)
self.pool.apply_async(self.run)
self.thread.start()
# Put connect task into quque.
self.init_client_config()
self.add_task(self.connect_quote)
self.add_task(self.connect_trade)
self.add_task(self.connect_push)
def query_data(self):
"""
Query all data necessary.
"""
# self.thread.start()
self.query_contract()
self.query_order()
self.query_position()
self.query_account()
def get_client_config(self, sandbox=True):
def init_client_config(self, sandbox=True):
""""""
self.client_config = TigerOpenClientConfig(sandbox_debug=sandbox)
self.client_config.private_key = self.private_key
@ -163,7 +166,6 @@ class TigerGateway(BaseGateway):
self.client_config.standard_account = self.standard_account
self.client_config.paper_account = self.paper_account
self.client_config.language = self.language
return self.client_config
def connect_quote(self):
"""
@ -177,7 +179,9 @@ class TigerGateway(BaseGateway):
return
if self.symbol_names:
self.write_log("行情接口连接成功")
self.add_task(self.query_contract)
self.write_log("行情接口连接成功")
def connect_trade(self):
"""
@ -191,8 +195,12 @@ class TigerGateway(BaseGateway):
return
if data:
self.add_task(self.query_order)
self.add_task(self.query_position)
self.add_task(self.query_account)
self.write_log("交易接口连接成功")
def connect_push(self):
"""
Connect to push server.
@ -202,166 +210,127 @@ class TigerGateway(BaseGateway):
self.push_client.connect(self.client_config.tiger_id, self.client_config.private_key)
self.push_client.quote_changed = self.on_quote_change
self.push_client.subscribe_asset()
self.push_client.asset_changed = self.on_asset_changed
self.push_client.subscribe_position()
self.push_client.position_changed = self.on_position_changed
self.push_client.subscribe_order()
self.push_client.order_changed = self.on_order_changed
self.push_client.asset_changed = self.on_asset_change
self.push_client.position_changed = self.on_position_change
self.push_client.order_changed = self.on_order_change
self.write_log("推送接口连接成功")
def subscribe(self, req: SubscribeRequest):
""""""
symbol = convert_symbol_vt2tiger(req.symbol, req.exchange)
self.push_client.subscribe_quote([req.symbol])
#self.push_client.subscribe_asset()
# self.push_client.subscribe_position()
#self.push_client.subscribe_order()
self.push_client.subscribe_asset()
self.push_client.subscribe_position()
self.push_client.subscribe_order()
def on_quote_change(self, symbol: str, data: list, trading: bool):
symbol, exchange = convert_symbol_tiger2vt(symbol)
name = self.symbol_names[symbol]
def on_quote_change(self, tiger_symbol: str, data: list, trading: bool):
""""""
data = dict(data)
symbol, exchange = convert_symbol_tiger2vt(tiger_symbol)
tick = self.ticks.get(symbol, None)
if not tick:
tick = TickData(
symbol=symbol,
exchange=exchange,
datetime=None,
gateway_name=self.gateway_name,
name=name,
datetime=datetime.now(),
name=self.symbol_names[symbol],
)
self.ticks[symbol] = tick
tick.datetime = datetime.now()
tick.pre_close = data.get("prev_close",0)
tick.last_price = data.get("latest_price",0)
tick.datetime = datetime.fromtimestamp(data["latest_time"] / 1000)
tick.pre_close = data.get("prev_close", 0)
tick.last_price = data.get("latest_price", 0)
tick.volume = data.get("volume", 0)
tick.open_price = data.get("open", 0)
tick.open_price = data.get("open", 0) #美股无
tick.high_price = data.get("high", 0) # 美股无
tick.low_price = data.get("low", 0) # 美股无
tick.ask_price_1=data.get("ask_price", 0) # A股/港股无
tick.bid_price_1=data.get("bid_price", 0) # A股/港股无
tick.ask_volume_1=data.get("ask_size", 0) # A股/港股无
tick.bid_volume_1=data.get("bid_size", 0) # A股/港股无
tick.open_price = data.get("open", 0)
tick.high_price = data.get("high", 0)
tick.low_price = data.get("low", 0)
tick.ask_price_1 = data.get("ask_price", 0)
tick.bid_price_1 = data.get("bid_price", 0)
tick.ask_volume_1 = data.get("ask_size", 0)
tick.bid_volume_1 = data.get("bid_size", 0)
self.on_tick(copy(tick))
def on_asset_changed(self, account:str, data:list):
def on_asset_change(self, tiger_account: str, data: list):
""""""
#print("账号", data)
data = dict(data)
account = AccountData(
accountid=account,
accountid=tiger_account,
balance=data["net_liquidation"],
frozen=0.0,
gateway_name=self.gateway_name,
)
self.on_account(account)
def on_position_changed(self, account:str, data:list):
def on_position_change(self, tiger_account: str, data: list):
""""""
#print ("持仓", data)
data = dict(data)
symbol = data["origin_symbol"]
volume = data["quantity"]
# 判断方向
if volume > 0:
direction = Direction.LONG
else:
direction = Direction.SHORT
symbol, exchange = convert_symbol_tiger2vt(symbol)
symbol, exchange = convert_symbol_tiger2vt(data["origin_symbol"])
pos = PositionData(
symbol=symbol,
exchange=exchange,
direction=direction,
volume=volume,
direction=Direction.NET,
volume=data["quantity"],
frozen=0.0,
price=data["average_cost"],
pnl=data["unrealized_pnl"],
gateway_name=self.gateway_name,
)
self.on_position(pos)
def on_order_changed(self, account: str, data: list):
def on_order_change(self, tiger_account: str, data: list):
""""""
#print("委托", data)
print("委托", data)
data = dict(data)
symbol = data["origin_symbol"]
volume = data["quantity"]
symbol, exchange = convert_symbol_tiger2vt(symbol)
if data["order_type"] == "LMT":
price = data["limit_price"]
else:
price = 0
symbol, exchange = convert_symbol_tiger2vt(data["origin_symbol"])
status = PUSH_STATUS_TIGER2VT[data["status"]]
order = OrderData(
symbol=symbol,
exchange=exchange,
orderid=data["order_id"],
direction=DIRECTION_TIGER2VT[data["action"]],
price=price,
volume=volume,
direction=Direction.NET,
price=data.get("limit_price", 0),
volume=data["quantity"],
traded=data["filled"],
status=PUSH_STATUS_TIGER2VT[data["status"]],
time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(data["order_time"] / 1000)).split(" ")[-1],
status=status,
time=datetime.fromtimestamp(data["order_time"] / 1000).strftime("%H:%M:%S"),
gateway_name=self.gateway_name,
)
self.on_order(order)
trade = TradeData(
symbol=symbol,
exchange=exchange,
direction=DIRECTION_TIGER2VT[data["action"]],
tradeid=data["order_id"],
orderid=data["order_id"],
price=data["avg_fill_price"],
volume=data["filled"],
time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(data["trade_time"] / 1000)).split(" ")[-1],
gateway_name=self.gateway_name,
)
self.on_trade(trade)
if status == Status.ALLTRADED:
self.tradeid += 1
trade = TradeData(
symbol=symbol,
exchange=exchange,
direction=Direction.NET,
tradeid=self.tradeid,
orderid=data["order_id"],
price=data["avg_fill_price"],
volume=data["filled"],
time=datetime.fromtimestamp(data["trade_time"] / 1000).strftime("%H:%M:%S"),
gateway_name=self.gateway_name,
)
self.on_trade(trade)
def send_order(self, req: OrderRequest):
""""""
symbol = convert_symbol_vt2tiger(req.symbol, req.exchange)
self.add_task(self._send_order, req)
def _send_order(self, req: OrderRequest):
""""""
currency = config_symbol_currency(req.symbol)
order_type = PRICETYPE_VT2TIGER[req.price_type]
# first, get contract
try:
contract = self.trade_client.get_contracts(symbol=symbol, currency=currency)[0]
contract = self.trade_client.get_contracts(symbol=req.symbol, currency=currency)[0]
except ApiException:
self.write_log("获取合约对象失败")
return
@ -372,10 +341,10 @@ class TigerGateway(BaseGateway):
account=self.account,
contract=contract,
action=DIRECTION_VT2TIGER[req.direction],
order_type=order_type,
quantity=req.volume,
limit_price=req.price
)
order_type=PRICETYPE_VT2TIGER[req.price_type],
quantity=int(req.volume),
limit_price=req.price,
)
except ApiException:
self.write_log("创建订单失败")
return
@ -395,11 +364,15 @@ class TigerGateway(BaseGateway):
return order.vt_orderid
def cancel_order(self, req: CancelRequest):
""""""
self.add_task(self._cancel_order, req)
def _cancel_order(self, req: CancelRequest):
""""""
try:
data = self.trade_client.cancel_order(order_id=req.orderid)
except ApiException:
self.write_log("撤单失败")
self.write_log(f"撤单失败{req.orderid}")
return
if not data:
@ -492,7 +465,7 @@ class TigerGateway(BaseGateway):
for i in assets:
account = AccountData(
accountid=self.account,
balance=float(i.summary.net_liquidation),
balance=i.summary.net_liquidation,
frozen=0.0,
gateway_name=self.gateway_name,
)
@ -508,22 +481,15 @@ class TigerGateway(BaseGateway):
return
for i in position:
symbol = i.contract.symbol
symbol, exchange = convert_symbol_tiger2vt(symbol)
volume = float(i.quantity)
# 判断方向
if volume > 0:
direction = Direction.LONG
else:
direction = Direction.SHORT
symbol, exchange = convert_symbol_tiger2vt(i.contract.symbol)
pos = PositionData(
symbol=symbol,
exchange=exchange,
direction=direction,
volume=volume,
direction=Direction.NET,
volume=i.quantity,
frozen=0.0,
price=float(i.average_cost),
price=i.average_cost,
pnl=float(i.unrealized_pnl),
gateway_name=self.gateway_name,
)
@ -541,38 +507,28 @@ class TigerGateway(BaseGateway):
self.process_order(data)
self.process_deal(data)
def query_trade(self):
""""""
pass
def close(self):
""""""
self.active = False
if self.push_client:
self.push_client.disconnect()
def process_order(self, data):
""""""
for i in data:
symbol = str(i.contract)
symbol, exchange = convert_symbol_tiger2vt(symbol)
time_local = time.localtime(i.order_time / 1000)
if i.order_type == "LMT":
price = i.limit_price
else:
price = 0
for i in data:
symbol, exchange = convert_symbol_tiger2vt(str(i.contract))
order = OrderData(
symbol=symbol,
exchange=exchange,
orderid=str(i.order_id),
direction=DIRECTION_TIGER2VT[i.action],
price=float(price),
volume=float(i.quantity),
traded=float(i.filled),
direction=Direction.NET,
price=i.limit_price if i.limit_price else 0.0,
volume=i.quantity,
traded=i.filled,
status=STATUS_TIGER2VT[i.status],
time=time.strftime("%Y-%m-%d %H:%M:%S", time_local).split(" ")[-1],
time=datetime.fromtimestamp(i.order_time / 1000).strftime("%H:%M:%S"),
gateway_name=self.gateway_name,
)
@ -582,27 +538,27 @@ class TigerGateway(BaseGateway):
"""
Process trade data for both query and update.
"""
for i in data:
for i in reversed(data):
if i.status == ORDER_STATUS.PARTIALLY_FILLED or i.status == ORDER_STATUS.FILLED:
symbol = str(i.contract)
symbol, exchange = convert_symbol_tiger2vt(symbol)
time_local = time.localtime(i.trade_time / 1000)
symbol, exchange = convert_symbol_tiger2vt(str(i.contract))
self.tradeid += 1
trade = TradeData(
symbol=symbol,
exchange=exchange,
direction=DIRECTION_TIGER2VT[i.action],
tradeid=i.order_id,
direction=Direction.NET,
tradeid=self.tradeid,
orderid=i.order_id,
price=float(i.avg_fill_price),
volume=float(i.filled),
time=time.strftime("%Y-%m-%d %H:%M:%S", time_local).split(" ")[-1],
price=i.avg_fill_price,
volume=i.filled,
time=datetime.fromtimestamp(i.trade_time / 1000).strftime("%H:%M:%S"),
gateway_name=self.gateway_name,
)
self.on_trade(trade)
@functools.lru_cache()
def convert_symbol_tiger2vt(symbol):
"""
Convert symbol from vt to tiger.
@ -612,7 +568,7 @@ def convert_symbol_tiger2vt(symbol):
else:
if len(symbol) < 6:
exchange = Exchange.SEHK
elif symbol.startswith("6"):
elif symbol.startswith("6"):
exchange = Exchange.SSE
elif symbol.endswith(".SH"):
exchange = Exchange.SSE
@ -622,6 +578,7 @@ def convert_symbol_tiger2vt(symbol):
return symbol, exchange
@functools.lru_cache()
def convert_symbol_vt2tiger(symbol, exchange):
"""
Convert symbol from vt to tiger.
@ -633,6 +590,7 @@ def convert_symbol_vt2tiger(symbol, exchange):
return symbol
@functools.lru_cache()
def config_symbol_currency(symbol):
"""
Config symbol to corresponding currency
@ -644,28 +602,4 @@ def config_symbol_currency(symbol):
currency = Currency.HKD
else:
currency = Currency.CNH
return currency
return currency