[Mod] change code struction and complete test of BybitGateway

This commit is contained in:
vn.py 2019-10-21 15:52:39 +08:00
parent 765bb5078a
commit 5df4e6f738

View File

@ -194,7 +194,7 @@ class BybitRestApi(RestClient):
api_params["api_key"] = self.key api_params["api_key"] = self.key
api_params["recv_window"] = 30 * 1000 api_params["recv_window"] = 30 * 1000
api_params["timestamp"] = generate_timestamp(-1) api_params["timestamp"] = generate_timestamp(-5)
data2sign = "&".join( data2sign = "&".join(
[f"{k}={v}" for k, v in sorted(api_params.items())]) [f"{k}={v}" for k, v in sorted(api_params.items())])
@ -265,87 +265,6 @@ class BybitRestApi(RestClient):
self.order_manager.on_order(order) self.order_manager.on_order(order)
return order.vt_orderid return order.vt_orderid
def cancel_order(self, req: CancelRequest):
""""""
sys_orderid = self.order_manager.get_sys_orderid(req.orderid)
data = {
"order_id": sys_orderid,
"symbol": req.symbol,
}
self.add_request(
"POST",
path="/open-api/order/cancel",
data=data,
callback=self.on_cancel_order
)
def query_history(self, req: HistoryRequest) -> List[BarData]:
""""""
history = []
count = 200
start_time = int(req.start.timestamp())
while True:
# Create query params
params = {
"symbol": req.symbol,
"interval": INTERVAL_VT2BYBIT[req.interval],
"from": start_time,
"limit": count
}
# Get response from server
resp = self.request(
"GET",
"/v2/public/kline/list",
params=params
)
# Break if request failed with other status code
if resp.status_code // 100 != 2:
msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}"
self.gateway.write_log(msg)
break
else:
data = resp.json()
if not data:
msg = f"获取历史数据为空,开始时间:{start_time},数量:{count}"
break
buf = []
for d in data["result"]:
dt = datetime.fromtimestamp(d["open_time"])
bar = BarData(
symbol=req.symbol,
exchange=req.exchange,
datetime=dt,
interval=req.interval,
volume=int(d["volume"]),
open_price=float(d["open"]),
high_price=float(d["high"]),
low_price=float(d["low"]),
close_price=float(d["close"]),
gateway_name=self.gateway_name
)
buf.append(bar)
history.extend(buf)
begin = buf[0].datetime
end = buf[-1].datetime
msg = f"获取历史数据成功,{req.symbol} - {req.interval.value}{begin} - {end}"
self.gateway.write_log(msg)
# Break if last data collected
if len(buf) < count:
break
# Update start time
start_time = int((bar.datetime + TIMEDELTA_MAP[req.interval]).timestamp())
return history
def on_send_order_failed(self, status_code: int, request: Request): def on_send_order_failed(self, status_code: int, request: Request):
""" """
Callback when sending order failed on server. Callback when sending order failed on server.
@ -385,6 +304,21 @@ class BybitRestApi(RestClient):
result["order_id"] result["order_id"]
) )
def cancel_order(self, req: CancelRequest):
""""""
sys_orderid = self.order_manager.get_sys_orderid(req.orderid)
data = {
"order_id": sys_orderid,
"symbol": req.symbol,
}
self.add_request(
"POST",
path="/open-api/order/cancel",
data=data,
callback=self.on_cancel_order
)
def on_cancel_order_error( def on_cancel_order_error(
self, exception_type: type, exception_value: Exception, tb, request: Request self, exception_type: type, exception_value: Exception, tb, request: Request
): ):
@ -563,6 +497,72 @@ class BybitRestApi(RestClient):
else: else:
self.gateway.write_log("委托信息查询成功") self.gateway.write_log("委托信息查询成功")
def query_history(self, req: HistoryRequest) -> List[BarData]:
""""""
history = []
count = 200
start_time = int(req.start.timestamp())
while True:
# Create query params
params = {
"symbol": req.symbol,
"interval": INTERVAL_VT2BYBIT[req.interval],
"from": start_time,
"limit": count
}
# Get response from server
resp = self.request(
"GET",
"/v2/public/kline/list",
params=params
)
# Break if request failed with other status code
if resp.status_code // 100 != 2:
msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}"
self.gateway.write_log(msg)
break
else:
data = resp.json()
if not data:
msg = f"获取历史数据为空,开始时间:{start_time},数量:{count}"
break
buf = []
for d in data["result"]:
dt = datetime.fromtimestamp(d["open_time"])
bar = BarData(
symbol=req.symbol,
exchange=req.exchange,
datetime=dt,
interval=req.interval,
volume=int(d["volume"]),
open_price=float(d["open"]),
high_price=float(d["high"]),
low_price=float(d["low"]),
close_price=float(d["close"]),
gateway_name=self.gateway_name
)
buf.append(bar)
history.extend(buf)
begin = buf[0].datetime
end = buf[-1].datetime
msg = f"获取历史数据成功,{req.symbol} - {req.interval.value}{begin} - {end}"
self.gateway.write_log(msg)
# Break if last data collected
if len(buf) < count:
break
# Update start time
start_time = int((bar.datetime + TIMEDELTA_MAP[req.interval]).timestamp())
return history
class BybitWebsocketApi(WebsocketClient): class BybitWebsocketApi(WebsocketClient):
"""""" """"""
@ -635,6 +635,18 @@ class BybitWebsocketApi(WebsocketClient):
f"instrument_info.100ms.{req.symbol}", self.on_tick) f"instrument_info.100ms.{req.symbol}", self.on_tick)
self.subscribe_topic(f"orderBookL2_25.{req.symbol}", self.on_depth) self.subscribe_topic(f"orderBookL2_25.{req.symbol}", self.on_depth)
def subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]):
"""
Subscribe to all private topics.
"""
self.callbacks[topic] = callback
req = {
"op": "subscribe",
"args": [topic],
}
self.send_packet(req)
def on_connected(self): def on_connected(self):
"""""" """"""
self.gateway.write_log("Websocket API连接成功") self.gateway.write_log("Websocket API连接成功")
@ -678,18 +690,6 @@ class BybitWebsocketApi(WebsocketClient):
else: else:
self.gateway.write_log("Websocket API登录失败") self.gateway.write_log("Websocket API登录失败")
def subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]):
"""
Subscribe to all private topics.
"""
self.callbacks[topic] = callback
req = {
"op": "subscribe",
"args": [topic],
}
self.send_packet(req)
def on_tick(self, packet: dict): def on_tick(self, packet: dict):
"""""" """"""
topic = packet["topic"] topic = packet["topic"]