From 67a8e5bdd2d1d9dcd4a1ec69c1a85a0a9327ec69 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Fri, 8 Mar 2019 16:42:21 +0800 Subject: [PATCH] [Mod]Use thread pool for changing sync call to async in tiger_gateway --- vnpy/gateway/tiger/tiger_gateway.py | 135 +++++++++++++--------------- 1 file changed, 60 insertions(+), 75 deletions(-) diff --git a/vnpy/gateway/tiger/tiger_gateway.py b/vnpy/gateway/tiger/tiger_gateway.py index 652d0681..46e10d4b 100644 --- a/vnpy/gateway/tiger/tiger_gateway.py +++ b/vnpy/gateway/tiger/tiger_gateway.py @@ -4,13 +4,18 @@ Please install tiger-api before use. pip install tigeropen """ +import time from copy import copy from threading import Thread -from time import sleep -import time +from datetime import datetime +from multiprocessing.dummy import Pool +from queue import Empty, Queue + import pandas as pd from pandas import DataFrame -from datetime import datetime + + + from tigeropen.tiger_open_config import TigerOpenClientConfig from tigeropen.common.consts import Language, Currency, Market @@ -21,9 +26,6 @@ from tigeropen.push.push_client import PushClient from tigeropen.common.exceptions import ApiException from vnpy.trader.constant import Direction, Product, Status, PriceType, Exchange - - -from vnpy.trader.event import EVENT_TIMER from vnpy.trader.gateway import BaseGateway from vnpy.trader.object import ( TickData, @@ -54,9 +56,9 @@ DIRECTION_VT2TIGER = { } DIRECTION_TIGER2VT = { - "BUY":Direction.LONG, + "BUY": Direction.LONG, "SELL": Direction.SHORT, - "sell":Direction.SHORT, + "sell": Direction.SHORT, } PRICETYPE_VT2TIGER = { @@ -77,17 +79,14 @@ STATUS_TIGER2VT = { } PUSH_STATUS_TIGER2VT = { - "Invalid":Status.REJECTED, - "Initial":Status.SUBMITTING, - "PendingCancel":Status.CANCELLED, - "Cancelled":Status.CANCELLED, - "Submitted":Status.SUBMITTING, - "PendingSubmit":Status.SUBMITTING, - "Filled":Status.ALLTRADED, - "Inactive":Status.REJECTED, - - - + "Invalid": Status.REJECTED, + "Initial": Status.SUBMITTING, + "PendingCancel": Status.CANCELLED, + "Cancelled": Status.CANCELLED, + "Submitted": Status.SUBMITTING, + "PendingSubmit": Status.SUBMITTING, + "Filled": Status.ALLTRADED, + "Inactive": Status.REJECTED } # "private_key": "MIICXQIBAAKBgQC1amZa5YsGTklry7DAsUBOwXJCgrsZZtB21PImw/yLmrbqRfsS3vawvMigLWcCwIDnHa+hpdpeze0eHIwbZzJzUDGvRALYK9t3D8pwPVxpwX1OF8RfHCM7YQvSOvPPnHHuVQvKaR7NNm1/WmvGXC9kVJdkYQ7kCmh52siFoy1MLQIDAQABAoGAVabcmIHTt7ByncBXvUJymDxhE+HhMEcImXJEueTCca8kOUu9FNXMJvmax3VoMzZsJbIwX+OMTEJxd0wHIlEA0gECjDwFK4Q42q+ptO4QABJQVSC6I+dOt2OIY28uvT3rkenOO8KRIDt4F52PFd71ZdB1aaXixORORq1MdSLi8EkCQQDiviAB+L5R/HVxwxvqZfJ530OtFd5IipZC9YZlY1CtXWCmu89LK7UUlEuNXyGsOxyz5jLqFuNRsie5AC23tfEPAkEAzNMCa8axJWfPZIH4tGrbZ1F3I41BQdgp2zBmR7AyUMBDkli86OzmJ7QUCJA/PJxK43/IYUWm4OU5Q+SvXCr3AwJBAJTBj1Y7zwES1CpSitn5EF+MbmX71t1YrsQ3OHkD80YJ4QMCbDkw75gUwox5QSoxjd8ow3Z4laJfc1gYGeZQ41kCQQCCiQwm8cceBq3W6To+iUdw7itWngRz2Ta7uXnFwFYgvpeR4jnq3GfF7+9AkeWrVBQqLtrem0xCUfQP/+N+gudPAkBFLbt78/MpQGEDc7jyu/KE5Mp4wMMDQQwch9VLvsAZwWLysB6rZWpo3jIfp9zZ7c3zOYGNMWAZjtMmNkRJ8COH", @@ -95,12 +94,10 @@ PUSH_STATUS_TIGER2VT = { class TigerGateway(BaseGateway): """""" - default_setting = { - + default_setting = { "tiger_id": "20150008", "account": "DU575568", - "standard_account": "DU575568", - + "standard_account": "DU575568" } def __init__(self, event_engine): @@ -119,13 +116,30 @@ class TigerGateway(BaseGateway): self.quote_client = None self.push_client = None + self.active = False + self.queue = Queue() + self.pool = None + self.ticks = {} self.trades = set() self.contracts = {} self.symbol_names = {} - self.thread = Thread(target=self.query_data) - + def run(self): + """""" + while self.active: + try: + func, arg = self.queue.get(timeout=1) + if arg: + func(arg) + else: + func() + except Empty: + pass + + def add_task(self, func, arg=None): + """""" + self.queue.put((func, arg)) def connect(self, setting: dict): """""" @@ -136,24 +150,19 @@ class TigerGateway(BaseGateway): self.paper_account = setting["account"] self.languege = Language.zh_CN + # Start thread pool for REST call + self.active = True + self.pool = Pool(5) + self.pool.apply_async(self.run) + + # Put connect task into quque. self.get_client_config() - self.connect_quote() - self.connect_trade() - self.connect_push() + self.add_task(self.connect_quote) + self.add_task(self.connect_trade) + self.add_task(self.connect_push) self.thread.start() - def query_data(self): - """ - Query all data necessary. - """ - - self.query_contract() - self.query_order() - self.query_position() - self.query_account() - - def get_client_config(self, sandbox=True): """""" self.client_config = TigerOpenClientConfig(sandbox_debug=sandbox) @@ -178,6 +187,8 @@ class TigerGateway(BaseGateway): if self.symbol_names: self.write_log("行情接口连接成功") + + self.add_task(self.query_contract) def connect_trade(self): """ @@ -192,6 +203,10 @@ class TigerGateway(BaseGateway): if data: self.write_log("交易接口连接成功") + + self.add_task(self.query_order) + self.add_task(self.query_position) + self.add_task(self.query_account) def connect_push(self): """ @@ -218,9 +233,9 @@ class TigerGateway(BaseGateway): """""" symbol = convert_symbol_vt2tiger(req.symbol, req.exchange) self.push_client.subscribe_quote([req.symbol]) - #self.push_client.subscribe_asset() + # self.push_client.subscribe_asset() # self.push_client.subscribe_position() - #self.push_client.subscribe_order() + # self.push_client.subscribe_order() def on_quote_change(self, symbol: str, data: list, trading: bool): symbol, exchange = convert_symbol_tiger2vt(symbol) @@ -249,7 +264,6 @@ class TigerGateway(BaseGateway): tick.high_price = data.get("high", 0) # 美股无 tick.low_price = data.get("low", 0) # 美股无 - tick.ask_price_1=data.get("ask_price", 0) # A股/港股无 tick.bid_price_1=data.get("bid_price", 0) # A股/港股无 tick.ask_volume_1=data.get("ask_size", 0) # A股/港股无 @@ -304,9 +318,6 @@ class TigerGateway(BaseGateway): ) self.on_position(pos) - - - def on_order_changed(self, account: str, data: list): """""" #print("委托", data) @@ -320,7 +331,6 @@ class TigerGateway(BaseGateway): price = data["limit_price"] else: price = 0 - order = OrderData( symbol=symbol, @@ -349,10 +359,6 @@ class TigerGateway(BaseGateway): ) self.on_trade(trade) - - - - def send_order(self, req: OrderRequest): """""" symbol = convert_symbol_vt2tiger(req.symbol, req.exchange) @@ -547,6 +553,9 @@ class TigerGateway(BaseGateway): def close(self): """""" + self.active = False + self.queue.join() + if self.push_client: self.push_client.disconnect() @@ -562,7 +571,6 @@ class TigerGateway(BaseGateway): else: price = 0 - order = OrderData( symbol=symbol, exchange=exchange, @@ -645,27 +653,4 @@ def config_symbol_currency(symbol): else: currency = Currency.CNH - return currency - - - - - - - - - - - - - - - - - - - - - - - + return currency \ No newline at end of file