From 99ea570b6c285251629dcd0a4dacb77483036c80 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Wed, 18 Sep 2019 12:57:17 +0800 Subject: [PATCH] [Add] resubscribe logic to solve websocket data push stop problem, close #2045 --- vnpy/gateway/bitfinex/bitfinex_gateway.py | 52 +++++++++++++++++------ 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/vnpy/gateway/bitfinex/bitfinex_gateway.py b/vnpy/gateway/bitfinex/bitfinex_gateway.py index ac2d614e..de1aff81 100644 --- a/vnpy/gateway/bitfinex/bitfinex_gateway.py +++ b/vnpy/gateway/bitfinex/bitfinex_gateway.py @@ -11,6 +11,8 @@ from datetime import datetime, timedelta from urllib.parse import urlencode from vnpy.api.rest import Request, RestClient from vnpy.api.websocket import WebsocketClient +from vnpy.event import Event +from vnpy.trader.event import EVENT_TIMER from vnpy.trader.constant import ( Direction, @@ -92,6 +94,9 @@ class BitfinexGateway(BaseGateway): """Constructor""" super(BitfinexGateway, self).__init__(event_engine, "BITFINEX") + self.timer_count = 0 + self.resubscribe_interval = 60 + self.rest_api = BitfinexRestApi(self) self.ws_api = BitfinexWebsocketApi(self) @@ -104,9 +109,10 @@ class BitfinexGateway(BaseGateway): proxy_port = setting["proxy_port"] self.rest_api.connect(key, secret, session, proxy_host, proxy_port) - self.ws_api.connect(key, secret, proxy_host, proxy_port) + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + def subscribe(self, req: SubscribeRequest): """""" self.ws_api.subscribe(req) @@ -136,6 +142,16 @@ class BitfinexGateway(BaseGateway): self.rest_api.stop() self.ws_api.stop() + def process_timer_event(self, event: Event): + """""" + self.timer_count += 1 + + if self.timer_count < self.resubscribe_interval: + return + + self.timer_count = 0 + self.ws_api.resubscribe() + class BitfinexRestApi(RestClient): """ @@ -359,11 +375,12 @@ class BitfinexWebsocketApi(WebsocketClient): self.accounts = {} self.orders = {} self.trades = set() - self.tickDict = {} - self.bidDict = {} - self.askDict = {} - self.orderLocalDict = {} - self.channelDict = {} # channel_id : (Channel, Symbol) + self.ticks = {} + self.bids = {} + self.asks = {} + self.channels = {} # channel_id : (Channel, Symbol) + + self.subscribed = {} def connect( self, key: str, secret: str, proxy_host: str, proxy_port: int @@ -378,12 +395,16 @@ class BitfinexWebsocketApi(WebsocketClient): """ Subscribe to tick data upate. """ + if req.symbol not in self.subscribed: + self.subscribed[req.symbol] = req + d = { "event": "subscribe", "channel": "book", "symbol": req.symbol, } self.send_packet(d) + d = { "event": "subscribe", "channel": "ticker", @@ -393,6 +414,11 @@ class BitfinexWebsocketApi(WebsocketClient): return int(round(time.time() * 1000)) + def resubscribe(self): + """""" + for req in self.subscribed.values(): + self.subscribe(req) + def _gen_unqiue_cid(self): self.order_id += 1 local_oid = time.strftime("%y%m%d") + str(self.order_id) @@ -463,7 +489,7 @@ class BitfinexWebsocketApi(WebsocketClient): if data["event"] == "subscribed": symbol = str(data["symbol"].replace("t", "")) - self.channelDict[data["chanId"]] = (data["channel"], symbol) + self.channels[data["chanId"]] = (data["channel"], symbol) def on_update(self, data): """""" @@ -480,12 +506,12 @@ class BitfinexWebsocketApi(WebsocketClient): def on_data_update(self, data): """""" channel_id = data[0] - channel, symbol = self.channelDict[channel_id] + channel, symbol = self.channels[channel_id] symbol = str(symbol.replace("t", "")) # Get the Tick object - if symbol in self.tickDict: - tick = self.tickDict[symbol] + if symbol in self.ticks: + tick = self.ticks[symbol] else: tick = TickData( symbol=symbol, @@ -495,7 +521,7 @@ class BitfinexWebsocketApi(WebsocketClient): gateway_name=self.gateway_name, ) - self.tickDict[symbol] = tick + self.ticks[symbol] = tick l_data1 = data[1] @@ -509,8 +535,8 @@ class BitfinexWebsocketApi(WebsocketClient): # Update deep quote elif channel == "book": - bid = self.bidDict.setdefault(symbol, {}) - ask = self.askDict.setdefault(symbol, {}) + bid = self.bids.setdefault(symbol, {}) + ask = self.asks.setdefault(symbol, {}) if len(l_data1) > 3: for price, count, amount in l_data1: