From 3c24365ae66e99e26c103f8238157f10f41c1094 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Sun, 20 Oct 2019 11:27:05 +0800 Subject: [PATCH] [Mod] move all bybit codes into bybit_gateway.py --- vnpy/gateway/bybit/bybit_gateway.py | 932 +++++++++++++++++++++++++++- vnpy/gateway/bybit/common.py | 82 --- vnpy/gateway/bybit/rest_api.py | 485 --------------- vnpy/gateway/bybit/websocket_api.py | 342 ---------- 4 files changed, 931 insertions(+), 910 deletions(-) delete mode 100644 vnpy/gateway/bybit/common.py delete mode 100644 vnpy/gateway/bybit/rest_api.py delete mode 100644 vnpy/gateway/bybit/websocket_api.py diff --git a/vnpy/gateway/bybit/bybit_gateway.py b/vnpy/gateway/bybit/bybit_gateway.py index 4559b1df..e97ad129 100644 --- a/vnpy/gateway/bybit/bybit_gateway.py +++ b/vnpy/gateway/bybit/bybit_gateway.py @@ -1,12 +1,51 @@ """""" +import hashlib +import hmac +import time +from datetime import datetime, timedelta, timezone + from dataclasses import dataclass from datetime import datetime from typing import Any, Dict, List, Tuple +import sys +from dataclasses import dataclass +from datetime import datetime +from threading import Lock +from typing import List, TYPE_CHECKING, Tuple +from urllib.parse import urlencode + +from requests import ConnectionError + +import hashlib +import hmac +import sys +import time +from collections import defaultdict +from copy import copy +from datetime import datetime +from typing import Any, Callable, Dict, TYPE_CHECKING + +from sortedcontainers import SortedSet + +from vnpy.api.websocket import WebsocketClient +from vnpy.trader.constant import (Exchange, Product) +from vnpy.trader.object import (AccountData, ContractData, SubscribeRequest, TickData, TradeData) +from .common import (DIRECTION_BYBIT2VT, generate_timestamp, local_tz, parse_datetime, sign, utc_tz) + + + +from vnpy.api.rest import Request, RestClient +from vnpy.trader.constant import Exchange, Interval, OrderType, Product, Status +from vnpy.trader.object import AccountData, BarData, CancelRequest, ContractData, OrderRequest +from .common import (DIRECTION_VT2BYBIT, INTERVAL_VT2BYBIT, ORDER_TYPE_VT2BYBIT, generate_timestamp, + parse_datetime, sign, INTERVAL_VT2BYBIT_INT) + from vnpy.event import Event from vnpy.gateway.bybit.rest_api import BybitRestApi, HistoryDataNextInfo from vnpy.gateway.bybit.websocket_api import BybitWebsocketApi from vnpy.trader.constant import (Exchange, Interval, OrderType) +from vnpy.trader.constant import Direction, Interval, OrderType, Status from vnpy.trader.event import EVENT_TIMER from vnpy.trader.gateway import BaseGateway from vnpy.trader.object import (BarData, CancelRequest, HistoryRequest, OrderData, OrderRequest, @@ -16,12 +55,110 @@ from .common import (DIRECTION_BYBIT2VT, INTERVAL_VT2BYBIT_INT, OPPOSITE_DIRECTI parse_datetime, utc_tz) +STATUS_BYBIT2VT = { + "Created": Status.NOTTRADED, + "New": Status.NOTTRADED, + "PartiallyFilled": Status.PARTTRADED, + "Filled": Status.ALLTRADED, + "Cancelled": Status.CANCELLED, + "Rejected": Status.REJECTED, +} +STOP_ORDER_STATUS_BYBIT2VT = { + "Untriggered": Status.NOTTRADED, + "Triggered": Status.NOTTRADED, + # Active: triggered and placed. + # since price is market price, placed == AllTraded? + "Active": Status.ALLTRADED, + "Cancelled": Status.CANCELLED, + "Rejected": Status.REJECTED, +} +DIRECTION_VT2BYBIT = {Direction.LONG: "Buy", Direction.SHORT: "Sell"} +DIRECTION_BYBIT2VT = {v: k for k, v in DIRECTION_VT2BYBIT.items()} +DIRECTION_BYBIT2VT.update({ + "None": Direction.LONG +}) + +OPPOSITE_DIRECTION = { + Direction.LONG: Direction.SHORT, + Direction.SHORT: Direction.LONG, +} + +ORDER_TYPE_VT2BYBIT = { + OrderType.LIMIT: "Limit", + OrderType.MARKET: "Market", +} + +ORDER_TYPE_BYBIT2VT = {v: k for k, v in ORDER_TYPE_VT2BYBIT.items()} + +INTERVAL_VT2BYBIT = { + Interval.MINUTE: "1", + Interval.HOUR: "60", + Interval.DAILY: "D", + Interval.WEEKLY: "W", +} +INTERVAL_VT2BYBIT_INT = { + Interval.MINUTE: 1, + Interval.HOUR: 60, + Interval.DAILY: 60 * 24, + Interval.WEEKLY: 60 * 24 * 7, +} +TIMEDELTA_MAP = { + Interval.MINUTE: timedelta(minutes=1), + Interval.HOUR: timedelta(hours=1), + Interval.DAILY: timedelta(days=1), + Interval.WEEKLY: timedelta(days=7), +} + +if TYPE_CHECKING: + from vnpy.gateway.bybit import BybitGateway + +if TYPE_CHECKING: + from vnpy.gateway.bybit import BybitGateway + +HOST = "wss://stream.bybit.com/realtime" +TEST_HOST = "wss://stream-testnet.bybit.com/realtime" + + +def _(x): return x # noqa + + +HOST = "https://api.bybit.com" +TEST_HOST = "https://api-testnet.bybit.com" + +# asked from official developer +PRICE_TICKS = { + "BTCUSD": 0.5, + "ETHUSD": 0.05, + "EOSUSD": 0.001, + "XRPUSD": 0.0001, +} + +utc_tz = timezone.utc +local_tz = datetime.now(timezone.utc).astimezone().tzinfo + + @dataclass() class HistoryDataInfo: bars: List[BarData] extra: Any + + +@dataclass() +class HistoryDataNextInfo: + symbol: str + interval: Interval + end: int + + +class RequestFailedException(Exception): + pass + + + + + class BybitGateway(BaseGateway): """ VN Trader Gateway for BitMEX connection. @@ -244,7 +381,8 @@ class BybitGateway(BaseGateway): direction=DIRECTION_BYBIT2VT[data["side"]], price=data["price"], volume=data["qty"], - time=parse_datetime(data['updated_at']), # this should be filled manually + # this should be filled manually + time=parse_datetime(data['updated_at']), gateway_name=self.gateway_name, ) self.orders[order.orderid] = order @@ -293,3 +431,795 @@ class BybitGateway(BaseGateway): """ if order is not None: super().on_order(order) + + +class BybitRestApi(RestClient): + """ + BitMEX REST API + """ + + def __init__(self, gateway: "BybitGateway"): + """""" + super(BybitRestApi, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.key = "" + self.secret = b"" + + self.order_count = 1_000_000 + self.order_count_lock = Lock() + + self.connect_time = 0 + + # Use 60 by default, and will update after first request + self.rate_limit_limit = 60 + self.rate_limit_remaining = 60 + self.rate_limit_sleep = 0 + + @property + def ticks(self): + return self.gateway.ticks + + def sign(self, request): + """ + Generate BitMEX signature. + """ + + if request.method == "GET": + api_params = request.params # dict 2 sign + if api_params is None: + api_params = request.params = {} + else: # POST + api_params = request.data + if api_params is None: + api_params = request.data = {} + + expires = generate_timestamp(0) + + api_params['api_key'] = self.key + api_params['recv_window'] = 30 * 1000 + api_params['timestamp'] = expires + + data2sign = urlencode({k: api_params[k] for k in sorted(api_params)}) + signature = sign(self.secret, data2sign.encode()) + api_params['sign'] = signature + + return request + + def connect( + self, + key: str, + secret: str, + session_number: int, + server: str, + proxy_host: str, + proxy_port: int, + ): + """ + Initialize connection to REST server. + """ + self.key = key + self.secret = secret.encode() + + self.connect_time = ( + int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count + ) + + if server == "REAL": + self.init(HOST, proxy_host, proxy_port) + else: + self.init(TEST_HOST, proxy_host, proxy_port) + + self.start(session_number) + + self.gateway.write_log(_("REST API启动成功")) + + def _new_order_id(self): + """""" + with self.order_count_lock: + self.order_count += 1 + return self.order_count + + def send_order(self, req: OrderRequest): + """""" + if not self.check_rate_limit(): + return "" + + order_id = str(self.connect_time + self._new_order_id()) + + symbol = req.symbol + data = { + "symbol": symbol, + "side": DIRECTION_VT2BYBIT[req.direction], + "qty": int(req.volume), + "order_link_id": order_id, + "time_in_force": "GoodTillCancel" + } + + order = req.create_order_data(order_id, self.gateway_name) + order.time = datetime.now().isoformat()[11:19] + + # Only add price for limit order. + if req.type != OrderType.STOP: + data["order_type"] = ORDER_TYPE_VT2BYBIT[req.type] + data["price"] = req.price + self.add_request( + "POST", + "/open-api/order/create", + callback=self.on_send_order, + data=data, + extra=order, + on_failed=self.on_send_order_failed, + on_error=self.on_send_order_error, + ) + else: + assert self.ticks[symbol], _("Subscribe This Symbol before sending StopOrder.") + last_price = self.ticks[symbol].last_price + data["order_type"] = 'Market' + data["stop_px"] = req.price + data["base_price"] = last_price + self.add_request( + "POST", + "/open-api/stop-order/create", + callback=self.on_send_stop_order, + data=data, + extra=order, + on_failed=self.on_send_order_failed, + on_error=self.on_send_order_error, + ) + + self.gateway.on_order(order) + return order.vt_orderid + + def cancel_order(self, req: CancelRequest): + """""" + if not self.check_rate_limit(): + return + sys_id = self.gateway.orderid2sys(req.orderid) + order = self.gateway.orders[req.orderid] + + if order.type != OrderType.STOP: + path = "/open-api/order/cancel" + key = "order_id" + callback = self.on_cancel_order + else: + path = "/open-api/stop-order/cancel" + key = "stop_order_id" + callback = self.on_cancel_stop_order + + self.add_request( + "POST", + path, + callback=callback, + data={ + key: sys_id, + "symbol": req.symbol, + }, + on_error=self.on_cancel_order_error, + extra=order, + ) + + def query_history(self, + symbol: str, + interval: Interval, + start: int, # unix timestamp + limit: int = None, + ) -> Tuple[List[BarData], "HistoryDataNextInfo"]: + """ + Get history data synchronously. + """ + if limit is None: + limit = self.gateway.HISTORY_RECORD_PER_REQUEST + + bars = [] + + # datetime for a bar is close_time + # we got open_time from API. + adjustment = INTERVAL_VT2BYBIT_INT[interval] + + params = { + "interval": INTERVAL_VT2BYBIT[interval], + "symbol": symbol, + "limit": limit, + "from": start, + } + + # todo: RestClient: return RestClient.Request object instead of requests.Response. + resp = self.request( + "GET", + "/v2/public/kline/list", + params=params + ) + + # Break if request failed with other status code + raw_data = resp.json() + if not self.is_request_success(raw_data, None): + msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" + self.gateway.write_log(msg) + raise RequestFailedException(msg) + result = raw_data['result'] + for data in result: + open_time = int(data["open_time"]) + close_time = open_time + adjustment + close_dt = datetime.fromtimestamp(close_time) + bar = BarData( + symbol=symbol, + exchange=Exchange.BYBIT, + datetime=close_dt, + interval=interval, + volume=data["volume"], + open_price=data["open"], + high_price=data["high"], + low_price=data["low"], + close_price=data["close"], + gateway_name=self.gateway_name + ) + bars.append(bar) + + end = result[-1]["open_time"] + return bars, HistoryDataNextInfo(symbol, interval, end) + + def on_send_order_failed(self, status_code: int, request: Request): + """ + Callback when sending order failed on server. + """ + data = request.response.json() + self.update_rate_limit(data) + + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + msg = f"委托失败,错误代码:{data['ret_code']}, 错误信息:{data['ret_msg']}" + + self.gateway.write_log(msg) + + def on_send_order_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback when sending order caused exception. + """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_send_order(self, raw_data: dict, request: Request): + """""" + + data = raw_data['result'] + order = self.gateway.parse_order_data(data) + self.gateway.on_order(order) + + self.update_rate_limit(raw_data) + + def on_send_stop_order(self, raw_data: dict, request: Request): + """""" + data = raw_data['result'] + order = self.gateway.parse_stop_order_data(data) + order.time = parse_datetime(data['updated_at']) + self.gateway.on_order(order) + + self.update_rate_limit(raw_data) + + def on_cancel_order_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback when cancelling order failed on server. + """ + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_cancel_order(self, raw_data: dict, request: Request): + """""" + data: dict = raw_data['result'] + + order = self.gateway.parse_order_data(data) + self.gateway.on_order(order) + self.update_rate_limit(data) + + def on_cancel_stop_order(self, raw_data: dict, request: Request): + """""" + data: dict = raw_data['result'] + order = self.gateway.parse_stop_order_data(data) + order.time = parse_datetime(data['updated_at']) + self.gateway.on_order(order) + self.update_rate_limit(data) + + def on_failed(self, status_code: int, request: Request): + """ + Callback to handle request failed. + """ + data = request.response.json() + self._handle_error_response(data, request) + + def _handle_error_response(self, data, request, operation_name: str = None): + if operation_name is None: + operation_name = request.path + + self.update_rate_limit(data) + error_msg = data["ret_msg"] + error_code = data['ret_code'] + msg = f"请求{operation_name}失败,状态码:{request.status},错误代码:{error_code}, 信息:{error_msg}" + msg += f'\n{request}' + self.gateway.write_log(msg) + + def on_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback to handler request exception. + """ + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" + self.gateway.write_log(msg) + + sys.stderr.write( + self.exception_detail(exception_type, exception_value, tb, request) + ) + + def update_rate_limit(self, data: dict): + """ + Update current request limit remaining status. + :param data: + """ + remaining = data.get('rate_limit_status', None) + if remaining is not None: + self.rate_limit_remaining = remaining + + def increase_rate_limit(self): + """ + Reset request limit remaining every 1 second. + """ + self.rate_limit_remaining += 1 + self.rate_limit_remaining = min( + self.rate_limit_remaining, self.rate_limit_limit) + + # Countdown of retry sleep seconds + if self.rate_limit_sleep: + self.rate_limit_sleep -= 1 + + def check_rate_limit(self): + """ + Check if rate limit is reached before sending out requests. + """ + # Already received 429 from server + if self.rate_limit_sleep: + msg = f"请求过于频繁,已被Bybit限制,请等待{self.rate_limit_sleep}秒后再试" + self.gateway.write_log(msg) + return False + # Just local request limit is reached + elif not self.rate_limit_remaining: + msg = "请求频率太高,有触发Bybit流控的风险,请稍候再试" + self.gateway.write_log(msg) + return False + else: + self.rate_limit_remaining -= 1 + return True + + def is_request_success(self, data: dict, request: "Request"): + return data['ret_code'] == 0 + + def query_contracts(self): + self.add_request("GET", + "/v2/public/tickers", + self.on_query_contracts) + + def on_query_contracts(self, data: dict, request: "Request"): + for result in data['result']: + symbol = result['symbol'] + contract = ContractData( + gateway_name=self.gateway_name, + symbol=symbol, + exchange=Exchange.BYBIT, + name=symbol, + product=Product.FUTURES, + size=1, + # todo: pricetick: Currently(2019-9-2) unable to query. + pricetick=PRICE_TICKS.get(symbol, 0.0001), + ) + self.gateway.on_contract(contract) + + def query_position(self): + self.add_request("GET", + "/position/list", + self.on_query_position + ) + + def on_query_position(self, raw_data: dict, request: "Request"): + for data in raw_data['result']: + p1, p2 = self.gateway.parse_position_data(data) + self.gateway.on_position(p1) + if p2: + self.gateway.on_position(p2) + + account = AccountData( + gateway_name=self.gateway_name, + accountid=p1.symbol, + balance=data['wallet_balance'], + frozen=data['order_margin'] + ) + self.gateway.on_account(account) + + def query_orders(self): + """ + query all orders, including stop orders + """ + self.add_request("GET", + "/open-api/order/list", + callback=self.on_query_orders, + ) + self.query_stop_orders() + + def query_stop_orders(self): + self.add_request("GET", + "/open-api/stop-order/list", + callback=self.on_query_stop_orders, + ) + + def on_query_orders(self, raw_data: dict, request: "Request"): + result = raw_data['result'] + for data in result['data']: + if data['order_status'] != 'NotActive': # UnTriggered StopOrder + order = self.gateway.parse_order_data(data) + self.gateway.on_order(order) + + def on_query_stop_orders(self, raw_data: dict, request: "Request"): + result = raw_data['result'] + for data in result['data']: + order = self.gateway.parse_stop_order_data(data) + self.gateway.on_order(order) + + + + + + +class RawOrderInfo: + + def __init__(self, raw_data: dict): + self.id = raw_data['id'] + self.price = raw_data['price'] + self.side = raw_data['side'] # 'Buy', 'Sell' + self.size = raw_data.get('size', 0) + + def __lt__(self, rhs: "RawOrderInfo"): + return self.price < rhs.price + + def __eq__(self, rhs: "RawOrderInfo"): + return self.price == rhs.price + + def __hash__(self): + return self.id # price is a string and we don't known its precision + + +class BybitWebsocketApi(WebsocketClient): + """""" + + def __init__(self, gateway: "BybitGateway"): + """""" + super(BybitWebsocketApi, self).__init__() + + self.server: str = "" # REAL or TESTNET + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.key = "" + self.secret = b"" + + self._instrument_info_data: Dict[str, dict] = defaultdict(dict) + self._orderbook_data: Dict[str, SortedSet[RawOrderInfo]] = defaultdict(SortedSet) + + self.accounts = {} + + self._topic_callbacks = {} + + @property + def ticks(self): + return self.gateway.ticks + + @property + def orders(self): + return self.gateway.orders + + def connect( + self, key: str, secret: str, server: str, proxy_host: str, proxy_port: int + ): + """""" + self.key = key + self.secret = secret.encode() + self.proxy_host = proxy_host + self.proxy_port = proxy_port + self.server = server + + self.reset_authentication() + self.start() + + def reset_authentication(self): + expires = generate_timestamp(30) + d2s = f"GET/realtime{int(expires)}" + signature = sign(self.secret, d2s.encode()) + params = f"api_key={self.key}&expires={expires}&signature={signature}" + if self.server == "REAL": + host = HOST + else: + host = TEST_HOST + url = f'{host}?{params}' + self.init(url, self.proxy_host, self.proxy_port) + + def subscribe(self, req: SubscribeRequest): + """ + Subscribe to tick data upate. + """ + symbol = req.symbol + self._subscribe_topic(f"instrument_info.100ms.{symbol}", self.on_tick) + self._subscribe_topic(f"orderBookL2_25.{symbol}", self.on_depth) + + def on_connected(self): + """""" + self.gateway.write_log("Websocket API连接且认证成功") + self._subscribe_initialize_topics() + + def on_disconnected(self): + """""" + self.gateway.write_log("Websocket API连接断开") + self.reset_authentication() + + def on_packet(self, packet: dict): + """""" + success = packet.get('success', None) + topic = packet.get('topic', None) + if success is not None: + if success is False: + self.gateway.write_log("Websocket API报错:%s" % packet["ret_msg"]) + elif topic is not None: + self._topic_callbacks[topic](topic, packet) + else: + self.gateway.write_log(f"unkonwn packet: {packet}") + + def on_error(self, exception_type: type, exception_value: Exception, tb): + """""" + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" + self.gateway.write_log(msg) + + sys.stderr.write(self.exception_detail( + exception_type, exception_value, tb)) + + def authenticate(self): + """ + Authenticate websockey connection to subscribe private topic. + """ + expires = int(time.time()) + method = "GET" + path = "/realtime" + msg = method + path + str(expires) + signature = hmac.new( + self.secret, msg.encode(), digestmod=hashlib.sha256 + ).hexdigest() + + req = {"op": "authKey", "args": [self.key, expires, signature]} + self.send_packet(req) + + def _subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]): + """ + Subscribe to all private topics. + """ + self._topic_callbacks[topic] = callback + + req = { + "op": "subscribe", + "args": [topic], + } + self.send_packet(req) + + def _subscribe_initialize_topics(self): + """ + Subscribe to all private topics. + """ + self._subscribe_topic("order", self.on_order) + self._subscribe_topic("execution", self.on_trade) + self._subscribe_topic("position", self.on_position) + + def _get_last_tick(self, symbol: str): + tick = self.ticks.get(symbol, None) + if tick is None: + # noinspection PyTypeChecker + tick = TickData( + symbol=symbol, + exchange=Exchange.BYBIT, + name=symbol, + datetime=None, # this will be filled before this new created tick is consumed. + gateway_name=self.gateway_name, + ) + self.ticks[symbol] = tick + return tick + + def on_tick(self, topic: str, raw_data: dict): + """""" + # parse incremental data sent from server + symbol = topic[22:] + self._update_tick_incremental_data(symbol, raw_data) + + # convert dict into TickData + last_data = self._instrument_info_data[symbol] + tick = self._get_last_tick(symbol) + tick.last_price = last_data["last_price_e4"] / 10000 + tick.datetime = _parse_timestamp_e6(raw_data['timestamp_e6']) + self.gateway.on_tick(copy(tick)) + + def _update_tick_incremental_data(self, symbol, raw_data): + type_ = raw_data['type'] + data = raw_data['data'] + last_data = self._instrument_info_data[symbol] + if type_ == 'snapshot': + last_data.clear() + last_data.update(data) + elif type_ == 'delta': + updates = data['update'] + for update in updates: + assert update['symbol'] == symbol + last_data.update(update) + else: + self.gateway.write_log(f"Unknown type {type_} in websocket_api.on_tick()") + + def on_depth(self, topic: str, raw_data: dict): + """""" + symbol = topic[15:] + self._update_depth_incremental_data(symbol, raw_data) + + last_data = self._orderbook_data[symbol] + + tick = self._get_last_tick(symbol) + for n, parsed in enumerate(last_data.islice(20, 25)): + tick.__setattr__(f"bid_price_{5 - n}", parsed.price) + tick.__setattr__(f"bid_volume_{5 - n}", parsed.size) + + for n, parsed in enumerate(last_data.islice(25, 30)): + tick.__setattr__(f"ask_price_{n + 1}", parsed.price) + tick.__setattr__(f"ask_volume_{n + 1}", parsed.size) + + tick.datetime = _parse_timestamp_e6(raw_data['timestamp_e6']) + self.gateway.on_tick(copy(tick)) + + def _update_depth_incremental_data(self, symbol, raw_data): + type_ = raw_data['type'] + data = raw_data['data'] + last_data = self._orderbook_data[symbol] + if type_ == 'snapshot': + last_data.clear() + for item in data: + assert item['symbol'] == symbol + parsed = RawOrderInfo(item) + last_data.add(parsed) + elif type_ == 'delta': + deletes = data['delete'] + for delete in deletes: + assert delete['symbol'] == symbol + parsed = RawOrderInfo(delete) + last_data.remove(parsed) + + updates = data['update'] + for update in updates: + assert update['symbol'] == symbol + parsed = RawOrderInfo(update) + last_data.remove(parsed) + last_data.add(parsed) + + inserts = data['insert'] + for insert in inserts: + assert insert['symbol'] == symbol + parsed = RawOrderInfo(insert) + last_data.add(parsed) + else: + self.gateway.write_log(f"Unknown type {type_} in websocket_api.on_tick()") + + def on_trade(self, topic: str, raw_data: dict): + """""" + for data in raw_data['data']: + order_id = data['order_link_id'] + if not order_id: + order_id = data['order_id'] + trade = TradeData( + symbol=data["symbol"], + exchange=Exchange.BYBIT, + orderid=order_id, + tradeid=data['exec_id'], + direction=DIRECTION_BYBIT2VT[data["side"]], + price=data["price"], + volume=data["exec_qty"], + time=parse_datetime(data["trade_time"]), + gateway_name=self.gateway_name, + ) + + self.gateway.on_trade(trade) + + def on_order(self, topic: str, raw_data: dict): + """""" + for data in raw_data['data']: + print(data) + order = self.gateway.parse_order_data(data, 'timestamp') + + self.gateway.on_order(copy(order)) + + def on_position(self, topic: str, raw_data: dict): + """""" + for data in raw_data['data']: + p1, p2 = self.gateway.parse_position_data(data) + self.gateway.on_position(p1) + if p2: + self.gateway.on_position(p2) + + def on_account(self, d): + """""" + accountid = str(d["account"]) + account = self.accounts.get(accountid, None) + if not account: + account = AccountData(accountid=accountid, + gateway_name=self.gateway_name) + self.accounts[accountid] = account + + account.balance = d.get("marginBalance", account.balance) + account.available = d.get("availableMargin", account.available) + account.frozen = account.balance - account.available + + self.gateway.on_account(copy(account)) + + def on_contract(self, d): + """""" + if "tickSize" not in d: + return + + if not d["lotSize"]: + return + + contract = ContractData( + symbol=d["symbol"], + exchange=Exchange.BYBIT, + name=d["symbol"], + product=Product.FUTURES, + pricetick=d["tickSize"], + size=d["lotSize"], + stop_supported=True, + net_position=True, + history_data=True, + gateway_name=self.gateway_name, + ) + + self.gateway.on_contract(contract) + + def _ping(self): + super()._ping() + self.send_packet({'op': 'ping'}) + + +def _parse_timestamp_e6(timestamp: int): + return datetime.fromtimestamp(timestamp / 1_000_000, utc_tz).astimezone(tz=local_tz) + + + + + +def generate_timestamp(expire_after: float = 30) -> int: + """ + :param expire_after: expires in seconds. + :return: timestamp in milliseconds + """ + return int(time.time() * 1000 + expire_after * 1000) + + +def sign(secret: bytes, data: bytes) -> str: + """""" + return hmac.new( + secret, data, digestmod=hashlib.sha256 + ).hexdigest() + + +def parse_datetime(dt: str) -> str: + return dt[11:19] diff --git a/vnpy/gateway/bybit/common.py b/vnpy/gateway/bybit/common.py deleted file mode 100644 index 7d9acc17..00000000 --- a/vnpy/gateway/bybit/common.py +++ /dev/null @@ -1,82 +0,0 @@ -import hashlib -import hmac -import time -from datetime import datetime, timedelta, timezone - -from vnpy.trader.constant import Direction, Interval, OrderType, Status - -STATUS_BYBIT2VT = { - "Created": Status.NOTTRADED, - "New": Status.NOTTRADED, - "PartiallyFilled": Status.PARTTRADED, - "Filled": Status.ALLTRADED, - "Cancelled": Status.CANCELLED, - "Rejected": Status.REJECTED, -} -STOP_ORDER_STATUS_BYBIT2VT = { - "Untriggered": Status.NOTTRADED, - "Triggered": Status.NOTTRADED, - # Active: triggered and placed. - # since price is market price, placed == AllTraded? - "Active": Status.ALLTRADED, - "Cancelled": Status.CANCELLED, - "Rejected": Status.REJECTED, -} -DIRECTION_VT2BYBIT = {Direction.LONG: "Buy", Direction.SHORT: "Sell"} -DIRECTION_BYBIT2VT = {v: k for k, v in DIRECTION_VT2BYBIT.items()} -DIRECTION_BYBIT2VT.update({ - "None": Direction.LONG -}) - -OPPOSITE_DIRECTION = { - Direction.LONG: Direction.SHORT, - Direction.SHORT: Direction.LONG, -} - -ORDER_TYPE_VT2BYBIT = { - OrderType.LIMIT: "Limit", - OrderType.MARKET: "Market", -} - -ORDER_TYPE_BYBIT2VT = {v: k for k, v in ORDER_TYPE_VT2BYBIT.items()} - -INTERVAL_VT2BYBIT = { - Interval.MINUTE: "1", - Interval.HOUR: "60", - Interval.DAILY: "D", - Interval.WEEKLY: "W", -} -INTERVAL_VT2BYBIT_INT = { - Interval.MINUTE: 1, - Interval.HOUR: 60, - Interval.DAILY: 60 * 24, - Interval.WEEKLY: 60 * 24 * 7, -} -TIMEDELTA_MAP = { - Interval.MINUTE: timedelta(minutes=1), - Interval.HOUR: timedelta(hours=1), - Interval.DAILY: timedelta(days=1), - Interval.WEEKLY: timedelta(days=7), -} - -utc_tz = timezone.utc -local_tz = datetime.now(timezone.utc).astimezone().tzinfo - - -def generate_timestamp(expire_after: float = 30) -> int: - """ - :param expire_after: expires in seconds. - :return: timestamp in milliseconds - """ - return int(time.time() * 1000 + expire_after * 1000) - - -def sign(secret: bytes, data: bytes) -> str: - """""" - return hmac.new( - secret, data, digestmod=hashlib.sha256 - ).hexdigest() - - -def parse_datetime(dt: str) -> str: - return dt[11:19] diff --git a/vnpy/gateway/bybit/rest_api.py b/vnpy/gateway/bybit/rest_api.py deleted file mode 100644 index abaa9f82..00000000 --- a/vnpy/gateway/bybit/rest_api.py +++ /dev/null @@ -1,485 +0,0 @@ -import sys -from dataclasses import dataclass -from datetime import datetime -from threading import Lock -from typing import List, TYPE_CHECKING, Tuple -from urllib.parse import urlencode - -from requests import ConnectionError - -from vnpy.api.rest import Request, RestClient -from vnpy.trader.constant import Exchange, Interval, OrderType, Product, Status -from vnpy.trader.object import AccountData, BarData, CancelRequest, ContractData, OrderRequest -from .common import (DIRECTION_VT2BYBIT, INTERVAL_VT2BYBIT, ORDER_TYPE_VT2BYBIT, generate_timestamp, - parse_datetime, sign, INTERVAL_VT2BYBIT_INT) - -if TYPE_CHECKING: - from vnpy.gateway.bybit import BybitGateway -_ = lambda x: x # noqa - -HOST = "https://api.bybit.com" -TEST_HOST = "https://api-testnet.bybit.com" - -# asked from official developer -PRICE_TICKS = { - "BTCUSD": 0.5, - "ETHUSD": 0.05, - "EOSUSD": 0.001, - "XRPUSD": 0.0001, -} - - -@dataclass() -class HistoryDataNextInfo: - symbol: str - interval: Interval - end: int - - -class RequestFailedException(Exception): - pass - - -class BybitRestApi(RestClient): - """ - BitMEX REST API - """ - - def __init__(self, gateway: "BybitGateway"): - """""" - super(BybitRestApi, self).__init__() - - self.gateway = gateway - self.gateway_name = gateway.gateway_name - - self.key = "" - self.secret = b"" - - self.order_count = 1_000_000 - self.order_count_lock = Lock() - - self.connect_time = 0 - - # Use 60 by default, and will update after first request - self.rate_limit_limit = 60 - self.rate_limit_remaining = 60 - self.rate_limit_sleep = 0 - - @property - def ticks(self): - return self.gateway.ticks - - def sign(self, request): - """ - Generate BitMEX signature. - """ - - if request.method == "GET": - api_params = request.params # dict 2 sign - if api_params is None: - api_params = request.params = {} - else: # POST - api_params = request.data - if api_params is None: - api_params = request.data = {} - - expires = generate_timestamp(0) - - api_params['api_key'] = self.key - api_params['recv_window'] = 30 * 1000 - api_params['timestamp'] = expires - - data2sign = urlencode({k: api_params[k] for k in sorted(api_params)}) - signature = sign(self.secret, data2sign.encode()) - api_params['sign'] = signature - - return request - - def connect( - self, - key: str, - secret: str, - session_number: int, - server: str, - proxy_host: str, - proxy_port: int, - ): - """ - Initialize connection to REST server. - """ - self.key = key - self.secret = secret.encode() - - self.connect_time = ( - int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count - ) - - if server == "REAL": - self.init(HOST, proxy_host, proxy_port) - else: - self.init(TEST_HOST, proxy_host, proxy_port) - - self.start(session_number) - - self.gateway.write_log(_("REST API启动成功")) - - def _new_order_id(self): - """""" - with self.order_count_lock: - self.order_count += 1 - return self.order_count - - def send_order(self, req: OrderRequest): - """""" - if not self.check_rate_limit(): - return "" - - order_id = str(self.connect_time + self._new_order_id()) - - symbol = req.symbol - data = { - "symbol": symbol, - "side": DIRECTION_VT2BYBIT[req.direction], - "qty": int(req.volume), - "order_link_id": order_id, - "time_in_force": "GoodTillCancel" - } - - order = req.create_order_data(order_id, self.gateway_name) - order.time = datetime.now().isoformat()[11:19] - - # Only add price for limit order. - if req.type != OrderType.STOP: - data["order_type"] = ORDER_TYPE_VT2BYBIT[req.type] - data["price"] = req.price - self.add_request( - "POST", - "/open-api/order/create", - callback=self.on_send_order, - data=data, - extra=order, - on_failed=self.on_send_order_failed, - on_error=self.on_send_order_error, - ) - else: - assert self.ticks[symbol], _("Subscribe This Symbol before sending StopOrder.") - last_price = self.ticks[symbol].last_price - data["order_type"] = 'Market' - data["stop_px"] = req.price - data["base_price"] = last_price - self.add_request( - "POST", - "/open-api/stop-order/create", - callback=self.on_send_stop_order, - data=data, - extra=order, - on_failed=self.on_send_order_failed, - on_error=self.on_send_order_error, - ) - - self.gateway.on_order(order) - return order.vt_orderid - - def cancel_order(self, req: CancelRequest): - """""" - if not self.check_rate_limit(): - return - sys_id = self.gateway.orderid2sys(req.orderid) - order = self.gateway.orders[req.orderid] - - if order.type != OrderType.STOP: - path = "/open-api/order/cancel" - key = "order_id" - callback = self.on_cancel_order - else: - path = "/open-api/stop-order/cancel" - key = "stop_order_id" - callback = self.on_cancel_stop_order - - self.add_request( - "POST", - path, - callback=callback, - data={ - key: sys_id, - "symbol": req.symbol, - }, - on_error=self.on_cancel_order_error, - extra=order, - ) - - def query_history(self, - symbol: str, - interval: Interval, - start: int, # unix timestamp - limit: int = None, - ) -> Tuple[List[BarData], "HistoryDataNextInfo"]: - """ - Get history data synchronously. - """ - if limit is None: - limit = self.gateway.HISTORY_RECORD_PER_REQUEST - - bars = [] - - # datetime for a bar is close_time - # we got open_time from API. - adjustment = INTERVAL_VT2BYBIT_INT[interval] - - params = { - "interval": INTERVAL_VT2BYBIT[interval], - "symbol": symbol, - "limit": limit, - "from": start, - } - - # todo: RestClient: return RestClient.Request object instead of requests.Response. - resp = self.request( - "GET", - "/v2/public/kline/list", - params=params - ) - - # Break if request failed with other status code - raw_data = resp.json() - if not self.is_request_success(raw_data, None): - msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" - self.gateway.write_log(msg) - raise RequestFailedException(msg) - result = raw_data['result'] - for data in result: - open_time = int(data["open_time"]) - close_time = open_time + adjustment - close_dt = datetime.fromtimestamp(close_time) - bar = BarData( - symbol=symbol, - exchange=Exchange.BYBIT, - datetime=close_dt, - interval=interval, - volume=data["volume"], - open_price=data["open"], - high_price=data["high"], - low_price=data["low"], - close_price=data["close"], - gateway_name=self.gateway_name - ) - bars.append(bar) - - end = result[-1]["open_time"] - return bars, HistoryDataNextInfo(symbol, interval, end) - - def on_send_order_failed(self, status_code: int, request: Request): - """ - Callback when sending order failed on server. - """ - data = request.response.json() - self.update_rate_limit(data) - - order = request.extra - order.status = Status.REJECTED - self.gateway.on_order(order) - - msg = f"委托失败,错误代码:{data['ret_code']}, 错误信息:{data['ret_msg']}" - - self.gateway.write_log(msg) - - def on_send_order_error( - self, exception_type: type, exception_value: Exception, tb, request: Request - ): - """ - Callback when sending order caused exception. - """ - order = request.extra - order.status = Status.REJECTED - self.gateway.on_order(order) - - # Record exception if not ConnectionError - if not issubclass(exception_type, ConnectionError): - self.on_error(exception_type, exception_value, tb, request) - - def on_send_order(self, raw_data: dict, request: Request): - """""" - - data = raw_data['result'] - order = self.gateway.parse_order_data(data) - self.gateway.on_order(order) - - self.update_rate_limit(raw_data) - - def on_send_stop_order(self, raw_data: dict, request: Request): - """""" - data = raw_data['result'] - order = self.gateway.parse_stop_order_data(data) - order.time = parse_datetime(data['updated_at']) - self.gateway.on_order(order) - - self.update_rate_limit(raw_data) - - def on_cancel_order_error( - self, exception_type: type, exception_value: Exception, tb, request: Request - ): - """ - Callback when cancelling order failed on server. - """ - # Record exception if not ConnectionError - if not issubclass(exception_type, ConnectionError): - self.on_error(exception_type, exception_value, tb, request) - - def on_cancel_order(self, raw_data: dict, request: Request): - """""" - data: dict = raw_data['result'] - - order = self.gateway.parse_order_data(data) - self.gateway.on_order(order) - self.update_rate_limit(data) - - def on_cancel_stop_order(self, raw_data: dict, request: Request): - """""" - data: dict = raw_data['result'] - order = self.gateway.parse_stop_order_data(data) - order.time = parse_datetime(data['updated_at']) - self.gateway.on_order(order) - self.update_rate_limit(data) - - def on_failed(self, status_code: int, request: Request): - """ - Callback to handle request failed. - """ - data = request.response.json() - self._handle_error_response(data, request) - - def _handle_error_response(self, data, request, operation_name: str = None): - if operation_name is None: - operation_name = request.path - - self.update_rate_limit(data) - error_msg = data["ret_msg"] - error_code = data['ret_code'] - msg = f"请求{operation_name}失败,状态码:{request.status},错误代码:{error_code}, 信息:{error_msg}" - msg += f'\n{request}' - self.gateway.write_log(msg) - - def on_error( - self, exception_type: type, exception_value: Exception, tb, request: Request - ): - """ - Callback to handler request exception. - """ - msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" - self.gateway.write_log(msg) - - sys.stderr.write( - self.exception_detail(exception_type, exception_value, tb, request) - ) - - def update_rate_limit(self, data: dict): - """ - Update current request limit remaining status. - :param data: - """ - remaining = data.get('rate_limit_status', None) - if remaining is not None: - self.rate_limit_remaining = remaining - - def increase_rate_limit(self): - """ - Reset request limit remaining every 1 second. - """ - self.rate_limit_remaining += 1 - self.rate_limit_remaining = min( - self.rate_limit_remaining, self.rate_limit_limit) - - # Countdown of retry sleep seconds - if self.rate_limit_sleep: - self.rate_limit_sleep -= 1 - - def check_rate_limit(self): - """ - Check if rate limit is reached before sending out requests. - """ - # Already received 429 from server - if self.rate_limit_sleep: - msg = f"请求过于频繁,已被Bybit限制,请等待{self.rate_limit_sleep}秒后再试" - self.gateway.write_log(msg) - return False - # Just local request limit is reached - elif not self.rate_limit_remaining: - msg = "请求频率太高,有触发Bybit流控的风险,请稍候再试" - self.gateway.write_log(msg) - return False - else: - self.rate_limit_remaining -= 1 - return True - - def is_request_success(self, data: dict, request: "Request"): - return data['ret_code'] == 0 - - def query_contracts(self): - self.add_request("GET", - "/v2/public/tickers", - self.on_query_contracts) - - def on_query_contracts(self, data: dict, request: "Request"): - for result in data['result']: - symbol = result['symbol'] - contract = ContractData( - gateway_name=self.gateway_name, - symbol=symbol, - exchange=Exchange.BYBIT, - name=symbol, - product=Product.FUTURES, - size=1, - # todo: pricetick: Currently(2019-9-2) unable to query. - pricetick=PRICE_TICKS.get(symbol, 0.0001), - ) - self.gateway.on_contract(contract) - - def query_position(self): - self.add_request("GET", - "/position/list", - self.on_query_position - ) - - def on_query_position(self, raw_data: dict, request: "Request"): - for data in raw_data['result']: - p1, p2 = self.gateway.parse_position_data(data) - self.gateway.on_position(p1) - if p2: - self.gateway.on_position(p2) - - account = AccountData( - gateway_name=self.gateway_name, - accountid=p1.symbol, - balance=data['wallet_balance'], - frozen=data['order_margin'] - ) - self.gateway.on_account(account) - - def query_orders(self): - """ - query all orders, including stop orders - """ - self.add_request("GET", - "/open-api/order/list", - callback=self.on_query_orders, - ) - self.query_stop_orders() - - def query_stop_orders(self): - self.add_request("GET", - "/open-api/stop-order/list", - callback=self.on_query_stop_orders, - ) - - def on_query_orders(self, raw_data: dict, request: "Request"): - result = raw_data['result'] - for data in result['data']: - if data['order_status'] != 'NotActive': # UnTriggered StopOrder - order = self.gateway.parse_order_data(data) - self.gateway.on_order(order) - - def on_query_stop_orders(self, raw_data: dict, request: "Request"): - result = raw_data['result'] - for data in result['data']: - order = self.gateway.parse_stop_order_data(data) - self.gateway.on_order(order) diff --git a/vnpy/gateway/bybit/websocket_api.py b/vnpy/gateway/bybit/websocket_api.py deleted file mode 100644 index c66c7da0..00000000 --- a/vnpy/gateway/bybit/websocket_api.py +++ /dev/null @@ -1,342 +0,0 @@ -import hashlib -import hmac -import sys -import time -from collections import defaultdict -from copy import copy -from datetime import datetime -from typing import Any, Callable, Dict, TYPE_CHECKING - -from sortedcontainers import SortedSet - -from vnpy.api.websocket import WebsocketClient -from vnpy.trader.constant import (Exchange, Product) -from vnpy.trader.object import (AccountData, ContractData, SubscribeRequest, TickData, TradeData) -from .common import (DIRECTION_BYBIT2VT, generate_timestamp, local_tz, parse_datetime, sign, utc_tz) - -if TYPE_CHECKING: - from vnpy.gateway.bybit import BybitGateway - -HOST = "wss://stream.bybit.com/realtime" -TEST_HOST = "wss://stream-testnet.bybit.com/realtime" - - -class RawOrderInfo: - - def __init__(self, raw_data: dict): - self.id = raw_data['id'] - self.price = raw_data['price'] - self.side = raw_data['side'] # 'Buy', 'Sell' - self.size = raw_data.get('size', 0) - - def __lt__(self, rhs: "RawOrderInfo"): - return self.price < rhs.price - - def __eq__(self, rhs: "RawOrderInfo"): - return self.price == rhs.price - - def __hash__(self): - return self.id # price is a string and we don't known its precision - - -class BybitWebsocketApi(WebsocketClient): - """""" - - def __init__(self, gateway: "BybitGateway"): - """""" - super(BybitWebsocketApi, self).__init__() - - self.server: str = "" # REAL or TESTNET - self.gateway = gateway - self.gateway_name = gateway.gateway_name - - self.key = "" - self.secret = b"" - - self._instrument_info_data: Dict[str, dict] = defaultdict(dict) - self._orderbook_data: Dict[str, SortedSet[RawOrderInfo]] = defaultdict(SortedSet) - - self.accounts = {} - - self._topic_callbacks = {} - - @property - def ticks(self): - return self.gateway.ticks - - @property - def orders(self): - return self.gateway.orders - - def connect( - self, key: str, secret: str, server: str, proxy_host: str, proxy_port: int - ): - """""" - self.key = key - self.secret = secret.encode() - self.proxy_host = proxy_host - self.proxy_port = proxy_port - self.server = server - - self.reset_authentication() - self.start() - - def reset_authentication(self): - expires = generate_timestamp(30) - d2s = f"GET/realtime{int(expires)}" - signature = sign(self.secret, d2s.encode()) - params = f"api_key={self.key}&expires={expires}&signature={signature}" - if self.server == "REAL": - host = HOST - else: - host = TEST_HOST - url = f'{host}?{params}' - self.init(url, self.proxy_host, self.proxy_port) - - def subscribe(self, req: SubscribeRequest): - """ - Subscribe to tick data upate. - """ - symbol = req.symbol - self._subscribe_topic(f"instrument_info.100ms.{symbol}", self.on_tick) - self._subscribe_topic(f"orderBookL2_25.{symbol}", self.on_depth) - - def on_connected(self): - """""" - self.gateway.write_log("Websocket API连接且认证成功") - self._subscribe_initialize_topics() - - def on_disconnected(self): - """""" - self.gateway.write_log("Websocket API连接断开") - self.reset_authentication() - - def on_packet(self, packet: dict): - """""" - success = packet.get('success', None) - topic = packet.get('topic', None) - if success is not None: - if success is False: - self.gateway.write_log("Websocket API报错:%s" % packet["ret_msg"]) - elif topic is not None: - self._topic_callbacks[topic](topic, packet) - else: - self.gateway.write_log(f"unkonwn packet: {packet}") - - def on_error(self, exception_type: type, exception_value: Exception, tb): - """""" - msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" - self.gateway.write_log(msg) - - sys.stderr.write(self.exception_detail( - exception_type, exception_value, tb)) - - def authenticate(self): - """ - Authenticate websockey connection to subscribe private topic. - """ - expires = int(time.time()) - method = "GET" - path = "/realtime" - msg = method + path + str(expires) - signature = hmac.new( - self.secret, msg.encode(), digestmod=hashlib.sha256 - ).hexdigest() - - req = {"op": "authKey", "args": [self.key, expires, signature]} - self.send_packet(req) - - def _subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]): - """ - Subscribe to all private topics. - """ - self._topic_callbacks[topic] = callback - - req = { - "op": "subscribe", - "args": [topic], - } - self.send_packet(req) - - def _subscribe_initialize_topics(self): - """ - Subscribe to all private topics. - """ - self._subscribe_topic("order", self.on_order) - self._subscribe_topic("execution", self.on_trade) - self._subscribe_topic("position", self.on_position) - - def _get_last_tick(self, symbol: str): - tick = self.ticks.get(symbol, None) - if tick is None: - # noinspection PyTypeChecker - tick = TickData( - symbol=symbol, - exchange=Exchange.BYBIT, - name=symbol, - datetime=None, # this will be filled before this new created tick is consumed. - gateway_name=self.gateway_name, - ) - self.ticks[symbol] = tick - return tick - - def on_tick(self, topic: str, raw_data: dict): - """""" - # parse incremental data sent from server - symbol = topic[22:] - self._update_tick_incremental_data(symbol, raw_data) - - # convert dict into TickData - last_data = self._instrument_info_data[symbol] - tick = self._get_last_tick(symbol) - tick.last_price = last_data["last_price_e4"] / 10000 - tick.datetime = _parse_timestamp_e6(raw_data['timestamp_e6']) - self.gateway.on_tick(copy(tick)) - - def _update_tick_incremental_data(self, symbol, raw_data): - type_ = raw_data['type'] - data = raw_data['data'] - last_data = self._instrument_info_data[symbol] - if type_ == 'snapshot': - last_data.clear() - last_data.update(data) - elif type_ == 'delta': - updates = data['update'] - for update in updates: - assert update['symbol'] == symbol - last_data.update(update) - else: - self.gateway.write_log(f"Unknown type {type_} in websocket_api.on_tick()") - - def on_depth(self, topic: str, raw_data: dict): - """""" - symbol = topic[15:] - self._update_depth_incremental_data(symbol, raw_data) - - last_data = self._orderbook_data[symbol] - - tick = self._get_last_tick(symbol) - for n, parsed in enumerate(last_data.islice(20, 25)): - tick.__setattr__(f"bid_price_{5 - n}", parsed.price) - tick.__setattr__(f"bid_volume_{5 - n}", parsed.size) - - for n, parsed in enumerate(last_data.islice(25, 30)): - tick.__setattr__(f"ask_price_{n + 1}", parsed.price) - tick.__setattr__(f"ask_volume_{n + 1}", parsed.size) - - tick.datetime = _parse_timestamp_e6(raw_data['timestamp_e6']) - self.gateway.on_tick(copy(tick)) - - def _update_depth_incremental_data(self, symbol, raw_data): - type_ = raw_data['type'] - data = raw_data['data'] - last_data = self._orderbook_data[symbol] - if type_ == 'snapshot': - last_data.clear() - for item in data: - assert item['symbol'] == symbol - parsed = RawOrderInfo(item) - last_data.add(parsed) - elif type_ == 'delta': - deletes = data['delete'] - for delete in deletes: - assert delete['symbol'] == symbol - parsed = RawOrderInfo(delete) - last_data.remove(parsed) - - updates = data['update'] - for update in updates: - assert update['symbol'] == symbol - parsed = RawOrderInfo(update) - last_data.remove(parsed) - last_data.add(parsed) - - inserts = data['insert'] - for insert in inserts: - assert insert['symbol'] == symbol - parsed = RawOrderInfo(insert) - last_data.add(parsed) - else: - self.gateway.write_log(f"Unknown type {type_} in websocket_api.on_tick()") - - def on_trade(self, topic: str, raw_data: dict): - """""" - for data in raw_data['data']: - order_id = data['order_link_id'] - if not order_id: - order_id = data['order_id'] - trade = TradeData( - symbol=data["symbol"], - exchange=Exchange.BYBIT, - orderid=order_id, - tradeid=data['exec_id'], - direction=DIRECTION_BYBIT2VT[data["side"]], - price=data["price"], - volume=data["exec_qty"], - time=parse_datetime(data["trade_time"]), - gateway_name=self.gateway_name, - ) - - self.gateway.on_trade(trade) - - def on_order(self, topic: str, raw_data: dict): - """""" - for data in raw_data['data']: - print(data) - order = self.gateway.parse_order_data(data, 'timestamp') - - self.gateway.on_order(copy(order)) - - def on_position(self, topic: str, raw_data: dict): - """""" - for data in raw_data['data']: - p1, p2 = self.gateway.parse_position_data(data) - self.gateway.on_position(p1) - if p2: - self.gateway.on_position(p2) - - def on_account(self, d): - """""" - accountid = str(d["account"]) - account = self.accounts.get(accountid, None) - if not account: - account = AccountData(accountid=accountid, - gateway_name=self.gateway_name) - self.accounts[accountid] = account - - account.balance = d.get("marginBalance", account.balance) - account.available = d.get("availableMargin", account.available) - account.frozen = account.balance - account.available - - self.gateway.on_account(copy(account)) - - def on_contract(self, d): - """""" - if "tickSize" not in d: - return - - if not d["lotSize"]: - return - - contract = ContractData( - symbol=d["symbol"], - exchange=Exchange.BYBIT, - name=d["symbol"], - product=Product.FUTURES, - pricetick=d["tickSize"], - size=d["lotSize"], - stop_supported=True, - net_position=True, - history_data=True, - gateway_name=self.gateway_name, - ) - - self.gateway.on_contract(contract) - - def _ping(self): - super()._ping() - self.send_packet({'op': 'ping'}) - - -def _parse_timestamp_e6(timestamp: int): - return datetime.fromtimestamp(timestamp / 1_000_000, utc_tz).astimezone(tz=local_tz)