[Mod] add RpcServiceApp

This commit is contained in:
vn.py 2019-07-02 11:08:14 +08:00
parent 01122f013d
commit 13a3865b8e
9 changed files with 245 additions and 11 deletions

View File

@ -31,6 +31,7 @@ from vnpy.app.cta_backtester import CtaBacktesterApp
# from vnpy.app.data_recorder import DataRecorderApp
# from vnpy.app.risk_manager import RiskManagerApp
from vnpy.app.script_trader import ScriptTraderApp
from vnpy.app.rpc_service import RpcServiceApp
def main():
@ -68,6 +69,7 @@ def main():
# main_engine.add_app(DataRecorderApp)
# main_engine.add_app(RiskManagerApp)
main_engine.add_app(ScriptTraderApp)
main_engine.add_app(RpcServiceApp)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()

View File

@ -20,9 +20,6 @@ class RiskManagerEngine(BaseEngine):
""""""
super().__init__(main_engine, event_engine, APP_NAME)
self.main_engine = main_engine
self.event_engine = event_engine
self.active = False
self.order_flow_count = 0

View File

@ -0,0 +1,14 @@
from pathlib import Path
from vnpy.trader.app import BaseApp
from .engine import RpcEngine, APP_NAME
class RpcServiceApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "RPC服务"
engine_class = RpcEngine
widget_name = "RpcManager"
icon_name = "rpc.ico"

View File

@ -0,0 +1,119 @@
""""""
import traceback
from vnpy.event import Event, EventEngine
from vnpy.rpc import RpcServer
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.utility import load_json, save_json
from vnpy.trader.object import LogData
APP_NAME = "RpcService"
EVENT_RPC_LOG = "eRpcLog"
class RpcEngine(BaseEngine):
""""""
setting_filename = "rpc_service_setting.json"
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine, event_engine, APP_NAME)
self.rep_address = "tcp://*:2014"
self.pub_address = "tcp://*:4102"
self.server = None
self.init_server()
self.load_setting()
self.register_event()
def init_server(self):
""""""
self.server = RpcServer()
self.server.register(self.main_engine.subscribe)
self.server.register(self.main_engine.send_order)
self.server.register(self.main_engine.send_orders)
self.server.register(self.main_engine.cancel_order)
self.server.register(self.main_engine.cancel_orders)
self.server.register(self.main_engine.query_history)
self.server.register(self.main_engine.get_tick)
self.server.register(self.main_engine.get_order)
self.server.register(self.main_engine.get_trade)
self.server.register(self.main_engine.get_position)
self.server.register(self.main_engine.get_account)
self.server.register(self.main_engine.get_contract)
self.server.register(self.main_engine.get_all_ticks)
self.server.register(self.main_engine.get_all_orders)
self.server.register(self.main_engine.get_all_trades)
self.server.register(self.main_engine.get_all_positions)
self.server.register(self.main_engine.get_all_accounts)
self.server.register(self.main_engine.get_all_contracts)
self.server.register(self.main_engine.get_all_active_orders)
def load_setting(self):
""""""
setting = load_json(self.setting_filename)
self.rep_address = setting.get("rep_address", self.rep_address)
self.pub_address = setting.get("pub_address", self.pub_address)
def save_setting(self):
""""""
setting = {
"rep_address": self.rep_address,
"pub_address": self.pub_address
}
save_json(self.setting_filename, setting)
def start(self, rep_address: str, pub_address: str):
""""""
if self.server.is_active():
self.write_log("服务运行中")
return False
self.rep_address = rep_address
self.pub_address = pub_address
try:
self.server.start(rep_address, pub_address)
except: # noqa
msg = traceback.format_exc()
self.write_log(f"服务启动失败:{msg}")
return False
self.save_setting()
self.write_log("服务启动成功")
return True
def stop(self):
""""""
if not self.server.is_active():
self.write_log("服务未启动")
return False
self.server.stop()
self.write_log("服务已停止")
return True
def close(self):
""""""
self.stop()
def register_event(self):
""""""
self.event_engine.register_general(self.process_event)
def process_event(self, event: Event):
""""""
if self.server.is_active():
self.server.publish("", event)
def write_log(self, msg: str) -> None:
""""""
log = LogData(msg=msg, gateway_name=APP_NAME)
event = Event(EVENT_RPC_LOG, log)
self.event_engine.put(event)

View File

@ -0,0 +1 @@
from .widget import RpcManager

Binary file not shown.

After

Width:  |  Height:  |  Size: 65 KiB

View File

@ -0,0 +1,93 @@
from vnpy.event import EventEngine, Event
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import QtWidgets, QtCore
from ..engine import APP_NAME, EVENT_RPC_LOG
class RpcManager(QtWidgets.QWidget):
""""""
signal_log = QtCore.pyqtSignal(Event)
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__()
self.main_engine = main_engine
self.event_engine = event_engine
self.rpc_engine = main_engine.get_engine(APP_NAME)
self.init_ui()
self.register_event()
def init_ui(self):
""""""
self.setWindowTitle("RPC服务")
self.setFixedWidth(900)
self.setFixedHeight(500)
self.start_button = QtWidgets.QPushButton("启动")
self.start_button.clicked.connect(self.start_server)
self.stop_button = QtWidgets.QPushButton("停止")
self.stop_button.clicked.connect(self.stop_server)
self.stop_button.setEnabled(False)
for button in [self.start_button, self.stop_button]:
hint = button.sizeHint()
button.setFixedHeight(hint.height() * 2)
button.setFixedWidth(hint.width() * 4)
self.rep_line = QtWidgets.QLineEdit(self.rpc_engine.rep_address)
self.rep_line.setFixedWidth(300)
self.pub_line = QtWidgets.QLineEdit(self.rpc_engine.pub_address)
self.pub_line.setFixedWidth(300)
self.log_monitor = QtWidgets.QTextEdit()
self.log_monitor.setReadOnly(True)
form = QtWidgets.QFormLayout()
form.addRow("请求响应地址", self.rep_line)
form.addRow("事件广播地址", self.pub_line)
hbox = QtWidgets.QHBoxLayout()
hbox.addLayout(form)
hbox.addWidget(self.start_button)
hbox.addWidget(self.stop_button)
hbox.addStretch()
vbox = QtWidgets.QVBoxLayout()
vbox.addLayout(hbox)
vbox.addWidget(self.log_monitor)
self.setLayout(vbox)
def register_event(self):
""""""
self.signal_log.connect(self.process_log_event)
self.event_engine.register(EVENT_RPC_LOG, self.signal_log.emit)
def process_log_event(self, event: Event):
""""""
log = event.data
msg = f"{log.time}\t{log.msg}"
self.log_monitor.append(msg)
def start_server(self):
""""""
rep_address = self.rep_line.text()
pub_address = self.pub_line.text()
result = self.rpc_engine.start(rep_address, pub_address)
if result:
self.start_button.setEnabled(False)
self.stop_button.setEnabled(True)
def stop_server(self):
""""""
result = self.rpc_engine.stop()
if result:
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)

View File

@ -40,9 +40,6 @@ class ScriptEngine(BaseEngine):
""""""
super().__init__(main_engine, event_engine, APP_NAME)
self.main_engine = main_engine
self.event_engine = event_engine
self.get_tick = main_engine.get_tick
self.get_order = main_engine.get_order
self.get_trade = main_engine.get_trade

View File

@ -48,12 +48,19 @@ class RpcServer:
# Worker thread related
self.__active = False # RpcServer status
self.__thread = threading.Thread(target=self.run) # RpcServer thread
self.__thread = None # RpcServer thread
def is_active(self):
""""""
return self.__active
def start(self, rep_address: str, pub_address: str):
"""
Start RpcServer
"""
if self.__active:
return
# Bind socket address
self.__socket_rep.bind(rep_address)
self.__socket_pub.bind(pub_address)
@ -62,19 +69,23 @@ class RpcServer:
self.__active = True
# Start RpcServer thread
if not self.__thread.isAlive():
self.__thread.start()
self.__thread = threading.Thread(target=self.run)
self.__thread.start()
def stop(self, join: bool = False):
def stop(self):
"""
Stop RpcServer
"""
if not self.__active:
return
# Stop RpcServer status
self.__active = False
# Wait for RpcServer thread to exit
if join and self.__thread.isAlive():
if self.__thread.isAlive():
self.__thread.join()
self.__thread = None
# Unbind socket address
self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT)