diff --git a/vn.rpc/testClient.py b/vn.rpc/testClient.py index 8e04c810..643cb916 100644 --- a/vn.rpc/testClient.py +++ b/vn.rpc/testClient.py @@ -25,7 +25,7 @@ if __name__ == '__main__': subAddress = 'tcp://localhost:0602' tc = TestClient(reqAddress, subAddress) - + tc.subscribe('') tc.start() while 1: diff --git a/vn.rpc/testServer.py b/vn.rpc/testServer.py index 09146189..f067603b 100644 --- a/vn.rpc/testServer.py +++ b/vn.rpc/testServer.py @@ -33,5 +33,5 @@ if __name__ == '__main__': while 1: content = 'current server time is %s' % time() print content - ts.publish(content) + ts.publish('test', content) sleep(2) \ No newline at end of file diff --git a/vn.rpc/vnrpc.py b/vn.rpc/vnrpc.py index ed89d53b..86e97ef2 100644 --- a/vn.rpc/vnrpc.py +++ b/vn.rpc/vnrpc.py @@ -5,15 +5,82 @@ import traceback import zmq from msgpack import packb, unpackb +from json import dumps, loads ######################################################################## -class RpcServer(object): +class RpcObject(object): + """ + RPC对象 + + 提供对数据的序列化打包和解包接口,目前提供了json和msgpack两种工具。 + + msgpack:性能更高,但通常需要安装msgpack相关工具; + json:性能略低但通用性更好,大部分编程语言都内置了相关的库。 + + 因此建议尽量使用msgpack,如果要和某些语言通讯没有提供msgpack时再使用json。 + + 如果希望使用其他的序列化工具也可以在这里添加。 + """ + + #---------------------------------------------------------------------- + def __init__(self): + """Constructor""" + # 默认使用msgpack作为序列化工具 + self.useMsgpack() + + #---------------------------------------------------------------------- + def pack(self, data): + """打包""" + pass + + #---------------------------------------------------------------------- + def unpack(self, data): + """解包""" + pass + + #---------------------------------------------------------------------- + def __jsonPack(self, data): + """使用json打包""" + return dumps(data) + + #---------------------------------------------------------------------- + def __jsonUnpack(self, data): + """使用json解包""" + return loads(data) + + #---------------------------------------------------------------------- + def __msgpackPack(self, data): + """使用msgpack打包""" + return packb(data) + + #---------------------------------------------------------------------- + def __msgpackUnpack(self, data): + """使用msgpack解包""" + return unpackb(data) + + #---------------------------------------------------------------------- + def useJson(self): + """使用json作为序列化工具""" + self.pack = self.__jsonPack + self.unpack = self.__jsonUnpack + + #---------------------------------------------------------------------- + def useMsgpack(self): + """使用msgpack作为序列化工具""" + self.pack = self.__msgpackPack + self.unpack = self.__msgpackUnpack + + +######################################################################## +class RpcServer(RpcObject): """RPC服务器""" #---------------------------------------------------------------------- def __init__(self, repAddress, pubAddress): """Constructor""" + super(RpcServer, self).__init__() + # 保存功能函数的字典,key是函数名,value是函数对象 self.__functions = {} @@ -55,8 +122,8 @@ class RpcServer(object): # 从请求响应socket收取请求数据 reqb = self.__socketREP.recv() - # 用msgpack解包 - req = unpackb(reqb) + # 序列化解包 + req = self.unpack(reqb) # 获取函数名和参数 name, args, kwargs = req @@ -69,20 +136,24 @@ class RpcServer(object): except Exception as e: rep = [False, traceback.format_exc()] - # 用msgpack打包 - repb = packb(rep) + # 序列化打包 + repb = self.pack(rep) # 通过请求响应socket返回调用结果 self.__socketREP.send(repb) #---------------------------------------------------------------------- - def publish(self, data): - """广播推送数据""" - # 使用msgpack序列化数据 - datab = packb(data) + def publish(self, topic, data): + """ + 广播推送数据 + topic:主题内容 + data:具体的数据 + """ + # 序列化数据 + datab = self.pack(data) # 通过广播socket发送数据 - self.__socketPUB.send(datab) + self.__socketPUB.send_multipart([topic, datab]) #---------------------------------------------------------------------- def register(self, func): @@ -91,12 +162,14 @@ class RpcServer(object): ######################################################################## -class RpcClient(object): +class RpcClient(RpcObject): """RPC客户端""" - + #---------------------------------------------------------------------- def __init__(self, reqAddress, subAddress): """Constructor""" + super(RpcClient, self).__init__() + # zmq端口相关 self.__reqAddress = reqAddress self.__subAddress = subAddress @@ -117,21 +190,21 @@ class RpcClient(object): # 生成请求 req = [name, args, kwargs] - # 用msgpack打包请求 - reqb = packb(req) + # 序列化打包请求 + reqb = self.pack(req) # 发送请求并等待回应 self.__socketREQ.send(reqb) repb = self.__socketREQ.recv() - # 用msgpack解包回应 - rep = unpackb(repb) + # 序列化解包回应 + rep = self.unpack(repb) # 若正常则返回结果,调用失败则触发异常 if rep[0]: return rep[1] else: - raise RemoteException, rep[1] + raise RemoteException(rep[1]) return dorpc @@ -140,9 +213,7 @@ class RpcClient(object): """启动客户端""" # 连接端口 self.__socketREQ.connect(self.__reqAddress) - self.__socketSUB.connect(self.__subAddress) - self.__socketSUB.setsockopt(zmq.SUBSCRIBE, '') # 订阅全部数据,不做过滤 # 将服务器设为启动 self.__active = True @@ -164,10 +235,10 @@ class RpcClient(object): """连续运行函数""" while self.__active: # 从订阅socket收取广播数据 - datab = self.__socketSUB.recv() + topic, datab = self.__socketSUB.recv_multipart() - # 用msgpack解包 - data = unpackb(datab) + # 序列化解包 + data = self.unpack(datab) # 调用回调函数处理 self.callback(data) @@ -177,14 +248,29 @@ class RpcClient(object): """回调函数,必须由用户实现""" raise NotImplementedError + #---------------------------------------------------------------------- + def subscribe(self, topic): + """ + 订阅特定主题的广播数据 + + 可以使用topic=''来订阅所有的主题 + """ + self.__socketSUB.setsockopt(zmq.SUBSCRIBE, topic) + + ######################################################################## class RemoteException(Exception): """RPC远程异常""" #---------------------------------------------------------------------- - def __init__(self): + def __init__(self, value): """Constructor""" - pass + self.__value = value + + #---------------------------------------------------------------------- + def __str__(self): + """输出错误信息""" + return self.__value \ No newline at end of file