[增强功能] 策略配置自动排序,天勤行情,bug fix

This commit is contained in:
msincenselee 2020-07-28 16:10:44 +08:00
parent 3496ee92b0
commit 42371cc967
15 changed files with 596 additions and 53 deletions

View File

@ -1741,7 +1741,11 @@ class CtaEngine(BaseEngine):
self.strategy_setting[strategy_name] = new_config
save_json(self.setting_filename, self.strategy_setting)
sorted_setting = OrderedDict()
for k in sorted(self.strategy_setting.keys()):
sorted_setting.update({k: self.strategy_setting.get(k)})
save_json(self.setting_filename, sorted_setting)
def remove_strategy_setting(self, strategy_name: str):
"""
@ -1751,7 +1755,11 @@ class CtaEngine(BaseEngine):
return
self.write_log(f'移除CTA数字货币引擎{strategy_name}的配置')
self.strategy_setting.pop(strategy_name)
save_json(self.setting_filename, self.strategy_setting)
sorted_setting = OrderedDict()
for k in sorted(self.strategy_setting.keys()):
sorted_setting.update({k: self.strategy_setting.get(k)})
save_json(self.setting_filename, sorted_setting)
def put_stop_order_event(self, stop_order: StopOrder):
"""

View File

@ -1742,7 +1742,11 @@ class CtaEngine(BaseEngine):
self.strategy_setting[strategy_name] = new_config
save_json(self.setting_filename, self.strategy_setting)
sorted_setting = OrderedDict()
for k in sorted(self.strategy_setting.keys()):
sorted_setting.update({k: self.strategy_setting.get(k)})
save_json(self.setting_filename, sorted_setting)
def remove_strategy_setting(self, strategy_name: str):
"""
@ -1752,7 +1756,10 @@ class CtaEngine(BaseEngine):
return
self.write_log(f'移除CTA股票引擎{strategy_name}的配置')
self.strategy_setting.pop(strategy_name)
save_json(self.setting_filename, self.strategy_setting)
sorted_setting = OrderedDict()
for k in sorted(self.strategy_setting.keys()):
sorted_setting.update({k: self.strategy_setting.get(k)})
save_json(self.setting_filename, sorted_setting)
def put_stop_order_event(self, stop_order: StopOrder):
"""

View File

@ -373,7 +373,7 @@ class StockPolicy(CtaPolicy):
self.cur_trading_date = json_data.get('cur_trading_date', None)
self.sub_tns = json_data.get('sub_tns',{})
signals = json_data.get('signals', {})
for kline_name, signal in signals:
for k, signal in signals.items():
last_signal = signal.get('last_signal', "")
str_ast_signal_time = signal.get('last_signal_time', "")
try:
@ -383,7 +383,7 @@ class StockPolicy(CtaPolicy):
last_signal_time = None
except Exception as ex:
last_signal_time = None
self.signals.update({kline_name: {'last_signal': last_signal, 'last_signal_time': last_signal_time}})
self.signals.update({k: {'last_signal': last_signal, 'last_signal_time': last_signal_time}})
def to_json(self):
@ -400,7 +400,7 @@ class StockPolicy(CtaPolicy):
'%Y-%m-%d %H:%M:%S') if last_signal_time is not None else ""
}
})
j['singlals'] = d
j['signals'] = d
return j

View File

@ -1906,7 +1906,11 @@ class CtaEngine(BaseEngine):
self.strategy_setting[strategy_name] = new_config
save_json(self.setting_filename, self.strategy_setting)
sorted_setting = OrderedDict()
for k in sorted(self.strategy_setting.keys()):
sorted_setting.update({k: self.strategy_setting.get(k)})
save_json(self.setting_filename, sorted_setting)
def remove_strategy_setting(self, strategy_name: str):
"""
@ -1916,7 +1920,11 @@ class CtaEngine(BaseEngine):
return
self.write_log(f'移除CTA引擎{strategy_name}的配置')
self.strategy_setting.pop(strategy_name)
save_json(self.setting_filename, self.strategy_setting)
sorted_setting = OrderedDict()
for k in sorted(self.strategy_setting.keys()):
sorted_setting.update({k: self.strategy_setting.get(k)})
save_json(self.setting_filename, sorted_setting)
def put_stop_order_event(self, stop_order: StopOrder):
"""

View File

@ -29,6 +29,7 @@ from vnpy.trader.constant import (
from vnpy.trader.utility import (
extract_vt_symbol,
get_underlying_symbol,
get_trading_date,
import_module_by_str
)
@ -58,7 +59,7 @@ class SpreadTestingEngine(BackTestingEngine):
"""Constructor"""
super().__init__(event_engine)
self.tick_path = None # tick级别回测 路径
self.use_tq = False
self.strategy_start_date_dict = {}
self.strategy_end_date_dict = {}
@ -66,6 +67,8 @@ class SpreadTestingEngine(BackTestingEngine):
self.output('portfolio prepare_env')
super().prepare_env(test_setting)
self.use_tq = test_setting.get('use_tq', False)
def load_strategy(self, strategy_name: str, strategy_setting: dict = None):
"""
装载回测的策略
@ -205,6 +208,8 @@ class SpreadTestingEngine(BackTestingEngine):
def load_csv_file(self, tick_folder, vt_symbol, tick_date):
"""从文件中读取tick返回list[{dict}]"""
if self.use_tq:
return self.load_tq_csv_file(tick_folder, vt_symbol, tick_date)
symbol, exchange = extract_vt_symbol(vt_symbol)
underly_symbol = get_underlying_symbol(symbol)
@ -271,6 +276,65 @@ class SpreadTestingEngine(BackTestingEngine):
return ticks
def load_tq_csv_file(self, tick_folder, vt_symbol, tick_date):
"""从天勤下载的csv文件中读取tick返回list[{dict}]"""
symbol, exchange = extract_vt_symbol(vt_symbol)
underly_symbol = get_underlying_symbol(symbol)
exchange_folder = VN_EXCHANGE_TICKFOLDER_MAP.get(exchange.value)
file_path = os.path.abspath(
os.path.join(
tick_folder, 'tq', 'future',
tick_date.strftime('%Y%m'),
'{}_{}.csv'.format(symbol, tick_date.strftime('%Y%m%d'))))
ticks = []
if not os.path.isfile(file_path):
self.write_log(u'{0}文件不存在'.format(file_path))
return None
try:
df = pd.read_csv(file_path, parse_dates=False)
# datetime,symbol,exchange,last_price,highest,lowest,volume,amount,open_interest,upper_limit,lower_limit,
# bid_price_1,bid_volume_1,ask_price_1,ask_volume_1,
# bid_price_2,bid_volume_2,ask_price_2,ask_volume_2,
# bid_price_3,bid_volume_3,ask_price_3,ask_volume_3,
# bid_price_4,bid_volume_4,ask_price_4,ask_volume_4,
# bid_price_5,bid_volume_5,ask_price_5,ask_volume_5
self.write_log(u'加载csv文件{}'.format(file_path))
last_time = None
for index, row in df.iterrows():
tick = row.to_dict()
tick['date'], tick['time'] = tick['datetime'].split(' ')
tick.update({'trading_day': tick_date.strftime('%Y-%m-%d')})
tick_datetime = datetime.strptime(tick['datetime'], '%Y-%m-%d %H:%M:%S.%f')
# 修正毫秒
if tick['time'] == last_time:
# 与上一个tick的时间去除毫秒后相同,修改为500毫秒
tick_datetime = tick_datetime.replace(microsecond=500)
tick['time'] = tick_datetime.strftime('%H:%M:%S.%f')
else:
last_time = tick['time']
tick_datetime = tick_datetime.replace(microsecond=0)
tick['time'] = tick_datetime.strftime('%H:%M:%S.%f')
tick['datetime'] = tick_datetime
# 排除涨停/跌停的数据
if (float(tick['bid_price_1']) == float('1.79769E308') and int(tick['bid_volume_1']) == 0) \
or (float(tick['ask_price_1']) == float('1.79769E308') and int(tick['ask_volume_1']) == 0):
continue
ticks.append(tick)
del df
except Exception as ex:
self.write_log(u'{0}文件读取不成功'.format(file_path))
return None
return ticks
def load_bz2_cache(self, cache_folder, cache_symbol, cache_date):
"""
加载缓存数据

View File

@ -2199,8 +2199,8 @@ class CtaProFutureTemplate(CtaProTemplate):
self.write_log(u'空单对锁格:{}'.format([g.to_json() for g in locked_short_grids]))
if locked_long_volume != locked_short_volume:
self.write_error(u'对锁格多空数量不一致,不能解锁.\n多:{},\n空:{}'
.format(locked_long_volume, locked_short_volume))
self.write_error(u'{}对锁格多空数量不一致,不能解锁.\n多:{},\n空:{}'
.format(self.strategy_name, locked_long_volume, locked_short_volume))
return
# 检查所有品种得昨仓是否满足数量

View File

@ -4,6 +4,9 @@
import os
import traceback
from copy import copy
import bz2
import pickle
import zlib
from vnpy.trader.utility import append_data
from .template import (
CtaPosition,
@ -71,10 +74,6 @@ class CtaSpreadTemplate(CtaTemplate):
"""更新配置参数"""
super().update_setting(setting)
# 订阅主动腿/被动腿合约
self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=self.act_vt_symbol)
self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=self.pas_vt_symbol)
self.act_price_tick = self.cta_engine.get_price_tick(self.act_vt_symbol)
self.pas_price_tick = self.cta_engine.get_price_tick(self.pas_vt_symbol)
@ -95,6 +94,102 @@ class CtaSpreadTemplate(CtaTemplate):
if len(self.gt.dn_grids) > 0:
self.write_log(dn_grids_info)
def sync_data(self):
"""同步更新数据"""
if not self.backtesting:
self.write_log(u'保存k线缓存数据')
self.save_klines_to_cache()
def save_klines_to_cache(self, kline_names: list = []):
"""
保存K线数据到缓存
:param kline_names: 一般为self.klines的keys
:return:
"""
if len(kline_names) == 0:
kline_names = list(self.klines.keys())
# 获取保存路径
save_path = self.cta_engine.get_data_path()
# 保存缓存的文件名
file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_klines.pkb2'))
with bz2.BZ2File(file_name, 'wb') as f:
klines = {}
for kline_name in kline_names:
kline = self.klines.get(kline_name, None)
if kline:
kline.strategy = None
kline.cb_on_bar = None
if kline.cb_on_period:
kline.cb_on_period = None
kline.cb_dict = {}
klines.update({kline_name: kline})
pickle.dump(klines, f)
def load_klines_from_cache(self, kline_names: list = []):
"""
从缓存加载K线数据
:param kline_names:
:return:
"""
if len(kline_names) == 0:
kline_names = list(self.klines.keys())
save_path = self.cta_engine.get_data_path()
file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_klines.pkb2'))
try:
last_bar_dt = None
with bz2.BZ2File(file_name, 'rb') as f:
klines = pickle.load(f)
# 逐一恢复K线
for kline_name in kline_names:
# 缓存的k线实例
cache_kline = klines.get(kline_name, None)
# 当前策略实例的K线实例
strategy_kline = self.klines.get(kline_name, None)
if cache_kline and strategy_kline:
# 临时保存当前的回调函数
cb_on_bar = strategy_kline.cb_on_bar
# 缓存实例数据 =》 当前实例数据
strategy_kline.__dict__.update(cache_kline.__dict__)
# 所有K线的最后时间
if last_bar_dt and strategy_kline.cur_datetime:
last_bar_dt = max(last_bar_dt, strategy_kline.cur_datetime)
else:
last_bar_dt = strategy_kline.cur_datetime
# 重新绑定k线策略与on_bar回调函数
strategy_kline.strategy = self
strategy_kline.cb_on_bar = cb_on_bar
self.write_log(f'恢复{kline_name}缓存数据,最新bar结束时间:{last_bar_dt}')
self.write_log(u'加载缓存k线数据完毕')
return last_bar_dt
except Exception as ex:
self.write_error(f'加载缓存K线数据失败:{str(ex)}')
return None
def get_klines_snapshot(self):
"""返回当前klines的切片数据"""
try:
d = {
'strategy': self.strategy_name,
'datetime': datetime.now()}
klines = {}
for kline_name in sorted(self.klines.keys()):
klines.update({kline_name: self.klines.get(kline_name).get_data()})
kline_names = list(klines.keys())
binary_data = zlib.compress(pickle.dumps(klines))
d.update({'kline_names': kline_names, 'klines': binary_data, 'zlib': True})
return d
except Exception as ex:
self.write_error(f'获取klines切片数据失败:{str(ex)}')
return {}
def init_position(self, status_filter=[True]):
"""
初始化Positin
@ -205,6 +300,10 @@ class CtaSpreadTemplate(CtaTemplate):
def on_start(self):
"""启动策略(必须由用户继承实现)"""
# 订阅主动腿/被动腿合约
self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=self.act_vt_symbol)
self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=self.pas_vt_symbol)
self.write_log(u'启动')
self.trading = True
self.put_event()
@ -448,7 +547,8 @@ class CtaSpreadTemplate(CtaTemplate):
self.update_pos(price=grid.close_price,
volume=grid.volume,
operation='cover' if grid.direction == Direction.SHORT else 'sell')
operation='cover' if grid.direction == Direction.SHORT else 'sell',
dt=self.cur_datetime)
self.write_log(f'移除网格:{grid.to_json()}')
self.gt.remove_grids_by_ids(direction=grid.direction, ids=[grid.id])
@ -460,7 +560,8 @@ class CtaSpreadTemplate(CtaTemplate):
self.write_log(f'{grid.direction.value}单已开仓完毕,,手数:{grid.volume}, 详细:{grid.snapshot}')
self.update_pos(price=grid.open_price,
volume=grid.volume,
operation='short' if grid.direction == Direction.SHORT else 'buy')
operation='short' if grid.direction == Direction.SHORT else 'buy',
dt=self.cur_datetime)
# 网格的所有委托单部分执行完毕
else:
self.write_log(f'剩余委托单号:{grid.order_ids}')
@ -656,7 +757,7 @@ class CtaSpreadTemplate(CtaTemplate):
order_price = old_order['price']
order_type = old_order.get('order_type', OrderType.LIMIT)
order_retry = old_order['retry']
order_retry = old_order.get('retry',1)
grid = old_order.get('grid', None)
if order_retry > 10:
msg = u'{} 平仓撤单 {}/{}手, 重试平仓次数{}>10' \
@ -789,18 +890,23 @@ class CtaSpreadTemplate(CtaTemplate):
over_seconds = (dt - order_time).total_seconds()
# 只处理未成交的限价委托单
if order_status in [Status.NOTTRADED] and (order_type == OrderType.LIMIT):
if order_status in [Status.SUBMITTING, Status.NOTTRADED] and (order_type == OrderType.LIMIT):
if over_seconds > self.cancel_seconds or force: # 超过设置的时间还未成交
self.write_log(u'超时{}秒未成交取消委托单vt_orderid:{},order:{}'
.format(over_seconds, vt_orderid, order_info))
order_info.update({'status': Status.CANCELING})
order_info.update({'status': Status.CANCELLING})
self.active_orders.update({vt_orderid: order_info})
ret = self.cancel_order(str(vt_orderid))
if not ret:
self.write_log(u'撤单失败,更新状态为撤单成功')
order_info.update({'status': Status.CANCELLED})
self.active_orders.update({vt_orderid: order_info})
else:
if order_grid:
if vt_orderid in order_grid.order_ids:
order_grid.order_ids.remove(vt_orderid)
if len(order_grid.order_ids) == 0:
order_grid.order_status = False
continue
# 处理状态为‘撤销’的委托单
@ -925,8 +1031,8 @@ class CtaSpreadTemplate(CtaTemplate):
return True
# leg1 接近跌停价10个minDiff 以内)
if self.cur_act_tick.limie_down > 0 \
and self.cur_act_tick.bid_price_1 - 10 * self.act_price_tick < self.cur_act_tick.limie_down:
if self.cur_act_tick.limit_down > 0 \
and self.cur_act_tick.bid_price_1 - 10 * self.act_price_tick < self.cur_act_tick.limit_down:
self.write_log(u'主动腿 bid_price_1{} 接近跌停价{}'
.format(self.cur_act_tick.bid_price_1, self.cur_act_tick.limit_up))
return True
@ -939,8 +1045,8 @@ class CtaSpreadTemplate(CtaTemplate):
return True
# leg2 接近跌停价10个minDiff 以内)
if self.cur_pas_tick.limie_down > 0 \
and self.cur_pas_tick.bid_price_1 - 10 * self.pas_price_tick < self.cur_pas_tick.limie_down:
if self.cur_pas_tick.limit_down > 0 \
and self.cur_pas_tick.bid_price_1 - 10 * self.pas_price_tick < self.cur_pas_tick.limit_down:
self.write_log(u'被动腿 bid_price_1{} 接近跌停价{}'
.format(self.cur_pas_tick.bid_price_1, self.cur_pas_tick.limit_up))
return True
@ -976,26 +1082,26 @@ class CtaSpreadTemplate(CtaTemplate):
# 开空主动腿
act_vt_orderids = self.short(vt_symbol=self.act_vt_symbol,
price=self.cur_act_tick.bid_price1,
price=self.cur_act_tick.bid_price_1,
volume=grid.volume * self.act_vol_ratio,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if not act_vt_orderids:
self.write_error(f'spd_short{self.act_vt_symbol}开空仓{grid.volume * self.act_vol_ratio}手失败,'
f'委托价:{self.cur_act_tick.bid_price1}')
f'委托价:{self.cur_act_tick.bid_price_1}')
return []
# 开多被动腿
pas_vt_orderids = self.buy(vt_symbol=self.pas_vt_symbol,
price=self.cur_pas_tick.ask_price1,
price=self.cur_pas_tick.ask_price_1,
volume=grid.volume * self.pas_vol_ratio,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if not pas_vt_orderids:
self.write_error(f'spd_short{self.pas_vt_symbol}开多仓{grid.volume * self.pas_vol_ratio}手失败,'
f'委托价:{self.cur_pas_tick.ask_price1}')
f'委托价:{self.cur_pas_tick.ask_price_1}')
return []
grid.order_status = True
@ -1035,26 +1141,26 @@ class CtaSpreadTemplate(CtaTemplate):
# 开多主动腿
act_vt_orderids = self.buy(vt_symbol=self.act_vt_symbol,
price=self.cur_act_tick.ask_price1,
price=self.cur_act_tick.ask_price_1,
volume=grid.volume * self.act_vol_ratio,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if not act_vt_orderids:
self.write_error(f'spd_short{self.act_vt_symbol}开多仓{grid.volume * self.act_vol_ratio}手失败,'
f'委托价:{self.cur_act_tick.ask_price1}')
f'委托价:{self.cur_act_tick.ask_price_1}')
return []
# 开空被动腿
pas_vt_orderids = self.short(vt_symbol=self.pas_vt_symbol,
price=self.cur_pas_tick.bid_price1,
price=self.cur_pas_tick.bid_price_1,
volume=grid.volume * self.pas_vol_ratio,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if not pas_vt_orderids:
self.write_error(f'spd_short{self.pas_vt_symbol}开空仓{grid.volume * self.pas_vol_ratio}手失败,'
f'委托价:{self.cur_pas_tick.bid_price1}')
f'委托价:{self.cur_pas_tick.bid_price_1}')
return []
grid.order_status = True
@ -1066,7 +1172,7 @@ class CtaSpreadTemplate(CtaTemplate):
# ----------------------------------------------------------------------
def spd_sell(self, grid: CtaGrid, force: bool = False):
"""非标准合约的套利平正套指令"""
self.write_log(u'套利平正套单,price={0,volume={}'.format(grid.close_price, grid.volume))
self.write_log(u'套利平正套单,price={},volume={}'.format(grid.close_price, grid.volume))
if self.entrust != 0:
self.write_log(u'正在委托,不平仓')
return []
@ -1106,26 +1212,26 @@ class CtaSpreadTemplate(CtaTemplate):
# 主动腿多单平仓
act_vt_orderids = self.sell(vt_symbol=self.act_vt_symbol,
price=self.cur_act_tick.bid_price1,
price=self.cur_act_tick.bid_price_1,
volume=grid.volume * self.act_vol_ratio,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if not act_vt_orderids:
self.write_error(f'spd_sell{self.act_vt_symbol}多单平仓{grid.volume * self.act_vol_ratio}手失败,'
f'委托价:{self.cur_act_tick.bid_price1}')
f'委托价:{self.cur_act_tick.bid_price_1}')
return []
# 被动腿空单平仓
pas_vt_orderids = self.cover(vt_symbol=self.pas_vt_symbol,
price=self.cur_pas_tick.ask_price1,
price=self.cur_pas_tick.ask_price_1,
volume=grid.volume * self.pas_vol_ratio,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if not pas_vt_orderids:
self.write_error(f'spd_sell{self.pas_vt_symbol}空单平仓{grid.volume * self.pas_vol_ratio}手失败,'
f'委托价:{self.cur_pas_tick.ask_price1}')
f'委托价:{self.cur_pas_tick.ask_price_1}')
return []
grid.order_status = True
@ -1155,6 +1261,13 @@ class CtaSpreadTemplate(CtaTemplate):
self.write_log(u'实际价差{}不满足:{}'.format(self.cur_spd_tick.ask_price_1, grid.close_price))
return []
self.act_pos = self.cta_engine.get_position_holding(vt_symbol=self.act_vt_symbol)
self.pas_pos = self.cta_engine.get_position_holding(vt_symbol=self.pas_vt_symbol)
if not all([self.act_pos, self.pas_pos]):
self.write_error('主动腿/被动退得持仓数据不存在')
return []
act_close_volume = grid.snapshot.get('act_open_volume')
pas_close_volume = grid.snapshot.get('pas_open_volume')
@ -1170,26 +1283,26 @@ class CtaSpreadTemplate(CtaTemplate):
# 主动腿空单平仓
act_vt_orderids = self.cover(vt_symbol=self.act_vt_symbol,
price=self.cur_act_tick.ask_price1,
price=self.cur_act_tick.ask_price_1,
volume=grid.volume * self.act_vol_ratio,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if not act_vt_orderids:
self.write_error(f'spd_cover{self.act_vt_symbol}空单平仓{grid.volume * self.act_vol_ratio}手失败,'
f'委托价:{self.cur_act_tick.ask_price1}')
f'委托价:{self.cur_act_tick.ask_price_1}')
return []
# 被动腿多单平仓
pas_vt_orderids = self.sell(vt_symbol=self.pas_vt_symbol,
price=self.cur_pas_tick.bid_price1,
price=self.cur_pas_tick.bid_price_1,
volume=grid.volume * self.pas_vol_ratio,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if not pas_vt_orderids:
self.write_error(f'spd_cover{self.pas_vt_symbol}多单平仓{grid.volume * self.pas_vol_ratio}手失败,'
f'委托价:{self.cur_pas_tick.bid_price1}')
f'委托价:{self.cur_pas_tick.bid_price_1}')
return []
grid.order_status = True

View File

@ -83,8 +83,8 @@ class CtaGrid(object):
j['snapshot'] = self.snapshot # 切片数据
# datetime => string
j['open_time'] = self.open_time.strftime('%Y-%m-%d %H:%M:%S') if isinstance(self.open_time, datetime) else ''
j['order_time'] = self.order_time.strftime('%Y-%m-%d %H:%M:%S') if isinstance(self.order_time, datetime) else ''
j['open_time'] = self.open_time.strftime('%Y-%m-%d %H:%M:%S') if isinstance(self.open_time, datetime) else self.open_time
j['order_time'] = self.order_time.strftime('%Y-%m-%d %H:%M:%S') if isinstance(self.order_time, datetime) else self.order_time
return j
@ -646,7 +646,7 @@ class CtaGridTrade(CtaComponent):
for i in range(0, lots, 1):
# 做多,开仓价为下阻力线-网格高度*i平仓价为开仓价+止盈高度,开仓数量为缺省
open_price = int((down_line - self.grid_height * down_rate * i) / self.price_tick) * self.price_tick
close_price = int((open_price + self.grid_win * down_rate * i) / self.price_tick) * self.price_tick
close_price = int((open_price + self.grid_win * down_rate ) / self.price_tick) * self.price_tick
grid = CtaGrid(direction=Direction.LONG,
open_price=open_price,
@ -687,7 +687,7 @@ class CtaGridTrade(CtaComponent):
# 做空,开仓价为上阻力线+网格高度*i平仓价为开仓价-止盈高度,开仓数量为缺省
for i in range(0, lots, 1):
open_price = int((upper_line + self.grid_height * upper_rate * i) / self.price_tick) * self.price_tick
close_price = int((open_price - self.grid_win * upper_rate * i) / self.price_tick) * self.price_tick
close_price = int((open_price - self.grid_win * upper_rate ) / self.price_tick) * self.price_tick
grid = CtaGrid(direction=Direction.SHORT,
open_price=open_price,

View File

@ -108,6 +108,7 @@ class TdxFutureData(object):
# 所有期货合约的本地缓存
self.future_contracts = get_future_contracts()
def write_log(self, content):
if self.strategy:
self.strategy.write_log(content)

View File

@ -3,7 +3,8 @@
#__author__ = 'yangyang'
# 修改:
# 1 输入单个合约时,标题不再扩展为 合约.标题
# 2. 下载tick时5当行情都下载
# 2. 下载tick时5档行情都下载
# 3. 五档行情变量调整适合vnpy的命名方式
import csv
from datetime import date, datetime
@ -120,6 +121,10 @@ class DataDownloader:
"focus_datetime": self._start_dt_nano,
"focus_position": 0,
}
if len(self._symbol_list) ==1:
single_exchange, single_symbol = self._symbol_list[0].split('.')
else:
single_exchange, single_symbol = None, None
# 还没有发送过任何请求, 先请求定位左端点
await self._api._send_chan.send(chart_info)
chart = _get_obj(self._api._data, ["charts", chart_info["chart_id"]])
@ -167,7 +172,13 @@ class DataDownloader:
# 写入文件头
csv_header = ["datetime"]
for symbol in self._symbol_list:
# 单一合约时,添加合约和交易所
if single_exchange:
csv_header.extend(['symbol', 'exchange'])
for col in data_cols:
if col.startswith('bid_') or col.startswith('ask_'):
col = col[:-1] + '_' + col[-1]
if len(self._symbol_list) > 2:
csv_header.append(symbol + "." + col)
else:
@ -175,6 +186,11 @@ class DataDownloader:
csv_writer.writerow(csv_header)
row = [self._nano_to_str(item["datetime"])]
# 单一合约时,添加合约和交易所
if single_exchange:
row.extend([single_symbol, single_exchange])
for col in data_cols:
row.append(self._get_value(item, col))
for i in range(1, len(self._symbol_list)):
@ -183,7 +199,11 @@ class DataDownloader:
k = {} if tid == -1 else serials[i]["data"].get(str(tid), {})
for col in data_cols:
row.append(self._get_value(k, col))
csv_writer.writerow(row)
# 抛弃盘前的脏数据
if self._dur_nano == 0 and str(row[3]) == 'nan':
p = 1
else:
csv_writer.writerow(row)
current_id += 1
self._current_dt_nano = item["datetime"]
# 当前 id 已超出订阅范围, 需重新订阅后续数据
@ -213,5 +233,5 @@ class DataDownloader:
def _nano_to_str(nano):
dt = datetime.fromtimestamp(nano // 1000000000)
s = dt.strftime('%Y-%m-%d %H:%M:%S')
s += '.' + str(int(nano % 1000000000)).zfill(9)
s += '.' + str(int(nano % 1000000000)).zfill(9)[:3]
return s

View File

@ -0,0 +1,308 @@
# -*- coding:UTF-8 -*-
# Author chenfeng
import traceback
from contextlib import closing
import os
from datetime import datetime, timedelta
from functools import lru_cache
from tqsdk import TqApi, TqSim
from vnpy.data.tq.downloader import DataDownloader
from vnpy.trader.constant import (
Direction,
Exchange,
Product,
Offset,
Status,
OptionType,
OrderType,
Interval,
)
from vnpy.trader.object import TickData
from vnpy.trader.utility import extract_vt_symbol, get_trading_date
import pandas as pd
import csv
# pd.pandas.set_option('display.max_rows', None) # 设置最大显示行数超过该值用省略号代替为None时显示所有行。
# pd.pandas.set_option('display.max_columns', None) # 设置最大显示列数超过该值用省略号代替为None时显示所有列。
# pd.pandas.reset_option(‘参数名’, 参数值) # 恢复默认相关选项
tick_csv_header = [
"datetime","symbol", "exchange", "last_price","highest","lowest","volume","amount","open_interest",
"upper_limit","lower_limit","bid_price1","bid_volume1","ask_price1",
"ask_volume1","bid_price2","bid_volume2","ask_price2","ask_volume2",
"bid_price3","bid_volume3","ask_price3","ask_volume3","bid_price4",
"bid_volume4",
"ask_price4","ask_volume4",
"bid_price5","bid_volume5","ask_price5","ask_volume5"
]
@lru_cache(maxsize=9999)
def to_vt_symbol(tq_symbol: str) -> str:
""""""
if "KQ.m" in tq_symbol:
ins_type, instrument = tq_symbol.split("@")
exchange, symbol = instrument.split(".")
return f"{symbol}88.{exchange}"
elif "KQ.i" in tq_symbol:
ins_type, instrument = tq_symbol.split("@")
exchange, symbol = instrument.split(".")
return f"{symbol}99.{exchange}"
else:
exchange, symbol = tq_symbol.split(".")
return f"{symbol}.{exchange}"
@lru_cache(maxsize=9999)
def to_tq_symbol(symbol: str, exchange: Exchange) -> str:
"""
TQSdk exchange first
"""
for count, word in enumerate(symbol):
if word.isdigit():
break
fix_symbol = symbol
if exchange in [Exchange.INE, Exchange.SHFE, Exchange.DCE]:
fix_symbol = symbol.lower()
# Check for index symbol
time_str = symbol[count:]
if time_str in ["88"]:
return f"KQ.m@{exchange.value}.{fix_symbol[:count]}"
if time_str in ["99"]:
return f"KQ.i@{exchange.value}.{fix_symbol[:count]}"
return f"{exchange.value}.{fix_symbol}"
def generate_tick_from_dict(vt_symbol: str, data: dict) -> TickData:
"""
生成TickData
"""
symbol, exchange = extract_vt_symbol(vt_symbol)
if '.' in data["datetime"]:
time_format = "%Y-%m-%d %H:%M:%S.%f"
else:
time_format = "%Y-%m-%d %H:%M:%S"
return TickData(
symbol=symbol,
exchange=exchange,
datetime=datetime.strptime(data["datetime"][0:26], time_format),
name=symbol,
volume=int(data["volume"]),
open_interest=data["open_interest"],
last_price=float(data["last_price"]),
#limit_up=float(data["upper_limit"]) if data["upper_limit"] !='#N/A' else None,
#limit_down=float(data["lower_limit"]),
high_price=float(data["highest"]),
low_price=float(data["lowest"]),
bid_price_1=float(data["bid_price1"]),
bid_price_2=float(data["bid_price2"]),
bid_price_3=float(data["bid_price3"]),
bid_price_4=float(data["bid_price4"]),
bid_price_5=float(data["bid_price5"]),
ask_price_1=float(data["ask_price1"]),
ask_price_2=float(data["ask_price2"]),
ask_price_3=float(data["ask_price3"]),
ask_price_4=float(data["ask_price4"]),
ask_price_5=float(data["ask_price5"]),
bid_volume_1=int(data["bid_volume1"]),
bid_volume_2=int(data["bid_volume2"]),
bid_volume_3=int(data["bid_volume3"]),
bid_volume_4=int(data["bid_volume4"]),
bid_volume_5=int(data["bid_volume5"]),
ask_volume_1=int(data["ask_volume1"]),
ask_volume_2=int(data["ask_volume2"]),
ask_volume_3=int(data["ask_volume3"]),
ask_volume_4=int(data["ask_volume4"]),
ask_volume_5=int(data["ask_volume5"]),
gateway_name='',
)
class TqFutureData():
def __init__(self, strategy=None):
self.strategy = strategy # 传进来策略实例,这样可以写日志到策略实例
self.api = TqApi(TqSim())
def get_tick_serial(self, vt_symbol: str):
# 获取最新的8964个数据 tick的话就相当于只有50分钟左右
try:
symbol, exchange = extract_vt_symbol(vt_symbol)
tq_symbol = to_tq_symbol(symbol, exchange)
# 使用with closing机制确保下载完成后释放对应的资源
with closing(self.api):
# 获得 pp2009 tick序列的引用
ticks = self.api.get_tick_serial(symbol=tq_symbol, data_length=8964) # 每个序列最大支持请求 8964 个数据
return ticks # 8964/3/60=49.8分钟
except Exception as ex:
print(u'获取历史tick数据出错{},{}'.format(str(ex), traceback.format_exc()))
return None
def download_tick_history_to_csv(self, vt_symbol: str, cache_file: str, start_date: datetime, end_date: datetime):
symbol, exchange = extract_vt_symbol(vt_symbol)
tq_symbol = to_tq_symbol(symbol, exchange)
td = DataDownloader(self.api, symbol_list=tq_symbol, dur_sec=0, # Tick数据为dur_sec=0
start_dt=start_date, end_dt=end_date,
csv_file_name=cache_file)
# 使用with closing机制确保下载完成后释放对应的资源
# with closing(self.api): # 不能这样关闭,套利要下两个腿,所以在策略中关闭
# while not td.is_finished():
# self.api.wait_update()
# print(f"progress:{vt_symbol}--{start_date}--{end_date}: {td.get_progress()}")
# self.write_error(f"{vt_symbol}--{start_date}--{end_date}历史数据已经下载到csv")
while not td.is_finished():
self.api.wait_update()
self.write_log(f"progress:{vt_symbol}--{start_date}--{end_date}: {td.get_progress()}")
self.write_log(f"{vt_symbol}--{start_date}--{end_date}历史数据已经下载到csv")
def close_api(self):
# 关闭api,释放资源 download_tick_history_to_csv 中因为要下多个所以这里手动关闭
self.api.close()
def get_tick_from_cache(self, vt_symbol: str, trading_day: str):
"""从本地缓存文件读取, 返回[]"""
if '-' in trading_day:
trading_day = trading_day.replace('-', '')
symbol, exchange = extract_vt_symbol(vt_symbol)
vnpy_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
ticks_file = os.path.abspath(os.path.join(vnpy_folder, 'tick_data', 'tq', 'future', trading_day[0:6],
f'{symbol}_{trading_day}.csv'))
tick_dict_list = []
if os.path.exists(ticks_file):
try:
with open(file=ticks_file, mode='r', encoding='utf-8', ) as f:
reader = csv.DictReader(f=f, fieldnames=tick_csv_header, delimiter=",")
for row in reader:
if str(row.get('last_price','nan')) not in['nan','last_price']:
tick_dict_list.append(row)
return tick_dict_list
except Exception as ex:
self.write_log(f'从缓存文件读取{vt_symbol},交易日{trading_day}异常:{str(ex)}')
return []
def get_ticks(self, vt_symbol: str, start_date: datetime, end_date: datetime = None):
"""获取历史tick"""
# 1.0从天勤接口下载指定日期的合约的tick数据
self.write_log(f"从天勤请求合约:{vt_symbol}开始时间:{start_date}的历史tick数据")
symbol, exchange = extract_vt_symbol(vt_symbol)
if end_date is None:
end_date = datetime.now().replace(hour=16)
n_days = (end_date - start_date).days
if n_days <= 0:
n_days = 1
all_ticks = []
# 轮询每一天,读取缓存数据
for n in range(n_days+1):
trading_date = start_date + timedelta(days=n)
if trading_date.isoweekday() in [6, 7]:
continue
trading_day = trading_date.strftime('%Y%m%d')
day_ticks = self.get_tick_from_cache(vt_symbol=vt_symbol, trading_day=trading_day)
if day_ticks:
self.write_log(f'读取{vt_symbol} {trading_day}缓存数据{len(day_ticks)}')
all_ticks.extend(day_ticks)
if all_ticks:
last_tick_dt = all_ticks[-1].get('datetime')
begin_dt = datetime.strptime(last_tick_dt[0:26], "%Y-%m-%d %H:%M:%S.%f")
rt_ticks = self.get_runtime_ticks(vt_symbol=vt_symbol, begin_dt=begin_dt)
if rt_ticks:
all_ticks.extend(rt_ticks)
return all_ticks
def get_runtime_ticks(self, vt_symbol: str, begin_dt: datetime= None):
"""获取实时历史tick"""
self.write_log(f"从天勤请求合约:{vt_symbol}的实时的8964条tick数据")
symbol, exchange = extract_vt_symbol(vt_symbol)
df = self.get_tick_serial(vt_symbol)
ticks = []
if df is None:
return ticks
self.write_log(f"从天勤或历史tick数据成功开始清洗tick")
# print(df.columns.values)
# 给df 的各个列名按vnpy格式重置一下
df.columns = ['datetime', 'id', 'last_price', 'average', 'highest', 'lowest', 'ask_price1',
'ask_volume11', 'bid_price1', 'bid_volume11', 'ask_price2', 'ask_volume12',
'bid_price2', 'bid_volume12', 'ask_price3', 'ask_volume13', 'bid_price3',
'bid_volume13', 'ask_price4', 'ask_volume14', 'bid_price4', 'bid_volume14',
'ask_price5', 'ask_volume15', 'bid_price5', 'bid_volume15', 'volume', 'amount',
'open_interest', 'symbol', 'duration']
df.drop(['id','average','duration'], axis=1)
for index, row in df.iterrows():
# 日期时间, 成交价, 成交量, 总量, 属性(持仓增减), B1价, B1量, B2价, B2量, B3价, B3量, S1价, S1量, S2价, S2量, S3价, S3量, BS
# 日期时间, 成交价,当日最高价,当日最低价, B1价, B1量S1价, S1量日内成交量金额持仓量
# 0 1 2 3 4 5 6 7 8 9 10
tick = row.to_dict()
if str(tick['last_price']) == 'nan':
continue
# datetime: 自unix epoch(1970-01-01 00:00:00 GMT)以来的纳秒数
# 1.0、转换读取的tick 时间文本 到 datetime格式
# tick_datetime = datetime.strptime(tick['datetime'], "%Y-%m-%d %H:%M:%S.%f")
tick_datetime = datetime.strptime(self._nano_to_str(tick['datetime']), "%Y-%m-%d %H:%M:%S.%f")
if tick_datetime <= begin_dt:
continue
# 2.0、获取tick对应的交易日
tick_tradingday = get_trading_date(tick_datetime)
tick.update({'symbol': symbol, 'exchange': exchange.value, 'trading_day': tick_tradingday})
tick['datetime'] = tick_datetime.strftime("%Y-%m-%d %H:%M:%S.%f")
ticks.append(tick)
del df
return ticks
@staticmethod
def _nano_to_str(nano):
# nano: 自unix epoch(1970-01-01 00:00:00 GMT)以来的纳秒数 9位为纳秒 6位为微秒%f只用到微秒所以[:6]
dt = datetime.fromtimestamp(nano // 1000000000)
s = dt.strftime('%Y-%m-%d %H:%M:%S')
s += '.' + str(int(nano % 1000000000)).zfill(9)[:3] # zfill() 方法返回指定长度的字符串原字符串右对齐前面填充0。
return s
def write_log(self, msg):
if self.strategy is None:
print(msg)
else:
self.strategy.write_log(msg)
def write_error(self, msg):
if self.strategy is None:
print(msg)
else:
self.strategy.write_error(msg)
if __name__ == '__main__':
# tqsdk = Query_tqsdk_data(strategy=self) # 在策略中使用
tqsdk = TqFutureData()
# ticks = tqsdk.query_tick_current("pp2009.DCE")
#tick_df = tqsdk.query_tick_history_data(vt_symbol="ni2009.SHFE", start_date=pd.to_datetime("2020-07-22"))
#print(tick_df)
ticks = tqsdk.get_runtime_ticks("ni2009.SHFE")
print(ticks[0])
print(ticks[-1])

View File

@ -476,6 +476,13 @@ class CtpGateway(BaseGateway):
""""""
self.td_api.query_position()
def query_history(self, req: HistoryRequest) -> List[BarData]:
"""查询K线历史"""
if self.tq_api:
return self.tq_api.query_history(req)
else:
return []
def close(self):
""""""
if self.md_api:
@ -1908,7 +1915,9 @@ class TqMdApi():
for vt_symbol, quote in self.quote_objs:
if self.api.is_changing(quote):
tick = self.generate_tick_from_quote(vt_symbol, quote)
tick and self.gateway.on_tick(tick) and self.gateway.on_custom_tick(tick)
if tick:
self.gateway.on_tick(tick)
self.gateway.on_custom_tick(tick)
def subscribe(self, req: SubscribeRequest) -> None:
"""

View File

@ -1646,7 +1646,7 @@ class PbTdApi(object):
order.status = Status.REJECTED
self.gateway.write_log(f'dbf批量下单委托被拒:{order.__dict__}')
self.gateway.order_manager.on_order(order)
self.gateway.write_error(msg=err_msg, error={"ErrorID": err_id, "ErrorMsg": "委托失败"})
self.gateway.write_error(msg=f'{order.direction.value},{order.vt_symbol},{err_msg}', error={"ErrorID": err_id, "ErrorMsg": "委托失败"})
if sys_orderid != '0':
self.gateway.order_manager.update_orderid_map(local_orderid=local_orderid,

View File

@ -88,6 +88,7 @@ class OffsetConverter:
# 平今/平昨拆分
elif req.exchange in [Exchange.SHFE, Exchange.INE]:
print(f'转换平今/平昨')
return holding.convert_order_request_shfe(req)
else:
return [req]
@ -262,10 +263,12 @@ class PositionHolding:
td_available = self.long_td - self.long_td_frozen
if req.volume > pos_available:
print(f'{req.vt_symbol}没有可用仓位')
return []
elif req.volume <= td_available:
req_td = copy(req)
req_td.offset = Offset.CLOSETODAY
print(f'{req.vt_symbol} 平仓=>平今')
return [req_td]
else:
req_list = []
@ -274,11 +277,13 @@ class PositionHolding:
req_td = copy(req)
req_td.offset = Offset.CLOSETODAY
req_td.volume = td_available
print(f'{req.vt_symbol} 平仓 {req_td.volume}手 =>平今')
req_list.append(req_td)
req_yd = copy(req)
req_yd.offset = Offset.CLOSEYESTERDAY
req_yd.volume = req.volume - td_available
print(f'{req.vt_symbol} 平仓 {req_yd.volume}手 =>平昨')
req_list.append(req_yd)
return req_list

View File

@ -627,7 +627,7 @@ class TradingWidget(QtWidgets.QWidget):
[order_type.value for order_type in OrderType])
double_validator = QtGui.QDoubleValidator()
double_validator.setBottom(0)
#double_validator.setBottom(0)
self.price_line = QtWidgets.QLineEdit()
self.price_line.setValidator(double_validator)