[Mod]修改WebTrader的安全退出机制

This commit is contained in:
vn.py 2018-02-07 22:08:21 +08:00
parent 28f35fd7d8
commit 6e23c0460b
6 changed files with 1389 additions and 1 deletions

View File

@ -0,0 +1,23 @@
[
{
"name": "cuzn4",
"formula" : "(B+()2)/A",
"activeLeg":
{
"vtSymbol": "zn1806",
"ratio": -2,
"payup": 2,
"legname" : "A"
},
"passiveLegs": [
{
"vtSymbol": "cu1806",
"ratio": 1,
"payup": 2,
"legname" : "B"
}
]
}
]

View File

@ -0,0 +1,517 @@
# encoding: UTF-8
from math import floor
from vnpy.trader.vtConstant import (EMPTY_INT, EMPTY_FLOAT,
EMPTY_STRING, EMPTY_UNICODE,
DIRECTION_LONG, DIRECTION_SHORT,
STATUS_ALLTRADED, STATUS_CANCELLED, STATUS_REJECTED)
########################################################################
class StAlgoTemplate(object):
"""价差算法交易模板"""
MODE_LONGSHORT = u'双向'
MODE_LONGONLY = u'做多'
MODE_SHORTONLY = u'做空'
SPREAD_LONG = 1
SPREAD_SHORT = 2
#----------------------------------------------------------------------
def __init__(self, algoEngine, spread):
"""Constructor"""
self.algoEngine = algoEngine # 算法引擎
self.spreadName = spread.name # 价差名称
self.spread = spread # 价差对象
self.algoName = EMPTY_STRING # 算法名称
self.active = False # 工作状态
self.mode = self.MODE_LONGSHORT # 工作模式
self.buyPrice = EMPTY_FLOAT # 开平仓价格
self.sellPrice = EMPTY_FLOAT
self.shortPrice = EMPTY_FLOAT
self.coverPrice = EMPTY_FLOAT
self.maxPosSize = EMPTY_INT # 最大单边持仓量
self.maxOrderSize = EMPTY_INT # 最大单笔委托量
#----------------------------------------------------------------------
def updateSpreadTick(self, spread):
""""""
raise NotImplementedError
#----------------------------------------------------------------------
def updateSpreadPos(self, spread):
""""""
raise NotImplementedError
#----------------------------------------------------------------------
def updateTrade(self, trade):
""""""
raise NotImplementedError
#----------------------------------------------------------------------
def updateOrder(self, order):
""""""
raise NotImplementedError
#----------------------------------------------------------------------
def updateTimer(self):
""""""
raise NotImplementedError
#----------------------------------------------------------------------
def start(self):
""""""
raise NotImplementedError
#----------------------------------------------------------------------
def stop(self):
""""""
raise NotImplementedError
#----------------------------------------------------------------------
def setBuyPrice(self, buyPrice):
"""设置买开的价格"""
self.buyPrice = buyPrice
#----------------------------------------------------------------------
def setSellPrice(self, sellPrice):
"""设置卖平的价格"""
self.sellPrice = sellPrice
#----------------------------------------------------------------------
def setShortPrice(self, shortPrice):
"""设置卖开的价格"""
self.shortPrice = shortPrice
#----------------------------------------------------------------------
def setCoverPrice(self, coverPrice):
"""设置买平的价格"""
self.coverPrice = coverPrice
#----------------------------------------------------------------------
def setMode(self, mode):
"""设置算法交易方向"""
self.mode = mode
#----------------------------------------------------------------------
def setMaxOrderSize(self, maxOrderSize):
"""设置最大单笔委托数量"""
self.maxOrderSize = maxOrderSize
#----------------------------------------------------------------------
def setMaxPosSize(self, maxPosSize):
"""设置最大持仓数量"""
self.maxPosSize = maxPosSize
#----------------------------------------------------------------------
def putEvent(self):
"""发出算法更新事件"""
self.algoEngine.putAlgoEvent(self)
#----------------------------------------------------------------------
def writeLog(self, content):
"""输出算法日志"""
prefix = ' '.join([self.spreadName, self.algoName])
content = ':'.join([prefix, content])
self.algoEngine.writeLog(content)
#----------------------------------------------------------------------
def getAlgoParams(self):
"""获取算法参数"""
d = {
"spreadName": self.spreadName,
"algoName": self.algoName,
"buyPrice": self.buyPrice,
"sellPrice": self.sellPrice,
"shortPrice": self.shortPrice,
"coverPrice": self.coverPrice,
"maxOrderSize": self.maxOrderSize,
"maxPosSize": self.maxPosSize,
"mode": self.mode
}
return d
#----------------------------------------------------------------------
def setAlgoParams(self, d):
"""设置算法参数"""
self.buyPrice = d.get('buyPrice', EMPTY_FLOAT)
self.sellPrice = d.get('sellPrice', EMPTY_FLOAT)
self.shortPrice = d.get('shortPrice', EMPTY_FLOAT)
self.coverPrice = d.get('coverPrice', EMPTY_FLOAT)
self.maxOrderSize = d.get('maxOrderSize', EMPTY_INT)
self.maxPosSize = d.get('maxPosSize', EMPTY_INT)
self.mode = d.get('mode', self.MODE_LONGSHORT)
########################################################################
class SniperAlgo(StAlgoTemplate):
"""狙击算法(市价委托)"""
FINISHED_STATUS = [STATUS_ALLTRADED, STATUS_CANCELLED, STATUS_REJECTED]
#----------------------------------------------------------------------
def __init__(self, algoEngine, spread):
"""Constructor"""
super(SniperAlgo, self).__init__(algoEngine, spread)
self.algoName = u'Sniper'
self.quoteInterval = 2 # 主动腿报价撤单再发前等待的时间
self.quoteCount = 0 # 报价计数
self.hedgeInterval = 2 # 对冲腿对冲撤单再发前的等待时间
self.hedgeCount = 0 # 对冲计数
self.activeVtSymbol = spread.activeLeg.vtSymbol # 主动腿代码
self.passiveVtSymbols = [leg.vtSymbol for leg in spread.passiveLegs] # 被动腿代码列表
# 缓存每条腿对象的字典
self.legDict = {}
self.legDict[spread.activeLeg.vtSymbol] = spread.activeLeg
for leg in spread.passiveLegs:
self.legDict[leg.vtSymbol] = leg
self.hedgingTaskDict = {} # 被动腿需要对冲的数量字典 vtSymbol:volume
self.legOrderDict = {} # vtSymbol: list of vtOrderID
self.orderTradedDict = {} # vtOrderID: tradedVolume
#----------------------------------------------------------------------
def updateSpreadTick(self, spread):
"""价差行情更新"""
self.spread = spread
# 若算法没有启动则直接返回
if not self.active:
return
# 若当前已有主动腿委托则直接返回
if (self.activeVtSymbol in self.legOrderDict and
self.legOrderDict[self.activeVtSymbol]):
return
# 允许做多
if self.mode == self.MODE_LONGSHORT or self.mode == self.MODE_LONGONLY:
# 买入
if (spread.netPos >= 0 and
spread.netPos < self.maxPosSize and
spread.askPrice <= self.buyPrice):
self.quoteActiveLeg(self.SPREAD_LONG)
self.writeLog(u'买入开仓')
# 卖出
elif (spread.netPos > 0 and
spread.bidPrice >= self.sellPrice):
self.quoteActiveLeg(self.SPREAD_SHORT)
self.writeLog(u'卖出平仓')
# 允许做空
if self.mode == self.MODE_LONGSHORT or self.mode == self.MODE_SHORTONLY:
# 做空
if (spread.netPos <= 0 and
spread.netPos > -self.maxPosSize and
spread.bidPrice >= self.shortPrice):
self.quoteActiveLeg(self.SPREAD_SHORT)
self.writeLog(u'卖出开仓')
# 平空
elif (spread.netPos < 0 and
spread.askPrice <= self.coverPrice):
self.quoteActiveLeg(self.SPREAD_LONG)
self.writeLog(u'买入平仓')
#----------------------------------------------------------------------
def updateSpreadPos(self, spread):
"""价差持仓更新"""
self.spread = spread
#----------------------------------------------------------------------
def updateTrade(self, trade):
"""成交更新"""
pass
#----------------------------------------------------------------------
def updateOrder(self, order):
"""委托更新"""
if not self.active:
return
vtOrderID = order.vtOrderID
vtSymbol = order.vtSymbol
newTradedVolume = order.tradedVolume
lastTradedVolume = self.orderTradedDict.get(vtOrderID, 0)
# 检查是否有新的成交
if newTradedVolume > lastTradedVolume:
self.orderTradedDict[vtOrderID] = newTradedVolume # 缓存委托已经成交数量
volume = newTradedVolume - lastTradedVolume # 计算本次成交数量
if vtSymbol == self.activeVtSymbol:
self.newActiveLegTrade(vtSymbol, order.direction, volume)
else:
self.newPassiveLegTrade(vtSymbol, order.direction, volume)
# 处理完成委托
if order.status in self.FINISHED_STATUS:
vtOrderID = order.vtOrderID
vtSymbol = order.vtSymbol
# 从委托列表中移除该委托
orderList = self.legOrderDict.get(vtSymbol, None)
if orderList and vtOrderID in orderList:
orderList.remove(vtOrderID)
# 检查若是被动腿,且已经没有未完成委托,则执行对冲
if not orderList and vtSymbol in self.passiveVtSymbols:
self.hedgePassiveLeg(vtSymbol)
#----------------------------------------------------------------------
def updateTimer(self):
"""计时更新"""
if not self.active:
return
self.quoteCount += 1
self.hedgeCount += 1
# 计时到达报价间隔后,则对尚未成交的主动腿委托全部撤单
# 收到撤单回报后清空委托列表,等待下次价差更新再发单
if self.quoteCount > self.quoteInterval:
self.cancelLegOrder(self.activeVtSymbol)
self.quoteCount = 0
# 计时到达对冲间隔后,则对尚未成交的全部被动腿委托全部撤单
# 收到撤单回报后,会自动发送新的对冲委托
if self.hedgeCount > self.hedgeInterval:
self.cancelAllPassiveLegOrders()
self.hedgeCount = 0
#----------------------------------------------------------------------
def start(self):
"""启动"""
# 如果已经运行则直接返回状态
if self.active:
return self.active
# 做多检查
if self.mode != self.MODE_SHORTONLY:
if self.buyPrice >= self.sellPrice:
self.writeLog(u'启动失败允许多头交易时BuyPrice必须小于SellPrice')
return self.active
# 做空检查
if self.mode != self.MODE_LONGONLY:
if self.shortPrice <= self.coverPrice:
self.writeLog(u'启动失败允许空头交易时ShortPrice必须大于CoverPrice')
return self.active
# 多空检查
if self.mode == self.MODE_LONGSHORT:
if self.buyPrice >= self.coverPrice:
self.writeLog(u'启动失败允许双向交易时BuyPrice必须小于CoverPrice')
return self.active
if self.shortPrice <= self.sellPrice:
self.writeLog(u'启动失败允许双向交易时ShortPrice必须大于SellPrice')
return self.active
# 启动算法
self.quoteCount = 0
self.hedgeCount = 0
self.active = True
self.writeLog(u'算法启动')
return self.active
#----------------------------------------------------------------------
def stop(self):
"""停止"""
if self.active:
self.hedgingTaskDict.clear()
self.cancelAllOrders()
self.active = False
self.writeLog(u'算法停止')
return self.active
#----------------------------------------------------------------------
def sendLegOrder(self, leg, legVolume):
"""发送每条腿的委托"""
vtSymbol = leg.vtSymbol
volume = abs(legVolume)
payup = leg.payup
# 发送委托
if legVolume > 0:
price = leg.askPrice
if leg.shortPos > 0:
orderList = self.algoEngine.cover(vtSymbol, price, volume, payup)
else:
orderList = self.algoEngine.buy(vtSymbol, price, volume, payup)
elif legVolume < 0:
price = leg.bidPrice
if leg.longPos > 0:
orderList = self.algoEngine.sell(vtSymbol, price, volume, payup)
else:
orderList = self.algoEngine.short(vtSymbol, price, volume, payup)
# 保存到字典中
if vtSymbol not in self.legOrderDict:
self.legOrderDict[vtSymbol] = orderList
else:
self.legOrderDict[vtSymbol].extend(orderList)
#----------------------------------------------------------------------
def quoteActiveLeg(self, direction):
"""发出主动腿"""
spread = self.spread
# 首先计算不带正负号的价差委托量
if direction == self.SPREAD_LONG:
spreadVolume = min(spread.askVolume,
self.maxPosSize - spread.netPos,
self.maxOrderSize)
# 有价差空头持仓的情况下,则本次委托最多平完空头
if spread.shortPos > 0:
spreadVolume = min(spreadVolume, spread.shortPos)
else:
spreadVolume = min(spread.bidVolume,
self.maxPosSize + spread.netPos,
self.maxOrderSize)
# 有价差多头持仓的情况下,则本次委托最多平完多头
if spread.longPos > 0:
spreadVolume = min(spreadVolume, spread.longPos)
if spreadVolume <= 0:
return
# 加上价差方向
if direction == self.SPREAD_SHORT:
spreadVolume = -spreadVolume
# 计算主动腿委托量
leg = self.legDict[self.activeVtSymbol]
legVolume = spreadVolume * leg.ratio
self.sendLegOrder(leg, legVolume)
self.writeLog(u'发出新的主动腿%s狙击单' %self.activeVtSymbol)
self.quoteCount = 0 # 重置主动腿报价撤单等待计数
#----------------------------------------------------------------------
def hedgePassiveLeg(self, vtSymbol):
"""被动腿对冲"""
if vtSymbol not in self.hedgingTaskDict:
return
orderList = self.legOrderDict.get(vtSymbol, None)
if orderList:
return
legVolume = self.hedgingTaskDict[vtSymbol]
leg = self.legDict[vtSymbol]
self.sendLegOrder(leg, legVolume)
self.writeLog(u'发出新的被动腿%s对冲单' %vtSymbol)
#----------------------------------------------------------------------
def hedgeAllPassiveLegs(self):
"""执行所有被动腿对冲"""
for vtSymbol in self.hedgingTaskDict.keys():
self.hedgePassiveLeg(vtSymbol)
self.hedgeCount = 0 # 重置被动腿对冲撤单等待计数
#----------------------------------------------------------------------
def newActiveLegTrade(self, vtSymbol, direction, volume):
"""新的主动腿成交"""
# 输出日志
self.writeLog(u'主动腿%s成交,方向%s,数量%s' %(vtSymbol, direction, volume))
# 将主动腿成交带上方向
if direction == DIRECTION_SHORT:
volume = -volume
# 计算主动腿成交后,对应的价差仓位
spread = self.spread
activeRatio = spread.activeLeg.ratio
spreadVolume = round(volume / activeRatio) # 四舍五入求主动腿成交量对应的价差份数
# 计算价差新仓位,对应的被动腿需要对冲部分
for leg in self.spread.passiveLegs:
newHedgingTask = leg.ratio * spreadVolume
if leg.vtSymbol not in self.hedgingTaskDict:
self.hedgingTaskDict[leg.vtSymbol] = newHedgingTask
else:
self.hedgingTaskDict[leg.vtSymbol] += newHedgingTask
# 发出被动腿对冲委托
self.hedgeAllPassiveLegs()
#----------------------------------------------------------------------
def newPassiveLegTrade(self, vtSymbol, direction, volume):
"""新的被动腿成交"""
if vtSymbol in self.hedgingTaskDict:
# 计算完成的对冲数量
if direction == DIRECTION_LONG:
hedgedVolume = volume
else:
hedgedVolume = -volume
# 计算剩余尚未完成的数量
self.hedgingTaskDict[vtSymbol] -= hedgedVolume
# 如果已全部完成,则从字典中移除
if not self.hedgingTaskDict[vtSymbol]:
del self.hedgingTaskDict[vtSymbol]
# 输出日志
self.writeLog(u'被动腿%s成交,方向%s,数量%s' %(vtSymbol, direction, volume))
#----------------------------------------------------------------------
def cancelLegOrder(self, vtSymbol):
"""撤销某条腿的委托"""
if vtSymbol not in self.legOrderDict:
return
orderList = self.legOrderDict[vtSymbol]
if not orderList:
return
for vtOrderID in orderList:
self.algoEngine.cancelOrder(vtOrderID)
self.writeLog(u'撤单%s的所有委托' %vtSymbol)
#----------------------------------------------------------------------
def cancelAllOrders(self):
"""撤销全部委托"""
for orderList in self.legOrderDict.values():
for vtOrderID in orderList:
self.algoEngine.cancelOrder(vtOrderID)
self.writeLog(u'全部撤单')
#----------------------------------------------------------------------
def cancelAllPassiveLegOrders(self):
"""撤销全部被动腿委托"""
cancelPassive = False
for vtSymbol in self.passiveVtSymbols:
if vtSymbol in self.legOrderDict and self.legOrderDict[vtSymbol]:
self.cancelLegOrder(vtSymbol)
cancelPassive = True
# 只有确实发出撤单委托时,才输出信息
if cancelPassive:
self.writeLog(u'被动腿全撤')

View File

@ -0,0 +1,237 @@
# encoding: UTF-8
from __future__ import division
import re
from math import floor
from datetime import datetime
from vnpy.trader.vtConstant import (EMPTY_INT, EMPTY_FLOAT,
EMPTY_STRING, EMPTY_UNICODE)
EVENT_SPREADTRADING_TICK = 'eSpreadTradingTick.'
EVENT_SPREADTRADING_POS = 'eSpreadTradingPos.'
EVENT_SPREADTRADING_LOG = 'eSpreadTradingLog'
EVENT_SPREADTRADING_ALGO = 'eSpreadTradingAlgo.'
EVENT_SPREADTRADING_ALGOLOG = 'eSpreadTradingAlgoLog'
########################################################################
class StLeg(object):
""""""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.vtSymbol = EMPTY_STRING # 代码
self.ratio = EMPTY_INT # 实际交易时的比例
self.multiplier = EMPTY_FLOAT # 计算价差时的乘数
self.payup = EMPTY_INT # 对冲时的超价tick
self.bidPrice = EMPTY_FLOAT
self.askPrice = EMPTY_FLOAT
self.bidVolume = EMPTY_INT
self.askVolume = EMPTY_INT
self.longPos = EMPTY_INT
self.shortPos = EMPTY_INT
self.netPos = EMPTY_INT
self.opcode = EMPTY_STRING
self.legname = EMPTY_STRING
########################################################################
class StSpread(object):
""""""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.name = EMPTY_UNICODE # 名称
self.symbol = EMPTY_STRING # 代码(基于组成腿计算)
self.formula = EMPTY_STRING
self.code =None
self.activeLeg = None # 主动腿
self.passiveLegs = [] # 被动腿(支持多条)
self.allLegs = [] # 所有腿
self.bidPrice = EMPTY_FLOAT
self.askPrice = EMPTY_FLOAT
self.bidVolume = EMPTY_INT
self.askVolume = EMPTY_INT
self.time = EMPTY_STRING
self.longPos = EMPTY_INT
self.shortPos = EMPTY_INT
self.netPos = EMPTY_INT
#----------------------------------------------------------------------
def initSpread(self):
"""初始化价差"""
# 价差最少要有一条主动腿
if not self.activeLeg:
return
# 生成所有腿列表
self.allLegs.append(self.activeLeg)
self.allLegs.extend(self.passiveLegs)
# 生成价差代码
legSymbolList = []
self.symbol = self.formula
for leg in self.allLegs:
self.symbol = re.sub(leg.legname, leg.vtSymbol , self.symbol)
#self.symbol = ''.join(legSymbolList)
#----------------------------------------------------------------------
def calculatePrice(self):
"""计算价格"""
# 清空价格和委托量数据
self.bidPrice = EMPTY_FLOAT
self.askPrice = EMPTY_FLOAT
self.askVolume = EMPTY_INT
self.bidVolume = EMPTY_INT
if not self.code :
raise ValueError("Formula Error Can't Calculate Price")
legbidglobal = {}
legaskglobal = {}
# 遍历价差腿列表
for n, leg in enumerate(self.allLegs):
# 计算价格
if leg.askPrice == leg.bidPrice and leg.bidPrice == EMPTY_FLOAT:
# 清空价格和委托量数据
self.bidPrice = EMPTY_FLOAT
self.askPrice = EMPTY_FLOAT
self.askVolume = EMPTY_INT
self.bidVolume = EMPTY_INT
else:
if leg.ratio > 0:
legbidglobal[leg.legname] = leg.bidPrice
legaskglobal[leg.legname] = leg.askPrice
else :
legbidglobal[leg.legname] = leg.askPrice
legaskglobal[leg.legname] = leg.bidPrice
#tick数据有误暂时不计算
#if leg.opcode == "add":
#if leg.ratio > 0:
#self.bidPrice += leg.bidPrice * leg.multiplier
#self.askPrice += leg.askPrice * leg.multiplier
#else:
#self.bidPrice += leg.askPrice * leg.multiplier
#self.askPrice += leg.bidPrice * leg.multiplier
#elif leg.opcode == "div":
#if n == 0 :
#if leg.ratio > 0:
#self.bidPrice = 1 / (leg.bidPrice * leg.multiplier)
#self.askPrice = 1 / (leg.askPrice * leg.multiplier)
#else:
#self.bidPrice = 1 / (leg.askPrice * leg.multiplier)
#self.askPrice = 1 / (leg.bidPrice * leg.multiplier)
#else:
#if leg.ratio > 0:
#self.bidPrice = self.bidPrice / (leg.bidPrice * leg.multiplier)
#self.askPrice = self.askPrice / (leg.askPrice * leg.multiplier)
#else:
#self.bidPrice = self.bidPrice / (leg.askPrice * leg.multiplier)
#self.askPrice = self.askPrice / (leg.bidPrice * leg.multiplier)
#elif leg.opcode == "mul":
#if n == 0 :
#if leg.ratio > 0:
#self.bidPrice = (leg.bidPrice * leg.multiplier)
#self.askPrice = (leg.askPrice * leg.multiplier)
#else:
#self.bidPrice = (leg.bidPrice * leg.multiplier)
#self.askPrice = (leg.askPrice * leg.multiplier)
#else:
#if leg.ratio > 0:
#self.bidPrice = self.bidPrice * (leg.bidPrice * leg.multiplier)
#self.askPrice = self.askPrice * (leg.askPrice * leg.multiplier)
#else:
#self.bidPrice = self.bidPrice * (leg.askPrice * leg.multiplier)
#self.askPrice = self.askPrice * (leg.bidPrice * leg.multiplier)
# 计算报单量
if leg.ratio > 0:
legAdjustedBidVolume = floor(leg.bidVolume / leg.ratio)
legAdjustedAskVolume = floor(leg.askVolume / leg.ratio)
else:
legAdjustedBidVolume = floor(leg.askVolume / abs(leg.ratio))
legAdjustedAskVolume = floor(leg.bidVolume / abs(leg.ratio))
if n == 0:
self.bidVolume = legAdjustedBidVolume # 对于第一条腿,直接初始化
self.askVolume = legAdjustedAskVolume
else:
self.bidVolume = min(self.bidVolume, legAdjustedBidVolume) # 对于后续的腿,价差可交易报单量取较小值
self.askVolume = min(self.askVolume, legAdjustedAskVolume)
bidok = True
askok = True
for k in self.code.co_names :
bidok = bidok and legbidglobal.has_key(k)
askok = askok and legaskglobal.has_key(k)
if bidok and askok :
self.bidPrice = eval(self.code,legbidglobal)
self.askPrice = eval(self.code,legaskglobal)
# 更新时间
self.time = datetime.now().strftime('%H:%M:%S.%f')[:-3]
#----------------------------------------------------------------------
def calculatePos(self):
"""计算持仓"""
# 清空持仓数据
self.longPos = EMPTY_INT
self.shortPos = EMPTY_INT
self.netPos = EMPTY_INT
# 遍历价差腿列表
for n, leg in enumerate(self.allLegs):
if leg.ratio > 0:
legAdjustedLongPos = floor(leg.longPos / leg.ratio)
legAdjustedShortPos = floor(leg.shortPos / leg.ratio)
else:
legAdjustedLongPos = floor(leg.shortPos / abs(leg.ratio))
legAdjustedShortPos = floor(leg.longPos / abs(leg.ratio))
if n == 0:
self.longPos = legAdjustedLongPos
self.shortPos = legAdjustedShortPos
else:
self.longPos = min(self.longPos, legAdjustedLongPos)
self.shortPos = min(self.shortPos, legAdjustedShortPos)
# 计算净仓位
self.longPos = int(self.longPos)
self.shortPos = int(self.shortPos)
self.netPos = self.longPos - self.shortPos
#----------------------------------------------------------------------
def addActiveLeg(self, leg):
"""添加主动腿"""
self.activeLeg = leg
#----------------------------------------------------------------------
def addPassiveLeg(self, leg):
"""添加被动腿"""
self.passiveLegs.append(leg)

View File

@ -0,0 +1,587 @@
# encoding: UTF-8
import json
import traceback
import shelve
import parser
import re
from vnpy.event import Event
from vnpy.trader.vtFunction import getJsonPath, getTempPath
from vnpy.trader.vtEvent import (EVENT_TICK, EVENT_TRADE, EVENT_POSITION,
EVENT_TIMER, EVENT_ORDER)
from vnpy.trader.vtObject import (VtSubscribeReq, VtOrderReq,
VtCancelOrderReq, VtLogData)
from vnpy.trader.vtConstant import (DIRECTION_LONG, DIRECTION_SHORT,
OFFSET_OPEN, OFFSET_CLOSE,
PRICETYPE_LIMITPRICE)
from .stBase import (StLeg, StSpread, EVENT_SPREADTRADING_TICK,
EVENT_SPREADTRADING_POS, EVENT_SPREADTRADING_LOG,
EVENT_SPREADTRADING_ALGO, EVENT_SPREADTRADING_ALGOLOG)
from .stAlgo import SniperAlgo
########################################################################
class StDataEngine(object):
"""价差数据计算引擎"""
settingFileName = 'ST_setting.json'
settingFilePath = getJsonPath(settingFileName, __file__)
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
# 腿、价差相关字典
self.legDict = {} # vtSymbol:StLeg
self.spreadDict = {} # name:StSpread
self.vtSymbolSpreadDict = {} # vtSymbol:StSpread
self.registerEvent()
#----------------------------------------------------------------------
def loadSetting(self):
"""加载配置"""
try:
with open(self.settingFilePath) as f:
l = json.load(f)
for setting in l:
result, msg = self.createSpread(setting)
self.writeLog(msg)
self.writeLog(u'价差配置加载完成')
except:
content = u'价差配置加载出错,原因:' + traceback.format_exc()
self.writeLog(content)
#----------------------------------------------------------------------
def saveSetting(self):
"""保存配置"""
with open(self.settingFilePath) as f:
pass
#----------------------------------------------------------------------
def createSpread(self, setting):
"""创建价差"""
result = False
msg = ''
# 检查价差重名
if setting['name'] in self.spreadDict:
msg = u'%s价差存在重名' %setting['name']
return result, msg
# 检查腿是否已使用
l = []
l.append(setting['activeLeg']['vtSymbol'])
for d in setting['passiveLegs']:
l.append(d['vtSymbol'])
for vtSymbol in l:
if vtSymbol in self.vtSymbolSpreadDict:
existingSpread = self.vtSymbolSpreadDict[vtSymbol]
msg = u'%s合约已经存在于%s价差中' %(vtSymbol, existingSpread.name)
return result, msg
# 创建价差
spread = StSpread()
spread.name = setting['name']
spread.formula = setting['formula']
formula = spread.formula
if not re.match("[0-9A-Z\/\+\-\*\(\) ].*", formula) :
msg = u'%s价差存在公式问题请重新编写 %s' % (setting['name'] , spread.formula)
return result, msg
try :
spread.code = parser.expr(formula).compile()
except :
msg = u'%s价差存在公式问题请重新编写 %s' % (setting['name'] , spread.formula)
return result, msg
self.spreadDict[spread.name] = spread
# 创建主动腿
activeSetting = setting['activeLeg']
activeLeg = StLeg()
activeLeg.vtSymbol = str(activeSetting['vtSymbol'])
activeLeg.ratio = float(activeSetting['ratio'])
activeLeg.payup = int(activeSetting['payup'])
activeLeg.legname = str(activeSetting['legname'])
spread.addActiveLeg(activeLeg)
self.legDict[activeLeg.vtSymbol] = activeLeg
self.vtSymbolSpreadDict[activeLeg.vtSymbol] = spread
self.subscribeMarketData(activeLeg.vtSymbol)
# 创建被动腿
passiveSettingList = setting['passiveLegs']
passiveLegList = []
for d in passiveSettingList:
passiveLeg = StLeg()
passiveLeg.vtSymbol = str(d['vtSymbol'])
passiveLeg.ratio = float(d['ratio'])
passiveLeg.payup = int(d['payup'])
passiveLeg.legname = str(d['legname'])
spread.addPassiveLeg(passiveLeg)
self.legDict[passiveLeg.vtSymbol] = passiveLeg
self.vtSymbolSpreadDict[passiveLeg.vtSymbol] = spread
self.subscribeMarketData(passiveLeg.vtSymbol)
# 初始化价差
spread.initSpread()
self.putSpreadTickEvent(spread)
self.putSpreadPosEvent(spread)
# 返回结果
result = True
msg = u'%s价差创建成功' %spread.name
return result, msg
#----------------------------------------------------------------------
def processTickEvent(self, event):
"""处理行情推送"""
# 检查行情是否需要处理
tick = event.dict_['data']
if tick.vtSymbol not in self.legDict:
return
# 更新腿价格
leg = self.legDict[tick.vtSymbol]
leg.bidPrice = tick.bidPrice1
leg.askPrice = tick.askPrice1
leg.bidVolume = tick.bidVolume1
leg.askVolume = tick.askVolume1
# 更新价差价格
spread = self.vtSymbolSpreadDict[tick.vtSymbol]
spread.calculatePrice()
# 发出事件
self.putSpreadTickEvent(spread)
#----------------------------------------------------------------------
def putSpreadTickEvent(self, spread):
"""发出价差行情更新事件"""
event1 = Event(EVENT_SPREADTRADING_TICK+spread.name)
event1.dict_['data'] = spread
self.eventEngine.put(event1)
event2 = Event(EVENT_SPREADTRADING_TICK)
event2.dict_['data'] = spread
self.eventEngine.put(event2)
#----------------------------------------------------------------------
def processTradeEvent(self, event):
"""处理成交推送"""
# 检查成交是否需要处理
trade = event.dict_['data']
if trade.vtSymbol not in self.legDict:
return
# 更新腿持仓
leg = self.legDict[trade.vtSymbol]
direction = trade.direction
offset = trade.offset
if direction == DIRECTION_LONG:
if offset == OFFSET_OPEN:
leg.longPos += trade.volume
else:
leg.shortPos -= trade.volume
else:
if offset == OFFSET_OPEN:
leg.shortPos += trade.volume
else:
leg.longPos -= trade.volume
leg.netPos = leg.longPos - leg.shortPos
# 更新价差持仓
spread = self.vtSymbolSpreadDict[trade.vtSymbol]
spread.calculatePos()
# 推送价差持仓更新
event1 = Event(EVENT_SPREADTRADING_POS+spread.name)
event1.dict_['data'] = spread
self.eventEngine.put(event1)
event2 = Event(EVENT_SPREADTRADING_POS)
event2.dict_['data'] = spread
self.eventEngine.put(event2)
#----------------------------------------------------------------------
def processPosEvent(self, event):
"""处理持仓推送"""
# 检查持仓是否需要处理
pos = event.dict_['data']
if pos.vtSymbol not in self.legDict:
return
# 更新腿持仓
leg = self.legDict[pos.vtSymbol]
direction = pos.direction
if direction == DIRECTION_LONG:
leg.longPos = pos.position
else:
leg.shortPos = pos.position
leg.netPos = leg.longPos - leg.shortPos
# 更新价差持仓
spread = self.vtSymbolSpreadDict[pos.vtSymbol]
spread.calculatePos()
# 推送价差持仓更新
self.putSpreadPosEvent(spread)
#----------------------------------------------------------------------
def putSpreadPosEvent(self, spread):
"""发出价差持仓事件"""
event1 = Event(EVENT_SPREADTRADING_POS+spread.name)
event1.dict_['data'] = spread
self.eventEngine.put(event1)
event2 = Event(EVENT_SPREADTRADING_POS)
event2.dict_['data'] = spread
self.eventEngine.put(event2)
#----------------------------------------------------------------------
def registerEvent(self):
""""""
self.eventEngine.register(EVENT_TICK, self.processTickEvent)
self.eventEngine.register(EVENT_TRADE, self.processTradeEvent)
self.eventEngine.register(EVENT_POSITION, self.processPosEvent)
#----------------------------------------------------------------------
def subscribeMarketData(self, vtSymbol):
"""订阅行情"""
contract = self.mainEngine.getContract(vtSymbol)
if not contract:
self.writeLog(u'订阅行情失败,找不到该合约%s' %vtSymbol)
return
req = VtSubscribeReq()
req.symbol = contract.symbol
req.exchange = contract.exchange
self.mainEngine.subscribe(req, contract.gatewayName)
#----------------------------------------------------------------------
def writeLog(self, content):
"""发出日志"""
log = VtLogData()
log.logContent = content
event = Event(EVENT_SPREADTRADING_LOG)
event.dict_['data'] = log
self.eventEngine.put(event)
#----------------------------------------------------------------------
def getAllSpreads(self):
"""获取所有的价差"""
return self.spreadDict.values()
########################################################################
class StAlgoEngine(object):
"""价差算法交易引擎"""
algoFileName = 'SpreadTradingAlgo.vt'
algoFilePath = getTempPath(algoFileName)
#----------------------------------------------------------------------
def __init__(self, dataEngine, mainEngine, eventEngine):
"""Constructor"""
self.dataEngine = dataEngine
self.mainEngine = mainEngine
self.eventEngine = eventEngine
self.algoDict = {} # spreadName:algo
self.vtSymbolAlgoDict = {} # vtSymbol:algo
self.registerEvent()
#----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.eventEngine.register(EVENT_SPREADTRADING_TICK, self.processSpreadTickEvent)
self.eventEngine.register(EVENT_SPREADTRADING_POS, self.processSpreadPosEvent)
self.eventEngine.register(EVENT_TRADE, self.processTradeEvent)
self.eventEngine.register(EVENT_ORDER, self.processOrderEvent)
self.eventEngine.register(EVENT_TIMER, self.processTimerEvent)
#----------------------------------------------------------------------
def processSpreadTickEvent(self, event):
"""处理价差行情事件"""
spread = event.dict_['data']
algo = self.algoDict.get(spread.name, None)
if algo:
algo.updateSpreadTick(spread)
#----------------------------------------------------------------------
def processSpreadPosEvent(self, event):
"""处理价差持仓事件"""
spread = event.dict_['data']
algo = self.algoDict.get(spread.name, None)
if algo:
algo.updateSpreadPos(spread)
#----------------------------------------------------------------------
def processTradeEvent(self, event):
"""处理成交事件"""
trade = event.dict_['data']
algo = self.vtSymbolAlgoDict.get(trade.vtSymbol, None)
if algo:
algo.updateTrade(trade)
#----------------------------------------------------------------------
def processOrderEvent(self, event):
"""处理委托事件"""
order = event.dict_['data']
algo = self.vtSymbolAlgoDict.get(order.vtSymbol, None)
if algo:
algo.updateOrder(order)
#----------------------------------------------------------------------
def processTimerEvent(self, event):
""""""
for algo in self.algoDict.values():
algo.updateTimer()
#----------------------------------------------------------------------
def sendOrder(self, vtSymbol, direction, offset, price, volume, payup=0):
"""发单"""
contract = self.mainEngine.getContract(vtSymbol)
if not contract:
return ''
req = VtOrderReq()
req.symbol = contract.symbol
req.exchange = contract.exchange
req.vtSymbol = contract.vtSymbol
req.direction = direction
req.offset = offset
req.volume = int(volume)
req.priceType = PRICETYPE_LIMITPRICE
if direction == DIRECTION_LONG:
req.price = price + payup * contract.priceTick
else:
req.price = price - payup * contract.priceTick
# 委托转换
reqList = self.mainEngine.convertOrderReq(req)
vtOrderIDList = []
for req in reqList:
vtOrderID = self.mainEngine.sendOrder(req, contract.gatewayName)
vtOrderIDList.append(vtOrderID)
return vtOrderIDList
#----------------------------------------------------------------------
def cancelOrder(self, vtOrderID):
"""撤单"""
order = self.mainEngine.getOrder(vtOrderID)
if not order:
return
req = VtCancelOrderReq()
req.symbol = order.symbol
req.exchange = order.exchange
req.frontID = order.frontID
req.sessionID = order.sessionID
req.orderID = order.orderID
self.mainEngine.cancelOrder(req, order.gatewayName)
#----------------------------------------------------------------------
def buy(self, vtSymbol, price, volume, payup=0):
"""买入"""
l = self.sendOrder(vtSymbol, DIRECTION_LONG, OFFSET_OPEN, price, volume, payup)
return l
#----------------------------------------------------------------------
def sell(self, vtSymbol, price, volume, payup=0):
"""卖出"""
l = self.sendOrder(vtSymbol, DIRECTION_SHORT, OFFSET_CLOSE, price, volume, payup)
return l
#----------------------------------------------------------------------
def short(self, vtSymbol, price, volume, payup=0):
"""卖空"""
l = self.sendOrder(vtSymbol, DIRECTION_SHORT, OFFSET_OPEN, price, volume, payup)
return l
#----------------------------------------------------------------------
def cover(self, vtSymbol, price, volume, payup=0):
"""平空"""
l = self.sendOrder(vtSymbol, DIRECTION_LONG, OFFSET_CLOSE, price, volume, payup)
return l
#----------------------------------------------------------------------
def putAlgoEvent(self, algo):
"""发出算法状态更新事件"""
event = Event(EVENT_SPREADTRADING_ALGO+algo.name)
self.eventEngine.put(event)
#----------------------------------------------------------------------
def writeLog(self, content):
"""输出日志"""
log = VtLogData()
log.logContent = content
event = Event(EVENT_SPREADTRADING_ALGOLOG)
event.dict_['data'] = log
self.eventEngine.put(event)
#----------------------------------------------------------------------
def saveSetting(self):
"""保存算法配置"""
setting = {}
for algo in self.algoDict.values():
setting[algo.spreadName] = algo.getAlgoParams()
f = shelve.open(self.algoFilePath)
f['setting'] = setting
f.close()
#----------------------------------------------------------------------
def loadSetting(self):
"""加载算法配置"""
# 创建算法对象
l = self.dataEngine.getAllSpreads()
for spread in l:
algo = SniperAlgo(self, spread)
self.algoDict[spread.name] = algo
# 保存腿代码和算法对象的映射
for leg in spread.allLegs:
self.vtSymbolAlgoDict[leg.vtSymbol] = algo
# 加载配置
f = shelve.open(self.algoFilePath)
setting = f.get('setting', None)
f.close()
if not setting:
return
for algo in self.algoDict.values():
if algo.spreadName in setting:
d = setting[algo.spreadName]
algo.setAlgoParams(d)
#----------------------------------------------------------------------
def stopAll(self):
"""停止全部算法"""
for algo in self.algoDict.values():
algo.stop()
#----------------------------------------------------------------------
def startAlgo(self, spreadName):
"""启动算法"""
algo = self.algoDict[spreadName]
algoActive = algo.start()
return algoActive
#----------------------------------------------------------------------
def stopAlgo(self, spreadName):
"""停止算法"""
algo = self.algoDict[spreadName]
algoActive = algo.stop()
return algoActive
#----------------------------------------------------------------------
def getAllAlgoParams(self):
"""获取所有算法的参数"""
return [algo.getAlgoParams() for algo in self.algoDict.values()]
#----------------------------------------------------------------------
def setAlgoBuyPrice(self, spreadName, buyPrice):
"""设置算法买开价格"""
algo = self.algoDict[spreadName]
algo.setBuyPrice(buyPrice)
#----------------------------------------------------------------------
def setAlgoSellPrice(self, spreadName, sellPrice):
"""设置算法卖平价格"""
algo = self.algoDict[spreadName]
algo.setSellPrice(sellPrice)
#----------------------------------------------------------------------
def setAlgoShortPrice(self, spreadName, shortPrice):
"""设置算法卖开价格"""
algo = self.algoDict[spreadName]
algo.setShortPrice(shortPrice)
#----------------------------------------------------------------------
def setAlgoCoverPrice(self, spreadName, coverPrice):
"""设置算法买平价格"""
algo = self.algoDict[spreadName]
algo.setCoverPrice(coverPrice)
#----------------------------------------------------------------------
def setAlgoMode(self, spreadName, mode):
"""设置算法工作模式"""
algo = self.algoDict[spreadName]
algo.setMode(mode)
#----------------------------------------------------------------------
def setAlgoMaxOrderSize(self, spreadName, maxOrderSize):
"""设置算法单笔委托限制"""
algo = self.algoDict[spreadName]
algo.setMaxOrderSize(maxOrderSize)
#----------------------------------------------------------------------
def setAlgoMaxPosSize(self, spreadName, maxPosSize):
"""设置算法持仓限制"""
algo = self.algoDict[spreadName]
algo.setMaxPosSize(maxPosSize)
########################################################################
class StEngine(object):
"""价差引擎"""
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
self.dataEngine = StDataEngine(mainEngine, eventEngine)
self.algoEngine = StAlgoEngine(self.dataEngine, mainEngine, eventEngine)
#----------------------------------------------------------------------
def init(self):
"""初始化"""
self.dataEngine.loadSetting()
self.algoEngine.loadSetting()
#----------------------------------------------------------------------
def stop(self):
"""停止"""
self.dataEngine.saveSetting()
self.algoEngine.stopAll()
self.algoEngine.saveSetting()

View File

@ -0,0 +1,19 @@
[
{
"name": "double ema",
"className": "DoubleMaStrategy",
"vtSymbol": "rb1805"
},
{
"name": "atr rsi",
"className": "AtrRsiStrategy",
"vtSymbol": "IC1802"
},
{
"name": "king keltner",
"className": "KkStrategy",
"vtSymbol": "IH1802"
}
]

View File

@ -37,8 +37,13 @@ def main():
me = MainEngine(ee)
# 安全退出机制
def shutdown(signal, frame):
le.info(u'安全关闭进程')
me.exit()
sys.exit()
for sig in [signal.SIGINT, signal.SIGHUP, signal.SIGTERM]:
signal.signal(sig, me.exit)
signal.signal(sig, shutdown)
# 添加交易接口
me.addGateway(ctpGateway)