[Mod] complete test of BinanceGateway

This commit is contained in:
vn.py 2019-06-12 13:34:32 +08:00
parent 93b4ff40a3
commit e90e1ee104
2 changed files with 194 additions and 366 deletions

View File

@ -2,14 +2,14 @@
Gateway for Binance Crypto Exchange.
"""
import re
import urllib
import json
import hashlib
import hmac
import time
from copy import copy
from datetime import datetime
from enum import Enum
from threading import Lock
from vnpy.api.rest import RestClient, Request
from vnpy.api.websocket import WebsocketClient
@ -31,12 +31,13 @@ from vnpy.trader.object import (
CancelRequest,
SubscribeRequest
)
from vnpy.trader.event import EVENT_TIMER
from vnpy.event import Event
REST_HOST = "https://www.binance.com"
# Account and Order
WEBSOCKET_TRADE_HOST = "wss://stream.binance.com:9443/ws/"
WEBSOCKET_DATA_HOST = "wss://stream.binance.com:9443/stream?streams=" # Market Data
WEBSOCKET_DATA_HOST = "wss://stream.binance.com:9443/stream?streams="
STATUS_BINANCE2VT = {
"NEW": Status.NOTTRADED,
@ -48,8 +49,7 @@ STATUS_BINANCE2VT = {
ORDERTYPE_VT2BINANCE = {
OrderType.LIMIT: "LIMIT",
OrderType.MARKET: "MARKET",
OrderType.STOP: "STOP_LOSS",
OrderType.MARKET: "MARKET"
}
ORDERTYPE_BINANCE2VT = {v: k for k, v in ORDERTYPE_VT2BINANCE.items()}
@ -60,7 +60,12 @@ DIRECTION_VT2BINANCE = {
DIRECTION_BINANCE2VT = {v: k for k, v in DIRECTION_VT2BINANCE.items()}
binance_symbols = set()
class Security(Enum):
NONE = 0
SIGNED = 1
API_KEY = 2
symbol_name_map = {}
@ -83,9 +88,11 @@ class BinanceGateway(BaseGateway):
"""Constructor"""
super().__init__(event_engine, "BINANCE")
self.rest_api = BinanceRestApi(self)
self.trade_ws_api = BinanceTradeWebsocketApi(self)
self.market_ws_api = BinanceDataWebsocketApi(self)
self.rest_api = BinanceRestApi(self)
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def connect(self, setting: dict):
""""""
@ -97,13 +104,11 @@ class BinanceGateway(BaseGateway):
self.rest_api.connect(key, secret, session_number,
proxy_host, proxy_port)
self.trade_ws_api.connect(key, secret, proxy_host, proxy_port)
self.market_ws_api.connect(key, secret, proxy_host, proxy_port)
self.market_ws_api.connect(proxy_host, proxy_port)
def subscribe(self, req: SubscribeRequest):
""""""
self.market_ws_api.subscribe(req)
self.trade_ws_api.subscribe(req)
def send_order(self, req: OrderRequest):
""""""
@ -127,6 +132,10 @@ class BinanceGateway(BaseGateway):
self.trade_ws_api.stop()
self.market_ws_api.stop()
def process_timer_event(self, event: Event):
""""""
self.rest_api.keep_user_stream()
class BinanceRestApi(RestClient):
"""
@ -150,8 +159,9 @@ class BinanceRestApi(RestClient):
self.recv_window = 5000
self.time_offset = 0
self.cancel_requests = {}
self.orders = {}
self.order_count = 1_000_000
self.order_count_lock = Lock()
self.connect_time = 0
def sign(self, request):
"""
@ -163,12 +173,9 @@ class BinanceRestApi(RestClient):
request.params = dict()
path = request.path
security = "NONE"
security = request.data["security"]
if request.data:
security = request.data['security']
if security == "SIGNED":
if security == Security.SIGNED:
timestamp = int(time.time() * 1000)
if self.time_offset > 0:
@ -176,12 +183,12 @@ class BinanceRestApi(RestClient):
elif self.time_offset < 0:
timestamp += abs(self.time_offset)
request.params['timestamp'] = timestamp
request.params['recv_window'] = self.recv_window
request.params["timestamp"] = timestamp
# request.params["recv_window"] = self.recv_window
query = urllib.parse.urlencode(sorted(request.params.items()))
signature = hmac.new(self.secret, query.encode(
'utf-8'), hashlib.sha256).hexdigest()
"utf-8"), hashlib.sha256).hexdigest()
query += "&signature={}".format(signature)
path = request.path + "?" + query
@ -197,7 +204,7 @@ class BinanceRestApi(RestClient):
"X-MBX-APIKEY": self.key
}
if security == "SIGNED" or security == "API-KEY":
if security == Security.SIGNED or security == Security.API_KEY:
request.headers = headers
return request
@ -218,6 +225,10 @@ class BinanceRestApi(RestClient):
self.proxy_port = proxy_port
self.proxy_host = proxy_host
self.connect_time = (
int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count
)
self.init(REST_HOST, proxy_host, proxy_port)
self.start(session_number)
@ -232,9 +243,9 @@ class BinanceRestApi(RestClient):
def query_time(self):
""""""
data = {
"security": "NONE"
"security": Security.NONE
}
path = '/api/v1/time'
path = "/api/v1/time"
return self.add_request(
"GET",
@ -245,9 +256,8 @@ class BinanceRestApi(RestClient):
def query_account(self):
""""""
data = {
"security": "SIGNED"
}
data = {"security": Security.SIGNED}
self.add_request(
method="GET",
path="/api/v3/account",
@ -257,9 +267,8 @@ class BinanceRestApi(RestClient):
def query_order(self):
""""""
data = {
"security": "SIGNED"
}
data = {"security": Security.SIGNED}
self.add_request(
method="GET",
path="/api/v3/openOrders",
@ -270,7 +279,7 @@ class BinanceRestApi(RestClient):
def query_contract(self):
""""""
data = {
"security": "NONE"
"security": Security.NONE
}
self.add_request(
method="GET",
@ -279,17 +288,23 @@ class BinanceRestApi(RestClient):
data=data
)
def _new_order_id(self):
""""""
with self.order_count_lock:
self.order_count += 1
return self.order_count
def send_order(self, req: OrderRequest):
""""""
local_orderid = self.order_manager.new_local_orderid()
orderid = str(self.connect_time + self._new_order_id())
order = req.create_order_data(
local_orderid,
orderid,
self.gateway_name
)
order.time = datetime.now().strftime("%H:%M:%S")
self.gateway.on_order(order)
data = {
"security": "SIGNED"
"security": Security.SIGNED
}
params = {
@ -298,7 +313,9 @@ class BinanceRestApi(RestClient):
"side": DIRECTION_VT2BINANCE[req.direction],
"type": ORDERTYPE_VT2BINANCE[req.type],
"price": str(req.price),
"quantity": str(req.volume)
"quantity": str(req.volume),
"newClientOrderId": orderid,
"newOrderRespType": "ACK"
}
self.add_request(
@ -312,20 +329,19 @@ class BinanceRestApi(RestClient):
on_failed=self.on_send_order_failed
)
self.order_manager.on_order(order)
return order.vt_orderid
def cancel_order(self, req: CancelRequest):
""""""
sys_orderid = self.order_manager.get_sys_orderid(req.orderid)
data = {
"security": "SIGNED"
"security": Security.SIGNED
}
params = {
"symbol": req.symbol,
"orderId": sys_orderid
"origClientOrderId": req.orderid
}
self.add_request(
method="DELETE",
path="/api/v3/order",
@ -335,107 +351,82 @@ class BinanceRestApi(RestClient):
extra=req
)
print("撤单本地id", req.orderid, "撤单远端id", sys_orderid)
def start_user_stream(self):
""""""
data = {
"security": "API-KEY"
"security": Security.API_KEY
}
self.add_request(
method="POST",
path='/api/v1/userDataStream',
path="/api/v1/userDataStream",
callback=self.on_start_user_stream,
data=data
)
def keepalive_userStream(self):
def keep_user_stream(self):
""""""
self.keep_alive_count += 1
if self.keep_alive_count < 1800:
return
data = {
"security": "SIGNED"
}
params = {
'listenKey': self.user_stream_key
}
self.add_request(
method='PUT',
path='/api/v1/userDataStream',
callback=self.on_keepalive_userStream,
params=params,
data=data
)
def close_userStream(self, listenKey):
""""""
data = {
"security": "SIGNED"
"security": Security.SIGNED
}
params = {
'listenKey': listenKey
"listenKey": self.user_stream_key
}
self.add_request(
method='DELETE',
path='/api/v1/userDataStream',
callback=self.on_close_userStream,
method="PUT",
path="/api/v1/userDataStream",
callback=self.on_keep_user_stream,
params=params,
data=data
)
def on_query_time(self, data, request):
""""""
time_now = int(time.time() * 1000)
time_server = int(data["serverTime"])
server_local_time = time.localtime(float(time_server / 1000))
now_local_time = time.localtime(float(time_now / 1000))
self.time_offset = time_now - time_server
server_time = time.strftime("%Y-%m-%d %H:%M:%S", server_local_time)
local_time = time.strftime("%Y-%m-%d %H:%M:%S", now_local_time)
msg = f"服务器时间:{server_time},本机时间:{local_time}"
self.gateway.write_log(msg)
local_time = int(time.time() * 1000)
server_time = int(data["serverTime"])
self.time_offset = local_time - server_time
def on_query_account(self, data, request):
""""""
for account_data in data["balances"]:
account = AccountData(
accountid=account_data["asset"],
balance=float(account_data["free"]),
balance=float(account_data["free"]) + float(account_data["locked"]),
frozen=float(account_data["locked"]),
gateway_name=self.gateway_name
)
self.gateway.on_account(account)
if account.balance:
self.gateway.on_account(account)
self.gateway.write_log("账户资金查询成功")
def on_query_order(self, data, request):
""""""
for d in data:
sys_orderid = d["orderId"]
local_orderid = self.order_manager.get_local_orderid(sys_orderid)
direction = DIRECTION_BINANCE2VT[d["side"]]
order_type = ORDERTYPE_BINANCE2VT[d["type"]]
dt = datetime.fromtimestamp(d["time"] / 1000)
time = dt.strftime("%Y-%m-%d %H:%M:%S")
order = OrderData(
orderid=local_orderid,
orderid=d["clientOrderId"],
symbol=d["symbol"],
exchange=Exchange.BINANCE,
price=float(d["price"]),
volume=float(d["origQty"]),
type=order_type,
direction=direction,
type=ORDERTYPE_BINANCE2VT[d["type"]],
direction=DIRECTION_BINANCE2VT[d["side"]],
traded=float(d["executedQty"]),
status=STATUS_BINANCE2VT.get(d["status"], None),
time=time,
gateway_name=self.gateway_name,
)
print("委托查询--远端id", sys_orderid, "本地Id", local_orderid)
self.order_manager.on_order(order)
self.gateway.on_order(order)
self.gateway.write_log("委托信息查询成功")
@ -445,13 +436,16 @@ class BinanceRestApi(RestClient):
base_currency = d["baseAsset"]
quote_currency = d["quoteAsset"]
name = f"{base_currency.upper()}/{quote_currency.upper()}"
pricetick = 0
min_volume = 0
for f in d["filters"]:
if f["filterType"] == "PRICE_FILTER":
pricetick = f["tickSize"]
if f["filterType"] == "LOT_SIZE":
elif f["filterType"] == "LOT_SIZE":
min_volume = f["stepSize"]
contract = ContractData(
symbol=d["symbol"],
exchange=Exchange.BINANCE,
@ -464,22 +458,13 @@ class BinanceRestApi(RestClient):
)
self.gateway.on_contract(contract)
binance_symbols.add(contract.symbol)
symbol_name_map[contract.symbol] = contract.name
self.gateway.write_log("合约信息查询成功")
def on_send_order(self, data, request):
""""""
order = request.extra
if self.check_error(data, "委托"):
order.status = Status.REJECTED
self.order_manager.on_order(order)
return
order.status = STATUS_BINANCE2VT.get(data["status"], None)
sys_orderid = data["orderId"]
self.order_manager.on_order(order)
self.order_manager.update_orderid_map(order.orderid, sys_orderid)
pass
def on_send_order_failed(self, status_code: str, request: Request):
"""
@ -508,345 +493,187 @@ class BinanceRestApi(RestClient):
def on_cancel_order(self, data, request):
""""""
cancel_request = request.extra
local_orderid = cancel_request.orderid
order = self.order_manager.get_order_with_local_orderid(local_orderid)
if self.check_error(data, "撤单"):
order.status = Status.REJECTED
else:
order.status = Status.CANCELLED
self.gateway.write_log(f"委托撤单成功:{order.orderid}")
self.order_manager.on_order(order)
pass
def on_start_user_stream(self, data, request):
self.user_stream_key = data['listenKey']
""""""
self.user_stream_key = data["listenKey"]
self.keep_alive_count = 0
url = WEBSOCKET_TRADE_HOST + self.user_stream_key
self.trade_ws_api.connect(
key=self.key,
secret=self.secret,
url=url,
proxy_host=self.proxy_host,
proxy_port=self.proxy_port)
def on_keepalive_userStream(self, data, request):
self.gateway.write_log("交易推送刷新成功")
if self.keep_alive_count >= 1800:
self.keep_alive_count = 0
self.keepalive_userStream(self.user_stream_key)
self.trade_ws_api.connect(url, self.proxy_host, self.proxy_port)
def on_close_userStream(self, listenKey):
self.gateway.write_log("交易推送关闭")
def check_error(self, data: dict, func: str = ""):
def on_keep_user_stream(self, data, request):
""""""
if data["status"] != "error":
return False
error_code = data["err-code"]
error_msg = data["err-msg"]
self.gateway.write_log(f"{func}请求出错,代码:{error_code},信息:{error_msg}")
return True
pass
class BinanceWebsocketApiBase(WebsocketClient):
class BinanceTradeWebsocketApi(WebsocketClient):
""""""
def __init__(self, gateway):
""""""
super(BinanceWebsocketApiBase, self).__init__()
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.key = ""
self.secret = ""
# self.sign_host = ""
self.path = ""
def connect(
self,
key: str,
secret: str,
url: str,
proxy_host: str,
proxy_port: int
):
def connect(self, url, proxy_host, proxy_port):
""""""
self.key = key
self.secret = secret
# host, path = _split_url(url)
# self.sign_host = host
# self.path = path
self.init(url, proxy_host, proxy_port)
self.start()
def login(self):
""""""
params = {"op": "auth"}
# params.update(create_signature(self.key, "GET", self.sign_host, self.path, self.secret))
return self.send_packet(params)
def on_login(self, packet):
""""""
pass
@staticmethod
def unpack_data(data):
""""""
print("==============unpack_data============")
print(data)
return json.loads(data)
# return json.loads(zlib.decompress(data, zlib.Z_BEST_COMPRESSION))
def on_packet(self, packet):
""""""
print("=============on_packet=============")
print("event type:" + packet["e"])
print(packet)
# if packet["e"] == "executionReport":
if "ping" in packet:
req = {"pong": packet["ping"]}
self.send_packet(req)
elif "op" in packet and packet["op"] == "ping":
req = {
"op": "pong",
"ts": packet["ts"]
}
self.send_packet(req)
elif "err-msg" in packet:
return self.on_error_msg(packet)
elif "op" in packet and packet["op"] == "auth":
return self.on_login()
else:
self.on_data(packet)
def on_data(self, packet):
""""""
print("data : {}".format(packet))
def on_error_msg(self, packet):
""""""
msg = packet["err-msg"]
if msg == "invalid pong":
return
self.gateway.write_log(packet["err-msg"])
class BinanceTradeWebsocketApi(BinanceWebsocketApiBase):
""""""
def __init__(self, gateway):
""""""
super().__init__(gateway)
self.order_manager = gateway.order_manager
self.order_manager.push_data_callback = self.on_data
self.req_id = 0
def connect(self, key, secret, url, proxy_host, proxy_port):
""""""
super().connect(key, secret, url, proxy_host, proxy_port)
self.gateway.write_log("交易Websocket API连接成功")
self.gateway.rest_api.keepalive_userStream()
def subscribe(self, req: SubscribeRequest):
""""""
self.req_id += 1
req = {
"op": "sub",
"cid": str(self.req_id),
"topic": f"orders.{req.symbol}"
}
self.send_packet(req)
def on_connected(self):
""""""
pass
self.gateway.write_log("交易Websocket API连接成功")
def on_login(self):
def on_packet(self, packet: dict): # type: (dict)->None
""""""
pass
def on_data(self, packet): # type: (dict)->None
""""""
print("==========on_data1=========")
# push order data change
if packet["e"] == "executionReport":
if packet["e"] == "outboundAccountInfo":
self.on_account(packet)
else:
self.on_order(packet)
# order = OrderData(
# symbol=packet["s"],
# exchange=Exchange.BINANCE,
# orderid=packet["i"],
# status=STATUS_BINANCE2VT.get(packet["X"], None),
# traded=float(packet["Z"]),
# price=float(packet["L"]),
# time=packet["O"],
# gateway_name=self.gateway_name
# )
# self.on_order(order)
# push account data change
if packet["e"] == "outboundAccountInfo":
for account_data in packet["B"]:
account = AccountData(
accountid=account_data["a"],
balance=float(account_data["f"]),
frozen=float(account_data["l"]),
gateway_name=self.gateway_name
)
self.on_account(account)
def on_order(self, data: dict):
def on_account(self, packet):
""""""
sys_orderid = str(data["i"])
for d in packet["B"]:
account = AccountData(
accountid=d["a"],
balance=float(d["f"]) + float(d["l"]),
frozen=float(d["l"]),
gateway_name=self.gateway_name
)
if account.balance:
self.gateway.on_account(account)
order = self.order_manager.get_order_with_sys_orderid(sys_orderid)
if not order:
self.order_manager.add_push_data(sys_orderid, data)
return
def on_order(self, packet: dict):
""""""
dt = datetime.fromtimestamp(packet["O"] / 1000)
time = dt.strftime("%Y-%m-%d %H:%M:%S")
traded_volume = float(data["Z"])
# Push order event
if packet["C"] == "null":
orderid = packet["c"]
else:
orderid = packet["C"]
order.traded += traded_volume
order.status = STATUS_BINANCE2VT.get(data["X"], None)
order.price = float(data["L"])
order.time = data["O"]
order.symbol = data["s"]
order = OrderData(
symbol=packet["s"],
exchange=Exchange.BINANCE,
orderid=orderid,
type=ORDERTYPE_BINANCE2VT[packet["o"]],
direction=DIRECTION_BINANCE2VT[packet["S"]],
price=float(packet["p"]),
volume=float(packet["q"]),
traded=float(packet["z"]),
status=STATUS_BINANCE2VT[packet["X"]],
time=time,
gateway_name=self.gateway_name
)
print("远端ID", sys_orderid, "本地ID", order)
self.order_manager.on_order(order)
self.gateway.on_order(order)
# Push trade event
traded_volume = data.traded
if not traded_volume:
trade_volume = float(packet["l"])
if not trade_volume:
return
trade_dt = datetime.fromtimestamp(packet["T"] / 1000)
trade_time = trade_dt.strftime("%Y-%m-%d %H:%M:%S")
trade = TradeData(
symbol=order.symbol,
exchange=Exchange.BINANCE,
exchange=order.exchange,
orderid=order.orderid,
tradeid=packet["t"],
direction=order.direction,
price=float(order.price),
volume=float(order.traded),
time=datetime.now().strftime("%H:%M:%S"),
price=float(packet["L"]),
volume=trade_volume,
time=trade_time,
gateway_name=self.gateway_name,
)
self.gateway.on_trade(trade)
def on_account(self, data: dict):
""""""
self.gateway.on_account(data)
class BinanceDataWebsocketApi(BinanceWebsocketApiBase):
class BinanceDataWebsocketApi(WebsocketClient):
""""""
def __init__(self, gateway):
""""""
super().__init__(gateway)
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.req_id = 0
self.ticks = {}
def connect(self, key: str, secret: str, proxy_host: str, proxy_port: int):
def connect(self, proxy_host: str, proxy_port: int):
""""""
super().connect(key, secret, WEBSOCKET_DATA_HOST, proxy_host, proxy_port)
self.gateway.write_log("行情Websocket API连接成功")
self.proxy_host = proxy_host
self.proxy_port = proxy_port
def on_connected(self):
""""""
pass
self.gateway.write_log("行情Websocket API连接刷新")
def subscribe(self, req: SubscribeRequest):
""""""
symbol = req.symbol
print("============BinanceDataWebsocketApi.subscribe===========")
print("symbol" + symbol)
# Create tick data buffer
if req.symbol not in symbol_name_map:
self.gateway.write_log(f"找不到该合约代码{req.symbol}")
return
# Create tick buf data
tick = TickData(
symbol=symbol,
name=symbol_name_map.get(symbol, ""),
symbol=req.symbol,
name=symbol_name_map.get(req.symbol, ""),
exchange=Exchange.BINANCE,
datetime=datetime.now(),
gateway_name=self.gateway_name,
)
self.ticks[symbol] = tick
self.ticks[req.symbol.lower()] = tick
# Subscribe to market depth update
self.req_id += 1
req = {
"sub": f"market.{symbol}.depth.step0",
"id": str(self.req_id)
}
self.send_packet(req)
# Close previous connection
if self._active:
self.stop()
self.join()
# Subscribe to market detail update
self.req_id += 1
req = {
"sub": f"market.{symbol}.detail",
"id": str(self.req_id)
}
self.send_packet(req)
# Create new connection
channels = []
for ws_symbol in self.ticks.keys():
channels.append(ws_symbol + "@ticker")
channels.append(ws_symbol + "@depth5")
def on_data(self, packet): # type: (dict)->None
url = WEBSOCKET_DATA_HOST + "/".join(channels)
self.init(url, self.proxy_host, self.proxy_port)
self.start()
def on_packet(self, packet):
""""""
print("===================on_data=====================")
print(packet)
stream = packet["stream"]
data = packet["data"]
channel = packet.get("ch", None)
if channel:
if "depth.step" in channel:
self.on_market_depth(packet)
elif "detail" in channel:
self.on_market_detail(packet)
elif "err-code" in packet:
code = packet["err-code"]
msg = packet["err-msg"]
self.gateway.write_log(f"错误代码:{code}, 错误信息:{msg}")
def on_market_depth(self, data):
"""行情深度推送 """
symbol = data["ch"].split(".")[1]
symbol, channel = stream.split("@")
tick = self.ticks[symbol]
tick.datetime = datetime.fromtimestamp(data["ts"] / 1000)
bids = data["tick"]["bids"]
for n in range(5):
price, volume = bids[n]
tick.__setattr__("bid_price_" + str(n + 1), float(price))
tick.__setattr__("bid_volume_" + str(n + 1), float(volume))
if channel == "ticker":
tick.volume = float(data['v'])
tick.open_price = float(data['o'])
tick.high_price = float(data['h'])
tick.low_price = float(data['l'])
tick.last_price = float(data['c'])
tick.datetime = datetime.fromtimestamp(float(data['E']) / 1000)
else:
bids = data["bids"]
for n in range(5):
price, volume = bids[n]
tick.__setattr__("bid_price_" + str(n + 1), float(price))
tick.__setattr__("bid_volume_" + str(n + 1), float(volume))
asks = data["tick"]["asks"]
for n in range(5):
price, volume = asks[n]
tick.__setattr__("ask_price_" + str(n + 1), float(price))
tick.__setattr__("ask_volume_" + str(n + 1), float(volume))
asks = data["asks"]
for n in range(5):
price, volume = asks[n]
tick.__setattr__("ask_price_" + str(n + 1), float(price))
tick.__setattr__("ask_volume_" + str(n + 1), float(volume))
if tick.last_price:
self.gateway.on_tick(copy(tick))
def on_market_detail(self, data):
"""市场细节推送"""
symbol = data["ch"].split(".")[1]
tick = self.ticks[symbol]
tick.datetime = datetime.fromtimestamp(data["ts"] / 1000)
tick_data = data["tick"]
tick.open_price = float(tick_data["open"])
tick.high_price = float(tick_data["high"])
tick.low_price = float(tick_data["low"])
tick.last_price = float(tick_data["close"])
tick.volume = float(tick_data["vol"])
if tick.bid_price_1:
self.gateway.on_tick(copy(tick))

View File

@ -233,6 +233,7 @@ class BitmexRestApi(RestClient):
self.gateway.write_log("REST API启动成功")
def _new_order_id(self):
""""""
with self.order_count_lock:
self.order_count += 1
return self.order_count