From 0d0a7818e19e96e17b3a0ad1628a7e33c5bdd87a Mon Sep 17 00:00:00 2001 From: msincenselee Date: Wed, 17 Jun 2020 21:27:34 +0800 Subject: [PATCH] [bug fix] --- vnpy/app/algo_trading/engine.py | 4 +- vnpy/app/cta_crypto/back_testing.py | 10 +- vnpy/app/cta_stock/engine.py | 11 +- vnpy/app/cta_stock/template.py | 10 +- vnpy/app/cta_strategy_pro/engine.py | 4 + vnpy/app/cta_strategy_pro/template.py | 15 ++ vnpy/app/cta_strategy_pro/ui/widget.py | 5 +- vnpy/component/cta_fund_kline.py | 2 +- vnpy/component/cta_renko_bar.py | 6 +- vnpy/data/tdx/tdx_future_data.py | 66 ++++- vnpy/data/tdx/test_tdx_future.py | 2 +- vnpy/gateway/futu/futu_gateway.py | 341 +++++++++++++++++++++++-- vnpy/trader/constant.py | 1 + vnpy/trader/engine.py | 1 + 14 files changed, 431 insertions(+), 47 deletions(-) diff --git a/vnpy/app/algo_trading/engine.py b/vnpy/app/algo_trading/engine.py index e50ceb06..a8c750fe 100644 --- a/vnpy/app/algo_trading/engine.py +++ b/vnpy/app/algo_trading/engine.py @@ -365,12 +365,12 @@ class AlgoEngine(BaseEngine): if len(vt_accountid) > 0: account = self.main_engine.get_account(vt_accountid) - return account.balance, account.avaliable, round(account.frozen * 100 / (account.balance + 0.01), 2), 100 + return account.balance, account.available, round(account.frozen * 100 / (account.balance + 0.01), 2), 100 else: accounts = self.main_engine.get_all_accounts() if len(accounts) > 0: account = accounts[0] - return account.balance, account.avaliable, round(account.frozen * 100 / (account.balance + 0.01), + return account.balance, account.available, round(account.frozen * 100 / (account.balance + 0.01), 2), 100 else: return 0, 0, 0, 0 diff --git a/vnpy/app/cta_crypto/back_testing.py b/vnpy/app/cta_crypto/back_testing.py index f52690e8..8c0b4af1 100644 --- a/vnpy/app/cta_crypto/back_testing.py +++ b/vnpy/app/cta_crypto/back_testing.py @@ -160,7 +160,7 @@ class BackTestingEngine(object): self.net_capital = self.init_capital # 实时资金净值(每日根据capital和持仓浮盈计算) self.max_capital = self.init_capital # 资金最高净值 self.max_net_capital = self.init_capital - self.avaliable = self.init_capital + self.available = self.init_capital self.max_pnl = 0 # 最高盈利 self.min_pnl = 0 # 最大亏损 @@ -257,7 +257,7 @@ class BackTestingEngine(object): if self.net_capital == 0.0: self.percent = 0.0 - return self.net_capital, self.avaliable, self.percent, self.percent_limit + return self.net_capital, self.available, self.percent, self.percent_limit def set_test_start_date(self, start_date: str = '20100416', init_days: int = 10): """设置回测的启动日期""" @@ -290,7 +290,7 @@ class BackTestingEngine(object): self.net_capital = capital # 实时资金净值(每日根据capital和持仓浮盈计算) self.max_capital = capital # 资金最高净值 self.max_net_capital = capital - self.avaliable = capital + self.available = capital self.init_capital = capital def set_margin_rate(self, vt_symbol: str, margin_rate: float): @@ -1705,7 +1705,7 @@ class BackTestingEngine(object): 0) # 可用资金 = 当前净值 - 占用保证金 - self.avaliable = self.net_capital - occupy_money + self.available = self.net_capital - occupy_money # 当前保证金占比 self.percent = round(float(occupy_money * 100 / self.net_capital), 2) # 更新最大保证金占比 @@ -1759,7 +1759,7 @@ class BackTestingEngine(object): self.write_log(msg) # 重新计算一次avaliable - self.avaliable = self.net_capital - occupy_money + self.available = self.net_capital - occupy_money self.percent = round(float(occupy_money * 100 / self.net_capital), 2) def saving_daily_data(self, d, c, m, commission, benchmark=0): diff --git a/vnpy/app/cta_stock/engine.py b/vnpy/app/cta_stock/engine.py index 59281d5b..2c6f9f92 100644 --- a/vnpy/app/cta_stock/engine.py +++ b/vnpy/app/cta_stock/engine.py @@ -220,9 +220,10 @@ class CtaEngine(BaseEngine): all_strategy_pos = self.get_all_strategy_pos() # 每5分钟检查一次 - if dt.minute % 5 == 0: + if dt.minute % 10 == 0: # 比对仓位,使用上述获取得持仓信息,不用重复获取 - self.compare_pos(strategy_pos_list=copy(all_strategy_pos)) + #self.compare_pos(strategy_pos_list=copy(all_strategy_pos)) + pass # 推送到事件 self.put_all_strategy_pos_event(all_strategy_pos) @@ -1777,8 +1778,10 @@ class CtaEngine(BaseEngine): self.logger.log(level, msg) # 如果日志数据异常,错误和告警,输出至sys.stderr - if level in [logging.CRITICAL, logging.ERROR, logging.WARNING]: - print(f"{strategy_name}: {msg}" if strategy_name else msg, file=sys.stderr) + if level in [logging.CRITICAL]: + log_msg = f"{strategy_name}: {msg}" if strategy_name else msg + print(log_msg, file=sys.stderr) + send_wx_msg(log_msg) def write_error(self, msg: str, strategy_name: str = ''): """写入错误日志""" diff --git a/vnpy/app/cta_stock/template.py b/vnpy/app/cta_stock/template.py index 370a919e..3f802de7 100644 --- a/vnpy/app/cta_stock/template.py +++ b/vnpy/app/cta_stock/template.py @@ -875,11 +875,14 @@ class CtaStockTemplate(CtaTemplate): # 多单网格逐一止损/止盈检查: long_grids = self.gt.get_opened_grids(direction=Direction.LONG) for lg in long_grids: - if lg.close_status or lg.order_status or not lg.open_status: continue cur_price = self.cta_engine.get_price(lg.vt_symbol) + if cur_price is None: + self.write_log(f'没有获取到{lg.vt_symbol}的当前价格,提交订阅') + self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=lg.vt_symbol) + continue # 主动止盈 if 0 < lg.close_price <= cur_price: @@ -1004,6 +1007,11 @@ class CtaStockTemplate(CtaTemplate): # 实盘运行时,要加入市场买卖量的判断 if not self.backtesting: symbol_tick = self.cta_engine.get_tick(vt_symbol) + if symbol_tick is None: + self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=vt_symbol) + self.write_log(f'获取不到{vt_symbol}得tick,无法根据市场深度进行计算') + return + symbol_volume_tick = self.cta_engine.get_volume_tick(vt_symbol) # 根据市场计算,前5档买单数量 if all([symbol_tick.ask_volume_1, symbol_tick.ask_volume_2, symbol_tick.ask_volume_3, diff --git a/vnpy/app/cta_strategy_pro/engine.py b/vnpy/app/cta_strategy_pro/engine.py index 367e5ba0..6dfeebda 100644 --- a/vnpy/app/cta_strategy_pro/engine.py +++ b/vnpy/app/cta_strategy_pro/engine.py @@ -920,6 +920,10 @@ class CtaEngine(BaseEngine): ) bars = self.main_engine.query_history(req, contract.gateway_name) + if bars is None: + self.write_error(f'获取不到历史K线:{req.__dict__}') + return + for bar in bars: if bar.trading_day: bar.trading_day = bar.datetime.strftime('%Y-%m-%d') diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py index fdfb73f8..a1496447 100644 --- a/vnpy/app/cta_strategy_pro/template.py +++ b/vnpy/app/cta_strategy_pro/template.py @@ -1512,6 +1512,14 @@ class CtaProFutureTemplate(CtaProTemplate): self.write_log(u'{}涨停,不做cover'.format(order_vt_symbol)) return + pos = self.cta_engine.get_position_holding(vt_symbol=order_vt_symbol) + if pos is None: + self.write_error(f'{self.strategy_name}无法获取{order_vt_symbol}的持仓信息,无法平仓') + return + if pos.short_pos < order_volume: + self.write_error(f'{self.strategy_name}{order_vt_symbol}的持仓空单{pos.short_pos}不满足平仓{order_volume}要求,无法平仓') + return + # 发送委托 vt_orderids = self.cover(price=cover_price, volume=order_volume, @@ -1551,6 +1559,13 @@ class CtaProFutureTemplate(CtaProTemplate): self.write_log(u'{}涨停,不做sell'.format(order_vt_symbol)) return + pos = self.cta_engine.get_position_holding(vt_symbol=order_vt_symbol) + if pos is None: + self.write_error(f'{self.strategy_name}无法获取{order_vt_symbol}的持仓信息,无法平仓') + return + if pos.long_pos < order_volume: + self.write_error(f'{self.strategy_name}{order_vt_symbol}的持仓多单{pos.long_pos}不满足平仓{order_volume}要求,无法平仓') + return # 发送委托 vt_orderids = self.sell(price=sell_price, volume=order_volume, diff --git a/vnpy/app/cta_strategy_pro/ui/widget.py b/vnpy/app/cta_strategy_pro/ui/widget.py index 269bb586..ceec637d 100644 --- a/vnpy/app/cta_strategy_pro/ui/widget.py +++ b/vnpy/app/cta_strategy_pro/ui/widget.py @@ -1,3 +1,4 @@ +import os from vnpy.event import Event, EventEngine from vnpy.trader.engine import MainEngine from vnpy.trader.ui import QtCore, QtGui, QtWidgets @@ -295,7 +296,9 @@ class StrategyManager(QtWidgets.QFrame): if snapshot is None: return ui_snapshot = UiSnapshot() - ui_snapshot.show(snapshot_file="", d=snapshot) + trade_csv = os.path.abspath(os.path.join(self.cta_engine.get_data_path(), f'{self.strategy_name}_trade.csv')) + tns_csv = os.path.abspath(os.path.join(self.cta_engine.get_data_path(), f'{self.strategy_name}_tns.csv')) + ui_snapshot.show(snapshot_file="", d=snapshot, trade_file=trade_csv, tns_file=tns_csv) class DataMonitor(QtWidgets.QTableWidget): """ diff --git a/vnpy/component/cta_fund_kline.py b/vnpy/component/cta_fund_kline.py index dc7e597f..279045dd 100644 --- a/vnpy/component/cta_fund_kline.py +++ b/vnpy/component/cta_fund_kline.py @@ -342,7 +342,7 @@ class FundKline(object): return all_holding_profit, holded def on_bar(self, *args, **kwargs): - if self.onbar_callback and len(args) > 0: + if self.onbar_callback and (len(args) > 0 or len(kwargs) > 0): try: self.onbar_callback(*args, **kwargs) except Exception as ex: diff --git a/vnpy/component/cta_renko_bar.py b/vnpy/component/cta_renko_bar.py index 639d0b2a..3b7da59a 100644 --- a/vnpy/component/cta_renko_bar.py +++ b/vnpy/component/cta_renko_bar.py @@ -1610,8 +1610,10 @@ class CtaRenkoBar(object): """ if self.para_ma1_len <=0 and self.para_ma2_len <=0 and self.para_ma3_len <= 0: return - - rt_close_array = np.append(self.close_array, [self.cur_bar.close_price]) + if self.cur_bar: + rt_close_array = np.append(self.close_array, [self.cur_bar.close_price]) + else: + rt_close_array = self.close_array if self.para_ma1_len > 0: count_len = min(self.bar_len, self.para_ma1_len) diff --git a/vnpy/data/tdx/tdx_future_data.py b/vnpy/data/tdx/tdx_future_data.py index 693f4971..90fa64d5 100644 --- a/vnpy/data/tdx/tdx_future_data.py +++ b/vnpy/data/tdx/tdx_future_data.py @@ -104,6 +104,8 @@ class TdxFutureData(object): self.symbol_exchange_dict = {} # tdx合约与vn交易所的字典 self.symbol_market_dict = copy.copy(INIT_TDX_MARKET_MAP) # tdx合约与tdx市场的字典 self.strategy = strategy + + # 所有期货合约的本地缓存 self.future_contracts = get_future_contracts() def write_log(self, content): @@ -503,17 +505,21 @@ class TdxFutureData(object): self.connect(is_reconnect=True) return results - def get_mi_contracts2(self): """ 获取主力合约""" self.connect() contracts = [] for exchange in Vn_Tdx_Exchange_Map.keys(): + self.write_log(f'查询{exchange.value}') contracts.extend(self.get_mi_contracts_from_exchange(exchange)) + # 合约的持仓、主力合约清单发生变化,需要更新 + save_future_contracts(self.future_contracts) + return contracts def get_mi_contracts_from_exchange(self, exchange): + """获取主力合约""" contracts = self.get_contracts(exchange) if len(contracts) == 0: @@ -529,16 +535,70 @@ class TdxFutureData(object): code = contract.get('code') if code[-2:] in ['L9', 'L8', 'L0', 'L1', 'L2', 'L3', '50'] or \ (exchange == Exchange.CFFEX and code[-3:] in ['300', '500']): + #self.write_log(f'过滤:{exchange.value}:{code}') continue short_symbol = get_underlying_symbol(code).upper() contract_list = short_contract_dict.get(short_symbol, []) contract_list.append(contract) short_contract_dict.update({short_symbol: contract_list}) + # { 短合约: [合约的最新quote行情] } for k, v in short_contract_dict.items(): - sorted_list = sorted(v, key=lambda c: c['ZongLiang']) + if len(v) == 0: + self.write_error(f'{k}合约对应的所有合约为空') + continue + + # 缓存的期货合约配置 + cache_info = self.future_contracts.get(k, {}) + # 缓存的所有当前合约清单 + cache_symbols = cache_info.get('symbols', []) + new_symbols = sorted([c.get('code') for c in v]) + + # 检查交易所是否一致 + cache_exchange = cache_info.get('exchange', '') + if len(cache_exchange) > 0 and cache_exchange != exchange.value: + if not (cache_exchange == 'INE' and exchange == Exchange.SHFE): + continue + + # 判断前置条件1:缓存的清单数量, + if len(cache_symbols) > 0: + if len(new_symbols) < len(cache_symbols) * 0.8: + self.write_error(f'查询的期货合约{new_symbols} 总数小于 缓存 {cache_symbols} 的80%数量,不做处理') + continue + + # 判断前置条件2: + cache_mi_symbol = cache_info.get('full_symbol') + # 之前的主力合约不在当前所有合约清单中 + if cache_mi_symbol and cache_mi_symbol not in new_symbols: + # 之前的主力合约,必须小于所有的合约 + if not all([cache_mi_symbol 0: + cache_info.update({'symbols': new_symbols}) + + self.future_contracts.update({k: cache_info}) + # 更新 + mi_contracts.append(select_data) - mi_contracts.append(sorted_list[-1]) return mi_contracts diff --git a/vnpy/data/tdx/test_tdx_future.py b/vnpy/data/tdx/test_tdx_future.py index 24afe61d..d2201312 100644 --- a/vnpy/data/tdx/test_tdx_future.py +++ b/vnpy/data/tdx/test_tdx_future.py @@ -53,7 +53,7 @@ api_01.qry_instrument() # 获取历史分钟线 """ ret,bars = api_01.get_bars('I2001', period='1min', callback=t1.display_bar, start_dt=datetime.now().replace(hour=0,minute=0,second=0,microsecond=0)) -line_close_oi = [{'close':x.close,'oi':x.openInterest} for x in bars] +line_close_oi = [{'close':x.close_price,'oi':x.open_interest} for x in bars] import pandas as pd df = pd.DataFrame(line_close_oi) corr = df.corr() diff --git a/vnpy/gateway/futu/futu_gateway.py b/vnpy/gateway/futu/futu_gateway.py index d692c8f2..f23f5aea 100644 --- a/vnpy/gateway/futu/futu_gateway.py +++ b/vnpy/gateway/futu/futu_gateway.py @@ -3,11 +3,13 @@ Please install futu-api before use. """ from copy import copy +from collections import OrderedDict from datetime import datetime from threading import Thread from time import sleep from futu import ( + KLType, ModifyOrderOp, TrdSide, TrdEnv, @@ -26,8 +28,9 @@ from futu import ( from vnpy.trader.constant import Direction, Exchange, Product, Status from vnpy.trader.event import EVENT_TIMER -from vnpy.trader.gateway import BaseGateway +from vnpy.trader.gateway import BaseGateway, LocalOrderManager from vnpy.trader.object import ( + BarData, TickData, OrderData, TradeData, @@ -36,7 +39,9 @@ from vnpy.trader.object import ( PositionData, SubscribeRequest, OrderRequest, - CancelRequest + CancelRequest, + HistoryRequest, + Interval ) EXCHANGE_VT2FUTU = { @@ -73,23 +78,31 @@ STATUS_FUTU2VT = { OrderStatus.DISABLED: Status.CANCELLED, } +KLTYPE_MINUTES = [1, 3, 5, 15, 30, 60] + class FutuGateway(BaseGateway): - """""" + """ + 富途证券API + # 网络访问路径: vnpy=>FutuGateway=>FutuOpenD 本地客户端[端口11111] => 富途证券 + # FutuOpenD下载地址 https://www.futunn.com/download/openAPI?lang=zh-CN + # windows: 安装完毕后,使用客户端登录=》短信验证=》建立本地11111端口侦听 + """ default_setting = { - "密码": "", + "密码": "", # 交易密码 "地址": "127.0.0.1", "端口": 11111, "市场": ["HK", "US"], "环境": [TrdEnv.REAL, TrdEnv.SIMULATE], } + # 支持的交易所清单 exchanges = list(EXCHANGE_FUTU2VT.values()) - def __init__(self, event_engine): + def __init__(self, event_engine, gateway_name="FUTU"): """Constructor""" - super(FutuGateway, self).__init__(event_engine, "FUTU") + super(FutuGateway, self).__init__(event_engine, gateway_name) self.quote_ctx = None self.trade_ctx = None @@ -104,6 +117,9 @@ class FutuGateway(BaseGateway): self.trades = set() self.contracts = {} + # 引入本地委托单号《=》接口委托单号的管理 + self.order_manager = LocalOrderManager(gateway=self, order_prefix='', order_rjust=4) + self.thread = Thread(target=self.query_data) # For query function. @@ -126,6 +142,7 @@ class FutuGateway(BaseGateway): def query_data(self): """ + 使用异步线程单独查询 Query all data necessary. """ sleep(2.0) # Wait 2 seconds till connection completed. @@ -140,7 +157,7 @@ class FutuGateway(BaseGateway): self.event_engine.register(EVENT_TIMER, self.process_timer_event) def process_timer_event(self, event): - """""" + """定时器""" self.count += 1 if self.count < self.interval: return @@ -152,12 +169,16 @@ class FutuGateway(BaseGateway): def connect_quote(self): """ Connect to market data server. + 连接行情服务器 """ + self.quote_ctx = OpenQuoteContext(self.host, self.port) + # 股票行情处理的实现 class QuoteHandler(StockQuoteHandlerBase): gateway = self + # 处理信息回调 =》 gateway.process_quote def on_recv_rsp(self, rsp_str): ret_code, content = super(QuoteHandler, self).on_recv_rsp( rsp_str @@ -167,9 +188,11 @@ class FutuGateway(BaseGateway): self.gateway.process_quote(content) return RET_OK, content + # 订单簿的实现 class OrderBookHandler(OrderBookHandlerBase): gateway = self + # 处理订单簿信息流回调 => gateway.process_orderbook def on_recv_rsp(self, rsp_str): ret_code, content = super(OrderBookHandler, self).on_recv_rsp( rsp_str @@ -179,6 +202,7 @@ class FutuGateway(BaseGateway): self.gateway.process_orderbook(content) return RET_OK, content + # 绑定两个实现方法 self.quote_ctx.set_handler(QuoteHandler()) self.quote_ctx.set_handler(OrderBookHandler()) self.quote_ctx.start() @@ -188,6 +212,7 @@ class FutuGateway(BaseGateway): def connect_trade(self): """ Connect to trade server. + 连接交易服务器 """ # Initialize context according to market. if self.market == "US": @@ -196,9 +221,11 @@ class FutuGateway(BaseGateway): self.trade_ctx = OpenHKTradeContext(self.host, self.port) # Implement handlers. + # 订单回报的实现 class OrderHandler(TradeOrderHandlerBase): gateway = self + # 订单回报流 =》gateway.process_order def on_recv_rsp(self, rsp_str): ret_code, content = super(OrderHandler, self).on_recv_rsp( rsp_str @@ -208,9 +235,11 @@ class FutuGateway(BaseGateway): self.gateway.process_order(content) return RET_OK, content + # 交易回报的实现 class DealHandler(TradeDealHandlerBase): gateway = self + # 成交回报流 =》 gateway.process_deal def on_recv_rsp(self, rsp_str): ret_code, content = super(DealHandler, self).on_recv_rsp( rsp_str @@ -221,6 +250,7 @@ class FutuGateway(BaseGateway): return RET_OK, content # Unlock to allow trading. + # 解锁交易接口 code, data = self.trade_ctx.unlock_trade(self.password) if code == RET_OK: self.write_log("交易接口解锁成功") @@ -228,13 +258,14 @@ class FutuGateway(BaseGateway): self.write_log(f"交易接口解锁失败,原因:{data}") # Start context. + # 绑定订单回报、成交回报 self.trade_ctx.set_handler(OrderHandler()) self.trade_ctx.set_handler(DealHandler()) self.trade_ctx.start() self.write_log("交易接口连接成功") def subscribe(self, req: SubscribeRequest): - """""" + """订阅行情""" for data_type in ["QUOTE", "ORDER_BOOK"]: futu_symbol = convert_symbol_vt2futu(req.symbol, req.exchange) code, data = self.quote_ctx.subscribe(futu_symbol, data_type, True) @@ -242,8 +273,177 @@ class FutuGateway(BaseGateway): if code: self.write_log(f"订阅行情失败:{data}") + def query_history(self, req: HistoryRequest): + """查询某只股票的历史K线数据""" + history = [] + limit = 60 + + if req.interval not in [Interval.MINUTE, Interval.DAILY]: + self.write_error(f'查询股票历史范围,本接口只支持分钟/日线') + return history + + futu_code = '{}.{}'.format(EXCHANGE_VT2FUTU.get(req.exchange), req.symbol) + + if req.interval == Interval.MINUTE: + if req.interval_num not in KLTYPE_MINUTES: + self.write_error(f'查询股票历史范围,请求分钟数{req.interval_num}不在范围:{KLTYPE_MINUTES}') + return history + k_type = f'K_{req.interval_num}M' + else: + if req.interval_num != 1: + self.write_error(f'查询股票历史范围,请求日线{req.interval_num}只能是1') + return history + k_type = KLType.K_DAY + start_date = req.start.strftime('%Y-%m-%d') + end_date = req.end.strftime('%Y-%m-%d') if req.end else None + + ret, df, page_req_key = self.quote_ctx.request_history_kline( + code=futu_code, + ktype=k_type, + start=start_date, + end=end_date, + max_count=limit) # 每页5个,请求第一页 + if ret == RET_OK: + for index, row in df.iterrows(): + symbol = row['code'] + str_time = row['time_key'] + dt = datetime.strptime(str_time, '%Y-%m-%d %H:%M:%S') + bar = BarData( + gateway_name=self.gateway_name, + symbol=row['code'], + exchange=req.exchange, + datetime=dt, + trading_day=dt.strftime('%Y-%m-%d'), + interval=req.interval, + interval_num=req.interval_num, + volume=row['volume'], + open_price=float(row['open']), + high_price=float(row['high']), + low_price=float(row['low']), + close_price=float(row['close']) + ) + history.append(bar) + else: + return history + while page_req_key != None: # 请求后面的所有结果 + ret, df, page_req_key = self.quote_ctx.request_history_kline( + code=futu_code, + ktype=k_type, + start=start_date, + end=end_date, + page_req_key=page_req_key) # 请求翻页后的数据 + if ret == RET_OK: + for index, row in df.iterrows(): + symbol = row['code'] + str_time = row['time_key'] + dt = datetime.strptime(str_time, '%Y-%m-%d %H:%M:%S') + bar = BarData( + gateway_name=self.gateway_name, + symbol=row['code'], + exchange=req.exchange, + datetime=dt, + trading_day=dt.strftime('%Y-%m-%d'), + interval=req.interval, + interval_num=req.interval_num, + volume=row['volume'], + open_price=float(row['open']), + high_price=float(row['high']), + low_price=float(row['low']), + close_price=float(row['close']) + ) + history.append(bar) + + return history + + def download_bars(self, req: HistoryRequest): + """获取某只股票的历史K线数据""" + history = [] + limit = 60 + + if req.interval not in [Interval.MINUTE, Interval.DAILY]: + self.write_error(f'查询股票历史范围,本接口只支持分钟/日线') + return history + + futu_code = '{}.{}'.format(EXCHANGE_VT2FUTU.get(req.exchange), req.symbol) + + if req.interval == Interval.MINUTE: + if req.interval_num not in KLTYPE_MINUTES: + self.write_error(f'查询股票历史范围,请求分钟数{req.interval_num}不在范围:{KLTYPE_MINUTES}') + return history + k_type = f'K_{req.interval_num}M' + else: + if req.interval_num != 1: + self.write_error(f'查询股票历史范围,请求日线{req.interval_num}只能是1') + return history + k_type = KLType.K_DAY + start_date = req.start.strftime('%Y-%m-%d') + end_date = req.end.strftime('%Y-%m-%d') if req.end else None + + ret, df, page_req_key = self.quote_ctx.request_history_kline( + code=futu_code, + ktype=k_type, + start=start_date, + end=end_date, + max_count=limit) # 每页5个,请求第一页 + if ret == RET_OK: + for index, row in df.iterrows(): + symbol = row['code'] + str_time = row['time_key'] + dt = datetime.strptime(str_time, '%Y-%m-%d %H:%M:%S') + bar = OrderedDict({ + "datetime": str_time, + "open": float(row['open']), + "close": float(row['close']), + "high": float(row['high']), + "low": float(row['low']), + "volume": row['volume'], + "amount": row['turnover'], + "symbol": row['code'], + "trading_date": dt.strftime('%Y-%m-%d'), + "date": dt.strftime('%Y-%m-%d'), + "time": dt.strftime('%H:%M:%S'), + "pre_close": float(row['last_close']), + "turnover_rate": float(row.get('turnover_rate', 0)), + "change_rate": float(row.get('change_rate', 0)) + + }) + history.append(bar) + else: + return history + while page_req_key != None: # 请求后面的所有结果 + ret, df, page_req_key = self.quote_ctx.request_history_kline( + code=futu_code, + ktype=k_type, + start=start_date, + end=end_date, + page_req_key=page_req_key) # 请求翻页后的数据 + if ret == RET_OK: + for index, row in df.iterrows(): + symbol = row['code'] + str_time = row['time_key'] + dt = datetime.strptime(str_time, '%Y-%m-%d %H:%M:%S') + bar = OrderedDict({ + "datetime": str_time, + "open": float(row['open']), + "close": float(row['close']), + "high": float(row['high']), + "low": float(row['low']), + "volume": row['volume'], + "amount": row['turnover'], + "symbol": row['code'], + "trading_date": dt.strftime('%Y-%m-%d'), + "date": dt.strftime('%Y-%m-%d'), + "time": dt.strftime('%H:%M:%S'), + "pre_close": float(row['last_close']), + "turnover_rate": float(row.get('turnover_rate', 0)), + "change_rate": float(row.get('change_rate', 0)) + }) + history.append(bar) + + return history + def send_order(self, req: OrderRequest): - """""" + """发送委托""" side = DIRECTION_VT2FUTU[req.direction] futu_order_type = OrderType.NORMAL # Only limit order is supported. @@ -254,6 +454,19 @@ class FutuGateway(BaseGateway): adjust_limit = -0.05 futu_symbol = convert_symbol_vt2futu(req.symbol, req.exchange) + + # 港股交易手数为整数 + if req.exchange == Exchange.SEHK: + self.write_log(f'交易手数:{req.volume}=>{int(req.volume)}') + req.volume = int(req.volume) + + local_orderid = self.order_manager.new_local_orderid() + order = req.create_order_data(local_orderid, self.gateway_name) + + # 发出委托确认 + order.status = Status.SUBMITTING + self.order_manager.on_order(order) + code, data = self.trade_ctx.place_order( req.price, req.volume, @@ -266,23 +479,59 @@ class FutuGateway(BaseGateway): if code: self.write_log(f"委托失败:{data}") + order.status = Status.REJECTED + self.order_manager.on_order(order) return "" + sys_orderid = "" for ix, row in data.iterrows(): - orderid = str(row["order_id"]) + sys_orderid = str(row.get("order_id","")) + if len(sys_orderid) > 0: + self.write_log(f'系统委托号:{sys_orderid}') + break + + if len(sys_orderid) == 0: + order.status = Status.REJECTED + self.order_manager.on_order(order) + return "" + + # 绑定 系统委托号 + order.sys_orderid = sys_orderid + order.status = Status.NOTTRADED + self.order_manager.update_orderid_map(local_orderid, sys_orderid) + # 更新订单为已委托 + self.order_manager.on_order(copy(order)) - order = req.create_order_data(orderid, self.gateway_name) - self.on_order(order) return order.vt_orderid def cancel_order(self, req: CancelRequest): """""" + + order = self.order_manager.get_order_with_local_orderid(req.orderid) + + # 更新订单委托状态为正在撤销 + if order: + if order.status in [Status.REJECTED, Status.ALLTRADED, Status.CANCELLED]: + self.write_error(f'委托单:{req.orderid},状态已经是:{order.status},不能撤单') + return False + + order.status = Status.CANCELLING + self.order_manager.on_order(order) + sys_orderid = order.sys_orderid + else: + sys_orderid = req.orderid + + # 向接口发出撤单请求 code, data = self.trade_ctx.modify_order( - ModifyOrderOp.CANCEL, req.orderid, 0, 0, trd_env=self.env + ModifyOrderOp.CANCEL, sys_orderid, 0, 0, trd_env=self.env ) if code: self.write_log(f"撤单失败:{data}") + return False + else: + self.write_log(f'成功发出撤单请求:orderid={req.orderid},sys_orderid:{sys_orderid}') + return True def query_contract(self): """""" @@ -291,6 +540,8 @@ class FutuGateway(BaseGateway): self.market, futu_product ) + self.write_log(f'开始查询{futu_product}市场的合约清单') + if code: self.write_log(f"查询合约信息失败:{data}") return @@ -305,6 +556,7 @@ class FutuGateway(BaseGateway): size=1, pricetick=0.001, net_position=True, + history_data=True, gateway_name=self.gateway_name, ) self.on_contract(contract) @@ -459,38 +711,73 @@ class FutuGateway(BaseGateway): continue symbol, exchange = convert_symbol_futu2vt(row["code"]) - order = OrderData( - symbol=symbol, - exchange=exchange, - orderid=str(row["order_id"]), - direction=DIRECTION_FUTU2VT[row["trd_side"]], - price=float(row["price"]), - volume=row["qty"], - traded=row["dealt_qty"], - status=STATUS_FUTU2VT[row["order_status"]], - time=row["create_time"].split(" ")[-1], - gateway_name=self.gateway_name, - ) - self.on_order(order) + # 获取系统委托编号 + sys_orderid = str(row["order_id"]) + + # 系统委托变化=》 缓存 order + order = self.order_manager.get_order_with_sys_orderid(sys_orderid) + + if order is None: + # 本地委托 《=》系统委托号 + local_orderid = self.order_manager.get_local_orderid(sys_orderid) + + # 创建本地order缓存 + order = OrderData( + symbol=symbol, + exchange=exchange, + orderid=local_orderid, + sys_orderid=sys_orderid, + direction=DIRECTION_FUTU2VT[row["trd_side"]], + price=float(row["price"]), + volume=row["qty"], + traded=row["dealt_qty"], + status=STATUS_FUTU2VT[row["order_status"]], + time=row["create_time"].split(" ")[-1], + gateway_name=self.gateway_name, + ) + self.write_log(f'新建委托单缓存=>{order.__dict__}') + self.order_manager.on_order(copy(order)) + else: + # 缓存order存在,判断状态、成交数量是否发生变化 + changed = False + order_status = STATUS_FUTU2VT[row["order_status"]] + if order.status != order_status: + order.status = order_status + changed = True + if order.traded != row["dealt_qty"]: + order.traded = row["dealt_qty"] + changed = True + if changed: + self.write_log(f'委托单更新=>{order.__dict__}') + self.order_manager.on_order(copy(order)) def process_deal(self, data): """ Process trade data for both query and update. """ for ix, row in data.iterrows(): + # 系统委托编号 tradeid = str(row["deal_id"]) if tradeid in self.trades: continue + self.trades.add(tradeid) symbol, exchange = convert_symbol_futu2vt(row["code"]) + + # 系统委托号 + sys_orderid = row["order_id"] + # 本地委托号 + local_orderid = self.order_manager.get_local_orderid(sys_orderid) + trade = TradeData( symbol=symbol, exchange=exchange, direction=DIRECTION_FUTU2VT[row["trd_side"]], tradeid=tradeid, - orderid=row["order_id"], + orderid=local_orderid, + sys_orderid=sys_orderid, price=float(row["price"]), volume=row["qty"], time=row["create_time"].split(" ")[-1], diff --git a/vnpy/trader/constant.py b/vnpy/trader/constant.py index 1a3bc789..cb43a3d2 100644 --- a/vnpy/trader/constant.py +++ b/vnpy/trader/constant.py @@ -123,6 +123,7 @@ class Exchange(Enum): TOCOM = "TOCOM" # Tokyo Commodity Exchange EUNX = "EUNX" # Euronext Exchange KRX = "KRX" # Korean Exchange + AMEX = "AMEX" # NESE American OANDA = "OANDA" # oanda.com diff --git a/vnpy/trader/engine.py b/vnpy/trader/engine.py index 4db59f6a..9ec9c6b1 100644 --- a/vnpy/trader/engine.py +++ b/vnpy/trader/engine.py @@ -264,6 +264,7 @@ class MainEngine: if gateway: return gateway.query_history(req) else: + self.write_log(f'网关为空,请检查合约得网关是否与连接得网关一致') return None def close(self) -> None: