diff --git a/examples/vn_trader/run.py b/examples/vn_trader/run.py index 187b5abf..e8fb8a62 100644 --- a/examples/vn_trader/run.py +++ b/examples/vn_trader/run.py @@ -31,6 +31,7 @@ from vnpy.app.cta_backtester import CtaBacktesterApp # from vnpy.app.data_recorder import DataRecorderApp # from vnpy.app.risk_manager import RiskManagerApp from vnpy.app.script_trader import ScriptTraderApp +from vnpy.app.rpc_service import RpcServiceApp def main(): @@ -68,6 +69,7 @@ def main(): # main_engine.add_app(DataRecorderApp) # main_engine.add_app(RiskManagerApp) main_engine.add_app(ScriptTraderApp) + main_engine.add_app(RpcServiceApp) main_window = MainWindow(main_engine, event_engine) main_window.showMaximized() diff --git a/vnpy/app/risk_manager/engine.py b/vnpy/app/risk_manager/engine.py index 35ab4a74..c6193bc5 100644 --- a/vnpy/app/risk_manager/engine.py +++ b/vnpy/app/risk_manager/engine.py @@ -20,9 +20,6 @@ class RiskManagerEngine(BaseEngine): """""" super().__init__(main_engine, event_engine, APP_NAME) - self.main_engine = main_engine - self.event_engine = event_engine - self.active = False self.order_flow_count = 0 diff --git a/vnpy/app/rpc_service/__init__.py b/vnpy/app/rpc_service/__init__.py new file mode 100644 index 00000000..058bd067 --- /dev/null +++ b/vnpy/app/rpc_service/__init__.py @@ -0,0 +1,14 @@ +from pathlib import Path +from vnpy.trader.app import BaseApp +from .engine import RpcEngine, APP_NAME + + +class RpcServiceApp(BaseApp): + """""" + app_name = APP_NAME + app_module = __module__ + app_path = Path(__file__).parent + display_name = "RPC服务" + engine_class = RpcEngine + widget_name = "RpcManager" + icon_name = "rpc.ico" diff --git a/vnpy/app/rpc_service/engine.py b/vnpy/app/rpc_service/engine.py new file mode 100644 index 00000000..7784412a --- /dev/null +++ b/vnpy/app/rpc_service/engine.py @@ -0,0 +1,119 @@ +"""""" + +import traceback + +from vnpy.event import Event, EventEngine +from vnpy.rpc import RpcServer +from vnpy.trader.engine import BaseEngine, MainEngine +from vnpy.trader.utility import load_json, save_json +from vnpy.trader.object import LogData + +APP_NAME = "RpcService" + +EVENT_RPC_LOG = "eRpcLog" + + +class RpcEngine(BaseEngine): + """""" + setting_filename = "rpc_service_setting.json" + + def __init__(self, main_engine: MainEngine, event_engine: EventEngine): + """""" + super().__init__(main_engine, event_engine, APP_NAME) + + self.rep_address = "tcp://*:2014" + self.pub_address = "tcp://*:4102" + + self.server = None + + self.init_server() + self.load_setting() + self.register_event() + + def init_server(self): + """""" + self.server = RpcServer() + + self.server.register(self.main_engine.subscribe) + self.server.register(self.main_engine.send_order) + self.server.register(self.main_engine.send_orders) + self.server.register(self.main_engine.cancel_order) + self.server.register(self.main_engine.cancel_orders) + self.server.register(self.main_engine.query_history) + + self.server.register(self.main_engine.get_tick) + self.server.register(self.main_engine.get_order) + self.server.register(self.main_engine.get_trade) + self.server.register(self.main_engine.get_position) + self.server.register(self.main_engine.get_account) + self.server.register(self.main_engine.get_contract) + self.server.register(self.main_engine.get_all_ticks) + self.server.register(self.main_engine.get_all_orders) + self.server.register(self.main_engine.get_all_trades) + self.server.register(self.main_engine.get_all_positions) + self.server.register(self.main_engine.get_all_accounts) + self.server.register(self.main_engine.get_all_contracts) + self.server.register(self.main_engine.get_all_active_orders) + + def load_setting(self): + """""" + setting = load_json(self.setting_filename) + self.rep_address = setting.get("rep_address", self.rep_address) + self.pub_address = setting.get("pub_address", self.pub_address) + + def save_setting(self): + """""" + setting = { + "rep_address": self.rep_address, + "pub_address": self.pub_address + } + save_json(self.setting_filename, setting) + + def start(self, rep_address: str, pub_address: str): + """""" + if self.server.is_active(): + self.write_log("服务运行中") + return False + + self.rep_address = rep_address + self.pub_address = pub_address + + try: + self.server.start(rep_address, pub_address) + except: # noqa + msg = traceback.format_exc() + self.write_log(f"服务启动失败:{msg}") + return False + + self.save_setting() + self.write_log("服务启动成功") + return True + + def stop(self): + """""" + if not self.server.is_active(): + self.write_log("服务未启动") + return False + + self.server.stop() + self.write_log("服务已停止") + return True + + def close(self): + """""" + self.stop() + + def register_event(self): + """""" + self.event_engine.register_general(self.process_event) + + def process_event(self, event: Event): + """""" + if self.server.is_active(): + self.server.publish("", event) + + def write_log(self, msg: str) -> None: + """""" + log = LogData(msg=msg, gateway_name=APP_NAME) + event = Event(EVENT_RPC_LOG, log) + self.event_engine.put(event) diff --git a/vnpy/app/rpc_service/ui/__init__.py b/vnpy/app/rpc_service/ui/__init__.py new file mode 100644 index 00000000..b9aff582 --- /dev/null +++ b/vnpy/app/rpc_service/ui/__init__.py @@ -0,0 +1 @@ +from .widget import RpcManager diff --git a/vnpy/app/rpc_service/ui/rpc.ico b/vnpy/app/rpc_service/ui/rpc.ico new file mode 100644 index 00000000..5e91b2db Binary files /dev/null and b/vnpy/app/rpc_service/ui/rpc.ico differ diff --git a/vnpy/app/rpc_service/ui/widget.py b/vnpy/app/rpc_service/ui/widget.py new file mode 100644 index 00000000..f811a22c --- /dev/null +++ b/vnpy/app/rpc_service/ui/widget.py @@ -0,0 +1,93 @@ +from vnpy.event import EventEngine, Event +from vnpy.trader.engine import MainEngine +from vnpy.trader.ui import QtWidgets, QtCore +from ..engine import APP_NAME, EVENT_RPC_LOG + + +class RpcManager(QtWidgets.QWidget): + """""" + signal_log = QtCore.pyqtSignal(Event) + + def __init__(self, main_engine: MainEngine, event_engine: EventEngine): + """""" + super().__init__() + + self.main_engine = main_engine + self.event_engine = event_engine + + self.rpc_engine = main_engine.get_engine(APP_NAME) + + self.init_ui() + self.register_event() + + def init_ui(self): + """""" + self.setWindowTitle("RPC服务") + self.setFixedWidth(900) + self.setFixedHeight(500) + + self.start_button = QtWidgets.QPushButton("启动") + self.start_button.clicked.connect(self.start_server) + + self.stop_button = QtWidgets.QPushButton("停止") + self.stop_button.clicked.connect(self.stop_server) + self.stop_button.setEnabled(False) + + for button in [self.start_button, self.stop_button]: + hint = button.sizeHint() + button.setFixedHeight(hint.height() * 2) + button.setFixedWidth(hint.width() * 4) + + self.rep_line = QtWidgets.QLineEdit(self.rpc_engine.rep_address) + self.rep_line.setFixedWidth(300) + + self.pub_line = QtWidgets.QLineEdit(self.rpc_engine.pub_address) + self.pub_line.setFixedWidth(300) + + self.log_monitor = QtWidgets.QTextEdit() + self.log_monitor.setReadOnly(True) + + form = QtWidgets.QFormLayout() + form.addRow("请求响应地址", self.rep_line) + form.addRow("事件广播地址", self.pub_line) + + hbox = QtWidgets.QHBoxLayout() + hbox.addLayout(form) + hbox.addWidget(self.start_button) + hbox.addWidget(self.stop_button) + hbox.addStretch() + + vbox = QtWidgets.QVBoxLayout() + vbox.addLayout(hbox) + vbox.addWidget(self.log_monitor) + + self.setLayout(vbox) + + def register_event(self): + """""" + self.signal_log.connect(self.process_log_event) + + self.event_engine.register(EVENT_RPC_LOG, self.signal_log.emit) + + def process_log_event(self, event: Event): + """""" + log = event.data + msg = f"{log.time}\t{log.msg}" + self.log_monitor.append(msg) + + def start_server(self): + """""" + rep_address = self.rep_line.text() + pub_address = self.pub_line.text() + + result = self.rpc_engine.start(rep_address, pub_address) + if result: + self.start_button.setEnabled(False) + self.stop_button.setEnabled(True) + + def stop_server(self): + """""" + result = self.rpc_engine.stop() + if result: + self.start_button.setEnabled(True) + self.stop_button.setEnabled(False) diff --git a/vnpy/app/script_trader/engine.py b/vnpy/app/script_trader/engine.py index 45642c24..ea08d830 100644 --- a/vnpy/app/script_trader/engine.py +++ b/vnpy/app/script_trader/engine.py @@ -40,9 +40,6 @@ class ScriptEngine(BaseEngine): """""" super().__init__(main_engine, event_engine, APP_NAME) - self.main_engine = main_engine - self.event_engine = event_engine - self.get_tick = main_engine.get_tick self.get_order = main_engine.get_order self.get_trade = main_engine.get_trade diff --git a/vnpy/rpc/__init__.py b/vnpy/rpc/__init__.py index 13fdd4fd..3c4aa58c 100644 --- a/vnpy/rpc/__init__.py +++ b/vnpy/rpc/__init__.py @@ -48,12 +48,19 @@ class RpcServer: # Worker thread related self.__active = False # RpcServer status - self.__thread = threading.Thread(target=self.run) # RpcServer thread + self.__thread = None # RpcServer thread + + def is_active(self): + """""" + return self.__active def start(self, rep_address: str, pub_address: str): """ Start RpcServer """ + if self.__active: + return + # Bind socket address self.__socket_rep.bind(rep_address) self.__socket_pub.bind(pub_address) @@ -62,19 +69,23 @@ class RpcServer: self.__active = True # Start RpcServer thread - if not self.__thread.isAlive(): - self.__thread.start() + self.__thread = threading.Thread(target=self.run) + self.__thread.start() - def stop(self, join: bool = False): + def stop(self): """ Stop RpcServer """ + if not self.__active: + return + # Stop RpcServer status self.__active = False # Wait for RpcServer thread to exit - if join and self.__thread.isAlive(): + if self.__thread.isAlive(): self.__thread.join() + self.__thread = None # Unbind socket address self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT)