vnpy/trader/app/ctaStrategy/utilArbTickLoader.py

356 lines
13 KiB
Python
Raw Normal View History

# encoding: UTF-8
import os
import cPickle
import csv
import logging
import pandas
import copy
from datetime import datetime, timedelta
from ctaBase import *
class UtilArbTickLoader(object):
"""一个套利tick的数据加载工具类"""
# ----------------------------------------------------------------------
def __init__(self, ticksFolder, symbol):
# tick 存放的文件系统路径
if not ticksFolder:
self.ticksFolder = u'z:\\ticks'
else:
self.ticksFolder = ticksFolder
self.symbol = symbol
def writeCtaLog(self, content):
"""记录日志"""
# log = str(self.dt) + ' ' + content
# self.logList.append(log)
# 写入本地log日志
logging.info(content)
def writeCtaError(self, content):
"""记录异常"""
self.output(content)
self.writeCtaLog(content)
def output(self, content):
"""输出内容"""
print str(datetime.now()) + "\t" + content
# ----------------------------------------------------------------------
def loadArbTicks(self, mainPath, dtDate, leg1Symbol, leg2Symbol):
self.writeCtaLog(u'加载日期:{0}\{1}的价差tick'.format(mainPath, dtDate))
cachefilename = u'{0}_{1}_{2}_{3}_{4}'.\
format(self.symbol, leg1Symbol, leg2Symbol, mainPath, dtDate.strftime('%Y%m%d'))
arbTicks = self.__loadArbTicksFromLocalCache(cachefilename)
dt = None
if len(arbTicks) < 1:
leg1File = self.ticksFolder + u'\\{0}\\{1}\\{2}\\{3}\\{4}.txt' \
.format(mainPath, dtDate.strftime('%Y%m'), self.symbol, dtDate.strftime('%m%d'), leg1Symbol)
if not os.path.isfile(leg1File):
self.writeCtaLog(u'{0}文件不存在'.format(leg1File))
return []
leg2File = self.ticksFolder + u'\\{0}\\{1}\\{2}\\{3}\\{4}.txt' \
.format(mainPath, dtDate.strftime('%Y%m'), self.symbol, dtDate.strftime('%m%d'), leg2Symbol)
if not os.path.isfile(leg2File):
self.writeCtaLog(u'{0}文件不存在'.format(leg2File))
return []
# 先读取leg2的数据到目录以日期时间为key
leg2Ticks = {}
leg2CsvReadFile = file(leg2File, 'rb')
#reader = csv.DictReader((line.replace('\0',' ') for line in leg2CsvReadFile), delimiter=",")
reader = csv.DictReader(leg2CsvReadFile, delimiter=",")
self.writeCtaLog(u'加载{0}'.format(leg2File))
for row in reader:
tick = CtaTickData()
tick.vtSymbol = self.symbol
tick.symbol = self.symbol
tick.date = dtDate.strftime('%Y%m%d')
tick.tradingDay = tick.date
tick.time = row['Time']
try:
tick.datetime = datetime.strptime(tick.date + ' ' + tick.time, '%Y%m%d %H:%M:%S.%f')
except Exception as ex:
self.writeCtaError(u'日期转换错误:{0},{1}:{2}'.format(tick.date + ' ' + tick.time, Exception, ex))
continue
# 修正毫秒
if tick.datetime.replace(microsecond = 0) == dt:
# 与上一个tick的时间去除毫秒后相同,修改为500毫秒
tick.datetime=tick.datetime.replace(microsecond = 500)
tick.time = tick.datetime.strftime('%H:%M:%S.%f')
else:
tick.datetime = tick.datetime.replace(microsecond=0)
tick.time = tick.datetime.strftime('%H:%M:%S.%f')
dt = tick.datetime
tick.lastPrice = float(row['LastPrice'])
tick.volume = int(float(row['LVolume']))
tick.bidPrice1 = float(row['BidPrice']) # 叫买价(价格低)
tick.bidVolume1 = int(float(row['BidVolume']))
tick.askPrice1 = float(row['AskPrice']) # 叫卖价(价格高)
tick.askVolume1 = int(float(row['AskVolume']))
# 排除涨停/跌停的数据
if (tick.bidPrice1 == float('1.79769E308') and tick.bidVolume1 == 0) \
or (tick.askPrice1 == float('1.79769E308') and tick.askVolume1 == 0):
continue
dtStr = tick.date + ' ' + tick.time
if dtStr in leg2Ticks:
self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr))
else:
leg2Ticks[dtStr] = tick
leg1CsvReadFile = file(leg1File, 'rb')
#reader = csv.DictReader((line.replace('\0',' ') for line in leg1CsvReadFile), delimiter=",")
reader = csv.DictReader(leg1CsvReadFile, delimiter=",")
self.writeCtaLog(u'加载{0}'.format(leg1File))
dt = None
for row in reader:
arbTick = CtaTickData()
arbTick.date = dtDate.strftime('%Y%m%d')
arbTick.time = row['Time']
try:
arbTick.datetime = datetime.strptime(arbTick.date + ' ' + arbTick.time, '%Y%m%d %H:%M:%S.%f')
except Exception as ex:
self.writeCtaError(u'日期转换错误:{0},{1}:{2}'.format(arbTick.date + ' ' + arbTick.time, Exception, ex))
continue
# 修正毫秒
if arbTick.datetime.replace(microsecond=0) == dt:
# 与上一个tick的时间去除毫秒后相同,修改为500毫秒
arbTick.datetime = arbTick.datetime.replace(microsecond=500)
arbTick.time = arbTick.datetime.strftime('%H:%M:%S.%f')
else:
arbTick.datetime = arbTick.datetime.replace(microsecond=0)
arbTick.time = arbTick.datetime.strftime('%H:%M:%S.%f')
dt = arbTick.datetime
dtStr = ' '.join([arbTick.date, arbTick.time])
if dtStr in leg2Ticks:
leg2Tick = leg2Ticks[dtStr]
arbTick.vtSymbol = self.symbol
arbTick.symbol = self.symbol
arbTick.lastPrice = EMPTY_FLOAT
arbTick.volume = EMPTY_INT
leg1AskPrice1 = float(row['AskPrice'])
leg1AskVolume1 = int(float(row['AskVolume']))
leg1BidPrice1 = float(row['BidPrice'])
leg1BidVolume1 = int(float(row['BidVolume']))
# 排除涨停/跌停的数据
if ((leg1AskPrice1 == float('1.79769E308') or leg1AskPrice1 == 0) and leg1AskVolume1 == 0) \
or ((leg1BidPrice1 == float('1.79769E308') or leg1BidPrice1 == 0) and leg1BidVolume1 == 0):
continue
# 叫卖价差=leg1.askPrice1 - leg2.bidPrice1volume为两者最小
arbTick.askPrice1 = leg1AskPrice1 - leg2Tick.bidPrice1
arbTick.askVolume1 = min(leg1AskVolume1, leg2Tick.bidVolume1)
# 叫买价差=leg1.bidPrice1 - leg2.askPrice1volume为两者最小
arbTick.bidPrice1 = leg1BidPrice1 - leg2Tick.askPrice1
arbTick.bidVolume1 = min(leg1BidVolume1, leg2Tick.askVolume1)
arbTicks.append(arbTick)
del leg2Ticks[dtStr]
# 保存到历史目录
if len(arbTicks) > 0:
self.__saveArbTicksToLocalCache(cachefilename, arbTicks)
return arbTicks
def __loadArbTicksFromLocalCache(self, filename):
"""从本地缓存中,加载数据"""
# 运行路径下cache子目录
cacheFolder = os.getcwd() + '/cache'
# cache文件
cacheFile = u'{0}/{1}.pickle'. \
format(cacheFolder, filename)
if not os.path.isfile(cacheFile):
return []
else:
# 从cache文件加载
cache = open(cacheFile, mode='r')
l = cPickle.load(cache)
cache.close()
return l
def __saveArbTicksToLocalCache(self, filename, arbticks):
"""保存价差tick到本地缓存目录"""
# 运行路径下cache子目录
cacheFolder = os.getcwd() + '/cache'
# 创建cache子目录
if not os.path.isdir(cacheFolder):
os.mkdir(cacheFolder)
# cache 文件名
cacheFile = u'{0}/{1}.pickle'. \
format(cacheFolder, filename)
# 重复存在 返回
if os.path.isfile(cacheFile):
return False
else:
# 写入cache文件
cache = open(cacheFile, mode='w')
cPickle.dump(arbticks, cache)
cache.close()
return True
def convert_to_dataframe(self, ticklist):
"""转换为dataframe格式"""
variables = ['date', 'time', 'askPrice1', 'askVolume1', 'bidPrice1', 'bidVolume1']
dataframe = pandas.DataFrame([[getattr(i, j) for j in variables] for i in ticklist], columns=variables)
return dataframe
def __saveArbTicksToLocalCsv(self, filename, df):
"""保存为本地csv文件"""
# 运行路径下cache子目录
cacheFolder = os.getcwd() + '/cache'
# 创建cache子目录
if not os.path.isdir(cacheFolder):
os.mkdir(cacheFolder)
# cache 文件名
cacheFile = u'{0}/{1}.csv'. \
format(cacheFolder, filename)
# 重复存在 返回
if os.path.isfile(cacheFile):
return False
df.to_csv(cacheFile, index=False)
def __loadArbTicksFromLocalCsv(self ,filename):
"""从本地缓存csv中加载数据"""
# 运行路径下cache子目录
cacheFolder = os.getcwd() + '/cache'
# cache文件
cacheFile = u'{0}/{1}.csv'. \
format(cacheFolder, filename)
if not os.path.isfile(cacheFile):
return None
else:
# 从cache文件加载
df = pandas.read_csv(cacheFile)
return df
def loadDataFrame(self, mainPath, dtDate, leg1Symbol, leg2Symbol):
self.writeCtaLog(u'加载日期:{0}\{1}的价差tick dataframe'.format(mainPath, dtDate))
cachefilename = u'{0}_{1}_{2}_{3}_{4}'. \
format(self.symbol, leg1Symbol, leg2Symbol, mainPath, dtDate.strftime('%Y%m%d'))
df = self.__loadArbTicksFromLocalCsv(cachefilename)
if df is None:
ticks = self.loadArbTicks(mainPath, dtDate, leg1Symbol, leg2Symbol)
if len(ticks) >0:
df = self.convert_to_dataframe(ticks)
self.__saveArbTicksToLocalCsv(cachefilename,df)
return df
def loadDataFrame2(self, mainPath, beginDate, endDate, leg1Symbol, leg2Symbol):
dayIntervals = (endDate- beginDate).days
if dayIntervals < 1:
self.writeCtaLog(u'时间不足')
return None
df = None
cachefilename = u'{0}_{1}_{2}_{3}_{4}'. \
format(self.symbol, leg1Symbol, leg2Symbol, beginDate.strftime('%Y%m%d'), endDate.strftime('%Y%m%d'))
df = self.__loadArbTicksFromLocalCsv(cachefilename)
if df is not None:
return df
for i in range(0, dayIntervals):
getDate = beginDate + timedelta(days=i)
self.output(u'取数据日期:{0}'.format(getDate))
df1,df2 =None,None
# 白天数据
df1 = self.loadDataFrame(mainPath, getDate, leg1Symbol, leg2Symbol)
if df1 is not None and df is None:
df = copy.copy(df1)
self.output(u'数据{0}'.format(len(df)))
elif df1 is not None and df is not None:
df = pandas.concat([df,df1], ignore_index=True)
self.output(u'数据增加{0}行,共{1}'.format(len(df1),len(df)))
# 夜盘数据
df2 = self.loadDataFrame(mainPath + '_night', getDate, leg1Symbol, leg2Symbol)
if df2 is not None and df is None:
df = copy.copy(df2)
self.output(u'数据{0}'.format(len(df)))
elif df2 is not None and df is not None:
df = pandas.concat([df,df2], ignore_index=True)
self.output(u'数据增加{0}行,共{1}'.format(len(df2), len(df)))
self.__saveArbTicksToLocalCsv(cachefilename, df)
return df
if __name__ == '__main__':
loader = UtilArbTickLoader(ticksFolder='z://ticks', symbol='RB')
"""只取tick对象队列"""
#ticks = loader.loadArbTicks(mainPath='SHFE',dtDate=datetime.strptime('20140801', '%Y%m%d'),
# leg1Symbol='RB1501', leg2Symbol='RB1505')
#df = loader.convert_to_dataframe(ticks)
"""取单日dataframe"""
#df = loader.loadDataFrame(mainPath='SHFE',dtDate=datetime.strptime('20140801', '%Y%m%d'),
# leg1Symbol='RB1501', leg2Symbol='RB1505')
"""取一段日期内的dataframe"""
df = loader.loadDataFrame2(mainPath='SHFE', beginDate=datetime.strptime('20140801', '%Y%m%d'),
endDate=datetime.strptime('20141030', '%Y%m%d'),
leg1Symbol='RB1501', leg2Symbol='RB1505')
#print df