Merge branch 'dev' of https://github.com/vnpy/vnpy into dev

This commit is contained in:
vn.py 2018-10-26 12:26:42 +08:00
commit 7ded11666f
3 changed files with 130 additions and 52 deletions

View File

@ -7,7 +7,7 @@ from multiprocessing.dummy import Pool
import requests
from enum import Enum
from typing import Any, Callable
from typing import Any, Callable, Optional
########################################################################
@ -151,16 +151,20 @@ class RestClient(object):
#----------------------------------------------------------------------
def _run(self):
session = self._createSession()
while self._active:
try:
request = self._queue.get(timeout=1)
try:
session = self._createSession()
while self._active:
try:
self._processRequest(request, session)
finally:
self._queue.task_done()
except Empty:
pass
request = self._queue.get(timeout=1)
try:
self._processRequest(request, session)
finally:
self._queue.task_done()
except Empty:
pass
except:
et, ev, tb = sys.exc_info()
self.onError(et, ev, tb, None)
#----------------------------------------------------------------------
def sign(self, request): # type: (Request)->Request
@ -181,9 +185,15 @@ class RestClient(object):
sys.stderr.write(str(request))
#----------------------------------------------------------------------
def onError(self, exceptionType, exceptionValue, tb, request):
def onError(self,
exceptionType, # type: type
exceptionValue, # type: Exception
tb,
request # type: Optional[Request]
):
"""
Python内部错误处理默认行为是仍给excepthook
:param request 如果是在处理请求的时候出错它的值就是对应的Request否则为None
"""
print("error in request : {}\n".format(request))
sys.excepthook(exceptionType, exceptionValue, tb)

View File

@ -51,7 +51,6 @@ class WebsocketClient(object):
#----------------------------------------------------------------------
def start(self):
"""启动"""
self._connect()
self._active = True
self._workerThread = Thread(target=self._run)
@ -130,29 +129,35 @@ class WebsocketClient(object):
"""
运行直到stop()被调用
"""
try:
self._connect()
# todo: onDisconnect
while self._active:
try:
ws = self._getWs()
if ws:
stream = ws.recv()
if not stream: # recv在阻塞的时候ws被关闭
self._reconnect()
continue
try:
data = json.loads(stream)
except ValueError as e:
print('websocket unable to parse data: ' + stream)
raise e
self.onPacket(data)
except websocket.WebSocketConnectionClosedException: # 在调用recv之前ws就被关闭了
self._reconnect()
except: # Python内部错误onPacket内出错
et, ev, tb = sys.exc_info()
self.onError(et, ev, tb)
self._reconnect()
# todo: onDisconnect
while self._active:
try:
ws = self._getWs()
if ws:
stream = ws.recv()
if not stream: # recv在阻塞的时候ws被关闭
self._reconnect()
continue
try:
data = json.loads(stream)
except ValueError as e:
print('websocket unable to parse data: ' + stream)
raise e
self.onPacket(data)
except websocket.WebSocketConnectionClosedException: # 在调用recv之前ws就被关闭了
self._reconnect()
except: # Python内部错误onPacket内出错
et, ev, tb = sys.exc_info()
self.onError(et, ev, tb)
self._reconnect()
except:
et, ev, tb = sys.exc_info()
self.onError(et, ev, tb)
self._reconnect()
#----------------------------------------------------------------------
def _runPing(self):

View File

@ -150,6 +150,8 @@ class OkexFuturesGateway(VtGateway):
self.tradeID = 0
self._orders = {} # type: Dict[str, Order]
self._remoteIds = {} # type: Dict[str, Order]
self._lastTicker = None # type: VtTickData
self._utcOffset = datetime.now() - datetime.utcnow()
#----------------------------------------------------------------------
def readConfig(self):
@ -211,9 +213,18 @@ class OkexFuturesGateway(VtGateway):
remoteSymbol = s.v1Symbol.lower()
remoteContractType = s.remoteContractType
# ticker
self.webSocket.sendPacket({
'event': 'addChannel',
'channel': 'ok_sub_futureusd_' + remoteSymbol.lower() + '_ticker_' + remoteContractType
'channel':
'ok_sub_futureusd_' + remoteSymbol.lower() + '_ticker_' + remoteContractType
})
# depth
self.webSocket.sendPacket({
'event': 'addChannel',
'channel':
'ok_sub_futureusd_' + remoteSymbol.lower() + '_depth' + remoteContractType + '_5'
})
#----------------------------------------------------------------------
@ -620,18 +631,64 @@ class OkexFuturesGateway(VtGateway):
uiSymbol = remoteSymbolToLocal(channel.symbol,
remoteContractTypeToLocal(
channel.remoteContractType))
self.onTick(VtTickData.createFromGateway(
gateway=self,
symbol=uiSymbol,
exchange=self.exchange,
lastPrice=data['last'],
lastVolume=data['vol'],
highPrice=data['high'],
lowPrice=data['low'],
openInterest=data['hold_amount'],
lowerLimit=data['limitLow'],
upperLimit=data['limitHigh'],
))
if self._lastTicker is None:
self._lastTicker = VtTickData.createFromGateway(
gateway=self,
symbol=uiSymbol,
exchange=self.exchange,
lastPrice=float(data['last']),
lastVolume=float(data['vol']),
highPrice=float(data['high']),
lowPrice=float(data['low']),
openInterest=float(data['hold_amount']),
lowerLimit=float(data['limitLow']),
upperLimit=float(data['limitHigh']),
)
else:
self._lastTicker.lastPrice = float(data['last'])
self._lastTicker.lastVolume = float(data['vol'])
self._lastTicker.highPrice = float(data['high'])
self._lastTicker.lowPrice = float(data['low'])
self._lastTicker.openInterest = float(data['hold_amount'])
self._lastTicker.lowerLimit = float(data['limitLow'])
self._lastTicker.upperLimit = float(data['limitHigh'])
self._lastTicker.datetime = datetime.now()
self._lastTicker.date = self._lastTicker.datetime.strftime('%Y%m%d')
self._lastTicker.time = self._lastTicker.datetime.strftime('%H:%M:%S')
self.onTick(self._lastTicker)
elif channel.type == ChannelType.Depth:
asks = data['asks']
bids = data['bids']
if self._lastTicker is not None:
timestamp = float(data['timestamp'])
ts = datetime.utcfromtimestamp(timestamp/1000) + self._utcOffset
self._lastTicker.askPrice1 = asks[0][0]
self._lastTicker.askPrice2 = asks[1][0]
self._lastTicker.askPrice3 = asks[2][0]
self._lastTicker.askPrice4 = asks[3][0]
self._lastTicker.askPrice5 = asks[4][0]
self._lastTicker.askVolume1 = asks[0][1]
self._lastTicker.askVolume2 = asks[1][1]
self._lastTicker.askVolume3 = asks[2][1]
self._lastTicker.askVolume4 = asks[3][1]
self._lastTicker.askVolume5 = asks[4][1]
self._lastTicker.bidPrice1 = bids[0][0]
self._lastTicker.bidPrice2 = bids[1][0]
self._lastTicker.bidPrice3 = bids[2][0]
self._lastTicker.bidPrice4 = bids[3][0]
self._lastTicker.bidPrice5 = bids[4][0]
self._lastTicker.bidVolume1 = bids[0][1]
self._lastTicker.bidVolume2 = bids[1][1]
self._lastTicker.bidVolume3 = bids[2][1]
self._lastTicker.bidVolume4 = bids[3][1]
self._lastTicker.bidVolume5 = bids[4][1]
self._lastTicker.datetime = ts
self._lastTicker.date = self._lastTicker.datetime.strftime('%Y%m%d')
self._lastTicker.time = self._lastTicker.datetime.strftime('%H:%M:%S')
self.onTick(self._lastTicker)
elif channel.type == ChannelType.Position:
symbol = data['symbol']
positions = data['positions']
@ -771,16 +828,22 @@ def parseChannel(channel): # type: (str)->Channel
easySymbol + '_' + crash,
remotePrefixToRemoteContractType(contractTypePrefix))
elif sp[-1] == 'week':
# if lsp == 9:
# _, _, _, easySymbol, crash, typeName, contractTypePrefix, _, depth = sp
# return ExtraSymbolChannel(ChannelType.Depth, easySymbol + '_' + crash,
# remotePrefixToRemoteContractType(contractTypePrefix),
# depth)
if lsp == 8:
_, _, _, easySymbol, crash, typeName, contractTypePrefix, _ = sp
return Channel(ChannelType.Tick,
easySymbol + '_' + crash,
remotePrefixToRemoteContractType(contractTypePrefix))
if sp[-1] == '5':
if lsp == 7: # eg "ok_sub_futureusd_eth_usd_depthquarter_5"
_, _, _, easySymbol, crash, typeName_contractTypePrefix, depth = sp
return Channel(ChannelType.Depth, easySymbol + '_' + crash,
remotePrefixToRemoteContractType(typeName_contractTypePrefix[5:]),
depth)
if lsp == 8: # eg "ok_sub_futureusd_eth_usd_depthnext_week_5"
_, _, _, easySymbol, crash, typeName_contractTypePrefix, _, depth = sp
return Channel(ChannelType.Depth, easySymbol + '_' + crash,
remotePrefixToRemoteContractType(typeName_contractTypePrefix[5:]),
depth)
#----------------------------------------------------------------------