[Mod]Change websocket client code style

This commit is contained in:
vn.py 2019-01-16 09:25:18 +08:00
parent ab9ddbbce3
commit 32867a97c2
2 changed files with 171 additions and 150 deletions

View File

@ -23,28 +23,39 @@ class Request(object):
Request object for status check.
"""
def __init__(self, method, path, params, data, headers, callback):
def __init__(
self,
method: str,
path: str,
params: dict,
data: dict,
headers: dict,
callback: Callable,
on_failed: Callable = None,
on_error: Callable = None,
extra: Any = None
):
""""""
self.method = method # type: str
self.path = path # type: str
self.callback = callback # type: callable
self.params = params # type: dict #, bytes, str
self.data = data # type: dict #, bytes, str
self.headers = headers # type: dict
self.method = method
self.path = path
self.callback = callback
self.params = params
self.data = data
self.headers = headers
self.on_failed = None # type: callable
self.on_error = None # type: callable
self.extra = None # type: Any
self.on_failed = on_failed
self.on_error = on_error
self.extra = extra
self.response = None # type: requests.Response
self.status = RequestStatus.ready # type: RequestStatus
self.response = None
self.status = RequestStatus.ready
def __str__(self):
if self.response is None:
status_code = 'terminated'
else:
status_code = self.response.status_code
# todo: encoding error
return (
"reuqest : {} {} {} because {}: \n"
"headers: {}\n"
@ -83,7 +94,7 @@ class RestClient(object):
self._queue = Queue()
self._pool = None # type: Pool
def init(self, url_base):
def init(self, url_base: str):
"""
Init rest client with url_base which is the API root address.
e.g. 'https://www.bitmex.com/api/v1/'
@ -94,7 +105,7 @@ class RestClient(object):
""""""
return requests.session()
def start(self, n=3):
def start(self, n: int = 3):
"""
Start rest client with session count n.
"""
@ -117,19 +128,18 @@ class RestClient(object):
"""
self._queue.join()
def add_request(
self,
method, # type: str
path, # type: str
callback, # type: Callable[[dict, Request], Any]
params=None, # type: dict
data=None, # type: dict
headers=None, # type: dict
on_failed=None, # type: Callable[[int, Request], Any]
on_error=None, # type: Callable[[type, Exception, traceback, Request], Any]
extra=None # type: Any
): # type: (...)->Request
self,
method: str,
path: str,
callback: Callable,
params: dict = None,
data: dict = None,
headers: dict = None,
on_failed: Callable = None,
on_error: Callable = None,
extra: Any = None
):
"""
Add a new request.
:param method: GET, POST, PUT, DELETE, QUERY
@ -166,7 +176,7 @@ class RestClient(object):
et, ev, tb = sys.exc_info()
self.on_error(et, ev, tb, None)
def sign(self, request): # type: (Request)->Request
def sign(self, request: Request):
"""
This function is called before sending any request out.
Please implement signature method here.
@ -174,19 +184,18 @@ class RestClient(object):
"""
return request
def on_failed(self, status_code, request): # type:(int, Request)->None
def on_failed(self, status_code: int, request: Request):
"""
Default on_failed handler for Non-2xx response.
"""
sys.stderr.write(str(request))
def on_error(
self,
exception_type, # type: type
exception_value, # type: Exception
tb,
request # type: Optional[Request]
self,
exception_type: type,
exception_value: Exception,
tb,
request: Request
):
"""
Default on_error handler for Python exception.
@ -199,13 +208,12 @@ class RestClient(object):
)
sys.excepthook(exception_type, exception_value, tb)
def exception_detail(
self,
exception_type, # type: type
exception_value, # type: Exception
tb,
request # type: Optional[Request]
self,
exception_type: type,
exception_value: Exception,
tb,
request: Request
):
text = "[{}]: Unhandled RestClient Error:{}\n".format(
datetime.now().isoformat(),
@ -224,13 +232,13 @@ class RestClient(object):
def _process_request(
self,
request,
session
): # type: (Request, requests.Session)->None
request: Request,
session: requests.session
): # type: (Request, requests.Session)->None
"""
Sending request to server and get result.
"""
# noinspection PyBroadException
# noinspection PyBroadException
try:
request = self.sign(request)
@ -265,7 +273,7 @@ class RestClient(object):
else:
self.on_error(t, v, tb, request)
def make_full_url(self, path):
def make_full_url(self, path: str):
"""
Make relative api path into full url.
eg: make_full_url('/get') == 'http://xxxxx/get'

View File

@ -3,8 +3,8 @@
import json
import ssl
import sys
import time
import traceback
from time import sleep
from datetime import datetime
from threading import Lock, Thread
@ -15,118 +15,127 @@ class WebsocketClient(object):
"""
Websocket API
实例化之后应调用start开始后台线程调用start()函数会自动连接websocket
若要终止后台线程请调用stop() stop()函数会顺便断开websocket
After creating the client object, use start() to run worker and ping threads.
The worker thread connects websocket automatically.
Use stop to stop threads and disconnect websocket before destroying the client
object (especially when exiting the programme).
Default serialization format is json.
Callbacks to reimplement:
* on_connected
* on_disconnected
* on_packet
* on_error
该类默认打包方式为json若从服务器返回的数据不为json则会触发onError
可以覆盖以下回调
onConnected
onDisconnected
onPacket # 数据回调只有在返回的数据帧为text并且内容为json时才会回调
onError
当然为了不让用户随意自定义用自己的init函数覆盖掉原本的init(host)也是个不错的选择
关于ping
在调用start()之后该类每60s会自动发送一个ping帧至服务器
After start() is called, the ping thread will ping server every 60 seconds.
"""
def __init__(self):
"""Constructor"""
self.host = None # type: str
self.host = None
self._ws_lock = Lock()
self._ws = None # type: websocket.WebSocket
self._ws = None
self._workerThread = None # type: Thread
self._pingThread = None # type: Thread
self._worker_thread = None
self._ping_thread = None
self._active = False
# for debugging:
self._lastSentText = None
self._lastReceivedText = None
# For debugging
self._last_sent_text = None
self._last_received_text = None
def init(self, host):
def init(self, host: str):
""""""
self.host = host
def start(self):
"""
启动
:note 注意启动之后不能立即发包需要等待websocket连接成功
websocket连接成功之后会响应onConnected函数
Start the client and on_connected function is called after webscoket
is connected succesfully.
Please don't send packet untill on_connected fucntion is called.
"""
self._active = True
self._workerThread = Thread(target=self._run)
self._workerThread.start()
self._worker_thread = Thread(target=self._run)
self._worker_thread.start()
self._pingThread = Thread(target=self._runPing)
self._pingThread.start()
self._ping_thread = Thread(target=self._run_ping)
self._ping_thread.start()
def stop(self):
"""
关闭
@note 不能从工作线程也就是websocket的回调中调用
Stop the client.
This function cannot be called from worker thread or callback function.
"""
self._active = False
self._disconnect()
def join(self):
"""
等待所有工作线程退出
正确调用方式先stop()后join()
Wait till all threads finish.
"""
self._pingThread.join()
self._workerThread.join()
self._ping_thread.join()
self._worker_thread.join()
def sendPacket(self, dictObj): # type: (dict)->None
"""发出请求:相当于sendText(json.dumps(dictObj))"""
text = json.dumps(dictObj)
self._recordLastSentText(text)
return self._getWs().send(text, opcode=websocket.ABNF.OPCODE_TEXT)
def send_packet(self, packet: dict):
"""
Send a packet (dict data) to server
"""
text = json.dumps(packet)
self._record_last_sent_text(text)
return self._get_ws().send(text, opcode=websocket.ABNF.OPCODE_TEXT)
def sendText(self, text): # type: (str)->None
"""发送文本数据"""
return self._getWs().send(text, opcode=websocket.ABNF.OPCODE_TEXT)
def send_text(self, text: str):
"""
Send a text string to server.
"""
return self._get_ws().send(text, opcode=websocket.ABNF.OPCODE_TEXT)
def sendBinary(self, data): # type: (bytes)->None
"""发送字节数据"""
return self._getWs().send_binary(data)
def send_binary(self, data: bytes):
"""
Send bytes data to server.
"""
return self._get_ws().send_binary(data)
def _reconnect(self):
"""重连"""
""""""
if self._active:
self._disconnect()
self._connect()
def _createConnection(self, *args, **kwargs):
def _create_connection(self, *args, **kwargs):
""""""
return websocket.create_connection(*args, **kwargs)
def _connect(self):
""""""
self._ws = self._createConnection(
self._ws = self._create_connection(
self.host,
sslopt={'cert_reqs': ssl.CERT_NONE}
)
self.onConnected()
self.on_connected()
def _disconnect(self):
"""
断开连接
"""
with self._ws_lock:
if self._ws:
self._ws.close()
self._ws = None
def _getWs(self):
def _get_ws(self):
""""""
with self._ws_lock:
return self._ws
def _run(self):
"""
运行直到stop()被调用
Keep running till stop is called.
"""
try:
self._connect()
@ -134,123 +143,127 @@ class WebsocketClient(object):
# todo: onDisconnect
while self._active:
try:
ws = self._getWs()
ws = self._get_ws()
if ws:
text = ws.recv()
if not text: # recv在阻塞的时候ws被关闭
# ws object is closed when recv function is blocking
if not text:
self._reconnect()
continue
self._recordLastReceivedText(text)
self._record_last_received_text(text)
try:
data = self.unpackData(text)
data = self.unpack_data(text)
except ValueError as e:
print('websocket unable to parse data: ' + text)
raise e
self.onPacket(data)
except websocket.WebSocketConnectionClosedException: # 在调用recv之前ws就被关闭了
self.on_packet(data)
# ws is closed before recv function is called
except websocket.WebSocketConnectionClosedException:
self._reconnect()
except: # Python内部错误onPacket内出错
# other internal exception raised in on_packet
except:
et, ev, tb = sys.exc_info()
self.onError(et, ev, tb)
self.on_error(et, ev, tb)
self._reconnect()
except:
et, ev, tb = sys.exc_info()
self.onError(et, ev, tb)
self.on_error(et, ev, tb)
self._reconnect()
@staticmethod
def unpackData(data):
def unpack_data(data: str):
"""
解密数据默认使用json解密为dict
解密后的数据将会传入onPacket
如果需要使用不同的解密方式就重载这个函数
:param data 收到的数据可能是text frame也可能是binary frame, 目前并没有区分这两者
Default serialization format is json.
Reimplement this method if you want to use other serialization format.
"""
return json.loads(data)
def _runPing(self):
def _run_ping(self):
""""""
while self._active:
try:
self._ping()
except:
et, ev, tb = sys.exc_info()
# todo: just log this, notifying user is not necessary
self.onError(et, ev, tb)
self.on_error(et, ev, tb)
self._reconnect()
for i in range(60):
if not self._active:
break
time.sleep(1)
sleep(1)
def _ping(self):
ws = self._getWs()
""""""
ws = self._get_ws()
if ws:
ws.send('ping', websocket.ABNF.OPCODE_PING)
@staticmethod
def onConnected():
def on_connected():
"""
连接成功回调
Callback when websocket is connected successfully.
"""
pass
@staticmethod
def onDisconnected():
def on_disconnected():
"""
连接断开回调
Callback when websocket connection is lost.
"""
pass
@staticmethod
def onPacket(packet):
def on_packet(packet):
"""
数据回调
只有在数据为json包的时候才会触发这个回调
@:param data: dict
@:return:
Callback when receiving data from server.
"""
pass
def onError(self, exceptionType, exceptionValue, tb):
def on_error(self, exception_type, exception_value, tb):
"""
Python错误回调
todo: 以后详细的错误信息最好记录在文件里用uuid来联系/区分具体错误
Callback when exception raised.
"""
sys.stderr.write(
self.exceptionDetail(exceptionType,
exceptionValue,
tb)
self.exception_detail(exception_type,
exception_value,
tb)
)
return sys.excepthook(exception_type, exception_value, tb)
# 丢给默认的错误处理函数所以如果不重载onError一般的结果是程序会崩溃
return sys.excepthook(exceptionType, exceptionValue, tb)
def exceptionDetail(self, exceptionType, exceptionValue, tb):
"""打印详细的错误信息"""
def exception_detail(self, exception_type, exception_value, tb):
"""
Print detailed exception information.
"""
text = "[{}]: Unhandled WebSocket Error:{}\n".format(
datetime.now().isoformat(),
exceptionType
exception_type
)
text += "LastSentText:\n{}\n".format(self._lastSentText)
text += "LastReceivedText:\n{}\n".format(self._lastReceivedText)
text += "LastSentText:\n{}\n".format(self._last_sent_text)
text += "LastReceivedText:\n{}\n".format(self._last_received_text)
text += "Exception trace: \n"
text += "".join(
traceback.format_exception(
exceptionType,
exceptionValue,
exception_type,
exception_value,
tb,
)
)
return text
def _recordLastSentText(self, text):
def _record_last_sent_text(self, text):
"""
用于Debug 记录最后一次发送出去的text
Record last sent text for debug purpose.
"""
self._lastSentText = text[:1000]
self._last_sent_text = text[:1000]
def _recordLastReceivedText(self, text):
def _record_last_received_text(self, text):
"""
用于Debug 记录最后一次发送出去的text
Record last received text for debug purpose.
"""
self._lastReceivedText = text[:1000]
self._last_received_text = text[:1000]