[Add] Rest and websocket client

This commit is contained in:
vn.py 2019-01-13 13:23:10 +08:00
parent 242a97a2b2
commit f912207d3f
9 changed files with 561 additions and 1 deletions

View File

@ -1,3 +1,4 @@
PyQt5 PyQt5
qdarkstyle qdarkstyle
futu-api futu-api
websocket-client

View File

@ -0,0 +1 @@
from .RestClient import Request, RequestStatus, RestClient

View File

@ -0,0 +1,278 @@
# encoding: UTF-8
import sys
import traceback
from queue import Empty, Queue
from datetime import datetime
from multiprocessing.dummy import Pool
import requests
from enum import Enum
from typing import Any, Callable, Optional
class RequestStatus(Enum):
ready = 0 # 刚刚构建
success = 1 # 请求成功 code == 2xx
failed = 2
error = 3 # 发生错误 网络错误、json解析错误等等
class Request(object):
"""
表示一个内部的Request用于状态查询
"""
def __init__(self, method, path, params, data, headers, callback):
self.method = method # type: str
self.path = path # type: str
self.callback = callback # type: callable
self.params = params # type: dict #, bytes, str
self.data = data # type: dict #, bytes, str
self.headers = headers # type: dict
self.onFailed = None # type: callable
self.onError = None # type: callable
self.extra = None # type: Any
self.response = None # type: requests.Response
self.status = RequestStatus.ready # type: RequestStatus
def __str__(self):
if self.response is None:
statusCode = 'terminated'
else:
statusCode = self.response.status_code
# todo: encoding error
return (
"reuqest : {} {} {} because {}: \n"
"headers: {}\n"
"params: {}\n"
"data: {}\n"
"response:"
"{}\n".format(
self.method,
self.path,
self.status.name,
statusCode,
self.headers,
self.params,
self.data,
'' if self.response is None else self.response.text
)
)
class RestClient(object):
"""
HTTP 客户端目前是为了对接各种RESTfulAPI而设计的
如果需要给请求加上签名请设置beforeRequest, 函数类型请参考defaultBeforeRequest
如果需要处理非2xx的请求请设置onFailed函数类型请参考defaultOnFailed
如果每一个请求的非2xx返回都需要单独处理使用addReq函数的onFailed参数
如果捕获Python内部错误例如网络连接失败等等请设置onError函数类型请参考defaultOnError
"""
def __init__(self):
"""
"""
self.urlBase = None # type: str
self._active = False
self._queue = Queue()
self._pool = None # type: Pool
def init(self, urlBase):
"""
初始化
:param urlBase: 路径前缀 例如'https://www.bitmex.com/api/v1/'
"""
self.urlBase = urlBase
def _createSession(self):
""""""
return requests.session()
def start(self, n=3):
"""启动"""
if self._active:
return
self._active = True
self._pool = Pool(n)
self._pool.apply_async(self._run)
def stop(self):
"""
强制停止运行未发出的请求都会被暂停仍处于队列中
:return:
"""
self._active = False
def join(self):
"""
等待所有请求处理结束
如果要并确保RestClient的退出请在调用stop之后紧接着调用join
如果只是要确保所有的请求都处理完直接调用join即可
:return:
"""
self._queue.join()
def addRequest(self,
method, # type: str
path, # type: str
callback, # type: Callable[[dict, Request], Any]
params=None, # type: dict
data=None, # type: dict
headers=None, # type: dict
onFailed=None, # type: Callable[[int, Request], Any]
onError=None, # type: Callable[[type, Exception, traceback, Request], Any]
extra=None # type: Any
): # type: (...)->Request
"""
发送一个请求
:param method: GET, POST, PUT, DELETE, QUERY
:param path:
:param callback: 请求成功后的回调(状态吗为2xx时认为请求成功) type: (dict, Request)
:param params: dict for query string
:param data: dict for body
:param headers: dict for headers
:param onFailed: 请求失败后的回调(状态吗不为2xx时认为请求失败)如果指定该值默认的onFailed将不会被调用 type: (code, dict, Request)
:param onError: 请求出现Python错误后的回调如果指定该值默认的onError将不会被调用 type: (etype, evalue, tb, Request)
:param extra: 返回值的extra字段会被设置为这个值当然你也可以在函数调用之后再设置这个字段
:return: Request
"""
request = Request(method, path, params, data, headers, callback)
request.extra = extra
request.onFailed = onFailed
request.onError = onError
self._queue.put(request)
return request
def _run(self):
try:
session = self._createSession()
while self._active:
try:
request = self._queue.get(timeout=1)
try:
self._processRequest(request, session)
finally:
self._queue.task_done()
except Empty:
pass
except:
et, ev, tb = sys.exc_info()
self.onError(et, ev, tb, None)
def sign(self, request): # type: (Request)->Request
"""
所有请求在发送之前都会经过这个函数
签名之类的前奏可以在这里面实现
需要对request进行什么修改就做什么修改吧
@:return (request)
"""
return request
def onFailed(self, httpStatusCode, request): # type:(int, Request)->None
"""
请求失败处理函数HttpStatusCode!=2xx.
默认行为是打印到stderr
"""
sys.stderr.write(str(request))
def onError(self,
exceptionType, # type: type
exceptionValue, # type: Exception
tb,
request # type: Optional[Request]
):
"""
Python内部错误处理默认行为是仍给excepthook
:param request 如果是在处理请求的时候出错它的值就是对应的Request否则为None
"""
sys.stderr.write(
self.exceptionDetail(exceptionType,
exceptionValue,
tb,
request)
)
sys.excepthook(exceptionType, exceptionValue, tb)
def exceptionDetail(self,
exceptionType, # type: type
exceptionValue, # type: Exception
tb,
request # type: Optional[Request]
):
text = "[{}]: Unhandled RestClient Error:{}\n".format(
datetime.now().isoformat(),
exceptionType
)
text += "request:{}\n".format(request)
text += "Exception trace: \n"
text += "".join(
traceback.format_exception(
exceptionType,
exceptionValue,
tb,
)
)
return text
def _processRequest(
self,
request,
session
): # type: (Request, requests.Session)->None
"""
用于内部将请求发送出去
"""
# noinspection PyBroadException
try:
request = self.sign(request)
url = self.makeFullUrl(request.path)
response = session.request(
request.method,
url,
headers=request.headers,
params=request.params,
data=request.data
)
request.response = response
httpStatusCode = response.status_code
if httpStatusCode / 100 == 2: # 2xx都算成功尽管交易所都用200
jsonBody = response.json()
request.callback(jsonBody, request)
request.status = RequestStatus.success
else:
request.status = RequestStatus.failed
if request.onFailed:
request.onFailed(httpStatusCode, request)
else:
self.onFailed(httpStatusCode, request)
except:
request.status = RequestStatus.error
t, v, tb = sys.exc_info()
if request.onError:
request.onError(t, v, tb, request)
else:
self.onError(t, v, tb, request)
def makeFullUrl(self, path):
"""
将相对路径补充成绝对路径
eg: makeFullUrl('/get') == 'http://xxxxx/get'
:param path:
:return:
"""
url = self.urlBase + path
return url

View File

@ -0,0 +1 @@
from .WebsocketClient import WebsocketClient

View File

@ -0,0 +1,256 @@
# encoding: UTF-8
import json
import ssl
import sys
import time
import traceback
from datetime import datetime
from threading import Lock, Thread
import websocket
class WebsocketClient(object):
"""
Websocket API
实例化之后应调用start开始后台线程调用start()函数会自动连接websocket
若要终止后台线程请调用stop() stop()函数会顺便断开websocket
该类默认打包方式为json若从服务器返回的数据不为json则会触发onError
可以覆盖以下回调
onConnected
onDisconnected
onPacket # 数据回调只有在返回的数据帧为text并且内容为json时才会回调
onError
当然为了不让用户随意自定义用自己的init函数覆盖掉原本的init(host)也是个不错的选择
关于ping
在调用start()之后该类每60s会自动发送一个ping帧至服务器
"""
def __init__(self):
"""Constructor"""
self.host = None # type: str
self._ws_lock = Lock()
self._ws = None # type: websocket.WebSocket
self._workerThread = None # type: Thread
self._pingThread = None # type: Thread
self._active = False
# for debugging:
self._lastSentText = None
self._lastReceivedText = None
def init(self, host):
self.host = host
def start(self):
"""
启动
:note 注意启动之后不能立即发包需要等待websocket连接成功
websocket连接成功之后会响应onConnected函数
"""
self._active = True
self._workerThread = Thread(target=self._run)
self._workerThread.start()
self._pingThread = Thread(target=self._runPing)
self._pingThread.start()
def stop(self):
"""
关闭
@note 不能从工作线程也就是websocket的回调中调用
"""
self._active = False
self._disconnect()
def join(self):
"""
等待所有工作线程退出
正确调用方式先stop()后join()
"""
self._pingThread.join()
self._workerThread.join()
def sendPacket(self, dictObj): # type: (dict)->None
"""发出请求:相当于sendText(json.dumps(dictObj))"""
text = json.dumps(dictObj)
self._recordLastSentText(text)
return self._getWs().send(text, opcode=websocket.ABNF.OPCODE_TEXT)
def sendText(self, text): # type: (str)->None
"""发送文本数据"""
return self._getWs().send(text, opcode=websocket.ABNF.OPCODE_TEXT)
def sendBinary(self, data): # type: (bytes)->None
"""发送字节数据"""
return self._getWs().send_binary(data)
def _reconnect(self):
"""重连"""
if self._active:
self._disconnect()
self._connect()
def _createConnection(self, *args, **kwargs):
return websocket.create_connection(*args, **kwargs)
def _connect(self):
""""""
self._ws = self._createConnection(
self.host,
sslopt={'cert_reqs': ssl.CERT_NONE}
)
self.onConnected()
def _disconnect(self):
"""
断开连接
"""
with self._ws_lock:
if self._ws:
self._ws.close()
self._ws = None
def _getWs(self):
with self._ws_lock:
return self._ws
def _run(self):
"""
运行直到stop()被调用
"""
try:
self._connect()
# todo: onDisconnect
while self._active:
try:
ws = self._getWs()
if ws:
text = ws.recv()
if not text: # recv在阻塞的时候ws被关闭
self._reconnect()
continue
self._recordLastReceivedText(text)
try:
data = self.unpackData(text)
except ValueError as e:
print('websocket unable to parse data: ' + text)
raise e
self.onPacket(data)
except websocket.WebSocketConnectionClosedException: # 在调用recv之前ws就被关闭了
self._reconnect()
except: # Python内部错误onPacket内出错
et, ev, tb = sys.exc_info()
self.onError(et, ev, tb)
self._reconnect()
except:
et, ev, tb = sys.exc_info()
self.onError(et, ev, tb)
self._reconnect()
@staticmethod
def unpackData(data):
"""
解密数据默认使用json解密为dict
解密后的数据将会传入onPacket
如果需要使用不同的解密方式就重载这个函数
:param data 收到的数据可能是text frame也可能是binary frame, 目前并没有区分这两者
"""
return json.loads(data)
def _runPing(self):
while self._active:
try:
self._ping()
except:
et, ev, tb = sys.exc_info()
# todo: just log this, notifying user is not necessary
self.onError(et, ev, tb)
self._reconnect()
for i in range(60):
if not self._active:
break
time.sleep(1)
def _ping(self):
ws = self._getWs()
if ws:
ws.send('ping', websocket.ABNF.OPCODE_PING)
@staticmethod
def onConnected():
"""
连接成功回调
"""
pass
@staticmethod
def onDisconnected():
"""
连接断开回调
"""
pass
@staticmethod
def onPacket(packet):
"""
数据回调
只有在数据为json包的时候才会触发这个回调
@:param data: dict
@:return:
"""
pass
def onError(self, exceptionType, exceptionValue, tb):
"""
Python错误回调
todo: 以后详细的错误信息最好记录在文件里用uuid来联系/区分具体错误
"""
sys.stderr.write(
self.exceptionDetail(exceptionType,
exceptionValue,
tb)
)
# 丢给默认的错误处理函数所以如果不重载onError一般的结果是程序会崩溃
return sys.excepthook(exceptionType, exceptionValue, tb)
def exceptionDetail(self, exceptionType, exceptionValue, tb):
"""打印详细的错误信息"""
text = "[{}]: Unhandled WebSocket Error:{}\n".format(
datetime.now().isoformat(),
exceptionType
)
text += "LastSentText:\n{}\n".format(self._lastSentText)
text += "LastReceivedText:\n{}\n".format(self._lastReceivedText)
text += "Exception trace: \n"
text += "".join(
traceback.format_exception(
exceptionType,
exceptionValue,
tb,
)
)
return text
def _recordLastSentText(self, text):
"""
用于Debug 记录最后一次发送出去的text
"""
self._lastSentText = text[:1000]
def _recordLastReceivedText(self, text):
"""
用于Debug 记录最后一次发送出去的text
"""
self._lastReceivedText = text[:1000]

View File

@ -116,6 +116,8 @@ class FutuGateway(BaseGateway):
self.ticks = {} self.ticks = {}
self.trades = set() self.trades = set()
self.contracts = {}
self.thread = Thread(target=self.query_data) self.thread = Thread(target=self.query_data)
# For query function. # For query function.
@ -311,6 +313,7 @@ class FutuGateway(BaseGateway):
gateway_name=self.gateway_name gateway_name=self.gateway_name
) )
self.on_contract(contract) self.on_contract(contract)
self.contracts[contract.vt_symbol] = contract
self.write_log("合约信息查询成功") self.write_log("合约信息查询成功")
@ -401,6 +404,11 @@ class FutuGateway(BaseGateway):
gateway_name=self.gateway_name gateway_name=self.gateway_name
) )
self.ticks[code] = tick self.ticks[code] = tick
contract = self.contracts.get(tick.vt_symbol, None)
if contract:
tick.name = contract.name
return tick return tick
def process_quote(self, data): def process_quote(self, data):

View File

@ -197,6 +197,7 @@ class IbApi(EWrapper):
self.ticks = {} self.ticks = {}
self.orders = {} self.orders = {}
self.accounts = {} self.accounts = {}
self.contracts = {}
self.tick_exchange = {} self.tick_exchange = {}
@ -265,6 +266,11 @@ class IbApi(EWrapper):
name = TICKFIELD_IB2VT[tickType] name = TICKFIELD_IB2VT[tickType]
setattr(tick, name, price) setattr(tick, name, price)
# Update name into tick data.
contract = self.contracts.get(tick.vt_symbol, None)
if contract:
tick.name = contract.name
# Forex and spot product of IDEALPRO has no tick time and last price. # Forex and spot product of IDEALPRO has no tick time and last price.
# We need to calculate locally. # We need to calculate locally.
exchange = self.tick_exchange[reqId] exchange = self.tick_exchange[reqId]
@ -467,8 +473,11 @@ class IbApi(EWrapper):
pricetick=contractDetails.minTick, pricetick=contractDetails.minTick,
gateway_name=self.gateway_name gateway_name=self.gateway_name
) )
self.gateway.on_contract(contract) self.gateway.on_contract(contract)
self.contracts[contract.vt_symbol] = contract
def execDetails(self, reqId: int, contract: Contract, execution: Execution): def execDetails(self, reqId: int, contract: Contract, execution: Execution):
""" """
Callback of trade data update. Callback of trade data update.

View File

@ -32,6 +32,7 @@ class TickData(BaseData):
exchange: str exchange: str
datetime: datetime datetime: datetime
name: str = ""
volume: float = 0 volume: float = 0
last_price: float = 0 last_price: float = 0
last_volume: float = 0 last_volume: float = 0

View File

@ -315,6 +315,11 @@ class TickMonitor(BaseMonitor):
"cell": BaseCell, "cell": BaseCell,
"update": False "update": False
}, },
"name": {
"display": "名称",
"cell": BaseCell,
"update": True
},
"last_price": { "last_price": {
"display": "最新价", "display": "最新价",
"cell": BaseCell, "cell": BaseCell,