diff --git a/prod/jobs/refill_tdx_stock_bars.py b/prod/jobs/refill_tdx_stock_bars.py index 30d0fc45..a6944b10 100644 --- a/prod/jobs/refill_tdx_stock_bars.py +++ b/prod/jobs/refill_tdx_stock_bars.py @@ -1,6 +1,6 @@ # flake8: noqa """ -下载通达信股票合约1分钟bar => vnpy项目目录/bar_data/ +下载通达信股票合约1分钟&日线bar => vnpy项目目录/bar_data/ 上海股票 => SSE子目录 深圳股票 => SZSE子目录 """ @@ -18,8 +18,10 @@ if vnpy_root not in sys.path: os.environ["VNPY_TESTING"] = "1" from vnpy.data.tdx.tdx_stock_data import * +from vnpy.data.common import resample_bars_file from vnpy.trader.utility import load_json from vnpy.trader.utility import get_csv_last_dt +from vnpy.trader.util_wechat import send_wx_msg # 保存的1分钟指数 bar目录 bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data')) @@ -30,83 +32,100 @@ start_date = '20160101' # 创建API对象 api_01 = TdxStockData() -# 更新本地合约缓存信息 +# 额外需要数据下载的基金列表 stock_list = load_json('stock_list.json') symbol_dict = api_01.symbol_dict -# 逐一合约下载并更新 -for stock_code in stock_list: - market_id = get_tdx_market_code(stock_code) - if market_id == 0: - exchange_name = '深交所' - exchange = Exchange.SZSE - else: - exchange_name = '上交所' - exchange = Exchange.SSE +# 下载所有的股票数据 +num_stocks = 0 +for period in ['1min', '1day']: + for symbol in symbol_dict.keys(): + symbol_info = symbol_dict[symbol] + stock_code = symbol_info['code'] + if ('stock_type' in symbol_info.keys() and symbol_info['stock_type'] in ['stock_cn', 'cb_cn']) or stock_code in stock_list: + # if stock_code in stock_list: + # print(symbol_info['code']) + if symbol_info['exchange'] == 'SZSE': + exchange_name = '深交所' + exchange = Exchange.SZSE + else: + exchange_name = '上交所' + exchange = Exchange.SSE + else: + continue + num_stocks += 1 - symbol_info = symbol_dict.get(f'{stock_code}_{market_id}') - stock_name = symbol_info.get('name') - print(f'开始更新:{exchange_name}/{stock_name}, 代码:{stock_code}') - bar_file_folder = os.path.abspath(os.path.join(bar_data_folder, f'{exchange.value}')) - if not os.path.exists(bar_file_folder): - 
os.makedirs(bar_file_folder) - # csv数据文件名 - bar_file_path = os.path.abspath(os.path.join(bar_file_folder, f'{stock_code}_{start_date}_1m.csv')) + stock_name = symbol_info.get('name') + print(f'开始更新:{exchange_name}/{stock_name}, 代码:{stock_code}') + bar_file_folder = os.path.abspath(os.path.join(bar_data_folder, f'{exchange.value}')) + if not os.path.exists(bar_file_folder): + os.makedirs(bar_file_folder) + # csv数据文件名 + bar_file_path = os.path.abspath(os.path.join(bar_file_folder, f'{stock_code}_{period[0:2]}.csv')) - # 如果文件存在, - if os.path.exists(bar_file_path): - # 取最后一条时间 - last_dt = get_csv_last_dt(bar_file_path) - else: - last_dt = None + # 如果文件存在, + if os.path.exists(bar_file_path): + # 取最后一条时间 + last_dt = get_csv_last_dt(bar_file_path) + else: + last_dt = None - if last_dt: - start_dt = last_dt - timedelta(days=1) - print(f'文件{bar_file_path}存在,最后时间:{start_date}') - else: - start_dt = datetime.strptime(start_date, '%Y%m%d') - print(f'文件{bar_file_path}不存在,或读取最后记录错误,开始时间:{start_date}') + if last_dt: + start_dt = last_dt - timedelta(days=1) + print(f'文件{bar_file_path}存在,最后时间:{start_date}') + else: + start_dt = datetime.strptime(start_date, '%Y%m%d') + print(f'文件{bar_file_path}不存在,或读取最后记录错误,开始时间:{start_date}') - result, bars = api_01.get_bars(symbol=stock_code, - period='1min', - callback=None, - start_dt=start_dt, - return_bar=False) - # [dict] => dataframe - if not result or len(bars) == 0: - continue - if last_dt is None: - data_df = pd.DataFrame(bars) - data_df.set_index('datetime', inplace=True) - data_df = data_df.sort_index() - # print(data_df.head()) - print(data_df.tail()) - data_df.to_csv(bar_file_path, index=True) - print(f'首次更新{stock_code} {stock_name}数据 => 文件{bar_file_path}') - continue + result, bars = api_01.get_bars(symbol=stock_code, + period=period, + callback=None, + start_dt=start_dt, + return_bar=False) + # [dict] => dataframe + if not result or len(bars) == 0: + continue - # 获取标题 - headers = [] - with open(bar_file_path, "r", encoding='utf8') 
as f: - reader = csv.reader(f) - for header in reader: - headers = header - break + # 全新数据 + if last_dt is None: + data_df = pd.DataFrame(bars) + data_df.set_index('datetime', inplace=True) + data_df = data_df.sort_index() + # print(data_df.head()) + print(data_df.tail()) + data_df.to_csv(bar_file_path, index=True) + print(f'首次更新{stock_code} {stock_name}数据 => 文件{bar_file_path}') - bar_count = 0 - # 写入所有大于最后bar时间的数据 - with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: + # 增量更新 + else: + # 获取标题 + headers = [] + with open(bar_file_path, "r", encoding='utf8') as f: + reader = csv.reader(f) + for header in reader: + headers = header + break - writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', - extrasaction='ignore') - for bar in bars: - if bar['datetime'] <= last_dt: - continue - bar_count += 1 - writer.writerow(bar) + bar_count = 0 + # 写入所有大于最后bar时间的数据 + # with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: + with open(bar_file_path, 'a', encoding='utf8') as csvWriteFile: - print(f'更新{stock_code} {stock_name} 数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') + writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', + extrasaction='ignore') + for bar in bars: + if bar['datetime'] <= last_dt: + continue + bar_count += 1 + writer.writerow(bar) -print('更新完毕') + print(f'更新{stock_code} {stock_name} 数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') + + # 输出 5、15、30分钟的数据 + if period == '1min': + out_files, err_msg = resample_bars_file(vnpy_root=vnpy_root, symbol=stock_code, exchange=exchange, x_mins=[5,15,30]) + +msg = 'tdx股票数据补充完毕: num_stocks={}'.format(num_stocks) +send_wx_msg(content=msg) os._exit(0) diff --git a/vnpy/data/__init__.py b/vnpy/data/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vnpy/data/common.py b/vnpy/data/common.py new file mode 100644 index 00000000..1a22bdb0 --- /dev/null +++ b/vnpy/data/common.py @@ -0,0 +1,78 @@ +import os +import pandas as 
import os
import pandas as pd


def _write_resampled(df_1m, freq, rule, target_file, label):
    """Resample 1-minute bars to ``freq`` and write the result as csv.

    :param df_1m: 1-minute bars indexed by a DatetimeIndex
    :param freq: pandas offset alias, e.g. ``'5min'`` or ``'D'``
    :param rule: mapping of column name => aggregation name
    :param target_file: path of the csv file to write
    :param label: period name used in the progress message
    :return: True when a non-empty result was written, otherwise False
    """
    # closed='left' / label='left': for 5 minutes, the bar stamped 09:30
    # aggregates the 1-minute bars inside [09:30, 09:35).
    df_target = df_1m.resample(freq, closed='left', label='left').agg(rule)
    # axis=0 works on rows; how='any' drops a row as soon as one column is NaN
    # (this removes the empty bins resample creates outside trading hours).
    df_target = df_target.dropna(axis=0, how='any')
    if df_target.empty:
        return False

    df_target.to_csv(target_file)
    print(f'生成[{label}] => {target_file}')
    return True


def resample_bars_file(vnpy_root, symbol, exchange, x_mins=None, include_day=False):
    """Rebuild x-minute (and optionally daily) bar csv files from the 1-minute csv.

    The source file is ``<vnpy_root>/bar_data/<exchange.value>/<symbol>_1m.csv``;
    the outputs are written next to it as ``<symbol>_<x>m.csv`` and
    ``<symbol>_1d.csv``.

    :param vnpy_root: project root directory that contains ``bar_data``
    :param symbol: symbol used as the csv file name prefix
    :param exchange: object whose ``value`` attribute names the sub folder
    :param x_mins: minute periods to build, e.g. [5, 15, 30, 60]; None means none
    :param include_day: also rebuild the daily bars
    :return: out_files, err_msg -- the files written and an error text
             (empty string when nothing went wrong)
    """
    err_msg = ""
    out_files = []
    # A None default replaces the former mutable ``[]`` default argument.
    x_mins = [] if x_mins is None else x_mins

    bar_folder = os.path.join(vnpy_root, 'bar_data', exchange.value)

    # Path of the 1-minute csv file
    csv_file = os.path.abspath(os.path.join(bar_folder, f'{symbol}_1m.csv'))
    if not os.path.exists(csv_file):
        err_msg = f'{csv_file} 文件不存在,不能转换'
        return out_files, err_msg

    # Load the 1-minute csv => dataframe
    df_1m = pd.read_csv(csv_file)
    if 'datetime' not in df_1m.columns:
        # Report through err_msg instead of crashing with a KeyError.
        err_msg = f'{csv_file} 缺少datetime列,不能转换'
        return out_files, err_msg

    # Convert str => datetime and use it as the index
    df_1m["datetime"] = pd.to_datetime(df_1m["datetime"], format="%Y-%m-%d %H:%M:%S")
    df_1m.set_index("datetime", inplace=True)
    # 'first' / 'last' depend on row order, so make sure the bars are
    # chronological; the stable sort keeps the file order of equal timestamps.
    if not df_1m.index.is_monotonic_increasing:
        df_1m = df_1m.sort_index(kind='mergesort')

    # Aggregation applied to each column of a resampled bin
    ohlc_rule = {
        'open': 'first',    # first value of the bin
        'high': 'max',      # highest value of the bin
        'low': 'min',       # lowest value of the bin
        'close': 'last',    # last value of the bin
        'volume': 'sum',    # total volume of the bin
        'amount': 'sum',    # total amount of the bin
        "symbol": 'first',
        "trading_date": 'first',
        "date": 'first',
        "time": 'first',
    }
    # agg() raises KeyError for columns the csv does not contain, so only
    # keep the rules whose column is really present.
    rule = {column: how for column, how in ohlc_rule.items() if column in df_1m.columns}
    if not rule:
        err_msg = f'{csv_file} 没有可合成的K线字段,不能转换'
        return out_files, err_msg

    for x_min in x_mins:
        target_file = os.path.abspath(os.path.join(bar_folder, f'{symbol}_{x_min}m.csv'))
        if _write_resampled(df_1m, f'{x_min}min', rule, target_file, f'{x_min}分钟'):
            out_files.append(target_file)

    if include_day:
        target_file = os.path.abspath(os.path.join(bar_folder, f'{symbol}_1d.csv'))
        if _write_resampled(df_1m, 'D', rule, target_file, '日线'):
            out_files.append(target_file)

    return out_files, err_msg