diff --git a/examples/client_server/client/run_client.py b/examples/client_server/client/run_client.py new file mode 100644 index 00000000..4f3669e6 --- /dev/null +++ b/examples/client_server/client/run_client.py @@ -0,0 +1,26 @@ +from vnpy.event import EventEngine +from vnpy.trader.engine import MainEngine +from vnpy.trader.ui import MainWindow, create_qapp +from vnpy.gateway.rpc import RpcGateway +from vnpy.app.cta_strategy import CtaStrategyApp + + +def main(): + """""" + qapp = create_qapp() + + event_engine = EventEngine() + + main_engine = MainEngine(event_engine) + + main_engine.add_gateway(RpcGateway) + main_engine.add_app(CtaStrategyApp) + + main_window = MainWindow(main_engine, event_engine) + main_window.showMaximized() + + qapp.exec() + + +if __name__ == "__main__": + main() diff --git a/examples/client_server/server/run_server.py b/examples/client_server/server/run_server.py new file mode 100644 index 00000000..8d9f006a --- /dev/null +++ b/examples/client_server/server/run_server.py @@ -0,0 +1,72 @@ +from time import sleep + +from vnpy.event import EventEngine, Event +from vnpy.trader.engine import MainEngine +from vnpy.trader.ui import MainWindow, create_qapp +from vnpy.trader.event import EVENT_LOG +from vnpy.gateway.ctp import CtpGateway +from vnpy.app.rpc_service import RpcServiceApp +from vnpy.app.rpc_service.engine import EVENT_RPC_LOG + + +def main_ui(): + """""" + qapp = create_qapp() + + event_engine = EventEngine() + + main_engine = MainEngine(event_engine) + + main_engine.add_gateway(CtpGateway) + main_engine.add_app(RpcServiceApp) + + main_window = MainWindow(main_engine, event_engine) + main_window.showMaximized() + + qapp.exec() + + +def process_log_event(event: Event): + """""" + log = event.data + msg = f"{log.time}\t{log.msg}" + print(msg) + + +def main_terminal(): + """""" + event_engine = EventEngine() + event_engine.register(EVENT_LOG, process_log_event) + event_engine.register(EVENT_RPC_LOG, process_log_event) + + main_engine = MainEngine(event_engine) + main_engine.add_gateway(CtpGateway) + rpc_engine = main_engine.add_app(RpcServiceApp) + + setting = { + "用户名": "25500029", + "密码": "20140602", + "经纪商代码": "6010", + "交易服务器": "180.169.75.194:41305", + "行情服务器": "180.166.1.17:41313", + "产品名称": "vntech_vnpy_2.0", + "授权编码": "0Y1J5UIMY79BFL7S", + "产品信息": "" + } + main_engine.connect(setting, "CTP") + sleep(10) + + rep_address = "tcp://127.0.0.1:2014" + pub_address = "tcp://127.0.0.1:4102" + rpc_engine.start(rep_address, pub_address) + + while True: + sleep(1) + + +if __name__ == "__main__": + # Run in GUI mode + # main_ui() + + # Run in CLI mode + main_terminal() diff --git a/vnpy/rpc/test_client.py b/examples/simple_rpc/test_client.py similarity index 51% rename from vnpy/rpc/test_client.py rename to examples/simple_rpc/test_client.py index 7a693a52..77ba34f6 100644 --- a/vnpy/rpc/test_client.py +++ b/examples/simple_rpc/test_client.py @@ -10,26 +10,26 @@ class TestClient(RpcClient): Test RpcClient """ - def __init__(self, req_address, sub_address): + def __init__(self): """ Constructor """ - super(TestClient, self).__init__(req_address, sub_address) + super(TestClient, self).__init__() def callback(self, topic, data): """ Realize callable function """ - print('client received topic:', topic, ', data:', data) + print(f"client received topic:{topic}, data:{data}") -if __name__ == '__main__': - req_address = 'tcp://localhost:2014' - sub_address = 'tcp://localhost:0602' +if __name__ == "__main__": + req_address = "tcp://localhost:2014" + sub_address = "tcp://localhost:4102" - tc = TestClient(req_address, sub_address) - tc.subscribeTopic('') - tc.start() + tc = TestClient() + tc.subscribe_topic("") + tc.start(req_address, sub_address) while 1: print(tc.add(1, 3)) diff --git a/vnpy/rpc/test_server.py b/examples/simple_rpc/test_server.py similarity index 51% rename from vnpy/rpc/test_server.py rename to examples/simple_rpc/test_server.py index 660168fc..2839b861 100644 --- a/vnpy/rpc/test_server.py +++ b/examples/simple_rpc/test_server.py @@ -10,11 +10,11 @@ class TestServer(RpcServer): Test RpcServer """ - def __init__(self, rep_address, pub_address): + def __init__(self): """ Constructor """ - super(TestServer, self).__init__(rep_address, pub_address) + super(TestServer, self).__init__() self.register(self.add) @@ -22,19 +22,19 @@ class TestServer(RpcServer): """ Test function """ - print('receiving: %s, %s' % (a, b)) + print(f"receiving:{a} {b}") return a + b -if __name__ == '__main__': - rep_address = 'tcp://*:2014' - pub_address = 'tcp://*:0602' +if __name__ == "__main__": + rep_address = "tcp://*:2014" + pub_address = "tcp://*:4102" - ts = TestServer(rep_address, pub_address) - ts.start() + ts = TestServer() + ts.start(rep_address, pub_address) while 1: - content = 'current server time is %s' % time() + content = f"current server time is {time()}" print(content) - ts.publish('test', content) + ts.publish("test", content) sleep(2) diff --git a/examples/vn_trader/run.py b/examples/vn_trader/run.py index 187b5abf..e8fb8a62 100644 --- a/examples/vn_trader/run.py +++ b/examples/vn_trader/run.py @@ -31,6 +31,7 @@ from vnpy.app.cta_backtester import CtaBacktesterApp # from vnpy.app.data_recorder import DataRecorderApp # from vnpy.app.risk_manager import RiskManagerApp from vnpy.app.script_trader import ScriptTraderApp +from vnpy.app.rpc_service import RpcServiceApp def main(): @@ -68,6 +69,7 @@ def main(): # main_engine.add_app(DataRecorderApp) # main_engine.add_app(RiskManagerApp) main_engine.add_app(ScriptTraderApp) + main_engine.add_app(RpcServiceApp) main_window = MainWindow(main_engine, event_engine) main_window.showMaximized() diff --git a/vnpy/api/rest/rest_client.py b/vnpy/api/rest/rest_client.py index c2083688..dd36a503 100644 --- a/vnpy/api/rest/rest_client.py +++ b/vnpy/api/rest/rest_client.py @@ -1,5 +1,3 @@ -# encoding: UTF-8 - import sys import traceback from datetime import datetime diff --git a/vnpy/api/websocket/websocket_client.py b/vnpy/api/websocket/websocket_client.py index 84596a34..870d87a4 100644 --- a/vnpy/api/websocket/websocket_client.py +++ b/vnpy/api/websocket/websocket_client.py @@ -1,5 +1,3 @@ -# encoding: UTF-8 - import json import ssl import sys diff --git a/vnpy/app/risk_manager/engine.py b/vnpy/app/risk_manager/engine.py index 35ab4a74..c6193bc5 100644 --- a/vnpy/app/risk_manager/engine.py +++ b/vnpy/app/risk_manager/engine.py @@ -20,9 +20,6 @@ class RiskManagerEngine(BaseEngine): """""" super().__init__(main_engine, event_engine, APP_NAME) - self.main_engine = main_engine - self.event_engine = event_engine - self.active = False self.order_flow_count = 0 diff --git a/vnpy/app/rpc_service/__init__.py b/vnpy/app/rpc_service/__init__.py new file mode 100644 index 00000000..058bd067 --- /dev/null +++ b/vnpy/app/rpc_service/__init__.py @@ -0,0 +1,14 @@ +from pathlib import Path +from vnpy.trader.app import BaseApp +from .engine import RpcEngine, APP_NAME + + +class RpcServiceApp(BaseApp): + """""" + app_name = APP_NAME + app_module = __module__ + app_path = Path(__file__).parent + display_name = "RPC服务" + engine_class = RpcEngine + widget_name = "RpcManager" + icon_name = "rpc.ico" diff --git a/vnpy/app/rpc_service/engine.py b/vnpy/app/rpc_service/engine.py new file mode 100644 index 00000000..62a355a5 --- /dev/null +++ b/vnpy/app/rpc_service/engine.py @@ -0,0 +1,119 @@ +"""""" + +import traceback + +from vnpy.event import Event, EventEngine +from vnpy.rpc import RpcServer +from vnpy.trader.engine import BaseEngine, MainEngine +from vnpy.trader.utility import load_json, save_json +from vnpy.trader.object import LogData + +APP_NAME = "RpcService" + +EVENT_RPC_LOG = "eRpcLog" + + +class RpcEngine(BaseEngine): + """""" + setting_filename = "rpc_service_setting.json" + + def __init__(self, main_engine: MainEngine, event_engine: EventEngine): + """""" + super().__init__(main_engine, event_engine, APP_NAME) + + self.rep_address = "tcp://*:2014" + self.pub_address = "tcp://*:4102" + + self.server = None + + self.init_server() + self.load_setting() + self.register_event() + + def init_server(self): + """""" + self.server = RpcServer() + + self.server.register(self.main_engine.subscribe) + self.server.register(self.main_engine.send_order) + self.server.register(self.main_engine.send_orders) + self.server.register(self.main_engine.cancel_order) + self.server.register(self.main_engine.cancel_orders) + self.server.register(self.main_engine.query_history) + + self.server.register(self.main_engine.get_tick) + self.server.register(self.main_engine.get_order) + self.server.register(self.main_engine.get_trade) + self.server.register(self.main_engine.get_position) + self.server.register(self.main_engine.get_account) + self.server.register(self.main_engine.get_contract) + self.server.register(self.main_engine.get_all_ticks) + self.server.register(self.main_engine.get_all_orders) + self.server.register(self.main_engine.get_all_trades) + self.server.register(self.main_engine.get_all_positions) + self.server.register(self.main_engine.get_all_accounts) + self.server.register(self.main_engine.get_all_contracts) + self.server.register(self.main_engine.get_all_active_orders) + + def load_setting(self): + """""" + setting = load_json(self.setting_filename) + self.rep_address = setting.get("rep_address", self.rep_address) + self.pub_address = setting.get("pub_address", self.pub_address) + + def save_setting(self): + """""" + setting = { + "rep_address": self.rep_address, + "pub_address": self.pub_address + } + save_json(self.setting_filename, setting) + + def start(self, rep_address: str, pub_address: str): + """""" + if self.server.is_active(): + self.write_log("RPC服务运行中") + return False + + self.rep_address = rep_address + self.pub_address = pub_address + + try: + self.server.start(rep_address, pub_address) + except: # noqa + msg = traceback.format_exc() + self.write_log(f"RPC服务启动失败:{msg}") + return False + + self.save_setting() + self.write_log("RPC服务启动成功") + return True + + def stop(self): + """""" + if not self.server.is_active(): + self.write_log("RPC服务未启动") + return False + + self.server.stop() + self.write_log("RPC服务已停止") + return True + + def close(self): + """""" + self.stop() + + def register_event(self): + """""" + self.event_engine.register_general(self.process_event) + + def process_event(self, event: Event): + """""" + if self.server.is_active(): + self.server.publish("", event) + + def write_log(self, msg: str) -> None: + """""" + log = LogData(msg=msg, gateway_name=APP_NAME) + event = Event(EVENT_RPC_LOG, log) + self.event_engine.put(event) diff --git a/vnpy/app/rpc_service/ui/__init__.py b/vnpy/app/rpc_service/ui/__init__.py new file mode 100644 index 00000000..b9aff582 --- /dev/null +++ b/vnpy/app/rpc_service/ui/__init__.py @@ -0,0 +1 @@ +from .widget import RpcManager diff --git a/vnpy/app/rpc_service/ui/rpc.ico b/vnpy/app/rpc_service/ui/rpc.ico new file mode 100644 index 00000000..5e91b2db Binary files /dev/null and b/vnpy/app/rpc_service/ui/rpc.ico differ diff --git a/vnpy/app/rpc_service/ui/widget.py b/vnpy/app/rpc_service/ui/widget.py new file mode 100644 index 00000000..f811a22c --- /dev/null +++ b/vnpy/app/rpc_service/ui/widget.py @@ -0,0 +1,93 @@ +from vnpy.event import EventEngine, Event +from vnpy.trader.engine import MainEngine +from vnpy.trader.ui import QtWidgets, QtCore +from ..engine import APP_NAME, EVENT_RPC_LOG + + +class RpcManager(QtWidgets.QWidget): + """""" + signal_log = QtCore.pyqtSignal(Event) + + def __init__(self, main_engine: MainEngine, event_engine: EventEngine): + """""" + super().__init__() + + self.main_engine = main_engine + self.event_engine = event_engine + + self.rpc_engine = main_engine.get_engine(APP_NAME) + + self.init_ui() + self.register_event() + + def init_ui(self): + """""" + self.setWindowTitle("RPC服务") + self.setFixedWidth(900) + self.setFixedHeight(500) + + self.start_button = QtWidgets.QPushButton("启动") + self.start_button.clicked.connect(self.start_server) + + self.stop_button = QtWidgets.QPushButton("停止") + self.stop_button.clicked.connect(self.stop_server) + self.stop_button.setEnabled(False) + + for button in [self.start_button, self.stop_button]: + hint = button.sizeHint() + button.setFixedHeight(hint.height() * 2) + button.setFixedWidth(hint.width() * 4) + + self.rep_line = QtWidgets.QLineEdit(self.rpc_engine.rep_address) + self.rep_line.setFixedWidth(300) + + self.pub_line = QtWidgets.QLineEdit(self.rpc_engine.pub_address) + self.pub_line.setFixedWidth(300) + + self.log_monitor = QtWidgets.QTextEdit() + self.log_monitor.setReadOnly(True) + + form = QtWidgets.QFormLayout() + form.addRow("请求响应地址", self.rep_line) + form.addRow("事件广播地址", self.pub_line) + + hbox = QtWidgets.QHBoxLayout() + hbox.addLayout(form) + hbox.addWidget(self.start_button) + hbox.addWidget(self.stop_button) + hbox.addStretch() + + vbox = QtWidgets.QVBoxLayout() + vbox.addLayout(hbox) + vbox.addWidget(self.log_monitor) + + self.setLayout(vbox) + + def register_event(self): + """""" + self.signal_log.connect(self.process_log_event) + + self.event_engine.register(EVENT_RPC_LOG, self.signal_log.emit) + + def process_log_event(self, event: Event): + """""" + log = event.data + msg = f"{log.time}\t{log.msg}" + self.log_monitor.append(msg) + + def start_server(self): + """""" + rep_address = self.rep_line.text() + pub_address = self.pub_line.text() + + result = self.rpc_engine.start(rep_address, pub_address) + if result: + self.start_button.setEnabled(False) + self.stop_button.setEnabled(True) + + def stop_server(self): + """""" + result = self.rpc_engine.stop() + if result: + self.start_button.setEnabled(True) + self.stop_button.setEnabled(False) diff --git a/vnpy/app/script_trader/engine.py b/vnpy/app/script_trader/engine.py index 45642c24..ea08d830 100644 --- a/vnpy/app/script_trader/engine.py +++ b/vnpy/app/script_trader/engine.py @@ -40,9 +40,6 @@ class ScriptEngine(BaseEngine): """""" super().__init__(main_engine, event_engine, APP_NAME) - self.main_engine = main_engine - self.event_engine = event_engine - self.get_tick = main_engine.get_tick self.get_order = main_engine.get_order self.get_trade = main_engine.get_trade diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index 036708c3..b9df4b32 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ Author: vigarbuaa """ diff --git a/vnpy/gateway/bitfinex/bitfinex_gateway.py b/vnpy/gateway/bitfinex/bitfinex_gateway.py index 832c2969..ecec31d3 100644 --- a/vnpy/gateway/bitfinex/bitfinex_gateway.py +++ b/vnpy/gateway/bitfinex/bitfinex_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ Author: vigarbuaa """ diff --git a/vnpy/gateway/bitmex/bitmex_gateway.py b/vnpy/gateway/bitmex/bitmex_gateway.py index 444f85b2..e5af3082 100644 --- a/vnpy/gateway/bitmex/bitmex_gateway.py +++ b/vnpy/gateway/bitmex/bitmex_gateway.py @@ -1,6 +1,4 @@ -# encoding: UTF-8 -""" -""" +"""""" import hashlib import hmac diff --git a/vnpy/gateway/femas/femas_gateway.py b/vnpy/gateway/femas/femas_gateway.py index 47f80fbb..2fd6a4e8 100644 --- a/vnpy/gateway/femas/femas_gateway.py +++ b/vnpy/gateway/femas/femas_gateway.py @@ -1,6 +1,4 @@ -# encoding: UTF-8 -""" -""" +"""""" from datetime import datetime diff --git a/vnpy/gateway/futu/futu_gateway.py b/vnpy/gateway/futu/futu_gateway.py index 2f45adbe..895be70a 100644 --- a/vnpy/gateway/futu/futu_gateway.py +++ b/vnpy/gateway/futu/futu_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ Please install futu-api before use. """ diff --git a/vnpy/gateway/oes/oes_gateway.py b/vnpy/gateway/oes/oes_gateway.py index fd763782..9ed76ddf 100644 --- a/vnpy/gateway/oes/oes_gateway.py +++ b/vnpy/gateway/oes/oes_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ Author: nanoric """ diff --git a/vnpy/gateway/okex/okex_gateway.py b/vnpy/gateway/okex/okex_gateway.py index ba7a2f80..5ea47268 100644 --- a/vnpy/gateway/okex/okex_gateway.py +++ b/vnpy/gateway/okex/okex_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ """ diff --git a/vnpy/gateway/okexf/okexf_gateway.py b/vnpy/gateway/okexf/okexf_gateway.py index 26c1c87a..b1212682 100644 --- a/vnpy/gateway/okexf/okexf_gateway.py +++ b/vnpy/gateway/okexf/okexf_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ Author: qqqlyx """ diff --git a/vnpy/gateway/onetoken/onetoken_gateway.py b/vnpy/gateway/onetoken/onetoken_gateway.py index e6af5faa..bd586eb3 100644 --- a/vnpy/gateway/onetoken/onetoken_gateway.py +++ b/vnpy/gateway/onetoken/onetoken_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ """ diff --git a/vnpy/gateway/rpc/__init__.py b/vnpy/gateway/rpc/__init__.py new file mode 100644 index 00000000..5611fd75 --- /dev/null +++ b/vnpy/gateway/rpc/__init__.py @@ -0,0 +1 @@ +from .rpc_gateway import RpcGateway diff --git a/vnpy/gateway/rpc/rpc_gateway.py b/vnpy/gateway/rpc/rpc_gateway.py new file mode 100644 index 00000000..c9ad39a8 --- /dev/null +++ b/vnpy/gateway/rpc/rpc_gateway.py @@ -0,0 +1,116 @@ +from vnpy.event import Event +from vnpy.rpc import RpcClient +from vnpy.trader.gateway import BaseGateway +from vnpy.trader.object import ( + SubscribeRequest, + CancelRequest, + OrderRequest +) +from vnpy.trader.constant import Exchange + + +class RpcGateway(BaseGateway): + """ + VN Trader Gateway for RPC service. + """ + + default_setting = { + "主动请求地址": "tcp://127.0.0.1:2014", + "推送订阅地址": "tcp://127.0.0.1:4102" + } + + exchanges = list(Exchange) + + def __init__(self, event_engine): + """Constructor""" + super().__init__(event_engine, "RPC") + + self.symbol_gateway_map = {} + + self.client = RpcClient() + self.client.callback = self.client_callback + + def connect(self, setting: dict): + """""" + req_address = setting["主动请求地址"] + pub_address = setting["推送订阅地址"] + + self.client.subscribe_topic("") + self.client.start(req_address, pub_address) + + self.write_log("服务器连接成功,开始初始化查询") + + self.query_all() + + def subscribe(self, req: SubscribeRequest): + """""" + gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "") + self.client.subscribe(req, gateway_name) + + def send_order(self, req: OrderRequest): + """""" + gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "") + self.client.send_order(req, gateway_name) + + def cancel_order(self, req: CancelRequest): + """""" + gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "") + self.client.cancel_order(req, gateway_name) + + def query_account(self): + """""" + pass + + def query_position(self): + """""" + pass + + def query_all(self): + """""" + contracts = self.client.get_all_contracts() + for contract in contracts: + self.symbol_gateway_map[contract.vt_symbol] = contract.gateway_name + contract.gateway_name = self.gateway_name + self.on_contract(contract) + self.write_log("合约信息查询成功") + + accounts = self.client.get_all_accounts() + for account in accounts: + account.gateway_name = self.gateway_name + self.on_account(account) + self.write_log("资金信息查询成功") + + positions = self.client.get_all_positions() + for position in positions: + position.gateway_name = self.gateway_name + self.on_position(position) + self.write_log("持仓信息查询成功") + + orders = self.client.get_all_orders() + for order in orders: + order.gateway_name = self.gateway_name + self.on_order(order) + self.write_log("委托信息查询成功") + + trades = self.client.get_all_trades() + for trade in trades: + trade.gateway_name = self.gateway_name + self.on_trade(trade) + self.write_log("成交信息查询成功") + + def close(self): + """""" + self.client.stop() + + def client_callback(self, topic: str, event: Event): + """""" + if event is None: + print("none event", topic, event) + return + + data = event.data + + if hasattr(data, "gateway_name"): + data.gateway_name = self.gateway_name + + self.event_engine.put(event) diff --git a/vnpy/gateway/tiger/tiger_gateway.py b/vnpy/gateway/tiger/tiger_gateway.py index c7796782..cfbead5e 100644 --- a/vnpy/gateway/tiger/tiger_gateway.py +++ b/vnpy/gateway/tiger/tiger_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ Author: KeKe Please install tiger-api before use. diff --git a/vnpy/rpc/__init__.py b/vnpy/rpc/__init__.py index b903d2bb..52b2ec0d 100644 --- a/vnpy/rpc/__init__.py +++ b/vnpy/rpc/__init__.py @@ -1 +1,234 @@ -from .vnrpc import RpcServer, RpcClient, RemoteException \ No newline at end of file +import threading +import traceback +import signal +import zmq +from typing import Any, Callable + + +# Achieve Ctrl-c interrupt recv +signal.signal(signal.SIGINT, signal.SIG_DFL) + + +class RemoteException(Exception): + """ + RPC remote exception + """ + + def __init__(self, value): + """ + Constructor + """ + self.__value = value + + def __str__(self): + """ + Output error message + """ + return self.__value + + +class RpcServer: + """""" + + def __init__(self): + """ + Constructor + """ + # Save functions dict: key is fuction name, value is fuction object + self.__functions = {} + + # Zmq port related + self.__context = zmq.Context() + + # Reply socket (Request–reply pattern) + self.__socket_rep = self.__context.socket(zmq.REP) + + # Publish socket (Publish–subscribe pattern) + self.__socket_pub = self.__context.socket(zmq.PUB) + + # Worker thread related + self.__active = False # RpcServer status + self.__thread = None # RpcServer thread + + def is_active(self): + """""" + return self.__active + + def start(self, rep_address: str, pub_address: str): + """ + Start RpcServer + """ + if self.__active: + return + + # Bind socket address + self.__socket_rep.bind(rep_address) + self.__socket_pub.bind(pub_address) + + # Start RpcServer status + self.__active = True + + # Start RpcServer thread + self.__thread = threading.Thread(target=self.run) + self.__thread.start() + + def stop(self): + """ + Stop RpcServer + """ + if not self.__active: + return + + # Stop RpcServer status + self.__active = False + + # Wait for RpcServer thread to exit + if self.__thread.isAlive(): + self.__thread.join() + self.__thread = None + + # Unbind socket address + self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT) + self.__socket_rep.unbind(self.__socket_rep.LAST_ENDPOINT) + + def run(self): + """ + Run RpcServer functions + """ + while self.__active: + # Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds) + if not self.__socket_rep.poll(1000): + continue + + # Receive request data from Reply socket + req = self.__socket_rep.recv_pyobj() + + # Get function name and parameters + name, args, kwargs = req + + # Try to get and execute callable function object; capture exception information if it fails + try: + func = self.__functions[name] + r = func(*args, **kwargs) + rep = [True, r] + except Exception as e: # noqa + rep = [False, traceback.format_exc()] + + # send callable response by Reply socket + self.__socket_rep.send_pyobj(rep) + + def publish(self, topic: str, data: Any): + """ + Publish data + """ + self.__socket_pub.send_pyobj([topic, data]) + + def register(self, func: Callable): + """ + Register function + """ + self.__functions[func.__name__] = func + + +class RpcClient: + """""" + + def __init__(self): + """Constructor""" + # zmq port related + self.__context = zmq.Context() + + # Request socket (Request–reply pattern) + self.__socket_req = self.__context.socket(zmq.REQ) + + # Subscribe socket (Publish–subscribe pattern) + self.__socket_sub = self.__context.socket(zmq.SUB) + + # Worker thread relate, used to process data pushed from server + self.__active = False # RpcClient status + self.__thread = None # RpcClient thread + + def __getattr__(self, name: str): + """ + Realize remote call function + """ + # Perform remote call task + def dorpc(*args, **kwargs): + # Generate request + req = [name, args, kwargs] + + # Send request and wait for response + self.__socket_req.send_pyobj(req) + rep = self.__socket_req.recv_pyobj() + + # Return response if successed; Trigger exception if failed + if rep[0]: + return rep[1] + else: + raise RemoteException(rep[1]) + + return dorpc + + def start(self, req_address: str, sub_address: str): + """ + Start RpcClient + """ + if self.__active: + return + + # Connect zmq port + self.__socket_req.connect(req_address) + self.__socket_sub.connect(sub_address) + + # Start RpcClient status + self.__active = True + + # Start RpcClient thread + self.__thread = threading.Thread(target=self.run) + self.__thread.start() + + def stop(self): + """ + Stop RpcClient + """ + if not self.__active: + return + + # Stop RpcClient status + self.__active = False + + # Wait for RpcClient thread to exit + if self.__thread.isAlive(): + self.__thread.join() + self.__thread = None + + # Close socket + self.__socket_req.close() + self.__socket_sub.close() + + def run(self): + """ + Run RpcClient function + """ + while self.__active: + # Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds) + if not self.__socket_sub.poll(1000): + continue + + # Receive data from subscribe socket + topic, data = self.__socket_sub.recv_pyobj() + + # Process data by callable function + self.callback(topic, data) + + def callback(self, topic: str, data: Any): + """ + Callable function + """ + raise NotImplementedError + + def subscribe_topic(self, topic: str): + """ + Subscribe data + """ + self.__socket_sub.setsockopt_string(zmq.SUBSCRIBE, topic) diff --git a/vnpy/rpc/vnrpc.py b/vnpy/rpc/vnrpc.py deleted file mode 100644 index 1cdd835a..00000000 --- a/vnpy/rpc/vnrpc.py +++ /dev/null @@ -1,329 +0,0 @@ -import threading -import traceback -import signal - -import zmq -from msgpack import packb, unpackb -from json import dumps, loads - -import pickle -p_dumps = pickle.dumps -p_loads = pickle.loads - - -# Achieve Ctrl-c interrupt recv -signal.signal(signal.SIGINT, signal.SIG_DFL) - - -class RpcObject(object): - """ - Referred to serialization of packing and unpacking, we offer 3 tools: - 1) maspack: higher performance, but usually requires the installation of msgpack related tools; - 2) jason: Slightly lower performance but versatility is better, most programming languages have built-in libraries; - 3) cPickle: Lower performance and only can be used in Python, but it is very convenient to transfer Python objects directly. - - Therefore, it is recommended to use msgpack. - Use json, if you want to communicate with some languages without providing msgpack. - Use cPickle, when the data being transferred contains many custom Python objects. - """ - - def __init__(self): - """ - Constructor - Use msgpack as default serialization tool - """ - self.use_msgpack() - - def pack(self, data): - """""" - pass - - def unpack(self, data): - """""" - pass - - def __json_pack(self, data): - """ - Pack with json - """ - return dumps(data) - - def __json_unpack(self, data): - """ - Unpack with json - """ - return loads(data) - - def __msgpack_pack(self, data): - """ - Pack with msgpack - """ - return packb(data) - - def __msgpack_unpack(self, data): - """ - Unpack with msgpack - """ - return unpackb(data) - - def __pickle_pack(self, data): - """ - Pack with cPickle - """ - return p_dumps(data) - - def __pickle_unpack(self, data): - """ - Unpack with cPickle - """ - return p_loads(data) - - def use_json(self): - """ - Use json as serialization tool - """ - self.pack = self.__json_pack - self.unpack = self.__json_unpack - - def use_msgpack(self): - """ - Use msgpack as serialization tool - """ - self.pack = self.__msgpack_pack - self.unpack = self.__msgpack_unpack - - def use_pickle(self): - """ - Use cPickle as serialization tool - """ - self.pack = self.__pickle_pack - self.unpack = self.__pickle_unpack - - -class RpcServer(RpcObject): - """""" - - def __init__(self, rep_address, pub_address): - """ - Constructor - """ - super(RpcServer, self).__init__() - - # Save functions dict: key is fuction name, value is fuction object - self.__functions = {} - - # Zmq port related - self.__context = zmq.Context() - - self.__socket_rep = self.__context.socket( - zmq.REP) # Reply socket (Request–reply pattern) - self.__socket_rep.bind(rep_address) - - # Publish socket (Publish–subscribe pattern) - self.__socket_pub = self.__context.socket(zmq.PUB) - self.__socket_pub.bind(pub_address) - - # Woker thread related - self.__active = False # RpcServer status - self.__thread = threading.Thread(target=self.run) # RpcServer thread - - def start(self): - """ - Start RpcServer - """ - # Start RpcServer status - self.__active = True - - # Start RpcServer thread - if not self.__thread.isAlive(): - self.__thread.start() - - def stop(self, join=False): - """ - Stop RpcServer - """ - # Stop RpcServer status - self.__active = False - - # Wait for RpcServer thread to exit - if join and self.__thread.isAlive(): - self.__thread.join() - - def run(self): - """ - Run RpcServer functions - """ - while self.__active: - # Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds) - if not self.__socket_rep.poll(1000): - continue - - # Receive request data from Reply socket - reqb = self.__socket_rep.recv() - - # Unpack request by deserialization - req = self.unpack(reqb) - - # Get function name and parameters - name, args, kwargs = req - - # Try to get and execute callable function object; capture exception information if it fails - name = name.decode("UTF-8") - - try: - func = self.__functions[name] - r = func(*args, **kwargs) - rep = [True, r] - except Exception as e: # noqa - rep = [False, traceback.format_exc()] - - # Pack response by serialization - repb = self.pack(rep) - - # send callable response by Reply socket - self.__socket_rep.send(repb) - - def publish(self, topic, data): - """ - Publish data - """ - # Serialized data - topic = bytes(topic, "UTF-8") - datab = self.pack(data) - - # Send data by Publish socket - # topci must be ascii encoding - self.__socket_pub.send_multipart([topic, datab]) - - def register(self, func): - """ - Register function - """ - self.__functions[func.__name__] = func - - -class RpcClient(RpcObject): - """""" - - def __init__(self, req_address, sub_address): - """Constructor""" - super(RpcClient, self).__init__() - - # zmq port related - self.__req_address = req_address - self.__sub_address = sub_address - - self.__context = zmq.Context() - # Request socket (Request–reply pattern) - self.__socket_req = self.__context.socket(zmq.REQ) - # Subscribe socket (Publish–subscribe pattern) - self.__socket_sub = self.__context.socket(zmq.SUB) - - # Woker thread relate, used to process data pushed from server - self.__active = False # RpcClient status - self.__thread = threading.Thread( - target=self.run) # RpcClient thread - - def __getattr__(self, name): - """ - Realize remote call function - """ - # Perform remote call task - def dorpc(*args, **kwargs): - # Generate request - req = [name, args, kwargs] - - # Pack request by serialization - reqb = self.pack(req) - - # Send request and wait for response - self.__socket_req.send(reqb) - repb = self.__socket_req.recv() - - # Unpack response by deserialization - rep = self.unpack(repb) - - # Return response if successed; Trigger exception if failed - if rep[0]: - return rep[1] - else: - raise RemoteException(rep[1].decode("UTF-8")) - - return dorpc - - def start(self): - """ - Start RpcClient - """ - # Connect zmq port - self.__socket_req.connect(self.__req_address) - self.__socket_sub.connect(self.__sub_address) - - # Start RpcClient status - self.__active = True - - # Start RpcClient thread - if not self.__thread.isAlive(): - self.__thread.start() - - def stop(self): - """ - Stop RpcClient - """ - # Stop RpcClient status - self.__active = False - - # Wait for RpcClient thread to exit - if self.__thread.isAlive(): - self.__thread.join() - - def run(self): - """ - Run RpcClient function - """ - while self.__active: - # Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds) - if not self.__socket_sub.poll(1000): - continue - - # Receive data from subscribe socket - topic, datab = self.__socket_sub.recv_multipart() - - # Unpack data by deserialization - data = self.unpack(datab) - - # Process data by callable function - topic = topic.decode("UTF-8") - - self.callback(topic, data) - - def callback(self, topic, data): - """ - Callable function - """ - raise NotImplementedError - - def subscribeTopic(self, topic): - """ - Subscribe data - """ - topic = bytes(topic, "UTF-8") - self.__socket_sub.setsockopt(zmq.SUBSCRIBE, topic) - - -class RemoteException(Exception): - """ - RPC remote exception - """ - - def __init__(self, value): - """ - Constructor - """ - self.__value = value - - def __str__(self): - """ - Output error message - """ - return self.__value