diff --git a/vnpy/gateway/binance/binance_gateway.py b/vnpy/gateway/binance/binance_gateway.py index 625ef0ad..28cced5f 100644 --- a/vnpy/gateway/binance/binance_gateway.py +++ b/vnpy/gateway/binance/binance_gateway.py @@ -11,7 +11,6 @@ import time from copy import copy from datetime import datetime -from vnpy.event import Event from vnpy.api.rest import RestClient, Request from vnpy.api.websocket import WebsocketClient from vnpy.trader.constant import ( @@ -21,7 +20,7 @@ from vnpy.trader.constant import ( Status, OrderType ) -from vnpy.trader.gateway import BaseGateway, LocalOrderManager +from vnpy.trader.gateway import BaseGateway from vnpy.trader.object import ( TickData, OrderData, @@ -32,11 +31,11 @@ from vnpy.trader.object import ( CancelRequest, SubscribeRequest ) -from vnpy.trader.event import EVENT_TIMER REST_HOST = "https://www.binance.com" -WEBSOCKET_TRADE_HOST = "wss://stream.binance.com:9443/ws/" # Account and Order +# Account and Order +WEBSOCKET_TRADE_HOST = "wss://stream.binance.com:9443/ws/" WEBSOCKET_DATA_HOST = "wss://stream.binance.com:9443/stream?streams=" # Market Data STATUS_BINANCE2VT = { @@ -50,7 +49,7 @@ STATUS_BINANCE2VT = { ORDERTYPE_VT2BINANCE = { OrderType.LIMIT: "LIMIT", OrderType.MARKET: "MARKET", - OrderType.STOP: "STOP_LOSS", + OrderType.STOP: "STOP_LOSS", } ORDERTYPE_BINANCE2VT = {v: k for k, v in ORDERTYPE_VT2BINANCE.items()} @@ -84,8 +83,6 @@ class BinanceGateway(BaseGateway): """Constructor""" super().__init__(event_engine, "BINANCE") - self.order_manager = LocalOrderManager(self) - self.rest_api = BinanceRestApi(self) self.trade_ws_api = BinanceTradeWebsocketApi(self) self.market_ws_api = BinanceDataWebsocketApi(self) @@ -100,13 +97,13 @@ class BinanceGateway(BaseGateway): self.rest_api.connect(key, secret, session_number, proxy_host, proxy_port) - # self.trade_ws_api.connect(key, secret, proxy_host, proxy_port) - # self.market_ws_api.connect(key, secret, proxy_host, proxy_port) + self.trade_ws_api.connect(key, secret, proxy_host, proxy_port) + self.market_ws_api.connect(key, secret, proxy_host, proxy_port) def subscribe(self, req: SubscribeRequest): """""" self.market_ws_api.subscribe(req) - # self.trade_ws_api.subscribe(req) + self.trade_ws_api.subscribe(req) def send_order(self, req: OrderRequest): """""" @@ -136,21 +133,21 @@ class BinanceRestApi(RestClient): BINANCE REST API """ - def __init__(self, gateway: BaseGateway): + def __init__(self, gateway: BinanceGateway): """""" - super(BinanceRestApi, self).__init__() + super().__init__() self.gateway = gateway self.gateway_name = gateway.gateway_name - self.order_manager = gateway.order_manager - self.trade_ws_api = BinanceTradeWebsocketApi(self.gateway) - self.host = "" + self.trade_ws_api = self.gateway.trade_ws_api + self.key = "" self.secret = "" - self.userStreamKey = "" - self.keepaliveCount = 0 - self.recvWindow = 5000 + + self.user_stream_key = "" + self.keep_alive_count = 0 + self.recv_window = 5000 self.time_offset = 0 self.cancel_requests = {} @@ -165,34 +162,44 @@ class BinanceRestApi(RestClient): else: request.params = dict() path = request.path + security = "NONE" + if request.data: security = request.data['security'] if security == "SIGNED": timestamp = int(time.time() * 1000) + if self.time_offset > 0: timestamp -= abs(self.time_offset) elif self.time_offset < 0: timestamp += abs(self.time_offset) + request.params['timestamp'] = timestamp - request.params['recvWindow'] = self.recvWindow + request.params['recv_window'] = self.recv_window query = urllib.parse.urlencode(sorted(request.params.items())) - signature = hmac.new(self.secret, query.encode('utf-8'), hashlib.sha256).hexdigest() + signature = hmac.new(self.secret, query.encode( + 'utf-8'), hashlib.sha256).hexdigest() + query += "&signature={}".format(signature) path = request.path + "?" + query + request.path = path request.params = {} request.data = {} + # Add headers headers = { "Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json", "X-MBX-APIKEY": self.key } + if security == "SIGNED" or security == "API-KEY": request.headers = headers + return request def connect( @@ -209,9 +216,7 @@ class BinanceRestApi(RestClient): self.key = key self.secret = secret.encode() self.proxy_port = proxy_port - self.proxy_host = proxy_host - - self.host, _ = _split_url(REST_HOST) + self.proxy_host = proxy_host self.init(REST_HOST, proxy_host, proxy_port) self.start(session_number) @@ -222,7 +227,7 @@ class BinanceRestApi(RestClient): self.query_account() self.query_order() self.query_contract() - self.start_userStream() + self.start_user_stream() def query_time(self): """""" @@ -230,6 +235,7 @@ class BinanceRestApi(RestClient): "security": "NONE" } path = '/api/v1/time' + return self.add_request( "GET", path, @@ -331,7 +337,7 @@ class BinanceRestApi(RestClient): print("撤单本地id:", req.orderid, "撤单远端id:", sys_orderid) - def start_userStream(self): + def start_user_stream(self): """""" data = { "security": "API-KEY" @@ -339,26 +345,26 @@ class BinanceRestApi(RestClient): self.add_request( method="POST", path='/api/v1/userDataStream', - callback=self.on_start_userStream, + callback=self.on_start_user_stream, data=data ) def keepalive_userStream(self): """""" - self.keepaliveCount += 1 - if self.keepaliveCount < 1800: + self.keep_alive_count += 1 + if self.keep_alive_count < 1800: return data = { "security": "SIGNED" } params = { - 'listenKey': self.userStreamKey + 'listenKey': self.user_stream_key } self.add_request( - method='PUT', - path='/api/v1/userDataStream', + method='PUT', + path='/api/v1/userDataStream', callback=self.on_keepalive_userStream, - params=params, + params=params, data=data ) @@ -376,7 +382,7 @@ class BinanceRestApi(RestClient): callback=self.on_close_userStream, params=params, data=data - ) + ) def on_query_time(self, data, request): """""" @@ -473,7 +479,7 @@ class BinanceRestApi(RestClient): order.status = STATUS_BINANCE2VT.get(data["status"], None) sys_orderid = data["orderId"] self.order_manager.on_order(order) - self.order_manager.update_orderid_map(order.orderid, sys_orderid) + self.order_manager.update_orderid_map(order.orderid, sys_orderid) def on_send_order_failed(self, status_code: str, request: Request): """ @@ -514,22 +520,22 @@ class BinanceRestApi(RestClient): self.order_manager.on_order(order) - def on_start_userStream(self, data, request): - self.userStreamKey = data['listenKey'] - self.keepaliveCount = 0 - url = WEBSOCKET_TRADE_HOST + self.userStreamKey + def on_start_user_stream(self, data, request): + self.user_stream_key = data['listenKey'] + self.keep_alive_count = 0 + url = WEBSOCKET_TRADE_HOST + self.user_stream_key self.trade_ws_api.connect( - key=self.key, - secret=self.secret, - url=url, - proxy_host=self.proxy_host, + key=self.key, + secret=self.secret, + url=url, + proxy_host=self.proxy_host, proxy_port=self.proxy_port) def on_keepalive_userStream(self, data, request): self.gateway.write_log("交易推送刷新成功") - if self.keepaliveCount >= 1800: - self.keepaliveCount = 0 - self.keepalive_userStream(self.userStreamKey) + if self.keep_alive_count >= 1800: + self.keep_alive_count = 0 + self.keepalive_userStream(self.user_stream_key) def on_close_userStream(self, listenKey): self.gateway.write_log("交易推送关闭") @@ -562,11 +568,11 @@ class BinanceWebsocketApiBase(WebsocketClient): self.path = "" def connect( - self, - key: str, - secret: str, - url: str, - proxy_host: str, + self, + key: str, + secret: str, + url: str, + proxy_host: str, proxy_port: int ): """""" @@ -583,7 +589,7 @@ class BinanceWebsocketApiBase(WebsocketClient): def login(self): """""" params = {"op": "auth"} - # params.update(create_signature(self.key, "GET", self.sign_host, self.path, self.secret)) + # params.update(create_signature(self.key, "GET", self.sign_host, self.path, self.secret)) return self.send_packet(params) def on_login(self, packet): @@ -620,11 +626,11 @@ class BinanceWebsocketApiBase(WebsocketClient): else: self.on_data(packet) - def on_data(self, packet): + def on_data(self, packet): """""" print("data : {}".format(packet)) - def on_error_msg(self, packet): + def on_error_msg(self, packet): """""" msg = packet["err-msg"] if msg == "invalid pong": @@ -635,6 +641,7 @@ class BinanceWebsocketApiBase(WebsocketClient): class BinanceTradeWebsocketApi(BinanceWebsocketApiBase): """""" + def __init__(self, gateway): """""" super().__init__(gateway) @@ -686,7 +693,7 @@ class BinanceTradeWebsocketApi(BinanceWebsocketApiBase): # gateway_name=self.gateway_name # ) # self.on_order(order) - + # push account data change if packet["e"] == "outboundAccountInfo": for account_data in packet["B"]: @@ -712,11 +719,11 @@ class BinanceTradeWebsocketApi(BinanceWebsocketApiBase): order.traded += traded_volume order.status = STATUS_BINANCE2VT.get(data["X"], None) - order.price = float(data["L"]) - order.time = data["O"] - order.symbol = data["s"] + order.price = float(data["L"]) + order.time = data["O"] + order.symbol = data["s"] - print("远端ID:", sys_orderid, "本地ID:", order) + print("远端ID:", sys_orderid, "本地ID:", order) self.order_manager.on_order(order) # Push trade event @@ -733,7 +740,7 @@ class BinanceTradeWebsocketApi(BinanceWebsocketApiBase): volume=float(order.traded), time=datetime.now().strftime("%H:%M:%S"), gateway_name=self.gateway_name, - ) + ) self.gateway.on_trade(trade) def on_account(self, data: dict): @@ -773,13 +780,13 @@ class BinanceDataWebsocketApi(BinanceWebsocketApiBase): datetime=datetime.now(), gateway_name=self.gateway_name, ) - self.ticks[symbol] = tick + self.ticks[symbol] = tick # Subscribe to market depth update self.req_id += 1 req = { "sub": f"market.{symbol}.depth.step0", - "id": str(self.req_id) + "id": str(self.req_id) } self.send_packet(req) @@ -787,7 +794,7 @@ class BinanceDataWebsocketApi(BinanceWebsocketApiBase): self.req_id += 1 req = { "sub": f"market.{symbol}.detail", - "id": str(self.req_id) + "id": str(self.req_id) } self.send_packet(req) @@ -843,13 +850,3 @@ class BinanceDataWebsocketApi(BinanceWebsocketApiBase): if tick.bid_price_1: self.gateway.on_tick(copy(tick)) - - -def _split_url(url): - """ - 将url拆分为host和path - :return: host, path - """ - result = re.match("\w+://([^/]*)(.*)", url) # noqa - if result: - return result.group(1), result.group(2)