diff --git a/vnpy/trader/app/ctaStrategy/ctaGridTrade.py b/vnpy/trader/app/ctaStrategy/ctaGridTrade.py index 8bbf6562..e244ff70 100644 --- a/vnpy/trader/app/ctaStrategy/ctaGridTrade.py +++ b/vnpy/trader/app/ctaStrategy/ctaGridTrade.py @@ -5,6 +5,8 @@ import os,sys from datetime import datetime import json import uuid +import shutil + from vnpy.trader.app.ctaStrategy.ctaBase import * from vnpy.trader.vtConstant import * @@ -110,7 +112,7 @@ class CtaGridTrade(object): 包括两个方向的网格队列, v1, 基本版 v2,增加更新最小价格跳动,增加动态上下网格间距 - v3, 增加持久化到Mongo数据库 + v3, 合并up/dn为一个文件 """ def __init__(self, strategy, maxlots=5, height=2, win=2, vol=1, minDiff = 1): @@ -142,6 +144,8 @@ class CtaGridTrade(object): self.max_up_open_price = EMPTY_FLOAT # 上网格开仓均价 self.min_dn_open_price = EMPTY_FLOAT # 下网格开仓均价 + self.json_file_path = os.path.join(self.get_data_folder(), u'{}_Grids.json'.format(self.jsonName)) # 网格的路径 + def getVolumeRate(self, gridIndex=EMPTY_INT): """获取网格索引对应的开仓数量比例""" if gridIndex >= len(self.volumeList) or gridIndex < 0: @@ -168,7 +172,7 @@ class CtaGridTrade(object): if len(self.upGrids) == 0: self.upGrids = self.load(direction= DIRECTION_SHORT) if len(self.upGrids) > 0: - self.writeCtaLog(u'上网格从文件加载完成') + self.writeCtaLog(u'上网格从文件{}加载完成'.format(self.json_file_path)) else: # 做空,开仓价为上阻力线+网格高度*i,平仓价为开仓价-止盈高度,开仓数量为缺省 for i in range(0, lots, 1): @@ -185,7 +189,7 @@ class CtaGridTrade(object): if len(self.dnGrids) == 0: self.dnGrids = self.load(direction= DIRECTION_LONG) if len(self.dnGrids) > 0: - self.writeCtaLog(u'下网格从文件加载完成') + self.writeCtaLog(u'下网格从文件{}加载完成'.format(self.json_file_path)) else: for i in range(0, lots, 1): # 做多,开仓价为下阻力线-网格高度*i,平仓价为开仓价+止盈高度,开仓数量为缺省 @@ -794,43 +798,47 @@ class CtaGridTrade(object): # 更新开仓均价 self.recount_avg_open_price() + grids_save_path = self.get_data_folder() - # 工作目录 - currentFolder = os.path.abspath(os.path.join(os.getcwd(), u'data')) - if os.path.isdir(currentFolder): - # 如果工作目录下,存在data子目录,就使用data子目录 - path = currentFolder - else: - # 否则,使用缺省保存目录 vnpy/trader/app/ctaStrategy/data - path = os.path.abspath(os.path.join(os.path.dirname(__file__), u'data')) + # 确保json名字与策略一致 + if self.jsonName != self.strategy.name: + self.writeCtaLog(u'JsonName {} 与 上层策略名{} 不一致.'.format(self.jsonName, self.strategy.name)) + self.jsonName = self.strategy.name - # 保存上网格列表 - if direction == DIRECTION_SHORT: - jsonFileName = os.path.join(path, u'{0}_upGrids.json'.format(self.jsonName)) + # 移除旧版上/下网格列表 + old_up_json_file = os.path.join(grids_save_path, u'{0}_upGrids.json'.format(self.jsonName)) + old_dn_json_file = os.path.join(grids_save_path, u'{0}_dnGrids.json'.format(self.jsonName)) + if os.path.exists(old_up_json_file): + try: + os.remove(old_up_json_file) + except: + pass - l = [] - for grid in self.upGrids: - l.append(grid.toJson()) + if os.path.exists(old_dn_json_file): + try: + os.remove(old_dn_json_file) + except: + pass - with open(jsonFileName, 'w') as f: - jsonL = json.dumps(l, indent=4) - f.write(jsonL) + # 新版网格持久化文件 + grid_json_file = os.path.join(grids_save_path, u'{}_Grids.json'.format(self.jsonName)) + self.json_file_path = grid_json_file - #self.writeCtaLog(u'上网格保存文件{0}完成'.format(jsonFileName)) + data = {} + up_grids = [] + for grid in self.upGrids: + up_grids.append(grid.toJson()) + dn_grids = [] + for grid in self.dnGrids: + dn_grids.append(grid.toJson()) + data[u'up_grids'] = up_grids + data[u'dn_grids'] = dn_grids - # 保存上网格列表 - if direction == DIRECTION_LONG: - jsonFileName = os.path.join(path, u'{0}_dnGrids.json'.format(self.jsonName)) + with open(grid_json_file, 'w') as f: + json_data = json.dumps(data, indent=4) + f.write(json_data) - l = [] - for grid in self.dnGrids: - l.append(grid.toJson()) - - with open(jsonFileName, 'w') as f: - jsonL = json.dumps(l, indent=4) - f.write(jsonL) - - #self.writeCtaLog(u'下网格保存文件{0}完成'.format(jsonFileName)) + self.writeCtaLog(u'GrideTrade保存文件{}完成'.format(grid_json_file)) def load(self, direction, openStatusFilter=[]): """ @@ -839,112 +847,230 @@ class CtaGridTrade(object): :param openStatusFilter: 缺省,不做过滤;True,只提取已开仓的数据,False,只提取未开仓的数据 :return: """ + data = {} + grids_save_path = self.get_data_folder() + if self.jsonName != self.strategy.name: + self.writeCtaLog(u'JsonName {} 与 上层策略名{} 不一致.'.format(self.jsonName, self.strategy.name)) + self.jsonName = self.strategy.name + + # 移除旧版上/下网格列表 + old_up_json_file = os.path.join(grids_save_path, u'{0}_upGrids.json'.format(self.jsonName)) + old_dn_json_file = os.path.join(grids_save_path, u'{0}_dnGrids.json'.format(self.jsonName)) + + if os.path.exists(old_up_json_file): + try: + with open(old_up_json_file, 'r', encoding='utf8') as f: + # 解析json文件 + data['up_grids'] = json.load(f) + except IOError: + self.writeCtaLog(u'读取网格{}出错'.format(old_up_json_file)) + data['up_grids'] = [] + try: # 移除旧版上网格文件 + os.remove(old_up_json_file) + except: + pass + + if os.path.exists(old_dn_json_file): + try: + with open(old_dn_json_file, 'r', encoding='utf8') as f: + # 解析json文件 + data['dn_grids'] = json.load(f) + except IOError as ex: + self.writeCtaLog(u'读取网格{}出错,ex:{}'.format(old_dn_json_file,str(ex))) + data['dn_grids'] = [] + try: # 移除旧版下网格文件 + os.remove(old_dn_json_file) + except: + pass + + # 若新版文件不存在,就保存;若存在,就优先使用新版数据文件 + grid_json_file = os.path.join(grids_save_path, u'{}_Grids.json'.format(self.jsonName)) + if not os.path.exists(grid_json_file): + if len(data) == 0: + data['up_grids'] = [] + data['dn_grids'] = [] + self.writeCtaLog(u'{}不存在,初始化') + else: + self.writeCtaLog(u'{}不存在,保存') + try: + with open(grid_json_file, 'w') as f: + json_data = json.dumps(data, indent=4) + f.write(json_data) + except IOError as ex: + self.writeCtaLog(u'写入网格文件{}异常:{}'.format(grid_json_file,str(ex))) + else: + # 读取json文件 + try: + with open(grid_json_file, 'r', encoding='utf8') as f: + data = json.load(f) + except IOError as ex: + self.writeCtaLog(u'读取网格文件{}异常:{}'.format(grid_json_file,str(ex))) + + # 从文件获取数据 + json_grids = [] + if direction == DIRECTION_SHORT : + json_grids = data['up_grids'] if 'up_grids' in data else [] + + elif direction == DIRECTION_LONG: + json_grids = data['dn_grids'] if 'dn_grids' in data else [] + + grids = [] + for i in json_grids: + + closePrice = float(i['closePrice']) + openPrice = float(i['openPrice']) + stopPrice = float(i['stopPrice']) + + self.writeCtaLog(u'load Grid:open:{0},close:{1},stop:{2}'.format(openPrice, closePrice, stopPrice)) + + grid = CtaGrid(direction=i['direction'], openprice=openPrice, closeprice=closePrice, + stopprice=stopPrice, volume=i['volume']) + grid.orderStatus = i['orderStatus'] # 挂单状态: True,已挂单,False,未挂单 + grid.orderRef = i['orderRef'] # OrderId + grid.openStatus = i['openStatus'] # 开仓状态 + grid.closeStatus = i['closeStatus'] # 平仓状态 + + strTime = i['openDatetime'] + if strTime == EMPTY_STRING or type(strTime)==type(None): + grid.openDatetime = None + else: + grid.openDatetime = datetime.strptime(strTime, '%Y-%m-%d %H:%M:%S') + + try: + grid.tradedVolume = i['tradedVolume'] # 已交易的合约数量 + except KeyError: + grid.tradedVolume = EMPTY_INT + try: + grid.lockGrids = i['lockGrids'] + except KeyError: + grid.lockGrids = [] + + try: + grid.type = i['type'] + if grid.type == False: + grid.type = EMPTY_STRING + except KeyError: + grid.type = EMPTY_STRING + + try: + grid.reuse = i['reuse'] + except KeyError: + grid.reuse = False + + try: + grid.openPrices = i['openPrices'] + except KeyError: + grid.openPrices = {} + + try: + grid.snapshot = i['snapshot'] + except KeyError: + grid.snapshot = {} + + self.writeCtaLog(grid.toStr()) + + # 增加对开仓状态的过滤,满足某些策略只提取已开仓的网格数据 + if len(openStatusFilter) > 0: + if grid.openStatus not in openStatusFilter: + continue + + grids.append(grid) + + # 更新开仓均价 + self.recount_avg_open_price() + return grids + + def get_data_folder(self): + """获取数据目录""" # 工作目录 currentFolder = os.path.abspath(os.path.join(os.getcwd(), u'data')) if os.path.isdir(currentFolder): # 如果工作目录下,存在data子目录,就使用data子目录 - path = currentFolder + return currentFolder else: # 否则,使用缺省保存目录 vnpy/trader/app/ctaStrategy/data - path = os.path.abspath(os.path.join(os.path.dirname(__file__), u'data')) + return os.path.abspath(os.path.join(os.path.dirname(__file__), u'data')) - if direction == DIRECTION_SHORT: - jsonFileName = os.path.join(path, '{0}_upGrids.json'.format(self.jsonName)) - self.writeCtaLog(u'开始加载上网格文件{0}'.format(jsonFileName)) - if direction == DIRECTION_LONG: - jsonFileName = os.path.join(path, u'{0}_dnGrids.json'.format(self.jsonName)) - self.writeCtaLog(u'开始加载上网格文件{0}'.format(jsonFileName)) + def changeStrategyName(self, old_name, new_name): + """ + 在线更换策略实例名称,需要把Json文件也转移 + :param old_name: + :param new_name: + :return: + """ + if old_name == new_name: + self.writeCtaLog(u'更换策略实例名称失败,old:{} =>new:{}'.format(old_name, new_name)) + return - if not os.path.isfile(jsonFileName): - self.writeCtaLog(u'网格保存文件{0}不存在'.format(jsonFileName)) - return [] + data_folder = self.get_data_folder() - try: - f = open(jsonFileName,'r',encoding='utf8') - except IOError: - self.writeCtaLog(u'读取网格出错,请检查') - return [] + self.jsonName = new_name + # 旧文件 + old_up_json_file = os.path.join(data_folder, u'{0}_upGrids.json'.format(old_name)) + old_dn_json_file = os.path.join(data_folder, u'{0}_dnGrids.json'.format(old_name)) + old_json_file = os.path.join(data_folder, u'{0}_Grids.json'.format(old_name)) - # 解析json文件 - l = json.load(f) - grids = [] - - if len(l) > 0: - - for i in l: - - closePrice = float(i['closePrice']) - openPrice = float(i['openPrice']) - stopPrice = float(i['stopPrice']) - - self.writeCtaLog(u'load Grid:open:{0},close:{1},stop:{2}'.format(openPrice, closePrice, stopPrice)) - - grid = CtaGrid(direction=i['direction'], openprice=openPrice, closeprice=closePrice, - stopprice=stopPrice, volume=i['volume']) - grid.orderStatus = i['orderStatus'] # 挂单状态: True,已挂单,False,未挂单 - grid.orderRef = i['orderRef'] # OrderId - grid.openStatus = i['openStatus'] # 开仓状态 - grid.closeStatus = i['closeStatus'] # 平仓状态 - - strTime = i['openDatetime'] - if strTime == EMPTY_STRING or type(strTime)==type(None): - grid.openDatetime = None - else: - grid.openDatetime = datetime.strptime(strTime, '%Y-%m-%d %H:%M:%S') - - try: - grid.tradedVolume = i['tradedVolume'] # 已交易的合约数量 - except KeyError: - grid.tradedVolume = EMPTY_INT - try: - grid.lockGrids = i['lockGrids'] - except KeyError: - grid.lockGrids = [] - - try: - grid.type = i['type'] - if grid.type == False: - grid.type = EMPTY_STRING - except KeyError: - grid.type = EMPTY_STRING - - try: - grid.reuse = i['reuse'] - except KeyError: - grid.reuse = False - - try: - grid.openPrices = i['openPrices'] - except KeyError: - grid.openPrices = {} - - try: - grid.snapshot = i['snapshot'] - except KeyError: - grid.snapshot = {} - - self.writeCtaLog(grid.toStr()) - - # 增加对开仓状态的过滤,满足某些策略只提取已开仓的网格数据 - if len(openStatusFilter) > 0: - if grid.openStatus not in openStatusFilter: - continue - - grids.append(grid) + # 新文件 + self.json_file_path = os.path.join(data_folder, u'{0}_Grids.json'.format(new_name)) + if os.path.isfile(self.json_file_path): # 新文件若存在,移除 + try: + os.remove(self.json_file_path) + except Exception as ex: + self.writeCtaLog(u'GridTrade.changeStrategyName 删除文件:{}异常:{}'.format(old_up_json_file,str(ex))) + # 移动文件 + if os.path.isfile(old_json_file): + try: + shutil.move(old_json_file, self.json_file_path) + return + except Exception as ex: + self.writeCtaLog(u'GridTrade.changeStrategyName 移动文件:{}=》{}异常:{}'.format(old_up_json_file, self.json_file_path, str(ex))) else: - self.writeCtaLog(u'解析网格出错,设置为空列表') - - f.close() - - # 更新开仓均价 - self.recount_avg_open_price() - - return grids + data = {} + if os.path.isfile(old_up_json_file): + try: + with open(old_up_json_file, 'r', encoding='utf8') as f: + # 解析json文件 + data['up_grids'] = json.load(f) + except IOError: + self.writeCtaLog(u'读取网格{}出错'.format(old_up_json_file)) + data['up_grids'] = [] + try: # 移除旧版上网格文件 + os.remove(old_up_json_file) + except IOError: + self.writeCtaLog(u'移除网格{}出错'.format(old_up_json_file)) + else: + data['up_grids'] = [] + if os.path.isfile(old_dn_json_file): + try: + with open(old_dn_json_file, 'r', encoding='utf8') as f: + # 解析json文件 + data['dn_grids'] = json.load(f) + except IOError: + self.writeCtaLog(u'读取网格{}出错'.format(old_dn_json_file)) + data['dn_grids'] = [] + try: # 移除旧版上网格文件 + os.remove(old_dn_json_file) + except IOError: + self.writeCtaLog(u'移除网格{}出错'.format(old_dn_json_file)) + else: + data['dn_grids'] = [] + try: + with open(self.json_file_path, 'w') as f: + json_data = json.dumps(data, indent=4) + f.write(json_data) + except IOError as ex: + self.writeCtaLog(u'写入网格文件{}异常:{}'.format(self.json_file_path, str(ex))) + def getJsonFilePath(self): + """ + 返回上下网格的文件路径 + :return: + """ + return self.json_file_path