[Add] OkexFutureGateway.subscribe

[Add] OkexFutureApi增加WebSocket客户端实现
This commit is contained in:
nanoric 2018-10-16 05:48:49 -04:00
parent e8ded9edf5
commit c141360847
3 changed files with 261 additions and 169 deletions

View File

@ -12,8 +12,20 @@ class _OkexFutureCustomExtra(object):
self.onFailed = onFailed self.onFailed = onFailed
self.onSuccess = onSuccess self.onSuccess = onSuccess
self.extra = extra self.extra = extra
########################################################################
class OkexFutureEasySymbol(object):
BTC = 'btc'
LTC = 'ltc'
ETH = 'eth'
ETC = 'etc'
BCH = 'bch'
EOS = 'eos'
XRP = 'xrp'
BTG = 'btg'
######################################################################## ########################################################################
class OkexFutureSymbol(object): class OkexFutureSymbol(object):
BTC = 'btc_usd' BTC = 'btc_usd'
@ -428,15 +440,16 @@ class OkexFutureRestClient(OkexFutureRestBase):
return restErrorCodeMap[code] return restErrorCodeMap[code]
#---------------------------------------------------------------------- ########################################################################
class OkexFutureWebSocketApi(OkexFutureWebSocketBase): class OkexFutureWebSocketClient(OkexFutureWebSocketBase):
def subscribe(self, packet): #----------------------------------------------------------------------
pass def subscribe(self, easySymbol, contractType): # type: (OkexFutureEasySymbol, OkexFutureContractType)->None
self.sendPacket({
'event': 'addChannel',
'channel': 'ok_sub_futureusd_' + easySymbol + '_ticker_' + contractType
})
def onPacket(self, packet):
pass
restErrorCodeMap = { restErrorCodeMap = {
0: '远程服务器并未给出错误代码', 0: '远程服务器并未给出错误代码',

View File

@ -1,2 +1,2 @@
from .OkexFutureApi import OkexFutureSymbol, OkexFutureContractType, OkexFutureOrder, OkexFutureOrderStatus, OkexFuturePosition, \ from .OkexFutureApi import OkexFutureRestClient, OkexFutureWebSocketClient, OkexFutureSymbol, OkexFutureContractType, OkexFutureOrder, OkexFutureOrderStatus, OkexFuturePosition, \
OkexFuturePositionDetail, OkexFuturePriceType, OkexFutureRestClient, OkexFutureUserInfo OkexFuturePositionDetail, OkexFuturePriceType, OkexFutureUserInfo

View File

@ -4,83 +4,14 @@ from __future__ import print_function
import json import json
from abc import abstractmethod, abstractproperty from abc import abstractmethod, abstractproperty
from datetime import datetime
from enum import Enum
from typing import Dict from typing import Dict
from vnpy.api.okexfuture.OkexFutureApi import * from vnpy.api.okexfuture.OkexFutureApi import *
from vnpy.trader.vtFunction import getJsonPath from vnpy.trader.vtFunction import getJsonPath
from vnpy.trader.vtGateway import * from vnpy.trader.vtGateway import *
_orderTypeMap = {
(constant.DIRECTION_LONG, constant.OFFSET_OPEN): OkexFutureOrderType.OpenLong,
(constant.DIRECTION_SHORT, constant.OFFSET_OPEN): OkexFutureOrderType.OpenShort,
(constant.DIRECTION_LONG, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseLong,
(constant.DIRECTION_SHORT, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseShort,
}
_orderTypeMapReverse = {v: k for k, v in _orderTypeMap.items()}
_contractTypeMap = {
'THISWEEK': OkexFutureContractType.ThisWeek,
'NEXTWEEK': OkexFutureContractType.NextWeek,
'QUARTER': OkexFutureContractType.Quarter,
}
_contractTypeMapReverse = {v: k for k, v in _contractTypeMap.items()}
_remoteSymbols = {
OkexFutureSymbol.BTC,
OkexFutureSymbol.LTC,
OkexFutureSymbol.ETH,
OkexFutureSymbol.ETC,
OkexFutureSymbol.BCH,
}
# symbols for ui,
# keys:给用户看的symbols : f"{internalSymbol}_{contractType}"
# values: API接口使用的symbol和contractType字段
_symbolsForUi = {(remoteSymbol.upper() + '_' + upperContractType.upper()): (remoteSymbol, remoteContractType)
for remoteSymbol in _remoteSymbols
for upperContractType, remoteContractType in
_contractTypeMap.items()} # type: Dict[str, List[str, str]]
_symbolsForUiReverse = {v: k for k, v in _symbolsForUi.items()}
#----------------------------------------------------------------------
def localOrderTypeToRemote(direction, offset): # type: (str, str)->str
return _orderTypeMap[(direction, offset)]
#----------------------------------------------------------------------
def remoteOrderTypeToLocal(orderType): # type: (str)->(str, str)
"""
:param orderType:
:return: direction, offset
"""
return _orderTypeMapReverse[orderType]
#----------------------------------------------------------------------
def localContractTypeToRemote(localContractType):
return _contractTypeMap[localContractType]
#----------------------------------------------------------------------
def remoteContractTypeToLocal(remoteContractType):
return _contractTypeMapReverse[remoteContractType]
#----------------------------------------------------------------------
def localSymbolToRemote(symbol): # type: (str)->(OkexFutureSymbol, OkexFutureContractType)
"""
:return: remoteSymbol, remoteContractType
"""
return _symbolsForUi[symbol]
#----------------------------------------------------------------------
def remoteSymbolToLocal(remoteSymbol, localContractType):
return remoteSymbol.upper() + localContractType
######################################################################## ########################################################################
class VnpyGateway(VtGateway): class VnpyGateway(VtGateway):
@ -89,15 +20,6 @@ class VnpyGateway(VtGateway):
于是我设计了这个类将重复代码抽取出来简化gateway的实现 于是我设计了这个类将重复代码抽取出来简化gateway的实现
""" """
#----------------------------------------------------------------------
def __init__(self, eventEngine):
super(VnpyGateway, self).__init__(eventEngine, self.gatewayName)
#----------------------------------------------------------------------
@abstractproperty
def gatewayName(self): # type: ()->str
return 'VnpyGateway'
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def readConfig(self): def readConfig(self):
""" """
@ -146,10 +68,15 @@ class OkexFutureGateway(VnpyGateway):
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __init__(self, eventEngine, *_, **__): # args, kwargs is needed for compatibility def __init__(self, eventEngine, *_, **__): # args, kwargs is needed for compatibility
"""Constructor""" """Constructor"""
super(OkexFutureGateway, self).__init__(eventEngine) super(OkexFutureGateway, self).__init__(eventEngine, 'OkexFutureGateway')
self.apiKey = None # type: str self.apiKey = None # type: str
self.apiSecret = None # type: str self.apiSecret = None # type: str
self.api = OkexFutureRestClient()
self.restApi = OkexFutureRestClient()
self.webSocket = OkexFutureWebSocketClient()
self.webSocket.onPacket = self._onWebsocketPacket
self.leverRate = 1 self.leverRate = 1
self.symbols = [] self.symbols = []
@ -157,11 +84,6 @@ class OkexFutureGateway(VnpyGateway):
self._orders = {} # type: Dict[str, _Order] self._orders = {} # type: Dict[str, _Order]
self._remoteIds = {} # type: Dict[str, _Order] self._remoteIds = {} # type: Dict[str, _Order]
#----------------------------------------------------------------------
@property
def gatewayName(self):
return 'OkexFutureGateway'
#---------------------------------------------------------------------- #----------------------------------------------------------------------
@property @property
def exchange(self): # type: ()->str def exchange(self): # type: ()->str
@ -190,12 +112,16 @@ class OkexFutureGateway(VnpyGateway):
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def connect(self): def connect(self):
self.loadSetting() self.loadSetting()
self.api.init(self.apiKey, self.apiSecret) self.restApi.init(self.apiKey, self.apiSecret)
self.webSocket.init(self.apiKey, self.apiSecret)
self.restApi.start()
self.webSocket.start()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def subscribe(self, subscribeReq): def subscribe(self, subscribeReq): # type: (VtSubscribeReq)->None
"""订阅行情""" """订阅行情"""
pass remoteSymbol, remoteContractType = localSymbolToRemote(subscribeReq.symbol)
return self.webSocket.subscribe(remoteSymbol, remoteContractType)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def _getOrderByLocalId(self, localId): def _getOrderByLocalId(self, localId):
@ -241,16 +167,16 @@ class OkexFutureGateway(VnpyGateway):
if vtRequest.priceType == constant.PRICETYPE_MARKETPRICE: if vtRequest.priceType == constant.PRICETYPE_MARKETPRICE:
userMarketPrice = True userMarketPrice = True
self.api.sendOrder(symbol=remoteSymbol, self.restApi.sendOrder(symbol=remoteSymbol,
contractType=remoteContractType, contractType=remoteContractType,
orderType=orderType, orderType=orderType,
volume=vtRequest.volume, volume=vtRequest.volume,
price=vtRequest.price, price=vtRequest.price,
useMarketPrice=userMarketPrice, useMarketPrice=userMarketPrice,
leverRate=self.leverRate, leverRate=self.leverRate,
onSuccess=self._onOrderSent, onSuccess=self._onOrderSent,
extra=None, extra=None,
) )
return myorder.localId return myorder.localId
@ -259,12 +185,12 @@ class OkexFutureGateway(VnpyGateway):
"""撤单""" """撤单"""
myorder = self._getOrderByLocalId(vtCancel.orderID) myorder = self._getOrderByLocalId(vtCancel.orderID)
symbol, contractType = localSymbolToRemote(vtCancel.symbol) symbol, contractType = localSymbolToRemote(vtCancel.symbol)
self.api.cancelOrder(symbol=symbol, self.restApi.cancelOrder(symbol=symbol,
contractType=contractType, contractType=contractType,
orderId=myorder.remoteId, orderId=myorder.remoteId,
onSuccess=self._onOrderCanceled, onSuccess=self._onOrderCanceled,
extra=myorder, extra=myorder,
) )
# cancelDict: 不存在的没有localId就没有remoteId没有remoteId何来cancel # cancelDict: 不存在的没有localId就没有remoteId没有remoteId何来cancel
#---------------------------------------------------------------------- #----------------------------------------------------------------------
@ -284,15 +210,15 @@ class OkexFutureGateway(VnpyGateway):
remoteContractType = contractType remoteContractType = contractType
localContractType = remoteContractTypeToLocal(remoteContractType) localContractType = remoteContractTypeToLocal(remoteContractType)
self.api.queryOrders(symbol=symbol, self.restApi.queryOrders(symbol=symbol,
contractType=remoteContractType, contractType=remoteContractType,
status=status, status=status,
onSuccess=self._onQueryOrders, onSuccess=self._onQueryOrders,
extra=localContractType) extra=localContractType)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def qryAccount(self): def qryAccount(self):
self.api.queryUserInfo(onSuccess=self._onQueryAccount) self.restApi.queryUserInfo(onSuccess=self._onQueryAccount)
"""查询账户资金""" """查询账户资金"""
pass pass
@ -301,16 +227,17 @@ class OkexFutureGateway(VnpyGateway):
"""查询持仓""" """查询持仓"""
for remoteSymbol in _remoteSymbols: for remoteSymbol in _remoteSymbols:
for localContractType, remoteContractType in _contractTypeMap.items(): for localContractType, remoteContractType in _contractTypeMap.items():
self.api.queryPosition(remoteSymbol, self.restApi.queryPosition(remoteSymbol,
remoteContractType, remoteContractType,
onSuccess=self._onQueryPosition, onSuccess=self._onQueryPosition,
extra=localContractType extra=localContractType
) )
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def close(self): def close(self):
"""关闭""" """关闭"""
self.api.stop() self.restApi.stop()
self.webSocket.stop()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def _onOrderSent(self, remoteId, myorder): #type: (str, _Order)->None def _onOrderSent(self, remoteId, myorder): #type: (str, _Order)->None
@ -319,23 +246,6 @@ class OkexFutureGateway(VnpyGateway):
self._saveRemoteId(remoteId, myorder) self._saveRemoteId(remoteId, myorder)
self.onOrder(myorder.vtOrder) self.onOrder(myorder.vtOrder)
# #----------------------------------------------------------------------
# def _pushOrderAsTraded(self, order):
# trade = VtTradeData()
# trade.gatewayName = order.gatewayName
# trade.symbol = order.symbol
# trade.vtSymbol = order.vtSymbol
# trade.orderID = order.orderID
# trade.vtOrderID = order.vtOrderID
# self.tradeID += 1
# trade.tradeID = str(self.tradeID)
# trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID])
# trade.direction = order.direction
# trade.price = order.price
# trade.volume = order.tradedVolume
# trade.tradeTime = datetime.now().strftime('%H:%M:%S')
# self.onTrade(trade)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
@staticmethod @staticmethod
def _onOrderCanceled(myorder): #type: (_Order)->None def _onOrderCanceled(myorder): #type: (_Order)->None
@ -367,12 +277,6 @@ class OkexFutureGateway(VnpyGateway):
self._saveRemoteId(myorder.remoteId, myorder) self._saveRemoteId(myorder.remoteId, myorder)
self.onOrder(myorder.vtOrder) self.onOrder(myorder.vtOrder)
# # 如果该订单已经交易完成,推送交易完成消息
# # todo: 这样写会导致同一个订单产生多次交易完成消息
# if order.status == OkexFutureOrderStatus.Finished:
# myorder.vtOrder.status = constant.STATUS_ALLTRADED
# self._pushOrderAsTraded(myorder.vtOrder)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def _onQueryAccount(self, infos, _): # type: (List[OkexFutureUserInfo], Any)->None def _onQueryAccount(self, infos, _): # type: (List[OkexFutureUserInfo], Any)->None
for info in infos: for info in infos:
@ -390,27 +294,202 @@ class OkexFutureGateway(VnpyGateway):
localContractType = extra localContractType = extra
for info in posinfo.holding: for info in posinfo.holding:
# 先生成多头持仓 # 先生成多头持仓
pos = VtPositionData() pos = VtPositionData.createFromGateway(
pos.gatewayName = self.gatewayName gateway=self,
pos.symbol = remoteSymbolToLocal(pos.symbol, localContractType) exchange=self.exchange,
pos.exchange = self.exchange symbol=remoteSymbolToLocal(info.symbol, localContractType),
pos.vtSymbol = '.'.join([pos.symbol, pos.exchange]) direction=constant.DIRECTION_NET,
position=float(info.buyAmount),
pos.direction = constant.DIRECTION_NET )
pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction])
pos.position = float(info.buyAmount)
self.onPosition(pos) self.onPosition(pos)
# 再生存空头持仓 # 再生存空头持仓
pos = VtPositionData() pos = VtPositionData.createFromGateway(
pos.gatewayName = self.gatewayName gateway=self,
pos.symbol = remoteSymbolToLocal(pos.symbol, localContractType) exchange=self.exchange,
pos.exchange = self.exchange symbol=remoteSymbolToLocal(info.symbol, localContractType),
pos.vtSymbol = '.'.join([pos.symbol, pos.exchange]) direction=constant.DIRECTION_SHORT,
position=float(info.sellAmount),
pos.direction = constant.DIRECTION_SHORT )
pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction])
pos.position = float(info.sellAmount)
self.onPosition(pos) self.onPosition(pos)
#----------------------------------------------------------------------
def _onWebsocketPacket(self, packets): # type: (dict)->None
for packet in packets:
print('packets:')
print(packets)
channelName = None
if 'channel' in packet:
channelName = packet['channel']
if not channelName or channelName == 'addChannel':
return
packet = packet['data']
channel = parseChannel(channelName) # type: ExtraSymbolChannel
if channel.type == ChannelType.Tick:
uiSymbol = remoteSymbolToLocal(channel.symbol, remoteContractTypeToLocal(channel.remoteContractType))
tick = VtTickData.createFromGateway(
gateway=self,
symbol=uiSymbol,
exchange=self.exchange,
lastPrice=float(packet['last']),
lastVolume=float(packet['vol']),
highPrice=float(packet['high']),
lowPrice=float(packet['low']),
openInterest=int(packet['hold_amount']),
lowerLimit=float(packet['limitLow']),
upperLimit=float(packet['limitHigh'])
)
self.onTick(tick)
#----------------------------------------------------------------------
def localOrderTypeToRemote(direction, offset): # type: (str, str)->str
return _orderTypeMap[(direction, offset)]
#----------------------------------------------------------------------
def remoteOrderTypeToLocal(orderType): # type: (str)->(str, str)
"""
:param orderType:
:return: direction, offset
"""
return _orderTypeMapReverse[orderType]
#----------------------------------------------------------------------
def localContractTypeToRemote(localContractType):
return _contractTypeMap[localContractType]
#----------------------------------------------------------------------
def remoteContractTypeToLocal(remoteContractType):
return _contractTypeMapReverse[remoteContractType]
#----------------------------------------------------------------------
def localSymbolToRemote(symbol): # type: (str)->(OkexFutureSymbol, OkexFutureContractType)
"""
:return: remoteSymbol, remoteContractType
"""
return _symbolsForUi[symbol]
#----------------------------------------------------------------------
def remoteSymbolToLocal(remoteSymbol, localContractType):
return remoteSymbol.upper() + '_' + localContractType
#----------------------------------------------------------------------
def remotePrefixToRemoteContractType(prefix):
return _prefixForRemoteContractType[prefix]
#----------------------------------------------------------------------
class ChannelType(Enum):
Login = 1
ForecastPrice = 2
Tick = 3
Depth = 4
Trade = 5
Index = 6
########################################################################
class Channel(object):
#----------------------------------------------------------------------
def __init__(self, type):
self.type = type
########################################################################
class SymbolChannel(Channel):
#----------------------------------------------------------------------
def __init__(self, type, symbol):
super(SymbolChannel, self).__init__(type)
self.symbol = symbol
########################################################################
class FutureSymbolChannel(SymbolChannel):
#----------------------------------------------------------------------
def __init__(self, type, symbol, remoteContractType):
super(FutureSymbolChannel, self).__init__(type, symbol)
self.remoteContractType = remoteContractType
########################################################################
class ExtraSymbolChannel(FutureSymbolChannel):
#----------------------------------------------------------------------
def __init__(self, type, symbol, remoteContractType, extra):
super(ExtraSymbolChannel, self).__init__(type, symbol, remoteContractType)
self.extra = extra
#----------------------------------------------------------------------
def parseChannel(channel): # type: (str)->Channel
if channel == 'login':
return Channel(ChannelType.Login)
elif channel[4:12] == 'forecast': # eg: 'btc_forecast_price'
return SymbolChannel(ChannelType.ForecastPrice, channel[:3])
sp = channel.split('_')
if sp[-1] == 'index': # eg: 'ok_sub_futureusd_btc_index'
return SymbolChannel(ChannelType.Index, channel[17:20])
l = len(sp)
if len(sp) == 9:
_, _, _, easySymbol, crash, typeName, contractTypePrefix, _, depth = sp
return ExtraSymbolChannel(ChannelType.Depth, easySymbol + '_' + crash,
remotePrefixToRemoteContractType(contractTypePrefix),
depth)
_, _, _, easySymbol, crash, typeName, contractTypePrefix, _ = sp
return FutureSymbolChannel(ChannelType.Tick, easySymbol + '_' + crash,
remotePrefixToRemoteContractType(contractTypePrefix))
_orderTypeMap = {
(constant.DIRECTION_LONG, constant.OFFSET_OPEN): OkexFutureOrderType.OpenLong,
(constant.DIRECTION_SHORT, constant.OFFSET_OPEN): OkexFutureOrderType.OpenShort,
(constant.DIRECTION_LONG, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseLong,
(constant.DIRECTION_SHORT, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseShort,
}
_orderTypeMapReverse = {v: k for k, v in _orderTypeMap.items()}
_prefixForRemoteContractType = {v.split('_')[0]: v for k, v in OkexFutureContractType.__dict__.items() if
not k.startswith('_')}
_contractTypeMap = {
k.upper(): v for k, v in OkexFutureContractType.__dict__.items() if not k.startswith('_')
}
_contractTypeMapReverse = {v: k for k, v in _contractTypeMap.items()}
_easySymbols = {
v for k, v in OkexFutureEasySymbol.__dict__.items() if not k.startswith('_')
}
_remoteSymbols = {
v for k, v in OkexFutureSymbol.__dict__.items() if not k.startswith('_')
}
# symbols for ui,
# keys:给用户看的symbols : f"{internalSymbol}_{contractType}"
# values: API接口使用的symbol和contractType字段
_symbolsForUi = {(remoteSymbol.upper() + '_' + upperContractType.upper()): (remoteSymbol, remoteContractType)
for remoteSymbol in _remoteSymbols
for upperContractType, remoteContractType in
_contractTypeMap.items()} # type: Dict[str, List[str, str]]
_symbolsForUiReverse = {v: k for k, v in _symbolsForUi.items()}
_channel_for_subscribe = {
'ok_sub_futureusd_' + easySymbol + '_ticker_' + remoteContractType: (easySymbol, remoteContractType)
for easySymbol in _easySymbols
for remoteContractType in _contractTypeMap.values()
}