[增强功能] gateway增加最新价格dict,rpc增加保存k线缓存调用

This commit is contained in:
msincenselee 2020-07-11 10:57:00 +08:00
parent 22535f7acf
commit 658faafd8b
18 changed files with 1133 additions and 53 deletions

View File

@ -0,0 +1 @@
from . import py_t2sdk

BIN
vnpy/api/t2sdk/py_t2sdk.pyd Normal file

Binary file not shown.

BIN
vnpy/api/t2sdk/t2sdk.dll Normal file

Binary file not shown.

View File

@ -40,7 +40,7 @@ from vnpy.trader.event import (
EVENT_WARNING,
EVENT_CRITICAL,
)
# from vnpy.trader.constant import Direction
from vnpy.trader.constant import Direction, Exchange, Status
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.utility import get_trading_date, load_json, save_json
from vnpy.data.mongo.mongo_data import MongoData
@ -334,6 +334,9 @@ class AccountRecorder(BaseEngine):
if len(order.sys_orderid) == 0:
# 未有系统的委托编号,不做持久化
order.sys_orderid = order.orderid
if order.status in [Status.SUBMITTING]:
return
dt = getattr(order, 'datetime')
if not dt:
order_date = datetime.now().strftime('%Y-%m-%d')
@ -350,9 +353,9 @@ class AccountRecorder(BaseEngine):
'account_id': order.accountid,
'sys_orderid': order.sys_orderid,
'order_date': order_date,
'holder_id': getattr(order,'holder_id','')}
'holder_id': getattr(order, 'holder_id', '')}
data = copy.copy(order.__dict__)
data = copy.deepcopy(order.__dict__)
data.update({'account_id': data.pop('accountid')})
data.update({'order_date': order_date})
data.update({'exchange': order.exchange.value})
@ -361,6 +364,10 @@ class AccountRecorder(BaseEngine):
data.update({'type': order.type.value})
data.update({'status': order.status.value})
if order.exchange in [Exchange.SSE, Exchange.SZSE]:
if hasattr(self.main_engine, 'get_name'):
data.update({'name': self.main_engine.get_name(order.vt_symbol)})
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_ORDER_COL, fld=fld, data=data)
# 数据库需要提前建立多重索引
@ -373,11 +380,11 @@ class AccountRecorder(BaseEngine):
'account_id': order.accountid,
'sys_orderid': order.sys_orderid,
'order_date': order_date,
'holder_id': getattr(order,'holder_id','')}
'holder_id': getattr(order, 'holder_id', '')}
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=HISTORY_ORDER_COL, fld=fld2, data=history_data)
def get_trading_date(self, dt:datetime):
def get_trading_date(self, dt: datetime):
if self.is_7x24:
return dt.strftime('%Y-%m-%d')
else:
@ -397,7 +404,7 @@ class AccountRecorder(BaseEngine):
# 提前创建索引
# db.today_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtSymbol_vt_tradeid_trade_date_holder_id','unique':true})
data = copy.copy(trade.__dict__)
data = copy.deepcopy(trade.__dict__)
data.update({'account_id': data.pop('accountid')})
data.update({'trade_date': trade_date})
data.update({'exchange': trade.exchange.value})

View File

@ -184,6 +184,7 @@ class CtaEngine(BaseEngine):
self.main_engine.reload_strategy = self.reload_strategy
self.main_engine.save_strategy_data = self.save_strategy_data
self.main_engine.save_strategy_snapshot = self.save_strategy_snapshot
self.main_engine.clean_strategy_cache = self.clean_strategy_cache
# 注册到远程服务调用
if self.main_engine.rpc_service:
@ -198,6 +199,7 @@ class CtaEngine(BaseEngine):
self.main_engine.rpc_service.register(self.main_engine.reload_strategy)
self.main_engine.rpc_service.register(self.main_engine.save_strategy_data)
self.main_engine.rpc_service.register(self.main_engine.save_strategy_snapshot)
self.main_engine.rpc_service.register(self.main_engine.clean_strategy_cache)
def process_timer_event(self, event: Event):
""" 处理定时器事件"""
@ -217,7 +219,7 @@ class CtaEngine(BaseEngine):
# 主动获取所有策略得持仓信息
all_strategy_pos = self.get_all_strategy_pos()
if dt.minute % 5 == 0:
if dt.minute % 5 == 0 and self.engine_config.get('compare_pos',True):
# 比对仓位,使用上述获取得持仓信息,不用重复获取
self.compare_pos(strategy_pos_list=copy(all_strategy_pos))
@ -1209,6 +1211,15 @@ class CtaEngine(BaseEngine):
self.write_error(u'保存策略{}数据异常:'.format(strategy_name, str(ex)))
self.write_error(traceback.format_exc())
def clean_strategy_cache(self, strategy_name):
"""清除策略K线缓存文件"""
cache_file = os.path.abspath(os.path.join(self.get_data_path(), f'{strategy_name}_klines.pkb2'))
if os.path.exists(cache_file):
self.write_log(f'移除策略缓存文件:{cache_file}')
os.remove(cache_file)
else:
self.write_log(f'策略缓存文件不存在:{cache_file}')
def get_strategy_snapshot(self, strategy_name):
"""实时获取策略的K线切片比较耗性能"""
strategy = self.strategies.get(strategy_name, None)
@ -1511,6 +1522,23 @@ class CtaEngine(BaseEngine):
value = getattr(strategy, parameter, None)
return value
def get_none_strategy_pos_list(self):
"""获取非策略持有的仓位"""
# 格式 [ 'strategy_name':'account', 'pos': [{'vt_symbol': '', 'direction': 'xxx', 'volume':xxx }] } ]
none_strategy_pos_file = os.path.abspath(os.path.join(os.getcwd(), 'data', 'none_strategy_pos.json'))
if not os.path.exists(none_strategy_pos_file):
return []
try:
with open(none_strategy_pos_file, encoding='utf8') as f:
pos_list = json.load(f)
if isinstance(pos_list, list):
return pos_list
return []
except Exception as ex:
self.write_error(u'未能读取或解释{}'.format(none_strategy_pos_file))
return []
def compare_pos(self, strategy_pos_list=[], auto_balance=False):
"""
对比账号&策略的持仓,不同的话则发出微信提醒
@ -1527,6 +1555,10 @@ class CtaEngine(BaseEngine):
strategy_pos_list = self.get_all_strategy_pos()
self.write_log(u'策略持仓清单:{}'.format(strategy_pos_list))
none_strategy_pos = self.get_none_strategy_pos_list()
if len(none_strategy_pos) > 0:
strategy_pos_list.extend(none_strategy_pos)
# 需要进行对比得合约集合(来自策略持仓/账号持仓)
vt_symbols = set()

View File

@ -1321,7 +1321,7 @@ class CtaFutureTemplate(CtaTemplate):
continue
if grid.open_status and not grid.order_status:
up_grids_info += f'持空中: [数量:{grid.volume}\n, 开仓时间:{grid.open_time}]\n'
up_grids_info += f'持空中: [数量:{grid.volume}, 开仓时间:{grid.open_time}]\n'
continue
if not grid.open_status and grid.order_status:
@ -1355,8 +1355,795 @@ class CtaFutureTemplate(CtaTemplate):
"""显示事务的过程记录=》 log"""
if not self.inited:
return
self.write_log(u'{} 当前 {}价格:{}'
.format(self.cur_datetime, self.vt_symbol, self.cur_price))
self.write_log(u'{} 当前 {}价格:{}, 委托状态:{}'
.format(self.cur_datetime, self.vt_symbol, self.cur_price, self.entrust))
if len(self.active_orders) > 0:
self.write_log('当前活动订单:{}'.format(self.active_orders))
if hasattr(self, 'policy'):
policy = getattr(self, 'policy')
if policy:
op = getattr(policy, 'to_json', None)
if callable(op):
self.write_log(u'当前Policy:{}'.format(json.dumps(policy.to_json(), indent=2, ensure_ascii=False)))
def save_dist(self, dist_data):
"""
保存策略逻辑过程记录= csv文件按
:param dist_data:
:return:
"""
if self.backtesting:
save_path = self.cta_engine.get_logs_path()
else:
save_path = self.cta_engine.get_data_path()
try:
if 'margin' not in dist_data:
dist_data.update({'margin': dist_data.get('price', 0) * dist_data.get('volume',
0) * self.cta_engine.get_margin_rate(
dist_data.get('symbol', self.vt_symbol))})
if 'datetime' not in dist_data:
dist_data.update({'datetime': self.cur_datetime})
if self.position and 'long_pos' not in dist_data:
dist_data.update({'long_pos': self.position.long_pos})
if self.position and 'short_pos' not in dist_data:
dist_data.update({'short_pos': self.position.short_pos})
file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_dist.csv'))
append_data(file_name=file_name, dict_data=dist_data, field_names=self.dist_fieldnames)
except Exception as ex:
self.write_error(u'save_dist 异常:{} {}'.format(str(ex), traceback.format_exc()))
def save_tns(self, tns_data):
"""
保存多空事务记录=csv文件,便于后续分析
:param tns_data:
:return:
"""
if self.backtesting:
save_path = self.cta_engine.get_logs_path()
else:
save_path = self.cta_engine.get_data_path()
try:
file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_tns.csv'))
append_data(file_name=file_name, dict_data=tns_data)
except Exception as ex:
self.write_error(u'save_tns 异常:{} {}'.format(str(ex), traceback.format_exc()))
def send_wechat(self, msg: str):
"""实盘时才发送微信"""
if self.backtesting:
return
self.cta_engine.send_wechat(msg=msg, strategy=self)
class CtaSpotTemplate(CtaTemplate):
"""
现货模板
"""
asset_symbol = "" # 资产币 BTCUSDT => BTC
quote_symbol = "" # 定价币 BTCUSDT => USDT
price_tick = 0.01 # 商品的最小价格跳动
symbol_size = 1 # 商品得合约乘数
margin_rate = 1 # 商品的保证金
volumn_tick = 0.01 # 商品最小成交数量
# 委托类型
order_type = OrderType.LIMIT
cancel_seconds = 120 # 撤单时间(秒)
activate_market = False
# 资金相关
max_invest_rate = 0.1 # 最大仓位(0~1) asset / virtual_quote
max_invest_margin = 0 # 资金上限 0不限制 virtual_quote
max_invest_pos = 0 # 单向头寸数量上限 0不限制 asset
# 是否回测状态
backtesting = False
# 逻辑过程日志
dist_fieldnames = ['datetime', 'symbol', 'volume', 'price', 'margin',
'operation', 'signal', 'stop_price', 'target_price',
'long_pos']
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
# vt_symbol => symbol, exchange
self.symbol, self.exchange = extract_vt_symbol(vt_symbol)
self.position = None # 仓位组件
self.policy = None # 事务执行组件
self.gt = None # 网格交易组件
self.klines = {} # K线组件字典: kline_name: kline
self.price_tick = 0.01 # 商品的最小价格跳动
self.symbol_size = 1 # 商品得合约乘数
self.margin_rate = 1 # 商品的保证金
self.volumn_tick = 0.01 # 商品最小成交数量
self.cancel_seconds = 120 # 撤单时间(秒)
self.activate_market = False
self.order_type = OrderType.LIMIT
self.backtesting = False
self.cur_datetime: datetime = None # 当前Tick时间
self.cur_tick: TickData = None # 最新的合约tick( vt_symbol)
self.cur_price = None # 当前价(主力合约 vt_symbol)
self.asset_pos = None # 当前asset_symbol持仓信息
self.quote_pos = None # 当前quote_symbol的持仓信息
self.last_minute = None # 最后的分钟,用于on_tick内每分钟处理的逻辑
self.display_bars = True
super().__init__(
cta_engine, strategy_name, vt_symbol, setting
)
# 增加仓位管理模块
self.position = CtaPosition(strategy=self)
self.position.maxPos = sys.maxsize
# 增加网格持久化模块
self.gt = CtaGridTrade(strategy=self)
if 'backtesting' not in self.parameters:
self.parameters.append('backtesting')
def update_setting(self, setting: dict):
"""
Update strategy parameter wtih value in setting dict.
"""
for name in self.parameters:
if name in setting:
setattr(self, name, setting[name])
self.price_tick = self.cta_engine.get_price_tick(self.vt_symbol)
self.symbol_size = self.cta_engine.get_size(self.vt_symbol)
self.margin_rate = self.cta_engine.get_margin_rate(self.vt_symbol)
self.volumn_tick = self.cta_engine.get_volume_tick(self.vt_symbol)
# 检查资产币+定价币是否与vt_symbol一致
if self.symbol != f'{self.asset_symbol}{self.quote_symbol}':
raise Exception(f'{self.vt_symbol}{self.asset_symbol}+{self.quote_symbol}不匹配')
if self.activate_market:
self.write_log(f'{self.strategy_name}使用市价单委托方式')
self.order_type = OrderType.MARKET
def sync_data(self):
"""同步更新数据"""
if not self.backtesting:
self.write_log(u'保存k线缓存数据')
self.save_klines_to_cache()
if self.inited and self.trading:
self.write_log(u'保存policy数据')
self.policy.save()
def save_klines_to_cache(self, kline_names: list = []):
"""
保存K线数据到缓存
:param kline_names: 一般为self.klines的keys
:return:
"""
if len(kline_names) == 0:
kline_names = list(self.klines.keys())
# 获取保存路径
save_path = self.cta_engine.get_data_path()
# 保存缓存的文件名
file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_klines.pkb2'))
with bz2.BZ2File(file_name, 'wb') as f:
klines = {}
for kline_name in kline_names:
kline = self.klines.get(kline_name, None)
# if kline:
# kline.strategy = None
# kline.cb_on_bar = None
klines.update({kline_name: kline})
pickle.dump(klines, f)
def load_klines_from_cache(self, kline_names: list = []):
"""
从缓存加载K线数据
:param kline_names:
:return:
"""
if len(kline_names) == 0:
kline_names = list(self.klines.keys())
save_path = self.cta_engine.get_data_path()
file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_klines.pkb2'))
try:
last_bar_dt = None
with bz2.BZ2File(file_name, 'rb') as f:
klines = pickle.load(f)
# 逐一恢复K线
for kline_name in kline_names:
# 缓存的k线实例
cache_kline = klines.get(kline_name, None)
# 当前策略实例的K线实例
strategy_kline = self.klines.get(kline_name, None)
if cache_kline and strategy_kline:
# 临时保存当前的回调函数
cb_on_bar = strategy_kline.cb_on_bar
# 缓存实例数据 =》 当前实例数据
strategy_kline.__dict__.update(cache_kline.__dict__)
# 所有K线的最后时间
if last_bar_dt and strategy_kline.cur_datetime:
last_bar_dt = max(last_bar_dt, strategy_kline.cur_datetime)
else:
last_bar_dt = strategy_kline.cur_datetime
# 重新绑定k线策略与on_bar回调函数
strategy_kline.strategy = self
strategy_kline.cb_on_bar = cb_on_bar
self.write_log(f'恢复{kline_name}缓存数据,最新bar结束时间:{last_bar_dt}')
self.write_log(u'加载缓存k线数据完毕')
return last_bar_dt
except Exception as ex:
self.write_error(f'加载缓存K线数据失败:{str(ex)}')
return None
def get_klines_snapshot(self):
"""返回当前klines的切片数据"""
try:
d = {
'strategy': self.strategy_name,
'datetime': datetime.now()}
klines = {}
for kline_name in sorted(self.klines.keys()):
klines.update({kline_name: self.klines.get(kline_name).get_data()})
kline_names = list(klines.keys())
binary_data = zlib.compress(pickle.dumps(klines))
d.update({'kline_names': kline_names, 'klines': binary_data, 'zlib': True})
return d
except Exception as ex:
self.write_error(f'获取klines切片数据失败:{str(ex)}')
return {}
def init_policy(self):
self.write_log(u'init_policy(),初始化执行逻辑')
if self.policy:
self.policy.load()
def init_position(self):
"""
初始化Positin
使用网格的持久化获取开仓状态的多空单更新
:return:
"""
self.write_log(u'init_position(),初始化持仓')
changed = False
if len(self.gt.dn_grids) <= 0:
# 加载已开仓的多数据网格JSON
self.position.long_pos = 0
long_grids = self.gt.load(direction=Direction.LONG, open_status_filter=[True])
if len(long_grids) == 0:
self.write_log(u'没有持久化的多单数据')
self.gt.dn_grids = []
else:
self.gt.dn_grids = long_grids
for lg in long_grids:
if len(lg.order_ids) > 0 or lg.order_status:
self.write_log(f'重置委托状态:{lg.order_status},清除委托单:{lg.order_ids}')
lg.order_status = False
[self.cancel_order(vt_orderid) for vt_orderid in lg.order_ids]
lg.order_ids = []
changed = True
self.write_log(u'加载持仓多单[{},价格:{},数量:{}手, 开仓时间:{}'
.format(lg.vt_symbol, lg.open_price, lg.volume, lg.open_time))
self.position.long_pos = round(self.position.long_pos + lg.volume, 7)
self.write_log(f'持久化多单,共持仓:{self.position.long_pos}')
self.position.pos = round(self.position.long_pos + self.position.short_pos, 7)
self.write_log(u'{}加载持久化数据完成,多单:{},空单:{},共:{}'
.format(self.strategy_name,
self.position.long_pos,
abs(self.position.short_pos),
self.position.pos))
self.pos = self.position.pos
if changed:
self.gt.save()
self.display_grids()
def get_positions(self):
"""
获取策略当前持仓(重构使用主力合约
:return: [{'vt_symbol':symbol,'direction':direction,'volume':volume]
"""
if not self.position:
return []
pos_list = []
if self.position.long_pos > 0:
for g in self.gt.get_opened_grids(direction=Direction.LONG):
pos_list.append({'vt_symbol': f'{self.asset_symbol}.{self.exchange.value}',
'direction': 'long',
'volume': g.volume - g.traded_volume,
'price': g.open_price})
if self.cur_datetime and (datetime.now() - self.cur_datetime).total_seconds() < 10:
self.write_log(u'{}当前持仓:{}'.format(self.strategy_name, pos_list))
return pos_list
def on_trade(self, trade: TradeData):
"""交易更新"""
self.write_log(u'{},交易更新:{},当前持仓:{} '
.format(self.cur_datetime,
trade.__dict__,
self.position.pos))
dist_record = dict()
if self.backtesting:
dist_record['datetime'] = trade.time
else:
dist_record['datetime'] = ' '.join([self.cur_datetime.strftime('%Y-%m-%d'), trade.time])
dist_record['volume'] = trade.volume
dist_record['price'] = trade.price
dist_record['margin'] = trade.price * trade.volume * self.cta_engine.get_margin_rate(trade.vt_symbol)
dist_record['symbol'] = trade.vt_symbol
if trade.direction == Direction.LONG and trade.offset == Offset.OPEN:
dist_record['operation'] = 'buy'
self.position.open_pos(trade.direction, volume=trade.volume)
dist_record['long_pos'] = self.position.long_pos
dist_record['short_pos'] = self.position.short_pos
if trade.direction == Direction.SHORT and trade.offset == Offset.OPEN:
dist_record['operation'] = 'short'
self.position.open_pos(trade.direction, volume=trade.volume)
dist_record['long_pos'] = self.position.long_pos
dist_record['short_pos'] = self.position.short_pos
if trade.direction == Direction.LONG and trade.offset != Offset.OPEN:
dist_record['operation'] = 'cover'
self.position.close_pos(trade.direction, volume=trade.volume)
dist_record['long_pos'] = self.position.long_pos
dist_record['short_pos'] = self.position.short_pos
if trade.direction == Direction.SHORT and trade.offset != Offset.OPEN:
dist_record['operation'] = 'sell'
self.position.close_pos(trade.direction, volume=trade.volume)
dist_record['long_pos'] = self.position.long_pos
dist_record['short_pos'] = self.position.short_pos
self.save_dist(dist_record)
self.pos = self.position.pos
def on_order(self, order: OrderData):
"""报单更新"""
# 未执行的订单中,存在是异常,删除
self.write_log(u'{}报单更新,{}'.format(self.cur_datetime, order.__dict__))
if order.vt_orderid in self.active_orders:
if order.volume == order.traded and order.status in [Status.ALLTRADED]:
self.on_order_all_traded(order)
elif order.offset == Offset.OPEN and order.status in [Status.CANCELLED]:
# 开仓委托单被撤销
self.on_order_open_canceled(order)
elif order.offset != Offset.OPEN and order.status in [Status.CANCELLED]:
# 平仓委托单被撤销
self.on_order_close_canceled(order)
elif order.status == Status.REJECTED:
if order.offset == Offset.OPEN:
self.write_error(u'{}委托单开{}被拒price:{},total:{},traded:{}status:{}'
.format(order.vt_symbol, order.direction, order.price, order.volume,
order.traded, order.status))
self.on_order_open_canceled(order)
else:
self.write_error(u'OnOrder({})委托单平{}被拒price:{},total:{},traded:{}status:{}'
.format(order.vt_symbol, order.direction, order.price, order.volume,
order.traded, order.status))
self.on_order_close_canceled(order)
else:
self.write_log(u'委托单未完成,total:{},traded:{},tradeStatus:{}'
.format(order.volume, order.traded, order.status))
else:
self.write_error(u'委托单{}不在策略的未完成订单列表中:{}'.format(order.vt_orderid, self.active_orders))
def on_order_all_traded(self, order: OrderData):
"""
订单全部成交
:param order:
:return:
"""
self.write_log(u'{},委托单:{}全部完成'.format(order.time, order.vt_orderid))
order_info = self.active_orders[order.vt_orderid]
# 通过vt_orderid找到对应的网格
grid = order_info.get('grid', None)
if grid is not None:
# 移除当前委托单
if order.vt_orderid in grid.order_ids:
grid.order_ids.remove(order.vt_orderid)
# 网格的所有委托单已经执行完毕
if len(grid.order_ids) == 0:
grid.order_status = False
grid.traded_volume = 0
# 平仓完毕cover sell
if order.offset != Offset.OPEN:
grid.open_status = False
grid.close_status = True
if grid.volume < order.traded:
self.write_log(f'网格平仓数量{grid.volume},小于委托单成交数量:{order.volume},修正为:{order.volume}')
grid.volume = order.traded
self.write_log(f'{grid.direction.value}单已平仓完毕,order_price:{order.price}'
+ f',volume:{order.volume}')
self.write_log(f'移除网格:{grid.to_json()}')
self.gt.remove_grids_by_ids(direction=grid.direction, ids=[grid.id])
# 开仓完毕( buy, short)
else:
grid.open_status = True
grid.open_time = self.cur_datetime
self.write_log(f'{grid.direction.value}单已开仓完毕,order_price:{order.price}'
+ f',volume:{order.volume}')
# 网格的所有委托单部分执行完毕
else:
old_traded_volume = grid.traded_volume
grid.traded_volume += order.volume
grid.traded_volume = round(grid.traded_volume, 7)
self.write_log(f'{grid.direction.value}单部分{order.offset}仓,'
+ f'网格volume:{grid.volume}, traded_volume:{old_traded_volume}=>{grid.traded_volume}')
self.write_log(f'剩余委托单号:{grid.order_ids}')
self.gt.save()
# 在策略得活动订单中,移除
self.write_log(f'委托单{order.vt_orderid}完成,从活动订单中移除')
self.active_orders.pop(order.vt_orderid, None)
def on_order_open_canceled(self, order: OrderData):
"""
委托开仓单撤销
:param order:
:return:
"""
self.write_log(u'委托开仓单撤销:{}'.format(order.__dict__))
if order.vt_orderid not in self.active_orders:
self.write_error(u'{}不在未完成的委托单中{}'.format(order.vt_orderid, self.active_orders))
return
# 直接更新“未完成委托单”更新volume,retry次数
old_order = self.active_orders[order.vt_orderid]
self.write_log(u'{} 委托信息:{}'.format(order.vt_orderid, old_order))
old_order['traded'] = order.traded
grid = old_order.get('grid', None)
pre_status = old_order.get('status', Status.NOTTRADED)
if pre_status == Status.CANCELLED:
self.write_log(f'当前状态已经是{Status.CANCELLED},不做调整处理')
return
old_order.update({'status': Status.CANCELLED})
self.write_log(u'委托单状态:{}=>{}'.format(pre_status, old_order.get('status')))
if grid:
if order.vt_orderid in grid.order_ids:
grid.order_ids.remove(order.vt_orderid)
if order.traded > 0:
pre_traded_volume = grid.traded_volume
grid.traded_volume = round(grid.traded_volume + order.traded, 7)
self.write_log(f'撤单中部分开仓:{order.traded} + 原已成交:{pre_traded_volume} => {grid.traded_volume}')
if len(grid.order_ids) == 0:
grid.order_status = False
if grid.traded_volume > 0:
pre_volume = grid.volume
grid.volume = grid.traded_volume
grid.traded_volume = 0
grid.open_status = True
self.write_log(f'开仓完成grid.volume {pre_volume} => {grid.volume}')
self.gt.save()
self.active_orders.update({order.vt_orderid: old_order})
self.display_grids()
def on_order_close_canceled(self, order: OrderData):
"""委托平仓单撤销"""
self.write_log(u'委托平仓单撤销:{}'.format(order.__dict__))
if order.vt_orderid not in self.active_orders:
self.write_error(u'{}不在未完成的委托单中:{}'.format(order.vt_orderid, self.active_orders))
return
# 直接更新“未完成委托单”更新volume,Retry次数
old_order = self.active_orders[order.vt_orderid]
self.write_log(u'{} 订单信息:{}'.format(order.vt_orderid, old_order))
old_order['traded'] = order.traded
grid = old_order.get('grid', None)
pre_status = old_order.get('status', Status.NOTTRADED)
if pre_status == Status.CANCELLED:
self.write_log(f'当前状态已经是{Status.CANCELLED},不做调整处理')
return
old_order.update({'status': Status.CANCELLED})
self.write_log(u'委托单状态:{}=>{}'.format(pre_status, old_order.get('status')))
if grid:
if order.vt_orderid in grid.order_ids:
grid.order_ids.remove(order.vt_orderid)
if order.traded > 0:
pre_traded_volume = grid.traded_volume
grid.traded_volume = round(grid.traded_volume + order.traded, 7)
self.write_log(f'撤单中部分平仓成交:{order.traded} + 原已成交:{pre_traded_volume} => {grid.traded_volume}')
if len(grid.order_ids) == 0:
grid.order_status = False
if grid.traded_volume > 0:
pre_volume = grid.volume
grid.volume = round(grid.volume - grid.traded_volume, 7)
grid.traded_volume = 0
if grid.volume <= 0:
grid.volume = 0
grid.open_status = False
self.write_log(f'强制全部平仓完成')
else:
self.write_log(f'平仓委托中撤单完成部分成交减少持仓grid.volume {pre_volume} => {grid.volume}')
self.gt.save()
self.active_orders.update({order.vt_orderid: old_order})
self.display_grids()
def on_stop_order(self, stop_order: StopOrder):
self.write_log(f'停止单触发:{stop_order.__dict__}')
def grid_check_stop(self):
"""
网格逐一止损/止盈检查 (根据指数价格进行止损止盈
:return:
"""
if self.entrust != 0:
return
if not self.trading and not self.inited:
self.write_error(u'当前不允许交易')
return
# 多单网格逐一止损/止盈检查:
long_grids = self.gt.get_opened_grids(direction=Direction.LONG)
for g in long_grids:
if g.stop_price > 0 and g.stop_price > self.cur_price and g.open_status and not g.order_status:
# 调用平仓模块
self.write_log(u'{} {}当前价:{} 触发多单止损线{},开仓价:{},v{}'.
format(self.cur_datetime,
g.vt_symbol,
self.cur_price,
g.stop_price,
g.open_price,
g.volume))
if self.grid_sell(g):
self.write_log(u'多单止盈/止损委托成功')
else:
self.write_error(u'多单止损委托失败')
def grid_buy(self, grid):
"""
事务开多仓
:return:
"""
if self.backtesting:
buy_price = self.cur_price + self.price_tick
else:
buy_price = self.cur_tick.ask_price_1
if self.quote_pos is None:
self.write_error(u'无法获取{}得持仓信息'.format(self.quote_symbol))
return False
vt_orderids = self.buy(vt_symbol=self.vt_symbol,
price=buy_price,
volume=grid.volume,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if len(vt_orderids) > 0:
self.write_log(u'创建{}事务多单,开仓价:{},数量:{},止盈价:{},止损价:{}'
.format(grid.type, grid.open_price, grid.volume, grid.close_price, grid.stop_price))
self.gt.save()
return True
else:
self.write_error(u'创建{}事务多单,委托失败,开仓价:{},数量:{},止盈价:{}'
.format(grid.type, grid.open_price, grid.volume, grid.close_price))
return False
def grid_sell(self, grid):
"""
事务平多单仓位
1.来源自止损止盈平仓
:param 平仓网格
:return:
"""
self.write_log(u'执行事务平多仓位:{}'.format(grid.to_json()))
if self.asset_pos is None:
self.write_error(u'无法获取{}得持仓信息'.format(self.asset_symbol))
return False
# 发出委托卖出单
if self.backtesting:
sell_price = self.cur_price - self.price_tick
else:
sell_price = self.cur_tick.bid_price_1
# 发出平多委托
if grid.traded_volume > 0:
grid.volume -= grid.traded_volume
grid.volume = round(grid.volume, 7)
grid.traded_volume = 0
if self.asset_pos.volume <= 0:
self.write_error(u'当前{}的净持仓:{},不能平多单'
.format(self.asset_symbol,
self.asset_pos.volume))
return False
if self.asset_pos.volume < grid.volume:
self.write_error(u'当前{}的净持仓:{},不满足平仓目标:{}, 强制降低'
.format(self.asset_symbol,
self.asset_pos.volume,
grid.volume))
grid.volume = self.asset_pos.volume
vt_orderids = self.sell(
vt_symbol=self.vt_symbol,
price=sell_price,
volume=grid.volume,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if len(vt_orderids) == 0:
if self.backtesting:
self.write_error(u'多单平仓委托失败')
else:
self.write_error(u'多单平仓委托失败')
return False
else:
self.write_log(u'多单平仓委托成功,编号:{}'.format(vt_orderids))
return True
def cancel_all_orders(self):
"""
重载撤销所有正在进行得委托
:return:
"""
self.write_log(u'撤销所有正在进行得委托')
self.tns_cancel_logic(dt=datetime.now(), force=True, reopen=False)
def tns_cancel_logic(self, dt, force=False, reopen=False):
"撤单逻辑"""
if len(self.active_orders) < 1:
self.entrust = 0
return
canceled_ids = []
for vt_orderid in list(self.active_orders.keys()):
order_info = self.active_orders[vt_orderid]
order_vt_symbol = order_info.get('vt_symbol', self.vt_symbol)
order_time = order_info['order_time']
order_volume = order_info['volume'] - order_info['traded']
order_grid = order_info['grid']
order_status = order_info.get('status', Status.NOTTRADED)
order_type = order_info.get('order_type', OrderType.LIMIT)
over_seconds = (dt - order_time).total_seconds()
# 只处理未成交的限价委托单
if order_status in [Status.SUBMITTING, Status.NOTTRADED] and order_type == OrderType.LIMIT:
if over_seconds > self.cancel_seconds or force: # 超过设置的时间还未成交
self.write_log(u'超时{}秒未成交取消委托单vt_orderid:{},order:{}'
.format(over_seconds, vt_orderid, order_info))
order_info.update({'status': Status.CANCELLING})
self.active_orders.update({vt_orderid: order_info})
ret = self.cancel_order(str(vt_orderid))
if not ret:
self.write_log(u'撤单失败,更新状态为撤单成功')
order_info.update({'status': Status.CANCELLED})
self.active_orders.update({vt_orderid: order_info})
if order_grid and vt_orderid in order_grid.order_ids:
order_grid.order_ids.remove(vt_orderid)
continue
# 处理状态为‘撤销’的委托单
elif order_status == Status.CANCELLED:
self.write_log(u'委托单{}已成功撤单,删除{}'.format(vt_orderid, order_info))
canceled_ids.append(vt_orderid)
if order_info['offset'] == Offset.OPEN \
and order_grid \
and len(order_grid.order_ids) == 0 \
and not order_grid.open_status \
and not order_grid.order_status \
and order_grid.traded_volume == 0:
self.write_log(u'移除从未开仓成功的委托网格{}'.format(order_grid.__dict__))
order_info['grid'] = None
self.gt.remove_grids_by_ids(direction=order_grid.direction, ids=[order_grid.id])
# 删除撤单的订单
for vt_orderid in canceled_ids:
self.write_log(f'活动订单撤单成功,移除{vt_orderid}')
self.active_orders.pop(vt_orderid, None)
if len(self.active_orders) == 0:
self.entrust = 0
def display_grids(self):
"""更新网格显示信息"""
if not self.inited:
return
self.assett_pos = self.cta_engine.get_position(vt_symbol=f'{self.asset_symbol}.{self.exchange.value}', direction=Direction.NET)
if self.asset_pos:
self.write_log(
f'账号{self.asset_symbol}持仓:{self.asset_pos.volume}, 冻结:{self.asset_pos.frozen}')
self.quote_pos = self.cta_engine.get_position(vt_symbol=f'{self.quote_symbol}.{self.exchange.value}', direction=Direction.NET)
if self.quote_pos:
self.write_log(
f'账号{self.quote_symbol}持仓:{self.quote_pos.volume}, 冻结:{self.quote_pos.frozen}')
dn_grids_info = ""
for grid in list(self.gt.dn_grids):
if grid.close_status and not grid.open_status and grid.order_status:
dn_grids_info += f'平多中: {grid.vt_symbol}[已平:{grid.traded_volume} => 目标:{grid.volume}, 平仓价格:{grid.close_price},委托时间:{grid.order_time}]\n'
if len(grid.order_ids) > 0:
dn_grids_info += f'委托单号:{grid.order_ids}'
continue
if grid.open_status and not grid.order_status and not grid.close_status:
dn_grids_info += f'持多中: {grid.vt_symbol}[数量:{grid.volume}, 开仓价格:{grid.open_price},开仓时间:{grid.open_time}]\n'
continue
if not grid.open_status and grid.order_status and not grid.close_status:
dn_grids_info += f'开多中: {grid.vt_symbol}[已开:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}]\n'
if len(grid.order_ids) > 0:
dn_grids_info += f'委托单号:{grid.order_ids}'
if len(dn_grids_info) > 0:
self.write_log(dn_grids_info)
def display_tns(self):
"""显示事务的过程记录=》 log"""
if not self.inited:
return
self.write_log(u'{} 当前 {}价格:{}, 委托状态:{}'
.format(self.cur_datetime, self.vt_symbol, self.cur_price, self.entrust))
if len(self.active_orders) > 0:
self.write_log('当前活动订单:{}'.format(json.dumps(self.active_orders, indent=2, ensure_ascii=False)))
if hasattr(self, 'policy'):
policy = getattr(self, 'policy')

View File

@ -184,6 +184,7 @@ class CtaEngine(BaseEngine):
register the funcs to main_engine
:return:
"""
self.main_engine.get_name = self.get_name
self.main_engine.get_strategy_status = self.get_strategy_status
self.main_engine.get_strategy_pos = self.get_strategy_pos
self.main_engine.compare_pos = self.compare_pos
@ -195,6 +196,7 @@ class CtaEngine(BaseEngine):
self.main_engine.reload_strategy = self.reload_strategy
self.main_engine.save_strategy_data = self.save_strategy_data
self.main_engine.save_strategy_snapshot = self.save_strategy_snapshot
self.main_engine.clean_strategy_cache = self.clean_strategy_cache
# 注册到远程服务调用
if self.main_engine.rpc_service:
@ -209,6 +211,7 @@ class CtaEngine(BaseEngine):
self.main_engine.rpc_service.register(self.main_engine.reload_strategy)
self.main_engine.rpc_service.register(self.main_engine.save_strategy_data)
self.main_engine.rpc_service.register(self.main_engine.save_strategy_snapshot)
self.main_engine.rpc_service.register(self.main_engine.clean_strategy_cache)
def process_timer_event(self, event: Event):
""" 处理定时器事件"""
@ -1246,6 +1249,15 @@ class CtaEngine(BaseEngine):
self.write_error(u'保存策略{}数据异常:'.format(strategy_name, str(ex)))
self.write_error(traceback.format_exc())
def clean_strategy_cache(self, strategy_name):
"""清除策略K线缓存文件"""
cache_file = os.path.abspath(os.path.join(self.get_data_path(), f'{strategy_name}_klines.pkb2'))
if os.path.exists(cache_file):
self.write_log(f'移除策略缓存文件:{cache_file}')
os.remove(cache_file)
else:
self.write_log(f'策略缓存文件不存在:{cache_file}')
def get_strategy_snapshot(self, strategy_name):
"""实时获取策略的K线切片比较耗性能"""
strategy = self.strategies.get(strategy_name, None)
@ -1380,7 +1392,8 @@ class CtaEngine(BaseEngine):
self.class_module_map[class_name] = module_name
return True
except: # noqa
msg = f"策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}"
account = self.engine_config.get('accountid', '')
msg = f"cta_stock:{account}策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}"
self.write_log(msg=msg, level=logging.CRITICAL)
return False
@ -1615,8 +1628,8 @@ class CtaEngine(BaseEngine):
pos_compare_result += '\n{}: '.format(vt_symbol)
# 多单不一致
if round(symbol_pos['策略多单'], 7) != round(symbol_pos['账号多单'], 7):
msg = '{}多单[账号({}), 策略{},共({})], ' \
.format(vt_symbol,
msg = '{}[账号({}), 策略{},共({})], ' \
.format(self.get_name(vt_symbol),
symbol_pos['账号多单'],
symbol_pos['多单策略'],
symbol_pos['策略多单'])

View File

@ -574,6 +574,7 @@ class CtaStockTemplate(CtaTemplate):
:return:
"""
self.write_log(u'init_position(),初始化持仓')
subscribe_symbols = []
if len(self.gt.dn_grids) <= 0:
# 加载已开仓的多数据网格JSON
@ -615,6 +616,13 @@ class CtaStockTemplate(CtaTemplate):
self.positions.update({lg.vt_symbol: pos})
if len(lg.vt_symbol) > 0 and lg.vt_symbol not in self.vt_symbols and lg.vt_symbol not in subscribe_symbols:
subscribe_symbols.append(lg.vt_symbol)
for vt_symbol in subscribe_symbols:
self.write_log(f'{vt_symbol}不在配置清单中,添加行情订阅')
self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=vt_symbol)
self.gt.save()
self.display_grids()
@ -1301,6 +1309,18 @@ class CtaStockTemplate(CtaTemplate):
"""显示事务的过程记录=》 log"""
if not self.inited:
return
if not self.backtesting:
for vt_symbol in self.vt_symbols:
name = self.cta_engine.get_name(vt_symbol)
price = self.cta_engine.get_price(vt_symbol)
self.write_log('%-11s'%vt_symbol + '[%-12s'%name + f'] 当前价格: {price}')
self.write_log(f'当前委托状态:{self.entrust}')
if len(self.active_orders) > 0:
self.write_log('当前活动订单:{}'.format(self.active_orders))
if hasattr(self, 'policy'):
policy = getattr(self, 'policy')
op = getattr(policy, 'to_json', None)

View File

@ -189,6 +189,7 @@ class CtaEngine(BaseEngine):
register the funcs to main_engine
:return:
"""
self.main_engine.get_name = self.get_name
self.main_engine.get_strategy_status = self.get_strategy_status
self.main_engine.get_strategy_pos = self.get_strategy_pos
self.main_engine.compare_pos = self.compare_pos
@ -200,6 +201,7 @@ class CtaEngine(BaseEngine):
self.main_engine.reload_strategy = self.reload_strategy
self.main_engine.save_strategy_data = self.save_strategy_data
self.main_engine.save_strategy_snapshot = self.save_strategy_snapshot
self.main_engine.clean_strategy_cache = self.clean_strategy_cache
# 注册到远程服务调用
if self.main_engine.rpc_service:
@ -214,6 +216,7 @@ class CtaEngine(BaseEngine):
self.main_engine.rpc_service.register(self.main_engine.reload_strategy)
self.main_engine.rpc_service.register(self.main_engine.save_strategy_data)
self.main_engine.rpc_service.register(self.main_engine.save_strategy_snapshot)
self.main_engine.rpc_service.register(self.main_engine.clean_strategy_cache)
def process_timer_event(self, event: Event):
""" 处理定时器事件"""
@ -794,6 +797,15 @@ class CtaEngine(BaseEngine):
return True
@lru_cache()
def get_name(self, vt_symbol: str):
"""查询合约的name"""
contract = self.main_engine.get_contract(vt_symbol)
if contract is None:
self.write_error(f'查询不到{vt_symbol}合约信息')
return vt_symbol
return contract.name
@lru_cache()
def get_size(self, vt_symbol: str):
"""查询合约的size"""
@ -1258,6 +1270,15 @@ class CtaEngine(BaseEngine):
self.write_error(u'保存策略{}数据异常:'.format(strategy_name, str(ex)))
self.write_error(traceback.format_exc())
def clean_strategy_cache(self, strategy_name):
"""清除策略K线缓存文件"""
cache_file = os.path.abspath(os.path.join(self.get_data_path(), f'{strategy_name}_klines.pkb2'))
if os.path.exists(cache_file):
self.write_log(f'移除策略缓存文件:{cache_file}')
os.remove(cache_file)
else:
self.write_log(f'策略缓存文件不存在:{cache_file}')
def get_strategy_snapshot(self, strategy_name):
"""实时获取策略的K线切片比较耗性能"""
strategy = self.strategies.get(strategy_name, None)

View File

@ -7,7 +7,7 @@ import traceback
import zlib
from abc import ABC
from copy import copy
from copy import copy,deepcopy
from typing import Any, Callable
from logging import INFO, ERROR
from datetime import datetime
@ -913,7 +913,8 @@ class CtaProTemplate(CtaTemplate):
return
none_mi_price = max(none_mi_tick.last_price, none_mi_tick.bid_price_1)
grid = copy(none_mi_grid)
grid = deepcopy(none_mi_grid)
grid.id = str(uuid.uuid1())
# 委托卖出非主力合约
vt_orderids = self.sell(price=none_mi_price,
@ -925,7 +926,6 @@ class CtaProTemplate(CtaTemplate):
self.write_log(f'切换合约,委托卖出非主力合约{none_mi_symbol}持仓:{none_mi_grid.volume}')
# 添加买入主力合约
grid.id = str(uuid.uuid1())
grid.snapshot.update({'mi_symbol': self.vt_symbol, 'open_price': self.cur_mi_price})
self.gt.dn_grids.append(grid)
@ -982,7 +982,8 @@ class CtaProTemplate(CtaTemplate):
return
none_mi_price = max(none_mi_tick.last_price, none_mi_tick.bid_price_1)
grid = copy(none_mi_grid)
grid = deepcopy(none_mi_grid)
grid.id = str(uuid.uuid1())
# 委托平空非主力合约
vt_orderids = self.cover(price=none_mi_price,
volume=none_mi_grid.volume,
@ -1128,7 +1129,7 @@ class CtaProFutureTemplate(CtaProTemplate):
def on_trade(self, trade: TradeData):
"""交易更新"""
self.write_log(u'{},交易更新:{},当前持仓:{} '
self.write_log(u'{},交易更新事件:{},当前持仓:{} '
.format(self.cur_datetime,
trade.__dict__,
self.position.pos))
@ -1175,7 +1176,7 @@ class CtaProFutureTemplate(CtaProTemplate):
if order_info:
volume = order_info.get('volume')
if volume != order.volume:
self.write_log(f'调整{order.vt_orderid} volume:{volume}=>{order.volume}')
self.write_log(f'修正order被拆单得情况调整{order.vt_orderid} volume:{volume}=>{order.volume}')
order_info.update({'volume': order.volume})
def on_order(self, order: OrderData):
@ -1222,7 +1223,7 @@ class CtaProFutureTemplate(CtaProTemplate):
:param order:
:return:
"""
self.write_log(u'{},委托单:{}全部完成'.format(order.time, order.vt_orderid))
self.write_log(u'委托单全部完成:{}'.format(order.__dict__))
order_info = self.active_orders[order.vt_orderid]
# 通过vt_orderid找到对应的网格
@ -1265,7 +1266,8 @@ class CtaProFutureTemplate(CtaProTemplate):
self.write_log(f'剩余委托单号:{grid.order_ids}')
self.gt.save()
else:
self.write_error(f'on_trade找不到对应grid')
# 在策略得活动订单中,移除
self.active_orders.pop(order.vt_orderid, None)
@ -1380,7 +1382,6 @@ class CtaProFutureTemplate(CtaProTemplate):
self.gt.save()
elif old_order['direction'] == Direction.SHORT and order_type == OrderType.FAK:
# 删除旧的委托记录
self.write_log(u'移除旧的委托记录:{}'.format(order.vt_orderid))
@ -1771,12 +1772,17 @@ class CtaProFutureTemplate(CtaProTemplate):
if self.activate_today_lock:
self.write_log(u'昨仓多单:{},没有今仓,满足条件,直接平昨仓'.format(grid_pos.long_yd))
sell_price = self.cta_engine.get_price(sell_symbol)
if sell_price is None:
self.write_error(f'暂时不能获取{sell_symbol}价格,不能平仓')
return False
# 实盘使用对价
if not self.backtesting:
sell_tick = self.cta_engine.get_tick(sell_symbol)
if sell_tick and 0 < sell_tick.bid_price_1 < sell_price:
sell_price = sell_tick.bid_price_1
# 发出平多委托
if grid.traded_volume > 0:
grid.volume -= grid.traded_volume
@ -1871,6 +1877,12 @@ class CtaProFutureTemplate(CtaProTemplate):
self.write_error(f'暂时没有{cover_symbol}行情,不能执行平仓')
return False
# 实盘使用对价
if not self.backtesting:
cover_tick = self.cta_engine.get_tick(cover_symbol)
if cover_tick and 0 < cover_price < cover_tick.ask_price_1 :
cover_price = cover_tick.ask_price_1
# 发出cover委托
if grid.traded_volume > 0:
grid.volume -= grid.traded_volume

View File

@ -35,7 +35,7 @@ NIGHT_MARKET_ZZ = {'TA': 2, 'JR': 1, 'OI': 0, 'RO': 1, 'PM': 1, 'WH': 1, 'CF': 5
# 大商所夜盘9:00~10:15, 10:30~11:30, 13:30~15:00, 21:00 ~23:00
NIGHT_MARKET_DL = {'V': 5, 'L': 5, 'BB': 0.05, 'I': 0.5, 'FB': 0.05, 'C': 1, 'PP': 1, 'A': 1, 'B': 1, 'M': 1, 'Y': 2,
'P': 2,
'JM': 0.5, 'J': 0.5, 'EG': 1}
'JM': 0.5, 'J': 0.5, 'EG': 1, 'EB': 1}
# 中金日盘9:15 ~11:30, 13:00~15:15
MARKET_ZJ = {'IC': 0.2, 'IF': 0.2, 'IH': 0.2, 'T': 0.005, 'TF': 0.005, 'TS': 0.005}

View File

@ -466,8 +466,8 @@ class BinanceRestApi(RestClient):
frozen=0,
gateway_name=self.gateway_name
)
self.gateway.on_account(account)
put_event_account = True
# ==> position event
for account_data in data["balances"]:
pos = PositionData(
@ -488,7 +488,27 @@ class BinanceRestApi(RestClient):
self.positions.update({pos.symbol: pos})
self.gateway.on_position(copy(pos))
self.gateway.write_log("账户资金查询成功")
if pos.symbol == 'USDT':
pos.cur_price = 1
account.balance += pos.volume
else:
price = self.gateway.prices.get(f'{pos.symbol}USDT.{pos.exchange.value}', None)
if price is None:
req = SubscribeRequest(
symbol=f'{pos.symbol}USDT',
exchange=pos.exchange
)
self.gateway.subscribe(req)
put_event_account = False
else:
pos.cur_price = price
account.balance += pos.volume * price
if put_event_account:
self.gateway.on_account(account)
#self.gateway.write_log("账户资金查询成功")
def on_query_order(self, data, request):
""""""

View File

@ -330,7 +330,13 @@ class IbApi(EWrapper):
if exchange is Exchange.IDEALPRO:
tick.last_price = (tick.bid_price_1 + tick.ask_price_1) / 2
tick.datetime = datetime.now()
self.gateway.on_tick(copy(tick))
# 有些错误数据过来例如ask_price1 = -1.0
if tick.ask_price_1 < tick.last_price:
return
if tick.bid_price_1 < min(tick.last_price - 10 * contract.pricetick, tick.last_price * 0.8):
return
if tick.last_price != 0:
self.gateway.on_tick(copy(tick))
def tickSize(
self, reqId: TickerId, tickType: TickType, size: int
@ -346,8 +352,10 @@ class IbApi(EWrapper):
tick = self.ticks[reqId]
name = TICKFIELD_IB2VT[tickType]
setattr(tick, name, size)
self.gateway.on_tick(copy(tick))
if tick.ask_volume_1 == 0 or tick.bid_volume_1 == 0:
return
if tick.last_price != 0:
self.gateway.on_tick(copy(tick))
def tickString(
self, reqId: TickerId, tickType: TickType, value: str
@ -362,8 +370,12 @@ class IbApi(EWrapper):
tick = self.ticks[reqId]
tick.datetime = datetime.fromtimestamp(int(value))
self.gateway.on_tick(copy(tick))
if tick.ask_price_1 < tick.last_price:
return
if tick.bid_price_1 < tick.last_price * 0.8:
return
if tick.last_price != 0:
self.gateway.on_tick(copy(tick))
def orderStatus( # pylint: disable=invalid-name
self,
@ -381,6 +393,7 @@ class IbApi(EWrapper):
):
"""
Callback of order status update.
委托单状态变化
"""
super().orderStatus(
orderId,
@ -398,6 +411,10 @@ class IbApi(EWrapper):
orderid = str(orderId)
order = self.orders.get(orderid, None)
if order is None:
self.gateway.write_error(f'无法获取{orderid}在本地的缓存委托单')
return
order.traded = filled
# To filter PendingCancel status
@ -422,8 +439,11 @@ class IbApi(EWrapper):
)
orderid = str(orderId)
# ==> 生成 xxxx-HKD-STK等格式的合约名称
symbol = generate_symbol(ib_contract)
order = OrderData(
symbol=ib_contract.conId,
symbol=symbol,
exchange=EXCHANGE_IB2VT.get(
ib_contract.exchange, ib_contract.exchange),
type=ORDERTYPE_IB2VT[ib_order.orderType],
@ -475,6 +495,7 @@ class IbApi(EWrapper):
):
"""
Callback of position update.
持仓更新
"""
super().updatePortfolio(
contract,
@ -527,14 +548,17 @@ class IbApi(EWrapper):
def contractDetails(self, reqId: int, contractDetails: ContractDetails): # pylint: disable=invalid-name
"""
Callback of contract data update.
合约数据更新
"""
super().contractDetails(reqId, contractDetails)
# Generate symbol from ib contract details
ib_contract = contractDetails.contract
# 合约乘数
if not ib_contract.multiplier:
ib_contract.multiplier = 1
# ==> 生成 xxxx-HKD-STK等格式的合约名称
symbol = generate_symbol(ib_contract)
# Generate contract
@ -562,19 +586,22 @@ class IbApi(EWrapper):
): # pylint: disable=invalid-name
"""
Callback of trade data update.
交易数据更新
"""
super().execDetails(reqId, contract, execution)
# today_date = datetime.now().strftime("%Y%m%d")
dt = datetime.strptime(execution.time, "%Y%m%d %H:%M:%S")
trade = TradeData(
symbol=contract.conId,
symbol=generate_symbol(contract),
exchange=EXCHANGE_IB2VT.get(contract.exchange, contract.exchange),
orderid=str(execution.orderId),
tradeid=str(execution.execId),
direction=DIRECTION_IB2VT[execution.side],
price=execution.price,
volume=execution.shares,
time=datetime.strptime(execution.time, "%Y%m%d %H:%M:%S"),
datetime=dt,
time=dt.strftime('%H:%M:%S'),
gateway_name=self.gateway_name,
)
@ -583,6 +610,7 @@ class IbApi(EWrapper):
def managedAccounts(self, accountsList: str):
"""
Callback of all sub accountid.
所有子账号信息更新
"""
super().managedAccounts(accountsList)
@ -596,6 +624,7 @@ class IbApi(EWrapper):
def historicalData(self, reqId: int, ib_bar: IbBarData):
"""
Callback of history data update.
历史行情
"""
dt = datetime.strptime(ib_bar.date, "%Y%m%d %H:%M:%S")
@ -617,6 +646,7 @@ class IbApi(EWrapper):
def historicalDataEnd(self, reqId: int, start: str, end: str):
"""
Callback of history data finished.
行情数据推送结束
"""
self.history_condition.acquire()
self.history_condition.notify()
@ -625,6 +655,7 @@ class IbApi(EWrapper):
def connect(self, host: str, port: int, clientid: int, account: str):
"""
Connect to TWS.
连接本地TWS
"""
if self.status:
self.gateway.write_log(f'已连接,不再重连')
@ -650,6 +681,7 @@ class IbApi(EWrapper):
def subscribe(self, req: SubscribeRequest):
"""
Subscribe tick data update.
订阅行情
"""
if not self.status:
return
@ -659,6 +691,7 @@ class IbApi(EWrapper):
return
# Extract ib contract detail
# vn symbol => ib contract
ib_contract = generate_ib_contract(req.symbol, req.exchange)
if not ib_contract:
self.gateway.write_log("代码解析失败,请检查格式是否正确")
@ -684,6 +717,7 @@ class IbApi(EWrapper):
def send_order(self, req: OrderRequest):
"""
Send a new order.
发送委托
"""
if not self.status:
return ""
@ -698,6 +732,7 @@ class IbApi(EWrapper):
self.orderid += 1
# vn symbol -> ib contract
ib_contract = generate_ib_contract(req.symbol, req.exchange)
if not ib_contract:
return ""
@ -719,12 +754,14 @@ class IbApi(EWrapper):
self.client.reqIds(1)
order = req.create_order_data(str(self.orderid), self.gateway_name)
order.datetime = datetime.now()
self.gateway.on_order(order)
return order.vt_orderid
def cancel_order(self, req: CancelRequest):
"""
Cancel an existing order.
撤单
"""
if not self.status:
return
@ -738,7 +775,7 @@ class IbApi(EWrapper):
self.reqid += 1
# 转换为ib的合约
# vn symbol => ib的合约
ib_contract = generate_ib_contract(req.symbol, req.exchange)
if req.end:
@ -782,15 +819,21 @@ class IbApi(EWrapper):
return history
def load_contract_data(self):
""""""
f = shelve.open(self.data_filepath)
self.contracts = f.get("contracts", {})
f.close()
"""
加载本地缓存合约数据
:return:
"""
try:
f = shelve.open(self.data_filepath)
self.contracts = f.get("contracts", {})
f.close()
for contract in self.contracts.values():
self.gateway.on_contract(contract)
for contract in self.contracts.values():
self.gateway.on_contract(contract)
self.gateway.write_log("本地缓存合约信息加载成功")
self.gateway.write_log("本地缓存合约信息加载成功")
except Exception as ex:
self.gateway.write_error(f'本地缓存合约信息加载失败:{str(ex)}')
def save_contract_data(self):
""""""

View File

@ -308,7 +308,7 @@ class XtpMdApi(MdApi):
tick.ask_volume_1, tick.ask_volume_2, tick.ask_volume_3, tick.ask_volume_4, tick.ask_volume_5 = data["ask_qty"][0:5]
tick.name = get_vt_symbol_name(tick.vt_symbol)
self.gateway.prices.update({tick.vt_symbol: tick.last_price})
#self.gateway.prices.update({tick.vt_symbol: tick.last_price})
self.gateway.on_tick(tick)
def onSubOrderBook(self, data: dict, error: dict, last: bool) -> None:

View File

@ -29,7 +29,8 @@ class OffsetConverter:
# return
holding = self.get_position_holding(position.vt_symbol, position.gateway_name)
holding.update_position(position)
if holding:
holding.update_position(position)
def update_trade(self, trade: TradeData) -> None:
""""""
@ -66,6 +67,8 @@ class OffsetConverter:
holding = self.holdings.get(k, None)
if not holding:
contract = self.main_engine.get_contract(vt_symbol)
if contract is None:
return None
holding = PositionHolding(contract)
self.holdings[k] = holding
return holding

View File

@ -463,6 +463,7 @@ class OmsEngine(BaseEngine):
# 更新自定义合约
custom_contracts = self.get_all_custom_contracts()
self.get_all_custom_contracts(rtn_setting=True)
for contract in custom_contracts.values():
# 更新合约缓存
@ -481,7 +482,7 @@ class OmsEngine(BaseEngine):
spd_mapping_list = self.symbol_spd_maping.get(symbol, [])
# 更新映射 symbol => spd_symbol
if contract.symbol not in spd_mapping_list:
if (not contract.symbol.endswith('.SPD')) and contract.symbol not in spd_mapping_list:
spd_mapping_list.append(contract.symbol)
self.symbol_spd_maping.update({symbol: spd_mapping_list})
@ -556,6 +557,9 @@ class OmsEngine(BaseEngine):
position = event.data
self.positions[position.vt_positionid] = position
if position.exchange != Exchange.SPD:
self.create_spd_position_event(position.symbol, position.direction)
def reverse_direction(self, direction):
"""返回反向持仓"""
if direction == Direction.LONG:
@ -586,28 +590,34 @@ class OmsEngine(BaseEngine):
leg2_ratio = spd_setting.get('leg2_ratio', 1)
# 找出leg1leg2的持仓并判断出spd的方向
spd_pos = None
if leg1_symbol == symbol:
k1 = f"{leg1_contract.gateway_name}.{leg1_contract.vt_symbol}.{direction.value}"
leg1_pos = self.positions.get(k1)
k2 = f"{leg2_contract.gateway_name}.{leg2_contract.vt_symbol}.{self.reverse_direction(direction).value}"
leg2_pos = self.positions.get(k2)
spd_direction = direction
k3 = f"{spd_contract.gateway_name}.{spd_symbol}.{Exchange.SPD.value}.{spd_direction.value}"
spd_pos = self.positions.get(k3)
elif leg2_symbol == symbol:
k1 = f"{leg1_contract.gateway_name}.{leg1_contract.vt_symbol}.{self.reverse_direction(direction).value}"
leg1_pos = self.positions.get(k1)
k2 = f"{leg2_contract.gateway_name}.{leg2_contract.vt_symbol}.{direction.value}"
leg2_pos = self.positions.get(k2)
spd_direction = self.reverse_direction(direction)
k3 = f"{spd_contract.gateway_name}.{spd_symbol}.{Exchange.SPD.value}.{spd_direction.value}"
spd_pos = self.positions.get(k3)
else:
continue
if leg1_pos is None or leg2_pos is None or leg1_pos.volume ==0 or leg2_pos.volume == 0:
if leg1_pos is None or leg2_pos is None: # or leg1_pos.volume ==0 or leg2_pos.volume == 0:
continue
# 根据leg1/leg2的volume ratio计算出最小spd_volume
spd_volume = min(int(leg1_pos.volume/leg1_ratio), int(leg2_pos.volume/leg2_ratio))
if spd_volume <= 0:
if spd_volume <= 0 and spd_pos is None:
continue
if spd_setting.get('is_ratio', False) and leg2_pos.price > 0:
spd_price = 100 * (leg2_pos.price * leg1_ratio) / (leg2_pos.price * leg2_ratio)
elif spd_setting.get('is_spread', False):
@ -617,6 +627,7 @@ class OmsEngine(BaseEngine):
spd_pos = PositionData(
gateway_name=spd_contract.gateway_name,
accountid=leg1_pos.accountid,
symbol=spd_symbol,
exchange=Exchange.SPD,
direction=spd_direction,

View File

@ -90,6 +90,8 @@ class BaseGateway(ABC):
self.gateway_name: str = gateway_name
self.logger = None
self.accountid = ""
self.create_logger()
# 所有订阅on_bar的都会添加
@ -124,6 +126,7 @@ class BaseGateway(ABC):
Tick event push.
Tick event of a specific vt_symbol is also pushed.
"""
self.prices.update({tick.vt_symbol: tick.last_price})
self.on_event(EVENT_TICK, tick)
# self.on_event(EVENT_TICK + tick.vt_symbol, tick)

View File

@ -434,7 +434,6 @@ class KLineWidget(KeyWraper):
# 交易事务有关的线段
self.list_trans = [] # 交易事务( {'start_time','end_time','tns_type','start_price','end_price','start_x','end_x','completed'}
self.list_trans_lines = []
# 交易记录相关的箭头
self.list_trade_arrow = [] # 交易图标 list
@ -446,6 +445,9 @@ class KLineWidget(KeyWraper):
self.x_t_markup_map = OrderedDict() # x轴与标记的映射
self.t_markup_dict = OrderedDict() # t 时间的标记
# 缠论相关的线段
self.list_bi = []
# 所有K线上指标
self.main_color_pool = deque(['red', 'green', 'yellow', 'white'])
self.main_indicator_data = {} # 主图指标数据字典key是指标value是list
@ -935,7 +937,6 @@ class KLineWidget(KeyWraper):
self.t_trade_dict = OrderedDict()
self.list_trans = []
self.list_trans_lines = []
self.list_markup = []
self.x_t_markup_map = OrderedDict()
@ -1115,7 +1116,7 @@ class KLineWidget(KeyWraper):
if direction == Direction.LONG:
if offset == Offset.OPEN:
# buy
arrow = pg.ArrowItem(pos=(x, price), angle=135, brush=None, pen={'color': 'r', 'width': 1},
arrow = pg.ArrowItem(pos=(x, price), angle=135, brush=None, pen={'color': 'y', 'width': 2},
tipAngle=30, baseAngle=20, tailLen=10, tailWidth=2)
# d = {
# "pos": (x, price),
@ -1129,17 +1130,17 @@ class KLineWidget(KeyWraper):
# arrow.setData([d])
else:
# cover
arrow = pg.ArrowItem(pos=(x, price), angle=0, brush=(255, 0, 0), pen=None, headLen=20, headWidth=20,
arrow = pg.ArrowItem(pos=(x, price), angle=0, brush='y', pen=None, headLen=20, headWidth=20,
tailLen=10, tailWidth=2)
# 空信号
elif direction == Direction.SHORT:
if offset == Offset.CLOSE:
# sell
arrow = pg.ArrowItem(pos=(x, price), angle=0, brush=(0, 255, 0), pen=None, headLen=20, headWidth=20,
arrow = pg.ArrowItem(pos=(x, price), angle=0, brush='g', pen=None, headLen=20, headWidth=20,
tailLen=10, tailWidth=2)
else:
# short
arrow = pg.ArrowItem(pos=(x, price), angle=-135, brush=None, pen={'color': 'g', 'width': 1},
arrow = pg.ArrowItem(pos=(x, price), angle=-135, brush=None, pen={'color': 'g', 'width': 2},
tipAngle=30, baseAngle=20, tailLen=10, tailWidth=2)
if arrow:
self.pi_main.addItem(arrow)
@ -1374,6 +1375,84 @@ class KLineWidget(KeyWraper):
self.add_markup(t_value=t_value, price=price, txt=markup_text)
def add_bi(self, df_bi, color='b', style= None):
"""
添加缠论_笔_画线
# direction,(1/-1)start, end, high, low
# 笔: color = 'y', style: QtCore.Qt.DashLine
# 段: color = 'b',
:return:
"""
if len(self.datas) == 0 or len(df_bi) == 0:
print(u'No datas exist', file=sys.stderr)
return
for index, row in df_bi.iterrows():
start_time = row['start']
if not isinstance(start_time, datetime) and isinstance(start_time, str):
start_time = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S')
end_time = row['end']
if not isinstance(end_time, datetime) and isinstance(end_time, str):
end_time = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S')
start_x = self.axisTime.get_x_by_time(start_time)
end_x = self.axisTime.get_x_by_time(end_time)
if int(row['direction']) == 1:
pos = np.array([[start_x, row['low']], [end_x, row['high']]])
elif int(row['direction']) == -1:
pos = np.array([[start_x, row['high']], [end_x, row['low']]])
else:
continue
if style:
pen = pg.mkPen({'color': color, 'width': 1, 'style': QtCore.Qt.DashLine})
else:
pen = pg.mkPen({'color': color, 'width': 1})
bi = pg.GraphItem(pos=pos, adj=np.array([[0, 1]]), pen=pen)
self.pi_main.addItem(bi)
def add_zs(self, df_zs, color='y'):
"""
添加缠论中枢_画线
# direction,(1/-1)start, end, high, low
# 笔中枢: color ='y'
# 段中枢: color = 'b'
:return:
"""
if len(self.datas) == 0 or len(df_zs) == 0:
print(u'No datas exist', file=sys.stderr)
return
for index,row in df_zs.iterrows():
start_time = row['start']
if not isinstance(start_time, datetime) and isinstance(start_time, str):
start_time = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S')
end_time = row['end']
if not isinstance(end_time, datetime) and isinstance(end_time, str):
end_time = datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S')
start_x = self.axisTime.get_x_by_time(start_time)
end_x = self.axisTime.get_x_by_time(end_time)
pos_top = np.array([[start_x, row['high']], [end_x, row['high']]])
pos_buttom = np.array([[start_x, row['low']], [end_x, row['low']]])
pos_left = np.array([[start_x, row['high']], [start_x, row['low']]])
pos_right = np.array([[end_x, row['high']], [end_x, row['low']]])
pen = pg.mkPen({'color': color, 'width': 1})
for pos in [pos_top, pos_buttom, pos_left, pos_right]:
line = pg.GraphItem(pos=pos, adj=np.array([[0, 1]]), pen=pen)
self.pi_main.addItem(line)
def loadData(self, df_datas, main_indicators=[], sub_indicators=[]):
"""
载入pandas.DataFrame数据
@ -1563,6 +1642,34 @@ class GridKline(QtWidgets.QWidget):
include_list=kline_setting.get('dist_include_list', []),
exclude_list=['buy', 'short', 'sell', 'cover'])
# 笔
bi_file = kline_setting.get('bi_file', None)
if bi_file and os.path.exists(bi_file):
print(f'loading {bi_file}')
df_bi = pd.read_csv(bi_file)
self.kline_dict[kline_name].add_bi(df_bi, color='y', style= QtCore.Qt.DashLine)
# 段
duan_file = kline_setting.get('duan_file', None)
if duan_file and os.path.exists(duan_file):
print(f'loading {duan_file}')
df_duan = pd.read_csv(duan_file)
self.kline_dict[kline_name].add_bi(df_duan, color='b')
# 笔中枢
bi_zs_file = kline_setting.get('bi_zs_file', None)
if bi_zs_file and os.path.exists(bi_zs_file):
print(f'loading {bi_zs_file}')
df_bi_zs = pd.read_csv(bi_zs_file)
self.kline_dict[kline_name].add_zs(df_bi_zs, color='y')
# 段中枢
duan_zs_file = kline_setting.get('duan_zs_file', None)
if duan_zs_file and os.path.exists(duan_zs_file):
print(f'loading {duan_zs_file}')
df_duan_zs = pd.read_csv(duan_zs_file)
self.kline_dict[kline_name].add_zs(df_duan_zs, color='b')
except Exception as ex:
traceback.print_exc()
QtWidgets.QMessageBox.warning(self, 'Exception', u'Load data Exception',