From 0236fcb86496d20f269ee51ad3c24d066d670d9e Mon Sep 17 00:00:00 2001 From: nanoric Date: Sun, 7 Oct 2018 04:10:36 -0400 Subject: [PATCH] =?UTF-8?q?[Fix]=20WebSocketClient=EF=BC=9A=E4=BF=AE?= =?UTF-8?q?=E6=AD=A3=E4=B8=80=E4=B8=AA=E6=BD=9C=E5=9C=A8Bug=EF=BC=9A?= =?UTF-8?q?=E5=9B=A0=E7=BD=91=E7=BB=9C=E9=97=AE=E9=A2=98=E5=AF=BC=E8=87=B4?= =?UTF-8?q?=E9=87=8D=E8=BF=9E=E7=9A=84exception=E4=B8=8D=E4=BC=9A=E8=A2=AB?= =?UTF-8?q?onError=E6=8D=95=E8=8E=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/restful/WebSocketClient.py | 36 ++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/vnpy/restful/WebSocketClient.py b/vnpy/restful/WebSocketClient.py index fd6071fb..fdcba4ef 100644 --- a/vnpy/restful/WebSocketClient.py +++ b/vnpy/restful/WebSocketClient.py @@ -7,7 +7,7 @@ import ssl import sys import time from abc import abstractmethod -from threading import Thread +from threading import Thread, Lock import websocket @@ -20,7 +20,9 @@ class WebsocketClient(object): """Constructor""" self.host = None # type: str + self._ws_lock = Lock() self._ws = None # type: websocket.WebSocket + self._workerThread = None # type: Thread self._pingThread = None # type: Thread self._active = False @@ -55,17 +57,17 @@ class WebsocketClient(object): #---------------------------------------------------------------------- def sendReq(self, req): # type: (dict)->None """发出请求""" - return self._ws.send(json.dumps(req), opcode=websocket.ABNF.OPCODE_TEXT) + return self._get_ws().send(json.dumps(req), opcode=websocket.ABNF.OPCODE_TEXT) #---------------------------------------------------------------------- def sendText(self, text): # type: (str)->None """发出请求""" - return self._ws.send(text, opcode=websocket.ABNF.OPCODE_TEXT) + return self._get_ws().send(text, opcode=websocket.ABNF.OPCODE_TEXT) #---------------------------------------------------------------------- def sendData(self, data): # type: (bytes)->None """发出请求""" - return self._ws.send_binary(data) + return self._get_ws().send_binary(data) #---------------------------------------------------------------------- def _reconnect(self): @@ -81,22 +83,36 @@ class WebsocketClient(object): #---------------------------------------------------------------------- def _disconnect(self): - """""" - self._ws.close() + """ + 断开连接 + """ + with self._ws_lock: + if self._ws: + self._ws.close() + self._ws = None + + #---------------------------------------------------------------------- + def _get_ws(self): + with self._ws_lock: + return self._ws #---------------------------------------------------------------------- def _run(self): """运行""" + ws = self._get_ws() while self._active: try: - stream = self._ws.recv() + stream = ws.recv() + if not stream: + if self._active: + self._reconnect() + continue + data = json.loads(stream) self.onMessage(data) except: et, ev, tb = sys.exc_info() self.onError(et, ev, tb) - if self._active: - self._reconnect() #---------------------------------------------------------------------- def _runPing(self): @@ -109,7 +125,7 @@ class WebsocketClient(object): #---------------------------------------------------------------------- def _ping(self): - return self._ws.send('ping', websocket.ABNF.OPCODE_PING) + return self._get_ws().send('ping', websocket.ABNF.OPCODE_PING) #---------------------------------------------------------------------- @abstractmethod