[增强] 币安数字货币历史数据增量更新,修复币安合约接口

This commit is contained in:
msincenselee 2020-04-05 21:52:12 +08:00
parent a1faf9b27d
commit 941a6bcf01
7 changed files with 250 additions and 42 deletions

View File

@ -0,0 +1,105 @@
# flake8: noqa
import os
import sys
import csv
import pandas as pd
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if ROOT_PATH not in sys.path:
sys.path.append(ROOT_PATH)
print(f'append {ROOT_PATH} into sys.path')
from datetime import datetime, timedelta
from vnpy.data.binance.binance_future_data import BinanceFutureData, HistoryRequest, Exchange, Interval
from vnpy.trader.utility import get_csv_last_dt, append_data
# 获取币安合约交易的所有期货合约
future_data = BinanceFutureData()
contracts = BinanceFutureData.load_contracts()
if len(contracts) == 0:
future_data.save_contracts()
contracts = BinanceFutureData.load_contracts()
# 开始下载日期
start_date = '20190101'
def download_symbol(symbol, start_dt, bar_file_path):
req = HistoryRequest(
symbol=symbol,
exchange=Exchange(contract_info.get('exchange')),
interval=Interval.MINUTE,
start=start_dt
)
bars = future_data.get_bars(req=req, return_dict=True)
future_data.export_to(bars, file_name=bar_file_path)
# 逐一合约进行下载
for vt_symbol, contract_info in contracts.items():
symbol = contract_info.get('symbol')
bar_file_path = os.path.abspath(os.path.join(
ROOT_PATH,
'bar_data',
'binance',
f'{symbol}_{start_date}_1m.csv'))
# 不存在文件,直接下载,并保存
if not os.path.exists(bar_file_path):
print(f'文件{bar_file_path}不存在,开始时间:{start_date}')
start_dt = datetime.strptime(start_date, '%Y%m%d')
download_symbol(symbol, start_dt, bar_file_path)
continue
# 如果存在文件获取最后的bar时间
last_dt = get_csv_last_dt(bar_file_path)
# 获取不到时间,重新下载
if last_dt is None:
print(f'获取文件{bar_file_path}的最后时间失败,开始时间:{start_date}')
start_dt = datetime.strptime(start_date, '%Y%m%d')
download_symbol(symbol, start_dt, bar_file_path)
continue
# 获取到时间,变成那天的开始时间,下载数据
start_dt = last_dt.replace(hour=0, minute=0, second=0, microsecond=0)
print(f'文件{bar_file_path}存在,最后时间:{last_dt}, 调整数据获取开始时间:{start_dt}')
req = HistoryRequest(
symbol=symbol,
exchange=Exchange(contract_info.get('exchange')),
interval=Interval.MINUTE,
start=start_dt
)
bars = future_data.get_bars(req=req, return_dict=True)
if len(bars) <= 0:
print(f'下载{symbol} 1分钟数据为空白')
continue
bar_count = 0
# 获取标题
headers = []
with open(bar_file_path, "r", encoding='utf8') as f:
reader = csv.reader(f)
for header in reader:
headers = header
break
# 写入所有大于最后bar时间的数据
with open(bar_file_path, 'a', encoding='utf8', newline='\n') as csvWriteFile:
writer = csv.DictWriter(f=csvWriteFile, fieldnames=headers, dialect='excel',
extrasaction='ignore')
for bar in bars:
if bar['datetime'] <= last_dt:
continue
bar_count += 1
writer.writerow(bar)
print(f'更新{symbol}数据 => 文件{bar_file_path}, 最后记录:{bars[-1]}')

View File

@ -199,22 +199,29 @@ class CtaEngine(BaseEngine):
def process_timer_event(self, event: Event):
""" 处理定时器事件"""
all_trading = True
# 触发每个策略的定时接口
for strategy in list(self.strategies.values()):
strategy.on_timer()
if not strategy.trading:
all_trading = False
dt = datetime.now()
if self.last_minute != dt.minute:
self.last_minute = dt.minute
if all_trading:
# 主动获取所有策略得持仓信息
all_strategy_pos = self.get_all_strategy_pos()
# 比对仓位,使用上述获取得持仓信息,不用重复获取
self.compare_pos(strategy_pos_list=copy(all_strategy_pos))
# 推送到事件
self.put_all_strategy_pos_event(all_strategy_pos)
def process_tick_event(self, event: Event):
"""处理tick到达事件"""
tick = event.data
@ -1455,7 +1462,7 @@ class CtaEngine(BaseEngine):
d.update(strategy.get_parameters())
return d
def compare_pos(self):
def compare_pos(self, strategy_pos_list=[]):
"""
对比账号&策略的持仓,不同的话则发出微信提醒
:return:
@ -1467,14 +1474,14 @@ class CtaEngine(BaseEngine):
self.write_log(u'开始对比账号&策略的持仓')
# 获取当前策略得持仓
if len(strategy_pos_list) == 0:
strategy_pos_list = self.get_all_strategy_pos()
self.write_log(u'策略持仓清单:{}'.format(strategy_pos_list))
# 需要进行对比得合约集合(来自策略持仓/账号持仓)
vt_symbols = set()
# 账号的持仓处理 => account_pos
# 账号的持仓处理 => compare_pos
compare_pos = dict() # vt_symbol: {'账号多单': xx, '账号空单':xxx, '策略空单':[], '策略多单':[]}
for position in list(self.positions.values()):
@ -1526,10 +1533,13 @@ class CtaEngine(BaseEngine):
pos_compare_result = ''
# 精简输出
compare_info = ''
for vt_symbol in sorted(vt_symbols):
# 发送不一致得结果
symbol_pos = compare_pos.pop(vt_symbol)
symbol_pos = compare_pos.pop(vt_symbol, None)
if not symbol_pos:
self.write_error(f'持仓对比中,找不到{vt_symbol}')
continue
net_symbol_pos = round(round(symbol_pos['策略多单'], 7) - round(symbol_pos['策略空单'], 7), 7)
# 多空都一致
@ -1538,17 +1548,15 @@ class CtaEngine(BaseEngine):
self.write_log(msg)
compare_info += msg
else:
pos_compare_result += '\n{}: '.format(vt_symbol)
msg = f"{vt_symbol} [{symbol_pos}]"
pos_compare_result += '\n{}: {}'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False))
pos_compare_result += msg
self.write_error(u'{}不一致:{}'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False)))
compare_info += u'{}不一致:{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False))
# 不匹配输入到stdErr通道
if pos_compare_result != '':
msg = u'账户{}持仓不匹配: {}' \
.format(self.engine_config.get('account_id', '-'),
.format(self.engine_config.get('accountid', '-'),
pos_compare_result)
try:
from vnpy.trader.util_wechat import send_wx_msg

View File

@ -449,6 +449,7 @@ class CtaFutureTemplate(CtaTemplate):
self.policy = None # 事务执行组件
self.gt = None # 网格交易组件
self.klines = {} # K线组件字典: kline_name: kline
self.activate_market = False
self.cur_datetime: datetime = None # 当前Tick时间
self.cur_tick: TickData = None # 最新的合约tick( vt_symbol)
@ -845,9 +846,15 @@ class CtaFutureTemplate(CtaTemplate):
if order.traded > 0:
pre_traded_volume = grid.traded_volume
grid.traded_volume = round(grid.traded_volume + order.traded, 7)
self.write_log(f'撤单中部分成交:{order.traded} + 原已成交:{pre_traded_volume} => {grid.traded_volume}')
if not grid.order_ids:
self.write_log(f'撤单中部分开仓:{order.traded} + 原已成交:{pre_traded_volume} => {grid.traded_volume}')
if len(grid.order_ids):
grid.order_status = False
if grid.traded_volume > 0:
pre_volume = grid.volume
grid.volume = grid.traded_volume
grid.traded_volume = 0
grid.open_status = True
self.write_log(f'开仓完成grid.volume {pre_volume} => {grid.volume}')
self.gt.save()
self.active_orders.update({order.vt_orderid: old_order})
@ -877,9 +884,20 @@ class CtaFutureTemplate(CtaTemplate):
if order.traded > 0:
pre_traded_volume = grid.traded_volume
grid.traded_volume = round(grid.traded_volume + order.traded, 7)
self.write_log(f'撤单中部分成交:{order.traded} + 原已成交:{pre_traded_volume} => {grid.traded_volume}')
self.write_log(f'撤单中部分平仓成交:{order.traded} + 原已成交:{pre_traded_volume} => {grid.traded_volume}')
if len(grid.order_ids) == 0:
grid.order_status = False
if grid.traded_volume > 0:
pre_volume = grid.volume
grid.volume = round(grid.volume - grid.traded_volume, 7)
grid.traded_volume = 0
if grid.volume <= 0:
grid.volume = 0
grid.open_status = False
self.write_log(f'强制全部平仓完成')
else:
self.write_log(f'平仓委托中撤单完成部分成交减少持仓grid.volume {pre_volume} => {grid.volume}')
self.gt.save()
self.active_orders.update({order.vt_orderid: old_order})
@ -1235,7 +1253,7 @@ class CtaFutureTemplate(CtaTemplate):
and order_grid \
and len(order_grid.order_ids) == 0 \
and order_grid.traded_volume == 0:
self.write_log(u'移除委托网格{}'.format(order_grid.__dict__))
self.write_log(u'移除从未开仓成功的委托网格{}'.format(order_grid.__dict__))
order_info['grid'] = None
self.gt.remove_grids_by_ids(direction=order_grid.direction, ids=[order_grid.id])

View File

@ -28,23 +28,26 @@ class CtaPosition(CtaComponent):
self.write_error(content=f'开仓异常,净:{self.pos},多:{self.long_pos},加多:{volume},超过:{self.maxPos}')
# 更新
self.write_log(f'多仓:{self.long_pos}->{self.long_pos + volume}')
self.write_log(f'净:{self.pos}->{self.pos + volume}')
pre_long_pos = self.long_pos
pre_pos = self.pos
self.long_pos += volume
self.pos += volume
self.long_pos = round(self.long_pos, 7)
self.pos = round(self.pos, 7)
self.write_log(f'多仓:{pre_long_pos}->{self.long_pos}')
self.write_log(f'净:{pre_pos}->{self.pos}')
if direction == Direction.SHORT: # 加空仓
if (min(self.pos, self.short_pos) - volume) < (0 - self.maxPos):
self.write_error(content=f'开仓异常,净:{self.pos},空:{self.short_pos},加空:{volume},超过:{self.maxPos}')
self.write_log(f'空仓:{self.short_pos}->{self.short_pos - volume}')
self.write_log(f'净:{self.pos}->{self.pos - volume}')
pre_short_pos = self.short_pos
pre_pos = self.pos
self.short_pos -= volume
self.pos -= volume
self.short_pos = round(self.short_pos, 7)
self.pos = round(self.pos, 7)
self.write_log(f'空仓:{pre_short_pos}->{self.short_pos}')
self.write_log(f'净:{pre_pos}->{self.pos}')
return True
def close_pos(self, direction: Direction, volume: float):

View File

@ -173,7 +173,7 @@ class BinancefGateway(BaseGateway):
if self.count < 2:
return
self.count = 0
if len(self.query_functions) > 0:
func = self.query_functions.pop(0)
func()
self.query_functions.append(func)
@ -397,6 +397,7 @@ class BinancefRestApi(RestClient):
self.gateway_name
)
self.orders.update({orderid: copy(order)})
self.gateway.write_log(f'返回订单更新:{order.__dict__}')
self.gateway.on_order(order)
data = {
@ -495,15 +496,24 @@ class BinancefRestApi(RestClient):
def on_query_account(self, data: dict, request: Request) -> None:
""""""
for asset in data["assets"]:
""" {
"asset": "USDT", // 资产名
"initialMargin": "0.33683000", // 起始保证金
"maintMargin": "0.02695000", // 维持保证金
"marginBalance": "8.74947592", // 保证金余额
"maxWithdrawAmount": "8.41264592", // 最大可提款金额,`GET /fapi/balance`withdrawAvailable
"openOrderInitialMargin": "0.00000000", // 挂单起始保证金
"positionInitialMargin": "0.33683000", // 持仓起始保证金
"unrealizedProfit": "-0.44537584", // 持仓未实现盈亏
"walletBalance": "9.19485176" // 账户余额
}"""
account = AccountData(
accountid=asset["asset"],
balance=float(asset["walletBalance"]) + float(asset["maintMargin"]),
accountid=f"{self.gateway_name}_{asset['asset']}",
balance=float(asset["marginBalance"]),
frozen=float(asset["maintMargin"]),
holding_profit=float(asset['unrealizedProfit']),
gateway_name=self.gateway_name
)
# 修正vnpy AccountData
account.balance += account.holding_profit
if account.balance:
self.gateway.on_account(account)
@ -555,6 +565,7 @@ class BinancefRestApi(RestClient):
gateway_name=self.gateway_name,
)
self.orders.update({order.orderid: copy(order)})
self.gateway.write_log(f'返回订单查询结果:{order.__dict__}')
self.gateway.on_order(order)
self.gateway.write_log("委托信息查询成功")
@ -638,10 +649,11 @@ class BinancefRestApi(RestClient):
order = request.extra
order.status = Status.REJECTED
self.orders.update({order.orderid: copy(order)})
self.gateway.write_log(f'订单委托失败:{order.__dict__}')
self.gateway.on_order(order)
msg = f"委托失败,状态码:{status_code},信息:{request.response.text}"
self.gateway.write_log(msg)
self.gateway.write_error(msg)
def on_send_order_error(
self, exception_type: type, exception_value: Exception, tb, request: Request
@ -652,8 +664,11 @@ class BinancefRestApi(RestClient):
order = request.extra
order.status = Status.REJECTED
self.orders.update({order.orderid: copy(order)})
self.gateway.write_log(f'发送订单异常:{order.__dict__}')
self.gateway.on_order(order)
msg = f"委托失败,拒单"
self.gateway.write_error(msg)
# Record exception if not ConnectionError
if not issubclass(exception_type, ConnectionError):
self.on_error(exception_type, exception_value, tb, request)
@ -785,7 +800,35 @@ class BinancefTradeWebsocketApi(WebsocketClient):
self.on_order(packet)
def on_account(self, packet: dict) -> None:
""""""
"""websocket返回得Balance/Position信息更新"""
"""
{
"B":[ // 余额信息
{
"a":"USDT", // 资产名称
"wb":"122624.12345678", // 钱包余额
"cw":"100.12345678" // 除去逐仓保证金的钱包余额
},
{
"a":"BNB",
"wb":"1.00000000",
"cw":"0.00000000"
}
],
"P":[
{
"s":"BTCUSDT", // 交易对
"pa":"1", // 仓位
"ep":"9000", // 入仓价格
"cr":"200", // (费前)累计实现损益
"up":"0.2732781800", // 持仓未实现盈亏
"mt":"isolated", // 保证金模式
"iw":"0.06391979" // 若为逐仓仓位保证金
}
]
}
"""
# 计算持仓收益
holding_pnl = 0
for pos_data in packet["a"]["P"]:
print(pos_data)
@ -794,7 +837,7 @@ class BinancefTradeWebsocketApi(WebsocketClient):
symbol=pos_data["s"],
exchange=Exchange.BINANCE,
direction=Direction.NET,
volume=abs(volume),
volume=volume,
price=float(pos_data["ep"]),
pnl=float(pos_data["cr"]),
gateway_name=self.gateway_name,
@ -804,7 +847,7 @@ class BinancefTradeWebsocketApi(WebsocketClient):
for acc_data in packet["a"]["B"]:
account = AccountData(
accountid=acc_data["a"],
accountid=f"{self.gateway_name}_{acc_data['a']}",
balance=round(float(acc_data["wb"]), 7),
frozen=float(acc_data["wb"]) - float(acc_data["cw"]),
holding_profit=round(holding_pnl, 7),
@ -817,7 +860,7 @@ class BinancefTradeWebsocketApi(WebsocketClient):
self.gateway.on_account(account)
def on_order(self, packet: dict) -> None:
""""""
"""ws处理on_order事件"""
self.gateway.write_log(json.dumps(packet, indent=2))
dt = datetime.fromtimestamp(packet["E"] / 1000)
time = dt.strftime("%Y-%m-%d %H:%M:%S")
@ -848,7 +891,7 @@ class BinancefTradeWebsocketApi(WebsocketClient):
time=time,
gateway_name=self.gateway_name
)
self.gateway.write_log(f'WS返回订单更新:{order.__dict__}')
self.gateway.on_order(order)
# Push trade event

View File

@ -806,6 +806,9 @@ class TradingWidget(QtWidgets.QWidget):
symbol=symbol, exchange=Exchange(exchange_value)
)
if self.checkFixed.isChecked():
self.checkFixed.setChecked(False)
self.main_engine.subscribe(req, gateway_name)
def clear_label_text(self) -> None:
@ -930,9 +933,14 @@ class TradingWidget(QtWidgets.QWidget):
self.offset_combo.setCurrentText(Offset.CLOSE.value)
self.volume_line.setText(str(pos.volume))
self.volume_line.setText(str(abs(pos.volume)))
if pos.direction == Direction.NET:
if pos.volume >= 0:
self.direction_combo.setCurrentText(Direction.SHORT.value)
else:
self.direction_combo.setCurrentText(Direction.LONG.value)
elif pos.direction == Direction.LONG:
if pos.direction in [Direction.LONG, Direction.NET]:
self.direction_combo.setCurrentText(Direction.SHORT.value)
else:
self.direction_combo.setCurrentText(Direction.LONG.value)

View File

@ -314,6 +314,29 @@ def print_dict(d: dict):
return '\n'.join([f'{key}:{d[key]}' for key in sorted(d.keys())])
def get_csv_last_dt(file_name, dt_index=0, dt_format='%Y-%m-%d %H:%M:%S', line_length=1000):
"""
获取csv文件最后一行的日期数据(第dt_index个字段必须是 '%Y-%m-%d %H:%M:%S'格式
:param file_name:文件名
:param line_length: 行数据的长度
:return: None文件不存在或者时间格式不正确
"""
with open(file_name, 'r') as f:
f_size = os.path.getsize(file_name)
if f_size < line_length:
line_length = f_size
f.seek(f_size - line_length) # 移动到最后1000个字节
for row in f.readlines()[-1:]:
datas = row.split(',')
if len(datas) > dt_index + 1:
try:
last_dt = datetime.strptime(datas[dt_index], dt_format)
return last_dt
except:
return None
return None
def append_data(file_name: str, dict_data: dict, field_names: list = []):
"""
添加数据到csv文件中