[update] bug fix

This commit is contained in:
msincenselee 2021-04-27 16:17:02 +08:00
parent a265deb3a1
commit 24d4dbebf7
11 changed files with 69 additions and 29 deletions

View File

@ -41,6 +41,7 @@
#linux编译 #linux编译
1. 复制so文件到ctp根目录并改名 1. 复制so文件到ctp根目录并改名
mv thostmduserapi_se.so libthostmduserapi_se.so mv thostmduserapi_se.so libthostmduserapi_se.so
mv thosttraderapi_se.so libthosttraderapi_se.so mv thosttraderapi_se.so libthosttraderapi_se.so

View File

@ -40,4 +40,4 @@ db.today_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_
db.history_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtSymbol_vt_tradeid_trade_date_holder_id'}) db.history_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtSymbol_vt_tradeid_trade_date_holder_id'})
db.today_positions.createIndex({'account_id':1,'vt_symbol':1,'direction':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtsymbol_direction_trade_date_holder_id'}) db.today_positions.createIndex({'account_id':1,'vt_symbol':1,'direction':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtsymbol_direction_trade_date_holder_id'})
db.today_strategy_pos.createIndex({'account_id':1,'strategy_group':1,'strategy_name':1,'date':1},{'name':'accountid_strategy_group_strategy_name_date'}) db.today_strategy_pos.createIndex({'account_id':1,'strategy_group':1,'strategy_name':1,'date':1},{'name':'accountid_strategy_group_strategy_name_date'})
db.strategy_snapshot.createIndex({'account_id':1,'strategy_group':1,'strategy':1,'guid':1},{'name':'accountid_strategy_name_guid'}) db.strategy_snapshot.createIndex({'account_id':1,'strategy_group':1,'strategy':1,'guid':1,'datetime':1},{'name':'accountid_strategy_name_guid'})

View File

@ -1798,7 +1798,7 @@ class BackTestingEngine(object):
# 计算每个策略实例的持仓盈亏 # 计算每个策略实例的持仓盈亏
strategy_pnl.update({longpos.strategy_name: strategy_pnl.get(longpos.strategy_name, 0) + holding_profit}) strategy_pnl.update({longpos.strategy_name: strategy_pnl.get(longpos.strategy_name, 0) + holding_profit})
positionMsg += "{},long,p={},v={},m={};".format(symbol, longpos.price, longpos.volume, holding_profit) positionMsg += "{},long,p={},v={},m={};\n".format(symbol, longpos.price, longpos.volume, holding_profit)
for shortpos in self.short_position_list: for shortpos in self.short_position_list:
@ -1815,7 +1815,7 @@ class BackTestingEngine(object):
# 计算每个策略实例的持仓盈亏 # 计算每个策略实例的持仓盈亏
strategy_pnl.update({shortpos.strategy_name: strategy_pnl.get(shortpos.strategy_name, 0) + holding_profit}) strategy_pnl.update({shortpos.strategy_name: strategy_pnl.get(shortpos.strategy_name, 0) + holding_profit})
positionMsg += "{},short,p={},v={},m={};".format(symbol, shortpos.price, shortpos.volume, holding_profit) positionMsg += "{},short,p={},v={},m={};\n".format(symbol, shortpos.price, shortpos.volume, holding_profit)
data['net'] = c + today_holding_profit # 当日净值(含持仓盈亏) data['net'] = c + today_holding_profit # 当日净值(含持仓盈亏)
data['rate'] = (c + today_holding_profit) / self.init_capital data['rate'] = (c + today_holding_profit) / self.init_capital

View File

@ -933,8 +933,8 @@ class CtaEngine(BaseEngine):
except Exception: except Exception:
strategy.trading = False strategy.trading = False
strategy.inited = False strategy.inited = False
account_id = self.engine_config.get('accountid', '-')
msg = f"触发异常已停止\n{traceback.format_exc()}" msg = f"{account_id}/{strategy.strategy_name}触发异常已停止\n{traceback.format_exc()}"
self.write_log(msg=msg, self.write_log(msg=msg,
strategy_name=strategy.strategy_name, strategy_name=strategy.strategy_name,
level=logging.CRITICAL) level=logging.CRITICAL)

View File

@ -502,7 +502,7 @@ class CtaStockTemplate(CtaTemplate):
def load_klines_from_cache(self, kline_names: list = []): def load_klines_from_cache(self, kline_names: list = []):
""" """
从缓存加载K线数据 从缓存加载K线数据
:param kline_names: :param kline_names: 指定需要加载的k线名称列表
:return: :return:
""" """
if len(kline_names) == 0: if len(kline_names) == 0:
@ -1263,12 +1263,13 @@ class CtaStockTemplate(CtaTemplate):
self.active_orders.update({vt_orderid: order_info}) self.active_orders.update({vt_orderid: order_info})
ret = self.cancel_order(str(vt_orderid)) ret = self.cancel_order(str(vt_orderid))
if not ret: if not ret:
self.write_log(u'撤单失败,更新状态为撤单成功') self.write_log(f'撤单失败:{order_info}')
order_info.update({'status': Status.CANCELLED}) # self.write_log(u'撤单失败,更新状态为撤单成功')
self.active_orders.update({vt_orderid: order_info}) # order_info.update({'status': Status.CANCELLED})
if order_grid: # self.active_orders.update({vt_orderid: order_info})
if vt_orderid in order_grid.order_ids: # if order_grid:
order_grid.order_ids.remove(vt_orderid) # if vt_orderid in order_grid.order_ids:
# order_grid.order_ids.remove(vt_orderid)
continue continue

View File

@ -407,7 +407,7 @@ class CtaEngine(BaseEngine):
underlying_symbol = get_underlying_symbol(vt_symbol) underlying_symbol = get_underlying_symbol(vt_symbol)
dt = datetime.now() dt = datetime.now()
# 若为中金所的合约,白天才提交订阅请求 # 若为中金所的合约,白天才提交订阅请求
if underlying_symbol in MARKET_DAY_ONLY and not (9 < dt.hour < 16): if underlying_symbol in MARKET_DAY_ONLY and not (9 <= dt.hour < 16):
continue continue
self.write_log(f'重新提交合约{vt_symbol}订阅请求') self.write_log(f'重新提交合约{vt_symbol}订阅请求')

View File

@ -24,7 +24,7 @@ from vnpy.amqp.producer import publisher
from vnpy.amqp.consumer import worker from vnpy.amqp.consumer import worker
APP_NAME = 'Stock_Publisher' APP_NAME = 'Stock_Publisher'
REST_HOST = 'http://49.234.35.135:8005' REST_HOST = 'http://49.234.35.135:8006'
SUBSCRIBE_FILE = 'today_subscribe.json' SUBSCRIBE_FILE = 'today_subscribe.json'
# 市场交易代码 => vnpy # 市场交易代码 => vnpy

View File

@ -31,11 +31,11 @@ NIGHT_MARKET_SQ2 = {'CU': 10, 'PB': 5, 'AL': 5, 'ZN': 5, 'WR': 1, 'NI': 10, 'SS'
NIGHT_MARKET_SQ3 = {'RU': 5, 'RB': 1, 'HC': 1, 'SP': 2, 'FU': 1, 'BU': 2, 'NR': 5, 'C': 1, 'CS': 1, 'LU':1,'PF':2} NIGHT_MARKET_SQ3 = {'RU': 5, 'RB': 1, 'HC': 1, 'SP': 2, 'FU': 1, 'BU': 2, 'NR': 5, 'C': 1, 'CS': 1, 'LU':1,'PF':2}
# 郑商所夜盘9:00~10:15, 10:30~11:30, 13:30~15:00, 21:00 ~23:00 # 郑商所夜盘9:00~10:15, 10:30~11:30, 13:30~15:00, 21:00 ~23:00
NIGHT_MARKET_ZZ = {'TA': 2, 'JR': 1, 'OI': 0, 'RO': 1, 'PM': 1, 'WH': 1, 'CF': 5, 'SR': 0, 'FG': 1, NIGHT_MARKET_ZZ = {'TA': 2, 'JR': 1, 'OI': 0, 'RO': 1, 'PM': 1, 'WH': 1, 'CF': 5, 'SR': 0, 'FG': 1,
'MA': 1, 'RS': 1, 'RM': 1, 'RI': 1, 'ZC': 0.2} 'MA': 1, 'RS': 1, 'RM': 1, 'RI': 1, 'ZC': 0.2, 'SA':1}
# 大商所夜盘9:00~10:15, 10:30~11:30, 13:30~15:00, 21:00 ~23:00 # 大商所夜盘9:00~10:15, 10:30~11:30, 13:30~15:00, 21:00 ~23:00
NIGHT_MARKET_DL = {'V': 5, 'L': 5, 'BB': 0.05, 'I': 0.5, 'FB': 0.05, 'C': 1, 'PP': 1, 'A': 1, 'B': 1, 'M': 1, 'Y': 2, NIGHT_MARKET_DL = {'V': 5, 'L': 5, 'BB': 0.05, 'I': 0.5, 'FB': 0.05, 'C': 1, 'PP': 1, 'A': 1, 'B': 1, 'M': 1, 'Y': 2,
'P': 2, 'P': 2,
'JM': 0.5, 'J': 0.5, 'EG': 1, 'EB': 1} 'JM': 0.5, 'J': 0.5, 'EG': 1, 'EB': 1,'PG':1}
# 中金日盘9:15 ~11:30, 13:00~15:15 # 中金日盘9:15 ~11:30, 13:00~15:15
MARKET_ZJ = {'IC': 0.2, 'IF': 0.2, 'IH': 0.2, 'T': 0.005, 'TF': 0.005, 'TS': 0.005} MARKET_ZJ = {'IC': 0.2, 'IF': 0.2, 'IH': 0.2, 'T': 0.005, 'TF': 0.005, 'TS': 0.005}

View File

@ -407,7 +407,9 @@ class TdxFutureData(object):
else: else:
tdx_symbol = get_full_symbol(symbol).upper() tdx_symbol = get_full_symbol(symbol).upper()
# 查询合约 => 指数合约, 主要是用来获取市场
tdx_index_symbol = underlying_symbol + 'L9' tdx_index_symbol = underlying_symbol + 'L9'
# 合约缩写 => 交易所
vn_exchange = self._get_vn_exchange(underlying_symbol) vn_exchange = self._get_vn_exchange(underlying_symbol)
self.connect() self.connect()
@ -441,19 +443,20 @@ class TdxFutureData(object):
_bars = [] _bars = []
_pos = 0 _pos = 0
while _start_date > qry_start_date: while _start_date > qry_start_date:
# 利用api查询历史数据
_res = self.api.get_instrument_bars( _res = self.api.get_instrument_bars(
tdx_period, category=tdx_period,
self.symbol_market_dict.get(tdx_index_symbol, 0), market=self.symbol_market_dict.get(tdx_index_symbol, 0),
tdx_symbol, code=tdx_symbol,
_pos, start=_pos,
QSIZE) count=QSIZE)
if _res is not None: if _res is not None:
_bars = _res + _bars _bars = _res + _bars
_pos += QSIZE _pos += QSIZE
if _res is not None and len(_res) > 0: if _res is not None and len(_res) > 0:
_start_date = _res[0]['datetime'] _start_date = _res[0]['datetime']
_start_date = datetime.strptime(_start_date, '%Y-%m-%d %H:%M') _start_date = datetime.strptime(_start_date, '%Y-%m-%d %H:%M')
self.write_log(u'分段取数据开始:{}'.format(_start_date)) self.write_log(u'分段取{}数据,开始时间:{}'.format(tdx_symbol, _start_date))
else: else:
break break
if len(_bars) == 0: if len(_bars) == 0:

View File

@ -28,6 +28,9 @@ api_01 = TdxFutureData(strategy=t1)
# 获取某个合约得最新价 # 获取某个合约得最新价
#price = api_01.get_price('rb2010') #price = api_01.get_price('rb2010')
#print('price={}'.format(price)) #print('price={}'.format(price))
ret, bars = api_01.get_bars('rb2105.SHFE',period='1min')
for bar in bars:
print(bar.__dict__)
# 获取主力合约 # 获取主力合约

View File

@ -340,6 +340,13 @@ class CtpGateway(BaseGateway):
for (vt_symbol, is_bar) in list(self.subscribed_symbols): for (vt_symbol, is_bar) in list(self.subscribed_symbols):
symbol, exchange = extract_vt_symbol(vt_symbol) symbol, exchange = extract_vt_symbol(vt_symbol)
# 获取合约的缩写号
underlying_symbol = get_underlying_symbol(vt_symbol)
dt = datetime.now()
# 若为中金所等的合约,白天才提交订阅请求
if underlying_symbol in MARKET_DAY_ONLY and not (8 < dt.hour < 16):
continue
req = SubscribeRequest( req = SubscribeRequest(
symbol=symbol, symbol=symbol,
exchange=exchange, exchange=exchange,
@ -600,6 +607,7 @@ class CtpMdApi(MdApi):
Callback when front server is connected. Callback when front server is connected.
""" """
self.gateway.write_log(f"{self.name}行情服务器连接成功") self.gateway.write_log(f"{self.name}行情服务器连接成功")
self.connect_status = True
self.login() self.login()
self.gateway.status.update( self.gateway.status.update(
{f'{self.name}md_con': True, f'{self.name}md_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) {f'{self.name}md_con': True, f'{self.name}md_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
@ -609,6 +617,7 @@ class CtpMdApi(MdApi):
Callback when front server is disconnected. Callback when front server is disconnected.
""" """
self.login_status = False self.login_status = False
self.connect_status = False
self.gateway.write_log(f"{self.name}行情服务器连接断开,原因{reason}") self.gateway.write_log(f"{self.name}行情服务器连接断开,原因{reason}")
self.gateway.status.update( self.gateway.status.update(
{f'{self.name}md_con': False, f'{self.name}md_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) {f'{self.name}md_con': False, f'{self.name}md_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
@ -814,27 +823,31 @@ class CtpTdApi(TdApi):
def onFrontConnected(self): def onFrontConnected(self):
"""""" """"""
self.gateway.write_log("交易服务器连接成功") self.gateway.write_log("交易服务器连接成功")
self.connect_status = True
if self.auth_code: if self.auth_code:
self.gateway.write_log("向交易服务器提交授权码验证")
self.authenticate() self.authenticate()
else: else:
self.gateway.write_log("向交易服务器进行帐号登录")
self.login() self.login()
self.gateway.status.update({'td_con': True, 'td_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
def onFrontDisconnected(self, reason: int): def onFrontDisconnected(self, reason: int):
"""""" """"""
self.login_status = False self.login_status = False
self.gateway.write_log(f"交易服务器连接断开,原因{reason}") self.gateway.write_log(f"交易服务器连接断开,原因{reason}")
self.gateway.status.update({'td_con': True, 'td_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) self.gateway.status.update({'td_con': False, 'td_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool): def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool):
"""""" """"""
if not error['ErrorID']: if not error['ErrorID']:
self.auth_staus = True self.auth_staus = True
self.gateway.write_log("交易服务器授权验证成功") self.gateway.write_log("交易服务器授权验证成功")
self.gateway.status.update({"td_auth": True})
self.login() self.login()
else: else:
self.gateway.write_error("交易服务器授权验证失败", error) self.gateway.write_error("交易服务器授权验证失败", error)
self.gateway.status.update({"td_auth":False})
def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool): def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool):
"""""" """"""
@ -842,7 +855,8 @@ class CtpTdApi(TdApi):
self.frontid = data["FrontID"] self.frontid = data["FrontID"]
self.sessionid = data["SessionID"] self.sessionid = data["SessionID"]
self.login_status = True self.login_status = True
self.gateway.write_log("交易服务器登录成功") self.gateway.status.update({'td_con': True, 'td_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
self.gateway.write_log("交易帐号登录完成")
# Confirm settlement # Confirm settlement
req = { req = {
@ -853,7 +867,7 @@ class CtpTdApi(TdApi):
self.reqSettlementInfoConfirm(req, self.reqid) self.reqSettlementInfoConfirm(req, self.reqid)
else: else:
self.login_failed = True self.login_failed = True
self.gateway.status.update({'td_con': False,'td_login_fail_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
self.gateway.write_error("交易服务器登录失败", error) self.gateway.write_error("交易服务器登录失败", error)
def onRspOrderInsert(self, data: dict, error: dict, reqid: int, last: bool): def onRspOrderInsert(self, data: dict, error: dict, reqid: int, last: bool):
@ -975,6 +989,11 @@ class CtpTdApi(TdApi):
if act_symbol != pas_symbol: if act_symbol != pas_symbol:
self.gateway.subscribe(SubscribeRequest(symbol=position.symbol, exchange=position.exchange)) self.gateway.subscribe(SubscribeRequest(symbol=position.symbol, exchange=position.exchange))
else: else:
# 获取合约的缩写号
underlying_symbol = get_underlying_symbol(position.symbol)
dt = datetime.now()
# 若为中金所等的合约,白天才提交订阅请求
if not (underlying_symbol in MARKET_DAY_ONLY and not (8 < dt.hour < 16)):
self.gateway.subscribe(SubscribeRequest(symbol=position.symbol, exchange=position.exchange)) self.gateway.subscribe(SubscribeRequest(symbol=position.symbol, exchange=position.exchange))
if last: if last:
@ -1267,7 +1286,7 @@ class CtpTdApi(TdApi):
self.registerFront(address) self.registerFront(address)
self.init() self.init()
self.gateway.write_log(f'交易前端连接成功')
self.connect_status = True self.connect_status = True
else: else:
self.authenticate() self.authenticate()
@ -1276,6 +1295,7 @@ class CtpTdApi(TdApi):
""" """
Authenticate with auth_code and appid. Authenticate with auth_code and appid.
""" """
req = { req = {
"UserID": self.userid, "UserID": self.userid,
"BrokerID": self.brokerid, "BrokerID": self.brokerid,
@ -1782,8 +1802,14 @@ class TdxMdApi():
continue continue
# self.gateway.write_log(f'{tick.__dict__}') # self.gateway.write_log(f'{tick.__dict__}')
pre_tick = self.symbol_tick_dict.get(tick.symbol, None)
self.symbol_tick_dict[tick.symbol] = tick self.symbol_tick_dict[tick.symbol] = tick
# 排除指数的异常数据(tdx有些服务器异常返回数据偏差超过上一tick的20%
if pre_tick:
if tick.last_price > pre_tick.last_price * 1.2 or tick.last_price < pre_tick.last_price * 0.8:
continue
self.gateway.on_tick(tick) self.gateway.on_tick(tick)
self.gateway.on_custom_tick(tick) self.gateway.on_custom_tick(tick)
@ -1854,7 +1880,13 @@ class SubMdApi():
if len(tick.trading_day) == 0: if len(tick.trading_day) == 0:
tick.trading_day = get_trading_date(dt) tick.trading_day = get_trading_date(dt)
pre_tick = self.symbol_tick_dict.get(symbol,None)
self.symbol_tick_dict[symbol] = tick self.symbol_tick_dict[symbol] = tick
# 排除指数的异常数据(tdx有些服务器异常返回数据偏差超过上一tick的20%
if pre_tick:
if tick.last_price > pre_tick.last_price * 1.2 or tick.last_price < pre_tick.last_price * 0.8:
return
self.gateway.on_tick(tick) self.gateway.on_tick(tick)
self.gateway.on_custom_tick(tick) self.gateway.on_custom_tick(tick)