diff --git a/vnpy/api/rest/rest_client.py b/vnpy/api/rest/rest_client.py index 7ae2c386..7bd4568e 100644 --- a/vnpy/api/rest/rest_client.py +++ b/vnpy/api/rest/rest_client.py @@ -30,7 +30,7 @@ class Request(object): params: dict, data: dict, headers: dict, - callback: Callable, + callback: Callable = None, on_failed: Callable = None, on_error: Callable = None, extra: Any = None, @@ -258,7 +258,7 @@ class RestClient(object): request.response = response status_code = response.status_code - if status_code / 100 == 2: # 2xx都算成功,尽管交易所都用200 + if status_code // 100 == 2: # 2xx都算成功,尽管交易所都用200 jsonBody = response.json() request.callback(jsonBody, request) request.status = RequestStatus.success @@ -284,3 +284,41 @@ class RestClient(object): """ url = self.url_base + path return url + + def request( + self, + method: str, + path: str, + params: dict = None, + data: dict = None, + headers: dict = None, + ): + """ + Add a new request. + :param method: GET, POST, PUT, DELETE, QUERY + :param path: + :param params: dict for query string + :param data: dict for body + :param headers: dict for headers + :return: requests.Response + """ + request = Request( + method, + path, + params, + data, + headers + ) + request = self.sign(request) + + url = self.make_full_url(request.path) + + response = requests.request( + request.method, + url, + headers=request.headers, + params=request.params, + data=request.data, + proxies=self.proxies, + ) + return response diff --git a/vnpy/app/cta_backtester/engine.py b/vnpy/app/cta_backtester/engine.py index 573feefe..7ed94fbf 100644 --- a/vnpy/app/cta_backtester/engine.py +++ b/vnpy/app/cta_backtester/engine.py @@ -9,6 +9,7 @@ from vnpy.event import Event, EventEngine from vnpy.trader.engine import BaseEngine, MainEngine from vnpy.trader.constant import Interval from vnpy.trader.utility import extract_vt_symbol +from vnpy.trader.object import HistoryRequest from vnpy.trader.rqdata import rqdata_client from vnpy.trader.database import database_manager from vnpy.app.cta_strategy import ( @@ -338,10 +339,24 @@ class BacktesterEngine(BaseEngine): self.write_log(f"{vt_symbol}-{interval}开始下载历史数据") symbol, exchange = extract_vt_symbol(vt_symbol) - data = rqdata_client.query_bar( - symbol, exchange, Interval(interval), start, end + + req = HistoryRequest( + symbol=symbol, + exchange=exchange, + interval=Interval(interval), + start=start, + end=end ) + contract = self.main_engine.get_contract(vt_symbol) + + # If history data provided in gateway, then query + if contract and contract.history_data: + data = self.main_engine.query_history(req, contract.gateway_name) + # Otherwise use RQData to query data + else: + data = rqdata_client.query_history(req) + if data: database_manager.save_bar_data(data) self.write_log(f"{vt_symbol}-{interval}历史数据下载完成") diff --git a/vnpy/app/cta_strategy/engine.py b/vnpy/app/cta_strategy/engine.py index a28b4878..e17f49ec 100644 --- a/vnpy/app/cta_strategy/engine.py +++ b/vnpy/app/cta_strategy/engine.py @@ -16,6 +16,7 @@ from vnpy.trader.engine import BaseEngine, MainEngine from vnpy.trader.object import ( OrderRequest, SubscribeRequest, + HistoryRequest, LogData, TickData, BarData, @@ -136,9 +137,14 @@ class CtaEngine(BaseEngine): """ Query bar data from RQData. """ - data = rqdata_client.query_bar( - symbol, exchange, interval, start, end + req = HistoryRequest( + symbol=symbol, + exchange=exchange, + interval=interval, + start=start, + end=end ) + data = rqdata_client.query_history(req) return data def process_tick_event(self, event: Event): diff --git a/vnpy/gateway/bitmex/bitmex_gateway.py b/vnpy/gateway/bitmex/bitmex_gateway.py index 678f09c9..cb815936 100644 --- a/vnpy/gateway/bitmex/bitmex_gateway.py +++ b/vnpy/gateway/bitmex/bitmex_gateway.py @@ -7,7 +7,7 @@ import hmac import sys import time from copy import copy -from datetime import datetime +from datetime import datetime, timedelta from threading import Lock from urllib.parse import urlencode @@ -21,7 +21,8 @@ from vnpy.trader.constant import ( OrderType, Product, Status, - Offset + Offset, + Interval ) from vnpy.trader.gateway import BaseGateway from vnpy.trader.object import ( @@ -31,9 +32,11 @@ from vnpy.trader.object import ( PositionData, AccountData, ContractData, + BarData, OrderRequest, CancelRequest, SubscribeRequest, + HistoryRequest ) REST_HOST = "https://www.bitmex.com/api/v1" @@ -60,6 +63,18 @@ ORDERTYPE_VT2BITMEX = { } ORDERTYPE_BITMEX2VT = {v: k for k, v in ORDERTYPE_VT2BITMEX.items()} +INTERVAL_VT2BITMEX = { + Interval.MINUTE: "1m", + Interval.HOUR: "1h", + Interval.DAILY: "1d", +} + +TIMEDELTA_MAP = { + Interval.MINUTE: timedelta(minutes=1), + Interval.HOUR: timedelta(hours=1), + Interval.DAILY: timedelta(days=1), +} + class BitmexGateway(BaseGateway): """ @@ -124,6 +139,10 @@ class BitmexGateway(BaseGateway): """""" pass + def query_history(self, req: HistoryRequest): + """""" + return self.rest_api.query_history(req) + def close(self): """""" self.rest_api.stop() @@ -155,7 +174,7 @@ class BitmexRestApi(RestClient): Generate BitMEX signature. """ # Sign - expires = int(time.time() + 5) + expires = int(time.time() + 30) if request.params: query = urlencode(request.params) @@ -279,6 +298,70 @@ class BitmexRestApi(RestClient): on_error=self.on_cancel_order_error, ) + def query_history(self, req: HistoryRequest): + """""" + history = [] + count = 750 + start_time = req.start.isoformat() + + while True: + # Create query params + params = { + "binSize": INTERVAL_VT2BITMEX[req.interval], + "symbol": req.symbol, + "count": count, + "startTime": start_time + } + + # Add end time if specified + if req.end: + params["endTime"] = req.end.isoformat() + + # Get response from server + resp = self.request( + "GET", + "/trade/bucketed", + params=params + ) + + # Break if request failed with other status code + if resp.status_code // 100 != 2: + msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" + self.gateway.write_log(msg) + break + else: + data = resp.json() + + for d in data: + dt = datetime.strptime(d["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") + bar = BarData( + symbol=req.symbol, + exchange=req.exchange, + datetime=dt, + interval=req.interval, + volume=d["volume"], + open_price=d["open"], + high_price=d["high"], + low_price=d["low"], + close_price=d["close"], + gateway_name=self.gateway_name + ) + history.append(bar) + + begin = data[0]["timestamp"] + end = data[-1]["timestamp"] + msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{begin} - {end}" + self.gateway.write_log(msg) + + # Break if total data count less than 750 (latest date collected) + if len(data) < 750: + break + + # Update start time + start_time = bar.datetime + TIMEDELTA_MAP[req.interval] + + return history + def on_send_order_failed(self, status_code: str, request: Request): """ Callback when sending order failed on server. @@ -615,6 +698,7 @@ class BitmexWebsocketApi(WebsocketClient): size=d["lotSize"], stop_supported=True, net_position=True, + history_data=True, gateway_name=self.gateway_name, ) diff --git a/vnpy/trader/engine.py b/vnpy/trader/engine.py index 12e3c3d3..96246487 100644 --- a/vnpy/trader/engine.py +++ b/vnpy/trader/engine.py @@ -22,7 +22,13 @@ from .event import ( EVENT_LOG ) from .gateway import BaseGateway -from .object import CancelRequest, LogData, OrderRequest, SubscribeRequest +from .object import ( + CancelRequest, + LogData, + OrderRequest, + SubscribeRequest, + HistoryRequest +) from .setting import SETTINGS from .utility import get_folder_path @@ -174,6 +180,16 @@ class MainEngine: if gateway: gateway.cancel_order(req) + def query_history(self, req: HistoryRequest, gateway_name: str): + """ + Send cancel order request to a specific gateway. + """ + gateway = self.get_gateway(gateway_name) + if gateway: + return gateway.query_history(req) + else: + return None + def close(self): """ Make sure every gateway and app is closed properly before diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index edd2c6ab..ac2cd704 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -27,12 +27,13 @@ from .object import ( OrderRequest, CancelRequest, SubscribeRequest, + HistoryRequest ) class BaseGateway(ABC): """ - Abstract gateway class for creating gateways connection + Abstract gateway class for creating gateways connection to different trading systems. # How to implement a gateway: @@ -206,8 +207,6 @@ class BaseGateway(ABC): Cancel an existing order. implementation should finish the tasks blow: * send request to server - - """ pass @@ -215,7 +214,6 @@ class BaseGateway(ABC): def query_account(self): """ Query account balance. - """ pass @@ -226,6 +224,12 @@ class BaseGateway(ABC): """ pass + def query_history(self, req: HistoryRequest): + """ + Query bar history data. + """ + pass + def get_default_setting(self): """ Return default setting dict. diff --git a/vnpy/trader/object.py b/vnpy/trader/object.py index 734093cb..952c0360 100644 --- a/vnpy/trader/object.py +++ b/vnpy/trader/object.py @@ -236,6 +236,7 @@ class ContractData(BaseData): min_volume: float = 1 # minimum trading volume of the contract stop_supported: bool = False # whether server supports stop order net_position: bool = False # whether gateway uses net position volume + history_data: bool = False # whether gateway provides bar history data option_strike: float = 0 option_underlying: str = "" # vt_symbol of underlying contract @@ -310,3 +311,20 @@ class CancelRequest: def __post_init__(self): """""" self.vt_symbol = f"{self.symbol}.{self.exchange.value}" + + +@dataclass +class HistoryRequest: + """ + Request sending to specific gateway for querying history data. + """ + + symbol: str + exchange: Exchange + start: datetime + end: datetime = None + interval: Interval = None + + def __post_init__(self): + """""" + self.vt_symbol = f"{self.symbol}.{self.exchange.value}" diff --git a/vnpy/trader/rqdata.py b/vnpy/trader/rqdata.py index 965d32c7..f3e650bc 100644 --- a/vnpy/trader/rqdata.py +++ b/vnpy/trader/rqdata.py @@ -7,7 +7,7 @@ from rqdatac.services.get_price import get_price as rqdata_get_price from .setting import SETTINGS from .constant import Exchange, Interval -from .object import BarData +from .object import BarData, HistoryRequest INTERVAL_VT2RQ = { @@ -89,17 +89,16 @@ class RqdataClient: return rq_symbol - def query_bar( - self, - symbol: str, - exchange: Exchange, - interval: Interval, - start: datetime, - end: datetime - ): + def query_history(self, req: HistoryRequest): """ - Query bar data from RQData. + Query history bar data from RQData. """ + symbol = req.symbol + exchange = req.exchange + interval = req.interval + start = req.start + end = req.end + rq_symbol = self.to_rq_symbol(symbol, exchange) if rq_symbol not in self.symbols: return None