diff --git a/vnpy/api/t2sdk/__init__.py b/vnpy/api/t2sdk/__init__.py new file mode 100644 index 00000000..ec72fde1 --- /dev/null +++ b/vnpy/api/t2sdk/__init__.py @@ -0,0 +1 @@ +from . import py_t2sdk \ No newline at end of file diff --git a/vnpy/api/t2sdk/py_t2sdk.pyd b/vnpy/api/t2sdk/py_t2sdk.pyd new file mode 100644 index 00000000..2819e2aa Binary files /dev/null and b/vnpy/api/t2sdk/py_t2sdk.pyd differ diff --git a/vnpy/api/t2sdk/t2sdk.dll b/vnpy/api/t2sdk/t2sdk.dll new file mode 100644 index 00000000..06ee784b Binary files /dev/null and b/vnpy/api/t2sdk/t2sdk.dll differ diff --git a/vnpy/app/account_recorder/engine.py b/vnpy/app/account_recorder/engine.py index 10fdaccf..d837a2aa 100644 --- a/vnpy/app/account_recorder/engine.py +++ b/vnpy/app/account_recorder/engine.py @@ -40,7 +40,7 @@ from vnpy.trader.event import ( EVENT_WARNING, EVENT_CRITICAL, ) -# from vnpy.trader.constant import Direction +from vnpy.trader.constant import Direction, Exchange, Status from vnpy.trader.engine import BaseEngine, MainEngine from vnpy.trader.utility import get_trading_date, load_json, save_json from vnpy.data.mongo.mongo_data import MongoData @@ -334,6 +334,9 @@ class AccountRecorder(BaseEngine): if len(order.sys_orderid) == 0: # 未有系统的委托编号,不做持久化 order.sys_orderid = order.orderid + if order.status in [Status.SUBMITTING]: + return + dt = getattr(order, 'datetime') if not dt: order_date = datetime.now().strftime('%Y-%m-%d') @@ -350,9 +353,9 @@ class AccountRecorder(BaseEngine): 'account_id': order.accountid, 'sys_orderid': order.sys_orderid, 'order_date': order_date, - 'holder_id': getattr(order,'holder_id','')} + 'holder_id': getattr(order, 'holder_id', '')} - data = copy.copy(order.__dict__) + data = copy.deepcopy(order.__dict__) data.update({'account_id': data.pop('accountid')}) data.update({'order_date': order_date}) data.update({'exchange': order.exchange.value}) @@ -361,6 +364,10 @@ class AccountRecorder(BaseEngine): data.update({'type': order.type.value}) data.update({'status': order.status.value}) + if order.exchange in [Exchange.SSE, Exchange.SZSE]: + if hasattr(self.main_engine, 'get_name'): + data.update({'name': self.main_engine.get_name(order.vt_symbol)}) + self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_ORDER_COL, fld=fld, data=data) # 数据库需要提前建立多重索引 @@ -373,11 +380,11 @@ class AccountRecorder(BaseEngine): 'account_id': order.accountid, 'sys_orderid': order.sys_orderid, 'order_date': order_date, - 'holder_id': getattr(order,'holder_id','')} + 'holder_id': getattr(order, 'holder_id', '')} self.update_data(db_name=ACCOUNT_DB_NAME, col_name=HISTORY_ORDER_COL, fld=fld2, data=history_data) - def get_trading_date(self, dt:datetime): + def get_trading_date(self, dt: datetime): if self.is_7x24: return dt.strftime('%Y-%m-%d') else: @@ -397,7 +404,7 @@ class AccountRecorder(BaseEngine): # 提前创建索引 # db.today_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtSymbol_vt_tradeid_trade_date_holder_id','unique':true}) - data = copy.copy(trade.__dict__) + data = copy.deepcopy(trade.__dict__) data.update({'account_id': data.pop('accountid')}) data.update({'trade_date': trade_date}) data.update({'exchange': trade.exchange.value}) diff --git a/vnpy/app/cta_crypto/engine.py b/vnpy/app/cta_crypto/engine.py index f9bcbb2d..0fa2a790 100644 --- a/vnpy/app/cta_crypto/engine.py +++ b/vnpy/app/cta_crypto/engine.py @@ -184,6 +184,7 @@ class CtaEngine(BaseEngine): self.main_engine.reload_strategy = self.reload_strategy self.main_engine.save_strategy_data = self.save_strategy_data self.main_engine.save_strategy_snapshot = self.save_strategy_snapshot + self.main_engine.clean_strategy_cache = self.clean_strategy_cache # 注册到远程服务调用 if self.main_engine.rpc_service: @@ -198,6 +199,7 @@ class CtaEngine(BaseEngine): self.main_engine.rpc_service.register(self.main_engine.reload_strategy) self.main_engine.rpc_service.register(self.main_engine.save_strategy_data) self.main_engine.rpc_service.register(self.main_engine.save_strategy_snapshot) + self.main_engine.rpc_service.register(self.main_engine.clean_strategy_cache) def process_timer_event(self, event: Event): """ 处理定时器事件""" @@ -217,7 +219,7 @@ class CtaEngine(BaseEngine): # 主动获取所有策略得持仓信息 all_strategy_pos = self.get_all_strategy_pos() - if dt.minute % 5 == 0: + if dt.minute % 5 == 0 and self.engine_config.get('compare_pos',True): # 比对仓位,使用上述获取得持仓信息,不用重复获取 self.compare_pos(strategy_pos_list=copy(all_strategy_pos)) @@ -1209,6 +1211,15 @@ class CtaEngine(BaseEngine): self.write_error(u'保存策略{}数据异常:'.format(strategy_name, str(ex))) self.write_error(traceback.format_exc()) + def clean_strategy_cache(self, strategy_name): + """清除策略K线缓存文件""" + cache_file = os.path.abspath(os.path.join(self.get_data_path(), f'{strategy_name}_klines.pkb2')) + if os.path.exists(cache_file): + self.write_log(f'移除策略缓存文件:{cache_file}') + os.remove(cache_file) + else: + self.write_log(f'策略缓存文件不存在:{cache_file}') + def get_strategy_snapshot(self, strategy_name): """实时获取策略的K线切片(比较耗性能)""" strategy = self.strategies.get(strategy_name, None) @@ -1511,6 +1522,23 @@ class CtaEngine(BaseEngine): value = getattr(strategy, parameter, None) return value + def get_none_strategy_pos_list(self): + """获取非策略持有的仓位""" + # 格式 [ 'strategy_name':'account', 'pos': [{'vt_symbol': '', 'direction': 'xxx', 'volume':xxx }] } ] + none_strategy_pos_file = os.path.abspath(os.path.join(os.getcwd(), 'data', 'none_strategy_pos.json')) + if not os.path.exists(none_strategy_pos_file): + return [] + try: + with open(none_strategy_pos_file, encoding='utf8') as f: + pos_list = json.load(f) + if isinstance(pos_list, list): + return pos_list + + return [] + except Exception as ex: + self.write_error(u'未能读取或解释{}'.format(none_strategy_pos_file)) + return [] + def compare_pos(self, strategy_pos_list=[], auto_balance=False): """ 对比账号&策略的持仓,不同的话则发出微信提醒 @@ -1527,6 +1555,10 @@ class CtaEngine(BaseEngine): strategy_pos_list = self.get_all_strategy_pos() self.write_log(u'策略持仓清单:{}'.format(strategy_pos_list)) + none_strategy_pos = self.get_none_strategy_pos_list() + if len(none_strategy_pos) > 0: + strategy_pos_list.extend(none_strategy_pos) + # 需要进行对比得合约集合(来自策略持仓/账号持仓) vt_symbols = set() diff --git a/vnpy/app/cta_crypto/template.py b/vnpy/app/cta_crypto/template.py index d789a674..b2ac7a2f 100644 --- a/vnpy/app/cta_crypto/template.py +++ b/vnpy/app/cta_crypto/template.py @@ -1321,7 +1321,7 @@ class CtaFutureTemplate(CtaTemplate): continue if grid.open_status and not grid.order_status: - up_grids_info += f'持空中: [数量:{grid.volume}\n, 开仓时间:{grid.open_time}]\n' + up_grids_info += f'持空中: [数量:{grid.volume}, 开仓时间:{grid.open_time}]\n' continue if not grid.open_status and grid.order_status: @@ -1355,8 +1355,795 @@ class CtaFutureTemplate(CtaTemplate): """显示事务的过程记录=》 log""" if not self.inited: return - self.write_log(u'{} 当前 {}价格:{}' - .format(self.cur_datetime, self.vt_symbol, self.cur_price)) + self.write_log(u'{} 当前 {}价格:{}, 委托状态:{}' + .format(self.cur_datetime, self.vt_symbol, self.cur_price, self.entrust)) + + if len(self.active_orders) > 0: + self.write_log('当前活动订单:{}'.format(self.active_orders)) + + if hasattr(self, 'policy'): + policy = getattr(self, 'policy') + if policy: + op = getattr(policy, 'to_json', None) + if callable(op): + self.write_log(u'当前Policy:{}'.format(json.dumps(policy.to_json(), indent=2, ensure_ascii=False))) + + def save_dist(self, dist_data): + """ + 保存策略逻辑过程记录=》 csv文件按 + :param dist_data: + :return: + """ + if self.backtesting: + save_path = self.cta_engine.get_logs_path() + else: + save_path = self.cta_engine.get_data_path() + try: + if 'margin' not in dist_data: + dist_data.update({'margin': dist_data.get('price', 0) * dist_data.get('volume', + 0) * self.cta_engine.get_margin_rate( + dist_data.get('symbol', self.vt_symbol))}) + if 'datetime' not in dist_data: + dist_data.update({'datetime': self.cur_datetime}) + if self.position and 'long_pos' not in dist_data: + dist_data.update({'long_pos': self.position.long_pos}) + if self.position and 'short_pos' not in dist_data: + dist_data.update({'short_pos': self.position.short_pos}) + + file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_dist.csv')) + append_data(file_name=file_name, dict_data=dist_data, field_names=self.dist_fieldnames) + except Exception as ex: + self.write_error(u'save_dist 异常:{} {}'.format(str(ex), traceback.format_exc())) + + def save_tns(self, tns_data): + """ + 保存多空事务记录=》csv文件,便于后续分析 + :param tns_data: + :return: + """ + if self.backtesting: + save_path = self.cta_engine.get_logs_path() + else: + save_path = self.cta_engine.get_data_path() + + try: + file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_tns.csv')) + append_data(file_name=file_name, dict_data=tns_data) + except Exception as ex: + self.write_error(u'save_tns 异常:{} {}'.format(str(ex), traceback.format_exc())) + + def send_wechat(self, msg: str): + """实盘时才发送微信""" + if self.backtesting: + return + self.cta_engine.send_wechat(msg=msg, strategy=self) + + +class CtaSpotTemplate(CtaTemplate): + """ + 现货模板 + """ + + asset_symbol = "" # 资产币 BTCUSDT => BTC + quote_symbol = "" # 定价币 BTCUSDT => USDT + + price_tick = 0.01 # 商品的最小价格跳动 + symbol_size = 1 # 商品得合约乘数 + margin_rate = 1 # 商品的保证金 + volumn_tick = 0.01 # 商品最小成交数量 + + # 委托类型 + order_type = OrderType.LIMIT + cancel_seconds = 120 # 撤单时间(秒) + activate_market = False + + # 资金相关 + max_invest_rate = 0.1 # 最大仓位(0~1) asset / virtual_quote + max_invest_margin = 0 # 资金上限 0,不限制 virtual_quote + max_invest_pos = 0 # 单向头寸数量上限 0,不限制 asset + + # 是否回测状态 + backtesting = False + + # 逻辑过程日志 + dist_fieldnames = ['datetime', 'symbol', 'volume', 'price', 'margin', + 'operation', 'signal', 'stop_price', 'target_price', + 'long_pos'] + + def __init__(self, cta_engine, strategy_name, vt_symbol, setting): + """""" + + # vt_symbol => symbol, exchange + self.symbol, self.exchange = extract_vt_symbol(vt_symbol) + + self.position = None # 仓位组件 + self.policy = None # 事务执行组件 + self.gt = None # 网格交易组件 + self.klines = {} # K线组件字典: kline_name: kline + + self.price_tick = 0.01 # 商品的最小价格跳动 + self.symbol_size = 1 # 商品得合约乘数 + self.margin_rate = 1 # 商品的保证金 + self.volumn_tick = 0.01 # 商品最小成交数量 + self.cancel_seconds = 120 # 撤单时间(秒) + self.activate_market = False + self.order_type = OrderType.LIMIT + self.backtesting = False + + self.cur_datetime: datetime = None # 当前Tick时间 + self.cur_tick: TickData = None # 最新的合约tick( vt_symbol) + self.cur_price = None # 当前价(主力合约 vt_symbol) + self.asset_pos = None # 当前asset_symbol持仓信息 + self.quote_pos = None # 当前quote_symbol的持仓信息 + + self.last_minute = None # 最后的分钟,用于on_tick内每分钟处理的逻辑 + self.display_bars = True + + super().__init__( + cta_engine, strategy_name, vt_symbol, setting + ) + + # 增加仓位管理模块 + self.position = CtaPosition(strategy=self) + self.position.maxPos = sys.maxsize + # 增加网格持久化模块 + self.gt = CtaGridTrade(strategy=self) + + if 'backtesting' not in self.parameters: + self.parameters.append('backtesting') + + def update_setting(self, setting: dict): + """ + Update strategy parameter wtih value in setting dict. + """ + for name in self.parameters: + if name in setting: + setattr(self, name, setting[name]) + + self.price_tick = self.cta_engine.get_price_tick(self.vt_symbol) + self.symbol_size = self.cta_engine.get_size(self.vt_symbol) + self.margin_rate = self.cta_engine.get_margin_rate(self.vt_symbol) + self.volumn_tick = self.cta_engine.get_volume_tick(self.vt_symbol) + + # 检查资产币+定价币是否与vt_symbol一致 + if self.symbol != f'{self.asset_symbol}{self.quote_symbol}': + raise Exception(f'{self.vt_symbol}与{self.asset_symbol}+{self.quote_symbol}不匹配') + + if self.activate_market: + self.write_log(f'{self.strategy_name}使用市价单委托方式') + self.order_type = OrderType.MARKET + + + def sync_data(self): + """同步更新数据""" + if not self.backtesting: + self.write_log(u'保存k线缓存数据') + self.save_klines_to_cache() + + if self.inited and self.trading: + self.write_log(u'保存policy数据') + self.policy.save() + + def save_klines_to_cache(self, kline_names: list = []): + """ + 保存K线数据到缓存 + :param kline_names: 一般为self.klines的keys + :return: + """ + if len(kline_names) == 0: + kline_names = list(self.klines.keys()) + + # 获取保存路径 + save_path = self.cta_engine.get_data_path() + # 保存缓存的文件名 + file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_klines.pkb2')) + with bz2.BZ2File(file_name, 'wb') as f: + klines = {} + for kline_name in kline_names: + kline = self.klines.get(kline_name, None) + # if kline: + # kline.strategy = None + # kline.cb_on_bar = None + klines.update({kline_name: kline}) + pickle.dump(klines, f) + + def load_klines_from_cache(self, kline_names: list = []): + """ + 从缓存加载K线数据 + :param kline_names: + :return: + """ + if len(kline_names) == 0: + kline_names = list(self.klines.keys()) + + save_path = self.cta_engine.get_data_path() + file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_klines.pkb2')) + try: + last_bar_dt = None + with bz2.BZ2File(file_name, 'rb') as f: + klines = pickle.load(f) + # 逐一恢复K线 + for kline_name in kline_names: + # 缓存的k线实例 + cache_kline = klines.get(kline_name, None) + # 当前策略实例的K线实例 + strategy_kline = self.klines.get(kline_name, None) + + if cache_kline and strategy_kline: + # 临时保存当前的回调函数 + cb_on_bar = strategy_kline.cb_on_bar + # 缓存实例数据 =》 当前实例数据 + strategy_kline.__dict__.update(cache_kline.__dict__) + + # 所有K线的最后时间 + if last_bar_dt and strategy_kline.cur_datetime: + last_bar_dt = max(last_bar_dt, strategy_kline.cur_datetime) + else: + last_bar_dt = strategy_kline.cur_datetime + + # 重新绑定k线策略与on_bar回调函数 + strategy_kline.strategy = self + strategy_kline.cb_on_bar = cb_on_bar + + self.write_log(f'恢复{kline_name}缓存数据,最新bar结束时间:{last_bar_dt}') + + self.write_log(u'加载缓存k线数据完毕') + return last_bar_dt + except Exception as ex: + self.write_error(f'加载缓存K线数据失败:{str(ex)}') + return None + + def get_klines_snapshot(self): + """返回当前klines的切片数据""" + try: + d = { + 'strategy': self.strategy_name, + 'datetime': datetime.now()} + klines = {} + for kline_name in sorted(self.klines.keys()): + klines.update({kline_name: self.klines.get(kline_name).get_data()}) + kline_names = list(klines.keys()) + binary_data = zlib.compress(pickle.dumps(klines)) + d.update({'kline_names': kline_names, 'klines': binary_data, 'zlib': True}) + return d + except Exception as ex: + self.write_error(f'获取klines切片数据失败:{str(ex)}') + return {} + + def init_policy(self): + self.write_log(u'init_policy(),初始化执行逻辑') + if self.policy: + self.policy.load() + + def init_position(self): + """ + 初始化Positin + 使用网格的持久化,获取开仓状态的多空单,更新 + :return: + """ + self.write_log(u'init_position(),初始化持仓') + changed = False + + + if len(self.gt.dn_grids) <= 0: + # 加载已开仓的多数据,网格JSON + self.position.long_pos = 0 + long_grids = self.gt.load(direction=Direction.LONG, open_status_filter=[True]) + if len(long_grids) == 0: + self.write_log(u'没有持久化的多单数据') + self.gt.dn_grids = [] + else: + self.gt.dn_grids = long_grids + for lg in long_grids: + + if len(lg.order_ids) > 0 or lg.order_status: + self.write_log(f'重置委托状态:{lg.order_status},清除委托单:{lg.order_ids}') + lg.order_status = False + [self.cancel_order(vt_orderid) for vt_orderid in lg.order_ids] + lg.order_ids = [] + changed = True + + self.write_log(u'加载持仓多单[{},价格:{},数量:{}手, 开仓时间:{}' + .format(lg.vt_symbol, lg.open_price, lg.volume, lg.open_time)) + self.position.long_pos = round(self.position.long_pos + lg.volume, 7) + + self.write_log(f'持久化多单,共持仓:{self.position.long_pos}手') + + self.position.pos = round(self.position.long_pos + self.position.short_pos, 7) + + self.write_log(u'{}加载持久化数据完成,多单:{},空单:{},共:{}手' + .format(self.strategy_name, + self.position.long_pos, + abs(self.position.short_pos), + self.position.pos)) + self.pos = self.position.pos + if changed: + self.gt.save() + self.display_grids() + + def get_positions(self): + """ + 获取策略当前持仓(重构,使用主力合约) + :return: [{'vt_symbol':symbol,'direction':direction,'volume':volume] + """ + if not self.position: + return [] + pos_list = [] + + if self.position.long_pos > 0: + for g in self.gt.get_opened_grids(direction=Direction.LONG): + pos_list.append({'vt_symbol': f'{self.asset_symbol}.{self.exchange.value}', + 'direction': 'long', + 'volume': g.volume - g.traded_volume, + 'price': g.open_price}) + + if self.cur_datetime and (datetime.now() - self.cur_datetime).total_seconds() < 10: + self.write_log(u'{}当前持仓:{}'.format(self.strategy_name, pos_list)) + return pos_list + + def on_trade(self, trade: TradeData): + """交易更新""" + self.write_log(u'{},交易更新:{},当前持仓:{} ' + .format(self.cur_datetime, + trade.__dict__, + self.position.pos)) + + dist_record = dict() + if self.backtesting: + dist_record['datetime'] = trade.time + else: + dist_record['datetime'] = ' '.join([self.cur_datetime.strftime('%Y-%m-%d'), trade.time]) + dist_record['volume'] = trade.volume + dist_record['price'] = trade.price + dist_record['margin'] = trade.price * trade.volume * self.cta_engine.get_margin_rate(trade.vt_symbol) + dist_record['symbol'] = trade.vt_symbol + + if trade.direction == Direction.LONG and trade.offset == Offset.OPEN: + dist_record['operation'] = 'buy' + self.position.open_pos(trade.direction, volume=trade.volume) + dist_record['long_pos'] = self.position.long_pos + dist_record['short_pos'] = self.position.short_pos + + if trade.direction == Direction.SHORT and trade.offset == Offset.OPEN: + dist_record['operation'] = 'short' + self.position.open_pos(trade.direction, volume=trade.volume) + dist_record['long_pos'] = self.position.long_pos + dist_record['short_pos'] = self.position.short_pos + + if trade.direction == Direction.LONG and trade.offset != Offset.OPEN: + dist_record['operation'] = 'cover' + self.position.close_pos(trade.direction, volume=trade.volume) + dist_record['long_pos'] = self.position.long_pos + dist_record['short_pos'] = self.position.short_pos + + if trade.direction == Direction.SHORT and trade.offset != Offset.OPEN: + dist_record['operation'] = 'sell' + self.position.close_pos(trade.direction, volume=trade.volume) + dist_record['long_pos'] = self.position.long_pos + dist_record['short_pos'] = self.position.short_pos + + self.save_dist(dist_record) + self.pos = self.position.pos + + def on_order(self, order: OrderData): + """报单更新""" + # 未执行的订单中,存在是异常,删除 + self.write_log(u'{}报单更新,{}'.format(self.cur_datetime, order.__dict__)) + + if order.vt_orderid in self.active_orders: + + if order.volume == order.traded and order.status in [Status.ALLTRADED]: + self.on_order_all_traded(order) + + elif order.offset == Offset.OPEN and order.status in [Status.CANCELLED]: + # 开仓委托单被撤销 + self.on_order_open_canceled(order) + + elif order.offset != Offset.OPEN and order.status in [Status.CANCELLED]: + # 平仓委托单被撤销 + self.on_order_close_canceled(order) + + elif order.status == Status.REJECTED: + if order.offset == Offset.OPEN: + self.write_error(u'{}委托单开{}被拒,price:{},total:{},traded:{},status:{}' + .format(order.vt_symbol, order.direction, order.price, order.volume, + order.traded, order.status)) + self.on_order_open_canceled(order) + else: + self.write_error(u'OnOrder({})委托单平{}被拒,price:{},total:{},traded:{},status:{}' + .format(order.vt_symbol, order.direction, order.price, order.volume, + order.traded, order.status)) + self.on_order_close_canceled(order) + else: + self.write_log(u'委托单未完成,total:{},traded:{},tradeStatus:{}' + .format(order.volume, order.traded, order.status)) + else: + self.write_error(u'委托单{}不在策略的未完成订单列表中:{}'.format(order.vt_orderid, self.active_orders)) + + def on_order_all_traded(self, order: OrderData): + """ + 订单全部成交 + :param order: + :return: + """ + self.write_log(u'{},委托单:{}全部完成'.format(order.time, order.vt_orderid)) + order_info = self.active_orders[order.vt_orderid] + + # 通过vt_orderid,找到对应的网格 + grid = order_info.get('grid', None) + if grid is not None: + # 移除当前委托单 + if order.vt_orderid in grid.order_ids: + grid.order_ids.remove(order.vt_orderid) + + # 网格的所有委托单已经执行完毕 + if len(grid.order_ids) == 0: + grid.order_status = False + grid.traded_volume = 0 + + # 平仓完毕(cover, sell) + if order.offset != Offset.OPEN: + grid.open_status = False + grid.close_status = True + if grid.volume < order.traded: + self.write_log(f'网格平仓数量{grid.volume},小于委托单成交数量:{order.volume},修正为:{order.volume}') + grid.volume = order.traded + + self.write_log(f'{grid.direction.value}单已平仓完毕,order_price:{order.price}' + + f',volume:{order.volume}') + + self.write_log(f'移除网格:{grid.to_json()}') + self.gt.remove_grids_by_ids(direction=grid.direction, ids=[grid.id]) + + # 开仓完毕( buy, short) + else: + grid.open_status = True + grid.open_time = self.cur_datetime + self.write_log(f'{grid.direction.value}单已开仓完毕,order_price:{order.price}' + + f',volume:{order.volume}') + + # 网格的所有委托单部分执行完毕 + else: + old_traded_volume = grid.traded_volume + grid.traded_volume += order.volume + grid.traded_volume = round(grid.traded_volume, 7) + + self.write_log(f'{grid.direction.value}单部分{order.offset}仓,' + + f'网格volume:{grid.volume}, traded_volume:{old_traded_volume}=>{grid.traded_volume}') + + self.write_log(f'剩余委托单号:{grid.order_ids}') + + self.gt.save() + # 在策略得活动订单中,移除 + self.write_log(f'委托单{order.vt_orderid}完成,从活动订单中移除') + self.active_orders.pop(order.vt_orderid, None) + + def on_order_open_canceled(self, order: OrderData): + """ + 委托开仓单撤销 + :param order: + :return: + """ + self.write_log(u'委托开仓单撤销:{}'.format(order.__dict__)) + + if order.vt_orderid not in self.active_orders: + self.write_error(u'{}不在未完成的委托单中{}。'.format(order.vt_orderid, self.active_orders)) + return + + # 直接更新“未完成委托单”,更新volume,retry次数 + old_order = self.active_orders[order.vt_orderid] + self.write_log(u'{} 委托信息:{}'.format(order.vt_orderid, old_order)) + old_order['traded'] = order.traded + + grid = old_order.get('grid', None) + + pre_status = old_order.get('status', Status.NOTTRADED) + if pre_status == Status.CANCELLED: + self.write_log(f'当前状态已经是{Status.CANCELLED},不做调整处理') + return + + old_order.update({'status': Status.CANCELLED}) + self.write_log(u'委托单状态:{}=>{}'.format(pre_status, old_order.get('status'))) + if grid: + if order.vt_orderid in grid.order_ids: + grid.order_ids.remove(order.vt_orderid) + if order.traded > 0: + pre_traded_volume = grid.traded_volume + grid.traded_volume = round(grid.traded_volume + order.traded, 7) + self.write_log(f'撤单中部分开仓:{order.traded} + 原已成交:{pre_traded_volume} => {grid.traded_volume}') + if len(grid.order_ids) == 0: + grid.order_status = False + if grid.traded_volume > 0: + pre_volume = grid.volume + grid.volume = grid.traded_volume + grid.traded_volume = 0 + grid.open_status = True + self.write_log(f'开仓完成,grid.volume {pre_volume} => {grid.volume}') + + self.gt.save() + self.active_orders.update({order.vt_orderid: old_order}) + + self.display_grids() + + def on_order_close_canceled(self, order: OrderData): + """委托平仓单撤销""" + self.write_log(u'委托平仓单撤销:{}'.format(order.__dict__)) + + if order.vt_orderid not in self.active_orders: + self.write_error(u'{}不在未完成的委托单中:{}。'.format(order.vt_orderid, self.active_orders)) + return + + # 直接更新“未完成委托单”,更新volume,Retry次数 + old_order = self.active_orders[order.vt_orderid] + self.write_log(u'{} 订单信息:{}'.format(order.vt_orderid, old_order)) + old_order['traded'] = order.traded + + grid = old_order.get('grid', None) + pre_status = old_order.get('status', Status.NOTTRADED) + if pre_status == Status.CANCELLED: + self.write_log(f'当前状态已经是{Status.CANCELLED},不做调整处理') + return + + old_order.update({'status': Status.CANCELLED}) + self.write_log(u'委托单状态:{}=>{}'.format(pre_status, old_order.get('status'))) + if grid: + if order.vt_orderid in grid.order_ids: + grid.order_ids.remove(order.vt_orderid) + if order.traded > 0: + pre_traded_volume = grid.traded_volume + grid.traded_volume = round(grid.traded_volume + order.traded, 7) + self.write_log(f'撤单中部分平仓成交:{order.traded} + 原已成交:{pre_traded_volume} => {grid.traded_volume}') + if len(grid.order_ids) == 0: + grid.order_status = False + if grid.traded_volume > 0: + pre_volume = grid.volume + grid.volume = round(grid.volume - grid.traded_volume, 7) + grid.traded_volume = 0 + if grid.volume <= 0: + grid.volume = 0 + grid.open_status = False + self.write_log(f'强制全部平仓完成') + else: + self.write_log(f'平仓委托中,撤单完成,部分成交,减少持仓grid.volume {pre_volume} => {grid.volume}') + + self.gt.save() + self.active_orders.update({order.vt_orderid: old_order}) + + self.display_grids() + + def on_stop_order(self, stop_order: StopOrder): + self.write_log(f'停止单触发:{stop_order.__dict__}') + + def grid_check_stop(self): + """ + 网格逐一止损/止盈检查 (根据指数价格进行止损止盈) + :return: + """ + if self.entrust != 0: + return + + if not self.trading and not self.inited: + self.write_error(u'当前不允许交易') + return + + # 多单网格逐一止损/止盈检查: + long_grids = self.gt.get_opened_grids(direction=Direction.LONG) + for g in long_grids: + if g.stop_price > 0 and g.stop_price > self.cur_price and g.open_status and not g.order_status: + # 调用平仓模块 + self.write_log(u'{} {}当前价:{} 触发多单止损线{},开仓价:{},v:{}'. + format(self.cur_datetime, + g.vt_symbol, + self.cur_price, + g.stop_price, + g.open_price, + g.volume)) + if self.grid_sell(g): + self.write_log(u'多单止盈/止损委托成功') + else: + self.write_error(u'多单止损委托失败') + + + def grid_buy(self, grid): + """ + 事务开多仓 + :return: + """ + if self.backtesting: + buy_price = self.cur_price + self.price_tick + else: + buy_price = self.cur_tick.ask_price_1 + + if self.quote_pos is None: + self.write_error(u'无法获取{}得持仓信息'.format(self.quote_symbol)) + return False + + vt_orderids = self.buy(vt_symbol=self.vt_symbol, + price=buy_price, + volume=grid.volume, + order_type=self.order_type, + order_time=self.cur_datetime, + grid=grid) + if len(vt_orderids) > 0: + self.write_log(u'创建{}事务多单,开仓价:{},数量:{},止盈价:{},止损价:{}' + .format(grid.type, grid.open_price, grid.volume, grid.close_price, grid.stop_price)) + + self.gt.save() + return True + else: + self.write_error(u'创建{}事务多单,委托失败,开仓价:{},数量:{},止盈价:{}' + .format(grid.type, grid.open_price, grid.volume, grid.close_price)) + return False + + + def grid_sell(self, grid): + """ + 事务平多单仓位 + 1.来源自止损止盈平仓 + :param 平仓网格 + :return: + """ + self.write_log(u'执行事务平多仓位:{}'.format(grid.to_json())) + + if self.asset_pos is None: + self.write_error(u'无法获取{}得持仓信息'.format(self.asset_symbol)) + return False + + # 发出委托卖出单 + if self.backtesting: + sell_price = self.cur_price - self.price_tick + else: + sell_price = self.cur_tick.bid_price_1 + + # 发出平多委托 + if grid.traded_volume > 0: + grid.volume -= grid.traded_volume + grid.volume = round(grid.volume, 7) + grid.traded_volume = 0 + + if self.asset_pos.volume <= 0: + self.write_error(u'当前{}的净持仓:{},不能平多单' + .format(self.asset_symbol, + self.asset_pos.volume)) + return False + + if self.asset_pos.volume < grid.volume: + self.write_error(u'当前{}的净持仓:{},不满足平仓目标:{}, 强制降低' + .format(self.asset_symbol, + self.asset_pos.volume, + grid.volume)) + + grid.volume = self.asset_pos.volume + + vt_orderids = self.sell( + vt_symbol=self.vt_symbol, + price=sell_price, + volume=grid.volume, + order_type=self.order_type, + order_time=self.cur_datetime, + grid=grid) + if len(vt_orderids) == 0: + if self.backtesting: + self.write_error(u'多单平仓委托失败') + else: + self.write_error(u'多单平仓委托失败') + return False + else: + self.write_log(u'多单平仓委托成功,编号:{}'.format(vt_orderids)) + + return True + + + def cancel_all_orders(self): + """ + 重载撤销所有正在进行得委托 + :return: + """ + self.write_log(u'撤销所有正在进行得委托') + self.tns_cancel_logic(dt=datetime.now(), force=True, reopen=False) + + def tns_cancel_logic(self, dt, force=False, reopen=False): + "撤单逻辑""" + if len(self.active_orders) < 1: + self.entrust = 0 + return + + canceled_ids = [] + + for vt_orderid in list(self.active_orders.keys()): + order_info = self.active_orders[vt_orderid] + order_vt_symbol = order_info.get('vt_symbol', self.vt_symbol) + order_time = order_info['order_time'] + order_volume = order_info['volume'] - order_info['traded'] + order_grid = order_info['grid'] + order_status = order_info.get('status', Status.NOTTRADED) + order_type = order_info.get('order_type', OrderType.LIMIT) + over_seconds = (dt - order_time).total_seconds() + + # 只处理未成交的限价委托单 + if order_status in [Status.SUBMITTING, Status.NOTTRADED] and order_type == OrderType.LIMIT: + if over_seconds > self.cancel_seconds or force: # 超过设置的时间还未成交 + self.write_log(u'超时{}秒未成交,取消委托单:vt_orderid:{},order:{}' + .format(over_seconds, vt_orderid, order_info)) + order_info.update({'status': Status.CANCELLING}) + self.active_orders.update({vt_orderid: order_info}) + ret = self.cancel_order(str(vt_orderid)) + if not ret: + self.write_log(u'撤单失败,更新状态为撤单成功') + order_info.update({'status': Status.CANCELLED}) + self.active_orders.update({vt_orderid: order_info}) + if order_grid and vt_orderid in order_grid.order_ids: + order_grid.order_ids.remove(vt_orderid) + + continue + + # 处理状态为‘撤销’的委托单 + elif order_status == Status.CANCELLED: + self.write_log(u'委托单{}已成功撤单,删除{}'.format(vt_orderid, order_info)) + canceled_ids.append(vt_orderid) + + if order_info['offset'] == Offset.OPEN \ + and order_grid \ + and len(order_grid.order_ids) == 0 \ + and not order_grid.open_status \ + and not order_grid.order_status \ + and order_grid.traded_volume == 0: + self.write_log(u'移除从未开仓成功的委托网格{}'.format(order_grid.__dict__)) + order_info['grid'] = None + self.gt.remove_grids_by_ids(direction=order_grid.direction, ids=[order_grid.id]) + + # 删除撤单的订单 + for vt_orderid in canceled_ids: + self.write_log(f'活动订单撤单成功,移除{vt_orderid}') + self.active_orders.pop(vt_orderid, None) + + if len(self.active_orders) == 0: + self.entrust = 0 + + def display_grids(self): + """更新网格显示信息""" + if not self.inited: + return + + self.assett_pos = self.cta_engine.get_position(vt_symbol=f'{self.asset_symbol}.{self.exchange.value}', direction=Direction.NET) + if self.asset_pos: + self.write_log( + f'账号{self.asset_symbol}持仓:{self.asset_pos.volume}, 冻结:{self.asset_pos.frozen}') + + self.quote_pos = self.cta_engine.get_position(vt_symbol=f'{self.quote_symbol}.{self.exchange.value}', direction=Direction.NET) + if self.quote_pos: + self.write_log( + f'账号{self.quote_symbol}持仓:{self.quote_pos.volume}, 冻结:{self.quote_pos.frozen}') + + dn_grids_info = "" + for grid in list(self.gt.dn_grids): + if grid.close_status and not grid.open_status and grid.order_status: + dn_grids_info += f'平多中: {grid.vt_symbol}[已平:{grid.traded_volume} => 目标:{grid.volume}, 平仓价格:{grid.close_price},委托时间:{grid.order_time}]\n' + if len(grid.order_ids) > 0: + dn_grids_info += f'委托单号:{grid.order_ids}' + continue + + if grid.open_status and not grid.order_status and not grid.close_status: + dn_grids_info += f'持多中: {grid.vt_symbol}[数量:{grid.volume}, 开仓价格:{grid.open_price},开仓时间:{grid.open_time}]\n' + continue + + if not grid.open_status and grid.order_status and not grid.close_status: + dn_grids_info += f'开多中: {grid.vt_symbol}[已开:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}]\n' + if len(grid.order_ids) > 0: + dn_grids_info += f'委托单号:{grid.order_ids}' + + if len(dn_grids_info) > 0: + self.write_log(dn_grids_info) + + def display_tns(self): + """显示事务的过程记录=》 log""" + if not self.inited: + return + self.write_log(u'{} 当前 {}价格:{}, 委托状态:{}' + .format(self.cur_datetime, self.vt_symbol, self.cur_price, self.entrust)) + + if len(self.active_orders) > 0: + self.write_log('当前活动订单:{}'.format(json.dumps(self.active_orders, indent=2, ensure_ascii=False))) if hasattr(self, 'policy'): policy = getattr(self, 'policy') diff --git a/vnpy/app/cta_stock/engine.py b/vnpy/app/cta_stock/engine.py index 6b262003..962a9a8d 100644 --- a/vnpy/app/cta_stock/engine.py +++ b/vnpy/app/cta_stock/engine.py @@ -184,6 +184,7 @@ class CtaEngine(BaseEngine): register the funcs to main_engine :return: """ + self.main_engine.get_name = self.get_name self.main_engine.get_strategy_status = self.get_strategy_status self.main_engine.get_strategy_pos = self.get_strategy_pos self.main_engine.compare_pos = self.compare_pos @@ -195,6 +196,7 @@ class CtaEngine(BaseEngine): self.main_engine.reload_strategy = self.reload_strategy self.main_engine.save_strategy_data = self.save_strategy_data self.main_engine.save_strategy_snapshot = self.save_strategy_snapshot + self.main_engine.clean_strategy_cache = self.clean_strategy_cache # 注册到远程服务调用 if self.main_engine.rpc_service: @@ -209,6 +211,7 @@ class CtaEngine(BaseEngine): self.main_engine.rpc_service.register(self.main_engine.reload_strategy) self.main_engine.rpc_service.register(self.main_engine.save_strategy_data) self.main_engine.rpc_service.register(self.main_engine.save_strategy_snapshot) + self.main_engine.rpc_service.register(self.main_engine.clean_strategy_cache) def process_timer_event(self, event: Event): """ 处理定时器事件""" @@ -1246,6 +1249,15 @@ class CtaEngine(BaseEngine): self.write_error(u'保存策略{}数据异常:'.format(strategy_name, str(ex))) self.write_error(traceback.format_exc()) + def clean_strategy_cache(self, strategy_name): + """清除策略K线缓存文件""" + cache_file = os.path.abspath(os.path.join(self.get_data_path(), f'{strategy_name}_klines.pkb2')) + if os.path.exists(cache_file): + self.write_log(f'移除策略缓存文件:{cache_file}') + os.remove(cache_file) + else: + self.write_log(f'策略缓存文件不存在:{cache_file}') + def get_strategy_snapshot(self, strategy_name): """实时获取策略的K线切片(比较耗性能)""" strategy = self.strategies.get(strategy_name, None) @@ -1380,7 +1392,8 @@ class CtaEngine(BaseEngine): self.class_module_map[class_name] = module_name return True except: # noqa - msg = f"策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}" + account = self.engine_config.get('accountid', '') + msg = f"cta_stock:{account}策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}" self.write_log(msg=msg, level=logging.CRITICAL) return False @@ -1615,8 +1628,8 @@ class CtaEngine(BaseEngine): pos_compare_result += '\n{}: '.format(vt_symbol) # 多单不一致 if round(symbol_pos['策略多单'], 7) != round(symbol_pos['账号多单'], 7): - msg = '{}多单[账号({}), 策略{},共({})], ' \ - .format(vt_symbol, + msg = '{}[账号({}), 策略{},共({})], ' \ + .format(self.get_name(vt_symbol), symbol_pos['账号多单'], symbol_pos['多单策略'], symbol_pos['策略多单']) diff --git a/vnpy/app/cta_stock/template.py b/vnpy/app/cta_stock/template.py index c51e272b..9b824100 100644 --- a/vnpy/app/cta_stock/template.py +++ b/vnpy/app/cta_stock/template.py @@ -574,6 +574,7 @@ class CtaStockTemplate(CtaTemplate): :return: """ self.write_log(u'init_position(),初始化持仓') + subscribe_symbols = [] if len(self.gt.dn_grids) <= 0: # 加载已开仓的多数据,网格JSON @@ -615,6 +616,13 @@ class CtaStockTemplate(CtaTemplate): self.positions.update({lg.vt_symbol: pos}) + if len(lg.vt_symbol) > 0 and lg.vt_symbol not in self.vt_symbols and lg.vt_symbol not in subscribe_symbols: + subscribe_symbols.append(lg.vt_symbol) + + for vt_symbol in subscribe_symbols: + self.write_log(f'{vt_symbol}不在配置清单中,添加行情订阅') + self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=vt_symbol) + self.gt.save() self.display_grids() @@ -1301,6 +1309,18 @@ class CtaStockTemplate(CtaTemplate): """显示事务的过程记录=》 log""" if not self.inited: return + + if not self.backtesting: + for vt_symbol in self.vt_symbols: + name = self.cta_engine.get_name(vt_symbol) + price = self.cta_engine.get_price(vt_symbol) + self.write_log('%-11s'%vt_symbol + '[%-12s'%name + f'] 当前价格: {price}') + + self.write_log(f'当前委托状态:{self.entrust}') + + if len(self.active_orders) > 0: + self.write_log('当前活动订单:{}'.format(self.active_orders)) + if hasattr(self, 'policy'): policy = getattr(self, 'policy') op = getattr(policy, 'to_json', None) diff --git a/vnpy/app/cta_strategy_pro/engine.py b/vnpy/app/cta_strategy_pro/engine.py index 7da06283..81c7c97f 100644 --- a/vnpy/app/cta_strategy_pro/engine.py +++ b/vnpy/app/cta_strategy_pro/engine.py @@ -189,6 +189,7 @@ class CtaEngine(BaseEngine): register the funcs to main_engine :return: """ + self.main_engine.get_name = self.get_name self.main_engine.get_strategy_status = self.get_strategy_status self.main_engine.get_strategy_pos = self.get_strategy_pos self.main_engine.compare_pos = self.compare_pos @@ -200,6 +201,7 @@ class CtaEngine(BaseEngine): self.main_engine.reload_strategy = self.reload_strategy self.main_engine.save_strategy_data = self.save_strategy_data self.main_engine.save_strategy_snapshot = self.save_strategy_snapshot + self.main_engine.clean_strategy_cache = self.clean_strategy_cache # 注册到远程服务调用 if self.main_engine.rpc_service: @@ -214,6 +216,7 @@ class CtaEngine(BaseEngine): self.main_engine.rpc_service.register(self.main_engine.reload_strategy) self.main_engine.rpc_service.register(self.main_engine.save_strategy_data) self.main_engine.rpc_service.register(self.main_engine.save_strategy_snapshot) + self.main_engine.rpc_service.register(self.main_engine.clean_strategy_cache) def process_timer_event(self, event: Event): """ 处理定时器事件""" @@ -794,6 +797,15 @@ class CtaEngine(BaseEngine): return True + @lru_cache() + def get_name(self, vt_symbol: str): + """查询合约的name""" + contract = self.main_engine.get_contract(vt_symbol) + if contract is None: + self.write_error(f'查询不到{vt_symbol}合约信息') + return vt_symbol + return contract.name + @lru_cache() def get_size(self, vt_symbol: str): """查询合约的size""" @@ -1258,6 +1270,15 @@ class CtaEngine(BaseEngine): self.write_error(u'保存策略{}数据异常:'.format(strategy_name, str(ex))) self.write_error(traceback.format_exc()) + def clean_strategy_cache(self, strategy_name): + """清除策略K线缓存文件""" + cache_file = os.path.abspath(os.path.join(self.get_data_path(), f'{strategy_name}_klines.pkb2')) + if os.path.exists(cache_file): + self.write_log(f'移除策略缓存文件:{cache_file}') + os.remove(cache_file) + else: + self.write_log(f'策略缓存文件不存在:{cache_file}') + def get_strategy_snapshot(self, strategy_name): """实时获取策略的K线切片(比较耗性能)""" strategy = self.strategies.get(strategy_name, None) diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py index 605d5c3d..71289af8 100644 --- a/vnpy/app/cta_strategy_pro/template.py +++ b/vnpy/app/cta_strategy_pro/template.py @@ -7,7 +7,7 @@ import traceback import zlib from abc import ABC -from copy import copy +from copy import copy,deepcopy from typing import Any, Callable from logging import INFO, ERROR from datetime import datetime @@ -913,7 +913,8 @@ class CtaProTemplate(CtaTemplate): return none_mi_price = max(none_mi_tick.last_price, none_mi_tick.bid_price_1) - grid = copy(none_mi_grid) + grid = deepcopy(none_mi_grid) + grid.id = str(uuid.uuid1()) # 委托卖出非主力合约 vt_orderids = self.sell(price=none_mi_price, @@ -925,7 +926,6 @@ class CtaProTemplate(CtaTemplate): self.write_log(f'切换合约,委托卖出非主力合约{none_mi_symbol}持仓:{none_mi_grid.volume}') # 添加买入主力合约 - grid.id = str(uuid.uuid1()) grid.snapshot.update({'mi_symbol': self.vt_symbol, 'open_price': self.cur_mi_price}) self.gt.dn_grids.append(grid) @@ -982,7 +982,8 @@ class CtaProTemplate(CtaTemplate): return none_mi_price = max(none_mi_tick.last_price, none_mi_tick.bid_price_1) - grid = copy(none_mi_grid) + grid = deepcopy(none_mi_grid) + grid.id = str(uuid.uuid1()) # 委托平空非主力合约 vt_orderids = self.cover(price=none_mi_price, volume=none_mi_grid.volume, @@ -1128,7 +1129,7 @@ class CtaProFutureTemplate(CtaProTemplate): def on_trade(self, trade: TradeData): """交易更新""" - self.write_log(u'{},交易更新:{},当前持仓:{} ' + self.write_log(u'{},交易更新事件:{},当前持仓:{} ' .format(self.cur_datetime, trade.__dict__, self.position.pos)) @@ -1175,7 +1176,7 @@ class CtaProFutureTemplate(CtaProTemplate): if order_info: volume = order_info.get('volume') if volume != order.volume: - self.write_log(f'调整{order.vt_orderid} volume:{volume}=>{order.volume}') + self.write_log(f'修正order被拆单得情况,调整{order.vt_orderid} volume:{volume}=>{order.volume}') order_info.update({'volume': order.volume}) def on_order(self, order: OrderData): @@ -1222,7 +1223,7 @@ class CtaProFutureTemplate(CtaProTemplate): :param order: :return: """ - self.write_log(u'{},委托单:{}全部完成'.format(order.time, order.vt_orderid)) + self.write_log(u'委托单全部完成:{}'.format(order.__dict__)) order_info = self.active_orders[order.vt_orderid] # 通过vt_orderid,找到对应的网格 @@ -1265,7 +1266,8 @@ class CtaProFutureTemplate(CtaProTemplate): self.write_log(f'剩余委托单号:{grid.order_ids}') self.gt.save() - + else: + self.write_error(f'on_trade找不到对应grid') # 在策略得活动订单中,移除 self.active_orders.pop(order.vt_orderid, None) @@ -1380,7 +1382,6 @@ class CtaProFutureTemplate(CtaProTemplate): self.gt.save() - elif old_order['direction'] == Direction.SHORT and order_type == OrderType.FAK: # 删除旧的委托记录 self.write_log(u'移除旧的委托记录:{}'.format(order.vt_orderid)) @@ -1771,12 +1772,17 @@ class CtaProFutureTemplate(CtaProTemplate): if self.activate_today_lock: self.write_log(u'昨仓多单:{},没有今仓,满足条件,直接平昨仓'.format(grid_pos.long_yd)) - sell_price = self.cta_engine.get_price(sell_symbol) if sell_price is None: self.write_error(f'暂时不能获取{sell_symbol}价格,不能平仓') return False + # 实盘使用对价 + if not self.backtesting: + sell_tick = self.cta_engine.get_tick(sell_symbol) + if sell_tick and 0 < sell_tick.bid_price_1 < sell_price: + sell_price = sell_tick.bid_price_1 + # 发出平多委托 if grid.traded_volume > 0: grid.volume -= grid.traded_volume @@ -1871,6 +1877,12 @@ class CtaProFutureTemplate(CtaProTemplate): self.write_error(f'暂时没有{cover_symbol}行情,不能执行平仓') return False + # 实盘使用对价 + if not self.backtesting: + cover_tick = self.cta_engine.get_tick(cover_symbol) + if cover_tick and 0 < cover_price < cover_tick.ask_price_1 : + cover_price = cover_tick.ask_price_1 + # 发出cover委托 if grid.traded_volume > 0: grid.volume -= grid.traded_volume diff --git a/vnpy/component/base.py b/vnpy/component/base.py index 131db431..1e20ac5e 100644 --- a/vnpy/component/base.py +++ b/vnpy/component/base.py @@ -35,7 +35,7 @@ NIGHT_MARKET_ZZ = {'TA': 2, 'JR': 1, 'OI': 0, 'RO': 1, 'PM': 1, 'WH': 1, 'CF': 5 # 大商所夜盘,9:00~10:15, 10:30~11:30, 13:30~15:00, 21:00 ~23:00 NIGHT_MARKET_DL = {'V': 5, 'L': 5, 'BB': 0.05, 'I': 0.5, 'FB': 0.05, 'C': 1, 'PP': 1, 'A': 1, 'B': 1, 'M': 1, 'Y': 2, 'P': 2, - 'JM': 0.5, 'J': 0.5, 'EG': 1} + 'JM': 0.5, 'J': 0.5, 'EG': 1, 'EB': 1} # 中金日盘,9:15 ~11:30, 13:00~15:15 MARKET_ZJ = {'IC': 0.2, 'IF': 0.2, 'IH': 0.2, 'T': 0.005, 'TF': 0.005, 'TS': 0.005} diff --git a/vnpy/gateway/binance/binance_gateway.py b/vnpy/gateway/binance/binance_gateway.py index d8a93e67..600ec66f 100644 --- a/vnpy/gateway/binance/binance_gateway.py +++ b/vnpy/gateway/binance/binance_gateway.py @@ -466,8 +466,8 @@ class BinanceRestApi(RestClient): frozen=0, gateway_name=self.gateway_name ) - self.gateway.on_account(account) + put_event_account = True # ==> position event for account_data in data["balances"]: pos = PositionData( @@ -488,7 +488,27 @@ class BinanceRestApi(RestClient): self.positions.update({pos.symbol: pos}) self.gateway.on_position(copy(pos)) - self.gateway.write_log("账户资金查询成功") + if pos.symbol == 'USDT': + pos.cur_price = 1 + account.balance += pos.volume + else: + price = self.gateway.prices.get(f'{pos.symbol}USDT.{pos.exchange.value}', None) + + if price is None: + req = SubscribeRequest( + symbol=f'{pos.symbol}USDT', + exchange=pos.exchange + ) + self.gateway.subscribe(req) + put_event_account = False + else: + pos.cur_price = price + account.balance += pos.volume * price + + + if put_event_account: + self.gateway.on_account(account) + #self.gateway.write_log("账户资金查询成功") def on_query_order(self, data, request): """""" diff --git a/vnpy/gateway/ib/ib_gateway.py b/vnpy/gateway/ib/ib_gateway.py index b9848093..b0f37b90 100644 --- a/vnpy/gateway/ib/ib_gateway.py +++ b/vnpy/gateway/ib/ib_gateway.py @@ -330,7 +330,13 @@ class IbApi(EWrapper): if exchange is Exchange.IDEALPRO: tick.last_price = (tick.bid_price_1 + tick.ask_price_1) / 2 tick.datetime = datetime.now() - self.gateway.on_tick(copy(tick)) + # 有些错误数据过来,例如ask_price1 = -1.0 + if tick.ask_price_1 < tick.last_price: + return + if tick.bid_price_1 < min(tick.last_price - 10 * contract.pricetick, tick.last_price * 0.8): + return + if tick.last_price != 0: + self.gateway.on_tick(copy(tick)) def tickSize( self, reqId: TickerId, tickType: TickType, size: int @@ -346,8 +352,10 @@ class IbApi(EWrapper): tick = self.ticks[reqId] name = TICKFIELD_IB2VT[tickType] setattr(tick, name, size) - - self.gateway.on_tick(copy(tick)) + if tick.ask_volume_1 == 0 or tick.bid_volume_1 == 0: + return + if tick.last_price != 0: + self.gateway.on_tick(copy(tick)) def tickString( self, reqId: TickerId, tickType: TickType, value: str @@ -362,8 +370,12 @@ class IbApi(EWrapper): tick = self.ticks[reqId] tick.datetime = datetime.fromtimestamp(int(value)) - - self.gateway.on_tick(copy(tick)) + if tick.ask_price_1 < tick.last_price: + return + if tick.bid_price_1 < tick.last_price * 0.8: + return + if tick.last_price != 0: + self.gateway.on_tick(copy(tick)) def orderStatus( # pylint: disable=invalid-name self, @@ -381,6 +393,7 @@ class IbApi(EWrapper): ): """ Callback of order status update. + 委托单状态变化 """ super().orderStatus( orderId, @@ -398,6 +411,10 @@ class IbApi(EWrapper): orderid = str(orderId) order = self.orders.get(orderid, None) + if order is None: + self.gateway.write_error(f'无法获取{orderid}在本地的缓存委托单') + return + order.traded = filled # To filter PendingCancel status @@ -422,8 +439,11 @@ class IbApi(EWrapper): ) orderid = str(orderId) + # ==> 生成 xxxx-HKD-STK等格式的合约名称 + symbol = generate_symbol(ib_contract) + order = OrderData( - symbol=ib_contract.conId, + symbol=symbol, exchange=EXCHANGE_IB2VT.get( ib_contract.exchange, ib_contract.exchange), type=ORDERTYPE_IB2VT[ib_order.orderType], @@ -475,6 +495,7 @@ class IbApi(EWrapper): ): """ Callback of position update. + 持仓更新 """ super().updatePortfolio( contract, @@ -527,14 +548,17 @@ class IbApi(EWrapper): def contractDetails(self, reqId: int, contractDetails: ContractDetails): # pylint: disable=invalid-name """ Callback of contract data update. + 合约数据更新 """ super().contractDetails(reqId, contractDetails) # Generate symbol from ib contract details ib_contract = contractDetails.contract + # 合约乘数 if not ib_contract.multiplier: ib_contract.multiplier = 1 + # ==> 生成 xxxx-HKD-STK等格式的合约名称 symbol = generate_symbol(ib_contract) # Generate contract @@ -562,19 +586,22 @@ class IbApi(EWrapper): ): # pylint: disable=invalid-name """ Callback of trade data update. + 交易数据更新 """ super().execDetails(reqId, contract, execution) # today_date = datetime.now().strftime("%Y%m%d") + dt = datetime.strptime(execution.time, "%Y%m%d %H:%M:%S") trade = TradeData( - symbol=contract.conId, + symbol=generate_symbol(contract), exchange=EXCHANGE_IB2VT.get(contract.exchange, contract.exchange), orderid=str(execution.orderId), tradeid=str(execution.execId), direction=DIRECTION_IB2VT[execution.side], price=execution.price, volume=execution.shares, - time=datetime.strptime(execution.time, "%Y%m%d %H:%M:%S"), + datetime=dt, + time=dt.strftime('%H:%M:%S'), gateway_name=self.gateway_name, ) @@ -583,6 +610,7 @@ class IbApi(EWrapper): def managedAccounts(self, accountsList: str): """ Callback of all sub accountid. + 所有子账号信息更新 """ super().managedAccounts(accountsList) @@ -596,6 +624,7 @@ class IbApi(EWrapper): def historicalData(self, reqId: int, ib_bar: IbBarData): """ Callback of history data update. + 历史行情 """ dt = datetime.strptime(ib_bar.date, "%Y%m%d %H:%M:%S") @@ -617,6 +646,7 @@ class IbApi(EWrapper): def historicalDataEnd(self, reqId: int, start: str, end: str): """ Callback of history data finished. + 行情数据推送结束 """ self.history_condition.acquire() self.history_condition.notify() @@ -625,6 +655,7 @@ class IbApi(EWrapper): def connect(self, host: str, port: int, clientid: int, account: str): """ Connect to TWS. + 连接本地TWS """ if self.status: self.gateway.write_log(f'已连接,不再重连') @@ -650,6 +681,7 @@ class IbApi(EWrapper): def subscribe(self, req: SubscribeRequest): """ Subscribe tick data update. + 订阅行情 """ if not self.status: return @@ -659,6 +691,7 @@ class IbApi(EWrapper): return # Extract ib contract detail + # vn symbol => ib contract ib_contract = generate_ib_contract(req.symbol, req.exchange) if not ib_contract: self.gateway.write_log("代码解析失败,请检查格式是否正确") @@ -684,6 +717,7 @@ class IbApi(EWrapper): def send_order(self, req: OrderRequest): """ Send a new order. + 发送委托 """ if not self.status: return "" @@ -698,6 +732,7 @@ class IbApi(EWrapper): self.orderid += 1 + # vn symbol -> ib contract ib_contract = generate_ib_contract(req.symbol, req.exchange) if not ib_contract: return "" @@ -719,12 +754,14 @@ class IbApi(EWrapper): self.client.reqIds(1) order = req.create_order_data(str(self.orderid), self.gateway_name) + order.datetime = datetime.now() self.gateway.on_order(order) return order.vt_orderid def cancel_order(self, req: CancelRequest): """ Cancel an existing order. + 撤单 """ if not self.status: return @@ -738,7 +775,7 @@ class IbApi(EWrapper): self.reqid += 1 - # 转换为ib的合约 + # vn symbol => ib的合约 ib_contract = generate_ib_contract(req.symbol, req.exchange) if req.end: @@ -782,15 +819,21 @@ class IbApi(EWrapper): return history def load_contract_data(self): - """""" - f = shelve.open(self.data_filepath) - self.contracts = f.get("contracts", {}) - f.close() + """ + 加载本地缓存合约数据 + :return: + """ + try: + f = shelve.open(self.data_filepath) + self.contracts = f.get("contracts", {}) + f.close() - for contract in self.contracts.values(): - self.gateway.on_contract(contract) + for contract in self.contracts.values(): + self.gateway.on_contract(contract) - self.gateway.write_log("本地缓存合约信息加载成功") + self.gateway.write_log("本地缓存合约信息加载成功") + except Exception as ex: + self.gateway.write_error(f'本地缓存合约信息加载失败:{str(ex)}') def save_contract_data(self): """""" diff --git a/vnpy/gateway/xtp/xtp_gateway.py b/vnpy/gateway/xtp/xtp_gateway.py index 7ef08d22..7f841e43 100644 --- a/vnpy/gateway/xtp/xtp_gateway.py +++ b/vnpy/gateway/xtp/xtp_gateway.py @@ -308,7 +308,7 @@ class XtpMdApi(MdApi): tick.ask_volume_1, tick.ask_volume_2, tick.ask_volume_3, tick.ask_volume_4, tick.ask_volume_5 = data["ask_qty"][0:5] tick.name = get_vt_symbol_name(tick.vt_symbol) - self.gateway.prices.update({tick.vt_symbol: tick.last_price}) + #self.gateway.prices.update({tick.vt_symbol: tick.last_price}) self.gateway.on_tick(tick) def onSubOrderBook(self, data: dict, error: dict, last: bool) -> None: diff --git a/vnpy/trader/converter.py b/vnpy/trader/converter.py index 71da7e00..4b0181ee 100644 --- a/vnpy/trader/converter.py +++ b/vnpy/trader/converter.py @@ -29,7 +29,8 @@ class OffsetConverter: # return holding = self.get_position_holding(position.vt_symbol, position.gateway_name) - holding.update_position(position) + if holding: + holding.update_position(position) def update_trade(self, trade: TradeData) -> None: """""" @@ -66,6 +67,8 @@ class OffsetConverter: holding = self.holdings.get(k, None) if not holding: contract = self.main_engine.get_contract(vt_symbol) + if contract is None: + return None holding = PositionHolding(contract) self.holdings[k] = holding return holding diff --git a/vnpy/trader/engine.py b/vnpy/trader/engine.py index 9ec9c6b1..90c3dd11 100644 --- a/vnpy/trader/engine.py +++ b/vnpy/trader/engine.py @@ -463,6 +463,7 @@ class OmsEngine(BaseEngine): # 更新自定义合约 custom_contracts = self.get_all_custom_contracts() + self.get_all_custom_contracts(rtn_setting=True) for contract in custom_contracts.values(): # 更新合约缓存 @@ -481,7 +482,7 @@ class OmsEngine(BaseEngine): spd_mapping_list = self.symbol_spd_maping.get(symbol, []) # 更新映射 symbol => spd_symbol - if contract.symbol not in spd_mapping_list: + if (not contract.symbol.endswith('.SPD')) and contract.symbol not in spd_mapping_list: spd_mapping_list.append(contract.symbol) self.symbol_spd_maping.update({symbol: spd_mapping_list}) @@ -556,6 +557,9 @@ class OmsEngine(BaseEngine): position = event.data self.positions[position.vt_positionid] = position + if position.exchange != Exchange.SPD: + self.create_spd_position_event(position.symbol, position.direction) + def reverse_direction(self, direction): """返回反向持仓""" if direction == Direction.LONG: @@ -586,28 +590,34 @@ class OmsEngine(BaseEngine): leg2_ratio = spd_setting.get('leg2_ratio', 1) # 找出leg1,leg2的持仓,并判断出spd的方向 + spd_pos = None if leg1_symbol == symbol: k1 = f"{leg1_contract.gateway_name}.{leg1_contract.vt_symbol}.{direction.value}" leg1_pos = self.positions.get(k1) k2 = f"{leg2_contract.gateway_name}.{leg2_contract.vt_symbol}.{self.reverse_direction(direction).value}" leg2_pos = self.positions.get(k2) spd_direction = direction + k3 = f"{spd_contract.gateway_name}.{spd_symbol}.{Exchange.SPD.value}.{spd_direction.value}" + spd_pos = self.positions.get(k3) elif leg2_symbol == symbol: k1 = f"{leg1_contract.gateway_name}.{leg1_contract.vt_symbol}.{self.reverse_direction(direction).value}" leg1_pos = self.positions.get(k1) k2 = f"{leg2_contract.gateway_name}.{leg2_contract.vt_symbol}.{direction.value}" leg2_pos = self.positions.get(k2) spd_direction = self.reverse_direction(direction) + k3 = f"{spd_contract.gateway_name}.{spd_symbol}.{Exchange.SPD.value}.{spd_direction.value}" + spd_pos = self.positions.get(k3) else: continue - if leg1_pos is None or leg2_pos is None or leg1_pos.volume ==0 or leg2_pos.volume == 0: + if leg1_pos is None or leg2_pos is None: # or leg1_pos.volume ==0 or leg2_pos.volume == 0: continue # 根据leg1/leg2的volume ratio,计算出最小spd_volume spd_volume = min(int(leg1_pos.volume/leg1_ratio), int(leg2_pos.volume/leg2_ratio)) - if spd_volume <= 0: + if spd_volume <= 0 and spd_pos is None: continue + if spd_setting.get('is_ratio', False) and leg2_pos.price > 0: spd_price = 100 * (leg2_pos.price * leg1_ratio) / (leg2_pos.price * leg2_ratio) elif spd_setting.get('is_spread', False): @@ -617,6 +627,7 @@ class OmsEngine(BaseEngine): spd_pos = PositionData( gateway_name=spd_contract.gateway_name, + accountid=leg1_pos.accountid, symbol=spd_symbol, exchange=Exchange.SPD, direction=spd_direction, diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index 5fa796f2..141161cd 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -90,6 +90,8 @@ class BaseGateway(ABC): self.gateway_name: str = gateway_name self.logger = None + self.accountid = "" + self.create_logger() # 所有订阅on_bar的都会添加 @@ -124,6 +126,7 @@ class BaseGateway(ABC): Tick event push. Tick event of a specific vt_symbol is also pushed. """ + self.prices.update({tick.vt_symbol: tick.last_price}) self.on_event(EVENT_TICK, tick) # self.on_event(EVENT_TICK + tick.vt_symbol, tick) diff --git a/vnpy/trader/ui/kline/kline.py b/vnpy/trader/ui/kline/kline.py index 260f46f4..c6a1d4d0 100644 --- a/vnpy/trader/ui/kline/kline.py +++ b/vnpy/trader/ui/kline/kline.py @@ -434,7 +434,6 @@ class KLineWidget(KeyWraper): # 交易事务有关的线段 self.list_trans = [] # 交易事务( {'start_time','end_time','tns_type','start_price','end_price','start_x','end_x','completed'} - self.list_trans_lines = [] # 交易记录相关的箭头 self.list_trade_arrow = [] # 交易图标 list @@ -446,6 +445,9 @@ class KLineWidget(KeyWraper): self.x_t_markup_map = OrderedDict() # x轴与标记的映射 self.t_markup_dict = OrderedDict() # t 时间的标记 + # 缠论相关的线段 + self.list_bi = [] + # 所有K线上指标 self.main_color_pool = deque(['red', 'green', 'yellow', 'white']) self.main_indicator_data = {} # 主图指标数据(字典,key是指标,value是list) @@ -935,7 +937,6 @@ class KLineWidget(KeyWraper): self.t_trade_dict = OrderedDict() self.list_trans = [] - self.list_trans_lines = [] self.list_markup = [] self.x_t_markup_map = OrderedDict() @@ -1115,7 +1116,7 @@ class KLineWidget(KeyWraper): if direction == Direction.LONG: if offset == Offset.OPEN: # buy - arrow = pg.ArrowItem(pos=(x, price), angle=135, brush=None, pen={'color': 'r', 'width': 1}, + arrow = pg.ArrowItem(pos=(x, price), angle=135, brush=None, pen={'color': 'y', 'width': 2}, tipAngle=30, baseAngle=20, tailLen=10, tailWidth=2) # d = { # "pos": (x, price), @@ -1129,17 +1130,17 @@ class KLineWidget(KeyWraper): # arrow.setData([d]) else: # cover - arrow = pg.ArrowItem(pos=(x, price), angle=0, brush=(255, 0, 0), pen=None, headLen=20, headWidth=20, + arrow = pg.ArrowItem(pos=(x, price), angle=0, brush='y', pen=None, headLen=20, headWidth=20, tailLen=10, tailWidth=2) # 空信号 elif direction == Direction.SHORT: if offset == Offset.CLOSE: # sell - arrow = pg.ArrowItem(pos=(x, price), angle=0, brush=(0, 255, 0), pen=None, headLen=20, headWidth=20, + arrow = pg.ArrowItem(pos=(x, price), angle=0, brush='g', pen=None, headLen=20, headWidth=20, tailLen=10, tailWidth=2) else: # short - arrow = pg.ArrowItem(pos=(x, price), angle=-135, brush=None, pen={'color': 'g', 'width': 1}, + arrow = pg.ArrowItem(pos=(x, price), angle=-135, brush=None, pen={'color': 'g', 'width': 2}, tipAngle=30, baseAngle=20, tailLen=10, tailWidth=2) if arrow: self.pi_main.addItem(arrow) @@ -1374,6 +1375,84 @@ class KLineWidget(KeyWraper): self.add_markup(t_value=t_value, price=price, txt=markup_text) + + def add_bi(self, df_bi, color='b', style= None): + """ + 添加缠论_笔(段)_画线 + # direction,(1/-1),start, end, high, low + # 笔: color = 'y', style: QtCore.Qt.DashLine + # 段: color = 'b', + :return: + """ + if len(self.datas) == 0 or len(df_bi) == 0: + print(u'No datas exist', file=sys.stderr) + return + + for index, row in df_bi.iterrows(): + + start_time = row['start'] + if not isinstance(start_time, datetime) and isinstance(start_time, str): + start_time = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S') + + end_time = row['end'] + if not isinstance(end_time, datetime) and isinstance(end_time, str): + end_time = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S') + + start_x = self.axisTime.get_x_by_time(start_time) + end_x = self.axisTime.get_x_by_time(end_time) + + if int(row['direction']) == 1: + pos = np.array([[start_x, row['low']], [end_x, row['high']]]) + elif int(row['direction']) == -1: + pos = np.array([[start_x, row['high']], [end_x, row['low']]]) + else: + continue + + if style: + pen = pg.mkPen({'color': color, 'width': 1, 'style': QtCore.Qt.DashLine}) + else: + pen = pg.mkPen({'color': color, 'width': 1}) + + bi = pg.GraphItem(pos=pos, adj=np.array([[0, 1]]), pen=pen) + self.pi_main.addItem(bi) + + + def add_zs(self, df_zs, color='y'): + """ + 添加缠论中枢_画线 + # direction,(1/-1),start, end, high, low + # 笔中枢: color ='y' + # 段中枢: color = 'b' + :return: + """ + if len(self.datas) == 0 or len(df_zs) == 0: + print(u'No datas exist', file=sys.stderr) + return + + for index,row in df_zs.iterrows(): + + start_time = row['start'] + if not isinstance(start_time, datetime) and isinstance(start_time, str): + start_time = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S') + + end_time = row['end'] + if not isinstance(end_time, datetime) and isinstance(end_time, str): + end_time = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S') + + start_x = self.axisTime.get_x_by_time(start_time) + end_x = self.axisTime.get_x_by_time(end_time) + + pos_top = np.array([[start_x, row['high']], [end_x, row['high']]]) + pos_buttom = np.array([[start_x, row['low']], [end_x, row['low']]]) + pos_left = np.array([[start_x, row['high']], [start_x, row['low']]]) + pos_right = np.array([[end_x, row['high']], [end_x, row['low']]]) + + pen = pg.mkPen({'color': color, 'width': 1}) + for pos in [pos_top, pos_buttom, pos_left, pos_right]: + line = pg.GraphItem(pos=pos, adj=np.array([[0, 1]]), pen=pen) + self.pi_main.addItem(line) + + def loadData(self, df_datas, main_indicators=[], sub_indicators=[]): """ 载入pandas.DataFrame数据 @@ -1563,6 +1642,34 @@ class GridKline(QtWidgets.QWidget): include_list=kline_setting.get('dist_include_list', []), exclude_list=['buy', 'short', 'sell', 'cover']) + # 笔 + bi_file = kline_setting.get('bi_file', None) + if bi_file and os.path.exists(bi_file): + print(f'loading {bi_file}') + df_bi = pd.read_csv(bi_file) + self.kline_dict[kline_name].add_bi(df_bi, color='y', style= QtCore.Qt.DashLine) + + # 段 + duan_file = kline_setting.get('duan_file', None) + if duan_file and os.path.exists(duan_file): + print(f'loading {duan_file}') + df_duan = pd.read_csv(duan_file) + self.kline_dict[kline_name].add_bi(df_duan, color='b') + + # 笔中枢 + bi_zs_file = kline_setting.get('bi_zs_file', None) + if bi_zs_file and os.path.exists(bi_zs_file): + print(f'loading {bi_zs_file}') + df_bi_zs = pd.read_csv(bi_zs_file) + self.kline_dict[kline_name].add_zs(df_bi_zs, color='y') + + # 段中枢 + duan_zs_file = kline_setting.get('duan_zs_file', None) + if duan_zs_file and os.path.exists(duan_zs_file): + print(f'loading {duan_zs_file}') + df_duan_zs = pd.read_csv(duan_zs_file) + self.kline_dict[kline_name].add_zs(df_duan_zs, color='b') + except Exception as ex: traceback.print_exc() QtWidgets.QMessageBox.warning(self, 'Exception', u'Load data Exception',