From cc590f1b5bb6e10ff8a3e37b64ffe86f1101830b Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 2 Jul 2019 14:09:00 +0800 Subject: [PATCH] [Add] RpcGateway for connecting to RpcService --- examples/client_server/client/run_client.py | 26 +++++ examples/client_server/server/run_server.py | 72 ++++++++++++ examples/simple_rpc/test_client.py | 8 +- vnpy/app/rpc_service/engine.py | 10 +- vnpy/gateway/rpc/__init__.py | 1 + vnpy/gateway/rpc/rpc_gateway.py | 116 ++++++++++++++++++++ vnpy/rpc/__init__.py | 33 ++++-- 7 files changed, 245 insertions(+), 21 deletions(-) create mode 100644 examples/client_server/client/run_client.py create mode 100644 examples/client_server/server/run_server.py create mode 100644 vnpy/gateway/rpc/__init__.py create mode 100644 vnpy/gateway/rpc/rpc_gateway.py diff --git a/examples/client_server/client/run_client.py b/examples/client_server/client/run_client.py new file mode 100644 index 00000000..4f3669e6 --- /dev/null +++ b/examples/client_server/client/run_client.py @@ -0,0 +1,26 @@ +from vnpy.event import EventEngine +from vnpy.trader.engine import MainEngine +from vnpy.trader.ui import MainWindow, create_qapp +from vnpy.gateway.rpc import RpcGateway +from vnpy.app.cta_strategy import CtaStrategyApp + + +def main(): + """""" + qapp = create_qapp() + + event_engine = EventEngine() + + main_engine = MainEngine(event_engine) + + main_engine.add_gateway(RpcGateway) + main_engine.add_app(CtaStrategyApp) + + main_window = MainWindow(main_engine, event_engine) + main_window.showMaximized() + + qapp.exec() + + +if __name__ == "__main__": + main() diff --git a/examples/client_server/server/run_server.py b/examples/client_server/server/run_server.py new file mode 100644 index 00000000..8d9f006a --- /dev/null +++ b/examples/client_server/server/run_server.py @@ -0,0 +1,72 @@ +from time import sleep + +from vnpy.event import EventEngine, Event +from vnpy.trader.engine import MainEngine +from vnpy.trader.ui import MainWindow, create_qapp +from vnpy.trader.event import EVENT_LOG +from vnpy.gateway.ctp import CtpGateway +from vnpy.app.rpc_service import RpcServiceApp +from vnpy.app.rpc_service.engine import EVENT_RPC_LOG + + +def main_ui(): + """""" + qapp = create_qapp() + + event_engine = EventEngine() + + main_engine = MainEngine(event_engine) + + main_engine.add_gateway(CtpGateway) + main_engine.add_app(RpcServiceApp) + + main_window = MainWindow(main_engine, event_engine) + main_window.showMaximized() + + qapp.exec() + + +def process_log_event(event: Event): + """""" + log = event.data + msg = f"{log.time}\t{log.msg}" + print(msg) + + +def main_terminal(): + """""" + event_engine = EventEngine() + event_engine.register(EVENT_LOG, process_log_event) + event_engine.register(EVENT_RPC_LOG, process_log_event) + + main_engine = MainEngine(event_engine) + main_engine.add_gateway(CtpGateway) + rpc_engine = main_engine.add_app(RpcServiceApp) + + setting = { + "用户名": "25500029", + "密码": "20140602", + "经纪商代码": "6010", + "交易服务器": "180.169.75.194:41305", + "行情服务器": "180.166.1.17:41313", + "产品名称": "vntech_vnpy_2.0", + "授权编码": "0Y1J5UIMY79BFL7S", + "产品信息": "" + } + main_engine.connect(setting, "CTP") + sleep(10) + + rep_address = "tcp://127.0.0.1:2014" + pub_address = "tcp://127.0.0.1:4102" + rpc_engine.start(rep_address, pub_address) + + while True: + sleep(1) + + +if __name__ == "__main__": + # Run in GUI mode + # main_ui() + + # Run in CLI mode + main_terminal() diff --git a/examples/simple_rpc/test_client.py b/examples/simple_rpc/test_client.py index 22689971..77ba34f6 100644 --- a/examples/simple_rpc/test_client.py +++ b/examples/simple_rpc/test_client.py @@ -10,11 +10,11 @@ class TestClient(RpcClient): Test RpcClient """ - def __init__(self, req_address, sub_address): + def __init__(self): """ Constructor """ - super(TestClient, self).__init__(req_address, sub_address) + super(TestClient, self).__init__() def callback(self, topic, data): """ @@ -27,9 +27,9 @@ if __name__ == "__main__": req_address = "tcp://localhost:2014" sub_address = "tcp://localhost:4102" - tc = TestClient(req_address, sub_address) + tc = TestClient() tc.subscribe_topic("") - tc.start() + tc.start(req_address, sub_address) while 1: print(tc.add(1, 3)) diff --git a/vnpy/app/rpc_service/engine.py b/vnpy/app/rpc_service/engine.py index 7784412a..62a355a5 100644 --- a/vnpy/app/rpc_service/engine.py +++ b/vnpy/app/rpc_service/engine.py @@ -72,7 +72,7 @@ class RpcEngine(BaseEngine): def start(self, rep_address: str, pub_address: str): """""" if self.server.is_active(): - self.write_log("服务运行中") + self.write_log("RPC服务运行中") return False self.rep_address = rep_address @@ -82,21 +82,21 @@ class RpcEngine(BaseEngine): self.server.start(rep_address, pub_address) except: # noqa msg = traceback.format_exc() - self.write_log(f"服务启动失败:{msg}") + self.write_log(f"RPC服务启动失败:{msg}") return False self.save_setting() - self.write_log("服务启动成功") + self.write_log("RPC服务启动成功") return True def stop(self): """""" if not self.server.is_active(): - self.write_log("服务未启动") + self.write_log("RPC服务未启动") return False self.server.stop() - self.write_log("服务已停止") + self.write_log("RPC服务已停止") return True def close(self): diff --git a/vnpy/gateway/rpc/__init__.py b/vnpy/gateway/rpc/__init__.py new file mode 100644 index 00000000..5611fd75 --- /dev/null +++ b/vnpy/gateway/rpc/__init__.py @@ -0,0 +1 @@ +from .rpc_gateway import RpcGateway diff --git a/vnpy/gateway/rpc/rpc_gateway.py b/vnpy/gateway/rpc/rpc_gateway.py new file mode 100644 index 00000000..c9ad39a8 --- /dev/null +++ b/vnpy/gateway/rpc/rpc_gateway.py @@ -0,0 +1,116 @@ +from vnpy.event import Event +from vnpy.rpc import RpcClient +from vnpy.trader.gateway import BaseGateway +from vnpy.trader.object import ( + SubscribeRequest, + CancelRequest, + OrderRequest +) +from vnpy.trader.constant import Exchange + + +class RpcGateway(BaseGateway): + """ + VN Trader Gateway for RPC service. + """ + + default_setting = { + "主动请求地址": "tcp://127.0.0.1:2014", + "推送订阅地址": "tcp://127.0.0.1:4102" + } + + exchanges = list(Exchange) + + def __init__(self, event_engine): + """Constructor""" + super().__init__(event_engine, "RPC") + + self.symbol_gateway_map = {} + + self.client = RpcClient() + self.client.callback = self.client_callback + + def connect(self, setting: dict): + """""" + req_address = setting["主动请求地址"] + pub_address = setting["推送订阅地址"] + + self.client.subscribe_topic("") + self.client.start(req_address, pub_address) + + self.write_log("服务器连接成功,开始初始化查询") + + self.query_all() + + def subscribe(self, req: SubscribeRequest): + """""" + gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "") + self.client.subscribe(req, gateway_name) + + def send_order(self, req: OrderRequest): + """""" + gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "") + self.client.send_order(req, gateway_name) + + def cancel_order(self, req: CancelRequest): + """""" + gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "") + self.client.cancel_order(req, gateway_name) + + def query_account(self): + """""" + pass + + def query_position(self): + """""" + pass + + def query_all(self): + """""" + contracts = self.client.get_all_contracts() + for contract in contracts: + self.symbol_gateway_map[contract.vt_symbol] = contract.gateway_name + contract.gateway_name = self.gateway_name + self.on_contract(contract) + self.write_log("合约信息查询成功") + + accounts = self.client.get_all_accounts() + for account in accounts: + account.gateway_name = self.gateway_name + self.on_account(account) + self.write_log("资金信息查询成功") + + positions = self.client.get_all_positions() + for position in positions: + position.gateway_name = self.gateway_name + self.on_position(position) + self.write_log("持仓信息查询成功") + + orders = self.client.get_all_orders() + for order in orders: + order.gateway_name = self.gateway_name + self.on_order(order) + self.write_log("委托信息查询成功") + + trades = self.client.get_all_trades() + for trade in trades: + trade.gateway_name = self.gateway_name + self.on_trade(trade) + self.write_log("成交信息查询成功") + + def close(self): + """""" + self.client.stop() + + def client_callback(self, topic: str, event: Event): + """""" + if event is None: + print("none event", topic, event) + return + + data = event.data + + if hasattr(data, "gateway_name"): + data.gateway_name = self.gateway_name + + self.event_engine.put(event) diff --git a/vnpy/rpc/__init__.py b/vnpy/rpc/__init__.py index 3c4aa58c..52b2ec0d 100644 --- a/vnpy/rpc/__init__.py +++ b/vnpy/rpc/__init__.py @@ -133,22 +133,20 @@ class RpcServer: class RpcClient: """""" - def __init__(self, req_address: str, sub_address: str): + def __init__(self): """Constructor""" # zmq port related - self.__req_address = req_address - self.__sub_address = sub_address - self.__context = zmq.Context() + # Request socket (Request–reply pattern) self.__socket_req = self.__context.socket(zmq.REQ) + # Subscribe socket (Publish–subscribe pattern) self.__socket_sub = self.__context.socket(zmq.SUB) # Worker thread relate, used to process data pushed from server - self.__active = False # RpcClient status - self.__thread = threading.Thread( - target=self.run) # RpcClient thread + self.__active = False # RpcClient status + self.__thread = None # RpcClient thread def __getattr__(self, name: str): """ @@ -171,31 +169,42 @@ class RpcClient: return dorpc - def start(self): + def start(self, req_address: str, sub_address: str): """ Start RpcClient """ + if self.__active: + return + # Connect zmq port - self.__socket_req.connect(self.__req_address) - self.__socket_sub.connect(self.__sub_address) + self.__socket_req.connect(req_address) + self.__socket_sub.connect(sub_address) # Start RpcClient status self.__active = True # Start RpcClient thread - if not self.__thread.isAlive(): - self.__thread.start() + self.__thread = threading.Thread(target=self.run) + self.__thread.start() def stop(self): """ Stop RpcClient """ + if not self.__active: + return + # Stop RpcClient status self.__active = False # Wait for RpcClient thread to exit if self.__thread.isAlive(): self.__thread.join() + self.__thread = None + + # Close socket + self.__socket_req.close() + self.__socket_sub.close() def run(self): """