Merge pull request #1921 from vnpy/ws-reconnect

Ws reconnect
This commit is contained in:
vn.py 2019-07-13 13:25:44 +08:00 committed by GitHub
commit 28b9ce6d8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 143 additions and 64 deletions

View File

@ -125,52 +125,55 @@ class WebsocketClient(object):
if ws:
ws._send_binary(data)
def _reconnect(self):
""""""
if self._active:
self._disconnect()
self._connect()
def _create_connection(self, *args, **kwargs):
""""""
return websocket.create_connection(*args, **kwargs)
def _connect(self):
def _ensure_connection(self):
""""""
self._ws = self._create_connection(
self.host,
sslopt={"cert_reqs": ssl.CERT_NONE},
http_proxy_host=self.proxy_host,
http_proxy_port=self.proxy_port,
header=self.header
)
self.on_connected()
triggered = False
with self._ws_lock:
if self._ws is None:
self._ws = self._create_connection(
self.host,
sslopt={"cert_reqs": ssl.CERT_NONE},
http_proxy_host=self.proxy_host,
http_proxy_port=self.proxy_port,
header=self.header
)
triggered = True
if triggered:
self.on_connected()
def _disconnect(self):
"""
"""
triggered = False
with self._ws_lock:
if self._ws:
self._ws.close()
ws: websocket.WebSocket = self._ws
self._ws = None
triggered = True
if triggered:
ws.close()
self.on_disconnected()
def _run(self):
"""
Keep running till stop is called.
"""
try:
self._connect()
# todo: onDisconnect
while self._active:
try:
self._ensure_connection()
ws = self._ws
if ws:
text = ws.recv()
# ws object is closed when recv function is blocking
if not text:
self._reconnect()
self._disconnect()
continue
self._record_last_received_text(text)
@ -185,17 +188,17 @@ class WebsocketClient(object):
# ws is closed before recv function is called
# For socket.error, see Issue #1608
except (websocket.WebSocketConnectionClosedException, socket.error):
self._reconnect()
self._disconnect()
# other internal exception raised in on_packet
except: # noqa
et, ev, tb = sys.exc_info()
self.on_error(et, ev, tb)
self._reconnect()
self._disconnect()
except: # noqa
et, ev, tb = sys.exc_info()
self.on_error(et, ev, tb)
self._reconnect()
self._disconnect()
@staticmethod
def unpack_data(data: str):
@ -214,7 +217,10 @@ class WebsocketClient(object):
except: # noqa
et, ev, tb = sys.exc_info()
self.on_error(et, ev, tb)
self._reconnect()
# self._run() will reconnect websocket
sleep(1)
for i in range(self.ping_interval):
if not self._active:
break

View File

@ -11,8 +11,10 @@ from urllib.parse import urlencode
from requests import ConnectionError
from vnpy.event import Event
from vnpy.api.rest import Request, RestClient
from vnpy.api.websocket import WebsocketClient
from vnpy.trader.event import EVENT_TIMER
from vnpy.trader.constant import (
Direction,
Exchange,
@ -97,6 +99,8 @@ class BitmexGateway(BaseGateway):
self.rest_api = BitmexRestApi(self)
self.ws_api = BitmexWebsocketApi(self)
event_engine.register(EVENT_TIMER, self.process_timer_event)
def connect(self, setting: dict):
""""""
key = setting["ID"]
@ -146,6 +150,10 @@ class BitmexGateway(BaseGateway):
self.rest_api.stop()
self.ws_api.stop()
def process_timer_event(self, event: Event):
""""""
self.rest_api.reset_rate_limit()
class BitmexRestApi(RestClient):
"""
@ -167,6 +175,11 @@ class BitmexRestApi(RestClient):
self.connect_time = 0
# Use 60 by default, and will update after first request
self.rate_limit_limit = 60
self.rate_limit_remaining = 60
self.rate_limit_sleep = 0
def sign(self, request):
"""
Generate BitMEX signature.
@ -238,6 +251,9 @@ class BitmexRestApi(RestClient):
def send_order(self, req: OrderRequest):
""""""
if not self.check_rate_limit():
return ""
orderid = str(self.connect_time + self._new_order_id())
data = {
@ -282,6 +298,9 @@ class BitmexRestApi(RestClient):
def cancel_order(self, req: CancelRequest):
""""""
if not self.check_rate_limit():
return
orderid = req.orderid
if orderid.isdigit():
@ -299,6 +318,9 @@ class BitmexRestApi(RestClient):
def query_history(self, req: HistoryRequest):
""""""
if not self.check_rate_limit():
return
history = []
count = 750
start_time = req.start.isoformat()
@ -369,11 +391,19 @@ class BitmexRestApi(RestClient):
"""
Callback when sending order failed on server.
"""
self.update_rate_limit(request)
order = request.extra
order.status = Status.REJECTED
self.gateway.on_order(order)
msg = f"委托失败,状态码:{status_code},信息:{request.response.text}"
if request.response.text:
data = request.response.json()
error = data["error"]
msg = f"委托失败,状态码:{status_code},类型:{error['name']}, 信息:{error['message']}"
else:
msg = f"委托失败,状态码:{status_code}"
self.gateway.write_log(msg)
def on_send_order_error(
@ -392,7 +422,7 @@ class BitmexRestApi(RestClient):
def on_send_order(self, data, request):
"""Websocket will push a new order status"""
pass
self.update_rate_limit(request)
def on_cancel_order_error(
self, exception_type: type, exception_value: Exception, tb, request: Request
@ -406,13 +436,17 @@ class BitmexRestApi(RestClient):
def on_cancel_order(self, data, request):
"""Websocket will push a new order status"""
pass
self.update_rate_limit(request)
def on_failed(self, status_code: int, request: Request):
"""
Callback to handle request failed.
"""
msg = f"请求失败,状态码:{status_code},信息:{request.response.text}"
self.update_rate_limit(request)
data = request.response.json()
error = data["error"]
msg = f"请求失败,状态码:{status_code},类型:{error['name']}, 信息:{error['message']}"
self.gateway.write_log(msg)
def on_error(
@ -428,6 +462,47 @@ class BitmexRestApi(RestClient):
self.exception_detail(exception_type, exception_value, tb, request)
)
def update_rate_limit(self, request: Request):
"""
Update current request limit remaining status.
"""
if request.response is None:
return
headers = request.response.headers
self.rate_limit_remaining = int(headers["x-ratelimit-remaining"])
self.rate_limit_sleep = int(headers.get("Retry-After", 0)) + 1 # 1 extra second sleep
def reset_rate_limit(self):
"""
Reset request limit remaining every 1 second.
"""
self.rate_limit_remaining += 1
self.rate_limit_remaining = min(
self.rate_limit_remaining, self.rate_limit_limit)
# Countdown of retry sleep seconds
if self.rate_limit_sleep:
self.rate_limit_sleep -= 1
def check_rate_limit(self):
"""
Check if rate limit is reached before sending out requests.
"""
# Already received 429 from server
if self.rate_limit_sleep:
msg = f"请求过于频繁已被BitMEX限制请等待{self.rate_limit_sleep}秒后再试"
self.gateway.write_log(msg)
return False
# Just local request limit is reached
elif not self.rate_limit_remaining:
msg = "请求频率太高有触发BitMEX流控的风险请稍候再试"
self.gateway.write_log(msg)
return False
else:
self.rate_limit_remaining -= 1
return True
class BitmexWebsocketApi(WebsocketClient):
""""""

View File

@ -644,27 +644,26 @@ class OkexWebsocketApi(WebsocketClient):
def on_depth(self, d):
""""""
for tick_data in d:
symbol = d["instrument_id"]
tick = self.ticks.get(symbol, None)
if not tick:
return
symbol = d["instrument_id"]
tick = self.ticks.get(symbol, None)
if not tick:
return
bids = d["bids"]
asks = d["asks"]
for n, buf in enumerate(bids):
price, volume, _ = buf
tick.__setattr__("bid_price_%s" % (n + 1), float(price))
tick.__setattr__("bid_volume_%s" % (n + 1), float(volume))
bids = d["bids"]
asks = d["asks"]
for n, buf in enumerate(bids):
price, volume, _ = buf
tick.__setattr__("bid_price_%s" % (n + 1), float(price))
tick.__setattr__("bid_volume_%s" % (n + 1), float(volume))
for n, buf in enumerate(asks):
price, volume, _ = buf
tick.__setattr__("ask_price_%s" % (n + 1), float(price))
tick.__setattr__("ask_volume_%s" % (n + 1), float(volume))
for n, buf in enumerate(asks):
price, volume, _ = buf
tick.__setattr__("ask_price_%s" % (n + 1), float(price))
tick.__setattr__("ask_volume_%s" % (n + 1), float(volume))
tick.datetime = datetime.strptime(
d["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
self.gateway.on_tick(copy(tick))
tick.datetime = datetime.strptime(
d["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
self.gateway.on_tick(copy(tick))
def on_order(self, d):
""""""

View File

@ -709,26 +709,25 @@ class OkexfWebsocketApi(WebsocketClient):
def on_depth(self, d):
""""""
for tick_data in d:
symbol = d["instrument_id"]
tick = self.ticks.get(symbol, None)
if not tick:
return
symbol = d["instrument_id"]
tick = self.ticks.get(symbol, None)
if not tick:
return
bids = d["bids"]
asks = d["asks"]
for n, buf in enumerate(bids):
price, volume, _, __ = buf
tick.__setattr__("bid_price_%s" % (n + 1), price)
tick.__setattr__("bid_volume_%s" % (n + 1), volume)
bids = d["bids"]
asks = d["asks"]
for n, buf in enumerate(bids):
price, volume, _, __ = buf
tick.__setattr__("bid_price_%s" % (n + 1), price)
tick.__setattr__("bid_volume_%s" % (n + 1), volume)
for n, buf in enumerate(asks):
price, volume, _, __ = buf
tick.__setattr__("ask_price_%s" % (n + 1), price)
tick.__setattr__("ask_volume_%s" % (n + 1), volume)
for n, buf in enumerate(asks):
price, volume, _, __ = buf
tick.__setattr__("ask_price_%s" % (n + 1), price)
tick.__setattr__("ask_volume_%s" % (n + 1), volume)
tick.datetime = utc_to_local(d["timestamp"])
self.gateway.on_tick(copy(tick))
tick.datetime = utc_to_local(d["timestamp"])
self.gateway.on_tick(copy(tick))
def on_order(self, d):
""""""

View File

@ -259,7 +259,7 @@ class OnetokenRestApi(RestClient):
symbol = instrument_data["name"]
contract = ContractData(
symbol=symbol,
exchange=Exchange.OKEX, # todo
exchange=Exchange(instrument_data['symbol'].split('/')[0].upper()),
name=symbol,
product=Product.SPOT, # todo
size=float(instrument_data["min_amount"]),