diff --git a/vnpy/api/okex/vnokex.py b/vnpy/api/okex/vnokex.py index 40c87796..8269a079 100644 --- a/vnpy/api/okex/vnokex.py +++ b/vnpy/api/okex/vnokex.py @@ -9,7 +9,8 @@ import traceback import websocket import requests import sys - +import ssl +from datetime import datetime # API文档 https://github.com/okcoin-okex/OKEx.com-api-docs from vnpy.api.okex.okexData import SPOT_TRADE_SIZE_DICT,SPOT_REST_ERROR_DICT, FUTURES_ERROR_DICT @@ -78,16 +79,20 @@ class OkexApi(object): """重新连接""" # 首先关闭之前的连接 self.close() - - # 再执行重连任务 - self.ws = websocket.WebSocketApp(self.host, - on_message=self.onMessage, - on_error=self.onError, - on_close=self.onClose, - on_open=self.onOpen) - - self.thread = Thread(target=self.ws.run_forever) - self.thread.start() + try: + # 再执行重连任务 + self.ws = websocket.WebSocketApp(self.host, + on_message=self.onMessage, + on_error=self.onError, + on_close=self.onClose, + on_open=self.onOpen) + + kwargs = {'sslopt': {'cert_reqs': ssl.CERT_NONE}} + self.thread = Thread(target=self.ws.run_forever, kwargs=kwargs) + self.thread.start() + except Exception as ex: + print(u'{} OkexApi reconnect exception :{},{}'.format(datetime.now(), str(ex), traceback.format_exc()), + file=sys.stderr) #---------------------------------------------------------------------- def connect(self, apiKey, secretKey, trace=False): @@ -102,20 +107,23 @@ class OkexApi(object): self.host = OKEX_USD_SPOT self.apiKey = apiKey self.secretKey = secretKey + try: + # 是否开启日志 + websocket.enableTrace(trace) - # 是否开启日志 - websocket.enableTrace(trace) - - # 创建websocket,绑定本地回调函数 onMessage/onError/onClose/onOpen - self.ws = websocket.WebSocketApp(self.host, - on_message=self.onMessage, - on_error=self.onError, - on_close=self.onClose, - on_open=self.onOpen) - - self.thread = Thread(target=self.ws.run_forever) - self.thread.start() + # 创建websocket,绑定本地回调函数 onMessage/onError/onClose/onOpen + self.ws = websocket.WebSocketApp(self.host, + on_message=self.onMessage, + on_error=self.onError, + on_close=self.onClose, + on_open=self.onOpen) + kwargs = {'sslopt': {'cert_reqs': ssl.CERT_NONE}} + self.thread = Thread(target=self.ws.run_forever, kwargs=kwargs) + self.thread.start() + except Exception as ex: + print(u'{} OkexApi connect exception :{},{}'.format(datetime.now(), str(ex), traceback.format_exc()), + file=sys.stderr) #---------------------------------------------------------------------- def readData(self, evt): """ @@ -141,14 +149,13 @@ class OkexApi(object): self.thread.join() #---------------------------------------------------------------------- - def onMessage(self, *args): + def onMessage(self, ws, evt): """ 信息推送事件 :param ws: 接口 :param evt: 事件 :return: """ - evt = args[-1] print(u'vnokex.onMessage:{}'.format(evt)) #---------------------------------------------------------------------- @@ -159,7 +166,9 @@ class OkexApi(object): :param evt: :return: """ - evt = args[-1] + evt = None + if len(args) == 2: + evt = args[-1] print(u'vnokex.onApiError:{}'.format(evt)) @@ -444,17 +453,19 @@ class WsFuturesApi(object): self.apiKey = apiKey self.secretKey = secretKey self.trace = trace + try: + websocket.enableTrace(trace) - websocket.enableTrace(trace) - - self.ws = websocket.WebSocketApp(self.host, - on_message=self.onMessage, - on_error=self.onError, - on_close=self.onClose, - on_open=self.onOpen) - - self.thread = Thread(target=self.ws.run_forever, args=(None, None, 60, 30)) - self.thread.start() + self.ws = websocket.WebSocketApp(self.host, + on_message=self.onMessage, + on_error=self.onError, + on_close=self.onClose, + on_open=self.onOpen) + kwargs = {'sslopt': {'cert_reqs': ssl.CERT_NONE}} + self.thread = Thread(target=self.ws.run_forever, kwargs=kwargs) + self.thread.start() + except Exception as ex: + print(u'{} wsFuturesApi connect exception :{},{}'.format(datetime.now(), str(ex),traceback.format_exc()), file=sys.stderr) # ---------------------------------------------------------------------- def http_get_request(self, url, params, add_to_headers=None, TIMEOUT=5): @@ -473,7 +484,7 @@ class WsFuturesApi(object): else: return {"status": "fail"} except Exception as e: - print(u'httpGet failed :{}'.format(str(e)), file=sys.stderr) + print(u'httpGet failed :{},trace:{}'.format(str(e),traceback.format_exc()), file=sys.stderr) return {"status": "fail", "msg": e} # ---------------------------------------------------------------------- @@ -543,14 +554,17 @@ class WsFuturesApi(object): return data # ---------------------------------------------------------------------- - def onMessage(self, ws, evt): + def onMessage(self, *args): """信息推送""" - print(evt) + if len(args)>0: + print(args[-1]) # ---------------------------------------------------------------------- - def onError(self, ws, evt): + def onError(self, *args): """错误推送""" - print('OkexContractApi.onError:{}'.format(evt)) + if len(args) > 0: + + print('OkexContractApi.onError:{}'.format(evt[-1])) # ---------------------------------------------------------------------- def onClose(self, ws): diff --git a/vnpy/trader/gateway/okexGateway/okexGateway.py b/vnpy/trader/gateway/okexGateway/okexGateway.py index a7ee1a16..b34fec09 100644 --- a/vnpy/trader/gateway/okexGateway/okexGateway.py +++ b/vnpy/trader/gateway/okexGateway/okexGateway.py @@ -430,21 +430,17 @@ class OkexSpotApi(WsSpotApi): ''' # ---------------------------------------------------------------------- - def onMessage(self, *args): + def onMessage(self, ws, evt): """ 响应信息处理,包括心跳响应、请求响应、数据推送 :param ws: websocket接口 :param evt: 消息体 :return: """ - evt = args[-1] - if isinstance(evt,bytes): - # bytes = > str => json - decmp_evt = self.inflate(evt) - ws_data = self.readData(decmp_evt) - else: - # str => json - ws_data = self.readData(evt) + # str => json + decmp_evt = self.inflate(evt) + ws_data = self.readData(decmp_evt) + if self.gateway.log_message: self.gateway.writeLog(u'SpotApi.onMessage:{}'.format(ws_data)) @@ -501,14 +497,13 @@ class OkexSpotApi(WsSpotApi): # ---------------------------------------------------------------------- def onError(self, *args): """Api方法重载,错误推送""" - evt = None - if len(args) == 2: - evt = args[-1] - + if len(args)== 0: + return + evt = args[-1] error = VtErrorData() error.gatewayName = self.gatewayName error.errorID = 0 - if isinstance(evt,bytes): + if isinstance(evt, bytes): decom_evt = self.inflate(evt) error.errorMsg = str(decom_evt) else: @@ -1580,13 +1575,14 @@ class OkexFuturesApi(WsFuturesApi): :param evt: :return: """ + if len(args)==0: + return evt = args[-1] if isinstance(evt,bytes): # bytes => str => json decmp_evt = self.inflate(evt) ws_data = self.readData(decmp_evt) else: - # str => json ws_data = self.readData(evt) if self.gateway.log_message: @@ -1651,8 +1647,10 @@ class OkexFuturesApi(WsFuturesApi): self.writeLog(u'unkonw msg:{}'.format(data)) # ---------------------------------------------------------------------- - def onError(self,*args): + def onError(self, *args): """重载WsFutureApi.onError错误Event推送""" + if len(args)== 0: + return evt = args[-1] error = VtErrorData() error.gatewayName = self.gatewayName @@ -1696,10 +1694,11 @@ class OkexFuturesApi(WsFuturesApi): return self.gateway.futures_connected = False - self.writeLog(u'服务器连接断开') + self.writeLog(u'期货服务器连接断开') # 重新连接 if self.active: + self.writeLog(u'重新连接期货服务器') t = Thread(target=self.reconnect) t.start() @@ -1796,62 +1795,68 @@ class OkexFuturesApi(WsFuturesApi): """连接成功""" self.gateway.futures_connected = True self.writeLog(u'服务器OKEX期货连接成功') + try: + self.initCallback() + self.writeLog(u'服务器OKEX期货回调函数设置成功') - self.initCallback() + for symbol in CONTRACT_SYMBOL: + self.channelSymbolMap[ + "ok_sub_futureusd_%s_index" % symbol] = symbol + "_usd:%s:" + self._use_leverage # + "." + EXCHANGE_OKEX - for symbol in CONTRACT_SYMBOL: - self.channelSymbolMap[ - "ok_sub_futureusd_%s_index" % symbol] = symbol + "_usd:%s:" + self._use_leverage # + "." + EXCHANGE_OKEX + for use_contract_type in CONTRACT_TYPE: + use_symbol_name = symbol + "_usd:%s:%s" % (use_contract_type, self._use_leverage) + # Ticker数据 + self.channelSymbolMap["ok_sub_futureusd_%s_ticker_%s" % (symbol, use_contract_type)] = use_symbol_name + # 盘口的深度 + self.channelSymbolMap["ok_sub_future_%s_depth_%s_usd" % (symbol, use_contract_type)] = use_symbol_name + # 所有人的交易数据 + self.channelSymbolMap["ok_sub_futureusd_%s_trade_%s" % (symbol, use_contract_type)] = use_symbol_name - for use_contract_type in CONTRACT_TYPE: - use_symbol_name = symbol + "_usd:%s:%s" % (use_contract_type, self._use_leverage) - # Ticker数据 - self.channelSymbolMap["ok_sub_futureusd_%s_ticker_%s" % (symbol, use_contract_type)] = use_symbol_name - # 盘口的深度 - self.channelSymbolMap["ok_sub_future_%s_depth_%s_usd" % (symbol, use_contract_type)] = use_symbol_name - # 所有人的交易数据 - self.channelSymbolMap["ok_sub_futureusd_%s_trade_%s" % (symbol, use_contract_type)] = use_symbol_name + contract = VtContractData() + contract.gatewayName = self.gatewayName + contract.symbol = use_symbol_name + "." + EXCHANGE_OKEX + contract.exchange = EXCHANGE_OKEX + contract.vtSymbol = contract.symbol + contract.name = u'期货%s_%s_%s' % (symbol, use_contract_type, self._use_leverage) + contract.size = 0.00001 + contract.priceTick = 0.00001 + contract.productClass = PRODUCT_FUTURES + self.gateway.onContract(contract) - contract = VtContractData() - contract.gatewayName = self.gatewayName - contract.symbol = use_symbol_name + "." + EXCHANGE_OKEX - contract.exchange = EXCHANGE_OKEX - contract.vtSymbol = contract.symbol - contract.name = u'期货%s_%s_%s' % (symbol, use_contract_type, self._use_leverage) - contract.size = 0.00001 - contract.priceTick = 0.00001 - contract.productClass = PRODUCT_FUTURES - self.gateway.onContract(contract) + # print contract.vtSymbol , contract.name - # print contract.vtSymbol , contract.name + quanyi_vtSymbol = symbol + "_usd_future_qy" + "."+ EXCHANGE_OKEX + contract = VtContractData() + contract.gatewayName = self.gatewayName + contract.symbol = quanyi_vtSymbol + contract.exchange = EXCHANGE_OKEX + contract.vtSymbol = contract.symbol + contract.name = u'期货权益%s' % (symbol) + contract.size = 0.00001 + contract.priceTick = 0.00001 + contract.productClass = PRODUCT_FUTURES + self.gateway.onContract(contract) - quanyi_vtSymbol = symbol + "_usd_future_qy" + "."+ EXCHANGE_OKEX - contract = VtContractData() - contract.gatewayName = self.gatewayName - contract.symbol = quanyi_vtSymbol - contract.exchange = EXCHANGE_OKEX - contract.vtSymbol = contract.symbol - contract.name = u'期货权益%s' % (symbol) - contract.size = 0.00001 - contract.priceTick = 0.00001 - contract.productClass = PRODUCT_FUTURES - self.gateway.onContract(contract) + self.writeLog(u'服务器OKEX期货合约信息更新成功') - self.login() - # 连接后查询账户和委托数据 - self.futureUserInfo() - self.futureAllUnfinishedOrderInfo() - self.futureAllIndexSymbol() - for symbol in self.registered_symbols: - try: - self.writeLog(u'okex future_api 重新订阅:'.format(symbol)) - # 分解出 合约对/合约/合约类型/杠杆倍数 - (symbol_pair, symbol, contract_type, leverage) = self.dealSymbolFunc(symbol) - self.subscribeSingleSymbol(symbol, contract_type, leverage) - except Exception as ex: - self.writeLog(u'订阅合约行情异常:{},{}'.format(str(ex),traceback.format_exc())) - continue + self.login() + # 连接后查询账户和委托数据 + self.futureUserInfo() + self.futureAllUnfinishedOrderInfo() + self.futureAllIndexSymbol() + + for symbol in self.registered_symbols: + try: + self.writeLog(u'okex future_api 重新订阅:'.format(symbol)) + # 分解出 合约对/合约/合约类型/杠杆倍数 + (symbol_pair, symbol, contract_type, leverage) = self.dealSymbolFunc(symbol) + self.subscribeSingleSymbol(symbol, contract_type, leverage) + except Exception as ex: + self.gateway.writeError(u'订阅合约行情异常:{},{}'.format(str(ex),traceback.format_exc())) + continue + except Exception as ex: + self.gateway.writeError(u'期货onOpen异常:{},{}'.format(str(ex), traceback.format_exc())) # ---------------------------------------------------------------------- def writeLog(self, content): @@ -1877,7 +1882,8 @@ class OkexFuturesApi(WsFuturesApi): else: new_unfishedSet.add((symbol, use_contract_type)) - self.LoopforceGetContractDict(new_unfishedSet) + if unFishedSet!=new_unfishedSet: + self.LoopforceGetContractDict(new_unfishedSet) # ---------------------------------------------------------------------- def initCallback(self):