[增强功能] 支持账号级别股指对锁。更新系列jobs脚本

This commit is contained in:
msincenselee 2020-08-07 15:17:41 +08:00
parent 17da760de4
commit b71d0ad919
20 changed files with 1112 additions and 297 deletions

View File

@ -0,0 +1,106 @@
# flake8: noqa
"""
更新主力合约
"""
import os
import sys
import json
from collections import OrderedDict
import pandas as pd
vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if vnpy_root not in sys.path:
sys.path.append(vnpy_root)
os.environ["VNPY_TESTING"] = "1"
from vnpy.data.tdx.tdx_future_data import *
from vnpy.trader.util_wechat import send_wx_msg
from vnpy.trader.utility import load_json, save_json
if __name__ == "__main__":
if len(sys.argv) < 2:
print(f'请输入{vnpy_root}下检查目录,例如 prod/account01', file=sys.stderr)
exit()
print(sys.argv)
for account_folder in sys.argv[1:]:
cta_path = os.path.abspath(os.path.join(vnpy_root, account_folder))
if not os.path.exists(cta_path):
print(f'{cta_path}不存在', file=sys.stderr)
exit()
print(f'开始检查{cta_path}下的策略运行配置文件')
account_name = account_folder.split('/')[-1]
# 创建API对象
api_01 = TdxFutureData()
# 更新本地合约缓存信息
api_01.update_mi_contracts()
setting_file_path = os.path.abspath(os.path.join(cta_path, 'cta_strategy_pro_setting.json'))
settings = load_json(setting_file_path, auto_save=False)
if len(settings) == 0:
print('无策略配置')
os._exit(0)
changed = False
for strategy_name, setting in settings.items():
vt_symbol = setting.get('vt_symbol')
if not vt_symbol:
print(f'{strategy_name}配置中无vt_symbol', file=sys.stderr)
continue
if '.' in vt_symbol:
symbol, exchange = vt_symbol.split('.')
else:
symbol = vt_symbol
exchange = None
if exchange == Exchange.SPD:
print(f"暂不处理自定义套利合约{vt_symbol}")
continue
full_symbol = get_full_symbol(symbol).upper()
underlying_symbol = get_underlying_symbol(symbol).upper()
contract_info = api_01.future_contracts.get(underlying_symbol)
if not contract_info:
print(f'{account_name}主力合约配置中,找不到{underlying_symbol}', file=sys.stderr)
continue
if 'mi_symbol' not in contract_info or 'exchange' not in contract_info or 'full_symbol' not in contract_info:
print(f'{account_name}主力合约配置中找不到mi_symbol/exchange/full_symbol. {contract_info}', file=sys.stderr)
continue
new_mi_symbol = contract_info.get('mi_symbol')
new_exchange = contract_info.get('exchange')
new_vt_symbol = '.'.join([new_mi_symbol, new_exchange])
new_full_symbol = contract_info.get('full_symbol', '').upper()
if full_symbol >= new_full_symbol:
print(f'{account_name}策略配置:长合约{full_symbol} 主力长合约{new_full_symbol},不更新')
continue
if exchange:
if len(vt_symbol) != len(new_vt_symbol):
print(f'{account_name}配置中,合约{vt_symbol}{new_vt_symbol} 长度不匹配,不更新', file=sys.stderr)
continue
else:
if len(symbol) != len(new_mi_symbol):
print(f'{account_name}配置中,合约{vt_symbol}{new_mi_symbol} 长度不匹配,不更新', file=sys.stderr)
continue
setting.update({'vt_symbol': new_vt_symbol})
send_wx_msg(f'{account_name}{strategy_name} 主力合约更换:{vt_symbol} => {new_vt_symbol} ')
changed = True
if changed:
save_json(setting_file_path, settings)
print(f'保存{account_name}新配置')
print('更新完毕')
os._exit(0)

View File

@ -0,0 +1,64 @@
# flake8: noqa
# 自动导出mongodb补全期货指数合约renko bar => csv文件
# 供renko bar 批量测试使用
import sys, os, copy, csv, signal
vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if vnpy_root not in sys.path:
print(f'append {vnpy_root} into sys.path')
sys.path.append(vnpy_root)
os.environ["VNPY_TESTING"] = "1"
from vnpy.data.renko.rebuild_future import *
if __name__ == "__main__":
if len(sys.argv) < 2:
print(f'请输入参数 host <db_name> <idx_symbol> <start_date> <end_date> \n '
f'例如: python export_future_renko_bars.py 127.0.0.1 {FUTURE_RENKO_DB_NAME} RB99')
exit()
print(sys.argv)
# Mongo host
host = sys.argv[1]
# 数据库
if len(sys.argv) >= 3:
db_name = sys.argv[2]
else:
db_name = FUTURE_RENKO_DB_NAME
# 导出指数合约
if len(sys.argv) >= 4:
idx_symbol = sys.argv[3]
else:
idx_symbol = 'all'
if len(sys.argv) >= 5:
start_date = sys.argv[4]
else:
start_date = '2016-01-01'
if len(sys.argv) >= 6:
end_date = sys.argv[6]
else:
end_date = '2099-01-01'
setting = {
"host": host,
"db_name": FUTURE_RENKO_DB_NAME,
"cache_folder": os.path.join(vnpy_root, 'tick_data', 'tdx', 'future')
}
builder = FutureRenkoRebuilder(setting)
if idx_symbol.upper() == 'ALL':
print(u'导出所有合约')
csv_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data', 'future_renko'))
builder.export_all(start_date, end_date, csv_folder)
else:
for height in [3, 5, 10, 'K3', 'K5', 'K10']:
csv_file = os.path.abspath(os.path.join(vnpy_root, 'bar_data', 'future_renko_{}_{}_{}_{}.csv'
.format(idx_symbol, height, start_date.replace('-', ''),
end_date.replace('-', ''))))
builder.export(symbol=idx_symbol, height=height, start_date=start_date, end_date=end_date,
csv_file=csv_file)

View File

@ -0,0 +1,14 @@
#!/bin/bash
CONDA_HOME=~/anaconda3
BASE_PATH=$(cd `dirname $0`; pwd)
echo $BASE_PATH
cd `dirname $0`
PROGRAM_NAME=./export_future_renko_bars.py
# 定时 mongodb => future_renko bars
$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME 127.0.0.1 FutureRenko

View File

@ -19,7 +19,7 @@ import baostock as bs
from vnpy.trader.constant import Exchange from vnpy.trader.constant import Exchange
from vnpy.data.tdx.tdx_common import get_tdx_market_code from vnpy.data.tdx.tdx_common import get_tdx_market_code
from vnpy.trader.utility import load_json, get_csv_last_dt from vnpy.trader.utility import load_json, get_csv_last_dt, extract_vt_symbol
from vnpy.data.stock.stock_base import get_stock_base from vnpy.data.stock.stock_base import get_stock_base
# 保存的1分钟指数 bar目录 # 保存的1分钟指数 bar目录
bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data')) bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data'))
@ -27,111 +27,135 @@ bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data'))
# 开始日期(每年大概需要几分钟) # 开始日期(每年大概需要几分钟)
start_date = '20060101' start_date = '20060101'
# 证券宝连接 if __name__ == "__main__":
login_msg = bs.login()
if login_msg.error_code != '0':
print(f'证券宝登录错误代码:{login_msg.error_code}, 错误信息:{login_msg.error_msg}')
# 更新本地合约缓存信息 # 证券宝连接
stock_list = load_json('stock_list.json') login_msg = bs.login()
if login_msg.error_code != '0':
print(f'证券宝登录错误代码:{login_msg.error_code}, 错误信息:{login_msg.error_msg}')
symbol_dict = get_stock_base() symbol_dict = get_stock_base()
if len(sys.argv) >= 2 and sys.argv[1].lower() == 'all':
day_fields = "date,code,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,isST" stock_list = list(symbol_dict.keys())
min_fields = "date,time,code,open,high,low,close,volume,amount,adjustflag" print('使用全量股票,共{}'.format(len(stock_list)))
# 逐一股票下载并更新
for stock_code in stock_list:
market_id = get_tdx_market_code(stock_code)
if market_id == 0:
exchange_name = '深交所'
exchange = Exchange.SZSE
exchange_code = 'sz'
else: else:
exchange_name = '上交所' # 更新本地合约缓存信息
exchange = Exchange.SSE stock_list = load_json('stock_list.json')
exchange_code = 'sh' print('读取本地stock_list.json文件{}'.format(len(stock_list)))
symbol_info = symbol_dict.get(f'{stock_code}.{exchange.value}')
stock_name = symbol_info.get('name')
print(f'开始更新:{exchange_name}/{stock_name}, 代码:{stock_code}')
bar_file_folder = os.path.abspath(os.path.join(bar_data_folder, f'{exchange.value}'))
if not os.path.exists(bar_file_folder):
os.makedirs(bar_file_folder)
# csv数据文件名
bar_file_path = os.path.abspath(os.path.join(bar_file_folder, f'{stock_code}_{start_date}_5m.csv'))
# 如果文件存在, day_fields = "date,code,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,isST"
if os.path.exists(bar_file_path): min_fields = "date,time,code,open,high,low,close,volume,amount,adjustflag"
# df_old = pd.read_csv(bar_file_path, index_col=0)
# df_old = df_old.rename(lambda x: pd.to_datetime(x, format="%Y-%m-%d %H:%M:%S"))
# 取最后一条时间
# last_dt = df_old.index[-1]
last_dt = get_csv_last_dt(bar_file_path)
start_dt = last_dt - timedelta(days=1)
print(f'文件{bar_file_path}存在,最后时间:{start_date}')
else:
last_dt = None
start_dt = datetime.strptime(start_date, '%Y%m%d')
print(f'文件{bar_file_path}不存在,开始时间:{start_date}')
rs = bs.query_history_k_data_plus( count = 0
code=f'{exchange_code}.{stock_code}', # 逐一股票下载并更新
fields=min_fields, for stock_code in stock_list:
start_date=start_dt.strftime('%Y-%m-%d'), end_date=datetime.now().strftime('%Y-%m-%d'), count += 1
frequency="5", print('下载进度:{}%'.format(round(count* 100/len(stock_list), 4)))
adjustflag="3" if '.' not in stock_code:
) market_id = get_tdx_market_code(stock_code)
if rs.error_code != '0': if market_id == 0:
print(f'证券宝获取沪深A股历史K线数据错误代码:{rs.error_code}, 错误信息:{rs.error_msg}') exchange_name = '深交所'
continue exchange = Exchange.SZSE
exchange_code = 'sz'
else:
exchange_name = '上交所'
exchange = Exchange.SSE
exchange_code = 'sh'
symbol = stock_code
vt_symbol = f'{stock_code}.{exchange.value}'
else:
vt_symbol = stock_code
symbol, exchange = extract_vt_symbol(vt_symbol)
if exchange == Exchange.SSE:
exchange_name = '上交所'
exchange_code = 'sh'
else:
exchange_name = '深交所'
exchange_code = 'sz'
# [dict] => dataframe symbol_info = symbol_dict.get(vt_symbol)
bars = [] if symbol_info['类型'] == '指数':
while (rs.error_code == '0') and rs.next():
row = rs.get_row_data()
dt = datetime.strptime(row[1], '%Y%m%d%H%M%S%f')
if last_dt and last_dt > dt:
continue continue
bar = { stock_name = symbol_info.get('name')
'datetime': dt, print(f'开始更新:{exchange_name}/{stock_name}, 代码:{symbol}')
'open': float(row[3]), bar_file_folder = os.path.abspath(os.path.join(bar_data_folder, f'{exchange.value}'))
'close': float(row[6]), if not os.path.exists(bar_file_folder):
'high': float(row[4]), os.makedirs(bar_file_folder)
'low': float(row[5]), # csv数据文件名
'volume': float(row[7]), bar_file_path = os.path.abspath(os.path.join(bar_file_folder, f'{symbol}_5m.csv'))
'amount': float(row[8]),
'symbol': stock_code,
'trading_date': row[0],
'date': row[0],
'time': dt.strftime('%H:%M:%S')
}
bars.append(bar)
# 获取标题 # 如果文件存在,
if len(bars) == 0: if os.path.exists(bar_file_path):
continue # df_old = pd.read_csv(bar_file_path, index_col=0)
# df_old = df_old.rename(lambda x: pd.to_datetime(x, format="%Y-%m-%d %H:%M:%S"))
# 取最后一条时间
# last_dt = df_old.index[-1]
last_dt = get_csv_last_dt(bar_file_path)
start_dt = last_dt - timedelta(days=1)
print(f'文件{bar_file_path}存在,最后时间:{start_dt}')
else:
last_dt = None
start_dt = datetime.strptime(start_date, '%Y%m%d')
print(f'文件{bar_file_path}不存在,开始时间:{start_dt}')
headers = list(bars[0].keys()) rs = bs.query_history_k_data_plus(
if headers[0] != 'datetime': code=f'{exchange_code}.{symbol}',
headers.remove('datetime') fields=min_fields,
headers.insert(0, 'datetime') start_date=start_dt.strftime('%Y-%m-%d'), end_date=datetime.now().strftime('%Y-%m-%d'),
frequency="5",
adjustflag="3"
)
if rs.error_code != '0':
print(f'证券宝获取沪深A股历史K线数据错误代码:{rs.error_code}, 错误信息:{rs.error_msg}')
continue
bar_count = 0 # [dict] => dataframe
# 写入所有大于最后bar时间的数据 bars = []
with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: while (rs.error_code == '0') and rs.next():
row = rs.get_row_data()
dt = datetime.strptime(row[1], '%Y%m%d%H%M%S%f')
if last_dt and last_dt > dt:
continue
bar = {
'datetime': dt,
'open': float(row[3]),
'close': float(row[6]),
'high': float(row[4]),
'low': float(row[5]),
'volume': float(row[7]),
'amount': float(row[8]),
'symbol': symbol,
'trading_date': row[0],
'date': row[0],
'time': dt.strftime('%H:%M:%S')
}
bars.append(bar)
writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', # 获取标题
extrasaction='ignore') if len(bars) == 0:
if last_dt is None: continue
writer.writeheader()
for bar in bars:
bar_count += 1
writer.writerow(bar)
print(f'更新{stock_code}数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') headers = list(bars[0].keys())
if headers[0] != 'datetime':
headers.remove('datetime')
headers.insert(0, 'datetime')
bar_count = 0
# 写入所有大于最后bar时间的数据
with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile:
writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel',
extrasaction='ignore')
if last_dt is None:
writer.writeheader()
for bar in bars:
bar_count += 1
writer.writerow(bar)
print(f'更新{vt_symbol}数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}')
print('更新完毕') print('更新完毕')
bs.logout() bs.logout()
os._exit(0) os._exit(0)

View File

@ -26,80 +26,104 @@ if len(contracts) == 0:
# 开始下载日期 # 开始下载日期
start_date = '20190101' start_date = '20190101'
def download_symbol(symbol, start_dt, bar_file_path): if __name__ == "__main__":
req = HistoryRequest(
symbol=symbol,
exchange=Exchange(contract_info.get('exchange')),
interval=Interval.MINUTE,
start=start_dt
)
bars = future_data.get_bars(req=req, return_dict=True) if len(sys.argv) >= 2:
future_data.export_to(bars, file_name=bar_file_path) interval = str(sys.argv[1]).lower()
if interval.isdecimal():
interval_num = int(sys.argv[1])
interval_type = Interval.MINUTE
else:
if 'm' in interval:
interval_type = Interval.MINUTE
interval_num = int(interval.replace('m', ''))
elif 'h' in interval:
interval_type = Interval.HOUR
interval_num = int(interval.replace('h', ''))
elif 'd' in interval:
interval_type = Interval.DAILY
interval_num = int(interval.replace('d', ''))
else:
interval = '1m'
interval_num = 1
interval_type = Interval.MINUTE
# 逐一合约进行下载 def download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num):
for vt_symbol, contract_info in contracts.items(): req = HistoryRequest(
symbol = contract_info.get('symbol') symbol=symbol,
exchange=Exchange(contract_info.get('exchange')),
interval=interval_type,
interval_num=interval_num,
start=start_dt
)
bar_file_path = os.path.abspath(os.path.join( bars = future_data.get_bars(req=req, return_dict=True)
ROOT_PATH, future_data.export_to(bars, file_name=bar_file_path)
'bar_data',
'binance',
f'{symbol}_{start_date}_1m.csv'))
# 不存在文件,直接下载,并保存 # 逐一合约进行下载
if not os.path.exists(bar_file_path): for vt_symbol, contract_info in contracts.items():
print(f'文件{bar_file_path}不存在,开始时间:{start_date}') symbol = contract_info.get('symbol')
start_dt = datetime.strptime(start_date, '%Y%m%d')
download_symbol(symbol, start_dt, bar_file_path)
continue
# 如果存在文件获取最后的bar时间 bar_file_path = os.path.abspath(os.path.join(
last_dt = get_csv_last_dt(bar_file_path) ROOT_PATH,
'bar_data',
'binance',
f'{symbol}_{start_date}_{interval}.csv'))
# 获取不到时间,重新下载 # 不存在文件,直接下载,并保存
if last_dt is None: if not os.path.exists(bar_file_path):
print(f'获取文件{bar_file_path}的最后时间失败,开始时间:{start_date}') print(f'文件{bar_file_path}不存在,开始时间:{start_date}')
start_dt = datetime.strptime(start_date, '%Y%m%d') start_dt = datetime.strptime(start_date, '%Y%m%d')
download_symbol(symbol, start_dt, bar_file_path) download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num)
continue continue
# 获取到时间,变成那天的开始时间,下载数据 # 如果存在文件获取最后的bar时间
start_dt = last_dt.replace(hour=0, minute=0, second=0, microsecond=0) last_dt = get_csv_last_dt(bar_file_path)
print(f'文件{bar_file_path}存在,最后时间:{last_dt}, 调整数据获取开始时间:{start_dt}')
req = HistoryRequest(
symbol=symbol,
exchange=Exchange(contract_info.get('exchange')),
interval=Interval.MINUTE,
start=start_dt
)
bars = future_data.get_bars(req=req, return_dict=True) # 获取不到时间,重新下载
if len(bars) <= 0: if last_dt is None:
print(f'下载{symbol} 1分钟数据为空白') print(f'获取文件{bar_file_path}的最后时间失败,开始时间:{start_date}')
continue start_dt = datetime.strptime(start_date, '%Y%m%d')
download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num)
continue
bar_count = 0 # 获取到时间,变成那天的开始时间,下载数据
start_dt = last_dt.replace(hour=0, minute=0, second=0, microsecond=0)
print(f'文件{bar_file_path}存在,最后时间:{last_dt}, 调整数据获取开始时间:{start_dt}')
req = HistoryRequest(
symbol=symbol,
exchange=Exchange(contract_info.get('exchange')),
interval=interval_type,
interval_num=interval_num,
start=start_dt
)
# 获取标题 bars = future_data.get_bars(req=req, return_dict=True)
headers = [] if len(bars) <= 0:
with open(bar_file_path, "r", encoding='utf8') as f: print(f'下载{symbol} {interval_num} {interval_type.value} 数据为空白')
reader = csv.reader(f) continue
for header in reader:
headers = header
break
# 写入所有大于最后bar时间的数据 bar_count = 0
with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile:
writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', # 获取标题
extrasaction='ignore') headers = []
for bar in bars: with open(bar_file_path, "r", encoding='utf8') as f:
if bar['datetime'] <= last_dt: reader = csv.reader(f)
continue for header in reader:
bar_count += 1 headers = header
writer.writerow(bar) break
print(f'更新{symbol}数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') # 写入所有大于最后bar时间的数据
with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile:
writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel',
extrasaction='ignore')
for bar in bars:
if bar['datetime'] <= last_dt:
continue
bar_count += 1
writer.writerow(bar)
print(f'更新{symbol}数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}')

View File

@ -0,0 +1,131 @@
# flake8: noqa
import os
import sys
import csv
import pandas as pd
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if ROOT_PATH not in sys.path:
sys.path.append(ROOT_PATH)
print(f'append {ROOT_PATH} into sys.path')
from datetime import datetime, timedelta
from vnpy.data.binance.binance_spot_data import BinanceSpotData, HistoryRequest, Exchange, Interval
from vnpy.trader.utility import get_csv_last_dt, append_data
# 获取币安现货交易的所有合约
spot_data = BinanceSpotData()
contracts = BinanceSpotData.load_contracts()
if len(contracts) == 0:
spot_data.save_contracts()
contracts = BinanceSpotData.load_contracts()
# 开始下载日期
start_date = '20170101'
if __name__ == "__main__":
if len(sys.argv) >= 2:
interval = str(sys.argv[1]).lower()
if interval.isdecimal():
interval_num = int(sys.argv[1])
interval_type = Interval.MINUTE
else:
if 'm' in interval:
interval_type = Interval.MINUTE
interval_num = int(interval.replace('m', ''))
elif 'h' in interval:
interval_type = Interval.HOUR
interval_num = int(interval.replace('h', ''))
elif 'd' in interval:
interval_type = Interval.DAILY
interval_num = int(interval.replace('d', ''))
else:
interval = '1m'
interval_num = 1
interval_type = Interval.MINUTE
def download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num):
req = HistoryRequest(
symbol=symbol,
exchange=Exchange(contract_info.get('exchange')),
interval=interval_type,
interval_num=interval_num,
start=start_dt
)
bars = spot_data.get_bars(req=req, return_dict=True)
spot_data.export_to(bars, file_name=bar_file_path)
# 逐一合约进行下载
for vt_symbol, contract_info in contracts.items():
symbol = contract_info.get('symbol')
if symbol not in ['BTCUSDT', 'ETHUSDT']:
continue
bar_file_path = os.path.abspath(os.path.join(
ROOT_PATH,
'bar_data',
'binance_spot',
f'{symbol}_{start_date}_{interval}.csv'))
# 不存在文件,直接下载,并保存
if not os.path.exists(bar_file_path):
print(f'文件{bar_file_path}不存在,开始时间:{start_date}')
start_dt = datetime.strptime(start_date, '%Y%m%d')
download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num)
continue
# 如果存在文件获取最后的bar时间
last_dt = get_csv_last_dt(bar_file_path)
# 获取不到时间,重新下载
if last_dt is None:
print(f'获取文件{bar_file_path}的最后时间失败,开始时间:{start_date}')
start_dt = datetime.strptime(start_date, '%Y%m%d')
download_symbol(symbol, start_dt, bar_file_path, interval_type, interval_num)
continue
# 获取到时间,变成那天的开始时间,下载数据
start_dt = last_dt.replace(hour=0, minute=0, second=0, microsecond=0)
print(f'文件{bar_file_path}存在,最后时间:{last_dt}, 调整数据获取开始时间:{start_dt}')
req = HistoryRequest(
symbol=symbol,
exchange=Exchange(contract_info.get('exchange')),
interval=interval_type,
interval_num=interval_num,
start=start_dt
)
bars = spot_data.get_bars(req=req, return_dict=True)
if len(bars) <= 0:
print(f'下载{symbol} {interval_num} {interval_type.value} 数据为空白')
continue
bar_count = 0
# 获取标题
headers = []
with open(bar_file_path, "r", encoding='utf8') as f:
reader = csv.reader(f)
for header in reader:
headers = header
break
# 写入所有大于最后bar时间的数据
with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile:
writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel',
extrasaction='ignore')
for bar in bars:
if bar['datetime'] <= last_dt:
continue
bar_count += 1
writer.writerow(bar)
print(f'更新{symbol}数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}')

View File

@ -0,0 +1,81 @@
#!/bin/bash
CONDA_HOME=~/anaconda3
############ Added by Huang Jianwei at 2018-04-03
# To solve the problem about Javascript runtime
export PATH=$PATH:/usr/local/bin
############ Ended
BASE_PATH=$(cd `dirname $0`; pwd)
echo $BASE_PATH
cd `dirname $0`
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 A99 1 1>logs/refill_history_A99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 AG99 1 1>logs/refill_history_AG99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 AL99 5 1>logs/refill_history_AL99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 AP99 1 1>logs/refill_history_AP99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 AU99 0.05 1>logs/refill_history_AU99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 B99 1 1>logs/refill_history_B99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 BB99 0.05 1>logs/refill_history_BB99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 BU99 2 1>logs/refill_history_BU99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 C99 1 1>logs/refill_history_C99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 CF99 5 1>logs/refill_history_CF99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 CJ99 5 1>logs/refill_history_CJ99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 CS99 1 1>logs/refill_history_CS99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 CU99 10 1>logs/refill_history_CU99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 CY99 5 1>logs/refill_history_CY99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 EG99 1 1>logs/refill_history_EG99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 FB99 0.05 1>logs/refill_history_FB99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 FG99 1 1>logs/refill_history_FG99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 FU99 1 1>logs/refill_history_FU99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 HC99 1 1>logs/refill_history_HC99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 I99 0.5 1>logs/refill_history_I99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 IC99 0.2 1>logs/refill_history_IC99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 IF99 0.2 1>logs/refill_history_IF99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 IH99 0.2 1>logs/refill_history_IH99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 J99 0.5 1>logs/refill_history_J99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 JD99 1 1>logs/refill_history_JD99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 JM99 0.5 1>logs/refill_history_JM99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 JR99 1 1>logs/refill_history_JR99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 L99 5 1>logs/refill_history_L99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 LR99 1 1>logs/refill_history_LR99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 M99 1 1>logs/refill_history_M99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 MA99 1 1>logs/refill_history_MA99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 NI99 10.0 1>logs/refill_history_NI99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 NR99 5 1>logs/refill_history_NR99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 OI99 1 1>logs/refill_history_OI99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 P99 2 1>logs/refill_history_P99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 PB99 5 1>logs/refill_history_PB99.log 2>>logs/refill_error.log
#$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 PM99 1 1>logs/refill_history_PM99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 PP99 1 1>logs/refill_history_PP99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 RB99 1 1>logs/refill_history_RB99.log 2>>logs/refill_error.log
#$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 RI99 1 1>logs/refill_history_RI99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 RM99 1 1>logs/refill_history_RM99.log 2>>logs/refill_error.log
#$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 RR99 1 1>logs/refill_history_RR99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 RS99 1 1>logs/refill_history_RS99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 RU99 5 1>logs/refill_history_RU99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 SC99 0.1 1>logs/refill_history_SC99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 SF99 2 1>logs/refill_history_SF99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 SM99 2 1>logs/refill_history_SM99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 SN99 10.0 1>logs/refill_history_SN99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 SP99 2 1>logs/refill_history_SP99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 SS99 5 1>logs/refill_history_SS99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 SR99 1 1>logs/refill_history_SR99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 T99 0.005 1>logs/refill_history_T99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 TA99 2 1>logs/refill_history_TA99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 TF99 0.005 1>logs/refill_history_TF99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 TS99 0.005 1>logs/refill_history_TS99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 UR99 1 1>logs/refill_history_UR99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 V99 5 1>logs/refill_history_V99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 WH99 1 1>logs/refill_history_WH99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 WR99 1 1>logs/refill_history_WR99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 Y99 2 1>logs/refill_history_Y99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 ZC99 0.2 1>logs/refill_history_ZC99.log 2>>logs/refill_error.log
$CONDA_HOME/envs/py37/bin/python refill_future_renko.py 127.0.0.1 ZN99 5 1>logs/refill_history_ZN99.log 2>>logs/refill_error.log

View File

@ -0,0 +1,98 @@
# flake8: noqa
"""
下载rqdata股票合约1分钟bar => vnpy项目目录/bar_data/
"""
import os
import sys
import json
from collections import OrderedDict
import pandas as pd
vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if vnpy_root not in sys.path:
sys.path.append(vnpy_root)
os.environ["VNPY_TESTING"] = "1"
from vnpy.data.tdx.tdx_stock_data import *
from vnpy.trader.utility import load_json
from vnpy.trader.rqdata import RqdataClient
# 保存的1分钟指数 bar目录
bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data'))
# 开始日期(每年大概需要几分钟)
start_date = '20160101'
# 创建API对象
api_01 = TdxStockData()
rq = RqdataClient()
rq.init(username='incenselee@hotmail.com', password='123456')
# 更新本地合约缓存信息
stock_list = load_json('stock_list.json')
symbol_dict = api_01.symbol_dict
# 逐一指数合约下载并更新
for stock_code in stock_list:
market_id = get_tdx_market_code(stock_code)
if market_id == 0:
exchange_name = '深交所'
exchange = Exchange.SZSE
else:
exchange_name = '上交所'
exchange = Exchange.SSE
symbol_info = symbol_dict.get(f'{stock_code}_{market_id}')
stock_name = symbol_info.get('name')
print(f'开始更新:{exchange_name}/{stock_name}, 代码:{stock_code}')
bar_file_folder = os.path.abspath(os.path.join(bar_data_folder, f'{exchange.value}'))
if not os.path.exists(bar_file_folder):
os.makedirs(bar_file_folder)
# csv数据文件名
bar_file_path = os.path.abspath(os.path.join(bar_file_folder, f'{stock_code}_{start_date}_1m.csv'))
# 如果文件存在,
if os.path.exists(bar_file_path):
df_old = pd.read_csv(bar_file_path, index_col=0)
df_old = df_old.rename(lambda x: pd.to_datetime(x, format="%Y-%m-%d %H:%M:%S"))
# 取最后一条时间
last_dt = df_old.index[-1]
start_dt = last_dt - timedelta(days=1)
print(f'文件{bar_file_path}存在,最后时间:{start_date}')
else:
df_old = None
start_dt = datetime.strptime(start_date, '%Y%m%d')
print(f'文件{bar_file_path}不存在,开始时间:{start_date}')
result, bars = api_01.get_bars(symbol=stock_code,
period='1min',
callback=None,
start_dt=start_dt,
return_bar=False)
# [dict] => dataframe
if not result or len(bars) == 0:
continue
df_extern = pd.DataFrame(bars)
df_extern.set_index('datetime', inplace=True)
if df_old is not None:
# 扩展数据
print('扩展数据')
data_df = pd.concat([df_old, df_extern], axis=0)
else:
data_df = df_extern
# 数据按时间戳去重
print('按时间戳去重')
data_df = data_df[~data_df.index.duplicated(keep='first')]
# 排序
data_df = data_df.sort_index()
# print(data_df.head())
print(data_df.tail())
data_df.to_csv(bar_file_path, index=True)
print(f'更新{stock_name} {stock_code}数据 => 文件{bar_file_path}')
print('更新完毕')
os._exit(0)

View File

@ -0,0 +1,112 @@
# flake8: noqa
"""
下载通达信可转债1分钟bar => vnpy项目目录/bar_data/
上海股票 => SSE子目录
深圳股票 => SZSE子目录
"""
import os
import sys
import csv
import json
from collections import OrderedDict
import pandas as pd
vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if vnpy_root not in sys.path:
sys.path.append(vnpy_root)
os.environ["VNPY_TESTING"] = "1"
from vnpy.data.tdx.tdx_stock_data import *
from vnpy.trader.utility import load_json
from vnpy.trader.utility import get_csv_last_dt
# 保存的1分钟指数 bar目录
bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data'))
# 开始日期(每年大概需要几分钟)
start_date = '20160101'
# 创建API对象
api_01 = TdxStockData()
symbol_dict = api_01.symbol_dict
# 逐一指数合约下载并更新
for k, symbol_info in symbol_dict.items():
stock_code, market_id = k.split('_')
if market_id == '0':
exchange_name = '深交所'
exchange = Exchange.SZSE
else:
exchange_name = '上交所'
exchange = Exchange.SSE
if symbol_info.get('stock_type') != 'cb_cn':
continue
stock_name = symbol_info.get('name')
print(f'开始更新:{exchange_name}/{stock_name}, 代码:{stock_code}')
bar_file_folder = os.path.abspath(os.path.join(bar_data_folder, f'{exchange.value}'))
if not os.path.exists(bar_file_folder):
os.makedirs(bar_file_folder)
# csv数据文件名
bar_file_path = os.path.abspath(os.path.join(bar_file_folder, f'{stock_code}_{start_date}_1m.csv'))
# 如果文件存在,
if os.path.exists(bar_file_path):
# 取最后一条时间
last_dt = get_csv_last_dt(bar_file_path)
else:
last_dt = None
if last_dt:
start_dt = last_dt - timedelta(days=1)
print(f'文件{bar_file_path}存在,最后时间:{start_date}')
else:
start_dt = datetime.strptime(start_date, '%Y%m%d')
print(f'文件{bar_file_path}不存在,或读取最后记录错误,开始时间:{start_date}')
result, bars = api_01.get_bars(symbol=stock_code,
period='1min',
callback=None,
start_dt=start_dt,
return_bar=False)
# [dict] => dataframe
if not result or len(bars) == 0:
continue
if last_dt is None:
data_df = pd.DataFrame(bars)
data_df.set_index('datetime', inplace=True)
data_df = data_df.sort_index()
# print(data_df.head())
print(data_df.tail())
data_df.to_csv(bar_file_path, index=True)
print(f'首次更新{stock_code} {stock_name}数据 => 文件{bar_file_path}')
continue
# 获取标题
headers = []
with open(bar_file_path, "r", encoding='utf8') as f:
reader = csv.reader(f)
for header in reader:
headers = header
break
bar_count = 0
# 写入所有大于最后bar时间的数据
with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile:
writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel',
extrasaction='ignore')
for bar in bars:
if bar['datetime'] <= last_dt:
continue
bar_count += 1
writer.writerow(bar)
print(f'更新{stock_code} {stock_name} 数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}')
print('更新完毕')
os._exit(0)

View File

@ -18,81 +18,92 @@ os.environ["VNPY_TESTING"] = "1"
from vnpy.data.tdx.tdx_future_data import * from vnpy.data.tdx.tdx_future_data import *
from vnpy.trader.utility import get_csv_last_dt from vnpy.trader.utility import get_csv_last_dt
# 保存的1分钟指数 bar目录
bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data'))
# 开始日期(每年大概需要几分钟) if __name__ == "__main__":
start_date = '20160101'
# 创建API对象 if len(sys.argv) > 1:
api_01 = TdxFutureData() filter_underlying_symbols = [s.upper() for s in sys.argv[1:]]
# 更新本地合约缓存信息
api_01.update_mi_contracts()
# 逐一指数合约下载并更新
for underlying_symbol in api_01.future_contracts.keys():
index_symbol = underlying_symbol + '99'
print(f'开始更新:{index_symbol}')
# csv数据文件名
bar_file_path = os.path.abspath(os.path.join(bar_data_folder, 'tdx', f'{underlying_symbol}99_{start_date}_1m.csv'))
# 如果文件存在,
if os.path.exists(bar_file_path):
#df_old = pd.read_csv(bar_file_path, index_col=0)
#df_old = df_old.rename(lambda x: pd.to_datetime(x, format="%Y-%m-%d %H:%M:%S"))
# 取最后一条时间
last_dt = get_csv_last_dt(bar_file_path)
start_dt = last_dt - timedelta(days=1)
print(f'文件{bar_file_path}存在,最后时间:{start_date}')
else: else:
last_dt = None filter_underlying_symbols = []
start_dt = datetime.strptime(start_date, '%Y%m%d')
print(f'文件{bar_file_path}不存在,开始时间:{start_date}')
result, bars = api_01.get_bars(symbol=index_symbol, # 保存的1分钟指数 bar目录
period='1min', bar_data_folder = os.path.abspath(os.path.join(vnpy_root, 'bar_data'))
callback=None,
start_dt=start_dt,
return_bar=False)
# [dict] => dataframe
if not result or len(bars) == 0:
continue
if last_dt is None: # 开始日期(每年大概需要几分钟)
data_df = pd.DataFrame(bars) start_date = '20160101'
data_df.set_index('datetime', inplace=True)
data_df = data_df.sort_index() # 创建API对象
# print(data_df.head()) api_01 = TdxFutureData()
print(data_df.tail())
data_df.to_csv(bar_file_path, index=True) # 更新本地合约缓存信息
print(f'首次更新{index_symbol} 数据 => 文件{bar_file_path}') api_01.update_mi_contracts()
continue
# 逐一指数合约下载并更新
for underlying_symbol in api_01.future_contracts.keys():
if len(filter_underlying_symbols) > 0 and underlying_symbol not in filter_underlying_symbols:
continue
index_symbol = underlying_symbol + '99'
print(f'开始更新:{index_symbol}')
# csv数据文件名
bar_file_path = os.path.abspath(os.path.join(bar_data_folder, 'tdx', f'{underlying_symbol}99_{start_date}_1m.csv'))
# 如果文件存在,
if os.path.exists(bar_file_path):
#df_old = pd.read_csv(bar_file_path, index_col=0)
#df_old = df_old.rename(lambda x: pd.to_datetime(x, format="%Y-%m-%d %H:%M:%S"))
# 取最后一条时间
last_dt = get_csv_last_dt(bar_file_path)
start_dt = last_dt - timedelta(days=1)
print(f'文件{bar_file_path}存在,最后时间:{start_date}')
else:
last_dt = None
start_dt = datetime.strptime(start_date, '%Y%m%d')
print(f'文件{bar_file_path}不存在,开始时间:{start_date}')
result, bars = api_01.get_bars(symbol=index_symbol,
period='1min',
callback=None,
start_dt=start_dt,
return_bar=False)
# [dict] => dataframe
if not result or len(bars) == 0:
continue
if last_dt is None:
data_df = pd.DataFrame(bars)
data_df.set_index('datetime', inplace=True)
data_df = data_df.sort_index()
# print(data_df.head())
print(data_df.tail())
data_df.to_csv(bar_file_path, index=True)
print(f'首次更新{index_symbol} 数据 => 文件{bar_file_path}')
continue
# 获取标题 # 获取标题
headers = [] headers = []
with open(bar_file_path, "r", encoding='utf8') as f: with open(bar_file_path, "r", encoding='utf8') as f:
reader = csv.reader(f) reader = csv.reader(f)
for header in reader: for header in reader:
headers = header headers = header
break break
bar_count = 0 bar_count = 0
# 写入所有大于最后bar时间的数据 # 写入所有大于最后bar时间的数据
with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile:
writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel',
extrasaction='ignore') extrasaction='ignore')
for bar in bars: for bar in bars:
if bar['datetime'] <= last_dt: if bar['datetime'] <= last_dt:
continue continue
bar_count += 1 bar_count += 1
writer.writerow(bar) writer.writerow(bar)
print(f'更新{index_symbol} 数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') print(f'更新{index_symbol} 数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}')
print('更新完毕') print('更新完毕')
os._exit(0) os._exit(0)

View File

@ -1,6 +1,6 @@
# flake8: noqa # flake8: noqa
""" """
下载通达信指数合约1分钟bar => vnpy项目目录/bar_data/ 下载通达信股票合约1分钟bar => vnpy项目目录/bar_data/
上海股票 => SSE子目录 上海股票 => SSE子目录
深圳股票 => SZSE子目录 深圳股票 => SZSE子目录
""" """

View File

@ -42,6 +42,8 @@ if __name__ == "__main__":
download_tasks = {} download_tasks = {}
begin_date = datetime.strptime(args.begin, '%Y%m%d') begin_date = datetime.strptime(args.begin, '%Y%m%d')
end_date = datetime.strptime(args.end, '%Y%m%d') end_date = datetime.strptime(args.end, '%Y%m%d')
if end_date > datetime.now():
end_date = datetime.now()
n_days = (end_date - begin_date).days n_days = (end_date - begin_date).days
future_contracts = get_future_contracts() future_contracts = get_future_contracts()
@ -56,7 +58,7 @@ if __name__ == "__main__":
if n_days <= 0: if n_days <= 0:
n_days = 1 n_days = 1
for n in range(n_days): for n in range(n_days+1):
download_date = begin_date + timedelta(days=n) download_date = begin_date + timedelta(days=n)
if download_date.isoweekday() in [6, 7]: if download_date.isoweekday() in [6, 7]:
continue continue
@ -71,10 +73,19 @@ if __name__ == "__main__":
"{}_{}.csv".format(symbol, download_date.strftime('%Y%m%d')))) "{}_{}.csv".format(symbol, download_date.strftime('%Y%m%d'))))
zip_file = os.path.abspath(os.path.join(save_folder, zip_file = os.path.abspath(os.path.join(save_folder,
"{}_{}.pkb2".format(symbol, download_date.strftime('%Y%m%d')))) "{}_{}.pkb2".format(symbol, download_date.strftime('%Y%m%d'))))
if os.path.exists(save_file): if os.path.exists(save_file):
continue # 文件size是否大于1024字节
if os.path.getsize(save_file) > 1024:
# 获取最后的tick时间
dt = get_csv_last_dt(file_name=save_file, dt_format='%Y-%m-%d %H:%M:%S.%f')
# 判断是否为交易日最后
if dt and dt.hour == 14 and dt.minute == 59:
continue
if os.path.exists(zip_file): if os.path.exists(zip_file):
continue if os.path.getsize(save_file) > 1024:
continue
# 下载从 2018-05-01凌晨0点 到 2018-06-01凌晨0点 的 T1809 盘口Tick数据 # 下载从 2018-05-01凌晨0点 到 2018-06-01凌晨0点 的 T1809 盘口Tick数据
download_tasks["{}_{}_tick".format(symbol, download_date.strftime('%Y%m%d'))] = DataDownloader( download_tasks["{}_{}_tick".format(symbol, download_date.strftime('%Y%m%d'))] = DataDownloader(

View File

@ -0,0 +1,52 @@
# flake8: noqa
"""
移除过期日志文件
"""
import os
import sys
from datetime import datetime, timedelta
vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if vnpy_root not in sys.path:
sys.path.append(vnpy_root)
os.environ["VNPY_TESTING"] = "1"
if __name__ == "__main__":
if len(sys.argv) < 2:
print(f'请输入{vnpy_root}下检查目录,例如 prod/account01', file=sys.stderr)
exit()
print(sys.argv)
keep_days = 4
if len(sys.argv) == 3:
keep_days = int(sys.argv[2])
print(f'保留最近{keep_days}日数据')
log_path = os.path.abspath(os.path.join(vnpy_root, sys.argv[1], 'log'))
if not os.path.exists(log_path):
print(f'{log_path}不存在', file=sys.stderr)
exit()
print(f'开始检查{log_path}下的日志文件')
dt_now = datetime.now()
# 匹配日期
delete_dates = []
for n in range(keep_days, 30, 1):
delete_date = dt_now - timedelta(days=n)
delete_dates.append(delete_date.strftime('%Y-%m-%d'))
delete_dates.append(delete_date.strftime('%Y%m%d'))
# 移除匹配日期
for dirpath, dirnames, filenames in os.walk(str(log_path)):
for file_name in filenames:
for k in delete_dates:
if k in file_name:
file_path = os.path.abspath(os.path.join(dirpath, file_name))
print(f'移除{file_path}')
os.remove(file_path)

View File

@ -0,0 +1,24 @@
#!/bin/bash
CONDA_HOME=~/anaconda3
#$CONDA_HOME/bin/conda deactivate
#$CONDA_HOME/bin/conda activate py37
############ Added by Huang Jianwei at 2018-04-03
# To solve the problem about Javascript runtime
export PATH=$PATH:/usr/local/bin
############ Ended
BASE_PATH=$(cd `dirname $0`; pwd)
echo $BASE_PATH
cd `dirname $0`
PROGRAM_NAME=./remove_expired_logs.py
# 移除三天前的所有日志
$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME prod/binance01 3
$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME prod/binance02 3
$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME prod/binance03 3
$CONDA_HOME/envs/py37/bin/python $PROGRAM_NAME prod/fund01 3

View File

@ -0,0 +1,24 @@
# encoding: UTF-8
import sys, os, copy, csv, signal
vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(vnpy_root)
from vnpy.data.mongo.mongo_data import MongoData
mongo_data = MongoData(host='127.0.0.1', port=27017)
db_name = 'FutureRenko'
collections = mongo_data.get_collections(db_name=db_name)
for collection_name in collections:
#if collection_name != 'AU99_K10':
# continue
filter = {'date': {'$gt': "2020-04-20"}}
print(f'removing {collection_name} with filter: {filter}')
mongo_data.db_delete(db_name=db_name, col_name=collection_name, flt= filter)

View File

@ -1814,6 +1814,8 @@ class CtaEngine(BaseEngine):
symbol, exchange = extract_vt_symbol(vt_symbol) symbol, exchange = extract_vt_symbol(vt_symbol)
self.main_engine.subscribe(req=SubscribeRequest(symbol=symbol, exchange=exchange), self.main_engine.subscribe(req=SubscribeRequest(symbol=symbol, exchange=exchange),
gateway_name=gateway_name) gateway_name=gateway_name)
self.write_log(f'{vt_symbol}无最新tick订阅行情')
if volume > 0 and tick: if volume > 0 and tick:
contract = self.main_engine.get_contract(vt_symbol) contract = self.main_engine.get_contract(vt_symbol)
req = OrderRequest( req = OrderRequest(

View File

@ -575,7 +575,7 @@ class CtaProTemplate(CtaTemplate):
""" """
idx_symbol = None # 指数合约 idx_symbol = None # 指数合约
exchange = Exchange.LOCAL
price_tick = 1 # 商品的最小价格跳动 price_tick = 1 # 商品的最小价格跳动
symbol_size = 10 # 商品得合约乘数 symbol_size = 10 # 商品得合约乘数
margin_rate = 0.1 # 商品的保证金 margin_rate = 0.1 # 商品的保证金
@ -637,10 +637,10 @@ class CtaProTemplate(CtaTemplate):
for name in self.parameters: for name in self.parameters:
if name in setting: if name in setting:
setattr(self, name, setting[name]) setattr(self, name, setting[name])
symbol, self.exchange = extract_vt_symbol(self.vt_symbol)
if self.idx_symbol is None: if self.idx_symbol is None:
symbol, exchange = extract_vt_symbol(self.vt_symbol) self.idx_symbol = get_underlying_symbol(symbol).upper() + '99.' + self.exchange.value
self.idx_symbol = get_underlying_symbol(symbol).upper() + '99.' + exchange.value
self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=self.idx_symbol) self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=self.idx_symbol)
if self.vt_symbol != self.idx_symbol: if self.vt_symbol != self.idx_symbol:
@ -1816,6 +1816,7 @@ class CtaProFutureTemplate(CtaProTemplate):
vt_symbol=sell_symbol, vt_symbol=sell_symbol,
order_type=self.order_type, order_type=self.order_type,
order_time=self.cur_datetime, order_time=self.cur_datetime,
lock=self.exchange==Exchange.CFFEX,
grid=grid) grid=grid)
if len(vt_orderids) == 0: if len(vt_orderids) == 0:
self.write_error(u'多单平仓委托失败') self.write_error(u'多单平仓委托失败')
@ -1916,6 +1917,7 @@ class CtaProFutureTemplate(CtaProTemplate):
vt_symbol=cover_symbol, vt_symbol=cover_symbol,
order_type=self.order_type, order_type=self.order_type,
order_time=self.cur_datetime, order_time=self.cur_datetime,
lock=self.exchange==Exchange.CFFEX,
grid=grid) grid=grid)
if len(vt_orderids) == 0: if len(vt_orderids) == 0:
self.write_error(u'空单平仓委托失败') self.write_error(u'空单平仓委托失败')

View File

@ -229,11 +229,11 @@ class CtaSpreadTemplate(CtaTemplate):
continue continue
act_vt_symbol = grid.snapshot.get('act_vt_symbol', self.act_vt_symbol) act_vt_symbol = grid.snapshot.get('act_vt_symbol', self.act_vt_symbol)
act_open_volume = grid.snapshot.get('act_open_volume', grid.volume) act_open_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio)
act_open_price = grid.snapshot.get('act_open_price') act_open_price = grid.snapshot.get('act_open_price')
pas_vt_symbol = grid.snapshot.get('pas_vt_symbol', self.pas_vt_symbol) pas_vt_symbol = grid.snapshot.get('pas_vt_symbol', self.pas_vt_symbol)
pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume) pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio)
pas_open_price = grid.snapshot.get('pas_open_price') pas_open_price = grid.snapshot.get('pas_open_price')
if act_vt_symbol != self.act_vt_symbol: if act_vt_symbol != self.act_vt_symbol:
pos_symbols.add(act_vt_symbol) pos_symbols.add(act_vt_symbol)
@ -269,11 +269,11 @@ class CtaSpreadTemplate(CtaTemplate):
if not grid.open_status: if not grid.open_status:
continue continue
act_vt_symbol = grid.snapshot.get('act_vt_symbol', self.act_vt_symbol) act_vt_symbol = grid.snapshot.get('act_vt_symbol', self.act_vt_symbol)
act_open_volume = grid.snapshot.get('act_open_volume', grid.volume) act_open_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio)
act_open_price = grid.snapshot.get('act_open_price') act_open_price = grid.snapshot.get('act_open_price')
pas_vt_symbol = grid.snapshot.get('pas_vt_symbol', self.pas_vt_symbol) pas_vt_symbol = grid.snapshot.get('pas_vt_symbol', self.pas_vt_symbol)
pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume) pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio)
pas_open_price = grid.snapshot.get('pas_open_price') pas_open_price = grid.snapshot.get('pas_open_price')
if act_vt_symbol != self.act_vt_symbol: if act_vt_symbol != self.act_vt_symbol:
@ -318,11 +318,11 @@ class CtaSpreadTemplate(CtaTemplate):
for grid in self.gt.get_opened_grids(direction=Direction.LONG): for grid in self.gt.get_opened_grids(direction=Direction.LONG):
act_vt_symbol = grid.snapshot.get('act_vt_symbol', self.act_vt_symbol) act_vt_symbol = grid.snapshot.get('act_vt_symbol', self.act_vt_symbol)
act_open_volume = grid.snapshot.get('act_open_volume', grid.volume) act_open_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio)
act_open_price = grid.snapshot.get('act_open_price') act_open_price = grid.snapshot.get('act_open_price')
pas_vt_symbol = grid.snapshot.get('pas_vt_symbol', self.pas_vt_symbol) pas_vt_symbol = grid.snapshot.get('pas_vt_symbol', self.pas_vt_symbol)
pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume) pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio)
pas_open_price = grid.snapshot.get('pas_open_price') pas_open_price = grid.snapshot.get('pas_open_price')
pos_list.append({'vt_symbol': act_vt_symbol, pos_list.append({'vt_symbol': act_vt_symbol,
@ -335,14 +335,13 @@ class CtaSpreadTemplate(CtaTemplate):
'volume': pas_open_volume, 'volume': pas_open_volume,
'price': pas_open_price}) 'price': pas_open_price})
for grid in self.gt.get_opened_grids(direction=Direction.SHORT): for grid in self.gt.get_opened_grids(direction=Direction.SHORT):
act_vt_symbol = grid.snapshot.get('act_vt_symbol', self.act_vt_symbol) act_vt_symbol = grid.snapshot.get('act_vt_symbol', self.act_vt_symbol)
act_open_volume = grid.snapshot.get('act_open_volume', grid.volume) act_open_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio)
act_open_price = grid.snapshot.get('act_open_price') act_open_price = grid.snapshot.get('act_open_price')
pas_vt_symbol = grid.snapshot.get('pas_vt_symbol', self.pas_vt_symbol) pas_vt_symbol = grid.snapshot.get('pas_vt_symbol', self.pas_vt_symbol)
pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume) pas_open_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio)
pas_open_price = grid.snapshot.get('pas_open_price') pas_open_price = grid.snapshot.get('pas_open_price')
pos_list.append({'vt_symbol': act_vt_symbol, pos_list.append({'vt_symbol': act_vt_symbol,
@ -530,8 +529,8 @@ class CtaSpreadTemplate(CtaTemplate):
if trade.offset == Offset.OPEN: if trade.offset == Offset.OPEN:
# 更新开仓均价/数量 # 更新开仓均价/数量
if trade.vt_symbol == self.act_vt_symbol: if trade.vt_symbol == self.act_vt_symbol:
opened_price = grid.snapshot.get('act_open_price', 0) opened_price = grid.snapshot.get('act_open_price', grid.volume * self.act_vol_ratio)
opened_volume = grid.snapshot.get('act_open_volume', 0) opened_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio)
act_open_volume = opened_volume + trade.volume act_open_volume = opened_volume + trade.volume
act_open_price = (opened_price * opened_volume + trade.price * trade.volume) / act_open_volume act_open_price = (opened_price * opened_volume + trade.price * trade.volume) / act_open_volume
@ -543,7 +542,7 @@ class CtaSpreadTemplate(CtaTemplate):
elif trade.vt_symbol == self.pas_vt_symbol: elif trade.vt_symbol == self.pas_vt_symbol:
opened_price = grid.snapshot.get('pas_open_price', 0) opened_price = grid.snapshot.get('pas_open_price', 0)
opened_volume = grid.snapshot.get('pas_open_volume', 0) opened_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio)
pas_open_volume = opened_volume + trade.volume pas_open_volume = opened_volume + trade.volume
pas_open_price = (opened_price * opened_volume + trade.price * trade.volume) / pas_open_volume pas_open_price = (opened_price * opened_volume + trade.price * trade.volume) / pas_open_volume
@ -557,7 +556,7 @@ class CtaSpreadTemplate(CtaTemplate):
# 更新平仓均价/数量 # 更新平仓均价/数量
if trade.vt_symbol == self.act_vt_symbol: if trade.vt_symbol == self.act_vt_symbol:
closed_price = grid.snapshot.get('act_close_price', 0) closed_price = grid.snapshot.get('act_close_price', 0)
closed_volume = grid.snapshot.get('act_close_volume', 0) closed_volume = grid.snapshot.get('act_close_volume', grid.volume * self.act_vol_ratio)
act_close_volume = closed_volume + trade.volume act_close_volume = closed_volume + trade.volume
act_close_price = (closed_price * closed_volume + trade.price * trade.volume) / act_close_volume act_close_price = (closed_price * closed_volume + trade.price * trade.volume) / act_close_volume
@ -569,7 +568,7 @@ class CtaSpreadTemplate(CtaTemplate):
elif trade.vt_symbol == self.pas_vt_symbol: elif trade.vt_symbol == self.pas_vt_symbol:
closed_price = grid.snapshot.get('pas_close_price', 0) closed_price = grid.snapshot.get('pas_close_price', 0)
closed_volume = grid.snapshot.get('pas_close_volume', 0) closed_volume = grid.snapshot.get('pas_close_volume', grid.volume * self.pas_vol_ratio)
pas_open_volume = closed_volume + trade.volume pas_open_volume = closed_volume + trade.volume
pas_open_price = (closed_price * closed_volume + trade.price * trade.volume) / pas_open_volume pas_open_price = (closed_price * closed_volume + trade.price * trade.volume) / pas_open_volume
@ -580,6 +579,7 @@ class CtaSpreadTemplate(CtaTemplate):
'pas_vt_symbol': self.pas_vt_symbol}) 'pas_vt_symbol': self.pas_vt_symbol})
self.gt.save() self.gt.save()
def on_order_all_traded(self, order: OrderData): def on_order_all_traded(self, order: OrderData):
""" """
订单全部成交 订单全部成交
@ -631,6 +631,7 @@ class CtaSpreadTemplate(CtaTemplate):
# 在策略得活动订单中,移除 # 在策略得活动订单中,移除
self.active_orders.pop(order.vt_orderid, None) self.active_orders.pop(order.vt_orderid, None)
self.gt.save()
def on_order_open_canceled(self, order: OrderData): def on_order_open_canceled(self, order: OrderData):
""" """
@ -687,6 +688,7 @@ class CtaSpreadTemplate(CtaTemplate):
if grid: if grid:
if order.vt_orderid in grid.order_ids: if order.vt_orderid in grid.order_ids:
self.write_log(f'移除grid中order_ids:{order.vt_orderid}')
grid.order_ids.remove(order.vt_orderid) grid.order_ids.remove(order.vt_orderid)
# 网格的所有委托单已经执行完毕 # 网格的所有委托单已经执行完毕
@ -736,6 +738,11 @@ class CtaSpreadTemplate(CtaTemplate):
info = self.active_orders.get(vt_orderid, None) info = self.active_orders.get(vt_orderid, None)
info.update({'retry': order_retry}) info.update({'retry': order_retry})
if grid:
if order.vt_orderid in grid.order_ids:
self.write_log(f'移除grid中order_ids:{order.vt_orderid}')
grid.order_ids.remove(order.vt_orderid)
self.gt.save() self.gt.save()
# 删除旧的委托记录 # 删除旧的委托记录
self.write_log(u'移除旧的委托记录:{}'.format(order.vt_orderid)) self.write_log(u'移除旧的委托记录:{}'.format(order.vt_orderid))
@ -774,6 +781,11 @@ class CtaSpreadTemplate(CtaTemplate):
info = self.active_orders.get(vt_orderid, None) info = self.active_orders.get(vt_orderid, None)
info.update({'retry': order_retry}) info.update({'retry': order_retry})
if grid:
if order.vt_orderid in grid.order_ids:
self.write_log(f'移除grid中order_ids:{order.vt_orderid}')
grid.order_ids.remove(order.vt_orderid)
self.gt.save() self.gt.save()
# 删除旧的委托记录 # 删除旧的委托记录
self.write_log(u'移除旧的委托记录:{}'.format(order.vt_orderid)) self.write_log(u'移除旧的委托记录:{}'.format(order.vt_orderid))
@ -784,6 +796,7 @@ class CtaSpreadTemplate(CtaTemplate):
self.write_log(u'委托单状态:{}=>{}'.format(pre_status, old_order.get('status'))) self.write_log(u'委托单状态:{}=>{}'.format(pre_status, old_order.get('status')))
if grid: if grid:
if order.vt_orderid in grid.order_ids: if order.vt_orderid in grid.order_ids:
self.write_log(f'移除grid中order_ids:{order.vt_orderid}')
grid.order_ids.remove(order.vt_orderid) grid.order_ids.remove(order.vt_orderid)
if not grid.order_ids: if not grid.order_ids:
@ -835,6 +848,7 @@ class CtaSpreadTemplate(CtaTemplate):
self.send_wechat(msg) self.send_wechat(msg)
if grid: if grid:
if order.vt_orderid in grid.order_ids: if order.vt_orderid in grid.order_ids:
self.write_log(f'移除grid中order_ids:{order.vt_orderid}')
grid.order_ids.remove(order.vt_orderid) grid.order_ids.remove(order.vt_orderid)
if not grid.order_ids: if not grid.order_ids:
grid.order_status = False grid.order_status = False
@ -876,7 +890,10 @@ class CtaSpreadTemplate(CtaTemplate):
for vt_orderid in vt_orderids: for vt_orderid in vt_orderids:
info = self.active_orders.get(vt_orderid) info = self.active_orders.get(vt_orderid)
info.update({'retry': order_retry}) info.update({'retry': order_retry})
if grid:
if order.vt_orderid in grid.order_ids:
self.write_log(f'移除grid中order_ids:{order.vt_orderid}')
grid.order_ids.remove(order.vt_orderid)
self.gt.save() self.gt.save()
self.write_log(u'移除活动订单:{}'.format(order.vt_orderid)) self.write_log(u'移除活动订单:{}'.format(order.vt_orderid))
self.active_orders.pop(order.vt_orderid, None) self.active_orders.pop(order.vt_orderid, None)
@ -911,7 +928,10 @@ class CtaSpreadTemplate(CtaTemplate):
for vt_orderid in vt_orderids: for vt_orderid in vt_orderids:
info = self.active_orders.get(vt_orderid) info = self.active_orders.get(vt_orderid)
info.update({'retry': order_retry}) info.update({'retry': order_retry})
if grid:
if order.vt_orderid in grid.order_ids:
self.write_log(f'移除grid中order_ids:{order.vt_orderid}')
grid.order_ids.remove(order.vt_orderid)
self.gt.save() self.gt.save()
self.write_log(u'移除活动订单:{}'.format(order.vt_orderid)) self.write_log(u'移除活动订单:{}'.format(order.vt_orderid))
@ -923,6 +943,7 @@ class CtaSpreadTemplate(CtaTemplate):
self.write_log(u'委托单状态:{}=>{}'.format(pre_status, old_order.get('status'))) self.write_log(u'委托单状态:{}=>{}'.format(pre_status, old_order.get('status')))
if grid: if grid:
if order.vt_orderid in grid.order_ids: if order.vt_orderid in grid.order_ids:
self.write_log(f'移除grid中order_ids:{order.vt_orderid}')
grid.order_ids.remove(order.vt_orderid) grid.order_ids.remove(order.vt_orderid)
if len(grid.order_ids) == 0: if len(grid.order_ids) == 0:
grid.order_status = False grid.order_status = False
@ -1181,7 +1202,8 @@ class CtaSpreadTemplate(CtaTemplate):
self.write_error(f'spd_short{self.pas_vt_symbol}开多仓{grid.volume * self.pas_vol_ratio}手失败,' self.write_error(f'spd_short{self.pas_vt_symbol}开多仓{grid.volume * self.pas_vol_ratio}手失败,'
f'委托价:{self.cur_pas_tick.ask_price_1}') f'委托价:{self.cur_pas_tick.ask_price_1}')
return [] return []
grid.snapshot.update({"act_vt_symbol": self.act_vt_symbol, "act_open_volume": grid.volume * self.act_vol_ratio,
"pas_vt_symbol": self.pas_vt_symbol, "pas_open_volume": grid.volume * self.pas_vol_ratio})
grid.order_status = True grid.order_status = True
grid.order_datetime = self.cur_datetime grid.order_datetime = self.cur_datetime
@ -1242,7 +1264,8 @@ class CtaSpreadTemplate(CtaTemplate):
self.write_error(f'spd_short{self.pas_vt_symbol}开空仓{grid.volume * self.pas_vol_ratio}手失败,' self.write_error(f'spd_short{self.pas_vt_symbol}开空仓{grid.volume * self.pas_vol_ratio}手失败,'
f'委托价:{self.cur_pas_tick.bid_price_1}') f'委托价:{self.cur_pas_tick.bid_price_1}')
return [] return []
grid.snapshot.update({"act_vt_symbol": self.act_vt_symbol, "act_open_volume": grid.volume * self.act_vol_ratio,
"pas_vt_symbol": self.pas_vt_symbol, "pas_open_volume": grid.volume * self.pas_vol_ratio})
grid.order_status = True grid.order_status = True
grid.order_datetime = self.cur_datetime grid.order_datetime = self.cur_datetime
vt_orderids = act_vt_orderids.extend(pas_vt_orderids) vt_orderids = act_vt_orderids.extend(pas_vt_orderids)
@ -1270,26 +1293,40 @@ class CtaSpreadTemplate(CtaTemplate):
self.write_log(u'实际价差{}不满足:{}'.format(self.cur_spd_tick.bid_price_1, grid.close_price)) self.write_log(u'实际价差{}不满足:{}'.format(self.cur_spd_tick.bid_price_1, grid.close_price))
return [] return []
# 获取账号持仓
self.act_pos = self.cta_engine.get_position_holding(vt_symbol=self.act_vt_symbol) self.act_pos = self.cta_engine.get_position_holding(vt_symbol=self.act_vt_symbol)
self.pas_pos = self.cta_engine.get_position_holding(vt_symbol=self.pas_vt_symbol) self.pas_pos = self.cta_engine.get_position_holding(vt_symbol=self.pas_vt_symbol)
if not all([self.act_pos, self.pas_pos]): if not all([self.act_pos, self.pas_pos]):
self.write_error('主动腿/被动退得持仓数据不存在') self.write_error('主动腿/被动腿的账号持仓数据不存在')
return [] return []
act_close_volume = grid.snapshot.get('act_open_volume') # 获取需要平仓的主动腿、被动腿volume
pas_close_volume = grid.snapshot.get('pas_open_volume') act_close_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio)
pas_close_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio)
# 检查账号持仓是否满足平仓目标
if self.act_pos.long_pos < act_close_volume: if self.act_pos.long_pos < act_close_volume:
self.write_error(f'账号 {self.act_vt_symbol} 多单持仓{self.act_pos.long_pos}' self.write_error(f'账号 {self.act_vt_symbol} 多单持仓{self.act_pos.long_pos}'
f'今仓{self.act_pos.long_td}/昨{self.act_pos.long_yd}, 不满足{act_close_volume}') f'今仓{self.act_pos.long_td}/昨{self.act_pos.long_yd}, 不满足{act_close_volume}')
return [] return []
if self.pas_pos.short_pos < pas_close_volume: if self.pas_pos.short_pos < pas_close_volume:
self.write_error(f'账号 {self.pas_vt_symbol} 多单持仓{self.pas_pos.short_pos}' self.write_error(f'账号 {self.pas_vt_symbol} 多单持仓{self.pas_pos.short_pos}'
f'今仓{self.pas_pos.short_td}/昨{self.pas_pos.short_yd}, 不满足{act_close_volume}') f'今仓{self.pas_pos.short_td}/昨{self.pas_pos.short_yd}, 不满足{act_close_volume}')
return [] return []
# 被动腿空单平仓
pas_vt_orderids = self.cover(vt_symbol=self.pas_vt_symbol,
lock=self.pas_exchange == Exchange.CFFEX,
price=self.cur_pas_tick.ask_price_1,
volume=grid.volume * self.pas_vol_ratio,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if not pas_vt_orderids:
self.write_error(f'spd_sell{self.pas_vt_symbol}空单平仓{grid.volume * self.pas_vol_ratio}手失败,'
f'委托价:{self.cur_pas_tick.ask_price_1}')
return []
# 主动腿多单平仓 # 主动腿多单平仓
act_vt_orderids = self.sell(vt_symbol=self.act_vt_symbol, act_vt_orderids = self.sell(vt_symbol=self.act_vt_symbol,
lock=self.act_exchange==Exchange.CFFEX, lock=self.act_exchange==Exchange.CFFEX,
@ -1303,19 +1340,6 @@ class CtaSpreadTemplate(CtaTemplate):
f'委托价:{self.cur_act_tick.bid_price_1}') f'委托价:{self.cur_act_tick.bid_price_1}')
return [] return []
# 被动腿空单平仓
pas_vt_orderids = self.cover(vt_symbol=self.pas_vt_symbol,
lock=self.pas_exchange==Exchange.CFFEX,
price=self.cur_pas_tick.ask_price_1,
volume=grid.volume * self.pas_vol_ratio,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if not pas_vt_orderids:
self.write_error(f'spd_sell{self.pas_vt_symbol}空单平仓{grid.volume * self.pas_vol_ratio}手失败,'
f'委托价:{self.cur_pas_tick.ask_price_1}')
return []
grid.order_status = True grid.order_status = True
grid.order_datetime = self.cur_datetime grid.order_datetime = self.cur_datetime
vt_orderids = act_vt_orderids.extend(pas_vt_orderids) vt_orderids = act_vt_orderids.extend(pas_vt_orderids)
@ -1343,6 +1367,7 @@ class CtaSpreadTemplate(CtaTemplate):
self.write_log(u'实际价差{}不满足:{}'.format(self.cur_spd_tick.ask_price_1, grid.close_price)) self.write_log(u'实际价差{}不满足:{}'.format(self.cur_spd_tick.ask_price_1, grid.close_price))
return [] return []
# 获取账号内主动腿和被动腿的持仓
self.act_pos = self.cta_engine.get_position_holding(vt_symbol=self.act_vt_symbol) self.act_pos = self.cta_engine.get_position_holding(vt_symbol=self.act_vt_symbol)
self.pas_pos = self.cta_engine.get_position_holding(vt_symbol=self.pas_vt_symbol) self.pas_pos = self.cta_engine.get_position_holding(vt_symbol=self.pas_vt_symbol)
@ -1350,19 +1375,31 @@ class CtaSpreadTemplate(CtaTemplate):
self.write_error('主动腿/被动退得持仓数据不存在') self.write_error('主动腿/被动退得持仓数据不存在')
return [] return []
act_close_volume = grid.snapshot.get('act_open_volume', 0) # 检查主动腿、被动腿,是否满足
pas_close_volume = grid.snapshot.get('pas_open_volume', 0) act_close_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio)
pas_close_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio)
if self.act_pos.short_pos < act_close_volume: if self.act_pos.short_pos < act_close_volume:
self.write_error(f'账号 {self.act_vt_symbol} 空单持仓{self.act_pos.short_pos}' self.write_error(f'账号 {self.act_vt_symbol} 空单持仓{self.act_pos.short_pos}'
f'今仓{self.act_pos.short_td}/昨{self.act_pos.short_yd}, 不满足{act_close_volume}') f'今仓{self.act_pos.short_td}/昨{self.act_pos.short_yd}, 不满足{act_close_volume}')
return [] return []
if self.pas_pos.long_pos < pas_close_volume: if self.pas_pos.long_pos < pas_close_volume:
self.write_error(f'账号 {self.pas_vt_symbol} 多单持仓{self.pas_pos.longt_pos}' self.write_error(f'账号 {self.pas_vt_symbol} 多单持仓{self.pas_pos.longt_pos}'
f'今仓{self.pas_pos.long_td}/昨{self.pas_pos.long_yd}, 不满足{act_close_volume}') f'今仓{self.pas_pos.long_td}/昨{self.pas_pos.long_yd}, 不满足{act_close_volume}')
return [] return []
# 被动腿多单平仓
pas_vt_orderids = self.sell(vt_symbol=self.pas_vt_symbol,
lock=self.pas_exchange == Exchange.CFFEX,
price=self.cur_pas_tick.bid_price_1,
volume=grid.volume * self.pas_vol_ratio,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if not pas_vt_orderids:
self.write_error(f'spd_cover{self.pas_vt_symbol}多单平仓{grid.volume * self.pas_vol_ratio}手失败,'
f'委托价:{self.cur_pas_tick.bid_price_1}')
return []
# 主动腿空单平仓 # 主动腿空单平仓
act_vt_orderids = self.cover(vt_symbol=self.act_vt_symbol, act_vt_orderids = self.cover(vt_symbol=self.act_vt_symbol,
lock=self.act_exchange==Exchange.CFFEX, lock=self.act_exchange==Exchange.CFFEX,
@ -1376,19 +1413,6 @@ class CtaSpreadTemplate(CtaTemplate):
f'委托价:{self.cur_act_tick.ask_price_1}') f'委托价:{self.cur_act_tick.ask_price_1}')
return [] return []
# 被动腿多单平仓
pas_vt_orderids = self.sell(vt_symbol=self.pas_vt_symbol,
lock=self.pas_exchange==Exchange.CFFEX,
price=self.cur_pas_tick.bid_price_1,
volume=grid.volume * self.pas_vol_ratio,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if not pas_vt_orderids:
self.write_error(f'spd_cover{self.pas_vt_symbol}多单平仓{grid.volume * self.pas_vol_ratio}手失败,'
f'委托价:{self.cur_pas_tick.bid_price_1}')
return []
grid.order_status = True grid.order_status = True
grid.order_datetime = self.cur_datetime grid.order_datetime = self.cur_datetime
vt_orderids = act_vt_orderids.extend(pas_vt_orderids) vt_orderids = act_vt_orderids.extend(pas_vt_orderids)

View File

@ -162,7 +162,7 @@ class BinanceGateway(BaseGateway):
self.status.update({'con': True}) self.status.update({'con': True})
self.count += 1 self.count += 1
if self.count < 2: if self.count < 5:
return return
self.count = 0 self.count = 0
if len(self.query_functions) > 0: if len(self.query_functions) > 0:

View File

@ -474,12 +474,13 @@ class PbMdApi(object):
{'ip': "124.160.88.183", 'port': 7709}, {'ip': "124.160.88.183", 'port': 7709},
{'ip': "60.12.136.250", 'port': 7709}, {'ip': "60.12.136.250", 'port': 7709},
{'ip': "218.108.98.244", 'port': 7709}, {'ip': "218.108.98.244", 'port': 7709},
{'ip': "218.108.47.69", 'port': 7709}, #{'ip': "218.108.47.69", 'port': 7709},
{'ip': "114.80.63.12", 'port': 7709}, {'ip': "114.80.63.12", 'port': 7709},
{'ip': "114.80.63.35", 'port': 7709}, {'ip': "114.80.63.35", 'port': 7709},
{'ip': "180.153.39.51", 'port': 7709}, {'ip': "180.153.39.51", 'port': 7709},
{'ip': '14.215.128.18', 'port': 7709}, #{'ip': '14.215.128.18', 'port': 7709},
{'ip': '59.173.18.140', 'port': 7709}] #{'ip': '59.173.18.140', 'port': 7709}
]
self.best_ip = {'ip': None, 'port': None} self.best_ip = {'ip': None, 'port': None}
self.api_dict = {} # API 的连接会话对象字典 self.api_dict = {} # API 的连接会话对象字典
@ -688,7 +689,12 @@ class PbMdApi(object):
exchange = info.get('exchange', '') exchange = info.get('exchange', '')
if len(exchange) == 0: if len(exchange) == 0:
continue continue
vn_exchange_str = get_stock_exchange(symbol)
if exchange != vn_exchange_str:
continue
exchange = Exchange(exchange) exchange = Exchange(exchange)
if info['stock_type'] == 'stock_cn': if info['stock_type'] == 'stock_cn':
product = Product.EQUITY product = Product.EQUITY
elif info['stock_type'] in ['bond_cn', 'cb_cn']: elif info['stock_type'] in ['bond_cn', 'cb_cn']:
@ -715,16 +721,18 @@ class PbMdApi(object):
min_volume=volume_tick, min_volume=volume_tick,
margin_rate=1 margin_rate=1
) )
# 缓存 合约 =》 中文名
symbol_name_map.update({contract.symbol: contract.name})
# 缓存代码和交易所的印射关系 if product!= Product.INDEX:
symbol_exchange_map[contract.symbol] = contract.exchange # 缓存 合约 =》 中文名
symbol_name_map.update({contract.symbol: contract.name})
self.contract_dict.update({contract.symbol: contract}) # 缓存代码和交易所的印射关系
self.contract_dict.update({contract.vt_symbol: contract}) symbol_exchange_map[contract.symbol] = contract.exchange
# 推送
self.gateway.on_contract(contract) self.contract_dict.update({contract.symbol: contract})
self.contract_dict.update({contract.vt_symbol: contract})
# 推送
self.gateway.on_contract(contract)
def get_stock_list(self): def get_stock_list(self):
"""股票所有的code&name列表""" """股票所有的code&name列表"""
@ -1605,7 +1613,7 @@ class PbTdApi(object):
continue continue
def check_send_order_dbf(self): def check_send_order_dbf(self):
"""检查更新委托文件csv""" """检查更新委托文件dbf"""
dbf_file = os.path.abspath(os.path.join(self.order_folder, dbf_file = os.path.abspath(os.path.join(self.order_folder,
'{}{}.dbf'.format(PB_FILE_NAMES.get('send_order'), self.trading_date))) '{}{}.dbf'.format(PB_FILE_NAMES.get('send_order'), self.trading_date)))
@ -1639,7 +1647,7 @@ class PbTdApi(object):
err_msg = err_msg.decode('gbk') err_msg = err_msg.decode('gbk')
if len(err_msg) == 0 or record.wtsbdm == 0: if len(err_msg) == 0 or record.wtsbdm == 0:
self.gateway.write_log(f'收到失败标准,又没有失败原因:{print_dict(record.__dict__)}') self.gateway.write_log(f'收到失败,又没有失败原因')
continue continue
err_id = str(getattr(record, 'wtsbdm', '')).strip() err_id = str(getattr(record, 'wtsbdm', '')).strip()
@ -2056,6 +2064,9 @@ class TqMdApi():
def connect(self, setting = {}): def connect(self, setting = {}):
"""""" """"""
if self.api and self.is_connected:
self.gateway.write_log(f'天勤行情已经接入,无需重新连接')
return
try: try:
from tqsdk import TqApi from tqsdk import TqApi
self.api = TqApi(_stock=True) self.api = TqApi(_stock=True)