Merge pull request #2032 from nanoric/rpc

[Add] RpcClient: Added send lock.
This commit is contained in:
vn.py 2019-08-21 10:32:48 +08:00 committed by GitHub
commit ed061f585e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 68 additions and 21 deletions

View File

@ -1,6 +1,7 @@
""""""
import traceback
from typing import Optional
from vnpy.event import Event, EventEngine
from vnpy.rpc import RpcServer
@ -24,7 +25,7 @@ class RpcEngine(BaseEngine):
self.rep_address = "tcp://*:2014"
self.pub_address = "tcp://*:4102"
self.server = None
self.server: Optional[RpcServer] = None
self.init_server()
self.load_setting()
@ -96,6 +97,7 @@ class RpcEngine(BaseEngine):
return False
self.server.stop()
self.server.join()
self.write_log("RPC服务已停止")
return True

View File

@ -101,6 +101,7 @@ class RpcGateway(BaseGateway):
def close(self):
""""""
self.client.stop()
self.client.join()
def client_callback(self, topic: str, event: Event):
""""""

View File

@ -1,13 +1,23 @@
import signal
import threading
import traceback
import signal
import zmq
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Any, Callable
import zmq
_ = lambda x: x
# Achieve Ctrl-c interrupt recv
from zmq.backend.cython.constants import NOBLOCK
signal.signal(signal.SIGINT, signal.SIG_DFL)
KEEP_ALIVE_TOPIC = '_keep_alive'
KEEP_ALIVE_INTERVAL = timedelta(seconds=1)
KEEP_ALIVE_TOLERANCE = timedelta(seconds=3)
class RemoteException(Exception):
"""
@ -50,6 +60,8 @@ class RpcServer:
self.__active = False # RpcServer status
self.__thread = None # RpcServer thread
self._register(KEEP_ALIVE_TOPIC, lambda n: n)
def is_active(self):
""""""
return self.__active
@ -82,21 +94,29 @@ class RpcServer:
# Stop RpcServer status
self.__active = False
# Unbind socket address
self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT)
self.__socket_rep.unbind(self.__socket_rep.LAST_ENDPOINT)
def join(self):
# Wait for RpcServer thread to exit
if self.__thread.isAlive():
self.__thread.join()
self.__thread = None
# Unbind socket address
self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT)
self.__socket_rep.unbind(self.__socket_rep.LAST_ENDPOINT)
def run(self):
"""
Run RpcServer functions
"""
start = datetime.utcnow()
while self.__active:
# Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds)
cur = datetime.utcnow()
delta = cur - start
if delta >= KEEP_ALIVE_INTERVAL:
self.publish(KEEP_ALIVE_TOPIC, cur)
if not self.__socket_rep.poll(1000):
continue
@ -127,7 +147,13 @@ class RpcServer:
"""
Register function
"""
self.__functions[func.__name__] = func
return self._register(func.__name__, func)
def _register(self, name: str, func: Callable):
"""
Register function
"""
self.__functions[name] = func
class RpcClient:
@ -145,21 +171,27 @@ class RpcClient:
self.__socket_sub = self.__context.socket(zmq.SUB)
# Worker thread relate, used to process data pushed from server
self.__active = False # RpcClient status
self.__thread = None # RpcClient thread
self.__active = False # RpcClient status
self.__thread = None # RpcClient thread
self.__lock = threading.Lock()
self._last_received_ping: datetime = datetime.utcnow()
@lru_cache(100)
def __getattr__(self, name: str):
"""
Realize remote call function
"""
# Perform remote call task
def dorpc(*args, **kwargs):
# Generate request
req = [name, args, kwargs]
# Send request and wait for response
self.__socket_req.send_pyobj(req)
rep = self.__socket_req.recv_pyobj()
with self.__lock:
self.__socket_req.send_pyobj(req)
rep = self.__socket_req.recv_pyobj()
# Return response if successed; Trigger exception if failed
if rep[0]:
@ -187,6 +219,8 @@ class RpcClient:
self.__thread = threading.Thread(target=self.run)
self.__thread.start()
self._last_received_ping = datetime.utcnow()
def stop(self):
"""
Stop RpcClient
@ -197,29 +231,39 @@ class RpcClient:
# Stop RpcClient status
self.__active = False
# Close socket
self.__socket_req.close()
self.__socket_sub.close()
def join(self):
# Wait for RpcClient thread to exit
if self.__thread.isAlive():
self.__thread.join()
self.__thread = None
# Close socket
self.__socket_req.close()
self.__socket_sub.close()
def run(self):
"""
Run RpcClient function
"""
pull_tolerance = int(KEEP_ALIVE_TOLERANCE.total_seconds() * 1000)
while self.__active:
# Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds)
if not self.__socket_sub.poll(1000):
if not self.__socket_sub.poll(pull_tolerance):
self._on_unexpected_disconnected()
continue
# Receive data from subscribe socket
topic, data = self.__socket_sub.recv_pyobj()
topic, data = self.__socket_sub.recv_pyobj(flags=NOBLOCK)
# Process data by callable function
self.callback(topic, data)
if topic == KEEP_ALIVE_TOPIC:
self._last_received_ping = data
else:
# Process data by callable function
self.callback(topic, data)
@staticmethod
def _on_unexpected_disconnected():
print(_("RpcServer has no response over {tolerance} seconds, please check you connection."
.format(tolerance=KEEP_ALIVE_TOLERANCE.total_seconds())))
def callback(self, topic: str, data: Any):
"""