Merge pull request #2065 from vnpy/rpc-lock

Rpc lock
This commit is contained in:
vn.py 2019-09-03 10:04:26 +08:00 committed by GitHub
commit 65e71ae66d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 75 additions and 28 deletions

View File

@ -44,13 +44,13 @@ def main_terminal():
rpc_engine = main_engine.add_app(RpcServiceApp) rpc_engine = main_engine.add_app(RpcServiceApp)
setting = { setting = {
"用户名": "25500029", "用户名": "",
"密码": "20140602", "密码": "",
"经纪商代码": "6010", "经纪商代码": "9999",
"交易服务器": "180.169.75.194:41305", "交易服务器": "180.168.146.187:10101",
"行情服务器": "180.166.1.17:41313", "行情服务器": "180.168.146.187:10111",
"产品名称": "vntech_vnpy_2.0", "产品名称": "simnow_client_test",
"授权编码": "0Y1J5UIMY79BFL7S", "授权编码": "0000000000000000",
"产品信息": "" "产品信息": ""
} }
main_engine.connect(setting, "CTP") main_engine.connect(setting, "CTP")

View File

@ -1,6 +1,7 @@
"""""" """"""
import traceback import traceback
from typing import Optional
from vnpy.event import Event, EventEngine from vnpy.event import Event, EventEngine
from vnpy.rpc import RpcServer from vnpy.rpc import RpcServer
@ -24,7 +25,7 @@ class RpcEngine(BaseEngine):
self.rep_address = "tcp://*:2014" self.rep_address = "tcp://*:2014"
self.pub_address = "tcp://*:4102" self.pub_address = "tcp://*:4102"
self.server = None self.server: Optional[RpcServer] = None
self.init_server() self.init_server()
self.load_setting() self.load_setting()
@ -96,6 +97,7 @@ class RpcEngine(BaseEngine):
return False return False
self.server.stop() self.server.stop()
self.server.join()
self.write_log("RPC服务已停止") self.write_log("RPC服务已停止")
return True return True

View File

@ -101,6 +101,7 @@ class RpcGateway(BaseGateway):
def close(self): def close(self):
"""""" """"""
self.client.stop() self.client.stop()
self.client.join()
def client_callback(self, topic: str, event: Event): def client_callback(self, topic: str, event: Event):
"""""" """"""

View File

@ -1,13 +1,23 @@
import signal
import threading import threading
import traceback import traceback
import signal from datetime import datetime, timedelta
import zmq from functools import lru_cache
from typing import Any, Callable from typing import Any, Callable
import zmq
_ = lambda x: x
# Achieve Ctrl-c interrupt recv # Achieve Ctrl-c interrupt recv
from zmq.backend.cython.constants import NOBLOCK
signal.signal(signal.SIGINT, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_DFL)
KEEP_ALIVE_TOPIC = '_keep_alive'
KEEP_ALIVE_INTERVAL = timedelta(seconds=1)
KEEP_ALIVE_TOLERANCE = timedelta(seconds=3)
class RemoteException(Exception): class RemoteException(Exception):
""" """
@ -50,6 +60,8 @@ class RpcServer:
self.__active = False # RpcServer status self.__active = False # RpcServer status
self.__thread = None # RpcServer thread self.__thread = None # RpcServer thread
self._register(KEEP_ALIVE_TOPIC, lambda n: n)
def is_active(self): def is_active(self):
"""""" """"""
return self.__active return self.__active
@ -82,21 +94,29 @@ class RpcServer:
# Stop RpcServer status # Stop RpcServer status
self.__active = False self.__active = False
# Unbind socket address
self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT)
self.__socket_rep.unbind(self.__socket_rep.LAST_ENDPOINT)
def join(self):
# Wait for RpcServer thread to exit # Wait for RpcServer thread to exit
if self.__thread.isAlive(): if self.__thread.isAlive():
self.__thread.join() self.__thread.join()
self.__thread = None self.__thread = None
# Unbind socket address
self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT)
self.__socket_rep.unbind(self.__socket_rep.LAST_ENDPOINT)
def run(self): def run(self):
""" """
Run RpcServer functions Run RpcServer functions
""" """
start = datetime.utcnow()
while self.__active: while self.__active:
# Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds) # Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds)
cur = datetime.utcnow()
delta = cur - start
if delta >= KEEP_ALIVE_INTERVAL:
self.publish(KEEP_ALIVE_TOPIC, cur)
if not self.__socket_rep.poll(1000): if not self.__socket_rep.poll(1000):
continue continue
@ -127,7 +147,13 @@ class RpcServer:
""" """
Register function Register function
""" """
self.__functions[func.__name__] = func return self._register(func.__name__, func)
def _register(self, name: str, func: Callable):
"""
Register function
"""
self.__functions[name] = func
class RpcClient: class RpcClient:
@ -147,17 +173,23 @@ class RpcClient:
# Worker thread relate, used to process data pushed from server # Worker thread relate, used to process data pushed from server
self.__active = False # RpcClient status self.__active = False # RpcClient status
self.__thread = None # RpcClient thread self.__thread = None # RpcClient thread
self.__lock = threading.Lock()
self._last_received_ping: datetime = datetime.utcnow()
@lru_cache(100)
def __getattr__(self, name: str): def __getattr__(self, name: str):
""" """
Realize remote call function Realize remote call function
""" """
# Perform remote call task # Perform remote call task
def dorpc(*args, **kwargs): def dorpc(*args, **kwargs):
# Generate request # Generate request
req = [name, args, kwargs] req = [name, args, kwargs]
# Send request and wait for response # Send request and wait for response
with self.__lock:
self.__socket_req.send_pyobj(req) self.__socket_req.send_pyobj(req)
rep = self.__socket_req.recv_pyobj() rep = self.__socket_req.recv_pyobj()
@ -187,6 +219,8 @@ class RpcClient:
self.__thread = threading.Thread(target=self.run) self.__thread = threading.Thread(target=self.run)
self.__thread.start() self.__thread.start()
self._last_received_ping = datetime.utcnow()
def stop(self): def stop(self):
""" """
Stop RpcClient Stop RpcClient
@ -197,30 +231,40 @@ class RpcClient:
# Stop RpcClient status # Stop RpcClient status
self.__active = False self.__active = False
# Close socket
self.__socket_req.close()
self.__socket_sub.close()
def join(self):
# Wait for RpcClient thread to exit # Wait for RpcClient thread to exit
if self.__thread.isAlive(): if self.__thread.isAlive():
self.__thread.join() self.__thread.join()
self.__thread = None self.__thread = None
# Close socket
self.__socket_req.close()
self.__socket_sub.close()
def run(self): def run(self):
""" """
Run RpcClient function Run RpcClient function
""" """
pull_tolerance = int(KEEP_ALIVE_TOLERANCE.total_seconds() * 1000)
while self.__active: while self.__active:
# Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds) if not self.__socket_sub.poll(pull_tolerance):
if not self.__socket_sub.poll(1000): self._on_unexpected_disconnected()
continue continue
# Receive data from subscribe socket # Receive data from subscribe socket
topic, data = self.__socket_sub.recv_pyobj() topic, data = self.__socket_sub.recv_pyobj(flags=NOBLOCK)
if topic == KEEP_ALIVE_TOPIC:
self._last_received_ping = data
else:
# Process data by callable function # Process data by callable function
self.callback(topic, data) self.callback(topic, data)
@staticmethod
def _on_unexpected_disconnected():
print(_("RpcServer has no response over {tolerance} seconds, please check you connection."
.format(tolerance=KEEP_ALIVE_TOLERANCE.total_seconds())))
def callback(self, topic: str, data: Any): def callback(self, topic: str, data: Any):
""" """
Callable function Callable function