From a446ce08fcd3b4966c263f9d92b29e59b042b023 Mon Sep 17 00:00:00 2001 From: msincenselee Date: Thu, 28 Jan 2021 11:57:04 +0800 Subject: [PATCH] =?UTF-8?q?[bug=20fix]=20bar=20volume=E5=92=8Ctick=20volum?= =?UTF-8?q?e=20=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/component/cta_line_bar.py | 112 +++++++++++---------- vnpy/component/cta_renko_bar.py | 70 ++++++------- vnpy/gateway/binance/binance_gateway.py | 33 +++++- vnpy/gateway/binancef/binancef_gateway.py | 17 +++- vnpy/gateway/ctp/ctp_gateway.py | 58 ++++++++--- vnpy/gateway/pb/pb_gateway.py | 52 ++++++++-- vnpy/gateway/rpc/rpc_gateway.py | 4 +- vnpy/gateway/stockrpc/stock_rpc_gateway.py | 33 ++++-- 8 files changed, 252 insertions(+), 127 deletions(-) diff --git a/vnpy/component/cta_line_bar.py b/vnpy/component/cta_line_bar.py index a062df9f..227874fb 100644 --- a/vnpy/component/cta_line_bar.py +++ b/vnpy/component/cta_line_bar.py @@ -795,7 +795,7 @@ class CtaLineBar(object): """ # Tick 有效性检查 if not self.is_7x24 and (tick.datetime.hour == 8 or tick.datetime.hour == 20): - self.write_log(u'竞价排名tick时间:{0}'.format(tick.datetime)) + self.write_log(u'{}竞价排名tick时间:{}'.format(self.name, tick.datetime)) return self.cur_datetime = tick.datetime @@ -1256,14 +1256,14 @@ class CtaLineBar(object): # K线的日期时间 self.cur_bar.trading_day = tick.trading_day # K线所在的交易日期 - self.cur_bar.date = tick.date # K线的日期,(夜盘的话,与交易日期不同哦) + #self.cur_bar.date = tick.date # K线的日期,(夜盘的话,与交易日期不同哦) self.cur_bar.datetime = tick.datetime if (self.interval == Interval.SECOND and self.bar_interval % 60 == 0) \ - or self.interval in [Interval.MINUTE, Interval.HOUR, Interval.DAILY]: + or self.interval in [Interval.MINUTE, Interval.HOUR, Interval.DAILY, Interval.WEEKLY]: # K线的日期时间(去除秒)设为第一个Tick的时间 self.cur_bar.datetime = self.cur_bar.datetime.replace(second=0, microsecond=0) - self.cur_bar.time = self.cur_bar.datetime.strftime('%H:%M:%S') - self.cur_bar.volume = tick.last_volume + #self.cur_bar.time = self.cur_bar.datetime.strftime('%H:%M:%S') + self.cur_bar.volume = tick.last_volume if tick.last_volume > 0 else tick.volume if self.cur_trading_day != self.cur_bar.trading_day or not self.line_bar: # bar的交易日与记录的当前交易日不一致: self.cur_trading_day = self.cur_bar.trading_day @@ -1271,6 +1271,8 @@ class CtaLineBar(object): self.is_first_tick = True # 标识该Tick属于该Bar的第一个tick数据 # 6、将生成的正在合成的self.cur_bar 推入到line_bar队列 + if self.interval in [Interval.HOUR, Interval.DAILY, Interval.WEEKLY]: + self.write_log(f'[{self.name}] create new bar, [{self.cur_bar.datetime}==> ]') self.line_bar.append(self.cur_bar) # 推入到lineBar队列 def generate_bar(self, tick: TickData): @@ -1385,7 +1387,7 @@ class CtaLineBar(object): # volume_change = tick.volume - self.last_tick.volume # lastbar.volume += max(volume_change, 0) # 更新 bar内成交量volume 新版根据tick内成交量运算 - lastBar.volume += tick.last_volume + lastBar.volume += tick.last_volume if tick.last_volume > 0 else tick.volume # 更新Bar的颜色 if lastBar.close_price > lastBar.open_price: @@ -6483,7 +6485,7 @@ class CtaMinuteBar(CtaLineBar): del self.line_bar[0] # 与最后一个BAR的时间比对,判断是否超过K线周期 - lastBar = self.line_bar[-1] + last_bar = self.line_bar[-1] is_new_bar = False endtick = False @@ -6509,11 +6511,11 @@ class CtaMinuteBar(CtaLineBar): is_new_bar = False # 不在同一交易日,推入新bar - if self.cur_trading_day != tick.trading_day: + if len(tick.trading_day) > 0 and len(self.cur_trading_day) > 0 and self.cur_trading_day != tick.trading_day: # self.write_log('{} drawLineBar() new_bar,{} curTradingDay:{},tick.trading_day:{} bars_count={}' # .format(self.name, tick.datetime.strftime("%Y-%m-%d %H:%M:%S"), self.cur_trading_day, # tick.trading_day, self.bars_count)) - + self.write_log(f'trading_day:{self.cur_trading_day} => tick.trading_day:{tick.trading_day} ') is_new_bar = True self.cur_trading_day = tick.trading_day self.bars_count = bars_passed @@ -6533,8 +6535,8 @@ class CtaMinuteBar(CtaLineBar): if ( ( - tick.datetime.hour * 60 + tick.datetime.minute) % self.bar_interval == 0 and tick.datetime.minute != lastBar.datetime.minute) or ( - tick.datetime - lastBar.datetime).total_seconds() > self.bar_interval * 60: + tick.datetime.hour * 60 + tick.datetime.minute) % self.bar_interval == 0 and tick.datetime.minute != last_bar.datetime.minute) or ( + tick.datetime - last_bar.datetime).total_seconds() > self.bar_interval * 60: # self.write_log('{} drawLineBar() new_bar,{} lastbar:{}, bars_count={}' # .format(self.name, tick.datetime, lastBar.datetime, # self.bars_count)) @@ -6544,25 +6546,25 @@ class CtaMinuteBar(CtaLineBar): # 创建并推入新的Bar self.first_tick(tick) # 触发OnBar事件 - self.on_bar(lastBar) + self.on_bar(last_bar) else: # 更新当前最后一个bar self.barFirstTick = False # 更新最高价、最低价、收盘价、成交量 - lastBar.high_price = max(lastBar.high_price, tick.last_price) - lastBar.low_price = min(lastBar.low_price, tick.last_price) - lastBar.close_price = tick.last_price - lastBar.open_interest = tick.open_interest - lastBar.volume += tick.volume + last_bar.high_price = max(last_bar.high_price, tick.last_price) + last_bar.low_price = min(last_bar.low_price, tick.last_price) + last_bar.close_price = tick.last_price + last_bar.open_interest = tick.open_interest + last_bar.volume += tick.last_volume if tick.last_volume > 0 else tick.volume # 更新Bar的颜色 - if lastBar.close_price > lastBar.open_price: - lastBar.color = Color.RED - elif lastBar.close_price < lastBar.open_price: - lastBar.color = Color.BLUE + if last_bar.close_price > last_bar.open_price: + last_bar.color = Color.RED + elif last_bar.close_price < last_bar.open_price: + last_bar.color = Color.BLUE else: - lastBar.color = Color.EQUAL + last_bar.color = Color.EQUAL # 实时计算 self.rt_executed = False @@ -6626,7 +6628,7 @@ class CtaHourBar(CtaLineBar): if bar.trading_day is None: if self.is_7x24: - bar.trading_day = bar.date + bar.trading_day = bar.datetime.strftime('%Y-%m-%d') else: bar.trading_day = get_trading_date(bar.datetime) @@ -6722,7 +6724,7 @@ class CtaHourBar(CtaLineBar): endtick = True # 与最后一个BAR的时间比对,判断是否超过K线周期 - lastBar = self.line_bar[-1] + last_bar = self.line_bar[-1] is_new_bar = False if self.last_minute is None: @@ -6731,7 +6733,8 @@ class CtaHourBar(CtaLineBar): self.last_minute = tick.datetime.minute # 不在同一交易日,推入新bar - if self.cur_trading_day != tick.trading_day: + if len(tick.trading_day) > 0 and self.cur_trading_day != tick.trading_day: + self.write_log(f'trading_day:{self.cur_trading_day} => tick.trading_day: {tick.trading_day} ') is_new_bar = True # 去除分钟和秒数 tick.datetime = datetime.strptime(tick.datetime.strftime('%Y-%m-%d %H:00:00'), '%Y-%m-%d %H:%M:%S') @@ -6750,7 +6753,7 @@ class CtaHourBar(CtaLineBar): if self.is_7x24: # 数字货币,用前后时间间隔 - if (tick.datetime - lastBar.datetime).total_seconds() >= 3600 * self.bar_interval: + if (tick.datetime - last_bar.datetime).total_seconds() >= 3600 * self.bar_interval: # self.write_log('{} drawLineBar() new_bar,{} - {} > 3600 * {} ' # .format(self.name, tick.datetime.strftime("%Y-%m-%d %H:%M:%S"), # lastBar.datetime.strftime("%Y-%m-%d %H:%M:%S"), @@ -6780,27 +6783,28 @@ class CtaHourBar(CtaLineBar): self.first_tick(tick) self.m1_bars_count = 1 # 触发OnBar事件 - self.on_bar(lastBar) + self.write_log(f'[{self.name}] process on_bar event [{last_bar.datetime} => {tick.datetime}]') + self.on_bar(last_bar) else: # 更新当前最后一个bar self.barFirstTick = False # 更新最高价、最低价、收盘价、成交量 - lastBar.high_price = max(lastBar.high_price, tick.last_price) - lastBar.low_price = min(lastBar.low_price, tick.last_price) - lastBar.close_price = tick.last_price - lastBar.open_interest = tick.open_interest + last_bar.high_price = max(last_bar.high_price, tick.last_price) + last_bar.low_price = min(last_bar.low_price, tick.last_price) + last_bar.close_price = tick.last_price + last_bar.open_interest = tick.open_interest - lastBar.volume += tick.volume + last_bar.volume += tick.last_volume if tick.last_volume > 0 else tick.volume # 更新Bar的颜色 - if lastBar.close_price > lastBar.open_price: - lastBar.color = Color.RED - elif lastBar.close_price < lastBar.open_price: - lastBar.color = Color.BLUE + if last_bar.close_price > last_bar.open_price: + last_bar.color = Color.RED + elif last_bar.close_price < last_bar.open_price: + last_bar.color = Color.BLUE else: - lastBar.color = Color.EQUAL + last_bar.color = Color.EQUAL # 实时计算 self.rt_executed = False @@ -6872,14 +6876,14 @@ class CtaDayBar(CtaLineBar): if bar_len == 0: new_bar = copy.deepcopy(bar) self.line_bar.append(new_bar) - self.cur_trading_day = bar.trading_day if bar.trading_day is not None else bar.date + self.cur_trading_day = bar.trading_day if bar.trading_day is not None else get_trading_date(bar.datetime) if bar_is_completed: self.on_bar(bar) return # 与最后一个BAR的时间比对,判断是否超过K线的周期 lastBar = self.line_bar[-1] - self.cur_trading_day = bar.trading_day if bar.trading_day is not None else bar.date + self.cur_trading_day = bar.trading_day if bar.trading_day is not None else get_trading_date(bar.datetime) is_new_bar = False if bar_is_completed: @@ -6932,12 +6936,13 @@ class CtaDayBar(CtaLineBar): del self.line_bar[0] # 与最后一个BAR的时间比对,判断是否超过K线周期 - lastBar = self.line_bar[-1] + last_bar = self.line_bar[-1] is_new_bar = False # 交易日期不一致,新的交易日 - if len(tick.trading_day) > 0 and tick.trading_day != lastBar.trading_day: + if len(tick.trading_day) > 0 and tick.trading_day != last_bar.trading_day: + self.write_log(f'trading_day:{last_bar.trading_day} => tick.trading_day:{tick.trading_day}') is_new_bar = True # 数字货币方面,如果当前tick 日期与bar的日期不一致.(取消,按照上面的统一处理,因为币安是按照UTC时间算的每天开始,ok是按照北京时间开始) @@ -6948,26 +6953,27 @@ class CtaDayBar(CtaLineBar): # 创建并推入新的Bar self.first_tick(tick) # 触发OnBar事件 - self.on_bar(lastBar) + self.write_log(f'[{self.name}] process on_bar event [{last_bar.datetime} => {tick.datetime}]') + self.on_bar(last_bar) else: # 更新当前最后一个bar self.barFirstTick = False # 更新最高价、最低价、收盘价、成交量 - lastBar.high_price = max(lastBar.high_price, tick.last_price) - lastBar.low_price = min(lastBar.low_price, tick.last_price) - lastBar.close_price = tick.last_price - lastBar.open_interest = tick.open_interest - lastBar.volume += tick.volume + last_bar.high_price = max(last_bar.high_price, tick.last_price) + last_bar.low_price = min(last_bar.low_price, tick.last_price) + last_bar.close_price = tick.last_price + last_bar.open_interest = tick.open_interest + last_bar.volume += tick.last_volume if tick.last_volume > 0 else tick.volume # 更新Bar的颜色 - if lastBar.close_price > lastBar.open_price: - lastBar.color = Color.RED - elif lastBar.close_price < lastBar.open_price: - lastBar.color = Color.BLUE + if last_bar.close_price > last_bar.open_price: + last_bar.color = Color.RED + elif last_bar.close_price < last_bar.open_price: + last_bar.color = Color.BLUE else: - lastBar.color = Color.EQUAL + last_bar.color = Color.EQUAL # 实时计算 self.rt_executed = False @@ -7190,7 +7196,7 @@ class CtaWeekBar(CtaLineBar): lastBar.close_price = tick.last_price lastBar.open_interest = tick.open_interest # 更新日内总交易量,和bar内交易量 - lastBar.volume += tick.volume + lastBar.volume += tick.last_volume if tick.last_volume > 0 else tick.volume # 更新Bar的颜色 if lastBar.close_price > lastBar.open_price: diff --git a/vnpy/component/cta_renko_bar.py b/vnpy/component/cta_renko_bar.py index 476ab5ea..01637228 100644 --- a/vnpy/component/cta_renko_bar.py +++ b/vnpy/component/cta_renko_bar.py @@ -21,7 +21,6 @@ from vnpy.trader.utility import round_to from vnpy.trader.constant import Direction, Color from vnpy.component.cta_period import CtaPeriod, Period - try: from vnpy.component.chanlun import ChanGraph, ChanLibrary except Exception as ex: @@ -587,9 +586,9 @@ class CtaRenkoBar(object): """修正tick的价格,取平均值""" # 修正最新价 if tick.last_price is None or tick.last_price == 0: - if tick.ask_price1 == 0 and tick.bid_price1 == 0: + if tick.ask_price_1 == 0 and tick.bid_price_1 == 0: return None - tick.last_price = round_to(target=self.price_tick, value=(tick.ask_price1 + tick.bid_price1) / 2) + tick.last_price = round_to(target=self.price_tick, value=(tick.ask_price_1 + tick.bid_price_1) / 2) if self.activate_kf_tick: avg_price = self.get_kf_tick_lastprice(tick.last_price, tick.datetime) @@ -598,10 +597,10 @@ class CtaRenkoBar(object): if avg_price != tick.last_price: tick.last_price = avg_price - if tick.ask_price1 != 0: - tick.ask_price1 = tick.last_price + self.price_tick - if tick.bid_price1 != 0: - tick.bid_price1 = tick.last_price - self.price_tick + if tick.ask_price_1 != 0: + tick.ask_price_1 = tick.last_price + self.price_tick + if tick.bid_price_1 != 0: + tick.bid_price_1 = tick.last_price - self.price_tick return tick def on_tick(self, tick): @@ -774,10 +773,10 @@ class CtaRenkoBar(object): new_height = int( max(cur_price / 1000, self.price_tick) * self.kilo_height / self.price_tick) * self.price_tick if new_height != self.height: - #self.write_log(u'修改:{}砖块高度:{}=>{}'.format(self.name, self.height, new_height)) + # self.write_log(u'修改:{}砖块高度:{}=>{}'.format(self.name, self.height, new_height)) self.height = new_height elif height != self.height: - #self.write_log(u'修改:{}砖块高度:{}=>{}'.format(self.name, self.height, height)) + # self.write_log(u'修改:{}砖块高度:{}=>{}'.format(self.name, self.height, height)) self.height = height def runtime_recount(self): @@ -985,7 +984,7 @@ class CtaRenkoBar(object): elif tick.last_price > self.cur_bar.up_band: self.cur_bar.high_time = tick.datetime - self.cur_bar.volume = tick.volume + self.cur_bar.volume = tick.last_volume if tick.last_volume > 0 else tick.volume self.cur_bar.open_interest = tick.open_interest # ---------------------------------------------------------------------- @@ -1041,7 +1040,7 @@ class CtaRenkoBar(object): self.cur_bar.low_price = min(tick.last_price, self.cur_bar.low_price) self.cur_bar.close_price = tick.last_price - self.cur_bar.volume = self.cur_bar.volume + tick.volume + self.cur_bar.volume += tick.last_volume if tick.last_volume > 0 else tick.volume self.cur_bar.open_interest = tick.open_interest # 仅为第一个bar,后续逻辑无意义,退出 @@ -1081,7 +1080,7 @@ class CtaRenkoBar(object): self.cur_bar.low_price = min(tick.last_price, self.cur_bar.low_price) self.cur_bar.close_price = tick.last_price - self.cur_bar.volume = self.cur_bar.volume + tick.volume + self.cur_bar.volume += tick.last_volume if tick.last_volume > 0 else tick.volume self.cur_bar.open_interest = tick.open_interest # 实时计算临时砖块的颜色 @@ -2140,7 +2139,7 @@ class CtaRenkoBar(object): :param ma_num:第几条均线, 1,对应para_ma1_len,,,, :return: """ - if self.para_ma1_len <=0 and self.para_ma2_len <=0 and self.para_ma3_len <= 0: + if self.para_ma1_len <= 0 and self.para_ma2_len <= 0 and self.para_ma3_len <= 0: return if self.cur_bar: rt_close_array = np.append(self.close_array, [self.cur_bar.close_price]) @@ -3018,7 +3017,7 @@ class CtaRenkoBar(object): # 发生死叉 self.cur_kd_count = -1 self.cur_kd_cross = round((self.line_k[-1] + self.line_k[-2]) / 2, self.round_n) - self.cur_kd_cross_price =self.cur_price + self.cur_kd_cross_price = self.cur_price def __count_macd(self): """ @@ -3381,8 +3380,8 @@ class CtaRenkoBar(object): initial_state_mean=self.close_array[-1], initial_state_covariance=1, transition_covariance=0.01, - observation_covariance = self.para_kf_obscov_len - ) + observation_covariance=self.para_kf_obscov_len + ) except Exception: self.write_log(u'导入卡尔曼过滤器失败,需先安装 pip install pykalman') self.para_active_kf = False @@ -4069,9 +4068,9 @@ class CtaRenkoBar(object): del self.chan_graph self.chan_graph = None self.chan_graph = ChanGraph(chan_lib=self.chan_lib, - index=self.index_list[-self.bar_len+1:], - high=self.high_array[-self.bar_len+1:], - low=self.low_array[-self.bar_len+1:]) + index=self.index_list[-self.bar_len + 1:], + high=self.high_array[-self.bar_len + 1:], + low=self.low_array[-self.bar_len + 1:]) self._fenxing_list = self.chan_graph.fenxing_list self._bi_list = self.chan_graph.bi_list self._bi_zs_list = self.chan_graph.bi_zhongshu_list @@ -4150,15 +4149,15 @@ class CtaRenkoBar(object): # 背驰: 同向分笔,逐笔提升,最后一笔,比上一同向笔,短,斜率也比上一同向笔小 if direction == 1: if duan.bi_list[-1].low > duan.bi_list[-3].low > duan.bi_list[-5].low \ - and duan.bi_list[-1].low > duan.bi_list[-5].high \ - and duan.bi_list[-1].height < duan.bi_list[-3].height \ - and duan.bi_list[-1].atan < duan.bi_list[-3].atan: + and duan.bi_list[-1].low > duan.bi_list[-5].high \ + and duan.bi_list[-1].height < duan.bi_list[-3].height \ + and duan.bi_list[-1].atan < duan.bi_list[-3].atan: return True if direction == -1: if duan.bi_list[-1].high < duan.bi_list[-3].high < duan.bi_list[-5].high \ - and duan.bi_list[-1].high < duan.bi_list[-5].low \ - and duan.bi_list[-1].height < duan.bi_list[-3].height\ + and duan.bi_list[-1].high < duan.bi_list[-5].low \ + and duan.bi_list[-1].height < duan.bi_list[-3].height \ and duan.bi_list[-1].atan < duan.bi_list[-3].atan: return True @@ -4343,20 +4342,20 @@ class CtaRenkoBar(object): entry_bi = cur_zs.bi_list[0] if entry_bi.direction != direction: # 找出中枢之前,与段同向得笔 - before_bi_list = [bi for bi in cur_duan.bi_list if bi.start < entry_bi.start and bi.direction==direction] + before_bi_list = [bi for bi in cur_duan.bi_list if bi.start < entry_bi.start and bi.direction == direction] # 中枢之前得同向笔,不存在(一般不可能,因为中枢得第一笔不同向,该中枢存在与段中间) if len(before_bi_list) == 0: return False entry_bi = before_bi_list[-1] # 中枢第一笔,与最后一笔,比较力度和能量 - if entry_bi.height > cur_zs.bi_list[-1].height\ - and entry_bi.atan > cur_zs.bi_list[-1].atan: + if entry_bi.height > cur_zs.bi_list[-1].height \ + and entry_bi.atan > cur_zs.bi_list[-1].atan: return True return False - def is_zs_fangda(self, cur_bi_zs = None, start=False, last_bi=False): + def is_zs_fangda(self, cur_bi_zs=None, start=False, last_bi=False): """ 判断中枢,是否为放大型中枢。 中枢放大,一般是反向力量的强烈试探导致; @@ -4449,14 +4448,15 @@ class CtaRenkoBar(object): cur_zs = zs_list_inside_duan[-1] # 上一个中枢 pre_zs = zs_list_inside_duan[-2] - bi_list_between_zs = [bi for bi in cur_duan.bi_list if bi.direction == direction and bi.end > pre_zs.end and bi.start < cur_zs.start] - if len(bi_list_between_zs) ==0: + bi_list_between_zs = [bi for bi in cur_duan.bi_list if + bi.direction == direction and bi.end > pre_zs.end and bi.start < cur_zs.start] + if len(bi_list_between_zs) == 0: return False # 最后一笔,作为2个中枢间的笔 bi_between_zs = bi_list_between_zs[-1] - bi_list_after_cur_zs = [bi for bi in cur_duan.bi_list if bi.direction==direction and bi.end > cur_zs.end] + bi_list_after_cur_zs = [bi for bi in cur_duan.bi_list if bi.direction == direction and bi.end > cur_zs.end] if len(bi_list_after_cur_zs) == 0: return False @@ -4471,15 +4471,16 @@ class CtaRenkoBar(object): if bi_leave_cur_zs.start != self.bi_list[-1].start: return False - fx = [fx for fx in self.fenxing_list[-2:] if fx.direction==direction][-1] + fx = [fx for fx in self.fenxing_list[-2:] if fx.direction == direction][-1] if fx.is_rt: - return False + return False # 中枢间的分笔,能量大于最后分笔,形成走势背驰 if bi_between_zs.height > bi_leave_cur_zs.height and bi_between_zs.atan > bi_leave_cur_zs.atan: return True return False + # ---------------------------------------------------------------------- def write_log(self, content): """记录CTA日志""" @@ -4576,7 +4577,8 @@ class CtaRenkoBar(object): if not os.path.exists(file_name): self.write_log(u'create csv file:{}'.format(file_name)) with open(file_name, 'a', encoding='utf8', newline='') as csvWriteFile: - writer = csv.DictWriter(f=csvWriteFile, fieldnames=dict_fieldnames, dialect='excel', extrasaction='ignore') + writer = csv.DictWriter(f=csvWriteFile, fieldnames=dict_fieldnames, dialect='excel', + extrasaction='ignore') self.write_log(u'write csv header:{}'.format(dict_fieldnames)) writer.writeheader() writer.writerow(dict_data) diff --git a/vnpy/gateway/binance/binance_gateway.py b/vnpy/gateway/binance/binance_gateway.py index e15c8cfd..c392a376 100644 --- a/vnpy/gateway/binance/binance_gateway.py +++ b/vnpy/gateway/binance/binance_gateway.py @@ -17,6 +17,7 @@ from vnpy.api.rest import RestClient, Request from vnpy.api.websocket import WebsocketClient from vnpy.trader.constant import ( Direction, + Offset, Exchange, Product, Status, @@ -353,6 +354,11 @@ class BinanceRestApi(RestClient): orderid, self.gateway_name ) + if order.direction == Direction.LONG and order.offset != Offset.OPEN: + order.offset = Offset.OPEN + elif order.direction == Direction.SHORT and order.offset !=Offset.CLOSE: + order.offset = Offset.CLOSE + order.accountid = self.accountid order.vt_accountid = f"{self.gateway_name}.{self.accountid}" order.datetime = datetime.now() @@ -699,6 +705,7 @@ class BinanceRestApi(RestClient): high_price=float(l[2]), low_price=float(l[3]), close_price=float(l[4]), + trading_day=dt.strftime('%Y-%m-%d'), gateway_name=self.gateway_name ) buf.append(bar) @@ -805,6 +812,11 @@ class BinanceTradeWebsocketApi(WebsocketClient): time=time, gateway_name=self.gateway_name ) + if order.direction == Direction.LONG: + order.offset = Offset.OPEN + elif order.direction == Direction.SHORT: + order.offset = Offset.CLOSE + self.gateway.write_log(f'WS订单更新:\n{order.__dict__}') self.gateway.on_order(order) @@ -829,6 +841,11 @@ class BinanceTradeWebsocketApi(WebsocketClient): datetime=trade_dt, gateway_name=self.gateway_name, ) + if trade.direction == Direction.LONG: + trade.offset = Offset.OPEN + elif trade.direction == Direction.SHORT: + trade.offset = Offset.CLOSE + self.gateway.write_log(f'WS成交更新:\n{trade.__dict__}') self.gateway.on_trade(trade) @@ -895,12 +912,24 @@ class BinanceDataWebsocketApi(WebsocketClient): tick = self.ticks[symbol] if channel == "ticker": - tick.volume = float(data['v']) + tick_dt = datetime.fromtimestamp(float(data['E']) / 1000) + trading_day = tick_dt.strftime('%Y-%m-%d') + today_volume = float(data['v']) + if tick.trading_day == trading_day: + volume_changed = max(0, today_volume - tick.volume) + else: + volume_changed = today_volume if len(tick.trading_day) > 0 else 1 + + tick.volume = today_volume + tick.last_volume = volume_changed tick.open_price = float(data['o']) tick.high_price = float(data['h']) tick.low_price = float(data['l']) tick.last_price = float(data['c']) - tick.datetime = datetime.fromtimestamp(float(data['E']) / 1000) + tick.datetime = tick_dt + tick.trading_day = trading_day + tick.date = tick.trading_day + tick.time = tick.datetime.strftime('%H:%M:%S.%f') else: bids = data["bids"] for n in range(5): diff --git a/vnpy/gateway/binancef/binancef_gateway.py b/vnpy/gateway/binancef/binancef_gateway.py index e766d45e..1dfef594 100644 --- a/vnpy/gateway/binancef/binancef_gateway.py +++ b/vnpy/gateway/binancef/binancef_gateway.py @@ -794,6 +794,7 @@ class BinancefRestApi(RestClient): high_price=float(l[2]), low_price=float(l[3]), close_price=float(l[4]), + trading_day=dt.strftime('%Y-%m-%d'), gateway_name=self.gateway_name ) buf.append(bar) @@ -1051,12 +1052,24 @@ class BinancefDataWebsocketApi(WebsocketClient): tick = self.ticks[symbol] if channel == "ticker": - tick.volume = float(data['v']) + tick_dt = datetime.fromtimestamp(float(data['E']) / 1000) + trading_day = tick_dt.strftime('%Y-%m-%d') + today_volume = float(data['v']) + if tick.trading_day == trading_day: + volume_changed = max(0, today_volume - tick.volume) + else: + volume_changed = today_volume if len(tick.trading_day) > 0 else 1 + + tick.volume = today_volume + tick.last_volume = volume_changed tick.open_price = float(data['o']) tick.high_price = float(data['h']) tick.low_price = float(data['l']) tick.last_price = float(data['c']) - tick.datetime = datetime.fromtimestamp(float(data['E']) / 1000) + tick.datetime = tick_dt + tick.trading_day = trading_day + tick.date = tick.trading_day + tick.time = tick.datetime.strftime('%H:%M:%S.%f') else: bids = data["b"] for n in range(5): diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index fcd2d1d7..4e54024f 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -591,6 +591,8 @@ class CtpMdApi(MdApi): self.userid = "" self.password = "" self.brokerid = "" + # 缓存tick的交易日和当前累计volume + self.last_ticks_info = {} # {symbol: {'trading_dat':'xxx', volume:xxx}} def onFrontConnected(self): """ @@ -651,20 +653,30 @@ class CtpMdApi(MdApi): dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S.%f") # 不处理开盘前的tick数据 - if dt.hour in [8, 20] and dt.minute <= 59: + if dt.hour in [7, 8, 18, 19, 20] and dt.minute <= 59: return if exchange is Exchange.CFFEX and dt.hour == 9 and dt.minute <= 29: return + today_volume = data["Volume"] + last_tick_info = self.last_ticks_info.get(symbol, {}) + trading_day = get_trading_date(dt) + last_trading_day = last_tick_info.get('trading_day', None) + if last_trading_day == trading_day: + volume_changed = max(0, today_volume - last_tick_info.get('volume', 0)) + else: + volume_changed = today_volume + self.last_ticks_info.update({symbol: {'trading_day': trading_day, 'volume': today_volume}}) tick = TickData( symbol=symbol, exchange=exchange, datetime=dt, date=s_date, time=dt.strftime('%H:%M:%S.%f'), - trading_day=get_trading_date(dt), + trading_day=trading_day, name=symbol_name_map[symbol], - volume=data["Volume"], + volume=today_volume, + last_volume=volume_changed, open_interest=data["OpenInterest"], last_price=data["LastPrice"], limit_up=data["UpperLimitPrice"], @@ -1014,11 +1026,11 @@ class CtpTdApi(TdApi): account.available = round(float(data["Available"]), 7) account.commission = round(float(data['Commission']), 7) account.margin = round(float(data['CurrMargin']), 7) - account.close_profit = round(float(data['CloseProfit']), 7) #+ round( - #float(data.get("SpecProductCloseProfit", 0)), 7) - account.holding_profit = round(float(data['PositionProfit']), 7) #+ round( - #float(data.get("SpecProductPositionProfit", 0)), 7) + round( - #float(data.get("SpecProductPositionProfitByAlg", 0)), 7) + account.close_profit = round(float(data['CloseProfit']), 7) # + round( + # float(data.get("SpecProductCloseProfit", 0)), 7) + account.holding_profit = round(float(data['PositionProfit']), 7) # + round( + # float(data.get("SpecProductPositionProfit", 0)), 7) + round( + # float(data.get("SpecProductPositionProfitByAlg", 0)), 7) account.trading_day = str(data['TradingDay']) if '-' not in account.trading_day and len(account.trading_day) == 8: account.trading_day = '-'.join( @@ -1185,7 +1197,7 @@ class CtpTdApi(TdApi): trade_date = trade_date[0:4] + '-' + trade_date[4:6] + '-' + trade_date[6:8] trade_time = data['TradeTime'] trade_datetime = datetime.strptime(f'{trade_date} {trade_time}', '%Y-%m-%d %H:%M:%S') - #print(f'raw_data:{print_dict(data)}') + # print(f'raw_data:{print_dict(data)}') # 修正 郑商所、大商所的TradeDate错误 if exchange in [Exchange.DCE, Exchange.CZCE]: dt_now = datetime.now() @@ -1201,11 +1213,11 @@ class CtpTdApi(TdApi): # 星期一 =》 星期五 if dt_now.isoweekday() == 1: trade_datetime -= timedelta(days=3) - #print(f'trade time =>{trade_datetime}') + # print(f'trade time =>{trade_datetime}') # 星期二~星期五 =》上一天 else: trade_datetime -= timedelta(days=1) - #print(f'trade time =>{trade_datetime}') + # print(f'trade time =>{trade_datetime}') tradeid = data["TradeID"] trade = TradeData( @@ -1706,6 +1718,7 @@ class TdxMdApi(): tick.low_price = float(d.get('ZuiDi', 0.0)) tick.last_price = float(d.get('MaiChu', 0.0)) tick.volume = int(d.get('XianLiang', 0)) + tick.last_volume = tick.volume tick.open_interest = d.get('ChiCangLiang') tick.time = tick.datetime.strftime('%H:%M:%S.%f')[0:12] @@ -1837,6 +1850,8 @@ class SubMdApi(): d.pop('exchange', None) d.pop('symbol', None) tick.__dict__.update(d) + if len(tick.trading_day) == 0: + tick.trading_day = get_trading_date(dt) self.symbol_tick_dict[symbol] = tick self.gateway.on_tick(tick) @@ -1953,6 +1968,8 @@ class TqMdApi(): self.ticks = {} + self.last_ticks_info = {} + def connect(self, setting): """""" try: @@ -1973,12 +1990,27 @@ class TqMdApi(): # 清洗 nan quote = {k: 0 if v != v else v for k, v in quote.items()} symbol, exchange = extract_vt_symbol(vt_symbol) + tick_dt = datetime.strptime(quote["datetime"], "%Y-%m-%d %H:%M:%S.%f") + + today_volume = quote["volume"] + last_tick_info = self.last_ticks_info.get(symbol, {}) + trading_day = get_trading_date(tick_dt) + last_trading_day = last_tick_info.get('trading_day', None) + if last_trading_day == trading_day: + volume_changed = max(0, today_volume - last_tick_info.get('volume', 0)) + else: + volume_changed = today_volume + + self.last_ticks_info.update({symbol: {'trading_day': trading_day, 'volume': today_volume}}) + tick = TickData( symbol=symbol, exchange=exchange, - datetime=datetime.strptime(quote["datetime"], "%Y-%m-%d %H:%M:%S.%f"), + datetime=tick_dt, + trading_day=trading_day, name=symbol, - volume=quote["volume"], + volume=today_volume, + last_volume=volume_changed, open_interest=quote["open_interest"], last_price=quote["last_price"], limit_up=quote["upper_limit"], diff --git a/vnpy/gateway/pb/pb_gateway.py b/vnpy/gateway/pb/pb_gateway.py index 4dc21772..3aed71b3 100644 --- a/vnpy/gateway/pb/pb_gateway.py +++ b/vnpy/gateway/pb/pb_gateway.py @@ -1232,8 +1232,13 @@ class PbTdApi(object): order_dt = datetime.strptime(f'{order_date} {order_time}', "%Y%m%d %H%M%S") direction = DIRECTION_STOCK_NAME2VT.get(data["委托方向"]) + offset = Offset.NONE if direction is None: direction = Direction.NET + elif direction == Direction.LONG: + offset = Offset.OPEN + elif direction == Direction.SHORT: + offset = Offset.CLOSE sys_order = OrderData( gateway_name=self.gateway_name, symbol=data["证券代码"], @@ -1243,7 +1248,7 @@ class PbTdApi(object): accountid=self.userid, type=ORDERTYPE_NAME2VT.get(data["价格类型"], OrderType.LIMIT), direction=direction, - offset=Offset.NONE, + offset=offset, price=float(data["委托价格"]), volume=float(data["委托数量"]), traded=float(data["成交数量"]), @@ -1306,8 +1311,13 @@ class PbTdApi(object): self.gateway.write_log(f'本地委托编号{local_orderid}不在本地订单中') direction = DIRECTION_STOCK_NAME2VT.get(str(data.wtfx).strip()) + offset = Offset.NONE if direction is None: direction = Direction.NET + elif direction == Direction.LONG: + offset = Offset.OPEN + elif direction == Direction.SHORT: + offset = Offset.CLOSE if order_status == Status.ALLTRADED: traded = data.wtsl else: @@ -1323,7 +1333,7 @@ class PbTdApi(object): accountid=self.userid, type=ORDERTYPE_PB2VT.get(str(data.wtjglx).strip(), OrderType.LIMIT), direction=direction, - offset=Offset.NONE, + offset=offset, price=float(data.wtjg), volume=float(data.wtsl), traded=traded, @@ -1460,6 +1470,14 @@ class PbTdApi(object): trade_date = data["成交日期"] trade_time = data["成交时间"] trade_dt = datetime.strptime(f'{trade_date} {trade_time}', "%Y%m%d %H%M%S") + direction = DIRECTION_STOCK_NAME2VT.get(data["委托方向"]) + offset = Offset.NONE + if direction is None: + direction = Direction.NET + elif direction == Direction.LONG: + offset = Offset.OPEN + elif direction == Direction.SHORT: + offset = Offset.CLOSE trade = TradeData( gateway_name=self.gateway_name, symbol=data["证券代码"], @@ -1468,8 +1486,8 @@ class PbTdApi(object): tradeid=sys_tradeid, sys_orderid=sys_orderid, accountid=self.userid, - direction=DIRECTION_STOCK_NAME2VT.get(data["委托方向"]), - offset=Offset.NONE, + direction=direction, + offset=offset, price=float(data["成交价格"]), volume=float(data["成交数量"]), datetime=trade_dt, @@ -1514,7 +1532,14 @@ class PbTdApi(object): trade_date = str(data.cjrq).strip() trade_time = str(data.cjsj).strip() trade_dt = datetime.strptime(f'{trade_date} {trade_time}', "%Y%m%d %H%M%S") - + direction = DIRECTION_ORDER_PB2VT.get(str(data.wtfx).strip()) + offset = Offset.NONE + if direction is None: + direction = Direction.NET + elif direction == Direction.LONG: + offset = Offset.OPEN + elif direction == Direction.SHORT: + offset = Offset.CLOSE trade = TradeData( gateway_name=self.gateway_name, symbol=str(data.zqdm).strip(), @@ -1523,8 +1548,8 @@ class PbTdApi(object): tradeid=sys_tradeid, sys_orderid=sys_orderid, accountid=self.userid, - direction=DIRECTION_ORDER_PB2VT.get(str(data.wtfx).strip()), - offset=Offset.NONE, + direction=direction, + offset=offset, price=float(data.cjjg), volume=int(data.cjsl), datetime=trade_dt, @@ -1582,7 +1607,14 @@ class PbTdApi(object): trade_date = str(data["CJRQ"]).lstrip() trade_time = str(data["CJSJ"]).lstrip() trade_dt = datetime.strptime(f'{trade_date} {trade_time}', "%Y%m%d %H%M%S") - + direction = DIRECTION_ORDER_PB2VT.get(str(data.wtfx).strip()) + offset = Offset.NONE + if direction is None: + direction = Direction.NET + elif direction == Direction.LONG: + offset = Offset.OPEN + elif direction == Direction.SHORT: + offset = Offset.CLOSE trade = TradeData( gateway_name=self.gateway_name, symbol=str(data["ZQDM"]).lstrip(), @@ -1591,8 +1623,8 @@ class PbTdApi(object): tradeid=sys_tradeid, sys_orderid=sys_orderid, accountid=self.userid, - direction=DIRECTION_ORDER_PB2VT.get(str(data["WTFX"]).lstrip()), - offset=Offset.NONE, + direction=direction, + offset=offset, price=float(str(data["CJJG"]).lstrip()), volume=float(str(data["CJSL"]).lstrip()), datetime=trade_dt, diff --git a/vnpy/gateway/rpc/rpc_gateway.py b/vnpy/gateway/rpc/rpc_gateway.py index 215e079d..4af5d765 100644 --- a/vnpy/gateway/rpc/rpc_gateway.py +++ b/vnpy/gateway/rpc/rpc_gateway.py @@ -21,9 +21,9 @@ class RpcGateway(BaseGateway): exchanges = list(Exchange) - def __init__(self, event_engine): + def __init__(self, event_engine, gateway_name="RPC"): """Constructor""" - super().__init__(event_engine, "RPC") + super().__init__(event_engine, gateway_name) self.symbol_gateway_map = {} diff --git a/vnpy/gateway/stockrpc/stock_rpc_gateway.py b/vnpy/gateway/stockrpc/stock_rpc_gateway.py index 42600877..87cf6b0a 100644 --- a/vnpy/gateway/stockrpc/stock_rpc_gateway.py +++ b/vnpy/gateway/stockrpc/stock_rpc_gateway.py @@ -121,6 +121,7 @@ class StockRpcGateway(BaseGateway): :param req: :return: """ + self.write_log(f'使用prc委托:{req.__dict__}') ref = self.client.send_order(req, self.remote_gw_name) local_ref = ref.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.') @@ -160,9 +161,9 @@ class StockRpcGateway(BaseGateway): for position in positions: position.gateway_name = self.gateway_name # 更换 vt_positionid得gateway前缀 - position.vt_positionid.replace(f'{position.gateway_name}.', f'{self.gateway_name}.') + position.vt_positionid = position.vt_positionid.replace(f'{position.gateway_name}.', f'{self.gateway_name}.') # 更换 vt_accountid得gateway前缀 - position.vt_accountid.replace(f'{position.gateway_name}.', f'{self.gateway_name}.') + position.vt_accountid = position.vt_accountid.replace(f'{position.gateway_name}.', f'{self.gateway_name}.') self.on_position(position) self.write_log("持仓信息查询成功") @@ -172,9 +173,9 @@ class StockRpcGateway(BaseGateway): # 更换gateway order.gateway_name = self.gateway_name # 更换 vt_orderid得gateway前缀 - order.vt_orderid.replace(f'{order.gateway_name}.', f'{self.gateway_name}.') + order.vt_orderid = order.vt_orderid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.') # 更换 vt_accountid得gateway前缀 - order.vt_accountid.replace(f'{order.gateway_name}.', f'{self.gateway_name}.') + order.vt_accountid = order.vt_accountid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.') self.on_order(order) self.write_log("委托信息查询成功") @@ -183,11 +184,11 @@ class StockRpcGateway(BaseGateway): for trade in trades: trade.gateway_name = self.gateway_name # 更换 vt_orderid得gateway前缀 - trade.vt_orderid.replace(f'{trade.gateway_name}.', f'{self.gateway_name}.') + trade.vt_orderid = trade.vt_orderid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.') # 更换 vt_orderid得gateway前缀 - trade.vt_orderid.replace(f'{trade.gateway_name}.', f'{self.gateway_name}.') + trade.vt_orderid = trade.vt_orderid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.') # 更换 vt_accountid得gateway前缀 - trade.vt_accountid.replace(f'{trade.gateway_name}.', f'{self.gateway_name}.') + trade.vt_accountid = trade.vt_accountid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.') self.on_trade(trade) self.write_log("成交信息查询成功") @@ -208,14 +209,24 @@ class StockRpcGateway(BaseGateway): if hasattr(data, "gateway_name"): data.gateway_name = self.gateway_name + if hasattr(data, 'vt_orderid'): - data.vt_orderid = data.vt_orderid.replace(f'{data.gateway_name}.', f'{self.gateway_name}.') + rpc_vt_orderid = data.vt_orderid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.') + self.write_log(f' vt_orderid :{data.vt_orderid} => {rpc_vt_orderid}') + data.vt_orderid = rpc_vt_orderid + if hasattr(data, 'vt_tradeid'): - data.vt_tradeid = data.vt_tradeid.replace(f'{data.gateway_name}.', f'{self.gateway_name}.') + rpc_vt_tradeid = data.vt_tradeid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.') + self.write_log(f' vt_tradeid :{data.vt_tradeid} => {rpc_vt_tradeid}') + data.vt_tradeid = rpc_vt_tradeid + if hasattr(data, 'vt_accountid'): - data.vt_accountid = data.vt_accountid.replace(f'{data.gateway_name}.', f'{self.gateway_name}.') + data.vt_accountid = data.vt_accountid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.') if hasattr(data, 'vt_positionid'): - data.vt_positionid = data.vt_positionid.replace(f'{data.gateway_name}.', f'{self.gateway_name}.') + data.vt_positionid = data.vt_positionid.replace(f'{self.remote_gw_name}.', f'{self.gateway_name}.') + + if event.type in [EVENT_ORDER,EVENT_TRADE]: + self.write_log(f'{self.remote_gw_name} => {self.gateway_name} event:{data.__dict__}') self.event_engine.put(event)