增加vn.rpc模块功能:

1. 提供RpcObject用于兼容不同的序列化工具
2. 提供广播数据时的主题筛选功能,实现数据筛选的工作由python层下放到zmq c++底层,提高效率
This commit is contained in:
chenxy123 2016-10-03 12:15:33 +08:00
parent d7d6ecf840
commit f538e6aa12
3 changed files with 112 additions and 26 deletions

View File

@ -25,7 +25,7 @@ if __name__ == '__main__':
subAddress = 'tcp://localhost:0602' subAddress = 'tcp://localhost:0602'
tc = TestClient(reqAddress, subAddress) tc = TestClient(reqAddress, subAddress)
tc.subscribe('')
tc.start() tc.start()
while 1: while 1:

View File

@ -33,5 +33,5 @@ if __name__ == '__main__':
while 1: while 1:
content = 'current server time is %s' % time() content = 'current server time is %s' % time()
print content print content
ts.publish(content) ts.publish('test', content)
sleep(2) sleep(2)

View File

@ -5,15 +5,82 @@ import traceback
import zmq import zmq
from msgpack import packb, unpackb from msgpack import packb, unpackb
from json import dumps, loads
######################################################################## ########################################################################
class RpcServer(object): class RpcObject(object):
"""
RPC对象
提供对数据的序列化打包和解包接口目前提供了json和msgpack两种工具
msgpack性能更高但通常需要安装msgpack相关工具
json性能略低但通用性更好大部分编程语言都内置了相关的库
因此建议尽量使用msgpack如果要和某些语言通讯没有提供msgpack时再使用json
如果希望使用其他的序列化工具也可以在这里添加
"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
# 默认使用msgpack作为序列化工具
self.useMsgpack()
#----------------------------------------------------------------------
def pack(self, data):
"""打包"""
pass
#----------------------------------------------------------------------
def unpack(self, data):
"""解包"""
pass
#----------------------------------------------------------------------
def __jsonPack(self, data):
"""使用json打包"""
return dumps(data)
#----------------------------------------------------------------------
def __jsonUnpack(self, data):
"""使用json解包"""
return loads(data)
#----------------------------------------------------------------------
def __msgpackPack(self, data):
"""使用msgpack打包"""
return packb(data)
#----------------------------------------------------------------------
def __msgpackUnpack(self, data):
"""使用msgpack解包"""
return unpackb(data)
#----------------------------------------------------------------------
def useJson(self):
"""使用json作为序列化工具"""
self.pack = self.__jsonPack
self.unpack = self.__jsonUnpack
#----------------------------------------------------------------------
def useMsgpack(self):
"""使用msgpack作为序列化工具"""
self.pack = self.__msgpackPack
self.unpack = self.__msgpackUnpack
########################################################################
class RpcServer(RpcObject):
"""RPC服务器""" """RPC服务器"""
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __init__(self, repAddress, pubAddress): def __init__(self, repAddress, pubAddress):
"""Constructor""" """Constructor"""
super(RpcServer, self).__init__()
# 保存功能函数的字典key是函数名value是函数对象 # 保存功能函数的字典key是函数名value是函数对象
self.__functions = {} self.__functions = {}
@ -55,8 +122,8 @@ class RpcServer(object):
# 从请求响应socket收取请求数据 # 从请求响应socket收取请求数据
reqb = self.__socketREP.recv() reqb = self.__socketREP.recv()
# 用msgpack解包 # 序列化解包
req = unpackb(reqb) req = self.unpack(reqb)
# 获取函数名和参数 # 获取函数名和参数
name, args, kwargs = req name, args, kwargs = req
@ -69,20 +136,24 @@ class RpcServer(object):
except Exception as e: except Exception as e:
rep = [False, traceback.format_exc()] rep = [False, traceback.format_exc()]
# 用msgpack打包 # 序列化打包
repb = packb(rep) repb = self.pack(rep)
# 通过请求响应socket返回调用结果 # 通过请求响应socket返回调用结果
self.__socketREP.send(repb) self.__socketREP.send(repb)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def publish(self, data): def publish(self, topic, data):
"""广播推送数据""" """
# 使用msgpack序列化数据 广播推送数据
datab = packb(data) topic主题内容
data具体的数据
"""
# 序列化数据
datab = self.pack(data)
# 通过广播socket发送数据 # 通过广播socket发送数据
self.__socketPUB.send(datab) self.__socketPUB.send_multipart([topic, datab])
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def register(self, func): def register(self, func):
@ -91,12 +162,14 @@ class RpcServer(object):
######################################################################## ########################################################################
class RpcClient(object): class RpcClient(RpcObject):
"""RPC客户端""" """RPC客户端"""
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __init__(self, reqAddress, subAddress): def __init__(self, reqAddress, subAddress):
"""Constructor""" """Constructor"""
super(RpcClient, self).__init__()
# zmq端口相关 # zmq端口相关
self.__reqAddress = reqAddress self.__reqAddress = reqAddress
self.__subAddress = subAddress self.__subAddress = subAddress
@ -117,21 +190,21 @@ class RpcClient(object):
# 生成请求 # 生成请求
req = [name, args, kwargs] req = [name, args, kwargs]
# 用msgpack打包请求 # 序列化打包请求
reqb = packb(req) reqb = self.pack(req)
# 发送请求并等待回应 # 发送请求并等待回应
self.__socketREQ.send(reqb) self.__socketREQ.send(reqb)
repb = self.__socketREQ.recv() repb = self.__socketREQ.recv()
# 用msgpack解包回应 # 序列化解包回应
rep = unpackb(repb) rep = self.unpack(repb)
# 若正常则返回结果,调用失败则触发异常 # 若正常则返回结果,调用失败则触发异常
if rep[0]: if rep[0]:
return rep[1] return rep[1]
else: else:
raise RemoteException, rep[1] raise RemoteException(rep[1])
return dorpc return dorpc
@ -140,9 +213,7 @@ class RpcClient(object):
"""启动客户端""" """启动客户端"""
# 连接端口 # 连接端口
self.__socketREQ.connect(self.__reqAddress) self.__socketREQ.connect(self.__reqAddress)
self.__socketSUB.connect(self.__subAddress) self.__socketSUB.connect(self.__subAddress)
self.__socketSUB.setsockopt(zmq.SUBSCRIBE, '') # 订阅全部数据,不做过滤
# 将服务器设为启动 # 将服务器设为启动
self.__active = True self.__active = True
@ -164,10 +235,10 @@ class RpcClient(object):
"""连续运行函数""" """连续运行函数"""
while self.__active: while self.__active:
# 从订阅socket收取广播数据 # 从订阅socket收取广播数据
datab = self.__socketSUB.recv() topic, datab = self.__socketSUB.recv_multipart()
# 用msgpack解包 # 序列化解包
data = unpackb(datab) data = self.unpack(datab)
# 调用回调函数处理 # 调用回调函数处理
self.callback(data) self.callback(data)
@ -177,14 +248,29 @@ class RpcClient(object):
"""回调函数,必须由用户实现""" """回调函数,必须由用户实现"""
raise NotImplementedError raise NotImplementedError
#----------------------------------------------------------------------
def subscribe(self, topic):
"""
订阅特定主题的广播数据
可以使用topic=''来订阅所有的主题
"""
self.__socketSUB.setsockopt(zmq.SUBSCRIBE, topic)
######################################################################## ########################################################################
class RemoteException(Exception): class RemoteException(Exception):
"""RPC远程异常""" """RPC远程异常"""
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __init__(self): def __init__(self, value):
"""Constructor""" """Constructor"""
pass self.__value = value
#----------------------------------------------------------------------
def __str__(self):
"""输出错误信息"""
return self.__value