vnpy/prod/jobs/refill_binance_spot_bars.py

132 lines
4.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# flake8: noqa
import os
import sys
import csv
import pandas as pd
# Register the repository root (two directory levels above this file) on
# sys.path, announcing it when it had to be added.
ROOT_PATH = os.path.dirname(__file__)
ROOT_PATH = os.path.abspath(os.path.join(ROOT_PATH, '..', '..'))
if ROOT_PATH not in sys.path:
    sys.path.append(ROOT_PATH)
    print(f'append {ROOT_PATH} into sys.path')
from datetime import datetime, timedelta
from vnpy.data.binance.binance_spot_data import BinanceSpotData, HistoryRequest, Exchange, Interval
from vnpy.trader.utility import get_csv_last_dt, append_data
# Load all Binance spot contracts; when nothing is loaded, have the data
# source save the contracts and load them again.
spot_data = BinanceSpotData()
contracts = BinanceSpotData.load_contracts()
if not len(contracts):
    spot_data.save_contracts()
    contracts = BinanceSpotData.load_contracts()

# Download start date, parsed later with the '%Y%m%d' format.
start_date = '20170101'
def parse_interval_arg(text):
    """Parse a bar-interval command-line argument.

    Accepted forms (case-insensitive) are a bare number such as ``5``, which
    is taken as minutes, or a number combined with a unit letter: ``15m``
    (minutes), ``4h`` (hours) or ``1d`` (days).

    Args:
        text: the raw argument, e.g. ``sys.argv[1]``.

    Returns:
        tuple: ``(interval_num, unit)`` where ``unit`` is ``'m'``, ``'h'``
        or ``'d'``.

    Raises:
        ValueError: if no unit letter is present in a non-numeric argument,
            or if what remains after removing the unit is not an integer.
    """
    text = str(text).lower()
    if text.isdecimal():
        return int(text), 'm'

    # The units are tried in a fixed order (m, h, d); the first one found
    # in the text decides the interval type.
    for unit in ('m', 'h', 'd'):
        if unit not in text:
            continue
        try:
            return int(text.replace(unit, '')), unit
        except ValueError:
            raise ValueError(
                f'invalid interval {text!r}: expected a number followed by m, h or d'
            ) from None

    # Without this, an unrecognised argument left the interval variables
    # unbound and the script failed later with a NameError.
    raise ValueError(
        f'invalid interval {text!r}: expected a number followed by m, h or d')


def append_new_bars(bar_file_path, bars, last_dt):
    """Append the bars that are newer than ``last_dt`` to an existing csv file.

    The field names are taken from the first row of the file; keys of a bar
    that do not appear in that header are ignored.

    Args:
        bar_file_path: path of the existing csv file.
        bars: iterable of dicts, each with a ``'datetime'`` key.
        last_dt: bars whose ``'datetime'`` is ``<= last_dt`` are skipped.

    Returns:
        int: number of rows appended.
    """
    # Read the header row (empty list when the file has no rows).
    with open(bar_file_path, 'r', encoding='utf8') as csv_read_file:
        headers = next(csv.reader(csv_read_file), [])

    appended = 0
    with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csv_write_file:
        writer = csv.DictWriter(f=csv_write_file, fieldnames=headers, dialect='excel',
                                extrasaction='ignore')
        for bar in bars:
            if bar['datetime'] <= last_dt:
                continue
            writer.writerow(bar)
            appended += 1
    return appended


if __name__ == "__main__":

    # Interval members keyed by the unit letter returned by parse_interval_arg.
    interval_types = {
        'm': Interval.MINUTE,
        'h': Interval.HOUR,
        'd': Interval.DAILY,
    }

    if len(sys.argv) >= 2:
        # The lower-cased raw argument is kept as-is because it is part of
        # the csv file name below.
        interval = str(sys.argv[1]).lower()
        interval_num, interval_unit = parse_interval_arg(interval)
    else:
        interval = '1m'
        interval_num, interval_unit = 1, 'm'
    interval_type = interval_types[interval_unit]

    def request_bars(symbol, exchange, start_dt, interval_type, interval_num):
        """Request the bars of ``symbol`` from ``start_dt`` onwards as dicts."""
        req = HistoryRequest(
            symbol=symbol,
            exchange=Exchange(exchange),
            interval=interval_type,
            interval_num=interval_num,
            start=start_dt
        )
        return spot_data.get_bars(req=req, return_dict=True)

    def download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num,
                        exchange=None):
        """Download the bars of ``symbol`` from ``start_dt`` and export them to a file.

        Args:
            symbol: contract symbol to download.
            start_dt: datetime at which the download starts.
            bar_file_path: file the bars are exported to.
            interval_type: ``Interval`` member of the bars.
            interval_num: number of ``interval_type`` units per bar.
            exchange: exchange value of the contract. When omitted, the
                exchange of the contract currently being processed by the
                loop below is used, as in the original implementation.
        """
        if exchange is None:
            exchange = contract_info.get('exchange')
        bars = request_bars(symbol, exchange, start_dt, interval_type, interval_num)
        spot_data.export_to(bars, file_name=bar_file_path)

    # Only these symbols are refreshed.
    target_symbols = ('BTCUSDT', 'ETHUSDT')

    # Download contract by contract.
    for contract_info in contracts.values():
        symbol = contract_info.get('symbol')
        if symbol not in target_symbols:
            continue
        exchange = contract_info.get('exchange')

        bar_file_path = os.path.abspath(os.path.join(
            ROOT_PATH,
            'bar_data',
            'binance_spot',
            f'{symbol}_{start_date}_{interval}.csv'))

        # No file yet: download everything from start_date and save it.
        if not os.path.exists(bar_file_path):
            print(f'文件{bar_file_path}不存在,开始时间:{start_date}')
            start_dt = datetime.strptime(start_date, '%Y%m%d')
            download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num,
                            exchange=exchange)
            continue

        # The file exists: look up the time of its last bar.
        last_dt = get_csv_last_dt(bar_file_path)

        # The last time could not be determined: download again from start_date.
        if last_dt is None:
            print(f'获取文件{bar_file_path}的最后时间失败,开始时间:{start_date}')
            start_dt = datetime.strptime(start_date, '%Y%m%d')
            download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num,
                            exchange=exchange)
            continue

        # Restart from the beginning of the last bar's day; bars that are not
        # newer than last_dt are filtered out again when appending.
        start_dt = last_dt.replace(hour=0, minute=0, second=0, microsecond=0)
        print(f'文件{bar_file_path}存在,最后时间:{last_dt}, 调整数据获取开始时间:{start_dt}')
        bars = request_bars(symbol, exchange, start_dt, interval_type, interval_num)
        if not bars:
            print(f'下载{symbol} {interval_num} {interval_type.value} 数据为空白')
            continue

        # Append every bar that is newer than the last bar already in the file.
        append_new_bars(bar_file_path, bars, last_dt)

        print(f'更新{symbol}数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}')