From e5de7cba31a44e64dcfd0ac406c1cb5f2305f38a Mon Sep 17 00:00:00 2001 From: msincenselee Date: Mon, 28 Sep 2015 15:34:32 +0800 Subject: [PATCH] mongodb=>mysqldb --- vn.strategy/strategydemo/strategyEngine.py | 256 +++++++++++++++------ 1 file changed, 187 insertions(+), 69 deletions(-) diff --git a/vn.strategy/strategydemo/strategyEngine.py b/vn.strategy/strategydemo/strategyEngine.py index 2a7bcf43..f304d80d 100644 --- a/vn.strategy/strategydemo/strategyEngine.py +++ b/vn.strategy/strategydemo/strategyEngine.py @@ -12,7 +12,7 @@ print u'demoStrategy.py import pymongo.errors.* success' from eventEngine import * print u'demoStrategy.py import eventEngine.* success' - +import MySQLdb # 常量定义 @@ -175,22 +175,32 @@ class StrategyEngine(object): # value为该合约相关的停止单列表 self.__dictStopOrder = {} - # MongoDB数据库相关 - self.__mongoConnected = False - self.__mongoConnection = None - self.__mongoTickDB = None + # MongoDB/Mysql数据库相关 + #self.__mongoConnected = False + + self.__mysqlConnected = False + + #self.__mongoConnection = None + self.__mysqlConnection = None + + #self.__mongoTickDB = None # 调用函数 - self.__connectMongo() + #self.__connectMongo() + self.__connectMysql() self.__registerEvent() #---------------------------------------------------------------------- def createStrategy(self, strategyName, strategySymbol, strategyClass, strategySetting): """创建策略""" strategy = strategyClass(strategyName, strategySymbol, self) + + self.writeLog(u"创建策略:{0}".format(strategyName)) + self.dictStrategy[strategyName] = strategy + strategy.loadSetting(strategySetting) - + # 订阅合约行情,注意这里因为是CTP,所以ExchangeID可以忽略 self.mainEngine.subscribe(strategySymbol, None) @@ -198,88 +208,193 @@ class StrategyEngine(object): self.registerStrategy(strategySymbol, strategy) #---------------------------------------------------------------------- - def __connectMongo(self): - """连接MongoDB数据库""" - try: - self.__mongoConnection = Connection() - self.__mongoConnected = True - self.__mongoTickDB = self.__mongoConnection['TickDB'] - self.writeLog(u'策略引擎连接MongoDB成功') - except ConnectionFailure: - self.writeLog(u'策略引擎连接MongoDB失败') + #def __connectMongo(self): + # """连接MongoDB数据库""" + # try: + # self.__mongoConnection = Connection() + # self.__mongoConnected = True + # self.__mongoTickDB = self.__mongoConnection['TickDB'] + # self.writeLog(u'策略引擎连接MongoDB成功') + # except ConnectionFailure: + # self.writeLog(u'策略引擎连接MongoDB失败') + + #-------------------------------------------#--------------------------- + # def __recordTickToMongo(self, data): + # """将Tick数据插入到MongoDB中""" + # if self.__mongoConnected: + # symbol = data['InstrumentID'] + # data['date'] = self.today + # self.__mongoTickDB[symbol].insert(data) + # # + # #---------------------------------------------------------------------- + # def loadTickFromMongo(self, symbol, startDate, endDate=None): + # """从MongoDB中读取Tick数据""" + # if self.__mongoConnected: + # collection = self.__mongoTickDB[symbol] + # + # # 如果输入了读取TICK的最后日期 + # if endDate: + # cx = collection.find({'date':{'$gte':startDate, '$lte':endDate}}) + # else: + # cx = collection.find({'date':{'$gte':startDate}}) + # return cx + # else: + # return None #---------------------------------------------------------------------- - def __recordTick(self, data): - """将Tick数据插入到MongoDB中""" - if self.__mongoConnected: - symbol = data['InstrumentID'] - data['date'] = self.today - self.__mongoTickDB[symbol].insert(data) - + def __connectMysql(self): + """连接MysqlDB""" + try: + self.__mysqlConnection = MySQLdb.connect(host='vnpy.cloudapp.net', user='stockcn', passwd='7uhb*IJN', db='stockcn', port=3306) + self.__mysqlConnected = True + self.writeLog(u'策略引擎连接MysqlDB成功') + except ConnectionFailure: + self.writeLog(u'策略引擎连接MysqlDB失败') + #---------------------------------------------------------------------- + def __recordTickToMysql(self, data): + """将Tick数据插入到MysqlDB中""" + #if self.__mongoConnected: + # symbol = data['InstrumentID'] + # data['date'] = self.today + # self.__mongoTickDB[symbol].insert(data) + pass; #---------------------------------------------------------------------- - def loadTick(self, symbol, startDate, endDate=None): - """从MongoDB中读取Tick数据""" - if self.__mongoConnected: - collection = self.__mongoTickDB[symbol] - - # 如果输入了读取TICK的最后日期 + def loadTickFromMysql(self, symbol, startDate, endDate=None): + """从MysqlDB中读取Tick数据""" + if self.__mysqlConnected: + + #获取指针 + cur = self.__mysqlConnection.cursor(MySQLdb.cursors.DictCursor) + if endDate: - cx = collection.find({'date':{'$gte':startDate, '$lte':endDate}}) + #指定开始与结束日期 + sqlstring = ' select \'{0}\' as InstrumentID, str_to_date(concat(ndate,\' \', ntime),' \ + '\'%Y-%m-%d %H:%i:%s\') as UpdateTime,price as LastPrice,vol as Volume,' \ + 'position_vol as OpenInterest,bid1_price as BidPrice1,bid1_vol as BidVolume1, ' \ + 'sell1_price as AskPrice1, sell1_vol as AskVolume1 from TB_{0}MI ' \ + 'where ndate between cast(\'{1}\' as date) and cast(\'{2}\' as date)'.format(symbol, startDate, endDate) + + elif startDate: + #指定开始日期 + sqlstring = ' select \'{0}\' as InstrumentID,str_to_date(concat(ndate,\' \', ntime),' \ + '\'%Y-%m-%d %H:%i:%s\') as UpdateTime,price as LastPrice,vol as Volume,' \ + 'position_vol as OpenInterest,bid1_price as BidPrice1,bid1_vol as BidVolume1, ' \ + 'sell1_price as AskPrice1, sell1_vol as AskVolume1 from TB__{0}MI ' \ + 'where ndate > cast(\'{1}\' as date)'.format( symbol, startDate) + else: - cx = collection.find({'date':{'$gte':startDate}}) + #没有指定,所有日期数据 + sqlstring =' select \'{0}\' as InstrumentID,str_to_date(concat(ndate,\' \', ntime),' \ + '\'%Y-%m-%d %H:%i:%s\') as UpdateTime,price as LastPrice,vol as Volume,' \ + 'position_vol as OpenInterest,bid1_price as BidPrice1,bid1_vol as BidVolume1, ' \ + 'sell1_price as AskPrice1, sell1_vol as AskVolume1 from TB__{0}MI '.format(symbol) + + self.writeLog(sqlstring) + + count = cur.execute(sqlstring) + + cx = cur.fetchall() + + self.writeLog(u'历史TICK数据载入完成,共{0}条'.format(count)) + return cx else: - return None + return None + + #---------------------------------------------------------------------- + def getMysqlDeltaDate(self,symbol, startDate, decreaseDays): + try: + if self.__mysqlConnected: + + #获取指针 + cur = self.__mysqlConnection.cursor() + + sqlstring='select distinct ndate from TB_{0}MI where ndate < ' \ + 'cast(\'{1}\' as date) order by ndate desc limit {2},1'.format(symbol, startDate, decreaseDays-1) + + self.writeLog(sqlstring) + + count = cur.execute(sqlstring) + + if count > 0: + + result = cur.fetchone() + + return result[0] + + else: + self.writeLog(u'MysqlDB没有查询结果,请检查日期') + + else: + self.writeLog(u'MysqlDB未连接,请检查') + + except MySQLdb.Error, e: + self.writeLog(u'MysqlDB载入数据失败,请检查.Error {0}: {1}'.format(e.arg[0],e.arg[1])) + + td = timedelta(days=3) + + return startDate-td; #---------------------------------------------------------------------- def updateMarketData(self, event): """行情更新""" data = event.dict_['data'] + #InstrumentID, UpdateTime, LastPrice, Volume, OpenInterest, BidPrice1, BidVolume1, AskPrice1, AskVolume1 = data + symbol = data['InstrumentID'] + #symbol = InstrumentID # 检查是否存在交易该合约的策略 if symbol in self.__dictSymbolStrategy: # 创建TICK数据对象并更新数据 tick = Tick(symbol) - - tick.openPrice = data['OpenPrice'] - tick.highPrice = data['HighestPrice'] - tick.lowPrice = data['LowestPrice'] - tick.lastPrice = data['LastPrice'] - + + #tick.openPrice = data['OpenPrice'] + #tick.highPrice = data['HighestPrice'] + #tick.lowPrice = data['LowestPrice'] + tick.lastPrice = float(data['LastPrice']) + #tick.lastPrice = LastPrice + tick.volume = data['Volume'] tick.openInterest = data['OpenInterest'] - - tick.upperLimit = data['UpperLimitPrice'] - tick.lowerLimit = data['LowerLimitPrice'] - + #tick.volume = Volume + #tick.openInterest = OpenInterest + + #tick.upperLimit = data['UpperLimitPrice'] + #tick.lowerLimit = data['LowerLimitPrice'] + tick.time = data['UpdateTime'] - tick.ms = data['UpdateMillisec'] - - tick.bidPrice1 = data['BidPrice1'] - tick.bidPrice2 = data['BidPrice2'] - tick.bidPrice3 = data['BidPrice3'] - tick.bidPrice4 = data['BidPrice4'] - tick.bidPrice5 = data['BidPrice5'] - - tick.askPrice1 = data['AskPrice1'] - tick.askPrice2 = data['AskPrice2'] - tick.askPrice3 = data['AskPrice3'] - tick.askPrice4 = data['AskPrice4'] - tick.askPrice5 = data['AskPrice5'] - + #tick.time = UpdateTime + #tick.ms = data['UpdateMillisec'] + + tick.bidPrice1 = float(data['BidPrice1']) + #tick.bidPrice2 = data['BidPrice2'] + #tick.bidPrice3 = data['BidPrice3'] + #tick.bidPrice4 = data['BidPrice4'] + #tick.bidPrice5 = data['BidPrice5'] + #tick.bidPrice1 = BidPrice1 + + tick.askPrice1 = float(data['AskPrice1']) + #tick.askPrice2 = data['AskPrice2'] + #tick.askPrice3 = data['AskPrice3'] + #tick.askPrice4 = data['AskPrice4'] + #tick.askPrice5 = data['AskPrice5'] + #tick.askPrice1 = AskPrice1 + tick.bidVolume1 = data['BidVolume1'] - tick.bidVolume2 = data['BidVolume2'] - tick.bidVolume3 = data['BidVolume3'] - tick.bidVolume4 = data['BidVolume4'] - tick.bidVolume5 = data['BidVolume5'] - + #tick.bidVolume2 = data['BidVolume2'] + #tick.bidVolume3 = data['BidVolume3'] + #tick.bidVolume4 = data['BidVolume4'] + #tick.bidVolume5 = data['BidVolume5'] + #tick.bidVolume1 = BidVolume1 + tick.askVolume1 = data['AskVolume1'] - tick.askVolume2 = data['AskVolume2'] - tick.askVolume3 = data['AskVolume3'] - tick.askVolume4 = data['AskVolume4'] - tick.askVolume5 = data['AskVolume5'] - + #tick.askVolume2 = data['AskVolume2'] + #tick.askVolume3 = data['AskVolume3'] + #tick.askVolume4 = data['AskVolume4'] + #tick.askVolume5 = data['AskVolume5'] + #tick.askVolume1 = AskVolume1 + # 首先检查停止单是否需要发出 self.__processStopOrder(tick) @@ -287,10 +402,10 @@ class StrategyEngine(object): for strategy in self.__dictSymbolStrategy[symbol]: strategy.onTick(tick) - # 将数据插入MongoDB数据库,实盘建议另开程序记录TICK数据 + # 将数据插入MongoDB/Mysql数据库,实盘建议另开程序记录TICK数据 if not self.backtesting: - self.__recordTick(data) - + #self.__recordTickToMongo(data) + self.__recordTickToMysql(data) #---------------------------------------------------------------------- def __processStopOrder(self, tick): """处理停止单""" @@ -504,12 +619,15 @@ class StrategyEngine(object): #---------------------------------------------------------------------- def startAll(self): """启动所有策略""" + print( u'启动所有策略') for strategy in self.dictStrategy.values(): + strategy.start() #---------------------------------------------------------------------- def stopAll(self): """停止所有策略""" + print(u'停止所有策略') for strategy in self.dictStrategy.values(): strategy.stop()