Merge pull request #1914 from nanoric/fix_unhandled_exception_in_websocket.reconnect

[Mod] Fixed: exception raised in WebsocketClient._reconnect() may not …
This commit is contained in:
vn.py 2019-07-12 14:24:49 +08:00 committed by GitHub
commit 02c4a88f02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 24 deletions

View File

@ -125,18 +125,15 @@ class WebsocketClient(object):
if ws: if ws:
ws._send_binary(data) ws._send_binary(data)
def _reconnect(self):
""""""
if self._active:
self._disconnect()
self._connect()
def _create_connection(self, *args, **kwargs): def _create_connection(self, *args, **kwargs):
"""""" """"""
return websocket.create_connection(*args, **kwargs) return websocket.create_connection(*args, **kwargs)
def _connect(self): def _ensure_connection(self):
"""""" """"""
triggered = False
with self._ws_lock:
if self._ws is None:
self._ws = self._create_connection( self._ws = self._create_connection(
self.host, self.host,
sslopt={"cert_reqs": ssl.CERT_NONE}, sslopt={"cert_reqs": ssl.CERT_NONE},
@ -144,33 +141,39 @@ class WebsocketClient(object):
http_proxy_port=self.proxy_port, http_proxy_port=self.proxy_port,
header=self.header header=self.header
) )
triggered = True
if triggered:
self.on_connected() self.on_connected()
def _disconnect(self): def _disconnect(self):
""" """
""" """
triggered = False
with self._ws_lock: with self._ws_lock:
if self._ws: if self._ws:
self._ws.close() ws: websocket.WebSocket = self._ws
self._ws = None self._ws = None
triggered = True
if triggered:
ws.close()
self.on_disconnected()
def _run(self): def _run(self):
""" """
Keep running till stop is called. Keep running till stop is called.
""" """
try: try:
self._connect()
# todo: onDisconnect
while self._active: while self._active:
try: try:
self._ensure_connection()
ws = self._ws ws = self._ws
if ws: if ws:
text = ws.recv() text = ws.recv()
# ws object is closed when recv function is blocking # ws object is closed when recv function is blocking
if not text: if not text:
self._reconnect() self._disconnect()
continue continue
self._record_last_received_text(text) self._record_last_received_text(text)
@ -185,17 +188,17 @@ class WebsocketClient(object):
# ws is closed before recv function is called # ws is closed before recv function is called
# For socket.error, see Issue #1608 # For socket.error, see Issue #1608
except (websocket.WebSocketConnectionClosedException, socket.error): except (websocket.WebSocketConnectionClosedException, socket.error):
self._reconnect() self._disconnect()
# other internal exception raised in on_packet # other internal exception raised in on_packet
except: # noqa except: # noqa
et, ev, tb = sys.exc_info() et, ev, tb = sys.exc_info()
self.on_error(et, ev, tb) self.on_error(et, ev, tb)
self._reconnect() self._disconnect()
except: # noqa except: # noqa
et, ev, tb = sys.exc_info() et, ev, tb = sys.exc_info()
self.on_error(et, ev, tb) self.on_error(et, ev, tb)
self._reconnect() self._disconnect()
@staticmethod @staticmethod
def unpack_data(data: str): def unpack_data(data: str):
@ -214,7 +217,10 @@ class WebsocketClient(object):
except: # noqa except: # noqa
et, ev, tb = sys.exc_info() et, ev, tb = sys.exc_info()
self.on_error(et, ev, tb) self.on_error(et, ev, tb)
self._reconnect()
# self._run() will reconnect websocket
sleep(1)
for i in range(self.ping_interval): for i in range(self.ping_interval):
if not self._active: if not self._active:
break break

View File

@ -716,6 +716,7 @@ class CtpTdApi(TdApi):
ctp_req = { ctp_req = {
"InstrumentID": req.symbol, "InstrumentID": req.symbol,
"ExchangeID": req.exchange.value,
"LimitPrice": req.price, "LimitPrice": req.price,
"VolumeTotalOriginal": int(req.volume), "VolumeTotalOriginal": int(req.volume),
"OrderPriceType": ORDERTYPE_VT2CTP.get(req.type, ""), "OrderPriceType": ORDERTYPE_VT2CTP.get(req.type, ""),