[update] 缠论、其他

This commit is contained in:
msincenselee 2021-06-30 10:04:42 +08:00
parent 48205283b6
commit d79157dd8e
9 changed files with 820 additions and 102 deletions

View File

@ -1,6 +1,6 @@
six
PyQt5
pyqtgraph
pyqtgraph==0.10.0
dataclasses; python_version<="3.6"
qdarkstyle
requests

View File

@ -2,12 +2,11 @@
import requests
from .utils.misc import file2dict
from vnpy.rpc import RpcClient
def use(broker, host, port=1430, **kwargs):
return RemoteClient(broker, host, port)
TIMEOUT = 10
class RemoteClient:
def __init__(self, broker, host, port=1430, **kwargs):
self._s = requests.session()
@ -44,7 +43,8 @@ class RemoteClient:
params["broker"] = self._broker
response = self._s.post(self._api + "/prepare", json=params)
# prepare需要启动同花顺客户端需要的时间比较长所以超时给长一些时间
response = self._s.post(self._api + "/prepare", json=params, timeout=60)
if response.status_code >= 300:
raise Exception(response.json()["error"])
return response.json()
@ -76,7 +76,7 @@ class RemoteClient:
return self.common_get("exit")
def common_get(self, endpoint):
response = self._s.get(self._api + "/" + endpoint)
response = self._s.get(self._api + "/" + endpoint, timeout=TIMEOUT)
if response.status_code >= 300:
print(Exception(response.json()["error"]))
return response.json()
@ -85,7 +85,7 @@ class RemoteClient:
params = locals().copy()
params.pop("self")
response = self._s.post(self._api + "/buy", json=params)
response = self._s.post(self._api + "/buy", json=params, timeout=TIMEOUT)
if response.status_code >= 300:
raise Exception(response.json()["error"])
return response.json()
@ -94,7 +94,7 @@ class RemoteClient:
params = locals().copy()
params.pop("self")
response = self._s.post(self._api + "/sell", json=params)
response = self._s.post(self._api + "/sell", json=params, timeout=TIMEOUT)
if response.status_code >= 300:
raise Exception(response.json()["error"])
return response.json()
@ -103,7 +103,69 @@ class RemoteClient:
params = locals().copy()
params.pop("self")
response = self._s.post(self._api + "/cancel_entrust", json=params)
response = self._s.post(self._api + "/cancel_entrust", json=params, timeout=TIMEOUT)
if response.status_code >= 300:
raise Exception(response.json()["error"])
return response.json()
###########
# written by 黄健威
# 以下是新增加的ZMQ Client
# 整个接口对外保持和原来的一致
# 通过对原requests接口的“鸭子类型替换”来实现透明化
def use(broker, host, port=1430, use_zmq=True, **kwargs):
if use_zmq:
return ZMQRemoteClient(broker, host, port)
else:
return RemoteClient(broker, host, port)
class ZMQResponse(object):
# 这个类是模仿requests的返回结果
def __init__(self, status_code, data) -> None:
self.data = data
self.status_code = status_code
def json(self):
return self.data
class MyRpcClient(RpcClient):
# 这个类把vnpy原生的rpc组件中的超时输出去除
# 原版rpc组件中如果上一个请求后30秒内没有新的请求会输出一段提示
def on_disconnected(self):
pass
class ZMQSession(object):
# 这个类是模仿requests的Session
def __init__(self, host, port) -> None:
req_addr = "tcp://{}:{}".format(host, port)
sub_addr = "tcp://{}:{}".format(host, port+1)
self._rpc_client = MyRpcClient()
self._rpc_client.start(req_addr, sub_addr)
def post(self, url, json=None, timeout=10):
name = url.split("/")[-1]
data, status_code = self._rpc_client.call_func(name, json)
resp = ZMQResponse(status_code, data)
return resp
def get(self, url, json=None, timeout=10):
return self.post(url, json, timeout)
def __del__(self):
# 当进程开始销毁对象时显式调用stop来杀死后台的zmq线程避免死锁无法退出
self._rpc_client.stop()
class ZMQRemoteClient(RemoteClient):
# 对原RemoteClient的重载
def __init__(self, broker, host, port=1430, **kwargs):
self._broker = broker
# api这个项目已经不需要了
self._api = ""
# 替换Session
self._s = ZMQSession(host, port)
def __del__(self):
del self._s

View File

@ -15,6 +15,7 @@ from collections import defaultdict
from pathlib import Path
from typing import Any, Callable
from datetime import datetime, timedelta
from time import sleep
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from copy import copy
@ -1072,6 +1073,10 @@ class CtaEngine(BaseEngine):
self.write_log(f'{strategy_name} => 启动交易')
self.start_strategy(strategy_name)
# 等待3秒避免快速重新请求rest数据
self.write_log(f'等待3秒')
sleep(3)
except Exception as ex:
msg = f'{strategy_name} => 执行on_init异常:{str(ex)}'
self.write_error(msg)
@ -1822,6 +1827,7 @@ class CtaEngine(BaseEngine):
self.strategy_setting = load_json(self.setting_filename)
for strategy_name, strategy_config in self.strategy_setting.items():
self.write_log(f'开始加载{strategy_name}')
self.add_strategy(
class_name=strategy_config["class_name"],
strategy_name=strategy_name,
@ -1831,6 +1837,7 @@ class CtaEngine(BaseEngine):
auto_start=strategy_config.get('auto_start', False)
)
def update_strategy_setting(self, strategy_name: str, setting: dict, auto_init: bool = False,
auto_start: bool = False):
"""

View File

@ -335,6 +335,7 @@ class PortfolioTestingEngine(BackTestingEngine):
last_trading_day = bar.trading_day
# 第二个交易日,撤单
if not symbol.startswith('future_renko'):
self.cancel_orders()
# 更新持仓缓存
self.update_pos_buffer()

View File

@ -1386,6 +1386,7 @@ class CtaProFutureTemplate(CtaProTemplate):
if active_order['offset'] != Offset.OPEN:
grid.open_status = False
grid.close_status = True
grid.open_time = None
self.write_log(f'{grid.direction.value}单已平仓完毕,order_price:{order.price}'
+ f',volume:{order.volume}')
@ -1396,6 +1397,7 @@ class CtaProFutureTemplate(CtaProTemplate):
# 开仓完毕( buy, short)
else:
grid.open_status = True
grid.open_time = self.cur_datetime
self.write_log(f'{grid.direction.value}单已开仓完毕,order_price:{order.price}'
+ f',volume:{order.volume}')

View File

@ -5535,6 +5535,11 @@ class CtaLineBar(object):
# 当前bar已计算
self.chanlun_calculated = True
@property
def cur_fenxing(self):
"""当前分型"""
return self.fenxing_list[-1] if len(self.fenxing_list) > 0 else None
@property
def fenxing_list(self):
if not self.chanlun_calculated:
@ -5590,12 +5595,6 @@ class CtaLineBar(object):
self.__count_chanlun()
return self._duan_zs_list
@property
def duan_zs_list(self):
if not self.chanlun_calculated:
self.__count_chanlun()
return self._duan_zs_list
@property
def cur_duan_zs(self):
"""当前段中枢"""
@ -6106,6 +6105,7 @@ class CtaLineBar(object):
return False
def write_log(self, content):
"""记录CTA日志"""
self.strategy.write_log(u'[' + self.name + u']' + content)

View File

@ -5,7 +5,7 @@
from vnpy.trader.constant import ChanSignals, Direction
from vnpy.component.chanlun.pyChanlun import ChanBi, ChanDuan, ChanObject
from vnpy.component.cta_line_bar import CtaLineBar
from typing import List
from typing import List, Union
# 所有底背驰信号集合
DI_BEICHI_SIGNALS = [ChanSignals.LA0.value, ChanSignals.LA1.value, ChanSignals.LA2.value, ChanSignals.LA3.value,
@ -60,7 +60,7 @@ def check_duan_not_rt(kline: CtaLineBar, direction: Direction) -> bool:
def check_bi_not_rt(kline: CtaLineBar, direction: Direction) -> bool:
"""
检查某一个K线当前分笔是否非实时
检查某一个K线当前分笔是否非实时并符合判断方向
:param kline:
:param Direction:
:return:
@ -82,6 +82,68 @@ def check_bi_not_rt(kline: CtaLineBar, direction: Direction) -> bool:
return True
def check_fx_power(kline: CtaLineBar, direction: Direction) -> str:
"""
获取分型强弱
:param kline: 本级别K线
:param direction: 分型方向 1顶分型-1底分型
:return: ,普通,不匹配
"""
ret = '不匹配'
# 不存在分型,或者分型还没结束,不做判断
if not kline.cur_fenxing or kline.cur_fenxing.is_rt:
return ret
direction = 1 if direction == Direction.LONG else -1
# 分型方向不一致
if kline.cur_fenxing.direction != direction:
return ret
# 分型前x根bar
pre_bars = [bar for bar in kline.line_bar[-10:] if
bar.datetime.strftime('%Y-%m-%d %H:%M:%S') < kline.cur_fenxing.index]
if len(pre_bars) == 0:
return ret
pre_bar = pre_bars[-1]
# 分型后x根bar
extra_bars = \
[bar for bar in kline.line_bar[-10:] if bar.datetime.strftime('%Y-%m-%d %H:%M:%S') > kline.cur_fenxing.index]
# 分型后有三根bar
if len(extra_bars) < 3:
return ret
# 处理顶分型
if kline.cur_fenxing.direction == 1:
# 顶分型后第一根bar的低点没有超过前bar的低点
if extra_bars[0].low_price >= pre_bar.low_price:
return '普通'
# 找到正确形态第二、第三根bar都站在顶分型之下
if pre_bar.low_price >= extra_bars[1].high_price > extra_bars[2].high_price:
return ''
return '普通'
# 处理底分型
if kline.cur_fenxing.direction == -1:
# 底分型后第一根bar的高点没有超过前bar的高点
if extra_bars[0].high_price <= pre_bar.high_price:
return ''
# 找到正确形态第二、第三根bar都站在底分型之上
if pre_bar.high_price <= extra_bars[1].low_price < extra_bars[2].low_price:
return ''
return '普通'
return ret
def check_chan_xt(kline: CtaLineBar, bi_list: List[ChanObject]) -> str:
"""
获取缠论得形态
@ -100,8 +162,8 @@ def check_chan_xt(kline: CtaLineBar, bi_list: List[ChanObject]) -> str:
return check_chan_xt_nine_bi(kline, bi_list)
if len(bi_list) == 11:
return check_chan_xt_eleven_bi(kline, bi_list)
if len(bi_list) == 13:
return check_chan_xt_thirteen_bi(kline, bi_list)
if len(bi_list) >= 13:
return check_chan_xt_thirteen_bi(kline, bi_list[-13:])
return v
@ -201,9 +263,10 @@ def check_chan_xt_five_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
if (min_low == bi_3.low and bi_5.low < bi_1.low) or (min_low == bi_5.low):
v = ChanSignals.LA0.value
# 类趋势底背驰
if max_high == bi_1.high and min_low == bi_5.low and bi_4.high < bi_2.low and bi_5.height < max(bi_3.height,
bi_1.height):
# 类趋势底背驰( 笔5 的强度比笔1、笔3低
if max_high == bi_1.high and min_low == bi_5.low and bi_4.high < bi_2.low \
and bi_5.height < max(bi_3.height, bi_1.height) \
and bi_5.atan < max(bi_3.atan, bi_1.atan):
v = ChanSignals.LA0.value
# 上颈线突破
@ -231,8 +294,10 @@ def check_chan_xt_five_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
v = ChanSignals.SA0.value
# 类趋势顶背驰
if min_low == bi_1.low and max_high == bi_5.high and bi_5.height < max(bi_1.height,
bi_3.height) and bi_4.low > bi_2.high:
if min_low == bi_1.low and max_high == bi_5.high \
and bi_5.height < max(bi_1.height, bi_3.height) \
and bi_5.atan < max(bi_1.atan, bi_3.atan) \
and bi_4.low > bi_2.high:
v = ChanSignals.SA0.value
# 下颈线突破
@ -240,7 +305,7 @@ def check_chan_xt_five_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
or (max_high == bi_3.high and bi_5.low < bi_3.low < bi_5.high < max_high):
v = ChanSignals.SG0.value
# 五笔三卖要求bi_5.low是最低点
# 五笔三卖要求bi_5.low是最低点中枢可能是1~3
if min(bi_1.high, bi_3.high) > max(bi_1.low, bi_3.low) > bi_5.high and bi_5.low == min_low:
v = ChanSignals.SI0.value
@ -271,21 +336,26 @@ def check_chan_xt_seven_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
if bi_7.direction == -1:
if bi_1.high == max_high and bi_7.low == min_low:
# aAbcd式底背驰
if min(bi_2.high, bi_4.high) > max(bi_2.low, bi_4.low) > bi_6.high and bi_7.height < bi_5.height:
# aAbcd式底背驰, d.高度斜率 小于 b.高度斜率
if min(bi_2.high, bi_4.high) > max(bi_2.low, bi_4.low) > bi_6.high \
and bi_7.height < bi_5.height and bi_7.atan <= bi_5.atan:
v = ChanSignals.LA0.value
# abcAd式底背驰
if bi_2.low > min(bi_4.high, bi_6.high) > max(bi_4.low, bi_6.low) and bi_7.height < (bi_1.high - bi_3.low):
if bi_2.low > min(bi_4.high, bi_6.high) > max(bi_4.low, bi_6.low) \
and bi_7.height < (bi_1.high - bi_3.low) \
and bi_7.atan < (bi_1.atan + bi_3.atan) / 2:
v = ChanSignals.LA0.value
# aAb式底背驰
if min(bi_2.high, bi_4.high, bi_6.high) > max(bi_2.low, bi_4.low, bi_6.low) and bi_7.height < bi_1.height:
if min(bi_2.high, bi_4.high, bi_6.high) > max(bi_2.low, bi_4.low, bi_6.low) \
and bi_7.height < bi_1.height and bi_7.atan <= bi_1.atan:
v = ChanSignals.LA0.value
# 类趋势底背驰
if bi_2.low > bi_4.high and bi_4.low > bi_6.high and bi_7.height < max(bi_5.height, bi_3.height,
bi_1.height):
if bi_2.low > bi_4.high and bi_4.low > bi_6.high \
and bi_7.height < max(bi_5.height, bi_3.height, bi_1.height)\
and bi_7.atan < max(bi_5.atan, bi_3.atan, bi_1.atan):
v = ChanSignals.LA0.value
# 向上中枢完成
@ -304,20 +374,25 @@ def check_chan_xt_seven_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
# 顶背驰
if bi_1.low == min_low and bi_7.high == max_high:
# aAbcd式顶背驰
if bi_6.low > min(bi_2.high, bi_4.high) > max(bi_2.low, bi_4.low) and bi_7.height < bi_5.height:
if bi_6.low > min(bi_2.high, bi_4.high) > max(bi_2.low, bi_4.low) \
and bi_7.height < bi_5.height and bi_7.atan <= bi_5.atan:
v = ChanSignals.SA0.value
# abcAd式顶背驰
if min(bi_4.high, bi_6.high) > max(bi_4.low, bi_6.low) > bi_2.high and bi_7.height < (bi_3.high - bi_1.low):
if min(bi_4.high, bi_6.high) > max(bi_4.low, bi_6.low) > bi_2.high \
and bi_7.height < (bi_3.high - bi_1.low) \
and bi_7.atan < (bi_1.atan + bi_3.atan) / 2:
v = ChanSignals.SA0.value
# aAb式顶背驰
if min(bi_2.high, bi_4.high, bi_6.high) > max(bi_2.low, bi_4.low, bi_6.low) and bi_7.height < bi_1.height:
if min(bi_2.high, bi_4.high, bi_6.high) > max(bi_2.low, bi_4.low, bi_6.low) \
and bi_7.height < bi_1.height and bi_7.atan <= bi_1.atan:
v = ChanSignals.SA0.value
# 类趋势顶背驰
if bi_2.high < bi_4.low and bi_4.high < bi_6.low and bi_7.height < max(bi_5.height, bi_3.height,
bi_1.height):
if bi_2.high < bi_4.low and bi_4.high < bi_6.low \
and bi_7.height < max(bi_5.height, bi_3.height, bi_1.height)\
and bi_7.atan < max(bi_5.atan, bi_3.atan, bi_1.atan):
v = ChanSignals.SA0.value
# 向下中枢完成
@ -327,7 +402,7 @@ def check_chan_xt_seven_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
if min(bi_1.high, bi_3.high) > min(bi_5.low, bi_7.low):
v = ChanSignals.SH0.value
# 七笔三卖567回调
# 七笔三卖567回调中枢可能在1~3
if bi_5.low == min_low and bi_5.low < bi_7.low \
and min(bi_1.high, bi_3.high) > max(bi_1.low, bi_3.low) > bi_7.high > bi_5.high:
v = ChanSignals.SI0.value
@ -357,17 +432,18 @@ def check_chan_xt_nine_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
if min(bi_2.high, bi_4.high) > max(bi_2.low, bi_4.low) > bi_6.high \
and min(bi_6.high, bi_8.high) > max(bi_6.low, bi_8.low) \
and min(bi_2.low, bi_4.low) > max(bi_6.high, bi_8.high) \
and bi_9.height < bi_5.height:
and bi_9.height < bi_5.height and bi_7.atan <= bi_5.atan:
v = ChanSignals.LA0.value
# aAb式底背驰
if min(bi_2.high, bi_4.high, bi_6.high, bi_8.high) > max(bi_2.low, bi_4.low, bi_6.low, bi_8.low) \
and bi_9.height < bi_1.height and bi_3.low >= bi_1.low and bi_7.high <= bi_9.high:
and bi_9.height < bi_1.height and bi_9.atan <= bi_1.atan \
and bi_3.low >= bi_1.low and bi_7.high <= bi_9.high:
v = ChanSignals.LA0.value
# aAbcd式底背驰
if min(bi_2.high, bi_4.high, bi_6.high) > max(bi_2.low, bi_4.low, bi_6.low) > bi_8.high \
and bi_9.height < bi_7.height:
and bi_9.height < bi_7.height and bi_9.atan <= bi_7.atan:
v = ChanSignals.LA0.value
# ABC式底背驰
@ -376,7 +452,7 @@ def check_chan_xt_nine_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
and (bi_1.high - bi_3.low) > (bi_7.high - bi_9.low):
v = ChanSignals.LA0.value
# 九笔三买
# 九笔三买(789回调中枢可能在3~7内
if min_low == bi_1.low and max_high == bi_9.high \
and bi_9.low > min([x.high for x in [bi_3, bi_5, bi_7]]) > max([x.low for x in [bi_3, bi_5, bi_7]]):
v = ChanSignals.LI0.value
@ -387,17 +463,18 @@ def check_chan_xt_nine_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
if bi_6.low > min(bi_2.high, bi_4.high) > max(bi_2.low, bi_4.low) \
and min(bi_6.high, bi_8.high) > max(bi_6.low, bi_8.low) \
and max(bi_2.high, bi_4.high) < min(bi_6.low, bi_8.low) \
and bi_9.height < bi_5.height:
and bi_9.height < bi_5.height and bi_9.atan <= bi_5.atan:
v = ChanSignals.SA0.value
# aAb式顶背驰
if min(bi_2.high, bi_4.high, bi_6.high, bi_8.high) > max(bi_2.low, bi_4.low, bi_6.low, bi_8.low) \
and bi_9.height < bi_1.height and bi_3.high <= bi_1.high and bi_7.low >= bi_9.low:
and bi_9.height < bi_1.height and bi_9.atan <= bi_1.atan \
and bi_3.high <= bi_1.high and bi_7.low >= bi_9.low:
v = ChanSignals.SA0.value
# aAbcd式顶背驰
if bi_8.low > min(bi_2.high, bi_4.high, bi_6.high) > max(bi_2.low, bi_4.low, bi_6.low) \
and bi_9.height < bi_7.height:
and bi_9.height < bi_7.height and bi_9.atan <= bi_7.atan:
v = ChanSignals.SA0.value
# ABC式顶背驰
@ -435,14 +512,14 @@ def check_chan_xt_eleven_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
if min(bi_2.high, bi_4.high, bi_6.high) > max(bi_2.low, bi_4.low, bi_6.low) > bi_8.high \
and min(bi_8.high, bi_10.high) > max(bi_8.low, bi_10.low) \
and min(bi_2.low, bi_4.low, bi_6.low) > max(bi_8.high, bi_10.high) \
and bi_11.height < bi_7.height:
and bi_11.height < bi_7.height and bi_11.atan <= bi_7.atan:
v = ChanSignals.LA0.value
# aAbBc式底背驰bi_6-bi_10构成B
if min(bi_2.high, bi_4.high) > max(bi_2.low, bi_4.low) > bi_6.high \
and min(bi_6.high, bi_8.high, bi_10.high) > max(bi_6.low, bi_8.low, bi_10.low) \
and min(bi_2.low, bi_4.low) > max(bi_6.high, bi_8.high, bi_10.high) \
and bi_11.height < bi_5.height:
and bi_11.height < bi_5.height and bi_11.atan <= bi_5.atan:
v = ChanSignals.LA0.value
# ABC式底背驰A5B3C3
@ -460,7 +537,7 @@ def check_chan_xt_eleven_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
and bi_6.high > bi_4.low and bi_1.high - bi_3.low > bi_7.high - bi_11.low:
v = ChanSignals.LA0.value
# C内部背驰
if bi_11.height < max(bi_9.height, bi_7.height):
if bi_11.height < max(bi_9.height, bi_7.height) and bi_11.atan <= max(bi_9.atan, bi_7.atan):
v = ChanSignals.LB0.value
# ABC式底背驰A3B5C3
@ -468,7 +545,7 @@ def check_chan_xt_eleven_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
and bi_9.high > bi_11.high and bi_1.high - bi_3.low > bi_9.high - bi_11.low:
v = ChanSignals.LA0.value
# C内部背驰
if bi_11.height < max(bi_9.height, bi_7.height):
if bi_11.height < max(bi_9.height, bi_7.height) and bi_11.atan <= max(bi_9.atan, bi_7.atan):
v = ChanSignals.LB0.value
elif direction == 1:
@ -477,14 +554,14 @@ def check_chan_xt_eleven_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
if bi_8.low > min(bi_2.high, bi_4.high, bi_6.high) >= max(bi_2.low, bi_4.low, bi_6.low) \
and min(bi_8.high, bi_10.high) >= max(bi_8.low, bi_10.low) \
and max(bi_2.high, bi_4.high, bi_6.high) < min(bi_8.low, bi_10.low) \
and bi_11.height < bi_7.height:
and bi_11.height < bi_7.height and bi_11.atan <= bi_7.atan:
v = ChanSignals.SA0.value
# aAbBC式顶背驰bi_6-bi_10构成B
if bi_6.low > min(bi_2.high, bi_4.high) >= max(bi_2.low, bi_4.low) \
and min(bi_6.high, bi_8.high, bi_10.high) >= max(bi_6.low, bi_8.low, bi_10.low) \
and max(bi_2.high, bi_4.high) < min(bi_6.low, bi_8.low, bi_10.low) \
and bi_11.height < bi_7.height:
and bi_11.height < bi_7.height and bi_11.atan <= bi_7.atan:
v = ChanSignals.SA0.value
# ABC式顶背驰A5B3C3
@ -492,7 +569,7 @@ def check_chan_xt_eleven_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
and bi_8.low < bi_6.high and bi_11.high - bi_9.low < bi_5.high - bi_1.low:
v = ChanSignals.SA0.value
# C内部背驰
if bi_11.height < bi_9.height:
if bi_11.height < bi_9.height and bi_11.atan <= bi_9.atan:
v = ChanSignals.SB0.value
# ABC式顶背驰A3B3C5
@ -500,7 +577,7 @@ def check_chan_xt_eleven_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
and bi_6.low < bi_4.high and bi_11.high - bi_7.low < bi_3.high - bi_1.low:
v = ChanSignals.SA0.value
# C内部背驰
if bi_11.height < max(bi_9.height, bi_7.height):
if bi_11.height < max(bi_9.height, bi_7.height) and bi_11.atan <= max(bi_9.atan, bi_7.atan):
v = ChanSignals.SB0.value
# ABC式顶背驰A3B5C3
@ -508,7 +585,7 @@ def check_chan_xt_eleven_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
and bi_9.low < bi_11.low and bi_3.high - bi_1.low > bi_11.high - bi_9.low:
v = ChanSignals.SA0.value
# C内部背驰
if bi_11.height < max(bi_9.height, bi_7.height):
if bi_11.height < max(bi_9.height, bi_7.height) and bi_11.atan <= max(bi_9.atan, bi_7.atan):
v = ChanSignals.SB0.value
return v
@ -536,7 +613,7 @@ def check_chan_xt_thirteen_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
if min(bi_2.high, bi_4.high, bi_6.high) > max(bi_2.low, bi_4.low, bi_6.low) > bi_8.high \
and min(bi_8.high, bi_10.high, bi_12.high) > max(bi_8.low, bi_10.low, bi_12.low) \
and min(bi_2.low, bi_4.low, bi_6.low) > max(bi_8.high, bi_10.high, bi_12.high) \
and bi_13.height < bi_7.height:
and bi_13.height < bi_7.height and bi_13.atan <= bi_7.atan:
v = ChanSignals.LA0.value
# ABC式底背驰A5B3C5
@ -544,7 +621,7 @@ def check_chan_xt_thirteen_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
and bi_8.high > bi_6.low and bi_1.high - bi_5.low > bi_9.high - bi_13.low:
v = ChanSignals.LA0.value
if bi_13.height < max(bi_11.height, bi_9.height):
if bi_13.height < max(bi_11.height, bi_9.height) and bi_13.atan <= max(bi_11.atan, bi_9.atan):
v = ChanSignals.LB0.value
# ABC式底背驰A3B5C5
@ -553,7 +630,7 @@ def check_chan_xt_thirteen_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
and bi_1.high - bi_3.low > bi_9.high - bi_13.low:
v = ChanSignals.LA0.value
if bi_13.height < max(bi_11.height, bi_9.height):
if bi_13.height < max(bi_11.height, bi_9.height) and bi_13.atan <= max(bi_11.atan, bi_9.atan):
v = ChanSignals.LB0.value
# ABC式底背驰A5B5C3
@ -562,7 +639,7 @@ def check_chan_xt_thirteen_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
and bi_1.high - bi_5.low > bi_11.high - bi_13.low:
v = ChanSignals.LA0.value
if bi_13.height < bi_11.height:
if bi_13.height < bi_11.height and bi_13.atan <= bi_11.atan:
v = ChanSignals.LB0.value
# 上涨线段时,判断背驰类型
@ -572,7 +649,7 @@ def check_chan_xt_thirteen_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
if bi_8.low > min(bi_2.high, bi_4.high, bi_6.high) >= max(bi_2.low, bi_4.low, bi_6.low) \
and min(bi_8.high, bi_10.high, bi_12.high) >= max(bi_8.low, bi_10.low, bi_12.low) \
and max(bi_2.high, bi_4.high, bi_6.high) < min(bi_8.low, bi_10.low, bi_12.low) \
and bi_13.height < bi_7.height:
and bi_13.height < bi_7.height and bi_13.atan <= bi_7.atan:
v = ChanSignals.SA0.value
# ABC式顶背驰A5B3C5
@ -580,7 +657,7 @@ def check_chan_xt_thirteen_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
and bi_8.low < bi_6.high and bi_5.high - bi_1.low > bi_13.high - bi_9.low:
v = ChanSignals.SA0.value
# C内部顶背驰形成双重顶背驰
if bi_13.height < max(bi_11.height, bi_9.height):
if bi_13.height < max(bi_11.height, bi_9.height) and bi_13.atan <= max(bi_11.atan, bi_9.atan):
v = ChanSignals.SB0.value
# ABC式顶背驰A3B5C5
@ -589,7 +666,7 @@ def check_chan_xt_thirteen_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
and bi_3.high - bi_1.low > bi_13.high - bi_9.low:
v = ChanSignals.SA0.value
# C内部顶背驰形成双重顶背驰
if bi_13.height < max(bi_11.height, bi_9.height):
if bi_13.height < max(bi_11.height, bi_9.height) and bi_13.atan <= max(bi_11.atan, bi_9.atan):
v = ChanSignals.SB0.value
# ABC式顶背驰A5B5C3
@ -598,6 +675,561 @@ def check_chan_xt_thirteen_bi(kline: CtaLineBar, bi_list: List[ChanObject]):
and bi_5.high - bi_1.low > bi_13.high - bi_11.low:
v = ChanSignals.SA0.value
# C内部顶背驰形成双重顶背驰
if bi_13.height < bi_11.height:
if bi_13.height < bi_11.height and bi_13.atan <= bi_11.atan:
v = ChanSignals.SB0.value
return v
def check_pzbc_1st(big_kline: CtaLineBar, small_kline: Union[CtaLineBar, None], signal_direction: Direction):
"""
判断中枢盘整背驰1买/1卖信号
big_kline当前线段为调整段与信号方向相反线段具有盘整一个中枢
进入中枢与离开中枢的一笔力度对比高度斜率
:param big_kline: 本级别K线
:param small_kline: 次级别K线可选可以是None
:param signal_direction: 信号方向
:return:
"""
direction = 1 if signal_direction == Direction.LONG else -1
# 排除
# 没有前线段、没有笔中枢
# 当前线段方向与判断方向一致、
# 前线段比当前线段高度小
if not big_kline.pre_duan \
or not big_kline.cur_bi_zs \
or big_kline.cur_duan.direction == direction \
or big_kline.pre_duan.height < big_kline.cur_duan.height:
return False
# 如果有次级别K线时也要判断方向
if small_kline and (not small_kline.pre_duan or small_kline.cur_duan.direction == direction):
return False
# 当前线段必须有5笔
if len(big_kline.cur_duan.bi_list) < 5:
return False
# 线段内,只允许有一个中枢
if len([zs for zs in big_kline.bi_zs_list[-3:] if zs.start > big_kline.cur_duan.start]) > 1:
return False
# 当前笔中枢必须在当前线段之内
if big_kline.cur_bi_zs.start < big_kline.cur_duan.start:
return False
# 当前线段的高低点,与最高、最低分笔一致(不会出现区间套)
if signal_direction == Direction.LONG:
# 当前最后一笔,就是线段的最后一笔
if not duan_bi_is_end(big_kline.cur_duan, Direction.SHORT):
return False
# 当前的线段,已经具备底分型
if not check_duan_not_rt(big_kline, Direction.SHORT):
return False
# 当前的笔,走完,具备底分型
if not check_bi_not_rt(big_kline, Direction.SHORT):
return False
else:
if not duan_bi_is_end(big_kline.cur_duan, Direction.LONG):
return False
if not check_duan_not_rt(big_kline, Direction.LONG):
return False
# 笔走完
if not check_bi_not_rt(big_kline, Direction.LONG):
return False
# 中枢的进入笔、离开笔
# 中枢的首笔与线段不同向,则选择中枢之前的一笔和最后的一笔
if big_kline.cur_bi_zs.bi_list[0].direction != big_kline.cur_duan.direction:
entry_bi_list = [bi for bi in big_kline.cur_duan.bi_list if bi.end <= big_kline.cur_bi_zs.start]
exit_bi_list = [bi for bi in big_kline.cur_duan.bi_list if bi.end > big_kline.cur_bi_zs.end]
if not (len(entry_bi_list) >= 1 and len(exit_bi_list) == 1):
return False
entry_bi = entry_bi_list[-1]
exit_bi = exit_bi_list[0]
# 中枢首笔跟线段同向
else:
entry_bi = big_kline.cur_bi_zs.bi_list[0]
exit_bi = big_kline.cur_duan.bi_list[-1]
# 进入笔的高度,要高于离开笔,或者,进入笔的斜率,要大于离开笔
if entry_bi.height > exit_bi.height or entry_bi.atan > exit_bi.atan:
# 分析次级别K线判断其是否也发生线段背驰
if small_kline:
if len(small_kline.cur_duan.bi_list) > 1:
if (small_kline.cur_duan.bi_list[0].height > small_kline.cur_duan.bi_list[-1].height \
and small_kline.cur_duan.bi_list[0].atan > small_kline.cur_duan.bi_list[-1].atan) \
or (small_kline.cur_duan.bi_list[-3].height > small_kline.cur_duan.bi_list[-1].height \
and small_kline.cur_duan.bi_list[-3].atan > small_kline.cur_duan.bi_list[-1].atan):
return True
else:
return True
# 判断是否macd背驰
if big_kline.is_macd_divergence(big_kline.cur_duan.direction, exit_bi.end, entry_bi.end):
return True
return False
def check_qsbc_1st(big_kline: CtaLineBar, small_kline: Union[CtaLineBar, None], signal_direction: Direction):
"""
判断趋势背驰1买/1卖信号
big_kline当前线段为趋势与信号方向相反线段具有2个中枢
进入最后中枢与离开中枢的一笔力度对比高度斜率
:param big_kline: 本级别K线
:param small_kline: 次级别K线可选可以是None
:param signal_direction: 信号方向
:return:
"""
direction = 1 if signal_direction == Direction.LONG else -1
# 排除
# 没有前线段、没有笔中枢
# 当前线段方向与判断方向一致
if not big_kline.pre_duan \
or not big_kline.cur_bi_zs \
or big_kline.cur_duan.direction == direction:
return False
# 如果有次级别K线时也要判断方向
if small_kline and (not small_kline.pre_duan or small_kline.cur_duan.direction == direction):
return False
# 线段内至少有2个或以上中枢
if len([zs for zs in big_kline.bi_zs_list[-4:] if zs.start > big_kline.cur_duan.start]) < 2:
return False
# 当前线段的高低点,与最高、最低分笔一致(不会出现区间套)
if signal_direction == Direction.LONG:
# 笔走完
if not check_bi_not_rt(big_kline, Direction.SHORT):
return False
if not check_duan_not_rt(big_kline, Direction.SHORT):
return False
# 最后一笔
if not duan_bi_is_end(big_kline.cur_duan, Direction.SHORT):
return False
else:
# 笔走完
if not check_bi_not_rt(big_kline, Direction.LONG):
return False
if not check_duan_not_rt(big_kline, Direction.LONG):
return False
# 最后一笔
if not duan_bi_is_end(big_kline.cur_duan, Direction.LONG):
return False
# 中枢的进入笔、离开笔
entry_bi_list = [bi for bi in big_kline.cur_duan.bi_list if bi.end <= big_kline.cur_bi_zs.start]
exit_bi_list = [bi for bi in big_kline.cur_duan.bi_list if bi.end > big_kline.cur_bi_zs.end]
if not (len(entry_bi_list) >= 1 and len(exit_bi_list) == 1):
return False
# 离开中枢的一笔其middle必须也不在中枢内
if signal_direction == Direction.LONG and exit_bi_list[0].middle > big_kline.cur_bi_zs.low:
return False
if signal_direction == Direction.SHORT and exit_bi_list[0].middle < big_kline.cur_bi_zs.high:
return False
# 进入中枢一笔,与离开中枢笔,方向必须相同
if entry_bi_list[-1].direction != exit_bi_list[-1].direction:
return False
# 进入笔的高度,要高于离开笔,或者,进入笔的斜率,要大于离开笔
if entry_bi_list[-1].height > exit_bi_list[0].height and entry_bi_list[-1].atan > exit_bi_list[0].atan:
# 分析次级别K线判断其是否也发生线段背驰
if small_kline:
if len(small_kline.cur_duan.bi_list) > 1:
if (small_kline.cur_duan.bi_list[0].height > small_kline.cur_duan.bi_list[-1].height \
and small_kline.cur_duan.bi_list[0].atan > small_kline.cur_duan.bi_list[-1].atan) \
or (small_kline.cur_duan.bi_list[-3].height > small_kline.cur_duan.bi_list[-1].height \
and small_kline.cur_duan.bi_list[-3].atan > small_kline.cur_duan.bi_list[-1].atan):
return True
else:
return True
return False
def check_pz3bc_1st(big_kline: CtaLineBar, small_kline: Union[CtaLineBar, None], signal_direction: Direction):
"""
判断三卖后盘整背驰一买/三买后盘整背驰1卖信号
big_kline当前线段与信号方向相反线段具有盘整一个中枢离开中枢的一笔力度与三买/卖信号后的一笔对比高度斜率
:param big_kline: 本级别K线
:param small_kline: 次级别K线可选可以是None
:param signal_direction: 信号方向
:return:
"""
direction = 1 if signal_direction == Direction.LONG else -1
# 排除
# 没有前线段、没有笔中枢
# 当前线段方向与判断方向一致、
if not big_kline.pre_duan \
or not big_kline.cur_bi_zs \
or big_kline.cur_duan.direction == direction:
return False
# 如果有次级别K线时也要判断方向
if small_kline and (not small_kline.pre_duan or small_kline.cur_duan.direction == direction):
return False
# 当前线段必须有5笔
if len(big_kline.cur_duan.bi_list) < 5:
return False
# 当前笔中枢必须在当前线段之内
if big_kline.cur_bi_zs.start < big_kline.cur_duan.start:
return False
# 当前线段的高低点,与最高、最低分笔一致(不会出现区间套)
if signal_direction == Direction.LONG:
# 下跌线段与下跌笔为最低点
if not duan_bi_is_end(big_kline.cur_duan, Direction.SHORT):
return False
# 下跌线段具有底分
if not check_duan_not_rt(big_kline, Direction.SHORT):
return False
# 下跌笔具有底分
if not check_bi_not_rt(big_kline, Direction.SHORT):
return False
else:
# 上涨线段与上涨笔为最高点
if not duan_bi_is_end(big_kline.cur_duan, Direction.LONG):
return False
# 上涨线段具有顶分
if not check_duan_not_rt(big_kline, Direction.LONG):
return False
# 上涨笔具有顶分
if not check_bi_not_rt(big_kline, Direction.LONG):
return False
# 中枢的离开笔,有三笔
exit_bi_list = [bi for bi in big_kline.cur_duan.bi_list if bi.end > big_kline.cur_bi_zs.end]
if len(exit_bi_list) != 3:
return False
# 离开中枢首笔的高度,要高于末笔笔,或者,斜率要大于末笔
if exit_bi_list[0].height > exit_bi_list[-1].height and exit_bi_list[0].atan > exit_bi_list[-1].atan:
# 分析次级别K线判断其是否也发生线段背驰
if small_kline:
if len(small_kline.cur_duan.bi_list) > 1:
if (small_kline.cur_duan.bi_list[0].height > small_kline.cur_duan.bi_list[-1].height \
and small_kline.cur_duan.bi_list[0].atan > small_kline.cur_duan.bi_list[-1].atan) \
or (small_kline.cur_duan.bi_list[-3].height > small_kline.cur_duan.bi_list[-1].height \
and small_kline.cur_duan.bi_list[-3].atan > small_kline.cur_duan.bi_list[-1].atan):
return True
else:
return True
return False
def check_qjt_1st(big_kline: CtaLineBar, small_kline: Union[CtaLineBar, None], signal_direction: Direction):
"""
判断区间套一买/区间套1卖信号
big_kline当前线段与信号方向相反线段具有盘整一个中枢
[一买信号为例]
中枢前下跌一笔a中枢后存在两个下跌笔bc
b比a力度小c比b力度小高度斜率
:param big_kline: 本级别K线
:param small_kline: 次级别K线可选可以是None
:param signal_direction: 信号方向
:return:
"""
direction = 1 if signal_direction == Direction.LONG else -1
# 排除
# 没有前线段、没有笔中枢
# 当前线段方向与判断方向一致、
if not big_kline.pre_duan \
or not big_kline.cur_bi_zs \
or big_kline.cur_duan.direction == direction:
return False
# 如果有次级别K线时也要判断方向
if small_kline and (not small_kline.pre_duan or small_kline.cur_duan.direction == direction):
return False
# 当前笔中枢必须在当前线段之内
if big_kline.cur_bi_zs.start < big_kline.cur_duan.start:
return False
# 当前线段结束需要等于当前笔结束
if big_kline.cur_duan.end != big_kline.cur_bi.end:
return False
# 寻找做多信号时,要求当前下跌笔底分型成立
if signal_direction == Direction.LONG:
# 笔走完
if not check_bi_not_rt(big_kline, Direction.SHORT):
return False
# 寻找做空信号时,要求当前上涨笔顶分型成立
else:
# 笔走完
if not check_bi_not_rt(big_kline, Direction.LONG):
return False
# 进入中枢前的一笔
entry_bi_list = [bi for bi in big_kline.cur_duan.bi_list if bi.end <= big_kline.cur_bi_zs.start]
# 中枢的离开笔,有三笔
exit_bi_list = [bi for bi in big_kline.cur_duan.bi_list if bi.end > big_kline.cur_bi_zs.end]
if len(entry_bi_list) < 1 or len(exit_bi_list) != 3:
return False
# c笔的高度要高于b笔高于进入笔a c笔斜率要大于b笔> 进入笔a
if exit_bi_list[0].height > exit_bi_list[-1].height > entry_bi_list[-1].height \
and exit_bi_list[0].atan > exit_bi_list[-1].atan > entry_bi_list[-1].atan:
# 分析次级别K线判断其是否也发生线段背驰
if small_kline:
if len(small_kline.cur_duan.bi_list) > 1:
if (small_kline.cur_duan.bi_list[0].height > small_kline.cur_duan.bi_list[-1].height \
and small_kline.cur_duan.bi_list[0].atan > small_kline.cur_duan.bi_list[-1].atan) \
or (small_kline.cur_duan.bi_list[-3].height > small_kline.cur_duan.bi_list[-1].height \
and small_kline.cur_duan.bi_list[-3].atan > small_kline.cur_duan.bi_list[-1].atan):
return True
else:
return True
return False
def check_qsbc_2nd(big_kline: CtaLineBar, small_kline: Union[CtaLineBar, None], signal_direction: Direction):
"""
判断趋势背驰1买/1卖后的二买二卖信号
big_kline当前线段为趋势与信号方向相反线段具有2个中枢
或者 big_kline的 tre_duan,pre_duan,cur_duan 为趋势之间具有两个以上连续方向的中枢
cur_duan的末端一笔形成趋势背驰或者末端一笔超长时其次级别形成具有背驰信号
big_kline当前段外具有两笔最后一笔具有确认分型斜率比cur_duan末笔的斜率小
:param big_kline: 本级别K线
:param small_kline: 次级别K线可选可以是None
:param signal_direction: 信号方向
:return:
"""
direction = 1 if signal_direction == Direction.LONG else -1
# 排除
# 没有前线段、没有笔中枢
# 当前线段方向与判断方向一致
if not big_kline.pre_duan \
or not big_kline.cur_bi_zs \
or big_kline.cur_duan.direction == direction:
return False
# 二买信号时,当前笔必须时下跌笔+底分型
if signal_direction == Direction.LONG:
# 若不是下跌笔,并且下跌笔没有底分型
if not check_bi_not_rt(big_kline, Direction.SHORT):
return False
# 二卖信号时,当前笔必须是上涨笔+顶分型
else:
# 若不是上涨笔,并且上涨笔没有顶分型
if not check_bi_not_rt(big_kline, Direction.LONG):
return False
# 当前线段内至少有2个或以上中枢
has_2_continue_zs = False
bi_zs_in_cur_duan = [zs for zs in big_kline.bi_zs_list[-4:] if zs.start > big_kline.cur_duan.start]
if len(bi_zs_in_cur_duan) >= 2:
# 两个连续下跌的中枢,可以进一步判断是否满足二买做多
if signal_direction == Direction.LONG and bi_zs_in_cur_duan[-2].low > bi_zs_in_cur_duan[-1].low:
has_2_continue_zs = True
# 两个连续上升的中枢,可以进一步判断是否满足二卖做空
if signal_direction == Direction.SHORT and bi_zs_in_cur_duan[-2].high < bi_zs_in_cur_duan[-1].high:
has_2_continue_zs = True
# 当前线段内,不足两个中枢,判断前三个线段内,是否具有两个或两个以上中枢
elif big_kline.tre_duan:
# 找出三个线段内的所有中枢
bi_zs_after_tre_duan = [zs for zs in big_kline.bi_zs_list[-4:] if zs.start > big_kline.tre_duan.start]
if len(bi_zs_after_tre_duan) >= 2:
if signal_direction == Direction.LONG \
and big_kline.tre_duan.high > big_kline.cur_duan.high \
and bi_zs_after_tre_duan[-2].low > bi_zs_after_tre_duan[-1].low:
has_2_continue_zs = True
if signal_direction == Direction.SHORT \
and big_kline.tre_duan.low < big_kline.cur_duan.low \
and bi_zs_after_tre_duan[-2].high < bi_zs_after_tre_duan[-1].high:
has_2_continue_zs = True
# 找不出两个连续同向的中枢,就不能进一步判断是否存在二买二卖
if not has_2_continue_zs:
return False
# 当前线段外的两笔
extra_bi_list = [bi for bi in big_kline.bi_list[-3:] if bi.start >= big_kline.cur_duan.end]
if len(extra_bi_list) != 2:
return False
# 线段外一笔的高度,不能超过线段最后一笔高度
if extra_bi_list[0].height > big_kline.cur_duan.bi_list[-1].height:
return False
# 最后一笔的高度不能超过最后一段的高度的黄金分割38%
if extra_bi_list[-1].height > big_kline.cur_duan.height * 0.38:
return False
# 二买情况下
if direction == Direction.LONG:
# 当前线段的第二低点
if len(big_kline.cur_duan.bi_list) > 1:
second_low = min([bi.low for bi in big_kline.cur_duan.bi_list[:-1]])
else:
second_low = min([bi.low for bi in big_kline.bi_list[-5:-1]])
# 反抽上涨分笔,高度不能打破第二低点
if extra_bi_list[0].high > second_low:
return False
else:
# 当前线段的第二高点
if len(big_kline.cur_duan.bi_list) > 1:
second_high = max([bi.high for bi in big_kline.cur_duan.bi_list[:-1]])
else:
second_high = max([bi.high for bi in big_kline.bi_list[-5:-1]])
# 反抽下跌分笔,低点不能打破第二高点
if extra_bi_list[0].low < second_high:
return False
return True
def check_zs_3rd(big_kline: CtaLineBar,
small_kline: Union[CtaLineBar, None],
signal_direction: Direction,
first_zs: bool = True,
all_zs: bool = True):
"""
三买三卖信号
:param big_kline: 本级别K线
:param small_kline: 次级别K线
:param signal_direction: 信号方向Direction.LONG: 三买信号, Direction.SHORT, 三卖信号
:param first_zs: 线段内得首个三买三卖即第一个中枢后才有效
:param all_zs: True 中枢的开始在线段开始点之后 False: 中枢结束在线段的开始点之后
:return:
"""
# Diection => 1/-1
direction = 1 if signal_direction == Direction.LONG else -1
if not big_kline.pre_duan or not big_kline.cur_bi_zs:
return
# 排除,须满足:当前段的方向 == 信号方向, 当前笔的方向 != 信号方向
if big_kline.cur_duan.direction != direction or big_kline.cur_bi.direction == direction:
return False
# 当前线段结束,与当前回调笔位置一致
if big_kline.cur_duan.end != big_kline.cur_bi.start:
return False
zs_num = 0
# 中枢与当前线段交集的判断
if all_zs:
# 信号线段必须至少含有5个分笔如果含有1个分笔的可能是强二买信号
if len(big_kline.cur_duan.bi_list) < 3:
return False
# 当前中枢需要完全在当前线段内
if big_kline.cur_bi_zs.start < big_kline.cur_duan.start:
return False
# 当前段之后的所有包含中枢
zs_list = [zs for zs in big_kline.bi_zs_list[-3:] if zs.start >= big_kline.cur_duan.start]
zs_num = len(zs_list)
# 是否现在线段得首个中枢后的三买三卖
if first_zs and zs_num> 1:
return False
else:
# 中枢需要与当前线段有交集[部分交集、或中枢完全在当前段内形成]
if big_kline.cur_bi_zs.end < big_kline.cur_duan.start:
return False
# 当前段之后的所有交集中枢
zs_list = [zs for zs in big_kline.bi_zs_list[-3:] if zs.end > big_kline.cur_duan.start]
zs_num = len(zs_list)
# 是否现在线段得首个中枢后的三买三卖
if first_zs and zs_num > 1:
return False
if not first_zs and zs_num > 1:
# 中枢的进入笔、离开笔
# 中枢的首笔与线段不同向,则选择中枢之前的一笔和最后的一笔
if big_kline.cur_bi_zs.bi_list[0].direction != big_kline.cur_duan.direction:
entry_bi_list = [bi for bi in big_kline.cur_duan.bi_list if bi.end <= big_kline.cur_bi_zs.start]
exit_bi_list = [bi for bi in big_kline.cur_duan.bi_list if bi.end > big_kline.cur_bi_zs.end]
if not (len(entry_bi_list) >= 1 and len(exit_bi_list) == 1):
return False
entry_bi = entry_bi_list[-1]
exit_bi = exit_bi_list[0]
# 中枢首笔跟线段同向
else:
entry_bi = big_kline.cur_bi_zs.bi_list[0]
exit_bi = big_kline.cur_duan.bi_list[-1]
#
# # 防止属于中枢盘整
if entry_bi.height > exit_bi.height and entry_bi.atan > exit_bi.atan:
return False
# 判断三买信号
if signal_direction == Direction.LONG:
# 本级别最后一笔,具有底分型
if not check_bi_not_rt(big_kline, Direction.SHORT):
return
# 线段最后一笔,与中枢有交集,且笔的中心,不在中枢内
if big_kline.cur_duan.bi_list[-1].low > big_kline.cur_bi_zs.high \
or big_kline.cur_duan.bi_list[-1].middle <= big_kline.cur_bi_zs.high:
return False
# # 线段的最后一笔,长度不能超过平均长度的两倍
# if big_kline.cur_duan.bi_list[-1].height > big_kline.bi_height_ma() * 2:
# return False
# 下跌笔不落中枢,一般使用笔的底部必须在中枢上方。为了防止毛刺,这里用了分型的高位在中枢上方即可
if big_kline.cur_fenxing.high <= big_kline.cur_bi_zs.high:
return False
# 判断三卖信号
if signal_direction == Direction.SHORT:
# 本级别最后一笔,具有顶分型
if not check_bi_not_rt(big_kline, Direction.LONG):
return
# 线段最后一笔,与中枢有交集,且笔的中心,不在中枢内
if big_kline.cur_duan.bi_list[-1].high < big_kline.cur_bi_zs.low \
or big_kline.cur_duan.bi_list[-1].middle >= big_kline.cur_bi_zs.low:
return False
# # 线段的最后一笔,长度不能超过平均长度的两倍
# if big_kline.cur_duan.bi_list[-1].height > big_kline.bi_height_ma() * 2:
# return False
# 上涨分笔不回中枢,一般使用笔的顶部必须在中枢下方。为了防止毛刺,这里用了分型的低位在中枢下方即可
if big_kline.cur_fenxing.low >= big_kline.cur_bi_zs.low:
return False
# 分析次级别K线判断其是否也发生线段背驰
if small_kline:
if len(small_kline.cur_duan.bi_list) > 1:
if small_kline.cur_duan.bi_list[0].height > small_kline.cur_duan.bi_list[-1].height \
or small_kline.cur_duan.bi_list[0].atan > small_kline.cur_duan.bi_list[-1].atan \
or small_kline.cur_duan.bi_list[-3].height > small_kline.cur_duan.bi_list[-1].height \
or small_kline.cur_duan.bi_list[-3].atan > small_kline.cur_duan.bi_list[-1].atan:
return True
else:
return True

View File

@ -564,37 +564,37 @@ class CtpGateway(BaseGateway):
def close(self):
""""""
if self.md_api:
self.write_log('断开行情API')
self.write_log('断开行情API',on_log=True)
tmp1 = self.md_api
self.md_api = None
tmp1.close()
if self.l2_md_api:
self.write_log('断开五档行情API')
self.write_log('断开五档行情API',on_log=True)
tmp1 = self.l2_md_api
self.l2_md_api = None
tmp1.close()
if self.td_api:
self.write_log('断开交易API')
self.write_log('断开交易API',on_log=True)
tmp2 = self.td_api
self.td_api = None
tmp2.close()
if self.tdx_api:
self.write_log(u'断开tdx指数行情API')
self.write_log(u'断开tdx指数行情API',on_log=True)
tmp3 = self.tdx_api
self.tdx_api = None
tmp3.close()
if self.rabbit_api:
self.write_log(u'断开rabbit MQ tdx指数行情API')
self.write_log(u'断开rabbit MQ tdx指数行情API',on_log=True)
tmp4 = self.rabbit_api
self.rabbit_api = None
tmp4.close()
if self.tq_api:
self.write_log(u'天勤行情API')
self.write_log(u'断开天勤行情API',on_log=True)
tmp5 = self.tq_api
self.tq_api = None
tmp5.close()
@ -659,7 +659,7 @@ class CtpMdApi(MdApi):
"""
Callback when front server is connected.
"""
self.gateway.write_log(f"{self.name}行情服务器连接成功")
self.gateway.write_log(f"{self.name}行情服务器连接成功",on_log=True)
self.connect_status = True
self.login()
self.gateway.status.update(
@ -671,7 +671,7 @@ class CtpMdApi(MdApi):
"""
self.login_status = False
self.connect_status = False
self.gateway.write_log(f"{self.name}行情服务器连接断开,原因{reason}")
self.gateway.write_log(f"{self.name}行情服务器连接断开,原因{reason}",on_log=True)
self.gateway.status.update(
{f'{self.name}md_con': False, f'{self.name}md_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
@ -681,7 +681,7 @@ class CtpMdApi(MdApi):
"""
if not error["ErrorID"]:
self.login_status = True
self.gateway.write_log(f"{self.name}行情服务器登录成功")
self.gateway.write_log(f"{self.name}行情服务器登录成功",on_log=True)
for symbol in self.subscribed:
self.subscribeMarketData(symbol)
@ -877,7 +877,7 @@ class CtpTdApi(TdApi):
def onFrontConnected(self):
""""""
self.gateway.write_log("交易服务器连接成功")
self.gateway.write_log("交易服务器连接成功",on_log=True)
self.connect_status = True
if self.auth_code:
self.gateway.write_log("向交易服务器提交授权码验证")
@ -889,7 +889,7 @@ class CtpTdApi(TdApi):
def onFrontDisconnected(self, reason: int):
""""""
self.login_status = False
self.gateway.write_log(f"交易服务器连接断开,原因{reason}")
self.gateway.write_log(f"交易服务器连接断开,原因{reason}",on_log=True)
self.gateway.status.update({'td_con': False, 'td_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool):
@ -910,7 +910,7 @@ class CtpTdApi(TdApi):
self.sessionid = data["SessionID"]
self.login_status = True
self.gateway.status.update({'td_con': True, 'td_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
self.gateway.write_log("交易帐号登录完成")
self.gateway.write_log("交易帐号登录完成",on_log=True)
# Confirm settlement
req = {
@ -971,7 +971,7 @@ class CtpTdApi(TdApi):
"""
Callback of settlment info confimation.
"""
self.gateway.write_log("结算信息确认成功")
self.gateway.write_log("结算信息确认成功",on_log=True)
while True:
self.reqid += 1
@ -1594,7 +1594,7 @@ class TdxMdApi():
# 创建api连接对象实例
try:
if self.api is None or not self.connection_status:
self.gateway.write_log(u'开始连接通达信行情服务器')
self.gateway.write_log(u'开始连接通达信行情服务器',on_log=True)
self.api = TdxExHq_API(heartbeat=True, auto_retry=True, raise_exception=True)
# 选取最佳服务器
@ -1619,12 +1619,12 @@ class TdxMdApi():
self.thread.start()
except Exception as ex:
self.gateway.write_log(u'连接服务器tdx异常:{},{}'.format(str(ex), traceback.format_exc()))
self.gateway.write_log(u'连接服务器tdx异常:{},{}'.format(str(ex), traceback.format_exc()),on_log=True)
return
def close(self):
"""退出API"""
self.gateway.write_log(u'退出tdx API')
self.gateway.write_log(u'退出tdx API',on_log=True)
self.connection_status = False
if self.thread:
@ -2110,7 +2110,7 @@ class TqMdApi():
from tqsdk import TqApi
self.api = TqApi(url="wss://u.shinnytech.com/t/md/front/mobile")
except Exception as e:
self.gateway.write_log(f'天勤行情API接入异常'.format(str(e)))
self.gateway.write_log(f'天勤行情API接入异常'.format(str(e)),on_log=True)
if self.api:
self.is_connected = True
self.gateway.write_log(f'天勤行情API已连接')
@ -2216,7 +2216,7 @@ class TqMdApi():
self.quote_objs.append((req.vt_symbol, quote))
self.subscribe_array.append(req.vt_symbol)
except Exception as ex:
self.gateway.write_log('订阅天勤行情异常:{}'.format(str(ex)))
self.gateway.write_log('订阅天勤行情异常:{}'.format(str(ex)),on_log=True)
def query_contracts(self) -> None:
""""""
@ -2317,4 +2317,4 @@ class TqMdApi():
if self.update_thread:
self.update_thread.join()
except Exception as e:
self.gateway.write_log('退出天勤行情api异常:{}'.format(str(e)))
self.gateway.write_log('退出天勤行情api异常:{}'.format(str(e)),on_log=True)

View File

@ -173,6 +173,7 @@ class ThsGateway(BaseGateway):
def connect(self, setting: dict) -> None:
"""连接"""
try:
userid = setting["资金账号"]
password = setting["登录密码"]
@ -191,6 +192,10 @@ class ThsGateway(BaseGateway):
# self.tq_api.connect()
self.init_query()
except Exception as ex:
msg = f'{self.gateway_name}连接行情和交易接口异常:{str(ex)}'
self.write_error(msg)
def close(self) -> None:
""""""
self.md_api.close()
@ -248,6 +253,8 @@ class ThsGateway(BaseGateway):
self.count = 0
self.query_functions = [self.query_orders, self.query_trades, self.query_account, self.query_position]
def check_status(self):
self.write_log(self.status)
class TdxMdApi(object):
"""通达信行情和基础数据"""
@ -403,7 +410,7 @@ class TdxMdApi(object):
self.pool.map_async(self.run, range(n))
# 设置上层的连接状态
self.gateway.tdxConnected = True
self.gateway.tdx_connected = True
def reconnect(self, i):
"""
@ -884,7 +891,8 @@ class ThsTdApi(object):
self.password = user_pwd
self.rpc_host = host
self.rpc_port = port
self.gateway.write_log(f'{self.gateway_name}开始连接{host}:{port}')
try:
# 创建 easy客户端
self.api = easytrader_use(broker='pingan_ths', host=self.rpc_host, port=self.rpc_port)
@ -893,6 +901,12 @@ class ThsTdApi(object):
self.login_status = True
except Exception as ex:
msg = f'{self.gateway_name}连接{host}:{port}异常:{str(ex)}'
self.gateway.write_error(msg)
from vnpy.trader.util_wechat import send_wx_msg
send_wx_msg(msg)
def reconnect(self):
"""连接"""
self.gateway.write_log(f'重新连接')