diff --git a/vnpy/app/cta_strategy_pro/__init__.py b/vnpy/app/cta_strategy_pro/__init__.py index b21941cc..bfa645a9 100644 --- a/vnpy/app/cta_strategy_pro/__init__.py +++ b/vnpy/app/cta_strategy_pro/__init__.py @@ -8,6 +8,7 @@ from .engine import CtaEngine from .template import ( Direction, Offset, + Exchange, Status, Color, TickData, diff --git a/vnpy/app/cta_strategy_pro/back_testing.py b/vnpy/app/cta_strategy_pro/back_testing.py index 0b5b5579..91311b7f 100644 --- a/vnpy/app/cta_strategy_pro/back_testing.py +++ b/vnpy/app/cta_strategy_pro/back_testing.py @@ -1240,7 +1240,7 @@ class BackTestingEngine(object): active_exchange = self.get_exchange(active_symbol) active_vt_symbol = active_symbol + '.' + active_exchange.value passive_exchange = self.get_exchange(passive_symbol) - # passive_vt_symbol = active_symbol + '.' + passive_exchange.value + passive_vt_symbol = passive_symbol + '.' + passive_exchange.value # 主动腿成交记录 act_trade = TradeData(gateway_name=self.gateway_name, symbol=active_symbol, @@ -1438,10 +1438,10 @@ class BackTestingEngine(object): # 如果当前没有空单,属于异常行为 if len(self.short_position_list) == 0: self.write_error(u'异常!没有空单持仓,不能cover') - raise Exception(u'异常!没有空单持仓,不能cover') + # raise Exception(u'异常!没有空单持仓,不能cover') return - cur_short_pos_list = [s_pos.volume for s_pos in self.short_position_list] + cur_short_pos_list = [s_pos.volume for s_pos in self.short_position_list if s_pos.vt_symbol == trade.vt_symbol] self.write_log(u'{}当前空单:{}'.format(trade.vt_symbol, cur_short_pos_list)) @@ -1450,9 +1450,13 @@ class BackTestingEngine(object): val.vt_symbol == trade.vt_symbol and val.strategy_name == trade.strategy_name] if len(pop_indexs) < 1: - self.write_error(u'异常,{}没有对应symbol:{}的空单持仓'.format(trade.strategy_name, trade.vt_symbol)) - raise Exception(u'realtimeCalculate2() Exception,没有对应symbol:{0}的空单持仓'.format(trade.vt_symbol)) - return + if 'spd' in vt_tradeid: + self.write_error(f'没有{trade.strategy_name}对应的symbol:{trade.vt_symbol}的空单持仓, 继续') + break + else: + self.write_error(u'异常,{}没有对应symbol:{}的空单持仓, 终止'.format(trade.strategy_name, trade.vt_symbol)) + # raise Exception(u'realtimeCalculate2() Exception,没有对应symbol:{0}的空单持仓'.format(trade.vt_symbol)) + return pop_index = pop_indexs[0] # 从未平仓的空头交易 @@ -1494,7 +1498,7 @@ class BackTestingEngine(object): self.trade_pnl_list.append(t) # 非自定义套利对,才更新到策略盈亏 - if not open_trade.vt_symbol.endswith('SPD'): + if not (open_trade.vt_symbol.endswith('SPD') or open_trade.vt_symbol.endswith('SPD99')): # 更新策略实例的累加盈亏 self.pnl_strategy_dict.update( {open_trade.strategy_name: self.pnl_strategy_dict.get(open_trade.strategy_name, @@ -1506,7 +1510,9 @@ class BackTestingEngine(object): open_trade.volume, result.pnl, result.commission) self.write_log(msg) - result_list.append(result) + + # 添加到交易结果汇总 + result_list.append(result) if g_result is None: if cover_volume > 0: @@ -1569,6 +1575,9 @@ class BackTestingEngine(object): self.write_log(msg) + # 添加到交易结果汇总 + result_list.append(result) + # 更新(减少)开仓单的volume,重新推进开仓单列表中 open_trade.volume = remain_volume self.write_log(u'更新(减少)开仓单的volume,重新推进开仓单列表中:{}'.format(open_trade.volume)) @@ -1577,7 +1586,7 @@ class BackTestingEngine(object): self.write_log(u'当前空单:{}'.format(cur_short_pos_list)) cover_volume = 0 - result_list.append(result) + if g_result is not None: # 更新组合的数据 @@ -1606,18 +1615,21 @@ class BackTestingEngine(object): while sell_volume > 0: if len(self.long_position_list) == 0: self.write_error(f'异常,没有{trade.vt_symbol}的多仓') - raise RuntimeError(u'realtimeCalculate2() Exception,没有开多单') + # raise RuntimeError(u'realtimeCalculate2() Exception,没有开多单') return pop_indexs = [i for i, val in enumerate(self.long_position_list) if val.vt_symbol == trade.vt_symbol and val.strategy_name == trade.strategy_name] if len(pop_indexs) < 1: - self.write_error(f'没有{trade.strategy_name}对应的symbol{trade.vt_symbol}多单数据,') - raise RuntimeError( - f'realtimeCalculate2() Exception,没有对应的symbol{trade.vt_symbol}多单数据,') - return + if 'spd' in vt_tradeid: + self.write_error(f'没有{trade.strategy_name}对应的symbol:{trade.vt_symbol}多单数据, 继续') + break + else: + self.write_error(f'没有{trade.strategy_name}对应的symbol:{trade.vt_symbol}多单数据, 终止') + # raise RuntimeError(f'realtimeCalculate2() Exception,没有对应的symbol:{trade.vt_symbol}多单数据,') + return - cur_long_pos_list = [s_pos.volume for s_pos in self.long_position_list] + cur_long_pos_list = [s_pos.volume for s_pos in self.long_position_list if s_pos.vt_symbol == trade.vt_symbol] self.write_log(u'{}当前多单:{}'.format(trade.vt_symbol, cur_long_pos_list)) @@ -1669,7 +1681,9 @@ class BackTestingEngine(object): open_trade.volume, result.pnl, result.commission) self.write_log(msg) - result_list.append(result) + + # 添加到交易结果汇总 + result_list.append(result) if g_result is None: if sell_volume > 0: @@ -1728,13 +1742,14 @@ class BackTestingEngine(object): result.commission) self.write_log(msg) + # 添加到交易结果汇总 + result_list.append(result) # 减少开多volume,重新推进多单持仓列表中 open_trade.volume = remain_volume self.long_position_list.append(open_trade) sell_volume = 0 - result_list.append(result) if g_result is not None: # 更新组合的数据 @@ -1786,8 +1801,11 @@ class BackTestingEngine(object): continue # 当前空单保证金 if self.use_margin: - cur_occupy_money = max(self.get_price(t.vt_symbol), t.price) * abs(t.volume) * self.get_size( - t.vt_symbol) * self.get_margin_rate(t.vt_symbol) + try: + cur_occupy_money = max(self.get_price(t.vt_symbol), t.price) * abs(t.volume) * self.get_size( + t.vt_symbol) * self.get_margin_rate(t.vt_symbol) + except Exception as ex: + self.write_error(ex) else: cur_occupy_money = self.get_price(t.vt_symbol) * abs(t.volume) * self.get_size( t.vt_symbol) * self.get_margin_rate(t.vt_symbol) diff --git a/vnpy/app/cta_strategy_pro/engine.py b/vnpy/app/cta_strategy_pro/engine.py index efa2a473..f3f96b79 100644 --- a/vnpy/app/cta_strategy_pro/engine.py +++ b/vnpy/app/cta_strategy_pro/engine.py @@ -797,6 +797,10 @@ class CtaEngine(BaseEngine): return True + @lru_cache() + def get_exchange(self, symbol): + return self.main_engine.get_exchange(symbol) + @lru_cache() def get_name(self, vt_symbol: str): """查询合约的name""" @@ -868,6 +872,9 @@ class CtaEngine(BaseEngine): def get_contract(self, vt_symbol): return self.main_engine.get_contract(vt_symbol) + def get_custom_contract(self, vt_symbol): + return self.main_engine.get_custom_contract(vt_symbol.split('.')[0]) + def get_all_contracts(self): return self.main_engine.get_all_contracts() @@ -986,38 +993,47 @@ class CtaEngine(BaseEngine): """ Add a new strategy. """ - if strategy_name in self.strategies: - msg = f"创建策略失败,存在重名{strategy_name}" - self.write_log(msg=msg, - level=logging.CRITICAL) - return False, msg + try: + if strategy_name in self.strategies: + msg = f"创建策略失败,存在重名{strategy_name}" + self.write_log(msg=msg, + level=logging.CRITICAL) + return False, msg - strategy_class = self.classes.get(class_name, None) - if not strategy_class: - msg = f"创建策略失败,找不到策略类{class_name}" - self.write_log(msg=msg, - level=logging.CRITICAL) - return False, msg + strategy_class = self.classes.get(class_name, None) + if not strategy_class: + msg = f"创建策略失败,找不到策略类{class_name}" + self.write_log(msg=msg, + level=logging.CRITICAL) + return False, msg - self.write_log(f'开始添加策略类{class_name},实例名:{strategy_name}') - strategy = strategy_class(self, strategy_name, vt_symbol, setting) - self.strategies[strategy_name] = strategy + self.write_log(f'开始添加策略类{class_name},实例名:{strategy_name}') + strategy = strategy_class(self, strategy_name, vt_symbol, setting) + self.strategies[strategy_name] = strategy - # Add vt_symbol to strategy map. - strategies = self.symbol_strategy_map[vt_symbol] - strategies.append(strategy) + # Add vt_symbol to strategy map. + strategies = self.symbol_strategy_map[vt_symbol] + strategies.append(strategy) - subscribe_symbol_set = self.strategy_symbol_map[strategy_name] - subscribe_symbol_set.add(vt_symbol) + subscribe_symbol_set = self.strategy_symbol_map[strategy_name] + subscribe_symbol_set.add(vt_symbol) - # Update to setting file. - self.update_strategy_setting(strategy_name, setting, auto_init, auto_start) + # Update to setting file. + self.update_strategy_setting(strategy_name, setting, auto_init, auto_start) - self.put_strategy_event(strategy) + self.put_strategy_event(strategy) - # 判断设置中是否由自动初始化和自动启动项目 - if auto_init: - self.init_strategy(strategy_name, auto_start=auto_start) + # 判断设置中是否由自动初始化和自动启动项目 + if auto_init: + self.init_strategy(strategy_name, auto_start=auto_start) + + except Exception as ex: + msg = f'添加策略实例{strategy_name}失败,{str(ex)}' + self.write_error(msg) + self.write_error(traceback.format_exc()) + self.send_wechat(msg) + + return False, f'添加策略实例{strategy_name}失败' return True, f'成功添加{strategy_name}' @@ -1789,7 +1805,7 @@ class CtaEngine(BaseEngine): # 其他期货:帐号多单 vs 除了多单, 空单 vs 空单 if vt_symbol.endswith(".CFFEX"): diff_match = (symbol_pos.get('账号多单', 0) - symbol_pos.get('账号空单', 0)) == ( - symbol_pos.get('策略多单', 0) - symbol_pos.get('策略空单', 0)) + symbol_pos.get('策略多单', 0) - symbol_pos.get('策略空单', 0)) pos_match = symbol_pos.get('账号空单', 0) == symbol_pos.get('策略空单', 0) and \ symbol_pos.get('账号多单', 0) == symbol_pos.get('策略多单', 0) match = diff_match @@ -1803,11 +1819,12 @@ class CtaEngine(BaseEngine): symbol_pos.get('策略多单', 0), symbol_pos.get('策略空单', 0) )) - diff_pos_dict.update({vt_symbol: {"long":symbol_pos.get('账号多单', 0) - symbol_pos.get('策略多单', 0), - "short":symbol_pos.get('账号空单', 0) - symbol_pos.get('策略空单', 0)}}) + diff_pos_dict.update({vt_symbol: {"long": symbol_pos.get('账号多单', 0) - symbol_pos.get('策略多单', 0), + "short": symbol_pos.get('账号空单', 0) - symbol_pos.get('策略空单', + 0)}}) else: match = round(symbol_pos.get('账号空单', 0), 7) == round(symbol_pos.get('策略空单', 0), 7) and \ - round(symbol_pos.get('账号多单', 0), 7) == round(symbol_pos.get('策略多单', 0), 7) + round(symbol_pos.get('账号多单', 0), 7) == round(symbol_pos.get('策略多单', 0), 7) # 多空都一致 if match: msg = u'{}多空都一致.{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False)) @@ -1862,7 +1879,7 @@ class CtaEngine(BaseEngine): else: self.write_log(u'账户持仓与策略一致') if len(diff_pos_dict) > 0: - for k,v in diff_pos_dict.items(): + for k, v in diff_pos_dict.items(): self.write_log(f'{k} 存在大于策略的轧差持仓:{v}') return True, compare_info diff --git a/vnpy/app/cta_strategy_pro/portfolio_testing.py b/vnpy/app/cta_strategy_pro/portfolio_testing.py index ec76641b..96aebac8 100644 --- a/vnpy/app/cta_strategy_pro/portfolio_testing.py +++ b/vnpy/app/cta_strategy_pro/portfolio_testing.py @@ -277,7 +277,7 @@ class PortfolioTestingEngine(BackTestingEngine): bar.high_price = float(bar_data['high']) bar.low_price = float(bar_data['low']) bar.volume = int(bar_data['volume']) - bar.open_interest = int(bar_data.get('open_interest', 0)) + bar.open_interest = float(bar_data.get('open_interest', 0)) bar.date = bar_datetime.strftime('%Y-%m-%d') bar.time = bar_datetime.strftime('%H:%M:%S') str_td = str(bar_data.get('trading_day', '')) diff --git a/vnpy/app/cta_strategy_pro/spread_testing.py b/vnpy/app/cta_strategy_pro/spread_testing.py index c2ad0846..1292e641 100644 --- a/vnpy/app/cta_strategy_pro/spread_testing.py +++ b/vnpy/app/cta_strategy_pro/spread_testing.py @@ -13,7 +13,7 @@ import gc import pandas as pd import numpy as np import traceback - +import random import bz2 import pickle @@ -21,7 +21,7 @@ from datetime import datetime, timedelta from time import sleep from vnpy.trader.object import ( - TickData, + TickData, BarData ) from vnpy.trader.constant import ( Exchange, @@ -33,6 +33,7 @@ from vnpy.trader.utility import ( get_trading_date, import_module_by_str ) +from vnpy.trader.gateway import TickCombiner from .back_testing import BackTestingEngine @@ -51,8 +52,9 @@ class SpreadTestingEngine(BackTestingEngine): CTA套利组合回测引擎, 使用回测引擎作为父类 函数接口和策略引擎保持一样, 从而实现同一套代码从回测到实盘。 - 针对tick回测 - 导入CTA_Settings + + tick回测: + 1,设置tick_path, """ @@ -60,9 +62,17 @@ class SpreadTestingEngine(BackTestingEngine): """Constructor""" super().__init__(event_engine) self.tick_path = None # tick级别回测, 路径 - self.use_tq = False + self.use_tq = False # True:使用tq数据; False:使用淘宝购买的数据(19年之前) self.strategy_start_date_dict = {} self.strategy_end_date_dict = {} + self.tick_combiner_dict = {} # tick合成器 + self.symbol_combiner_dict = {} # symbol : [combiner] + + self.bar_csv_file = {} + self.bar_df_dict = {} # 历史数据的df,回测用 + self.bar_df = None # 历史数据的df,时间+symbol作为组合索引 + self.bar_interval_seconds = 60 # bar csv文件,属于K线类型,K线的周期(秒数),缺省是1分钟 + self.on_tick = self.new_tick # 仿造 gateway的on_tick接口 => new_tick def prepare_env(self, test_setting): self.output('portfolio prepare_env') @@ -70,6 +80,38 @@ class SpreadTestingEngine(BackTestingEngine): self.use_tq = test_setting.get('use_tq', False) + def prepare_data(self, data_dict): + """ + 准备组合数据 + :param data_dict: 合约得配置参数 + :return: + """ + # 调用回测引擎,跟新合约得数据 + super().prepare_data(data_dict) + + if len(data_dict) == 0: + self.write_log(u'请指定回测数据和文件') + return + + if self.mode == 'tick': + return + + # 检查/更新bar文件 + for symbol, symbol_data in data_dict.items(): + self.write_log(u'配置{}数据:{}'.format(symbol, symbol_data)) + + bar_file = symbol_data.get('bar_file', None) + + if bar_file is None: + self.write_error(u'{}没有配置数据文件') + continue + + if not os.path.isfile(bar_file): + self.write_log(u'{0}文件不存在'.format(bar_file)) + continue + + self.bar_csv_file.update({symbol: bar_file}) + def load_strategy(self, strategy_name: str, strategy_setting: dict = None): """ 装载回测的策略 @@ -112,29 +154,90 @@ class SpreadTestingEngine(BackTestingEngine): pas_underly = get_underlying_symbol(pas_symbol).upper() act_exchange = self.get_exchange(f'{act_underly}99') pas_exchange = self.get_exchange(f'{pas_underly}99') - idx_contract = self.get_contract(f'{act_underly}99.{act_exchange.value}') - self.set_contract(symbol=act_symbol, - exchange=act_exchange, - product=idx_contract.product, - name=act_symbol, - size=idx_contract.size, - price_tick=idx_contract.pricetick, - margin_rate=idx_contract.margin_rate) + act_contract = self.get_contract(f'{act_underly}99.{act_exchange.value}') + if self.get_contract(f'{act_symbol}.{act_exchange.value}') is None: + self.set_contract(symbol=act_symbol, + exchange=act_exchange, + product=act_contract.product, + name=act_symbol, + size=act_contract.size, + price_tick=act_contract.pricetick, + margin_rate=act_contract.margin_rate) + self.write_log(f'设置主动腿指数合约信息{act_symbol}.{act_exchange.value}') if pas_underly != act_underly: - idx_contract = self.get_contract(f'{pas_underly}99.{pas_exchange.value}') - - self.set_contract(symbol=pas_symbol, + pas_contract = self.get_contract(f'{pas_underly}99.{pas_exchange.value}') + else: + pas_contract = act_contract + if self.get_contract(f'{pas_symbol}.{pas_exchange.value}') is None: + self.set_contract(symbol=pas_symbol, exchange=pas_exchange, - product=idx_contract.product, + product=pas_contract.product, name=act_symbol, - size=idx_contract.size, - price_tick=idx_contract.pricetick, - margin_rate=idx_contract.margin_rate) + size=pas_contract.size, + price_tick=pas_contract.pricetick, + margin_rate=pas_contract.margin_rate) + self.write_log(f'设置被动腿指数合约信息{pas_symbol}.{pas_exchange.value}') + idx_spd_symbol=f'{act_underly}99-{act_ratio}-{pas_underly}99-{pas_ratio}-{spread_type}' - subscribe_symobls.remove(vt_symbol) + if f'{idx_spd_symbol}.SPD' not in self.contract_dict: + if spread_type == 'CJ': + if act_underly == pas_underly: + spd_price_tick = act_contract.pricetick + spd_size = act_contract.size + spd_margin_rate = act_contract.margin_rate + else: + spd_price_tick = min(act_contract.pricetick, pas_contract.pricetick) + spd_size = min(act_contract.size, pas_contract.size) + spd_margin_rate = max(act_contract.margin_rate, pas_contract.margin_rate) + else: + spd_price_tick = 0.01 + spd_size = 100 + spd_margin_rate = 0.1 + + self.set_contract( + symbol=idx_spd_symbol, + exchange=Exchange.SPD, + product=act_contract.product, + name=idx_spd_symbol, + size=spd_size, + price_tick=spd_price_tick, + margin_rate=spd_margin_rate + ) + self.write_log(f'设置套利合约信息{idx_spd_symbol}.SPD') + spd_contract =self.contract_dict.get(f'{idx_spd_symbol}.SPD') + + # subscribe_symobls.remove(vt_symbol) subscribe_symobls.append(f'{act_symbol}.{act_exchange.value}') subscribe_symobls.append(f'{pas_symbol}.{pas_exchange.value}') + # 价差生成器 + combiner = self.tick_combiner_dict.get(vt_symbol, None) + act_combiners = self.symbol_combiner_dict.get(act_symbol, []) + pas_combiners = self.symbol_combiner_dict.get(pas_symbol, []) + if combiner is None: + combiner = TickCombiner( + gateway=self, + setting={ + "symbol": symbol, + "leg1_symbol": act_symbol, + "leg1_ratio": int(act_ratio), + "leg2_symbol": pas_symbol, + "leg2_ratio": int(pas_ratio), + "price_tick": spd_contract.pricetick, + "is_spread": True if spread_type == "CJ" else False, + "is_ratio": True if spread_type == "BJ" else False} + ) + self.tick_combiner_dict[vt_symbol] = combiner + self.write_log(f'添加{vt_symbol} tick合成器') + if combiner not in act_combiners: + act_combiners.append(combiner) + self.symbol_combiner_dict.update({act_symbol: act_combiners}) + self.write_log(f'添加{act_symbol} => {vt_symbol} 合成器映射关系') + if combiner not in pas_combiners: + pas_combiners.append(combiner) + self.symbol_combiner_dict.update({pas_symbol: pas_combiners}) + self.write_log(f'添加{pas_symbol} => {vt_symbol} 合成器映射关系') + # 取消自动启动 if 'auto_start' in strategy_setting: strategy_setting.update({'auto_start': False}) @@ -205,7 +308,10 @@ class SpreadTestingEngine(BackTestingEngine): self.write_log(u'开始回测:{} ~ {}'.format(self.data_start_date, self.data_end_date)) - self.run_tick_test() + if self.mode == 'tick': + self.run_tick_test() + else: + self.run_bar_test() def load_csv_file(self, tick_folder, vt_symbol, tick_date): """从文件中读取tick,返回list[{dict}]""" @@ -237,7 +343,7 @@ class SpreadTestingEngine(BackTestingEngine): ticks = [] if not os.path.isfile(file_path): - self.write_log(u'{0}文件不存在'.format(file_path)) + self.write_log(f'{file_path}文件不存在') return None df = pd.read_csv(file_path, encoding='gbk', parse_dates=False) @@ -292,7 +398,7 @@ class SpreadTestingEngine(BackTestingEngine): ticks = [] if not os.path.isfile(file_path): - self.write_log(u'{0}文件不存在'.format(file_path)) + self.write_log(u'{}文件不存在'.format(file_path)) return None try: df = pd.read_csv(file_path, parse_dates=False) @@ -309,7 +415,7 @@ class SpreadTestingEngine(BackTestingEngine): tick = row.to_dict() tick['date'], tick['time'] = tick['datetime'].split(' ') - tick.update({'trading_day': tick_date.strftime('%Y-%m-%d')}) + tick.update({'trading_day': tick_date.strftime('%Y-%m-%d')}) tick_datetime = datetime.strptime(tick['datetime'], '%Y-%m-%d %H:%M:%S.%f') # 修正毫秒 @@ -332,7 +438,7 @@ class SpreadTestingEngine(BackTestingEngine): del df except Exception as ex: - self.write_log(u'{0}文件读取不成功'.format(file_path)) + self.write_log(f'{file_path}文件读取不成功: {str(ex)}') return None return ticks @@ -389,6 +495,186 @@ class SpreadTestingEngine(BackTestingEngine): return tick_df + def load_bar_csv_to_df(self, vt_symbol, bar_file, data_start_date=None, data_end_date=None): + """加载回测bar数据到DataFrame""" + self.output(u'loading {} from {}'.format(vt_symbol, bar_file)) + if vt_symbol in self.bar_df_dict: + return True + + if bar_file is None or not os.path.exists(bar_file): + self.write_error(u'回测时,{}对应的csv bar文件{}不存在'.format(vt_symbol, bar_file)) + return False + + try: + data_types = { + "datetime": str, + "open": float, + "high": float, + "low": float, + "close": float, + "open_interest": float, + "volume": float, + "instrument_id": str, + "symbol": str, + "total_turnover": float, + "limit_down": float, + "limit_up": float, + "trading_day": str, + "date": str, + "time": str + } + + # 加载csv文件 =》 dateframe + symbol_df = pd.read_csv(bar_file, dtype=data_types) + if len(symbol_df) == 0: + self.write_error(f'回测时加载{vt_symbol} csv文件{bar_file}失败。') + return False + + first_dt = symbol_df.iloc[0]['datetime'] + if '.' in first_dt: + datetime_format = "%Y-%m-%d %H:%M:%S.%f" + else: + datetime_format = "%Y-%m-%d %H:%M:%S" + # 转换时间,str =》 datetime + symbol_df["datetime"] = pd.to_datetime(symbol_df["datetime"], format=datetime_format) + # 设置时间为索引 + symbol_df = symbol_df.set_index("datetime") + + # 裁剪数据 + symbol_df = symbol_df.loc[self.test_start_date:self.test_end_date] + + self.bar_df_dict.update({vt_symbol: symbol_df}) + + except Exception as ex: + self.write_error(u'回测时读取{} csv文件{}失败:{}'.format(vt_symbol, bar_file, ex)) + self.output(u'回测时读取{} csv文件{}失败:{}'.format(vt_symbol, bar_file, ex)) + return False + + return True + + def comine_bar_df(self): + """ + 合并所有回测合约的bar DataFrame =》集中的DataFrame + 把bar_df_dict =》bar_df + :return: + """ + self.output('comine_df') + if len(self.bar_df_dict) == 0: + self.output(f'无加载任何数据,请检查bar文件路径配置') + + self.bar_df = pd.concat(self.bar_df_dict, axis=0).swaplevel(0, 1).sort_index() + self.bar_df_dict.clear() + + def run_bar_test(self): + """使用bar进行组合回测""" + testdays = (self.data_end_date - self.data_start_date).days + + if testdays < 1: + self.write_log(u'回测时间不足') + return + + # 加载数据 + for vt_symbol in self.symbol_strategy_map.keys(): + symbol, exchange = extract_vt_symbol(vt_symbol) + + # 不读取SPD的bar文件 + if exchange == Exchange.SPD: + continue + self.load_bar_csv_to_df(vt_symbol, self.bar_csv_file.get(symbol)) + + # 合并数据 + self.comine_bar_df() + + last_trading_day = None + bars_dt = None + bars_same_dt = [] + + gc_collect_days = 0 + + try: + for (dt, vt_symbol), bar_data in self.bar_df.iterrows(): + symbol, exchange = extract_vt_symbol(vt_symbol) + + bar_datetime = dt - timedelta(seconds=self.bar_interval_seconds) + + bar = BarData( + gateway_name='backtesting', + symbol=symbol, + exchange=exchange, + datetime=bar_datetime + ) + + bar.open_price = float(bar_data['open']) + bar.close_price = float(bar_data['close']) + bar.high_price = float(bar_data['high']) + bar.low_price = float(bar_data['low']) + bar.volume = int(bar_data['volume']) + bar.open_interest = int(bar_data.get('open_interest', 0)) + bar.date = bar_datetime.strftime('%Y-%m-%d') + bar.time = bar_datetime.strftime('%H:%M:%S') + str_td = str(bar_data.get('trading_day', '')) + if len(str_td) == 8: + bar.trading_day = str_td[0:4] + '-' + str_td[4:6] + '-' + str_td[6:8] + elif len(str_td) == 10: + bar.trading_day = str_td + else: + bar.trading_day = get_trading_date(bar_datetime) + + if last_trading_day != bar.trading_day: + self.output(u'回测数据日期:{},资金:{}'.format(bar.trading_day, self.net_capital)) + if self.strategy_start_date > bar.datetime: + last_trading_day = bar.trading_day + + # bar时间与队列时间一致,添加到队列中 + if dt == bars_dt: + bars_same_dt.append(bar) + continue + else: + # bar时间与队列时间不一致,先推送队列的bars + random.shuffle(bars_same_dt) + for _bar_ in bars_same_dt: + self.new_bar(_bar_) + + # 创建新的队列 + bars_same_dt = [bar] + bars_dt = dt + + # 更新每日净值 + if self.strategy_start_date <= dt <= self.data_end_date: + if last_trading_day != bar.trading_day: + if last_trading_day is not None: + self.saving_daily_data(datetime.strptime(last_trading_day, '%Y-%m-%d'), self.cur_capital, + self.max_net_capital, self.total_commission) + last_trading_day = bar.trading_day + + # 第二个交易日,撤单 + self.cancel_orders() + # 更新持仓缓存 + self.update_pos_buffer() + + gc_collect_days += 1 + if gc_collect_days >= 10: + # 执行内存回收 + gc.collect() + sleep(1) + gc_collect_days = 0 + + if self.net_capital < 0: + self.write_error(u'净值低于0,回测停止') + self.output(u'净值低于0,回测停止') + return + + self.write_log(u'bar数据回放完成') + if last_trading_day is not None: + self.saving_daily_data(datetime.strptime(last_trading_day, '%Y-%m-%d'), self.cur_capital, + self.max_net_capital, self.total_commission) + except Exception as ex: + self.write_error(u'回测异常导致停止:{}'.format(str(ex))) + self.write_error(u'{},{}'.format(str(ex), traceback.format_exc())) + print(str(ex), file=sys.stderr) + traceback.print_exc() + return + def run_tick_test(self): """运行tick级别组合回测""" testdays = (self.data_end_date - self.data_start_date).days @@ -432,6 +718,9 @@ class SpreadTestingEngine(BackTestingEngine): self.new_tick(tick) + # 推送至所有tick combiner + [c.on_tick(tick) for c in self.symbol_combiner_dict.get(tick.symbol, [])] + # 结束一个交易日后,更新每日净值 self.saving_daily_data(test_day, self.cur_capital, @@ -463,6 +752,47 @@ class SpreadTestingEngine(BackTestingEngine): self.write_log(u'tick数据回放完成') + def new_bar(self, bar): + """ + 重载new_bar方法 + bar => tick => 合成器 => new_tick + :param bar: + :return: + """ + tick = self.bar_to_tick(bar) + self.new_tick(tick) + # 推送至所有tick combiner + [c.on_tick(tick) for c in self.symbol_combiner_dict.get(tick.symbol, [])] + + def bar_to_tick(self, bar): + """ 通过bar分时bar转换为tick数据 """ + + # tick =》 增加一分钟 + tick = TickData( + gateway_name='backtesting', + symbol=bar.symbol, + exchange=bar.exchange, + datetime=bar.datetime + timedelta(minutes=1) + ) + tick.date = tick.datetime.strftime('%Y-%m-%d') + tick.time = tick.datetime.strftime('%H:%M:%S.000') + tick.trading_day = bar.trading_day if bar.trading_day else get_trading_date(tick.datetime) + tick.volume = bar.volume + tick.open_interest = bar.open_interest + tick.last_price = bar.close_price + tick.last_volume = bar.volume + tick.limit_up = 0 + tick.limit_down = 0 + tick.open_price = 0 + tick.high_price = 0 + tick.low_price = 0 + tick.pre_close = 0 + tick.bid_price_1 = bar.close_price + tick.ask_price_1 = bar.close_price + tick.bid_volume_1 = bar.volume + tick.ask_volume_1 = bar.volume + return tick + def single_test(test_setting: dict, strategy_setting: dict): """ diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py index 3abcdc44..6a7128a0 100644 --- a/vnpy/app/cta_strategy_pro/template.py +++ b/vnpy/app/cta_strategy_pro/template.py @@ -1160,7 +1160,7 @@ class CtaProFutureTemplate(CtaProTemplate): :param trade: :return: """ - self.write_log(u'{},交易更新事件:{},当前持仓:{} ' + self.write_log(u'{},交易更新 =>{},\n 当前持仓:{} ' .format(self.cur_datetime, trade.__dict__, self.position.pos)) @@ -1173,6 +1173,8 @@ class CtaProFutureTemplate(CtaProTemplate): dist_record['volume'] = trade.volume dist_record['price'] = trade.price dist_record['symbol'] = trade.vt_symbol + + # 处理股指锁单 if trade.exchange == Exchange.CFFEX: if trade.direction == Direction.LONG: if abs(self.position.short_pos) >= trade.volume: @@ -1228,7 +1230,7 @@ class CtaProFutureTemplate(CtaProTemplate): def on_order(self, order: OrderData): """报单更新""" # 未执行的订单中,存在是异常,删除 - self.write_log(u'{}报单更新,{}'.format(self.cur_datetime, order.__dict__)) + self.write_log(u'{}报单更新 => {}'.format(self.cur_datetime, order.__dict__)) # 修正order被拆单得情况" self.fix_order(order) @@ -1274,7 +1276,7 @@ class CtaProFutureTemplate(CtaProTemplate): :param order: :return: """ - self.write_log(u'委托单全部完成:{}'.format(order.__dict__)) + self.write_log(u'报单更新 => 委托单全部完成:{}'.format(order.__dict__)) active_order = self.active_orders[order.vt_orderid] # 通过vt_orderid,找到对应的网格 @@ -1330,7 +1332,7 @@ class CtaProFutureTemplate(CtaProTemplate): :param order: :return: """ - self.write_log(u'委托开仓单撤销:{}'.format(order.__dict__)) + self.write_log(u'报单更新 => 委托开仓 => 撤销:{}'.format(order.__dict__)) if not self.trading: if not self.backtesting: @@ -1343,7 +1345,7 @@ class CtaProFutureTemplate(CtaProTemplate): # 直接更新“未完成委托单”,更新volume,retry次数 old_order = self.active_orders[order.vt_orderid] - self.write_log(u'{} 委托信息:{}'.format(order.vt_orderid, old_order)) + self.write_log(u'报单更新 => {} 未完成订单信息:{}'.format(order.vt_orderid, old_order)) old_order['traded'] = order.traded order_vt_symbol = copy(old_order['vt_symbol']) order_volume = old_order['volume'] - old_order['traded'] @@ -1477,7 +1479,7 @@ class CtaProFutureTemplate(CtaProTemplate): else: pre_status = old_order.get('status', Status.NOTTRADED) old_order.update({'status': Status.CANCELLED}) - self.write_log(u'委托单状态:{}=>{}'.format(pre_status, old_order.get('status'))) + self.write_log(u'委托单方式{},状态:{}=>{}'.format(order_type, pre_status, old_order.get('status'))) if grid: if order.vt_orderid in grid.order_ids: grid.order_ids.remove(order.vt_orderid) @@ -1492,7 +1494,7 @@ class CtaProFutureTemplate(CtaProTemplate): def on_order_close_canceled(self, order: OrderData): """委托平仓单撤销""" - self.write_log(u'委托平仓单撤销:{}'.format(order.__dict__)) + self.write_log(u'报单更新 => 委托平仓 => 撤销:{}'.format(order.__dict__)) if order.vt_orderid not in self.active_orders: self.write_error(u'{}不在未完成的委托单中:{}。'.format(order.vt_orderid, self.active_orders)) @@ -1504,7 +1506,7 @@ class CtaProFutureTemplate(CtaProTemplate): # 直接更新“未完成委托单”,更新volume,Retry次数 old_order = self.active_orders[order.vt_orderid] - self.write_log(u'{} 订单信息:{}'.format(order.vt_orderid, old_order)) + self.write_log(u'报单更新 => {} 未完成订单信息:{}'.format(order.vt_orderid, old_order)) old_order['traded'] = order.traded # order_time = old_order['order_time'] order_vt_symbol = copy(old_order['vt_symbol']) @@ -1692,13 +1694,13 @@ class CtaProFutureTemplate(CtaProTemplate): if order_status in [Status.NOTTRADED, Status.SUBMITTING] and ( order_type == OrderType.LIMIT or '.SPD' in order_vt_symbol): if over_seconds > self.cancel_seconds or force: # 超过设置的时间还未成交 - self.write_log(u'超时{}秒未成交,取消委托单:vt_orderid:{},order:{}' + self.write_log(u'撤单逻辑 => 超时{}秒未成交,取消委托单:vt_orderid:{},order:{}' .format(over_seconds, vt_orderid, order_info)) order_info.update({'status': Status.CANCELLING}) self.active_orders.update({vt_orderid: order_info}) ret = self.cancel_order(str(vt_orderid)) if not ret: - self.write_log(u'撤单失败,更新状态为撤单成功') + self.write_log(u'撤单逻辑 => 撤单失败,更新状态为撤单成功') order_info.update({'status': Status.CANCELLED}) self.active_orders.update({vt_orderid: order_info}) if order_grid: @@ -1710,13 +1712,13 @@ class CtaProFutureTemplate(CtaProTemplate): # 处理状态为‘撤销’的委托单 elif order_status == Status.CANCELLED: - self.write_log(u'委托单{}已成功撤单,删除{}'.format(vt_orderid, order_info)) + self.write_log(u'撤单逻辑 => 委托单{}已成功撤单,将删除未完成订单{}'.format(vt_orderid, order_info)) canceled_ids.append(vt_orderid) if reopen: # 撤销的委托单,属于开仓类,需要重新委托 if order_info['offset'] == Offset.OPEN: - self.write_log(u'超时撤单后,重新开仓') + self.write_log(u'撤单逻辑 => 重新开仓') # 开空委托单 if order_info['direction'] == Direction.SHORT: short_price = self.cur_mi_price - self.price_tick @@ -1788,17 +1790,29 @@ class CtaProFutureTemplate(CtaProTemplate): else: self.write_error(u'撤单后,重新委托平空仓失败') else: + self.write_log(u'撤单逻辑 => 无须重新开仓') if order_info['offset'] == Offset.OPEN \ and order_grid \ - and len(order_grid.order_ids) == 0 \ - and order_grid.traded_volume == 0: - self.write_log(u'移除委托网格{}'.format(order_grid.__dict__)) - order_info['grid'] = None - self.gt.remove_grids_by_ids(direction=order_grid.direction, ids=[order_grid.id]) + and len(order_grid.order_ids) == 0: + + if order_info['traded'] == 0 and order_grid.traded_volume == 0: + self.write_log(u'撤单逻辑 => 无任何成交 => 移除委托网格{}'.format(order_grid.__dict__)) + order_info['grid'] = None + self.gt.remove_grids_by_ids(direction=order_grid.direction, ids=[order_grid.id]) + elif order_info['traded'] > 0: + self.write_log('撤单逻辑 = > 部分开仓') + if order_grid.traded_volume < order_info['traded']: + self.write_log('撤单逻辑 = > 调整网格开仓数 {} => {}'.format(order_grid.traded_volume, order_grid['traded'] )) + order_grid.traded_volume = order_info['traded'] + self.write_log(f'撤单逻辑 => 调整网格委托状态=> False, 开仓状态:True, 开仓数量:{order_grid.volume}=>{order_grid.traded_volume}') + order_grid.order_status = False + order_grid.open_status = True + order_grid.volume = order_grid.traded_volume + order_grid.traded_volume = 0 # 删除撤单的订单 for vt_orderid in canceled_ids: - self.write_log(u'删除orderID:{0}'.format(vt_orderid)) + self.write_log(u'撤单逻辑 => 删除未完成订单:{}'.format(vt_orderid)) self.active_orders.pop(vt_orderid, None) if len(self.active_orders) == 0: diff --git a/vnpy/app/cta_strategy_pro/template_spread.py b/vnpy/app/cta_strategy_pro/template_spread.py index 7f2720e1..e19f7ac3 100644 --- a/vnpy/app/cta_strategy_pro/template_spread.py +++ b/vnpy/app/cta_strategy_pro/template_spread.py @@ -1163,19 +1163,19 @@ class CtaSpreadTemplate(CtaTemplate): return True # 检查流动性缺失 - if not self.cur_act_tick.bid_price_1 <= self.cur_act_tick.last_price <= self.cur_act_tick.ask_price_1 \ - and self.cur_act_tick.volume > 0: - self.write_log(u'流动性缺失导致leg1最新价{0} /V:{1}超出买1 {2}卖1 {3}范围,' - .format(self.cur_act_tick.last_price, self.cur_act_tick.volume, - self.cur_act_tick.bid_price_1, self.cur_act_tick.ask_price_1)) - return False - - if not self.cur_pas_tick.bid_price_1 <= self.cur_pas_tick.last_price <= self.cur_pas_tick.ask_price_1 \ - and self.cur_pas_tick.volume > 0: - self.write_log(u'流动性缺失导致leg2最新价{0} /V:{1}超出买1 {2}卖1 {3}范围,' - .format(self.cur_pas_tick.last_price, self.cur_pas_tick.volume, - self.cur_pas_tick.bid_price_1, self.cur_pas_tick.ask_price_1)) - return False + # if not self.cur_act_tick.bid_price_1 <= self.cur_act_tick.last_price <= self.cur_act_tick.ask_price_1 \ + # and self.cur_act_tick.volume > 0: + # self.write_log(u'流动性缺失导致leg1最新价{0} /V:{1}超出买1 {2}卖1 {3}范围,' + # .format(self.cur_act_tick.last_price, self.cur_act_tick.volume, + # self.cur_act_tick.bid_price_1, self.cur_act_tick.ask_price_1)) + # return False + # + # if not self.cur_pas_tick.bid_price_1 <= self.cur_pas_tick.last_price <= self.cur_pas_tick.ask_price_1 \ + # and self.cur_pas_tick.volume > 0: + # self.write_log(u'流动性缺失导致leg2最新价{0} /V:{1}超出买1 {2}卖1 {3}范围,' + # .format(self.cur_pas_tick.last_price, self.cur_pas_tick.volume, + # self.cur_pas_tick.bid_price_1, self.cur_pas_tick.ask_price_1)) + # return False # 如果设置了方向和volume,检查是否满足 if direction==Direction.LONG: diff --git a/vnpy/component/cta_line_bar.py b/vnpy/component/cta_line_bar.py index 36bc7511..825bde04 100644 --- a/vnpy/component/cta_line_bar.py +++ b/vnpy/component/cta_line_bar.py @@ -32,6 +32,12 @@ from vnpy.trader.constant import Interval, Color from vnpy.trader.utility import round_to, get_trading_date, get_underlying_symbol +try: + from vnpy.component.chanlun import ChanGraph, ChanLibrary +except Exception as ex: + print('can not import pyChanlun from vnpy.component.chanlun') + + def get_cta_bar_type(bar_name: str): """根据名称,返回K线类型和K线周期""" @@ -141,7 +147,7 @@ class CtaLineBar(object): CB_ON_PERIOD = 'cb_on_period' # 参数列表,保存了参数的名称 - paramList = ['vt_symbol'] + param_list = ['vt_symbol'] def __init__(self, strategy, cb_on_bar, setting=None): @@ -176,6 +182,7 @@ class CtaLineBar(object): # (实时运行时,或者addbar小于bar得周期时,不包含最后一根正在合成的Bar) # 目标bar合成成功后,才会更新以下序列 + self.index_list = [] self.open_array = np.zeros(self.max_hold_bars) # 与lineBar一致得开仓价清单 self.open_array[:] = np.nan self.high_array = np.zeros(self.max_hold_bars) # 与lineBar一致得最高价清单 @@ -195,7 +202,7 @@ class CtaLineBar(object): self.export_filename = None # 数据要导出的目标文件夹 self.export_fields = [] # 定义要导出的数据字段 - # 创建本类型bar的内部变量,以及添加所有指标输入参数,到self.paramList列表 + # 创建本类型bar的内部变量,以及添加所有指标输入参数,到self.param_list列表 self.init_properties() # 初始化定义所有的指标输入参数,以及指标生成的数据 @@ -226,7 +233,7 @@ class CtaLineBar(object): if self.price_tick < 1: exponent = decimal.Decimal(str(self.price_tick)) self.round_n = max(abs(exponent.as_tuple().exponent) + 2, 4) - self.write_log(f'round_n: {self.round_n}') + #self.write_log(f'round_n: {self.round_n}') # 导入卡尔曼过滤器 if self.para_active_kf: @@ -237,9 +244,18 @@ class CtaLineBar(object): initial_state_covariance=1, observation_covariance=1, transition_covariance=0.01) + except Exception: self.write_log(u'导入卡尔曼过滤器失败,需先安装 pip install pykalman') self.para_active_kf = False + self.para_active_kf2 = False + + if self.para_active_chanlun: + try: + self.chan_lib = ChanLibrary(bi_style=2, duan_style=1, debug=False) + except: + self.write_log(u'导入缠论组件失败') + self.chan_lib = None def register_event(self, event_type, cb_func): """注册事件回调函数""" @@ -248,87 +264,96 @@ class CtaLineBar(object): self.cb_on_period = cb_func def init_param_list(self): - """初始化添加,本类型bar的内部变量,以及添加所有指标输入参数,到self.paramList列表""" + """初始化添加,本类型bar的内部变量,以及添加所有指标输入参数,到self.param_list列表""" # ------- 本类型bar的内部变量 --------- - self.paramList.append('name') # K线的名称 - self.paramList.append('bar_interval') # bar的周期数量 - self.paramList.append('interval') # bar的类型 - self.paramList.append('mode') # tick/bar模式 - self.paramList.append('is_7x24') #是否为7X24小时运行的bar(一般为数字货币) - self.paramList.append('price_tick') # 最小跳动,用于处理指数等不一致的价格 - self.paramList.append('underly_symbol') # 短合约, + self.param_list.append('name') # K线的名称 + self.param_list.append('bar_interval') # bar的周期数量 + self.param_list.append('interval') # bar的类型 + self.param_list.append('mode') # tick/bar模式 + self.param_list.append('is_7x24') #是否为7X24小时运行的bar(一般为数字货币) + self.param_list.append('price_tick') # 最小跳动,用于处理指数等不一致的价格 + self.param_list.append('underly_symbol') # 短合约, # ---------- 下方为指标输入参数 --------------- - self.paramList.append('para_pre_len') # 唐其安通道的长度(前高/前低) + self.param_list.append('para_pre_len') # 唐其安通道的长度(前高/前低) - self.paramList.append('para_ma1_len') # 三条均线 - self.paramList.append('para_ma2_len') - self.paramList.append('para_ma3_len') + self.param_list.append('para_ma1_len') # 三条均线 + self.param_list.append('para_ma2_len') + self.param_list.append('para_ma3_len') - self.paramList.append('para_ema1_len') # 三条EMA均线 - self.paramList.append('para_ema2_len') - self.paramList.append('para_ema3_len') + self.param_list.append('para_ema1_len') # 三条EMA均线 + self.param_list.append('para_ema2_len') + self.param_list.append('para_ema3_len') - self.paramList.append('para_dmi_len') - self.paramList.append('para_dmi_max') + self.param_list.append('para_dmi_len') + self.param_list.append('para_dmi_max') - self.paramList.append('para_atr1_len') # 三个波动率 - self.paramList.append('para_atr2_len') - self.paramList.append('para_atr3_len') + self.param_list.append('para_atr1_len') # 三个波动率 + self.param_list.append('para_atr2_len') + self.param_list.append('para_atr3_len') - self.paramList.append('para_vol_len') # 成交量平均 + self.param_list.append('para_vol_len') # 成交量平均 - self.paramList.append('para_rsi1_len') # 2组 RSI摆动指标 - self.paramList.append('para_rsi2_len') + self.param_list.append('para_jbjs_threshold') # 大单判断比例 (机构买、机构卖指标) + self.param_list.append('para_outstanding_capitals') # 股票的流通市值 (机构买、机构卖指标) - self.paramList.append('para_cmi_len') # + self.param_list.append('para_active_tt') # 计算日内均价线 - self.paramList.append('para_boll_len') # 布林通道长度(文华计算方式) - self.paramList.append('para_boll_tb_len') # 布林通道长度(tb计算方式) - self.paramList.append('para_boll_std_rate') # 标准差倍率,一般为2 - self.paramList.append('para_boll2_len') # 第二条布林通道 - self.paramList.append('para_boll2_tb_len') - self.paramList.append('para_boll2_std_rate') + self.param_list.append('para_rsi1_len') # 2组 RSI摆动指标 + self.param_list.append('para_rsi2_len') - self.paramList.append('para_kdj_len') - self.paramList.append('para_kdj_tb_len') - self.paramList.append('para_kdj_slow_len') - self.paramList.append('para_kdj_smooth_len') + self.param_list.append('para_cmi_len') # - self.paramList.append('para_cci_len') + self.param_list.append('para_boll_len') # 布林通道长度(文华计算方式) + self.param_list.append('para_boll_tb_len') # 布林通道长度(tb计算方式) + self.param_list.append('para_boll_std_rate') # 标准差倍率,一般为2 + self.param_list.append('para_boll2_len') # 第二条布林通道 + self.param_list.append('para_boll2_tb_len') + self.param_list.append('para_boll2_std_rate') - self.paramList.append('para_macd_fast_len') - self.paramList.append('para_macd_slow_len') - self.paramList.append('para_macd_signal_len') + self.param_list.append('para_kdj_len') + self.param_list.append('para_kdj_tb_len') + self.param_list.append('para_kdj_slow_len') + self.param_list.append('para_kdj_smooth_len') - self.paramList.append('para_active_kf') # 卡尔曼均线 + self.param_list.append('para_cci_len') - self.paramList.append('para_sar_step') - self.paramList.append('para_sar_limit') + self.param_list.append('para_macd_fast_len') + self.param_list.append('para_macd_slow_len') + self.param_list.append('para_macd_signal_len') - self.paramList.append('para_active_skd') # 摆动指标 - self.paramList.append('para_skd_fast_len') - self.paramList.append('para_skd_slow_len') - self.paramList.append('para_skd_low') - self.paramList.append('para_skd_high') + self.param_list.append('para_active_kf') # 卡尔曼均线 + self.param_list.append('para_kf_obscov_len') # 卡尔曼均线观测方差的长度 + self.param_list.append('para_active_kf2') # 卡尔曼均线2 + self.param_list.append('para_kf2_obscov_len') # 卡尔曼均线2观测方差的长度 - self.paramList.append('para_active_yb') # 重心线 - self.paramList.append('para_yb_len') - self.paramList.append('para_yb_ref') + self.param_list.append('para_sar_step') + self.param_list.append('para_sar_limit') - self.paramList.append('para_golden_n') # 黄金分割 + self.param_list.append('para_active_skd') # 摆动指标 + self.param_list.append('para_skd_fast_len') + self.param_list.append('para_skd_slow_len') + self.param_list.append('para_skd_low') + self.param_list.append('para_skd_high') - self.paramList.append('para_active_area') + self.param_list.append('para_active_yb') # 重心线 + self.param_list.append('para_yb_len') + self.param_list.append('para_yb_ref') - self.paramList.append('para_bias_len') - self.paramList.append('para_bias2_len') - self.paramList.append('para_bias3_len') + self.param_list.append('para_golden_n') # 黄金分割 - self.paramList.append('para_skdj_n') - self.paramList.append('para_skdj_m') + self.param_list.append('para_active_area') - self.paramList.append('para_bd_len') + self.param_list.append('para_bias_len') + self.param_list.append('para_bias2_len') + self.param_list.append('para_bias3_len') + self.param_list.append('para_skdj_n') + self.param_list.append('para_skdj_m') + + self.param_list.append('para_bd_len') + + self.param_list.append('para_active_chanlun') # 激活缠论 def init_properties(self): @@ -350,7 +375,7 @@ class CtaLineBar(object): """移除Pickle dump()时不支持的Attribute""" state = self.__dict__.copy() # Remove the unpicklable entries. - remove_keys = ['strategy', 'cb_on_bar', 'cb_on_period'] + remove_keys = ['strategy', 'cb_on_bar', 'cb_on_period', 'chan_lib'] for key in self.__dict__.keys(): if key in remove_keys: del state[key] @@ -363,6 +388,8 @@ class CtaLineBar(object): def restore(self, state): """从Pickle中恢复数据""" for key in state.__dict__.keys(): + if key in [ 'chan_lib']: + continue self.__dict__[key] = state.__dict__[key] def init_indicators(self): @@ -388,6 +415,11 @@ class CtaLineBar(object): self.para_vol_len = 0 # 14 # 平均交易量的计算周期 + self.para_jbjs_threshold = 0 # 大单判断比例(机构买、机构卖指标) + self.para_outstanding_capitals = 0 # 股票的流通股数-万万股(机构买、机构卖指标) + + self.para_active_tt = False # 是否激活均价线计算 + self.para_rsi1_len = 0 # 7 # RSI 相对强弱指数(快曲线) self.para_rsi2_len = 0 # 14 # RSI 相对强弱指数(慢曲线) @@ -408,11 +440,15 @@ class CtaLineBar(object): self.para_cci_len = 0 # 计算CCI的K线周期 - self.para_macd_fast_len = 0 # 计算MACD的K线周期(26,12,9) + self.para_macd_fast_len = 0 # 计算MACD的K线周期(12,26,9) self.para_macd_slow_len = 0 # 慢线周期 self.para_macd_signal_len = 0 # 平滑周期 self.para_active_kf = False # 是否激活卡尔曼均线计算 + self.para_kf_obscov_len = 1 # t+1时刻的观测协方差 + + self.para_active_kf2 = False + self.para_kf2_obscov_len = 20 # t+1时刻的观测协方差 self.para_sar_step = 0 # 抛物线的参数 self.para_sar_limit = 0 # 抛物线参数 @@ -501,6 +537,14 @@ class CtaLineBar(object): # K线的交易量平均 self.line_vol_ma = [] # K 线的交易量平均 + # 机构买、机构卖指标 + self.line_jb = [] # 机构买 + self.line_js = [] # 机构卖 + + # 均价线指标 + self.line_tt = [] # 分时均价线 + self.line_tv = [] # 日内的累计成交量 + # K线的RSI计算数据 self.line_rsi1 = [] # 记录K线对应的RSI数值,只保留para_rsi1_len*8 self.line_rsi2 = [] # 记录K线对应的RSI数值,只保留para_rsi2_len*8 @@ -559,6 +603,8 @@ class CtaLineBar(object): self.line_k = [] # K为快速指标 self.line_d = [] # D为慢速指标 self.line_j = [] # + self.line_j_ema1 = [] # + self.line_j_ema2 = [] # self.kdj_top_list = [] # 记录KDJ最高峰,只保留 para_kdj_len个 self.kdj_buttom_list = [] # 记录KDJ的最低谷,只保留 para_kdj_len个 self.line_rsv = [] # RSV @@ -566,6 +612,12 @@ class CtaLineBar(object): self.cur_k = 0 # bar内计算时,最后一个未关闭的bar的实时K值 self.cur_d = 0 # bar内计算时,最后一个未关闭的bar的实时值 self.cur_j = 0 # bar内计算时,最后一个未关闭的bar的实时J值 + self._rt_rsv = None + self._rt_k = None + self._rt_d = None + self._rt_j = None + self._rt_j_ema1 = None + self._rt_j_ema2 = None self.cur_kd_count = 0 # > 0, 金叉, < 0 死叉 self.cur_kd_cross = 0 # 最近一次金叉/死叉的点位 @@ -573,6 +625,7 @@ class CtaLineBar(object): # K线的MACD计算数据(26,12,9) self.line_dif = [] # DIF = EMA12 - EMA26,即为talib-MACD返回值macd + self.dict_dif = {} # datetime str: dif mapping self.line_dea = [] # DEA = (前一日DEA X 8/10 + 今日DIF X 2/10),即为talib-MACD返回值 self.line_macd = [] # (dif-dea)*2,但是talib中MACD的计算是bar = (dif-dea)*1,国内一般是乘以2 self.macd_segment_list = [] # macd 金叉/死叉的段列表,记录价格的最高/最低,Dif的最高,最低,Macd的最高/最低,Macd面接 @@ -600,11 +653,15 @@ class CtaLineBar(object): # 卡尔曼过滤器 self.kf = None + self.kf2 = None self.line_state_mean = [] # 卡尔曼均线 self.line_state_upper = [] # 卡尔曼均线+2标准差 self.line_state_lower = [] # 卡尔曼均线-2标准差 self.line_state_covar = [] # 方差 self.cur_state_std = None + self.line_state_mean2 = [] # 卡尔曼均线2 + self.line_state_covar2 = [] # 方差 + self.kf12_count = 0 # 卡尔曼均线金叉死叉 # SAR 抛物线 self.cur_sar_direction = '' # up/down @@ -685,10 +742,20 @@ class CtaLineBar(object): self.cur_skdj_k = 0 self.cur_skdj_d = 0 + self.para_active_chanlun = False # 是否激活缠论 + self.chan_lib = None + self.chan_graph = None + self.chanlun_calculated = False # 当前bar是否计算过 + self._fenxing_list = [] # 分型列表 + self._bi_list = [] # 笔列表 + self._bi_zs_list = [] # 笔中枢列表 + self._duan_list = [] # 段列表 + self._duan_zs_list = [] # 段中枢列表 + def set_params(self, setting: dict = {}): """设置参数""" d = self.__dict__ - for key in self.paramList: + for key in self.param_list: if key in setting: d[key] = setting[key] @@ -712,7 +779,8 @@ class CtaLineBar(object): if self.cur_tick.last_price is None or self.cur_tick.last_price == 0: if self.cur_tick.ask_price_1 == 0 and self.cur_tick.bid_price_1 == 0: return - + if np.isnan(self.cur_tick.ask_price_1) or np.isnan(self.cur_tick.bid_price_1): + return self.cur_price = round_to((self.cur_tick.ask_price_1 + self.cur_tick.bid_price_1) / 2, self.price_tick) self.cur_tick.last_price = self.cur_price else: @@ -817,7 +885,9 @@ class CtaLineBar(object): bar_mid4 = round((2 * bar.close_price + bar.high_price + bar.low_price) / 4, self.round_n) bar_mid5 = round((2 * bar.close_price + bar.open_price + bar.high_price + bar.low_price) / 5, self.round_n) - # 扩展open,close,high,low numpy array列表 平移更新序列最新值 + # 扩展时间索引,open,close,high,low numpy array列表 平移更新序列最新值 + self.index_list.append(bar.datetime.strftime('%Y-%m-%d %H:%M:%S')) + self.open_array[:-1] = self.open_array[1:] self.open_array[-1] = bar.open_price @@ -843,6 +913,8 @@ class CtaLineBar(object): self.bar_len = len(self.line_bar) # 当前K线得真实数量(包含已经合成以及正在合成的bar) if self.bar_len > self.max_hold_bars: del self.line_bar[0] + self.dict_dif.pop(self.index_list[0], None) + del self.index_list[0] self.bar_len = self.bar_len - 1 # 删除了最前面的bar,bar长度少一位 self.__count_pre_high_low() @@ -851,6 +923,8 @@ class CtaLineBar(object): self.__count_dmi() self.__count_atr() self.__count_vol_ma() + self.__count_jb_js() + self.__count_time_trend() self.__count_rsi() self.__count_cmi() self.__count_kdj() @@ -871,6 +945,7 @@ class CtaLineBar(object): self.export_to_csv(bar) self.rt_executed = False # 是否 启动实时计算得函数 + self.chanlun_calculated = False # 回调上层调用者,将合成的 x分钟bar,回调给策略 def on_bar_x(self, bar: BarData):函数 if self.cb_on_bar: @@ -1000,6 +1075,15 @@ class CtaLineBar(object): if self.para_vol_len > 0 and len(self.line_vol_ma) > 0: msg = msg + u',AvgVol({0}):{1}'.format(self.para_vol_len, self.line_vol_ma[-1]) + if self.para_jbjs_threshold > 0 and len(self.line_jb) > 0 and len(self.line_js) > 0: + msg = msg + u',JBJS({0} {1}):{2} {3}'.format(self.para_jbjs_threshold, + self.para_outstanding_capitals, + self.line_jb[-1], + self.line_js[-1]) + + if self.para_active_tt > 0 and len(self.line_tt) > 0: + msg = msg + u',TT:{0}'.format(self.line_tt[-1]) + if self.para_rsi1_len > 0 and len(self.line_rsi1) > 0: msg = msg + u',Rsi({0}):{1}'.format(self.para_rsi1_len, self.line_rsi1[-1]) @@ -1339,6 +1423,51 @@ class CtaLineBar(object): else: return cur_sar, cur_af + def get_sar2(self, direction, cur_sar, cur_af=0, sar_limit=0.2, sar_step=0.02, restore=False): + """ + 抛物线计算方法(跟随K线加速度) + :param direction: Direction + :param cur_sar: 当前抛物线价格 + :param cur_af: 当前抛物线价格 + :param sar_limit: 最大加速范围 + :param sar_step: 加速因子 + :param restore: 恢复初始加速因子 + :return: 新的 + """ + if np.isnan(self.high_array[-1]): + return cur_sar, cur_af + # 向上抛物线 + if direction == Direction.LONG: + # K线每次新高就更新一次af + if self.high_array[-1] > self.high_array[-2]: + af = cur_af + min(sar_step, sar_limit - cur_af) + else: + if restore: + # 恢复首次初始值 + af = sar_step + else: + # 保持计算因子不变 + af = cur_af + # K线每更新一次,就运行一次 + ep = self.high_array[-1] + sar = cur_sar + af * (ep - cur_sar) + return sar, af + # 向下抛物线 + elif direction == Direction.SHORT: + # K线每次新低就更新一次af + if self.low_array[-1] < self.low_array[-2]: + af = cur_af + min(sar_step, sar_limit - cur_af) + else: + #af = sar_step + af = cur_af + + ep = self.low_array[-1] + sar = cur_sar + af * (ep - cur_sar) + + return sar, af + else: + return cur_sar, cur_af + def __count_sar(self): """计算K线的SAR""" @@ -1480,9 +1609,9 @@ class CtaLineBar(object): # 1、lineBar满足长度才执行计算 if self.bar_len < min(7, self.para_ma1_len, self.para_ma2_len, self.para_ma3_len) + 2: - self.write_log(u'数据未充分,当前Bar数据数量:{0},计算MA需要:{1}'. - format(self.bar_len, - min(7, self.para_ma1_len, self.para_ma2_len, self.para_ma3_len) + 2)) + # self.write_log(u'数据未充分,当前Bar数据数量:{0},计算MA需要:{1}'. + # format(self.bar_len, + # min(7, self.para_ma1_len, self.para_ma2_len, self.para_ma3_len) + 2)) return # 计算第一条MA均线 @@ -2025,6 +2154,111 @@ class CtaLineBar(object): del self.line_vol_ma[0] self.line_vol_ma.append(avgVol) + def __count_jb_js(self): + """ 计算机构买(jb)机构卖(js) + 设1分钟的成交金额为M,M=这一分钟成交量*close价 + 成交额:= VOLUME * C + 流通市值:= CAPITAL * C + 成交比例:= 成交额 / 流通市值 * 10000 (放大10000倍) + 成交额比例:= 成交额 / 100 / 160 + 大单判断标准 + 如果股票流通市值小于等于200亿,则1分钟的成交比例> 1时,这1分钟的成交量为大单 + 如果股票流通市值大于200亿,则1分钟的成交额比例>1时,这1分钟的成交量为大单 + 建立两条资金线 + JB机构买 + 从9:30开盘一直累计,累计规则:如果这1分钟为大单,且这1分钟的close价比前1分钟高,则累计该分钟的成交额,否则累计0 + JS机构卖 + 从9:30开盘一直累计,累计规则:如果这1分钟为大单,且这1分钟的close价比前1分钟低,则累计该分钟的成交额,否则累计0 + """ + + # 大单判断比例大于0才执行计算 + if self.para_jbjs_threshold <= 0: # 不计算 + return + + if len(self.line_bar) > 0: + # 判断是否为大单 + is_big_order = False + outstanding_capitals = self.para_outstanding_capitals * 10000 * 10000 * self.line_bar[-1].close_price # 流通市值 + if outstanding_capitals > 200 * 100000000: + # 成交额比例 + volume_ratio = self.line_bar[-1].volume * self.line_bar[-1].close_price / 100 / 160 + + if volume_ratio > self.para_jbjs_threshold: + is_big_order = True + else: + # 成交比例 + if outstanding_capitals == 0: + # 股票的流通市值应该大于0 + self.para_outstanding_capitals = 1 + order_ratio = self.line_bar[-1].volume * self.line_bar[-1].close_price / outstanding_capitals * 10000 + + if order_ratio > self.para_jbjs_threshold: + is_big_order = True + + # 计算机构买、机构卖指标 + if len(self.line_jb) == 0 or (self.line_bar[-1].trading_day != self.line_bar[-2].trading_day): + # # 新的一天,重新计算JB/JS + # if is_big_order is True: + # if self.line_bar[-1].close_price > self.line_bar[-1].open_price: + # jb = self.line_bar[-1].volume + # js = 0 + # else: + # jb = 0 + # js = self.line_bar[-1].volume + # else: + # jb = 0 + # js = 0 + + # 新的一天,重新计算JB/JS时,忽略第一根K线 + jb = 0 + js = 0 + + if len(self.line_jb) > self.max_hold_bars: + del self.line_jb[0] + del self.line_js[0] + self.line_jb.append(jb) + self.line_js.append(js) + else: + # 日内,累计JB/JS + jb = self.line_jb[-1] + js = self.line_js[-1] + if is_big_order is True: + if self.line_bar[-1].close_price > self.line_bar[-2].close_price: + jb = self.line_bar[-1].volume * self.line_bar[-1].close_price / 10000 + self.line_jb[-1] + js = self.line_js[-1] + elif self.line_bar[-1].close_price < self.line_bar[-2].close_price: + jb = self.line_jb[-1] + js = self.line_bar[-1].volume * self.line_bar[-1].close_price / 10000 + self.line_js[-1] + + if len(self.line_jb) > self.max_hold_bars: + del self.line_jb[0] + del self.line_js[0] + self.line_jb.append(jb) + self.line_js.append(js) + + def __count_time_trend(self): + """ 计算日内分时图""" + if not self.para_active_tt: + return + + if len(self.line_bar) > 0: + # 计算均价线指标 + if len(self.line_tt) == 0 or (self.line_bar[-1].trading_day != self.line_bar[-2].trading_day): + # 新的一天,重新计算 + total_volume = self.line_bar[-1].volume + time_trend = self.line_bar[-1].close_price + else: + # 日内,累计 + total_volume = self.line_bar[-1].volume + self.line_tv[-1] + time_trend = (self.line_bar[-1].close_price * self.line_bar[-1].volume + + self.line_tt[-1] * self.line_tv[-1]) / total_volume + + if len(self.line_tt) > self.max_hold_bars: + del self.line_tt[0] + del self.line_tv[0] + self.line_tt.append(time_trend) + self.line_tv.append(total_volume) + def __count_rsi(self): """计算K线的RSI""" if self.para_rsi1_len <= 0 and self.para_rsi2_len <= 0: @@ -2134,12 +2368,19 @@ class CtaLineBar(object): self.write_log(u'数据未充分,当前Bar数据数量:{0},计算Boll需要:{1}'. format(len(self.line_bar), min(14, self.para_boll_len) + 1)) else: - bollLen = min(self.bar_len - 1, self.para_boll_len) + boll_len = min(self.bar_len - 1, self.para_boll_len) # 不包含当前最新的Bar - upper_list, middle_list, lower_list = ta.BBANDS(self.close_array, - timeperiod=bollLen, nbdevup=self.para_boll_std_rate, + try: + upper_list, middle_list, lower_list = ta.BBANDS(self.close_array, + timeperiod=boll_len, nbdevup=self.para_boll_std_rate, nbdevdn=self.para_boll_std_rate, matype=0) + except Exception as ex: + self.write_log(f'计算布林异常:{str(ex)}') + self.write_log(''.format(self.close_array[-boll_len:])) + print(f'计算布林异常:{str(ex)}',file=sys.stderr) + return + if np.isnan(upper_list[-1]): return @@ -2256,7 +2497,7 @@ class CtaLineBar(object): self.write_log(u'数据未充分,当前Bar数据数量:{0},计算Boll需要:{1}'. format(len(self.line_bar), min(14, self.para_boll_tb_len) + 1)) else: - bollLen = min(self.bar_len - 1, self.para_boll_tb_len) + boll_len = min(self.bar_len - 1, self.para_boll_tb_len) # 不包含当前最新的Bar @@ -2270,10 +2511,10 @@ class CtaLineBar(object): del self.line_boll_std[0] # 1标准差 - std = np.std(self.close_array[-2 * bollLen:], ddof=1) + std = np.std(self.close_array[-2 * boll_len:], ddof=1) self.line_boll_std.append(std) - middle = np.mean(self.close_array[-2 * bollLen:]) + middle = np.mean(self.close_array[-2 * boll_len:]) self.line_boll_middle.append(middle) # 中轨 self.cur_middle = middle - middle % self.price_tick # 中轨取整 @@ -2647,6 +2888,141 @@ class CtaLineBar(object): self.__update_kd_cross() + # J 的EMA值 + j_ema1 = j + j_ema2 = j + if len(self.line_j) >= 30: + j_ema1 = self.__ema(self.__ema(self.__ema(self.line_j[-30:], 2), 2), 2)[-1] + j_ema2 = self.__ema(self.__ema(self.__ema(self.line_j[-30:], 4), 2), 2)[-1] + + if len(self.line_j_ema1) > self.max_hold_bars: + del self.line_j_ema1[0] + del self.line_j_ema2[0] + self.line_j_ema1.append(j_ema1) + self.line_j_ema2.append(j_ema2) + + + def rt_count_kdj(self): + """ + (实时)Kdj计算方法: + 第一步 计算RSV:即未成熟随机值(Raw Stochastic Value)。 + RSV 指标主要用来分析市场是处于“超买”还是“超卖”状态: + - RSV高于80%时候市场即为超买状况,行情即将见顶,应当考虑出仓; + - RSV低于20%时候,市场为超卖状况,行情即将见底,此时可以考虑加仓。 +     N日RSV=(N日收盘价-N日内最低价)÷(N日内最高价-N日内最低价)×100% +     第二步 计算K值:当日K值 = 2/3前1日K值 + 1/3当日RSV ;  +     第三步 计算D值:当日D值 = 2/3前1日D值 + 1/3当日K值;  +     第四步 计算J值:当日J值 = 3当日K值 - 2当日D值.  + """ + if self.para_kdj_len <= 0: + return + + if len(self.line_bar) < self.para_kdj_len + 1: + self.write_log(u'数据未充分,当前Bar数据数量:{0},计算KDJ需要:{1}'.format(len(self.line_bar), self.para_kdj_len + 1)) + return + + if self.para_kdj_slow_len == 0: + self.para_kdj_slow_len = 3 + if self.para_kdj_smooth_len == 0: + self.para_kdj_smooth_len = 3 + + inputKdjLen = min(self.para_kdj_len, self.bar_len - 1) - 1 + + hhv = max(np.append(self.high_array[-inputKdjLen:], [self.line_bar[-1].high_price])) + llv = min(np.append(self.low_array[-inputKdjLen:], [self.line_bar[-1].low_price])) + if np.isnan(hhv) or np.isnan(llv): + return + + if len(self.line_k) > 0: + lastK = self.line_k[-1] + if np.isnan(lastK): + lastK = 0 + else: + lastK = 0 + + if len(self.line_d) > 0: + lastD = self.line_d[-1] + if np.isnan(lastD): + lastD = 0 + else: + lastD = 0 + + if hhv == llv: + rsv = 50 + else: + rsv = (self.line_bar[-1].close_price - llv) / (hhv - llv) * 100 + + k = (self.para_kdj_slow_len - 1) * lastK / self.para_kdj_slow_len + rsv / self.para_kdj_slow_len + if k < 0: + k = 0 + if k > 100: + k = 100 + + d = (self.para_kdj_smooth_len - 1) * lastD / self.para_kdj_smooth_len + k / self.para_kdj_smooth_len + if d < 0: + d = 0 + if d > 100: + d = 100 + + j = self.para_kdj_smooth_len * k - (self.para_kdj_smooth_len - 1) * d + + self._rt_rsv = rsv + self._rt_k = k + self._rt_d = d + self._rt_j = j + + # J 的EMA值 + j_ema1 = j + j_ema2 = j + if len(self.line_j) >= 30: + j_ema1 = self.__ema(self.__ema(self.__ema(np.append(self.line_j[-30:], [j]), 2), 2), 2)[-1] + j_ema2 = self.__ema(self.__ema(self.__ema(np.append(self.line_j[-30:], [j]), 4), 2), 2)[-1] + + self._rt_j_ema1 = j_ema1 + self._rt_j_ema2 = j_ema2 + + @property + def rt_rsv(self): + self.check_rt_funcs(self.rt_count_kdj) + if self._rt_rsv is None and len(self.line_k) > 0: + return self.line_rsv[-1] + return self._rt_rsv + + @property + def rt_k(self): + self.check_rt_funcs(self.rt_count_kdj) + if self._rt_k is None and len(self.line_k) > 0: + return self.line_k[-1] + return self._rt_k + + @property + def rt_d(self): + self.check_rt_funcs(self.rt_count_kdj) + if self._rt_d is None and len(self.line_d) > 0: + return self.line_d[-1] + return self._rt_d + + @property + def rt_j(self): + self.check_rt_funcs(self.rt_count_kdj) + if self._rt_j is None and len(self.line_j) > 0: + return self.line_j[-1] + return self._rt_j + + @property + def rt_j_ema1(self): + self.check_rt_funcs(self.rt_count_kdj) + if self._rt_j_ema1 is None and len(self.line_j_ema1) > 0: + return self.line_j_ema1[-1] + return self._rt_j_ema1 + + @property + def rt_j_ema2(self): + self.check_rt_funcs(self.rt_count_kdj) + if self._rt_j_ema2 is None and len(self.line_j_ema2) > 0: + return self.line_j_ema2[-1] + return self._rt_j_ema2 + def __count_kdj_tb(self): """KDJ指标""" """ @@ -2811,7 +3187,7 @@ class CtaLineBar(object): # maxLen = maxLen * 3 # 注:数据长度需要足够,才能准确。测试过,3倍长度才可以与国内的文华等软件一致 if self.bar_len - 1 < maxLen: - self.write_log(u'数据未充分,当前Bar数据数量:{0},计算MACD需要:{1}'.format(self.bar_len - 1, maxLen)) + #self.write_log(u'数据未充分,当前Bar数据数量:{0},计算MACD需要:{1}'.format(self.bar_len - 1, maxLen)) return dif_list, dea_list, macd_list = ta.MACD(self.close_array, fastperiod=self.para_macd_fast_len, @@ -2826,8 +3202,10 @@ class CtaLineBar(object): if len(self.line_dif) > self.max_hold_bars: del self.line_dif[0] - self.line_dif.append(round(dif_list[-1], self.round_n)) - + dif = round(dif_list[-1], self.round_n) + self.line_dif.append(dif) + if len(self.index_list) > 0: + self.dict_dif.update({self.index_list[-1]: dif}) if len(self.line_dea) > self.max_hold_bars: del self.line_dea[0] self.line_dea.append(round(dea_list[-1], self.round_n)) @@ -2960,6 +3338,10 @@ class CtaLineBar(object): self.rt_macd_cross = round((self._rt_dif + self._rt_dea) / 2, self.round_n) self.rt_macd_cross_price = self.cur_price + # 通过bar的时间,获取dif值 + def get_dif_by_dt(self, str_dt): + return self.dict_dif.get(str_dt, 0) + @property def rt_dif(self): self.check_rt_funcs(self.rt_count_macd) @@ -2986,9 +3368,12 @@ class CtaLineBar(object): 检查MACD DIF是否与价格有背离 :param: direction,多:检查是否有顶背离,空,检查是否有底背离 """ + seg_lens = len(self.macd_segment_list) + if seg_lens <= 2: + return False s1, s2 = None, None # s1,倒数的一个匹配段;s2,倒数第二个匹配段 - for seg in reversed(self.macd_segment_list): - + for idx in range(seg_lens): + seg = self.macd_segment_list[-idx-1] if direction == Direction.LONG: if seg.get('macd_count', 0) > 0: if s1 is None: @@ -3048,9 +3433,12 @@ class CtaLineBar(object): 检查MACD 能量柱是否与价格有背离 :param: direction,多:检查是否有顶背离,空,检查是否有底背离 """ + seg_lens = len(self.macd_segment_list) + if seg_lens <= 2: + return False s1, s2 = None, None # s1,倒数的一个匹配段;s2,倒数第二个匹配段 - for seg in reversed(self.macd_segment_list): - + for idx in range(seg_lens): + seg = self.macd_segment_list[-idx-1] if direction == Direction.LONG: if seg.get('macd_count', 0) > 0: if s1 is None: @@ -3196,8 +3584,6 @@ class CtaLineBar(object): """计算卡尔曼过滤器均线""" if not self.para_active_kf or self.kf is None: return - if self.bar_len < 26: - return if len(self.line_state_mean) == 0 or len(self.line_state_covar) == 0: try: @@ -3205,7 +3591,8 @@ class CtaLineBar(object): observation_matrices=[1], initial_state_mean=self.close_array[-1], initial_state_covariance=1, - transition_covariance=0.01) + transition_covariance=0.01, + observation_covariance=self.para_kf_obscov_len) except Exception: self.write_log(u'导入卡尔曼过滤器失败,需先安装 pip install pykalman') self.para_active_kf = False @@ -3241,6 +3628,45 @@ class CtaLineBar(object): self.line_state_lower.append(m - 3 * std) self.line_state_covar.append(c) + # 计算第二条卡尔曼均线 + if not self.para_active_kf2: + return + + if len(self.line_state_mean2) == 0 or len(self.line_state_covar2) == 0: + try: + self.kf2 = KalmanFilter(transition_matrices=[1], + observation_matrices=[1], + initial_state_mean=self.close_array[-1], + initial_state_covariance=1, + transition_covariance=0.01, + observation_covariance=self.para_kf2_obscov_len) + except Exception: + self.write_log(u'导入卡尔曼过滤器失败,需先安装 pip install pykalman') + self.para_active_kf2 = False + + state_means, state_covariances = self.kf2.filter(self.close_array[-1]) + m = state_means[-1].item() + c = state_covariances[-1].item() + + else: + m = self.line_state_mean2[-1] + c = self.line_state_covar2[-1] + + state_means, state_covariances = self.kf2.filter_update(filtered_state_mean=m, + filtered_state_covariance=c, + observation=self.close_array[-1]) + m = state_means[-1].item() + c = state_covariances[-1].item() + self.line_state_mean2.append(m) + self.line_state_covar2.append(c) + + if len(self.line_state_mean) > 0 and len(self.line_state_mean2) > 0: + # 金叉死叉 + if self.line_state_mean[-1] > self.line_state_mean2[-1]: + self.kf12_count = (self.kf12_count + 1) if self.kf12_count > 0 else 1 + else: + self.kf12_count = self.kf12_count - 1 if self.kf12_count < 0 else -1 + def __count_period(self, bar): """重新计算周期""" @@ -4330,7 +4756,6 @@ class CtaLineBar(object): K = self.__ema(RSV, MM) D = pd.Series(data=K).rolling(window=MM).mean().values - if len(self.line_skdj_k) > self.max_hold_bars: self.line_skdj_k.pop(0) if not np.isnan(K[-1]): @@ -4343,6 +4768,433 @@ class CtaLineBar(object): self.line_skdj_d.append(D[-1]) self.cur_skdj_d = D[-1] + def __count_chanlun(self): + """重新计算缠论""" + if self.chanlun_calculated: + return + + if not self.chan_lib: + return + + if self.bar_len <= 3: + return + + if self.chan_graph is not None: + del self.chan_graph + self.chan_graph = None + self.chan_graph = ChanGraph(chan_lib=self.chan_lib, + index=self.index_list[-self.bar_len+1:], + high=self.high_array[-self.bar_len+1:], + low=self.low_array[-self.bar_len+1:]) + self._fenxing_list = self.chan_graph.fenxing_list + self._bi_list = self.chan_graph.bi_list + self._bi_zs_list = self.chan_graph.bi_zhongshu_list + self._duan_list = self.chan_graph.duan_list + #self._duan_zs_list = self.chan_graph.duan_zhongshu_list + + self.chanlun_calculated = True + + @property + def fenxing_list(self): + if not self.chanlun_calculated: + self.__count_chanlun() + return self._fenxing_list + + @property + def bi_list(self): + if not self.chanlun_calculated: + self.__count_chanlun() + return self._bi_list + + @property + def bi_zs_list(self): + if not self.chanlun_calculated: + self.__count_chanlun() + return self._bi_zs_list + + @property + def duan_list(self): + if not self.chanlun_calculated: + self.__count_chanlun() + return self._duan_list + + # @property + # def duan_zs_list(self): + # if not self.chanlun_calculated: + # self.__count_chanlun() + # return self._duan_zs_list + + def is_bi_beichi_inside_duan(self, direction): + """当前段内的笔,是否形成背驰""" + if len(self._duan_list) == 0: + return False + + # Direction => int + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + + # 分型需要确认 + if self.fenxing_list[-1].is_rt: + return False + + # 当前段 + duan = self._duan_list[-1] + if duan.direction != direction: + return False + + # 当前段包含的分笔,必须大于等于5(缠论里面,如果只有三个分笔,背驰的力度比较弱) + if len(duan.bi_list) < 5: + return False + + # 获取最近2个匹配direction的分型 + fx_list = [fx for fx in self._fenxing_list[-4:] if fx.direction == direction] + if len(fx_list) != 2: + return False + + # 这里是排除段的信号出错,获取了很久之前的一段,而不是最新的一段 + if duan.end < fx_list[0].index: + return False + + # 分笔与段同向 + if duan.bi_list[-1].direction != direction \ + or duan.bi_list[-3].direction != direction \ + or duan.bi_list[-5].direction != direction: + return False + + # 背驰: 同向分笔,逐笔提升,最后一笔,比上一同向笔,短,斜率也比上一同向笔小 + if direction == 1: + if duan.bi_list[-1].low > duan.bi_list[-3].low > duan.bi_list[-5].low \ + and duan.bi_list[-1].low > duan.bi_list[-5].high \ + and duan.bi_list[-1].height < duan.bi_list[-3].height \ + and duan.bi_list[-1].atan < duan.bi_list[-3].atan: + return True + + if direction == -1: + if duan.bi_list[-1].high < duan.bi_list[-3].high < duan.bi_list[-5].high \ + and duan.bi_list[-1].high < duan.bi_list[-5].low \ + and duan.bi_list[-1].height < duan.bi_list[-3].height\ + and duan.bi_list[-1].atan < duan.bi_list[-3].atan: + return True + + return False + + def is_fx_macd_divergence(self, direction): + """ + 分型的macd背离 + :param direction: 1,-1 或者 Direction.LONG(判断是否顶背离), Direction.SHORT(判断是否底背离) + + :return: + """ + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + # 当前段 + duan = self._duan_list[-1] + + if duan.direction != direction: + return False + + # 当前段包含的分笔,必须大于3 + if len(duan.bi_list) <= 3: + return False + + # 获取最近2个匹配direction的分型 + fx_list = [fx for fx in self._fenxing_list[-4:] if fx.direction == direction] + if len(fx_list) != 2: + return False + + # 这里是排除段的信号出错,获取了很久之前的一段,而不是最新的一段 + if duan.end < fx_list[0].index: + return False + + pre_dif = self.get_dif_by_dt(fx_list[0].index) + cur_dif = self.get_dif_by_dt(fx_list[1].index) + if pre_dif is None or cur_dif is None: + return False + if direction == 1: + # 前顶分型顶部价格 + pre_price = fx_list[0].high + # 当前顶分型顶部价格 + cur_price = fx_list[1].high + if pre_price < cur_price and pre_dif >= cur_dif and 0 < self.line_dif[-1] < self.line_dif[-2]: + return True + else: + pre_price = fx_list[0].low + cur_price = fx_list[1].low + if pre_price > cur_price and pre_dif <= cur_dif and self.line_dif[-2] < self.line_dif[-1] < 0: + return True + + return False + + def is_2nd_opportunity(self, direction): + """ + 是二买、二卖机会 + 【二买】当前线段下行,最后2笔不在线段中,最后一笔与下行线段同向,该笔底部不破线段底部,底分型出现且确认 + 【二卖】当前线段上行,最后2笔不在线段中,最后一笔与上行线段同向,该笔顶部不破线段顶部,顶分型出现且确认 + :param direction: 1、Direction.LONG, 当前线段的方向, 判断是否二卖机会; -1 Direction.SHORT, 判断是否二买 + :return: + """ + # Direction => int + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + + # 具备段 + if len(self.duan_list) < 1: + return False + cur_duan = self.duan_list[-1] + if cur_duan.direction != direction: + return False + + # 当前段到最新bar之间的笔列表(此时未出现中枢) + extra_bi_list = [bi for bi in self.bi_list[-3:] if bi.end > cur_duan.end] + if len(extra_bi_list) < 2: + return False + + # 最后一笔是同向 + if extra_bi_list[-1].direction != direction: + return False + + # 线段外一笔的高度,不能超过线段最后一笔高度 + if extra_bi_list[0].height > cur_duan.bi_list[-1].height: + return False + + # 最后一笔的高度,不能超过最后一段的高度的黄金分割38% + if extra_bi_list[-1].height > cur_duan.height * 0.38: + return False + + # 最后的分型,不是实时。 + if not self.fenxing_list[-1].is_rt: + return True + + return False + + def is_contain_zs_inside_duan(self, direction, zs_num): + """最近段,符合方向,并且至少包含zs_num个中枢""" + + # Direction => int + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + + # 具备中枢 + if len(self.bi_zs_list) < zs_num: + return False + # 具备段 + if len(self.duan_list) < 1: + return False + + cur_duan = self.duan_list[-1] + if cur_duan.direction != direction: + return False + + # 段的开始时间,至少大于前zs_num个中枢的结束时间 + if cur_duan.start > self.bi_zs_list[-zs_num].end: + return False + + return True + + def is_contain_zs_with_direction(self, start, direction, zs_num): + """从start开始计算,至少包含zs_num(>1)个中枢,且最后两个中枢符合方向""" + + if zs_num < 2: + return False + + # Direction => int + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + + # 具备中枢 + if len(self.bi_zs_list) < zs_num: + return False + + bi_zs_list = [zs for zs in self.bi_zs_list[-zs_num:] if zs.end > start] + + if len(bi_zs_list) != zs_num: + return False + + if direction == 1 and bi_zs_list[-2].high < bi_zs_list[-1].high: + return True + + if direction == -1 and bi_zs_list[-2].high > bi_zs_list[-1].high: + return True + + return False + + def is_zs_beichi_inside_duan(self, direction): + """是否中枢盘整背驰,进入笔、离去笔,高度,能量背驰""" + + # Direction => int + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + + # 具备中枢 + if len(self.bi_zs_list) < 1: + return False + # 具备段 + if len(self.duan_list) < 1: + return False + # 最后线段 + cur_duan = self.duan_list[-1] + if cur_duan.direction != direction: + return False + + # 线段内的笔中枢(取前三个就可以了) + zs_list_inside_duan = [zs for zs in self.bi_zs_list[-3:] if zs.start >= cur_duan.start] + + # 无中枢,或者超过1个中枢,都不符合中枢背驰 + if len(zs_list_inside_duan) != 1: + return False + # 当前中枢 + cur_zs = zs_list_inside_duan[0] + + # 当前中枢最后一笔,与段最后一笔不一致 + if cur_duan.bi_list[-1].end != cur_zs.bi_list[-1].end: + return False + + # 分型需要确认 + if self.fenxing_list[-1].is_rt: + return False + + # 找出中枢得进入笔 + entry_bi = cur_zs.bi_list[0] + if entry_bi.direction != direction: + # 找出中枢之前,与段同向得笔 + before_bi_list = [bi for bi in cur_duan.bi_list if bi.start < entry_bi.start and bi.direction==direction] + # 中枢之前得同向笔,不存在(一般不可能,因为中枢得第一笔不同向,该中枢存在与段中间) + if len(before_bi_list) == 0: + return False + entry_bi = before_bi_list[-1] + + # 中枢第一笔,与最后一笔,比较力度和能量 + if entry_bi.height > cur_zs.bi_list[-1].height\ + and entry_bi.atan > cur_zs.bi_list[-1].atan: + return True + + return False + + def is_zs_fangda(self, cur_bi_zs = None, start=False, last_bi=False): + """ + 判断中枢,是否为放大型中枢。 + 中枢放大,一般是反向力量的强烈试探导致; + cur_bi_zs: 指定的笔中枢,若不指定,则默认为最后一个中枢 + start: True,从中枢开始的笔进行计算前三, False: 从最后三笔计算 + last_bi: 采用缺省最后一笔时,是否要求最后一笔,必须等于中枢得最后一笔 + """ + if cur_bi_zs is None: + # 具备中枢 + if len(self.bi_zs_list) < 1: + return False + cur_bi_zs = self.bi_zs_list[-1] + if last_bi: + cur_bi = self.bi_list[-1] + # 要求最后一笔,必须等于中枢得最后一笔 + if cur_bi.start != cur_bi_zs.bi_list[-1].start: + return False + + if len(cur_bi_zs.bi_list) < 3: + return False + + # 从开始前三笔计算 + if start and cur_bi_zs.bi_list[2].height > cur_bi_zs.bi_list[1].height > cur_bi_zs.bi_list[0].height: + return True + + # 从最后的三笔计算 + if not start and cur_bi_zs.bi_list[-1].height > cur_bi_zs.bi_list[-2].height > cur_bi_zs.bi_list[-3].height: + return True + + return False + + def is_zs_shoulian(self, cur_bi_zs=None, start=False, last_bi=False): + """ + 判断中枢,是否为收殓型中枢。 + 中枢收敛,一般是多空力量的趋于平衡,如果是段中的第二个或以上中枢,可能存在变盘; + cur_bi_zs: 指定的中枢,或者最后一个中枢 + start: True,从中枢开始的笔进行计算前三, False: 从最后三笔计算 + """ + if cur_bi_zs is None: + # 具备中枢 + if len(self.bi_zs_list) < 1: + return False + + cur_bi_zs = self.bi_zs_list[-1] + if last_bi: + cur_bi = self.bi_list[-1] + # 要求最后一笔,必须等于中枢得最后一笔 + if cur_bi.start != cur_bi_zs.bi_list[-1].start: + return False + + if len(cur_bi_zs.bi_list) < 3: + return False + + if start and cur_bi_zs.bi_list[2].height < cur_bi_zs.bi_list[1].height < cur_bi_zs.bi_list[0].height: + return True + + if not start and cur_bi_zs.bi_list[-1].height < cur_bi_zs.bi_list[-2].height < cur_bi_zs.bi_list[-3].height: + return True + + return False + + def is_zoushi_beichi(self, direction): + """ + 判断是否走势背驰 + :param direction: + :return: + """ + # Direction => int + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + + # 具备中枢 + if len(self.bi_zs_list) < 1: + return False + # 具备段 + if len(self.duan_list) < 1: + return False + # 最后线段 + cur_duan = self.duan_list[-1] + if cur_duan.direction != direction: + return False + + # 线段内的笔中枢(取前三个就可以了) + zs_list_inside_duan = [zs for zs in self.bi_zs_list[-3:] if zs.start >= cur_duan.start] + + # 少于2个中枢,都不符合走势背驰 + if len(zs_list_inside_duan) < 2: + return False + # 当前中枢 + cur_zs = zs_list_inside_duan[-1] + # 上一个中枢 + pre_zs = zs_list_inside_duan[-2] + bi_list_between_zs = [bi for bi in cur_duan.bi_list if bi.direction == direction and bi.end > pre_zs.end and bi.start < cur_zs.start] + if len(bi_list_between_zs) ==0: + return False + + # 最后一笔,作为2个中枢间的笔 + bi_between_zs = bi_list_between_zs[-1] + + bi_list_after_cur_zs = [bi for bi in cur_duan.bi_list if bi.direction==direction and bi.end > cur_zs.end] + if len(bi_list_after_cur_zs) == 0: + return False + + # 离开中枢的一笔 + bi_leave_cur_zs = bi_list_after_cur_zs[0] + + # 离开中枢的一笔,不是段的最后一笔 + if bi_leave_cur_zs.start != cur_duan.bi_list[-1].start: + return False + + # 离开中枢的一笔,不是最后一笔 + if bi_leave_cur_zs.start != self.bi_list[-1].start: + return False + + fx = [fx for fx in self.fenxing_list[-2:] if fx.direction==direction][-1] + if fx.is_rt: + return False + + # 中枢间的分笔,能量大于最后分笔,形成走势背驰 + if bi_between_zs.height > bi_leave_cur_zs.height and bi_between_zs.atan > bi_leave_cur_zs.atan: + return True + + return False + def write_log(self, content): """记录CTA日志""" self.strategy.write_log(u'[' + self.name + u']' + content) @@ -4616,6 +5468,31 @@ class CtaLineBar(object): 'type': 'line'} indicators.update({indicator.get('name'): copy.copy(indicator)}) + # 机构买、机构卖指标( 副图) + if isinstance(self.para_jbjs_threshold, float) and self.para_jbjs_threshold > 0: + indicator = { + 'name': 'JB', + 'attr_name': 'line_jb', + 'is_main': False, + 'type': 'line'} + indicators.update({indicator.get('name'): copy.copy(indicator)}) + if isinstance(self.para_jbjs_threshold, float) and self.para_jbjs_threshold > 0: + indicator = { + 'name': 'JS', + 'attr_name': 'line_js', + 'is_main': False, + 'type': 'line'} + indicators.update({indicator.get('name'): copy.copy(indicator)}) + + # 日内均价线指标 + if self.para_active_tt: + indicator = { + 'name': 'TT', + 'attr_name': 'line_tt', + 'is_main': True, + 'type': 'line'} + indicators.update({indicator.get('name'): copy.copy(indicator)}) + # 摆动指标(附图) if isinstance(self.para_rsi1_len, int) and self.para_rsi1_len > 0: indicator = { @@ -4876,6 +5753,8 @@ class CtaMinuteBar(CtaLineBar): def restore(self, state): """从Pickle中恢复数据""" for key in state.__dict__.keys(): + if key in ['chan_lib']: + continue self.__dict__[key] = state.__dict__[key] def init_properties(self): @@ -5068,8 +5947,9 @@ class CtaMinuteBar(CtaLineBar): # 数字货币市场,分钟是连续的,所以只判断是否取整,或者与上一根bar的距离 if self.is_7x24: + if ( - tick.datetime.minute % self.bar_interval == 0 and tick.datetime.minute != lastBar.datetime.minute) or ( + (tick.datetime.hour * 60 + tick.datetime.minute) % self.bar_interval == 0 and tick.datetime.minute != lastBar.datetime.minute) or ( tick.datetime - lastBar.datetime).total_seconds() > self.bar_interval * 60: # self.write_log('{} drawLineBar() new_bar,{} lastbar:{}, bars_count={}' # .format(self.name, tick.datetime, lastBar.datetime, @@ -5132,6 +6012,8 @@ class CtaHourBar(CtaLineBar): def restore(self, state): """从Pickle中恢复数据""" for key in state.__dict__.keys(): + if key in ['chan_lib']: + continue self.__dict__[key] = state.__dict__[key] def init_properties(self): @@ -5151,12 +6033,13 @@ class CtaHourBar(CtaLineBar): def add_bar(self, bar, bar_is_completed=False, bar_freq=1): """ - 予以外部初始化程序增加bar + (小时K线)予以外部初始化程序增加bar :param bar: :param bar_is_completed: 插入的bar,其周期与K线周期一致,就设为True :param bar_freq, bar对象得frequency :return: """ + if bar.trading_day is None: if self.is_7x24: bar.trading_day = bar.date @@ -5369,6 +6252,8 @@ class CtaDayBar(CtaLineBar): def restore(self, state): """从Pickle中恢复数据""" for key in state.__dict__.keys(): + if key in ['chan_lib']: + continue self.__dict__[key] = state.__dict__[key] def init_properties(self): @@ -5569,6 +6454,8 @@ class CtaWeekBar(CtaLineBar): def restore(self, state): """从Pickle中恢复数据""" for key in state.__dict__.keys(): + if key in ['chan_lib']: + continue self.__dict__[key] = state.__dict__[key] def init_properties(self): diff --git a/vnpy/component/cta_renko_bar.py b/vnpy/component/cta_renko_bar.py index 3b7da59a..110c14dd 100644 --- a/vnpy/component/cta_renko_bar.py +++ b/vnpy/component/cta_renko_bar.py @@ -9,6 +9,8 @@ import sys import traceback import talib as ta import numpy as np +import pandas as pd +import csv from collections import OrderedDict from datetime import datetime, timedelta @@ -20,6 +22,12 @@ from vnpy.trader.constant import Direction, Color from vnpy.component.cta_period import CtaPeriod, Period +try: + from vnpy.component.chanlun import ChanGraph, ChanLibrary +except Exception as ex: + print('can not import pyChanlun from vnpy.component.chanlun') + + class CtaRenkoBar(object): """CTA 砖型K线""" @@ -75,6 +83,7 @@ class CtaRenkoBar(object): self.param_list.append('para_kdj_smooth_len') self.param_list.append('para_active_kf') # 卡尔曼均线 + self.param_list.append('para_kf_obscov_len') # 卡尔曼均线观测方差的长度 self.param_list.append('para_active_skd') # 摆动指标 self.param_list.append('para_skd_fast_len') @@ -91,6 +100,7 @@ class CtaRenkoBar(object): self.param_list.append('para_yb_ref') self.param_list.append('para_golden_n') # 黄金分割 + self.param_list.append('para_active_chanlun') # 激活缠论 # 输入参数 @@ -322,6 +332,7 @@ class CtaRenkoBar(object): # 卡尔曼过滤器 self.para_active_kf = False + self.para_kf_obscov_len = 1 # t+1时刻的观测协方差 self.kf = None self.line_state_mean = [] self.line_state_covar = [] @@ -389,6 +400,7 @@ class CtaRenkoBar(object): self.is_7x24 = False # (实时运行时,或者addbar小于bar得周期时,不包含最后一根Bar) + self.index_list = [] self.open_array = np.zeros(self.max_hold_bars) # 与lineBar一致得开仓价清单 self.open_array[:] = np.nan self.high_array = np.zeros(self.max_hold_bars) # 与lineBar一致得最高价清单 @@ -405,6 +417,16 @@ class CtaRenkoBar(object): self.mid5_array = np.zeros(self.max_hold_bars) # 收盘价*2/开仓价/最高/最低价 的平均价 self.mid5_array[:] = np.nan + self.para_active_chanlun = False # 是否激活缠论 + self.chan_lib = None + self.chan_graph = None + self.chanlun_calculated = False # 当前bar是否计算过 + self._fenxing_list = [] # 分型列表 + self._bi_list = [] # 笔列表 + self._bi_zs_list = [] # 笔中枢列表 + self._duan_list = [] # 段列表 + self._duan_zs_list = [] # 段中枢列表 + # 导出到csv文件 self.export_filename = None self.export_fields = [] @@ -417,6 +439,7 @@ class CtaRenkoBar(object): # 事件回调函数 self.cb_dict = {} + if setting: self.setParam(setting) @@ -429,6 +452,13 @@ class CtaRenkoBar(object): if self.kilo_height > 0: self.height = self.price_tick * self.kilo_height + if self.para_active_chanlun: + try: + self.chan_lib = ChanLibrary(bi_style=2, duan_style=1) + except: + self.write_log(u'导入缠论组件失败') + self.chan_lib = None + def __getstate__(self): """移除Pickle dump()时不支持的Attribute""" state = self.__dict__.copy() @@ -508,7 +538,8 @@ class CtaRenkoBar(object): observation_matrices=[1], initial_state_mean=self.last_price_list[-1], initial_state_covariance=1, - transition_covariance=0.01) + transition_covariance=0.01, + observation_covariance=self.para_kf_obscov_len) state_means, state_covariances = self.tick_kf.filter(np.array(self.last_price_list, dtype=float)) m = state_means[-1].item() c = state_covariances[-1].item() @@ -600,6 +631,9 @@ class CtaRenkoBar(object): # 新添加得bar比现有得bar时间晚,不添加 if bar.datetime < self.line_bar[-1].datetime: return + if bar.datetime == self.line_bar[-1].datetime: + if bar.close_price != self.line_bar[-1].close_price: + bar.datetime += timedelta(microseconds=1) # 更新最后价格 self.cur_price = bar.close_price @@ -615,7 +649,9 @@ class CtaRenkoBar(object): bar_mid4 = round((2 * bar.close_price + bar.high_price + bar.low_price) / 4, self.round_n) bar_mid5 = round((2 * bar.close_price + bar.open_price + bar.high_price + bar.low_price) / 5, self.round_n) - # 扩展open,close,high,low numpy array列表 + # 扩展时间索引,open,close,high,low numpy array列表 平移更新序列最新值 + self.index_list.append(bar.datetime.strftime('%Y-%m-%d %H:%M:%S.%f')) + self.open_array[:-1] = self.open_array[1:] self.open_array[-1] = bar.open_price @@ -654,7 +690,9 @@ class CtaRenkoBar(object): elif bar.close_price < bar.open_price: bar.color = Color.BLUE - # 扩展open,close,high,low 列表 + # 扩展时间索引,open,close,high,low numpy array列表 平移更新序列最新值 + self.index_list.append(bar.datetime.strftime('%Y-%m-%d %H:%M:%S.%f')) + self.open_array[:-1] = self.open_array[1:] self.open_array[-1] = bar.open_price @@ -709,6 +747,8 @@ class CtaRenkoBar(object): self.runtime_recount() + self.chanlun_calculated = False + # 回调上层调用者 self.cb_on_bar(bar, self.name) @@ -2848,13 +2888,14 @@ class CtaRenkoBar(object): observation_matrices=[1], initial_state_mean=self.close_array[-1], initial_state_covariance=1, - - transition_covariance=0.01) + transition_covariance=0.01, + observation_covariance = self.para_kf_obscov_len + ) except Exception: self.write_log(u'导入卡尔曼过滤器失败,需先安装 pip install pykalman') self.para_active_kf = False - state_means, state_covariances = self.kf.filter(np.array(self.close_array, dtype=float)) + state_means, state_covariances = self.kf.filter(np.array(self.close_array[-1], dtype=float)) m = state_means[-1].item() c = state_covariances[-1].item() else: @@ -3521,6 +3562,432 @@ class CtaRenkoBar(object): self.write_log(u'call back event{} exception:{}'.format(self.CB_ON_PERIOD, str(ex))) self.write_log(u'traceback:{}'.format(traceback.format_exc())) + def __count_chanlun(self): + """重新计算缠论""" + if self.chanlun_calculated: + return + + if not self.chan_lib: + return + + if self.bar_len <= 3: + return + + if self.chan_graph is not None: + del self.chan_graph + self.chan_graph = None + self.chan_graph = ChanGraph(chan_lib=self.chan_lib, + index=self.index_list[-self.bar_len+1:], + high=self.high_array[-self.bar_len+1:], + low=self.low_array[-self.bar_len+1:]) + self._fenxing_list = self.chan_graph.fenxing_list + self._bi_list = self.chan_graph.bi_list + self._bi_zs_list = self.chan_graph.bi_zhongshu_list + self._duan_list = self.chan_graph.duan_list + self._duan_zs_list = self.chan_graph.duan_zhongshu_list + + self.chanlun_calculated = True + + @property + def fenxing_list(self): + if not self.chanlun_calculated: + self.__count_chanlun() + return self._fenxing_list + + @property + def bi_list(self): + if not self.chanlun_calculated: + self.__count_chanlun() + return self._bi_list + + @property + def bi_zs_list(self): + if not self.chanlun_calculated: + self.__count_chanlun() + return self._bi_zs_list + + @property + def duan_list(self): + if not self.chanlun_calculated: + self.__count_chanlun() + return self._duan_list + + @property + def duan_zs_list(self): + if not self.chanlun_calculated: + self.__count_chanlun() + return self._duan_zs_list + + def is_bi_beichi_inside_duan(self, direction): + """当前段内的笔,是否形成背驰""" + if len(self._duan_list) == 0: + return False + + # Direction => int + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + + # 分型需要确认 + if self.fenxing_list[-1].is_rt: + return False + + # 当前段 + duan = self._duan_list[-1] + if duan.direction != direction: + return False + + # 当前段包含的分笔,必须大于等于5(缠论里面,如果只有三个分笔,背驰的力度比较弱) + if len(duan.bi_list) < 5: + return False + + # 获取最近2个匹配direction的分型 + fx_list = [fx for fx in self._fenxing_list[-4:] if fx.direction == direction] + if len(fx_list) != 2: + return False + + # 这里是排除段的信号出错,获取了很久之前的一段,而不是最新的一段 + if duan.end < fx_list[0].index: + return False + + # 分笔与段同向 + if duan.bi_list[-1].direction != direction \ + or duan.bi_list[-3].direction != direction \ + or duan.bi_list[-5].direction != direction: + return False + + # 背驰: 同向分笔,逐笔提升,最后一笔,比上一同向笔,短,斜率也比上一同向笔小 + if direction == 1: + if duan.bi_list[-1].low > duan.bi_list[-3].low > duan.bi_list[-5].low \ + and duan.bi_list[-1].low > duan.bi_list[-5].high \ + and duan.bi_list[-1].height < duan.bi_list[-3].height \ + and duan.bi_list[-1].atan < duan.bi_list[-3].atan: + return True + + if direction == -1: + if duan.bi_list[-1].high < duan.bi_list[-3].high < duan.bi_list[-5].high \ + and duan.bi_list[-1].high < duan.bi_list[-5].low \ + and duan.bi_list[-1].height < duan.bi_list[-3].height\ + and duan.bi_list[-1].atan < duan.bi_list[-3].atan: + return True + + return False + + def is_fx_macd_divergence(self, direction): + """ + 分型的macd背离 + :param direction: 1,-1 或者 Direction.LONG(判断是否顶背离), Direction.SHORT(判断是否底背离) + + :return: + """ + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + # 当前段 + duan = self._duan_list[-1] + + if duan.direction != direction: + return False + + # 当前段包含的分笔,必须大于3 + if len(duan.bi_list) <= 3: + return False + + # 获取最近2个匹配direction的分型 + fx_list = [fx for fx in self._fenxing_list[-4:] if fx.direction == direction] + if len(fx_list) != 2: + return False + + # 这里是排除段的信号出错,获取了很久之前的一段,而不是最新的一段 + if duan.end < fx_list[0].index: + return False + + pre_dif = self.get_dif_by_dt(fx_list[0].index) + cur_dif = self.get_dif_by_dt(fx_list[1].index) + if pre_dif is None or cur_dif is None: + return False + if direction == 1: + # 前顶分型顶部价格 + pre_price = fx_list[0].high + # 当前顶分型顶部价格 + cur_price = fx_list[1].high + if pre_price < cur_price and pre_dif >= cur_dif and 0 < self.line_dif[-1] < self.line_dif[-2]: + return True + else: + pre_price = fx_list[0].low + cur_price = fx_list[1].low + if pre_price > cur_price and pre_dif <= cur_dif and self.line_dif[-2] < self.line_dif[-1] < 0: + return True + + return False + + def is_2nd_opportunity(self, direction): + """ + 是二买、二卖机会 + 【二买】当前线段下行,最后2笔不在线段中,最后一笔与下行线段同向,该笔底部不破线段底部,底分型出现且确认 + 【二卖】当前线段上行,最后2笔不在线段中,最后一笔与上行线段同向,该笔顶部不破线段顶部,顶分型出现且确认 + :param direction: 1、Direction.LONG, 当前线段的方向, 判断是否二卖机会; -1 Direction.SHORT, 判断是否二买 + :return: + """ + # Direction => int + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + + # 具备段 + if len(self.duan_list) < 1: + return False + cur_duan = self.duan_list[-1] + if cur_duan.direction != direction: + return False + + # 当前段到最新bar之间的笔列表(此时未出现中枢) + extra_bi_list = [bi for bi in self.bi_list[-3:] if bi.end > cur_duan.end] + if len(extra_bi_list) < 2: + return False + + # 最后一笔是同向 + if extra_bi_list[-1].direction != direction: + return False + + # 线段外一笔的高度,不能超过线段最后一笔高度 + if extra_bi_list[0].height > cur_duan.bi_list[-1].height: + return False + + # 最后一笔的高度,不能超过最后一段的高度的黄金分割38% + if extra_bi_list[-1].height > cur_duan.height * 0.38: + return False + + # 最后的分型,不是实时。 + if not self.fenxing_list[-1].is_rt: + return True + + return False + + def is_contain_zs_inside_duan(self, direction, zs_num): + """最近段,符合方向,并且至少包含zs_num个中枢""" + + # Direction => int + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + + # 具备中枢 + if len(self.bi_zs_list) < zs_num: + return False + # 具备段 + if len(self.duan_list) < 1: + return False + + cur_duan = self.duan_list[-1] + if cur_duan.direction != direction: + return False + + # 段的开始时间,至少大于前zs_num个中枢的结束时间 + if cur_duan.start > self.bi_zs_list[-zs_num].end: + return False + + return True + + def is_contain_zs_with_direction(self, start, direction, zs_num): + """从start开始计算,至少包含zs_num(>1)个中枢,且最后两个中枢符合方向""" + + if zs_num < 2: + return False + + # Direction => int + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + + # 具备中枢 + if len(self.bi_zs_list) < zs_num: + return False + + bi_zs_list = [zs for zs in self.bi_zs_list[-zs_num:] if zs.end > start] + + if len(bi_zs_list) != zs_num: + return False + + if direction == 1 and bi_zs_list[-2].high < bi_zs_list[-1].high: + return True + + if direction == -1 and bi_zs_list[-2].high > bi_zs_list[-1].high: + return True + + return False + + def is_zs_beichi_inside_duan(self, direction): + """是否中枢盘整背驰,进入笔、离去笔,高度,能量背驰""" + + # Direction => int + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + + # 具备中枢 + if len(self.bi_zs_list) < 1: + return False + # 具备段 + if len(self.duan_list) < 1: + return False + # 最后线段 + cur_duan = self.duan_list[-1] + if cur_duan.direction != direction: + return False + + # 线段内的笔中枢(取前三个就可以了) + zs_list_inside_duan = [zs for zs in self.bi_zs_list[-3:] if zs.start >= cur_duan.start] + + # 无中枢,或者超过1个中枢,都不符合中枢背驰 + if len(zs_list_inside_duan) != 1: + return False + # 当前中枢 + cur_zs = zs_list_inside_duan[0] + + # 当前中枢最后一笔,与段最后一笔不一致 + if cur_duan.bi_list[-1].end != cur_zs.bi_list[-1].end: + return False + + # 分型需要确认 + if self.fenxing_list[-1].is_rt: + return False + + # 找出中枢得进入笔 + entry_bi = cur_zs.bi_list[0] + if entry_bi.direction != direction: + # 找出中枢之前,与段同向得笔 + before_bi_list = [bi for bi in cur_duan.bi_list if bi.start < entry_bi.start and bi.direction==direction] + # 中枢之前得同向笔,不存在(一般不可能,因为中枢得第一笔不同向,该中枢存在与段中间) + if len(before_bi_list) == 0: + return False + entry_bi = before_bi_list[-1] + + # 中枢第一笔,与最后一笔,比较力度和能量 + if entry_bi.height > cur_zs.bi_list[-1].height\ + and entry_bi.atan > cur_zs.bi_list[-1].atan: + return True + + return False + + def is_zs_fangda(self, cur_bi_zs = None, start=False, last_bi=False): + """ + 判断中枢,是否为放大型中枢。 + 中枢放大,一般是反向力量的强烈试探导致; + cur_bi_zs: 指定的笔中枢,若不指定,则默认为最后一个中枢 + start: True,从中枢开始的笔进行计算前三, False: 从最后三笔计算 + last_bi: 采用缺省最后一笔时,是否要求最后一笔,必须等于中枢得最后一笔 + """ + if cur_bi_zs is None: + # 具备中枢 + if len(self.bi_zs_list) < 1: + return False + cur_bi_zs = self.bi_zs_list[-1] + if last_bi: + cur_bi = self.bi_list[-1] + # 要求最后一笔,必须等于中枢得最后一笔 + if cur_bi.start != cur_bi_zs.bi_list[-1].start: + return False + + if len(cur_bi_zs.bi_list) < 3: + return False + + # 从开始前三笔计算 + if start and cur_bi_zs.bi_list[2].height > cur_bi_zs.bi_list[1].height > cur_bi_zs.bi_list[0].height: + return True + + # 从最后的三笔计算 + if not start and cur_bi_zs.bi_list[-1].height > cur_bi_zs.bi_list[-2].height > cur_bi_zs.bi_list[-3].height: + return True + + return False + + def is_zs_shoulian(self, cur_bi_zs=None, start=False, last_bi=False): + """ + 判断中枢,是否为收殓型中枢。 + 中枢收敛,一般是多空力量的趋于平衡,如果是段中的第二个或以上中枢,可能存在变盘; + cur_bi_zs: 指定的中枢,或者最后一个中枢 + start: True,从中枢开始的笔进行计算前三, False: 从最后三笔计算 + """ + if cur_bi_zs is None: + # 具备中枢 + if len(self.bi_zs_list) < 1: + return False + + cur_bi_zs = self.bi_zs_list[-1] + if last_bi: + cur_bi = self.bi_list[-1] + # 要求最后一笔,必须等于中枢得最后一笔 + if cur_bi.start != cur_bi_zs.bi_list[-1].start: + return False + + if len(cur_bi_zs.bi_list) < 3: + return False + + if start and cur_bi_zs.bi_list[2].height < cur_bi_zs.bi_list[1].height < cur_bi_zs.bi_list[0].height: + return True + + if not start and cur_bi_zs.bi_list[-1].height < cur_bi_zs.bi_list[-2].height < cur_bi_zs.bi_list[-3].height: + return True + + return False + + def is_zoushi_beichi(self, direction): + """ + 判断是否走势背驰 + :param direction: + :return: + """ + # Direction => int + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + + # 具备中枢 + if len(self.bi_zs_list) < 1: + return False + # 具备段 + if len(self.duan_list) < 1: + return False + # 最后线段 + cur_duan = self.duan_list[-1] + if cur_duan.direction != direction: + return False + + # 线段内的笔中枢(取前三个就可以了) + zs_list_inside_duan = [zs for zs in self.bi_zs_list[-3:] if zs.start >= cur_duan.start] + + # 少于2个中枢,都不符合走势背驰 + if len(zs_list_inside_duan) < 2: + return False + # 当前中枢 + cur_zs = zs_list_inside_duan[-1] + # 上一个中枢 + pre_zs = zs_list_inside_duan[-2] + bi_list_between_zs = [bi for bi in cur_duan.bi_list if bi.direction == direction and bi.end > pre_zs.end and bi.start < cur_zs.start] + if len(bi_list_between_zs) ==0: + return False + + # 最后一笔,作为2个中枢间的笔 + bi_between_zs = bi_list_between_zs[-1] + + bi_list_after_cur_zs = [bi for bi in cur_duan.bi_list if bi.direction==direction and bi.end > cur_zs.end] + if len(bi_list_after_cur_zs) == 0: + return False + + # 离开中枢的一笔 + bi_leave_cur_zs = bi_list_after_cur_zs[0] + + # 离开中枢的一笔,不是段的最后一笔 + if bi_leave_cur_zs.start != cur_duan.bi_list[-1].start: + return False + + # 离开中枢的一笔,不是最后一笔 + if bi_leave_cur_zs.start != self.bi_list[-1].start: + return False + + fx = [fx for fx in self.fenxing_list[-2:] if fx.direction==direction][-1] + if fx.is_rt: + return False + + # 中枢间的分笔,能量大于最后分笔,形成走势背驰 + if bi_between_zs.height > bi_leave_cur_zs.height and bi_between_zs.atan > bi_leave_cur_zs.atan: + return True + + return False # ---------------------------------------------------------------------- def write_log(self, content): """记录CTA日志""" diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index 3409f91a..e695920c 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -54,7 +54,7 @@ from vnpy.trader.constant import ( OptionType, Interval ) -from vnpy.trader.gateway import BaseGateway +from vnpy.trader.gateway import BaseGateway, TickCombiner from vnpy.trader.object import ( TickData, BarData, @@ -2091,226 +2091,3 @@ class TqMdApi(): except Exception as e: self.gateway.write_log('退出天勤行情api异常:{}'.format(str(e))) - -class TickCombiner(object): - """ - Tick合成类 - """ - - def __init__(self, gateway, setting): - self.gateway = gateway - self.gateway_name = self.gateway.gateway_name - self.gateway.write_log(u'创建tick合成类:{}'.format(setting)) - - self.symbol = setting.get('symbol', None) - self.leg1_symbol = setting.get('leg1_symbol', None) - self.leg2_symbol = setting.get('leg2_symbol', None) - self.leg1_ratio = setting.get('leg1_ratio', 1) # 腿1的数量配比 - self.leg2_ratio = setting.get('leg2_ratio', 1) # 腿2的数量配比 - self.price_tick = setting.get('price_tick', 1) # 合成价差加比后的最小跳动 - # 价差 - self.is_spread = setting.get('is_spread', False) - # 价比 - self.is_ratio = setting.get('is_ratio', False) - - self.last_leg1_tick = None - self.last_leg2_tick = None - - # 价差日内最高/最低价 - self.spread_high = None - self.spread_low = None - - # 价比日内最高/最低价 - self.ratio_high = None - self.ratio_low = None - - # 当前交易日 - self.trading_day = None - - if self.is_ratio and self.is_spread: - self.gateway.write_error(u'{}参数有误,不能同时做价差/加比.setting:{}'.format(self.symbol, setting)) - return - - self.gateway.write_log(u'初始化{}合成器成功'.format(self.symbol)) - if self.is_spread: - self.gateway.write_log( - u'leg1:{} * {} - leg2:{} * {}'.format(self.leg1_symbol, self.leg1_ratio, self.leg2_symbol, - self.leg2_ratio)) - if self.is_ratio: - self.gateway.write_log( - u'leg1:{} * {} / leg2:{} * {}'.format(self.leg1_symbol, self.leg1_ratio, self.leg2_symbol, - self.leg2_ratio)) - - def on_tick(self, tick): - """OnTick处理""" - combinable = False - - if tick.symbol == self.leg1_symbol: - # leg1合约 - self.last_leg1_tick = tick - if self.last_leg2_tick is not None: - if self.last_leg1_tick.datetime.replace(microsecond=0) == self.last_leg2_tick.datetime.replace( - microsecond=0): - combinable = True - - elif tick.symbol == self.leg2_symbol: - # leg2合约 - self.last_leg2_tick = tick - if self.last_leg1_tick is not None: - if self.last_leg2_tick.datetime.replace(microsecond=0) == self.last_leg1_tick.datetime.replace( - microsecond=0): - combinable = True - - # 不能合并 - if not combinable: - return - - if not self.is_ratio and not self.is_spread: - return - - # 以下情况,基本为单腿涨跌停,不合成价差/价格比 Tick - if (self.last_leg1_tick.ask_price_1 == 0 or self.last_leg1_tick.bid_price_1 == self.last_leg1_tick.limit_up) \ - and self.last_leg1_tick.ask_volume_1 == 0: - self.gateway.write_log( - u'leg1:{0}涨停{1},不合成价差Tick'.format(self.last_leg1_tick.vt_symbol, self.last_leg1_tick.bid_price_1)) - return - if (self.last_leg1_tick.bid_price_1 == 0 or self.last_leg1_tick.ask_price_1 == self.last_leg1_tick.limit_down) \ - and self.last_leg1_tick.bid_volume_1 == 0: - self.gateway.write_log( - u'leg1:{0}跌停{1},不合成价差Tick'.format(self.last_leg1_tick.vt_symbol, self.last_leg1_tick.ask_price_1)) - return - if (self.last_leg2_tick.ask_price_1 == 0 or self.last_leg2_tick.bid_price_1 == self.last_leg2_tick.limit_up) \ - and self.last_leg2_tick.ask_volume_1 == 0: - self.gateway.write_log( - u'leg2:{0}涨停{1},不合成价差Tick'.format(self.last_leg2_tick.vt_symbol, self.last_leg2_tick.bid_price_1)) - return - if (self.last_leg2_tick.bid_price_1 == 0 or self.last_leg2_tick.ask_price_1 == self.last_leg2_tick.limit_down) \ - and self.last_leg2_tick.bid_volume_1 == 0: - self.gateway.write_log( - u'leg2:{0}跌停{1},不合成价差Tick'.format(self.last_leg2_tick.vt_symbol, self.last_leg2_tick.ask_price_1)) - return - - if self.trading_day != tick.trading_day: - self.trading_day = tick.trading_day - self.spread_high = None - self.spread_low = None - self.ratio_high = None - self.ratio_low = None - - if self.is_spread: - spread_tick = TickData(gateway_name=self.gateway_name, - symbol=self.symbol, - exchange=Exchange.SPD, - datetime=tick.datetime) - - spread_tick.trading_day = tick.trading_day - spread_tick.date = tick.date - spread_tick.time = tick.time - - # 叫卖价差=leg1.ask_price_1 * 配比 - leg2.bid_price_1 * 配比,volume为两者最小 - spread_tick.ask_price_1 = round_to(target=self.price_tick, - value=self.last_leg1_tick.ask_price_1 * self.leg1_ratio - self.last_leg2_tick.bid_price_1 * self.leg2_ratio) - spread_tick.ask_volume_1 = min(self.last_leg1_tick.ask_volume_1, self.last_leg2_tick.bid_volume_1) - - # 叫买价差=leg1.bid_price_1 * 配比 - leg2.ask_price_1 * 配比,volume为两者最小 - spread_tick.bid_price_1 = round_to(target=self.price_tick, - value=self.last_leg1_tick.bid_price_1 * self.leg1_ratio - self.last_leg2_tick.ask_price_1 * self.leg2_ratio) - spread_tick.bid_volume_1 = min(self.last_leg1_tick.bid_volume_1, self.last_leg2_tick.ask_volume_1) - - # 最新价 - spread_tick.last_price = round_to(target=self.price_tick, - value=(spread_tick.ask_price_1 + spread_tick.bid_price_1) / 2) - # 昨收盘价 - if self.last_leg2_tick.pre_close > 0 and self.last_leg1_tick.pre_close > 0: - spread_tick.pre_close = round_to(target=self.price_tick, - value=self.last_leg1_tick.pre_close * self.leg1_ratio - self.last_leg2_tick.pre_close * self.leg2_ratio) - # 开盘价 - if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0: - spread_tick.open_price = round_to(target=self.price_tick, - value=self.last_leg1_tick.open_price * self.leg1_ratio - self.last_leg2_tick.open_price * self.leg2_ratio) - # 最高价 - if self.spread_high: - self.spread_high = max(self.spread_high, spread_tick.ask_price_1) - else: - self.spread_high = spread_tick.ask_price_1 - spread_tick.high_price = self.spread_high - - # 最低价 - if self.spread_low: - self.spread_low = min(self.spread_low, spread_tick.bid_price_1) - else: - self.spread_low = spread_tick.bid_price_1 - - spread_tick.low_price = self.spread_low - - self.gateway.on_tick(spread_tick) - - if self.is_ratio: - ratio_tick = TickData( - gateway_name=self.gateway_name, - symbol=self.symbol, - exchange=Exchange.SPD, - datetime=tick.datetime - ) - - ratio_tick.trading_day = tick.trading_day - ratio_tick.date = tick.date - ratio_tick.time = tick.time - - # 比率tick = (腿1 * 腿1 手数 / 腿2价格 * 腿2手数) 百分比 - ratio_tick.ask_price_1 = 100 * self.last_leg1_tick.ask_price_1 * self.leg1_ratio \ - / (self.last_leg2_tick.bid_price_1 * self.leg2_ratio) # noqa - ratio_tick.ask_price_1 = round_to( - target=self.price_tick, - value=ratio_tick.ask_price_1 - ) - - ratio_tick.ask_volume_1 = min(self.last_leg1_tick.ask_volume_1, self.last_leg2_tick.bid_volume_1) - ratio_tick.bid_price_1 = 100 * self.last_leg1_tick.bid_price_1 * self.leg1_ratio \ - / (self.last_leg2_tick.ask_price_1 * self.leg2_ratio) # noqa - ratio_tick.bid_price_1 = round_to( - target=self.price_tick, - value=ratio_tick.bid_price_1 - ) - - ratio_tick.bid_volume_1 = min(self.last_leg1_tick.bid_volume_1, self.last_leg2_tick.ask_volume_1) - ratio_tick.last_price = (ratio_tick.ask_price_1 + ratio_tick.bid_price_1) / 2 - ratio_tick.last_price = round_to( - target=self.price_tick, - value=ratio_tick.last_price - ) - - # 昨收盘价 - if self.last_leg2_tick.pre_close > 0 and self.last_leg1_tick.pre_close > 0: - ratio_tick.pre_close = 100 * self.last_leg1_tick.pre_close * self.leg1_ratio / ( - self.last_leg2_tick.pre_close * self.leg2_ratio) # noqa - ratio_tick.pre_close = round_to( - target=self.price_tick, - value=ratio_tick.pre_close - ) - - # 开盘价 - if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0: - ratio_tick.open_price = 100 * self.last_leg1_tick.open_price * self.leg1_ratio / ( - self.last_leg2_tick.open_price * self.leg2_ratio) # noqa - ratio_tick.open_price = round_to( - target=self.price_tick, - value=ratio_tick.open_price - ) - - # 最高价 - if self.ratio_high: - self.ratio_high = max(self.ratio_high, ratio_tick.ask_price_1) - else: - self.ratio_high = ratio_tick.ask_price_1 - ratio_tick.high_price = self.spread_high - - # 最低价 - if self.ratio_low: - self.ratio_low = min(self.ratio_low, ratio_tick.bid_price_1) - else: - self.ratio_low = ratio_tick.bid_price_1 - - ratio_tick.low_price = self.spread_low - - self.gateway.on_tick(ratio_tick) diff --git a/vnpy/gateway/pb/pb_gateway.py b/vnpy/gateway/pb/pb_gateway.py index 9e6515fc..09dc5f1b 100644 --- a/vnpy/gateway/pb/pb_gateway.py +++ b/vnpy/gateway/pb/pb_gateway.py @@ -374,8 +374,8 @@ class PbGateway(BaseGateway): product_id=product_id, unit_id=unit_id, holder_ids=holder_ids) - self.tq_api = TqMdApi(self) - self.tq_api.connect() + #self.tq_api = TqMdApi(self) + #self.tq_api.connect() self.init_query() def close(self) -> None: diff --git a/vnpy/trader/engine.py b/vnpy/trader/engine.py index 837d6e8c..c320a626 100644 --- a/vnpy/trader/engine.py +++ b/vnpy/trader/engine.py @@ -507,6 +507,8 @@ class OmsEngine(BaseEngine): self.main_engine.get_position = self.get_position self.main_engine.get_account = self.get_account self.main_engine.get_contract = self.get_contract + self.main_engine.get_exchange = self.get_exchange + self.main_engine.get_custom_contract = self.get_custom_contract self.main_engine.get_all_ticks = self.get_all_ticks self.main_engine.get_all_orders = self.get_all_orders self.main_engine.get_all_trades = self.get_all_trades @@ -650,6 +652,13 @@ class OmsEngine(BaseEngine): self.today_contracts[contract.vt_symbol] = contract self.today_contracts[contract.symbol] = contract + def get_exchange(self, symbol: str) -> Exchange: + """获取合约对应的交易所""" + contract = self.contracts.get(symbol, None) + if contract is None: + return Exchange.LOCAL + return contract.exchange + def get_tick(self, vt_symbol: str) -> Optional[TickData]: """ Get latest market tick data by vt_symbol. @@ -746,6 +755,27 @@ class OmsEngine(BaseEngine): ] return active_orders + def get_custom_contract(self, symbol): + """ + 获取自定义合约的设置 + :param symbol: "pb2012-1-pb2101-1-CJ" + :return: { + "name": "pb跨期价差", + "exchange": "SPD", + "leg1_symbol": "pb2012", + "leg1_exchange": "SHFE", + "leg1_ratio": 1, + "leg2_symbol": "pb2101", + "leg2_exchange": "SHFE", + "leg2_ratio": 1, + "is_spread": true, + "size": 1, + "margin_rate": 0.1, + "price_tick": 5 + } + """ + return self.custom_settings.get(symbol, None) + def get_all_custom_contracts(self, rtn_setting=False): """ 获取所有自定义合约 @@ -759,6 +789,7 @@ class OmsEngine(BaseEngine): if len(self.custom_contracts) == 0: c = CustomContract() + self.custom_settings = c.get_config() self.custom_contracts = c.get_contracts() return self.custom_contracts diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index b51b1ff4..cce67aba 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -34,7 +34,7 @@ from .object import ( Exchange ) -from vnpy.trader.utility import get_folder_path +from vnpy.trader.utility import get_folder_path, round_to from vnpy.trader.util_logger import setup_logger @@ -329,6 +329,229 @@ class BaseGateway(ABC): return self.status +class TickCombiner(object): + """ + Tick合成类 + """ + + def __init__(self, gateway, setting): + self.gateway = gateway + self.gateway_name = self.gateway.gateway_name + self.gateway.write_log(u'创建tick合成类:{}'.format(setting)) + + self.symbol = setting.get('symbol', None) + self.leg1_symbol = setting.get('leg1_symbol', None) + self.leg2_symbol = setting.get('leg2_symbol', None) + self.leg1_ratio = setting.get('leg1_ratio', 1) # 腿1的数量配比 + self.leg2_ratio = setting.get('leg2_ratio', 1) # 腿2的数量配比 + self.price_tick = setting.get('price_tick', 1) # 合成价差加比后的最小跳动 + # 价差 + self.is_spread = setting.get('is_spread', False) + # 价比 + self.is_ratio = setting.get('is_ratio', False) + + self.last_leg1_tick = None + self.last_leg2_tick = None + + # 价差日内最高/最低价 + self.spread_high = None + self.spread_low = None + + # 价比日内最高/最低价 + self.ratio_high = None + self.ratio_low = None + + # 当前交易日 + self.trading_day = None + + if self.is_ratio and self.is_spread: + self.gateway.write_error(u'{}参数有误,不能同时做价差/加比.setting:{}'.format(self.symbol, setting)) + return + + self.gateway.write_log(u'初始化{}合成器成功'.format(self.symbol)) + if self.is_spread: + self.gateway.write_log( + u'leg1:{} * {} - leg2:{} * {}'.format(self.leg1_symbol, self.leg1_ratio, self.leg2_symbol, + self.leg2_ratio)) + if self.is_ratio: + self.gateway.write_log( + u'leg1:{} * {} / leg2:{} * {}'.format(self.leg1_symbol, self.leg1_ratio, self.leg2_symbol, + self.leg2_ratio)) + + def on_tick(self, tick): + """OnTick处理""" + combinable = False + + if tick.symbol == self.leg1_symbol: + # leg1合约 + self.last_leg1_tick = tick + if self.last_leg2_tick is not None: + if self.last_leg1_tick.datetime.replace(microsecond=0) == self.last_leg2_tick.datetime.replace( + microsecond=0): + combinable = True + + elif tick.symbol == self.leg2_symbol: + # leg2合约 + self.last_leg2_tick = tick + if self.last_leg1_tick is not None: + if self.last_leg2_tick.datetime.replace(microsecond=0) == self.last_leg1_tick.datetime.replace( + microsecond=0): + combinable = True + + # 不能合并 + if not combinable: + return + + if not self.is_ratio and not self.is_spread: + return + + # 以下情况,基本为单腿涨跌停,不合成价差/价格比 Tick + if (self.last_leg1_tick.ask_price_1 == 0 or self.last_leg1_tick.bid_price_1 == self.last_leg1_tick.limit_up) \ + and self.last_leg1_tick.ask_volume_1 == 0: + self.gateway.write_log( + u'leg1:{0}涨停{1},不合成价差Tick'.format(self.last_leg1_tick.vt_symbol, self.last_leg1_tick.bid_price_1)) + return + if (self.last_leg1_tick.bid_price_1 == 0 or self.last_leg1_tick.ask_price_1 == self.last_leg1_tick.limit_down) \ + and self.last_leg1_tick.bid_volume_1 == 0: + self.gateway.write_log( + u'leg1:{0}跌停{1},不合成价差Tick'.format(self.last_leg1_tick.vt_symbol, self.last_leg1_tick.ask_price_1)) + return + if (self.last_leg2_tick.ask_price_1 == 0 or self.last_leg2_tick.bid_price_1 == self.last_leg2_tick.limit_up) \ + and self.last_leg2_tick.ask_volume_1 == 0: + self.gateway.write_log( + u'leg2:{0}涨停{1},不合成价差Tick'.format(self.last_leg2_tick.vt_symbol, self.last_leg2_tick.bid_price_1)) + return + if (self.last_leg2_tick.bid_price_1 == 0 or self.last_leg2_tick.ask_price_1 == self.last_leg2_tick.limit_down) \ + and self.last_leg2_tick.bid_volume_1 == 0: + self.gateway.write_log( + u'leg2:{0}跌停{1},不合成价差Tick'.format(self.last_leg2_tick.vt_symbol, self.last_leg2_tick.ask_price_1)) + return + + if self.trading_day != tick.trading_day: + self.trading_day = tick.trading_day + self.spread_high = None + self.spread_low = None + self.ratio_high = None + self.ratio_low = None + + if self.is_spread: + spread_tick = TickData(gateway_name=self.gateway_name, + symbol=self.symbol, + exchange=Exchange.SPD, + datetime=tick.datetime) + + spread_tick.trading_day = tick.trading_day + spread_tick.date = tick.date + spread_tick.time = tick.time + + # 叫卖价差=leg1.ask_price_1 * 配比 - leg2.bid_price_1 * 配比,volume为两者最小 + spread_tick.ask_price_1 = round_to(target=self.price_tick, + value=self.last_leg1_tick.ask_price_1 * self.leg1_ratio - self.last_leg2_tick.bid_price_1 * self.leg2_ratio) + spread_tick.ask_volume_1 = min(self.last_leg1_tick.ask_volume_1, self.last_leg2_tick.bid_volume_1) + + # 叫买价差=leg1.bid_price_1 * 配比 - leg2.ask_price_1 * 配比,volume为两者最小 + spread_tick.bid_price_1 = round_to(target=self.price_tick, + value=self.last_leg1_tick.bid_price_1 * self.leg1_ratio - self.last_leg2_tick.ask_price_1 * self.leg2_ratio) + spread_tick.bid_volume_1 = min(self.last_leg1_tick.bid_volume_1, self.last_leg2_tick.ask_volume_1) + + # 最新价 + spread_tick.last_price = round_to(target=self.price_tick, + value=(spread_tick.ask_price_1 + spread_tick.bid_price_1) / 2) + # 昨收盘价 + if self.last_leg2_tick.pre_close > 0 and self.last_leg1_tick.pre_close > 0: + spread_tick.pre_close = round_to(target=self.price_tick, + value=self.last_leg1_tick.pre_close * self.leg1_ratio - self.last_leg2_tick.pre_close * self.leg2_ratio) + # 开盘价 + if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0: + spread_tick.open_price = round_to(target=self.price_tick, + value=self.last_leg1_tick.open_price * self.leg1_ratio - self.last_leg2_tick.open_price * self.leg2_ratio) + # 最高价 + if self.spread_high: + self.spread_high = max(self.spread_high, spread_tick.ask_price_1) + else: + self.spread_high = spread_tick.ask_price_1 + spread_tick.high_price = self.spread_high + + # 最低价 + if self.spread_low: + self.spread_low = min(self.spread_low, spread_tick.bid_price_1) + else: + self.spread_low = spread_tick.bid_price_1 + + spread_tick.low_price = self.spread_low + + self.gateway.on_tick(spread_tick) + + if self.is_ratio: + ratio_tick = TickData( + gateway_name=self.gateway_name, + symbol=self.symbol, + exchange=Exchange.SPD, + datetime=tick.datetime + ) + + ratio_tick.trading_day = tick.trading_day + ratio_tick.date = tick.date + ratio_tick.time = tick.time + + # 比率tick = (腿1 * 腿1 手数 / 腿2价格 * 腿2手数) 百分比 + ratio_tick.ask_price_1 = 100 * self.last_leg1_tick.ask_price_1 * self.leg1_ratio \ + / (self.last_leg2_tick.bid_price_1 * self.leg2_ratio) # noqa + ratio_tick.ask_price_1 = round_to( + target=self.price_tick, + value=ratio_tick.ask_price_1 + ) + + ratio_tick.ask_volume_1 = min(self.last_leg1_tick.ask_volume_1, self.last_leg2_tick.bid_volume_1) + ratio_tick.bid_price_1 = 100 * self.last_leg1_tick.bid_price_1 * self.leg1_ratio \ + / (self.last_leg2_tick.ask_price_1 * self.leg2_ratio) # noqa + ratio_tick.bid_price_1 = round_to( + target=self.price_tick, + value=ratio_tick.bid_price_1 + ) + + ratio_tick.bid_volume_1 = min(self.last_leg1_tick.bid_volume_1, self.last_leg2_tick.ask_volume_1) + ratio_tick.last_price = (ratio_tick.ask_price_1 + ratio_tick.bid_price_1) / 2 + ratio_tick.last_price = round_to( + target=self.price_tick, + value=ratio_tick.last_price + ) + + # 昨收盘价 + if self.last_leg2_tick.pre_close > 0 and self.last_leg1_tick.pre_close > 0: + ratio_tick.pre_close = 100 * self.last_leg1_tick.pre_close * self.leg1_ratio / ( + self.last_leg2_tick.pre_close * self.leg2_ratio) # noqa + ratio_tick.pre_close = round_to( + target=self.price_tick, + value=ratio_tick.pre_close + ) + + # 开盘价 + if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0: + ratio_tick.open_price = 100 * self.last_leg1_tick.open_price * self.leg1_ratio / ( + self.last_leg2_tick.open_price * self.leg2_ratio) # noqa + ratio_tick.open_price = round_to( + target=self.price_tick, + value=ratio_tick.open_price + ) + + # 最高价 + if self.ratio_high: + self.ratio_high = max(self.ratio_high, ratio_tick.ask_price_1) + else: + self.ratio_high = ratio_tick.ask_price_1 + ratio_tick.high_price = self.spread_high + + # 最低价 + if self.ratio_low: + self.ratio_low = min(self.ratio_low, ratio_tick.bid_price_1) + else: + self.ratio_low = ratio_tick.bid_price_1 + + ratio_tick.low_price = self.spread_low + + self.gateway.on_tick(ratio_tick) + class LocalOrderManager: """ Management tool to support use local order id for trading.