[update] bug fix

This commit is contained in:
msincenselee 2021-01-09 10:14:08 +08:00
parent 795d83a6b2
commit c5a46e49a1
18 changed files with 1500 additions and 458 deletions

View File

@ -101,6 +101,8 @@ class AccountRecorder(BaseEngine):
self.copy_history_strategypos = [] # 需要复制至历史策略持仓得gateway名称 self.copy_history_strategypos = [] # 需要复制至历史策略持仓得gateway名称
self.timer_count = 0 self.timer_count = 0
self.event_list = [] # 只侦听处理的事件
self.gw_name_acct_id = {} self.gw_name_acct_id = {}
self.cur_trading_date = "" self.cur_trading_date = ""
@ -147,6 +149,8 @@ class AccountRecorder(BaseEngine):
self.account_dict = d.get('accounts', {}) self.account_dict = d.get('accounts', {})
self.is_7x24 = d.get('is_7x24', False) self.is_7x24 = d.get('is_7x24', False)
self.event_list = d.get('event_list',[])
# 识别配置,检查账号是否需要复制委托/成交到历史表 # 识别配置,检查账号是否需要复制委托/成交到历史表
for gateway_name, account_setting in self.account_dict.items(): for gateway_name, account_setting in self.account_dict.items():
if account_setting.get('copy_history_orders', False): if account_setting.get('copy_history_orders', False):
@ -172,18 +176,46 @@ class AccountRecorder(BaseEngine):
def register_event(self): def register_event(self):
"""注册事件监听""" """注册事件监听"""
self.event_engine.register(EVENT_TIMER, self.update_timer) self.event_engine.register(EVENT_TIMER, self.update_timer)
self.event_engine.register(EVENT_ACCOUNT, self.update_account)
self.event_engine.register(EVENT_ORDER, self.update_order) if len(self.event_list) == 0 or EVENT_ACCOUNT in self.event_list:
self.event_engine.register(EVENT_TRADE, self.update_trade) self.write_log(f'绑定EVENT_ACCOUNT事件 =>更新帐号净值')
self.event_engine.register(EVENT_POSITION, self.update_position) self.event_engine.register(EVENT_ACCOUNT, self.update_account)
self.event_engine.register(EVENT_HISTORY_TRADE, self.update_history_trade)
self.event_engine.register(EVENT_HISTORY_ORDER, self.update_history_order) if len(self.event_list) == 0 or EVENT_ORDER in self.event_list:
self.event_engine.register(EVENT_FUNDS_FLOW, self.update_funds_flow) self.write_log(f'绑定EVENT_ORDER事件 =>更新委托事件')
self.event_engine.register(EVENT_STRATEGY_POS, self.update_strategy_pos) self.event_engine.register(EVENT_ORDER, self.update_order)
if len(self.event_list) == 0 or EVENT_TRADE in self.event_list:
self.write_log(f'绑定EVENT_TRADE事件 =>更新成交事件')
self.event_engine.register(EVENT_TRADE, self.update_trade)
if len(self.event_list) == 0 or EVENT_POSITION in self.event_list:
self.write_log(f'绑定EVENT_POSITION事件 =>更新持仓事件')
self.event_engine.register(EVENT_POSITION, self.update_position)
if len(self.event_list) == 0 or EVENT_HISTORY_TRADE in self.event_list:
self.write_log(f'绑定EVENT_HISTORY_TRADE事件 =>更新历史成交')
self.event_engine.register(EVENT_HISTORY_TRADE, self.update_history_trade)
if len(self.event_list) == 0 or EVENT_HISTORY_ORDER in self.event_list:
self.write_log(f'绑定EVENT_HISTORY_ORDER事件 =>更新历史委托')
self.event_engine.register(EVENT_HISTORY_ORDER, self.update_history_order)
if len(self.event_list) == 0 or EVENT_FUNDS_FLOW in self.event_list:
self.write_log(f'绑定EVENT_FUNDS_FLOW事件 =>更新资金流')
self.event_engine.register(EVENT_FUNDS_FLOW, self.update_funds_flow)
if len(self.event_list) == 0 or EVENT_STRATEGY_POS in self.event_list:
self.write_log(f'绑定EVENT_STRATEGY_POS事件 =>更新策略持仓')
self.event_engine.register(EVENT_STRATEGY_POS, self.update_strategy_pos)
self.event_engine.register(EVENT_ERROR, self.process_gw_error) self.event_engine.register(EVENT_ERROR, self.process_gw_error)
self.event_engine.register(EVENT_WARNING, self.process_warning) self.event_engine.register(EVENT_WARNING, self.process_warning)
self.event_engine.register(EVENT_CRITICAL, self.process_critical) self.event_engine.register(EVENT_CRITICAL, self.process_critical)
self.event_engine.register(EVENT_STRATEGY_SNAPSHOT, self.update_strategy_snapshot)
if len(self.event_list) == 0 or EVENT_STRATEGY_SNAPSHOT in self.event_list:
self.write_log(f'绑定EVENT_STRATEGY_SNAPSHOT事件 =>更新策略切片')
self.event_engine.register(EVENT_STRATEGY_SNAPSHOT, self.update_strategy_snapshot)
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def update_timer(self, event: Event): def update_timer(self, event: Event):
@ -200,24 +232,25 @@ class AccountRecorder(BaseEngine):
dt_now = datetime.now() dt_now = datetime.now()
# 提交查询历史交易记录/历史委托/资金流水 if len(self.event_list) == 0 or 'eHistoryTrade.' in self.event_list:
for data_type in [HISTORY_ORDER_COL, HISTORY_TRADE_COL, FUNDS_FLOW_COL]: # 提交查询历史交易记录/历史委托/资金流水
dt = self.last_qry_dict.get(data_type, None) for data_type in [HISTORY_ORDER_COL, HISTORY_TRADE_COL, FUNDS_FLOW_COL]:
queryed = False dt = self.last_qry_dict.get(data_type, None)
for gw_name, account_info in self.account_dict.items(): queryed = False
if dt is None or (dt_now - dt).total_seconds() > 60 * 5: for gw_name, account_info in self.account_dict.items():
begin_day = self.get_begin_day(gw_name, data_type).replace('-', '') if dt is None or (dt_now - dt).total_seconds() > 60 * 5:
end_day = dt_now.strftime('%Y%m%d') begin_day = self.get_begin_day(gw_name, data_type).replace('-', '')
gw = self.main_engine.get_gateway(gw_name) end_day = dt_now.strftime('%Y%m%d')
if gw is None: gw = self.main_engine.get_gateway(gw_name)
continue if gw is None:
if hasattr(gw, 'qryHistory'): continue
self.write_log(u'{}请求{}数据,{}~{}'.format(gw_name, data_type, begin_day, end_day)) if hasattr(gw, 'qryHistory'):
gw.qryHistory(data_type, begin_day, end_day) self.write_log(u'{}请求{}数据,{}~{}'.format(gw_name, data_type, begin_day, end_day))
queryed = True gw.qryHistory(data_type, begin_day, end_day)
queryed = True
if queryed: if queryed:
self.last_qry_dict.update({data_type: dt}) self.last_qry_dict.update({data_type: dt})
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def update_account(self, event: Event): def update_account(self, event: Event):

View File

@ -279,6 +279,7 @@ class CtaEngine(BaseEngine):
strategy = self.orderid_strategy_map.get(order.vt_orderid, None) strategy = self.orderid_strategy_map.get(order.vt_orderid, None)
if not strategy: if not strategy:
self.write_log(f'委托单没有对应的策略设置:order:{order}')
return return
# Remove vt_orderid if order is no longer active. # Remove vt_orderid if order is no longer active.
@ -310,11 +311,13 @@ class CtaEngine(BaseEngine):
# Filter duplicate trade push # Filter duplicate trade push
if trade.vt_tradeid in self.vt_tradeids: if trade.vt_tradeid in self.vt_tradeids:
self.write_log(f'成交单的委托编号不属于本引擎实例:{trade}')
return return
self.vt_tradeids.add(trade.vt_tradeid) self.vt_tradeids.add(trade.vt_tradeid)
strategy = self.orderid_strategy_map.get(trade.vt_orderid, None) strategy = self.orderid_strategy_map.get(trade.vt_orderid, None)
if not strategy: if not strategy:
self.write_log(f'成交单没有对应的策略设置:order:{trade}')
return return
# Update strategy pos before calling on_trade method # Update strategy pos before calling on_trade method

View File

@ -121,6 +121,8 @@ class CtaTemplate(ABC):
'pnl': v.pnl 'pnl': v.pnl
}) })
if len(pos_list) > 0:
self.write_log(f'策略返回持仓信息:{pos_list}')
return pos_list return pos_list
@virtual @virtual
@ -543,8 +545,12 @@ class CtaStockTemplate(CtaTemplate):
self.write_error(f'加载缓存K线数据失败:{str(ex)}') self.write_error(f'加载缓存K线数据失败:{str(ex)}')
return None return None
def get_klines_snapshot(self): def get_klines_snapshot(self, include_kline_names=[]):
"""返回当前klines的切片数据""" """
返回当前klines的切片数据
:param include_kline_names: 如果存在则只保留这些指定得K线
:return:
"""
try: try:
self.write_log(f'获取{self.strategy_name}的切片数据') self.write_log(f'获取{self.strategy_name}的切片数据')
d = { d = {
@ -552,6 +558,9 @@ class CtaStockTemplate(CtaTemplate):
'datetime': datetime.now()} 'datetime': datetime.now()}
klines = {} klines = {}
for kline_name in sorted(self.klines.keys()): for kline_name in sorted(self.klines.keys()):
if len(include_kline_names) > 0:
if kline_name not in include_kline_names:
continue
klines.update({kline_name: self.klines.get(kline_name).get_data()}) klines.update({kline_name: self.klines.get(kline_name).get_data()})
kline_names = list(klines.keys()) kline_names = list(klines.keys())
binary_data = zlib.compress(pickle.dumps(klines)) binary_data = zlib.compress(pickle.dumps(klines))

View File

@ -847,6 +847,11 @@ class BackTestingEngine(object):
# 更新vt_symbol合约与策略的订阅关系 # 更新vt_symbol合约与策略的订阅关系
self.subscribe_symbol(strategy_name=strategy_name, vt_symbol=vt_symbol) self.subscribe_symbol(strategy_name=strategy_name, vt_symbol=vt_symbol)
# 如果idx_symbol不再列表中需要订阅
if 'idx_symbol' in setting.keys() and setting['idx_symbol'] not in self.symbol_strategy_map.keys():
self.write_log(f"新增订阅指数合约:{setting['idx_symbol']}")
self.subscribe_symbol(strategy_name=strategy_name, vt_symbol=setting['idx_symbol'])
if strategy_setting.get('auto_init', False): if strategy_setting.get('auto_init', False):
self.write_log(u'自动初始化策略') self.write_log(u'自动初始化策略')
strategy.on_init() strategy.on_init()
@ -863,6 +868,7 @@ class BackTestingEngine(object):
"""订阅合约""" """订阅合约"""
strategy = self.strategies.get(strategy_name, None) strategy = self.strategies.get(strategy_name, None)
if not strategy: if not strategy:
self.write_log(f'策略{strategy_name}对应的实例不存在,订阅{vt_symbol}失败')
return False return False
# 添加 合约订阅 vt_symbol <=> 策略实例 strategy 映射. # 添加 合约订阅 vt_symbol <=> 策略实例 strategy 映射.

View File

@ -1872,6 +1872,8 @@ class CtaEngine(BaseEngine):
u'{}({})'.format(strategy_pos['strategy_name'], abs(pos.get('volume', 0)))) u'{}({})'.format(strategy_pos['strategy_name'], abs(pos.get('volume', 0))))
self.write_log(u'更新{}策略持多仓=>{}'.format(vt_symbol, symbol_pos.get('策略多单', 0))) self.write_log(u'更新{}策略持多仓=>{}'.format(vt_symbol, symbol_pos.get('策略多单', 0)))
compare_pos.update({vt_symbol: symbol_pos})
pos_compare_result = '' pos_compare_result = ''
# 精简输出 # 精简输出
compare_info = '' compare_info = ''

View File

@ -304,6 +304,7 @@ class CtaTemplate(ABC):
vt_symbol = self.vt_symbol vt_symbol = self.vt_symbol
if not self.trading: if not self.trading:
self.write_log(f'非交易状态')
return [] return []
vt_orderids = self.cta_engine.send_order( vt_orderids = self.cta_engine.send_order(
@ -317,6 +318,11 @@ class CtaTemplate(ABC):
lock=lock, lock=lock,
order_type=order_type order_type=order_type
) )
if len(vt_orderids) == 0:
self.write_error(f'{self.strategy_name}调用cta_engine.send_order委托返回失败,vt_symbol:{vt_symbol}')
# f',direction:{direction.value},offset:{offset.value},'
# f'price:{price},volume:{volume},stop:{stop},lock:{lock},'
# f'order_type:{order_type}')
if order_time is None: if order_time is None:
order_time = datetime.now() order_time = datetime.now()
@ -641,8 +647,6 @@ class CtaProTemplate(CtaTemplate):
if self.idx_symbol is None: if self.idx_symbol is None:
self.idx_symbol = get_underlying_symbol(symbol).upper() + '99.' + self.exchange.value self.idx_symbol = get_underlying_symbol(symbol).upper() + '99.' + self.exchange.value
self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=self.idx_symbol)
if self.vt_symbol != self.idx_symbol: if self.vt_symbol != self.idx_symbol:
self.write_log(f'指数合约:{self.idx_symbol}, 主力合约:{self.vt_symbol}') self.write_log(f'指数合约:{self.idx_symbol}, 主力合约:{self.vt_symbol}')
self.price_tick = self.cta_engine.get_price_tick(self.vt_symbol) self.price_tick = self.cta_engine.get_price_tick(self.vt_symbol)
@ -753,6 +757,7 @@ class CtaProTemplate(CtaTemplate):
""" """
self.write_log(u'init_position(),初始化持仓') self.write_log(u'init_position(),初始化持仓')
pos_symbols = set() pos_symbols = set()
remove_ids = []
if len(self.gt.up_grids) <= 0: if len(self.gt.up_grids) <= 0:
self.position.short_pos = 0 self.position.short_pos = 0
# 加载已开仓的空单数据网格JSON # 加载已开仓的空单数据网格JSON
@ -770,6 +775,14 @@ class CtaProTemplate(CtaTemplate):
sg.order_ids = [] sg.order_ids = []
short_symbol = sg.snapshot.get('mi_symbol', self.vt_symbol) short_symbol = sg.snapshot.get('mi_symbol', self.vt_symbol)
if sg.traded_volume > 0:
if sg.open_status and sg.volume== sg.traded_volume:
msg = f'{self.strategy_name} {short_symbol}空单持仓{sg.volume},已成交:{sg.traded_volume},不加载'
self.write_log(msg)
self.send_wechat(msg)
remove_ids.append(sg.id)
continue
pos_symbols.add(short_symbol) pos_symbols.add(short_symbol)
self.write_log(u'加载持仓空单[ID:{},vt_symbol:{},价格:{}],[指数:{},价格:{}],数量:{}' self.write_log(u'加载持仓空单[ID:{},vt_symbol:{},价格:{}],[指数:{},价格:{}],数量:{}'
.format(sg.id, short_symbol, sg.snapshot.get('open_price'), .format(sg.id, short_symbol, sg.snapshot.get('open_price'),
@ -778,6 +791,11 @@ class CtaProTemplate(CtaTemplate):
self.write_log(u'持久化空单,共持仓:{}'.format(abs(self.position.short_pos))) self.write_log(u'持久化空单,共持仓:{}'.format(abs(self.position.short_pos)))
if len(remove_ids) > 0:
self.gt.remove_grids_by_ids(direction=Direction.SHORT,ids=remove_ids)
remove_ids = []
if len(self.gt.dn_grids) <= 0: if len(self.gt.dn_grids) <= 0:
# 加载已开仓的多数据网格JSON # 加载已开仓的多数据网格JSON
self.position.long_pos = 0 self.position.long_pos = 0
@ -795,6 +813,14 @@ class CtaProTemplate(CtaTemplate):
lg.order_ids = [] lg.order_ids = []
# lg.type = self.line.name # lg.type = self.line.name
long_symbol = lg.snapshot.get('mi_symbol', self.vt_symbol) long_symbol = lg.snapshot.get('mi_symbol', self.vt_symbol)
if lg.traded_volume > 0:
if lg.open_status and lg.volume == lg.traded_volume:
msg = f'{self.strategy_name} {long_symbol}多单持仓{lg.volume},已成交:{lg.traded_volume},不加载'
self.write_log(msg)
self.send_wechat(msg)
remove_ids.append(lg.id)
continue
pos_symbols.add(long_symbol) pos_symbols.add(long_symbol)
self.write_log(u'加载持仓多单[ID:{},vt_symbol:{},价格:{}],[指数{},价格:{}],数量:{}' self.write_log(u'加载持仓多单[ID:{},vt_symbol:{},价格:{}],[指数{},价格:{}],数量:{}'
@ -804,6 +830,9 @@ class CtaProTemplate(CtaTemplate):
self.write_log(f'持久化多单,共持仓:{self.position.long_pos}') self.write_log(f'持久化多单,共持仓:{self.position.long_pos}')
if len(remove_ids) > 0:
self.gt.remove_grids_by_ids(direction=Direction.LONG,ids=remove_ids)
self.position.pos = self.position.long_pos + self.position.short_pos self.position.pos = self.position.long_pos + self.position.short_pos
self.write_log(u'{}加载持久化数据完成,多单:{},空单:{},共:{}' self.write_log(u'{}加载持久化数据完成,多单:{},空单:{},共:{}'
@ -815,15 +844,15 @@ class CtaProTemplate(CtaTemplate):
self.gt.save() self.gt.save()
self.display_grids() self.display_grids()
if not self.backtesting: #if not self.backtesting:
if self.vt_symbol not in pos_symbols: if len(self.vt_symbol) > 0 and self.vt_symbol not in pos_symbols:
pos_symbols.add(self.vt_symbol) pos_symbols.add(self.vt_symbol)
if self.idx_symbol not in pos_symbols: if len(self.idx_symbol) > 0 and self.idx_symbol not in pos_symbols:
pos_symbols.add(self.idx_symbol) pos_symbols.add(self.idx_symbol)
# 如果持仓的合约不在self.vt_symbol中需要订阅 # 如果持仓的合约不在self.vt_symbol中需要订阅
for symbol in list(pos_symbols): for symbol in list(pos_symbols):
self.write_log(f'新增订阅合约:{symbol}') self.write_log(f'新增订阅合约:{symbol}')
self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=symbol) self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=symbol)
def get_positions(self): def get_positions(self):
""" """
@ -888,10 +917,16 @@ class CtaProTemplate(CtaTemplate):
if len(self.active_orders) < 1: if len(self.active_orders) < 1:
self.entrust = 0 self.entrust = 0
def tns_switch_long_pos(self): def tns_switch_long_pos(self, open_new=True):
"""切换合约,从持仓的非主力合约,切换至主力合约""" """
切换合约从持仓的非主力合约切换至主力合约
:param open_new: 是否开仓主力合约
:return:
"""
if self.entrust != 0 and self.position.long_pos == 0: if self.entrust != 0:
return
if self.position.long_pos == 0:
return return
if self.cur_mi_price == 0: if self.cur_mi_price == 0:
@ -956,6 +991,14 @@ class CtaProTemplate(CtaTemplate):
return return
none_mi_grid.snapshot.update({'switched': True}) none_mi_grid.snapshot.update({'switched': True})
# 如果不买入新主力合约,直接返回
# 某些策略会自动重新开仓得
if not open_new:
self.write_log(f'不买入新的主力合约:{self.vt_symbol},数量:{grid.volume}')
self.gt.save()
return
# 添加买入主力合约 # 添加买入主力合约
grid.snapshot.update({'mi_symbol': self.vt_symbol, 'open_price': self.cur_mi_price}) grid.snapshot.update({'mi_symbol': self.vt_symbol, 'open_price': self.cur_mi_price})
self.gt.dn_grids.append(grid) self.gt.dn_grids.append(grid)
@ -974,9 +1017,15 @@ class CtaProTemplate(CtaTemplate):
else: else:
self.write_error(f'持仓换月=>委托卖出非主力合约:{none_mi_symbol}失败') self.write_error(f'持仓换月=>委托卖出非主力合约:{none_mi_symbol}失败')
def tns_switch_short_pos(self): def tns_switch_short_pos(self,open_new=True):
"""切换合约,从持仓的非主力合约,切换至主力合约""" """
if self.entrust != 0 and self.position.short_pos == 0: 切换合约从持仓的非主力合约切换至主力合约
:param open_new: 是否开仓新得主力合约
:return:
"""
if self.entrust != 0:
return
if self.position.short_pos == 0:
return return
if self.cur_mi_price == 0: if self.cur_mi_price == 0:
@ -1029,6 +1078,14 @@ class CtaProTemplate(CtaTemplate):
return return
none_mi_grid.snapshot.update({'switched': True}) none_mi_grid.snapshot.update({'switched': True})
# 如果不开空新主力合约,直接返回
# 某些策略会自动重新开仓得
if not open_new:
self.write_log(f'不开空新的主力合约:{self.vt_symbol},数量:{grid.volume}')
self.gt.save()
return
# 添加卖出主力合约 # 添加卖出主力合约
grid.id = str(uuid.uuid1()) grid.id = str(uuid.uuid1())
grid.snapshot.update({'mi_symbol': self.vt_symbol, 'open_price': self.cur_mi_price}) grid.snapshot.update({'mi_symbol': self.vt_symbol, 'open_price': self.cur_mi_price})

View File

@ -11,5 +11,5 @@ class IndexTickPublisherApp(BaseApp):
app_name = APP_NAME app_name = APP_NAME
app_module = __module__ app_module = __module__
app_path = Path(__file__).parent app_path = Path(__file__).parent
display_name = u'指数行情推送' display_name = u'期货指数行情推送'
engine_class = IndexTickPublisher engine_class = IndexTickPublisher

File diff suppressed because it is too large Load Diff

View File

@ -160,6 +160,16 @@ class CtaRenkoBar(object):
# K 线的相关计算结果数据 # K 线的相关计算结果数据
self.line_pre_high = [] # K线的前para_pre_len的的最高 self.line_pre_high = [] # K线的前para_pre_len的的最高
self.line_pre_low = [] # K线的前para_pre_len的的最低 self.line_pre_low = [] # K线的前para_pre_len的的最低
# 唐其安高点、低点清单(相当于缠论的分型)
self.tqn_high_list = [] # 所有的创新高的高点(分型)清单 { "price":xxx, "datetime": "yyyy-mm-dd HH:MM:SS"}
self.tqn_low_list = [] # 所有的创新低的低点(分型)清单 { "price":xxx, "datetime": "yyyy-mm-dd HH:MM:SS"}
# 唐其安笔清单,相当与缠论的笔,最后一笔是未完成的
self.tqn_bi_list = []
# 唐其安中枢清单,相当于缠论的中枢
self.cur_tqn_zs = {} # 当前唐其安中枢。
self.tqn_zs_list = []
self.line_ma1 = [] # K线的MA1均线周期是InputMaLen1不包含当前bar self.line_ma1 = [] # K线的MA1均线周期是InputMaLen1不包含当前bar
self.line_ma2 = [] # K线的MA2均线周期是InputMaLen2不包含当前bar self.line_ma2 = [] # K线的MA2均线周期是InputMaLen2不包含当前bar
self.line_ma3 = [] # K线的MA2均线周期是InputMaLen2不包含当前bar self.line_ma3 = [] # K线的MA2均线周期是InputMaLen2不包含当前bar
@ -431,6 +441,9 @@ class CtaRenkoBar(object):
self.export_filename = None self.export_filename = None
self.export_fields = [] self.export_fields = []
self.export_bi_filename = None # 通过唐其安通道输出得笔csv文件(不是缠论得笔)
self.export_zs_filename = None # 通过唐其安通道输出的中枢csv文件不是缠论的笔中枢
# 启动实时得函数 # 启动实时得函数
self.rt_funcs = set() self.rt_funcs = set()
@ -761,10 +774,10 @@ class CtaRenkoBar(object):
new_height = int( new_height = int(
max(cur_price / 1000, self.price_tick) * self.kilo_height / self.price_tick) * self.price_tick max(cur_price / 1000, self.price_tick) * self.kilo_height / self.price_tick) * self.price_tick
if new_height != self.height: if new_height != self.height:
self.write_log(u'修改:{}砖块高度:{}=>{}'.format(self.name, self.height, new_height)) #self.write_log(u'修改:{}砖块高度:{}=>{}'.format(self.name, self.height, new_height))
self.height = new_height self.height = new_height
elif height != self.height: elif height != self.height:
self.write_log(u'修改:{}砖块高度:{}=>{}'.format(self.name, self.height, height)) #self.write_log(u'修改:{}砖块高度:{}=>{}'.format(self.name, self.height, height))
self.height = height self.height = height
def runtime_recount(self): def runtime_recount(self):
@ -1213,6 +1226,440 @@ class CtaRenkoBar(object):
del self.line_pre_low[0] del self.line_pre_low[0]
self.line_pre_low.append(pre_low) self.line_pre_low.append(pre_low)
if len(self.line_pre_high) < 2 and len(self.line_pre_low) < 2:
return
# 产生新得高点
if pre_high > self.line_pre_high[-2] and pre_low == self.line_pre_low[-2]:
d = {
'price': pre_high,
'datetime': self.cur_datetime.strftime('%Y-%m-%d %H:%M:%S')
}
# 顺便记录下MACD & DIF 便于判断是否顶背离
if len(self.line_dif) > 0:
d.update({'dif': self.line_dif[-1]})
if len(self.line_macd) > 0:
d.update({'macd': self.line_macd[-1]})
# 当前不存在最后的高点,创建一个
if len(self.tqn_high_list) == 0:
self.tqn_high_list.append(d)
return
# 如果存在最后的高点,最后的低点
last_low_time = self.tqn_low_list[-1].get('datetime', None) if len(self.tqn_low_list) > 0 else None
last_high_time = self.tqn_high_list[-1].get('datetime', None) if len(
self.tqn_high_list) > 0 else None
last_high_price = self.tqn_high_list[-1].get('price') if len(
self.tqn_high_list) > 0 else None
# 低点的时间,比高点的时间更晚, 添加一个新的高点
if last_low_time is not None and last_high_time is not None and last_high_time < last_low_time:
# 添加一个新的高点
self.tqn_high_list.append(d)
# 创建一个未走完的笔,低点-> 高点
self.create_tqn_bi(direction=Direction.LONG)
# 输出确定的一笔(高点->低点) =>csv文件
self.export_tqn_bi()
# 计算是否有中枢
self.update_tqn_zs()
return
# 延续当前的高点
if pre_high > last_high_price:
self.tqn_high_list[-1].update(d)
self.update_tnq_bi(point=d, direction=Direction.LONG)
# 计算是否有中枢
self.update_tqn_zs()
# 产生新得低点
if pre_low < self.line_pre_low[-2] and pre_high == self.line_pre_high[-2]:
d = {'price': pre_low,
'datetime': self.cur_datetime.strftime('%Y-%m-%d %H:%M:%S')}
# 顺便记录下MACD & DIF 便于判断是否顶背离
if len(self.line_dif) > 0:
d.update({'dif': self.line_dif[-1]})
if len(self.line_macd) > 0:
d.update({'macd': self.line_macd[-1]})
# 当前不存在最后的低点,创建一个
if len(self.tqn_low_list) == 0:
self.tqn_low_list.append(d)
return
# 如果存在最后的高点,最后的低点
last_low_time = self.tqn_low_list[-1].get('datetime', None) if len(
self.tqn_low_list) > 0 else None
last_high_time = self.tqn_high_list[-1].get('datetime', None) if len(
self.tqn_high_list) > 0 else None
last_low_price = self.tqn_low_list[-1].get('price', None) if len(
self.tqn_low_list) > 0 else None
# 高点的时间,比低点的时间更晚, 添加一个新的低点
if last_low_time is not None and last_high_time is not None and last_low_time < last_high_time:
# 添加一个新的低点
self.tqn_low_list.append(d)
# 创建一个未走完的笔, 高点->低点
self.create_tqn_bi(direction=Direction.SHORT)
# 输出确定的一笔(低点->高点) =>csv文件
self.export_tqn_bi()
# 计算是否有中枢
self.update_tqn_zs()
return
# 延续当前的低点
if pre_low < last_low_price:
self.tqn_low_list[-1].update(d)
self.update_tnq_bi(point=d, direction=Direction.SHORT)
# 计算是否有中枢
self.update_tqn_zs()
def create_tqn_bi(self, direction):
"""
创建唐其安的笔该笔未走完的
:param direction: 笔的方向 direction Direction.Long, Direction.Short
:return:
"""
# Direction => int
if direction == Direction.LONG:
direction = 1
else:
direction = -1
if len(self.tqn_bi_list) > self.max_hold_bars: # 维持最大缓存数量 超过则删除最前面
del self.tqn_bi_list[0]
# 从低=>高得线段, self.line_low_list[-1] => self.line_high_list[-1]
if direction == 1:
if len(self.tqn_low_list) < 1:
return
low_point = self.tqn_low_list[-1]
high_point = self.tqn_high_list[-1]
d = {
"start": low_point.get('datetime'),
"end": high_point.get('datetime'),
"direction": direction,
"height": abs(high_point.get('price') - low_point.get('price')),
"high": high_point.get('price'),
"low": low_point.get('price')
}
self.tqn_bi_list.append(d)
# 从高=>低得线段, self.line_high_list[-1] => self.line_low_list[-1]
else:
if len(self.tqn_high_list) < 1:
return
high_point = self.tqn_high_list[-1]
low_point = self.tqn_low_list[-1]
d = {
"start": high_point.get('datetime'),
"end": low_point.get('datetime'),
"direction": direction,
"height": abs(high_point.get('price') - low_point.get('price')),
"high": high_point.get('price'),
"low": low_point.get('price')
}
self.tqn_bi_list.append(d)
def update_tnq_bi(self, point, direction):
"""
更新最后一根唐其安的笔"
:param point: dict: {"price", "datetime"}
:param direction:
:return:
"""
if len(self.tqn_bi_list) < 1:
return
# Direction => int
if direction == Direction.LONG:
direction = 1
else:
direction = -1
bi = self.tqn_bi_list[-1]
if bi.get('direction') != direction:
return
# 方向为多
if direction == 1:
bi.update({
"end": point.get('datetime'),
"height": abs(point.get('price') - bi.get('low')),
"high": point.get('price'),
})
# 方向为空
else:
bi.update({
"end": point.get('datetime'),
"height": abs(bi.get('high') - point.get('price')),
"low": point.get('price'),
})
def export_tqn_bi(self):
"""
唐其安高点低点形成的笔输出.csv文件
start.end,direction,height,high,low
2019-01-02 14:15:00,2019-01-02 11:09:00,1,4.0,496.0,492.0
:param: direction Direction.Long, Direction.Short
:return:
"""
if self.export_bi_filename is None:
return
if len(self.tqn_bi_list) < 2:
return
# 直接插入倒数第二条记录,即已经走完的笔
self.append_data(file_name=self.export_bi_filename,
dict_data=self.tqn_bi_list[-2],
field_names=["start", "end", "direction", "height", "high", "low"]
)
# # Direction => int
# if direction == Direction.LONG:
# direction = 1
# else:
# direction = -1
#
# 从低=>高得线段, self.line_low_list[-2] => self.line_high_list[-1]
# if direction == 1:
# if len(self.tqn_low_list) < 2:
# return
# low_point = self.tqn_low_list[-2]
# high_point = self.tqn_high_list[-1]
# d = {
# "start": low_point.get('datetime'),
# "end": high_point.get('datetime'),
# "direction": direction,
# "height": abs(high_point.get('price') - low_point.get('price')),
# "high": high_point.get('price'),
# "low": low_point.get('price')
# }
# if len(self.tqn_bi_list) < 2:
# return
#
# self.append_data(file_name=self.export_bi_filename,
# dict_data=d,
# field_names=["start","end", "direction", "height", "high", "low"]
# )
# 从高=>低得线段, self.line_high_list[-2] => self.line_low_list[-1]
# else:
# if len(self.tqn_high_list) < 2:
# return
# high_point = self.tqn_high_list[-2]
# low_point = self.tqn_low_list[-1]
# d = {
# "start": high_point.get('datetime'),
# "end": low_point.get('datetime'),
# "direction": direction,
# "height": abs(high_point.get('price') - low_point.get('price')),
# "high": high_point.get('price'),
# "low": low_point.get('price')
# }
# self.append_data(file_name=self.export_bi_filename,
# dict_data=d,
# field_names=["start", "end", "direction", "height", "high", "low"]
# )
def update_tqn_zs(self):
"""
更新唐其安中枢
这里跟缠论的中枢不同主要根据最后一笔判断是否与前2前三形成中枢
三笔形成的中枢不算完整
四笔形成的中枢才算完整
如果形成更新如果不形成则剔除
:return:
"""
if len(self.tqn_bi_list) < 4:
return
cur_bi = self.tqn_bi_list[-1] # 当前笔
second_bi = self.tqn_bi_list[-2] # 倒数第二笔
third_bi = self.tqn_bi_list[-3] # 倒数第三笔
four_bi = self.tqn_bi_list[-4] # 倒数第四笔
# 当前笔的方向
direction = cur_bi.get('direction')
# 当前没有中枢
if len(self.cur_tqn_zs) == 0:
# 1,3 的重叠的线段
first_third_high = min(third_bi.get('high'), cur_bi.get('high'))
first_third_low = max(third_bi.get('low'), cur_bi.get('low'))
# 2,4 的重叠线段
second_four_high = min(four_bi.get('high'), second_bi.get('high'))
second_four_low = max(four_bi.get('low'), second_bi.get('low'))
# 上涨中 1-32-4 形成重叠
if second_four_low <= first_third_low < second_four_high <= first_third_high:
# 中枢的方向按照第四笔
self.cur_tqn_zs = {
"direction": four_bi.get('direction'), # 段的方向:进入笔的方向
"start": four_bi.get('end'), # zs的开始
"end": cur_bi.get("end"), # zs的结束时间
"high": min(first_third_high, second_four_high),
"low": max(first_third_low, second_four_low),
"highs": [second_four_high], # 确认的高点清单(后续不能超过)
"lows": [second_four_low], # 确认的低点清单(后续不能超过)
"exit_direction": cur_bi.get('direction'), # 离开笔的方向
"exit_start": cur_bi.get('start')
}
# 更新中枢高度
self.cur_tqn_zs.update({
"height": self.cur_tqn_zs.get('high') - self.cur_tqn_zs.get('low')
})
return
# 下跌中 1-32-4 形成重叠
if first_third_low <= second_four_low < first_third_high <= second_four_high:
# 中枢的方向按照第四笔
self.cur_tqn_zs = {
"direction": four_bi.get('direction'), # 段的方向:进入笔的方向
"start": four_bi.get('end'), # zs的开始
"end": cur_bi.get("end"), # zs的结束时间
"high": min(first_third_high, second_four_high),
"low": max(first_third_low, second_four_low),
"highs": [second_four_high], # 确认的高点清单(后续不能超过)
"lows": [second_four_low], # 确认的低点清单(后续不能超过)
"exit_direction": cur_bi.get('direction'), # 离开笔的方向
"exit_start": cur_bi.get('start')
}
# 更新中枢高度
self.cur_tqn_zs.update({
"height": self.cur_tqn_zs.get('high') - self.cur_tqn_zs.get('low')
})
return
# 当前存在中枢
# 最后一笔是多,且低点在中枢高点上方,中枢确认结束
if direction == 1 and cur_bi.get('low') > self.cur_tqn_zs.get('high'):
self.export_tqn_zs()
self.cur_tqn_zs = {}
return
# 最后一笔是空,且高点在中枢下方,中枢确认结束
if direction == -1 and cur_bi.get('high') < self.cur_tqn_zs.get('low'):
self.export_tqn_zs()
self.cur_tqn_zs = {}
return
# 当前笔是zs的最后一笔
if cur_bi.get("start") == self.cur_tqn_zs.get("exit_start"):
# 当前笔是做多,判断是否创新高
if direction == 1:
# 对比中枢之前所有的确认高点,不能超过
zs_highs = self.cur_tqn_zs.get("highs", [self.cur_tqn_zs.get('high')])
min_high = min(zs_highs)
new_high = min(min_high, cur_bi.get('high'))
# 当前笔的高度为最短,在生长,则更新中枢的结束时间和高度
if min_high >= new_high > self.cur_tqn_zs.get('high'):
self.cur_tqn_zs.update({
"end": cur_bi.get('end'),
"high": new_high})
# 更新中枢高度
self.cur_tqn_zs.update({
"height": self.cur_tqn_zs.get('high') - self.cur_tqn_zs.get('low')
})
else:
# 对比中枢之前所有的确认低点,不能超过
zs_lows = self.cur_tqn_zs.get("lows", [self.cur_tqn_zs.get('low')])
max_low = max(zs_lows)
new_low = max(max_low, cur_bi.get('low'))
# 下跌笔在生长,中枢底部在扩展
if max_low < new_low < self.cur_tqn_zs.get('low'):
self.cur_tqn_zs.update({
"end": cur_bi.get('end'),
"low": new_low})
# 更新中枢高度
self.cur_tqn_zs.update({
"height": self.cur_tqn_zs.get('high') - self.cur_tqn_zs.get('low')
})
# 当前笔 不是中枢最后一笔, 方向是回归中枢的
else:
# 向下的一笔,且回落中枢高位下方,变成中枢的最后一笔
if direction == -1 and cur_bi.get('low') < self.cur_tqn_zs.get('high') \
and cur_bi.get('high') > self.cur_tqn_zs.get('low'):
# 对比中枢之前所有的确认低点,不能超过
zs_lows = self.cur_tqn_zs.get("lows", [self.cur_tqn_zs.get('low')])
max_low = max(zs_lows)
new_low = max(max_low, cur_bi.get('low'))
# 下跌笔在生长,中枢底部在扩展
if max_low < new_low < self.cur_tqn_zs.get('low'):
self.cur_tqn_zs.update({
"end": cur_bi.get('end'),
"low": new_low})
# 更新中枢高度
self.cur_tqn_zs.update({
"height": self.cur_tqn_zs.get('high') - self.cur_tqn_zs.get('low')
})
# 更新中枢的确认高点,更新最后一笔
zs_highs = self.cur_tqn_zs.get("highs", [self.cur_tqn_zs.get('high')])
zs_highs.append(cur_bi.get('high'))
self.cur_tqn_zs.update({
"highs": zs_highs, # 确认的高点清单(后续不能超过)
"exit_direction": cur_bi.get('direction'), # 离开笔的方向
"exit_start": cur_bi.get('start')
})
# 最后一笔的时间,若比中枢的结束时间晚,就更新
if self.cur_tqn_zs.get('end') < cur_bi.get('start'):
self.cur_tqn_zs.update({"end": cur_bi.get("start")})
# 向上的一笔,回抽中枢下轨上方,变成中枢的一笔
if direction == 1 and cur_bi.get('high') > self.cur_tqn_zs.get('low') \
and cur_bi.get('low') < self.cur_tqn_zs.get('high'):
# 对比中枢之前所有的确认高点,不能超过
zs_highs = self.cur_tqn_zs.get("highs", [self.cur_tqn_zs.get('high')])
min_high = min(zs_highs)
new_high = min(min_high, cur_bi.get('high'))
# 当前笔的高度为最短,在生长,则更新中枢的结束时间和高度
if min_high >= new_high > self.cur_tqn_zs.get('high'):
self.cur_tqn_zs.update({
"end": cur_bi.get('end'),
"high": new_high})
# 更新中枢高度
self.cur_tqn_zs.update({
"height": self.cur_tqn_zs.get('high') - self.cur_tqn_zs.get('low')
})
# 更新中枢的确认低点,更新最后一笔
zs_lows = self.cur_tqn_zs.get("lows", [self.cur_tqn_zs.get('low')])
zs_lows.append(cur_bi.get('low'))
self.cur_tqn_zs.update({
"lows": zs_lows, # 确认的低点清单(后续不能超过)
"exit_direction": cur_bi.get('direction'), # 离开笔的方向
"exit_start": cur_bi.get('start')
})
# 最后一笔的时间,若比中枢的结束时间晚,就更新
if self.cur_tqn_zs.get('end') < cur_bi.get('start'):
self.cur_tqn_zs.update({"end": cur_bi.get("start")})
def export_tqn_zs(self):
"""
输出唐其安中枢 = csv文件
:return:
"""
if self.export_zs_filename is None:
return
if len(self.cur_tqn_zs) < 1:
return
# 将当前中枢的信息写入
self.append_data(file_name=self.export_zs_filename,
dict_data=self.cur_tqn_zs,
field_names=["start", "end", "direction", "height", "high", "low"]
)
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def __count_dmi(self): def __count_dmi(self):
"""计算K线的DMI数据和条件""" """计算K线的DMI数据和条件"""
@ -1369,6 +1816,51 @@ class CtaRenkoBar(object):
else: else:
return cur_sar, cur_af return cur_sar, cur_af
def get_sar2(self, direction, cur_sar, cur_af=0, sar_limit=0.2, sar_step=0.02, restore=False):
"""
抛物线计算方法跟随K线加速度)
:param direction: Direction
:param cur_sar: 当前抛物线价格
:param cur_af: 当前抛物线价格
:param sar_limit: 最大加速范围
:param sar_step: 加速因子
:param restore: 恢复初始加速因子
:return: 新的
"""
if np.isnan(self.high_array[-1]):
return cur_sar, cur_af
# 向上抛物线
if direction == Direction.LONG:
# K线每次新高就更新一次af
if self.high_array[-1] > self.high_array[-2]:
af = cur_af + min(sar_step, sar_limit - cur_af)
else:
if restore:
# 恢复首次初始值
af = sar_step
else:
# 保持计算因子不变
af = cur_af
# K线每更新一次就运行一次
ep = self.high_array[-1]
sar = cur_sar + af * (ep - cur_sar)
return sar, af
# 向下抛物线
elif direction == Direction.SHORT:
# K线每次新低就更新一次af
if self.low_array[-1] < self.low_array[-2]:
af = cur_af + min(sar_step, sar_limit - cur_af)
else:
# af = sar_step
af = cur_af
ep = self.low_array[-1]
sar = cur_sar + af * (ep - cur_sar)
return sar, af
else:
return cur_sar, cur_af
def __count_sar(self): def __count_sar(self):
"""计算K线的SAR""" """计算K线的SAR"""
@ -1889,7 +2381,7 @@ class CtaRenkoBar(object):
count_len = min(self.bar_len, self.para_atr1_len) count_len = min(self.bar_len, self.para_atr1_len)
self.cur_atr1 = ta.ATR(self.high_array[-count_len:], self.low_array[-count_len:], self.cur_atr1 = ta.ATR(self.high_array[-count_len:], self.low_array[-count_len:],
self.close_array[-count_len:], count_len) self.close_array[-count_len:], count_len)
self.cur_atr1 = round(self.cur_atr1, self.round_n) self.cur_atr1 = round(self.cur_atr1[-1], self.round_n)
if len(self.line_atr1) > self.max_hold_bars: if len(self.line_atr1) > self.max_hold_bars:
del self.line_atr1[0] del self.line_atr1[0]
self.line_atr1.append(self.cur_atr1) self.line_atr1.append(self.cur_atr1)
@ -1898,7 +2390,7 @@ class CtaRenkoBar(object):
count_len = min(self.bar_len, self.para_atr2_len) count_len = min(self.bar_len, self.para_atr2_len)
self.cur_atr2 = ta.ATR(self.high_array[-count_len:], self.low_array[-count_len:], self.cur_atr2 = ta.ATR(self.high_array[-count_len:], self.low_array[-count_len:],
self.close_array[-count_len:], count_len) self.close_array[-count_len:], count_len)
self.cur_atr2 = round(self.cur_atr2, self.round_n) self.cur_atr2 = round(self.cur_atr2[-1], self.round_n)
if len(self.line_atr2) > self.max_hold_bars: if len(self.line_atr2) > self.max_hold_bars:
del self.line_atr2[0] del self.line_atr2[0]
self.line_atr2.append(self.cur_atr2) self.line_atr2.append(self.cur_atr2)
@ -1907,7 +2399,7 @@ class CtaRenkoBar(object):
count_len = min(self.bar_len, self.para_atr3_len) count_len = min(self.bar_len, self.para_atr3_len)
self.cur_atr3 = ta.ATR(self.high_array[-count_len:], self.low_array[-count_len:], self.cur_atr3 = ta.ATR(self.high_array[-count_len:], self.low_array[-count_len:],
self.close_array[-count_len:], count_len) self.close_array[-count_len:], count_len)
self.cur_atr3 = round(self.cur_atr3, self.round_n) self.cur_atr3 = round(self.cur_atr3[-1], self.round_n)
if len(self.line_atr3) > self.max_hold_bars: if len(self.line_atr3) > self.max_hold_bars:
del self.line_atr3[0] del self.line_atr3[0]
@ -4084,7 +4576,7 @@ class CtaRenkoBar(object):
if not os.path.exists(file_name): if not os.path.exists(file_name):
self.write_log(u'create csv file:{}'.format(file_name)) self.write_log(u'create csv file:{}'.format(file_name))
with open(file_name, 'a', encoding='utf8', newline='') as csvWriteFile: with open(file_name, 'a', encoding='utf8', newline='') as csvWriteFile:
writer = csv.DictWriter(f=csvWriteFile, fieldnames=dict_fieldnames, dialect='excel') writer = csv.DictWriter(f=csvWriteFile, fieldnames=dict_fieldnames, dialect='excel', extrasaction='ignore')
self.write_log(u'write csv header:{}'.format(dict_fieldnames)) self.write_log(u'write csv header:{}'.format(dict_fieldnames))
writer.writeheader() writer.writeheader()
writer.writerow(dict_data) writer.writerow(dict_data)

View File

@ -24,7 +24,16 @@ import baostock as bs
import pandas as pd import pandas as pd
ADJUST_FACTOR_FILE = 'stock_adjust_factor.pkb2' ADJUST_FACTOR_FILE = 'stock_adjust_factor.pkb2'
# 格式: {vt_symbol: [{dict},{dict}]
#d = {
# 'exchange': exchange.value, # 证券交易所
# 'code': stock_code, # 证券代码
# 'name': stock_name, # 证券中文名称
# 'dividOperateDate': row[1], # 除权除息日期
# 'foreAdjustFactor': float(row[2]), # 向前复权因子 除权除息日前一个交易日的收盘价/除权除息日最近的一个交易日的前收盘价
# 'backAdjustFactor': float(row[3]), # 向后复权因子 除权除息日最近的一个交易日的前收盘价/除权除息日前一个交易日的收盘价
# 'adjustFactor': float(row[4]) # 本次复权因子
# }
def get_all_adjust_factor(): def get_all_adjust_factor():
""" 获取所有股票复权因子""" """ 获取所有股票复权因子"""

View File

@ -70,7 +70,7 @@ INIT_TDX_MARKET_MAP = {
'CJL9': 28, 'CYL9': 28, 'FGL9': 28, 'JRL9': 28, 'LRL9': 28, 'MAL9': 28, 'CJL9': 28, 'CYL9': 28, 'FGL9': 28, 'JRL9': 28, 'LRL9': 28, 'MAL9': 28,
'OIL9': 28, 'PML9': 28, 'RIL9': 28, 'RML9': 28, 'RSL9': 28, 'SFL9': 28, 'OIL9': 28, 'PML9': 28, 'RIL9': 28, 'RML9': 28, 'RSL9': 28, 'SFL9': 28,
'SML9': 28, 'SRL9': 28, 'TAL9': 28, 'ICL9': 47, 'IFL9': 47, 'IHL9': 47, 'SML9': 28, 'SRL9': 28, 'TAL9': 28, 'ICL9': 47, 'IFL9': 47, 'IHL9': 47,
'TFL9': 47, 'TL9': 47, 'TSL9': 47, 'SAL9': 28, 'PGL9': 29, 'PFL9': 28,} 'TFL9': 47, 'TL9': 47, 'TSL9': 47, 'SAL9': 28, 'PGL9': 29, 'PFL9': 28, 'LH':29}
# 常量 # 常量
QSIZE = 500 QSIZE = 500
ALL_MARKET_BEGIN_HOUR = 8 ALL_MARKET_BEGIN_HOUR = 8
@ -88,6 +88,48 @@ def get_tdx_marketid(symbol):
return market_id return market_id
def get_3rd_friday():
"""获取当前月的第三个星期五"""
first_day_in_month = datetime.now().replace(day=1) # 本月第一天
# 获取当前月的所有星期5的日
fridays = [i for i in range(1, 28) if (first_day_in_month + timedelta(days=i - 1)).isoweekday() == 5]
if len(fridays) < 3:
raise Exception(f'获取当前月异常:{fridays}')
# 第三个星期五,是第几天
third_friday = fridays[2]
return datetime.now().replace(day=third_friday)
def convert_cffex_symbol(mi_symbol):
"""
转换中证交易所的合约
如果当前日为当前月的第三个星期的星期5前两天则提前转换为下个月的合约
:param mi_symbol:
:return:
"""
underly_symbol = get_underlying_symbol(mi_symbol).upper()
# 第三个星期五
third_friday = get_3rd_friday()
# 日期是否为星期三,星期四
if not third_friday - timedelta(days=2) <= datetime.now() <= third_friday:
return mi_symbol
cur_year_month = datetime.now().strftime('%y%m')
# 合约是否大于当月合约
if mi_symbol > f'{underly_symbol}{cur_year_month}':
return mi_symbol
next_year_month = (datetime.now() + timedelta(days=30)).strftime('%y%m')
return f'{underly_symbol}{next_year_month}'
class TdxFutureData(object): class TdxFutureData(object):
exclude_ips = [] exclude_ips = []
@ -889,16 +931,21 @@ class TdxFutureData(object):
vn_exchange = Exchange.INE vn_exchange = Exchange.INE
else: else:
vn_exchange = Tdx_Vn_Exchange_Map.get(str(tdx_market_id)) vn_exchange = Tdx_Vn_Exchange_Map.get(str(tdx_market_id))
# 根据合约全路径、交易所 => 真实合约
mi_symbol = get_real_symbol_by_exchange(full_symbol, vn_exchange) mi_symbol = get_real_symbol_by_exchange(full_symbol, vn_exchange)
# if underlying_symbol == 'IC': if underlying_symbol in ['IC', 'IF', 'IH']:
# debug = 1 mi_symbol = convert_cffex_symbol(mi_symbol)
# 更新登记 短合约:真实主力合约 # 更新登记 短合约:真实主力合约
self.write_log( self.write_log(
'{},{},{},{},{}'.format(tdx_market_id, full_symbol, underlying_symbol, mi_symbol, vn_exchange)) '{},{},{},{},{}'.format(tdx_market_id, full_symbol, underlying_symbol, mi_symbol, vn_exchange))
if underlying_symbol in self.future_contracts: if underlying_symbol in self.future_contracts:
info = self.future_contracts.get(underlying_symbol) info = self.future_contracts.get(underlying_symbol)
if mi_symbol != info.get('mi_symbol', ''): cur_mi_symbol = info.get('mi_symbol', None)
if cur_mi_symbol is None or mi_symbol > cur_mi_symbol:
self.write_log(u'主力合约变化:{} =>{}'.format(info.get('mi_symbol'), mi_symbol)) self.write_log(u'主力合约变化:{} =>{}'.format(info.get('mi_symbol'), mi_symbol))
info.update({'mi_symbol': mi_symbol, 'full_symbol': full_symbol}) info.update({'mi_symbol': mi_symbol, 'full_symbol': full_symbol})
self.future_contracts.update({underlying_symbol: info}) self.future_contracts.update({underlying_symbol: info})

View File

@ -312,14 +312,18 @@ class TdxStockData(object):
if return_bar: if return_bar:
self.write_log('dataframe => [BarData]') self.write_log('dataframe => [BarData]')
exchange = TDX_VN_STOCK_MARKET_MAP.get(market_id, Exchange.LOCAL)
for index, row in data.iterrows(): for index, row in data.iterrows():
add_bar = BarData()
try: try:
add_bar.symbol = symbol add_bar = BarData(
add_bar.datetime = index gateway_name='tdx',
symbol=symbol,
exchange=exchange,
datetime=index
)
add_bar.date = row['date'] add_bar.date = row['date']
add_bar.time = row['time'] add_bar.time = row['time']
add_bar.trading_date = row['trading_date'] add_bar.trading_day = row['trading_date']
add_bar.open_price = float(row['open']) add_bar.open_price = float(row['open'])
add_bar.high_price = float(row['high']) add_bar.high_price = float(row['high'])
add_bar.low_price = float(row['low']) add_bar.low_price = float(row['low'])

View File

@ -64,15 +64,15 @@ corr_rate = round(abs(corr.iloc[0, 1]) * 100, 2)
# api_01.get_bars('IF99', period='1min', callback=t1.display_bar, bar_freq=1) # api_01.get_bars('IF99', period='1min', callback=t1.display_bar, bar_freq=1)
# 获取bar只返回 list[dict] # 获取bar只返回 list[dict]
#
result, bars = api_01.get_bars('SA2101', period='1min', return_bar=False) # result, bars = api_01.get_bars('SA99', period='1min', return_bar=False)
if result: # if result:
print('前十根bar') # print('前十根bar')
for bar in bars[0:10]: # for bar in bars[0:10]:
print(bar) # print(bar)
print('后十根bar') # print('后十根bar')
for bar in bars[-10:]: # for bar in bars[-10:]:
print(bar) # print(bar)
# result,datas = api_01.get_transaction_data(symbol='ni1905') # result,datas = api_01.get_transaction_data(symbol='ni1905')
# api_02 = TdxFutureData(t2) # api_02 = TdxFutureData(t2)
@ -83,10 +83,10 @@ if result:
#for r in result[0:10] + result[-10:]: #for r in result[0:10] + result[-10:]:
# print(r) # print(r)
# 获取历史分时数据 # # 获取历史分时数据
# ret, result = api_01.get_history_transaction_data('RB99', '20190109') # ret, result = api_01.get_history_transaction_data('RB99', '20201027')
# for r in result[0:10] + result[-10:]: # for r in result[0:10] + result[-10:]:
# print(r) # print(r)
# 更新本地合约缓存信息 # 更新本地合约缓存信息
#api_01.update_mi_contracts() api_01.update_mi_contracts()

View File

@ -651,9 +651,9 @@ class CtpMdApi(MdApi):
dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S.%f") dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S.%f")
# 不处理开盘前的tick数据 # 不处理开盘前的tick数据
if dt.hour in [8, 20] and dt.minute < 59: if dt.hour in [8, 20] and dt.minute <= 59:
return return
if exchange is Exchange.CFFEX and dt.hour == 9 and dt.minute < 29: if exchange is Exchange.CFFEX and dt.hour == 9 and dt.minute <= 29:
return return
tick = TickData( tick = TickData(
@ -1014,11 +1014,11 @@ class CtpTdApi(TdApi):
account.available = round(float(data["Available"]), 7) account.available = round(float(data["Available"]), 7)
account.commission = round(float(data['Commission']), 7) account.commission = round(float(data['Commission']), 7)
account.margin = round(float(data['CurrMargin']), 7) account.margin = round(float(data['CurrMargin']), 7)
account.close_profit = round(float(data['CloseProfit']), 7) + round( account.close_profit = round(float(data['CloseProfit']), 7) #+ round(
float(data.get("SpecProductCloseProfit", 0)), 7) #float(data.get("SpecProductCloseProfit", 0)), 7)
account.holding_profit = round(float(data['PositionProfit']), 7) + round( account.holding_profit = round(float(data['PositionProfit']), 7) #+ round(
float(data.get("SpecProductPositionProfit", 0)), 7) + round( #float(data.get("SpecProductPositionProfit", 0)), 7) + round(
float(data.get("SpecProductPositionProfitByAlg", 0)), 7) #float(data.get("SpecProductPositionProfitByAlg", 0)), 7)
account.trading_day = str(data['TradingDay']) account.trading_day = str(data['TradingDay'])
if '-' not in account.trading_day and len(account.trading_day) == 8: if '-' not in account.trading_day and len(account.trading_day) == 8:
account.trading_day = '-'.join( account.trading_day = '-'.join(
@ -1037,6 +1037,7 @@ class CtpTdApi(TdApi):
""" """
product = PRODUCT_CTP2VT.get(data["ProductClass"], None) product = PRODUCT_CTP2VT.get(data["ProductClass"], None)
if product: if product:
contract = ContractData( contract = ContractData(
symbol=data["InstrumentID"], symbol=data["InstrumentID"],
exchange=EXCHANGE_CTP2VT[data["ExchangeID"]], exchange=EXCHANGE_CTP2VT[data["ExchangeID"]],
@ -1046,6 +1047,9 @@ class CtpTdApi(TdApi):
pricetick=data["PriceTick"], pricetick=data["PriceTick"],
gateway_name=self.gateway_name gateway_name=self.gateway_name
) )
# if 'SA' in contract.symbol:
# self.gateway.write_log(print_dict(data))
# 保证金费率(期权合约的保证金比例数值可能不对所以设置个0.2的最大值) # 保证金费率(期权合约的保证金比例数值可能不对所以设置个0.2的最大值)
contract.margin_rate = min(0.2, max(data.get('LongMarginRatio', 0), data.get('ShortMarginRatio', 0))) contract.margin_rate = min(0.2, max(data.get('LongMarginRatio', 0), data.get('ShortMarginRatio', 0)))
if contract.margin_rate == 0: if contract.margin_rate == 0:

View File

@ -1739,6 +1739,7 @@ class PbTdApi(object):
def send_order(self, req: OrderRequest): def send_order(self, req: OrderRequest):
"""委托发单""" """委托发单"""
self.gateway.write_log(f'委托发单:{req.__dict__}')
if self.gateway.file_type == 'dbf': if self.gateway.file_type == 'dbf':
return self.send_order_dbf(req) return self.send_order_dbf(req)
else: else:

View File

@ -53,7 +53,7 @@ from vnpy.trader.constant import (
OptionType, OptionType,
Interval Interval
) )
from vnpy.trader.gateway import BaseGateway from vnpy.trader.gateway import BaseGateway,TickCombiner
from vnpy.trader.object import ( from vnpy.trader.object import (
TickData, TickData,
BarData, BarData,
@ -866,9 +866,11 @@ class RohonTdApi(TdApi):
def onRspQryInvestorPosition(self, data: dict, error: dict, reqid: int, last: bool): def onRspQryInvestorPosition(self, data: dict, error: dict, reqid: int, last: bool):
"""""" """"""
if not data: if not data:
print('onRspQryInvestorPosition:not data')
return return
if data.get("InstrumentID") not in symbol_exchange_map: if data.get("InstrumentID") not in symbol_exchange_map:
print('onRspQryInvestorPosition: {} not in symbol_exchange_map'.format(data.get("InstrumentID")))
return return
# Get buffered position object # Get buffered position object
@ -1045,7 +1047,7 @@ class RohonTdApi(TdApi):
Callback of order status update. Callback of order status update.
""" """
if self.gateway.debug: if self.gateway.debug:
print(f'onRtnOrder') print(f'onRtnOrder{print_dict(data)}')
symbol = data["InstrumentID"] symbol = data["InstrumentID"]
exchange = symbol_exchange_map.get(symbol, "") exchange = symbol_exchange_map.get(symbol, "")
@ -1093,7 +1095,7 @@ class RohonTdApi(TdApi):
Callback of trade status update. Callback of trade status update.
""" """
if self.gateway.debug: if self.gateway.debug:
print(f'onRtnTrade') print(f'onRtnTrade:{print_dict(data)}')
symbol = data["InstrumentID"] symbol = data["InstrumentID"]
exchange = symbol_exchange_map.get(symbol, "") exchange = symbol_exchange_map.get(symbol, "")
@ -2078,226 +2080,3 @@ class TqMdApi():
except Exception as e: except Exception as e:
self.gateway.write_log('退出天勤行情api异常:{}'.format(str(e))) self.gateway.write_log('退出天勤行情api异常:{}'.format(str(e)))
class TickCombiner(object):
"""
Tick合成类
"""
def __init__(self, gateway, setting):
self.gateway = gateway
self.gateway_name = self.gateway.gateway_name
self.gateway.write_log(u'创建tick合成类:{}'.format(setting))
self.symbol = setting.get('symbol', None)
self.leg1_symbol = setting.get('leg1_symbol', None)
self.leg2_symbol = setting.get('leg2_symbol', None)
self.leg1_ratio = setting.get('leg1_ratio', 1) # 腿1的数量配比
self.leg2_ratio = setting.get('leg2_ratio', 1) # 腿2的数量配比
self.price_tick = setting.get('price_tick', 1) # 合成价差加比后的最小跳动
# 价差
self.is_spread = setting.get('is_spread', False)
# 价比
self.is_ratio = setting.get('is_ratio', False)
self.last_leg1_tick = None
self.last_leg2_tick = None
# 价差日内最高/最低价
self.spread_high = None
self.spread_low = None
# 价比日内最高/最低价
self.ratio_high = None
self.ratio_low = None
# 当前交易日
self.trading_day = None
if self.is_ratio and self.is_spread:
self.gateway.write_error(u'{}参数有误,不能同时做价差/加比.setting:{}'.format(self.symbol, setting))
return
self.gateway.write_log(u'初始化{}合成器成功'.format(self.symbol))
if self.is_spread:
self.gateway.write_log(
u'leg1:{} * {} - leg2:{} * {}'.format(self.leg1_symbol, self.leg1_ratio, self.leg2_symbol,
self.leg2_ratio))
if self.is_ratio:
self.gateway.write_log(
u'leg1:{} * {} / leg2:{} * {}'.format(self.leg1_symbol, self.leg1_ratio, self.leg2_symbol,
self.leg2_ratio))
def on_tick(self, tick):
"""OnTick处理"""
combinable = False
if tick.symbol == self.leg1_symbol:
# leg1合约
self.last_leg1_tick = tick
if self.last_leg2_tick is not None:
if self.last_leg1_tick.datetime.replace(microsecond=0) == self.last_leg2_tick.datetime.replace(
microsecond=0):
combinable = True
elif tick.symbol == self.leg2_symbol:
# leg2合约
self.last_leg2_tick = tick
if self.last_leg1_tick is not None:
if self.last_leg2_tick.datetime.replace(microsecond=0) == self.last_leg1_tick.datetime.replace(
microsecond=0):
combinable = True
# 不能合并
if not combinable:
return
if not self.is_ratio and not self.is_spread:
return
# 以下情况,基本为单腿涨跌停,不合成价差/价格比 Tick
if (self.last_leg1_tick.ask_price_1 == 0 or self.last_leg1_tick.bid_price_1 == self.last_leg1_tick.limit_up) \
and self.last_leg1_tick.ask_volume_1 == 0:
self.gateway.write_log(
u'leg1:{0}涨停{1}不合成价差Tick'.format(self.last_leg1_tick.vt_symbol, self.last_leg1_tick.bid_price_1))
return
if (self.last_leg1_tick.bid_price_1 == 0 or self.last_leg1_tick.ask_price_1 == self.last_leg1_tick.limit_down) \
and self.last_leg1_tick.bid_volume_1 == 0:
self.gateway.write_log(
u'leg1:{0}跌停{1}不合成价差Tick'.format(self.last_leg1_tick.vt_symbol, self.last_leg1_tick.ask_price_1))
return
if (self.last_leg2_tick.ask_price_1 == 0 or self.last_leg2_tick.bid_price_1 == self.last_leg2_tick.limit_up) \
and self.last_leg2_tick.ask_volume_1 == 0:
self.gateway.write_log(
u'leg2:{0}涨停{1}不合成价差Tick'.format(self.last_leg2_tick.vt_symbol, self.last_leg2_tick.bid_price_1))
return
if (self.last_leg2_tick.bid_price_1 == 0 or self.last_leg2_tick.ask_price_1 == self.last_leg2_tick.limit_down) \
and self.last_leg2_tick.bid_volume_1 == 0:
self.gateway.write_log(
u'leg2:{0}跌停{1}不合成价差Tick'.format(self.last_leg2_tick.vt_symbol, self.last_leg2_tick.ask_price_1))
return
if self.trading_day != tick.trading_day:
self.trading_day = tick.trading_day
self.spread_high = None
self.spread_low = None
self.ratio_high = None
self.ratio_low = None
if self.is_spread:
spread_tick = TickData(gateway_name=self.gateway_name,
symbol=self.symbol,
exchange=Exchange.SPD,
datetime=tick.datetime)
spread_tick.trading_day = tick.trading_day
spread_tick.date = tick.date
spread_tick.time = tick.time
# 叫卖价差=leg1.ask_price_1 * 配比 - leg2.bid_price_1 * 配比volume为两者最小
spread_tick.ask_price_1 = round_to(target=self.price_tick,
value=self.last_leg1_tick.ask_price_1 * self.leg1_ratio - self.last_leg2_tick.bid_price_1 * self.leg2_ratio)
spread_tick.ask_volume_1 = min(self.last_leg1_tick.ask_volume_1, self.last_leg2_tick.bid_volume_1)
# 叫买价差=leg1.bid_price_1 * 配比 - leg2.ask_price_1 * 配比volume为两者最小
spread_tick.bid_price_1 = round_to(target=self.price_tick,
value=self.last_leg1_tick.bid_price_1 * self.leg1_ratio - self.last_leg2_tick.ask_price_1 * self.leg2_ratio)
spread_tick.bid_volume_1 = min(self.last_leg1_tick.bid_volume_1, self.last_leg2_tick.ask_volume_1)
# 最新价
spread_tick.last_price = round_to(target=self.price_tick,
value=(spread_tick.ask_price_1 + spread_tick.bid_price_1) / 2)
# 昨收盘价
if self.last_leg2_tick.pre_close > 0 and self.last_leg1_tick.pre_close > 0:
spread_tick.pre_close = round_to(target=self.price_tick,
value=self.last_leg1_tick.pre_close * self.leg1_ratio - self.last_leg2_tick.pre_close * self.leg2_ratio)
# 开盘价
if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0:
spread_tick.open_price = round_to(target=self.price_tick,
value=self.last_leg1_tick.open_price * self.leg1_ratio - self.last_leg2_tick.open_price * self.leg2_ratio)
# 最高价
if self.spread_high:
self.spread_high = max(self.spread_high, spread_tick.ask_price_1)
else:
self.spread_high = spread_tick.ask_price_1
spread_tick.high_price = self.spread_high
# 最低价
if self.spread_low:
self.spread_low = min(self.spread_low, spread_tick.bid_price_1)
else:
self.spread_low = spread_tick.bid_price_1
spread_tick.low_price = self.spread_low
self.gateway.on_tick(spread_tick)
if self.is_ratio:
ratio_tick = TickData(
gateway_name=self.gateway_name,
symbol=self.symbol,
exchange=Exchange.SPD,
datetime=tick.datetime
)
ratio_tick.trading_day = tick.trading_day
ratio_tick.date = tick.date
ratio_tick.time = tick.time
# 比率tick = (腿1 * 腿1 手数 / 腿2价格 * 腿2手数) 百分比
ratio_tick.ask_price_1 = 100 * self.last_leg1_tick.ask_price_1 * self.leg1_ratio \
/ (self.last_leg2_tick.bid_price_1 * self.leg2_ratio) # noqa
ratio_tick.ask_price_1 = round_to(
target=self.price_tick,
value=ratio_tick.ask_price_1
)
ratio_tick.ask_volume_1 = min(self.last_leg1_tick.ask_volume_1, self.last_leg2_tick.bid_volume_1)
ratio_tick.bid_price_1 = 100 * self.last_leg1_tick.bid_price_1 * self.leg1_ratio \
/ (self.last_leg2_tick.ask_price_1 * self.leg2_ratio) # noqa
ratio_tick.bid_price_1 = round_to(
target=self.price_tick,
value=ratio_tick.bid_price_1
)
ratio_tick.bid_volume_1 = min(self.last_leg1_tick.bid_volume_1, self.last_leg2_tick.ask_volume_1)
ratio_tick.last_price = (ratio_tick.ask_price_1 + ratio_tick.bid_price_1) / 2
ratio_tick.last_price = round_to(
target=self.price_tick,
value=ratio_tick.last_price
)
# 昨收盘价
if self.last_leg2_tick.pre_close > 0 and self.last_leg1_tick.pre_close > 0:
ratio_tick.pre_close = 100 * self.last_leg1_tick.pre_close * self.leg1_ratio / (
self.last_leg2_tick.pre_close * self.leg2_ratio) # noqa
ratio_tick.pre_close = round_to(
target=self.price_tick,
value=ratio_tick.pre_close
)
# 开盘价
if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0:
ratio_tick.open_price = 100 * self.last_leg1_tick.open_price * self.leg1_ratio / (
self.last_leg2_tick.open_price * self.leg2_ratio) # noqa
ratio_tick.open_price = round_to(
target=self.price_tick,
value=ratio_tick.open_price
)
# 最高价
if self.ratio_high:
self.ratio_high = max(self.ratio_high, ratio_tick.ask_price_1)
else:
self.ratio_high = ratio_tick.ask_price_1
ratio_tick.high_price = self.spread_high
# 最低价
if self.ratio_low:
self.ratio_low = min(self.ratio_low, ratio_tick.bid_price_1)
else:
self.ratio_low = ratio_tick.bid_price_1
ratio_tick.low_price = self.spread_low
self.gateway.on_tick(ratio_tick)

View File

@ -386,16 +386,19 @@ class TickCombiner(object):
# leg1合约 # leg1合约
self.last_leg1_tick = tick self.last_leg1_tick = tick
if self.last_leg2_tick is not None: if self.last_leg2_tick is not None:
if self.last_leg1_tick.datetime.replace(microsecond=0) == self.last_leg2_tick.datetime.replace( #if self.last_leg1_tick.datetime.replace(microsecond=0) == self.last_leg2_tick.datetime.replace(
microsecond=0): # microsecond=0):
# 有些跨交易所时间戳会不一致差1~2秒
if abs((self.last_leg1_tick.datetime - self.last_leg2_tick.datetime).total_seconds())<3:
combinable = True combinable = True
elif tick.symbol == self.leg2_symbol: elif tick.symbol == self.leg2_symbol:
# leg2合约 # leg2合约
self.last_leg2_tick = tick self.last_leg2_tick = tick
if self.last_leg1_tick is not None: if self.last_leg1_tick is not None:
if self.last_leg2_tick.datetime.replace(microsecond=0) == self.last_leg1_tick.datetime.replace( # if self.last_leg2_tick.datetime.replace(microsecond=0) == self.last_leg1_tick.datetime.replace(
microsecond=0): # microsecond=0):
if abs((self.last_leg1_tick.datetime - self.last_leg2_tick.datetime).total_seconds()) < 3:
combinable = True combinable = True
# 不能合并 # 不能合并

View File

@ -153,6 +153,12 @@ class MainWindow(QtWidgets.QMainWindow):
"contract.ico", "contract.ico",
partial(self.open_widget, ContractManager, "contract") partial(self.open_widget, ContractManager, "contract")
) )
self.add_menu_action(
help_menu,
"保存合约",
"contract.ico",
self.save_contracts
)
self.add_menu_action( self.add_menu_action(
help_menu, help_menu,
@ -318,6 +324,9 @@ class MainWindow(QtWidgets.QMainWindow):
self.restoreState(state) self.restoreState(state)
self.restoreGeometry(geometry) self.restoreGeometry(geometry)
def save_contracts(self) -> None:
self.main_engine.save_contracts()
def restore_window_setting(self) -> None: def restore_window_setting(self) -> None:
""" """
Restore window to default setting. Restore window to default setting.