From b71d0ad919d48662dc18998959924cbedc4aa2b1 Mon Sep 17 00:00:00 2001 From: msincenselee Date: Fri, 7 Aug 2020 15:17:41 +0800 Subject: [PATCH] =?UTF-8?q?[=E5=A2=9E=E5=BC=BA=E5=8A=9F=E8=83=BD]=20?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E8=B4=A6=E5=8F=B7=E7=BA=A7=E5=88=AB=E8=82=A1?= =?UTF-8?q?=E6=8C=87=E5=AF=B9=E9=94=81=E3=80=82=E6=9B=B4=E6=96=B0=E7=B3=BB?= =?UTF-8?q?=E5=88=97jobs=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- prod/jobs/check_dominat_symbol.py | 106 ++++++++++ prod/jobs/export_future_renko_bars.py | 64 ++++++ prod/jobs/export_future_renko_bars.sh | 14 ++ prod/jobs/refill_bao_stock_bars.py | 212 +++++++++++-------- prod/jobs/refill_binance_future_bars.py | 148 +++++++------ prod/jobs/refill_binance_spot_bars.py | 131 ++++++++++++ prod/jobs/refill_future_renko_all.sh | 81 +++++++ prod/jobs/refill_rqdata_stock_bars.py | 98 +++++++++ prod/jobs/refill_tdx_cb_stock_bars.py | 112 ++++++++++ prod/jobs/refill_tdx_future_bars.py | 143 +++++++------ prod/jobs/refill_tdx_stock_bars.py | 2 +- prod/jobs/refill_tq_future_ticks.py | 17 +- prod/jobs/remove_expired_logs.py | 52 +++++ prod/jobs/remove_expired_logs.sh | 24 +++ prod/jobs/remove_future_renko.py | 24 +++ vnpy/app/cta_strategy_pro/engine.py | 2 + vnpy/app/cta_strategy_pro/template.py | 10 +- vnpy/app/cta_strategy_pro/template_spread.py | 130 +++++++----- vnpy/gateway/binance/binance_gateway.py | 2 +- vnpy/gateway/pb/pb_gateway.py | 37 ++-- 20 files changed, 1112 insertions(+), 297 deletions(-) create mode 100644 prod/jobs/check_dominat_symbol.py create mode 100644 prod/jobs/export_future_renko_bars.py create mode 100644 prod/jobs/export_future_renko_bars.sh create mode 100644 prod/jobs/refill_binance_spot_bars.py create mode 100644 prod/jobs/refill_future_renko_all.sh create mode 100644 prod/jobs/refill_rqdata_stock_bars.py create mode 100644 prod/jobs/refill_tdx_cb_stock_bars.py create mode 100644 prod/jobs/remove_expired_logs.py create mode 100644 prod/jobs/remove_expired_logs.sh create mode 100644 prod/jobs/remove_future_renko.py diff --git a/prod/jobs/check_dominat_symbol.py b/prod/jobs/check_dominat_symbol.py new file mode 100644 index 00000000..64d8b865 --- /dev/null +++ b/prod/jobs/check_dominat_symbol.py @@ -0,0 +1,106 @@ +# flake8: noqa +""" +更新主力合约 +""" +import os +import sys +import json +from collections import OrderedDict +import pandas as pd + +vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +if vnpy_root not in sys.path: + sys.path.append(vnpy_root) + +os.environ["VNPY_TESTING"] = "1" + +from vnpy.data.tdx.tdx_future_data import * +from vnpy.trader.util_wechat import send_wx_msg +from vnpy.trader.utility import load_json, save_json + +if __name__ == "__main__": + + if len(sys.argv) < 2: + print(f'请输入{vnpy_root}下检查目录,例如 prod/account01', file=sys.stderr) + exit() + print(sys.argv) + + for account_folder in sys.argv[1:]: + cta_path = os.path.abspath(os.path.join(vnpy_root, account_folder)) + if not os.path.exists(cta_path): + print(f'{cta_path}不存在', file=sys.stderr) + exit() + print(f'开始检查{cta_path}下的策略运行配置文件') + account_name = account_folder.split('/')[-1] + # 创建API对象 + api_01 = TdxFutureData() + + # 更新本地合约缓存信息 + api_01.update_mi_contracts() + + setting_file_path = os.path.abspath(os.path.join(cta_path, 'cta_strategy_pro_setting.json')) + settings = load_json(setting_file_path, auto_save=False) + + if len(settings) == 0: + print('无策略配置') + os._exit(0) + + changed = False + for strategy_name, setting in settings.items(): + + vt_symbol = setting.get('vt_symbol') + if not vt_symbol: + print(f'{strategy_name}配置中无vt_symbol', file=sys.stderr) + continue + + if '.' in vt_symbol: + symbol, exchange = vt_symbol.split('.') + else: + symbol = vt_symbol + exchange = None + + if exchange == Exchange.SPD: + print(f"暂不处理自定义套利合约{vt_symbol}") + continue + + full_symbol = get_full_symbol(symbol).upper() + + underlying_symbol = get_underlying_symbol(symbol).upper() + + contract_info = api_01.future_contracts.get(underlying_symbol) + + if not contract_info: + print(f'{account_name}主力合约配置中,找不到{underlying_symbol}', file=sys.stderr) + continue + if 'mi_symbol' not in contract_info or 'exchange' not in contract_info or 'full_symbol' not in contract_info: + print(f'{account_name}主力合约配置中,找不到mi_symbol/exchange/full_symbol. {contract_info}', file=sys.stderr) + continue + + new_mi_symbol = contract_info.get('mi_symbol') + new_exchange = contract_info.get('exchange') + + new_vt_symbol = '.'.join([new_mi_symbol, new_exchange]) + new_full_symbol = contract_info.get('full_symbol', '').upper() + if full_symbol >= new_full_symbol: + print(f'{account_name}策略配置:长合约{full_symbol}, 主力长合约{new_full_symbol},不更新') + continue + + if exchange: + if len(vt_symbol) != len(new_vt_symbol): + print(f'{account_name}配置中,合约{vt_symbol} 与{new_vt_symbol} 长度不匹配,不更新', file=sys.stderr) + continue + else: + if len(symbol) != len(new_mi_symbol): + print(f'{account_name}配置中,合约{vt_symbol} 与{new_mi_symbol} 长度不匹配,不更新', file=sys.stderr) + continue + + setting.update({'vt_symbol': new_vt_symbol}) + send_wx_msg(f'{account_name}{strategy_name} 主力合约更换:{vt_symbol} => {new_vt_symbol} ') + changed = True + + if changed: + save_json(setting_file_path, settings) + print(f'保存{account_name}新配置') + + print('更新完毕') + os._exit(0) diff --git a/prod/jobs/export_future_renko_bars.py b/prod/jobs/export_future_renko_bars.py new file mode 100644 index 00000000..19ef99af --- /dev/null +++ b/prod/jobs/export_future_renko_bars.py @@ -0,0 +1,64 @@ +# flake8: noqa +# 自动导出mongodb补全期货指数合约renko bar => csv文件 +# 供renko bar 批量测试使用 +import sys, os, copy, csv, signal + +vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +if vnpy_root not in sys.path: + print(f'append {vnpy_root} into sys.path') + sys.path.append(vnpy_root) + +os.environ["VNPY_TESTING"] = "1" + +from vnpy.data.renko.rebuild_future import * + +if __name__ == "__main__": + + if len(sys.argv) < 2: + print(f'请输入参数 host \n ' + f'例如: python export_future_renko_bars.py 127.0.0.1 {FUTURE_RENKO_DB_NAME} RB99') + exit() + print(sys.argv) + # Mongo host + host = sys.argv[1] + # 数据库 + if len(sys.argv) >= 3: + db_name = sys.argv[2] + else: + db_name = FUTURE_RENKO_DB_NAME + + # 导出指数合约 + if len(sys.argv) >= 4: + idx_symbol = sys.argv[3] + else: + idx_symbol = 'all' + + if len(sys.argv) >= 5: + start_date = sys.argv[4] + else: + start_date = '2016-01-01' + + if len(sys.argv) >= 6: + end_date = sys.argv[6] + else: + end_date = '2099-01-01' + + setting = { + "host": host, + "db_name": FUTURE_RENKO_DB_NAME, + "cache_folder": os.path.join(vnpy_root, 'tick_data', 'tdx', 'future') + } + builder = FutureRenkoRebuilder(setting) + + if idx_symbol.upper() == 'ALL': + print(u'导出所有合约') + csv_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data', 'future_renko')) + builder.export_all(start_date, end_date, csv_folder) + + else: + for height in [3, 5, 10, 'K3', 'K5', 'K10']: + csv_file = os.path.abspath(os.path.join(vnpy_root, 'bar_data', 'future_renko_{}_{}_{}_{}.csv' + .format(idx_symbol, height, start_date.replace('-', ''), + end_date.replace('-', '')))) + builder.export(symbol=idx_symbol, height=height, start_date=start_date, end_date=end_date, + csv_file=csv_file) diff --git a/prod/jobs/export_future_renko_bars.sh b/prod/jobs/export_future_renko_bars.sh new file mode 100644 index 00000000..47f5f6fb --- /dev/null +++ b/prod/jobs/export_future_renko_bars.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +CONDA_HOME=~/anaconda3 + +BASE_PATH=$(cd `dirname $0`; pwd) +echo $BASE_PATH +cd `dirname $0` +PROGRAM_NAME=./export_future_renko_bars.py + +# 定时 mongodb => future_renko bars +$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME 127.0.0.1 FutureRenko + + + diff --git a/prod/jobs/refill_bao_stock_bars.py b/prod/jobs/refill_bao_stock_bars.py index b125b490..73573bea 100644 --- a/prod/jobs/refill_bao_stock_bars.py +++ b/prod/jobs/refill_bao_stock_bars.py @@ -19,7 +19,7 @@ import baostock as bs from vnpy.trader.constant import Exchange from vnpy.data.tdx.tdx_common import get_tdx_market_code -from vnpy.trader.utility import load_json, get_csv_last_dt +from vnpy.trader.utility import load_json, get_csv_last_dt, extract_vt_symbol from vnpy.data.stock.stock_base import get_stock_base # 保存的1分钟指数 bar目录 bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data')) @@ -27,111 +27,135 @@ bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data')) # 开始日期(每年大概需要几分钟) start_date = '20060101' -# 证券宝连接 -login_msg = bs.login() -if login_msg.error_code != '0': - print(f'证券宝登录错误代码:{login_msg.error_code}, 错误信息:{login_msg.error_msg}') +if __name__ == "__main__": -# 更新本地合约缓存信息 -stock_list = load_json('stock_list.json') + # 证券宝连接 + login_msg = bs.login() + if login_msg.error_code != '0': + print(f'证券宝登录错误代码:{login_msg.error_code}, 错误信息:{login_msg.error_msg}') -symbol_dict = get_stock_base() - -day_fields = "date,code,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,isST" -min_fields = "date,time,code,open,high,low,close,volume,amount,adjustflag" - -# 逐一股票下载并更新 -for stock_code in stock_list: - market_id = get_tdx_market_code(stock_code) - if market_id == 0: - exchange_name = '深交所' - exchange = Exchange.SZSE - exchange_code = 'sz' + symbol_dict = get_stock_base() + if len(sys.argv) >= 2 and sys.argv[1].lower() == 'all': + stock_list = list(symbol_dict.keys()) + print('使用全量股票,共{}个'.format(len(stock_list))) else: - exchange_name = '上交所' - exchange = Exchange.SSE - exchange_code = 'sh' + # 更新本地合约缓存信息 + stock_list = load_json('stock_list.json') + print('读取本地stock_list.json文件,共{}个'.format(len(stock_list))) - symbol_info = symbol_dict.get(f'{stock_code}.{exchange.value}') - stock_name = symbol_info.get('name') - print(f'开始更新:{exchange_name}/{stock_name}, 代码:{stock_code}') - bar_file_folder = os.path.abspath(os.path.join(bar_data_folder, f'{exchange.value}')) - if not os.path.exists(bar_file_folder): - os.makedirs(bar_file_folder) - # csv数据文件名 - bar_file_path = os.path.abspath(os.path.join(bar_file_folder, f'{stock_code}_{start_date}_5m.csv')) - # 如果文件存在, - if os.path.exists(bar_file_path): - # df_old = pd.read_csv(bar_file_path, index_col=0) - # df_old = df_old.rename(lambda x: pd.to_datetime(x, format="%Y-%m-%d %H:%M:%S")) - # 取最后一条时间 - # last_dt = df_old.index[-1] - last_dt = get_csv_last_dt(bar_file_path) - start_dt = last_dt - timedelta(days=1) - print(f'文件{bar_file_path}存在,最后时间:{start_date}') - else: - last_dt = None - start_dt = datetime.strptime(start_date, '%Y%m%d') - print(f'文件{bar_file_path}不存在,开始时间:{start_date}') + day_fields = "date,code,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,isST" + min_fields = "date,time,code,open,high,low,close,volume,amount,adjustflag" - rs = bs.query_history_k_data_plus( - code=f'{exchange_code}.{stock_code}', - fields=min_fields, - start_date=start_dt.strftime('%Y-%m-%d'), end_date=datetime.now().strftime('%Y-%m-%d'), - frequency="5", - adjustflag="3" - ) - if rs.error_code != '0': - print(f'证券宝获取沪深A股历史K线数据错误代码:{rs.error_code}, 错误信息:{rs.error_msg}') - continue + count = 0 + # 逐一股票下载并更新 + for stock_code in stock_list: + count += 1 + print('下载进度:{}%'.format(round(count* 100/len(stock_list), 4))) + if '.' not in stock_code: + market_id = get_tdx_market_code(stock_code) + if market_id == 0: + exchange_name = '深交所' + exchange = Exchange.SZSE + exchange_code = 'sz' + else: + exchange_name = '上交所' + exchange = Exchange.SSE + exchange_code = 'sh' + symbol = stock_code + vt_symbol = f'{stock_code}.{exchange.value}' + else: + vt_symbol = stock_code + symbol, exchange = extract_vt_symbol(vt_symbol) + if exchange == Exchange.SSE: + exchange_name = '上交所' + exchange_code = 'sh' + else: + exchange_name = '深交所' + exchange_code = 'sz' - # [dict] => dataframe - bars = [] - while (rs.error_code == '0') and rs.next(): - row = rs.get_row_data() - dt = datetime.strptime(row[1], '%Y%m%d%H%M%S%f') - if last_dt and last_dt > dt: + symbol_info = symbol_dict.get(vt_symbol) + if symbol_info['类型'] == '指数': continue - bar = { - 'datetime': dt, - 'open': float(row[3]), - 'close': float(row[6]), - 'high': float(row[4]), - 'low': float(row[5]), - 'volume': float(row[7]), - 'amount': float(row[8]), - 'symbol': stock_code, - 'trading_date': row[0], - 'date': row[0], - 'time': dt.strftime('%H:%M:%S') - } - bars.append(bar) + stock_name = symbol_info.get('name') + print(f'开始更新:{exchange_name}/{stock_name}, 代码:{symbol}') + bar_file_folder = os.path.abspath(os.path.join(bar_data_folder, f'{exchange.value}')) + if not os.path.exists(bar_file_folder): + os.makedirs(bar_file_folder) + # csv数据文件名 + bar_file_path = os.path.abspath(os.path.join(bar_file_folder, f'{symbol}_5m.csv')) - # 获取标题 - if len(bars) == 0: - continue + # 如果文件存在, + if os.path.exists(bar_file_path): + # df_old = pd.read_csv(bar_file_path, index_col=0) + # df_old = df_old.rename(lambda x: pd.to_datetime(x, format="%Y-%m-%d %H:%M:%S")) + # 取最后一条时间 + # last_dt = df_old.index[-1] + last_dt = get_csv_last_dt(bar_file_path) + start_dt = last_dt - timedelta(days=1) + print(f'文件{bar_file_path}存在,最后时间:{start_dt}') + else: + last_dt = None + start_dt = datetime.strptime(start_date, '%Y%m%d') + print(f'文件{bar_file_path}不存在,开始时间:{start_dt}') - headers = list(bars[0].keys()) - if headers[0] != 'datetime': - headers.remove('datetime') - headers.insert(0, 'datetime') + rs = bs.query_history_k_data_plus( + code=f'{exchange_code}.{symbol}', + fields=min_fields, + start_date=start_dt.strftime('%Y-%m-%d'), end_date=datetime.now().strftime('%Y-%m-%d'), + frequency="5", + adjustflag="3" + ) + if rs.error_code != '0': + print(f'证券宝获取沪深A股历史K线数据错误代码:{rs.error_code}, 错误信息:{rs.error_msg}') + continue - bar_count = 0 - # 写入所有大于最后bar时间的数据 - with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: + # [dict] => dataframe + bars = [] + while (rs.error_code == '0') and rs.next(): + row = rs.get_row_data() + dt = datetime.strptime(row[1], '%Y%m%d%H%M%S%f') + if last_dt and last_dt > dt: + continue + bar = { + 'datetime': dt, + 'open': float(row[3]), + 'close': float(row[6]), + 'high': float(row[4]), + 'low': float(row[5]), + 'volume': float(row[7]), + 'amount': float(row[8]), + 'symbol': symbol, + 'trading_date': row[0], + 'date': row[0], + 'time': dt.strftime('%H:%M:%S') + } + bars.append(bar) - writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', - extrasaction='ignore') - if last_dt is None: - writer.writeheader() - for bar in bars: - bar_count += 1 - writer.writerow(bar) + # 获取标题 + if len(bars) == 0: + continue - print(f'更新{stock_code}数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') + headers = list(bars[0].keys()) + if headers[0] != 'datetime': + headers.remove('datetime') + headers.insert(0, 'datetime') + + bar_count = 0 + # 写入所有大于最后bar时间的数据 + with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: + + writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', + extrasaction='ignore') + if last_dt is None: + writer.writeheader() + for bar in bars: + bar_count += 1 + writer.writerow(bar) + + print(f'更新{vt_symbol}数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') -print('更新完毕') -bs.logout() -os._exit(0) + print('更新完毕') + bs.logout() + os._exit(0) diff --git a/prod/jobs/refill_binance_future_bars.py b/prod/jobs/refill_binance_future_bars.py index 799db24c..2c9d6c78 100644 --- a/prod/jobs/refill_binance_future_bars.py +++ b/prod/jobs/refill_binance_future_bars.py @@ -26,80 +26,104 @@ if len(contracts) == 0: # 开始下载日期 start_date = '20190101' -def download_symbol(symbol, start_dt, bar_file_path): - req = HistoryRequest( - symbol=symbol, - exchange=Exchange(contract_info.get('exchange')), - interval=Interval.MINUTE, - start=start_dt - ) +if __name__ == "__main__": - bars = future_data.get_bars(req=req, return_dict=True) - future_data.export_to(bars, file_name=bar_file_path) + if len(sys.argv) >= 2: + interval = str(sys.argv[1]).lower() + if interval.isdecimal(): + interval_num = int(sys.argv[1]) + interval_type = Interval.MINUTE + else: + if 'm' in interval: + interval_type = Interval.MINUTE + interval_num = int(interval.replace('m', '')) + elif 'h' in interval: + interval_type = Interval.HOUR + interval_num = int(interval.replace('h', '')) + elif 'd' in interval: + interval_type = Interval.DAILY + interval_num = int(interval.replace('d', '')) + else: + interval = '1m' + interval_num = 1 + interval_type = Interval.MINUTE -# 逐一合约进行下载 -for vt_symbol, contract_info in contracts.items(): - symbol = contract_info.get('symbol') + def download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num): + req = HistoryRequest( + symbol=symbol, + exchange=Exchange(contract_info.get('exchange')), + interval=interval_type, + interval_num=interval_num, + start=start_dt + ) - bar_file_path = os.path.abspath(os.path.join( - ROOT_PATH, - 'bar_data', - 'binance', - f'{symbol}_{start_date}_1m.csv')) + bars = future_data.get_bars(req=req, return_dict=True) + future_data.export_to(bars, file_name=bar_file_path) - # 不存在文件,直接下载,并保存 - if not os.path.exists(bar_file_path): - print(f'文件{bar_file_path}不存在,开始时间:{start_date}') - start_dt = datetime.strptime(start_date, '%Y%m%d') - download_symbol(symbol, start_dt, bar_file_path) - continue + # 逐一合约进行下载 + for vt_symbol, contract_info in contracts.items(): + symbol = contract_info.get('symbol') - # 如果存在文件,获取最后的bar时间 - last_dt = get_csv_last_dt(bar_file_path) + bar_file_path = os.path.abspath(os.path.join( + ROOT_PATH, + 'bar_data', + 'binance', + f'{symbol}_{start_date}_{interval}.csv')) - # 获取不到时间,重新下载 - if last_dt is None: - print(f'获取文件{bar_file_path}的最后时间失败,开始时间:{start_date}') - start_dt = datetime.strptime(start_date, '%Y%m%d') - download_symbol(symbol, start_dt, bar_file_path) - continue + # 不存在文件,直接下载,并保存 + if not os.path.exists(bar_file_path): + print(f'文件{bar_file_path}不存在,开始时间:{start_date}') + start_dt = datetime.strptime(start_date, '%Y%m%d') + download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num) + continue - # 获取到时间,变成那天的开始时间,下载数据 - start_dt = last_dt.replace(hour=0, minute=0, second=0, microsecond=0) - print(f'文件{bar_file_path}存在,最后时间:{last_dt}, 调整数据获取开始时间:{start_dt}') - req = HistoryRequest( - symbol=symbol, - exchange=Exchange(contract_info.get('exchange')), - interval=Interval.MINUTE, - start=start_dt - ) + # 如果存在文件,获取最后的bar时间 + last_dt = get_csv_last_dt(bar_file_path) - bars = future_data.get_bars(req=req, return_dict=True) - if len(bars) <= 0: - print(f'下载{symbol} 1分钟数据为空白') - continue + # 获取不到时间,重新下载 + if last_dt is None: + print(f'获取文件{bar_file_path}的最后时间失败,开始时间:{start_date}') + start_dt = datetime.strptime(start_date, '%Y%m%d') + download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num) + continue - bar_count = 0 + # 获取到时间,变成那天的开始时间,下载数据 + start_dt = last_dt.replace(hour=0, minute=0, second=0, microsecond=0) + print(f'文件{bar_file_path}存在,最后时间:{last_dt}, 调整数据获取开始时间:{start_dt}') + req = HistoryRequest( + symbol=symbol, + exchange=Exchange(contract_info.get('exchange')), + interval=interval_type, + interval_num=interval_num, + start=start_dt + ) - # 获取标题 - headers = [] - with open(bar_file_path, "r", encoding='utf8') as f: - reader = csv.reader(f) - for header in reader: - headers = header - break + bars = future_data.get_bars(req=req, return_dict=True) + if len(bars) <= 0: + print(f'下载{symbol} {interval_num} {interval_type.value} 数据为空白') + continue - # 写入所有大于最后bar时间的数据 - with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: + bar_count = 0 - writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', - extrasaction='ignore') - for bar in bars: - if bar['datetime'] <= last_dt: - continue - bar_count += 1 - writer.writerow(bar) + # 获取标题 + headers = [] + with open(bar_file_path, "r", encoding='utf8') as f: + reader = csv.reader(f) + for header in reader: + headers = header + break - print(f'更新{symbol}数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') + # 写入所有大于最后bar时间的数据 + with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: + + writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', + extrasaction='ignore') + for bar in bars: + if bar['datetime'] <= last_dt: + continue + bar_count += 1 + writer.writerow(bar) + + print(f'更新{symbol}数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') diff --git a/prod/jobs/refill_binance_spot_bars.py b/prod/jobs/refill_binance_spot_bars.py new file mode 100644 index 00000000..066b8b16 --- /dev/null +++ b/prod/jobs/refill_binance_spot_bars.py @@ -0,0 +1,131 @@ +# flake8: noqa + + +import os +import sys +import csv +import pandas as pd + +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +if ROOT_PATH not in sys.path: + sys.path.append(ROOT_PATH) + print(f'append {ROOT_PATH} into sys.path') + +from datetime import datetime, timedelta +from vnpy.data.binance.binance_spot_data import BinanceSpotData, HistoryRequest, Exchange, Interval +from vnpy.trader.utility import get_csv_last_dt, append_data + +# 获取币安现货交易的所有合约 +spot_data = BinanceSpotData() +contracts = BinanceSpotData.load_contracts() +if len(contracts) == 0: + spot_data.save_contracts() + contracts = BinanceSpotData.load_contracts() + +# 开始下载日期 +start_date = '20170101' + +if __name__ == "__main__": + + if len(sys.argv) >= 2: + interval = str(sys.argv[1]).lower() + if interval.isdecimal(): + interval_num = int(sys.argv[1]) + interval_type = Interval.MINUTE + else: + if 'm' in interval: + interval_type = Interval.MINUTE + interval_num = int(interval.replace('m', '')) + elif 'h' in interval: + interval_type = Interval.HOUR + interval_num = int(interval.replace('h', '')) + elif 'd' in interval: + interval_type = Interval.DAILY + interval_num = int(interval.replace('d', '')) + else: + interval = '1m' + interval_num = 1 + interval_type = Interval.MINUTE + + def download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num): + req = HistoryRequest( + symbol=symbol, + exchange=Exchange(contract_info.get('exchange')), + interval=interval_type, + interval_num=interval_num, + start=start_dt + ) + + bars = spot_data.get_bars(req=req, return_dict=True) + spot_data.export_to(bars, file_name=bar_file_path) + + # 逐一合约进行下载 + for vt_symbol, contract_info in contracts.items(): + symbol = contract_info.get('symbol') + if symbol not in ['BTCUSDT', 'ETHUSDT']: + continue + + bar_file_path = os.path.abspath(os.path.join( + ROOT_PATH, + 'bar_data', + 'binance_spot', + f'{symbol}_{start_date}_{interval}.csv')) + + # 不存在文件,直接下载,并保存 + if not os.path.exists(bar_file_path): + print(f'文件{bar_file_path}不存在,开始时间:{start_date}') + start_dt = datetime.strptime(start_date, '%Y%m%d') + download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num) + continue + + # 如果存在文件,获取最后的bar时间 + last_dt = get_csv_last_dt(bar_file_path) + + # 获取不到时间,重新下载 + if last_dt is None: + print(f'获取文件{bar_file_path}的最后时间失败,开始时间:{start_date}') + start_dt = datetime.strptime(start_date, '%Y%m%d') + download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num) + continue + + # 获取到时间,变成那天的开始时间,下载数据 + start_dt = last_dt.replace(hour=0, minute=0, second=0, microsecond=0) + print(f'文件{bar_file_path}存在,最后时间:{last_dt}, 调整数据获取开始时间:{start_dt}') + req = HistoryRequest( + symbol=symbol, + exchange=Exchange(contract_info.get('exchange')), + interval=interval_type, + interval_num=interval_num, + start=start_dt + ) + + bars = spot_data.get_bars(req=req, return_dict=True) + if len(bars) <= 0: + print(f'下载{symbol} {interval_num} {interval_type.value} 数据为空白') + continue + + bar_count = 0 + + # 获取标题 + headers = [] + with open(bar_file_path, "r", encoding='utf8') as f: + reader = csv.reader(f) + for header in reader: + headers = header + break + + # 写入所有大于最后bar时间的数据 + with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: + + writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', + extrasaction='ignore') + for bar in bars: + if bar['datetime'] <= last_dt: + continue + bar_count += 1 + writer.writerow(bar) + + print(f'更新{symbol}数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') + + diff --git a/prod/jobs/refill_future_renko_all.sh b/prod/jobs/refill_future_renko_all.sh new file mode 100644 index 00000000..b71cfba3 --- /dev/null +++ b/prod/jobs/refill_future_renko_all.sh @@ -0,0 +1,81 @@ +#!/bin/bash + +CONDA_HOME=~/anaconda3 + +############ Added by Huang Jianwei at 2018-04-03 +# To solve the problem about Javascript runtime +export PATH=$PATH:/usr/local/bin +############ Ended + +BASE_PATH=$(cd `dirname $0`; pwd) +echo $BASE_PATH +cd `dirname $0` + +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 A99 1 1>logs/refill_history_A99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 AG99 1 1>logs/refill_history_AG99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 AL99 5 1>logs/refill_history_AL99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 AP99 1 1>logs/refill_history_AP99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 AU99 0.05 1>logs/refill_history_AU99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 B99 1 1>logs/refill_history_B99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 BB99 0.05 1>logs/refill_history_BB99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 BU99 2 1>logs/refill_history_BU99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 C99 1 1>logs/refill_history_C99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 CF99 5 1>logs/refill_history_CF99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 CJ99 5 1>logs/refill_history_CJ99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 CS99 1 1>logs/refill_history_CS99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 CU99 10 1>logs/refill_history_CU99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 CY99 5 1>logs/refill_history_CY99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 EG99 1 1>logs/refill_history_EG99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 FB99 0.05 1>logs/refill_history_FB99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 FG99 1 1>logs/refill_history_FG99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 FU99 1 1>logs/refill_history_FU99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 HC99 1 1>logs/refill_history_HC99.log 2>>logs/refill_error.log + +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 I99 0.5 1>logs/refill_history_I99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 IC99 0.2 1>logs/refill_history_IC99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 IF99 0.2 1>logs/refill_history_IF99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 IH99 0.2 1>logs/refill_history_IH99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 J99 0.5 1>logs/refill_history_J99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 JD99 1 1>logs/refill_history_JD99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 JM99 0.5 1>logs/refill_history_JM99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 JR99 1 1>logs/refill_history_JR99.log 2>>logs/refill_error.log + +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 L99 5 1>logs/refill_history_L99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 LR99 1 1>logs/refill_history_LR99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 M99 1 1>logs/refill_history_M99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 MA99 1 1>logs/refill_history_MA99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 NI99 10.0 1>logs/refill_history_NI99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 NR99 5 1>logs/refill_history_NR99.log 2>>logs/refill_error.log + +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 OI99 1 1>logs/refill_history_OI99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 P99 2 1>logs/refill_history_P99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 PB99 5 1>logs/refill_history_PB99.log 2>>logs/refill_error.log +#$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 PM99 1 1>logs/refill_history_PM99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 PP99 1 1>logs/refill_history_PP99.log 2>>logs/refill_error.log + +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 RB99 1 1>logs/refill_history_RB99.log 2>>logs/refill_error.log +#$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 RI99 1 1>logs/refill_history_RI99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 RM99 1 1>logs/refill_history_RM99.log 2>>logs/refill_error.log +#$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 RR99 1 1>logs/refill_history_RR99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 RS99 1 1>logs/refill_history_RS99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 RU99 5 1>logs/refill_history_RU99.log 2>>logs/refill_error.log + +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 SC99 0.1 1>logs/refill_history_SC99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 SF99 2 1>logs/refill_history_SF99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 SM99 2 1>logs/refill_history_SM99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 SN99 10.0 1>logs/refill_history_SN99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 SP99 2 1>logs/refill_history_SP99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 SS99 5 1>logs/refill_history_SS99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 SR99 1 1>logs/refill_history_SR99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 T99 0.005 1>logs/refill_history_T99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 TA99 2 1>logs/refill_history_TA99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 TF99 0.005 1>logs/refill_history_TF99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 TS99 0.005 1>logs/refill_history_TS99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 UR99 1 1>logs/refill_history_UR99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 V99 5 1>logs/refill_history_V99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 WH99 1 1>logs/refill_history_WH99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 WR99 1 1>logs/refill_history_WR99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 Y99 2 1>logs/refill_history_Y99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 ZC99 0.2 1>logs/refill_history_ZC99.log 2>>logs/refill_error.log +$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 ZN99 5 1>logs/refill_history_ZN99.log 2>>logs/refill_error.log + diff --git a/prod/jobs/refill_rqdata_stock_bars.py b/prod/jobs/refill_rqdata_stock_bars.py new file mode 100644 index 00000000..154994b5 --- /dev/null +++ b/prod/jobs/refill_rqdata_stock_bars.py @@ -0,0 +1,98 @@ +# flake8: noqa +""" +下载rqdata股票合约1分钟bar => vnpy项目目录/bar_data/ +""" +import os +import sys +import json +from collections import OrderedDict +import pandas as pd + +vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +if vnpy_root not in sys.path: + sys.path.append(vnpy_root) + +os.environ["VNPY_TESTING"] = "1" + +from vnpy.data.tdx.tdx_stock_data import * +from vnpy.trader.utility import load_json +from vnpy.trader.rqdata import RqdataClient +# 保存的1分钟指数 bar目录 +bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data')) + +# 开始日期(每年大概需要几分钟) +start_date = '20160101' + +# 创建API对象 +api_01 = TdxStockData() +rq = RqdataClient() + +rq.init(username='incenselee@hotmail.com', password='123456') + +# 更新本地合约缓存信息 +stock_list = load_json('stock_list.json') + +symbol_dict = api_01.symbol_dict + +# 逐一指数合约下载并更新 +for stock_code in stock_list: + market_id = get_tdx_market_code(stock_code) + if market_id == 0: + exchange_name = '深交所' + exchange = Exchange.SZSE + else: + exchange_name = '上交所' + exchange = Exchange.SSE + + symbol_info = symbol_dict.get(f'{stock_code}_{market_id}') + stock_name = symbol_info.get('name') + print(f'开始更新:{exchange_name}/{stock_name}, 代码:{stock_code}') + bar_file_folder = os.path.abspath(os.path.join(bar_data_folder, f'{exchange.value}')) + if not os.path.exists(bar_file_folder): + os.makedirs(bar_file_folder) + # csv数据文件名 + bar_file_path = os.path.abspath(os.path.join(bar_file_folder, f'{stock_code}_{start_date}_1m.csv')) + + # 如果文件存在, + if os.path.exists(bar_file_path): + df_old = pd.read_csv(bar_file_path, index_col=0) + df_old = df_old.rename(lambda x: pd.to_datetime(x, format="%Y-%m-%d %H:%M:%S")) + # 取最后一条时间 + last_dt = df_old.index[-1] + start_dt = last_dt - timedelta(days=1) + print(f'文件{bar_file_path}存在,最后时间:{start_date}') + else: + df_old = None + start_dt = datetime.strptime(start_date, '%Y%m%d') + print(f'文件{bar_file_path}不存在,开始时间:{start_date}') + + result, bars = api_01.get_bars(symbol=stock_code, + period='1min', + callback=None, + start_dt=start_dt, + return_bar=False) + # [dict] => dataframe + if not result or len(bars) == 0: + continue + df_extern = pd.DataFrame(bars) + df_extern.set_index('datetime', inplace=True) + + if df_old is not None: + # 扩展数据 + print('扩展数据') + data_df = pd.concat([df_old, df_extern], axis=0) + else: + data_df = df_extern + + # 数据按时间戳去重 + print('按时间戳去重') + data_df = data_df[~data_df.index.duplicated(keep='first')] + # 排序 + data_df = data_df.sort_index() + # print(data_df.head()) + print(data_df.tail()) + data_df.to_csv(bar_file_path, index=True) + print(f'更新{stock_name} {stock_code}数据 => 文件{bar_file_path}') + +print('更新完毕') +os._exit(0) diff --git a/prod/jobs/refill_tdx_cb_stock_bars.py b/prod/jobs/refill_tdx_cb_stock_bars.py new file mode 100644 index 00000000..70b69e72 --- /dev/null +++ b/prod/jobs/refill_tdx_cb_stock_bars.py @@ -0,0 +1,112 @@ +# flake8: noqa +""" +下载通达信可转债1分钟bar => vnpy项目目录/bar_data/ +上海股票 => SSE子目录 +深圳股票 => SZSE子目录 +""" +import os +import sys +import csv +import json +from collections import OrderedDict +import pandas as pd + +vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +if vnpy_root not in sys.path: + sys.path.append(vnpy_root) + +os.environ["VNPY_TESTING"] = "1" + +from vnpy.data.tdx.tdx_stock_data import * +from vnpy.trader.utility import load_json +from vnpy.trader.utility import get_csv_last_dt + +# 保存的1分钟指数 bar目录 +bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data')) + +# 开始日期(每年大概需要几分钟) +start_date = '20160101' + +# 创建API对象 +api_01 = TdxStockData() + +symbol_dict = api_01.symbol_dict + +# 逐一指数合约下载并更新 +for k, symbol_info in symbol_dict.items(): + stock_code, market_id = k.split('_') + + if market_id == '0': + exchange_name = '深交所' + exchange = Exchange.SZSE + else: + exchange_name = '上交所' + exchange = Exchange.SSE + if symbol_info.get('stock_type') != 'cb_cn': + continue + + stock_name = symbol_info.get('name') + print(f'开始更新:{exchange_name}/{stock_name}, 代码:{stock_code}') + bar_file_folder = os.path.abspath(os.path.join(bar_data_folder, f'{exchange.value}')) + if not os.path.exists(bar_file_folder): + os.makedirs(bar_file_folder) + # csv数据文件名 + bar_file_path = os.path.abspath(os.path.join(bar_file_folder, f'{stock_code}_{start_date}_1m.csv')) + + # 如果文件存在, + if os.path.exists(bar_file_path): + # 取最后一条时间 + last_dt = get_csv_last_dt(bar_file_path) + else: + last_dt = None + + if last_dt: + start_dt = last_dt - timedelta(days=1) + print(f'文件{bar_file_path}存在,最后时间:{start_date}') + else: + start_dt = datetime.strptime(start_date, '%Y%m%d') + print(f'文件{bar_file_path}不存在,或读取最后记录错误,开始时间:{start_date}') + + result, bars = api_01.get_bars(symbol=stock_code, + period='1min', + callback=None, + start_dt=start_dt, + return_bar=False) + # [dict] => dataframe + if not result or len(bars) == 0: + continue + if last_dt is None: + data_df = pd.DataFrame(bars) + data_df.set_index('datetime', inplace=True) + data_df = data_df.sort_index() + # print(data_df.head()) + print(data_df.tail()) + data_df.to_csv(bar_file_path, index=True) + print(f'首次更新{stock_code} {stock_name}数据 => 文件{bar_file_path}') + continue + + # 获取标题 + headers = [] + with open(bar_file_path, "r", encoding='utf8') as f: + reader = csv.reader(f) + for header in reader: + headers = header + break + + bar_count = 0 + # 写入所有大于最后bar时间的数据 + with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: + + writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', + extrasaction='ignore') + for bar in bars: + if bar['datetime'] <= last_dt: + continue + bar_count += 1 + writer.writerow(bar) + + print(f'更新{stock_code} {stock_name} 数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') + + +print('更新完毕') +os._exit(0) diff --git a/prod/jobs/refill_tdx_future_bars.py b/prod/jobs/refill_tdx_future_bars.py index 1d72563d..f8e3ab4e 100644 --- a/prod/jobs/refill_tdx_future_bars.py +++ b/prod/jobs/refill_tdx_future_bars.py @@ -18,81 +18,92 @@ os.environ["VNPY_TESTING"] = "1" from vnpy.data.tdx.tdx_future_data import * from vnpy.trader.utility import get_csv_last_dt -# 保存的1分钟指数 bar目录 -bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data')) -# 开始日期(每年大概需要几分钟) -start_date = '20160101' +if __name__ == "__main__": -# 创建API对象 -api_01 = TdxFutureData() - -# 更新本地合约缓存信息 -api_01.update_mi_contracts() - -# 逐一指数合约下载并更新 -for underlying_symbol in api_01.future_contracts.keys(): - index_symbol = underlying_symbol + '99' - print(f'开始更新:{index_symbol}') - # csv数据文件名 - bar_file_path = os.path.abspath(os.path.join(bar_data_folder, 'tdx', f'{underlying_symbol}99_{start_date}_1m.csv')) - - # 如果文件存在, - if os.path.exists(bar_file_path): - - #df_old = pd.read_csv(bar_file_path, index_col=0) - #df_old = df_old.rename(lambda x: pd.to_datetime(x, format="%Y-%m-%d %H:%M:%S")) - # 取最后一条时间 - last_dt = get_csv_last_dt(bar_file_path) - start_dt = last_dt - timedelta(days=1) - print(f'文件{bar_file_path}存在,最后时间:{start_date}') + if len(sys.argv) > 1: + filter_underlying_symbols = [s.upper() for s in sys.argv[1:]] else: - last_dt = None - start_dt = datetime.strptime(start_date, '%Y%m%d') - print(f'文件{bar_file_path}不存在,开始时间:{start_date}') + filter_underlying_symbols = [] - result, bars = api_01.get_bars(symbol=index_symbol, - period='1min', - callback=None, - start_dt=start_dt, - return_bar=False) - # [dict] => dataframe - if not result or len(bars) == 0: - continue + # 保存的1分钟指数 bar目录 + bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data')) - if last_dt is None: - data_df = pd.DataFrame(bars) - data_df.set_index('datetime', inplace=True) - data_df = data_df.sort_index() - # print(data_df.head()) - print(data_df.tail()) - data_df.to_csv(bar_file_path, index=True) - print(f'首次更新{index_symbol} 数据 => 文件{bar_file_path}') - continue + # 开始日期(每年大概需要几分钟) + start_date = '20160101' + + # 创建API对象 + api_01 = TdxFutureData() + + # 更新本地合约缓存信息 + api_01.update_mi_contracts() + + # 逐一指数合约下载并更新 + for underlying_symbol in api_01.future_contracts.keys(): + if len(filter_underlying_symbols) > 0 and underlying_symbol not in filter_underlying_symbols: + continue + + index_symbol = underlying_symbol + '99' + print(f'开始更新:{index_symbol}') + # csv数据文件名 + bar_file_path = os.path.abspath(os.path.join(bar_data_folder, 'tdx', f'{underlying_symbol}99_{start_date}_1m.csv')) + + # 如果文件存在, + if os.path.exists(bar_file_path): + + #df_old = pd.read_csv(bar_file_path, index_col=0) + #df_old = df_old.rename(lambda x: pd.to_datetime(x, format="%Y-%m-%d %H:%M:%S")) + # 取最后一条时间 + last_dt = get_csv_last_dt(bar_file_path) + start_dt = last_dt - timedelta(days=1) + print(f'文件{bar_file_path}存在,最后时间:{start_date}') + else: + last_dt = None + start_dt = datetime.strptime(start_date, '%Y%m%d') + print(f'文件{bar_file_path}不存在,开始时间:{start_date}') + + result, bars = api_01.get_bars(symbol=index_symbol, + period='1min', + callback=None, + start_dt=start_dt, + return_bar=False) + # [dict] => dataframe + if not result or len(bars) == 0: + continue + + if last_dt is None: + data_df = pd.DataFrame(bars) + data_df.set_index('datetime', inplace=True) + data_df = data_df.sort_index() + # print(data_df.head()) + print(data_df.tail()) + data_df.to_csv(bar_file_path, index=True) + print(f'首次更新{index_symbol} 数据 => 文件{bar_file_path}') + continue - # 获取标题 - headers = [] - with open(bar_file_path, "r", encoding='utf8') as f: - reader = csv.reader(f) - for header in reader: - headers = header - break + # 获取标题 + headers = [] + with open(bar_file_path, "r", encoding='utf8') as f: + reader = csv.reader(f) + for header in reader: + headers = header + break - bar_count = 0 - # 写入所有大于最后bar时间的数据 - with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: + bar_count = 0 + # 写入所有大于最后bar时间的数据 + with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: - writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', - extrasaction='ignore') - for bar in bars: - if bar['datetime'] <= last_dt: - continue - bar_count += 1 - writer.writerow(bar) + writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', + extrasaction='ignore') + for bar in bars: + if bar['datetime'] <= last_dt: + continue + bar_count += 1 + writer.writerow(bar) - print(f'更新{index_symbol} 数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') + print(f'更新{index_symbol} 数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') -print('更新完毕') -os._exit(0) + print('更新完毕') + os._exit(0) diff --git a/prod/jobs/refill_tdx_stock_bars.py b/prod/jobs/refill_tdx_stock_bars.py index 689ddb14..2aa20c85 100644 --- a/prod/jobs/refill_tdx_stock_bars.py +++ b/prod/jobs/refill_tdx_stock_bars.py @@ -1,6 +1,6 @@ # flake8: noqa """ -下载通达信指数合约1分钟bar => vnpy项目目录/bar_data/ +下载通达信股票合约1分钟bar => vnpy项目目录/bar_data/ 上海股票 => SSE子目录 深圳股票 => SZSE子目录 """ diff --git a/prod/jobs/refill_tq_future_ticks.py b/prod/jobs/refill_tq_future_ticks.py index 35b9fbd0..be27cc5d 100644 --- a/prod/jobs/refill_tq_future_ticks.py +++ b/prod/jobs/refill_tq_future_ticks.py @@ -42,6 +42,8 @@ if __name__ == "__main__": download_tasks = {} begin_date = datetime.strptime(args.begin, '%Y%m%d') end_date = datetime.strptime(args.end, '%Y%m%d') + if end_date > datetime.now(): + end_date = datetime.now() n_days = (end_date - begin_date).days future_contracts = get_future_contracts() @@ -56,7 +58,7 @@ if __name__ == "__main__": if n_days <= 0: n_days = 1 - for n in range(n_days): + for n in range(n_days+1): download_date = begin_date + timedelta(days=n) if download_date.isoweekday() in [6, 7]: continue @@ -71,10 +73,19 @@ if __name__ == "__main__": "{}_{}.csv".format(symbol, download_date.strftime('%Y%m%d')))) zip_file = os.path.abspath(os.path.join(save_folder, "{}_{}.pkb2".format(symbol, download_date.strftime('%Y%m%d')))) + if os.path.exists(save_file): - continue + # 文件size是否大于1024字节 + if os.path.getsize(save_file) > 1024: + # 获取最后的tick时间 + dt = get_csv_last_dt(file_name=save_file, dt_format='%Y-%m-%d %H:%M:%S.%f') + # 判断是否为交易日最后 + if dt and dt.hour == 14 and dt.minute == 59: + continue + if os.path.exists(zip_file): - continue + if os.path.getsize(save_file) > 1024: + continue # 下载从 2018-05-01凌晨0点 到 2018-06-01凌晨0点 的 T1809 盘口Tick数据 download_tasks["{}_{}_tick".format(symbol, download_date.strftime('%Y%m%d'))] = DataDownloader( diff --git a/prod/jobs/remove_expired_logs.py b/prod/jobs/remove_expired_logs.py new file mode 100644 index 00000000..33a29f96 --- /dev/null +++ b/prod/jobs/remove_expired_logs.py @@ -0,0 +1,52 @@ +# flake8: noqa +""" +移除过期日志文件 +""" +import os +import sys +from datetime import datetime, timedelta + +vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +if vnpy_root not in sys.path: + sys.path.append(vnpy_root) + +os.environ["VNPY_TESTING"] = "1" + +if __name__ == "__main__": + + if len(sys.argv) < 2: + print(f'请输入{vnpy_root}下检查目录,例如 prod/account01', file=sys.stderr) + exit() + print(sys.argv) + + keep_days = 4 + + if len(sys.argv) == 3: + keep_days = int(sys.argv[2]) + print(f'保留最近{keep_days}日数据') + + log_path = os.path.abspath(os.path.join(vnpy_root, sys.argv[1], 'log')) + if not os.path.exists(log_path): + print(f'{log_path}不存在', file=sys.stderr) + exit() + print(f'开始检查{log_path}下的日志文件') + + dt_now = datetime.now() + + # 匹配日期 + delete_dates = [] + for n in range(keep_days, 30, 1): + delete_date = dt_now - timedelta(days=n) + delete_dates.append(delete_date.strftime('%Y-%m-%d')) + delete_dates.append(delete_date.strftime('%Y%m%d')) + + # 移除匹配日期 + for dirpath, dirnames, filenames in os.walk(str(log_path)): + + for file_name in filenames: + + for k in delete_dates: + if k in file_name: + file_path = os.path.abspath(os.path.join(dirpath, file_name)) + print(f'移除{file_path}') + os.remove(file_path) diff --git a/prod/jobs/remove_expired_logs.sh b/prod/jobs/remove_expired_logs.sh new file mode 100644 index 00000000..eda01ba6 --- /dev/null +++ b/prod/jobs/remove_expired_logs.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +CONDA_HOME=~/anaconda3 +#$CONDA_HOME/bin/conda deactivate +#$CONDA_HOME/bin/conda activate py37 + +############ Added by Huang Jianwei at 2018-04-03 +# To solve the problem about Javascript runtime +export PATH=$PATH:/usr/local/bin +############ Ended + +BASE_PATH=$(cd `dirname $0`; pwd) +echo $BASE_PATH +cd `dirname $0` +PROGRAM_NAME=./remove_expired_logs.py + +# 移除三天前的所有日志 +$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME prod/binance01 3 +$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME prod/binance02 3 +$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME prod/binance03 3 +$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME prod/fund01 3 + + + diff --git a/prod/jobs/remove_future_renko.py b/prod/jobs/remove_future_renko.py new file mode 100644 index 00000000..dd7ac030 --- /dev/null +++ b/prod/jobs/remove_future_renko.py @@ -0,0 +1,24 @@ +# encoding: UTF-8 + +import sys, os, copy, csv, signal + +vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) + +sys.path.append(vnpy_root) + +from vnpy.data.mongo.mongo_data import MongoData + +mongo_data = MongoData(host='127.0.0.1', port=27017) + +db_name = 'FutureRenko' +collections = mongo_data.get_collections(db_name=db_name) + +for collection_name in collections: + + #if collection_name != 'AU99_K10': + # continue + + filter = {'date': {'$gt': "2020-04-20"}} + print(f'removing {collection_name} with filter: {filter}') + mongo_data.db_delete(db_name=db_name, col_name=collection_name, flt= filter) + diff --git a/vnpy/app/cta_strategy_pro/engine.py b/vnpy/app/cta_strategy_pro/engine.py index 2eb47c7e..5908bd62 100644 --- a/vnpy/app/cta_strategy_pro/engine.py +++ b/vnpy/app/cta_strategy_pro/engine.py @@ -1814,6 +1814,8 @@ class CtaEngine(BaseEngine): symbol, exchange = extract_vt_symbol(vt_symbol) self.main_engine.subscribe(req=SubscribeRequest(symbol=symbol, exchange=exchange), gateway_name=gateway_name) + self.write_log(f'{vt_symbol}无最新tick,订阅行情') + if volume > 0 and tick: contract = self.main_engine.get_contract(vt_symbol) req = OrderRequest( diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py index 95c210cd..03d3e604 100644 --- a/vnpy/app/cta_strategy_pro/template.py +++ b/vnpy/app/cta_strategy_pro/template.py @@ -575,7 +575,7 @@ class CtaProTemplate(CtaTemplate): """ idx_symbol = None # 指数合约 - + exchange = Exchange.LOCAL price_tick = 1 # 商品的最小价格跳动 symbol_size = 10 # 商品得合约乘数 margin_rate = 0.1 # 商品的保证金 @@ -637,10 +637,10 @@ class CtaProTemplate(CtaTemplate): for name in self.parameters: if name in setting: setattr(self, name, setting[name]) - + symbol, self.exchange = extract_vt_symbol(self.vt_symbol) if self.idx_symbol is None: - symbol, exchange = extract_vt_symbol(self.vt_symbol) - self.idx_symbol = get_underlying_symbol(symbol).upper() + '99.' + exchange.value + self.idx_symbol = get_underlying_symbol(symbol).upper() + '99.' + self.exchange.value + self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=self.idx_symbol) if self.vt_symbol != self.idx_symbol: @@ -1816,6 +1816,7 @@ class CtaProFutureTemplate(CtaProTemplate): vt_symbol=sell_symbol, order_type=self.order_type, order_time=self.cur_datetime, + lock=self.exchange==Exchange.CFFEX, grid=grid) if len(vt_orderids) == 0: self.write_error(u'多单平仓委托失败') @@ -1916,6 +1917,7 @@ class CtaProFutureTemplate(CtaProTemplate): vt_symbol=cover_symbol, order_type=self.order_type, order_time=self.cur_datetime, + lock=self.exchange==Exchange.CFFEX, grid=grid) if len(vt_orderids) == 0: self.write_error(u'空单平仓委托失败') diff --git a/vnpy/app/cta_strategy_pro/template_spread.py b/vnpy/app/cta_strategy_pro/template_spread.py index 81c45d43..111e097a 100644 --- a/vnpy/app/cta_strategy_pro/template_spread.py +++ b/vnpy/app/cta_strategy_pro/template_spread.py @@ -229,11 +229,11 @@ class CtaSpreadTemplate(CtaTemplate): continue act_vt_symbol = grid.snapshot.get('act_vt_symbol', self.act_vt_symbol) - act_open_volume = grid.snapshot.get('act_open_volume', grid.volume) + act_open_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio) act_open_price = grid.snapshot.get('act_open_price') pas_vt_symbol = grid.snapshot.get('pas_vt_symbol', self.pas_vt_symbol) - pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume) + pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio) pas_open_price = grid.snapshot.get('pas_open_price') if act_vt_symbol != self.act_vt_symbol: pos_symbols.add(act_vt_symbol) @@ -269,11 +269,11 @@ class CtaSpreadTemplate(CtaTemplate): if not grid.open_status: continue act_vt_symbol = grid.snapshot.get('act_vt_symbol', self.act_vt_symbol) - act_open_volume = grid.snapshot.get('act_open_volume', grid.volume) + act_open_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio) act_open_price = grid.snapshot.get('act_open_price') pas_vt_symbol = grid.snapshot.get('pas_vt_symbol', self.pas_vt_symbol) - pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume) + pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio) pas_open_price = grid.snapshot.get('pas_open_price') if act_vt_symbol != self.act_vt_symbol: @@ -318,11 +318,11 @@ class CtaSpreadTemplate(CtaTemplate): for grid in self.gt.get_opened_grids(direction=Direction.LONG): act_vt_symbol = grid.snapshot.get('act_vt_symbol', self.act_vt_symbol) - act_open_volume = grid.snapshot.get('act_open_volume', grid.volume) + act_open_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio) act_open_price = grid.snapshot.get('act_open_price') pas_vt_symbol = grid.snapshot.get('pas_vt_symbol', self.pas_vt_symbol) - pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume) + pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio) pas_open_price = grid.snapshot.get('pas_open_price') pos_list.append({'vt_symbol': act_vt_symbol, @@ -335,14 +335,13 @@ class CtaSpreadTemplate(CtaTemplate): 'volume': pas_open_volume, 'price': pas_open_price}) - for grid in self.gt.get_opened_grids(direction=Direction.SHORT): act_vt_symbol = grid.snapshot.get('act_vt_symbol', self.act_vt_symbol) - act_open_volume = grid.snapshot.get('act_open_volume', grid.volume) + act_open_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio) act_open_price = grid.snapshot.get('act_open_price') pas_vt_symbol = grid.snapshot.get('pas_vt_symbol', self.pas_vt_symbol) - pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume) + pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio) pas_open_price = grid.snapshot.get('pas_open_price') pos_list.append({'vt_symbol': act_vt_symbol, @@ -530,8 +529,8 @@ class CtaSpreadTemplate(CtaTemplate): if trade.offset == Offset.OPEN: # 更新开仓均价/数量 if trade.vt_symbol == self.act_vt_symbol: - opened_price = grid.snapshot.get('act_open_price', 0) - opened_volume = grid.snapshot.get('act_open_volume', 0) + opened_price = grid.snapshot.get('act_open_price', grid.volume * self.act_vol_ratio) + opened_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio) act_open_volume = opened_volume + trade.volume act_open_price = (opened_price * opened_volume + trade.price * trade.volume) / act_open_volume @@ -543,7 +542,7 @@ class CtaSpreadTemplate(CtaTemplate): elif trade.vt_symbol == self.pas_vt_symbol: opened_price = grid.snapshot.get('pas_open_price', 0) - opened_volume = grid.snapshot.get('pas_open_volume', 0) + opened_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio) pas_open_volume = opened_volume + trade.volume pas_open_price = (opened_price * opened_volume + trade.price * trade.volume) / pas_open_volume @@ -557,7 +556,7 @@ class CtaSpreadTemplate(CtaTemplate): # 更新平仓均价/数量 if trade.vt_symbol == self.act_vt_symbol: closed_price = grid.snapshot.get('act_close_price', 0) - closed_volume = grid.snapshot.get('act_close_volume', 0) + closed_volume = grid.snapshot.get('act_close_volume', grid.volume * self.act_vol_ratio) act_close_volume = closed_volume + trade.volume act_close_price = (closed_price * closed_volume + trade.price * trade.volume) / act_close_volume @@ -569,7 +568,7 @@ class CtaSpreadTemplate(CtaTemplate): elif trade.vt_symbol == self.pas_vt_symbol: closed_price = grid.snapshot.get('pas_close_price', 0) - closed_volume = grid.snapshot.get('pas_close_volume', 0) + closed_volume = grid.snapshot.get('pas_close_volume', grid.volume * self.pas_vol_ratio) pas_open_volume = closed_volume + trade.volume pas_open_price = (closed_price * closed_volume + trade.price * trade.volume) / pas_open_volume @@ -580,6 +579,7 @@ class CtaSpreadTemplate(CtaTemplate): 'pas_vt_symbol': self.pas_vt_symbol}) self.gt.save() + def on_order_all_traded(self, order: OrderData): """ 订单全部成交 @@ -631,6 +631,7 @@ class CtaSpreadTemplate(CtaTemplate): # 在策略得活动订单中,移除 self.active_orders.pop(order.vt_orderid, None) + self.gt.save() def on_order_open_canceled(self, order: OrderData): """ @@ -687,6 +688,7 @@ class CtaSpreadTemplate(CtaTemplate): if grid: if order.vt_orderid in grid.order_ids: + self.write_log(f'移除grid中order_ids:{order.vt_orderid}') grid.order_ids.remove(order.vt_orderid) # 网格的所有委托单已经执行完毕 @@ -736,6 +738,11 @@ class CtaSpreadTemplate(CtaTemplate): info = self.active_orders.get(vt_orderid, None) info.update({'retry': order_retry}) + if grid: + if order.vt_orderid in grid.order_ids: + self.write_log(f'移除grid中order_ids:{order.vt_orderid}') + grid.order_ids.remove(order.vt_orderid) + self.gt.save() # 删除旧的委托记录 self.write_log(u'移除旧的委托记录:{}'.format(order.vt_orderid)) @@ -774,6 +781,11 @@ class CtaSpreadTemplate(CtaTemplate): info = self.active_orders.get(vt_orderid, None) info.update({'retry': order_retry}) + if grid: + if order.vt_orderid in grid.order_ids: + self.write_log(f'移除grid中order_ids:{order.vt_orderid}') + grid.order_ids.remove(order.vt_orderid) + self.gt.save() # 删除旧的委托记录 self.write_log(u'移除旧的委托记录:{}'.format(order.vt_orderid)) @@ -784,6 +796,7 @@ class CtaSpreadTemplate(CtaTemplate): self.write_log(u'委托单状态:{}=>{}'.format(pre_status, old_order.get('status'))) if grid: if order.vt_orderid in grid.order_ids: + self.write_log(f'移除grid中order_ids:{order.vt_orderid}') grid.order_ids.remove(order.vt_orderid) if not grid.order_ids: @@ -835,6 +848,7 @@ class CtaSpreadTemplate(CtaTemplate): self.send_wechat(msg) if grid: if order.vt_orderid in grid.order_ids: + self.write_log(f'移除grid中order_ids:{order.vt_orderid}') grid.order_ids.remove(order.vt_orderid) if not grid.order_ids: grid.order_status = False @@ -876,7 +890,10 @@ class CtaSpreadTemplate(CtaTemplate): for vt_orderid in vt_orderids: info = self.active_orders.get(vt_orderid) info.update({'retry': order_retry}) - + if grid: + if order.vt_orderid in grid.order_ids: + self.write_log(f'移除grid中order_ids:{order.vt_orderid}') + grid.order_ids.remove(order.vt_orderid) self.gt.save() self.write_log(u'移除活动订单:{}'.format(order.vt_orderid)) self.active_orders.pop(order.vt_orderid, None) @@ -911,7 +928,10 @@ class CtaSpreadTemplate(CtaTemplate): for vt_orderid in vt_orderids: info = self.active_orders.get(vt_orderid) info.update({'retry': order_retry}) - + if grid: + if order.vt_orderid in grid.order_ids: + self.write_log(f'移除grid中order_ids:{order.vt_orderid}') + grid.order_ids.remove(order.vt_orderid) self.gt.save() self.write_log(u'移除活动订单:{}'.format(order.vt_orderid)) @@ -923,6 +943,7 @@ class CtaSpreadTemplate(CtaTemplate): self.write_log(u'委托单状态:{}=>{}'.format(pre_status, old_order.get('status'))) if grid: if order.vt_orderid in grid.order_ids: + self.write_log(f'移除grid中order_ids:{order.vt_orderid}') grid.order_ids.remove(order.vt_orderid) if len(grid.order_ids) == 0: grid.order_status = False @@ -1181,7 +1202,8 @@ class CtaSpreadTemplate(CtaTemplate): self.write_error(f'spd_short,{self.pas_vt_symbol}开多仓{grid.volume * self.pas_vol_ratio}手失败,' f'委托价:{self.cur_pas_tick.ask_price_1}') return [] - + grid.snapshot.update({"act_vt_symbol": self.act_vt_symbol, "act_open_volume": grid.volume * self.act_vol_ratio, + "pas_vt_symbol": self.pas_vt_symbol, "pas_open_volume": grid.volume * self.pas_vol_ratio}) grid.order_status = True grid.order_datetime = self.cur_datetime @@ -1242,7 +1264,8 @@ class CtaSpreadTemplate(CtaTemplate): self.write_error(f'spd_short,{self.pas_vt_symbol}开空仓{grid.volume * self.pas_vol_ratio}手失败,' f'委托价:{self.cur_pas_tick.bid_price_1}') return [] - + grid.snapshot.update({"act_vt_symbol": self.act_vt_symbol, "act_open_volume": grid.volume * self.act_vol_ratio, + "pas_vt_symbol": self.pas_vt_symbol, "pas_open_volume": grid.volume * self.pas_vol_ratio}) grid.order_status = True grid.order_datetime = self.cur_datetime vt_orderids = act_vt_orderids.extend(pas_vt_orderids) @@ -1270,26 +1293,40 @@ class CtaSpreadTemplate(CtaTemplate): self.write_log(u'实际价差{}不满足:{}'.format(self.cur_spd_tick.bid_price_1, grid.close_price)) return [] + # 获取账号持仓 self.act_pos = self.cta_engine.get_position_holding(vt_symbol=self.act_vt_symbol) self.pas_pos = self.cta_engine.get_position_holding(vt_symbol=self.pas_vt_symbol) - if not all([self.act_pos, self.pas_pos]): - self.write_error('主动腿/被动退得持仓数据不存在') + self.write_error('主动腿/被动腿的账号持仓数据不存在') return [] - act_close_volume = grid.snapshot.get('act_open_volume') - pas_close_volume = grid.snapshot.get('pas_open_volume') + # 获取需要平仓的主动腿、被动腿volume + act_close_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio) + pas_close_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio) + # 检查账号持仓是否满足平仓目标 if self.act_pos.long_pos < act_close_volume: self.write_error(f'账号 {self.act_vt_symbol} 多单持仓{self.act_pos.long_pos}' f'今仓{self.act_pos.long_td}/昨{self.act_pos.long_yd}, 不满足{act_close_volume}') return [] - if self.pas_pos.short_pos < pas_close_volume: self.write_error(f'账号 {self.pas_vt_symbol} 多单持仓{self.pas_pos.short_pos}' f'今仓{self.pas_pos.short_td}/昨{self.pas_pos.short_yd}, 不满足{act_close_volume}') return [] + # 被动腿空单平仓 + pas_vt_orderids = self.cover(vt_symbol=self.pas_vt_symbol, + lock=self.pas_exchange == Exchange.CFFEX, + price=self.cur_pas_tick.ask_price_1, + volume=grid.volume * self.pas_vol_ratio, + order_type=self.order_type, + order_time=self.cur_datetime, + grid=grid) + if not pas_vt_orderids: + self.write_error(f'spd_sell,{self.pas_vt_symbol}空单平仓{grid.volume * self.pas_vol_ratio}手失败,' + f'委托价:{self.cur_pas_tick.ask_price_1}') + return [] + # 主动腿多单平仓 act_vt_orderids = self.sell(vt_symbol=self.act_vt_symbol, lock=self.act_exchange==Exchange.CFFEX, @@ -1303,19 +1340,6 @@ class CtaSpreadTemplate(CtaTemplate): f'委托价:{self.cur_act_tick.bid_price_1}') return [] - # 被动腿空单平仓 - pas_vt_orderids = self.cover(vt_symbol=self.pas_vt_symbol, - lock=self.pas_exchange==Exchange.CFFEX, - price=self.cur_pas_tick.ask_price_1, - volume=grid.volume * self.pas_vol_ratio, - order_type=self.order_type, - order_time=self.cur_datetime, - grid=grid) - if not pas_vt_orderids: - self.write_error(f'spd_sell,{self.pas_vt_symbol}空单平仓{grid.volume * self.pas_vol_ratio}手失败,' - f'委托价:{self.cur_pas_tick.ask_price_1}') - return [] - grid.order_status = True grid.order_datetime = self.cur_datetime vt_orderids = act_vt_orderids.extend(pas_vt_orderids) @@ -1343,6 +1367,7 @@ class CtaSpreadTemplate(CtaTemplate): self.write_log(u'实际价差{}不满足:{}'.format(self.cur_spd_tick.ask_price_1, grid.close_price)) return [] + # 获取账号内主动腿和被动腿的持仓 self.act_pos = self.cta_engine.get_position_holding(vt_symbol=self.act_vt_symbol) self.pas_pos = self.cta_engine.get_position_holding(vt_symbol=self.pas_vt_symbol) @@ -1350,19 +1375,31 @@ class CtaSpreadTemplate(CtaTemplate): self.write_error('主动腿/被动退得持仓数据不存在') return [] - act_close_volume = grid.snapshot.get('act_open_volume', 0) - pas_close_volume = grid.snapshot.get('pas_open_volume', 0) - + # 检查主动腿、被动腿,是否满足 + act_close_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio) + pas_close_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio) if self.act_pos.short_pos < act_close_volume: self.write_error(f'账号 {self.act_vt_symbol} 空单持仓{self.act_pos.short_pos}' f'今仓{self.act_pos.short_td}/昨{self.act_pos.short_yd}, 不满足{act_close_volume}') return [] - if self.pas_pos.long_pos < pas_close_volume: self.write_error(f'账号 {self.pas_vt_symbol} 多单持仓{self.pas_pos.longt_pos}' f'今仓{self.pas_pos.long_td}/昨{self.pas_pos.long_yd}, 不满足{act_close_volume}') return [] + # 被动腿多单平仓 + pas_vt_orderids = self.sell(vt_symbol=self.pas_vt_symbol, + lock=self.pas_exchange == Exchange.CFFEX, + price=self.cur_pas_tick.bid_price_1, + volume=grid.volume * self.pas_vol_ratio, + order_type=self.order_type, + order_time=self.cur_datetime, + grid=grid) + if not pas_vt_orderids: + self.write_error(f'spd_cover,{self.pas_vt_symbol}多单平仓{grid.volume * self.pas_vol_ratio}手失败,' + f'委托价:{self.cur_pas_tick.bid_price_1}') + return [] + # 主动腿空单平仓 act_vt_orderids = self.cover(vt_symbol=self.act_vt_symbol, lock=self.act_exchange==Exchange.CFFEX, @@ -1376,19 +1413,6 @@ class CtaSpreadTemplate(CtaTemplate): f'委托价:{self.cur_act_tick.ask_price_1}') return [] - # 被动腿多单平仓 - pas_vt_orderids = self.sell(vt_symbol=self.pas_vt_symbol, - lock=self.pas_exchange==Exchange.CFFEX, - price=self.cur_pas_tick.bid_price_1, - volume=grid.volume * self.pas_vol_ratio, - order_type=self.order_type, - order_time=self.cur_datetime, - grid=grid) - if not pas_vt_orderids: - self.write_error(f'spd_cover,{self.pas_vt_symbol}多单平仓{grid.volume * self.pas_vol_ratio}手失败,' - f'委托价:{self.cur_pas_tick.bid_price_1}') - return [] - grid.order_status = True grid.order_datetime = self.cur_datetime vt_orderids = act_vt_orderids.extend(pas_vt_orderids) diff --git a/vnpy/gateway/binance/binance_gateway.py b/vnpy/gateway/binance/binance_gateway.py index 600ec66f..e15c8cfd 100644 --- a/vnpy/gateway/binance/binance_gateway.py +++ b/vnpy/gateway/binance/binance_gateway.py @@ -162,7 +162,7 @@ class BinanceGateway(BaseGateway): self.status.update({'con': True}) self.count += 1 - if self.count < 2: + if self.count < 5: return self.count = 0 if len(self.query_functions) > 0: diff --git a/vnpy/gateway/pb/pb_gateway.py b/vnpy/gateway/pb/pb_gateway.py index 02631f1f..4f206375 100644 --- a/vnpy/gateway/pb/pb_gateway.py +++ b/vnpy/gateway/pb/pb_gateway.py @@ -474,12 +474,13 @@ class PbMdApi(object): {'ip': "124.160.88.183", 'port': 7709}, {'ip': "60.12.136.250", 'port': 7709}, {'ip': "218.108.98.244", 'port': 7709}, - {'ip': "218.108.47.69", 'port': 7709}, + #{'ip': "218.108.47.69", 'port': 7709}, {'ip': "114.80.63.12", 'port': 7709}, {'ip': "114.80.63.35", 'port': 7709}, {'ip': "180.153.39.51", 'port': 7709}, - {'ip': '14.215.128.18', 'port': 7709}, - {'ip': '59.173.18.140', 'port': 7709}] + #{'ip': '14.215.128.18', 'port': 7709}, + #{'ip': '59.173.18.140', 'port': 7709} + ] self.best_ip = {'ip': None, 'port': None} self.api_dict = {} # API 的连接会话对象字典 @@ -688,7 +689,12 @@ class PbMdApi(object): exchange = info.get('exchange', '') if len(exchange) == 0: continue + + vn_exchange_str = get_stock_exchange(symbol) + if exchange != vn_exchange_str: + continue exchange = Exchange(exchange) + if info['stock_type'] == 'stock_cn': product = Product.EQUITY elif info['stock_type'] in ['bond_cn', 'cb_cn']: @@ -715,16 +721,18 @@ class PbMdApi(object): min_volume=volume_tick, margin_rate=1 ) - # 缓存 合约 =》 中文名 - symbol_name_map.update({contract.symbol: contract.name}) - # 缓存代码和交易所的印射关系 - symbol_exchange_map[contract.symbol] = contract.exchange + if product!= Product.INDEX: + # 缓存 合约 =》 中文名 + symbol_name_map.update({contract.symbol: contract.name}) - self.contract_dict.update({contract.symbol: contract}) - self.contract_dict.update({contract.vt_symbol: contract}) - # 推送 - self.gateway.on_contract(contract) + # 缓存代码和交易所的印射关系 + symbol_exchange_map[contract.symbol] = contract.exchange + + self.contract_dict.update({contract.symbol: contract}) + self.contract_dict.update({contract.vt_symbol: contract}) + # 推送 + self.gateway.on_contract(contract) def get_stock_list(self): """股票所有的code&name列表""" @@ -1605,7 +1613,7 @@ class PbTdApi(object): continue def check_send_order_dbf(self): - """检查更新委托文件csv""" + """检查更新委托文件dbf""" dbf_file = os.path.abspath(os.path.join(self.order_folder, '{}{}.dbf'.format(PB_FILE_NAMES.get('send_order'), self.trading_date))) @@ -1639,7 +1647,7 @@ class PbTdApi(object): err_msg = err_msg.decode('gbk') if len(err_msg) == 0 or record.wtsbdm == 0: - self.gateway.write_log(f'收到失败标准,又没有失败原因:{print_dict(record.__dict__)}') + self.gateway.write_log(f'收到失败,又没有失败原因') continue err_id = str(getattr(record, 'wtsbdm', '')).strip() @@ -2056,6 +2064,9 @@ class TqMdApi(): def connect(self, setting = {}): """""" + if self.api and self.is_connected: + self.gateway.write_log(f'天勤行情已经接入,无需重新连接') + return try: from tqsdk import TqApi self.api = TqApi(_stock=True)