diff --git a/prod/jobs/check_dominat_symbol.py b/prod/jobs/check_dominat_symbol.py index 64d8b865..65ac0a4c 100644 --- a/prod/jobs/check_dominat_symbol.py +++ b/prod/jobs/check_dominat_symbol.py @@ -29,7 +29,7 @@ if __name__ == "__main__": cta_path = os.path.abspath(os.path.join(vnpy_root, account_folder)) if not os.path.exists(cta_path): print(f'{cta_path}不存在', file=sys.stderr) - exit() + continue print(f'开始检查{cta_path}下的策略运行配置文件') account_name = account_folder.split('/')[-1] # 创建API对象 diff --git a/prod/jobs/refill_tdx_future_bars.py b/prod/jobs/refill_tdx_future_bars.py index f8e3ab4e..447a1796 100644 --- a/prod/jobs/refill_tdx_future_bars.py +++ b/prod/jobs/refill_tdx_future_bars.py @@ -59,6 +59,7 @@ if __name__ == "__main__": print(f'文件{bar_file_path}存在,最后时间:{start_date}') else: last_dt = None + start_dt = datetime.strptime(start_date, '%Y%m%d') print(f'文件{bar_file_path}不存在,开始时间:{start_date}') diff --git a/vnpy/api/easytrader/remoteclient.py b/vnpy/api/easytrader/remoteclient.py index 19ce6c81..15a46e72 100644 --- a/vnpy/api/easytrader/remoteclient.py +++ b/vnpy/api/easytrader/remoteclient.py @@ -35,7 +35,8 @@ class RemoteClient: """ params = locals().copy() params.pop("self") - + # if exe_path is None: + # params['exe_path'] = 'C:\\THS\\xiadan.exe' if config_path is not None: account = file2dict(config_path) params["user"] = account["user"] diff --git a/vnpy/api/websocket/websocket_client.py b/vnpy/api/websocket/websocket_client.py index 48912b28..6cda0ac7 100644 --- a/vnpy/api/websocket/websocket_client.py +++ b/vnpy/api/websocket/websocket_client.py @@ -1,5 +1,6 @@ import json import logging +import gzip import socket import ssl import sys @@ -197,19 +198,21 @@ class WebsocketClient: self._ensure_connection() ws = self._ws if ws: - text = ws.recv() + recv_data = ws.recv() # ws object is closed when recv function is blocking - if not text: + if not recv_data: self._disconnect() continue - self._record_last_received_text(text) + self._record_last_received_text(recv_data) try: - data = self.unpack_data(text) + if isinstance(recv_data, bytes): + recv_data = gzip.decompress(recv_data) + data = self.unpack_data(recv_data) except ValueError as e: - print("websocket unable to parse data: " + text) + print("websocket unable to parse data: " + recv_data) raise e self._log('recv data: %s', data) diff --git a/vnpy/app/cta_crypto/template.py b/vnpy/app/cta_crypto/template.py index b2ace400..905ecd5f 100644 --- a/vnpy/app/cta_crypto/template.py +++ b/vnpy/app/cta_crypto/template.py @@ -2143,7 +2143,7 @@ class CtaSpotTemplate(CtaTemplate): .format(self.cur_datetime, self.vt_symbol, self.cur_price, self.entrust)) if len(self.active_orders) > 0: - self.write_log('当前活动订单:{}'.format(json.dumps(self.active_orders, indent=2, ensure_ascii=False))) + self.write_log('当前活动订单数:{}'.format(len(self.active_orders))) #json.dumps(self.active_orders, indent=2, ensure_ascii=False))) if hasattr(self, 'policy'): policy = getattr(self, 'policy') diff --git a/vnpy/app/cta_stock/engine.py b/vnpy/app/cta_stock/engine.py index a228c1e4..d7ed4ae5 100644 --- a/vnpy/app/cta_stock/engine.py +++ b/vnpy/app/cta_stock/engine.py @@ -279,9 +279,10 @@ class CtaEngine(BaseEngine): strategy = self.orderid_strategy_map.get(order.vt_orderid, None) if not strategy: - self.write_log(f'委托单没有对应的策略设置:order:{order}') + self.write_log(f'委托单没有对应的策略设置:order:{order.__dict__}') + self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}') return - + self.write_log(f'委托更新:{order.vt_orderid} => 策略:{strategy.strategy_name}') # Remove vt_orderid if order is no longer active. vt_orderids = self.strategy_orderid_map[strategy.strategy_name] if order.vt_orderid in vt_orderids and not order.is_active(): @@ -311,15 +312,18 @@ class CtaEngine(BaseEngine): # Filter duplicate trade push if trade.vt_tradeid in self.vt_tradeids: - self.write_log(f'成交单的委托编号不属于本引擎实例:{trade}') + self.write_log(f'成交单的交易编号{trade.vt_tradeid}已处理完毕,不再处理') return self.vt_tradeids.add(trade.vt_tradeid) strategy = self.orderid_strategy_map.get(trade.vt_orderid, None) if not strategy: - self.write_log(f'成交单没有对应的策略设置:order:{trade}') + self.write_log(f'成交单没有对应的策略设置:trade:{trade.__dict__}') + self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}') return + self.write_log(f'成交更新:{trade.vt_orderid} => 策略:{strategy.strategy_name}') + # Update strategy pos before calling on_trade method # 取消外部干预策略pos,由策略自行完成更新 # if trade.direction == Direction.LONG: @@ -512,13 +516,10 @@ class CtaEngine(BaseEngine): vt_orderid = self.main_engine.send_order( req, gateway_name) - # Check if sending order successful - if not vt_orderid: - vt_orderids - vt_orderids.append(vt_orderid) # Save relationship between orderid and strategy. + self.write_log(f'委托成功绑定{vt_orderid} <==> {strategy.strategy_name}') self.orderid_strategy_map[vt_orderid] = strategy self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid) @@ -874,7 +875,7 @@ class CtaEngine(BaseEngine): def get_account(self, vt_accountid: str = ""): """ 查询账号的资金""" # 如果启动风控,则使用风控中的最大仓位 - if self.main_engine.rm_engine: + if self.main_engine.rm_engine and len(vt_accountid) > 0: return self.main_engine.rm_engine.get_account(vt_accountid) if len(vt_accountid) > 0: diff --git a/vnpy/app/cta_strategy_pro/back_testing.py b/vnpy/app/cta_strategy_pro/back_testing.py index ef3ac812..91acbc3e 100644 --- a/vnpy/app/cta_strategy_pro/back_testing.py +++ b/vnpy/app/cta_strategy_pro/back_testing.py @@ -2003,7 +2003,7 @@ class BackTestingEngine(object): # 计算每个策略实例的持仓盈亏 strategy_pnl.update({longpos.strategy_name: strategy_pnl.get(longpos.strategy_name, 0) + holding_profit}) - positionMsg += "{},long,p={},v={},m={};".format(symbol, longpos.price, longpos.volume, holding_profit) + positionMsg += "{:<10},long ,p:{:<10},vol:{:<3},pnl:{};\n".format(symbol, round(longpos.price,3), longpos.volume, round(holding_profit,3)) for shortpos in self.short_position_list: # 不计算套利合约的持仓盈亏 @@ -2023,7 +2023,7 @@ class BackTestingEngine(object): # 计算每个策略实例的持仓盈亏 strategy_pnl.update({shortpos.strategy_name: strategy_pnl.get(shortpos.strategy_name, 0) + holding_profit}) - positionMsg += "{},short,p={},v={},m={};".format(symbol, shortpos.price, shortpos.volume, holding_profit) + positionMsg += "{:<10},short,p:{:<10},vol:{:<3},pnl:{};\n".format(symbol, round(shortpos.price,3), shortpos.volume, round(holding_profit,3)) data['net'] = c + today_holding_profit # 当日净值(含持仓盈亏) data['rate'] = (c + today_holding_profit) / self.init_capital @@ -2049,7 +2049,7 @@ class BackTestingEngine(object): self.daily_max_drawdown_rate = drawdown_rate self.max_drawdown_rate_time = data['date'] - msg = u'{}: net={}, capital={} max={} holding_profit={} commission={}, pos: {}' \ + msg = u'{}: net={}, capital={} max={} holding_profit={} commission={}, pos: \n{}' \ .format(data['date'], data['net'], c, m, today_holding_profit, @@ -2143,6 +2143,7 @@ class BackTestingEngine(object): d = {} d['init_capital'] = self.init_capital d['profit'] = self.cur_capital - self.init_capital + d['net_capital'] = self.net_capital d['max_capital'] = self.max_net_capital # 取消原 maxCapital if len(self.pnl_list) == 0: @@ -2223,8 +2224,11 @@ class BackTestingEngine(object): result_info.update({u'期初资金': d['init_capital']}) self.output(u'期初资金:\t%s' % format_number(d['init_capital'])) - result_info.update({u'总盈亏': d['profit']}) - self.output(u'总盈亏:\t%s' % format_number(d['profit'])) + result_info.update({u'期末资金': d['net_capital']}) + self.output(u'期末资金:\t%s' % format_number(d['net_capital'])) + + result_info.update({u'平仓盈亏': d['profit']}) + self.output(u'平仓盈亏:\t%s' % format_number(d['profit'])) result_info.update({u'资金最高净值': d['max_capital']}) self.output(u'资金最高净值:\t%s' % format_number(d['max_capital'])) diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py index 227b2b60..94dd338e 100644 --- a/vnpy/app/cta_strategy_pro/template.py +++ b/vnpy/app/cta_strategy_pro/template.py @@ -713,7 +713,11 @@ class CtaProTemplate(CtaTemplate): cb_on_bar = strategy_kline.cb_on_bar # 缓存实例数据 =》 当前实例数据 strategy_kline.__dict__.update(cache_kline.__dict__) - + kline_first_bar_dt = None + kline_last_bar_dt = None + if len(strategy_kline.line_bar) > 0: + kline_first_bar_dt = strategy_kline.line_bar[0].datetime + kline_last_bar_dt = strategy_kline.line_bar[-1].datetime # 所有K线的最后时间 if last_bar_dt and strategy_kline.cur_datetime: last_bar_dt = max(last_bar_dt, strategy_kline.cur_datetime) @@ -724,7 +728,7 @@ class CtaProTemplate(CtaTemplate): strategy_kline.strategy = self strategy_kline.cb_on_bar = cb_on_bar - self.write_log(f'恢复{kline_name}缓存数据,最新bar结束时间:{last_bar_dt}') + self.write_log(f'恢复{kline_name}缓存数据:[{kline_first_bar_dt}] => [{kline_last_bar_dt}], bar结束时间:{last_bar_dt}') self.write_log(u'加载缓存k线数据完毕') return last_bar_dt @@ -1880,7 +1884,7 @@ class CtaProFutureTemplate(CtaProTemplate): elif order_info['traded'] > 0: self.write_log('撤单逻辑 = > 部分开仓') if order_grid.traded_volume < order_info['traded']: - self.write_log('撤单逻辑 = > 调整网格开仓数 {} => {}'.format(order_grid.traded_volume, order_grid['traded'] )) + self.write_log('撤单逻辑 = > 调整网格开仓数 {} => {}'.format(order_grid.traded_volume, order_info['traded'] )) order_grid.traded_volume = order_info['traded'] self.write_log(f'撤单逻辑 => 调整网格委托状态=> False, 开仓状态:True, 开仓数量:{order_grid.volume}=>{order_grid.traded_volume}') order_grid.order_status = False diff --git a/vnpy/component/cta_line_bar.py b/vnpy/component/cta_line_bar.py index 227874fb..f9a6e193 100644 --- a/vnpy/component/cta_line_bar.py +++ b/vnpy/component/cta_line_bar.py @@ -199,7 +199,7 @@ class CtaLineBar(object): self.mid5_array[:] = np.nan # 导出到CSV文件 的目录名 和 要导出的 字段 self.export_filename = None # 数据要导出的目标文件夹 - self.export_fields = [] # 定义要导出的K线数据字段(包含K线元素,主图指标,附图指标等) + self.export_fields = [] # 定义要导出的K线数据字段(包含K线元素,主图指标,附图指标等) self.export_bi_filename = None # 通过唐其安通道输出得笔csv文件(不是缠论得笔) self.export_zs_filename = None # 通过唐其安通道输出的中枢csv文件(不是缠论的笔中枢) @@ -654,6 +654,7 @@ class CtaLineBar(object): self.dict_dif = {} # datetime str: dif mapping self.line_dea = [] # DEA = (前一日DEA X 8/10 + 今日DIF X 2/10),即为talib-MACD返回值 self.line_macd = [] # (dif-dea)*2,但是talib中MACD的计算是bar = (dif-dea)*1,国内一般是乘以2 + self.dict_macd = {} # datetime str: macd mapping self.macd_segment_list = [] # macd 金叉/死叉的段列表,记录价格的最高/最低,Dif的最高,最低,Macd的最高/最低,Macd面接 self._rt_dif = None self._rt_dea = None @@ -940,6 +941,7 @@ class CtaLineBar(object): if self.bar_len > self.max_hold_bars: del self.line_bar[0] self.dict_dif.pop(self.index_list[0], None) + self.dict_macd.pop(self.index_list[0], None) del self.index_list[0] self.bar_len = self.bar_len - 1 # 删除了最前面的bar,bar长度少一位 @@ -1256,13 +1258,13 @@ class CtaLineBar(object): # K线的日期时间 self.cur_bar.trading_day = tick.trading_day # K线所在的交易日期 - #self.cur_bar.date = tick.date # K线的日期,(夜盘的话,与交易日期不同哦) + # self.cur_bar.date = tick.date # K线的日期,(夜盘的话,与交易日期不同哦) self.cur_bar.datetime = tick.datetime if (self.interval == Interval.SECOND and self.bar_interval % 60 == 0) \ or self.interval in [Interval.MINUTE, Interval.HOUR, Interval.DAILY, Interval.WEEKLY]: # K线的日期时间(去除秒)设为第一个Tick的时间 self.cur_bar.datetime = self.cur_bar.datetime.replace(second=0, microsecond=0) - #self.cur_bar.time = self.cur_bar.datetime.strftime('%H:%M:%S') + # self.cur_bar.time = self.cur_bar.datetime.strftime('%H:%M:%S') self.cur_bar.volume = tick.last_volume if tick.last_volume > 0 else tick.volume if self.cur_trading_day != self.cur_bar.trading_day or not self.line_bar: # bar的交易日与记录的当前交易日不一致: @@ -1784,7 +1786,7 @@ class CtaLineBar(object): # 当前笔 不是中枢最后一笔, 方向是回归中枢的 else: # 向下的一笔,且回落中枢高位下方,变成中枢的最后一笔 - if direction == -1 and cur_bi.get('low') < self.cur_tqn_zs.get('high')\ + if direction == -1 and cur_bi.get('low') < self.cur_tqn_zs.get('high') \ and cur_bi.get('high') > self.cur_tqn_zs.get('low'): # 对比中枢之前所有的确认低点,不能超过 zs_lows = self.cur_tqn_zs.get("lows", [self.cur_tqn_zs.get('low')]) @@ -1814,7 +1816,7 @@ class CtaLineBar(object): self.cur_tqn_zs.update({"end": cur_bi.get("start")}) # 向上的一笔,回抽中枢下轨上方,变成中枢的一笔 - if direction == 1 and cur_bi.get('high') > self.cur_tqn_zs.get('low')\ + if direction == 1 and cur_bi.get('high') > self.cur_tqn_zs.get('low') \ and cur_bi.get('low') < self.cur_tqn_zs.get('high'): # 对比中枢之前所有的确认高点,不能超过 zs_highs = self.cur_tqn_zs.get("highs", [self.cur_tqn_zs.get('high')]) @@ -3753,15 +3755,19 @@ class CtaLineBar(object): del self.line_dif[0] dif = round(dif_list[-1], self.round_n) self.line_dif.append(dif) - if len(self.index_list) > 0: - self.dict_dif.update({self.index_list[-1]: dif}) + if len(self.line_dea) > self.max_hold_bars: del self.line_dea[0] self.line_dea.append(round(dea_list[-1], self.round_n)) + macd = round(macd_list[-1] * 2, self.round_n) if len(self.line_macd) > self.max_hold_bars: del self.line_macd[0] - self.line_macd.append(round(macd_list[-1] * 2, self.round_n)) # 国内一般是2倍 + self.line_macd.append(macd) # 国内一般是2倍 + + if len(self.index_list) > 0: + self.dict_dif.update({self.index_list[-1]: dif}) + self.dict_macd.update({self.index_list[-1]: macd}) # 更新 “段”(金叉-》死叉;或 死叉-》金叉) segment = self.macd_segment_list[-1] if len(self.macd_segment_list) > 0 else {} @@ -3889,7 +3895,11 @@ class CtaLineBar(object): # 通过bar的时间,获取dif值 def get_dif_by_dt(self, str_dt): - return self.dict_dif.get(str_dt, 0) + return self.dict_dif.get(str_dt, None) + + # 通过bar的时间,获取macd值 + def get_macd_by_dt(self, str_dt): + return self.dict_macd.get(str_dt, None) @property def rt_dif(self): @@ -5390,6 +5400,15 @@ class CtaLineBar(object): # self.__count_chanlun() # return self._duan_zs_list + def is_duan(self, direction): + """当前最新一线段,是否与输入方向一致""" + if isinstance(direction, Direction): + direction = 1 if direction == Direction.LONG else -1 + if len(self.duan_list) == 0: + return False + + return self.duan_list[-1].direction == direction + def is_bi_beichi_inside_duan(self, direction, cur_duan=None): """ 当前段内的笔,是否形成背驰 @@ -5402,7 +5421,7 @@ class CtaLineBar(object): direction = 1 if direction == Direction.LONG else -1 if cur_duan is None: - if len(self._duan_list) == 0: + if len(self.duan_list) == 0: return False # 分型需要确认 @@ -5410,7 +5429,7 @@ class CtaLineBar(object): return False # 取当前段 - cur_duan = self._duan_list[-1] + cur_duan = self.duan_list[-1] # 获取最近2个匹配direction的分型 fx_list = [fx for fx in self._fenxing_list[-4:] if fx.direction == direction] if len(fx_list) != 2: @@ -5450,51 +5469,57 @@ class CtaLineBar(object): return False - def is_fx_macd_divergence(self, direction): + def is_fx_macd_divergence(self, direction, cur_duan=None, use_macd=False): """ 分型的macd背离 :param direction: 1,-1 或者 Direction.LONG(判断是否顶背离), Direction.SHORT(判断是否底背离) - + : cur_duan 当前段 + : use_macd 使用macd红柱,绿柱进行比较 :return: """ if isinstance(direction, Direction): direction = 1 if direction == Direction.LONG else -1 - if len(self._duan_list) == 0: - return False - # 当前段 - duan = self._duan_list[-1] + if cur_duan is None: - if duan.direction != direction: + if len(self.duan_list) == 0: + return False + # 当前段 + cur_duan = self.duan_list[-1] + + # 获取最近2个匹配direction的分型 + fx_list = [fx for fx in self._fenxing_list[-4:] if fx.direction == direction] + if len(fx_list) != 2: + return False + + # 这里是排除段的信号出错,获取了很久之前的一段,而不是最新的一段 + if cur_duan.end < fx_list[0].index: + return False + + if cur_duan.direction != direction: return False # 当前段包含的分笔,必须大于3 - if len(duan.bi_list) <= 3: + if len(cur_duan.bi_list) < 3: return False - # 获取最近2个匹配direction的分型 - fx_list = [fx for fx in self._fenxing_list[-4:] if fx.direction == direction] - if len(fx_list) != 2: - return False - - # 这里是排除段的信号出错,获取了很久之前的一段,而不是最新的一段 - if duan.end < fx_list[0].index: - return False - - pre_dif = self.get_dif_by_dt(fx_list[0].index) - cur_dif = self.get_dif_by_dt(fx_list[1].index) - if pre_dif is None or cur_dif is None: + # 获取倒数第二根同向分笔的结束dif值或macd值 + pre_value = self.get_macd_by_dt(cur_duan.bi_list[-3].end) if use_macd else self.get_dif_by_dt( + cur_duan.bi_list[-3].end) + cur_value = self.get_macd_by_dt(cur_duan.bi_list[-1].end) if use_macd else self.get_dif_by_dt( + cur_duan.bi_list[-1].end) + if pre_value is None or cur_value is None: return False if direction == 1: # 前顶分型顶部价格 - pre_price = fx_list[0].high + pre_price = cur_duan.bi_list[-3].high # 当前顶分型顶部价格 - cur_price = fx_list[1].high - if pre_price < cur_price and pre_dif >= cur_dif and 0 < self.line_dif[-1] < self.line_dif[-2]: + cur_price = cur_duan.bi_list[-1].high + if pre_price < cur_price and pre_value >= cur_value > 0: return True else: - pre_price = fx_list[0].low - cur_price = fx_list[1].low - if pre_price > cur_price and pre_dif <= cur_dif and self.line_dif[-2] < self.line_dif[-1] < 0: + pre_price = cur_duan.bi_list[-3].low + cur_price = cur_duan.bi_list[-1].low + if pre_price > cur_price and pre_value <= cur_value < 0: return True return False @@ -6511,7 +6536,7 @@ class CtaMinuteBar(CtaLineBar): is_new_bar = False # 不在同一交易日,推入新bar - if len(tick.trading_day) > 0 and len(self.cur_trading_day) > 0 and self.cur_trading_day != tick.trading_day: + if len(tick.trading_day) > 0 and len(self.cur_trading_day) > 0 and self.cur_trading_day != tick.trading_day: # self.write_log('{} drawLineBar() new_bar,{} curTradingDay:{},tick.trading_day:{} bars_count={}' # .format(self.name, tick.datetime.strftime("%Y-%m-%d %H:%M:%S"), self.cur_trading_day, # tick.trading_day, self.bars_count)) diff --git a/vnpy/trader/object.py b/vnpy/trader/object.py index efdc4e99..fc971295 100644 --- a/vnpy/trader/object.py +++ b/vnpy/trader/object.py @@ -43,10 +43,10 @@ class TickData(BaseData): trading_day: str = "" # '%Y-%m-%d' name: str = "" - volume: float = 0 + volume: float = 0 # 当前交易日累计成交量 open_interest: float = 0 last_price: float = 0 - last_volume: float = 0 + last_volume: float = 0 # 当前切片的成交量 limit_up: float = 0 limit_down: float = 0 diff --git a/vnpy/trader/utility.py b/vnpy/trader/utility.py index 4f1a572b..99882aa7 100644 --- a/vnpy/trader/utility.py +++ b/vnpy/trader/utility.py @@ -622,6 +622,7 @@ def display_dual_axis(df, columns1, columns2=[], invert_yaxis1=False, invert_yax :param file_name: 保存的excel 文件名称 :param sheet_name: excel 的sheet :param image_name: 保存的image 文件名 + 颜色色系:https://www.osgeo.cn/matplotlib/tutorials/colors/colormaps.html :return: """ @@ -634,7 +635,7 @@ def display_dual_axis(df, columns1, columns2=[], invert_yaxis1=False, invert_yax fig, ax1 = plt.subplots() if invert_yaxis1: ax1.invert_yaxis() - ax1.plot(df1) + df1.plot(ax=ax1,cmap='tab20b') if len(columns2) > 0: df2 = df[columns2] @@ -642,7 +643,8 @@ def display_dual_axis(df, columns1, columns2=[], invert_yaxis1=False, invert_yax ax2 = ax1.twinx() if invert_yaxis2: ax2.invert_yaxis() - ax2.plot(df2) + df2.plot(ax=ax2, cmap='tab20c') + # 修改x轴得label为时间 xt = ax1.get_xticks()