vnpy/prod/jobs/refill_binance_future_bars.py

130 lines
4.5 KiB
Python
Raw Permalink Normal View History

# flake8: noqa
import os
import sys
import csv
import pandas as pd
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if ROOT_PATH not in sys.path:
sys.path.append(ROOT_PATH)
print(f'append {ROOT_PATH} into sys.path')
from datetime import datetime, timedelta
from vnpy.data.binance.binance_future_data import BinanceFutureData, HistoryRequest, Exchange, Interval
from vnpy.trader.utility import get_csv_last_dt, append_data
# 获取币安合约交易的所有期货合约
future_data = BinanceFutureData()
2021-05-16 02:46:25 +00:00
# 获取全量合约信息
future_data.save_contracts()
contracts = BinanceFutureData.load_contracts()
# 开始下载日期
start_date = '20190101'
if __name__ == "__main__":
if len(sys.argv) >= 2:
interval = str(sys.argv[1]).lower()
if interval.isdecimal():
interval_num = int(sys.argv[1])
interval_type = Interval.MINUTE
else:
if 'm' in interval:
interval_type = Interval.MINUTE
interval_num = int(interval.replace('m', ''))
elif 'h' in interval:
interval_type = Interval.HOUR
interval_num = int(interval.replace('h', ''))
elif 'd' in interval:
interval_type = Interval.DAILY
interval_num = int(interval.replace('d', ''))
else:
interval = '1m'
interval_num = 1
interval_type = Interval.MINUTE
def download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num):
req = HistoryRequest(
symbol=symbol,
exchange=Exchange(contract_info.get('exchange')),
interval=interval_type,
interval_num=interval_num,
start=start_dt
)
bars = future_data.get_bars(req=req, return_dict=True)
future_data.export_to(bars, file_name=bar_file_path)
# 逐一合约进行下载
for vt_symbol, contract_info in contracts.items():
symbol = contract_info.get('symbol')
bar_file_path = os.path.abspath(os.path.join(
ROOT_PATH,
'bar_data',
'binance',
f'{symbol}_{start_date}_{interval}.csv'))
# 不存在文件,直接下载,并保存
if not os.path.exists(bar_file_path):
print(f'文件{bar_file_path}不存在,开始时间:{start_date}')
start_dt = datetime.strptime(start_date, '%Y%m%d')
download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num)
continue
# 如果存在文件获取最后的bar时间
last_dt = get_csv_last_dt(bar_file_path)
# 获取不到时间,重新下载
if last_dt is None:
print(f'获取文件{bar_file_path}的最后时间失败,开始时间:{start_date}')
start_dt = datetime.strptime(start_date, '%Y%m%d')
download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num)
continue
# 获取到时间,变成那天的开始时间,下载数据
start_dt = last_dt.replace(hour=0, minute=0, second=0, microsecond=0)
print(f'文件{bar_file_path}存在,最后时间:{last_dt}, 调整数据获取开始时间:{start_dt}')
req = HistoryRequest(
symbol=symbol,
exchange=Exchange(contract_info.get('exchange')),
interval=interval_type,
interval_num=interval_num,
start=start_dt
)
bars = future_data.get_bars(req=req, return_dict=True)
if len(bars) <= 0:
print(f'下载{symbol} {interval_num} {interval_type.value} 数据为空白')
continue
bar_count = 0
# 获取标题
headers = []
with open(bar_file_path, "r", encoding='utf8') as f:
reader = csv.reader(f)
for header in reader:
headers = header
break
# 写入所有大于最后bar时间的数据
with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile:
writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel',
extrasaction='ignore')
for bar in bars:
if bar['datetime'] <= last_dt:
continue
bar_count += 1
writer.writerow(bar)
print(f'更新{symbol}数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}')