Modernize vnpy/rpc to get ready for Python 3

This commit is contained in:
cclauss 2018-04-13 10:17:28 +02:00
parent be395abd82
commit 41dee09540
3 changed files with 72 additions and 72 deletions

View File

@ -1,5 +1,6 @@
# encoding: UTF-8 # encoding: UTF-8
from __future__ import print_function
from time import sleep from time import sleep
from vnrpc import RpcClient from vnrpc import RpcClient
@ -13,21 +14,21 @@ class TestClient(RpcClient):
def __init__(self, reqAddress, subAddress): def __init__(self, reqAddress, subAddress):
"""Constructor""" """Constructor"""
super(TestClient, self).__init__(reqAddress, subAddress) super(TestClient, self).__init__(reqAddress, subAddress)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def callback(self, topic, data): def callback(self, topic, data):
"""回调函数实现""" """回调函数实现"""
print 'client received topic:', topic, ', data:', data print('client received topic:', topic, ', data:', data)
if __name__ == '__main__': if __name__ == '__main__':
reqAddress = 'tcp://localhost:2014' reqAddress = 'tcp://localhost:2014'
subAddress = 'tcp://localhost:0602' subAddress = 'tcp://localhost:0602'
tc = TestClient(reqAddress, subAddress) tc = TestClient(reqAddress, subAddress)
tc.subscribeTopic('') tc.subscribeTopic('')
tc.start() tc.start()
while 1: while 1:
print tc.add(1, 3) print(tc.add(1, 3))
sleep(2) sleep(2)

View File

@ -1,5 +1,6 @@
# encoding: UTF-8 # encoding: UTF-8
from __future__ import print_function
from time import sleep, time from time import sleep, time
from vnrpc import RpcServer from vnrpc import RpcServer
@ -19,19 +20,19 @@ class TestServer(RpcServer):
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def add(self, a, b): def add(self, a, b):
"""测试函数""" """测试函数"""
print 'receiving: %s, %s' % (a,b) print('receiving: %s, %s' % (a,b))
return a + b return a + b
if __name__ == '__main__': if __name__ == '__main__':
repAddress = 'tcp://*:2014' repAddress = 'tcp://*:2014'
pubAddress = 'tcp://*:0602' pubAddress = 'tcp://*:0602'
ts = TestServer(repAddress, pubAddress) ts = TestServer(repAddress, pubAddress)
ts.start() ts.start()
while 1: while 1:
content = 'current server time is %s' % time() content = 'current server time is %s' % time()
print content print(content)
ts.publish('test', content) ts.publish('test', content)
sleep(2) sleep(2)

View File

@ -21,16 +21,16 @@ signal.signal(signal.SIGINT, signal.SIG_DFL)
class RpcObject(object): class RpcObject(object):
""" """
RPC对象 RPC对象
提供对数据的序列化打包和解包接口目前提供了jsonmsgpackcPickle三种工具 提供对数据的序列化打包和解包接口目前提供了jsonmsgpackcPickle三种工具
msgpack性能更高但通常需要安装msgpack相关工具 msgpack性能更高但通常需要安装msgpack相关工具
json性能略低但通用性更好大部分编程语言都内置了相关的库 json性能略低但通用性更好大部分编程语言都内置了相关的库
cPickle性能一般且仅能用于Python但是可以直接传送Python对象非常方便 cPickle性能一般且仅能用于Python但是可以直接传送Python对象非常方便
因此建议尽量使用msgpack如果要和某些语言通讯没有提供msgpack时再使用json 因此建议尽量使用msgpack如果要和某些语言通讯没有提供msgpack时再使用json
当传送的数据包含很多自定义的Python对象时建议使用cPickle 当传送的数据包含很多自定义的Python对象时建议使用cPickle
如果希望使用其他的序列化工具也可以在这里添加 如果希望使用其他的序列化工具也可以在这里添加
""" """
@ -40,59 +40,59 @@ class RpcObject(object):
# 默认使用msgpack作为序列化工具 # 默认使用msgpack作为序列化工具
#self.useMsgpack() #self.useMsgpack()
self.usePickle() self.usePickle()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def pack(self, data): def pack(self, data):
"""打包""" """打包"""
pass pass
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def unpack(self, data): def unpack(self, data):
"""解包""" """解包"""
pass pass
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __jsonPack(self, data): def __jsonPack(self, data):
"""使用json打包""" """使用json打包"""
return dumps(data) return dumps(data)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __jsonUnpack(self, data): def __jsonUnpack(self, data):
"""使用json解包""" """使用json解包"""
return loads(data) return loads(data)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __msgpackPack(self, data): def __msgpackPack(self, data):
"""使用msgpack打包""" """使用msgpack打包"""
return packb(data) return packb(data)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __msgpackUnpack(self, data): def __msgpackUnpack(self, data):
"""使用msgpack解包""" """使用msgpack解包"""
return unpackb(data) return unpackb(data)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __picklePack(self, data): def __picklePack(self, data):
"""使用cPickle打包""" """使用cPickle打包"""
return pDumps(data) return pDumps(data)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __pickleUnpack(self, data): def __pickleUnpack(self, data):
"""使用cPickle解包""" """使用cPickle解包"""
return pLoads(data) return pLoads(data)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def useJson(self): def useJson(self):
"""使用json作为序列化工具""" """使用json作为序列化工具"""
self.pack = self.__jsonPack self.pack = self.__jsonPack
self.unpack = self.__jsonUnpack self.unpack = self.__jsonUnpack
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def useMsgpack(self): def useMsgpack(self):
"""使用msgpack作为序列化工具""" """使用msgpack作为序列化工具"""
self.pack = self.__msgpackPack self.pack = self.__msgpackPack
self.unpack = self.__msgpackUnpack self.unpack = self.__msgpackUnpack
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def usePickle(self): def usePickle(self):
"""使用cPickle作为序列化工具""" """使用cPickle作为序列化工具"""
@ -108,43 +108,43 @@ class RpcServer(RpcObject):
def __init__(self, repAddress, pubAddress): def __init__(self, repAddress, pubAddress):
"""Constructor""" """Constructor"""
super(RpcServer, self).__init__() super(RpcServer, self).__init__()
# 保存功能函数的字典key是函数名value是函数对象 # 保存功能函数的字典key是函数名value是函数对象
self.__functions = {} self.__functions = {}
# zmq端口相关 # zmq端口相关
self.__context = zmq.Context() self.__context = zmq.Context()
self.__socketREP = self.__context.socket(zmq.REP) # 请求回应socket self.__socketREP = self.__context.socket(zmq.REP) # 请求回应socket
self.__socketREP.bind(repAddress) self.__socketREP.bind(repAddress)
self.__socketPUB = self.__context.socket(zmq.PUB) # 数据广播socket self.__socketPUB = self.__context.socket(zmq.PUB) # 数据广播socket
self.__socketPUB.bind(pubAddress) self.__socketPUB.bind(pubAddress)
# 工作线程相关 # 工作线程相关
self.__active = False # 服务器的工作状态 self.__active = False # 服务器的工作状态
self.__thread = threading.Thread(target=self.run) # 服务器的工作线程 self.__thread = threading.Thread(target=self.run) # 服务器的工作线程
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def start(self): def start(self):
"""启动服务器""" """启动服务器"""
# 将服务器设为启动 # 将服务器设为启动
self.__active = True self.__active = True
# 启动工作线程 # 启动工作线程
if not self.__thread.isAlive(): if not self.__thread.isAlive():
self.__thread.start() self.__thread.start()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def stop(self): def stop(self):
"""停止服务器""" """停止服务器"""
# 将服务器设为停止 # 将服务器设为停止
self.__active = False self.__active = False
# 等待工作线程退出 # 等待工作线程退出
if self.__thread.isAlive(): if self.__thread.isAlive():
self.__thread.join() self.__thread.join()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def run(self): def run(self):
"""服务器运行函数""" """服务器运行函数"""
@ -152,16 +152,16 @@ class RpcServer(RpcObject):
# 使用poll来等待事件到达等待1秒1000毫秒 # 使用poll来等待事件到达等待1秒1000毫秒
if not self.__socketREP.poll(1000): if not self.__socketREP.poll(1000):
continue continue
# 从请求响应socket收取请求数据 # 从请求响应socket收取请求数据
reqb = self.__socketREP.recv() reqb = self.__socketREP.recv()
# 序列化解包 # 序列化解包
req = self.unpack(reqb) req = self.unpack(reqb)
# 获取函数名和参数 # 获取函数名和参数
name, args, kwargs = req name, args, kwargs = req
# 获取引擎中对应的函数对象,并执行调用,如果有异常则捕捉后返回 # 获取引擎中对应的函数对象,并执行调用,如果有异常则捕捉后返回
try: try:
func = self.__functions[name] func = self.__functions[name]
@ -169,13 +169,13 @@ class RpcServer(RpcObject):
rep = [True, r] rep = [True, r]
except Exception as e: except Exception as e:
rep = [False, traceback.format_exc()] rep = [False, traceback.format_exc()]
# 序列化打包 # 序列化打包
repb = self.pack(rep) repb = self.pack(rep)
# 通过请求响应socket返回调用结果 # 通过请求响应socket返回调用结果
self.__socketREP.send(repb) self.__socketREP.send(repb)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def publish(self, topic, data): def publish(self, topic, data):
""" """
@ -185,10 +185,10 @@ class RpcServer(RpcObject):
""" """
# 序列化数据 # 序列化数据
datab = self.pack(data) datab = self.pack(data)
# 通过广播socket发送数据 # 通过广播socket发送数据
self.__socketPUB.send_multipart([topic, datab]) self.__socketPUB.send_multipart([topic, datab])
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def register(self, func): def register(self, func):
"""注册函数""" """注册函数"""
@ -198,24 +198,24 @@ class RpcServer(RpcObject):
######################################################################## ########################################################################
class RpcClient(RpcObject): class RpcClient(RpcObject):
"""RPC客户端""" """RPC客户端"""
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __init__(self, reqAddress, subAddress): def __init__(self, reqAddress, subAddress):
"""Constructor""" """Constructor"""
super(RpcClient, self).__init__() super(RpcClient, self).__init__()
# zmq端口相关 # zmq端口相关
self.__reqAddress = reqAddress self.__reqAddress = reqAddress
self.__subAddress = subAddress self.__subAddress = subAddress
self.__context = zmq.Context() self.__context = zmq.Context()
self.__socketREQ = self.__context.socket(zmq.REQ) # 请求发出socket self.__socketREQ = self.__context.socket(zmq.REQ) # 请求发出socket
self.__socketSUB = self.__context.socket(zmq.SUB) # 广播订阅socket self.__socketSUB = self.__context.socket(zmq.SUB) # 广播订阅socket
# 工作线程相关,用于处理服务器推送的数据 # 工作线程相关,用于处理服务器推送的数据
self.__active = False # 客户端的工作状态 self.__active = False # 客户端的工作状态
self.__thread = threading.Thread(target=self.run) # 客户端的工作线程 self.__thread = threading.Thread(target=self.run) # 客户端的工作线程
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __getattr__(self, name): def __getattr__(self, name):
"""实现远程调用功能""" """实现远程调用功能"""
@ -223,49 +223,49 @@ class RpcClient(RpcObject):
def dorpc(*args, **kwargs): def dorpc(*args, **kwargs):
# 生成请求 # 生成请求
req = [name, args, kwargs] req = [name, args, kwargs]
# 序列化打包请求 # 序列化打包请求
reqb = self.pack(req) reqb = self.pack(req)
# 发送请求并等待回应 # 发送请求并等待回应
self.__socketREQ.send(reqb) self.__socketREQ.send(reqb)
repb = self.__socketREQ.recv() repb = self.__socketREQ.recv()
# 序列化解包回应 # 序列化解包回应
rep = self.unpack(repb) rep = self.unpack(repb)
# 若正常则返回结果,调用失败则触发异常 # 若正常则返回结果,调用失败则触发异常
if rep[0]: if rep[0]:
return rep[1] return rep[1]
else: else:
raise RemoteException(rep[1]) raise RemoteException(rep[1])
return dorpc return dorpc
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def start(self): def start(self):
"""启动客户端""" """启动客户端"""
# 连接端口 # 连接端口
self.__socketREQ.connect(self.__reqAddress) self.__socketREQ.connect(self.__reqAddress)
self.__socketSUB.connect(self.__subAddress) self.__socketSUB.connect(self.__subAddress)
# 将服务器设为启动 # 将服务器设为启动
self.__active = True self.__active = True
# 启动工作线程 # 启动工作线程
if not self.__thread.isAlive(): if not self.__thread.isAlive():
self.__thread.start() self.__thread.start()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def stop(self): def stop(self):
"""停止客户端""" """停止客户端"""
# 将客户端设为停止 # 将客户端设为停止
self.__active = False self.__active = False
# 等待工作线程退出 # 等待工作线程退出
if self.__thread.isAlive(): if self.__thread.isAlive():
self.__thread.join() self.__thread.join()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def run(self): def run(self):
"""客户端运行函数""" """客户端运行函数"""
@ -273,32 +273,32 @@ class RpcClient(RpcObject):
# 使用poll来等待事件到达等待1秒1000毫秒 # 使用poll来等待事件到达等待1秒1000毫秒
if not self.__socketSUB.poll(1000): if not self.__socketSUB.poll(1000):
continue continue
# 从订阅socket收取广播数据 # 从订阅socket收取广播数据
topic, datab = self.__socketSUB.recv_multipart() topic, datab = self.__socketSUB.recv_multipart()
# 序列化解包 # 序列化解包
data = self.unpack(datab) data = self.unpack(datab)
# 调用回调函数处理 # 调用回调函数处理
self.callback(topic, data) self.callback(topic, data)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def callback(self, topic, data): def callback(self, topic, data):
"""回调函数,必须由用户实现""" """回调函数,必须由用户实现"""
raise NotImplementedError raise NotImplementedError
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def subscribeTopic(self, topic): def subscribeTopic(self, topic):
""" """
订阅特定主题的广播数据 订阅特定主题的广播数据
可以使用topic=''来订阅所有的主题 可以使用topic=''来订阅所有的主题
注意topic必须是ascii编码 注意topic必须是ascii编码
""" """
self.__socketSUB.setsockopt(zmq.SUBSCRIBE, topic) self.__socketSUB.setsockopt(zmq.SUBSCRIBE, topic)
######################################################################## ########################################################################
class RemoteException(Exception): class RemoteException(Exception):
@ -308,10 +308,8 @@ class RemoteException(Exception):
def __init__(self, value): def __init__(self, value):
"""Constructor""" """Constructor"""
self.__value = value self.__value = value
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __str__(self): def __str__(self):
"""输出错误信息""" """输出错误信息"""
return self.__value return self.__value