vnpy/beta/spreadtrading/stEngine.py

587 lines
21 KiB
Python
Raw Normal View History

# encoding: UTF-8
import json
import traceback
import shelve
import parser
import re
from vnpy.event import Event
from vnpy.trader.vtFunction import getJsonPath, getTempPath
from vnpy.trader.vtEvent import (EVENT_TICK, EVENT_TRADE, EVENT_POSITION,
EVENT_TIMER, EVENT_ORDER)
from vnpy.trader.vtObject import (VtSubscribeReq, VtOrderReq,
VtCancelOrderReq, VtLogData)
from vnpy.trader.vtConstant import (DIRECTION_LONG, DIRECTION_SHORT,
OFFSET_OPEN, OFFSET_CLOSE,
PRICETYPE_LIMITPRICE)
from .stBase import (StLeg, StSpread, EVENT_SPREADTRADING_TICK,
EVENT_SPREADTRADING_POS, EVENT_SPREADTRADING_LOG,
EVENT_SPREADTRADING_ALGO, EVENT_SPREADTRADING_ALGOLOG)
from .stAlgo import SniperAlgo
########################################################################
class StDataEngine(object):
"""价差数据计算引擎"""
settingFileName = 'ST_setting.json'
settingFilePath = getJsonPath(settingFileName, __file__)
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
# 腿、价差相关字典
self.legDict = {} # vtSymbol:StLeg
self.spreadDict = {} # name:StSpread
self.vtSymbolSpreadDict = {} # vtSymbol:StSpread
self.registerEvent()
#----------------------------------------------------------------------
def loadSetting(self):
"""加载配置"""
try:
with open(self.settingFilePath) as f:
l = json.load(f)
for setting in l:
result, msg = self.createSpread(setting)
self.writeLog(msg)
self.writeLog(u'价差配置加载完成')
except:
content = u'价差配置加载出错,原因:' + traceback.format_exc()
self.writeLog(content)
#----------------------------------------------------------------------
def saveSetting(self):
"""保存配置"""
with open(self.settingFilePath) as f:
pass
#----------------------------------------------------------------------
def createSpread(self, setting):
"""创建价差"""
result = False
msg = ''
# 检查价差重名
if setting['name'] in self.spreadDict:
msg = u'%s价差存在重名' %setting['name']
return result, msg
# 检查腿是否已使用
l = []
l.append(setting['activeLeg']['vtSymbol'])
for d in setting['passiveLegs']:
l.append(d['vtSymbol'])
for vtSymbol in l:
if vtSymbol in self.vtSymbolSpreadDict:
existingSpread = self.vtSymbolSpreadDict[vtSymbol]
msg = u'%s合约已经存在于%s价差中' %(vtSymbol, existingSpread.name)
return result, msg
# 创建价差
spread = StSpread()
spread.name = setting['name']
spread.formula = setting['formula']
formula = spread.formula
if not re.match("[0-9A-Z\/\+\-\*\(\) ].*", formula) :
msg = u'%s价差存在公式问题请重新编写 %s' % (setting['name'] , spread.formula)
return result, msg
try :
spread.code = parser.expr(formula).compile()
except :
msg = u'%s价差存在公式问题请重新编写 %s' % (setting['name'] , spread.formula)
return result, msg
self.spreadDict[spread.name] = spread
# 创建主动腿
activeSetting = setting['activeLeg']
activeLeg = StLeg()
activeLeg.vtSymbol = str(activeSetting['vtSymbol'])
activeLeg.ratio = float(activeSetting['ratio'])
activeLeg.payup = int(activeSetting['payup'])
activeLeg.legname = str(activeSetting['legname'])
spread.addActiveLeg(activeLeg)
self.legDict[activeLeg.vtSymbol] = activeLeg
self.vtSymbolSpreadDict[activeLeg.vtSymbol] = spread
self.subscribeMarketData(activeLeg.vtSymbol)
# 创建被动腿
passiveSettingList = setting['passiveLegs']
passiveLegList = []
for d in passiveSettingList:
passiveLeg = StLeg()
passiveLeg.vtSymbol = str(d['vtSymbol'])
passiveLeg.ratio = float(d['ratio'])
passiveLeg.payup = int(d['payup'])
passiveLeg.legname = str(d['legname'])
spread.addPassiveLeg(passiveLeg)
self.legDict[passiveLeg.vtSymbol] = passiveLeg
self.vtSymbolSpreadDict[passiveLeg.vtSymbol] = spread
self.subscribeMarketData(passiveLeg.vtSymbol)
# 初始化价差
spread.initSpread()
self.putSpreadTickEvent(spread)
self.putSpreadPosEvent(spread)
# 返回结果
result = True
msg = u'%s价差创建成功' %spread.name
return result, msg
#----------------------------------------------------------------------
def processTickEvent(self, event):
"""处理行情推送"""
# 检查行情是否需要处理
tick = event.dict_['data']
if tick.vtSymbol not in self.legDict:
return
# 更新腿价格
leg = self.legDict[tick.vtSymbol]
leg.bidPrice = tick.bidPrice1
leg.askPrice = tick.askPrice1
leg.bidVolume = tick.bidVolume1
leg.askVolume = tick.askVolume1
# 更新价差价格
spread = self.vtSymbolSpreadDict[tick.vtSymbol]
spread.calculatePrice()
# 发出事件
self.putSpreadTickEvent(spread)
#----------------------------------------------------------------------
def putSpreadTickEvent(self, spread):
"""发出价差行情更新事件"""
event1 = Event(EVENT_SPREADTRADING_TICK+spread.name)
event1.dict_['data'] = spread
self.eventEngine.put(event1)
event2 = Event(EVENT_SPREADTRADING_TICK)
event2.dict_['data'] = spread
self.eventEngine.put(event2)
#----------------------------------------------------------------------
def processTradeEvent(self, event):
"""处理成交推送"""
# 检查成交是否需要处理
trade = event.dict_['data']
if trade.vtSymbol not in self.legDict:
return
# 更新腿持仓
leg = self.legDict[trade.vtSymbol]
direction = trade.direction
offset = trade.offset
if direction == DIRECTION_LONG:
if offset == OFFSET_OPEN:
leg.longPos += trade.volume
else:
leg.shortPos -= trade.volume
else:
if offset == OFFSET_OPEN:
leg.shortPos += trade.volume
else:
leg.longPos -= trade.volume
leg.netPos = leg.longPos - leg.shortPos
# 更新价差持仓
spread = self.vtSymbolSpreadDict[trade.vtSymbol]
spread.calculatePos()
# 推送价差持仓更新
event1 = Event(EVENT_SPREADTRADING_POS+spread.name)
event1.dict_['data'] = spread
self.eventEngine.put(event1)
event2 = Event(EVENT_SPREADTRADING_POS)
event2.dict_['data'] = spread
self.eventEngine.put(event2)
#----------------------------------------------------------------------
def processPosEvent(self, event):
"""处理持仓推送"""
# 检查持仓是否需要处理
pos = event.dict_['data']
if pos.vtSymbol not in self.legDict:
return
# 更新腿持仓
leg = self.legDict[pos.vtSymbol]
direction = pos.direction
if direction == DIRECTION_LONG:
leg.longPos = pos.position
else:
leg.shortPos = pos.position
leg.netPos = leg.longPos - leg.shortPos
# 更新价差持仓
spread = self.vtSymbolSpreadDict[pos.vtSymbol]
spread.calculatePos()
# 推送价差持仓更新
self.putSpreadPosEvent(spread)
#----------------------------------------------------------------------
def putSpreadPosEvent(self, spread):
"""发出价差持仓事件"""
event1 = Event(EVENT_SPREADTRADING_POS+spread.name)
event1.dict_['data'] = spread
self.eventEngine.put(event1)
event2 = Event(EVENT_SPREADTRADING_POS)
event2.dict_['data'] = spread
self.eventEngine.put(event2)
#----------------------------------------------------------------------
def registerEvent(self):
""""""
self.eventEngine.register(EVENT_TICK, self.processTickEvent)
self.eventEngine.register(EVENT_TRADE, self.processTradeEvent)
self.eventEngine.register(EVENT_POSITION, self.processPosEvent)
#----------------------------------------------------------------------
def subscribeMarketData(self, vtSymbol):
"""订阅行情"""
contract = self.mainEngine.getContract(vtSymbol)
if not contract:
self.writeLog(u'订阅行情失败,找不到该合约%s' %vtSymbol)
return
req = VtSubscribeReq()
req.symbol = contract.symbol
req.exchange = contract.exchange
self.mainEngine.subscribe(req, contract.gatewayName)
#----------------------------------------------------------------------
def writeLog(self, content):
"""发出日志"""
log = VtLogData()
log.logContent = content
event = Event(EVENT_SPREADTRADING_LOG)
event.dict_['data'] = log
self.eventEngine.put(event)
#----------------------------------------------------------------------
def getAllSpreads(self):
"""获取所有的价差"""
return self.spreadDict.values()
########################################################################
class StAlgoEngine(object):
"""价差算法交易引擎"""
algoFileName = 'SpreadTradingAlgo.vt'
algoFilePath = getTempPath(algoFileName)
#----------------------------------------------------------------------
def __init__(self, dataEngine, mainEngine, eventEngine):
"""Constructor"""
self.dataEngine = dataEngine
self.mainEngine = mainEngine
self.eventEngine = eventEngine
self.algoDict = {} # spreadName:algo
self.vtSymbolAlgoDict = {} # vtSymbol:algo
self.registerEvent()
#----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.eventEngine.register(EVENT_SPREADTRADING_TICK, self.processSpreadTickEvent)
self.eventEngine.register(EVENT_SPREADTRADING_POS, self.processSpreadPosEvent)
self.eventEngine.register(EVENT_TRADE, self.processTradeEvent)
self.eventEngine.register(EVENT_ORDER, self.processOrderEvent)
self.eventEngine.register(EVENT_TIMER, self.processTimerEvent)
#----------------------------------------------------------------------
def processSpreadTickEvent(self, event):
"""处理价差行情事件"""
spread = event.dict_['data']
algo = self.algoDict.get(spread.name, None)
if algo:
algo.updateSpreadTick(spread)
#----------------------------------------------------------------------
def processSpreadPosEvent(self, event):
"""处理价差持仓事件"""
spread = event.dict_['data']
algo = self.algoDict.get(spread.name, None)
if algo:
algo.updateSpreadPos(spread)
#----------------------------------------------------------------------
def processTradeEvent(self, event):
"""处理成交事件"""
trade = event.dict_['data']
algo = self.vtSymbolAlgoDict.get(trade.vtSymbol, None)
if algo:
algo.updateTrade(trade)
#----------------------------------------------------------------------
def processOrderEvent(self, event):
"""处理委托事件"""
order = event.dict_['data']
algo = self.vtSymbolAlgoDict.get(order.vtSymbol, None)
if algo:
algo.updateOrder(order)
#----------------------------------------------------------------------
def processTimerEvent(self, event):
""""""
for algo in self.algoDict.values():
algo.updateTimer()
#----------------------------------------------------------------------
def sendOrder(self, vtSymbol, direction, offset, price, volume, payup=0):
"""发单"""
contract = self.mainEngine.getContract(vtSymbol)
if not contract:
return ''
req = VtOrderReq()
req.symbol = contract.symbol
req.exchange = contract.exchange
req.vtSymbol = contract.vtSymbol
req.direction = direction
req.offset = offset
req.volume = int(volume)
req.priceType = PRICETYPE_LIMITPRICE
if direction == DIRECTION_LONG:
req.price = price + payup * contract.priceTick
else:
req.price = price - payup * contract.priceTick
# 委托转换
reqList = self.mainEngine.convertOrderReq(req)
vtOrderIDList = []
for req in reqList:
vtOrderID = self.mainEngine.sendOrder(req, contract.gatewayName)
vtOrderIDList.append(vtOrderID)
return vtOrderIDList
#----------------------------------------------------------------------
def cancelOrder(self, vtOrderID):
"""撤单"""
order = self.mainEngine.getOrder(vtOrderID)
if not order:
return
req = VtCancelOrderReq()
req.symbol = order.symbol
req.exchange = order.exchange
req.frontID = order.frontID
req.sessionID = order.sessionID
req.orderID = order.orderID
self.mainEngine.cancelOrder(req, order.gatewayName)
#----------------------------------------------------------------------
def buy(self, vtSymbol, price, volume, payup=0):
"""买入"""
l = self.sendOrder(vtSymbol, DIRECTION_LONG, OFFSET_OPEN, price, volume, payup)
return l
#----------------------------------------------------------------------
def sell(self, vtSymbol, price, volume, payup=0):
"""卖出"""
l = self.sendOrder(vtSymbol, DIRECTION_SHORT, OFFSET_CLOSE, price, volume, payup)
return l
#----------------------------------------------------------------------
def short(self, vtSymbol, price, volume, payup=0):
"""卖空"""
l = self.sendOrder(vtSymbol, DIRECTION_SHORT, OFFSET_OPEN, price, volume, payup)
return l
#----------------------------------------------------------------------
def cover(self, vtSymbol, price, volume, payup=0):
"""平空"""
l = self.sendOrder(vtSymbol, DIRECTION_LONG, OFFSET_CLOSE, price, volume, payup)
return l
#----------------------------------------------------------------------
def putAlgoEvent(self, algo):
"""发出算法状态更新事件"""
event = Event(EVENT_SPREADTRADING_ALGO+algo.name)
self.eventEngine.put(event)
#----------------------------------------------------------------------
def writeLog(self, content):
"""输出日志"""
log = VtLogData()
log.logContent = content
event = Event(EVENT_SPREADTRADING_ALGOLOG)
event.dict_['data'] = log
self.eventEngine.put(event)
#----------------------------------------------------------------------
def saveSetting(self):
"""保存算法配置"""
setting = {}
for algo in self.algoDict.values():
setting[algo.spreadName] = algo.getAlgoParams()
f = shelve.open(self.algoFilePath)
f['setting'] = setting
f.close()
#----------------------------------------------------------------------
def loadSetting(self):
"""加载算法配置"""
# 创建算法对象
l = self.dataEngine.getAllSpreads()
for spread in l:
algo = SniperAlgo(self, spread)
self.algoDict[spread.name] = algo
# 保存腿代码和算法对象的映射
for leg in spread.allLegs:
self.vtSymbolAlgoDict[leg.vtSymbol] = algo
# 加载配置
f = shelve.open(self.algoFilePath)
setting = f.get('setting', None)
f.close()
if not setting:
return
for algo in self.algoDict.values():
if algo.spreadName in setting:
d = setting[algo.spreadName]
algo.setAlgoParams(d)
#----------------------------------------------------------------------
def stopAll(self):
"""停止全部算法"""
for algo in self.algoDict.values():
algo.stop()
#----------------------------------------------------------------------
def startAlgo(self, spreadName):
"""启动算法"""
algo = self.algoDict[spreadName]
algoActive = algo.start()
return algoActive
#----------------------------------------------------------------------
def stopAlgo(self, spreadName):
"""停止算法"""
algo = self.algoDict[spreadName]
algoActive = algo.stop()
return algoActive
#----------------------------------------------------------------------
def getAllAlgoParams(self):
"""获取所有算法的参数"""
return [algo.getAlgoParams() for algo in self.algoDict.values()]
#----------------------------------------------------------------------
def setAlgoBuyPrice(self, spreadName, buyPrice):
"""设置算法买开价格"""
algo = self.algoDict[spreadName]
algo.setBuyPrice(buyPrice)
#----------------------------------------------------------------------
def setAlgoSellPrice(self, spreadName, sellPrice):
"""设置算法卖平价格"""
algo = self.algoDict[spreadName]
algo.setSellPrice(sellPrice)
#----------------------------------------------------------------------
def setAlgoShortPrice(self, spreadName, shortPrice):
"""设置算法卖开价格"""
algo = self.algoDict[spreadName]
algo.setShortPrice(shortPrice)
#----------------------------------------------------------------------
def setAlgoCoverPrice(self, spreadName, coverPrice):
"""设置算法买平价格"""
algo = self.algoDict[spreadName]
algo.setCoverPrice(coverPrice)
#----------------------------------------------------------------------
def setAlgoMode(self, spreadName, mode):
"""设置算法工作模式"""
algo = self.algoDict[spreadName]
algo.setMode(mode)
#----------------------------------------------------------------------
def setAlgoMaxOrderSize(self, spreadName, maxOrderSize):
"""设置算法单笔委托限制"""
algo = self.algoDict[spreadName]
algo.setMaxOrderSize(maxOrderSize)
#----------------------------------------------------------------------
def setAlgoMaxPosSize(self, spreadName, maxPosSize):
"""设置算法持仓限制"""
algo = self.algoDict[spreadName]
algo.setMaxPosSize(maxPosSize)
########################################################################
class StEngine(object):
"""价差引擎"""
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
self.dataEngine = StDataEngine(mainEngine, eventEngine)
self.algoEngine = StAlgoEngine(self.dataEngine, mainEngine, eventEngine)
#----------------------------------------------------------------------
def init(self):
"""初始化"""
self.dataEngine.loadSetting()
self.algoEngine.loadSetting()
#----------------------------------------------------------------------
def stop(self):
"""停止"""
self.dataEngine.saveSetting()
self.algoEngine.stopAll()
self.algoEngine.saveSetting()