[Add] request rate limit check for BitmexGateway

This commit is contained in:
vn.py 2019-07-13 13:22:28 +08:00
parent 33bd9c56c7
commit de47e8eb02

View File

@ -11,8 +11,10 @@ from urllib.parse import urlencode
from requests import ConnectionError from requests import ConnectionError
from vnpy.event import Event
from vnpy.api.rest import Request, RestClient from vnpy.api.rest import Request, RestClient
from vnpy.api.websocket import WebsocketClient from vnpy.api.websocket import WebsocketClient
from vnpy.trader.event import EVENT_TIMER
from vnpy.trader.constant import ( from vnpy.trader.constant import (
Direction, Direction,
Exchange, Exchange,
@ -97,6 +99,8 @@ class BitmexGateway(BaseGateway):
self.rest_api = BitmexRestApi(self) self.rest_api = BitmexRestApi(self)
self.ws_api = BitmexWebsocketApi(self) self.ws_api = BitmexWebsocketApi(self)
event_engine.register(EVENT_TIMER, self.process_timer_event)
def connect(self, setting: dict): def connect(self, setting: dict):
"""""" """"""
key = setting["ID"] key = setting["ID"]
@ -146,6 +150,10 @@ class BitmexGateway(BaseGateway):
self.rest_api.stop() self.rest_api.stop()
self.ws_api.stop() self.ws_api.stop()
def process_timer_event(self, event: Event):
""""""
self.rest_api.reset_rate_limit()
class BitmexRestApi(RestClient): class BitmexRestApi(RestClient):
""" """
@ -167,6 +175,11 @@ class BitmexRestApi(RestClient):
self.connect_time = 0 self.connect_time = 0
# Use 60 by default, and will update after first request
self.rate_limit_limit = 60
self.rate_limit_remaining = 60
self.rate_limit_sleep = 0
def sign(self, request): def sign(self, request):
""" """
Generate BitMEX signature. Generate BitMEX signature.
@ -238,6 +251,9 @@ class BitmexRestApi(RestClient):
def send_order(self, req: OrderRequest): def send_order(self, req: OrderRequest):
"""""" """"""
if not self.check_rate_limit():
return ""
orderid = str(self.connect_time + self._new_order_id()) orderid = str(self.connect_time + self._new_order_id())
data = { data = {
@ -282,6 +298,9 @@ class BitmexRestApi(RestClient):
def cancel_order(self, req: CancelRequest): def cancel_order(self, req: CancelRequest):
"""""" """"""
if not self.check_rate_limit():
return
orderid = req.orderid orderid = req.orderid
if orderid.isdigit(): if orderid.isdigit():
@ -299,6 +318,9 @@ class BitmexRestApi(RestClient):
def query_history(self, req: HistoryRequest): def query_history(self, req: HistoryRequest):
"""""" """"""
if not self.check_rate_limit():
return
history = [] history = []
count = 750 count = 750
start_time = req.start.isoformat() start_time = req.start.isoformat()
@ -369,11 +391,19 @@ class BitmexRestApi(RestClient):
""" """
Callback when sending order failed on server. Callback when sending order failed on server.
""" """
self.update_rate_limit(request)
order = request.extra order = request.extra
order.status = Status.REJECTED order.status = Status.REJECTED
self.gateway.on_order(order) self.gateway.on_order(order)
msg = f"委托失败,状态码:{status_code},信息:{request.response.text}" if request.response.text:
data = request.response.json()
error = data["error"]
msg = f"委托失败,状态码:{status_code},类型:{error['name']}, 信息:{error['message']}"
else:
msg = f"委托失败,状态码:{status_code}"
self.gateway.write_log(msg) self.gateway.write_log(msg)
def on_send_order_error( def on_send_order_error(
@ -392,7 +422,7 @@ class BitmexRestApi(RestClient):
def on_send_order(self, data, request): def on_send_order(self, data, request):
"""Websocket will push a new order status""" """Websocket will push a new order status"""
pass self.update_rate_limit(request)
def on_cancel_order_error( def on_cancel_order_error(
self, exception_type: type, exception_value: Exception, tb, request: Request self, exception_type: type, exception_value: Exception, tb, request: Request
@ -406,13 +436,17 @@ class BitmexRestApi(RestClient):
def on_cancel_order(self, data, request): def on_cancel_order(self, data, request):
"""Websocket will push a new order status""" """Websocket will push a new order status"""
pass self.update_rate_limit(request)
def on_failed(self, status_code: int, request: Request): def on_failed(self, status_code: int, request: Request):
""" """
Callback to handle request failed. Callback to handle request failed.
""" """
msg = f"请求失败,状态码:{status_code},信息:{request.response.text}" self.update_rate_limit(request)
data = request.response.json()
error = data["error"]
msg = f"请求失败,状态码:{status_code},类型:{error['name']}, 信息:{error['message']}"
self.gateway.write_log(msg) self.gateway.write_log(msg)
def on_error( def on_error(
@ -428,6 +462,47 @@ class BitmexRestApi(RestClient):
self.exception_detail(exception_type, exception_value, tb, request) self.exception_detail(exception_type, exception_value, tb, request)
) )
def update_rate_limit(self, request: Request):
"""
Update current request limit remaining status.
"""
if request.response is None:
return
headers = request.response.headers
self.rate_limit_remaining = int(headers["x-ratelimit-remaining"])
self.rate_limit_sleep = int(headers.get("Retry-After", 0)) + 1 # 1 extra second sleep
def reset_rate_limit(self):
"""
Reset request limit remaining every 1 second.
"""
self.rate_limit_remaining += 1
self.rate_limit_remaining = min(
self.rate_limit_remaining, self.rate_limit_limit)
# Countdown of retry sleep seconds
if self.rate_limit_sleep:
self.rate_limit_sleep -= 1
def check_rate_limit(self):
"""
Check if rate limit is reached before sending out requests.
"""
# Already received 429 from server
if self.rate_limit_sleep:
msg = f"请求过于频繁已被BitMEX限制请等待{self.rate_limit_sleep}秒后再试"
self.gateway.write_log(msg)
return False
# Just local request limit is reached
elif not self.rate_limit_remaining:
msg = "请求频率太高有触发BitMEX流控的风险请稍候再试"
self.gateway.write_log(msg)
return False
else:
self.rate_limit_remaining -= 1
return True
class BitmexWebsocketApi(WebsocketClient): class BitmexWebsocketApi(WebsocketClient):
"""""" """"""