From d100ff6e579f52dfa08323d09f29590c680c4dcd Mon Sep 17 00:00:00 2001 From: msincenselee Date: Sun, 26 Apr 2020 22:04:00 +0800 Subject: [PATCH] =?UTF-8?q?[=E5=A2=9E=E5=BC=BA=E5=8A=9F=E8=83=BD]=20?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=AF=8F=E6=97=A5=E4=BB=BB=E5=8A=A1=EF=BC=8C?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E8=A1=A5=E5=85=85=E8=82=A1=E7=A5=A8/?= =?UTF-8?q?=E6=9C=9F=E8=B4=A71=E5=88=86=E9=92=9F=E6=95=B0=E6=8D=AE=20=3D>?= =?UTF-8?q?=20csv=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- prod/jobs/refill_tdx_future_bars.py | 61 +++++++++------ prod/jobs/refill_tdx_stock_bars.py | 113 ++++++++++++++++++++++++++++ prod/jobs/stock_list.json | 4 + 3 files changed, 156 insertions(+), 22 deletions(-) create mode 100644 prod/jobs/refill_tdx_stock_bars.py create mode 100644 prod/jobs/stock_list.json diff --git a/prod/jobs/refill_tdx_future_bars.py b/prod/jobs/refill_tdx_future_bars.py index 073225b6..1d72563d 100644 --- a/prod/jobs/refill_tdx_future_bars.py +++ b/prod/jobs/refill_tdx_future_bars.py @@ -1,10 +1,11 @@ # flake8: noqa """ -下载通达信指数合约1分钟bar => vnpy项目目录/bar_data/ +下载通达信指数合约1分钟bar => vnpy项目目录/bar_data/tdx/ """ import os import sys import json +import csv from collections import OrderedDict import pandas as pd @@ -15,6 +16,7 @@ if vnpy_root not in sys.path: os.environ["VNPY_TESTING"] = "1" from vnpy.data.tdx.tdx_future_data import * +from vnpy.trader.utility import get_csv_last_dt # 保存的1分钟指数 bar目录 bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data')) @@ -38,14 +40,14 @@ for underlying_symbol in api_01.future_contracts.keys(): # 如果文件存在, if os.path.exists(bar_file_path): - df_old = pd.read_csv(bar_file_path, index_col=0) - df_old = df_old.rename(lambda x: pd.to_datetime(x, format="%Y-%m-%d %H:%M:%S")) + #df_old = pd.read_csv(bar_file_path, index_col=0) + #df_old = df_old.rename(lambda x: pd.to_datetime(x, format="%Y-%m-%d %H:%M:%S")) # 取最后一条时间 - last_dt = df_old.index[-1] + last_dt = get_csv_last_dt(bar_file_path) start_dt = last_dt - timedelta(days=1) print(f'文件{bar_file_path}存在,最后时间:{start_date}') else: - df_old = None + last_dt = None start_dt = datetime.strptime(start_date, '%Y%m%d') print(f'文件{bar_file_path}不存在,开始时间:{start_date}') @@ -57,25 +59,40 @@ for underlying_symbol in api_01.future_contracts.keys(): # [dict] => dataframe if not result or len(bars) == 0: continue - df_extern = pd.DataFrame(bars) - df_extern.set_index('datetime', inplace=True) - if df_old is not None: - # 扩展数据 - print('扩展数据') - data_df = pd.concat([df_old, df_extern], axis=0) - else: - data_df = df_extern + if last_dt is None: + data_df = pd.DataFrame(bars) + data_df.set_index('datetime', inplace=True) + data_df = data_df.sort_index() + # print(data_df.head()) + print(data_df.tail()) + data_df.to_csv(bar_file_path, index=True) + print(f'首次更新{index_symbol} 数据 => 文件{bar_file_path}') + continue + + + # 获取标题 + headers = [] + with open(bar_file_path, "r", encoding='utf8') as f: + reader = csv.reader(f) + for header in reader: + headers = header + break + + bar_count = 0 + # 写入所有大于最后bar时间的数据 + with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: + + writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', + extrasaction='ignore') + for bar in bars: + if bar['datetime'] <= last_dt: + continue + bar_count += 1 + writer.writerow(bar) + + print(f'更新{index_symbol} 数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') - # 数据按时间戳去重 - print('按时间戳去重') - data_df = data_df[~data_df.index.duplicated(keep='first')] - # 排序 - data_df = data_df.sort_index() - # print(data_df.head()) - print(data_df.tail()) - data_df.to_csv(bar_file_path, index=True) - print(f'更新{index_symbol}数据 => 文件{bar_file_path}') print('更新完毕') os._exit(0) diff --git a/prod/jobs/refill_tdx_stock_bars.py b/prod/jobs/refill_tdx_stock_bars.py new file mode 100644 index 00000000..689ddb14 --- /dev/null +++ b/prod/jobs/refill_tdx_stock_bars.py @@ -0,0 +1,113 @@ +# flake8: noqa +""" +下载通达信指数合约1分钟bar => vnpy项目目录/bar_data/ +上海股票 => SSE子目录 +深圳股票 => SZSE子目录 +""" +import os +import sys +import csv +import json +from collections import OrderedDict +import pandas as pd + +vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +if vnpy_root not in sys.path: + sys.path.append(vnpy_root) + +os.environ["VNPY_TESTING"] = "1" + +from vnpy.data.tdx.tdx_stock_data import * +from vnpy.trader.utility import load_json +from vnpy.trader.utility import get_csv_last_dt + +# 保存的1分钟指数 bar目录 +bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data')) + +# 开始日期(每年大概需要几分钟) +start_date = '20160101' + +# 创建API对象 +api_01 = TdxStockData() + +# 更新本地合约缓存信息 +stock_list = load_json('stock_list.json') + +symbol_dict = api_01.symbol_dict + +# 逐一指数合约下载并更新 +for stock_code in stock_list: + market_id = get_tdx_market_code(stock_code) + if market_id == 0: + exchange_name = '深交所' + exchange = Exchange.SZSE + else: + exchange_name = '上交所' + exchange = Exchange.SSE + + symbol_info = symbol_dict.get(f'{stock_code}_{market_id}') + stock_name = symbol_info.get('name') + print(f'开始更新:{exchange_name}/{stock_name}, 代码:{stock_code}') + bar_file_folder = os.path.abspath(os.path.join(bar_data_folder, f'{exchange.value}')) + if not os.path.exists(bar_file_folder): + os.makedirs(bar_file_folder) + # csv数据文件名 + bar_file_path = os.path.abspath(os.path.join(bar_file_folder, f'{stock_code}_{start_date}_1m.csv')) + + # 如果文件存在, + if os.path.exists(bar_file_path): + # 取最后一条时间 + last_dt = get_csv_last_dt(bar_file_path) + else: + last_dt = None + + if last_dt: + start_dt = last_dt - timedelta(days=1) + print(f'文件{bar_file_path}存在,最后时间:{start_date}') + else: + start_dt = datetime.strptime(start_date, '%Y%m%d') + print(f'文件{bar_file_path}不存在,或读取最后记录错误,开始时间:{start_date}') + + result, bars = api_01.get_bars(symbol=stock_code, + period='1min', + callback=None, + start_dt=start_dt, + return_bar=False) + # [dict] => dataframe + if not result or len(bars) == 0: + continue + if last_dt is None: + data_df = pd.DataFrame(bars) + data_df.set_index('datetime', inplace=True) + data_df = data_df.sort_index() + # print(data_df.head()) + print(data_df.tail()) + data_df.to_csv(bar_file_path, index=True) + print(f'首次更新{stock_code} {stock_name}数据 => 文件{bar_file_path}') + continue + + # 获取标题 + headers = [] + with open(bar_file_path, "r", encoding='utf8') as f: + reader = csv.reader(f) + for header in reader: + headers = header + break + + bar_count = 0 + # 写入所有大于最后bar时间的数据 + with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: + + writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', + extrasaction='ignore') + for bar in bars: + if bar['datetime'] <= last_dt: + continue + bar_count += 1 + writer.writerow(bar) + + print(f'更新{stock_code} {stock_name} 数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') + + +print('更新完毕') +os._exit(0) diff --git a/prod/jobs/stock_list.json b/prod/jobs/stock_list.json new file mode 100644 index 00000000..94135eb1 --- /dev/null +++ b/prod/jobs/stock_list.json @@ -0,0 +1,4 @@ +[ + "002600", + "600410" +]