vnpy/vn.trader/ctaEngine.py

505 lines
19 KiB
Python
Raw Normal View History

2015-10-19 08:42:17 +00:00
# encoding: UTF-8
from datetime import datetime
import json
from collections import OrderedDict
from eventEngine import *
from vtConstant import *
2015-11-13 03:39:26 +00:00
from vtGateway import VtSubscribeReq, VtOrderReq, VtCancelOrderReq, VtLogData
2015-10-19 08:42:17 +00:00
from ctaConstant import *
from ctaObject import *
2015-10-19 08:42:17 +00:00
from ctaStrategies import strategyClassDict
########################################################################
class CtaEngine(object):
"""CTA策略引擎"""
2015-11-15 07:53:54 +00:00
# 策略配置文件
2015-10-19 08:42:17 +00:00
settingFileName = 'CTA_setting.json'
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def __init__(self, mainEngine, eventEngine, dataEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
self.dataEngine = dataEngine
# 保存策略对象的字典
# key为策略名称value为策略对象注意策略名称不允许重复
self.strategyDict = {}
# 保存vtSymbol和策略对象映射的字典用于推送tick数据
# 由于可能多个strategy交易同一个vtSymbol因此key为vtSymbol
# value为包含所有相关strategy对象的list
self.tickStrategyDict = {}
# 保存vtOrderID和strategy对象映射的字典用于推送order和trade数据
# key为vtOrderIDvalue为strategy对象
self.orderStrategyDict = {}
# 本地停止单编号计数
self.stopOrderCount = 0
# stopOrderID = STOPORDERPREFIX + str(stopOrderCount)
# 本地停止单字典
# key为stopOrderIDvalue为stopOrder对象
self.stopOrderDict = {} # 停止单撤销后不会从本字典中删除
self.workingStopOrderDict = {} # 停止单撤销后会从本字典中删除
2015-11-13 03:39:26 +00:00
# 注册事件监听
self.registerEvent()
2015-11-13 03:39:26 +00:00
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def sendOrder(self, vtSymbol, orderType, price, volume, strategy):
"""发单"""
2015-11-15 07:53:54 +00:00
# 1.查询合约对象
2015-10-19 08:42:17 +00:00
contract = self.dataEngine.getContract(vtSymbol)
2015-11-15 07:53:54 +00:00
# 2. 创建发单传入对象
2015-10-19 08:42:17 +00:00
req = VtOrderReq()
2015-11-15 07:53:54 +00:00
req.symbol = contract.symbol # 合约代码
req.exchange = contract.exchange # 交易所
req.price = price # 价格
req.volume = volume # 数量
2015-10-19 08:42:17 +00:00
# 设计为CTA引擎发出的委托只允许使用限价单
2015-11-15 07:53:54 +00:00
req.priceType = PRICETYPE_LIMITPRICE # 价格类型
2015-10-19 08:42:17 +00:00
# CTA委托类型映射
if orderType == CTAORDER_BUY:
2015-11-15 07:53:54 +00:00
req.direction = DIRECTION_LONG # 合约方向
req.offset = OFFSET_OPEN # 开/平
2015-10-19 08:42:17 +00:00
elif orderType == CTAORDER_SELL:
req.direction = DIRECTION_SHORT
req.offset = OFFSET_CLOSE
elif orderType == CTAORDER_SHORT:
req.direction = DIRECTION_SHORT
req.offset = OFFSET_OPEN
elif orderType == CTAORDER_COVER:
req.direction = DIRECTION_LONG
req.offset = OFFSET_CLOSE
2015-11-15 07:53:54 +00:00
2015-11-15 07:53:54 +00:00
# 3.调用主引擎的发单接口进行发单保存OrderID与策略映射关系
2015-11-20 07:25:58 +00:00
vtOrderID = self.mainEngine.sendOrder(req, contract.gatewayName) # 发单
2015-10-19 08:42:17 +00:00
self.orderDict[vtOrderID] = strategy # 保存vtOrderID和策略的映射关系
2015-11-15 07:53:54 +00:00
2015-10-19 08:42:17 +00:00
return vtOrderID
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def cancelOrder(self, vtOrderID):
"""撤单"""
2015-11-15 07:53:54 +00:00
# 1.调用主引擎接口,查询委托单对象
2015-10-19 08:42:17 +00:00
order = self.dataEngine.getOrder(vtOrderID)
# 如果查询成功
if order:
2015-11-15 07:53:54 +00:00
# 2.检查是否报单(委托单)还有效,只有有效时才发出撤单指令
orderFinished = (order.status == STATUS_ALLTRADED or order.status == STATUS_CANCELLED)
2015-10-19 08:42:17 +00:00
if not orderFinished:
req = VtCancelOrderReq()
req.symbol = order.symbol
req.exchange = order.exchange
req.frontID = order.frontID
req.sessionID = order.sessionID
req.orderID = order.orderID
2015-11-15 07:53:54 +00:00
self.mainEngine.cancelOrder(req, order.gatewayName)
else:
if order.status == STATUS_ALLTRADED:
self.writeCtaLog(u'委托单({0}已执行,无法撤销'.format(vtOrderID))
if order.status == STATUS_CANCELLED:
self.writeCtaLog(u'委托单({0}已撤销,无法再次撤销'.format(vtOrderID))
# 查询不成功
else:
self.writeCtaLog(u'委托单({0}不存在'.format(vtOrderID))
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def sendStopOrder(self, vtSymbol, orderType, price, volume, strategy):
"""发停止单(本地实现)"""
2015-11-15 07:53:54 +00:00
# 1.生成本地停止单ID
2015-10-19 08:42:17 +00:00
self.stopOrderCount += 1
stopOrderID = STOPORDERPREFIX + str(self.stopOrderCount)
2015-11-15 07:53:54 +00:00
# 2.创建停止单对象
2015-10-19 08:42:17 +00:00
so = StopOrder()
2015-11-15 07:53:54 +00:00
so.vtSymbol = vtSymbol # 代码
so.orderType = orderType # 停止单类型
so.price = price # 价格
so.volume = volume # 数量
so.strategy = strategy # 来源策略
so.stopOrderID = stopOrderID # Id
so.status = STOPORDER_WAITING # 状态
# 3.保存stopOrder对象到字典中
self.stopOrderDict[stopOrderID] = so # 字典中不会删除
self.workingStopOrderDict[stopOrderID] = so # 字典中会删除
self.writeCtaLog(u'发停止单成功,'
u'Id:{0},Symbol:{1},Type:{2},Price:{3},Volume:{4}'
u'.'.format(stopOrderID, vtSymbol, orderType, price, volume))
2015-10-19 08:42:17 +00:00
return stopOrderID
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def cancelStopOrder(self, stopOrderID):
"""撤销停止单"""
2015-11-15 07:53:54 +00:00
# 1.检查停止单是否存在
2015-10-19 08:42:17 +00:00
if stopOrderID in self.workingStopOrderDict:
so = self.workingStopOrderDict[stopOrderID]
2015-11-15 07:53:54 +00:00
so.status = STOPORDER_CANCELLED # STOPORDER_WAITING =》STOPORDER_CANCELLED
del self.workingStopOrderDict[stopOrderID] # 删除
self.writeCtaLog(u'撤销停止单:{0}成功.'.format(stopOrderID))
else:
self.writeCtaLog(u'撤销停止单:{0}失败不存在Id.'.format(stopOrderID))
2015-10-19 08:42:17 +00:00
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def processStopOrder(self, tick):
"""收到行情后处理本地停止单(检查是否要立即发出)"""
vtSymbol = tick.vtSymbol
2015-11-15 07:53:54 +00:00
# 1.首先检查是否有策略交易该合约
2015-10-19 08:42:17 +00:00
if vtSymbol in self.tickStrategyDict:
2015-11-15 07:53:54 +00:00
# 2.遍历等待中的停止单,检查是否会被触发
2015-10-19 08:42:17 +00:00
for so in self.workingStopOrderDict.values():
2015-11-15 07:53:54 +00:00
2015-10-19 08:42:17 +00:00
if so.vtSymbol == vtSymbol:
2015-11-15 07:53:54 +00:00
# 3. 触发标识判断
longTriggered = so.direction == DIRECTION_LONG and tick.lastPrice >= so.price # 多头停止单被触发
shortTriggered = so.direction == DIRECTION_SHORT and tick.lasatPrice <= so.price # 空头停止单被触发
# 4.触发处理
2015-10-19 08:42:17 +00:00
if longTriggered or shortTriggered:
2015-11-15 07:53:54 +00:00
# 5.设定价格,买入和卖出分别以涨停跌停价发单(模拟市价单)
if so.direction == DIRECTION_LONG:
2015-10-19 08:42:17 +00:00
price = tick.upperLimit
else:
price = tick.lowerLimit
2015-11-15 07:53:54 +00:00
# 6.更新停止单状态,触发
2015-10-19 08:42:17 +00:00
so.status = STOPORDER_TRIGGERED
2015-11-15 07:53:54 +00:00
# 7.发单
2015-10-19 08:42:17 +00:00
self.sendOrder(so.vtSymbol, so.orderType, price, so.volume, so.strategy)
2015-11-15 07:53:54 +00:00
# 8.删除停止单
2015-10-19 08:42:17 +00:00
del self.workingStopOrderDict[so.stopOrderID]
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-11-13 03:39:26 +00:00
def procecssTickEvent(self, event):
2015-11-15 07:53:54 +00:00
"""处理行情推送事件"""
# 1. 获取事件的Tick数据
tick = event.dict_['data']
2015-11-13 03:39:26 +00:00
2015-11-15 07:53:54 +00:00
# 2.收到tick行情后优先处理本地停止单检查是否要立即发出
2015-10-19 08:42:17 +00:00
self.processStopOrder(tick)
2015-11-15 07:53:54 +00:00
# 3.推送tick到对应的策略对象进行处理
2015-10-19 08:42:17 +00:00
if tick.vtSymbol in self.tickStrategyDict:
2015-11-15 07:53:54 +00:00
# 4.将vtTickData数据转化为ctaTickData
2015-10-19 08:42:17 +00:00
ctaTick = CtaTickData()
d = ctaTick.__dict__
for key in d.keys():
2015-11-13 03:39:26 +00:00
if key != 'datetime':
d[key] = tick.__getattribute__(key)
2015-11-13 03:39:26 +00:00
2015-11-15 07:53:54 +00:00
# 5.添加datetime字段
2015-10-19 08:42:17 +00:00
ctaTick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')
l = self.tickStrategyDict[tick.vtSymbol]
for strategy in l:
strategy.onTick(ctaTick)
2015-10-19 08:42:17 +00:00
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-11-13 03:39:26 +00:00
def processOrderEvent(self, event):
2015-11-15 07:53:54 +00:00
"""处理委托推送事件"""
# 1.获取事件的Order数据
order = event.dict_['data']
2015-11-13 03:39:26 +00:00
2015-11-15 07:53:54 +00:00
# 2.判断order是否在映射字典中
2015-10-19 08:42:17 +00:00
if order.vtOrderID in self.orderStrategyDict:
2015-11-15 07:53:54 +00:00
# 3.提取对应的策略
strategy = self.orderStrategyDict[order.vtOrderID]
# 4.触发策略的委托推送事件方法
2015-10-19 08:42:17 +00:00
strategy.onOrder(order)
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-11-13 03:39:26 +00:00
def processTradeEvent(self, event):
2015-11-15 07:53:54 +00:00
"""处理成交推送事件"""
# 1.获取事件的Trade数据
trade = event.dict_['data']
2015-11-13 03:39:26 +00:00
2015-11-15 07:53:54 +00:00
# 2.判断Trade是否在映射字典中
2015-10-19 08:42:17 +00:00
if trade.vtOrderID in self.orderStrategyDict:
2015-11-15 07:53:54 +00:00
# 3.提取对应的策略
strategy = self.orderStrategyDict[trade.vtOrderID]
# 4.触发策略的成交推送事件。
2015-10-19 08:42:17 +00:00
strategy.onTrade(trade)
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def registerEvent(self):
"""注册事件监听"""
2015-11-13 03:39:26 +00:00
2015-11-15 07:53:54 +00:00
# 注册行情数据推送Tick数据到达的响应事件
self.eventEngine.register(EVENT_TICK, self.procecssTickEvent)
2015-11-15 07:53:54 +00:00
# 注册订单推送的响应事件
self.eventEngine.register(EVENT_ORDER, self.processOrderEvent)
2015-11-15 07:53:54 +00:00
# 注册成交推送(交易)的响应时间
self.eventEngine.register(EVENT_TRADE, self.processTradeEvent)
2015-11-13 03:39:26 +00:00
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def insertData(self, dbName, collectionName, data):
"""插入数据到数据库这里的data可以是CtaTickData或者CtaBarData"""
self.mainEngine.dbInsert(dbName, collectionName, data.__dict__)
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def loadBar(self, dbName, collectionName, startDate):
2015-11-15 07:53:54 +00:00
"""从数据库中读取Bar数据
startDate是datetime对象"""
2015-10-19 08:42:17 +00:00
d = {'datetime':{'$gte':startDate}}
cursor = self.mainEngine.dbQuery(dbName, collectionName, d)
l = []
for d in cursor:
bar = CtaBarData()
bar.__dict__ = d
l.append(bar)
return l
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def loadTick(self, dbName, collectionName, startDate):
"""从数据库中读取Tick数据startDate是datetime对象"""
d = {'datetime':{'$gte':startDate}}
cursor = self.mainEngine.dbQuery(dbName, collectionName, d)
l = []
for d in cursor:
tick = CtaTickData()
tick.__dict__ = d
l.append(tick)
return l
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def getToday(self):
"""获取代表今日的datetime对象"""
today = datetime.today()
today = today.replace(hour=0, minute=0, second=0, microsecond=0)
return today
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def writeCtaLog(self, content):
"""快速发出CTA模块日志事件"""
log = VtLogData()
log.logContent = content
event = Event(type_=EVENT_CTA_LOG)
event.dict_['data'] = log
self.eventEngine.put(event)
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def initStrategy(self, name, strategyClass, paramDict=None):
"""初始化策略"""
# 防止策略重名
if name not in self.strategyDict:
2015-11-13 03:39:26 +00:00
2015-11-15 07:53:54 +00:00
# 1.创建策略对象
strategy = strategyClass(self, name, paramDict)
self.strategyDict[name] = strategy
2015-11-15 07:53:54 +00:00
# 2.保存Tick映射关系symbol <==> Strategy[] )
if strategy.vtSymbol in self.tickStrategyDict:
l = self.tickStrategyDict[strategy.vtSymbol]
else:
l = []
self.tickStrategyDict[strategy.vtSymbol] = l
l.append(strategy)
2015-11-15 07:53:54 +00:00
# 3.订阅合约
contract = self.dataEngine.getContract(strategy.vtSymbol)
if contract:
2015-11-15 07:53:54 +00:00
# 4.构造订阅请求包
req = VtSubscribeReq()
req.symbol = contract.symbol
req.exchange = contract.exchange
2015-11-15 07:53:54 +00:00
# 5.调用主引擎的订阅接口
self.mainEngine.subscribe(req, contract.gatewayName)
else:
self.writeCtaLog(u'%s的交易合约%s无法找到' %(name, strategy.vtSymbol))
2015-10-19 08:42:17 +00:00
else:
self.writeCtaLog(u'存在策略对象重名:' + name)
2015-11-15 07:53:54 +00:00
# ---------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def startStrategy(self, name):
"""启动策略"""
2015-11-15 07:53:54 +00:00
# 1.判断策略名称是否存在字典中
2015-10-19 08:42:17 +00:00
if name in self.strategyDict:
2015-11-15 07:53:54 +00:00
# 2.提取策略
2015-10-19 08:42:17 +00:00
strategy = self.strategyDict[name]
2015-11-15 07:53:54 +00:00
# 3.判断策略是否运行
2015-10-19 08:42:17 +00:00
if not strategy.trading:
2015-11-15 07:53:54 +00:00
# 4.设置运行状态
2015-10-19 08:42:17 +00:00
strategy.trading = True
2015-11-15 07:53:54 +00:00
# 5.启动策略
2015-10-19 08:42:17 +00:00
strategy.start()
else:
self.writeCtaLog(u'策略对象不存在:' + name)
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def stopStrategy(self, name):
2015-11-15 07:53:54 +00:00
"""停止策略运行"""
# 1.判断策略名称是否存在字典中
2015-10-19 08:42:17 +00:00
if name in self.strategyDict:
2015-11-15 07:53:54 +00:00
# 2.提取策略
2015-10-19 08:42:17 +00:00
strategy = self.strategyDict[name]
2015-11-15 07:53:54 +00:00
# 3.停止交易
2015-10-19 08:42:17 +00:00
if strategy.trading:
2015-11-15 07:53:54 +00:00
# 4.设置交易状态为False
2015-10-19 08:42:17 +00:00
strategy.trading = False
2015-11-15 07:53:54 +00:00
# 5.调用策略的停止方法
2015-10-19 08:42:17 +00:00
strategy.stop()
2015-11-15 07:53:54 +00:00
# 6.对该策略发出的所有限价单进行撤单
2015-10-19 08:42:17 +00:00
for vtOrderID, s in self.orderStrategyDict.items():
if s is strategy:
self.cancelOrder(vtOrderID)
2015-11-15 07:53:54 +00:00
# 7.对该策略发出的所有本地停止单撤单
2015-10-19 08:42:17 +00:00
for stopOrderID, so in self.workingStopOrderDict.items():
if so.strategy is strategy:
self.cancelStopOrder(stopOrderID)
else:
self.writeCtaLog(u'策略对象不存在:' + name)
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def saveStrategySetting(self):
"""保存引擎中的策略配置"""
with open(self.settingFileName, 'w') as f:
d = {}
2015-11-15 07:53:54 +00:00
# 逐一循环name策略实例名称strategy策略
2015-10-19 08:42:17 +00:00
for name, strategy in self.strategyDict.items():
2015-11-15 07:53:54 +00:00
# 配置串
2015-10-19 08:42:17 +00:00
setting = {}
2015-11-15 07:53:54 +00:00
# 策略的类名称
2015-10-19 08:42:17 +00:00
setting['strategyClassName'] = strategy.strategyClassName
2015-11-15 07:53:54 +00:00
# 策略的参数
2015-10-19 08:42:17 +00:00
for param in strategy.paramList:
setting[param] = strategy.__getattribute__(param)
2015-11-15 07:53:54 +00:00
# 策略实例名,保存配置
2015-10-19 08:42:17 +00:00
d[name] = setting
2015-11-15 07:53:54 +00:00
# Json对象=》Json字符串
2015-10-19 08:42:17 +00:00
jsonD = json.dumps(d, indent=4)
2015-11-15 07:53:54 +00:00
# 保存文件
2015-10-19 08:42:17 +00:00
f.write(jsonD)
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
2015-10-19 08:42:17 +00:00
def loadStrategySetting(self):
"""读取引擎中的策略配置"""
with open(self.settingFileName) as f:
2015-11-15 07:53:54 +00:00
# 读取文件内容串=》Json对象
2015-10-19 08:42:17 +00:00
d = json.load(f)
2015-11-15 07:53:54 +00:00
# 策略实例名称,配置内容
2015-10-19 08:42:17 +00:00
for name, setting in d.items():
2015-11-15 07:53:54 +00:00
# 策略的类名称
2015-10-19 08:42:17 +00:00
strategyClassName = setting['strategyClassName']
2015-11-15 07:53:54 +00:00
2015-10-19 08:42:17 +00:00
if strategyClassName in strategyClassDict:
2015-11-15 07:53:54 +00:00
# 透过策略类字典,反射获取策略
2015-10-19 08:42:17 +00:00
strategyClass = strategyClassDict[strategyClassName]
2015-11-15 07:53:54 +00:00
# 初始化策略的设置
2015-10-19 08:42:17 +00:00
self.initStrategy(name, strategyClass, setting)
else:
self.writeCtaLog(u'无法找到策略类:' + strategyClassName)
break
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
def getStrategyVar(self, name):
2015-10-19 08:42:17 +00:00
"""获取策略当前的变量字典"""
if name in self.strategyDict:
2015-11-15 07:53:54 +00:00
# 获取策略实例
2015-10-19 08:42:17 +00:00
strategy = self.strategyDict[name]
2015-11-15 07:53:54 +00:00
# 策略实例的所有属性
2015-10-19 08:42:17 +00:00
d = strategy.__dict__
2015-11-15 07:53:54 +00:00
# 变量字典
2015-10-19 08:42:17 +00:00
varDict = OrderedDict()
2015-11-15 07:53:54 +00:00
# 策略中的变量名称列表
2015-10-19 08:42:17 +00:00
for key in strategy.varList:
2015-11-15 07:53:54 +00:00
# 如果匹配就增加
2015-10-19 08:42:17 +00:00
if key in d:
varDict[key] = d[key]
return varDict
else:
self.writeCtaLog(u'策略对象不存在:' + name)
return None
2015-11-15 07:53:54 +00:00
# ----------------------------------------------------------------------
def getStrategyParam(self, name):
2015-10-19 08:42:17 +00:00
"""获取策略的参数字典"""
if name in self.strategyDict:
2015-11-15 07:53:54 +00:00
# 获取策略实例
2015-10-19 08:42:17 +00:00
strategy = self.strategyDict[name]
2015-11-15 07:53:54 +00:00
# 策略实例的所有属性
2015-10-19 08:42:17 +00:00
d = strategy.__dict__
2015-11-13 03:39:26 +00:00
2015-11-15 07:53:54 +00:00
# 参数字典
paramDict = OrderedDict()
2015-11-15 07:53:54 +00:00
# 策略内的参数名称列表
for key in strategy.paramList:
2015-11-15 07:53:54 +00:00
# 匹配的就添加
if key in d:
paramDict[key] = d[key]
return paramDict
2015-11-13 03:39:26 +00:00
2015-10-19 08:42:17 +00:00
else:
self.writeCtaLog(u'策略对象不存在:' + name)
return None