vnpy/trader/gateway/huobiGateway/huobiGateway.py
2017-06-11 20:55:26 +08:00

669 lines
24 KiB
Python

# encoding: UTF-8
'''
vn.huobi的gateway接入
'''
import os
import json
from datetime import datetime
from copy import copy
from threading import Condition
from Queue import Queue
from threading import Thread
import vnhuobi
from vtGateway import *
SYMBOL_BTCCNY = 'BTCCNY'
SYMBOL_LTCCNY = 'LTCCNY'
SYMBOL_BTCUSD = 'BTCUSD'
SYMBOL_MAP = {}
SYMBOL_MAP[(vnhuobi.COINTYPE_BTC, 'cny')] = SYMBOL_BTCCNY
SYMBOL_MAP[(vnhuobi.COINTYPE_LTC, 'cny')] = SYMBOL_LTCCNY
SYMBOL_MAP[(vnhuobi.COINTYPE_BTC, 'usd')] = SYMBOL_BTCUSD
SYMBOL_MAP_REVERSE = {v: k for k, v in SYMBOL_MAP.items()}
MDSYMBOL_MAP = {}
MDSYMBOL_MAP['btccny'] = SYMBOL_BTCCNY
MDSYMBOL_MAP['ltccny'] = SYMBOL_LTCCNY
MDSYMBOL_MAP['btcusd'] = SYMBOL_BTCUSD
DIRECTION_MAP = {}
DIRECTION_MAP[1] = DIRECTION_LONG
DIRECTION_MAP[2] = DIRECTION_SHORT
STATUS_MAP = {}
STATUS_MAP[0] = STATUS_NOTTRADED
STATUS_MAP[1] = STATUS_PARTTRADED
STATUS_MAP[2] = STATUS_ALLTRADED
STATUS_MAP[3] = STATUS_CANCELLED
STATUS_MAP[5] = STATUS_UNKNOWN
STATUS_MAP[7] = STATUS_UNKNOWN
########################################################################
class HuobiGateway(VtGateway):
"""火币接口"""
#----------------------------------------------------------------------
def __init__(self, eventEngine, gatewayName='HUOBI'):
"""Constructor"""
super(HuobiGateway, self).__init__(eventEngine, gatewayName)
self.market = 'cny'
self.tradeApi = HuobiTradeApi(self)
self.dataApi = HuobiDataApi(self)
#----------------------------------------------------------------------
def connect(self):
"""连接"""
# 载入json文件
fileName = self.gatewayName + '_connect.json'
path = os.path.abspath(os.path.dirname(__file__))
fileName = os.path.join(path, fileName)
try:
f = file(fileName)
except IOError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'读取连接配置出错,请检查'
self.onLog(log)
return
# 解析json文件
setting = json.load(f)
try:
accessKey = str(setting['accessKey'])
secretKey = str(setting['secretKey'])
interval = setting['interval']
market = setting['market']
debug = setting['debug']
except KeyError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'连接配置缺少字段,请检查'
self.onLog(log)
return
# 初始化接口
self.tradeApi.connect(accessKey, secretKey, market, debug)
self.writeLog(u'交易接口初始化成功')
self.dataApi.connect(interval, market, debug)
self.writeLog(u'行情接口初始化成功')
# 启动查询
self.initQuery()
self.startQuery()
#----------------------------------------------------------------------
def writeLog(self, content):
"""发出日志"""
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = content
self.onLog(log)
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅行情,自动订阅全部行情,无需实现"""
pass
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
"""发单"""
self.tradeApi.sendOrder(orderReq)
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq):
"""撤单"""
self.tradeApi.cancel(cancelOrderReq)
#----------------------------------------------------------------------
def qryAccount(self):
"""查询账户资金"""
pass
#----------------------------------------------------------------------
def qryPosition(self):
"""查询持仓"""
pass
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.tradeApi.exit()
self.dataApi.exit()
#----------------------------------------------------------------------
def initQuery(self):
"""初始化连续查询"""
if self.qryEnabled:
self.qryFunctionList = [self.tradeApi.queryWorkingOrders, self.tradeApi.queryAccount]
self.startQuery()
#----------------------------------------------------------------------
def query(self, event):
"""注册到事件处理引擎上的查询函数"""
for function in self.qryFunctionList:
function()
#----------------------------------------------------------------------
def startQuery(self):
"""启动连续查询"""
self.eventEngine.register(EVENT_TIMER, self.query)
#----------------------------------------------------------------------
def setQryEnabled(self, qryEnabled):
"""设置是否要启动循环查询"""
self.qryEnabled = qryEnabled
########################################################################
class HuobiTradeApi(vnhuobi.TradeApi):
"""交易接口"""
#----------------------------------------------------------------------
def __init__(self, gateway):
"""Constructor"""
super(HuobiTradeApi, self).__init__()
self.gateway = gateway
self.gatewayName = gateway.gatewayName
self.localID = 0 # 本地委托号
self.localSystemDict = {} # key:localID, value:systemID
self.systemLocalDict = {} # key:systemID, value:localID
self.workingOrderDict = {} # key:localID, value:order
self.reqLocalDict = {} # key:reqID, value:localID
self.cancelDict = {} # key:localID, value:cancelOrderReq
self.tradeID = 0 # 本地成交号
#----------------------------------------------------------------------
def onError(self, error, req, reqID):
"""错误推送"""
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorMsg = str(error)
err.errorTime = datetime.now().strftime('%H:%M:%S')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onGetAccountInfo(self, data, req, reqID):
"""查询账户回调"""
# 推送账户数据
account = VtAccountData()
account.gatewayName = self.gatewayName
account.accountID = 'HUOBI'
account.vtAccountID = '.'.join([account.accountID, self.gatewayName])
account.balance = data['net_asset']
self.gateway.onAccount(account)
# 推送持仓数据
if self.market == 'cny':
posCny = VtPositionData()
posCny.gatewayName = self.gatewayName
posCny.symbol = 'CNY'
posCny.exchange = EXCHANGE_HUOBI
posCny.vtSymbol = '.'.join([posCny.symbol, posCny.exchange])
posCny.vtPositionName = posCny.vtSymbol
posCny.position = data['available_cny_display']
posCny.frozen = data['frozen_cny_display']
self.gateway.onPosition(posCny)
posLtc = VtPositionData()
posLtc.gatewayName = self.gatewayName
posLtc.symbol = 'LTC'
posLtc.exchange = EXCHANGE_HUOBI
posLtc.vtSymbol = '.'.join([posLtc.symbol, posLtc.exchange])
posLtc.vtPositionName = posLtc.vtSymbol
posLtc.position = data['available_ltc_display']
posLtc.frozen = data['frozen_ltc_display']
self.gateway.onPosition(posLtc)
else:
posUsd = VtPositionData()
posUsd.gatewayName = self.gatewayName
posUsd.symbol = 'USD'
posUsd.exchange = EXCHANGE_HUOBI
posUsd.vtSymbol = '.'.join([posUsd.symbol, posUsd.exchange])
posUsd.vtPositionName = posUsd.vtSymbol
posUsd.position = data['available_usd_display']
posUsd.frozen = data['frozen_usd_display']
self.gateway.onPosition(posUsd)
posBtc = VtPositionData()
posBtc.gatewayName = self.gatewayName
posBtc.symbol = 'BTC'
posBtc.exchange = EXCHANGE_HUOBI
posBtc.vtSymbol = '.'.join([posBtc.symbol, posBtc.exchange])
posBtc.vtPositionName = posBtc.vtSymbol
posBtc.position = data['available_btc_display']
posBtc.frozen = data['frozen_btc_display']
self.gateway.onPosition(posBtc)
#----------------------------------------------------------------------
def onGetOrders(self, data, req, reqID):
"""查询委托回调"""
for d in data:
order = VtOrderData()
order.gatewayName = self.gatewayName
# 合约代码
params = req['params']
coin = params['coin_type']
order.symbol = SYMBOL_MAP[(coin, self.market)]
order.exchange = EXCHANGE_HUOBI
order.vtSymbol = '.'.join([order.symbol, order.exchange])
# 委托号
systemID = d['id']
self.localID += 1
localID = str(self.localID)
self.systemLocalDict[systemID] = localID
self.localSystemDict[localID] = systemID
order.orderID = localID
order.vtOrderID = '.'.join([order.orderID, order.gatewayName])
# 其他信息
order.direction = DIRECTION_MAP[d['type']]
order.offset = OFFSET_NONE
order.price = float(d['order_price'])
order.totalVolume = float(d['order_amount'])
order.tradedVolume = float(d['processed_amount'])
order.orderTime = d['order_time']
# 委托状态
if order.tradedVolume == 0:
order.status = STATUS_NOTTRADED
else:
order.status = STATUS_PARTTRADED
# 缓存病推送
self.workingOrderDict[localID] = order
self.gateway.onOrder(order)
#----------------------------------------------------------------------
def onOrderInfo(self, data, req, reqID):
"""委托详情回调"""
systemID = data['id']
localID = self.systemLocalDict[systemID]
order = self.workingOrderDict.get(localID, None)
if not order:
return
# 记录最新成交的金额
newTradeVolume = float(data['processed_amount']) - order.tradedVolume
if newTradeVolume:
trade = VtTradeData()
trade.gatewayName = self.gatewayName
trade.symbol = order.symbol
trade.vtSymbol = order.vtSymbol
self.tradeID += 1
trade.tradeID = str(self.tradeID)
trade.vtTradeID = '.'.join([trade.tradeID, trade.gatewayName])
trade.volume = newTradeVolume
trade.price = data['processed_price']
trade.direction = order.direction
trade.offset = order.offset
trade.exchange = order.exchange
trade.tradeTime = datetime.now().strftime('%H:%M:%S')
self.gateway.onTrade(trade)
# 更新委托状态
order.tradedVolume = float(data['processed_amount'])
order.status = STATUS_MAP.get(data['status'], STATUS_UNKNOWN)
if newTradeVolume:
self.gateway.onOrder(order)
if order.status == STATUS_ALLTRADED or order.status == STATUS_CANCELLED:
del self.workingOrderDict[order.orderID]
#----------------------------------------------------------------------
def onBuy(self, data, req, reqID):
"""买入回调"""
localID = self.reqLocalDict[reqID]
systemID = data['id']
self.localSystemDict[localID] = systemID
self.systemLocalDict[systemID] = localID
# 撤单
if localID in self.cancelDict:
req = self.cancelDict[localID]
self.cancel(req)
del self.cancelDict[localID]
# 推送委托信息
order = self.workingOrderDict[localID]
if data['result'] == 'success':
order.status = STATUS_NOTTRADED
self.gateway.onOrder(order)
#----------------------------------------------------------------------
def onSell(self, data, req, reqID):
"""卖出回调"""
localID = self.reqLocalDict[reqID]
systemID = data['id']
self.localSystemDict[localID] = systemID
self.systemLocalDict[systemID] = localID
# 撤单
if localID in self.cancelDict:
req = self.cancelDict[localID]
self.cancel(req)
del self.cancelDict[localID]
# 推送委托信息
order = self.workingOrderDict[localID]
if data['result'] == 'success':
order.status = STATUS_NOTTRADED
self.gateway.onOrder(order)
#----------------------------------------------------------------------
def onBuyMarket(self, data, req, reqID):
"""市价买入回调"""
print data
#----------------------------------------------------------------------
def onSellMarket(self, data, req, reqID):
"""市价卖出回调"""
print data
#----------------------------------------------------------------------
def onCancelOrder(self, data, req, reqID):
"""撤单回调"""
if data['result'] == 'success':
systemID = req['params']['id']
localID = self.systemLocalDict[systemID]
order = self.workingOrderDict[localID]
order.status = STATUS_CANCELLED
del self.workingOrderDict[localID]
self.gateway.onOrder(order)
#----------------------------------------------------------------------
def onGetNewDealOrders(self, data, req, reqID):
"""查询最新成交回调"""
print data
#----------------------------------------------------------------------
def onGetOrderIdByTradeId(self, data, req, reqID):
"""通过成交编号查询委托编号回调"""
print data
#----------------------------------------------------------------------
def onWithdrawCoin(self, data, req, reqID):
"""提币回调"""
print data
#----------------------------------------------------------------------
def onCancelWithdrawCoin(self, data, req, reqID):
"""取消提币回调"""
print data
#----------------------------------------------------------------------
def onGetWithdrawCoinResult(self, data, req, reqID):
"""查询提币结果回调"""
print data
#----------------------------------------------------------------------
def onTransfer(self, data, req, reqID):
"""转账回调"""
print data
#----------------------------------------------------------------------
def onLoan(self, data, req, reqID):
"""申请杠杆回调"""
print data
#----------------------------------------------------------------------
def onRepayment(self, data, req, reqID):
"""归还杠杆回调"""
print data
#----------------------------------------------------------------------
def onLoanAvailable(self, data, req, reqID):
"""查询杠杆额度回调"""
print data
#----------------------------------------------------------------------
def onGetLoans(self, data, req, reqID):
"""查询杠杆列表"""
print data
#----------------------------------------------------------------------
def connect(self, accessKey, secretKey, market, debug=False):
"""连接服务器"""
self.market = market
self.DEBUG = debug
self.init(accessKey, secretKey)
# 查询未成交委托
self.getOrders(vnhuobi.COINTYPE_BTC, self.market)
if self.market == vnhuobi.MARKETTYPE_CNY:
# 只有人民币市场才有莱特币
self.getOrders(vnhuobi.COINTYPE_LTC, self.market)
# ----------------------------------------------------------------------
def queryWorkingOrders(self):
"""查询活动委托状态"""
for order in self.workingOrderDict.values():
# 如果尚未返回委托号,则无法查询
if order.orderID in self.localSystemDict:
systemID = self.localSystemDict[order.orderID]
coin, market = SYMBOL_MAP_REVERSE[order.symbol]
self.orderInfo(systemID, coin, market)
# ----------------------------------------------------------------------
def queryAccount(self):
"""查询活动委托状态"""
self.getAccountInfo(self.market)
# ----------------------------------------------------------------------
def sendOrder(self, req):
"""发送委托"""
# 检查是否填入了价格,禁止市价委托
if req.priceType != PRICETYPE_LIMITPRICE:
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorMsg = u'火币接口仅支持限价单'
err.errorTime = datetime.now().strftime('%H:%M:%S')
self.gateway.onError(err)
return None
# 发送限价委托
coin, market = SYMBOL_MAP_REVERSE[req.symbol]
if req.direction == DIRECTION_LONG:
reqID = self.buy(req.price, req.volume, coinType=coin, market=self.market)
else:
reqID = self.sell(req.price, req.volume, coinType=coin, market=self.market)
self.localID += 1
localID = str(self.localID)
self.reqLocalDict[reqID] = localID
# 推送委托信息
order = VtOrderData()
order.gatewayName = self.gatewayName
order.symbol = req.symbol
order.exchange = EXCHANGE_HUOBI
order.vtSymbol = '.'.join([order.symbol, order.exchange])
order.orderID = localID
order.vtOrderID = '.'.join([order.orderID, order.gatewayName])
order.direction = req.direction
order.offset = OFFSET_UNKNOWN
order.price = req.price
order.volume = req.volume
order.orderTime = datetime.now().strftime('%H:%M:%S')
order.status = STATUS_UNKNOWN
self.workingOrderDict[localID] = order
self.gateway.onOrder(order)
# 返回委托号
return order.vtOrderID
# ----------------------------------------------------------------------
def cancel(self, req):
"""撤单"""
localID = req.orderID
if localID in self.localSystemDict:
systemID = self.localSystemDict[localID]
coin, market = SYMBOL_MAP_REVERSE[req.symbol]
self.cancelOrder(systemID, coin, self.market)
else:
self.cancelDict[localID] = req
########################################################################
class HuobiDataApi(vnhuobi.DataApi):
"""行情接口"""
#----------------------------------------------------------------------
def __init__(self, gateway):
"""Constructor"""
super(HuobiDataApi, self).__init__()
self.market = 'cny'
self.gateway = gateway
self.gatewayName = gateway.gatewayName
self.tickDict = {} # key:symbol, value:tick
#----------------------------------------------------------------------
def onTick(self, data):
"""实时成交推送"""
print data
#----------------------------------------------------------------------
def onQuote(self, data):
"""实时报价推送"""
ticker = data['ticker']
symbol = MDSYMBOL_MAP[ticker['symbol']]
if symbol not in self.tickDict:
tick = VtTickData()
tick.gatewayName = self.gatewayName
tick.symbol = symbol
tick.exchange = EXCHANGE_HUOBI
tick.vtSymbol = '.'.join([tick.symbol, tick.exchange])
self.tickDict[symbol] = tick
else:
tick = self.tickDict[symbol]
tick.highPrice = ticker['high']
tick.lowPrice = ticker['low']
tick.lastPrice = ticker['last']
tick.volume = ticker['vol']
#----------------------------------------------------------------------
def onDepth(self, data):
"""实时深度推送"""
symbol = MDSYMBOL_MAP[data['symbol']]
if symbol not in self.tickDict:
tick = VtTickData()
tick.gatewayName = self.gatewayName
tick.symbol = symbol
tick.exchange = EXCHANGE_HUOBI
tick.vtSymbol = '.'.join([tick.symbol, tick.exchange])
self.tickDict[symbol] = tick
else:
tick = self.tickDict[symbol]
tick.bidPrice1, tick.bidVolume1 = data['bids'][0]
tick.bidPrice2, tick.bidVolume2 = data['bids'][1]
tick.bidPrice3, tick.bidVolume3 = data['bids'][2]
tick.bidPrice4, tick.bidVolume4 = data['bids'][3]
tick.bidPrice5, tick.bidVolume5 = data['bids'][4]
tick.askPrice1, tick.askVolume1 = data['asks'][0]
tick.askPrice2, tick.askVolume2 = data['asks'][1]
tick.askPrice3, tick.askVolume3 = data['asks'][2]
tick.askPrice4, tick.askVolume4 = data['asks'][3]
tick.askPrice5, tick.askVolume5 = data['asks'][4]
now = datetime.now()
tick.time = now.strftime('%H:%M:%S')
tick.date = now.strftime('%Y%m%d')
self.gateway.onTick(tick)
#----------------------------------------------------------------------
def connect(self, interval, market, debug=False):
"""连接服务器"""
self.market = market
self.init(interval, debug)
# 订阅行情并推送合约信息
if self.market == vnhuobi.MARKETTYPE_CNY:
self.subscribeQuote(vnhuobi.SYMBOL_BTCCNY)
self.subscribeQuote(vnhuobi.SYMBOL_LTCCNY)
self.subscribeDepth(vnhuobi.SYMBOL_BTCCNY, 5)
self.subscribeDepth(vnhuobi.SYMBOL_LTCCNY, 5)
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = SYMBOL_BTCCNY
contract.exchange = EXCHANGE_HUOBI
contract.vtSymbol = '.'.join([contract.symbol, contract.exchange])
contract.name = u'人民币现货BTC'
contract.size = 1
contract.priceTick = 0.01
contract.productClass = PRODUCT_SPOT
self.gateway.onContract(contract)
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = SYMBOL_LTCCNY
contract.exchange = EXCHANGE_HUOBI
contract.vtSymbol = '.'.join([contract.symbol, contract.exchange])
contract.name = u'人民币现货LTC'
contract.size = 1
contract.priceTick = 0.01
contract.productClass = PRODUCT_SPOT
self.gateway.onContract(contract)
else:
self.subscribeQuote(vnhuobi.SYMBOL_BTCUSD)
self.subscribeDepth(vnhuobi.SYMBOL_BTCUSD)
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = SYMBOL_BTCUSD
contract.exchange = EXCHANGE_HUOBI
contract.vtSymbol = '.'.join([contract.symbol, contract.exchange])
contract.name = u'美元现货BTC'
contract.size = 1
contract.priceTick = 0.01
contract.productClass = PRODUCT_SPOT
self.gateway.onContract(contract)