diff --git a/vnpy/api/websocket/websocket_client.py b/vnpy/api/websocket/websocket_client.py index 870d87a4..e7c767a0 100644 --- a/vnpy/api/websocket/websocket_client.py +++ b/vnpy/api/websocket/websocket_client.py @@ -125,52 +125,55 @@ class WebsocketClient(object): if ws: ws._send_binary(data) - def _reconnect(self): - """""" - if self._active: - self._disconnect() - self._connect() - def _create_connection(self, *args, **kwargs): """""" return websocket.create_connection(*args, **kwargs) - def _connect(self): + def _ensure_connection(self): """""" - self._ws = self._create_connection( - self.host, - sslopt={"cert_reqs": ssl.CERT_NONE}, - http_proxy_host=self.proxy_host, - http_proxy_port=self.proxy_port, - header=self.header - ) - self.on_connected() + triggered = False + with self._ws_lock: + if self._ws is None: + self._ws = self._create_connection( + self.host, + sslopt={"cert_reqs": ssl.CERT_NONE}, + http_proxy_host=self.proxy_host, + http_proxy_port=self.proxy_port, + header=self.header + ) + triggered = True + if triggered: + self.on_connected() def _disconnect(self): """ """ + triggered = False with self._ws_lock: if self._ws: - self._ws.close() + ws: websocket.WebSocket = self._ws self._ws = None + triggered = True + if triggered: + ws.close() + self.on_disconnected() + def _run(self): """ Keep running till stop is called. """ try: - self._connect() - - # todo: onDisconnect while self._active: try: + self._ensure_connection() ws = self._ws if ws: text = ws.recv() # ws object is closed when recv function is blocking if not text: - self._reconnect() + self._disconnect() continue self._record_last_received_text(text) @@ -185,17 +188,17 @@ class WebsocketClient(object): # ws is closed before recv function is called # For socket.error, see Issue #1608 except (websocket.WebSocketConnectionClosedException, socket.error): - self._reconnect() + self._disconnect() # other internal exception raised in on_packet except: # noqa et, ev, tb = sys.exc_info() self.on_error(et, ev, tb) - self._reconnect() + self._disconnect() except: # noqa et, ev, tb = sys.exc_info() self.on_error(et, ev, tb) - self._reconnect() + self._disconnect() @staticmethod def unpack_data(data: str): @@ -214,7 +217,10 @@ class WebsocketClient(object): except: # noqa et, ev, tb = sys.exc_info() self.on_error(et, ev, tb) - self._reconnect() + + # self._run() will reconnect websocket + sleep(1) + for i in range(self.ping_interval): if not self._active: break diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index c92781b7..2b3eb8c7 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -716,6 +716,7 @@ class CtpTdApi(TdApi): ctp_req = { "InstrumentID": req.symbol, + "ExchangeID": req.exchange.value, "LimitPrice": req.price, "VolumeTotalOriginal": int(req.volume), "OrderPriceType": ORDERTYPE_VT2CTP.get(req.type, ""),