diff --git a/examples/client_server/server/run_server.py b/examples/client_server/server/run_server.py index 8d9f006a..6af29e7e 100644 --- a/examples/client_server/server/run_server.py +++ b/examples/client_server/server/run_server.py @@ -44,13 +44,13 @@ def main_terminal(): rpc_engine = main_engine.add_app(RpcServiceApp) setting = { - "用户名": "25500029", - "密码": "20140602", - "经纪商代码": "6010", - "交易服务器": "180.169.75.194:41305", - "行情服务器": "180.166.1.17:41313", - "产品名称": "vntech_vnpy_2.0", - "授权编码": "0Y1J5UIMY79BFL7S", + "用户名": "", + "密码": "", + "经纪商代码": "9999", + "交易服务器": "180.168.146.187:10101", + "行情服务器": "180.168.146.187:10111", + "产品名称": "simnow_client_test", + "授权编码": "0000000000000000", "产品信息": "" } main_engine.connect(setting, "CTP") diff --git a/vnpy/app/rpc_service/engine.py b/vnpy/app/rpc_service/engine.py index 62a355a5..5a5fc967 100644 --- a/vnpy/app/rpc_service/engine.py +++ b/vnpy/app/rpc_service/engine.py @@ -1,6 +1,7 @@ """""" import traceback +from typing import Optional from vnpy.event import Event, EventEngine from vnpy.rpc import RpcServer @@ -24,7 +25,7 @@ class RpcEngine(BaseEngine): self.rep_address = "tcp://*:2014" self.pub_address = "tcp://*:4102" - self.server = None + self.server: Optional[RpcServer] = None self.init_server() self.load_setting() @@ -96,6 +97,7 @@ class RpcEngine(BaseEngine): return False self.server.stop() + self.server.join() self.write_log("RPC服务已停止") return True diff --git a/vnpy/gateway/rpc/rpc_gateway.py b/vnpy/gateway/rpc/rpc_gateway.py index a1807adb..215e079d 100644 --- a/vnpy/gateway/rpc/rpc_gateway.py +++ b/vnpy/gateway/rpc/rpc_gateway.py @@ -101,6 +101,7 @@ class RpcGateway(BaseGateway): def close(self): """""" self.client.stop() + self.client.join() def client_callback(self, topic: str, event: Event): """""" diff --git a/vnpy/rpc/__init__.py b/vnpy/rpc/__init__.py index 52b2ec0d..d1b4877a 100644 --- a/vnpy/rpc/__init__.py +++ b/vnpy/rpc/__init__.py @@ -1,13 +1,23 @@ +import signal import threading import traceback -import signal -import zmq +from datetime import datetime, timedelta +from functools import lru_cache from typing import Any, Callable +import zmq + +_ = lambda x: x # Achieve Ctrl-c interrupt recv +from zmq.backend.cython.constants import NOBLOCK + signal.signal(signal.SIGINT, signal.SIG_DFL) +KEEP_ALIVE_TOPIC = '_keep_alive' +KEEP_ALIVE_INTERVAL = timedelta(seconds=1) +KEEP_ALIVE_TOLERANCE = timedelta(seconds=3) + class RemoteException(Exception): """ @@ -50,6 +60,8 @@ class RpcServer: self.__active = False # RpcServer status self.__thread = None # RpcServer thread + self._register(KEEP_ALIVE_TOPIC, lambda n: n) + def is_active(self): """""" return self.__active @@ -82,21 +94,29 @@ class RpcServer: # Stop RpcServer status self.__active = False + # Unbind socket address + self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT) + self.__socket_rep.unbind(self.__socket_rep.LAST_ENDPOINT) + + def join(self): # Wait for RpcServer thread to exit if self.__thread.isAlive(): self.__thread.join() self.__thread = None - # Unbind socket address - self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT) - self.__socket_rep.unbind(self.__socket_rep.LAST_ENDPOINT) - def run(self): """ Run RpcServer functions """ + start = datetime.utcnow() while self.__active: # Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds) + cur = datetime.utcnow() + delta = cur - start + + if delta >= KEEP_ALIVE_INTERVAL: + self.publish(KEEP_ALIVE_TOPIC, cur) + if not self.__socket_rep.poll(1000): continue @@ -127,7 +147,13 @@ class RpcServer: """ Register function """ - self.__functions[func.__name__] = func + return self._register(func.__name__, func) + + def _register(self, name: str, func: Callable): + """ + Register function + """ + self.__functions[name] = func class RpcClient: @@ -145,21 +171,27 @@ class RpcClient: self.__socket_sub = self.__context.socket(zmq.SUB) # Worker thread relate, used to process data pushed from server - self.__active = False # RpcClient status - self.__thread = None # RpcClient thread + self.__active = False # RpcClient status + self.__thread = None # RpcClient thread + self.__lock = threading.Lock() + self._last_received_ping: datetime = datetime.utcnow() + + @lru_cache(100) def __getattr__(self, name: str): """ Realize remote call function """ + # Perform remote call task def dorpc(*args, **kwargs): # Generate request req = [name, args, kwargs] # Send request and wait for response - self.__socket_req.send_pyobj(req) - rep = self.__socket_req.recv_pyobj() + with self.__lock: + self.__socket_req.send_pyobj(req) + rep = self.__socket_req.recv_pyobj() # Return response if successed; Trigger exception if failed if rep[0]: @@ -187,6 +219,8 @@ class RpcClient: self.__thread = threading.Thread(target=self.run) self.__thread.start() + self._last_received_ping = datetime.utcnow() + def stop(self): """ Stop RpcClient @@ -197,29 +231,39 @@ class RpcClient: # Stop RpcClient status self.__active = False + # Close socket + self.__socket_req.close() + self.__socket_sub.close() + + def join(self): # Wait for RpcClient thread to exit if self.__thread.isAlive(): self.__thread.join() self.__thread = None - # Close socket - self.__socket_req.close() - self.__socket_sub.close() - def run(self): """ Run RpcClient function """ + pull_tolerance = int(KEEP_ALIVE_TOLERANCE.total_seconds() * 1000) while self.__active: - # Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds) - if not self.__socket_sub.poll(1000): + if not self.__socket_sub.poll(pull_tolerance): + self._on_unexpected_disconnected() continue # Receive data from subscribe socket - topic, data = self.__socket_sub.recv_pyobj() + topic, data = self.__socket_sub.recv_pyobj(flags=NOBLOCK) - # Process data by callable function - self.callback(topic, data) + if topic == KEEP_ALIVE_TOPIC: + self._last_received_ping = data + else: + # Process data by callable function + self.callback(topic, data) + + @staticmethod + def _on_unexpected_disconnected(): + print(_("RpcServer has no response over {tolerance} seconds, please check you connection." + .format(tolerance=KEEP_ALIVE_TOLERANCE.total_seconds()))) def callback(self, topic: str, data: Any): """