From 54c2c70af43085f970114b8ea623c94fa7403030 Mon Sep 17 00:00:00 2001 From: msincenselee Date: Sun, 5 Jan 2020 19:03:13 +0800 Subject: [PATCH] =?UTF-8?q?[=E4=BB=A3=E7=A0=81=E6=9B=B4=E6=96=B0]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- prod/linux/index_publisher/run.py | 2 + vnpy/app/cta_strategy_pro/base.py | 1 + vnpy/app/cta_strategy_pro/engine.py | 163 +++++++++++++++++++----- vnpy/app/index_tick_publisher/engine.py | 10 +- vnpy/data/renko/config.py | 7 + vnpy/data/renko/test_rebuild_future.py | 34 +++++ vnpy/data/renko/test_rebuild_stock.py | 35 +++++ vnpy/data/tdx/refill_tdx_future_bars.py | 20 +-- vnpy/gateway/ctp/ctp_gateway.py | 61 +++++---- vnpy/gateway/ctptest/test.py | 93 ++++++++++++++ vnpy/trader/converter.py | 27 ++-- vnpy/trader/engine.py | 12 +- vnpy/trader/event.py | 1 + vnpy/trader/object.py | 5 +- 14 files changed, 374 insertions(+), 97 deletions(-) create mode 100644 vnpy/data/renko/config.py create mode 100644 vnpy/data/renko/test_rebuild_future.py create mode 100644 vnpy/data/renko/test_rebuild_stock.py create mode 100644 vnpy/gateway/ctptest/test.py diff --git a/prod/linux/index_publisher/run.py b/prod/linux/index_publisher/run.py index 807fb9c6..6e70a75e 100644 --- a/prod/linux/index_publisher/run.py +++ b/prod/linux/index_publisher/run.py @@ -1,3 +1,5 @@ +# flake8: noqa + import os import sys import multiprocessing diff --git a/vnpy/app/cta_strategy_pro/base.py b/vnpy/app/cta_strategy_pro/base.py index cf27f3dd..b22eb7e1 100644 --- a/vnpy/app/cta_strategy_pro/base.py +++ b/vnpy/app/cta_strategy_pro/base.py @@ -80,6 +80,7 @@ class StopOrder: lock: bool = False vt_orderids: list = field(default_factory=list) status: StopOrderStatus = StopOrderStatus.WAITING + gateway_name: str = None EVENT_CTA_LOG = "eCtaLog" diff --git a/vnpy/app/cta_strategy_pro/engine.py b/vnpy/app/cta_strategy_pro/engine.py index 3f2b7b04..4f227262 100644 --- a/vnpy/app/cta_strategy_pro/engine.py +++ b/vnpy/app/cta_strategy_pro/engine.py @@ -25,6 +25,7 @@ from vnpy.trader.object import ( ) from vnpy.trader.event import ( EVENT_TICK, + EVENT_BAR, EVENT_ORDER, EVENT_TRADE, EVENT_POSITION @@ -37,7 +38,7 @@ from vnpy.trader.constant import ( Offset, Status ) -from vnpy.trader.utility import load_json, save_json, extract_vt_symbol, round_to, get_folder_path +from vnpy.trader.utility import load_json, save_json, extract_vt_symbol, round_to, get_folder_path, get_underlying_symbol from vnpy.trader.util_logger import setup_logger, logging from vnpy.trader.converter import OffsetConverter @@ -49,7 +50,9 @@ from .base import ( EngineType, StopOrder, StopOrderStatus, - STOPORDER_PREFIX + STOPORDER_PREFIX, + MARKET_DAY_ONLY + ) from .template import CtaTemplate @@ -70,6 +73,8 @@ class CtaEngine(BaseEngine): 2、使用免费的tdx源,替代rqdata源 3、取消初始化数据时,从全局的cta_strategy_data中恢复数据,改为策略自己初始化恢复数据 4、支持多合约订阅和多合约交易. 扩展的合约在setting中配置,由策略进行订阅 + 5、支持先启动策略,后连接gateway + 6、支持指定gateway的交易。主引擎可接入多个gateway """ engine_type = EngineType.LIVE # live trading engine @@ -91,7 +96,12 @@ class CtaEngine(BaseEngine): self.strategy_loggers = {} # strategy_name: logger + # 未能订阅的symbols,支持策略启动时,并未接入gateway + # gateway_name.vt_symbol: set() of (strategy_name, is_bar) + self.pending_subcribe_symbol_map = defaultdict(set) + self.symbol_strategy_map = defaultdict(list) # vt_symbol: strategy list + self.bar_strategy_map = defaultdict(list) # vt_symbol: strategy list self.strategy_symbol_map = defaultdict(set) # strategy_name: vt_symbol set self.orderid_strategy_map = {} # vt_orderid: strategy @@ -116,20 +126,27 @@ class CtaEngine(BaseEngine): self.write_log("CTA策略引擎初始化成功") def close(self): - """""" + """停止所属有的策略""" self.stop_all_strategies() def register_event(self): - """""" + """注册事件""" self.event_engine.register(EVENT_TICK, self.process_tick_event) + self.event_engine.register(EVENT_BAR, self.process_bar_event) self.event_engine.register(EVENT_ORDER, self.process_order_event) self.event_engine.register(EVENT_TRADE, self.process_trade_event) self.event_engine.register(EVENT_POSITION, self.process_position_event) def process_tick_event(self, event: Event): - """""" + """处理tick到达事件""" tick = event.data + key = f'{tick.gateway_name}.{tick.vt_symbol}' + v = self.pending_subcribe_symbol_map.pop(key, None) + if v: + # 这里不做tick/bar的判断了,因为基本有tick就有bar + self.write_log(f'{key} tick已经到达,移除未订阅记录:{v}') + strategies = self.symbol_strategy_map[tick.vt_symbol] if not strategies: return @@ -140,6 +157,10 @@ class CtaEngine(BaseEngine): if strategy.inited: self.call_strategy_func(strategy, strategy.on_tick, tick) + def process_bar_event(self, event: Event): + """处理bar到达事件""" + pass + def process_order_event(self, event: Event): """""" order = event.data @@ -209,6 +230,43 @@ class CtaEngine(BaseEngine): self.offset_converter.update_position(position) + def check_unsubscribed_symbols(self): + """检查未订阅合约""" + + for key in self.pending_subcribe_symbol_map.keys(): + # gateway_name.symbol.exchange = > gateway_name, vt_symbol + keys = key.split('.') + gateway_name = keys[0] + vt_symbol = '.'.join(keys[1:]) + + contract = self.main_engine.get_contract(vt_symbol) + is_bar = True if vt_symbol in self.bar_strategy_map else False + if contract: + # 获取合约的缩写号 + underlying_symbol = get_underlying_symbol(vt_symbol) + dt = datetime.now() + # 若为中金所的合约,白天才提交订阅请求 + if underlying_symbol in MARKET_DAY_ONLY and not (9 < dt.hour < 16): + continue + + self.write_log(f'重新提交合约{vt_symbol}订阅请求') + for strategy_name, is_bar in list(self.pending_subcribe_symbol_map[vt_symbol]): + self.subscribe_symbol(strategy_name=strategy_name, + vt_symbol=vt_symbol, + gateway_name=gateway_name, + is_bar=is_bar) + else: + try: + self.write_log(f'找不到合约{vt_symbol}信息,尝试请求所有接口') + symbol, exchange = extract_vt_symbol(vt_symbol) + req = SubscribeRequest(symbol=symbol, exchange=exchange) + req.is_bar = is_bar + self.main_engine.subscribe(req, gateway_name) + + except Exception as ex: + self.write_error(u'重新订阅{}.{}异常:{},{}'.format(gateway_name, vt_symbol, str(ex), traceback.format_exc())) + return + def check_stop_order(self, tick: TickData): """""" for stop_order in list(self.stop_orders.values()): @@ -278,7 +336,8 @@ class CtaEngine(BaseEngine): price: float, volume: float, type: OrderType, - lock: bool + lock: bool, + gateway_name: str = None ): """ Send a new order to server. @@ -294,15 +353,19 @@ class CtaEngine(BaseEngine): volume=volume, ) + # 如果没有指定网关,则使用合约信息内的网关 + if contract.gateway_name and not gateway_name: + gateway_name = contract.gateway_name + # Convert with offset converter - req_list = self.offset_converter.convert_order_request(original_req, lock) + req_list = self.offset_converter.convert_order_request(original_req, lock, gateway_name) # Send Orders vt_orderids = [] for req in req_list: vt_orderid = self.main_engine.send_order( - req, contract.gateway_name) + req, gateway_name) # Check if sending order successful if not vt_orderid: @@ -310,7 +373,7 @@ class CtaEngine(BaseEngine): vt_orderids.append(vt_orderid) - self.offset_converter.update_order_request(req, vt_orderid) + self.offset_converter.update_order_request(req, vt_orderid, gateway_name) # Save relationship between orderid and strategy. self.orderid_strategy_map[vt_orderid] = strategy @@ -326,7 +389,8 @@ class CtaEngine(BaseEngine): offset: Offset, price: float, volume: float, - lock: bool + lock: bool, + gateway_name: str = None ): """ Send a limit order to server. @@ -339,7 +403,8 @@ class CtaEngine(BaseEngine): price, volume, OrderType.LIMIT, - lock + lock, + gateway_name ) def send_server_stop_order( @@ -350,7 +415,8 @@ class CtaEngine(BaseEngine): offset: Offset, price: float, volume: float, - lock: bool + lock: bool, + gateway_name: str = None ): """ Send a stop order to server. @@ -366,7 +432,8 @@ class CtaEngine(BaseEngine): price, volume, OrderType.STOP, - lock + lock, + gateway_name ) def send_local_stop_order( @@ -377,7 +444,8 @@ class CtaEngine(BaseEngine): offset: Offset, price: float, volume: float, - lock: bool + lock: bool, + gateway_name: str = None ): """ Create a new local stop order. @@ -393,7 +461,8 @@ class CtaEngine(BaseEngine): volume=volume, stop_orderid=stop_orderid, strategy_name=strategy.strategy_name, - lock=lock + lock=lock, + gateway_name = gateway_name ) self.stop_orders[stop_orderid] = stop_order @@ -451,7 +520,8 @@ class CtaEngine(BaseEngine): price: float, volume: float, stop: bool, - lock: bool + lock: bool, + gateway_name: str = None ): """ 该方法供策略使用,发送委托。 @@ -462,18 +532,19 @@ class CtaEngine(BaseEngine): strategy_name=strategy.name, level=logging.ERROR) return "" - + if contract.gateway_name and not gateway_name: + gateway_name = contract.gateway_name # Round order price and volume to nearest incremental value price = round_to(price, contract.pricetick) volume = round_to(volume, contract.min_volume) if stop: if contract.stop_supported: - return self.send_server_stop_order(strategy, contract, direction, offset, price, volume, lock) + return self.send_server_stop_order(strategy, contract, direction, offset, price, volume, lock, gateway_name) else: - return self.send_local_stop_order(strategy, vt_symbol, direction, offset, price, volume, lock) + return self.send_local_stop_order(strategy, vt_symbol, direction, offset, price, volume, lock, gateway_name) else: - return self.send_limit_order(strategy, contract, direction, offset, price, volume, lock) + return self.send_limit_order(strategy, contract, direction, offset, price, volume, lock, gateway_name) def cancel_order(self, strategy: CtaTemplate, vt_orderid: str): """ @@ -494,7 +565,7 @@ class CtaEngine(BaseEngine): for vt_orderid in copy(vt_orderids): self.cancel_order(strategy, vt_orderid) - def subscribe_symbol(self, strategy_name: str, vt_symbol: str): + def subscribe_symbol(self, strategy_name: str, vt_symbol: str, gateway_name: str = '', is_bar: bool = False): """订阅合约""" strategy = self.strategies.get(strategy_name, None) if not strategy: @@ -502,24 +573,41 @@ class CtaEngine(BaseEngine): contract = self.main_engine.get_contract(vt_symbol) if contract: + if contract.gateway_name and not gateway_name: + gateway_name = contract.gateway_name req = SubscribeRequest( symbol=contract.symbol, exchange=contract.exchange) - self.main_engine.subscribe(req, contract.gateway_name) + self.main_engine.subscribe(req, gateway_name) + else: + self.write_log(msg=f"找不到合约{vt_symbol},添加到待订阅列表", + strategy_name=strategy.name) + self.pending_subcribe_symbol_map[f'{gateway_name}.{vt_symbol}'].add((strategy_name, is_bar)) + try: + self.write_log(f'找不到合约{vt_symbol}信息,尝试请求所有接口') + symbol, exchange = extract_vt_symbol(vt_symbol) + req = SubscribeRequest(symbol=symbol, exchange=exchange) + req.is_bar = is_bar + self.main_engine.subscribe(req, gateway_name) + except Exception as ex: + self.write_error(u'重新订阅{}异常:{},{}'.format(vt_symbol, str(ex), traceback.format_exc())) + + # 如果是订阅bar + if is_bar: + strategies = self.bar_strategy_map[vt_symbol] + if strategy not in strategies: + strategies.append(strategy) + self.bar_strategy_map.update({vt_symbol: strategies}) + else: # 添加 合约订阅 vt_symbol <=> 策略实例 strategy 映射. strategies = self.symbol_strategy_map[vt_symbol] strategies.append(strategy) - # 添加 策略名 strategy_name <=> 合约订阅 vt_symbol 的映射 - subscribe_symbol_set = self.strategy_symbol_map[strategy.name] - subscribe_symbol_set.add(contract.vt_symbol) - return True + # 添加 策略名 strategy_name <=> 合约订阅 vt_symbol 的映射 + subscribe_symbol_set = self.strategy_symbol_map[strategy.name] + subscribe_symbol_set.add(contract.vt_symbol) - else: - self.write_log(msg=f"行情订阅失败,找不到合约{vt_symbol}", - strategy_name=strategy.name, - level=logging.CRITICAL) - return False + return True @lru_cache() def get_size(self, vt_symbol: str): @@ -559,6 +647,19 @@ class CtaEngine(BaseEngine): return None + def get_account(self, vt_accountid: str): + """ 查询账号的资金""" + return self.main_engine.get_account(vt_accountid) + + def get_position(self, vt_symbol: str, direction: Direction, gateway_name: str = ''): + """ 查询合约在账号的持仓,需要指定方向""" + vt_position_id = f"{gateway_name}.{vt_symbol}.{direction.value}" + return self.main_engine.get_position(vt_position_id) + + def get_position_holding(self, vt_symbol: str, gateway_name: str = ''): + """ 查询合约在账号的持仓(包含多空)""" + return self.offset_converter.get_position_holding(vt_symbol, gateway_name) + def get_engine_type(self): """""" return self.engine_type diff --git a/vnpy/app/index_tick_publisher/engine.py b/vnpy/app/index_tick_publisher/engine.py index 054acc21..2a9447a1 100644 --- a/vnpy/app/index_tick_publisher/engine.py +++ b/vnpy/app/index_tick_publisher/engine.py @@ -121,7 +121,7 @@ class IndexTickPublisher(BaseEngine): c -= 1 self.fail_ip_dict.update({k: c}) - self.checkStatus() + self.check_status() # ---------------------------------------------------------------------- def ping(self, ip, port=7709): @@ -144,7 +144,7 @@ class IndexTickPublisher(BaseEngine): self.write_log(u'该服务器IP {}无响应.'.format(ip)) return timedelta(seconds=10).total_seconds() * 1000 except Exception as ex: - self.write_error(u'tdx ping服务器{},异常的响应{}'.format(ip,str(ex))) + self.write_error(u'tdx ping服务器{},异常的响应{}'.format(ip, str(ex))) return timedelta(seconds=10).total_seconds() * 1000 def sort_ip_speed(self): @@ -232,7 +232,7 @@ class IndexTickPublisher(BaseEngine): # 更新 symbol_exchange_dict , symbol_market_dict self.write_log(u'查询合约') - self.qryInstrument() + self.qry_instrument() self.conf.update(rabbit_config) self.create_publisher(self.conf) @@ -277,7 +277,7 @@ class IndexTickPublisher(BaseEngine): self.write_log(u'退出rabbitMQ 发布器') self.pub.exit() - def checkStatus(self): + def check_status(self): # self.write_log(u'检查tdx接口状态') # 若还没有启动连接,就启动连接 @@ -290,7 +290,7 @@ class IndexTickPublisher(BaseEngine): # self.write_log(u'tdx接口状态正常') - def qryInstrument(self): + def qry_instrument(self): """ 查询/更新合约信息 :return: diff --git a/vnpy/data/renko/config.py b/vnpy/data/renko/config.py new file mode 100644 index 00000000..e2159d29 --- /dev/null +++ b/vnpy/data/renko/config.py @@ -0,0 +1,7 @@ +# encoding: UTF-8 + + +HEIGHT_LIST = [3, 5, 10, 'K3', 'K5', 'K10'] + +FUTURE_RENKO_DB_NAME = 'FutureRenko_Db' +STOCK_RENKO_DB_NAME = 'StockRenko_Db' diff --git a/vnpy/data/renko/test_rebuild_future.py b/vnpy/data/renko/test_rebuild_future.py new file mode 100644 index 00000000..8fbd89ca --- /dev/null +++ b/vnpy/data/renko/test_rebuild_future.py @@ -0,0 +1,34 @@ +# flake8: noqa +import os +import sys + +vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) +if vnpy_root not in sys.path: + sys.path.append(vnpy_root) + +os.environ["VNPY_TESTING"] = "1" + +from vnpy.data.renko.rebuild_future import * + +# Mongo数据库得地址,renko数据库名,tick文件缓存目录 +setting = { + "host": "192.168.0.207", + "db_name": FUTURE_RENKO_DB_NAME, + "cache_folder": os.path.join(vnpy_root, 'tick_data', 'tdx', 'future') +} +builder = FutureRenkoRebuilder(setting) + +# 生成单个 +# builder.start(symbol='RB99',min_diff=1, height=10, start_date='2019-04-01', end_date='2019-09-10') +# 生成多个 +builder.start(symbol='J99', price_tick=0.5, height=[10], start_date='2016-01-01', end_date='2016-02-16') + +# 导出csv +# builder.export(symbol='RB99',height=10, start_date='2019-04-01', end_date='2019-09-10') + +# 生成批量更新脚本 +# builder.export_scripts() + +# builder.check_all_index() + +exit(0) diff --git a/vnpy/data/renko/test_rebuild_stock.py b/vnpy/data/renko/test_rebuild_stock.py new file mode 100644 index 00000000..5a961543 --- /dev/null +++ b/vnpy/data/renko/test_rebuild_stock.py @@ -0,0 +1,35 @@ +# flake8: noqa +import os +import sys + +vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) +if vnpy_root not in sys.path: + sys.path.append(vnpy_root) + +os.environ["VNPY_TESTING"] = "1" + +from vnpy.data.renko.rebuild_stock import * + +# Mongo数据库得地址,renko数据库名,tick文件缓存目录 +setting = { + "host": "192.168.0.207", + "db_name": STOCK_RENKO_DB_NAME, + "cache_folder": os.path.join(vnpy_root, 'tick_data', 'tdx', 'stock') +} +builder = StockRenkoRebuilder(setting) + +# 生成单个 +# builder.start(symbol='600410',min_diff=0.01, height=10, start_date='2019-04-01', end_date='2019-09-10') +# 生成多个 +builder.start(symbol='123022', price_tick=0.001, height=[10, 'K3', 'K5'], start_date='2019-01-01', + end_date='2019-12-31') + +# 导出csv +# builder.export(symbol='600410',height=10, start_date='2019-04-01', end_date='2019-09-10') + +# 生成批量更新脚本 +# builder.export_scripts() + +builder.check_all_index() + +exit(0) diff --git a/vnpy/data/tdx/refill_tdx_future_bars.py b/vnpy/data/tdx/refill_tdx_future_bars.py index 59f969b8..a70c37f9 100644 --- a/vnpy/data/tdx/refill_tdx_future_bars.py +++ b/vnpy/data/tdx/refill_tdx_future_bars.py @@ -1,7 +1,6 @@ # flake8: noqa """ 下载通达信指数合约1分钟bar => vnpy项目目录/bar_data/ - """ import os import sys @@ -29,24 +28,7 @@ api_01 = TdxFutureData() # 更新本地合约缓存信息 api_01.update_mi_contracts() - -def bar_to_dict(bar_data: BarData): - d = OrderedDict({ - 'datetime': bar_data.datetime, - 'symbol': bar_data.symbol, - 'vt_symbol': bar_data.vt_symbol, - 'exchange': bar_data.exchange.value, - 'open': bar_data.open_price, - 'close': bar_data.close_price, - 'high': bar_data.high_price, - 'low': bar_data.low_price, - 'volume': bar_data.volume, - 'open_interest': bar_data.open_interest, - 'trading_day': bar_data.trading_day - }) - return d - - +# 逐一指数合约下载并更新 for underlying_symbol in api_01.future_contracts.keys(): index_symbol = underlying_symbol + '99' print(f'开始更新:{index_symbol}') diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index 79545b33..f82797ad 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -1276,6 +1276,7 @@ class TdxMdApi(): self.gateway.on_tick(tick) self.gateway.on_custom_tick(tick) + class SubMdApi(): """ RabbitMQ Subscriber 数据行情接收API @@ -1360,7 +1361,7 @@ class SubMdApi(): self.gateway.write_log(u'关闭订阅器接收线程') self.thread.join() except Exception as ex: - self.gateway.write_error(u'退出rabbitMQ行情api异常') + self.gateway.write_error(u'退出rabbitMQ行情api异常:{}'.format(str(ex))) # ---------------------------------------------------------------------- def subscribe(self, subscribeReq): @@ -1453,22 +1454,22 @@ class TickCombiner(object): # 以下情况,基本为单腿涨跌停,不合成价差/价格比 Tick if (self.last_leg1_tick.ask_price_1 == 0 or self.last_leg1_tick.bid_price_1 == self.last_leg1_tick.upperLimit) \ - and self.last_leg1_tick.askVolume1 == 0: + and self.last_leg1_tick.ask_volume_1 == 0: self.gateway.write_log( u'leg1:{0}涨停{1},不合成价差Tick'.format(self.last_leg1_tick.vtSymbol, self.last_leg1_tick.bid_price_1)) return if (self.last_leg1_tick.bid_price_1 == 0 or self.last_leg1_tick.ask_price_1 == self.last_leg1_tick.lowerLimit) \ - and self.last_leg1_tick.bidVolume1 == 0: + and self.last_leg1_tick.bid_volume_1 == 0: self.gateway.write_log( u'leg1:{0}跌停{1},不合成价差Tick'.format(self.last_leg1_tick.vtSymbol, self.last_leg1_tick.ask_price_1)) return if (self.last_leg2_tick.ask_price_1 == 0 or self.last_leg2_tick.bid_price_1 == self.last_leg2_tick.upperLimit) \ - and self.last_leg2_tick.askVolume1 == 0: + and self.last_leg2_tick.ask_volume_1 == 0: self.gateway.write_log( u'leg2:{0}涨停{1},不合成价差Tick'.format(self.last_leg2_tick.vtSymbol, self.last_leg2_tick.bid_price_1)) return if (self.last_leg2_tick.bid_price_1 == 0 or self.last_leg2_tick.ask_price_1 == self.last_leg2_tick.lowerLimit) \ - and self.last_leg2_tick.bidVolume1 == 0: + and self.last_leg2_tick.bid_volume_1 == 0: self.gateway.write_log( u'leg2:{0}跌停{1},不合成价差Tick'.format(self.last_leg2_tick.vtSymbol, self.last_leg2_tick.ask_price_1)) return @@ -1509,16 +1510,21 @@ class TickCombiner(object): value=self.last_leg1_tick.pre_close * self.leg1_ratio - self.last_leg2_tick.pre_close * self.leg2_ratio) # 开盘价 if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0: - spread_tick.openPrice = round_to(target=self.price_tick, - value=self.last_leg1_tick.open_price * self.leg1_ratio - self.last_leg2_tick.open_price * self.leg2_ratio) + spread_tick.open_price = round_to(target=self.price_tick, + value=self.last_leg1_tick.open_price * self.leg1_ratio - self.last_leg2_tick.open_price * self.leg2_ratio) # 最高价 - self.spread_high = spread_tick.ask_price_1 if self.spread_high is None else max(self.spread_high, - spread_tick.ask_price_1) + if self.spread_high: + self.spread_high = max(self.spread_high, spread_tick.ask_price_1) + else: + self.spread_high = spread_tick.ask_price_1 spread_tick.high_price = self.spread_high # 最低价 - self.spread_low = spread_tick.bid_price_1 if self.spread_low is None else min(self.spread_low, - spread_tick.bid_price_1) + if self.spread_low: + self.spread_low = min(self.spread_low, spread_tick.bid_price_1) + else: + self.spread_low = spread_tick.bid_price_1 + spread_tick.low_price = self.spread_low self.gateway.on_tick(spread_tick) @@ -1537,33 +1543,38 @@ class TickCombiner(object): ratio_tick.ask_price_1 = round_to(target=self.price_tick, value=100 * self.last_leg1_tick.ask_price_1 * self.leg1_ratio / ( self.last_leg2_tick.bid_price_1 * self.leg2_ratio)) - ratio_tick.askVolume1 = min(self.last_leg1_tick.askVolume1, self.last_leg2_tick.bidVolume1) + ratio_tick.ask_volume_1 = min(self.last_leg1_tick.ask_volume_1, self.last_leg2_tick.bid_volume_1) ratio_tick.bid_price_1 = round_to(target=self.price_tick, value=100 * self.last_leg1_tick.bid_price_1 * self.leg1_ratio / ( self.last_leg2_tick.ask_price_1 * self.leg2_ratio)) - ratio_tick.bidVolume1 = min(self.last_leg1_tick.bidVolume1, self.last_leg2_tick.askVolume1) + ratio_tick.bid_volume_1 = min(self.last_leg1_tick.bid_volume_1, self.last_leg2_tick.ask_volume_1) ratio_tick.lastPrice = round_to(target=self.price_tick, value=(ratio_tick.ask_price_1 + ratio_tick.bid_price_1) / 2) # 昨收盘价 - if self.last_leg2_tick.preClosePrice > 0 and self.last_leg1_tick.preClosePrice > 0: - ratio_tick.preClosePrice = round_to(target=self.price_tick, - value=100 * self.last_leg1_tick.preClosePrice * self.leg1_ratio / ( - self.last_leg2_tick.preClosePrice * self.leg2_ratio)) + if self.last_leg2_tick.pre_close > 0 and self.last_leg1_tick.pre_close > 0: + ratio_tick.pre_close = round_to(target=self.price_tick, + value=100 * self.last_leg1_tick.pre_close * self.leg1_ratio / ( + self.last_leg2_tick.pre_close * self.leg2_ratio)) # 开盘价 - if self.last_leg2_tick.openPrice > 0 and self.last_leg1_tick.openPrice > 0: - ratio_tick.openPrice = round_to(target=self.price_tick, - value=100 * self.last_leg1_tick.openPrice * self.leg1_ratio / ( - self.last_leg2_tick.openPrice * self.leg2_ratio)) + if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0: + ratio_tick.open_price = round_to(target=self.price_tick, + value=100 * self.last_leg1_tick.open_price * self.leg1_ratio / ( + self.last_leg2_tick.open_price * self.leg2_ratio)) # 最高价 - self.ratio_high = ratio_tick.ask_price_1 if self.ratio_high is None else max(self.ratio_high, - ratio_tick.ask_price_1) + if self.ratio_high: + self.ratio_high = max(self.ratio_high, ratio_tick.ask_price_1) + else: + self.ratio_high = ratio_tick.ask_price_1 ratio_tick.high_price = self.spread_high # 最低价 - self.ratio_low = ratio_tick.bid_price_1 if self.ratio_low is None else min(self.ratio_low, - ratio_tick.bid_price_1) + if self.ratio_low: + self.ratio_low = min(self.ratio_low, ratio_tick.bid_price_1) + else: + self.ratio_low = ratio_tick.bid_price_1 + ratio_tick.low_price = self.spread_low self.gateway.on_tick(ratio_tick) diff --git a/vnpy/gateway/ctptest/test.py b/vnpy/gateway/ctptest/test.py new file mode 100644 index 00000000..472c3dd4 --- /dev/null +++ b/vnpy/gateway/ctptest/test.py @@ -0,0 +1,93 @@ +# flake8: noqa + +import sys +import os +import traceback +from time import sleep + +vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) +if vnpy_root not in sys.path: + print(u'append {}'.format(vnpy_root)) + sys.path.append(vnpy_root) + +from vnpy.gateway.ctptest import CtptestGateway +from vnpy.event import EventEngine +from vnpy.trader.constant import Exchange +from vnpy.trader.event import ( + EVENT_TICK, + EVENT_ORDER, + EVENT_TRADE, + EVENT_POSITION, + EVENT_ACCOUNT, + EVENT_LOG, +) +from vnpy.trader.object import ( + SubscribeRequest, +) +# 这里放期货公司需要你连接的测试系统的相关信息 +ctp_setting = { + "用户名": "xxx", + "密码": "xxx", + "经纪商代码": "9999", + "交易服务器": "tcp://180.168.146.187:10100", + "行情服务器": "tcp://180.168.146.187:10110", + "产品名称": "simnow_client_test", + "授权编码": "0000000000000000", + "产品信息": "" +} + + +def test(): + """测试""" + from qtpy import QtCore + import sys + + def print_log(event): + log = event.data + print(f'{log.time}: {log.msg}\n') + + def print_event(event): + data = event.data + print(f'{data.__dict__}') + + app = QtCore.QCoreApplication(sys.argv) + + event_engine = EventEngine() + event_engine.register(EVENT_LOG, print_log) + event_engine.register(EVENT_TICK, print_event) + event_engine.register(EVENT_ACCOUNT, print_event) + event_engine.register(EVENT_ORDER, print_event) + event_engine.register(EVENT_TRADE, print_event) + event_engine.register(EVENT_POSITION, print_event) + + event_engine.start() + + gateway = CtptestGateway(event_engine) + gateway.connect(ctp_setting) + + # gateway.connect() + auto_subscribe_symbols = ['rb2010'] + for symbol in auto_subscribe_symbols: + print(u'自动订阅合约:{}'.format(symbol)) + sub = SubscribeRequest(symbol=symbol, exchange=Exchange.SHFE) + sub.symbol = symbol + gateway.subscribe(sub) + + couter = 20 + gateway.init_query() + + while couter > 0: + print(u'{}'.format(couter)) + sleep(1) + couter -= 1 + + sys.exit(app.exec_()) + + +if __name__ == '__main__': + + try: + test() + except Exception as ex: + print(u'异常:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr) + print('Finished') diff --git a/vnpy/trader/converter.py b/vnpy/trader/converter.py index b181bdbe..ed512428 100644 --- a/vnpy/trader/converter.py +++ b/vnpy/trader/converter.py @@ -13,7 +13,9 @@ from vnpy.trader.constant import (Direction, Offset, Exchange) class OffsetConverter: - """""" + """ + 仓位转换 + """ def __init__(self, main_engine: MainEngine): """""" @@ -25,7 +27,7 @@ class OffsetConverter: if not self.is_convert_required(position.vt_symbol): return - holding = self.get_position_holding(position.vt_symbol) + holding = self.get_position_holding(position.vt_symbol, position.gateway_name) holding.update_position(position) def update_trade(self, trade: TradeData): @@ -33,7 +35,7 @@ class OffsetConverter: if not self.is_convert_required(trade.vt_symbol): return - holding = self.get_position_holding(trade.vt_symbol) + holding = self.get_position_holding(trade.vt_symbol, trade.gateway_name) holding.update_trade(trade) def update_order(self, order: OrderData): @@ -41,32 +43,33 @@ class OffsetConverter: if not self.is_convert_required(order.vt_symbol): return - holding = self.get_position_holding(order.vt_symbol) + holding = self.get_position_holding(order.vt_symbol, order.gateway_name) holding.update_order(order) - def update_order_request(self, req: OrderRequest, vt_orderid: str): + def update_order_request(self, req: OrderRequest, vt_orderid: str, gateway_name: str = ''): """""" if not self.is_convert_required(req.vt_symbol): return - holding = self.get_position_holding(req.vt_symbol) + holding = self.get_position_holding(req.vt_symbol, gateway_name) holding.update_order_request(req, vt_orderid) - def get_position_holding(self, vt_symbol: str): - """""" - holding = self.holdings.get(vt_symbol, None) + def get_position_holding(self, vt_symbol: str, gateway_name: str = ''): + """获取持仓信息""" + k = f'{gateway_name}.{vt_symbol}' + holding = self.holdings.get(k, None) if not holding: contract = self.main_engine.get_contract(vt_symbol) holding = PositionHolding(contract) - self.holdings[vt_symbol] = holding + self.holdings[k] = holding return holding - def convert_order_request(self, req: OrderRequest, lock: bool): + def convert_order_request(self, req: OrderRequest, lock: bool, gateway_name: str = ''): """""" if not self.is_convert_required(req.vt_symbol): return [req] - holding = self.get_position_holding(req.vt_symbol) + holding = self.get_position_holding(req.vt_symbol, gateway_name) if lock: return holding.convert_order_request_lock(req) diff --git a/vnpy/trader/engine.py b/vnpy/trader/engine.py index db8df656..b2d24257 100644 --- a/vnpy/trader/engine.py +++ b/vnpy/trader/engine.py @@ -162,10 +162,16 @@ class MainEngine: def subscribe(self, req: SubscribeRequest, gateway_name: str): """ Subscribe tick data update of a specific gateway. + 如果没有指定gateway,那么所有的gateway都会接收改订阅请求 """ - gateway = self.get_gateway(gateway_name) - if gateway: - gateway.subscribe(req) + if gateway_name: + gateway = self.get_gateway(gateway_name) + if gateway: + gateway.subscribe(req) + else: + for gateway in self.gateways.items(): + if gateway: + gateway.subscribe(req) def send_order(self, req: OrderRequest, gateway_name: str): """ diff --git a/vnpy/trader/event.py b/vnpy/trader/event.py index 83902ec2..dd64d3a9 100644 --- a/vnpy/trader/event.py +++ b/vnpy/trader/event.py @@ -5,6 +5,7 @@ Event type string used in VN Trader. from vnpy.event import EVENT_TIMER # noqa EVENT_TICK = "eTick." +EVENT_BAR = "eBar." EVENT_TRADE = "eTrade." EVENT_ORDER = "eOrder." EVENT_POSITION = "ePosition." diff --git a/vnpy/trader/object.py b/vnpy/trader/object.py index 51914588..bb607e95 100644 --- a/vnpy/trader/object.py +++ b/vnpy/trader/object.py @@ -206,7 +206,7 @@ class PositionData(BaseData): def __post_init__(self): """""" self.vt_symbol = f"{self.symbol}.{self.exchange.value}" - self.vt_positionid = f"{self.vt_symbol}.{self.direction.value}" + self.vt_positionid = f"{self.gateway_name}.{self.vt_symbol}.{self.direction.value}" @dataclass @@ -273,11 +273,12 @@ class ContractData(BaseData): @dataclass class SubscribeRequest: """ - Request sending to specific gateway for subscribing tick data update. + Request sending to specific gateway for subscribing tick/bar data update. """ symbol: str exchange: Exchange + is_bar: bool = False def __post_init__(self): """"""