From 2257f8318c2db19f38ce51deeb9c95458880a8e8 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Mon, 21 Oct 2019 15:38:28 +0800 Subject: [PATCH] [Mod] add query history data function --- requirements.txt | 1 - vnpy/gateway/bybit/bybit_gateway.py | 273 +++++++++++++++++----------- vnpy/trader/gateway.py | 6 +- 3 files changed, 165 insertions(+), 115 deletions(-) diff --git a/requirements.txt b/requirements.txt index 26a5e53b..3d7a0271 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,5 +18,4 @@ ta-lib ibapi deap pyzmq -sortedcontainers wmi diff --git a/vnpy/gateway/bybit/bybit_gateway.py b/vnpy/gateway/bybit/bybit_gateway.py index c5de04bf..fd9b1ddd 100644 --- a/vnpy/gateway/bybit/bybit_gateway.py +++ b/vnpy/gateway/bybit/bybit_gateway.py @@ -3,13 +3,12 @@ import hashlib import hmac import time import sys -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from typing import Any, Dict, List, Callable from threading import Lock from copy import copy from requests import ConnectionError -# from sortedcontainers import SortedSet from vnpy.api.websocket import WebsocketClient from vnpy.api.rest import Request, RestClient @@ -34,7 +33,7 @@ from vnpy.trader.object import ( CancelRequest, OrderRequest ) -# from vnpy.trader.event import EVENT_TIMER +from vnpy.trader.event import EVENT_TIMER from vnpy.trader.gateway import BaseGateway, LocalOrderManager @@ -46,20 +45,9 @@ STATUS_BYBIT2VT = { "Cancelled": Status.CANCELLED, "Rejected": Status.REJECTED, } -STOP_ORDER_STATUS_BYBIT2VT = { - "Untriggered": Status.NOTTRADED, - "Triggered": Status.NOTTRADED, - # Active: triggered and placed. - # since price is market price, placed == AllTraded? - "Active": Status.ALLTRADED, - "Cancelled": Status.CANCELLED, - "Rejected": Status.REJECTED, -} + DIRECTION_VT2BYBIT = {Direction.LONG: "Buy", Direction.SHORT: "Sell"} DIRECTION_BYBIT2VT = {v: k for k, v in DIRECTION_VT2BYBIT.items()} -DIRECTION_BYBIT2VT.update({ - "None": Direction.LONG -}) OPPOSITE_DIRECTION = { Direction.LONG: Direction.SHORT, @@ -78,12 +66,6 @@ INTERVAL_VT2BYBIT = { Interval.DAILY: "D", Interval.WEEKLY: "W", } -INTERVAL_VT2BYBIT_INT = { - Interval.MINUTE: 1, - Interval.HOUR: 60, - Interval.DAILY: 60 * 24, - Interval.WEEKLY: 60 * 24 * 7, -} TIMEDELTA_MAP = { Interval.MINUTE: timedelta(minutes=1), @@ -99,17 +81,6 @@ WEBSOCKET_HOST = "wss://stream.bybit.com/realtime" TESTNET_REST_HOST = "https://api-testnet.bybit.com" TESTNET_WEBSOCKET_HOST = "wss://stream-testnet.bybit.com/realtime" -# asked from official developer -PRICE_TICKS = { - "BTCUSD": 0.5, - "ETHUSD": 0.05, - "EOSUSD": 0.001, - "XRPUSD": 0.0001, -} - -utc_tz = timezone.utc -local_tz = datetime.now(timezone.utc).astimezone().tzinfo - class BybitGateway(BaseGateway): """ @@ -130,7 +101,9 @@ class BybitGateway(BaseGateway): """Constructor""" super().__init__(event_engine, "BYBIT") - self.order_manager = LocalOrderManager(self) + self.connect_time = datetime.now().strftime("%y%m%d%H%M%S") + self.order_manager = LocalOrderManager(self, self.connect_time) + self.rest_api = BybitRestApi(self) self.ws_api = BybitWebsocketApi(self) @@ -150,6 +123,8 @@ class BybitGateway(BaseGateway): self.rest_api.connect(key, secret, server, proxy_host, proxy_port) self.ws_api.connect(key, secret, server, proxy_host, proxy_port) + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + def subscribe(self, req: SubscribeRequest): """""" self.ws_api.subscribe(req) @@ -168,17 +143,21 @@ class BybitGateway(BaseGateway): def query_position(self): """""" - pass + self.rest_api.query_position() def query_history(self, req: HistoryRequest): """""" - return self.rest_api.query_history() + return self.rest_api.query_history(req) def close(self): """""" self.rest_api.stop() self.ws_api.stop() + def process_timer_event(self, event): + """""" + self.query_position() + class BybitRestApi(RestClient): """ @@ -217,7 +196,8 @@ class BybitRestApi(RestClient): api_params["recv_window"] = 30 * 1000 api_params["timestamp"] = generate_timestamp(-1) - data2sign = "&".join([f"{k}={v}" for k, v in sorted(api_params.items())]) + data2sign = "&".join( + [f"{k}={v}" for k, v in sorted(api_params.items())]) signature = sign(self.secret, data2sign.encode()) api_params["sign"] = signature @@ -267,7 +247,7 @@ class BybitRestApi(RestClient): } order = req.create_order_data(order_id, self.gateway_name) - order.time = datetime.now().isoformat()[11:19] + order.time = str(datetime.now().isoformat()) # Only add price for limit order. data["order_type"] = ORDER_TYPE_VT2BYBIT[req.type] @@ -282,7 +262,7 @@ class BybitRestApi(RestClient): on_error=self.on_send_order_error, ) - self.gateway.on_order(order) + self.order_manager.on_order(order) return order.vt_orderid def cancel_order(self, req: CancelRequest): @@ -300,15 +280,71 @@ class BybitRestApi(RestClient): callback=self.on_cancel_order ) - def query_history( - self, - symbol: str, - interval: Interval, - start: int, # unix timestamp - limit: int = None, - ) -> List[BarData]: + def query_history(self, req: HistoryRequest) -> List[BarData]: """""" - pass + history = [] + count = 200 + start_time = int(req.start.timestamp()) + + while True: + # Create query params + params = { + "symbol": req.symbol, + "interval": INTERVAL_VT2BYBIT[req.interval], + "from": start_time, + "limit": count + } + + # Get response from server + resp = self.request( + "GET", + "/v2/public/kline/list", + params=params + ) + + # Break if request failed with other status code + if resp.status_code // 100 != 2: + msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" + self.gateway.write_log(msg) + break + else: + data = resp.json() + if not data: + msg = f"获取历史数据为空,开始时间:{start_time},数量:{count}" + break + + buf = [] + for d in data["result"]: + dt = datetime.fromtimestamp(d["open_time"]) + bar = BarData( + symbol=req.symbol, + exchange=req.exchange, + datetime=dt, + interval=req.interval, + volume=int(d["volume"]), + open_price=float(d["open"]), + high_price=float(d["high"]), + low_price=float(d["low"]), + close_price=float(d["close"]), + gateway_name=self.gateway_name + ) + buf.append(bar) + + history.extend(buf) + + begin = buf[0].datetime + end = buf[-1].datetime + msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{begin} - {end}" + self.gateway.write_log(msg) + + # Break if last data collected + if len(buf) < count: + break + + # Update start time + start_time = int((bar.datetime + TIMEDELTA_MAP[req.interval]).timestamp()) + + return history def on_send_order_failed(self, status_code: int, request: Request): """ @@ -316,7 +352,7 @@ class BybitRestApi(RestClient): """ order = request.extra order.status = Status.REJECTED - self.gateway.on_order(order) + self.order_manager.on_order(order) data = request.response.json() error_msg = data["ret_msg"] @@ -332,7 +368,7 @@ class BybitRestApi(RestClient): """ order = request.extra order.status = Status.REJECTED - self.gateway.on_order(order) + self.order_manager.on_order(order) # Record exception if not ConnectionError if not issubclass(exception_type, ConnectionError): @@ -340,6 +376,9 @@ class BybitRestApi(RestClient): def on_send_order(self, data: dict, request: Request): """""" + if self.check_error("委托下单", data): + return + result = data["result"] self.order_manager.update_orderid_map( result["order_link_id"], @@ -358,7 +397,8 @@ class BybitRestApi(RestClient): def on_cancel_order(self, data: dict, request: Request): """""" - pass + if self.check_error("委托下单", data): + return def on_failed(self, status_code: int, request: Request): """ @@ -412,14 +452,16 @@ class BybitRestApi(RestClient): for d in data["result"]: contract = ContractData( - gateway_name=self.gateway_name, symbol=d["name"], exchange=Exchange.BYBIT, name=d["name"], product=Product.FUTURES, size=1, pricetick=d["price_filter"]["tick_size"], - min_volume=d["lot_size_filter"]["qty_step"] + min_volume=d["lot_size_filter"]["qty_step"], + net_position=True, + history_data=True, + gateway_name=self.gateway_name ) self.gateway.on_contract(contract) @@ -450,7 +492,6 @@ class BybitRestApi(RestClient): direction=Direction.NET, volume=volume, price=d["entry_price"], - pnl=d["unrealised_pnl"], gateway_name=self.gateway_name ) self.gateway.on_position(position) @@ -515,7 +556,7 @@ class BybitRestApi(RestClient): time=d["created_at"], gateway_name=self.gateway_name ) - self.gateway.on_order(order) + self.order_manager.on_order(order) if result["current_page"] != result["last_page"]: self.query_order(result["current_page"] + 1) @@ -523,24 +564,6 @@ class BybitRestApi(RestClient): self.gateway.write_log("委托信息查询成功") -class RawOrderInfo: - - def __init__(self, data: dict): - self.id = data["id"] - self.price = data["price"] - self.side = data["side"] # "Buy", "Sell" - self.size = data.get("size", 0) - - def __lt__(self, rhs: "RawOrderInfo"): - return self.price < rhs.price - - def __eq__(self, rhs: "RawOrderInfo"): - return self.price == rhs.price - - def __hash__(self): - return self.id # price is a string and we don"t known its precision - - class BybitWebsocketApi(WebsocketClient): """""" @@ -558,6 +581,7 @@ class BybitWebsocketApi(WebsocketClient): self.callbacks: Dict[str, Callable] = {} self.ticks: Dict[str, TickData] = {} + self.subscribed: Dict[str, SubscribeRequest] = {} self.symbol_bids: Dict[str, dict] = {} self.symbol_asks: Dict[str, dict] = {} @@ -596,6 +620,8 @@ class BybitWebsocketApi(WebsocketClient): """ Subscribe to tick data upate. """ + self.subscribed[req.symbol] = req + tick = TickData( symbol=req.symbol, exchange=req.exchange, @@ -605,7 +631,8 @@ class BybitWebsocketApi(WebsocketClient): ) self.ticks[req.symbol] = tick - self.subscribe_topic(f"instrument_info.100ms.{req.symbol}", self.on_tick) + self.subscribe_topic( + f"instrument_info.100ms.{req.symbol}", self.on_tick) self.subscribe_topic(f"orderBookL2_25.{req.symbol}", self.on_depth) def on_connected(self): @@ -619,7 +646,6 @@ class BybitWebsocketApi(WebsocketClient): def on_packet(self, packet: dict): """""" - print(packet) if "topic" not in packet: op = packet["request"]["op"] if op == "auth": @@ -629,16 +655,6 @@ class BybitWebsocketApi(WebsocketClient): callback = self.callbacks[channel] callback(packet) - # success = packet.get("success", None) - # topic = packet.get("topic", None) - # if success is not None: - # if success is False: - # self.gateway.write_log("Websocket API报错:%s" % packet["ret_msg"]) - # elif topic is not None: - # self.callbacks[topic](topic, packet) - # else: - # self.gateway.write_log(f"unkonwn packet: {packet}") - def on_error(self, exception_type: type, exception_value: Exception, tb): """""" msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" @@ -656,6 +672,9 @@ class BybitWebsocketApi(WebsocketClient): self.subscribe_topic("order", self.on_order) self.subscribe_topic("execution", self.on_trade) self.subscribe_topic("position", self.on_position) + + for req in self.subscribed.values(): + self.subscribe(req) else: self.gateway.write_log("Websocket API登录失败") @@ -755,45 +774,81 @@ class BybitWebsocketApi(WebsocketClient): tick.datetime = datetime.fromtimestamp(timestamp / 1_000_000) self.gateway.on_tick(copy(tick)) - def on_trade(self, topic: str, data: dict): + def on_trade(self, packet: dict): """""" - for data in data["data"]: - order_id = data["order_link_id"] + for d in packet["data"]: + order_id = d["order_link_id"] if not order_id: - order_id = data["order_id"] + order_id = d["order_id"] + trade = TradeData( - symbol=data["symbol"], + symbol=d["symbol"], exchange=Exchange.BYBIT, orderid=order_id, - tradeid=data["exec_id"], - direction=DIRECTION_BYBIT2VT[data["side"]], - price=data["price"], - volume=data["exec_qty"], - time=parse_datetime(data["trade_time"]), + tradeid=d["exec_id"], + direction=DIRECTION_BYBIT2VT[d["side"]], + price=d["price"], + volume=d["exec_qty"], + time=d["trade_time"], gateway_name=self.gateway_name, ) self.gateway.on_trade(trade) - def on_order(self, topic: str, data: dict): + def on_order(self, packet: dict): """""" - for data in data["data"]: - print(data) - order = self.gateway.parse_order_data(data, "timestamp") + for d in packet["data"]: + sys_orderid = d["order_id"] + order = self.order_manager.get_order_with_sys_orderid(sys_orderid) - self.gateway.on_order(copy(order)) + if order: + order.traded = d["cum_exec_qty"] + order.status = STATUS_BYBIT2VT[d["order_status"]] + else: + # Use sys_orderid as local_orderid when + # order placed from other source + local_orderid = d["order_link_id"] + if not local_orderid: + local_orderid = sys_orderid - def on_position(self, topic: str, data: dict): + self.order_manager.update_orderid_map( + local_orderid, + sys_orderid + ) + + order = OrderData( + symbol=d["symbol"], + exchange=Exchange.BYBIT, + orderid=local_orderid, + type=ORDER_TYPE_BYBIT2VT[d["order_type"]], + direction=DIRECTION_BYBIT2VT[d["side"]], + price=d["price"], + volume=d["qty"], + traded=d["cum_exec_qty"], + status=STATUS_BYBIT2VT[d["order_status"]], + time=d["timestamp"], + gateway_name=self.gateway_name + ) + + self.order_manager.on_order(order) + + def on_position(self, packet: dict): """""" - for data in data["data"]: - p1, p2 = self.gateway.parse_position_data(data) - self.gateway.on_position(p1) - if p2: - self.gateway.on_position(p2) + for d in packet["data"]: + if d["side"] == "Buy": + volume = d["size"] + else: + volume = -d["size"] - -def _parse_timestamp_e6(timestamp: int): - return datetime.fromtimestamp(timestamp / 1_000_000, utc_tz).astimezone(tz=local_tz) + position = PositionData( + symbol=d["symbol"], + exchange=Exchange.BYBIT, + direction=Direction.NET, + volume=volume, + price=d["entry_price"], + gateway_name=self.gateway_name + ) + self.gateway.on_position(position) def generate_timestamp(expire_after: float = 30) -> int: @@ -809,7 +864,3 @@ def sign(secret: bytes, data: bytes) -> str: return hmac.new( secret, data, digestmod=hashlib.sha256 ).hexdigest() - - -def parse_datetime(dt: str) -> str: - return dt[11:19] diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index 9d8f1770..4ce95cff 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -265,12 +265,12 @@ class LocalOrderManager: Management tool to support use local order id for trading. """ - def __init__(self, gateway: BaseGateway): + def __init__(self, gateway: BaseGateway, order_prefix: str = ""): """""" self.gateway = gateway # For generating local orderid - self.order_prefix = "" + self.order_prefix = order_prefix self.order_count = 0 self.orders = {} # local_orderid:order @@ -296,7 +296,7 @@ class LocalOrderManager: Generate a new local orderid. """ self.order_count += 1 - local_orderid = str(self.order_count).rjust(8, "0") + local_orderid = self.order_prefix + str(self.order_count).rjust(8, "0") return local_orderid def get_local_orderid(self, sys_orderid: str):