add binance api

This commit is contained in:
msincenselee 2018-05-29 11:17:16 +08:00
parent 2335e4d47f
commit 4c83eca3fe
13 changed files with 2799 additions and 0 deletions

View File

@ -0,0 +1,6 @@
# encoding: UTF-8
from vnpy.api.binance.vnbinance import BinanceSpotApi
from vnpy.api.binance.client import Client
from vnpy.api.binance.websockets import BinanceSocketManager
from vnpy.api.binance.exceptions import BinanceAPIException, BinanceRequestException, BinanceWithdrawException

1443
vnpy/api/binance/client.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,80 @@
# encoding: UTF-8
class BinanceAPIException(Exception):
LISTENKEY_NOT_EXIST = '-1125'
def __init__(self, response):
json_res = response.json()
self.status_code = response.status_code
self.response = response
self.code = json_res['code']
self.message = json_res['msg']
self.request = getattr(response, 'request', None)
def __str__(self): # pragma: no cover
return 'APIError(code=%s): %s' % (self.code, self.message)
class BinanceRequestException(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
return 'BinanceRequestException: %s' % self.message
class BinanceOrderException(Exception):
def __init__(self, code, message):
self.code = code
self.message = message
def __str__(self):
return 'BinanceOrderException(code=%s): %s' % (self.code, self.message)
class BinanceOrderMinAmountException(BinanceOrderException):
def __init__(self, value):
message = "Amount must be a multiple of %s" % value
super(BinanceOrderMinAmountException, self).__init__(-1013, message)
class BinanceOrderMinPriceException(BinanceOrderException):
def __init__(self, value):
message = "Price must be at least %s" % value
super(BinanceOrderMinPriceException, self).__init__(-1013, message)
class BinanceOrderMinTotalException(BinanceOrderException):
def __init__(self, value):
message = "Total must be at least %s" % value
super(BinanceOrderMinTotalException, self).__init__(-1013, message)
class BinanceOrderUnknownSymbolException(BinanceOrderException):
def __init__(self, value):
message = "Unknown symbol %s" % value
super(BinanceOrderUnknownSymbolException, self).__init__(-1013, message)
class BinanceOrderInactiveSymbolException(BinanceOrderException):
def __init__(self, value):
message = "Attempting to trade an inactive symbol %s" % value
super(BinanceOrderInactiveSymbolException, self).__init__(-1013, message)
class BinanceWithdrawException(Exception):
def __init__(self, message):
if message == u'参数异常':
message = 'Withdraw to this address through the website first'
self.message = message
def __str__(self):
return 'BinanceWithdrawException: %s' % self.message

150
vnpy/api/binance/readme.txt Normal file
View File

@ -0,0 +1,150 @@
参考了 https://github.com/wudian/marketMaker 的代码
表示感谢!
以及 https://github.com/sammchardy/python-binance 的代码
API 文档
https://github.com/binance-exchange/binance-official-api-docs/blob/master/rest-api.md
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md
------------------------------------------------------------------------
准备采用的方案
1、Individual Symbol Ticker Streams
24hr Ticker statistics for a single symbol pushed every second
Stream Name: <symbol>@ticker
Payload:
{
"e": "24hrTicker", // Event type
"E": 123456789, // Event time
"s": "BNBBTC", // Symbol
"p": "0.0015", // Price change
"P": "250.00", // Price change percent
"w": "0.0018", // Weighted average price
"x": "0.0009", // Previous day's close price
"c": "0.0025", // Current day's close price
"Q": "10", // Close trade's quantity
"b": "0.0024", // Best bid price
"B": "10", // Bid bid quantity
"a": "0.0026", // Best ask price
"A": "100", // Best ask quantity
"o": "0.0010", // Open price
"h": "0.0025", // High price
"l": "0.0010", // Low price
"v": "10000", // Total traded base asset volume
"q": "18", // Total traded quote asset volume
"O": 0, // Statistics open time
"C": 86400000, // Statistics close time
"F": 0, // First trade ID
"L": 18150, // Last trade Id
"n": 18151 // Total number of trades
}
2、All Market Tickers Stream
24hr Ticker statistics for all symbols in an array pushed every second
Stream Name: !ticker@arr
Payload:
[
{
// Same as <symbol>@ticker payload
}
]
3、
Account Update
Account state is updated with the outboundAccountInfo event.
Payload:
{
"e": "outboundAccountInfo", // Event type
"E": 1499405658849, // Event time
"m": 0, // Maker commission rate (bips)
"t": 0, // Taker commission rate (bips)
"b": 0, // Buyer commission rate (bips)
"s": 0, // Seller commission rate (bips)
"T": true, // Can trade?
"W": true, // Can withdraw?
"D": true, // Can deposit?
"u": 1499405658848, // Time of last account update
"B": [ // Balances array
{
"a": "LTC", // Asset
"f": "17366.18538083", // Free amount
"l": "0.00000000" // Locked amount
},
{
"a": "BTC",
"f": "10537.85314051",
"l": "2.19464093"
},
{
"a": "ETH",
"f": "17902.35190619",
"l": "0.00000000"
},
{
"a": "BNC",
"f": "1114503.29769312",
"l": "0.00000000"
},
{
"a": "NEO",
"f": "0.00000000",
"l": "0.00000000"
}
]
}
Order Update
Orders are updated with the executionReport event. Check the API documentation and below for relevant enum definitions.
Payload:
{
"e": "executionReport", // Event type
"E": 1499405658658, // Event time
"s": "ETHBTC", // Symbol
"c": "mUvoqJxFIILMdfAW5iGSOW", // Client order ID
"S": "BUY", // Side
"o": "LIMIT", // Order type
"f": "GTC", // Time in force
"q": "1.00000000", // Order quantity
"p": "0.10264410", // Order price
"P": "0.00000000", // Stop price
"F": "0.00000000", // Iceberg quantity
"g": -1, // Ignore
"C": "null", // Original client order ID; This is the ID of the order being canceled
"x": "NEW", // Current execution type
"X": "NEW", // Current order status
"r": "NONE", // Order reject reason; will be an error code.
"i": 4293153, // Order ID
"l": "0.00000000", // Last executed quantity
"z": "0.00000000", // Cumulative filled quantity
"L": "0.00000000", // Last executed price
"n": "0", // Commission amount
"N": null, // Commission asset
"T": 1499405658657, // Transaction time
"t": -1, // Trade ID
"I": 8641984, // Ignore
"w": true, // Is the order working? Stops will have
"m": false, // Is this trade the maker side?
"M": false // Ignore
}
Execution types:
NEW
CANCELED
REPLACED (currently unused)
REJECTED
TRADE
EXPIRED

1
vnpy/api/binance/run.bat Normal file
View File

@ -0,0 +1 @@
python test4.py

View File

@ -0,0 +1 @@
python test5.py

26
vnpy/api/binance/test.py Normal file
View File

@ -0,0 +1,26 @@
# encoding: UTF-8
from vnpy.vnbinance import *
# api = BINANCE_Sub_Spot_Api()
# api.connect_Subpot(apiKey , secretKey)
# api.subscribeSpotTicker("BNBBTC".lower())
url = "wss://stream.binance.com:9443/ws/bnbbtc@kline_1m"
# print BINANCE_STREAM_URL
factory = BinanceClientFactory( url )
factory.protocol = BinanceClientProtocol
factory.callback = process_message
context_factory = ssl.ClientContextFactory()
connectWS(factory, context_factory)
reactor.run( installSignalHandlers=False )
input()
# bm = BinanceSocketManager(None)
# #bm.start_aggtrade_socket(symbol='BNBBTC' , callback=process_message)
# bm.start_symbol_ticker_socket("BNBBTC" , callback=process_message)
# bm.start()

28
vnpy/api/binance/test2.py Normal file
View File

@ -0,0 +1,28 @@
# encoding: UTF-8
import hashlib
import zlib
import json
from time import sleep
from threading import Thread
from autobahn.twisted.websocket import WebSocketClientFactory, \
WebSocketClientProtocol, \
connectWS
from twisted.internet import reactor, ssl
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.internet.error import ReactorAlreadyRunning
import threading
import websocket
def onMessage(ws, data):
print(ws ,data)
BINANCE_STREAM_URL = 'wss://stream.binance.com:9443/ws/bnbbtc@ticker'
ss = websocket.WebSocketApp(BINANCE_STREAM_URL,
on_message=onMessage)
reactor.run(installSignalHandlers=False)
input()

14
vnpy/api/binance/test3.py Normal file
View File

@ -0,0 +1,14 @@
# encoding: UTF-8
from vnbinance import *
apiKey = 'xxxx'
secretKey = 'xxxx'
aa = BinanceSpotApi()
aa.connect_Subpot( apiKey , secretKey)
# aa.subcribeSymbol( "bnb_btc")
input()

46
vnpy/api/binance/test4.py Normal file
View File

@ -0,0 +1,46 @@
# encoding: UTF-8
from vnpy.api.binance.vnbinance import *
apiKey = "xxx"
secretKey = "xxxx"
ap = BinanceSpotApi()
ap.connect_Subpot( apiKey , secretKey)
# ap.subscribeSpotDepth( "BNBBTC" )
# ap.spotListAllOrders("ETCBTC")
# id 5509642
# ap.spotListOpenOrders()
# ap.spotAccountInfo()
# ap.subscribeAllTicker()
# print "ap.spotExchangeInfo()"
# ap.spotExchangeInfo()
# print ap.spotCancelOrder( "ETCBTC" , "5509642" )
# ap.subcribeSymbol( "bnb_btc")
# ap.spotListAllOrders("ETCBTC")
print ("ap.subscribeSpotTicker")
# ap.subscribeSpotTicker("bnb_btc")
price_f = "0.00000325"
# ap.spotTrade( symbol_pair = "ETCBTC" , type_ = "sell" , price = 0.1 , amount = 0.01)
ap.spotTrade( symbol_pair = "TNBBTC" , type_ = "buy" , price = price_f , amount = 400)
# ap.spotListAllOrdesrs("TNBBTC")
# ap.spotCancelOrder( "TNBBTC" , "5047104" )
# ap.spotListAllOrders("ETCBTC")
#input()

74
vnpy/api/binance/test5.py Normal file
View File

@ -0,0 +1,74 @@
from vnpy.api.binance.client import Client
apiKey = "xxx"
secretKey = "xxxx"
client = Client(apiKey, secretKey)
# # get market depth
# depth = client.get_order_book(symbol='BNBBTC')
# # place a test market buy order, to place an actual order use the create_order function
# order = client.create_test_order(
# symbol='BNBBTC',
# side=Client.SIDE_BUY,
# type=Client.ORDER_TYPE_MARKET,
# quantity=100)
# # get all symbol prices
# prices = client.get_all_tickers()
# # withdraw 100 ETH
# # check docs for assumptions around withdrawals
# from binance.exceptions import BinanceAPIException, BinanceWithdrawException
# try:
# result = client.withdraw(
# asset='ETH',
# address='<eth_address>',
# amount=100)
# except BinanceAPIException as e:
# print(e)
# except BinanceWithdrawException as e:
# print(e)
# else:
# print("Success")
# # fetch list of withdrawals
# withdraws = client.get_withdraw_history()
# # fetch list of ETH withdrawals
# eth_withdraws = client.get_withdraw_history(asset='ETH')
# # get a deposit address for BTC
# address = client.get_deposit_address(asset='BTC')
# # start aggregated trade websocket for BNBBTC
def process_message(msg):
print("message type: {}".format(msg['e']))
print(msg)
# do something
#from vnpy.api.binance.websockets import BinanceSocketManager
#bm = BinanceSocketManager(client)
#bm.start_aggtrade_socket('BNBBTC', process_message)
#bm.start()
# get historical kline data from any date range
from datetime import datetime,timedelta
# fetch 1 minute klines for the last day up until now
start_time = datetime.now()
start_time = start_time - timedelta(hours=1)
t =int(start_time.timestamp()*1000)
# startTime =t
klines = client.get_klines(symbol="BTCUSDT",interval= Client.KLINE_INTERVAL_1MINUTE)
for kl in klines:
print(datetime.fromtimestamp(kl[0]/1e3))
# fetch 30 minute klines for the last month of 2017
# klines = client.get_historical_klines("ETHBTC", Client.KLINE_INTERVAL_30MINUTE, "1 Dec, 2017", "1 Jan, 2018")
# fetch weekly klines since it listed
# klines = client.get_historical_klines("NEOBTC", KLINE_INTERVAL_1WEEK, "1 Jan, 2017")

View File

@ -0,0 +1,470 @@
# encoding: UTF-8
import sys
import hashlib
import zlib
import json
from time import sleep
from threading import Thread
from autobahn.twisted.websocket import WebSocketClientFactory, \
WebSocketClientProtocol, \
connectWS
from twisted.internet import reactor, ssl
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.internet.error import ReactorAlreadyRunning
import threading
import traceback
from vnpy.api.binance.exceptions import BinanceAPIException, BinanceRequestException, BinanceWithdrawException
from vnpy.api.binance.websockets import BinanceSocketManager
from vnpy.api.binance.client import Client
import urllib, requests
import urllib.parse
# 请求指令
FUNCTIONCODE_GET_ACCOUNT_BINANCE = 'get_account'
# FUNCTIONCODE_GET_ORDER = 'get_order' # 不准备实现
FUNCTIONCODE_GET_OPEN_ORDERS = 'get_open_orders' # 获得所有的 open orders
FUNCTIONCODE_GET_ALL_ORDERS = "get_all_orders" # 获得所有的 orders
FUNCTIONCODE_BUY_ORDER_BINANCE = 'buy'
FUNCTIONCODE_SELL_ORDER_BINANCE = 'sell'
FUNCTIONCODE_CANCEL_ORDER_BINANCE = 'cancel_order'
FUNCTIONCODE_GET_EXCHANGE_INFO = "get_exchange_info"
# 印射关系
binance_exchanges_dict = {
"bcc" : "bch",
"bccusdt": "bchusdt",
"bccbtc" : "bchbtc",
"bccbnb" : "bchbnb",
"bcceth" : "bcheth",
"BCC" : "BCH",
"BCCUSDT": "BCHUSDT",
"BCCBTC" : "BCHBTC",
"BCCBNB" : "BCHBNB",
"BCCETH" : "BCHETH"
}
exchanges_biannce_dict = {v: k for k, v in binance_exchanges_dict.items()}
'''
币安的某些symbol_pair 其他交易所统一, 比如 bcc 换成 bch
'''
def symbolFromBinanceToOtherExchanges(symbol_pair):
global binance_exchanges_dict
if symbol_pair in binance_exchanges_dict.keys():
return binance_exchanges_dict[symbol_pair]
return symbol_pair
'''
其他交易所的symbol 印射到 币安的实际symbol
'''
def symbolFromOtherExchangesToBinance(symbol_pair):
global exchanges_biannce_dict
if symbol_pair in exchanges_biannce_dict.keys():
return exchanges_biannce_dict[symbol_pair]
return symbol_pair
class BinanceSpotApi(object):
"""基于Websocket的API对象"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.apiKey = '' # 用户名
self.secretKey = '' # 密码
self.client = None # 客户端管理器
self.bm = None # sockets 管理器
self.reqID = 0 # 请求编号
# self.reqQueue = Queue() # 请求队列
self.reqQueue = [] # 请求的队列
self.reqThread = Thread(target=self.processQueue) # 请求处理线程
self.active = False # API工作状态
self.DEBUG = True
#----------------------------------------------------------------------
def connect_Subpot(self, apiKey , secretKey ):
self.apiKey = apiKey
self.secretKey = secretKey
self.client = Client( apiKey , secretKey)
self.bm = BinanceSocketManager(self.client)
self.bm.start_user_socket( self.onMessage)
self.bm.start()
self.active = True
self.reqThread.start()
#----------------------------------------------------------------------
def processQueue(self):
"""处理请求队列中的请求"""
while self.active:
try:
#req = self.reqQueue.get(block=True, timeout=0.001) # 获取请求的阻塞为一秒
if len(self.reqQueue) == 0:
continue
(Type , req) = self.reqQueue[0]
callback = req['callback']
reqID = req['reqID']
try:
data = self.processRequest(req)
# 请求成功
if data != None :
if self.DEBUG:
print(callback.__name__)
callback(data, req, reqID)
except Exception as ex:
print(u'processQueue exception:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr)
self.reqQueue.pop(0)
sleep(0.1)
except Exception as ex:
print(u'processQueue exception:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr)
#----------------------------------------------------------------------
def processRequest(self, req):
"""
处理各类请求
:param req:
:return:
"""
try:
method = req['method']
reqID = req["reqID"]
callback = req['callback']
data = None
# 请求账号信息
if method == FUNCTIONCODE_GET_ACCOUNT_BINANCE:
data = self.client.get_account()
# 替换 一些 symbol
for b_symbol_dic in data["balances"]:
b_symbol_dic["asset"] = symbolFromBinanceToOtherExchanges(b_symbol_dic["asset"])
self.onGetAccount(data , req , reqID)
# 请求委托单
elif method == FUNCTIONCODE_GET_OPEN_ORDERS:
kwargs = req["kwargs"]
symbol = kwargs["symbol"]
if symbol != None:
data = self.client.get_open_orders(symbol = symbol)
# update symbol
for dic in data:
dic["symbol"] = symbolFromBinanceToOtherExchanges( dic["symbol"] )
callback(data , req , reqID)
else:
data = self.client.get_open_orders()
# update symbol
for dic in data:
dic["symbol"] = symbolFromBinanceToOtherExchanges( dic["symbol"] )
callback(data , req , reqID)
# 请求所有委托单
elif method == FUNCTIONCODE_GET_ALL_ORDERS:
kwargs = req["kwargs"]
symbol = kwargs["symbol"]
#if '_' not in symbol and len(symbol) in [3,4]:
# return None
data = self.client.get_all_orders( symbol = symbol )
# update symbol
for dic in data:
dic["symbol"] = symbolFromBinanceToOtherExchanges( dic["symbol"] )
callback(data , req , reqID)
# 买入现货
elif method == FUNCTIONCODE_BUY_ORDER_BINANCE:
kwargs = req["kwargs"]
symbol = self.legalSymbolUpper(kwargs["symbol"])
quantity = float(kwargs["quantity"])
price = float(kwargs["price"])
price = str('%.8f' % float(price))
data = self.client.order_limit_buy( symbol=symbol, price = price ,quantity=quantity)
# update symbol
data["symbol"] = symbolFromBinanceToOtherExchanges( data["symbol"] )
callback(data , req , reqID)
# 卖出现货
elif method == FUNCTIONCODE_SELL_ORDER_BINANCE:
kwargs = req["kwargs"]
symbol = self.legalSymbolUpper(kwargs["symbol"])
quantity = float(kwargs["quantity"])
price = float(kwargs["price"])
price = str('%.8f' % float(price))
data = self.client.order_limit_sell( symbol=symbol, price = price ,quantity=quantity)
# update symbol
data["symbol"] = symbolFromBinanceToOtherExchanges( data["symbol"] )
callback(data , req , reqID)
# 取消订单
elif method == FUNCTIONCODE_CANCEL_ORDER_BINANCE:
kwargs = req["kwargs"]
symbol = self.legalSymbolUpper(kwargs["symbol"])
orderId = int(kwargs["orderId"])
data = self.client.cancel_order( symbol = symbol , orderId = orderId )
# update symbol
data["symbol"] = symbolFromBinanceToOtherExchanges( data["symbol"] )
callback(data , req , reqID)
# 获取交易所信息
elif method == FUNCTIONCODE_GET_EXCHANGE_INFO:
data = self.client.get_exchange_info()
# update symbol
for symbol_dic in data["symbols"]:
symbol_dic["symbol"] = symbolFromBinanceToOtherExchanges( symbol_dic["symbol"] )
symbol_dic["baseAsset"] = symbolFromBinanceToOtherExchanges( symbol_dic["baseAsset"] )
callback(data , req , reqID)
except Exception as ex:
print(u'processQueue exception:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr)
# pass
#----------------------------------------------------------------------
def legalSymbolLower(self, symbol):
symbol = symbol.lower()
symbol = ''.join(symbol.split('_'))
return symbol
#----------------------------------------------------------------------
def legalSymbolUpper(self, symbol):
symbol = symbol.upper()
symbol = ''.join(symbol.split('_'))
return symbol
#----------------------------------------------------------------------
def exit(self):
"""退出"""
self.active = False
if self.bm != None:
self.bm.close()
if self.reqThread.isAlive():
self.reqThread.join()
#-----------------------------------------------------------------------
def sendTradingRequest(self, method , callback , kwargs = None,optional=None):
# 请求编号加1
self.reqID += 1
# 生成请求字典并放入队列中
req = {}
req['method'] = method
req['callback'] = callback
req['optional'] = optional
req['kwargs'] = kwargs
req['reqID'] = self.reqID
if method in [FUNCTIONCODE_GET_OPEN_ORDERS , FUNCTIONCODE_GET_ACCOUNT_BINANCE ]:
flag = False
for use_method ,r in self.reqQueue:
if use_method == method:
flag = True
break
if False == flag:
self.reqQueue.append( (method , req))
else:
self.reqQueue.append( (method , req))
# 返回请求编号
return self.reqID
#----------------------------------------------------------------------
def spotTrade(self, symbol_pair, type_, price, amount):
"""现货委托"""
symbol_pair = self.legalSymbolUpper(symbol_pair)
symbol_pair = symbolFromOtherExchangesToBinance(symbol_pair)
if type_ == "buy":
return self.sendTradingRequest( method = FUNCTIONCODE_BUY_ORDER_BINANCE , callback = self.onTradeOrder , kwargs = {"symbol":symbol_pair, "price":price,"quantity":amount} , optional=None)
elif type_ == "sell":
return self.sendTradingRequest( method = FUNCTIONCODE_SELL_ORDER_BINANCE , callback = self.onTradeOrder , kwargs = {"symbol":symbol_pair, "price":price,"quantity":amount} , optional=None)
return None
#----------------------------------------------------------------------
def spotCancelOrder(self, symbol_pair, orderid):
"""现货委托撤单"""
symbol_pair = self.legalSymbolUpper(symbol_pair)
symbol_pair = symbolFromOtherExchangesToBinance(symbol_pair)
return self.sendTradingRequest( method = FUNCTIONCODE_CANCEL_ORDER_BINANCE , callback = self.onGetCancelOrder , kwargs = {"symbol":symbol_pair,"orderId":int(orderid)} , optional=None)
#----------------------------------------------------------------------
def spotAccountInfo(self):
"""列出账户"""
return self.sendTradingRequest( method = FUNCTIONCODE_GET_ACCOUNT_BINANCE , callback = self.onGetAccount , kwargs = {} , optional = None)
#----------------------------------------------------------------------
def spotListOpenOrders(self , symbol = None):
"""列出所有的 orders"""
if symbol != None:
symbol = self.legalSymbolUpper(symbol)
symbol = symbolFromOtherExchangesToBinance(symbol)
return self.sendTradingRequest( method = FUNCTIONCODE_GET_OPEN_ORDERS , callback = self.onGetOpenOrders , kwargs = {"symbol":symbol} , optional=None)
#----------------------------------------------------------------------
def spotListAllOrders( self , symbol):
symbol = self.legalSymbolUpper(symbol)
symbol = symbolFromOtherExchangesToBinance(symbol)
return self.sendTradingRequest( method = FUNCTIONCODE_GET_ALL_ORDERS , callback = self.onGetAllOrders , kwargs = {"symbol":symbol} , optional=None)
#----------------------------------------------------------------------
def spotExchangeInfo(self):
return self.sendTradingRequest( method = FUNCTIONCODE_GET_EXCHANGE_INFO , callback = self.onExchangeInfo , kwargs = {} , optional = None)
#----------------------------------------------------------------------
# def subcribeSymbol(self , symbol):
# symbol = self.legalSymbolLower(symbol)
# symbol = symbolFromOtherExchangesToBinance(symbol)
# #self.bm.start_symbol_ticker_socket( symbol , self.onTick)
# self.bm.start_multiplex_socket([symbol + "@ticker" , symbol + "@depth5" , symbol + "@trade"] , callback=self.onMessage)
#----------------------------------------------------------------------
def subscribeAllTicker(self):
self.bm.start_ticker_socket( callback = self.onPreDealAllTicker)
#----------------------------------------------------------------------
def subscribeSpotTicker(self , symbol):
if self.bm != None:
# print "self.bm != None:"
symbol = self.legalSymbolLower(symbol)
symbol = symbolFromOtherExchangesToBinance(symbol)
self.bm.start_symbol_ticker_socket( symbol , self.onPreDealTicker)
#----------------------------------------------------------------------
def subscribeSpotDepth(self, symbol):
if self.bm != None:
symbol = self.legalSymbolLower(symbol)
symbol = symbolFromOtherExchangesToBinance(symbol)
self.bm.start_depth_socket(symbol , self.onPreDealDepth)
#----------------------------------------------------------------------
def subscribeSpotTrades(self, symbol):
if self.bm != None:
symbol = self.legalSymbolLower(symbol)
symbol = symbolFromOtherExchangesToBinance(symbol)
self.bm.start_trade_socket(symbol , self.onPreDealTrades)
#----------------------------------------------------------------------
def http_get_request(self, url, params, add_to_headers=None):
headers = {
"Content-type": "application/x-www-form-urlencoded",
'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0'
}
if add_to_headers:
headers.update(add_to_headers)
postdata = urllib.parse.urlencode(params)
try:
#response = requests.get(url, postdata, headers=headers, timeout=5)
response = requests.get(url )
if response.status_code == 200:
return response.json()
else:
return {"status":"fail"}
except Exception as e:
print("httpGet failed, detail is:%s" %e)
return {"status":"fail","msg":e}
# limit in [5, 10, 20, 50, 100, 500, 1000]
#----------------------------------------------------------------------
def getDepthSymbol(self, symbol , limit = 10):
symbol = self.legalSymbolUpper(symbol)
symbol = symbolFromOtherExchangesToBinance(symbol)
url = "https://www.binance.com/api/v1/depth?symbol=%s&limit=%s" % ( symbol , str(limit))
data = self.http_get_request( url , "")
return data
#----------------------------------------------------------------------
def onMessage(self, msg):
if 's' in msg.keys():
msg['s'] = symbolFromBinanceToOtherExchanges(msg["s"])
if "lastUpdateId" in msg.keys():
self.onDepth( msg)
else:
if msg["e"] == "trade":
self.onTrades(msg)
elif "ticker" in msg["e"]:
self.onTick(msg)
#----------------------------------------------------------------------
def onAllError(self, ex , req , reqID):
print( "onAllError" + str(ex))
#----------------------------------------------------------------------
def onPreDealAllTicker(self, msg):
for dic in msg:
dic["s"] = symbolFromBinanceToOtherExchanges(dic["s"])
self.onAllTicker(msg)
#----------------------------------------------------------------------
def onAllTicker(self,msg):
"""币安支持所有 ticker 同时socket过来"""
print(u'onAllTicker:'.format(msg))
# ----------------------------------------------------------------------
def onPreDealTicker(self, msg):
if 's' in msg.keys():
msg['s'] = symbolFromBinanceToOtherExchanges(msg["s"])
self.onTick(msg)
#----------------------------------------------------------------------
def onTick(self, msg):
print(u'onTick:'.format(msg))
# ----------------------------------------------------------------------
def onPreDealDepth(self, msg):
if 's' in msg.keys():
msg['s'] = symbolFromBinanceToOtherExchanges(msg["s"])
self.onDepth(msg)
#----------------------------------------------------------------------
def onDepth(self, msg):
print(u'onDepth:'.format(msg))
# ----------------------------------------------------------------------
def onPreDealTrades(self, msg):
if 's' in msg.keys():
msg['s'] = symbolFromBinanceToOtherExchanges(msg["s"])
self.onTrades(msg)
#----------------------------------------------------------------------
def onTrades(self, msg):
print(u'onTrades:{}'.format(msg))
#----------------------------------------------------------------------
def onGetAccount(self, data, req, reqID):
print(u'onGetAccount:'.format(data, req, reqID))
#----------------------------------------------------------------------
def onGetOpenOrders(self, data, req, reqID):
print(u'onGetOpenOrders:ata:{}, req:{}, reqID:{}'.format(data, req, reqID))
#----------------------------------------------------------------------
def onGetAllOrders(self, data, req, reqID):
print(u'onGetAllOrders:data:{}, req:{}, reqID:{}'.format(data, req, reqID))
#----------------------------------------------------------------------
def onGetBuyOrder(self, data, req, reqID):
print(u'onGetBuyOrder:data:{}, req:{}, reqID:{}'.format(data, req, reqID))
#----------------------------------------------------------------------
def onGetSellOrder(self, data, req, reqID):
print(u'onGetSellOrder:data:{}, req:{}, reqID:{}'.format(data, req, reqID))
#----------------------------------------------------------------------
def onGetCancelOrder(self, data, req, reqID):
print(u'onGetCancelOrder:data:{},req:{},reqId:{}'.format(data, req, reqID))
#----------------------------------------------------------------------
def onExchangeInfo(self, data, req, reqID):
print(u'onExchangeInfo:data:{},req:{},reqId:{}'.format(data, req, reqID))
# ----------------------------------------------------------------------
def onTradeOrder(self, data, req, reqID):
print (u'onTradeOrder:{}'.format(data))

View File

@ -0,0 +1,460 @@
#!/usr/bin/env python
# coding=utf-8
import json
import threading
import sys
from autobahn.twisted.websocket import WebSocketClientFactory, \
WebSocketClientProtocol, \
connectWS
from twisted.internet import reactor, ssl
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.internet.error import ReactorAlreadyRunning
from vnpy.api.binance.client import Client
from datetime import datetime
class BinanceClientProtocol(WebSocketClientProtocol):
def onConnect(self, response):
# reset the delay after reconnecting
self.factory.resetDelay()
def onMessage(self, payload, isBinary):
if not isBinary:
try:
payload_obj = json.loads(payload.decode('utf8'))
except ValueError:
pass
else:
self.factory.callback(payload_obj)
class BinanceReconnectingClientFactory(ReconnectingClientFactory):
# set initial delay to a short time
initialDelay = 0.1
maxDelay = 10
maxRetries = 5
class BinanceClientFactory(WebSocketClientFactory, BinanceReconnectingClientFactory):
protocol = BinanceClientProtocol
def clientConnectionFailed(self, connector, reason):
self.retry(connector)
def clientConnectionLost(self, connector, reason):
# check if closed cleanly
if reason.getErrorMessage() != 'Connection was closed cleanly.':
self.retry(connector)
class BinanceSocketManager(threading.Thread):
STREAM_URL = 'wss://stream.binance.com:9443/'
WEBSOCKET_DEPTH_1 = '1'
WEBSOCKET_DEPTH_5 = '5'
WEBSOCKET_DEPTH_10 = '10'
WEBSOCKET_DEPTH_20 = '20'
_user_timeout = 30 * 60 # 30 minutes
def __init__(self, client):
"""Initialise the BinanceSocketManager
:param client: Binance API client
:type client: binance.Client
"""
threading.Thread.__init__(self)
self._conns = {}
self._user_timer = None
self._user_listen_key = None
self._user_callback = None
self._client = client
def _start_socket(self, path, callback, prefix='ws/'):
if path in self._conns:
return False
factory_url = self.STREAM_URL + prefix + path
factory = BinanceClientFactory(factory_url)
factory.protocol = BinanceClientProtocol
factory.callback = callback
context_factory = ssl.ClientContextFactory()
self._conns[path] = connectWS(factory, context_factory)
return path
def start_depth_socket(self, symbol, callback, depth=WEBSOCKET_DEPTH_1):
"""Start a websocket for symbol market depth
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#partial-book-depth-streams
:param symbol: required
:type symbol: str
:param callback: callback function to handle messages
:type callback: function
:param depth: Number of depth entries to return, default WEBSOCKET_DEPTH_1
:type depth: enum
:returns: connection key string if successful, False otherwise
Message Format
.. code-block:: python
{
"e": "depthUpdate", # event type
"E": 1499404630606, # event time
"s": "ETHBTC", # symbol
"u": 7913455, # updateId to sync up with updateid in /api/v1/depth
"b": [ # bid depth delta
[
"0.10376590", # price (need to update the quantity on this price)
"59.15767010", # quantity
[] # can be ignored
],
],
"a": [ # ask depth delta
[
"0.10376586", # price (need to update the quantity on this price)
"159.15767010", # quantity
[] # can be ignored
],
[
"0.10383109",
"345.86845230",
[]
],
[
"0.10490700",
"0.00000000", # quantity=0 means remove this level
[]
]
]
}
"""
socket_name = symbol.lower() + '@depth'
if depth != self.WEBSOCKET_DEPTH_1:
socket_name = '{}{}'.format(socket_name, depth)
return self._start_socket(socket_name, callback)
def start_kline_socket(self, symbol, callback, interval=Client.KLINE_INTERVAL_1MINUTE):
"""Start a websocket for symbol kline data
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#klinecandlestick-streams
:param symbol: required
:type symbol: str
:param callback: callback function to handle messages
:type callback: function
:param interval: Kline interval, default KLINE_INTERVAL_1MINUTE
:type interval: enum
:returns: connection key string if successful, False otherwise
Message Format
.. code-block:: python
{
"e": "kline", # event type
"E": 1499404907056, # event time
"s": "ETHBTC", # symbol
"k": {
"t": 1499404860000, # start time of this bar
"T": 1499404919999, # end time of this bar
"s": "ETHBTC", # symbol
"i": "1m", # interval
"f": 77462, # first trade id
"L": 77465, # last trade id
"o": "0.10278577", # open
"c": "0.10278645", # close
"h": "0.10278712", # high
"l": "0.10278518", # low
"v": "17.47929838", # volume
"n": 4, # number of trades
"x": false, # whether this bar is final
"q": "1.79662878", # quote volume
"V": "2.34879839", # volume of active buy
"Q": "0.24142166", # quote volume of active buy
"B": "13279784.01349473" # can be ignored
}
}
"""
socket_name = '{}@kline_{}'.format(symbol.lower(), interval)
return self._start_socket(socket_name, callback)
def start_trade_socket(self, symbol, callback):
"""Start a websocket for symbol trade data
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#trade-streams
:param symbol: required
:type symbol: str
:param callback: callback function to handle messages
:type callback: function
:returns: connection key string if successful, False otherwise
Message Format
.. code-block:: python
{
"e": "trade", # Event type
"E": 123456789, # Event time
"s": "BNBBTC", # Symbol
"t": 12345, # Trade ID
"p": "0.001", # Price
"q": "100", # Quantity
"b": 88, # Buyer order Id
"a": 50, # Seller order Id
"T": 123456785, # Trade time
"m": true, # Is the buyer the market maker?
"M": true # Ignore.
}
"""
return self._start_socket(symbol.lower() + '@trade', callback)
def start_aggtrade_socket(self, symbol, callback):
"""Start a websocket for symbol trade data
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#aggregate-trade-streams
:param symbol: required
:type symbol: str
:param callback: callback function to handle messages
:type callback: function
:returns: connection key string if successful, False otherwise
Message Format
.. code-block:: python
{
"e": "aggTrade", # event type
"E": 1499405254326, # event time
"s": "ETHBTC", # symbol
"a": 70232, # aggregated tradeid
"p": "0.10281118", # price
"q": "8.15632997", # quantity
"f": 77489, # first breakdown trade id
"l": 77489, # last breakdown trade id
"T": 1499405254324, # trade time
"m": false, # whether buyer is a maker
"M": true # can be ignored
}
"""
return self._start_socket(symbol.lower() + '@aggTrade', callback)
def start_symbol_ticker_socket(self, symbol, callback):
"""Start a websocket for a symbol's ticker data
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#individual-symbol-ticker-streams
:param symbol: required
:type symbol: str
:param callback: callback function to handle messages
:type callback: function
:returns: connection key string if successful, False otherwise
Message Format
.. code-block:: python
{
"e": "24hrTicker", # Event type
"E": 123456789, # Event time
"s": "BNBBTC", # Symbol
"p": "0.0015", # Price change
"P": "250.00", # Price change percent
"w": "0.0018", # Weighted average price
"x": "0.0009", # Previous day's close price
"c": "0.0025", # Current day's close price
"Q": "10", # Close trade's quantity
"b": "0.0024", # Best bid price
"B": "10", # Bid bid quantity
"a": "0.0026", # Best ask price
"A": "100", # Best ask quantity
"o": "0.0010", # Open price
"h": "0.0025", # High price
"l": "0.0010", # Low price
"v": "10000", # Total traded base asset volume
"q": "18", # Total traded quote asset volume
"O": 0, # Statistics open time
"C": 86400000, # Statistics close time
"F": 0, # First trade ID
"L": 18150, # Last trade Id
"n": 18151 # Total number of trades
}
"""
return self._start_socket(symbol.lower() + '@ticker', callback)
def start_ticker_socket(self, callback):
"""Start a websocket for all ticker data
By default all markets are included in an array.
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#all-market-tickers-stream
:param callback: callback function to handle messages
:type callback: function
:returns: connection key string if successful, False otherwise
Message Format
.. code-block:: python
[
{
'F': 278610,
'o': '0.07393000',
's': 'BCCBTC',
'C': 1509622420916,
'b': '0.07800800',
'l': '0.07160300',
'h': '0.08199900',
'L': 287722,
'P': '6.694',
'Q': '0.10000000',
'q': '1202.67106335',
'p': '0.00494900',
'O': 1509536020916,
'a': '0.07887800',
'n': 9113,
'B': '1.00000000',
'c': '0.07887900',
'x': '0.07399600',
'w': '0.07639068',
'A': '2.41900000',
'v': '15743.68900000'
}
]
"""
return self._start_socket('!ticker@arr', callback)
def start_multiplex_socket(self, streams, callback):
"""Start a multiplexed socket using a list of socket names.
User stream sockets can not be included.
Symbols in socket name must be lowercase i.e bnbbtc@aggTrade, neobtc@ticker
Combined stream events are wrapped as follows: {"stream":"<streamName>","data":<rawPayload>}
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md
:param streams: list of stream names in lower case
:type streams: list
:param callback: callback function to handle messages
:type callback: function
:returns: connection key string if successful, False otherwise
Message Format - see Binance API docs for all types
"""
stream_path = 'streams={}'.format('/'.join(streams))
return self._start_socket(stream_path, callback, 'stream?')
def start_user_socket(self, callback):
"""Start a websocket for user data
https://www.binance.com/restapipub.html#user-wss-endpoint
:param callback: callback function to handle messages
:type callback: function
:returns: connection key string if successful, False otherwise
Message Format - see Binance API docs for all types
"""
if self._user_listen_key:
# cleanup any sockets with this key
for conn_key in self._conns:
if len(conn_key) >= 60 and conn_key[:60] == self._user_listen_key:
self.stop_socket(conn_key)
break
self._user_listen_key = self._client.stream_get_listen_key()
self._user_callback = callback
conn_key = self._start_socket(self._user_listen_key, callback)
if conn_key:
# start timer to keep socket alive
self._start_user_timer()
return conn_key
def _start_user_timer(self):
self._user_timer = threading.Timer(self._user_timeout, self._keepalive_user_socket)
self._user_timer.setDaemon(True)
self._user_timer.start()
def _keepalive_user_socket(self):
try:
listen_key = self._client.stream_get_listen_key()
# check if they key changed and
if listen_key != self._user_listen_key:
self.start_user_socket(self._user_callback)
self._start_user_timer()
except Exception as ex:
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " in _keepalive_user_socket")
print(str(ex),file=sys.stderr)
def stop_socket(self, conn_key):
"""Stop a websocket given the connection key
:param conn_key: Socket connection key
:type conn_key: string
:returns: connection key string if successful, False otherwise
"""
if conn_key not in self._conns:
return
self._conns[conn_key].disconnect()
del(self._conns[conn_key])
# check if we have a user stream socket
if len(conn_key) >= 60 and conn_key[:60] == self._user_listen_key:
self._stop_user_socket()
def _stop_user_socket(self):
if not self._user_listen_key:
return
# stop the timer
self._user_timer.cancel()
self._user_timer = None
# close the stream
self._client.stream_close(listenKey=self._user_listen_key)
self._user_listen_key = None
def run(self):
try:
reactor.run(installSignalHandlers=False)
except ReactorAlreadyRunning:
# Ignore error about reactor already running
pass
def close(self):
"""Close all connections
"""
keys = set(self._conns.keys())
for key in keys:
self.stop_socket(key)
self._conns = {}