[Add] RpcGateway for connecting to RpcService

This commit is contained in:
vn.py 2019-07-02 14:09:00 +08:00
parent 4127f1cc5d
commit cc590f1b5b
7 changed files with 245 additions and 21 deletions

View File

@ -0,0 +1,26 @@
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy.gateway.rpc import RpcGateway
from vnpy.app.cta_strategy import CtaStrategyApp
def main():
""""""
qapp = create_qapp()
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(RpcGateway)
main_engine.add_app(CtaStrategyApp)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()
qapp.exec()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,72 @@
from time import sleep
from vnpy.event import EventEngine, Event
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy.trader.event import EVENT_LOG
from vnpy.gateway.ctp import CtpGateway
from vnpy.app.rpc_service import RpcServiceApp
from vnpy.app.rpc_service.engine import EVENT_RPC_LOG
def main_ui():
""""""
qapp = create_qapp()
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
main_engine.add_app(RpcServiceApp)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()
qapp.exec()
def process_log_event(event: Event):
""""""
log = event.data
msg = f"{log.time}\t{log.msg}"
print(msg)
def main_terminal():
""""""
event_engine = EventEngine()
event_engine.register(EVENT_LOG, process_log_event)
event_engine.register(EVENT_RPC_LOG, process_log_event)
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
rpc_engine = main_engine.add_app(RpcServiceApp)
setting = {
"用户名": "25500029",
"密码": "20140602",
"经纪商代码": "6010",
"交易服务器": "180.169.75.194:41305",
"行情服务器": "180.166.1.17:41313",
"产品名称": "vntech_vnpy_2.0",
"授权编码": "0Y1J5UIMY79BFL7S",
"产品信息": ""
}
main_engine.connect(setting, "CTP")
sleep(10)
rep_address = "tcp://127.0.0.1:2014"
pub_address = "tcp://127.0.0.1:4102"
rpc_engine.start(rep_address, pub_address)
while True:
sleep(1)
if __name__ == "__main__":
# Run in GUI mode
# main_ui()
# Run in CLI mode
main_terminal()

View File

@ -10,11 +10,11 @@ class TestClient(RpcClient):
Test RpcClient
"""
def __init__(self, req_address, sub_address):
def __init__(self):
"""
Constructor
"""
super(TestClient, self).__init__(req_address, sub_address)
super(TestClient, self).__init__()
def callback(self, topic, data):
"""
@ -27,9 +27,9 @@ if __name__ == "__main__":
req_address = "tcp://localhost:2014"
sub_address = "tcp://localhost:4102"
tc = TestClient(req_address, sub_address)
tc = TestClient()
tc.subscribe_topic("")
tc.start()
tc.start(req_address, sub_address)
while 1:
print(tc.add(1, 3))

View File

@ -72,7 +72,7 @@ class RpcEngine(BaseEngine):
def start(self, rep_address: str, pub_address: str):
""""""
if self.server.is_active():
self.write_log("服务运行中")
self.write_log("RPC服务运行中")
return False
self.rep_address = rep_address
@ -82,21 +82,21 @@ class RpcEngine(BaseEngine):
self.server.start(rep_address, pub_address)
except: # noqa
msg = traceback.format_exc()
self.write_log(f"服务启动失败:{msg}")
self.write_log(f"RPC服务启动失败:{msg}")
return False
self.save_setting()
self.write_log("服务启动成功")
self.write_log("RPC服务启动成功")
return True
def stop(self):
""""""
if not self.server.is_active():
self.write_log("服务未启动")
self.write_log("RPC服务未启动")
return False
self.server.stop()
self.write_log("服务已停止")
self.write_log("RPC服务已停止")
return True
def close(self):

View File

@ -0,0 +1 @@
from .rpc_gateway import RpcGateway

View File

@ -0,0 +1,116 @@
from vnpy.event import Event
from vnpy.rpc import RpcClient
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (
SubscribeRequest,
CancelRequest,
OrderRequest
)
from vnpy.trader.constant import Exchange
class RpcGateway(BaseGateway):
"""
VN Trader Gateway for RPC service.
"""
default_setting = {
"主动请求地址": "tcp://127.0.0.1:2014",
"推送订阅地址": "tcp://127.0.0.1:4102"
}
exchanges = list(Exchange)
def __init__(self, event_engine):
"""Constructor"""
super().__init__(event_engine, "RPC")
self.symbol_gateway_map = {}
self.client = RpcClient()
self.client.callback = self.client_callback
def connect(self, setting: dict):
""""""
req_address = setting["主动请求地址"]
pub_address = setting["推送订阅地址"]
self.client.subscribe_topic("")
self.client.start(req_address, pub_address)
self.write_log("服务器连接成功,开始初始化查询")
self.query_all()
def subscribe(self, req: SubscribeRequest):
""""""
gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "")
self.client.subscribe(req, gateway_name)
def send_order(self, req: OrderRequest):
""""""
gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "")
self.client.send_order(req, gateway_name)
def cancel_order(self, req: CancelRequest):
""""""
gateway_name = self.symbol_gateway_map.get(req.vt_symbol, "")
self.client.cancel_order(req, gateway_name)
def query_account(self):
""""""
pass
def query_position(self):
""""""
pass
def query_all(self):
""""""
contracts = self.client.get_all_contracts()
for contract in contracts:
self.symbol_gateway_map[contract.vt_symbol] = contract.gateway_name
contract.gateway_name = self.gateway_name
self.on_contract(contract)
self.write_log("合约信息查询成功")
accounts = self.client.get_all_accounts()
for account in accounts:
account.gateway_name = self.gateway_name
self.on_account(account)
self.write_log("资金信息查询成功")
positions = self.client.get_all_positions()
for position in positions:
position.gateway_name = self.gateway_name
self.on_position(position)
self.write_log("持仓信息查询成功")
orders = self.client.get_all_orders()
for order in orders:
order.gateway_name = self.gateway_name
self.on_order(order)
self.write_log("委托信息查询成功")
trades = self.client.get_all_trades()
for trade in trades:
trade.gateway_name = self.gateway_name
self.on_trade(trade)
self.write_log("成交信息查询成功")
def close(self):
""""""
self.client.stop()
def client_callback(self, topic: str, event: Event):
""""""
if event is None:
print("none event", topic, event)
return
data = event.data
if hasattr(data, "gateway_name"):
data.gateway_name = self.gateway_name
self.event_engine.put(event)

View File

@ -133,22 +133,20 @@ class RpcServer:
class RpcClient:
""""""
def __init__(self, req_address: str, sub_address: str):
def __init__(self):
"""Constructor"""
# zmq port related
self.__req_address = req_address
self.__sub_address = sub_address
self.__context = zmq.Context()
# Request socket (Requestreply pattern)
self.__socket_req = self.__context.socket(zmq.REQ)
# Subscribe socket (Publishsubscribe pattern)
self.__socket_sub = self.__context.socket(zmq.SUB)
# Worker thread relate, used to process data pushed from server
self.__active = False # RpcClient status
self.__thread = threading.Thread(
target=self.run) # RpcClient thread
self.__thread = None # RpcClient thread
def __getattr__(self, name: str):
"""
@ -171,31 +169,42 @@ class RpcClient:
return dorpc
def start(self):
def start(self, req_address: str, sub_address: str):
"""
Start RpcClient
"""
if self.__active:
return
# Connect zmq port
self.__socket_req.connect(self.__req_address)
self.__socket_sub.connect(self.__sub_address)
self.__socket_req.connect(req_address)
self.__socket_sub.connect(sub_address)
# Start RpcClient status
self.__active = True
# Start RpcClient thread
if not self.__thread.isAlive():
self.__thread = threading.Thread(target=self.run)
self.__thread.start()
def stop(self):
"""
Stop RpcClient
"""
if not self.__active:
return
# Stop RpcClient status
self.__active = False
# Wait for RpcClient thread to exit
if self.__thread.isAlive():
self.__thread.join()
self.__thread = None
# Close socket
self.__socket_req.close()
self.__socket_sub.close()
def run(self):
"""