修改接口

This commit is contained in:
msincenselee 2018-05-30 11:20:08 +08:00
parent 7f76e50501
commit c9c4b81ea4
6 changed files with 375 additions and 170 deletions

View File

@ -126,28 +126,32 @@ class Client(object):
return params return params
def _request(self, method, uri, signed, force_params=False, **kwargs): def _request(self, method, uri, signed, force_params=False, **kwargs):
data = kwargs.get('data', None) data = kwargs.get('data', None)
#if data is None:
# kwargs['data'] = data
if data and isinstance(data, dict): if data and isinstance(data, dict):
kwargs['data'] = data kwargs['data'] = data
if signed: if signed:
if data is None:
kwargs['data'] = {}
# generate signature # generate signature
kwargs['data']['timestamp'] = int(time.time() * 1000) kwargs['data']['timestamp'] = int(time.time() * 1000)
kwargs['data']['recvWindow'] = 20000 kwargs['data']['recvWindow'] = 20000
kwargs['data']['signature'] = self._generate_signature(kwargs['data']) kwargs['data']['signature'] = self._generate_signature(kwargs['data'])
if data and (method == 'get' or force_params): if data and (method == 'get' or force_params):
kwargs['params'] = self._order_params(kwargs['data']) kwargs['params'] = self._order_params(kwargs['data'])
del(kwargs['data']) del(kwargs['data'])
# kwargs["verify"] = False # I don't know whay this is error # kwargs["verify"] = False # I don't know whay this is error
print('_request:method:{} uri:{},{}'.format(method, uri, kwargs))
response = getattr(self.session, method)(uri, **kwargs ) response = getattr(self.session, method)(uri, **kwargs )
return self._handle_response(response , uri , **kwargs ) return self._handle_response(response , uri , **kwargs )
def _request_api(self, method, path, signed=False, version=PUBLIC_API_VERSION, **kwargs): def _request_api(self, method, path, signed=False, version=PUBLIC_API_VERSION, **kwargs):
uri = self._create_api_uri(path, signed, version) uri = self._create_api_uri(path, signed, version)
return self._request(method, uri, signed, **kwargs) return self._request(method, uri, signed, **kwargs)
def _request_withdraw_api(self, method, path, signed=False, **kwargs): def _request_withdraw_api(self, method, path, signed=False, **kwargs):
@ -393,6 +397,7 @@ class Client(object):
:raises: BinanceResponseException, BinanceAPIException :raises: BinanceResponseException, BinanceAPIException
""" """
return self._get('depth', data=params) return self._get('depth', data=params)
def get_recent_trades(self, **params): def get_recent_trades(self, **params):

View File

@ -5,21 +5,25 @@ class BinanceAPIException(Exception):
LISTENKEY_NOT_EXIST = '-1125' LISTENKEY_NOT_EXIST = '-1125'
def __init__(self, response): def __init__(self, response, uri, **kwargs):
json_res = response.json() json_res = response.json()
self.status_code = response.status_code self.status_code = response.status_code
self.response = response self.response = response
self.code = json_res['code'] self.code = json_res['code']
self.message = json_res['msg'] self.message = json_res['msg']
self.request = getattr(response, 'request', None) self.request = getattr(response, 'request', None)
self.kwargs = kwargs
self.uri = uri
def __str__(self): # pragma: no cover def __str__(self): # pragma: no cover
return 'APIError(code=%s): %s' % (self.code, self.message) return 'APIError(code=%s): %s' % (self.code, self.message)
class BinanceRequestException(Exception): class BinanceRequestException(Exception):
def __init__(self, message):
def __init__(self, message, uri, **kwargs):
self.message = message self.message = message
self.kwargs = kwargs
self.uri = uri
def __str__(self): def __str__(self):
return 'BinanceRequestException: %s' % self.message return 'BinanceRequestException: %s' % self.message

View File

@ -101,17 +101,16 @@ class BinanceSpotApi(object):
self.active = True self.active = True
self.reqThread.start() self.reqThread.start()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def processQueue(self): def processQueue(self):
"""处理请求队列中的请求""" """处理请求队列中的请求"""
while self.active: while self.active:
try: #req = self.reqQueue.get(block=True, timeout=0.001) # 获取请求的阻塞为一秒
#req = self.reqQueue.get(block=True, timeout=0.001) # 获取请求的阻塞为一秒 if len(self.reqQueue) == 0:
if len(self.reqQueue) == 0: continue
continue (Type , req) = self.reqQueue[0]
(Type , req) = self.reqQueue[0]
try:
callback = req['callback'] callback = req['callback']
reqID = req['reqID'] reqID = req['reqID']
@ -126,7 +125,12 @@ class BinanceSpotApi(object):
print(u'processQueue exception:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr) print(u'processQueue exception:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr)
self.reqQueue.pop(0) self.reqQueue.pop(0)
sleep(0.1) sleep(0.1)
#except BinanceAPIException as ex:
# print(u'BinanceAPIException:{},{}'.format( str(ex),traceback.format_exc()),file=sys.stderr)
#except BinanceRequestException as ex:
# print(u'BinanceRequestException:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr)
except Exception as ex: except Exception as ex:
self.onAllError(ex,req,reqID)
print(u'processQueue exception:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr) print(u'processQueue exception:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
@ -227,7 +231,9 @@ class BinanceSpotApi(object):
callback(data , req , reqID) callback(data , req , reqID)
except Exception as ex: except Exception as ex:
print(u'processQueue exception:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr) if req is not None or reqID is not None:
self.onAllError(ex, req, reqID)
print(u'processRequest exception:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr)
# pass # pass
#---------------------------------------------------------------------- #----------------------------------------------------------------------

View File

@ -2,7 +2,7 @@
"fontFamily": "微软雅黑", "fontFamily": "微软雅黑",
"fontSize": 12, "fontSize": 12,
"mongoHost": "mongodb://vnpy:vnpy@localhost", "mongoHost": "localhost",
"mongoPort": 27017, "mongoPort": 27017,
"mongoLogging": true, "mongoLogging": true,
"darkStyle": true, "darkStyle": true,

View File

@ -16,7 +16,7 @@ import json
from datetime import datetime , timedelta from datetime import datetime , timedelta
import time import time
from vnpy.api.binance import BinanceSpotApi from vnpy.api.binance import BinanceSpotApi,BinanceAPIException,BinanceRequestException
from vnpy.trader.vtGateway import * from vnpy.trader.vtGateway import *
from vnpy.trader.vtFunction import getJsonPath from vnpy.trader.vtFunction import getJsonPath
from vnpy.trader.gateway.binanceGateway.DigitalCurrency import systemSymbolToVnSymbol , VnSymbolToSystemSymbol from vnpy.trader.gateway.binanceGateway.DigitalCurrency import systemSymbolToVnSymbol , VnSymbolToSystemSymbol
@ -238,10 +238,26 @@ class BinanceApi(BinanceSpotApi):
symbol = (req.vtSymbol.split('.'))[0] symbol = (req.vtSymbol.split('.'))[0]
self.gateway.writeLog( "send order %s,%s,%s,%s" % (req.vtSymbol , str(req.direction) , str(req.price) , str(req.volume) )) self.gateway.writeLog( "send order %s,%s,%s,%s" % (req.vtSymbol , str(req.direction) , str(req.price) , str(req.volume) ))
if req.direction == DIRECTION_LONG:
reqID = self.spotTrade(symbol_pair=symbol, type_="buy", price=float(req.price), amount=req.volume) try:
else: reqID = self.spotTrade(symbol_pair=symbol, type_="buy" if req.direction == DIRECTION_LONG else "sell", price=float(req.price), amount=req.volume)
reqID = self.spotTrade(symbol_pair=symbol, type_="sell", price=float(req.price), amount=req.volume)
except BinanceAPIException as ex:
self.gateway.writeError(content=ex.message, error_id=ex.code)
self.gateway.writeLog(traceback.format_exc())
return None
except BinanceRequestException as ex:
self.gateway.writeError(content=ex.message)
self.gateway.writeLog(traceback.format_exc())
return None
except Exception as ex:
self.gateway.writeError(str(ex))
self.gateway.writeLog(traceback.format_exc())
return None
if reqID is None:
self.gateway.writeError(u'委托异常: %s,%s,%s,%s'.format(req.vtSymbol , str(req.direction) , str(req.price) , str(req.volume) ))
return None
self.localID += 1 self.localID += 1
localID = str(self.localID) localID = str(self.localID)
@ -269,7 +285,7 @@ class BinanceApi(BinanceSpotApi):
self.workingOrderDict[localID] = order self.workingOrderDict[localID] = order
self.reqToLocalID[reqID] = localID self.reqToLocalID[reqID] = localID
# self.gateway.onOrder(order) self.gateway.onOrder(order)
# 返回委托号 # 返回委托号
return order.vtOrderID return order.vtOrderID
@ -285,7 +301,8 @@ print ex.status_code, ex.message , ex.code , ex.request , ex.uri , ex.kwargs
""" """
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def onAllError(self, ex , req , reqID): def onAllError(self, ex , req , reqID):
self.gateway.writeError( ex.message + " " + ex.uri , ex.status_code)
self.gateway.writeError(content=str(ex) + ' ' + getattr(ex,'uri',''),error_id=getattr(ex,'status_code',0))
# 判断是否应该是 发出的无效订单 # 判断是否应该是 发出的无效订单
localID = self.reqToLocalID.get( reqID , None) localID = self.reqToLocalID.get( reqID , None)
@ -378,6 +395,8 @@ print ex.status_code, ex.message , ex.code , ex.request , ex.uri , ex.kwargs
contract.size = float(volume_filter["stepSize"]) contract.size = float(volume_filter["stepSize"])
contract.priceTick = float(price_filter["tickSize"]) contract.priceTick = float(price_filter["tickSize"])
contract.productClass = PRODUCT_SPOT contract.productClass = PRODUCT_SPOT
contract.volumeTick = float(volume_filter["minQty"])
self.gateway.onContract(contract) self.gateway.onContract(contract)
#---------------------------------------------------------------------- #----------------------------------------------------------------------

View File

@ -23,7 +23,7 @@ from vnpy.api.okex.okexData import SPOT_TRADE_SIZE_DICT, SPOT_ERROR_DICT, FUTURE
from vnpy.trader.vtGateway import * from vnpy.trader.vtGateway import *
from vnpy.trader.vtFunction import getJsonPath from vnpy.trader.vtFunction import getJsonPath
from vnpy.trader.vtConstant import EXCHANGE_OKEX, DIRECTION_NET, PRODUCT_SPOT, DIRECTION_LONG, DIRECTION_SHORT, PRICETYPE_LIMITPRICE, PRICETYPE_MARKETPRICE, OFFSET_OPEN, OFFSET_CLOSE from vnpy.trader.vtConstant import EXCHANGE_OKEX, DIRECTION_NET, PRODUCT_SPOT, DIRECTION_LONG, DIRECTION_SHORT, PRICETYPE_LIMITPRICE, PRICETYPE_MARKETPRICE, OFFSET_OPEN, OFFSET_CLOSE
from vnpy.trader.vtConstant import STATUS_CANCELLED, STATUS_NOTTRADED, STATUS_PARTTRADED, STATUS_ALLTRADED, STATUS_UNKNOWN, PRODUCT_FUTURES from vnpy.trader.vtConstant import STATUS_CANCELLED, STATUS_NOTTRADED, STATUS_PARTTRADED, STATUS_ALLTRADED, STATUS_UNKNOWN, STATUS_REJECTED, PRODUCT_FUTURES
from vnpy.trader.vtObject import VtErrorData from vnpy.trader.vtObject import VtErrorData
# 价格类型映射 # 价格类型映射
@ -48,6 +48,7 @@ statusMap[-1] = STATUS_CANCELLED # 撤单
statusMap[0] = STATUS_NOTTRADED # 未成交 statusMap[0] = STATUS_NOTTRADED # 未成交
statusMap[1] = STATUS_PARTTRADED # 部分成交 statusMap[1] = STATUS_PARTTRADED # 部分成交
statusMap[2] = STATUS_ALLTRADED # 全部成交 statusMap[2] = STATUS_ALLTRADED # 全部成交
statusMap[3] = STATUS_UNKNOWN
statusMap[4] = STATUS_UNKNOWN # 未知状态 statusMap[4] = STATUS_UNKNOWN # 未知状态
EVENT_OKEX_INDEX_FUTURE = "eFuture_Index_OKEX" EVENT_OKEX_INDEX_FUTURE = "eFuture_Index_OKEX"
@ -122,7 +123,7 @@ class OkexGateway(VtGateway):
# 启动查询 # 启动查询
self.initQuery() self.initQuery()
#self.startQuery() self.startQuery()
def writeLog(self, content): def writeLog(self, content):
""" """
@ -148,6 +149,9 @@ class OkexGateway(VtGateway):
if self.logger: if self.logger:
self.logger.error(content) self.logger.error(content)
def checkStatus(self):
return self.spot_connected or self.futures_connected
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def subscribe(self, subscribeReq): def subscribe(self, subscribeReq):
""" """
@ -169,13 +173,18 @@ class OkexGateway(VtGateway):
self.writeError(u'期货接口未创建/未连接无法调用subscribe') self.writeError(u'期货接口未创建/未连接无法调用subscribe')
except Exception as ex: except Exception as ex:
self.onError(u'OkexGateway.subscribe 异常,请检查日志') self.writeError(u'OkexGateway.subscribe 异常,请检查日志:{}'.format(str(ex)))
self.writeLog(u'OkexGateway.subscribe Exception :{},{}'.format(str(ex), traceback.format_exc())) self.writeLog(u'OkexGateway.subscribe Exception :{},{}'.format(str(ex), traceback.format_exc()))
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def sendOrder(self, orderReq): def sendOrder(self, orderReq):
"""发单""" """发单"""
if orderReq.symbol in SPOT_SYMBOL:
# btc_usdt.OKEX => btc_usdt
order_req_symbol = orderReq.symbol
order_req_symbol = order_req_symbol.replace('.{}'.format(EXCHANGE_OKEX),'')
if order_req_symbol in SPOT_SYMBOL:
if self.api_spot and self.spot_connected: if self.api_spot and self.spot_connected:
return self.api_spot.spotSendOrder(orderReq) return self.api_spot.spotSendOrder(orderReq)
else: else:
@ -207,8 +216,8 @@ class OkexGateway(VtGateway):
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def qryOrderInfo(self): def qryOrderInfo(self):
#if self.spot_connected: if self.spot_connected:
# self.api_spot.spotAllOrders() self.api_spot.spotAllOrders()
if self.futures_connected: if self.futures_connected:
self.api_futures.futureAllUnfinishedOrderInfo() self.api_futures.futureAllUnfinishedOrderInfo()
@ -234,8 +243,8 @@ class OkexGateway(VtGateway):
"""初始化连续查询""" """初始化连续查询"""
if self.qryEnabled: if self.qryEnabled:
# 需要循环的查询函数列表 # 需要循环的查询函数列表
# self.qryFunctionList = [self.qryAccount, self.qryOrderInfo] self.qryFunctionList = [self.qryAccount, self.qryOrderInfo]
self.qryFunctionList = [self.qryOrderInfo] #self.qryFunctionList = [self.qryOrderInfo]
# self.qryFunctionList = [] # self.qryFunctionList = []
self.qryCount = 0 # 查询触发倒计时 self.qryCount = 0 # 查询触发倒计时
@ -319,10 +328,11 @@ class OkexSpotApi(WsSpotApi):
self.cbDict = {} # 回调函数字典 self.cbDict = {} # 回调函数字典
self.tickDict = {} # 缓存最新tick字典 self.tickDict = {} # 缓存最新tick字典
self.orderDict = {} # 委托单字典 self.orderDict = {} # 委托单字典
self.localOrderDict = {} # 本地缓存的order_dictkey 是 localNo.gatewayName
self.channelSymbolMap = {} # 请求数据类型与合约的映射字典 self.channelSymbolMap = {} # 请求数据类型与合约的映射字典
self.localNo = 0 # 本地委托号 self.localNo = 1 # 本地委托号
self.localNoQueue = Queue() # 未收到系统委托号的本地委托号队列 self.localNoQueue = Queue() # 未收到系统委托号的本地委托号队列
self.localNoDict = {} # key为本地委托号value为系统委托号 self.localNoDict = {} # key为本地委托号value为系统委托号
self.orderIdDict = {} # key为系统委托号value为本地委托号 self.orderIdDict = {} # key为系统委托号value为本地委托号
@ -330,7 +340,6 @@ class OkexSpotApi(WsSpotApi):
self.recordOrderId_BefVolume = {} # 记录的之前处理的量 self.recordOrderId_BefVolume = {} # 记录的之前处理的量
self.cache_some_order = {}
self.tradeID = 0 self.tradeID = 0
# 已登记的品种对队列 # 已登记的品种对队列
@ -396,7 +405,7 @@ class OkexSpotApi(WsSpotApi):
elif channel_value == 'addChannel': elif channel_value == 'addChannel':
channel_data = data.get('data', {}) channel_data = data.get('data', {})
result = channel_data.get('result', False) result = channel_data.get('result', False)
channel_value = channel_data["channel"] channel_value = channel_data.get('channel', "")
if result: if result:
self.gateway.writeLog(u'功能订阅请求:{} 成功:'.format(channel_value)) self.gateway.writeLog(u'功能订阅请求:{} 成功:'.format(channel_value))
continue continue
@ -428,7 +437,7 @@ class OkexSpotApi(WsSpotApi):
error.gatewayName = self.gatewayName error.gatewayName = self.gatewayName
if 'data' in data and 'error_code' in data['data']: if 'data' in data and 'error_code' in data['data']:
error_code =data['data']['error_code'] error_code =data['data']['error_code']
error.errorMsg = u'SpotApi 出错:{}'.format(SPOT_ERROR_DICT.get(error_code)) error.errorMsg = u'SpotApi 出错:{}'.format(SPOT_ERROR_DICT.get(str(error_code)))
error.errorID = error_code error.errorID = error_code
else: else:
error.errorMsg = str(data) error.errorMsg = str(data)
@ -452,10 +461,10 @@ class OkexSpotApi(WsSpotApi):
# 重新连接 # 重新连接
if self.active: if self.active:
def reconnect(): def reconnect():
while not self.gateway.connected: while not self.gateway.spot_connected:
self.gateway.writeLog(u'等待10秒后重新连接') self.gateway.writeLog(u'等待10秒后重新连接')
sleep(10) sleep(10)
if not self.gateway.connected: if not self.gateway.spot_connected:
self.reconnect() self.reconnect()
t = Thread(target=reconnect) t = Thread(target=reconnect)
@ -517,11 +526,11 @@ class OkexSpotApi(WsSpotApi):
self.spotOrderInfo(symbol, '-1') self.spotOrderInfo(symbol, '-1')
# 对已经发送委托根据orderid发出查询请求 # 对已经发送委托根据orderid发出查询请求
for orderId in self.orderIdDict.keys(): #for orderId in self.orderIdDict.keys():
order = self.orderDict.get(orderId, None) # order = self.orderDict.get(orderId, None)
if order != None: # if order != None:
symbol_pair = (order.symbol.split('.'))[0] # symbol_pair = (order.symbol.split('.'))[0]
self.spotOrderInfo(symbol_pair, orderId) # self.spotOrderInfo(symbol_pair, orderId)
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def onOpen(self, ws): def onOpen(self, ws):
@ -540,10 +549,18 @@ class OkexSpotApi(WsSpotApi):
self.spotUserInfo() self.spotUserInfo()
# 尝试订阅一个合约对 # 尝试订阅一个合约对
self.subscribeSingleSymbol("etc_usdt") if len(self.registerSymbolPairArray) == 0:
self.subscribeSingleSymbol("btc_usdt") #self.subscribeSingleSymbol("etc_usdt")
self.subscribeSingleSymbol("eth_usdt") #self.subscribeSingleSymbol("btc_usdt")
self.subscribeSingleSymbol("ltc_usdt") self.registerSymbolPairArray.add('eth_usdt')
self.registerSymbolPairArray.add('btc_usdt')
self.registerSymbolPairArray.add('ltc_usdt')
for symbol_pair in self.registerSymbolPairArray:
# 发起品种行情订阅请求
self.subscribeSingleSymbol(symbol_pair)
# 查询该品种对的订单
self.spotOrderInfo(symbol_pair, '-1')
self.gateway.writeLog(u'SpotApi初始化合约信息') self.gateway.writeLog(u'SpotApi初始化合约信息')
@ -562,9 +579,9 @@ class OkexSpotApi(WsSpotApi):
# 更新至数据引擎的合约信息 # 更新至数据引擎的合约信息
contract = VtContractData() contract = VtContractData()
contract.gatewayName = self.gatewayName contract.gatewayName = self.gatewayName
contract.symbol = symbol #'.'.join([contract.symbol, contract.exchange])
contract.exchange = EXCHANGE_OKEX contract.exchange = EXCHANGE_OKEX
contract.vtSymbol = symbol #'.'.join([contract.symbol, contract.exchange]) contract.symbol = '.'.join([symbol, contract.exchange])
contract.vtSymbol = contract.symbol
contract.name = u'现货%s' % symbol contract.name = u'现货%s' % symbol
contract.size = 1 # 现货是1:1 contract.size = 1 # 现货是1:1
contract.priceTick = 0.0001 contract.priceTick = 0.0001
@ -669,8 +686,8 @@ class OkexSpotApi(WsSpotApi):
if symbol not in self.tickDict: if symbol not in self.tickDict:
tick = VtTickData() tick = VtTickData()
tick.exchange = EXCHANGE_OKEX tick.exchange = EXCHANGE_OKEX
tick.symbol = symbol # '.'.join([symbol, tick.exchange]) tick.symbol = '.'.join([symbol, tick.exchange])
tick.vtSymbol = symbol #'.'.join([symbol, tick.exchange]) tick.vtSymbol = '.'.join([symbol, tick.exchange])
tick.gatewayName = self.gatewayName tick.gatewayName = self.gatewayName
self.tickDict[symbol] = tick self.tickDict[symbol] = tick
@ -811,7 +828,7 @@ class OkexSpotApi(WsSpotApi):
tick.askPrice4, tick.askVolume4 = data['asks'][-4] tick.askPrice4, tick.askVolume4 = data['asks'][-4]
tick.askPrice5, tick.askVolume5 = data['asks'][-5] tick.askPrice5, tick.askVolume5 = data['asks'][-5]
tick.date, tick.time = self.generateDateTime(data['timestamp']) tick.date, tick.time,tick.datetime = self.generateDateTime(data['timestamp'])
# print "Depth", tick.date, tick.time # print "Depth", tick.date, tick.time
# 推送tick事件 # 推送tick事件
@ -848,8 +865,8 @@ class OkexSpotApi(WsSpotApi):
for symbol, position in data_info_freezed.items(): for symbol, position in data_info_freezed.items():
pos = VtPositionData() pos = VtPositionData()
pos.gatewayName = self.gatewayName pos.gatewayName = self.gatewayName
pos.symbol = symbol #+ "." + EXCHANGE_OKEX pos.symbol = symbol + "." + EXCHANGE_OKEX
pos.vtSymbol = symbol #+ "." + EXCHANGE_OKEX pos.vtSymbol = symbol + "." + EXCHANGE_OKEX
pos.direction = DIRECTION_NET pos.direction = DIRECTION_NET
pos.frozen = float(position) pos.frozen = float(position)
pos.position = pos.frozen + float(data_info_free.get(symbol, 0)) pos.position = pos.frozen + float(data_info_free.get(symbol, 0))
@ -910,8 +927,8 @@ ltc":"0","bt1":"0","bt2":"0","iota":"0","pay":"0","storj":"0","gnt":"0","snt":"0
if symbol in data_info_funds['free']: if symbol in data_info_funds['free']:
pos = VtPositionData() pos = VtPositionData()
pos.gatewayName = self.gatewayName pos.gatewayName = self.gatewayName
pos.symbol = symbol # + "." + EXCHANGE_OKEX pos.symbol = symbol + "." + EXCHANGE_OKEX
pos.vtSymbol = symbol #+ "." + EXCHANGE_OKEX pos.vtSymbol = symbol + "." + EXCHANGE_OKEX
pos.vtPositionName = symbol pos.vtPositionName = symbol
pos.direction = DIRECTION_NET pos.direction = DIRECTION_NET
@ -1008,27 +1025,38 @@ nel': u'ok_sub_spot_etc_usdt_order'}
def onSpotSubOrder(self, ws_data): def onSpotSubOrder(self, ws_data):
""" """
交易委托更新回报发生部分成交/全部成交/拒单/撤销时API推送的回报) 交易委托更新回报发生部分成交/全部成交/拒单/撤销时API推送的回报)
:param ws_data:ws推送的委托更新数据 :param ws_data:ws推送的单个委托更新数据
:return: :return:
""" """
data = ws_data("data") self.gateway.writeLog(u'SpotApi.onSpotSubOrder:{}'.format(ws_data))
data = ws_data.get("data")
if data is None: return if data is None: return
# 本地和系统委托号 # 本地和系统委托号
orderId = str(data['orderId']) ok_order_id = str(data['orderId'])
localNo = self.orderIdDict.get(orderId, None) localNo = self.orderIdDict.get(ok_order_id, None)
if localNo == None: if localNo == None:
self.gateway.writeError(u'订单与本地委托无关联,不处理') self.gateway.writeError(u'onSpotSubOrder:ok_order_id:{}与本地委托无关联,'.format(ok_order_id))
self.gateway.writeLog(data) self.gateway.writeLog(data)
return
localNo = str(self.localNo)
if ok_order_id not in self.orderIdDict:
# orderId 不在本地缓存中,需要增加一个绑定关系
while str(self.localNo) in self.localNoDict:
self.localNo += 1
localNo = str(self.localNo)
self.localNoDict[localNo] = ok_order_id
self.orderIdDict[ok_order_id] = localNo
self.gateway.writeLog(u'onFutureOrderInfo add orderid: local:{}<=>okex:{}'.format(localNo, ok_order_id))
# 委托信息 # 委托信息
if orderId not in self.orderDict: if ok_order_id not in self.orderDict:
self.gateway.writeLog(u'onSpotSubOrder:添加至委托单缓存:{}'.format(data))
order = VtOrderData() order = VtOrderData()
order.gatewayName = self.gatewayName order.gatewayName = self.gatewayName
order.symbol = data['symbol'] # '.'.join([rawData['symbol'], EXCHANGE_OKEX]) order.symbol = '.'.join([data['symbol'], EXCHANGE_OKEX])
order.vtSymbol = order.symbol order.vtSymbol = order.symbol
order.orderID = localNo order.orderID = localNo
@ -1037,18 +1065,23 @@ nel': u'ok_sub_spot_etc_usdt_order'}
order.price = float(data['tradeUnitPrice']) order.price = float(data['tradeUnitPrice'])
order.totalVolume = float(data['tradeAmount']) order.totalVolume = float(data['tradeAmount'])
order.direction, priceType = priceTypeMap[data['tradeType']] order.direction, priceType = priceTypeMap[data['tradeType']]
order.offset = OFFSET_OPEN if order.direction == DIRECTION_LONG else OFFSET_CLOSE
self.orderDict[orderId] = order create_date, order.orderTime,_ = self.generateDateTime(data['createdDate'])
self.orderDict[ok_order_id] = order
else: else:
order = self.orderDict[orderId] order = self.orderDict[ok_order_id]
order.tradedVolume = float(data['completedTradeAmount']) order.tradedVolume = float(data['completedTradeAmount'])
order.status = statusMap[data['status']] order.status = statusMap[data['status']]
if order.status == STATUS_CANCELLED:
dt = datetime.now()
order.cancelTime = dt.strftime("%H:%M:%S.%f")
# 推送onOrder event # 推送onOrder event
self.gateway.writeLog(u'onSpotSubOrder:发出OnOrdervtOrderId={},orderStatus:{}'.format(order.vtOrderID,order.status))
self.gateway.onOrder(copy(order)) self.gateway.onOrder(copy(order))
bef_volume = self.recordOrderId_BefVolume.get(orderId, 0.0) bef_volume = self.recordOrderId_BefVolume.get(ok_order_id, 0.0)
now_volume = float(data['completedTradeAmount']) - bef_volume now_volume = float(data['completedTradeAmount']) - bef_volume
if now_volume > 0.000001: if now_volume > 0.000001:
@ -1072,6 +1105,7 @@ nel': u'ok_sub_spot_etc_usdt_order'}
trade.tradeTime = datetime.now().strftime('%H:%M:%S') trade.tradeTime = datetime.now().strftime('%H:%M:%S')
# 推送onTrade事件 # 推送onTrade事件
self.gateway.writeLog(u'onSpotSubOrder:发出onTradervtOrderId={}'.format(trade.vtOrderID))
self.gateway.onTrade(trade) self.gateway.onTrade(trade)
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
@ -1101,9 +1135,12 @@ nel': u'ok_sub_spot_etc_usdt_order'}
def onSpotOrderInfo(self, ws_data): def onSpotOrderInfo(self, ws_data):
""" """
所有委托信息查询响应 所有委托信息查询响应
:param ws_data: self.spotOrderInfo(symbol_pair, '-1')返回symbol_pair所有未完成订单
self.spotOrderInfo(symbol_pair, orderId)返回symbol_pairorderId的订单
:param ws_dataorders 清单
:return: :return:
""" """
self.gateway.writeLog(u'SpApi.onSpotOrderInfo:{}'.format(ws_data))
# 获取channel的数据 # 获取channel的数据
data = ws_data.get('data', {}) data = ws_data.get('data', {})
@ -1111,26 +1148,41 @@ nel': u'ok_sub_spot_etc_usdt_order'}
orders = data.get('orders', []) orders = data.get('orders', [])
for data_order in orders: for data_order in orders:
self.localNo += 1 # 本地序列号自增长 ok_order_id = data_order.get('order_id')
localNo = str(self.localNo) # int =》 str if ok_order_id is None:
ok_order_id = data_order.get('orders_id')
order_id = data_order.get('order_id') # 服务端的委托单编号 if ok_order_id is None:
if order_id is None: self.gateway.writeError(u'提取order_id出错')
self.gateway.writeError(u'SpotApi.onSpotOrderInfo取order_id/orderId异常请查日志')
self.gateway.writeLog(u'data_order:{}'.format(data_order))
continue continue
ok_order_id = str(ok_order_id)
orderId = str(order_id) # int =》 str localNo = str(self.localNo)
self.localNoDict[localNo] = orderId # 建立本地序列号与服务端序列号的相互映射 if ok_order_id not in self.orderIdDict:
self.orderIdDict[orderId] = localNo # orderId 不在本地缓存中,需要增加一个绑定关系
while str(self.localNo) in self.localNoDict:
self.localNo += 1
localNo = str(self.localNo)
# 绑定localNo 与 orderId
self.localNoDict[localNo] = ok_order_id
self.orderIdDict[ok_order_id] = localNo
self.gateway.writeLog(u'onFutureOrderInfo add orderid: local:{}<=>okex:{}'.format(localNo, ok_order_id))
else:
# orderid在本地缓存中
localNo = self.orderIdDict[ok_order_id]
# 检验 localNo是否在本地缓存没有则补充
if localNo not in self.localNoDict:
self.localNoDict[localNo] = ok_order_id
self.gateway.writeLog(u'onFutureOrderInfo update orderid: local:{}<=>okex:{}'.format(localNo, ok_order_id))
if orderId not in self.orderDict: # 不在本地委托单缓存时,添加 if ok_order_id not in self.orderDict: # 不在本地委托单缓存时,添加
self.gateway.writeLog(u'onSpotOrderInfo:添加至委托单缓存:{}'.format(data_order))
order = VtOrderData() order = VtOrderData()
order.gatewayName = self.gatewayName order.gatewayName = self.gatewayName
# order.symbol = spotSymbolMap[d['symbol']] # order.symbol = spotSymbolMap[d['symbol']]
order.symbol = data_order["symbol"] # '.'.join([d["symbol"], EXCHANGE_OKEX]) order.symbol = '.'.join([data_order["symbol"], EXCHANGE_OKEX])
order.vtSymbol = order.symbol order.vtSymbol = order.symbol
order.orderID = localNo order.orderID = localNo
@ -1139,14 +1191,14 @@ nel': u'ok_sub_spot_etc_usdt_order'}
order.price = data_order['price'] order.price = data_order['price']
order.totalVolume = data_order['amount'] order.totalVolume = data_order['amount']
order.direction, priceType = priceTypeMap[data_order['type']] order.direction, priceType = priceTypeMap[data_order['type']]
create_date, order.orderTime,_ = self.generateDateTime(data_order['create_date'])
self.orderDict[orderId] = order self.orderDict[ok_order_id] = order
else: else:
order = self.orderDict[orderId] # 使用本地缓存 order = self.orderDict[ok_order_id] # 使用本地缓存
order.tradedVolume = data_order['deal_amount'] # 更新成交数量 order.tradedVolume = data_order['deal_amount'] # 更新成交数量
order.status = statusMap[data_order['status']] # 更新成交状态 order.status = statusMap[data_order['status']] # 更新成交状态
self.gateway.writeLog('orderId:{},tradedVolume:{},status:{}'.format(order.vtOrderID,order.tradedVolume,order.status))
self.gateway.onOrder(copy(order)) # 提交onOrder事件 self.gateway.onOrder(copy(order)) # 提交onOrder事件
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
@ -1166,17 +1218,27 @@ nel': u'ok_sub_spot_etc_usdt_order'}
:param ws_data: :param ws_data:
:return: :return:
""" """
self.gateway.writeLog(u'SpotApi.onSpotOrder:{}'.format(ws_data))
data = ws_data.get('data', {}) data = ws_data.get('data', {})
if 'error_code' in data: if 'error_code' in data:
error_id = data.get('error_code') error_id = data.get('error_code')
self.gateway.writeError(u'SpotApi.onSpotOrder 委托返回错误:{}'.format(SPOT_ERROR_DICT.get(error_id)), error_id=error_id) self.gateway.writeError(u'SpotApi.onSpotOrder 委托返回错误:{}'.format(SPOT_ERROR_DICT.get(str(error_id))), error_id=error_id)
self.gateway.writeLog(ws_data) self.gateway.writeLog(ws_data)
localNo = self.localNoQueue.get_nowait()
self.gateway.writeLog(u'移除本地localNo:{}'.format(localNo))
vtOrderId = '.'.join([self.gatewayName,localNo])
order = self.localOrderDict.get(vtOrderId)
if order is not None:
order.status = STATUS_REJECTED
order.updateTime = datetime.now().strftime("%H:%M:%S.%f")
self.gateway.writeLog(u'发出OnOrder拒单,vtOrderId={}'.format(vtOrderId))
self.gateway.onOrder(order)
return return
orderId = data.get('order_id') ok_order_id = data.get('order_id')
if orderId is None: if ok_order_id is None:
self.gateway.writeError(u'SpotApi.onSpotOrder 委托返回中没有orderid') self.gateway.writeError(u'SpotApi.onSpotOrder 委托返回中没有orderid')
self.gateway.writeLog(ws_data) self.gateway.writeLog(ws_data)
return return
@ -1184,12 +1246,13 @@ nel': u'ok_sub_spot_etc_usdt_order'}
# 从本地编号Queue中FIFO提取最早的localNo # 从本地编号Queue中FIFO提取最早的localNo
localNo = self.localNoQueue.get_nowait() localNo = self.localNoQueue.get_nowait()
if localNo is None: if localNo is None:
self.gateway.writeError(u'SpotApi.onSportOrder未找到本地LocalNo检查日志') self.gateway.writeError(u'SpotApi.onSpotOrder未找到本地LocalNo检查日志')
self.gateway.writeLog(ws_data) self.gateway.writeLog(ws_data)
return return
self.localNoDict[localNo] = str(orderId) self.gateway.writeLog(u'SpotApi.onSpotOrder,绑定 local:{} <==> ok_order_id:{}'.format(localNo,ok_order_id))
self.orderIdDict[str(orderId)] = localNo self.localNoDict[localNo] = str(ok_order_id)
self.orderIdDict[str(ok_order_id)] = localNo
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
""" """
@ -1209,60 +1272,99 @@ nel': u'ok_sub_spot_etc_usdt_order'}
:param ws_data: :param ws_data:
:return: :return:
""" """
self.gateway.writeLog(u'SpotApi.onSpotCancelOrder()')
data = ws_data.get('data', {}) data = ws_data.get('data', {})
if 'error_code' in data: if 'error_code' in data:
error_id = data.get('error_code') error_id = data.get('error_code')
self.gateway.writeError(u'SpotApi.onSpotCancelOrder 委托返回错误:{}'.format(SPOT_ERROR_DICT.get(error_id)), error_id=error_id) self.gateway.writeError(u'SpotApi.onSpotCancelOrder 委托返回错误:{}'.format(SPOT_ERROR_DICT.get(str(error_id))), error_id=error_id)
self.gateway.writeLog(ws_data) self.gateway.writeLog(ws_data)
return return
orderId = data.get('order_id') ok_order_id = data.get('order_id')
if orderId is None: ok_result = data.get('result',False)
self.gateway.writeError(u'SpotApi.onSpotCancelOrder 委托返回中没有orderid') if ok_order_id is None:
self.gateway.writeError(u'SpotApi.onSpotCancelOrder 委托返回中没有order_id')
self.gateway.writeLog(ws_data) self.gateway.writeLog(ws_data)
return return
orderId = str(orderId) if not ok_result:
localNo = self.orderIdDict[orderId] self.gateway.writeError(u'SpotApi.onSpotCancelOrder 撤单失败')
self.gateway.writeLog(ws_data)
return
if orderId not in self.orderDict: ok_order_id = str(ok_order_id)
localNo = self.orderIdDict[ok_order_id]
if ok_order_id not in self.orderDict:
self.gateway.writeLog(u'onSpotCancelOrder,{}的订单不在self.orderDict,创建order对象'.format(ok_order_id))
order = VtOrderData() order = VtOrderData()
order.gatewayName = self.gatewayName order.gatewayName = self.gatewayName
order.symbol = data['symbol'] # '.'.join([rawData['symbol'], EXCHANGE_OKEX]) order.symbol = '.'.join([data['symbol'], EXCHANGE_OKEX])
order.vtSymbol = order.symbol order.vtSymbol = order.symbol
order.orderID = localNo order.orderID = localNo
order.vtOrderID = '.'.join([self.gatewayName, order.orderID]) order.vtOrderID = '.'.join([self.gatewayName, order.orderID])
self.orderDict[orderId] = order self.orderDict[ok_order_id] = order
else: else:
order = self.orderDict[orderId] order = self.orderDict[ok_order_id]
order.status = STATUS_CANCELLED order.status = STATUS_CANCELLED
dt = datetime.now()
order.cancelTime = dt.strftime("%H:%M:%S.%f")
self.gateway.onOrder(order) self.gateway.onOrder(order)
del self.orderDict[orderId] # #self.gateway.writeLog(u'onSpotCancelOrder:删除self.orderDict[{}]'.format(ok_order_id))
del self.orderIdDict[orderId] #del self.orderDict[ok_order_id]
del self.localNoDict[localNo] #self.gateway.writeLog(u'移除 localno: {} <==> ok_order_id:{}'.format(localNo,ok_order_id))
#del self.orderIdDict[ok_order_id]
#del self.localNoDict[localNo]
#
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def spotSendOrder(self, req): def spotSendOrder(self, req):
"""发委托单""" """
发出委托指令
:param req:
:return:
"""
self.gateway.writeLog(u'SpotApi.spotSendOrder()')
# 取得币币配对symbol_pair,如果上层在symbol后添加 .OKEX需要去除 # 取得币币配对symbol_pair,如果上层在symbol后添加 .OKEX需要去除
symbol = (req.symbol.split('.'))[0] symbol = (req.symbol.split('.'))[0]
if not symbol in self.registerSymbolPairArray:
self.registerSymbolPairArray.add(symbol)
# 获取匹配okex的订单类型 # 获取匹配okex的订单类型
type_ = priceTypeMapReverse[(req.direction, req.priceType)] type_ = priceTypeMapReverse[(req.direction, req.priceType)]
# 本地委托号加1并将对应字符串保存到队列中返回基于本地委托号的vtOrderID # 本地委托号加1并将对应字符串保存到队列中返回基于本地委托号的vtOrderID
self.localNo += 1 self.localNo += 1
self.gateway.writeLog(u'localNo:{}'.format(self.localNo))
self.localNoQueue.put(str(self.localNo)) self.localNoQueue.put(str(self.localNo))
vtOrderID = '.'.join([self.gatewayName, str(self.localNo)]) vtOrderID = '.'.join([self.gatewayName, str(self.localNo)])
self.gateway.writeLog(u'创建本地Order对象vtOrderId:{}'.format(vtOrderID))
# 创建本地order对象
order = VtOrderData()
order.gatewayName = self.gatewayName
order.symbol = symbol
order.vtSymbol = order.symbol
order.orderID = self.localNo
order.vtOrderID = vtOrderID
order.price = req.price
order.totalVolume = req.volume
order.direction, priceType = priceTypeMap[type_]
order.offset = OFFSET_OPEN if req.direction == DIRECTION_LONG else OFFSET_CLOSE
order.status = STATUS_NOTTRADED
dt = datetime.now()
order.orderTime = dt.strftime("%H:%M:%S.%f")
self.localOrderDict[vtOrderID] = order
# 调用ws api发送委托 # 调用ws api发送委托
self.gateway.writeLog(u'调用ws api发送委托')
self.spotTrade(symbol, type_, str(req.price), str(req.volume)) self.spotTrade(symbol, type_, str(req.price), str(req.volume))
self.gateway.writeLog('SpotSendOrder:symbol:{},Type:{},price:{},volume:{}'.format(symbol, type_, str(req.price), str(req.volume))) self.gateway.writeLog('SpotSendOrder:symbol:{},Type:{},price:{},volume:{}'.format(symbol, type_, str(req.price), str(req.volume)))
@ -1270,7 +1372,11 @@ nel': u'ok_sub_spot_etc_usdt_order'}
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def spotCancel(self, req): def spotCancel(self, req):
"""撤单""" """
发出撤单指令
:param req:
:return:
"""
# symbol = spotSymbolMapReverse[req.symbol][:4] # symbol = spotSymbolMapReverse[req.symbol][:4]
symbol = (req.symbol.split('.'))[0] symbol = (req.symbol.split('.'))[0]
localNo = req.orderID localNo = req.orderID
@ -1288,9 +1394,8 @@ nel': u'ok_sub_spot_etc_usdt_order'}
"""生成时间""" """生成时间"""
dt = datetime.fromtimestamp(float(s) / 1e3) dt = datetime.fromtimestamp(float(s) / 1e3)
time = dt.strftime("%H:%M:%S.%f") time = dt.strftime("%H:%M:%S.%f")
date = dt.strftime("%Y%m%d") date = dt.strftime("%Y-%m-%d")
return date, time return date, time, dt
######################################################################## ########################################################################
class OkexFuturesApi(WsFuturesApi): class OkexFuturesApi(WsFuturesApi):
@ -1309,9 +1414,10 @@ class OkexFuturesApi(WsFuturesApi):
self.cbDict = {} self.cbDict = {}
self.tickDict = {} self.tickDict = {}
self.orderDict = {} self.orderDict = {}
self.localOrderDict = {} # 本地缓存的order_dictkey 是 localNo.gatewayName
self.channelSymbolMap = {} self.channelSymbolMap = {}
self.localNo = 0 # 本地委托号 self.localNo = 1 # 本地委托号
self.localNoQueue = Queue() # 未收到系统委托号的本地委托号队列 self.localNoQueue = Queue() # 未收到系统委托号的本地委托号队列
self.localNoDict = {} # key为本地委托号value为系统委托号 self.localNoDict = {} # key为本地委托号value为系统委托号
self.orderIdDict = {} # key为系统委托号value为本地委托号 self.orderIdDict = {} # key为系统委托号value为本地委托号
@ -1319,12 +1425,11 @@ class OkexFuturesApi(WsFuturesApi):
self.recordOrderId_BefVolume = {} # 记录的之前处理的量 self.recordOrderId_BefVolume = {} # 记录的之前处理的量
self.cache_some_order = {}
self.tradeID = 0 self.tradeID = 0
self.registerSymbolPairArray = set([]) self.registerSymbolPairArray = set([])
self._use_leverage = "10" self._use_leverage = "10" # 缺省使用的杠杆比率
self.bids_depth_dict = {} self.bids_depth_dict = {}
self.asks_depth_dict = {} self.asks_depth_dict = {}
@ -1335,11 +1440,21 @@ class OkexFuturesApi(WsFuturesApi):
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def setLeverage(self, __leverage): def setLeverage(self, __leverage):
"""
设置杠杆比率
:param __leverage:
:return:
"""
self._use_leverage = __leverage self._use_leverage = __leverage
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def onMessage(self, ws, evt): def onMessage(self, ws, evt):
"""信息推送""" """
信息推送的处理
:param ws:
:param evt:
:return:
"""
# str => json # str => json
ws_data = self.readData(evt) ws_data = self.readData(evt)
@ -1371,10 +1486,11 @@ class OkexFuturesApi(WsFuturesApi):
login_data = data['data'] login_data = data['data']
result = login_data['result'] if 'result' in login_data else False result = login_data['result'] if 'result' in login_data else False
if result: if result:
self.writeLog(u'login success: {}'.format(datetime.now())) self.writeLog(u'登录成功: {}'.format(datetime.now()))
self.gateway.futures_connected = True self.gateway.futures_connected = True
else: else:
print(u'login fail: {},data:{}'.format(datetime.now(), data)) self.gateway.writeError(u'登录失败')
self.writeLog(u'登录失败: {},data:{}'.format(datetime.now(), data))
continue continue
# 功能请求回复 # 功能请求回复
@ -1387,7 +1503,8 @@ class OkexFuturesApi(WsFuturesApi):
self.writeLog(u'请求:{} 成功:'.format(channel_value)) self.writeLog(u'请求:{} 成功:'.format(channel_value))
continue continue
else: else:
print(u'not data in :addChannel:{}'.format(data)) self.gateway.writeError(u'addChannel回复数据没有data')
self.writeLog(u'not data in :addChannel:{}'.format(data))
# 其他回调/数据推送 # 其他回调/数据推送
callback = self.cbDict.get(channel_value) callback = self.cbDict.get(channel_value)
@ -1395,14 +1512,16 @@ class OkexFuturesApi(WsFuturesApi):
try: try:
callback(data) callback(data)
except Exception as ex: except Exception as ex:
print(u'onMessage call back {} exception{},{}'.format(channel_value, str(ex), self.gateway.writeError(u'回调{}发生异常'.format(channel_value))
self.writeLog(u'onMessage call back {} exception{},{}'.format(channel_value, str(ex),
traceback.format_exc())) traceback.format_exc()))
else: else:
print(u'unkonw msg:{}'.format(data)) self.gateway.writeError(u'出现无回调处理的数据')
self.writeLog(u'unkonw msg:{}'.format(data))
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def onError(self, ws, evt): def onError(self, ws, evt):
"""错误推送""" """重载WsFutureApi.onError错误Event推送"""
error = VtErrorData() error = VtErrorData()
error.gatewayName = self.gatewayName error.gatewayName = self.gatewayName
error.errorMsg = str(evt) error.errorMsg = str(evt)
@ -1410,30 +1529,36 @@ class OkexFuturesApi(WsFuturesApi):
# #---------------------------------------------------------------------- # #----------------------------------------------------------------------
def onErrorMsg(self, data): def onErrorMsg(self, data):
"""错误信息处理"""
error = VtErrorData() error = VtErrorData()
error.gatewayName = self.gatewayName error.gatewayName = self.gatewayName
error_code = str(data["data"]["error_code"]) if 'data' in data and 'error_code' in data['data']:
error.errorID = error_code error_code = str(data["data"]["error_code"])
error.errorMsg = u'FutureApi Error:{}'.format(FUTURES_ERROR_DICT.get(error_code)) error.errorID = error_code
self.gateway.onError(error) error.errorMsg = u'FutureApi Error:{}'.format(FUTURES_ERROR_DICT.get(error_code))
self.gateway.onError(error)
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def reconnect(self): def reconnect(self):
while not self.gateway.connected_contract: """
重连
:return:
"""
while not self.gateway.futures_connected:
self.writeLog(u'okex Api_contract 等待10秒后重新连接') self.writeLog(u'okex Api_contract 等待10秒后重新连接')
self.connect_Contract(self.apiKey, self.secretKey, self.trace) self.connect(self.apiKey, self.secretKey, self.trace)
sleep(10) sleep(10)
if not self.gateway.connected_contract: if not self.gateway.futures_connected:
self.reconnect() self.reconnect()
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def onClose(self, ws): def onClose(self, ws):
"""接口断开""" """接口断开"""
# 如果尚未连上,则忽略该次断开提示 # 如果尚未连上,则忽略该次断开提示
if not self.gateway.connected_contract: if not self.gateway.futures_connected:
return return
self.gateway.connected_contract = False self.gateway.futures_connected = False
self.writeLog(u'服务器连接断开') self.writeLog(u'服务器连接断开')
# 重新连接 # 重新连接
@ -1450,7 +1575,11 @@ class OkexFuturesApi(WsFuturesApi):
""" """
arr = symbol.split('.') arr = symbol.split('.')
symbol_pair = arr[0] symbol_pair = arr[0]
(symbol, contract_type, leverage) = symbol_pair.split(':') l = symbol_pair.split(':')
if len(l) !=3:
self.gateway.writeError(u'合约代码{}错误:'.format(symbol))
raise ValueError(u'合约代码{}错误:'.format(symbol))
symbol, contract_type, leverage = l[0], l[1], l[2]
symbol = symbol.replace("_usd", "") symbol = symbol.replace("_usd", "")
return (symbol_pair, symbol, contract_type, leverage) return (symbol_pair, symbol, contract_type, leverage)
@ -1513,8 +1642,8 @@ class OkexFuturesApi(WsFuturesApi):
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def onOpen(self, ws): def onOpen(self, ws):
"""连接成功""" """连接成功"""
self.gateway.connected_contract = True self.gateway.futures_connected = True
self.writeLog(u'服务器OKEX合约连接成功') self.writeLog(u'服务器OKEX期货连接成功')
self.initCallback() self.initCallback()
@ -1533,7 +1662,7 @@ class OkexFuturesApi(WsFuturesApi):
contract = VtContractData() contract = VtContractData()
contract.gatewayName = self.gatewayName contract.gatewayName = self.gatewayName
contract.symbol = use_symbol_name # + "." + EXCHANGE_OKEX contract.symbol = use_symbol_name + "." + EXCHANGE_OKEX
contract.exchange = EXCHANGE_OKEX contract.exchange = EXCHANGE_OKEX
contract.vtSymbol = contract.symbol contract.vtSymbol = contract.symbol
contract.name = u'期货%s_%s_%s' % (symbol, use_contract_type, self._use_leverage) contract.name = u'期货%s_%s_%s' % (symbol, use_contract_type, self._use_leverage)
@ -1544,7 +1673,7 @@ class OkexFuturesApi(WsFuturesApi):
# print contract.vtSymbol , contract.name # print contract.vtSymbol , contract.name
quanyi_vtSymbol = symbol + "_usd_future_qy" # + "."+ EXCHANGE_OKEX quanyi_vtSymbol = symbol + "_usd_future_qy" + "."+ EXCHANGE_OKEX
contract = VtContractData() contract = VtContractData()
contract.gatewayName = self.gatewayName contract.gatewayName = self.gatewayName
contract.symbol = quanyi_vtSymbol contract.symbol = quanyi_vtSymbol
@ -1715,7 +1844,7 @@ class OkexFuturesApi(WsFuturesApi):
if symbol not in self.tickDict: if symbol not in self.tickDict:
tick = VtTickData() tick = VtTickData()
tick.exchange = EXCHANGE_OKEX tick.exchange = EXCHANGE_OKEX
tick.symbol = symbol # '.'.join([symbol, tick.exchange]) tick.symbol = '.'.join([symbol, tick.exchange])
tick.vtSymbol = tick.symbol tick.vtSymbol = tick.symbol
tick.gatewayName = self.gatewayName tick.gatewayName = self.gatewayName
@ -1936,27 +2065,41 @@ h_this_week_usd'}
data = ws_data.get('data', {}) data = ws_data.get('data', {})
error_code = data.get('error_code') error_code = data.get('error_code')
if error_code is not None: if error_code is not None:
self.gateway.writeError(u'委托返回错误:{}'.format(FUTURES_ERROR_DICT.get(error_code)), error_id=error_code) self.gateway.writeError(u'onFutureOrder委托返回错误:{}'.format(FUTURES_ERROR_DICT.get(error_code)), error_id=error_code)
self.gateway.writeLog(ws_data) self.gateway.writeLog(ws_data)
return localNo = self.localNoQueue.get_nowait()
orderId = data.get('order_id') if localNo is None:
return
if orderId is None: self.gateway.writeLog(u'onFutureOrder移除本地localNo:{}'.format(localNo))
self.gateway.writeError(u'SpotApi.onFutureOrder 委托返回中没有orderid') vtOrderId = '.'.join([self.gatewayName,str(localNo)])
order = self.localOrderDict.get(vtOrderId)
if order:
dt = datetime.now()
order.cancelTime = dt.strftime("%H:%M:%S.%f")
order.status = STATUS_REJECTED
self.gateway.writeLog(u'onFutureOrder发出OnOrder拒单,vtOrderId={}'.format(vtOrderId))
self.gateway.onOrder(order)
return
ok_order_id = data.get('order_id')
if ok_order_id is None:
self.gateway.writeError(u'FuturesApi.onFutureOrder 委托返回中没有orderid')
self.gateway.writeLog(ws_data) self.gateway.writeLog(ws_data)
return return
orderId = str(orderId) ok_order_id = str(ok_order_id)
# 从本地编号Queue中FIFO提取最早的localNo # 从本地编号Queue中FIFO提取最早的localNo
localNo = self.localNoQueue.get_nowait() localNo = self.localNoQueue.get_nowait()
if localNo is None: if localNo is None:
self.gateway.writeError(u'SpotApi.onSportOrder未找到本地LocalNo检查日志') self.gateway.writeError(u'FuturesApi.onSportOrder未找到本地LocalNo检查日志')
self.gateway.writeLog(ws_data) self.gateway.writeLog(ws_data)
return return
self.localNoDict[localNo] = orderId self.gateway.writeLog(u'FuturesApi.onSportOrder,绑定 local:{} <==> ok_order_id:{}'.format(localNo, ok_order_id))
self.orderIdDict[orderId] = localNo self.localNoDict[localNo] = ok_order_id
self.orderIdDict[ok_order_id] = localNo
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
""" """
@ -1977,30 +2120,32 @@ h_this_week_usd'}
data = ws_data.get('data', {}) data = ws_data.get('data', {})
if 'error_code' in data: if 'error_code' in data:
error_id = data.get('error_code', 0) error_code = data.get('error_code', 0)
self.gateway.writeError(u'SpotApi.onFutureOrderCancel 委托返回错误:{}'.format(FUTURES_ERROR_DICT.get(error_code)), error_id=error_code)
self.gateway.writeError(u'SpotApi.onFutureOrderCancel 委托返回错误:{}'.format(FUTURES_ERROR_DICT.get(error_code)), error_id=error_id)
self.gateway.writeLog(ws_data) self.gateway.writeLog(ws_data)
return return
orderId = data.get('order_id') ok_order_id = data.get('order_id')
if orderId is None: if ok_order_id is None:
self.gateway.writeError(u'SpotApi.onFutureOrderCancel 委托返回中没有orderid') self.gateway.writeError(u'SpotApi.onFutureOrderCancel 委托返回中没有orderid')
self.gateway.writeLog(ws_data) self.gateway.writeLog(ws_data)
return return
orderId = str(orderId)
ok_order_id = str(ok_order_id)
# 获取本地委托流水号 # 获取本地委托流水号
localNo = self.orderIdDict[orderId] localNo = self.orderIdDict[ok_order_id]
# 发送onOrder事件 # 发送onOrder事件
order = self.orderDict[orderId] order = self.orderDict[ok_order_id]
dt = datetime.now()
order.cancelTime = dt.strftime("%H:%M:%S.%f")
order.status = STATUS_CANCELLED order.status = STATUS_CANCELLED
self.gateway.onOrder(order) self.gateway.onOrder(order)
# 删除本地委托号与orderid的绑定 # 删除本地委托号与orderid的绑定
del self.orderDict[orderId] #del self.orderDict[orderId]
del self.orderIdDict[orderId] #del self.orderIdDict[orderId]
del self.localNoDict[localNo] #del self.localNoDict[localNo]
''' '''
@ -2192,8 +2337,8 @@ u'contracts': [], u'balance': 0, u'rights': 0}, u'xrp': {u'contracts': [], u'bal
data = ws_data.get("data", {}) data = ws_data.get("data", {})
for d in data.get('orders', []): for order_data in data.get('orders', []):
orderId = str(d['order_id']) orderId = str(order_data['order_id'])
localNo = str(self.localNo) localNo = str(self.localNo)
if orderId not in self.orderIdDict: if orderId not in self.orderIdDict:
@ -2217,27 +2362,27 @@ u'contracts': [], u'balance': 0, u'rights': 0}, u'xrp': {u'contracts': [], u'bal
if orderId not in self.orderDict: if orderId not in self.orderDict:
order = VtOrderData() order = VtOrderData()
order.gatewayName = self.gatewayName order.gatewayName = self.gatewayName
contract_name = d["contract_name"] contract_name = order_data["contract_name"]
dic_info = self.contract_name_dict[contract_name] dic_info = self.contract_name_dict[contract_name]
use_contract_type = dic_info["contract_type"] use_contract_type = dic_info["contract_type"]
order.symbol = '.'.join([d["symbol"] + ":" + use_contract_type + ":" + str(self._use_leverage)]) #, EXCHANGE_OKEX]) order.symbol = '.'.join([order_data["symbol"] + ":" + use_contract_type + ":" + str(self._use_leverage)]) #, EXCHANGE_OKEX])
order.vtSymbol = order.symbol order.vtSymbol = order.symbol
order.orderID = self.orderIdDict[orderId] # 更新orderId为本地的序列号 order.orderID = self.orderIdDict[orderId] # 更新orderId为本地的序列号
order.vtOrderID = '.'.join([self.gatewayName, order.orderID]) order.vtOrderID = '.'.join([self.gatewayName, order.orderID])
order.price = d['price'] order.price = order_data['price']
order.totalVolume = d['amount'] order.totalVolume = order_data['amount']
order.direction, offset = priceContractOffsetTypeMap[str(d['type'])] order.direction, offset = priceContractOffsetTypeMap[str(order_data['type'])]
self.orderDict[orderId] = order self.orderDict[orderId] = order
self.gateway.writeLog(u'新增本地orderDict缓存,okex orderId:{},order.orderid:{}'.format(orderId,order.orderID)) self.gateway.writeLog(u'新增本地orderDict缓存,okex orderId:{},order.orderid:{}'.format(orderId, order.orderID))
else: else:
order = self.orderDict[orderId] order = self.orderDict[orderId]
order.tradedVolume = d['deal_amount'] order.tradedVolume = order_data['deal_amount']
order.status = statusMap[int(d['status'])] order.status = statusMap[int(order_data['status'])]
# 推送到OnOrder中 # 推送到OnOrder中
self.gateway.onOrder(copy(order)) self.gateway.onOrder(copy(order))
@ -2331,7 +2476,7 @@ user_id': 6548935, u'contract_id': 201802160040063L, u'price': 24.555, u'create_
order.tradedVolume = float(data['deal_amount']) order.tradedVolume = float(data['deal_amount'])
order.status = statusMap[data['status']] order.status = statusMap[data['status']]
self.orderDict[orderId] = order self.orderDict[orderId] = order
self.gateway.writeLog(u'新增order,orderid:{},symbol:{},data:{}'.format(order.orderID,order.symbol),data) self.gateway.writeLog(u'新增order,orderid:{},symbol:{},data:{}'.format(order.orderID,order.symbol,data))
else: else:
# 更新成交数量/状态 # 更新成交数量/状态
order = self.orderDict[orderId] order = self.orderDict[orderId]
@ -2617,22 +2762,48 @@ utureusd_positions'}
:param req: :param req:
:return: :return:
""" """
symbol_pair, symbol, contract_type, leverage = self.dealSymbolFunc(req.symbol) try:
(symbol_pair, symbol, contract_type, leverage) = self.dealSymbolFunc(req.symbol)
except Exception as ex:
self.gateway.writeError(u'请求合约代码格式错误:{}'.format(req.symbol))
self.writeLog(u'futureSendOrder 请求合约代码格式错误:{},exception:{},{}'.format(req.symbol,str(ex),traceback.format_exc()))
return ''
# print symbol_pair , symbol, contract_type , leverage # print symbol_pair , symbol, contract_type , leverage
type_ = priceContractTypeMapReverse[(req.direction, req.offset)] type_ = priceContractTypeMapReverse[(req.direction, req.offset)]
self.writeLog(u'futureSendOrder:{},{},{},{},{}'.format(symbol_pair, symbol, contract_type, leverage, type_))
# 本地委托号加1并将对应字符串保存到队列中返回基于本地委托号的vtOrderID # 本地委托号加1并将对应字符串保存到队列中返回基于本地委托号的vtOrderID
self.localNo += 1 self.localNo += 1
self.localNoQueue.put(str(self.localNo)) self.localNoQueue.put(str(self.localNo))
vtOrderID = '.'.join([self.gatewayName, str(self.localNo)]) vtOrderID = '.'.join([self.gatewayName, str(self.localNo)])
self.writeLog(u'futureSendOrder:创建本地订单:orderId:{}'.format(vtOrderID))
# print symbol + "_usd", contract_type , type_ , str(req.price), str(req.volume) order = VtOrderData()
self.futureTrade(symbol + "_usd", contract_type, type_, str(req.price), str(req.volume), order.gatewayName = self.gatewayName
_lever_rate=self._use_leverage) order.symbol = symbol
order.vtSymbol = order.symbol
order.orderID = self.localNo
order.vtOrderID = vtOrderID
order.price = req.price
order.totalVolume = req.volume
order.direction = req.direction
order.offset = req.offset
order.status = STATUS_NOTTRADED
dt = datetime.now()
order.orderTime = dt.strftime("%H:%M:%S.%f")
self.localOrderDict[vtOrderID] = order
return vtOrderID self.writeLog(u'futureSendOrder 发送:symbol:{},合约类型:{},交易类型:{},价格:{},委托量:{}'.
format(symbol + "_usd", contract_type , type_ , str(req.price), str(req.volume)))
try:
self.futureTrade(symbol + "_usd", contract_type, type_, str(req.price), str(req.volume),
_lever_rate=self._use_leverage)
return vtOrderID
except Exception as ex:
self.gateway.writeError(u'futureSendOrder发送委托失败:{}'.format(str(ex)))
self.writeLog(u'futureSendOrder发送委托失败.{}'.format(traceback.format_exc()))
return ''
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def futureCancel(self, req): def futureCancel(self, req):
""" """
@ -2640,7 +2811,7 @@ utureusd_positions'}
:param req: :param req:
:return: :return:
""" """
symbol_pair, symbol, contract_type, leverage = self.dealSymbolFunc(req.symbol) (symbol_pair, symbol, contract_type, leverage) = self.dealSymbolFunc(req.symbol)
localNo = req.orderID localNo = req.orderID