Merge pull request #1674 from vnpy/query_history

Query history
This commit is contained in:
vn.py 2019-05-08 15:59:49 +08:00 committed by GitHub
commit 9a5166d8ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 204 additions and 24 deletions

View File

@ -30,7 +30,7 @@ class Request(object):
params: dict,
data: dict,
headers: dict,
callback: Callable,
callback: Callable = None,
on_failed: Callable = None,
on_error: Callable = None,
extra: Any = None,
@ -258,7 +258,7 @@ class RestClient(object):
request.response = response
status_code = response.status_code
if status_code / 100 == 2: # 2xx都算成功尽管交易所都用200
if status_code // 100 == 2: # 2xx都算成功尽管交易所都用200
jsonBody = response.json()
request.callback(jsonBody, request)
request.status = RequestStatus.success
@ -284,3 +284,41 @@ class RestClient(object):
"""
url = self.url_base + path
return url
def request(
self,
method: str,
path: str,
params: dict = None,
data: dict = None,
headers: dict = None,
):
"""
Add a new request.
:param method: GET, POST, PUT, DELETE, QUERY
:param path:
:param params: dict for query string
:param data: dict for body
:param headers: dict for headers
:return: requests.Response
"""
request = Request(
method,
path,
params,
data,
headers
)
request = self.sign(request)
url = self.make_full_url(request.path)
response = requests.request(
request.method,
url,
headers=request.headers,
params=request.params,
data=request.data,
proxies=self.proxies,
)
return response

View File

@ -9,6 +9,7 @@ from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.constant import Interval
from vnpy.trader.utility import extract_vt_symbol
from vnpy.trader.object import HistoryRequest
from vnpy.trader.rqdata import rqdata_client
from vnpy.trader.database import database_manager
from vnpy.app.cta_strategy import (
@ -338,10 +339,24 @@ class BacktesterEngine(BaseEngine):
self.write_log(f"{vt_symbol}-{interval}开始下载历史数据")
symbol, exchange = extract_vt_symbol(vt_symbol)
data = rqdata_client.query_bar(
symbol, exchange, Interval(interval), start, end
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=Interval(interval),
start=start,
end=end
)
contract = self.main_engine.get_contract(vt_symbol)
# If history data provided in gateway, then query
if contract and contract.history_data:
data = self.main_engine.query_history(req, contract.gateway_name)
# Otherwise use RQData to query data
else:
data = rqdata_client.query_history(req)
if data:
database_manager.save_bar_data(data)
self.write_log(f"{vt_symbol}-{interval}历史数据下载完成")

View File

@ -16,6 +16,7 @@ from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.object import (
OrderRequest,
SubscribeRequest,
HistoryRequest,
LogData,
TickData,
BarData,
@ -136,9 +137,14 @@ class CtaEngine(BaseEngine):
"""
Query bar data from RQData.
"""
data = rqdata_client.query_bar(
symbol, exchange, interval, start, end
req = HistoryRequest(
symbol=symbol,
exchange=exchange,
interval=interval,
start=start,
end=end
)
data = rqdata_client.query_history(req)
return data
def process_tick_event(self, event: Event):

View File

@ -7,7 +7,7 @@ import hmac
import sys
import time
from copy import copy
from datetime import datetime
from datetime import datetime, timedelta
from threading import Lock
from urllib.parse import urlencode
@ -21,7 +21,8 @@ from vnpy.trader.constant import (
OrderType,
Product,
Status,
Offset
Offset,
Interval
)
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (
@ -31,9 +32,11 @@ from vnpy.trader.object import (
PositionData,
AccountData,
ContractData,
BarData,
OrderRequest,
CancelRequest,
SubscribeRequest,
HistoryRequest
)
REST_HOST = "https://www.bitmex.com/api/v1"
@ -60,6 +63,18 @@ ORDERTYPE_VT2BITMEX = {
}
ORDERTYPE_BITMEX2VT = {v: k for k, v in ORDERTYPE_VT2BITMEX.items()}
INTERVAL_VT2BITMEX = {
Interval.MINUTE: "1m",
Interval.HOUR: "1h",
Interval.DAILY: "1d",
}
TIMEDELTA_MAP = {
Interval.MINUTE: timedelta(minutes=1),
Interval.HOUR: timedelta(hours=1),
Interval.DAILY: timedelta(days=1),
}
class BitmexGateway(BaseGateway):
"""
@ -124,6 +139,10 @@ class BitmexGateway(BaseGateway):
""""""
pass
def query_history(self, req: HistoryRequest):
""""""
return self.rest_api.query_history(req)
def close(self):
""""""
self.rest_api.stop()
@ -155,7 +174,7 @@ class BitmexRestApi(RestClient):
Generate BitMEX signature.
"""
# Sign
expires = int(time.time() + 5)
expires = int(time.time() + 30)
if request.params:
query = urlencode(request.params)
@ -279,6 +298,70 @@ class BitmexRestApi(RestClient):
on_error=self.on_cancel_order_error,
)
def query_history(self, req: HistoryRequest):
""""""
history = []
count = 750
start_time = req.start.isoformat()
while True:
# Create query params
params = {
"binSize": INTERVAL_VT2BITMEX[req.interval],
"symbol": req.symbol,
"count": count,
"startTime": start_time
}
# Add end time if specified
if req.end:
params["endTime"] = req.end.isoformat()
# Get response from server
resp = self.request(
"GET",
"/trade/bucketed",
params=params
)
# Break if request failed with other status code
if resp.status_code // 100 != 2:
msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}"
self.gateway.write_log(msg)
break
else:
data = resp.json()
for d in data:
dt = datetime.strptime(d["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
bar = BarData(
symbol=req.symbol,
exchange=req.exchange,
datetime=dt,
interval=req.interval,
volume=d["volume"],
open_price=d["open"],
high_price=d["high"],
low_price=d["low"],
close_price=d["close"],
gateway_name=self.gateway_name
)
history.append(bar)
begin = data[0]["timestamp"]
end = data[-1]["timestamp"]
msg = f"获取历史数据成功,{req.symbol} - {req.interval.value}{begin} - {end}"
self.gateway.write_log(msg)
# Break if total data count less than 750 (latest date collected)
if len(data) < 750:
break
# Update start time
start_time = bar.datetime + TIMEDELTA_MAP[req.interval]
return history
def on_send_order_failed(self, status_code: str, request: Request):
"""
Callback when sending order failed on server.
@ -615,6 +698,7 @@ class BitmexWebsocketApi(WebsocketClient):
size=d["lotSize"],
stop_supported=True,
net_position=True,
history_data=True,
gateway_name=self.gateway_name,
)

View File

@ -22,7 +22,13 @@ from .event import (
EVENT_LOG
)
from .gateway import BaseGateway
from .object import CancelRequest, LogData, OrderRequest, SubscribeRequest
from .object import (
CancelRequest,
LogData,
OrderRequest,
SubscribeRequest,
HistoryRequest
)
from .setting import SETTINGS
from .utility import get_folder_path
@ -174,6 +180,16 @@ class MainEngine:
if gateway:
gateway.cancel_order(req)
def query_history(self, req: HistoryRequest, gateway_name: str):
"""
Send cancel order request to a specific gateway.
"""
gateway = self.get_gateway(gateway_name)
if gateway:
return gateway.query_history(req)
else:
return None
def close(self):
"""
Make sure every gateway and app is closed properly before

View File

@ -27,12 +27,13 @@ from .object import (
OrderRequest,
CancelRequest,
SubscribeRequest,
HistoryRequest
)
class BaseGateway(ABC):
"""
Abstract gateway class for creating gateways connection
Abstract gateway class for creating gateways connection
to different trading systems.
# How to implement a gateway:
@ -206,8 +207,6 @@ class BaseGateway(ABC):
Cancel an existing order.
implementation should finish the tasks blow:
* send request to server
"""
pass
@ -215,7 +214,6 @@ class BaseGateway(ABC):
def query_account(self):
"""
Query account balance.
"""
pass
@ -226,6 +224,12 @@ class BaseGateway(ABC):
"""
pass
def query_history(self, req: HistoryRequest):
"""
Query bar history data.
"""
pass
def get_default_setting(self):
"""
Return default setting dict.

View File

@ -236,6 +236,7 @@ class ContractData(BaseData):
min_volume: float = 1 # minimum trading volume of the contract
stop_supported: bool = False # whether server supports stop order
net_position: bool = False # whether gateway uses net position volume
history_data: bool = False # whether gateway provides bar history data
option_strike: float = 0
option_underlying: str = "" # vt_symbol of underlying contract
@ -310,3 +311,20 @@ class CancelRequest:
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
@dataclass
class HistoryRequest:
"""
Request sending to specific gateway for querying history data.
"""
symbol: str
exchange: Exchange
start: datetime
end: datetime = None
interval: Interval = None
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"

View File

@ -7,7 +7,7 @@ from rqdatac.services.get_price import get_price as rqdata_get_price
from .setting import SETTINGS
from .constant import Exchange, Interval
from .object import BarData
from .object import BarData, HistoryRequest
INTERVAL_VT2RQ = {
@ -89,17 +89,16 @@ class RqdataClient:
return rq_symbol
def query_bar(
self,
symbol: str,
exchange: Exchange,
interval: Interval,
start: datetime,
end: datetime
):
def query_history(self, req: HistoryRequest):
"""
Query bar data from RQData.
Query history bar data from RQData.
"""
symbol = req.symbol
exchange = req.exchange
interval = req.interval
start = req.start
end = req.end
rq_symbol = self.to_rq_symbol(symbol, exchange)
if rq_symbol not in self.symbols:
return None