From 13a3865b8e64c4656b497b427dde6200f8067e35 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 2 Jul 2019 11:08:14 +0800 Subject: [PATCH] [Mod] add RpcServiceApp --- examples/vn_trader/run.py | 2 + vnpy/app/risk_manager/engine.py | 3 - vnpy/app/rpc_service/__init__.py | 14 ++++ vnpy/app/rpc_service/engine.py | 119 ++++++++++++++++++++++++++++ vnpy/app/rpc_service/ui/__init__.py | 1 + vnpy/app/rpc_service/ui/rpc.ico | Bin 0 -> 66622 bytes vnpy/app/rpc_service/ui/widget.py | 93 ++++++++++++++++++++++ vnpy/app/script_trader/engine.py | 3 - vnpy/rpc/__init__.py | 21 +++-- 9 files changed, 245 insertions(+), 11 deletions(-) create mode 100644 vnpy/app/rpc_service/__init__.py create mode 100644 vnpy/app/rpc_service/engine.py create mode 100644 vnpy/app/rpc_service/ui/__init__.py create mode 100644 vnpy/app/rpc_service/ui/rpc.ico create mode 100644 vnpy/app/rpc_service/ui/widget.py diff --git a/examples/vn_trader/run.py b/examples/vn_trader/run.py index 187b5abf..e8fb8a62 100644 --- a/examples/vn_trader/run.py +++ b/examples/vn_trader/run.py @@ -31,6 +31,7 @@ from vnpy.app.cta_backtester import CtaBacktesterApp # from vnpy.app.data_recorder import DataRecorderApp # from vnpy.app.risk_manager import RiskManagerApp from vnpy.app.script_trader import ScriptTraderApp +from vnpy.app.rpc_service import RpcServiceApp def main(): @@ -68,6 +69,7 @@ def main(): # main_engine.add_app(DataRecorderApp) # main_engine.add_app(RiskManagerApp) main_engine.add_app(ScriptTraderApp) + main_engine.add_app(RpcServiceApp) main_window = MainWindow(main_engine, event_engine) main_window.showMaximized() diff --git a/vnpy/app/risk_manager/engine.py b/vnpy/app/risk_manager/engine.py index 35ab4a74..c6193bc5 100644 --- a/vnpy/app/risk_manager/engine.py +++ b/vnpy/app/risk_manager/engine.py @@ -20,9 +20,6 @@ class RiskManagerEngine(BaseEngine): """""" super().__init__(main_engine, event_engine, APP_NAME) - self.main_engine = main_engine - self.event_engine = event_engine - self.active = False self.order_flow_count = 0 diff --git a/vnpy/app/rpc_service/__init__.py b/vnpy/app/rpc_service/__init__.py new file mode 100644 index 00000000..058bd067 --- /dev/null +++ b/vnpy/app/rpc_service/__init__.py @@ -0,0 +1,14 @@ +from pathlib import Path +from vnpy.trader.app import BaseApp +from .engine import RpcEngine, APP_NAME + + +class RpcServiceApp(BaseApp): + """""" + app_name = APP_NAME + app_module = __module__ + app_path = Path(__file__).parent + display_name = "RPC服务" + engine_class = RpcEngine + widget_name = "RpcManager" + icon_name = "rpc.ico" diff --git a/vnpy/app/rpc_service/engine.py b/vnpy/app/rpc_service/engine.py new file mode 100644 index 00000000..7784412a --- /dev/null +++ b/vnpy/app/rpc_service/engine.py @@ -0,0 +1,119 @@ +"""""" + +import traceback + +from vnpy.event import Event, EventEngine +from vnpy.rpc import RpcServer +from vnpy.trader.engine import BaseEngine, MainEngine +from vnpy.trader.utility import load_json, save_json +from vnpy.trader.object import LogData + +APP_NAME = "RpcService" + +EVENT_RPC_LOG = "eRpcLog" + + +class RpcEngine(BaseEngine): + """""" + setting_filename = "rpc_service_setting.json" + + def __init__(self, main_engine: MainEngine, event_engine: EventEngine): + """""" + super().__init__(main_engine, event_engine, APP_NAME) + + self.rep_address = "tcp://*:2014" + self.pub_address = "tcp://*:4102" + + self.server = None + + self.init_server() + self.load_setting() + self.register_event() + + def init_server(self): + """""" + self.server = RpcServer() + + self.server.register(self.main_engine.subscribe) + self.server.register(self.main_engine.send_order) + self.server.register(self.main_engine.send_orders) + self.server.register(self.main_engine.cancel_order) + self.server.register(self.main_engine.cancel_orders) + self.server.register(self.main_engine.query_history) + + self.server.register(self.main_engine.get_tick) + self.server.register(self.main_engine.get_order) + self.server.register(self.main_engine.get_trade) + self.server.register(self.main_engine.get_position) + self.server.register(self.main_engine.get_account) + self.server.register(self.main_engine.get_contract) + self.server.register(self.main_engine.get_all_ticks) + self.server.register(self.main_engine.get_all_orders) + self.server.register(self.main_engine.get_all_trades) + self.server.register(self.main_engine.get_all_positions) + self.server.register(self.main_engine.get_all_accounts) + self.server.register(self.main_engine.get_all_contracts) + self.server.register(self.main_engine.get_all_active_orders) + + def load_setting(self): + """""" + setting = load_json(self.setting_filename) + self.rep_address = setting.get("rep_address", self.rep_address) + self.pub_address = setting.get("pub_address", self.pub_address) + + def save_setting(self): + """""" + setting = { + "rep_address": self.rep_address, + "pub_address": self.pub_address + } + save_json(self.setting_filename, setting) + + def start(self, rep_address: str, pub_address: str): + """""" + if self.server.is_active(): + self.write_log("服务运行中") + return False + + self.rep_address = rep_address + self.pub_address = pub_address + + try: + self.server.start(rep_address, pub_address) + except: # noqa + msg = traceback.format_exc() + self.write_log(f"服务启动失败:{msg}") + return False + + self.save_setting() + self.write_log("服务启动成功") + return True + + def stop(self): + """""" + if not self.server.is_active(): + self.write_log("服务未启动") + return False + + self.server.stop() + self.write_log("服务已停止") + return True + + def close(self): + """""" + self.stop() + + def register_event(self): + """""" + self.event_engine.register_general(self.process_event) + + def process_event(self, event: Event): + """""" + if self.server.is_active(): + self.server.publish("", event) + + def write_log(self, msg: str) -> None: + """""" + log = LogData(msg=msg, gateway_name=APP_NAME) + event = Event(EVENT_RPC_LOG, log) + self.event_engine.put(event) diff --git a/vnpy/app/rpc_service/ui/__init__.py b/vnpy/app/rpc_service/ui/__init__.py new file mode 100644 index 00000000..b9aff582 --- /dev/null +++ b/vnpy/app/rpc_service/ui/__init__.py @@ -0,0 +1 @@ +from .widget import RpcManager diff --git a/vnpy/app/rpc_service/ui/rpc.ico b/vnpy/app/rpc_service/ui/rpc.ico new file mode 100644 index 0000000000000000000000000000000000000000..5e91b2dbb0393e7a88cab40368e3dfed12907be2 GIT binary patch literal 66622 zcmeHPYj73S0bUdY{?ryaiq#ICX%&7r{^!^KqVg z&Ue1=oU?m(HyTDO{5Ng7fq#ExbZy2-SDwRjTW7#`YS9>uYE1MZT~gZZvJYS|Py^I90>^RABY@qe-x2A12lP=4u)lNV z$#ZyahYa{mEgHj-W1;o7Uk85>xwNQvx2l^U+s(Cxj4f3JVJ-uV24jC~VlU;U(m^b=H${-gDV)`W=xRTcMALzE7Px)o$z7 zt-l7pZxlzyna460&$ei%728VJK@9+E;nuP(wR3P3)Rr2%>(YyKa#UkyAB5y=hebt2 zMt*+&L_jz&v$(i8W__NYu0wHj^W3s!i?MFqx=X>Y%)zmYc`S2|$=OD_csl!2arR5i zz;C95BQ>2!ja5ArDxT@$n5bRwis`$|Pjq@l$|aX5-;vL}jBrTQiP8&Ul<#5T=p2LO z8@1lmIGlBZic2oq_c#XB*49StCu(1b7w*}!$H>den}9u2C?NgfzJ2@7xh>qk#$gtj z9gj=A{POa0W5tRUoxqQKyW&~KJeI{f&z)aWQ)48{k@ei?CrVQ@YUki6s4X>4bgr|U z*8p-&=5X;;&%EwLdjq|I_WSSMyZ6$1aNZq?%m7L-T<%jWLZCizXf<7W{&Of z;lrMERXrJAjF9)Wo*c=gva)haX=!P>J7zZXf@`r+6(#(qFTp45&S zN-gWCEj3PbF3RcEmGIoWcSbxZ?^#cx1HwN82Z2+-=_F8({>~_%eti<>sCJK_?i;{m zfXN}ArfP21)zz_HKcEz-15{g{q2i~)WY3d*u&;hxM+iOFJL%pB)TE20Mf-JUc2m*eS2yu%lR9P zV~W>SI`iovy^PY(oxwW5<-mR*ot)yeU5$Rb18!<|8tc@MYkgio9LfDzrVmgDhyZXFeP#jfDQTx!2DAeTfFL-sPxeg? zHZOOCWdO&*F$KYs9@0xUwW#!ovU~&R1N;{_69DQ@logCXzfs&&2FrkUz&bzxndZqp z**7^TT2Ud(@>@Oc0fK51V{GC zzRAJn<&Lln;8-}OAb8S4dg-PXl@=Sb!8IFov+DqG%{b1YPiU1L#Z6_f41ng@1warS z*(duZ2SqC?WEsG*a7;n)q=)n}ikmxwb!1)bySG=`cW-^pb)M_>?w_7|z5A!7n#X#* z`>Uy5@BZnj*SmjO>h)k&s^?LVTOTFIx)l{!{fA!@2)c5oLYO1gA z{^_aLyMJ2h>%0G2>g&6|n(FoLub$2I`raG&EqI3)$U7un--iP3S4MW4Wk6dX4^aDV zrbG5k4mK}$gk``mfMd!uKlVot>7|=mRGM{w%Yg$x5L|1}Z+9Syo62CBD0+%K=K(=* zWS{dy!IR{mXhnrA19|`)QxH7qA-#;^=FVUp;Cx^X5Cqr7=(j!Kre>$b#vD^`)ZzT7 z1TxK&eVTJq6ngPAwd7cq1i+D+%n?PVm$r8CBB-l0?SS}P;1F;cs82JFqWwj{F9C3f z|BVR7VsTtMh2a2T2f%xnbaP}M>}!DdVy?xZXi7XOt#V6Gj_ECcV@x+kdPUFZ9X&L8 z#3d;U#Ky-P_!efj#(7r({ei)01iiZ&Ft3*g4YBzNPpW1eIVQv}0d4{A0@B5kZL^O~ zfUS`iNwI<7S=n{@Z4SrCv8Ib7+oe}mVQtVmtN+c9d(pJZ#Hxp!d@~;}c+}?N6*0%9 z*1j5J2174ys!Xgm_oU2gsC3vfT~X1YRzNGD6-Y^e*|TRGvu4dQ7%DJ5XU-gB=FFKX zwe8o*2p0qEB^YO3K&&FWB@YQ|M-8bZ zHKn$Zl|zssC*UDKI51o0hk&7>^t=q5${idTXI_{%k}I`yYDsNF!O=;AoPdcgt}|u6 zvua>*$>manWe%QVa+V>RQkUG+P?R$z^s?(*Wdc7+_ zdy`@H23@68IKSnljEjmW6>MwW4O!PI08Bw*q0aU9FXiz^&B5 zm%M9$J;3L{u7E)fdx8G}N>j#}7Z!fhikc~Y47UN)HmttW|197H;N-32jn{)(Qd4tn zh0S=y3t<}|2RH_3zf;qI8d6JY8cN?e4;43r=K+0yr2zNP3ZPPkj0cH>=)WFNbCPl9 z1<6VEtu&%$OMzPeY8hJ3)&8S6%CI%UPQY(~D?$MBxB>VMP`)$HyfAU3X4KAJOG@7` zhcjl(u=zTo^j?ie*s$P2UZ)AdT=Cx2&n*Xr*ZV)&R=m=aS z!8r4pzW^+0*^1Cg(?*eoKx;F;!y&HVL4!(B}2;I@~x-|Bq$ekw%1bMB4TzDLd_cx4Q? z*86=7*y1 z{CC7{^Ol(C`^rB=|3x3!-u8dfF$?XUgQJ~wK9SpMfe zDDUfAxZU>flkt7{>Jo9wtk*=3sn3Y(v!4|`|M0xH>zy^``j55l6ZwAU-`^ECJTb#; zk8R%g$62D^>+f0KVQt^BU^)7DL0peAmftk(CBeC1dWv$+nd@_&v&yGDBYHnOH)3AI z@Lv0FJ8ngi^Zhf?XXdM-`=d{b9#f{9-ly#QV=tO*^+209J~1=WcdXx@InUeWy`GvA z?fcYc?egBw{6+ij@-pJke=l*7w-#RB(uXnex z3DU{JlRh@1OKxDL~)*?uhW6-lqiV`$?~RzYLe<_JZfy_l8-oeQ(gBCB?Mw zEy?YMv7VFfJFCP51&?ci-scqkLR}+dE3Rft>4DI2T0^Q-|G%} zuSneUZlQ@G?-vQ?scSx-`=ETIU4HKeo2+NV^0(x$kGI-oY~$V!KC#bLZMZ^xeRoa3 zqe`i?cJp2N_S`!{{*lkY3c?{}hnUo3sc zGoHKNS?ltBv-ORZeEu_d**fjJ%hyE7WY)XsJND(s_3oqZdc7yI$@lVPe4ntRLX0ZJ z^_}nA*KU{B_wvB5@56DOh`j#m>-)J*jO2Xha~#LLJn!jaT-Q}zqU*cs+M8&P_M=>1 z-(3?UdEb+NhNpdZi5$YD_C18AWPprKx-LcsiTU-_~_LiK;t`bIeDNrSH zvnz#28C60|tvGvX;_kyI)%|_&VO8#fhf0kJ+pAi#)e&R2eXsP%cg&rs-=Bf+KZMA4 zd{c5z`|dPe`|ebu<;1n`Eyru;NR#%x;;hJa!sA{*@AFQD!x>M?clgckEyir#C&qqS zCdO|0T1@`tMBID8M84yB9=_Y%EAZW|iP1%S`7NgX?05XO17Z}u?N#OGJ72sLRQHGD zOTIJT7mO;j&cFYYuk12a#}(@9yKDL->pOmvS?|dCeFWamO!}%u;M|zrC-fb@X65a$ z|K4uox}EU-fbAW9<^Nj)-}akzMi%T6d?%>9zEO)&3 z<-LHjoYw_)Zk$m29>e#X&3Imy=(!EAod{%J=gO26Lx-&zXj9JX_fWTbFTMs&Di&Gm|w;N+IMBUkyB}|b^6{3DD=a7 ztoPF)dOlwW&pT<~ZOiRQwzbdG`!2u|paQ4?Y8wOiegwWBhSs$Kp$7d{084-_0M>e? zJzgEL6v*Mb*aq(}fY(<5*8tZx2JrnF_mKr7&*0wVILk4%ch>1v~u5GM`e zX>rytdew_er$u2ea4zqRf2uDe-^etW8Rvr;SW>kos~N3ll$l0PGtElIG@WiLdN}-Z z`-^GdJYvk>$ob)(FBwR4o>_-AUscL58kH_@IKMOJKWDT&dNFNIZY$$yrp?1#FNg(| zmNe}CBhuv-+s=w#p_RTcUV6G!-d2_Cv?++zL(cD}HXjO2`>-2G{_b?SRo+>ZBUFc4 I&q=5M53a@(ng9R* literal 0 HcmV?d00001 diff --git a/vnpy/app/rpc_service/ui/widget.py b/vnpy/app/rpc_service/ui/widget.py new file mode 100644 index 00000000..f811a22c --- /dev/null +++ b/vnpy/app/rpc_service/ui/widget.py @@ -0,0 +1,93 @@ +from vnpy.event import EventEngine, Event +from vnpy.trader.engine import MainEngine +from vnpy.trader.ui import QtWidgets, QtCore +from ..engine import APP_NAME, EVENT_RPC_LOG + + +class RpcManager(QtWidgets.QWidget): + """""" + signal_log = QtCore.pyqtSignal(Event) + + def __init__(self, main_engine: MainEngine, event_engine: EventEngine): + """""" + super().__init__() + + self.main_engine = main_engine + self.event_engine = event_engine + + self.rpc_engine = main_engine.get_engine(APP_NAME) + + self.init_ui() + self.register_event() + + def init_ui(self): + """""" + self.setWindowTitle("RPC服务") + self.setFixedWidth(900) + self.setFixedHeight(500) + + self.start_button = QtWidgets.QPushButton("启动") + self.start_button.clicked.connect(self.start_server) + + self.stop_button = QtWidgets.QPushButton("停止") + self.stop_button.clicked.connect(self.stop_server) + self.stop_button.setEnabled(False) + + for button in [self.start_button, self.stop_button]: + hint = button.sizeHint() + button.setFixedHeight(hint.height() * 2) + button.setFixedWidth(hint.width() * 4) + + self.rep_line = QtWidgets.QLineEdit(self.rpc_engine.rep_address) + self.rep_line.setFixedWidth(300) + + self.pub_line = QtWidgets.QLineEdit(self.rpc_engine.pub_address) + self.pub_line.setFixedWidth(300) + + self.log_monitor = QtWidgets.QTextEdit() + self.log_monitor.setReadOnly(True) + + form = QtWidgets.QFormLayout() + form.addRow("请求响应地址", self.rep_line) + form.addRow("事件广播地址", self.pub_line) + + hbox = QtWidgets.QHBoxLayout() + hbox.addLayout(form) + hbox.addWidget(self.start_button) + hbox.addWidget(self.stop_button) + hbox.addStretch() + + vbox = QtWidgets.QVBoxLayout() + vbox.addLayout(hbox) + vbox.addWidget(self.log_monitor) + + self.setLayout(vbox) + + def register_event(self): + """""" + self.signal_log.connect(self.process_log_event) + + self.event_engine.register(EVENT_RPC_LOG, self.signal_log.emit) + + def process_log_event(self, event: Event): + """""" + log = event.data + msg = f"{log.time}\t{log.msg}" + self.log_monitor.append(msg) + + def start_server(self): + """""" + rep_address = self.rep_line.text() + pub_address = self.pub_line.text() + + result = self.rpc_engine.start(rep_address, pub_address) + if result: + self.start_button.setEnabled(False) + self.stop_button.setEnabled(True) + + def stop_server(self): + """""" + result = self.rpc_engine.stop() + if result: + self.start_button.setEnabled(True) + self.stop_button.setEnabled(False) diff --git a/vnpy/app/script_trader/engine.py b/vnpy/app/script_trader/engine.py index 45642c24..ea08d830 100644 --- a/vnpy/app/script_trader/engine.py +++ b/vnpy/app/script_trader/engine.py @@ -40,9 +40,6 @@ class ScriptEngine(BaseEngine): """""" super().__init__(main_engine, event_engine, APP_NAME) - self.main_engine = main_engine - self.event_engine = event_engine - self.get_tick = main_engine.get_tick self.get_order = main_engine.get_order self.get_trade = main_engine.get_trade diff --git a/vnpy/rpc/__init__.py b/vnpy/rpc/__init__.py index 13fdd4fd..3c4aa58c 100644 --- a/vnpy/rpc/__init__.py +++ b/vnpy/rpc/__init__.py @@ -48,12 +48,19 @@ class RpcServer: # Worker thread related self.__active = False # RpcServer status - self.__thread = threading.Thread(target=self.run) # RpcServer thread + self.__thread = None # RpcServer thread + + def is_active(self): + """""" + return self.__active def start(self, rep_address: str, pub_address: str): """ Start RpcServer """ + if self.__active: + return + # Bind socket address self.__socket_rep.bind(rep_address) self.__socket_pub.bind(pub_address) @@ -62,19 +69,23 @@ class RpcServer: self.__active = True # Start RpcServer thread - if not self.__thread.isAlive(): - self.__thread.start() + self.__thread = threading.Thread(target=self.run) + self.__thread.start() - def stop(self, join: bool = False): + def stop(self): """ Stop RpcServer """ + if not self.__active: + return + # Stop RpcServer status self.__active = False # Wait for RpcServer thread to exit - if join and self.__thread.isAlive(): + if self.__thread.isAlive(): self.__thread.join() + self.__thread = None # Unbind socket address self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT)