[增强] 策略切片=》同步=》mongodb

This commit is contained in:
msincenselee 2020-04-16 15:01:00 +08:00
parent 59acdfdf08
commit d5599bc530
13 changed files with 283 additions and 93 deletions

View File

@ -8,6 +8,7 @@
6. 监听股票接口的 EVENT_HISTORY_ORDER 事件, 数据 => history_trades
7. 监听股票接口的 EVENT_FUNDS_FLOW 事件, 数据 => funds_flow
8. 监听 EVENT_STRATEGY_POS事件数据 =》 mongodb Account.today_strategy_pos
9. 监听 EVENT_STRATEGY_SNAPSHOT事件 数据=》 mongodb Account.strategy_snapshot
配置文件 ar_setting.json

View File

@ -22,6 +22,7 @@ from datetime import datetime, timedelta
from queue import Queue
from threading import Thread
from time import time
from bson import binary
from vnpy.event import Event, EventEngine
from vnpy.trader.event import (
@ -34,6 +35,7 @@ from vnpy.trader.event import (
EVENT_HISTORY_ORDER,
EVENT_FUNDS_FLOW,
EVENT_STRATEGY_POS,
EVENT_STRATEGY_SNAPSHOT,
EVENT_ERROR,
EVENT_WARNING,
EVENT_CRITICAL,
@ -55,13 +57,14 @@ HISTORY_ORDER_COL = 'history_orders'
HISTORY_TRADE_COL = 'history_trades'
HISTORY_STRATEGY_POS_COL = 'history_strategy_pos'
FUNDS_FLOW_COL = 'funds_flow'
STRATEGY_SNAPSHOT = 'strategy_snapshot'
ALERT_DB_NAME = "Alert"
GW_ERROR_COL_NAME = "gw_error_msg"
WARNING_COL_NAME = "warning_msg"
CRITICAL_COL_NAME = "critical_msg"
APP_NAME = "ACCOUNT_RECORDER"
APP_NAME = "AccountRecorder"
########################################################################
@ -129,11 +132,12 @@ class AccountRecorder(BaseEngine):
# ----------------------------------------------------------------------
def load_setting(self):
"""读取配置"""
self.write_log(f'{self.name}读取配置')
try:
d = load_json(self.setting_file_name)
# mongo 数据库连接
mongo_seetting = d.get('mongo', {})
mongo_seetting = d.get('mongo_db', {})
self.mongo_db = MongoData(host=mongo_seetting.get('host', 'localhost'),
port=mongo_seetting.get('port', 27017))
@ -151,6 +155,7 @@ class AccountRecorder(BaseEngine):
if account_setting.get('copy_history_strategypos', False):
self.copy_history_strategypos.append(gateway_name)
self.write_log(f'{self.name}读取配置完成')
except Exception as ex:
self.main_engine.writeCritical(u'读取:{}异常:{}'.format(self.setting_file_name, str(ex)))
@ -175,6 +180,7 @@ class AccountRecorder(BaseEngine):
self.event_engine.register(EVENT_ERROR, self.process_gw_error)
self.event_engine.register(EVENT_WARNING, self.process_warning)
self.event_engine.register(EVENT_CRITICAL, self.process_critical)
self.event_engine.register(EVENT_STRATEGY_SNAPSHOT, self.update_strategy_snapshot)
# ----------------------------------------------------------------------
def update_timer(self, event: Event):
@ -214,7 +220,7 @@ class AccountRecorder(BaseEngine):
def update_account(self, event: Event):
"""更新账号资金"""
account = event.data
fld = {'accountid': account.accountid,
fld = {'account_id': account.accountid,
'gateway_name': account.gateway_name,
'trading_day': account.trading_day,
'currency': account.currency}
@ -224,14 +230,15 @@ class AccountRecorder(BaseEngine):
self.gw_name_acct_id.update({account.gateway_name: account.accountid})
data = account.__dict__
acc_data = copy.copy(account.__dict__)
acc_data.update({'account_id': acc_data.pop('accountid')})
# 更新至历史净值数据表
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=DAILY_INFO_COL, fld=copy.copy(fld),
data=copy.copy(data))
data=acc_data)
fld.pop('trading_day', None)
# 更新至最新净值数据
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=ACCOUNT_INFO_COL, fld=fld, data=data)
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=ACCOUNT_INFO_COL, fld=fld, data=copy.copy(acc_data))
self.remove_pre_trading_day_data(account.accountid, account.trading_day)
@ -274,7 +281,7 @@ class AccountRecorder(BaseEngine):
return begin_day
def remove_pre_trading_day_data(self, accountid: str, trading_day: str):
def remove_pre_trading_day_data(self, account_id: str, trading_day: str):
"""
移除非当前交易日得所有当日交易数据
:param accountid:
@ -285,20 +292,35 @@ class AccountRecorder(BaseEngine):
return
# 移除非当日得交易/持仓
flt = {'accountid': accountid,
flt = {'account_id': account_id,
'trade_date': {'$ne': trading_day}}
self.main_engine.dbDelete(ACCOUNT_DB_NAME, TODAY_TRADE_COL, flt)
self.main_engine.dbDelete(ACCOUNT_DB_NAME, TODAY_POSITION_COL, flt)
self.write_log(f'移除非当日交易持仓:{flt}')
self.mongo_db.db_delete(
db_name=ACCOUNT_DB_NAME,
col_name=TODAY_TRADE_COL,
flt=flt)
self.mongo_db.db_delete(
db_name=ACCOUNT_DB_NAME,
col_name=TODAY_POSITION_COL,
flt=flt)
# 移除非当日得委托
flt = {'accountid': accountid,
flt = {'account_id': account_id,
'order_date': {'$ne': trading_day}}
self.main_engine.dbDelete(ACCOUNT_DB_NAME, TODAY_ORDER_COL, flt)
self.write_log(f'移除非当日委托:{flt}')
self.mongo_db.db_delete(
db_name=ACCOUNT_DB_NAME,
col_name=TODAY_ORDER_COL,
flt=flt)
# 移除非当日得持仓
flt = {'account_id': accountid,
flt = {'account_id': account_id,
'trading_day': {'$ne': trading_day}}
self.main_engine.dbDelete(ACCOUNT_DB_NAME, TODAY_STRATEGY_POS_COL, flt)
self.write_log(f'移除非当日持仓:{flt}')
self.mongo_db.db_delete(
db_name=ACCOUNT_DB_NAME,
col_name=TODAY_STRATEGY_POS_COL,
flt=flt)
self.is_remove_pre_data = True
@ -306,39 +328,46 @@ class AccountRecorder(BaseEngine):
"""更新当日记录"""
order = event.data
self.write_log(u'记录委托日志:{}'.format(order.__dict__))
if len(order.sysOrderID) == 0:
if len(order.sys_orderid) == 0:
# 未有系统的委托编号,不做持久化
return
order_date = get_trading_date(datetime.now())
dt = getattr(order, 'datetime')
if not dt:
order_date = datetime.now().strftime('%Y-%m-%d')
else:
order_date = dt.strftime('%Y-%m-%d')
# 数据库需要提前建立多重索引
# db.today_orders.createIndex({'accountid':1,'vt_symbol':1,'sys_orderid':1,'order_date':1,'holder_id':1},{'name':'accountid_vtsymbol_sysorderid_order_date_holder_id','unique':true})
# db.today_orders.createIndex({'accountID':1})
# db.today_orders.createIndex({'account_id':1,'vt_symbol':1,'sys_orderid':1,'order_date':1,'holder_id':1},{'name':'accountid_vtsymbol_sysorderid_order_date_holder_id','unique':true})
fld = {'vt_symbol': order.vt_symbol,
'accountid': order.accountid,
'account_id': order.accountid,
'sys_orderid': order.sys_orderid,
'order_date': order_date,
'holder_id': order.holder_id}
'holder_id': getattr(order,'holder_id','')}
data = copy.copy(order.__dict__)
data.update({'account_id': data.pop('accountid')})
data.update({'order_date': order_date})
data.update({'exchange': order.exchange.value})
data.update({'direction': order.direction.value})
data.update({'offset': order.offset.value})
data.update({'type': order.type.value})
data.update({'status': order.status.value})
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_ORDER_COL, fld=fld, data=data)
# 数据库需要提前建立多重索引
# db.history_orders.createIndex({'accountid':1,'vt_symbol':1,'sys_orderid':1,'order_date':1,'holder_id':1},{'name':'history_accountid_vtsymbol_sysorderid_order_date_holder_id'})
# db.history_orders.createIndex({'accountid':1})
# db.history_orders.createIndex({'account_id':1,'vt_symbol':1,'sys_orderid':1,'order_date':1,'holder_id':1},{'name':'history_accountid_vtsymbol_sysorderid_order_date_holder_id'})
# 复制委托记录=》历史委托记录
if order.gatewayName in self.copy_history_orders:
if order.gateway_name in self.copy_history_orders:
history_data = copy.copy(data)
fld2 = {'vt_symbol': order.vt_symbol,
'accountid': order.accountid,
'account_id': order.accountid,
'sys_orderid': order.sys_orderid,
'order_date': order_date,
'holder_id': order.holder_id}
'holder_id': getattr(order,'holder_id','')}
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=HISTORY_ORDER_COL, fld=fld2, data=history_data)
@ -348,28 +377,30 @@ class AccountRecorder(BaseEngine):
trade_date = get_trading_date(datetime.now())
fld = {'vt_symbol': trade.vt_symbol,
'accountid': trade.accountid,
'account_id': trade.accountid,
'vt_tradeid': trade.vt_tradeid,
'trade_date': trade_date,
'holder_id': trade.holder_id}
# 提前创建索引
# db.today_trades.createIndex({'accountid':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountID_vtSymbol_vtTradeID_trade_date_holder_id','unique':true})
# db.today_trades.createIndex({'accountid':1})
# db.today_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtSymbol_vt_tradeid_trade_date_holder_id','unique':true})
data = copy.copy(trade.__dict__)
data.update({'account_id': data.pop('accountid')})
data.update({'trade_date': trade_date})
data.update({'exchange': trade.exchange.value})
data.update({'direction': trade.direction.value})
data.update({'offset': trade.offset.value})
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_TRADE_COL, fld=fld, data=data)
# db.history_trades.createIndex({'accountid':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountID_vtSymbol_vtTradeID_trade_date_holder_id'})
# db.history_trades.createIndex({'accountid':1})
# db.history_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtSymbol_vt_tradeid_trade_date_holder_id'})
# 复制成交记录=》历史成交记录
if trade.gateway_name in self.copy_history_trades:
history_trade = copy.copy(data)
fld2 = {'vt_symbol': trade.vt_symbol,
'accountid': trade.accountid,
'account_id': trade.accountid,
'vt_tradeid': trade.vt_tradeid,
'trade_date': trade_date,
'holder_id': trade.holder_id}
@ -387,90 +418,106 @@ class AccountRecorder(BaseEngine):
fld = {'vt_symbol': pos.vt_symbol,
'direction': pos.direction.value,
'accountid': pos.accountid,
'account_id': pos.accountid,
'trade_date': trade_date,
'holder_id': pos.holder_id}
# db.today_positions.createIndex({'accountid':1,'vt_symbol':1,'direction':1,'trade_date':1,'holder_id':1},{'name':'accountID_vtSymbol_direction_trade_date_holder_id'})
# db.today_positions.createIndex({'accountid':1})
data = pos.__dict__
# db.today_positions.createIndex({'account_id':1,'vt_symbol':1,'direction':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtsymbol_direction_trade_date_holder_id'})
data = copy.copy(pos.__dict__)
data.update({'account_id': data.pop('accountid')})
data.update({'trade_date': trade_date})
data.update({'exchange': pos.exchange.value})
data.update({'direction': pos.direction.value})
# 补充 当前价格
try:
if pos.cur_price == 0:
price = self.main_engine.get_price(pos.vt_symbol)
if price:
data.update({'cur_price': price})
except: # noqa
pass
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_POSITION_COL, fld=fld, data=data)
def update_strategy_snapshot(self, event: Event):
"""更新策略切片"""
snapshot = event.data
self.write_log(f"保存切片,{snapshot.get('account_id')},策略:{snapshot.get('strategy')}")
klines = snapshot.pop('klines', None)
if klines:
self.write_log(f"转换 =>BSON.binary.Binary")
snapshot.update({'klines': binary.Binary(klines)})
fld = {
'account_id': snapshot.get('account_id'),
'strategy': snapshot.get('strategy'),
'guid': snapshot.get('guid')
}
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=STRATEGY_SNAPSHOT, fld=fld, data=snapshot)
def update_history_trade(self, event: Event):
"""更新历史查询记录"""
trade = event.data
trade_date = trade.time.split(' ')[0]
fld = {'vt_symbol': trade.vt_symbol,
'accountid': trade.accountid,
'vt_tradeID': trade.vt_tradeid,
'account_id': trade.accountid,
'vt_tradeid': trade.vt_tradeid,
'trade_date': trade_date,
'holder_id': trade.holder_id}
data = trade.__dict__
data = copy.copy(trade.__dict__)
data.update({'account_id': data.pop('accountid')})
data.update({'trade_date': trade_date})
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=HISTORY_TRADE_COL, fld=fld, data=data)
self.update_begin_day(trade.gatewayName, HISTORY_TRADE_COL, trade_date)
self.update_begin_day(trade.gateway_name, HISTORY_TRADE_COL, trade_date)
def update_history_order(self, event: Event):
"""更新历史委托"""
order = event.data
order_date = order.time.split(' ')[0]
fld = {'vt_symbol': order.vt_symbol,
'accountid': order.accountid,
'account_id': order.accountid,
'sys_orderid': order.sys_orderid,
'order_date': order_date,
'holder_id': order.holder_id}
data = order.__dict__
data = copy.copy(order.__dict__)
data.update({'account_id': data.pop('accountid')})
data.update({'order_date': order_date})
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=HISTORY_ORDER_COL, fld=fld, data=data)
self.update_begin_day(order.gatewayName, HISTORY_ORDER_COL, order_date)
self.update_begin_day(order.gateway_name, HISTORY_ORDER_COL, order_date)
def update_funds_flow(self, event: Event):
"""更新历史资金流水"""
funds_flow = event.data
data = funds_flow.__dict__
data = copy.copy(funds_flow.__dict__)
fld = {'accountid': funds_flow.accountid,
fld = {'account_id': funds_flow.accountid,
'trade_date': funds_flow.trade_date,
'trade_amount': funds_flow.trade_amount,
'fund_remain': funds_flow.fund_remain,
'contract_id': funds_flow.contract_id,
'holder_id': funds_flow.holder_id}
data.update({'account_id': data.pop('accountid')})
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=FUNDS_FLOW_COL, fld=fld, data=data)
self.update_begin_day(funds_flow.gatewayName, HISTORY_ORDER_COL, funds_flow.trade_date)
self.update_begin_day(funds_flow.gateway_name, HISTORY_ORDER_COL, funds_flow.trade_date)
def update_strategy_pos(self, event: Event):
"""更新策略持仓事件"""
data = event.data
dt = data.get('datetime')
pos_trading_day = get_trading_date(dt)
data.update({'trading_day': pos_trading_day})
accountid = data.get('accountid')
fld = {
'accountid': accountid,
'stratregy_group': data.get('straregy_group', '-'),
'strategy_name': data.get('strategy_name'),
'datetime': dt.strftime("%Y-%m-%d %H:%M:%S"),
'inited': True,
'trading': True
}
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_STRATEGY_POS_COL, fld=fld, data=data)
account_id = data.get('accountid')
fld = {
'account_id': account_id,
'strategy_group': data.get('strategy_group', '-'),
'strategy_name': data.get('strategy_name')
}
pos_data = copy.copy(data)
pos_data.update({'account_id': pos_data.get('accountid')})
pos_data.update({'datetime': dt.strftime("%Y-%m-%d %H:%M:%S")})
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_STRATEGY_POS_COL, fld=fld, data=pos_data)
def process_gw_error(self, event: Event):
""" 处理gw的回报错误日志"""
@ -512,7 +559,7 @@ class AccountRecorder(BaseEngine):
d.update({'log_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")})
d.update({'trading_day': get_trading_date()})
account_id = self.gw_name_acct_id.get(data.gatewayName, None)
account_id = self.gw_name_acct_id.get(data.gateway_name, None)
if account_id:
d.update({'account_id': account_id})
fld = copy.copy(d)
@ -579,10 +626,12 @@ class AccountRecorder(BaseEngine):
"""启动"""
self.active = True
self.thread.start()
self.write_log(f'账号记录引擎启动')
# ----------------------------------------------------------------------
def stop(self):
"""退出"""
self.write_log(f'账号记录引擎退出')
if self.mongo_db:
self.mongo_db = None

View File

@ -19,6 +19,7 @@ from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from functools import lru_cache
from uuid import uuid1
from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
@ -40,6 +41,7 @@ from vnpy.trader.event import (
EVENT_TRADE,
EVENT_POSITION,
EVENT_STRATEGY_POS,
EVENT_STRATEGY_SNAPSHOT
)
from vnpy.trader.constant import (
Direction,
@ -1250,6 +1252,15 @@ class CtaEngine(BaseEngine):
pickle.dump(snapshot, f)
self.write_log(u'切片保存成功:{}'.format(str(snapshot_file)))
# 通过事件方式传导到account_recorder
snapshot.update({
'account_id': self.engine_config.get('accountid', '-'),
'strategy_group': self.engine_config.get('strategy_group', self.engine_name),
'guid': str(uuid1())
})
event = Event(EVENT_STRATEGY_SNAPSHOT, snapshot)
self.event_engine.put(event)
except Exception as ex:
self.write_error(u'获取策略{}切片数据异常:'.format(strategy_name, str(ex)))
self.write_error(traceback.format_exc())
@ -1419,11 +1430,10 @@ class CtaEngine(BaseEngine):
for strategy_name in list(self.strategies.keys()):
d = OrderedDict()
d['accountid'] = self.engine_config.get('accountid', '-')
d['strategy_group'] = self.engine_config.get('strategy_group', '-')
d['strategy_group'] = self.engine_config.get('strategy_group', self.engine_name)
d['strategy_name'] = strategy_name
dt = datetime.now()
d['date'] = dt.strftime('%Y%m%d')
d['hour'] = dt.hour
d['trading_day'] = dt.strftime('%Y-%m-%d')
d['datetime'] = datetime.now()
strategy = self.strategies.get(strategy_name)
d['inited'] = strategy.inited

View File

@ -815,6 +815,7 @@ class CtaFutureTemplate(CtaTemplate):
# 开仓完毕( buy, short)
else:
grid.open_status = True
grid.open_time = self.cur_datetime
self.write_log(f'{grid.direction.value}单已开仓完毕,order_price:{order.price}'
+ f',volume:{order.volume}')
@ -1312,36 +1313,41 @@ class CtaFutureTemplate(CtaTemplate):
up_grids_info = ""
for grid in list(self.gt.up_grids):
if not grid.open_status and grid.order_status:
up_grids_info += f'平空中: [已平:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n'
up_grids_info += f'平空中: [已平:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}]\n'
if len(grid.order_ids) > 0:
up_grids_info += f'委托单号:{grid.order_ids}'
continue
if grid.open_status and not grid.order_status:
up_grids_info += f'持空中: [数量:{grid.volume}\n, 开仓时间:{grid.open_time}'
up_grids_info += f'持空中: [数量:{grid.volume}\n, 开仓时间:{grid.open_time}]\n'
continue
if not grid.open_status and grid.order_status:
up_grids_info += f'开空中: [已开:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n'
up_grids_info += f'开空中: [已开:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}]\n'
if len(grid.order_ids) > 0:
up_grids_info += f'委托单号:{grid.order_ids}'
dn_grids_info = ""
for grid in list(self.gt.dn_grids):
if not grid.open_status and grid.order_status:
up_grids_info += f'平多中: [已平:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n'
dn_grids_info += f'平多中: [已平:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}]\n'
if len(grid.order_ids) > 0:
up_grids_info += f'委托单号:{grid.order_ids}'
dn_grids_info += f'委托单号:{grid.order_ids}'
continue
if grid.open_status and not grid.order_status:
up_grids_info += f'持多中: [数量:{grid.volume}\n, 开仓时间:{grid.open_time}'
dn_grids_info += f'持多中: [数量:{grid.volume}\n, 开仓时间:{grid.open_time}]\n'
continue
if not grid.open_status and grid.order_status:
up_grids_info += f'开多中: [已开:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n'
dn_grids_info += f'开多中: [已开:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}]\n'
if len(grid.order_ids) > 0:
up_grids_info += f'委托单号:{grid.order_ids}'
dn_grids_info += f'委托单号:{grid.order_ids}'
if len(up_grids_info) > 0:
self.write_log(up_grids_info)
if len(dn_grids_info) > 0:
self.write_log(dn_grids_info)
def display_tns(self):
"""显示事务的过程记录=》 log"""

View File

@ -202,9 +202,12 @@ class StrategyManager(QtWidgets.QFrame):
reload_button = QtWidgets.QPushButton("重载")
reload_button.clicked.connect(self.reload_strategy)
save_button = QtWidgets.QPushButton("")
save_button = QtWidgets.QPushButton("")
save_button.clicked.connect(self.save_strategy)
snapshot_button = QtWidgets.QPushButton("切片")
snapshot_button.clicked.connect(self.save_snapshot)
strategy_name = self._data["strategy_name"]
vt_symbol = self._data["vt_symbol"]
class_name = self._data["class_name"]
@ -227,6 +230,7 @@ class StrategyManager(QtWidgets.QFrame):
hbox.addWidget(remove_button)
hbox.addWidget(reload_button)
hbox.addWidget(save_button)
hbox.addWidget(snapshot_button)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(label)
@ -279,8 +283,12 @@ class StrategyManager(QtWidgets.QFrame):
self.cta_engine.reload_strategy(self.strategy_name)
def save_strategy(self):
"""保存K线缓存"""
self.cta_engine.save_strategy_data(self.strategy_name)
def save_snapshot(self):
""" 保存切片"""
self.cta_engine.save_strategy_snapshot(self.strategy_name)
class DataMonitor(QtWidgets.QTableWidget):
"""

View File

@ -19,6 +19,7 @@ from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from functools import lru_cache
from uuid import uuid1
from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
@ -37,6 +38,7 @@ from vnpy.trader.event import (
EVENT_TRADE,
EVENT_POSITION,
EVENT_STRATEGY_POS,
EVENT_STRATEGY_SNAPSHOT
)
from vnpy.trader.constant import (
Direction,
@ -203,21 +205,27 @@ class CtaEngine(BaseEngine):
def process_timer_event(self, event: Event):
""" 处理定时器事件"""
all_trading = True
# 触发每个策略的定时接口
for strategy in list(self.strategies.values()):
strategy.on_timer()
if not strategy.trading:
all_trading = False
dt = datetime.now()
if self.last_minute != dt.minute:
self.last_minute = dt.minute
# 主动获取所有策略得持仓信息
all_strategy_pos = self.get_all_strategy_pos()
if all_trading:
# 主动获取所有策略得持仓信息
all_strategy_pos = self.get_all_strategy_pos()
# 推送到事件
self.put_all_strategy_pos_event(all_strategy_pos)
# 比对仓位,使用上述获取得持仓信息,不用重复获取
self.compare_pos(strategy_pos_list=copy(all_strategy_pos))
# 推送到事件
self.put_all_strategy_pos_event(all_strategy_pos)
def process_tick_event(self, event: Event):
"""处理tick到达事件"""
@ -867,6 +875,7 @@ class CtaEngine(BaseEngine):
self.write_log(msg=msg,
strategy_name=strategy.strategy_name,
level=logging.CRITICAL)
self.send_wechat(msg)
def add_strategy(
self, class_name: str,
@ -1197,6 +1206,15 @@ class CtaEngine(BaseEngine):
pickle.dump(snapshot, f)
self.write_log(u'切片保存成功:{}'.format(str(snapshot_file)))
# 通过事件方式传导到account_recorder
snapshot.update({
'account_id': self.engine_config.get('accountid', '-'),
'strategy_group': self.engine_config.get('strategy_group', self.engine_name),
'guid': str(uuid1())
})
event = Event(EVENT_STRATEGY_SNAPSHOT, snapshot)
self.event_engine.put(event)
except Exception as ex:
self.write_error(u'获取策略{}切片数据异常:'.format(strategy_name, str(ex)))
self.write_error(traceback.format_exc())

View File

@ -636,6 +636,8 @@ class CtaLineBar(object):
self.line_bd_fast = [] # 波段快线
self.line_bd_slow = [] # 波段慢线
self.cur_bd_count = 0 # 当前波段快线慢线金叉死叉, +金叉计算, - 死叉技术
self._bd_fast = 0
self._bd_slow = 0
def set_params(self, setting: dict = {}):
"""设置参数"""
@ -3492,7 +3494,7 @@ class CtaLineBar(object):
:param:direction检查是否有顶背离检查是否有底背离
:return:
"""
if len(self.skd_top_list) < 2 or len(self.skd_buttom_list) < 2 or self._rt_sk is None or self._rt_sd is None:
if len(self.skd_top_list) < 2 or len(self.skd_buttom_list) < 2:
return False
t1 = self.skd_top_list[-1]
@ -3501,6 +3503,8 @@ class CtaLineBar(object):
b2 = self.__get_2nd_item(self.skd_buttom_list[:-1])
if runtime:
if self._rt_sk is None or self._rt_sd is None:
return False
# 峰(顶部)
if self._rt_sk < self.line_sk[-1] and self.line_sk[-2] < self.line_sk[-1]:
t1 = {}
@ -3610,10 +3614,12 @@ class CtaLineBar(object):
检查SDK的方向风险
:return:
"""
if not self.para_active_skd or len(self.line_sk) < 2 or self._rt_sk is None:
if not self.para_active_skd or len(self.line_sk) < 2 :
return False
if runtime:
if self._rt_sk is None:
return False
sk = self._rt_sk
else:
sk = self.line_sk[-1]
@ -4057,6 +4063,46 @@ class CtaLineBar(object):
elif self.line_bd_fast[-1] < self.line_bd_slow[-1]:
self.cur_bd_count = min(-1, self.cur_bd_count - 1)
def rt_count_bd(self):
"""实时计算波段指标"""
if self.para_bd_len <= 0:
# 不计算
return
if len(self.line_bar) < 2 * self.para_bd_len:
return
bar_mid4 = (self.line_bar[-1].close_price * 2 + self.line_bar[-1].high_price + self.line_bar[-1].low_price)/4
bar_mid4 = round(bar_mid4, self.round_n)
mid4_array = np.append(self.mid4_array, [bar_mid4])
mid4_ema_array = ta.EMA(mid4_array, self.para_bd_len)
mid4_std = np.std(mid4_array[-self.para_bd_len:], ddof=1)
mid4_ema_diff_array = mid4_array - mid4_ema_array
var5_array = (mid4_ema_diff_array / mid4_std * 100 + 200) / 4
var6_array = (ta.EMA(var5_array, 5) - 25) * 1.56
fast_array = ta.EMA(var6_array, 2) * 1.22
slow_array = ta.EMA(fast_array, 2)
self._bd_fast = fast_array[-1]
self._bd_slow = slow_array[-1]
@property
def rt_bd_fast(self):
self.check_rt_funcs(self.rt_count_bd)
if self._bd_fast is None and len(self.para_bd_len) > 0:
return self.line_bd_fast[-1]
return self._bd_fast
@property
def rt_bd_slow(self):
self.check_rt_funcs(self.rt_count_bd)
if self._bd_slow is None and len(self.para_bd_len) > 0:
return self.line_bd_slow[-1]
return self._bd_slow
def write_log(self, content):
"""记录CTA日志"""
self.strategy.write_log(u'[' + self.name + u']' + content)

View File

@ -214,6 +214,8 @@ class BinancefRestApi(RestClient):
self.orders = {}
self.accountid = ""
def sign(self, request: Request) -> Request:
"""
Generate BINANCE signature.
@ -398,6 +400,9 @@ class BinancefRestApi(RestClient):
orderid,
self.gateway_name
)
order.accountid = self.accountid
order.vt_accountid = f"{self.gateway_name}.{self.accountid}"
order.datetime = datetime.now()
self.orders.update({orderid: copy(order)})
self.gateway.write_log(f'委托返回订单更新:{order.__dict__}')
self.gateway.on_order(order)
@ -510,14 +515,19 @@ class BinancefRestApi(RestClient):
"walletBalance": "9.19485176" // 账户余额
}"""
# self.gateway.write_log(print_dict(asset))
if asset['asset'] != "USDT":
continue
if not self.accountid:
self.accountid = f"{self.gateway_name}_{asset['asset']}"
account = AccountData(
accountid=f"{self.gateway_name}_{asset['asset']}",
accountid=self.accountid,
balance=float(asset["marginBalance"]),
frozen=float(asset["maintMargin"]),
holding_profit=float(asset['unrealizedProfit']),
currency='USDT',
margin=float(asset["initialMargin"]),
gateway_name=self.gateway_name
gateway_name=self.gateway_name,
trading_day=datetime.now().strftime('%Y-%m-%d')
)
if account.balance:
@ -536,13 +546,16 @@ class BinancefRestApi(RestClient):
def on_query_position(self, data: dict, request: Request) -> None:
""""""
for d in data:
# self.gateway.write_log(d)
volume = float(d["positionAmt"])
position = PositionData(
accountid=self.accountid,
symbol=d["symbol"],
exchange=Exchange.BINANCE,
direction=Direction.NET,
volume=volume,
price=float(d["entryPrice"]),
cur_price=float(d["markPrice"]),
pnl=float(d["unRealizedProfit"]),
gateway_name=self.gateway_name,
)
@ -557,7 +570,9 @@ class BinancefRestApi(RestClient):
time = dt.strftime("%Y-%m-%d %H:%M:%S")
order = OrderData(
accountid=self.accountid,
orderid=d["clientOrderId"],
sys_orderid=str(d["orderId"]),
symbol=d["symbol"],
exchange=Exchange.BINANCE,
price=float(d["price"]),
@ -566,6 +581,7 @@ class BinancefRestApi(RestClient):
direction=DIRECTION_BINANCEF2VT[d["side"]],
traded=float(d["executedQty"]),
status=STATUS_BINANCEF2VT.get(d["status"], None),
datetime=dt,
time=time,
gateway_name=self.gateway_name,
)
@ -582,6 +598,7 @@ class BinancefRestApi(RestClient):
time = dt.strftime("%Y-%m-%d %H:%M:%S")
trade = TradeData(
accountid=self.accountid,
symbol=d['symbol'],
exchange=Exchange.BINANCE,
orderid=d['orderId'],
@ -655,6 +672,11 @@ class BinancefRestApi(RestClient):
order.status = Status.REJECTED
self.orders.update({order.orderid: copy(order)})
self.gateway.write_log(f'订单委托失败:{order.__dict__}')
if not order.accountid:
order.accountid = self.accountid
order.vt_accountid = f"{self.gateway_name}.{self.accountid}"
if not order.datetime:
order.datetime = datetime.now()
self.gateway.on_order(order)
msg = f"委托失败,状态码:{status_code},信息:{request.response.text}"
@ -670,6 +692,11 @@ class BinancefRestApi(RestClient):
order.status = Status.REJECTED
self.orders.update({order.orderid: copy(order)})
self.gateway.write_log(f'发送订单异常:{order.__dict__}')
if not order.accountid:
order.accountid = self.accountid
order.vt_accountid = f"{self.gateway_name}.{self.accountid}"
if not order.datetime:
order.datetime = datetime.now()
self.gateway.on_order(order)
msg = f"委托失败,拒单"
@ -784,6 +811,7 @@ class BinancefTradeWebsocketApi(WebsocketClient):
self.gateway: BinancefGateway = gateway
self.gateway_name: str = gateway.gateway_name
self.accountid = ""
def connect(self, url: str, proxy_host: str, proxy_port: int) -> None:
""""""
@ -836,9 +864,12 @@ class BinancefTradeWebsocketApi(WebsocketClient):
# 计算持仓收益
holding_pnl = 0
for pos_data in packet["a"]["P"]:
print(pos_data)
# print(pos_data)
volume = float(pos_data["pa"])
if not self.accountid:
self.accountid = f"{self.gateway_name}_USDT"
position = PositionData(
accountid=self.accountid,
symbol=pos_data["s"],
exchange=Exchange.BINANCE,
direction=Direction.NET,
@ -851,12 +882,16 @@ class BinancefTradeWebsocketApi(WebsocketClient):
self.gateway.on_position(position)
for acc_data in packet["a"]["B"]:
if acc_data['a'] != 'USDT':
continue
account = AccountData(
accountid=f"{self.gateway_name}_{acc_data['a']}",
accountid=self.accountid,
balance=round(float(acc_data["wb"]), 7),
frozen=float(acc_data["wb"]) - float(acc_data["cw"]),
holding_profit=round(holding_pnl, 7),
gateway_name=self.gateway_name
currency='USDT',
gateway_name=self.gateway_name,
trading_day=datetime.now().strftime('%Y-%m-%d')
)
if account.balance:
@ -884,15 +919,18 @@ class BinancefTradeWebsocketApi(WebsocketClient):
else:
self.gateway.write_log(u'缓存中找不到Order,创建一个新的')
order = OrderData(
accountid=self.accountid,
symbol=ord_data["s"],
exchange=Exchange.BINANCE,
orderid=str(ord_data["c"]),
sys_orderid=str(ord_data["i"]),
type=ORDERTYPE_BINANCEF2VT[ord_data["o"]],
direction=DIRECTION_BINANCEF2VT[ord_data["S"]],
price=float(ord_data["p"]),
volume=float(ord_data["q"]),
traded=float(ord_data["z"]),
status=STATUS_BINANCEF2VT[ord_data["X"]],
datetime=dt,
time=time,
gateway_name=self.gateway_name
)
@ -908,6 +946,7 @@ class BinancefTradeWebsocketApi(WebsocketClient):
trade_time = trade_dt.strftime("%Y-%m-%d %H:%M:%S")
trade = TradeData(
accountid=self.accountid,
symbol=order.symbol,
exchange=order.exchange,
orderid=order.orderid,

View File

@ -640,6 +640,8 @@ class CtpTdApi(TdApi):
self.sysid_orderid_map = {}
self.future_contract_changed = False
self.accountid = ""
def onFrontConnected(self):
""""""
self.gateway.write_log("交易服务器连接成功")
@ -754,6 +756,7 @@ class CtpTdApi(TdApi):
position = self.positions.get(key, None)
if not position:
position = PositionData(
accountid=self.accountid,
symbol=data["InstrumentID"],
exchange=symbol_exchange_map[data["InstrumentID"]],
direction=DIRECTION_CTP2VT[data["PosiDirection"]],
@ -800,6 +803,8 @@ class CtpTdApi(TdApi):
""""""
if "AccountID" not in data:
return
if not self.accountid:
self.accountid = data['AccountID']
account = AccountData(
accountid=data["AccountID"],

View File

@ -111,7 +111,7 @@ class MainEngine:
if app.app_name == "RiskManager":
self.rm_engine = engine
elif app.app_name == "AlgoTrading":
self.algo_engine == engine
self.algo_engine = engine
elif app.app_name == 'RpcService':
self.rpc_service = engine

View File

@ -15,6 +15,7 @@ EVENT_LOG = "eLog"
# 扩展
EVENT_BAR = "eBar."
EVENT_STRATEGY_POS = "eStrategyPos."
EVENT_STRATEGY_SNAPSHOT = "eStrategySnapshot."
# 拓展, 支持股票账号中,历史交成交/历史委托/资金流水
EVENT_HISTORY_TRADE = 'eHistoryTrade.'

View File

@ -136,7 +136,7 @@ class OrderData(BaseData):
exchange: Exchange
orderid: str
sys_orderid: str = ""
accountid: str = ""
type: OrderType = OrderType.LIMIT
direction: Direction = ""
offset: Offset = Offset.NONE
@ -144,6 +144,7 @@ class OrderData(BaseData):
volume: float = 0
traded: float = 0
status: Status = Status.SUBMITTING
datetime: datetime = None
time: str = ""
cancel_time: str = ""
@ -151,6 +152,7 @@ class OrderData(BaseData):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
self.vt_orderid = f"{self.gateway_name}.{self.orderid}"
self.vt_accountid = f"{self.gateway_name}.{self.accountid}"
def is_active(self) -> bool:
"""
@ -183,6 +185,7 @@ class TradeData(BaseData):
orderid: str
tradeid: str
sys_orderid: str = ""
accountid: str = ""
direction: Direction = ""
@ -204,7 +207,7 @@ class TradeData(BaseData):
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
self.vt_orderid = f"{self.gateway_name}.{self.orderid}"
self.vt_tradeid = f"{self.gateway_name}.{self.tradeid}"
self.vt_accountid = f"{self.gateway_name}.{self.accountid}"
@dataclass
class PositionData(BaseData):
@ -215,12 +218,13 @@ class PositionData(BaseData):
symbol: str
exchange: Exchange
direction: Direction
accountid: str = "" # 账号id
volume: float = 0
frozen: float = 0
price: float = 0
pnl: float = 0
yd_volume: float = 0
cur_price: float = 0 # 当前价
# 股票相关
holder_id: str = "" # 股东代码
@ -229,7 +233,7 @@ class PositionData(BaseData):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
self.vt_positionid = f"{self.gateway_name}.{self.vt_symbol}.{self.direction.value}"
self.vt_accountid = f"{self.gateway_name}.{self.accountid}"
@dataclass
class AccountData(BaseData):

View File

@ -73,7 +73,10 @@ class EnumCell(BaseCell):
Set text using enum.constant.value.
"""
if content:
super(EnumCell, self).set_content(content.value, data)
if isinstance(content, str):
super(EnumCell, self).set_content(content, data)
else:
super(EnumCell, self).set_content(content.value, data)
class DirectionCell(EnumCell):