[bug fix] 修复ctp的标准套利合约last_price为空;仓位比对增加参数判断

This commit is contained in:
msincenselee 2020-08-11 15:01:31 +08:00
parent cf97252314
commit 81b7bf479e
8 changed files with 176 additions and 134 deletions

View File

@ -237,8 +237,9 @@ class CtaEngine(BaseEngine):
# 推送到事件
self.put_all_strategy_pos_event(all_strategy_pos)
for strategy in self.strategies.values():
if strategy.inited:
for strategy_name in list(self.strategies.keys()):
strategy = self.strategies.get(strategy_name, None)
if strategy and strategy.inited:
self.call_strategy_func(strategy, strategy.on_timer)

View File

@ -993,67 +993,62 @@ class CtaStockTemplate(CtaTemplate):
self.tns_finish_sell_grid(grid)
continue
# 定位到首个满足条件的网格,跳出循环
# 定位到首个满足条件的网格
ordering_grid = grid
break
# 没有满足条件的网格
if ordering_grid is None:
return
acc_symbol_pos = self.cta_engine.get_position(
vt_symbol=ordering_grid.vt_symbol,
direction=Direction.NET)
if acc_symbol_pos is None:
self.write_error(f'{self.strategy_name}当前{ordering_grid.vt_symbol}持仓查询不到, 无法执行卖出')
continue
acc_symbol_pos = self.cta_engine.get_position(
vt_symbol=ordering_grid.vt_symbol,
direction=Direction.NET)
if acc_symbol_pos is None:
self.write_error(f'{self.strategy_name}当前{ordering_grid.vt_symbol}持仓查询不到, 无法执行卖出')
return
vt_symbol = ordering_grid.vt_symbol
sell_volume = ordering_grid.volume - ordering_grid.traded_volume
vt_symbol = ordering_grid.vt_symbol
sell_volume = ordering_grid.volume - ordering_grid.traded_volume
if sell_volume > acc_symbol_pos.volume:
self.write_error(u'账号{}持仓{},不满足减仓目标:{}'
.format(vt_symbol, acc_symbol_pos.volume, sell_volume))
continue
if sell_volume > acc_symbol_pos.volume:
self.write_error(u'账号{}持仓{},不满足减仓目标:{}'
.format(vt_symbol, acc_symbol_pos.volume, sell_volume))
return
# 实盘运行时,要加入市场买卖量的判断
if not self.backtesting:
symbol_tick = self.cta_engine.get_tick(vt_symbol)
if symbol_tick is None:
self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=vt_symbol)
self.write_log(f'获取不到{vt_symbol}得tick,无法根据市场深度进行计算')
continue
# 实盘运行时,要加入市场买卖量的判断
if not self.backtesting:
symbol_tick = self.cta_engine.get_tick(vt_symbol)
if symbol_tick is None:
self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=vt_symbol)
self.write_log(f'获取不到{vt_symbol}得tick,无法根据市场深度进行计算')
return
symbol_volume_tick = self.cta_engine.get_volume_tick(vt_symbol)
# 根据市场计算前5档买单数量
if all([symbol_tick.ask_volume_1, symbol_tick.ask_volume_2, symbol_tick.ask_volume_3,
symbol_tick.ask_volume_4, symbol_tick.ask_volume_5]) \
and all(
[symbol_tick.bid_volume_1, symbol_tick.bid_volume_2, symbol_tick.bid_volume_3, symbol_tick.bid_volume_4,
symbol_tick.bid_volume_5]):
market_ask_volumes = symbol_tick.ask_volume_1 + symbol_tick.ask_volume_2 + symbol_tick.ask_volume_3 + symbol_tick.ask_volume_4 + symbol_tick.ask_volume_5
market_bid_volumes = symbol_tick.bid_volume_1 + symbol_tick.bid_volume_2 + symbol_tick.bid_volume_3 + symbol_tick.bid_volume_4 + symbol_tick.bid_volume_5
org_sell_volume = sell_volume
if market_bid_volumes > 0 and market_ask_volumes > 0 and org_sell_volume >= 2 * symbol_volume_tick:
sell_volume = min(market_bid_volumes / 4, market_ask_volumes / 4, sell_volume)
sell_volume = max(round_to(value=sell_volume, target=symbol_volume_tick), symbol_volume_tick)
if org_sell_volume != sell_volume:
self.write_log(u'修正批次卖出{}数量:{}=>{}'.format(vt_symbol, org_sell_volume, sell_volume))
symbol_volume_tick = self.cta_engine.get_volume_tick(vt_symbol)
# 根据市场计算前5档买单数量
if all([symbol_tick.ask_volume_1, symbol_tick.ask_volume_2, symbol_tick.ask_volume_3,
symbol_tick.ask_volume_4, symbol_tick.ask_volume_5]) \
and all(
[symbol_tick.bid_volume_1, symbol_tick.bid_volume_2, symbol_tick.bid_volume_3, symbol_tick.bid_volume_4,
symbol_tick.bid_volume_5]):
market_ask_volumes = symbol_tick.ask_volume_1 + symbol_tick.ask_volume_2 + symbol_tick.ask_volume_3 + symbol_tick.ask_volume_4 + symbol_tick.ask_volume_5
market_bid_volumes = symbol_tick.bid_volume_1 + symbol_tick.bid_volume_2 + symbol_tick.bid_volume_3 + symbol_tick.bid_volume_4 + symbol_tick.bid_volume_5
org_sell_volume = sell_volume
if market_bid_volumes > 0 and market_ask_volumes > 0 and org_sell_volume >= 2 * symbol_volume_tick:
sell_volume = min(market_bid_volumes / 4, market_ask_volumes / 4, sell_volume)
sell_volume = max(round_to(value=sell_volume, target=symbol_volume_tick), symbol_volume_tick)
if org_sell_volume != sell_volume:
self.write_log(u'修正批次卖出{}数量:{}=>{}'.format(vt_symbol, org_sell_volume, sell_volume))
# 获取当前价格
sell_price = self.cta_engine.get_price(vt_symbol) - self.cta_engine.get_price_tick(vt_symbol)
# 发出委托卖出
vt_orderids = self.sell(
vt_symbol=vt_symbol,
price=sell_price,
volume=sell_volume,
order_time=self.cur_datetime,
grid=ordering_grid)
if vt_orderids is None or len(vt_orderids) == 0:
self.write_error(f'{vt_symbol} 委托卖出失败,委托价:{sell_price} 数量:{sell_volume}')
return
else:
self.write_log(f'{vt_symbol} 已委托卖出,{sell_volume},委托价:{sell_price}, 数量:{sell_volume}')
# 获取当前价格
sell_price = self.cta_engine.get_price(vt_symbol) - self.cta_engine.get_price_tick(vt_symbol)
# 发出委托卖出
vt_orderids = self.sell(
vt_symbol=vt_symbol,
price=sell_price,
volume=sell_volume,
order_time=self.cur_datetime,
grid=ordering_grid)
if vt_orderids is None or len(vt_orderids) == 0:
self.write_error(f'{vt_symbol} 委托卖出失败,委托价:{sell_price} 数量:{sell_volume}')
continue
else:
self.write_log(f'{vt_symbol} 已委托卖出,{sell_volume},委托价:{sell_price}, 数量:{sell_volume}')
def tns_finish_sell_grid(self, grid):
@ -1128,68 +1123,63 @@ class CtaStockTemplate(CtaTemplate):
self.tns_finish_buy_grid(grid)
return
# 定位到首个满足条件的网格,跳出循环
# 定位到首个满足条件的网格,
ordering_grid = grid
break
# 没有满足条件的网格
if ordering_grid is None:
return
balance, availiable, _, _ = self.cta_engine.get_account()
if availiable <= 0:
self.write_error(u'当前可用资金不足'.format(availiable))
continue
vt_symbol = ordering_grid.vt_symbol
cur_price = self.cta_engine.get_price(vt_symbol)
if cur_price is None:
self.write_error(f'暂时不能获取{vt_symbol}最新价格')
continue
balance, availiable, _, _ = self.cta_engine.get_account()
if availiable <= 0:
self.write_error(u'当前可用资金不足'.format(availiable))
return
vt_symbol = ordering_grid.vt_symbol
cur_price = self.cta_engine.get_price(vt_symbol)
if cur_price is None:
self.write_error(f'暂时不能获取{vt_symbol}最新价格')
return
buy_volume = ordering_grid.volume - ordering_grid.traded_volume
min_trade_volume = self.cta_engine.get_volume_tick(vt_symbol)
if availiable < buy_volume * cur_price:
self.write_error(f'可用资金{availiable},不满足买入{vt_symbol},数量:{buy_volume} X价格{cur_price}')
max_buy_volume = int(availiable / cur_price)
max_buy_volume = max_buy_volume - max_buy_volume % min_trade_volume
if max_buy_volume <= min_trade_volume:
continue
# 计划买入数量,与可用资金买入数量的差别
diff_volume = buy_volume - max_buy_volume
# 降低计划买入数量
self.write_log(f'总计划{vt_symbol}买入数量:{ordering_grid.volume}=>{ordering_grid.volume - diff_volume}')
ordering_grid.volume -= diff_volume
self.gt.save()
buy_volume = max_buy_volume
buy_volume = ordering_grid.volume - ordering_grid.traded_volume
min_trade_volume = self.cta_engine.get_volume_tick(vt_symbol)
if availiable < buy_volume * cur_price:
self.write_error(f'可用资金{availiable},不满足买入{vt_symbol},数量:{buy_volume} X价格{cur_price}')
max_buy_volume = int(availiable / cur_price)
max_buy_volume = max_buy_volume - max_buy_volume % min_trade_volume
if max_buy_volume <= min_trade_volume:
return
# 计划买入数量,与可用资金买入数量的差别
diff_volume = buy_volume - max_buy_volume
# 降低计划买入数量
self.write_log(f'总计划{vt_symbol}买入数量:{ordering_grid.volume}=>{ordering_grid.volume - diff_volume}')
ordering_grid.volume -= diff_volume
self.gt.save()
buy_volume = max_buy_volume
# 实盘运行时,要加入市场买卖量的判断
if not self.backtesting and 'market' in ordering_grid.snapshot:
symbol_tick = self.cta_engine.get_tick(vt_symbol)
# 根据市场计算前5档买单数量
if all([symbol_tick.ask_volume_1, symbol_tick.ask_volume_2, symbol_tick.ask_volume_3,
symbol_tick.ask_volume_4, symbol_tick.ask_volume_5]) \
and all(
[symbol_tick.bid_volume_1, symbol_tick.bid_volume_2, symbol_tick.bid_volume_3, symbol_tick.bid_volume_4,
symbol_tick.bid_volume_5]):
market_ask_volumes = symbol_tick.ask_volume_1 + symbol_tick.ask_volume_2 + symbol_tick.ask_volume_3 + symbol_tick.ask_volume_4 + symbol_tick.ask_volume_5
market_bid_volumes = symbol_tick.bid_volume_1 + symbol_tick.bid_volume_2 + symbol_tick.bid_volume_3 + symbol_tick.bid_volume_4 + symbol_tick.bid_volume_5
if market_bid_volumes > 0 and market_ask_volumes > 0:
buy_volume = min(market_bid_volumes / 4, market_ask_volumes / 4, buy_volume)
buy_volume = max(buy_volume - buy_volume % min_trade_volume, min_trade_volume)
# 实盘运行时,要加入市场买卖量的判断
if not self.backtesting and 'market' in ordering_grid.snapshot:
symbol_tick = self.cta_engine.get_tick(vt_symbol)
# 根据市场计算前5档买单数量
if all([symbol_tick.ask_volume_1, symbol_tick.ask_volume_2, symbol_tick.ask_volume_3,
symbol_tick.ask_volume_4, symbol_tick.ask_volume_5]) \
and all(
[symbol_tick.bid_volume_1, symbol_tick.bid_volume_2, symbol_tick.bid_volume_3, symbol_tick.bid_volume_4,
symbol_tick.bid_volume_5]):
market_ask_volumes = symbol_tick.ask_volume_1 + symbol_tick.ask_volume_2 + symbol_tick.ask_volume_3 + symbol_tick.ask_volume_4 + symbol_tick.ask_volume_5
market_bid_volumes = symbol_tick.bid_volume_1 + symbol_tick.bid_volume_2 + symbol_tick.bid_volume_3 + symbol_tick.bid_volume_4 + symbol_tick.bid_volume_5
if market_bid_volumes > 0 and market_ask_volumes > 0:
buy_volume = min(market_bid_volumes / 4, market_ask_volumes / 4, buy_volume)
buy_volume = max(buy_volume - buy_volume % min_trade_volume, min_trade_volume)
buy_price = cur_price + self.cta_engine.get_price_tick(vt_symbol) * 10
buy_price = cur_price + self.cta_engine.get_price_tick(vt_symbol) * 10
vt_orderids = self.buy(
vt_symbol=vt_symbol,
price=buy_price,
volume=buy_volume,
order_time=self.cur_datetime,
grid=ordering_grid)
if vt_orderids is None or len(vt_orderids) == 0:
self.write_error(f'委托买入失败,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}')
return
else:
self.write_log(f'{self.strategy_name}, {vt_orderids},已委托买入,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}')
vt_orderids = self.buy(
vt_symbol=vt_symbol,
price=buy_price,
volume=buy_volume,
order_time=self.cur_datetime,
grid=ordering_grid)
if vt_orderids is None or len(vt_orderids) == 0:
self.write_error(f'委托买入失败,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}')
continue
else:
self.write_log(f'{self.strategy_name}, {vt_orderids},已委托买入,{vt_symbol} 委托价:{buy_price} 数量:{buy_volume}')
def tns_finish_buy_grid(self, grid):
"""

View File

@ -237,7 +237,7 @@ class CtaEngine(BaseEngine):
all_strategy_pos = self.get_all_strategy_pos()
# 每5分钟检查一次
if dt.minute % 5 == 0:
if dt.minute % 5 == 0 and self.engine_config.get('compare_pos',True):
# 比对仓位,使用上述获取得持仓信息,不用重复获取
self.compare_pos(strategy_pos_list=copy(all_strategy_pos))
@ -859,7 +859,10 @@ class CtaEngine(BaseEngine):
tick = self.main_engine.get_tick(vt_symbol)
if tick:
return tick.last_price
if '&' in tick.symbol:
return (tick.ask_price_1 + tick.bid_price_1) / 2
else:
return tick.last_price
return None

View File

@ -383,6 +383,7 @@ class CtpGateway(BaseGateway):
# 增加映射( leg1 对应的合成器列表映射)
leg1_symbol = setting.get('leg1_symbol')
leg1_exchange = Exchange(setting.get('leg1_exchange'))
combiner_list = self.tick_combiner_map.get(leg1_symbol, [])
if combiner not in combiner_list:
self.write_log(u'添加Leg1:{}与合成器得映射'.format(leg1_symbol))
@ -391,6 +392,7 @@ class CtpGateway(BaseGateway):
# 增加映射( leg2 对应的合成器列表映射)
leg2_symbol = setting.get('leg2_symbol')
leg2_exchange = Exchange(setting.get('leg2_exchange'))
combiner_list = self.tick_combiner_map.get(leg2_symbol, [])
if combiner not in combiner_list:
self.write_log(u'添加Leg2:{}与合成器得映射'.format(leg2_symbol))
@ -400,14 +402,14 @@ class CtpGateway(BaseGateway):
self.write_log(u'订阅leg1:{}'.format(leg1_symbol))
leg1_req = SubscribeRequest(
symbol=leg1_symbol,
exchange=symbol_exchange_map.get(leg1_symbol, Exchange.LOCAL)
exchange=leg1_exchange
)
self.subscribe(leg1_req)
self.write_log(u'订阅leg2:{}'.format(leg2_symbol))
leg2_req = SubscribeRequest(
symbol=leg2_symbol,
exchange=symbol_exchange_map.get(leg1_symbol, Exchange.LOCAL)
exchange=leg2_exchange
)
self.subscribe(leg2_req)
@ -646,6 +648,10 @@ class CtpMdApi(MdApi):
gateway_name=self.gateway_name
)
# 处理一下标准套利合约的last_price
if '&' in symbol:
tick.last_price = (tick.ask_price_1 + tick.bid_price_1)/2
if data["BidVolume2"] or data["AskVolume2"]:
tick.bid_price_2 = adjust_price(data["BidPrice2"])
tick.bid_price_3 = adjust_price(data["BidPrice3"])

View File

@ -137,9 +137,9 @@ class CtptestGateway(BaseGateway):
exchanges = list(EXCHANGE_CTP2VT.values())
def __init__(self, event_engine):
def __init__(self, event_engine, gateway_name="CTPTEST"):
"""Constructor"""
super().__init__(event_engine, "CTPTEST")
super().__init__(event_engine, gateway_name)
self.td_api = CtpTdApi(self)
self.md_api = CtpMdApi(self)
@ -166,8 +166,9 @@ class CtptestGateway(BaseGateway):
and (not md_address.startswith("ssl://"))
):
md_address = "tcp://" + md_address
print(f'连接交易前置机:{td_address}')
self.td_api.connect(td_address, userid, password, brokerid, auth_code, appid, product_info)
print(f'连接行情前置机:{md_address}')
self.md_api.connect(md_address, userid, password, brokerid)
self.init_query()
@ -772,10 +773,10 @@ class CtpTdApi(TdApi):
ctp_req = {
"InstrumentID": req.symbol,
"ExchangeID": req.exchange,
"ExchangeID": req.exchange.value,
"OrderRef": order_ref,
"FrontID": int(frontid),
"SessionID": int(sessionid),
"SessionID": abs(int(sessionid)),
"ActionFlag": THOST_FTDC_AF_Delete,
"BrokerID": self.brokerid,
"InvestorID": self.userid

View File

@ -12,7 +12,7 @@ if vnpy_root not in sys.path:
from vnpy.gateway.ctptest import CtptestGateway
from vnpy.event import EventEngine
from vnpy.trader.constant import Exchange
from vnpy.trader.constant import Exchange,OrderType
from vnpy.trader.event import (
EVENT_TICK,
EVENT_ORDER,
@ -22,17 +22,17 @@ from vnpy.trader.event import (
EVENT_LOG,
)
from vnpy.trader.object import (
SubscribeRequest,
SubscribeRequest,OrderRequest,Direction,Offset,CancelRequest
)
# 这里放期货公司需要你连接的测试系统的相关信息
ctp_setting = {
"用户名": "xxx",
"密码": "xxx",
"经纪商代码": "9999",
"交易服务器": "tcp://180.168.146.187:10100",
"行情服务器": "tcp://180.168.146.187:10110",
"产品名称": "simnow_client_test",
"授权编码": "0000000000000000",
"用户名": "12000430",
"密码": "11112222w",
"经纪商代码": "0187",
"交易服务器": "tcp://110.87.99.14:61209",
"行情服务器": "tcp://110.87.99.14:61219",
"产品名称": "client_huafu_2.0.0",
"授权编码": "BON2HDZHJBKLXKUK",
"产品信息": ""
}
@ -63,10 +63,11 @@ def test():
event_engine.start()
gateway = CtptestGateway(event_engine)
print(f'开始接入仿真测试:{ctp_setting}')
gateway.connect(ctp_setting)
# gateway.connect()
auto_subscribe_symbols = ['rb2010']
auto_subscribe_symbols = ['rb2101']
for symbol in auto_subscribe_symbols:
print(u'自动订阅合约:{}'.format(symbol))
sub = SubscribeRequest(symbol=symbol, exchange=Exchange.SHFE)
@ -81,6 +82,46 @@ def test():
sleep(1)
couter -= 1
for i in range(5):
print(f'发出rb2101的买入委托{i+1}')
order_req = OrderRequest(
strategy_name='',
symbol='rb2101',
exchange=Exchange.SHFE,
direction=Direction.LONG,
offset=Offset.OPEN,
type=OrderType.LIMIT,
price=3800,
volume=i+1
)
gateway.send_order(order_req)
for i in range(5):
print(f'发出rb2101的平仓委托{i+1}')
order_req = OrderRequest(
strategy_name='',
symbol='rb2101',
exchange=Exchange.SHFE,
direction=Direction.LONG,
offset=Offset.CLOSETODAY,
type=OrderType.LIMIT,
price=3801,
volume=i+1
)
gateway.send_order(order_req)
#
for i in range(5):
print(f'发出rb2101的撤单委托{i + 1}')
cancel_req = CancelRequest(
orderid=f'5_-78969411_{i+1}',
symbol='rb2101',
exchange=Exchange.SHFE
)
gateway.cancel_order(cancel_req)
sys.exit(app.exec_())

View File

@ -2023,14 +2023,14 @@ class PbTdApi(object):
sys_orderid = str(data.wtxh)
if order_status in [Status.NOTTRADED] and len(sys_orderid) > 0:
self.gateway.write_log(f'撤单:{data.__dict__}')
self.gateway.write_log(f'撤单:{data.wtxh}')
cancel_data = (int(sys_orderid), None, None, None, None, None, None, None)
cancel_table.append(cancel_data)
orders_table.close()
cancel_table.close()
except Exchange as ex:
except Exception as ex:
self.gateway.write_error(f'dbf全委托撤单异常:{str(ex)}')
self.gateway.write_error(traceback.format_exc())
return False

View File

@ -532,7 +532,7 @@ class OmsEngine(BaseEngine):
tick = event.data
self.ticks[tick.vt_symbol] = tick
if tick.last_price:
if '&' not in tick.symbol and tick.last_price:
self.prices[tick.vt_symbol] = tick.last_price
def process_order_event(self, event: Event) -> None: