[Del]移除错误添加:ctaStrategy123

This commit is contained in:
vn.py 2017-11-28 09:18:04 +08:00
parent c6979d8a78
commit 9ce1039caf
22 changed files with 0 additions and 4440 deletions

View File

@ -1,19 +0,0 @@
[
{
"name": "double ema",
"className": "EmaDemoStrategy",
"vtSymbol": "IF1706"
},
{
"name": "atr rsi",
"className": "AtrRsiStrategy",
"vtSymbol": "IC1706"
},
{
"name": "king keltner",
"className": "KkStrategy",
"vtSymbol": "IH1706"
}
]

View File

@ -1,10 +0,0 @@
# encoding: UTF-8
from ctaEngine import CtaEngine
from uiCtaWidget import CtaEngineManager
appName = 'CtaStrategy'
appDisplayName = u'CTA策略'
appEngine = CtaEngine
appWidget = CtaEngineManager
appIco = 'cta.ico'

Binary file not shown.

Before

Width:  |  Height:  |  Size: 66 KiB

File diff suppressed because it is too large Load Diff

View File

@ -1,58 +0,0 @@
# encoding: UTF-8
'''
本文件中包含了CTA模块中用到的一些基础设置类和常量等
'''
# CTA引擎中涉及的数据类定义
from vnpy.trader.vtConstant import EMPTY_UNICODE, EMPTY_STRING, EMPTY_FLOAT, EMPTY_INT
# 常量定义
# CTA引擎中涉及到的交易方向类型
CTAORDER_BUY = u'买开'
CTAORDER_SELL = u'卖平'
CTAORDER_SHORT = u'卖开'
CTAORDER_COVER = u'买平'
# 本地停止单状态
STOPORDER_WAITING = u'等待中'
STOPORDER_CANCELLED = u'已撤销'
STOPORDER_TRIGGERED = u'已触发'
# 本地停止单前缀
STOPORDERPREFIX = 'CtaStopOrder.'
# 数据库名称
SETTING_DB_NAME = 'VnTrader_Setting_Db'
POSITION_DB_NAME = 'VnTrader_Position_Db'
TICK_DB_NAME = 'VnTrader_Tick_Db'
DAILY_DB_NAME = 'VnTrader_Daily_Db'
MINUTE_DB_NAME = 'VnTrader_1Min_Db'
# 引擎类型,用于区分当前策略的运行环境
ENGINETYPE_BACKTESTING = 'backtesting' # 回测
ENGINETYPE_TRADING = 'trading' # 实盘
# CTA模块事件
EVENT_CTA_LOG = 'eCtaLog' # CTA相关的日志事件
EVENT_CTA_STRATEGY = 'eCtaStrategy.' # CTA策略状态变化事件
########################################################################
class StopOrder(object):
"""本地停止单"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.vtSymbol = EMPTY_STRING
self.orderType = EMPTY_UNICODE
self.direction = EMPTY_UNICODE
self.offset = EMPTY_UNICODE
self.price = EMPTY_FLOAT
self.volume = EMPTY_INT
self.strategy = None # 下停止单的策略对象
self.stopOrderID = EMPTY_STRING # 停止单的本地编号
self.status = EMPTY_STRING # 停止单状态

View File

@ -1,631 +0,0 @@
# encoding: UTF-8
'''
本文件中实现了CTA策略引擎针对CTA类型的策略抽象简化了部分底层接口的功能
关于平今和平昨规则
1. 普通的平仓OFFSET_CLOSET等于平昨OFFSET_CLOSEYESTERDAY
2. 只有上期所的品种需要考虑平今和平昨的区别
3. 当上期所的期货有今仓时调用Sell和Cover会使用OFFSET_CLOSETODAY否则
会使用OFFSET_CLOSE
4. 以上设计意味着如果Sell和Cover的数量超过今日持仓量时会导致出错即用户
希望通过一个指令同时平今和平昨
5. 采用以上设计的原因是考虑到vn.trader的用户主要是对TBMC和金字塔类的平台
感到功能不足的用户即希望更高频的交易交易策略不应该出现4中所述的情况
6. 对于想要实现4中所述情况的用户需要实现一个策略信号引擎和交易委托引擎分开
的定制化统结构没错得自己写
'''
from __future__ import division
import json
import os
import traceback
from collections import OrderedDict
from datetime import datetime, timedelta
from vnpy.event import Event
from vnpy.trader.vtEvent import *
from vnpy.trader.vtConstant import *
from vnpy.trader.vtObject import VtTickData, VtBarData
from vnpy.trader.vtGateway import VtSubscribeReq, VtOrderReq, VtCancelOrderReq, VtLogData
from vnpy.trader.vtFunction import todayDate, getJsonPath
from .ctaBase import *
from .strategy import STRATEGY_CLASS
########################################################################
class CtaEngine(object):
"""CTA策略引擎"""
settingFileName = 'CTA_setting.json'
settingfilePath = getJsonPath(settingFileName, __file__)
STATUS_FINISHED = set([STATUS_REJECTED, STATUS_CANCELLED, STATUS_ALLTRADED])
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
# 当前日期
self.today = todayDate()
# 保存策略实例的字典
# key为策略名称value为策略实例注意策略名称不允许重复
self.strategyDict = {}
# 保存vtSymbol和策略实例映射的字典用于推送tick数据
# 由于可能多个strategy交易同一个vtSymbol因此key为vtSymbol
# value为包含所有相关strategy对象的list
self.tickStrategyDict = {}
# 保存vtOrderID和strategy对象映射的字典用于推送order和trade数据
# key为vtOrderIDvalue为strategy对象
self.orderStrategyDict = {}
# 本地停止单编号计数
self.stopOrderCount = 0
# stopOrderID = STOPORDERPREFIX + str(stopOrderCount)
# 本地停止单字典
# key为stopOrderIDvalue为stopOrder对象
self.stopOrderDict = {} # 停止单撤销后不会从本字典中删除
self.workingStopOrderDict = {} # 停止单撤销后会从本字典中删除
# 保存策略名称和委托号列表的字典
# key为namevalue为保存orderID限价+本地停止)的集合
self.strategyOrderDict = {}
# 成交号集合,用来过滤已经收到过的成交推送
self.tradeSet = set()
# 引擎类型为实盘
self.engineType = ENGINETYPE_TRADING
# 注册日式事件类型
self.mainEngine.registerLogEvent(EVENT_CTA_LOG)
# 注册事件监听
self.registerEvent()
#----------------------------------------------------------------------
def sendOrder(self, vtSymbol, orderType, price, volume, strategy):
"""发单"""
contract = self.mainEngine.getContract(vtSymbol)
req = VtOrderReq()
req.symbol = contract.symbol
req.exchange = contract.exchange
req.vtSymbol = contract.vtSymbol
req.price = self.roundToPriceTick(contract.priceTick, price)
req.volume = volume
req.productClass = strategy.productClass
req.currency = strategy.currency
# 设计为CTA引擎发出的委托只允许使用限价单
req.priceType = PRICETYPE_LIMITPRICE
# CTA委托类型映射
if orderType == CTAORDER_BUY:
req.direction = DIRECTION_LONG
req.offset = OFFSET_OPEN
elif orderType == CTAORDER_SELL:
req.direction = DIRECTION_SHORT
req.offset = OFFSET_CLOSE
elif orderType == CTAORDER_SHORT:
req.direction = DIRECTION_SHORT
req.offset = OFFSET_OPEN
elif orderType == CTAORDER_COVER:
req.direction = DIRECTION_LONG
req.offset = OFFSET_CLOSE
# 委托转换
reqList = self.mainEngine.convertOrderReq(req)
vtOrderIDList = []
if not reqList:
return vtOrderIDList
for convertedReq in reqList:
vtOrderID = self.mainEngine.sendOrder(convertedReq, contract.gatewayName) # 发单
self.orderStrategyDict[vtOrderID] = strategy # 保存vtOrderID和策略的映射关系
self.strategyOrderDict[strategy.name].add(vtOrderID) # 添加到策略委托号集合中
vtOrderIDList.append(vtOrderID)
self.writeCtaLog(u'策略%s发送委托,%s%s%s@%s'
%(strategy.name, vtSymbol, req.direction, volume, price))
return vtOrderIDList
#----------------------------------------------------------------------
def cancelOrder(self, vtOrderID):
"""撤单"""
# 查询报单对象
order = self.mainEngine.getOrder(vtOrderID)
# 如果查询成功
if order:
# 检查是否报单还有效,只有有效时才发出撤单指令
orderFinished = (order.status==STATUS_ALLTRADED or order.status==STATUS_CANCELLED)
if not orderFinished:
req = VtCancelOrderReq()
req.symbol = order.symbol
req.exchange = order.exchange
req.frontID = order.frontID
req.sessionID = order.sessionID
req.orderID = order.orderID
self.mainEngine.cancelOrder(req, order.gatewayName)
#----------------------------------------------------------------------
def sendStopOrder(self, vtSymbol, orderType, price, volume, strategy):
"""发停止单(本地实现)"""
self.stopOrderCount += 1
stopOrderID = STOPORDERPREFIX + str(self.stopOrderCount)
so = StopOrder()
so.vtSymbol = vtSymbol
so.orderType = orderType
so.price = price
so.volume = volume
so.strategy = strategy
so.stopOrderID = stopOrderID
so.status = STOPORDER_WAITING
if orderType == CTAORDER_BUY:
so.direction = DIRECTION_LONG
so.offset = OFFSET_OPEN
elif orderType == CTAORDER_SELL:
so.direction = DIRECTION_SHORT
so.offset = OFFSET_CLOSE
elif orderType == CTAORDER_SHORT:
so.direction = DIRECTION_SHORT
so.offset = OFFSET_OPEN
elif orderType == CTAORDER_COVER:
so.direction = DIRECTION_LONG
so.offset = OFFSET_CLOSE
# 保存stopOrder对象到字典中
self.stopOrderDict[stopOrderID] = so
self.workingStopOrderDict[stopOrderID] = so
# 保存stopOrderID到策略委托号集合中
self.strategyOrderDict[strategy.name].add(stopOrderID)
# 推送停止单状态
strategy.onStopOrder(so)
return [stopOrderID]
#----------------------------------------------------------------------
def cancelStopOrder(self, stopOrderID):
"""撤销停止单"""
# 检查停止单是否存在
if stopOrderID in self.workingStopOrderDict:
so = self.workingStopOrderDict[stopOrderID]
strategy = so.strategy
# 更改停止单状态为已撤销
so.status = STOPORDER_CANCELLED
# 从活动停止单字典中移除
del self.workingStopOrderDict[stopOrderID]
# 从策略委托号集合中移除
s = self.strategyOrderDict[strategy.name]
if stopOrderID in s:
s.remove(stopOrderID)
# 通知策略
strategy.onStopOrder(so)
#----------------------------------------------------------------------
def processStopOrder(self, tick):
"""收到行情后处理本地停止单(检查是否要立即发出)"""
vtSymbol = tick.vtSymbol
# 首先检查是否有策略交易该合约
if vtSymbol in self.tickStrategyDict:
# 遍历等待中的停止单,检查是否会被触发
for so in self.workingStopOrderDict.values():
if so.vtSymbol == vtSymbol:
longTriggered = so.direction==DIRECTION_LONG and tick.lastPrice>=so.price # 多头停止单被触发
shortTriggered = so.direction==DIRECTION_SHORT and tick.lastPrice<=so.price # 空头停止单被触发
if longTriggered or shortTriggered:
# 买入和卖出分别以涨停跌停价发单(模拟市价单)
if so.direction==DIRECTION_LONG:
price = tick.upperLimit
else:
price = tick.lowerLimit
# 发出市价委托
self.sendOrder(so.vtSymbol, so.orderType, price, so.volume, so.strategy)
# 从活动停止单字典中移除该停止单
del self.workingStopOrderDict[so.stopOrderID]
# 从策略委托号集合中移除
s = self.strategyOrderDict[so.strategy.name]
if so.stopOrderID in s:
s.remove(so.stopOrderID)
# 更新停止单状态,并通知策略
so.status = STOPORDER_TRIGGERED
so.strategy.onStopOrder(so)
#----------------------------------------------------------------------
def processTickEvent(self, event):
"""处理行情推送"""
tick = event.dict_['data']
# 收到tick行情后先处理本地停止单检查是否要立即发出
self.processStopOrder(tick)
# 推送tick到对应的策略实例进行处理
if tick.vtSymbol in self.tickStrategyDict:
# tick时间可能出现异常数据使用try...except实现捕捉和过滤
try:
# 添加datetime字段
if not tick.datetime:
tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')
except ValueError:
self.writeCtaLog(traceback.format_exc())
return
# 逐个推送到策略实例中
l = self.tickStrategyDict[tick.vtSymbol]
for strategy in l:
self.callStrategyFunc(strategy, strategy.onTick, tick)
#----------------------------------------------------------------------
def processOrderEvent(self, event):
"""处理委托推送"""
order = event.dict_['data']
vtOrderID = order.vtOrderID
if vtOrderID in self.orderStrategyDict:
strategy = self.orderStrategyDict[vtOrderID]
# 如果委托已经完成(拒单、撤销、全成),则从活动委托集合中移除
if order.status in self.STATUS_FINISHED:
s = self.strategyOrderDict[strategy.name]
if vtOrderID in s:
s.remove(vtOrderID)
self.callStrategyFunc(strategy, strategy.onOrder, order)
#----------------------------------------------------------------------
def processTradeEvent(self, event):
"""处理成交推送"""
trade = event.dict_['data']
# 过滤已经收到过的成交回报
if trade.vtTradeID in self.tradeSet:
return
self.tradeSet.add(trade.vtTradeID)
# 将成交推送到策略对象中
if trade.vtOrderID in self.orderStrategyDict:
strategy = self.orderStrategyDict[trade.vtOrderID]
# 计算策略持仓
if trade.direction == DIRECTION_LONG:
strategy.pos += trade.volume
else:
strategy.pos -= trade.volume
self.callStrategyFunc(strategy, strategy.onTrade, trade)
# 保存策略持仓到数据库
self.savePosition(strategy)
#----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.eventEngine.register(EVENT_TICK, self.processTickEvent)
self.eventEngine.register(EVENT_ORDER, self.processOrderEvent)
self.eventEngine.register(EVENT_TRADE, self.processTradeEvent)
#----------------------------------------------------------------------
def insertData(self, dbName, collectionName, data):
"""插入数据到数据库这里的data可以是VtTickData或者VtBarData"""
self.mainEngine.dbInsert(dbName, collectionName, data.__dict__)
#----------------------------------------------------------------------
def loadBar(self, dbName, collectionName, days):
"""从数据库中读取Bar数据startDate是datetime对象"""
startDate = self.today - timedelta(days)
d = {'datetime':{'$gte':startDate}}
barData = self.mainEngine.dbQuery(dbName, collectionName, d, 'datetime')
l = []
for d in barData:
bar = VtBarData()
bar.__dict__ = d
l.append(bar)
return l
#----------------------------------------------------------------------
def loadTick(self, dbName, collectionName, days):
"""从数据库中读取Tick数据startDate是datetime对象"""
startDate = self.today - timedelta(days)
d = {'datetime':{'$gte':startDate}}
tickData = self.mainEngine.dbQuery(dbName, collectionName, d, 'datetime')
l = []
for d in tickData:
tick = VtTickData()
tick.__dict__ = d
l.append(tick)
return l
#----------------------------------------------------------------------
def writeCtaLog(self, content):
"""快速发出CTA模块日志事件"""
log = VtLogData()
log.logContent = content
log.gatewayName = 'CTA_STRATEGY'
event = Event(type_=EVENT_CTA_LOG)
event.dict_['data'] = log
self.eventEngine.put(event)
#----------------------------------------------------------------------
def loadStrategy(self, setting):
"""载入策略"""
try:
name = setting['name']
className = setting['className']
except Exception, e:
self.writeCtaLog(u'载入策略出错:%s' %e)
return
# 获取策略类
strategyClass = STRATEGY_CLASS.get(className, None)
if not strategyClass:
self.writeCtaLog(u'找不到策略类:%s' %className)
return
# 防止策略重名
if name in self.strategyDict:
self.writeCtaLog(u'策略实例重名:%s' %name)
else:
# 创建策略实例
strategy = strategyClass(self, setting)
self.strategyDict[name] = strategy
# 创建委托号列表
self.strategyOrderDict[name] = set()
# 保存Tick映射关系
if strategy.vtSymbol in self.tickStrategyDict:
l = self.tickStrategyDict[strategy.vtSymbol]
else:
l = []
self.tickStrategyDict[strategy.vtSymbol] = l
l.append(strategy)
# 订阅合约
contract = self.mainEngine.getContract(strategy.vtSymbol)
if contract:
req = VtSubscribeReq()
req.symbol = contract.symbol
req.exchange = contract.exchange
# 对于IB接口订阅行情时所需的货币和产品类型从策略属性中获取
req.currency = strategy.currency
req.productClass = strategy.productClass
self.mainEngine.subscribe(req, contract.gatewayName)
else:
self.writeCtaLog(u'%s的交易合约%s无法找到' %(name, strategy.vtSymbol))
#----------------------------------------------------------------------
def initStrategy(self, name):
"""初始化策略"""
if name in self.strategyDict:
strategy = self.strategyDict[name]
if not strategy.inited:
strategy.inited = True
self.callStrategyFunc(strategy, strategy.onInit)
else:
self.writeCtaLog(u'请勿重复初始化策略实例:%s' %name)
else:
self.writeCtaLog(u'策略实例不存在:%s' %name)
#---------------------------------------------------------------------
def startStrategy(self, name):
"""启动策略"""
if name in self.strategyDict:
strategy = self.strategyDict[name]
if strategy.inited and not strategy.trading:
strategy.trading = True
self.callStrategyFunc(strategy, strategy.onStart)
else:
self.writeCtaLog(u'策略实例不存在:%s' %name)
#----------------------------------------------------------------------
def stopStrategy(self, name):
"""停止策略"""
if name in self.strategyDict:
strategy = self.strategyDict[name]
if strategy.trading:
strategy.trading = False
self.callStrategyFunc(strategy, strategy.onStop)
# 对该策略发出的所有限价单进行撤单
for vtOrderID, s in self.orderStrategyDict.items():
if s is strategy:
self.cancelOrder(vtOrderID)
# 对该策略发出的所有本地停止单撤单
for stopOrderID, so in self.workingStopOrderDict.items():
if so.strategy is strategy:
self.cancelStopOrder(stopOrderID)
else:
self.writeCtaLog(u'策略实例不存在:%s' %name)
#----------------------------------------------------------------------
def initAll(self):
"""全部初始化"""
for name in self.strategyDict.keys():
self.initStrategy(name)
#----------------------------------------------------------------------
def startAll(self):
"""全部启动"""
for name in self.strategyDict.keys():
self.startStrategy(name)
#----------------------------------------------------------------------
def stopAll(self):
"""全部停止"""
for name in self.strategyDict.keys():
self.stopStrategy(name)
#----------------------------------------------------------------------
def saveSetting(self):
"""保存策略配置"""
with open(self.settingfilePath, 'w') as f:
l = []
for strategy in self.strategyDict.values():
setting = {}
for param in strategy.paramList:
setting[param] = strategy.__getattribute__(param)
l.append(setting)
jsonL = json.dumps(l, indent=4)
f.write(jsonL)
#----------------------------------------------------------------------
def loadSetting(self):
"""读取策略配置"""
with open(self.settingfilePath) as f:
l = json.load(f)
for setting in l:
self.loadStrategy(setting)
self.loadPosition()
#----------------------------------------------------------------------
def getStrategyVar(self, name):
"""获取策略当前的变量字典"""
if name in self.strategyDict:
strategy = self.strategyDict[name]
varDict = OrderedDict()
for key in strategy.varList:
varDict[key] = strategy.__getattribute__(key)
return varDict
else:
self.writeCtaLog(u'策略实例不存在:' + name)
return None
#----------------------------------------------------------------------
def getStrategyParam(self, name):
"""获取策略的参数字典"""
if name in self.strategyDict:
strategy = self.strategyDict[name]
paramDict = OrderedDict()
for key in strategy.paramList:
paramDict[key] = strategy.__getattribute__(key)
return paramDict
else:
self.writeCtaLog(u'策略实例不存在:' + name)
return None
#----------------------------------------------------------------------
def putStrategyEvent(self, name):
"""触发策略状态变化事件通常用于通知GUI更新"""
event = Event(EVENT_CTA_STRATEGY+name)
self.eventEngine.put(event)
#----------------------------------------------------------------------
def callStrategyFunc(self, strategy, func, params=None):
"""调用策略的函数,若触发异常则捕捉"""
try:
if params:
func(params)
else:
func()
except Exception:
# 停止策略,修改状态为未初始化
strategy.trading = False
strategy.inited = False
# 发出日志
content = '\n'.join([u'策略%s触发异常已停止' %strategy.name,
traceback.format_exc()])
self.writeCtaLog(content)
#----------------------------------------------------------------------
def savePosition(self, strategy):
"""保存策略的持仓情况到数据库"""
flt = {'name': strategy.name,
'vtSymbol': strategy.vtSymbol}
d = {'name': strategy.name,
'vtSymbol': strategy.vtSymbol,
'pos': strategy.pos}
self.mainEngine.dbUpdate(POSITION_DB_NAME, strategy.className,
d, flt, True)
content = '策略%s持仓保存成功,当前持仓%s' %(strategy.name, strategy.pos)
self.writeCtaLog(content)
#----------------------------------------------------------------------
def loadPosition(self):
"""从数据库载入策略的持仓情况"""
for strategy in self.strategyDict.values():
flt = {'name': strategy.name,
'vtSymbol': strategy.vtSymbol}
posData = self.mainEngine.dbQuery(POSITION_DB_NAME, strategy.className, flt)
for d in posData:
strategy.pos = d['pos']
#----------------------------------------------------------------------
def roundToPriceTick(self, priceTick, price):
"""取整价格到合约最小价格变动"""
if not priceTick:
return price
newPrice = round(price/priceTick, 0) * priceTick
return newPrice
#----------------------------------------------------------------------
def stop(self):
"""停止"""
pass
#----------------------------------------------------------------------
def cancelAll(self, name):
"""全部撤单"""
s = self.strategyOrderDict[name]
# 遍历列表,全部撤单
# 这里不能直接遍历集合s因为撤单时会修改s中的内容导致出错
for orderID in list(s):
if STOPORDERPREFIX in orderID:
self.cancelStopOrder(orderID)
else:
self.cancelOrder(orderID)

View File

@ -1,507 +0,0 @@
# encoding: UTF-8
"""
本模块中主要包含
1. 从通联数据下载历史行情的引擎
2. 用来把MultiCharts导出的历史数据载入到MongoDB中用的函数
3. 增加从通达信导出的历史数据载入到MongoDB中的函数
"""
from datetime import datetime, timedelta
from time import time
from multiprocessing.pool import ThreadPool
import pymongo
from vnpy.data.datayes import DatayesApi
from vnpy.trader.vtGlobal import globalSetting
from vnpy.trader.vtConstant import *
from vnpy.trader.vtObject import VtBarData
from .ctaBase import SETTING_DB_NAME, TICK_DB_NAME, MINUTE_DB_NAME, DAILY_DB_NAME
# 以下为vn.trader和通联数据规定的交易所代码映射
VT_TO_DATAYES_EXCHANGE = {}
VT_TO_DATAYES_EXCHANGE[EXCHANGE_CFFEX] = 'CCFX' # 中金所
VT_TO_DATAYES_EXCHANGE[EXCHANGE_SHFE] = 'XSGE' # 上期所
VT_TO_DATAYES_EXCHANGE[EXCHANGE_CZCE] = 'XZCE' # 郑商所
VT_TO_DATAYES_EXCHANGE[EXCHANGE_DCE] = 'XDCE' # 大商所
DATAYES_TO_VT_EXCHANGE = {v:k for k,v in VT_TO_DATAYES_EXCHANGE.items()}
########################################################################
class HistoryDataEngine(object):
"""CTA模块用的历史数据引擎"""
#----------------------------------------------------------------------
def __init__(self, token):
"""Constructor"""
self.dbClient = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'])
self.datayesClient = DatayesApi(token)
#----------------------------------------------------------------------
def lastTradeDate(self):
"""获取最近交易日(只考虑工作日,无法检查国内假期)"""
today = datetime.now()
oneday = timedelta(1)
if today.weekday() == 5:
today = today - oneday
elif today.weekday() == 6:
today = today - oneday*2
return today.strftime('%Y%m%d')
#----------------------------------------------------------------------
def readFuturesProductSymbol(self):
"""查询所有期货产品代码"""
cx = self.dbClient[SETTING_DB_NAME]['FuturesSymbol'].find()
return set([d['productSymbol'] for d in cx]) # 这里返回的是集合(因为会重复)
#----------------------------------------------------------------------
def readFuturesSymbol(self):
"""查询所有期货合约代码"""
cx = self.dbClient[SETTING_DB_NAME]['FuturesSymbol'].find()
return [d['symbol'] for d in cx] # 这里返回的是列表
#----------------------------------------------------------------------
def downloadFuturesSymbol(self, tradeDate=''):
"""下载所有期货的代码"""
if not tradeDate:
tradeDate = self.lastTradeDate()
self.dbClient[SETTING_DB_NAME]['FuturesSymbol'].ensure_index([('symbol', pymongo.ASCENDING)],
unique=True)
path = 'api/market/getMktMFutd.json'
params = {}
params['tradeDate'] = tradeDate
data = self.datayesClient.downloadData(path, params)
if data:
for d in data:
symbolDict = {}
symbolDict['symbol'] = d['ticker']
symbolDict['productSymbol'] = d['contractObject']
flt = {'symbol': d['ticker']}
self.dbClient[SETTING_DB_NAME]['FuturesSymbol'].update_one(flt, {'$set':symbolDict},
upsert=True)
print u'期货合约代码下载完成'
else:
print u'期货合约代码下载失败'
#----------------------------------------------------------------------
def downloadFuturesDailyBar(self, symbol):
"""
下载期货合约的日行情symbol是合约代码
若最后四位为0000如IF0000代表下载连续合约
"""
print u'开始下载%s日行情' %symbol
# 查询数据库中已有数据的最后日期
cl = self.dbClient[DAILY_DB_NAME][symbol]
cx = cl.find(sort=[('datetime', pymongo.DESCENDING)])
if cx.count():
last = cx[0]
else:
last = ''
# 主力合约
if '0000' in symbol:
path = 'api/market/getMktMFutd.json'
params = {}
params['contractObject'] = symbol.replace('0000', '')
params['mainCon'] = 1
if last:
params['startDate'] = last['date']
# 交易合约
else:
path = 'api/market/getMktFutd.json'
params = {}
params['ticker'] = symbol
if last:
params['startDate'] = last['date']
# 开始下载数据
data = self.datayesClient.downloadData(path, params)
if data:
# 创建datetime索引
self.dbClient[DAILY_DB_NAME][symbol].ensure_index([('datetime', pymongo.ASCENDING)],
unique=True)
for d in data:
bar = VtBarData()
bar.vtSymbol = symbol
bar.symbol = symbol
try:
bar.exchange = DATAYES_TO_VT_EXCHANGE.get(d.get('exchangeCD', ''), '')
bar.open = d.get('openPrice', 0)
bar.high = d.get('highestPrice', 0)
bar.low = d.get('lowestPrice', 0)
bar.close = d.get('closePrice', 0)
bar.date = d.get('tradeDate', '').replace('-', '')
bar.time = ''
bar.datetime = datetime.strptime(bar.date, '%Y%m%d')
bar.volume = d.get('turnoverVol', 0)
bar.openInterest = d.get('openInt', 0)
except KeyError:
print d
flt = {'datetime': bar.datetime}
self.dbClient[DAILY_DB_NAME][symbol].update_one(flt, {'$set':bar.__dict__}, upsert=True)
print u'%s下载完成' %symbol
else:
print u'找不到合约%s' %symbol
#----------------------------------------------------------------------
def downloadAllFuturesDailyBar(self):
"""下载所有期货的主力合约日行情"""
start = time()
print u'开始下载所有期货的主力合约日行情'
productSymbolSet = self.readFuturesProductSymbol()
print u'代码列表读取成功,产品代码:%s' %productSymbolSet
# 这里也测试了线程池,但可能由于下载函数中涉及较多的数据格
# 式转换CPU开销较大多线程效率并无显著改变。
#p = ThreadPool(10)
#p.map(self.downloadFuturesDailyBar, productSymbolSet)
#p.close()
#p.join()
for productSymbol in productSymbolSet:
self.downloadFuturesDailyBar(productSymbol+'0000')
print u'所有期货的主力合约日行情已经全部下载完成, 耗时%s' %(time()-start)
#----------------------------------------------------------------------
def downloadFuturesIntradayBar(self, symbol):
"""下载期货的日内分钟行情"""
print u'开始下载%s日内分钟行情' %symbol
# 日内分钟行情只有具体合约
path = 'api/market/getFutureBarRTIntraDay.json'
params = {}
params['instrumentID'] = symbol
params['unit'] = 1
data = self.datayesClient.downloadData(path, params)
if data:
today = datetime.now().strftime('%Y%m%d')
# 创建datetime索引
self.dbClient[MINUTE_DB_NAME][symbol].ensure_index([('datetime', pymongo.ASCENDING)],
unique=True)
for d in data:
bar = VtBarData()
bar.vtSymbol = symbol
bar.symbol = symbol
try:
bar.exchange = DATAYES_TO_VT_EXCHANGE.get(d.get('exchangeCD', ''), '')
bar.open = d.get('openPrice', 0)
bar.high = d.get('highestPrice', 0)
bar.low = d.get('lowestPrice', 0)
bar.close = d.get('closePrice', 0)
bar.date = today
bar.time = d.get('barTime', '')
bar.datetime = datetime.strptime(bar.date + ' ' + bar.time, '%Y%m%d %H:%M')
bar.volume = d.get('totalVolume', 0)
bar.openInterest = 0
except KeyError:
print d
flt = {'datetime': bar.datetime}
self.dbClient[MINUTE_DB_NAME][symbol].update_one(flt, {'$set':bar.__dict__}, upsert=True)
print u'%s下载完成' %symbol
else:
print u'找不到合约%s' %symbol
#----------------------------------------------------------------------
def downloadEquitySymbol(self, tradeDate=''):
"""下载所有股票的代码"""
if not tradeDate:
tradeDate = self.lastTradeDate()
self.dbClient[SETTING_DB_NAME]['EquitySymbol'].ensure_index([('symbol', pymongo.ASCENDING)],
unique=True)
path = 'api/market/getMktEqud.json'
params = {}
params['tradeDate'] = tradeDate
data = self.datayesClient.downloadData(path, params)
if data:
for d in data:
symbolDict = {}
symbolDict['symbol'] = d['ticker']
flt = {'symbol': d['ticker']}
self.dbClient[SETTING_DB_NAME]['EquitySymbol'].update_one(flt, {'$set':symbolDict},
upsert=True)
print u'股票代码下载完成'
else:
print u'股票代码下载失败'
#----------------------------------------------------------------------
def downloadEquityDailyBar(self, symbol):
"""
下载股票的日行情symbol是股票代码
"""
print u'开始下载%s日行情' %symbol
# 查询数据库中已有数据的最后日期
cl = self.dbClient[DAILY_DB_NAME][symbol]
cx = cl.find(sort=[('datetime', pymongo.DESCENDING)])
if cx.count():
last = cx[0]
else:
last = ''
# 开始下载数据
path = 'api/market/getMktEqud.json'
params = {}
params['ticker'] = symbol
if last:
params['beginDate'] = last['date']
data = self.datayesClient.downloadData(path, params)
if data:
# 创建datetime索引
self.dbClient[DAILY_DB_NAME][symbol].ensure_index([('datetime', pymongo.ASCENDING)],
unique=True)
for d in data:
bar = VtBarData()
bar.vtSymbol = symbol
bar.symbol = symbol
try:
bar.exchange = DATAYES_TO_VT_EXCHANGE.get(d.get('exchangeCD', ''), '')
bar.open = d.get('openPrice', 0)
bar.high = d.get('highestPrice', 0)
bar.low = d.get('lowestPrice', 0)
bar.close = d.get('closePrice', 0)
bar.date = d.get('tradeDate', '').replace('-', '')
bar.time = ''
bar.datetime = datetime.strptime(bar.date, '%Y%m%d')
bar.volume = d.get('turnoverVol', 0)
except KeyError:
print d
flt = {'datetime': bar.datetime}
self.dbClient[DAILY_DB_NAME][symbol].update_one(flt, {'$set':bar.__dict__}, upsert=True)
print u'%s下载完成' %symbol
else:
print u'找不到合约%s' %symbol
#----------------------------------------------------------------------
def downloadEquityDailyBarts(self, symbol):
"""
下载股票的日行情symbol是股票代码
"""
print u'开始下载%s日行情' %symbol
# 查询数据库中已有数据的最后日期
cl = self.dbClient[DAILY_DB_NAME][symbol]
cx = cl.find(sort=[('datetime', pymongo.DESCENDING)])
if cx.count():
last = cx[0]
else:
last = ''
# 开始下载数据
import tushare as ts
if last:
start = last['date'][:4]+'-'+last['date'][4:6]+'-'+last['date'][6:]
data = ts.get_k_data(symbol,start)
if not data.empty:
# 创建datetime索引
self.dbClient[DAILY_DB_NAME][symbol].ensure_index([('datetime', pymongo.ASCENDING)],
unique=True)
for index, d in data.iterrows():
bar = VtBarData()
bar.vtSymbol = symbol
bar.symbol = symbol
try:
bar.open = d.get('open')
bar.high = d.get('high')
bar.low = d.get('low')
bar.close = d.get('close')
bar.date = d.get('date').replace('-', '')
bar.time = ''
bar.datetime = datetime.strptime(bar.date, '%Y%m%d')
bar.volume = d.get('volume')
except KeyError:
print d
flt = {'datetime': bar.datetime}
self.dbClient[DAILY_DB_NAME][symbol].update_one(flt, {'$set':bar.__dict__}, upsert=True)
print u'%s下载完成' %symbol
else:
print u'找不到合约%s' %symbol
#----------------------------------------------------------------------
def loadMcCsv(fileName, dbName, symbol):
"""将Multicharts导出的csv格式的历史数据插入到Mongo数据库中"""
import csv
start = time()
print u'开始读取CSV文件%s中的数据插入到%s%s' %(fileName, dbName, symbol)
# 锁定集合,并创建索引
client = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'])
collection = client[dbName][symbol]
collection.ensure_index([('datetime', pymongo.ASCENDING)], unique=True)
# 读取数据和插入到数据库
reader = csv.DictReader(file(fileName, 'r'))
for d in reader:
bar = VtBarData()
bar.vtSymbol = symbol
bar.symbol = symbol
bar.open = float(d['Open'])
bar.high = float(d['High'])
bar.low = float(d['Low'])
bar.close = float(d['Close'])
bar.date = datetime.strptime(d['Date'], '%Y-%m-%d').strftime('%Y%m%d')
bar.time = d['Time']
bar.datetime = datetime.strptime(bar.date + ' ' + bar.time, '%Y%m%d %H:%M:%S')
bar.volume = d['TotalVolume']
flt = {'datetime': bar.datetime}
collection.update_one(flt, {'$set':bar.__dict__}, upsert=True)
print bar.date, bar.time
print u'插入完毕,耗时:%s' % (time()-start)
#----------------------------------------------------------------------
def loadTbCsv(fileName, dbName, symbol):
"""将TradeBlazer导出的csv格式的历史分钟数据插入到Mongo数据库中"""
import csv
start = time()
print u'开始读取CSV文件%s中的数据插入到%s%s' %(fileName, dbName, symbol)
# 锁定集合,并创建索引
client = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'])
collection = client[dbName][symbol]
collection.ensure_index([('datetime', pymongo.ASCENDING)], unique=True)
# 读取数据和插入到数据库
reader = csv.reader(file(fileName, 'r'))
for d in reader:
bar = VtBarData()
bar.vtSymbol = symbol
bar.symbol = symbol
bar.open = float(d[1])
bar.high = float(d[2])
bar.low = float(d[3])
bar.close = float(d[4])
bar.date = datetime.strptime(d[0].split(' ')[0], '%Y/%m/%d').strftime('%Y%m%d')
bar.time = d[0].split(' ')[1]+":00"
bar.datetime = datetime.strptime(bar.date + ' ' + bar.time, '%Y%m%d %H:%M:%S')
bar.volume = d[5]
bar.openInterest = d[6]
flt = {'datetime': bar.datetime}
collection.update_one(flt, {'$set':bar.__dict__}, upsert=True)
print bar.date, bar.time
print u'插入完毕,耗时:%s' % (time()-start)
#----------------------------------------------------------------------
def loadTbPlusCsv(fileName, dbName, symbol):
"""将TB极速版导出的csv格式的历史分钟数据插入到Mongo数据库中"""
import csv
start = time()
print u'开始读取CSV文件%s中的数据插入到%s%s' %(fileName, dbName, symbol)
# 锁定集合,并创建索引
client = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'])
collection = client[dbName][symbol]
collection.ensure_index([('datetime', pymongo.ASCENDING)], unique=True)
# 读取数据和插入到数据库
reader = csv.reader(file(fileName, 'r'))
for d in reader:
bar = VtBarData()
bar.vtSymbol = symbol
bar.symbol = symbol
bar.open = float(d[2])
bar.high = float(d[3])
bar.low = float(d[4])
bar.close = float(d[5])
bar.date = str(d[0])
tempstr=str(round(float(d[1])*10000)).split(".")[0].zfill(4)
bar.time = tempstr[:2]+":"+tempstr[2:4]+":00"
bar.datetime = datetime.strptime(bar.date + ' ' + bar.time, '%Y%m%d %H:%M:%S')
bar.volume = d[6]
bar.openInterest = d[7]
flt = {'datetime': bar.datetime}
collection.update_one(flt, {'$set':bar.__dict__}, upsert=True)
print bar.date, bar.time
print u'插入完毕,耗时:%s' % (time()-start)
#----------------------------------------------------------------------
def loadTdxCsv(fileName, dbName, symbol):
"""将通达信导出的csv格式的历史分钟数据插入到Mongo数据库中"""
import csv
start = time()
print u'开始读取CSV文件%s中的数据插入到%s%s' %(fileName, dbName, symbol)
# 锁定集合,并创建索引
client = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'])
collection = client[dbName][symbol]
collection.ensure_index([('datetime', pymongo.ASCENDING)], unique=True)
# 读取数据和插入到数据库
reader = csv.reader(file(fileName, 'r'))
for d in reader:
bar = VtBarData()
bar.vtSymbol = symbol
bar.symbol = symbol
bar.open = float(d[2])
bar.high = float(d[3])
bar.low = float(d[4])
bar.close = float(d[5])
bar.date = datetime.strptime(d[0], '%Y/%m/%d').strftime('%Y%m%d')
bar.time = d[1][:2]+':'+d[1][2:4]+':00'
bar.datetime = datetime.strptime(bar.date + ' ' + bar.time, '%Y%m%d %H:%M:%S')
bar.volume = d[6]
bar.openInterest = d[7]
flt = {'datetime': bar.datetime}
collection.update_one(flt, {'$set':bar.__dict__}, upsert=True)
print bar.date, bar.time
print u'插入完毕,耗时:%s' % (time()-start)

View File

@ -1,580 +0,0 @@
# encoding: UTF-8
'''
本文件包含了CTA引擎中的策略开发用模板开发策略时需要继承CtaTemplate类
'''
import numpy as np
import talib
from vnpy.trader.vtConstant import *
from vnpy.trader.vtObject import VtBarData
from .ctaBase import *
########################################################################
class CtaTemplate(object):
"""CTA策略模板"""
# 策略类的名称和作者
className = 'CtaTemplate'
author = EMPTY_UNICODE
# MongoDB数据库的名称K线数据库默认为1分钟
tickDbName = TICK_DB_NAME
barDbName = MINUTE_DB_NAME
# 策略的基本参数
name = EMPTY_UNICODE # 策略实例名称
vtSymbol = EMPTY_STRING # 交易的合约vt系统代码
productClass = EMPTY_STRING # 产品类型只有IB接口需要
currency = EMPTY_STRING # 货币只有IB接口需要
# 策略的基本变量,由引擎管理
inited = False # 是否进行了初始化
trading = False # 是否启动交易,由引擎管理
pos = 0 # 持仓情况
# 参数列表,保存了参数的名称
paramList = ['name',
'className',
'author',
'vtSymbol']
# 变量列表,保存了变量的名称
varList = ['inited',
'trading',
'pos']
#----------------------------------------------------------------------
def __init__(self, ctaEngine, setting):
"""Constructor"""
self.ctaEngine = ctaEngine
# 设置策略的参数
if setting:
d = self.__dict__
for key in self.paramList:
if key in setting:
d[key] = setting[key]
#----------------------------------------------------------------------
def onInit(self):
"""初始化策略(必须由用户继承实现)"""
raise NotImplementedError
#----------------------------------------------------------------------
def onStart(self):
"""启动策略(必须由用户继承实现)"""
raise NotImplementedError
#----------------------------------------------------------------------
def onStop(self):
"""停止策略(必须由用户继承实现)"""
raise NotImplementedError
#----------------------------------------------------------------------
def onTick(self, tick):
"""收到行情TICK推送必须由用户继承实现"""
raise NotImplementedError
#----------------------------------------------------------------------
def onOrder(self, order):
"""收到委托变化推送(必须由用户继承实现)"""
raise NotImplementedError
#----------------------------------------------------------------------
def onTrade(self, trade):
"""收到成交推送(必须由用户继承实现)"""
raise NotImplementedError
#----------------------------------------------------------------------
def onBar(self, bar):
"""收到Bar推送必须由用户继承实现"""
raise NotImplementedError
#----------------------------------------------------------------------
def onStopOrder(self, so):
"""收到停止单推送(必须由用户继承实现)"""
raise NotImplementedError
#----------------------------------------------------------------------
def buy(self, price, volume, stop=False):
"""买开"""
return self.sendOrder(CTAORDER_BUY, price, volume, stop)
#----------------------------------------------------------------------
def sell(self, price, volume, stop=False):
"""卖平"""
return self.sendOrder(CTAORDER_SELL, price, volume, stop)
#----------------------------------------------------------------------
def short(self, price, volume, stop=False):
"""卖开"""
return self.sendOrder(CTAORDER_SHORT, price, volume, stop)
#----------------------------------------------------------------------
def cover(self, price, volume, stop=False):
"""买平"""
return self.sendOrder(CTAORDER_COVER, price, volume, stop)
#----------------------------------------------------------------------
def sendOrder(self, orderType, price, volume, stop=False):
"""发送委托"""
if self.trading:
# 如果stop为True则意味着发本地停止单
if stop:
vtOrderIDList = self.ctaEngine.sendStopOrder(self.vtSymbol, orderType, price, volume, self)
else:
vtOrderIDList = self.ctaEngine.sendOrder(self.vtSymbol, orderType, price, volume, self)
return vtOrderIDList
else:
# 交易停止时发单返回空字符串
return []
#----------------------------------------------------------------------
def cancelOrder(self, vtOrderID):
"""撤单"""
# 如果发单号为空字符串,则不进行后续操作
if not vtOrderID:
return
if STOPORDERPREFIX in vtOrderID:
self.ctaEngine.cancelStopOrder(vtOrderID)
else:
self.ctaEngine.cancelOrder(vtOrderID)
#----------------------------------------------------------------------
def cancelAll(self):
"""全部撤单"""
self.ctaEngine.cancelAll(self.name)
#----------------------------------------------------------------------
def insertTick(self, tick):
"""向数据库中插入tick数据"""
self.ctaEngine.insertData(self.tickDbName, self.vtSymbol, tick)
#----------------------------------------------------------------------
def insertBar(self, bar):
"""向数据库中插入bar数据"""
self.ctaEngine.insertData(self.barDbName, self.vtSymbol, bar)
#----------------------------------------------------------------------
def loadTick(self, days):
"""读取tick数据"""
return self.ctaEngine.loadTick(self.tickDbName, self.vtSymbol, days)
#----------------------------------------------------------------------
def loadBar(self, days):
"""读取bar数据"""
return self.ctaEngine.loadBar(self.barDbName, self.vtSymbol, days)
#----------------------------------------------------------------------
def writeCtaLog(self, content):
"""记录CTA日志"""
content = self.name + ':' + content
self.ctaEngine.writeCtaLog(content)
#----------------------------------------------------------------------
def putEvent(self):
"""发出策略状态变化事件"""
self.ctaEngine.putStrategyEvent(self.name)
#----------------------------------------------------------------------
def getEngineType(self):
"""查询当前运行的环境"""
return self.ctaEngine.engineType
########################################################################
class TargetPosTemplate(CtaTemplate):
"""
允许直接通过修改目标持仓来实现交易的策略模板
开发策略时无需再调用buy/sell/cover/short这些具体的委托指令
只需在策略逻辑运行完成后调用setTargetPos设置目标持仓底层算法
会自动完成相关交易适合不擅长管理交易挂撤单细节的用户
使用该模板开发策略时请在以下回调方法中先调用母类的方法
onTick
onBar
onOrder
假设策略名为TestStrategy请在onTick回调中加上
super(TestStrategy, self).onTick(tick)
其他方法类同
"""
className = 'TargetPosTemplate'
author = u'量衍投资'
# 目标持仓模板的基本变量
tickAdd = 1 # 委托时相对基准价格的超价
lastTick = None # 最新tick数据
lastBar = None # 最新bar数据
targetPos = EMPTY_INT # 目标持仓
orderList = [] # 委托号列表
# 变量列表,保存了变量的名称
varList = ['inited',
'trading',
'pos',
'targetPos']
#----------------------------------------------------------------------
def __init__(self, ctaEngine, setting):
"""Constructor"""
super(TargetPosTemplate, self).__init__(ctaEngine, setting)
#----------------------------------------------------------------------
def onTick(self, tick):
"""收到行情推送"""
self.lastTick = tick
# 实盘模式下启动交易后需要根据tick的实时推送执行自动开平仓操作
if self.trading:
self.trade()
#----------------------------------------------------------------------
def onBar(self, bar):
"""收到K线推送"""
self.lastBar = bar
#----------------------------------------------------------------------
def onOrder(self, order):
"""收到委托推送"""
if order.status == STATUS_ALLTRADED or order.status == STATUS_CANCELLED:
self.orderList.remove(order.vtOrderID)
#----------------------------------------------------------------------
def setTargetPos(self, targetPos):
"""设置目标仓位"""
self.targetPos = targetPos
self.trade()
#----------------------------------------------------------------------
def trade(self):
"""执行交易"""
# 先撤销之前的委托
for vtOrderID in self.orderList:
self.cancelOrder(vtOrderID)
self.orderList = []
# 如果目标仓位和实际仓位一致,则不进行任何操作
posChange = self.targetPos - self.pos
if not posChange:
return
# 确定委托基准价格有tick数据时优先使用否则使用bar
longPrice = 0
shortPrice = 0
if self.lastTick:
if posChange > 0:
longPrice = self.lastTick.askPrice1 + self.tickAdd
else:
shortPrice = self.lastTick.bidPrice1 - self.tickAdd
else:
if posChange > 0:
longPrice = self.lastBar.close + self.tickAdd
else:
shortPrice = self.lastBar.close - self.tickAdd
# 回测模式下,采用合并平仓和反向开仓委托的方式
if self.getEngineType() == ENGINETYPE_BACKTESTING:
if posChange > 0:
l = self.buy(longPrice, abs(posChange))
else:
l = self.short(shortPrice, abs(posChange))
self.orderList.extend(l)
# 实盘模式下,首先确保之前的委托都已经结束(全成、撤销)
# 然后先发平仓委托,等待成交后,再发送新的开仓委托
else:
# 检查之前委托都已结束
if self.orderList:
return
# 买入
if posChange > 0:
if self.pos < 0:
l = self.cover(longPrice, abs(self.pos))
else:
l = self.buy(longPrice, abs(posChange))
# 卖出
else:
if self.pos > 0:
l = self.sell(shortPrice, abs(self.pos))
else:
l = self.short(shortPrice, abs(posChange))
self.orderList.extend(l)
########################################################################
class BarManager(object):
"""
K线合成器支持
1. 基于Tick合成1分钟K线
2. 基于1分钟K线合成X分钟K线X可以是23510153060
"""
#----------------------------------------------------------------------
def __init__(self, onBar, xmin=0, onXminBar=None):
"""Constructor"""
self.bar = None # 1分钟K线对象
self.onBar = onBar # 1分钟K线回调函数
self.xminBar = None # X分钟K线对象
self.xmin = xmin # X的值
self.onXminBar = onXminBar # X分钟K线的回调函数
self.lastTick = None # 上一TICK缓存对象
#----------------------------------------------------------------------
def updateTick(self, tick):
"""TICK更新"""
newMinute = False # 默认不是新的一分钟
# 尚未创建对象
if not self.bar:
self.bar = VtBarData()
newMinute = True
# 新的一分钟
elif self.bar.datetime.minute != tick.datetime.minute:
# 生成上一分钟K线的时间戳
self.bar.datetime = self.bar.datetime.replace(second=0, microsecond=0) # 将秒和微秒设为0
self.bar.date = self.bar.datetime.strftime('%Y%m%d')
self.bar.time = self.bar.datetime.strftime('%H:%M:%S.%f')
# 推送已经结束的上一分钟K线
self.onBar(self.bar)
# 创建新的K线对象
self.bar = VtBarData()
newMinute = True
# 初始化新一分钟的K线数据
if newMinute:
self.bar.vtSymbol = tick.vtSymbol
self.bar.symbol = tick.symbol
self.bar.exchange = tick.exchange
self.bar.open = tick.lastPrice
self.bar.high = tick.lastPrice
self.bar.low = tick.lastPrice
# 累加更新老一分钟的K线数据
else:
self.bar.high = max(self.bar.high, tick.lastPrice)
self.bar.low = min(self.bar.low, tick.lastPrice)
# 通用更新部分
self.bar.close = tick.lastPrice
self.bar.datetime = tick.datetime
self.bar.openInterest = tick.openInterest
if self.lastTick:
self.bar.volume += (tick.volume - self.lastTick.volume) # 当前K线内的成交量
# 缓存Tick
self.lastTick = tick
#----------------------------------------------------------------------
def updateBar(self, bar):
"""1分钟K线更新"""
# 尚未创建对象
if not self.xminBar:
self.xminBar = VtBarData()
self.xminBar.vtSymbol = bar.vtSymbol
self.xminBar.symbol = bar.symbol
self.xminBar.exchange = bar.exchange
self.xminBar.open = bar.open
self.xminBar.high = bar.high
self.xminBar.low = bar.low
# 累加老K线
else:
self.xminBar.high = max(self.xminBar.high, bar.high)
self.xminBar.low = min(self.xminBar.low, bar.low)
# 通用部分
self.xminBar.close = bar.close
self.xminBar.datetime = bar.datetime
self.xminBar.openInterest = bar.openInterest
self.xminBar.volume += int(bar.volume)
# X分钟已经走完
if not bar.datetime.minute % self.xmin: # 可以用X整除
# 生成上一X分钟K线的时间戳
self.xminBar.datetime = self.xminBar.datetime.replace(second=0, microsecond=0) # 将秒和微秒设为0
self.xminBar.date = self.xminBar.datetime.strftime('%Y%m%d')
self.xminBar.time = self.xminBar.datetime.strftime('%H:%M:%S.%f')
# 推送
self.onXminBar(self.xminBar)
# 清空老K线缓存对象
self.xminBar = None
########################################################################
class ArrayManager(object):
"""
K线序列管理工具负责
1. K线时间序列的维护
2. 常用技术指标的计算
"""
#----------------------------------------------------------------------
def __init__(self, size=100):
"""Constructor"""
self.count = 0 # 缓存计数
self.size = size # 缓存大小
self.inited = False # True if count>=size
self.openArray = np.zeros(size) # OHLC
self.highArray = np.zeros(size)
self.lowArray = np.zeros(size)
self.closeArray = np.zeros(size)
self.volumeArray = np.zeros(size)
#----------------------------------------------------------------------
def updateBar(self, bar):
"""更新K线"""
self.count += 1
if not self.inited and self.count >= self.size:
self.inited = True
self.openArray[0:self.size-1] = self.openArray[1:self.size]
self.highArray[0:self.size-1] = self.highArray[1:self.size]
self.lowArray[0:self.size-1] = self.lowArray[1:self.size]
self.closeArray[0:self.size-1] = self.closeArray[1:self.size]
self.volumeArray[0:self.size-1] = self.volumeArray[1:self.size]
self.openArray[-1] = bar.open
self.highArray[-1] = bar.high
self.lowArray[-1] = bar.low
self.closeArray[-1] = bar.close
self.volumeArray[-1] = bar.volume
#----------------------------------------------------------------------
@property
def open(self):
"""获取开盘价序列"""
return self.openArray
#----------------------------------------------------------------------
@property
def high(self):
"""获取最高价序列"""
return self.highArray
#----------------------------------------------------------------------
@property
def low(self):
"""获取最低价序列"""
return self.lowArray
#----------------------------------------------------------------------
@property
def close(self):
"""获取收盘价序列"""
return self.closeArray
#----------------------------------------------------------------------
@property
def volume(self):
"""获取成交量序列"""
return self.volumeArray
#----------------------------------------------------------------------
def sma(self, n, array=False):
"""简单均线"""
result = talib.SMA(self.close, n)
if array:
return result
return result[-1]
#----------------------------------------------------------------------
def std(self, n, array=False):
"""标准差"""
result = talib.STDDEV(self.close, n)
if array:
return result
return result[-1]
#----------------------------------------------------------------------
def cci(self, n, array=False):
"""CCI指标"""
result = talib.CCI(self.high, self.low, self.close, n)
if array:
return result
return result[-1]
#----------------------------------------------------------------------
def atr(self, n, array=False):
"""ATR指标"""
result = talib.ATR(self.high, self.low, self.close, n)
if array:
return result
return result[-1]
#----------------------------------------------------------------------
def rsi(self, n, array=False):
"""RSI指标"""
result = talib.RSI(self.close, n)
if array:
return result
return result[-1]
#----------------------------------------------------------------------
def macd(self, fastPeriod, slowPeriod, signalPeriod, array=False):
"""MACD指标"""
macd, signal, hist = talib.MACD(self.close, fastPeriod,
slowPeriod, signalPeriod)
if array:
return macd, signal, hist
return macd[-1], signal[-1], hist[-1]
#----------------------------------------------------------------------
def adx(self, n, array=False):
"""ADX指标"""
result = talib.ADX(self.high, self.low, self.close, n)
if array:
return result
return result[-1]
#----------------------------------------------------------------------
def boll(self, n, dev, array=False):
"""布林通道"""
mid = self.sma(n, array)
std = self.std(n, array)
up = mid + std * dev
down = mid - std * dev
return up, down
#----------------------------------------------------------------------
def keltner(self, n, dev, array=False):
"""肯特纳通道"""
mid = self.sma(n, array)
atr = self.atr(n, array)
up = mid + atr * dev
down = mid - atr * dev
return up, down
#----------------------------------------------------------------------
def donchian(self, n, array=False):
"""唐奇安通道"""
up = talib.MAX(self.high, n)
down = talib.MIN(self.low, n)
if array:
return up, down
return up[-1], down[-1]

View File

@ -1,5 +0,0 @@
{
"domain": "http://api.wmcloud.com/data",
"version": "v1",
"token": "575593eb7696aec7339224c0fac2313780d8645f68b77369dcb35f8bcb419a0b"
}

View File

@ -1,83 +0,0 @@
# encoding: UTF-8
'''一个简单的通联数据客户端主要使用requests开发比通联官网的python例子更为简洁。'''
import os
import requests
import json
FILENAME = 'datayes.json'
HTTP_OK = 200
########################################################################
class DatayesClient(object):
"""通联数据客户端"""
name = u'通联数据客户端'
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.domain = '' # 主域名
self.version = '' # API版本
self.token = '' # 授权码
self.header = {} # http请求头部
self.settingLoaded = False # 配置是否已经读取
self.loadSetting()
#----------------------------------------------------------------------
def loadSetting(self):
"""载入配置"""
try:
path = os.path.abspath(os.path.dirname(__file__))
FILENAME = os.path.join(path, FILENAME)
f = file(FILENAME)
except IOError:
print u'%s无法打开配置文件' % self.name
return
setting = json.load(f)
try:
self.domain = str(setting['domain'])
self.version = str(setting['version'])
self.token = str(setting['token'])
except KeyError:
print u'%s配置文件字段缺失' % self.name
return
self.header['Connection'] = 'keep_alive'
self.header['Authorization'] = 'Bearer ' + self.token
self.settingLoaded = True
print u'%s配置载入完成' % self.name
#----------------------------------------------------------------------
def downloadData(self, path, params):
"""下载数据"""
if not self.settingLoaded:
print u'%s配置未载入' % self.name
return None
else:
url = '/'.join([self.domain, self.version, path])
r = requests.get(url=url, headers=self.header, params=params)
if r.status_code != HTTP_OK:
print u'%shttp请求失败状态代码%s' %(self.name, r.status_code)
return None
else:
result = r.json()
if 'retMsg' in result and result['retMsg'] == 'Success':
return result['data']
else:
if 'retMsg' in result:
print u'%s查询失败,返回信息%s' %(self.name, result['retMsg'])
elif 'message' in result:
print u'%s查询失败,返回信息%s' %(self.name, result['message'])
return None

View File

@ -1,13 +0,0 @@
# encoding: UTF-8
import json
import os
import traceback
# 默认设置
from chinese import text
# 是否要使用英文
from vnpy.trader.vtGlobal import globalSetting
if globalSetting['language'] == 'english':
from english import text

View File

@ -1,18 +0,0 @@
# encoding: UTF-8
INIT = u'初始化'
START = u'启动'
STOP = u'停止'
CTA_ENGINE_STARTED = u'CTA引擎启动成功'
CTA_STRATEGY = u'CTA策略'
LOAD_STRATEGY = u'加载策略'
INIT_ALL = u'全部初始化'
START_ALL = u'全部启动'
STOP_ALL = u'全部停止'
SAVE_POSITION_DATA = u'保存持仓'
STRATEGY_LOADED = u'策略加载成功'
SAVE_POSITION_QUESTION = u'是否要保存策略持仓数据到数据库?'

View File

@ -1,18 +0,0 @@
# encoding: UTF-8
INIT = u'Init'
START = u'Start'
STOP = u'Stop'
CTA_ENGINE_STARTED = u'CTA engine started.'
CTA_STRATEGY = u'CTA Strategy'
LOAD_STRATEGY = u'Load Strategy'
INIT_ALL = u'Init All'
START_ALL = u'Start All'
STOP_ALL = u'Stop All'
SAVE_POSITION_DATA = u'Save Position Data'
STRATEGY_LOADED = u'Strategy loaded.'
SAVE_POSITION_QUESTION = u'Do you want to save strategy position data into database?'

View File

@ -1,50 +0,0 @@
# encoding: UTF-8
'''
动态载入所有的策略类
'''
import os
import importlib
import traceback
# 用来保存策略类的字典
STRATEGY_CLASS = {}
#----------------------------------------------------------------------
def loadStrategyModule(moduleName):
"""使用importlib动态载入模块"""
try:
module = importlib.import_module(moduleName)
# 遍历模块下的对象,只有名称中包含'Strategy'的才是策略类
for k in dir(module):
if 'Strategy' in k:
v = module.__getattribute__(k)
STRATEGY_CLASS[k] = v
except:
print '-' * 20
print ('Failed to import strategy file %s:' %moduleName)
traceback.print_exc()
# 遍历strategy目录下的文件
path = os.path.abspath(os.path.dirname(__file__))
for root, subdirs, files in os.walk(path):
for name in files:
# 只有文件名中包含strategy且非.pyc的文件才是策略文件
if 'strategy' in name and '.pyc' not in name:
# 模块名称需要模块路径前缀
moduleName = 'vnpy.trader.app.ctaStrategy.strategy.' + name.replace('.py', '')
loadStrategyModule(moduleName)
# 遍历工作目录下的文件
workingPath = os.getcwd()
for root, subdirs, files in os.walk(workingPath):
for name in files:
# 只有文件名中包含strategy且非.pyc的文件才是策略文件
if 'strategy' in name and '.pyc' not in name:
# 模块名称无需前缀
moduleName = name.replace('.py', '')
loadStrategyModule(moduleName)

View File

@ -1,184 +0,0 @@
# encoding: UTF-8
"""
一个ATR-RSI指标结合的交易策略适合用在股指的1分钟和5分钟线上
注意事项
1. 作者不对交易盈利做任何保证策略代码仅供参考
2. 本策略需要用到talib没有安装的用户请先参考www.vnpy.org上的教程安装
3. 将IF0000_1min.csv用ctaHistoryData.py导入MongoDB后直接运行本文件即可回测策略
"""
from vnpy.trader.vtObject import VtBarData
from vnpy.trader.vtConstant import EMPTY_STRING
from vnpy.trader.app.ctaStrategy.ctaTemplate import (CtaTemplate,
BarManager,
ArrayManager)
########################################################################
class AtrRsiStrategy(CtaTemplate):
"""结合ATR和RSI指标的一个分钟线交易策略"""
className = 'AtrRsiStrategy'
author = u'用Python的交易员'
# 策略参数
atrLength = 22 # 计算ATR指标的窗口数
atrMaLength = 10 # 计算ATR均线的窗口数
rsiLength = 5 # 计算RSI的窗口数
rsiEntry = 16 # RSI的开仓信号
trailingPercent = 0.8 # 百分比移动止损
initDays = 10 # 初始化数据所用的天数
fixedSize = 1 # 每次交易的数量
# 策略变量
atrValue = 0 # 最新的ATR指标数值
atrMa = 0 # ATR移动平均的数值
rsiValue = 0 # RSI指标的数值
rsiBuy = 0 # RSI买开阈值
rsiSell = 0 # RSI卖开阈值
intraTradeHigh = 0 # 移动止损用的持仓期内最高价
intraTradeLow = 0 # 移动止损用的持仓期内最低价
# 参数列表,保存了参数的名称
paramList = ['name',
'className',
'author',
'vtSymbol',
'atrLength',
'atrMaLength',
'rsiLength',
'rsiEntry',
'trailingPercent']
# 变量列表,保存了变量的名称
varList = ['inited',
'trading',
'pos',
'atrValue',
'atrMa',
'rsiValue',
'rsiBuy',
'rsiSell']
#----------------------------------------------------------------------
def __init__(self, ctaEngine, setting):
"""Constructor"""
super(AtrRsiStrategy, self).__init__(ctaEngine, setting)
# 创建K线合成器对象
self.bm = BarManager(self.onBar)
self.am = ArrayManager()
# 注意策略类中的可变对象属性通常是list和dict等在策略初始化时需要重新创建
# 否则会出现多个策略实例之间数据共享的情况,有可能导致潜在的策略逻辑错误风险,
# 策略类中的这些可变对象属性可以选择不写全都放在__init__下面写主要是为了阅读
# 策略时方便(更多是个编程习惯的选择)
#----------------------------------------------------------------------
def onInit(self):
"""初始化策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略初始化' %self.name)
# 初始化RSI入场阈值
self.rsiBuy = 50 + self.rsiEntry
self.rsiSell = 50 - self.rsiEntry
# 载入历史数据,并采用回放计算的方式初始化策略数值
initData = self.loadBar(self.initDays)
for bar in initData:
self.onBar(bar)
self.putEvent()
#----------------------------------------------------------------------
def onStart(self):
"""启动策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略启动' %self.name)
self.putEvent()
#----------------------------------------------------------------------
def onStop(self):
"""停止策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略停止' %self.name)
self.putEvent()
#----------------------------------------------------------------------
def onTick(self, tick):
"""收到行情TICK推送必须由用户继承实现"""
self.bm.updateTick(tick)
#----------------------------------------------------------------------
def onBar(self, bar):
"""收到Bar推送必须由用户继承实现"""
self.cancelAll()
# 保存K线数据
am = self.am
am.updateBar(bar)
if not am.inited:
return
# 计算指标数值
atrArray = am.atr(self.atrLength, array=True)
self.atrValue = atrArray[-1]
self.atrMa = atrArray[-self.atrMaLength:].mean()
self.rsiValue = am.rsi(self.rsiLength)
# 判断是否要进行交易
# 当前无仓位
if self.pos == 0:
self.intraTradeHigh = bar.high
self.intraTradeLow = bar.low
# ATR数值上穿其移动平均线说明行情短期内波动加大
# 即处于趋势的概率较大适合CTA开仓
if self.atrValue > self.atrMa:
# 使用RSI指标的趋势行情时会在超买超卖区钝化特征作为开仓信号
if self.rsiValue > self.rsiBuy:
# 这里为了保证成交选择超价5个整指数点下单
self.buy(bar.close+5, self.fixedSize)
elif self.rsiValue < self.rsiSell:
self.short(bar.close-5, self.fixedSize)
# 持有多头仓位
elif self.pos > 0:
# 计算多头持有期内的最高价,以及重置最低价
self.intraTradeHigh = max(self.intraTradeHigh, bar.high)
self.intraTradeLow = bar.low
# 计算多头移动止损
longStop = self.intraTradeHigh * (1-self.trailingPercent/100)
# 发出本地止损委托,并且把委托号记录下来,用于后续撤单
self.sell(longStop, abs(self.pos), stop=True)
# 持有空头仓位
elif self.pos < 0:
self.intraTradeLow = min(self.intraTradeLow, bar.low)
self.intraTradeHigh = bar.high
shortStop = self.intraTradeLow * (1+self.trailingPercent/100)
self.cover(shortStop, abs(self.pos), stop=True)
# 发出状态更新事件
self.putEvent()
#----------------------------------------------------------------------
def onOrder(self, order):
"""收到委托变化推送(必须由用户继承实现)"""
pass
#----------------------------------------------------------------------
def onTrade(self, trade):
# 发出状态更新事件
self.putEvent()
#----------------------------------------------------------------------
def onStopOrder(self, so):
"""停止单推送"""
pass

View File

@ -1,187 +0,0 @@
# encoding: UTF-8
"""
感谢Darwin Quant贡献的策略思路
知乎专栏原文https://zhuanlan.zhihu.com/p/24448511
策略逻辑
1. 布林通道信号
2. CCI指标过滤
3. ATR指标止损
适合品种螺纹钢
适合周期15分钟
这里的策略是作者根据原文结合vn.py实现对策略实现上做了一些修改仅供参考
"""
from __future__ import division
from vnpy.trader.vtObject import VtBarData
from vnpy.trader.vtConstant import EMPTY_STRING
from vnpy.trader.app.ctaStrategy.ctaTemplate import (CtaTemplate,
BarManager,
ArrayManager)
########################################################################
class BollChannelStrategy(CtaTemplate):
"""基于布林通道的交易策略"""
className = 'BollChannelStrategy'
author = u'用Python的交易员'
# 策略参数
bollWindow = 18 # 布林通道窗口数
bollDev = 3.4 # 布林通道的偏差
cciWindow = 10 # CCI窗口数
atrWindow = 30 # ATR窗口数
slMultiplier = 5.2 # 计算止损距离的乘数
initDays = 10 # 初始化数据所用的天数
fixedSize = 1 # 每次交易的数量
# 策略变量
bollUp = 0 # 布林通道上轨
bollDown = 0 # 布林通道下轨
cciValue = 0 # CCI指标数值
atrValue = 0 # ATR指标数值
intraTradeHigh = 0 # 持仓期内的最高点
intraTradeLow = 0 # 持仓期内的最低点
longStop = 0 # 多头止损
shortStop = 0 # 空头止损
# 参数列表,保存了参数的名称
paramList = ['name',
'className',
'author',
'vtSymbol',
'bollWindow',
'bollDev',
'cciWindow',
'atrWindow',
'slMultiplier',
'initDays',
'fixedSize']
# 变量列表,保存了变量的名称
varList = ['inited',
'trading',
'pos',
'bollUp',
'bollDown',
'cciValue',
'atrValue',
'intraTradeHigh',
'intraTradeLow',
'longStop',
'shortStop']
#----------------------------------------------------------------------
def __init__(self, ctaEngine, setting):
"""Constructor"""
super(BollChannelStrategy, self).__init__(ctaEngine, setting)
self.bm = BarManager(self.onBar, 15, self.onXminBar) # 创建K线合成器对象
self.am = ArrayManager()
#----------------------------------------------------------------------
def onInit(self):
"""初始化策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略初始化' %self.name)
# 载入历史数据,并采用回放计算的方式初始化策略数值
initData = self.loadBar(self.initDays)
for bar in initData:
self.onBar(bar)
self.putEvent()
#----------------------------------------------------------------------
def onStart(self):
"""启动策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略启动' %self.name)
self.putEvent()
#----------------------------------------------------------------------
def onStop(self):
"""停止策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略停止' %self.name)
self.putEvent()
#----------------------------------------------------------------------
def onTick(self, tick):
"""收到行情TICK推送必须由用户继承实现"""
self.bm.updateTick(tick)
#----------------------------------------------------------------------
def onBar(self, bar):
"""收到Bar推送必须由用户继承实现"""
self.bm.updateBar(bar)
#----------------------------------------------------------------------
def onXminBar(self, bar):
"""收到X分钟K线"""
# 全撤之前发出的委托
self.cancelAll()
# 保存K线数据
am = self.am
am.updateBar(bar)
if not am.inited:
return
# 计算指标数值
self.bollUp, self.bollDown = am.boll(self.bollWindow, self.bollDev)
self.cciValue = am.cci(self.cciWindow)
self.atrValue = am.atr(self.atrWindow)
# 判断是否要进行交易
# 当前无仓位,发送开仓委托
if self.pos == 0:
self.intraTradeHigh = bar.high
self.intraTradeLow = bar.low
if self.cciValue > 0:
self.buy(self.bollUp, self.fixedSize, True)
elif self.cciValue < 0:
self.short(self.bollDown, self.fixedSize, True)
# 持有多头仓位
elif self.pos > 0:
self.intraTradeHigh = max(self.intraTradeHigh, bar.high)
self.intraTradeLow = bar.low
self.longStop = self.intraTradeHigh - self.atrValue * self.slMultiplier
self.sell(self.longStop, abs(self.pos), True)
# 持有空头仓位
elif self.pos < 0:
self.intraTradeHigh = bar.high
self.intraTradeLow = min(self.intraTradeLow, bar.low)
self.shortStop = self.intraTradeLow + self.atrValue * self.slMultiplier
self.cover(self.shortStop, abs(self.pos), True)
# 发出状态更新事件
self.putEvent()
#----------------------------------------------------------------------
def onOrder(self, order):
"""收到委托变化推送(必须由用户继承实现)"""
pass
#----------------------------------------------------------------------
def onTrade(self, trade):
# 发出状态更新事件
self.putEvent()
#----------------------------------------------------------------------
def onStopOrder(self, so):
"""停止单推送"""
pass

View File

@ -1,155 +0,0 @@
# encoding: UTF-8
"""
这里的Demo是一个最简单的双均线策略实现并未考虑太多实盘中的交易细节
1. 委托价格超出涨跌停价导致的委托失败
2. 委托未成交需要撤单后重新委托
3. 断网后恢复交易状态
4. 等等
这些点是作者选择特意忽略不去实现因此想实盘的朋友请自己多多研究CTA交易的一些细节
做到了然于胸后再去交易对自己的money和时间负责
也希望社区能做出一个解决了以上潜在风险的Demo出来
"""
from __future__ import division
from vnpy.trader.vtConstant import EMPTY_STRING, EMPTY_FLOAT
from vnpy.trader.app.ctaStrategy.ctaTemplate import (CtaTemplate,
BarManager,
ArrayManager)
########################################################################
class DoubleMaStrategy(CtaTemplate):
"""双指数均线策略Demo"""
className = 'DoubleMaStrategy'
author = u'用Python的交易员'
# 策略参数
fastWindow = 10 # 快速均线参数
slowWindow = 60 # 慢速均线参数
initDays = 10 # 初始化数据所用的天数
# 策略变量
fastMa0 = EMPTY_FLOAT # 当前最新的快速EMA
fastMa1 = EMPTY_FLOAT # 上一根的快速EMA
slowMa0 = EMPTY_FLOAT
slowMa1 = EMPTY_FLOAT
# 参数列表,保存了参数的名称
paramList = ['name',
'className',
'author',
'vtSymbol',
'fastWindow',
'slowWindow']
# 变量列表,保存了变量的名称
varList = ['inited',
'trading',
'pos',
'fastMa0',
'fastMa1',
'slowMa0',
'slowMa1']
#----------------------------------------------------------------------
def __init__(self, ctaEngine, setting):
"""Constructor"""
super(DoubleMaStrategy, self).__init__(ctaEngine, setting)
self.bm = BarManager(self.onBar)
self.am = ArrayManager()
# 注意策略类中的可变对象属性通常是list和dict等在策略初始化时需要重新创建
# 否则会出现多个策略实例之间数据共享的情况,有可能导致潜在的策略逻辑错误风险,
# 策略类中的这些可变对象属性可以选择不写全都放在__init__下面写主要是为了阅读
# 策略时方便(更多是个编程习惯的选择)
#----------------------------------------------------------------------
def onInit(self):
"""初始化策略(必须由用户继承实现)"""
self.writeCtaLog(u'双EMA演示策略初始化')
initData = self.loadBar(self.initDays)
for bar in initData:
self.onBar(bar)
self.putEvent()
#----------------------------------------------------------------------
def onStart(self):
"""启动策略(必须由用户继承实现)"""
self.writeCtaLog(u'双EMA演示策略启动')
self.putEvent()
#----------------------------------------------------------------------
def onStop(self):
"""停止策略(必须由用户继承实现)"""
self.writeCtaLog(u'双EMA演示策略停止')
self.putEvent()
#----------------------------------------------------------------------
def onTick(self, tick):
"""收到行情TICK推送必须由用户继承实现"""
self.bm.updateTick(tick)
#----------------------------------------------------------------------
def onBar(self, bar):
"""收到Bar推送必须由用户继承实现"""
am = self.am
am.updateBar(bar)
if not am.inited:
return
# 计算快慢均线
fastMa = am.sma(self.fastWindow, array=True)
self.fastMa0 = fastMa[-1]
self.fastMa1 = fastMa[-2]
slowMa = am.sma(self.slowWindow, array=True)
self.slowMa0 = slowMa[-1]
self.slowMa1 = slowMa[-2]
# 判断买卖
crossOver = self.fastMa0>self.slowMa0 and self.fastMa1<self.slowMa1 # 金叉上穿
crossBelow = self.fastMa0<self.slowMa0 and self.fastMa1>self.slowMa1 # 死叉下穿
# 金叉和死叉的条件是互斥
# 所有的委托均以K线收盘价委托这里有一个实盘中无法成交的风险考虑添加对模拟市价单类型的支持
if crossOver:
# 如果金叉时手头没有持仓,则直接做多
if self.pos == 0:
self.buy(bar.close, 1)
# 如果有空头持仓,则先平空,再做多
elif self.pos < 0:
self.cover(bar.close, 1)
self.buy(bar.close, 1)
# 死叉和金叉相反
elif crossBelow:
if self.pos == 0:
self.short(bar.close, 1)
elif self.pos > 0:
self.sell(bar.close, 1)
self.short(bar.close, 1)
# 发出状态更新事件
self.putEvent()
#----------------------------------------------------------------------
def onOrder(self, order):
"""收到委托变化推送(必须由用户继承实现)"""
# 对于无需做细粒度委托控制的策略可以忽略onOrder
pass
#----------------------------------------------------------------------
def onTrade(self, trade):
"""收到成交推送(必须由用户继承实现)"""
# 对于无需做细粒度委托控制的策略可以忽略onOrder
pass
#----------------------------------------------------------------------
def onStopOrder(self, so):
"""停止单推送"""
pass

View File

@ -1,187 +0,0 @@
# encoding: UTF-8
"""
DualThrust交易策略
"""
from datetime import time
from vnpy.trader.vtObject import VtBarData
from vnpy.trader.vtConstant import EMPTY_STRING
from vnpy.trader.app.ctaStrategy.ctaTemplate import CtaTemplate, BarManager
########################################################################
class DualThrustStrategy(CtaTemplate):
"""DualThrust交易策略"""
className = 'DualThrustStrategy'
author = u'用Python的交易员'
# 策略参数
fixedSize = 100
k1 = 0.4
k2 = 0.6
initDays = 10
# 策略变量
barList = [] # K线对象的列表
dayOpen = 0
dayHigh = 0
dayLow = 0
range = 0
longEntry = 0
shortEntry = 0
exitTime = time(hour=14, minute=55)
longEntered = False
shortEntered = False
# 参数列表,保存了参数的名称
paramList = ['name',
'className',
'author',
'vtSymbol',
'k1',
'k2']
# 变量列表,保存了变量的名称
varList = ['inited',
'trading',
'pos',
'range',
'longEntry',
'shortEntry',
'exitTime']
#----------------------------------------------------------------------
def __init__(self, ctaEngine, setting):
"""Constructor"""
super(DualThrustStrategy, self).__init__(ctaEngine, setting)
self.bm = BarManager(self.onBar)
self.barList = []
#----------------------------------------------------------------------
def onInit(self):
"""初始化策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略初始化' %self.name)
# 载入历史数据,并采用回放计算的方式初始化策略数值
initData = self.loadBar(self.initDays)
for bar in initData:
self.onBar(bar)
self.putEvent()
#----------------------------------------------------------------------
def onStart(self):
"""启动策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略启动' %self.name)
self.putEvent()
#----------------------------------------------------------------------
def onStop(self):
"""停止策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略停止' %self.name)
self.putEvent()
#----------------------------------------------------------------------
def onTick(self, tick):
"""收到行情TICK推送必须由用户继承实现"""
self.bm.updateTick(tick)
#----------------------------------------------------------------------
def onBar(self, bar):
"""收到Bar推送必须由用户继承实现"""
# 撤销之前发出的尚未成交的委托(包括限价单和停止单)
self.cancelAll()
# 计算指标数值
self.barList.append(bar)
if len(self.barList) <= 2:
return
else:
self.barList.pop(0)
lastBar = self.barList[-2]
# 新的一天
if lastBar.datetime.date() != bar.datetime.date():
# 如果已经初始化
if self.dayHigh:
self.range = self.dayHigh - self.dayLow
self.longEntry = bar.open + self.k1 * self.range
self.shortEntry = bar.open - self.k2 * self.range
self.dayOpen = bar.open
self.dayHigh = bar.high
self.dayLow = bar.low
self.longEntered = False
self.shortEntered = False
else:
self.dayHigh = max(self.dayHigh, bar.high)
self.dayLow = min(self.dayLow, bar.low)
# 尚未到收盘
if not self.range:
return
if bar.datetime.time() < self.exitTime:
if self.pos == 0:
if bar.close > self.dayOpen:
if not self.longEntered:
self.buy(self.longEntry, self.fixedSize, stop=True)
else:
if not self.shortEntered:
self.short(self.shortEntry, self.fixedSize, stop=True)
# 持有多头仓位
elif self.pos > 0:
self.longEntered = True
# 多头止损单
self.sell(self.shortEntry, self.fixedSize, stop=True)
# 空头开仓单
if not self.shortEntered:
self.short(self.shortEntry, self.fixedSize, stop=True)
# 持有空头仓位
elif self.pos < 0:
self.shortEntered = True
# 空头止损单
self.cover(self.longEntry, self.fixedSize, stop=True)
# 多头开仓单
if not self.longEntered:
self.buy(self.longEntry, self.fixedSize, stop=True)
# 收盘平仓
else:
if self.pos > 0:
self.sell(bar.close * 0.99, abs(self.pos))
elif self.pos < 0:
self.cover(bar.close * 1.01, abs(self.pos))
# 发出状态更新事件
self.putEvent()
#----------------------------------------------------------------------
def onOrder(self, order):
"""收到委托变化推送(必须由用户继承实现)"""
pass
#----------------------------------------------------------------------
def onTrade(self, trade):
# 发出状态更新事件
self.putEvent()
#----------------------------------------------------------------------
def onStopOrder(self, so):
"""停止单推送"""
pass

View File

@ -1,198 +0,0 @@
# encoding: UTF-8
"""
基于King Keltner通道的交易策略适合用在股指上
展示了OCO委托和5分钟K线聚合的方法
注意事项
1. 作者不对交易盈利做任何保证策略代码仅供参考
2. 本策略需要用到talib没有安装的用户请先参考www.vnpy.org上的教程安装
3. 将IF0000_1min.csv用ctaHistoryData.py导入MongoDB后直接运行本文件即可回测策略
"""
from __future__ import division
from vnpy.trader.vtObject import VtBarData
from vnpy.trader.vtConstant import EMPTY_STRING
from vnpy.trader.app.ctaStrategy.ctaTemplate import (CtaTemplate,
BarManager,
ArrayManager)
########################################################################
class KkStrategy(CtaTemplate):
"""基于King Keltner通道的交易策略"""
className = 'KkStrategy'
author = u'用Python的交易员'
# 策略参数
kkLength = 11 # 计算通道中值的窗口数
kkDev = 1.6 # 计算通道宽度的偏差
trailingPrcnt = 0.8 # 移动止损
initDays = 10 # 初始化数据所用的天数
fixedSize = 1 # 每次交易的数量
# 策略变量
kkUp = 0 # KK通道上轨
kkDown = 0 # KK通道下轨
intraTradeHigh = 0 # 持仓期内的最高点
intraTradeLow = 0 # 持仓期内的最低点
buyOrderIDList = [] # OCO委托买入开仓的委托号
shortOrderIDList = [] # OCO委托卖出开仓的委托号
orderList = [] # 保存委托代码的列表
# 参数列表,保存了参数的名称
paramList = ['name',
'className',
'author',
'vtSymbol',
'kkLength',
'kkDev']
# 变量列表,保存了变量的名称
varList = ['inited',
'trading',
'pos',
'kkUp',
'kkDown']
#----------------------------------------------------------------------
def __init__(self, ctaEngine, setting):
"""Constructor"""
super(KkStrategy, self).__init__(ctaEngine, setting)
self.bm = BarManager(self.onBar, 5, self.onFiveBar) # 创建K线合成器对象
self.am = ArrayManager()
self.buyOrderIDList = []
self.shortOrderIDList = []
self.orderList = []
#----------------------------------------------------------------------
def onInit(self):
"""初始化策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略初始化' %self.name)
# 载入历史数据,并采用回放计算的方式初始化策略数值
initData = self.loadBar(self.initDays)
for bar in initData:
self.onBar(bar)
self.putEvent()
#----------------------------------------------------------------------
def onStart(self):
"""启动策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略启动' %self.name)
self.putEvent()
#----------------------------------------------------------------------
def onStop(self):
"""停止策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略停止' %self.name)
self.putEvent()
#----------------------------------------------------------------------
def onTick(self, tick):
"""收到行情TICK推送必须由用户继承实现"""
self.bm.updateTick(tick)
#----------------------------------------------------------------------
def onBar(self, bar):
"""收到Bar推送必须由用户继承实现"""
self.bm.updateBar(bar)
#----------------------------------------------------------------------
def onFiveBar(self, bar):
"""收到5分钟K线"""
# 撤销之前发出的尚未成交的委托(包括限价单和停止单)
for orderID in self.orderList:
self.cancelOrder(orderID)
self.orderList = []
# 保存K线数据
am = self.am
am.updateBar(bar)
if not am.inited:
return
# 计算指标数值
self.kkUp, self.kkDown = am.keltner(self.kkLength, self.kkDev)
# 判断是否要进行交易
# 当前无仓位发送OCO开仓委托
if self.pos == 0:
self.intraTradeHigh = bar.high
self.intraTradeLow = bar.low
self.sendOcoOrder(self.kkUp, self.kkDown, self.fixedSize)
# 持有多头仓位
elif self.pos > 0:
self.intraTradeHigh = max(self.intraTradeHigh, bar.high)
self.intraTradeLow = bar.low
l = self.sell(self.intraTradeHigh*(1-self.trailingPrcnt/100),
abs(self.pos), True)
self.orderList.extend(l)
# 持有空头仓位
elif self.pos < 0:
self.intraTradeHigh = bar.high
self.intraTradeLow = min(self.intraTradeLow, bar.low)
l = self.cover(self.intraTradeLow*(1+self.trailingPrcnt/100),
abs(self.pos), True)
self.orderList.extend(l)
# 发出状态更新事件
self.putEvent()
#----------------------------------------------------------------------
def onOrder(self, order):
"""收到委托变化推送(必须由用户继承实现)"""
pass
#----------------------------------------------------------------------
def onTrade(self, trade):
if self.pos != 0:
# 多头开仓成交后,撤消空头委托
if self.pos > 0:
for shortOrderID in self.shortOrderIDList:
self.cancelOrder(shortOrderID)
# 反之同样
elif self.pos < 0:
for buyOrderID in self.buyOrderIDList:
self.cancelOrder(buyOrderID)
# 移除委托号
for orderID in (self.buyOrderIDList + self.shortOrderIDList):
if orderID in self.orderList:
self.orderList.remove(orderID)
# 发出状态更新事件
self.putEvent()
#----------------------------------------------------------------------
def sendOcoOrder(self, buyPrice, shortPrice, volume):
"""
发送OCO委托
OCO(One Cancel Other)委托
1. 主要用于实现区间突破入场
2. 包含两个方向相反的停止单
3. 一个方向的停止单成交后会立即撤消另一个方向的
"""
# 发送双边的停止单委托,并记录委托号
self.buyOrderIDList = self.buy(buyPrice, volume, True)
self.shortOrderIDList = self.short(shortPrice, volume, True)
# 将委托号记录到列表中
self.orderList.extend(self.buyOrderIDList)
self.orderList.extend(self.shortOrderIDList)
#----------------------------------------------------------------------
def onStopOrder(self, so):
"""停止单推送"""
pass

View File

@ -1,270 +0,0 @@
# encoding: UTF-8
'''
CTA模块相关的GUI控制组件
'''
from vnpy.event import Event
from vnpy.trader.vtEvent import *
from vnpy.trader.uiBasicWidget import QtGui, QtCore, QtWidgets, BasicCell
from .ctaBase import EVENT_CTA_LOG, EVENT_CTA_STRATEGY
from .language import text
########################################################################
class CtaValueMonitor(QtWidgets.QTableWidget):
"""参数监控"""
#----------------------------------------------------------------------
def __init__(self, parent=None):
"""Constructor"""
super(CtaValueMonitor, self).__init__(parent)
self.keyCellDict = {}
self.data = None
self.inited = False
self.initUi()
#----------------------------------------------------------------------
def initUi(self):
"""初始化界面"""
self.setRowCount(1)
self.verticalHeader().setVisible(False)
self.setEditTriggers(self.NoEditTriggers)
self.setMaximumHeight(self.sizeHint().height())
#----------------------------------------------------------------------
def updateData(self, data):
"""更新数据"""
if not self.inited:
self.setColumnCount(len(data))
self.setHorizontalHeaderLabels(data.keys())
col = 0
for k, v in data.items():
cell = QtWidgets.QTableWidgetItem(unicode(v))
self.keyCellDict[k] = cell
self.setItem(0, col, cell)
col += 1
self.inited = True
else:
for k, v in data.items():
cell = self.keyCellDict[k]
cell.setText(unicode(v))
########################################################################
class CtaStrategyManager(QtWidgets.QGroupBox):
"""策略管理组件"""
signal = QtCore.Signal(type(Event()))
#----------------------------------------------------------------------
def __init__(self, ctaEngine, eventEngine, name, parent=None):
"""Constructor"""
super(CtaStrategyManager, self).__init__(parent)
self.ctaEngine = ctaEngine
self.eventEngine = eventEngine
self.name = name
self.initUi()
self.updateMonitor()
self.registerEvent()
#----------------------------------------------------------------------
def initUi(self):
"""初始化界面"""
self.setTitle(self.name)
self.paramMonitor = CtaValueMonitor(self)
self.varMonitor = CtaValueMonitor(self)
height = 65
self.paramMonitor.setFixedHeight(height)
self.varMonitor.setFixedHeight(height)
buttonInit = QtWidgets.QPushButton(text.INIT)
buttonStart = QtWidgets.QPushButton(text.START)
buttonStop = QtWidgets.QPushButton(text.STOP)
buttonInit.clicked.connect(self.init)
buttonStart.clicked.connect(self.start)
buttonStop.clicked.connect(self.stop)
hbox1 = QtWidgets.QHBoxLayout()
hbox1.addWidget(buttonInit)
hbox1.addWidget(buttonStart)
hbox1.addWidget(buttonStop)
hbox1.addStretch()
hbox2 = QtWidgets.QHBoxLayout()
hbox2.addWidget(self.paramMonitor)
hbox3 = QtWidgets.QHBoxLayout()
hbox3.addWidget(self.varMonitor)
vbox = QtWidgets.QVBoxLayout()
vbox.addLayout(hbox1)
vbox.addLayout(hbox2)
vbox.addLayout(hbox3)
self.setLayout(vbox)
#----------------------------------------------------------------------
def updateMonitor(self, event=None):
"""显示策略最新状态"""
paramDict = self.ctaEngine.getStrategyParam(self.name)
if paramDict:
self.paramMonitor.updateData(paramDict)
varDict = self.ctaEngine.getStrategyVar(self.name)
if varDict:
self.varMonitor.updateData(varDict)
#----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.signal.connect(self.updateMonitor)
self.eventEngine.register(EVENT_CTA_STRATEGY+self.name, self.signal.emit)
#----------------------------------------------------------------------
def init(self):
"""初始化策略"""
self.ctaEngine.initStrategy(self.name)
#----------------------------------------------------------------------
def start(self):
"""启动策略"""
self.ctaEngine.startStrategy(self.name)
#----------------------------------------------------------------------
def stop(self):
"""停止策略"""
self.ctaEngine.stopStrategy(self.name)
########################################################################
class CtaEngineManager(QtWidgets.QWidget):
"""CTA引擎管理组件"""
signal = QtCore.Signal(type(Event()))
#----------------------------------------------------------------------
def __init__(self, ctaEngine, eventEngine, parent=None):
"""Constructor"""
super(CtaEngineManager, self).__init__(parent)
self.ctaEngine = ctaEngine
self.eventEngine = eventEngine
self.strategyLoaded = False
self.initUi()
self.registerEvent()
# 记录日志
self.ctaEngine.writeCtaLog(text.CTA_ENGINE_STARTED)
#----------------------------------------------------------------------
def initUi(self):
"""初始化界面"""
self.setWindowTitle(text.CTA_STRATEGY)
# 按钮
loadButton = QtWidgets.QPushButton(text.LOAD_STRATEGY)
initAllButton = QtWidgets.QPushButton(text.INIT_ALL)
startAllButton = QtWidgets.QPushButton(text.START_ALL)
stopAllButton = QtWidgets.QPushButton(text.STOP_ALL)
loadButton.clicked.connect(self.load)
initAllButton.clicked.connect(self.initAll)
startAllButton.clicked.connect(self.startAll)
stopAllButton.clicked.connect(self.stopAll)
# 滚动区域放置所有的CtaStrategyManager
self.scrollArea = QtWidgets.QScrollArea()
self.scrollArea.setWidgetResizable(True)
# CTA组件的日志监控
self.ctaLogMonitor = QtWidgets.QTextEdit()
self.ctaLogMonitor.setReadOnly(True)
self.ctaLogMonitor.setMaximumHeight(200)
# 设置布局
hbox2 = QtWidgets.QHBoxLayout()
hbox2.addWidget(loadButton)
hbox2.addWidget(initAllButton)
hbox2.addWidget(startAllButton)
hbox2.addWidget(stopAllButton)
hbox2.addStretch()
vbox = QtWidgets.QVBoxLayout()
vbox.addLayout(hbox2)
vbox.addWidget(self.scrollArea)
vbox.addWidget(self.ctaLogMonitor)
self.setLayout(vbox)
#----------------------------------------------------------------------
def initStrategyManager(self):
"""初始化策略管理组件界面"""
w = QtWidgets.QWidget()
vbox = QtWidgets.QVBoxLayout()
for name in self.ctaEngine.strategyDict.keys():
strategyManager = CtaStrategyManager(self.ctaEngine, self.eventEngine, name)
vbox.addWidget(strategyManager)
vbox.addStretch()
w.setLayout(vbox)
self.scrollArea.setWidget(w)
#----------------------------------------------------------------------
def initAll(self):
"""全部初始化"""
self.ctaEngine.initAll()
#----------------------------------------------------------------------
def startAll(self):
"""全部启动"""
self.ctaEngine.startAll()
#----------------------------------------------------------------------
def stopAll(self):
"""全部停止"""
self.ctaEngine.stopAll()
#----------------------------------------------------------------------
def load(self):
"""加载策略"""
if not self.strategyLoaded:
self.ctaEngine.loadSetting()
self.initStrategyManager()
self.strategyLoaded = True
self.ctaEngine.writeCtaLog(text.STRATEGY_LOADED)
#----------------------------------------------------------------------
def updateCtaLog(self, event):
"""更新CTA相关日志"""
log = event.dict_['data']
content = '\t'.join([log.logTime, log.logContent])
self.ctaLogMonitor.append(content)
#----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.signal.connect(self.updateCtaLog)
self.eventEngine.register(EVENT_CTA_LOG, self.signal.emit)