diff --git a/vnpy/gateway/bybit/bybit_gateway.py b/vnpy/gateway/bybit/bybit_gateway.py index fd9b1ddd..afdd7b2b 100644 --- a/vnpy/gateway/bybit/bybit_gateway.py +++ b/vnpy/gateway/bybit/bybit_gateway.py @@ -194,7 +194,7 @@ class BybitRestApi(RestClient): api_params["api_key"] = self.key api_params["recv_window"] = 30 * 1000 - api_params["timestamp"] = generate_timestamp(-1) + api_params["timestamp"] = generate_timestamp(-5) data2sign = "&".join( [f"{k}={v}" for k, v in sorted(api_params.items())]) @@ -265,87 +265,6 @@ class BybitRestApi(RestClient): self.order_manager.on_order(order) return order.vt_orderid - def cancel_order(self, req: CancelRequest): - """""" - sys_orderid = self.order_manager.get_sys_orderid(req.orderid) - data = { - "order_id": sys_orderid, - "symbol": req.symbol, - } - - self.add_request( - "POST", - path="/open-api/order/cancel", - data=data, - callback=self.on_cancel_order - ) - - def query_history(self, req: HistoryRequest) -> List[BarData]: - """""" - history = [] - count = 200 - start_time = int(req.start.timestamp()) - - while True: - # Create query params - params = { - "symbol": req.symbol, - "interval": INTERVAL_VT2BYBIT[req.interval], - "from": start_time, - "limit": count - } - - # Get response from server - resp = self.request( - "GET", - "/v2/public/kline/list", - params=params - ) - - # Break if request failed with other status code - if resp.status_code // 100 != 2: - msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" - self.gateway.write_log(msg) - break - else: - data = resp.json() - if not data: - msg = f"获取历史数据为空,开始时间:{start_time},数量:{count}" - break - - buf = [] - for d in data["result"]: - dt = datetime.fromtimestamp(d["open_time"]) - bar = BarData( - symbol=req.symbol, - exchange=req.exchange, - datetime=dt, - interval=req.interval, - volume=int(d["volume"]), - open_price=float(d["open"]), - high_price=float(d["high"]), - low_price=float(d["low"]), - close_price=float(d["close"]), - gateway_name=self.gateway_name - ) - buf.append(bar) - - history.extend(buf) - - begin = buf[0].datetime - end = buf[-1].datetime - msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{begin} - {end}" - self.gateway.write_log(msg) - - # Break if last data collected - if len(buf) < count: - break - - # Update start time - start_time = int((bar.datetime + TIMEDELTA_MAP[req.interval]).timestamp()) - - return history - def on_send_order_failed(self, status_code: int, request: Request): """ Callback when sending order failed on server. @@ -385,6 +304,21 @@ class BybitRestApi(RestClient): result["order_id"] ) + def cancel_order(self, req: CancelRequest): + """""" + sys_orderid = self.order_manager.get_sys_orderid(req.orderid) + data = { + "order_id": sys_orderid, + "symbol": req.symbol, + } + + self.add_request( + "POST", + path="/open-api/order/cancel", + data=data, + callback=self.on_cancel_order + ) + def on_cancel_order_error( self, exception_type: type, exception_value: Exception, tb, request: Request ): @@ -563,6 +497,72 @@ class BybitRestApi(RestClient): else: self.gateway.write_log("委托信息查询成功") + def query_history(self, req: HistoryRequest) -> List[BarData]: + """""" + history = [] + count = 200 + start_time = int(req.start.timestamp()) + + while True: + # Create query params + params = { + "symbol": req.symbol, + "interval": INTERVAL_VT2BYBIT[req.interval], + "from": start_time, + "limit": count + } + + # Get response from server + resp = self.request( + "GET", + "/v2/public/kline/list", + params=params + ) + + # Break if request failed with other status code + if resp.status_code // 100 != 2: + msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" + self.gateway.write_log(msg) + break + else: + data = resp.json() + if not data: + msg = f"获取历史数据为空,开始时间:{start_time},数量:{count}" + break + + buf = [] + for d in data["result"]: + dt = datetime.fromtimestamp(d["open_time"]) + bar = BarData( + symbol=req.symbol, + exchange=req.exchange, + datetime=dt, + interval=req.interval, + volume=int(d["volume"]), + open_price=float(d["open"]), + high_price=float(d["high"]), + low_price=float(d["low"]), + close_price=float(d["close"]), + gateway_name=self.gateway_name + ) + buf.append(bar) + + history.extend(buf) + + begin = buf[0].datetime + end = buf[-1].datetime + msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{begin} - {end}" + self.gateway.write_log(msg) + + # Break if last data collected + if len(buf) < count: + break + + # Update start time + start_time = int((bar.datetime + TIMEDELTA_MAP[req.interval]).timestamp()) + + return history + class BybitWebsocketApi(WebsocketClient): """""" @@ -635,6 +635,18 @@ class BybitWebsocketApi(WebsocketClient): f"instrument_info.100ms.{req.symbol}", self.on_tick) self.subscribe_topic(f"orderBookL2_25.{req.symbol}", self.on_depth) + def subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]): + """ + Subscribe to all private topics. + """ + self.callbacks[topic] = callback + + req = { + "op": "subscribe", + "args": [topic], + } + self.send_packet(req) + def on_connected(self): """""" self.gateway.write_log("Websocket API连接成功") @@ -678,18 +690,6 @@ class BybitWebsocketApi(WebsocketClient): else: self.gateway.write_log("Websocket API登录失败") - def subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]): - """ - Subscribe to all private topics. - """ - self.callbacks[topic] = callback - - req = { - "op": "subscribe", - "args": [topic], - } - self.send_packet(req) - def on_tick(self, packet: dict): """""" topic = packet["topic"]