From 3b8b73b954ed26b8b179568dbc9791ddeb533665 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Mon, 1 Jul 2019 21:35:28 +0800 Subject: [PATCH 1/6] [Mod] move test_client/test_server to examples folder --- {vnpy/rpc => examples/simple_rpc}/test_client.py | 10 +++++----- {vnpy/rpc => examples/simple_rpc}/test_server.py | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) rename {vnpy/rpc => examples/simple_rpc}/test_client.py (74%) rename {vnpy/rpc => examples/simple_rpc}/test_server.py (72%) diff --git a/vnpy/rpc/test_client.py b/examples/simple_rpc/test_client.py similarity index 74% rename from vnpy/rpc/test_client.py rename to examples/simple_rpc/test_client.py index 7a693a52..57dd0909 100644 --- a/vnpy/rpc/test_client.py +++ b/examples/simple_rpc/test_client.py @@ -20,15 +20,15 @@ class TestClient(RpcClient): """ Realize callable function """ - print('client received topic:', topic, ', data:', data) + print(f"client received topic:{topic}, data:{data}") -if __name__ == '__main__': - req_address = 'tcp://localhost:2014' - sub_address = 'tcp://localhost:0602' +if __name__ == "__main__": + req_address = "tcp://localhost:2014" + sub_address = "tcp://localhost:0602" tc = TestClient(req_address, sub_address) - tc.subscribeTopic('') + tc.subscribe_topic("") tc.start() while 1: diff --git a/vnpy/rpc/test_server.py b/examples/simple_rpc/test_server.py similarity index 72% rename from vnpy/rpc/test_server.py rename to examples/simple_rpc/test_server.py index 660168fc..3589bead 100644 --- a/vnpy/rpc/test_server.py +++ b/examples/simple_rpc/test_server.py @@ -22,19 +22,19 @@ class TestServer(RpcServer): """ Test function """ - print('receiving: %s, %s' % (a, b)) + print(f"receiving:{a} {b}") return a + b -if __name__ == '__main__': - rep_address = 'tcp://*:2014' - pub_address = 'tcp://*:0602' +if __name__ == "__main__": + rep_address = "tcp://*:2014" + pub_address = "tcp://*:0602" ts = TestServer(rep_address, pub_address) ts.start() while 1: - content = 'current server time is %s' % time() + content = f"current server time is {time()}" print(content) - ts.publish('test', content) + ts.publish("test", content) sleep(2) From c41f84de550461036c16d393764274c5593980d8 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Mon, 1 Jul 2019 21:35:55 +0800 Subject: [PATCH 2/6] [Mod] simplify rpc module --- vnpy/rpc/__init__.py | 210 ++++++++++++++++++++++++++- vnpy/rpc/vnrpc.py | 329 ------------------------------------------- 2 files changed, 209 insertions(+), 330 deletions(-) delete mode 100644 vnpy/rpc/vnrpc.py diff --git a/vnpy/rpc/__init__.py b/vnpy/rpc/__init__.py index b903d2bb..f12a1c83 100644 --- a/vnpy/rpc/__init__.py +++ b/vnpy/rpc/__init__.py @@ -1 +1,209 @@ -from .vnrpc import RpcServer, RpcClient, RemoteException \ No newline at end of file +import threading +import traceback +import signal +import zmq + + +# Achieve Ctrl-c interrupt recv +signal.signal(signal.SIGINT, signal.SIG_DFL) + + +class RemoteException(Exception): + """ + RPC remote exception + """ + + def __init__(self, value): + """ + Constructor + """ + self.__value = value + + def __str__(self): + """ + Output error message + """ + return self.__value + + +class RpcServer: + """""" + + def __init__(self, rep_address, pub_address): + """ + Constructor + """ + # Save functions dict: key is fuction name, value is fuction object + self.__functions = {} + + # Zmq port related + self.__context = zmq.Context() + + # Reply socket (Request–reply pattern) + self.__socket_rep = self.__context.socket(zmq.REP) + self.__socket_rep.bind(rep_address) + + # Publish socket (Publish–subscribe pattern) + self.__socket_pub = self.__context.socket(zmq.PUB) + self.__socket_pub.bind(pub_address) + + # Worker thread related + self.__active = False # RpcServer status + self.__thread = threading.Thread(target=self.run) # RpcServer thread + + def start(self): + """ + Start RpcServer + """ + # Start RpcServer status + self.__active = True + + # Start RpcServer thread + if not self.__thread.isAlive(): + self.__thread.start() + + def stop(self, join=False): + """ + Stop RpcServer + """ + # Stop RpcServer status + self.__active = False + + # Wait for RpcServer thread to exit + if join and self.__thread.isAlive(): + self.__thread.join() + + def run(self): + """ + Run RpcServer functions + """ + while self.__active: + # Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds) + if not self.__socket_rep.poll(1000): + continue + + # Receive request data from Reply socket + req = self.__socket_rep.recv_pyobj() + + # Get function name and parameters + name, args, kwargs = req + + # Try to get and execute callable function object; capture exception information if it fails + try: + func = self.__functions[name] + r = func(*args, **kwargs) + rep = [True, r] + except Exception as e: # noqa + rep = [False, traceback.format_exc()] + + # send callable response by Reply socket + self.__socket_rep.send_pyobj(rep) + + def publish(self, topic, data): + """ + Publish data + """ + self.__socket_pub.send_pyobj([topic, data]) + + def register(self, func): + """ + Register function + """ + self.__functions[func.__name__] = func + + +class RpcClient: + """""" + + def __init__(self, req_address, sub_address): + """Constructor""" + super(RpcClient, self).__init__() + + # zmq port related + self.__req_address = req_address + self.__sub_address = sub_address + + self.__context = zmq.Context() + # Request socket (Request–reply pattern) + self.__socket_req = self.__context.socket(zmq.REQ) + # Subscribe socket (Publish–subscribe pattern) + self.__socket_sub = self.__context.socket(zmq.SUB) + + # Worker thread relate, used to process data pushed from server + self.__active = False # RpcClient status + self.__thread = threading.Thread( + target=self.run) # RpcClient thread + + def __getattr__(self, name): + """ + Realize remote call function + """ + # Perform remote call task + def dorpc(*args, **kwargs): + # Generate request + req = [name, args, kwargs] + + # Send request and wait for response + self.__socket_req.send_pyobj(req) + rep = self.__socket_req.recv_pyobj() + + # Return response if successed; Trigger exception if failed + if rep[0]: + return rep[1] + else: + raise RemoteException(rep[1]) + + return dorpc + + def start(self): + """ + Start RpcClient + """ + # Connect zmq port + self.__socket_req.connect(self.__req_address) + self.__socket_sub.connect(self.__sub_address) + + # Start RpcClient status + self.__active = True + + # Start RpcClient thread + if not self.__thread.isAlive(): + self.__thread.start() + + def stop(self): + """ + Stop RpcClient + """ + # Stop RpcClient status + self.__active = False + + # Wait for RpcClient thread to exit + if self.__thread.isAlive(): + self.__thread.join() + + def run(self): + """ + Run RpcClient function + """ + while self.__active: + # Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds) + if not self.__socket_sub.poll(1000): + continue + + # Receive data from subscribe socket + topic, data = self.__socket_sub.recv_pyobj() + + # Process data by callable function + self.callback(topic, data) + + def callback(self, topic, data): + """ + Callable function + """ + raise NotImplementedError + + def subscribe_topic(self, topic): + """ + Subscribe data + """ + self.__socket_sub.setsockopt_string(zmq.SUBSCRIBE, topic) diff --git a/vnpy/rpc/vnrpc.py b/vnpy/rpc/vnrpc.py deleted file mode 100644 index 1cdd835a..00000000 --- a/vnpy/rpc/vnrpc.py +++ /dev/null @@ -1,329 +0,0 @@ -import threading -import traceback -import signal - -import zmq -from msgpack import packb, unpackb -from json import dumps, loads - -import pickle -p_dumps = pickle.dumps -p_loads = pickle.loads - - -# Achieve Ctrl-c interrupt recv -signal.signal(signal.SIGINT, signal.SIG_DFL) - - -class RpcObject(object): - """ - Referred to serialization of packing and unpacking, we offer 3 tools: - 1) maspack: higher performance, but usually requires the installation of msgpack related tools; - 2) jason: Slightly lower performance but versatility is better, most programming languages have built-in libraries; - 3) cPickle: Lower performance and only can be used in Python, but it is very convenient to transfer Python objects directly. - - Therefore, it is recommended to use msgpack. - Use json, if you want to communicate with some languages without providing msgpack. - Use cPickle, when the data being transferred contains many custom Python objects. - """ - - def __init__(self): - """ - Constructor - Use msgpack as default serialization tool - """ - self.use_msgpack() - - def pack(self, data): - """""" - pass - - def unpack(self, data): - """""" - pass - - def __json_pack(self, data): - """ - Pack with json - """ - return dumps(data) - - def __json_unpack(self, data): - """ - Unpack with json - """ - return loads(data) - - def __msgpack_pack(self, data): - """ - Pack with msgpack - """ - return packb(data) - - def __msgpack_unpack(self, data): - """ - Unpack with msgpack - """ - return unpackb(data) - - def __pickle_pack(self, data): - """ - Pack with cPickle - """ - return p_dumps(data) - - def __pickle_unpack(self, data): - """ - Unpack with cPickle - """ - return p_loads(data) - - def use_json(self): - """ - Use json as serialization tool - """ - self.pack = self.__json_pack - self.unpack = self.__json_unpack - - def use_msgpack(self): - """ - Use msgpack as serialization tool - """ - self.pack = self.__msgpack_pack - self.unpack = self.__msgpack_unpack - - def use_pickle(self): - """ - Use cPickle as serialization tool - """ - self.pack = self.__pickle_pack - self.unpack = self.__pickle_unpack - - -class RpcServer(RpcObject): - """""" - - def __init__(self, rep_address, pub_address): - """ - Constructor - """ - super(RpcServer, self).__init__() - - # Save functions dict: key is fuction name, value is fuction object - self.__functions = {} - - # Zmq port related - self.__context = zmq.Context() - - self.__socket_rep = self.__context.socket( - zmq.REP) # Reply socket (Request–reply pattern) - self.__socket_rep.bind(rep_address) - - # Publish socket (Publish–subscribe pattern) - self.__socket_pub = self.__context.socket(zmq.PUB) - self.__socket_pub.bind(pub_address) - - # Woker thread related - self.__active = False # RpcServer status - self.__thread = threading.Thread(target=self.run) # RpcServer thread - - def start(self): - """ - Start RpcServer - """ - # Start RpcServer status - self.__active = True - - # Start RpcServer thread - if not self.__thread.isAlive(): - self.__thread.start() - - def stop(self, join=False): - """ - Stop RpcServer - """ - # Stop RpcServer status - self.__active = False - - # Wait for RpcServer thread to exit - if join and self.__thread.isAlive(): - self.__thread.join() - - def run(self): - """ - Run RpcServer functions - """ - while self.__active: - # Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds) - if not self.__socket_rep.poll(1000): - continue - - # Receive request data from Reply socket - reqb = self.__socket_rep.recv() - - # Unpack request by deserialization - req = self.unpack(reqb) - - # Get function name and parameters - name, args, kwargs = req - - # Try to get and execute callable function object; capture exception information if it fails - name = name.decode("UTF-8") - - try: - func = self.__functions[name] - r = func(*args, **kwargs) - rep = [True, r] - except Exception as e: # noqa - rep = [False, traceback.format_exc()] - - # Pack response by serialization - repb = self.pack(rep) - - # send callable response by Reply socket - self.__socket_rep.send(repb) - - def publish(self, topic, data): - """ - Publish data - """ - # Serialized data - topic = bytes(topic, "UTF-8") - datab = self.pack(data) - - # Send data by Publish socket - # topci must be ascii encoding - self.__socket_pub.send_multipart([topic, datab]) - - def register(self, func): - """ - Register function - """ - self.__functions[func.__name__] = func - - -class RpcClient(RpcObject): - """""" - - def __init__(self, req_address, sub_address): - """Constructor""" - super(RpcClient, self).__init__() - - # zmq port related - self.__req_address = req_address - self.__sub_address = sub_address - - self.__context = zmq.Context() - # Request socket (Request–reply pattern) - self.__socket_req = self.__context.socket(zmq.REQ) - # Subscribe socket (Publish–subscribe pattern) - self.__socket_sub = self.__context.socket(zmq.SUB) - - # Woker thread relate, used to process data pushed from server - self.__active = False # RpcClient status - self.__thread = threading.Thread( - target=self.run) # RpcClient thread - - def __getattr__(self, name): - """ - Realize remote call function - """ - # Perform remote call task - def dorpc(*args, **kwargs): - # Generate request - req = [name, args, kwargs] - - # Pack request by serialization - reqb = self.pack(req) - - # Send request and wait for response - self.__socket_req.send(reqb) - repb = self.__socket_req.recv() - - # Unpack response by deserialization - rep = self.unpack(repb) - - # Return response if successed; Trigger exception if failed - if rep[0]: - return rep[1] - else: - raise RemoteException(rep[1].decode("UTF-8")) - - return dorpc - - def start(self): - """ - Start RpcClient - """ - # Connect zmq port - self.__socket_req.connect(self.__req_address) - self.__socket_sub.connect(self.__sub_address) - - # Start RpcClient status - self.__active = True - - # Start RpcClient thread - if not self.__thread.isAlive(): - self.__thread.start() - - def stop(self): - """ - Stop RpcClient - """ - # Stop RpcClient status - self.__active = False - - # Wait for RpcClient thread to exit - if self.__thread.isAlive(): - self.__thread.join() - - def run(self): - """ - Run RpcClient function - """ - while self.__active: - # Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds) - if not self.__socket_sub.poll(1000): - continue - - # Receive data from subscribe socket - topic, datab = self.__socket_sub.recv_multipart() - - # Unpack data by deserialization - data = self.unpack(datab) - - # Process data by callable function - topic = topic.decode("UTF-8") - - self.callback(topic, data) - - def callback(self, topic, data): - """ - Callable function - """ - raise NotImplementedError - - def subscribeTopic(self, topic): - """ - Subscribe data - """ - topic = bytes(topic, "UTF-8") - self.__socket_sub.setsockopt(zmq.SUBSCRIBE, topic) - - -class RemoteException(Exception): - """ - RPC remote exception - """ - - def __init__(self, value): - """ - Constructor - """ - self.__value = value - - def __str__(self): - """ - Output error message - """ - return self.__value From 01122f013d6cf365ea40e3b0278883bcf514e661 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 2 Jul 2019 09:32:46 +0800 Subject: [PATCH 3/6] [Mod] add unbind socket in server stop method --- examples/simple_rpc/test_client.py | 2 +- examples/simple_rpc/test_server.py | 10 ++++----- vnpy/rpc/__init__.py | 33 +++++++++++++++++------------- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/examples/simple_rpc/test_client.py b/examples/simple_rpc/test_client.py index 57dd0909..22689971 100644 --- a/examples/simple_rpc/test_client.py +++ b/examples/simple_rpc/test_client.py @@ -25,7 +25,7 @@ class TestClient(RpcClient): if __name__ == "__main__": req_address = "tcp://localhost:2014" - sub_address = "tcp://localhost:0602" + sub_address = "tcp://localhost:4102" tc = TestClient(req_address, sub_address) tc.subscribe_topic("") diff --git a/examples/simple_rpc/test_server.py b/examples/simple_rpc/test_server.py index 3589bead..2839b861 100644 --- a/examples/simple_rpc/test_server.py +++ b/examples/simple_rpc/test_server.py @@ -10,11 +10,11 @@ class TestServer(RpcServer): Test RpcServer """ - def __init__(self, rep_address, pub_address): + def __init__(self): """ Constructor """ - super(TestServer, self).__init__(rep_address, pub_address) + super(TestServer, self).__init__() self.register(self.add) @@ -28,10 +28,10 @@ class TestServer(RpcServer): if __name__ == "__main__": rep_address = "tcp://*:2014" - pub_address = "tcp://*:0602" + pub_address = "tcp://*:4102" - ts = TestServer(rep_address, pub_address) - ts.start() + ts = TestServer() + ts.start(rep_address, pub_address) while 1: content = f"current server time is {time()}" diff --git a/vnpy/rpc/__init__.py b/vnpy/rpc/__init__.py index f12a1c83..13fdd4fd 100644 --- a/vnpy/rpc/__init__.py +++ b/vnpy/rpc/__init__.py @@ -2,6 +2,7 @@ import threading import traceback import signal import zmq +from typing import Any, Callable # Achieve Ctrl-c interrupt recv @@ -29,7 +30,7 @@ class RemoteException(Exception): class RpcServer: """""" - def __init__(self, rep_address, pub_address): + def __init__(self): """ Constructor """ @@ -41,20 +42,22 @@ class RpcServer: # Reply socket (Request–reply pattern) self.__socket_rep = self.__context.socket(zmq.REP) - self.__socket_rep.bind(rep_address) # Publish socket (Publish–subscribe pattern) self.__socket_pub = self.__context.socket(zmq.PUB) - self.__socket_pub.bind(pub_address) # Worker thread related self.__active = False # RpcServer status self.__thread = threading.Thread(target=self.run) # RpcServer thread - def start(self): + def start(self, rep_address: str, pub_address: str): """ Start RpcServer """ + # Bind socket address + self.__socket_rep.bind(rep_address) + self.__socket_pub.bind(pub_address) + # Start RpcServer status self.__active = True @@ -62,7 +65,7 @@ class RpcServer: if not self.__thread.isAlive(): self.__thread.start() - def stop(self, join=False): + def stop(self, join: bool = False): """ Stop RpcServer """ @@ -73,6 +76,10 @@ class RpcServer: if join and self.__thread.isAlive(): self.__thread.join() + # Unbind socket address + self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT) + self.__socket_rep.unbind(self.__socket_rep.LAST_ENDPOINT) + def run(self): """ Run RpcServer functions @@ -99,13 +106,13 @@ class RpcServer: # send callable response by Reply socket self.__socket_rep.send_pyobj(rep) - def publish(self, topic, data): + def publish(self, topic: str, data: Any): """ Publish data """ self.__socket_pub.send_pyobj([topic, data]) - def register(self, func): + def register(self, func: Callable): """ Register function """ @@ -115,10 +122,8 @@ class RpcServer: class RpcClient: """""" - def __init__(self, req_address, sub_address): + def __init__(self, req_address: str, sub_address: str): """Constructor""" - super(RpcClient, self).__init__() - # zmq port related self.__req_address = req_address self.__sub_address = sub_address @@ -134,7 +139,7 @@ class RpcClient: self.__thread = threading.Thread( target=self.run) # RpcClient thread - def __getattr__(self, name): + def __getattr__(self, name: str): """ Realize remote call function """ @@ -172,7 +177,7 @@ class RpcClient: def stop(self): """ - Stop RpcClient + Stop RpcClient """ # Stop RpcClient status self.__active = False @@ -196,13 +201,13 @@ class RpcClient: # Process data by callable function self.callback(topic, data) - def callback(self, topic, data): + def callback(self, topic: str, data: Any): """ Callable function """ raise NotImplementedError - def subscribe_topic(self, topic): + def subscribe_topic(self, topic: str): """ Subscribe data """ From 13a3865b8e64c4656b497b427dde6200f8067e35 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 2 Jul 2019 11:08:14 +0800 Subject: [PATCH 4/6] [Mod] add RpcServiceApp --- examples/vn_trader/run.py | 2 + vnpy/app/risk_manager/engine.py | 3 - vnpy/app/rpc_service/__init__.py | 14 ++++ vnpy/app/rpc_service/engine.py | 119 ++++++++++++++++++++++++++++ vnpy/app/rpc_service/ui/__init__.py | 1 + vnpy/app/rpc_service/ui/rpc.ico | Bin 0 -> 66622 bytes vnpy/app/rpc_service/ui/widget.py | 93 ++++++++++++++++++++++ vnpy/app/script_trader/engine.py | 3 - vnpy/rpc/__init__.py | 21 +++-- 9 files changed, 245 insertions(+), 11 deletions(-) create mode 100644 vnpy/app/rpc_service/__init__.py create mode 100644 vnpy/app/rpc_service/engine.py create mode 100644 vnpy/app/rpc_service/ui/__init__.py create mode 100644 vnpy/app/rpc_service/ui/rpc.ico create mode 100644 vnpy/app/rpc_service/ui/widget.py diff --git a/examples/vn_trader/run.py b/examples/vn_trader/run.py index 187b5abf..e8fb8a62 100644 --- a/examples/vn_trader/run.py +++ b/examples/vn_trader/run.py @@ -31,6 +31,7 @@ from vnpy.app.cta_backtester import CtaBacktesterApp # from vnpy.app.data_recorder import DataRecorderApp # from vnpy.app.risk_manager import RiskManagerApp from vnpy.app.script_trader import ScriptTraderApp +from vnpy.app.rpc_service import RpcServiceApp def main(): @@ -68,6 +69,7 @@ def main(): # main_engine.add_app(DataRecorderApp) # main_engine.add_app(RiskManagerApp) main_engine.add_app(ScriptTraderApp) + main_engine.add_app(RpcServiceApp) main_window = MainWindow(main_engine, event_engine) main_window.showMaximized() diff --git a/vnpy/app/risk_manager/engine.py b/vnpy/app/risk_manager/engine.py index 35ab4a74..c6193bc5 100644 --- a/vnpy/app/risk_manager/engine.py +++ b/vnpy/app/risk_manager/engine.py @@ -20,9 +20,6 @@ class RiskManagerEngine(BaseEngine): """""" super().__init__(main_engine, event_engine, APP_NAME) - self.main_engine = main_engine - self.event_engine = event_engine - self.active = False self.order_flow_count = 0 diff --git a/vnpy/app/rpc_service/__init__.py b/vnpy/app/rpc_service/__init__.py new file mode 100644 index 00000000..058bd067 --- /dev/null +++ b/vnpy/app/rpc_service/__init__.py @@ -0,0 +1,14 @@ +from pathlib import Path +from vnpy.trader.app import BaseApp +from .engine import RpcEngine, APP_NAME + + +class RpcServiceApp(BaseApp): + """""" + app_name = APP_NAME + app_module = __module__ + app_path = Path(__file__).parent + display_name = "RPC服务" + engine_class = RpcEngine + widget_name = "RpcManager" + icon_name = "rpc.ico" diff --git a/vnpy/app/rpc_service/engine.py b/vnpy/app/rpc_service/engine.py new file mode 100644 index 00000000..7784412a --- /dev/null +++ b/vnpy/app/rpc_service/engine.py @@ -0,0 +1,119 @@ +"""""" + +import traceback + +from vnpy.event import Event, EventEngine +from vnpy.rpc import RpcServer +from vnpy.trader.engine import BaseEngine, MainEngine +from vnpy.trader.utility import load_json, save_json +from vnpy.trader.object import LogData + +APP_NAME = "RpcService" + +EVENT_RPC_LOG = "eRpcLog" + + +class RpcEngine(BaseEngine): + """""" + setting_filename = "rpc_service_setting.json" + + def __init__(self, main_engine: MainEngine, event_engine: EventEngine): + """""" + super().__init__(main_engine, event_engine, APP_NAME) + + self.rep_address = "tcp://*:2014" + self.pub_address = "tcp://*:4102" + + self.server = None + + self.init_server() + self.load_setting() + self.register_event() + + def init_server(self): + """""" + self.server = RpcServer() + + self.server.register(self.main_engine.subscribe) + self.server.register(self.main_engine.send_order) + self.server.register(self.main_engine.send_orders) + self.server.register(self.main_engine.cancel_order) + self.server.register(self.main_engine.cancel_orders) + self.server.register(self.main_engine.query_history) + + self.server.register(self.main_engine.get_tick) + self.server.register(self.main_engine.get_order) + self.server.register(self.main_engine.get_trade) + self.server.register(self.main_engine.get_position) + self.server.register(self.main_engine.get_account) + self.server.register(self.main_engine.get_contract) + self.server.register(self.main_engine.get_all_ticks) + self.server.register(self.main_engine.get_all_orders) + self.server.register(self.main_engine.get_all_trades) + self.server.register(self.main_engine.get_all_positions) + self.server.register(self.main_engine.get_all_accounts) + self.server.register(self.main_engine.get_all_contracts) + self.server.register(self.main_engine.get_all_active_orders) + + def load_setting(self): + """""" + setting = load_json(self.setting_filename) + self.rep_address = setting.get("rep_address", self.rep_address) + self.pub_address = setting.get("pub_address", self.pub_address) + + def save_setting(self): + """""" + setting = { + "rep_address": self.rep_address, + "pub_address": self.pub_address + } + save_json(self.setting_filename, setting) + + def start(self, rep_address: str, pub_address: str): + """""" + if self.server.is_active(): + self.write_log("服务运行中") + return False + + self.rep_address = rep_address + self.pub_address = pub_address + + try: + self.server.start(rep_address, pub_address) + except: # noqa + msg = traceback.format_exc() + self.write_log(f"服务启动失败:{msg}") + return False + + self.save_setting() + self.write_log("服务启动成功") + return True + + def stop(self): + """""" + if not self.server.is_active(): + self.write_log("服务未启动") + return False + + self.server.stop() + self.write_log("服务已停止") + return True + + def close(self): + """""" + self.stop() + + def register_event(self): + """""" + self.event_engine.register_general(self.process_event) + + def process_event(self, event: Event): + """""" + if self.server.is_active(): + self.server.publish("", event) + + def write_log(self, msg: str) -> None: + """""" + log = LogData(msg=msg, gateway_name=APP_NAME) + event = Event(EVENT_RPC_LOG, log) + self.event_engine.put(event) diff --git a/vnpy/app/rpc_service/ui/__init__.py b/vnpy/app/rpc_service/ui/__init__.py new file mode 100644 index 00000000..b9aff582 --- /dev/null +++ b/vnpy/app/rpc_service/ui/__init__.py @@ -0,0 +1 @@ +from .widget import RpcManager diff --git a/vnpy/app/rpc_service/ui/rpc.ico b/vnpy/app/rpc_service/ui/rpc.ico new file mode 100644 index 0000000000000000000000000000000000000000..5e91b2dbb0393e7a88cab40368e3dfed12907be2 GIT binary patch literal 66622 zcmeHPYj73S0bUdY{?ryaiq#ICX%&7r{^!^KqVg z&Ue1=oU?m(HyTDO{5Ng7fq#ExbZy2-SDwRjTW7#`YS9>uYE1MZT~gZZvJYS|Py^I90>^RABY@qe-x2A12lP=4u)lNV z$#ZyahYa{mEgHj-W1;o7Uk85>xwNQvx2l^U+s(Cxj4f3JVJ-uV24jC~VlU;U(m^b=H${-gDV)`W=xRTcMALzE7Px)o$z7 zt-l7pZxlzyna460&$ei%728VJK@9+E;nuP(wR3P3)Rr2%>(YyKa#UkyAB5y=hebt2 zMt*+&L_jz&v$(i8W__NYu0wHj^W3s!i?MFqx=X>Y%)zmYc`S2|$=OD_csl!2arR5i zz;C95BQ>2!ja5ArDxT@$n5bRwis`$|Pjq@l$|aX5-;vL}jBrTQiP8&Ul<#5T=p2LO z8@1lmIGlBZic2oq_c#XB*49StCu(1b7w*}!$H>den}9u2C?NgfzJ2@7xh>qk#$gtj z9gj=A{POa0W5tRUoxqQKyW&~KJeI{f&z)aWQ)48{k@ei?CrVQ@YUki6s4X>4bgr|U z*8p-&=5X;;&%EwLdjq|I_WSSMyZ6$1aNZq?%m7L-T<%jWLZCizXf<7W{&Of z;lrMERXrJAjF9)Wo*c=gva)haX=!P>J7zZXf@`r+6(#(qFTp45&S zN-gWCEj3PbF3RcEmGIoWcSbxZ?^#cx1HwN82Z2+-=_F8({>~_%eti<>sCJK_?i;{m zfXN}ArfP21)zz_HKcEz-15{g{q2i~)WY3d*u&;hxM+iOFJL%pB)TE20Mf-JUc2m*eS2yu%lR9P zV~W>SI`iovy^PY(oxwW5<-mR*ot)yeU5$Rb18!<|8tc@MYkgio9LfDzrVmgDhyZXFeP#jfDQTx!2DAeTfFL-sPxeg? zHZOOCWdO&*F$KYs9@0xUwW#!ovU~&R1N;{_69DQ@logCXzfs&&2FrkUz&bzxndZqp z**7^TT2Ud(@>@Oc0fK51V{GC zzRAJn<&Lln;8-}OAb8S4dg-PXl@=Sb!8IFov+DqG%{b1YPiU1L#Z6_f41ng@1warS z*(duZ2SqC?WEsG*a7;n)q=)n}ikmxwb!1)bySG=`cW-^pb)M_>?w_7|z5A!7n#X#* z`>Uy5@BZnj*SmjO>h)k&s^?LVTOTFIx)l{!{fA!@2)c5oLYO1gA z{^_aLyMJ2h>%0G2>g&6|n(FoLub$2I`raG&EqI3)$U7un--iP3S4MW4Wk6dX4^aDV zrbG5k4mK}$gk``mfMd!uKlVot>7|=mRGM{w%Yg$x5L|1}Z+9Syo62CBD0+%K=K(=* zWS{dy!IR{mXhnrA19|`)QxH7qA-#;^=FVUp;Cx^X5Cqr7=(j!Kre>$b#vD^`)ZzT7 z1TxK&eVTJq6ngPAwd7cq1i+D+%n?PVm$r8CBB-l0?SS}P;1F;cs82JFqWwj{F9C3f z|BVR7VsTtMh2a2T2f%xnbaP}M>}!DdVy?xZXi7XOt#V6Gj_ECcV@x+kdPUFZ9X&L8 z#3d;U#Ky-P_!efj#(7r({ei)01iiZ&Ft3*g4YBzNPpW1eIVQv}0d4{A0@B5kZL^O~ zfUS`iNwI<7S=n{@Z4SrCv8Ib7+oe}mVQtVmtN+c9d(pJZ#Hxp!d@~;}c+}?N6*0%9 z*1j5J2174ys!Xgm_oU2gsC3vfT~X1YRzNGD6-Y^e*|TRGvu4dQ7%DJ5XU-gB=FFKX zwe8o*2p0qEB^YO3K&&FWB@YQ|M-8bZ zHKn$Zl|zssC*UDKI51o0hk&7>^t=q5${idTXI_{%k}I`yYDsNF!O=;AoPdcgt}|u6 zvua>*$>manWe%QVa+V>RQkUG+P?R$z^s?(*Wdc7+_ zdy`@H23@68IKSnljEjmW6>MwW4O!PI08Bw*q0aU9FXiz^&B5 zm%M9$J;3L{u7E)fdx8G}N>j#}7Z!fhikc~Y47UN)HmttW|197H;N-32jn{)(Qd4tn zh0S=y3t<}|2RH_3zf;qI8d6JY8cN?e4;43r=K+0yr2zNP3ZPPkj0cH>=)WFNbCPl9 z1<6VEtu&%$OMzPeY8hJ3)&8S6%CI%UPQY(~D?$MBxB>VMP`)$HyfAU3X4KAJOG@7` zhcjl(u=zTo^j?ie*s$P2UZ)AdT=Cx2&n*Xr*ZV)&R=m=aS z!8r4pzW^+0*^1Cg(?*eoKx;F;!y&HVL4!(B}2;I@~x-|Bq$ekw%1bMB4TzDLd_cx4Q? z*86=7*y1 z{CC7{^Ol(C`^rB=|3x3!-u8dfF$?XUgQJ~wK9SpMfe zDDUfAxZU>flkt7{>Jo9wtk*=3sn3Y(v!4|`|M0xH>zy^``j55l6ZwAU-`^ECJTb#; zk8R%g$62D^>+f0KVQt^BU^)7DL0peAmftk(CBeC1dWv$+nd@_&v&yGDBYHnOH)3AI z@Lv0FJ8ngi^Zhf?XXdM-`=d{b9#f{9-ly#QV=tO*^+209J~1=WcdXx@InUeWy`GvA z?fcYc?egBw{6+ij@-pJke=l*7w-#RB(uXnex z3DU{JlRh@1OKxDL~)*?uhW6-lqiV`$?~RzYLe<_JZfy_l8-oeQ(gBCB?Mw zEy?YMv7VFfJFCP51&?ci-scqkLR}+dE3Rft>4DI2T0^Q-|G%} zuSneUZlQ@G?-vQ?scSx-`=ETIU4HKeo2+NV^0(x$kGI-oY~$V!KC#bLZMZ^xeRoa3 zqe`i?cJp2N_S`!{{*lkY3c?{}hnUo3sc zGoHKNS?ltBv-ORZeEu_d**fjJ%hyE7WY)XsJND(s_3oqZdc7yI$@lVPe4ntRLX0ZJ z^_}nA*KU{B_wvB5@56DOh`j#m>-)J*jO2Xha~#LLJn!jaT-Q}zqU*cs+M8&P_M=>1 z-(3?UdEb+NhNpdZi5$YD_C18AWPprKx-LcsiTU-_~_LiK;t`bIeDNrSH zvnz#28C60|tvGvX;_kyI)%|_&VO8#fhf0kJ+pAi#)e&R2eXsP%cg&rs-=Bf+KZMA4 zd{c5z`|dPe`|ebu<;1n`Eyru;NR#%x;;hJa!sA{*@AFQD!x>M?clgckEyir#C&qqS zCdO|0T1@`tMBID8M84yB9=_Y%EAZW|iP1%S`7NgX?05XO17Z}u?N#OGJ72sLRQHGD zOTIJT7mO;j&cFYYuk12a#}(@9yKDL->pOmvS?|dCeFWamO!}%u;M|zrC-fb@X65a$ z|K4uox}EU-fbAW9<^Nj)-}akzMi%T6d?%>9zEO)&3 z<-LHjoYw_)Zk$m29>e#X&3Imy=(!EAod{%J=gO26Lx-&zXj9JX_fWTbFTMs&Di&Gm|w;N+IMBUkyB}|b^6{3DD=a7 ztoPF)dOlwW&pT<~ZOiRQwzbdG`!2u|paQ4?Y8wOiegwWBhSs$Kp$7d{084-_0M>e? zJzgEL6v*Mb*aq(}fY(<5*8tZx2JrnF_mKr7&*0wVILk4%ch>1v~u5GM`e zX>rytdew_er$u2ea4zqRf2uDe-^etW8Rvr;SW>kos~N3ll$l0PGtElIG@WiLdN}-Z z`-^GdJYvk>$ob)(FBwR4o>_-AUscL58kH_@IKMOJKWDT&dNFNIZY$$yrp?1#FNg(| zmNe}CBhuv-+s=w#p_RTcUV6G!-d2_Cv?++zL(cD}HXjO2`>-2G{_b?SRo+>ZBUFc4 I&q=5M53a@(ng9R* literal 0 HcmV?d00001 diff --git a/vnpy/app/rpc_service/ui/widget.py b/vnpy/app/rpc_service/ui/widget.py new file mode 100644 index 00000000..f811a22c --- /dev/null +++ b/vnpy/app/rpc_service/ui/widget.py @@ -0,0 +1,93 @@ +from vnpy.event import EventEngine, Event +from vnpy.trader.engine import MainEngine +from vnpy.trader.ui import QtWidgets, QtCore +from ..engine import APP_NAME, EVENT_RPC_LOG + + +class RpcManager(QtWidgets.QWidget): + """""" + signal_log = QtCore.pyqtSignal(Event) + + def __init__(self, main_engine: MainEngine, event_engine: EventEngine): + """""" + super().__init__() + + self.main_engine = main_engine + self.event_engine = event_engine + + self.rpc_engine = main_engine.get_engine(APP_NAME) + + self.init_ui() + self.register_event() + + def init_ui(self): + """""" + self.setWindowTitle("RPC服务") + self.setFixedWidth(900) + self.setFixedHeight(500) + + self.start_button = QtWidgets.QPushButton("启动") + self.start_button.clicked.connect(self.start_server) + + self.stop_button = QtWidgets.QPushButton("停止") + self.stop_button.clicked.connect(self.stop_server) + self.stop_button.setEnabled(False) + + for button in [self.start_button, self.stop_button]: + hint = button.sizeHint() + button.setFixedHeight(hint.height() * 2) + button.setFixedWidth(hint.width() * 4) + + self.rep_line = QtWidgets.QLineEdit(self.rpc_engine.rep_address) + self.rep_line.setFixedWidth(300) + + self.pub_line = QtWidgets.QLineEdit(self.rpc_engine.pub_address) + self.pub_line.setFixedWidth(300) + + self.log_monitor = QtWidgets.QTextEdit() + self.log_monitor.setReadOnly(True) + + form = QtWidgets.QFormLayout() + form.addRow("请求响应地址", self.rep_line) + form.addRow("事件广播地址", self.pub_line) + + hbox = QtWidgets.QHBoxLayout() + hbox.addLayout(form) + hbox.addWidget(self.start_button) + hbox.addWidget(self.stop_button) + hbox.addStretch() + + vbox = QtWidgets.QVBoxLayout() + vbox.addLayout(hbox) + vbox.addWidget(self.log_monitor) + + self.setLayout(vbox) + + def register_event(self): + """""" + self.signal_log.connect(self.process_log_event) + + self.event_engine.register(EVENT_RPC_LOG, self.signal_log.emit) + + def process_log_event(self, event: Event): + """""" + log = event.data + msg = f"{log.time}\t{log.msg}" + self.log_monitor.append(msg) + + def start_server(self): + """""" + rep_address = self.rep_line.text() + pub_address = self.pub_line.text() + + result = self.rpc_engine.start(rep_address, pub_address) + if result: + self.start_button.setEnabled(False) + self.stop_button.setEnabled(True) + + def stop_server(self): + """""" + result = self.rpc_engine.stop() + if result: + self.start_button.setEnabled(True) + self.stop_button.setEnabled(False) diff --git a/vnpy/app/script_trader/engine.py b/vnpy/app/script_trader/engine.py index 45642c24..ea08d830 100644 --- a/vnpy/app/script_trader/engine.py +++ b/vnpy/app/script_trader/engine.py @@ -40,9 +40,6 @@ class ScriptEngine(BaseEngine): """""" super().__init__(main_engine, event_engine, APP_NAME) - self.main_engine = main_engine - self.event_engine = event_engine - self.get_tick = main_engine.get_tick self.get_order = main_engine.get_order self.get_trade = main_engine.get_trade diff --git a/vnpy/rpc/__init__.py b/vnpy/rpc/__init__.py index 13fdd4fd..3c4aa58c 100644 --- a/vnpy/rpc/__init__.py +++ b/vnpy/rpc/__init__.py @@ -48,12 +48,19 @@ class RpcServer: # Worker thread related self.__active = False # RpcServer status - self.__thread = threading.Thread(target=self.run) # RpcServer thread + self.__thread = None # RpcServer thread + + def is_active(self): + """""" + return self.__active def start(self, rep_address: str, pub_address: str): """ Start RpcServer """ + if self.__active: + return + # Bind socket address self.__socket_rep.bind(rep_address) self.__socket_pub.bind(pub_address) @@ -62,19 +69,23 @@ class RpcServer: self.__active = True # Start RpcServer thread - if not self.__thread.isAlive(): - self.__thread.start() + self.__thread = threading.Thread(target=self.run) + self.__thread.start() - def stop(self, join: bool = False): + def stop(self): """ Stop RpcServer """ + if not self.__active: + return + # Stop RpcServer status self.__active = False # Wait for RpcServer thread to exit - if join and self.__thread.isAlive(): + if self.__thread.isAlive(): self.__thread.join() + self.__thread = None # Unbind socket address self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT) From 4127f1cc5dca65172181af551ca12234500d3594 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 2 Jul 2019 14:07:38 +0800 Subject: [PATCH 5/6] [Del] remove unecessary encoding comment --- vnpy/api/rest/rest_client.py | 2 -- vnpy/api/websocket/websocket_client.py | 2 -- vnpy/gateway/alpaca/alpaca_gateway.py | 1 - vnpy/gateway/bitfinex/bitfinex_gateway.py | 1 - vnpy/gateway/bitmex/bitmex_gateway.py | 4 +--- vnpy/gateway/femas/femas_gateway.py | 4 +--- vnpy/gateway/futu/futu_gateway.py | 1 - vnpy/gateway/oes/oes_gateway.py | 1 - vnpy/gateway/okex/okex_gateway.py | 1 - vnpy/gateway/okexf/okexf_gateway.py | 1 - vnpy/gateway/onetoken/onetoken_gateway.py | 1 - vnpy/gateway/tiger/tiger_gateway.py | 1 - 12 files changed, 2 insertions(+), 18 deletions(-) diff --git a/vnpy/api/rest/rest_client.py b/vnpy/api/rest/rest_client.py index c2083688..dd36a503 100644 --- a/vnpy/api/rest/rest_client.py +++ b/vnpy/api/rest/rest_client.py @@ -1,5 +1,3 @@ -# encoding: UTF-8 - import sys import traceback from datetime import datetime diff --git a/vnpy/api/websocket/websocket_client.py b/vnpy/api/websocket/websocket_client.py index 84596a34..870d87a4 100644 --- a/vnpy/api/websocket/websocket_client.py +++ b/vnpy/api/websocket/websocket_client.py @@ -1,5 +1,3 @@ -# encoding: UTF-8 - import json import ssl import sys diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index 036708c3..b9df4b32 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ Author: vigarbuaa """ diff --git a/vnpy/gateway/bitfinex/bitfinex_gateway.py b/vnpy/gateway/bitfinex/bitfinex_gateway.py index 832c2969..ecec31d3 100644 --- a/vnpy/gateway/bitfinex/bitfinex_gateway.py +++ b/vnpy/gateway/bitfinex/bitfinex_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ Author: vigarbuaa """ diff --git a/vnpy/gateway/bitmex/bitmex_gateway.py b/vnpy/gateway/bitmex/bitmex_gateway.py index 444f85b2..e5af3082 100644 --- a/vnpy/gateway/bitmex/bitmex_gateway.py +++ b/vnpy/gateway/bitmex/bitmex_gateway.py @@ -1,6 +1,4 @@ -# encoding: UTF-8 -""" -""" +"""""" import hashlib import hmac diff --git a/vnpy/gateway/femas/femas_gateway.py b/vnpy/gateway/femas/femas_gateway.py index 47f80fbb..2fd6a4e8 100644 --- a/vnpy/gateway/femas/femas_gateway.py +++ b/vnpy/gateway/femas/femas_gateway.py @@ -1,6 +1,4 @@ -# encoding: UTF-8 -""" -""" +"""""" from datetime import datetime diff --git a/vnpy/gateway/futu/futu_gateway.py b/vnpy/gateway/futu/futu_gateway.py index 2f45adbe..895be70a 100644 --- a/vnpy/gateway/futu/futu_gateway.py +++ b/vnpy/gateway/futu/futu_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ Please install futu-api before use. """ diff --git a/vnpy/gateway/oes/oes_gateway.py b/vnpy/gateway/oes/oes_gateway.py index fd763782..9ed76ddf 100644 --- a/vnpy/gateway/oes/oes_gateway.py +++ b/vnpy/gateway/oes/oes_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ Author: nanoric """ diff --git a/vnpy/gateway/okex/okex_gateway.py b/vnpy/gateway/okex/okex_gateway.py index ba7a2f80..5ea47268 100644 --- a/vnpy/gateway/okex/okex_gateway.py +++ b/vnpy/gateway/okex/okex_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ """ diff --git a/vnpy/gateway/okexf/okexf_gateway.py b/vnpy/gateway/okexf/okexf_gateway.py index 26c1c87a..b1212682 100644 --- a/vnpy/gateway/okexf/okexf_gateway.py +++ b/vnpy/gateway/okexf/okexf_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ Author: qqqlyx """ diff --git a/vnpy/gateway/onetoken/onetoken_gateway.py b/vnpy/gateway/onetoken/onetoken_gateway.py index e6af5faa..bd586eb3 100644 --- a/vnpy/gateway/onetoken/onetoken_gateway.py +++ b/vnpy/gateway/onetoken/onetoken_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ """ diff --git a/vnpy/gateway/tiger/tiger_gateway.py b/vnpy/gateway/tiger/tiger_gateway.py index c7796782..cfbead5e 100644 --- a/vnpy/gateway/tiger/tiger_gateway.py +++ b/vnpy/gateway/tiger/tiger_gateway.py @@ -1,4 +1,3 @@ -# encoding: UTF-8 """ Author: KeKe Please install tiger-api before use. From cc590f1b5bb6e10ff8a3e37b64ffe86f1101830b Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 2 Jul 2019 14:09:00 +0800 Subject: [PATCH 6/6] [Add] RpcGateway for connecting to RpcService --- examples/client_server/client/run_client.py | 26 +++++ examples/client_server/server/run_server.py | 72 ++++++++++++ examples/simple_rpc/test_client.py | 8 +- vnpy/app/rpc_service/engine.py | 10 +- vnpy/gateway/rpc/__init__.py | 1 + vnpy/gateway/rpc/rpc_gateway.py | 116 ++++++++++++++++++++ vnpy/rpc/__init__.py | 33 ++++-- 7 files changed, 245 insertions(+), 21 deletions(-) create mode 100644 examples/client_server/client/run_client.py create mode 100644 examples/client_server/server/run_server.py create mode 100644 vnpy/gateway/rpc/__init__.py create mode 100644 vnpy/gateway/rpc/rpc_gateway.py diff --git a/examples/client_server/client/run_client.py b/examples/client_server/client/run_client.py new file mode 100644 index 00000000..4f3669e6 --- /dev/null +++ b/examples/client_server/client/run_client.py @@ -0,0 +1,26 @@ +from vnpy.event import EventEngine +from vnpy.trader.engine import MainEngine +from vnpy.trader.ui import MainWindow, create_qapp +from vnpy.gateway.rpc import RpcGateway +from vnpy.app.cta_strategy import CtaStrategyApp + + +def main(): + """""" + qapp = create_qapp() + + event_engine = EventEngine() + + main_engine = MainEngine(event_engine) + + main_engine.add_gateway(RpcGateway) + main_engine.add_app(CtaStrategyApp) + + main_window = MainWindow(main_engine, event_engine) + main_window.showMaximized() + + qapp.exec() + + +if __name__ == "__main__": + main() diff --git a/examples/client_server/server/run_server.py b/examples/client_server/server/run_server.py new file mode 100644 index 00000000..8d9f006a --- /dev/null +++ b/examples/client_server/server/run_server.py @@ -0,0 +1,72 @@ +from time import sleep + +from vnpy.event import EventEngine, Event +from vnpy.trader.engine import MainEngine +from vnpy.trader.ui import MainWindow, create_qapp +from vnpy.trader.event import EVENT_LOG +from vnpy.gateway.ctp import CtpGateway +from vnpy.app.rpc_service import RpcServiceApp +from vnpy.app.rpc_service.engine import EVENT_RPC_LOG + + +def main_ui(): + """""" + qapp = create_qapp() + + event_engine = EventEngine() + + main_engine = MainEngine(event_engine) + + main_engine.add_gateway(CtpGateway) + main_engine.add_app(RpcServiceApp) + + main_window = MainWindow(main_engine, event_engine) + main_window.showMaximized() + + qapp.exec() + + +def process_log_event(event: Event): + """""" + log = event.data + msg = f"{log.time}\t{log.msg}" + print(msg) + + +def main_terminal(): + """""" + event_engine = EventEngine() + event_engine.register(EVENT_LOG, process_log_event) + event_engine.register(EVENT_RPC_LOG, process_log_event) + + main_engine = MainEngine(event_engine) + main_engine.add_gateway(CtpGateway) + rpc_engine = main_engine.add_app(RpcServiceApp) + + setting = { + "用户名": "25500029", + "密码": "20140602", + "经纪商代码": "6010", + "交易服务器": "180.169.75.194:41305", + "行情服务器": "180.166.1.17:41313", + "产品名称": "vntech_vnpy_2.0", + "授权编码": "0Y1J5UIMY79BFL7S", + "产品信息": "" + } + main_engine.connect(setting, "CTP") + sleep(10) + + rep_address = "tcp://127.0.0.1:2014" + pub_address = "tcp://127.0.0.1:4102" + rpc_engine.start(rep_address, pub_address) + + while True: + sleep(1) + + +if __name__ == "__main__": + # Run in GUI mode + # main_ui() + + # Run in CLI mode + main_terminal() diff --git a/examples/simple_rpc/test_client.py b/examples/simple_rpc/test_client.py index 22689971..77ba34f6 100644 --- a/examples/simple_rpc/test_client.py +++ b/examples/simple_rpc/test_client.py @@ -10,11 +10,11 @@ class TestClient(RpcClient): Test RpcClient """ - def __init__(self, req_address, sub_address): + def __init__(self): """ Constructor """ - super(TestClient, self).__init__(req_address, sub_address) + super(TestClient, self).__init__() def callback(self, topic, data): """ @@ -27,9 +27,9 @@ if __name__ == "__main__": req_address = "tcp://localhost:2014" sub_address = "tcp://localhost:4102" - tc = TestClient(req_address, sub_address) + tc = TestClient() tc.subscribe_topic("") - tc.start() + tc.start(req_address, sub_address) while 1: print(tc.add(1, 3)) diff --git a/vnpy/app/rpc_service/engine.py b/vnpy/app/rpc_service/engine.py index 7784412a..62a355a5 100644 --- a/vnpy/app/rpc_service/engine.py +++ b/vnpy/app/rpc_service/engine.py @@ -72,7 +72,7 @@ class RpcEngine(BaseEngine): def start(self, rep_address: str, pub_address: str): """""" if self.server.is_active(): - self.write_log("服务运行中") + self.write_log("RPC服务运行中") return False self.rep_address = rep_address @@ -82,21 +82,21 @@ class RpcEngine(BaseEngine): self.server.start(rep_address, pub_address) except: # noqa msg = traceback.format_exc() - self.write_log(f"服务启动失败:{msg}") + self.write_log(f"RPC服务启动失败:{msg}") return False self.save_setting() - self.write_log("服务启动成功") + self.write_log("RPC服务启动成功") return True def stop(self): """""" if not self.server.is_active(): - self.write_log("服务未启动") + self.write_log("RPC服务未启动") return False self.server.stop() - self.write_log("服务已停止") + self.write_log("RPC服务已停止") return True def close(self): diff --git a/vnpy/gateway/rpc/__init__.py b/vnpy/gateway/rpc/__init__.py new file mode 100644 index 00000000..5611fd75 --- /dev/null +++ b/vnpy/gateway/rpc/__init__.py @@ -0,0 +1 @@ +from .rpc_gateway import RpcGateway diff --git a/vnpy/gateway/rpc/rpc_gateway.py b/vnpy/gateway/rpc/rpc_gateway.py new file mode 100644 index 00000000..c9ad39a8 --- /dev/null +++ b/vnpy/gateway/rpc/rpc_gateway.py @@ -0,0 +1,116 @@ +from vnpy.event import Event +from vnpy.rpc import RpcClient +from vnpy.trader.gateway import BaseGateway +from vnpy.trader.object import ( + SubscribeRequest, + CancelRequest, + OrderRequest +) +from vnpy.trader.constant import Exchange + + +class RpcGateway(BaseGateway): + """ + VN Trader Gateway for RPC service. + """ + + default_setting = { + "主动请求地址": "tcp://127.0.0.1:2014", + "推送订阅地址": "tcp://127.0.0.1:4102" + } + + exchanges = list(Exchange) + + def __init__(self, event_engine): + """Constructor""" + super().__init__(event_engine, "RPC") + + self.symbol_gateway_map = {} + + self.client = RpcClient() + self.client.callback = self.client_callback + + def connect(self, setting: dict): + """""" + req_address = setting["主动请求地址"] + pub_address = setting["推送订阅地址"] + + self.client.subscribe_topic("") + self.client.start(req_address, pub_address) + + self.write_log("服务器连接成功,开始初始化查询") + + self.query_all() + + def subscribe(self, req: SubscribeRequest): + """""" + gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "") + self.client.subscribe(req, gateway_name) + + def send_order(self, req: OrderRequest): + """""" + gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "") + self.client.send_order(req, gateway_name) + + def cancel_order(self, req: CancelRequest): + """""" + gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "") + self.client.cancel_order(req, gateway_name) + + def query_account(self): + """""" + pass + + def query_position(self): + """""" + pass + + def query_all(self): + """""" + contracts = self.client.get_all_contracts() + for contract in contracts: + self.symbol_gateway_map[contract.vt_symbol] = contract.gateway_name + contract.gateway_name = self.gateway_name + self.on_contract(contract) + self.write_log("合约信息查询成功") + + accounts = self.client.get_all_accounts() + for account in accounts: + account.gateway_name = self.gateway_name + self.on_account(account) + self.write_log("资金信息查询成功") + + positions = self.client.get_all_positions() + for position in positions: + position.gateway_name = self.gateway_name + self.on_position(position) + self.write_log("持仓信息查询成功") + + orders = self.client.get_all_orders() + for order in orders: + order.gateway_name = self.gateway_name + self.on_order(order) + self.write_log("委托信息查询成功") + + trades = self.client.get_all_trades() + for trade in trades: + trade.gateway_name = self.gateway_name + self.on_trade(trade) + self.write_log("成交信息查询成功") + + def close(self): + """""" + self.client.stop() + + def client_callback(self, topic: str, event: Event): + """""" + if event is None: + print("none event", topic, event) + return + + data = event.data + + if hasattr(data, "gateway_name"): + data.gateway_name = self.gateway_name + + self.event_engine.put(event) diff --git a/vnpy/rpc/__init__.py b/vnpy/rpc/__init__.py index 3c4aa58c..52b2ec0d 100644 --- a/vnpy/rpc/__init__.py +++ b/vnpy/rpc/__init__.py @@ -133,22 +133,20 @@ class RpcServer: class RpcClient: """""" - def __init__(self, req_address: str, sub_address: str): + def __init__(self): """Constructor""" # zmq port related - self.__req_address = req_address - self.__sub_address = sub_address - self.__context = zmq.Context() + # Request socket (Request–reply pattern) self.__socket_req = self.__context.socket(zmq.REQ) + # Subscribe socket (Publish–subscribe pattern) self.__socket_sub = self.__context.socket(zmq.SUB) # Worker thread relate, used to process data pushed from server - self.__active = False # RpcClient status - self.__thread = threading.Thread( - target=self.run) # RpcClient thread + self.__active = False # RpcClient status + self.__thread = None # RpcClient thread def __getattr__(self, name: str): """ @@ -171,31 +169,42 @@ class RpcClient: return dorpc - def start(self): + def start(self, req_address: str, sub_address: str): """ Start RpcClient """ + if self.__active: + return + # Connect zmq port - self.__socket_req.connect(self.__req_address) - self.__socket_sub.connect(self.__sub_address) + self.__socket_req.connect(req_address) + self.__socket_sub.connect(sub_address) # Start RpcClient status self.__active = True # Start RpcClient thread - if not self.__thread.isAlive(): - self.__thread.start() + self.__thread = threading.Thread(target=self.run) + self.__thread.start() def stop(self): """ Stop RpcClient """ + if not self.__active: + return + # Stop RpcClient status self.__active = False # Wait for RpcClient thread to exit if self.__thread.isAlive(): self.__thread.join() + self.__thread = None + + # Close socket + self.__socket_req.close() + self.__socket_sub.close() def run(self): """