From f912207d3f8c4856836e0f742db34142a77505fe Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Sun, 13 Jan 2019 13:23:10 +0800 Subject: [PATCH] [Add] Rest and websocket client --- requirements.txt | 3 +- vnpy/api/rest/__init__.py | 1 + vnpy/api/rest/rest_client.py | 278 +++++++++++++++++++++++++ vnpy/api/websocket/__init__.py | 1 + vnpy/api/websocket/websocket_client.py | 256 +++++++++++++++++++++++ vnpy/gateway/futu/futu_gateway.py | 8 + vnpy/gateway/ib/ib_gateway.py | 9 + vnpy/trader/object.py | 1 + vnpy/trader/ui/widget.py | 5 + 9 files changed, 561 insertions(+), 1 deletion(-) create mode 100644 vnpy/api/rest/__init__.py create mode 100644 vnpy/api/rest/rest_client.py create mode 100644 vnpy/api/websocket/__init__.py create mode 100644 vnpy/api/websocket/websocket_client.py diff --git a/requirements.txt b/requirements.txt index 1bfc5291..368c612a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ PyQt5 qdarkstyle -futu-api \ No newline at end of file +futu-api +websocket-client \ No newline at end of file diff --git a/vnpy/api/rest/__init__.py b/vnpy/api/rest/__init__.py new file mode 100644 index 00000000..f1e7410c --- /dev/null +++ b/vnpy/api/rest/__init__.py @@ -0,0 +1 @@ +from .RestClient import Request, RequestStatus, RestClient diff --git a/vnpy/api/rest/rest_client.py b/vnpy/api/rest/rest_client.py new file mode 100644 index 00000000..54bf0a06 --- /dev/null +++ b/vnpy/api/rest/rest_client.py @@ -0,0 +1,278 @@ +# encoding: UTF-8 + +import sys +import traceback +from queue import Empty, Queue +from datetime import datetime +from multiprocessing.dummy import Pool + +import requests +from enum import Enum +from typing import Any, Callable, Optional + + +class RequestStatus(Enum): + ready = 0 # 刚刚构建 + success = 1 # 请求成功 code == 2xx + failed = 2 + error = 3 # 发生错误 网络错误、json解析错误,等等 + + +class Request(object): + """ + 表示一个内部的Request,用于状态查询 + """ + + def __init__(self, method, path, params, data, headers, callback): + self.method = method # type: str + self.path = path # type: str + self.callback = callback # type: callable + self.params = params # type: dict #, bytes, str + self.data = data # type: dict #, bytes, str + self.headers = headers # type: dict + + self.onFailed = None # type: callable + self.onError = None # type: callable + self.extra = None # type: Any + + self.response = None # type: requests.Response + self.status = RequestStatus.ready # type: RequestStatus + + def __str__(self): + if self.response is None: + statusCode = 'terminated' + else: + statusCode = self.response.status_code + # todo: encoding error + return ( + "reuqest : {} {} {} because {}: \n" + "headers: {}\n" + "params: {}\n" + "data: {}\n" + "response:" + "{}\n".format( + self.method, + self.path, + self.status.name, + statusCode, + self.headers, + self.params, + self.data, + '' if self.response is None else self.response.text + ) + ) + + +class RestClient(object): + """ + HTTP 客户端。目前是为了对接各种RESTfulAPI而设计的。 + + 如果需要给请求加上签名,请设置beforeRequest, 函数类型请参考defaultBeforeRequest。 + 如果需要处理非2xx的请求,请设置onFailed,函数类型请参考defaultOnFailed。 + 如果每一个请求的非2xx返回都需要单独处理,使用addReq函数的onFailed参数 + 如果捕获Python内部错误,例如网络连接失败等等,请设置onError,函数类型请参考defaultOnError + """ + + def __init__(self): + """ + """ + self.urlBase = None # type: str + self._active = False + + self._queue = Queue() + self._pool = None # type: Pool + + def init(self, urlBase): + """ + 初始化 + :param urlBase: 路径前缀。 例如'https://www.bitmex.com/api/v1/' + """ + self.urlBase = urlBase + + def _createSession(self): + """""" + return requests.session() + + def start(self, n=3): + """启动""" + if self._active: + return + + self._active = True + self._pool = Pool(n) + self._pool.apply_async(self._run) + + def stop(self): + """ + 强制停止运行,未发出的请求都会被暂停(仍处于队列中) + :return: + """ + self._active = False + + def join(self): + """ + 等待所有请求处理结束 + 如果要并确保RestClient的退出,请在调用stop之后紧接着调用join。 + 如果只是要确保所有的请求都处理完,直接调用join即可。 + :return: + """ + self._queue.join() + + + def addRequest(self, + method, # type: str + path, # type: str + callback, # type: Callable[[dict, Request], Any] + params=None, # type: dict + data=None, # type: dict + headers=None, # type: dict + onFailed=None, # type: Callable[[int, Request], Any] + onError=None, # type: Callable[[type, Exception, traceback, Request], Any] + extra=None # type: Any + ): # type: (...)->Request + """ + 发送一个请求 + :param method: GET, POST, PUT, DELETE, QUERY + :param path: + :param callback: 请求成功后的回调(状态吗为2xx时认为请求成功) type: (dict, Request) + :param params: dict for query string + :param data: dict for body + :param headers: dict for headers + :param onFailed: 请求失败后的回调(状态吗不为2xx时认为请求失败)(如果指定该值,默认的onFailed将不会被调用) type: (code, dict, Request) + :param onError: 请求出现Python错误后的回调(如果指定该值,默认的onError将不会被调用) type: (etype, evalue, tb, Request) + :param extra: 返回值的extra字段会被设置为这个值。当然,你也可以在函数调用之后再设置这个字段。 + :return: Request + """ + + request = Request(method, path, params, data, headers, callback) + request.extra = extra + request.onFailed = onFailed + request.onError = onError + self._queue.put(request) + return request + + def _run(self): + try: + session = self._createSession() + while self._active: + try: + request = self._queue.get(timeout=1) + try: + self._processRequest(request, session) + finally: + self._queue.task_done() + except Empty: + pass + except: + et, ev, tb = sys.exc_info() + self.onError(et, ev, tb, None) + + def sign(self, request): # type: (Request)->Request + """ + 所有请求在发送之前都会经过这个函数 + 签名之类的前奏可以在这里面实现 + 需要对request进行什么修改就做什么修改吧 + @:return (request) + """ + return request + + def onFailed(self, httpStatusCode, request): # type:(int, Request)->None + """ + 请求失败处理函数(HttpStatusCode!=2xx). + 默认行为是打印到stderr + """ + sys.stderr.write(str(request)) + + + def onError(self, + exceptionType, # type: type + exceptionValue, # type: Exception + tb, + request # type: Optional[Request] + ): + """ + Python内部错误处理:默认行为是仍给excepthook + :param request 如果是在处理请求的时候出错,它的值就是对应的Request,否则为None + """ + sys.stderr.write( + self.exceptionDetail(exceptionType, + exceptionValue, + tb, + request) + ) + sys.excepthook(exceptionType, exceptionValue, tb) + + + def exceptionDetail(self, + exceptionType, # type: type + exceptionValue, # type: Exception + tb, + request # type: Optional[Request] + ): + text = "[{}]: Unhandled RestClient Error:{}\n".format( + datetime.now().isoformat(), + exceptionType + ) + text += "request:{}\n".format(request) + text += "Exception trace: \n" + text += "".join( + traceback.format_exception( + exceptionType, + exceptionValue, + tb, + ) + ) + return text + + def _processRequest( + self, + request, + session + ): # type: (Request, requests.Session)->None + """ + 用于内部:将请求发送出去 + """ + # noinspection PyBroadException + try: + request = self.sign(request) + + url = self.makeFullUrl(request.path) + + response = session.request( + request.method, + url, + headers=request.headers, + params=request.params, + data=request.data + ) + request.response = response + + httpStatusCode = response.status_code + if httpStatusCode / 100 == 2: # 2xx都算成功,尽管交易所都用200 + jsonBody = response.json() + request.callback(jsonBody, request) + request.status = RequestStatus.success + else: + request.status = RequestStatus.failed + + if request.onFailed: + request.onFailed(httpStatusCode, request) + else: + self.onFailed(httpStatusCode, request) + except: + request.status = RequestStatus.error + t, v, tb = sys.exc_info() + if request.onError: + request.onError(t, v, tb, request) + else: + self.onError(t, v, tb, request) + + def makeFullUrl(self, path): + """ + 将相对路径补充成绝对路径: + eg: makeFullUrl('/get') == 'http://xxxxx/get' + :param path: + :return: + """ + url = self.urlBase + path + return url diff --git a/vnpy/api/websocket/__init__.py b/vnpy/api/websocket/__init__.py new file mode 100644 index 00000000..3accc1ac --- /dev/null +++ b/vnpy/api/websocket/__init__.py @@ -0,0 +1 @@ +from .WebsocketClient import WebsocketClient diff --git a/vnpy/api/websocket/websocket_client.py b/vnpy/api/websocket/websocket_client.py new file mode 100644 index 00000000..8f3a2f74 --- /dev/null +++ b/vnpy/api/websocket/websocket_client.py @@ -0,0 +1,256 @@ +# encoding: UTF-8 + +import json +import ssl +import sys +import time +import traceback +from datetime import datetime +from threading import Lock, Thread + +import websocket + + +class WebsocketClient(object): + """ + Websocket API + + 实例化之后,应调用start开始后台线程。调用start()函数会自动连接websocket。 + 若要终止后台线程,请调用stop()。 stop()函数会顺便断开websocket。 + + 该类默认打包方式为json,若从服务器返回的数据不为json,则会触发onError。 + + 可以覆盖以下回调: + onConnected + onDisconnected + onPacket # 数据回调,只有在返回的数据帧为text并且内容为json时才会回调 + onError + + 当然,为了不让用户随意自定义,用自己的init函数覆盖掉原本的init(host)也是个不错的选择。 + + 关于ping: + 在调用start()之后,该类每60s会自动发送一个ping帧至服务器。 + """ + + def __init__(self): + """Constructor""" + self.host = None # type: str + + self._ws_lock = Lock() + self._ws = None # type: websocket.WebSocket + + self._workerThread = None # type: Thread + self._pingThread = None # type: Thread + self._active = False + + # for debugging: + self._lastSentText = None + self._lastReceivedText = None + + def init(self, host): + self.host = host + + def start(self): + """ + 启动 + :note 注意:启动之后不能立即发包,需要等待websocket连接成功。 + websocket连接成功之后会响应onConnected函数 + """ + + self._active = True + self._workerThread = Thread(target=self._run) + self._workerThread.start() + + self._pingThread = Thread(target=self._runPing) + self._pingThread.start() + + def stop(self): + """ + 关闭 + @note 不能从工作线程,也就是websocket的回调中调用 + """ + self._active = False + self._disconnect() + + def join(self): + """ + 等待所有工作线程退出 + 正确调用方式:先stop()后join() + """ + self._pingThread.join() + self._workerThread.join() + + def sendPacket(self, dictObj): # type: (dict)->None + """发出请求:相当于sendText(json.dumps(dictObj))""" + text = json.dumps(dictObj) + self._recordLastSentText(text) + return self._getWs().send(text, opcode=websocket.ABNF.OPCODE_TEXT) + + def sendText(self, text): # type: (str)->None + """发送文本数据""" + return self._getWs().send(text, opcode=websocket.ABNF.OPCODE_TEXT) + + def sendBinary(self, data): # type: (bytes)->None + """发送字节数据""" + return self._getWs().send_binary(data) + + def _reconnect(self): + """重连""" + if self._active: + self._disconnect() + self._connect() + + def _createConnection(self, *args, **kwargs): + return websocket.create_connection(*args, **kwargs) + + def _connect(self): + """""" + self._ws = self._createConnection( + self.host, + sslopt={'cert_reqs': ssl.CERT_NONE} + ) + self.onConnected() + + def _disconnect(self): + """ + 断开连接 + """ + with self._ws_lock: + if self._ws: + self._ws.close() + self._ws = None + + def _getWs(self): + with self._ws_lock: + return self._ws + + def _run(self): + """ + 运行,直到stop()被调用 + """ + try: + self._connect() + + # todo: onDisconnect + while self._active: + try: + ws = self._getWs() + if ws: + text = ws.recv() + if not text: # recv在阻塞的时候ws被关闭 + self._reconnect() + continue + self._recordLastReceivedText(text) + try: + data = self.unpackData(text) + except ValueError as e: + print('websocket unable to parse data: ' + text) + raise e + self.onPacket(data) + except websocket.WebSocketConnectionClosedException: # 在调用recv之前ws就被关闭了 + self._reconnect() + except: # Python内部错误(onPacket内出错) + et, ev, tb = sys.exc_info() + self.onError(et, ev, tb) + self._reconnect() + except: + et, ev, tb = sys.exc_info() + self.onError(et, ev, tb) + self._reconnect() + + @staticmethod + def unpackData(data): + """ + 解密数据,默认使用json解密为dict + 解密后的数据将会传入onPacket + 如果需要使用不同的解密方式,就重载这个函数。 + :param data 收到的数据,可能是text frame,也可能是binary frame, 目前并没有区分这两者 + """ + return json.loads(data) + + def _runPing(self): + while self._active: + try: + self._ping() + except: + et, ev, tb = sys.exc_info() + # todo: just log this, notifying user is not necessary + self.onError(et, ev, tb) + self._reconnect() + for i in range(60): + if not self._active: + break + time.sleep(1) + + def _ping(self): + ws = self._getWs() + if ws: + ws.send('ping', websocket.ABNF.OPCODE_PING) + + @staticmethod + def onConnected(): + """ + 连接成功回调 + """ + pass + + @staticmethod + def onDisconnected(): + """ + 连接断开回调 + """ + pass + + @staticmethod + def onPacket(packet): + """ + 数据回调。 + 只有在数据为json包的时候才会触发这个回调 + @:param data: dict + @:return: + """ + pass + + def onError(self, exceptionType, exceptionValue, tb): + """ + Python错误回调 + todo: 以后详细的错误信息最好记录在文件里,用uuid来联系/区分具体错误 + """ + sys.stderr.write( + self.exceptionDetail(exceptionType, + exceptionValue, + tb) + ) + + # 丢给默认的错误处理函数(所以如果不重载onError,一般的结果是程序会崩溃) + return sys.excepthook(exceptionType, exceptionValue, tb) + + def exceptionDetail(self, exceptionType, exceptionValue, tb): + """打印详细的错误信息""" + text = "[{}]: Unhandled WebSocket Error:{}\n".format( + datetime.now().isoformat(), + exceptionType + ) + text += "LastSentText:\n{}\n".format(self._lastSentText) + text += "LastReceivedText:\n{}\n".format(self._lastReceivedText) + text += "Exception trace: \n" + text += "".join( + traceback.format_exception( + exceptionType, + exceptionValue, + tb, + ) + ) + return text + + def _recordLastSentText(self, text): + """ + 用于Debug: 记录最后一次发送出去的text + """ + self._lastSentText = text[:1000] + + def _recordLastReceivedText(self, text): + """ + 用于Debug: 记录最后一次发送出去的text + """ + self._lastReceivedText = text[:1000] \ No newline at end of file diff --git a/vnpy/gateway/futu/futu_gateway.py b/vnpy/gateway/futu/futu_gateway.py index 29d7a887..458a96f2 100644 --- a/vnpy/gateway/futu/futu_gateway.py +++ b/vnpy/gateway/futu/futu_gateway.py @@ -116,6 +116,8 @@ class FutuGateway(BaseGateway): self.ticks = {} self.trades = set() + self.contracts = {} + self.thread = Thread(target=self.query_data) # For query function. @@ -311,6 +313,7 @@ class FutuGateway(BaseGateway): gateway_name=self.gateway_name ) self.on_contract(contract) + self.contracts[contract.vt_symbol] = contract self.write_log("合约信息查询成功") @@ -401,6 +404,11 @@ class FutuGateway(BaseGateway): gateway_name=self.gateway_name ) self.ticks[code] = tick + + contract = self.contracts.get(tick.vt_symbol, None) + if contract: + tick.name = contract.name + return tick def process_quote(self, data): diff --git a/vnpy/gateway/ib/ib_gateway.py b/vnpy/gateway/ib/ib_gateway.py index 35e4367d..ea62ea2c 100644 --- a/vnpy/gateway/ib/ib_gateway.py +++ b/vnpy/gateway/ib/ib_gateway.py @@ -197,6 +197,7 @@ class IbApi(EWrapper): self.ticks = {} self.orders = {} self.accounts = {} + self.contracts = {} self.tick_exchange = {} @@ -265,6 +266,11 @@ class IbApi(EWrapper): name = TICKFIELD_IB2VT[tickType] setattr(tick, name, price) + # Update name into tick data. + contract = self.contracts.get(tick.vt_symbol, None) + if contract: + tick.name = contract.name + # Forex and spot product of IDEALPRO has no tick time and last price. # We need to calculate locally. exchange = self.tick_exchange[reqId] @@ -467,8 +473,11 @@ class IbApi(EWrapper): pricetick=contractDetails.minTick, gateway_name=self.gateway_name ) + self.gateway.on_contract(contract) + self.contracts[contract.vt_symbol] = contract + def execDetails(self, reqId: int, contract: Contract, execution: Execution): """ Callback of trade data update. diff --git a/vnpy/trader/object.py b/vnpy/trader/object.py index 4f2601e4..26589f7d 100644 --- a/vnpy/trader/object.py +++ b/vnpy/trader/object.py @@ -32,6 +32,7 @@ class TickData(BaseData): exchange: str datetime: datetime + name: str = "" volume: float = 0 last_price: float = 0 last_volume: float = 0 diff --git a/vnpy/trader/ui/widget.py b/vnpy/trader/ui/widget.py index a49308fe..2fb3a94a 100644 --- a/vnpy/trader/ui/widget.py +++ b/vnpy/trader/ui/widget.py @@ -315,6 +315,11 @@ class TickMonitor(BaseMonitor): "cell": BaseCell, "update": False }, + "name": { + "display": "名称", + "cell": BaseCell, + "update": True + }, "last_price": { "display": "最新价", "cell": BaseCell,