From de47e8eb024641751910ab1eafe55ba23a66c780 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Sat, 13 Jul 2019 13:22:28 +0800 Subject: [PATCH] [Add] request rate limit check for BitmexGateway --- vnpy/gateway/bitmex/bitmex_gateway.py | 83 +++++++++++++++++++++++++-- 1 file changed, 79 insertions(+), 4 deletions(-) diff --git a/vnpy/gateway/bitmex/bitmex_gateway.py b/vnpy/gateway/bitmex/bitmex_gateway.py index e5af3082..549200a1 100644 --- a/vnpy/gateway/bitmex/bitmex_gateway.py +++ b/vnpy/gateway/bitmex/bitmex_gateway.py @@ -11,8 +11,10 @@ from urllib.parse import urlencode from requests import ConnectionError +from vnpy.event import Event from vnpy.api.rest import Request, RestClient from vnpy.api.websocket import WebsocketClient +from vnpy.trader.event import EVENT_TIMER from vnpy.trader.constant import ( Direction, Exchange, @@ -97,6 +99,8 @@ class BitmexGateway(BaseGateway): self.rest_api = BitmexRestApi(self) self.ws_api = BitmexWebsocketApi(self) + event_engine.register(EVENT_TIMER, self.process_timer_event) + def connect(self, setting: dict): """""" key = setting["ID"] @@ -146,6 +150,10 @@ class BitmexGateway(BaseGateway): self.rest_api.stop() self.ws_api.stop() + def process_timer_event(self, event: Event): + """""" + self.rest_api.reset_rate_limit() + class BitmexRestApi(RestClient): """ @@ -167,6 +175,11 @@ class BitmexRestApi(RestClient): self.connect_time = 0 + # Use 60 by default, and will update after first request + self.rate_limit_limit = 60 + self.rate_limit_remaining = 60 + self.rate_limit_sleep = 0 + def sign(self, request): """ Generate BitMEX signature. @@ -238,6 +251,9 @@ class BitmexRestApi(RestClient): def send_order(self, req: OrderRequest): """""" + if not self.check_rate_limit(): + return "" + orderid = str(self.connect_time + self._new_order_id()) data = { @@ -282,6 +298,9 @@ class BitmexRestApi(RestClient): def cancel_order(self, req: CancelRequest): """""" + if not self.check_rate_limit(): + return + orderid = req.orderid if orderid.isdigit(): @@ -299,6 +318,9 @@ class BitmexRestApi(RestClient): def query_history(self, req: HistoryRequest): """""" + if not self.check_rate_limit(): + return + history = [] count = 750 start_time = req.start.isoformat() @@ -369,11 +391,19 @@ class BitmexRestApi(RestClient): """ Callback when sending order failed on server. """ + self.update_rate_limit(request) + order = request.extra order.status = Status.REJECTED self.gateway.on_order(order) - msg = f"委托失败,状态码:{status_code},信息:{request.response.text}" + if request.response.text: + data = request.response.json() + error = data["error"] + msg = f"委托失败,状态码:{status_code},类型:{error['name']}, 信息:{error['message']}" + else: + msg = f"委托失败,状态码:{status_code}" + self.gateway.write_log(msg) def on_send_order_error( @@ -392,7 +422,7 @@ class BitmexRestApi(RestClient): def on_send_order(self, data, request): """Websocket will push a new order status""" - pass + self.update_rate_limit(request) def on_cancel_order_error( self, exception_type: type, exception_value: Exception, tb, request: Request @@ -406,13 +436,17 @@ class BitmexRestApi(RestClient): def on_cancel_order(self, data, request): """Websocket will push a new order status""" - pass + self.update_rate_limit(request) def on_failed(self, status_code: int, request: Request): """ Callback to handle request failed. """ - msg = f"请求失败,状态码:{status_code},信息:{request.response.text}" + self.update_rate_limit(request) + + data = request.response.json() + error = data["error"] + msg = f"请求失败,状态码:{status_code},类型:{error['name']}, 信息:{error['message']}" self.gateway.write_log(msg) def on_error( @@ -428,6 +462,47 @@ class BitmexRestApi(RestClient): self.exception_detail(exception_type, exception_value, tb, request) ) + def update_rate_limit(self, request: Request): + """ + Update current request limit remaining status. + """ + if request.response is None: + return + headers = request.response.headers + + self.rate_limit_remaining = int(headers["x-ratelimit-remaining"]) + self.rate_limit_sleep = int(headers.get("Retry-After", 0)) + 1 # 1 extra second sleep + + def reset_rate_limit(self): + """ + Reset request limit remaining every 1 second. + """ + self.rate_limit_remaining += 1 + self.rate_limit_remaining = min( + self.rate_limit_remaining, self.rate_limit_limit) + + # Countdown of retry sleep seconds + if self.rate_limit_sleep: + self.rate_limit_sleep -= 1 + + def check_rate_limit(self): + """ + Check if rate limit is reached before sending out requests. + """ + # Already received 429 from server + if self.rate_limit_sleep: + msg = f"请求过于频繁,已被BitMEX限制,请等待{self.rate_limit_sleep}秒后再试" + self.gateway.write_log(msg) + return False + # Just local request limit is reached + elif not self.rate_limit_remaining: + msg = "请求频率太高,有触发BitMEX流控的风险,请稍候再试" + self.gateway.write_log(msg) + return False + else: + self.rate_limit_remaining -= 1 + return True + class BitmexWebsocketApi(WebsocketClient): """"""