[Mod]修改HuobiGateway的委托查询机制

This commit is contained in:
vn.py 2018-05-21 10:31:47 +08:00
parent e661a9f5d3
commit eddfd56e6d

View File

@ -6,7 +6,7 @@ vn.sec的gateway接入
import os import os
import json import json
from datetime import datetime from datetime import datetime, timedelta
from copy import copy from copy import copy
from math import pow from math import pow
@ -34,7 +34,7 @@ def print_dict(d):
l.sort() l.sort()
for k in l: for k in l:
print '%s:%s' %(k, d[k]) print '%s:%s' %(k, d[k])
######################################################################## ########################################################################
class HuobiGateway(VtGateway): class HuobiGateway(VtGateway):
@ -44,21 +44,21 @@ class HuobiGateway(VtGateway):
def __init__(self, eventEngine, gatewayName='HUOBI'): def __init__(self, eventEngine, gatewayName='HUOBI'):
"""Constructor""" """Constructor"""
super(HuobiGateway, self).__init__(eventEngine, gatewayName) super(HuobiGateway, self).__init__(eventEngine, gatewayName)
self.dataApi = HuobiDataApi(self) # 行情API self.dataApi = HuobiDataApi(self) # 行情API
self.tradeApi = HuobiTradeApi(self) # 交易API self.tradeApi = HuobiTradeApi(self) # 交易API
self.mdConnected = False # 行情API连接状态登录完成后为True self.mdConnected = False # 行情API连接状态登录完成后为True
self.tdConnected = False # 交易API连接状态 self.tdConnected = False # 交易API连接状态
self.qryEnabled = False # 是否要启动循环查询 self.qryEnabled = False # 是否要启动循环查询
self.fileName = self.gatewayName + '_connect.json' self.fileName = self.gatewayName + '_connect.json'
self.filePath = getJsonPath(self.fileName, __file__) self.filePath = getJsonPath(self.fileName, __file__)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def connect(self): def connect(self):
"""连接""" """连接"""
try: try:
f = file(self.filePath) f = file(self.filePath)
except IOError: except IOError:
@ -67,7 +67,7 @@ class HuobiGateway(VtGateway):
log.logContent = u'读取连接配置出错,请检查' log.logContent = u'读取连接配置出错,请检查'
self.onLog(log) self.onLog(log)
return return
# 解析json文件 # 解析json文件
setting = json.load(f) setting = json.load(f)
try: try:
@ -82,37 +82,37 @@ class HuobiGateway(VtGateway):
log.gatewayName = self.gatewayName log.gatewayName = self.gatewayName
log.logContent = u'连接配置缺少字段,请检查' log.logContent = u'连接配置缺少字段,请检查'
self.onLog(log) self.onLog(log)
return return
# 创建行情和交易接口对象 # 创建行情和交易接口对象
self.dataApi.connect(exchange, proxyHost, proxyPort) self.dataApi.connect(exchange, proxyHost, proxyPort)
self.tradeApi.connect(exchange, accessKey, secretKey, symbols) self.tradeApi.connect(exchange, accessKey, secretKey, symbols)
# 初始化并启动查询 # 初始化并启动查询
self.initQuery() self.initQuery()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def subscribe(self, subscribeReq): def subscribe(self, subscribeReq):
"""订阅行情""" """订阅行情"""
self.dataApi.subscribe(subscribeReq) self.dataApi.subscribe(subscribeReq)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def sendOrder(self, orderReq): def sendOrder(self, orderReq):
"""发单""" """发单"""
return self.tradeApi.sendOrder(orderReq) return self.tradeApi.sendOrder(orderReq)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq): def cancelOrder(self, cancelOrderReq):
"""撤单""" """撤单"""
self.tradeApi.cancelOrder(cancelOrderReq) self.tradeApi.cancelOrder(cancelOrderReq)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def qryInfo(self): def qryInfo(self):
"""查询委托、成交、持仓""" """查询委托、成交、持仓"""
self.tradeApi.qryOrder() self.tradeApi.qryOrder()
self.tradeApi.qryTrade() self.tradeApi.qryTrade()
self.tradeApi.qryPosition() self.tradeApi.qryPosition()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def close(self): def close(self):
"""关闭""" """关闭"""
@ -120,43 +120,43 @@ class HuobiGateway(VtGateway):
self.dataApi.close() self.dataApi.close()
if self.tdConnected: if self.tdConnected:
self.tradeApi.close() self.tradeApi.close()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def initQuery(self): def initQuery(self):
"""初始化连续查询""" """初始化连续查询"""
if self.qryEnabled: if self.qryEnabled:
# 需要循环的查询函数列表 # 需要循环的查询函数列表
self.qryFunctionList = [self.qryInfo] self.qryFunctionList = [self.qryInfo]
self.qryCount = 0 # 查询触发倒计时 self.qryCount = 0 # 查询触发倒计时
self.qryTrigger = 1 # 查询触发点 self.qryTrigger = 1 # 查询触发点
self.qryNextFunction = 0 # 上次运行的查询函数索引 self.qryNextFunction = 0 # 上次运行的查询函数索引
self.startQuery() self.startQuery()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def query(self, event): def query(self, event):
"""注册到事件处理引擎上的查询函数""" """注册到事件处理引擎上的查询函数"""
self.qryCount += 1 self.qryCount += 1
if self.qryCount > self.qryTrigger: if self.qryCount > self.qryTrigger:
# 清空倒计时 # 清空倒计时
self.qryCount = 0 self.qryCount = 0
# 执行查询函数 # 执行查询函数
function = self.qryFunctionList[self.qryNextFunction] function = self.qryFunctionList[self.qryNextFunction]
function() function()
# 计算下次查询函数的索引如果超过了列表长度则重新设为0 # 计算下次查询函数的索引如果超过了列表长度则重新设为0
self.qryNextFunction += 1 self.qryNextFunction += 1
if self.qryNextFunction == len(self.qryFunctionList): if self.qryNextFunction == len(self.qryFunctionList):
self.qryNextFunction = 0 self.qryNextFunction = 0
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def startQuery(self): def startQuery(self):
"""启动连续查询""" """启动连续查询"""
self.eventEngine.register(EVENT_TIMER, self.query) self.eventEngine.register(EVENT_TIMER, self.query)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def setQryEnabled(self, qryEnabled): def setQryEnabled(self, qryEnabled):
"""设置是否要启动循环查询""" """设置是否要启动循环查询"""
@ -171,16 +171,16 @@ class HuobiDataApi(DataApi):
def __init__(self, gateway): def __init__(self, gateway):
"""Constructor""" """Constructor"""
super(HuobiDataApi, self).__init__() super(HuobiDataApi, self).__init__()
self.gateway = gateway # gateway对象 self.gateway = gateway # gateway对象
self.gatewayName = gateway.gatewayName # gateway对象名称 self.gatewayName = gateway.gatewayName # gateway对象名称
self.connectionStatus = False # 连接状态 self.connectionStatus = False # 连接状态
self.tickDict = {} self.tickDict = {}
self.subscribeDict = {} self.subscribeDict = {}
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def connect(self, exchange, proxyHost, proxyPort): def connect(self, exchange, proxyHost, proxyPort):
"""连接服务器""" """连接服务器"""
@ -188,43 +188,43 @@ class HuobiDataApi(DataApi):
url = 'wss://api.huobi.pro/ws' url = 'wss://api.huobi.pro/ws'
else: else:
url = 'wss://api.hadax.com/ws' url = 'wss://api.hadax.com/ws'
if proxyHost: if proxyHost:
self.connectionStatus = super(HuobiDataApi, self).connect(url, proxyHost, proxyPort) self.connectionStatus = super(HuobiDataApi, self).connect(url, proxyHost, proxyPort)
else: else:
self.connectionStatus = super(HuobiDataApi, self).connect(url) self.connectionStatus = super(HuobiDataApi, self).connect(url)
self.gateway.mdConnected = True self.gateway.mdConnected = True
if self.connectionStatus: if self.connectionStatus:
self.writeLog(u'行情服务器连接成功') self.writeLog(u'行情服务器连接成功')
# 订阅所有之前订阅过的行情 # 订阅所有之前订阅过的行情
for req in self.subscribeDict.values(): for req in self.subscribeDict.values():
self.subscribe(req) self.subscribe(req)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def subscribe(self, subscribeReq): def subscribe(self, subscribeReq):
"""订阅合约""" """订阅合约"""
self.subscribeDict[subscribeReq.symbol] = subscribeReq self.subscribeDict[subscribeReq.symbol] = subscribeReq
if not self.connectionStatus: if not self.connectionStatus:
return return
symbol = subscribeReq.symbol symbol = subscribeReq.symbol
if symbol in self.tickDict: if symbol in self.tickDict:
return return
tick = VtTickData() tick = VtTickData()
tick.gatewayName = self.gatewayName tick.gatewayName = self.gatewayName
tick.symbol = symbol tick.symbol = symbol
tick.exchange = EXCHANGE_HUOBI tick.exchange = EXCHANGE_HUOBI
tick.vtSymbol = '.'.join([tick.symbol, tick.exchange]) tick.vtSymbol = '.'.join([tick.symbol, tick.exchange])
self.tickDict[symbol] = tick self.tickDict[symbol] = tick
self.subscribeMarketDepth(symbol) self.subscribeMarketDepth(symbol)
self.subscribeMarketDetail(symbol) self.subscribeMarketDetail(symbol)
#self.subscribeTradeDetail(symbol) #self.subscribeTradeDetail(symbol)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def writeLog(self, content): def writeLog(self, content):
"""发出日志""" """发出日志"""
@ -232,7 +232,7 @@ class HuobiDataApi(DataApi):
log.gatewayName = self.gatewayName log.gatewayName = self.gatewayName
log.logContent = content log.logContent = content
self.gateway.onLog(log) self.gateway.onLog(log)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onError(self, msg): def onError(self, msg):
"""错误推送""" """错误推送"""
@ -241,74 +241,74 @@ class HuobiDataApi(DataApi):
err.errorID = 'Data' err.errorID = 'Data'
err.errorMsg = msg err.errorMsg = msg
self.gateway.onError(err) self.gateway.onError(err)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onMarketDepth(self, data): def onMarketDepth(self, data):
"""行情深度推送 """ """行情深度推送 """
symbol = data['ch'].split('.')[1] symbol = data['ch'].split('.')[1]
tick = self.tickDict.get(symbol, None) tick = self.tickDict.get(symbol, None)
if not tick: if not tick:
return return
tick.datetime = datetime.fromtimestamp(data['ts']/1000) tick.datetime = datetime.fromtimestamp(data['ts']/1000)
tick.date = tick.datetime.strftime('%Y%m%d') tick.date = tick.datetime.strftime('%Y%m%d')
tick.time = tick.datetime.strftime('%H:%M:%S.%f') tick.time = tick.datetime.strftime('%H:%M:%S.%f')
bids = data['tick']['bids'] bids = data['tick']['bids']
for n in range(5): for n in range(5):
l = bids[n] l = bids[n]
tick.__setattr__('bidPrice' + str(n+1), l[0]) tick.__setattr__('bidPrice' + str(n+1), l[0])
tick.__setattr__('bidVolume' + str(n+1), l[1]) tick.__setattr__('bidVolume' + str(n+1), l[1])
asks = data['tick']['asks'] asks = data['tick']['asks']
for n in range(5): for n in range(5):
l = asks[n] l = asks[n]
tick.__setattr__('askPrice' + str(n+1), l[0]) tick.__setattr__('askPrice' + str(n+1), l[0])
tick.__setattr__('askVolume' + str(n+1), l[1]) tick.__setattr__('askVolume' + str(n+1), l[1])
#print '-' * 50 #print '-' * 50
#for d in data['tick']['asks']: #for d in data['tick']['asks']:
#print 'ask', d #print 'ask', d
#for d in data['tick']['bids']: #for d in data['tick']['bids']:
#print 'bid', d #print 'bid', d
#print '-' * 50 #print '-' * 50
#print 'ask5', tick.askPrice5, tick.askVolume5 #print 'ask5', tick.askPrice5, tick.askVolume5
#print 'ask4', tick.askPrice4, tick.askVolume4 #print 'ask4', tick.askPrice4, tick.askVolume4
#print 'ask3', tick.askPrice3, tick.askVolume3 #print 'ask3', tick.askPrice3, tick.askVolume3
#print 'ask2', tick.askPrice2, tick.askVolume2 #print 'ask2', tick.askPrice2, tick.askVolume2
#print 'ask1', tick.askPrice1, tick.askVolume1 #print 'ask1', tick.askPrice1, tick.askVolume1
#print 'bid1', tick.bidPrice1, tick.bidVolume1 #print 'bid1', tick.bidPrice1, tick.bidVolume1
#print 'bid2', tick.bidPrice2, tick.bidVolume2 #print 'bid2', tick.bidPrice2, tick.bidVolume2
#print 'bid3', tick.bidPrice3, tick.bidVolume3 #print 'bid3', tick.bidPrice3, tick.bidVolume3
#print 'bid4', tick.bidPrice4, tick.bidVolume4 #print 'bid4', tick.bidPrice4, tick.bidVolume4
#print 'bid5', tick.bidPrice5, tick.bidVolume5 #print 'bid5', tick.bidPrice5, tick.bidVolume5
if tick.lastPrice: if tick.lastPrice:
newtick = copy(tick) newtick = copy(tick)
self.gateway.onTick(tick) self.gateway.onTick(tick)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onTradeDetail(self, data): def onTradeDetail(self, data):
"""成交细节推送""" """成交细节推送"""
print data print data
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onMarketDetail(self, data): def onMarketDetail(self, data):
"""市场细节推送""" """市场细节推送"""
symbol = data['ch'].split('.')[1] symbol = data['ch'].split('.')[1]
tick = self.tickDict.get(symbol, None) tick = self.tickDict.get(symbol, None)
if not tick: if not tick:
return return
tick.datetime = datetime.fromtimestamp(data['ts']/1000) tick.datetime = datetime.fromtimestamp(data['ts']/1000)
tick.date = tick.datetime.strftime('%Y%m%d') tick.date = tick.datetime.strftime('%Y%m%d')
tick.time = tick.datetime.strftime('%H:%M:%S.%f') tick.time = tick.datetime.strftime('%H:%M:%S.%f')
t = data['tick'] t = data['tick']
tick.openPrice = t['open'] tick.openPrice = t['open']
tick.highPrice = t['high'] tick.highPrice = t['high']
@ -316,57 +316,55 @@ class HuobiDataApi(DataApi):
tick.lastPrice = t['close'] tick.lastPrice = t['close']
tick.volume = t['vol'] tick.volume = t['vol']
tick.preClosePrice = tick.openPrice tick.preClosePrice = tick.openPrice
if tick.bidPrice1: if tick.bidPrice1:
newtick = copy(tick) newtick = copy(tick)
self.gateway.onTick(tick) self.gateway.onTick(tick)
######################################################################## ########################################################################
class HuobiTradeApi(TradeApi): class HuobiTradeApi(TradeApi):
"""交易API实现""" """交易API实现"""
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __init__(self, gateway): def __init__(self, gateway):
"""API对象的初始化函数""" """API对象的初始化函数"""
super(HuobiTradeApi, self).__init__() super(HuobiTradeApi, self).__init__()
self.gateway = gateway # gateway对象 self.gateway = gateway # gateway对象
self.gatewayName = gateway.gatewayName # gateway对象名称 self.gatewayName = gateway.gatewayName # gateway对象名称
self.connectionStatus = False # 连接状态 self.connectionStatus = False # 连接状态
self.accountid = '' self.accountid = ''
self.todayDate = datetime.now().strftime('%Y-%m-%d')
self.orderDict = {} # 缓存委托数据的字典 self.orderDict = {} # 缓存委托数据的字典
self.symbols = [] # 所有交易代码的字符串集合 self.symbols = [] # 所有交易代码的字符串集合
self.qryTradeID = None # 查询起始成交编号 self.qryTradeID = None # 查询起始成交编号
self.tradeIDs = set() # 成交编号集合 self.tradeIDs = set() # 成交编号集合
self.qryOrderID = None # 查询起始委托编号 self.qryOrderID = None # 查询起始委托编号
self.localid = 100000 # 订单编号10000为起始 self.localid = 100000 # 订单编号10000为起始
self.reqLocalDict = {} # 请求编号和本地委托编号映射 self.reqLocalDict = {} # 请求编号和本地委托编号映射
self.localOrderDict = {} # 本地委托编号和交易所委托编号映射 self.localOrderDict = {} # 本地委托编号和交易所委托编号映射
self.orderLocalDict = {} # 交易所委托编号和本地委托编号映射 self.orderLocalDict = {} # 交易所委托编号和本地委托编号映射
self.cancelReqDict = {} # 撤单请求字典 self.cancelReqDict = {} # 撤单请求字典
self.activeOrderSet = set() # 活动委托集合 #self.activeOrderSet = set() # 活动委托集合
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def connect(self, exchange, accessKey, secretKey, symbols=''): def connect(self, exchange, accessKey, secretKey, symbols=''):
"""初始化连接""" """初始化连接"""
if not self.connectionStatus: if not self.connectionStatus:
self.symbols = symbols self.symbols = symbols
self.connectionStatus = self.init(exchange, accessKey, secretKey) self.connectionStatus = self.init(exchange, accessKey, secretKey)
self.gateway.tdConnected = True self.gateway.tdConnected = True
self.start() self.start()
self.writeLog(u'交易服务器连接成功') self.writeLog(u'交易服务器连接成功')
self.getTimestamp() self.getTimestamp()
self.getSymbols() self.getSymbols()
@ -375,64 +373,74 @@ class HuobiTradeApi(TradeApi):
"""查询持仓""" """查询持仓"""
if self.accountid: if self.accountid:
self.getAccountBalance(self.accountid) self.getAccountBalance(self.accountid)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def qryOrder(self): def qryOrder(self):
"""查询委托""" """查询委托"""
if not self.accountid: if not self.accountid:
return return
#states = 'pre-submitted,submitting,submitted,partial-filled,partial-canceled,filled,canceled' now = datetime.now()
states = 'pre-submitted,submitting,submitted,partial-filled' oneday = timedelta(1)
todayDate = now.strftime('%Y-%m-%d')
yesterdayDate = (now - oneday).strftime('%Y-%m-%d')
statesAll = 'pre-submitted,submitting,submitted,partial-filled,partial-canceled,filled,canceled'
statesActive = 'submitted,partial-filled'
for symbol in self.symbols: for symbol in self.symbols:
self.getOrders(symbol, states, startDate=self.todayDate) self.getOrders(symbol, statesAll, startDate=todayDate) # 查询今日所有状态的委托
self.getOrders(symbol, statesActive, endDate=yesterdayDate) # 查询昨日往前所有未结束的委托
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def qryTrade(self): def qryTrade(self):
"""查询成交""" """查询成交"""
if not self.accountid: if not self.accountid:
return return
now = datetime.now()
todayDate = now.strftime('%Y-%m-%d')
for symbol in self.symbols: for symbol in self.symbols:
self.getMatchResults(symbol, startDate=self.todayDate, size=50) self.getMatchResults(symbol, startDate=todayDate, size=50) # 只查询今日最新50笔成交
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def sendOrder(self, orderReq): def sendOrder(self, orderReq):
"""发单""" """发单"""
self.localid += 1 self.localid += 1
localid = str(self.localid) localid = str(self.localid)
vtOrderID = '.'.join([self.gatewayName, localid]) vtOrderID = '.'.join([self.gatewayName, localid])
if orderReq.direction == DIRECTION_LONG: if orderReq.direction == DIRECTION_LONG:
type_ = 'buy-limit' type_ = 'buy-limit'
else: else:
type_ = 'sell-limit' type_ = 'sell-limit'
reqid = self.placeOrder(self.accountid, reqid = self.placeOrder(self.accountid,
str(orderReq.volume), str(orderReq.volume),
orderReq.symbol, orderReq.symbol,
type_, type_,
price=str(orderReq.price), price=str(orderReq.price),
source='api') source='api')
self.reqLocalDict[reqid] = localid self.reqLocalDict[reqid] = localid
# 返回订单号 # 返回订单号
return vtOrderID return vtOrderID
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq): def cancelOrder(self, cancelOrderReq):
"""撤单""" """撤单"""
localid = cancelOrderReq.orderID localid = cancelOrderReq.orderID
orderID = self.localOrderDict.get(localid, None) orderID = self.localOrderDict.get(localid, None)
if orderID: if orderID:
super(HuobiTradeApi, self).cancelOrder(orderID) super(HuobiTradeApi, self).cancelOrder(orderID)
if localid in self.cancelReqDict: if localid in self.cancelReqDict:
del self.cancelReqDict[localid] del self.cancelReqDict[localid]
else: else:
self.cancelReqDict[localid] = cancelOrderReq self.cancelReqDict[localid] = cancelOrderReq
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def writeLog(self, content): def writeLog(self, content):
"""发出日志""" """发出日志"""
@ -448,53 +456,53 @@ class HuobiTradeApi(TradeApi):
err.gatewayName = self.gatewayName err.gatewayName = self.gatewayName
err.errorID = 'Trade' err.errorID = 'Trade'
err.errorMsg = msg err.errorMsg = msg
self.gateway.onError(err) self.gateway.onError(err)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onGetSymbols(self, data, reqid): def onGetSymbols(self, data, reqid):
"""查询代码回调""" """查询代码回调"""
for d in data: for d in data:
contract = VtContractData() contract = VtContractData()
contract.gatewayName = self.gatewayName contract.gatewayName = self.gatewayName
contract.symbol = d['base-currency'] + d['quote-currency'] contract.symbol = d['base-currency'] + d['quote-currency']
contract.exchange = EXCHANGE_HUOBI contract.exchange = EXCHANGE_HUOBI
contract.vtSymbol = '.'.join([contract.symbol, contract.exchange]) contract.vtSymbol = '.'.join([contract.symbol, contract.exchange])
contract.name = '/'.join([d['base-currency'].upper(), d['quote-currency'].upper()]) contract.name = '/'.join([d['base-currency'].upper(), d['quote-currency'].upper()])
contract.priceTick = 1 / pow(10, d['price-precision']) contract.priceTick = 1 / pow(10, d['price-precision'])
contract.size = 1 / pow(10, d['amount-precision']) contract.size = 1 / pow(10, d['amount-precision'])
contract.productClass = PRODUCT_SPOT contract.productClass = PRODUCT_SPOT
self.gateway.onContract(contract) self.gateway.onContract(contract)
self.writeLog(u'交易代码查询成功') self.writeLog(u'交易代码查询成功')
self.getAccounts() self.getAccounts()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onGetCurrencys(self, data, reqid): def onGetCurrencys(self, data, reqid):
"""查询货币回调""" """查询货币回调"""
pass pass
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onGetTimestamp(self, data, reqid): def onGetTimestamp(self, data, reqid):
"""查询时间回调""" """查询时间回调"""
event = Event(EVENT_LOG+'Time') event = Event(EVENT_LOG+'Time')
event.dict_['data'] = datetime.fromtimestamp(data/1000) event.dict_['data'] = datetime.fromtimestamp(data/1000)
self.gateway.eventEngine.put(event) self.gateway.eventEngine.put(event)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onGetAccounts(self, data, reqid): def onGetAccounts(self, data, reqid):
"""查询账户回调""" """查询账户回调"""
for d in data: for d in data:
self.accountid = str(d['id']) self.accountid = str(d['id'])
self.writeLog(u'交易账户%s查询成功' %self.accountid) self.writeLog(u'交易账户%s查询成功' %self.accountid)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onGetAccountBalance(self, data, reqid): def onGetAccountBalance(self, data, reqid):
"""查询余额回调""" """查询余额回调"""
posDict = {} posDict = {}
for d in data['list']: for d in data['list']:
symbol = d['currency'] symbol = d['currency']
pos = posDict.get(symbol, None) pos = posDict.get(symbol, None)
@ -507,189 +515,206 @@ class HuobiTradeApi(TradeApi):
pos.vtSymbol = '.'.join([pos.symbol, pos.exchange]) pos.vtSymbol = '.'.join([pos.symbol, pos.exchange])
pos.direction = DIRECTION_LONG pos.direction = DIRECTION_LONG
pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction]) pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction])
pos.position += float(d['balance']) pos.position += float(d['balance'])
if d['type'] == 'fozen': if d['type'] == 'fozen':
pos.frozen = float(d['balance']) pos.frozen = float(d['balance'])
posDict[symbol] = pos posDict[symbol] = pos
for pos in posDict.values(): for pos in posDict.values():
if pos.position: if pos.position:
self.gateway.onPosition(pos) self.gateway.onPosition(pos)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onGetOrders(self, data, reqid): def onGetOrders(self, data, reqid):
"""查询委托回调""" """查询委托回调"""
# 比对寻找已结束的委托号 # 比对寻找已结束的委托号
"""
newset = set([d['id'] for d in data]) newset = set([d['id'] for d in data])
print '-'*50
print [d['id'] for d in data]
print self.activeOrderSet
for id_ in self.activeOrderSet: for id_ in self.activeOrderSet:
if id_ not in newset: if id_ not in newset:
print 'finished:', id_
self.getOrder(id_) self.getOrder(id_)
self.activeOrderSet = newset #self.activeOrderSet = newset
"""
# 推送数据 # 推送数据
qryOrderID = None #qryOrderID = None
data.reverse() data.reverse()
for d in data: for d in data:
orderID = d['id'] orderID = d['id']
#self.activeOrderSet.add(orderID)
strOrderID = str(orderID) strOrderID = str(orderID)
updated = False updated = False
if strOrderID in self.orderLocalDict: if strOrderID in self.orderLocalDict:
localid = self.orderLocalDict[strOrderID] localid = self.orderLocalDict[strOrderID]
else: else:
self.localid += 1 self.localid += 1
localid = str(self.localid) localid = str(self.localid)
self.orderLocalDict[strOrderID] = localid self.orderLocalDict[strOrderID] = localid
self.localOrderDict[localid] = strOrderID self.localOrderDict[localid] = strOrderID
order = self.orderDict.get(orderID, None) order = self.orderDict.get(orderID, None)
if not order: if not order:
updated = True updated = True
order = VtOrderData() order = VtOrderData()
order.gatewayName = self.gatewayName order.gatewayName = self.gatewayName
order.orderID = localid order.orderID = localid
order.vtOrderID = '.'.join([order.gatewayName, order.orderID]) order.vtOrderID = '.'.join([order.gatewayName, order.orderID])
order.symbol = d['symbol'] order.symbol = d['symbol']
order.exchange = EXCHANGE_HUOBI order.exchange = EXCHANGE_HUOBI
order.vtSymbol = '.'.join([order.symbol, order.exchange]) order.vtSymbol = '.'.join([order.symbol, order.exchange])
order.price = float(d['price']) order.price = float(d['price'])
order.totalVolume = float(d['amount']) order.totalVolume = float(d['amount'])
order.orderTime = datetime.fromtimestamp(d['created-at']/1000).strftime('%H:%M:%S') order.orderTime = datetime.fromtimestamp(d['created-at']/1000).strftime('%H:%M:%S')
if 'buy' in d['type']: if 'buy' in d['type']:
order.direction = DIRECTION_LONG order.direction = DIRECTION_LONG
else: else:
order.direction = DIRECTION_SHORT order.direction = DIRECTION_SHORT
order.offset = OFFSET_NONE order.offset = OFFSET_NONE
self.orderDict[orderID] = order self.orderDict[orderID] = order
# 数据更新,只有当成交数量或者委托状态变化时,才执行推送 # 数据更新,只有当成交数量或者委托状态变化时,才执行推送
if d['canceled-at']: if d['canceled-at']:
order.cancelTime = datetime.fromtimestamp(d['canceled-at']/1000).strftime('%H:%M:%S') order.cancelTime = datetime.fromtimestamp(d['canceled-at']/1000).strftime('%H:%M:%S')
newTradedVolume = d['field-amount'] newTradedVolume = d['field-amount']
newStatus = statusMapReverse.get(d['state'], STATUS_UNKNOWN) newStatus = statusMapReverse.get(d['state'], STATUS_UNKNOWN)
if newTradedVolume != order.tradedVolume or newStatus != order.status: if newTradedVolume != order.tradedVolume or newStatus != order.status:
updated = True updated = True
order.tradedVolume = float(newTradedVolume) order.tradedVolume = float(newTradedVolume)
order.status = newStatus order.status = newStatus
# 只推送有更新的数据 # 只推送有更新的数据
if updated: if updated:
self.gateway.onOrder(order) self.gateway.onOrder(order)
## 计算查询下标(即最早的未全成或撤委托) ## 计算查询下标(即最早的未全成或撤委托)
#if order.status not in [STATUS_ALLTRADED, STATUS_CANCELLED]: #if order.status not in [STATUS_ALLTRADED, STATUS_CANCELLED]:
#if not qryOrderID: #if not qryOrderID:
#qryOrderID = orderID #qryOrderID = orderID
#else: #else:
#qryOrderID = min(qryOrderID, orderID) #qryOrderID = min(qryOrderID, orderID)
## 更新查询下标 ## 更新查询下标
#if qryOrderID: #if qryOrderID:
#self.qryOrderID = qryOrderID #self.qryOrderID = qryOrderID
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onGetMatchResults(self, data, reqid): def onGetMatchResults(self, data, reqid):
"""查询成交回调""" """查询成交回调"""
data.reverse() data.reverse()
for d in data: for d in data:
tradeID = d['match-id'] tradeID = d['match-id']
# 成交仅需要推送一次,去重判断 # 成交仅需要推送一次,去重判断
if tradeID in self.tradeIDs: if tradeID in self.tradeIDs:
continue continue
self.tradeIDs.add(tradeID) self.tradeIDs.add(tradeID)
# 查询起始编号更新 # 查询起始编号更新
self.qryTradeID = max(tradeID, self.qryTradeID) self.qryTradeID = max(tradeID, self.qryTradeID)
# 推送数据 # 推送数据
trade = VtTradeData() trade = VtTradeData()
trade.gatewayName = self.gatewayName trade.gatewayName = self.gatewayName
trade.tradeID = str(tradeID) trade.tradeID = str(tradeID)
trade.vtTradeID = '.'.join([trade.tradeID, trade.gatewayName]) trade.vtTradeID = '.'.join([trade.tradeID, trade.gatewayName])
trade.symbol = d['symbol'] trade.symbol = d['symbol']
trade.exchange = EXCHANGE_HUOBI trade.exchange = EXCHANGE_HUOBI
trade.vtSymbol = '.'.join([trade.symbol, trade.exchange]) trade.vtSymbol = '.'.join([trade.symbol, trade.exchange])
if 'buy' in d['type']: if 'buy' in d['type']:
trade.direction = DIRECTION_LONG trade.direction = DIRECTION_LONG
else: else:
trade.direction = DIRECTION_SHORT trade.direction = DIRECTION_SHORT
trade.offset = OFFSET_NONE trade.offset = OFFSET_NONE
strOrderID = str(d['order-id']) strOrderID = str(d['order-id'])
localid = self.orderLocalDict.get(strOrderID, '') localid = self.orderLocalDict.get(strOrderID, '')
trade.orderID = localid trade.orderID = localid
trade.vtOrderID = '.'.join([trade.gatewayName, trade.orderID]) trade.vtOrderID = '.'.join([trade.gatewayName, trade.orderID])
trade.tradeID = str(tradeID) trade.tradeID = str(tradeID)
trade.vtTradeID = '.'.join([trade.gatewayName, trade.tradeID]) trade.vtTradeID = '.'.join([trade.gatewayName, trade.tradeID])
trade.price = float(d['price']) trade.price = float(d['price'])
trade.volume = float(d['filled-amount']) trade.volume = float(d['filled-amount'])
dt = datetime.fromtimestamp(d['created-at']/1000) dt = datetime.fromtimestamp(d['created-at']/1000)
trade.tradeTime = dt.strftime('%H:%M:%S') trade.tradeTime = dt.strftime('%H:%M:%S')
self.gateway.onTrade(trade) self.gateway.onTrade(trade)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onGetOrder(self, data, reqid): def onGetOrder(self, data, reqid):
"""查询单一委托回调""" """查询单一委托回调"""
orderID = data['id'] #orderID = data['id']
strOrderID = str(orderID) #strOrderID = str(orderID)
localid = self.orderLocalDict[strOrderID] #localid = self.orderLocalDict[strOrderID]
order = self.orderDict[orderID] #order = self.orderDict[orderID]
order.tradedVolume = float(data['field-amount']) #order.tradedVolume = float(data['field-amount'])
order.status = statusMapReverse.get(data['state'], STATUS_UNKNOWN) #order.status = statusMapReverse.get(data['state'], STATUS_UNKNOWN)
#if data['canceled-at']:
#order.cancelTime = datetime.fromtimestamp(data['canceled-at']/1000).strftime('%H:%M:%S')
## 完成的委托则从集合中移除
#if order.status in [STATUS_ALLTRADED, STATUS_CANCELLED]:
#self.activeOrderSet.remove(orderID)
#self.gateway.onOrder(order)
pass
if data['canceled-at']:
order.cancelTime = datetime.fromtimestamp(data['canceled-at']/1000).strftime('%H:%M:%S')
self.gateway.onOrder(order)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onGetMatchResult(self, data, reqid): def onGetMatchResult(self, data, reqid):
"""查询单一成交回调""" """查询单一成交回调"""
print reqid, data print reqid, data
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onPlaceOrder(self, data, reqid): def onPlaceOrder(self, data, reqid):
"""委托回调""" """委托回调"""
localid = self.reqLocalDict[reqid] localid = self.reqLocalDict[reqid]
self.localOrderDict[localid] = data self.localOrderDict[localid] = data
self.orderLocalDict[data] = localid self.orderLocalDict[data] = localid
#self.activeOrderSet.add(data)
if localid in self.cancelReqDict: if localid in self.cancelReqDict:
req = self.cancelReqDict[localid] req = self.cancelReqDict[localid]
self.cancelOrder(req) self.cancelOrder(req)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onCancelOrder(self, data, reqid): def onCancelOrder(self, data, reqid):
"""撤单回调""" """撤单回调"""
self.writeLog(u'委托撤单成功:%s' %data) self.writeLog(u'委托撤单成功:%s' %data)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onBatchCancel(self, data, reqid): def onBatchCancel(self, data, reqid):
"""批量撤单回调""" """批量撤单回调"""
print reqid, data print reqid, data