[Add]OKEX接口完善自动重连机制

This commit is contained in:
vn.py 2018-06-01 12:31:12 +08:00
parent 230688dea8
commit 653357e582
2 changed files with 42 additions and 14 deletions

View File

@ -75,14 +75,16 @@ class OkexApi(object):
self.heartbeatCount = 0 # 心跳计数
self.heartbeatThread = None # 心跳线程
self.heartbeatReceived = True # 心跳是否收到
self.reconnecting = False # 重新连接中
#----------------------------------------------------------------------
def heartbeat(self):
""""""
while self.active:
self.heartbeatCount += 1
if self.heartbeatCount < 30:
if self.heartbeatCount < 10:
sleep(1)
else:
self.heartbeatCount = 0
@ -90,16 +92,26 @@ class OkexApi(object):
if not self.heartbeatReceived:
self.reconnect()
else:
self.heartbeatReceived = False
d = {'event': 'ping'}
j = json.dumps(d)
self.ws.send(j)
self.heartbeatReceived = False
try:
self.ws.send(j)
except websocket.WebSocketConnectionClosedException:
self.reconnect()
#----------------------------------------------------------------------
def reconnect(self):
"""重新连接"""
self.close() # 首先关闭之前的连接
self.initWebsocket()
if not self.reconnecting:
self.reconnecting = True
self.closeWebsocket() # 首先关闭之前的连接
self.initWebsocket()
self.reconnecting = False
#----------------------------------------------------------------------
def connect(self, host, apiKey, secretKey, trace=False):
@ -132,15 +144,24 @@ class OkexApi(object):
return data
#----------------------------------------------------------------------
def close(self):
def closeHeartbeat(self):
"""关闭接口"""
if self.heartbeatThread and self.heartbeatThread.isAlive():
self.active = False
self.heartbeatThread.join()
#----------------------------------------------------------------------
def closeWebsocket(self):
"""关闭WS"""
if self.wsThread and self.wsThread.isAlive():
self.ws.close()
self.wsThread.join()
#----------------------------------------------------------------------
def close(self):
""""""
self.closeHeartbeat()
self.closeWebsocket()
#----------------------------------------------------------------------
def onMessage(self, data):
@ -186,8 +207,9 @@ class OkexApi(object):
#----------------------------------------------------------------------
def onOpenCallback(self, ws):
""""""
self.heartbeatThread = Thread(target=self.heartbeat)
self.heartbeatThread.start()
if not self.heartbeatThread:
self.heartbeatThread = Thread(target=self.heartbeat)
self.heartbeatThread.start()
self.onOpen()
@ -221,8 +243,10 @@ class OkexApi(object):
# 若触发异常则重连
try:
self.ws.send(j)
return True
except websocket.WebSocketConnectionClosedException:
pass
self.reconnect()
return False
#----------------------------------------------------------------------
def login(self):
@ -241,6 +265,7 @@ class OkexApi(object):
self.ws.send(j)
return True
except websocket.WebSocketConnectionClosedException:
self.reconnect()
return False
@ -288,7 +313,7 @@ class OkexSpotApi(OkexApi):
channel = 'ok_spot_order'
self.sendRequest(channel, params)
return self.sendRequest(channel, params)
#----------------------------------------------------------------------
def spotCancelOrder(self, symbol, orderid):

View File

@ -524,7 +524,6 @@ class SpotApi(OkexSpotApi):
"""初始化接口"""
self.symbols = symbols
self.initCallback()
self.connect(OKEX_SPOT_HOST, apiKey, secretKey, trace)
self.writeLog(u'接口初始化成功')
@ -533,7 +532,11 @@ class SpotApi(OkexSpotApi):
"""发单"""
type_ = priceTypeMapReverse[(req.direction, req.priceType)]
req.volume = 0.001
self.spotOrder(req.symbol, type_, str(req.price), str(req.volume))
result = self.spotOrder(req.symbol, type_, str(req.price), str(req.volume))
# 若请求失败,则返回空字符串委托号
if not result:
return ''
# 本地委托号加1并将对应字符串保存到队列中返回基于本地委托号的vtOrderID
self.localNo += 1