This commit is contained in:
msincenselee 2018-10-04 10:29:13 +08:00
parent f4df43e423
commit 5459e07e94
4 changed files with 787 additions and 426 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,45 @@
# coding=utf-8
import dateparser
import pytz
from datetime import datetime
def date_to_milliseconds(date_str):
"""Convert UTC date to milliseconds
If using offset strings add "UTC" to date string e.g. "now UTC", "11 hours ago UTC"
See dateparse docs for formats http://dateparser.readthedocs.io/en/latest/
:param date_str: date in readable format, i.e. "January 01, 2018", "11 hours ago UTC", "now UTC"
:type date_str: str
"""
# get epoch value in UTC
epoch = datetime.utcfromtimestamp(0).replace(tzinfo=pytz.utc)
# parse our date string
d = dateparser.parse(date_str)
# if the date is not timezone aware apply UTC timezone
if d.tzinfo is None or d.tzinfo.utcoffset(d) is None:
d = d.replace(tzinfo=pytz.utc)
# return the difference in time
return int((d - epoch).total_seconds() * 1000.0)
def interval_to_milliseconds(interval):
"""Convert a Binance interval string to milliseconds
:param interval: Binance interval string, e.g.: 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 3d, 1w
:type interval: str
:return:
int value of interval in milliseconds
None if interval prefix is not a decimal integer
None if interval suffix is not one of m, h, d, w
"""
seconds_per_unit = {
"m": 60,
"h": 60 * 60,
"d": 24 * 60 * 60,
"w": 7 * 24 * 60 * 60,
}
try:
return int(interval[:-1]) * seconds_per_unit[interval[-1]] * 1000
except (ValueError, KeyError):
return None

View File

@ -144,7 +144,7 @@ class BinanceSpotApi(object):
:param req:
:return:
"""
reqID = None
try:
method = req['method']
reqID = req["reqID"]
@ -275,6 +275,7 @@ class BinanceSpotApi(object):
req['kwargs'] = kwargs
req['reqID'] = self.reqID
# 如果方法是查询委托订单和查询账号信息,则需要检查是否重复
if method in [FUNCTIONCODE_GET_OPEN_ORDERS , FUNCTIONCODE_GET_ACCOUNT_BINANCE ]:
flag = False
for use_method ,r in self.reqQueue:
@ -292,8 +293,17 @@ class BinanceSpotApi(object):
#----------------------------------------------------------------------
def spotTrade(self, symbol_pair, type_, price, amount):
"""现货委托"""
symbol_pair = self.legalSymbolUpper(symbol_pair)
symbol_pair = symbolFromOtherExchangesToBinance(symbol_pair)
print('binance:spotTrade:symbol_pair:{}'.format(symbol_pair))
upper_symbol_pair = self.legalSymbolUpper(symbol_pair)
if upper_symbol_pair != symbol_pair:
print('upper symbol_pair:{}=>{}'.format(symbol_pair,upper_symbol_pair))
symbol_pair = upper_symbol_pair
other_symbol_pair = symbolFromOtherExchangesToBinance(symbol_pair)
if other_symbol_pair != symbol_pair:
print('symbol_pair:{}=>{}'.format(symbol_pair,other_symbol_pair))
symbol_pair = other_symbol_pair
if type_ == "buy":
return self.sendTradingRequest( method = FUNCTIONCODE_BUY_ORDER_BINANCE , callback = self.onTradeOrder , kwargs = {"symbol":symbol_pair, "price":price,"quantity":amount} , optional=None)
elif type_ == "sell":
@ -344,8 +354,6 @@ class BinanceSpotApi(object):
#----------------------------------------------------------------------
def subscribeSpotTicker(self , symbol):
if '.' in symbol:
symbol = symbol.split('.')[0]
if self.bm != None:
# print "self.bm != None:"
symbol = self.legalSymbolLower(symbol)
@ -354,8 +362,6 @@ class BinanceSpotApi(object):
#----------------------------------------------------------------------
def subscribeSpotDepth(self, symbol):
if '.' in symbol:
symbol = symbol.split('.')[0]
if self.bm != None:
symbol = self.legalSymbolLower(symbol)
symbol = symbolFromOtherExchangesToBinance(symbol)

View File

@ -18,6 +18,9 @@ from datetime import datetime
class BinanceClientProtocol(WebSocketClientProtocol):
def __init__(self):
super().__init__()
def onConnect(self, response):
# reset the delay after reconnecting
self.factory.resetDelay()
@ -45,33 +48,38 @@ class BinanceReconnectingClientFactory(ReconnectingClientFactory):
class BinanceClientFactory(WebSocketClientFactory, BinanceReconnectingClientFactory):
protocol = BinanceClientProtocol
_reconnect_error_payload = {
'e': 'error',
'm': 'Max reconnect retries reached'
}
def clientConnectionFailed(self, connector, reason):
self.retry(connector)
if self.retries > self.maxRetries:
self.callback(self._reconnect_error_payload)
def clientConnectionLost(self, connector, reason):
# check if closed cleanly
if reason.getErrorMessage() != 'Connection was closed cleanly.':
self.retry(connector)
self.retry(connector)
if self.retries > self.maxRetries:
self.callback(self._reconnect_error_payload)
class BinanceSocketManager(threading.Thread):
STREAM_URL = 'wss://stream.binance.com:9443/'
WEBSOCKET_DEPTH_1 = '1'
WEBSOCKET_DEPTH_5 = '5'
WEBSOCKET_DEPTH_10 = '10'
WEBSOCKET_DEPTH_20 = '20'
_user_timeout = 30 * 60 # 30 minutes
DEFAULT_USER_TIMEOUT = 30 * 60 # 30 minutes
def __init__(self, client):
def __init__(self, client, user_timeout=DEFAULT_USER_TIMEOUT):
"""Initialise the BinanceSocketManager
:param client: Binance API client
:type client: binance.Client
:param user_timeout: Custom websocket timeout
:type user_timeout: int
"""
threading.Thread.__init__(self)
self._conns = {}
@ -79,6 +87,7 @@ class BinanceSocketManager(threading.Thread):
self._user_listen_key = None
self._user_callback = None
self._client = client
self._user_timeout = user_timeout
def _start_socket(self, path, callback, prefix='ws/'):
if path in self._conns:
@ -88,83 +97,82 @@ class BinanceSocketManager(threading.Thread):
factory = BinanceClientFactory(factory_url)
factory.protocol = BinanceClientProtocol
factory.callback = callback
factory.reconnect = True
context_factory = ssl.ClientContextFactory()
self._conns[path] = connectWS(factory, context_factory)
return path
def start_depth_socket(self, symbol, callback, depth=WEBSOCKET_DEPTH_1):
"""Start a websocket for symbol market depth
def start_depth_socket(self, symbol, callback, depth=None):
"""Start a websocket for symbol market depth returning either a diff or a partial book
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#partial-book-depth-streams
:param symbol: required
:type symbol: str
:param callback: callback function to handle messages
:type callback: function
:param depth: Number of depth entries to return, default WEBSOCKET_DEPTH_1
:type depth: enum
:param depth: optional Number of depth entries to return, default None. If passed returns a partial book instead of a diff
:type depth: str
:returns: connection key string if successful, False otherwise
Message Format
Partial Message Format
.. code-block:: python
{
"e": "depthUpdate", # event type
"E": 1499404630606, # event time
"s": "ETHBTC", # symbol
"u": 7913455, # updateId to sync up with updateid in /api/v1/depth
"b": [ # bid depth delta
"lastUpdateId": 160, # Last update ID
"bids": [ # Bids to be updated
[
"0.10376590", # price (need to update the quantity on this price)
"59.15767010", # quantity
[] # can be ignored
],
"0.0024", # price level to be updated
"10", # quantity
[] # ignore
]
],
"a": [ # ask depth delta
"asks": [ # Asks to be updated
[
"0.10376586", # price (need to update the quantity on this price)
"159.15767010", # quantity
[] # can be ignored
],
"0.0026", # price level to be updated
"100", # quantity
[] # ignore
]
]
}
Diff Message Format
.. code-block:: python
{
"e": "depthUpdate", # Event type
"E": 123456789, # Event time
"s": "BNBBTC", # Symbol
"U": 157, # First update ID in event
"u": 160, # Final update ID in event
"b": [ # Bids to be updated
[
"0.10383109",
"345.86845230",
[]
],
"0.0024", # price level to be updated
"10", # quantity
[] # ignore
]
],
"a": [ # Asks to be updated
[
"0.10490700",
"0.00000000", # quantity=0 means remove this level
[]
"0.0026", # price level to be updated
"100", # quantity
[] # ignore
]
]
}
"""
socket_name = symbol.lower() + '@depth'
if depth != self.WEBSOCKET_DEPTH_1:
if depth and depth != '1':
socket_name = '{}{}'.format(socket_name, depth)
return self._start_socket(socket_name, callback)
def start_kline_socket(self, symbol, callback, interval=Client.KLINE_INTERVAL_1MINUTE):
"""Start a websocket for symbol kline data
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#klinecandlestick-streams
:param symbol: required
:type symbol: str
:param callback: callback function to handle messages
:type callback: function
:param interval: Kline interval, default KLINE_INTERVAL_1MINUTE
:type interval: enum
:type interval: str
:returns: connection key string if successful, False otherwise
Message Format
.. code-block:: python
{
"e": "kline", # event type
"E": 1499404907056, # event time
@ -193,22 +201,44 @@ class BinanceSocketManager(threading.Thread):
socket_name = '{}@kline_{}'.format(symbol.lower(), interval)
return self._start_socket(socket_name, callback)
def start_miniticker_socket(self, callback, update_time=1000):
"""Start a miniticker websocket for all trades
This is not in the official Binance api docs, but this is what
feeds the right column on a ticker page on Binance.
:param callback: callback function to handle messages
:type callback: function
:param update_time: time between callbacks in milliseconds, must be 1000 or greater
:type update_time: int
:returns: connection key string if successful, False otherwise
Message Format
.. code-block:: python
[
{
'e': '24hrMiniTicker', # Event type
'E': 1515906156273, # Event time
's': 'QTUMETH', # Symbol
'c': '0.03836900', # close
'o': '0.03953500', # open
'h': '0.04400000', # high
'l': '0.03756000', # low
'v': '147435.80000000', # volume
'q': '5903.84338533' # quote volume
}
]
"""
return self._start_socket('!miniTicker@arr@{}ms'.format(update_time), callback)
def start_trade_socket(self, symbol, callback):
"""Start a websocket for symbol trade data
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#trade-streams
:param symbol: required
:type symbol: str
:param callback: callback function to handle messages
:type callback: function
:returns: connection key string if successful, False otherwise
Message Format
.. code-block:: python
{
"e": "trade", # Event type
"E": 123456789, # Event time
@ -222,26 +252,19 @@ class BinanceSocketManager(threading.Thread):
"m": true, # Is the buyer the market maker?
"M": true # Ignore.
}
"""
return self._start_socket(symbol.lower() + '@trade', callback)
def start_aggtrade_socket(self, symbol, callback):
"""Start a websocket for symbol trade data
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#aggregate-trade-streams
:param symbol: required
:type symbol: str
:param callback: callback function to handle messages
:type callback: function
:returns: connection key string if successful, False otherwise
Message Format
.. code-block:: python
{
"e": "aggTrade", # event type
"E": 1499405254326, # event time
@ -255,26 +278,19 @@ class BinanceSocketManager(threading.Thread):
"m": false, # whether buyer is a maker
"M": true # can be ignored
}
"""
return self._start_socket(symbol.lower() + '@aggTrade', callback)
def start_symbol_ticker_socket(self, symbol, callback):
"""Start a websocket for a symbol's ticker data
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#individual-symbol-ticker-streams
:param symbol: required
:type symbol: str
:param callback: callback function to handle messages
:type callback: function
:returns: connection key string if successful, False otherwise
Message Format
.. code-block:: python
{
"e": "24hrTicker", # Event type
"E": 123456789, # Event time
@ -300,26 +316,18 @@ class BinanceSocketManager(threading.Thread):
"L": 18150, # Last trade Id
"n": 18151 # Total number of trades
}
"""
return self._start_socket(symbol.lower() + '@ticker', callback)
def start_ticker_socket(self, callback):
"""Start a websocket for all ticker data
By default all markets are included in an array.
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md#all-market-tickers-stream
:param callback: callback function to handle messages
:type callback: function
:returns: connection key string if successful, False otherwise
Message Format
.. code-block:: python
[
{
'F': 278610,
@ -351,45 +359,42 @@ class BinanceSocketManager(threading.Thread):
def start_multiplex_socket(self, streams, callback):
"""Start a multiplexed socket using a list of socket names.
User stream sockets can not be included.
Symbols in socket name must be lowercase i.e bnbbtc@aggTrade, neobtc@ticker
Combined stream events are wrapped as follows: {"stream":"<streamName>","data":<rawPayload>}
https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md
:param streams: list of stream names in lower case
:type streams: list
:param callback: callback function to handle messages
:type callback: function
:returns: connection key string if successful, False otherwise
Message Format - see Binance API docs for all types
"""
stream_path = 'streams={}'.format('/'.join(streams))
return self._start_socket(stream_path, callback, 'stream?')
def start_user_socket(self, callback):
"""Start a websocket for user data
https://www.binance.com/restapipub.html#user-wss-endpoint
:param callback: callback function to handle messages
:type callback: function
:returns: connection key string if successful, False otherwise
Message Format - see Binance API docs for all types
"""
# Get the user listen key
user_listen_key = self._client.stream_get_listen_key()
# and start the socket with this specific key
conn_key = self._start_user_socket(user_listen_key, callback)
return conn_key
def _start_user_socket(self, user_listen_key, callback):
# With this function we can start a user socket with a specific key
if self._user_listen_key:
# cleanup any sockets with this key
for conn_key in self._conns:
if len(conn_key) >= 60 and conn_key[:60] == self._user_listen_key:
self.stop_socket(conn_key)
break
self._user_listen_key = self._client.stream_get_listen_key()
self._user_listen_key = user_listen_key
self._user_callback = callback
conn_key = self._start_socket(self._user_listen_key, callback)
if conn_key:
@ -405,26 +410,31 @@ class BinanceSocketManager(threading.Thread):
def _keepalive_user_socket(self):
try:
listen_key = self._client.stream_get_listen_key()
user_listen_key = self._client.stream_get_listen_key()
# check if they key changed and
if listen_key != self._user_listen_key:
self.start_user_socket(self._user_callback)
self._start_user_timer()
if user_listen_key != self._user_listen_key:
# Start a new socket with the key received
# `_start_user_socket` automatically cleanup open sockets
# and starts timer to keep socket alive
self._start_user_socket(user_listen_key, self._user_callback)
else:
# Restart timer only if the user listen key is not changed
self._start_user_timer()
except Exception as ex:
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " in _keepalive_user_socket")
print(str(ex),file=sys.stderr)
def stop_socket(self, conn_key):
"""Stop a websocket given the connection key
:param conn_key: Socket connection key
:type conn_key: string
:returns: connection key string if successful, False otherwise
"""
if conn_key not in self._conns:
return
# disable reconnecting if we are closing
self._conns[conn_key].factory = WebSocketClientFactory(self.STREAM_URL + 'tmp_path')
self._conns[conn_key].disconnect()
del(self._conns[conn_key])
@ -438,8 +448,6 @@ class BinanceSocketManager(threading.Thread):
# stop the timer
self._user_timer.cancel()
self._user_timer = None
# close the stream
self._client.stream_close(listenKey=self._user_listen_key)
self._user_listen_key = None
def run(self):
@ -451,10 +459,9 @@ class BinanceSocketManager(threading.Thread):
def close(self):
"""Close all connections
"""
keys = set(self._conns.keys())
for key in keys:
self.stop_socket(key)
self._conns = {}
self._conns = {}