From 7a2007ddbd11725e31a69479896cc1befa1fee28 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Mon, 21 Oct 2019 11:01:50 +0800 Subject: [PATCH] [Mod] simplify market data process --- examples/vn_trader/run.py | 12 +- vnpy/gateway/bybit/bybit_gateway.py | 1154 +++++++++------------------ 2 files changed, 379 insertions(+), 787 deletions(-) diff --git a/examples/vn_trader/run.py b/examples/vn_trader/run.py index 6aa6332c..32174775 100644 --- a/examples/vn_trader/run.py +++ b/examples/vn_trader/run.py @@ -31,6 +31,7 @@ from vnpy.gateway.da import DaGateway from vnpy.gateway.coinbase import CoinbaseGateway from vnpy.gateway.bitstamp import BitstampGateway from vnpy.gateway.gateios import GateiosGateway +from vnpy.gateway.bybit import BybitGateway from vnpy.app.cta_strategy import CtaStrategyApp # from vnpy.app.csv_loader import CsvLoaderApp @@ -52,7 +53,7 @@ def main(): main_engine = MainEngine(event_engine) # main_engine.add_gateway(BinanceGateway) - main_engine.add_gateway(CtpGateway) + # main_engine.add_gateway(CtpGateway) # main_engine.add_gateway(CtptestGateway) # main_engine.add_gateway(MiniGateway) # main_engine.add_gateway(SoptGateway) @@ -60,12 +61,12 @@ def main(): # main_engine.add_gateway(FemasGateway) # main_engine.add_gateway(IbGateway) # main_engine.add_gateway(FutuGateway) - main_engine.add_gateway(BitmexGateway) + # main_engine.add_gateway(BitmexGateway) # main_engine.add_gateway(TigerGateway) # main_engine.add_gateway(OesGateway) # main_engine.add_gateway(OkexGateway) # main_engine.add_gateway(HuobiGateway) - main_engine.add_gateway(BitfinexGateway) + # main_engine.add_gateway(BitfinexGateway) # main_engine.add_gateway(OnetokenGateway) # main_engine.add_gateway(OkexfGateway) # main_engine.add_gateway(HbdmGateway) @@ -76,8 +77,9 @@ def main(): # main_engine.add_gateway(OkexsGateway) # main_engine.add_gateway(DaGateway) # main_engine.add_gateway(CoinbaseGateway) - main_engine.add_gateway(BitstampGateway) - main_engine.add_gateway(GateiosGateway) + # main_engine.add_gateway(BitstampGateway) + # main_engine.add_gateway(GateiosGateway) + main_engine.add_gateway(BybitGateway) main_engine.add_app(CtaStrategyApp) main_engine.add_app(CtaBacktesterApp) diff --git a/vnpy/gateway/bybit/bybit_gateway.py b/vnpy/gateway/bybit/bybit_gateway.py index e97ad129..c5de04bf 100644 --- a/vnpy/gateway/bybit/bybit_gateway.py +++ b/vnpy/gateway/bybit/bybit_gateway.py @@ -2,57 +2,40 @@ import hashlib import hmac import time -from datetime import datetime, timedelta, timezone - -from dataclasses import dataclass -from datetime import datetime -from typing import Any, Dict, List, Tuple import sys -from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Callable from threading import Lock -from typing import List, TYPE_CHECKING, Tuple -from urllib.parse import urlencode +from copy import copy from requests import ConnectionError - -import hashlib -import hmac -import sys -import time -from collections import defaultdict -from copy import copy -from datetime import datetime -from typing import Any, Callable, Dict, TYPE_CHECKING - -from sortedcontainers import SortedSet +# from sortedcontainers import SortedSet from vnpy.api.websocket import WebsocketClient -from vnpy.trader.constant import (Exchange, Product) -from vnpy.trader.object import (AccountData, ContractData, SubscribeRequest, TickData, TradeData) -from .common import (DIRECTION_BYBIT2VT, generate_timestamp, local_tz, parse_datetime, sign, utc_tz) - - - from vnpy.api.rest import Request, RestClient -from vnpy.trader.constant import Exchange, Interval, OrderType, Product, Status -from vnpy.trader.object import AccountData, BarData, CancelRequest, ContractData, OrderRequest -from .common import (DIRECTION_VT2BYBIT, INTERVAL_VT2BYBIT, ORDER_TYPE_VT2BYBIT, generate_timestamp, - parse_datetime, sign, INTERVAL_VT2BYBIT_INT) - - -from vnpy.event import Event -from vnpy.gateway.bybit.rest_api import BybitRestApi, HistoryDataNextInfo -from vnpy.gateway.bybit.websocket_api import BybitWebsocketApi -from vnpy.trader.constant import (Exchange, Interval, OrderType) -from vnpy.trader.constant import Direction, Interval, OrderType, Status -from vnpy.trader.event import EVENT_TIMER -from vnpy.trader.gateway import BaseGateway -from vnpy.trader.object import (BarData, CancelRequest, HistoryRequest, OrderData, OrderRequest, - PositionData, SubscribeRequest, TickData) -from .common import (DIRECTION_BYBIT2VT, INTERVAL_VT2BYBIT_INT, OPPOSITE_DIRECTION, - ORDER_TYPE_BYBIT2VT, STATUS_BYBIT2VT, STOP_ORDER_STATUS_BYBIT2VT, local_tz, - parse_datetime, utc_tz) +from vnpy.trader.constant import ( + Exchange, + Interval, + OrderType, + Product, + Status, + Direction +) +from vnpy.trader.object import ( + AccountData, + BarData, + TickData, + OrderData, + TradeData, + ContractData, + PositionData, + HistoryRequest, + SubscribeRequest, + CancelRequest, + OrderRequest +) +# from vnpy.trader.event import EVENT_TIMER +from vnpy.trader.gateway import BaseGateway, LocalOrderManager STATUS_BYBIT2VT = { @@ -87,7 +70,6 @@ ORDER_TYPE_VT2BYBIT = { OrderType.LIMIT: "Limit", OrderType.MARKET: "Market", } - ORDER_TYPE_BYBIT2VT = {v: k for k, v in ORDER_TYPE_VT2BYBIT.items()} INTERVAL_VT2BYBIT = { @@ -102,6 +84,7 @@ INTERVAL_VT2BYBIT_INT = { Interval.DAILY: 60 * 24, Interval.WEEKLY: 60 * 24 * 7, } + TIMEDELTA_MAP = { Interval.MINUTE: timedelta(minutes=1), Interval.HOUR: timedelta(hours=1), @@ -109,21 +92,12 @@ TIMEDELTA_MAP = { Interval.WEEKLY: timedelta(days=7), } -if TYPE_CHECKING: - from vnpy.gateway.bybit import BybitGateway -if TYPE_CHECKING: - from vnpy.gateway.bybit import BybitGateway +REST_HOST = "https://api.bybit.com" +WEBSOCKET_HOST = "wss://stream.bybit.com/realtime" -HOST = "wss://stream.bybit.com/realtime" -TEST_HOST = "wss://stream-testnet.bybit.com/realtime" - - -def _(x): return x # noqa - - -HOST = "https://api.bybit.com" -TEST_HOST = "https://api-testnet.bybit.com" +TESTNET_REST_HOST = "https://api-testnet.bybit.com" +TESTNET_WEBSOCKET_HOST = "wss://stream-testnet.bybit.com/realtime" # asked from official developer PRICE_TICKS = { @@ -137,62 +111,33 @@ utc_tz = timezone.utc local_tz = datetime.now(timezone.utc).astimezone().tzinfo -@dataclass() -class HistoryDataInfo: - bars: List[BarData] - extra: Any - - - - -@dataclass() -class HistoryDataNextInfo: - symbol: str - interval: Interval - end: int - - -class RequestFailedException(Exception): - pass - - - - - class BybitGateway(BaseGateway): """ - VN Trader Gateway for BitMEX connection. + VN Trader Gateway for ByBit connection. """ default_setting = { - "APIKey": "", - "PrivateKey": "", - "会话数": 3, + "ID": "", + "Secret": "", "服务器": ["REAL", "TESTNET"], "代理地址": "", "代理端口": "", } - HISTORY_RECORD_PER_REQUEST = 200 # # of records per history request exchanges = [Exchange.BYBIT] def __init__(self, event_engine): """Constructor""" - super(BybitGateway, self).__init__(event_engine, "BYBIT") + super().__init__(event_engine, "BYBIT") + self.order_manager = LocalOrderManager(self) self.rest_api = BybitRestApi(self) self.ws_api = BybitWebsocketApi(self) - self.ticks: Dict[str, TickData] = {} - self.orders: Dict[str, OrderData] = {} - self.local2sys_map: Dict[str, str] = {} - event_engine.register(EVENT_TIMER, self.process_timer_event) - def connect(self, setting: dict): """""" key = setting["ID"] secret = setting["Secret"] - session_number = setting["会话数"] server = setting["服务器"] proxy_host = setting["代理地址"] proxy_port = setting["代理端口"] @@ -202,13 +147,8 @@ class BybitGateway(BaseGateway): else: proxy_port = 0 - self.rest_api.connect(key, secret, session_number, - server, proxy_host, proxy_port) - + self.rest_api.connect(key, secret, server, proxy_host, proxy_port) self.ws_api.connect(key, secret, server, proxy_host, proxy_port) - self.rest_api.query_orders() - self.query_contracts() - self.query_account() def subscribe(self, req: SubscribeRequest): """""" @@ -222,269 +162,64 @@ class BybitGateway(BaseGateway): """""" self.rest_api.cancel_order(req) - def query_contracts(self): - self.rest_api.query_contracts() - def query_account(self): """""" - self.rest_api.query_position() + pass def query_position(self): """""" - self.rest_api.query_position() - - def query_first_history(self, - symbol: str, - interval: Interval, - start: datetime, - ) -> Tuple[List[BarData], "HistoryDataNextInfo"]: - - # datetime for a bar is close_time - # we got open_time from API. - adjustment = INTERVAL_VT2BYBIT_INT[interval] - - utc_time = start.replace(tzinfo=local_tz).astimezone(tz=utc_tz) - return self.rest_api.query_history( - symbol=symbol, - interval=interval, - - # todo: vnpy: shall all datetime object use tzinfo? - start=int(utc_time.timestamp()) - adjustment, - ) - - def query_next_history(self, - next_info: Any, - ): - data: "HistoryDataNextInfo" = next_info - return self.rest_api.query_history( - symbol=data.symbol, - interval=data.interval, - start=data.end, - ) + pass def query_history(self, req: HistoryRequest): - """ - todo: vnpy: download in parallel - todo: vnpy: use yield to simplify logic - :raises RequestFailedException: if server reply an error. - Any Exception might be raised from requests.request: network error. - """ - # todo: this function: test rate limit - history = [] - - symbol = req.symbol - interval = req.interval - start = req.start - - bars, next_info = self.query_first_history( - symbol=symbol, - interval=interval, - start=start, - ) - msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{bars[0].datetime} - {bars[-1].datetime}" - self.write_log(msg) - history.extend(bars) - while True: - bars, next_info = self.query_next_history(next_info) - - msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{bars[0].datetime} - {bars[-1].datetime}" - self.write_log(msg) - - # Break if total data count less than (latest date collected) - if len(bars) < self.HISTORY_RECORD_PER_REQUEST: - break - history.extend(bars) - return history + """""" + return self.rest_api.query_history() def close(self): """""" self.rest_api.stop() self.ws_api.stop() - def process_timer_event(self, event: Event): - """""" - self.rest_api.increase_rate_limit() - if self.rest_api.alive: - self.query_account() - self.rest_api.query_stop_orders() - - def write_log(self, msg: str): - return super().write_log(msg) - - def parse_order_data(self, data: dict, time_key: str = 'updated_at'): - """ - Parse order data from json dict. - todo: gateway: put this function to another place. - :note: this method will not fill 'time' record. Fill it by yourself. - """ - # prefer local_id as order_id - order_id = data["order_link_id"] - sys_id = data['order_id'] - if not order_id: - order_id = sys_id - - # saving mapping from order_id to sys_id - self.local2sys_map[order_id] = sys_id - - order = self.orders.get(order_id, None) - time = parse_datetime(data[time_key]) - - # filter outdated order - if order is not None and time < order.time: # string cmp is ok here. - return None - - # if order not exist(created outside this client) - # create it. - if order is None: - order = OrderData( - symbol=data["symbol"], - exchange=Exchange.BYBIT, - type=ORDER_TYPE_BYBIT2VT[data["order_type"]], - orderid=order_id, - direction=DIRECTION_BYBIT2VT[data["side"]], - price=data["price"], - volume=data["qty"], - time=time, - gateway_name=self.gateway_name, - ) - self.orders[order.orderid] = order - order.traded = data.get("cum_exec_qty", order.traded) - if 'order_status' in data: - order.status = STATUS_BYBIT2VT[data["order_status"]] - return order - - def parse_stop_order_data(self, data): - """ - Parse order data from json dict. - todo: put this function to another place. - :note: this method will not fill 'time' record. Fill it by yourself. - """ - # prefer local_id as order_id - order_id = data["order_link_id"] - sys_id = data['stop_order_id'] - if not order_id: - order_id = sys_id - - # saving mapping from order_id to sys_id - self.local2sys_map[order_id] = sys_id - - order = self.orders.get(order_id, None) - - # if order not exist(created outside this client) - # create it. - if not order: - order = OrderData( - symbol=data["symbol"], - exchange=Exchange.BYBIT, - type=OrderType.STOP, - orderid=order_id, - direction=DIRECTION_BYBIT2VT[data["side"]], - price=data["price"], - volume=data["qty"], - # this should be filled manually - time=parse_datetime(data['updated_at']), - gateway_name=self.gateway_name, - ) - self.orders[order.orderid] = order - if 'stop_order_status' in data: - # status = STATUS_BYBIT2VT.get(data["order_status"], None) - # if status is None: - # status = STOP_ORDER_STATUS_BYBIT2VT[data["order_status"]] - # order.status = status - order.status = STOP_ORDER_STATUS_BYBIT2VT[data["stop_order_status"]] - return order - - def orderid2sys(self, order_id: str): - """ - Convert order_id to sys_id - """ - return self.local2sys_map[order_id] - - def parse_position_data(self, data): - position = PositionData( - gateway_name=self.gateway_name, - symbol=data["symbol"], - exchange=Exchange.BYBIT, - direction=DIRECTION_BYBIT2VT[data['side']], - volume=data['size'], - price=data['entry_price'] - ) - - # clear opposite direction if necessary - if position.volume: - pos2 = PositionData( - gateway_name=position.gateway_name, - symbol=position.symbol, - exchange=Exchange.BYBIT, - direction=OPPOSITE_DIRECTION[position.direction], - volume=0, - price=0, - ) - return position, pos2 - return position, None - - def on_order(self, order: OrderData): - """ - Since WebSocket and RestClient will push the same orders asynchronously and separately - outdated orders should be filtered. - Outdated orders is filtered by parse_xxx_order_data(), returning None. - """ - if order is not None: - super().on_order(order) - class BybitRestApi(RestClient): """ - BitMEX REST API + ByBit REST API """ - def __init__(self, gateway: "BybitGateway"): + def __init__(self, gateway: BybitGateway): """""" - super(BybitRestApi, self).__init__() + super().__init__() self.gateway = gateway self.gateway_name = gateway.gateway_name + self.order_manager = gateway.order_manager self.key = "" self.secret = b"" self.order_count = 1_000_000 self.order_count_lock = Lock() - self.connect_time = 0 - # Use 60 by default, and will update after first request - self.rate_limit_limit = 60 - self.rate_limit_remaining = 60 - self.rate_limit_sleep = 0 - - @property - def ticks(self): - return self.gateway.ticks - - def sign(self, request): + def sign(self, request: Request): """ - Generate BitMEX signature. + Generate ByBit signature. """ - if request.method == "GET": - api_params = request.params # dict 2 sign + api_params = request.params if api_params is None: api_params = request.params = {} - else: # POST + else: api_params = request.data if api_params is None: api_params = request.data = {} - expires = generate_timestamp(0) + api_params["api_key"] = self.key + api_params["recv_window"] = 30 * 1000 + api_params["timestamp"] = generate_timestamp(-1) - api_params['api_key'] = self.key - api_params['recv_window'] = 30 * 1000 - api_params['timestamp'] = expires - - data2sign = urlencode({k: api_params[k] for k in sorted(api_params)}) + data2sign = "&".join([f"{k}={v}" for k, v in sorted(api_params.items())]) signature = sign(self.secret, data2sign.encode()) - api_params['sign'] = signature + api_params["sign"] = signature return request @@ -492,7 +227,6 @@ class BybitRestApi(RestClient): self, key: str, secret: str, - session_number: int, server: str, proxy_host: str, proxy_port: int, @@ -508,26 +242,20 @@ class BybitRestApi(RestClient): ) if server == "REAL": - self.init(HOST, proxy_host, proxy_port) + self.init(REST_HOST, proxy_host, proxy_port) else: - self.init(TEST_HOST, proxy_host, proxy_port) + self.init(TESTNET_REST_HOST, proxy_host, proxy_port) - self.start(session_number) + self.start(3) + self.gateway.write_log("REST API启动成功") - self.gateway.write_log(_("REST API启动成功")) - - def _new_order_id(self): - """""" - with self.order_count_lock: - self.order_count += 1 - return self.order_count + self.query_contract() + self.query_order() + self.query_position() def send_order(self, req: OrderRequest): """""" - if not self.check_rate_limit(): - return "" - - order_id = str(self.connect_time + self._new_order_id()) + order_id = self.order_manager.new_local_orderid() symbol = req.symbol data = { @@ -542,138 +270,58 @@ class BybitRestApi(RestClient): order.time = datetime.now().isoformat()[11:19] # Only add price for limit order. - if req.type != OrderType.STOP: - data["order_type"] = ORDER_TYPE_VT2BYBIT[req.type] - data["price"] = req.price - self.add_request( - "POST", - "/open-api/order/create", - callback=self.on_send_order, - data=data, - extra=order, - on_failed=self.on_send_order_failed, - on_error=self.on_send_order_error, - ) - else: - assert self.ticks[symbol], _("Subscribe This Symbol before sending StopOrder.") - last_price = self.ticks[symbol].last_price - data["order_type"] = 'Market' - data["stop_px"] = req.price - data["base_price"] = last_price - self.add_request( - "POST", - "/open-api/stop-order/create", - callback=self.on_send_stop_order, - data=data, - extra=order, - on_failed=self.on_send_order_failed, - on_error=self.on_send_order_error, - ) + data["order_type"] = ORDER_TYPE_VT2BYBIT[req.type] + data["price"] = req.price + self.add_request( + "POST", + "/open-api/order/create", + callback=self.on_send_order, + data=data, + extra=order, + on_failed=self.on_send_order_failed, + on_error=self.on_send_order_error, + ) self.gateway.on_order(order) return order.vt_orderid def cancel_order(self, req: CancelRequest): """""" - if not self.check_rate_limit(): - return - sys_id = self.gateway.orderid2sys(req.orderid) - order = self.gateway.orders[req.orderid] - - if order.type != OrderType.STOP: - path = "/open-api/order/cancel" - key = "order_id" - callback = self.on_cancel_order - else: - path = "/open-api/stop-order/cancel" - key = "stop_order_id" - callback = self.on_cancel_stop_order + sys_orderid = self.order_manager.get_sys_orderid(req.orderid) + data = { + "order_id": sys_orderid, + "symbol": req.symbol, + } self.add_request( "POST", - path, - callback=callback, - data={ - key: sys_id, - "symbol": req.symbol, - }, - on_error=self.on_cancel_order_error, - extra=order, + path="/open-api/order/cancel", + data=data, + callback=self.on_cancel_order ) - def query_history(self, - symbol: str, - interval: Interval, - start: int, # unix timestamp - limit: int = None, - ) -> Tuple[List[BarData], "HistoryDataNextInfo"]: - """ - Get history data synchronously. - """ - if limit is None: - limit = self.gateway.HISTORY_RECORD_PER_REQUEST - - bars = [] - - # datetime for a bar is close_time - # we got open_time from API. - adjustment = INTERVAL_VT2BYBIT_INT[interval] - - params = { - "interval": INTERVAL_VT2BYBIT[interval], - "symbol": symbol, - "limit": limit, - "from": start, - } - - # todo: RestClient: return RestClient.Request object instead of requests.Response. - resp = self.request( - "GET", - "/v2/public/kline/list", - params=params - ) - - # Break if request failed with other status code - raw_data = resp.json() - if not self.is_request_success(raw_data, None): - msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" - self.gateway.write_log(msg) - raise RequestFailedException(msg) - result = raw_data['result'] - for data in result: - open_time = int(data["open_time"]) - close_time = open_time + adjustment - close_dt = datetime.fromtimestamp(close_time) - bar = BarData( - symbol=symbol, - exchange=Exchange.BYBIT, - datetime=close_dt, - interval=interval, - volume=data["volume"], - open_price=data["open"], - high_price=data["high"], - low_price=data["low"], - close_price=data["close"], - gateway_name=self.gateway_name - ) - bars.append(bar) - - end = result[-1]["open_time"] - return bars, HistoryDataNextInfo(symbol, interval, end) + def query_history( + self, + symbol: str, + interval: Interval, + start: int, # unix timestamp + limit: int = None, + ) -> List[BarData]: + """""" + pass def on_send_order_failed(self, status_code: int, request: Request): """ Callback when sending order failed on server. """ - data = request.response.json() - self.update_rate_limit(data) - order = request.extra order.status = Status.REJECTED self.gateway.on_order(order) - msg = f"委托失败,错误代码:{data['ret_code']}, 错误信息:{data['ret_msg']}" - + data = request.response.json() + error_msg = data["ret_msg"] + error_code = data["ret_code"] + msg = f"委托失败,错误代码:{error_code}, 错误信息:{error_msg}" self.gateway.write_log(msg) def on_send_order_error( @@ -690,23 +338,13 @@ class BybitRestApi(RestClient): if not issubclass(exception_type, ConnectionError): self.on_error(exception_type, exception_value, tb, request) - def on_send_order(self, raw_data: dict, request: Request): + def on_send_order(self, data: dict, request: Request): """""" - - data = raw_data['result'] - order = self.gateway.parse_order_data(data) - self.gateway.on_order(order) - - self.update_rate_limit(raw_data) - - def on_send_stop_order(self, raw_data: dict, request: Request): - """""" - data = raw_data['result'] - order = self.gateway.parse_stop_order_data(data) - order.time = parse_datetime(data['updated_at']) - self.gateway.on_order(order) - - self.update_rate_limit(raw_data) + result = data["result"] + self.order_manager.update_orderid_map( + result["order_link_id"], + result["order_id"] + ) def on_cancel_order_error( self, exception_type: type, exception_value: Exception, tb, request: Request @@ -718,38 +356,21 @@ class BybitRestApi(RestClient): if not issubclass(exception_type, ConnectionError): self.on_error(exception_type, exception_value, tb, request) - def on_cancel_order(self, raw_data: dict, request: Request): + def on_cancel_order(self, data: dict, request: Request): """""" - data: dict = raw_data['result'] - - order = self.gateway.parse_order_data(data) - self.gateway.on_order(order) - self.update_rate_limit(data) - - def on_cancel_stop_order(self, raw_data: dict, request: Request): - """""" - data: dict = raw_data['result'] - order = self.gateway.parse_stop_order_data(data) - order.time = parse_datetime(data['updated_at']) - self.gateway.on_order(order) - self.update_rate_limit(data) + pass def on_failed(self, status_code: int, request: Request): """ Callback to handle request failed. """ data = request.response.json() - self._handle_error_response(data, request) - def _handle_error_response(self, data, request, operation_name: str = None): - if operation_name is None: - operation_name = request.path - - self.update_rate_limit(data) error_msg = data["ret_msg"] - error_code = data['ret_code'] - msg = f"请求{operation_name}失败,状态码:{request.status},错误代码:{error_code}, 信息:{error_msg}" - msg += f'\n{request}' + error_code = data["ret_code"] + + msg = f"请求失败,状态码:{request.status},错误代码:{error_code}, 信息:{error_msg}" + self.gateway.write_log(msg) def on_error( @@ -765,130 +386,150 @@ class BybitRestApi(RestClient): self.exception_detail(exception_type, exception_value, tb, request) ) - def update_rate_limit(self, data: dict): - """ - Update current request limit remaining status. - :param data: - """ - remaining = data.get('rate_limit_status', None) - if remaining is not None: - self.rate_limit_remaining = remaining + def query_contract(self): + """""" + self.add_request( + "GET", + "/v2/public/symbols", + self.on_query_contract + ) - def increase_rate_limit(self): - """ - Reset request limit remaining every 1 second. - """ - self.rate_limit_remaining += 1 - self.rate_limit_remaining = min( - self.rate_limit_remaining, self.rate_limit_limit) - - # Countdown of retry sleep seconds - if self.rate_limit_sleep: - self.rate_limit_sleep -= 1 - - def check_rate_limit(self): - """ - Check if rate limit is reached before sending out requests. - """ - # Already received 429 from server - if self.rate_limit_sleep: - msg = f"请求过于频繁,已被Bybit限制,请等待{self.rate_limit_sleep}秒后再试" + def check_error(self, name: str, data: dict): + """""" + if data["ret_code"]: + error_code = data["ret_code"] + error_msg = data["ret_msg"] + msg = f"{name}失败,错误代码:{error_code},信息:{error_msg}" self.gateway.write_log(msg) - return False - # Just local request limit is reached - elif not self.rate_limit_remaining: - msg = "请求频率太高,有触发Bybit流控的风险,请稍候再试" - self.gateway.write_log(msg) - return False - else: - self.rate_limit_remaining -= 1 return True - def is_request_success(self, data: dict, request: "Request"): - return data['ret_code'] == 0 + return False - def query_contracts(self): - self.add_request("GET", - "/v2/public/tickers", - self.on_query_contracts) + def on_query_contract(self, data: dict, request: Request): + """""" + if self.check_error("查询合约", data): + return - def on_query_contracts(self, data: dict, request: "Request"): - for result in data['result']: - symbol = result['symbol'] + for d in data["result"]: contract = ContractData( gateway_name=self.gateway_name, - symbol=symbol, + symbol=d["name"], exchange=Exchange.BYBIT, - name=symbol, + name=d["name"], product=Product.FUTURES, size=1, - # todo: pricetick: Currently(2019-9-2) unable to query. - pricetick=PRICE_TICKS.get(symbol, 0.0001), + pricetick=d["price_filter"]["tick_size"], + min_volume=d["lot_size_filter"]["qty_step"] ) self.gateway.on_contract(contract) - def query_position(self): - self.add_request("GET", - "/position/list", - self.on_query_position - ) + self.gateway.write_log("合约信息查询成功") - def on_query_position(self, raw_data: dict, request: "Request"): - for data in raw_data['result']: - p1, p2 = self.gateway.parse_position_data(data) - self.gateway.on_position(p1) - if p2: - self.gateway.on_position(p2) + def query_position(self): + """""" + self.add_request( + "GET", + "/position/list", + self.on_query_position + ) + + def on_query_position(self, data: dict, request: Request): + """""" + if self.check_error("查询持仓", data): + return + + for d in data["result"]: + if d["side"] == "Buy": + volume = d["size"] + else: + volume = -d["size"] + + position = PositionData( + symbol=d["symbol"], + exchange=Exchange.BYBIT, + direction=Direction.NET, + volume=volume, + price=d["entry_price"], + pnl=d["unrealised_pnl"], + gateway_name=self.gateway_name + ) + self.gateway.on_position(position) account = AccountData( + accountid=d["symbol"].replace("USD", ""), + balance=d["wallet_balance"], + frozen=d["order_margin"], gateway_name=self.gateway_name, - accountid=p1.symbol, - balance=data['wallet_balance'], - frozen=data['order_margin'] ) self.gateway.on_account(account) - def query_orders(self): - """ - query all orders, including stop orders - """ - self.add_request("GET", - "/open-api/order/list", - callback=self.on_query_orders, - ) - self.query_stop_orders() + def query_order(self, page: int = 1): + """""" + params = { + "limit": 50, + "page": page, + "order_status": "Created,New,PartiallyFilled" + } - def query_stop_orders(self): - self.add_request("GET", - "/open-api/stop-order/list", - callback=self.on_query_stop_orders, - ) + self.add_request( + "GET", + "/open-api/order/list", + callback=self.on_query_order, + params=params + ) - def on_query_orders(self, raw_data: dict, request: "Request"): - result = raw_data['result'] - for data in result['data']: - if data['order_status'] != 'NotActive': # UnTriggered StopOrder - order = self.gateway.parse_order_data(data) - self.gateway.on_order(order) + def on_query_order(self, data: dict, request: Request): + """""" + if self.check_error("查询委托", data): + return - def on_query_stop_orders(self, raw_data: dict, request: "Request"): - result = raw_data['result'] - for data in result['data']: - order = self.gateway.parse_stop_order_data(data) + result = data["result"] + if not result: + self.gateway.write_log("委托信息查询成功") + return + + for d in result["data"]: + sys_orderid = d["order_id"] + + # Use sys_orderid as local_orderid when + # order placed from other source + local_orderid = d["order_link_id"] + if not local_orderid: + local_orderid = sys_orderid + + self.order_manager.update_orderid_map( + local_orderid, + sys_orderid + ) + + order = OrderData( + symbol=d["symbol"], + exchange=Exchange.BYBIT, + orderid=local_orderid, + type=ORDER_TYPE_BYBIT2VT[d["order_type"]], + direction=DIRECTION_BYBIT2VT[d["side"]], + price=d["price"], + volume=d["qty"], + traded=d["cum_exec_qty"], + status=STATUS_BYBIT2VT[d["order_status"]], + time=d["created_at"], + gateway_name=self.gateway_name + ) self.gateway.on_order(order) - - - + if result["current_page"] != result["last_page"]: + self.query_order(result["current_page"] + 1) + else: + self.gateway.write_log("委托信息查询成功") class RawOrderInfo: - def __init__(self, raw_data: dict): - self.id = raw_data['id'] - self.price = raw_data['price'] - self.side = raw_data['side'] # 'Buy', 'Sell' - self.size = raw_data.get('size', 0) + def __init__(self, data: dict): + self.id = data["id"] + self.price = data["price"] + self.side = data["side"] # "Buy", "Sell" + self.size = data.get("size", 0) def __lt__(self, rhs: "RawOrderInfo"): return self.price < rhs.price @@ -897,37 +538,29 @@ class RawOrderInfo: return self.price == rhs.price def __hash__(self): - return self.id # price is a string and we don't known its precision + return self.id # price is a string and we don"t known its precision class BybitWebsocketApi(WebsocketClient): """""" - def __init__(self, gateway: "BybitGateway"): + def __init__(self, gateway: BybitGateway): """""" - super(BybitWebsocketApi, self).__init__() + super().__init__() - self.server: str = "" # REAL or TESTNET self.gateway = gateway self.gateway_name = gateway.gateway_name + self.order_manager = gateway.order_manager self.key = "" self.secret = b"" + self.server: str = "" # REAL or TESTNET - self._instrument_info_data: Dict[str, dict] = defaultdict(dict) - self._orderbook_data: Dict[str, SortedSet[RawOrderInfo]] = defaultdict(SortedSet) + self.callbacks: Dict[str, Callable] = {} + self.ticks: Dict[str, TickData] = {} - self.accounts = {} - - self._topic_callbacks = {} - - @property - def ticks(self): - return self.gateway.ticks - - @property - def orders(self): - return self.gateway.orders + self.symbol_bids: Dict[str, dict] = {} + self.symbol_asks: Dict[str, dict] = {} def connect( self, key: str, secret: str, server: str, proxy_host: str, proxy_port: int @@ -939,50 +572,72 @@ class BybitWebsocketApi(WebsocketClient): self.proxy_port = proxy_port self.server = server - self.reset_authentication() + if self.server == "REAL": + url = WEBSOCKET_HOST + else: + url = TESTNET_WEBSOCKET_HOST + + self.init(url, self.proxy_host, self.proxy_port) self.start() - def reset_authentication(self): + def login(self): + """""" expires = generate_timestamp(30) - d2s = f"GET/realtime{int(expires)}" - signature = sign(self.secret, d2s.encode()) - params = f"api_key={self.key}&expires={expires}&signature={signature}" - if self.server == "REAL": - host = HOST - else: - host = TEST_HOST - url = f'{host}?{params}' - self.init(url, self.proxy_host, self.proxy_port) + msg = f"GET/realtime{int(expires)}" + signature = sign(self.secret, msg.encode()) + + req = { + "op": "auth", + "args": [self.key, expires, signature] + } + self.send_packet(req) def subscribe(self, req: SubscribeRequest): """ Subscribe to tick data upate. """ - symbol = req.symbol - self._subscribe_topic(f"instrument_info.100ms.{symbol}", self.on_tick) - self._subscribe_topic(f"orderBookL2_25.{symbol}", self.on_depth) + tick = TickData( + symbol=req.symbol, + exchange=req.exchange, + datetime=datetime.now(), + name=req.symbol, + gateway_name=self.gateway_name + ) + self.ticks[req.symbol] = tick + + self.subscribe_topic(f"instrument_info.100ms.{req.symbol}", self.on_tick) + self.subscribe_topic(f"orderBookL2_25.{req.symbol}", self.on_depth) def on_connected(self): """""" - self.gateway.write_log("Websocket API连接且认证成功") - self._subscribe_initialize_topics() + self.gateway.write_log("Websocket API连接成功") + self.login() def on_disconnected(self): """""" self.gateway.write_log("Websocket API连接断开") - self.reset_authentication() def on_packet(self, packet: dict): """""" - success = packet.get('success', None) - topic = packet.get('topic', None) - if success is not None: - if success is False: - self.gateway.write_log("Websocket API报错:%s" % packet["ret_msg"]) - elif topic is not None: - self._topic_callbacks[topic](topic, packet) + print(packet) + if "topic" not in packet: + op = packet["request"]["op"] + if op == "auth": + self.on_login(packet) else: - self.gateway.write_log(f"unkonwn packet: {packet}") + channel = packet["topic"] + callback = self.callbacks[channel] + callback(packet) + + # success = packet.get("success", None) + # topic = packet.get("topic", None) + # if success is not None: + # if success is False: + # self.gateway.write_log("Websocket API报错:%s" % packet["ret_msg"]) + # elif topic is not None: + # self.callbacks[topic](topic, packet) + # else: + # self.gateway.write_log(f"unkonwn packet: {packet}") def on_error(self, exception_type: type, exception_value: Exception, tb): """""" @@ -992,26 +647,23 @@ class BybitWebsocketApi(WebsocketClient): sys.stderr.write(self.exception_detail( exception_type, exception_value, tb)) - def authenticate(self): - """ - Authenticate websockey connection to subscribe private topic. - """ - expires = int(time.time()) - method = "GET" - path = "/realtime" - msg = method + path + str(expires) - signature = hmac.new( - self.secret, msg.encode(), digestmod=hashlib.sha256 - ).hexdigest() + def on_login(self, packet: dict): + """""" + success = packet.get("success", False) + if success: + self.gateway.write_log("Websocket API登录成功") - req = {"op": "authKey", "args": [self.key, expires, signature]} - self.send_packet(req) + self.subscribe_topic("order", self.on_order) + self.subscribe_topic("execution", self.on_trade) + self.subscribe_topic("position", self.on_position) + else: + self.gateway.write_log("Websocket API登录失败") - def _subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]): + def subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]): """ Subscribe to all private topics. """ - self._topic_callbacks[topic] = callback + self.callbacks[topic] = callback req = { "op": "subscribe", @@ -1019,118 +671,101 @@ class BybitWebsocketApi(WebsocketClient): } self.send_packet(req) - def _subscribe_initialize_topics(self): - """ - Subscribe to all private topics. - """ - self._subscribe_topic("order", self.on_order) - self._subscribe_topic("execution", self.on_trade) - self._subscribe_topic("position", self.on_position) - - def _get_last_tick(self, symbol: str): - tick = self.ticks.get(symbol, None) - if tick is None: - # noinspection PyTypeChecker - tick = TickData( - symbol=symbol, - exchange=Exchange.BYBIT, - name=symbol, - datetime=None, # this will be filled before this new created tick is consumed. - gateway_name=self.gateway_name, - ) - self.ticks[symbol] = tick - return tick - - def on_tick(self, topic: str, raw_data: dict): + def on_tick(self, packet: dict): """""" - # parse incremental data sent from server - symbol = topic[22:] - self._update_tick_incremental_data(symbol, raw_data) + topic = packet["topic"] + type_ = packet["type"] + data = packet["data"] + timestamp = packet["timestamp_e6"] - # convert dict into TickData - last_data = self._instrument_info_data[symbol] - tick = self._get_last_tick(symbol) - tick.last_price = last_data["last_price_e4"] / 10000 - tick.datetime = _parse_timestamp_e6(raw_data['timestamp_e6']) + symbol = topic.replace("instrument_info.100ms.", "") + tick = self.ticks[symbol] + + if type_ == "snapshot": + tick.last_price = data["last_price_e4"] / 10000 + tick.volume = data["volume_24h"] + else: + update = data["update"][0] + + if "last_price_e4" in update: + tick.last_price = update["last_price_e4"] / 10000 + + if "volume_24h" in update: + tick.volume = update["volume_24h"] + + tick.datetime = datetime.fromtimestamp(timestamp / 1_000_000) self.gateway.on_tick(copy(tick)) - def _update_tick_incremental_data(self, symbol, raw_data): - type_ = raw_data['type'] - data = raw_data['data'] - last_data = self._instrument_info_data[symbol] - if type_ == 'snapshot': - last_data.clear() - last_data.update(data) - elif type_ == 'delta': - updates = data['update'] - for update in updates: - assert update['symbol'] == symbol - last_data.update(update) - else: - self.gateway.write_log(f"Unknown type {type_} in websocket_api.on_tick()") - - def on_depth(self, topic: str, raw_data: dict): + def on_depth(self, packet: dict): """""" - symbol = topic[15:] - self._update_depth_incremental_data(symbol, raw_data) + topic = packet["topic"] + type_ = packet["type"] + data = packet["data"] + timestamp = packet["timestamp_e6"] - last_data = self._orderbook_data[symbol] + # Update depth data into dict buf + symbol = topic.replace("orderBookL2_25.", "") + tick = self.ticks[symbol] + bids = self.symbol_bids.setdefault(symbol, {}) + asks = self.symbol_asks.setdefault(symbol, {}) - tick = self._get_last_tick(symbol) - for n, parsed in enumerate(last_data.islice(20, 25)): - tick.__setattr__(f"bid_price_{5 - n}", parsed.price) - tick.__setattr__(f"bid_volume_{5 - n}", parsed.size) + if type_ == "snapshot": + for d in data: + price = float(d["price"]) - for n, parsed in enumerate(last_data.islice(25, 30)): - tick.__setattr__(f"ask_price_{n + 1}", parsed.price) - tick.__setattr__(f"ask_volume_{n + 1}", parsed.size) + if d["side"] == "Buy": + bids[price] = d + else: + asks[price] = d + else: + for d in data["delete"]: + price = float(d["price"]) + if d["side"] == "Buy": + bids.pop(price) + else: + asks.pop(price) - tick.datetime = _parse_timestamp_e6(raw_data['timestamp_e6']) + for d in (data["update"] + data["insert"]): + price = float(d["price"]) + if d["side"] == "Buy": + bids[price] = d + else: + asks[price] = d + + # Calculate 1-5 bid/ask depth + bid_keys = list(bids.keys()) + bid_keys.sort(reverse=True) + + ask_keys = list(asks.keys()) + ask_keys.sort() + + for i in range(5): + n = i + 1 + + bid_price = bid_keys[i] + bid_data = bids[bid_price] + ask_price = ask_keys[i] + ask_data = asks[ask_price] + + setattr(tick, f"bid_price_{n}", bid_price) + setattr(tick, f"bid_volume_{n}", bid_data["size"]) + setattr(tick, f"ask_price_{n}", ask_price) + setattr(tick, f"ask_volume_{n}", ask_data["size"]) + + tick.datetime = datetime.fromtimestamp(timestamp / 1_000_000) self.gateway.on_tick(copy(tick)) - def _update_depth_incremental_data(self, symbol, raw_data): - type_ = raw_data['type'] - data = raw_data['data'] - last_data = self._orderbook_data[symbol] - if type_ == 'snapshot': - last_data.clear() - for item in data: - assert item['symbol'] == symbol - parsed = RawOrderInfo(item) - last_data.add(parsed) - elif type_ == 'delta': - deletes = data['delete'] - for delete in deletes: - assert delete['symbol'] == symbol - parsed = RawOrderInfo(delete) - last_data.remove(parsed) - - updates = data['update'] - for update in updates: - assert update['symbol'] == symbol - parsed = RawOrderInfo(update) - last_data.remove(parsed) - last_data.add(parsed) - - inserts = data['insert'] - for insert in inserts: - assert insert['symbol'] == symbol - parsed = RawOrderInfo(insert) - last_data.add(parsed) - else: - self.gateway.write_log(f"Unknown type {type_} in websocket_api.on_tick()") - - def on_trade(self, topic: str, raw_data: dict): + def on_trade(self, topic: str, data: dict): """""" - for data in raw_data['data']: - order_id = data['order_link_id'] + for data in data["data"]: + order_id = data["order_link_id"] if not order_id: - order_id = data['order_id'] + order_id = data["order_id"] trade = TradeData( symbol=data["symbol"], exchange=Exchange.BYBIT, orderid=order_id, - tradeid=data['exec_id'], + tradeid=data["exec_id"], direction=DIRECTION_BYBIT2VT[data["side"]], price=data["price"], volume=data["exec_qty"], @@ -1140,72 +775,27 @@ class BybitWebsocketApi(WebsocketClient): self.gateway.on_trade(trade) - def on_order(self, topic: str, raw_data: dict): + def on_order(self, topic: str, data: dict): """""" - for data in raw_data['data']: + for data in data["data"]: print(data) - order = self.gateway.parse_order_data(data, 'timestamp') + order = self.gateway.parse_order_data(data, "timestamp") self.gateway.on_order(copy(order)) - def on_position(self, topic: str, raw_data: dict): + def on_position(self, topic: str, data: dict): """""" - for data in raw_data['data']: + for data in data["data"]: p1, p2 = self.gateway.parse_position_data(data) self.gateway.on_position(p1) if p2: self.gateway.on_position(p2) - def on_account(self, d): - """""" - accountid = str(d["account"]) - account = self.accounts.get(accountid, None) - if not account: - account = AccountData(accountid=accountid, - gateway_name=self.gateway_name) - self.accounts[accountid] = account - - account.balance = d.get("marginBalance", account.balance) - account.available = d.get("availableMargin", account.available) - account.frozen = account.balance - account.available - - self.gateway.on_account(copy(account)) - - def on_contract(self, d): - """""" - if "tickSize" not in d: - return - - if not d["lotSize"]: - return - - contract = ContractData( - symbol=d["symbol"], - exchange=Exchange.BYBIT, - name=d["symbol"], - product=Product.FUTURES, - pricetick=d["tickSize"], - size=d["lotSize"], - stop_supported=True, - net_position=True, - history_data=True, - gateway_name=self.gateway_name, - ) - - self.gateway.on_contract(contract) - - def _ping(self): - super()._ping() - self.send_packet({'op': 'ping'}) - def _parse_timestamp_e6(timestamp: int): return datetime.fromtimestamp(timestamp / 1_000_000, utc_tz).astimezone(tz=local_tz) - - - def generate_timestamp(expire_after: float = 30) -> int: """ :param expire_after: expires in seconds.