[Mod]Use thread pool for changing sync call to async in tiger_gateway

This commit is contained in:
vn.py 2019-03-08 16:42:21 +08:00
parent a30766bc72
commit 67a8e5bdd2

View File

@ -4,13 +4,18 @@ Please install tiger-api before use.
pip install tigeropen pip install tigeropen
""" """
import time
from copy import copy from copy import copy
from threading import Thread from threading import Thread
from time import sleep from datetime import datetime
import time from multiprocessing.dummy import Pool
from queue import Empty, Queue
import pandas as pd import pandas as pd
from pandas import DataFrame from pandas import DataFrame
from datetime import datetime
from tigeropen.tiger_open_config import TigerOpenClientConfig from tigeropen.tiger_open_config import TigerOpenClientConfig
from tigeropen.common.consts import Language, Currency, Market from tigeropen.common.consts import Language, Currency, Market
@ -21,9 +26,6 @@ from tigeropen.push.push_client import PushClient
from tigeropen.common.exceptions import ApiException from tigeropen.common.exceptions import ApiException
from vnpy.trader.constant import Direction, Product, Status, PriceType, Exchange from vnpy.trader.constant import Direction, Product, Status, PriceType, Exchange
from vnpy.trader.event import EVENT_TIMER
from vnpy.trader.gateway import BaseGateway from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import ( from vnpy.trader.object import (
TickData, TickData,
@ -54,9 +56,9 @@ DIRECTION_VT2TIGER = {
} }
DIRECTION_TIGER2VT = { DIRECTION_TIGER2VT = {
"BUY":Direction.LONG, "BUY": Direction.LONG,
"SELL": Direction.SHORT, "SELL": Direction.SHORT,
"sell":Direction.SHORT, "sell": Direction.SHORT,
} }
PRICETYPE_VT2TIGER = { PRICETYPE_VT2TIGER = {
@ -77,17 +79,14 @@ STATUS_TIGER2VT = {
} }
PUSH_STATUS_TIGER2VT = { PUSH_STATUS_TIGER2VT = {
"Invalid":Status.REJECTED, "Invalid": Status.REJECTED,
"Initial":Status.SUBMITTING, "Initial": Status.SUBMITTING,
"PendingCancel":Status.CANCELLED, "PendingCancel": Status.CANCELLED,
"Cancelled":Status.CANCELLED, "Cancelled": Status.CANCELLED,
"Submitted":Status.SUBMITTING, "Submitted": Status.SUBMITTING,
"PendingSubmit":Status.SUBMITTING, "PendingSubmit": Status.SUBMITTING,
"Filled":Status.ALLTRADED, "Filled": Status.ALLTRADED,
"Inactive":Status.REJECTED, "Inactive": Status.REJECTED
} }
# "private_key": "MIICXQIBAAKBgQC1amZa5YsGTklry7DAsUBOwXJCgrsZZtB21PImw/yLmrbqRfsS3vawvMigLWcCwIDnHa+hpdpeze0eHIwbZzJzUDGvRALYK9t3D8pwPVxpwX1OF8RfHCM7YQvSOvPPnHHuVQvKaR7NNm1/WmvGXC9kVJdkYQ7kCmh52siFoy1MLQIDAQABAoGAVabcmIHTt7ByncBXvUJymDxhE+HhMEcImXJEueTCca8kOUu9FNXMJvmax3VoMzZsJbIwX+OMTEJxd0wHIlEA0gECjDwFK4Q42q+ptO4QABJQVSC6I+dOt2OIY28uvT3rkenOO8KRIDt4F52PFd71ZdB1aaXixORORq1MdSLi8EkCQQDiviAB+L5R/HVxwxvqZfJ530OtFd5IipZC9YZlY1CtXWCmu89LK7UUlEuNXyGsOxyz5jLqFuNRsie5AC23tfEPAkEAzNMCa8axJWfPZIH4tGrbZ1F3I41BQdgp2zBmR7AyUMBDkli86OzmJ7QUCJA/PJxK43/IYUWm4OU5Q+SvXCr3AwJBAJTBj1Y7zwES1CpSitn5EF+MbmX71t1YrsQ3OHkD80YJ4QMCbDkw75gUwox5QSoxjd8ow3Z4laJfc1gYGeZQ41kCQQCCiQwm8cceBq3W6To+iUdw7itWngRz2Ta7uXnFwFYgvpeR4jnq3GfF7+9AkeWrVBQqLtrem0xCUfQP/+N+gudPAkBFLbt78/MpQGEDc7jyu/KE5Mp4wMMDQQwch9VLvsAZwWLysB6rZWpo3jIfp9zZ7c3zOYGNMWAZjtMmNkRJ8COH", # "private_key": "MIICXQIBAAKBgQC1amZa5YsGTklry7DAsUBOwXJCgrsZZtB21PImw/yLmrbqRfsS3vawvMigLWcCwIDnHa+hpdpeze0eHIwbZzJzUDGvRALYK9t3D8pwPVxpwX1OF8RfHCM7YQvSOvPPnHHuVQvKaR7NNm1/WmvGXC9kVJdkYQ7kCmh52siFoy1MLQIDAQABAoGAVabcmIHTt7ByncBXvUJymDxhE+HhMEcImXJEueTCca8kOUu9FNXMJvmax3VoMzZsJbIwX+OMTEJxd0wHIlEA0gECjDwFK4Q42q+ptO4QABJQVSC6I+dOt2OIY28uvT3rkenOO8KRIDt4F52PFd71ZdB1aaXixORORq1MdSLi8EkCQQDiviAB+L5R/HVxwxvqZfJ530OtFd5IipZC9YZlY1CtXWCmu89LK7UUlEuNXyGsOxyz5jLqFuNRsie5AC23tfEPAkEAzNMCa8axJWfPZIH4tGrbZ1F3I41BQdgp2zBmR7AyUMBDkli86OzmJ7QUCJA/PJxK43/IYUWm4OU5Q+SvXCr3AwJBAJTBj1Y7zwES1CpSitn5EF+MbmX71t1YrsQ3OHkD80YJ4QMCbDkw75gUwox5QSoxjd8ow3Z4laJfc1gYGeZQ41kCQQCCiQwm8cceBq3W6To+iUdw7itWngRz2Ta7uXnFwFYgvpeR4jnq3GfF7+9AkeWrVBQqLtrem0xCUfQP/+N+gudPAkBFLbt78/MpQGEDc7jyu/KE5Mp4wMMDQQwch9VLvsAZwWLysB6rZWpo3jIfp9zZ7c3zOYGNMWAZjtMmNkRJ8COH",
@ -96,11 +95,9 @@ class TigerGateway(BaseGateway):
"""""" """"""
default_setting = { default_setting = {
"tiger_id": "20150008", "tiger_id": "20150008",
"account": "DU575568", "account": "DU575568",
"standard_account": "DU575568", "standard_account": "DU575568"
} }
def __init__(self, event_engine): def __init__(self, event_engine):
@ -119,13 +116,30 @@ class TigerGateway(BaseGateway):
self.quote_client = None self.quote_client = None
self.push_client = None self.push_client = None
self.active = False
self.queue = Queue()
self.pool = None
self.ticks = {} self.ticks = {}
self.trades = set() self.trades = set()
self.contracts = {} self.contracts = {}
self.symbol_names = {} self.symbol_names = {}
self.thread = Thread(target=self.query_data) def run(self):
""""""
while self.active:
try:
func, arg = self.queue.get(timeout=1)
if arg:
func(arg)
else:
func()
except Empty:
pass
def add_task(self, func, arg=None):
""""""
self.queue.put((func, arg))
def connect(self, setting: dict): def connect(self, setting: dict):
"""""" """"""
@ -136,24 +150,19 @@ class TigerGateway(BaseGateway):
self.paper_account = setting["account"] self.paper_account = setting["account"]
self.languege = Language.zh_CN self.languege = Language.zh_CN
# Start thread pool for REST call
self.active = True
self.pool = Pool(5)
self.pool.apply_async(self.run)
# Put connect task into quque.
self.get_client_config() self.get_client_config()
self.connect_quote() self.add_task(self.connect_quote)
self.connect_trade() self.add_task(self.connect_trade)
self.connect_push() self.add_task(self.connect_push)
self.thread.start() self.thread.start()
def query_data(self):
"""
Query all data necessary.
"""
self.query_contract()
self.query_order()
self.query_position()
self.query_account()
def get_client_config(self, sandbox=True): def get_client_config(self, sandbox=True):
"""""" """"""
self.client_config = TigerOpenClientConfig(sandbox_debug=sandbox) self.client_config = TigerOpenClientConfig(sandbox_debug=sandbox)
@ -179,6 +188,8 @@ class TigerGateway(BaseGateway):
if self.symbol_names: if self.symbol_names:
self.write_log("行情接口连接成功") self.write_log("行情接口连接成功")
self.add_task(self.query_contract)
def connect_trade(self): def connect_trade(self):
""" """
Connect to trade server. Connect to trade server.
@ -193,6 +204,10 @@ class TigerGateway(BaseGateway):
if data: if data:
self.write_log("交易接口连接成功") self.write_log("交易接口连接成功")
self.add_task(self.query_order)
self.add_task(self.query_position)
self.add_task(self.query_account)
def connect_push(self): def connect_push(self):
""" """
Connect to push server. Connect to push server.
@ -218,9 +233,9 @@ class TigerGateway(BaseGateway):
"""""" """"""
symbol = convert_symbol_vt2tiger(req.symbol, req.exchange) symbol = convert_symbol_vt2tiger(req.symbol, req.exchange)
self.push_client.subscribe_quote([req.symbol]) self.push_client.subscribe_quote([req.symbol])
#self.push_client.subscribe_asset() # self.push_client.subscribe_asset()
# self.push_client.subscribe_position() # self.push_client.subscribe_position()
#self.push_client.subscribe_order() # self.push_client.subscribe_order()
def on_quote_change(self, symbol: str, data: list, trading: bool): def on_quote_change(self, symbol: str, data: list, trading: bool):
symbol, exchange = convert_symbol_tiger2vt(symbol) symbol, exchange = convert_symbol_tiger2vt(symbol)
@ -249,7 +264,6 @@ class TigerGateway(BaseGateway):
tick.high_price = data.get("high", 0) # 美股无 tick.high_price = data.get("high", 0) # 美股无
tick.low_price = data.get("low", 0) # 美股无 tick.low_price = data.get("low", 0) # 美股无
tick.ask_price_1=data.get("ask_price", 0) # A股/港股无 tick.ask_price_1=data.get("ask_price", 0) # A股/港股无
tick.bid_price_1=data.get("bid_price", 0) # A股/港股无 tick.bid_price_1=data.get("bid_price", 0) # A股/港股无
tick.ask_volume_1=data.get("ask_size", 0) # A股/港股无 tick.ask_volume_1=data.get("ask_size", 0) # A股/港股无
@ -304,9 +318,6 @@ class TigerGateway(BaseGateway):
) )
self.on_position(pos) self.on_position(pos)
def on_order_changed(self, account: str, data: list): def on_order_changed(self, account: str, data: list):
"""""" """"""
#print("委托", data) #print("委托", data)
@ -321,7 +332,6 @@ class TigerGateway(BaseGateway):
else: else:
price = 0 price = 0
order = OrderData( order = OrderData(
symbol=symbol, symbol=symbol,
exchange=exchange, exchange=exchange,
@ -349,10 +359,6 @@ class TigerGateway(BaseGateway):
) )
self.on_trade(trade) self.on_trade(trade)
def send_order(self, req: OrderRequest): def send_order(self, req: OrderRequest):
"""""" """"""
symbol = convert_symbol_vt2tiger(req.symbol, req.exchange) symbol = convert_symbol_vt2tiger(req.symbol, req.exchange)
@ -547,6 +553,9 @@ class TigerGateway(BaseGateway):
def close(self): def close(self):
"""""" """"""
self.active = False
self.queue.join()
if self.push_client: if self.push_client:
self.push_client.disconnect() self.push_client.disconnect()
@ -562,7 +571,6 @@ class TigerGateway(BaseGateway):
else: else:
price = 0 price = 0
order = OrderData( order = OrderData(
symbol=symbol, symbol=symbol,
exchange=exchange, exchange=exchange,
@ -646,26 +654,3 @@ def config_symbol_currency(symbol):
currency = Currency.CNH currency = Currency.CNH
return currency return currency