From 01122f013d6cf365ea40e3b0278883bcf514e661 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 2 Jul 2019 09:32:46 +0800 Subject: [PATCH] [Mod] add unbind socket in server stop method --- examples/simple_rpc/test_client.py | 2 +- examples/simple_rpc/test_server.py | 10 ++++----- vnpy/rpc/__init__.py | 33 +++++++++++++++++------------- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/examples/simple_rpc/test_client.py b/examples/simple_rpc/test_client.py index 57dd0909..22689971 100644 --- a/examples/simple_rpc/test_client.py +++ b/examples/simple_rpc/test_client.py @@ -25,7 +25,7 @@ class TestClient(RpcClient): if __name__ == "__main__": req_address = "tcp://localhost:2014" - sub_address = "tcp://localhost:0602" + sub_address = "tcp://localhost:4102" tc = TestClient(req_address, sub_address) tc.subscribe_topic("") diff --git a/examples/simple_rpc/test_server.py b/examples/simple_rpc/test_server.py index 3589bead..2839b861 100644 --- a/examples/simple_rpc/test_server.py +++ b/examples/simple_rpc/test_server.py @@ -10,11 +10,11 @@ class TestServer(RpcServer): Test RpcServer """ - def __init__(self, rep_address, pub_address): + def __init__(self): """ Constructor """ - super(TestServer, self).__init__(rep_address, pub_address) + super(TestServer, self).__init__() self.register(self.add) @@ -28,10 +28,10 @@ class TestServer(RpcServer): if __name__ == "__main__": rep_address = "tcp://*:2014" - pub_address = "tcp://*:0602" + pub_address = "tcp://*:4102" - ts = TestServer(rep_address, pub_address) - ts.start() + ts = TestServer() + ts.start(rep_address, pub_address) while 1: content = f"current server time is {time()}" diff --git a/vnpy/rpc/__init__.py b/vnpy/rpc/__init__.py index f12a1c83..13fdd4fd 100644 --- a/vnpy/rpc/__init__.py +++ b/vnpy/rpc/__init__.py @@ -2,6 +2,7 @@ import threading import traceback import signal import zmq +from typing import Any, Callable # Achieve Ctrl-c interrupt recv @@ -29,7 +30,7 @@ class RemoteException(Exception): class RpcServer: """""" - def __init__(self, rep_address, pub_address): + def __init__(self): """ Constructor """ @@ -41,20 +42,22 @@ class RpcServer: # Reply socket (Request–reply pattern) self.__socket_rep = self.__context.socket(zmq.REP) - self.__socket_rep.bind(rep_address) # Publish socket (Publish–subscribe pattern) self.__socket_pub = self.__context.socket(zmq.PUB) - self.__socket_pub.bind(pub_address) # Worker thread related self.__active = False # RpcServer status self.__thread = threading.Thread(target=self.run) # RpcServer thread - def start(self): + def start(self, rep_address: str, pub_address: str): """ Start RpcServer """ + # Bind socket address + self.__socket_rep.bind(rep_address) + self.__socket_pub.bind(pub_address) + # Start RpcServer status self.__active = True @@ -62,7 +65,7 @@ class RpcServer: if not self.__thread.isAlive(): self.__thread.start() - def stop(self, join=False): + def stop(self, join: bool = False): """ Stop RpcServer """ @@ -73,6 +76,10 @@ class RpcServer: if join and self.__thread.isAlive(): self.__thread.join() + # Unbind socket address + self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT) + self.__socket_rep.unbind(self.__socket_rep.LAST_ENDPOINT) + def run(self): """ Run RpcServer functions @@ -99,13 +106,13 @@ class RpcServer: # send callable response by Reply socket self.__socket_rep.send_pyobj(rep) - def publish(self, topic, data): + def publish(self, topic: str, data: Any): """ Publish data """ self.__socket_pub.send_pyobj([topic, data]) - def register(self, func): + def register(self, func: Callable): """ Register function """ @@ -115,10 +122,8 @@ class RpcServer: class RpcClient: """""" - def __init__(self, req_address, sub_address): + def __init__(self, req_address: str, sub_address: str): """Constructor""" - super(RpcClient, self).__init__() - # zmq port related self.__req_address = req_address self.__sub_address = sub_address @@ -134,7 +139,7 @@ class RpcClient: self.__thread = threading.Thread( target=self.run) # RpcClient thread - def __getattr__(self, name): + def __getattr__(self, name: str): """ Realize remote call function """ @@ -172,7 +177,7 @@ class RpcClient: def stop(self): """ - Stop RpcClient + Stop RpcClient """ # Stop RpcClient status self.__active = False @@ -196,13 +201,13 @@ class RpcClient: # Process data by callable function self.callback(topic, data) - def callback(self, topic, data): + def callback(self, topic: str, data: Any): """ Callable function """ raise NotImplementedError - def subscribe_topic(self, topic): + def subscribe_topic(self, topic: str): """ Subscribe data """