Merge pull request #1891 from vnpy/rpc_server

Rpc server
This commit is contained in:
vn.py 2019-07-02 14:10:43 +08:00 committed by GitHub
commit 703eaa1eac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 699 additions and 373 deletions

View File

@ -0,0 +1,26 @@
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy.gateway.rpc import RpcGateway
from vnpy.app.cta_strategy import CtaStrategyApp
def main():
""""""
qapp = create_qapp()
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(RpcGateway)
main_engine.add_app(CtaStrategyApp)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()
qapp.exec()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,72 @@
from time import sleep
from vnpy.event import EventEngine, Event
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy.trader.event import EVENT_LOG
from vnpy.gateway.ctp import CtpGateway
from vnpy.app.rpc_service import RpcServiceApp
from vnpy.app.rpc_service.engine import EVENT_RPC_LOG
def main_ui():
""""""
qapp = create_qapp()
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
main_engine.add_app(RpcServiceApp)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()
qapp.exec()
def process_log_event(event: Event):
""""""
log = event.data
msg = f"{log.time}\t{log.msg}"
print(msg)
def main_terminal():
""""""
event_engine = EventEngine()
event_engine.register(EVENT_LOG, process_log_event)
event_engine.register(EVENT_RPC_LOG, process_log_event)
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
rpc_engine = main_engine.add_app(RpcServiceApp)
setting = {
"用户名": "25500029",
"密码": "20140602",
"经纪商代码": "6010",
"交易服务器": "180.169.75.194:41305",
"行情服务器": "180.166.1.17:41313",
"产品名称": "vntech_vnpy_2.0",
"授权编码": "0Y1J5UIMY79BFL7S",
"产品信息": ""
}
main_engine.connect(setting, "CTP")
sleep(10)
rep_address = "tcp://127.0.0.1:2014"
pub_address = "tcp://127.0.0.1:4102"
rpc_engine.start(rep_address, pub_address)
while True:
sleep(1)
if __name__ == "__main__":
# Run in GUI mode
# main_ui()
# Run in CLI mode
main_terminal()

View File

@ -10,26 +10,26 @@ class TestClient(RpcClient):
Test RpcClient Test RpcClient
""" """
def __init__(self, req_address, sub_address): def __init__(self):
""" """
Constructor Constructor
""" """
super(TestClient, self).__init__(req_address, sub_address) super(TestClient, self).__init__()
def callback(self, topic, data): def callback(self, topic, data):
""" """
Realize callable function Realize callable function
""" """
print('client received topic:', topic, ', data:', data) print(f"client received topic:{topic}, data:{data}")
if __name__ == '__main__': if __name__ == "__main__":
req_address = 'tcp://localhost:2014' req_address = "tcp://localhost:2014"
sub_address = 'tcp://localhost:0602' sub_address = "tcp://localhost:4102"
tc = TestClient(req_address, sub_address) tc = TestClient()
tc.subscribeTopic('') tc.subscribe_topic("")
tc.start() tc.start(req_address, sub_address)
while 1: while 1:
print(tc.add(1, 3)) print(tc.add(1, 3))

View File

@ -10,11 +10,11 @@ class TestServer(RpcServer):
Test RpcServer Test RpcServer
""" """
def __init__(self, rep_address, pub_address): def __init__(self):
""" """
Constructor Constructor
""" """
super(TestServer, self).__init__(rep_address, pub_address) super(TestServer, self).__init__()
self.register(self.add) self.register(self.add)
@ -22,19 +22,19 @@ class TestServer(RpcServer):
""" """
Test function Test function
""" """
print('receiving: %s, %s' % (a, b)) print(f"receiving:{a} {b}")
return a + b return a + b
if __name__ == '__main__': if __name__ == "__main__":
rep_address = 'tcp://*:2014' rep_address = "tcp://*:2014"
pub_address = 'tcp://*:0602' pub_address = "tcp://*:4102"
ts = TestServer(rep_address, pub_address) ts = TestServer()
ts.start() ts.start(rep_address, pub_address)
while 1: while 1:
content = 'current server time is %s' % time() content = f"current server time is {time()}"
print(content) print(content)
ts.publish('test', content) ts.publish("test", content)
sleep(2) sleep(2)

View File

@ -31,6 +31,7 @@ from vnpy.app.cta_backtester import CtaBacktesterApp
# from vnpy.app.data_recorder import DataRecorderApp # from vnpy.app.data_recorder import DataRecorderApp
# from vnpy.app.risk_manager import RiskManagerApp # from vnpy.app.risk_manager import RiskManagerApp
from vnpy.app.script_trader import ScriptTraderApp from vnpy.app.script_trader import ScriptTraderApp
from vnpy.app.rpc_service import RpcServiceApp
def main(): def main():
@ -68,6 +69,7 @@ def main():
# main_engine.add_app(DataRecorderApp) # main_engine.add_app(DataRecorderApp)
# main_engine.add_app(RiskManagerApp) # main_engine.add_app(RiskManagerApp)
main_engine.add_app(ScriptTraderApp) main_engine.add_app(ScriptTraderApp)
main_engine.add_app(RpcServiceApp)
main_window = MainWindow(main_engine, event_engine) main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized() main_window.showMaximized()

View File

@ -1,5 +1,3 @@
# encoding: UTF-8
import sys import sys
import traceback import traceback
from datetime import datetime from datetime import datetime

View File

@ -1,5 +1,3 @@
# encoding: UTF-8
import json import json
import ssl import ssl
import sys import sys

View File

@ -20,9 +20,6 @@ class RiskManagerEngine(BaseEngine):
"""""" """"""
super().__init__(main_engine, event_engine, APP_NAME) super().__init__(main_engine, event_engine, APP_NAME)
self.main_engine = main_engine
self.event_engine = event_engine
self.active = False self.active = False
self.order_flow_count = 0 self.order_flow_count = 0

View File

@ -0,0 +1,14 @@
from pathlib import Path
from vnpy.trader.app import BaseApp
from .engine import RpcEngine, APP_NAME
class RpcServiceApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "RPC服务"
engine_class = RpcEngine
widget_name = "RpcManager"
icon_name = "rpc.ico"

View File

@ -0,0 +1,119 @@
""""""
import traceback
from vnpy.event import Event, EventEngine
from vnpy.rpc import RpcServer
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.utility import load_json, save_json
from vnpy.trader.object import LogData
APP_NAME = "RpcService"
EVENT_RPC_LOG = "eRpcLog"
class RpcEngine(BaseEngine):
""""""
setting_filename = "rpc_service_setting.json"
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine, event_engine, APP_NAME)
self.rep_address = "tcp://*:2014"
self.pub_address = "tcp://*:4102"
self.server = None
self.init_server()
self.load_setting()
self.register_event()
def init_server(self):
""""""
self.server = RpcServer()
self.server.register(self.main_engine.subscribe)
self.server.register(self.main_engine.send_order)
self.server.register(self.main_engine.send_orders)
self.server.register(self.main_engine.cancel_order)
self.server.register(self.main_engine.cancel_orders)
self.server.register(self.main_engine.query_history)
self.server.register(self.main_engine.get_tick)
self.server.register(self.main_engine.get_order)
self.server.register(self.main_engine.get_trade)
self.server.register(self.main_engine.get_position)
self.server.register(self.main_engine.get_account)
self.server.register(self.main_engine.get_contract)
self.server.register(self.main_engine.get_all_ticks)
self.server.register(self.main_engine.get_all_orders)
self.server.register(self.main_engine.get_all_trades)
self.server.register(self.main_engine.get_all_positions)
self.server.register(self.main_engine.get_all_accounts)
self.server.register(self.main_engine.get_all_contracts)
self.server.register(self.main_engine.get_all_active_orders)
def load_setting(self):
""""""
setting = load_json(self.setting_filename)
self.rep_address = setting.get("rep_address", self.rep_address)
self.pub_address = setting.get("pub_address", self.pub_address)
def save_setting(self):
""""""
setting = {
"rep_address": self.rep_address,
"pub_address": self.pub_address
}
save_json(self.setting_filename, setting)
def start(self, rep_address: str, pub_address: str):
""""""
if self.server.is_active():
self.write_log("RPC服务运行中")
return False
self.rep_address = rep_address
self.pub_address = pub_address
try:
self.server.start(rep_address, pub_address)
except: # noqa
msg = traceback.format_exc()
self.write_log(f"RPC服务启动失败{msg}")
return False
self.save_setting()
self.write_log("RPC服务启动成功")
return True
def stop(self):
""""""
if not self.server.is_active():
self.write_log("RPC服务未启动")
return False
self.server.stop()
self.write_log("RPC服务已停止")
return True
def close(self):
""""""
self.stop()
def register_event(self):
""""""
self.event_engine.register_general(self.process_event)
def process_event(self, event: Event):
""""""
if self.server.is_active():
self.server.publish("", event)
def write_log(self, msg: str) -> None:
""""""
log = LogData(msg=msg, gateway_name=APP_NAME)
event = Event(EVENT_RPC_LOG, log)
self.event_engine.put(event)

View File

@ -0,0 +1 @@
from .widget import RpcManager

Binary file not shown.

After

Width:  |  Height:  |  Size: 65 KiB

View File

@ -0,0 +1,93 @@
from vnpy.event import EventEngine, Event
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import QtWidgets, QtCore
from ..engine import APP_NAME, EVENT_RPC_LOG
class RpcManager(QtWidgets.QWidget):
""""""
signal_log = QtCore.pyqtSignal(Event)
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__()
self.main_engine = main_engine
self.event_engine = event_engine
self.rpc_engine = main_engine.get_engine(APP_NAME)
self.init_ui()
self.register_event()
def init_ui(self):
""""""
self.setWindowTitle("RPC服务")
self.setFixedWidth(900)
self.setFixedHeight(500)
self.start_button = QtWidgets.QPushButton("启动")
self.start_button.clicked.connect(self.start_server)
self.stop_button = QtWidgets.QPushButton("停止")
self.stop_button.clicked.connect(self.stop_server)
self.stop_button.setEnabled(False)
for button in [self.start_button, self.stop_button]:
hint = button.sizeHint()
button.setFixedHeight(hint.height() * 2)
button.setFixedWidth(hint.width() * 4)
self.rep_line = QtWidgets.QLineEdit(self.rpc_engine.rep_address)
self.rep_line.setFixedWidth(300)
self.pub_line = QtWidgets.QLineEdit(self.rpc_engine.pub_address)
self.pub_line.setFixedWidth(300)
self.log_monitor = QtWidgets.QTextEdit()
self.log_monitor.setReadOnly(True)
form = QtWidgets.QFormLayout()
form.addRow("请求响应地址", self.rep_line)
form.addRow("事件广播地址", self.pub_line)
hbox = QtWidgets.QHBoxLayout()
hbox.addLayout(form)
hbox.addWidget(self.start_button)
hbox.addWidget(self.stop_button)
hbox.addStretch()
vbox = QtWidgets.QVBoxLayout()
vbox.addLayout(hbox)
vbox.addWidget(self.log_monitor)
self.setLayout(vbox)
def register_event(self):
""""""
self.signal_log.connect(self.process_log_event)
self.event_engine.register(EVENT_RPC_LOG, self.signal_log.emit)
def process_log_event(self, event: Event):
""""""
log = event.data
msg = f"{log.time}\t{log.msg}"
self.log_monitor.append(msg)
def start_server(self):
""""""
rep_address = self.rep_line.text()
pub_address = self.pub_line.text()
result = self.rpc_engine.start(rep_address, pub_address)
if result:
self.start_button.setEnabled(False)
self.stop_button.setEnabled(True)
def stop_server(self):
""""""
result = self.rpc_engine.stop()
if result:
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)

View File

@ -40,9 +40,6 @@ class ScriptEngine(BaseEngine):
"""""" """"""
super().__init__(main_engine, event_engine, APP_NAME) super().__init__(main_engine, event_engine, APP_NAME)
self.main_engine = main_engine
self.event_engine = event_engine
self.get_tick = main_engine.get_tick self.get_tick = main_engine.get_tick
self.get_order = main_engine.get_order self.get_order = main_engine.get_order
self.get_trade = main_engine.get_trade self.get_trade = main_engine.get_trade

View File

@ -1,4 +1,3 @@
# encoding: UTF-8
""" """
Author: vigarbuaa Author: vigarbuaa
""" """

View File

@ -1,4 +1,3 @@
# encoding: UTF-8
""" """
Author: vigarbuaa Author: vigarbuaa
""" """

View File

@ -1,6 +1,4 @@
# encoding: UTF-8 """"""
"""
"""
import hashlib import hashlib
import hmac import hmac

View File

@ -1,6 +1,4 @@
# encoding: UTF-8 """"""
"""
"""
from datetime import datetime from datetime import datetime

View File

@ -1,4 +1,3 @@
# encoding: UTF-8
""" """
Please install futu-api before use. Please install futu-api before use.
""" """

View File

@ -1,4 +1,3 @@
# encoding: UTF-8
""" """
Author: nanoric Author: nanoric
""" """

View File

@ -1,4 +1,3 @@
# encoding: UTF-8
""" """
""" """

View File

@ -1,4 +1,3 @@
# encoding: UTF-8
""" """
Author: qqqlyx Author: qqqlyx
""" """

View File

@ -1,4 +1,3 @@
# encoding: UTF-8
""" """
""" """

View File

@ -0,0 +1 @@
from .rpc_gateway import RpcGateway

View File

@ -0,0 +1,116 @@
from vnpy.event import Event
from vnpy.rpc import RpcClient
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (
SubscribeRequest,
CancelRequest,
OrderRequest
)
from vnpy.trader.constant import Exchange
class RpcGateway(BaseGateway):
"""
VN Trader Gateway for RPC service.
"""
default_setting = {
"主动请求地址": "tcp://127.0.0.1:2014",
"推送订阅地址": "tcp://127.0.0.1:4102"
}
exchanges = list(Exchange)
def __init__(self, event_engine):
"""Constructor"""
super().__init__(event_engine, "RPC")
self.symbol_gateway_map = {}
self.client = RpcClient()
self.client.callback = self.client_callback
def connect(self, setting: dict):
""""""
req_address = setting["主动请求地址"]
pub_address = setting["推送订阅地址"]
self.client.subscribe_topic("")
self.client.start(req_address, pub_address)
self.write_log("服务器连接成功,开始初始化查询")
self.query_all()
def subscribe(self, req: SubscribeRequest):
""""""
gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "")
self.client.subscribe(req, gateway_name)
def send_order(self, req: OrderRequest):
""""""
gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "")
self.client.send_order(req, gateway_name)
def cancel_order(self, req: CancelRequest):
""""""
gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "")
self.client.cancel_order(req, gateway_name)
def query_account(self):
""""""
pass
def query_position(self):
""""""
pass
def query_all(self):
""""""
contracts = self.client.get_all_contracts()
for contract in contracts:
self.symbol_gateway_map[contract.vt_symbol] = contract.gateway_name
contract.gateway_name = self.gateway_name
self.on_contract(contract)
self.write_log("合约信息查询成功")
accounts = self.client.get_all_accounts()
for account in accounts:
account.gateway_name = self.gateway_name
self.on_account(account)
self.write_log("资金信息查询成功")
positions = self.client.get_all_positions()
for position in positions:
position.gateway_name = self.gateway_name
self.on_position(position)
self.write_log("持仓信息查询成功")
orders = self.client.get_all_orders()
for order in orders:
order.gateway_name = self.gateway_name
self.on_order(order)
self.write_log("委托信息查询成功")
trades = self.client.get_all_trades()
for trade in trades:
trade.gateway_name = self.gateway_name
self.on_trade(trade)
self.write_log("成交信息查询成功")
def close(self):
""""""
self.client.stop()
def client_callback(self, topic: str, event: Event):
""""""
if event is None:
print("none event", topic, event)
return
data = event.data
if hasattr(data, "gateway_name"):
data.gateway_name = self.gateway_name
self.event_engine.put(event)

View File

@ -1,4 +1,3 @@
# encoding: UTF-8
""" """
Author: KeKe Author: KeKe
Please install tiger-api before use. Please install tiger-api before use.

View File

@ -1 +1,234 @@
from .vnrpc import RpcServer, RpcClient, RemoteException import threading
import traceback
import signal
import zmq
from typing import Any, Callable
# Achieve Ctrl-c interrupt recv
signal.signal(signal.SIGINT, signal.SIG_DFL)
class RemoteException(Exception):
"""
RPC remote exception
"""
def __init__(self, value):
"""
Constructor
"""
self.__value = value
def __str__(self):
"""
Output error message
"""
return self.__value
class RpcServer:
""""""
def __init__(self):
"""
Constructor
"""
# Save functions dict: key is fuction name, value is fuction object
self.__functions = {}
# Zmq port related
self.__context = zmq.Context()
# Reply socket (Requestreply pattern)
self.__socket_rep = self.__context.socket(zmq.REP)
# Publish socket (Publishsubscribe pattern)
self.__socket_pub = self.__context.socket(zmq.PUB)
# Worker thread related
self.__active = False # RpcServer status
self.__thread = None # RpcServer thread
def is_active(self):
""""""
return self.__active
def start(self, rep_address: str, pub_address: str):
"""
Start RpcServer
"""
if self.__active:
return
# Bind socket address
self.__socket_rep.bind(rep_address)
self.__socket_pub.bind(pub_address)
# Start RpcServer status
self.__active = True
# Start RpcServer thread
self.__thread = threading.Thread(target=self.run)
self.__thread.start()
def stop(self):
"""
Stop RpcServer
"""
if not self.__active:
return
# Stop RpcServer status
self.__active = False
# Wait for RpcServer thread to exit
if self.__thread.isAlive():
self.__thread.join()
self.__thread = None
# Unbind socket address
self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT)
self.__socket_rep.unbind(self.__socket_rep.LAST_ENDPOINT)
def run(self):
"""
Run RpcServer functions
"""
while self.__active:
# Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds)
if not self.__socket_rep.poll(1000):
continue
# Receive request data from Reply socket
req = self.__socket_rep.recv_pyobj()
# Get function name and parameters
name, args, kwargs = req
# Try to get and execute callable function object; capture exception information if it fails
try:
func = self.__functions[name]
r = func(*args, **kwargs)
rep = [True, r]
except Exception as e: # noqa
rep = [False, traceback.format_exc()]
# send callable response by Reply socket
self.__socket_rep.send_pyobj(rep)
def publish(self, topic: str, data: Any):
"""
Publish data
"""
self.__socket_pub.send_pyobj([topic, data])
def register(self, func: Callable):
"""
Register function
"""
self.__functions[func.__name__] = func
class RpcClient:
""""""
def __init__(self):
"""Constructor"""
# zmq port related
self.__context = zmq.Context()
# Request socket (Requestreply pattern)
self.__socket_req = self.__context.socket(zmq.REQ)
# Subscribe socket (Publishsubscribe pattern)
self.__socket_sub = self.__context.socket(zmq.SUB)
# Worker thread relate, used to process data pushed from server
self.__active = False # RpcClient status
self.__thread = None # RpcClient thread
def __getattr__(self, name: str):
"""
Realize remote call function
"""
# Perform remote call task
def dorpc(*args, **kwargs):
# Generate request
req = [name, args, kwargs]
# Send request and wait for response
self.__socket_req.send_pyobj(req)
rep = self.__socket_req.recv_pyobj()
# Return response if successed; Trigger exception if failed
if rep[0]:
return rep[1]
else:
raise RemoteException(rep[1])
return dorpc
def start(self, req_address: str, sub_address: str):
"""
Start RpcClient
"""
if self.__active:
return
# Connect zmq port
self.__socket_req.connect(req_address)
self.__socket_sub.connect(sub_address)
# Start RpcClient status
self.__active = True
# Start RpcClient thread
self.__thread = threading.Thread(target=self.run)
self.__thread.start()
def stop(self):
"""
Stop RpcClient
"""
if not self.__active:
return
# Stop RpcClient status
self.__active = False
# Wait for RpcClient thread to exit
if self.__thread.isAlive():
self.__thread.join()
self.__thread = None
# Close socket
self.__socket_req.close()
self.__socket_sub.close()
def run(self):
"""
Run RpcClient function
"""
while self.__active:
# Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds)
if not self.__socket_sub.poll(1000):
continue
# Receive data from subscribe socket
topic, data = self.__socket_sub.recv_pyobj()
# Process data by callable function
self.callback(topic, data)
def callback(self, topic: str, data: Any):
"""
Callable function
"""
raise NotImplementedError
def subscribe_topic(self, topic: str):
"""
Subscribe data
"""
self.__socket_sub.setsockopt_string(zmq.SUBSCRIBE, topic)

View File

@ -1,329 +0,0 @@
import threading
import traceback
import signal
import zmq
from msgpack import packb, unpackb
from json import dumps, loads
import pickle
p_dumps = pickle.dumps
p_loads = pickle.loads
# Achieve Ctrl-c interrupt recv
signal.signal(signal.SIGINT, signal.SIG_DFL)
class RpcObject(object):
"""
Referred to serialization of packing and unpacking, we offer 3 tools:
1) maspack: higher performance, but usually requires the installation of msgpack related tools;
2) jason: Slightly lower performance but versatility is better, most programming languages have built-in libraries;
3) cPickle: Lower performance and only can be used in Python, but it is very convenient to transfer Python objects directly.
Therefore, it is recommended to use msgpack.
Use json, if you want to communicate with some languages without providing msgpack.
Use cPickle, when the data being transferred contains many custom Python objects.
"""
def __init__(self):
"""
Constructor
Use msgpack as default serialization tool
"""
self.use_msgpack()
def pack(self, data):
""""""
pass
def unpack(self, data):
""""""
pass
def __json_pack(self, data):
"""
Pack with json
"""
return dumps(data)
def __json_unpack(self, data):
"""
Unpack with json
"""
return loads(data)
def __msgpack_pack(self, data):
"""
Pack with msgpack
"""
return packb(data)
def __msgpack_unpack(self, data):
"""
Unpack with msgpack
"""
return unpackb(data)
def __pickle_pack(self, data):
"""
Pack with cPickle
"""
return p_dumps(data)
def __pickle_unpack(self, data):
"""
Unpack with cPickle
"""
return p_loads(data)
def use_json(self):
"""
Use json as serialization tool
"""
self.pack = self.__json_pack
self.unpack = self.__json_unpack
def use_msgpack(self):
"""
Use msgpack as serialization tool
"""
self.pack = self.__msgpack_pack
self.unpack = self.__msgpack_unpack
def use_pickle(self):
"""
Use cPickle as serialization tool
"""
self.pack = self.__pickle_pack
self.unpack = self.__pickle_unpack
class RpcServer(RpcObject):
""""""
def __init__(self, rep_address, pub_address):
"""
Constructor
"""
super(RpcServer, self).__init__()
# Save functions dict: key is fuction name, value is fuction object
self.__functions = {}
# Zmq port related
self.__context = zmq.Context()
self.__socket_rep = self.__context.socket(
zmq.REP) # Reply socket (Requestreply pattern)
self.__socket_rep.bind(rep_address)
# Publish socket (Publishsubscribe pattern)
self.__socket_pub = self.__context.socket(zmq.PUB)
self.__socket_pub.bind(pub_address)
# Woker thread related
self.__active = False # RpcServer status
self.__thread = threading.Thread(target=self.run) # RpcServer thread
def start(self):
"""
Start RpcServer
"""
# Start RpcServer status
self.__active = True
# Start RpcServer thread
if not self.__thread.isAlive():
self.__thread.start()
def stop(self, join=False):
"""
Stop RpcServer
"""
# Stop RpcServer status
self.__active = False
# Wait for RpcServer thread to exit
if join and self.__thread.isAlive():
self.__thread.join()
def run(self):
"""
Run RpcServer functions
"""
while self.__active:
# Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds)
if not self.__socket_rep.poll(1000):
continue
# Receive request data from Reply socket
reqb = self.__socket_rep.recv()
# Unpack request by deserialization
req = self.unpack(reqb)
# Get function name and parameters
name, args, kwargs = req
# Try to get and execute callable function object; capture exception information if it fails
name = name.decode("UTF-8")
try:
func = self.__functions[name]
r = func(*args, **kwargs)
rep = [True, r]
except Exception as e: # noqa
rep = [False, traceback.format_exc()]
# Pack response by serialization
repb = self.pack(rep)
# send callable response by Reply socket
self.__socket_rep.send(repb)
def publish(self, topic, data):
"""
Publish data
"""
# Serialized data
topic = bytes(topic, "UTF-8")
datab = self.pack(data)
# Send data by Publish socket
# topci must be ascii encoding
self.__socket_pub.send_multipart([topic, datab])
def register(self, func):
"""
Register function
"""
self.__functions[func.__name__] = func
class RpcClient(RpcObject):
""""""
def __init__(self, req_address, sub_address):
"""Constructor"""
super(RpcClient, self).__init__()
# zmq port related
self.__req_address = req_address
self.__sub_address = sub_address
self.__context = zmq.Context()
# Request socket (Requestreply pattern)
self.__socket_req = self.__context.socket(zmq.REQ)
# Subscribe socket (Publishsubscribe pattern)
self.__socket_sub = self.__context.socket(zmq.SUB)
# Woker thread relate, used to process data pushed from server
self.__active = False # RpcClient status
self.__thread = threading.Thread(
target=self.run) # RpcClient thread
def __getattr__(self, name):
"""
Realize remote call function
"""
# Perform remote call task
def dorpc(*args, **kwargs):
# Generate request
req = [name, args, kwargs]
# Pack request by serialization
reqb = self.pack(req)
# Send request and wait for response
self.__socket_req.send(reqb)
repb = self.__socket_req.recv()
# Unpack response by deserialization
rep = self.unpack(repb)
# Return response if successed; Trigger exception if failed
if rep[0]:
return rep[1]
else:
raise RemoteException(rep[1].decode("UTF-8"))
return dorpc
def start(self):
"""
Start RpcClient
"""
# Connect zmq port
self.__socket_req.connect(self.__req_address)
self.__socket_sub.connect(self.__sub_address)
# Start RpcClient status
self.__active = True
# Start RpcClient thread
if not self.__thread.isAlive():
self.__thread.start()
def stop(self):
"""
Stop RpcClient
"""
# Stop RpcClient status
self.__active = False
# Wait for RpcClient thread to exit
if self.__thread.isAlive():
self.__thread.join()
def run(self):
"""
Run RpcClient function
"""
while self.__active:
# Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds)
if not self.__socket_sub.poll(1000):
continue
# Receive data from subscribe socket
topic, datab = self.__socket_sub.recv_multipart()
# Unpack data by deserialization
data = self.unpack(datab)
# Process data by callable function
topic = topic.decode("UTF-8")
self.callback(topic, data)
def callback(self, topic, data):
"""
Callable function
"""
raise NotImplementedError
def subscribeTopic(self, topic):
"""
Subscribe data
"""
topic = bytes(topic, "UTF-8")
self.__socket_sub.setsockopt(zmq.SUBSCRIBE, topic)
class RemoteException(Exception):
"""
RPC remote exception
"""
def __init__(self, value):
"""
Constructor
"""
self.__value = value
def __str__(self):
"""
Output error message
"""
return self.__value