[bug fix] bar volume和tick volume 修复

This commit is contained in:
msincenselee 2021-01-28 11:57:04 +08:00
parent 5c394b89b4
commit a446ce08fc
8 changed files with 252 additions and 127 deletions

View File

@ -795,7 +795,7 @@ class CtaLineBar(object):
"""
# Tick 有效性检查
if not self.is_7x24 and (tick.datetime.hour == 8 or tick.datetime.hour == 20):
self.write_log(u'竞价排名tick时间:{0}'.format(tick.datetime))
self.write_log(u'{}竞价排名tick时间:{}'.format(self.name, tick.datetime))
return
self.cur_datetime = tick.datetime
@ -1256,14 +1256,14 @@ class CtaLineBar(object):
# K线的日期时间
self.cur_bar.trading_day = tick.trading_day # K线所在的交易日期
self.cur_bar.date = tick.date # K线的日期夜盘的话与交易日期不同哦
#self.cur_bar.date = tick.date # K线的日期夜盘的话与交易日期不同哦
self.cur_bar.datetime = tick.datetime
if (self.interval == Interval.SECOND and self.bar_interval % 60 == 0) \
or self.interval in [Interval.MINUTE, Interval.HOUR, Interval.DAILY]:
or self.interval in [Interval.MINUTE, Interval.HOUR, Interval.DAILY, Interval.WEEKLY]:
# K线的日期时间去除秒设为第一个Tick的时间
self.cur_bar.datetime = self.cur_bar.datetime.replace(second=0, microsecond=0)
self.cur_bar.time = self.cur_bar.datetime.strftime('%H:%M:%S')
self.cur_bar.volume = tick.last_volume
#self.cur_bar.time = self.cur_bar.datetime.strftime('%H:%M:%S')
self.cur_bar.volume = tick.last_volume if tick.last_volume > 0 else tick.volume
if self.cur_trading_day != self.cur_bar.trading_day or not self.line_bar:
# bar的交易日与记录的当前交易日不一致
self.cur_trading_day = self.cur_bar.trading_day
@ -1271,6 +1271,8 @@ class CtaLineBar(object):
self.is_first_tick = True # 标识该Tick属于该Bar的第一个tick数据
# 6、将生成的正在合成的self.cur_bar 推入到line_bar队列
if self.interval in [Interval.HOUR, Interval.DAILY, Interval.WEEKLY]:
self.write_log(f'[{self.name}] create new bar, [{self.cur_bar.datetime}==> ]')
self.line_bar.append(self.cur_bar) # 推入到lineBar队列
def generate_bar(self, tick: TickData):
@ -1385,7 +1387,7 @@ class CtaLineBar(object):
# volume_change = tick.volume - self.last_tick.volume
# lastbar.volume += max(volume_change, 0)
# 更新 bar内成交量volume 新版根据tick内成交量运算
lastBar.volume += tick.last_volume
lastBar.volume += tick.last_volume if tick.last_volume > 0 else tick.volume
# 更新Bar的颜色
if lastBar.close_price > lastBar.open_price:
@ -6483,7 +6485,7 @@ class CtaMinuteBar(CtaLineBar):
del self.line_bar[0]
# 与最后一个BAR的时间比对判断是否超过K线周期
lastBar = self.line_bar[-1]
last_bar = self.line_bar[-1]
is_new_bar = False
endtick = False
@ -6509,11 +6511,11 @@ class CtaMinuteBar(CtaLineBar):
is_new_bar = False
# 不在同一交易日推入新bar
if self.cur_trading_day != tick.trading_day:
if len(tick.trading_day) > 0 and len(self.cur_trading_day) > 0 and self.cur_trading_day != tick.trading_day:
# self.write_log('{} drawLineBar() new_bar,{} curTradingDay:{},tick.trading_day:{} bars_count={}'
# .format(self.name, tick.datetime.strftime("%Y-%m-%d %H:%M:%S"), self.cur_trading_day,
# tick.trading_day, self.bars_count))
self.write_log(f'trading_day:{self.cur_trading_day} => tick.trading_day:{tick.trading_day} ')
is_new_bar = True
self.cur_trading_day = tick.trading_day
self.bars_count = bars_passed
@ -6533,8 +6535,8 @@ class CtaMinuteBar(CtaLineBar):
if (
(
tick.datetime.hour * 60 + tick.datetime.minute) % self.bar_interval == 0 and tick.datetime.minute != lastBar.datetime.minute) or (
tick.datetime - lastBar.datetime).total_seconds() > self.bar_interval * 60:
tick.datetime.hour * 60 + tick.datetime.minute) % self.bar_interval == 0 and tick.datetime.minute != last_bar.datetime.minute) or (
tick.datetime - last_bar.datetime).total_seconds() > self.bar_interval * 60:
# self.write_log('{} drawLineBar() new_bar,{} lastbar:{}, bars_count={}'
# .format(self.name, tick.datetime, lastBar.datetime,
# self.bars_count))
@ -6544,25 +6546,25 @@ class CtaMinuteBar(CtaLineBar):
# 创建并推入新的Bar
self.first_tick(tick)
# 触发OnBar事件
self.on_bar(lastBar)
self.on_bar(last_bar)
else:
# 更新当前最后一个bar
self.barFirstTick = False
# 更新最高价、最低价、收盘价、成交量
lastBar.high_price = max(lastBar.high_price, tick.last_price)
lastBar.low_price = min(lastBar.low_price, tick.last_price)
lastBar.close_price = tick.last_price
lastBar.open_interest = tick.open_interest
lastBar.volume += tick.volume
last_bar.high_price = max(last_bar.high_price, tick.last_price)
last_bar.low_price = min(last_bar.low_price, tick.last_price)
last_bar.close_price = tick.last_price
last_bar.open_interest = tick.open_interest
last_bar.volume += tick.last_volume if tick.last_volume > 0 else tick.volume
# 更新Bar的颜色
if lastBar.close_price > lastBar.open_price:
lastBar.color = Color.RED
elif lastBar.close_price < lastBar.open_price:
lastBar.color = Color.BLUE
if last_bar.close_price > last_bar.open_price:
last_bar.color = Color.RED
elif last_bar.close_price < last_bar.open_price:
last_bar.color = Color.BLUE
else:
lastBar.color = Color.EQUAL
last_bar.color = Color.EQUAL
# 实时计算
self.rt_executed = False
@ -6626,7 +6628,7 @@ class CtaHourBar(CtaLineBar):
if bar.trading_day is None:
if self.is_7x24:
bar.trading_day = bar.date
bar.trading_day = bar.datetime.strftime('%Y-%m-%d')
else:
bar.trading_day = get_trading_date(bar.datetime)
@ -6722,7 +6724,7 @@ class CtaHourBar(CtaLineBar):
endtick = True
# 与最后一个BAR的时间比对判断是否超过K线周期
lastBar = self.line_bar[-1]
last_bar = self.line_bar[-1]
is_new_bar = False
if self.last_minute is None:
@ -6731,7 +6733,8 @@ class CtaHourBar(CtaLineBar):
self.last_minute = tick.datetime.minute
# 不在同一交易日推入新bar
if self.cur_trading_day != tick.trading_day:
if len(tick.trading_day) > 0 and self.cur_trading_day != tick.trading_day:
self.write_log(f'trading_day:{self.cur_trading_day} => tick.trading_day: {tick.trading_day} ')
is_new_bar = True
# 去除分钟和秒数
tick.datetime = datetime.strptime(tick.datetime.strftime('%Y-%m-%d %H:00:00'), '%Y-%m-%d %H:%M:%S')
@ -6750,7 +6753,7 @@ class CtaHourBar(CtaLineBar):
if self.is_7x24:
# 数字货币,用前后时间间隔
if (tick.datetime - lastBar.datetime).total_seconds() >= 3600 * self.bar_interval:
if (tick.datetime - last_bar.datetime).total_seconds() >= 3600 * self.bar_interval:
# self.write_log('{} drawLineBar() new_bar,{} - {} > 3600 * {} '
# .format(self.name, tick.datetime.strftime("%Y-%m-%d %H:%M:%S"),
# lastBar.datetime.strftime("%Y-%m-%d %H:%M:%S"),
@ -6780,27 +6783,28 @@ class CtaHourBar(CtaLineBar):
self.first_tick(tick)
self.m1_bars_count = 1
# 触发OnBar事件
self.on_bar(lastBar)
self.write_log(f'[{self.name}] process on_bar event [{last_bar.datetime} => {tick.datetime}]')
self.on_bar(last_bar)
else:
# 更新当前最后一个bar
self.barFirstTick = False
# 更新最高价、最低价、收盘价、成交量
lastBar.high_price = max(lastBar.high_price, tick.last_price)
lastBar.low_price = min(lastBar.low_price, tick.last_price)
lastBar.close_price = tick.last_price
lastBar.open_interest = tick.open_interest
last_bar.high_price = max(last_bar.high_price, tick.last_price)
last_bar.low_price = min(last_bar.low_price, tick.last_price)
last_bar.close_price = tick.last_price
last_bar.open_interest = tick.open_interest
lastBar.volume += tick.volume
last_bar.volume += tick.last_volume if tick.last_volume > 0 else tick.volume
# 更新Bar的颜色
if lastBar.close_price > lastBar.open_price:
lastBar.color = Color.RED
elif lastBar.close_price < lastBar.open_price:
lastBar.color = Color.BLUE
if last_bar.close_price > last_bar.open_price:
last_bar.color = Color.RED
elif last_bar.close_price < last_bar.open_price:
last_bar.color = Color.BLUE
else:
lastBar.color = Color.EQUAL
last_bar.color = Color.EQUAL
# 实时计算
self.rt_executed = False
@ -6872,14 +6876,14 @@ class CtaDayBar(CtaLineBar):
if bar_len == 0:
new_bar = copy.deepcopy(bar)
self.line_bar.append(new_bar)
self.cur_trading_day = bar.trading_day if bar.trading_day is not None else bar.date
self.cur_trading_day = bar.trading_day if bar.trading_day is not None else get_trading_date(bar.datetime)
if bar_is_completed:
self.on_bar(bar)
return
# 与最后一个BAR的时间比对判断是否超过K线的周期
lastBar = self.line_bar[-1]
self.cur_trading_day = bar.trading_day if bar.trading_day is not None else bar.date
self.cur_trading_day = bar.trading_day if bar.trading_day is not None else get_trading_date(bar.datetime)
is_new_bar = False
if bar_is_completed:
@ -6932,12 +6936,13 @@ class CtaDayBar(CtaLineBar):
del self.line_bar[0]
# 与最后一个BAR的时间比对判断是否超过K线周期
lastBar = self.line_bar[-1]
last_bar = self.line_bar[-1]
is_new_bar = False
# 交易日期不一致,新的交易日
if len(tick.trading_day) > 0 and tick.trading_day != lastBar.trading_day:
if len(tick.trading_day) > 0 and tick.trading_day != last_bar.trading_day:
self.write_log(f'trading_day:{last_bar.trading_day} => tick.trading_day:{tick.trading_day}')
is_new_bar = True
# 数字货币方面如果当前tick 日期与bar的日期不一致.(取消按照上面的统一处理因为币安是按照UTC时间算的每天开始ok是按照北京时间开始)
@ -6948,26 +6953,27 @@ class CtaDayBar(CtaLineBar):
# 创建并推入新的Bar
self.first_tick(tick)
# 触发OnBar事件
self.on_bar(lastBar)
self.write_log(f'[{self.name}] process on_bar event [{last_bar.datetime} => {tick.datetime}]')
self.on_bar(last_bar)
else:
# 更新当前最后一个bar
self.barFirstTick = False
# 更新最高价、最低价、收盘价、成交量
lastBar.high_price = max(lastBar.high_price, tick.last_price)
lastBar.low_price = min(lastBar.low_price, tick.last_price)
lastBar.close_price = tick.last_price
lastBar.open_interest = tick.open_interest
lastBar.volume += tick.volume
last_bar.high_price = max(last_bar.high_price, tick.last_price)
last_bar.low_price = min(last_bar.low_price, tick.last_price)
last_bar.close_price = tick.last_price
last_bar.open_interest = tick.open_interest
last_bar.volume += tick.last_volume if tick.last_volume > 0 else tick.volume
# 更新Bar的颜色
if lastBar.close_price > lastBar.open_price:
lastBar.color = Color.RED
elif lastBar.close_price < lastBar.open_price:
lastBar.color = Color.BLUE
if last_bar.close_price > last_bar.open_price:
last_bar.color = Color.RED
elif last_bar.close_price < last_bar.open_price:
last_bar.color = Color.BLUE
else:
lastBar.color = Color.EQUAL
last_bar.color = Color.EQUAL
# 实时计算
self.rt_executed = False
@ -7190,7 +7196,7 @@ class CtaWeekBar(CtaLineBar):
lastBar.close_price = tick.last_price
lastBar.open_interest = tick.open_interest
# 更新日内总交易量和bar内交易量
lastBar.volume += tick.volume
lastBar.volume += tick.last_volume if tick.last_volume > 0 else tick.volume
# 更新Bar的颜色
if lastBar.close_price > lastBar.open_price:

View File

@ -21,7 +21,6 @@ from vnpy.trader.utility import round_to
from vnpy.trader.constant import Direction, Color
from vnpy.component.cta_period import CtaPeriod, Period
try:
from vnpy.component.chanlun import ChanGraph, ChanLibrary
except Exception as ex:
@ -587,9 +586,9 @@ class CtaRenkoBar(object):
"""修正tick的价格取平均值"""
# 修正最新价
if tick.last_price is None or tick.last_price == 0:
if tick.ask_price1 == 0 and tick.bid_price1 == 0:
if tick.ask_price_1 == 0 and tick.bid_price_1 == 0:
return None
tick.last_price = round_to(target=self.price_tick, value=(tick.ask_price1 + tick.bid_price1) / 2)
tick.last_price = round_to(target=self.price_tick, value=(tick.ask_price_1 + tick.bid_price_1) / 2)
if self.activate_kf_tick:
avg_price = self.get_kf_tick_lastprice(tick.last_price, tick.datetime)
@ -598,10 +597,10 @@ class CtaRenkoBar(object):
if avg_price != tick.last_price:
tick.last_price = avg_price
if tick.ask_price1 != 0:
tick.ask_price1 = tick.last_price + self.price_tick
if tick.bid_price1 != 0:
tick.bid_price1 = tick.last_price - self.price_tick
if tick.ask_price_1 != 0:
tick.ask_price_1 = tick.last_price + self.price_tick
if tick.bid_price_1 != 0:
tick.bid_price_1 = tick.last_price - self.price_tick
return tick
def on_tick(self, tick):
@ -774,10 +773,10 @@ class CtaRenkoBar(object):
new_height = int(
max(cur_price / 1000, self.price_tick) * self.kilo_height / self.price_tick) * self.price_tick
if new_height != self.height:
#self.write_log(u'修改:{}砖块高度:{}=>{}'.format(self.name, self.height, new_height))
# self.write_log(u'修改:{}砖块高度:{}=>{}'.format(self.name, self.height, new_height))
self.height = new_height
elif height != self.height:
#self.write_log(u'修改:{}砖块高度:{}=>{}'.format(self.name, self.height, height))
# self.write_log(u'修改:{}砖块高度:{}=>{}'.format(self.name, self.height, height))
self.height = height
def runtime_recount(self):
@ -985,7 +984,7 @@ class CtaRenkoBar(object):
elif tick.last_price > self.cur_bar.up_band:
self.cur_bar.high_time = tick.datetime
self.cur_bar.volume = tick.volume
self.cur_bar.volume = tick.last_volume if tick.last_volume > 0 else tick.volume
self.cur_bar.open_interest = tick.open_interest
# ----------------------------------------------------------------------
@ -1041,7 +1040,7 @@ class CtaRenkoBar(object):
self.cur_bar.low_price = min(tick.last_price, self.cur_bar.low_price)
self.cur_bar.close_price = tick.last_price
self.cur_bar.volume = self.cur_bar.volume + tick.volume
self.cur_bar.volume += tick.last_volume if tick.last_volume > 0 else tick.volume
self.cur_bar.open_interest = tick.open_interest
# 仅为第一个bar后续逻辑无意义退出
@ -1081,7 +1080,7 @@ class CtaRenkoBar(object):
self.cur_bar.low_price = min(tick.last_price, self.cur_bar.low_price)
self.cur_bar.close_price = tick.last_price
self.cur_bar.volume = self.cur_bar.volume + tick.volume
self.cur_bar.volume += tick.last_volume if tick.last_volume > 0 else tick.volume
self.cur_bar.open_interest = tick.open_interest
# 实时计算临时砖块的颜色
@ -2140,7 +2139,7 @@ class CtaRenkoBar(object):
:param ma_num:第几条均线, 1对应para_ma1_len,,,,
:return:
"""
if self.para_ma1_len <=0 and self.para_ma2_len <=0 and self.para_ma3_len <= 0:
if self.para_ma1_len <= 0 and self.para_ma2_len <= 0 and self.para_ma3_len <= 0:
return
if self.cur_bar:
rt_close_array = np.append(self.close_array, [self.cur_bar.close_price])
@ -3018,7 +3017,7 @@ class CtaRenkoBar(object):
# 发生死叉
self.cur_kd_count = -1
self.cur_kd_cross = round((self.line_k[-1] + self.line_k[-2]) / 2, self.round_n)
self.cur_kd_cross_price =self.cur_price
self.cur_kd_cross_price = self.cur_price
def __count_macd(self):
"""
@ -3381,8 +3380,8 @@ class CtaRenkoBar(object):
initial_state_mean=self.close_array[-1],
initial_state_covariance=1,
transition_covariance=0.01,
observation_covariance = self.para_kf_obscov_len
)
observation_covariance=self.para_kf_obscov_len
)
except Exception:
self.write_log(u'导入卡尔曼过滤器失败,需先安装 pip install pykalman')
self.para_active_kf = False
@ -4069,9 +4068,9 @@ class CtaRenkoBar(object):
del self.chan_graph
self.chan_graph = None
self.chan_graph = ChanGraph(chan_lib=self.chan_lib,
index=self.index_list[-self.bar_len+1:],
high=self.high_array[-self.bar_len+1:],
low=self.low_array[-self.bar_len+1:])
index=self.index_list[-self.bar_len + 1:],
high=self.high_array[-self.bar_len + 1:],
low=self.low_array[-self.bar_len + 1:])
self._fenxing_list = self.chan_graph.fenxing_list
self._bi_list = self.chan_graph.bi_list
self._bi_zs_list = self.chan_graph.bi_zhongshu_list
@ -4150,15 +4149,15 @@ class CtaRenkoBar(object):
# 背驰: 同向分笔,逐笔提升,最后一笔,比上一同向笔,短,斜率也比上一同向笔小
if direction == 1:
if duan.bi_list[-1].low > duan.bi_list[-3].low > duan.bi_list[-5].low \
and duan.bi_list[-1].low > duan.bi_list[-5].high \
and duan.bi_list[-1].height < duan.bi_list[-3].height \
and duan.bi_list[-1].atan < duan.bi_list[-3].atan:
and duan.bi_list[-1].low > duan.bi_list[-5].high \
and duan.bi_list[-1].height < duan.bi_list[-3].height \
and duan.bi_list[-1].atan < duan.bi_list[-3].atan:
return True
if direction == -1:
if duan.bi_list[-1].high < duan.bi_list[-3].high < duan.bi_list[-5].high \
and duan.bi_list[-1].high < duan.bi_list[-5].low \
and duan.bi_list[-1].height < duan.bi_list[-3].height\
and duan.bi_list[-1].high < duan.bi_list[-5].low \
and duan.bi_list[-1].height < duan.bi_list[-3].height \
and duan.bi_list[-1].atan < duan.bi_list[-3].atan:
return True
@ -4343,20 +4342,20 @@ class CtaRenkoBar(object):
entry_bi = cur_zs.bi_list[0]
if entry_bi.direction != direction:
# 找出中枢之前,与段同向得笔
before_bi_list = [bi for bi in cur_duan.bi_list if bi.start < entry_bi.start and bi.direction==direction]
before_bi_list = [bi for bi in cur_duan.bi_list if bi.start < entry_bi.start and bi.direction == direction]
# 中枢之前得同向笔,不存在(一般不可能,因为中枢得第一笔不同向,该中枢存在与段中间)
if len(before_bi_list) == 0:
return False
entry_bi = before_bi_list[-1]
# 中枢第一笔,与最后一笔,比较力度和能量
if entry_bi.height > cur_zs.bi_list[-1].height\
and entry_bi.atan > cur_zs.bi_list[-1].atan:
if entry_bi.height > cur_zs.bi_list[-1].height \
and entry_bi.atan > cur_zs.bi_list[-1].atan:
return True
return False
def is_zs_fangda(self, cur_bi_zs = None, start=False, last_bi=False):
def is_zs_fangda(self, cur_bi_zs=None, start=False, last_bi=False):
"""
判断中枢是否为放大型中枢
中枢放大一般是反向力量的强烈试探导致
@ -4449,14 +4448,15 @@ class CtaRenkoBar(object):
cur_zs = zs_list_inside_duan[-1]
# 上一个中枢
pre_zs = zs_list_inside_duan[-2]
bi_list_between_zs = [bi for bi in cur_duan.bi_list if bi.direction == direction and bi.end > pre_zs.end and bi.start < cur_zs.start]
if len(bi_list_between_zs) ==0:
bi_list_between_zs = [bi for bi in cur_duan.bi_list if
bi.direction == direction and bi.end > pre_zs.end and bi.start < cur_zs.start]
if len(bi_list_between_zs) == 0:
return False
# 最后一笔作为2个中枢间的笔
bi_between_zs = bi_list_between_zs[-1]
bi_list_after_cur_zs = [bi for bi in cur_duan.bi_list if bi.direction==direction and bi.end > cur_zs.end]
bi_list_after_cur_zs = [bi for bi in cur_duan.bi_list if bi.direction == direction and bi.end > cur_zs.end]
if len(bi_list_after_cur_zs) == 0:
return False
@ -4471,15 +4471,16 @@ class CtaRenkoBar(object):
if bi_leave_cur_zs.start != self.bi_list[-1].start:
return False
fx = [fx for fx in self.fenxing_list[-2:] if fx.direction==direction][-1]
fx = [fx for fx in self.fenxing_list[-2:] if fx.direction == direction][-1]
if fx.is_rt:
return False
return False
# 中枢间的分笔,能量大于最后分笔,形成走势背驰
if bi_between_zs.height > bi_leave_cur_zs.height and bi_between_zs.atan > bi_leave_cur_zs.atan:
return True
return False
# ----------------------------------------------------------------------
def write_log(self, content):
"""记录CTA日志"""
@ -4576,7 +4577,8 @@ class CtaRenkoBar(object):
if not os.path.exists(file_name):
self.write_log(u'create csv file:{}'.format(file_name))
with open(file_name, 'a', encoding='utf8', newline='') as csvWriteFile:
writer = csv.DictWriter(f=csvWriteFile, fieldnames=dict_fieldnames, dialect='excel', extrasaction='ignore')
writer = csv.DictWriter(f=csvWriteFile, fieldnames=dict_fieldnames, dialect='excel',
extrasaction='ignore')
self.write_log(u'write csv header:{}'.format(dict_fieldnames))
writer.writeheader()
writer.writerow(dict_data)

View File

@ -17,6 +17,7 @@ from vnpy.api.rest import RestClient, Request
from vnpy.api.websocket import WebsocketClient
from vnpy.trader.constant import (
Direction,
Offset,
Exchange,
Product,
Status,
@ -353,6 +354,11 @@ class BinanceRestApi(RestClient):
orderid,
self.gateway_name
)
if order.direction == Direction.LONG and order.offset != Offset.OPEN:
order.offset = Offset.OPEN
elif order.direction == Direction.SHORT and order.offset !=Offset.CLOSE:
order.offset = Offset.CLOSE
order.accountid = self.accountid
order.vt_accountid = f"{self.gateway_name}.{self.accountid}"
order.datetime = datetime.now()
@ -699,6 +705,7 @@ class BinanceRestApi(RestClient):
high_price=float(l[2]),
low_price=float(l[3]),
close_price=float(l[4]),
trading_day=dt.strftime('%Y-%m-%d'),
gateway_name=self.gateway_name
)
buf.append(bar)
@ -805,6 +812,11 @@ class BinanceTradeWebsocketApi(WebsocketClient):
time=time,
gateway_name=self.gateway_name
)
if order.direction == Direction.LONG:
order.offset = Offset.OPEN
elif order.direction == Direction.SHORT:
order.offset = Offset.CLOSE
self.gateway.write_log(f'WS订单更新:\n{order.__dict__}')
self.gateway.on_order(order)
@ -829,6 +841,11 @@ class BinanceTradeWebsocketApi(WebsocketClient):
datetime=trade_dt,
gateway_name=self.gateway_name,
)
if trade.direction == Direction.LONG:
trade.offset = Offset.OPEN
elif trade.direction == Direction.SHORT:
trade.offset = Offset.CLOSE
self.gateway.write_log(f'WS成交更新:\n{trade.__dict__}')
self.gateway.on_trade(trade)
@ -895,12 +912,24 @@ class BinanceDataWebsocketApi(WebsocketClient):
tick = self.ticks[symbol]
if channel == "ticker":
tick.volume = float(data['v'])
tick_dt = datetime.fromtimestamp(float(data['E']) / 1000)
trading_day = tick_dt.strftime('%Y-%m-%d')
today_volume = float(data['v'])
if tick.trading_day == trading_day:
volume_changed = max(0, today_volume - tick.volume)
else:
volume_changed = today_volume if len(tick.trading_day) > 0 else 1
tick.volume = today_volume
tick.last_volume = volume_changed
tick.open_price = float(data['o'])
tick.high_price = float(data['h'])
tick.low_price = float(data['l'])
tick.last_price = float(data['c'])
tick.datetime = datetime.fromtimestamp(float(data['E']) / 1000)
tick.datetime = tick_dt
tick.trading_day = trading_day
tick.date = tick.trading_day
tick.time = tick.datetime.strftime('%H:%M:%S.%f')
else:
bids = data["bids"]
for n in range(5):

View File

@ -794,6 +794,7 @@ class BinancefRestApi(RestClient):
high_price=float(l[2]),
low_price=float(l[3]),
close_price=float(l[4]),
trading_day=dt.strftime('%Y-%m-%d'),
gateway_name=self.gateway_name
)
buf.append(bar)
@ -1051,12 +1052,24 @@ class BinancefDataWebsocketApi(WebsocketClient):
tick = self.ticks[symbol]
if channel == "ticker":
tick.volume = float(data['v'])
tick_dt = datetime.fromtimestamp(float(data['E']) / 1000)
trading_day = tick_dt.strftime('%Y-%m-%d')
today_volume = float(data['v'])
if tick.trading_day == trading_day:
volume_changed = max(0, today_volume - tick.volume)
else:
volume_changed = today_volume if len(tick.trading_day) > 0 else 1
tick.volume = today_volume
tick.last_volume = volume_changed
tick.open_price = float(data['o'])
tick.high_price = float(data['h'])
tick.low_price = float(data['l'])
tick.last_price = float(data['c'])
tick.datetime = datetime.fromtimestamp(float(data['E']) / 1000)
tick.datetime = tick_dt
tick.trading_day = trading_day
tick.date = tick.trading_day
tick.time = tick.datetime.strftime('%H:%M:%S.%f')
else:
bids = data["b"]
for n in range(5):

View File

@ -591,6 +591,8 @@ class CtpMdApi(MdApi):
self.userid = ""
self.password = ""
self.brokerid = ""
# 缓存tick的交易日和当前累计volume
self.last_ticks_info = {} # {symbol: {'trading_dat':'xxx', volume:xxx}}
def onFrontConnected(self):
"""
@ -651,20 +653,30 @@ class CtpMdApi(MdApi):
dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S.%f")
# 不处理开盘前的tick数据
if dt.hour in [8, 20] and dt.minute <= 59:
if dt.hour in [7, 8, 18, 19, 20] and dt.minute <= 59:
return
if exchange is Exchange.CFFEX and dt.hour == 9 and dt.minute <= 29:
return
today_volume = data["Volume"]
last_tick_info = self.last_ticks_info.get(symbol, {})
trading_day = get_trading_date(dt)
last_trading_day = last_tick_info.get('trading_day', None)
if last_trading_day == trading_day:
volume_changed = max(0, today_volume - last_tick_info.get('volume', 0))
else:
volume_changed = today_volume
self.last_ticks_info.update({symbol: {'trading_day': trading_day, 'volume': today_volume}})
tick = TickData(
symbol=symbol,
exchange=exchange,
datetime=dt,
date=s_date,
time=dt.strftime('%H:%M:%S.%f'),
trading_day=get_trading_date(dt),
trading_day=trading_day,
name=symbol_name_map[symbol],
volume=data["Volume"],
volume=today_volume,
last_volume=volume_changed,
open_interest=data["OpenInterest"],
last_price=data["LastPrice"],
limit_up=data["UpperLimitPrice"],
@ -1014,11 +1026,11 @@ class CtpTdApi(TdApi):
account.available = round(float(data["Available"]), 7)
account.commission = round(float(data['Commission']), 7)
account.margin = round(float(data['CurrMargin']), 7)
account.close_profit = round(float(data['CloseProfit']), 7) #+ round(
#float(data.get("SpecProductCloseProfit", 0)), 7)
account.holding_profit = round(float(data['PositionProfit']), 7) #+ round(
#float(data.get("SpecProductPositionProfit", 0)), 7) + round(
#float(data.get("SpecProductPositionProfitByAlg", 0)), 7)
account.close_profit = round(float(data['CloseProfit']), 7) # + round(
# float(data.get("SpecProductCloseProfit", 0)), 7)
account.holding_profit = round(float(data['PositionProfit']), 7) # + round(
# float(data.get("SpecProductPositionProfit", 0)), 7) + round(
# float(data.get("SpecProductPositionProfitByAlg", 0)), 7)
account.trading_day = str(data['TradingDay'])
if '-' not in account.trading_day and len(account.trading_day) == 8:
account.trading_day = '-'.join(
@ -1185,7 +1197,7 @@ class CtpTdApi(TdApi):
trade_date = trade_date[0:4] + '-' + trade_date[4:6] + '-' + trade_date[6:8]
trade_time = data['TradeTime']
trade_datetime = datetime.strptime(f'{trade_date} {trade_time}', '%Y-%m-%d %H:%M:%S')
#print(f'raw_data:{print_dict(data)}')
# print(f'raw_data:{print_dict(data)}')
# 修正 郑商所、大商所的TradeDate错误
if exchange in [Exchange.DCE, Exchange.CZCE]:
dt_now = datetime.now()
@ -1201,11 +1213,11 @@ class CtpTdApi(TdApi):
# 星期一 =》 星期五
if dt_now.isoweekday() == 1:
trade_datetime -= timedelta(days=3)
#print(f'trade time =>{trade_datetime}')
# print(f'trade time =>{trade_datetime}')
# 星期二~星期五 =》上一天
else:
trade_datetime -= timedelta(days=1)
#print(f'trade time =>{trade_datetime}')
# print(f'trade time =>{trade_datetime}')
tradeid = data["TradeID"]
trade = TradeData(
@ -1706,6 +1718,7 @@ class TdxMdApi():
tick.low_price = float(d.get('ZuiDi', 0.0))
tick.last_price = float(d.get('MaiChu', 0.0))
tick.volume = int(d.get('XianLiang', 0))
tick.last_volume = tick.volume
tick.open_interest = d.get('ChiCangLiang')
tick.time = tick.datetime.strftime('%H:%M:%S.%f')[0:12]
@ -1837,6 +1850,8 @@ class SubMdApi():
d.pop('exchange', None)
d.pop('symbol', None)
tick.__dict__.update(d)
if len(tick.trading_day) == 0:
tick.trading_day = get_trading_date(dt)
self.symbol_tick_dict[symbol] = tick
self.gateway.on_tick(tick)
@ -1953,6 +1968,8 @@ class TqMdApi():
self.ticks = {}
self.last_ticks_info = {}
def connect(self, setting):
""""""
try:
@ -1973,12 +1990,27 @@ class TqMdApi():
# 清洗 nan
quote = {k: 0 if v != v else v for k, v in quote.items()}
symbol, exchange = extract_vt_symbol(vt_symbol)
tick_dt = datetime.strptime(quote["datetime"], "%Y-%m-%d %H:%M:%S.%f")
today_volume = quote["volume"]
last_tick_info = self.last_ticks_info.get(symbol, {})
trading_day = get_trading_date(tick_dt)
last_trading_day = last_tick_info.get('trading_day', None)
if last_trading_day == trading_day:
volume_changed = max(0, today_volume - last_tick_info.get('volume', 0))
else:
volume_changed = today_volume
self.last_ticks_info.update({symbol: {'trading_day': trading_day, 'volume': today_volume}})
tick = TickData(
symbol=symbol,
exchange=exchange,
datetime=datetime.strptime(quote["datetime"], "%Y-%m-%d %H:%M:%S.%f"),
datetime=tick_dt,
trading_day=trading_day,
name=symbol,
volume=quote["volume"],
volume=today_volume,
last_volume=volume_changed,
open_interest=quote["open_interest"],
last_price=quote["last_price"],
limit_up=quote["upper_limit"],

View File

@ -1232,8 +1232,13 @@ class PbTdApi(object):
order_dt = datetime.strptime(f'{order_date} {order_time}', "%Y%m%d %H%M%S")
direction = DIRECTION_STOCK_NAME2VT.get(data["委托方向"])
offset = Offset.NONE
if direction is None:
direction = Direction.NET
elif direction == Direction.LONG:
offset = Offset.OPEN
elif direction == Direction.SHORT:
offset = Offset.CLOSE
sys_order = OrderData(
gateway_name=self.gateway_name,
symbol=data["证券代码"],
@ -1243,7 +1248,7 @@ class PbTdApi(object):
accountid=self.userid,
type=ORDERTYPE_NAME2VT.get(data["价格类型"], OrderType.LIMIT),
direction=direction,
offset=Offset.NONE,
offset=offset,
price=float(data["委托价格"]),
volume=float(data["委托数量"]),
traded=float(data["成交数量"]),
@ -1306,8 +1311,13 @@ class PbTdApi(object):
self.gateway.write_log(f'本地委托编号{local_orderid}不在本地订单中')
direction = DIRECTION_STOCK_NAME2VT.get(str(data.wtfx).strip())
offset = Offset.NONE
if direction is None:
direction = Direction.NET
elif direction == Direction.LONG:
offset = Offset.OPEN
elif direction == Direction.SHORT:
offset = Offset.CLOSE
if order_status == Status.ALLTRADED:
traded = data.wtsl
else:
@ -1323,7 +1333,7 @@ class PbTdApi(object):
accountid=self.userid,
type=ORDERTYPE_PB2VT.get(str(data.wtjglx).strip(), OrderType.LIMIT),
direction=direction,
offset=Offset.NONE,
offset=offset,
price=float(data.wtjg),
volume=float(data.wtsl),
traded=traded,
@ -1460,6 +1470,14 @@ class PbTdApi(object):
trade_date = data["成交日期"]
trade_time = data["成交时间"]
trade_dt = datetime.strptime(f'{trade_date} {trade_time}', "%Y%m%d %H%M%S")
direction = DIRECTION_STOCK_NAME2VT.get(data["委托方向"])
offset = Offset.NONE
if direction is None:
direction = Direction.NET
elif direction == Direction.LONG:
offset = Offset.OPEN
elif direction == Direction.SHORT:
offset = Offset.CLOSE
trade = TradeData(
gateway_name=self.gateway_name,
symbol=data["证券代码"],
@ -1468,8 +1486,8 @@ class PbTdApi(object):
tradeid=sys_tradeid,
sys_orderid=sys_orderid,
accountid=self.userid,
direction=DIRECTION_STOCK_NAME2VT.get(data["委托方向"]),
offset=Offset.NONE,
direction=direction,
offset=offset,
price=float(data["成交价格"]),
volume=float(data["成交数量"]),
datetime=trade_dt,
@ -1514,7 +1532,14 @@ class PbTdApi(object):
trade_date = str(data.cjrq).strip()
trade_time = str(data.cjsj).strip()
trade_dt = datetime.strptime(f'{trade_date} {trade_time}', "%Y%m%d %H%M%S")
direction = DIRECTION_ORDER_PB2VT.get(str(data.wtfx).strip())
offset = Offset.NONE
if direction is None:
direction = Direction.NET
elif direction == Direction.LONG:
offset = Offset.OPEN
elif direction == Direction.SHORT:
offset = Offset.CLOSE
trade = TradeData(
gateway_name=self.gateway_name,
symbol=str(data.zqdm).strip(),
@ -1523,8 +1548,8 @@ class PbTdApi(object):
tradeid=sys_tradeid,
sys_orderid=sys_orderid,
accountid=self.userid,
direction=DIRECTION_ORDER_PB2VT.get(str(data.wtfx).strip()),
offset=Offset.NONE,
direction=direction,
offset=offset,
price=float(data.cjjg),
volume=int(data.cjsl),
datetime=trade_dt,
@ -1582,7 +1607,14 @@ class PbTdApi(object):
trade_date = str(data["CJRQ"]).lstrip()
trade_time = str(data["CJSJ"]).lstrip()
trade_dt = datetime.strptime(f'{trade_date} {trade_time}', "%Y%m%d %H%M%S")
direction = DIRECTION_ORDER_PB2VT.get(str(data.wtfx).strip())
offset = Offset.NONE
if direction is None:
direction = Direction.NET
elif direction == Direction.LONG:
offset = Offset.OPEN
elif direction == Direction.SHORT:
offset = Offset.CLOSE
trade = TradeData(
gateway_name=self.gateway_name,
symbol=str(data["ZQDM"]).lstrip(),
@ -1591,8 +1623,8 @@ class PbTdApi(object):
tradeid=sys_tradeid,
sys_orderid=sys_orderid,
accountid=self.userid,
direction=DIRECTION_ORDER_PB2VT.get(str(data["WTFX"]).lstrip()),
offset=Offset.NONE,
direction=direction,
offset=offset,
price=float(str(data["CJJG"]).lstrip()),
volume=float(str(data["CJSL"]).lstrip()),
datetime=trade_dt,

View File

@ -21,9 +21,9 @@ class RpcGateway(BaseGateway):
exchanges = list(Exchange)
def __init__(self, event_engine):
def __init__(self, event_engine, gateway_name="RPC"):
"""Constructor"""
super().__init__(event_engine, "RPC")
super().__init__(event_engine, gateway_name)
self.symbol_gateway_map = {}

View File

@ -121,6 +121,7 @@ class StockRpcGateway(BaseGateway):
:param req:
:return:
"""
self.write_log(f'使用prc委托:{req.__dict__}')
ref = self.client.send_order(req, self.remote_gw_name)
local_ref = ref.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.')
@ -160,9 +161,9 @@ class StockRpcGateway(BaseGateway):
for position in positions:
position.gateway_name = self.gateway_name
# 更换 vt_positionid得gateway前缀
position.vt_positionid.replace(f'{position.gateway_name}.', f'{self.gateway_name}.')
position.vt_positionid = position.vt_positionid.replace(f'{position.gateway_name}.', f'{self.gateway_name}.')
# 更换 vt_accountid得gateway前缀
position.vt_accountid.replace(f'{position.gateway_name}.', f'{self.gateway_name}.')
position.vt_accountid = position.vt_accountid.replace(f'{position.gateway_name}.', f'{self.gateway_name}.')
self.on_position(position)
self.write_log("持仓信息查询成功")
@ -172,9 +173,9 @@ class StockRpcGateway(BaseGateway):
# 更换gateway
order.gateway_name = self.gateway_name
# 更换 vt_orderid得gateway前缀
order.vt_orderid.replace(f'{order.gateway_name}.', f'{self.gateway_name}.')
order.vt_orderid = order.vt_orderid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.')
# 更换 vt_accountid得gateway前缀
order.vt_accountid.replace(f'{order.gateway_name}.', f'{self.gateway_name}.')
order.vt_accountid = order.vt_accountid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.')
self.on_order(order)
self.write_log("委托信息查询成功")
@ -183,11 +184,11 @@ class StockRpcGateway(BaseGateway):
for trade in trades:
trade.gateway_name = self.gateway_name
# 更换 vt_orderid得gateway前缀
trade.vt_orderid.replace(f'{trade.gateway_name}.', f'{self.gateway_name}.')
trade.vt_orderid = trade.vt_orderid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.')
# 更换 vt_orderid得gateway前缀
trade.vt_orderid.replace(f'{trade.gateway_name}.', f'{self.gateway_name}.')
trade.vt_orderid = trade.vt_orderid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.')
# 更换 vt_accountid得gateway前缀
trade.vt_accountid.replace(f'{trade.gateway_name}.', f'{self.gateway_name}.')
trade.vt_accountid = trade.vt_accountid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.')
self.on_trade(trade)
self.write_log("成交信息查询成功")
@ -208,14 +209,24 @@ class StockRpcGateway(BaseGateway):
if hasattr(data, "gateway_name"):
data.gateway_name = self.gateway_name
if hasattr(data, 'vt_orderid'):
data.vt_orderid = data.vt_orderid.replace(f'{data.gateway_name}.', f'{self.gateway_name}.')
rpc_vt_orderid = data.vt_orderid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.')
self.write_log(f' vt_orderid :{data.vt_orderid} => {rpc_vt_orderid}')
data.vt_orderid = rpc_vt_orderid
if hasattr(data, 'vt_tradeid'):
data.vt_tradeid = data.vt_tradeid.replace(f'{data.gateway_name}.', f'{self.gateway_name}.')
rpc_vt_tradeid = data.vt_tradeid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.')
self.write_log(f' vt_tradeid :{data.vt_tradeid} => {rpc_vt_tradeid}')
data.vt_tradeid = rpc_vt_tradeid
if hasattr(data, 'vt_accountid'):
data.vt_accountid = data.vt_accountid.replace(f'{data.gateway_name}.', f'{self.gateway_name}.')
data.vt_accountid = data.vt_accountid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.')
if hasattr(data, 'vt_positionid'):
data.vt_positionid = data.vt_positionid.replace(f'{data.gateway_name}.', f'{self.gateway_name}.')
data.vt_positionid = data.vt_positionid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.')
if event.type in [EVENT_ORDER,EVENT_TRADE]:
self.write_log(f'{self.remote_gw_name} => {self.gateway_name} event:{data.__dict__}')
self.event_engine.put(event)