From ba647a1b631fcf328579a38ddf6bbf88c0c12be5 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Mon, 4 Jun 2018 14:12:25 +0800 Subject: [PATCH] =?UTF-8?q?[Add]=E6=96=B0=E5=A2=9E=E5=B8=81=E5=AE=89?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/api/binance/__init__.py | 1 + vnpy/api/binance/test.py | 43 ++ vnpy/api/binance/vnbinance.py | 624 ++++++++++++++++++ .../binanceGateway/BINANCE_connect.json | 5 + .../trader/gateway/binanceGateway/__init__.py | 10 + .../gateway/binanceGateway/binanceGateway.py | 498 ++++++++++++++ 6 files changed, 1181 insertions(+) create mode 100644 vnpy/api/binance/__init__.py create mode 100644 vnpy/api/binance/test.py create mode 100644 vnpy/api/binance/vnbinance.py create mode 100644 vnpy/trader/gateway/binanceGateway/BINANCE_connect.json create mode 100644 vnpy/trader/gateway/binanceGateway/__init__.py create mode 100644 vnpy/trader/gateway/binanceGateway/binanceGateway.py diff --git a/vnpy/api/binance/__init__.py b/vnpy/api/binance/__init__.py new file mode 100644 index 00000000..57a962fc --- /dev/null +++ b/vnpy/api/binance/__init__.py @@ -0,0 +1 @@ +from .vnbinance import BinanceApi \ No newline at end of file diff --git a/vnpy/api/binance/test.py b/vnpy/api/binance/test.py new file mode 100644 index 00000000..83dd365c --- /dev/null +++ b/vnpy/api/binance/test.py @@ -0,0 +1,43 @@ +from time import sleep + +from vnbinance import BinanceApi + + +if __name__ == '__main__': + apiKey = '' + secretKey = '' + + api = BinanceApi() + api.init(apiKey, secretKey) + api.start() + + #api.queryPing() + #api.queryTime() + #api.queryExchangeInfo() + + api.queryDepth('BTCUSDT') + #api.queryDepth('BTCUSDT') + #api.queryTrades('BTCUSDT') + #api.queryAggTrades('BTCUSDT') + #api.queryKlines('BTCUSDT', '1m') + #api.queryTicker24HR() + #api.queryTickerPrice() + #api.queryBookTicker() + + api.queryAccount() + #api.queryOrder('BTCUSDT', '1231231') + #api.queryOpenOrders('BTCUSDT') + #api.queryAllOrders('BTCUSDT') + #api.queryMyTrades('BTCUSDT') + + #api.startStream() + #api.keepaliveStream('12312312') + #api.closeStream('123213') + + #api.newOrder('BTCUSDT', 'BUY', 'LIMIT', 3000, 1, 'GTC') + #api.cancelOrder('BTCUSDT', '132213123') + + #api.initDataStream(['btcusdt@ticker', 'btcusdt@depth5']) + #api.initUserStream('NXvaiFwZz2LuKqINVerKOnWaQQG1JhdQNejiZKY9Kmgk4lZgTvm3nRAnXJK7') + + raw_input() \ No newline at end of file diff --git a/vnpy/api/binance/vnbinance.py b/vnpy/api/binance/vnbinance.py new file mode 100644 index 00000000..8e9ade96 --- /dev/null +++ b/vnpy/api/binance/vnbinance.py @@ -0,0 +1,624 @@ +# encoding: UTF-8 + +import json +import requests +import hmac +import hashlib +import traceback + +from queue import Queue, Empty +from threading import Thread +from multiprocessing.dummy import Pool +from time import time, sleep +from urllib import urlencode + +from websocket import create_connection + + + +REST_ENDPOINT = 'https://www.binance.com' +DATASTREAM_ENDPOINT = 'wss://stream.binance.com:9443/stream?streams=' +USERSTREAM_ENDPOINT = 'wss://stream.binance.com:9443/ws/' + + + +######################################################################## +class BinanceApi(object): + """""" + + ################################################### + ## Basic Function + ################################################### + + #---------------------------------------------------------------------- + def __init__(self): + """Constructor""" + self.apiKey = '' + self.secretKey = '' + + self.active = False + self.reqid = 0 + self.queue = Queue() + self.pool = None + + self.headers = {} + self.secret = '' + self.recvWindow = 5000 + + self.dataStreamNameList = [] + self.dataStreamUrl = '' + self.dataStreamActive = False + self.dataStreamWs = None + self.dataStreamThread = None + + self.userStreamKey = '' + self.userStreamUrl = '' + self.userStreamActive = False + self.userStreamWs = None + self.userStreamThread = None + + self.keepaliveCount = 0 + self.keepaliveThread = None + + #---------------------------------------------------------------------- + def init(self, apiKey, secretKey, recvWindow=5000): + """""" + self.apiKey = apiKey + self.secretKey = secretKey + + self.headers['X-MBX-APIKEY'] = apiKey + self.secret = bytes(secretKey.encode('utf-8')) + self.recvWindow = recvWindow + + #---------------------------------------------------------------------- + def start(self, n=10): + """""" + if self.active: + return + + self.active = True + + self.pool = Pool(n) + self.pool.map_async(self.run, range(n)) + + #---------------------------------------------------------------------- + def close(self): + """""" + self.active = False + self.pool.close() + self.pool.join() + + #---------------------------------------------------------------------- + def request(self, method, path, params=None, signed=False, stream=False): + """""" + if not signed: + url = REST_ENDPOINT + path + headers = {} + else: + if not stream: + params['recvWindow'] = self.recvWindow + params['timestamp'] = int(time()*1000) + query = urlencode(sorted(params.items())) + + signature = hmac.new(self.secret, query.encode('utf-8'), + hashlib.sha256).hexdigest() + query += "&signature={}".format(signature) + + url = REST_ENDPOINT + path + '?' + query + params = None # 参数添加到query中后,清空参数字典 + else: + if params: + query = urlencode(sorted(params.items())) + url = REST_ENDPOINT + path + '?' + query + params = None + else: + url = REST_ENDPOINT + path + + headers = self.headers + + try: + resp = requests.request(method, url, params=params, headers=headers) + + if resp.status_code == 200: + return True, resp.json() + else: + error = { + 'method': method, + 'params': params, + 'code': resp.status_code, + 'msg': resp.json()['msg'] + } + return False, error + except Exception as e: + error = { + 'method': method, + 'params': params, + 'code': e, + 'msg': traceback.format_exc() + } + return False, error + + #---------------------------------------------------------------------- + def addReq(self, method, path, params, callback, signed=False, stream=False): + """添加请求""" + self.reqid += 1 + req = (method, path, params, callback, signed, stream, self.reqid) + self.queue.put(req) + return self.reqid + + #---------------------------------------------------------------------- + def processReq(self, req): + """""" + method, path, params, callback, signed, stream, reqid = req + result, data = self.request(method, path, params, signed, stream) + + if result: + callback(data, reqid) + else: + self.onError(data, reqid) + + #---------------------------------------------------------------------- + def run(self, n): + """""" + while self.active: + try: + req = self.queue.get(timeout=1) + self.processReq(req) + except Empty: + pass + + ################################################### + ## REST Function + ################################################### + + #---------------------------------------------------------------------- + def queryPing(self): + """""" + path = '/api/v1/ping' + return self.addReq('GET', path, {}, self.onQueryPing) + + #---------------------------------------------------------------------- + def queryTime(self): + """""" + path = '/api/v1/time' + return self.addReq('GET', path, {}, self.onQueryTime) + + #---------------------------------------------------------------------- + def queryExchangeInfo(self): + """""" + path = '/api/v1/exchangeInfo' + return self.addReq('GET', path, {}, self.onQueryExchangeInfo) + + #---------------------------------------------------------------------- + def queryDepth(self, symbol, limit=0): + """""" + path = '/api/v1/depth' + params = {'symbol': symbol} + if limit: + params['limit'] = limit + return self.addReq('GET', path, params, self.onQueryDepth) + + #---------------------------------------------------------------------- + def queryTrades(self, symbol, limit=0): + """""" + path = '/api/v1/trades' + params = {'symbol': symbol} + if limit: + params['limit'] = limit + return self.addReq('GET', path, params, self.onQueryTrades) + + #---------------------------------------------------------------------- + def queryAggTrades(self, symbol, fromId=0, startTime=0, endTime=0, limit=0): + """""" + path = '/api/v1/aggTrades' + + params = {'symbol': symbol} + if fromId: + params['fromId'] = fromId + if startTime: + params['startTime'] = startTime + if endTime: + params['endTime'] = endTime + if limit: + params['limit'] = limit + + return self.addReq('GET', path, params, self.onQueryAggTrades) + + #---------------------------------------------------------------------- + def queryKlines(self, symbol, interval, limit=0, startTime=0, endTime=0): + """""" + path = '/api/v1/klines' + + params = { + 'symbol': symbol, + 'interval': interval + } + if limit: + params['limit'] = limit + if startTime: + params['startTime'] = startTime + if endTime: + params['endTime'] = endTime + + return self.addReq('GET', path, params, self.onQueryKlines) + + #---------------------------------------------------------------------- + def queryTicker24HR(self, symbol=''): + """""" + path = '/api/v1/ticker/24hr' + params = {} + if symbol: + params['symbol'] = symbol + return self.addReq('GET', path, params, self.onQueryTicker24HR) + + #---------------------------------------------------------------------- + def queryTickerPrice(self, symbol=''): + """""" + path = '/api/v3/ticker/price' + params = {} + if symbol: + params['symbol'] = symbol + return self.addReq('GET', path, params, self.onQueryTickerPrice) + + #---------------------------------------------------------------------- + def queryBookTicker(self, symbol=''): + """""" + path = '/api/v3/ticker/bookTicker' + params = {} + if symbol: + params['symbol'] = symbol + return self.addReq('GET', path, params, self.onQueryBookTicker) + + #---------------------------------------------------------------------- + def newOrder(self, symbol, side, type_, price, quantity, timeInForce, + newClientOrderId='', stopPrice=0, icebergQty=0, newOrderRespType=''): + """""" + path = '/api/v3/order' + + params = { + 'symbol': symbol, + 'side': side, + 'type': type_, + 'price': price, + 'quantity': quantity, + 'timeInForce': timeInForce + } + if newClientOrderId: + params['newClientOrderId'] = newClientOrderId + if timeInForce: + params['timeInForce'] = timeInForce + if stopPrice: + params['stopPrice'] = stopPrice + if icebergQty: + params['icebergQty'] = icebergQty + if newOrderRespType: + params['newOrderRespType'] = newOrderRespType + + return self.addReq('POST', path, params, self.onNewOrder, signed=True) + + #---------------------------------------------------------------------- + def queryOrder(self, symbol, orderId=0, origClientOrderId=0): + """""" + path = '/api/v3/order' + params = {'symbol': symbol} + if orderId: + params['orderId'] = orderId + if origClientOrderId: + params['origClientOrderId'] = origClientOrderId + return self.addReq('GET', path, params, self.onQueryOrder, signed=True) + + #---------------------------------------------------------------------- + def cancelOrder(self, symbol, orderId=0, origClientOrderId='', + newClientOrderId=''): + """""" + path = '/api/v3/order' + params = {'symbol': symbol} + if orderId: + params['orderId'] = orderId + if origClientOrderId: + params['origClientOrderId'] = origClientOrderId + if newClientOrderId: + params['newClientOrderId'] = newClientOrderId + return self.addReq('DELETE', path, params, self.onCancelOrder, signed=True) + + #---------------------------------------------------------------------- + def queryOpenOrders(self, symbol=''): + """""" + path = '/api/v3/openOrders' + params = {} + if symbol: + params['symbol'] = symbol + return self.addReq('GET', path, params, self.onQueryOpenOrders, signed=True) + + #---------------------------------------------------------------------- + def queryAllOrders(self, symbol, orderId=0, limit=0): + """""" + path = '/api/v3/allOrders' + params = {'symbol': symbol} + if orderId: + params['orderId'] = orderId + if limit: + params['limit'] = limit + return self.addReq('GET', path, params, self.onQueryAllOrders, signed=True) + + #---------------------------------------------------------------------- + def queryAccount(self): + """""" + path = '/api/v3/account' + params = {} + return self.addReq('GET', path, params, self.onQueryAccount, signed=True) + + #---------------------------------------------------------------------- + def queryMyTrades(self, symbol, limit=0, fromId=0): + """""" + path = '/api/v3/myTrades' + params = {'symbol': symbol} + if limit: + params['limit'] = limit + if fromId: + params['fromId'] = fromId + return self.addReq('GET', path, params, self.onQueryMyTrades, signed=True) + + #---------------------------------------------------------------------- + def startStream(self): + """""" + path = '/api/v1/userDataStream' + return self.addReq('POST', path, {}, self.onStartStream, signed=True, stream=True) + + #---------------------------------------------------------------------- + def keepaliveStream(self, listenKey): + """""" + path = '/api/v1/userDataStream' + params = {'listenKey': listenKey} + return self.addReq('PUT', path, params, self.onKeepaliveStream, signed=True, stream=True) + + #---------------------------------------------------------------------- + def closeStream(self, listenKey): + """""" + path = '/api/v1/userDataStream' + params = {'listenKey': listenKey} + return self.addReq('DELETE', path, params, self.onCloseStream, signed=True, stream=True) + + ################################################### + ## REST Callback + ################################################### + + #---------------------------------------------------------------------- + def onError(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryPing(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryTime(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryExchangeInfo(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryDepth(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryTrades(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryAggTrades(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryKlines(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryTicker24HR(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryTickerPrice(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryBookTicker(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onNewOrder(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryOrder(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onCancelOrder(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryOpenOrders(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryAllOrders(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryAccount(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onQueryMyTrades(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onStartStream(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onKeepaliveStream(self, data, reqid): + """""" + print(data, reqid) + + #---------------------------------------------------------------------- + def onCloseStream(self, data, reqid): + """""" + print(data, reqid) + + ################################################### + ## Websocket Function + ################################################### + + #---------------------------------------------------------------------- + def initDataStream(self, nameList=None): + """""" + if nameList: + self.dataStreamNameList = nameList + s = '/'.join(self.dataStreamNameList) + self.dataStreamUrl = DATASTREAM_ENDPOINT + s + + result = self.connectDataStream() + if result: + self.dataStreamActive = True + self.dataStreamThread = Thread(target=self.runDataStream) + self.dataStreamThread.start() + + #---------------------------------------------------------------------- + def runDataStream(self): + """""" + while self.dataStreamActive: + try: + stream = self.dataStreamWs.recv() + data = json.loads(stream) + self.onMarketData(data) + except: + self.onDataStreamError('Data stream connection lost') + result = self.connectDataStream() + if not result: + self.onDataStreamError(u'Waiting 3 seconds to reconnect') + sleep(3) + else: + self.onDataStreamError(u'Data stream reconnected') + + #---------------------------------------------------------------------- + def closeDataStream(self): + """""" + self.dataStreamActive = False + self.dataStreamThread.join() + + #---------------------------------------------------------------------- + def connectDataStream(self): + """""" + try: + self.dataStreamWs = create_connection(self.dataStreamUrl) + return True + except: + msg = traceback.format_exc() + self.onDataStreamError('Connecting data stream falied: %s' %msg) + return False + + #---------------------------------------------------------------------- + def onDataStreamError(self, msg): + """""" + print msg + + #---------------------------------------------------------------------- + def onMarketData(self, data): + """""" + print data + + #---------------------------------------------------------------------- + def initUserStream(self, key): + """""" + self.userStreamKey = key + self.userStreamUrl = USERSTREAM_ENDPOINT + key + + result = self.connectUserStream() + if result: + self.userStreamActive = True + self.userStreamThread = Thread(target=self.runUserStream) + self.userStreamThread.start() + + self.keepaliveThread = Thread(target=self.runKeepalive) + self.keepaliveThread.start() + + #---------------------------------------------------------------------- + def runUserStream(self): + """""" + while self.userStreamActive: + try: + stream = self.userStreamWs.recv() + data = json.loads(stream) + self.onUserData(data) + except: + self.onUserStreamError('User stream connection lost') + result = self.connectUserStream() + if not result: + self.onUserStreamError(u'Waiting 3 seconds to reconnect') + sleep(3) + else: + self.onUserStreamError(u'User stream reconnected') + + #---------------------------------------------------------------------- + def closeUserStream(self): + """""" + self.userStreamActive = False + self.userStreamThread.join() + self.keepaliveThread.join() + + #---------------------------------------------------------------------- + def connectUserStream(self): + """""" + try: + self.userStreamWs = create_connection(self.userStreamUrl) + return True + except: + msg = traceback.format_exc() + self.onUserStreamError('Connecting user stream falied: %s' %msg) + return False + + #---------------------------------------------------------------------- + def onUserStreamError(self, msg): + """""" + print msg + + #---------------------------------------------------------------------- + def onUserData(self, data): + """""" + print data + + #---------------------------------------------------------------------- + def runKeepalive(self): + """""" + while self.userStreamActive: + self.keepaliveCount += 1 + + if self.keepaliveCount >= 1800: + self.keepaliveCount = 0 + self.keepaliveStream(self.userStreamKey) + + sleep(1) + \ No newline at end of file diff --git a/vnpy/trader/gateway/binanceGateway/BINANCE_connect.json b/vnpy/trader/gateway/binanceGateway/BINANCE_connect.json new file mode 100644 index 00000000..01d370e4 --- /dev/null +++ b/vnpy/trader/gateway/binanceGateway/BINANCE_connect.json @@ -0,0 +1,5 @@ +{ + "apiKey": "", + "secretKey": "", + "symbols": ["BTCUSDT", "ETHUSDT", "ETHBTC"] +} \ No newline at end of file diff --git a/vnpy/trader/gateway/binanceGateway/__init__.py b/vnpy/trader/gateway/binanceGateway/__init__.py new file mode 100644 index 00000000..05256200 --- /dev/null +++ b/vnpy/trader/gateway/binanceGateway/__init__.py @@ -0,0 +1,10 @@ +# encoding: UTF-8 + +from vnpy.trader import vtConstant +from .binanceGateway import BinanceGateway + +gatewayClass = BinanceGateway +gatewayName = 'BINANCE' +gatewayDisplayName = u'币安' +gatewayType = vtConstant.GATEWAYTYPE_BTC +gatewayQryEnabled = True \ No newline at end of file diff --git a/vnpy/trader/gateway/binanceGateway/binanceGateway.py b/vnpy/trader/gateway/binanceGateway/binanceGateway.py new file mode 100644 index 00000000..7bc1e0c4 --- /dev/null +++ b/vnpy/trader/gateway/binanceGateway/binanceGateway.py @@ -0,0 +1,498 @@ +# encoding: UTF-8 + +''' +vnpy.api.binance的gateway接入 +''' + +import os +import json +from datetime import datetime, timedelta +from copy import copy + +from vnpy.api.binance import BinanceApi +from vnpy.trader.vtGateway import * +from vnpy.trader.vtFunction import getJsonPath, getTempPath + + +# 委托状态类型映射 +statusMapReverse = {} +statusMapReverse['NEW'] = STATUS_NOTTRADED +statusMapReverse['PARTIALLY_FILLED'] = STATUS_PARTTRADED +statusMapReverse['FILLED'] = STATUS_ALLTRADED +statusMapReverse['CANCELED'] = STATUS_CANCELLED +statusMapReverse['REJECTED'] = STATUS_REJECTED +statusMapReverse['EXPIRED'] = STATUS_CANCELLED + +# 方向映射 +directionMap = {} +directionMap[DIRECTION_LONG] = 'BUY' +directionMap[DIRECTION_SHORT] = 'SELL' +directionMapReverse = {v:k for k,v in directionMap.items()} + +# 价格类型映射 +priceTypeMap = {} +priceTypeMap[PRICETYPE_LIMITPRICE] = 'LIMIT' +priceTypeMap[PRICETYPE_MARKETPRICE] = 'MARKET' + + + + +#---------------------------------------------------------------------- +def print_dict(d): + """""" + print '-' * 30 + l = d.keys() + l.sort() + for k in l: + print '%s:%s' %(k, d[k]) + + +######################################################################## +class BinanceGateway(VtGateway): + """币安接口""" + + #---------------------------------------------------------------------- + def __init__(self, eventEngine, gatewayName=''): + """Constructor""" + super(BinanceGateway, self).__init__(eventEngine, gatewayName) + + self.api = GatewayApi(self) + + self.qryEnabled = False # 是否要启动循环查询 + + self.fileName = self.gatewayName + '_connect.json' + self.filePath = getJsonPath(self.fileName, __file__) + + #---------------------------------------------------------------------- + def connect(self): + """连接""" + try: + f = file(self.filePath) + except IOError: + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = u'读取连接配置出错,请检查' + self.onLog(log) + return + + # 解析json文件 + setting = json.load(f) + try: + apiKey = str(setting['apiKey']) + secretKey = str(setting['secretKey']) + symbols = setting['symbols'] + except KeyError: + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = u'连接配置缺少字段,请检查' + self.onLog(log) + return + + # 创建行情和交易接口对象 + self.api.connect(apiKey, secretKey, symbols) + + # 初始化并启动查询 + #self.initQuery() + + #---------------------------------------------------------------------- + def subscribe(self, subscribeReq): + """订阅行情""" + pass + + #---------------------------------------------------------------------- + def sendOrder(self, orderReq): + """发单""" + return self.api.sendOrder(orderReq) + + #---------------------------------------------------------------------- + def cancelOrder(self, cancelOrderReq): + """撤单""" + self.api.cancel(cancelOrderReq) + + #---------------------------------------------------------------------- + def close(self): + """关闭""" + self.api.close() + + #---------------------------------------------------------------------- + def queryAccount(self): + """""" + self.api.queryAccount() + + #---------------------------------------------------------------------- + def initQuery(self): + """初始化连续查询""" + if self.qryEnabled: + # 需要循环的查询函数列表 + self.qryFunctionList = [self.queryAccount] + + self.qryCount = 0 # 查询触发倒计时 + self.qryTrigger = 1 # 查询触发点 + self.qryNextFunction = 0 # 上次运行的查询函数索引 + + self.startQuery() + + #---------------------------------------------------------------------- + def query(self, event): + """注册到事件处理引擎上的查询函数""" + self.qryCount += 1 + + if self.qryCount > self.qryTrigger: + # 清空倒计时 + self.qryCount = 0 + + # 执行查询函数 + function = self.qryFunctionList[self.qryNextFunction] + function() + + # 计算下次查询函数的索引,如果超过了列表长度,则重新设为0 + self.qryNextFunction += 1 + if self.qryNextFunction == len(self.qryFunctionList): + self.qryNextFunction = 0 + + #---------------------------------------------------------------------- + def startQuery(self): + """启动连续查询""" + self.eventEngine.register(EVENT_TIMER, self.query) + + #---------------------------------------------------------------------- + def setQryEnabled(self, qryEnabled): + """设置是否要启动循环查询""" + self.qryEnabled = qryEnabled + + +######################################################################## +class GatewayApi(BinanceApi): + """API实现""" + + #---------------------------------------------------------------------- + def __init__(self, gateway): + """Constructor""" + super(GatewayApi, self).__init__() + + self.gateway = gateway # gateway对象 + self.gatewayName = gateway.gatewayName # gateway对象名称 + + self.date = datetime.now().strftime('%y%m%d%H%M%S') + self.orderId = 0 + + self.tickDict = {} + + #---------------------------------------------------------------------- + def connect(self, apiKey, secretKey, symbols): + """连接服务器""" + self.init(apiKey, secretKey) + self.start() + self.writeLog(u'交易API启动成功') + + l = [] + for symbol in symbols: + symbol = symbol.lower() + l.append(symbol+'@ticker') + l.append(symbol+'@depth5') + self.initDataStream(l) + self.writeLog(u'行情推送订阅成功') + + self.startStream() + + # 初始化查询 + self.queryExchangeInfo() + self.queryAccount() + + for symbol in symbols: + self.queryOpenOrders(symbol.upper()) + + #---------------------------------------------------------------------- + def writeLog(self, content): + """发出日志""" + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = content + self.gateway.onLog(log) + + #---------------------------------------------------------------------- + def onError(self, data, reqid): + """""" + err = VtErrorData() + err.gatewayName = self.gatewayName + err.errorID = data['code'] + err.errorMsg = data['msg'] + self.gateway.onError(err) + + #---------------------------------------------------------------------- + def onQueryExchangeInfo(self, data, reqid): + """""" + for d in data['symbols']: + if str(d['symbol']) == 'ETHUSDT': + print d + + contract = VtContractData() + contract.gatewayName = self.gatewayName + + contract.symbol = d['symbol'] + contract.exchange = EXCHANGE_BINANCE + contract.vtSymbol = '.'.join([contract.symbol, contract.exchange]) + contract.name = contract.vtSymbol + contract.productClass = PRODUCT_SPOT + contract.size = 1 + + for f in d['filters']: + if f['filterType'] == 'PRICE_FILTER': + contract.priceTick = float(f['tickSize']) + + self.gateway.onContract(contract) + + #---------------------------------------------------------------------- + def onNewOrder(self, data, reqid): + """""" + pass + + #---------------------------------------------------------------------- + def onCancelOrder(self, data, reqid): + """""" + pass + + #---------------------------------------------------------------------- + def onQueryOpenOrders(self, data, reqid): + """""" + for d in data: + order = VtOrderData() + order.gatewayName = self.gatewayName + + order.symbol = d['symbol'] + order.exchange = EXCHANGE_BINANCE + order.vtSymbol = '.'.join([order.symbol, order.exchange]) + + order.orderID = d['clientOrderId'] + order.vtOrderID = '.'.join([order.gatewayName, order.orderID]) + + order.direction = directionMapReverse[d['side']] + order.price = float(d['price']) + order.totalVolume = float(d['origQty']) + order.tradedVolume = float(d['executedQty']) + date, order.orderTime = self.generateDateTime(d['time']) + order.status = statusMapReverse[d['status']] + + self.gateway.onOrder(order) + + #---------------------------------------------------------------------- + def onQueryAllOrders(self, data, reqid): + """""" + pass + + #---------------------------------------------------------------------- + def onQueryAccount(self, data, reqid): + """""" + for d in data['balances']: + free = float(d['free']) + locked = float(d['locked']) + + if free or locked: + pos = VtPositionData() + pos.gatewayName = self.gatewayName + pos.symbol = d['asset'] + pos.exchange = EXCHANGE_BINANCE + pos.vtSymbol = '.'.join([pos.vtSymbol, pos.direction]) + pos.direction = DIRECTION_LONG + pos.vtPositionName = '.'.join([pos.symbol, pos.direction]) + pos.frozen = locked + pos.position = free + locked + self.gateway.onPosition(pos) + + #---------------------------------------------------------------------- + def onQueryMyTrades(self, data, reqid): + """""" + pass + + #---------------------------------------------------------------------- + def onStartStream(self, data, reqid): + """""" + key = data['listenKey'] + self.initUserStream(key) + self.writeLog(u'交易推送订阅成功') + + #---------------------------------------------------------------------- + def onKeepaliveStream(self, data, reqid): + """""" + self.writeLog(u'交易推送刷新成功') + + #---------------------------------------------------------------------- + def onCloseStream(self, data, reqid): + """""" + self.writeLog(u'交易推送关闭') + + #---------------------------------------------------------------------- + def onUserData(self, data): + """""" + if data['e'] == 'outboundAccountInfo': + self.onPushAccount(data) + elif data['e'] == 'executionReport': + self.onPushOrder(data) + + #---------------------------------------------------------------------- + def onPushAccount(self, data): + """""" + for d in data['B']: + free = float(d['f']) + locked = float(d['l']) + + if free or locked: + pos = VtPositionData() + pos.gatewayName = self.gatewayName + pos.symbol = d['a'] + pos.exchange = EXCHANGE_BINANCE + pos.vtSymbol = '.'.join([pos.vtSymbol, pos.direction]) + pos.direction = DIRECTION_LONG + pos.vtPositionName = '.'.join([pos.symbol, pos.direction]) + pos.frozen = locked + pos.position = free + locked + self.gateway.onPosition(pos) + + #---------------------------------------------------------------------- + def onPushOrder(self, d): + """""" + # 委托更新 + order = VtOrderData() + order.gatewayName = self.gatewayName + + order.symbol = d['s'] + order.exchange = EXCHANGE_BINANCE + order.vtSymbol = '.'.join([order.symbol, order.exchange]) + + if d['C'] != 'null': + order.orderID = d['C'] # 撤单原始委托号 + else: + order.orderID = d['c'] + order.vtOrderID = '.'.join([order.gatewayName, order.orderID]) + + order.direction = directionMapReverse[d['S']] + order.price = float(d['p']) + order.totalVolume = float(d['q']) + order.tradedVolume = float(d['z']) + date, order.orderTime = self.generateDateTime(d['T']) + order.status = statusMapReverse[d['X']] + + self.gateway.onOrder(order) + + # 成交更新 + if float(d['l']): + trade = VtTradeData() + trade.gatewayName = self.gatewayName + + trade.symbol = order.symbol + trade.exchange = order.exchange + trade.vtSymbol = order.vtSymbol + trade.orderID = order.orderID + trade.vtOrderID = order.vtOrderID + trade.tradeID = str(d['t']) + trade.vtTradeID = '.'.join([trade.gatewayName, trade.tradeID]) + trade.direction = order.direction + trade.price = float(d['L']) + trade.volume = float(d['l']) + date, trade.tradeTime = self.generateDateTime(d['E']) + + self.gateway.onTrade(trade) + + #---------------------------------------------------------------------- + def onMarketData(self, data): + """""" + name = data['stream'] + symbol, channel = name.split('@') + symbol = symbol.upper() + + if symbol in self.tickDict: + tick = self.tickDict[symbol] + else: + tick = VtTickData() + tick.gatewayName = self.gatewayName + tick.symbol = symbol + tick.exchange = EXCHANGE_BINANCE + tick.vtSymbol = '.'.join([tick.symbol, tick.exchange]) + + self.tickDict[symbol] = tick + + data = data['data'] + if channel == 'ticker': + tick.volume = float(data['v']) + tick.openPrice = float(data['o']) + tick.highPrice = float(data['h']) + tick.lowPrice = float(data['l']) + tick.lastPrice = float(data['c']) + tick.date, tick.time = self.generateDateTime(data['E']) + else: + tick.askPrice1, tick.askVolume1, buf = data['asks'][0] + tick.askPrice2, tick.askVolume2, buf = data['asks'][1] + tick.askPrice3, tick.askVolume3, buf = data['asks'][2] + tick.askPrice4, tick.askVolume4, buf = data['asks'][3] + tick.askPrice5, tick.askVolume5, buf = data['asks'][4] + + tick.bidPrice1, tick.bidVolume1, buf = data['bids'][0] + tick.bidPrice2, tick.bidVolume2, buf = data['bids'][1] + tick.bidPrice3, tick.bidVolume3, buf = data['bids'][2] + tick.bidPrice4, tick.bidVolume4, buf = data['bids'][3] + tick.bidPrice5, tick.bidVolume5, buf = data['bids'][4] + + tick.askPrice1 = float(tick.askPrice1) + tick.askPrice2 = float(tick.askPrice2) + tick.askPrice3 = float(tick.askPrice3) + tick.askPrice4 = float(tick.askPrice4) + tick.askPrice5 = float(tick.askPrice5) + + tick.bidPrice1 = float(tick.bidPrice1) + tick.bidPrice2 = float(tick.bidPrice2) + tick.bidPrice3 = float(tick.bidPrice3) + tick.bidPrice4 = float(tick.bidPrice4) + tick.bidPrice5 = float(tick.bidPrice5) + + tick.askVolume1 = float(tick.askVolume1) + tick.askVolume2 = float(tick.askVolume2) + tick.askVolume3 = float(tick.askVolume3) + tick.askVolume4 = float(tick.askVolume4) + tick.askVolume5 = float(tick.askVolume5) + + tick.bidVolume1 = float(tick.bidVolume1) + tick.bidVolume2 = float(tick.bidVolume2) + tick.bidVolume3 = float(tick.bidVolume3) + tick.bidVolume4 = float(tick.bidVolume4) + tick.bidVolume5 = float(tick.bidVolume5) + + self.gateway.onTick(copy(tick)) + + #---------------------------------------------------------------------- + def onDataStreamError(self, msg): + """""" + self.writeLog(msg) + + #---------------------------------------------------------------------- + def onUserStreamError(self, msg): + """""" + self.writeLog(msg) + + #---------------------------------------------------------------------- + def generateDateTime(self, s): + """生成时间""" + dt = datetime.fromtimestamp(float(s)/1e3) + time = dt.strftime("%H:%M:%S.%f") + date = dt.strftime("%Y%m%d") + return date, time + + #---------------------------------------------------------------------- + def sendOrder(self, orderReq): + """""" + orderReq.volume = 0.02 + + self.orderId += 1 + orderId = self.date + str(self.orderId).rjust(6, '0') + vtOrderID = '.'.join([self.gatewayName, orderId]) + side = directionMap.get(orderReq.direction, '') + type_ = priceTypeMap.get(orderReq.priceType, PRICETYPE_LIMITPRICE) + + self.newOrder(orderReq.symbol, side, type_, orderReq.price, + orderReq.volume, 'GTC', newClientOrderId=orderId) + + return vtOrderID + + #---------------------------------------------------------------------- + def cancel(self, cancelOrderReq): + """""" + self.cancelOrder(cancelOrderReq.symbol, origClientOrderId=cancelOrderReq.orderID) \ No newline at end of file