diff --git a/vnpy/rpc/README.md b/vnpy/rpc/README.md new file mode 100644 index 00000000..5292bdcf --- /dev/null +++ b/vnpy/rpc/README.md @@ -0,0 +1,19 @@ +# vn.rpc + +### 简介 + +提供跨进程服务调用的RPC模块,同时支持服务端向客户端的主动数据推送,用于实现vn.py框架下模块的多进程解耦。 + +### 说明 + +1. 使用zmq作为底层通讯库 + +2. 目前支持两种数据序列化方案:msgpack(默认)和json,用户在RpcObject中可以自行添加其他方案 + +3. 客户端和服务端通过REQ-REP模式实现跨进程服务调用 + +4. 客户端和服务端通过SUB-PUB模式实现主动数据推送 + +5. RpcClient的send和RpcServer的publish函数不是多线程安全的,在多线程中使用时需要用户自行加锁,否则可能导致zmq底层崩溃 + +6. 考虑到vn.rpc的主要应用场景是本机多进程或者局域网内分布式架构,网络可靠性较高,因此没有在模块中提供心跳功能,用户可以视乎自己的需求添加 diff --git a/vnpy/rpc/__init__.py b/vnpy/rpc/__init__.py new file mode 100644 index 00000000..bf66414e --- /dev/null +++ b/vnpy/rpc/__init__.py @@ -0,0 +1,3 @@ +# encoding: UTF-8 + +from .vnrpc import RpcServer, RpcClient, RemoteException \ No newline at end of file diff --git a/vnpy/rpc/testClient.py b/vnpy/rpc/testClient.py new file mode 100644 index 00000000..e613cad5 --- /dev/null +++ b/vnpy/rpc/testClient.py @@ -0,0 +1,33 @@ +# encoding: UTF-8 + +from time import sleep + +from vnrpc import RpcClient + + +######################################################################## +class TestClient(RpcClient): + """""" + + #---------------------------------------------------------------------- + def __init__(self, reqAddress, subAddress): + """Constructor""" + super(TestClient, self).__init__(reqAddress, subAddress) + + #---------------------------------------------------------------------- + def callback(self, topic, data): + """回调函数实现""" + print 'client received topic:', topic, ', data:', data + + +if __name__ == '__main__': + reqAddress = 'tcp://localhost:2014' + subAddress = 'tcp://localhost:0602' + + tc = TestClient(reqAddress, subAddress) + tc.subscribeTopic('') + tc.start() + + while 1: + print tc.add(1, 3) + sleep(2) \ No newline at end of file diff --git a/vnpy/rpc/testServer.py b/vnpy/rpc/testServer.py new file mode 100644 index 00000000..f067603b --- /dev/null +++ b/vnpy/rpc/testServer.py @@ -0,0 +1,37 @@ +# encoding: UTF-8 + +from time import sleep, time + +from vnrpc import RpcServer + + +######################################################################## +class TestServer(RpcServer): + """测试服务器""" + + #---------------------------------------------------------------------- + def __init__(self, repAddress, pubAddress): + """Constructor""" + super(TestServer, self).__init__(repAddress, pubAddress) + + self.register(self.add) + + #---------------------------------------------------------------------- + def add(self, a, b): + """测试函数""" + print 'receiving: %s, %s' % (a,b) + return a + b + + +if __name__ == '__main__': + repAddress = 'tcp://*:2014' + pubAddress = 'tcp://*:0602' + + ts = TestServer(repAddress, pubAddress) + ts.start() + + while 1: + content = 'current server time is %s' % time() + print content + ts.publish('test', content) + sleep(2) \ No newline at end of file diff --git a/vnpy/rpc/vnrpc.py b/vnpy/rpc/vnrpc.py new file mode 100644 index 00000000..c3a979c9 --- /dev/null +++ b/vnpy/rpc/vnrpc.py @@ -0,0 +1,316 @@ +# encoding: UTF-8 + +import threading +import traceback +import signal + +import zmq +from msgpack import packb, unpackb +from json import dumps, loads + +import cPickle +pDumps = cPickle.dumps +pLoads = cPickle.loads + + +# 实现Ctrl-c中断recv +signal.signal(signal.SIGINT, signal.SIG_DFL) + + +######################################################################## +class RpcObject(object): + """ + RPC对象 + + 提供对数据的序列化打包和解包接口,目前提供了json、msgpack、cPickle三种工具。 + + msgpack:性能更高,但通常需要安装msgpack相关工具; + json:性能略低但通用性更好,大部分编程语言都内置了相关的库。 + cPickle:性能一般且仅能用于Python,但是可以直接传送Python对象,非常方便。 + + 因此建议尽量使用msgpack,如果要和某些语言通讯没有提供msgpack时再使用json, + 当传送的数据包含很多自定义的Python对象时建议使用cPickle。 + + 如果希望使用其他的序列化工具也可以在这里添加。 + """ + + #---------------------------------------------------------------------- + def __init__(self): + """Constructor""" + # 默认使用msgpack作为序列化工具 + #self.useMsgpack() + self.usePickle() + + #---------------------------------------------------------------------- + def pack(self, data): + """打包""" + pass + + #---------------------------------------------------------------------- + def unpack(self, data): + """解包""" + pass + + #---------------------------------------------------------------------- + def __jsonPack(self, data): + """使用json打包""" + return dumps(data) + + #---------------------------------------------------------------------- + def __jsonUnpack(self, data): + """使用json解包""" + return loads(data) + + #---------------------------------------------------------------------- + def __msgpackPack(self, data): + """使用msgpack打包""" + return packb(data) + + #---------------------------------------------------------------------- + def __msgpackUnpack(self, data): + """使用msgpack解包""" + return unpackb(data) + + #---------------------------------------------------------------------- + def __picklePack(self, data): + """使用cPickle打包""" + return pDumps(data) + + #---------------------------------------------------------------------- + def __pickleUnpack(self, data): + """使用cPickle解包""" + return pLoads(data) + + #---------------------------------------------------------------------- + def useJson(self): + """使用json作为序列化工具""" + self.pack = self.__jsonPack + self.unpack = self.__jsonUnpack + + #---------------------------------------------------------------------- + def useMsgpack(self): + """使用msgpack作为序列化工具""" + self.pack = self.__msgpackPack + self.unpack = self.__msgpackUnpack + + #---------------------------------------------------------------------- + def usePickle(self): + """使用cPickle作为序列化工具""" + self.pack = self.__picklePack + self.unpack = self.__pickleUnpack + + +######################################################################## +class RpcServer(RpcObject): + """RPC服务器""" + + #---------------------------------------------------------------------- + def __init__(self, repAddress, pubAddress): + """Constructor""" + super(RpcServer, self).__init__() + + # 保存功能函数的字典,key是函数名,value是函数对象 + self.__functions = {} + + # zmq端口相关 + self.__context = zmq.Context() + + self.__socketREP = self.__context.socket(zmq.REP) # 请求回应socket + self.__socketREP.bind(repAddress) + + self.__socketPUB = self.__context.socket(zmq.PUB) # 数据广播socket + self.__socketPUB.bind(pubAddress) + + # 工作线程相关 + self.__active = False # 服务器的工作状态 + self.__thread = threading.Thread(target=self.run) # 服务器的工作线程 + + #---------------------------------------------------------------------- + def start(self): + """启动服务器""" + # 将服务器设为启动 + self.__active = True + + # 启动工作线程 + if not self.__thread.isAlive(): + self.__thread.start() + + #---------------------------------------------------------------------- + def stop(self): + """停止服务器""" + # 将服务器设为停止 + self.__active = False + + # 等待工作线程退出 + if self.__thread.isAlive(): + self.__thread.join() + + #---------------------------------------------------------------------- + def run(self): + """服务器运行函数""" + while self.__active: + # 使用poll来等待事件到达,等待1秒(1000毫秒) + if not self.__socketREP.poll(1000): + continue + + # 从请求响应socket收取请求数据 + reqb = self.__socketREP.recv() + + # 序列化解包 + req = self.unpack(reqb) + + # 获取函数名和参数 + name, args, kwargs = req + + # 获取引擎中对应的函数对象,并执行调用,如果有异常则捕捉后返回 + try: + func = self.__functions[name] + r = func(*args, **kwargs) + rep = [True, r] + except Exception as e: + rep = [False, traceback.format_exc()] + + # 序列化打包 + repb = self.pack(rep) + + # 通过请求响应socket返回调用结果 + self.__socketREP.send(repb) + + #---------------------------------------------------------------------- + def publish(self, topic, data): + """ + 广播推送数据 + topic:主题内容 + data:具体的数据 + """ + # 序列化数据 + datab = self.pack(data) + + # 通过广播socket发送数据 + self.__socketPUB.send_multipart([topic, datab]) + + #---------------------------------------------------------------------- + def register(self, func): + """注册函数""" + self.__functions[func.__name__] = func + + +######################################################################## +class RpcClient(RpcObject): + """RPC客户端""" + + #---------------------------------------------------------------------- + def __init__(self, reqAddress, subAddress): + """Constructor""" + super(RpcClient, self).__init__() + + # zmq端口相关 + self.__reqAddress = reqAddress + self.__subAddress = subAddress + + self.__context = zmq.Context() + self.__socketREQ = self.__context.socket(zmq.REQ) # 请求发出socket + self.__socketSUB = self.__context.socket(zmq.SUB) # 广播订阅socket + + # 工作线程相关,用于处理服务器推送的数据 + self.__active = False # 客户端的工作状态 + self.__thread = threading.Thread(target=self.run) # 客户端的工作线程 + + #---------------------------------------------------------------------- + def __getattr__(self, name): + """实现远程调用功能""" + # 执行远程调用任务 + def dorpc(*args, **kwargs): + # 生成请求 + req = [name, args, kwargs] + + # 序列化打包请求 + reqb = self.pack(req) + + # 发送请求并等待回应 + self.__socketREQ.send(reqb) + repb = self.__socketREQ.recv() + + # 序列化解包回应 + rep = self.unpack(repb) + + # 若正常则返回结果,调用失败则触发异常 + if rep[0]: + return rep[1] + else: + raise RemoteException(rep[1]) + + return dorpc + + #---------------------------------------------------------------------- + def start(self): + """启动客户端""" + # 连接端口 + self.__socketREQ.connect(self.__reqAddress) + self.__socketSUB.connect(self.__subAddress) + + # 将服务器设为启动 + self.__active = True + + # 启动工作线程 + if not self.__thread.isAlive(): + self.__thread.start() + + #---------------------------------------------------------------------- + def stop(self): + """停止客户端""" + # 将客户端设为停止 + self.__active = False + + # 等待工作线程退出 + if self.__thread.isAlive(): + self.__thread.join() + + #---------------------------------------------------------------------- + def run(self): + """客户端运行函数""" + while self.__active: + # 使用poll来等待事件到达,等待1秒(1000毫秒) + if not self.__socketSUB.poll(1000): + continue + + # 从订阅socket收取广播数据 + topic, datab = self.__socketSUB.recv_multipart() + + # 序列化解包 + data = self.unpack(datab) + + # 调用回调函数处理 + self.callback(topic, data) + + #---------------------------------------------------------------------- + def callback(self, topic, data): + """回调函数,必须由用户实现""" + raise NotImplementedError + + #---------------------------------------------------------------------- + def subscribeTopic(self, topic): + """ + 订阅特定主题的广播数据 + + 可以使用topic=''来订阅所有的主题 + """ + self.__socketSUB.setsockopt(zmq.SUBSCRIBE, topic) + + + +######################################################################## +class RemoteException(Exception): + """RPC远程异常""" + + #---------------------------------------------------------------------- + def __init__(self, value): + """Constructor""" + self.__value = value + + #---------------------------------------------------------------------- + def __str__(self): + """输出错误信息""" + return self.__value + + \ No newline at end of file