diff --git a/vnpy/restful/WebSocketClient.py b/vnpy/restful/WebSocketClient.py index fd6071fb..fdcba4ef 100644 --- a/vnpy/restful/WebSocketClient.py +++ b/vnpy/restful/WebSocketClient.py @@ -7,7 +7,7 @@ import ssl import sys import time from abc import abstractmethod -from threading import Thread +from threading import Thread, Lock import websocket @@ -20,7 +20,9 @@ class WebsocketClient(object): """Constructor""" self.host = None # type: str + self._ws_lock = Lock() self._ws = None # type: websocket.WebSocket + self._workerThread = None # type: Thread self._pingThread = None # type: Thread self._active = False @@ -55,17 +57,17 @@ class WebsocketClient(object): #---------------------------------------------------------------------- def sendReq(self, req): # type: (dict)->None """发出请求""" - return self._ws.send(json.dumps(req), opcode=websocket.ABNF.OPCODE_TEXT) + return self._get_ws().send(json.dumps(req), opcode=websocket.ABNF.OPCODE_TEXT) #---------------------------------------------------------------------- def sendText(self, text): # type: (str)->None """发出请求""" - return self._ws.send(text, opcode=websocket.ABNF.OPCODE_TEXT) + return self._get_ws().send(text, opcode=websocket.ABNF.OPCODE_TEXT) #---------------------------------------------------------------------- def sendData(self, data): # type: (bytes)->None """发出请求""" - return self._ws.send_binary(data) + return self._get_ws().send_binary(data) #---------------------------------------------------------------------- def _reconnect(self): @@ -81,22 +83,36 @@ class WebsocketClient(object): #---------------------------------------------------------------------- def _disconnect(self): - """""" - self._ws.close() + """ + 断开连接 + """ + with self._ws_lock: + if self._ws: + self._ws.close() + self._ws = None + + #---------------------------------------------------------------------- + def _get_ws(self): + with self._ws_lock: + return self._ws #---------------------------------------------------------------------- def _run(self): """运行""" + ws = self._get_ws() while self._active: try: - stream = self._ws.recv() + stream = ws.recv() + if not stream: + if self._active: + self._reconnect() + continue + data = json.loads(stream) self.onMessage(data) except: et, ev, tb = sys.exc_info() self.onError(et, ev, tb) - if self._active: - self._reconnect() #---------------------------------------------------------------------- def _runPing(self): @@ -109,7 +125,7 @@ class WebsocketClient(object): #---------------------------------------------------------------------- def _ping(self): - return self._ws.send('ping', websocket.ABNF.OPCODE_PING) + return self._get_ws().send('ping', websocket.ABNF.OPCODE_PING) #---------------------------------------------------------------------- @abstractmethod