From 32867a97c2ec601488297fa238afd5b65616e508 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Wed, 16 Jan 2019 09:25:18 +0800 Subject: [PATCH] [Mod]Change websocket client code style --- vnpy/api/rest/rest_client.py | 100 ++++++----- vnpy/api/websocket/websocket_client.py | 221 +++++++++++++------------ 2 files changed, 171 insertions(+), 150 deletions(-) diff --git a/vnpy/api/rest/rest_client.py b/vnpy/api/rest/rest_client.py index 29ce1830..12b20133 100644 --- a/vnpy/api/rest/rest_client.py +++ b/vnpy/api/rest/rest_client.py @@ -23,28 +23,39 @@ class Request(object): Request object for status check. """ - def __init__(self, method, path, params, data, headers, callback): + def __init__( + self, + method: str, + path: str, + params: dict, + data: dict, + headers: dict, + callback: Callable, + on_failed: Callable = None, + on_error: Callable = None, + extra: Any = None + ): """""" - self.method = method # type: str - self.path = path # type: str - self.callback = callback # type: callable - self.params = params # type: dict #, bytes, str - self.data = data # type: dict #, bytes, str - self.headers = headers # type: dict + self.method = method + self.path = path + self.callback = callback + self.params = params + self.data = data + self.headers = headers - self.on_failed = None # type: callable - self.on_error = None # type: callable - self.extra = None # type: Any + self.on_failed = on_failed + self.on_error = on_error + self.extra = extra - self.response = None # type: requests.Response - self.status = RequestStatus.ready # type: RequestStatus + self.response = None + self.status = RequestStatus.ready def __str__(self): if self.response is None: status_code = 'terminated' else: status_code = self.response.status_code - # todo: encoding error + return ( "reuqest : {} {} {} because {}: \n" "headers: {}\n" @@ -83,7 +94,7 @@ class RestClient(object): self._queue = Queue() self._pool = None # type: Pool - def init(self, url_base): + def init(self, url_base: str): """ Init rest client with url_base which is the API root address. e.g. 'https://www.bitmex.com/api/v1/' @@ -94,7 +105,7 @@ class RestClient(object): """""" return requests.session() - def start(self, n=3): + def start(self, n: int = 3): """ Start rest client with session count n. """ @@ -117,19 +128,18 @@ class RestClient(object): """ self._queue.join() - def add_request( - self, - method, # type: str - path, # type: str - callback, # type: Callable[[dict, Request], Any] - params=None, # type: dict - data=None, # type: dict - headers=None, # type: dict - on_failed=None, # type: Callable[[int, Request], Any] - on_error=None, # type: Callable[[type, Exception, traceback, Request], Any] - extra=None # type: Any - ): # type: (...)->Request + self, + method: str, + path: str, + callback: Callable, + params: dict = None, + data: dict = None, + headers: dict = None, + on_failed: Callable = None, + on_error: Callable = None, + extra: Any = None + ): """ Add a new request. :param method: GET, POST, PUT, DELETE, QUERY @@ -166,7 +176,7 @@ class RestClient(object): et, ev, tb = sys.exc_info() self.on_error(et, ev, tb, None) - def sign(self, request): # type: (Request)->Request + def sign(self, request: Request): """ This function is called before sending any request out. Please implement signature method here. @@ -174,19 +184,18 @@ class RestClient(object): """ return request - def on_failed(self, status_code, request): # type:(int, Request)->None + def on_failed(self, status_code: int, request: Request): """ Default on_failed handler for Non-2xx response. """ sys.stderr.write(str(request)) - def on_error( - self, - exception_type, # type: type - exception_value, # type: Exception - tb, - request # type: Optional[Request] + self, + exception_type: type, + exception_value: Exception, + tb, + request: Request ): """ Default on_error handler for Python exception. @@ -199,13 +208,12 @@ class RestClient(object): ) sys.excepthook(exception_type, exception_value, tb) - def exception_detail( - self, - exception_type, # type: type - exception_value, # type: Exception - tb, - request # type: Optional[Request] + self, + exception_type: type, + exception_value: Exception, + tb, + request: Request ): text = "[{}]: Unhandled RestClient Error:{}\n".format( datetime.now().isoformat(), @@ -224,13 +232,13 @@ class RestClient(object): def _process_request( self, - request, - session - ): # type: (Request, requests.Session)->None + request: Request, + session: requests.session + ): # type: (Request, requests.Session)->None """ Sending request to server and get result. """ - # noinspection PyBroadException + # noinspection PyBroadException try: request = self.sign(request) @@ -265,7 +273,7 @@ class RestClient(object): else: self.on_error(t, v, tb, request) - def make_full_url(self, path): + def make_full_url(self, path: str): """ Make relative api path into full url. eg: make_full_url('/get') == 'http://xxxxx/get' diff --git a/vnpy/api/websocket/websocket_client.py b/vnpy/api/websocket/websocket_client.py index 8f3a2f74..f10d92f0 100644 --- a/vnpy/api/websocket/websocket_client.py +++ b/vnpy/api/websocket/websocket_client.py @@ -3,8 +3,8 @@ import json import ssl import sys -import time import traceback +from time import sleep from datetime import datetime from threading import Lock, Thread @@ -15,118 +15,127 @@ class WebsocketClient(object): """ Websocket API - 实例化之后,应调用start开始后台线程。调用start()函数会自动连接websocket。 - 若要终止后台线程,请调用stop()。 stop()函数会顺便断开websocket。 + After creating the client object, use start() to run worker and ping threads. + The worker thread connects websocket automatically. + + Use stop to stop threads and disconnect websocket before destroying the client + object (especially when exiting the programme). + + Default serialization format is json. + + Callbacks to reimplement: + * on_connected + * on_disconnected + * on_packet + * on_error - 该类默认打包方式为json,若从服务器返回的数据不为json,则会触发onError。 - - 可以覆盖以下回调: - onConnected - onDisconnected - onPacket # 数据回调,只有在返回的数据帧为text并且内容为json时才会回调 - onError - - 当然,为了不让用户随意自定义,用自己的init函数覆盖掉原本的init(host)也是个不错的选择。 - - 关于ping: - 在调用start()之后,该类每60s会自动发送一个ping帧至服务器。 + After start() is called, the ping thread will ping server every 60 seconds. """ def __init__(self): """Constructor""" - self.host = None # type: str + self.host = None self._ws_lock = Lock() - self._ws = None # type: websocket.WebSocket + self._ws = None - self._workerThread = None # type: Thread - self._pingThread = None # type: Thread + self._worker_thread = None + self._ping_thread = None self._active = False - # for debugging: - self._lastSentText = None - self._lastReceivedText = None + # For debugging + self._last_sent_text = None + self._last_received_text = None - def init(self, host): + def init(self, host: str): + """""" self.host = host def start(self): """ - 启动 - :note 注意:启动之后不能立即发包,需要等待websocket连接成功。 - websocket连接成功之后会响应onConnected函数 + Start the client and on_connected function is called after webscoket + is connected succesfully. + + Please don't send packet untill on_connected fucntion is called. """ self._active = True - self._workerThread = Thread(target=self._run) - self._workerThread.start() + self._worker_thread = Thread(target=self._run) + self._worker_thread.start() - self._pingThread = Thread(target=self._runPing) - self._pingThread.start() + self._ping_thread = Thread(target=self._run_ping) + self._ping_thread.start() def stop(self): """ - 关闭 - @note 不能从工作线程,也就是websocket的回调中调用 + Stop the client. + + This function cannot be called from worker thread or callback function. """ self._active = False self._disconnect() def join(self): """ - 等待所有工作线程退出 - 正确调用方式:先stop()后join() + Wait till all threads finish. """ - self._pingThread.join() - self._workerThread.join() + self._ping_thread.join() + self._worker_thread.join() - def sendPacket(self, dictObj): # type: (dict)->None - """发出请求:相当于sendText(json.dumps(dictObj))""" - text = json.dumps(dictObj) - self._recordLastSentText(text) - return self._getWs().send(text, opcode=websocket.ABNF.OPCODE_TEXT) + def send_packet(self, packet: dict): + """ + Send a packet (dict data) to server + """ + text = json.dumps(packet) + self._record_last_sent_text(text) + return self._get_ws().send(text, opcode=websocket.ABNF.OPCODE_TEXT) - def sendText(self, text): # type: (str)->None - """发送文本数据""" - return self._getWs().send(text, opcode=websocket.ABNF.OPCODE_TEXT) + def send_text(self, text: str): + """ + Send a text string to server. + """ + return self._get_ws().send(text, opcode=websocket.ABNF.OPCODE_TEXT) - def sendBinary(self, data): # type: (bytes)->None - """发送字节数据""" - return self._getWs().send_binary(data) + def send_binary(self, data: bytes): + """ + Send bytes data to server. + """ + return self._get_ws().send_binary(data) def _reconnect(self): - """重连""" + """""" if self._active: self._disconnect() self._connect() - def _createConnection(self, *args, **kwargs): + def _create_connection(self, *args, **kwargs): + """""" return websocket.create_connection(*args, **kwargs) def _connect(self): """""" - self._ws = self._createConnection( + self._ws = self._create_connection( self.host, sslopt={'cert_reqs': ssl.CERT_NONE} ) - self.onConnected() + self.on_connected() def _disconnect(self): """ - 断开连接 """ with self._ws_lock: if self._ws: self._ws.close() self._ws = None - def _getWs(self): + def _get_ws(self): + """""" with self._ws_lock: return self._ws def _run(self): """ - 运行,直到stop()被调用 + Keep running till stop is called. """ try: self._connect() @@ -134,123 +143,127 @@ class WebsocketClient(object): # todo: onDisconnect while self._active: try: - ws = self._getWs() + ws = self._get_ws() if ws: text = ws.recv() - if not text: # recv在阻塞的时候ws被关闭 + + # ws object is closed when recv function is blocking + if not text: self._reconnect() continue - self._recordLastReceivedText(text) + + self._record_last_received_text(text) + try: - data = self.unpackData(text) + data = self.unpack_data(text) except ValueError as e: print('websocket unable to parse data: ' + text) raise e - self.onPacket(data) - except websocket.WebSocketConnectionClosedException: # 在调用recv之前ws就被关闭了 + + self.on_packet(data) + # ws is closed before recv function is called + except websocket.WebSocketConnectionClosedException: self._reconnect() - except: # Python内部错误(onPacket内出错) + + # other internal exception raised in on_packet + except: et, ev, tb = sys.exc_info() - self.onError(et, ev, tb) + self.on_error(et, ev, tb) self._reconnect() except: et, ev, tb = sys.exc_info() - self.onError(et, ev, tb) + self.on_error(et, ev, tb) self._reconnect() @staticmethod - def unpackData(data): + def unpack_data(data: str): """ - 解密数据,默认使用json解密为dict - 解密后的数据将会传入onPacket - 如果需要使用不同的解密方式,就重载这个函数。 - :param data 收到的数据,可能是text frame,也可能是binary frame, 目前并没有区分这两者 + Default serialization format is json. + + Reimplement this method if you want to use other serialization format. """ return json.loads(data) - def _runPing(self): + def _run_ping(self): + """""" while self._active: try: self._ping() except: et, ev, tb = sys.exc_info() - # todo: just log this, notifying user is not necessary - self.onError(et, ev, tb) + self.on_error(et, ev, tb) self._reconnect() for i in range(60): if not self._active: break - time.sleep(1) + sleep(1) def _ping(self): - ws = self._getWs() + """""" + ws = self._get_ws() if ws: ws.send('ping', websocket.ABNF.OPCODE_PING) @staticmethod - def onConnected(): + def on_connected(): """ - 连接成功回调 + Callback when websocket is connected successfully. """ pass @staticmethod - def onDisconnected(): + def on_disconnected(): """ - 连接断开回调 + Callback when websocket connection is lost. """ pass @staticmethod - def onPacket(packet): + def on_packet(packet): """ - 数据回调。 - 只有在数据为json包的时候才会触发这个回调 - @:param data: dict - @:return: + Callback when receiving data from server. """ pass - def onError(self, exceptionType, exceptionValue, tb): + def on_error(self, exception_type, exception_value, tb): """ - Python错误回调 - todo: 以后详细的错误信息最好记录在文件里,用uuid来联系/区分具体错误 + Callback when exception raised. """ sys.stderr.write( - self.exceptionDetail(exceptionType, - exceptionValue, - tb) + self.exception_detail(exception_type, + exception_value, + tb) ) + return sys.excepthook(exception_type, exception_value, tb) - # 丢给默认的错误处理函数(所以如果不重载onError,一般的结果是程序会崩溃) - return sys.excepthook(exceptionType, exceptionValue, tb) - - def exceptionDetail(self, exceptionType, exceptionValue, tb): - """打印详细的错误信息""" + def exception_detail(self, exception_type, exception_value, tb): + """ + Print detailed exception information. + """ text = "[{}]: Unhandled WebSocket Error:{}\n".format( datetime.now().isoformat(), - exceptionType + exception_type ) - text += "LastSentText:\n{}\n".format(self._lastSentText) - text += "LastReceivedText:\n{}\n".format(self._lastReceivedText) + text += "LastSentText:\n{}\n".format(self._last_sent_text) + text += "LastReceivedText:\n{}\n".format(self._last_received_text) text += "Exception trace: \n" text += "".join( traceback.format_exception( - exceptionType, - exceptionValue, + exception_type, + exception_value, tb, ) ) return text - def _recordLastSentText(self, text): + def _record_last_sent_text(self, text): """ - 用于Debug: 记录最后一次发送出去的text + Record last sent text for debug purpose. """ - self._lastSentText = text[:1000] + self._last_sent_text = text[:1000] - def _recordLastReceivedText(self, text): + def _record_last_received_text(self, text): """ - 用于Debug: 记录最后一次发送出去的text + Record last received text for debug purpose. """ - self._lastReceivedText = text[:1000] \ No newline at end of file + self._last_received_text = text[:1000] \ No newline at end of file