From d5599bc530f785e5111d0147ffdd0230fbb5719c Mon Sep 17 00:00:00 2001 From: msincenselee Date: Thu, 16 Apr 2020 15:01:00 +0800 Subject: [PATCH] =?UTF-8?q?[=E5=A2=9E=E5=BC=BA]=20=E7=AD=96=E7=95=A5?= =?UTF-8?q?=E5=88=87=E7=89=87=3D=E3=80=8B=E5=90=8C=E6=AD=A5=3D=E3=80=8Bmon?= =?UTF-8?q?godb?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/app/account_recorder/README.md | 1 + vnpy/app/account_recorder/engine.py | 175 ++++++++++++++-------- vnpy/app/cta_crypto/engine.py | 16 +- vnpy/app/cta_crypto/template.py | 22 ++- vnpy/app/cta_crypto/ui/widget.py | 10 +- vnpy/app/cta_strategy_pro/engine.py | 28 +++- vnpy/component/cta_line_bar.py | 50 ++++++- vnpy/gateway/binancef/binancef_gateway.py | 49 +++++- vnpy/gateway/ctp/ctp_gateway.py | 5 + vnpy/trader/engine.py | 2 +- vnpy/trader/event.py | 1 + vnpy/trader/object.py | 12 +- vnpy/trader/ui/widget.py | 5 +- 13 files changed, 283 insertions(+), 93 deletions(-) diff --git a/vnpy/app/account_recorder/README.md b/vnpy/app/account_recorder/README.md index d0cbddda..ef5254a2 100644 --- a/vnpy/app/account_recorder/README.md +++ b/vnpy/app/account_recorder/README.md @@ -8,6 +8,7 @@ 6. 监听股票接口的 EVENT_HISTORY_ORDER 事件, 数据 => history_trades 7. 监听股票接口的 EVENT_FUNDS_FLOW 事件, 数据 => funds_flow 8. 监听 EVENT_STRATEGY_POS事件,数据 =》 mongodb Account.today_strategy_pos + 9. 监听 EVENT_STRATEGY_SNAPSHOT事件, 数据=》 mongodb Account.strategy_snapshot 配置文件 ar_setting.json diff --git a/vnpy/app/account_recorder/engine.py b/vnpy/app/account_recorder/engine.py index 9c396858..b125c412 100644 --- a/vnpy/app/account_recorder/engine.py +++ b/vnpy/app/account_recorder/engine.py @@ -22,6 +22,7 @@ from datetime import datetime, timedelta from queue import Queue from threading import Thread from time import time +from bson import binary from vnpy.event import Event, EventEngine from vnpy.trader.event import ( @@ -34,6 +35,7 @@ from vnpy.trader.event import ( EVENT_HISTORY_ORDER, EVENT_FUNDS_FLOW, EVENT_STRATEGY_POS, + EVENT_STRATEGY_SNAPSHOT, EVENT_ERROR, EVENT_WARNING, EVENT_CRITICAL, @@ -55,13 +57,14 @@ HISTORY_ORDER_COL = 'history_orders' HISTORY_TRADE_COL = 'history_trades' HISTORY_STRATEGY_POS_COL = 'history_strategy_pos' FUNDS_FLOW_COL = 'funds_flow' +STRATEGY_SNAPSHOT = 'strategy_snapshot' ALERT_DB_NAME = "Alert" GW_ERROR_COL_NAME = "gw_error_msg" WARNING_COL_NAME = "warning_msg" CRITICAL_COL_NAME = "critical_msg" -APP_NAME = "ACCOUNT_RECORDER" +APP_NAME = "AccountRecorder" ######################################################################## @@ -129,11 +132,12 @@ class AccountRecorder(BaseEngine): # ---------------------------------------------------------------------- def load_setting(self): """读取配置""" + self.write_log(f'{self.name}读取配置') try: d = load_json(self.setting_file_name) # mongo 数据库连接 - mongo_seetting = d.get('mongo', {}) + mongo_seetting = d.get('mongo_db', {}) self.mongo_db = MongoData(host=mongo_seetting.get('host', 'localhost'), port=mongo_seetting.get('port', 27017)) @@ -151,6 +155,7 @@ class AccountRecorder(BaseEngine): if account_setting.get('copy_history_strategypos', False): self.copy_history_strategypos.append(gateway_name) + self.write_log(f'{self.name}读取配置完成') except Exception as ex: self.main_engine.writeCritical(u'读取:{}异常:{}'.format(self.setting_file_name, str(ex))) @@ -175,6 +180,7 @@ class AccountRecorder(BaseEngine): self.event_engine.register(EVENT_ERROR, self.process_gw_error) self.event_engine.register(EVENT_WARNING, self.process_warning) self.event_engine.register(EVENT_CRITICAL, self.process_critical) + self.event_engine.register(EVENT_STRATEGY_SNAPSHOT, self.update_strategy_snapshot) # ---------------------------------------------------------------------- def update_timer(self, event: Event): @@ -214,7 +220,7 @@ class AccountRecorder(BaseEngine): def update_account(self, event: Event): """更新账号资金""" account = event.data - fld = {'accountid': account.accountid, + fld = {'account_id': account.accountid, 'gateway_name': account.gateway_name, 'trading_day': account.trading_day, 'currency': account.currency} @@ -224,14 +230,15 @@ class AccountRecorder(BaseEngine): self.gw_name_acct_id.update({account.gateway_name: account.accountid}) - data = account.__dict__ + acc_data = copy.copy(account.__dict__) + acc_data.update({'account_id': acc_data.pop('accountid')}) # 更新至历史净值数据表 self.update_data(db_name=ACCOUNT_DB_NAME, col_name=DAILY_INFO_COL, fld=copy.copy(fld), - data=copy.copy(data)) + data=acc_data) fld.pop('trading_day', None) # 更新至最新净值数据 - self.update_data(db_name=ACCOUNT_DB_NAME, col_name=ACCOUNT_INFO_COL, fld=fld, data=data) + self.update_data(db_name=ACCOUNT_DB_NAME, col_name=ACCOUNT_INFO_COL, fld=fld, data=copy.copy(acc_data)) self.remove_pre_trading_day_data(account.accountid, account.trading_day) @@ -274,7 +281,7 @@ class AccountRecorder(BaseEngine): return begin_day - def remove_pre_trading_day_data(self, accountid: str, trading_day: str): + def remove_pre_trading_day_data(self, account_id: str, trading_day: str): """ 移除非当前交易日得所有当日交易数据 :param accountid: @@ -285,20 +292,35 @@ class AccountRecorder(BaseEngine): return # 移除非当日得交易/持仓 - flt = {'accountid': accountid, + flt = {'account_id': account_id, 'trade_date': {'$ne': trading_day}} - self.main_engine.dbDelete(ACCOUNT_DB_NAME, TODAY_TRADE_COL, flt) - self.main_engine.dbDelete(ACCOUNT_DB_NAME, TODAY_POSITION_COL, flt) + self.write_log(f'移除非当日交易持仓:{flt}') + self.mongo_db.db_delete( + db_name=ACCOUNT_DB_NAME, + col_name=TODAY_TRADE_COL, + flt=flt) + self.mongo_db.db_delete( + db_name=ACCOUNT_DB_NAME, + col_name=TODAY_POSITION_COL, + flt=flt) # 移除非当日得委托 - flt = {'accountid': accountid, + flt = {'account_id': account_id, 'order_date': {'$ne': trading_day}} - self.main_engine.dbDelete(ACCOUNT_DB_NAME, TODAY_ORDER_COL, flt) + self.write_log(f'移除非当日委托:{flt}') + self.mongo_db.db_delete( + db_name=ACCOUNT_DB_NAME, + col_name=TODAY_ORDER_COL, + flt=flt) # 移除非当日得持仓 - flt = {'account_id': accountid, + flt = {'account_id': account_id, 'trading_day': {'$ne': trading_day}} - self.main_engine.dbDelete(ACCOUNT_DB_NAME, TODAY_STRATEGY_POS_COL, flt) + self.write_log(f'移除非当日持仓:{flt}') + self.mongo_db.db_delete( + db_name=ACCOUNT_DB_NAME, + col_name=TODAY_STRATEGY_POS_COL, + flt=flt) self.is_remove_pre_data = True @@ -306,39 +328,46 @@ class AccountRecorder(BaseEngine): """更新当日记录""" order = event.data self.write_log(u'记录委托日志:{}'.format(order.__dict__)) - if len(order.sysOrderID) == 0: + if len(order.sys_orderid) == 0: # 未有系统的委托编号,不做持久化 return - - order_date = get_trading_date(datetime.now()) + dt = getattr(order, 'datetime') + if not dt: + order_date = datetime.now().strftime('%Y-%m-%d') + else: + order_date = dt.strftime('%Y-%m-%d') # 数据库需要提前建立多重索引 - # db.today_orders.createIndex({'accountid':1,'vt_symbol':1,'sys_orderid':1,'order_date':1,'holder_id':1},{'name':'accountid_vtsymbol_sysorderid_order_date_holder_id','unique':true}) - # db.today_orders.createIndex({'accountID':1}) + # db.today_orders.createIndex({'account_id':1,'vt_symbol':1,'sys_orderid':1,'order_date':1,'holder_id':1},{'name':'accountid_vtsymbol_sysorderid_order_date_holder_id','unique':true}) + fld = {'vt_symbol': order.vt_symbol, - 'accountid': order.accountid, + 'account_id': order.accountid, 'sys_orderid': order.sys_orderid, 'order_date': order_date, - 'holder_id': order.holder_id} + 'holder_id': getattr(order,'holder_id','')} data = copy.copy(order.__dict__) + data.update({'account_id': data.pop('accountid')}) data.update({'order_date': order_date}) data.update({'exchange': order.exchange.value}) data.update({'direction': order.direction.value}) + data.update({'offset': order.offset.value}) + data.update({'type': order.type.value}) + data.update({'status': order.status.value}) self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_ORDER_COL, fld=fld, data=data) # 数据库需要提前建立多重索引 - # db.history_orders.createIndex({'accountid':1,'vt_symbol':1,'sys_orderid':1,'order_date':1,'holder_id':1},{'name':'history_accountid_vtsymbol_sysorderid_order_date_holder_id'}) - # db.history_orders.createIndex({'accountid':1}) + # db.history_orders.createIndex({'account_id':1,'vt_symbol':1,'sys_orderid':1,'order_date':1,'holder_id':1},{'name':'history_accountid_vtsymbol_sysorderid_order_date_holder_id'}) + # 复制委托记录=》历史委托记录 - if order.gatewayName in self.copy_history_orders: + if order.gateway_name in self.copy_history_orders: history_data = copy.copy(data) fld2 = {'vt_symbol': order.vt_symbol, - 'accountid': order.accountid, + 'account_id': order.accountid, 'sys_orderid': order.sys_orderid, 'order_date': order_date, - 'holder_id': order.holder_id} + 'holder_id': getattr(order,'holder_id','')} self.update_data(db_name=ACCOUNT_DB_NAME, col_name=HISTORY_ORDER_COL, fld=fld2, data=history_data) @@ -348,28 +377,30 @@ class AccountRecorder(BaseEngine): trade_date = get_trading_date(datetime.now()) fld = {'vt_symbol': trade.vt_symbol, - 'accountid': trade.accountid, + 'account_id': trade.accountid, 'vt_tradeid': trade.vt_tradeid, 'trade_date': trade_date, 'holder_id': trade.holder_id} # 提前创建索引 - # db.today_trades.createIndex({'accountid':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountID_vtSymbol_vtTradeID_trade_date_holder_id','unique':true}) - # db.today_trades.createIndex({'accountid':1}) + # db.today_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtSymbol_vt_tradeid_trade_date_holder_id','unique':true}) + data = copy.copy(trade.__dict__) + data.update({'account_id': data.pop('accountid')}) data.update({'trade_date': trade_date}) data.update({'exchange': trade.exchange.value}) data.update({'direction': trade.direction.value}) + data.update({'offset': trade.offset.value}) self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_TRADE_COL, fld=fld, data=data) - # db.history_trades.createIndex({'accountid':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountID_vtSymbol_vtTradeID_trade_date_holder_id'}) - # db.history_trades.createIndex({'accountid':1}) + # db.history_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtSymbol_vt_tradeid_trade_date_holder_id'}) + # 复制成交记录=》历史成交记录 if trade.gateway_name in self.copy_history_trades: history_trade = copy.copy(data) fld2 = {'vt_symbol': trade.vt_symbol, - 'accountid': trade.accountid, + 'account_id': trade.accountid, 'vt_tradeid': trade.vt_tradeid, 'trade_date': trade_date, 'holder_id': trade.holder_id} @@ -387,90 +418,106 @@ class AccountRecorder(BaseEngine): fld = {'vt_symbol': pos.vt_symbol, 'direction': pos.direction.value, - 'accountid': pos.accountid, + 'account_id': pos.accountid, 'trade_date': trade_date, 'holder_id': pos.holder_id} - # db.today_positions.createIndex({'accountid':1,'vt_symbol':1,'direction':1,'trade_date':1,'holder_id':1},{'name':'accountID_vtSymbol_direction_trade_date_holder_id'}) - # db.today_positions.createIndex({'accountid':1}) - data = pos.__dict__ + # db.today_positions.createIndex({'account_id':1,'vt_symbol':1,'direction':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtsymbol_direction_trade_date_holder_id'}) + + data = copy.copy(pos.__dict__) + data.update({'account_id': data.pop('accountid')}) data.update({'trade_date': trade_date}) data.update({'exchange': pos.exchange.value}) data.update({'direction': pos.direction.value}) # 补充 当前价格 - try: + if pos.cur_price == 0: price = self.main_engine.get_price(pos.vt_symbol) if price: data.update({'cur_price': price}) - except: # noqa - pass self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_POSITION_COL, fld=fld, data=data) + def update_strategy_snapshot(self, event: Event): + """更新策略切片""" + snapshot = event.data + self.write_log(f"保存切片,{snapshot.get('account_id')},策略:{snapshot.get('strategy')}") + klines = snapshot.pop('klines', None) + if klines: + self.write_log(f"转换 =>BSON.binary.Binary") + snapshot.update({'klines': binary.Binary(klines)}) + fld = { + 'account_id': snapshot.get('account_id'), + 'strategy': snapshot.get('strategy'), + 'guid': snapshot.get('guid') + } + self.update_data(db_name=ACCOUNT_DB_NAME, col_name=STRATEGY_SNAPSHOT, fld=fld, data=snapshot) + def update_history_trade(self, event: Event): """更新历史查询记录""" trade = event.data trade_date = trade.time.split(' ')[0] fld = {'vt_symbol': trade.vt_symbol, - 'accountid': trade.accountid, - 'vt_tradeID': trade.vt_tradeid, + 'account_id': trade.accountid, + 'vt_tradeid': trade.vt_tradeid, 'trade_date': trade_date, 'holder_id': trade.holder_id} - data = trade.__dict__ + data = copy.copy(trade.__dict__) + data.update({'account_id': data.pop('accountid')}) data.update({'trade_date': trade_date}) self.update_data(db_name=ACCOUNT_DB_NAME, col_name=HISTORY_TRADE_COL, fld=fld, data=data) - self.update_begin_day(trade.gatewayName, HISTORY_TRADE_COL, trade_date) + self.update_begin_day(trade.gateway_name, HISTORY_TRADE_COL, trade_date) def update_history_order(self, event: Event): """更新历史委托""" order = event.data order_date = order.time.split(' ')[0] fld = {'vt_symbol': order.vt_symbol, - 'accountid': order.accountid, + 'account_id': order.accountid, 'sys_orderid': order.sys_orderid, 'order_date': order_date, 'holder_id': order.holder_id} - data = order.__dict__ + data = copy.copy(order.__dict__) + data.update({'account_id': data.pop('accountid')}) data.update({'order_date': order_date}) self.update_data(db_name=ACCOUNT_DB_NAME, col_name=HISTORY_ORDER_COL, fld=fld, data=data) - self.update_begin_day(order.gatewayName, HISTORY_ORDER_COL, order_date) + self.update_begin_day(order.gateway_name, HISTORY_ORDER_COL, order_date) def update_funds_flow(self, event: Event): """更新历史资金流水""" funds_flow = event.data - data = funds_flow.__dict__ + data = copy.copy(funds_flow.__dict__) - fld = {'accountid': funds_flow.accountid, + fld = {'account_id': funds_flow.accountid, 'trade_date': funds_flow.trade_date, 'trade_amount': funds_flow.trade_amount, 'fund_remain': funds_flow.fund_remain, 'contract_id': funds_flow.contract_id, 'holder_id': funds_flow.holder_id} - + data.update({'account_id': data.pop('accountid')}) self.update_data(db_name=ACCOUNT_DB_NAME, col_name=FUNDS_FLOW_COL, fld=fld, data=data) - self.update_begin_day(funds_flow.gatewayName, HISTORY_ORDER_COL, funds_flow.trade_date) + self.update_begin_day(funds_flow.gateway_name, HISTORY_ORDER_COL, funds_flow.trade_date) def update_strategy_pos(self, event: Event): + """更新策略持仓事件""" data = event.data dt = data.get('datetime') - pos_trading_day = get_trading_date(dt) - data.update({'trading_day': pos_trading_day}) - accountid = data.get('accountid') - fld = { - 'accountid': accountid, - 'stratregy_group': data.get('straregy_group', '-'), - 'strategy_name': data.get('strategy_name'), - 'datetime': dt.strftime("%Y-%m-%d %H:%M:%S"), - 'inited': True, - 'trading': True - } - self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_STRATEGY_POS_COL, fld=fld, data=data) + account_id = data.get('accountid') + fld = { + 'account_id': account_id, + 'strategy_group': data.get('strategy_group', '-'), + 'strategy_name': data.get('strategy_name') + } + pos_data = copy.copy(data) + pos_data.update({'account_id': pos_data.get('accountid')}) + pos_data.update({'datetime': dt.strftime("%Y-%m-%d %H:%M:%S")}) + + self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_STRATEGY_POS_COL, fld=fld, data=pos_data) def process_gw_error(self, event: Event): """ 处理gw的回报错误日志""" @@ -512,7 +559,7 @@ class AccountRecorder(BaseEngine): d.update({'log_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")}) d.update({'trading_day': get_trading_date()}) - account_id = self.gw_name_acct_id.get(data.gatewayName, None) + account_id = self.gw_name_acct_id.get(data.gateway_name, None) if account_id: d.update({'account_id': account_id}) fld = copy.copy(d) @@ -579,10 +626,12 @@ class AccountRecorder(BaseEngine): """启动""" self.active = True self.thread.start() + self.write_log(f'账号记录引擎启动') # ---------------------------------------------------------------------- def stop(self): """退出""" + self.write_log(f'账号记录引擎退出') if self.mongo_db: self.mongo_db = None diff --git a/vnpy/app/cta_crypto/engine.py b/vnpy/app/cta_crypto/engine.py index 856b32ae..de0363dc 100644 --- a/vnpy/app/cta_crypto/engine.py +++ b/vnpy/app/cta_crypto/engine.py @@ -19,6 +19,7 @@ from collections import OrderedDict from concurrent.futures import ThreadPoolExecutor from copy import copy from functools import lru_cache +from uuid import uuid1 from vnpy.event import Event, EventEngine from vnpy.trader.engine import BaseEngine, MainEngine @@ -40,6 +41,7 @@ from vnpy.trader.event import ( EVENT_TRADE, EVENT_POSITION, EVENT_STRATEGY_POS, + EVENT_STRATEGY_SNAPSHOT ) from vnpy.trader.constant import ( Direction, @@ -1250,6 +1252,15 @@ class CtaEngine(BaseEngine): pickle.dump(snapshot, f) self.write_log(u'切片保存成功:{}'.format(str(snapshot_file))) + # 通过事件方式,传导到account_recorder + snapshot.update({ + 'account_id': self.engine_config.get('accountid', '-'), + 'strategy_group': self.engine_config.get('strategy_group', self.engine_name), + 'guid': str(uuid1()) + }) + event = Event(EVENT_STRATEGY_SNAPSHOT, snapshot) + self.event_engine.put(event) + except Exception as ex: self.write_error(u'获取策略{}切片数据异常:'.format(strategy_name, str(ex))) self.write_error(traceback.format_exc()) @@ -1419,11 +1430,10 @@ class CtaEngine(BaseEngine): for strategy_name in list(self.strategies.keys()): d = OrderedDict() d['accountid'] = self.engine_config.get('accountid', '-') - d['strategy_group'] = self.engine_config.get('strategy_group', '-') + d['strategy_group'] = self.engine_config.get('strategy_group', self.engine_name) d['strategy_name'] = strategy_name dt = datetime.now() - d['date'] = dt.strftime('%Y%m%d') - d['hour'] = dt.hour + d['trading_day'] = dt.strftime('%Y-%m-%d') d['datetime'] = datetime.now() strategy = self.strategies.get(strategy_name) d['inited'] = strategy.inited diff --git a/vnpy/app/cta_crypto/template.py b/vnpy/app/cta_crypto/template.py index bfc699fc..e8d25597 100644 --- a/vnpy/app/cta_crypto/template.py +++ b/vnpy/app/cta_crypto/template.py @@ -815,6 +815,7 @@ class CtaFutureTemplate(CtaTemplate): # 开仓完毕( buy, short) else: grid.open_status = True + grid.open_time = self.cur_datetime self.write_log(f'{grid.direction.value}单已开仓完毕,order_price:{order.price}' + f',volume:{order.volume}') @@ -1312,36 +1313,41 @@ class CtaFutureTemplate(CtaTemplate): up_grids_info = "" for grid in list(self.gt.up_grids): if not grid.open_status and grid.order_status: - up_grids_info += f'平空中: [已平:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n' + up_grids_info += f'平空中: [已平:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}]\n' if len(grid.order_ids) > 0: up_grids_info += f'委托单号:{grid.order_ids}' continue if grid.open_status and not grid.order_status: - up_grids_info += f'持空中: [数量:{grid.volume}\n, 开仓时间:{grid.open_time}' + up_grids_info += f'持空中: [数量:{grid.volume}\n, 开仓时间:{grid.open_time}]\n' continue if not grid.open_status and grid.order_status: - up_grids_info += f'开空中: [已开:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n' + up_grids_info += f'开空中: [已开:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}]\n' if len(grid.order_ids) > 0: up_grids_info += f'委托单号:{grid.order_ids}' dn_grids_info = "" for grid in list(self.gt.dn_grids): if not grid.open_status and grid.order_status: - up_grids_info += f'平多中: [已平:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n' + dn_grids_info += f'平多中: [已平:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}]\n' if len(grid.order_ids) > 0: - up_grids_info += f'委托单号:{grid.order_ids}' + dn_grids_info += f'委托单号:{grid.order_ids}' continue if grid.open_status and not grid.order_status: - up_grids_info += f'持多中: [数量:{grid.volume}\n, 开仓时间:{grid.open_time}' + dn_grids_info += f'持多中: [数量:{grid.volume}\n, 开仓时间:{grid.open_time}]\n' continue if not grid.open_status and grid.order_status: - up_grids_info += f'开多中: [已开:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n' + dn_grids_info += f'开多中: [已开:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}]\n' if len(grid.order_ids) > 0: - up_grids_info += f'委托单号:{grid.order_ids}' + dn_grids_info += f'委托单号:{grid.order_ids}' + + if len(up_grids_info) > 0: + self.write_log(up_grids_info) + if len(dn_grids_info) > 0: + self.write_log(dn_grids_info) def display_tns(self): """显示事务的过程记录=》 log""" diff --git a/vnpy/app/cta_crypto/ui/widget.py b/vnpy/app/cta_crypto/ui/widget.py index 935f912d..15f17f1b 100644 --- a/vnpy/app/cta_crypto/ui/widget.py +++ b/vnpy/app/cta_crypto/ui/widget.py @@ -202,9 +202,12 @@ class StrategyManager(QtWidgets.QFrame): reload_button = QtWidgets.QPushButton("重载") reload_button.clicked.connect(self.reload_strategy) - save_button = QtWidgets.QPushButton("保存") + save_button = QtWidgets.QPushButton("缓存") save_button.clicked.connect(self.save_strategy) + snapshot_button = QtWidgets.QPushButton("切片") + snapshot_button.clicked.connect(self.save_snapshot) + strategy_name = self._data["strategy_name"] vt_symbol = self._data["vt_symbol"] class_name = self._data["class_name"] @@ -227,6 +230,7 @@ class StrategyManager(QtWidgets.QFrame): hbox.addWidget(remove_button) hbox.addWidget(reload_button) hbox.addWidget(save_button) + hbox.addWidget(snapshot_button) vbox = QtWidgets.QVBoxLayout() vbox.addWidget(label) @@ -279,8 +283,12 @@ class StrategyManager(QtWidgets.QFrame): self.cta_engine.reload_strategy(self.strategy_name) def save_strategy(self): + """保存K线缓存""" self.cta_engine.save_strategy_data(self.strategy_name) + def save_snapshot(self): + """ 保存切片""" + self.cta_engine.save_strategy_snapshot(self.strategy_name) class DataMonitor(QtWidgets.QTableWidget): """ diff --git a/vnpy/app/cta_strategy_pro/engine.py b/vnpy/app/cta_strategy_pro/engine.py index ce15cf3d..eff51480 100644 --- a/vnpy/app/cta_strategy_pro/engine.py +++ b/vnpy/app/cta_strategy_pro/engine.py @@ -19,6 +19,7 @@ from collections import OrderedDict from concurrent.futures import ThreadPoolExecutor from copy import copy from functools import lru_cache +from uuid import uuid1 from vnpy.event import Event, EventEngine from vnpy.trader.engine import BaseEngine, MainEngine @@ -37,6 +38,7 @@ from vnpy.trader.event import ( EVENT_TRADE, EVENT_POSITION, EVENT_STRATEGY_POS, + EVENT_STRATEGY_SNAPSHOT ) from vnpy.trader.constant import ( Direction, @@ -203,21 +205,27 @@ class CtaEngine(BaseEngine): def process_timer_event(self, event: Event): """ 处理定时器事件""" - + all_trading = True # 触发每个策略的定时接口 for strategy in list(self.strategies.values()): strategy.on_timer() + if not strategy.trading: + all_trading = False dt = datetime.now() if self.last_minute != dt.minute: self.last_minute = dt.minute - # 主动获取所有策略得持仓信息 - all_strategy_pos = self.get_all_strategy_pos() + if all_trading: + # 主动获取所有策略得持仓信息 + all_strategy_pos = self.get_all_strategy_pos() - # 推送到事件 - self.put_all_strategy_pos_event(all_strategy_pos) + # 比对仓位,使用上述获取得持仓信息,不用重复获取 + self.compare_pos(strategy_pos_list=copy(all_strategy_pos)) + + # 推送到事件 + self.put_all_strategy_pos_event(all_strategy_pos) def process_tick_event(self, event: Event): """处理tick到达事件""" @@ -867,6 +875,7 @@ class CtaEngine(BaseEngine): self.write_log(msg=msg, strategy_name=strategy.strategy_name, level=logging.CRITICAL) + self.send_wechat(msg) def add_strategy( self, class_name: str, @@ -1197,6 +1206,15 @@ class CtaEngine(BaseEngine): pickle.dump(snapshot, f) self.write_log(u'切片保存成功:{}'.format(str(snapshot_file))) + # 通过事件方式,传导到account_recorder + snapshot.update({ + 'account_id': self.engine_config.get('accountid', '-'), + 'strategy_group': self.engine_config.get('strategy_group', self.engine_name), + 'guid': str(uuid1()) + }) + event = Event(EVENT_STRATEGY_SNAPSHOT, snapshot) + self.event_engine.put(event) + except Exception as ex: self.write_error(u'获取策略{}切片数据异常:'.format(strategy_name, str(ex))) self.write_error(traceback.format_exc()) diff --git a/vnpy/component/cta_line_bar.py b/vnpy/component/cta_line_bar.py index b6f29176..391ec482 100644 --- a/vnpy/component/cta_line_bar.py +++ b/vnpy/component/cta_line_bar.py @@ -636,6 +636,8 @@ class CtaLineBar(object): self.line_bd_fast = [] # 波段快线 self.line_bd_slow = [] # 波段慢线 self.cur_bd_count = 0 # 当前波段快线慢线金叉死叉, +金叉计算, - 死叉技术 + self._bd_fast = 0 + self._bd_slow = 0 def set_params(self, setting: dict = {}): """设置参数""" @@ -3492,7 +3494,7 @@ class CtaLineBar(object): :param:direction,多:检查是否有顶背离,空,检查是否有底背离 :return: """ - if len(self.skd_top_list) < 2 or len(self.skd_buttom_list) < 2 or self._rt_sk is None or self._rt_sd is None: + if len(self.skd_top_list) < 2 or len(self.skd_buttom_list) < 2: return False t1 = self.skd_top_list[-1] @@ -3501,6 +3503,8 @@ class CtaLineBar(object): b2 = self.__get_2nd_item(self.skd_buttom_list[:-1]) if runtime: + if self._rt_sk is None or self._rt_sd is None: + return False # 峰(顶部) if self._rt_sk < self.line_sk[-1] and self.line_sk[-2] < self.line_sk[-1]: t1 = {} @@ -3610,10 +3614,12 @@ class CtaLineBar(object): 检查SDK的方向风险 :return: """ - if not self.para_active_skd or len(self.line_sk) < 2 or self._rt_sk is None: + if not self.para_active_skd or len(self.line_sk) < 2 : return False if runtime: + if self._rt_sk is None: + return False sk = self._rt_sk else: sk = self.line_sk[-1] @@ -4057,6 +4063,46 @@ class CtaLineBar(object): elif self.line_bd_fast[-1] < self.line_bd_slow[-1]: self.cur_bd_count = min(-1, self.cur_bd_count - 1) + def rt_count_bd(self): + """实时计算波段指标""" + if self.para_bd_len <= 0: + # 不计算 + return + + if len(self.line_bar) < 2 * self.para_bd_len: + return + bar_mid4 = (self.line_bar[-1].close_price * 2 + self.line_bar[-1].high_price + self.line_bar[-1].low_price)/4 + bar_mid4 = round(bar_mid4, self.round_n) + + mid4_array = np.append(self.mid4_array, [bar_mid4]) + mid4_ema_array = ta.EMA(mid4_array, self.para_bd_len) + + mid4_std = np.std(mid4_array[-self.para_bd_len:], ddof=1) + + mid4_ema_diff_array = mid4_array - mid4_ema_array + var5_array = (mid4_ema_diff_array / mid4_std * 100 + 200) / 4 + var6_array = (ta.EMA(var5_array, 5) - 25) * 1.56 + fast_array = ta.EMA(var6_array, 2) * 1.22 + slow_array = ta.EMA(fast_array, 2) + + self._bd_fast = fast_array[-1] + self._bd_slow = slow_array[-1] + + @property + def rt_bd_fast(self): + self.check_rt_funcs(self.rt_count_bd) + if self._bd_fast is None and len(self.para_bd_len) > 0: + return self.line_bd_fast[-1] + return self._bd_fast + + @property + def rt_bd_slow(self): + self.check_rt_funcs(self.rt_count_bd) + if self._bd_slow is None and len(self.para_bd_len) > 0: + return self.line_bd_slow[-1] + return self._bd_slow + + def write_log(self, content): """记录CTA日志""" self.strategy.write_log(u'[' + self.name + u']' + content) diff --git a/vnpy/gateway/binancef/binancef_gateway.py b/vnpy/gateway/binancef/binancef_gateway.py index 3c2b09ac..1b75429b 100644 --- a/vnpy/gateway/binancef/binancef_gateway.py +++ b/vnpy/gateway/binancef/binancef_gateway.py @@ -214,6 +214,8 @@ class BinancefRestApi(RestClient): self.orders = {} + self.accountid = "" + def sign(self, request: Request) -> Request: """ Generate BINANCE signature. @@ -398,6 +400,9 @@ class BinancefRestApi(RestClient): orderid, self.gateway_name ) + order.accountid = self.accountid + order.vt_accountid = f"{self.gateway_name}.{self.accountid}" + order.datetime = datetime.now() self.orders.update({orderid: copy(order)}) self.gateway.write_log(f'委托返回订单更新:{order.__dict__}') self.gateway.on_order(order) @@ -510,14 +515,19 @@ class BinancefRestApi(RestClient): "walletBalance": "9.19485176" // 账户余额 }""" # self.gateway.write_log(print_dict(asset)) + if asset['asset'] != "USDT": + continue + if not self.accountid: + self.accountid = f"{self.gateway_name}_{asset['asset']}" account = AccountData( - accountid=f"{self.gateway_name}_{asset['asset']}", + accountid=self.accountid, balance=float(asset["marginBalance"]), frozen=float(asset["maintMargin"]), holding_profit=float(asset['unrealizedProfit']), currency='USDT', margin=float(asset["initialMargin"]), - gateway_name=self.gateway_name + gateway_name=self.gateway_name, + trading_day=datetime.now().strftime('%Y-%m-%d') ) if account.balance: @@ -536,13 +546,16 @@ class BinancefRestApi(RestClient): def on_query_position(self, data: dict, request: Request) -> None: """""" for d in data: + # self.gateway.write_log(d) volume = float(d["positionAmt"]) position = PositionData( + accountid=self.accountid, symbol=d["symbol"], exchange=Exchange.BINANCE, direction=Direction.NET, volume=volume, price=float(d["entryPrice"]), + cur_price=float(d["markPrice"]), pnl=float(d["unRealizedProfit"]), gateway_name=self.gateway_name, ) @@ -557,7 +570,9 @@ class BinancefRestApi(RestClient): time = dt.strftime("%Y-%m-%d %H:%M:%S") order = OrderData( + accountid=self.accountid, orderid=d["clientOrderId"], + sys_orderid=str(d["orderId"]), symbol=d["symbol"], exchange=Exchange.BINANCE, price=float(d["price"]), @@ -566,6 +581,7 @@ class BinancefRestApi(RestClient): direction=DIRECTION_BINANCEF2VT[d["side"]], traded=float(d["executedQty"]), status=STATUS_BINANCEF2VT.get(d["status"], None), + datetime=dt, time=time, gateway_name=self.gateway_name, ) @@ -582,6 +598,7 @@ class BinancefRestApi(RestClient): time = dt.strftime("%Y-%m-%d %H:%M:%S") trade = TradeData( + accountid=self.accountid, symbol=d['symbol'], exchange=Exchange.BINANCE, orderid=d['orderId'], @@ -655,6 +672,11 @@ class BinancefRestApi(RestClient): order.status = Status.REJECTED self.orders.update({order.orderid: copy(order)}) self.gateway.write_log(f'订单委托失败:{order.__dict__}') + if not order.accountid: + order.accountid = self.accountid + order.vt_accountid = f"{self.gateway_name}.{self.accountid}" + if not order.datetime: + order.datetime = datetime.now() self.gateway.on_order(order) msg = f"委托失败,状态码:{status_code},信息:{request.response.text}" @@ -670,6 +692,11 @@ class BinancefRestApi(RestClient): order.status = Status.REJECTED self.orders.update({order.orderid: copy(order)}) self.gateway.write_log(f'发送订单异常:{order.__dict__}') + if not order.accountid: + order.accountid = self.accountid + order.vt_accountid = f"{self.gateway_name}.{self.accountid}" + if not order.datetime: + order.datetime = datetime.now() self.gateway.on_order(order) msg = f"委托失败,拒单" @@ -784,6 +811,7 @@ class BinancefTradeWebsocketApi(WebsocketClient): self.gateway: BinancefGateway = gateway self.gateway_name: str = gateway.gateway_name + self.accountid = "" def connect(self, url: str, proxy_host: str, proxy_port: int) -> None: """""" @@ -836,9 +864,12 @@ class BinancefTradeWebsocketApi(WebsocketClient): # 计算持仓收益 holding_pnl = 0 for pos_data in packet["a"]["P"]: - print(pos_data) + # print(pos_data) volume = float(pos_data["pa"]) + if not self.accountid: + self.accountid = f"{self.gateway_name}_USDT" position = PositionData( + accountid=self.accountid, symbol=pos_data["s"], exchange=Exchange.BINANCE, direction=Direction.NET, @@ -851,12 +882,16 @@ class BinancefTradeWebsocketApi(WebsocketClient): self.gateway.on_position(position) for acc_data in packet["a"]["B"]: + if acc_data['a'] != 'USDT': + continue account = AccountData( - accountid=f"{self.gateway_name}_{acc_data['a']}", + accountid=self.accountid, balance=round(float(acc_data["wb"]), 7), frozen=float(acc_data["wb"]) - float(acc_data["cw"]), holding_profit=round(holding_pnl, 7), - gateway_name=self.gateway_name + currency='USDT', + gateway_name=self.gateway_name, + trading_day=datetime.now().strftime('%Y-%m-%d') ) if account.balance: @@ -884,15 +919,18 @@ class BinancefTradeWebsocketApi(WebsocketClient): else: self.gateway.write_log(u'缓存中找不到Order,创建一个新的') order = OrderData( + accountid=self.accountid, symbol=ord_data["s"], exchange=Exchange.BINANCE, orderid=str(ord_data["c"]), + sys_orderid=str(ord_data["i"]), type=ORDERTYPE_BINANCEF2VT[ord_data["o"]], direction=DIRECTION_BINANCEF2VT[ord_data["S"]], price=float(ord_data["p"]), volume=float(ord_data["q"]), traded=float(ord_data["z"]), status=STATUS_BINANCEF2VT[ord_data["X"]], + datetime=dt, time=time, gateway_name=self.gateway_name ) @@ -908,6 +946,7 @@ class BinancefTradeWebsocketApi(WebsocketClient): trade_time = trade_dt.strftime("%Y-%m-%d %H:%M:%S") trade = TradeData( + accountid=self.accountid, symbol=order.symbol, exchange=order.exchange, orderid=order.orderid, diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index 35d32ebb..67f46ad6 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -640,6 +640,8 @@ class CtpTdApi(TdApi): self.sysid_orderid_map = {} self.future_contract_changed = False + self.accountid = "" + def onFrontConnected(self): """""" self.gateway.write_log("交易服务器连接成功") @@ -754,6 +756,7 @@ class CtpTdApi(TdApi): position = self.positions.get(key, None) if not position: position = PositionData( + accountid=self.accountid, symbol=data["InstrumentID"], exchange=symbol_exchange_map[data["InstrumentID"]], direction=DIRECTION_CTP2VT[data["PosiDirection"]], @@ -800,6 +803,8 @@ class CtpTdApi(TdApi): """""" if "AccountID" not in data: return + if not self.accountid: + self.accountid = data['AccountID'] account = AccountData( accountid=data["AccountID"], diff --git a/vnpy/trader/engine.py b/vnpy/trader/engine.py index 29e99b25..0836b159 100644 --- a/vnpy/trader/engine.py +++ b/vnpy/trader/engine.py @@ -111,7 +111,7 @@ class MainEngine: if app.app_name == "RiskManager": self.rm_engine = engine elif app.app_name == "AlgoTrading": - self.algo_engine == engine + self.algo_engine = engine elif app.app_name == 'RpcService': self.rpc_service = engine diff --git a/vnpy/trader/event.py b/vnpy/trader/event.py index 5d9db8f2..c95094d3 100644 --- a/vnpy/trader/event.py +++ b/vnpy/trader/event.py @@ -15,6 +15,7 @@ EVENT_LOG = "eLog" # 扩展 EVENT_BAR = "eBar." EVENT_STRATEGY_POS = "eStrategyPos." +EVENT_STRATEGY_SNAPSHOT = "eStrategySnapshot." # 拓展, 支持股票账号中,历史交成交/历史委托/资金流水 EVENT_HISTORY_TRADE = 'eHistoryTrade.' diff --git a/vnpy/trader/object.py b/vnpy/trader/object.py index 2da0edb4..cd47a3ad 100644 --- a/vnpy/trader/object.py +++ b/vnpy/trader/object.py @@ -136,7 +136,7 @@ class OrderData(BaseData): exchange: Exchange orderid: str sys_orderid: str = "" - + accountid: str = "" type: OrderType = OrderType.LIMIT direction: Direction = "" offset: Offset = Offset.NONE @@ -144,6 +144,7 @@ class OrderData(BaseData): volume: float = 0 traded: float = 0 status: Status = Status.SUBMITTING + datetime: datetime = None time: str = "" cancel_time: str = "" @@ -151,6 +152,7 @@ class OrderData(BaseData): """""" self.vt_symbol = f"{self.symbol}.{self.exchange.value}" self.vt_orderid = f"{self.gateway_name}.{self.orderid}" + self.vt_accountid = f"{self.gateway_name}.{self.accountid}" def is_active(self) -> bool: """ @@ -183,6 +185,7 @@ class TradeData(BaseData): orderid: str tradeid: str sys_orderid: str = "" + accountid: str = "" direction: Direction = "" @@ -204,7 +207,7 @@ class TradeData(BaseData): self.vt_symbol = f"{self.symbol}.{self.exchange.value}" self.vt_orderid = f"{self.gateway_name}.{self.orderid}" self.vt_tradeid = f"{self.gateway_name}.{self.tradeid}" - + self.vt_accountid = f"{self.gateway_name}.{self.accountid}" @dataclass class PositionData(BaseData): @@ -215,12 +218,13 @@ class PositionData(BaseData): symbol: str exchange: Exchange direction: Direction - + accountid: str = "" # 账号id volume: float = 0 frozen: float = 0 price: float = 0 pnl: float = 0 yd_volume: float = 0 + cur_price: float = 0 # 当前价 # 股票相关 holder_id: str = "" # 股东代码 @@ -229,7 +233,7 @@ class PositionData(BaseData): """""" self.vt_symbol = f"{self.symbol}.{self.exchange.value}" self.vt_positionid = f"{self.gateway_name}.{self.vt_symbol}.{self.direction.value}" - + self.vt_accountid = f"{self.gateway_name}.{self.accountid}" @dataclass class AccountData(BaseData): diff --git a/vnpy/trader/ui/widget.py b/vnpy/trader/ui/widget.py index 8e76f1b1..982bb7cf 100644 --- a/vnpy/trader/ui/widget.py +++ b/vnpy/trader/ui/widget.py @@ -73,7 +73,10 @@ class EnumCell(BaseCell): Set text using enum.constant.value. """ if content: - super(EnumCell, self).set_content(content.value, data) + if isinstance(content, str): + super(EnumCell, self).set_content(content, data) + else: + super(EnumCell, self).set_content(content.value, data) class DirectionCell(EnumCell):