From 8d02c24206eb91fa5b906f31f7b68cd2878f6a9c Mon Sep 17 00:00:00 2001 From: uelin Date: Wed, 5 Jun 2019 21:32:15 +0800 Subject: [PATCH 1/8] add binance gateway --- tests/trader/run.py | 82 +-- vnpy/gateway/binance/__init__.py | 1 + vnpy/gateway/binance/binance_gateway.py | 861 ++++++++++++++++++++++++ vnpy/trader/constant.py | 1 + 4 files changed, 905 insertions(+), 40 deletions(-) create mode 100644 vnpy/gateway/binance/__init__.py create mode 100644 vnpy/gateway/binance/binance_gateway.py diff --git a/tests/trader/run.py b/tests/trader/run.py index cf698f0e..43eccd5b 100644 --- a/tests/trader/run.py +++ b/tests/trader/run.py @@ -4,28 +4,29 @@ from vnpy.event import EventEngine from vnpy.trader.engine import MainEngine from vnpy.trader.ui import MainWindow, create_qapp -from vnpy.gateway.bitmex import BitmexGateway -from vnpy.gateway.futu import FutuGateway -from vnpy.gateway.ib import IbGateway -from vnpy.gateway.ctp import CtpGateway +from vnpy.gateway.binance import BinanceGateway +#from vnpy.gateway.bitmex import BitmexGateway +#from vnpy.gateway.futu import FutuGateway +#from vnpy.gateway.ib import IbGateway +#from vnpy.gateway.ctp import CtpGateway # from vnpy.gateway.ctptest import CtptestGateway -from vnpy.gateway.femas import FemasGateway -from vnpy.gateway.tiger import TigerGateway -from vnpy.gateway.oes import OesGateway -from vnpy.gateway.okex import OkexGateway -from vnpy.gateway.huobi import HuobiGateway -from vnpy.gateway.bitfinex import BitfinexGateway -from vnpy.gateway.onetoken import OnetokenGateway -from vnpy.gateway.okexf import OkexfGateway -from vnpy.gateway.xtp import XtpGateway -from vnpy.gateway.hbdm import HbdmGateway +#from vnpy.gateway.femas import FemasGateway +#from vnpy.gateway.tiger import TigerGateway +#from vnpy.gateway.oes import OesGateway +#from vnpy.gateway.okex import OkexGateway +#from vnpy.gateway.huobi import HuobiGateway +#from vnpy.gateway.bitfinex import BitfinexGateway +#from vnpy.gateway.onetoken import OnetokenGateway +#from vnpy.gateway.okexf import OkexfGateway +#from vnpy.gateway.xtp import XtpGateway +#from vnpy.gateway.hbdm import HbdmGateway -from vnpy.app.cta_strategy import CtaStrategyApp -from vnpy.app.csv_loader import CsvLoaderApp -from vnpy.app.algo_trading import AlgoTradingApp -from vnpy.app.cta_backtester import CtaBacktesterApp -from vnpy.app.data_recorder import DataRecorderApp -from vnpy.app.risk_manager import RiskManagerApp +#from vnpy.app.cta_strategy import CtaStrategyApp +#from vnpy.app.csv_loader import CsvLoaderApp +#from vnpy.app.algo_trading import AlgoTradingApp +#from vnpy.app.cta_backtester import CtaBacktesterApp +#from vnpy.app.data_recorder import DataRecorderApp +#from vnpy.app.risk_manager import RiskManagerApp def main(): @@ -35,28 +36,29 @@ def main(): event_engine = EventEngine() main_engine = MainEngine(event_engine) - main_engine.add_gateway(XtpGateway) - main_engine.add_gateway(CtpGateway) + main_engine.add_gateway(BinanceGateway) + #main_engine.add_gateway(XtpGateway) + #main_engine.add_gateway(CtpGateway) # main_engine.add_gateway(CtptestGateway) - main_engine.add_gateway(FemasGateway) - main_engine.add_gateway(IbGateway) - main_engine.add_gateway(FutuGateway) - main_engine.add_gateway(BitmexGateway) - main_engine.add_gateway(TigerGateway) - main_engine.add_gateway(OesGateway) - main_engine.add_gateway(OkexGateway) - main_engine.add_gateway(HuobiGateway) - main_engine.add_gateway(BitfinexGateway) - main_engine.add_gateway(OnetokenGateway) - main_engine.add_gateway(OkexfGateway) - main_engine.add_gateway(HbdmGateway) + #main_engine.add_gateway(FemasGateway) + #main_engine.add_gateway(IbGateway) + #main_engine.add_gateway(FutuGateway) + #main_engine.add_gateway(BitmexGateway) + #main_engine.add_gateway(TigerGateway) + #main_engine.add_gateway(OesGateway) + #main_engine.add_gateway(OkexGateway) + #main_engine.add_gateway(HuobiGateway) + #main_engine.add_gateway(BitfinexGateway) + #main_engine.add_gateway(OnetokenGateway) + #main_engine.add_gateway(OkexfGateway) + #main_engine.add_gateway(HbdmGateway) - main_engine.add_app(CtaStrategyApp) - main_engine.add_app(CtaBacktesterApp) - main_engine.add_app(CsvLoaderApp) - main_engine.add_app(AlgoTradingApp) - main_engine.add_app(DataRecorderApp) - main_engine.add_app(RiskManagerApp) + #main_engine.add_app(CtaStrategyApp) + #main_engine.add_app(CtaBacktesterApp) + #main_engine.add_app(CsvLoaderApp) + #main_engine.add_app(AlgoTradingApp) + #main_engine.add_app(DataRecorderApp) + #main_engine.add_app(RiskManagerApp) main_window = MainWindow(main_engine, event_engine) main_window.showMaximized() diff --git a/vnpy/gateway/binance/__init__.py b/vnpy/gateway/binance/__init__.py new file mode 100644 index 00000000..6b951a7a --- /dev/null +++ b/vnpy/gateway/binance/__init__.py @@ -0,0 +1 @@ +from .binance_gateway import BinanceGateway diff --git a/vnpy/gateway/binance/binance_gateway.py b/vnpy/gateway/binance/binance_gateway.py new file mode 100644 index 00000000..4345067f --- /dev/null +++ b/vnpy/gateway/binance/binance_gateway.py @@ -0,0 +1,861 @@ +# encoding: UTF-8 + +""" +币安交易接口 +""" + +import re +import urllib +import base64 +import json +import zlib +import hashlib +import hmac +import time +from copy import copy +from datetime import datetime +from threading import Thread + +from vnpy.event import Event +from vnpy.api.rest import RestClient, Request +from vnpy.api.websocket import WebsocketClient +from vnpy.trader.constant import ( + Direction, + Exchange, + Product, + Status, + OrderType +) +from vnpy.trader.gateway import BaseGateway, LocalOrderManager +from vnpy.trader.object import ( + TickData, + OrderData, + TradeData, + AccountData, + ContractData, + OrderRequest, + CancelRequest, + SubscribeRequest +) +from vnpy.trader.event import EVENT_TIMER + + +REST_HOST = "https://www.binance.com" +WEBSOCKET_TRADE_HOST = "wss://stream.binance.com:9443/ws/" # Account and Order +WEBSOCKET_DATA_HOST = "wss://stream.binance.com:9443/stream?streams=" # Market Data + +STATUS_BINANCE2VT = { + "NEW": Status.NOTTRADED, + "PARTIALLY_FILLED": Status.PARTTRADED, + "FILLED": Status.ALLTRADED, + "CANCELED": Status.CANCELLED, + "REJECTED": Status.REJECTED +} + +ORDERTYPE_VT2BINANCE = { + OrderType.LIMIT: "LIMIT", + OrderType.MARKET: "MARKET", + OrderType.STOP: "STOP_LOSS", +} + +ORDERTYPE_BINANCE2VT = {v: k for k, v in ORDERTYPE_VT2BINANCE.items()} + +DIRECTION_VT2BINANCE = { + Direction.LONG: "BUY", + Direction.SHORT: "SELL" +} + +DIRECTION_BINANCE2VT = {v: k for k, v in DIRECTION_VT2BINANCE.items()} + +binance_symbols = set() +symbol_name_map = {} + + +class BinanceGateway(BaseGateway): + """ + VN Trader Gateway for Binance connection. + """ + + default_setting = { + "key": "", + "secret": "", + "session_number": 3, + "proxy_host": "127.0.0.1", + "proxy_port": 1080, + } + + exchanges = [Exchange.BINANCE] + + def __init__(self, event_engine): + """Constructor""" + super(BinanceGateway, self).__init__(event_engine, "BINANCE") + + self.order_manager = LocalOrderManager(self) + + self.rest_api = BinanceRestApi(self) + self.trade_ws_api = BinanceTradeWebsocketApi(self) + self.market_ws_api = BinanceDataWebsocketApi(self) + + def connect(self, setting: dict): + """""" + key = setting["key"] + secret = setting["secret"] + session_number = setting["session_number"] + proxy_host = setting["proxy_host"] + proxy_port = setting["proxy_port"] + + self.rest_api.connect(key, secret, session_number, + proxy_host, proxy_port) + #self.trade_ws_api.connect(key, secret, proxy_host, proxy_port) + self.market_ws_api.connect(key, secret, proxy_host, proxy_port) + + #self.init_query() + + def subscribe(self, req: SubscribeRequest): + """""" + self.market_ws_api.subscribe(req) + self.trade_ws_api.subscribe(req) + + def send_order(self, req: OrderRequest): + """""" + return self.rest_api.send_order(req) + + def cancel_order(self, req: CancelRequest): + """""" + self.rest_api.cancel_order(req) + + def query_account(self): + """""" + self.rest_api.query_account() + + def query_position(self): + """""" + pass + + def close(self): + """""" + self.rest_api.stop() + self.trade_ws_api.stop() + self.market_ws_api.stop() + + def process_timer_event(self, event: Event): + """""" + self.count += 1 + if self.count < 3: + return + + self.query_account() + + def init_query(self): + """""" + self.count = 0 + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + +class BinanceRestApi(RestClient): + """ + BINANCE REST API + """ + + def __init__(self, gateway: BaseGateway): + """""" + super(BinanceRestApi, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + self.order_manager = gateway.order_manager + self.trade_ws_api = BinanceTradeWebsocketApi(self.gateway) + + self.host = "" + self.key = "" + self.secret = "" + self.userStreamKey = "" + self.keepaliveCount = 0 + self.recvWindow = 5000 + self.time_offset = 0 + + self.cancel_requests = {} + self.orders = {} + + def sign(self, request): + """ + Generate BINANCE signature. + """ + if request.params: + path = request.path + "?" + urllib.parse.urlencode(request.params) + else: + request.params = dict() + path = request.path + security = "NONE" + if request.data: + security = request.data['security'] + + if security == "SIGNED": + timestamp = int(time.time() * 1000) + if self.time_offset > 0: + timestamp -= abs(self.time_offset) + elif self.time_offset < 0: + timestamp += abs(self.time_offset) + request.params['timestamp'] = timestamp + request.params['recvWindow'] = self.recvWindow + + query = urllib.parse.urlencode(sorted(request.params.items())) + signature = hmac.new(self.secret, query.encode('utf-8'), hashlib.sha256).hexdigest() + query += "&signature={}".format(signature) + path = request.path + "?" + query + request.path = path + request.params = {} + request.data = {} + # Add headers + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "Accept": "application/json", + "X-MBX-APIKEY": self.key + } + if security == "SIGNED" or security == "API-KEY": + request.headers = headers + return request + + def connect( + self, + key: str, + secret: str, + session_number: int, + proxy_host: str, + proxy_port: int + ): + """ + Initialize connection to REST server. + """ + self.key = key + self.secret = secret.encode() + self.proxy_port = proxy_port + self.proxy_host = proxy_host + + self.host, _ = _split_url(REST_HOST) + + self.init(REST_HOST, proxy_host, proxy_port) + self.start(session_number) + + self.gateway.write_log("REST API启动成功") + + self.query_time() + self.query_account() + self.query_order() + self.query_contract() + self.start_userStream() + + def query_time(self): + """""" + data = { + "security": "NONE" + } + path = '/api/v1/time' + return self.add_request( + "GET", + path, + callback=self.on_query_time, + data=data + ) + + def query_account(self): + """""" + data = { + "security": "SIGNED" + } + self.add_request( + method="GET", + path="/api/v3/account", + callback=self.on_query_account, + data=data + ) + + def query_order(self): + """""" + data = { + "security": "SIGNED" + } + self.add_request( + method="GET", + path="/api/v3/openOrders", + callback=self.on_query_order, + data=data + ) + + def query_contract(self): + """""" + data = { + "security": "NONE" + } + self.add_request( + method="GET", + path="/api/v1/exchangeInfo", + callback=self.on_query_contract, + data=data + ) + + def send_order(self, req: OrderRequest): + """""" + local_orderid = self.order_manager.new_local_orderid() + order = req.create_order_data( + local_orderid, + self.gateway_name + ) + order.time = datetime.now().strftime("%H:%M:%S") + + data = { + "security": "SIGNED" + } + + params = { + "symbol": req.symbol, + "timeInForce": "GTC", + "side": DIRECTION_VT2BINANCE[req.direction], + "type": ORDERTYPE_VT2BINANCE[req.type], + "price": str(req.price), + "quantity": str(req.volume) + } + + self.add_request( + method="POST", + path="/api/v3/order", + callback=self.on_send_order, + data=data, + params=params, + extra=order, + on_error=self.on_send_order_error, + on_failed=self.on_send_order_failed + ) + + self.order_manager.on_order(order) + return order.vt_orderid + + def cancel_order(self, req: CancelRequest): + """""" + sys_orderid = self.order_manager.get_sys_orderid(req.orderid) + + data = { + "security": "SIGNED" + } + params = { + "symbol": req.symbol, + "orderId": req.orderid + } + self.add_request( + method="DELETE", + path="/api/v3/order", + callback=self.on_cancel_order, + params=params, + data=data, + extra=req + ) + + def start_userStream(self): + """""" + data = { + "security": "API-KEY" + } + self.add_request( + method="POST", + path='/api/v1/userDataStream', + callback=self.on_start_userStream, + data=data + ) + + def keepalive_userStream(self): + """""" + self.keepaliveCount += 1 + if self.keepaliveCount < 1800: + return + data = { + "security": "SIGNED" + } + params = { + 'listenKey': self.userStreamKey + } + self.add_request( + method='PUT', + path='/api/v1/userDataStream', + callback=self.on_keepalive_userStream, + params=params, + data=data + ) + + def close_userStream(self, listenKey): + """""" + data = { + "security": "SIGNED" + } + params = { + 'listenKey': listenKey + } + self.add_request( + method='DELETE', + path='/api/v1/userDataStream', + callback=self.on_close_userStream, + params=params, + data=data + ) + + def on_query_time(self, data, request): + """""" + time_now = int(time.time() * 1000) + time_server = int(data["serverTime"]) + server_local_time = time.localtime(float(time_server / 1000)) + now_local_time = time.localtime(float(time_now / 1000)) + self.time_offset = time_now - time_server + server_time = time.strftime("%Y-%m-%d %H:%M:%S", server_local_time) + local_time = time.strftime("%Y-%m-%d %H:%M:%S", now_local_time) + msg = f"服务器时间:{server_time},本机时间:{local_time}" + self.gateway.write_log(msg) + + def on_query_account(self, data, request): + """""" + for account_data in data["balances"]: + account = AccountData( + accountid=account_data["asset"], + balance=float(account_data["free"]), + frozen=float(account_data["locked"]), + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) + + self.gateway.write_log("账户资金查询成功") + + def on_query_order(self, data, request): + """""" + for d in data: + sys_orderid = d["orderId"] + local_orderid = self.order_manager.get_local_orderid(sys_orderid) + + direction = DIRECTION_BINANCE2VT[d["side"]] + order_type = ORDERTYPE_BINANCE2VT[d["type"]] + dt = datetime.fromtimestamp(d["time"] / 1000) + time = dt.strftime("%Y-%m-%d %H:%M:%S") + + order = OrderData( + orderid=sys_orderid, + symbol=d["symbol"], + exchange=Exchange.BINANCE, + price=float(d["price"]), + volume=float(d["origQty"]), + type=order_type, + direction=direction, + traded=float(d["executedQty"]), + status=STATUS_BINANCE2VT.get(d["status"], None), + time=time, + gateway_name=self.gateway_name, + ) + + self.order_manager.on_order(order) + + self.gateway.write_log("委托信息查询成功") + + def on_query_contract(self, data, request): + """""" + for d in data["symbols"]: + base_currency = d["baseAsset"] + quote_currency = d["quoteAsset"] + name = f"{base_currency.upper()}/{quote_currency.upper()}" + pricetick = 0 + min_volume = 0 + for f in d["filters"]: + if f["filterType"] == "PRICE_FILTER": + pricetick = f["tickSize"] + if f["filterType"] == "LOT_SIZE": + min_volume = f["stepSize"] + contract = ContractData( + symbol=d["symbol"], + exchange=Exchange.BINANCE, + name=name, + pricetick=pricetick, + size=1, + min_volume=min_volume, + product=Product.SPOT, + gateway_name=self.gateway_name, + ) + self.gateway.on_contract(contract) + + binance_symbols.add(contract.symbol) + symbol_name_map[contract.symbol] = contract.name + + self.gateway.write_log("合约信息查询成功") + + def on_send_order(self, data, request): + """""" + order = request.extra + if self.check_error(data, "委托"): + order.status = Status.REJECTED + self.order_manager.on_order(order) + return + order.status = STATUS_BINANCE2VT.get(data["status"], None) + sys_orderid = data["orderId"] + self.order_manager.on_order(order) + self.order_manager.update_orderid_map(order.orderid, sys_orderid) + + def on_send_order_failed(self, status_code: str, request: Request): + """ + Callback when sending order failed on server. + """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + msg = f"委托失败,状态码:{status_code},信息:{request.response.text}" + self.gateway.write_log(msg) + + def on_send_order_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback when sending order caused exception. + """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_cancel_order(self, data, request): + """""" + cancel_request = request.extra + local_orderid = cancel_request.orderid + order = self.order_manager.get_order_with_local_orderid(local_orderid) + + if self.check_error(data, "撤单"): + order.status = Status.REJECTED + else: + order.status = Status.CANCELLED + self.gateway.write_log(f"委托撤单成功:{order.orderid}") + + self.order_manager.on_order(order) + + def on_start_userStream(self, data, request): + self.userStreamKey = data['listenKey'] + self.keepaliveCount = 0 + url = WEBSOCKET_TRADE_HOST + self.userStreamKey + self.trade_ws_api.connect( + key=self.key, + secret=self.secret, + url=url, + proxy_host=self.proxy_host, + proxy_port=self.proxy_port) + + def on_keepalive_userStream(self, data, request): + self.gateway.write_log("交易推送刷新成功") + if self.keepaliveCount >= 1800: + self.keepaliveCount = 0 + self.keepalive_userStream(self.userStreamKey) + + def on_close_userStream(self, listenKey): + self.gateway.write_log("交易推送关闭") + + def check_error(self, data: dict, func: str = ""): + """""" + if data["status"] != "error": + return False + + error_code = data["err-code"] + error_msg = data["err-msg"] + + self.gateway.write_log(f"{func}请求出错,代码:{error_code},信息:{error_msg}") + return True + + +class BinanceWebsocketApiBase(WebsocketClient): + """""" + + def __init__(self, gateway): + """""" + super(BinanceWebsocketApiBase, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.key = "" + self.secret = "" + #self.sign_host = "" + self.path = "" + + def connect( + self, + key: str, + secret: str, + url: str, + proxy_host: str, + proxy_port: int + ): + """""" + self.key = key + self.secret = secret + + #host, path = _split_url(url) + #self.sign_host = host + #self.path = path + + self.init(url, proxy_host, proxy_port) + self.start() + + def login(self): + """""" + params = {"op": "auth"} + #params.update(create_signature(self.key, "GET", self.sign_host, self.path, self.secret)) + return self.send_packet(params) + + def on_login(self, packet): + """""" + pass + + @staticmethod + def unpack_data(data): + """""" + print("==============unpack_data============") + print(data) + return json.loads(data) + #return json.loads(zlib.decompress(data, zlib.Z_BEST_COMPRESSION)) + + def on_packet(self, packet): + """""" + print("=============on_packet=============") + print("event type:"+packet["e"]) + print(packet) + #if packet["e"] == "executionReport": + if "ping" in packet: + req = {"pong": packet["ping"]} + self.send_packet(req) + elif "op" in packet and packet["op"] == "ping": + req = { + "op": "pong", + "ts": packet["ts"] + } + self.send_packet(req) + elif "err-msg" in packet: + return self.on_error_msg(packet) + elif "op" in packet and packet["op"] == "auth": + return self.on_login() + else: + self.on_data(packet) + + def on_data(self, packet): + """""" + print("data : {}".format(packet)) + + def on_error_msg(self, packet): + """""" + msg = packet["err-msg"] + if msg == "invalid pong": + return + + self.gateway.write_log(packet["err-msg"]) + + +class BinanceTradeWebsocketApi(BinanceWebsocketApiBase): + """""" + def __init__(self, gateway): + """""" + super().__init__(gateway) + + self.order_manager = gateway.order_manager + self.order_manager.push_data_callback = self.on_data + + self.req_id = 0 + + def connect(self, key, secret, url, proxy_host, proxy_port): + """""" + super().connect(key, secret, url, proxy_host, proxy_port) + self.gateway.write_log("交易Websocket API连接成功") + self.gateway.rest_api.keepalive_userStream() + + def subscribe(self, req: SubscribeRequest): + """""" + self.req_id += 1 + req = { + "op": "sub", + "cid": str(self.req_id), + "topic": f"orders.{req.symbol}" + } + self.send_packet(req) + + def on_connected(self): + """""" + pass + + def on_login(self): + """""" + pass + + def on_data(self, packet): # type: (dict)->None + """""" + print("==========on_data1=========") + # push order data change + if packet["e"] == "executionReport": + order = OrderData( + symbol=packet["s"], + exchange=Exchange.BINANCE, + orderid=packet["i"], + status=STATUS_BINANCE2VT.get(packet["X"], None), + traded=float(packet["Z"]), + price=float(packet["L"]), + time=packet["O"], + gateway_name=self.gateway_name + ) + self.on_order(order) + # push account data change + if packet["e"] == "outboundAccountInfo": + for account_data in packet["B"]: + account = AccountData( + accountid=account_data["a"], + balance=float(account_data["f"]), + frozen=float(account_data["l"]), + gateway_name=self.gateway_name + ) + self.on_account(account) + + def on_order(self, data: dict): + """""" + sys_orderid = data.orderid + + order = self.order_manager.get_order_with_sys_orderid(sys_orderid) + if not order: + self.order_manager.add_push_data(sys_orderid, data) + return + + # Push order event + order.traded = data.traded + order.status = data.status + self.order_manager.on_order(order) + + # Push trade event + traded_volume = data.traded + if not traded_volume: + return + + trade = TradeData( + symbol=order.symbol, + exchange=Exchange.BINANCE, + orderid=order.orderid, + direction=order.direction, + price=float(order.price), + volume=float(order.traded), + time=datetime.now().strftime("%H:%M:%S"), + gateway_name=self.gateway_name, + ) + self.gateway.on_trade(trade) + + def on_account(self, data: dict): + """""" + self.gateway.on_account(data) + +class BinanceDataWebsocketApi(BinanceWebsocketApiBase): + """""" + + def __init__(self, gateway): + """""" + super().__init__(gateway) + + self.req_id = 0 + self.ticks = {} + + def connect(self, key: str, secret: str, proxy_host: str, proxy_port: int): + """""" + super().connect(key, secret, WEBSOCKET_DATA_HOST, proxy_host, proxy_port) + self.gateway.write_log("行情Websocket API连接成功") + + def on_connected(self): + """""" + pass + + def subscribe(self, req: SubscribeRequest): + """""" + symbol = req.symbol + print("============BinanceDataWebsocketApi.subscribe===========") + print("symbol"+symbol) + # Create tick data buffer + tick = TickData( + symbol=symbol, + name=symbol_name_map.get(symbol, ""), + exchange=Exchange.BINANCE, + datetime=datetime.now(), + gateway_name=self.gateway_name, + ) + self.ticks[symbol] = tick + + # Subscribe to market depth update + self.req_id += 1 + req = { + "sub": f"market.{symbol}.depth.step0", + "id": str(self.req_id) + } + self.send_packet(req) + + # Subscribe to market detail update + self.req_id += 1 + req = { + "sub": f"market.{symbol}.detail", + "id": str(self.req_id) + } + self.send_packet(req) + + def on_data(self, packet): # type: (dict)->None + """""" + print("===================on_data=====================") + print(packet) + + channel = packet.get("ch", None) + if channel: + if "depth.step" in channel: + self.on_market_depth(packet) + elif "detail" in channel: + self.on_market_detail(packet) + elif "err-code" in packet: + code = packet["err-code"] + msg = packet["err-msg"] + self.gateway.write_log(f"错误代码:{code}, 错误信息:{msg}") + + def on_market_depth(self, data): + """行情深度推送 """ + symbol = data["ch"].split(".")[1] + tick = self.ticks[symbol] + tick.datetime = datetime.fromtimestamp(data["ts"] / 1000) + + bids = data["tick"]["bids"] + for n in range(5): + price, volume = bids[n] + tick.__setattr__("bid_price_" + str(n + 1), float(price)) + tick.__setattr__("bid_volume_" + str(n + 1), float(volume)) + + asks = data["tick"]["asks"] + for n in range(5): + price, volume = asks[n] + tick.__setattr__("ask_price_" + str(n + 1), float(price)) + tick.__setattr__("ask_volume_" + str(n + 1), float(volume)) + + if tick.last_price: + self.gateway.on_tick(copy(tick)) + + def on_market_detail(self, data): + """市场细节推送""" + symbol = data["ch"].split(".")[1] + tick = self.ticks[symbol] + tick.datetime = datetime.fromtimestamp(data["ts"] / 1000) + + tick_data = data["tick"] + tick.open_price = float(tick_data["open"]) + tick.high_price = float(tick_data["high"]) + tick.low_price = float(tick_data["low"]) + tick.last_price = float(tick_data["close"]) + tick.volume = float(tick_data["vol"]) + + if tick.bid_price_1: + self.gateway.on_tick(copy(tick)) + + +def _split_url(url): + """ + 将url拆分为host和path + :return: host, path + """ + result = re.match("\w+://([^/]*)(.*)", url) # noqa + if result: + return result.group(1), result.group(2) \ No newline at end of file diff --git a/vnpy/trader/constant.py b/vnpy/trader/constant.py index eb2eec7a..4965a0de 100644 --- a/vnpy/trader/constant.py +++ b/vnpy/trader/constant.py @@ -113,6 +113,7 @@ class Exchange(Enum): OKEX = "OKEX" HUOBI = "HUOBI" BITFINEX = "BITFINEX" + BINANCE = "BINANCE" class Currency(Enum): From 0f841ece0eb514291d894a94714b8d097fdf8e51 Mon Sep 17 00:00:00 2001 From: 1122455801 Date: Tue, 11 Jun 2019 17:02:38 +0800 Subject: [PATCH 2/8] Create __init__.py --- vnpy/gateway/binance/__init__.py | 1 + 1 file changed, 1 insertion(+) create mode 100644 vnpy/gateway/binance/__init__.py diff --git a/vnpy/gateway/binance/__init__.py b/vnpy/gateway/binance/__init__.py new file mode 100644 index 00000000..6b951a7a --- /dev/null +++ b/vnpy/gateway/binance/__init__.py @@ -0,0 +1 @@ +from .binance_gateway import BinanceGateway From 9d8db0fbfd082d1a1f7372a35ff96e6b92a96e85 Mon Sep 17 00:00:00 2001 From: 1122455801 Date: Tue, 11 Jun 2019 17:02:43 +0800 Subject: [PATCH 3/8] Create binance_gateway.py --- vnpy/gateway/binance/binance_gateway.py | 873 ++++++++++++++++++++++++ 1 file changed, 873 insertions(+) create mode 100644 vnpy/gateway/binance/binance_gateway.py diff --git a/vnpy/gateway/binance/binance_gateway.py b/vnpy/gateway/binance/binance_gateway.py new file mode 100644 index 00000000..a677be9a --- /dev/null +++ b/vnpy/gateway/binance/binance_gateway.py @@ -0,0 +1,873 @@ +# encoding: UTF-8 + +""" +币安交易接口 +""" + +import re +import urllib +import base64 +import json +import zlib +import hashlib +import hmac +import time +from copy import copy +from datetime import datetime +from threading import Thread + +from vnpy.event import Event +from vnpy.api.rest import RestClient, Request +from vnpy.api.websocket import WebsocketClient +from vnpy.trader.constant import ( + Direction, + Exchange, + Product, + Status, + OrderType +) +from vnpy.trader.gateway import BaseGateway, LocalOrderManager +from vnpy.trader.object import ( + TickData, + OrderData, + TradeData, + AccountData, + ContractData, + OrderRequest, + CancelRequest, + SubscribeRequest +) +from vnpy.trader.event import EVENT_TIMER + + +REST_HOST = "https://www.binance.com" +WEBSOCKET_TRADE_HOST = "wss://stream.binance.com:9443/ws/" # Account and Order +WEBSOCKET_DATA_HOST = "wss://stream.binance.com:9443/stream?streams=" # Market Data + +STATUS_BINANCE2VT = { + "NEW": Status.NOTTRADED, + "PARTIALLY_FILLED": Status.PARTTRADED, + "FILLED": Status.ALLTRADED, + "CANCELED": Status.CANCELLED, + "REJECTED": Status.REJECTED +} + +ORDERTYPE_VT2BINANCE = { + OrderType.LIMIT: "LIMIT", + OrderType.MARKET: "MARKET", + OrderType.STOP: "STOP_LOSS", +} + +ORDERTYPE_BINANCE2VT = {v: k for k, v in ORDERTYPE_VT2BINANCE.items()} + +DIRECTION_VT2BINANCE = { + Direction.LONG: "BUY", + Direction.SHORT: "SELL" +} + +DIRECTION_BINANCE2VT = {v: k for k, v in DIRECTION_VT2BINANCE.items()} + +binance_symbols = set() +symbol_name_map = {} + + +class BinanceGateway(BaseGateway): + """ + VN Trader Gateway for Binance connection. + """ + + default_setting = { + "key": "", + "secret": "", + "session_number": 3, + "proxy_host": "127.0.0.1", + "proxy_port": 2000, + } + + exchanges = [Exchange.BINANCE] + + def __init__(self, event_engine): + """Constructor""" + super(BinanceGateway, self).__init__(event_engine, "BINANCE") + + self.order_manager = LocalOrderManager(self) + + self.rest_api = BinanceRestApi(self) + self.trade_ws_api = BinanceTradeWebsocketApi(self) + self.market_ws_api = BinanceDataWebsocketApi(self) + + def connect(self, setting: dict): + """""" + key = setting["key"] + secret = setting["secret"] + session_number = setting["session_number"] + proxy_host = setting["proxy_host"] + proxy_port = setting["proxy_port"] + + self.rest_api.connect(key, secret, session_number, + proxy_host, proxy_port) + #self.trade_ws_api.connect(key, secret, proxy_host, proxy_port) + #self.market_ws_api.connect(key, secret, proxy_host, proxy_port) + + #self.init_query() + + def subscribe(self, req: SubscribeRequest): + """""" + self.market_ws_api.subscribe(req) + #self.trade_ws_api.subscribe(req) + + def send_order(self, req: OrderRequest): + """""" + return self.rest_api.send_order(req) + + def cancel_order(self, req: CancelRequest): + """""" + self.rest_api.cancel_order(req) + + def query_account(self): + """""" + self.rest_api.query_account() + + def query_position(self): + """""" + pass + + def close(self): + """""" + self.rest_api.stop() + self.trade_ws_api.stop() + self.market_ws_api.stop() + + def process_timer_event(self, event: Event): + """""" + self.count += 1 + if self.count < 3: + return + + self.query_account() + + def init_query(self): + """""" + self.count = 0 + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + +class BinanceRestApi(RestClient): + """ + BINANCE REST API + """ + + def __init__(self, gateway: BaseGateway): + """""" + super(BinanceRestApi, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + self.order_manager = gateway.order_manager + self.trade_ws_api = BinanceTradeWebsocketApi(self.gateway) + + self.host = "" + self.key = "" + self.secret = "" + self.userStreamKey = "" + self.keepaliveCount = 0 + self.recvWindow = 5000 + self.time_offset = 0 + + self.cancel_requests = {} + self.orders = {} + + def sign(self, request): + """ + Generate BINANCE signature. + """ + if request.params: + path = request.path + "?" + urllib.parse.urlencode(request.params) + else: + request.params = dict() + path = request.path + security = "NONE" + if request.data: + security = request.data['security'] + + if security == "SIGNED": + timestamp = int(time.time() * 1000) + if self.time_offset > 0: + timestamp -= abs(self.time_offset) + elif self.time_offset < 0: + timestamp += abs(self.time_offset) + request.params['timestamp'] = timestamp + request.params['recvWindow'] = self.recvWindow + + query = urllib.parse.urlencode(sorted(request.params.items())) + signature = hmac.new(self.secret, query.encode('utf-8'), hashlib.sha256).hexdigest() + query += "&signature={}".format(signature) + path = request.path + "?" + query + request.path = path + request.params = {} + request.data = {} + # Add headers + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "Accept": "application/json", + "X-MBX-APIKEY": self.key + } + if security == "SIGNED" or security == "API-KEY": + request.headers = headers + return request + + def connect( + self, + key: str, + secret: str, + session_number: int, + proxy_host: str, + proxy_port: int + ): + """ + Initialize connection to REST server. + """ + self.key = key + self.secret = secret.encode() + self.proxy_port = proxy_port + self.proxy_host = proxy_host + + self.host, _ = _split_url(REST_HOST) + + self.init(REST_HOST, proxy_host, proxy_port) + self.start(session_number) + + self.gateway.write_log("REST API启动成功") + + self.query_time() + self.query_account() + self.query_order() + self.query_contract() + self.start_userStream() + + def query_time(self): + """""" + data = { + "security": "NONE" + } + path = '/api/v1/time' + return self.add_request( + "GET", + path, + callback=self.on_query_time, + data=data + ) + + def query_account(self): + """""" + data = { + "security": "SIGNED" + } + self.add_request( + method="GET", + path="/api/v3/account", + callback=self.on_query_account, + data=data + ) + + def query_order(self): + """""" + data = { + "security": "SIGNED" + } + self.add_request( + method="GET", + path="/api/v3/openOrders", + callback=self.on_query_order, + data=data + ) + + def query_contract(self): + """""" + data = { + "security": "NONE" + } + self.add_request( + method="GET", + path="/api/v1/exchangeInfo", + callback=self.on_query_contract, + data=data + ) + + def send_order(self, req: OrderRequest): + """""" + local_orderid = self.order_manager.new_local_orderid() + order = req.create_order_data( + local_orderid, + self.gateway_name + ) + order.time = datetime.now().strftime("%H:%M:%S") + + data = { + "security": "SIGNED" + } + + params = { + "symbol": req.symbol, + "timeInForce": "GTC", + "side": DIRECTION_VT2BINANCE[req.direction], + "type": ORDERTYPE_VT2BINANCE[req.type], + "price": str(req.price), + "quantity": str(req.volume) + } + + self.add_request( + method="POST", + path="/api/v3/order", + callback=self.on_send_order, + data=data, + params=params, + extra=order, + on_error=self.on_send_order_error, + on_failed=self.on_send_order_failed + ) + + self.order_manager.on_order(order) + return order.vt_orderid + + def cancel_order(self, req: CancelRequest): + """""" + sys_orderid = self.order_manager.get_sys_orderid(req.orderid) + + data = { + "security": "SIGNED" + } + params = { + "symbol": req.symbol, + "orderId": sys_orderid + } + self.add_request( + method="DELETE", + path="/api/v3/order", + callback=self.on_cancel_order, + params=params, + data=data, + extra=req + ) + print("撤单本地id:", req.orderid, "撤单远端id:", sys_orderid) + + def start_userStream(self): + """""" + data = { + "security": "API-KEY" + } + self.add_request( + method="POST", + path='/api/v1/userDataStream', + callback=self.on_start_userStream, + data=data + ) + + def keepalive_userStream(self): + """""" + self.keepaliveCount += 1 + if self.keepaliveCount < 1800: + return + data = { + "security": "SIGNED" + } + params = { + 'listenKey': self.userStreamKey + } + self.add_request( + method='PUT', + path='/api/v1/userDataStream', + callback=self.on_keepalive_userStream, + params=params, + data=data + ) + + def close_userStream(self, listenKey): + """""" + data = { + "security": "SIGNED" + } + params = { + 'listenKey': listenKey + } + self.add_request( + method='DELETE', + path='/api/v1/userDataStream', + callback=self.on_close_userStream, + params=params, + data=data + ) + + def on_query_time(self, data, request): + """""" + time_now = int(time.time() * 1000) + time_server = int(data["serverTime"]) + server_local_time = time.localtime(float(time_server / 1000)) + now_local_time = time.localtime(float(time_now / 1000)) + self.time_offset = time_now - time_server + server_time = time.strftime("%Y-%m-%d %H:%M:%S", server_local_time) + local_time = time.strftime("%Y-%m-%d %H:%M:%S", now_local_time) + msg = f"服务器时间:{server_time},本机时间:{local_time}" + self.gateway.write_log(msg) + + def on_query_account(self, data, request): + """""" + for account_data in data["balances"]: + account = AccountData( + accountid=account_data["asset"], + balance=float(account_data["free"]), + frozen=float(account_data["locked"]), + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) + + self.gateway.write_log("账户资金查询成功") + + def on_query_order(self, data, request): + """""" + for d in data: + sys_orderid = d["orderId"] + local_orderid = self.order_manager.get_local_orderid(sys_orderid) + + direction = DIRECTION_BINANCE2VT[d["side"]] + order_type = ORDERTYPE_BINANCE2VT[d["type"]] + dt = datetime.fromtimestamp(d["time"] / 1000) + time = dt.strftime("%Y-%m-%d %H:%M:%S") + + order = OrderData( + orderid=local_orderid, + symbol=d["symbol"], + exchange=Exchange.BINANCE, + price=float(d["price"]), + volume=float(d["origQty"]), + type=order_type, + direction=direction, + traded=float(d["executedQty"]), + status=STATUS_BINANCE2VT.get(d["status"], None), + time=time, + gateway_name=self.gateway_name, + ) + print("委托查询--远端id:",sys_orderid, "本地Id:", local_orderid) + self.order_manager.on_order(order) + + self.gateway.write_log("委托信息查询成功") + + def on_query_contract(self, data, request): + """""" + for d in data["symbols"]: + base_currency = d["baseAsset"] + quote_currency = d["quoteAsset"] + name = f"{base_currency.upper()}/{quote_currency.upper()}" + pricetick = 0 + min_volume = 0 + for f in d["filters"]: + if f["filterType"] == "PRICE_FILTER": + pricetick = f["tickSize"] + if f["filterType"] == "LOT_SIZE": + min_volume = f["stepSize"] + contract = ContractData( + symbol=d["symbol"], + exchange=Exchange.BINANCE, + name=name, + pricetick=pricetick, + size=1, + min_volume=min_volume, + product=Product.SPOT, + gateway_name=self.gateway_name, + ) + self.gateway.on_contract(contract) + + binance_symbols.add(contract.symbol) + symbol_name_map[contract.symbol] = contract.name + + self.gateway.write_log("合约信息查询成功") + + def on_send_order(self, data, request): + """""" + order = request.extra + if self.check_error(data, "委托"): + order.status = Status.REJECTED + self.order_manager.on_order(order) + return + order.status = STATUS_BINANCE2VT.get(data["status"], None) + sys_orderid = data["orderId"] + self.order_manager.on_order(order) + self.order_manager.update_orderid_map(order.orderid, sys_orderid) + + def on_send_order_failed(self, status_code: str, request: Request): + """ + Callback when sending order failed on server. + """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + msg = f"委托失败,状态码:{status_code},信息:{request.response.text}" + self.gateway.write_log(msg) + + def on_send_order_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback when sending order caused exception. + """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_cancel_order(self, data, request): + """""" + cancel_request = request.extra + local_orderid = cancel_request.orderid + order = self.order_manager.get_order_with_local_orderid(local_orderid) + + if self.check_error(data, "撤单"): + order.status = Status.REJECTED + else: + order.status = Status.CANCELLED + self.gateway.write_log(f"委托撤单成功:{order.orderid}") + + self.order_manager.on_order(order) + + def on_start_userStream(self, data, request): + self.userStreamKey = data['listenKey'] + self.keepaliveCount = 0 + url = WEBSOCKET_TRADE_HOST + self.userStreamKey + self.trade_ws_api.connect( + key=self.key, + secret=self.secret, + url=url, + proxy_host=self.proxy_host, + proxy_port=self.proxy_port) + + def on_keepalive_userStream(self, data, request): + self.gateway.write_log("交易推送刷新成功") + if self.keepaliveCount >= 1800: + self.keepaliveCount = 0 + self.keepalive_userStream(self.userStreamKey) + + def on_close_userStream(self, listenKey): + self.gateway.write_log("交易推送关闭") + + def check_error(self, data: dict, func: str = ""): + """""" + if data["status"] != "error": + return False + + error_code = data["err-code"] + error_msg = data["err-msg"] + + self.gateway.write_log(f"{func}请求出错,代码:{error_code},信息:{error_msg}") + return True + + +class BinanceWebsocketApiBase(WebsocketClient): + """""" + + def __init__(self, gateway): + """""" + super(BinanceWebsocketApiBase, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.key = "" + self.secret = "" + #self.sign_host = "" + self.path = "" + + def connect( + self, + key: str, + secret: str, + url: str, + proxy_host: str, + proxy_port: int + ): + """""" + self.key = key + self.secret = secret + + #host, path = _split_url(url) + #self.sign_host = host + #self.path = path + + self.init(url, proxy_host, proxy_port) + self.start() + + def login(self): + """""" + params = {"op": "auth"} + #params.update(create_signature(self.key, "GET", self.sign_host, self.path, self.secret)) + return self.send_packet(params) + + def on_login(self, packet): + """""" + pass + + @staticmethod + def unpack_data(data): + """""" + print("==============unpack_data============") + print(data) + return json.loads(data) + #return json.loads(zlib.decompress(data, zlib.Z_BEST_COMPRESSION)) + + def on_packet(self, packet): + """""" + print("=============on_packet=============") + print("event type:"+packet["e"]) + print(packet) + #if packet["e"] == "executionReport": + if "ping" in packet: + req = {"pong": packet["ping"]} + self.send_packet(req) + elif "op" in packet and packet["op"] == "ping": + req = { + "op": "pong", + "ts": packet["ts"] + } + self.send_packet(req) + elif "err-msg" in packet: + return self.on_error_msg(packet) + elif "op" in packet and packet["op"] == "auth": + return self.on_login() + else: + self.on_data(packet) + + def on_data(self, packet): + """""" + print("data : {}".format(packet)) + + def on_error_msg(self, packet): + """""" + msg = packet["err-msg"] + if msg == "invalid pong": + return + + self.gateway.write_log(packet["err-msg"]) + + +class BinanceTradeWebsocketApi(BinanceWebsocketApiBase): + """""" + def __init__(self, gateway): + """""" + super().__init__(gateway) + + self.order_manager = gateway.order_manager + self.order_manager.push_data_callback = self.on_data + + self.req_id = 0 + + def connect(self, key, secret, url, proxy_host, proxy_port): + """""" + super().connect(key, secret, url, proxy_host, proxy_port) + self.gateway.write_log("交易Websocket API连接成功") + self.gateway.rest_api.keepalive_userStream() + + def subscribe(self, req: SubscribeRequest): + """""" + self.req_id += 1 + req = { + "op": "sub", + "cid": str(self.req_id), + "topic": f"orders.{req.symbol}" + } + self.send_packet(req) + + def on_connected(self): + """""" + pass + + def on_login(self): + """""" + pass + + def on_data(self, packet): # type: (dict)->None + """""" + print("==========on_data1=========") + # push order data change + if packet["e"] == "executionReport": + self.on_order(packet) + + # order = OrderData( + # symbol=packet["s"], + # exchange=Exchange.BINANCE, + # orderid=packet["i"], + # status=STATUS_BINANCE2VT.get(packet["X"], None), + # traded=float(packet["Z"]), + # price=float(packet["L"]), + # time=packet["O"], + # gateway_name=self.gateway_name + # ) + # self.on_order(order) + + # push account data change + if packet["e"] == "outboundAccountInfo": + for account_data in packet["B"]: + account = AccountData( + accountid=account_data["a"], + balance=float(account_data["f"]), + frozen=float(account_data["l"]), + gateway_name=self.gateway_name + ) + self.on_account(account) + + def on_order(self, data: dict): + """""" + + sys_orderid = str(data["i"]) + order = self.order_manager.get_order_with_sys_orderid(sys_orderid) + if not order: + self.order_manager.add_push_data(sys_orderid, data) + return + + traded_volume = float(data["Z"]) + # Push order event + + order.traded += traded_volume + order.status = STATUS_BINANCE2VT.get(packet["X"], None) + order.price = float(data["L"]) + order.time = data["O"] + order.symbol = data["s"] + + print("远端ID:", sys_orderid, "本地ID:", order) + self.order_manager.on_order(order) + + + # Push trade event + traded_volume = data.traded + if not traded_volume: + return + + trade = TradeData( + symbol=order.symbol, + exchange=Exchange.BINANCE, + orderid=order.orderid, + direction=order.direction, + price=float(order.price), + volume=float(order.traded), + time=datetime.now().strftime("%H:%M:%S"), + gateway_name=self.gateway_name, + ) + self.gateway.on_trade(trade) + + def on_account(self, data: dict): + """""" + self.gateway.on_account(data) + +class BinanceDataWebsocketApi(BinanceWebsocketApiBase): + """""" + + def __init__(self, gateway): + """""" + super().__init__(gateway) + + self.req_id = 0 + self.ticks = {} + + def connect(self, key: str, secret: str, proxy_host: str, proxy_port: int): + """""" + super().connect(key, secret, WEBSOCKET_DATA_HOST, proxy_host, proxy_port) + self.gateway.write_log("行情Websocket API连接成功") + + def on_connected(self): + """""" + pass + + def subscribe(self, req: SubscribeRequest): + """""" + symbol = req.symbol + print("============BinanceDataWebsocketApi.subscribe===========") + print("symbol"+symbol) + # Create tick data buffer + tick = TickData( + symbol=symbol, + name=symbol_name_map.get(symbol, ""), + exchange=Exchange.BINANCE, + datetime=datetime.now(), + gateway_name=self.gateway_name, + ) + self.ticks[symbol] = tick + + # Subscribe to market depth update + self.req_id += 1 + req = { + "sub": f"market.{symbol}.depth.step0", + "id": str(self.req_id) + } + self.send_packet(req) + + # Subscribe to market detail update + self.req_id += 1 + req = { + "sub": f"market.{symbol}.detail", + "id": str(self.req_id) + } + self.send_packet(req) + + def on_data(self, packet): # type: (dict)->None + """""" + print("===================on_data=====================") + print(packet) + + channel = packet.get("ch", None) + if channel: + if "depth.step" in channel: + self.on_market_depth(packet) + elif "detail" in channel: + self.on_market_detail(packet) + elif "err-code" in packet: + code = packet["err-code"] + msg = packet["err-msg"] + self.gateway.write_log(f"错误代码:{code}, 错误信息:{msg}") + + def on_market_depth(self, data): + """行情深度推送 """ + symbol = data["ch"].split(".")[1] + tick = self.ticks[symbol] + tick.datetime = datetime.fromtimestamp(data["ts"] / 1000) + + bids = data["tick"]["bids"] + for n in range(5): + price, volume = bids[n] + tick.__setattr__("bid_price_" + str(n + 1), float(price)) + tick.__setattr__("bid_volume_" + str(n + 1), float(volume)) + + asks = data["tick"]["asks"] + for n in range(5): + price, volume = asks[n] + tick.__setattr__("ask_price_" + str(n + 1), float(price)) + tick.__setattr__("ask_volume_" + str(n + 1), float(volume)) + + if tick.last_price: + self.gateway.on_tick(copy(tick)) + + def on_market_detail(self, data): + """市场细节推送""" + symbol = data["ch"].split(".")[1] + tick = self.ticks[symbol] + tick.datetime = datetime.fromtimestamp(data["ts"] / 1000) + + tick_data = data["tick"] + tick.open_price = float(tick_data["open"]) + tick.high_price = float(tick_data["high"]) + tick.low_price = float(tick_data["low"]) + tick.last_price = float(tick_data["close"]) + tick.volume = float(tick_data["vol"]) + + if tick.bid_price_1: + self.gateway.on_tick(copy(tick)) + + +def _split_url(url): + """ + 将url拆分为host和path + :return: host, path + """ + result = re.match("\w+://([^/]*)(.*)", url) # noqa + if result: + return result.group(1), result.group(2) \ No newline at end of file From 7abd9b977368a189ca3f298e566dd1dd5b7a66d1 Mon Sep 17 00:00:00 2001 From: 1122455801 Date: Tue, 11 Jun 2019 17:02:48 +0800 Subject: [PATCH 4/8] Update constant.py --- vnpy/trader/constant.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vnpy/trader/constant.py b/vnpy/trader/constant.py index 785ecabf..73ae1ee4 100644 --- a/vnpy/trader/constant.py +++ b/vnpy/trader/constant.py @@ -114,6 +114,7 @@ class Exchange(Enum): OKEX = "OKEX" HUOBI = "HUOBI" BITFINEX = "BITFINEX" + BINANCE = "BINANCE" class Currency(Enum): From ebf49abfcb40eeded2a663fc4dcad1328087c608 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 11 Jun 2019 22:59:10 +0800 Subject: [PATCH 5/8] [Mod] remove unecessary code --- tests/trader/run.py | 80 ++++++++++++------------- vnpy/gateway/binance/binance_gateway.py | 50 +++++++--------- 2 files changed, 63 insertions(+), 67 deletions(-) diff --git a/tests/trader/run.py b/tests/trader/run.py index 783c88d9..130aa088 100644 --- a/tests/trader/run.py +++ b/tests/trader/run.py @@ -4,28 +4,28 @@ from vnpy.trader.engine import MainEngine from vnpy.trader.ui import MainWindow, create_qapp from vnpy.gateway.binance import BinanceGateway -#from vnpy.gateway.bitmex import BitmexGateway -#from vnpy.gateway.futu import FutuGateway -#from vnpy.gateway.ib import IbGateway -#from vnpy.gateway.ctp import CtpGateway +# from vnpy.gateway.bitmex import BitmexGateway +# from vnpy.gateway.futu import FutuGateway +# from vnpy.gateway.ib import IbGateway +# from vnpy.gateway.ctp import CtpGateway # from vnpy.gateway.ctptest import CtptestGateway -#from vnpy.gateway.femas import FemasGateway -#from vnpy.gateway.tiger import TigerGateway -#from vnpy.gateway.oes import OesGateway -#from vnpy.gateway.okex import OkexGateway -#from vnpy.gateway.huobi import HuobiGateway -#from vnpy.gateway.bitfinex import BitfinexGateway -#from vnpy.gateway.onetoken import OnetokenGateway -#from vnpy.gateway.okexf import OkexfGateway -#from vnpy.gateway.xtp import XtpGateway -#from vnpy.gateway.hbdm import HbdmGateway +# from vnpy.gateway.femas import FemasGateway +# from vnpy.gateway.tiger import TigerGateway +# from vnpy.gateway.oes import OesGateway +# from vnpy.gateway.okex import OkexGateway +# from vnpy.gateway.huobi import HuobiGateway +# from vnpy.gateway.bitfinex import BitfinexGateway +# from vnpy.gateway.onetoken import OnetokenGateway +# from vnpy.gateway.okexf import OkexfGateway +# from vnpy.gateway.xtp import XtpGateway +# from vnpy.gateway.hbdm import HbdmGateway -#from vnpy.app.cta_strategy import CtaStrategyApp -#from vnpy.app.csv_loader import CsvLoaderApp -#from vnpy.app.algo_trading import AlgoTradingApp -#from vnpy.app.cta_backtester import CtaBacktesterApp -#from vnpy.app.data_recorder import DataRecorderApp -#from vnpy.app.risk_manager import RiskManagerApp +# from vnpy.app.cta_strategy import CtaStrategyApp +# from vnpy.app.csv_loader import CsvLoaderApp +# from vnpy.app.algo_trading import AlgoTradingApp +# from vnpy.app.cta_backtester import CtaBacktesterApp +# from vnpy.app.data_recorder import DataRecorderApp +# from vnpy.app.risk_manager import RiskManagerApp def main(): @@ -37,28 +37,28 @@ def main(): main_engine = MainEngine(event_engine) main_engine.add_gateway(BinanceGateway) - #main_engine.add_gateway(XtpGateway) - #main_engine.add_gateway(CtpGateway) + # main_engine.add_gateway(XtpGateway) + # main_engine.add_gateway(CtpGateway) # main_engine.add_gateway(CtptestGateway) - #main_engine.add_gateway(FemasGateway) - #main_engine.add_gateway(IbGateway) - #main_engine.add_gateway(FutuGateway) - #main_engine.add_gateway(BitmexGateway) - #main_engine.add_gateway(TigerGateway) - #main_engine.add_gateway(OesGateway) - #main_engine.add_gateway(OkexGateway) - #main_engine.add_gateway(HuobiGateway) - #main_engine.add_gateway(BitfinexGateway) - #main_engine.add_gateway(OnetokenGateway) - #main_engine.add_gateway(OkexfGateway) - #main_engine.add_gateway(HbdmGateway) + # main_engine.add_gateway(FemasGateway) + # main_engine.add_gateway(IbGateway) + # main_engine.add_gateway(FutuGateway) + # main_engine.add_gateway(BitmexGateway) + # main_engine.add_gateway(TigerGateway) + # main_engine.add_gateway(OesGateway) + # main_engine.add_gateway(OkexGateway) + # main_engine.add_gateway(HuobiGateway) + # main_engine.add_gateway(BitfinexGateway) + # main_engine.add_gateway(OnetokenGateway) + # main_engine.add_gateway(OkexfGateway) + # main_engine.add_gateway(HbdmGateway) - #main_engine.add_app(CtaStrategyApp) - #main_engine.add_app(CtaBacktesterApp) - #main_engine.add_app(CsvLoaderApp) - #main_engine.add_app(AlgoTradingApp) - #main_engine.add_app(DataRecorderApp) - #main_engine.add_app(RiskManagerApp) + # main_engine.add_app(CtaStrategyApp) + # main_engine.add_app(CtaBacktesterApp) + # main_engine.add_app(CsvLoaderApp) + # main_engine.add_app(AlgoTradingApp) + # main_engine.add_app(DataRecorderApp) + # main_engine.add_app(RiskManagerApp) main_window = MainWindow(main_engine, event_engine) main_window.showMaximized() diff --git a/vnpy/gateway/binance/binance_gateway.py b/vnpy/gateway/binance/binance_gateway.py index 3c67ff2e..0415f186 100644 --- a/vnpy/gateway/binance/binance_gateway.py +++ b/vnpy/gateway/binance/binance_gateway.py @@ -1,20 +1,15 @@ -# encoding: UTF-8 - """ -币安交易接口 +Gateway for Binance Crypto Exchange. """ import re import urllib -import base64 import json -import zlib import hashlib import hmac import time from copy import copy from datetime import datetime -from threading import Thread from vnpy.event import Event from vnpy.api.rest import RestClient, Request @@ -57,16 +52,15 @@ ORDERTYPE_VT2BINANCE = { OrderType.MARKET: "MARKET", OrderType.STOP: "STOP_LOSS", } - ORDERTYPE_BINANCE2VT = {v: k for k, v in ORDERTYPE_VT2BINANCE.items()} DIRECTION_VT2BINANCE = { Direction.LONG: "BUY", Direction.SHORT: "SELL" } - DIRECTION_BINANCE2VT = {v: k for k, v in DIRECTION_VT2BINANCE.items()} + binance_symbols = set() symbol_name_map = {} @@ -80,15 +74,15 @@ class BinanceGateway(BaseGateway): "key": "", "secret": "", "session_number": 3, - "proxy_host": "127.0.0.1", - "proxy_port": 2000, + "proxy_host": "", + "proxy_port": 0, } exchanges = [Exchange.BINANCE] def __init__(self, event_engine): """Constructor""" - super(BinanceGateway, self).__init__(event_engine, "BINANCE") + super().__init__(event_engine, "BINANCE") self.order_manager = LocalOrderManager(self) @@ -106,15 +100,15 @@ class BinanceGateway(BaseGateway): self.rest_api.connect(key, secret, session_number, proxy_host, proxy_port) - #self.trade_ws_api.connect(key, secret, proxy_host, proxy_port) - #self.market_ws_api.connect(key, secret, proxy_host, proxy_port) + # self.trade_ws_api.connect(key, secret, proxy_host, proxy_port) + # self.market_ws_api.connect(key, secret, proxy_host, proxy_port) - #self.init_query() + # self.init_query() def subscribe(self, req: SubscribeRequest): """""" self.market_ws_api.subscribe(req) - #self.trade_ws_api.subscribe(req) + # self.trade_ws_api.subscribe(req) def send_order(self, req: OrderRequest): """""" @@ -151,6 +145,7 @@ class BinanceGateway(BaseGateway): self.count = 0 self.event_engine.register(EVENT_TIMER, self.process_timer_event) + class BinanceRestApi(RestClient): """ BINANCE REST API @@ -448,7 +443,7 @@ class BinanceRestApi(RestClient): gateway_name=self.gateway_name, ) - print("委托查询--远端id:",sys_orderid, "本地Id:", local_orderid) + print("委托查询--远端id:", sys_orderid, "本地Id:", local_orderid) self.order_manager.on_order(order) self.gateway.write_log("委托信息查询成功") @@ -578,7 +573,7 @@ class BinanceWebsocketApiBase(WebsocketClient): self.key = "" self.secret = "" - #self.sign_host = "" + # self.sign_host = "" self.path = "" def connect( @@ -593,9 +588,9 @@ class BinanceWebsocketApiBase(WebsocketClient): self.key = key self.secret = secret - #host, path = _split_url(url) - #self.sign_host = host - #self.path = path + # host, path = _split_url(url) + # self.sign_host = host + # self.path = path self.init(url, proxy_host, proxy_port) self.start() @@ -603,7 +598,7 @@ class BinanceWebsocketApiBase(WebsocketClient): def login(self): """""" params = {"op": "auth"} - #params.update(create_signature(self.key, "GET", self.sign_host, self.path, self.secret)) + # params.update(create_signature(self.key, "GET", self.sign_host, self.path, self.secret)) return self.send_packet(params) def on_login(self, packet): @@ -616,14 +611,14 @@ class BinanceWebsocketApiBase(WebsocketClient): print("==============unpack_data============") print(data) return json.loads(data) - #return json.loads(zlib.decompress(data, zlib.Z_BEST_COMPRESSION)) + # return json.loads(zlib.decompress(data, zlib.Z_BEST_COMPRESSION)) def on_packet(self, packet): """""" print("=============on_packet=============") - print("event type:"+packet["e"]) + print("event type:" + packet["e"]) print(packet) - #if packet["e"] == "executionReport": + # if packet["e"] == "executionReport": if "ping" in packet: req = {"pong": packet["ping"]} self.send_packet(req) @@ -731,7 +726,7 @@ class BinanceTradeWebsocketApi(BinanceWebsocketApiBase): # Push order event order.traded += traded_volume - order.status = STATUS_BINANCE2VT.get(packet["X"], None) + order.status = STATUS_BINANCE2VT.get(data["X"], None) order.price = float(data["L"]) order.time = data["O"] order.symbol = data["s"] @@ -760,6 +755,7 @@ class BinanceTradeWebsocketApi(BinanceWebsocketApiBase): """""" self.gateway.on_account(data) + class BinanceDataWebsocketApi(BinanceWebsocketApiBase): """""" @@ -783,7 +779,7 @@ class BinanceDataWebsocketApi(BinanceWebsocketApiBase): """""" symbol = req.symbol print("============BinanceDataWebsocketApi.subscribe===========") - print("symbol"+symbol) + print("symbol" + symbol) # Create tick data buffer tick = TickData( symbol=symbol, @@ -871,4 +867,4 @@ def _split_url(url): """ result = re.match("\w+://([^/]*)(.*)", url) # noqa if result: - return result.group(1), result.group(2) \ No newline at end of file + return result.group(1), result.group(2) From 4ae3b2acae8d5b156149e734342885ee6c90df91 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 11 Jun 2019 23:00:12 +0800 Subject: [PATCH 6/8] [Mod] remove init_query function --- vnpy/gateway/binance/binance_gateway.py | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/vnpy/gateway/binance/binance_gateway.py b/vnpy/gateway/binance/binance_gateway.py index 0415f186..625ef0ad 100644 --- a/vnpy/gateway/binance/binance_gateway.py +++ b/vnpy/gateway/binance/binance_gateway.py @@ -103,8 +103,6 @@ class BinanceGateway(BaseGateway): # self.trade_ws_api.connect(key, secret, proxy_host, proxy_port) # self.market_ws_api.connect(key, secret, proxy_host, proxy_port) - # self.init_query() - def subscribe(self, req: SubscribeRequest): """""" self.market_ws_api.subscribe(req) @@ -120,7 +118,7 @@ class BinanceGateway(BaseGateway): def query_account(self): """""" - self.rest_api.query_account() + pass def query_position(self): """""" @@ -132,19 +130,6 @@ class BinanceGateway(BaseGateway): self.trade_ws_api.stop() self.market_ws_api.stop() - def process_timer_event(self, event: Event): - """""" - self.count += 1 - if self.count < 3: - return - - self.query_account() - - def init_query(self): - """""" - self.count = 0 - self.event_engine.register(EVENT_TIMER, self.process_timer_event) - class BinanceRestApi(RestClient): """ From 93b4ff40a30a5132841aba74ec7e412c7764184b Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 11 Jun 2019 23:46:54 +0800 Subject: [PATCH 7/8] [Del] remove _split_url --- vnpy/gateway/binance/binance_gateway.py | 141 ++++++++++++------------ 1 file changed, 69 insertions(+), 72 deletions(-) diff --git a/vnpy/gateway/binance/binance_gateway.py b/vnpy/gateway/binance/binance_gateway.py index 625ef0ad..28cced5f 100644 --- a/vnpy/gateway/binance/binance_gateway.py +++ b/vnpy/gateway/binance/binance_gateway.py @@ -11,7 +11,6 @@ import time from copy import copy from datetime import datetime -from vnpy.event import Event from vnpy.api.rest import RestClient, Request from vnpy.api.websocket import WebsocketClient from vnpy.trader.constant import ( @@ -21,7 +20,7 @@ from vnpy.trader.constant import ( Status, OrderType ) -from vnpy.trader.gateway import BaseGateway, LocalOrderManager +from vnpy.trader.gateway import BaseGateway from vnpy.trader.object import ( TickData, OrderData, @@ -32,11 +31,11 @@ from vnpy.trader.object import ( CancelRequest, SubscribeRequest ) -from vnpy.trader.event import EVENT_TIMER REST_HOST = "https://www.binance.com" -WEBSOCKET_TRADE_HOST = "wss://stream.binance.com:9443/ws/" # Account and Order +# Account and Order +WEBSOCKET_TRADE_HOST = "wss://stream.binance.com:9443/ws/" WEBSOCKET_DATA_HOST = "wss://stream.binance.com:9443/stream?streams=" # Market Data STATUS_BINANCE2VT = { @@ -50,7 +49,7 @@ STATUS_BINANCE2VT = { ORDERTYPE_VT2BINANCE = { OrderType.LIMIT: "LIMIT", OrderType.MARKET: "MARKET", - OrderType.STOP: "STOP_LOSS", + OrderType.STOP: "STOP_LOSS", } ORDERTYPE_BINANCE2VT = {v: k for k, v in ORDERTYPE_VT2BINANCE.items()} @@ -84,8 +83,6 @@ class BinanceGateway(BaseGateway): """Constructor""" super().__init__(event_engine, "BINANCE") - self.order_manager = LocalOrderManager(self) - self.rest_api = BinanceRestApi(self) self.trade_ws_api = BinanceTradeWebsocketApi(self) self.market_ws_api = BinanceDataWebsocketApi(self) @@ -100,13 +97,13 @@ class BinanceGateway(BaseGateway): self.rest_api.connect(key, secret, session_number, proxy_host, proxy_port) - # self.trade_ws_api.connect(key, secret, proxy_host, proxy_port) - # self.market_ws_api.connect(key, secret, proxy_host, proxy_port) + self.trade_ws_api.connect(key, secret, proxy_host, proxy_port) + self.market_ws_api.connect(key, secret, proxy_host, proxy_port) def subscribe(self, req: SubscribeRequest): """""" self.market_ws_api.subscribe(req) - # self.trade_ws_api.subscribe(req) + self.trade_ws_api.subscribe(req) def send_order(self, req: OrderRequest): """""" @@ -136,21 +133,21 @@ class BinanceRestApi(RestClient): BINANCE REST API """ - def __init__(self, gateway: BaseGateway): + def __init__(self, gateway: BinanceGateway): """""" - super(BinanceRestApi, self).__init__() + super().__init__() self.gateway = gateway self.gateway_name = gateway.gateway_name - self.order_manager = gateway.order_manager - self.trade_ws_api = BinanceTradeWebsocketApi(self.gateway) - self.host = "" + self.trade_ws_api = self.gateway.trade_ws_api + self.key = "" self.secret = "" - self.userStreamKey = "" - self.keepaliveCount = 0 - self.recvWindow = 5000 + + self.user_stream_key = "" + self.keep_alive_count = 0 + self.recv_window = 5000 self.time_offset = 0 self.cancel_requests = {} @@ -165,34 +162,44 @@ class BinanceRestApi(RestClient): else: request.params = dict() path = request.path + security = "NONE" + if request.data: security = request.data['security'] if security == "SIGNED": timestamp = int(time.time() * 1000) + if self.time_offset > 0: timestamp -= abs(self.time_offset) elif self.time_offset < 0: timestamp += abs(self.time_offset) + request.params['timestamp'] = timestamp - request.params['recvWindow'] = self.recvWindow + request.params['recv_window'] = self.recv_window query = urllib.parse.urlencode(sorted(request.params.items())) - signature = hmac.new(self.secret, query.encode('utf-8'), hashlib.sha256).hexdigest() + signature = hmac.new(self.secret, query.encode( + 'utf-8'), hashlib.sha256).hexdigest() + query += "&signature={}".format(signature) path = request.path + "?" + query + request.path = path request.params = {} request.data = {} + # Add headers headers = { "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json", "X-MBX-APIKEY": self.key } + if security == "SIGNED" or security == "API-KEY": request.headers = headers + return request def connect( @@ -209,9 +216,7 @@ class BinanceRestApi(RestClient): self.key = key self.secret = secret.encode() self.proxy_port = proxy_port - self.proxy_host = proxy_host - - self.host, _ = _split_url(REST_HOST) + self.proxy_host = proxy_host self.init(REST_HOST, proxy_host, proxy_port) self.start(session_number) @@ -222,7 +227,7 @@ class BinanceRestApi(RestClient): self.query_account() self.query_order() self.query_contract() - self.start_userStream() + self.start_user_stream() def query_time(self): """""" @@ -230,6 +235,7 @@ class BinanceRestApi(RestClient): "security": "NONE" } path = '/api/v1/time' + return self.add_request( "GET", path, @@ -331,7 +337,7 @@ class BinanceRestApi(RestClient): print("撤单本地id:", req.orderid, "撤单远端id:", sys_orderid) - def start_userStream(self): + def start_user_stream(self): """""" data = { "security": "API-KEY" @@ -339,26 +345,26 @@ class BinanceRestApi(RestClient): self.add_request( method="POST", path='/api/v1/userDataStream', - callback=self.on_start_userStream, + callback=self.on_start_user_stream, data=data ) def keepalive_userStream(self): """""" - self.keepaliveCount += 1 - if self.keepaliveCount < 1800: + self.keep_alive_count += 1 + if self.keep_alive_count < 1800: return data = { "security": "SIGNED" } params = { - 'listenKey': self.userStreamKey + 'listenKey': self.user_stream_key } self.add_request( - method='PUT', - path='/api/v1/userDataStream', + method='PUT', + path='/api/v1/userDataStream', callback=self.on_keepalive_userStream, - params=params, + params=params, data=data ) @@ -376,7 +382,7 @@ class BinanceRestApi(RestClient): callback=self.on_close_userStream, params=params, data=data - ) + ) def on_query_time(self, data, request): """""" @@ -473,7 +479,7 @@ class BinanceRestApi(RestClient): order.status = STATUS_BINANCE2VT.get(data["status"], None) sys_orderid = data["orderId"] self.order_manager.on_order(order) - self.order_manager.update_orderid_map(order.orderid, sys_orderid) + self.order_manager.update_orderid_map(order.orderid, sys_orderid) def on_send_order_failed(self, status_code: str, request: Request): """ @@ -514,22 +520,22 @@ class BinanceRestApi(RestClient): self.order_manager.on_order(order) - def on_start_userStream(self, data, request): - self.userStreamKey = data['listenKey'] - self.keepaliveCount = 0 - url = WEBSOCKET_TRADE_HOST + self.userStreamKey + def on_start_user_stream(self, data, request): + self.user_stream_key = data['listenKey'] + self.keep_alive_count = 0 + url = WEBSOCKET_TRADE_HOST + self.user_stream_key self.trade_ws_api.connect( - key=self.key, - secret=self.secret, - url=url, - proxy_host=self.proxy_host, + key=self.key, + secret=self.secret, + url=url, + proxy_host=self.proxy_host, proxy_port=self.proxy_port) def on_keepalive_userStream(self, data, request): self.gateway.write_log("交易推送刷新成功") - if self.keepaliveCount >= 1800: - self.keepaliveCount = 0 - self.keepalive_userStream(self.userStreamKey) + if self.keep_alive_count >= 1800: + self.keep_alive_count = 0 + self.keepalive_userStream(self.user_stream_key) def on_close_userStream(self, listenKey): self.gateway.write_log("交易推送关闭") @@ -562,11 +568,11 @@ class BinanceWebsocketApiBase(WebsocketClient): self.path = "" def connect( - self, - key: str, - secret: str, - url: str, - proxy_host: str, + self, + key: str, + secret: str, + url: str, + proxy_host: str, proxy_port: int ): """""" @@ -583,7 +589,7 @@ class BinanceWebsocketApiBase(WebsocketClient): def login(self): """""" params = {"op": "auth"} - # params.update(create_signature(self.key, "GET", self.sign_host, self.path, self.secret)) + # params.update(create_signature(self.key, "GET", self.sign_host, self.path, self.secret)) return self.send_packet(params) def on_login(self, packet): @@ -620,11 +626,11 @@ class BinanceWebsocketApiBase(WebsocketClient): else: self.on_data(packet) - def on_data(self, packet): + def on_data(self, packet): """""" print("data : {}".format(packet)) - def on_error_msg(self, packet): + def on_error_msg(self, packet): """""" msg = packet["err-msg"] if msg == "invalid pong": @@ -635,6 +641,7 @@ class BinanceWebsocketApiBase(WebsocketClient): class BinanceTradeWebsocketApi(BinanceWebsocketApiBase): """""" + def __init__(self, gateway): """""" super().__init__(gateway) @@ -686,7 +693,7 @@ class BinanceTradeWebsocketApi(BinanceWebsocketApiBase): # gateway_name=self.gateway_name # ) # self.on_order(order) - + # push account data change if packet["e"] == "outboundAccountInfo": for account_data in packet["B"]: @@ -712,11 +719,11 @@ class BinanceTradeWebsocketApi(BinanceWebsocketApiBase): order.traded += traded_volume order.status = STATUS_BINANCE2VT.get(data["X"], None) - order.price = float(data["L"]) - order.time = data["O"] - order.symbol = data["s"] + order.price = float(data["L"]) + order.time = data["O"] + order.symbol = data["s"] - print("远端ID:", sys_orderid, "本地ID:", order) + print("远端ID:", sys_orderid, "本地ID:", order) self.order_manager.on_order(order) # Push trade event @@ -733,7 +740,7 @@ class BinanceTradeWebsocketApi(BinanceWebsocketApiBase): volume=float(order.traded), time=datetime.now().strftime("%H:%M:%S"), gateway_name=self.gateway_name, - ) + ) self.gateway.on_trade(trade) def on_account(self, data: dict): @@ -773,13 +780,13 @@ class BinanceDataWebsocketApi(BinanceWebsocketApiBase): datetime=datetime.now(), gateway_name=self.gateway_name, ) - self.ticks[symbol] = tick + self.ticks[symbol] = tick # Subscribe to market depth update self.req_id += 1 req = { "sub": f"market.{symbol}.depth.step0", - "id": str(self.req_id) + "id": str(self.req_id) } self.send_packet(req) @@ -787,7 +794,7 @@ class BinanceDataWebsocketApi(BinanceWebsocketApiBase): self.req_id += 1 req = { "sub": f"market.{symbol}.detail", - "id": str(self.req_id) + "id": str(self.req_id) } self.send_packet(req) @@ -843,13 +850,3 @@ class BinanceDataWebsocketApi(BinanceWebsocketApiBase): if tick.bid_price_1: self.gateway.on_tick(copy(tick)) - - -def _split_url(url): - """ - 将url拆分为host和path - :return: host, path - """ - result = re.match("\w+://([^/]*)(.*)", url) # noqa - if result: - return result.group(1), result.group(2) From e90e1ee104aacb485c0b680fafb3aaf288d3ea24 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Wed, 12 Jun 2019 13:34:32 +0800 Subject: [PATCH 8/8] [Mod] complete test of BinanceGateway --- vnpy/gateway/binance/binance_gateway.py | 559 ++++++++---------------- vnpy/gateway/bitmex/bitmex_gateway.py | 1 + 2 files changed, 194 insertions(+), 366 deletions(-) diff --git a/vnpy/gateway/binance/binance_gateway.py b/vnpy/gateway/binance/binance_gateway.py index 28cced5f..b3454d71 100644 --- a/vnpy/gateway/binance/binance_gateway.py +++ b/vnpy/gateway/binance/binance_gateway.py @@ -2,14 +2,14 @@ Gateway for Binance Crypto Exchange. """ -import re import urllib -import json import hashlib import hmac import time from copy import copy from datetime import datetime +from enum import Enum +from threading import Lock from vnpy.api.rest import RestClient, Request from vnpy.api.websocket import WebsocketClient @@ -31,12 +31,13 @@ from vnpy.trader.object import ( CancelRequest, SubscribeRequest ) +from vnpy.trader.event import EVENT_TIMER +from vnpy.event import Event REST_HOST = "https://www.binance.com" -# Account and Order WEBSOCKET_TRADE_HOST = "wss://stream.binance.com:9443/ws/" -WEBSOCKET_DATA_HOST = "wss://stream.binance.com:9443/stream?streams=" # Market Data +WEBSOCKET_DATA_HOST = "wss://stream.binance.com:9443/stream?streams=" STATUS_BINANCE2VT = { "NEW": Status.NOTTRADED, @@ -48,8 +49,7 @@ STATUS_BINANCE2VT = { ORDERTYPE_VT2BINANCE = { OrderType.LIMIT: "LIMIT", - OrderType.MARKET: "MARKET", - OrderType.STOP: "STOP_LOSS", + OrderType.MARKET: "MARKET" } ORDERTYPE_BINANCE2VT = {v: k for k, v in ORDERTYPE_VT2BINANCE.items()} @@ -60,7 +60,12 @@ DIRECTION_VT2BINANCE = { DIRECTION_BINANCE2VT = {v: k for k, v in DIRECTION_VT2BINANCE.items()} -binance_symbols = set() +class Security(Enum): + NONE = 0 + SIGNED = 1 + API_KEY = 2 + + symbol_name_map = {} @@ -83,9 +88,11 @@ class BinanceGateway(BaseGateway): """Constructor""" super().__init__(event_engine, "BINANCE") - self.rest_api = BinanceRestApi(self) self.trade_ws_api = BinanceTradeWebsocketApi(self) self.market_ws_api = BinanceDataWebsocketApi(self) + self.rest_api = BinanceRestApi(self) + + self.event_engine.register(EVENT_TIMER, self.process_timer_event) def connect(self, setting: dict): """""" @@ -97,13 +104,11 @@ class BinanceGateway(BaseGateway): self.rest_api.connect(key, secret, session_number, proxy_host, proxy_port) - self.trade_ws_api.connect(key, secret, proxy_host, proxy_port) - self.market_ws_api.connect(key, secret, proxy_host, proxy_port) + self.market_ws_api.connect(proxy_host, proxy_port) def subscribe(self, req: SubscribeRequest): """""" self.market_ws_api.subscribe(req) - self.trade_ws_api.subscribe(req) def send_order(self, req: OrderRequest): """""" @@ -127,6 +132,10 @@ class BinanceGateway(BaseGateway): self.trade_ws_api.stop() self.market_ws_api.stop() + def process_timer_event(self, event: Event): + """""" + self.rest_api.keep_user_stream() + class BinanceRestApi(RestClient): """ @@ -150,8 +159,9 @@ class BinanceRestApi(RestClient): self.recv_window = 5000 self.time_offset = 0 - self.cancel_requests = {} - self.orders = {} + self.order_count = 1_000_000 + self.order_count_lock = Lock() + self.connect_time = 0 def sign(self, request): """ @@ -163,12 +173,9 @@ class BinanceRestApi(RestClient): request.params = dict() path = request.path - security = "NONE" + security = request.data["security"] - if request.data: - security = request.data['security'] - - if security == "SIGNED": + if security == Security.SIGNED: timestamp = int(time.time() * 1000) if self.time_offset > 0: @@ -176,12 +183,12 @@ class BinanceRestApi(RestClient): elif self.time_offset < 0: timestamp += abs(self.time_offset) - request.params['timestamp'] = timestamp - request.params['recv_window'] = self.recv_window + request.params["timestamp"] = timestamp + # request.params["recv_window"] = self.recv_window query = urllib.parse.urlencode(sorted(request.params.items())) signature = hmac.new(self.secret, query.encode( - 'utf-8'), hashlib.sha256).hexdigest() + "utf-8"), hashlib.sha256).hexdigest() query += "&signature={}".format(signature) path = request.path + "?" + query @@ -197,7 +204,7 @@ class BinanceRestApi(RestClient): "X-MBX-APIKEY": self.key } - if security == "SIGNED" or security == "API-KEY": + if security == Security.SIGNED or security == Security.API_KEY: request.headers = headers return request @@ -218,6 +225,10 @@ class BinanceRestApi(RestClient): self.proxy_port = proxy_port self.proxy_host = proxy_host + self.connect_time = ( + int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count + ) + self.init(REST_HOST, proxy_host, proxy_port) self.start(session_number) @@ -232,9 +243,9 @@ class BinanceRestApi(RestClient): def query_time(self): """""" data = { - "security": "NONE" + "security": Security.NONE } - path = '/api/v1/time' + path = "/api/v1/time" return self.add_request( "GET", @@ -245,9 +256,8 @@ class BinanceRestApi(RestClient): def query_account(self): """""" - data = { - "security": "SIGNED" - } + data = {"security": Security.SIGNED} + self.add_request( method="GET", path="/api/v3/account", @@ -257,9 +267,8 @@ class BinanceRestApi(RestClient): def query_order(self): """""" - data = { - "security": "SIGNED" - } + data = {"security": Security.SIGNED} + self.add_request( method="GET", path="/api/v3/openOrders", @@ -270,7 +279,7 @@ class BinanceRestApi(RestClient): def query_contract(self): """""" data = { - "security": "NONE" + "security": Security.NONE } self.add_request( method="GET", @@ -279,17 +288,23 @@ class BinanceRestApi(RestClient): data=data ) + def _new_order_id(self): + """""" + with self.order_count_lock: + self.order_count += 1 + return self.order_count + def send_order(self, req: OrderRequest): """""" - local_orderid = self.order_manager.new_local_orderid() + orderid = str(self.connect_time + self._new_order_id()) order = req.create_order_data( - local_orderid, + orderid, self.gateway_name ) - order.time = datetime.now().strftime("%H:%M:%S") + self.gateway.on_order(order) data = { - "security": "SIGNED" + "security": Security.SIGNED } params = { @@ -298,7 +313,9 @@ class BinanceRestApi(RestClient): "side": DIRECTION_VT2BINANCE[req.direction], "type": ORDERTYPE_VT2BINANCE[req.type], "price": str(req.price), - "quantity": str(req.volume) + "quantity": str(req.volume), + "newClientOrderId": orderid, + "newOrderRespType": "ACK" } self.add_request( @@ -312,20 +329,19 @@ class BinanceRestApi(RestClient): on_failed=self.on_send_order_failed ) - self.order_manager.on_order(order) return order.vt_orderid def cancel_order(self, req: CancelRequest): """""" - sys_orderid = self.order_manager.get_sys_orderid(req.orderid) - data = { - "security": "SIGNED" + "security": Security.SIGNED } + params = { "symbol": req.symbol, - "orderId": sys_orderid + "origClientOrderId": req.orderid } + self.add_request( method="DELETE", path="/api/v3/order", @@ -335,107 +351,82 @@ class BinanceRestApi(RestClient): extra=req ) - print("撤单本地id:", req.orderid, "撤单远端id:", sys_orderid) - def start_user_stream(self): """""" data = { - "security": "API-KEY" + "security": Security.API_KEY } + self.add_request( method="POST", - path='/api/v1/userDataStream', + path="/api/v1/userDataStream", callback=self.on_start_user_stream, data=data ) - def keepalive_userStream(self): + def keep_user_stream(self): """""" self.keep_alive_count += 1 if self.keep_alive_count < 1800: return - data = { - "security": "SIGNED" - } - params = { - 'listenKey': self.user_stream_key - } - self.add_request( - method='PUT', - path='/api/v1/userDataStream', - callback=self.on_keepalive_userStream, - params=params, - data=data - ) - def close_userStream(self, listenKey): - """""" data = { - "security": "SIGNED" + "security": Security.SIGNED } + params = { - 'listenKey': listenKey + "listenKey": self.user_stream_key } + self.add_request( - method='DELETE', - path='/api/v1/userDataStream', - callback=self.on_close_userStream, + method="PUT", + path="/api/v1/userDataStream", + callback=self.on_keep_user_stream, params=params, data=data ) def on_query_time(self, data, request): """""" - time_now = int(time.time() * 1000) - time_server = int(data["serverTime"]) - server_local_time = time.localtime(float(time_server / 1000)) - now_local_time = time.localtime(float(time_now / 1000)) - self.time_offset = time_now - time_server - server_time = time.strftime("%Y-%m-%d %H:%M:%S", server_local_time) - local_time = time.strftime("%Y-%m-%d %H:%M:%S", now_local_time) - msg = f"服务器时间:{server_time},本机时间:{local_time}" - self.gateway.write_log(msg) + local_time = int(time.time() * 1000) + server_time = int(data["serverTime"]) + self.time_offset = local_time - server_time def on_query_account(self, data, request): """""" for account_data in data["balances"]: account = AccountData( accountid=account_data["asset"], - balance=float(account_data["free"]), + balance=float(account_data["free"]) + float(account_data["locked"]), frozen=float(account_data["locked"]), gateway_name=self.gateway_name ) - self.gateway.on_account(account) + + if account.balance: + self.gateway.on_account(account) self.gateway.write_log("账户资金查询成功") def on_query_order(self, data, request): """""" for d in data: - sys_orderid = d["orderId"] - local_orderid = self.order_manager.get_local_orderid(sys_orderid) - - direction = DIRECTION_BINANCE2VT[d["side"]] - order_type = ORDERTYPE_BINANCE2VT[d["type"]] dt = datetime.fromtimestamp(d["time"] / 1000) time = dt.strftime("%Y-%m-%d %H:%M:%S") order = OrderData( - orderid=local_orderid, + orderid=d["clientOrderId"], symbol=d["symbol"], exchange=Exchange.BINANCE, price=float(d["price"]), volume=float(d["origQty"]), - type=order_type, - direction=direction, + type=ORDERTYPE_BINANCE2VT[d["type"]], + direction=DIRECTION_BINANCE2VT[d["side"]], traded=float(d["executedQty"]), status=STATUS_BINANCE2VT.get(d["status"], None), time=time, gateway_name=self.gateway_name, ) - - print("委托查询--远端id:", sys_orderid, "本地Id:", local_orderid) - self.order_manager.on_order(order) + self.gateway.on_order(order) self.gateway.write_log("委托信息查询成功") @@ -445,13 +436,16 @@ class BinanceRestApi(RestClient): base_currency = d["baseAsset"] quote_currency = d["quoteAsset"] name = f"{base_currency.upper()}/{quote_currency.upper()}" + pricetick = 0 min_volume = 0 + for f in d["filters"]: if f["filterType"] == "PRICE_FILTER": pricetick = f["tickSize"] - if f["filterType"] == "LOT_SIZE": + elif f["filterType"] == "LOT_SIZE": min_volume = f["stepSize"] + contract = ContractData( symbol=d["symbol"], exchange=Exchange.BINANCE, @@ -464,22 +458,13 @@ class BinanceRestApi(RestClient): ) self.gateway.on_contract(contract) - binance_symbols.add(contract.symbol) symbol_name_map[contract.symbol] = contract.name self.gateway.write_log("合约信息查询成功") def on_send_order(self, data, request): """""" - order = request.extra - if self.check_error(data, "委托"): - order.status = Status.REJECTED - self.order_manager.on_order(order) - return - order.status = STATUS_BINANCE2VT.get(data["status"], None) - sys_orderid = data["orderId"] - self.order_manager.on_order(order) - self.order_manager.update_orderid_map(order.orderid, sys_orderid) + pass def on_send_order_failed(self, status_code: str, request: Request): """ @@ -508,345 +493,187 @@ class BinanceRestApi(RestClient): def on_cancel_order(self, data, request): """""" - cancel_request = request.extra - local_orderid = cancel_request.orderid - order = self.order_manager.get_order_with_local_orderid(local_orderid) - - if self.check_error(data, "撤单"): - order.status = Status.REJECTED - else: - order.status = Status.CANCELLED - self.gateway.write_log(f"委托撤单成功:{order.orderid}") - - self.order_manager.on_order(order) + pass def on_start_user_stream(self, data, request): - self.user_stream_key = data['listenKey'] + """""" + self.user_stream_key = data["listenKey"] self.keep_alive_count = 0 url = WEBSOCKET_TRADE_HOST + self.user_stream_key - self.trade_ws_api.connect( - key=self.key, - secret=self.secret, - url=url, - proxy_host=self.proxy_host, - proxy_port=self.proxy_port) - def on_keepalive_userStream(self, data, request): - self.gateway.write_log("交易推送刷新成功") - if self.keep_alive_count >= 1800: - self.keep_alive_count = 0 - self.keepalive_userStream(self.user_stream_key) + self.trade_ws_api.connect(url, self.proxy_host, self.proxy_port) - def on_close_userStream(self, listenKey): - self.gateway.write_log("交易推送关闭") - - def check_error(self, data: dict, func: str = ""): + def on_keep_user_stream(self, data, request): """""" - if data["status"] != "error": - return False - - error_code = data["err-code"] - error_msg = data["err-msg"] - - self.gateway.write_log(f"{func}请求出错,代码:{error_code},信息:{error_msg}") - return True + pass -class BinanceWebsocketApiBase(WebsocketClient): +class BinanceTradeWebsocketApi(WebsocketClient): """""" def __init__(self, gateway): """""" - super(BinanceWebsocketApiBase, self).__init__() + super().__init__() self.gateway = gateway self.gateway_name = gateway.gateway_name - self.key = "" - self.secret = "" - # self.sign_host = "" - self.path = "" - - def connect( - self, - key: str, - secret: str, - url: str, - proxy_host: str, - proxy_port: int - ): + def connect(self, url, proxy_host, proxy_port): """""" - self.key = key - self.secret = secret - - # host, path = _split_url(url) - # self.sign_host = host - # self.path = path - self.init(url, proxy_host, proxy_port) self.start() - def login(self): - """""" - params = {"op": "auth"} - # params.update(create_signature(self.key, "GET", self.sign_host, self.path, self.secret)) - return self.send_packet(params) - - def on_login(self, packet): - """""" - pass - - @staticmethod - def unpack_data(data): - """""" - print("==============unpack_data============") - print(data) - return json.loads(data) - # return json.loads(zlib.decompress(data, zlib.Z_BEST_COMPRESSION)) - - def on_packet(self, packet): - """""" - print("=============on_packet=============") - print("event type:" + packet["e"]) - print(packet) - # if packet["e"] == "executionReport": - if "ping" in packet: - req = {"pong": packet["ping"]} - self.send_packet(req) - elif "op" in packet and packet["op"] == "ping": - req = { - "op": "pong", - "ts": packet["ts"] - } - self.send_packet(req) - elif "err-msg" in packet: - return self.on_error_msg(packet) - elif "op" in packet and packet["op"] == "auth": - return self.on_login() - else: - self.on_data(packet) - - def on_data(self, packet): - """""" - print("data : {}".format(packet)) - - def on_error_msg(self, packet): - """""" - msg = packet["err-msg"] - if msg == "invalid pong": - return - - self.gateway.write_log(packet["err-msg"]) - - -class BinanceTradeWebsocketApi(BinanceWebsocketApiBase): - """""" - - def __init__(self, gateway): - """""" - super().__init__(gateway) - - self.order_manager = gateway.order_manager - self.order_manager.push_data_callback = self.on_data - - self.req_id = 0 - - def connect(self, key, secret, url, proxy_host, proxy_port): - """""" - super().connect(key, secret, url, proxy_host, proxy_port) - self.gateway.write_log("交易Websocket API连接成功") - self.gateway.rest_api.keepalive_userStream() - - def subscribe(self, req: SubscribeRequest): - """""" - self.req_id += 1 - req = { - "op": "sub", - "cid": str(self.req_id), - "topic": f"orders.{req.symbol}" - } - self.send_packet(req) - def on_connected(self): """""" - pass + self.gateway.write_log("交易Websocket API连接成功") - def on_login(self): + def on_packet(self, packet: dict): # type: (dict)->None """""" - pass - - def on_data(self, packet): # type: (dict)->None - """""" - print("==========on_data1=========") - # push order data change - if packet["e"] == "executionReport": + if packet["e"] == "outboundAccountInfo": + self.on_account(packet) + else: self.on_order(packet) - # order = OrderData( - # symbol=packet["s"], - # exchange=Exchange.BINANCE, - # orderid=packet["i"], - # status=STATUS_BINANCE2VT.get(packet["X"], None), - # traded=float(packet["Z"]), - # price=float(packet["L"]), - # time=packet["O"], - # gateway_name=self.gateway_name - # ) - # self.on_order(order) - - # push account data change - if packet["e"] == "outboundAccountInfo": - for account_data in packet["B"]: - account = AccountData( - accountid=account_data["a"], - balance=float(account_data["f"]), - frozen=float(account_data["l"]), - gateway_name=self.gateway_name - ) - self.on_account(account) - - def on_order(self, data: dict): + def on_account(self, packet): """""" - sys_orderid = str(data["i"]) + for d in packet["B"]: + account = AccountData( + accountid=d["a"], + balance=float(d["f"]) + float(d["l"]), + frozen=float(d["l"]), + gateway_name=self.gateway_name + ) + + if account.balance: + self.gateway.on_account(account) - order = self.order_manager.get_order_with_sys_orderid(sys_orderid) - if not order: - self.order_manager.add_push_data(sys_orderid, data) - return + def on_order(self, packet: dict): + """""" + dt = datetime.fromtimestamp(packet["O"] / 1000) + time = dt.strftime("%Y-%m-%d %H:%M:%S") - traded_volume = float(data["Z"]) - # Push order event + if packet["C"] == "null": + orderid = packet["c"] + else: + orderid = packet["C"] - order.traded += traded_volume - order.status = STATUS_BINANCE2VT.get(data["X"], None) - order.price = float(data["L"]) - order.time = data["O"] - order.symbol = data["s"] + order = OrderData( + symbol=packet["s"], + exchange=Exchange.BINANCE, + orderid=orderid, + type=ORDERTYPE_BINANCE2VT[packet["o"]], + direction=DIRECTION_BINANCE2VT[packet["S"]], + price=float(packet["p"]), + volume=float(packet["q"]), + traded=float(packet["z"]), + status=STATUS_BINANCE2VT[packet["X"]], + time=time, + gateway_name=self.gateway_name + ) - print("远端ID:", sys_orderid, "本地ID:", order) - self.order_manager.on_order(order) + self.gateway.on_order(order) # Push trade event - traded_volume = data.traded - if not traded_volume: + trade_volume = float(packet["l"]) + if not trade_volume: return + trade_dt = datetime.fromtimestamp(packet["T"] / 1000) + trade_time = trade_dt.strftime("%Y-%m-%d %H:%M:%S") + trade = TradeData( symbol=order.symbol, - exchange=Exchange.BINANCE, + exchange=order.exchange, orderid=order.orderid, + tradeid=packet["t"], direction=order.direction, - price=float(order.price), - volume=float(order.traded), - time=datetime.now().strftime("%H:%M:%S"), + price=float(packet["L"]), + volume=trade_volume, + time=trade_time, gateway_name=self.gateway_name, ) self.gateway.on_trade(trade) - def on_account(self, data: dict): - """""" - self.gateway.on_account(data) - -class BinanceDataWebsocketApi(BinanceWebsocketApiBase): +class BinanceDataWebsocketApi(WebsocketClient): """""" def __init__(self, gateway): """""" - super().__init__(gateway) + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name - self.req_id = 0 self.ticks = {} - def connect(self, key: str, secret: str, proxy_host: str, proxy_port: int): + def connect(self, proxy_host: str, proxy_port: int): """""" - super().connect(key, secret, WEBSOCKET_DATA_HOST, proxy_host, proxy_port) - self.gateway.write_log("行情Websocket API连接成功") + self.proxy_host = proxy_host + self.proxy_port = proxy_port def on_connected(self): """""" - pass + self.gateway.write_log("行情Websocket API连接刷新") def subscribe(self, req: SubscribeRequest): """""" - symbol = req.symbol - print("============BinanceDataWebsocketApi.subscribe===========") - print("symbol" + symbol) - # Create tick data buffer + if req.symbol not in symbol_name_map: + self.gateway.write_log(f"找不到该合约代码{req.symbol}") + return + + # Create tick buf data tick = TickData( - symbol=symbol, - name=symbol_name_map.get(symbol, ""), + symbol=req.symbol, + name=symbol_name_map.get(req.symbol, ""), exchange=Exchange.BINANCE, datetime=datetime.now(), gateway_name=self.gateway_name, ) - self.ticks[symbol] = tick + self.ticks[req.symbol.lower()] = tick - # Subscribe to market depth update - self.req_id += 1 - req = { - "sub": f"market.{symbol}.depth.step0", - "id": str(self.req_id) - } - self.send_packet(req) + # Close previous connection + if self._active: + self.stop() + self.join() - # Subscribe to market detail update - self.req_id += 1 - req = { - "sub": f"market.{symbol}.detail", - "id": str(self.req_id) - } - self.send_packet(req) + # Create new connection + channels = [] + for ws_symbol in self.ticks.keys(): + channels.append(ws_symbol + "@ticker") + channels.append(ws_symbol + "@depth5") - def on_data(self, packet): # type: (dict)->None + url = WEBSOCKET_DATA_HOST + "/".join(channels) + self.init(url, self.proxy_host, self.proxy_port) + self.start() + + def on_packet(self, packet): """""" - print("===================on_data=====================") - print(packet) + stream = packet["stream"] + data = packet["data"] - channel = packet.get("ch", None) - if channel: - if "depth.step" in channel: - self.on_market_depth(packet) - elif "detail" in channel: - self.on_market_detail(packet) - elif "err-code" in packet: - code = packet["err-code"] - msg = packet["err-msg"] - self.gateway.write_log(f"错误代码:{code}, 错误信息:{msg}") - - def on_market_depth(self, data): - """行情深度推送 """ - symbol = data["ch"].split(".")[1] + symbol, channel = stream.split("@") tick = self.ticks[symbol] - tick.datetime = datetime.fromtimestamp(data["ts"] / 1000) - bids = data["tick"]["bids"] - for n in range(5): - price, volume = bids[n] - tick.__setattr__("bid_price_" + str(n + 1), float(price)) - tick.__setattr__("bid_volume_" + str(n + 1), float(volume)) + if channel == "ticker": + tick.volume = float(data['v']) + tick.open_price = float(data['o']) + tick.high_price = float(data['h']) + tick.low_price = float(data['l']) + tick.last_price = float(data['c']) + tick.datetime = datetime.fromtimestamp(float(data['E']) / 1000) + else: + bids = data["bids"] + for n in range(5): + price, volume = bids[n] + tick.__setattr__("bid_price_" + str(n + 1), float(price)) + tick.__setattr__("bid_volume_" + str(n + 1), float(volume)) - asks = data["tick"]["asks"] - for n in range(5): - price, volume = asks[n] - tick.__setattr__("ask_price_" + str(n + 1), float(price)) - tick.__setattr__("ask_volume_" + str(n + 1), float(volume)) + asks = data["asks"] + for n in range(5): + price, volume = asks[n] + tick.__setattr__("ask_price_" + str(n + 1), float(price)) + tick.__setattr__("ask_volume_" + str(n + 1), float(volume)) if tick.last_price: self.gateway.on_tick(copy(tick)) - - def on_market_detail(self, data): - """市场细节推送""" - symbol = data["ch"].split(".")[1] - tick = self.ticks[symbol] - tick.datetime = datetime.fromtimestamp(data["ts"] / 1000) - - tick_data = data["tick"] - tick.open_price = float(tick_data["open"]) - tick.high_price = float(tick_data["high"]) - tick.low_price = float(tick_data["low"]) - tick.last_price = float(tick_data["close"]) - tick.volume = float(tick_data["vol"]) - - if tick.bid_price_1: - self.gateway.on_tick(copy(tick)) diff --git a/vnpy/gateway/bitmex/bitmex_gateway.py b/vnpy/gateway/bitmex/bitmex_gateway.py index e8248f29..72988f56 100644 --- a/vnpy/gateway/bitmex/bitmex_gateway.py +++ b/vnpy/gateway/bitmex/bitmex_gateway.py @@ -233,6 +233,7 @@ class BitmexRestApi(RestClient): self.gateway.write_log("REST API启动成功") def _new_order_id(self): + """""" with self.order_count_lock: self.order_count += 1 return self.order_count