This commit is contained in:
msincenselee 2018-11-15 13:01:49 +08:00
parent 6c1bb74966
commit 9aeaa626d8
2 changed files with 127 additions and 107 deletions

View File

@ -9,7 +9,8 @@ import traceback
import websocket
import requests
import sys
import ssl
from datetime import datetime
# API文档 https://github.com/okcoin-okex/OKEx.com-api-docs
from vnpy.api.okex.okexData import SPOT_TRADE_SIZE_DICT,SPOT_REST_ERROR_DICT, FUTURES_ERROR_DICT
@ -78,16 +79,20 @@ class OkexApi(object):
"""重新连接"""
# 首先关闭之前的连接
self.close()
# 再执行重连任务
self.ws = websocket.WebSocketApp(self.host,
on_message=self.onMessage,
on_error=self.onError,
on_close=self.onClose,
on_open=self.onOpen)
self.thread = Thread(target=self.ws.run_forever)
self.thread.start()
try:
# 再执行重连任务
self.ws = websocket.WebSocketApp(self.host,
on_message=self.onMessage,
on_error=self.onError,
on_close=self.onClose,
on_open=self.onOpen)
kwargs = {'sslopt': {'cert_reqs': ssl.CERT_NONE}}
self.thread = Thread(target=self.ws.run_forever, kwargs=kwargs)
self.thread.start()
except Exception as ex:
print(u'{} OkexApi reconnect exception :{},{}'.format(datetime.now(), str(ex), traceback.format_exc()),
file=sys.stderr)
#----------------------------------------------------------------------
def connect(self, apiKey, secretKey, trace=False):
@ -102,20 +107,23 @@ class OkexApi(object):
self.host = OKEX_USD_SPOT
self.apiKey = apiKey
self.secretKey = secretKey
try:
# 是否开启日志
websocket.enableTrace(trace)
# 是否开启日志
websocket.enableTrace(trace)
# 创建websocket绑定本地回调函数 onMessage/onError/onClose/onOpen
self.ws = websocket.WebSocketApp(self.host,
on_message=self.onMessage,
on_error=self.onError,
on_close=self.onClose,
on_open=self.onOpen)
self.thread = Thread(target=self.ws.run_forever)
self.thread.start()
# 创建websocket绑定本地回调函数 onMessage/onError/onClose/onOpen
self.ws = websocket.WebSocketApp(self.host,
on_message=self.onMessage,
on_error=self.onError,
on_close=self.onClose,
on_open=self.onOpen)
kwargs = {'sslopt': {'cert_reqs': ssl.CERT_NONE}}
self.thread = Thread(target=self.ws.run_forever, kwargs=kwargs)
self.thread.start()
except Exception as ex:
print(u'{} OkexApi connect exception :{},{}'.format(datetime.now(), str(ex), traceback.format_exc()),
file=sys.stderr)
#----------------------------------------------------------------------
def readData(self, evt):
"""
@ -141,14 +149,13 @@ class OkexApi(object):
self.thread.join()
#----------------------------------------------------------------------
def onMessage(self, *args):
def onMessage(self, ws, evt):
"""
信息推送事件
:param ws: 接口
:param evt: 事件
:return:
"""
evt = args[-1]
print(u'vnokex.onMessage:{}'.format(evt))
#----------------------------------------------------------------------
@ -159,7 +166,9 @@ class OkexApi(object):
:param evt:
:return:
"""
evt = args[-1]
evt = None
if len(args) == 2:
evt = args[-1]
print(u'vnokex.onApiError:{}'.format(evt))
@ -444,17 +453,19 @@ class WsFuturesApi(object):
self.apiKey = apiKey
self.secretKey = secretKey
self.trace = trace
try:
websocket.enableTrace(trace)
websocket.enableTrace(trace)
self.ws = websocket.WebSocketApp(self.host,
on_message=self.onMessage,
on_error=self.onError,
on_close=self.onClose,
on_open=self.onOpen)
self.thread = Thread(target=self.ws.run_forever, args=(None, None, 60, 30))
self.thread.start()
self.ws = websocket.WebSocketApp(self.host,
on_message=self.onMessage,
on_error=self.onError,
on_close=self.onClose,
on_open=self.onOpen)
kwargs = {'sslopt': {'cert_reqs': ssl.CERT_NONE}}
self.thread = Thread(target=self.ws.run_forever, kwargs=kwargs)
self.thread.start()
except Exception as ex:
print(u'{} wsFuturesApi connect exception :{},{}'.format(datetime.now(), str(ex),traceback.format_exc()), file=sys.stderr)
# ----------------------------------------------------------------------
def http_get_request(self, url, params, add_to_headers=None, TIMEOUT=5):
@ -473,7 +484,7 @@ class WsFuturesApi(object):
else:
return {"status": "fail"}
except Exception as e:
print(u'httpGet failed :{}'.format(str(e)), file=sys.stderr)
print(u'httpGet failed :{},trace:{}'.format(str(e),traceback.format_exc()), file=sys.stderr)
return {"status": "fail", "msg": e}
# ----------------------------------------------------------------------
@ -543,14 +554,17 @@ class WsFuturesApi(object):
return data
# ----------------------------------------------------------------------
def onMessage(self, ws, evt):
def onMessage(self, *args):
"""信息推送"""
print(evt)
if len(args)>0:
print(args[-1])
# ----------------------------------------------------------------------
def onError(self, ws, evt):
def onError(self, *args):
"""错误推送"""
print('OkexContractApi.onError:{}'.format(evt))
if len(args) > 0:
print('OkexContractApi.onError:{}'.format(evt[-1]))
# ----------------------------------------------------------------------
def onClose(self, ws):

View File

@ -430,21 +430,17 @@ class OkexSpotApi(WsSpotApi):
'''
# ----------------------------------------------------------------------
def onMessage(self, *args):
def onMessage(self, ws, evt):
"""
响应信息处理包括心跳响应请求响应数据推送
:param ws: websocket接口
:param evt: 消息体
:return:
"""
evt = args[-1]
if isinstance(evt,bytes):
# bytes = > str => json
decmp_evt = self.inflate(evt)
ws_data = self.readData(decmp_evt)
else:
# str => json
ws_data = self.readData(evt)
# str => json
decmp_evt = self.inflate(evt)
ws_data = self.readData(decmp_evt)
if self.gateway.log_message:
self.gateway.writeLog(u'SpotApi.onMessage:{}'.format(ws_data))
@ -501,14 +497,13 @@ class OkexSpotApi(WsSpotApi):
# ----------------------------------------------------------------------
def onError(self, *args):
"""Api方法重载,错误推送"""
evt = None
if len(args) == 2:
evt = args[-1]
if len(args)== 0:
return
evt = args[-1]
error = VtErrorData()
error.gatewayName = self.gatewayName
error.errorID = 0
if isinstance(evt,bytes):
if isinstance(evt, bytes):
decom_evt = self.inflate(evt)
error.errorMsg = str(decom_evt)
else:
@ -1580,13 +1575,14 @@ class OkexFuturesApi(WsFuturesApi):
:param evt:
:return:
"""
if len(args)==0:
return
evt = args[-1]
if isinstance(evt,bytes):
# bytes => str => json
decmp_evt = self.inflate(evt)
ws_data = self.readData(decmp_evt)
else:
# str => json
ws_data = self.readData(evt)
if self.gateway.log_message:
@ -1651,8 +1647,10 @@ class OkexFuturesApi(WsFuturesApi):
self.writeLog(u'unkonw msg:{}'.format(data))
# ----------------------------------------------------------------------
def onError(self,*args):
def onError(self, *args):
"""重载WsFutureApi.onError错误Event推送"""
if len(args)== 0:
return
evt = args[-1]
error = VtErrorData()
error.gatewayName = self.gatewayName
@ -1696,10 +1694,11 @@ class OkexFuturesApi(WsFuturesApi):
return
self.gateway.futures_connected = False
self.writeLog(u'服务器连接断开')
self.writeLog(u'期货服务器连接断开')
# 重新连接
if self.active:
self.writeLog(u'重新连接期货服务器')
t = Thread(target=self.reconnect)
t.start()
@ -1796,62 +1795,68 @@ class OkexFuturesApi(WsFuturesApi):
"""连接成功"""
self.gateway.futures_connected = True
self.writeLog(u'服务器OKEX期货连接成功')
try:
self.initCallback()
self.writeLog(u'服务器OKEX期货回调函数设置成功')
self.initCallback()
for symbol in CONTRACT_SYMBOL:
self.channelSymbolMap[
"ok_sub_futureusd_%s_index" % symbol] = symbol + "_usd:%s:" + self._use_leverage # + "." + EXCHANGE_OKEX
for symbol in CONTRACT_SYMBOL:
self.channelSymbolMap[
"ok_sub_futureusd_%s_index" % symbol] = symbol + "_usd:%s:" + self._use_leverage # + "." + EXCHANGE_OKEX
for use_contract_type in CONTRACT_TYPE:
use_symbol_name = symbol + "_usd:%s:%s" % (use_contract_type, self._use_leverage)
# Ticker数据
self.channelSymbolMap["ok_sub_futureusd_%s_ticker_%s" % (symbol, use_contract_type)] = use_symbol_name
# 盘口的深度
self.channelSymbolMap["ok_sub_future_%s_depth_%s_usd" % (symbol, use_contract_type)] = use_symbol_name
# 所有人的交易数据
self.channelSymbolMap["ok_sub_futureusd_%s_trade_%s" % (symbol, use_contract_type)] = use_symbol_name
for use_contract_type in CONTRACT_TYPE:
use_symbol_name = symbol + "_usd:%s:%s" % (use_contract_type, self._use_leverage)
# Ticker数据
self.channelSymbolMap["ok_sub_futureusd_%s_ticker_%s" % (symbol, use_contract_type)] = use_symbol_name
# 盘口的深度
self.channelSymbolMap["ok_sub_future_%s_depth_%s_usd" % (symbol, use_contract_type)] = use_symbol_name
# 所有人的交易数据
self.channelSymbolMap["ok_sub_futureusd_%s_trade_%s" % (symbol, use_contract_type)] = use_symbol_name
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = use_symbol_name + "." + EXCHANGE_OKEX
contract.exchange = EXCHANGE_OKEX
contract.vtSymbol = contract.symbol
contract.name = u'期货%s_%s_%s' % (symbol, use_contract_type, self._use_leverage)
contract.size = 0.00001
contract.priceTick = 0.00001
contract.productClass = PRODUCT_FUTURES
self.gateway.onContract(contract)
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = use_symbol_name + "." + EXCHANGE_OKEX
contract.exchange = EXCHANGE_OKEX
contract.vtSymbol = contract.symbol
contract.name = u'期货%s_%s_%s' % (symbol, use_contract_type, self._use_leverage)
contract.size = 0.00001
contract.priceTick = 0.00001
contract.productClass = PRODUCT_FUTURES
self.gateway.onContract(contract)
# print contract.vtSymbol , contract.name
# print contract.vtSymbol , contract.name
quanyi_vtSymbol = symbol + "_usd_future_qy" + "."+ EXCHANGE_OKEX
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = quanyi_vtSymbol
contract.exchange = EXCHANGE_OKEX
contract.vtSymbol = contract.symbol
contract.name = u'期货权益%s' % (symbol)
contract.size = 0.00001
contract.priceTick = 0.00001
contract.productClass = PRODUCT_FUTURES
self.gateway.onContract(contract)
quanyi_vtSymbol = symbol + "_usd_future_qy" + "."+ EXCHANGE_OKEX
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = quanyi_vtSymbol
contract.exchange = EXCHANGE_OKEX
contract.vtSymbol = contract.symbol
contract.name = u'期货权益%s' % (symbol)
contract.size = 0.00001
contract.priceTick = 0.00001
contract.productClass = PRODUCT_FUTURES
self.gateway.onContract(contract)
self.writeLog(u'服务器OKEX期货合约信息更新成功')
self.login()
# 连接后查询账户和委托数据
self.futureUserInfo()
self.futureAllUnfinishedOrderInfo()
self.futureAllIndexSymbol()
for symbol in self.registered_symbols:
try:
self.writeLog(u'okex future_api 重新订阅:'.format(symbol))
# 分解出 合约对/合约/合约类型/杠杆倍数
(symbol_pair, symbol, contract_type, leverage) = self.dealSymbolFunc(symbol)
self.subscribeSingleSymbol(symbol, contract_type, leverage)
except Exception as ex:
self.writeLog(u'订阅合约行情异常:{},{}'.format(str(ex),traceback.format_exc()))
continue
self.login()
# 连接后查询账户和委托数据
self.futureUserInfo()
self.futureAllUnfinishedOrderInfo()
self.futureAllIndexSymbol()
for symbol in self.registered_symbols:
try:
self.writeLog(u'okex future_api 重新订阅:'.format(symbol))
# 分解出 合约对/合约/合约类型/杠杆倍数
(symbol_pair, symbol, contract_type, leverage) = self.dealSymbolFunc(symbol)
self.subscribeSingleSymbol(symbol, contract_type, leverage)
except Exception as ex:
self.gateway.writeError(u'订阅合约行情异常:{},{}'.format(str(ex),traceback.format_exc()))
continue
except Exception as ex:
self.gateway.writeError(u'期货onOpen异常:{},{}'.format(str(ex), traceback.format_exc()))
# ----------------------------------------------------------------------
def writeLog(self, content):
@ -1877,7 +1882,8 @@ class OkexFuturesApi(WsFuturesApi):
else:
new_unfishedSet.add((symbol, use_contract_type))
self.LoopforceGetContractDict(new_unfishedSet)
if unFishedSet!=new_unfishedSet:
self.LoopforceGetContractDict(new_unfishedSet)
# ----------------------------------------------------------------------
def initCallback(self):