mongodb=>mysqldb

This commit is contained in:
msincenselee 2015-09-28 15:34:32 +08:00
parent 0d1c787389
commit e5de7cba31

View File

@ -12,7 +12,7 @@ print u'demoStrategy.py import pymongo.errors.* success'
from eventEngine import *
print u'demoStrategy.py import eventEngine.* success'
import MySQLdb
# 常量定义
@ -175,22 +175,32 @@ class StrategyEngine(object):
# value为该合约相关的停止单列表
self.__dictStopOrder = {}
# MongoDB数据库相关
self.__mongoConnected = False
self.__mongoConnection = None
self.__mongoTickDB = None
# MongoDB/Mysql数据库相关
#self.__mongoConnected = False
self.__mysqlConnected = False
#self.__mongoConnection = None
self.__mysqlConnection = None
#self.__mongoTickDB = None
# 调用函数
self.__connectMongo()
#self.__connectMongo()
self.__connectMysql()
self.__registerEvent()
#----------------------------------------------------------------------
def createStrategy(self, strategyName, strategySymbol, strategyClass, strategySetting):
"""创建策略"""
strategy = strategyClass(strategyName, strategySymbol, self)
self.writeLog(u"创建策略:{0}".format(strategyName))
self.dictStrategy[strategyName] = strategy
strategy.loadSetting(strategySetting)
# 订阅合约行情注意这里因为是CTP所以ExchangeID可以忽略
self.mainEngine.subscribe(strategySymbol, None)
@ -198,88 +208,193 @@ class StrategyEngine(object):
self.registerStrategy(strategySymbol, strategy)
#----------------------------------------------------------------------
def __connectMongo(self):
"""连接MongoDB数据库"""
try:
self.__mongoConnection = Connection()
self.__mongoConnected = True
self.__mongoTickDB = self.__mongoConnection['TickDB']
self.writeLog(u'策略引擎连接MongoDB成功')
except ConnectionFailure:
self.writeLog(u'策略引擎连接MongoDB失败')
#def __connectMongo(self):
# """连接MongoDB数据库"""
# try:
# self.__mongoConnection = Connection()
# self.__mongoConnected = True
# self.__mongoTickDB = self.__mongoConnection['TickDB']
# self.writeLog(u'策略引擎连接MongoDB成功')
# except ConnectionFailure:
# self.writeLog(u'策略引擎连接MongoDB失败')
#-------------------------------------------#---------------------------
# def __recordTickToMongo(self, data):
# """将Tick数据插入到MongoDB中"""
# if self.__mongoConnected:
# symbol = data['InstrumentID']
# data['date'] = self.today
# self.__mongoTickDB[symbol].insert(data)
# #
# #----------------------------------------------------------------------
# def loadTickFromMongo(self, symbol, startDate, endDate=None):
# """从MongoDB中读取Tick数据"""
# if self.__mongoConnected:
# collection = self.__mongoTickDB[symbol]
#
# # 如果输入了读取TICK的最后日期
# if endDate:
# cx = collection.find({'date':{'$gte':startDate, '$lte':endDate}})
# else:
# cx = collection.find({'date':{'$gte':startDate}})
# return cx
# else:
# return None
#----------------------------------------------------------------------
def __recordTick(self, data):
"""将Tick数据插入到MongoDB中"""
if self.__mongoConnected:
symbol = data['InstrumentID']
data['date'] = self.today
self.__mongoTickDB[symbol].insert(data)
def __connectMysql(self):
"""连接MysqlDB"""
try:
self.__mysqlConnection = MySQLdb.connect(host='vnpy.cloudapp.net', user='stockcn', passwd='7uhb*IJN', db='stockcn', port=3306)
self.__mysqlConnected = True
self.writeLog(u'策略引擎连接MysqlDB成功')
except ConnectionFailure:
self.writeLog(u'策略引擎连接MysqlDB失败')
#----------------------------------------------------------------------
def __recordTickToMysql(self, data):
"""将Tick数据插入到MysqlDB中"""
#if self.__mongoConnected:
# symbol = data['InstrumentID']
# data['date'] = self.today
# self.__mongoTickDB[symbol].insert(data)
pass;
#----------------------------------------------------------------------
def loadTick(self, symbol, startDate, endDate=None):
"""从MongoDB中读取Tick数据"""
if self.__mongoConnected:
collection = self.__mongoTickDB[symbol]
# 如果输入了读取TICK的最后日期
def loadTickFromMysql(self, symbol, startDate, endDate=None):
"""从MysqlDB中读取Tick数据"""
if self.__mysqlConnected:
#获取指针
cur = self.__mysqlConnection.cursor(MySQLdb.cursors.DictCursor)
if endDate:
cx = collection.find({'date':{'$gte':startDate, '$lte':endDate}})
#指定开始与结束日期
sqlstring = ' select \'{0}\' as InstrumentID, str_to_date(concat(ndate,\' \', ntime),' \
'\'%Y-%m-%d %H:%i:%s\') as UpdateTime,price as LastPrice,vol as Volume,' \
'position_vol as OpenInterest,bid1_price as BidPrice1,bid1_vol as BidVolume1, ' \
'sell1_price as AskPrice1, sell1_vol as AskVolume1 from TB_{0}MI ' \
'where ndate between cast(\'{1}\' as date) and cast(\'{2}\' as date)'.format(symbol, startDate, endDate)
elif startDate:
#指定开始日期
sqlstring = ' select \'{0}\' as InstrumentID,str_to_date(concat(ndate,\' \', ntime),' \
'\'%Y-%m-%d %H:%i:%s\') as UpdateTime,price as LastPrice,vol as Volume,' \
'position_vol as OpenInterest,bid1_price as BidPrice1,bid1_vol as BidVolume1, ' \
'sell1_price as AskPrice1, sell1_vol as AskVolume1 from TB__{0}MI ' \
'where ndate > cast(\'{1}\' as date)'.format( symbol, startDate)
else:
cx = collection.find({'date':{'$gte':startDate}})
#没有指定,所有日期数据
sqlstring =' select \'{0}\' as InstrumentID,str_to_date(concat(ndate,\' \', ntime),' \
'\'%Y-%m-%d %H:%i:%s\') as UpdateTime,price as LastPrice,vol as Volume,' \
'position_vol as OpenInterest,bid1_price as BidPrice1,bid1_vol as BidVolume1, ' \
'sell1_price as AskPrice1, sell1_vol as AskVolume1 from TB__{0}MI '.format(symbol)
self.writeLog(sqlstring)
count = cur.execute(sqlstring)
cx = cur.fetchall()
self.writeLog(u'历史TICK数据载入完成{0}'.format(count))
return cx
else:
return None
return None
#----------------------------------------------------------------------
def getMysqlDeltaDate(self,symbol, startDate, decreaseDays):
try:
if self.__mysqlConnected:
#获取指针
cur = self.__mysqlConnection.cursor()
sqlstring='select distinct ndate from TB_{0}MI where ndate < ' \
'cast(\'{1}\' as date) order by ndate desc limit {2},1'.format(symbol, startDate, decreaseDays-1)
self.writeLog(sqlstring)
count = cur.execute(sqlstring)
if count > 0:
result = cur.fetchone()
return result[0]
else:
self.writeLog(u'MysqlDB没有查询结果请检查日期')
else:
self.writeLog(u'MysqlDB未连接请检查')
except MySQLdb.Error, e:
self.writeLog(u'MysqlDB载入数据失败请检查.Error {0}: {1}'.format(e.arg[0],e.arg[1]))
td = timedelta(days=3)
return startDate-td;
#----------------------------------------------------------------------
def updateMarketData(self, event):
"""行情更新"""
data = event.dict_['data']
#InstrumentID, UpdateTime, LastPrice, Volume, OpenInterest, BidPrice1, BidVolume1, AskPrice1, AskVolume1 = data
symbol = data['InstrumentID']
#symbol = InstrumentID
# 检查是否存在交易该合约的策略
if symbol in self.__dictSymbolStrategy:
# 创建TICK数据对象并更新数据
tick = Tick(symbol)
tick.openPrice = data['OpenPrice']
tick.highPrice = data['HighestPrice']
tick.lowPrice = data['LowestPrice']
tick.lastPrice = data['LastPrice']
#tick.openPrice = data['OpenPrice']
#tick.highPrice = data['HighestPrice']
#tick.lowPrice = data['LowestPrice']
tick.lastPrice = float(data['LastPrice'])
#tick.lastPrice = LastPrice
tick.volume = data['Volume']
tick.openInterest = data['OpenInterest']
tick.upperLimit = data['UpperLimitPrice']
tick.lowerLimit = data['LowerLimitPrice']
#tick.volume = Volume
#tick.openInterest = OpenInterest
#tick.upperLimit = data['UpperLimitPrice']
#tick.lowerLimit = data['LowerLimitPrice']
tick.time = data['UpdateTime']
tick.ms = data['UpdateMillisec']
tick.bidPrice1 = data['BidPrice1']
tick.bidPrice2 = data['BidPrice2']
tick.bidPrice3 = data['BidPrice3']
tick.bidPrice4 = data['BidPrice4']
tick.bidPrice5 = data['BidPrice5']
tick.askPrice1 = data['AskPrice1']
tick.askPrice2 = data['AskPrice2']
tick.askPrice3 = data['AskPrice3']
tick.askPrice4 = data['AskPrice4']
tick.askPrice5 = data['AskPrice5']
#tick.time = UpdateTime
#tick.ms = data['UpdateMillisec']
tick.bidPrice1 = float(data['BidPrice1'])
#tick.bidPrice2 = data['BidPrice2']
#tick.bidPrice3 = data['BidPrice3']
#tick.bidPrice4 = data['BidPrice4']
#tick.bidPrice5 = data['BidPrice5']
#tick.bidPrice1 = BidPrice1
tick.askPrice1 = float(data['AskPrice1'])
#tick.askPrice2 = data['AskPrice2']
#tick.askPrice3 = data['AskPrice3']
#tick.askPrice4 = data['AskPrice4']
#tick.askPrice5 = data['AskPrice5']
#tick.askPrice1 = AskPrice1
tick.bidVolume1 = data['BidVolume1']
tick.bidVolume2 = data['BidVolume2']
tick.bidVolume3 = data['BidVolume3']
tick.bidVolume4 = data['BidVolume4']
tick.bidVolume5 = data['BidVolume5']
#tick.bidVolume2 = data['BidVolume2']
#tick.bidVolume3 = data['BidVolume3']
#tick.bidVolume4 = data['BidVolume4']
#tick.bidVolume5 = data['BidVolume5']
#tick.bidVolume1 = BidVolume1
tick.askVolume1 = data['AskVolume1']
tick.askVolume2 = data['AskVolume2']
tick.askVolume3 = data['AskVolume3']
tick.askVolume4 = data['AskVolume4']
tick.askVolume5 = data['AskVolume5']
#tick.askVolume2 = data['AskVolume2']
#tick.askVolume3 = data['AskVolume3']
#tick.askVolume4 = data['AskVolume4']
#tick.askVolume5 = data['AskVolume5']
#tick.askVolume1 = AskVolume1
# 首先检查停止单是否需要发出
self.__processStopOrder(tick)
@ -287,10 +402,10 @@ class StrategyEngine(object):
for strategy in self.__dictSymbolStrategy[symbol]:
strategy.onTick(tick)
# 将数据插入MongoDB数据库实盘建议另开程序记录TICK数据
# 将数据插入MongoDB/Mysql数据库实盘建议另开程序记录TICK数据
if not self.backtesting:
self.__recordTick(data)
#self.__recordTickToMongo(data)
self.__recordTickToMysql(data)
#----------------------------------------------------------------------
def __processStopOrder(self, tick):
"""处理停止单"""
@ -504,12 +619,15 @@ class StrategyEngine(object):
#----------------------------------------------------------------------
def startAll(self):
"""启动所有策略"""
print( u'启动所有策略')
for strategy in self.dictStrategy.values():
strategy.start()
#----------------------------------------------------------------------
def stopAll(self):
"""停止所有策略"""
print(u'停止所有策略')
for strategy in self.dictStrategy.values():
strategy.stop()