[Add] resubscribe logic to solve websocket data push stop problem, close #2045

This commit is contained in:
vn.py 2019-09-18 12:57:17 +08:00
parent 2448ef072d
commit 99ea570b6c

View File

@ -11,6 +11,8 @@ from datetime import datetime, timedelta
from urllib.parse import urlencode from urllib.parse import urlencode
from vnpy.api.rest import Request, RestClient from vnpy.api.rest import Request, RestClient
from vnpy.api.websocket import WebsocketClient from vnpy.api.websocket import WebsocketClient
from vnpy.event import Event
from vnpy.trader.event import EVENT_TIMER
from vnpy.trader.constant import ( from vnpy.trader.constant import (
Direction, Direction,
@ -92,6 +94,9 @@ class BitfinexGateway(BaseGateway):
"""Constructor""" """Constructor"""
super(BitfinexGateway, self).__init__(event_engine, "BITFINEX") super(BitfinexGateway, self).__init__(event_engine, "BITFINEX")
self.timer_count = 0
self.resubscribe_interval = 60
self.rest_api = BitfinexRestApi(self) self.rest_api = BitfinexRestApi(self)
self.ws_api = BitfinexWebsocketApi(self) self.ws_api = BitfinexWebsocketApi(self)
@ -104,9 +109,10 @@ class BitfinexGateway(BaseGateway):
proxy_port = setting["proxy_port"] proxy_port = setting["proxy_port"]
self.rest_api.connect(key, secret, session, proxy_host, proxy_port) self.rest_api.connect(key, secret, session, proxy_host, proxy_port)
self.ws_api.connect(key, secret, proxy_host, proxy_port) self.ws_api.connect(key, secret, proxy_host, proxy_port)
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def subscribe(self, req: SubscribeRequest): def subscribe(self, req: SubscribeRequest):
"""""" """"""
self.ws_api.subscribe(req) self.ws_api.subscribe(req)
@ -136,6 +142,16 @@ class BitfinexGateway(BaseGateway):
self.rest_api.stop() self.rest_api.stop()
self.ws_api.stop() self.ws_api.stop()
def process_timer_event(self, event: Event):
""""""
self.timer_count += 1
if self.timer_count < self.resubscribe_interval:
return
self.timer_count = 0
self.ws_api.resubscribe()
class BitfinexRestApi(RestClient): class BitfinexRestApi(RestClient):
""" """
@ -359,11 +375,12 @@ class BitfinexWebsocketApi(WebsocketClient):
self.accounts = {} self.accounts = {}
self.orders = {} self.orders = {}
self.trades = set() self.trades = set()
self.tickDict = {} self.ticks = {}
self.bidDict = {} self.bids = {}
self.askDict = {} self.asks = {}
self.orderLocalDict = {} self.channels = {} # channel_id : (Channel, Symbol)
self.channelDict = {} # channel_id : (Channel, Symbol)
self.subscribed = {}
def connect( def connect(
self, key: str, secret: str, proxy_host: str, proxy_port: int self, key: str, secret: str, proxy_host: str, proxy_port: int
@ -378,12 +395,16 @@ class BitfinexWebsocketApi(WebsocketClient):
""" """
Subscribe to tick data upate. Subscribe to tick data upate.
""" """
if req.symbol not in self.subscribed:
self.subscribed[req.symbol] = req
d = { d = {
"event": "subscribe", "event": "subscribe",
"channel": "book", "channel": "book",
"symbol": req.symbol, "symbol": req.symbol,
} }
self.send_packet(d) self.send_packet(d)
d = { d = {
"event": "subscribe", "event": "subscribe",
"channel": "ticker", "channel": "ticker",
@ -393,6 +414,11 @@ class BitfinexWebsocketApi(WebsocketClient):
return int(round(time.time() * 1000)) return int(round(time.time() * 1000))
def resubscribe(self):
""""""
for req in self.subscribed.values():
self.subscribe(req)
def _gen_unqiue_cid(self): def _gen_unqiue_cid(self):
self.order_id += 1 self.order_id += 1
local_oid = time.strftime("%y%m%d") + str(self.order_id) local_oid = time.strftime("%y%m%d") + str(self.order_id)
@ -463,7 +489,7 @@ class BitfinexWebsocketApi(WebsocketClient):
if data["event"] == "subscribed": if data["event"] == "subscribed":
symbol = str(data["symbol"].replace("t", "")) symbol = str(data["symbol"].replace("t", ""))
self.channelDict[data["chanId"]] = (data["channel"], symbol) self.channels[data["chanId"]] = (data["channel"], symbol)
def on_update(self, data): def on_update(self, data):
"""""" """"""
@ -480,12 +506,12 @@ class BitfinexWebsocketApi(WebsocketClient):
def on_data_update(self, data): def on_data_update(self, data):
"""""" """"""
channel_id = data[0] channel_id = data[0]
channel, symbol = self.channelDict[channel_id] channel, symbol = self.channels[channel_id]
symbol = str(symbol.replace("t", "")) symbol = str(symbol.replace("t", ""))
# Get the Tick object # Get the Tick object
if symbol in self.tickDict: if symbol in self.ticks:
tick = self.tickDict[symbol] tick = self.ticks[symbol]
else: else:
tick = TickData( tick = TickData(
symbol=symbol, symbol=symbol,
@ -495,7 +521,7 @@ class BitfinexWebsocketApi(WebsocketClient):
gateway_name=self.gateway_name, gateway_name=self.gateway_name,
) )
self.tickDict[symbol] = tick self.ticks[symbol] = tick
l_data1 = data[1] l_data1 = data[1]
@ -509,8 +535,8 @@ class BitfinexWebsocketApi(WebsocketClient):
# Update deep quote # Update deep quote
elif channel == "book": elif channel == "book":
bid = self.bidDict.setdefault(symbol, {}) bid = self.bids.setdefault(symbol, {})
ask = self.askDict.setdefault(symbol, {}) ask = self.asks.setdefault(symbol, {})
if len(l_data1) > 3: if len(l_data1) > 3:
for price, count, amount in l_data1: for price, count, amount in l_data1: