[update] 缠论相关

This commit is contained in:
msincenselee 2021-03-21 20:54:21 +08:00
parent a4c18fb320
commit ed3320d7c5

View File

@ -200,8 +200,16 @@ class CtaLineBar(object):
# 导出到CSV文件 的目录名 和 要导出的 字段
self.export_filename = None # 数据要导出的目标文件夹
self.export_fields = [] # 定义要导出的K线数据字段包含K线元素主图指标附图指标等
self.export_bi_filename = None # 通过唐其安通道输出得笔csv文件(不是缠论得笔)
self.export_zs_filename = None # 通过唐其安通道输出的中枢csv文件不是缠论的笔中枢
self.export_tqa_bi_filename = None # 通过唐其安通道输出得笔csv文件(不是缠论得笔)
self.export_tqa_zs_filename = None # 通过唐其安通道输出的中枢csv文件不是缠论的笔中枢
self.export_bi_filename = None # 通过缠论笔csv文件
self.export_zs_filename = None # 通过缠论的笔中枢csv文件
self.export_duan_filename = None # 通过缠论的线段csv文件
self.pre_bi_start = None # 前一个笔的时间
self.pre_zs_start = None # 前一个中枢的时间
self.pre_duan_start = None
# 创建本类型bar的内部变量以及添加所有指标输入参数到self.param_list列表
self.init_properties()
@ -514,6 +522,10 @@ class CtaLineBar(object):
self.ma13_cross = None # ma1 与 ma3 ,金叉/死叉的点数值
self.ma23_cross = None # ma2 与 ma3 ,金叉/死叉的点数值
self.ma12_cross_list = [] # ma1 与 ma2 金叉、死叉得点 列表
self.ma13_cross_list = [] # ma1 与 ma3 金叉、死叉得点 列表
self.ma23_cross_list = [] # ma2 与 ma3 金叉、死叉得点 列表
self.ma12_cross_price = None # ma1 与 ma2 ,金叉/死叉时K线价格数值
self.ma13_cross_price = None # ma1 与 ma3 ,金叉/死叉时K线价格数值
self.ma23_cross_price = None # ma2 与 ma3 ,金叉/死叉时K线价格数值
@ -670,6 +682,9 @@ class CtaLineBar(object):
self.macd_top_divergence = False # mcad 面积 与price 顶背离
self.macd_buttom_divergence = False # mcad 面积 与price 底背离
self.line_macd_chn_upper = []
self.line_macd_chn_lower = []
# K 线的CCI计算数据
self.line_cci = []
self.line_cci_ema = []
@ -971,11 +986,15 @@ class CtaLineBar(object):
self.__count_bias()
self.__count_bd()
self.__count_skdj()
# 输出行情K线 =》 csv文件
self.export_to_csv(bar)
self.rt_executed = False # 是否 启动实时计算得函数
self.chanlun_calculated = False
# 输出缠论=》csv文件
self.export_chan()
# 回调上层调用者,将合成的 x分钟bar回调给策略 def on_bar_x(self, bar: BarData):函数
if self.cb_on_bar:
self.cb_on_bar(bar=bar)
@ -1606,14 +1625,14 @@ class CtaLineBar(object):
:param: direction Direction.Long, Direction.Short
:return:
"""
if self.export_bi_filename is None:
if self.export_tqa_bi_filename is None:
return
if len(self.tqn_bi_list) < 2:
return
# 直接插入倒数第二条记录,即已经走完的笔
self.append_data(file_name=self.export_bi_filename,
self.append_data(file_name=self.export_tqa_bi_filename,
dict_data=self.tqn_bi_list[-2],
field_names=["start", "end", "direction", "height", "high", "low"]
)
@ -1739,13 +1758,13 @@ class CtaLineBar(object):
# 最后一笔是多,且低点在中枢高点上方,中枢确认结束
if direction == 1 and cur_bi.get('low') > self.cur_tqn_zs.get('high'):
self.export_tqn_zs()
self.export_tqa_zs()
self.cur_tqn_zs = {}
return
# 最后一笔是空,且高点在中枢下方,中枢确认结束
if direction == -1 and cur_bi.get('high') < self.cur_tqn_zs.get('low'):
self.export_tqn_zs()
self.export_tqa_zs()
self.cur_tqn_zs = {}
return
@ -1845,19 +1864,19 @@ class CtaLineBar(object):
if self.cur_tqn_zs.get('end') < cur_bi.get('start'):
self.cur_tqn_zs.update({"end": cur_bi.get("start")})
def export_tqn_zs(self):
def export_tqa_zs(self):
"""
输出唐其安中枢 = csv文件
:return:
"""
if self.export_zs_filename is None:
if self.export_tqa_zs_filename is None:
return
if len(self.cur_tqn_zs) < 1:
return
# 将当前中枢的信息写入
self.append_data(file_name=self.export_zs_filename,
self.append_data(file_name=self.export_tqa_zs_filename,
dict_data=self.cur_tqn_zs,
field_names=["start", "end", "direction", "height", "high", "low"]
)
@ -2168,6 +2187,11 @@ class CtaLineBar(object):
if golden_cross:
self.ma12_count = 1
self.ma12_cross = round((self.line_ma1[-1] + self.line_ma2[-1]) / 2, self.round_n)
self.ma12_cross_list.append({'cross': self.ma12_cross,
'price': self.cur_price,
'datetime': self.cur_datetime.strftime('%Y-%m-%d %H:%M:%S'),
'type': 'gc'})
self.check_cross_type(self.ma12_cross_list)
self.ma12_cross_price = self.cur_price
elif self.line_ma1[-1] < self.line_ma2[-1]:
self.ma12_count -= 1
@ -2176,6 +2200,11 @@ class CtaLineBar(object):
if dead_cross:
self.ma12_count = -1
self.ma12_cross = round((self.line_ma1[-1] + self.line_ma2[-1]) / 2, self.round_n)
self.ma12_cross_list.append({'cross': self.ma12_cross,
'price': self.cur_price,
'datetime': self.cur_datetime.strftime('%Y-%m-%d %H:%M:%S'),
'type': 'dc'})
self.check_cross_type(self.ma12_cross_list)
self.ma12_cross_price = self.cur_price
elif self.line_ma1[-1] > self.line_ma2[-1]:
self.ma12_count += 1
@ -2197,6 +2226,11 @@ class CtaLineBar(object):
if golden_cross:
self.ma23_count = 1
self.ma23_cross = round((self.line_ma2[-1] + self.line_ma3[-1]) / 2, self.round_n)
self.ma23_cross_list.append({'cross': self.ma23_cross,
'price': self.cur_price,
'datetime': self.cur_datetime.strftime('%Y-%m-%d %H:%M:%S'),
'type': 'gc'})
self.check_cross_type(self.ma23_cross_list)
self.ma23_cross_price = self.cur_price
elif self.line_ma2[-1] < self.line_ma3[-1]:
self.ma23_count -= 1
@ -2205,6 +2239,11 @@ class CtaLineBar(object):
if dead_cross:
self.ma23_count = -1
self.ma23_cross = round((self.line_ma2[-1] + self.line_ma3[-1]) / 2, self.round_n)
self.ma23_cross_list.append({'cross': self.ma23_cross,
'price': self.cur_price,
'datetime': self.cur_datetime.strftime('%Y-%m-%d %H:%M:%S'),
'type': 'dc'})
self.check_cross_type(self.ma23_cross_list)
self.ma23_cross_price = self.cur_price
elif self.line_ma2[-1] > self.line_ma3[-1]:
self.ma23_count += 1
@ -2226,6 +2265,11 @@ class CtaLineBar(object):
if golden_cross:
self.ma13_count = 1
self.ma13_cross = round((self.line_ma1[-1] + self.line_ma3[-1]) / 2, self.round_n)
self.ma13_cross_list.append({'cross': self.ma13_cross,
'price': self.cur_price,
'datetime': self.cur_datetime.strftime('%Y-%m-%d %H:%M:%S'),
'type': 'gc'})
self.check_cross_type(self.ma13_cross_list)
self.ma13_cross_price = self.cur_price
elif self.line_ma1[-1] < self.line_ma3[-1]:
self.ma13_count -= 1
@ -2234,10 +2278,64 @@ class CtaLineBar(object):
if dead_cross:
self.ma13_count = -1
self.ma13_cross = round((self.line_ma1[-1] + self.line_ma3[-1]) / 2, self.round_n)
self.ma13_cross_list.append({'cross': self.ma13_cross,
'price': self.cur_price,
'datetime': self.cur_datetime.strftime('%Y-%m-%d %H:%M:%S'),
'type': 'dc'})
self.check_cross_type(self.ma13_cross_list)
self.ma13_cross_price = self.cur_price
elif self.line_ma1[-1] > self.line_ma3[-1]:
self.ma13_count += 1
def check_cross_type(self, cross_list):
"""依据缠论,检测其属于背驰得交叉,还是纠缠得交叉"""
if len(cross_list) <= 1 or not self.para_active_chanlun or len(self.duan_list) == 0:
return
# 交叉得信息
c_dict = cross_list[-1]
c_type = c_dict.get('type')
c_time = c_dict.get('datetime')
p_time = cross_list[-2].get('datetime')
# 金叉
if c_type == 'gc':
direction = Direction.SHORT
else:
direction = Direction.LONG
# 交叉之前,最后一个线段
duan = [d for d in self.duan_list[-2:] if d.end > p_time]
if len(duan) > 0:
cur_duan = duan[-1]
# # 判断,线段是否存在中枢背驰
# zs_beichi = self.is_zs_beichi_inside_duan(direction=direction, cur_duan=cur_duan)
# if zs_beichi:
# c_dict.update({'zs_beichi': True})
#
# # 判断是否存在段内分笔背驰
# bi_beichi = self.is_bi_beichi_inside_duan(direction=direction, cur_duan=cur_duan)
# if bi_beichi:
# c_dict.update({'bi_beichi': True})
#
# # 判断是否存在段内两个同向分笔得macd面积背驰
# macd_beichi = self.is_fx_macd_divergence(direction=direction, cur_duan=cur_duan)
# if macd_beichi:
# c_dict.update({'macd_beichi': True})
# 判断是否存在走势背驰
zoushi_beichi = self.is_zoushi_beichi(direction=direction, cur_duan=cur_duan)
if zoushi_beichi:
c_dict.update({'zoushi_beichi': True})
# 检查当前交叉,是否在最后一个中枢内
zs = [z for z in self.bi_zs_list[-2:] if z.end > p_time]
if len(zs) > 0:
cur_zs = zs[-1]
c_cross = c_dict.get('cross')
if cur_zs.high > c_cross > cur_zs.low:
c_dict.update({"inside_zs": True})
def rt_count_ma(self):
"""
实时计算MA得值
@ -3775,6 +3873,13 @@ class CtaLineBar(object):
# 创建新的段
if (self.line_macd[-1] > 0 and self.cur_macd_count <= 0) or \
(self.line_macd[-1] < 0 and self.cur_macd_count >= 0):
# 上一个segment的高低点作为上、下轨道
if len(self.macd_segment_list) > 1:
seg = self.macd_segment_list[-2]
self.line_macd_chn_upper.append(seg['max_price'])
self.line_macd_chn_lower.append(seg['min_price'])
segment = {}
# 金叉/死叉,更新位置&价格
self.cur_macd_count, self.rt_macd_count = (1, 1) if self.line_macd[-1] > 0 else (-1, -1)
@ -3784,9 +3889,14 @@ class CtaLineBar(object):
self.rt_macd_cross_price = self.cur_macd_cross_price
# 更新段
segment.update({
'start': self.cur_datetime.strftime("%Y-%m-%d %H:%M:%S"),
'end': self.cur_datetime.strftime("%Y-%m-%d %H:%M:%S"),
'cross': self.cur_macd_cross,
'macd_count': self.cur_macd_count,
'max_price': self.high_array[-1],
'min_price': self.low_array[-1],
'max_close': self.close_array[-1],
'min_close': self.close_array[-1],
'max_dif': self.line_dif[-1],
'min_dif': self.line_dif[-1],
'macd_area': abs(self.line_macd[-1]),
@ -3815,9 +3925,12 @@ class CtaLineBar(object):
self.cur_macd_count += 1
segment.update({
'end': self.cur_datetime.strftime("%Y-%m-%d %H:%M:%S"),
'macd_count': self.cur_macd_count,
'max_price': max(segment.get('max_price', self.high_array[-1]), self.high_array[-1]),
'min_price': min(segment.get('min_price', self.low_array[-1]), self.low_array[-1]),
'max_close': max(segment.get('max_close', self.close_array[-1]), self.close_array[-1]),
'min_close': min(segment.get('min_close', self.close_array[-1]), self.close_array[-1]),
'max_dif': max(segment.get('max_dif', self.line_dif[-1]), self.line_dif[-1]),
'min_dif': min(segment.get('min_dif', self.line_dif[-1]), self.line_dif[-1]),
'macd_area': segment.get('macd_area', 0) + abs(self.line_macd[-1]),
@ -3832,9 +3945,12 @@ class CtaLineBar(object):
elif self.line_macd[-1] < 0 and self.cur_macd_count <= 0:
self.cur_macd_count -= 1
segment.update({
'end': self.cur_datetime.strftime("%Y-%m-%d %H:%M:%S"),
'macd_count': self.cur_macd_count,
'max_price': max(segment.get('max_price', self.high_array[-1]), self.high_array[-1]),
'min_price': min(segment.get('min_price', self.low_array[-1]), self.low_array[-1]),
'max_close': max(segment.get('max_close', self.close_array[-1]), self.close_array[-1]),
'min_close': min(segment.get('min_close', self.close_array[-1]), self.close_array[-1]),
'max_dif': max(segment.get('max_dif', self.line_dif[-1]), self.line_dif[-1]),
'min_dif': min(segment.get('min_dif', self.line_dif[-1]), self.line_dif[-1]),
'macd_area': segment.get('macd_area', 0) + abs(self.line_macd[-1]),
@ -3846,9 +3962,18 @@ class CtaLineBar(object):
self.rt_macd_cross = 0
self.rt_macd_cross_price = 0
# 删除超过10个的macd段
if len(self.macd_segment_list) > 10:
# 延续上一个segment的高低点作为上、下轨道
if len(self.line_macd_chn_upper) > 0:
self.line_macd_chn_upper.append(self.line_macd_chn_upper[-1])
self.line_macd_chn_lower.append(self.line_macd_chn_lower[-1])
# 删除超过200个的macd段
if len(self.macd_segment_list) > 200:
self.macd_segment_list.pop(0)
if len(self.line_macd_chn_upper) > self.max_hold_bars:
del self.line_macd_chn_upper[0]
if len(self.line_macd_chn_lower) > self.max_hold_bars:
del self.line_macd_chn_lower[0]
def rt_count_macd(self):
"""
@ -3922,7 +4047,7 @@ class CtaLineBar(object):
return self.line_macd[-1]
return self._rt_macd
def is_dif_divergence(self, direction):
def is_dif_divergence(self, direction,s1_time=None, s2_time=None):
"""
检查MACD DIF是否与价格有背离
:param: direction检查是否有顶背离检查是否有底背离
@ -3930,6 +4055,14 @@ class CtaLineBar(object):
seg_lens = len(self.macd_segment_list)
if seg_lens <= 2:
return False
# if s1_time and s2_time:
# dif 1 = self.get_dif_by_dt(s1_time)
# dif_2 = self.get_last_bar_str(s2_time)
#
# if direction == Direction.LONG:
# if dif_2
s1, s2 = None, None # s1,倒数的一个匹配段s2倒数第二个匹配段
for idx in range(seg_lens):
seg = self.macd_segment_list[-idx - 1]
@ -3964,7 +4097,7 @@ class CtaLineBar(object):
return False
# 上升段累计的bar数量不能低于6
if s1_macd_counts < 6 or s2_macd_counts < 6:
if s1_macd_counts < 4 or s2_macd_counts < 4:
return False
# 顶背离,只能在零轴上方才判断
@ -3986,7 +4119,7 @@ class CtaLineBar(object):
return False
# 每个下跌段累计的bar数量不能低于6
if abs(s1_macd_counts) < 6 or abs(s2_macd_counts) < 6:
if abs(s1_macd_counts) < 4 or abs(s2_macd_counts) < 4:
return False
# 底部背离,只能在零轴下方才判断
@ -3999,38 +4132,56 @@ class CtaLineBar(object):
return False
def is_macd_divergence(self, direction):
def is_macd_divergence(self, direction, s1_time=None, s2_time=None):
"""
检查MACD 能量柱是否与价格有背离
:param: direction检查是否有顶背离检查是否有底背离
:param: s1_time, 指定在这个时间得能量柱区域s1 不填写时缺省为倒数第一个匹配段
:param: s2_time, 指定在这个时间得能量柱区域s2 不填写时缺省为倒数第一个匹配段
"""
seg_lens = len(self.macd_segment_list)
if seg_lens <= 2:
return False
s1, s2 = None, None # s1,倒数的一个匹配段s2倒数第二个匹配段
for idx in range(seg_lens):
seg = self.macd_segment_list[-idx - 1]
if direction == Direction.LONG:
if seg.get('macd_count', 0) > 0:
if s1 is None:
s1 = seg
continue
elif s2 is None:
s2 = seg
break
else:
if seg.get('macd_count', 0) < 0:
if s1 is None:
s1 = seg
continue
elif s2 is None:
s2 = seg
break
if s1_time and s2_time:
s1 = [s for s in self.macd_segment_list if s['start'] < s1_time < s['end']]
s2 = [s for s in self.macd_segment_list if s['start'] < s2_time < s['end']]
if len(s1) != 1 or len(s2) != 1:
return False
s1 = s1[-1]
s2 = s2[-1]
# 指定匹配段必须与direction一致
if direction in [Direction.LONG, 1] and (s1['macd_count'] < 0 or s2['macd_count']) < 0:
return False
if direction in [Direction.SHORT, -1] and (s1['macd_count'] > 0 or s2['macd_count']) > 0:
return False
else:
# 没有指定能量柱子区域,从列表中选择
for idx in range(seg_lens):
seg = self.macd_segment_list[-idx - 1]
if direction in [Direction.LONG, 1]:
if seg.get('macd_count', 0) > 0:
if s1 is None:
s1 = seg
continue
elif s2 is None:
s2 = seg
break
else:
if seg.get('macd_count', 0) < 0:
if s1 is None:
s1 = seg
continue
elif s2 is None:
s2 = seg
break
if not all([s1, s2]):
return False
if direction == Direction.LONG:
if direction in [Direction.LONG, 1]:
s1_macd_counts = s1.get('macd_count', 1)
s2_macd_counts = s2.get('macd_count', 1)
s1_max_price = s1.get('max_price', None)
@ -4046,7 +4197,7 @@ class CtaLineBar(object):
if s1_max_price >= s2_max_price * 0.99 and s1_area < s2_area:
return True
if direction == Direction.SHORT:
if direction in [Direction.SHORT, -1]:
s1_macd_counts = s1.get('macd_count', 1)
s2_macd_counts = s2.get('macd_count', 1)
s1_min_price = s1.get('min_price', None)
@ -5366,7 +5517,7 @@ class CtaLineBar(object):
self._bi_list = self.chan_graph.bi_list
self._bi_zs_list = self.chan_graph.bi_zhongshu_list
self._duan_list = self.chan_graph.duan_list
# self._duan_zs_list = self.chan_graph.duan_zhongshu_list
self._duan_zs_list = self.chan_graph.duan_zhongshu_list
self.chanlun_calculated = True
@ -5394,11 +5545,101 @@ class CtaLineBar(object):
self.__count_chanlun()
return self._duan_list
# @property
# def duan_zs_list(self):
# if not self.chanlun_calculated:
# self.__count_chanlun()
# return self._duan_zs_list
@property
def cur_duan(self):
"""当前线段"""
return self.duan_list[-1] if len(self.duan_list) > 0 else None
@property
def pre_duan(self):
"""倒数第二个线段"""
return self.duan_list[-2] if len(self.duan_list) > 1 else None
@property
def tre_duan(self):
"""倒数第三个线段"""
return self.duan_list[-3] if len(self.duan_list) > 2 else None
@property
def duan_zs_list(self):
if not self.chanlun_calculated:
self.__count_chanlun()
return self._duan_zs_list
def duan_height_ma(self, duan_len=20):
"""返回段得平均高度"""
if not self.chanlun_calculated:
self.__count_chanlun()
return round(sum([d.height for d in self.duan_list[-duan_len:]]) / duan_len, self.round_n)
def bi_height_ma(self, bi_len=20):
"""返回分笔得平均高度"""
if not self.chanlun_calculated:
self.__count_chanlun()
return round(sum([bi.height for bi in self.bi_list[-bi_len:]]) / bi_len, self.round_n)
def export_chan(self):
"""
输出缠论 = csv文件
:return:
"""
if not self.para_active_chanlun:
return
if self.export_bi_filename:
# csv 文件 "start", "end", "direction", "height", "high", "low"
# 获取最后记录的start 开始时间
if self.pre_bi_start is None:
self.pre_bi_start = self.get_csv_last_dt(self.export_bi_filename, dt_index=0)
if isinstance(self.pre_bi_start, datetime):
self.pre_bi_start = self.pre_bi_start.strftime("%Y-%m-%d %H:%M:%S")
# 获取所有未写入文件的笔
bi_list = [bi for bi in self.bi_list[:-1] if (not self.pre_bi_start) or bi.start > self.pre_bi_start]
for bi in bi_list:
self.append_data(
file_name=self.export_bi_filename,
dict_data={"start":bi.start, "end":bi.end, "direction":int(bi.direction), "height":float(bi.high-bi.low), "high":float(bi.high), "low":float(bi.low)},
field_names=["start", "end", "direction", "height", "high", "low"]
)
self.pre_bi_start = bi.start
if self.export_zs_filename :
# csv 文件 "start", "end", "direction", "height", "high", "low"
# 获取最后记录的start 开始时间
if self.pre_zs_start is None:
self.pre_zs_start = self.get_csv_last_dt(self.export_zs_filename, dt_index=0)
if isinstance(self.pre_zs_start, datetime):
self.pre_zs_start = self.pre_zs_start.strftime("%Y-%m-%d %H:%M:%S")
# 获取所有未写入文件的zs
zs_list = [zs for zs in self.bi_zs_list[:-1] if (not self.pre_zs_start) or zs.start > self.pre_zs_start]
for zs in zs_list:
self.append_data(
file_name=self.export_zs_filename,
dict_data={"start": zs.start, "end": zs.end, "direction": int(zs.direction),
"height": float(zs.high - zs.low), "high": float(zs.high), "low": float(zs.low)},
field_names=["start", "end", "direction", "height", "high", "low"]
)
self.pre_zs_start = zs.start
if self.export_duan_filename:
# csv 文件 "start", "end", "direction", "height", "high", "low"
# 获取最后记录的start 开始时间
if self.pre_duan_start is None:
self.pre_duan_start = self.get_csv_last_dt(self.export_duan_filename, dt_index=0)
if isinstance(self.pre_duan_start, datetime):
self.pre_duan_start = self.pre_duan_start.strftime("%Y-%m-%d %H:%M:%S")
# 获取所有未写入文件的笔
duan_list = [duan for duan in self.duan_list[:-1] if (not self.pre_duan_start) or duan.start > self.pre_duan_start]
for duan in duan_list:
self.append_data(
file_name=self.export_duan_filename,
dict_data={"start":duan.start, "end":duan.end, "direction":int(duan.direction), "height":float(duan.high-duan.low), "high":float(duan.high), "low":float(duan.low)},
field_names=["start", "end", "direction", "height", "high", "low"]
)
self.pre_duan_start = duan.start
def is_duan(self, direction):
"""当前最新一线段,是否与输入方向一致"""
@ -5527,6 +5768,7 @@ class CtaLineBar(object):
def is_2nd_opportunity(self, direction):
"""
是二买二卖机会
线段内必须至少有一个以上中枢
二买当前线段下行最后2笔不在线段中最后一笔与下行线段同向该笔底部不破线段底部底分型出现且确认
二卖当前线段上行最后2笔不在线段中最后一笔与上行线段同向该笔顶部不破线段顶部顶分型出现且确认
:param direction: 1Direction.LONG, 当前线段的方向, 判断是否二卖机会 -1 Direction.SHORT 判断是否二买
@ -5537,12 +5779,22 @@ class CtaLineBar(object):
direction = 1 if direction == Direction.LONG else -1
# 具备段
if len(self.duan_list) < 1:
if len(self.duan_list) < 2:
return False
cur_duan = self.duan_list[-1]
if cur_duan.direction != direction:
return False
# 检查是否具有两个连续得笔中枢
zs_list = [zs for zs in self.bi_zs_list[-5:] if zs.end > self.duan_list[-2].start]
if len(zs_list) <2:
return False
pre_zs,cur_zs = zs_list[-2:]
if direction == 1 and pre_zs.high > cur_zs.low:
return False
if direction == -1 and pre_zs.low < cur_zs.high:
return False
# 当前段到最新bar之间的笔列表此时未出现中枢
extra_bi_list = [bi for bi in self.bi_list[-3:] if bi.end > cur_duan.end]
if len(extra_bi_list) < 2:
@ -5566,7 +5818,7 @@ class CtaLineBar(object):
return False
def is_contain_zs_inside_duan(self, direction, zs_num):
def is_contain_zs_inside_duan(self, direction, cur_duan=None, zs_num=1):
"""最近段符合方向并且至少包含zs_num个中枢"""
# Direction => int
@ -5576,16 +5828,22 @@ class CtaLineBar(object):
# 具备中枢
if len(self.bi_zs_list) < zs_num:
return False
# 具备段
if len(self.duan_list) < 1:
return False
cur_duan = self.duan_list[-1]
if cur_duan is None:
# 具备段
if len(self.duan_list) < 1:
return False
cur_duan = self.duan_list[-1]
if cur_duan.direction != direction:
return False
# 段的开始时间至少大于前zs_num个中枢的结束时间
if cur_duan.start > self.bi_zs_list[-zs_num].end:
# if cur_duan.start > self.bi_zs_list[-zs_num].end:
# return False
zs_list = [zs for zs in self.bi_zs_list if zs.end > cur_duan.start and zs.start < cur_duan.end]
if len(zs_list) < zs_num:
return False
return True
@ -5803,6 +6061,9 @@ class CtaLineBar(object):
if bi_between_zs.height > bi_leave_cur_zs.height and bi_between_zs.atan > bi_leave_cur_zs.atan:
return True
if self.is_macd_divergence(direction=direction, s1_time=bi_leave_cur_zs.end, s2_time=bi_between_zs.end):
return True
return False
def write_log(self, content):
@ -5859,6 +6120,8 @@ class CtaLineBar(object):
:param line_length: 行数据的长度
:return: None文件不存在或者时间格式不正确
"""
if not os.path.exists(file_name):
return None
with open(file_name, 'r') as f:
f_size = os.path.getsize(file_name)
if f_size < line_length: