diff --git a/vnpy/gateway/tiger/tiger_gateway.py b/vnpy/gateway/tiger/tiger_gateway.py index 652d0681..0f8dc88f 100644 --- a/vnpy/gateway/tiger/tiger_gateway.py +++ b/vnpy/gateway/tiger/tiger_gateway.py @@ -5,12 +5,13 @@ pip install tigeropen """ from copy import copy -from threading import Thread -from time import sleep -import time +from datetime import datetime +from multiprocessing.dummy import Pool +from queue import Empty, Queue +import functools + import pandas as pd from pandas import DataFrame -from datetime import datetime from tigeropen.tiger_open_config import TigerOpenClientConfig from tigeropen.common.consts import Language, Currency, Market @@ -21,9 +22,6 @@ from tigeropen.push.push_client import PushClient from tigeropen.common.exceptions import ApiException from vnpy.trader.constant import Direction, Product, Status, PriceType, Exchange - - -from vnpy.trader.event import EVENT_TIMER from vnpy.trader.gateway import BaseGateway from vnpy.trader.object import ( TickData, @@ -37,7 +35,6 @@ from vnpy.trader.object import ( CancelRequest, ) - PRODUCT_VT2TIGER = { Product.EQUITY: "STK", Product.OPTION: "OPT", @@ -45,7 +42,7 @@ PRODUCT_VT2TIGER = { Product.WARRANT: "IOPT", Product.FUTURES: "FUT", Product.OPTION: "FOP", - Product.FOREX: "CASH" + Product.FOREX: "CASH", } DIRECTION_VT2TIGER = { @@ -54,9 +51,9 @@ DIRECTION_VT2TIGER = { } DIRECTION_TIGER2VT = { - "BUY":Direction.LONG, + "BUY": Direction.LONG, "SELL": Direction.SHORT, - "sell":Direction.SHORT, + "sell": Direction.SHORT, } PRICETYPE_VT2TIGER = { @@ -73,42 +70,34 @@ STATUS_TIGER2VT = { ORDER_STATUS.CANCELLED: Status.CANCELLED, ORDER_STATUS.PENDING_CANCEL: Status.CANCELLED, ORDER_STATUS.REJECTED: Status.REJECTED, - ORDER_STATUS.EXPIRED: Status.NOTTRADED + ORDER_STATUS.EXPIRED: Status.NOTTRADED, } PUSH_STATUS_TIGER2VT = { - "Invalid":Status.REJECTED, - "Initial":Status.SUBMITTING, - "PendingCancel":Status.CANCELLED, - "Cancelled":Status.CANCELLED, - "Submitted":Status.SUBMITTING, - "PendingSubmit":Status.SUBMITTING, - "Filled":Status.ALLTRADED, - "Inactive":Status.REJECTED, - - - + "Invalid": Status.REJECTED, + "Initial": Status.SUBMITTING, + "PendingCancel": Status.CANCELLED, + "Cancelled": Status.CANCELLED, + "Submitted": Status.SUBMITTING, + "PendingSubmit": Status.SUBMITTING, + "Filled": Status.ALLTRADED, + "Inactive": Status.REJECTED, } -# "private_key": "MIICXQIBAAKBgQC1amZa5YsGTklry7DAsUBOwXJCgrsZZtB21PImw/yLmrbqRfsS3vawvMigLWcCwIDnHa+hpdpeze0eHIwbZzJzUDGvRALYK9t3D8pwPVxpwX1OF8RfHCM7YQvSOvPPnHHuVQvKaR7NNm1/WmvGXC9kVJdkYQ7kCmh52siFoy1MLQIDAQABAoGAVabcmIHTt7ByncBXvUJymDxhE+HhMEcImXJEueTCca8kOUu9FNXMJvmax3VoMzZsJbIwX+OMTEJxd0wHIlEA0gECjDwFK4Q42q+ptO4QABJQVSC6I+dOt2OIY28uvT3rkenOO8KRIDt4F52PFd71ZdB1aaXixORORq1MdSLi8EkCQQDiviAB+L5R/HVxwxvqZfJ530OtFd5IipZC9YZlY1CtXWCmu89LK7UUlEuNXyGsOxyz5jLqFuNRsie5AC23tfEPAkEAzNMCa8axJWfPZIH4tGrbZ1F3I41BQdgp2zBmR7AyUMBDkli86OzmJ7QUCJA/PJxK43/IYUWm4OU5Q+SvXCr3AwJBAJTBj1Y7zwES1CpSitn5EF+MbmX71t1YrsQ3OHkD80YJ4QMCbDkw75gUwox5QSoxjd8ow3Z4laJfc1gYGeZQ41kCQQCCiQwm8cceBq3W6To+iUdw7itWngRz2Ta7uXnFwFYgvpeR4jnq3GfF7+9AkeWrVBQqLtrem0xCUfQP/+N+gudPAkBFLbt78/MpQGEDc7jyu/KE5Mp4wMMDQQwch9VLvsAZwWLysB6rZWpo3jIfp9zZ7c3zOYGNMWAZjtMmNkRJ8COH", class TigerGateway(BaseGateway): """""" - - default_setting = { - + default_setting = { "tiger_id": "20150008", "account": "DU575568", - "standard_account": "DU575568", - + "standard_account": "DU575568" } def __init__(self, event_engine): """Constructor""" super(TigerGateway, self).__init__(event_engine, "TIGER") - self.private_key = "MIICXQIBAAKBgQC1amZa5YsGTklry7DAsUBOwXJCgrsZZtB21PImw/yLmrbqRfsS3vawvMigLWcCwIDnHa+hpdpeze0eHIwbZzJzUDGvRALYK9t3D8pwPVxpwX1OF8RfHCM7YQvSOvPPnHHuVQvKaR7NNm1/WmvGXC9kVJdkYQ7kCmh52siFoy1MLQIDAQABAoGAVabcmIHTt7ByncBXvUJymDxhE+HhMEcImXJEueTCca8kOUu9FNXMJvmax3VoMzZsJbIwX+OMTEJxd0wHIlEA0gECjDwFK4Q42q+ptO4QABJQVSC6I+dOt2OIY28uvT3rkenOO8KRIDt4F52PFd71ZdB1aaXixORORq1MdSLi8EkCQQDiviAB+L5R/HVxwxvqZfJ530OtFd5IipZC9YZlY1CtXWCmu89LK7UUlEuNXyGsOxyz5jLqFuNRsie5AC23tfEPAkEAzNMCa8axJWfPZIH4tGrbZ1F3I41BQdgp2zBmR7AyUMBDkli86OzmJ7QUCJA/PJxK43/IYUWm4OU5Q+SvXCr3AwJBAJTBj1Y7zwES1CpSitn5EF+MbmX71t1YrsQ3OHkD80YJ4QMCbDkw75gUwox5QSoxjd8ow3Z4laJfc1gYGeZQ41kCQQCCiQwm8cceBq3W6To+iUdw7itWngRz2Ta7uXnFwFYgvpeR4jnq3GfF7+9AkeWrVBQqLtrem0xCUfQP/+N+gudPAkBFLbt78/MpQGEDc7jyu/KE5Mp4wMMDQQwch9VLvsAZwWLysB6rZWpo3jIfp9zZ7c3zOYGNMWAZjtMmNkRJ8COH" - self.tiger_key_2005 = "MIICXAIBAAKBgQC2s9fGSfp86pYpK/9FFtdzZXcpncxDMaWww9WPPn2EnZC9zqIamz4nUewDGgya33VgoHNL7a3iGNCe4zqivhr8k1ACG68psElaRjALl1UzdAMv4xwnrxpceTCgA9AZM8x+BmVXvO5cfgIfGdoahtdxjMNjIYDkx+HORGJ1cFcmrQIDAQABAoGBAIwf8uYJ5yvXX8PEEsyScDv5HiO0+uyuLz4bdLegXfRQRKrOyFVPq6PMmQ7n87L0n7m0VbluWWaHUboK3PXkiBzTsmx0aFS3aNyr203QGXXwp9hxF2WS968/6K2zSikaDrmSkWps5dVVqhnkJ6STj7cvM6ZGYIHWPC7W79qTYHihAkEA3FvFSznaTTajvZpHq83rrCh3wmI2ggeh1M1i89HAv0EfTLkWweyNM8qO39qeaGzB/TZiOal0LR8Mk7HbGPOTVwJBANRA4LyhwczHEdwH16n5QPcrogoSsPM6uq9ZL8zYwaMTcHvEJUhW6hUMQPyWcNtenH9mwcgRF78TFGLqIH9s95sCP1bv3ebP7FCKPg+Pzrb5hwFk9dq65MZoPHC4l1Gab3EFQFQEsfXQXeURBU1L8zM/tUkxK4+US0GB/nRGtyog7wJBALU4a2lCpqgDc4EshPsP4GLosyHskX4qL4hVGpXIn5NvnoNdlgNsidHMs5O1ksgJwI6aGmuKBH9Ud/x4L6T8UW8CQEbrCa3/vIv5mHzGe9G7ZsK5VaPx3VETSeRbDUai8KGpcMXX7nFUnhsBd7YvehOSwRSd5SCWrZuejhIdn5V7hYM=" + self.private_key = "" self.tiger_id = "" self.account = "" self.standard_account = "" @@ -119,13 +108,32 @@ class TigerGateway(BaseGateway): self.quote_client = None self.push_client = None + self.tradeid = 0 + + self.active = False + self.queue = Queue() + self.pool = None + self.ticks = {} self.trades = set() self.contracts = {} self.symbol_names = {} - self.thread = Thread(target=self.query_data) - + def run(self): + """""" + while self.active: + try: + func, arg = self.queue.get(timeout=0.1) + if arg: + func(arg) + else: + func() + except Empty: + pass + + def add_task(self, func, arg=None): + """""" + self.queue.put((func, arg)) def connect(self, setting: dict): """""" @@ -136,25 +144,20 @@ class TigerGateway(BaseGateway): self.paper_account = setting["account"] self.languege = Language.zh_CN - self.get_client_config() - self.connect_quote() - self.connect_trade() - self.connect_push() + # Start thread pool for REST call + self.active = True + self.pool = Pool(5) + self.pool.apply_async(self.run) - self.thread.start() + # Put connect task into quque. + self.init_client_config() + self.add_task(self.connect_quote) + self.add_task(self.connect_trade) + self.add_task(self.connect_push) - def query_data(self): - """ - Query all data necessary. - """ + # self.thread.start() - self.query_contract() - self.query_order() - self.query_position() - self.query_account() - - - def get_client_config(self, sandbox=True): + def init_client_config(self, sandbox=True): """""" self.client_config = TigerOpenClientConfig(sandbox_debug=sandbox) self.client_config.private_key = self.private_key @@ -163,7 +166,6 @@ class TigerGateway(BaseGateway): self.client_config.standard_account = self.standard_account self.client_config.paper_account = self.paper_account self.client_config.language = self.language - return self.client_config def connect_quote(self): """ @@ -177,7 +179,9 @@ class TigerGateway(BaseGateway): return if self.symbol_names: - self.write_log("行情接口连接成功") + self.add_task(self.query_contract) + + self.write_log("行情接口连接成功") def connect_trade(self): """ @@ -191,8 +195,12 @@ class TigerGateway(BaseGateway): return if data: + self.add_task(self.query_order) + self.add_task(self.query_position) + self.add_task(self.query_account) + self.write_log("交易接口连接成功") - + def connect_push(self): """ Connect to push server. @@ -202,166 +210,127 @@ class TigerGateway(BaseGateway): self.push_client.connect(self.client_config.tiger_id, self.client_config.private_key) self.push_client.quote_changed = self.on_quote_change - - self.push_client.subscribe_asset() - self.push_client.asset_changed = self.on_asset_changed - - self.push_client.subscribe_position() - self.push_client.position_changed = self.on_position_changed - - self.push_client.subscribe_order() - self.push_client.order_changed = self.on_order_changed + self.push_client.asset_changed = self.on_asset_change + self.push_client.position_changed = self.on_position_change + self.push_client.order_changed = self.on_order_change self.write_log("推送接口连接成功") def subscribe(self, req: SubscribeRequest): """""" - symbol = convert_symbol_vt2tiger(req.symbol, req.exchange) self.push_client.subscribe_quote([req.symbol]) - #self.push_client.subscribe_asset() - # self.push_client.subscribe_position() - #self.push_client.subscribe_order() + self.push_client.subscribe_asset() + self.push_client.subscribe_position() + self.push_client.subscribe_order() - def on_quote_change(self, symbol: str, data: list, trading: bool): - symbol, exchange = convert_symbol_tiger2vt(symbol) - name = self.symbol_names[symbol] + def on_quote_change(self, tiger_symbol: str, data: list, trading: bool): + """""" data = dict(data) + symbol, exchange = convert_symbol_tiger2vt(tiger_symbol) tick = self.ticks.get(symbol, None) if not tick: tick = TickData( symbol=symbol, exchange=exchange, - datetime=None, gateway_name=self.gateway_name, - name=name, + datetime=datetime.now(), + name=self.symbol_names[symbol], ) self.ticks[symbol] = tick - tick.datetime = datetime.now() - - tick.pre_close = data.get("prev_close",0) - tick.last_price = data.get("latest_price",0) + tick.datetime = datetime.fromtimestamp(data["latest_time"] / 1000) + tick.pre_close = data.get("prev_close", 0) + tick.last_price = data.get("latest_price", 0) tick.volume = data.get("volume", 0) tick.open_price = data.get("open", 0) - - tick.open_price = data.get("open", 0) #美股无 - tick.high_price = data.get("high", 0) # 美股无 - tick.low_price = data.get("low", 0) # 美股无 - - - tick.ask_price_1=data.get("ask_price", 0) # A股/港股无 - tick.bid_price_1=data.get("bid_price", 0) # A股/港股无 - tick.ask_volume_1=data.get("ask_size", 0) # A股/港股无 - tick.bid_volume_1=data.get("bid_size", 0) # A股/港股无 + tick.open_price = data.get("open", 0) + tick.high_price = data.get("high", 0) + tick.low_price = data.get("low", 0) + tick.ask_price_1 = data.get("ask_price", 0) + tick.bid_price_1 = data.get("bid_price", 0) + tick.ask_volume_1 = data.get("ask_size", 0) + tick.bid_volume_1 = data.get("bid_size", 0) self.on_tick(copy(tick)) - - - def on_asset_changed(self, account:str, data:list): + def on_asset_change(self, tiger_account: str, data: list): """""" - - #print("账号", data) data = dict(data) - account = AccountData( - accountid=account, + accountid=tiger_account, balance=data["net_liquidation"], frozen=0.0, gateway_name=self.gateway_name, ) self.on_account(account) - - - def on_position_changed(self, account:str, data:list): + def on_position_change(self, tiger_account: str, data: list): """""" - #print ("持仓", data) data = dict(data) - - symbol = data["origin_symbol"] - volume = data["quantity"] - # 判断方向 - if volume > 0: - direction = Direction.LONG - else: - direction = Direction.SHORT - - symbol, exchange = convert_symbol_tiger2vt(symbol) + symbol, exchange = convert_symbol_tiger2vt(data["origin_symbol"]) pos = PositionData( symbol=symbol, exchange=exchange, - direction=direction, - volume=volume, + direction=Direction.NET, + volume=data["quantity"], frozen=0.0, price=data["average_cost"], pnl=data["unrealized_pnl"], gateway_name=self.gateway_name, - ) self.on_position(pos) - - - - def on_order_changed(self, account: str, data: list): + def on_order_change(self, tiger_account: str, data: list): """""" - #print("委托", data) + print("委托", data) data = dict(data) - symbol = data["origin_symbol"] - volume = data["quantity"] - - symbol, exchange = convert_symbol_tiger2vt(symbol) - - if data["order_type"] == "LMT": - price = data["limit_price"] - else: - price = 0 - + symbol, exchange = convert_symbol_tiger2vt(data["origin_symbol"]) + status = PUSH_STATUS_TIGER2VT[data["status"]] order = OrderData( symbol=symbol, exchange=exchange, orderid=data["order_id"], - direction=DIRECTION_TIGER2VT[data["action"]], - price=price, - volume=volume, + direction=Direction.NET, + price=data.get("limit_price", 0), + volume=data["quantity"], traded=data["filled"], - status=PUSH_STATUS_TIGER2VT[data["status"]], - time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(data["order_time"] / 1000)).split(" ")[-1], + status=status, + time=datetime.fromtimestamp(data["order_time"] / 1000).strftime("%H:%M:%S"), gateway_name=self.gateway_name, ) self.on_order(order) - trade = TradeData( - symbol=symbol, - exchange=exchange, - direction=DIRECTION_TIGER2VT[data["action"]], - tradeid=data["order_id"], - orderid=data["order_id"], - price=data["avg_fill_price"], - volume=data["filled"], - time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(data["trade_time"] / 1000)).split(" ")[-1], - gateway_name=self.gateway_name, - ) - self.on_trade(trade) - - - - + if status == Status.ALLTRADED: + self.tradeid += 1 + trade = TradeData( + symbol=symbol, + exchange=exchange, + direction=Direction.NET, + tradeid=self.tradeid, + orderid=data["order_id"], + price=data["avg_fill_price"], + volume=data["filled"], + time=datetime.fromtimestamp(data["trade_time"] / 1000).strftime("%H:%M:%S"), + gateway_name=self.gateway_name, + ) + self.on_trade(trade) + def send_order(self, req: OrderRequest): """""" - symbol = convert_symbol_vt2tiger(req.symbol, req.exchange) + self.add_task(self._send_order, req) + + def _send_order(self, req: OrderRequest): + """""" currency = config_symbol_currency(req.symbol) - order_type = PRICETYPE_VT2TIGER[req.price_type] # first, get contract try: - contract = self.trade_client.get_contracts(symbol=symbol, currency=currency)[0] + contract = self.trade_client.get_contracts(symbol=req.symbol, currency=currency)[0] except ApiException: self.write_log("获取合约对象失败") return @@ -372,10 +341,10 @@ class TigerGateway(BaseGateway): account=self.account, contract=contract, action=DIRECTION_VT2TIGER[req.direction], - order_type=order_type, - quantity=req.volume, - limit_price=req.price - ) + order_type=PRICETYPE_VT2TIGER[req.price_type], + quantity=int(req.volume), + limit_price=req.price, + ) except ApiException: self.write_log("创建订单失败") return @@ -395,11 +364,15 @@ class TigerGateway(BaseGateway): return order.vt_orderid def cancel_order(self, req: CancelRequest): + """""" + self.add_task(self._cancel_order, req) + + def _cancel_order(self, req: CancelRequest): """""" try: data = self.trade_client.cancel_order(order_id=req.orderid) except ApiException: - self.write_log("撤单失败") + self.write_log(f"撤单失败:{req.orderid}") return if not data: @@ -492,7 +465,7 @@ class TigerGateway(BaseGateway): for i in assets: account = AccountData( accountid=self.account, - balance=float(i.summary.net_liquidation), + balance=i.summary.net_liquidation, frozen=0.0, gateway_name=self.gateway_name, ) @@ -508,22 +481,15 @@ class TigerGateway(BaseGateway): return for i in position: - symbol = i.contract.symbol - symbol, exchange = convert_symbol_tiger2vt(symbol) - volume = float(i.quantity) - # 判断方向 - if volume > 0: - direction = Direction.LONG - else: - direction = Direction.SHORT + symbol, exchange = convert_symbol_tiger2vt(i.contract.symbol) pos = PositionData( symbol=symbol, exchange=exchange, - direction=direction, - volume=volume, + direction=Direction.NET, + volume=i.quantity, frozen=0.0, - price=float(i.average_cost), + price=i.average_cost, pnl=float(i.unrealized_pnl), gateway_name=self.gateway_name, ) @@ -541,38 +507,28 @@ class TigerGateway(BaseGateway): self.process_order(data) self.process_deal(data) - def query_trade(self): - """""" - pass - def close(self): """""" + self.active = False + if self.push_client: self.push_client.disconnect() def process_order(self, data): """""" - for i in data: - symbol = str(i.contract) - symbol, exchange = convert_symbol_tiger2vt(symbol) - time_local = time.localtime(i.order_time / 1000) - - if i.order_type == "LMT": - price = i.limit_price - else: - price = 0 - + for i in data: + symbol, exchange = convert_symbol_tiger2vt(str(i.contract)) order = OrderData( symbol=symbol, exchange=exchange, orderid=str(i.order_id), - direction=DIRECTION_TIGER2VT[i.action], - price=float(price), - volume=float(i.quantity), - traded=float(i.filled), + direction=Direction.NET, + price=i.limit_price if i.limit_price else 0.0, + volume=i.quantity, + traded=i.filled, status=STATUS_TIGER2VT[i.status], - time=time.strftime("%Y-%m-%d %H:%M:%S", time_local).split(" ")[-1], + time=datetime.fromtimestamp(i.order_time / 1000).strftime("%H:%M:%S"), gateway_name=self.gateway_name, ) @@ -582,27 +538,27 @@ class TigerGateway(BaseGateway): """ Process trade data for both query and update. """ - for i in data: + for i in reversed(data): if i.status == ORDER_STATUS.PARTIALLY_FILLED or i.status == ORDER_STATUS.FILLED: - symbol = str(i.contract) - symbol, exchange = convert_symbol_tiger2vt(symbol) - time_local = time.localtime(i.trade_time / 1000) + symbol, exchange = convert_symbol_tiger2vt(str(i.contract)) + self.tradeid += 1 trade = TradeData( symbol=symbol, exchange=exchange, - direction=DIRECTION_TIGER2VT[i.action], - tradeid=i.order_id, + direction=Direction.NET, + tradeid=self.tradeid, orderid=i.order_id, - price=float(i.avg_fill_price), - volume=float(i.filled), - time=time.strftime("%Y-%m-%d %H:%M:%S", time_local).split(" ")[-1], + price=i.avg_fill_price, + volume=i.filled, + time=datetime.fromtimestamp(i.trade_time / 1000).strftime("%H:%M:%S"), gateway_name=self.gateway_name, ) self.on_trade(trade) +@functools.lru_cache() def convert_symbol_tiger2vt(symbol): """ Convert symbol from vt to tiger. @@ -612,7 +568,7 @@ def convert_symbol_tiger2vt(symbol): else: if len(symbol) < 6: exchange = Exchange.SEHK - elif symbol.startswith("6"): + elif symbol.startswith("6"): exchange = Exchange.SSE elif symbol.endswith(".SH"): exchange = Exchange.SSE @@ -622,6 +578,7 @@ def convert_symbol_tiger2vt(symbol): return symbol, exchange +@functools.lru_cache() def convert_symbol_vt2tiger(symbol, exchange): """ Convert symbol from vt to tiger. @@ -633,6 +590,7 @@ def convert_symbol_vt2tiger(symbol, exchange): return symbol +@functools.lru_cache() def config_symbol_currency(symbol): """ Config symbol to corresponding currency @@ -644,28 +602,4 @@ def config_symbol_currency(symbol): currency = Currency.HKD else: currency = Currency.CNH - - return currency - - - - - - - - - - - - - - - - - - - - - - - + return currency