vnpy/vn.trader/ctaStrategy/utilArbTickLoader.py

357 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# encoding: UTF-8
import os
import cPickle
import csv
import logging
import pandas
import copy
from datetime import datetime, timedelta
from ctaBase import *
class UtilArbTickLoader(object):
    """Load arbitrage (spread) ticks built from the tick files of two legs.

    For one trading date the loader reads ``<leg1Symbol>.txt`` and
    ``<leg2Symbol>.txt`` from
    ``<ticksFolder>\\<mainPath>\\<YYYYMM>\\<symbol>\\<MMDD>\\``, pairs the rows
    whose normalized timestamps are equal and emits one spread tick per pair:

    * ``askPrice1 = leg1.ask - leg2.bid``, ``askVolume1 = min(leg1.askVol, leg2.bidVol)``
    * ``bidPrice1 = leg1.bid - leg2.ask``, ``bidVolume1 = min(leg1.bidVol, leg2.askVol)``

    Results are cached under ``<cwd>/cache`` as pickle (tick lists) and csv
    (dataframes); an existing cache file is never overwritten.

    The module targets Python 2 (``cPickle``; csv files opened in ``'rb'`` as
    the Python 2 ``csv`` module expects).
    """

    # Price value that marks an empty side of the book in the tick files.
    # Rows carrying it with zero volume are skipped; the original author's
    # comment describes them as limit-up / limit-down rows.
    _NO_QUOTE_PRICE = float('1.79769E308')

    # Offset given to the second tick of the same wall-clock second so that
    # it does not collide with the first one: 500 ms expressed in microseconds.
    _HALF_SECOND_MICROSECONDS = 500000

    # ----------------------------------------------------------------------
    def __init__(self, ticksFolder, symbol):
        """Remember where the tick files live and which product is loaded.

        :param ticksFolder: root folder of the tick files; a falsy value
            selects the default ``z:\\ticks``.
        :param symbol: product symbol (e.g. ``'RB'``); used both as a path
            component and as the symbol of every produced tick.
        """
        self.ticksFolder = ticksFolder if ticksFolder else u'z:\\ticks'
        self.symbol = symbol

    def writeCtaLog(self, content):
        """Write ``content`` to the standard ``logging`` module at INFO level."""
        logging.info(content)

    def writeCtaError(self, content):
        """Report an error: print it and also write it to the log."""
        self.output(content)
        self.writeCtaLog(content)

    def output(self, content):
        """Print ``content`` to stdout prefixed with the current time."""
        print(str(datetime.now()) + "\t" + content)

    # ----------------------------------------------------------------------
    def loadArbTicks(self, mainPath, dtDate, leg1Symbol, leg2Symbol):
        """Return the list of spread ticks for one date.

        The pickle cache is consulted first; on a miss both leg files are
        read, merged and the non-empty result is cached.

        :param mainPath: sub folder below ``ticksFolder`` (e.g. ``'SHFE'``).
        :param dtDate: ``datetime`` of the day to load.
        :param leg1Symbol: contract of the first leg (file name without ``.txt``).
        :param leg2Symbol: contract of the second leg.
        :return: list of ``CtaTickData``; ``[]`` when either leg file is
            missing or no timestamps match.
        """
        # '\\{' yields the same text as the former '\{' without relying on an
        # invalid escape sequence.
        self.writeCtaLog(u'加载日期:{0}\\{1}的价差tick'.format(mainPath, dtDate))

        cachefilename = u'{0}_{1}_{2}_{3}_{4}'.format(
            self.symbol, leg1Symbol, leg2Symbol, mainPath, dtDate.strftime('%Y%m%d'))

        arbTicks = self.__loadArbTicksFromLocalCache(cachefilename)
        if len(arbTicks) > 0:
            # Cache hit: nothing to rebuild and nothing to save again.
            return arbTicks

        leg1File = self._legFilePath(mainPath, dtDate, leg1Symbol)
        if not os.path.isfile(leg1File):
            self.writeCtaLog(u'{0}文件不存在'.format(leg1File))
            return []

        leg2File = self._legFilePath(mainPath, dtDate, leg2Symbol)
        if not os.path.isfile(leg2File):
            self.writeCtaLog(u'{0}文件不存在'.format(leg2File))
            return []

        # Leg2 is indexed by timestamp first, then leg1 is streamed against it.
        leg2Ticks = self._readLeg2Ticks(leg2File, dtDate)
        arbTicks = self._buildArbTicks(leg1File, dtDate, leg2Ticks)

        if len(arbTicks) > 0:
            self.__saveArbTicksToLocalCache(cachefilename, arbTicks)

        return arbTicks

    def _legFilePath(self, mainPath, dtDate, legSymbol):
        """Return the tick file path of one leg (Windows-style separators, as stored)."""
        return self.ticksFolder + u'\\{0}\\{1}\\{2}\\{3}\\{4}.txt'.format(
            mainPath, dtDate.strftime('%Y%m'), self.symbol, dtDate.strftime('%m%d'), legSymbol)

    def _parseTickTime(self, dateStr, timeStr):
        """Parse ``'<YYYYMMDD> <HH:MM:SS.ffffff>'``; report and return None on failure."""
        # Built with format() so that a missing time (None) is reported below
        # instead of raising a second error inside the handler.
        stamp = u'{0} {1}'.format(dateStr, timeStr)
        try:
            return datetime.strptime(stamp, '%Y%m%d %H:%M:%S.%f')
        except ValueError as ex:
            self.writeCtaError(u'日期转换错误:{0},{1}:{2}'.format(stamp, Exception, ex))
            return None

    @classmethod
    def _alignTickTime(cls, tickDt, prevDt):
        """Snap a tick time to the whole second, or to +500 ms for a repeat.

        :param tickDt: parsed ``datetime`` of the current row.
        :param prevDt: aligned ``datetime`` of the previous row (or None).
        :return: ``tickDt`` truncated to the second; when that equals
            ``prevDt`` the result is placed 500 ms into the second instead.
        """
        wholeSecond = tickDt.replace(microsecond=0)
        if wholeSecond == prevDt:
            return tickDt.replace(microsecond=cls._HALF_SECOND_MICROSECONDS)
        return wholeSecond

    def _readLeg2Ticks(self, leg2File, dtDate):
        """Read the leg2 file into a dict keyed by ``'<date> <time>'``."""
        leg2Ticks = {}
        dateStr = dtDate.strftime('%Y%m%d')
        noQuote = self._NO_QUOTE_PRICE
        prevDt = None

        # 'rb' is what the Python 2 csv module expects; the with-block makes
        # sure the handle is released even if a row fails to convert.
        with open(leg2File, 'rb') as csvFile:
            reader = csv.DictReader(csvFile, delimiter=",")
            self.writeCtaLog(u'加载{0}'.format(leg2File))

            for row in reader:
                parsed = self._parseTickTime(dateStr, row['Time'])
                if parsed is None:
                    continue

                tick = CtaTickData()
                tick.vtSymbol = self.symbol
                tick.symbol = self.symbol
                tick.date = dateStr
                tick.tradingDay = tick.date
                tick.datetime = self._alignTickTime(parsed, prevDt)
                tick.time = tick.datetime.strftime('%H:%M:%S.%f')
                prevDt = tick.datetime

                tick.lastPrice = float(row['LastPrice'])
                tick.volume = int(float(row['LVolume']))
                tick.bidPrice1 = float(row['BidPrice'])         # bid (lower price)
                tick.bidVolume1 = int(float(row['BidVolume']))
                tick.askPrice1 = float(row['AskPrice'])         # ask (higher price)
                tick.askVolume1 = int(float(row['AskVolume']))

                # Skip rows with an empty book side.
                # NOTE(review): unlike leg1, a price of 0 is not treated as
                # "no quote" here - confirm whether that is intended.
                if (tick.bidPrice1 == noQuote and tick.bidVolume1 == 0) \
                        or (tick.askPrice1 == noQuote and tick.askVolume1 == 0):
                    continue

                dtStr = tick.date + ' ' + tick.time
                if dtStr in leg2Ticks:
                    # A third tick within one second lands on an occupied slot.
                    self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr))
                else:
                    leg2Ticks[dtStr] = tick

        return leg2Ticks

    def _buildArbTicks(self, leg1File, dtDate, leg2Ticks):
        """Stream the leg1 file and pair each row with the leg2 tick of equal time.

        Matched entries are removed from ``leg2Ticks`` so that each leg2 tick
        is used at most once.
        """
        arbTicks = []
        dateStr = dtDate.strftime('%Y%m%d')
        noQuote = self._NO_QUOTE_PRICE
        prevDt = None

        with open(leg1File, 'rb') as csvFile:
            reader = csv.DictReader(csvFile, delimiter=",")
            self.writeCtaLog(u'加载{0}'.format(leg1File))

            for row in reader:
                parsed = self._parseTickTime(dateStr, row['Time'])
                if parsed is None:
                    continue

                arbTick = CtaTickData()
                arbTick.date = dateStr
                arbTick.datetime = self._alignTickTime(parsed, prevDt)
                arbTick.time = arbTick.datetime.strftime('%H:%M:%S.%f')
                prevDt = arbTick.datetime

                dtStr = ' '.join([arbTick.date, arbTick.time])
                leg2Tick = leg2Ticks.get(dtStr)
                if leg2Tick is None:
                    # No leg2 quote at this instant: no spread can be formed.
                    continue

                arbTick.vtSymbol = self.symbol
                arbTick.symbol = self.symbol
                arbTick.lastPrice = EMPTY_FLOAT
                arbTick.volume = EMPTY_INT

                leg1AskPrice1 = float(row['AskPrice'])
                leg1AskVolume1 = int(float(row['AskVolume']))
                leg1BidPrice1 = float(row['BidPrice'])
                leg1BidVolume1 = int(float(row['BidVolume']))

                # Skip rows with an empty book side (sentinel or zero price
                # together with zero volume).
                askMissing = leg1AskVolume1 == 0 and leg1AskPrice1 in (noQuote, 0)
                bidMissing = leg1BidVolume1 == 0 and leg1BidPrice1 in (noQuote, 0)
                if askMissing or bidMissing:
                    continue

                # Spread ask = leg1 ask - leg2 bid; volume is the smaller side.
                arbTick.askPrice1 = leg1AskPrice1 - leg2Tick.bidPrice1
                arbTick.askVolume1 = min(leg1AskVolume1, leg2Tick.bidVolume1)

                # Spread bid = leg1 bid - leg2 ask; volume is the smaller side.
                arbTick.bidPrice1 = leg1BidPrice1 - leg2Tick.askPrice1
                arbTick.bidVolume1 = min(leg1BidVolume1, leg2Tick.askVolume1)

                arbTicks.append(arbTick)
                del leg2Ticks[dtStr]

        return arbTicks

    # ----------------------------------------------------------------------
    def __loadArbTicksFromLocalCache(self, filename):
        """Load a tick list from ``<cwd>/cache/<filename>.pickle``.

        :return: the unpickled object, or ``[]`` when the file does not exist.
        """
        cacheFile = u'{0}/{1}.pickle'.format(os.getcwd() + '/cache', filename)
        if not os.path.isfile(cacheFile):
            return []

        # Text mode is kept on purpose: the caches are written in text mode
        # with the default (ASCII) pickle protocol and both sides must agree.
        # NOTE(security): unpickling executes code - only load trusted caches.
        with open(cacheFile, mode='r') as cache:
            return cPickle.load(cache)

    def __saveArbTicksToLocalCache(self, filename, arbticks):
        """Pickle ``arbticks`` to ``<cwd>/cache/<filename>.pickle``.

        :return: True when written, False when the file already exists.
        """
        cacheFolder = os.getcwd() + '/cache'
        if not os.path.isdir(cacheFolder):
            os.mkdir(cacheFolder)

        cacheFile = u'{0}/{1}.pickle'.format(cacheFolder, filename)
        if os.path.isfile(cacheFile):
            # Never overwrite an existing cache entry.
            return False

        with open(cacheFile, mode='w') as cache:
            cPickle.dump(arbticks, cache)
        return True

    def convert_to_dataframe(self, ticklist):
        """Return a DataFrame with one row per tick.

        Columns: date, time, askPrice1, askVolume1, bidPrice1, bidVolume1.
        """
        variables = ['date', 'time', 'askPrice1', 'askVolume1', 'bidPrice1', 'bidVolume1']
        rows = [[getattr(tick, name) for name in variables] for tick in ticklist]
        return pandas.DataFrame(rows, columns=variables)

    def __saveArbTicksToLocalCsv(self, filename, df):
        """Write ``df`` to ``<cwd>/cache/<filename>.csv``.

        :return: True when written, False when the file already exists.
        """
        cacheFolder = os.getcwd() + '/cache'
        if not os.path.isdir(cacheFolder):
            os.mkdir(cacheFolder)

        cacheFile = u'{0}/{1}.csv'.format(cacheFolder, filename)
        if os.path.isfile(cacheFile):
            # Never overwrite an existing cache entry.
            return False

        df.to_csv(cacheFile, index=False)
        return True

    def __loadArbTicksFromLocalCsv(self, filename):
        """Load a DataFrame from ``<cwd>/cache/<filename>.csv``.

        :return: the DataFrame, or None when the file does not exist.
        """
        cacheFile = u'{0}/{1}.csv'.format(os.getcwd() + '/cache', filename)
        if not os.path.isfile(cacheFile):
            return None

        # Keep date/time as text so a cached frame has the same column types
        # as a freshly built one ('20140801' would otherwise become an int).
        return pandas.read_csv(cacheFile, dtype={'date': str, 'time': str})

    # ----------------------------------------------------------------------
    def loadDataFrame(self, mainPath, dtDate, leg1Symbol, leg2Symbol):
        """Return the spread ticks of one date as a DataFrame.

        Uses the csv cache when present; otherwise builds the frame from
        :meth:`loadArbTicks` and caches it.

        :return: DataFrame, or None when no spread tick is available.
        """
        self.writeCtaLog(u'加载日期:{0}\\{1}的价差tick dataframe'.format(mainPath, dtDate))

        cachefilename = u'{0}_{1}_{2}_{3}_{4}'.format(
            self.symbol, leg1Symbol, leg2Symbol, mainPath, dtDate.strftime('%Y%m%d'))

        df = self.__loadArbTicksFromLocalCsv(cachefilename)
        if df is None:
            ticks = self.loadArbTicks(mainPath, dtDate, leg1Symbol, leg2Symbol)
            if len(ticks) > 0:
                df = self.convert_to_dataframe(ticks)
                self.__saveArbTicksToLocalCsv(cachefilename, df)

        return df

    def loadDataFrame2(self, mainPath, beginDate, endDate, leg1Symbol, leg2Symbol):
        """Return the spread ticks of a date range as one DataFrame.

        For every day in ``[beginDate, endDate)`` (``endDate`` itself is not
        loaded) the day session (``mainPath``) and then the night session
        (``mainPath + '_night'``) are appended. The combined frame is cached
        as csv.

        :return: DataFrame; None when the range is shorter than one day or
            no data is found for any day.
        """
        dayIntervals = (endDate - beginDate).days
        if dayIntervals < 1:
            self.writeCtaLog(u'时间不足')
            return None

        cachefilename = u'{0}_{1}_{2}_{3}_{4}'.format(
            self.symbol, leg1Symbol, leg2Symbol,
            beginDate.strftime('%Y%m%d'), endDate.strftime('%Y%m%d'))

        df = self.__loadArbTicksFromLocalCsv(cachefilename)
        if df is not None:
            return df

        for i in range(dayIntervals):
            getDate = beginDate + timedelta(days=i)
            self.output(u'取数据日期:{0}'.format(getDate))

            # Day session first, then the night session of the same date.
            for sessionPath in (mainPath, mainPath + '_night'):
                part = self.loadDataFrame(sessionPath, getDate, leg1Symbol, leg2Symbol)
                if part is None:
                    continue

                if df is None:
                    df = copy.copy(part)
                    self.output(u'数据{0}'.format(len(df)))
                else:
                    df = pandas.concat([df, part], ignore_index=True)
                    self.output(u'数据增加{0}行,共{1}'.format(len(part), len(df)))

        # Nothing was found for the whole range: there is no frame to cache.
        if df is not None:
            self.__saveArbTicksToLocalCsv(cachefilename, df)

        return df
if __name__ == '__main__':
    # Demo: build the RB1501 - RB1505 spread dataframe for a date range.
    loader = UtilArbTickLoader(ticksFolder='z://ticks', symbol='RB')

    # Alternative entry points, kept for reference.
    #
    # Tick objects only:
    #   ticks = loader.loadArbTicks(mainPath='SHFE', dtDate=datetime.strptime('20140801', '%Y%m%d'),
    #                               leg1Symbol='RB1501', leg2Symbol='RB1505')
    #   df = loader.convert_to_dataframe(ticks)
    #
    # Dataframe of a single day:
    #   df = loader.loadDataFrame(mainPath='SHFE', dtDate=datetime.strptime('20140801', '%Y%m%d'),
    #                             leg1Symbol='RB1501', leg2Symbol='RB1505')

    # Dataframe of a date range.
    dateFormat = '%Y%m%d'
    rangeBegin = datetime.strptime('20140801', dateFormat)
    rangeEnd = datetime.strptime('20141030', dateFormat)
    df = loader.loadDataFrame2(
        mainPath='SHFE',
        beginDate=rangeBegin,
        endDate=rangeEnd,
        leg1Symbol='RB1501',
        leg2Symbol='RB1505',
    )
    # print df