From 41dee09540fd5cc5aec9560c0816b60035b4a9bd Mon Sep 17 00:00:00 2001 From: cclauss Date: Fri, 13 Apr 2018 10:17:28 +0200 Subject: [PATCH] Modernize vnpy/rpc to get ready for Python 3 --- vnpy/rpc/testClient.py | 15 +++--- vnpy/rpc/testServer.py | 11 ++-- vnpy/rpc/vnrpc.py | 118 ++++++++++++++++++++--------------------- 3 files changed, 72 insertions(+), 72 deletions(-) diff --git a/vnpy/rpc/testClient.py b/vnpy/rpc/testClient.py index e613cad5..d4d03fd3 100644 --- a/vnpy/rpc/testClient.py +++ b/vnpy/rpc/testClient.py @@ -1,5 +1,6 @@ # encoding: UTF-8 +from __future__ import print_function from time import sleep from vnrpc import RpcClient @@ -13,21 +14,21 @@ class TestClient(RpcClient): def __init__(self, reqAddress, subAddress): """Constructor""" super(TestClient, self).__init__(reqAddress, subAddress) - + #---------------------------------------------------------------------- def callback(self, topic, data): """回调函数实现""" - print 'client received topic:', topic, ', data:', data - + print('client received topic:', topic, ', data:', data) + if __name__ == '__main__': reqAddress = 'tcp://localhost:2014' subAddress = 'tcp://localhost:0602' - + tc = TestClient(reqAddress, subAddress) tc.subscribeTopic('') tc.start() - + while 1: - print tc.add(1, 3) - sleep(2) \ No newline at end of file + print(tc.add(1, 3)) + sleep(2) diff --git a/vnpy/rpc/testServer.py b/vnpy/rpc/testServer.py index f067603b..cb5760b1 100644 --- a/vnpy/rpc/testServer.py +++ b/vnpy/rpc/testServer.py @@ -1,5 +1,6 @@ # encoding: UTF-8 +from __future__ import print_function from time import sleep, time from vnrpc import RpcServer @@ -19,19 +20,19 @@ class TestServer(RpcServer): #---------------------------------------------------------------------- def add(self, a, b): """测试函数""" - print 'receiving: %s, %s' % (a,b) + print('receiving: %s, %s' % (a,b)) return a + b if __name__ == '__main__': repAddress = 'tcp://*:2014' pubAddress = 'tcp://*:0602' - + ts = TestServer(repAddress, pubAddress) ts.start() - + while 1: content = 'current server time is %s' % time() - print content + print(content) ts.publish('test', content) - sleep(2) \ No newline at end of file + sleep(2) diff --git a/vnpy/rpc/vnrpc.py b/vnpy/rpc/vnrpc.py index cc6d7275..7b533eab 100644 --- a/vnpy/rpc/vnrpc.py +++ b/vnpy/rpc/vnrpc.py @@ -21,16 +21,16 @@ signal.signal(signal.SIGINT, signal.SIG_DFL) class RpcObject(object): """ RPC对象 - + 提供对数据的序列化打包和解包接口,目前提供了json、msgpack、cPickle三种工具。 - + msgpack:性能更高,但通常需要安装msgpack相关工具; json:性能略低但通用性更好,大部分编程语言都内置了相关的库。 cPickle:性能一般且仅能用于Python,但是可以直接传送Python对象,非常方便。 - + 因此建议尽量使用msgpack,如果要和某些语言通讯没有提供msgpack时再使用json, 当传送的数据包含很多自定义的Python对象时建议使用cPickle。 - + 如果希望使用其他的序列化工具也可以在这里添加。 """ @@ -40,59 +40,59 @@ class RpcObject(object): # 默认使用msgpack作为序列化工具 #self.useMsgpack() self.usePickle() - + #---------------------------------------------------------------------- def pack(self, data): """打包""" pass - + #---------------------------------------------------------------------- def unpack(self, data): """解包""" pass - + #---------------------------------------------------------------------- def __jsonPack(self, data): """使用json打包""" return dumps(data) - + #---------------------------------------------------------------------- def __jsonUnpack(self, data): """使用json解包""" return loads(data) - + #---------------------------------------------------------------------- def __msgpackPack(self, data): """使用msgpack打包""" return packb(data) - + #---------------------------------------------------------------------- def __msgpackUnpack(self, data): """使用msgpack解包""" return unpackb(data) - + #---------------------------------------------------------------------- def __picklePack(self, data): """使用cPickle打包""" return pDumps(data) - + #---------------------------------------------------------------------- def __pickleUnpack(self, data): """使用cPickle解包""" return pLoads(data) - + #---------------------------------------------------------------------- def useJson(self): """使用json作为序列化工具""" self.pack = self.__jsonPack self.unpack = self.__jsonUnpack - + #---------------------------------------------------------------------- def useMsgpack(self): """使用msgpack作为序列化工具""" self.pack = self.__msgpackPack self.unpack = self.__msgpackUnpack - + #---------------------------------------------------------------------- def usePickle(self): """使用cPickle作为序列化工具""" @@ -108,43 +108,43 @@ class RpcServer(RpcObject): def __init__(self, repAddress, pubAddress): """Constructor""" super(RpcServer, self).__init__() - + # 保存功能函数的字典,key是函数名,value是函数对象 - self.__functions = {} + self.__functions = {} # zmq端口相关 self.__context = zmq.Context() - + self.__socketREP = self.__context.socket(zmq.REP) # 请求回应socket self.__socketREP.bind(repAddress) - + self.__socketPUB = self.__context.socket(zmq.PUB) # 数据广播socket self.__socketPUB.bind(pubAddress) - + # 工作线程相关 self.__active = False # 服务器的工作状态 self.__thread = threading.Thread(target=self.run) # 服务器的工作线程 - + #---------------------------------------------------------------------- def start(self): """启动服务器""" # 将服务器设为启动 self.__active = True - + # 启动工作线程 if not self.__thread.isAlive(): self.__thread.start() - + #---------------------------------------------------------------------- def stop(self): """停止服务器""" # 将服务器设为停止 self.__active = False - + # 等待工作线程退出 if self.__thread.isAlive(): self.__thread.join() - + #---------------------------------------------------------------------- def run(self): """服务器运行函数""" @@ -152,16 +152,16 @@ class RpcServer(RpcObject): # 使用poll来等待事件到达,等待1秒(1000毫秒) if not self.__socketREP.poll(1000): continue - + # 从请求响应socket收取请求数据 reqb = self.__socketREP.recv() - + # 序列化解包 req = self.unpack(reqb) - + # 获取函数名和参数 name, args, kwargs = req - + # 获取引擎中对应的函数对象,并执行调用,如果有异常则捕捉后返回 try: func = self.__functions[name] @@ -169,13 +169,13 @@ class RpcServer(RpcObject): rep = [True, r] except Exception as e: rep = [False, traceback.format_exc()] - + # 序列化打包 repb = self.pack(rep) - + # 通过请求响应socket返回调用结果 self.__socketREP.send(repb) - + #---------------------------------------------------------------------- def publish(self, topic, data): """ @@ -185,10 +185,10 @@ class RpcServer(RpcObject): """ # 序列化数据 datab = self.pack(data) - + # 通过广播socket发送数据 self.__socketPUB.send_multipart([topic, datab]) - + #---------------------------------------------------------------------- def register(self, func): """注册函数""" @@ -198,24 +198,24 @@ class RpcServer(RpcObject): ######################################################################## class RpcClient(RpcObject): """RPC客户端""" - + #---------------------------------------------------------------------- def __init__(self, reqAddress, subAddress): """Constructor""" super(RpcClient, self).__init__() - + # zmq端口相关 self.__reqAddress = reqAddress self.__subAddress = subAddress - + self.__context = zmq.Context() self.__socketREQ = self.__context.socket(zmq.REQ) # 请求发出socket - self.__socketSUB = self.__context.socket(zmq.SUB) # 广播订阅socket + self.__socketSUB = self.__context.socket(zmq.SUB) # 广播订阅socket # 工作线程相关,用于处理服务器推送的数据 self.__active = False # 客户端的工作状态 self.__thread = threading.Thread(target=self.run) # 客户端的工作线程 - + #---------------------------------------------------------------------- def __getattr__(self, name): """实现远程调用功能""" @@ -223,49 +223,49 @@ class RpcClient(RpcObject): def dorpc(*args, **kwargs): # 生成请求 req = [name, args, kwargs] - + # 序列化打包请求 reqb = self.pack(req) - + # 发送请求并等待回应 self.__socketREQ.send(reqb) repb = self.__socketREQ.recv() - + # 序列化解包回应 rep = self.unpack(repb) - + # 若正常则返回结果,调用失败则触发异常 if rep[0]: return rep[1] else: raise RemoteException(rep[1]) - + return dorpc - + #---------------------------------------------------------------------- def start(self): """启动客户端""" # 连接端口 self.__socketREQ.connect(self.__reqAddress) self.__socketSUB.connect(self.__subAddress) - + # 将服务器设为启动 self.__active = True - + # 启动工作线程 if not self.__thread.isAlive(): self.__thread.start() - + #---------------------------------------------------------------------- def stop(self): """停止客户端""" # 将客户端设为停止 self.__active = False - + # 等待工作线程退出 if self.__thread.isAlive(): self.__thread.join() - + #---------------------------------------------------------------------- def run(self): """客户端运行函数""" @@ -273,32 +273,32 @@ class RpcClient(RpcObject): # 使用poll来等待事件到达,等待1秒(1000毫秒) if not self.__socketSUB.poll(1000): continue - + # 从订阅socket收取广播数据 topic, datab = self.__socketSUB.recv_multipart() - + # 序列化解包 data = self.unpack(datab) # 调用回调函数处理 self.callback(topic, data) - + #---------------------------------------------------------------------- def callback(self, topic, data): """回调函数,必须由用户实现""" raise NotImplementedError - + #---------------------------------------------------------------------- def subscribeTopic(self, topic): """ 订阅特定主题的广播数据 - + 可以使用topic=''来订阅所有的主题 - + 注意topic必须是ascii编码 """ self.__socketSUB.setsockopt(zmq.SUBSCRIBE, topic) - + ######################################################################## class RemoteException(Exception): @@ -308,10 +308,8 @@ class RemoteException(Exception): def __init__(self, value): """Constructor""" self.__value = value - + #---------------------------------------------------------------------- def __str__(self): """输出错误信息""" return self.__value - - \ No newline at end of file