vnpy/vn.trader/ksgoldGateway/ksgoldGateway.py

794 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# encoding: UTF-8
'''
vn.ksgold的gateway接入
金仕达黄金接口在用户登录后并不提供之前的Order和Trade数据主动推送
而是需要用户自行查询因此API里做了初始化后的查询设计。
'''
import os
import json
import time
from vnksgoldtd import TdApi
from ksgoldDataType import *
from vtGateway import *
# 以下类型映射参考的是原生API里的Constant.h
# 方向类型映射
directionMap = {}
directionMap[DIRECTION_LONG] = '0'
directionMap[DIRECTION_SHORT] = '1'
directionMapReverse = {v: k for k, v in directionMap.items()}
# 开平类型映射
offsetMap = {}
offsetMap[OFFSET_OPEN] = '0'
offsetMap[OFFSET_CLOSE] = '1'
offsetMapReverse = {v:k for k,v in offsetMap.items()}
########################################################################
class KsgoldGateway(VtGateway):
"""金仕达黄金接口"""
#----------------------------------------------------------------------
def __init__(self, eventEngine, gatewayName='KSGOLD'):
"""Constructor"""
super(KsgoldGateway, self).__init__(eventEngine, gatewayName)
self.tdApi = KsgoldTdApi(self) # 黄金接口只有交易API
self.tdConnected = False # 交易API连接状态
self.qryEnabled = False # 是否要启动循环查询
self.orderInited = False # 委托初始化查询
self.tradeInited = False # 成交初始化查询
#----------------------------------------------------------------------
def connect(self):
"""连接"""
# 载入json文件
fileName = self.gatewayName + '_connect.json'
fileName = os.getcwd() + '\\ksgoldGateway\\' + fileName
try:
f = file(fileName)
except IOError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'读取连接配置出错,请检查'
self.onLog(log)
return
# 解析json文件
setting = json.load(f)
try:
accountID = str(setting['accountID'])
password = str(setting['password'])
address = str(setting['address'])
except KeyError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'连接配置缺少字段,请检查'
self.onLog(log)
return
# 创建行情和交易接口对象
self.tdApi.connect(accountID, password, address)
# 初始化并启动查询
self.initQuery()
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅行情"""
self.tdApi.subscribe(subscribeReq)
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
"""发单"""
return self.tdApi.sendOrder(orderReq)
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq):
"""撤单"""
self.tdApi.cancelOrder(cancelOrderReq)
#----------------------------------------------------------------------
def qryAccount(self):
"""查询账户资金"""
self.tdApi.qryAccount()
#----------------------------------------------------------------------
def qryPosition(self):
"""查询持仓"""
self.tdApi.qryPosition()
#----------------------------------------------------------------------
def close(self):
"""关闭"""
if self.mdConnected:
self.mdApi.close()
if self.tdConnected:
self.tdApi.close()
#----------------------------------------------------------------------
def initQuery(self):
"""初始化连续查询"""
# 需要循环的查询函数列表
self.qryFunctionList = [self.qryAccount, self.qryPosition]
self.qryCount = 0 # 查询触发倒计时
self.qryTrigger = 2 # 查询触发点
self.qryNextFunction = 0 # 上次运行的查询函数索引
#----------------------------------------------------------------------
def query(self, event):
"""注册到事件处理引擎上的查询函数"""
self.qryCount += 1
if self.qryCount > self.qryTrigger:
# 清空倒计时
self.qryCount = 0
# 如果尚未完成委托查询则先查询委托
if not self.orderInited:
self.tdApi.getOrder()
# 然后如果未完成成交查询则再查询成交
elif not self.tradeInited:
self.tdApi.getTrade()
else:
# 执行查询函数
function = self.qryFunctionList[self.qryNextFunction]
function()
# 计算下次查询函数的索引如果超过了列表长度则重新设为0
self.qryNextFunction += 1
if self.qryNextFunction == len(self.qryFunctionList):
self.qryNextFunction = 0
#----------------------------------------------------------------------
def startQuery(self):
"""启动连续查询"""
if self.qryEnabled:
self.eventEngine.register(EVENT_TIMER, self.query)
#----------------------------------------------------------------------
def setQryEnabled(self, qryEnabled):
"""设置是否要启动循环查询"""
self.qryEnabled = qryEnabled
########################################################################
class KsgoldTdApi(TdApi):
"""金仕达黄金交易API实现"""
#----------------------------------------------------------------------
def __init__(self, gateway):
"""API对象的初始化函数"""
super(KsgoldTdApi, self).__init__()
self.gateway = gateway # gateway对象
self.gatewayName = gateway.gatewayName # gateway对象名称
self.reqID = EMPTY_INT # 操作请求编号
self.orderRef = EMPTY_INT # 订单编号
self.connectionStatus = False # 连接状态
self.loginStatus = False # 登录状态
self.accountID = EMPTY_STRING # 账号
self.password = EMPTY_STRING # 密码
self.memberID = EMPTY_STRING # 会员代码(应该是银行)
self.address = EMPTY_STRING # 服务器地址
self.seatID = EMPTY_STRING; # 席位号
self.tradeCode = EMPTY_STRING # 交易编码
self.subscribedSymbols = set() # 已订阅合约代码
self.orderDict = {} # 委托字典用于缓存委托对象key为整数
# 循环登录相关
self.loginThread = None
#----------------------------------------------------------------------
def onFrontConnected(self):
"""服务器连接"""
self.connectionStatus = True
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'交易服务器连接成功'
self.gateway.onLog(log)
self.login()
#----------------------------------------------------------------------
def onFrontDisconnected(self, n):
"""服务器断开"""
self.connectionStatus = False
self.loginStatus = False
self.gateway.tdConnected = False
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'交易服务器连接断开'
self.gateway.onLog(log)
#----------------------------------------------------------------------
def onRspUserLogin(self, data, error, n, last):
"""登陆回报"""
# 如果登录成功,推送日志信息
if error['ErrorID'] == 0:
self.seatID = str(data['SeatNo'])
self.tradeCode = str(data['tradeCode'])
self.loginStatus = True
self.gateway.tdConnected = True
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'交易服务器登录完成'
self.gateway.onLog(log)
# 重新订阅之前订阅的合约
for subscribeReq in self.subscribedSymbols:
self.subscribe(subscribeReq)
# 查询合约信息
self.reqID += 1
self.reqQryInstrument({}, self.reqID)
# 启动查询
self.gateway.startQuery()
# 否则,推送错误信息
else:
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
if error['ErrorID'] == -8:
self.login()
#----------------------------------------------------------------------
def onRspUserLogout(self, data, error, n, last):
"""登出回报"""
# 如果登出成功,推送日志信息
if error['ErrorID'] == 0:
self.loginStatus = False
self.gateway.tdConnected = False
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'交易服务器登出完成'
self.gateway.onLog(log)
# 否则,推送错误信息
else:
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onNtyMktStatus(self, data, error, n, last):
"""更新市场状态"""
pass
#----------------------------------------------------------------------
def onRspOrderInsert(self, data, error, n, last):
"""发单错误(柜台)"""
if error['ErrorID']:
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onRspOrderAction(self, data, error, n, last):
"""撤单错误(柜台)"""
if error['ErrorID']:
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onRspQryOrder(self, data, error, n, last):
""""""
if not self.gateway.orderInited:
self.gateway.orderInited = True
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'委托查询完成'
self.gateway.onLog(log)
# 更新最大报单编号
newref = data['localOrderNo']
self.orderRef = max(self.orderRef, int(newref))
# 创建报单数据对象
order = VtOrderData()
order.gatewayName = self.gatewayName
# 保存代码和报单号
order.symbol = data['instID']
order.exchange = EXCHANGE_SGE
order.vtSymbol = order.symbol
order.orderID = data['localOrderNo']
# 方向
if data['buyOrSell'] == '0':
order.direction = DIRECTION_LONG
elif data['buyOrSell'] == '1':
order.direction = DIRECTION_SHORT
else:
order.direction = DIRECTION_UNKNOWN
# 开平
if data['offsetFlag'] == '0':
order.offset = OFFSET_OPEN
elif data['offsetFlag'] == '1':
order.offset = OFFSET_CLOSE
else:
order.offset = OFFSET_UNKNOWN
# 状态
if data['status'] == '5':
order.status = STATUS_ALLTRADED
elif data['status'] == '6':
order.status = STATUS_PARTTRADED
elif data['status'] == '3':
order.status = STATUS_NOTTRADED
elif data['status'] == '4' or data['status'] == '7': # 全部撤销和部成部撤
order.status = STATUS_CANCELLED
else:
order.status = STATUS_UNKNOWN
# CTP的报单号一致性维护需要基于frontID, sessionID, orderID三个字段
# 但在本接口设计中已经考虑了CTP的OrderRef的自增性避免重复
# 唯一可能出现OrderRef重复的情况是多处登录并在非常接近的时间内几乎同时发单
# 考虑到VtTrader的应用场景认为以上情况不会构成问题
order.vtOrderID = '.'.join([self.gatewayName, order.orderID])
# 价格、报单量等数值
order.price = data['price']
order.totalVolume = data['amount']
order.tradedVolume = data['matchQty']
order.orderTime = data['entrustTime']
order.cancelTime = data['cancelTime']
# 推送
self.gateway.onOrder(order)
#----------------------------------------------------------------------
def onRspQryTrade(self, data, error, n, last):
""""""
if not self.gateway.tradeInited:
self.gateway.tradeInited = True
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'成交查询完成'
self.gateway.onLog(log)
# 创建报单数据对象
trade = VtTradeData()
trade.gatewayName = self.gatewayName
# 保存代码和报单号
trade.symbol = data['instID']
trade.exchange = EXCHANGE_SGE
trade.vtSymbol = trade.symbol
trade.tradeID = data['matchNo']
trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID])
trade.orderID = data['localOrderNo']
trade.vtOrderID = '.'.join([self.gatewayName, trade.orderID])
# 方向
trade.direction = directionMapReverse.get(data['buyOrSell'], '')
# 开平
trade.offset = offsetMapReverse.get(data['offSetFlag'], '')
# 价格、报单量等数值
trade.price = data['price']
trade.volume = data['volume']
trade.tradeTime = data['matchTime']
# 推送
self.gateway.onTrade(trade)
#----------------------------------------------------------------------
def onRspQryInvestorPosition(self, data, error, n, last):
"""持仓查询回报"""
# 过滤一些空的数据
if data['instID']:
# 黄金API推送的持仓数据把多空仓放在了一条中需要分离
longPos = VtPositionData()
longPos.gatewayName = self.gatewayName
longPos.symbol = data['instID']
longPos.vtSymbol = longPos.symbol
longPos.direction = DIRECTION_LONG
longPos.frozen = data['longPosiFrozen']
longPos.position = data['longPosiVol']
longPos.ydPosition = data['lastLong']
longPos.price = data['longPosiAvgPrice']
longPos.vtPositionName = '.'.join([longPos.vtSymbol, longPos.direction])
self.gateway.onPosition(longPos)
shortPos = VtPositionData()
shortPos.gatewayName = self.gatewayName
shortPos.symbol = data['instID']
shortPos.vtSymbol = shortPos.symbol
shortPos.direction = DIRECTION_SHORT
shortPos.frozen = data['shortPosiFrozen']
shortPos.position = data['shortPosiVol']
shortPos.ydPosition = data['lastShort']
shortPos.price = data['shortPosiAvgPrice']
shortPos.vtPositionName = '.'.join([shortPos.vtSymbol, shortPos.direction])
self.gateway.onPosition(shortPos)
#----------------------------------------------------------------------
def onRspQryTradingAccount(self, data, error, n, last):
"""资金账户查询回报"""
account = VtAccountData()
account.gatewayName = self.gatewayName
# 账户代码
account.accountID = 'sge_account' # API不提供该字段自行定义
account.vtAccountID = '.'.join([self.gatewayName, account.accountID])
# 数值相关
account.available = data['availCap']
account.commission = data['totalFee']
account.margin = data['posiMargin']
# 推送
self.gateway.onAccount(account)
#----------------------------------------------------------------------
def onRspQryInstrument(self, data, error, n, last):
"""合约查询回报"""
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = data['instID']
contract.exchange = EXCHANGE_SGE # 交易所只有金交所
# 合约类型分为现货、延期、未知
if data['marketID'] == '00':
contract.productClass = PRODUCT_SPOT
elif data['marketID'] == '10':
contract.productClass = PRODUCT_DEFER
else:
contract.productclass = PRODUCT_UNKNOWN
contract.vtSymbol = contract.symbol
contract.name = data['name'].decode('GBK')
# 合约数值
contract.size = data['unit']
contract.priceTick = data['tick']
# 推送考虑到只对T+D类带杠杆的产品感兴趣API里默认过滤了现货
if data['marketID'] == '10':
self.gateway.onContract(contract)
if last:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'交易合约信息获取完成'
self.gateway.onLog(log)
#----------------------------------------------------------------------
def onRtnDepthMarketData(self, data):
"""行情推送"""
tick = VtTickData()
tick.gatewayName = self.gatewayName
tick.symbol = data['InstID']
tick.vtSymbol = tick.symbol
tick.lastPrice = data['Last']
tick.volume = data['Volume']
tick.openInterest = data['OpenInt']
tick.time = data['QuoteTime']
tick.date = data['QuoteDate']
tick.openPrice = data['Open']
tick.highPrice = data['High']
tick.lowPrice = data['Low']
tick.preClosePrice = data['PreClose']
tick.upperLimit = data['highLimit']
tick.lowerLimit = data['lowLimit']
# CTP只有一档行情
tick.bidPrice1 = data['Bid1']
tick.bidPrice2 = data['Bid2']
tick.bidPrice3 = data['Bid3']
tick.bidPrice4 = data['Bid4']
tick.bidPrice5 = data['Bid5']
tick.askPrice1 = data['Ask1']
tick.askPrice2 = data['Ask2']
tick.askPrice3 = data['Ask3']
tick.askPrice4 = data['Ask4']
tick.askPrice5 = data['Ask5']
tick.bidVolume1 = data['BidLot1']
tick.bidVolume2 = data['BidLot2']
tick.bidVolume3 = data['BidLot3']
tick.bidVolume4 = data['BidLot4']
tick.bidVolume5 = data['BidLot5']
tick.askVolume1 = data['AskLot1']
tick.askVolume2 = data['AskLot2']
tick.askVolume3 = data['AskLot3']
tick.askVolume4 = data['AskLot4']
tick.askVolume5 = data['AskLot5']
self.gateway.onTick(tick)
#----------------------------------------------------------------------
def onRtnOrder(self, data):
"""报单回报"""
# 更新最大报单编号
newref = data['localOrderNo']
self.orderRef = max(self.orderRef, int(newref))
# 创建报单数据对象
order = self.orderDict.get(int(newref), VtOrderData())
order.gatewayName = self.gatewayName
order.exchange = EXCHANGE_SGE
order.orderID = data['localOrderNo']
# 状态
if data['status'] == '5':
order.status = STATUS_ALLTRADED
elif data['status'] == '6':
order.status = STATUS_PARTTRADED
elif data['status'] == '3':
order.status = STATUS_NOTTRADED
elif data['status'] == '4' or data['status'] == '7': # 全部撤销和部成部撤
order.status = STATUS_CANCELLED
else:
order.status = STATUS_UNKNOWN
# 推送
self.gateway.onOrder(order)
#----------------------------------------------------------------------
def onForceLogout(self, data):
"""强制登出推送"""
self.gateway.onLog(u'被强制登出')
#----------------------------------------------------------------------
def onRtnTrade(self, data):
"""成交回报"""
# 创建报单数据对象
trade = VtTradeData()
trade.gatewayName = self.gatewayName
# 保存代码和报单号
trade.symbol = data['instID']
trade.exchange = EXCHANGE_SGE
trade.vtSymbol = trade.symbol
trade.tradeID = data['matchNo']
trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID])
trade.orderID = data['localOrderNo']
trade.vtOrderID = '.'.join([self.gatewayName, trade.orderID])
# 方向
trade.direction = directionMapReverse.get(data['buyOrSell'], '')
# 开平
trade.offset = offsetMapReverse.get(data['offSetFlag'], '')
# 价格、报单量等数值
trade.price = data['price']
trade.volume = data['volume']
trade.tradeTime = data['matchTime']
# 推送
self.gateway.onTrade(trade)
# 计算还原Order状态并推送
order = self.orderDict.get(int(trade.orderID), None)
if order:
order.tradedVolume = order.tradedVolume + trade.volume
if order.tradedVolume == order.totalVolume:
order.status = STATUS_ALLTRADED
else:
order.status = STATUS_PARTTRADED
self.gateway.onOrder(order)
#----------------------------------------------------------------------
def onErrRtnOrderInsert(self, data, error):
"""发单错误回报(交易所)"""
if error['ErrorID']:
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onErrRtnOrderAction(self, data, error):
"""撤单错误回报(交易所)"""
if error['ErrorID']:
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onRspQryClientStorage(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspSubMarketData(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def connect(self, accountID, password, address):
"""初始化连接"""
self.accountID = accountID
self.password = password
self.address = address
# 如果尚未建立服务器连接,则进行连接
if not self.connectionStatus:
self.createGoldTraderApi()
# 注册服务器地址
self.registerFront(self.address)
# 初始化连接成功会调用onFrontConnected
self.init(False)
self.connectionStatus = True
# 金仕达接口第一次连接不会调用onFrontConnected只有断线重连才会直接登录
self.login()
# 若已经连接但尚未登录,则进行登录
else:
if not self.loginStatus:
self.login()
#----------------------------------------------------------------------
def login(self):
"""连接服务器"""
# 如果填入了用户名密码等,则登录
if self.accountID and self.password:
req = {}
req['accountID'] = self.accountID
req['password'] = self.password
req['loginType'] = 1
self.reqID += 1
self.reqUserLogin(req, self.reqID)
#----------------------------------------------------------------------
def qryAccount(self):
"""查询账户"""
self.reqID += 1
self.reqQryTradingAccount({}, self.reqID)
#----------------------------------------------------------------------
def qryPosition(self):
"""查询持仓"""
self.reqID += 1
self.reqQryInvestorPosition({}, self.reqID)
#----------------------------------------------------------------------
def getTrade(self):
"""查询成交"""
self.reqID += 1
self.reqQryTrade({}, self.reqID)
#----------------------------------------------------------------------
def getOrder(self):
"""查询委托"""
self.reqID += 1
self.reqQryOrder({}, self.reqID)
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
"""发单"""
self.reqID += 1
self.orderRef += 1
req = {}
req['instID'] = orderReq.symbol
req['marketID'] = '10'
req['price'] = orderReq.price
req['amount'] = orderReq.volume
# 下面如果由于传入的类型本接口不支持,则会返回空字符串
try:
req['buyOrSell'] = directionMap[orderReq.direction]
req['offsetFlag'] = offsetMap[orderReq.offset]
except KeyError:
return ''
strOrderID = generateStrLocalID(self.orderRef)
req['LocalOrderNo'] = strOrderID
req['seatID'] = self.seatID
req['tradeCode'] = self.tradeCode
self.reqOrderInsert(req, self.reqID)
# 返回订单号(字符串),便于某些算法进行动态管理
vtOrderID = '.'.join([self.gatewayName, strOrderID])
# 保存请求到本地,用于在收到回报时查询
# 金仕达黄金API的onRtnOrder只推送很有限的几个数据因此整个数据需要自己还原
order = VtOrderData()
order.gatewayName = self.gatewayName
order.symbol = orderReq.symbol
order.vtSymbol = orderReq.symbol
order.exchange = EXCHANGE_SGE
order.orderID = strOrderID
order.vtOrderID = strOrderID
order.direction = orderReq.direction
order.offset = orderReq.offset
order.price = orderReq.price
order.totalVolume = orderReq.volume
order.orderTime = time.strftime('%H:%M:%S') # 保存委托时间
self.orderDict[self.orderRef] = order
return vtOrderID
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq):
"""撤单"""
self.reqID += 1
req = {}
req['localOrderNo'] = cancelOrderReq.orderID
req['marketID'] = '10' # API默认只允许交易T+D延期
self.reqOrderAction(req, self.reqID)
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅行情"""
# 黄金API的订阅传入的参数不是具体交易合约代码而是类型代码
# 考虑到只希望交易T+D这里直接传入'10'
self.subscribeMarketData('10')
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.exit()
#----------------------------------------------------------------------
def generateStrLocalID(localID):
"""把整数的本地委托号转化为字符串"""
return str(localID).rjust(12, '0')