diff --git a/examples/VnTrader/ctaStrategy123/CTA_setting.json b/examples/VnTrader/ctaStrategy123/CTA_setting.json deleted file mode 100644 index 19884be3..00000000 --- a/examples/VnTrader/ctaStrategy123/CTA_setting.json +++ /dev/null @@ -1,19 +0,0 @@ -[ - { - "name": "double ema", - "className": "EmaDemoStrategy", - "vtSymbol": "IF1706" - }, - - { - "name": "atr rsi", - "className": "AtrRsiStrategy", - "vtSymbol": "IC1706" - }, - - { - "name": "king keltner", - "className": "KkStrategy", - "vtSymbol": "IH1706" - } -] \ No newline at end of file diff --git a/examples/VnTrader/ctaStrategy123/__init__.py b/examples/VnTrader/ctaStrategy123/__init__.py deleted file mode 100644 index 7c1dbf6a..00000000 --- a/examples/VnTrader/ctaStrategy123/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -# encoding: UTF-8 - -from ctaEngine import CtaEngine -from uiCtaWidget import CtaEngineManager - -appName = 'CtaStrategy' -appDisplayName = u'CTA策略' -appEngine = CtaEngine -appWidget = CtaEngineManager -appIco = 'cta.ico' \ No newline at end of file diff --git a/examples/VnTrader/ctaStrategy123/cta.ico b/examples/VnTrader/ctaStrategy123/cta.ico deleted file mode 100644 index 25cbaa73..00000000 Binary files a/examples/VnTrader/ctaStrategy123/cta.ico and /dev/null differ diff --git a/examples/VnTrader/ctaStrategy123/ctaBacktesting.py b/examples/VnTrader/ctaStrategy123/ctaBacktesting.py deleted file mode 100644 index 0f159152..00000000 --- a/examples/VnTrader/ctaStrategy123/ctaBacktesting.py +++ /dev/null @@ -1,1267 +0,0 @@ -# encoding: UTF-8 - -''' -本文件中包含的是CTA模块的回测引擎,回测引擎的API和CTA引擎一致, -可以使用和实盘相同的代码进行回测。 -''' -from __future__ import division - -from datetime import datetime, timedelta -from collections import OrderedDict -from itertools import product -import multiprocessing -import copy - -import pymongo -import pandas as pd -import numpy as np -import matplotlib.pyplot as plt - -# 如果安装了seaborn则设置为白色风格 -try: - import seaborn as sns - sns.set_style('whitegrid') -except ImportError: - pass - -from vnpy.trader.vtGlobal import globalSetting -from vnpy.trader.vtObject import VtTickData, VtBarData -from vnpy.trader.vtConstant import * -from vnpy.trader.vtGateway import VtOrderData, VtTradeData - -from .ctaBase import * - - -######################################################################## -class BacktestingEngine(object): - """ - CTA回测引擎 - 函数接口和策略引擎保持一样, - 从而实现同一套代码从回测到实盘。 - """ - - TICK_MODE = 'tick' - BAR_MODE = 'bar' - - #---------------------------------------------------------------------- - def __init__(self): - """Constructor""" - # 本地停止单 - self.stopOrderCount = 0 # 编号计数:stopOrderID = STOPORDERPREFIX + str(stopOrderCount) - - # 本地停止单字典, key为stopOrderID,value为stopOrder对象 - self.stopOrderDict = {} # 停止单撤销后不会从本字典中删除 - self.workingStopOrderDict = {} # 停止单撤销后会从本字典中删除 - - self.engineType = ENGINETYPE_BACKTESTING # 引擎类型为回测 - - self.strategy = None # 回测策略 - self.mode = self.BAR_MODE # 回测模式,默认为K线 - - self.startDate = '' - self.initDays = 0 - self.endDate = '' - - self.capital = 1000000 # 回测时的起始本金(默认100万) - self.slippage = 0 # 回测时假设的滑点 - self.rate = 0 # 回测时假设的佣金比例(适用于百分比佣金) - self.size = 1 # 合约大小,默认为1 - self.priceTick = 0 # 价格最小变动 - - self.dbClient = None # 数据库客户端 - self.dbCursor = None # 数据库指针 - - self.initData = [] # 初始化用的数据 - self.dbName = '' # 回测数据库名 - self.symbol = '' # 回测集合名 - - self.dataStartDate = None # 回测数据开始日期,datetime对象 - self.dataEndDate = None # 回测数据结束日期,datetime对象 - self.strategyStartDate = None # 策略启动日期(即前面的数据用于初始化),datetime对象 - - self.limitOrderCount = 0 # 限价单编号 - self.limitOrderDict = OrderedDict() # 限价单字典 - self.workingLimitOrderDict = OrderedDict() # 活动限价单字典,用于进行撮合用 - - self.tradeCount = 0 # 成交编号 - self.tradeDict = OrderedDict() # 成交字典 - - self.logList = [] # 日志记录 - - # 当前最新数据,用于模拟成交用 - self.tick = None - self.bar = None - self.dt = None # 最新的时间 - - # 日线回测结果计算用 - self.dailyResultDict = OrderedDict() - - #------------------------------------------------ - # 通用功能 - #------------------------------------------------ - - #---------------------------------------------------------------------- - def roundToPriceTick(self, price): - """取整价格到合约最小价格变动""" - if not self.priceTick: - return price - - newPrice = round(price/self.priceTick, 0) * self.priceTick - return newPrice - - #---------------------------------------------------------------------- - def output(self, content): - """输出内容""" - print str(datetime.now()) + "\t" + content - - #------------------------------------------------ - # 参数设置相关 - #------------------------------------------------ - - #---------------------------------------------------------------------- - def setStartDate(self, startDate='20100416', initDays=10): - """设置回测的启动日期""" - self.startDate = startDate - self.initDays = initDays - - self.dataStartDate = datetime.strptime(startDate, '%Y%m%d') - - initTimeDelta = timedelta(initDays) - self.strategyStartDate = self.dataStartDate + initTimeDelta - - #---------------------------------------------------------------------- - def setEndDate(self, endDate=''): - """设置回测的结束日期""" - self.endDate = endDate - - if endDate: - self.dataEndDate = datetime.strptime(endDate, '%Y%m%d') - - # 若不修改时间则会导致不包含dataEndDate当天数据 - self.dataEndDate = self.dataEndDate.replace(hour=23, minute=59) - - #---------------------------------------------------------------------- - def setBacktestingMode(self, mode): - """设置回测模式""" - self.mode = mode - - #---------------------------------------------------------------------- - def setDatabase(self, dbName, symbol): - """设置历史数据所用的数据库""" - self.dbName = dbName - self.symbol = symbol - - #---------------------------------------------------------------------- - def setCapital(self, capital): - """设置资本金""" - self.capital = capital - - #---------------------------------------------------------------------- - def setSlippage(self, slippage): - """设置滑点点数""" - self.slippage = slippage - - #---------------------------------------------------------------------- - def setSize(self, size): - """设置合约大小""" - self.size = size - - #---------------------------------------------------------------------- - def setRate(self, rate): - """设置佣金比例""" - self.rate = rate - - #---------------------------------------------------------------------- - def setPriceTick(self, priceTick): - """设置价格最小变动""" - self.priceTick = priceTick - - #------------------------------------------------ - # 数据回放相关 - #------------------------------------------------ - - #---------------------------------------------------------------------- - def loadHistoryData(self): - """载入历史数据""" - self.dbClient = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort']) - collection = self.dbClient[self.dbName][self.symbol] - - self.output(u'开始载入数据') - - # 首先根据回测模式,确认要使用的数据类 - if self.mode == self.BAR_MODE: - dataClass = VtBarData - func = self.newBar - else: - dataClass = VtTickData - func = self.newTick - - # 载入初始化需要用的数据 - flt = {'datetime':{'$gte':self.dataStartDate, - '$lt':self.strategyStartDate}} - initCursor = collection.find(flt).sort('datetime') - - # 将数据从查询指针中读取出,并生成列表 - self.initData = [] # 清空initData列表 - for d in initCursor: - data = dataClass() - data.__dict__ = d - self.initData.append(data) - - # 载入回测数据 - if not self.dataEndDate: - flt = {'datetime':{'$gte':self.strategyStartDate}} # 数据过滤条件 - else: - flt = {'datetime':{'$gte':self.strategyStartDate, - '$lte':self.dataEndDate}} - self.dbCursor = collection.find(flt).sort('datetime') - - self.output(u'载入完成,数据量:%s' %(initCursor.count() + self.dbCursor.count())) - - #---------------------------------------------------------------------- - def runBacktesting(self): - """运行回测""" - # 载入历史数据 - self.loadHistoryData() - - # 首先根据回测模式,确认要使用的数据类 - if self.mode == self.BAR_MODE: - dataClass = VtBarData - func = self.newBar - else: - dataClass = VtTickData - func = self.newTick - - self.output(u'开始回测') - - self.strategy.inited = True - self.strategy.onInit() - self.output(u'策略初始化完成') - - self.strategy.trading = True - self.strategy.onStart() - self.output(u'策略启动完成') - - self.output(u'开始回放数据') - - for d in self.dbCursor: - data = dataClass() - data.__dict__ = d - func(data) - - self.output(u'数据回放结束') - - #---------------------------------------------------------------------- - def newBar(self, bar): - """新的K线""" - self.bar = bar - self.dt = bar.datetime - - self.crossLimitOrder() # 先撮合限价单 - self.crossStopOrder() # 再撮合停止单 - self.strategy.onBar(bar) # 推送K线到策略中 - - self.updateDailyClose(bar.datetime, bar.close) - - #---------------------------------------------------------------------- - def newTick(self, tick): - """新的Tick""" - self.tick = tick - self.dt = tick.datetime - - self.crossLimitOrder() - self.crossStopOrder() - self.strategy.onTick(tick) - - self.updateDailyClose(tick.datetime, tick.lastPrice) - - #---------------------------------------------------------------------- - def initStrategy(self, strategyClass, setting=None): - """ - 初始化策略 - setting是策略的参数设置,如果使用类中写好的默认设置则可以不传该参数 - """ - self.strategy = strategyClass(self, setting) - self.strategy.name = self.strategy.className - - #---------------------------------------------------------------------- - def crossLimitOrder(self): - """基于最新数据撮合限价单""" - # 先确定会撮合成交的价格 - if self.mode == self.BAR_MODE: - buyCrossPrice = self.bar.low # 若买入方向限价单价格高于该价格,则会成交 - sellCrossPrice = self.bar.high # 若卖出方向限价单价格低于该价格,则会成交 - buyBestCrossPrice = self.bar.open # 在当前时间点前发出的买入委托可能的最优成交价 - sellBestCrossPrice = self.bar.open # 在当前时间点前发出的卖出委托可能的最优成交价 - else: - buyCrossPrice = self.tick.askPrice1 - sellCrossPrice = self.tick.bidPrice1 - buyBestCrossPrice = self.tick.askPrice1 - sellBestCrossPrice = self.tick.bidPrice1 - - # 遍历限价单字典中的所有限价单 - for orderID, order in self.workingLimitOrderDict.items(): - # 推送委托进入队列(未成交)的状态更新 - if not order.status: - order.status = STATUS_NOTTRADED - self.strategy.onOrder(order) - - # 判断是否会成交 - buyCross = (order.direction==DIRECTION_LONG and - order.price>=buyCrossPrice and - buyCrossPrice > 0) # 国内的tick行情在涨停时askPrice1为0,此时买无法成交 - - sellCross = (order.direction==DIRECTION_SHORT and - order.price<=sellCrossPrice and - sellCrossPrice > 0) # 国内的tick行情在跌停时bidPrice1为0,此时卖无法成交 - - # 如果发生了成交 - if buyCross or sellCross: - # 推送成交数据 - self.tradeCount += 1 # 成交编号自增1 - tradeID = str(self.tradeCount) - trade = VtTradeData() - trade.vtSymbol = order.vtSymbol - trade.tradeID = tradeID - trade.vtTradeID = tradeID - trade.orderID = order.orderID - trade.vtOrderID = order.orderID - trade.direction = order.direction - trade.offset = order.offset - - # 以买入为例: - # 1. 假设当根K线的OHLC分别为:100, 125, 90, 110 - # 2. 假设在上一根K线结束(也是当前K线开始)的时刻,策略发出的委托为限价105 - # 3. 则在实际中的成交价会是100而不是105,因为委托发出时市场的最优价格是100 - if buyCross: - trade.price = min(order.price, buyBestCrossPrice) - self.strategy.pos += order.totalVolume - else: - trade.price = max(order.price, sellBestCrossPrice) - self.strategy.pos -= order.totalVolume - - trade.volume = order.totalVolume - trade.tradeTime = self.dt.strftime('%H:%M:%S') - trade.dt = self.dt - self.strategy.onTrade(trade) - - self.tradeDict[tradeID] = trade - - # 推送委托数据 - order.tradedVolume = order.totalVolume - order.status = STATUS_ALLTRADED - self.strategy.onOrder(order) - - # 从字典中删除该限价单 - del self.workingLimitOrderDict[orderID] - - #---------------------------------------------------------------------- - def crossStopOrder(self): - """基于最新数据撮合停止单""" - # 先确定会撮合成交的价格,这里和限价单规则相反 - if self.mode == self.BAR_MODE: - buyCrossPrice = self.bar.high # 若买入方向停止单价格低于该价格,则会成交 - sellCrossPrice = self.bar.low # 若卖出方向限价单价格高于该价格,则会成交 - bestCrossPrice = self.bar.open # 最优成交价,买入停止单不能低于,卖出停止单不能高于 - else: - buyCrossPrice = self.tick.lastPrice - sellCrossPrice = self.tick.lastPrice - bestCrossPrice = self.tick.lastPrice - - # 遍历停止单字典中的所有停止单 - for stopOrderID, so in self.workingStopOrderDict.items(): - # 判断是否会成交 - buyCross = so.direction==DIRECTION_LONG and so.price<=buyCrossPrice - sellCross = so.direction==DIRECTION_SHORT and so.price>=sellCrossPrice - - # 如果发生了成交 - if buyCross or sellCross: - # 更新停止单状态,并从字典中删除该停止单 - so.status = STOPORDER_TRIGGERED - if stopOrderID in self.workingStopOrderDict: - del self.workingStopOrderDict[stopOrderID] - - # 推送成交数据 - self.tradeCount += 1 # 成交编号自增1 - tradeID = str(self.tradeCount) - trade = VtTradeData() - trade.vtSymbol = so.vtSymbol - trade.tradeID = tradeID - trade.vtTradeID = tradeID - - if buyCross: - self.strategy.pos += so.volume - trade.price = max(bestCrossPrice, so.price) - else: - self.strategy.pos -= so.volume - trade.price = min(bestCrossPrice, so.price) - - self.limitOrderCount += 1 - orderID = str(self.limitOrderCount) - trade.orderID = orderID - trade.vtOrderID = orderID - trade.direction = so.direction - trade.offset = so.offset - trade.volume = so.volume - trade.tradeTime = self.dt.strftime('%H:%M:%S') - trade.dt = self.dt - - self.tradeDict[tradeID] = trade - - # 推送委托数据 - order = VtOrderData() - order.vtSymbol = so.vtSymbol - order.symbol = so.vtSymbol - order.orderID = orderID - order.vtOrderID = orderID - order.direction = so.direction - order.offset = so.offset - order.price = so.price - order.totalVolume = so.volume - order.tradedVolume = so.volume - order.status = STATUS_ALLTRADED - order.orderTime = trade.tradeTime - - self.limitOrderDict[orderID] = order - - # 按照顺序推送数据 - self.strategy.onStopOrder(so) - self.strategy.onOrder(order) - self.strategy.onTrade(trade) - - #------------------------------------------------ - # 策略接口相关 - #------------------------------------------------ - - #---------------------------------------------------------------------- - def sendOrder(self, vtSymbol, orderType, price, volume, strategy): - """发单""" - self.limitOrderCount += 1 - orderID = str(self.limitOrderCount) - - order = VtOrderData() - order.vtSymbol = vtSymbol - order.price = self.roundToPriceTick(price) - order.totalVolume = volume - order.orderID = orderID - order.vtOrderID = orderID - order.orderTime = self.dt.strftime('%H:%M:%S') - - # CTA委托类型映射 - if orderType == CTAORDER_BUY: - order.direction = DIRECTION_LONG - order.offset = OFFSET_OPEN - elif orderType == CTAORDER_SELL: - order.direction = DIRECTION_SHORT - order.offset = OFFSET_CLOSE - elif orderType == CTAORDER_SHORT: - order.direction = DIRECTION_SHORT - order.offset = OFFSET_OPEN - elif orderType == CTAORDER_COVER: - order.direction = DIRECTION_LONG - order.offset = OFFSET_CLOSE - - # 保存到限价单字典中 - self.workingLimitOrderDict[orderID] = order - self.limitOrderDict[orderID] = order - - return [orderID] - - #---------------------------------------------------------------------- - def cancelOrder(self, vtOrderID): - """撤单""" - if vtOrderID in self.workingLimitOrderDict: - order = self.workingLimitOrderDict[vtOrderID] - - order.status = STATUS_CANCELLED - order.cancelTime = self.dt.strftime('%H:%M:%S') - - self.strategy.onOrder(order) - - del self.workingLimitOrderDict[vtOrderID] - - #---------------------------------------------------------------------- - def sendStopOrder(self, vtSymbol, orderType, price, volume, strategy): - """发停止单(本地实现)""" - self.stopOrderCount += 1 - stopOrderID = STOPORDERPREFIX + str(self.stopOrderCount) - - so = StopOrder() - so.vtSymbol = vtSymbol - so.price = self.roundToPriceTick(price) - so.volume = volume - so.strategy = strategy - so.status = STOPORDER_WAITING - so.stopOrderID = stopOrderID - - if orderType == CTAORDER_BUY: - so.direction = DIRECTION_LONG - so.offset = OFFSET_OPEN - elif orderType == CTAORDER_SELL: - so.direction = DIRECTION_SHORT - so.offset = OFFSET_CLOSE - elif orderType == CTAORDER_SHORT: - so.direction = DIRECTION_SHORT - so.offset = OFFSET_OPEN - elif orderType == CTAORDER_COVER: - so.direction = DIRECTION_LONG - so.offset = OFFSET_CLOSE - - # 保存stopOrder对象到字典中 - self.stopOrderDict[stopOrderID] = so - self.workingStopOrderDict[stopOrderID] = so - - # 推送停止单初始更新 - self.strategy.onStopOrder(so) - - return [stopOrderID] - - #---------------------------------------------------------------------- - def cancelStopOrder(self, stopOrderID): - """撤销停止单""" - # 检查停止单是否存在 - if stopOrderID in self.workingStopOrderDict: - so = self.workingStopOrderDict[stopOrderID] - so.status = STOPORDER_CANCELLED - del self.workingStopOrderDict[stopOrderID] - self.strategy.onStopOrder(so) - - #---------------------------------------------------------------------- - def putStrategyEvent(self, name): - """发送策略更新事件,回测中忽略""" - pass - - #---------------------------------------------------------------------- - def insertData(self, dbName, collectionName, data): - """考虑到回测中不允许向数据库插入数据,防止实盘交易中的一些代码出错""" - pass - - #---------------------------------------------------------------------- - def loadBar(self, dbName, collectionName, startDate): - """直接返回初始化数据列表中的Bar""" - return self.initData - - #---------------------------------------------------------------------- - def loadTick(self, dbName, collectionName, startDate): - """直接返回初始化数据列表中的Tick""" - return self.initData - - #---------------------------------------------------------------------- - def writeCtaLog(self, content): - """记录日志""" - log = str(self.dt) + ' ' + content - self.logList.append(log) - - #---------------------------------------------------------------------- - def cancelAll(self, name): - """全部撤单""" - # 撤销限价单 - for orderID in self.workingLimitOrderDict.keys(): - self.cancelOrder(orderID) - - # 撤销停止单 - for stopOrderID in self.workingStopOrderDict.keys(): - self.cancelStopOrder(stopOrderID) - - #------------------------------------------------ - # 结果计算相关 - #------------------------------------------------ - - #---------------------------------------------------------------------- - def calculateBacktestingResult(self): - """ - 计算回测结果 - """ - self.output(u'计算回测结果') - - # 首先基于回测后的成交记录,计算每笔交易的盈亏 - resultList = [] # 交易结果列表 - - longTrade = [] # 未平仓的多头交易 - shortTrade = [] # 未平仓的空头交易 - - tradeTimeList = [] # 每笔成交时间戳 - posList = [0] # 每笔成交后的持仓情况 - - for trade in self.tradeDict.values(): - # 复制成交对象,因为下面的开平仓交易配对涉及到对成交数量的修改 - # 若不进行复制直接操作,则计算完后所有成交的数量会变成0 - trade = copy.copy(trade) - - # 多头交易 - if trade.direction == DIRECTION_LONG: - # 如果尚无空头交易 - if not shortTrade: - longTrade.append(trade) - # 当前多头交易为平空 - else: - while True: - entryTrade = shortTrade[0] - exitTrade = trade - - # 清算开平仓交易 - closedVolume = min(exitTrade.volume, entryTrade.volume) - result = TradingResult(entryTrade.price, entryTrade.dt, - exitTrade.price, exitTrade.dt, - -closedVolume, self.rate, self.slippage, self.size) - resultList.append(result) - - posList.extend([-1,0]) - tradeTimeList.extend([result.entryDt, result.exitDt]) - - # 计算未清算部分 - entryTrade.volume -= closedVolume - exitTrade.volume -= closedVolume - - # 如果开仓交易已经全部清算,则从列表中移除 - if not entryTrade.volume: - shortTrade.pop(0) - - # 如果平仓交易已经全部清算,则退出循环 - if not exitTrade.volume: - break - - # 如果平仓交易未全部清算, - if exitTrade.volume: - # 且开仓交易已经全部清算完,则平仓交易剩余的部分 - # 等于新的反向开仓交易,添加到队列中 - if not shortTrade: - longTrade.append(exitTrade) - break - # 如果开仓交易还有剩余,则进入下一轮循环 - else: - pass - - # 空头交易 - else: - # 如果尚无多头交易 - if not longTrade: - shortTrade.append(trade) - # 当前空头交易为平多 - else: - while True: - entryTrade = longTrade[0] - exitTrade = trade - - # 清算开平仓交易 - closedVolume = min(exitTrade.volume, entryTrade.volume) - result = TradingResult(entryTrade.price, entryTrade.dt, - exitTrade.price, exitTrade.dt, - closedVolume, self.rate, self.slippage, self.size) - resultList.append(result) - - posList.extend([1,0]) - tradeTimeList.extend([result.entryDt, result.exitDt]) - - # 计算未清算部分 - entryTrade.volume -= closedVolume - exitTrade.volume -= closedVolume - - # 如果开仓交易已经全部清算,则从列表中移除 - if not entryTrade.volume: - longTrade.pop(0) - - # 如果平仓交易已经全部清算,则退出循环 - if not exitTrade.volume: - break - - # 如果平仓交易未全部清算, - if exitTrade.volume: - # 且开仓交易已经全部清算完,则平仓交易剩余的部分 - # 等于新的反向开仓交易,添加到队列中 - if not longTrade: - shortTrade.append(exitTrade) - break - # 如果开仓交易还有剩余,则进入下一轮循环 - else: - pass - - # 到最后交易日尚未平仓的交易,则以最后价格平仓 - if self.mode == self.BAR_MODE: - endPrice = self.bar.close - else: - endPrice = self.tick.lastPrice - - for trade in longTrade: - result = TradingResult(trade.price, trade.dt, endPrice, self.dt, - trade.volume, self.rate, self.slippage, self.size) - resultList.append(result) - - for trade in shortTrade: - result = TradingResult(trade.price, trade.dt, endPrice, self.dt, - -trade.volume, self.rate, self.slippage, self.size) - resultList.append(result) - - # 检查是否有交易 - if not resultList: - self.output(u'无交易结果') - return {} - - # 然后基于每笔交易的结果,我们可以计算具体的盈亏曲线和最大回撤等 - capital = 0 # 资金 - maxCapital = 0 # 资金最高净值 - drawdown = 0 # 回撤 - - totalResult = 0 # 总成交数量 - totalTurnover = 0 # 总成交金额(合约面值) - totalCommission = 0 # 总手续费 - totalSlippage = 0 # 总滑点 - - timeList = [] # 时间序列 - pnlList = [] # 每笔盈亏序列 - capitalList = [] # 盈亏汇总的时间序列 - drawdownList = [] # 回撤的时间序列 - - winningResult = 0 # 盈利次数 - losingResult = 0 # 亏损次数 - totalWinning = 0 # 总盈利金额 - totalLosing = 0 # 总亏损金额 - - for result in resultList: - capital += result.pnl - maxCapital = max(capital, maxCapital) - drawdown = capital - maxCapital - - pnlList.append(result.pnl) - timeList.append(result.exitDt) # 交易的时间戳使用平仓时间 - capitalList.append(capital) - drawdownList.append(drawdown) - - totalResult += 1 - totalTurnover += result.turnover - totalCommission += result.commission - totalSlippage += result.slippage - - if result.pnl >= 0: - winningResult += 1 - totalWinning += result.pnl - else: - losingResult += 1 - totalLosing += result.pnl - - # 计算盈亏相关数据 - winningRate = winningResult/totalResult*100 # 胜率 - - averageWinning = 0 # 这里把数据都初始化为0 - averageLosing = 0 - profitLossRatio = 0 - - if winningResult: - averageWinning = totalWinning/winningResult # 平均每笔盈利 - if losingResult: - averageLosing = totalLosing/losingResult # 平均每笔亏损 - if averageLosing: - profitLossRatio = -averageWinning/averageLosing # 盈亏比 - - # 返回回测结果 - d = {} - d['capital'] = capital - d['maxCapital'] = maxCapital - d['drawdown'] = drawdown - d['totalResult'] = totalResult - d['totalTurnover'] = totalTurnover - d['totalCommission'] = totalCommission - d['totalSlippage'] = totalSlippage - d['timeList'] = timeList - d['pnlList'] = pnlList - d['capitalList'] = capitalList - d['drawdownList'] = drawdownList - d['winningRate'] = winningRate - d['averageWinning'] = averageWinning - d['averageLosing'] = averageLosing - d['profitLossRatio'] = profitLossRatio - d['posList'] = posList - d['tradeTimeList'] = tradeTimeList - - return d - - #---------------------------------------------------------------------- - def showBacktestingResult(self): - """显示回测结果""" - d = self.calculateBacktestingResult() - - # 输出 - self.output('-' * 30) - self.output(u'第一笔交易:\t%s' % d['timeList'][0]) - self.output(u'最后一笔交易:\t%s' % d['timeList'][-1]) - - self.output(u'总交易次数:\t%s' % formatNumber(d['totalResult'])) - self.output(u'总盈亏:\t%s' % formatNumber(d['capital'])) - self.output(u'最大回撤: \t%s' % formatNumber(min(d['drawdownList']))) - - self.output(u'平均每笔盈利:\t%s' %formatNumber(d['capital']/d['totalResult'])) - self.output(u'平均每笔滑点:\t%s' %formatNumber(d['totalSlippage']/d['totalResult'])) - self.output(u'平均每笔佣金:\t%s' %formatNumber(d['totalCommission']/d['totalResult'])) - - self.output(u'胜率\t\t%s%%' %formatNumber(d['winningRate'])) - self.output(u'盈利交易平均值\t%s' %formatNumber(d['averageWinning'])) - self.output(u'亏损交易平均值\t%s' %formatNumber(d['averageLosing'])) - self.output(u'盈亏比:\t%s' %formatNumber(d['profitLossRatio'])) - - # 绘图 - fig = plt.figure(figsize=(10, 16)) - - pCapital = plt.subplot(4, 1, 1) - pCapital.set_ylabel("capital") - pCapital.plot(d['capitalList'], color='r', lw=0.8) - - pDD = plt.subplot(4, 1, 2) - pDD.set_ylabel("DD") - pDD.bar(range(len(d['drawdownList'])), d['drawdownList'], color='g') - - pPnl = plt.subplot(4, 1, 3) - pPnl.set_ylabel("pnl") - pPnl.hist(d['pnlList'], bins=50, color='c') - - pPos = plt.subplot(4, 1, 4) - pPos.set_ylabel("Position") - if d['posList'][-1] == 0: - del d['posList'][-1] - tradeTimeIndex = [item.strftime("%m/%d %H:%M:%S") for item in d['tradeTimeList']] - xindex = np.arange(0, len(tradeTimeIndex), np.int(len(tradeTimeIndex)/10)) - tradeTimeIndex = map(lambda i: tradeTimeIndex[i], xindex) - pPos.plot(d['posList'], color='k', drawstyle='steps-pre') - pPos.set_ylim(-1.2, 1.2) - plt.sca(pPos) - plt.tight_layout() - plt.xticks(xindex, tradeTimeIndex, rotation=30) # 旋转15 - - plt.show() - - #---------------------------------------------------------------------- - def clearBacktestingResult(self): - """清空之前回测的结果""" - # 清空限价单相关 - self.limitOrderCount = 0 - self.limitOrderDict.clear() - self.workingLimitOrderDict.clear() - - # 清空停止单相关 - self.stopOrderCount = 0 - self.stopOrderDict.clear() - self.workingStopOrderDict.clear() - - # 清空成交相关 - self.tradeCount = 0 - self.tradeDict.clear() - - #---------------------------------------------------------------------- - def runOptimization(self, strategyClass, optimizationSetting): - """优化参数""" - # 获取优化设置 - settingList = optimizationSetting.generateSetting() - targetName = optimizationSetting.optimizeTarget - - # 检查参数设置问题 - if not settingList or not targetName: - self.output(u'优化设置有问题,请检查') - - # 遍历优化 - resultList = [] - for setting in settingList: - self.clearBacktestingResult() - self.output('-' * 30) - self.output('setting: %s' %str(setting)) - self.initStrategy(strategyClass, setting) - self.runBacktesting() - d = self.calculateBacktestingResult() - try: - targetValue = d[targetName] - except KeyError: - targetValue = 0 - resultList.append(([str(setting)], targetValue)) - - # 显示结果 - resultList.sort(reverse=True, key=lambda result:result[1]) - self.output('-' * 30) - self.output(u'优化结果:') - for result in resultList: - self.output(u'%s: %s' %(result[0], result[1])) - return result - - #---------------------------------------------------------------------- - def runParallelOptimization(self, strategyClass, optimizationSetting): - """并行优化参数""" - # 获取优化设置 - settingList = optimizationSetting.generateSetting() - targetName = optimizationSetting.optimizeTarget - - # 检查参数设置问题 - if not settingList or not targetName: - self.output(u'优化设置有问题,请检查') - - # 多进程优化,启动一个对应CPU核心数量的进程池 - pool = multiprocessing.Pool(multiprocessing.cpu_count()) - l = [] - - for setting in settingList: - l.append(pool.apply_async(optimize, (strategyClass, setting, - targetName, self.mode, - self.startDate, self.initDays, self.endDate, - self.slippage, self.rate, self.size, self.priceTick, - self.dbName, self.symbol))) - pool.close() - pool.join() - - # 显示结果 - resultList = [res.get() for res in l] - resultList.sort(reverse=True, key=lambda result:result[1]) - self.output('-' * 30) - self.output(u'优化结果:') - for result in resultList: - self.output(u'%s: %s' %(result[0], result[1])) - - #---------------------------------------------------------------------- - def updateDailyClose(self, dt, price): - """更新每日收盘价""" - date = dt.date() - - if date not in self.dailyResultDict: - self.dailyResultDict[date] = DailyResult(date, price) - else: - self.dailyResultDict[date].closePrice = price - - #---------------------------------------------------------------------- - def calculateDailyResult(self): - """计算按日统计的交易结果""" - self.output(u'计算按日统计结果') - - # 将成交添加到每日交易结果中 - for trade in self.tradeDict.values(): - date = trade.dt.date() - dailyResult = self.dailyResultDict[date] - dailyResult.addTrade(trade) - - # 遍历计算每日结果 - previousClose = 0 - openPosition = 0 - for dailyResult in self.dailyResultDict.values(): - dailyResult.previousClose = previousClose - previousClose = dailyResult.closePrice - - dailyResult.calculatePnl(openPosition, self.size, self.rate, self.slippage ) - openPosition = dailyResult.closePosition - - # 生成DataFrame - resultDict = {k:[] for k in dailyResult.__dict__.keys()} - for dailyResult in self.dailyResultDict.values(): - for k, v in dailyResult.__dict__.items(): - resultDict[k].append(v) - - resultDf = pd.DataFrame.from_dict(resultDict) - - # 计算衍生数据 - resultDf = resultDf.set_index('date') - - return resultDf - - #---------------------------------------------------------------------- - def calculateDailyStatistics(self, df): - """计算按日统计的结果""" - df['balance'] = df['netPnl'].cumsum() + self.capital - df['return'] = (np.log(df['balance']) - np.log(df['balance'].shift(1))).fillna(0) - df['highlevel'] = df['balance'].rolling(min_periods=1,window=len(df),center=False).max() - df['drawdown'] = df['balance'] - df['highlevel'] - - # 计算统计结果 - startDate = df.index[0] - endDate = df.index[-1] - - totalDays = len(df) - profitDays = len(df[df['netPnl']>0]) - lossDays = len(df[df['netPnl']<0]) - - endBalance = df['balance'].iloc[-1] - maxDrawdown = df['drawdown'].min() - - totalNetPnl = df['netPnl'].sum() - dailyNetPnl = totalNetPnl / totalDays - - totalCommission = df['commission'].sum() - dailyCommission = totalCommission / totalDays - - totalSlippage = df['slippage'].sum() - dailySlippage = totalSlippage / totalDays - - totalTurnover = df['turnover'].sum() - dailyTurnover = totalTurnover / totalDays - - totalTradeCount = df['tradeCount'].sum() - dailyTradeCount = totalTradeCount / totalDays - - totalReturn = (endBalance/self.capital - 1) * 100 - dailyReturn = df['return'].mean() * 100 - returnStd = df['return'].std() * 100 - - if returnStd: - sharpeRatio = dailyReturn / returnStd * np.sqrt(240) - else: - sharpeRatio = 0 - - # 返回结果 - result = { - 'startDate': startDate, - 'endDate': endDate, - 'totalDays': totalDays, - 'profitDays': profitDays, - 'lossDays': lossDays, - 'endBalance': endBalance, - 'maxDrawdown': maxDrawdown, - 'totalNetPnl': totalNetPnl, - 'dailyNetPnl': dailyNetPnl, - 'totalCommission': totalCommission, - 'dailyCommission': dailyCommission, - 'totalSlippage': totalSlippage, - 'dailySlippage': dailySlippage, - 'totalTurnover': totalTurnover, - 'dailyTurnover': dailyTurnover, - 'totalTradeCount': totalTradeCount, - 'dailyTradeCount': dailyTradeCount, - 'totalReturn': totalReturn, - 'dailyReturn': dailyReturn, - 'returnStd': returnStd, - 'sharpeRatio': sharpeRatio - } - - return df, result - - #---------------------------------------------------------------------- - def showDailyResult(self, df=None, result=None): - """显示按日统计的交易结果""" - if df is None: - df = self.calculateDailyResult() - df, result = self.calculateDailyStatistics(df) - - # 输出统计结果 - self.output('-' * 30) - self.output(u'首个交易日:\t%s' % result['startDate']) - self.output(u'最后交易日:\t%s' % result['endDate']) - - self.output(u'总交易日:\t%s' % result['totalDays']) - self.output(u'盈利交易日\t%s' % result['profitDays']) - self.output(u'亏损交易日:\t%s' % result['lossDays']) - - self.output(u'起始资金:\t%s' % self.capital) - self.output(u'结束资金:\t%s' % formatNumber(result['endBalance'])) - - self.output(u'总收益率:\t%s' % formatNumber(result['totalReturn'])) - self.output(u'总盈亏:\t%s' % formatNumber(result['totalNetPnl'])) - self.output(u'最大回撤: \t%s' % formatNumber(result['maxDrawdown'])) - - self.output(u'总手续费:\t%s' % formatNumber(result['totalCommission'])) - self.output(u'总滑点:\t%s' % formatNumber(result['totalSlippage'])) - self.output(u'总成交金额:\t%s' % formatNumber(result['totalTurnover'])) - self.output(u'总成交笔数:\t%s' % formatNumber(result['totalTradeCount'])) - - self.output(u'日均盈亏:\t%s' % formatNumber(result['dailyNetPnl'])) - self.output(u'日均手续费:\t%s' % formatNumber(result['dailyCommission'])) - self.output(u'日均滑点:\t%s' % formatNumber(result['dailySlippage'])) - self.output(u'日均成交金额:\t%s' % formatNumber(result['dailyTurnover'])) - self.output(u'日均成交笔数:\t%s' % formatNumber(result['dailyTradeCount'])) - - self.output(u'日均收益率:\t%s%%' % formatNumber(result['dailyReturn'])) - self.output(u'收益标准差:\t%s%%' % formatNumber(result['returnStd'])) - self.output(u'Sharpe Ratio:\t%s' % formatNumber(result['sharpeRatio'])) - - # 绘图 - fig = plt.figure(figsize=(10, 16)) - - pBalance = plt.subplot(4, 1, 1) - pBalance.set_title('Balance') - df['balance'].plot(legend=True) - - pDrawdown = plt.subplot(4, 1, 2) - pDrawdown.set_title('Drawdown') - pDrawdown.fill_between(range(len(df)), df['drawdown'].values) - - pPnl = plt.subplot(4, 1, 3) - pPnl.set_title('Daily Pnl') - df['netPnl'].plot(kind='bar', legend=False, grid=False, xticks=[]) - - pKDE = plt.subplot(4, 1, 4) - pKDE.set_title('Daily Pnl Distribution') - df['netPnl'].hist(bins=50) - - plt.show() - - -######################################################################## -class TradingResult(object): - """每笔交易的结果""" - - #---------------------------------------------------------------------- - def __init__(self, entryPrice, entryDt, exitPrice, - exitDt, volume, rate, slippage, size): - """Constructor""" - self.entryPrice = entryPrice # 开仓价格 - self.exitPrice = exitPrice # 平仓价格 - - self.entryDt = entryDt # 开仓时间datetime - self.exitDt = exitDt # 平仓时间 - - self.volume = volume # 交易数量(+/-代表方向) - - self.turnover = (self.entryPrice+self.exitPrice)*size*abs(volume) # 成交金额 - self.commission = self.turnover*rate # 手续费成本 - self.slippage = slippage*2*size*abs(volume) # 滑点成本 - self.pnl = ((self.exitPrice - self.entryPrice) * volume * size - - self.commission - self.slippage) # 净盈亏 - - -######################################################################## -class DailyResult(object): - """每日交易的结果""" - - #---------------------------------------------------------------------- - def __init__(self, date, closePrice): - """Constructor""" - self.date = date # 日期 - self.closePrice = closePrice # 当日收盘价 - self.previousClose = 0 # 昨日收盘价 - - self.tradeList = [] # 成交列表 - self.tradeCount = 0 # 成交数量 - - self.openPosition = 0 # 开盘时的持仓 - self.closePosition = 0 # 收盘时的持仓 - - self.tradingPnl = 0 # 交易盈亏 - self.positionPnl = 0 # 持仓盈亏 - self.totalPnl = 0 # 总盈亏 - - self.turnover = 0 # 成交量 - self.commission = 0 # 手续费 - self.slippage = 0 # 滑点 - self.netPnl = 0 # 净盈亏 - - #---------------------------------------------------------------------- - def addTrade(self, trade): - """添加交易""" - self.tradeList.append(trade) - - #---------------------------------------------------------------------- - def calculatePnl(self, openPosition=0, size=1, rate=0, slippage=0): - """ - 计算盈亏 - size: 合约乘数 - rate:手续费率 - slippage:滑点点数 - """ - # 持仓部分 - self.openPosition = openPosition - self.positionPnl = self.openPosition * (self.closePrice - self.previousClose) * size - self.closePosition = self.openPosition - - # 交易部分 - self.tradeCount = len(self.tradeList) - - for trade in self.tradeList: - if trade.direction == DIRECTION_LONG: - posChange = trade.volume - else: - posChange = -trade.volume - - self.tradingPnl += posChange * (self.closePrice - trade.price) * size - self.closePosition += posChange - self.turnover += trade.price * trade.volume * size - self.commission += trade.price * trade.volume * size * rate - self.slippage += trade.volume * size * slippage - - # 汇总 - self.totalPnl = self.tradingPnl + self.positionPnl - self.netPnl = self.totalPnl - self.commission - self.slippage - - -######################################################################## -class OptimizationSetting(object): - """优化设置""" - - #---------------------------------------------------------------------- - def __init__(self): - """Constructor""" - self.paramDict = OrderedDict() - - self.optimizeTarget = '' # 优化目标字段 - - #---------------------------------------------------------------------- - def addParameter(self, name, start, end=None, step=None): - """增加优化参数""" - if end is None and step is None: - self.paramDict[name] = [start] - return - - if end < start: - print u'参数起始点必须不大于终止点' - return - - if step <= 0: - print u'参数布进必须大于0' - return - - l = [] - param = start - - while param <= end: - l.append(param) - param += step - - self.paramDict[name] = l - - #---------------------------------------------------------------------- - def generateSetting(self): - """生成优化参数组合""" - # 参数名的列表 - nameList = self.paramDict.keys() - paramList = self.paramDict.values() - - # 使用迭代工具生产参数对组合 - productList = list(product(*paramList)) - - # 把参数对组合打包到一个个字典组成的列表中 - settingList = [] - for p in productList: - d = dict(zip(nameList, p)) - settingList.append(d) - - return settingList - - #---------------------------------------------------------------------- - def setOptimizeTarget(self, target): - """设置优化目标字段""" - self.optimizeTarget = target - - -#---------------------------------------------------------------------- -def formatNumber(n): - """格式化数字到字符串""" - rn = round(n, 2) # 保留两位小数 - return format(rn, ',') # 加上千分符 - - -#---------------------------------------------------------------------- -def optimize(strategyClass, setting, targetName, - mode, startDate, initDays, endDate, - slippage, rate, size, priceTick, - dbName, symbol): - """多进程优化时跑在每个进程中运行的函数""" - engine = BacktestingEngine() - engine.setBacktestingMode(mode) - engine.setStartDate(startDate, initDays) - engine.setEndDate(endDate) - engine.setSlippage(slippage) - engine.setRate(rate) - engine.setSize(size) - engine.setPriceTick(priceTick) - engine.setDatabase(dbName, symbol) - - engine.initStrategy(strategyClass, setting) - engine.runBacktesting() - - df = engine.calculateDailyResult() - df, d = engine.calculateDailyStatistics(df) - try: - targetValue = d[targetName] - except KeyError: - targetValue = 0 - return (str(setting), targetValue) - \ No newline at end of file diff --git a/examples/VnTrader/ctaStrategy123/ctaBase.py b/examples/VnTrader/ctaStrategy123/ctaBase.py deleted file mode 100644 index c19eaf18..00000000 --- a/examples/VnTrader/ctaStrategy123/ctaBase.py +++ /dev/null @@ -1,58 +0,0 @@ -# encoding: UTF-8 - -''' -本文件中包含了CTA模块中用到的一些基础设置、类和常量等。 -''' - -# CTA引擎中涉及的数据类定义 -from vnpy.trader.vtConstant import EMPTY_UNICODE, EMPTY_STRING, EMPTY_FLOAT, EMPTY_INT - -# 常量定义 -# CTA引擎中涉及到的交易方向类型 -CTAORDER_BUY = u'买开' -CTAORDER_SELL = u'卖平' -CTAORDER_SHORT = u'卖开' -CTAORDER_COVER = u'买平' - -# 本地停止单状态 -STOPORDER_WAITING = u'等待中' -STOPORDER_CANCELLED = u'已撤销' -STOPORDER_TRIGGERED = u'已触发' - -# 本地停止单前缀 -STOPORDERPREFIX = 'CtaStopOrder.' - -# 数据库名称 -SETTING_DB_NAME = 'VnTrader_Setting_Db' -POSITION_DB_NAME = 'VnTrader_Position_Db' - -TICK_DB_NAME = 'VnTrader_Tick_Db' -DAILY_DB_NAME = 'VnTrader_Daily_Db' -MINUTE_DB_NAME = 'VnTrader_1Min_Db' - -# 引擎类型,用于区分当前策略的运行环境 -ENGINETYPE_BACKTESTING = 'backtesting' # 回测 -ENGINETYPE_TRADING = 'trading' # 实盘 - -# CTA模块事件 -EVENT_CTA_LOG = 'eCtaLog' # CTA相关的日志事件 -EVENT_CTA_STRATEGY = 'eCtaStrategy.' # CTA策略状态变化事件 - - -######################################################################## -class StopOrder(object): - """本地停止单""" - - #---------------------------------------------------------------------- - def __init__(self): - """Constructor""" - self.vtSymbol = EMPTY_STRING - self.orderType = EMPTY_UNICODE - self.direction = EMPTY_UNICODE - self.offset = EMPTY_UNICODE - self.price = EMPTY_FLOAT - self.volume = EMPTY_INT - - self.strategy = None # 下停止单的策略对象 - self.stopOrderID = EMPTY_STRING # 停止单的本地编号 - self.status = EMPTY_STRING # 停止单状态 \ No newline at end of file diff --git a/examples/VnTrader/ctaStrategy123/ctaEngine.py b/examples/VnTrader/ctaStrategy123/ctaEngine.py deleted file mode 100644 index a13177c3..00000000 --- a/examples/VnTrader/ctaStrategy123/ctaEngine.py +++ /dev/null @@ -1,631 +0,0 @@ -# encoding: UTF-8 - -''' -本文件中实现了CTA策略引擎,针对CTA类型的策略,抽象简化了部分底层接口的功能。 - -关于平今和平昨规则: -1. 普通的平仓OFFSET_CLOSET等于平昨OFFSET_CLOSEYESTERDAY -2. 只有上期所的品种需要考虑平今和平昨的区别 -3. 当上期所的期货有今仓时,调用Sell和Cover会使用OFFSET_CLOSETODAY,否则 - 会使用OFFSET_CLOSE -4. 以上设计意味着如果Sell和Cover的数量超过今日持仓量时,会导致出错(即用户 - 希望通过一个指令同时平今和平昨) -5. 采用以上设计的原因是考虑到vn.trader的用户主要是对TB、MC和金字塔类的平台 - 感到功能不足的用户(即希望更高频的交易),交易策略不应该出现4中所述的情况 -6. 对于想要实现4中所述情况的用户,需要实现一个策略信号引擎和交易委托引擎分开 - 的定制化统结构(没错,得自己写) -''' - -from __future__ import division - -import json -import os -import traceback -from collections import OrderedDict -from datetime import datetime, timedelta - -from vnpy.event import Event -from vnpy.trader.vtEvent import * -from vnpy.trader.vtConstant import * -from vnpy.trader.vtObject import VtTickData, VtBarData -from vnpy.trader.vtGateway import VtSubscribeReq, VtOrderReq, VtCancelOrderReq, VtLogData -from vnpy.trader.vtFunction import todayDate, getJsonPath - -from .ctaBase import * -from .strategy import STRATEGY_CLASS - - - - -######################################################################## -class CtaEngine(object): - """CTA策略引擎""" - settingFileName = 'CTA_setting.json' - settingfilePath = getJsonPath(settingFileName, __file__) - - STATUS_FINISHED = set([STATUS_REJECTED, STATUS_CANCELLED, STATUS_ALLTRADED]) - - #---------------------------------------------------------------------- - def __init__(self, mainEngine, eventEngine): - """Constructor""" - self.mainEngine = mainEngine - self.eventEngine = eventEngine - - # 当前日期 - self.today = todayDate() - - # 保存策略实例的字典 - # key为策略名称,value为策略实例,注意策略名称不允许重复 - self.strategyDict = {} - - # 保存vtSymbol和策略实例映射的字典(用于推送tick数据) - # 由于可能多个strategy交易同一个vtSymbol,因此key为vtSymbol - # value为包含所有相关strategy对象的list - self.tickStrategyDict = {} - - # 保存vtOrderID和strategy对象映射的字典(用于推送order和trade数据) - # key为vtOrderID,value为strategy对象 - self.orderStrategyDict = {} - - # 本地停止单编号计数 - self.stopOrderCount = 0 - # stopOrderID = STOPORDERPREFIX + str(stopOrderCount) - - # 本地停止单字典 - # key为stopOrderID,value为stopOrder对象 - self.stopOrderDict = {} # 停止单撤销后不会从本字典中删除 - self.workingStopOrderDict = {} # 停止单撤销后会从本字典中删除 - - # 保存策略名称和委托号列表的字典 - # key为name,value为保存orderID(限价+本地停止)的集合 - self.strategyOrderDict = {} - - # 成交号集合,用来过滤已经收到过的成交推送 - self.tradeSet = set() - - # 引擎类型为实盘 - self.engineType = ENGINETYPE_TRADING - - # 注册日式事件类型 - self.mainEngine.registerLogEvent(EVENT_CTA_LOG) - - # 注册事件监听 - self.registerEvent() - - #---------------------------------------------------------------------- - def sendOrder(self, vtSymbol, orderType, price, volume, strategy): - """发单""" - contract = self.mainEngine.getContract(vtSymbol) - - req = VtOrderReq() - req.symbol = contract.symbol - req.exchange = contract.exchange - req.vtSymbol = contract.vtSymbol - req.price = self.roundToPriceTick(contract.priceTick, price) - req.volume = volume - - req.productClass = strategy.productClass - req.currency = strategy.currency - - # 设计为CTA引擎发出的委托只允许使用限价单 - req.priceType = PRICETYPE_LIMITPRICE - - # CTA委托类型映射 - if orderType == CTAORDER_BUY: - req.direction = DIRECTION_LONG - req.offset = OFFSET_OPEN - - elif orderType == CTAORDER_SELL: - req.direction = DIRECTION_SHORT - req.offset = OFFSET_CLOSE - - elif orderType == CTAORDER_SHORT: - req.direction = DIRECTION_SHORT - req.offset = OFFSET_OPEN - - elif orderType == CTAORDER_COVER: - req.direction = DIRECTION_LONG - req.offset = OFFSET_CLOSE - - # 委托转换 - reqList = self.mainEngine.convertOrderReq(req) - vtOrderIDList = [] - - if not reqList: - return vtOrderIDList - - for convertedReq in reqList: - vtOrderID = self.mainEngine.sendOrder(convertedReq, contract.gatewayName) # 发单 - self.orderStrategyDict[vtOrderID] = strategy # 保存vtOrderID和策略的映射关系 - self.strategyOrderDict[strategy.name].add(vtOrderID) # 添加到策略委托号集合中 - vtOrderIDList.append(vtOrderID) - - self.writeCtaLog(u'策略%s发送委托,%s,%s,%s@%s' - %(strategy.name, vtSymbol, req.direction, volume, price)) - - return vtOrderIDList - - #---------------------------------------------------------------------- - def cancelOrder(self, vtOrderID): - """撤单""" - # 查询报单对象 - order = self.mainEngine.getOrder(vtOrderID) - - # 如果查询成功 - if order: - # 检查是否报单还有效,只有有效时才发出撤单指令 - orderFinished = (order.status==STATUS_ALLTRADED or order.status==STATUS_CANCELLED) - if not orderFinished: - req = VtCancelOrderReq() - req.symbol = order.symbol - req.exchange = order.exchange - req.frontID = order.frontID - req.sessionID = order.sessionID - req.orderID = order.orderID - self.mainEngine.cancelOrder(req, order.gatewayName) - - #---------------------------------------------------------------------- - def sendStopOrder(self, vtSymbol, orderType, price, volume, strategy): - """发停止单(本地实现)""" - self.stopOrderCount += 1 - stopOrderID = STOPORDERPREFIX + str(self.stopOrderCount) - - so = StopOrder() - so.vtSymbol = vtSymbol - so.orderType = orderType - so.price = price - so.volume = volume - so.strategy = strategy - so.stopOrderID = stopOrderID - so.status = STOPORDER_WAITING - - if orderType == CTAORDER_BUY: - so.direction = DIRECTION_LONG - so.offset = OFFSET_OPEN - elif orderType == CTAORDER_SELL: - so.direction = DIRECTION_SHORT - so.offset = OFFSET_CLOSE - elif orderType == CTAORDER_SHORT: - so.direction = DIRECTION_SHORT - so.offset = OFFSET_OPEN - elif orderType == CTAORDER_COVER: - so.direction = DIRECTION_LONG - so.offset = OFFSET_CLOSE - - # 保存stopOrder对象到字典中 - self.stopOrderDict[stopOrderID] = so - self.workingStopOrderDict[stopOrderID] = so - - # 保存stopOrderID到策略委托号集合中 - self.strategyOrderDict[strategy.name].add(stopOrderID) - - # 推送停止单状态 - strategy.onStopOrder(so) - - return [stopOrderID] - - #---------------------------------------------------------------------- - def cancelStopOrder(self, stopOrderID): - """撤销停止单""" - # 检查停止单是否存在 - if stopOrderID in self.workingStopOrderDict: - so = self.workingStopOrderDict[stopOrderID] - strategy = so.strategy - - # 更改停止单状态为已撤销 - so.status = STOPORDER_CANCELLED - - # 从活动停止单字典中移除 - del self.workingStopOrderDict[stopOrderID] - - # 从策略委托号集合中移除 - s = self.strategyOrderDict[strategy.name] - if stopOrderID in s: - s.remove(stopOrderID) - - # 通知策略 - strategy.onStopOrder(so) - - #---------------------------------------------------------------------- - def processStopOrder(self, tick): - """收到行情后处理本地停止单(检查是否要立即发出)""" - vtSymbol = tick.vtSymbol - - # 首先检查是否有策略交易该合约 - if vtSymbol in self.tickStrategyDict: - # 遍历等待中的停止单,检查是否会被触发 - for so in self.workingStopOrderDict.values(): - if so.vtSymbol == vtSymbol: - longTriggered = so.direction==DIRECTION_LONG and tick.lastPrice>=so.price # 多头停止单被触发 - shortTriggered = so.direction==DIRECTION_SHORT and tick.lastPrice<=so.price # 空头停止单被触发 - - if longTriggered or shortTriggered: - # 买入和卖出分别以涨停跌停价发单(模拟市价单) - if so.direction==DIRECTION_LONG: - price = tick.upperLimit - else: - price = tick.lowerLimit - - # 发出市价委托 - self.sendOrder(so.vtSymbol, so.orderType, price, so.volume, so.strategy) - - # 从活动停止单字典中移除该停止单 - del self.workingStopOrderDict[so.stopOrderID] - - # 从策略委托号集合中移除 - s = self.strategyOrderDict[so.strategy.name] - if so.stopOrderID in s: - s.remove(so.stopOrderID) - - # 更新停止单状态,并通知策略 - so.status = STOPORDER_TRIGGERED - so.strategy.onStopOrder(so) - - #---------------------------------------------------------------------- - def processTickEvent(self, event): - """处理行情推送""" - tick = event.dict_['data'] - # 收到tick行情后,先处理本地停止单(检查是否要立即发出) - self.processStopOrder(tick) - - # 推送tick到对应的策略实例进行处理 - if tick.vtSymbol in self.tickStrategyDict: - # tick时间可能出现异常数据,使用try...except实现捕捉和过滤 - try: - # 添加datetime字段 - if not tick.datetime: - tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f') - except ValueError: - self.writeCtaLog(traceback.format_exc()) - return - - # 逐个推送到策略实例中 - l = self.tickStrategyDict[tick.vtSymbol] - for strategy in l: - self.callStrategyFunc(strategy, strategy.onTick, tick) - - #---------------------------------------------------------------------- - def processOrderEvent(self, event): - """处理委托推送""" - order = event.dict_['data'] - - vtOrderID = order.vtOrderID - - if vtOrderID in self.orderStrategyDict: - strategy = self.orderStrategyDict[vtOrderID] - - # 如果委托已经完成(拒单、撤销、全成),则从活动委托集合中移除 - if order.status in self.STATUS_FINISHED: - s = self.strategyOrderDict[strategy.name] - if vtOrderID in s: - s.remove(vtOrderID) - - self.callStrategyFunc(strategy, strategy.onOrder, order) - - #---------------------------------------------------------------------- - def processTradeEvent(self, event): - """处理成交推送""" - trade = event.dict_['data'] - - # 过滤已经收到过的成交回报 - if trade.vtTradeID in self.tradeSet: - return - self.tradeSet.add(trade.vtTradeID) - - # 将成交推送到策略对象中 - if trade.vtOrderID in self.orderStrategyDict: - strategy = self.orderStrategyDict[trade.vtOrderID] - - # 计算策略持仓 - if trade.direction == DIRECTION_LONG: - strategy.pos += trade.volume - else: - strategy.pos -= trade.volume - - self.callStrategyFunc(strategy, strategy.onTrade, trade) - - # 保存策略持仓到数据库 - self.savePosition(strategy) - - #---------------------------------------------------------------------- - def registerEvent(self): - """注册事件监听""" - self.eventEngine.register(EVENT_TICK, self.processTickEvent) - self.eventEngine.register(EVENT_ORDER, self.processOrderEvent) - self.eventEngine.register(EVENT_TRADE, self.processTradeEvent) - - #---------------------------------------------------------------------- - def insertData(self, dbName, collectionName, data): - """插入数据到数据库(这里的data可以是VtTickData或者VtBarData)""" - self.mainEngine.dbInsert(dbName, collectionName, data.__dict__) - - #---------------------------------------------------------------------- - def loadBar(self, dbName, collectionName, days): - """从数据库中读取Bar数据,startDate是datetime对象""" - startDate = self.today - timedelta(days) - - d = {'datetime':{'$gte':startDate}} - barData = self.mainEngine.dbQuery(dbName, collectionName, d, 'datetime') - - l = [] - for d in barData: - bar = VtBarData() - bar.__dict__ = d - l.append(bar) - return l - - #---------------------------------------------------------------------- - def loadTick(self, dbName, collectionName, days): - """从数据库中读取Tick数据,startDate是datetime对象""" - startDate = self.today - timedelta(days) - - d = {'datetime':{'$gte':startDate}} - tickData = self.mainEngine.dbQuery(dbName, collectionName, d, 'datetime') - - l = [] - for d in tickData: - tick = VtTickData() - tick.__dict__ = d - l.append(tick) - return l - - #---------------------------------------------------------------------- - def writeCtaLog(self, content): - """快速发出CTA模块日志事件""" - log = VtLogData() - log.logContent = content - log.gatewayName = 'CTA_STRATEGY' - event = Event(type_=EVENT_CTA_LOG) - event.dict_['data'] = log - self.eventEngine.put(event) - - #---------------------------------------------------------------------- - def loadStrategy(self, setting): - """载入策略""" - try: - name = setting['name'] - className = setting['className'] - except Exception, e: - self.writeCtaLog(u'载入策略出错:%s' %e) - return - - # 获取策略类 - strategyClass = STRATEGY_CLASS.get(className, None) - if not strategyClass: - self.writeCtaLog(u'找不到策略类:%s' %className) - return - - # 防止策略重名 - if name in self.strategyDict: - self.writeCtaLog(u'策略实例重名:%s' %name) - else: - # 创建策略实例 - strategy = strategyClass(self, setting) - self.strategyDict[name] = strategy - - # 创建委托号列表 - self.strategyOrderDict[name] = set() - - # 保存Tick映射关系 - if strategy.vtSymbol in self.tickStrategyDict: - l = self.tickStrategyDict[strategy.vtSymbol] - else: - l = [] - self.tickStrategyDict[strategy.vtSymbol] = l - l.append(strategy) - - # 订阅合约 - contract = self.mainEngine.getContract(strategy.vtSymbol) - if contract: - req = VtSubscribeReq() - req.symbol = contract.symbol - req.exchange = contract.exchange - - # 对于IB接口订阅行情时所需的货币和产品类型,从策略属性中获取 - req.currency = strategy.currency - req.productClass = strategy.productClass - - self.mainEngine.subscribe(req, contract.gatewayName) - else: - self.writeCtaLog(u'%s的交易合约%s无法找到' %(name, strategy.vtSymbol)) - - #---------------------------------------------------------------------- - def initStrategy(self, name): - """初始化策略""" - if name in self.strategyDict: - strategy = self.strategyDict[name] - - if not strategy.inited: - strategy.inited = True - self.callStrategyFunc(strategy, strategy.onInit) - else: - self.writeCtaLog(u'请勿重复初始化策略实例:%s' %name) - else: - self.writeCtaLog(u'策略实例不存在:%s' %name) - - #--------------------------------------------------------------------- - def startStrategy(self, name): - """启动策略""" - if name in self.strategyDict: - strategy = self.strategyDict[name] - - if strategy.inited and not strategy.trading: - strategy.trading = True - self.callStrategyFunc(strategy, strategy.onStart) - else: - self.writeCtaLog(u'策略实例不存在:%s' %name) - - #---------------------------------------------------------------------- - def stopStrategy(self, name): - """停止策略""" - if name in self.strategyDict: - strategy = self.strategyDict[name] - - if strategy.trading: - strategy.trading = False - self.callStrategyFunc(strategy, strategy.onStop) - - # 对该策略发出的所有限价单进行撤单 - for vtOrderID, s in self.orderStrategyDict.items(): - if s is strategy: - self.cancelOrder(vtOrderID) - - # 对该策略发出的所有本地停止单撤单 - for stopOrderID, so in self.workingStopOrderDict.items(): - if so.strategy is strategy: - self.cancelStopOrder(stopOrderID) - else: - self.writeCtaLog(u'策略实例不存在:%s' %name) - - #---------------------------------------------------------------------- - def initAll(self): - """全部初始化""" - for name in self.strategyDict.keys(): - self.initStrategy(name) - - #---------------------------------------------------------------------- - def startAll(self): - """全部启动""" - for name in self.strategyDict.keys(): - self.startStrategy(name) - - #---------------------------------------------------------------------- - def stopAll(self): - """全部停止""" - for name in self.strategyDict.keys(): - self.stopStrategy(name) - - #---------------------------------------------------------------------- - def saveSetting(self): - """保存策略配置""" - with open(self.settingfilePath, 'w') as f: - l = [] - - for strategy in self.strategyDict.values(): - setting = {} - for param in strategy.paramList: - setting[param] = strategy.__getattribute__(param) - l.append(setting) - - jsonL = json.dumps(l, indent=4) - f.write(jsonL) - - #---------------------------------------------------------------------- - def loadSetting(self): - """读取策略配置""" - with open(self.settingfilePath) as f: - l = json.load(f) - - for setting in l: - self.loadStrategy(setting) - - self.loadPosition() - - #---------------------------------------------------------------------- - def getStrategyVar(self, name): - """获取策略当前的变量字典""" - if name in self.strategyDict: - strategy = self.strategyDict[name] - varDict = OrderedDict() - - for key in strategy.varList: - varDict[key] = strategy.__getattribute__(key) - - return varDict - else: - self.writeCtaLog(u'策略实例不存在:' + name) - return None - - #---------------------------------------------------------------------- - def getStrategyParam(self, name): - """获取策略的参数字典""" - if name in self.strategyDict: - strategy = self.strategyDict[name] - paramDict = OrderedDict() - - for key in strategy.paramList: - paramDict[key] = strategy.__getattribute__(key) - - return paramDict - else: - self.writeCtaLog(u'策略实例不存在:' + name) - return None - - #---------------------------------------------------------------------- - def putStrategyEvent(self, name): - """触发策略状态变化事件(通常用于通知GUI更新)""" - event = Event(EVENT_CTA_STRATEGY+name) - self.eventEngine.put(event) - - #---------------------------------------------------------------------- - def callStrategyFunc(self, strategy, func, params=None): - """调用策略的函数,若触发异常则捕捉""" - try: - if params: - func(params) - else: - func() - except Exception: - # 停止策略,修改状态为未初始化 - strategy.trading = False - strategy.inited = False - - # 发出日志 - content = '\n'.join([u'策略%s触发异常已停止' %strategy.name, - traceback.format_exc()]) - self.writeCtaLog(content) - - #---------------------------------------------------------------------- - def savePosition(self, strategy): - """保存策略的持仓情况到数据库""" - flt = {'name': strategy.name, - 'vtSymbol': strategy.vtSymbol} - - d = {'name': strategy.name, - 'vtSymbol': strategy.vtSymbol, - 'pos': strategy.pos} - - self.mainEngine.dbUpdate(POSITION_DB_NAME, strategy.className, - d, flt, True) - - content = '策略%s持仓保存成功,当前持仓%s' %(strategy.name, strategy.pos) - self.writeCtaLog(content) - - #---------------------------------------------------------------------- - def loadPosition(self): - """从数据库载入策略的持仓情况""" - for strategy in self.strategyDict.values(): - flt = {'name': strategy.name, - 'vtSymbol': strategy.vtSymbol} - posData = self.mainEngine.dbQuery(POSITION_DB_NAME, strategy.className, flt) - - for d in posData: - strategy.pos = d['pos'] - - #---------------------------------------------------------------------- - def roundToPriceTick(self, priceTick, price): - """取整价格到合约最小价格变动""" - if not priceTick: - return price - - newPrice = round(price/priceTick, 0) * priceTick - return newPrice - - #---------------------------------------------------------------------- - def stop(self): - """停止""" - pass - - #---------------------------------------------------------------------- - def cancelAll(self, name): - """全部撤单""" - s = self.strategyOrderDict[name] - - # 遍历列表,全部撤单 - # 这里不能直接遍历集合s,因为撤单时会修改s中的内容,导致出错 - for orderID in list(s): - if STOPORDERPREFIX in orderID: - self.cancelStopOrder(orderID) - else: - self.cancelOrder(orderID) - diff --git a/examples/VnTrader/ctaStrategy123/ctaHistoryData.py b/examples/VnTrader/ctaStrategy123/ctaHistoryData.py deleted file mode 100644 index 6e3b77fe..00000000 --- a/examples/VnTrader/ctaStrategy123/ctaHistoryData.py +++ /dev/null @@ -1,507 +0,0 @@ -# encoding: UTF-8 - -""" -本模块中主要包含: -1. 从通联数据下载历史行情的引擎 -2. 用来把MultiCharts导出的历史数据载入到MongoDB中用的函数 -3. 增加从通达信导出的历史数据载入到MongoDB中的函数 -""" - -from datetime import datetime, timedelta -from time import time -from multiprocessing.pool import ThreadPool - -import pymongo - -from vnpy.data.datayes import DatayesApi -from vnpy.trader.vtGlobal import globalSetting -from vnpy.trader.vtConstant import * -from vnpy.trader.vtObject import VtBarData -from .ctaBase import SETTING_DB_NAME, TICK_DB_NAME, MINUTE_DB_NAME, DAILY_DB_NAME - - -# 以下为vn.trader和通联数据规定的交易所代码映射 -VT_TO_DATAYES_EXCHANGE = {} -VT_TO_DATAYES_EXCHANGE[EXCHANGE_CFFEX] = 'CCFX' # 中金所 -VT_TO_DATAYES_EXCHANGE[EXCHANGE_SHFE] = 'XSGE' # 上期所 -VT_TO_DATAYES_EXCHANGE[EXCHANGE_CZCE] = 'XZCE' # 郑商所 -VT_TO_DATAYES_EXCHANGE[EXCHANGE_DCE] = 'XDCE' # 大商所 -DATAYES_TO_VT_EXCHANGE = {v:k for k,v in VT_TO_DATAYES_EXCHANGE.items()} - - -######################################################################## -class HistoryDataEngine(object): - """CTA模块用的历史数据引擎""" - - #---------------------------------------------------------------------- - def __init__(self, token): - """Constructor""" - self.dbClient = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort']) - self.datayesClient = DatayesApi(token) - - #---------------------------------------------------------------------- - def lastTradeDate(self): - """获取最近交易日(只考虑工作日,无法检查国内假期)""" - today = datetime.now() - oneday = timedelta(1) - - if today.weekday() == 5: - today = today - oneday - elif today.weekday() == 6: - today = today - oneday*2 - - return today.strftime('%Y%m%d') - - #---------------------------------------------------------------------- - def readFuturesProductSymbol(self): - """查询所有期货产品代码""" - cx = self.dbClient[SETTING_DB_NAME]['FuturesSymbol'].find() - return set([d['productSymbol'] for d in cx]) # 这里返回的是集合(因为会重复) - - #---------------------------------------------------------------------- - def readFuturesSymbol(self): - """查询所有期货合约代码""" - cx = self.dbClient[SETTING_DB_NAME]['FuturesSymbol'].find() - return [d['symbol'] for d in cx] # 这里返回的是列表 - - #---------------------------------------------------------------------- - def downloadFuturesSymbol(self, tradeDate=''): - """下载所有期货的代码""" - if not tradeDate: - tradeDate = self.lastTradeDate() - - self.dbClient[SETTING_DB_NAME]['FuturesSymbol'].ensure_index([('symbol', pymongo.ASCENDING)], - unique=True) - - - path = 'api/market/getMktMFutd.json' - - params = {} - params['tradeDate'] = tradeDate - - data = self.datayesClient.downloadData(path, params) - - if data: - for d in data: - symbolDict = {} - symbolDict['symbol'] = d['ticker'] - symbolDict['productSymbol'] = d['contractObject'] - flt = {'symbol': d['ticker']} - - self.dbClient[SETTING_DB_NAME]['FuturesSymbol'].update_one(flt, {'$set':symbolDict}, - upsert=True) - print u'期货合约代码下载完成' - else: - print u'期货合约代码下载失败' - - #---------------------------------------------------------------------- - def downloadFuturesDailyBar(self, symbol): - """ - 下载期货合约的日行情,symbol是合约代码, - 若最后四位为0000(如IF0000),代表下载连续合约。 - """ - print u'开始下载%s日行情' %symbol - - # 查询数据库中已有数据的最后日期 - cl = self.dbClient[DAILY_DB_NAME][symbol] - cx = cl.find(sort=[('datetime', pymongo.DESCENDING)]) - if cx.count(): - last = cx[0] - else: - last = '' - - # 主力合约 - if '0000' in symbol: - path = 'api/market/getMktMFutd.json' - - params = {} - params['contractObject'] = symbol.replace('0000', '') - params['mainCon'] = 1 - if last: - params['startDate'] = last['date'] - # 交易合约 - else: - path = 'api/market/getMktFutd.json' - - params = {} - params['ticker'] = symbol - if last: - params['startDate'] = last['date'] - - # 开始下载数据 - data = self.datayesClient.downloadData(path, params) - - if data: - # 创建datetime索引 - self.dbClient[DAILY_DB_NAME][symbol].ensure_index([('datetime', pymongo.ASCENDING)], - unique=True) - - for d in data: - bar = VtBarData() - bar.vtSymbol = symbol - bar.symbol = symbol - try: - bar.exchange = DATAYES_TO_VT_EXCHANGE.get(d.get('exchangeCD', ''), '') - bar.open = d.get('openPrice', 0) - bar.high = d.get('highestPrice', 0) - bar.low = d.get('lowestPrice', 0) - bar.close = d.get('closePrice', 0) - bar.date = d.get('tradeDate', '').replace('-', '') - bar.time = '' - bar.datetime = datetime.strptime(bar.date, '%Y%m%d') - bar.volume = d.get('turnoverVol', 0) - bar.openInterest = d.get('openInt', 0) - except KeyError: - print d - - flt = {'datetime': bar.datetime} - self.dbClient[DAILY_DB_NAME][symbol].update_one(flt, {'$set':bar.__dict__}, upsert=True) - - print u'%s下载完成' %symbol - else: - print u'找不到合约%s' %symbol - - #---------------------------------------------------------------------- - def downloadAllFuturesDailyBar(self): - """下载所有期货的主力合约日行情""" - start = time() - print u'开始下载所有期货的主力合约日行情' - - productSymbolSet = self.readFuturesProductSymbol() - - print u'代码列表读取成功,产品代码:%s' %productSymbolSet - - # 这里也测试了线程池,但可能由于下载函数中涉及较多的数据格 - # 式转换,CPU开销较大,多线程效率并无显著改变。 - #p = ThreadPool(10) - #p.map(self.downloadFuturesDailyBar, productSymbolSet) - #p.close() - #p.join() - - for productSymbol in productSymbolSet: - self.downloadFuturesDailyBar(productSymbol+'0000') - - print u'所有期货的主力合约日行情已经全部下载完成, 耗时%s秒' %(time()-start) - - #---------------------------------------------------------------------- - def downloadFuturesIntradayBar(self, symbol): - """下载期货的日内分钟行情""" - print u'开始下载%s日内分钟行情' %symbol - - # 日内分钟行情只有具体合约 - path = 'api/market/getFutureBarRTIntraDay.json' - - params = {} - params['instrumentID'] = symbol - params['unit'] = 1 - - data = self.datayesClient.downloadData(path, params) - - if data: - today = datetime.now().strftime('%Y%m%d') - - # 创建datetime索引 - self.dbClient[MINUTE_DB_NAME][symbol].ensure_index([('datetime', pymongo.ASCENDING)], - unique=True) - - for d in data: - bar = VtBarData() - bar.vtSymbol = symbol - bar.symbol = symbol - try: - bar.exchange = DATAYES_TO_VT_EXCHANGE.get(d.get('exchangeCD', ''), '') - bar.open = d.get('openPrice', 0) - bar.high = d.get('highestPrice', 0) - bar.low = d.get('lowestPrice', 0) - bar.close = d.get('closePrice', 0) - bar.date = today - bar.time = d.get('barTime', '') - bar.datetime = datetime.strptime(bar.date + ' ' + bar.time, '%Y%m%d %H:%M') - bar.volume = d.get('totalVolume', 0) - bar.openInterest = 0 - except KeyError: - print d - - flt = {'datetime': bar.datetime} - self.dbClient[MINUTE_DB_NAME][symbol].update_one(flt, {'$set':bar.__dict__}, upsert=True) - - print u'%s下载完成' %symbol - else: - print u'找不到合约%s' %symbol - - #---------------------------------------------------------------------- - def downloadEquitySymbol(self, tradeDate=''): - """下载所有股票的代码""" - if not tradeDate: - tradeDate = self.lastTradeDate() - - self.dbClient[SETTING_DB_NAME]['EquitySymbol'].ensure_index([('symbol', pymongo.ASCENDING)], - unique=True) - - - path = 'api/market/getMktEqud.json' - - params = {} - params['tradeDate'] = tradeDate - - data = self.datayesClient.downloadData(path, params) - - if data: - for d in data: - symbolDict = {} - symbolDict['symbol'] = d['ticker'] - flt = {'symbol': d['ticker']} - - self.dbClient[SETTING_DB_NAME]['EquitySymbol'].update_one(flt, {'$set':symbolDict}, - upsert=True) - print u'股票代码下载完成' - else: - print u'股票代码下载失败' - - #---------------------------------------------------------------------- - def downloadEquityDailyBar(self, symbol): - """ - 下载股票的日行情,symbol是股票代码 - """ - print u'开始下载%s日行情' %symbol - - # 查询数据库中已有数据的最后日期 - cl = self.dbClient[DAILY_DB_NAME][symbol] - cx = cl.find(sort=[('datetime', pymongo.DESCENDING)]) - if cx.count(): - last = cx[0] - else: - last = '' - - # 开始下载数据 - path = 'api/market/getMktEqud.json' - - params = {} - params['ticker'] = symbol - if last: - params['beginDate'] = last['date'] - - data = self.datayesClient.downloadData(path, params) - - if data: - # 创建datetime索引 - self.dbClient[DAILY_DB_NAME][symbol].ensure_index([('datetime', pymongo.ASCENDING)], - unique=True) - - for d in data: - bar = VtBarData() - bar.vtSymbol = symbol - bar.symbol = symbol - try: - bar.exchange = DATAYES_TO_VT_EXCHANGE.get(d.get('exchangeCD', ''), '') - bar.open = d.get('openPrice', 0) - bar.high = d.get('highestPrice', 0) - bar.low = d.get('lowestPrice', 0) - bar.close = d.get('closePrice', 0) - bar.date = d.get('tradeDate', '').replace('-', '') - bar.time = '' - bar.datetime = datetime.strptime(bar.date, '%Y%m%d') - bar.volume = d.get('turnoverVol', 0) - except KeyError: - print d - - flt = {'datetime': bar.datetime} - self.dbClient[DAILY_DB_NAME][symbol].update_one(flt, {'$set':bar.__dict__}, upsert=True) - - print u'%s下载完成' %symbol - else: - print u'找不到合约%s' %symbol - -#---------------------------------------------------------------------- -def downloadEquityDailyBarts(self, symbol): - """ - 下载股票的日行情,symbol是股票代码 - """ - print u'开始下载%s日行情' %symbol - - # 查询数据库中已有数据的最后日期 - cl = self.dbClient[DAILY_DB_NAME][symbol] - cx = cl.find(sort=[('datetime', pymongo.DESCENDING)]) - if cx.count(): - last = cx[0] - else: - last = '' - # 开始下载数据 - import tushare as ts - - if last: - start = last['date'][:4]+'-'+last['date'][4:6]+'-'+last['date'][6:] - - data = ts.get_k_data(symbol,start) - - if not data.empty: - # 创建datetime索引 - self.dbClient[DAILY_DB_NAME][symbol].ensure_index([('datetime', pymongo.ASCENDING)], - unique=True) - - for index, d in data.iterrows(): - bar = VtBarData() - bar.vtSymbol = symbol - bar.symbol = symbol - try: - bar.open = d.get('open') - bar.high = d.get('high') - bar.low = d.get('low') - bar.close = d.get('close') - bar.date = d.get('date').replace('-', '') - bar.time = '' - bar.datetime = datetime.strptime(bar.date, '%Y%m%d') - bar.volume = d.get('volume') - except KeyError: - print d - - flt = {'datetime': bar.datetime} - self.dbClient[DAILY_DB_NAME][symbol].update_one(flt, {'$set':bar.__dict__}, upsert=True) - - print u'%s下载完成' %symbol - else: - print u'找不到合约%s' %symbol - -#---------------------------------------------------------------------- -def loadMcCsv(fileName, dbName, symbol): - """将Multicharts导出的csv格式的历史数据插入到Mongo数据库中""" - import csv - - start = time() - print u'开始读取CSV文件%s中的数据插入到%s的%s中' %(fileName, dbName, symbol) - - # 锁定集合,并创建索引 - client = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort']) - collection = client[dbName][symbol] - collection.ensure_index([('datetime', pymongo.ASCENDING)], unique=True) - - # 读取数据和插入到数据库 - reader = csv.DictReader(file(fileName, 'r')) - for d in reader: - bar = VtBarData() - bar.vtSymbol = symbol - bar.symbol = symbol - bar.open = float(d['Open']) - bar.high = float(d['High']) - bar.low = float(d['Low']) - bar.close = float(d['Close']) - bar.date = datetime.strptime(d['Date'], '%Y-%m-%d').strftime('%Y%m%d') - bar.time = d['Time'] - bar.datetime = datetime.strptime(bar.date + ' ' + bar.time, '%Y%m%d %H:%M:%S') - bar.volume = d['TotalVolume'] - - flt = {'datetime': bar.datetime} - collection.update_one(flt, {'$set':bar.__dict__}, upsert=True) - print bar.date, bar.time - - print u'插入完毕,耗时:%s' % (time()-start) - -#---------------------------------------------------------------------- -def loadTbCsv(fileName, dbName, symbol): - """将TradeBlazer导出的csv格式的历史分钟数据插入到Mongo数据库中""" - import csv - - start = time() - print u'开始读取CSV文件%s中的数据插入到%s的%s中' %(fileName, dbName, symbol) - - # 锁定集合,并创建索引 - client = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort']) - collection = client[dbName][symbol] - collection.ensure_index([('datetime', pymongo.ASCENDING)], unique=True) - - # 读取数据和插入到数据库 - reader = csv.reader(file(fileName, 'r')) - for d in reader: - bar = VtBarData() - bar.vtSymbol = symbol - bar.symbol = symbol - bar.open = float(d[1]) - bar.high = float(d[2]) - bar.low = float(d[3]) - bar.close = float(d[4]) - bar.date = datetime.strptime(d[0].split(' ')[0], '%Y/%m/%d').strftime('%Y%m%d') - bar.time = d[0].split(' ')[1]+":00" - bar.datetime = datetime.strptime(bar.date + ' ' + bar.time, '%Y%m%d %H:%M:%S') - bar.volume = d[5] - bar.openInterest = d[6] - - flt = {'datetime': bar.datetime} - collection.update_one(flt, {'$set':bar.__dict__}, upsert=True) - print bar.date, bar.time - - print u'插入完毕,耗时:%s' % (time()-start) - - #---------------------------------------------------------------------- -def loadTbPlusCsv(fileName, dbName, symbol): - """将TB极速版导出的csv格式的历史分钟数据插入到Mongo数据库中""" - import csv - - start = time() - print u'开始读取CSV文件%s中的数据插入到%s的%s中' %(fileName, dbName, symbol) - - # 锁定集合,并创建索引 - client = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort']) - collection = client[dbName][symbol] - collection.ensure_index([('datetime', pymongo.ASCENDING)], unique=True) - - # 读取数据和插入到数据库 - reader = csv.reader(file(fileName, 'r')) - for d in reader: - bar = VtBarData() - bar.vtSymbol = symbol - bar.symbol = symbol - bar.open = float(d[2]) - bar.high = float(d[3]) - bar.low = float(d[4]) - bar.close = float(d[5]) - bar.date = str(d[0]) - - tempstr=str(round(float(d[1])*10000)).split(".")[0].zfill(4) - bar.time = tempstr[:2]+":"+tempstr[2:4]+":00" - - bar.datetime = datetime.strptime(bar.date + ' ' + bar.time, '%Y%m%d %H:%M:%S') - bar.volume = d[6] - bar.openInterest = d[7] - flt = {'datetime': bar.datetime} - collection.update_one(flt, {'$set':bar.__dict__}, upsert=True) - print bar.date, bar.time - - print u'插入完毕,耗时:%s' % (time()-start) - - -#---------------------------------------------------------------------- -def loadTdxCsv(fileName, dbName, symbol): - """将通达信导出的csv格式的历史分钟数据插入到Mongo数据库中""" - import csv - - start = time() - print u'开始读取CSV文件%s中的数据插入到%s的%s中' %(fileName, dbName, symbol) - - # 锁定集合,并创建索引 - client = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort']) - collection = client[dbName][symbol] - collection.ensure_index([('datetime', pymongo.ASCENDING)], unique=True) - - # 读取数据和插入到数据库 - reader = csv.reader(file(fileName, 'r')) - for d in reader: - bar = VtBarData() - bar.vtSymbol = symbol - bar.symbol = symbol - bar.open = float(d[2]) - bar.high = float(d[3]) - bar.low = float(d[4]) - bar.close = float(d[5]) - bar.date = datetime.strptime(d[0], '%Y/%m/%d').strftime('%Y%m%d') - bar.time = d[1][:2]+':'+d[1][2:4]+':00' - bar.datetime = datetime.strptime(bar.date + ' ' + bar.time, '%Y%m%d %H:%M:%S') - bar.volume = d[6] - bar.openInterest = d[7] - - flt = {'datetime': bar.datetime} - collection.update_one(flt, {'$set':bar.__dict__}, upsert=True) - print bar.date, bar.time - - print u'插入完毕,耗时:%s' % (time()-start) - - diff --git a/examples/VnTrader/ctaStrategy123/ctaTemplate.py b/examples/VnTrader/ctaStrategy123/ctaTemplate.py deleted file mode 100644 index 640833c1..00000000 --- a/examples/VnTrader/ctaStrategy123/ctaTemplate.py +++ /dev/null @@ -1,580 +0,0 @@ -# encoding: UTF-8 - -''' -本文件包含了CTA引擎中的策略开发用模板,开发策略时需要继承CtaTemplate类。 -''' - -import numpy as np -import talib - -from vnpy.trader.vtConstant import * -from vnpy.trader.vtObject import VtBarData - -from .ctaBase import * - - -######################################################################## -class CtaTemplate(object): - """CTA策略模板""" - - # 策略类的名称和作者 - className = 'CtaTemplate' - author = EMPTY_UNICODE - - # MongoDB数据库的名称,K线数据库默认为1分钟 - tickDbName = TICK_DB_NAME - barDbName = MINUTE_DB_NAME - - # 策略的基本参数 - name = EMPTY_UNICODE # 策略实例名称 - vtSymbol = EMPTY_STRING # 交易的合约vt系统代码 - productClass = EMPTY_STRING # 产品类型(只有IB接口需要) - currency = EMPTY_STRING # 货币(只有IB接口需要) - - # 策略的基本变量,由引擎管理 - inited = False # 是否进行了初始化 - trading = False # 是否启动交易,由引擎管理 - pos = 0 # 持仓情况 - - # 参数列表,保存了参数的名称 - paramList = ['name', - 'className', - 'author', - 'vtSymbol'] - - # 变量列表,保存了变量的名称 - varList = ['inited', - 'trading', - 'pos'] - - #---------------------------------------------------------------------- - def __init__(self, ctaEngine, setting): - """Constructor""" - self.ctaEngine = ctaEngine - - # 设置策略的参数 - if setting: - d = self.__dict__ - for key in self.paramList: - if key in setting: - d[key] = setting[key] - - #---------------------------------------------------------------------- - def onInit(self): - """初始化策略(必须由用户继承实现)""" - raise NotImplementedError - - #---------------------------------------------------------------------- - def onStart(self): - """启动策略(必须由用户继承实现)""" - raise NotImplementedError - - #---------------------------------------------------------------------- - def onStop(self): - """停止策略(必须由用户继承实现)""" - raise NotImplementedError - - #---------------------------------------------------------------------- - def onTick(self, tick): - """收到行情TICK推送(必须由用户继承实现)""" - raise NotImplementedError - - #---------------------------------------------------------------------- - def onOrder(self, order): - """收到委托变化推送(必须由用户继承实现)""" - raise NotImplementedError - - #---------------------------------------------------------------------- - def onTrade(self, trade): - """收到成交推送(必须由用户继承实现)""" - raise NotImplementedError - - #---------------------------------------------------------------------- - def onBar(self, bar): - """收到Bar推送(必须由用户继承实现)""" - raise NotImplementedError - - #---------------------------------------------------------------------- - def onStopOrder(self, so): - """收到停止单推送(必须由用户继承实现)""" - raise NotImplementedError - - #---------------------------------------------------------------------- - def buy(self, price, volume, stop=False): - """买开""" - return self.sendOrder(CTAORDER_BUY, price, volume, stop) - - #---------------------------------------------------------------------- - def sell(self, price, volume, stop=False): - """卖平""" - return self.sendOrder(CTAORDER_SELL, price, volume, stop) - - #---------------------------------------------------------------------- - def short(self, price, volume, stop=False): - """卖开""" - return self.sendOrder(CTAORDER_SHORT, price, volume, stop) - - #---------------------------------------------------------------------- - def cover(self, price, volume, stop=False): - """买平""" - return self.sendOrder(CTAORDER_COVER, price, volume, stop) - - #---------------------------------------------------------------------- - def sendOrder(self, orderType, price, volume, stop=False): - """发送委托""" - if self.trading: - # 如果stop为True,则意味着发本地停止单 - if stop: - vtOrderIDList = self.ctaEngine.sendStopOrder(self.vtSymbol, orderType, price, volume, self) - else: - vtOrderIDList = self.ctaEngine.sendOrder(self.vtSymbol, orderType, price, volume, self) - return vtOrderIDList - else: - # 交易停止时发单返回空字符串 - return [] - - #---------------------------------------------------------------------- - def cancelOrder(self, vtOrderID): - """撤单""" - # 如果发单号为空字符串,则不进行后续操作 - if not vtOrderID: - return - - if STOPORDERPREFIX in vtOrderID: - self.ctaEngine.cancelStopOrder(vtOrderID) - else: - self.ctaEngine.cancelOrder(vtOrderID) - - #---------------------------------------------------------------------- - def cancelAll(self): - """全部撤单""" - self.ctaEngine.cancelAll(self.name) - - #---------------------------------------------------------------------- - def insertTick(self, tick): - """向数据库中插入tick数据""" - self.ctaEngine.insertData(self.tickDbName, self.vtSymbol, tick) - - #---------------------------------------------------------------------- - def insertBar(self, bar): - """向数据库中插入bar数据""" - self.ctaEngine.insertData(self.barDbName, self.vtSymbol, bar) - - #---------------------------------------------------------------------- - def loadTick(self, days): - """读取tick数据""" - return self.ctaEngine.loadTick(self.tickDbName, self.vtSymbol, days) - - #---------------------------------------------------------------------- - def loadBar(self, days): - """读取bar数据""" - return self.ctaEngine.loadBar(self.barDbName, self.vtSymbol, days) - - #---------------------------------------------------------------------- - def writeCtaLog(self, content): - """记录CTA日志""" - content = self.name + ':' + content - self.ctaEngine.writeCtaLog(content) - - #---------------------------------------------------------------------- - def putEvent(self): - """发出策略状态变化事件""" - self.ctaEngine.putStrategyEvent(self.name) - - #---------------------------------------------------------------------- - def getEngineType(self): - """查询当前运行的环境""" - return self.ctaEngine.engineType - - -######################################################################## -class TargetPosTemplate(CtaTemplate): - """ - 允许直接通过修改目标持仓来实现交易的策略模板 - - 开发策略时,无需再调用buy/sell/cover/short这些具体的委托指令, - 只需在策略逻辑运行完成后调用setTargetPos设置目标持仓,底层算法 - 会自动完成相关交易,适合不擅长管理交易挂撤单细节的用户。 - - 使用该模板开发策略时,请在以下回调方法中先调用母类的方法: - onTick - onBar - onOrder - - 假设策略名为TestStrategy,请在onTick回调中加上: - super(TestStrategy, self).onTick(tick) - - 其他方法类同。 - """ - - className = 'TargetPosTemplate' - author = u'量衍投资' - - # 目标持仓模板的基本变量 - tickAdd = 1 # 委托时相对基准价格的超价 - lastTick = None # 最新tick数据 - lastBar = None # 最新bar数据 - targetPos = EMPTY_INT # 目标持仓 - orderList = [] # 委托号列表 - - # 变量列表,保存了变量的名称 - varList = ['inited', - 'trading', - 'pos', - 'targetPos'] - - #---------------------------------------------------------------------- - def __init__(self, ctaEngine, setting): - """Constructor""" - super(TargetPosTemplate, self).__init__(ctaEngine, setting) - - #---------------------------------------------------------------------- - def onTick(self, tick): - """收到行情推送""" - self.lastTick = tick - - # 实盘模式下,启动交易后,需要根据tick的实时推送执行自动开平仓操作 - if self.trading: - self.trade() - - #---------------------------------------------------------------------- - def onBar(self, bar): - """收到K线推送""" - self.lastBar = bar - - #---------------------------------------------------------------------- - def onOrder(self, order): - """收到委托推送""" - if order.status == STATUS_ALLTRADED or order.status == STATUS_CANCELLED: - self.orderList.remove(order.vtOrderID) - - #---------------------------------------------------------------------- - def setTargetPos(self, targetPos): - """设置目标仓位""" - self.targetPos = targetPos - - self.trade() - - #---------------------------------------------------------------------- - def trade(self): - """执行交易""" - # 先撤销之前的委托 - for vtOrderID in self.orderList: - self.cancelOrder(vtOrderID) - self.orderList = [] - - # 如果目标仓位和实际仓位一致,则不进行任何操作 - posChange = self.targetPos - self.pos - if not posChange: - return - - # 确定委托基准价格,有tick数据时优先使用,否则使用bar - longPrice = 0 - shortPrice = 0 - - if self.lastTick: - if posChange > 0: - longPrice = self.lastTick.askPrice1 + self.tickAdd - else: - shortPrice = self.lastTick.bidPrice1 - self.tickAdd - else: - if posChange > 0: - longPrice = self.lastBar.close + self.tickAdd - else: - shortPrice = self.lastBar.close - self.tickAdd - - # 回测模式下,采用合并平仓和反向开仓委托的方式 - if self.getEngineType() == ENGINETYPE_BACKTESTING: - if posChange > 0: - l = self.buy(longPrice, abs(posChange)) - else: - l = self.short(shortPrice, abs(posChange)) - self.orderList.extend(l) - - # 实盘模式下,首先确保之前的委托都已经结束(全成、撤销) - # 然后先发平仓委托,等待成交后,再发送新的开仓委托 - else: - # 检查之前委托都已结束 - if self.orderList: - return - - # 买入 - if posChange > 0: - if self.pos < 0: - l = self.cover(longPrice, abs(self.pos)) - else: - l = self.buy(longPrice, abs(posChange)) - # 卖出 - else: - if self.pos > 0: - l = self.sell(shortPrice, abs(self.pos)) - else: - l = self.short(shortPrice, abs(posChange)) - self.orderList.extend(l) - - -######################################################################## -class BarManager(object): - """ - K线合成器,支持: - 1. 基于Tick合成1分钟K线 - 2. 基于1分钟K线合成X分钟K线(X可以是2、3、5、10、15、30、60) - """ - - #---------------------------------------------------------------------- - def __init__(self, onBar, xmin=0, onXminBar=None): - """Constructor""" - self.bar = None # 1分钟K线对象 - self.onBar = onBar # 1分钟K线回调函数 - - self.xminBar = None # X分钟K线对象 - self.xmin = xmin # X的值 - self.onXminBar = onXminBar # X分钟K线的回调函数 - - self.lastTick = None # 上一TICK缓存对象 - - #---------------------------------------------------------------------- - def updateTick(self, tick): - """TICK更新""" - newMinute = False # 默认不是新的一分钟 - - # 尚未创建对象 - if not self.bar: - self.bar = VtBarData() - newMinute = True - # 新的一分钟 - elif self.bar.datetime.minute != tick.datetime.minute: - # 生成上一分钟K线的时间戳 - self.bar.datetime = self.bar.datetime.replace(second=0, microsecond=0) # 将秒和微秒设为0 - self.bar.date = self.bar.datetime.strftime('%Y%m%d') - self.bar.time = self.bar.datetime.strftime('%H:%M:%S.%f') - - # 推送已经结束的上一分钟K线 - self.onBar(self.bar) - - # 创建新的K线对象 - self.bar = VtBarData() - newMinute = True - - # 初始化新一分钟的K线数据 - if newMinute: - self.bar.vtSymbol = tick.vtSymbol - self.bar.symbol = tick.symbol - self.bar.exchange = tick.exchange - - self.bar.open = tick.lastPrice - self.bar.high = tick.lastPrice - self.bar.low = tick.lastPrice - # 累加更新老一分钟的K线数据 - else: - self.bar.high = max(self.bar.high, tick.lastPrice) - self.bar.low = min(self.bar.low, tick.lastPrice) - - # 通用更新部分 - self.bar.close = tick.lastPrice - self.bar.datetime = tick.datetime - self.bar.openInterest = tick.openInterest - - if self.lastTick: - self.bar.volume += (tick.volume - self.lastTick.volume) # 当前K线内的成交量 - - # 缓存Tick - self.lastTick = tick - - #---------------------------------------------------------------------- - def updateBar(self, bar): - """1分钟K线更新""" - # 尚未创建对象 - if not self.xminBar: - self.xminBar = VtBarData() - - self.xminBar.vtSymbol = bar.vtSymbol - self.xminBar.symbol = bar.symbol - self.xminBar.exchange = bar.exchange - - self.xminBar.open = bar.open - self.xminBar.high = bar.high - self.xminBar.low = bar.low - # 累加老K线 - else: - self.xminBar.high = max(self.xminBar.high, bar.high) - self.xminBar.low = min(self.xminBar.low, bar.low) - - # 通用部分 - self.xminBar.close = bar.close - self.xminBar.datetime = bar.datetime - self.xminBar.openInterest = bar.openInterest - self.xminBar.volume += int(bar.volume) - - # X分钟已经走完 - if not bar.datetime.minute % self.xmin: # 可以用X整除 - # 生成上一X分钟K线的时间戳 - self.xminBar.datetime = self.xminBar.datetime.replace(second=0, microsecond=0) # 将秒和微秒设为0 - self.xminBar.date = self.xminBar.datetime.strftime('%Y%m%d') - self.xminBar.time = self.xminBar.datetime.strftime('%H:%M:%S.%f') - - # 推送 - self.onXminBar(self.xminBar) - - # 清空老K线缓存对象 - self.xminBar = None - - -######################################################################## -class ArrayManager(object): - """ - K线序列管理工具,负责: - 1. K线时间序列的维护 - 2. 常用技术指标的计算 - """ - - #---------------------------------------------------------------------- - def __init__(self, size=100): - """Constructor""" - self.count = 0 # 缓存计数 - self.size = size # 缓存大小 - self.inited = False # True if count>=size - - self.openArray = np.zeros(size) # OHLC - self.highArray = np.zeros(size) - self.lowArray = np.zeros(size) - self.closeArray = np.zeros(size) - self.volumeArray = np.zeros(size) - - #---------------------------------------------------------------------- - def updateBar(self, bar): - """更新K线""" - self.count += 1 - if not self.inited and self.count >= self.size: - self.inited = True - - self.openArray[0:self.size-1] = self.openArray[1:self.size] - self.highArray[0:self.size-1] = self.highArray[1:self.size] - self.lowArray[0:self.size-1] = self.lowArray[1:self.size] - self.closeArray[0:self.size-1] = self.closeArray[1:self.size] - self.volumeArray[0:self.size-1] = self.volumeArray[1:self.size] - - self.openArray[-1] = bar.open - self.highArray[-1] = bar.high - self.lowArray[-1] = bar.low - self.closeArray[-1] = bar.close - self.volumeArray[-1] = bar.volume - - #---------------------------------------------------------------------- - @property - def open(self): - """获取开盘价序列""" - return self.openArray - - #---------------------------------------------------------------------- - @property - def high(self): - """获取最高价序列""" - return self.highArray - - #---------------------------------------------------------------------- - @property - def low(self): - """获取最低价序列""" - return self.lowArray - - #---------------------------------------------------------------------- - @property - def close(self): - """获取收盘价序列""" - return self.closeArray - - #---------------------------------------------------------------------- - @property - def volume(self): - """获取成交量序列""" - return self.volumeArray - - #---------------------------------------------------------------------- - def sma(self, n, array=False): - """简单均线""" - result = talib.SMA(self.close, n) - if array: - return result - return result[-1] - - #---------------------------------------------------------------------- - def std(self, n, array=False): - """标准差""" - result = talib.STDDEV(self.close, n) - if array: - return result - return result[-1] - - #---------------------------------------------------------------------- - def cci(self, n, array=False): - """CCI指标""" - result = talib.CCI(self.high, self.low, self.close, n) - if array: - return result - return result[-1] - - #---------------------------------------------------------------------- - def atr(self, n, array=False): - """ATR指标""" - result = talib.ATR(self.high, self.low, self.close, n) - if array: - return result - return result[-1] - - #---------------------------------------------------------------------- - def rsi(self, n, array=False): - """RSI指标""" - result = talib.RSI(self.close, n) - if array: - return result - return result[-1] - - #---------------------------------------------------------------------- - def macd(self, fastPeriod, slowPeriod, signalPeriod, array=False): - """MACD指标""" - macd, signal, hist = talib.MACD(self.close, fastPeriod, - slowPeriod, signalPeriod) - if array: - return macd, signal, hist - return macd[-1], signal[-1], hist[-1] - - #---------------------------------------------------------------------- - def adx(self, n, array=False): - """ADX指标""" - result = talib.ADX(self.high, self.low, self.close, n) - if array: - return result - return result[-1] - - #---------------------------------------------------------------------- - def boll(self, n, dev, array=False): - """布林通道""" - mid = self.sma(n, array) - std = self.std(n, array) - - up = mid + std * dev - down = mid - std * dev - - return up, down - - #---------------------------------------------------------------------- - def keltner(self, n, dev, array=False): - """肯特纳通道""" - mid = self.sma(n, array) - atr = self.atr(n, array) - - up = mid + atr * dev - down = mid - atr * dev - - return up, down - - #---------------------------------------------------------------------- - def donchian(self, n, array=False): - """唐奇安通道""" - up = talib.MAX(self.high, n) - down = talib.MIN(self.low, n) - - if array: - return up, down - return up[-1], down[-1] \ No newline at end of file diff --git a/examples/VnTrader/ctaStrategy123/datayes.json b/examples/VnTrader/ctaStrategy123/datayes.json deleted file mode 100644 index e2d30eed..00000000 --- a/examples/VnTrader/ctaStrategy123/datayes.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "domain": "http://api.wmcloud.com/data", - "version": "v1", - "token": "575593eb7696aec7339224c0fac2313780d8645f68b77369dcb35f8bcb419a0b" -} \ No newline at end of file diff --git a/examples/VnTrader/ctaStrategy123/datayesClient.py b/examples/VnTrader/ctaStrategy123/datayesClient.py deleted file mode 100644 index ec653f10..00000000 --- a/examples/VnTrader/ctaStrategy123/datayesClient.py +++ /dev/null @@ -1,83 +0,0 @@ -# encoding: UTF-8 - -'''一个简单的通联数据客户端,主要使用requests开发,比通联官网的python例子更为简洁。''' - -import os -import requests -import json - -FILENAME = 'datayes.json' -HTTP_OK = 200 - - -######################################################################## -class DatayesClient(object): - """通联数据客户端""" - - name = u'通联数据客户端' - - #---------------------------------------------------------------------- - def __init__(self): - """Constructor""" - self.domain = '' # 主域名 - self.version = '' # API版本 - self.token = '' # 授权码 - self.header = {} # http请求头部 - self.settingLoaded = False # 配置是否已经读取 - - self.loadSetting() - - #---------------------------------------------------------------------- - def loadSetting(self): - """载入配置""" - try: - path = os.path.abspath(os.path.dirname(__file__)) - FILENAME = os.path.join(path, FILENAME) - f = file(FILENAME) - except IOError: - print u'%s无法打开配置文件' % self.name - return - - setting = json.load(f) - try: - self.domain = str(setting['domain']) - self.version = str(setting['version']) - self.token = str(setting['token']) - except KeyError: - print u'%s配置文件字段缺失' % self.name - return - - self.header['Connection'] = 'keep_alive' - self.header['Authorization'] = 'Bearer ' + self.token - self.settingLoaded = True - - print u'%s配置载入完成' % self.name - - - #---------------------------------------------------------------------- - def downloadData(self, path, params): - """下载数据""" - if not self.settingLoaded: - print u'%s配置未载入' % self.name - return None - else: - url = '/'.join([self.domain, self.version, path]) - r = requests.get(url=url, headers=self.header, params=params) - - if r.status_code != HTTP_OK: - print u'%shttp请求失败,状态代码%s' %(self.name, r.status_code) - return None - else: - result = r.json() - if 'retMsg' in result and result['retMsg'] == 'Success': - return result['data'] - else: - if 'retMsg' in result: - print u'%s查询失败,返回信息%s' %(self.name, result['retMsg']) - elif 'message' in result: - print u'%s查询失败,返回信息%s' %(self.name, result['message']) - return None - - - - \ No newline at end of file diff --git a/examples/VnTrader/ctaStrategy123/language/__init__.py b/examples/VnTrader/ctaStrategy123/language/__init__.py deleted file mode 100644 index a058945f..00000000 --- a/examples/VnTrader/ctaStrategy123/language/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# encoding: UTF-8 - -import json -import os -import traceback - -# 默认设置 -from chinese import text - -# 是否要使用英文 -from vnpy.trader.vtGlobal import globalSetting -if globalSetting['language'] == 'english': - from english import text diff --git a/examples/VnTrader/ctaStrategy123/language/chinese/__init__.py b/examples/VnTrader/ctaStrategy123/language/chinese/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/examples/VnTrader/ctaStrategy123/language/chinese/text.py b/examples/VnTrader/ctaStrategy123/language/chinese/text.py deleted file mode 100644 index e8410d15..00000000 --- a/examples/VnTrader/ctaStrategy123/language/chinese/text.py +++ /dev/null @@ -1,18 +0,0 @@ -# encoding: UTF-8 - -INIT = u'初始化' -START = u'启动' -STOP = u'停止' - -CTA_ENGINE_STARTED = u'CTA引擎启动成功' - -CTA_STRATEGY = u'CTA策略' -LOAD_STRATEGY = u'加载策略' -INIT_ALL = u'全部初始化' -START_ALL = u'全部启动' -STOP_ALL = u'全部停止' -SAVE_POSITION_DATA = u'保存持仓' - -STRATEGY_LOADED = u'策略加载成功' - -SAVE_POSITION_QUESTION = u'是否要保存策略持仓数据到数据库?' \ No newline at end of file diff --git a/examples/VnTrader/ctaStrategy123/language/english/__init__.py b/examples/VnTrader/ctaStrategy123/language/english/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/examples/VnTrader/ctaStrategy123/language/english/text.py b/examples/VnTrader/ctaStrategy123/language/english/text.py deleted file mode 100644 index 5e8bfb34..00000000 --- a/examples/VnTrader/ctaStrategy123/language/english/text.py +++ /dev/null @@ -1,18 +0,0 @@ -# encoding: UTF-8 - -INIT = u'Init' -START = u'Start' -STOP = u'Stop' - -CTA_ENGINE_STARTED = u'CTA engine started.' - -CTA_STRATEGY = u'CTA Strategy' -LOAD_STRATEGY = u'Load Strategy' -INIT_ALL = u'Init All' -START_ALL = u'Start All' -STOP_ALL = u'Stop All' -SAVE_POSITION_DATA = u'Save Position Data' - -STRATEGY_LOADED = u'Strategy loaded.' - -SAVE_POSITION_QUESTION = u'Do you want to save strategy position data into database?' \ No newline at end of file diff --git a/examples/VnTrader/ctaStrategy123/strategy/__init__.py b/examples/VnTrader/ctaStrategy123/strategy/__init__.py deleted file mode 100644 index 63daa733..00000000 --- a/examples/VnTrader/ctaStrategy123/strategy/__init__.py +++ /dev/null @@ -1,50 +0,0 @@ -# encoding: UTF-8 - -''' -动态载入所有的策略类 -''' - -import os -import importlib -import traceback - -# 用来保存策略类的字典 -STRATEGY_CLASS = {} - -#---------------------------------------------------------------------- -def loadStrategyModule(moduleName): - """使用importlib动态载入模块""" - try: - module = importlib.import_module(moduleName) - - # 遍历模块下的对象,只有名称中包含'Strategy'的才是策略类 - for k in dir(module): - if 'Strategy' in k: - v = module.__getattribute__(k) - STRATEGY_CLASS[k] = v - except: - print '-' * 20 - print ('Failed to import strategy file %s:' %moduleName) - traceback.print_exc() - - -# 遍历strategy目录下的文件 -path = os.path.abspath(os.path.dirname(__file__)) -for root, subdirs, files in os.walk(path): - for name in files: - # 只有文件名中包含strategy且非.pyc的文件,才是策略文件 - if 'strategy' in name and '.pyc' not in name: - # 模块名称需要模块路径前缀 - moduleName = 'vnpy.trader.app.ctaStrategy.strategy.' + name.replace('.py', '') - loadStrategyModule(moduleName) - - -# 遍历工作目录下的文件 -workingPath = os.getcwd() -for root, subdirs, files in os.walk(workingPath): - for name in files: - # 只有文件名中包含strategy且非.pyc的文件,才是策略文件 - if 'strategy' in name and '.pyc' not in name: - # 模块名称无需前缀 - moduleName = name.replace('.py', '') - loadStrategyModule(moduleName) diff --git a/examples/VnTrader/ctaStrategy123/strategy/strategyAtrRsi.py b/examples/VnTrader/ctaStrategy123/strategy/strategyAtrRsi.py deleted file mode 100644 index cca7fcbf..00000000 --- a/examples/VnTrader/ctaStrategy123/strategy/strategyAtrRsi.py +++ /dev/null @@ -1,184 +0,0 @@ -# encoding: UTF-8 - -""" -一个ATR-RSI指标结合的交易策略,适合用在股指的1分钟和5分钟线上。 - -注意事项: -1. 作者不对交易盈利做任何保证,策略代码仅供参考 -2. 本策略需要用到talib,没有安装的用户请先参考www.vnpy.org上的教程安装 -3. 将IF0000_1min.csv用ctaHistoryData.py导入MongoDB后,直接运行本文件即可回测策略 - -""" - -from vnpy.trader.vtObject import VtBarData -from vnpy.trader.vtConstant import EMPTY_STRING -from vnpy.trader.app.ctaStrategy.ctaTemplate import (CtaTemplate, - BarManager, - ArrayManager) - - -######################################################################## -class AtrRsiStrategy(CtaTemplate): - """结合ATR和RSI指标的一个分钟线交易策略""" - className = 'AtrRsiStrategy' - author = u'用Python的交易员' - - # 策略参数 - atrLength = 22 # 计算ATR指标的窗口数 - atrMaLength = 10 # 计算ATR均线的窗口数 - rsiLength = 5 # 计算RSI的窗口数 - rsiEntry = 16 # RSI的开仓信号 - trailingPercent = 0.8 # 百分比移动止损 - initDays = 10 # 初始化数据所用的天数 - fixedSize = 1 # 每次交易的数量 - - # 策略变量 - atrValue = 0 # 最新的ATR指标数值 - atrMa = 0 # ATR移动平均的数值 - rsiValue = 0 # RSI指标的数值 - rsiBuy = 0 # RSI买开阈值 - rsiSell = 0 # RSI卖开阈值 - intraTradeHigh = 0 # 移动止损用的持仓期内最高价 - intraTradeLow = 0 # 移动止损用的持仓期内最低价 - - # 参数列表,保存了参数的名称 - paramList = ['name', - 'className', - 'author', - 'vtSymbol', - 'atrLength', - 'atrMaLength', - 'rsiLength', - 'rsiEntry', - 'trailingPercent'] - - # 变量列表,保存了变量的名称 - varList = ['inited', - 'trading', - 'pos', - 'atrValue', - 'atrMa', - 'rsiValue', - 'rsiBuy', - 'rsiSell'] - - #---------------------------------------------------------------------- - def __init__(self, ctaEngine, setting): - """Constructor""" - super(AtrRsiStrategy, self).__init__(ctaEngine, setting) - - # 创建K线合成器对象 - self.bm = BarManager(self.onBar) - self.am = ArrayManager() - - # 注意策略类中的可变对象属性(通常是list和dict等),在策略初始化时需要重新创建, - # 否则会出现多个策略实例之间数据共享的情况,有可能导致潜在的策略逻辑错误风险, - # 策略类中的这些可变对象属性可以选择不写,全都放在__init__下面,写主要是为了阅读 - # 策略时方便(更多是个编程习惯的选择) - - #---------------------------------------------------------------------- - def onInit(self): - """初始化策略(必须由用户继承实现)""" - self.writeCtaLog(u'%s策略初始化' %self.name) - - # 初始化RSI入场阈值 - self.rsiBuy = 50 + self.rsiEntry - self.rsiSell = 50 - self.rsiEntry - - # 载入历史数据,并采用回放计算的方式初始化策略数值 - initData = self.loadBar(self.initDays) - for bar in initData: - self.onBar(bar) - - self.putEvent() - - #---------------------------------------------------------------------- - def onStart(self): - """启动策略(必须由用户继承实现)""" - self.writeCtaLog(u'%s策略启动' %self.name) - self.putEvent() - - #---------------------------------------------------------------------- - def onStop(self): - """停止策略(必须由用户继承实现)""" - self.writeCtaLog(u'%s策略停止' %self.name) - self.putEvent() - - #---------------------------------------------------------------------- - def onTick(self, tick): - """收到行情TICK推送(必须由用户继承实现)""" - self.bm.updateTick(tick) - - #---------------------------------------------------------------------- - def onBar(self, bar): - """收到Bar推送(必须由用户继承实现)""" - self.cancelAll() - - # 保存K线数据 - am = self.am - am.updateBar(bar) - if not am.inited: - return - - # 计算指标数值 - atrArray = am.atr(self.atrLength, array=True) - self.atrValue = atrArray[-1] - self.atrMa = atrArray[-self.atrMaLength:].mean() - - self.rsiValue = am.rsi(self.rsiLength) - - # 判断是否要进行交易 - - # 当前无仓位 - if self.pos == 0: - self.intraTradeHigh = bar.high - self.intraTradeLow = bar.low - - # ATR数值上穿其移动平均线,说明行情短期内波动加大 - # 即处于趋势的概率较大,适合CTA开仓 - if self.atrValue > self.atrMa: - # 使用RSI指标的趋势行情时,会在超买超卖区钝化特征,作为开仓信号 - if self.rsiValue > self.rsiBuy: - # 这里为了保证成交,选择超价5个整指数点下单 - self.buy(bar.close+5, self.fixedSize) - - elif self.rsiValue < self.rsiSell: - self.short(bar.close-5, self.fixedSize) - - # 持有多头仓位 - elif self.pos > 0: - # 计算多头持有期内的最高价,以及重置最低价 - self.intraTradeHigh = max(self.intraTradeHigh, bar.high) - self.intraTradeLow = bar.low - - # 计算多头移动止损 - longStop = self.intraTradeHigh * (1-self.trailingPercent/100) - - # 发出本地止损委托,并且把委托号记录下来,用于后续撤单 - self.sell(longStop, abs(self.pos), stop=True) - - # 持有空头仓位 - elif self.pos < 0: - self.intraTradeLow = min(self.intraTradeLow, bar.low) - self.intraTradeHigh = bar.high - - shortStop = self.intraTradeLow * (1+self.trailingPercent/100) - self.cover(shortStop, abs(self.pos), stop=True) - - # 发出状态更新事件 - self.putEvent() - - #---------------------------------------------------------------------- - def onOrder(self, order): - """收到委托变化推送(必须由用户继承实现)""" - pass - - #---------------------------------------------------------------------- - def onTrade(self, trade): - # 发出状态更新事件 - self.putEvent() - - #---------------------------------------------------------------------- - def onStopOrder(self, so): - """停止单推送""" - pass \ No newline at end of file diff --git a/examples/VnTrader/ctaStrategy123/strategy/strategyBollChannel.py b/examples/VnTrader/ctaStrategy123/strategy/strategyBollChannel.py deleted file mode 100644 index 9474612a..00000000 --- a/examples/VnTrader/ctaStrategy123/strategy/strategyBollChannel.py +++ /dev/null @@ -1,187 +0,0 @@ -# encoding: UTF-8 - -""" -感谢Darwin Quant贡献的策略思路。 -知乎专栏原文:https://zhuanlan.zhihu.com/p/24448511 - -策略逻辑: -1. 布林通道(信号) -2. CCI指标(过滤) -3. ATR指标(止损) - -适合品种:螺纹钢 -适合周期:15分钟 - -这里的策略是作者根据原文结合vn.py实现,对策略实现上做了一些修改,仅供参考。 - -""" - -from __future__ import division - -from vnpy.trader.vtObject import VtBarData -from vnpy.trader.vtConstant import EMPTY_STRING -from vnpy.trader.app.ctaStrategy.ctaTemplate import (CtaTemplate, - BarManager, - ArrayManager) - - -######################################################################## -class BollChannelStrategy(CtaTemplate): - """基于布林通道的交易策略""" - className = 'BollChannelStrategy' - author = u'用Python的交易员' - - # 策略参数 - bollWindow = 18 # 布林通道窗口数 - bollDev = 3.4 # 布林通道的偏差 - cciWindow = 10 # CCI窗口数 - atrWindow = 30 # ATR窗口数 - slMultiplier = 5.2 # 计算止损距离的乘数 - initDays = 10 # 初始化数据所用的天数 - fixedSize = 1 # 每次交易的数量 - - # 策略变量 - bollUp = 0 # 布林通道上轨 - bollDown = 0 # 布林通道下轨 - cciValue = 0 # CCI指标数值 - atrValue = 0 # ATR指标数值 - - intraTradeHigh = 0 # 持仓期内的最高点 - intraTradeLow = 0 # 持仓期内的最低点 - longStop = 0 # 多头止损 - shortStop = 0 # 空头止损 - - # 参数列表,保存了参数的名称 - paramList = ['name', - 'className', - 'author', - 'vtSymbol', - 'bollWindow', - 'bollDev', - 'cciWindow', - 'atrWindow', - 'slMultiplier', - 'initDays', - 'fixedSize'] - - # 变量列表,保存了变量的名称 - varList = ['inited', - 'trading', - 'pos', - 'bollUp', - 'bollDown', - 'cciValue', - 'atrValue', - 'intraTradeHigh', - 'intraTradeLow', - 'longStop', - 'shortStop'] - - #---------------------------------------------------------------------- - def __init__(self, ctaEngine, setting): - """Constructor""" - super(BollChannelStrategy, self).__init__(ctaEngine, setting) - - self.bm = BarManager(self.onBar, 15, self.onXminBar) # 创建K线合成器对象 - self.am = ArrayManager() - - #---------------------------------------------------------------------- - def onInit(self): - """初始化策略(必须由用户继承实现)""" - self.writeCtaLog(u'%s策略初始化' %self.name) - - # 载入历史数据,并采用回放计算的方式初始化策略数值 - initData = self.loadBar(self.initDays) - for bar in initData: - self.onBar(bar) - - self.putEvent() - - #---------------------------------------------------------------------- - def onStart(self): - """启动策略(必须由用户继承实现)""" - self.writeCtaLog(u'%s策略启动' %self.name) - self.putEvent() - - #---------------------------------------------------------------------- - def onStop(self): - """停止策略(必须由用户继承实现)""" - self.writeCtaLog(u'%s策略停止' %self.name) - self.putEvent() - - #---------------------------------------------------------------------- - def onTick(self, tick): - """收到行情TICK推送(必须由用户继承实现)""" - self.bm.updateTick(tick) - - #---------------------------------------------------------------------- - def onBar(self, bar): - """收到Bar推送(必须由用户继承实现)""" - self.bm.updateBar(bar) - - #---------------------------------------------------------------------- - def onXminBar(self, bar): - """收到X分钟K线""" - # 全撤之前发出的委托 - self.cancelAll() - - # 保存K线数据 - am = self.am - - am.updateBar(bar) - - if not am.inited: - return - - # 计算指标数值 - self.bollUp, self.bollDown = am.boll(self.bollWindow, self.bollDev) - self.cciValue = am.cci(self.cciWindow) - self.atrValue = am.atr(self.atrWindow) - - # 判断是否要进行交易 - - # 当前无仓位,发送开仓委托 - if self.pos == 0: - self.intraTradeHigh = bar.high - self.intraTradeLow = bar.low - - if self.cciValue > 0: - self.buy(self.bollUp, self.fixedSize, True) - - elif self.cciValue < 0: - self.short(self.bollDown, self.fixedSize, True) - - # 持有多头仓位 - elif self.pos > 0: - self.intraTradeHigh = max(self.intraTradeHigh, bar.high) - self.intraTradeLow = bar.low - self.longStop = self.intraTradeHigh - self.atrValue * self.slMultiplier - - self.sell(self.longStop, abs(self.pos), True) - - # 持有空头仓位 - elif self.pos < 0: - self.intraTradeHigh = bar.high - self.intraTradeLow = min(self.intraTradeLow, bar.low) - self.shortStop = self.intraTradeLow + self.atrValue * self.slMultiplier - - self.cover(self.shortStop, abs(self.pos), True) - - # 发出状态更新事件 - self.putEvent() - - #---------------------------------------------------------------------- - def onOrder(self, order): - """收到委托变化推送(必须由用户继承实现)""" - pass - - #---------------------------------------------------------------------- - def onTrade(self, trade): - # 发出状态更新事件 - self.putEvent() - - #---------------------------------------------------------------------- - def onStopOrder(self, so): - """停止单推送""" - pass - diff --git a/examples/VnTrader/ctaStrategy123/strategy/strategyDoubleMa.py b/examples/VnTrader/ctaStrategy123/strategy/strategyDoubleMa.py deleted file mode 100644 index 7b9b9ff3..00000000 --- a/examples/VnTrader/ctaStrategy123/strategy/strategyDoubleMa.py +++ /dev/null @@ -1,155 +0,0 @@ -# encoding: UTF-8 - -""" -这里的Demo是一个最简单的双均线策略实现,并未考虑太多实盘中的交易细节,如: -1. 委托价格超出涨跌停价导致的委托失败 -2. 委托未成交,需要撤单后重新委托 -3. 断网后恢复交易状态 -4. 等等 -这些点是作者选择特意忽略不去实现,因此想实盘的朋友请自己多多研究CTA交易的一些细节, -做到了然于胸后再去交易,对自己的money和时间负责。 -也希望社区能做出一个解决了以上潜在风险的Demo出来。 -""" - -from __future__ import division - -from vnpy.trader.vtConstant import EMPTY_STRING, EMPTY_FLOAT -from vnpy.trader.app.ctaStrategy.ctaTemplate import (CtaTemplate, - BarManager, - ArrayManager) - - -######################################################################## -class DoubleMaStrategy(CtaTemplate): - """双指数均线策略Demo""" - className = 'DoubleMaStrategy' - author = u'用Python的交易员' - - # 策略参数 - fastWindow = 10 # 快速均线参数 - slowWindow = 60 # 慢速均线参数 - initDays = 10 # 初始化数据所用的天数 - - # 策略变量 - fastMa0 = EMPTY_FLOAT # 当前最新的快速EMA - fastMa1 = EMPTY_FLOAT # 上一根的快速EMA - - slowMa0 = EMPTY_FLOAT - slowMa1 = EMPTY_FLOAT - - # 参数列表,保存了参数的名称 - paramList = ['name', - 'className', - 'author', - 'vtSymbol', - 'fastWindow', - 'slowWindow'] - - # 变量列表,保存了变量的名称 - varList = ['inited', - 'trading', - 'pos', - 'fastMa0', - 'fastMa1', - 'slowMa0', - 'slowMa1'] - - #---------------------------------------------------------------------- - def __init__(self, ctaEngine, setting): - """Constructor""" - super(DoubleMaStrategy, self).__init__(ctaEngine, setting) - - self.bm = BarManager(self.onBar) - self.am = ArrayManager() - - # 注意策略类中的可变对象属性(通常是list和dict等),在策略初始化时需要重新创建, - # 否则会出现多个策略实例之间数据共享的情况,有可能导致潜在的策略逻辑错误风险, - # 策略类中的这些可变对象属性可以选择不写,全都放在__init__下面,写主要是为了阅读 - # 策略时方便(更多是个编程习惯的选择) - - #---------------------------------------------------------------------- - def onInit(self): - """初始化策略(必须由用户继承实现)""" - self.writeCtaLog(u'双EMA演示策略初始化') - - initData = self.loadBar(self.initDays) - for bar in initData: - self.onBar(bar) - - self.putEvent() - - #---------------------------------------------------------------------- - def onStart(self): - """启动策略(必须由用户继承实现)""" - self.writeCtaLog(u'双EMA演示策略启动') - self.putEvent() - - #---------------------------------------------------------------------- - def onStop(self): - """停止策略(必须由用户继承实现)""" - self.writeCtaLog(u'双EMA演示策略停止') - self.putEvent() - - #---------------------------------------------------------------------- - def onTick(self, tick): - """收到行情TICK推送(必须由用户继承实现)""" - self.bm.updateTick(tick) - - #---------------------------------------------------------------------- - def onBar(self, bar): - """收到Bar推送(必须由用户继承实现)""" - am = self.am - am.updateBar(bar) - if not am.inited: - return - - # 计算快慢均线 - fastMa = am.sma(self.fastWindow, array=True) - self.fastMa0 = fastMa[-1] - self.fastMa1 = fastMa[-2] - - slowMa = am.sma(self.slowWindow, array=True) - self.slowMa0 = slowMa[-1] - self.slowMa1 = slowMa[-2] - - # 判断买卖 - crossOver = self.fastMa0>self.slowMa0 and self.fastMa1self.slowMa1 # 死叉下穿 - - # 金叉和死叉的条件是互斥 - # 所有的委托均以K线收盘价委托(这里有一个实盘中无法成交的风险,考虑添加对模拟市价单类型的支持) - if crossOver: - # 如果金叉时手头没有持仓,则直接做多 - if self.pos == 0: - self.buy(bar.close, 1) - # 如果有空头持仓,则先平空,再做多 - elif self.pos < 0: - self.cover(bar.close, 1) - self.buy(bar.close, 1) - # 死叉和金叉相反 - elif crossBelow: - if self.pos == 0: - self.short(bar.close, 1) - elif self.pos > 0: - self.sell(bar.close, 1) - self.short(bar.close, 1) - - # 发出状态更新事件 - self.putEvent() - - #---------------------------------------------------------------------- - def onOrder(self, order): - """收到委托变化推送(必须由用户继承实现)""" - # 对于无需做细粒度委托控制的策略,可以忽略onOrder - pass - - #---------------------------------------------------------------------- - def onTrade(self, trade): - """收到成交推送(必须由用户继承实现)""" - # 对于无需做细粒度委托控制的策略,可以忽略onOrder - pass - - #---------------------------------------------------------------------- - def onStopOrder(self, so): - """停止单推送""" - pass diff --git a/examples/VnTrader/ctaStrategy123/strategy/strategyDualThrust.py b/examples/VnTrader/ctaStrategy123/strategy/strategyDualThrust.py deleted file mode 100644 index e3809a0f..00000000 --- a/examples/VnTrader/ctaStrategy123/strategy/strategyDualThrust.py +++ /dev/null @@ -1,187 +0,0 @@ -# encoding: UTF-8 - -""" -DualThrust交易策略 -""" - -from datetime import time - -from vnpy.trader.vtObject import VtBarData -from vnpy.trader.vtConstant import EMPTY_STRING -from vnpy.trader.app.ctaStrategy.ctaTemplate import CtaTemplate, BarManager - - -######################################################################## -class DualThrustStrategy(CtaTemplate): - """DualThrust交易策略""" - className = 'DualThrustStrategy' - author = u'用Python的交易员' - - # 策略参数 - fixedSize = 100 - k1 = 0.4 - k2 = 0.6 - - initDays = 10 - - # 策略变量 - barList = [] # K线对象的列表 - - dayOpen = 0 - dayHigh = 0 - dayLow = 0 - - range = 0 - longEntry = 0 - shortEntry = 0 - exitTime = time(hour=14, minute=55) - - longEntered = False - shortEntered = False - - # 参数列表,保存了参数的名称 - paramList = ['name', - 'className', - 'author', - 'vtSymbol', - 'k1', - 'k2'] - - # 变量列表,保存了变量的名称 - varList = ['inited', - 'trading', - 'pos', - 'range', - 'longEntry', - 'shortEntry', - 'exitTime'] - - #---------------------------------------------------------------------- - def __init__(self, ctaEngine, setting): - """Constructor""" - super(DualThrustStrategy, self).__init__(ctaEngine, setting) - - self.bm = BarManager(self.onBar) - self.barList = [] - - #---------------------------------------------------------------------- - def onInit(self): - """初始化策略(必须由用户继承实现)""" - self.writeCtaLog(u'%s策略初始化' %self.name) - - # 载入历史数据,并采用回放计算的方式初始化策略数值 - initData = self.loadBar(self.initDays) - for bar in initData: - self.onBar(bar) - - self.putEvent() - - #---------------------------------------------------------------------- - def onStart(self): - """启动策略(必须由用户继承实现)""" - self.writeCtaLog(u'%s策略启动' %self.name) - self.putEvent() - - #---------------------------------------------------------------------- - def onStop(self): - """停止策略(必须由用户继承实现)""" - self.writeCtaLog(u'%s策略停止' %self.name) - self.putEvent() - - #---------------------------------------------------------------------- - def onTick(self, tick): - """收到行情TICK推送(必须由用户继承实现)""" - self.bm.updateTick(tick) - - #---------------------------------------------------------------------- - def onBar(self, bar): - """收到Bar推送(必须由用户继承实现)""" - # 撤销之前发出的尚未成交的委托(包括限价单和停止单) - self.cancelAll() - - # 计算指标数值 - self.barList.append(bar) - - if len(self.barList) <= 2: - return - else: - self.barList.pop(0) - lastBar = self.barList[-2] - - # 新的一天 - if lastBar.datetime.date() != bar.datetime.date(): - # 如果已经初始化 - if self.dayHigh: - self.range = self.dayHigh - self.dayLow - self.longEntry = bar.open + self.k1 * self.range - self.shortEntry = bar.open - self.k2 * self.range - - self.dayOpen = bar.open - self.dayHigh = bar.high - self.dayLow = bar.low - - self.longEntered = False - self.shortEntered = False - else: - self.dayHigh = max(self.dayHigh, bar.high) - self.dayLow = min(self.dayLow, bar.low) - - # 尚未到收盘 - if not self.range: - return - - if bar.datetime.time() < self.exitTime: - if self.pos == 0: - if bar.close > self.dayOpen: - if not self.longEntered: - self.buy(self.longEntry, self.fixedSize, stop=True) - else: - if not self.shortEntered: - self.short(self.shortEntry, self.fixedSize, stop=True) - - # 持有多头仓位 - elif self.pos > 0: - self.longEntered = True - - # 多头止损单 - self.sell(self.shortEntry, self.fixedSize, stop=True) - - # 空头开仓单 - if not self.shortEntered: - self.short(self.shortEntry, self.fixedSize, stop=True) - - # 持有空头仓位 - elif self.pos < 0: - self.shortEntered = True - - # 空头止损单 - self.cover(self.longEntry, self.fixedSize, stop=True) - - # 多头开仓单 - if not self.longEntered: - self.buy(self.longEntry, self.fixedSize, stop=True) - - # 收盘平仓 - else: - if self.pos > 0: - self.sell(bar.close * 0.99, abs(self.pos)) - elif self.pos < 0: - self.cover(bar.close * 1.01, abs(self.pos)) - - # 发出状态更新事件 - self.putEvent() - - #---------------------------------------------------------------------- - def onOrder(self, order): - """收到委托变化推送(必须由用户继承实现)""" - pass - - #---------------------------------------------------------------------- - def onTrade(self, trade): - # 发出状态更新事件 - self.putEvent() - - #---------------------------------------------------------------------- - def onStopOrder(self, so): - """停止单推送""" - pass \ No newline at end of file diff --git a/examples/VnTrader/ctaStrategy123/strategy/strategyKingKeltner.py b/examples/VnTrader/ctaStrategy123/strategy/strategyKingKeltner.py deleted file mode 100644 index 702b5e3c..00000000 --- a/examples/VnTrader/ctaStrategy123/strategy/strategyKingKeltner.py +++ /dev/null @@ -1,198 +0,0 @@ -# encoding: UTF-8 - -""" -基于King Keltner通道的交易策略,适合用在股指上, -展示了OCO委托和5分钟K线聚合的方法。 - -注意事项: -1. 作者不对交易盈利做任何保证,策略代码仅供参考 -2. 本策略需要用到talib,没有安装的用户请先参考www.vnpy.org上的教程安装 -3. 将IF0000_1min.csv用ctaHistoryData.py导入MongoDB后,直接运行本文件即可回测策略 -""" - -from __future__ import division - -from vnpy.trader.vtObject import VtBarData -from vnpy.trader.vtConstant import EMPTY_STRING -from vnpy.trader.app.ctaStrategy.ctaTemplate import (CtaTemplate, - BarManager, - ArrayManager) - - -######################################################################## -class KkStrategy(CtaTemplate): - """基于King Keltner通道的交易策略""" - className = 'KkStrategy' - author = u'用Python的交易员' - - # 策略参数 - kkLength = 11 # 计算通道中值的窗口数 - kkDev = 1.6 # 计算通道宽度的偏差 - trailingPrcnt = 0.8 # 移动止损 - initDays = 10 # 初始化数据所用的天数 - fixedSize = 1 # 每次交易的数量 - - # 策略变量 - kkUp = 0 # KK通道上轨 - kkDown = 0 # KK通道下轨 - intraTradeHigh = 0 # 持仓期内的最高点 - intraTradeLow = 0 # 持仓期内的最低点 - - buyOrderIDList = [] # OCO委托买入开仓的委托号 - shortOrderIDList = [] # OCO委托卖出开仓的委托号 - orderList = [] # 保存委托代码的列表 - - # 参数列表,保存了参数的名称 - paramList = ['name', - 'className', - 'author', - 'vtSymbol', - 'kkLength', - 'kkDev'] - - # 变量列表,保存了变量的名称 - varList = ['inited', - 'trading', - 'pos', - 'kkUp', - 'kkDown'] - - #---------------------------------------------------------------------- - def __init__(self, ctaEngine, setting): - """Constructor""" - super(KkStrategy, self).__init__(ctaEngine, setting) - - self.bm = BarManager(self.onBar, 5, self.onFiveBar) # 创建K线合成器对象 - self.am = ArrayManager() - - self.buyOrderIDList = [] - self.shortOrderIDList = [] - self.orderList = [] - - #---------------------------------------------------------------------- - def onInit(self): - """初始化策略(必须由用户继承实现)""" - self.writeCtaLog(u'%s策略初始化' %self.name) - - # 载入历史数据,并采用回放计算的方式初始化策略数值 - initData = self.loadBar(self.initDays) - for bar in initData: - self.onBar(bar) - - self.putEvent() - - #---------------------------------------------------------------------- - def onStart(self): - """启动策略(必须由用户继承实现)""" - self.writeCtaLog(u'%s策略启动' %self.name) - self.putEvent() - - #---------------------------------------------------------------------- - def onStop(self): - """停止策略(必须由用户继承实现)""" - self.writeCtaLog(u'%s策略停止' %self.name) - self.putEvent() - - #---------------------------------------------------------------------- - def onTick(self, tick): - """收到行情TICK推送(必须由用户继承实现)""" - self.bm.updateTick(tick) - - #---------------------------------------------------------------------- - def onBar(self, bar): - """收到Bar推送(必须由用户继承实现)""" - self.bm.updateBar(bar) - - #---------------------------------------------------------------------- - def onFiveBar(self, bar): - """收到5分钟K线""" - # 撤销之前发出的尚未成交的委托(包括限价单和停止单) - for orderID in self.orderList: - self.cancelOrder(orderID) - self.orderList = [] - - # 保存K线数据 - am = self.am - am.updateBar(bar) - if not am.inited: - return - - # 计算指标数值 - self.kkUp, self.kkDown = am.keltner(self.kkLength, self.kkDev) - - # 判断是否要进行交易 - - # 当前无仓位,发送OCO开仓委托 - if self.pos == 0: - self.intraTradeHigh = bar.high - self.intraTradeLow = bar.low - self.sendOcoOrder(self.kkUp, self.kkDown, self.fixedSize) - - # 持有多头仓位 - elif self.pos > 0: - self.intraTradeHigh = max(self.intraTradeHigh, bar.high) - self.intraTradeLow = bar.low - - l = self.sell(self.intraTradeHigh*(1-self.trailingPrcnt/100), - abs(self.pos), True) - self.orderList.extend(l) - - # 持有空头仓位 - elif self.pos < 0: - self.intraTradeHigh = bar.high - self.intraTradeLow = min(self.intraTradeLow, bar.low) - - l = self.cover(self.intraTradeLow*(1+self.trailingPrcnt/100), - abs(self.pos), True) - self.orderList.extend(l) - - # 发出状态更新事件 - self.putEvent() - - #---------------------------------------------------------------------- - def onOrder(self, order): - """收到委托变化推送(必须由用户继承实现)""" - pass - - #---------------------------------------------------------------------- - def onTrade(self, trade): - if self.pos != 0: - # 多头开仓成交后,撤消空头委托 - if self.pos > 0: - for shortOrderID in self.shortOrderIDList: - self.cancelOrder(shortOrderID) - # 反之同样 - elif self.pos < 0: - for buyOrderID in self.buyOrderIDList: - self.cancelOrder(buyOrderID) - - # 移除委托号 - for orderID in (self.buyOrderIDList + self.shortOrderIDList): - if orderID in self.orderList: - self.orderList.remove(orderID) - - # 发出状态更新事件 - self.putEvent() - - #---------------------------------------------------------------------- - def sendOcoOrder(self, buyPrice, shortPrice, volume): - """ - 发送OCO委托 - - OCO(One Cancel Other)委托: - 1. 主要用于实现区间突破入场 - 2. 包含两个方向相反的停止单 - 3. 一个方向的停止单成交后会立即撤消另一个方向的 - """ - # 发送双边的停止单委托,并记录委托号 - self.buyOrderIDList = self.buy(buyPrice, volume, True) - self.shortOrderIDList = self.short(shortPrice, volume, True) - - # 将委托号记录到列表中 - self.orderList.extend(self.buyOrderIDList) - self.orderList.extend(self.shortOrderIDList) - - #---------------------------------------------------------------------- - def onStopOrder(self, so): - """停止单推送""" - pass \ No newline at end of file diff --git a/examples/VnTrader/ctaStrategy123/uiCtaWidget.py b/examples/VnTrader/ctaStrategy123/uiCtaWidget.py deleted file mode 100644 index 6c8421b3..00000000 --- a/examples/VnTrader/ctaStrategy123/uiCtaWidget.py +++ /dev/null @@ -1,270 +0,0 @@ -# encoding: UTF-8 - -''' -CTA模块相关的GUI控制组件 -''' - - -from vnpy.event import Event -from vnpy.trader.vtEvent import * -from vnpy.trader.uiBasicWidget import QtGui, QtCore, QtWidgets, BasicCell - -from .ctaBase import EVENT_CTA_LOG, EVENT_CTA_STRATEGY -from .language import text - - -######################################################################## -class CtaValueMonitor(QtWidgets.QTableWidget): - """参数监控""" - - #---------------------------------------------------------------------- - def __init__(self, parent=None): - """Constructor""" - super(CtaValueMonitor, self).__init__(parent) - - self.keyCellDict = {} - self.data = None - self.inited = False - - self.initUi() - - #---------------------------------------------------------------------- - def initUi(self): - """初始化界面""" - self.setRowCount(1) - self.verticalHeader().setVisible(False) - self.setEditTriggers(self.NoEditTriggers) - - self.setMaximumHeight(self.sizeHint().height()) - - #---------------------------------------------------------------------- - def updateData(self, data): - """更新数据""" - if not self.inited: - self.setColumnCount(len(data)) - self.setHorizontalHeaderLabels(data.keys()) - - col = 0 - for k, v in data.items(): - cell = QtWidgets.QTableWidgetItem(unicode(v)) - self.keyCellDict[k] = cell - self.setItem(0, col, cell) - col += 1 - - self.inited = True - else: - for k, v in data.items(): - cell = self.keyCellDict[k] - cell.setText(unicode(v)) - - -######################################################################## -class CtaStrategyManager(QtWidgets.QGroupBox): - """策略管理组件""" - signal = QtCore.Signal(type(Event())) - - #---------------------------------------------------------------------- - def __init__(self, ctaEngine, eventEngine, name, parent=None): - """Constructor""" - super(CtaStrategyManager, self).__init__(parent) - - self.ctaEngine = ctaEngine - self.eventEngine = eventEngine - self.name = name - - self.initUi() - self.updateMonitor() - self.registerEvent() - - #---------------------------------------------------------------------- - def initUi(self): - """初始化界面""" - self.setTitle(self.name) - - self.paramMonitor = CtaValueMonitor(self) - self.varMonitor = CtaValueMonitor(self) - - height = 65 - self.paramMonitor.setFixedHeight(height) - self.varMonitor.setFixedHeight(height) - - buttonInit = QtWidgets.QPushButton(text.INIT) - buttonStart = QtWidgets.QPushButton(text.START) - buttonStop = QtWidgets.QPushButton(text.STOP) - buttonInit.clicked.connect(self.init) - buttonStart.clicked.connect(self.start) - buttonStop.clicked.connect(self.stop) - - hbox1 = QtWidgets.QHBoxLayout() - hbox1.addWidget(buttonInit) - hbox1.addWidget(buttonStart) - hbox1.addWidget(buttonStop) - hbox1.addStretch() - - hbox2 = QtWidgets.QHBoxLayout() - hbox2.addWidget(self.paramMonitor) - - hbox3 = QtWidgets.QHBoxLayout() - hbox3.addWidget(self.varMonitor) - - vbox = QtWidgets.QVBoxLayout() - vbox.addLayout(hbox1) - vbox.addLayout(hbox2) - vbox.addLayout(hbox3) - - self.setLayout(vbox) - - #---------------------------------------------------------------------- - def updateMonitor(self, event=None): - """显示策略最新状态""" - paramDict = self.ctaEngine.getStrategyParam(self.name) - if paramDict: - self.paramMonitor.updateData(paramDict) - - varDict = self.ctaEngine.getStrategyVar(self.name) - if varDict: - self.varMonitor.updateData(varDict) - - #---------------------------------------------------------------------- - def registerEvent(self): - """注册事件监听""" - self.signal.connect(self.updateMonitor) - self.eventEngine.register(EVENT_CTA_STRATEGY+self.name, self.signal.emit) - - #---------------------------------------------------------------------- - def init(self): - """初始化策略""" - self.ctaEngine.initStrategy(self.name) - - #---------------------------------------------------------------------- - def start(self): - """启动策略""" - self.ctaEngine.startStrategy(self.name) - - #---------------------------------------------------------------------- - def stop(self): - """停止策略""" - self.ctaEngine.stopStrategy(self.name) - - -######################################################################## -class CtaEngineManager(QtWidgets.QWidget): - """CTA引擎管理组件""" - signal = QtCore.Signal(type(Event())) - - #---------------------------------------------------------------------- - def __init__(self, ctaEngine, eventEngine, parent=None): - """Constructor""" - super(CtaEngineManager, self).__init__(parent) - - self.ctaEngine = ctaEngine - self.eventEngine = eventEngine - - self.strategyLoaded = False - - self.initUi() - self.registerEvent() - - # 记录日志 - self.ctaEngine.writeCtaLog(text.CTA_ENGINE_STARTED) - - #---------------------------------------------------------------------- - def initUi(self): - """初始化界面""" - self.setWindowTitle(text.CTA_STRATEGY) - - # 按钮 - loadButton = QtWidgets.QPushButton(text.LOAD_STRATEGY) - initAllButton = QtWidgets.QPushButton(text.INIT_ALL) - startAllButton = QtWidgets.QPushButton(text.START_ALL) - stopAllButton = QtWidgets.QPushButton(text.STOP_ALL) - - loadButton.clicked.connect(self.load) - initAllButton.clicked.connect(self.initAll) - startAllButton.clicked.connect(self.startAll) - stopAllButton.clicked.connect(self.stopAll) - - # 滚动区域,放置所有的CtaStrategyManager - self.scrollArea = QtWidgets.QScrollArea() - self.scrollArea.setWidgetResizable(True) - - # CTA组件的日志监控 - self.ctaLogMonitor = QtWidgets.QTextEdit() - self.ctaLogMonitor.setReadOnly(True) - self.ctaLogMonitor.setMaximumHeight(200) - - # 设置布局 - hbox2 = QtWidgets.QHBoxLayout() - hbox2.addWidget(loadButton) - hbox2.addWidget(initAllButton) - hbox2.addWidget(startAllButton) - hbox2.addWidget(stopAllButton) - hbox2.addStretch() - - vbox = QtWidgets.QVBoxLayout() - vbox.addLayout(hbox2) - vbox.addWidget(self.scrollArea) - vbox.addWidget(self.ctaLogMonitor) - self.setLayout(vbox) - - #---------------------------------------------------------------------- - def initStrategyManager(self): - """初始化策略管理组件界面""" - w = QtWidgets.QWidget() - vbox = QtWidgets.QVBoxLayout() - - for name in self.ctaEngine.strategyDict.keys(): - strategyManager = CtaStrategyManager(self.ctaEngine, self.eventEngine, name) - vbox.addWidget(strategyManager) - - vbox.addStretch() - - w.setLayout(vbox) - self.scrollArea.setWidget(w) - - #---------------------------------------------------------------------- - def initAll(self): - """全部初始化""" - self.ctaEngine.initAll() - - #---------------------------------------------------------------------- - def startAll(self): - """全部启动""" - self.ctaEngine.startAll() - - #---------------------------------------------------------------------- - def stopAll(self): - """全部停止""" - self.ctaEngine.stopAll() - - #---------------------------------------------------------------------- - def load(self): - """加载策略""" - if not self.strategyLoaded: - self.ctaEngine.loadSetting() - self.initStrategyManager() - self.strategyLoaded = True - self.ctaEngine.writeCtaLog(text.STRATEGY_LOADED) - - #---------------------------------------------------------------------- - def updateCtaLog(self, event): - """更新CTA相关日志""" - log = event.dict_['data'] - content = '\t'.join([log.logTime, log.logContent]) - self.ctaLogMonitor.append(content) - - #---------------------------------------------------------------------- - def registerEvent(self): - """注册事件监听""" - self.signal.connect(self.updateCtaLog) - self.eventEngine.register(EVENT_CTA_LOG, self.signal.emit) - - - - - - - - - - \ No newline at end of file