diff --git a/vnpy/component/cta_line_bar.py b/vnpy/component/cta_line_bar.py index f9a6e193..37085d6b 100644 --- a/vnpy/component/cta_line_bar.py +++ b/vnpy/component/cta_line_bar.py @@ -200,8 +200,16 @@ class CtaLineBar(object): # 导出到CSV文件 的目录名 和 要导出的 字段 self.export_filename = None # 数据要导出的目标文件夹 self.export_fields = [] # 定义要导出的K线数据字段(包含K线元素,主图指标,附图指标等) - self.export_bi_filename = None # 通过唐其安通道输出得笔csv文件(不是缠论得笔) - self.export_zs_filename = None # 通过唐其安通道输出的中枢csv文件(不是缠论的笔中枢) + self.export_tqa_bi_filename = None # 通过唐其安通道输出得笔csv文件(不是缠论得笔) + self.export_tqa_zs_filename = None # 通过唐其安通道输出的中枢csv文件(不是缠论的笔中枢) + + self.export_bi_filename = None # 通过缠论笔csv文件 + self.export_zs_filename = None # 通过缠论的笔中枢csv文件 + self.export_duan_filename = None # 通过缠论的线段csv文件 + + self.pre_bi_start = None # 前一个笔的时间 + self.pre_zs_start = None # 前一个中枢的时间 + self.pre_duan_start = None # 创建本类型bar的内部变量,以及添加所有指标输入参数,到self.param_list列表 self.init_properties() @@ -514,6 +522,10 @@ class CtaLineBar(object): self.ma13_cross = None # ma1 与 ma3 ,金叉/死叉的点数值 self.ma23_cross = None # ma2 与 ma3 ,金叉/死叉的点数值 + self.ma12_cross_list = [] # ma1 与 ma2 金叉、死叉得点 列表 + self.ma13_cross_list = [] # ma1 与 ma3 金叉、死叉得点 列表 + self.ma23_cross_list = [] # ma2 与 ma3 金叉、死叉得点 列表 + self.ma12_cross_price = None # ma1 与 ma2 ,金叉/死叉时,K线价格数值 self.ma13_cross_price = None # ma1 与 ma3 ,金叉/死叉时,K线价格数值 self.ma23_cross_price = None # ma2 与 ma3 ,金叉/死叉时,K线价格数值 @@ -670,6 +682,9 @@ class CtaLineBar(object): self.macd_top_divergence = False # mcad 面积 与price 顶背离 self.macd_buttom_divergence = False # mcad 面积 与price 底背离 + self.line_macd_chn_upper = [] + self.line_macd_chn_lower = [] + # K 线的CCI计算数据 self.line_cci = [] self.line_cci_ema = [] @@ -971,11 +986,15 @@ class CtaLineBar(object): self.__count_bias() self.__count_bd() self.__count_skdj() + # 输出行情K线 =》 csv文件 self.export_to_csv(bar) self.rt_executed = False # 是否 启动实时计算得函数 self.chanlun_calculated = False + # 输出缠论=》csv文件 + self.export_chan() + # 回调上层调用者,将合成的 x分钟bar,回调给策略 def on_bar_x(self, bar: BarData):函数 if self.cb_on_bar: self.cb_on_bar(bar=bar) @@ -1606,14 +1625,14 @@ class CtaLineBar(object): :param: direction Direction.Long, Direction.Short :return: """ - if self.export_bi_filename is None: + if self.export_tqa_bi_filename is None: return if len(self.tqn_bi_list) < 2: return # 直接插入倒数第二条记录,即已经走完的笔 - self.append_data(file_name=self.export_bi_filename, + self.append_data(file_name=self.export_tqa_bi_filename, dict_data=self.tqn_bi_list[-2], field_names=["start", "end", "direction", "height", "high", "low"] ) @@ -1739,13 +1758,13 @@ class CtaLineBar(object): # 最后一笔是多,且低点在中枢高点上方,中枢确认结束 if direction == 1 and cur_bi.get('low') > self.cur_tqn_zs.get('high'): - self.export_tqn_zs() + self.export_tqa_zs() self.cur_tqn_zs = {} return # 最后一笔是空,且高点在中枢下方,中枢确认结束 if direction == -1 and cur_bi.get('high') < self.cur_tqn_zs.get('low'): - self.export_tqn_zs() + self.export_tqa_zs() self.cur_tqn_zs = {} return @@ -1845,19 +1864,19 @@ class CtaLineBar(object): if self.cur_tqn_zs.get('end') < cur_bi.get('start'): self.cur_tqn_zs.update({"end": cur_bi.get("start")}) - def export_tqn_zs(self): + def export_tqa_zs(self): """ 输出唐其安中枢 =》 csv文件 :return: """ - if self.export_zs_filename is None: + if self.export_tqa_zs_filename is None: return if len(self.cur_tqn_zs) < 1: return # 将当前中枢的信息写入 - self.append_data(file_name=self.export_zs_filename, + self.append_data(file_name=self.export_tqa_zs_filename, dict_data=self.cur_tqn_zs, field_names=["start", "end", "direction", "height", "high", "low"] ) @@ -2168,6 +2187,11 @@ class CtaLineBar(object): if golden_cross: self.ma12_count = 1 self.ma12_cross = round((self.line_ma1[-1] + self.line_ma2[-1]) / 2, self.round_n) + self.ma12_cross_list.append({'cross': self.ma12_cross, + 'price': self.cur_price, + 'datetime': self.cur_datetime.strftime('%Y-%m-%d %H:%M:%S'), + 'type': 'gc'}) + self.check_cross_type(self.ma12_cross_list) self.ma12_cross_price = self.cur_price elif self.line_ma1[-1] < self.line_ma2[-1]: self.ma12_count -= 1 @@ -2176,6 +2200,11 @@ class CtaLineBar(object): if dead_cross: self.ma12_count = -1 self.ma12_cross = round((self.line_ma1[-1] + self.line_ma2[-1]) / 2, self.round_n) + self.ma12_cross_list.append({'cross': self.ma12_cross, + 'price': self.cur_price, + 'datetime': self.cur_datetime.strftime('%Y-%m-%d %H:%M:%S'), + 'type': 'dc'}) + self.check_cross_type(self.ma12_cross_list) self.ma12_cross_price = self.cur_price elif self.line_ma1[-1] > self.line_ma2[-1]: self.ma12_count += 1 @@ -2197,6 +2226,11 @@ class CtaLineBar(object): if golden_cross: self.ma23_count = 1 self.ma23_cross = round((self.line_ma2[-1] + self.line_ma3[-1]) / 2, self.round_n) + self.ma23_cross_list.append({'cross': self.ma23_cross, + 'price': self.cur_price, + 'datetime': self.cur_datetime.strftime('%Y-%m-%d %H:%M:%S'), + 'type': 'gc'}) + self.check_cross_type(self.ma23_cross_list) self.ma23_cross_price = self.cur_price elif self.line_ma2[-1] < self.line_ma3[-1]: self.ma23_count -= 1 @@ -2205,6 +2239,11 @@ class CtaLineBar(object): if dead_cross: self.ma23_count = -1 self.ma23_cross = round((self.line_ma2[-1] + self.line_ma3[-1]) / 2, self.round_n) + self.ma23_cross_list.append({'cross': self.ma23_cross, + 'price': self.cur_price, + 'datetime': self.cur_datetime.strftime('%Y-%m-%d %H:%M:%S'), + 'type': 'dc'}) + self.check_cross_type(self.ma23_cross_list) self.ma23_cross_price = self.cur_price elif self.line_ma2[-1] > self.line_ma3[-1]: self.ma23_count += 1 @@ -2226,6 +2265,11 @@ class CtaLineBar(object): if golden_cross: self.ma13_count = 1 self.ma13_cross = round((self.line_ma1[-1] + self.line_ma3[-1]) / 2, self.round_n) + self.ma13_cross_list.append({'cross': self.ma13_cross, + 'price': self.cur_price, + 'datetime': self.cur_datetime.strftime('%Y-%m-%d %H:%M:%S'), + 'type': 'gc'}) + self.check_cross_type(self.ma13_cross_list) self.ma13_cross_price = self.cur_price elif self.line_ma1[-1] < self.line_ma3[-1]: self.ma13_count -= 1 @@ -2234,10 +2278,64 @@ class CtaLineBar(object): if dead_cross: self.ma13_count = -1 self.ma13_cross = round((self.line_ma1[-1] + self.line_ma3[-1]) / 2, self.round_n) + self.ma13_cross_list.append({'cross': self.ma13_cross, + 'price': self.cur_price, + 'datetime': self.cur_datetime.strftime('%Y-%m-%d %H:%M:%S'), + 'type': 'dc'}) + self.check_cross_type(self.ma13_cross_list) self.ma13_cross_price = self.cur_price elif self.line_ma1[-1] > self.line_ma3[-1]: self.ma13_count += 1 + def check_cross_type(self, cross_list): + """依据缠论,检测其属于背驰得交叉,还是纠缠得交叉""" + if len(cross_list) <= 1 or not self.para_active_chanlun or len(self.duan_list) == 0: + return + + # 交叉得信息 + c_dict = cross_list[-1] + c_type = c_dict.get('type') + c_time = c_dict.get('datetime') + p_time = cross_list[-2].get('datetime') + + # 金叉 + if c_type == 'gc': + direction = Direction.SHORT + else: + direction = Direction.LONG + + # 交叉之前,最后一个线段 + duan = [d for d in self.duan_list[-2:] if d.end > p_time] + if len(duan) > 0: + cur_duan = duan[-1] + # # 判断,线段是否存在中枢背驰 + # zs_beichi = self.is_zs_beichi_inside_duan(direction=direction, cur_duan=cur_duan) + # if zs_beichi: + # c_dict.update({'zs_beichi': True}) + # + # # 判断是否存在段内分笔背驰 + # bi_beichi = self.is_bi_beichi_inside_duan(direction=direction, cur_duan=cur_duan) + # if bi_beichi: + # c_dict.update({'bi_beichi': True}) + # + # # 判断是否存在段内两个同向分笔得macd面积背驰 + # macd_beichi = self.is_fx_macd_divergence(direction=direction, cur_duan=cur_duan) + # if macd_beichi: + # c_dict.update({'macd_beichi': True}) + + # 判断是否存在走势背驰 + zoushi_beichi = self.is_zoushi_beichi(direction=direction, cur_duan=cur_duan) + if zoushi_beichi: + c_dict.update({'zoushi_beichi': True}) + + # 检查当前交叉,是否在最后一个中枢内 + zs = [z for z in self.bi_zs_list[-2:] if z.end > p_time] + if len(zs) > 0: + cur_zs = zs[-1] + c_cross = c_dict.get('cross') + if cur_zs.high > c_cross > cur_zs.low: + c_dict.update({"inside_zs": True}) + def rt_count_ma(self): """ 实时计算MA得值 @@ -3775,6 +3873,13 @@ class CtaLineBar(object): # 创建新的段 if (self.line_macd[-1] > 0 and self.cur_macd_count <= 0) or \ (self.line_macd[-1] < 0 and self.cur_macd_count >= 0): + + # 上一个segment的高低点,作为上、下轨道 + if len(self.macd_segment_list) > 1: + seg = self.macd_segment_list[-2] + self.line_macd_chn_upper.append(seg['max_price']) + self.line_macd_chn_lower.append(seg['min_price']) + segment = {} # 金叉/死叉,更新位置&价格 self.cur_macd_count, self.rt_macd_count = (1, 1) if self.line_macd[-1] > 0 else (-1, -1) @@ -3784,9 +3889,14 @@ class CtaLineBar(object): self.rt_macd_cross_price = self.cur_macd_cross_price # 更新段 segment.update({ + 'start': self.cur_datetime.strftime("%Y-%m-%d %H:%M:%S"), + 'end': self.cur_datetime.strftime("%Y-%m-%d %H:%M:%S"), + 'cross': self.cur_macd_cross, 'macd_count': self.cur_macd_count, 'max_price': self.high_array[-1], 'min_price': self.low_array[-1], + 'max_close': self.close_array[-1], + 'min_close': self.close_array[-1], 'max_dif': self.line_dif[-1], 'min_dif': self.line_dif[-1], 'macd_area': abs(self.line_macd[-1]), @@ -3815,9 +3925,12 @@ class CtaLineBar(object): self.cur_macd_count += 1 segment.update({ + 'end': self.cur_datetime.strftime("%Y-%m-%d %H:%M:%S"), 'macd_count': self.cur_macd_count, 'max_price': max(segment.get('max_price', self.high_array[-1]), self.high_array[-1]), 'min_price': min(segment.get('min_price', self.low_array[-1]), self.low_array[-1]), + 'max_close': max(segment.get('max_close', self.close_array[-1]), self.close_array[-1]), + 'min_close': min(segment.get('min_close', self.close_array[-1]), self.close_array[-1]), 'max_dif': max(segment.get('max_dif', self.line_dif[-1]), self.line_dif[-1]), 'min_dif': min(segment.get('min_dif', self.line_dif[-1]), self.line_dif[-1]), 'macd_area': segment.get('macd_area', 0) + abs(self.line_macd[-1]), @@ -3832,9 +3945,12 @@ class CtaLineBar(object): elif self.line_macd[-1] < 0 and self.cur_macd_count <= 0: self.cur_macd_count -= 1 segment.update({ + 'end': self.cur_datetime.strftime("%Y-%m-%d %H:%M:%S"), 'macd_count': self.cur_macd_count, 'max_price': max(segment.get('max_price', self.high_array[-1]), self.high_array[-1]), 'min_price': min(segment.get('min_price', self.low_array[-1]), self.low_array[-1]), + 'max_close': max(segment.get('max_close', self.close_array[-1]), self.close_array[-1]), + 'min_close': min(segment.get('min_close', self.close_array[-1]), self.close_array[-1]), 'max_dif': max(segment.get('max_dif', self.line_dif[-1]), self.line_dif[-1]), 'min_dif': min(segment.get('min_dif', self.line_dif[-1]), self.line_dif[-1]), 'macd_area': segment.get('macd_area', 0) + abs(self.line_macd[-1]), @@ -3846,9 +3962,18 @@ class CtaLineBar(object): self.rt_macd_cross = 0 self.rt_macd_cross_price = 0 - # 删除超过10个的macd段 - if len(self.macd_segment_list) > 10: + # 延续上一个segment的高低点,作为上、下轨道 + if len(self.line_macd_chn_upper) > 0: + self.line_macd_chn_upper.append(self.line_macd_chn_upper[-1]) + self.line_macd_chn_lower.append(self.line_macd_chn_lower[-1]) + + # 删除超过200个的macd段 + if len(self.macd_segment_list) > 200: self.macd_segment_list.pop(0) + if len(self.line_macd_chn_upper) > self.max_hold_bars: + del self.line_macd_chn_upper[0] + if len(self.line_macd_chn_lower) > self.max_hold_bars: + del self.line_macd_chn_lower[0] def rt_count_macd(self): """ @@ -3922,7 +4047,7 @@ class CtaLineBar(object): return self.line_macd[-1] return self._rt_macd - def is_dif_divergence(self, direction): + def is_dif_divergence(self, direction,s1_time=None, s2_time=None): """ 检查MACD DIF是否与价格有背离 :param: direction,多:检查是否有顶背离,空,检查是否有底背离 @@ -3930,6 +4055,14 @@ class CtaLineBar(object): seg_lens = len(self.macd_segment_list) if seg_lens <= 2: return False + + # if s1_time and s2_time: + # dif 1 = self.get_dif_by_dt(s1_time) + # dif_2 = self.get_last_bar_str(s2_time) + # + # if direction == Direction.LONG: + # if dif_2 + s1, s2 = None, None # s1,倒数的一个匹配段;s2,倒数第二个匹配段 for idx in range(seg_lens): seg = self.macd_segment_list[-idx - 1] @@ -3964,7 +4097,7 @@ class CtaLineBar(object): return False # 上升段,累计的bar数量,不能低于6 - if s1_macd_counts < 6 or s2_macd_counts < 6: + if s1_macd_counts < 4 or s2_macd_counts < 4: return False # 顶背离,只能在零轴上方才判断 @@ -3986,7 +4119,7 @@ class CtaLineBar(object): return False # 每个下跌段,累计的bar数量,不能低于6 - if abs(s1_macd_counts) < 6 or abs(s2_macd_counts) < 6: + if abs(s1_macd_counts) < 4 or abs(s2_macd_counts) < 4: return False # 底部背离,只能在零轴下方才判断 @@ -3999,38 +4132,56 @@ class CtaLineBar(object): return False - def is_macd_divergence(self, direction): + def is_macd_divergence(self, direction, s1_time=None, s2_time=None): """ 检查MACD 能量柱是否与价格有背离 :param: direction,多:检查是否有顶背离,空,检查是否有底背离 + :param: s1_time, 指定在这个时间得能量柱区域s1, 不填写时,缺省为倒数第一个匹配段 + :param: s2_time, 指定在这个时间得能量柱区域s2, 不填写时,缺省为倒数第一个匹配段 """ seg_lens = len(self.macd_segment_list) if seg_lens <= 2: return False s1, s2 = None, None # s1,倒数的一个匹配段;s2,倒数第二个匹配段 - for idx in range(seg_lens): - seg = self.macd_segment_list[-idx - 1] - if direction == Direction.LONG: - if seg.get('macd_count', 0) > 0: - if s1 is None: - s1 = seg - continue - elif s2 is None: - s2 = seg - break - else: - if seg.get('macd_count', 0) < 0: - if s1 is None: - s1 = seg - continue - elif s2 is None: - s2 = seg - break + if s1_time and s2_time: + s1 = [s for s in self.macd_segment_list if s['start'] < s1_time < s['end']] + s2 = [s for s in self.macd_segment_list if s['start'] < s2_time < s['end']] + if len(s1) != 1 or len(s2) != 1: + return False + s1 = s1[-1] + s2 = s2[-1] + + # 指定匹配段,必须与direction一致 + if direction in [Direction.LONG, 1] and (s1['macd_count'] < 0 or s2['macd_count']) < 0: + return False + if direction in [Direction.SHORT, -1] and (s1['macd_count'] > 0 or s2['macd_count']) > 0: + return False + + else: + # 没有指定能量柱子区域,从列表中选择 + for idx in range(seg_lens): + seg = self.macd_segment_list[-idx - 1] + if direction in [Direction.LONG, 1]: + if seg.get('macd_count', 0) > 0: + if s1 is None: + s1 = seg + continue + elif s2 is None: + s2 = seg + break + else: + if seg.get('macd_count', 0) < 0: + if s1 is None: + s1 = seg + continue + elif s2 is None: + s2 = seg + break if not all([s1, s2]): return False - if direction == Direction.LONG: + if direction in [Direction.LONG, 1]: s1_macd_counts = s1.get('macd_count', 1) s2_macd_counts = s2.get('macd_count', 1) s1_max_price = s1.get('max_price', None) @@ -4046,7 +4197,7 @@ class CtaLineBar(object): if s1_max_price >= s2_max_price * 0.99 and s1_area < s2_area: return True - if direction == Direction.SHORT: + if direction in [Direction.SHORT, -1]: s1_macd_counts = s1.get('macd_count', 1) s2_macd_counts = s2.get('macd_count', 1) s1_min_price = s1.get('min_price', None) @@ -5366,7 +5517,7 @@ class CtaLineBar(object): self._bi_list = self.chan_graph.bi_list self._bi_zs_list = self.chan_graph.bi_zhongshu_list self._duan_list = self.chan_graph.duan_list - # self._duan_zs_list = self.chan_graph.duan_zhongshu_list + self._duan_zs_list = self.chan_graph.duan_zhongshu_list self.chanlun_calculated = True @@ -5394,11 +5545,101 @@ class CtaLineBar(object): self.__count_chanlun() return self._duan_list - # @property - # def duan_zs_list(self): - # if not self.chanlun_calculated: - # self.__count_chanlun() - # return self._duan_zs_list + @property + def cur_duan(self): + """当前线段""" + return self.duan_list[-1] if len(self.duan_list) > 0 else None + + @property + def pre_duan(self): + """倒数第二个线段""" + return self.duan_list[-2] if len(self.duan_list) > 1 else None + + @property + def tre_duan(self): + """倒数第三个线段""" + return self.duan_list[-3] if len(self.duan_list) > 2 else None + + @property + def duan_zs_list(self): + if not self.chanlun_calculated: + self.__count_chanlun() + return self._duan_zs_list + + def duan_height_ma(self, duan_len=20): + """返回段得平均高度""" + if not self.chanlun_calculated: + self.__count_chanlun() + return round(sum([d.height for d in self.duan_list[-duan_len:]]) / duan_len, self.round_n) + + def bi_height_ma(self, bi_len=20): + """返回分笔得平均高度""" + if not self.chanlun_calculated: + self.__count_chanlun() + return round(sum([bi.height for bi in self.bi_list[-bi_len:]]) / bi_len, self.round_n) + + def export_chan(self): + """ + 输出缠论 =》 csv文件 + :return: + """ + if not self.para_active_chanlun: + return + + if self.export_bi_filename: + # csv 文件 "start", "end", "direction", "height", "high", "low" + # 获取最后记录的start 开始时间 + if self.pre_bi_start is None: + self.pre_bi_start = self.get_csv_last_dt(self.export_bi_filename, dt_index=0) + if isinstance(self.pre_bi_start, datetime): + self.pre_bi_start = self.pre_bi_start.strftime("%Y-%m-%d %H:%M:%S") + + # 获取所有未写入文件的笔 + bi_list = [bi for bi in self.bi_list[:-1] if (not self.pre_bi_start) or bi.start > self.pre_bi_start] + for bi in bi_list: + self.append_data( + file_name=self.export_bi_filename, + dict_data={"start":bi.start, "end":bi.end, "direction":int(bi.direction), "height":float(bi.high-bi.low), "high":float(bi.high), "low":float(bi.low)}, + field_names=["start", "end", "direction", "height", "high", "low"] + ) + self.pre_bi_start = bi.start + + if self.export_zs_filename : + # csv 文件 "start", "end", "direction", "height", "high", "low" + # 获取最后记录的start 开始时间 + if self.pre_zs_start is None: + self.pre_zs_start = self.get_csv_last_dt(self.export_zs_filename, dt_index=0) + if isinstance(self.pre_zs_start, datetime): + self.pre_zs_start = self.pre_zs_start.strftime("%Y-%m-%d %H:%M:%S") + + # 获取所有未写入文件的zs + zs_list = [zs for zs in self.bi_zs_list[:-1] if (not self.pre_zs_start) or zs.start > self.pre_zs_start] + for zs in zs_list: + self.append_data( + file_name=self.export_zs_filename, + dict_data={"start": zs.start, "end": zs.end, "direction": int(zs.direction), + "height": float(zs.high - zs.low), "high": float(zs.high), "low": float(zs.low)}, + field_names=["start", "end", "direction", "height", "high", "low"] + ) + self.pre_zs_start = zs.start + + if self.export_duan_filename: + # csv 文件 "start", "end", "direction", "height", "high", "low" + # 获取最后记录的start 开始时间 + if self.pre_duan_start is None: + self.pre_duan_start = self.get_csv_last_dt(self.export_duan_filename, dt_index=0) + if isinstance(self.pre_duan_start, datetime): + self.pre_duan_start = self.pre_duan_start.strftime("%Y-%m-%d %H:%M:%S") + + # 获取所有未写入文件的笔 + duan_list = [duan for duan in self.duan_list[:-1] if (not self.pre_duan_start) or duan.start > self.pre_duan_start] + for duan in duan_list: + self.append_data( + file_name=self.export_duan_filename, + dict_data={"start":duan.start, "end":duan.end, "direction":int(duan.direction), "height":float(duan.high-duan.low), "high":float(duan.high), "low":float(duan.low)}, + field_names=["start", "end", "direction", "height", "high", "low"] + ) + self.pre_duan_start = duan.start def is_duan(self, direction): """当前最新一线段,是否与输入方向一致""" @@ -5527,6 +5768,7 @@ class CtaLineBar(object): def is_2nd_opportunity(self, direction): """ 是二买、二卖机会 + 线段内必须至少有一个以上中枢 【二买】当前线段下行,最后2笔不在线段中,最后一笔与下行线段同向,该笔底部不破线段底部,底分型出现且确认 【二卖】当前线段上行,最后2笔不在线段中,最后一笔与上行线段同向,该笔顶部不破线段顶部,顶分型出现且确认 :param direction: 1、Direction.LONG, 当前线段的方向, 判断是否二卖机会; -1 Direction.SHORT, 判断是否二买 @@ -5537,12 +5779,22 @@ class CtaLineBar(object): direction = 1 if direction == Direction.LONG else -1 # 具备段 - if len(self.duan_list) < 1: + if len(self.duan_list) < 2: return False cur_duan = self.duan_list[-1] if cur_duan.direction != direction: return False + # 检查是否具有两个连续得笔中枢 + zs_list = [zs for zs in self.bi_zs_list[-5:] if zs.end > self.duan_list[-2].start] + if len(zs_list) <2: + return False + pre_zs,cur_zs = zs_list[-2:] + if direction == 1 and pre_zs.high > cur_zs.low: + return False + if direction == -1 and pre_zs.low < cur_zs.high: + return False + # 当前段到最新bar之间的笔列表(此时未出现中枢) extra_bi_list = [bi for bi in self.bi_list[-3:] if bi.end > cur_duan.end] if len(extra_bi_list) < 2: @@ -5566,7 +5818,7 @@ class CtaLineBar(object): return False - def is_contain_zs_inside_duan(self, direction, zs_num): + def is_contain_zs_inside_duan(self, direction, cur_duan=None, zs_num=1): """最近段,符合方向,并且至少包含zs_num个中枢""" # Direction => int @@ -5576,16 +5828,22 @@ class CtaLineBar(object): # 具备中枢 if len(self.bi_zs_list) < zs_num: return False - # 具备段 - if len(self.duan_list) < 1: - return False - cur_duan = self.duan_list[-1] + if cur_duan is None: + # 具备段 + if len(self.duan_list) < 1: + return False + cur_duan = self.duan_list[-1] + if cur_duan.direction != direction: return False # 段的开始时间,至少大于前zs_num个中枢的结束时间 - if cur_duan.start > self.bi_zs_list[-zs_num].end: + # if cur_duan.start > self.bi_zs_list[-zs_num].end: + # return False + zs_list = [zs for zs in self.bi_zs_list if zs.end > cur_duan.start and zs.start < cur_duan.end] + + if len(zs_list) < zs_num: return False return True @@ -5803,6 +6061,9 @@ class CtaLineBar(object): if bi_between_zs.height > bi_leave_cur_zs.height and bi_between_zs.atan > bi_leave_cur_zs.atan: return True + if self.is_macd_divergence(direction=direction, s1_time=bi_leave_cur_zs.end, s2_time=bi_between_zs.end): + return True + return False def write_log(self, content): @@ -5859,6 +6120,8 @@ class CtaLineBar(object): :param line_length: 行数据的长度 :return: None,文件不存在,或者时间格式不正确 """ + if not os.path.exists(file_name): + return None with open(file_name, 'r') as f: f_size = os.path.getsize(file_name) if f_size < line_length: