[增强] 账号/策略仓位比对,市价单委托

This commit is contained in:
msincenselee 2020-04-06 21:24:23 +08:00
parent 941a6bcf01
commit f3a1fa13e3
7 changed files with 134 additions and 33 deletions

View File

@ -449,7 +449,15 @@ class CtaFutureTemplate(CtaTemplate):
self.policy = None # 事务执行组件
self.gt = None # 网格交易组件
self.klines = {} # K线组件字典: kline_name: kline
self.price_tick = 1 # 商品的最小价格跳动
self.symbol_size = 10 # 商品得合约乘数
self.margin_rate = 0.1 # 商品的保证金
self.volumn_tick = 1 # 商品最小成交数量
self.cancel_seconds = 120 # 撤单时间(秒)
self.activate_market = False
self.order_type = OrderType.LIMIT
self.backtesting = False
self.cur_datetime: datetime = None # 当前Tick时间
self.cur_tick: TickData = None # 最新的合约tick( vt_symbol)
@ -488,6 +496,10 @@ class CtaFutureTemplate(CtaTemplate):
if self.activate_market:
self.write_log(f'{self.strategy_name}使用市价单委托方式')
self.order_type = OrderType.MARKET
else:
if not self.backtesting:
self.cancel_seconds = 10
self.write_log(f'实盘撤单时间10秒')
def sync_data(self):
"""同步更新数据"""
@ -789,6 +801,9 @@ class CtaFutureTemplate(CtaTemplate):
if order.offset != Offset.OPEN:
grid.open_status = False
grid.close_status = True
if grid.volume < order.traded:
self.write_log(f'网格平仓数量{grid.volume},小于委托单成交数量:{order.volume},修正为:{order.volume}')
grid.volume = order.traded
self.write_log(f'{grid.direction.value}单已平仓完毕,order_price:{order.price}'
+ f',volume:{order.volume}')
@ -847,7 +862,7 @@ class CtaFutureTemplate(CtaTemplate):
pre_traded_volume = grid.traded_volume
grid.traded_volume = round(grid.traded_volume + order.traded, 7)
self.write_log(f'撤单中部分开仓:{order.traded} + 原已成交:{pre_traded_volume} => {grid.traded_volume}')
if len(grid.order_ids):
if len(grid.order_ids)==0:
grid.order_status = False
if grid.traded_volume > 0:
pre_volume = grid.volume
@ -953,9 +968,13 @@ class CtaFutureTemplate(CtaTemplate):
事务开多仓
:return:
"""
if self.backtesting:
buy_price = self.cur_price + self.price_tick
else:
buy_price = self.cur_tick.ask_price_1
vt_orderids = self.buy(vt_symbol=self.vt_symbol,
price=self.cur_price,
price=buy_price,
volume=grid.volume,
order_type=self.order_type,
order_time=self.cur_datetime,
@ -976,15 +995,18 @@ class CtaFutureTemplate(CtaTemplate):
事务开空仓
:return:
"""
if self.backtesting:
short_price = self.cur_price - self.price_tick
else:
short_price = self.cur_tick.bid_price_1
vt_orderids = self.short(vt_symbol=self.vt_symbol,
price=self.cur_price,
price=short_price,
volume=grid.volume,
order_type=self.order_type,
order_time=self.cur_datetime,
grid=grid)
if len(vt_orderids) > 0:
self.write_log(u'创建{}事务空单,指数开空价:{},主力开仓价:{},数量:{},止盈价:{},止损价:{}'
self.write_log(u'创建{}事务空单,事务开空价:{},当前价:{},数量:{},止盈价:{},止损价:{}'
.format(grid.type, grid.open_price, self.cur_price, grid.volume, grid.close_price,
grid.stop_price))
self.gt.up_grids.append(grid)
@ -1154,6 +1176,8 @@ class CtaFutureTemplate(CtaTemplate):
self.write_log(u'撤单失败,更新状态为撤单成功')
order_info.update({'status': Status.CANCELLED})
self.active_orders.update({vt_orderid: order_info})
if order_grid and vt_orderid in order_grid.order_ids:
order_grid.order_ids.remove(vt_orderid)
continue
@ -1272,13 +1296,40 @@ class CtaFutureTemplate(CtaTemplate):
self.account_pos = self.cta_engine.get_position(vt_symbol=self.vt_symbol, direction=Direction.NET)
if self.account_pos:
self.write_log(f'账号{self.vt_symbol}持仓:{self.account_pos.volume}, 冻结:{self.account_pos.frozen}, 盈亏:{self.account_pos.pnl}')
up_grids_info = self.gt.to_str(direction=Direction.SHORT)
if len(self.gt.up_grids) > 0:
self.write_log(up_grids_info)
dn_grids_info = self.gt.to_str(direction=Direction.LONG)
if len(self.gt.dn_grids) > 0:
self.write_log(dn_grids_info)
up_grids_info = ""
for grid in list(self.gt.up_grids):
if not grid.open_status and grid.order_status:
up_grids_info += f'平空中: [已平:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n'
if len(grid.order_ids) > 0:
up_grids_info += f'委托单号:{grid.order_ids}'
continue
if grid.open_status and not grid.order_status:
up_grids_info += f'持空中: [数量:{grid.volume}\n, 开仓时间:{grid.open_time}'
continue
if not grid.open_status and grid.order_status:
up_grids_info += f'开空中: [已开:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n'
if len(grid.order_ids) > 0:
up_grids_info += f'委托单号:{grid.order_ids}'
dn_grids_info = ""
for grid in list(self.gt.dn_grids):
if not grid.open_status and grid.order_status:
up_grids_info += f'平多中: [已平:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n'
if len(grid.order_ids) > 0:
up_grids_info += f'委托单号:{grid.order_ids}'
continue
if grid.open_status and not grid.order_status:
up_grids_info += f'持多中: [数量:{grid.volume}\n, 开仓时间:{grid.open_time}'
continue
if not grid.open_status and grid.order_status:
up_grids_info += f'开多中: [已开:{grid.traded_volume} => 目标:{grid.volume}, 委托时间:{grid.order_time}\n'
if len(grid.order_ids) > 0:
up_grids_info += f'委托单号:{grid.order_ids}'
def display_tns(self):
"""显示事务的过程记录=》 log"""

View File

@ -4,9 +4,9 @@
# 华富资产
from vnpy.event import EventEngine
from vnpy.trader.constant import Exchange
from vnpy.trader.constant import Exchange # noqa
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.event import EVENT_TIMER
from vnpy.trader.event import EVENT_TIMER # noqa
APP_NAME = 'DispatchEngine'
@ -20,4 +20,3 @@ class DispatchEngine(BaseEngine):
self.main_engine = main_engine
self.event_engine = event_engine
self.create_logger(logger_name=APP_NAME)

View File

@ -38,7 +38,7 @@ class CtaGrid(object):
包括交易方向开仓价格平仓价格止损价格开仓状态平仓状态
"""
def __init__(self, **kwargs ):
def __init__(self, **kwargs):
self.id: str = kwargs.get('id', str(uuid.uuid1())) # gid
self.direction = kwargs.get('direction', None) # 交易方向LONG正套SHORT反套
@ -51,7 +51,7 @@ class CtaGrid(object):
self.volume = kwargs.get('volume', 0.0) # 开仓数量( 兼容数字货币 )
self.traded_volume = kwargs.get('traded_volume', 0.0) # 已成交数量 开仓时,为开仓数量,平仓时,为平仓数量
self.order_status = kwargs.get('order_status', False) # 挂单状态: True,已挂单False未挂单
self.order_ids = kwargs.get('order_ids',[]) # order_id list
self.order_ids = kwargs.get('order_ids', []) # order_id list
self.open_status = kwargs.get('open_status', False) # 开仓状态
self.close_status = kwargs.get('close_status', False) # 平仓状态
self.open_time = kwargs.get('open_time', None) # 开仓时间
@ -59,7 +59,7 @@ class CtaGrid(object):
self.lock_grid_ids = kwargs.get('lock_grid_ids', []) # 锁单的网格,[gid,gid]
self.reuse_count = kwargs.get('reuse_count', 0) # 重用次数0 平仓后是否删除)
self.type = kwargs.get('type', '') # 网格类型标签
self.snapshot = kwargs.get('snapshot',{}) # 切片数据,如记录开仓点时的某些状态数据
self.snapshot = kwargs.get('snapshot', {}) # 切片数据,如记录开仓点时的某些状态数据
def to_json(self):
"""输出JSON"""

View File

@ -209,7 +209,7 @@ class CtaLineBar(object):
if self.price_tick < 1:
exponent = decimal.Decimal(str(self.price_tick))
self.round_n = max(abs(exponent.as_tuple().exponent) + 2, 4)
self.write_log(f'round_n: {self.round_n}')
# 导入卡尔曼过滤器
if self.para_active_kf:
try:
@ -298,6 +298,8 @@ class CtaLineBar(object):
self.paramList.append('para_bias2_len')
self.paramList.append('para_bias3_len')
self.paramList.append('para_bd_len')
self.paramList.append('is_7x24')
self.paramList.append('price_tick')
@ -409,6 +411,8 @@ class CtaLineBar(object):
self.para_bias2_len = 0 # 乖离率观测周期2
self.para_bias3_len = 0 # 乖离率观测周期3
self.para_bd_len = 0 # 波段买卖观测长度
# K 线的相关计算结果数据
self.line_pre_high = [] # K线的前para_pre_len的的最高
self.line_pre_low = [] # K线的前para_pre_len的的最低
@ -628,6 +632,11 @@ class CtaLineBar(object):
self._rt_bias2 = None
self._rt_bias3 = None
# 波段买卖指标
self.line_bd_fast = [] # 波段快线
self.line_bd_slow = [] # 波段慢线
self.cur_bd_cross = 0 # 当前波段快线慢线金叉死叉, +金叉计算, - 死叉技术
def set_params(self, setting: dict = {}):
"""设置参数"""
d = self.__dict__
@ -800,6 +809,7 @@ class CtaLineBar(object):
self.__count_golden_section()
self.__count_area(bar)
self.__count_bias()
self.__count_bd()
self.export_to_csv(bar)
self.rt_executed = False
@ -1370,7 +1380,7 @@ class CtaLineBar(object):
count_len = min(self.para_ma1_len, self.bar_len)
barMa1 = ta.MA(self.close_array[-count_len:], count_len)[-1]
barMa1 = round(float(barMa1), self.round_n)
barMa1 = round(barMa1, self.round_n)
if len(self.line_ma1) > self.max_hold_bars:
del self.line_ma1[0]
@ -1388,7 +1398,7 @@ class CtaLineBar(object):
if self.para_ma2_len > 0:
count_len = min(self.para_ma2_len, self.bar_len)
barMa2 = ta.MA(self.close_array[-count_len:], count_len)[-1]
barMa2 = round(float(barMa2), self.round_n)
barMa2 = round(barMa2, self.round_n)
if len(self.line_ma2) > self.max_hold_bars:
del self.line_ma2[0]
@ -1406,7 +1416,7 @@ class CtaLineBar(object):
if self.para_ma3_len > 0:
count_len = min(self.para_ma3_len, self.bar_len)
barMa3 = ta.MA(self.close_array[-count_len:], count_len)[-1]
barMa3 = round(float(barMa3), self.round_n)
barMa3 = round(barMa3, self.round_n)
if len(self.line_ma3) > self.max_hold_bars:
del self.line_ma3[0]
@ -1508,7 +1518,7 @@ class CtaLineBar(object):
if count_len > 0:
close_ma_array = ta.MA(np.append(self.close_array[-count_len:], [self.line_bar[-1].close_price]),
count_len)
self._rt_ma1 = round(float(close_ma_array[-1]), self.round_n)
self._rt_ma1 = round(close_ma_array[-1], self.round_n)
# 计算斜率
if len(close_ma_array) > 2 and close_ma_array[-2] != 0:
@ -1520,7 +1530,7 @@ class CtaLineBar(object):
if count_len > 0:
close_ma_array = ta.MA(np.append(self.close_array[-count_len:], [self.line_bar[-1].close_price]),
count_len)
self._rt_ma2 = round(float(close_ma_array[-1]), self.round_n)
self._rt_ma2 = round(close_ma_array[-1], self.round_n)
# 计算斜率
if len(close_ma_array) > 2 and close_ma_array[-2] != 0:
@ -1532,7 +1542,7 @@ class CtaLineBar(object):
if count_len > 0:
close_ma_array = ta.MA(np.append(self.close_array[-count_len:], [self.line_bar[-1].close_price]),
count_len)
self._rt_ma3 = round(float(close_ma_array[-1]), self.round_n)
self._rt_ma3 = round(close_ma_array[-1], self.round_n)
# 计算斜率
if len(close_ma_array) > 2 and close_ma_array[-2] != 0:
@ -3985,10 +3995,49 @@ class CtaLineBar(object):
return self.line_bias3[-1]
return self._rt_bias3
def __count_bd(self):
"""计算波段快/慢线"""
#
if self.para_bd_len <= 0:
# 不计算
return
if len(self.line_bar) < 2 * self.para_bd_len:
return
mid4_ema_array = ta.EMA(self.mid4_array, self.para_bd_len)
mid4_std = np.std(self.mid4_array[-self.para_bd_len:], ddof=1)
mid4_ema_diff_array = self.mid4_array - mid4_ema_array
var5_array = (mid4_ema_diff_array / mid4_std * 100 + 200) / 4
var6_array = (ta.EMA(var5_array, 5) - 25) * 1.56
fast_array = ta.EMA(var6_array, 2) * 1.22
slow_array = ta.EMA(fast_array, 2)
# 快线/慢线最后记录追加到line_bd_fast/ list_bd_slow中
if len(self.line_bd_fast) > self.max_hold_bars:
self.line_bd_fast.pop(0)
if not np.isnan(fast_array[-1]):
self.line_bd_fast.append(fast_array[-1])
if len(self.line_bd_slow) > self.max_hold_bars:
self.line_bd_slow.pop(0)
if not np.isnan(slow_array[-1]):
self.line_bd_slow.append(slow_array[-1])
# 判断金叉/死叉
if len(self.line_bd_fast) > 2 and len(self.line_bd_slow) > 2:
if self.line_bd_fast[-1] > self.line_bd_slow[-1]:
self.cur_bd_cross = max(1, self.cur_bd_cross + 1)
elif self.line_bd_fast[-1] < self.line_bd_slow[-1]:
self.cur_bd_cross = min(-1, self.cur_bd_cross -1)
def write_log(self, content):
"""记录CTA日志"""
self.strategy.write_log(u'[' + self.name + u']' + content)
def append_data(self, file_name, dict_data, field_names=None):
"""
添加数据到csv文件中

View File

@ -142,6 +142,7 @@ class BinancefGateway(BaseGateway):
def cancel_order(self, req: CancelRequest) -> Request:
""""""
self.rest_api.cancel_order(req)
return True
def query_account(self) -> None:
""""""
@ -397,7 +398,7 @@ class BinancefRestApi(RestClient):
self.gateway_name
)
self.orders.update({orderid: copy(order)})
self.gateway.write_log(f'返回订单更新:{order.__dict__}')
self.gateway.write_log(f'委托返回订单更新:{order.__dict__}')
self.gateway.on_order(order)
data = {
@ -861,7 +862,7 @@ class BinancefTradeWebsocketApi(WebsocketClient):
def on_order(self, packet: dict) -> None:
"""ws处理on_order事件"""
self.gateway.write_log(json.dumps(packet, indent=2))
self.gateway.write_log('ws返回订单更新:\n'.format(json.dumps(packet, indent=2)))
dt = datetime.fromtimestamp(packet["E"] / 1000)
time = dt.strftime("%Y-%m-%d %H:%M:%S")
@ -891,12 +892,12 @@ class BinancefTradeWebsocketApi(WebsocketClient):
time=time,
gateway_name=self.gateway_name
)
self.gateway.write_log(f'WS返回订单更新:{order.__dict__}')
self.gateway.write_log(f'WS订单更新:\n{order.__dict__}')
self.gateway.on_order(order)
# Push trade event
trade_volume = float(ord_data["l"])
if not trade_volume:
if trade_volume <= 0:
return
trade_dt = datetime.fromtimestamp(ord_data["T"] / 1000)
@ -915,6 +916,7 @@ class BinancefTradeWebsocketApi(WebsocketClient):
datetime=trade_time,
gateway_name=self.gateway_name,
)
self.gateway.write_log(f'WS成交更新:\n{trade.__dict__}')
self.gateway.on_trade(trade)

View File

@ -703,7 +703,6 @@ class CtpTdApi(TdApi):
if data["OrderPriceType"] == THOST_FTDC_OPT_AnyPrice:
order_type = OrderType.MARKET
order = OrderData(
symbol=symbol,
exchange=exchange,
@ -1287,7 +1286,8 @@ class TdxMdApi():
else:
self.gateway.write_log(u'创建tdx连接, IP: {}/{}'.format(self.best_ip['ip'], self.best_ip['port']))
self.connection_status = True
self.gateway.status.update({'tdx_con': True, 'tdx_con_time': datetime.now().strftime('%Y-%m-%d %H:%M%S')})
self.gateway.status.update(
{'tdx_con': True, 'tdx_con_time': datetime.now().strftime('%Y-%m-%d %H:%M%S')})
self.thread = Thread(target=self.run)
self.thread.start()
@ -1824,7 +1824,7 @@ class TickCombiner(object):
# 昨收盘价
if self.last_leg2_tick.pre_close > 0 and self.last_leg1_tick.pre_close > 0:
ratio_tick.pre_close = 100 * self.last_leg1_tick.pre_close * self.leg1_ratio / (
self.last_leg2_tick.pre_close * self.leg2_ratio) # noqa
self.last_leg2_tick.pre_close * self.leg2_ratio) # noqa
ratio_tick.pre_close = round_to(
target=self.price_tick,
value=ratio_tick.pre_close
@ -1833,7 +1833,7 @@ class TickCombiner(object):
# 开盘价
if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0:
ratio_tick.open_price = 100 * self.last_leg1_tick.open_price * self.leg1_ratio / (
self.last_leg2_tick.open_price * self.leg2_ratio) # noqa
self.last_leg2_tick.open_price * self.leg2_ratio) # noqa
ratio_tick.open_price = round_to(
target=self.price_tick,
value=ratio_tick.open_price

View File

@ -333,7 +333,7 @@ def get_csv_last_dt(file_name, dt_index=0, dt_format='%Y-%m-%d %H:%M:%S', line_l
try:
last_dt = datetime.strptime(datas[dt_index], dt_format)
return last_dt
except:
except: # noqa
return None
return None