diff --git a/vnpy/app/cta_crypto/engine.py b/vnpy/app/cta_crypto/engine.py index 0fa2a790..24408e8d 100644 --- a/vnpy/app/cta_crypto/engine.py +++ b/vnpy/app/cta_crypto/engine.py @@ -1741,7 +1741,11 @@ class CtaEngine(BaseEngine): self.strategy_setting[strategy_name] = new_config - save_json(self.setting_filename, self.strategy_setting) + sorted_setting = OrderedDict() + for k in sorted(self.strategy_setting.keys()): + sorted_setting.update({k: self.strategy_setting.get(k)}) + + save_json(self.setting_filename, sorted_setting) def remove_strategy_setting(self, strategy_name: str): """ @@ -1751,7 +1755,11 @@ class CtaEngine(BaseEngine): return self.write_log(f'移除CTA数字货币引擎{strategy_name}的配置') self.strategy_setting.pop(strategy_name) - save_json(self.setting_filename, self.strategy_setting) + sorted_setting = OrderedDict() + for k in sorted(self.strategy_setting.keys()): + sorted_setting.update({k: self.strategy_setting.get(k)}) + + save_json(self.setting_filename, sorted_setting) def put_stop_order_event(self, stop_order: StopOrder): """ diff --git a/vnpy/app/cta_stock/engine.py b/vnpy/app/cta_stock/engine.py index e876a9e6..53f8f7f0 100644 --- a/vnpy/app/cta_stock/engine.py +++ b/vnpy/app/cta_stock/engine.py @@ -1742,7 +1742,11 @@ class CtaEngine(BaseEngine): self.strategy_setting[strategy_name] = new_config - save_json(self.setting_filename, self.strategy_setting) + sorted_setting = OrderedDict() + for k in sorted(self.strategy_setting.keys()): + sorted_setting.update({k: self.strategy_setting.get(k)}) + + save_json(self.setting_filename, sorted_setting) def remove_strategy_setting(self, strategy_name: str): """ @@ -1752,7 +1756,10 @@ class CtaEngine(BaseEngine): return self.write_log(f'移除CTA股票引擎{strategy_name}的配置') self.strategy_setting.pop(strategy_name) - save_json(self.setting_filename, self.strategy_setting) + sorted_setting = OrderedDict() + for k in sorted(self.strategy_setting.keys()): + sorted_setting.update({k: self.strategy_setting.get(k)}) + save_json(self.setting_filename, sorted_setting) def put_stop_order_event(self, stop_order: StopOrder): """ diff --git a/vnpy/app/cta_stock/template.py b/vnpy/app/cta_stock/template.py index 9b824100..98745a7d 100644 --- a/vnpy/app/cta_stock/template.py +++ b/vnpy/app/cta_stock/template.py @@ -373,7 +373,7 @@ class StockPolicy(CtaPolicy): self.cur_trading_date = json_data.get('cur_trading_date', None) self.sub_tns = json_data.get('sub_tns',{}) signals = json_data.get('signals', {}) - for kline_name, signal in signals: + for k, signal in signals.items(): last_signal = signal.get('last_signal', "") str_ast_signal_time = signal.get('last_signal_time', "") try: @@ -383,7 +383,7 @@ class StockPolicy(CtaPolicy): last_signal_time = None except Exception as ex: last_signal_time = None - self.signals.update({kline_name: {'last_signal': last_signal, 'last_signal_time': last_signal_time}}) + self.signals.update({k: {'last_signal': last_signal, 'last_signal_time': last_signal_time}}) def to_json(self): @@ -400,7 +400,7 @@ class StockPolicy(CtaPolicy): '%Y-%m-%d %H:%M:%S') if last_signal_time is not None else "" } }) - j['singlals'] = d + j['signals'] = d return j diff --git a/vnpy/app/cta_strategy_pro/engine.py b/vnpy/app/cta_strategy_pro/engine.py index 81c7c97f..2eb47c7e 100644 --- a/vnpy/app/cta_strategy_pro/engine.py +++ b/vnpy/app/cta_strategy_pro/engine.py @@ -1906,7 +1906,11 @@ class CtaEngine(BaseEngine): self.strategy_setting[strategy_name] = new_config - save_json(self.setting_filename, self.strategy_setting) + sorted_setting = OrderedDict() + for k in sorted(self.strategy_setting.keys()): + sorted_setting.update({k: self.strategy_setting.get(k)}) + + save_json(self.setting_filename, sorted_setting) def remove_strategy_setting(self, strategy_name: str): """ @@ -1916,7 +1920,11 @@ class CtaEngine(BaseEngine): return self.write_log(f'移除CTA引擎{strategy_name}的配置') self.strategy_setting.pop(strategy_name) - save_json(self.setting_filename, self.strategy_setting) + sorted_setting = OrderedDict() + for k in sorted(self.strategy_setting.keys()): + sorted_setting.update({k: self.strategy_setting.get(k)}) + + save_json(self.setting_filename, sorted_setting) def put_stop_order_event(self, stop_order: StopOrder): """ diff --git a/vnpy/app/cta_strategy_pro/spread_testing.py b/vnpy/app/cta_strategy_pro/spread_testing.py index 2a75a5d5..2d2a06b7 100644 --- a/vnpy/app/cta_strategy_pro/spread_testing.py +++ b/vnpy/app/cta_strategy_pro/spread_testing.py @@ -29,6 +29,7 @@ from vnpy.trader.constant import ( from vnpy.trader.utility import ( extract_vt_symbol, get_underlying_symbol, + get_trading_date, import_module_by_str ) @@ -58,7 +59,7 @@ class SpreadTestingEngine(BackTestingEngine): """Constructor""" super().__init__(event_engine) self.tick_path = None # tick级别回测, 路径 - + self.use_tq = False self.strategy_start_date_dict = {} self.strategy_end_date_dict = {} @@ -66,6 +67,8 @@ class SpreadTestingEngine(BackTestingEngine): self.output('portfolio prepare_env') super().prepare_env(test_setting) + self.use_tq = test_setting.get('use_tq', False) + def load_strategy(self, strategy_name: str, strategy_setting: dict = None): """ 装载回测的策略 @@ -205,6 +208,8 @@ class SpreadTestingEngine(BackTestingEngine): def load_csv_file(self, tick_folder, vt_symbol, tick_date): """从文件中读取tick,返回list[{dict}]""" + if self.use_tq: + return self.load_tq_csv_file(tick_folder, vt_symbol, tick_date) symbol, exchange = extract_vt_symbol(vt_symbol) underly_symbol = get_underlying_symbol(symbol) @@ -271,6 +276,65 @@ class SpreadTestingEngine(BackTestingEngine): return ticks + def load_tq_csv_file(self, tick_folder, vt_symbol, tick_date): + """从天勤下载的csv文件中读取tick,返回list[{dict}]""" + + symbol, exchange = extract_vt_symbol(vt_symbol) + underly_symbol = get_underlying_symbol(symbol) + exchange_folder = VN_EXCHANGE_TICKFOLDER_MAP.get(exchange.value) + + file_path = os.path.abspath( + os.path.join( + tick_folder, 'tq', 'future', + tick_date.strftime('%Y%m'), + '{}_{}.csv'.format(symbol, tick_date.strftime('%Y%m%d')))) + + ticks = [] + if not os.path.isfile(file_path): + self.write_log(u'{0}文件不存在'.format(file_path)) + return None + try: + df = pd.read_csv(file_path, parse_dates=False) + # datetime,symbol,exchange,last_price,highest,lowest,volume,amount,open_interest,upper_limit,lower_limit, + # bid_price_1,bid_volume_1,ask_price_1,ask_volume_1, + # bid_price_2,bid_volume_2,ask_price_2,ask_volume_2, + # bid_price_3,bid_volume_3,ask_price_3,ask_volume_3, + # bid_price_4,bid_volume_4,ask_price_4,ask_volume_4, + # bid_price_5,bid_volume_5,ask_price_5,ask_volume_5 + + self.write_log(u'加载csv文件{}'.format(file_path)) + last_time = None + for index, row in df.iterrows(): + + tick = row.to_dict() + tick['date'], tick['time'] = tick['datetime'].split(' ') + tick.update({'trading_day': tick_date.strftime('%Y-%m-%d')}) + tick_datetime = datetime.strptime(tick['datetime'], '%Y-%m-%d %H:%M:%S.%f') + + # 修正毫秒 + if tick['time'] == last_time: + # 与上一个tick的时间(去除毫秒后)相同,修改为500毫秒 + tick_datetime = tick_datetime.replace(microsecond=500) + tick['time'] = tick_datetime.strftime('%H:%M:%S.%f') + else: + last_time = tick['time'] + tick_datetime = tick_datetime.replace(microsecond=0) + tick['time'] = tick_datetime.strftime('%H:%M:%S.%f') + tick['datetime'] = tick_datetime + + # 排除涨停/跌停的数据 + if (float(tick['bid_price_1']) == float('1.79769E308') and int(tick['bid_volume_1']) == 0) \ + or (float(tick['ask_price_1']) == float('1.79769E308') and int(tick['ask_volume_1']) == 0): + continue + + ticks.append(tick) + + del df + except Exception as ex: + self.write_log(u'{0}文件读取不成功'.format(file_path)) + return None + return ticks + def load_bz2_cache(self, cache_folder, cache_symbol, cache_date): """ 加载缓存数据 diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py index c30ff6d9..b3dc1cee 100644 --- a/vnpy/app/cta_strategy_pro/template.py +++ b/vnpy/app/cta_strategy_pro/template.py @@ -2199,8 +2199,8 @@ class CtaProFutureTemplate(CtaProTemplate): self.write_log(u'空单对锁格:{}'.format([g.to_json() for g in locked_short_grids])) if locked_long_volume != locked_short_volume: - self.write_error(u'对锁格多空数量不一致,不能解锁.\n多:{},\n空:{}' - .format(locked_long_volume, locked_short_volume)) + self.write_error(u'{}对锁格多空数量不一致,不能解锁.\n多:{},\n空:{}' + .format(self.strategy_name, locked_long_volume, locked_short_volume)) return # 检查所有品种得昨仓是否满足数量 diff --git a/vnpy/app/cta_strategy_pro/template_spread.py b/vnpy/app/cta_strategy_pro/template_spread.py index 1c703d19..f33a1554 100644 --- a/vnpy/app/cta_strategy_pro/template_spread.py +++ b/vnpy/app/cta_strategy_pro/template_spread.py @@ -4,6 +4,9 @@ import os import traceback from copy import copy +import bz2 +import pickle +import zlib from vnpy.trader.utility import append_data from .template import ( CtaPosition, @@ -71,10 +74,6 @@ class CtaSpreadTemplate(CtaTemplate): """更新配置参数""" super().update_setting(setting) - # 订阅主动腿/被动腿合约 - self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=self.act_vt_symbol) - self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=self.pas_vt_symbol) - self.act_price_tick = self.cta_engine.get_price_tick(self.act_vt_symbol) self.pas_price_tick = self.cta_engine.get_price_tick(self.pas_vt_symbol) @@ -95,6 +94,102 @@ class CtaSpreadTemplate(CtaTemplate): if len(self.gt.dn_grids) > 0: self.write_log(dn_grids_info) + def sync_data(self): + """同步更新数据""" + if not self.backtesting: + self.write_log(u'保存k线缓存数据') + self.save_klines_to_cache() + + + def save_klines_to_cache(self, kline_names: list = []): + """ + 保存K线数据到缓存 + :param kline_names: 一般为self.klines的keys + :return: + """ + if len(kline_names) == 0: + kline_names = list(self.klines.keys()) + + # 获取保存路径 + save_path = self.cta_engine.get_data_path() + # 保存缓存的文件名 + file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_klines.pkb2')) + with bz2.BZ2File(file_name, 'wb') as f: + klines = {} + for kline_name in kline_names: + kline = self.klines.get(kline_name, None) + if kline: + kline.strategy = None + kline.cb_on_bar = None + if kline.cb_on_period: + kline.cb_on_period = None + kline.cb_dict = {} + klines.update({kline_name: kline}) + pickle.dump(klines, f) + + def load_klines_from_cache(self, kline_names: list = []): + """ + 从缓存加载K线数据 + :param kline_names: + :return: + """ + if len(kline_names) == 0: + kline_names = list(self.klines.keys()) + + save_path = self.cta_engine.get_data_path() + file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_klines.pkb2')) + try: + last_bar_dt = None + with bz2.BZ2File(file_name, 'rb') as f: + klines = pickle.load(f) + # 逐一恢复K线 + for kline_name in kline_names: + # 缓存的k线实例 + cache_kline = klines.get(kline_name, None) + # 当前策略实例的K线实例 + strategy_kline = self.klines.get(kline_name, None) + + if cache_kline and strategy_kline: + # 临时保存当前的回调函数 + cb_on_bar = strategy_kline.cb_on_bar + # 缓存实例数据 =》 当前实例数据 + strategy_kline.__dict__.update(cache_kline.__dict__) + + # 所有K线的最后时间 + if last_bar_dt and strategy_kline.cur_datetime: + last_bar_dt = max(last_bar_dt, strategy_kline.cur_datetime) + else: + last_bar_dt = strategy_kline.cur_datetime + + # 重新绑定k线策略与on_bar回调函数 + strategy_kline.strategy = self + strategy_kline.cb_on_bar = cb_on_bar + + self.write_log(f'恢复{kline_name}缓存数据,最新bar结束时间:{last_bar_dt}') + + self.write_log(u'加载缓存k线数据完毕') + return last_bar_dt + except Exception as ex: + self.write_error(f'加载缓存K线数据失败:{str(ex)}') + return None + + def get_klines_snapshot(self): + """返回当前klines的切片数据""" + try: + d = { + 'strategy': self.strategy_name, + 'datetime': datetime.now()} + klines = {} + for kline_name in sorted(self.klines.keys()): + klines.update({kline_name: self.klines.get(kline_name).get_data()}) + kline_names = list(klines.keys()) + binary_data = zlib.compress(pickle.dumps(klines)) + d.update({'kline_names': kline_names, 'klines': binary_data, 'zlib': True}) + return d + except Exception as ex: + self.write_error(f'获取klines切片数据失败:{str(ex)}') + return {} + def init_position(self, status_filter=[True]): """ 初始化Positin @@ -205,6 +300,10 @@ class CtaSpreadTemplate(CtaTemplate): def on_start(self): """启动策略(必须由用户继承实现)""" + # 订阅主动腿/被动腿合约 + self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=self.act_vt_symbol) + self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=self.pas_vt_symbol) + self.write_log(u'启动') self.trading = True self.put_event() @@ -448,7 +547,8 @@ class CtaSpreadTemplate(CtaTemplate): self.update_pos(price=grid.close_price, volume=grid.volume, - operation='cover' if grid.direction == Direction.SHORT else 'sell') + operation='cover' if grid.direction == Direction.SHORT else 'sell', + dt=self.cur_datetime) self.write_log(f'移除网格:{grid.to_json()}') self.gt.remove_grids_by_ids(direction=grid.direction, ids=[grid.id]) @@ -460,7 +560,8 @@ class CtaSpreadTemplate(CtaTemplate): self.write_log(f'{grid.direction.value}单已开仓完毕,,手数:{grid.volume}, 详细:{grid.snapshot}') self.update_pos(price=grid.open_price, volume=grid.volume, - operation='short' if grid.direction == Direction.SHORT else 'buy') + operation='short' if grid.direction == Direction.SHORT else 'buy', + dt=self.cur_datetime) # 网格的所有委托单部分执行完毕 else: self.write_log(f'剩余委托单号:{grid.order_ids}') @@ -656,7 +757,7 @@ class CtaSpreadTemplate(CtaTemplate): order_price = old_order['price'] order_type = old_order.get('order_type', OrderType.LIMIT) - order_retry = old_order['retry'] + order_retry = old_order.get('retry',1) grid = old_order.get('grid', None) if order_retry > 10: msg = u'{} 平仓撤单 {}/{}手, 重试平仓次数{}>10' \ @@ -789,18 +890,23 @@ class CtaSpreadTemplate(CtaTemplate): over_seconds = (dt - order_time).total_seconds() # 只处理未成交的限价委托单 - if order_status in [Status.NOTTRADED] and (order_type == OrderType.LIMIT): + if order_status in [Status.SUBMITTING, Status.NOTTRADED] and (order_type == OrderType.LIMIT): if over_seconds > self.cancel_seconds or force: # 超过设置的时间还未成交 self.write_log(u'超时{}秒未成交,取消委托单:vt_orderid:{},order:{}' .format(over_seconds, vt_orderid, order_info)) - order_info.update({'status': Status.CANCELING}) + order_info.update({'status': Status.CANCELLING}) self.active_orders.update({vt_orderid: order_info}) ret = self.cancel_order(str(vt_orderid)) if not ret: self.write_log(u'撤单失败,更新状态为撤单成功') order_info.update({'status': Status.CANCELLED}) self.active_orders.update({vt_orderid: order_info}) - + else: + if order_grid: + if vt_orderid in order_grid.order_ids: + order_grid.order_ids.remove(vt_orderid) + if len(order_grid.order_ids) == 0: + order_grid.order_status = False continue # 处理状态为‘撤销’的委托单 @@ -925,8 +1031,8 @@ class CtaSpreadTemplate(CtaTemplate): return True # leg1 接近跌停价(10个minDiff 以内) - if self.cur_act_tick.limie_down > 0 \ - and self.cur_act_tick.bid_price_1 - 10 * self.act_price_tick < self.cur_act_tick.limie_down: + if self.cur_act_tick.limit_down > 0 \ + and self.cur_act_tick.bid_price_1 - 10 * self.act_price_tick < self.cur_act_tick.limit_down: self.write_log(u'主动腿 bid_price_1{} 接近跌停价{}' .format(self.cur_act_tick.bid_price_1, self.cur_act_tick.limit_up)) return True @@ -939,8 +1045,8 @@ class CtaSpreadTemplate(CtaTemplate): return True # leg2 接近跌停价(10个minDiff 以内) - if self.cur_pas_tick.limie_down > 0 \ - and self.cur_pas_tick.bid_price_1 - 10 * self.pas_price_tick < self.cur_pas_tick.limie_down: + if self.cur_pas_tick.limit_down > 0 \ + and self.cur_pas_tick.bid_price_1 - 10 * self.pas_price_tick < self.cur_pas_tick.limit_down: self.write_log(u'被动腿 bid_price_1{} 接近跌停价{}' .format(self.cur_pas_tick.bid_price_1, self.cur_pas_tick.limit_up)) return True @@ -976,26 +1082,26 @@ class CtaSpreadTemplate(CtaTemplate): # 开空主动腿 act_vt_orderids = self.short(vt_symbol=self.act_vt_symbol, - price=self.cur_act_tick.bid_price1, + price=self.cur_act_tick.bid_price_1, volume=grid.volume * self.act_vol_ratio, order_type=self.order_type, order_time=self.cur_datetime, grid=grid) if not act_vt_orderids: self.write_error(f'spd_short,{self.act_vt_symbol}开空仓{grid.volume * self.act_vol_ratio}手失败,' - f'委托价:{self.cur_act_tick.bid_price1}') + f'委托价:{self.cur_act_tick.bid_price_1}') return [] # 开多被动腿 pas_vt_orderids = self.buy(vt_symbol=self.pas_vt_symbol, - price=self.cur_pas_tick.ask_price1, + price=self.cur_pas_tick.ask_price_1, volume=grid.volume * self.pas_vol_ratio, order_type=self.order_type, order_time=self.cur_datetime, grid=grid) if not pas_vt_orderids: self.write_error(f'spd_short,{self.pas_vt_symbol}开多仓{grid.volume * self.pas_vol_ratio}手失败,' - f'委托价:{self.cur_pas_tick.ask_price1}') + f'委托价:{self.cur_pas_tick.ask_price_1}') return [] grid.order_status = True @@ -1035,26 +1141,26 @@ class CtaSpreadTemplate(CtaTemplate): # 开多主动腿 act_vt_orderids = self.buy(vt_symbol=self.act_vt_symbol, - price=self.cur_act_tick.ask_price1, + price=self.cur_act_tick.ask_price_1, volume=grid.volume * self.act_vol_ratio, order_type=self.order_type, order_time=self.cur_datetime, grid=grid) if not act_vt_orderids: self.write_error(f'spd_short,{self.act_vt_symbol}开多仓{grid.volume * self.act_vol_ratio}手失败,' - f'委托价:{self.cur_act_tick.ask_price1}') + f'委托价:{self.cur_act_tick.ask_price_1}') return [] # 开空被动腿 pas_vt_orderids = self.short(vt_symbol=self.pas_vt_symbol, - price=self.cur_pas_tick.bid_price1, + price=self.cur_pas_tick.bid_price_1, volume=grid.volume * self.pas_vol_ratio, order_type=self.order_type, order_time=self.cur_datetime, grid=grid) if not pas_vt_orderids: self.write_error(f'spd_short,{self.pas_vt_symbol}开空仓{grid.volume * self.pas_vol_ratio}手失败,' - f'委托价:{self.cur_pas_tick.bid_price1}') + f'委托价:{self.cur_pas_tick.bid_price_1}') return [] grid.order_status = True @@ -1066,7 +1172,7 @@ class CtaSpreadTemplate(CtaTemplate): # ---------------------------------------------------------------------- def spd_sell(self, grid: CtaGrid, force: bool = False): """非标准合约的套利平正套指令""" - self.write_log(u'套利平正套单,price={0,volume={}'.format(grid.close_price, grid.volume)) + self.write_log(u'套利平正套单,price={},volume={}'.format(grid.close_price, grid.volume)) if self.entrust != 0: self.write_log(u'正在委托,不平仓') return [] @@ -1106,26 +1212,26 @@ class CtaSpreadTemplate(CtaTemplate): # 主动腿多单平仓 act_vt_orderids = self.sell(vt_symbol=self.act_vt_symbol, - price=self.cur_act_tick.bid_price1, + price=self.cur_act_tick.bid_price_1, volume=grid.volume * self.act_vol_ratio, order_type=self.order_type, order_time=self.cur_datetime, grid=grid) if not act_vt_orderids: self.write_error(f'spd_sell,{self.act_vt_symbol}多单平仓{grid.volume * self.act_vol_ratio}手失败,' - f'委托价:{self.cur_act_tick.bid_price1}') + f'委托价:{self.cur_act_tick.bid_price_1}') return [] # 被动腿空单平仓 pas_vt_orderids = self.cover(vt_symbol=self.pas_vt_symbol, - price=self.cur_pas_tick.ask_price1, + price=self.cur_pas_tick.ask_price_1, volume=grid.volume * self.pas_vol_ratio, order_type=self.order_type, order_time=self.cur_datetime, grid=grid) if not pas_vt_orderids: self.write_error(f'spd_sell,{self.pas_vt_symbol}空单平仓{grid.volume * self.pas_vol_ratio}手失败,' - f'委托价:{self.cur_pas_tick.ask_price1}') + f'委托价:{self.cur_pas_tick.ask_price_1}') return [] grid.order_status = True @@ -1155,6 +1261,13 @@ class CtaSpreadTemplate(CtaTemplate): self.write_log(u'实际价差{}不满足:{}'.format(self.cur_spd_tick.ask_price_1, grid.close_price)) return [] + self.act_pos = self.cta_engine.get_position_holding(vt_symbol=self.act_vt_symbol) + self.pas_pos = self.cta_engine.get_position_holding(vt_symbol=self.pas_vt_symbol) + + if not all([self.act_pos, self.pas_pos]): + self.write_error('主动腿/被动退得持仓数据不存在') + return [] + act_close_volume = grid.snapshot.get('act_open_volume') pas_close_volume = grid.snapshot.get('pas_open_volume') @@ -1170,26 +1283,26 @@ class CtaSpreadTemplate(CtaTemplate): # 主动腿空单平仓 act_vt_orderids = self.cover(vt_symbol=self.act_vt_symbol, - price=self.cur_act_tick.ask_price1, + price=self.cur_act_tick.ask_price_1, volume=grid.volume * self.act_vol_ratio, order_type=self.order_type, order_time=self.cur_datetime, grid=grid) if not act_vt_orderids: self.write_error(f'spd_cover{self.act_vt_symbol}空单平仓{grid.volume * self.act_vol_ratio}手失败,' - f'委托价:{self.cur_act_tick.ask_price1}') + f'委托价:{self.cur_act_tick.ask_price_1}') return [] # 被动腿多单平仓 pas_vt_orderids = self.sell(vt_symbol=self.pas_vt_symbol, - price=self.cur_pas_tick.bid_price1, + price=self.cur_pas_tick.bid_price_1, volume=grid.volume * self.pas_vol_ratio, order_type=self.order_type, order_time=self.cur_datetime, grid=grid) if not pas_vt_orderids: self.write_error(f'spd_cover,{self.pas_vt_symbol}多单平仓{grid.volume * self.pas_vol_ratio}手失败,' - f'委托价:{self.cur_pas_tick.bid_price1}') + f'委托价:{self.cur_pas_tick.bid_price_1}') return [] grid.order_status = True diff --git a/vnpy/component/cta_grid_trade.py b/vnpy/component/cta_grid_trade.py index 0060a60d..3bef54e2 100644 --- a/vnpy/component/cta_grid_trade.py +++ b/vnpy/component/cta_grid_trade.py @@ -83,8 +83,8 @@ class CtaGrid(object): j['snapshot'] = self.snapshot # 切片数据 # datetime => string - j['open_time'] = self.open_time.strftime('%Y-%m-%d %H:%M:%S') if isinstance(self.open_time, datetime) else '' - j['order_time'] = self.order_time.strftime('%Y-%m-%d %H:%M:%S') if isinstance(self.order_time, datetime) else '' + j['open_time'] = self.open_time.strftime('%Y-%m-%d %H:%M:%S') if isinstance(self.open_time, datetime) else self.open_time + j['order_time'] = self.order_time.strftime('%Y-%m-%d %H:%M:%S') if isinstance(self.order_time, datetime) else self.order_time return j @@ -646,7 +646,7 @@ class CtaGridTrade(CtaComponent): for i in range(0, lots, 1): # 做多,开仓价为下阻力线-网格高度*i,平仓价为开仓价+止盈高度,开仓数量为缺省 open_price = int((down_line - self.grid_height * down_rate * i) / self.price_tick) * self.price_tick - close_price = int((open_price + self.grid_win * down_rate * i) / self.price_tick) * self.price_tick + close_price = int((open_price + self.grid_win * down_rate ) / self.price_tick) * self.price_tick grid = CtaGrid(direction=Direction.LONG, open_price=open_price, @@ -687,7 +687,7 @@ class CtaGridTrade(CtaComponent): # 做空,开仓价为上阻力线+网格高度*i,平仓价为开仓价-止盈高度,开仓数量为缺省 for i in range(0, lots, 1): open_price = int((upper_line + self.grid_height * upper_rate * i) / self.price_tick) * self.price_tick - close_price = int((open_price - self.grid_win * upper_rate * i) / self.price_tick) * self.price_tick + close_price = int((open_price - self.grid_win * upper_rate ) / self.price_tick) * self.price_tick grid = CtaGrid(direction=Direction.SHORT, open_price=open_price, diff --git a/vnpy/data/tdx/tdx_future_data.py b/vnpy/data/tdx/tdx_future_data.py index 7b163036..b0721c8c 100644 --- a/vnpy/data/tdx/tdx_future_data.py +++ b/vnpy/data/tdx/tdx_future_data.py @@ -108,6 +108,7 @@ class TdxFutureData(object): # 所有期货合约的本地缓存 self.future_contracts = get_future_contracts() + def write_log(self, content): if self.strategy: self.strategy.write_log(content) diff --git a/vnpy/data/tq/downloader.py b/vnpy/data/tq/downloader.py index 16b0a528..0849aee0 100644 --- a/vnpy/data/tq/downloader.py +++ b/vnpy/data/tq/downloader.py @@ -3,7 +3,8 @@ #__author__ = 'yangyang' # 修改: # 1, 输入单个合约时,标题不再扩展为 合约.标题 -# 2. 下载tick时,5当行情都下载 +# 2. 下载tick时,5档行情都下载 +# 3. 五档行情变量调整适合vnpy的命名方式 import csv from datetime import date, datetime @@ -120,6 +121,10 @@ class DataDownloader: "focus_datetime": self._start_dt_nano, "focus_position": 0, } + if len(self._symbol_list) ==1: + single_exchange, single_symbol = self._symbol_list[0].split('.') + else: + single_exchange, single_symbol = None, None # 还没有发送过任何请求, 先请求定位左端点 await self._api._send_chan.send(chart_info) chart = _get_obj(self._api._data, ["charts", chart_info["chart_id"]]) @@ -167,7 +172,13 @@ class DataDownloader: # 写入文件头 csv_header = ["datetime"] for symbol in self._symbol_list: + # 单一合约时,添加合约和交易所 + if single_exchange: + csv_header.extend(['symbol', 'exchange']) + for col in data_cols: + if col.startswith('bid_') or col.startswith('ask_'): + col = col[:-1] + '_' + col[-1] if len(self._symbol_list) > 2: csv_header.append(symbol + "." + col) else: @@ -175,6 +186,11 @@ class DataDownloader: csv_writer.writerow(csv_header) row = [self._nano_to_str(item["datetime"])] + + # 单一合约时,添加合约和交易所 + if single_exchange: + row.extend([single_symbol, single_exchange]) + for col in data_cols: row.append(self._get_value(item, col)) for i in range(1, len(self._symbol_list)): @@ -183,7 +199,11 @@ class DataDownloader: k = {} if tid == -1 else serials[i]["data"].get(str(tid), {}) for col in data_cols: row.append(self._get_value(k, col)) - csv_writer.writerow(row) + # 抛弃盘前的脏数据 + if self._dur_nano == 0 and str(row[3]) == 'nan': + p = 1 + else: + csv_writer.writerow(row) current_id += 1 self._current_dt_nano = item["datetime"] # 当前 id 已超出订阅范围, 需重新订阅后续数据 @@ -213,5 +233,5 @@ class DataDownloader: def _nano_to_str(nano): dt = datetime.fromtimestamp(nano // 1000000000) s = dt.strftime('%Y-%m-%d %H:%M:%S') - s += '.' + str(int(nano % 1000000000)).zfill(9) + s += '.' + str(int(nano % 1000000000)).zfill(9)[:3] return s diff --git a/vnpy/data/tq/tianqin_data.py b/vnpy/data/tq/tianqin_data.py new file mode 100644 index 00000000..80076d5a --- /dev/null +++ b/vnpy/data/tq/tianqin_data.py @@ -0,0 +1,308 @@ +# -*- coding:UTF-8 -*- +# Author :chenfeng +import traceback +from contextlib import closing + +import os +from datetime import datetime, timedelta +from functools import lru_cache +from tqsdk import TqApi, TqSim +from vnpy.data.tq.downloader import DataDownloader +from vnpy.trader.constant import ( + Direction, + Exchange, + Product, + Offset, + Status, + OptionType, + OrderType, + Interval, +) +from vnpy.trader.object import TickData +from vnpy.trader.utility import extract_vt_symbol, get_trading_date +import pandas as pd +import csv + +# pd.pandas.set_option('display.max_rows', None) # 设置最大显示行数,超过该值用省略号代替,为None时显示所有行。 +# pd.pandas.set_option('display.max_columns', None) # 设置最大显示列数,超过该值用省略号代替,为None时显示所有列。 +# pd.pandas.reset_option(‘参数名’, 参数值) # 恢复默认相关选项 + +tick_csv_header = [ + "datetime","symbol", "exchange", "last_price","highest","lowest","volume","amount","open_interest", + "upper_limit","lower_limit","bid_price1","bid_volume1","ask_price1", + "ask_volume1","bid_price2","bid_volume2","ask_price2","ask_volume2", + "bid_price3","bid_volume3","ask_price3","ask_volume3","bid_price4", + "bid_volume4", + "ask_price4","ask_volume4", + "bid_price5","bid_volume5","ask_price5","ask_volume5" +] + +@lru_cache(maxsize=9999) +def to_vt_symbol(tq_symbol: str) -> str: + """""" + if "KQ.m" in tq_symbol: + ins_type, instrument = tq_symbol.split("@") + exchange, symbol = instrument.split(".") + return f"{symbol}88.{exchange}" + elif "KQ.i" in tq_symbol: + ins_type, instrument = tq_symbol.split("@") + exchange, symbol = instrument.split(".") + return f"{symbol}99.{exchange}" + else: + exchange, symbol = tq_symbol.split(".") + return f"{symbol}.{exchange}" + + +@lru_cache(maxsize=9999) +def to_tq_symbol(symbol: str, exchange: Exchange) -> str: + """ + TQSdk exchange first + """ + for count, word in enumerate(symbol): + if word.isdigit(): + break + + fix_symbol = symbol + if exchange in [Exchange.INE, Exchange.SHFE, Exchange.DCE]: + fix_symbol = symbol.lower() + + # Check for index symbol + time_str = symbol[count:] + + if time_str in ["88"]: + return f"KQ.m@{exchange.value}.{fix_symbol[:count]}" + if time_str in ["99"]: + return f"KQ.i@{exchange.value}.{fix_symbol[:count]}" + + return f"{exchange.value}.{fix_symbol}" + + +def generate_tick_from_dict(vt_symbol: str, data: dict) -> TickData: + """ + 生成TickData + """ + symbol, exchange = extract_vt_symbol(vt_symbol) + if '.' in data["datetime"]: + time_format = "%Y-%m-%d %H:%M:%S.%f" + else: + time_format = "%Y-%m-%d %H:%M:%S" + + return TickData( + symbol=symbol, + exchange=exchange, + datetime=datetime.strptime(data["datetime"][0:26], time_format), + name=symbol, + volume=int(data["volume"]), + open_interest=data["open_interest"], + last_price=float(data["last_price"]), + #limit_up=float(data["upper_limit"]) if data["upper_limit"] !='#N/A' else None, + #limit_down=float(data["lower_limit"]), + high_price=float(data["highest"]), + low_price=float(data["lowest"]), + bid_price_1=float(data["bid_price1"]), + bid_price_2=float(data["bid_price2"]), + bid_price_3=float(data["bid_price3"]), + bid_price_4=float(data["bid_price4"]), + bid_price_5=float(data["bid_price5"]), + ask_price_1=float(data["ask_price1"]), + ask_price_2=float(data["ask_price2"]), + ask_price_3=float(data["ask_price3"]), + ask_price_4=float(data["ask_price4"]), + ask_price_5=float(data["ask_price5"]), + bid_volume_1=int(data["bid_volume1"]), + bid_volume_2=int(data["bid_volume2"]), + bid_volume_3=int(data["bid_volume3"]), + bid_volume_4=int(data["bid_volume4"]), + bid_volume_5=int(data["bid_volume5"]), + ask_volume_1=int(data["ask_volume1"]), + ask_volume_2=int(data["ask_volume2"]), + ask_volume_3=int(data["ask_volume3"]), + ask_volume_4=int(data["ask_volume4"]), + ask_volume_5=int(data["ask_volume5"]), + gateway_name='', + ) + + +class TqFutureData(): + + def __init__(self, strategy=None): + self.strategy = strategy # 传进来策略实例,这样可以写日志到策略实例 + + self.api = TqApi(TqSim()) + + def get_tick_serial(self, vt_symbol: str): + # 获取最新的8964个数据 tick的话就相当于只有50分钟左右 + try: + symbol, exchange = extract_vt_symbol(vt_symbol) + tq_symbol = to_tq_symbol(symbol, exchange) + # 使用with closing机制确保下载完成后释放对应的资源 + with closing(self.api): + # 获得 pp2009 tick序列的引用 + ticks = self.api.get_tick_serial(symbol=tq_symbol, data_length=8964) # 每个序列最大支持请求 8964 个数据 + return ticks # 8964/3/60=49.8分钟 + except Exception as ex: + print(u'获取历史tick数据出错:{},{}'.format(str(ex), traceback.format_exc())) + return None + + def download_tick_history_to_csv(self, vt_symbol: str, cache_file: str, start_date: datetime, end_date: datetime): + + symbol, exchange = extract_vt_symbol(vt_symbol) + tq_symbol = to_tq_symbol(symbol, exchange) + td = DataDownloader(self.api, symbol_list=tq_symbol, dur_sec=0, # Tick数据为dur_sec=0 + start_dt=start_date, end_dt=end_date, + csv_file_name=cache_file) + + # 使用with closing机制确保下载完成后释放对应的资源 + # with closing(self.api): # 不能这样关闭,套利要下两个腿,所以在策略中关闭 + # while not td.is_finished(): + # self.api.wait_update() + # print(f"progress:{vt_symbol}--{start_date}--{end_date}: {td.get_progress()}") + # self.write_error(f"{vt_symbol}--{start_date}--{end_date}历史数据已经下载到csv") + while not td.is_finished(): + self.api.wait_update() + self.write_log(f"progress:{vt_symbol}--{start_date}--{end_date}: {td.get_progress()}") + self.write_log(f"{vt_symbol}--{start_date}--{end_date}历史数据已经下载到csv") + + def close_api(self): + # 关闭api,释放资源 download_tick_history_to_csv 中因为要下多个所以这里手动关闭 + self.api.close() + + def get_tick_from_cache(self, vt_symbol: str, trading_day: str): + """从本地缓存文件读取, 返回[]""" + if '-' in trading_day: + trading_day = trading_day.replace('-', '') + symbol, exchange = extract_vt_symbol(vt_symbol) + + vnpy_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) + ticks_file = os.path.abspath(os.path.join(vnpy_folder, 'tick_data', 'tq', 'future', trading_day[0:6], + f'{symbol}_{trading_day}.csv')) + tick_dict_list = [] + if os.path.exists(ticks_file): + try: + with open(file=ticks_file, mode='r', encoding='utf-8', ) as f: + reader = csv.DictReader(f=f, fieldnames=tick_csv_header, delimiter=",") + for row in reader: + if str(row.get('last_price','nan')) not in['nan','last_price']: + tick_dict_list.append(row) + + return tick_dict_list + except Exception as ex: + self.write_log(f'从缓存文件读取{vt_symbol},交易日{trading_day}异常:{str(ex)}') + + return [] + + def get_ticks(self, vt_symbol: str, start_date: datetime, end_date: datetime = None): + """获取历史tick""" + + # 1.0从天勤接口下载指定日期,的合约的tick数据 + self.write_log(f"从天勤请求合约:{vt_symbol}开始时间:{start_date}的历史tick数据") + symbol, exchange = extract_vt_symbol(vt_symbol) + + if end_date is None: + end_date = datetime.now().replace(hour=16) + + n_days = (end_date - start_date).days + + if n_days <= 0: + n_days = 1 + + all_ticks = [] + # 轮询每一天,读取缓存数据 + for n in range(n_days+1): + trading_date = start_date + timedelta(days=n) + if trading_date.isoweekday() in [6, 7]: + continue + trading_day = trading_date.strftime('%Y%m%d') + day_ticks = self.get_tick_from_cache(vt_symbol=vt_symbol, trading_day=trading_day) + + if day_ticks: + self.write_log(f'读取{vt_symbol} {trading_day}缓存数据{len(day_ticks)}条') + all_ticks.extend(day_ticks) + + if all_ticks: + last_tick_dt = all_ticks[-1].get('datetime') + begin_dt = datetime.strptime(last_tick_dt[0:26], "%Y-%m-%d %H:%M:%S.%f") + rt_ticks = self.get_runtime_ticks(vt_symbol=vt_symbol, begin_dt=begin_dt) + if rt_ticks: + all_ticks.extend(rt_ticks) + return all_ticks + + def get_runtime_ticks(self, vt_symbol: str, begin_dt: datetime= None): + """获取实时历史tick""" + self.write_log(f"从天勤请求合约:{vt_symbol}的实时的8964条tick数据") + symbol, exchange = extract_vt_symbol(vt_symbol) + df = self.get_tick_serial(vt_symbol) + ticks = [] + if df is None: + return ticks + + self.write_log(f"从天勤或历史tick数据成功,开始清洗tick") + # print(df.columns.values) + # 给df 的各个列名按vnpy格式重置一下 + df.columns = ['datetime', 'id', 'last_price', 'average', 'highest', 'lowest', 'ask_price1', + 'ask_volume11', 'bid_price1', 'bid_volume11', 'ask_price2', 'ask_volume12', + 'bid_price2', 'bid_volume12', 'ask_price3', 'ask_volume13', 'bid_price3', + 'bid_volume13', 'ask_price4', 'ask_volume14', 'bid_price4', 'bid_volume14', + 'ask_price5', 'ask_volume15', 'bid_price5', 'bid_volume15', 'volume', 'amount', + 'open_interest', 'symbol', 'duration'] + df.drop(['id','average','duration'], axis=1) + + for index, row in df.iterrows(): + # 日期时间, 成交价, 成交量, 总量, 属性(持仓增减), B1价, B1量, B2价, B2量, B3价, B3量, S1价, S1量, S2价, S2量, S3价, S3量, BS + # 日期时间, 成交价,当日最高价,当日最低价, B1价, B1量,S1价, S1量,日内成交量,金额,持仓量 + # 0 1 2 3 4 5 6 7 8 9 10 + tick = row.to_dict() + + if str(tick['last_price']) == 'nan': + continue + # datetime: 自unix epoch(1970-01-01 00:00:00 GMT)以来的纳秒数 + # 1.0、转换读取的tick 时间文本 到 datetime格式 + # tick_datetime = datetime.strptime(tick['datetime'], "%Y-%m-%d %H:%M:%S.%f") + tick_datetime = datetime.strptime(self._nano_to_str(tick['datetime']), "%Y-%m-%d %H:%M:%S.%f") + if tick_datetime <= begin_dt: + continue + # 2.0、获取tick对应的交易日 + tick_tradingday = get_trading_date(tick_datetime) + + tick.update({'symbol': symbol, 'exchange': exchange.value, 'trading_day': tick_tradingday}) + tick['datetime'] = tick_datetime.strftime("%Y-%m-%d %H:%M:%S.%f") + ticks.append(tick) + + del df + return ticks + + @staticmethod + def _nano_to_str(nano): + # nano: 自unix epoch(1970-01-01 00:00:00 GMT)以来的纳秒数 9位为纳秒 6位为微秒,%f只用到微秒,所以[:6] + dt = datetime.fromtimestamp(nano // 1000000000) + s = dt.strftime('%Y-%m-%d %H:%M:%S') + s += '.' + str(int(nano % 1000000000)).zfill(9)[:3] # zfill() 方法返回指定长度的字符串,原字符串右对齐,前面填充0。 + return s + + def write_log(self, msg): + if self.strategy is None: + print(msg) + else: + self.strategy.write_log(msg) + + def write_error(self, msg): + if self.strategy is None: + print(msg) + else: + self.strategy.write_error(msg) + +if __name__ == '__main__': + # tqsdk = Query_tqsdk_data(strategy=self) # 在策略中使用 + tqsdk = TqFutureData() + # ticks = tqsdk.query_tick_current("pp2009.DCE") + #tick_df = tqsdk.query_tick_history_data(vt_symbol="ni2009.SHFE", start_date=pd.to_datetime("2020-07-22")) + #print(tick_df) + + ticks = tqsdk.get_runtime_ticks("ni2009.SHFE") + + print(ticks[0]) + + print(ticks[-1]) + + + diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index eb3da627..a8338330 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -476,6 +476,13 @@ class CtpGateway(BaseGateway): """""" self.td_api.query_position() + def query_history(self, req: HistoryRequest) -> List[BarData]: + """查询K线历史""" + if self.tq_api: + return self.tq_api.query_history(req) + else: + return [] + def close(self): """""" if self.md_api: @@ -1908,7 +1915,9 @@ class TqMdApi(): for vt_symbol, quote in self.quote_objs: if self.api.is_changing(quote): tick = self.generate_tick_from_quote(vt_symbol, quote) - tick and self.gateway.on_tick(tick) and self.gateway.on_custom_tick(tick) + if tick: + self.gateway.on_tick(tick) + self.gateway.on_custom_tick(tick) def subscribe(self, req: SubscribeRequest) -> None: """ diff --git a/vnpy/gateway/pb/pb_gateway.py b/vnpy/gateway/pb/pb_gateway.py index 90667e12..02631f1f 100644 --- a/vnpy/gateway/pb/pb_gateway.py +++ b/vnpy/gateway/pb/pb_gateway.py @@ -1646,7 +1646,7 @@ class PbTdApi(object): order.status = Status.REJECTED self.gateway.write_log(f'dbf批量下单,委托被拒:{order.__dict__}') self.gateway.order_manager.on_order(order) - self.gateway.write_error(msg=err_msg, error={"ErrorID": err_id, "ErrorMsg": "委托失败"}) + self.gateway.write_error(msg=f'{order.direction.value},{order.vt_symbol},{err_msg}', error={"ErrorID": err_id, "ErrorMsg": "委托失败"}) if sys_orderid != '0': self.gateway.order_manager.update_orderid_map(local_orderid=local_orderid, diff --git a/vnpy/trader/converter.py b/vnpy/trader/converter.py index 4b0181ee..40383ced 100644 --- a/vnpy/trader/converter.py +++ b/vnpy/trader/converter.py @@ -88,6 +88,7 @@ class OffsetConverter: # 平今/平昨拆分 elif req.exchange in [Exchange.SHFE, Exchange.INE]: + print(f'转换平今/平昨') return holding.convert_order_request_shfe(req) else: return [req] @@ -262,10 +263,12 @@ class PositionHolding: td_available = self.long_td - self.long_td_frozen if req.volume > pos_available: + print(f'{req.vt_symbol}没有可用仓位') return [] elif req.volume <= td_available: req_td = copy(req) req_td.offset = Offset.CLOSETODAY + print(f'{req.vt_symbol} 平仓=>平今') return [req_td] else: req_list = [] @@ -274,11 +277,13 @@ class PositionHolding: req_td = copy(req) req_td.offset = Offset.CLOSETODAY req_td.volume = td_available + print(f'{req.vt_symbol} 平仓 {req_td.volume}手 =>平今') req_list.append(req_td) req_yd = copy(req) req_yd.offset = Offset.CLOSEYESTERDAY req_yd.volume = req.volume - td_available + print(f'{req.vt_symbol} 平仓 {req_yd.volume}手 =>平昨') req_list.append(req_yd) return req_list diff --git a/vnpy/trader/ui/widget.py b/vnpy/trader/ui/widget.py index 56911b34..1dceabb0 100644 --- a/vnpy/trader/ui/widget.py +++ b/vnpy/trader/ui/widget.py @@ -627,7 +627,7 @@ class TradingWidget(QtWidgets.QWidget): [order_type.value for order_type in OrderType]) double_validator = QtGui.QDoubleValidator() - double_validator.setBottom(0) + #double_validator.setBottom(0) self.price_line = QtWidgets.QLineEdit() self.price_line.setValidator(double_validator)