Merge pull request #2149 from vnpy/dev-bybit

Dev bybit
This commit is contained in:
vn.py 2019-10-21 15:53:26 +08:00 committed by GitHub
commit 216a5d3f5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 796 additions and 1132 deletions

View File

@ -31,6 +31,7 @@ from vnpy.gateway.da import DaGateway
from vnpy.gateway.coinbase import CoinbaseGateway from vnpy.gateway.coinbase import CoinbaseGateway
from vnpy.gateway.bitstamp import BitstampGateway from vnpy.gateway.bitstamp import BitstampGateway
from vnpy.gateway.gateios import GateiosGateway from vnpy.gateway.gateios import GateiosGateway
from vnpy.gateway.bybit import BybitGateway
from vnpy.app.cta_strategy import CtaStrategyApp from vnpy.app.cta_strategy import CtaStrategyApp
# from vnpy.app.csv_loader import CsvLoaderApp # from vnpy.app.csv_loader import CsvLoaderApp
@ -52,7 +53,7 @@ def main():
main_engine = MainEngine(event_engine) main_engine = MainEngine(event_engine)
# main_engine.add_gateway(BinanceGateway) # main_engine.add_gateway(BinanceGateway)
main_engine.add_gateway(CtpGateway) # main_engine.add_gateway(CtpGateway)
# main_engine.add_gateway(CtptestGateway) # main_engine.add_gateway(CtptestGateway)
# main_engine.add_gateway(MiniGateway) # main_engine.add_gateway(MiniGateway)
# main_engine.add_gateway(SoptGateway) # main_engine.add_gateway(SoptGateway)
@ -60,12 +61,12 @@ def main():
# main_engine.add_gateway(FemasGateway) # main_engine.add_gateway(FemasGateway)
# main_engine.add_gateway(IbGateway) # main_engine.add_gateway(IbGateway)
# main_engine.add_gateway(FutuGateway) # main_engine.add_gateway(FutuGateway)
main_engine.add_gateway(BitmexGateway) # main_engine.add_gateway(BitmexGateway)
# main_engine.add_gateway(TigerGateway) # main_engine.add_gateway(TigerGateway)
# main_engine.add_gateway(OesGateway) # main_engine.add_gateway(OesGateway)
# main_engine.add_gateway(OkexGateway) # main_engine.add_gateway(OkexGateway)
# main_engine.add_gateway(HuobiGateway) # main_engine.add_gateway(HuobiGateway)
main_engine.add_gateway(BitfinexGateway) # main_engine.add_gateway(BitfinexGateway)
# main_engine.add_gateway(OnetokenGateway) # main_engine.add_gateway(OnetokenGateway)
# main_engine.add_gateway(OkexfGateway) # main_engine.add_gateway(OkexfGateway)
# main_engine.add_gateway(HbdmGateway) # main_engine.add_gateway(HbdmGateway)
@ -76,8 +77,9 @@ def main():
# main_engine.add_gateway(OkexsGateway) # main_engine.add_gateway(OkexsGateway)
# main_engine.add_gateway(DaGateway) # main_engine.add_gateway(DaGateway)
# main_engine.add_gateway(CoinbaseGateway) # main_engine.add_gateway(CoinbaseGateway)
main_engine.add_gateway(BitstampGateway) # main_engine.add_gateway(BitstampGateway)
main_engine.add_gateway(GateiosGateway) # main_engine.add_gateway(GateiosGateway)
main_engine.add_gateway(BybitGateway)
main_engine.add_app(CtaStrategyApp) main_engine.add_app(CtaStrategyApp)
main_engine.add_app(CtaBacktesterApp) main_engine.add_app(CtaBacktesterApp)

View File

@ -18,5 +18,4 @@ ta-lib
ibapi ibapi
deap deap
pyzmq pyzmq
sortedcontainers
wmi wmi

View File

@ -37,10 +37,10 @@ class BacktesterManager(QtWidgets.QWidget):
self.target_display = "" self.target_display = ""
self.init_strategy_settings()
self.init_ui() self.init_ui()
self.register_event() self.register_event()
self.backtester_engine.init_engine() self.backtester_engine.init_engine()
self.init_strategy_settings()
def init_strategy_settings(self): def init_strategy_settings(self):
"""""" """"""
@ -50,13 +50,14 @@ class BacktesterManager(QtWidgets.QWidget):
setting = self.backtester_engine.get_default_setting(class_name) setting = self.backtester_engine.get_default_setting(class_name)
self.settings[class_name] = setting self.settings[class_name] = setting
self.class_combo.addItems(self.class_names)
def init_ui(self): def init_ui(self):
"""""" """"""
self.setWindowTitle("CTA回测") self.setWindowTitle("CTA回测")
# Setting Part # Setting Part
self.class_combo = QtWidgets.QComboBox() self.class_combo = QtWidgets.QComboBox()
self.class_combo.addItems(self.class_names)
self.symbol_line = QtWidgets.QLineEdit("IF88.CFFEX") self.symbol_line = QtWidgets.QLineEdit("IF88.CFFEX")

File diff suppressed because it is too large Load Diff

View File

@ -1,82 +0,0 @@
import hashlib
import hmac
import time
from datetime import datetime, timedelta, timezone
from vnpy.trader.constant import Direction, Interval, OrderType, Status
STATUS_BYBIT2VT = {
"Created": Status.NOTTRADED,
"New": Status.NOTTRADED,
"PartiallyFilled": Status.PARTTRADED,
"Filled": Status.ALLTRADED,
"Cancelled": Status.CANCELLED,
"Rejected": Status.REJECTED,
}
STOP_ORDER_STATUS_BYBIT2VT = {
"Untriggered": Status.NOTTRADED,
"Triggered": Status.NOTTRADED,
# Active: triggered and placed.
# since price is market price, placed == AllTraded?
"Active": Status.ALLTRADED,
"Cancelled": Status.CANCELLED,
"Rejected": Status.REJECTED,
}
DIRECTION_VT2BYBIT = {Direction.LONG: "Buy", Direction.SHORT: "Sell"}
DIRECTION_BYBIT2VT = {v: k for k, v in DIRECTION_VT2BYBIT.items()}
DIRECTION_BYBIT2VT.update({
"None": Direction.LONG
})
OPPOSITE_DIRECTION = {
Direction.LONG: Direction.SHORT,
Direction.SHORT: Direction.LONG,
}
ORDER_TYPE_VT2BYBIT = {
OrderType.LIMIT: "Limit",
OrderType.MARKET: "Market",
}
ORDER_TYPE_BYBIT2VT = {v: k for k, v in ORDER_TYPE_VT2BYBIT.items()}
INTERVAL_VT2BYBIT = {
Interval.MINUTE: "1",
Interval.HOUR: "60",
Interval.DAILY: "D",
Interval.WEEKLY: "W",
}
INTERVAL_VT2BYBIT_INT = {
Interval.MINUTE: 1,
Interval.HOUR: 60,
Interval.DAILY: 60 * 24,
Interval.WEEKLY: 60 * 24 * 7,
}
TIMEDELTA_MAP = {
Interval.MINUTE: timedelta(minutes=1),
Interval.HOUR: timedelta(hours=1),
Interval.DAILY: timedelta(days=1),
Interval.WEEKLY: timedelta(days=7),
}
utc_tz = timezone.utc
local_tz = datetime.now(timezone.utc).astimezone().tzinfo
def generate_timestamp(expire_after: float = 30) -> int:
"""
:param expire_after: expires in seconds.
:return: timestamp in milliseconds
"""
return int(time.time() * 1000 + expire_after * 1000)
def sign(secret: bytes, data: bytes) -> str:
""""""
return hmac.new(
secret, data, digestmod=hashlib.sha256
).hexdigest()
def parse_datetime(dt: str) -> str:
return dt[11:19]

View File

@ -1,485 +0,0 @@
import sys
from dataclasses import dataclass
from datetime import datetime
from threading import Lock
from typing import List, TYPE_CHECKING, Tuple
from urllib.parse import urlencode
from requests import ConnectionError
from vnpy.api.rest import Request, RestClient
from vnpy.trader.constant import Exchange, Interval, OrderType, Product, Status
from vnpy.trader.object import AccountData, BarData, CancelRequest, ContractData, OrderRequest
from .common import (DIRECTION_VT2BYBIT, INTERVAL_VT2BYBIT, ORDER_TYPE_VT2BYBIT, generate_timestamp,
parse_datetime, sign, INTERVAL_VT2BYBIT_INT)
if TYPE_CHECKING:
from vnpy.gateway.bybit import BybitGateway
_ = lambda x: x # noqa
HOST = "https://api.bybit.com"
TEST_HOST = "https://api-testnet.bybit.com"
# asked from official developer
PRICE_TICKS = {
"BTCUSD": 0.5,
"ETHUSD": 0.05,
"EOSUSD": 0.001,
"XRPUSD": 0.0001,
}
@dataclass()
class HistoryDataNextInfo:
symbol: str
interval: Interval
end: int
class RequestFailedException(Exception):
pass
class BybitRestApi(RestClient):
"""
BitMEX REST API
"""
def __init__(self, gateway: "BybitGateway"):
""""""
super(BybitRestApi, self).__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.key = ""
self.secret = b""
self.order_count = 1_000_000
self.order_count_lock = Lock()
self.connect_time = 0
# Use 60 by default, and will update after first request
self.rate_limit_limit = 60
self.rate_limit_remaining = 60
self.rate_limit_sleep = 0
@property
def ticks(self):
return self.gateway.ticks
def sign(self, request):
"""
Generate BitMEX signature.
"""
if request.method == "GET":
api_params = request.params # dict 2 sign
if api_params is None:
api_params = request.params = {}
else: # POST
api_params = request.data
if api_params is None:
api_params = request.data = {}
expires = generate_timestamp(0)
api_params['api_key'] = self.key
api_params['recv_window'] = 30 * 1000
api_params['timestamp'] = expires
data2sign = urlencode({k: api_params[k] for k in sorted(api_params)})
signature = sign(self.secret, data2sign.encode())
api_params['sign'] = signature
return request
def connect(
self,
key: str,
secret: str,
session_number: int,
server: str,
proxy_host: str,
proxy_port: int,
):
"""
Initialize connection to REST server.
"""
self.key = key
self.secret = secret.encode()
self.connect_time = (
int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count
)
if server == "REAL":
self.init(HOST, proxy_host, proxy_port)
else:
self.init(TEST_HOST, proxy_host, proxy_port)
self.start(session_number)
self.gateway.write_log(_("REST API启动成功"))
def _new_order_id(self):
""""""
with self.order_count_lock:
self.order_count += 1
return self.order_count
def send_order(self, req: OrderRequest):
""""""
if not self.check_rate_limit():
return ""
order_id = str(self.connect_time + self._new_order_id())
symbol = req.symbol
data = {
"symbol": symbol,
"side": DIRECTION_VT2BYBIT[req.direction],
"qty": int(req.volume),
"order_link_id": order_id,
"time_in_force": "GoodTillCancel"
}
order = req.create_order_data(order_id, self.gateway_name)
order.time = datetime.now().isoformat()[11:19]
# Only add price for limit order.
if req.type != OrderType.STOP:
data["order_type"] = ORDER_TYPE_VT2BYBIT[req.type]
data["price"] = req.price
self.add_request(
"POST",
"/open-api/order/create",
callback=self.on_send_order,
data=data,
extra=order,
on_failed=self.on_send_order_failed,
on_error=self.on_send_order_error,
)
else:
assert self.ticks[symbol], _("Subscribe This Symbol before sending StopOrder.")
last_price = self.ticks[symbol].last_price
data["order_type"] = 'Market'
data["stop_px"] = req.price
data["base_price"] = last_price
self.add_request(
"POST",
"/open-api/stop-order/create",
callback=self.on_send_stop_order,
data=data,
extra=order,
on_failed=self.on_send_order_failed,
on_error=self.on_send_order_error,
)
self.gateway.on_order(order)
return order.vt_orderid
def cancel_order(self, req: CancelRequest):
""""""
if not self.check_rate_limit():
return
sys_id = self.gateway.orderid2sys(req.orderid)
order = self.gateway.orders[req.orderid]
if order.type != OrderType.STOP:
path = "/open-api/order/cancel"
key = "order_id"
callback = self.on_cancel_order
else:
path = "/open-api/stop-order/cancel"
key = "stop_order_id"
callback = self.on_cancel_stop_order
self.add_request(
"POST",
path,
callback=callback,
data={
key: sys_id,
"symbol": req.symbol,
},
on_error=self.on_cancel_order_error,
extra=order,
)
def query_history(self,
symbol: str,
interval: Interval,
start: int, # unix timestamp
limit: int = None,
) -> Tuple[List[BarData], "HistoryDataNextInfo"]:
"""
Get history data synchronously.
"""
if limit is None:
limit = self.gateway.HISTORY_RECORD_PER_REQUEST
bars = []
# datetime for a bar is close_time
# we got open_time from API.
adjustment = INTERVAL_VT2BYBIT_INT[interval]
params = {
"interval": INTERVAL_VT2BYBIT[interval],
"symbol": symbol,
"limit": limit,
"from": start,
}
# todo: RestClient: return RestClient.Request object instead of requests.Response.
resp = self.request(
"GET",
"/v2/public/kline/list",
params=params
)
# Break if request failed with other status code
raw_data = resp.json()
if not self.is_request_success(raw_data, None):
msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}"
self.gateway.write_log(msg)
raise RequestFailedException(msg)
result = raw_data['result']
for data in result:
open_time = int(data["open_time"])
close_time = open_time + adjustment
close_dt = datetime.fromtimestamp(close_time)
bar = BarData(
symbol=symbol,
exchange=Exchange.BYBIT,
datetime=close_dt,
interval=interval,
volume=data["volume"],
open_price=data["open"],
high_price=data["high"],
low_price=data["low"],
close_price=data["close"],
gateway_name=self.gateway_name
)
bars.append(bar)
end = result[-1]["open_time"]
return bars, HistoryDataNextInfo(symbol, interval, end)
def on_send_order_failed(self, status_code: int, request: Request):
"""
Callback when sending order failed on server.
"""
data = request.response.json()
self.update_rate_limit(data)
order = request.extra
order.status = Status.REJECTED
self.gateway.on_order(order)
msg = f"委托失败,错误代码:{data['ret_code']}, 错误信息:{data['ret_msg']}"
self.gateway.write_log(msg)
def on_send_order_error(
self, exception_type: type, exception_value: Exception, tb, request: Request
):
"""
Callback when sending order caused exception.
"""
order = request.extra
order.status = Status.REJECTED
self.gateway.on_order(order)
# Record exception if not ConnectionError
if not issubclass(exception_type, ConnectionError):
self.on_error(exception_type, exception_value, tb, request)
def on_send_order(self, raw_data: dict, request: Request):
""""""
data = raw_data['result']
order = self.gateway.parse_order_data(data)
self.gateway.on_order(order)
self.update_rate_limit(raw_data)
def on_send_stop_order(self, raw_data: dict, request: Request):
""""""
data = raw_data['result']
order = self.gateway.parse_stop_order_data(data)
order.time = parse_datetime(data['updated_at'])
self.gateway.on_order(order)
self.update_rate_limit(raw_data)
def on_cancel_order_error(
self, exception_type: type, exception_value: Exception, tb, request: Request
):
"""
Callback when cancelling order failed on server.
"""
# Record exception if not ConnectionError
if not issubclass(exception_type, ConnectionError):
self.on_error(exception_type, exception_value, tb, request)
def on_cancel_order(self, raw_data: dict, request: Request):
""""""
data: dict = raw_data['result']
order = self.gateway.parse_order_data(data)
self.gateway.on_order(order)
self.update_rate_limit(data)
def on_cancel_stop_order(self, raw_data: dict, request: Request):
""""""
data: dict = raw_data['result']
order = self.gateway.parse_stop_order_data(data)
order.time = parse_datetime(data['updated_at'])
self.gateway.on_order(order)
self.update_rate_limit(data)
def on_failed(self, status_code: int, request: Request):
"""
Callback to handle request failed.
"""
data = request.response.json()
self._handle_error_response(data, request)
def _handle_error_response(self, data, request, operation_name: str = None):
if operation_name is None:
operation_name = request.path
self.update_rate_limit(data)
error_msg = data["ret_msg"]
error_code = data['ret_code']
msg = f"请求{operation_name}失败,状态码:{request.status},错误代码:{error_code}, 信息:{error_msg}"
msg += f'\n{request}'
self.gateway.write_log(msg)
def on_error(
self, exception_type: type, exception_value: Exception, tb, request: Request
):
"""
Callback to handler request exception.
"""
msg = f"触发异常,状态码:{exception_type},信息:{exception_value}"
self.gateway.write_log(msg)
sys.stderr.write(
self.exception_detail(exception_type, exception_value, tb, request)
)
def update_rate_limit(self, data: dict):
"""
Update current request limit remaining status.
:param data:
"""
remaining = data.get('rate_limit_status', None)
if remaining is not None:
self.rate_limit_remaining = remaining
def increase_rate_limit(self):
"""
Reset request limit remaining every 1 second.
"""
self.rate_limit_remaining += 1
self.rate_limit_remaining = min(
self.rate_limit_remaining, self.rate_limit_limit)
# Countdown of retry sleep seconds
if self.rate_limit_sleep:
self.rate_limit_sleep -= 1
def check_rate_limit(self):
"""
Check if rate limit is reached before sending out requests.
"""
# Already received 429 from server
if self.rate_limit_sleep:
msg = f"请求过于频繁已被Bybit限制请等待{self.rate_limit_sleep}秒后再试"
self.gateway.write_log(msg)
return False
# Just local request limit is reached
elif not self.rate_limit_remaining:
msg = "请求频率太高有触发Bybit流控的风险请稍候再试"
self.gateway.write_log(msg)
return False
else:
self.rate_limit_remaining -= 1
return True
def is_request_success(self, data: dict, request: "Request"):
return data['ret_code'] == 0
def query_contracts(self):
self.add_request("GET",
"/v2/public/tickers",
self.on_query_contracts)
def on_query_contracts(self, data: dict, request: "Request"):
for result in data['result']:
symbol = result['symbol']
contract = ContractData(
gateway_name=self.gateway_name,
symbol=symbol,
exchange=Exchange.BYBIT,
name=symbol,
product=Product.FUTURES,
size=1,
# todo: pricetick: Currently(2019-9-2) unable to query.
pricetick=PRICE_TICKS.get(symbol, 0.0001),
)
self.gateway.on_contract(contract)
def query_position(self):
self.add_request("GET",
"/position/list",
self.on_query_position
)
def on_query_position(self, raw_data: dict, request: "Request"):
for data in raw_data['result']:
p1, p2 = self.gateway.parse_position_data(data)
self.gateway.on_position(p1)
if p2:
self.gateway.on_position(p2)
account = AccountData(
gateway_name=self.gateway_name,
accountid=p1.symbol,
balance=data['wallet_balance'],
frozen=data['order_margin']
)
self.gateway.on_account(account)
def query_orders(self):
"""
query all orders, including stop orders
"""
self.add_request("GET",
"/open-api/order/list",
callback=self.on_query_orders,
)
self.query_stop_orders()
def query_stop_orders(self):
self.add_request("GET",
"/open-api/stop-order/list",
callback=self.on_query_stop_orders,
)
def on_query_orders(self, raw_data: dict, request: "Request"):
result = raw_data['result']
for data in result['data']:
if data['order_status'] != 'NotActive': # UnTriggered StopOrder
order = self.gateway.parse_order_data(data)
self.gateway.on_order(order)
def on_query_stop_orders(self, raw_data: dict, request: "Request"):
result = raw_data['result']
for data in result['data']:
order = self.gateway.parse_stop_order_data(data)
self.gateway.on_order(order)

View File

@ -1,342 +0,0 @@
import hashlib
import hmac
import sys
import time
from collections import defaultdict
from copy import copy
from datetime import datetime
from typing import Any, Callable, Dict, TYPE_CHECKING
from sortedcontainers import SortedSet
from vnpy.api.websocket import WebsocketClient
from vnpy.trader.constant import (Exchange, Product)
from vnpy.trader.object import (AccountData, ContractData, SubscribeRequest, TickData, TradeData)
from .common import (DIRECTION_BYBIT2VT, generate_timestamp, local_tz, parse_datetime, sign, utc_tz)
if TYPE_CHECKING:
from vnpy.gateway.bybit import BybitGateway
HOST = "wss://stream.bybit.com/realtime"
TEST_HOST = "wss://stream-testnet.bybit.com/realtime"
class RawOrderInfo:
def __init__(self, raw_data: dict):
self.id = raw_data['id']
self.price = raw_data['price']
self.side = raw_data['side'] # 'Buy', 'Sell'
self.size = raw_data.get('size', 0)
def __lt__(self, rhs: "RawOrderInfo"):
return self.price < rhs.price
def __eq__(self, rhs: "RawOrderInfo"):
return self.price == rhs.price
def __hash__(self):
return self.id # price is a string and we don't known its precision
class BybitWebsocketApi(WebsocketClient):
""""""
def __init__(self, gateway: "BybitGateway"):
""""""
super(BybitWebsocketApi, self).__init__()
self.server: str = "" # REAL or TESTNET
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.key = ""
self.secret = b""
self._instrument_info_data: Dict[str, dict] = defaultdict(dict)
self._orderbook_data: Dict[str, SortedSet[RawOrderInfo]] = defaultdict(SortedSet)
self.accounts = {}
self._topic_callbacks = {}
@property
def ticks(self):
return self.gateway.ticks
@property
def orders(self):
return self.gateway.orders
def connect(
self, key: str, secret: str, server: str, proxy_host: str, proxy_port: int
):
""""""
self.key = key
self.secret = secret.encode()
self.proxy_host = proxy_host
self.proxy_port = proxy_port
self.server = server
self.reset_authentication()
self.start()
def reset_authentication(self):
expires = generate_timestamp(30)
d2s = f"GET/realtime{int(expires)}"
signature = sign(self.secret, d2s.encode())
params = f"api_key={self.key}&expires={expires}&signature={signature}"
if self.server == "REAL":
host = HOST
else:
host = TEST_HOST
url = f'{host}?{params}'
self.init(url, self.proxy_host, self.proxy_port)
def subscribe(self, req: SubscribeRequest):
"""
Subscribe to tick data upate.
"""
symbol = req.symbol
self._subscribe_topic(f"instrument_info.100ms.{symbol}", self.on_tick)
self._subscribe_topic(f"orderBookL2_25.{symbol}", self.on_depth)
def on_connected(self):
""""""
self.gateway.write_log("Websocket API连接且认证成功")
self._subscribe_initialize_topics()
def on_disconnected(self):
""""""
self.gateway.write_log("Websocket API连接断开")
self.reset_authentication()
def on_packet(self, packet: dict):
""""""
success = packet.get('success', None)
topic = packet.get('topic', None)
if success is not None:
if success is False:
self.gateway.write_log("Websocket API报错%s" % packet["ret_msg"])
elif topic is not None:
self._topic_callbacks[topic](topic, packet)
else:
self.gateway.write_log(f"unkonwn packet: {packet}")
def on_error(self, exception_type: type, exception_value: Exception, tb):
""""""
msg = f"触发异常,状态码:{exception_type},信息:{exception_value}"
self.gateway.write_log(msg)
sys.stderr.write(self.exception_detail(
exception_type, exception_value, tb))
def authenticate(self):
"""
Authenticate websockey connection to subscribe private topic.
"""
expires = int(time.time())
method = "GET"
path = "/realtime"
msg = method + path + str(expires)
signature = hmac.new(
self.secret, msg.encode(), digestmod=hashlib.sha256
).hexdigest()
req = {"op": "authKey", "args": [self.key, expires, signature]}
self.send_packet(req)
def _subscribe_topic(self, topic: str, callback: Callable[[str, dict], Any]):
"""
Subscribe to all private topics.
"""
self._topic_callbacks[topic] = callback
req = {
"op": "subscribe",
"args": [topic],
}
self.send_packet(req)
def _subscribe_initialize_topics(self):
"""
Subscribe to all private topics.
"""
self._subscribe_topic("order", self.on_order)
self._subscribe_topic("execution", self.on_trade)
self._subscribe_topic("position", self.on_position)
def _get_last_tick(self, symbol: str):
tick = self.ticks.get(symbol, None)
if tick is None:
# noinspection PyTypeChecker
tick = TickData(
symbol=symbol,
exchange=Exchange.BYBIT,
name=symbol,
datetime=None, # this will be filled before this new created tick is consumed.
gateway_name=self.gateway_name,
)
self.ticks[symbol] = tick
return tick
def on_tick(self, topic: str, raw_data: dict):
""""""
# parse incremental data sent from server
symbol = topic[22:]
self._update_tick_incremental_data(symbol, raw_data)
# convert dict into TickData
last_data = self._instrument_info_data[symbol]
tick = self._get_last_tick(symbol)
tick.last_price = last_data["last_price_e4"] / 10000
tick.datetime = _parse_timestamp_e6(raw_data['timestamp_e6'])
self.gateway.on_tick(copy(tick))
def _update_tick_incremental_data(self, symbol, raw_data):
type_ = raw_data['type']
data = raw_data['data']
last_data = self._instrument_info_data[symbol]
if type_ == 'snapshot':
last_data.clear()
last_data.update(data)
elif type_ == 'delta':
updates = data['update']
for update in updates:
assert update['symbol'] == symbol
last_data.update(update)
else:
self.gateway.write_log(f"Unknown type {type_} in websocket_api.on_tick()")
def on_depth(self, topic: str, raw_data: dict):
""""""
symbol = topic[15:]
self._update_depth_incremental_data(symbol, raw_data)
last_data = self._orderbook_data[symbol]
tick = self._get_last_tick(symbol)
for n, parsed in enumerate(last_data.islice(20, 25)):
tick.__setattr__(f"bid_price_{5 - n}", parsed.price)
tick.__setattr__(f"bid_volume_{5 - n}", parsed.size)
for n, parsed in enumerate(last_data.islice(25, 30)):
tick.__setattr__(f"ask_price_{n + 1}", parsed.price)
tick.__setattr__(f"ask_volume_{n + 1}", parsed.size)
tick.datetime = _parse_timestamp_e6(raw_data['timestamp_e6'])
self.gateway.on_tick(copy(tick))
def _update_depth_incremental_data(self, symbol, raw_data):
type_ = raw_data['type']
data = raw_data['data']
last_data = self._orderbook_data[symbol]
if type_ == 'snapshot':
last_data.clear()
for item in data:
assert item['symbol'] == symbol
parsed = RawOrderInfo(item)
last_data.add(parsed)
elif type_ == 'delta':
deletes = data['delete']
for delete in deletes:
assert delete['symbol'] == symbol
parsed = RawOrderInfo(delete)
last_data.remove(parsed)
updates = data['update']
for update in updates:
assert update['symbol'] == symbol
parsed = RawOrderInfo(update)
last_data.remove(parsed)
last_data.add(parsed)
inserts = data['insert']
for insert in inserts:
assert insert['symbol'] == symbol
parsed = RawOrderInfo(insert)
last_data.add(parsed)
else:
self.gateway.write_log(f"Unknown type {type_} in websocket_api.on_tick()")
def on_trade(self, topic: str, raw_data: dict):
""""""
for data in raw_data['data']:
order_id = data['order_link_id']
if not order_id:
order_id = data['order_id']
trade = TradeData(
symbol=data["symbol"],
exchange=Exchange.BYBIT,
orderid=order_id,
tradeid=data['exec_id'],
direction=DIRECTION_BYBIT2VT[data["side"]],
price=data["price"],
volume=data["exec_qty"],
time=parse_datetime(data["trade_time"]),
gateway_name=self.gateway_name,
)
self.gateway.on_trade(trade)
def on_order(self, topic: str, raw_data: dict):
""""""
for data in raw_data['data']:
print(data)
order = self.gateway.parse_order_data(data, 'timestamp')
self.gateway.on_order(copy(order))
def on_position(self, topic: str, raw_data: dict):
""""""
for data in raw_data['data']:
p1, p2 = self.gateway.parse_position_data(data)
self.gateway.on_position(p1)
if p2:
self.gateway.on_position(p2)
def on_account(self, d):
""""""
accountid = str(d["account"])
account = self.accounts.get(accountid, None)
if not account:
account = AccountData(accountid=accountid,
gateway_name=self.gateway_name)
self.accounts[accountid] = account
account.balance = d.get("marginBalance", account.balance)
account.available = d.get("availableMargin", account.available)
account.frozen = account.balance - account.available
self.gateway.on_account(copy(account))
def on_contract(self, d):
""""""
if "tickSize" not in d:
return
if not d["lotSize"]:
return
contract = ContractData(
symbol=d["symbol"],
exchange=Exchange.BYBIT,
name=d["symbol"],
product=Product.FUTURES,
pricetick=d["tickSize"],
size=d["lotSize"],
stop_supported=True,
net_position=True,
history_data=True,
gateway_name=self.gateway_name,
)
self.gateway.on_contract(contract)
def _ping(self):
super()._ping()
self.send_packet({'op': 'ping'})
def _parse_timestamp_e6(timestamp: int):
return datetime.fromtimestamp(timestamp / 1_000_000, utc_tz).astimezone(tz=local_tz)

View File

@ -265,12 +265,12 @@ class LocalOrderManager:
Management tool to support use local order id for trading. Management tool to support use local order id for trading.
""" """
def __init__(self, gateway: BaseGateway): def __init__(self, gateway: BaseGateway, order_prefix: str = ""):
"""""" """"""
self.gateway = gateway self.gateway = gateway
# For generating local orderid # For generating local orderid
self.order_prefix = "" self.order_prefix = order_prefix
self.order_count = 0 self.order_count = 0
self.orders = {} # local_orderid:order self.orders = {} # local_orderid:order
@ -296,7 +296,7 @@ class LocalOrderManager:
Generate a new local orderid. Generate a new local orderid.
""" """
self.order_count += 1 self.order_count += 1
local_orderid = str(self.order_count).rjust(8, "0") local_orderid = self.order_prefix + str(self.order_count).rjust(8, "0")
return local_orderid return local_orderid
def get_local_orderid(self, sys_orderid: str): def get_local_orderid(self, sys_orderid: str):