diff --git a/vn.lhang/vnlhang.py b/vn.lhang/vnlhang.py index 72b7225b..70245d69 100644 --- a/vn.lhang/vnlhang.py +++ b/vn.lhang/vnlhang.py @@ -6,6 +6,8 @@ import hashlib import requests from Queue import Queue, Empty from threading import Thread +from time import sleep + LHANG_API_ROOT ="https://api.lhang.com/v1/" @@ -47,17 +49,19 @@ class LhangApi(object): """Constructor""" self.apiKey = '' self.secretKey = '' - + + self.interval = 1 # 每次请求的间隔等待 self.active = False # API工作状态 self.reqID = 0 # 请求编号 self.reqQueue = Queue() # 请求队列 self.reqThread = Thread(target=self.processQueue) # 请求处理线程 #---------------------------------------------------------------------- - def init(self, apiKey, secretKey): + def init(self, apiKey, secretKey, interval): """初始化""" self.apiKey = apiKey self.secretKey = secretKey + self.interval = interval self.active = True self.reqThread.start() @@ -66,7 +70,9 @@ class LhangApi(object): def exit(self): """退出""" self.active = False - self.reqThread.join() + + if self.reqThread.isAlive(): + self.reqThread.join() #---------------------------------------------------------------------- def processRequest(self, req): @@ -107,15 +113,18 @@ class LhangApi(object): # 请求失败 if data is None: error = u'请求失败' - self.onError(error, reqID) + self.onError(error, req, reqID) elif 'error_code' in data: error = u'请求出错,错误代码:%s' % data['error_code'] - self.onError(error, reqID) + self.onError(error, req, reqID) # 请求成功 else: if self.DEBUG: print callback.__name__ - callback(data, reqID) + callback(data, req, reqID) + + # 流控等待 + sleep(self.interval) except Empty: pass @@ -138,9 +147,9 @@ class LhangApi(object): return self.reqID #---------------------------------------------------------------------- - def onError(self, error, reqID): + def onError(self, error, req, reqID): """错误推送""" - print error, reqID + print error, req, reqID ############################################### # 行情接口 @@ -192,22 +201,22 @@ class LhangApi(object): return self.sendRequest(function, params, callback) #---------------------------------------------------------------------- - def onGetTicker(self, data, reqID): + def onGetTicker(self, data, req, reqID): """查询行情回调""" print data, reqID # ---------------------------------------------------------------------- - def onGetDepth(self, data, reqID): + def onGetDepth(self, data, req, reqID): """查询深度回调""" print data, reqID # ---------------------------------------------------------------------- - def onGetTrades(self, data, reqID): + def onGetTrades(self, data, req, reqID): """查询历史成交""" print data, reqID # ---------------------------------------------------------------------- - def onGetKline(self, data, reqID): + def onGetKline(self, data, req, reqID): """查询K线回报""" print data, reqID @@ -272,27 +281,27 @@ class LhangApi(object): return self.sendRequest(function, params, callback) # ---------------------------------------------------------------------- - def onGetUserInfo(self, data, reqID): + def onGetUserInfo(self, data, req, reqID): """查询K线回报""" print data, reqID # ---------------------------------------------------------------------- - def onCreateOrder(self, data, reqID): + def onCreateOrder(self, data, req, reqID): """委托回报""" print data, reqID # ---------------------------------------------------------------------- - def onCancelOrder(self, data, reqID): + def onCancelOrder(self, data, req, reqID): """撤单回报""" print data, reqID # ---------------------------------------------------------------------- - def onGetOrdersInfo(self, data, reqID): + def onGetOrdersInfo(self, data, req, reqID): """查询委托回报""" print data, reqID # ---------------------------------------------------------------------- - def onGetOrdersInfoHistory(self, data, reqID): + def onGetOrdersInfoHistory(self, data, req, reqID): """撤单回报""" print data, reqID diff --git a/vn.trader/ctaAlgo/CTA_setting.json b/vn.trader/ctaAlgo/CTA_setting.json index a0eaa491..19884be3 100644 --- a/vn.trader/ctaAlgo/CTA_setting.json +++ b/vn.trader/ctaAlgo/CTA_setting.json @@ -2,12 +2,18 @@ { "name": "double ema", "className": "EmaDemoStrategy", - "vtSymbol": "IF1702" + "vtSymbol": "IF1706" }, { "name": "atr rsi", "className": "AtrRsiStrategy", - "vtSymbol": "IC1702" + "vtSymbol": "IC1706" + }, + + { + "name": "king keltner", + "className": "KkStrategy", + "vtSymbol": "IH1706" } ] \ No newline at end of file diff --git a/vn.trader/ctaAlgo/ctaBacktesting.py b/vn.trader/ctaAlgo/ctaBacktesting.py index 9055be79..5ce06ca3 100644 --- a/vn.trader/ctaAlgo/ctaBacktesting.py +++ b/vn.trader/ctaAlgo/ctaBacktesting.py @@ -434,7 +434,8 @@ class BacktestingEngine(object): self.limitOrderDict[orderID] = order # 从字典中删除该限价单 - del self.workingStopOrderDict[stopOrderID] + if stopOrderID in self.workingStopOrderDict: + del self.workingStopOrderDict[stopOrderID] #---------------------------------------------------------------------- def insertData(self, dbName, collectionName, data): diff --git a/vn.trader/ctaAlgo/strategy/strategyKingKeltner.py b/vn.trader/ctaAlgo/strategy/strategyKingKeltner.py new file mode 100644 index 00000000..4ae900cf --- /dev/null +++ b/vn.trader/ctaAlgo/strategy/strategyKingKeltner.py @@ -0,0 +1,300 @@ +# encoding: UTF-8 + +""" +基于King Keltner通道的交易策略,适合用在股指上, +展示了OCO委托和5分钟K线聚合的方法。 + +注意事项: +1. 作者不对交易盈利做任何保证,策略代码仅供参考 +2. 本策略需要用到talib,没有安装的用户请先参考www.vnpy.org上的教程安装 +3. 将IF0000_1min.csv用ctaHistoryData.py导入MongoDB后,直接运行本文件即可回测策略 +""" + +from __future__ import division + +from ctaBase import * +from ctaTemplate import CtaTemplate + +import talib +import numpy as np + + +######################################################################## +class KkStrategy(CtaTemplate): + """基于King Keltner通道的交易策略""" + className = 'KkStrategy' + author = u'用Python的交易员' + + # 策略参数 + kkLength = 32 # 计算通道中值的窗口数 + kkDev = 2.2 # 计算通道宽度的偏差 + initDays = 10 # 初始化数据所用的天数 + fixedSize = 1 # 每次交易的数量 + + # 策略变量 + bar = None # 1分钟K线对象 + barMinute = EMPTY_STRING # K线当前的分钟 + fiveBar = None # 1分钟K线对象 + + bufferSize = 100 # 需要缓存的数据的大小 + bufferCount = 0 # 目前已经缓存了的数据的计数 + highArray = np.zeros(bufferSize) # K线最高价的数组 + lowArray = np.zeros(bufferSize) # K线最低价的数组 + closeArray = np.zeros(bufferSize) # K线收盘价的数组 + + atrValue = 0 # 最新的ATR指标数值 + kkMid = 0 # KK通道中轨 + kkUp = 0 # KK通道上轨 + kkDown = 0 # KK通道下轨 + + buyOrderID = None # OCO委托买入开仓的委托号 + shortOrderID = None # OCO委托卖出开仓的委托号 + orderList = [] # 保存委托代码的列表 + + # 参数列表,保存了参数的名称 + paramList = ['name', + 'className', + 'author', + 'vtSymbol', + 'kkLength', + 'kkDev'] + + # 变量列表,保存了变量的名称 + varList = ['inited', + 'trading', + 'pos', + 'atrValue', + 'kkMid', + 'kkUp', + 'kkDown'] + + #---------------------------------------------------------------------- + def __init__(self, ctaEngine, setting): + """Constructor""" + super(KkStrategy, self).__init__(ctaEngine, setting) + + #---------------------------------------------------------------------- + def onInit(self): + """初始化策略(必须由用户继承实现)""" + self.writeCtaLog(u'%s策略初始化' %self.name) + + # 载入历史数据,并采用回放计算的方式初始化策略数值 + initData = self.loadBar(self.initDays) + for bar in initData: + self.onBar(bar) + + self.putEvent() + + #---------------------------------------------------------------------- + def onStart(self): + """启动策略(必须由用户继承实现)""" + self.writeCtaLog(u'%s策略启动' %self.name) + self.putEvent() + + #---------------------------------------------------------------------- + def onStop(self): + """停止策略(必须由用户继承实现)""" + self.writeCtaLog(u'%s策略停止' %self.name) + self.putEvent() + + #---------------------------------------------------------------------- + def onTick(self, tick): + """收到行情TICK推送(必须由用户继承实现)""" + # 聚合为1分钟K线 + tickMinute = tick.datetime.minute + + if tickMinute != self.barMinute: + if self.bar: + self.onBar(self.bar) + + bar = CtaBarData() + bar.vtSymbol = tick.vtSymbol + bar.symbol = tick.symbol + bar.exchange = tick.exchange + + bar.open = tick.lastPrice + bar.high = tick.lastPrice + bar.low = tick.lastPrice + bar.close = tick.lastPrice + + bar.date = tick.date + bar.time = tick.time + bar.datetime = tick.datetime # K线的时间设为第一个Tick的时间 + + self.bar = bar # 这种写法为了减少一层访问,加快速度 + self.barMinute = tickMinute # 更新当前的分钟 + else: # 否则继续累加新的K线 + bar = self.bar # 写法同样为了加快速度 + + bar.high = max(bar.high, tick.lastPrice) + bar.low = min(bar.low, tick.lastPrice) + bar.close = tick.lastPrice + + #---------------------------------------------------------------------- + def onBar(self, bar): + """收到Bar推送(必须由用户继承实现)""" + # 如果当前是一个5分钟走完 + if bar.datetime.minute % 5 == 0: + # 如果已经有聚合5分钟K线 + if self.fiveBar: + # 将最新分钟的数据更新到目前5分钟线中 + fiveBar = self.fiveBar + fiveBar.high = max(fiveBar.high, bar.high) + fiveBar.low = min(fiveBar.low, bar.low) + fiveBar.close = bar.close + + # 推送5分钟线数据 + self.onFiveBar(fiveBar) + + # 清空5分钟线数据缓存 + self.fiveBar = None + else: + # 如果没有缓存则新建 + if not self.fiveBar: + fiveBar = CtaBarData() + + fiveBar.vtSymbol = bar.vtSymbol + fiveBar.symbol = bar.symbol + fiveBar.exchange = bar.exchange + + fiveBar.open = bar.open + fiveBar.high = bar.high + fiveBar.low = bar.low + fiveBar.close = bar.close + + fiveBar.date = bar.date + fiveBar.time = bar.time + fiveBar.datetime = bar.datetime + + self.fiveBar = fiveBar + else: + fiveBar = self.fiveBar + fiveBar.high = max(fiveBar.high, bar.high) + fiveBar.low = min(fiveBar.low, bar.low) + fiveBar.close = bar.close + + #---------------------------------------------------------------------- + def onFiveBar(self, bar): + """收到5分钟K线""" + # 撤销之前发出的尚未成交的委托(包括限价单和停止单) + for orderID in self.orderList: + self.cancelOrder(orderID) + self.orderList = [] + + # 保存K线数据 + self.closeArray[0:self.bufferSize-1] = self.closeArray[1:self.bufferSize] + self.highArray[0:self.bufferSize-1] = self.highArray[1:self.bufferSize] + self.lowArray[0:self.bufferSize-1] = self.lowArray[1:self.bufferSize] + + self.closeArray[-1] = bar.close + self.highArray[-1] = bar.high + self.lowArray[-1] = bar.low + + self.bufferCount += 1 + if self.bufferCount < self.bufferSize: + return + + # 计算指标数值 + self.atrValue = talib.ATR(self.highArray, + self.lowArray, + self.closeArray, + self.kkLength)[-1] + self.kkMid = talib.MA(self.closeArray, self.kkLength)[-1] + self.kkUp = self.kkMid + self.atrValue * self.kkDev + self.kkDown = self.kkMid - self.atrValue * self.kkdev + + # 判断是否要进行交易 + + # 当前无仓位,发送OCO开仓委托 + if self.pos == 0: + self.sendOcoOrder(self.kkUp, self.kkDown, self.fixedSize) + + # 持有多头仓位 + elif self.pos > 0 and bar.close < self.kkUp: + orderID = self.sell(bar.close*0.97, abs(self.pos)) + self.orderList.append(orderID) + + # 持有空头仓位 + elif self.pos < 0 and bar.close > self.kkDown: + orderID = self.buy(bar.close*1.03, abs(self.pos)) + self.orderList.append(orderID) + + # 发出状态更新事件 + self.putEvent() + + #---------------------------------------------------------------------- + def onOrder(self, order): + """收到委托变化推送(必须由用户继承实现)""" + pass + + #---------------------------------------------------------------------- + def onTrade(self, trade): + # 多头开仓成交后,撤消空头委托 + if self.pos > 0: + self.cancelOrder(self.shortOrderID) + if self.buyOrderID in self.orderList: + self.orderList.remove(self.buyOrderID) + if self.shortOrderID in self.orderList: + self.orderList.remove(self.shortOrderID) + # 反之同样 + elif self.pos < 0: + self.cancelOrder(self.buyOrderID) + if self.buyOrderID in self.orderList: + self.orderList.remove(self.buyOrderID) + if self.shortOrderID in self.orderList: + self.orderList.remove(self.shortOrderID) + + # 发出状态更新事件 + self.putEvent() + + #---------------------------------------------------------------------- + def sendOcoOrder(self, buyPrice, shortPrice, volume): + """ + 发送OCO委托 + + OCO(One Cancel Other)委托: + 1. 主要用于实现区间突破入场 + 2. 包含两个方向相反的停止单 + 3. 一个方向的停止单成交后会立即撤消另一个方向的 + """ + # 发送双边的停止单委托,并记录委托号 + self.buyOrderID = self.buy(buyPrice, volume, True) + self.shortOrderID = self.short(shortPrice, volume, True) + + # 将委托号记录到列表中 + self.orderList.append(self.buyOrderID) + self.orderList.append(self.shortOrderID) + + +if __name__ == '__main__': + # 提供直接双击回测的功能 + # 导入PyQt4的包是为了保证matplotlib使用PyQt4而不是PySide,防止初始化出错 + from ctaBacktesting import * + from PyQt4 import QtCore, QtGui + + # 创建回测引擎 + engine = BacktestingEngine() + + # 设置引擎的回测模式为K线 + engine.setBacktestingMode(engine.BAR_MODE) + + # 设置回测用的数据起始日期 + engine.setStartDate('20120101') + + # 设置产品相关参数 + engine.setSlippage(0.2) # 股指1跳 + engine.setRate(0.3/10000) # 万0.3 + engine.setSize(300) # 股指合约大小 + + # 设置使用的历史数据库 + engine.setDatabase(MINUTE_DB_NAME, 'IF0000') + + # 在引擎中创建策略对象 + d = {} + engine.initStrategy(KkStrategy, d) + + # 开始跑回测 + engine.runBacktesting() + + # 显示回测结果 + engine.showBacktestingResult() \ No newline at end of file diff --git a/vn.trader/lhangGateway/vnlhang.py b/vn.trader/lhangGateway/vnlhang.py index 742a9184..70245d69 100644 --- a/vn.trader/lhangGateway/vnlhang.py +++ b/vn.trader/lhangGateway/vnlhang.py @@ -70,7 +70,9 @@ class LhangApi(object): def exit(self): """退出""" self.active = False - self.reqThread.join() + + if self.reqThread.isAlive(): + self.reqThread.join() #---------------------------------------------------------------------- def processRequest(self, req):