[Add]新增币安接口

This commit is contained in:
vn.py 2018-06-04 14:12:25 +08:00
parent 5374829224
commit ba647a1b63
6 changed files with 1181 additions and 0 deletions

View File

@ -0,0 +1 @@
from .vnbinance import BinanceApi

43
vnpy/api/binance/test.py Normal file
View File

@ -0,0 +1,43 @@
from time import sleep
from vnbinance import BinanceApi
if __name__ == '__main__':
apiKey = ''
secretKey = ''
api = BinanceApi()
api.init(apiKey, secretKey)
api.start()
#api.queryPing()
#api.queryTime()
#api.queryExchangeInfo()
api.queryDepth('BTCUSDT')
#api.queryDepth('BTCUSDT')
#api.queryTrades('BTCUSDT')
#api.queryAggTrades('BTCUSDT')
#api.queryKlines('BTCUSDT', '1m')
#api.queryTicker24HR()
#api.queryTickerPrice()
#api.queryBookTicker()
api.queryAccount()
#api.queryOrder('BTCUSDT', '1231231')
#api.queryOpenOrders('BTCUSDT')
#api.queryAllOrders('BTCUSDT')
#api.queryMyTrades('BTCUSDT')
#api.startStream()
#api.keepaliveStream('12312312')
#api.closeStream('123213')
#api.newOrder('BTCUSDT', 'BUY', 'LIMIT', 3000, 1, 'GTC')
#api.cancelOrder('BTCUSDT', '132213123')
#api.initDataStream(['btcusdt@ticker', 'btcusdt@depth5'])
#api.initUserStream('NXvaiFwZz2LuKqINVerKOnWaQQG1JhdQNejiZKY9Kmgk4lZgTvm3nRAnXJK7')
raw_input()

View File

@ -0,0 +1,624 @@
# encoding: UTF-8
import json
import requests
import hmac
import hashlib
import traceback
from queue import Queue, Empty
from threading import Thread
from multiprocessing.dummy import Pool
from time import time, sleep
from urllib import urlencode
from websocket import create_connection
REST_ENDPOINT = 'https://www.binance.com'
DATASTREAM_ENDPOINT = 'wss://stream.binance.com:9443/stream?streams='
USERSTREAM_ENDPOINT = 'wss://stream.binance.com:9443/ws/'
########################################################################
class BinanceApi(object):
""""""
###################################################
## Basic Function
###################################################
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.apiKey = ''
self.secretKey = ''
self.active = False
self.reqid = 0
self.queue = Queue()
self.pool = None
self.headers = {}
self.secret = ''
self.recvWindow = 5000
self.dataStreamNameList = []
self.dataStreamUrl = ''
self.dataStreamActive = False
self.dataStreamWs = None
self.dataStreamThread = None
self.userStreamKey = ''
self.userStreamUrl = ''
self.userStreamActive = False
self.userStreamWs = None
self.userStreamThread = None
self.keepaliveCount = 0
self.keepaliveThread = None
#----------------------------------------------------------------------
def init(self, apiKey, secretKey, recvWindow=5000):
""""""
self.apiKey = apiKey
self.secretKey = secretKey
self.headers['X-MBX-APIKEY'] = apiKey
self.secret = bytes(secretKey.encode('utf-8'))
self.recvWindow = recvWindow
#----------------------------------------------------------------------
def start(self, n=10):
""""""
if self.active:
return
self.active = True
self.pool = Pool(n)
self.pool.map_async(self.run, range(n))
#----------------------------------------------------------------------
def close(self):
""""""
self.active = False
self.pool.close()
self.pool.join()
#----------------------------------------------------------------------
def request(self, method, path, params=None, signed=False, stream=False):
""""""
if not signed:
url = REST_ENDPOINT + path
headers = {}
else:
if not stream:
params['recvWindow'] = self.recvWindow
params['timestamp'] = int(time()*1000)
query = urlencode(sorted(params.items()))
signature = hmac.new(self.secret, query.encode('utf-8'),
hashlib.sha256).hexdigest()
query += "&signature={}".format(signature)
url = REST_ENDPOINT + path + '?' + query
params = None # 参数添加到query中后清空参数字典
else:
if params:
query = urlencode(sorted(params.items()))
url = REST_ENDPOINT + path + '?' + query
params = None
else:
url = REST_ENDPOINT + path
headers = self.headers
try:
resp = requests.request(method, url, params=params, headers=headers)
if resp.status_code == 200:
return True, resp.json()
else:
error = {
'method': method,
'params': params,
'code': resp.status_code,
'msg': resp.json()['msg']
}
return False, error
except Exception as e:
error = {
'method': method,
'params': params,
'code': e,
'msg': traceback.format_exc()
}
return False, error
#----------------------------------------------------------------------
def addReq(self, method, path, params, callback, signed=False, stream=False):
"""添加请求"""
self.reqid += 1
req = (method, path, params, callback, signed, stream, self.reqid)
self.queue.put(req)
return self.reqid
#----------------------------------------------------------------------
def processReq(self, req):
""""""
method, path, params, callback, signed, stream, reqid = req
result, data = self.request(method, path, params, signed, stream)
if result:
callback(data, reqid)
else:
self.onError(data, reqid)
#----------------------------------------------------------------------
def run(self, n):
""""""
while self.active:
try:
req = self.queue.get(timeout=1)
self.processReq(req)
except Empty:
pass
###################################################
## REST Function
###################################################
#----------------------------------------------------------------------
def queryPing(self):
""""""
path = '/api/v1/ping'
return self.addReq('GET', path, {}, self.onQueryPing)
#----------------------------------------------------------------------
def queryTime(self):
""""""
path = '/api/v1/time'
return self.addReq('GET', path, {}, self.onQueryTime)
#----------------------------------------------------------------------
def queryExchangeInfo(self):
""""""
path = '/api/v1/exchangeInfo'
return self.addReq('GET', path, {}, self.onQueryExchangeInfo)
#----------------------------------------------------------------------
def queryDepth(self, symbol, limit=0):
""""""
path = '/api/v1/depth'
params = {'symbol': symbol}
if limit:
params['limit'] = limit
return self.addReq('GET', path, params, self.onQueryDepth)
#----------------------------------------------------------------------
def queryTrades(self, symbol, limit=0):
""""""
path = '/api/v1/trades'
params = {'symbol': symbol}
if limit:
params['limit'] = limit
return self.addReq('GET', path, params, self.onQueryTrades)
#----------------------------------------------------------------------
def queryAggTrades(self, symbol, fromId=0, startTime=0, endTime=0, limit=0):
""""""
path = '/api/v1/aggTrades'
params = {'symbol': symbol}
if fromId:
params['fromId'] = fromId
if startTime:
params['startTime'] = startTime
if endTime:
params['endTime'] = endTime
if limit:
params['limit'] = limit
return self.addReq('GET', path, params, self.onQueryAggTrades)
#----------------------------------------------------------------------
def queryKlines(self, symbol, interval, limit=0, startTime=0, endTime=0):
""""""
path = '/api/v1/klines'
params = {
'symbol': symbol,
'interval': interval
}
if limit:
params['limit'] = limit
if startTime:
params['startTime'] = startTime
if endTime:
params['endTime'] = endTime
return self.addReq('GET', path, params, self.onQueryKlines)
#----------------------------------------------------------------------
def queryTicker24HR(self, symbol=''):
""""""
path = '/api/v1/ticker/24hr'
params = {}
if symbol:
params['symbol'] = symbol
return self.addReq('GET', path, params, self.onQueryTicker24HR)
#----------------------------------------------------------------------
def queryTickerPrice(self, symbol=''):
""""""
path = '/api/v3/ticker/price'
params = {}
if symbol:
params['symbol'] = symbol
return self.addReq('GET', path, params, self.onQueryTickerPrice)
#----------------------------------------------------------------------
def queryBookTicker(self, symbol=''):
""""""
path = '/api/v3/ticker/bookTicker'
params = {}
if symbol:
params['symbol'] = symbol
return self.addReq('GET', path, params, self.onQueryBookTicker)
#----------------------------------------------------------------------
def newOrder(self, symbol, side, type_, price, quantity, timeInForce,
newClientOrderId='', stopPrice=0, icebergQty=0, newOrderRespType=''):
""""""
path = '/api/v3/order'
params = {
'symbol': symbol,
'side': side,
'type': type_,
'price': price,
'quantity': quantity,
'timeInForce': timeInForce
}
if newClientOrderId:
params['newClientOrderId'] = newClientOrderId
if timeInForce:
params['timeInForce'] = timeInForce
if stopPrice:
params['stopPrice'] = stopPrice
if icebergQty:
params['icebergQty'] = icebergQty
if newOrderRespType:
params['newOrderRespType'] = newOrderRespType
return self.addReq('POST', path, params, self.onNewOrder, signed=True)
#----------------------------------------------------------------------
def queryOrder(self, symbol, orderId=0, origClientOrderId=0):
""""""
path = '/api/v3/order'
params = {'symbol': symbol}
if orderId:
params['orderId'] = orderId
if origClientOrderId:
params['origClientOrderId'] = origClientOrderId
return self.addReq('GET', path, params, self.onQueryOrder, signed=True)
#----------------------------------------------------------------------
def cancelOrder(self, symbol, orderId=0, origClientOrderId='',
newClientOrderId=''):
""""""
path = '/api/v3/order'
params = {'symbol': symbol}
if orderId:
params['orderId'] = orderId
if origClientOrderId:
params['origClientOrderId'] = origClientOrderId
if newClientOrderId:
params['newClientOrderId'] = newClientOrderId
return self.addReq('DELETE', path, params, self.onCancelOrder, signed=True)
#----------------------------------------------------------------------
def queryOpenOrders(self, symbol=''):
""""""
path = '/api/v3/openOrders'
params = {}
if symbol:
params['symbol'] = symbol
return self.addReq('GET', path, params, self.onQueryOpenOrders, signed=True)
#----------------------------------------------------------------------
def queryAllOrders(self, symbol, orderId=0, limit=0):
""""""
path = '/api/v3/allOrders'
params = {'symbol': symbol}
if orderId:
params['orderId'] = orderId
if limit:
params['limit'] = limit
return self.addReq('GET', path, params, self.onQueryAllOrders, signed=True)
#----------------------------------------------------------------------
def queryAccount(self):
""""""
path = '/api/v3/account'
params = {}
return self.addReq('GET', path, params, self.onQueryAccount, signed=True)
#----------------------------------------------------------------------
def queryMyTrades(self, symbol, limit=0, fromId=0):
""""""
path = '/api/v3/myTrades'
params = {'symbol': symbol}
if limit:
params['limit'] = limit
if fromId:
params['fromId'] = fromId
return self.addReq('GET', path, params, self.onQueryMyTrades, signed=True)
#----------------------------------------------------------------------
def startStream(self):
""""""
path = '/api/v1/userDataStream'
return self.addReq('POST', path, {}, self.onStartStream, signed=True, stream=True)
#----------------------------------------------------------------------
def keepaliveStream(self, listenKey):
""""""
path = '/api/v1/userDataStream'
params = {'listenKey': listenKey}
return self.addReq('PUT', path, params, self.onKeepaliveStream, signed=True, stream=True)
#----------------------------------------------------------------------
def closeStream(self, listenKey):
""""""
path = '/api/v1/userDataStream'
params = {'listenKey': listenKey}
return self.addReq('DELETE', path, params, self.onCloseStream, signed=True, stream=True)
###################################################
## REST Callback
###################################################
#----------------------------------------------------------------------
def onError(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryPing(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryTime(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryExchangeInfo(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryDepth(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryTrades(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryAggTrades(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryKlines(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryTicker24HR(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryTickerPrice(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryBookTicker(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onNewOrder(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryOrder(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onCancelOrder(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryOpenOrders(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryAllOrders(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryAccount(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onQueryMyTrades(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onStartStream(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onKeepaliveStream(self, data, reqid):
""""""
print(data, reqid)
#----------------------------------------------------------------------
def onCloseStream(self, data, reqid):
""""""
print(data, reqid)
###################################################
## Websocket Function
###################################################
#----------------------------------------------------------------------
def initDataStream(self, nameList=None):
""""""
if nameList:
self.dataStreamNameList = nameList
s = '/'.join(self.dataStreamNameList)
self.dataStreamUrl = DATASTREAM_ENDPOINT + s
result = self.connectDataStream()
if result:
self.dataStreamActive = True
self.dataStreamThread = Thread(target=self.runDataStream)
self.dataStreamThread.start()
#----------------------------------------------------------------------
def runDataStream(self):
""""""
while self.dataStreamActive:
try:
stream = self.dataStreamWs.recv()
data = json.loads(stream)
self.onMarketData(data)
except:
self.onDataStreamError('Data stream connection lost')
result = self.connectDataStream()
if not result:
self.onDataStreamError(u'Waiting 3 seconds to reconnect')
sleep(3)
else:
self.onDataStreamError(u'Data stream reconnected')
#----------------------------------------------------------------------
def closeDataStream(self):
""""""
self.dataStreamActive = False
self.dataStreamThread.join()
#----------------------------------------------------------------------
def connectDataStream(self):
""""""
try:
self.dataStreamWs = create_connection(self.dataStreamUrl)
return True
except:
msg = traceback.format_exc()
self.onDataStreamError('Connecting data stream falied: %s' %msg)
return False
#----------------------------------------------------------------------
def onDataStreamError(self, msg):
""""""
print msg
#----------------------------------------------------------------------
def onMarketData(self, data):
""""""
print data
#----------------------------------------------------------------------
def initUserStream(self, key):
""""""
self.userStreamKey = key
self.userStreamUrl = USERSTREAM_ENDPOINT + key
result = self.connectUserStream()
if result:
self.userStreamActive = True
self.userStreamThread = Thread(target=self.runUserStream)
self.userStreamThread.start()
self.keepaliveThread = Thread(target=self.runKeepalive)
self.keepaliveThread.start()
#----------------------------------------------------------------------
def runUserStream(self):
""""""
while self.userStreamActive:
try:
stream = self.userStreamWs.recv()
data = json.loads(stream)
self.onUserData(data)
except:
self.onUserStreamError('User stream connection lost')
result = self.connectUserStream()
if not result:
self.onUserStreamError(u'Waiting 3 seconds to reconnect')
sleep(3)
else:
self.onUserStreamError(u'User stream reconnected')
#----------------------------------------------------------------------
def closeUserStream(self):
""""""
self.userStreamActive = False
self.userStreamThread.join()
self.keepaliveThread.join()
#----------------------------------------------------------------------
def connectUserStream(self):
""""""
try:
self.userStreamWs = create_connection(self.userStreamUrl)
return True
except:
msg = traceback.format_exc()
self.onUserStreamError('Connecting user stream falied: %s' %msg)
return False
#----------------------------------------------------------------------
def onUserStreamError(self, msg):
""""""
print msg
#----------------------------------------------------------------------
def onUserData(self, data):
""""""
print data
#----------------------------------------------------------------------
def runKeepalive(self):
""""""
while self.userStreamActive:
self.keepaliveCount += 1
if self.keepaliveCount >= 1800:
self.keepaliveCount = 0
self.keepaliveStream(self.userStreamKey)
sleep(1)

View File

@ -0,0 +1,5 @@
{
"apiKey": "",
"secretKey": "",
"symbols": ["BTCUSDT", "ETHUSDT", "ETHBTC"]
}

View File

@ -0,0 +1,10 @@
# encoding: UTF-8
from vnpy.trader import vtConstant
from .binanceGateway import BinanceGateway
gatewayClass = BinanceGateway
gatewayName = 'BINANCE'
gatewayDisplayName = u'币安'
gatewayType = vtConstant.GATEWAYTYPE_BTC
gatewayQryEnabled = True

View File

@ -0,0 +1,498 @@
# encoding: UTF-8
'''
vnpy.api.binance的gateway接入
'''
import os
import json
from datetime import datetime, timedelta
from copy import copy
from vnpy.api.binance import BinanceApi
from vnpy.trader.vtGateway import *
from vnpy.trader.vtFunction import getJsonPath, getTempPath
# 委托状态类型映射
statusMapReverse = {}
statusMapReverse['NEW'] = STATUS_NOTTRADED
statusMapReverse['PARTIALLY_FILLED'] = STATUS_PARTTRADED
statusMapReverse['FILLED'] = STATUS_ALLTRADED
statusMapReverse['CANCELED'] = STATUS_CANCELLED
statusMapReverse['REJECTED'] = STATUS_REJECTED
statusMapReverse['EXPIRED'] = STATUS_CANCELLED
# 方向映射
directionMap = {}
directionMap[DIRECTION_LONG] = 'BUY'
directionMap[DIRECTION_SHORT] = 'SELL'
directionMapReverse = {v:k for k,v in directionMap.items()}
# 价格类型映射
priceTypeMap = {}
priceTypeMap[PRICETYPE_LIMITPRICE] = 'LIMIT'
priceTypeMap[PRICETYPE_MARKETPRICE] = 'MARKET'
#----------------------------------------------------------------------
def print_dict(d):
""""""
print '-' * 30
l = d.keys()
l.sort()
for k in l:
print '%s:%s' %(k, d[k])
########################################################################
class BinanceGateway(VtGateway):
"""币安接口"""
#----------------------------------------------------------------------
def __init__(self, eventEngine, gatewayName=''):
"""Constructor"""
super(BinanceGateway, self).__init__(eventEngine, gatewayName)
self.api = GatewayApi(self)
self.qryEnabled = False # 是否要启动循环查询
self.fileName = self.gatewayName + '_connect.json'
self.filePath = getJsonPath(self.fileName, __file__)
#----------------------------------------------------------------------
def connect(self):
"""连接"""
try:
f = file(self.filePath)
except IOError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'读取连接配置出错,请检查'
self.onLog(log)
return
# 解析json文件
setting = json.load(f)
try:
apiKey = str(setting['apiKey'])
secretKey = str(setting['secretKey'])
symbols = setting['symbols']
except KeyError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'连接配置缺少字段,请检查'
self.onLog(log)
return
# 创建行情和交易接口对象
self.api.connect(apiKey, secretKey, symbols)
# 初始化并启动查询
#self.initQuery()
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅行情"""
pass
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
"""发单"""
return self.api.sendOrder(orderReq)
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq):
"""撤单"""
self.api.cancel(cancelOrderReq)
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.api.close()
#----------------------------------------------------------------------
def queryAccount(self):
""""""
self.api.queryAccount()
#----------------------------------------------------------------------
def initQuery(self):
"""初始化连续查询"""
if self.qryEnabled:
# 需要循环的查询函数列表
self.qryFunctionList = [self.queryAccount]
self.qryCount = 0 # 查询触发倒计时
self.qryTrigger = 1 # 查询触发点
self.qryNextFunction = 0 # 上次运行的查询函数索引
self.startQuery()
#----------------------------------------------------------------------
def query(self, event):
"""注册到事件处理引擎上的查询函数"""
self.qryCount += 1
if self.qryCount > self.qryTrigger:
# 清空倒计时
self.qryCount = 0
# 执行查询函数
function = self.qryFunctionList[self.qryNextFunction]
function()
# 计算下次查询函数的索引如果超过了列表长度则重新设为0
self.qryNextFunction += 1
if self.qryNextFunction == len(self.qryFunctionList):
self.qryNextFunction = 0
#----------------------------------------------------------------------
def startQuery(self):
"""启动连续查询"""
self.eventEngine.register(EVENT_TIMER, self.query)
#----------------------------------------------------------------------
def setQryEnabled(self, qryEnabled):
"""设置是否要启动循环查询"""
self.qryEnabled = qryEnabled
########################################################################
class GatewayApi(BinanceApi):
"""API实现"""
#----------------------------------------------------------------------
def __init__(self, gateway):
"""Constructor"""
super(GatewayApi, self).__init__()
self.gateway = gateway # gateway对象
self.gatewayName = gateway.gatewayName # gateway对象名称
self.date = datetime.now().strftime('%y%m%d%H%M%S')
self.orderId = 0
self.tickDict = {}
#----------------------------------------------------------------------
def connect(self, apiKey, secretKey, symbols):
"""连接服务器"""
self.init(apiKey, secretKey)
self.start()
self.writeLog(u'交易API启动成功')
l = []
for symbol in symbols:
symbol = symbol.lower()
l.append(symbol+'@ticker')
l.append(symbol+'@depth5')
self.initDataStream(l)
self.writeLog(u'行情推送订阅成功')
self.startStream()
# 初始化查询
self.queryExchangeInfo()
self.queryAccount()
for symbol in symbols:
self.queryOpenOrders(symbol.upper())
#----------------------------------------------------------------------
def writeLog(self, content):
"""发出日志"""
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = content
self.gateway.onLog(log)
#----------------------------------------------------------------------
def onError(self, data, reqid):
""""""
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = data['code']
err.errorMsg = data['msg']
self.gateway.onError(err)
#----------------------------------------------------------------------
def onQueryExchangeInfo(self, data, reqid):
""""""
for d in data['symbols']:
if str(d['symbol']) == 'ETHUSDT':
print d
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = d['symbol']
contract.exchange = EXCHANGE_BINANCE
contract.vtSymbol = '.'.join([contract.symbol, contract.exchange])
contract.name = contract.vtSymbol
contract.productClass = PRODUCT_SPOT
contract.size = 1
for f in d['filters']:
if f['filterType'] == 'PRICE_FILTER':
contract.priceTick = float(f['tickSize'])
self.gateway.onContract(contract)
#----------------------------------------------------------------------
def onNewOrder(self, data, reqid):
""""""
pass
#----------------------------------------------------------------------
def onCancelOrder(self, data, reqid):
""""""
pass
#----------------------------------------------------------------------
def onQueryOpenOrders(self, data, reqid):
""""""
for d in data:
order = VtOrderData()
order.gatewayName = self.gatewayName
order.symbol = d['symbol']
order.exchange = EXCHANGE_BINANCE
order.vtSymbol = '.'.join([order.symbol, order.exchange])
order.orderID = d['clientOrderId']
order.vtOrderID = '.'.join([order.gatewayName, order.orderID])
order.direction = directionMapReverse[d['side']]
order.price = float(d['price'])
order.totalVolume = float(d['origQty'])
order.tradedVolume = float(d['executedQty'])
date, order.orderTime = self.generateDateTime(d['time'])
order.status = statusMapReverse[d['status']]
self.gateway.onOrder(order)
#----------------------------------------------------------------------
def onQueryAllOrders(self, data, reqid):
""""""
pass
#----------------------------------------------------------------------
def onQueryAccount(self, data, reqid):
""""""
for d in data['balances']:
free = float(d['free'])
locked = float(d['locked'])
if free or locked:
pos = VtPositionData()
pos.gatewayName = self.gatewayName
pos.symbol = d['asset']
pos.exchange = EXCHANGE_BINANCE
pos.vtSymbol = '.'.join([pos.vtSymbol, pos.direction])
pos.direction = DIRECTION_LONG
pos.vtPositionName = '.'.join([pos.symbol, pos.direction])
pos.frozen = locked
pos.position = free + locked
self.gateway.onPosition(pos)
#----------------------------------------------------------------------
def onQueryMyTrades(self, data, reqid):
""""""
pass
#----------------------------------------------------------------------
def onStartStream(self, data, reqid):
""""""
key = data['listenKey']
self.initUserStream(key)
self.writeLog(u'交易推送订阅成功')
#----------------------------------------------------------------------
def onKeepaliveStream(self, data, reqid):
""""""
self.writeLog(u'交易推送刷新成功')
#----------------------------------------------------------------------
def onCloseStream(self, data, reqid):
""""""
self.writeLog(u'交易推送关闭')
#----------------------------------------------------------------------
def onUserData(self, data):
""""""
if data['e'] == 'outboundAccountInfo':
self.onPushAccount(data)
elif data['e'] == 'executionReport':
self.onPushOrder(data)
#----------------------------------------------------------------------
def onPushAccount(self, data):
""""""
for d in data['B']:
free = float(d['f'])
locked = float(d['l'])
if free or locked:
pos = VtPositionData()
pos.gatewayName = self.gatewayName
pos.symbol = d['a']
pos.exchange = EXCHANGE_BINANCE
pos.vtSymbol = '.'.join([pos.vtSymbol, pos.direction])
pos.direction = DIRECTION_LONG
pos.vtPositionName = '.'.join([pos.symbol, pos.direction])
pos.frozen = locked
pos.position = free + locked
self.gateway.onPosition(pos)
#----------------------------------------------------------------------
def onPushOrder(self, d):
""""""
# 委托更新
order = VtOrderData()
order.gatewayName = self.gatewayName
order.symbol = d['s']
order.exchange = EXCHANGE_BINANCE
order.vtSymbol = '.'.join([order.symbol, order.exchange])
if d['C'] != 'null':
order.orderID = d['C'] # 撤单原始委托号
else:
order.orderID = d['c']
order.vtOrderID = '.'.join([order.gatewayName, order.orderID])
order.direction = directionMapReverse[d['S']]
order.price = float(d['p'])
order.totalVolume = float(d['q'])
order.tradedVolume = float(d['z'])
date, order.orderTime = self.generateDateTime(d['T'])
order.status = statusMapReverse[d['X']]
self.gateway.onOrder(order)
# 成交更新
if float(d['l']):
trade = VtTradeData()
trade.gatewayName = self.gatewayName
trade.symbol = order.symbol
trade.exchange = order.exchange
trade.vtSymbol = order.vtSymbol
trade.orderID = order.orderID
trade.vtOrderID = order.vtOrderID
trade.tradeID = str(d['t'])
trade.vtTradeID = '.'.join([trade.gatewayName, trade.tradeID])
trade.direction = order.direction
trade.price = float(d['L'])
trade.volume = float(d['l'])
date, trade.tradeTime = self.generateDateTime(d['E'])
self.gateway.onTrade(trade)
#----------------------------------------------------------------------
def onMarketData(self, data):
""""""
name = data['stream']
symbol, channel = name.split('@')
symbol = symbol.upper()
if symbol in self.tickDict:
tick = self.tickDict[symbol]
else:
tick = VtTickData()
tick.gatewayName = self.gatewayName
tick.symbol = symbol
tick.exchange = EXCHANGE_BINANCE
tick.vtSymbol = '.'.join([tick.symbol, tick.exchange])
self.tickDict[symbol] = tick
data = data['data']
if channel == 'ticker':
tick.volume = float(data['v'])
tick.openPrice = float(data['o'])
tick.highPrice = float(data['h'])
tick.lowPrice = float(data['l'])
tick.lastPrice = float(data['c'])
tick.date, tick.time = self.generateDateTime(data['E'])
else:
tick.askPrice1, tick.askVolume1, buf = data['asks'][0]
tick.askPrice2, tick.askVolume2, buf = data['asks'][1]
tick.askPrice3, tick.askVolume3, buf = data['asks'][2]
tick.askPrice4, tick.askVolume4, buf = data['asks'][3]
tick.askPrice5, tick.askVolume5, buf = data['asks'][4]
tick.bidPrice1, tick.bidVolume1, buf = data['bids'][0]
tick.bidPrice2, tick.bidVolume2, buf = data['bids'][1]
tick.bidPrice3, tick.bidVolume3, buf = data['bids'][2]
tick.bidPrice4, tick.bidVolume4, buf = data['bids'][3]
tick.bidPrice5, tick.bidVolume5, buf = data['bids'][4]
tick.askPrice1 = float(tick.askPrice1)
tick.askPrice2 = float(tick.askPrice2)
tick.askPrice3 = float(tick.askPrice3)
tick.askPrice4 = float(tick.askPrice4)
tick.askPrice5 = float(tick.askPrice5)
tick.bidPrice1 = float(tick.bidPrice1)
tick.bidPrice2 = float(tick.bidPrice2)
tick.bidPrice3 = float(tick.bidPrice3)
tick.bidPrice4 = float(tick.bidPrice4)
tick.bidPrice5 = float(tick.bidPrice5)
tick.askVolume1 = float(tick.askVolume1)
tick.askVolume2 = float(tick.askVolume2)
tick.askVolume3 = float(tick.askVolume3)
tick.askVolume4 = float(tick.askVolume4)
tick.askVolume5 = float(tick.askVolume5)
tick.bidVolume1 = float(tick.bidVolume1)
tick.bidVolume2 = float(tick.bidVolume2)
tick.bidVolume3 = float(tick.bidVolume3)
tick.bidVolume4 = float(tick.bidVolume4)
tick.bidVolume5 = float(tick.bidVolume5)
self.gateway.onTick(copy(tick))
#----------------------------------------------------------------------
def onDataStreamError(self, msg):
""""""
self.writeLog(msg)
#----------------------------------------------------------------------
def onUserStreamError(self, msg):
""""""
self.writeLog(msg)
#----------------------------------------------------------------------
def generateDateTime(self, s):
"""生成时间"""
dt = datetime.fromtimestamp(float(s)/1e3)
time = dt.strftime("%H:%M:%S.%f")
date = dt.strftime("%Y%m%d")
return date, time
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
""""""
orderReq.volume = 0.02
self.orderId += 1
orderId = self.date + str(self.orderId).rjust(6, '0')
vtOrderID = '.'.join([self.gatewayName, orderId])
side = directionMap.get(orderReq.direction, '')
type_ = priceTypeMap.get(orderReq.priceType, PRICETYPE_LIMITPRICE)
self.newOrder(orderReq.symbol, side, type_, orderReq.price,
orderReq.volume, 'GTC', newClientOrderId=orderId)
return vtOrderID
#----------------------------------------------------------------------
def cancel(self, cancelOrderReq):
""""""
self.cancelOrder(cancelOrderReq.symbol, origClientOrderId=cancelOrderReq.orderID)