[update] websocket支持gzip自动解压

This commit is contained in:
msincenselee 2021-02-09 09:30:36 +08:00
parent a446ce08fc
commit 310ff555a3
11 changed files with 108 additions and 67 deletions

View File

@ -29,7 +29,7 @@ if __name__ == "__main__":
cta_path = os.path.abspath(os.path.join(vnpy_root, account_folder))
if not os.path.exists(cta_path):
print(f'{cta_path}不存在', file=sys.stderr)
exit()
continue
print(f'开始检查{cta_path}下的策略运行配置文件')
account_name = account_folder.split('/')[-1]
# 创建API对象

View File

@ -59,6 +59,7 @@ if __name__ == "__main__":
print(f'文件{bar_file_path}存在,最后时间:{start_date}')
else:
last_dt = None
start_dt = datetime.strptime(start_date, '%Y%m%d')
print(f'文件{bar_file_path}不存在,开始时间:{start_date}')

View File

@ -35,7 +35,8 @@ class RemoteClient:
"""
params = locals().copy()
params.pop("self")
# if exe_path is None:
# params['exe_path'] = 'C:\\THS\\xiadan.exe'
if config_path is not None:
account = file2dict(config_path)
params["user"] = account["user"]

View File

@ -1,5 +1,6 @@
import json
import logging
import gzip
import socket
import ssl
import sys
@ -197,19 +198,21 @@ class WebsocketClient:
self._ensure_connection()
ws = self._ws
if ws:
text = ws.recv()
recv_data = ws.recv()
# ws object is closed when recv function is blocking
if not text:
if not recv_data:
self._disconnect()
continue
self._record_last_received_text(text)
self._record_last_received_text(recv_data)
try:
data = self.unpack_data(text)
if isinstance(recv_data, bytes):
recv_data = gzip.decompress(recv_data)
data = self.unpack_data(recv_data)
except ValueError as e:
print("websocket unable to parse data: " + text)
print("websocket unable to parse data: " + recv_data)
raise e
self._log('recv data: %s', data)

View File

@ -2143,7 +2143,7 @@ class CtaSpotTemplate(CtaTemplate):
.format(self.cur_datetime, self.vt_symbol, self.cur_price, self.entrust))
if len(self.active_orders) > 0:
self.write_log('当前活动订单:{}'.format(json.dumps(self.active_orders, indent=2, ensure_ascii=False)))
self.write_log('当前活动订单:{}'.format(len(self.active_orders))) #json.dumps(self.active_orders, indent=2, ensure_ascii=False)))
if hasattr(self, 'policy'):
policy = getattr(self, 'policy')

View File

@ -279,9 +279,10 @@ class CtaEngine(BaseEngine):
strategy = self.orderid_strategy_map.get(order.vt_orderid, None)
if not strategy:
self.write_log(f'委托单没有对应的策略设置:order:{order}')
self.write_log(f'委托单没有对应的策略设置:order:{order.__dict__}')
self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}')
return
self.write_log(f'委托更新:{order.vt_orderid} => 策略:{strategy.strategy_name}')
# Remove vt_orderid if order is no longer active.
vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
if order.vt_orderid in vt_orderids and not order.is_active():
@ -311,15 +312,18 @@ class CtaEngine(BaseEngine):
# Filter duplicate trade push
if trade.vt_tradeid in self.vt_tradeids:
self.write_log(f'成交单的委托编号不属于本引擎实例:{trade}')
self.write_log(f'成交单的交易编号{trade.vt_tradeid}已处理完毕,不再处理')
return
self.vt_tradeids.add(trade.vt_tradeid)
strategy = self.orderid_strategy_map.get(trade.vt_orderid, None)
if not strategy:
self.write_log(f'成交单没有对应的策略设置:order:{trade}')
self.write_log(f'成交单没有对应的策略设置:trade:{trade.__dict__}')
self.write_log(f'当前策略侦听委托单:{list(self.orderid_strategy_map.keys())}')
return
self.write_log(f'成交更新:{trade.vt_orderid} => 策略:{strategy.strategy_name}')
# Update strategy pos before calling on_trade method
# 取消外部干预策略pos由策略自行完成更新
# if trade.direction == Direction.LONG:
@ -512,13 +516,10 @@ class CtaEngine(BaseEngine):
vt_orderid = self.main_engine.send_order(
req, gateway_name)
# Check if sending order successful
if not vt_orderid:
vt_orderids
vt_orderids.append(vt_orderid)
# Save relationship between orderid and strategy.
self.write_log(f'委托成功绑定{vt_orderid} <==> {strategy.strategy_name}')
self.orderid_strategy_map[vt_orderid] = strategy
self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid)
@ -874,7 +875,7 @@ class CtaEngine(BaseEngine):
def get_account(self, vt_accountid: str = ""):
""" 查询账号的资金"""
# 如果启动风控,则使用风控中的最大仓位
if self.main_engine.rm_engine:
if self.main_engine.rm_engine and len(vt_accountid) > 0:
return self.main_engine.rm_engine.get_account(vt_accountid)
if len(vt_accountid) > 0:

View File

@ -2003,7 +2003,7 @@ class BackTestingEngine(object):
# 计算每个策略实例的持仓盈亏
strategy_pnl.update({longpos.strategy_name: strategy_pnl.get(longpos.strategy_name, 0) + holding_profit})
positionMsg += "{},long,p={},v={},m={};".format(symbol, longpos.price, longpos.volume, holding_profit)
positionMsg += "{:<10},long ,p:{:<10},vol:{:<3},pnl:{};\n".format(symbol, round(longpos.price,3), longpos.volume, round(holding_profit,3))
for shortpos in self.short_position_list:
# 不计算套利合约的持仓盈亏
@ -2023,7 +2023,7 @@ class BackTestingEngine(object):
# 计算每个策略实例的持仓盈亏
strategy_pnl.update({shortpos.strategy_name: strategy_pnl.get(shortpos.strategy_name, 0) + holding_profit})
positionMsg += "{},short,p={},v={},m={};".format(symbol, shortpos.price, shortpos.volume, holding_profit)
positionMsg += "{:<10},short,p:{:<10},vol:{:<3},pnl:{};\n".format(symbol, round(shortpos.price,3), shortpos.volume, round(holding_profit,3))
data['net'] = c + today_holding_profit # 当日净值(含持仓盈亏)
data['rate'] = (c + today_holding_profit) / self.init_capital
@ -2049,7 +2049,7 @@ class BackTestingEngine(object):
self.daily_max_drawdown_rate = drawdown_rate
self.max_drawdown_rate_time = data['date']
msg = u'{}: net={}, capital={} max={} holding_profit={} commission={} pos: {}' \
msg = u'{}: net={}, capital={} max={} holding_profit={} commission={} pos: \n{}' \
.format(data['date'],
data['net'], c, m,
today_holding_profit,
@ -2143,6 +2143,7 @@ class BackTestingEngine(object):
d = {}
d['init_capital'] = self.init_capital
d['profit'] = self.cur_capital - self.init_capital
d['net_capital'] = self.net_capital
d['max_capital'] = self.max_net_capital # 取消原 maxCapital
if len(self.pnl_list) == 0:
@ -2223,8 +2224,11 @@ class BackTestingEngine(object):
result_info.update({u'期初资金': d['init_capital']})
self.output(u'期初资金:\t%s' % format_number(d['init_capital']))
result_info.update({u'总盈亏': d['profit']})
self.output(u'总盈亏:\t%s' % format_number(d['profit']))
result_info.update({u'期末资金': d['net_capital']})
self.output(u'期末资金:\t%s' % format_number(d['net_capital']))
result_info.update({u'平仓盈亏': d['profit']})
self.output(u'平仓盈亏:\t%s' % format_number(d['profit']))
result_info.update({u'资金最高净值': d['max_capital']})
self.output(u'资金最高净值:\t%s' % format_number(d['max_capital']))

View File

@ -713,7 +713,11 @@ class CtaProTemplate(CtaTemplate):
cb_on_bar = strategy_kline.cb_on_bar
# 缓存实例数据 =》 当前实例数据
strategy_kline.__dict__.update(cache_kline.__dict__)
kline_first_bar_dt = None
kline_last_bar_dt = None
if len(strategy_kline.line_bar) > 0:
kline_first_bar_dt = strategy_kline.line_bar[0].datetime
kline_last_bar_dt = strategy_kline.line_bar[-1].datetime
# 所有K线的最后时间
if last_bar_dt and strategy_kline.cur_datetime:
last_bar_dt = max(last_bar_dt, strategy_kline.cur_datetime)
@ -724,7 +728,7 @@ class CtaProTemplate(CtaTemplate):
strategy_kline.strategy = self
strategy_kline.cb_on_bar = cb_on_bar
self.write_log(f'恢复{kline_name}缓存数据,最新bar结束时间:{last_bar_dt}')
self.write_log(f'恢复{kline_name}缓存数据:[{kline_first_bar_dt}] => [{kline_last_bar_dt}], bar结束时间:{last_bar_dt}')
self.write_log(u'加载缓存k线数据完毕')
return last_bar_dt
@ -1880,7 +1884,7 @@ class CtaProFutureTemplate(CtaProTemplate):
elif order_info['traded'] > 0:
self.write_log('撤单逻辑 = > 部分开仓')
if order_grid.traded_volume < order_info['traded']:
self.write_log('撤单逻辑 = > 调整网格开仓数 {} => {}'.format(order_grid.traded_volume, order_grid['traded'] ))
self.write_log('撤单逻辑 = > 调整网格开仓数 {} => {}'.format(order_grid.traded_volume, order_info['traded'] ))
order_grid.traded_volume = order_info['traded']
self.write_log(f'撤单逻辑 => 调整网格委托状态=> False, 开仓状态:True, 开仓数量:{order_grid.volume}=>{order_grid.traded_volume}')
order_grid.order_status = False

View File

@ -199,7 +199,7 @@ class CtaLineBar(object):
self.mid5_array[:] = np.nan
# 导出到CSV文件 的目录名 和 要导出的 字段
self.export_filename = None # 数据要导出的目标文件夹
self.export_fields = [] # 定义要导出的K线数据字段包含K线元素主图指标附图指标等
self.export_fields = [] # 定义要导出的K线数据字段包含K线元素主图指标附图指标等
self.export_bi_filename = None # 通过唐其安通道输出得笔csv文件(不是缠论得笔)
self.export_zs_filename = None # 通过唐其安通道输出的中枢csv文件不是缠论的笔中枢
@ -654,6 +654,7 @@ class CtaLineBar(object):
self.dict_dif = {} # datetime str: dif mapping
self.line_dea = [] # DEA = 前一日DEA X 8/10 + 今日DIF X 2/10即为talib-MACD返回值
self.line_macd = [] # (dif-dea)*2但是talib中MACD的计算是bar = (dif-dea)*1,国内一般是乘以2
self.dict_macd = {} # datetime str: macd mapping
self.macd_segment_list = [] # macd 金叉/死叉的段列表,记录价格的最高/最低Dif的最高最低Macd的最高/最低Macd面接
self._rt_dif = None
self._rt_dea = None
@ -940,6 +941,7 @@ class CtaLineBar(object):
if self.bar_len > self.max_hold_bars:
del self.line_bar[0]
self.dict_dif.pop(self.index_list[0], None)
self.dict_macd.pop(self.index_list[0], None)
del self.index_list[0]
self.bar_len = self.bar_len - 1 # 删除了最前面的barbar长度少一位
@ -1256,13 +1258,13 @@ class CtaLineBar(object):
# K线的日期时间
self.cur_bar.trading_day = tick.trading_day # K线所在的交易日期
#self.cur_bar.date = tick.date # K线的日期夜盘的话与交易日期不同哦
# self.cur_bar.date = tick.date # K线的日期夜盘的话与交易日期不同哦
self.cur_bar.datetime = tick.datetime
if (self.interval == Interval.SECOND and self.bar_interval % 60 == 0) \
or self.interval in [Interval.MINUTE, Interval.HOUR, Interval.DAILY, Interval.WEEKLY]:
# K线的日期时间去除秒设为第一个Tick的时间
self.cur_bar.datetime = self.cur_bar.datetime.replace(second=0, microsecond=0)
#self.cur_bar.time = self.cur_bar.datetime.strftime('%H:%M:%S')
# self.cur_bar.time = self.cur_bar.datetime.strftime('%H:%M:%S')
self.cur_bar.volume = tick.last_volume if tick.last_volume > 0 else tick.volume
if self.cur_trading_day != self.cur_bar.trading_day or not self.line_bar:
# bar的交易日与记录的当前交易日不一致
@ -1784,7 +1786,7 @@ class CtaLineBar(object):
# 当前笔 不是中枢最后一笔, 方向是回归中枢的
else:
# 向下的一笔,且回落中枢高位下方,变成中枢的最后一笔
if direction == -1 and cur_bi.get('low') < self.cur_tqn_zs.get('high')\
if direction == -1 and cur_bi.get('low') < self.cur_tqn_zs.get('high') \
and cur_bi.get('high') > self.cur_tqn_zs.get('low'):
# 对比中枢之前所有的确认低点,不能超过
zs_lows = self.cur_tqn_zs.get("lows", [self.cur_tqn_zs.get('low')])
@ -1814,7 +1816,7 @@ class CtaLineBar(object):
self.cur_tqn_zs.update({"end": cur_bi.get("start")})
# 向上的一笔,回抽中枢下轨上方,变成中枢的一笔
if direction == 1 and cur_bi.get('high') > self.cur_tqn_zs.get('low')\
if direction == 1 and cur_bi.get('high') > self.cur_tqn_zs.get('low') \
and cur_bi.get('low') < self.cur_tqn_zs.get('high'):
# 对比中枢之前所有的确认高点,不能超过
zs_highs = self.cur_tqn_zs.get("highs", [self.cur_tqn_zs.get('high')])
@ -3753,15 +3755,19 @@ class CtaLineBar(object):
del self.line_dif[0]
dif = round(dif_list[-1], self.round_n)
self.line_dif.append(dif)
if len(self.index_list) > 0:
self.dict_dif.update({self.index_list[-1]: dif})
if len(self.line_dea) > self.max_hold_bars:
del self.line_dea[0]
self.line_dea.append(round(dea_list[-1], self.round_n))
macd = round(macd_list[-1] * 2, self.round_n)
if len(self.line_macd) > self.max_hold_bars:
del self.line_macd[0]
self.line_macd.append(round(macd_list[-1] * 2, self.round_n)) # 国内一般是2倍
self.line_macd.append(macd) # 国内一般是2倍
if len(self.index_list) > 0:
self.dict_dif.update({self.index_list[-1]: dif})
self.dict_macd.update({self.index_list[-1]: macd})
# 更新 “段”(金叉-》死叉;或 死叉-》金叉)
segment = self.macd_segment_list[-1] if len(self.macd_segment_list) > 0 else {}
@ -3889,7 +3895,11 @@ class CtaLineBar(object):
# 通过bar的时间获取dif值
def get_dif_by_dt(self, str_dt):
return self.dict_dif.get(str_dt, 0)
return self.dict_dif.get(str_dt, None)
# 通过bar的时间获取macd值
def get_macd_by_dt(self, str_dt):
return self.dict_macd.get(str_dt, None)
@property
def rt_dif(self):
@ -5390,6 +5400,15 @@ class CtaLineBar(object):
# self.__count_chanlun()
# return self._duan_zs_list
def is_duan(self, direction):
"""当前最新一线段,是否与输入方向一致"""
if isinstance(direction, Direction):
direction = 1 if direction == Direction.LONG else -1
if len(self.duan_list) == 0:
return False
return self.duan_list[-1].direction == direction
def is_bi_beichi_inside_duan(self, direction, cur_duan=None):
"""
当前段内的笔是否形成背驰
@ -5402,7 +5421,7 @@ class CtaLineBar(object):
direction = 1 if direction == Direction.LONG else -1
if cur_duan is None:
if len(self._duan_list) == 0:
if len(self.duan_list) == 0:
return False
# 分型需要确认
@ -5410,7 +5429,7 @@ class CtaLineBar(object):
return False
# 取当前段
cur_duan = self._duan_list[-1]
cur_duan = self.duan_list[-1]
# 获取最近2个匹配direction的分型
fx_list = [fx for fx in self._fenxing_list[-4:] if fx.direction == direction]
if len(fx_list) != 2:
@ -5450,51 +5469,57 @@ class CtaLineBar(object):
return False
def is_fx_macd_divergence(self, direction):
def is_fx_macd_divergence(self, direction, cur_duan=None, use_macd=False):
"""
分型的macd背离
:param direction: 1-1 或者 Direction.LONG判断是否顶背离, Direction.SHORT判断是否底背离
: cur_duan 当前段
: use_macd 使用macd红柱绿柱进行比较
:return:
"""
if isinstance(direction, Direction):
direction = 1 if direction == Direction.LONG else -1
if len(self._duan_list) == 0:
return False
# 当前段
duan = self._duan_list[-1]
if cur_duan is None:
if duan.direction != direction:
if len(self.duan_list) == 0:
return False
# 当前段
cur_duan = self.duan_list[-1]
# 获取最近2个匹配direction的分型
fx_list = [fx for fx in self._fenxing_list[-4:] if fx.direction == direction]
if len(fx_list) != 2:
return False
# 这里是排除段的信号出错,获取了很久之前的一段,而不是最新的一段
if cur_duan.end < fx_list[0].index:
return False
if cur_duan.direction != direction:
return False
# 当前段包含的分笔必须大于3
if len(duan.bi_list) <= 3:
if len(cur_duan.bi_list) < 3:
return False
# 获取最近2个匹配direction的分型
fx_list = [fx for fx in self._fenxing_list[-4:] if fx.direction == direction]
if len(fx_list) != 2:
return False
# 这里是排除段的信号出错,获取了很久之前的一段,而不是最新的一段
if duan.end < fx_list[0].index:
return False
pre_dif = self.get_dif_by_dt(fx_list[0].index)
cur_dif = self.get_dif_by_dt(fx_list[1].index)
if pre_dif is None or cur_dif is None:
# 获取倒数第二根同向分笔的结束dif值或macd值
pre_value = self.get_macd_by_dt(cur_duan.bi_list[-3].end) if use_macd else self.get_dif_by_dt(
cur_duan.bi_list[-3].end)
cur_value = self.get_macd_by_dt(cur_duan.bi_list[-1].end) if use_macd else self.get_dif_by_dt(
cur_duan.bi_list[-1].end)
if pre_value is None or cur_value is None:
return False
if direction == 1:
# 前顶分型顶部价格
pre_price = fx_list[0].high
pre_price = cur_duan.bi_list[-3].high
# 当前顶分型顶部价格
cur_price = fx_list[1].high
if pre_price < cur_price and pre_dif >= cur_dif and 0 < self.line_dif[-1] < self.line_dif[-2]:
cur_price = cur_duan.bi_list[-1].high
if pre_price < cur_price and pre_value >= cur_value > 0:
return True
else:
pre_price = fx_list[0].low
cur_price = fx_list[1].low
if pre_price > cur_price and pre_dif <= cur_dif and self.line_dif[-2] < self.line_dif[-1] < 0:
pre_price = cur_duan.bi_list[-3].low
cur_price = cur_duan.bi_list[-1].low
if pre_price > cur_price and pre_value <= cur_value < 0:
return True
return False
@ -6511,7 +6536,7 @@ class CtaMinuteBar(CtaLineBar):
is_new_bar = False
# 不在同一交易日推入新bar
if len(tick.trading_day) > 0 and len(self.cur_trading_day) > 0 and self.cur_trading_day != tick.trading_day:
if len(tick.trading_day) > 0 and len(self.cur_trading_day) > 0 and self.cur_trading_day != tick.trading_day:
# self.write_log('{} drawLineBar() new_bar,{} curTradingDay:{},tick.trading_day:{} bars_count={}'
# .format(self.name, tick.datetime.strftime("%Y-%m-%d %H:%M:%S"), self.cur_trading_day,
# tick.trading_day, self.bars_count))

View File

@ -43,10 +43,10 @@ class TickData(BaseData):
trading_day: str = "" # '%Y-%m-%d'
name: str = ""
volume: float = 0
volume: float = 0 # 当前交易日累计成交量
open_interest: float = 0
last_price: float = 0
last_volume: float = 0
last_volume: float = 0 # 当前切片的成交量
limit_up: float = 0
limit_down: float = 0

View File

@ -622,6 +622,7 @@ def display_dual_axis(df, columns1, columns2=[], invert_yaxis1=False, invert_yax
:param file_name: 保存的excel 文件名称
:param sheet_name: excel 的sheet
:param image_name: 保存的image 文件名
颜色色系:https://www.osgeo.cn/matplotlib/tutorials/colors/colormaps.html
:return:
"""
@ -634,7 +635,7 @@ def display_dual_axis(df, columns1, columns2=[], invert_yaxis1=False, invert_yax
fig, ax1 = plt.subplots()
if invert_yaxis1:
ax1.invert_yaxis()
ax1.plot(df1)
df1.plot(ax=ax1,cmap='tab20b')
if len(columns2) > 0:
df2 = df[columns2]
@ -642,7 +643,8 @@ def display_dual_axis(df, columns1, columns2=[], invert_yaxis1=False, invert_yax
ax2 = ax1.twinx()
if invert_yaxis2:
ax2.invert_yaxis()
ax2.plot(df2)
df2.plot(ax=ax2, cmap='tab20c')
# 修改x轴得label为时间
xt = ax1.get_xticks()