diff --git a/vnpy/api/websocket/websocket_client.py b/vnpy/api/websocket/websocket_client.py index 870d87a4..e7c767a0 100644 --- a/vnpy/api/websocket/websocket_client.py +++ b/vnpy/api/websocket/websocket_client.py @@ -125,52 +125,55 @@ class WebsocketClient(object): if ws: ws._send_binary(data) - def _reconnect(self): - """""" - if self._active: - self._disconnect() - self._connect() - def _create_connection(self, *args, **kwargs): """""" return websocket.create_connection(*args, **kwargs) - def _connect(self): + def _ensure_connection(self): """""" - self._ws = self._create_connection( - self.host, - sslopt={"cert_reqs": ssl.CERT_NONE}, - http_proxy_host=self.proxy_host, - http_proxy_port=self.proxy_port, - header=self.header - ) - self.on_connected() + triggered = False + with self._ws_lock: + if self._ws is None: + self._ws = self._create_connection( + self.host, + sslopt={"cert_reqs": ssl.CERT_NONE}, + http_proxy_host=self.proxy_host, + http_proxy_port=self.proxy_port, + header=self.header + ) + triggered = True + if triggered: + self.on_connected() def _disconnect(self): """ """ + triggered = False with self._ws_lock: if self._ws: - self._ws.close() + ws: websocket.WebSocket = self._ws self._ws = None + triggered = True + if triggered: + ws.close() + self.on_disconnected() + def _run(self): """ Keep running till stop is called. """ try: - self._connect() - - # todo: onDisconnect while self._active: try: + self._ensure_connection() ws = self._ws if ws: text = ws.recv() # ws object is closed when recv function is blocking if not text: - self._reconnect() + self._disconnect() continue self._record_last_received_text(text) @@ -185,17 +188,17 @@ class WebsocketClient(object): # ws is closed before recv function is called # For socket.error, see Issue #1608 except (websocket.WebSocketConnectionClosedException, socket.error): - self._reconnect() + self._disconnect() # other internal exception raised in on_packet except: # noqa et, ev, tb = sys.exc_info() self.on_error(et, ev, tb) - self._reconnect() + self._disconnect() except: # noqa et, ev, tb = sys.exc_info() self.on_error(et, ev, tb) - self._reconnect() + self._disconnect() @staticmethod def unpack_data(data: str): @@ -214,7 +217,10 @@ class WebsocketClient(object): except: # noqa et, ev, tb = sys.exc_info() self.on_error(et, ev, tb) - self._reconnect() + + # self._run() will reconnect websocket + sleep(1) + for i in range(self.ping_interval): if not self._active: break diff --git a/vnpy/gateway/bitmex/bitmex_gateway.py b/vnpy/gateway/bitmex/bitmex_gateway.py index e5af3082..549200a1 100644 --- a/vnpy/gateway/bitmex/bitmex_gateway.py +++ b/vnpy/gateway/bitmex/bitmex_gateway.py @@ -11,8 +11,10 @@ from urllib.parse import urlencode from requests import ConnectionError +from vnpy.event import Event from vnpy.api.rest import Request, RestClient from vnpy.api.websocket import WebsocketClient +from vnpy.trader.event import EVENT_TIMER from vnpy.trader.constant import ( Direction, Exchange, @@ -97,6 +99,8 @@ class BitmexGateway(BaseGateway): self.rest_api = BitmexRestApi(self) self.ws_api = BitmexWebsocketApi(self) + event_engine.register(EVENT_TIMER, self.process_timer_event) + def connect(self, setting: dict): """""" key = setting["ID"] @@ -146,6 +150,10 @@ class BitmexGateway(BaseGateway): self.rest_api.stop() self.ws_api.stop() + def process_timer_event(self, event: Event): + """""" + self.rest_api.reset_rate_limit() + class BitmexRestApi(RestClient): """ @@ -167,6 +175,11 @@ class BitmexRestApi(RestClient): self.connect_time = 0 + # Use 60 by default, and will update after first request + self.rate_limit_limit = 60 + self.rate_limit_remaining = 60 + self.rate_limit_sleep = 0 + def sign(self, request): """ Generate BitMEX signature. @@ -238,6 +251,9 @@ class BitmexRestApi(RestClient): def send_order(self, req: OrderRequest): """""" + if not self.check_rate_limit(): + return "" + orderid = str(self.connect_time + self._new_order_id()) data = { @@ -282,6 +298,9 @@ class BitmexRestApi(RestClient): def cancel_order(self, req: CancelRequest): """""" + if not self.check_rate_limit(): + return + orderid = req.orderid if orderid.isdigit(): @@ -299,6 +318,9 @@ class BitmexRestApi(RestClient): def query_history(self, req: HistoryRequest): """""" + if not self.check_rate_limit(): + return + history = [] count = 750 start_time = req.start.isoformat() @@ -369,11 +391,19 @@ class BitmexRestApi(RestClient): """ Callback when sending order failed on server. """ + self.update_rate_limit(request) + order = request.extra order.status = Status.REJECTED self.gateway.on_order(order) - msg = f"委托失败,状态码:{status_code},信息:{request.response.text}" + if request.response.text: + data = request.response.json() + error = data["error"] + msg = f"委托失败,状态码:{status_code},类型:{error['name']}, 信息:{error['message']}" + else: + msg = f"委托失败,状态码:{status_code}" + self.gateway.write_log(msg) def on_send_order_error( @@ -392,7 +422,7 @@ class BitmexRestApi(RestClient): def on_send_order(self, data, request): """Websocket will push a new order status""" - pass + self.update_rate_limit(request) def on_cancel_order_error( self, exception_type: type, exception_value: Exception, tb, request: Request @@ -406,13 +436,17 @@ class BitmexRestApi(RestClient): def on_cancel_order(self, data, request): """Websocket will push a new order status""" - pass + self.update_rate_limit(request) def on_failed(self, status_code: int, request: Request): """ Callback to handle request failed. """ - msg = f"请求失败,状态码:{status_code},信息:{request.response.text}" + self.update_rate_limit(request) + + data = request.response.json() + error = data["error"] + msg = f"请求失败,状态码:{status_code},类型:{error['name']}, 信息:{error['message']}" self.gateway.write_log(msg) def on_error( @@ -428,6 +462,47 @@ class BitmexRestApi(RestClient): self.exception_detail(exception_type, exception_value, tb, request) ) + def update_rate_limit(self, request: Request): + """ + Update current request limit remaining status. + """ + if request.response is None: + return + headers = request.response.headers + + self.rate_limit_remaining = int(headers["x-ratelimit-remaining"]) + self.rate_limit_sleep = int(headers.get("Retry-After", 0)) + 1 # 1 extra second sleep + + def reset_rate_limit(self): + """ + Reset request limit remaining every 1 second. + """ + self.rate_limit_remaining += 1 + self.rate_limit_remaining = min( + self.rate_limit_remaining, self.rate_limit_limit) + + # Countdown of retry sleep seconds + if self.rate_limit_sleep: + self.rate_limit_sleep -= 1 + + def check_rate_limit(self): + """ + Check if rate limit is reached before sending out requests. + """ + # Already received 429 from server + if self.rate_limit_sleep: + msg = f"请求过于频繁,已被BitMEX限制,请等待{self.rate_limit_sleep}秒后再试" + self.gateway.write_log(msg) + return False + # Just local request limit is reached + elif not self.rate_limit_remaining: + msg = "请求频率太高,有触发BitMEX流控的风险,请稍候再试" + self.gateway.write_log(msg) + return False + else: + self.rate_limit_remaining -= 1 + return True + class BitmexWebsocketApi(WebsocketClient): """""" diff --git a/vnpy/gateway/okex/okex_gateway.py b/vnpy/gateway/okex/okex_gateway.py index 5ea47268..9d257e6a 100644 --- a/vnpy/gateway/okex/okex_gateway.py +++ b/vnpy/gateway/okex/okex_gateway.py @@ -644,27 +644,26 @@ class OkexWebsocketApi(WebsocketClient): def on_depth(self, d): """""" - for tick_data in d: - symbol = d["instrument_id"] - tick = self.ticks.get(symbol, None) - if not tick: - return + symbol = d["instrument_id"] + tick = self.ticks.get(symbol, None) + if not tick: + return - bids = d["bids"] - asks = d["asks"] - for n, buf in enumerate(bids): - price, volume, _ = buf - tick.__setattr__("bid_price_%s" % (n + 1), float(price)) - tick.__setattr__("bid_volume_%s" % (n + 1), float(volume)) + bids = d["bids"] + asks = d["asks"] + for n, buf in enumerate(bids): + price, volume, _ = buf + tick.__setattr__("bid_price_%s" % (n + 1), float(price)) + tick.__setattr__("bid_volume_%s" % (n + 1), float(volume)) - for n, buf in enumerate(asks): - price, volume, _ = buf - tick.__setattr__("ask_price_%s" % (n + 1), float(price)) - tick.__setattr__("ask_volume_%s" % (n + 1), float(volume)) + for n, buf in enumerate(asks): + price, volume, _ = buf + tick.__setattr__("ask_price_%s" % (n + 1), float(price)) + tick.__setattr__("ask_volume_%s" % (n + 1), float(volume)) - tick.datetime = datetime.strptime( - d["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") - self.gateway.on_tick(copy(tick)) + tick.datetime = datetime.strptime( + d["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") + self.gateway.on_tick(copy(tick)) def on_order(self, d): """""" diff --git a/vnpy/gateway/okexf/okexf_gateway.py b/vnpy/gateway/okexf/okexf_gateway.py index b1212682..c42248ea 100644 --- a/vnpy/gateway/okexf/okexf_gateway.py +++ b/vnpy/gateway/okexf/okexf_gateway.py @@ -709,26 +709,25 @@ class OkexfWebsocketApi(WebsocketClient): def on_depth(self, d): """""" - for tick_data in d: - symbol = d["instrument_id"] - tick = self.ticks.get(symbol, None) - if not tick: - return + symbol = d["instrument_id"] + tick = self.ticks.get(symbol, None) + if not tick: + return - bids = d["bids"] - asks = d["asks"] - for n, buf in enumerate(bids): - price, volume, _, __ = buf - tick.__setattr__("bid_price_%s" % (n + 1), price) - tick.__setattr__("bid_volume_%s" % (n + 1), volume) + bids = d["bids"] + asks = d["asks"] + for n, buf in enumerate(bids): + price, volume, _, __ = buf + tick.__setattr__("bid_price_%s" % (n + 1), price) + tick.__setattr__("bid_volume_%s" % (n + 1), volume) - for n, buf in enumerate(asks): - price, volume, _, __ = buf - tick.__setattr__("ask_price_%s" % (n + 1), price) - tick.__setattr__("ask_volume_%s" % (n + 1), volume) + for n, buf in enumerate(asks): + price, volume, _, __ = buf + tick.__setattr__("ask_price_%s" % (n + 1), price) + tick.__setattr__("ask_volume_%s" % (n + 1), volume) - tick.datetime = utc_to_local(d["timestamp"]) - self.gateway.on_tick(copy(tick)) + tick.datetime = utc_to_local(d["timestamp"]) + self.gateway.on_tick(copy(tick)) def on_order(self, d): """""" diff --git a/vnpy/gateway/onetoken/onetoken_gateway.py b/vnpy/gateway/onetoken/onetoken_gateway.py index bd586eb3..50181d7a 100644 --- a/vnpy/gateway/onetoken/onetoken_gateway.py +++ b/vnpy/gateway/onetoken/onetoken_gateway.py @@ -259,7 +259,7 @@ class OnetokenRestApi(RestClient): symbol = instrument_data["name"] contract = ContractData( symbol=symbol, - exchange=Exchange.OKEX, # todo + exchange=Exchange(instrument_data['symbol'].split('/')[0].upper()), name=symbol, product=Product.SPOT, # todo size=float(instrument_data["min_amount"]),