diff --git a/prod/jobs/refill_binance_future_bars.py b/prod/jobs/refill_binance_future_bars.py new file mode 100644 index 00000000..799db24c --- /dev/null +++ b/prod/jobs/refill_binance_future_bars.py @@ -0,0 +1,105 @@ +# flake8: noqa + + +import os +import sys +import csv +import pandas as pd + +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +if ROOT_PATH not in sys.path: + sys.path.append(ROOT_PATH) + print(f'append {ROOT_PATH} into sys.path') + +from datetime import datetime, timedelta +from vnpy.data.binance.binance_future_data import BinanceFutureData, HistoryRequest, Exchange, Interval +from vnpy.trader.utility import get_csv_last_dt, append_data + +# 获取币安合约交易的所有期货合约 +future_data = BinanceFutureData() +contracts = BinanceFutureData.load_contracts() +if len(contracts) == 0: + future_data.save_contracts() + contracts = BinanceFutureData.load_contracts() + +# 开始下载日期 +start_date = '20190101' + +def download_symbol(symbol, start_dt, bar_file_path): + req = HistoryRequest( + symbol=symbol, + exchange=Exchange(contract_info.get('exchange')), + interval=Interval.MINUTE, + start=start_dt + ) + + bars = future_data.get_bars(req=req, return_dict=True) + future_data.export_to(bars, file_name=bar_file_path) + +# 逐一合约进行下载 +for vt_symbol, contract_info in contracts.items(): + symbol = contract_info.get('symbol') + + bar_file_path = os.path.abspath(os.path.join( + ROOT_PATH, + 'bar_data', + 'binance', + f'{symbol}_{start_date}_1m.csv')) + + # 不存在文件,直接下载,并保存 + if not os.path.exists(bar_file_path): + print(f'文件{bar_file_path}不存在,开始时间:{start_date}') + start_dt = datetime.strptime(start_date, '%Y%m%d') + download_symbol(symbol, start_dt, bar_file_path) + continue + + # 如果存在文件,获取最后的bar时间 + last_dt = get_csv_last_dt(bar_file_path) + + # 获取不到时间,重新下载 + if last_dt is None: + print(f'获取文件{bar_file_path}的最后时间失败,开始时间:{start_date}') + start_dt = datetime.strptime(start_date, '%Y%m%d') + download_symbol(symbol, start_dt, bar_file_path) + continue + + # 获取到时间,变成那天的开始时间,下载数据 + start_dt = last_dt.replace(hour=0, minute=0, second=0, microsecond=0) + print(f'文件{bar_file_path}存在,最后时间:{last_dt}, 调整数据获取开始时间:{start_dt}') + req = HistoryRequest( + symbol=symbol, + exchange=Exchange(contract_info.get('exchange')), + interval=Interval.MINUTE, + start=start_dt + ) + + bars = future_data.get_bars(req=req, return_dict=True) + if len(bars) <= 0: + print(f'下载{symbol} 1分钟数据为空白') + continue + + bar_count = 0 + + # 获取标题 + headers = [] + with open(bar_file_path, "r", encoding='utf8') as f: + reader = csv.reader(f) + for header in reader: + headers = header + break + + # 写入所有大于最后bar时间的数据 + with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile: + + writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel', + extrasaction='ignore') + for bar in bars: + if bar['datetime'] <= last_dt: + continue + bar_count += 1 + writer.writerow(bar) + + print(f'更新{symbol}数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}') + + diff --git a/vnpy/app/cta_crypto/engine.py b/vnpy/app/cta_crypto/engine.py index c0302fa4..a3526cfb 100644 --- a/vnpy/app/cta_crypto/engine.py +++ b/vnpy/app/cta_crypto/engine.py @@ -199,21 +199,28 @@ class CtaEngine(BaseEngine): def process_timer_event(self, event: Event): """ 处理定时器事件""" - + all_trading = True # 触发每个策略的定时接口 for strategy in list(self.strategies.values()): strategy.on_timer() + if not strategy.trading: + all_trading = False dt = datetime.now() if self.last_minute != dt.minute: self.last_minute = dt.minute - # 主动获取所有策略得持仓信息 - all_strategy_pos = self.get_all_strategy_pos() + if all_trading: + # 主动获取所有策略得持仓信息 + all_strategy_pos = self.get_all_strategy_pos() + + # 比对仓位,使用上述获取得持仓信息,不用重复获取 + self.compare_pos(strategy_pos_list=copy(all_strategy_pos)) + + # 推送到事件 + self.put_all_strategy_pos_event(all_strategy_pos) - # 推送到事件 - self.put_all_strategy_pos_event(all_strategy_pos) def process_tick_event(self, event: Event): """处理tick到达事件""" @@ -1455,7 +1462,7 @@ class CtaEngine(BaseEngine): d.update(strategy.get_parameters()) return d - def compare_pos(self): + def compare_pos(self, strategy_pos_list=[]): """ 对比账号&策略的持仓,不同的话则发出微信提醒 :return: @@ -1467,14 +1474,14 @@ class CtaEngine(BaseEngine): self.write_log(u'开始对比账号&策略的持仓') # 获取当前策略得持仓 - strategy_pos_list = self.get_all_strategy_pos() + if len(strategy_pos_list) == 0: + strategy_pos_list = self.get_all_strategy_pos() self.write_log(u'策略持仓清单:{}'.format(strategy_pos_list)) # 需要进行对比得合约集合(来自策略持仓/账号持仓) vt_symbols = set() - # 账号的持仓处理 => account_pos - + # 账号的持仓处理 => compare_pos compare_pos = dict() # vt_symbol: {'账号多单': xx, '账号空单':xxx, '策略空单':[], '策略多单':[]} for position in list(self.positions.values()): @@ -1526,11 +1533,14 @@ class CtaEngine(BaseEngine): pos_compare_result = '' # 精简输出 compare_info = '' + for vt_symbol in sorted(vt_symbols): # 发送不一致得结果 - symbol_pos = compare_pos.pop(vt_symbol) - - net_symbol_pos = round(round(symbol_pos['策略多单'], 7) - round(symbol_pos['策略空单'], 7),7) + symbol_pos = compare_pos.pop(vt_symbol, None) + if not symbol_pos: + self.write_error(f'持仓对比中,找不到{vt_symbol}') + continue + net_symbol_pos = round(round(symbol_pos['策略多单'], 7) - round(symbol_pos['策略空单'], 7), 7) # 多空都一致 if round(symbol_pos['账号净仓'], 7) == net_symbol_pos: @@ -1538,17 +1548,15 @@ class CtaEngine(BaseEngine): self.write_log(msg) compare_info += msg else: - pos_compare_result += '\n{}: '.format(vt_symbol) - msg = f"{vt_symbol} [{symbol_pos}]" + pos_compare_result += '\n{}: {}'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False)) - pos_compare_result += msg self.write_error(u'{}不一致:{}'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False))) compare_info += u'{}不一致:{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False)) # 不匹配,输入到stdErr通道 if pos_compare_result != '': msg = u'账户{}持仓不匹配: {}' \ - .format(self.engine_config.get('account_id', '-'), + .format(self.engine_config.get('accountid', '-'), pos_compare_result) try: from vnpy.trader.util_wechat import send_wx_msg diff --git a/vnpy/app/cta_crypto/template.py b/vnpy/app/cta_crypto/template.py index 2c1c0fa9..81413f9b 100644 --- a/vnpy/app/cta_crypto/template.py +++ b/vnpy/app/cta_crypto/template.py @@ -449,6 +449,7 @@ class CtaFutureTemplate(CtaTemplate): self.policy = None # 事务执行组件 self.gt = None # 网格交易组件 self.klines = {} # K线组件字典: kline_name: kline + self.activate_market = False self.cur_datetime: datetime = None # 当前Tick时间 self.cur_tick: TickData = None # 最新的合约tick( vt_symbol) @@ -844,10 +845,16 @@ class CtaFutureTemplate(CtaTemplate): grid.order_ids.remove(order.vt_orderid) if order.traded > 0: pre_traded_volume = grid.traded_volume - grid.traded_volume = round(grid.traded_volume + order.traded,7) - self.write_log(f'撤单中部分成交:{order.traded} + 原已成交:{pre_traded_volume} => {grid.traded_volume}') - if not grid.order_ids: + grid.traded_volume = round(grid.traded_volume + order.traded, 7) + self.write_log(f'撤单中部分开仓:{order.traded} + 原已成交:{pre_traded_volume} => {grid.traded_volume}') + if len(grid.order_ids): grid.order_status = False + if grid.traded_volume > 0: + pre_volume = grid.volume + grid.volume = grid.traded_volume + grid.traded_volume = 0 + grid.open_status = True + self.write_log(f'开仓完成,grid.volume {pre_volume} => {grid.volume}') self.gt.save() self.active_orders.update({order.vt_orderid: old_order}) @@ -877,9 +884,20 @@ class CtaFutureTemplate(CtaTemplate): if order.traded > 0: pre_traded_volume = grid.traded_volume grid.traded_volume = round(grid.traded_volume + order.traded, 7) - self.write_log(f'撤单中部分成交:{order.traded} + 原已成交:{pre_traded_volume} => {grid.traded_volume}') + self.write_log(f'撤单中部分平仓成交:{order.traded} + 原已成交:{pre_traded_volume} => {grid.traded_volume}') if len(grid.order_ids) == 0: grid.order_status = False + if grid.traded_volume > 0: + pre_volume = grid.volume + grid.volume = round(grid.volume - grid.traded_volume, 7) + grid.traded_volume = 0 + if grid.volume <= 0: + grid.volume = 0 + grid.open_status = False + self.write_log(f'强制全部平仓完成') + else: + self.write_log(f'平仓委托中,撤单完成,部分成交,减少持仓grid.volume {pre_volume} => {grid.volume}') + self.gt.save() self.active_orders.update({order.vt_orderid: old_order}) @@ -1235,7 +1253,7 @@ class CtaFutureTemplate(CtaTemplate): and order_grid \ and len(order_grid.order_ids) == 0 \ and order_grid.traded_volume == 0: - self.write_log(u'移除委托网格{}'.format(order_grid.__dict__)) + self.write_log(u'移除从未开仓成功的委托网格{}'.format(order_grid.__dict__)) order_info['grid'] = None self.gt.remove_grids_by_ids(direction=order_grid.direction, ids=[order_grid.id]) diff --git a/vnpy/component/cta_position.py b/vnpy/component/cta_position.py index 5a59e559..5e4b7011 100644 --- a/vnpy/component/cta_position.py +++ b/vnpy/component/cta_position.py @@ -28,23 +28,26 @@ class CtaPosition(CtaComponent): self.write_error(content=f'开仓异常,净:{self.pos},多:{self.long_pos},加多:{volume},超过:{self.maxPos}') # 更新 - self.write_log(f'多仓:{self.long_pos}->{self.long_pos + volume}') - self.write_log(f'净:{self.pos}->{self.pos + volume}') + pre_long_pos = self.long_pos + pre_pos = self.pos self.long_pos += volume self.pos += volume self.long_pos = round(self.long_pos, 7) self.pos = round(self.pos, 7) + self.write_log(f'多仓:{pre_long_pos}->{self.long_pos}') + self.write_log(f'净:{pre_pos}->{self.pos}') if direction == Direction.SHORT: # 加空仓 if (min(self.pos, self.short_pos) - volume) < (0 - self.maxPos): self.write_error(content=f'开仓异常,净:{self.pos},空:{self.short_pos},加空:{volume},超过:{self.maxPos}') - - self.write_log(f'空仓:{self.short_pos}->{self.short_pos - volume}') - self.write_log(f'净:{self.pos}->{self.pos - volume}') + pre_short_pos = self.short_pos + pre_pos = self.pos self.short_pos -= volume self.pos -= volume self.short_pos = round(self.short_pos, 7) self.pos = round(self.pos, 7) + self.write_log(f'空仓:{pre_short_pos}->{self.short_pos}') + self.write_log(f'净:{pre_pos}->{self.pos}') return True def close_pos(self, direction: Direction, volume: float): diff --git a/vnpy/gateway/binancef/binancef_gateway.py b/vnpy/gateway/binancef/binancef_gateway.py index ffbb2dc4..440e6fe7 100644 --- a/vnpy/gateway/binancef/binancef_gateway.py +++ b/vnpy/gateway/binancef/binancef_gateway.py @@ -173,10 +173,10 @@ class BinancefGateway(BaseGateway): if self.count < 2: return self.count = 0 - - func = self.query_functions.pop(0) - func() - self.query_functions.append(func) + if len(self.query_functions) > 0: + func = self.query_functions.pop(0) + func() + self.query_functions.append(func) def get_order(self, orderid: str): return self.rest_api.get_order(orderid) @@ -397,6 +397,7 @@ class BinancefRestApi(RestClient): self.gateway_name ) self.orders.update({orderid: copy(order)}) + self.gateway.write_log(f'返回订单更新:{order.__dict__}') self.gateway.on_order(order) data = { @@ -495,15 +496,24 @@ class BinancefRestApi(RestClient): def on_query_account(self, data: dict, request: Request) -> None: """""" for asset in data["assets"]: + """ { + "asset": "USDT", // 资产名 + "initialMargin": "0.33683000", // 起始保证金 + "maintMargin": "0.02695000", // 维持保证金 + "marginBalance": "8.74947592", // 保证金余额 + "maxWithdrawAmount": "8.41264592", // 最大可提款金额,同`GET /fapi/balance`中“withdrawAvailable” + "openOrderInitialMargin": "0.00000000", // 挂单起始保证金 + "positionInitialMargin": "0.33683000", // 持仓起始保证金 + "unrealizedProfit": "-0.44537584", // 持仓未实现盈亏 + "walletBalance": "9.19485176" // 账户余额 + }""" account = AccountData( - accountid=asset["asset"], - balance=float(asset["walletBalance"]) + float(asset["maintMargin"]), + accountid=f"{self.gateway_name}_{asset['asset']}", + balance=float(asset["marginBalance"]), frozen=float(asset["maintMargin"]), holding_profit=float(asset['unrealizedProfit']), gateway_name=self.gateway_name ) - # 修正vnpy AccountData - account.balance += account.holding_profit if account.balance: self.gateway.on_account(account) @@ -555,6 +565,7 @@ class BinancefRestApi(RestClient): gateway_name=self.gateway_name, ) self.orders.update({order.orderid: copy(order)}) + self.gateway.write_log(f'返回订单查询结果:{order.__dict__}') self.gateway.on_order(order) self.gateway.write_log("委托信息查询成功") @@ -638,10 +649,11 @@ class BinancefRestApi(RestClient): order = request.extra order.status = Status.REJECTED self.orders.update({order.orderid: copy(order)}) + self.gateway.write_log(f'订单委托失败:{order.__dict__}') self.gateway.on_order(order) msg = f"委托失败,状态码:{status_code},信息:{request.response.text}" - self.gateway.write_log(msg) + self.gateway.write_error(msg) def on_send_order_error( self, exception_type: type, exception_value: Exception, tb, request: Request @@ -652,8 +664,11 @@ class BinancefRestApi(RestClient): order = request.extra order.status = Status.REJECTED self.orders.update({order.orderid: copy(order)}) + self.gateway.write_log(f'发送订单异常:{order.__dict__}') self.gateway.on_order(order) + msg = f"委托失败,拒单" + self.gateway.write_error(msg) # Record exception if not ConnectionError if not issubclass(exception_type, ConnectionError): self.on_error(exception_type, exception_value, tb, request) @@ -785,7 +800,35 @@ class BinancefTradeWebsocketApi(WebsocketClient): self.on_order(packet) def on_account(self, packet: dict) -> None: - """""" + """websocket返回得Balance/Position信息更新""" + """ + { + "B":[ // 余额信息 + { + "a":"USDT", // 资产名称 + "wb":"122624.12345678", // 钱包余额 + "cw":"100.12345678" // 除去逐仓保证金的钱包余额 + }, + { + "a":"BNB", + "wb":"1.00000000", + "cw":"0.00000000" + } + ], + "P":[ + { + "s":"BTCUSDT", // 交易对 + "pa":"1", // 仓位 + "ep":"9000", // 入仓价格 + "cr":"200", // (费前)累计实现损益 + "up":"0.2732781800", // 持仓未实现盈亏 + "mt":"isolated", // 保证金模式 + "iw":"0.06391979" // 若为逐仓,仓位保证金 + } + ] + } + """ + # 计算持仓收益 holding_pnl = 0 for pos_data in packet["a"]["P"]: print(pos_data) @@ -794,7 +837,7 @@ class BinancefTradeWebsocketApi(WebsocketClient): symbol=pos_data["s"], exchange=Exchange.BINANCE, direction=Direction.NET, - volume=abs(volume), + volume=volume, price=float(pos_data["ep"]), pnl=float(pos_data["cr"]), gateway_name=self.gateway_name, @@ -804,7 +847,7 @@ class BinancefTradeWebsocketApi(WebsocketClient): for acc_data in packet["a"]["B"]: account = AccountData( - accountid=acc_data["a"], + accountid=f"{self.gateway_name}_{acc_data['a']}", balance=round(float(acc_data["wb"]), 7), frozen=float(acc_data["wb"]) - float(acc_data["cw"]), holding_profit=round(holding_pnl, 7), @@ -817,7 +860,7 @@ class BinancefTradeWebsocketApi(WebsocketClient): self.gateway.on_account(account) def on_order(self, packet: dict) -> None: - """""" + """ws处理on_order事件""" self.gateway.write_log(json.dumps(packet, indent=2)) dt = datetime.fromtimestamp(packet["E"] / 1000) time = dt.strftime("%Y-%m-%d %H:%M:%S") @@ -848,7 +891,7 @@ class BinancefTradeWebsocketApi(WebsocketClient): time=time, gateway_name=self.gateway_name ) - + self.gateway.write_log(f'WS返回订单更新:{order.__dict__}') self.gateway.on_order(order) # Push trade event diff --git a/vnpy/trader/ui/widget.py b/vnpy/trader/ui/widget.py index 08a71c82..8e76f1b1 100644 --- a/vnpy/trader/ui/widget.py +++ b/vnpy/trader/ui/widget.py @@ -806,6 +806,9 @@ class TradingWidget(QtWidgets.QWidget): symbol=symbol, exchange=Exchange(exchange_value) ) + if self.checkFixed.isChecked(): + self.checkFixed.setChecked(False) + self.main_engine.subscribe(req, gateway_name) def clear_label_text(self) -> None: @@ -930,9 +933,14 @@ class TradingWidget(QtWidgets.QWidget): self.offset_combo.setCurrentText(Offset.CLOSE.value) - self.volume_line.setText(str(pos.volume)) + self.volume_line.setText(str(abs(pos.volume))) + if pos.direction == Direction.NET: + if pos.volume >= 0: + self.direction_combo.setCurrentText(Direction.SHORT.value) + else: + self.direction_combo.setCurrentText(Direction.LONG.value) + elif pos.direction == Direction.LONG: - if pos.direction in [Direction.LONG, Direction.NET]: self.direction_combo.setCurrentText(Direction.SHORT.value) else: self.direction_combo.setCurrentText(Direction.LONG.value) diff --git a/vnpy/trader/utility.py b/vnpy/trader/utility.py index 40500d1c..79fae4ae 100644 --- a/vnpy/trader/utility.py +++ b/vnpy/trader/utility.py @@ -314,6 +314,29 @@ def print_dict(d: dict): return '\n'.join([f'{key}:{d[key]}' for key in sorted(d.keys())]) +def get_csv_last_dt(file_name, dt_index=0, dt_format='%Y-%m-%d %H:%M:%S', line_length=1000): + """ + 获取csv文件最后一行的日期数据(第dt_index个字段必须是 '%Y-%m-%d %H:%M:%S'格式 + :param file_name:文件名 + :param line_length: 行数据的长度 + :return: None,文件不存在,或者时间格式不正确 + """ + with open(file_name, 'r') as f: + f_size = os.path.getsize(file_name) + if f_size < line_length: + line_length = f_size + f.seek(f_size - line_length) # 移动到最后1000个字节 + for row in f.readlines()[-1:]: + + datas = row.split(',') + if len(datas) > dt_index + 1: + try: + last_dt = datetime.strptime(datas[dt_index], dt_format) + return last_dt + except: + return None + return None + def append_data(file_name: str, dict_data: dict, field_names: list = []): """ 添加数据到csv文件中