[Mod]add unicode to bytes transfer in rpc

This commit is contained in:
vn.py 2019-04-04 15:57:49 +08:00
parent 0ebe533d4f
commit 1429856d5c
3 changed files with 29 additions and 16 deletions

View File

@ -2,13 +2,14 @@ from __future__ import print_function
from __future__ import absolute_import from __future__ import absolute_import
from time import sleep from time import sleep
from .vnrpc import RpcClient from vnpy.rpc import RpcClient
class TestClient(RpcClient): class TestClient(RpcClient):
""" """
Test RpcClient Test RpcClient
""" """
def __init__(self, req_address, sub_address): def __init__(self, req_address, sub_address):
""" """
Constructor Constructor

View File

@ -2,14 +2,14 @@ from __future__ import print_function
from __future__ import absolute_import from __future__ import absolute_import
from time import sleep, time from time import sleep, time
from .vnrpc import RpcServer from vnpy.rpc import RpcServer
class TestServer(RpcServer): class TestServer(RpcServer):
""" """
Test RpcServer Test RpcServer
""" """
def __init__(self, rep_address, pub_address): def __init__(self, rep_address, pub_address):
""" """
Constructor Constructor

View File

@ -6,9 +6,9 @@ import zmq
from msgpack import packb, unpackb from msgpack import packb, unpackb
from json import dumps, loads from json import dumps, loads
import cPickle import pickle
p_dumps = cPickle.dumps p_dumps = pickle.dumps
p_loads = cPickle.loads p_loads = pickle.loads
# Achieve Ctrl-c interrupt recv # Achieve Ctrl-c interrupt recv
@ -21,7 +21,7 @@ class RpcObject(object):
1) maspack: higher performance, but usually requires the installation of msgpack related tools; 1) maspack: higher performance, but usually requires the installation of msgpack related tools;
2) jason: Slightly lower performance but versatility is better, most programming languages have built-in libraries; 2) jason: Slightly lower performance but versatility is better, most programming languages have built-in libraries;
3) cPickle: Lower performance and only can be used in Python, but it is very convenient to transfer Python objects directly. 3) cPickle: Lower performance and only can be used in Python, but it is very convenient to transfer Python objects directly.
Therefore, it is recommended to use msgpack. Therefore, it is recommended to use msgpack.
Use json, if you want to communicate with some languages without providing msgpack. Use json, if you want to communicate with some languages without providing msgpack.
Use cPickle, when the data being transferred contains many custom Python objects. Use cPickle, when the data being transferred contains many custom Python objects.
@ -115,10 +115,12 @@ class RpcServer(RpcObject):
# Zmq port related # Zmq port related
self.__context = zmq.Context() self.__context = zmq.Context()
self.__socket_rep = self.__context.socket(zmq.REP) # Reply socket (Requestreply pattern) self.__socket_rep = self.__context.socket(
zmq.REP) # Reply socket (Requestreply pattern)
self.__socket_rep.bind(rep_address) self.__socket_rep.bind(rep_address)
self.__socket_pub = self.__context.socket(zmq.PUB) # Publish socket (Publishsubscribe pattern) # Publish socket (Publishsubscribe pattern)
self.__socket_pub = self.__context.socket(zmq.PUB)
self.__socket_pub.bind(pub_address) self.__socket_pub.bind(pub_address)
# Woker thread related # Woker thread related
@ -166,11 +168,13 @@ class RpcServer(RpcObject):
name, args, kwargs = req name, args, kwargs = req
# Try to get and execute callable function object; capture exception information if it fails # Try to get and execute callable function object; capture exception information if it fails
name = name.decode("UTF-8")
try: try:
func = self.__functions[name] func = self.__functions[name]
r = func(*args, **kwargs) r = func(*args, **kwargs)
rep = [True, r] rep = [True, r]
except Exception as e: except Exception as e: # noqa
rep = [False, traceback.format_exc()] rep = [False, traceback.format_exc()]
# Pack response by serialization # Pack response by serialization
@ -184,10 +188,12 @@ class RpcServer(RpcObject):
Publish data Publish data
""" """
# Serialized data # Serialized data
topic = bytes(topic, "UTF-8")
datab = self.pack(data) datab = self.pack(data)
# Send data by Publish socket # Send data by Publish socket
self.__socket_pub.send_multipart([topic, datab]) # topci must be ascii encoding # topci must be ascii encoding
self.__socket_pub.send_multipart([topic, datab])
def register(self, func): def register(self, func):
""" """
@ -208,12 +214,15 @@ class RpcClient(RpcObject):
self.__sub_address = sub_address self.__sub_address = sub_address
self.__context = zmq.Context() self.__context = zmq.Context()
self.__socket_req = self.__context.socket(zmq.REQ) # Request socket (Requestreply pattern) # Request socket (Requestreply pattern)
self.__socket_sub = self.__context.socket(zmq.SUB) # Subscribe socket (Publishsubscribe pattern) self.__socket_req = self.__context.socket(zmq.REQ)
# Subscribe socket (Publishsubscribe pattern)
self.__socket_sub = self.__context.socket(zmq.SUB)
# Woker thread relate, used to process data pushed from server # Woker thread relate, used to process data pushed from server
self.__active = False # RpcClient status self.__active = False # RpcClient status
self.__thread = threading.Thread(target=self.run) # RpcClient thread self.__thread = threading.Thread(
target=self.run) # RpcClient thread
def __getattr__(self, name): def __getattr__(self, name):
""" """
@ -238,7 +247,7 @@ class RpcClient(RpcObject):
if rep[0]: if rep[0]:
return rep[1] return rep[1]
else: else:
raise RemoteException(rep[1]) raise RemoteException(rep[1].decode("UTF-8"))
return dorpc return dorpc
@ -284,6 +293,8 @@ class RpcClient(RpcObject):
data = self.unpack(datab) data = self.unpack(datab)
# Process data by callable function # Process data by callable function
topic = topic.decode("UTF-8")
self.callback(topic, data) self.callback(topic, data)
def callback(self, topic, data): def callback(self, topic, data):
@ -296,6 +307,7 @@ class RpcClient(RpcObject):
""" """
Subscribe data Subscribe data
""" """
topic = bytes(topic, "UTF-8")
self.__socket_sub.setsockopt(zmq.SUBSCRIBE, topic) self.__socket_sub.setsockopt(zmq.SUBSCRIBE, topic)