[Mod]增加富途证券接口的成交重复推送过滤功能

This commit is contained in:
vn.py 2017-10-23 10:07:43 +08:00
parent 49a1dfdff1
commit 438948f8ac
3 changed files with 164 additions and 35 deletions

View File

@ -21,7 +21,7 @@ from vnpy.trader.gateway import (ctpGateway, oandaGateway, ibGateway,
if system == 'Windows':
from vnpy.trader.gateway import (femasGateway, xspeedGateway,
futuGateway)
futuGateway, secGateway)
if system == 'Linux':
from vnpy.trader.gateway import xtpGateway
@ -52,6 +52,7 @@ def main():
if system == 'Windows':
me.addGateway(femasGateway)
me.addGateway(xspeedGateway)
me.addGateway(secGateway)
me.addGateway(futuGateway)
if system == 'Linux':

View File

@ -70,6 +70,7 @@ class FutuGateway(VtGateway):
self.filePath = getJsonPath(self.fileName, __file__)
self.tickDict = {}
self.tradeSet = set() # 保存成交编号的集合,防止重复推送
self.qryEnabled = True
self.qryThread = Thread(target=self.qryData)
@ -477,13 +478,18 @@ class FutuGateway(VtGateway):
def processDeal(self, data):
"""处理成交推送"""
for ix, row in data.iterrows():
tradeID = row['dealid']
if tradeID in self.tradeSet:
continue
self.tradeSet.add(tradeID)
trade = VtTradeData()
trade.gatewayName = self.gatewayName
trade.symbol = row['code']
trade.vtSymbol = trade.symbol
trade.tradeID = row['dealid']
trade.tradeID = tradeID
trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID])
trade.orderID = row['orderid']

View File

@ -202,7 +202,7 @@ class SecMdApi(MdApi):
#----------------------------------------------------------------------
def connect(self, accountID, password, address):
"""初始化连接"""
self.accountID = accountID # 账号
self.accountID = accountID # 账号
self.password = password # 密码
self.address = address # 服务器地址
@ -214,28 +214,26 @@ class SecMdApi(MdApi):
# 初始化连接成功会调用onFrontConnected
self.init(self.address)
# 若已经连接但尚未登录,则进行登录
else:
if not self.loginStatus:
self.login()
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅合约"""
# 这里的设计是,如果尚未登录就调用了订阅方法
# 则先保存订阅请求,登录完成后会自动订阅
if self.loginStatus:
self.reqID += 1
symbol = str(exchangeMap[subscribeReq.exchange] + subscribeReq.symbol)
self.subscribeSOPMarketData(symbol, self.reqID)
if len(subscribeReq.symbol) > 6:
symbol = str(exchangeMap[subscribeReq.exchange] + subscribeReq.symbol)
self.subscribeSOPMarketData(symbol, self.reqID)
else:
symbol = str(exchangeMap[subscribeReq.exchange] + subscribeReq.symbol)
self.subscribeStockMarketData(symbol, self.reqID)
self.subscribedSymbols.add(subscribeReq)
#----------------------------------------------------------------------
def login(self):
"""登录"""
# 如果填入了用户名密码等,则登录
if self.accountID and self.password:
# 登录期权
self.reqID += 1
req = {}
req['accountID'] = self.accountID
@ -243,6 +241,11 @@ class SecMdApi(MdApi):
req['requestID'] = self.reqID
self.reqSOPUserLogin(req)
# 登录股票
self.reqID += 1
req['requestID'] = self.reqID
self.reqStockUserLogin(req)
#----------------------------------------------------------------------
def close(self):
"""关闭"""
@ -292,8 +295,20 @@ class SecMdApi(MdApi):
#----------------------------------------------------------------------
def onRspStockUserLogin(self, data, error):
""""""
pass
"""股票登录回报"""
# 如果登录成功,推送日志信息
if not error:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'股票行情服务器登录完成'
self.gateway.onLog(log)
# 否则,推送错误信息
else:
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['errorID']
err.errorMsg = error['errorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onRspStockUserLogout(self, data, error):
@ -302,7 +317,7 @@ class SecMdApi(MdApi):
#----------------------------------------------------------------------
def onRspSOPUserLogin(self, data, error):
"""登陆回报"""
"""期权登录回报"""
# 如果登录成功,推送日志信息
if not error:
self.loginStatus = True
@ -310,7 +325,7 @@ class SecMdApi(MdApi):
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'行情服务器登录完成'
log.logContent = u'期权行情服务器登录完成'
self.gateway.onLog(log)
# 重新订阅之前订阅的合约
@ -366,8 +381,52 @@ class SecMdApi(MdApi):
#----------------------------------------------------------------------
def onStockMarketData(self, data):
""""""
pass
"""股票行情推送"""
tick = VtTickData()
tick.gatewayName = self.gatewayName
tick.symbol = data['securityID']
tick.exchange = exchangeMapReverse.get(data['exchangeID'], u'未知')
tick.vtSymbol = '.'.join([tick.symbol, tick.exchange])
tick.lastPrice = data['latestPrice']
tick.volume = data['tradeQty']
tick.time = data['updateTime']
tick.date = data['tradingDay']
tick.openPrice = data['openPrice']
tick.highPrice = data['highestPrice']
tick.lowPrice = data['lowestPrice']
tick.preClosePrice = data['preClosePrice']
tick.upperLimit = data['upperLimitPrice']
tick.lowerLimit = data['lowerLimitPrice']
tick.bidPrice1 = data['bidPrice1']
tick.bidPrice2 = data['bidPrice2']
tick.bidPrice3 = data['bidPrice3']
tick.bidPrice4 = data['bidPrice4']
tick.bidPrice5 = data['bidPrice5']
tick.askPrice1 = data['askPrice1']
tick.askPrice2 = data['askPrice2']
tick.askPrice3 = data['askPrice3']
tick.askPrice4 = data['askPrice4']
tick.askPrice5 = data['askPrice5']
tick.bidVolume1 = data['bidQty1']
tick.bidVolume2 = data['bidQty2']
tick.bidVolume3 = data['bidQty3']
tick.bidVolume4 = data['bidQty4']
tick.bidVolume5 = data['bidQty5']
tick.askVolume1 = data['askQty1']
tick.askVolume2 = data['askQty2']
tick.askVolume3 = data['askQty3']
tick.askVolume4 = data['askQty4']
tick.askVolume5 = data['askQty5']
self.gateway.onTick(tick)
#----------------------------------------------------------------------
def onSOPMarketData(self, data):
@ -452,7 +511,8 @@ class SecTdApi(TdApi):
self.localID = EMPTY_INT # 订单编号
self.connectionStatus = False # 连接状态
self.loginStatus = False # 登录状态
self.optionLoginStatus = False # 期权登录状态
self.stockLoginStatus = False # 股票登录状态
self.accountID = EMPTY_STRING # 账号
self.password = EMPTY_STRING # 密码
@ -495,6 +555,10 @@ class SecTdApi(TdApi):
req['requestID'] = self.reqID
self.reqSOPUserLogin(req)
self.reqID += 1
req['requestID'] = self.reqID
self.reqStockUserLogin(req)
#----------------------------------------------------------------------
def qryAccount(self):
"""查询账户"""
@ -504,6 +568,9 @@ class SecTdApi(TdApi):
req['accountID'] = self.accountID
self.reqSOPQryCapitalAccountInfo(req)
self.reqID += 1
self.reqStockQryCapitalAccountInfo(req)
#----------------------------------------------------------------------
def qryPosition(self):
"""查询持仓"""
@ -513,16 +580,28 @@ class SecTdApi(TdApi):
req['accountID'] = self.accountID
self.reqSOPQryPosition(req)
self.reqID += 1
self.reqStockQryPosition(req)
#----------------------------------------------------------------------
def qryContracts(self):
"""查询合约"""
# 查询期货合约代码
def qryOptionContracts(self):
"""查询期权合约"""
self.reqID += 1
req = {}
req['requestID'] = self.reqID
req['accountID'] = self.accountID
self.reqSOPQryContactInfo(req)
#----------------------------------------------------------------------
def qryStockContracts(self):
"""查询股票合约"""
self.reqID += 1
req = {}
req['exchangeID'] = 'SH'
req['requestID'] = self.reqID
req['accountID'] = self.accountID
i = self.reqStockQryStockStaticInfo(req)
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
"""发单"""
@ -632,8 +711,25 @@ class SecTdApi(TdApi):
#----------------------------------------------------------------------
def onRspStockUserLogin(self, data, error):
""""""
pass
"""股票登录回报"""
# 如果登录成功,推送日志信息
if not error:
self.stockLoginStatus = True
self.gateway.tdConnected = True
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'股票交易服务器登录完成'
self.gateway.onLog(log)
self.qryStockContracts()
# 否则,推送错误信息
else:
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['errorID']
err.errorMsg = error['errorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onRspStockUserLogout(self, data, error):
@ -742,8 +838,34 @@ class SecTdApi(TdApi):
#----------------------------------------------------------------------
def onRspStockQryStockStaticInfo(self, data, error, flag):
""""""
pass
"""股票合约查询回报"""
if not data:
return
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = data['securityID']
contract.exchange = exchangeMapReverse.get(data['exchangeID'], EXCHANGE_UNKNOWN)
contract.vtSymbol = '.'.join([contract.symbol, contract.exchange])
contract.name = data['securityName'].decode('GBK')
# 合约数值
contract.size = data['tradeUnit']
contract.priceTick = 0.001 # 50ETF的最小价格变动
# 合约类型
contract.productClass = PRODUCT_EQUITY
# 推送
self.gateway.onContract(contract)
if flag:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'股票交易合约信息获取完成'
self.gateway.onLog(log)
#----------------------------------------------------------------------
def onRspStockQryTradeTime(self, data, error):
@ -767,18 +889,18 @@ class SecTdApi(TdApi):
#----------------------------------------------------------------------
def onRspSOPUserLogin(self, data, error):
"""登陆回报"""
"""期权登录回报"""
# 如果登录成功,推送日志信息
if not error:
self.loginStatus = True
self.optionLoginStatus = True
self.gateway.tdConnected = True
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'交易服务器登录完成'
log.logContent = u'期权交易服务器登录完成'
self.gateway.onLog(log)
self.qryContracts()
#self.qryOptionContracts()
# 否则,推送错误信息
else:
@ -956,7 +1078,7 @@ class SecTdApi(TdApi):
if flag:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'交易合约信息获取完成'
log.logContent = u'期权交易合约信息获取完成'
self.gateway.onLog(log)
#----------------------------------------------------------------------