[Mod] add query history data function

This commit is contained in:
vn.py 2019-10-21 15:38:28 +08:00
parent 7a2007ddbd
commit 2257f8318c
3 changed files with 165 additions and 115 deletions

View File

@ -18,5 +18,4 @@ ta-lib
ibapi ibapi
deap deap
pyzmq pyzmq
sortedcontainers
wmi wmi

View File

@ -3,13 +3,12 @@ import hashlib
import hmac import hmac
import time import time
import sys import sys
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta
from typing import Any, Dict, List, Callable from typing import Any, Dict, List, Callable
from threading import Lock from threading import Lock
from copy import copy from copy import copy
from requests import ConnectionError from requests import ConnectionError
# from sortedcontainers import SortedSet
from vnpy.api.websocket import WebsocketClient from vnpy.api.websocket import WebsocketClient
from vnpy.api.rest import Request, RestClient from vnpy.api.rest import Request, RestClient
@ -34,7 +33,7 @@ from vnpy.trader.object import (
CancelRequest, CancelRequest,
OrderRequest OrderRequest
) )
# from vnpy.trader.event import EVENT_TIMER from vnpy.trader.event import EVENT_TIMER
from vnpy.trader.gateway import BaseGateway, LocalOrderManager from vnpy.trader.gateway import BaseGateway, LocalOrderManager
@ -46,20 +45,9 @@ STATUS_BYBIT2VT = {
"Cancelled": Status.CANCELLED, "Cancelled": Status.CANCELLED,
"Rejected": Status.REJECTED, "Rejected": Status.REJECTED,
} }
STOP_ORDER_STATUS_BYBIT2VT = {
"Untriggered": Status.NOTTRADED,
"Triggered": Status.NOTTRADED,
# Active: triggered and placed.
# since price is market price, placed == AllTraded?
"Active": Status.ALLTRADED,
"Cancelled": Status.CANCELLED,
"Rejected": Status.REJECTED,
}
DIRECTION_VT2BYBIT = {Direction.LONG: "Buy", Direction.SHORT: "Sell"} DIRECTION_VT2BYBIT = {Direction.LONG: "Buy", Direction.SHORT: "Sell"}
DIRECTION_BYBIT2VT = {v: k for k, v in DIRECTION_VT2BYBIT.items()} DIRECTION_BYBIT2VT = {v: k for k, v in DIRECTION_VT2BYBIT.items()}
DIRECTION_BYBIT2VT.update({
"None": Direction.LONG
})
OPPOSITE_DIRECTION = { OPPOSITE_DIRECTION = {
Direction.LONG: Direction.SHORT, Direction.LONG: Direction.SHORT,
@ -78,12 +66,6 @@ INTERVAL_VT2BYBIT = {
Interval.DAILY: "D", Interval.DAILY: "D",
Interval.WEEKLY: "W", Interval.WEEKLY: "W",
} }
INTERVAL_VT2BYBIT_INT = {
Interval.MINUTE: 1,
Interval.HOUR: 60,
Interval.DAILY: 60 * 24,
Interval.WEEKLY: 60 * 24 * 7,
}
TIMEDELTA_MAP = { TIMEDELTA_MAP = {
Interval.MINUTE: timedelta(minutes=1), Interval.MINUTE: timedelta(minutes=1),
@ -99,17 +81,6 @@ WEBSOCKET_HOST = "wss://stream.bybit.com/realtime"
TESTNET_REST_HOST = "https://api-testnet.bybit.com" TESTNET_REST_HOST = "https://api-testnet.bybit.com"
TESTNET_WEBSOCKET_HOST = "wss://stream-testnet.bybit.com/realtime" TESTNET_WEBSOCKET_HOST = "wss://stream-testnet.bybit.com/realtime"
# asked from official developer
PRICE_TICKS = {
"BTCUSD": 0.5,
"ETHUSD": 0.05,
"EOSUSD": 0.001,
"XRPUSD": 0.0001,
}
utc_tz = timezone.utc
local_tz = datetime.now(timezone.utc).astimezone().tzinfo
class BybitGateway(BaseGateway): class BybitGateway(BaseGateway):
""" """
@ -130,7 +101,9 @@ class BybitGateway(BaseGateway):
"""Constructor""" """Constructor"""
super().__init__(event_engine, "BYBIT") super().__init__(event_engine, "BYBIT")
self.order_manager = LocalOrderManager(self) self.connect_time = datetime.now().strftime("%y%m%d%H%M%S")
self.order_manager = LocalOrderManager(self, self.connect_time)
self.rest_api = BybitRestApi(self) self.rest_api = BybitRestApi(self)
self.ws_api = BybitWebsocketApi(self) self.ws_api = BybitWebsocketApi(self)
@ -150,6 +123,8 @@ class BybitGateway(BaseGateway):
self.rest_api.connect(key, secret, server, proxy_host, proxy_port) self.rest_api.connect(key, secret, server, proxy_host, proxy_port)
self.ws_api.connect(key, secret, server, proxy_host, proxy_port) self.ws_api.connect(key, secret, server, proxy_host, proxy_port)
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def subscribe(self, req: SubscribeRequest): def subscribe(self, req: SubscribeRequest):
"""""" """"""
self.ws_api.subscribe(req) self.ws_api.subscribe(req)
@ -168,17 +143,21 @@ class BybitGateway(BaseGateway):
def query_position(self): def query_position(self):
"""""" """"""
pass self.rest_api.query_position()
def query_history(self, req: HistoryRequest): def query_history(self, req: HistoryRequest):
"""""" """"""
return self.rest_api.query_history() return self.rest_api.query_history(req)
def close(self): def close(self):
"""""" """"""
self.rest_api.stop() self.rest_api.stop()
self.ws_api.stop() self.ws_api.stop()
def process_timer_event(self, event):
""""""
self.query_position()
class BybitRestApi(RestClient): class BybitRestApi(RestClient):
""" """
@ -217,7 +196,8 @@ class BybitRestApi(RestClient):
api_params["recv_window"] = 30 * 1000 api_params["recv_window"] = 30 * 1000
api_params["timestamp"] = generate_timestamp(-1) api_params["timestamp"] = generate_timestamp(-1)
data2sign = "&".join([f"{k}={v}" for k, v in sorted(api_params.items())]) data2sign = "&".join(
[f"{k}={v}" for k, v in sorted(api_params.items())])
signature = sign(self.secret, data2sign.encode()) signature = sign(self.secret, data2sign.encode())
api_params["sign"] = signature api_params["sign"] = signature
@ -267,7 +247,7 @@ class BybitRestApi(RestClient):
} }
order = req.create_order_data(order_id, self.gateway_name) order = req.create_order_data(order_id, self.gateway_name)
order.time = datetime.now().isoformat()[11:19] order.time = str(datetime.now().isoformat())
# Only add price for limit order. # Only add price for limit order.
data["order_type"] = ORDER_TYPE_VT2BYBIT[req.type] data["order_type"] = ORDER_TYPE_VT2BYBIT[req.type]
@ -282,7 +262,7 @@ class BybitRestApi(RestClient):
on_error=self.on_send_order_error, on_error=self.on_send_order_error,
) )
self.gateway.on_order(order) self.order_manager.on_order(order)
return order.vt_orderid return order.vt_orderid
def cancel_order(self, req: CancelRequest): def cancel_order(self, req: CancelRequest):
@ -300,15 +280,71 @@ class BybitRestApi(RestClient):
callback=self.on_cancel_order callback=self.on_cancel_order
) )
def query_history( def query_history(self, req: HistoryRequest) -> List[BarData]:
self,
symbol: str,
interval: Interval,
start: int, # unix timestamp
limit: int = None,
) -> List[BarData]:
"""""" """"""
pass history = []
count = 200
start_time = int(req.start.timestamp())
while True:
# Create query params
params = {
"symbol": req.symbol,
"interval": INTERVAL_VT2BYBIT[req.interval],
"from": start_time,
"limit": count
}
# Get response from server
resp = self.request(
"GET",
"/v2/public/kline/list",
params=params
)
# Break if request failed with other status code
if resp.status_code // 100 != 2:
msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}"
self.gateway.write_log(msg)
break
else:
data = resp.json()
if not data:
msg = f"获取历史数据为空,开始时间:{start_time},数量:{count}"
break
buf = []
for d in data["result"]:
dt = datetime.fromtimestamp(d["open_time"])
bar = BarData(
symbol=req.symbol,
exchange=req.exchange,
datetime=dt,
interval=req.interval,
volume=int(d["volume"]),
open_price=float(d["open"]),
high_price=float(d["high"]),
low_price=float(d["low"]),
close_price=float(d["close"]),
gateway_name=self.gateway_name
)
buf.append(bar)
history.extend(buf)
begin = buf[0].datetime
end = buf[-1].datetime
msg = f"获取历史数据成功,{req.symbol} - {req.interval.value}{begin} - {end}"
self.gateway.write_log(msg)
# Break if last data collected
if len(buf) < count:
break
# Update start time
start_time = int((bar.datetime + TIMEDELTA_MAP[req.interval]).timestamp())
return history
def on_send_order_failed(self, status_code: int, request: Request): def on_send_order_failed(self, status_code: int, request: Request):
""" """
@ -316,7 +352,7 @@ class BybitRestApi(RestClient):
""" """
order = request.extra order = request.extra
order.status = Status.REJECTED order.status = Status.REJECTED
self.gateway.on_order(order) self.order_manager.on_order(order)
data = request.response.json() data = request.response.json()
error_msg = data["ret_msg"] error_msg = data["ret_msg"]
@ -332,7 +368,7 @@ class BybitRestApi(RestClient):
""" """
order = request.extra order = request.extra
order.status = Status.REJECTED order.status = Status.REJECTED
self.gateway.on_order(order) self.order_manager.on_order(order)
# Record exception if not ConnectionError # Record exception if not ConnectionError
if not issubclass(exception_type, ConnectionError): if not issubclass(exception_type, ConnectionError):
@ -340,6 +376,9 @@ class BybitRestApi(RestClient):
def on_send_order(self, data: dict, request: Request): def on_send_order(self, data: dict, request: Request):
"""""" """"""
if self.check_error("委托下单", data):
return
result = data["result"] result = data["result"]
self.order_manager.update_orderid_map( self.order_manager.update_orderid_map(
result["order_link_id"], result["order_link_id"],
@ -358,7 +397,8 @@ class BybitRestApi(RestClient):
def on_cancel_order(self, data: dict, request: Request): def on_cancel_order(self, data: dict, request: Request):
"""""" """"""
pass if self.check_error("委托下单", data):
return
def on_failed(self, status_code: int, request: Request): def on_failed(self, status_code: int, request: Request):
""" """
@ -412,14 +452,16 @@ class BybitRestApi(RestClient):
for d in data["result"]: for d in data["result"]:
contract = ContractData( contract = ContractData(
gateway_name=self.gateway_name,
symbol=d["name"], symbol=d["name"],
exchange=Exchange.BYBIT, exchange=Exchange.BYBIT,
name=d["name"], name=d["name"],
product=Product.FUTURES, product=Product.FUTURES,
size=1, size=1,
pricetick=d["price_filter"]["tick_size"], pricetick=d["price_filter"]["tick_size"],
min_volume=d["lot_size_filter"]["qty_step"] min_volume=d["lot_size_filter"]["qty_step"],
net_position=True,
history_data=True,
gateway_name=self.gateway_name
) )
self.gateway.on_contract(contract) self.gateway.on_contract(contract)
@ -450,7 +492,6 @@ class BybitRestApi(RestClient):
direction=Direction.NET, direction=Direction.NET,
volume=volume, volume=volume,
price=d["entry_price"], price=d["entry_price"],
pnl=d["unrealised_pnl"],
gateway_name=self.gateway_name gateway_name=self.gateway_name
) )
self.gateway.on_position(position) self.gateway.on_position(position)
@ -515,7 +556,7 @@ class BybitRestApi(RestClient):
time=d["created_at"], time=d["created_at"],
gateway_name=self.gateway_name gateway_name=self.gateway_name
) )
self.gateway.on_order(order) self.order_manager.on_order(order)
if result["current_page"] != result["last_page"]: if result["current_page"] != result["last_page"]:
self.query_order(result["current_page"] + 1) self.query_order(result["current_page"] + 1)
@ -523,24 +564,6 @@ class BybitRestApi(RestClient):
self.gateway.write_log("委托信息查询成功") self.gateway.write_log("委托信息查询成功")
class RawOrderInfo:
def __init__(self, data: dict):
self.id = data["id"]
self.price = data["price"]
self.side = data["side"] # "Buy", "Sell"
self.size = data.get("size", 0)
def __lt__(self, rhs: "RawOrderInfo"):
return self.price < rhs.price
def __eq__(self, rhs: "RawOrderInfo"):
return self.price == rhs.price
def __hash__(self):
return self.id # price is a string and we don"t known its precision
class BybitWebsocketApi(WebsocketClient): class BybitWebsocketApi(WebsocketClient):
"""""" """"""
@ -558,6 +581,7 @@ class BybitWebsocketApi(WebsocketClient):
self.callbacks: Dict[str, Callable] = {} self.callbacks: Dict[str, Callable] = {}
self.ticks: Dict[str, TickData] = {} self.ticks: Dict[str, TickData] = {}
self.subscribed: Dict[str, SubscribeRequest] = {}
self.symbol_bids: Dict[str, dict] = {} self.symbol_bids: Dict[str, dict] = {}
self.symbol_asks: Dict[str, dict] = {} self.symbol_asks: Dict[str, dict] = {}
@ -596,6 +620,8 @@ class BybitWebsocketApi(WebsocketClient):
""" """
Subscribe to tick data upate. Subscribe to tick data upate.
""" """
self.subscribed[req.symbol] = req
tick = TickData( tick = TickData(
symbol=req.symbol, symbol=req.symbol,
exchange=req.exchange, exchange=req.exchange,
@ -605,7 +631,8 @@ class BybitWebsocketApi(WebsocketClient):
) )
self.ticks[req.symbol] = tick self.ticks[req.symbol] = tick
self.subscribe_topic(f"instrument_info.100ms.{req.symbol}", self.on_tick) self.subscribe_topic(
f"instrument_info.100ms.{req.symbol}", self.on_tick)
self.subscribe_topic(f"orderBookL2_25.{req.symbol}", self.on_depth) self.subscribe_topic(f"orderBookL2_25.{req.symbol}", self.on_depth)
def on_connected(self): def on_connected(self):
@ -619,7 +646,6 @@ class BybitWebsocketApi(WebsocketClient):
def on_packet(self, packet: dict): def on_packet(self, packet: dict):
"""""" """"""
print(packet)
if "topic" not in packet: if "topic" not in packet:
op = packet["request"]["op"] op = packet["request"]["op"]
if op == "auth": if op == "auth":
@ -629,16 +655,6 @@ class BybitWebsocketApi(WebsocketClient):
callback = self.callbacks[channel] callback = self.callbacks[channel]
callback(packet) callback(packet)
# success = packet.get("success", None)
# topic = packet.get("topic", None)
# if success is not None:
# if success is False:
# self.gateway.write_log("Websocket API报错%s" % packet["ret_msg"])
# elif topic is not None:
# self.callbacks[topic](topic, packet)
# else:
# self.gateway.write_log(f"unkonwn packet: {packet}")
def on_error(self, exception_type: type, exception_value: Exception, tb): def on_error(self, exception_type: type, exception_value: Exception, tb):
"""""" """"""
msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" msg = f"触发异常,状态码:{exception_type},信息:{exception_value}"
@ -656,6 +672,9 @@ class BybitWebsocketApi(WebsocketClient):
self.subscribe_topic("order", self.on_order) self.subscribe_topic("order", self.on_order)
self.subscribe_topic("execution", self.on_trade) self.subscribe_topic("execution", self.on_trade)
self.subscribe_topic("position", self.on_position) self.subscribe_topic("position", self.on_position)
for req in self.subscribed.values():
self.subscribe(req)
else: else:
self.gateway.write_log("Websocket API登录失败") self.gateway.write_log("Websocket API登录失败")
@ -755,45 +774,81 @@ class BybitWebsocketApi(WebsocketClient):
tick.datetime = datetime.fromtimestamp(timestamp / 1_000_000) tick.datetime = datetime.fromtimestamp(timestamp / 1_000_000)
self.gateway.on_tick(copy(tick)) self.gateway.on_tick(copy(tick))
def on_trade(self, topic: str, data: dict): def on_trade(self, packet: dict):
"""""" """"""
for data in data["data"]: for d in packet["data"]:
order_id = data["order_link_id"] order_id = d["order_link_id"]
if not order_id: if not order_id:
order_id = data["order_id"] order_id = d["order_id"]
trade = TradeData( trade = TradeData(
symbol=data["symbol"], symbol=d["symbol"],
exchange=Exchange.BYBIT, exchange=Exchange.BYBIT,
orderid=order_id, orderid=order_id,
tradeid=data["exec_id"], tradeid=d["exec_id"],
direction=DIRECTION_BYBIT2VT[data["side"]], direction=DIRECTION_BYBIT2VT[d["side"]],
price=data["price"], price=d["price"],
volume=data["exec_qty"], volume=d["exec_qty"],
time=parse_datetime(data["trade_time"]), time=d["trade_time"],
gateway_name=self.gateway_name, gateway_name=self.gateway_name,
) )
self.gateway.on_trade(trade) self.gateway.on_trade(trade)
def on_order(self, topic: str, data: dict): def on_order(self, packet: dict):
"""""" """"""
for data in data["data"]: for d in packet["data"]:
print(data) sys_orderid = d["order_id"]
order = self.gateway.parse_order_data(data, "timestamp") order = self.order_manager.get_order_with_sys_orderid(sys_orderid)
self.gateway.on_order(copy(order)) if order:
order.traded = d["cum_exec_qty"]
order.status = STATUS_BYBIT2VT[d["order_status"]]
else:
# Use sys_orderid as local_orderid when
# order placed from other source
local_orderid = d["order_link_id"]
if not local_orderid:
local_orderid = sys_orderid
def on_position(self, topic: str, data: dict): self.order_manager.update_orderid_map(
local_orderid,
sys_orderid
)
order = OrderData(
symbol=d["symbol"],
exchange=Exchange.BYBIT,
orderid=local_orderid,
type=ORDER_TYPE_BYBIT2VT[d["order_type"]],
direction=DIRECTION_BYBIT2VT[d["side"]],
price=d["price"],
volume=d["qty"],
traded=d["cum_exec_qty"],
status=STATUS_BYBIT2VT[d["order_status"]],
time=d["timestamp"],
gateway_name=self.gateway_name
)
self.order_manager.on_order(order)
def on_position(self, packet: dict):
"""""" """"""
for data in data["data"]: for d in packet["data"]:
p1, p2 = self.gateway.parse_position_data(data) if d["side"] == "Buy":
self.gateway.on_position(p1) volume = d["size"]
if p2: else:
self.gateway.on_position(p2) volume = -d["size"]
position = PositionData(
def _parse_timestamp_e6(timestamp: int): symbol=d["symbol"],
return datetime.fromtimestamp(timestamp / 1_000_000, utc_tz).astimezone(tz=local_tz) exchange=Exchange.BYBIT,
direction=Direction.NET,
volume=volume,
price=d["entry_price"],
gateway_name=self.gateway_name
)
self.gateway.on_position(position)
def generate_timestamp(expire_after: float = 30) -> int: def generate_timestamp(expire_after: float = 30) -> int:
@ -809,7 +864,3 @@ def sign(secret: bytes, data: bytes) -> str:
return hmac.new( return hmac.new(
secret, data, digestmod=hashlib.sha256 secret, data, digestmod=hashlib.sha256
).hexdigest() ).hexdigest()
def parse_datetime(dt: str) -> str:
return dt[11:19]

View File

@ -265,12 +265,12 @@ class LocalOrderManager:
Management tool to support use local order id for trading. Management tool to support use local order id for trading.
""" """
def __init__(self, gateway: BaseGateway): def __init__(self, gateway: BaseGateway, order_prefix: str = ""):
"""""" """"""
self.gateway = gateway self.gateway = gateway
# For generating local orderid # For generating local orderid
self.order_prefix = "" self.order_prefix = order_prefix
self.order_count = 0 self.order_count = 0
self.orders = {} # local_orderid:order self.orders = {} # local_orderid:order
@ -296,7 +296,7 @@ class LocalOrderManager:
Generate a new local orderid. Generate a new local orderid.
""" """
self.order_count += 1 self.order_count += 1
local_orderid = str(self.order_count).rjust(8, "0") local_orderid = self.order_prefix + str(self.order_count).rjust(8, "0")
return local_orderid return local_orderid
def get_local_orderid(self, sys_orderid: str): def get_local_orderid(self, sys_orderid: str):