From 3c24365ae66e99e26c103f8238157f10f41c1094 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Sun, 20 Oct 2019 11:27:05 +0800 Subject: [PATCH 1/5] [Mod] move all bybit codes into bybit_gateway.py --- vnpy/gateway/bybit/bybit_gateway.py | 932 +++++++++++++++++++++++++++- vnpy/gateway/bybit/common.py | 82 --- vnpy/gateway/bybit/rest_api.py | 485 --------------- vnpy/gateway/bybit/websocket_api.py | 342 ---------- 4 files changed, 931 insertions(+), 910 deletions(-) delete mode 100644 vnpy/gateway/bybit/common.py delete mode 100644 vnpy/gateway/bybit/rest_api.py delete mode 100644 vnpy/gateway/bybit/websocket_api.py diff --git a/vnpy/gateway/bybit/bybit_gateway.py b/vnpy/gateway/bybit/bybit_gateway.py index 4559b1df..e97ad129 100644 --- a/vnpy/gateway/bybit/bybit_gateway.py +++ b/vnpy/gateway/bybit/bybit_gateway.py @@ -1,12 +1,51 @@ """""" +import hashlib +import hmac +import time +from datetime import datetime, timedelta, timezone + from dataclasses import dataclass from datetime import datetime from typing import Any, Dict, List, Tuple +import sys +from dataclasses import dataclass +from datetime import datetime +from threading import Lock +from typing import List, TYPE_CHECKING, Tuple +from urllib.parse import urlencode + +from requests import ConnectionError + +import hashlib +import hmac +import sys +import time +from collections import defaultdict +from copy import copy +from datetime import datetime +from typing import Any, Callable, Dict, TYPE_CHECKING + +from sortedcontainers import SortedSet + +from vnpy.api.websocket import WebsocketClient +from vnpy.trader.constant import (Exchange, Product) +from vnpy.trader.object import (AccountData, ContractData, SubscribeRequest, TickData, TradeData) +from .common import (DIRECTION_BYBIT2VT, generate_timestamp, local_tz, parse_datetime, sign, utc_tz) + + + +from vnpy.api.rest import Request, RestClient +from vnpy.trader.constant import Exchange, Interval, OrderType, Product, Status +from vnpy.trader.object import AccountData, BarData, CancelRequest, ContractData, OrderRequest +from .common import (DIRECTION_VT2BYBIT, INTERVAL_VT2BYBIT, ORDER_TYPE_VT2BYBIT, generate_timestamp, + parse_datetime, sign, INTERVAL_VT2BYBIT_INT) + from vnpy.event import Event from vnpy.gateway.bybit.rest_api import BybitRestApi, HistoryDataNextInfo from vnpy.gateway.bybit.websocket_api import BybitWebsocketApi from vnpy.trader.constant import (Exchange, Interval, OrderType) +from vnpy.trader.constant import Direction, Interval, OrderType, Status from vnpy.trader.event import EVENT_TIMER from vnpy.trader.gateway import BaseGateway from vnpy.trader.object import (BarData, CancelRequest, HistoryRequest, OrderData, OrderRequest, @@ -16,12 +55,110 @@ from .common import (DIRECTION_BYBIT2VT, INTERVAL_VT2BYBIT_INT, OPPOSITE_DIRECTI parse_datetime, utc_tz) +STATUS_BYBIT2VT = { + "Created": Status.NOTTRADED, + "New": Status.NOTTRADED, + "PartiallyFilled": Status.PARTTRADED, + "Filled": Status.ALLTRADED, + "Cancelled": Status.CANCELLED, + "Rejected": Status.REJECTED, +} +STOP_ORDER_STATUS_BYBIT2VT = { + "Untriggered": Status.NOTTRADED, + "Triggered": Status.NOTTRADED, + # Active: triggered and placed. + # since price is market price, placed == AllTraded? + "Active": Status.ALLTRADED, + "Cancelled": Status.CANCELLED, + "Rejected": Status.REJECTED, +} +DIRECTION_VT2BYBIT = {Direction.LONG: "Buy", Direction.SHORT: "Sell"} +DIRECTION_BYBIT2VT = {v: k for k, v in DIRECTION_VT2BYBIT.items()} +DIRECTION_BYBIT2VT.update({ + "None": Direction.LONG +}) + +OPPOSITE_DIRECTION = { + Direction.LONG: Direction.SHORT, + Direction.SHORT: Direction.LONG, +} + +ORDER_TYPE_VT2BYBIT = { + OrderType.LIMIT: "Limit", + OrderType.MARKET: "Market", +} + +ORDER_TYPE_BYBIT2VT = {v: k for k, v in ORDER_TYPE_VT2BYBIT.items()} + +INTERVAL_VT2BYBIT = { + Interval.MINUTE: "1", + Interval.HOUR: "60", + Interval.DAILY: "D", + Interval.WEEKLY: "W", +} +INTERVAL_VT2BYBIT_INT = { + Interval.MINUTE: 1, + Interval.HOUR: 60, + Interval.DAILY: 60 * 24, + Interval.WEEKLY: 60 * 24 * 7, +} +TIMEDELTA_MAP = { + Interval.MINUTE: timedelta(minutes=1), + Interval.HOUR: timedelta(hours=1), + Interval.DAILY: timedelta(days=1), + Interval.WEEKLY: timedelta(days=7), +} + +if TYPE_CHECKING: + from vnpy.gateway.bybit import BybitGateway + +if TYPE_CHECKING: + from vnpy.gateway.bybit import BybitGateway + +HOST = "wss://stream.bybit.com/realtime" +TEST_HOST = "wss://stream-testnet.bybit.com/realtime" + + +def _(x): return x # noqa + + +HOST = "https://api.bybit.com" +TEST_HOST = "https://api-testnet.bybit.com" + +# asked from official developer +PRICE_TICKS = { + "BTCUSD": 0.5, + "ETHUSD": 0.05, + "EOSUSD": 0.001, + "XRPUSD": 0.0001, +} + +utc_tz = timezone.utc +local_tz = datetime.now(timezone.utc).astimezone().tzinfo + + @dataclass() class HistoryDataInfo: bars: List[BarData] extra: Any + + +@dataclass() +class HistoryDataNextInfo: + symbol: str + interval: Interval + end: int + + +class RequestFailedException(Exception): + pass + + + + + class BybitGateway(BaseGateway): """ VN Trader Gateway for BitMEX connection. @@ -244,7 +381,8 @@ class BybitGateway(BaseGateway): direction=DIRECTION_BYBIT2VT[data["side"]], price=data["price"], volume=data["qty"], - time=parse_datetime(data['updated_at']), # this should be filled manually + # this should be filled manually + time=parse_datetime(data['updated_at']), gateway_name=self.gateway_name, ) self.orders[order.orderid] = order @@ -293,3 +431,795 @@ class BybitGateway(BaseGateway): """ if order is not None: super().on_order(order) + + +class BybitRestApi(RestClient): + """ + BitMEX REST API + """ + + def __init__(self, gateway: "BybitGateway"): + """""" + super(BybitRestApi, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.key = "" + self.secret = b"" + + self.order_count = 1_000_000 + self.order_count_lock = Lock() + + self.connect_time = 0 + + # Use 60 by default, and will update after first request + self.rate_limit_limit = 60 + self.rate_limit_remaining = 60 + self.rate_limit_sleep = 0 + + @property + def ticks(self): + return self.gateway.ticks + + def sign(self, request): + """ + Generate BitMEX signature. + """ + + if request.method == "GET": + api_params = request.params # dict 2 sign + if api_params is None: + api_params = request.params = {} + else: # POST + api_params = request.data + if api_params is None: + api_params = request.data = {} + + expires = generate_timestamp(0) + + api_params['api_key'] = self.key + api_params['recv_window'] = 30 * 1000 + api_params['timestamp'] = expires + + data2sign = urlencode({k: api_params[k] for k in sorted(api_params)}) + signature = sign(self.secret, data2sign.encode()) + api_params['sign'] = signature + + return request + + def connect( + self, + key: str, + secret: str, + session_number: int, + server: str, + proxy_host: str, + proxy_port: int, + ): + """ + Initialize connection to REST server. + """ + self.key = key + self.secret = secret.encode() + + self.connect_time = ( + int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count + ) + + if server == "REAL": + self.init(HOST, proxy_host, proxy_port) + else: + self.init(TEST_HOST, proxy_host, proxy_port) + + self.start(session_number) + + self.gateway.write_log(_("REST API启动成功")) + + def _new_order_id(self): + """""" + with self.order_count_lock: + self.order_count += 1 + return self.order_count + + def send_order(self, req: OrderRequest): + """""" + if not self.check_rate_limit(): + return "" + + order_id = str(self.connect_time + self._new_order_id()) + + symbol = req.symbol + data = { + "symbol": symbol, + "side": DIRECTION_VT2BYBIT[req.direction], + "qty": int(req.volume), + "order_link_id": order_id, + "time_in_force": "GoodTillCancel" + } + + order = req.create_order_data(order_id, self.gateway_name) + order.time = datetime.now().isoformat()[11:19] + + # Only add price for limit order. + if req.type != OrderType.STOP: + data["order_type"] = ORDER_TYPE_VT2BYBIT[req.type] + data["price"] = req.price + self.add_request( + "POST", + "/open-api/order/create", + callback=self.on_send_order, + data=data, + extra=order, + on_failed=self.on_send_order_failed, + on_error=self.on_send_order_error, + ) + else: + assert self.ticks[symbol], _("Subscribe This Symbol before sending StopOrder.") + last_price = self.ticks[symbol].last_price + data["order_type"] = 'Market' + data["stop_px"] = req.price + data["base_price"] = last_price + self.add_request( + "POST", + "/open-api/stop-order/create", + callback=self.on_send_stop_order, + data=data, + extra=order, + on_failed=self.on_send_order_failed, + on_error=self.on_send_order_error, + ) + + self.gateway.on_order(order) + return order.vt_orderid + + def cancel_order(self, req: CancelRequest): + """""" + if not self.check_rate_limit(): + return + sys_id = self.gateway.orderid2sys(req.orderid) + order = self.gateway.orders[req.orderid] + + if order.type != OrderType.STOP: + path = "/open-api/order/cancel" + key = "order_id" + callback = self.on_cancel_order + else: + path = "/open-api/stop-order/cancel" + key = "stop_order_id" + callback = self.on_cancel_stop_order + + self.add_request( + "POST", + path, + callback=callback, + data={ + key: sys_id, + "symbol": req.symbol, + }, + on_error=self.on_cancel_order_error, + extra=order, + ) + + def query_history(self, + symbol: str, + interval: Interval, + start: int, # unix timestamp + limit: int = None, + ) -> Tuple[List[BarData], "HistoryDataNextInfo"]: + """ + Get history data synchronously. + """ + if limit is None: + limit = self.gateway.HISTORY_RECORD_PER_REQUEST + + bars = [] + + # datetime for a bar is close_time + # we got open_time from API. + adjustment = INTERVAL_VT2BYBIT_INT[interval] + + params = { + "interval": INTERVAL_VT2BYBIT[interval], + "symbol": symbol, + "limit": limit, + "from": start, + } + + # todo: RestClient: return RestClient.Request object instead of requests.Response. + resp = self.request( + "GET", + "/v2/public/kline/list", + params=params + ) + + # Break if request failed with other status code + raw_data = resp.json() + if not self.is_request_success(raw_data, None): + msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" + self.gateway.write_log(msg) + raise RequestFailedException(msg) + result = raw_data['result'] + for data in result: + open_time = int(data["open_time"]) + close_time = open_time + adjustment + close_dt = datetime.fromtimestamp(close_time) + bar = BarData( + symbol=symbol, + exchange=Exchange.BYBIT, + datetime=close_dt, + interval=interval, + volume=data["volume"], + open_price=data["open"], + high_price=data["high"], + low_price=data["low"], + close_price=data["close"], + gateway_name=self.gateway_name + ) + bars.append(bar) + + end = result[-1]["open_time"] + return bars, HistoryDataNextInfo(symbol, interval, end) + + def on_send_order_failed(self, status_code: int, request: Request): + """ + Callback when sending order failed on server. + """ + data = request.response.json() + self.update_rate_limit(data) + + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + msg = f"委托失败,错误代码:{data['ret_code']}, 错误信息:{data['ret_msg']}" + + self.gateway.write_log(msg) + + def on_send_order_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback when sending order caused exception. + """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_send_order(self, raw_data: dict, request: Request): + """""" + + data = raw_data['result'] + order = self.gateway.parse_order_data(data) + self.gateway.on_order(order) + + self.update_rate_limit(raw_data) + + def on_send_stop_order(self, raw_data: dict, request: Request): + """""" + data = raw_data['result'] + order = self.gateway.parse_stop_order_data(data) + order.time = parse_datetime(data['updated_at']) + self.gateway.on_order(order) + + self.update_rate_limit(raw_data) + + def on_cancel_order_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback when cancelling order failed on server. + """ + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_cancel_order(self, raw_data: dict, request: Request): + """""" + data: dict = raw_data['result'] + + order = self.gateway.parse_order_data(data) + self.gateway.on_order(order) + self.update_rate_limit(data) + + def on_cancel_stop_order(self, raw_data: dict, request: Request): + """""" + data: dict = raw_data['result'] + order = self.gateway.parse_stop_order_data(data) + order.time = parse_datetime(data['updated_at']) + self.gateway.on_order(order) + self.update_rate_limit(data) + + def on_failed(self, status_code: int, request: Request): + """ + Callback to handle request failed. + """ + data = request.response.json() + self._handle_error_response(data, request) + + def _handle_error_response(self, data, request, operation_name: str = None): + if operation_name is None: + operation_name = request.path + + self.update_rate_limit(data) + error_msg = data["ret_msg"] + error_code = data['ret_code'] + msg = f"请求{operation_name}失败,状态码:{request.status},错误代码:{error_code}, 信息:{error_msg}" + msg += f'\n{request}' + self.gateway.write_log(msg) + + def on_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback to handler request exception. + """ + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" + self.gateway.write_log(msg) + + sys.stderr.write( + self.exception_detail(exception_type, exception_value, tb, request) + ) + + def update_rate_limit(self, data: dict): + """ + Update current request limit remaining status. + :param data: + """ + remaining = data.get('rate_limit_status', None) + if remaining is not None: + self.rate_limit_remaining = remaining + + def increase_rate_limit(self): + """ + Reset request limit remaining every 1 second. + """ + self.rate_limit_remaining += 1 + self.rate_limit_remaining = min( + self.rate_limit_remaining, self.rate_limit_limit) + + # Countdown of retry sleep seconds + if self.rate_limit_sleep: + self.rate_limit_sleep -= 1 + + def check_rate_limit(self): + """ + Check if rate limit is reached before sending out requests. + """ + # Already received 429 from server + if self.rate_limit_sleep: + msg = f"请求过于频繁,已被Bybit限制,请等待{self.rate_limit_sleep}秒后再试" + self.gateway.write_log(msg) + return False + # Just local request limit is reached + elif not self.rate_limit_remaining: + msg = "请求频率太高,有触发Bybit流控的风险,请稍候再试" + self.gateway.write_log(msg) + return False + else: + self.rate_limit_remaining -= 1 + return True + + def is_request_success(self, data: dict, request: "Request"): + return data['ret_code'] == 0 + + def query_contracts(self): + self.add_request("GET", + "/v2/public/tickers", + self.on_query_contracts) + + def on_query_contracts(self, data: dict, request: "Request"): + for result in data['result']: + symbol = result['symbol'] + contract = ContractData( + gateway_name=self.gateway_name, + symbol=symbol, + exchange=Exchange.BYBIT, + name=symbol, + product=Product.FUTURES, + size=1, + # todo: pricetick: Currently(2019-9-2) unable to query. + pricetick=PRICE_TICKS.get(symbol, 0.0001), + ) + self.gateway.on_contract(contract) + + def query_position(self): + self.add_request("GET", + "/position/list", + self.on_query_position + ) + + def on_query_position(self, raw_data: dict, request: "Request"): + for data in raw_data['result']: + p1, p2 = self.gateway.parse_position_data(data) + self.gateway.on_position(p1) + if p2: + self.gateway.on_position(p2) + + account = AccountData( + gateway_name=self.gateway_name, + accountid=p1.symbol, + balance=data['wallet_balance'], + frozen=data['order_margin'] + ) + self.gateway.on_account(account) + + def query_orders(self): + """ + query all orders, including stop orders + """ + self.add_request("GET", + "/open-api/order/list", + callback=self.on_query_orders, + ) + self.query_stop_orders() + + def query_stop_orders(self): + self.add_request("GET", + "/open-api/stop-order/list", + callback=self.on_query_stop_orders, + ) + + def on_query_orders(self, raw_data: dict, request: "Request"): + result = raw_data['result'] + for data in result['data']: + if data['order_status'] != 'NotActive': # UnTriggered StopOrder + order = self.gateway.parse_order_data(data) + self.gateway.on_order(order) + + def on_query_stop_orders(self, raw_data: dict, request: "Request"): + result = raw_data['result'] + for data in result['data']: + order = self.gateway.parse_stop_order_data(data) + self.gateway.on_order(order) + + + + + + +class RawOrderInfo: + + def __init__(self, raw_data: dict): + self.id = raw_data['id'] + self.price = raw_data['price'] + self.side = raw_data['side'] # 'Buy', 'Sell' + self.size = raw_data.get('size', 0) + + def __lt__(self, rhs: "RawOrderInfo"): + return self.price < rhs.price + + def __eq__(self, rhs: "RawOrderInfo"): + return self.price == rhs.price + + def __hash__(self): + return self.id # price is a string and we don't known its precision + + +class BybitWebsocketApi(WebsocketClient): + """""" + + def __init__(self, gateway: "BybitGateway"): + """""" + super(BybitWebsocketApi, self).__init__() + + self.server: str = "" # REAL or TESTNET + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.key = "" + self.secret = b"" + + self._instrument_info_data: Dict[str, dict] = defaultdict(dict) + self._orderbook_data: Dict[str, SortedSet[RawOrderInfo]] = defaultdict(SortedSet) + + self.accounts = {} + + self._topic_callbacks = {} + + @property + def ticks(self): + return self.gateway.ticks + + @property + def orders(self): + return self.gateway.orders + + def connect( + self, key: str, secret: str, server: str, proxy_host: str, proxy_port: int + ): + """""" + self.key = key + self.secret = secret.encode() + self.proxy_host = proxy_host + self.proxy_port = proxy_port + self.server = server + + self.reset_authentication() + self.start() + + def reset_authentication(self): + expires = generate_timestamp(30) + d2s = f"GET/realtime{int(expires)}" + signature = sign(self.secret, d2s.encode()) + params = f"api_key={self.key}&expires={expires}&signature={signature}" + if self.server == "REAL": + host = HOST + else: + host = TEST_HOST + url = f'{host}?{params}' + self.init(url, self.proxy_host, self.proxy_port) + + def subscribe(self, req: SubscribeRequest): + """ + Subscribe to tick data upate. + """ + symbol = req.symbol + self._subscribe_topic(f"instrument_info.100ms.{symbol}", self.on_tick) + self._subscribe_topic(f"orderBookL2_25.{symbol}", self.on_depth) + + def on_connected(self): + """""" + self.gateway.write_log("Websocket API连接且认证成功") + self._subscribe_initialize_topics() + + def on_disconnected(self): + """""" + self.gateway.write_log("Websocket API连接断开") + self.reset_authentication() + + def on_packet(self, packet: dict): + """""" + success = packet.get('success', None) + topic = packet.get('topic', None) + if success is not None: + if success is False: + self.gateway.write_log("Websocket API报错:%s" % packet["ret_msg"]) + elif topic is not None: + self._topic_callbacks[topic](topic, packet) + else: + self.gateway.write_log(f"unkonwn packet: {packet}") + + def on_error(self, exception_type: type, exception_value: Exception, tb): + """""" + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" + self.gateway.write_log(msg) + + sys.stderr.write(self.exception_detail( + exception_type, exception_value, tb)) + + def authenticate(self): + """ + Authenticate websockey connection to subscribe private topic. + """ + expires = int(time.time()) + method = "GET" + path = "/realtime" + msg = method + path + str(expires) + signature = hmac.new( + self.secret, msg.encode(), digestmod=hashlib.sha256 + ).hexdigest() + + req = {"op": "authKey", "args": [self.key, expires, signature]} + self.send_packet(req) + + def _subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]): + """ + Subscribe to all private topics. + """ + self._topic_callbacks[topic] = callback + + req = { + "op": "subscribe", + "args": [topic], + } + self.send_packet(req) + + def _subscribe_initialize_topics(self): + """ + Subscribe to all private topics. + """ + self._subscribe_topic("order", self.on_order) + self._subscribe_topic("execution", self.on_trade) + self._subscribe_topic("position", self.on_position) + + def _get_last_tick(self, symbol: str): + tick = self.ticks.get(symbol, None) + if tick is None: + # noinspection PyTypeChecker + tick = TickData( + symbol=symbol, + exchange=Exchange.BYBIT, + name=symbol, + datetime=None, # this will be filled before this new created tick is consumed. + gateway_name=self.gateway_name, + ) + self.ticks[symbol] = tick + return tick + + def on_tick(self, topic: str, raw_data: dict): + """""" + # parse incremental data sent from server + symbol = topic[22:] + self._update_tick_incremental_data(symbol, raw_data) + + # convert dict into TickData + last_data = self._instrument_info_data[symbol] + tick = self._get_last_tick(symbol) + tick.last_price = last_data["last_price_e4"] / 10000 + tick.datetime = _parse_timestamp_e6(raw_data['timestamp_e6']) + self.gateway.on_tick(copy(tick)) + + def _update_tick_incremental_data(self, symbol, raw_data): + type_ = raw_data['type'] + data = raw_data['data'] + last_data = self._instrument_info_data[symbol] + if type_ == 'snapshot': + last_data.clear() + last_data.update(data) + elif type_ == 'delta': + updates = data['update'] + for update in updates: + assert update['symbol'] == symbol + last_data.update(update) + else: + self.gateway.write_log(f"Unknown type {type_} in websocket_api.on_tick()") + + def on_depth(self, topic: str, raw_data: dict): + """""" + symbol = topic[15:] + self._update_depth_incremental_data(symbol, raw_data) + + last_data = self._orderbook_data[symbol] + + tick = self._get_last_tick(symbol) + for n, parsed in enumerate(last_data.islice(20, 25)): + tick.__setattr__(f"bid_price_{5 - n}", parsed.price) + tick.__setattr__(f"bid_volume_{5 - n}", parsed.size) + + for n, parsed in enumerate(last_data.islice(25, 30)): + tick.__setattr__(f"ask_price_{n + 1}", parsed.price) + tick.__setattr__(f"ask_volume_{n + 1}", parsed.size) + + tick.datetime = _parse_timestamp_e6(raw_data['timestamp_e6']) + self.gateway.on_tick(copy(tick)) + + def _update_depth_incremental_data(self, symbol, raw_data): + type_ = raw_data['type'] + data = raw_data['data'] + last_data = self._orderbook_data[symbol] + if type_ == 'snapshot': + last_data.clear() + for item in data: + assert item['symbol'] == symbol + parsed = RawOrderInfo(item) + last_data.add(parsed) + elif type_ == 'delta': + deletes = data['delete'] + for delete in deletes: + assert delete['symbol'] == symbol + parsed = RawOrderInfo(delete) + last_data.remove(parsed) + + updates = data['update'] + for update in updates: + assert update['symbol'] == symbol + parsed = RawOrderInfo(update) + last_data.remove(parsed) + last_data.add(parsed) + + inserts = data['insert'] + for insert in inserts: + assert insert['symbol'] == symbol + parsed = RawOrderInfo(insert) + last_data.add(parsed) + else: + self.gateway.write_log(f"Unknown type {type_} in websocket_api.on_tick()") + + def on_trade(self, topic: str, raw_data: dict): + """""" + for data in raw_data['data']: + order_id = data['order_link_id'] + if not order_id: + order_id = data['order_id'] + trade = TradeData( + symbol=data["symbol"], + exchange=Exchange.BYBIT, + orderid=order_id, + tradeid=data['exec_id'], + direction=DIRECTION_BYBIT2VT[data["side"]], + price=data["price"], + volume=data["exec_qty"], + time=parse_datetime(data["trade_time"]), + gateway_name=self.gateway_name, + ) + + self.gateway.on_trade(trade) + + def on_order(self, topic: str, raw_data: dict): + """""" + for data in raw_data['data']: + print(data) + order = self.gateway.parse_order_data(data, 'timestamp') + + self.gateway.on_order(copy(order)) + + def on_position(self, topic: str, raw_data: dict): + """""" + for data in raw_data['data']: + p1, p2 = self.gateway.parse_position_data(data) + self.gateway.on_position(p1) + if p2: + self.gateway.on_position(p2) + + def on_account(self, d): + """""" + accountid = str(d["account"]) + account = self.accounts.get(accountid, None) + if not account: + account = AccountData(accountid=accountid, + gateway_name=self.gateway_name) + self.accounts[accountid] = account + + account.balance = d.get("marginBalance", account.balance) + account.available = d.get("availableMargin", account.available) + account.frozen = account.balance - account.available + + self.gateway.on_account(copy(account)) + + def on_contract(self, d): + """""" + if "tickSize" not in d: + return + + if not d["lotSize"]: + return + + contract = ContractData( + symbol=d["symbol"], + exchange=Exchange.BYBIT, + name=d["symbol"], + product=Product.FUTURES, + pricetick=d["tickSize"], + size=d["lotSize"], + stop_supported=True, + net_position=True, + history_data=True, + gateway_name=self.gateway_name, + ) + + self.gateway.on_contract(contract) + + def _ping(self): + super()._ping() + self.send_packet({'op': 'ping'}) + + +def _parse_timestamp_e6(timestamp: int): + return datetime.fromtimestamp(timestamp / 1_000_000, utc_tz).astimezone(tz=local_tz) + + + + + +def generate_timestamp(expire_after: float = 30) -> int: + """ + :param expire_after: expires in seconds. + :return: timestamp in milliseconds + """ + return int(time.time() * 1000 + expire_after * 1000) + + +def sign(secret: bytes, data: bytes) -> str: + """""" + return hmac.new( + secret, data, digestmod=hashlib.sha256 + ).hexdigest() + + +def parse_datetime(dt: str) -> str: + return dt[11:19] diff --git a/vnpy/gateway/bybit/common.py b/vnpy/gateway/bybit/common.py deleted file mode 100644 index 7d9acc17..00000000 --- a/vnpy/gateway/bybit/common.py +++ /dev/null @@ -1,82 +0,0 @@ -import hashlib -import hmac -import time -from datetime import datetime, timedelta, timezone - -from vnpy.trader.constant import Direction, Interval, OrderType, Status - -STATUS_BYBIT2VT = { - "Created": Status.NOTTRADED, - "New": Status.NOTTRADED, - "PartiallyFilled": Status.PARTTRADED, - "Filled": Status.ALLTRADED, - "Cancelled": Status.CANCELLED, - "Rejected": Status.REJECTED, -} -STOP_ORDER_STATUS_BYBIT2VT = { - "Untriggered": Status.NOTTRADED, - "Triggered": Status.NOTTRADED, - # Active: triggered and placed. - # since price is market price, placed == AllTraded? - "Active": Status.ALLTRADED, - "Cancelled": Status.CANCELLED, - "Rejected": Status.REJECTED, -} -DIRECTION_VT2BYBIT = {Direction.LONG: "Buy", Direction.SHORT: "Sell"} -DIRECTION_BYBIT2VT = {v: k for k, v in DIRECTION_VT2BYBIT.items()} -DIRECTION_BYBIT2VT.update({ - "None": Direction.LONG -}) - -OPPOSITE_DIRECTION = { - Direction.LONG: Direction.SHORT, - Direction.SHORT: Direction.LONG, -} - -ORDER_TYPE_VT2BYBIT = { - OrderType.LIMIT: "Limit", - OrderType.MARKET: "Market", -} - -ORDER_TYPE_BYBIT2VT = {v: k for k, v in ORDER_TYPE_VT2BYBIT.items()} - -INTERVAL_VT2BYBIT = { - Interval.MINUTE: "1", - Interval.HOUR: "60", - Interval.DAILY: "D", - Interval.WEEKLY: "W", -} -INTERVAL_VT2BYBIT_INT = { - Interval.MINUTE: 1, - Interval.HOUR: 60, - Interval.DAILY: 60 * 24, - Interval.WEEKLY: 60 * 24 * 7, -} -TIMEDELTA_MAP = { - Interval.MINUTE: timedelta(minutes=1), - Interval.HOUR: timedelta(hours=1), - Interval.DAILY: timedelta(days=1), - Interval.WEEKLY: timedelta(days=7), -} - -utc_tz = timezone.utc -local_tz = datetime.now(timezone.utc).astimezone().tzinfo - - -def generate_timestamp(expire_after: float = 30) -> int: - """ - :param expire_after: expires in seconds. - :return: timestamp in milliseconds - """ - return int(time.time() * 1000 + expire_after * 1000) - - -def sign(secret: bytes, data: bytes) -> str: - """""" - return hmac.new( - secret, data, digestmod=hashlib.sha256 - ).hexdigest() - - -def parse_datetime(dt: str) -> str: - return dt[11:19] diff --git a/vnpy/gateway/bybit/rest_api.py b/vnpy/gateway/bybit/rest_api.py deleted file mode 100644 index abaa9f82..00000000 --- a/vnpy/gateway/bybit/rest_api.py +++ /dev/null @@ -1,485 +0,0 @@ -import sys -from dataclasses import dataclass -from datetime import datetime -from threading import Lock -from typing import List, TYPE_CHECKING, Tuple -from urllib.parse import urlencode - -from requests import ConnectionError - -from vnpy.api.rest import Request, RestClient -from vnpy.trader.constant import Exchange, Interval, OrderType, Product, Status -from vnpy.trader.object import AccountData, BarData, CancelRequest, ContractData, OrderRequest -from .common import (DIRECTION_VT2BYBIT, INTERVAL_VT2BYBIT, ORDER_TYPE_VT2BYBIT, generate_timestamp, - parse_datetime, sign, INTERVAL_VT2BYBIT_INT) - -if TYPE_CHECKING: - from vnpy.gateway.bybit import BybitGateway -_ = lambda x: x # noqa - -HOST = "https://api.bybit.com" -TEST_HOST = "https://api-testnet.bybit.com" - -# asked from official developer -PRICE_TICKS = { - "BTCUSD": 0.5, - "ETHUSD": 0.05, - "EOSUSD": 0.001, - "XRPUSD": 0.0001, -} - - -@dataclass() -class HistoryDataNextInfo: - symbol: str - interval: Interval - end: int - - -class RequestFailedException(Exception): - pass - - -class BybitRestApi(RestClient): - """ - BitMEX REST API - """ - - def __init__(self, gateway: "BybitGateway"): - """""" - super(BybitRestApi, self).__init__() - - self.gateway = gateway - self.gateway_name = gateway.gateway_name - - self.key = "" - self.secret = b"" - - self.order_count = 1_000_000 - self.order_count_lock = Lock() - - self.connect_time = 0 - - # Use 60 by default, and will update after first request - self.rate_limit_limit = 60 - self.rate_limit_remaining = 60 - self.rate_limit_sleep = 0 - - @property - def ticks(self): - return self.gateway.ticks - - def sign(self, request): - """ - Generate BitMEX signature. - """ - - if request.method == "GET": - api_params = request.params # dict 2 sign - if api_params is None: - api_params = request.params = {} - else: # POST - api_params = request.data - if api_params is None: - api_params = request.data = {} - - expires = generate_timestamp(0) - - api_params['api_key'] = self.key - api_params['recv_window'] = 30 * 1000 - api_params['timestamp'] = expires - - data2sign = urlencode({k: api_params[k] for k in sorted(api_params)}) - signature = sign(self.secret, data2sign.encode()) - api_params['sign'] = signature - - return request - - def connect( - self, - key: str, - secret: str, - session_number: int, - server: str, - proxy_host: str, - proxy_port: int, - ): - """ - Initialize connection to REST server. - """ - self.key = key - self.secret = secret.encode() - - self.connect_time = ( - int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count - ) - - if server == "REAL": - self.init(HOST, proxy_host, proxy_port) - else: - self.init(TEST_HOST, proxy_host, proxy_port) - - self.start(session_number) - - self.gateway.write_log(_("REST API启动成功")) - - def _new_order_id(self): - """""" - with self.order_count_lock: - self.order_count += 1 - return self.order_count - - def send_order(self, req: OrderRequest): - """""" - if not self.check_rate_limit(): - return "" - - order_id = str(self.connect_time + self._new_order_id()) - - symbol = req.symbol - data = { - "symbol": symbol, - "side": DIRECTION_VT2BYBIT[req.direction], - "qty": int(req.volume), - "order_link_id": order_id, - "time_in_force": "GoodTillCancel" - } - - order = req.create_order_data(order_id, self.gateway_name) - order.time = datetime.now().isoformat()[11:19] - - # Only add price for limit order. - if req.type != OrderType.STOP: - data["order_type"] = ORDER_TYPE_VT2BYBIT[req.type] - data["price"] = req.price - self.add_request( - "POST", - "/open-api/order/create", - callback=self.on_send_order, - data=data, - extra=order, - on_failed=self.on_send_order_failed, - on_error=self.on_send_order_error, - ) - else: - assert self.ticks[symbol], _("Subscribe This Symbol before sending StopOrder.") - last_price = self.ticks[symbol].last_price - data["order_type"] = 'Market' - data["stop_px"] = req.price - data["base_price"] = last_price - self.add_request( - "POST", - "/open-api/stop-order/create", - callback=self.on_send_stop_order, - data=data, - extra=order, - on_failed=self.on_send_order_failed, - on_error=self.on_send_order_error, - ) - - self.gateway.on_order(order) - return order.vt_orderid - - def cancel_order(self, req: CancelRequest): - """""" - if not self.check_rate_limit(): - return - sys_id = self.gateway.orderid2sys(req.orderid) - order = self.gateway.orders[req.orderid] - - if order.type != OrderType.STOP: - path = "/open-api/order/cancel" - key = "order_id" - callback = self.on_cancel_order - else: - path = "/open-api/stop-order/cancel" - key = "stop_order_id" - callback = self.on_cancel_stop_order - - self.add_request( - "POST", - path, - callback=callback, - data={ - key: sys_id, - "symbol": req.symbol, - }, - on_error=self.on_cancel_order_error, - extra=order, - ) - - def query_history(self, - symbol: str, - interval: Interval, - start: int, # unix timestamp - limit: int = None, - ) -> Tuple[List[BarData], "HistoryDataNextInfo"]: - """ - Get history data synchronously. - """ - if limit is None: - limit = self.gateway.HISTORY_RECORD_PER_REQUEST - - bars = [] - - # datetime for a bar is close_time - # we got open_time from API. - adjustment = INTERVAL_VT2BYBIT_INT[interval] - - params = { - "interval": INTERVAL_VT2BYBIT[interval], - "symbol": symbol, - "limit": limit, - "from": start, - } - - # todo: RestClient: return RestClient.Request object instead of requests.Response. - resp = self.request( - "GET", - "/v2/public/kline/list", - params=params - ) - - # Break if request failed with other status code - raw_data = resp.json() - if not self.is_request_success(raw_data, None): - msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" - self.gateway.write_log(msg) - raise RequestFailedException(msg) - result = raw_data['result'] - for data in result: - open_time = int(data["open_time"]) - close_time = open_time + adjustment - close_dt = datetime.fromtimestamp(close_time) - bar = BarData( - symbol=symbol, - exchange=Exchange.BYBIT, - datetime=close_dt, - interval=interval, - volume=data["volume"], - open_price=data["open"], - high_price=data["high"], - low_price=data["low"], - close_price=data["close"], - gateway_name=self.gateway_name - ) - bars.append(bar) - - end = result[-1]["open_time"] - return bars, HistoryDataNextInfo(symbol, interval, end) - - def on_send_order_failed(self, status_code: int, request: Request): - """ - Callback when sending order failed on server. - """ - data = request.response.json() - self.update_rate_limit(data) - - order = request.extra - order.status = Status.REJECTED - self.gateway.on_order(order) - - msg = f"委托失败,错误代码:{data['ret_code']}, 错误信息:{data['ret_msg']}" - - self.gateway.write_log(msg) - - def on_send_order_error( - self, exception_type: type, exception_value: Exception, tb, request: Request - ): - """ - Callback when sending order caused exception. - """ - order = request.extra - order.status = Status.REJECTED - self.gateway.on_order(order) - - # Record exception if not ConnectionError - if not issubclass(exception_type, ConnectionError): - self.on_error(exception_type, exception_value, tb, request) - - def on_send_order(self, raw_data: dict, request: Request): - """""" - - data = raw_data['result'] - order = self.gateway.parse_order_data(data) - self.gateway.on_order(order) - - self.update_rate_limit(raw_data) - - def on_send_stop_order(self, raw_data: dict, request: Request): - """""" - data = raw_data['result'] - order = self.gateway.parse_stop_order_data(data) - order.time = parse_datetime(data['updated_at']) - self.gateway.on_order(order) - - self.update_rate_limit(raw_data) - - def on_cancel_order_error( - self, exception_type: type, exception_value: Exception, tb, request: Request - ): - """ - Callback when cancelling order failed on server. - """ - # Record exception if not ConnectionError - if not issubclass(exception_type, ConnectionError): - self.on_error(exception_type, exception_value, tb, request) - - def on_cancel_order(self, raw_data: dict, request: Request): - """""" - data: dict = raw_data['result'] - - order = self.gateway.parse_order_data(data) - self.gateway.on_order(order) - self.update_rate_limit(data) - - def on_cancel_stop_order(self, raw_data: dict, request: Request): - """""" - data: dict = raw_data['result'] - order = self.gateway.parse_stop_order_data(data) - order.time = parse_datetime(data['updated_at']) - self.gateway.on_order(order) - self.update_rate_limit(data) - - def on_failed(self, status_code: int, request: Request): - """ - Callback to handle request failed. - """ - data = request.response.json() - self._handle_error_response(data, request) - - def _handle_error_response(self, data, request, operation_name: str = None): - if operation_name is None: - operation_name = request.path - - self.update_rate_limit(data) - error_msg = data["ret_msg"] - error_code = data['ret_code'] - msg = f"请求{operation_name}失败,状态码:{request.status},错误代码:{error_code}, 信息:{error_msg}" - msg += f'\n{request}' - self.gateway.write_log(msg) - - def on_error( - self, exception_type: type, exception_value: Exception, tb, request: Request - ): - """ - Callback to handler request exception. - """ - msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" - self.gateway.write_log(msg) - - sys.stderr.write( - self.exception_detail(exception_type, exception_value, tb, request) - ) - - def update_rate_limit(self, data: dict): - """ - Update current request limit remaining status. - :param data: - """ - remaining = data.get('rate_limit_status', None) - if remaining is not None: - self.rate_limit_remaining = remaining - - def increase_rate_limit(self): - """ - Reset request limit remaining every 1 second. - """ - self.rate_limit_remaining += 1 - self.rate_limit_remaining = min( - self.rate_limit_remaining, self.rate_limit_limit) - - # Countdown of retry sleep seconds - if self.rate_limit_sleep: - self.rate_limit_sleep -= 1 - - def check_rate_limit(self): - """ - Check if rate limit is reached before sending out requests. - """ - # Already received 429 from server - if self.rate_limit_sleep: - msg = f"请求过于频繁,已被Bybit限制,请等待{self.rate_limit_sleep}秒后再试" - self.gateway.write_log(msg) - return False - # Just local request limit is reached - elif not self.rate_limit_remaining: - msg = "请求频率太高,有触发Bybit流控的风险,请稍候再试" - self.gateway.write_log(msg) - return False - else: - self.rate_limit_remaining -= 1 - return True - - def is_request_success(self, data: dict, request: "Request"): - return data['ret_code'] == 0 - - def query_contracts(self): - self.add_request("GET", - "/v2/public/tickers", - self.on_query_contracts) - - def on_query_contracts(self, data: dict, request: "Request"): - for result in data['result']: - symbol = result['symbol'] - contract = ContractData( - gateway_name=self.gateway_name, - symbol=symbol, - exchange=Exchange.BYBIT, - name=symbol, - product=Product.FUTURES, - size=1, - # todo: pricetick: Currently(2019-9-2) unable to query. - pricetick=PRICE_TICKS.get(symbol, 0.0001), - ) - self.gateway.on_contract(contract) - - def query_position(self): - self.add_request("GET", - "/position/list", - self.on_query_position - ) - - def on_query_position(self, raw_data: dict, request: "Request"): - for data in raw_data['result']: - p1, p2 = self.gateway.parse_position_data(data) - self.gateway.on_position(p1) - if p2: - self.gateway.on_position(p2) - - account = AccountData( - gateway_name=self.gateway_name, - accountid=p1.symbol, - balance=data['wallet_balance'], - frozen=data['order_margin'] - ) - self.gateway.on_account(account) - - def query_orders(self): - """ - query all orders, including stop orders - """ - self.add_request("GET", - "/open-api/order/list", - callback=self.on_query_orders, - ) - self.query_stop_orders() - - def query_stop_orders(self): - self.add_request("GET", - "/open-api/stop-order/list", - callback=self.on_query_stop_orders, - ) - - def on_query_orders(self, raw_data: dict, request: "Request"): - result = raw_data['result'] - for data in result['data']: - if data['order_status'] != 'NotActive': # UnTriggered StopOrder - order = self.gateway.parse_order_data(data) - self.gateway.on_order(order) - - def on_query_stop_orders(self, raw_data: dict, request: "Request"): - result = raw_data['result'] - for data in result['data']: - order = self.gateway.parse_stop_order_data(data) - self.gateway.on_order(order) diff --git a/vnpy/gateway/bybit/websocket_api.py b/vnpy/gateway/bybit/websocket_api.py deleted file mode 100644 index c66c7da0..00000000 --- a/vnpy/gateway/bybit/websocket_api.py +++ /dev/null @@ -1,342 +0,0 @@ -import hashlib -import hmac -import sys -import time -from collections import defaultdict -from copy import copy -from datetime import datetime -from typing import Any, Callable, Dict, TYPE_CHECKING - -from sortedcontainers import SortedSet - -from vnpy.api.websocket import WebsocketClient -from vnpy.trader.constant import (Exchange, Product) -from vnpy.trader.object import (AccountData, ContractData, SubscribeRequest, TickData, TradeData) -from .common import (DIRECTION_BYBIT2VT, generate_timestamp, local_tz, parse_datetime, sign, utc_tz) - -if TYPE_CHECKING: - from vnpy.gateway.bybit import BybitGateway - -HOST = "wss://stream.bybit.com/realtime" -TEST_HOST = "wss://stream-testnet.bybit.com/realtime" - - -class RawOrderInfo: - - def __init__(self, raw_data: dict): - self.id = raw_data['id'] - self.price = raw_data['price'] - self.side = raw_data['side'] # 'Buy', 'Sell' - self.size = raw_data.get('size', 0) - - def __lt__(self, rhs: "RawOrderInfo"): - return self.price < rhs.price - - def __eq__(self, rhs: "RawOrderInfo"): - return self.price == rhs.price - - def __hash__(self): - return self.id # price is a string and we don't known its precision - - -class BybitWebsocketApi(WebsocketClient): - """""" - - def __init__(self, gateway: "BybitGateway"): - """""" - super(BybitWebsocketApi, self).__init__() - - self.server: str = "" # REAL or TESTNET - self.gateway = gateway - self.gateway_name = gateway.gateway_name - - self.key = "" - self.secret = b"" - - self._instrument_info_data: Dict[str, dict] = defaultdict(dict) - self._orderbook_data: Dict[str, SortedSet[RawOrderInfo]] = defaultdict(SortedSet) - - self.accounts = {} - - self._topic_callbacks = {} - - @property - def ticks(self): - return self.gateway.ticks - - @property - def orders(self): - return self.gateway.orders - - def connect( - self, key: str, secret: str, server: str, proxy_host: str, proxy_port: int - ): - """""" - self.key = key - self.secret = secret.encode() - self.proxy_host = proxy_host - self.proxy_port = proxy_port - self.server = server - - self.reset_authentication() - self.start() - - def reset_authentication(self): - expires = generate_timestamp(30) - d2s = f"GET/realtime{int(expires)}" - signature = sign(self.secret, d2s.encode()) - params = f"api_key={self.key}&expires={expires}&signature={signature}" - if self.server == "REAL": - host = HOST - else: - host = TEST_HOST - url = f'{host}?{params}' - self.init(url, self.proxy_host, self.proxy_port) - - def subscribe(self, req: SubscribeRequest): - """ - Subscribe to tick data upate. - """ - symbol = req.symbol - self._subscribe_topic(f"instrument_info.100ms.{symbol}", self.on_tick) - self._subscribe_topic(f"orderBookL2_25.{symbol}", self.on_depth) - - def on_connected(self): - """""" - self.gateway.write_log("Websocket API连接且认证成功") - self._subscribe_initialize_topics() - - def on_disconnected(self): - """""" - self.gateway.write_log("Websocket API连接断开") - self.reset_authentication() - - def on_packet(self, packet: dict): - """""" - success = packet.get('success', None) - topic = packet.get('topic', None) - if success is not None: - if success is False: - self.gateway.write_log("Websocket API报错:%s" % packet["ret_msg"]) - elif topic is not None: - self._topic_callbacks[topic](topic, packet) - else: - self.gateway.write_log(f"unkonwn packet: {packet}") - - def on_error(self, exception_type: type, exception_value: Exception, tb): - """""" - msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" - self.gateway.write_log(msg) - - sys.stderr.write(self.exception_detail( - exception_type, exception_value, tb)) - - def authenticate(self): - """ - Authenticate websockey connection to subscribe private topic. - """ - expires = int(time.time()) - method = "GET" - path = "/realtime" - msg = method + path + str(expires) - signature = hmac.new( - self.secret, msg.encode(), digestmod=hashlib.sha256 - ).hexdigest() - - req = {"op": "authKey", "args": [self.key, expires, signature]} - self.send_packet(req) - - def _subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]): - """ - Subscribe to all private topics. - """ - self._topic_callbacks[topic] = callback - - req = { - "op": "subscribe", - "args": [topic], - } - self.send_packet(req) - - def _subscribe_initialize_topics(self): - """ - Subscribe to all private topics. - """ - self._subscribe_topic("order", self.on_order) - self._subscribe_topic("execution", self.on_trade) - self._subscribe_topic("position", self.on_position) - - def _get_last_tick(self, symbol: str): - tick = self.ticks.get(symbol, None) - if tick is None: - # noinspection PyTypeChecker - tick = TickData( - symbol=symbol, - exchange=Exchange.BYBIT, - name=symbol, - datetime=None, # this will be filled before this new created tick is consumed. - gateway_name=self.gateway_name, - ) - self.ticks[symbol] = tick - return tick - - def on_tick(self, topic: str, raw_data: dict): - """""" - # parse incremental data sent from server - symbol = topic[22:] - self._update_tick_incremental_data(symbol, raw_data) - - # convert dict into TickData - last_data = self._instrument_info_data[symbol] - tick = self._get_last_tick(symbol) - tick.last_price = last_data["last_price_e4"] / 10000 - tick.datetime = _parse_timestamp_e6(raw_data['timestamp_e6']) - self.gateway.on_tick(copy(tick)) - - def _update_tick_incremental_data(self, symbol, raw_data): - type_ = raw_data['type'] - data = raw_data['data'] - last_data = self._instrument_info_data[symbol] - if type_ == 'snapshot': - last_data.clear() - last_data.update(data) - elif type_ == 'delta': - updates = data['update'] - for update in updates: - assert update['symbol'] == symbol - last_data.update(update) - else: - self.gateway.write_log(f"Unknown type {type_} in websocket_api.on_tick()") - - def on_depth(self, topic: str, raw_data: dict): - """""" - symbol = topic[15:] - self._update_depth_incremental_data(symbol, raw_data) - - last_data = self._orderbook_data[symbol] - - tick = self._get_last_tick(symbol) - for n, parsed in enumerate(last_data.islice(20, 25)): - tick.__setattr__(f"bid_price_{5 - n}", parsed.price) - tick.__setattr__(f"bid_volume_{5 - n}", parsed.size) - - for n, parsed in enumerate(last_data.islice(25, 30)): - tick.__setattr__(f"ask_price_{n + 1}", parsed.price) - tick.__setattr__(f"ask_volume_{n + 1}", parsed.size) - - tick.datetime = _parse_timestamp_e6(raw_data['timestamp_e6']) - self.gateway.on_tick(copy(tick)) - - def _update_depth_incremental_data(self, symbol, raw_data): - type_ = raw_data['type'] - data = raw_data['data'] - last_data = self._orderbook_data[symbol] - if type_ == 'snapshot': - last_data.clear() - for item in data: - assert item['symbol'] == symbol - parsed = RawOrderInfo(item) - last_data.add(parsed) - elif type_ == 'delta': - deletes = data['delete'] - for delete in deletes: - assert delete['symbol'] == symbol - parsed = RawOrderInfo(delete) - last_data.remove(parsed) - - updates = data['update'] - for update in updates: - assert update['symbol'] == symbol - parsed = RawOrderInfo(update) - last_data.remove(parsed) - last_data.add(parsed) - - inserts = data['insert'] - for insert in inserts: - assert insert['symbol'] == symbol - parsed = RawOrderInfo(insert) - last_data.add(parsed) - else: - self.gateway.write_log(f"Unknown type {type_} in websocket_api.on_tick()") - - def on_trade(self, topic: str, raw_data: dict): - """""" - for data in raw_data['data']: - order_id = data['order_link_id'] - if not order_id: - order_id = data['order_id'] - trade = TradeData( - symbol=data["symbol"], - exchange=Exchange.BYBIT, - orderid=order_id, - tradeid=data['exec_id'], - direction=DIRECTION_BYBIT2VT[data["side"]], - price=data["price"], - volume=data["exec_qty"], - time=parse_datetime(data["trade_time"]), - gateway_name=self.gateway_name, - ) - - self.gateway.on_trade(trade) - - def on_order(self, topic: str, raw_data: dict): - """""" - for data in raw_data['data']: - print(data) - order = self.gateway.parse_order_data(data, 'timestamp') - - self.gateway.on_order(copy(order)) - - def on_position(self, topic: str, raw_data: dict): - """""" - for data in raw_data['data']: - p1, p2 = self.gateway.parse_position_data(data) - self.gateway.on_position(p1) - if p2: - self.gateway.on_position(p2) - - def on_account(self, d): - """""" - accountid = str(d["account"]) - account = self.accounts.get(accountid, None) - if not account: - account = AccountData(accountid=accountid, - gateway_name=self.gateway_name) - self.accounts[accountid] = account - - account.balance = d.get("marginBalance", account.balance) - account.available = d.get("availableMargin", account.available) - account.frozen = account.balance - account.available - - self.gateway.on_account(copy(account)) - - def on_contract(self, d): - """""" - if "tickSize" not in d: - return - - if not d["lotSize"]: - return - - contract = ContractData( - symbol=d["symbol"], - exchange=Exchange.BYBIT, - name=d["symbol"], - product=Product.FUTURES, - pricetick=d["tickSize"], - size=d["lotSize"], - stop_supported=True, - net_position=True, - history_data=True, - gateway_name=self.gateway_name, - ) - - self.gateway.on_contract(contract) - - def _ping(self): - super()._ping() - self.send_packet({'op': 'ping'}) - - -def _parse_timestamp_e6(timestamp: int): - return datetime.fromtimestamp(timestamp / 1_000_000, utc_tz).astimezone(tz=local_tz) From 7a2007ddbd11725e31a69479896cc1befa1fee28 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Mon, 21 Oct 2019 11:01:50 +0800 Subject: [PATCH 2/5] [Mod] simplify market data process --- examples/vn_trader/run.py | 12 +- vnpy/gateway/bybit/bybit_gateway.py | 1154 +++++++++------------------ 2 files changed, 379 insertions(+), 787 deletions(-) diff --git a/examples/vn_trader/run.py b/examples/vn_trader/run.py index 6aa6332c..32174775 100644 --- a/examples/vn_trader/run.py +++ b/examples/vn_trader/run.py @@ -31,6 +31,7 @@ from vnpy.gateway.da import DaGateway from vnpy.gateway.coinbase import CoinbaseGateway from vnpy.gateway.bitstamp import BitstampGateway from vnpy.gateway.gateios import GateiosGateway +from vnpy.gateway.bybit import BybitGateway from vnpy.app.cta_strategy import CtaStrategyApp # from vnpy.app.csv_loader import CsvLoaderApp @@ -52,7 +53,7 @@ def main(): main_engine = MainEngine(event_engine) # main_engine.add_gateway(BinanceGateway) - main_engine.add_gateway(CtpGateway) + # main_engine.add_gateway(CtpGateway) # main_engine.add_gateway(CtptestGateway) # main_engine.add_gateway(MiniGateway) # main_engine.add_gateway(SoptGateway) @@ -60,12 +61,12 @@ def main(): # main_engine.add_gateway(FemasGateway) # main_engine.add_gateway(IbGateway) # main_engine.add_gateway(FutuGateway) - main_engine.add_gateway(BitmexGateway) + # main_engine.add_gateway(BitmexGateway) # main_engine.add_gateway(TigerGateway) # main_engine.add_gateway(OesGateway) # main_engine.add_gateway(OkexGateway) # main_engine.add_gateway(HuobiGateway) - main_engine.add_gateway(BitfinexGateway) + # main_engine.add_gateway(BitfinexGateway) # main_engine.add_gateway(OnetokenGateway) # main_engine.add_gateway(OkexfGateway) # main_engine.add_gateway(HbdmGateway) @@ -76,8 +77,9 @@ def main(): # main_engine.add_gateway(OkexsGateway) # main_engine.add_gateway(DaGateway) # main_engine.add_gateway(CoinbaseGateway) - main_engine.add_gateway(BitstampGateway) - main_engine.add_gateway(GateiosGateway) + # main_engine.add_gateway(BitstampGateway) + # main_engine.add_gateway(GateiosGateway) + main_engine.add_gateway(BybitGateway) main_engine.add_app(CtaStrategyApp) main_engine.add_app(CtaBacktesterApp) diff --git a/vnpy/gateway/bybit/bybit_gateway.py b/vnpy/gateway/bybit/bybit_gateway.py index e97ad129..c5de04bf 100644 --- a/vnpy/gateway/bybit/bybit_gateway.py +++ b/vnpy/gateway/bybit/bybit_gateway.py @@ -2,57 +2,40 @@ import hashlib import hmac import time -from datetime import datetime, timedelta, timezone - -from dataclasses import dataclass -from datetime import datetime -from typing import Any, Dict, List, Tuple import sys -from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Callable from threading import Lock -from typing import List, TYPE_CHECKING, Tuple -from urllib.parse import urlencode +from copy import copy from requests import ConnectionError - -import hashlib -import hmac -import sys -import time -from collections import defaultdict -from copy import copy -from datetime import datetime -from typing import Any, Callable, Dict, TYPE_CHECKING - -from sortedcontainers import SortedSet +# from sortedcontainers import SortedSet from vnpy.api.websocket import WebsocketClient -from vnpy.trader.constant import (Exchange, Product) -from vnpy.trader.object import (AccountData, ContractData, SubscribeRequest, TickData, TradeData) -from .common import (DIRECTION_BYBIT2VT, generate_timestamp, local_tz, parse_datetime, sign, utc_tz) - - - from vnpy.api.rest import Request, RestClient -from vnpy.trader.constant import Exchange, Interval, OrderType, Product, Status -from vnpy.trader.object import AccountData, BarData, CancelRequest, ContractData, OrderRequest -from .common import (DIRECTION_VT2BYBIT, INTERVAL_VT2BYBIT, ORDER_TYPE_VT2BYBIT, generate_timestamp, - parse_datetime, sign, INTERVAL_VT2BYBIT_INT) - - -from vnpy.event import Event -from vnpy.gateway.bybit.rest_api import BybitRestApi, HistoryDataNextInfo -from vnpy.gateway.bybit.websocket_api import BybitWebsocketApi -from vnpy.trader.constant import (Exchange, Interval, OrderType) -from vnpy.trader.constant import Direction, Interval, OrderType, Status -from vnpy.trader.event import EVENT_TIMER -from vnpy.trader.gateway import BaseGateway -from vnpy.trader.object import (BarData, CancelRequest, HistoryRequest, OrderData, OrderRequest, - PositionData, SubscribeRequest, TickData) -from .common import (DIRECTION_BYBIT2VT, INTERVAL_VT2BYBIT_INT, OPPOSITE_DIRECTION, - ORDER_TYPE_BYBIT2VT, STATUS_BYBIT2VT, STOP_ORDER_STATUS_BYBIT2VT, local_tz, - parse_datetime, utc_tz) +from vnpy.trader.constant import ( + Exchange, + Interval, + OrderType, + Product, + Status, + Direction +) +from vnpy.trader.object import ( + AccountData, + BarData, + TickData, + OrderData, + TradeData, + ContractData, + PositionData, + HistoryRequest, + SubscribeRequest, + CancelRequest, + OrderRequest +) +# from vnpy.trader.event import EVENT_TIMER +from vnpy.trader.gateway import BaseGateway, LocalOrderManager STATUS_BYBIT2VT = { @@ -87,7 +70,6 @@ ORDER_TYPE_VT2BYBIT = { OrderType.LIMIT: "Limit", OrderType.MARKET: "Market", } - ORDER_TYPE_BYBIT2VT = {v: k for k, v in ORDER_TYPE_VT2BYBIT.items()} INTERVAL_VT2BYBIT = { @@ -102,6 +84,7 @@ INTERVAL_VT2BYBIT_INT = { Interval.DAILY: 60 * 24, Interval.WEEKLY: 60 * 24 * 7, } + TIMEDELTA_MAP = { Interval.MINUTE: timedelta(minutes=1), Interval.HOUR: timedelta(hours=1), @@ -109,21 +92,12 @@ TIMEDELTA_MAP = { Interval.WEEKLY: timedelta(days=7), } -if TYPE_CHECKING: - from vnpy.gateway.bybit import BybitGateway -if TYPE_CHECKING: - from vnpy.gateway.bybit import BybitGateway +REST_HOST = "https://api.bybit.com" +WEBSOCKET_HOST = "wss://stream.bybit.com/realtime" -HOST = "wss://stream.bybit.com/realtime" -TEST_HOST = "wss://stream-testnet.bybit.com/realtime" - - -def _(x): return x # noqa - - -HOST = "https://api.bybit.com" -TEST_HOST = "https://api-testnet.bybit.com" +TESTNET_REST_HOST = "https://api-testnet.bybit.com" +TESTNET_WEBSOCKET_HOST = "wss://stream-testnet.bybit.com/realtime" # asked from official developer PRICE_TICKS = { @@ -137,62 +111,33 @@ utc_tz = timezone.utc local_tz = datetime.now(timezone.utc).astimezone().tzinfo -@dataclass() -class HistoryDataInfo: - bars: List[BarData] - extra: Any - - - - -@dataclass() -class HistoryDataNextInfo: - symbol: str - interval: Interval - end: int - - -class RequestFailedException(Exception): - pass - - - - - class BybitGateway(BaseGateway): """ - VN Trader Gateway for BitMEX connection. + VN Trader Gateway for ByBit connection. """ default_setting = { - "APIKey": "", - "PrivateKey": "", - "会话数": 3, + "ID": "", + "Secret": "", "服务器": ["REAL", "TESTNET"], "代理地址": "", "代理端口": "", } - HISTORY_RECORD_PER_REQUEST = 200 # # of records per history request exchanges = [Exchange.BYBIT] def __init__(self, event_engine): """Constructor""" - super(BybitGateway, self).__init__(event_engine, "BYBIT") + super().__init__(event_engine, "BYBIT") + self.order_manager = LocalOrderManager(self) self.rest_api = BybitRestApi(self) self.ws_api = BybitWebsocketApi(self) - self.ticks: Dict[str, TickData] = {} - self.orders: Dict[str, OrderData] = {} - self.local2sys_map: Dict[str, str] = {} - event_engine.register(EVENT_TIMER, self.process_timer_event) - def connect(self, setting: dict): """""" key = setting["ID"] secret = setting["Secret"] - session_number = setting["会话数"] server = setting["服务器"] proxy_host = setting["代理地址"] proxy_port = setting["代理端口"] @@ -202,13 +147,8 @@ class BybitGateway(BaseGateway): else: proxy_port = 0 - self.rest_api.connect(key, secret, session_number, - server, proxy_host, proxy_port) - + self.rest_api.connect(key, secret, server, proxy_host, proxy_port) self.ws_api.connect(key, secret, server, proxy_host, proxy_port) - self.rest_api.query_orders() - self.query_contracts() - self.query_account() def subscribe(self, req: SubscribeRequest): """""" @@ -222,269 +162,64 @@ class BybitGateway(BaseGateway): """""" self.rest_api.cancel_order(req) - def query_contracts(self): - self.rest_api.query_contracts() - def query_account(self): """""" - self.rest_api.query_position() + pass def query_position(self): """""" - self.rest_api.query_position() - - def query_first_history(self, - symbol: str, - interval: Interval, - start: datetime, - ) -> Tuple[List[BarData], "HistoryDataNextInfo"]: - - # datetime for a bar is close_time - # we got open_time from API. - adjustment = INTERVAL_VT2BYBIT_INT[interval] - - utc_time = start.replace(tzinfo=local_tz).astimezone(tz=utc_tz) - return self.rest_api.query_history( - symbol=symbol, - interval=interval, - - # todo: vnpy: shall all datetime object use tzinfo? - start=int(utc_time.timestamp()) - adjustment, - ) - - def query_next_history(self, - next_info: Any, - ): - data: "HistoryDataNextInfo" = next_info - return self.rest_api.query_history( - symbol=data.symbol, - interval=data.interval, - start=data.end, - ) + pass def query_history(self, req: HistoryRequest): - """ - todo: vnpy: download in parallel - todo: vnpy: use yield to simplify logic - :raises RequestFailedException: if server reply an error. - Any Exception might be raised from requests.request: network error. - """ - # todo: this function: test rate limit - history = [] - - symbol = req.symbol - interval = req.interval - start = req.start - - bars, next_info = self.query_first_history( - symbol=symbol, - interval=interval, - start=start, - ) - msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{bars[0].datetime} - {bars[-1].datetime}" - self.write_log(msg) - history.extend(bars) - while True: - bars, next_info = self.query_next_history(next_info) - - msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{bars[0].datetime} - {bars[-1].datetime}" - self.write_log(msg) - - # Break if total data count less than (latest date collected) - if len(bars) < self.HISTORY_RECORD_PER_REQUEST: - break - history.extend(bars) - return history + """""" + return self.rest_api.query_history() def close(self): """""" self.rest_api.stop() self.ws_api.stop() - def process_timer_event(self, event: Event): - """""" - self.rest_api.increase_rate_limit() - if self.rest_api.alive: - self.query_account() - self.rest_api.query_stop_orders() - - def write_log(self, msg: str): - return super().write_log(msg) - - def parse_order_data(self, data: dict, time_key: str = 'updated_at'): - """ - Parse order data from json dict. - todo: gateway: put this function to another place. - :note: this method will not fill 'time' record. Fill it by yourself. - """ - # prefer local_id as order_id - order_id = data["order_link_id"] - sys_id = data['order_id'] - if not order_id: - order_id = sys_id - - # saving mapping from order_id to sys_id - self.local2sys_map[order_id] = sys_id - - order = self.orders.get(order_id, None) - time = parse_datetime(data[time_key]) - - # filter outdated order - if order is not None and time < order.time: # string cmp is ok here. - return None - - # if order not exist(created outside this client) - # create it. - if order is None: - order = OrderData( - symbol=data["symbol"], - exchange=Exchange.BYBIT, - type=ORDER_TYPE_BYBIT2VT[data["order_type"]], - orderid=order_id, - direction=DIRECTION_BYBIT2VT[data["side"]], - price=data["price"], - volume=data["qty"], - time=time, - gateway_name=self.gateway_name, - ) - self.orders[order.orderid] = order - order.traded = data.get("cum_exec_qty", order.traded) - if 'order_status' in data: - order.status = STATUS_BYBIT2VT[data["order_status"]] - return order - - def parse_stop_order_data(self, data): - """ - Parse order data from json dict. - todo: put this function to another place. - :note: this method will not fill 'time' record. Fill it by yourself. - """ - # prefer local_id as order_id - order_id = data["order_link_id"] - sys_id = data['stop_order_id'] - if not order_id: - order_id = sys_id - - # saving mapping from order_id to sys_id - self.local2sys_map[order_id] = sys_id - - order = self.orders.get(order_id, None) - - # if order not exist(created outside this client) - # create it. - if not order: - order = OrderData( - symbol=data["symbol"], - exchange=Exchange.BYBIT, - type=OrderType.STOP, - orderid=order_id, - direction=DIRECTION_BYBIT2VT[data["side"]], - price=data["price"], - volume=data["qty"], - # this should be filled manually - time=parse_datetime(data['updated_at']), - gateway_name=self.gateway_name, - ) - self.orders[order.orderid] = order - if 'stop_order_status' in data: - # status = STATUS_BYBIT2VT.get(data["order_status"], None) - # if status is None: - # status = STOP_ORDER_STATUS_BYBIT2VT[data["order_status"]] - # order.status = status - order.status = STOP_ORDER_STATUS_BYBIT2VT[data["stop_order_status"]] - return order - - def orderid2sys(self, order_id: str): - """ - Convert order_id to sys_id - """ - return self.local2sys_map[order_id] - - def parse_position_data(self, data): - position = PositionData( - gateway_name=self.gateway_name, - symbol=data["symbol"], - exchange=Exchange.BYBIT, - direction=DIRECTION_BYBIT2VT[data['side']], - volume=data['size'], - price=data['entry_price'] - ) - - # clear opposite direction if necessary - if position.volume: - pos2 = PositionData( - gateway_name=position.gateway_name, - symbol=position.symbol, - exchange=Exchange.BYBIT, - direction=OPPOSITE_DIRECTION[position.direction], - volume=0, - price=0, - ) - return position, pos2 - return position, None - - def on_order(self, order: OrderData): - """ - Since WebSocket and RestClient will push the same orders asynchronously and separately - outdated orders should be filtered. - Outdated orders is filtered by parse_xxx_order_data(), returning None. - """ - if order is not None: - super().on_order(order) - class BybitRestApi(RestClient): """ - BitMEX REST API + ByBit REST API """ - def __init__(self, gateway: "BybitGateway"): + def __init__(self, gateway: BybitGateway): """""" - super(BybitRestApi, self).__init__() + super().__init__() self.gateway = gateway self.gateway_name = gateway.gateway_name + self.order_manager = gateway.order_manager self.key = "" self.secret = b"" self.order_count = 1_000_000 self.order_count_lock = Lock() - self.connect_time = 0 - # Use 60 by default, and will update after first request - self.rate_limit_limit = 60 - self.rate_limit_remaining = 60 - self.rate_limit_sleep = 0 - - @property - def ticks(self): - return self.gateway.ticks - - def sign(self, request): + def sign(self, request: Request): """ - Generate BitMEX signature. + Generate ByBit signature. """ - if request.method == "GET": - api_params = request.params # dict 2 sign + api_params = request.params if api_params is None: api_params = request.params = {} - else: # POST + else: api_params = request.data if api_params is None: api_params = request.data = {} - expires = generate_timestamp(0) + api_params["api_key"] = self.key + api_params["recv_window"] = 30 * 1000 + api_params["timestamp"] = generate_timestamp(-1) - api_params['api_key'] = self.key - api_params['recv_window'] = 30 * 1000 - api_params['timestamp'] = expires - - data2sign = urlencode({k: api_params[k] for k in sorted(api_params)}) + data2sign = "&".join([f"{k}={v}" for k, v in sorted(api_params.items())]) signature = sign(self.secret, data2sign.encode()) - api_params['sign'] = signature + api_params["sign"] = signature return request @@ -492,7 +227,6 @@ class BybitRestApi(RestClient): self, key: str, secret: str, - session_number: int, server: str, proxy_host: str, proxy_port: int, @@ -508,26 +242,20 @@ class BybitRestApi(RestClient): ) if server == "REAL": - self.init(HOST, proxy_host, proxy_port) + self.init(REST_HOST, proxy_host, proxy_port) else: - self.init(TEST_HOST, proxy_host, proxy_port) + self.init(TESTNET_REST_HOST, proxy_host, proxy_port) - self.start(session_number) + self.start(3) + self.gateway.write_log("REST API启动成功") - self.gateway.write_log(_("REST API启动成功")) - - def _new_order_id(self): - """""" - with self.order_count_lock: - self.order_count += 1 - return self.order_count + self.query_contract() + self.query_order() + self.query_position() def send_order(self, req: OrderRequest): """""" - if not self.check_rate_limit(): - return "" - - order_id = str(self.connect_time + self._new_order_id()) + order_id = self.order_manager.new_local_orderid() symbol = req.symbol data = { @@ -542,138 +270,58 @@ class BybitRestApi(RestClient): order.time = datetime.now().isoformat()[11:19] # Only add price for limit order. - if req.type != OrderType.STOP: - data["order_type"] = ORDER_TYPE_VT2BYBIT[req.type] - data["price"] = req.price - self.add_request( - "POST", - "/open-api/order/create", - callback=self.on_send_order, - data=data, - extra=order, - on_failed=self.on_send_order_failed, - on_error=self.on_send_order_error, - ) - else: - assert self.ticks[symbol], _("Subscribe This Symbol before sending StopOrder.") - last_price = self.ticks[symbol].last_price - data["order_type"] = 'Market' - data["stop_px"] = req.price - data["base_price"] = last_price - self.add_request( - "POST", - "/open-api/stop-order/create", - callback=self.on_send_stop_order, - data=data, - extra=order, - on_failed=self.on_send_order_failed, - on_error=self.on_send_order_error, - ) + data["order_type"] = ORDER_TYPE_VT2BYBIT[req.type] + data["price"] = req.price + self.add_request( + "POST", + "/open-api/order/create", + callback=self.on_send_order, + data=data, + extra=order, + on_failed=self.on_send_order_failed, + on_error=self.on_send_order_error, + ) self.gateway.on_order(order) return order.vt_orderid def cancel_order(self, req: CancelRequest): """""" - if not self.check_rate_limit(): - return - sys_id = self.gateway.orderid2sys(req.orderid) - order = self.gateway.orders[req.orderid] - - if order.type != OrderType.STOP: - path = "/open-api/order/cancel" - key = "order_id" - callback = self.on_cancel_order - else: - path = "/open-api/stop-order/cancel" - key = "stop_order_id" - callback = self.on_cancel_stop_order + sys_orderid = self.order_manager.get_sys_orderid(req.orderid) + data = { + "order_id": sys_orderid, + "symbol": req.symbol, + } self.add_request( "POST", - path, - callback=callback, - data={ - key: sys_id, - "symbol": req.symbol, - }, - on_error=self.on_cancel_order_error, - extra=order, + path="/open-api/order/cancel", + data=data, + callback=self.on_cancel_order ) - def query_history(self, - symbol: str, - interval: Interval, - start: int, # unix timestamp - limit: int = None, - ) -> Tuple[List[BarData], "HistoryDataNextInfo"]: - """ - Get history data synchronously. - """ - if limit is None: - limit = self.gateway.HISTORY_RECORD_PER_REQUEST - - bars = [] - - # datetime for a bar is close_time - # we got open_time from API. - adjustment = INTERVAL_VT2BYBIT_INT[interval] - - params = { - "interval": INTERVAL_VT2BYBIT[interval], - "symbol": symbol, - "limit": limit, - "from": start, - } - - # todo: RestClient: return RestClient.Request object instead of requests.Response. - resp = self.request( - "GET", - "/v2/public/kline/list", - params=params - ) - - # Break if request failed with other status code - raw_data = resp.json() - if not self.is_request_success(raw_data, None): - msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" - self.gateway.write_log(msg) - raise RequestFailedException(msg) - result = raw_data['result'] - for data in result: - open_time = int(data["open_time"]) - close_time = open_time + adjustment - close_dt = datetime.fromtimestamp(close_time) - bar = BarData( - symbol=symbol, - exchange=Exchange.BYBIT, - datetime=close_dt, - interval=interval, - volume=data["volume"], - open_price=data["open"], - high_price=data["high"], - low_price=data["low"], - close_price=data["close"], - gateway_name=self.gateway_name - ) - bars.append(bar) - - end = result[-1]["open_time"] - return bars, HistoryDataNextInfo(symbol, interval, end) + def query_history( + self, + symbol: str, + interval: Interval, + start: int, # unix timestamp + limit: int = None, + ) -> List[BarData]: + """""" + pass def on_send_order_failed(self, status_code: int, request: Request): """ Callback when sending order failed on server. """ - data = request.response.json() - self.update_rate_limit(data) - order = request.extra order.status = Status.REJECTED self.gateway.on_order(order) - msg = f"委托失败,错误代码:{data['ret_code']}, 错误信息:{data['ret_msg']}" - + data = request.response.json() + error_msg = data["ret_msg"] + error_code = data["ret_code"] + msg = f"委托失败,错误代码:{error_code}, 错误信息:{error_msg}" self.gateway.write_log(msg) def on_send_order_error( @@ -690,23 +338,13 @@ class BybitRestApi(RestClient): if not issubclass(exception_type, ConnectionError): self.on_error(exception_type, exception_value, tb, request) - def on_send_order(self, raw_data: dict, request: Request): + def on_send_order(self, data: dict, request: Request): """""" - - data = raw_data['result'] - order = self.gateway.parse_order_data(data) - self.gateway.on_order(order) - - self.update_rate_limit(raw_data) - - def on_send_stop_order(self, raw_data: dict, request: Request): - """""" - data = raw_data['result'] - order = self.gateway.parse_stop_order_data(data) - order.time = parse_datetime(data['updated_at']) - self.gateway.on_order(order) - - self.update_rate_limit(raw_data) + result = data["result"] + self.order_manager.update_orderid_map( + result["order_link_id"], + result["order_id"] + ) def on_cancel_order_error( self, exception_type: type, exception_value: Exception, tb, request: Request @@ -718,38 +356,21 @@ class BybitRestApi(RestClient): if not issubclass(exception_type, ConnectionError): self.on_error(exception_type, exception_value, tb, request) - def on_cancel_order(self, raw_data: dict, request: Request): + def on_cancel_order(self, data: dict, request: Request): """""" - data: dict = raw_data['result'] - - order = self.gateway.parse_order_data(data) - self.gateway.on_order(order) - self.update_rate_limit(data) - - def on_cancel_stop_order(self, raw_data: dict, request: Request): - """""" - data: dict = raw_data['result'] - order = self.gateway.parse_stop_order_data(data) - order.time = parse_datetime(data['updated_at']) - self.gateway.on_order(order) - self.update_rate_limit(data) + pass def on_failed(self, status_code: int, request: Request): """ Callback to handle request failed. """ data = request.response.json() - self._handle_error_response(data, request) - def _handle_error_response(self, data, request, operation_name: str = None): - if operation_name is None: - operation_name = request.path - - self.update_rate_limit(data) error_msg = data["ret_msg"] - error_code = data['ret_code'] - msg = f"请求{operation_name}失败,状态码:{request.status},错误代码:{error_code}, 信息:{error_msg}" - msg += f'\n{request}' + error_code = data["ret_code"] + + msg = f"请求失败,状态码:{request.status},错误代码:{error_code}, 信息:{error_msg}" + self.gateway.write_log(msg) def on_error( @@ -765,130 +386,150 @@ class BybitRestApi(RestClient): self.exception_detail(exception_type, exception_value, tb, request) ) - def update_rate_limit(self, data: dict): - """ - Update current request limit remaining status. - :param data: - """ - remaining = data.get('rate_limit_status', None) - if remaining is not None: - self.rate_limit_remaining = remaining + def query_contract(self): + """""" + self.add_request( + "GET", + "/v2/public/symbols", + self.on_query_contract + ) - def increase_rate_limit(self): - """ - Reset request limit remaining every 1 second. - """ - self.rate_limit_remaining += 1 - self.rate_limit_remaining = min( - self.rate_limit_remaining, self.rate_limit_limit) - - # Countdown of retry sleep seconds - if self.rate_limit_sleep: - self.rate_limit_sleep -= 1 - - def check_rate_limit(self): - """ - Check if rate limit is reached before sending out requests. - """ - # Already received 429 from server - if self.rate_limit_sleep: - msg = f"请求过于频繁,已被Bybit限制,请等待{self.rate_limit_sleep}秒后再试" + def check_error(self, name: str, data: dict): + """""" + if data["ret_code"]: + error_code = data["ret_code"] + error_msg = data["ret_msg"] + msg = f"{name}失败,错误代码:{error_code},信息:{error_msg}" self.gateway.write_log(msg) - return False - # Just local request limit is reached - elif not self.rate_limit_remaining: - msg = "请求频率太高,有触发Bybit流控的风险,请稍候再试" - self.gateway.write_log(msg) - return False - else: - self.rate_limit_remaining -= 1 return True - def is_request_success(self, data: dict, request: "Request"): - return data['ret_code'] == 0 + return False - def query_contracts(self): - self.add_request("GET", - "/v2/public/tickers", - self.on_query_contracts) + def on_query_contract(self, data: dict, request: Request): + """""" + if self.check_error("查询合约", data): + return - def on_query_contracts(self, data: dict, request: "Request"): - for result in data['result']: - symbol = result['symbol'] + for d in data["result"]: contract = ContractData( gateway_name=self.gateway_name, - symbol=symbol, + symbol=d["name"], exchange=Exchange.BYBIT, - name=symbol, + name=d["name"], product=Product.FUTURES, size=1, - # todo: pricetick: Currently(2019-9-2) unable to query. - pricetick=PRICE_TICKS.get(symbol, 0.0001), + pricetick=d["price_filter"]["tick_size"], + min_volume=d["lot_size_filter"]["qty_step"] ) self.gateway.on_contract(contract) - def query_position(self): - self.add_request("GET", - "/position/list", - self.on_query_position - ) + self.gateway.write_log("合约信息查询成功") - def on_query_position(self, raw_data: dict, request: "Request"): - for data in raw_data['result']: - p1, p2 = self.gateway.parse_position_data(data) - self.gateway.on_position(p1) - if p2: - self.gateway.on_position(p2) + def query_position(self): + """""" + self.add_request( + "GET", + "/position/list", + self.on_query_position + ) + + def on_query_position(self, data: dict, request: Request): + """""" + if self.check_error("查询持仓", data): + return + + for d in data["result"]: + if d["side"] == "Buy": + volume = d["size"] + else: + volume = -d["size"] + + position = PositionData( + symbol=d["symbol"], + exchange=Exchange.BYBIT, + direction=Direction.NET, + volume=volume, + price=d["entry_price"], + pnl=d["unrealised_pnl"], + gateway_name=self.gateway_name + ) + self.gateway.on_position(position) account = AccountData( + accountid=d["symbol"].replace("USD", ""), + balance=d["wallet_balance"], + frozen=d["order_margin"], gateway_name=self.gateway_name, - accountid=p1.symbol, - balance=data['wallet_balance'], - frozen=data['order_margin'] ) self.gateway.on_account(account) - def query_orders(self): - """ - query all orders, including stop orders - """ - self.add_request("GET", - "/open-api/order/list", - callback=self.on_query_orders, - ) - self.query_stop_orders() + def query_order(self, page: int = 1): + """""" + params = { + "limit": 50, + "page": page, + "order_status": "Created,New,PartiallyFilled" + } - def query_stop_orders(self): - self.add_request("GET", - "/open-api/stop-order/list", - callback=self.on_query_stop_orders, - ) + self.add_request( + "GET", + "/open-api/order/list", + callback=self.on_query_order, + params=params + ) - def on_query_orders(self, raw_data: dict, request: "Request"): - result = raw_data['result'] - for data in result['data']: - if data['order_status'] != 'NotActive': # UnTriggered StopOrder - order = self.gateway.parse_order_data(data) - self.gateway.on_order(order) + def on_query_order(self, data: dict, request: Request): + """""" + if self.check_error("查询委托", data): + return - def on_query_stop_orders(self, raw_data: dict, request: "Request"): - result = raw_data['result'] - for data in result['data']: - order = self.gateway.parse_stop_order_data(data) + result = data["result"] + if not result: + self.gateway.write_log("委托信息查询成功") + return + + for d in result["data"]: + sys_orderid = d["order_id"] + + # Use sys_orderid as local_orderid when + # order placed from other source + local_orderid = d["order_link_id"] + if not local_orderid: + local_orderid = sys_orderid + + self.order_manager.update_orderid_map( + local_orderid, + sys_orderid + ) + + order = OrderData( + symbol=d["symbol"], + exchange=Exchange.BYBIT, + orderid=local_orderid, + type=ORDER_TYPE_BYBIT2VT[d["order_type"]], + direction=DIRECTION_BYBIT2VT[d["side"]], + price=d["price"], + volume=d["qty"], + traded=d["cum_exec_qty"], + status=STATUS_BYBIT2VT[d["order_status"]], + time=d["created_at"], + gateway_name=self.gateway_name + ) self.gateway.on_order(order) - - - + if result["current_page"] != result["last_page"]: + self.query_order(result["current_page"] + 1) + else: + self.gateway.write_log("委托信息查询成功") class RawOrderInfo: - def __init__(self, raw_data: dict): - self.id = raw_data['id'] - self.price = raw_data['price'] - self.side = raw_data['side'] # 'Buy', 'Sell' - self.size = raw_data.get('size', 0) + def __init__(self, data: dict): + self.id = data["id"] + self.price = data["price"] + self.side = data["side"] # "Buy", "Sell" + self.size = data.get("size", 0) def __lt__(self, rhs: "RawOrderInfo"): return self.price < rhs.price @@ -897,37 +538,29 @@ class RawOrderInfo: return self.price == rhs.price def __hash__(self): - return self.id # price is a string and we don't known its precision + return self.id # price is a string and we don"t known its precision class BybitWebsocketApi(WebsocketClient): """""" - def __init__(self, gateway: "BybitGateway"): + def __init__(self, gateway: BybitGateway): """""" - super(BybitWebsocketApi, self).__init__() + super().__init__() - self.server: str = "" # REAL or TESTNET self.gateway = gateway self.gateway_name = gateway.gateway_name + self.order_manager = gateway.order_manager self.key = "" self.secret = b"" + self.server: str = "" # REAL or TESTNET - self._instrument_info_data: Dict[str, dict] = defaultdict(dict) - self._orderbook_data: Dict[str, SortedSet[RawOrderInfo]] = defaultdict(SortedSet) + self.callbacks: Dict[str, Callable] = {} + self.ticks: Dict[str, TickData] = {} - self.accounts = {} - - self._topic_callbacks = {} - - @property - def ticks(self): - return self.gateway.ticks - - @property - def orders(self): - return self.gateway.orders + self.symbol_bids: Dict[str, dict] = {} + self.symbol_asks: Dict[str, dict] = {} def connect( self, key: str, secret: str, server: str, proxy_host: str, proxy_port: int @@ -939,50 +572,72 @@ class BybitWebsocketApi(WebsocketClient): self.proxy_port = proxy_port self.server = server - self.reset_authentication() + if self.server == "REAL": + url = WEBSOCKET_HOST + else: + url = TESTNET_WEBSOCKET_HOST + + self.init(url, self.proxy_host, self.proxy_port) self.start() - def reset_authentication(self): + def login(self): + """""" expires = generate_timestamp(30) - d2s = f"GET/realtime{int(expires)}" - signature = sign(self.secret, d2s.encode()) - params = f"api_key={self.key}&expires={expires}&signature={signature}" - if self.server == "REAL": - host = HOST - else: - host = TEST_HOST - url = f'{host}?{params}' - self.init(url, self.proxy_host, self.proxy_port) + msg = f"GET/realtime{int(expires)}" + signature = sign(self.secret, msg.encode()) + + req = { + "op": "auth", + "args": [self.key, expires, signature] + } + self.send_packet(req) def subscribe(self, req: SubscribeRequest): """ Subscribe to tick data upate. """ - symbol = req.symbol - self._subscribe_topic(f"instrument_info.100ms.{symbol}", self.on_tick) - self._subscribe_topic(f"orderBookL2_25.{symbol}", self.on_depth) + tick = TickData( + symbol=req.symbol, + exchange=req.exchange, + datetime=datetime.now(), + name=req.symbol, + gateway_name=self.gateway_name + ) + self.ticks[req.symbol] = tick + + self.subscribe_topic(f"instrument_info.100ms.{req.symbol}", self.on_tick) + self.subscribe_topic(f"orderBookL2_25.{req.symbol}", self.on_depth) def on_connected(self): """""" - self.gateway.write_log("Websocket API连接且认证成功") - self._subscribe_initialize_topics() + self.gateway.write_log("Websocket API连接成功") + self.login() def on_disconnected(self): """""" self.gateway.write_log("Websocket API连接断开") - self.reset_authentication() def on_packet(self, packet: dict): """""" - success = packet.get('success', None) - topic = packet.get('topic', None) - if success is not None: - if success is False: - self.gateway.write_log("Websocket API报错:%s" % packet["ret_msg"]) - elif topic is not None: - self._topic_callbacks[topic](topic, packet) + print(packet) + if "topic" not in packet: + op = packet["request"]["op"] + if op == "auth": + self.on_login(packet) else: - self.gateway.write_log(f"unkonwn packet: {packet}") + channel = packet["topic"] + callback = self.callbacks[channel] + callback(packet) + + # success = packet.get("success", None) + # topic = packet.get("topic", None) + # if success is not None: + # if success is False: + # self.gateway.write_log("Websocket API报错:%s" % packet["ret_msg"]) + # elif topic is not None: + # self.callbacks[topic](topic, packet) + # else: + # self.gateway.write_log(f"unkonwn packet: {packet}") def on_error(self, exception_type: type, exception_value: Exception, tb): """""" @@ -992,26 +647,23 @@ class BybitWebsocketApi(WebsocketClient): sys.stderr.write(self.exception_detail( exception_type, exception_value, tb)) - def authenticate(self): - """ - Authenticate websockey connection to subscribe private topic. - """ - expires = int(time.time()) - method = "GET" - path = "/realtime" - msg = method + path + str(expires) - signature = hmac.new( - self.secret, msg.encode(), digestmod=hashlib.sha256 - ).hexdigest() + def on_login(self, packet: dict): + """""" + success = packet.get("success", False) + if success: + self.gateway.write_log("Websocket API登录成功") - req = {"op": "authKey", "args": [self.key, expires, signature]} - self.send_packet(req) + self.subscribe_topic("order", self.on_order) + self.subscribe_topic("execution", self.on_trade) + self.subscribe_topic("position", self.on_position) + else: + self.gateway.write_log("Websocket API登录失败") - def _subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]): + def subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]): """ Subscribe to all private topics. """ - self._topic_callbacks[topic] = callback + self.callbacks[topic] = callback req = { "op": "subscribe", @@ -1019,118 +671,101 @@ class BybitWebsocketApi(WebsocketClient): } self.send_packet(req) - def _subscribe_initialize_topics(self): - """ - Subscribe to all private topics. - """ - self._subscribe_topic("order", self.on_order) - self._subscribe_topic("execution", self.on_trade) - self._subscribe_topic("position", self.on_position) - - def _get_last_tick(self, symbol: str): - tick = self.ticks.get(symbol, None) - if tick is None: - # noinspection PyTypeChecker - tick = TickData( - symbol=symbol, - exchange=Exchange.BYBIT, - name=symbol, - datetime=None, # this will be filled before this new created tick is consumed. - gateway_name=self.gateway_name, - ) - self.ticks[symbol] = tick - return tick - - def on_tick(self, topic: str, raw_data: dict): + def on_tick(self, packet: dict): """""" - # parse incremental data sent from server - symbol = topic[22:] - self._update_tick_incremental_data(symbol, raw_data) + topic = packet["topic"] + type_ = packet["type"] + data = packet["data"] + timestamp = packet["timestamp_e6"] - # convert dict into TickData - last_data = self._instrument_info_data[symbol] - tick = self._get_last_tick(symbol) - tick.last_price = last_data["last_price_e4"] / 10000 - tick.datetime = _parse_timestamp_e6(raw_data['timestamp_e6']) + symbol = topic.replace("instrument_info.100ms.", "") + tick = self.ticks[symbol] + + if type_ == "snapshot": + tick.last_price = data["last_price_e4"] / 10000 + tick.volume = data["volume_24h"] + else: + update = data["update"][0] + + if "last_price_e4" in update: + tick.last_price = update["last_price_e4"] / 10000 + + if "volume_24h" in update: + tick.volume = update["volume_24h"] + + tick.datetime = datetime.fromtimestamp(timestamp / 1_000_000) self.gateway.on_tick(copy(tick)) - def _update_tick_incremental_data(self, symbol, raw_data): - type_ = raw_data['type'] - data = raw_data['data'] - last_data = self._instrument_info_data[symbol] - if type_ == 'snapshot': - last_data.clear() - last_data.update(data) - elif type_ == 'delta': - updates = data['update'] - for update in updates: - assert update['symbol'] == symbol - last_data.update(update) - else: - self.gateway.write_log(f"Unknown type {type_} in websocket_api.on_tick()") - - def on_depth(self, topic: str, raw_data: dict): + def on_depth(self, packet: dict): """""" - symbol = topic[15:] - self._update_depth_incremental_data(symbol, raw_data) + topic = packet["topic"] + type_ = packet["type"] + data = packet["data"] + timestamp = packet["timestamp_e6"] - last_data = self._orderbook_data[symbol] + # Update depth data into dict buf + symbol = topic.replace("orderBookL2_25.", "") + tick = self.ticks[symbol] + bids = self.symbol_bids.setdefault(symbol, {}) + asks = self.symbol_asks.setdefault(symbol, {}) - tick = self._get_last_tick(symbol) - for n, parsed in enumerate(last_data.islice(20, 25)): - tick.__setattr__(f"bid_price_{5 - n}", parsed.price) - tick.__setattr__(f"bid_volume_{5 - n}", parsed.size) + if type_ == "snapshot": + for d in data: + price = float(d["price"]) - for n, parsed in enumerate(last_data.islice(25, 30)): - tick.__setattr__(f"ask_price_{n + 1}", parsed.price) - tick.__setattr__(f"ask_volume_{n + 1}", parsed.size) + if d["side"] == "Buy": + bids[price] = d + else: + asks[price] = d + else: + for d in data["delete"]: + price = float(d["price"]) + if d["side"] == "Buy": + bids.pop(price) + else: + asks.pop(price) - tick.datetime = _parse_timestamp_e6(raw_data['timestamp_e6']) + for d in (data["update"] + data["insert"]): + price = float(d["price"]) + if d["side"] == "Buy": + bids[price] = d + else: + asks[price] = d + + # Calculate 1-5 bid/ask depth + bid_keys = list(bids.keys()) + bid_keys.sort(reverse=True) + + ask_keys = list(asks.keys()) + ask_keys.sort() + + for i in range(5): + n = i + 1 + + bid_price = bid_keys[i] + bid_data = bids[bid_price] + ask_price = ask_keys[i] + ask_data = asks[ask_price] + + setattr(tick, f"bid_price_{n}", bid_price) + setattr(tick, f"bid_volume_{n}", bid_data["size"]) + setattr(tick, f"ask_price_{n}", ask_price) + setattr(tick, f"ask_volume_{n}", ask_data["size"]) + + tick.datetime = datetime.fromtimestamp(timestamp / 1_000_000) self.gateway.on_tick(copy(tick)) - def _update_depth_incremental_data(self, symbol, raw_data): - type_ = raw_data['type'] - data = raw_data['data'] - last_data = self._orderbook_data[symbol] - if type_ == 'snapshot': - last_data.clear() - for item in data: - assert item['symbol'] == symbol - parsed = RawOrderInfo(item) - last_data.add(parsed) - elif type_ == 'delta': - deletes = data['delete'] - for delete in deletes: - assert delete['symbol'] == symbol - parsed = RawOrderInfo(delete) - last_data.remove(parsed) - - updates = data['update'] - for update in updates: - assert update['symbol'] == symbol - parsed = RawOrderInfo(update) - last_data.remove(parsed) - last_data.add(parsed) - - inserts = data['insert'] - for insert in inserts: - assert insert['symbol'] == symbol - parsed = RawOrderInfo(insert) - last_data.add(parsed) - else: - self.gateway.write_log(f"Unknown type {type_} in websocket_api.on_tick()") - - def on_trade(self, topic: str, raw_data: dict): + def on_trade(self, topic: str, data: dict): """""" - for data in raw_data['data']: - order_id = data['order_link_id'] + for data in data["data"]: + order_id = data["order_link_id"] if not order_id: - order_id = data['order_id'] + order_id = data["order_id"] trade = TradeData( symbol=data["symbol"], exchange=Exchange.BYBIT, orderid=order_id, - tradeid=data['exec_id'], + tradeid=data["exec_id"], direction=DIRECTION_BYBIT2VT[data["side"]], price=data["price"], volume=data["exec_qty"], @@ -1140,72 +775,27 @@ class BybitWebsocketApi(WebsocketClient): self.gateway.on_trade(trade) - def on_order(self, topic: str, raw_data: dict): + def on_order(self, topic: str, data: dict): """""" - for data in raw_data['data']: + for data in data["data"]: print(data) - order = self.gateway.parse_order_data(data, 'timestamp') + order = self.gateway.parse_order_data(data, "timestamp") self.gateway.on_order(copy(order)) - def on_position(self, topic: str, raw_data: dict): + def on_position(self, topic: str, data: dict): """""" - for data in raw_data['data']: + for data in data["data"]: p1, p2 = self.gateway.parse_position_data(data) self.gateway.on_position(p1) if p2: self.gateway.on_position(p2) - def on_account(self, d): - """""" - accountid = str(d["account"]) - account = self.accounts.get(accountid, None) - if not account: - account = AccountData(accountid=accountid, - gateway_name=self.gateway_name) - self.accounts[accountid] = account - - account.balance = d.get("marginBalance", account.balance) - account.available = d.get("availableMargin", account.available) - account.frozen = account.balance - account.available - - self.gateway.on_account(copy(account)) - - def on_contract(self, d): - """""" - if "tickSize" not in d: - return - - if not d["lotSize"]: - return - - contract = ContractData( - symbol=d["symbol"], - exchange=Exchange.BYBIT, - name=d["symbol"], - product=Product.FUTURES, - pricetick=d["tickSize"], - size=d["lotSize"], - stop_supported=True, - net_position=True, - history_data=True, - gateway_name=self.gateway_name, - ) - - self.gateway.on_contract(contract) - - def _ping(self): - super()._ping() - self.send_packet({'op': 'ping'}) - def _parse_timestamp_e6(timestamp: int): return datetime.fromtimestamp(timestamp / 1_000_000, utc_tz).astimezone(tz=local_tz) - - - def generate_timestamp(expire_after: float = 30) -> int: """ :param expire_after: expires in seconds. From 2257f8318c2db19f38ce51deeb9c95458880a8e8 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Mon, 21 Oct 2019 15:38:28 +0800 Subject: [PATCH 3/5] [Mod] add query history data function --- requirements.txt | 1 - vnpy/gateway/bybit/bybit_gateway.py | 273 +++++++++++++++++----------- vnpy/trader/gateway.py | 6 +- 3 files changed, 165 insertions(+), 115 deletions(-) diff --git a/requirements.txt b/requirements.txt index 26a5e53b..3d7a0271 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,5 +18,4 @@ ta-lib ibapi deap pyzmq -sortedcontainers wmi diff --git a/vnpy/gateway/bybit/bybit_gateway.py b/vnpy/gateway/bybit/bybit_gateway.py index c5de04bf..fd9b1ddd 100644 --- a/vnpy/gateway/bybit/bybit_gateway.py +++ b/vnpy/gateway/bybit/bybit_gateway.py @@ -3,13 +3,12 @@ import hashlib import hmac import time import sys -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from typing import Any, Dict, List, Callable from threading import Lock from copy import copy from requests import ConnectionError -# from sortedcontainers import SortedSet from vnpy.api.websocket import WebsocketClient from vnpy.api.rest import Request, RestClient @@ -34,7 +33,7 @@ from vnpy.trader.object import ( CancelRequest, OrderRequest ) -# from vnpy.trader.event import EVENT_TIMER +from vnpy.trader.event import EVENT_TIMER from vnpy.trader.gateway import BaseGateway, LocalOrderManager @@ -46,20 +45,9 @@ STATUS_BYBIT2VT = { "Cancelled": Status.CANCELLED, "Rejected": Status.REJECTED, } -STOP_ORDER_STATUS_BYBIT2VT = { - "Untriggered": Status.NOTTRADED, - "Triggered": Status.NOTTRADED, - # Active: triggered and placed. - # since price is market price, placed == AllTraded? - "Active": Status.ALLTRADED, - "Cancelled": Status.CANCELLED, - "Rejected": Status.REJECTED, -} + DIRECTION_VT2BYBIT = {Direction.LONG: "Buy", Direction.SHORT: "Sell"} DIRECTION_BYBIT2VT = {v: k for k, v in DIRECTION_VT2BYBIT.items()} -DIRECTION_BYBIT2VT.update({ - "None": Direction.LONG -}) OPPOSITE_DIRECTION = { Direction.LONG: Direction.SHORT, @@ -78,12 +66,6 @@ INTERVAL_VT2BYBIT = { Interval.DAILY: "D", Interval.WEEKLY: "W", } -INTERVAL_VT2BYBIT_INT = { - Interval.MINUTE: 1, - Interval.HOUR: 60, - Interval.DAILY: 60 * 24, - Interval.WEEKLY: 60 * 24 * 7, -} TIMEDELTA_MAP = { Interval.MINUTE: timedelta(minutes=1), @@ -99,17 +81,6 @@ WEBSOCKET_HOST = "wss://stream.bybit.com/realtime" TESTNET_REST_HOST = "https://api-testnet.bybit.com" TESTNET_WEBSOCKET_HOST = "wss://stream-testnet.bybit.com/realtime" -# asked from official developer -PRICE_TICKS = { - "BTCUSD": 0.5, - "ETHUSD": 0.05, - "EOSUSD": 0.001, - "XRPUSD": 0.0001, -} - -utc_tz = timezone.utc -local_tz = datetime.now(timezone.utc).astimezone().tzinfo - class BybitGateway(BaseGateway): """ @@ -130,7 +101,9 @@ class BybitGateway(BaseGateway): """Constructor""" super().__init__(event_engine, "BYBIT") - self.order_manager = LocalOrderManager(self) + self.connect_time = datetime.now().strftime("%y%m%d%H%M%S") + self.order_manager = LocalOrderManager(self, self.connect_time) + self.rest_api = BybitRestApi(self) self.ws_api = BybitWebsocketApi(self) @@ -150,6 +123,8 @@ class BybitGateway(BaseGateway): self.rest_api.connect(key, secret, server, proxy_host, proxy_port) self.ws_api.connect(key, secret, server, proxy_host, proxy_port) + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + def subscribe(self, req: SubscribeRequest): """""" self.ws_api.subscribe(req) @@ -168,17 +143,21 @@ class BybitGateway(BaseGateway): def query_position(self): """""" - pass + self.rest_api.query_position() def query_history(self, req: HistoryRequest): """""" - return self.rest_api.query_history() + return self.rest_api.query_history(req) def close(self): """""" self.rest_api.stop() self.ws_api.stop() + def process_timer_event(self, event): + """""" + self.query_position() + class BybitRestApi(RestClient): """ @@ -217,7 +196,8 @@ class BybitRestApi(RestClient): api_params["recv_window"] = 30 * 1000 api_params["timestamp"] = generate_timestamp(-1) - data2sign = "&".join([f"{k}={v}" for k, v in sorted(api_params.items())]) + data2sign = "&".join( + [f"{k}={v}" for k, v in sorted(api_params.items())]) signature = sign(self.secret, data2sign.encode()) api_params["sign"] = signature @@ -267,7 +247,7 @@ class BybitRestApi(RestClient): } order = req.create_order_data(order_id, self.gateway_name) - order.time = datetime.now().isoformat()[11:19] + order.time = str(datetime.now().isoformat()) # Only add price for limit order. data["order_type"] = ORDER_TYPE_VT2BYBIT[req.type] @@ -282,7 +262,7 @@ class BybitRestApi(RestClient): on_error=self.on_send_order_error, ) - self.gateway.on_order(order) + self.order_manager.on_order(order) return order.vt_orderid def cancel_order(self, req: CancelRequest): @@ -300,15 +280,71 @@ class BybitRestApi(RestClient): callback=self.on_cancel_order ) - def query_history( - self, - symbol: str, - interval: Interval, - start: int, # unix timestamp - limit: int = None, - ) -> List[BarData]: + def query_history(self, req: HistoryRequest) -> List[BarData]: """""" - pass + history = [] + count = 200 + start_time = int(req.start.timestamp()) + + while True: + # Create query params + params = { + "symbol": req.symbol, + "interval": INTERVAL_VT2BYBIT[req.interval], + "from": start_time, + "limit": count + } + + # Get response from server + resp = self.request( + "GET", + "/v2/public/kline/list", + params=params + ) + + # Break if request failed with other status code + if resp.status_code // 100 != 2: + msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" + self.gateway.write_log(msg) + break + else: + data = resp.json() + if not data: + msg = f"获取历史数据为空,开始时间:{start_time},数量:{count}" + break + + buf = [] + for d in data["result"]: + dt = datetime.fromtimestamp(d["open_time"]) + bar = BarData( + symbol=req.symbol, + exchange=req.exchange, + datetime=dt, + interval=req.interval, + volume=int(d["volume"]), + open_price=float(d["open"]), + high_price=float(d["high"]), + low_price=float(d["low"]), + close_price=float(d["close"]), + gateway_name=self.gateway_name + ) + buf.append(bar) + + history.extend(buf) + + begin = buf[0].datetime + end = buf[-1].datetime + msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{begin} - {end}" + self.gateway.write_log(msg) + + # Break if last data collected + if len(buf) < count: + break + + # Update start time + start_time = int((bar.datetime + TIMEDELTA_MAP[req.interval]).timestamp()) + + return history def on_send_order_failed(self, status_code: int, request: Request): """ @@ -316,7 +352,7 @@ class BybitRestApi(RestClient): """ order = request.extra order.status = Status.REJECTED - self.gateway.on_order(order) + self.order_manager.on_order(order) data = request.response.json() error_msg = data["ret_msg"] @@ -332,7 +368,7 @@ class BybitRestApi(RestClient): """ order = request.extra order.status = Status.REJECTED - self.gateway.on_order(order) + self.order_manager.on_order(order) # Record exception if not ConnectionError if not issubclass(exception_type, ConnectionError): @@ -340,6 +376,9 @@ class BybitRestApi(RestClient): def on_send_order(self, data: dict, request: Request): """""" + if self.check_error("委托下单", data): + return + result = data["result"] self.order_manager.update_orderid_map( result["order_link_id"], @@ -358,7 +397,8 @@ class BybitRestApi(RestClient): def on_cancel_order(self, data: dict, request: Request): """""" - pass + if self.check_error("委托下单", data): + return def on_failed(self, status_code: int, request: Request): """ @@ -412,14 +452,16 @@ class BybitRestApi(RestClient): for d in data["result"]: contract = ContractData( - gateway_name=self.gateway_name, symbol=d["name"], exchange=Exchange.BYBIT, name=d["name"], product=Product.FUTURES, size=1, pricetick=d["price_filter"]["tick_size"], - min_volume=d["lot_size_filter"]["qty_step"] + min_volume=d["lot_size_filter"]["qty_step"], + net_position=True, + history_data=True, + gateway_name=self.gateway_name ) self.gateway.on_contract(contract) @@ -450,7 +492,6 @@ class BybitRestApi(RestClient): direction=Direction.NET, volume=volume, price=d["entry_price"], - pnl=d["unrealised_pnl"], gateway_name=self.gateway_name ) self.gateway.on_position(position) @@ -515,7 +556,7 @@ class BybitRestApi(RestClient): time=d["created_at"], gateway_name=self.gateway_name ) - self.gateway.on_order(order) + self.order_manager.on_order(order) if result["current_page"] != result["last_page"]: self.query_order(result["current_page"] + 1) @@ -523,24 +564,6 @@ class BybitRestApi(RestClient): self.gateway.write_log("委托信息查询成功") -class RawOrderInfo: - - def __init__(self, data: dict): - self.id = data["id"] - self.price = data["price"] - self.side = data["side"] # "Buy", "Sell" - self.size = data.get("size", 0) - - def __lt__(self, rhs: "RawOrderInfo"): - return self.price < rhs.price - - def __eq__(self, rhs: "RawOrderInfo"): - return self.price == rhs.price - - def __hash__(self): - return self.id # price is a string and we don"t known its precision - - class BybitWebsocketApi(WebsocketClient): """""" @@ -558,6 +581,7 @@ class BybitWebsocketApi(WebsocketClient): self.callbacks: Dict[str, Callable] = {} self.ticks: Dict[str, TickData] = {} + self.subscribed: Dict[str, SubscribeRequest] = {} self.symbol_bids: Dict[str, dict] = {} self.symbol_asks: Dict[str, dict] = {} @@ -596,6 +620,8 @@ class BybitWebsocketApi(WebsocketClient): """ Subscribe to tick data upate. """ + self.subscribed[req.symbol] = req + tick = TickData( symbol=req.symbol, exchange=req.exchange, @@ -605,7 +631,8 @@ class BybitWebsocketApi(WebsocketClient): ) self.ticks[req.symbol] = tick - self.subscribe_topic(f"instrument_info.100ms.{req.symbol}", self.on_tick) + self.subscribe_topic( + f"instrument_info.100ms.{req.symbol}", self.on_tick) self.subscribe_topic(f"orderBookL2_25.{req.symbol}", self.on_depth) def on_connected(self): @@ -619,7 +646,6 @@ class BybitWebsocketApi(WebsocketClient): def on_packet(self, packet: dict): """""" - print(packet) if "topic" not in packet: op = packet["request"]["op"] if op == "auth": @@ -629,16 +655,6 @@ class BybitWebsocketApi(WebsocketClient): callback = self.callbacks[channel] callback(packet) - # success = packet.get("success", None) - # topic = packet.get("topic", None) - # if success is not None: - # if success is False: - # self.gateway.write_log("Websocket API报错:%s" % packet["ret_msg"]) - # elif topic is not None: - # self.callbacks[topic](topic, packet) - # else: - # self.gateway.write_log(f"unkonwn packet: {packet}") - def on_error(self, exception_type: type, exception_value: Exception, tb): """""" msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" @@ -656,6 +672,9 @@ class BybitWebsocketApi(WebsocketClient): self.subscribe_topic("order", self.on_order) self.subscribe_topic("execution", self.on_trade) self.subscribe_topic("position", self.on_position) + + for req in self.subscribed.values(): + self.subscribe(req) else: self.gateway.write_log("Websocket API登录失败") @@ -755,45 +774,81 @@ class BybitWebsocketApi(WebsocketClient): tick.datetime = datetime.fromtimestamp(timestamp / 1_000_000) self.gateway.on_tick(copy(tick)) - def on_trade(self, topic: str, data: dict): + def on_trade(self, packet: dict): """""" - for data in data["data"]: - order_id = data["order_link_id"] + for d in packet["data"]: + order_id = d["order_link_id"] if not order_id: - order_id = data["order_id"] + order_id = d["order_id"] + trade = TradeData( - symbol=data["symbol"], + symbol=d["symbol"], exchange=Exchange.BYBIT, orderid=order_id, - tradeid=data["exec_id"], - direction=DIRECTION_BYBIT2VT[data["side"]], - price=data["price"], - volume=data["exec_qty"], - time=parse_datetime(data["trade_time"]), + tradeid=d["exec_id"], + direction=DIRECTION_BYBIT2VT[d["side"]], + price=d["price"], + volume=d["exec_qty"], + time=d["trade_time"], gateway_name=self.gateway_name, ) self.gateway.on_trade(trade) - def on_order(self, topic: str, data: dict): + def on_order(self, packet: dict): """""" - for data in data["data"]: - print(data) - order = self.gateway.parse_order_data(data, "timestamp") + for d in packet["data"]: + sys_orderid = d["order_id"] + order = self.order_manager.get_order_with_sys_orderid(sys_orderid) - self.gateway.on_order(copy(order)) + if order: + order.traded = d["cum_exec_qty"] + order.status = STATUS_BYBIT2VT[d["order_status"]] + else: + # Use sys_orderid as local_orderid when + # order placed from other source + local_orderid = d["order_link_id"] + if not local_orderid: + local_orderid = sys_orderid - def on_position(self, topic: str, data: dict): + self.order_manager.update_orderid_map( + local_orderid, + sys_orderid + ) + + order = OrderData( + symbol=d["symbol"], + exchange=Exchange.BYBIT, + orderid=local_orderid, + type=ORDER_TYPE_BYBIT2VT[d["order_type"]], + direction=DIRECTION_BYBIT2VT[d["side"]], + price=d["price"], + volume=d["qty"], + traded=d["cum_exec_qty"], + status=STATUS_BYBIT2VT[d["order_status"]], + time=d["timestamp"], + gateway_name=self.gateway_name + ) + + self.order_manager.on_order(order) + + def on_position(self, packet: dict): """""" - for data in data["data"]: - p1, p2 = self.gateway.parse_position_data(data) - self.gateway.on_position(p1) - if p2: - self.gateway.on_position(p2) + for d in packet["data"]: + if d["side"] == "Buy": + volume = d["size"] + else: + volume = -d["size"] - -def _parse_timestamp_e6(timestamp: int): - return datetime.fromtimestamp(timestamp / 1_000_000, utc_tz).astimezone(tz=local_tz) + position = PositionData( + symbol=d["symbol"], + exchange=Exchange.BYBIT, + direction=Direction.NET, + volume=volume, + price=d["entry_price"], + gateway_name=self.gateway_name + ) + self.gateway.on_position(position) def generate_timestamp(expire_after: float = 30) -> int: @@ -809,7 +864,3 @@ def sign(secret: bytes, data: bytes) -> str: return hmac.new( secret, data, digestmod=hashlib.sha256 ).hexdigest() - - -def parse_datetime(dt: str) -> str: - return dt[11:19] diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index 9d8f1770..4ce95cff 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -265,12 +265,12 @@ class LocalOrderManager: Management tool to support use local order id for trading. """ - def __init__(self, gateway: BaseGateway): + def __init__(self, gateway: BaseGateway, order_prefix: str = ""): """""" self.gateway = gateway # For generating local orderid - self.order_prefix = "" + self.order_prefix = order_prefix self.order_count = 0 self.orders = {} # local_orderid:order @@ -296,7 +296,7 @@ class LocalOrderManager: Generate a new local orderid. """ self.order_count += 1 - local_orderid = str(self.order_count).rjust(8, "0") + local_orderid = self.order_prefix + str(self.order_count).rjust(8, "0") return local_orderid def get_local_orderid(self, sys_orderid: str): From 765bb5078a917e1aed8dd93bd7bbf61d01b69a17 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Mon, 21 Oct 2019 15:44:58 +0800 Subject: [PATCH 4/5] [Fix] update class combo after strategy classes loaded --- vnpy/app/cta_backtester/ui/widget.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/vnpy/app/cta_backtester/ui/widget.py b/vnpy/app/cta_backtester/ui/widget.py index 352fde47..87d1ad0a 100644 --- a/vnpy/app/cta_backtester/ui/widget.py +++ b/vnpy/app/cta_backtester/ui/widget.py @@ -37,10 +37,10 @@ class BacktesterManager(QtWidgets.QWidget): self.target_display = "" - self.init_strategy_settings() self.init_ui() self.register_event() self.backtester_engine.init_engine() + self.init_strategy_settings() def init_strategy_settings(self): """""" @@ -50,13 +50,14 @@ class BacktesterManager(QtWidgets.QWidget): setting = self.backtester_engine.get_default_setting(class_name) self.settings[class_name] = setting + self.class_combo.addItems(self.class_names) + def init_ui(self): """""" self.setWindowTitle("CTA回测") # Setting Part self.class_combo = QtWidgets.QComboBox() - self.class_combo.addItems(self.class_names) self.symbol_line = QtWidgets.QLineEdit("IF88.CFFEX") From 5df4e6f73834b64a5effbec36c2f483cefbfee6d Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Mon, 21 Oct 2019 15:52:39 +0800 Subject: [PATCH 5/5] [Mod] change code struction and complete test of BybitGateway --- vnpy/gateway/bybit/bybit_gateway.py | 188 ++++++++++++++-------------- 1 file changed, 94 insertions(+), 94 deletions(-) diff --git a/vnpy/gateway/bybit/bybit_gateway.py b/vnpy/gateway/bybit/bybit_gateway.py index fd9b1ddd..afdd7b2b 100644 --- a/vnpy/gateway/bybit/bybit_gateway.py +++ b/vnpy/gateway/bybit/bybit_gateway.py @@ -194,7 +194,7 @@ class BybitRestApi(RestClient): api_params["api_key"] = self.key api_params["recv_window"] = 30 * 1000 - api_params["timestamp"] = generate_timestamp(-1) + api_params["timestamp"] = generate_timestamp(-5) data2sign = "&".join( [f"{k}={v}" for k, v in sorted(api_params.items())]) @@ -265,87 +265,6 @@ class BybitRestApi(RestClient): self.order_manager.on_order(order) return order.vt_orderid - def cancel_order(self, req: CancelRequest): - """""" - sys_orderid = self.order_manager.get_sys_orderid(req.orderid) - data = { - "order_id": sys_orderid, - "symbol": req.symbol, - } - - self.add_request( - "POST", - path="/open-api/order/cancel", - data=data, - callback=self.on_cancel_order - ) - - def query_history(self, req: HistoryRequest) -> List[BarData]: - """""" - history = [] - count = 200 - start_time = int(req.start.timestamp()) - - while True: - # Create query params - params = { - "symbol": req.symbol, - "interval": INTERVAL_VT2BYBIT[req.interval], - "from": start_time, - "limit": count - } - - # Get response from server - resp = self.request( - "GET", - "/v2/public/kline/list", - params=params - ) - - # Break if request failed with other status code - if resp.status_code // 100 != 2: - msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" - self.gateway.write_log(msg) - break - else: - data = resp.json() - if not data: - msg = f"获取历史数据为空,开始时间:{start_time},数量:{count}" - break - - buf = [] - for d in data["result"]: - dt = datetime.fromtimestamp(d["open_time"]) - bar = BarData( - symbol=req.symbol, - exchange=req.exchange, - datetime=dt, - interval=req.interval, - volume=int(d["volume"]), - open_price=float(d["open"]), - high_price=float(d["high"]), - low_price=float(d["low"]), - close_price=float(d["close"]), - gateway_name=self.gateway_name - ) - buf.append(bar) - - history.extend(buf) - - begin = buf[0].datetime - end = buf[-1].datetime - msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{begin} - {end}" - self.gateway.write_log(msg) - - # Break if last data collected - if len(buf) < count: - break - - # Update start time - start_time = int((bar.datetime + TIMEDELTA_MAP[req.interval]).timestamp()) - - return history - def on_send_order_failed(self, status_code: int, request: Request): """ Callback when sending order failed on server. @@ -385,6 +304,21 @@ class BybitRestApi(RestClient): result["order_id"] ) + def cancel_order(self, req: CancelRequest): + """""" + sys_orderid = self.order_manager.get_sys_orderid(req.orderid) + data = { + "order_id": sys_orderid, + "symbol": req.symbol, + } + + self.add_request( + "POST", + path="/open-api/order/cancel", + data=data, + callback=self.on_cancel_order + ) + def on_cancel_order_error( self, exception_type: type, exception_value: Exception, tb, request: Request ): @@ -563,6 +497,72 @@ class BybitRestApi(RestClient): else: self.gateway.write_log("委托信息查询成功") + def query_history(self, req: HistoryRequest) -> List[BarData]: + """""" + history = [] + count = 200 + start_time = int(req.start.timestamp()) + + while True: + # Create query params + params = { + "symbol": req.symbol, + "interval": INTERVAL_VT2BYBIT[req.interval], + "from": start_time, + "limit": count + } + + # Get response from server + resp = self.request( + "GET", + "/v2/public/kline/list", + params=params + ) + + # Break if request failed with other status code + if resp.status_code // 100 != 2: + msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" + self.gateway.write_log(msg) + break + else: + data = resp.json() + if not data: + msg = f"获取历史数据为空,开始时间:{start_time},数量:{count}" + break + + buf = [] + for d in data["result"]: + dt = datetime.fromtimestamp(d["open_time"]) + bar = BarData( + symbol=req.symbol, + exchange=req.exchange, + datetime=dt, + interval=req.interval, + volume=int(d["volume"]), + open_price=float(d["open"]), + high_price=float(d["high"]), + low_price=float(d["low"]), + close_price=float(d["close"]), + gateway_name=self.gateway_name + ) + buf.append(bar) + + history.extend(buf) + + begin = buf[0].datetime + end = buf[-1].datetime + msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{begin} - {end}" + self.gateway.write_log(msg) + + # Break if last data collected + if len(buf) < count: + break + + # Update start time + start_time = int((bar.datetime + TIMEDELTA_MAP[req.interval]).timestamp()) + + return history + class BybitWebsocketApi(WebsocketClient): """""" @@ -635,6 +635,18 @@ class BybitWebsocketApi(WebsocketClient): f"instrument_info.100ms.{req.symbol}", self.on_tick) self.subscribe_topic(f"orderBookL2_25.{req.symbol}", self.on_depth) + def subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]): + """ + Subscribe to all private topics. + """ + self.callbacks[topic] = callback + + req = { + "op": "subscribe", + "args": [topic], + } + self.send_packet(req) + def on_connected(self): """""" self.gateway.write_log("Websocket API连接成功") @@ -678,18 +690,6 @@ class BybitWebsocketApi(WebsocketClient): else: self.gateway.write_log("Websocket API登录失败") - def subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]): - """ - Subscribe to all private topics. - """ - self.callbacks[topic] = callback - - req = { - "op": "subscribe", - "args": [topic], - } - self.send_packet(req) - def on_tick(self, packet: dict): """""" topic = packet["topic"]