diff --git a/vnpy/rpc/test_client.py b/vnpy/rpc/test_client.py index 17f42558..7a693a52 100644 --- a/vnpy/rpc/test_client.py +++ b/vnpy/rpc/test_client.py @@ -2,13 +2,14 @@ from __future__ import print_function from __future__ import absolute_import from time import sleep -from .vnrpc import RpcClient +from vnpy.rpc import RpcClient class TestClient(RpcClient): """ Test RpcClient - """ + """ + def __init__(self, req_address, sub_address): """ Constructor diff --git a/vnpy/rpc/test_server.py b/vnpy/rpc/test_server.py index ee4ca6e4..660168fc 100644 --- a/vnpy/rpc/test_server.py +++ b/vnpy/rpc/test_server.py @@ -2,14 +2,14 @@ from __future__ import print_function from __future__ import absolute_import from time import sleep, time -from .vnrpc import RpcServer +from vnpy.rpc import RpcServer class TestServer(RpcServer): """ Test RpcServer """ - + def __init__(self, rep_address, pub_address): """ Constructor diff --git a/vnpy/rpc/vnrpc.py b/vnpy/rpc/vnrpc.py index 3debfa65..1cdd835a 100644 --- a/vnpy/rpc/vnrpc.py +++ b/vnpy/rpc/vnrpc.py @@ -6,9 +6,9 @@ import zmq from msgpack import packb, unpackb from json import dumps, loads -import cPickle -p_dumps = cPickle.dumps -p_loads = cPickle.loads +import pickle +p_dumps = pickle.dumps +p_loads = pickle.loads # Achieve Ctrl-c interrupt recv @@ -21,7 +21,7 @@ class RpcObject(object): 1) maspack: higher performance, but usually requires the installation of msgpack related tools; 2) jason: Slightly lower performance but versatility is better, most programming languages have built-in libraries; 3) cPickle: Lower performance and only can be used in Python, but it is very convenient to transfer Python objects directly. - + Therefore, it is recommended to use msgpack. Use json, if you want to communicate with some languages without providing msgpack. Use cPickle, when the data being transferred contains many custom Python objects. @@ -115,10 +115,12 @@ class RpcServer(RpcObject): # Zmq port related self.__context = zmq.Context() - self.__socket_rep = self.__context.socket(zmq.REP) # Reply socket (Request–reply pattern) + self.__socket_rep = self.__context.socket( + zmq.REP) # Reply socket (Request–reply pattern) self.__socket_rep.bind(rep_address) - self.__socket_pub = self.__context.socket(zmq.PUB) # Publish socket (Publish–subscribe pattern) + # Publish socket (Publish–subscribe pattern) + self.__socket_pub = self.__context.socket(zmq.PUB) self.__socket_pub.bind(pub_address) # Woker thread related @@ -166,11 +168,13 @@ class RpcServer(RpcObject): name, args, kwargs = req # Try to get and execute callable function object; capture exception information if it fails + name = name.decode("UTF-8") + try: func = self.__functions[name] r = func(*args, **kwargs) rep = [True, r] - except Exception as e: + except Exception as e: # noqa rep = [False, traceback.format_exc()] # Pack response by serialization @@ -184,10 +188,12 @@ class RpcServer(RpcObject): Publish data """ # Serialized data + topic = bytes(topic, "UTF-8") datab = self.pack(data) # Send data by Publish socket - self.__socket_pub.send_multipart([topic, datab]) # topci must be ascii encoding + # topci must be ascii encoding + self.__socket_pub.send_multipart([topic, datab]) def register(self, func): """ @@ -208,12 +214,15 @@ class RpcClient(RpcObject): self.__sub_address = sub_address self.__context = zmq.Context() - self.__socket_req = self.__context.socket(zmq.REQ) # Request socket (Request–reply pattern) - self.__socket_sub = self.__context.socket(zmq.SUB) # Subscribe socket (Publish–subscribe pattern) + # Request socket (Request–reply pattern) + self.__socket_req = self.__context.socket(zmq.REQ) + # Subscribe socket (Publish–subscribe pattern) + self.__socket_sub = self.__context.socket(zmq.SUB) # Woker thread relate, used to process data pushed from server self.__active = False # RpcClient status - self.__thread = threading.Thread(target=self.run) # RpcClient thread + self.__thread = threading.Thread( + target=self.run) # RpcClient thread def __getattr__(self, name): """ @@ -238,7 +247,7 @@ class RpcClient(RpcObject): if rep[0]: return rep[1] else: - raise RemoteException(rep[1]) + raise RemoteException(rep[1].decode("UTF-8")) return dorpc @@ -284,6 +293,8 @@ class RpcClient(RpcObject): data = self.unpack(datab) # Process data by callable function + topic = topic.decode("UTF-8") + self.callback(topic, data) def callback(self, topic, data): @@ -296,6 +307,7 @@ class RpcClient(RpcObject): """ Subscribe data """ + topic = bytes(topic, "UTF-8") self.__socket_sub.setsockopt(zmq.SUBSCRIBE, topic)