From 3d99ccb5f8b71c9774bdc35845684345a476c236 Mon Sep 17 00:00:00 2001 From: 1122455801 Date: Tue, 2 Apr 2019 17:07:04 +0800 Subject: [PATCH 1/4] Create __init__.py --- vnpy/rpc/__init__.py | 1 + 1 file changed, 1 insertion(+) create mode 100644 vnpy/rpc/__init__.py diff --git a/vnpy/rpc/__init__.py b/vnpy/rpc/__init__.py new file mode 100644 index 00000000..b903d2bb --- /dev/null +++ b/vnpy/rpc/__init__.py @@ -0,0 +1 @@ +from .vnrpc import RpcServer, RpcClient, RemoteException \ No newline at end of file From e4a19972960b2b2618a380239d8e9e0687296d0f Mon Sep 17 00:00:00 2001 From: 1122455801 Date: Tue, 2 Apr 2019 17:07:07 +0800 Subject: [PATCH 2/4] Create test_client.py --- vnpy/rpc/test_client.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 vnpy/rpc/test_client.py diff --git a/vnpy/rpc/test_client.py b/vnpy/rpc/test_client.py new file mode 100644 index 00000000..17f42558 --- /dev/null +++ b/vnpy/rpc/test_client.py @@ -0,0 +1,35 @@ +from __future__ import print_function +from __future__ import absolute_import +from time import sleep + +from .vnrpc import RpcClient + + +class TestClient(RpcClient): + """ + Test RpcClient + """ + def __init__(self, req_address, sub_address): + """ + Constructor + """ + super(TestClient, self).__init__(req_address, sub_address) + + def callback(self, topic, data): + """ + Realize callable function + """ + print('client received topic:', topic, ', data:', data) + + +if __name__ == '__main__': + req_address = 'tcp://localhost:2014' + sub_address = 'tcp://localhost:0602' + + tc = TestClient(req_address, sub_address) + tc.subscribeTopic('') + tc.start() + + while 1: + print(tc.add(1, 3)) + sleep(2) From b3a72c1283ac678b6e7d3b4c3123791fc978d0d9 Mon Sep 17 00:00:00 2001 From: 1122455801 Date: Tue, 2 Apr 2019 17:07:10 +0800 Subject: [PATCH 3/4] Create test_server.py --- vnpy/rpc/test_server.py | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 vnpy/rpc/test_server.py diff --git a/vnpy/rpc/test_server.py b/vnpy/rpc/test_server.py new file mode 100644 index 00000000..ee4ca6e4 --- /dev/null +++ b/vnpy/rpc/test_server.py @@ -0,0 +1,40 @@ +from __future__ import print_function +from __future__ import absolute_import +from time import sleep, time + +from .vnrpc import RpcServer + + +class TestServer(RpcServer): + """ + Test RpcServer + """ + + def __init__(self, rep_address, pub_address): + """ + Constructor + """ + super(TestServer, self).__init__(rep_address, pub_address) + + self.register(self.add) + + def add(self, a, b): + """ + Test function + """ + print('receiving: %s, %s' % (a, b)) + return a + b + + +if __name__ == '__main__': + rep_address = 'tcp://*:2014' + pub_address = 'tcp://*:0602' + + ts = TestServer(rep_address, pub_address) + ts.start() + + while 1: + content = 'current server time is %s' % time() + print(content) + ts.publish('test', content) + sleep(2) From d5585ff269868ed5407bc573a851860fdb35a5ec Mon Sep 17 00:00:00 2001 From: 1122455801 Date: Tue, 2 Apr 2019 17:07:14 +0800 Subject: [PATCH 4/4] Create vnrpc.py --- vnpy/rpc/vnrpc.py | 317 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 317 insertions(+) create mode 100644 vnpy/rpc/vnrpc.py diff --git a/vnpy/rpc/vnrpc.py b/vnpy/rpc/vnrpc.py new file mode 100644 index 00000000..3debfa65 --- /dev/null +++ b/vnpy/rpc/vnrpc.py @@ -0,0 +1,317 @@ +import threading +import traceback +import signal + +import zmq +from msgpack import packb, unpackb +from json import dumps, loads + +import cPickle +p_dumps = cPickle.dumps +p_loads = cPickle.loads + + +# Achieve Ctrl-c interrupt recv +signal.signal(signal.SIGINT, signal.SIG_DFL) + + +class RpcObject(object): + """ + Referred to serialization of packing and unpacking, we offer 3 tools: + 1) maspack: higher performance, but usually requires the installation of msgpack related tools; + 2) jason: Slightly lower performance but versatility is better, most programming languages have built-in libraries; + 3) cPickle: Lower performance and only can be used in Python, but it is very convenient to transfer Python objects directly. + + Therefore, it is recommended to use msgpack. + Use json, if you want to communicate with some languages without providing msgpack. + Use cPickle, when the data being transferred contains many custom Python objects. + """ + + def __init__(self): + """ + Constructor + Use msgpack as default serialization tool + """ + self.use_msgpack() + + def pack(self, data): + """""" + pass + + def unpack(self, data): + """""" + pass + + def __json_pack(self, data): + """ + Pack with json + """ + return dumps(data) + + def __json_unpack(self, data): + """ + Unpack with json + """ + return loads(data) + + def __msgpack_pack(self, data): + """ + Pack with msgpack + """ + return packb(data) + + def __msgpack_unpack(self, data): + """ + Unpack with msgpack + """ + return unpackb(data) + + def __pickle_pack(self, data): + """ + Pack with cPickle + """ + return p_dumps(data) + + def __pickle_unpack(self, data): + """ + Unpack with cPickle + """ + return p_loads(data) + + def use_json(self): + """ + Use json as serialization tool + """ + self.pack = self.__json_pack + self.unpack = self.__json_unpack + + def use_msgpack(self): + """ + Use msgpack as serialization tool + """ + self.pack = self.__msgpack_pack + self.unpack = self.__msgpack_unpack + + def use_pickle(self): + """ + Use cPickle as serialization tool + """ + self.pack = self.__pickle_pack + self.unpack = self.__pickle_unpack + + +class RpcServer(RpcObject): + """""" + + def __init__(self, rep_address, pub_address): + """ + Constructor + """ + super(RpcServer, self).__init__() + + # Save functions dict: key is fuction name, value is fuction object + self.__functions = {} + + # Zmq port related + self.__context = zmq.Context() + + self.__socket_rep = self.__context.socket(zmq.REP) # Reply socket (Request–reply pattern) + self.__socket_rep.bind(rep_address) + + self.__socket_pub = self.__context.socket(zmq.PUB) # Publish socket (Publish–subscribe pattern) + self.__socket_pub.bind(pub_address) + + # Woker thread related + self.__active = False # RpcServer status + self.__thread = threading.Thread(target=self.run) # RpcServer thread + + def start(self): + """ + Start RpcServer + """ + # Start RpcServer status + self.__active = True + + # Start RpcServer thread + if not self.__thread.isAlive(): + self.__thread.start() + + def stop(self, join=False): + """ + Stop RpcServer + """ + # Stop RpcServer status + self.__active = False + + # Wait for RpcServer thread to exit + if join and self.__thread.isAlive(): + self.__thread.join() + + def run(self): + """ + Run RpcServer functions + """ + while self.__active: + # Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds) + if not self.__socket_rep.poll(1000): + continue + + # Receive request data from Reply socket + reqb = self.__socket_rep.recv() + + # Unpack request by deserialization + req = self.unpack(reqb) + + # Get function name and parameters + name, args, kwargs = req + + # Try to get and execute callable function object; capture exception information if it fails + try: + func = self.__functions[name] + r = func(*args, **kwargs) + rep = [True, r] + except Exception as e: + rep = [False, traceback.format_exc()] + + # Pack response by serialization + repb = self.pack(rep) + + # send callable response by Reply socket + self.__socket_rep.send(repb) + + def publish(self, topic, data): + """ + Publish data + """ + # Serialized data + datab = self.pack(data) + + # Send data by Publish socket + self.__socket_pub.send_multipart([topic, datab]) # topci must be ascii encoding + + def register(self, func): + """ + Register function + """ + self.__functions[func.__name__] = func + + +class RpcClient(RpcObject): + """""" + + def __init__(self, req_address, sub_address): + """Constructor""" + super(RpcClient, self).__init__() + + # zmq port related + self.__req_address = req_address + self.__sub_address = sub_address + + self.__context = zmq.Context() + self.__socket_req = self.__context.socket(zmq.REQ) # Request socket (Request–reply pattern) + self.__socket_sub = self.__context.socket(zmq.SUB) # Subscribe socket (Publish–subscribe pattern) + + # Woker thread relate, used to process data pushed from server + self.__active = False # RpcClient status + self.__thread = threading.Thread(target=self.run) # RpcClient thread + + def __getattr__(self, name): + """ + Realize remote call function + """ + # Perform remote call task + def dorpc(*args, **kwargs): + # Generate request + req = [name, args, kwargs] + + # Pack request by serialization + reqb = self.pack(req) + + # Send request and wait for response + self.__socket_req.send(reqb) + repb = self.__socket_req.recv() + + # Unpack response by deserialization + rep = self.unpack(repb) + + # Return response if successed; Trigger exception if failed + if rep[0]: + return rep[1] + else: + raise RemoteException(rep[1]) + + return dorpc + + def start(self): + """ + Start RpcClient + """ + # Connect zmq port + self.__socket_req.connect(self.__req_address) + self.__socket_sub.connect(self.__sub_address) + + # Start RpcClient status + self.__active = True + + # Start RpcClient thread + if not self.__thread.isAlive(): + self.__thread.start() + + def stop(self): + """ + Stop RpcClient + """ + # Stop RpcClient status + self.__active = False + + # Wait for RpcClient thread to exit + if self.__thread.isAlive(): + self.__thread.join() + + def run(self): + """ + Run RpcClient function + """ + while self.__active: + # Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds) + if not self.__socket_sub.poll(1000): + continue + + # Receive data from subscribe socket + topic, datab = self.__socket_sub.recv_multipart() + + # Unpack data by deserialization + data = self.unpack(datab) + + # Process data by callable function + self.callback(topic, data) + + def callback(self, topic, data): + """ + Callable function + """ + raise NotImplementedError + + def subscribeTopic(self, topic): + """ + Subscribe data + """ + self.__socket_sub.setsockopt(zmq.SUBSCRIBE, topic) + + +class RemoteException(Exception): + """ + RPC remote exception + """ + + def __init__(self, value): + """ + Constructor + """ + self.__value = value + + def __str__(self): + """ + Output error message + """ + return self.__value