diff --git a/vnpy/app/cta_crypto/engine.py b/vnpy/app/cta_crypto/engine.py index c455473c..43f8ba15 100644 --- a/vnpy/app/cta_crypto/engine.py +++ b/vnpy/app/cta_crypto/engine.py @@ -86,7 +86,6 @@ STOP_STATUS_MAP = { Status.REJECTED: StopOrderStatus.CANCELLED } - class CtaEngine(BaseEngine): """ 策略引擎【数字货币版】 @@ -216,6 +215,8 @@ class CtaEngine(BaseEngine): if self.last_minute != dt.minute: self.last_minute = dt.minute + # 检查未订阅得合约 + self.check_unsubscribed_symbols() if all_trading: # 主动获取所有策略得持仓信息 @@ -960,6 +961,7 @@ class CtaEngine(BaseEngine): Add a new strategy. """ try: + self.write_log(f'{strategy_name} => 开始添加实例:{setting}') if strategy_name in self.strategies: msg = f"{strategy_name} => 创建策略失败,存在重名" self.write_log(msg=msg, @@ -1006,15 +1008,31 @@ class CtaEngine(BaseEngine): self.write_error(traceback.format_exc()) return False, msg + + def init_strategy(self, strategy_name: str, auto_start: bool = False): """ Init a strategy. """ self.write_log(f'创建独立线程执行{strategy_name} on_init()') task = self.thread_executor.submit(self._init_strategy, strategy_name, auto_start) + # 添加执行完毕得回调函数 + task.add_done_callback(self.thread_pool_callback) self.thread_tasks.append(task) return True + def thread_pool_callback(self, worker): + """线程异常捕捉""" + worker_exception = worker.exception() + if worker_exception: + account_id = self.engine_config.get('accountid','cta_crypto') + msg = f'{account_id}worker_exception :{str(worker_exception)}' + self.write_error(msg) + self.send_wechat(msg) + + else: + self.write_log(f'crypto engine thread worker completed') + def _init_strategy(self, strategy_name: str, auto_start: bool = False): """ Init strategies in queue. @@ -1147,9 +1165,14 @@ class CtaEngine(BaseEngine): strategy = self.strategies[strategy_name] if strategy.trading: - err_msg = f"策略{strategy.strategy_name}移除失败,请先停止" - self.write_error(err_msg) - return False, err_msg + # err_msg = f"策略{strategy.strategy_name}正在运行,先停止" + # self.write_error(err_msg) + # return False, err_msg + ret, msg = self.stop_strategy(strategy_name) + if not ret: + return False, msg + else: + self.write_log(msg) # Remove setting self.remove_strategy_setting(strategy_name) @@ -1648,11 +1671,13 @@ class CtaEngine(BaseEngine): # 账号的持仓处理 => compare_pos compare_pos = dict() # vt_symbol: {'账号多单': xx, '账号空单':xxx, '策略空单':[], '策略多单':[]} - for position in list(self.positions.values()): + self.write_log(f'扫描帐号持仓') + positions = self.main_engine.get_all_positions() + for position in positions: # list(self.positions.values()): # gateway_name.symbol.exchange => symbol.exchange vt_symbol = position.vt_symbol vt_symbols.add(vt_symbol) - + self.write_log(f'帐号:{position.vt_symbol}:{position.volume}') compare_pos[vt_symbol] = OrderedDict( { "账号净仓": position.volume, @@ -1664,6 +1689,7 @@ class CtaEngine(BaseEngine): ) # 逐一根据策略仓位,与Account_pos进行处理比对 + self.write_log(f'扫描策略持仓') for strategy_pos in strategy_pos_list: for pos in strategy_pos.get('pos', []): vt_symbol = pos.get('vt_symbol') @@ -1694,20 +1720,22 @@ class CtaEngine(BaseEngine): u'{}({})'.format(strategy_pos['strategy_name'], abs(pos.get('volume', 0)))) self.write_log(u'更新{}策略持多仓=>{}'.format(vt_symbol, symbol_pos.get('策略多单', 0))) + compare_pos.update({vt_symbol: symbol_pos}) + pos_compare_result = '' # 精简输出 compare_info = '' for vt_symbol in sorted(vt_symbols): # 发送不一致得结果 - symbol_pos = compare_pos.pop(vt_symbol, None) - if not symbol_pos: - self.write_error(f'持仓对比中,找不到{vt_symbol}') - continue - net_symbol_pos = round(round(symbol_pos['策略多单'], 7) - round(symbol_pos['策略空单'], 7), 7) + symbol_pos = compare_pos.pop(vt_symbol, {}) + # if not symbol_pos: + # self.write_error(f'持仓对比中,找不到{vt_symbol}') + # continue + net_symbol_pos = round(round(symbol_pos.get('策略多单',0), 7) - round(symbol_pos.get('策略空单',0), 7), 7) # 多空都一致 - if round(symbol_pos['账号净仓'], 7) == net_symbol_pos: + if round(symbol_pos.get('账号净仓',0), 7) == net_symbol_pos: msg = u'{}多空都一致.{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False)) self.write_log(msg) compare_info += msg @@ -1716,7 +1744,7 @@ class CtaEngine(BaseEngine): self.write_error(u'{}不一致:{}'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False))) compare_info += u'{}不一致:{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False)) - diff_volume = round(symbol_pos['账号净仓'], 7) - net_symbol_pos + diff_volume = round(symbol_pos.get('账号净仓',0), 7) - net_symbol_pos # 账号仓位> 策略仓位, sell if diff_volume > 0 and auto_balance: contract = self.main_engine.get_contract(vt_symbol) diff --git a/vnpy/app/cta_stock/engine.py b/vnpy/app/cta_stock/engine.py index d7ed4ae5..4e85d650 100644 --- a/vnpy/app/cta_stock/engine.py +++ b/vnpy/app/cta_stock/engine.py @@ -1127,9 +1127,14 @@ class CtaEngine(BaseEngine): """ strategy = self.strategies[strategy_name] if strategy.trading: - err_msg = f"策略{strategy.strategy_name}移除失败,请先停止" - self.write_error(err_msg) - return False, err_msg + # err_msg = f"策略{strategy.strategy_name}正在运行,先停止" + #self.write_error(err_msg) + #return False, err_msg + ret , msg = self.stop_strategy(strategy_name) + if not ret: + return False, msg + else: + self.write_log(msg) # Remove setting self.remove_strategy_setting(strategy_name) diff --git a/vnpy/app/index_tick_publisher/__init__.py b/vnpy/app/index_tick_publisher/__init__.py index 33a36992..c48fe186 100644 --- a/vnpy/app/index_tick_publisher/__init__.py +++ b/vnpy/app/index_tick_publisher/__init__.py @@ -3,7 +3,7 @@ import os from pathlib import Path from vnpy.trader.app import BaseApp -from .engine import IndexTickPublisher, APP_NAME +from .engine import IndexTickPublisher,IndexTickPublisherV2, APP_NAME class IndexTickPublisherApp(BaseApp): @@ -12,4 +12,4 @@ class IndexTickPublisherApp(BaseApp): app_module = __module__ app_path = Path(__file__).parent display_name = u'期货指数全行情推送' - engine_class = IndexTickPublisher + engine_class = IndexTickPublisherV2 diff --git a/vnpy/app/index_tick_publisher/engine.py b/vnpy/app/index_tick_publisher/engine.py index f231f166..289741e1 100644 --- a/vnpy/app/index_tick_publisher/engine.py +++ b/vnpy/app/index_tick_publisher/engine.py @@ -11,24 +11,278 @@ from datetime import datetime, timedelta from time import sleep from logging import ERROR from pytdx.exhq import TdxExHq_API +from copy import deepcopy from vnpy.event import EventEngine from vnpy.trader.constant import Exchange from vnpy.trader.engine import BaseEngine, MainEngine from vnpy.trader.event import EVENT_TIMER -from vnpy.trader.object import TickData -from vnpy.trader.utility import get_trading_date -from vnpy.data.tdx.tdx_common import TDX_FUTURE_HOSTS +from vnpy.trader.object import TickData, SubscribeRequest +from vnpy.trader.utility import get_trading_date, get_underlying_symbol, load_json, get_real_symbol_by_exchange +from vnpy.data.tdx.tdx_common import TDX_FUTURE_HOSTS, get_future_contracts from vnpy.component.base import ( NIGHT_MARKET_23, NIGHT_MARKET_SQ2, MARKET_DAY_ONLY) from vnpy.amqp.producer import publisher +from vnpy.gateway.ctp.ctp_gateway import CtpMdApi, symbol_exchange_map APP_NAME = 'Idx_Publisher' +class IndexTickPublisherV2(BaseEngine): + """ + 指数tick发布服务 + 透过ctp 行情接口,获取所有合约,并根据合约的仓指,生成指数tick,发布至rabbitMQ + """ + + # ---------------------------------------------------------------------- + def __init__(self, main_engine: MainEngine, event_engine: EventEngine): + """""" + super(IndexTickPublisherV2, self).__init__( + main_engine, event_engine, APP_NAME) + + self.main_engine = main_engine + self.event_engine = event_engine + self.create_logger(logger_name=APP_NAME) + self.gateway_name = 'CTP' + self.last_minute = None + + self.registerEvent() + + self.connection_status = False # 连接状态 + + # ctp md api + self.subscribed_symbols = set() # 已订阅合约代码 + + self.md_api = None # API 的连接会话对象 + self.last_tick_dt = {} # 记录该会话对象的最后一个tick时间 + + self.instrument_count = 50000 + + self.has_qry_instrument = False + + # vt_setting.json内rabbitmq配置项 + self.conf = {} + self.pub = None + + self.status = {} + self.subscribed_symbols = set() # 已订阅合约代码 + self.ticks = {} + + # 本地/vnpy/data/tdx/future_contracts.json + self.all_contracts = get_future_contracts() + # 需要订阅的短合约 + self.selected_underly_symbols = load_json('subscribe_symbols.json', auto_save=False) + + # 短合约 <=> 所有真实合约 的数量 + self.underly_symbols_num_dict = {} + + def write_error(self, content: str): + self.write_log(msg=content, level=ERROR) + + def create_publisher(self, conf): + """创建rabbitmq 消息发布器""" + if self.pub: + return + try: + self.write_log(f'创建发布器:{conf}') + # 消息发布 + self.pub = publisher(host=conf.get('host', 'localhost'), + port=conf.get('port', 5672), + user=conf.get('user', 'admin'), + password=conf.get('password', 'admin'), + channel_number=conf.get('channel_number', 1), + queue_name=conf.get('queue_name', ''), + routing_key=conf.get('routing_key', 'default'), + exchange=conf.get('exchange', 'x_fanout_idx_tick')) + self.write_log(f'创建发布器成功') + except Exception as ex: + self.write_log(u'创建tick发布器异常:{}'.format(str(ex))) + + # ---------------------------------------------------------------------- + def registerEvent(self): + """注册事件监听""" + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + + def process_timer_event(self, event): + """定时执行""" + dt = datetime.now() + + if self.last_minute and dt.minute == self.last_minute: + return + + self.last_minute = dt.minute + + self.check_status() + + def check_status(self): + """定期检查状态""" + if not self.md_api: + self.status.update({'con': False}) + self.write_log(f'行情接口未连接') + return + dt_now = datetime.now() + + # 扫描合约配置文件 + for underly_symbol, info in self.all_contracts.items(): + # 如果本地subscribe_symbols内有合约的指定订阅清单,进行排除 ['RB','IF'] + if len(self.selected_underly_symbols) > 0 and underly_symbol not in self.selected_underly_symbols: + continue + + # 日盘数据,夜盘期间不订阅 + if dt_now.hour < 4 or dt_now.hour > 20: + if underly_symbol in MARKET_DAY_ONLY: + continue + + # 获取当前所有的合约列表 + symbols = info.get('symbols', {}) + # 获取交易所 + exchange = info.get('exchange', 'LOCAL') + # 获取本地记录的tick dict + tick_dict = self.ticks.get(underly_symbol, {}) + + for symbol in symbols.keys(): + # 全路径合约 => 标准合约 ,如 ZC2109 => ZC109, RB2110 => rb2110 + vn_symbol = get_real_symbol_by_exchange(symbol, Exchange(exchange)) + + if symbol.replace(underly_symbol, '') < dt_now.strftime('%Y%m%d'): + self.write_log(f'移除早于当月的合约{symbol}') + symbols.pop(symbol, None) + continue + # 生成带交易所信息的合约 + vt_symbol = f'{vn_symbol}.{exchange}' + # symbol_exchange_map是全局变量,ctp md api会使用到,所以需要更新其 合约与交易所的关系 + if vn_symbol not in symbol_exchange_map: + symbol_exchange_map.update({vn_symbol: Exchange(exchange)}) + + # 该合约没有在行情中,重新发出订阅 + if vt_symbol not in tick_dict: + req = SubscribeRequest( + symbol=vn_symbol, + exchange=Exchange(exchange) + ) + self.subscribe(req) + + # 等级短合约 <=> 真实合约数量 + self.underly_symbols_num_dict.update({underly_symbol: len(symbols.keys())}) + + def connect(self, *args, **kwargs): + """ + 连接ctp行情,和rabbitmq推送 + :param args: + :param kwargs: + :return: + """ + self.write_log(f'connect({kwargs}') + + # 连接ctp行情服务器 + md_address = kwargs.get('md_address') + userid = kwargs.get('userid') + password = kwargs.get('password') + brokerid = kwargs.get('brokerid') + if not self.md_api: + self.write_log(f'创建ctp行情服务器{md_address}') + self.md_api = CtpMdApi(gateway=self) + self.md_api.connect(address=md_address, + userid=userid, + password=password, + brokerid=brokerid) + + # 连接rabbit MQ + rabbit_config = kwargs.get('rabbit_config', {}) + self.write_log(f'创建rabbitMQ 消息推送桩,{rabbit_config}') + self.conf.update(rabbit_config) + self.create_publisher(self.conf) + + def subscribe(self, req: SubscribeRequest): + """订阅合约""" + self.write_log(f'engine:订阅合约: {req.vt_symbol}') + + if req.vt_symbol not in self.subscribed_symbols: + self.subscribed_symbols.add(req.vt_symbol) + + if self.md_api: + self.md_api.subscribe(req) + + def on_tick(self, tick): + """ tick到达事件""" + short_symbol = get_underlying_symbol(tick.symbol).upper() + # 更新tick + tick_dict = self.ticks.get(short_symbol, None) + if tick_dict is None: + tick_dict = {tick.symbol: tick} + self.ticks.update({short_symbol: tick_dict}) + return + + # 与最后 + last_dt = self.last_tick_dt.get(short_symbol, tick.datetime) + + # 进行指数合成 + if last_dt and tick.datetime.second != last_dt.second: + all_amount = 0 + all_interest = 0 + all_volume = 0 + all_ask1 = 0 + all_bid1 = 0 + last_price = 0 + ask_price_1 = 0 + bid_price_1 = 0 + mi_tick = None + + # 已经积累的行情tick数量,不足总数减1,不处理 + n = self.underly_symbols_num_dict.get(short_symbol, 1) + if len(tick_dict) < min(n*0.8, 3) : + self.write_log(f'{short_symbol}合约数据{len(tick_dict)}不足{n} 0.8,暂不合成指数') + return + + # 计算所有合约的累加持仓量、资金、成交量、找出最大持仓量的主力合约 + for t in tick_dict.values(): + all_interest += t.open_interest + all_amount += t.last_price * t.open_interest + all_volume += t.volume + all_ask1 += t.ask_price_1 * t.open_interest + all_bid1 += t.bid_price_1 * t.open_interest + if mi_tick is None or mi_tick.open_interest < t.open_interest: + mi_tick = t + + # 总量 > 0 + if all_interest > 0 and all_amount > 0: + last_price = round(float(all_amount / all_interest), 4) + # 卖1价 + if all_ask1 > 0 and all_interest > 0: + ask_price_1 = round(float(all_ask1 / all_interest), 4) + # 买1价 + if all_bid1 > 0 and all_interest > 0: + bid_price_1 = round(float(all_bid1 / all_interest), 4) + + if mi_tick and last_price > 0: + if self.pub: + d = copy.copy(mi_tick.__dict__) + # 时间 =》 字符串 + if isinstance(mi_tick.datetime, datetime): + d.update({'datetime': mi_tick.datetime.strftime('%Y-%m-%d %H:%M:%S.%f')}) + # 变量 => 字符串 + d.update({'exchange': mi_tick.exchange.value}) + d.update({'symbol': f'{short_symbol}99', 'vt_symbol': f'{short_symbol}99.{mi_tick.exchange.value}'}) + # 更新未指数的持仓量、交易量,最后价格,ask1,bid1 + d.update({'open_interest': all_interest, 'volume': all_volume, + 'last_price': last_price, 'ask_price_1': ask_price_1, 'bid_price_1': bid_price_1}) + print('{} {}:{}'.format(d.get('datetime'), d.get("vt_symbol"), d.get('last_price'))) + d = json.dumps(d) + self.pub.pub(d) + + # 更新时间 + self.last_tick_dt.update({short_symbol: tick.datetime}) + + tick_dict.update({tick.symbol: tick}) + self.ticks.update({short_symbol: tick_dict}) + + def on_custom_tick(self, tick): + pass + + class IndexTickPublisher(BaseEngine): # 指数tick发布服务 # 通过通达信接口,获取指数行情tick,发布至rabbitMQ @@ -276,7 +530,14 @@ class IndexTickPublisher(BaseEngine): self.pub.exit() def check_status(self): - # self.write_log(u'检查tdx接口状态') + self.write_log(u'检查tdx接口状态') + if len(self.symbol_tick_dict) > 0: + k = self.symbol_tick_dict.keys()[0] + tick = self.symbol_tick_dict.get(k, None) + if tick: + self.write_log(f'{tick.vt_symbol}: {tick.datetime}, price:{tick.last_price}') + else: + self.write_log(f'目前没有收到tick') # 若还没有启动连接,就启动连接 over_time = self.last_tick_dt is None or (datetime.now() - self.last_tick_dt).total_seconds() > 60 @@ -285,8 +546,8 @@ class IndexTickPublisher(BaseEngine): self.close() self.api = None self.reconnect() - - # self.write_log(u'tdx接口状态正常') + else: + self.write_log(u'tdx接口状态正常') def qry_instrument(self): """ diff --git a/vnpy/data/tdx/tdx_common.py b/vnpy/data/tdx/tdx_common.py index ffc7d884..88eb0c10 100644 --- a/vnpy/data/tdx/tdx_common.py +++ b/vnpy/data/tdx/tdx_common.py @@ -65,27 +65,25 @@ PERIOD_MAPPING['1week'] = 5 PERIOD_MAPPING['1month'] = 6 # 期货行情服务器清单 -TDX_FUTURE_HOSTS = [ - #{"ip": "120.24.0.77", "port": 443, "name": "通达信接入主站"}, - {"ip": "112.74.214.43", "port": 7727, "name": "扩展市场深圳双线1"}, - #{"ip": "120.24.0.77", "port": 7727, "name": "扩展市场深圳双线2"}, - {"ip": "47.107.75.159", "port": 7727, "name": "扩展市场深圳双线3"}, - - {"ip": "113.105.142.136", "port": 443, "name": "扩展市场东莞主站"}, - {"ip": "113.105.142.133", "port": 443, "name": "港股期货东莞电信"}, - - {"ip": "119.97.185.5", "port": 7727, "name": "扩展市场武汉主站1"}, - {"ip": "119.97.185.7", "port": 7727, "name": "港股期货武汉主站1"}, - {"ip": "119.97.185.9", "port": 7727, "name": "港股期货武汉主站2"}, - {"ip": "59.175.238.38", "port": 7727, "name": "扩展市场武汉主站3"}, - - {"ip": "202.103.36.71", "port": 443, "name": "扩展市场武汉主站2"}, - - {"ip": "47.92.127.181", "port": 7727, "name": "扩展市场北京主站"}, - {"ip": "106.14.95.149", "port": 7727, "name": "扩展市场上海双线"}, - {"ip": '218.80.248.229', 'port': 7721, "name": "备用服务器1"}, - {"ip": '124.74.236.94', 'port': 7721, "name": "备用服务器2"}, - {'ip': '58.246.109.27', 'port': 7721, "name": "备用服务器3"}] +TDX_FUTURE_HOSTS =[ + #{'ip': '42.193.151.197', 'port': 7727, 'name': '广州期货双线1', 'speed': 6.622}, + #{'ip': '119.29.63.178', 'port': 7727, 'name': '广州期货双线3', 'speed': 7.716}, + #{'ip': '81.71.76.101', 'port': 7727, 'name': '广州期货双线2', 'speed': 14.914}, + #{'ip': '47.107.75.159', 'port': 7727, 'name': '扩展市场深圳双线3', 'speed': 34.542}, + #{'ip': '112.74.214.43', 'port': 7727, 'name': '扩展市场深圳双线1', 'speed': 37.881}, + #{'ip': '59.175.238.38', 'port': 7727, 'name': '扩展市场武汉主站3', 'speed': 49.63}, + #{'ip': '119.97.185.5', 'port': 7727, 'name': '扩展市场武汉主站1', 'speed': 70.563}, + {'ip': '218.80.248.229', 'port': 7721, 'name': '备用服务器1', 'speed': 86.91300000000001}, + #{'ip': '119.97.185.7', 'port': 7727, 'name': '港股期货武汉主站1', 'speed': 101.06099999999999}, + #{'ip': '106.14.95.149', 'port': 7727, 'name': '扩展市场上海双线', 'speed': 105.294}, + {'ip': '113.105.142.136', 'port': 443, 'name': '扩展市场东莞主站', 'speed': 10000.0}, + {'ip': '113.105.142.133', 'port': 443, 'name': '港股期货东莞电信', 'speed': 10000.0}, + #{'ip': '119.97.185.9', 'port': 7727, 'name': '港股期货武汉主站2', 'speed': 10000.0}, + {'ip': '202.103.36.71', 'port': 443, 'name': '扩展市场武汉主站2', 'speed': 10000.0}, + #{'ip': '47.92.127.181', 'port': 7727, 'name': '扩展市场北京主站', 'speed': 10000.0}, + {'ip': '124.74.236.94', 'port': 7721, 'name': '备用服务器2', 'speed': 10000.0}, + {'ip': '58.246.109.27', 'port': 7721, 'name': '备用服务器3', 'speed': 10000.0} +] def get_future_contracts(): diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index a3796693..180e3f41 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -55,7 +55,7 @@ from vnpy.trader.constant import ( OptionType, Interval ) -from vnpy.trader.gateway import BaseGateway, TickCombiner +from vnpy.trader.gateway import BaseGateway, TickCombiner, IndexGenerator from vnpy.trader.object import ( TickData, BarData, @@ -264,8 +264,18 @@ class CtpGateway(BaseGateway): self.combiners = {} self.tick_combiner_map = {} + # 本地指数行情合成器{ 'rb2110':x, 'rb2201':x',,} + self.index_generators = {} + # 已经创建得指数合成器symbol列表 ['RB99','J99',,,] + self.subscribed_index_symbols = [] + def connect(self, setting: dict): - """""" + """ + 连接交易服务器、行情服务器 + 行情服务器包括:ctp普通行情、ctp5档行情(上海、能源所)、tdx指数行情、rabbitMQ指数行情、天勤指数行情 + :param setting: + :return: + """ userid = setting["用户名"] password = setting["密码"] brokerid = setting["经纪商代码"] @@ -277,6 +287,7 @@ class CtpGateway(BaseGateway): product_info = setting["产品信息"] rabbit_dict = setting.get('rabbit', None) tq_dict = setting.get('tq', None) + tdx_dict = setting.get('tdx', None) if ( (not td_address.startswith("tcp://")) and (not td_address.startswith("ssl://")) @@ -331,7 +342,7 @@ class CtpGateway(BaseGateway): self.write_log(f'激活天勤行情接口') self.tq_api = TqMdApi(gateway=self) self.tq_api.connect(tq_dict) - else: + elif tdx_dict is not None: self.write_log(f'激活通达信行情接口') self.tdx_api = TdxMdApi(gateway=self) self.tdx_api.connect() @@ -394,7 +405,14 @@ class CtpGateway(BaseGateway): return True def subscribe(self, req: SubscribeRequest): - """""" + """ + 订阅合约行情 + 普通合约 => ctp行情、5档行情 + 指数合约 => 通达信、rabbitMQ,天勤 + 套利合约 => 合约合并器 + :param req: + :return: + """ try: if self.md_api: # 如果是自定义的套利合约符号 @@ -447,6 +465,7 @@ class CtpGateway(BaseGateway): return elif req.exchange == Exchange.SPD: self.write_error(u'自定义合约{}不在CTP设置中'.format(req.symbol)) + return # 指数合约,从tdx行情订阅 if req.symbol[-2:] in ['99']: @@ -460,6 +479,11 @@ class CtpGateway(BaseGateway): elif self.tq_api: self.write_log(f'使用天勤接口订阅{req.symbol}') self.tq_api.subscribe(req) + else: + if req.symbol not in self.subscribed_index_symbols: + self.write_log(f'使用本地指数生成器进行订阅') + self.subscribe_local_index(req) + else: # 上期所、上能源支持五档行情,使用天勤接口 if self.tq_api and req.exchange in [Exchange.SHFE, Exchange.INE]: @@ -468,6 +492,7 @@ class CtpGateway(BaseGateway): if self.l2_md_api and req.exchange in [Exchange.SHFE, Exchange.INE]: self.write_log(f'使用五档行情接口订阅:{req.symbol}') self.l2_md_api.subscribe(req) + else: self.write_log(f'使用CTP接口订阅{req.symbol}') self.md_api.subscribe(req) @@ -492,6 +517,26 @@ class CtpGateway(BaseGateway): bg = BarGenerator(on_bar=self.on_bar) self.klines.update({vt_symbol: bg}) + def subscribe_local_index(self, req): + """ + 订阅本地合约 + :param req: + :return: + """ + underlying_symbol = get_underlying_symbol(req.symbol) + symbol_info = future_contracts.get(underlying_symbol,None) + if symbol_info: + generator = IndexGenerator(gateway=self, setting=symbol_info) + + # 登记订阅真实合约 <=>合成器 关系 + for vn_symbol in generator.symbols: + self.index_generators[vn_symbol] = generator + + # 登记指数合约到本地已订阅信息 + self.subscribed_index_symbols.append(req.symbol) + else: + self.write_error(f'{underlying_symbol}信息没有在vnpy/data/tdx/future_contracts.json文件中,不能创建指数订阅') + def send_order(self, req: OrderRequest): """""" return self.td_api.send_order(req) @@ -574,11 +619,15 @@ class CtpGateway(BaseGateway): def on_custom_tick(self, tick): """推送自定义合约行情""" # 自定义合约行情 - for combiner in self.tick_combiner_map.get(tick.symbol, []): tick = copy(tick) combiner.on_tick(tick) + # 推送至指数生成器 + if tick.symbol in self.index_generators: + tick = copy(tick) + # 推送on_tick()方法 + self.index_generators[tick.symbol].on_tick(tick) class CtpMdApi(MdApi): """""" @@ -688,7 +737,7 @@ class CtpMdApi(MdApi): date=s_date, time=dt.strftime('%H:%M:%S.%f'), trading_day=trading_day, - name=symbol_name_map[symbol], + name=symbol_name_map.get(symbol,symbol), volume=today_volume, last_volume=volume_changed, open_interest=data["OpenInterest"], @@ -759,6 +808,7 @@ class CtpMdApi(MdApi): """ Login onto server. """ + self.gateway.write_log(f'{self.name}向行情服务器发出登录请求') req = { "UserID": self.userid, "Password": self.password, @@ -772,8 +822,8 @@ class CtpMdApi(MdApi): """ Subscribe to tick data update. """ + self.gateway.write_log(f'{self.name}订阅:{req.exchange} {req.symbol}') if self.login_status: - self.gateway.write_log(f'{self.name}订阅:{req.exchange} {req.symbol}') self.subscribeMarketData(req.symbol) self.subscribed.add(req.symbol) @@ -835,7 +885,6 @@ class CtpTdApi(TdApi): self.gateway.write_log("向交易服务器进行帐号登录") self.login() - def onFrontDisconnected(self, reason: int): """""" self.login_status = False @@ -1864,7 +1913,7 @@ class SubMdApi(): def check_status(self): """接口状态的健康检查""" - + self.gateway.write_log("检查sub接口的状态") # 订阅的合约 d = {'sub_symbols': sorted(self.symbol_tick_dict.keys())} @@ -1899,7 +1948,7 @@ class SubMdApi(): self.gateway.status.update(d) def on_message(self, chan, method_frame, _header_frame, body, userdata=None): - # print(" [x] %r" % body) + #print(" [x] %r" % body) try: str_tick = body.decode('utf-8') d = json.loads(str_tick) @@ -1908,6 +1957,8 @@ class SubMdApi(): d = self.conver_update(d) symbol = d.pop('symbol', None) + if symbol == 'ZC99': + a = 1 str_datetime = d.pop('datetime', None) if symbol not in self.registed_symbol_set or str_datetime is None: return diff --git a/vnpy/gateway/pb/pb_gateway.py b/vnpy/gateway/pb/pb_gateway.py index 04344e78..221be787 100644 --- a/vnpy/gateway/pb/pb_gateway.py +++ b/vnpy/gateway/pb/pb_gateway.py @@ -1100,6 +1100,45 @@ class PbTdApi(object): return results def query_account(self): + + if self.gateway.file_type == 'dbf': + self.query_account_dbf() + else: + self.query_account_csv() + + def query_account_dbf(self): + """获取资金账号信息""" + # dbf 文件名 + account_dbf = os.path.abspath(os.path.join(self.account_folder, + '{}{}.dbf'.format( + PB_FILE_NAMES.get('accounts'), + self.trading_date))) + try: + # dbf => 资金帐号信息 + self.gateway.write_log(f'扫描资金帐号信息:{account_dbf}') + table = dbf.Table(account_dbf, codepage='cp936') + table.open(dbf.READ_ONLY) + for data in table: + # ["资金账户"] + if str(data.zjzh).strip() != self.userid: + continue + account = AccountData( + gateway_name=self.gateway_name, + accountid=self.userid, + balance=float(data.dyjz), # ["单元净值"] + frozen=float(data.dyjz) - float(data.kyye), # data["可用余额"] + currency="人民币", + trading_day=self.trading_day + ) + self.gateway.on_account(account) + + table.close() + + except Exception as ex: + self.gateway.write_error(f'dbf扫描资金帐号异常:{str(ex)}') + self.gateway.write_error(traceback.format_exc()) + + def query_account_csv(self): """获取资金账号信息""" if self.gateway.pb_version == '2018': # 账号的文件 @@ -1134,6 +1173,67 @@ class PbTdApi(object): def query_position(self): """获取持仓信息""" + if self.gateway.file_type == 'dbf': + self.query_position_dbf() + else: + self.query_position_csv() + + def query_position_dbf(self): + """从dbf文件获取持仓信息""" + # fields:['zqgs', 'zjzh', 'zhlx', 'zqdm', 'zqmc', 'zqlb', 'zxjg', 'cbjg', 'cpbh', 'cpmc', 'dybh', 'dymc', 'ccsl', 'dqcb', 'kysl', 'jjsz', 'qjsz', 'zqlx' + # , 'jysc', 'jybz', 'dryk', 'ljyk', 'fdyk', 'fyl', 'ykl', 'tzlx', 'gddm', 'mrsl', 'mcsl', 'mrje', 'mcje', 'zdf', 'bbj', 'qjcb', 'gtcb', 'gtyk', 'zgb'] + # dbf 文件名 + position_dbf = os.path.abspath(os.path.join(self.account_folder, + '{}{}.dbf'.format( + PB_FILE_NAMES.get('positions'), + self.trading_date))) + try: + # dbf => 股票持仓信息 + self.gateway.write_log(f'扫描股票持仓信息:{position_dbf}') + table = dbf.Table(position_dbf, codepage='cp936') + table.open(dbf.READ_ONLY) + for data in table: + if str(data.zjzh).strip() != self.userid: + continue + symbol = str(data.zqdm).strip() #["证券代码"] + + # symbol => Exchange + exchange = symbol_exchange_map.get(symbol, None) + if not exchange: + exchange_str = get_stock_exchange(code=symbol) + if len(exchange_str) > 0: + exchange = Exchange(exchange_str) + symbol_exchange_map.update({symbol: exchange}) + + name = symbol_name_map.get(symbol, None) + if not name: + name = data.zqmc # ["证券名称"] + symbol_name_map.update({symbol: name}) + + position = PositionData( + gateway_name=self.gateway_name, + accountid=self.userid, + symbol=symbol, #["证券代码"], + exchange=exchange, + direction=Direction.NET, + name=name, + volume=int(data.ccsl), # ["持仓数量"] + yd_volume=int(data.kysl),# ["可用数量"] + price=float(data.cbjg), # ["成本价"] + cur_price=float(data.zxjg), # ["最新价"] + pnl=float(data.fdyk), # ["浮动盈亏"] + holder_id=str(data.gddm).strip() #["股东"] + ) + self.gateway.on_position(position) + + table.close() + + except Exception as ex: + self.gateway.write_error(f'dbf扫描股票持仓异常:{str(ex)}') + self.gateway.write_error(traceback.format_exc()) + + def query_position_csv(self): + """从csv获取持仓信息""" if self.gateway.pb_version == '2018': # 持仓的文件 positions_csv = os.path.abspath(os.path.join(self.account_folder, @@ -1187,6 +1287,98 @@ class PbTdApi(object): self.gateway.on_position(position) def query_orders(self): + if self.gateway.file_type == 'dbf': + self.query_orders_dbf() + else: + self.query_orders_csv() + + def query_orders_dbf(self): + """dbf文件获取所有委托""" + # fields:['zqgs', 'zjzh', 'zhlx', 'cpbh', 'cpmc', 'dybh', 'dymc', 'wtph', 'wtxh', 'zqdm', 'zqmc', 'wtfx', 'jglx', 'wtjg', 'wtsl', 'wtzt', 'cjsl', 'wtje' + # , 'cjjj', 'cdsl', 'jysc', 'fdyy', 'wtly', 'wtrq', 'wtsj', 'jybz'] + orders_dbf = os.path.abspath(os.path.join(self.account_folder, + '{}{}.dbf'.format( + PB_FILE_NAMES.get('orders'), + self.trading_date))) + try: + # dbf => 股票委托信息 + self.gateway.write_log(f'扫描股票委托信息:{orders_dbf}') + table = dbf.Table(orders_dbf, codepage='cp936') + table.open(dbf.READ_ONLY) + for data in table: + if str(data.zjzh).strip() != self.userid: # ["资金账户"] + continue + + sys_orderid = str(data.wtxh).strip() # ["委托序号"] + + # 检查是否存在本地order_manager缓存中 + order = self.gateway.order_manager.get_order_with_sys_orderid(sys_orderid) + order_date = str(data.wtrq).strip() #["委托日期"] + order_time = str(data.wtsj).strip() #["委托时间"] + order_status = STATUS_NAME2VT.get(str(data.wtzt).strip()) # ["委托状态"] + + # 检查是否存在本地orders缓存中(系统级别的委托单) + sys_order = self.orders.get(sys_orderid, None) + + if order is not None: + continue + # 委托单不存在本地映射库,说明是其他地方下的单子,不是通过本接口下单 + if sys_order is None: + + # 不处理以下状态 + if order_status in [Status.SUBMITTING, Status.REJECTED, Status.CANCELLED, Status.CANCELLING]: + continue + + order_dt = datetime.strptime(f'{order_date} {order_time}', "%Y%m%d %H%M%S") + direction = DIRECTION_STOCK_NAME2VT.get(str(data.wtfx).strip()) # ["委托方向"] + offset = Offset.NONE + if direction is None: + direction = Direction.NET + elif direction == Direction.LONG: + offset = Offset.OPEN + elif direction == Direction.SHORT: + offset = Offset.CLOSE + sys_order = OrderData( + gateway_name=self.gateway_name, + symbol=str(data.zqdm).strip(), # ["证券代码"] + exchange=EXCHANGE_NAME2VT.get(str(data.jysc).strip()), # ["交易市场"] + orderid=sys_orderid, + sys_orderid=sys_orderid, + accountid=self.userid, + type=ORDERTYPE_NAME2VT.get(str(data.jglx).strip(), OrderType.LIMIT), # ["价格类型"] + direction=direction, + offset=offset, + price=float(data.wtjg), # ["委托价格"] + volume=float(data.wtsl), # ["委托数量"] + traded=float(data.cjsl), # ["成交数量"] + status=order_status, + datetime=order_dt, + time=order_dt.strftime('%H:%M:%S') + ) + # 直接发出订单更新事件 + self.gateway.write_log(f'账号订单查询,新增:{sys_order.__dict__}') + self.orders.update({sys_order.sys_orderid: sys_order}) + self.gateway.on_order(sys_order) + continue + + # 存在账号缓存,判断状态是否更新 + else: + # 暂不处理,交给XHPT_WTCX模块处理 + if sys_order.status != order_status or sys_order.traded != float(data.cjsl): # ["成交数量"] + sys_order.traded = float(data.cjsl) # ["成交数量"] + sys_order.status = order_status + self.orders.update({sys_order.sys_orderid: sys_order}) + self.gateway.write_log(f'账号订单查询,更新:{sys_order.__dict__}') + self.gateway.on_order(sys_order) + continue + + table.close() + + except Exception as ex: + self.gateway.write_error(f'dbf扫描股票委托异常:{str(ex)}') + self.gateway.write_error(traceback.format_exc()) + + def query_orders_csv(self): """获取所有委托""" # 所有委托的文件 if self.gateway.pb_version == '2018': @@ -1435,6 +1627,76 @@ class PbTdApi(object): continue def query_trades(self): + if self.gateway.file_type == 'dbf': + self.query_trades_dbf() + else: + self.query_trades_csv() + + def query_trades_dbf(self): + """dbf文件获取所有成交""" + # fields:['zqgs', 'zjzh', 'zhlx', 'cpbh', 'cpmc', 'dybh', 'dymc', 'cjxh', 'wtph', 'wtxh', 'zqdm', 'zqmc', 'wtfx', 'zqlb', 'ywfl', 'cjrq', 'cjsj', 'cjsl' + # , 'cjjg', 'zfy', 'cjje', 'jysc', 'jybz', 'wtly', 'rybh', 'rymc'] + trades_dbf = os.path.abspath(os.path.join(self.account_folder, + '{}{}.dbf'.format( + PB_FILE_NAMES.get('trades'), + self.trading_date))) + try: + # dbf => 股票成交信息 + self.gateway.write_log(f'扫描股票成交信息:{trades_dbf}') + table = dbf.Table(trades_dbf, codepage='cp936') + table.open(dbf.READ_ONLY) + for data in table: + if str(data.zjzh).strip()!= self.userid: # ["资金账户"] + continue + + sys_orderid = str(data.wtxh) # ["委托序号"] + sys_tradeid = str(data.cjxh) # ["成交序号"] + + # 检查是否存在本地trades缓存中 + trade = self.trades.get(sys_tradeid, None) + order = self.gateway.order_manager.get_order_with_sys_orderid(sys_orderid) + + # 如果交易不再本地映射关系 + if trade is None and order is None: + trade_date = str(data.cjrq).strip() #["成交日期"] + trade_time = str(data.cjsj).strip() #["成交时间"] + trade_dt = datetime.strptime(f'{trade_date} {trade_time}', "%Y%m%d %H%M%S") + direction = DIRECTION_STOCK_NAME2VT.get(str(data.wtfx).strip()) # ["委托方向"] + offset = Offset.NONE + if direction is None: + direction = Direction.NET + elif direction == Direction.LONG: + offset = Offset.OPEN + elif direction == Direction.SHORT: + offset = Offset.CLOSE + trade = TradeData( + gateway_name=self.gateway_name, + symbol=str(data.zqdm).strip(), # ["证券代码"] + exchange=EXCHANGE_NAME2VT.get(str(data.jysc).strip()), # ["交易市场"] + orderid=sys_tradeid, + tradeid=sys_tradeid, + sys_orderid=sys_orderid, + accountid=self.userid, + direction=direction, + offset=offset, + price=float(data.cjjg), # ["成交价格"] + volume=float(data.cjsl), # ["成交数量"] + datetime=trade_dt, + time=trade_dt.strftime('%H:%M:%S'), + trade_amount=float(data.cjje), # ["成交金额"] + commission=float(data.zfy) # ["总费用"] + ) + self.trades[sys_tradeid] = trade + self.gateway.on_trade(copy.copy(trade)) + continue + table.close() + + except Exception as ex: + self.gateway.write_error(f'dbf扫描股票成交异常:{str(ex)}') + self.gateway.write_error(traceback.format_exc()) + + + def query_trades_csv(self): """获取所有成交""" # 所有成交的文件 if self.gateway.pb_version == '2018': diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index a5a72191..d809f567 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -4,8 +4,9 @@ import sys from abc import ABC, abstractmethod from typing import Any, Sequence, Dict, List, Optional, Callable -from copy import copy +from copy import copy,deepcopy from logging import INFO, DEBUG, ERROR +from datetime import datetime from vnpy.event import Event, EventEngine from .event import ( @@ -34,7 +35,7 @@ from .object import ( Exchange ) -from vnpy.trader.utility import get_folder_path, round_to +from vnpy.trader.utility import get_folder_path, round_to, get_underlying_symbol, get_real_symbol_by_exchange from vnpy.trader.util_logger import setup_logger @@ -561,6 +562,113 @@ class TickCombiner(object): self.gateway.on_tick(ratio_tick) +class IndexGenerator: + """ + 指数生成器 + """ + + def __init__(self, gateway, setting): + self.gateway = gateway + self.gateway_name = self.gateway.gateway_name + self.gateway.write_log(u'创建指数合成类:{}'.format(setting)) + + self.ticks = {} # 所有真实合约, symbol: tick + self.last_dt = None # 最后tick得时间 + self.underlying_symbol = setting.get('underlying_symbol') + self.exchange = setting.get('exchange', None) + self.price_tick = setting.get('price_tick') + self.symbols = setting.get('symbols', {}) + # 订阅行情 + self.subscribe() + + self.n = len(self.symbols) + + def subscribe(self): + """订阅行情""" + dt_now = datetime.now() + for symbol in list(self.symbols.keys()): + pre_open_interest = self.symbols.get(symbol,0) + # 全路径合约 => 标准合约 ,如 ZC2109 => ZC109, RB2110 => rb2110 + vn_symbol = get_real_symbol_by_exchange(symbol, Exchange(self.exchange)) + # 先移除 + self.symbols.pop(symbol, None) + if symbol.replace(self.underlying_symbol, '') < dt_now.strftime('%Y%m%d'): + self.gateway.write_log(f'移除早于当月的合约{symbol}') + continue + + # 重新登记合约 + self.symbols[vn_symbol] = pre_open_interest + + # 发出订阅 + req = SubscribeRequest( + symbol=vn_symbol, + exchange=Exchange(self.exchange) + ) + self.gateway.subscribe(req) + + def on_tick(self, tick): + """tick到达事件""" + # 更新tick + if self.ticks is {}: + self.ticks.update({tick.symbol: tick}) + return + + # 进行指数合成 + if self.last_dt and tick.datetime.second != self.last_dt.second: + all_amount = 0 + all_interest = 0 + all_volume = 0 + all_ask1 = 0 + all_bid1 = 0 + last_price = 0 + ask_price_1 = 0 + bid_price_1 = 0 + mi_tick = None + + # 已经积累的行情tick数量,不足总数减1,不处理 + + if len(self.ticks) < min(self.n * 0.8, 3): + self.gateway.write_log(f'{self.underlying_symbol}合约数据{len(self.ticks)}不足{self.n} 0.8,暂不合成指数') + return + + # 计算所有合约的累加持仓量、资金、成交量、找出最大持仓量的主力合约 + for t in self.ticks.values(): + all_interest += t.open_interest + all_amount += t.last_price * t.open_interest + all_volume += t.volume + all_ask1 += t.ask_price_1 * t.open_interest + all_bid1 += t.bid_price_1 * t.open_interest + if mi_tick is None or mi_tick.open_interest < t.open_interest: + mi_tick = t + + # 总量 > 0 + if all_interest > 0 and all_amount > 0: + last_price = round(float(all_amount / all_interest), 4) + # 卖1价 + if all_ask1 > 0 and all_interest > 0: + ask_price_1 = round(float(all_ask1 / all_interest), 4) + # 买1价 + if all_bid1 > 0 and all_interest > 0: + bid_price_1 = round(float(all_bid1 / all_interest), 4) + + if mi_tick and last_price > 0: + idx_tick = deepcopy(mi_tick) + idx_tick.symbol = f'{self.underlying_symbol}99' + idx_tick.vt_symbol = f'{idx_tick.symbol}.{self.exchange}' + idx_tick.open_interest = all_interest + idx_tick.volume = all_volume + idx_tick.last_price = last_price + idx_tick.ask_price_1 = ask_price_1 + idx_tick.bid_price_1 = bid_price_1 + + self.gateway.on_tick(idx_tick) + + # 更新时间 + self.last_dt = tick.datetime + # 更新tick + self.ticks.update({tick.symbol: tick}) + + class LocalOrderManager: """ Management tool to support use local order id for trading.