Merge pull request #1821 from vnpy/binance

Binance
This commit is contained in:
vn.py 2019-06-12 13:36:24 +08:00 committed by GitHub
commit 01e7a54f32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 689 additions and 6 deletions

View File

@ -3,11 +3,12 @@ from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy.gateway.binance import BinanceGateway
# from vnpy.gateway.bitmex import BitmexGateway
# from vnpy.gateway.futu import FutuGateway
# from vnpy.gateway.ib import IbGateway
# from vnpy.gateway.ctp import CtpGateway
# # from vnpy.gateway.ctptest import CtptestGateway
# from vnpy.gateway.ctptest import CtptestGateway
# from vnpy.gateway.femas import FemasGateway
# from vnpy.gateway.tiger import TigerGateway
# from vnpy.gateway.oes import OesGateway
@ -17,8 +18,7 @@ from vnpy.trader.ui import MainWindow, create_qapp
# from vnpy.gateway.onetoken import OnetokenGateway
# from vnpy.gateway.okexf import OkexfGateway
# from vnpy.gateway.xtp import XtpGateway
from vnpy.gateway.hbdm import HbdmGateway
# from vnpy.gateway.tap import TapGateway
# from vnpy.gateway.hbdm import HbdmGateway
# from vnpy.app.cta_strategy import CtaStrategyApp
# from vnpy.app.csv_loader import CsvLoaderApp
@ -35,9 +35,11 @@ def main():
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(BinanceGateway)
# main_engine.add_gateway(XtpGateway)
# main_engine.add_gateway(CtpGateway)
# # main_engine.add_gateway(CtptestGateway)
# main_engine.add_gateway(CtptestGateway)
# main_engine.add_gateway(FemasGateway)
# main_engine.add_gateway(IbGateway)
# main_engine.add_gateway(FutuGateway)
@ -49,8 +51,7 @@ def main():
# main_engine.add_gateway(BitfinexGateway)
# main_engine.add_gateway(OnetokenGateway)
# main_engine.add_gateway(OkexfGateway)
main_engine.add_gateway(HbdmGateway)
# main_engine.add_gateway(TapGateway)
# main_engine.add_gateway(HbdmGateway)
# main_engine.add_app(CtaStrategyApp)
# main_engine.add_app(CtaBacktesterApp)

View File

@ -0,0 +1 @@
from .binance_gateway import BinanceGateway

View File

@ -0,0 +1,679 @@
"""
Gateway for Binance Crypto Exchange.
"""
import urllib
import hashlib
import hmac
import time
from copy import copy
from datetime import datetime
from enum import Enum
from threading import Lock
from vnpy.api.rest import RestClient, Request
from vnpy.api.websocket import WebsocketClient
from vnpy.trader.constant import (
Direction,
Exchange,
Product,
Status,
OrderType
)
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (
TickData,
OrderData,
TradeData,
AccountData,
ContractData,
OrderRequest,
CancelRequest,
SubscribeRequest
)
from vnpy.trader.event import EVENT_TIMER
from vnpy.event import Event
REST_HOST = "https://www.binance.com"
WEBSOCKET_TRADE_HOST = "wss://stream.binance.com:9443/ws/"
WEBSOCKET_DATA_HOST = "wss://stream.binance.com:9443/stream?streams="
STATUS_BINANCE2VT = {
"NEW": Status.NOTTRADED,
"PARTIALLY_FILLED": Status.PARTTRADED,
"FILLED": Status.ALLTRADED,
"CANCELED": Status.CANCELLED,
"REJECTED": Status.REJECTED
}
ORDERTYPE_VT2BINANCE = {
OrderType.LIMIT: "LIMIT",
OrderType.MARKET: "MARKET"
}
ORDERTYPE_BINANCE2VT = {v: k for k, v in ORDERTYPE_VT2BINANCE.items()}
DIRECTION_VT2BINANCE = {
Direction.LONG: "BUY",
Direction.SHORT: "SELL"
}
DIRECTION_BINANCE2VT = {v: k for k, v in DIRECTION_VT2BINANCE.items()}
class Security(Enum):
NONE = 0
SIGNED = 1
API_KEY = 2
symbol_name_map = {}
class BinanceGateway(BaseGateway):
"""
VN Trader Gateway for Binance connection.
"""
default_setting = {
"key": "",
"secret": "",
"session_number": 3,
"proxy_host": "",
"proxy_port": 0,
}
exchanges = [Exchange.BINANCE]
def __init__(self, event_engine):
"""Constructor"""
super().__init__(event_engine, "BINANCE")
self.trade_ws_api = BinanceTradeWebsocketApi(self)
self.market_ws_api = BinanceDataWebsocketApi(self)
self.rest_api = BinanceRestApi(self)
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def connect(self, setting: dict):
""""""
key = setting["key"]
secret = setting["secret"]
session_number = setting["session_number"]
proxy_host = setting["proxy_host"]
proxy_port = setting["proxy_port"]
self.rest_api.connect(key, secret, session_number,
proxy_host, proxy_port)
self.market_ws_api.connect(proxy_host, proxy_port)
def subscribe(self, req: SubscribeRequest):
""""""
self.market_ws_api.subscribe(req)
def send_order(self, req: OrderRequest):
""""""
return self.rest_api.send_order(req)
def cancel_order(self, req: CancelRequest):
""""""
self.rest_api.cancel_order(req)
def query_account(self):
""""""
pass
def query_position(self):
""""""
pass
def close(self):
""""""
self.rest_api.stop()
self.trade_ws_api.stop()
self.market_ws_api.stop()
def process_timer_event(self, event: Event):
""""""
self.rest_api.keep_user_stream()
class BinanceRestApi(RestClient):
"""
BINANCE REST API
"""
def __init__(self, gateway: BinanceGateway):
""""""
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.trade_ws_api = self.gateway.trade_ws_api
self.key = ""
self.secret = ""
self.user_stream_key = ""
self.keep_alive_count = 0
self.recv_window = 5000
self.time_offset = 0
self.order_count = 1_000_000
self.order_count_lock = Lock()
self.connect_time = 0
def sign(self, request):
"""
Generate BINANCE signature.
"""
if request.params:
path = request.path + "?" + urllib.parse.urlencode(request.params)
else:
request.params = dict()
path = request.path
security = request.data["security"]
if security == Security.SIGNED:
timestamp = int(time.time() * 1000)
if self.time_offset > 0:
timestamp -= abs(self.time_offset)
elif self.time_offset < 0:
timestamp += abs(self.time_offset)
request.params["timestamp"] = timestamp
# request.params["recv_window"] = self.recv_window
query = urllib.parse.urlencode(sorted(request.params.items()))
signature = hmac.new(self.secret, query.encode(
"utf-8"), hashlib.sha256).hexdigest()
query += "&signature={}".format(signature)
path = request.path + "?" + query
request.path = path
request.params = {}
request.data = {}
# Add headers
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
"X-MBX-APIKEY": self.key
}
if security == Security.SIGNED or security == Security.API_KEY:
request.headers = headers
return request
def connect(
self,
key: str,
secret: str,
session_number: int,
proxy_host: str,
proxy_port: int
):
"""
Initialize connection to REST server.
"""
self.key = key
self.secret = secret.encode()
self.proxy_port = proxy_port
self.proxy_host = proxy_host
self.connect_time = (
int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count
)
self.init(REST_HOST, proxy_host, proxy_port)
self.start(session_number)
self.gateway.write_log("REST API启动成功")
self.query_time()
self.query_account()
self.query_order()
self.query_contract()
self.start_user_stream()
def query_time(self):
""""""
data = {
"security": Security.NONE
}
path = "/api/v1/time"
return self.add_request(
"GET",
path,
callback=self.on_query_time,
data=data
)
def query_account(self):
""""""
data = {"security": Security.SIGNED}
self.add_request(
method="GET",
path="/api/v3/account",
callback=self.on_query_account,
data=data
)
def query_order(self):
""""""
data = {"security": Security.SIGNED}
self.add_request(
method="GET",
path="/api/v3/openOrders",
callback=self.on_query_order,
data=data
)
def query_contract(self):
""""""
data = {
"security": Security.NONE
}
self.add_request(
method="GET",
path="/api/v1/exchangeInfo",
callback=self.on_query_contract,
data=data
)
def _new_order_id(self):
""""""
with self.order_count_lock:
self.order_count += 1
return self.order_count
def send_order(self, req: OrderRequest):
""""""
orderid = str(self.connect_time + self._new_order_id())
order = req.create_order_data(
orderid,
self.gateway_name
)
self.gateway.on_order(order)
data = {
"security": Security.SIGNED
}
params = {
"symbol": req.symbol,
"timeInForce": "GTC",
"side": DIRECTION_VT2BINANCE[req.direction],
"type": ORDERTYPE_VT2BINANCE[req.type],
"price": str(req.price),
"quantity": str(req.volume),
"newClientOrderId": orderid,
"newOrderRespType": "ACK"
}
self.add_request(
method="POST",
path="/api/v3/order",
callback=self.on_send_order,
data=data,
params=params,
extra=order,
on_error=self.on_send_order_error,
on_failed=self.on_send_order_failed
)
return order.vt_orderid
def cancel_order(self, req: CancelRequest):
""""""
data = {
"security": Security.SIGNED
}
params = {
"symbol": req.symbol,
"origClientOrderId": req.orderid
}
self.add_request(
method="DELETE",
path="/api/v3/order",
callback=self.on_cancel_order,
params=params,
data=data,
extra=req
)
def start_user_stream(self):
""""""
data = {
"security": Security.API_KEY
}
self.add_request(
method="POST",
path="/api/v1/userDataStream",
callback=self.on_start_user_stream,
data=data
)
def keep_user_stream(self):
""""""
self.keep_alive_count += 1
if self.keep_alive_count < 1800:
return
data = {
"security": Security.SIGNED
}
params = {
"listenKey": self.user_stream_key
}
self.add_request(
method="PUT",
path="/api/v1/userDataStream",
callback=self.on_keep_user_stream,
params=params,
data=data
)
def on_query_time(self, data, request):
""""""
local_time = int(time.time() * 1000)
server_time = int(data["serverTime"])
self.time_offset = local_time - server_time
def on_query_account(self, data, request):
""""""
for account_data in data["balances"]:
account = AccountData(
accountid=account_data["asset"],
balance=float(account_data["free"]) + float(account_data["locked"]),
frozen=float(account_data["locked"]),
gateway_name=self.gateway_name
)
if account.balance:
self.gateway.on_account(account)
self.gateway.write_log("账户资金查询成功")
def on_query_order(self, data, request):
""""""
for d in data:
dt = datetime.fromtimestamp(d["time"] / 1000)
time = dt.strftime("%Y-%m-%d %H:%M:%S")
order = OrderData(
orderid=d["clientOrderId"],
symbol=d["symbol"],
exchange=Exchange.BINANCE,
price=float(d["price"]),
volume=float(d["origQty"]),
type=ORDERTYPE_BINANCE2VT[d["type"]],
direction=DIRECTION_BINANCE2VT[d["side"]],
traded=float(d["executedQty"]),
status=STATUS_BINANCE2VT.get(d["status"], None),
time=time,
gateway_name=self.gateway_name,
)
self.gateway.on_order(order)
self.gateway.write_log("委托信息查询成功")
def on_query_contract(self, data, request):
""""""
for d in data["symbols"]:
base_currency = d["baseAsset"]
quote_currency = d["quoteAsset"]
name = f"{base_currency.upper()}/{quote_currency.upper()}"
pricetick = 0
min_volume = 0
for f in d["filters"]:
if f["filterType"] == "PRICE_FILTER":
pricetick = f["tickSize"]
elif f["filterType"] == "LOT_SIZE":
min_volume = f["stepSize"]
contract = ContractData(
symbol=d["symbol"],
exchange=Exchange.BINANCE,
name=name,
pricetick=pricetick,
size=1,
min_volume=min_volume,
product=Product.SPOT,
gateway_name=self.gateway_name,
)
self.gateway.on_contract(contract)
symbol_name_map[contract.symbol] = contract.name
self.gateway.write_log("合约信息查询成功")
def on_send_order(self, data, request):
""""""
pass
def on_send_order_failed(self, status_code: str, request: Request):
"""
Callback when sending order failed on server.
"""
order = request.extra
order.status = Status.REJECTED
self.gateway.on_order(order)
msg = f"委托失败,状态码:{status_code},信息:{request.response.text}"
self.gateway.write_log(msg)
def on_send_order_error(
self, exception_type: type, exception_value: Exception, tb, request: Request
):
"""
Callback when sending order caused exception.
"""
order = request.extra
order.status = Status.REJECTED
self.gateway.on_order(order)
# Record exception if not ConnectionError
if not issubclass(exception_type, ConnectionError):
self.on_error(exception_type, exception_value, tb, request)
def on_cancel_order(self, data, request):
""""""
pass
def on_start_user_stream(self, data, request):
""""""
self.user_stream_key = data["listenKey"]
self.keep_alive_count = 0
url = WEBSOCKET_TRADE_HOST + self.user_stream_key
self.trade_ws_api.connect(url, self.proxy_host, self.proxy_port)
def on_keep_user_stream(self, data, request):
""""""
pass
class BinanceTradeWebsocketApi(WebsocketClient):
""""""
def __init__(self, gateway):
""""""
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
def connect(self, url, proxy_host, proxy_port):
""""""
self.init(url, proxy_host, proxy_port)
self.start()
def on_connected(self):
""""""
self.gateway.write_log("交易Websocket API连接成功")
def on_packet(self, packet: dict): # type: (dict)->None
""""""
if packet["e"] == "outboundAccountInfo":
self.on_account(packet)
else:
self.on_order(packet)
def on_account(self, packet):
""""""
for d in packet["B"]:
account = AccountData(
accountid=d["a"],
balance=float(d["f"]) + float(d["l"]),
frozen=float(d["l"]),
gateway_name=self.gateway_name
)
if account.balance:
self.gateway.on_account(account)
def on_order(self, packet: dict):
""""""
dt = datetime.fromtimestamp(packet["O"] / 1000)
time = dt.strftime("%Y-%m-%d %H:%M:%S")
if packet["C"] == "null":
orderid = packet["c"]
else:
orderid = packet["C"]
order = OrderData(
symbol=packet["s"],
exchange=Exchange.BINANCE,
orderid=orderid,
type=ORDERTYPE_BINANCE2VT[packet["o"]],
direction=DIRECTION_BINANCE2VT[packet["S"]],
price=float(packet["p"]),
volume=float(packet["q"]),
traded=float(packet["z"]),
status=STATUS_BINANCE2VT[packet["X"]],
time=time,
gateway_name=self.gateway_name
)
self.gateway.on_order(order)
# Push trade event
trade_volume = float(packet["l"])
if not trade_volume:
return
trade_dt = datetime.fromtimestamp(packet["T"] / 1000)
trade_time = trade_dt.strftime("%Y-%m-%d %H:%M:%S")
trade = TradeData(
symbol=order.symbol,
exchange=order.exchange,
orderid=order.orderid,
tradeid=packet["t"],
direction=order.direction,
price=float(packet["L"]),
volume=trade_volume,
time=trade_time,
gateway_name=self.gateway_name,
)
self.gateway.on_trade(trade)
class BinanceDataWebsocketApi(WebsocketClient):
""""""
def __init__(self, gateway):
""""""
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.ticks = {}
def connect(self, proxy_host: str, proxy_port: int):
""""""
self.proxy_host = proxy_host
self.proxy_port = proxy_port
def on_connected(self):
""""""
self.gateway.write_log("行情Websocket API连接刷新")
def subscribe(self, req: SubscribeRequest):
""""""
if req.symbol not in symbol_name_map:
self.gateway.write_log(f"找不到该合约代码{req.symbol}")
return
# Create tick buf data
tick = TickData(
symbol=req.symbol,
name=symbol_name_map.get(req.symbol, ""),
exchange=Exchange.BINANCE,
datetime=datetime.now(),
gateway_name=self.gateway_name,
)
self.ticks[req.symbol.lower()] = tick
# Close previous connection
if self._active:
self.stop()
self.join()
# Create new connection
channels = []
for ws_symbol in self.ticks.keys():
channels.append(ws_symbol + "@ticker")
channels.append(ws_symbol + "@depth5")
url = WEBSOCKET_DATA_HOST + "/".join(channels)
self.init(url, self.proxy_host, self.proxy_port)
self.start()
def on_packet(self, packet):
""""""
stream = packet["stream"]
data = packet["data"]
symbol, channel = stream.split("@")
tick = self.ticks[symbol]
if channel == "ticker":
tick.volume = float(data['v'])
tick.open_price = float(data['o'])
tick.high_price = float(data['h'])
tick.low_price = float(data['l'])
tick.last_price = float(data['c'])
tick.datetime = datetime.fromtimestamp(float(data['E']) / 1000)
else:
bids = data["bids"]
for n in range(5):
price, volume = bids[n]
tick.__setattr__("bid_price_" + str(n + 1), float(price))
tick.__setattr__("bid_volume_" + str(n + 1), float(volume))
asks = data["asks"]
for n in range(5):
price, volume = asks[n]
tick.__setattr__("ask_price_" + str(n + 1), float(price))
tick.__setattr__("ask_volume_" + str(n + 1), float(volume))
if tick.last_price:
self.gateway.on_tick(copy(tick))

View File

@ -233,6 +233,7 @@ class BitmexRestApi(RestClient):
self.gateway.write_log("REST API启动成功")
def _new_order_id(self):
""""""
with self.order_count_lock:
self.order_count += 1
return self.order_count

View File

@ -114,6 +114,7 @@ class Exchange(Enum):
OKEX = "OKEX"
HUOBI = "HUOBI"
BITFINEX = "BITFINEX"
BINANCE = "BINANCE"
class Currency(Enum):