[update] 指数行情修正为CTP合成

This commit is contained in:
msincenselee 2021-05-28 14:02:58 +08:00
parent b1e0b74d98
commit a7775d5124
8 changed files with 770 additions and 57 deletions

View File

@ -86,7 +86,6 @@ STOP_STATUS_MAP = {
Status.REJECTED: StopOrderStatus.CANCELLED
}
class CtaEngine(BaseEngine):
"""
策略引擎数字货币版
@ -216,6 +215,8 @@ class CtaEngine(BaseEngine):
if self.last_minute != dt.minute:
self.last_minute = dt.minute
# 检查未订阅得合约
self.check_unsubscribed_symbols()
if all_trading:
# 主动获取所有策略得持仓信息
@ -960,6 +961,7 @@ class CtaEngine(BaseEngine):
Add a new strategy.
"""
try:
self.write_log(f'{strategy_name} => 开始添加实例:{setting}')
if strategy_name in self.strategies:
msg = f"{strategy_name} => 创建策略失败,存在重名"
self.write_log(msg=msg,
@ -1006,15 +1008,31 @@ class CtaEngine(BaseEngine):
self.write_error(traceback.format_exc())
return False, msg
def init_strategy(self, strategy_name: str, auto_start: bool = False):
"""
Init a strategy.
"""
self.write_log(f'创建独立线程执行{strategy_name} on_init()')
task = self.thread_executor.submit(self._init_strategy, strategy_name, auto_start)
# 添加执行完毕得回调函数
task.add_done_callback(self.thread_pool_callback)
self.thread_tasks.append(task)
return True
def thread_pool_callback(self, worker):
"""线程异常捕捉"""
worker_exception = worker.exception()
if worker_exception:
account_id = self.engine_config.get('accountid','cta_crypto')
msg = f'{account_id}worker_exception :{str(worker_exception)}'
self.write_error(msg)
self.send_wechat(msg)
else:
self.write_log(f'crypto engine thread worker completed')
def _init_strategy(self, strategy_name: str, auto_start: bool = False):
"""
Init strategies in queue.
@ -1147,9 +1165,14 @@ class CtaEngine(BaseEngine):
strategy = self.strategies[strategy_name]
if strategy.trading:
err_msg = f"策略{strategy.strategy_name}移除失败,请先停止"
self.write_error(err_msg)
return False, err_msg
# err_msg = f"策略{strategy.strategy_name}正在运行,先停止"
# self.write_error(err_msg)
# return False, err_msg
ret, msg = self.stop_strategy(strategy_name)
if not ret:
return False, msg
else:
self.write_log(msg)
# Remove setting
self.remove_strategy_setting(strategy_name)
@ -1648,11 +1671,13 @@ class CtaEngine(BaseEngine):
# 账号的持仓处理 => compare_pos
compare_pos = dict() # vt_symbol: {'账号多单': xx, '账号空单':xxx, '策略空单':[], '策略多单':[]}
for position in list(self.positions.values()):
self.write_log(f'扫描帐号持仓')
positions = self.main_engine.get_all_positions()
for position in positions: # list(self.positions.values()):
# gateway_name.symbol.exchange => symbol.exchange
vt_symbol = position.vt_symbol
vt_symbols.add(vt_symbol)
self.write_log(f'帐号:{position.vt_symbol}:{position.volume}')
compare_pos[vt_symbol] = OrderedDict(
{
"账号净仓": position.volume,
@ -1664,6 +1689,7 @@ class CtaEngine(BaseEngine):
)
# 逐一根据策略仓位与Account_pos进行处理比对
self.write_log(f'扫描策略持仓')
for strategy_pos in strategy_pos_list:
for pos in strategy_pos.get('pos', []):
vt_symbol = pos.get('vt_symbol')
@ -1694,20 +1720,22 @@ class CtaEngine(BaseEngine):
u'{}({})'.format(strategy_pos['strategy_name'], abs(pos.get('volume', 0))))
self.write_log(u'更新{}策略持多仓=>{}'.format(vt_symbol, symbol_pos.get('策略多单', 0)))
compare_pos.update({vt_symbol: symbol_pos})
pos_compare_result = ''
# 精简输出
compare_info = ''
for vt_symbol in sorted(vt_symbols):
# 发送不一致得结果
symbol_pos = compare_pos.pop(vt_symbol, None)
if not symbol_pos:
self.write_error(f'持仓对比中,找不到{vt_symbol}')
continue
net_symbol_pos = round(round(symbol_pos['策略多单'], 7) - round(symbol_pos['策略空单'], 7), 7)
symbol_pos = compare_pos.pop(vt_symbol, {})
# if not symbol_pos:
# self.write_error(f'持仓对比中,找不到{vt_symbol}')
# continue
net_symbol_pos = round(round(symbol_pos.get('策略多单',0), 7) - round(symbol_pos.get('策略空单',0), 7), 7)
# 多空都一致
if round(symbol_pos['账号净仓'], 7) == net_symbol_pos:
if round(symbol_pos.get('账号净仓',0), 7) == net_symbol_pos:
msg = u'{}多空都一致.{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False))
self.write_log(msg)
compare_info += msg
@ -1716,7 +1744,7 @@ class CtaEngine(BaseEngine):
self.write_error(u'{}不一致:{}'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False)))
compare_info += u'{}不一致:{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False))
diff_volume = round(symbol_pos['账号净仓'], 7) - net_symbol_pos
diff_volume = round(symbol_pos.get('账号净仓',0), 7) - net_symbol_pos
# 账号仓位> 策略仓位, sell
if diff_volume > 0 and auto_balance:
contract = self.main_engine.get_contract(vt_symbol)

View File

@ -1127,9 +1127,14 @@ class CtaEngine(BaseEngine):
"""
strategy = self.strategies[strategy_name]
if strategy.trading:
err_msg = f"策略{strategy.strategy_name}移除失败,请先停止"
self.write_error(err_msg)
return False, err_msg
# err_msg = f"策略{strategy.strategy_name}正在运行,先停止"
#self.write_error(err_msg)
#return False, err_msg
ret , msg = self.stop_strategy(strategy_name)
if not ret:
return False, msg
else:
self.write_log(msg)
# Remove setting
self.remove_strategy_setting(strategy_name)

View File

@ -3,7 +3,7 @@
import os
from pathlib import Path
from vnpy.trader.app import BaseApp
from .engine import IndexTickPublisher, APP_NAME
from .engine import IndexTickPublisher,IndexTickPublisherV2, APP_NAME
class IndexTickPublisherApp(BaseApp):
@ -12,4 +12,4 @@ class IndexTickPublisherApp(BaseApp):
app_module = __module__
app_path = Path(__file__).parent
display_name = u'期货指数全行情推送'
engine_class = IndexTickPublisher
engine_class = IndexTickPublisherV2

View File

@ -11,24 +11,278 @@ from datetime import datetime, timedelta
from time import sleep
from logging import ERROR
from pytdx.exhq import TdxExHq_API
from copy import deepcopy
from vnpy.event import EventEngine
from vnpy.trader.constant import Exchange
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.event import EVENT_TIMER
from vnpy.trader.object import TickData
from vnpy.trader.utility import get_trading_date
from vnpy.data.tdx.tdx_common import TDX_FUTURE_HOSTS
from vnpy.trader.object import TickData, SubscribeRequest
from vnpy.trader.utility import get_trading_date, get_underlying_symbol, load_json, get_real_symbol_by_exchange
from vnpy.data.tdx.tdx_common import TDX_FUTURE_HOSTS, get_future_contracts
from vnpy.component.base import (
NIGHT_MARKET_23,
NIGHT_MARKET_SQ2,
MARKET_DAY_ONLY)
from vnpy.amqp.producer import publisher
from vnpy.gateway.ctp.ctp_gateway import CtpMdApi, symbol_exchange_map
APP_NAME = 'Idx_Publisher'
class IndexTickPublisherV2(BaseEngine):
"""
指数tick发布服务
透过ctp 行情接口获取所有合约并根据合约的仓指生成指数tick发布至rabbitMQ
"""
# ----------------------------------------------------------------------
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(IndexTickPublisherV2, self).__init__(
main_engine, event_engine, APP_NAME)
self.main_engine = main_engine
self.event_engine = event_engine
self.create_logger(logger_name=APP_NAME)
self.gateway_name = 'CTP'
self.last_minute = None
self.registerEvent()
self.connection_status = False # 连接状态
# ctp md api
self.subscribed_symbols = set() # 已订阅合约代码
self.md_api = None # API 的连接会话对象
self.last_tick_dt = {} # 记录该会话对象的最后一个tick时间
self.instrument_count = 50000
self.has_qry_instrument = False
# vt_setting.json内rabbitmq配置项
self.conf = {}
self.pub = None
self.status = {}
self.subscribed_symbols = set() # 已订阅合约代码
self.ticks = {}
# 本地/vnpy/data/tdx/future_contracts.json
self.all_contracts = get_future_contracts()
# 需要订阅的短合约
self.selected_underly_symbols = load_json('subscribe_symbols.json', auto_save=False)
# 短合约 <=> 所有真实合约 的数量
self.underly_symbols_num_dict = {}
def write_error(self, content: str):
self.write_log(msg=content, level=ERROR)
def create_publisher(self, conf):
"""创建rabbitmq 消息发布器"""
if self.pub:
return
try:
self.write_log(f'创建发布器:{conf}')
# 消息发布
self.pub = publisher(host=conf.get('host', 'localhost'),
port=conf.get('port', 5672),
user=conf.get('user', 'admin'),
password=conf.get('password', 'admin'),
channel_number=conf.get('channel_number', 1),
queue_name=conf.get('queue_name', ''),
routing_key=conf.get('routing_key', 'default'),
exchange=conf.get('exchange', 'x_fanout_idx_tick'))
self.write_log(f'创建发布器成功')
except Exception as ex:
self.write_log(u'创建tick发布器异常:{}'.format(str(ex)))
# ----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def process_timer_event(self, event):
"""定时执行"""
dt = datetime.now()
if self.last_minute and dt.minute == self.last_minute:
return
self.last_minute = dt.minute
self.check_status()
def check_status(self):
"""定期检查状态"""
if not self.md_api:
self.status.update({'con': False})
self.write_log(f'行情接口未连接')
return
dt_now = datetime.now()
# 扫描合约配置文件
for underly_symbol, info in self.all_contracts.items():
# 如果本地subscribe_symbols内有合约的指定订阅清单进行排除 ['RB','IF']
if len(self.selected_underly_symbols) > 0 and underly_symbol not in self.selected_underly_symbols:
continue
# 日盘数据,夜盘期间不订阅
if dt_now.hour < 4 or dt_now.hour > 20:
if underly_symbol in MARKET_DAY_ONLY:
continue
# 获取当前所有的合约列表
symbols = info.get('symbols', {})
# 获取交易所
exchange = info.get('exchange', 'LOCAL')
# 获取本地记录的tick dict
tick_dict = self.ticks.get(underly_symbol, {})
for symbol in symbols.keys():
# 全路径合约 => 标准合约 ,如 ZC2109 => ZC109, RB2110 => rb2110
vn_symbol = get_real_symbol_by_exchange(symbol, Exchange(exchange))
if symbol.replace(underly_symbol, '') < dt_now.strftime('%Y%m%d'):
self.write_log(f'移除早于当月的合约{symbol}')
symbols.pop(symbol, None)
continue
# 生成带交易所信息的合约
vt_symbol = f'{vn_symbol}.{exchange}'
# symbol_exchange_map是全局变量ctp md api会使用到所以需要更新其 合约与交易所的关系
if vn_symbol not in symbol_exchange_map:
symbol_exchange_map.update({vn_symbol: Exchange(exchange)})
# 该合约没有在行情中,重新发出订阅
if vt_symbol not in tick_dict:
req = SubscribeRequest(
symbol=vn_symbol,
exchange=Exchange(exchange)
)
self.subscribe(req)
# 等级短合约 <=> 真实合约数量
self.underly_symbols_num_dict.update({underly_symbol: len(symbols.keys())})
def connect(self, *args, **kwargs):
"""
连接ctp行情和rabbitmq推送
:param args:
:param kwargs:
:return:
"""
self.write_log(f'connect({kwargs}')
# 连接ctp行情服务器
md_address = kwargs.get('md_address')
userid = kwargs.get('userid')
password = kwargs.get('password')
brokerid = kwargs.get('brokerid')
if not self.md_api:
self.write_log(f'创建ctp行情服务器{md_address}')
self.md_api = CtpMdApi(gateway=self)
self.md_api.connect(address=md_address,
userid=userid,
password=password,
brokerid=brokerid)
# 连接rabbit MQ
rabbit_config = kwargs.get('rabbit_config', {})
self.write_log(f'创建rabbitMQ 消息推送桩,{rabbit_config}')
self.conf.update(rabbit_config)
self.create_publisher(self.conf)
def subscribe(self, req: SubscribeRequest):
"""订阅合约"""
self.write_log(f'engine:订阅合约: {req.vt_symbol}')
if req.vt_symbol not in self.subscribed_symbols:
self.subscribed_symbols.add(req.vt_symbol)
if self.md_api:
self.md_api.subscribe(req)
def on_tick(self, tick):
""" tick到达事件"""
short_symbol = get_underlying_symbol(tick.symbol).upper()
# 更新tick
tick_dict = self.ticks.get(short_symbol, None)
if tick_dict is None:
tick_dict = {tick.symbol: tick}
self.ticks.update({short_symbol: tick_dict})
return
# 与最后
last_dt = self.last_tick_dt.get(short_symbol, tick.datetime)
# 进行指数合成
if last_dt and tick.datetime.second != last_dt.second:
all_amount = 0
all_interest = 0
all_volume = 0
all_ask1 = 0
all_bid1 = 0
last_price = 0
ask_price_1 = 0
bid_price_1 = 0
mi_tick = None
# 已经积累的行情tick数量不足总数减1不处理
n = self.underly_symbols_num_dict.get(short_symbol, 1)
if len(tick_dict) < min(n*0.8, 3) :
self.write_log(f'{short_symbol}合约数据{len(tick_dict)}不足{n} 0.8,暂不合成指数')
return
# 计算所有合约的累加持仓量、资金、成交量、找出最大持仓量的主力合约
for t in tick_dict.values():
all_interest += t.open_interest
all_amount += t.last_price * t.open_interest
all_volume += t.volume
all_ask1 += t.ask_price_1 * t.open_interest
all_bid1 += t.bid_price_1 * t.open_interest
if mi_tick is None or mi_tick.open_interest < t.open_interest:
mi_tick = t
# 总量 > 0
if all_interest > 0 and all_amount > 0:
last_price = round(float(all_amount / all_interest), 4)
# 卖1价
if all_ask1 > 0 and all_interest > 0:
ask_price_1 = round(float(all_ask1 / all_interest), 4)
# 买1价
if all_bid1 > 0 and all_interest > 0:
bid_price_1 = round(float(all_bid1 / all_interest), 4)
if mi_tick and last_price > 0:
if self.pub:
d = copy.copy(mi_tick.__dict__)
# 时间 =》 字符串
if isinstance(mi_tick.datetime, datetime):
d.update({'datetime': mi_tick.datetime.strftime('%Y-%m-%d %H:%M:%S.%f')})
# 变量 => 字符串
d.update({'exchange': mi_tick.exchange.value})
d.update({'symbol': f'{short_symbol}99', 'vt_symbol': f'{short_symbol}99.{mi_tick.exchange.value}'})
# 更新未指数的持仓量、交易量最后价格ask1bid1
d.update({'open_interest': all_interest, 'volume': all_volume,
'last_price': last_price, 'ask_price_1': ask_price_1, 'bid_price_1': bid_price_1})
print('{} {}:{}'.format(d.get('datetime'), d.get("vt_symbol"), d.get('last_price')))
d = json.dumps(d)
self.pub.pub(d)
# 更新时间
self.last_tick_dt.update({short_symbol: tick.datetime})
tick_dict.update({tick.symbol: tick})
self.ticks.update({short_symbol: tick_dict})
def on_custom_tick(self, tick):
pass
class IndexTickPublisher(BaseEngine):
# 指数tick发布服务
# 通过通达信接口获取指数行情tick发布至rabbitMQ
@ -276,7 +530,14 @@ class IndexTickPublisher(BaseEngine):
self.pub.exit()
def check_status(self):
# self.write_log(u'检查tdx接口状态')
self.write_log(u'检查tdx接口状态')
if len(self.symbol_tick_dict) > 0:
k = self.symbol_tick_dict.keys()[0]
tick = self.symbol_tick_dict.get(k, None)
if tick:
self.write_log(f'{tick.vt_symbol}: {tick.datetime}, price:{tick.last_price}')
else:
self.write_log(f'目前没有收到tick')
# 若还没有启动连接,就启动连接
over_time = self.last_tick_dt is None or (datetime.now() - self.last_tick_dt).total_seconds() > 60
@ -285,8 +546,8 @@ class IndexTickPublisher(BaseEngine):
self.close()
self.api = None
self.reconnect()
# self.write_log(u'tdx接口状态正常')
else:
self.write_log(u'tdx接口状态正常')
def qry_instrument(self):
"""

View File

@ -65,27 +65,25 @@ PERIOD_MAPPING['1week'] = 5
PERIOD_MAPPING['1month'] = 6
# 期货行情服务器清单
TDX_FUTURE_HOSTS = [
#{"ip": "120.24.0.77", "port": 443, "name": "通达信接入主站"},
{"ip": "112.74.214.43", "port": 7727, "name": "扩展市场深圳双线1"},
#{"ip": "120.24.0.77", "port": 7727, "name": "扩展市场深圳双线2"},
{"ip": "47.107.75.159", "port": 7727, "name": "扩展市场深圳双线3"},
{"ip": "113.105.142.136", "port": 443, "name": "扩展市场东莞主站"},
{"ip": "113.105.142.133", "port": 443, "name": "港股期货东莞电信"},
{"ip": "119.97.185.5", "port": 7727, "name": "扩展市场武汉主站1"},
{"ip": "119.97.185.7", "port": 7727, "name": "港股期货武汉主站1"},
{"ip": "119.97.185.9", "port": 7727, "name": "港股期货武汉主站2"},
{"ip": "59.175.238.38", "port": 7727, "name": "扩展市场武汉主站3"},
{"ip": "202.103.36.71", "port": 443, "name": "扩展市场武汉主站2"},
{"ip": "47.92.127.181", "port": 7727, "name": "扩展市场北京主站"},
{"ip": "106.14.95.149", "port": 7727, "name": "扩展市场上海双线"},
{"ip": '218.80.248.229', 'port': 7721, "name": "备用服务器1"},
{"ip": '124.74.236.94', 'port': 7721, "name": "备用服务器2"},
{'ip': '58.246.109.27', 'port': 7721, "name": "备用服务器3"}]
TDX_FUTURE_HOSTS =[
#{'ip': '42.193.151.197', 'port': 7727, 'name': '广州期货双线1', 'speed': 6.622},
#{'ip': '119.29.63.178', 'port': 7727, 'name': '广州期货双线3', 'speed': 7.716},
#{'ip': '81.71.76.101', 'port': 7727, 'name': '广州期货双线2', 'speed': 14.914},
#{'ip': '47.107.75.159', 'port': 7727, 'name': '扩展市场深圳双线3', 'speed': 34.542},
#{'ip': '112.74.214.43', 'port': 7727, 'name': '扩展市场深圳双线1', 'speed': 37.881},
#{'ip': '59.175.238.38', 'port': 7727, 'name': '扩展市场武汉主站3', 'speed': 49.63},
#{'ip': '119.97.185.5', 'port': 7727, 'name': '扩展市场武汉主站1', 'speed': 70.563},
{'ip': '218.80.248.229', 'port': 7721, 'name': '备用服务器1', 'speed': 86.91300000000001},
#{'ip': '119.97.185.7', 'port': 7727, 'name': '港股期货武汉主站1', 'speed': 101.06099999999999},
#{'ip': '106.14.95.149', 'port': 7727, 'name': '扩展市场上海双线', 'speed': 105.294},
{'ip': '113.105.142.136', 'port': 443, 'name': '扩展市场东莞主站', 'speed': 10000.0},
{'ip': '113.105.142.133', 'port': 443, 'name': '港股期货东莞电信', 'speed': 10000.0},
#{'ip': '119.97.185.9', 'port': 7727, 'name': '港股期货武汉主站2', 'speed': 10000.0},
{'ip': '202.103.36.71', 'port': 443, 'name': '扩展市场武汉主站2', 'speed': 10000.0},
#{'ip': '47.92.127.181', 'port': 7727, 'name': '扩展市场北京主站', 'speed': 10000.0},
{'ip': '124.74.236.94', 'port': 7721, 'name': '备用服务器2', 'speed': 10000.0},
{'ip': '58.246.109.27', 'port': 7721, 'name': '备用服务器3', 'speed': 10000.0}
]
def get_future_contracts():

View File

@ -55,7 +55,7 @@ from vnpy.trader.constant import (
OptionType,
Interval
)
from vnpy.trader.gateway import BaseGateway, TickCombiner
from vnpy.trader.gateway import BaseGateway, TickCombiner, IndexGenerator
from vnpy.trader.object import (
TickData,
BarData,
@ -264,8 +264,18 @@ class CtpGateway(BaseGateway):
self.combiners = {}
self.tick_combiner_map = {}
# 本地指数行情合成器{ 'rb2110':x, 'rb2201':x',,}
self.index_generators = {}
# 已经创建得指数合成器symbol列表 ['RB99','J99',,,]
self.subscribed_index_symbols = []
def connect(self, setting: dict):
""""""
"""
连接交易服务器行情服务器
行情服务器包括ctp普通行情ctp5档行情上海能源所tdx指数行情rabbitMQ指数行情天勤指数行情
:param setting:
:return:
"""
userid = setting["用户名"]
password = setting["密码"]
brokerid = setting["经纪商代码"]
@ -277,6 +287,7 @@ class CtpGateway(BaseGateway):
product_info = setting["产品信息"]
rabbit_dict = setting.get('rabbit', None)
tq_dict = setting.get('tq', None)
tdx_dict = setting.get('tdx', None)
if (
(not td_address.startswith("tcp://"))
and (not td_address.startswith("ssl://"))
@ -331,7 +342,7 @@ class CtpGateway(BaseGateway):
self.write_log(f'激活天勤行情接口')
self.tq_api = TqMdApi(gateway=self)
self.tq_api.connect(tq_dict)
else:
elif tdx_dict is not None:
self.write_log(f'激活通达信行情接口')
self.tdx_api = TdxMdApi(gateway=self)
self.tdx_api.connect()
@ -394,7 +405,14 @@ class CtpGateway(BaseGateway):
return True
def subscribe(self, req: SubscribeRequest):
""""""
"""
订阅合约行情
普通合约 => ctp行情5档行情
指数合约 => 通达信rabbitMQ天勤
套利合约 => 合约合并器
:param req:
:return:
"""
try:
if self.md_api:
# 如果是自定义的套利合约符号
@ -447,6 +465,7 @@ class CtpGateway(BaseGateway):
return
elif req.exchange == Exchange.SPD:
self.write_error(u'自定义合约{}不在CTP设置中'.format(req.symbol))
return
# 指数合约从tdx行情订阅
if req.symbol[-2:] in ['99']:
@ -460,6 +479,11 @@ class CtpGateway(BaseGateway):
elif self.tq_api:
self.write_log(f'使用天勤接口订阅{req.symbol}')
self.tq_api.subscribe(req)
else:
if req.symbol not in self.subscribed_index_symbols:
self.write_log(f'使用本地指数生成器进行订阅')
self.subscribe_local_index(req)
else:
# 上期所、上能源支持五档行情,使用天勤接口
if self.tq_api and req.exchange in [Exchange.SHFE, Exchange.INE]:
@ -468,6 +492,7 @@ class CtpGateway(BaseGateway):
if self.l2_md_api and req.exchange in [Exchange.SHFE, Exchange.INE]:
self.write_log(f'使用五档行情接口订阅:{req.symbol}')
self.l2_md_api.subscribe(req)
else:
self.write_log(f'使用CTP接口订阅{req.symbol}')
self.md_api.subscribe(req)
@ -492,6 +517,26 @@ class CtpGateway(BaseGateway):
bg = BarGenerator(on_bar=self.on_bar)
self.klines.update({vt_symbol: bg})
def subscribe_local_index(self, req):
"""
订阅本地合约
:param req:
:return:
"""
underlying_symbol = get_underlying_symbol(req.symbol)
symbol_info = future_contracts.get(underlying_symbol,None)
if symbol_info:
generator = IndexGenerator(gateway=self, setting=symbol_info)
# 登记订阅真实合约 <=>合成器 关系
for vn_symbol in generator.symbols:
self.index_generators[vn_symbol] = generator
# 登记指数合约到本地已订阅信息
self.subscribed_index_symbols.append(req.symbol)
else:
self.write_error(f'{underlying_symbol}信息没有在vnpy/data/tdx/future_contracts.json文件中,不能创建指数订阅')
def send_order(self, req: OrderRequest):
""""""
return self.td_api.send_order(req)
@ -574,11 +619,15 @@ class CtpGateway(BaseGateway):
def on_custom_tick(self, tick):
"""推送自定义合约行情"""
# 自定义合约行情
for combiner in self.tick_combiner_map.get(tick.symbol, []):
tick = copy(tick)
combiner.on_tick(tick)
# 推送至指数生成器
if tick.symbol in self.index_generators:
tick = copy(tick)
# 推送on_tick()方法
self.index_generators[tick.symbol].on_tick(tick)
class CtpMdApi(MdApi):
""""""
@ -688,7 +737,7 @@ class CtpMdApi(MdApi):
date=s_date,
time=dt.strftime('%H:%M:%S.%f'),
trading_day=trading_day,
name=symbol_name_map[symbol],
name=symbol_name_map.get(symbol,symbol),
volume=today_volume,
last_volume=volume_changed,
open_interest=data["OpenInterest"],
@ -759,6 +808,7 @@ class CtpMdApi(MdApi):
"""
Login onto server.
"""
self.gateway.write_log(f'{self.name}向行情服务器发出登录请求')
req = {
"UserID": self.userid,
"Password": self.password,
@ -772,8 +822,8 @@ class CtpMdApi(MdApi):
"""
Subscribe to tick data update.
"""
if self.login_status:
self.gateway.write_log(f'{self.name}订阅:{req.exchange} {req.symbol}')
if self.login_status:
self.subscribeMarketData(req.symbol)
self.subscribed.add(req.symbol)
@ -835,7 +885,6 @@ class CtpTdApi(TdApi):
self.gateway.write_log("向交易服务器进行帐号登录")
self.login()
def onFrontDisconnected(self, reason: int):
""""""
self.login_status = False
@ -1864,7 +1913,7 @@ class SubMdApi():
def check_status(self):
"""接口状态的健康检查"""
self.gateway.write_log("检查sub接口的状态")
# 订阅的合约
d = {'sub_symbols': sorted(self.symbol_tick_dict.keys())}
@ -1899,7 +1948,7 @@ class SubMdApi():
self.gateway.status.update(d)
def on_message(self, chan, method_frame, _header_frame, body, userdata=None):
# print(" [x] %r" % body)
#print(" [x] %r" % body)
try:
str_tick = body.decode('utf-8')
d = json.loads(str_tick)
@ -1908,6 +1957,8 @@ class SubMdApi():
d = self.conver_update(d)
symbol = d.pop('symbol', None)
if symbol == 'ZC99':
a = 1
str_datetime = d.pop('datetime', None)
if symbol not in self.registed_symbol_set or str_datetime is None:
return

View File

@ -1100,6 +1100,45 @@ class PbTdApi(object):
return results
def query_account(self):
if self.gateway.file_type == 'dbf':
self.query_account_dbf()
else:
self.query_account_csv()
def query_account_dbf(self):
"""获取资金账号信息"""
# dbf 文件名
account_dbf = os.path.abspath(os.path.join(self.account_folder,
'{}{}.dbf'.format(
PB_FILE_NAMES.get('accounts'),
self.trading_date)))
try:
# dbf => 资金帐号信息
self.gateway.write_log(f'扫描资金帐号信息:{account_dbf}')
table = dbf.Table(account_dbf, codepage='cp936')
table.open(dbf.READ_ONLY)
for data in table:
# ["资金账户"]
if str(data.zjzh).strip() != self.userid:
continue
account = AccountData(
gateway_name=self.gateway_name,
accountid=self.userid,
balance=float(data.dyjz), # ["单元净值"]
frozen=float(data.dyjz) - float(data.kyye), # data["可用余额"]
currency="人民币",
trading_day=self.trading_day
)
self.gateway.on_account(account)
table.close()
except Exception as ex:
self.gateway.write_error(f'dbf扫描资金帐号异常:{str(ex)}')
self.gateway.write_error(traceback.format_exc())
def query_account_csv(self):
"""获取资金账号信息"""
if self.gateway.pb_version == '2018':
# 账号的文件
@ -1134,6 +1173,67 @@ class PbTdApi(object):
def query_position(self):
"""获取持仓信息"""
if self.gateway.file_type == 'dbf':
self.query_position_dbf()
else:
self.query_position_csv()
def query_position_dbf(self):
"""从dbf文件获取持仓信息"""
# fields:['zqgs', 'zjzh', 'zhlx', 'zqdm', 'zqmc', 'zqlb', 'zxjg', 'cbjg', 'cpbh', 'cpmc', 'dybh', 'dymc', 'ccsl', 'dqcb', 'kysl', 'jjsz', 'qjsz', 'zqlx'
# , 'jysc', 'jybz', 'dryk', 'ljyk', 'fdyk', 'fyl', 'ykl', 'tzlx', 'gddm', 'mrsl', 'mcsl', 'mrje', 'mcje', 'zdf', 'bbj', 'qjcb', 'gtcb', 'gtyk', 'zgb']
# dbf 文件名
position_dbf = os.path.abspath(os.path.join(self.account_folder,
'{}{}.dbf'.format(
PB_FILE_NAMES.get('positions'),
self.trading_date)))
try:
# dbf => 股票持仓信息
self.gateway.write_log(f'扫描股票持仓信息:{position_dbf}')
table = dbf.Table(position_dbf, codepage='cp936')
table.open(dbf.READ_ONLY)
for data in table:
if str(data.zjzh).strip() != self.userid:
continue
symbol = str(data.zqdm).strip() #["证券代码"]
# symbol => Exchange
exchange = symbol_exchange_map.get(symbol, None)
if not exchange:
exchange_str = get_stock_exchange(code=symbol)
if len(exchange_str) > 0:
exchange = Exchange(exchange_str)
symbol_exchange_map.update({symbol: exchange})
name = symbol_name_map.get(symbol, None)
if not name:
name = data.zqmc # ["证券名称"]
symbol_name_map.update({symbol: name})
position = PositionData(
gateway_name=self.gateway_name,
accountid=self.userid,
symbol=symbol, #["证券代码"],
exchange=exchange,
direction=Direction.NET,
name=name,
volume=int(data.ccsl), # ["持仓数量"]
yd_volume=int(data.kysl),# ["可用数量"]
price=float(data.cbjg), # ["成本价"]
cur_price=float(data.zxjg), # ["最新价"]
pnl=float(data.fdyk), # ["浮动盈亏"]
holder_id=str(data.gddm).strip() #["股东"]
)
self.gateway.on_position(position)
table.close()
except Exception as ex:
self.gateway.write_error(f'dbf扫描股票持仓异常:{str(ex)}')
self.gateway.write_error(traceback.format_exc())
def query_position_csv(self):
"""从csv获取持仓信息"""
if self.gateway.pb_version == '2018':
# 持仓的文件
positions_csv = os.path.abspath(os.path.join(self.account_folder,
@ -1187,6 +1287,98 @@ class PbTdApi(object):
self.gateway.on_position(position)
def query_orders(self):
if self.gateway.file_type == 'dbf':
self.query_orders_dbf()
else:
self.query_orders_csv()
def query_orders_dbf(self):
"""dbf文件获取所有委托"""
# fields:['zqgs', 'zjzh', 'zhlx', 'cpbh', 'cpmc', 'dybh', 'dymc', 'wtph', 'wtxh', 'zqdm', 'zqmc', 'wtfx', 'jglx', 'wtjg', 'wtsl', 'wtzt', 'cjsl', 'wtje'
# , 'cjjj', 'cdsl', 'jysc', 'fdyy', 'wtly', 'wtrq', 'wtsj', 'jybz']
orders_dbf = os.path.abspath(os.path.join(self.account_folder,
'{}{}.dbf'.format(
PB_FILE_NAMES.get('orders'),
self.trading_date)))
try:
# dbf => 股票委托信息
self.gateway.write_log(f'扫描股票委托信息:{orders_dbf}')
table = dbf.Table(orders_dbf, codepage='cp936')
table.open(dbf.READ_ONLY)
for data in table:
if str(data.zjzh).strip() != self.userid: # ["资金账户"]
continue
sys_orderid = str(data.wtxh).strip() # ["委托序号"]
# 检查是否存在本地order_manager缓存中
order = self.gateway.order_manager.get_order_with_sys_orderid(sys_orderid)
order_date = str(data.wtrq).strip() #["委托日期"]
order_time = str(data.wtsj).strip() #["委托时间"]
order_status = STATUS_NAME2VT.get(str(data.wtzt).strip()) # ["委托状态"]
# 检查是否存在本地orders缓存中系统级别的委托单
sys_order = self.orders.get(sys_orderid, None)
if order is not None:
continue
# 委托单不存在本地映射库,说明是其他地方下的单子,不是通过本接口下单
if sys_order is None:
# 不处理以下状态
if order_status in [Status.SUBMITTING, Status.REJECTED, Status.CANCELLED, Status.CANCELLING]:
continue
order_dt = datetime.strptime(f'{order_date} {order_time}', "%Y%m%d %H%M%S")
direction = DIRECTION_STOCK_NAME2VT.get(str(data.wtfx).strip()) # ["委托方向"]
offset = Offset.NONE
if direction is None:
direction = Direction.NET
elif direction == Direction.LONG:
offset = Offset.OPEN
elif direction == Direction.SHORT:
offset = Offset.CLOSE
sys_order = OrderData(
gateway_name=self.gateway_name,
symbol=str(data.zqdm).strip(), # ["证券代码"]
exchange=EXCHANGE_NAME2VT.get(str(data.jysc).strip()), # ["交易市场"]
orderid=sys_orderid,
sys_orderid=sys_orderid,
accountid=self.userid,
type=ORDERTYPE_NAME2VT.get(str(data.jglx).strip(), OrderType.LIMIT), # ["价格类型"]
direction=direction,
offset=offset,
price=float(data.wtjg), # ["委托价格"]
volume=float(data.wtsl), # ["委托数量"]
traded=float(data.cjsl), # ["成交数量"]
status=order_status,
datetime=order_dt,
time=order_dt.strftime('%H:%M:%S')
)
# 直接发出订单更新事件
self.gateway.write_log(f'账号订单查询,新增:{sys_order.__dict__}')
self.orders.update({sys_order.sys_orderid: sys_order})
self.gateway.on_order(sys_order)
continue
# 存在账号缓存,判断状态是否更新
else:
# 暂不处理交给XHPT_WTCX模块处理
if sys_order.status != order_status or sys_order.traded != float(data.cjsl): # ["成交数量"]
sys_order.traded = float(data.cjsl) # ["成交数量"]
sys_order.status = order_status
self.orders.update({sys_order.sys_orderid: sys_order})
self.gateway.write_log(f'账号订单查询,更新:{sys_order.__dict__}')
self.gateway.on_order(sys_order)
continue
table.close()
except Exception as ex:
self.gateway.write_error(f'dbf扫描股票委托异常:{str(ex)}')
self.gateway.write_error(traceback.format_exc())
def query_orders_csv(self):
"""获取所有委托"""
# 所有委托的文件
if self.gateway.pb_version == '2018':
@ -1435,6 +1627,76 @@ class PbTdApi(object):
continue
def query_trades(self):
if self.gateway.file_type == 'dbf':
self.query_trades_dbf()
else:
self.query_trades_csv()
def query_trades_dbf(self):
"""dbf文件获取所有成交"""
# fields:['zqgs', 'zjzh', 'zhlx', 'cpbh', 'cpmc', 'dybh', 'dymc', 'cjxh', 'wtph', 'wtxh', 'zqdm', 'zqmc', 'wtfx', 'zqlb', 'ywfl', 'cjrq', 'cjsj', 'cjsl'
# , 'cjjg', 'zfy', 'cjje', 'jysc', 'jybz', 'wtly', 'rybh', 'rymc']
trades_dbf = os.path.abspath(os.path.join(self.account_folder,
'{}{}.dbf'.format(
PB_FILE_NAMES.get('trades'),
self.trading_date)))
try:
# dbf => 股票成交信息
self.gateway.write_log(f'扫描股票成交信息:{trades_dbf}')
table = dbf.Table(trades_dbf, codepage='cp936')
table.open(dbf.READ_ONLY)
for data in table:
if str(data.zjzh).strip()!= self.userid: # ["资金账户"]
continue
sys_orderid = str(data.wtxh) # ["委托序号"]
sys_tradeid = str(data.cjxh) # ["成交序号"]
# 检查是否存在本地trades缓存中
trade = self.trades.get(sys_tradeid, None)
order = self.gateway.order_manager.get_order_with_sys_orderid(sys_orderid)
# 如果交易不再本地映射关系
if trade is None and order is None:
trade_date = str(data.cjrq).strip() #["成交日期"]
trade_time = str(data.cjsj).strip() #["成交时间"]
trade_dt = datetime.strptime(f'{trade_date} {trade_time}', "%Y%m%d %H%M%S")
direction = DIRECTION_STOCK_NAME2VT.get(str(data.wtfx).strip()) # ["委托方向"]
offset = Offset.NONE
if direction is None:
direction = Direction.NET
elif direction == Direction.LONG:
offset = Offset.OPEN
elif direction == Direction.SHORT:
offset = Offset.CLOSE
trade = TradeData(
gateway_name=self.gateway_name,
symbol=str(data.zqdm).strip(), # ["证券代码"]
exchange=EXCHANGE_NAME2VT.get(str(data.jysc).strip()), # ["交易市场"]
orderid=sys_tradeid,
tradeid=sys_tradeid,
sys_orderid=sys_orderid,
accountid=self.userid,
direction=direction,
offset=offset,
price=float(data.cjjg), # ["成交价格"]
volume=float(data.cjsl), # ["成交数量"]
datetime=trade_dt,
time=trade_dt.strftime('%H:%M:%S'),
trade_amount=float(data.cjje), # ["成交金额"]
commission=float(data.zfy) # ["总费用"]
)
self.trades[sys_tradeid] = trade
self.gateway.on_trade(copy.copy(trade))
continue
table.close()
except Exception as ex:
self.gateway.write_error(f'dbf扫描股票成交异常:{str(ex)}')
self.gateway.write_error(traceback.format_exc())
def query_trades_csv(self):
"""获取所有成交"""
# 所有成交的文件
if self.gateway.pb_version == '2018':

View File

@ -4,8 +4,9 @@
import sys
from abc import ABC, abstractmethod
from typing import Any, Sequence, Dict, List, Optional, Callable
from copy import copy
from copy import copy,deepcopy
from logging import INFO, DEBUG, ERROR
from datetime import datetime
from vnpy.event import Event, EventEngine
from .event import (
@ -34,7 +35,7 @@ from .object import (
Exchange
)
from vnpy.trader.utility import get_folder_path, round_to
from vnpy.trader.utility import get_folder_path, round_to, get_underlying_symbol, get_real_symbol_by_exchange
from vnpy.trader.util_logger import setup_logger
@ -561,6 +562,113 @@ class TickCombiner(object):
self.gateway.on_tick(ratio_tick)
class IndexGenerator:
"""
指数生成器
"""
def __init__(self, gateway, setting):
self.gateway = gateway
self.gateway_name = self.gateway.gateway_name
self.gateway.write_log(u'创建指数合成类:{}'.format(setting))
self.ticks = {} # 所有真实合约, symbol: tick
self.last_dt = None # 最后tick得时间
self.underlying_symbol = setting.get('underlying_symbol')
self.exchange = setting.get('exchange', None)
self.price_tick = setting.get('price_tick')
self.symbols = setting.get('symbols', {})
# 订阅行情
self.subscribe()
self.n = len(self.symbols)
def subscribe(self):
"""订阅行情"""
dt_now = datetime.now()
for symbol in list(self.symbols.keys()):
pre_open_interest = self.symbols.get(symbol,0)
# 全路径合约 => 标准合约 ,如 ZC2109 => ZC109, RB2110 => rb2110
vn_symbol = get_real_symbol_by_exchange(symbol, Exchange(self.exchange))
# 先移除
self.symbols.pop(symbol, None)
if symbol.replace(self.underlying_symbol, '') < dt_now.strftime('%Y%m%d'):
self.gateway.write_log(f'移除早于当月的合约{symbol}')
continue
# 重新登记合约
self.symbols[vn_symbol] = pre_open_interest
# 发出订阅
req = SubscribeRequest(
symbol=vn_symbol,
exchange=Exchange(self.exchange)
)
self.gateway.subscribe(req)
def on_tick(self, tick):
"""tick到达事件"""
# 更新tick
if self.ticks is {}:
self.ticks.update({tick.symbol: tick})
return
# 进行指数合成
if self.last_dt and tick.datetime.second != self.last_dt.second:
all_amount = 0
all_interest = 0
all_volume = 0
all_ask1 = 0
all_bid1 = 0
last_price = 0
ask_price_1 = 0
bid_price_1 = 0
mi_tick = None
# 已经积累的行情tick数量不足总数减1不处理
if len(self.ticks) < min(self.n * 0.8, 3):
self.gateway.write_log(f'{self.underlying_symbol}合约数据{len(self.ticks)}不足{self.n} 0.8,暂不合成指数')
return
# 计算所有合约的累加持仓量、资金、成交量、找出最大持仓量的主力合约
for t in self.ticks.values():
all_interest += t.open_interest
all_amount += t.last_price * t.open_interest
all_volume += t.volume
all_ask1 += t.ask_price_1 * t.open_interest
all_bid1 += t.bid_price_1 * t.open_interest
if mi_tick is None or mi_tick.open_interest < t.open_interest:
mi_tick = t
# 总量 > 0
if all_interest > 0 and all_amount > 0:
last_price = round(float(all_amount / all_interest), 4)
# 卖1价
if all_ask1 > 0 and all_interest > 0:
ask_price_1 = round(float(all_ask1 / all_interest), 4)
# 买1价
if all_bid1 > 0 and all_interest > 0:
bid_price_1 = round(float(all_bid1 / all_interest), 4)
if mi_tick and last_price > 0:
idx_tick = deepcopy(mi_tick)
idx_tick.symbol = f'{self.underlying_symbol}99'
idx_tick.vt_symbol = f'{idx_tick.symbol}.{self.exchange}'
idx_tick.open_interest = all_interest
idx_tick.volume = all_volume
idx_tick.last_price = last_price
idx_tick.ask_price_1 = ask_price_1
idx_tick.bid_price_1 = bid_price_1
self.gateway.on_tick(idx_tick)
# 更新时间
self.last_dt = tick.datetime
# 更新tick
self.ticks.update({tick.symbol: tick})
class LocalOrderManager:
"""
Management tool to support use local order id for trading.