[bug fix]

This commit is contained in:
msincenselee 2020-07-22 21:55:35 +08:00
parent dc16eb99cd
commit 3496ee92b0
3 changed files with 204 additions and 9 deletions

View File

@ -197,6 +197,9 @@ class CtaTemplate(ABC):
if self.is_upper_limit(vt_symbol): if self.is_upper_limit(vt_symbol):
self.write_error(u'涨停价不做FAK/FOK委托') self.write_error(u'涨停价不做FAK/FOK委托')
return [] return []
if volume == 0:
self.write_error(f'委托数量有误必须大于0{vt_symbol}, price:{price}')
return []
return self.send_order(vt_symbol=vt_symbol, return self.send_order(vt_symbol=vt_symbol,
direction=Direction.LONG, direction=Direction.LONG,
offset=Offset.OPEN, offset=Offset.OPEN,
@ -218,6 +221,9 @@ class CtaTemplate(ABC):
if self.is_lower_limit(vt_symbol): if self.is_lower_limit(vt_symbol):
self.write_error(u'跌停价不做FAK/FOK sell委托') self.write_error(u'跌停价不做FAK/FOK sell委托')
return [] return []
if volume == 0:
self.write_error(f'委托数量有误必须大于0{vt_symbol}, price:{price}')
return []
return self.send_order(vt_symbol=vt_symbol, return self.send_order(vt_symbol=vt_symbol,
direction=Direction.SHORT, direction=Direction.SHORT,
offset=Offset.CLOSE, offset=Offset.CLOSE,
@ -239,6 +245,9 @@ class CtaTemplate(ABC):
if self.is_lower_limit(vt_symbol): if self.is_lower_limit(vt_symbol):
self.write_error(u'跌停价不做FAK/FOK short委托') self.write_error(u'跌停价不做FAK/FOK short委托')
return [] return []
if volume == 0:
self.write_error(f'委托数量有误必须大于0{vt_symbol}, price:{price}')
return []
return self.send_order(vt_symbol=vt_symbol, return self.send_order(vt_symbol=vt_symbol,
direction=Direction.SHORT, direction=Direction.SHORT,
offset=Offset.OPEN, offset=Offset.OPEN,
@ -260,6 +269,9 @@ class CtaTemplate(ABC):
if self.is_upper_limit(vt_symbol): if self.is_upper_limit(vt_symbol):
self.write_error(u'涨停价不做FAK/FOK cover委托') self.write_error(u'涨停价不做FAK/FOK cover委托')
return [] return []
if volume == 0:
self.write_error(f'委托数量有误必须大于0{vt_symbol}, price:{price}')
return []
return self.send_order(vt_symbol=vt_symbol, return self.send_order(vt_symbol=vt_symbol,
direction=Direction.LONG, direction=Direction.LONG,
offset=Offset.CLOSE, offset=Offset.CLOSE,
@ -2021,7 +2033,7 @@ class CtaProFutureTemplate(CtaProTemplate):
target_long_grid = None target_long_grid = None
remove_long_grid_ids = [] remove_long_grid_ids = []
for g in sorted(locked_long_grids, key=lambda grid: grid.volume): for g in sorted(locked_long_grids, key=lambda grid: grid.volume):
if g.order_status or len(g.orderRef) > 0: if g.order_status or len(g.order_ids) > 0:
continue continue
if target_long_grid is None: if target_long_grid is None:
target_long_grid = g target_long_grid = g
@ -2035,7 +2047,7 @@ class CtaProFutureTemplate(CtaProTemplate):
remain_grid = copy(g) remain_grid = copy(g)
g.volume = open_volume g.volume = open_volume
remain_grid.volume -= open_volume remain_grid.volume -= open_volume
remain_grid.id = uuid.uuid1() remain_grid.id = str(uuid.uuid1())
self.gt.dn_grids.append(remain_grid) self.gt.dn_grids.append(remain_grid)
self.write_log(u'添加剩余仓位到新多单网格:g.volume:{}' self.write_log(u'添加剩余仓位到新多单网格:g.volume:{}'
.format(remain_grid.volume)) .format(remain_grid.volume))
@ -2081,7 +2093,7 @@ class CtaProFutureTemplate(CtaProTemplate):
remain_grid = copy(g) remain_grid = copy(g)
g.volume = open_volume g.volume = open_volume
remain_grid.volume -= open_volume remain_grid.volume -= open_volume
remain_grid.id = uuid.uuid1() remain_grid.id = str(uuid.uuid1())
self.gt.up_grids.append(remain_grid) self.gt.up_grids.append(remain_grid)
self.write_log(u'添加剩余仓位到新空单网格:g.volume:{}' self.write_log(u'添加剩余仓位到新空单网格:g.volume:{}'
.format(remain_grid.volume)) .format(remain_grid.volume))
@ -2146,6 +2158,8 @@ class CtaProFutureTemplate(CtaProTemplate):
# 正在委托时,不处理 # 正在委托时,不处理
if self.entrust != 0: if self.entrust != 0:
return return
if not self.activate_today_lock:
return
# 多单得对锁格 # 多单得对锁格
locked_long_grids = self.gt.get_opened_grids_within_types(direction=Direction.LONG, types=[LOCK_GRID]) locked_long_grids = self.gt.get_opened_grids_within_types(direction=Direction.LONG, types=[LOCK_GRID])

View File

@ -16,7 +16,7 @@ from time import sleep
from functools import lru_cache from functools import lru_cache
from collections import OrderedDict from collections import OrderedDict
from multiprocessing.dummy import Pool from multiprocessing.dummy import Pool
from threading import Thread
from vnpy.event import EventEngine from vnpy.event import EventEngine
from vnpy.trader.event import EVENT_TIMER from vnpy.trader.event import EVENT_TIMER
from vnpy.trader.constant import ( from vnpy.trader.constant import (
@ -25,10 +25,12 @@ from vnpy.trader.constant import (
Direction, Direction,
OrderType, OrderType,
Status, Status,
Offset Offset,
Interval
) )
from vnpy.trader.gateway import BaseGateway, LocalOrderManager from vnpy.trader.gateway import BaseGateway, LocalOrderManager
from vnpy.trader.object import ( from vnpy.trader.object import (
BarData,
CancelRequest, CancelRequest,
OrderRequest, OrderRequest,
SubscribeRequest, SubscribeRequest,
@ -37,7 +39,8 @@ from vnpy.trader.object import (
OrderData, OrderData,
TradeData, TradeData,
PositionData, PositionData,
AccountData AccountData,
HistoryRequest
) )
from vnpy.trader.utility import get_folder_path, print_dict, extract_vt_symbol, get_stock_exchange, append_data from vnpy.trader.utility import get_folder_path, print_dict, extract_vt_symbol, get_stock_exchange, append_data
from vnpy.data.tdx.tdx_common import get_stock_type_sz, get_stock_type_sh from vnpy.data.tdx.tdx_common import get_stock_type_sz, get_stock_type_sh
@ -47,6 +50,14 @@ symbol_name_map: Dict[str, str] = {}
# 代码 <=> 交易所 # 代码 <=> 交易所
symbol_exchange_map: Dict[str, Exchange] = {} symbol_exchange_map: Dict[str, Exchange] = {}
# 时间戳对齐
TIME_GAP = 8 * 60 * 60 * 1000000000
INTERVAL_VT2TQ = {
Interval.MINUTE: 60,
Interval.HOUR: 60 * 60,
Interval.DAILY: 60 * 60 * 24,
}
# 功能<->文件对应 # 功能<->文件对应
PB_FILE_NAMES = { PB_FILE_NAMES = {
'send_order': 'XHPT_WT', # 通用接口_委托 'send_order': 'XHPT_WT', # 通用接口_委托
@ -323,6 +334,7 @@ class PbGateway(BaseGateway):
self.md_api = PbMdApi(self) self.md_api = PbMdApi(self)
self.td_api = PbTdApi(self) self.td_api = PbTdApi(self)
self.tq_api = None
self.tdx_connected = False # 通达信行情API得连接状态 self.tdx_connected = False # 通达信行情API得连接状态
@ -359,6 +371,8 @@ class PbGateway(BaseGateway):
product_id=product_id, product_id=product_id,
unit_id=unit_id, unit_id=unit_id,
holder_ids=holder_ids) holder_ids=holder_ids)
self.tq_api = TqMdApi(self)
self.tq_api.connect()
self.init_query() self.init_query()
def close(self) -> None: def close(self) -> None:
@ -368,7 +382,10 @@ class PbGateway(BaseGateway):
def subscribe(self, req: SubscribeRequest) -> None: def subscribe(self, req: SubscribeRequest) -> None:
"""""" """"""
self.md_api.subscribe(req) if self.tq_api and self.tq_api.is_connected:
self.tq_api.subscribe(req)
else:
self.md_api.subscribe(req)
def send_order(self, req: OrderRequest) -> str: def send_order(self, req: OrderRequest) -> str:
"""""" """"""
@ -2012,3 +2029,167 @@ class PbTdApi(object):
def cancel_all_csv(self): def cancel_all_csv(self):
pass pass
class TqMdApi():
"""天勤行情API"""
def __init__(self, gateway):
""""""
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.api = None
self.is_connected = False
self.subscribe_array = []
# 行情对象列表
self.quote_objs = []
# 数据更新线程
self.update_thread = None
# 所有的合约
self.all_instruments = []
self.ticks = {}
def connect(self, setting = {}):
""""""
try:
from tqsdk import TqApi
self.api = TqApi(_stock=True)
except Exception as e:
self.gateway.write_log(f'天勤股票行情API接入异常:'.format(str(e)))
self.gateway.write_log(traceback.format_exc())
if self.api:
self.is_connected = True
self.gateway.write_log(f'天勤股票行情API已连接')
self.update_thread = Thread(target=self.update)
self.update_thread.start()
def generate_tick_from_quote(self, vt_symbol, quote) -> TickData:
"""
生成TickData
"""
# 清洗 nan
quote = {k: 0 if v != v else v for k, v in quote.items()}
symbol, exchange = extract_vt_symbol(vt_symbol)
return TickData(
symbol=symbol,
exchange=exchange,
datetime=datetime.strptime(quote["datetime"], "%Y-%m-%d %H:%M:%S.%f"),
name=symbol,
volume=quote["volume"],
open_interest=quote["open_interest"],
last_price=quote["last_price"],
limit_up=quote["upper_limit"],
limit_down=quote["lower_limit"],
open_price=quote["open"],
high_price=quote["highest"],
low_price=quote["lowest"],
pre_close=quote["pre_close"],
bid_price_1=quote["bid_price1"],
bid_price_2=quote["bid_price2"],
bid_price_3=quote["bid_price3"],
bid_price_4=quote["bid_price4"],
bid_price_5=quote["bid_price5"],
ask_price_1=quote["ask_price1"],
ask_price_2=quote["ask_price2"],
ask_price_3=quote["ask_price3"],
ask_price_4=quote["ask_price4"],
ask_price_5=quote["ask_price5"],
bid_volume_1=quote["bid_volume1"],
bid_volume_2=quote["bid_volume2"],
bid_volume_3=quote["bid_volume3"],
bid_volume_4=quote["bid_volume4"],
bid_volume_5=quote["bid_volume5"],
ask_volume_1=quote["ask_volume1"],
ask_volume_2=quote["ask_volume2"],
ask_volume_3=quote["ask_volume3"],
ask_volume_4=quote["ask_volume4"],
ask_volume_5=quote["ask_volume5"],
gateway_name=self.gateway_name
)
def update(self) -> None:
"""
更新行情/委托/账户/持仓
"""
while self.api.wait_update():
# 更新行情信息
for vt_symbol, quote in self.quote_objs:
if self.api.is_changing(quote):
tick = self.generate_tick_from_quote(vt_symbol, quote)
tick and self.gateway.on_tick(tick) and self.gateway.on_custom_tick(tick)
def subscribe(self, req: SubscribeRequest) -> None:
"""
订阅行情
"""
if req.vt_symbol not in self.subscribe_array:
symbol, exchange = extract_vt_symbol(req.vt_symbol)
try:
quote = self.api.get_quote(f'{exchange.value}.{symbol}')
self.quote_objs.append((req.vt_symbol, quote))
self.subscribe_array.append(req.vt_symbol)
except Exception as ex:
self.gateway.write_log('订阅天勤行情异常:{}'.format(str(ex)))
def query_history(self, req: HistoryRequest) -> List[BarData]:
"""
获取历史数据
"""
symbol = req.symbol
exchange = req.exchange
interval = req.interval
start = req.start
end = req.end
# 天勤需要的数据
tq_symbol = f'{exchange.value}.{symbol}'
tq_interval = INTERVAL_VT2TQ.get(interval)
end += timedelta(1)
total_days = end - start
# 一次最多只能下载 8964 根Bar
min_length = min(8964, total_days.days * 500)
df = self.api.get_kline_serial(tq_symbol, tq_interval, min_length).sort_values(
by=["datetime"]
)
# 时间戳对齐
df["datetime"] = pd.to_datetime(df["datetime"] + TIME_GAP)
# 过滤开始结束时间
df = df[(df["datetime"] >= start - timedelta(days=1)) & (df["datetime"] < end)]
data: List[BarData] = []
if df is not None:
for ix, row in df.iterrows():
bar = BarData(
symbol=symbol,
exchange=exchange,
interval=interval,
datetime=row["datetime"].to_pydatetime(),
open_price=row["open"],
high_price=row["high"],
low_price=row["low"],
close_price=row["close"],
volume=row["volume"],
open_interest=row.get("close_oi", 0),
gateway_name=self.gateway_name,
)
data.append(bar)
return data
def close(self) -> None:
""""""
try:
if self.api and self.api.wait_update():
self.api.close()
self.is_connected = False
if self.update_thread:
self.update_thread.join()
except Exception as e:
self.gateway.write_log('退出天勤行情api异常:{}'.format(str(e)))

View File

@ -5,7 +5,7 @@ http://wxpusher.zjiecode.com/
开通步骤 开通步骤
1关注公众号注册 1关注公众号注册
2通过公众号获取UID 2通过公众号获取UID
3通过网站=应用列表=新建应用如vnpy2并获得APP_TOOKEN 3通过网站打开管理后台http://wxpusher.zjiecode.com/admin/ 使用微信扫码登录=应用列表=新建应用如vnpy2并获得APP_TOOKEN
4应用列表=应用vnpy2= 关注. 4应用列表=应用vnpy2= 关注.
''' '''
@ -20,7 +20,7 @@ global wechat_lock
wechat_lock = Lock() wechat_lock = Lock()
# 这里可以设置UIDS, 多个人可同时接收 # 这里可以设置UIDS, 多个人可同时接收
UIDS = ['UID_kZguGPBQPWn41Ni9FK4CgPts2Kj'] UIDS = ['UID_kZguGPBQPWn41Ni9FK4CgPts2KjU']
APP_TOKEN = 'AT_aDuiQu41dmAQV2vUMXOaaTDrWyhKJN2z' APP_TOKEN = 'AT_aDuiQu41dmAQV2vUMXOaaTDrWyhKJN2z'