合并json文件为一个

This commit is contained in:
msincenselee 2018-03-29 13:47:41 +08:00
parent d173201a0e
commit 44f2926a3b

View File

@ -5,6 +5,8 @@ import os,sys
from datetime import datetime from datetime import datetime
import json import json
import uuid import uuid
import shutil
from vnpy.trader.app.ctaStrategy.ctaBase import * from vnpy.trader.app.ctaStrategy.ctaBase import *
from vnpy.trader.vtConstant import * from vnpy.trader.vtConstant import *
@ -110,7 +112,7 @@ class CtaGridTrade(object):
包括两个方向的网格队列 包括两个方向的网格队列
v1, 基本版 v1, 基本版
v2增加更新最小价格跳动增加动态上下网格间距 v2增加更新最小价格跳动增加动态上下网格间距
v3, 增加持久化到Mongo数据库 v3, 合并up/dn为一个文件
""" """
def __init__(self, strategy, maxlots=5, height=2, win=2, vol=1, minDiff = 1): def __init__(self, strategy, maxlots=5, height=2, win=2, vol=1, minDiff = 1):
@ -142,6 +144,8 @@ class CtaGridTrade(object):
self.max_up_open_price = EMPTY_FLOAT # 上网格开仓均价 self.max_up_open_price = EMPTY_FLOAT # 上网格开仓均价
self.min_dn_open_price = EMPTY_FLOAT # 下网格开仓均价 self.min_dn_open_price = EMPTY_FLOAT # 下网格开仓均价
self.json_file_path = os.path.join(self.get_data_folder(), u'{}_Grids.json'.format(self.jsonName)) # 网格的路径
def getVolumeRate(self, gridIndex=EMPTY_INT): def getVolumeRate(self, gridIndex=EMPTY_INT):
"""获取网格索引对应的开仓数量比例""" """获取网格索引对应的开仓数量比例"""
if gridIndex >= len(self.volumeList) or gridIndex < 0: if gridIndex >= len(self.volumeList) or gridIndex < 0:
@ -168,7 +172,7 @@ class CtaGridTrade(object):
if len(self.upGrids) == 0: if len(self.upGrids) == 0:
self.upGrids = self.load(direction= DIRECTION_SHORT) self.upGrids = self.load(direction= DIRECTION_SHORT)
if len(self.upGrids) > 0: if len(self.upGrids) > 0:
self.writeCtaLog(u'上网格从文件加载完成') self.writeCtaLog(u'上网格从文件{}加载完成'.format(self.json_file_path))
else: else:
# 做空,开仓价为上阻力线+网格高度*i平仓价为开仓价-止盈高度,开仓数量为缺省 # 做空,开仓价为上阻力线+网格高度*i平仓价为开仓价-止盈高度,开仓数量为缺省
for i in range(0, lots, 1): for i in range(0, lots, 1):
@ -185,7 +189,7 @@ class CtaGridTrade(object):
if len(self.dnGrids) == 0: if len(self.dnGrids) == 0:
self.dnGrids = self.load(direction= DIRECTION_LONG) self.dnGrids = self.load(direction= DIRECTION_LONG)
if len(self.dnGrids) > 0: if len(self.dnGrids) > 0:
self.writeCtaLog(u'下网格从文件加载完成') self.writeCtaLog(u'下网格从文件{}加载完成'.format(self.json_file_path))
else: else:
for i in range(0, lots, 1): for i in range(0, lots, 1):
# 做多,开仓价为下阻力线-网格高度*i平仓价为开仓价+止盈高度,开仓数量为缺省 # 做多,开仓价为下阻力线-网格高度*i平仓价为开仓价+止盈高度,开仓数量为缺省
@ -794,43 +798,47 @@ class CtaGridTrade(object):
# 更新开仓均价 # 更新开仓均价
self.recount_avg_open_price() self.recount_avg_open_price()
grids_save_path = self.get_data_folder()
# 工作目录 # 确保json名字与策略一致
currentFolder = os.path.abspath(os.path.join(os.getcwd(), u'data')) if self.jsonName != self.strategy.name:
if os.path.isdir(currentFolder): self.writeCtaLog(u'JsonName {} 与 上层策略名{} 不一致.'.format(self.jsonName, self.strategy.name))
# 如果工作目录下存在data子目录就使用data子目录 self.jsonName = self.strategy.name
path = currentFolder
else:
# 否则,使用缺省保存目录 vnpy/trader/app/ctaStrategy/data
path = os.path.abspath(os.path.join(os.path.dirname(__file__), u'data'))
# 保存上网格列表 # 移除旧版上/下网格列表
if direction == DIRECTION_SHORT: old_up_json_file = os.path.join(grids_save_path, u'{0}_upGrids.json'.format(self.jsonName))
jsonFileName = os.path.join(path, u'{0}_upGrids.json'.format(self.jsonName)) old_dn_json_file = os.path.join(grids_save_path, u'{0}_dnGrids.json'.format(self.jsonName))
if os.path.exists(old_up_json_file):
try:
os.remove(old_up_json_file)
except:
pass
l = [] if os.path.exists(old_dn_json_file):
for grid in self.upGrids: try:
l.append(grid.toJson()) os.remove(old_dn_json_file)
except:
pass
with open(jsonFileName, 'w') as f: # 新版网格持久化文件
jsonL = json.dumps(l, indent=4) grid_json_file = os.path.join(grids_save_path, u'{}_Grids.json'.format(self.jsonName))
f.write(jsonL) self.json_file_path = grid_json_file
#self.writeCtaLog(u'上网格保存文件{0}完成'.format(jsonFileName)) data = {}
up_grids = []
for grid in self.upGrids:
up_grids.append(grid.toJson())
dn_grids = []
for grid in self.dnGrids:
dn_grids.append(grid.toJson())
data[u'up_grids'] = up_grids
data[u'dn_grids'] = dn_grids
# 保存上网格列表 with open(grid_json_file, 'w') as f:
if direction == DIRECTION_LONG: json_data = json.dumps(data, indent=4)
jsonFileName = os.path.join(path, u'{0}_dnGrids.json'.format(self.jsonName)) f.write(json_data)
l = [] self.writeCtaLog(u'GrideTrade保存文件{}完成'.format(grid_json_file))
for grid in self.dnGrids:
l.append(grid.toJson())
with open(jsonFileName, 'w') as f:
jsonL = json.dumps(l, indent=4)
f.write(jsonL)
#self.writeCtaLog(u'下网格保存文件{0}完成'.format(jsonFileName))
def load(self, direction, openStatusFilter=[]): def load(self, direction, openStatusFilter=[]):
""" """
@ -839,112 +847,230 @@ class CtaGridTrade(object):
:param openStatusFilter: 缺省不做过滤True只提取已开仓的数据False只提取未开仓的数据 :param openStatusFilter: 缺省不做过滤True只提取已开仓的数据False只提取未开仓的数据
:return: :return:
""" """
data = {}
grids_save_path = self.get_data_folder()
if self.jsonName != self.strategy.name:
self.writeCtaLog(u'JsonName {} 与 上层策略名{} 不一致.'.format(self.jsonName, self.strategy.name))
self.jsonName = self.strategy.name
# 移除旧版上/下网格列表
old_up_json_file = os.path.join(grids_save_path, u'{0}_upGrids.json'.format(self.jsonName))
old_dn_json_file = os.path.join(grids_save_path, u'{0}_dnGrids.json'.format(self.jsonName))
if os.path.exists(old_up_json_file):
try:
with open(old_up_json_file, 'r', encoding='utf8') as f:
# 解析json文件
data['up_grids'] = json.load(f)
except IOError:
self.writeCtaLog(u'读取网格{}出错'.format(old_up_json_file))
data['up_grids'] = []
try: # 移除旧版上网格文件
os.remove(old_up_json_file)
except:
pass
if os.path.exists(old_dn_json_file):
try:
with open(old_dn_json_file, 'r', encoding='utf8') as f:
# 解析json文件
data['dn_grids'] = json.load(f)
except IOError as ex:
self.writeCtaLog(u'读取网格{}出错,ex:{}'.format(old_dn_json_file,str(ex)))
data['dn_grids'] = []
try: # 移除旧版下网格文件
os.remove(old_dn_json_file)
except:
pass
# 若新版文件不存在,就保存;若存在,就优先使用新版数据文件
grid_json_file = os.path.join(grids_save_path, u'{}_Grids.json'.format(self.jsonName))
if not os.path.exists(grid_json_file):
if len(data) == 0:
data['up_grids'] = []
data['dn_grids'] = []
self.writeCtaLog(u'{}不存在,初始化')
else:
self.writeCtaLog(u'{}不存在,保存')
try:
with open(grid_json_file, 'w') as f:
json_data = json.dumps(data, indent=4)
f.write(json_data)
except IOError as ex:
self.writeCtaLog(u'写入网格文件{}异常:{}'.format(grid_json_file,str(ex)))
else:
# 读取json文件
try:
with open(grid_json_file, 'r', encoding='utf8') as f:
data = json.load(f)
except IOError as ex:
self.writeCtaLog(u'读取网格文件{}异常:{}'.format(grid_json_file,str(ex)))
# 从文件获取数据
json_grids = []
if direction == DIRECTION_SHORT :
json_grids = data['up_grids'] if 'up_grids' in data else []
elif direction == DIRECTION_LONG:
json_grids = data['dn_grids'] if 'dn_grids' in data else []
grids = []
for i in json_grids:
closePrice = float(i['closePrice'])
openPrice = float(i['openPrice'])
stopPrice = float(i['stopPrice'])
self.writeCtaLog(u'load Grid:open:{0},close:{1},stop:{2}'.format(openPrice, closePrice, stopPrice))
grid = CtaGrid(direction=i['direction'], openprice=openPrice, closeprice=closePrice,
stopprice=stopPrice, volume=i['volume'])
grid.orderStatus = i['orderStatus'] # 挂单状态: True,已挂单False未挂单
grid.orderRef = i['orderRef'] # OrderId
grid.openStatus = i['openStatus'] # 开仓状态
grid.closeStatus = i['closeStatus'] # 平仓状态
strTime = i['openDatetime']
if strTime == EMPTY_STRING or type(strTime)==type(None):
grid.openDatetime = None
else:
grid.openDatetime = datetime.strptime(strTime, '%Y-%m-%d %H:%M:%S')
try:
grid.tradedVolume = i['tradedVolume'] # 已交易的合约数量
except KeyError:
grid.tradedVolume = EMPTY_INT
try:
grid.lockGrids = i['lockGrids']
except KeyError:
grid.lockGrids = []
try:
grid.type = i['type']
if grid.type == False:
grid.type = EMPTY_STRING
except KeyError:
grid.type = EMPTY_STRING
try:
grid.reuse = i['reuse']
except KeyError:
grid.reuse = False
try:
grid.openPrices = i['openPrices']
except KeyError:
grid.openPrices = {}
try:
grid.snapshot = i['snapshot']
except KeyError:
grid.snapshot = {}
self.writeCtaLog(grid.toStr())
# 增加对开仓状态的过滤,满足某些策略只提取已开仓的网格数据
if len(openStatusFilter) > 0:
if grid.openStatus not in openStatusFilter:
continue
grids.append(grid)
# 更新开仓均价
self.recount_avg_open_price()
return grids
def get_data_folder(self):
"""获取数据目录"""
# 工作目录 # 工作目录
currentFolder = os.path.abspath(os.path.join(os.getcwd(), u'data')) currentFolder = os.path.abspath(os.path.join(os.getcwd(), u'data'))
if os.path.isdir(currentFolder): if os.path.isdir(currentFolder):
# 如果工作目录下存在data子目录就使用data子目录 # 如果工作目录下存在data子目录就使用data子目录
path = currentFolder return currentFolder
else: else:
# 否则,使用缺省保存目录 vnpy/trader/app/ctaStrategy/data # 否则,使用缺省保存目录 vnpy/trader/app/ctaStrategy/data
path = os.path.abspath(os.path.join(os.path.dirname(__file__), u'data')) return os.path.abspath(os.path.join(os.path.dirname(__file__), u'data'))
if direction == DIRECTION_SHORT: def changeStrategyName(self, old_name, new_name):
jsonFileName = os.path.join(path, '{0}_upGrids.json'.format(self.jsonName)) """
self.writeCtaLog(u'开始加载上网格文件{0}'.format(jsonFileName)) 在线更换策略实例名称需要把Json文件也转移
if direction == DIRECTION_LONG: :param old_name:
jsonFileName = os.path.join(path, u'{0}_dnGrids.json'.format(self.jsonName)) :param new_name:
self.writeCtaLog(u'开始加载上网格文件{0}'.format(jsonFileName)) :return:
"""
if old_name == new_name:
self.writeCtaLog(u'更换策略实例名称失败,old:{} =>new:{}'.format(old_name, new_name))
return
if not os.path.isfile(jsonFileName): data_folder = self.get_data_folder()
self.writeCtaLog(u'网格保存文件{0}不存在'.format(jsonFileName))
return []
try: self.jsonName = new_name
f = open(jsonFileName,'r',encoding='utf8') # 旧文件
except IOError: old_up_json_file = os.path.join(data_folder, u'{0}_upGrids.json'.format(old_name))
self.writeCtaLog(u'读取网格出错,请检查') old_dn_json_file = os.path.join(data_folder, u'{0}_dnGrids.json'.format(old_name))
return [] old_json_file = os.path.join(data_folder, u'{0}_Grids.json'.format(old_name))
# 解析json文件 # 新文件
l = json.load(f) self.json_file_path = os.path.join(data_folder, u'{0}_Grids.json'.format(new_name))
grids = [] if os.path.isfile(self.json_file_path): # 新文件若存在,移除
try:
if len(l) > 0: os.remove(self.json_file_path)
except Exception as ex:
for i in l: self.writeCtaLog(u'GridTrade.changeStrategyName 删除文件:{}异常:{}'.format(old_up_json_file,str(ex)))
closePrice = float(i['closePrice'])
openPrice = float(i['openPrice'])
stopPrice = float(i['stopPrice'])
self.writeCtaLog(u'load Grid:open:{0},close:{1},stop:{2}'.format(openPrice, closePrice, stopPrice))
grid = CtaGrid(direction=i['direction'], openprice=openPrice, closeprice=closePrice,
stopprice=stopPrice, volume=i['volume'])
grid.orderStatus = i['orderStatus'] # 挂单状态: True,已挂单False未挂单
grid.orderRef = i['orderRef'] # OrderId
grid.openStatus = i['openStatus'] # 开仓状态
grid.closeStatus = i['closeStatus'] # 平仓状态
strTime = i['openDatetime']
if strTime == EMPTY_STRING or type(strTime)==type(None):
grid.openDatetime = None
else:
grid.openDatetime = datetime.strptime(strTime, '%Y-%m-%d %H:%M:%S')
try:
grid.tradedVolume = i['tradedVolume'] # 已交易的合约数量
except KeyError:
grid.tradedVolume = EMPTY_INT
try:
grid.lockGrids = i['lockGrids']
except KeyError:
grid.lockGrids = []
try:
grid.type = i['type']
if grid.type == False:
grid.type = EMPTY_STRING
except KeyError:
grid.type = EMPTY_STRING
try:
grid.reuse = i['reuse']
except KeyError:
grid.reuse = False
try:
grid.openPrices = i['openPrices']
except KeyError:
grid.openPrices = {}
try:
grid.snapshot = i['snapshot']
except KeyError:
grid.snapshot = {}
self.writeCtaLog(grid.toStr())
# 增加对开仓状态的过滤,满足某些策略只提取已开仓的网格数据
if len(openStatusFilter) > 0:
if grid.openStatus not in openStatusFilter:
continue
grids.append(grid)
# 移动文件
if os.path.isfile(old_json_file):
try:
shutil.move(old_json_file, self.json_file_path)
return
except Exception as ex:
self.writeCtaLog(u'GridTrade.changeStrategyName 移动文件:{}=》{}异常:{}'.format(old_up_json_file, self.json_file_path, str(ex)))
else: else:
self.writeCtaLog(u'解析网格出错,设置为空列表') data = {}
if os.path.isfile(old_up_json_file):
f.close() try:
with open(old_up_json_file, 'r', encoding='utf8') as f:
# 更新开仓均价 # 解析json文件
self.recount_avg_open_price() data['up_grids'] = json.load(f)
except IOError:
return grids self.writeCtaLog(u'读取网格{}出错'.format(old_up_json_file))
data['up_grids'] = []
try: # 移除旧版上网格文件
os.remove(old_up_json_file)
except IOError:
self.writeCtaLog(u'移除网格{}出错'.format(old_up_json_file))
else:
data['up_grids'] = []
if os.path.isfile(old_dn_json_file):
try:
with open(old_dn_json_file, 'r', encoding='utf8') as f:
# 解析json文件
data['dn_grids'] = json.load(f)
except IOError:
self.writeCtaLog(u'读取网格{}出错'.format(old_dn_json_file))
data['dn_grids'] = []
try: # 移除旧版上网格文件
os.remove(old_dn_json_file)
except IOError:
self.writeCtaLog(u'移除网格{}出错'.format(old_dn_json_file))
else:
data['dn_grids'] = []
try:
with open(self.json_file_path, 'w') as f:
json_data = json.dumps(data, indent=4)
f.write(json_data)
except IOError as ex:
self.writeCtaLog(u'写入网格文件{}异常:{}'.format(self.json_file_path, str(ex)))
def getJsonFilePath(self):
"""
返回上下网格的文件路径
:return:
"""
return self.json_file_path