[bug fix]

This commit is contained in:
msincenselee 2020-06-17 21:27:34 +08:00
parent 3c503d5958
commit 0d0a7818e1
14 changed files with 431 additions and 47 deletions

View File

@ -365,12 +365,12 @@ class AlgoEngine(BaseEngine):
if len(vt_accountid) > 0: if len(vt_accountid) > 0:
account = self.main_engine.get_account(vt_accountid) account = self.main_engine.get_account(vt_accountid)
return account.balance, account.avaliable, round(account.frozen * 100 / (account.balance + 0.01), 2), 100 return account.balance, account.available, round(account.frozen * 100 / (account.balance + 0.01), 2), 100
else: else:
accounts = self.main_engine.get_all_accounts() accounts = self.main_engine.get_all_accounts()
if len(accounts) > 0: if len(accounts) > 0:
account = accounts[0] account = accounts[0]
return account.balance, account.avaliable, round(account.frozen * 100 / (account.balance + 0.01), return account.balance, account.available, round(account.frozen * 100 / (account.balance + 0.01),
2), 100 2), 100
else: else:
return 0, 0, 0, 0 return 0, 0, 0, 0

View File

@ -160,7 +160,7 @@ class BackTestingEngine(object):
self.net_capital = self.init_capital # 实时资金净值每日根据capital和持仓浮盈计算 self.net_capital = self.init_capital # 实时资金净值每日根据capital和持仓浮盈计算
self.max_capital = self.init_capital # 资金最高净值 self.max_capital = self.init_capital # 资金最高净值
self.max_net_capital = self.init_capital self.max_net_capital = self.init_capital
self.avaliable = self.init_capital self.available = self.init_capital
self.max_pnl = 0 # 最高盈利 self.max_pnl = 0 # 最高盈利
self.min_pnl = 0 # 最大亏损 self.min_pnl = 0 # 最大亏损
@ -257,7 +257,7 @@ class BackTestingEngine(object):
if self.net_capital == 0.0: if self.net_capital == 0.0:
self.percent = 0.0 self.percent = 0.0
return self.net_capital, self.avaliable, self.percent, self.percent_limit return self.net_capital, self.available, self.percent, self.percent_limit
def set_test_start_date(self, start_date: str = '20100416', init_days: int = 10): def set_test_start_date(self, start_date: str = '20100416', init_days: int = 10):
"""设置回测的启动日期""" """设置回测的启动日期"""
@ -290,7 +290,7 @@ class BackTestingEngine(object):
self.net_capital = capital # 实时资金净值每日根据capital和持仓浮盈计算 self.net_capital = capital # 实时资金净值每日根据capital和持仓浮盈计算
self.max_capital = capital # 资金最高净值 self.max_capital = capital # 资金最高净值
self.max_net_capital = capital self.max_net_capital = capital
self.avaliable = capital self.available = capital
self.init_capital = capital self.init_capital = capital
def set_margin_rate(self, vt_symbol: str, margin_rate: float): def set_margin_rate(self, vt_symbol: str, margin_rate: float):
@ -1705,7 +1705,7 @@ class BackTestingEngine(object):
0) 0)
# 可用资金 = 当前净值 - 占用保证金 # 可用资金 = 当前净值 - 占用保证金
self.avaliable = self.net_capital - occupy_money self.available = self.net_capital - occupy_money
# 当前保证金占比 # 当前保证金占比
self.percent = round(float(occupy_money * 100 / self.net_capital), 2) self.percent = round(float(occupy_money * 100 / self.net_capital), 2)
# 更新最大保证金占比 # 更新最大保证金占比
@ -1759,7 +1759,7 @@ class BackTestingEngine(object):
self.write_log(msg) self.write_log(msg)
# 重新计算一次avaliable # 重新计算一次avaliable
self.avaliable = self.net_capital - occupy_money self.available = self.net_capital - occupy_money
self.percent = round(float(occupy_money * 100 / self.net_capital), 2) self.percent = round(float(occupy_money * 100 / self.net_capital), 2)
def saving_daily_data(self, d, c, m, commission, benchmark=0): def saving_daily_data(self, d, c, m, commission, benchmark=0):

View File

@ -220,9 +220,10 @@ class CtaEngine(BaseEngine):
all_strategy_pos = self.get_all_strategy_pos() all_strategy_pos = self.get_all_strategy_pos()
# 每5分钟检查一次 # 每5分钟检查一次
if dt.minute % 5 == 0: if dt.minute % 10 == 0:
# 比对仓位,使用上述获取得持仓信息,不用重复获取 # 比对仓位,使用上述获取得持仓信息,不用重复获取
self.compare_pos(strategy_pos_list=copy(all_strategy_pos)) #self.compare_pos(strategy_pos_list=copy(all_strategy_pos))
pass
# 推送到事件 # 推送到事件
self.put_all_strategy_pos_event(all_strategy_pos) self.put_all_strategy_pos_event(all_strategy_pos)
@ -1777,8 +1778,10 @@ class CtaEngine(BaseEngine):
self.logger.log(level, msg) self.logger.log(level, msg)
# 如果日志数据异常错误和告警输出至sys.stderr # 如果日志数据异常错误和告警输出至sys.stderr
if level in [logging.CRITICAL, logging.ERROR, logging.WARNING]: if level in [logging.CRITICAL]:
print(f"{strategy_name}: {msg}" if strategy_name else msg, file=sys.stderr) log_msg = f"{strategy_name}: {msg}" if strategy_name else msg
print(log_msg, file=sys.stderr)
send_wx_msg(log_msg)
def write_error(self, msg: str, strategy_name: str = ''): def write_error(self, msg: str, strategy_name: str = ''):
"""写入错误日志""" """写入错误日志"""

View File

@ -875,11 +875,14 @@ class CtaStockTemplate(CtaTemplate):
# 多单网格逐一止损/止盈检查: # 多单网格逐一止损/止盈检查:
long_grids = self.gt.get_opened_grids(direction=Direction.LONG) long_grids = self.gt.get_opened_grids(direction=Direction.LONG)
for lg in long_grids: for lg in long_grids:
if lg.close_status or lg.order_status or not lg.open_status: if lg.close_status or lg.order_status or not lg.open_status:
continue continue
cur_price = self.cta_engine.get_price(lg.vt_symbol) cur_price = self.cta_engine.get_price(lg.vt_symbol)
if cur_price is None:
self.write_log(f'没有获取到{lg.vt_symbol}的当前价格,提交订阅')
self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=lg.vt_symbol)
continue
# 主动止盈 # 主动止盈
if 0 < lg.close_price <= cur_price: if 0 < lg.close_price <= cur_price:
@ -1004,6 +1007,11 @@ class CtaStockTemplate(CtaTemplate):
# 实盘运行时,要加入市场买卖量的判断 # 实盘运行时,要加入市场买卖量的判断
if not self.backtesting: if not self.backtesting:
symbol_tick = self.cta_engine.get_tick(vt_symbol) symbol_tick = self.cta_engine.get_tick(vt_symbol)
if symbol_tick is None:
self.cta_engine.subscribe_symbol(strategy_name=self.strategy_name, vt_symbol=vt_symbol)
self.write_log(f'获取不到{vt_symbol}得tick,无法根据市场深度进行计算')
return
symbol_volume_tick = self.cta_engine.get_volume_tick(vt_symbol) symbol_volume_tick = self.cta_engine.get_volume_tick(vt_symbol)
# 根据市场计算前5档买单数量 # 根据市场计算前5档买单数量
if all([symbol_tick.ask_volume_1, symbol_tick.ask_volume_2, symbol_tick.ask_volume_3, if all([symbol_tick.ask_volume_1, symbol_tick.ask_volume_2, symbol_tick.ask_volume_3,

View File

@ -920,6 +920,10 @@ class CtaEngine(BaseEngine):
) )
bars = self.main_engine.query_history(req, contract.gateway_name) bars = self.main_engine.query_history(req, contract.gateway_name)
if bars is None:
self.write_error(f'获取不到历史K线:{req.__dict__}')
return
for bar in bars: for bar in bars:
if bar.trading_day: if bar.trading_day:
bar.trading_day = bar.datetime.strftime('%Y-%m-%d') bar.trading_day = bar.datetime.strftime('%Y-%m-%d')

View File

@ -1512,6 +1512,14 @@ class CtaProFutureTemplate(CtaProTemplate):
self.write_log(u'{}涨停不做cover'.format(order_vt_symbol)) self.write_log(u'{}涨停不做cover'.format(order_vt_symbol))
return return
pos = self.cta_engine.get_position_holding(vt_symbol=order_vt_symbol)
if pos is None:
self.write_error(f'{self.strategy_name}无法获取{order_vt_symbol}的持仓信息,无法平仓')
return
if pos.short_pos < order_volume:
self.write_error(f'{self.strategy_name}{order_vt_symbol}的持仓空单{pos.short_pos}不满足平仓{order_volume}要求,无法平仓')
return
# 发送委托 # 发送委托
vt_orderids = self.cover(price=cover_price, vt_orderids = self.cover(price=cover_price,
volume=order_volume, volume=order_volume,
@ -1551,6 +1559,13 @@ class CtaProFutureTemplate(CtaProTemplate):
self.write_log(u'{}涨停不做sell'.format(order_vt_symbol)) self.write_log(u'{}涨停不做sell'.format(order_vt_symbol))
return return
pos = self.cta_engine.get_position_holding(vt_symbol=order_vt_symbol)
if pos is None:
self.write_error(f'{self.strategy_name}无法获取{order_vt_symbol}的持仓信息,无法平仓')
return
if pos.long_pos < order_volume:
self.write_error(f'{self.strategy_name}{order_vt_symbol}的持仓多单{pos.long_pos}不满足平仓{order_volume}要求,无法平仓')
return
# 发送委托 # 发送委托
vt_orderids = self.sell(price=sell_price, vt_orderids = self.sell(price=sell_price,
volume=order_volume, volume=order_volume,

View File

@ -1,3 +1,4 @@
import os
from vnpy.event import Event, EventEngine from vnpy.event import Event, EventEngine
from vnpy.trader.engine import MainEngine from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import QtCore, QtGui, QtWidgets from vnpy.trader.ui import QtCore, QtGui, QtWidgets
@ -295,7 +296,9 @@ class StrategyManager(QtWidgets.QFrame):
if snapshot is None: if snapshot is None:
return return
ui_snapshot = UiSnapshot() ui_snapshot = UiSnapshot()
ui_snapshot.show(snapshot_file="", d=snapshot) trade_csv = os.path.abspath(os.path.join(self.cta_engine.get_data_path(), f'{self.strategy_name}_trade.csv'))
tns_csv = os.path.abspath(os.path.join(self.cta_engine.get_data_path(), f'{self.strategy_name}_tns.csv'))
ui_snapshot.show(snapshot_file="", d=snapshot, trade_file=trade_csv, tns_file=tns_csv)
class DataMonitor(QtWidgets.QTableWidget): class DataMonitor(QtWidgets.QTableWidget):
""" """

View File

@ -342,7 +342,7 @@ class FundKline(object):
return all_holding_profit, holded return all_holding_profit, holded
def on_bar(self, *args, **kwargs): def on_bar(self, *args, **kwargs):
if self.onbar_callback and len(args) > 0: if self.onbar_callback and (len(args) > 0 or len(kwargs) > 0):
try: try:
self.onbar_callback(*args, **kwargs) self.onbar_callback(*args, **kwargs)
except Exception as ex: except Exception as ex:

View File

@ -1610,8 +1610,10 @@ class CtaRenkoBar(object):
""" """
if self.para_ma1_len <=0 and self.para_ma2_len <=0 and self.para_ma3_len <= 0: if self.para_ma1_len <=0 and self.para_ma2_len <=0 and self.para_ma3_len <= 0:
return return
if self.cur_bar:
rt_close_array = np.append(self.close_array, [self.cur_bar.close_price]) rt_close_array = np.append(self.close_array, [self.cur_bar.close_price])
else:
rt_close_array = self.close_array
if self.para_ma1_len > 0: if self.para_ma1_len > 0:
count_len = min(self.bar_len, self.para_ma1_len) count_len = min(self.bar_len, self.para_ma1_len)

View File

@ -104,6 +104,8 @@ class TdxFutureData(object):
self.symbol_exchange_dict = {} # tdx合约与vn交易所的字典 self.symbol_exchange_dict = {} # tdx合约与vn交易所的字典
self.symbol_market_dict = copy.copy(INIT_TDX_MARKET_MAP) # tdx合约与tdx市场的字典 self.symbol_market_dict = copy.copy(INIT_TDX_MARKET_MAP) # tdx合约与tdx市场的字典
self.strategy = strategy self.strategy = strategy
# 所有期货合约的本地缓存
self.future_contracts = get_future_contracts() self.future_contracts = get_future_contracts()
def write_log(self, content): def write_log(self, content):
@ -503,17 +505,21 @@ class TdxFutureData(object):
self.connect(is_reconnect=True) self.connect(is_reconnect=True)
return results return results
def get_mi_contracts2(self): def get_mi_contracts2(self):
""" 获取主力合约""" """ 获取主力合约"""
self.connect() self.connect()
contracts = [] contracts = []
for exchange in Vn_Tdx_Exchange_Map.keys(): for exchange in Vn_Tdx_Exchange_Map.keys():
self.write_log(f'查询{exchange.value}')
contracts.extend(self.get_mi_contracts_from_exchange(exchange)) contracts.extend(self.get_mi_contracts_from_exchange(exchange))
# 合约的持仓、主力合约清单发生变化,需要更新
save_future_contracts(self.future_contracts)
return contracts return contracts
def get_mi_contracts_from_exchange(self, exchange): def get_mi_contracts_from_exchange(self, exchange):
"""获取主力合约"""
contracts = self.get_contracts(exchange) contracts = self.get_contracts(exchange)
if len(contracts) == 0: if len(contracts) == 0:
@ -529,16 +535,70 @@ class TdxFutureData(object):
code = contract.get('code') code = contract.get('code')
if code[-2:] in ['L9', 'L8', 'L0', 'L1', 'L2', 'L3', '50'] or \ if code[-2:] in ['L9', 'L8', 'L0', 'L1', 'L2', 'L3', '50'] or \
(exchange == Exchange.CFFEX and code[-3:] in ['300', '500']): (exchange == Exchange.CFFEX and code[-3:] in ['300', '500']):
#self.write_log(f'过滤:{exchange.value}:{code}')
continue continue
short_symbol = get_underlying_symbol(code).upper() short_symbol = get_underlying_symbol(code).upper()
contract_list = short_contract_dict.get(short_symbol, []) contract_list = short_contract_dict.get(short_symbol, [])
contract_list.append(contract) contract_list.append(contract)
short_contract_dict.update({short_symbol: contract_list}) short_contract_dict.update({short_symbol: contract_list})
# { 短合约: [合约的最新quote行情] }
for k, v in short_contract_dict.items(): for k, v in short_contract_dict.items():
sorted_list = sorted(v, key=lambda c: c['ZongLiang']) if len(v) == 0:
self.write_error(f'{k}合约对应的所有合约为空')
continue
# 缓存的期货合约配置
cache_info = self.future_contracts.get(k, {})
# 缓存的所有当前合约清单
cache_symbols = cache_info.get('symbols', [])
new_symbols = sorted([c.get('code') for c in v])
# 检查交易所是否一致
cache_exchange = cache_info.get('exchange', '')
if len(cache_exchange) > 0 and cache_exchange != exchange.value:
if not (cache_exchange == 'INE' and exchange == Exchange.SHFE):
continue
# 判断前置条件1缓存的清单数量
if len(cache_symbols) > 0:
if len(new_symbols) < len(cache_symbols) * 0.8:
self.write_error(f'查询的期货合约{new_symbols} 总数小于 缓存 {cache_symbols} 的80%数量,不做处理')
continue
# 判断前置条件2
cache_mi_symbol = cache_info.get('full_symbol')
# 之前的主力合约不在当前所有合约清单中
if cache_mi_symbol and cache_mi_symbol not in new_symbols:
# 之前的主力合约,必须小于所有的合约
if not all([cache_mi_symbol<symbol for symbol in new_symbols]):
self.write_error(f'前期主力合约{cache_mi_symbol}不在当前合约清单{new_symbols}中,又不是早期合约,不做处理')
continue
# 判断前置条件3
cache_oi = cache_info.get('open_interesting', 0)
# 根据总量排序
sorted_list = sorted(v, key=lambda c: c['ZongLiang'])
select_data = sorted_list[-1]
new_mi_symbol = select_data.get('code')
new_oi = select_data.get('ZongLiang', 0)
if new_oi <= 0:
self.write_error(f'{new_mi_symbol}合约总量为0 不做处理')
continue
if 0 < new_oi < cache_oi / 50 and new_mi_symbol != cache_mi_symbol:
self.write_error(f"新合约{new_mi_symbol}总量:{select_data.get('ZongLiang', 0)} 不到旧合约{cache_mi_symbol}持仓总量:{cache_oi}的一半,不处理")
continue
cache_info.update({'open_interesting': new_oi})
if len(new_symbols) > 0:
cache_info.update({'symbols': new_symbols})
self.future_contracts.update({k: cache_info})
# 更新
mi_contracts.append(select_data)
mi_contracts.append(sorted_list[-1])
return mi_contracts return mi_contracts

View File

@ -53,7 +53,7 @@ api_01.qry_instrument()
# 获取历史分钟线 # 获取历史分钟线
""" """
ret,bars = api_01.get_bars('I2001', period='1min', callback=t1.display_bar, start_dt=datetime.now().replace(hour=0,minute=0,second=0,microsecond=0)) ret,bars = api_01.get_bars('I2001', period='1min', callback=t1.display_bar, start_dt=datetime.now().replace(hour=0,minute=0,second=0,microsecond=0))
line_close_oi = [{'close':x.close,'oi':x.openInterest} for x in bars] line_close_oi = [{'close':x.close_price,'oi':x.open_interest} for x in bars]
import pandas as pd import pandas as pd
df = pd.DataFrame(line_close_oi) df = pd.DataFrame(line_close_oi)
corr = df.corr() corr = df.corr()

View File

@ -3,11 +3,13 @@ Please install futu-api before use.
""" """
from copy import copy from copy import copy
from collections import OrderedDict
from datetime import datetime from datetime import datetime
from threading import Thread from threading import Thread
from time import sleep from time import sleep
from futu import ( from futu import (
KLType,
ModifyOrderOp, ModifyOrderOp,
TrdSide, TrdSide,
TrdEnv, TrdEnv,
@ -26,8 +28,9 @@ from futu import (
from vnpy.trader.constant import Direction, Exchange, Product, Status from vnpy.trader.constant import Direction, Exchange, Product, Status
from vnpy.trader.event import EVENT_TIMER from vnpy.trader.event import EVENT_TIMER
from vnpy.trader.gateway import BaseGateway from vnpy.trader.gateway import BaseGateway, LocalOrderManager
from vnpy.trader.object import ( from vnpy.trader.object import (
BarData,
TickData, TickData,
OrderData, OrderData,
TradeData, TradeData,
@ -36,7 +39,9 @@ from vnpy.trader.object import (
PositionData, PositionData,
SubscribeRequest, SubscribeRequest,
OrderRequest, OrderRequest,
CancelRequest CancelRequest,
HistoryRequest,
Interval
) )
EXCHANGE_VT2FUTU = { EXCHANGE_VT2FUTU = {
@ -73,23 +78,31 @@ STATUS_FUTU2VT = {
OrderStatus.DISABLED: Status.CANCELLED, OrderStatus.DISABLED: Status.CANCELLED,
} }
KLTYPE_MINUTES = [1, 3, 5, 15, 30, 60]
class FutuGateway(BaseGateway): class FutuGateway(BaseGateway):
"""""" """
富途证券API
# 网络访问路径: vnpy=>FutuGateway=>FutuOpenD 本地客户端[端口11111] => 富途证券
# FutuOpenD下载地址 https://www.futunn.com/download/openAPI?lang=zh-CN
# windows 安装完毕后,使用客户端登录=》短信验证=》建立本地11111端口侦听
"""
default_setting = { default_setting = {
"密码": "", "密码": "", # 交易密码
"地址": "127.0.0.1", "地址": "127.0.0.1",
"端口": 11111, "端口": 11111,
"市场": ["HK", "US"], "市场": ["HK", "US"],
"环境": [TrdEnv.REAL, TrdEnv.SIMULATE], "环境": [TrdEnv.REAL, TrdEnv.SIMULATE],
} }
# 支持的交易所清单
exchanges = list(EXCHANGE_FUTU2VT.values()) exchanges = list(EXCHANGE_FUTU2VT.values())
def __init__(self, event_engine): def __init__(self, event_engine, gateway_name="FUTU"):
"""Constructor""" """Constructor"""
super(FutuGateway, self).__init__(event_engine, "FUTU") super(FutuGateway, self).__init__(event_engine, gateway_name)
self.quote_ctx = None self.quote_ctx = None
self.trade_ctx = None self.trade_ctx = None
@ -104,6 +117,9 @@ class FutuGateway(BaseGateway):
self.trades = set() self.trades = set()
self.contracts = {} self.contracts = {}
# 引入本地委托单号《=》接口委托单号的管理
self.order_manager = LocalOrderManager(gateway=self, order_prefix='', order_rjust=4)
self.thread = Thread(target=self.query_data) self.thread = Thread(target=self.query_data)
# For query function. # For query function.
@ -126,6 +142,7 @@ class FutuGateway(BaseGateway):
def query_data(self): def query_data(self):
""" """
使用异步线程单独查询
Query all data necessary. Query all data necessary.
""" """
sleep(2.0) # Wait 2 seconds till connection completed. sleep(2.0) # Wait 2 seconds till connection completed.
@ -140,7 +157,7 @@ class FutuGateway(BaseGateway):
self.event_engine.register(EVENT_TIMER, self.process_timer_event) self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def process_timer_event(self, event): def process_timer_event(self, event):
"""""" """定时器"""
self.count += 1 self.count += 1
if self.count < self.interval: if self.count < self.interval:
return return
@ -152,12 +169,16 @@ class FutuGateway(BaseGateway):
def connect_quote(self): def connect_quote(self):
""" """
Connect to market data server. Connect to market data server.
连接行情服务器
""" """
self.quote_ctx = OpenQuoteContext(self.host, self.port) self.quote_ctx = OpenQuoteContext(self.host, self.port)
# 股票行情处理的实现
class QuoteHandler(StockQuoteHandlerBase): class QuoteHandler(StockQuoteHandlerBase):
gateway = self gateway = self
# 处理信息回调 =》 gateway.process_quote
def on_recv_rsp(self, rsp_str): def on_recv_rsp(self, rsp_str):
ret_code, content = super(QuoteHandler, self).on_recv_rsp( ret_code, content = super(QuoteHandler, self).on_recv_rsp(
rsp_str rsp_str
@ -167,9 +188,11 @@ class FutuGateway(BaseGateway):
self.gateway.process_quote(content) self.gateway.process_quote(content)
return RET_OK, content return RET_OK, content
# 订单簿的实现
class OrderBookHandler(OrderBookHandlerBase): class OrderBookHandler(OrderBookHandlerBase):
gateway = self gateway = self
# 处理订单簿信息流回调 => gateway.process_orderbook
def on_recv_rsp(self, rsp_str): def on_recv_rsp(self, rsp_str):
ret_code, content = super(OrderBookHandler, self).on_recv_rsp( ret_code, content = super(OrderBookHandler, self).on_recv_rsp(
rsp_str rsp_str
@ -179,6 +202,7 @@ class FutuGateway(BaseGateway):
self.gateway.process_orderbook(content) self.gateway.process_orderbook(content)
return RET_OK, content return RET_OK, content
# 绑定两个实现方法
self.quote_ctx.set_handler(QuoteHandler()) self.quote_ctx.set_handler(QuoteHandler())
self.quote_ctx.set_handler(OrderBookHandler()) self.quote_ctx.set_handler(OrderBookHandler())
self.quote_ctx.start() self.quote_ctx.start()
@ -188,6 +212,7 @@ class FutuGateway(BaseGateway):
def connect_trade(self): def connect_trade(self):
""" """
Connect to trade server. Connect to trade server.
连接交易服务器
""" """
# Initialize context according to market. # Initialize context according to market.
if self.market == "US": if self.market == "US":
@ -196,9 +221,11 @@ class FutuGateway(BaseGateway):
self.trade_ctx = OpenHKTradeContext(self.host, self.port) self.trade_ctx = OpenHKTradeContext(self.host, self.port)
# Implement handlers. # Implement handlers.
# 订单回报的实现
class OrderHandler(TradeOrderHandlerBase): class OrderHandler(TradeOrderHandlerBase):
gateway = self gateway = self
# 订单回报流 =》gateway.process_order
def on_recv_rsp(self, rsp_str): def on_recv_rsp(self, rsp_str):
ret_code, content = super(OrderHandler, self).on_recv_rsp( ret_code, content = super(OrderHandler, self).on_recv_rsp(
rsp_str rsp_str
@ -208,9 +235,11 @@ class FutuGateway(BaseGateway):
self.gateway.process_order(content) self.gateway.process_order(content)
return RET_OK, content return RET_OK, content
# 交易回报的实现
class DealHandler(TradeDealHandlerBase): class DealHandler(TradeDealHandlerBase):
gateway = self gateway = self
# 成交回报流 =》 gateway.process_deal
def on_recv_rsp(self, rsp_str): def on_recv_rsp(self, rsp_str):
ret_code, content = super(DealHandler, self).on_recv_rsp( ret_code, content = super(DealHandler, self).on_recv_rsp(
rsp_str rsp_str
@ -221,6 +250,7 @@ class FutuGateway(BaseGateway):
return RET_OK, content return RET_OK, content
# Unlock to allow trading. # Unlock to allow trading.
# 解锁交易接口
code, data = self.trade_ctx.unlock_trade(self.password) code, data = self.trade_ctx.unlock_trade(self.password)
if code == RET_OK: if code == RET_OK:
self.write_log("交易接口解锁成功") self.write_log("交易接口解锁成功")
@ -228,13 +258,14 @@ class FutuGateway(BaseGateway):
self.write_log(f"交易接口解锁失败,原因:{data}") self.write_log(f"交易接口解锁失败,原因:{data}")
# Start context. # Start context.
# 绑定订单回报、成交回报
self.trade_ctx.set_handler(OrderHandler()) self.trade_ctx.set_handler(OrderHandler())
self.trade_ctx.set_handler(DealHandler()) self.trade_ctx.set_handler(DealHandler())
self.trade_ctx.start() self.trade_ctx.start()
self.write_log("交易接口连接成功") self.write_log("交易接口连接成功")
def subscribe(self, req: SubscribeRequest): def subscribe(self, req: SubscribeRequest):
"""""" """订阅行情"""
for data_type in ["QUOTE", "ORDER_BOOK"]: for data_type in ["QUOTE", "ORDER_BOOK"]:
futu_symbol = convert_symbol_vt2futu(req.symbol, req.exchange) futu_symbol = convert_symbol_vt2futu(req.symbol, req.exchange)
code, data = self.quote_ctx.subscribe(futu_symbol, data_type, True) code, data = self.quote_ctx.subscribe(futu_symbol, data_type, True)
@ -242,8 +273,177 @@ class FutuGateway(BaseGateway):
if code: if code:
self.write_log(f"订阅行情失败:{data}") self.write_log(f"订阅行情失败:{data}")
def query_history(self, req: HistoryRequest):
"""查询某只股票的历史K线数据"""
history = []
limit = 60
if req.interval not in [Interval.MINUTE, Interval.DAILY]:
self.write_error(f'查询股票历史范围,本接口只支持分钟/日线')
return history
futu_code = '{}.{}'.format(EXCHANGE_VT2FUTU.get(req.exchange), req.symbol)
if req.interval == Interval.MINUTE:
if req.interval_num not in KLTYPE_MINUTES:
self.write_error(f'查询股票历史范围,请求分钟数{req.interval_num}不在范围:{KLTYPE_MINUTES}')
return history
k_type = f'K_{req.interval_num}M'
else:
if req.interval_num != 1:
self.write_error(f'查询股票历史范围,请求日线{req.interval_num}只能是1')
return history
k_type = KLType.K_DAY
start_date = req.start.strftime('%Y-%m-%d')
end_date = req.end.strftime('%Y-%m-%d') if req.end else None
ret, df, page_req_key = self.quote_ctx.request_history_kline(
code=futu_code,
ktype=k_type,
start=start_date,
end=end_date,
max_count=limit) # 每页5个请求第一页
if ret == RET_OK:
for index, row in df.iterrows():
symbol = row['code']
str_time = row['time_key']
dt = datetime.strptime(str_time, '%Y-%m-%d %H:%M:%S')
bar = BarData(
gateway_name=self.gateway_name,
symbol=row['code'],
exchange=req.exchange,
datetime=dt,
trading_day=dt.strftime('%Y-%m-%d'),
interval=req.interval,
interval_num=req.interval_num,
volume=row['volume'],
open_price=float(row['open']),
high_price=float(row['high']),
low_price=float(row['low']),
close_price=float(row['close'])
)
history.append(bar)
else:
return history
while page_req_key != None: # 请求后面的所有结果
ret, df, page_req_key = self.quote_ctx.request_history_kline(
code=futu_code,
ktype=k_type,
start=start_date,
end=end_date,
page_req_key=page_req_key) # 请求翻页后的数据
if ret == RET_OK:
for index, row in df.iterrows():
symbol = row['code']
str_time = row['time_key']
dt = datetime.strptime(str_time, '%Y-%m-%d %H:%M:%S')
bar = BarData(
gateway_name=self.gateway_name,
symbol=row['code'],
exchange=req.exchange,
datetime=dt,
trading_day=dt.strftime('%Y-%m-%d'),
interval=req.interval,
interval_num=req.interval_num,
volume=row['volume'],
open_price=float(row['open']),
high_price=float(row['high']),
low_price=float(row['low']),
close_price=float(row['close'])
)
history.append(bar)
return history
def download_bars(self, req: HistoryRequest):
"""获取某只股票的历史K线数据"""
history = []
limit = 60
if req.interval not in [Interval.MINUTE, Interval.DAILY]:
self.write_error(f'查询股票历史范围,本接口只支持分钟/日线')
return history
futu_code = '{}.{}'.format(EXCHANGE_VT2FUTU.get(req.exchange), req.symbol)
if req.interval == Interval.MINUTE:
if req.interval_num not in KLTYPE_MINUTES:
self.write_error(f'查询股票历史范围,请求分钟数{req.interval_num}不在范围:{KLTYPE_MINUTES}')
return history
k_type = f'K_{req.interval_num}M'
else:
if req.interval_num != 1:
self.write_error(f'查询股票历史范围,请求日线{req.interval_num}只能是1')
return history
k_type = KLType.K_DAY
start_date = req.start.strftime('%Y-%m-%d')
end_date = req.end.strftime('%Y-%m-%d') if req.end else None
ret, df, page_req_key = self.quote_ctx.request_history_kline(
code=futu_code,
ktype=k_type,
start=start_date,
end=end_date,
max_count=limit) # 每页5个请求第一页
if ret == RET_OK:
for index, row in df.iterrows():
symbol = row['code']
str_time = row['time_key']
dt = datetime.strptime(str_time, '%Y-%m-%d %H:%M:%S')
bar = OrderedDict({
"datetime": str_time,
"open": float(row['open']),
"close": float(row['close']),
"high": float(row['high']),
"low": float(row['low']),
"volume": row['volume'],
"amount": row['turnover'],
"symbol": row['code'],
"trading_date": dt.strftime('%Y-%m-%d'),
"date": dt.strftime('%Y-%m-%d'),
"time": dt.strftime('%H:%M:%S'),
"pre_close": float(row['last_close']),
"turnover_rate": float(row.get('turnover_rate', 0)),
"change_rate": float(row.get('change_rate', 0))
})
history.append(bar)
else:
return history
while page_req_key != None: # 请求后面的所有结果
ret, df, page_req_key = self.quote_ctx.request_history_kline(
code=futu_code,
ktype=k_type,
start=start_date,
end=end_date,
page_req_key=page_req_key) # 请求翻页后的数据
if ret == RET_OK:
for index, row in df.iterrows():
symbol = row['code']
str_time = row['time_key']
dt = datetime.strptime(str_time, '%Y-%m-%d %H:%M:%S')
bar = OrderedDict({
"datetime": str_time,
"open": float(row['open']),
"close": float(row['close']),
"high": float(row['high']),
"low": float(row['low']),
"volume": row['volume'],
"amount": row['turnover'],
"symbol": row['code'],
"trading_date": dt.strftime('%Y-%m-%d'),
"date": dt.strftime('%Y-%m-%d'),
"time": dt.strftime('%H:%M:%S'),
"pre_close": float(row['last_close']),
"turnover_rate": float(row.get('turnover_rate', 0)),
"change_rate": float(row.get('change_rate', 0))
})
history.append(bar)
return history
def send_order(self, req: OrderRequest): def send_order(self, req: OrderRequest):
"""""" """发送委托"""
side = DIRECTION_VT2FUTU[req.direction] side = DIRECTION_VT2FUTU[req.direction]
futu_order_type = OrderType.NORMAL # Only limit order is supported. futu_order_type = OrderType.NORMAL # Only limit order is supported.
@ -254,6 +454,19 @@ class FutuGateway(BaseGateway):
adjust_limit = -0.05 adjust_limit = -0.05
futu_symbol = convert_symbol_vt2futu(req.symbol, req.exchange) futu_symbol = convert_symbol_vt2futu(req.symbol, req.exchange)
# 港股交易手数为整数
if req.exchange == Exchange.SEHK:
self.write_log(f'交易手数:{req.volume}=>{int(req.volume)}')
req.volume = int(req.volume)
local_orderid = self.order_manager.new_local_orderid()
order = req.create_order_data(local_orderid, self.gateway_name)
# 发出委托确认
order.status = Status.SUBMITTING
self.order_manager.on_order(order)
code, data = self.trade_ctx.place_order( code, data = self.trade_ctx.place_order(
req.price, req.price,
req.volume, req.volume,
@ -266,23 +479,59 @@ class FutuGateway(BaseGateway):
if code: if code:
self.write_log(f"委托失败:{data}") self.write_log(f"委托失败:{data}")
order.status = Status.REJECTED
self.order_manager.on_order(order)
return "" return ""
sys_orderid = ""
for ix, row in data.iterrows(): for ix, row in data.iterrows():
orderid = str(row["order_id"]) sys_orderid = str(row.get("order_id",""))
if len(sys_orderid) > 0:
self.write_log(f'系统委托号:{sys_orderid}')
break
if len(sys_orderid) == 0:
order.status = Status.REJECTED
self.order_manager.on_order(order)
return ""
# 绑定 系统委托号
order.sys_orderid = sys_orderid
order.status = Status.NOTTRADED
self.order_manager.update_orderid_map(local_orderid, sys_orderid)
# 更新订单为已委托
self.order_manager.on_order(copy(order))
order = req.create_order_data(orderid, self.gateway_name)
self.on_order(order)
return order.vt_orderid return order.vt_orderid
def cancel_order(self, req: CancelRequest): def cancel_order(self, req: CancelRequest):
"""""" """"""
order = self.order_manager.get_order_with_local_orderid(req.orderid)
# 更新订单委托状态为正在撤销
if order:
if order.status in [Status.REJECTED, Status.ALLTRADED, Status.CANCELLED]:
self.write_error(f'委托单:{req.orderid},状态已经是:{order.status},不能撤单')
return False
order.status = Status.CANCELLING
self.order_manager.on_order(order)
sys_orderid = order.sys_orderid
else:
sys_orderid = req.orderid
# 向接口发出撤单请求
code, data = self.trade_ctx.modify_order( code, data = self.trade_ctx.modify_order(
ModifyOrderOp.CANCEL, req.orderid, 0, 0, trd_env=self.env ModifyOrderOp.CANCEL, sys_orderid, 0, 0, trd_env=self.env
) )
if code: if code:
self.write_log(f"撤单失败:{data}") self.write_log(f"撤单失败:{data}")
return False
else:
self.write_log(f'成功发出撤单请求:orderid={req.orderid},sys_orderid:{sys_orderid}')
return True
def query_contract(self): def query_contract(self):
"""""" """"""
@ -291,6 +540,8 @@ class FutuGateway(BaseGateway):
self.market, futu_product self.market, futu_product
) )
self.write_log(f'开始查询{futu_product}市场的合约清单')
if code: if code:
self.write_log(f"查询合约信息失败:{data}") self.write_log(f"查询合约信息失败:{data}")
return return
@ -305,6 +556,7 @@ class FutuGateway(BaseGateway):
size=1, size=1,
pricetick=0.001, pricetick=0.001,
net_position=True, net_position=True,
history_data=True,
gateway_name=self.gateway_name, gateway_name=self.gateway_name,
) )
self.on_contract(contract) self.on_contract(contract)
@ -459,38 +711,73 @@ class FutuGateway(BaseGateway):
continue continue
symbol, exchange = convert_symbol_futu2vt(row["code"]) symbol, exchange = convert_symbol_futu2vt(row["code"])
order = OrderData(
symbol=symbol,
exchange=exchange,
orderid=str(row["order_id"]),
direction=DIRECTION_FUTU2VT[row["trd_side"]],
price=float(row["price"]),
volume=row["qty"],
traded=row["dealt_qty"],
status=STATUS_FUTU2VT[row["order_status"]],
time=row["create_time"].split(" ")[-1],
gateway_name=self.gateway_name,
)
self.on_order(order) # 获取系统委托编号
sys_orderid = str(row["order_id"])
# 系统委托变化=》 缓存 order
order = self.order_manager.get_order_with_sys_orderid(sys_orderid)
if order is None:
# 本地委托 《=》系统委托号
local_orderid = self.order_manager.get_local_orderid(sys_orderid)
# 创建本地order缓存
order = OrderData(
symbol=symbol,
exchange=exchange,
orderid=local_orderid,
sys_orderid=sys_orderid,
direction=DIRECTION_FUTU2VT[row["trd_side"]],
price=float(row["price"]),
volume=row["qty"],
traded=row["dealt_qty"],
status=STATUS_FUTU2VT[row["order_status"]],
time=row["create_time"].split(" ")[-1],
gateway_name=self.gateway_name,
)
self.write_log(f'新建委托单缓存=>{order.__dict__}')
self.order_manager.on_order(copy(order))
else:
# 缓存order存在判断状态、成交数量是否发生变化
changed = False
order_status = STATUS_FUTU2VT[row["order_status"]]
if order.status != order_status:
order.status = order_status
changed = True
if order.traded != row["dealt_qty"]:
order.traded = row["dealt_qty"]
changed = True
if changed:
self.write_log(f'委托单更新=>{order.__dict__}')
self.order_manager.on_order(copy(order))
def process_deal(self, data): def process_deal(self, data):
""" """
Process trade data for both query and update. Process trade data for both query and update.
""" """
for ix, row in data.iterrows(): for ix, row in data.iterrows():
# 系统委托编号
tradeid = str(row["deal_id"]) tradeid = str(row["deal_id"])
if tradeid in self.trades: if tradeid in self.trades:
continue continue
self.trades.add(tradeid) self.trades.add(tradeid)
symbol, exchange = convert_symbol_futu2vt(row["code"]) symbol, exchange = convert_symbol_futu2vt(row["code"])
# 系统委托号
sys_orderid = row["order_id"]
# 本地委托号
local_orderid = self.order_manager.get_local_orderid(sys_orderid)
trade = TradeData( trade = TradeData(
symbol=symbol, symbol=symbol,
exchange=exchange, exchange=exchange,
direction=DIRECTION_FUTU2VT[row["trd_side"]], direction=DIRECTION_FUTU2VT[row["trd_side"]],
tradeid=tradeid, tradeid=tradeid,
orderid=row["order_id"], orderid=local_orderid,
sys_orderid=sys_orderid,
price=float(row["price"]), price=float(row["price"]),
volume=row["qty"], volume=row["qty"],
time=row["create_time"].split(" ")[-1], time=row["create_time"].split(" ")[-1],

View File

@ -123,6 +123,7 @@ class Exchange(Enum):
TOCOM = "TOCOM" # Tokyo Commodity Exchange TOCOM = "TOCOM" # Tokyo Commodity Exchange
EUNX = "EUNX" # Euronext Exchange EUNX = "EUNX" # Euronext Exchange
KRX = "KRX" # Korean Exchange KRX = "KRX" # Korean Exchange
AMEX = "AMEX" # NESE American
OANDA = "OANDA" # oanda.com OANDA = "OANDA" # oanda.com

View File

@ -264,6 +264,7 @@ class MainEngine:
if gateway: if gateway:
return gateway.query_history(req) return gateway.query_history(req)
else: else:
self.write_log(f'网关为空,请检查合约得网关是否与连接得网关一致')
return None return None
def close(self) -> None: def close(self) -> None: