diff --git a/vnpy/api/binance/client.py b/vnpy/api/binance/client.py index 0f37fe2f..70a49220 100644 --- a/vnpy/api/binance/client.py +++ b/vnpy/api/binance/client.py @@ -6,6 +6,7 @@ import hmac import requests import six import time +import sys from .exceptions import BinanceAPIException, BinanceRequestException, BinanceWithdrawException if six.PY2: @@ -68,7 +69,7 @@ class Client(object): ORDER_RESP_TYPE_RESULT = 'RESULT' ORDER_RESP_TYPE_FULL = 'FULL' - def __init__(self, api_key, api_secret): + def __init__(self, api_key, api_secret, api=None): """Binance API Client constructor :param api_key: Api Key @@ -80,11 +81,37 @@ class Client(object): self.API_KEY = api_key self.API_SECRET = api_secret + self.api = api + self.DEBUG = False self.session = self._init_session() # init DNS and SSL cert self.ping() + + def writeLog(self, content): + content = 'client ' + content + if not self.api: + print(content) + return + if not hasattr(self.api, 'writeLog'): + print(content) + return + + self.api.writeLog(content) + + def writeError(self, content, error_id=0): + content = 'client ' + content + if not self.api: + print(u'{},error_id:{}'.format(content, error_id) ,file=sys.stderr) + return + + if not hasattr(self.api, 'writeError'): + print(u'{},error_id:{}'.format(content, error_id), file=sys.stderr) + return + + self.api.writeError(content, error_id) + def _init_session(self): session = requests.session() session.headers.update({'Accept': 'application/json', @@ -127,14 +154,12 @@ class Client(object): def _request(self, method, uri, signed, force_params=False, **kwargs): data = kwargs.get('data', None) - #if data is None: - # kwargs['data'] = data - if data and isinstance(data, dict): + if data is None: kwargs['data'] = data + if data and isinstance(data, dict): + kwargs['data'] = data if signed: - if data is None: - kwargs['data'] = {} # generate signature kwargs['data']['timestamp'] = int(time.time() * 1000) kwargs['data']['recvWindow'] = 20000 @@ -145,13 +170,18 @@ class Client(object): del(kwargs['data']) # kwargs["verify"] = False # I don't know whay this is error - print('_request:method:{} uri:{},{}'.format(method, uri, kwargs)) + if self.DEBUG: + self.writeLog('_request:method:{} uri:{},{}'.format(method, uri, kwargs)) + response = getattr(self.session, method)(uri, **kwargs ) + + if self.DEBUG: + self.writeLog(str(response)) + return self._handle_response(response , uri , **kwargs ) def _request_api(self, method, path, signed=False, version=PUBLIC_API_VERSION, **kwargs): uri = self._create_api_uri(path, signed, version) - return self._request(method, uri, signed, **kwargs) def _request_withdraw_api(self, method, path, signed=False, **kwargs): diff --git a/vnpy/api/binance/vnbinance.py b/vnpy/api/binance/vnbinance.py index 58495388..a119c3ef 100644 --- a/vnpy/api/binance/vnbinance.py +++ b/vnpy/api/binance/vnbinance.py @@ -84,13 +84,18 @@ class BinanceSpotApi(object): self.active = False # API工作状态 - self.DEBUG = True + self.DEBUG = False + + self.writeLog = None + self.writeError = None + #---------------------------------------------------------------------- def connect_Subpot(self, apiKey , secretKey ): self.apiKey = apiKey self.secretKey = secretKey - self.client = Client( apiKey , secretKey) + self.client = Client( apiKey , secretKey, self) + self.client.DEBUG = self.DEBUG self.bm = BinanceSocketManager(self.client) @@ -105,33 +110,31 @@ class BinanceSpotApi(object): def processQueue(self): """处理请求队列中的请求""" while self.active: - #req = self.reqQueue.get(block=True, timeout=0.001) # 获取请求的阻塞为一秒 + if len(self.reqQueue) == 0: continue + (Type , req) = self.reqQueue[0] - try: - callback = req['callback'] - reqID = req['reqID'] - - try: - data = self.processRequest(req) - # 请求成功 - if data != None : - if self.DEBUG: - print(callback.__name__) - callback(data, req, reqID) - except Exception as ex: - print(u'processQueue exception:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr) + if req is None: self.reqQueue.pop(0) - sleep(0.1) - #except BinanceAPIException as ex: - # print(u'BinanceAPIException:{},{}'.format( str(ex),traceback.format_exc()),file=sys.stderr) - #except BinanceRequestException as ex: - # print(u'BinanceRequestException:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr) + continue + + callback = req.get('callback', None) + reqID = req.get('reqID', None) + + try: + data = self.processRequest(req) + # 请求成功 + if data != None : + if self.DEBUG: + self.writeLog(callback.__name__) + callback(data, req, reqID) except Exception as ex: - self.onAllError(ex,req,reqID) - print(u'processQueue exception:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr) + self.writeError(u'processQueue exception:{},{}'.format(str(ex), traceback.format_exc())) + + self.reqQueue.pop(0) + sleep(0.1) #---------------------------------------------------------------------- def processRequest(self, req): @@ -140,6 +143,7 @@ class BinanceSpotApi(object): :param req: :return: """ + try: method = req['method'] reqID = req["reqID"] @@ -233,7 +237,7 @@ class BinanceSpotApi(object): except Exception as ex: if req is not None or reqID is not None: self.onAllError(ex, req, reqID) - print(u'processRequest exception:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr) + self.writeError(u'processRequest exception:{},{}'.format(str(ex), traceback.format_exc())) # pass #---------------------------------------------------------------------- @@ -343,21 +347,21 @@ class BinanceSpotApi(object): # print "self.bm != None:" symbol = self.legalSymbolLower(symbol) symbol = symbolFromOtherExchangesToBinance(symbol) - self.bm.start_symbol_ticker_socket( symbol , self.onPreDealTicker) + self.bm.start_symbol_ticker_socket(symbol, self.onPreDealTicker) #---------------------------------------------------------------------- def subscribeSpotDepth(self, symbol): if self.bm != None: symbol = self.legalSymbolLower(symbol) symbol = symbolFromOtherExchangesToBinance(symbol) - self.bm.start_depth_socket(symbol , self.onPreDealDepth) + self.bm.start_depth_socket(symbol, self.onPreDealDepth) #---------------------------------------------------------------------- def subscribeSpotTrades(self, symbol): if self.bm != None: symbol = self.legalSymbolLower(symbol) symbol = symbolFromOtherExchangesToBinance(symbol) - self.bm.start_trade_socket(symbol , self.onPreDealTrades) + self.bm.start_trade_socket(symbol, self.onPreDealTrades) #---------------------------------------------------------------------- def http_get_request(self, url, params, add_to_headers=None): @@ -374,10 +378,10 @@ class BinanceSpotApi(object): if response.status_code == 200: return response.json() else: - return {"status":"fail"} + return {"status": "fail"} except Exception as e: - print("httpGet failed, detail is:%s" %e) - return {"status":"fail","msg":e} + self.writeError('httpGet failed, detail is:{}'.format(str(e))) + return {"status": "fail", "msg":e} # limit in [5, 10, 20, 50, 100, 500, 1000] #---------------------------------------------------------------------- @@ -402,7 +406,7 @@ class BinanceSpotApi(object): #---------------------------------------------------------------------- def onAllError(self, ex , req , reqID): - print( "onAllError" + str(ex)) + self.writeError("onAllError" + str(ex)) #---------------------------------------------------------------------- def onPreDealAllTicker(self, msg): @@ -413,7 +417,7 @@ class BinanceSpotApi(object): #---------------------------------------------------------------------- def onAllTicker(self,msg): """币安支持所有 ticker 同时socket过来""" - print(u'onAllTicker:'.format(msg)) + self.writeLog(u'onAllTicker:'.format(msg)) # ---------------------------------------------------------------------- def onPreDealTicker(self, msg): @@ -423,7 +427,7 @@ class BinanceSpotApi(object): #---------------------------------------------------------------------- def onTick(self, msg): - print(u'onTick:'.format(msg)) + self.writeLog(u'onTick:'.format(msg)) # ---------------------------------------------------------------------- def onPreDealDepth(self, msg): @@ -432,7 +436,7 @@ class BinanceSpotApi(object): self.onDepth(msg) #---------------------------------------------------------------------- def onDepth(self, msg): - print(u'onDepth:'.format(msg)) + self.writeLog(u'onDepth:'.format(msg)) # ---------------------------------------------------------------------- def onPreDealTrades(self, msg): @@ -441,36 +445,36 @@ class BinanceSpotApi(object): self.onTrades(msg) #---------------------------------------------------------------------- def onTrades(self, msg): - print(u'onTrades:{}'.format(msg)) + self.writeLog(u'onTrades:{}'.format(msg)) #---------------------------------------------------------------------- def onGetAccount(self, data, req, reqID): - print(u'onGetAccount:'.format(data, req, reqID)) + self.writeLog(u'onGetAccount:'.format(data, req, reqID)) #---------------------------------------------------------------------- def onGetOpenOrders(self, data, req, reqID): - print(u'onGetOpenOrders:ata:{}, req:{}, reqID:{}'.format(data, req, reqID)) + self.writeLog(u'onGetOpenOrders:ata:{}, req:{}, reqID:{}'.format(data, req, reqID)) #---------------------------------------------------------------------- def onGetAllOrders(self, data, req, reqID): - print(u'onGetAllOrders:data:{}, req:{}, reqID:{}'.format(data, req, reqID)) + self.writeLog(u'onGetAllOrders:data:{}, req:{}, reqID:{}'.format(data, req, reqID)) #---------------------------------------------------------------------- def onGetBuyOrder(self, data, req, reqID): - print(u'onGetBuyOrder:data:{}, req:{}, reqID:{}'.format(data, req, reqID)) + self.writeLog(u'onGetBuyOrder:data:{}, req:{}, reqID:{}'.format(data, req, reqID)) #---------------------------------------------------------------------- def onGetSellOrder(self, data, req, reqID): - print(u'onGetSellOrder:data:{}, req:{}, reqID:{}'.format(data, req, reqID)) + self.writeLog(u'onGetSellOrder:data:{}, req:{}, reqID:{}'.format(data, req, reqID)) #---------------------------------------------------------------------- def onGetCancelOrder(self, data, req, reqID): - print(u'onGetCancelOrder:data:{},req:{},reqId:{}'.format(data, req, reqID)) + self.writeLog(u'onGetCancelOrder:data:{},req:{},reqId:{}'.format(data, req, reqID)) #---------------------------------------------------------------------- def onExchangeInfo(self, data, req, reqID): - print(u'onExchangeInfo:data:{},req:{},reqId:{}'.format(data, req, reqID)) + self.writeLog(u'onExchangeInfo:data:{},req:{},reqId:{}'.format(data, req, reqID)) # ---------------------------------------------------------------------- def onTradeOrder(self, data, req, reqID): - print (u'onTradeOrder:{}'.format(data)) \ No newline at end of file + self.writeLog (u'onTradeOrder:{}'.format(data)) \ No newline at end of file diff --git a/vnpy/trader/gateway/binanceGateway/binanceGateway.py b/vnpy/trader/gateway/binanceGateway/binanceGateway.py index a5aefc74..e300a055 100644 --- a/vnpy/trader/gateway/binanceGateway/binanceGateway.py +++ b/vnpy/trader/gateway/binanceGateway/binanceGateway.py @@ -27,12 +27,18 @@ from vnpy.trader.vtConstant import STATUS_UNKNOWN, STATUS_REJECTED, STATUS_ALLTR ''' class BinanceGateway(VtGateway): """Binance 接口""" - #---------------------------------------------------------------------- + # ---------------------------------------------------------------------- def __init__(self, eventEngine , gatewayName='BINANCE'): """Constructor""" super(BinanceGateway, self).__init__(eventEngine, gatewayName) + + # 创建现货交易API实例 self.api_spot = BinanceApi(self) + # 绑定写入日志和错误 + self.api_spot.writeLog = self.writeLog + self.api_spot.writeError = self.writeError + self.connected = False self.qryEnabled = True @@ -46,7 +52,8 @@ class BinanceGateway(VtGateway): # 消息调试 self.log_message = False - #---------------------------------------------------------------------- + + # ---------------------------------------------------------------------- def connect(self): try: f = open(self.filePath, 'r') @@ -61,22 +68,28 @@ class BinanceGateway(VtGateway): apiKey = str(setting['accessKey']) secretKey = str(setting['secretKey']) self.interval = float(setting['interval']) - - self.api_spot.setAccount(self.accountID) - self.log_message = setting['log_message'] if 'log_message' in setting else False + self.api_spot.setAccount(self.accountID) except KeyError: self.writeLog(u'BINANCE连接配置缺少字段,请检查') return self.api_spot.active = True - self.api_spot.connect_Subpot( apiKey , secretKey) + + if self.log_message: + self.api_spot.DEBUG = True + + self.api_spot.connect_Subpot(apiKey, secretKey) self.api_spot.spotExchangeInfo() - # sub = VtSubscribeReq() - # sub.symbol = "etc_btc.BINANCE" - # self.subscribe(sub) + sub = VtSubscribeReq() + sub.symbol = "btc_usdt.BINANCE" + self.subscribe(sub) + + sub = VtSubscribeReq() + sub.symbol = "eth_usdt.BINANCE" + self.subscribe(sub) self.writeLog(u'{}接口初始化成功'.format(self.gatewayName)) @@ -84,7 +97,7 @@ class BinanceGateway(VtGateway): self.initQuery() self.startQuery() - #---------------------------------------------------------------------- + # ---------------------------------------------------------------------- def subscribe(self, subscribeReq): """订阅行情,自动订阅全部行情,无需实现""" self.api_spot.register(subscribeReq) @@ -95,7 +108,7 @@ class BinanceGateway(VtGateway): try: return self.api_spot.sendOrder(orderReq) except Exception as ex: - print(u'send order Exception:{},{}'.format(str(ex),traceback.format_exc()),file=sys.stderr) + self.writeError(u'send order Exception:{},{}'.format(str(ex),traceback.format_exc())) #---------------------------------------------------------------------- def cancelOrder(self, cancelOrderReq): @@ -219,8 +232,8 @@ class BinanceApi(BinanceSpotApi): symbol = (subscribeReq.symbol.split('.'))[0] if symbol not in self.registerSymbolSets: self.registerSymbolSets.add( symbol ) - self.subscribeSpotTicker( symbol ) - self.subscribeSpotDepth( symbol ) + self.subscribeSpotTicker( symbol ) + self.subscribeSpotDepth( symbol ) #---------------------------------------------------------------------- def setAccount(self, useAccountID): @@ -862,14 +875,16 @@ BREAK status = use_order["status"] side = use_order["side"] tradedVolume = float(use_order["executedQty"]) + use_dt , use_date, now_time = self.generateDateTime(use_order["time"]) # now_time = self.generateDateTime(use_order["time"]) if systemID in local_system_dict_keys: localID = self.systemLocalDict[systemID] order = self.workingOrderDict.get(localID, None) if order != None: - bef_has_volume = self.tradedVolumeDict.get(localID , 0.0) + bef_has_volume = self.tradedVolumeDict.get(localID, 0.0) newTradeVolume = tradedVolume - bef_has_volume + self.writeLog('{} 成交:{} ,之前累计成交:{},当次成交:{}'.format(localID, tradedVolume, bef_has_volume, newTradeVolume)) order.tradedVolume = tradedVolume if newTradeVolume > 0.000001: @@ -880,7 +895,7 @@ BREAK self.tradeID += 1 trade.tradeID = str(self.tradeID) - trade.vtTradeID = '.'.join([ trade.gatewayName,trade.tradeID]) + trade.vtTradeID = '.'.join([trade.gatewayName, trade.tradeID]) trade.orderID = order.orderID trade.vtOrderID = order.vtOrderID @@ -913,6 +928,7 @@ BREAK del self.systemLocalDict[systemID] del self.workingOrderDict[localID] + # 部分成交 elif status == "PARTIALLY_FILLED": order.status = STATUS_PARTTRADED self.gateway.onOrder(order) @@ -922,7 +938,6 @@ BREAK self.gateway.onOrder(order) else: # 说明是以前发的单子 - symbol_pair = systemSymbolToVnSymbol(use_order["symbol"]) @@ -960,7 +975,7 @@ BREAK self.gateway.onOrder(order) - #---------------------------------------------------------------------- + # ---------------------------------------------------------------------- def cancel(self, req): localID = req.orderID if localID in self.localSystemDict: diff --git a/vnpy/trader/gateway/okexGateway/okexGateway.py b/vnpy/trader/gateway/okexGateway/okexGateway.py index f98df5c3..3c50c2e2 100644 --- a/vnpy/trader/gateway/okexGateway/okexGateway.py +++ b/vnpy/trader/gateway/okexGateway/okexGateway.py @@ -160,7 +160,12 @@ class OkexGateway(VtGateway): :return: """ try: - if subscribeReq.symbol in SPOT_SYMBOL: + symbol_pair_gateway = subscribeReq.symbol + arr = symbol_pair_gateway.split('.') + # 提取品种对 eth_usdt + symbol_pair = arr[0] + + if symbol_pair in SPOT_SYMBOL: if self.api_spot and self.spot_connected: self.api_spot.subscribe(subscribeReq) else: