From c141360847e2293120bb06758c8fb7c67eac3a24 Mon Sep 17 00:00:00 2001 From: nanoric Date: Tue, 16 Oct 2018 05:48:49 -0400 Subject: [PATCH] =?UTF-8?q?[Add]=20OkexFutureGateway.subscribe=20[Add]=20O?= =?UTF-8?q?kexFutureApi=E5=A2=9E=E5=8A=A0WebSocket=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/api/okexfuture/OkexFutureApi.py | 31 +- vnpy/api/okexfuture/__init__.py | 4 +- .../okexFutureGateway/okexFutureGateway.py | 395 +++++++++++------- 3 files changed, 261 insertions(+), 169 deletions(-) diff --git a/vnpy/api/okexfuture/OkexFutureApi.py b/vnpy/api/okexfuture/OkexFutureApi.py index 1fc96cbb..038dde13 100644 --- a/vnpy/api/okexfuture/OkexFutureApi.py +++ b/vnpy/api/okexfuture/OkexFutureApi.py @@ -12,8 +12,20 @@ class _OkexFutureCustomExtra(object): self.onFailed = onFailed self.onSuccess = onSuccess self.extra = extra - - + + +######################################################################## +class OkexFutureEasySymbol(object): + BTC = 'btc' + LTC = 'ltc' + ETH = 'eth' + ETC = 'etc' + BCH = 'bch' + EOS = 'eos' + XRP = 'xrp' + BTG = 'btg' + + ######################################################################## class OkexFutureSymbol(object): BTC = 'btc_usd' @@ -428,15 +440,16 @@ class OkexFutureRestClient(OkexFutureRestBase): return restErrorCodeMap[code] -#---------------------------------------------------------------------- -class OkexFutureWebSocketApi(OkexFutureWebSocketBase): +######################################################################## +class OkexFutureWebSocketClient(OkexFutureWebSocketBase): - def subscribe(self, packet): - pass + #---------------------------------------------------------------------- + def subscribe(self, easySymbol, contractType): # type: (OkexFutureEasySymbol, OkexFutureContractType)->None + self.sendPacket({ + 'event': 'addChannel', + 'channel': 'ok_sub_futureusd_' + easySymbol + '_ticker_' + contractType + }) - def onPacket(self, packet): - pass - restErrorCodeMap = { 0: '远程服务器并未给出错误代码', diff --git a/vnpy/api/okexfuture/__init__.py b/vnpy/api/okexfuture/__init__.py index 71802197..6bac0f36 100644 --- a/vnpy/api/okexfuture/__init__.py +++ b/vnpy/api/okexfuture/__init__.py @@ -1,2 +1,2 @@ -from .OkexFutureApi import OkexFutureSymbol, OkexFutureContractType, OkexFutureOrder, OkexFutureOrderStatus, OkexFuturePosition, \ - OkexFuturePositionDetail, OkexFuturePriceType, OkexFutureRestClient, OkexFutureUserInfo +from .OkexFutureApi import OkexFutureRestClient, OkexFutureWebSocketClient, OkexFutureSymbol, OkexFutureContractType, OkexFutureOrder, OkexFutureOrderStatus, OkexFuturePosition, \ + OkexFuturePositionDetail, OkexFuturePriceType, OkexFutureUserInfo diff --git a/vnpy/trader/gateway/okexFutureGateway/okexFutureGateway.py b/vnpy/trader/gateway/okexFutureGateway/okexFutureGateway.py index 23bc9bed..618634c0 100644 --- a/vnpy/trader/gateway/okexFutureGateway/okexFutureGateway.py +++ b/vnpy/trader/gateway/okexFutureGateway/okexFutureGateway.py @@ -4,83 +4,14 @@ from __future__ import print_function import json from abc import abstractmethod, abstractproperty -from datetime import datetime +from enum import Enum from typing import Dict from vnpy.api.okexfuture.OkexFutureApi import * from vnpy.trader.vtFunction import getJsonPath from vnpy.trader.vtGateway import * -_orderTypeMap = { - (constant.DIRECTION_LONG, constant.OFFSET_OPEN): OkexFutureOrderType.OpenLong, - (constant.DIRECTION_SHORT, constant.OFFSET_OPEN): OkexFutureOrderType.OpenShort, - (constant.DIRECTION_LONG, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseLong, - (constant.DIRECTION_SHORT, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseShort, -} -_orderTypeMapReverse = {v: k for k, v in _orderTypeMap.items()} - -_contractTypeMap = { - 'THISWEEK': OkexFutureContractType.ThisWeek, - 'NEXTWEEK': OkexFutureContractType.NextWeek, - 'QUARTER': OkexFutureContractType.Quarter, -} -_contractTypeMapReverse = {v: k for k, v in _contractTypeMap.items()} - -_remoteSymbols = { - OkexFutureSymbol.BTC, - OkexFutureSymbol.LTC, - OkexFutureSymbol.ETH, - OkexFutureSymbol.ETC, - OkexFutureSymbol.BCH, -} - -# symbols for ui, -# keys:给用户看的symbols : f"{internalSymbol}_{contractType}" -# values: API接口使用的symbol和contractType字段 -_symbolsForUi = {(remoteSymbol.upper() + '_' + upperContractType.upper()): (remoteSymbol, remoteContractType) - for remoteSymbol in _remoteSymbols - for upperContractType, remoteContractType in - _contractTypeMap.items()} # type: Dict[str, List[str, str]] -_symbolsForUiReverse = {v: k for k, v in _symbolsForUi.items()} - - -#---------------------------------------------------------------------- -def localOrderTypeToRemote(direction, offset): # type: (str, str)->str - return _orderTypeMap[(direction, offset)] - - -#---------------------------------------------------------------------- -def remoteOrderTypeToLocal(orderType): # type: (str)->(str, str) - """ - :param orderType: - :return: direction, offset - """ - return _orderTypeMapReverse[orderType] - - -#---------------------------------------------------------------------- -def localContractTypeToRemote(localContractType): - return _contractTypeMap[localContractType] - - -#---------------------------------------------------------------------- -def remoteContractTypeToLocal(remoteContractType): - return _contractTypeMapReverse[remoteContractType] - - -#---------------------------------------------------------------------- -def localSymbolToRemote(symbol): # type: (str)->(OkexFutureSymbol, OkexFutureContractType) - """ - :return: remoteSymbol, remoteContractType - """ - return _symbolsForUi[symbol] - - -#---------------------------------------------------------------------- -def remoteSymbolToLocal(remoteSymbol, localContractType): - return remoteSymbol.upper() + localContractType - ######################################################################## class VnpyGateway(VtGateway): @@ -89,15 +20,6 @@ class VnpyGateway(VtGateway): 于是我设计了这个类,将重复代码抽取出来,简化gateway的实现 """ - #---------------------------------------------------------------------- - def __init__(self, eventEngine): - super(VnpyGateway, self).__init__(eventEngine, self.gatewayName) - - #---------------------------------------------------------------------- - @abstractproperty - def gatewayName(self): # type: ()->str - return 'VnpyGateway' - #---------------------------------------------------------------------- def readConfig(self): """ @@ -146,10 +68,15 @@ class OkexFutureGateway(VnpyGateway): #---------------------------------------------------------------------- def __init__(self, eventEngine, *_, **__): # args, kwargs is needed for compatibility """Constructor""" - super(OkexFutureGateway, self).__init__(eventEngine) + super(OkexFutureGateway, self).__init__(eventEngine, 'OkexFutureGateway') self.apiKey = None # type: str self.apiSecret = None # type: str - self.api = OkexFutureRestClient() + + self.restApi = OkexFutureRestClient() + + self.webSocket = OkexFutureWebSocketClient() + self.webSocket.onPacket = self._onWebsocketPacket + self.leverRate = 1 self.symbols = [] @@ -157,11 +84,6 @@ class OkexFutureGateway(VnpyGateway): self._orders = {} # type: Dict[str, _Order] self._remoteIds = {} # type: Dict[str, _Order] - #---------------------------------------------------------------------- - @property - def gatewayName(self): - return 'OkexFutureGateway' - #---------------------------------------------------------------------- @property def exchange(self): # type: ()->str @@ -190,12 +112,16 @@ class OkexFutureGateway(VnpyGateway): #---------------------------------------------------------------------- def connect(self): self.loadSetting() - self.api.init(self.apiKey, self.apiSecret) + self.restApi.init(self.apiKey, self.apiSecret) + self.webSocket.init(self.apiKey, self.apiSecret) + self.restApi.start() + self.webSocket.start() #---------------------------------------------------------------------- - def subscribe(self, subscribeReq): + def subscribe(self, subscribeReq): # type: (VtSubscribeReq)->None """订阅行情""" - pass + remoteSymbol, remoteContractType = localSymbolToRemote(subscribeReq.symbol) + return self.webSocket.subscribe(remoteSymbol, remoteContractType) #---------------------------------------------------------------------- def _getOrderByLocalId(self, localId): @@ -241,16 +167,16 @@ class OkexFutureGateway(VnpyGateway): if vtRequest.priceType == constant.PRICETYPE_MARKETPRICE: userMarketPrice = True - self.api.sendOrder(symbol=remoteSymbol, - contractType=remoteContractType, - orderType=orderType, - volume=vtRequest.volume, - price=vtRequest.price, - useMarketPrice=userMarketPrice, - leverRate=self.leverRate, - onSuccess=self._onOrderSent, - extra=None, - ) + self.restApi.sendOrder(symbol=remoteSymbol, + contractType=remoteContractType, + orderType=orderType, + volume=vtRequest.volume, + price=vtRequest.price, + useMarketPrice=userMarketPrice, + leverRate=self.leverRate, + onSuccess=self._onOrderSent, + extra=None, + ) return myorder.localId @@ -259,12 +185,12 @@ class OkexFutureGateway(VnpyGateway): """撤单""" myorder = self._getOrderByLocalId(vtCancel.orderID) symbol, contractType = localSymbolToRemote(vtCancel.symbol) - self.api.cancelOrder(symbol=symbol, - contractType=contractType, - orderId=myorder.remoteId, - onSuccess=self._onOrderCanceled, - extra=myorder, - ) + self.restApi.cancelOrder(symbol=symbol, + contractType=contractType, + orderId=myorder.remoteId, + onSuccess=self._onOrderCanceled, + extra=myorder, + ) # cancelDict: 不存在的,没有localId就没有remoteId,没有remoteId何来cancel #---------------------------------------------------------------------- @@ -284,15 +210,15 @@ class OkexFutureGateway(VnpyGateway): remoteContractType = contractType localContractType = remoteContractTypeToLocal(remoteContractType) - self.api.queryOrders(symbol=symbol, - contractType=remoteContractType, - status=status, - onSuccess=self._onQueryOrders, - extra=localContractType) + self.restApi.queryOrders(symbol=symbol, + contractType=remoteContractType, + status=status, + onSuccess=self._onQueryOrders, + extra=localContractType) #---------------------------------------------------------------------- def qryAccount(self): - self.api.queryUserInfo(onSuccess=self._onQueryAccount) + self.restApi.queryUserInfo(onSuccess=self._onQueryAccount) """查询账户资金""" pass @@ -301,16 +227,17 @@ class OkexFutureGateway(VnpyGateway): """查询持仓""" for remoteSymbol in _remoteSymbols: for localContractType, remoteContractType in _contractTypeMap.items(): - self.api.queryPosition(remoteSymbol, - remoteContractType, - onSuccess=self._onQueryPosition, - extra=localContractType - ) + self.restApi.queryPosition(remoteSymbol, + remoteContractType, + onSuccess=self._onQueryPosition, + extra=localContractType + ) #---------------------------------------------------------------------- def close(self): """关闭""" - self.api.stop() + self.restApi.stop() + self.webSocket.stop() #---------------------------------------------------------------------- def _onOrderSent(self, remoteId, myorder): #type: (str, _Order)->None @@ -319,23 +246,6 @@ class OkexFutureGateway(VnpyGateway): self._saveRemoteId(remoteId, myorder) self.onOrder(myorder.vtOrder) - # #---------------------------------------------------------------------- - # def _pushOrderAsTraded(self, order): - # trade = VtTradeData() - # trade.gatewayName = order.gatewayName - # trade.symbol = order.symbol - # trade.vtSymbol = order.vtSymbol - # trade.orderID = order.orderID - # trade.vtOrderID = order.vtOrderID - # self.tradeID += 1 - # trade.tradeID = str(self.tradeID) - # trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID]) - # trade.direction = order.direction - # trade.price = order.price - # trade.volume = order.tradedVolume - # trade.tradeTime = datetime.now().strftime('%H:%M:%S') - # self.onTrade(trade) - #---------------------------------------------------------------------- @staticmethod def _onOrderCanceled(myorder): #type: (_Order)->None @@ -367,12 +277,6 @@ class OkexFutureGateway(VnpyGateway): self._saveRemoteId(myorder.remoteId, myorder) self.onOrder(myorder.vtOrder) - # # 如果该订单已经交易完成,推送交易完成消息 - # # todo: 这样写会导致同一个订单产生多次交易完成消息 - # if order.status == OkexFutureOrderStatus.Finished: - # myorder.vtOrder.status = constant.STATUS_ALLTRADED - # self._pushOrderAsTraded(myorder.vtOrder) - #---------------------------------------------------------------------- def _onQueryAccount(self, infos, _): # type: (List[OkexFutureUserInfo], Any)->None for info in infos: @@ -390,27 +294,202 @@ class OkexFutureGateway(VnpyGateway): localContractType = extra for info in posinfo.holding: # 先生成多头持仓 - pos = VtPositionData() - pos.gatewayName = self.gatewayName - pos.symbol = remoteSymbolToLocal(pos.symbol, localContractType) - pos.exchange = self.exchange - pos.vtSymbol = '.'.join([pos.symbol, pos.exchange]) - - pos.direction = constant.DIRECTION_NET - pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction]) - pos.position = float(info.buyAmount) + pos = VtPositionData.createFromGateway( + gateway=self, + exchange=self.exchange, + symbol=remoteSymbolToLocal(info.symbol, localContractType), + direction=constant.DIRECTION_NET, + position=float(info.buyAmount), + ) self.onPosition(pos) # 再生存空头持仓 - pos = VtPositionData() - pos.gatewayName = self.gatewayName - pos.symbol = remoteSymbolToLocal(pos.symbol, localContractType) - pos.exchange = self.exchange - pos.vtSymbol = '.'.join([pos.symbol, pos.exchange]) - - pos.direction = constant.DIRECTION_SHORT - pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction]) - pos.position = float(info.sellAmount) + pos = VtPositionData.createFromGateway( + gateway=self, + exchange=self.exchange, + symbol=remoteSymbolToLocal(info.symbol, localContractType), + direction=constant.DIRECTION_SHORT, + position=float(info.sellAmount), + ) self.onPosition(pos) + + #---------------------------------------------------------------------- + def _onWebsocketPacket(self, packets): # type: (dict)->None + + for packet in packets: + print('packets:') + print(packets) + channelName = None + if 'channel' in packet: + channelName = packet['channel'] + if not channelName or channelName == 'addChannel': + return + + packet = packet['data'] + channel = parseChannel(channelName) # type: ExtraSymbolChannel + + if channel.type == ChannelType.Tick: + uiSymbol = remoteSymbolToLocal(channel.symbol, remoteContractTypeToLocal(channel.remoteContractType)) + tick = VtTickData.createFromGateway( + gateway=self, + symbol=uiSymbol, + exchange=self.exchange, + lastPrice=float(packet['last']), + lastVolume=float(packet['vol']), + highPrice=float(packet['high']), + lowPrice=float(packet['low']), + openInterest=int(packet['hold_amount']), + lowerLimit=float(packet['limitLow']), + upperLimit=float(packet['limitHigh']) + ) + self.onTick(tick) + + +#---------------------------------------------------------------------- +def localOrderTypeToRemote(direction, offset): # type: (str, str)->str + return _orderTypeMap[(direction, offset)] + + +#---------------------------------------------------------------------- +def remoteOrderTypeToLocal(orderType): # type: (str)->(str, str) + """ + :param orderType: + :return: direction, offset + """ + return _orderTypeMapReverse[orderType] + + +#---------------------------------------------------------------------- +def localContractTypeToRemote(localContractType): + return _contractTypeMap[localContractType] + + +#---------------------------------------------------------------------- +def remoteContractTypeToLocal(remoteContractType): + return _contractTypeMapReverse[remoteContractType] + + +#---------------------------------------------------------------------- +def localSymbolToRemote(symbol): # type: (str)->(OkexFutureSymbol, OkexFutureContractType) + """ + :return: remoteSymbol, remoteContractType + """ + return _symbolsForUi[symbol] + + +#---------------------------------------------------------------------- +def remoteSymbolToLocal(remoteSymbol, localContractType): + return remoteSymbol.upper() + '_' + localContractType + + +#---------------------------------------------------------------------- +def remotePrefixToRemoteContractType(prefix): + return _prefixForRemoteContractType[prefix] + + +#---------------------------------------------------------------------- +class ChannelType(Enum): + Login = 1 + ForecastPrice = 2 + Tick = 3 + Depth = 4 + Trade = 5 + Index = 6 + + +######################################################################## +class Channel(object): + + #---------------------------------------------------------------------- + def __init__(self, type): + self.type = type + + +######################################################################## +class SymbolChannel(Channel): + + #---------------------------------------------------------------------- + def __init__(self, type, symbol): + super(SymbolChannel, self).__init__(type) + self.symbol = symbol + + +######################################################################## +class FutureSymbolChannel(SymbolChannel): + + #---------------------------------------------------------------------- + def __init__(self, type, symbol, remoteContractType): + super(FutureSymbolChannel, self).__init__(type, symbol) + self.remoteContractType = remoteContractType + + +######################################################################## +class ExtraSymbolChannel(FutureSymbolChannel): + + #---------------------------------------------------------------------- + def __init__(self, type, symbol, remoteContractType, extra): + super(ExtraSymbolChannel, self).__init__(type, symbol, remoteContractType) + self.extra = extra + + +#---------------------------------------------------------------------- +def parseChannel(channel): # type: (str)->Channel + if channel == 'login': + return Channel(ChannelType.Login) + elif channel[4:12] == 'forecast': # eg: 'btc_forecast_price' + return SymbolChannel(ChannelType.ForecastPrice, channel[:3]) + sp = channel.split('_') + if sp[-1] == 'index': # eg: 'ok_sub_futureusd_btc_index' + return SymbolChannel(ChannelType.Index, channel[17:20]) + + l = len(sp) + if len(sp) == 9: + _, _, _, easySymbol, crash, typeName, contractTypePrefix, _, depth = sp + return ExtraSymbolChannel(ChannelType.Depth, easySymbol + '_' + crash, + remotePrefixToRemoteContractType(contractTypePrefix), + depth) + _, _, _, easySymbol, crash, typeName, contractTypePrefix, _ = sp + return FutureSymbolChannel(ChannelType.Tick, easySymbol + '_' + crash, + remotePrefixToRemoteContractType(contractTypePrefix)) + + +_orderTypeMap = { + (constant.DIRECTION_LONG, constant.OFFSET_OPEN): OkexFutureOrderType.OpenLong, + (constant.DIRECTION_SHORT, constant.OFFSET_OPEN): OkexFutureOrderType.OpenShort, + (constant.DIRECTION_LONG, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseLong, + (constant.DIRECTION_SHORT, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseShort, +} +_orderTypeMapReverse = {v: k for k, v in _orderTypeMap.items()} + +_prefixForRemoteContractType = {v.split('_')[0]: v for k, v in OkexFutureContractType.__dict__.items() if + not k.startswith('_')} + +_contractTypeMap = { + k.upper(): v for k, v in OkexFutureContractType.__dict__.items() if not k.startswith('_') +} +_contractTypeMapReverse = {v: k for k, v in _contractTypeMap.items()} + +_easySymbols = { + v for k, v in OkexFutureEasySymbol.__dict__.items() if not k.startswith('_') +} + +_remoteSymbols = { + v for k, v in OkexFutureSymbol.__dict__.items() if not k.startswith('_') +} + +# symbols for ui, +# keys:给用户看的symbols : f"{internalSymbol}_{contractType}" +# values: API接口使用的symbol和contractType字段 +_symbolsForUi = {(remoteSymbol.upper() + '_' + upperContractType.upper()): (remoteSymbol, remoteContractType) + for remoteSymbol in _remoteSymbols + for upperContractType, remoteContractType in + _contractTypeMap.items()} # type: Dict[str, List[str, str]] +_symbolsForUiReverse = {v: k for k, v in _symbolsForUi.items()} + +_channel_for_subscribe = { + 'ok_sub_futureusd_' + easySymbol + '_ticker_' + remoteContractType: (easySymbol, remoteContractType) + for easySymbol in _easySymbols + for remoteContractType in _contractTypeMap.values() +}