Create vnrpc.py

This commit is contained in:
1122455801 2019-04-02 17:07:14 +08:00
parent b3a72c1283
commit d5585ff269

317
vnpy/rpc/vnrpc.py Normal file
View File

@ -0,0 +1,317 @@
import threading
import traceback
import signal
import zmq
from msgpack import packb, unpackb
from json import dumps, loads
import cPickle
p_dumps = cPickle.dumps
p_loads = cPickle.loads
# Achieve Ctrl-c interrupt recv
signal.signal(signal.SIGINT, signal.SIG_DFL)
class RpcObject(object):
"""
Referred to serialization of packing and unpacking, we offer 3 tools:
1) maspack: higher performance, but usually requires the installation of msgpack related tools;
2) jason: Slightly lower performance but versatility is better, most programming languages have built-in libraries;
3) cPickle: Lower performance and only can be used in Python, but it is very convenient to transfer Python objects directly.
Therefore, it is recommended to use msgpack.
Use json, if you want to communicate with some languages without providing msgpack.
Use cPickle, when the data being transferred contains many custom Python objects.
"""
def __init__(self):
"""
Constructor
Use msgpack as default serialization tool
"""
self.use_msgpack()
def pack(self, data):
""""""
pass
def unpack(self, data):
""""""
pass
def __json_pack(self, data):
"""
Pack with json
"""
return dumps(data)
def __json_unpack(self, data):
"""
Unpack with json
"""
return loads(data)
def __msgpack_pack(self, data):
"""
Pack with msgpack
"""
return packb(data)
def __msgpack_unpack(self, data):
"""
Unpack with msgpack
"""
return unpackb(data)
def __pickle_pack(self, data):
"""
Pack with cPickle
"""
return p_dumps(data)
def __pickle_unpack(self, data):
"""
Unpack with cPickle
"""
return p_loads(data)
def use_json(self):
"""
Use json as serialization tool
"""
self.pack = self.__json_pack
self.unpack = self.__json_unpack
def use_msgpack(self):
"""
Use msgpack as serialization tool
"""
self.pack = self.__msgpack_pack
self.unpack = self.__msgpack_unpack
def use_pickle(self):
"""
Use cPickle as serialization tool
"""
self.pack = self.__pickle_pack
self.unpack = self.__pickle_unpack
class RpcServer(RpcObject):
""""""
def __init__(self, rep_address, pub_address):
"""
Constructor
"""
super(RpcServer, self).__init__()
# Save functions dict: key is fuction name, value is fuction object
self.__functions = {}
# Zmq port related
self.__context = zmq.Context()
self.__socket_rep = self.__context.socket(zmq.REP) # Reply socket (Requestreply pattern)
self.__socket_rep.bind(rep_address)
self.__socket_pub = self.__context.socket(zmq.PUB) # Publish socket (Publishsubscribe pattern)
self.__socket_pub.bind(pub_address)
# Woker thread related
self.__active = False # RpcServer status
self.__thread = threading.Thread(target=self.run) # RpcServer thread
def start(self):
"""
Start RpcServer
"""
# Start RpcServer status
self.__active = True
# Start RpcServer thread
if not self.__thread.isAlive():
self.__thread.start()
def stop(self, join=False):
"""
Stop RpcServer
"""
# Stop RpcServer status
self.__active = False
# Wait for RpcServer thread to exit
if join and self.__thread.isAlive():
self.__thread.join()
def run(self):
"""
Run RpcServer functions
"""
while self.__active:
# Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds)
if not self.__socket_rep.poll(1000):
continue
# Receive request data from Reply socket
reqb = self.__socket_rep.recv()
# Unpack request by deserialization
req = self.unpack(reqb)
# Get function name and parameters
name, args, kwargs = req
# Try to get and execute callable function object; capture exception information if it fails
try:
func = self.__functions[name]
r = func(*args, **kwargs)
rep = [True, r]
except Exception as e:
rep = [False, traceback.format_exc()]
# Pack response by serialization
repb = self.pack(rep)
# send callable response by Reply socket
self.__socket_rep.send(repb)
def publish(self, topic, data):
"""
Publish data
"""
# Serialized data
datab = self.pack(data)
# Send data by Publish socket
self.__socket_pub.send_multipart([topic, datab]) # topci must be ascii encoding
def register(self, func):
"""
Register function
"""
self.__functions[func.__name__] = func
class RpcClient(RpcObject):
""""""
def __init__(self, req_address, sub_address):
"""Constructor"""
super(RpcClient, self).__init__()
# zmq port related
self.__req_address = req_address
self.__sub_address = sub_address
self.__context = zmq.Context()
self.__socket_req = self.__context.socket(zmq.REQ) # Request socket (Requestreply pattern)
self.__socket_sub = self.__context.socket(zmq.SUB) # Subscribe socket (Publishsubscribe pattern)
# Woker thread relate, used to process data pushed from server
self.__active = False # RpcClient status
self.__thread = threading.Thread(target=self.run) # RpcClient thread
def __getattr__(self, name):
"""
Realize remote call function
"""
# Perform remote call task
def dorpc(*args, **kwargs):
# Generate request
req = [name, args, kwargs]
# Pack request by serialization
reqb = self.pack(req)
# Send request and wait for response
self.__socket_req.send(reqb)
repb = self.__socket_req.recv()
# Unpack response by deserialization
rep = self.unpack(repb)
# Return response if successed; Trigger exception if failed
if rep[0]:
return rep[1]
else:
raise RemoteException(rep[1])
return dorpc
def start(self):
"""
Start RpcClient
"""
# Connect zmq port
self.__socket_req.connect(self.__req_address)
self.__socket_sub.connect(self.__sub_address)
# Start RpcClient status
self.__active = True
# Start RpcClient thread
if not self.__thread.isAlive():
self.__thread.start()
def stop(self):
"""
Stop RpcClient
"""
# Stop RpcClient status
self.__active = False
# Wait for RpcClient thread to exit
if self.__thread.isAlive():
self.__thread.join()
def run(self):
"""
Run RpcClient function
"""
while self.__active:
# Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds)
if not self.__socket_sub.poll(1000):
continue
# Receive data from subscribe socket
topic, datab = self.__socket_sub.recv_multipart()
# Unpack data by deserialization
data = self.unpack(datab)
# Process data by callable function
self.callback(topic, data)
def callback(self, topic, data):
"""
Callable function
"""
raise NotImplementedError
def subscribeTopic(self, topic):
"""
Subscribe data
"""
self.__socket_sub.setsockopt(zmq.SUBSCRIBE, topic)
class RemoteException(Exception):
"""
RPC remote exception
"""
def __init__(self, value):
"""
Constructor
"""
self.__value = value
def __str__(self):
"""
Output error message
"""
return self.__value