[代码更新]

This commit is contained in:
msincenselee 2020-01-05 19:03:13 +08:00
parent f9b004a60d
commit 54c2c70af4
14 changed files with 374 additions and 97 deletions

View File

@ -1,3 +1,5 @@
# flake8: noqa
import os import os
import sys import sys
import multiprocessing import multiprocessing

View File

@ -80,6 +80,7 @@ class StopOrder:
lock: bool = False lock: bool = False
vt_orderids: list = field(default_factory=list) vt_orderids: list = field(default_factory=list)
status: StopOrderStatus = StopOrderStatus.WAITING status: StopOrderStatus = StopOrderStatus.WAITING
gateway_name: str = None
EVENT_CTA_LOG = "eCtaLog" EVENT_CTA_LOG = "eCtaLog"

View File

@ -25,6 +25,7 @@ from vnpy.trader.object import (
) )
from vnpy.trader.event import ( from vnpy.trader.event import (
EVENT_TICK, EVENT_TICK,
EVENT_BAR,
EVENT_ORDER, EVENT_ORDER,
EVENT_TRADE, EVENT_TRADE,
EVENT_POSITION EVENT_POSITION
@ -37,7 +38,7 @@ from vnpy.trader.constant import (
Offset, Offset,
Status Status
) )
from vnpy.trader.utility import load_json, save_json, extract_vt_symbol, round_to, get_folder_path from vnpy.trader.utility import load_json, save_json, extract_vt_symbol, round_to, get_folder_path, get_underlying_symbol
from vnpy.trader.util_logger import setup_logger, logging from vnpy.trader.util_logger import setup_logger, logging
from vnpy.trader.converter import OffsetConverter from vnpy.trader.converter import OffsetConverter
@ -49,7 +50,9 @@ from .base import (
EngineType, EngineType,
StopOrder, StopOrder,
StopOrderStatus, StopOrderStatus,
STOPORDER_PREFIX STOPORDER_PREFIX,
MARKET_DAY_ONLY
) )
from .template import CtaTemplate from .template import CtaTemplate
@ -70,6 +73,8 @@ class CtaEngine(BaseEngine):
2使用免费的tdx源替代rqdata源 2使用免费的tdx源替代rqdata源
3取消初始化数据时从全局的cta_strategy_data中恢复数据改为策略自己初始化恢复数据 3取消初始化数据时从全局的cta_strategy_data中恢复数据改为策略自己初始化恢复数据
4支持多合约订阅和多合约交易. 扩展的合约在setting中配置由策略进行订阅 4支持多合约订阅和多合约交易. 扩展的合约在setting中配置由策略进行订阅
5支持先启动策略后连接gateway
6支持指定gateway的交易主引擎可接入多个gateway
""" """
engine_type = EngineType.LIVE # live trading engine engine_type = EngineType.LIVE # live trading engine
@ -91,7 +96,12 @@ class CtaEngine(BaseEngine):
self.strategy_loggers = {} # strategy_name: logger self.strategy_loggers = {} # strategy_name: logger
# 未能订阅的symbols,支持策略启动时并未接入gateway
# gateway_name.vt_symbol: set() of (strategy_name, is_bar)
self.pending_subcribe_symbol_map = defaultdict(set)
self.symbol_strategy_map = defaultdict(list) # vt_symbol: strategy list self.symbol_strategy_map = defaultdict(list) # vt_symbol: strategy list
self.bar_strategy_map = defaultdict(list) # vt_symbol: strategy list
self.strategy_symbol_map = defaultdict(set) # strategy_name: vt_symbol set self.strategy_symbol_map = defaultdict(set) # strategy_name: vt_symbol set
self.orderid_strategy_map = {} # vt_orderid: strategy self.orderid_strategy_map = {} # vt_orderid: strategy
@ -116,20 +126,27 @@ class CtaEngine(BaseEngine):
self.write_log("CTA策略引擎初始化成功") self.write_log("CTA策略引擎初始化成功")
def close(self): def close(self):
"""""" """停止所属有的策略"""
self.stop_all_strategies() self.stop_all_strategies()
def register_event(self): def register_event(self):
"""""" """注册事件"""
self.event_engine.register(EVENT_TICK, self.process_tick_event) self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_BAR, self.process_bar_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event) self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event) self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event) self.event_engine.register(EVENT_POSITION, self.process_position_event)
def process_tick_event(self, event: Event): def process_tick_event(self, event: Event):
"""""" """处理tick到达事件"""
tick = event.data tick = event.data
key = f'{tick.gateway_name}.{tick.vt_symbol}'
v = self.pending_subcribe_symbol_map.pop(key, None)
if v:
# 这里不做tick/bar的判断了因为基本有tick就有bar
self.write_log(f'{key} tick已经到达,移除未订阅记录:{v}')
strategies = self.symbol_strategy_map[tick.vt_symbol] strategies = self.symbol_strategy_map[tick.vt_symbol]
if not strategies: if not strategies:
return return
@ -140,6 +157,10 @@ class CtaEngine(BaseEngine):
if strategy.inited: if strategy.inited:
self.call_strategy_func(strategy, strategy.on_tick, tick) self.call_strategy_func(strategy, strategy.on_tick, tick)
def process_bar_event(self, event: Event):
"""处理bar到达事件"""
pass
def process_order_event(self, event: Event): def process_order_event(self, event: Event):
"""""" """"""
order = event.data order = event.data
@ -209,6 +230,43 @@ class CtaEngine(BaseEngine):
self.offset_converter.update_position(position) self.offset_converter.update_position(position)
def check_unsubscribed_symbols(self):
"""检查未订阅合约"""
for key in self.pending_subcribe_symbol_map.keys():
# gateway_name.symbol.exchange = > gateway_name, vt_symbol
keys = key.split('.')
gateway_name = keys[0]
vt_symbol = '.'.join(keys[1:])
contract = self.main_engine.get_contract(vt_symbol)
is_bar = True if vt_symbol in self.bar_strategy_map else False
if contract:
# 获取合约的缩写号
underlying_symbol = get_underlying_symbol(vt_symbol)
dt = datetime.now()
# 若为中金所的合约,白天才提交订阅请求
if underlying_symbol in MARKET_DAY_ONLY and not (9 < dt.hour < 16):
continue
self.write_log(f'重新提交合约{vt_symbol}订阅请求')
for strategy_name, is_bar in list(self.pending_subcribe_symbol_map[vt_symbol]):
self.subscribe_symbol(strategy_name=strategy_name,
vt_symbol=vt_symbol,
gateway_name=gateway_name,
is_bar=is_bar)
else:
try:
self.write_log(f'找不到合约{vt_symbol}信息,尝试请求所有接口')
symbol, exchange = extract_vt_symbol(vt_symbol)
req = SubscribeRequest(symbol=symbol, exchange=exchange)
req.is_bar = is_bar
self.main_engine.subscribe(req, gateway_name)
except Exception as ex:
self.write_error(u'重新订阅{}.{}异常:{},{}'.format(gateway_name, vt_symbol, str(ex), traceback.format_exc()))
return
def check_stop_order(self, tick: TickData): def check_stop_order(self, tick: TickData):
"""""" """"""
for stop_order in list(self.stop_orders.values()): for stop_order in list(self.stop_orders.values()):
@ -278,7 +336,8 @@ class CtaEngine(BaseEngine):
price: float, price: float,
volume: float, volume: float,
type: OrderType, type: OrderType,
lock: bool lock: bool,
gateway_name: str = None
): ):
""" """
Send a new order to server. Send a new order to server.
@ -294,15 +353,19 @@ class CtaEngine(BaseEngine):
volume=volume, volume=volume,
) )
# 如果没有指定网关,则使用合约信息内的网关
if contract.gateway_name and not gateway_name:
gateway_name = contract.gateway_name
# Convert with offset converter # Convert with offset converter
req_list = self.offset_converter.convert_order_request(original_req, lock) req_list = self.offset_converter.convert_order_request(original_req, lock, gateway_name)
# Send Orders # Send Orders
vt_orderids = [] vt_orderids = []
for req in req_list: for req in req_list:
vt_orderid = self.main_engine.send_order( vt_orderid = self.main_engine.send_order(
req, contract.gateway_name) req, gateway_name)
# Check if sending order successful # Check if sending order successful
if not vt_orderid: if not vt_orderid:
@ -310,7 +373,7 @@ class CtaEngine(BaseEngine):
vt_orderids.append(vt_orderid) vt_orderids.append(vt_orderid)
self.offset_converter.update_order_request(req, vt_orderid) self.offset_converter.update_order_request(req, vt_orderid, gateway_name)
# Save relationship between orderid and strategy. # Save relationship between orderid and strategy.
self.orderid_strategy_map[vt_orderid] = strategy self.orderid_strategy_map[vt_orderid] = strategy
@ -326,7 +389,8 @@ class CtaEngine(BaseEngine):
offset: Offset, offset: Offset,
price: float, price: float,
volume: float, volume: float,
lock: bool lock: bool,
gateway_name: str = None
): ):
""" """
Send a limit order to server. Send a limit order to server.
@ -339,7 +403,8 @@ class CtaEngine(BaseEngine):
price, price,
volume, volume,
OrderType.LIMIT, OrderType.LIMIT,
lock lock,
gateway_name
) )
def send_server_stop_order( def send_server_stop_order(
@ -350,7 +415,8 @@ class CtaEngine(BaseEngine):
offset: Offset, offset: Offset,
price: float, price: float,
volume: float, volume: float,
lock: bool lock: bool,
gateway_name: str = None
): ):
""" """
Send a stop order to server. Send a stop order to server.
@ -366,7 +432,8 @@ class CtaEngine(BaseEngine):
price, price,
volume, volume,
OrderType.STOP, OrderType.STOP,
lock lock,
gateway_name
) )
def send_local_stop_order( def send_local_stop_order(
@ -377,7 +444,8 @@ class CtaEngine(BaseEngine):
offset: Offset, offset: Offset,
price: float, price: float,
volume: float, volume: float,
lock: bool lock: bool,
gateway_name: str = None
): ):
""" """
Create a new local stop order. Create a new local stop order.
@ -393,7 +461,8 @@ class CtaEngine(BaseEngine):
volume=volume, volume=volume,
stop_orderid=stop_orderid, stop_orderid=stop_orderid,
strategy_name=strategy.strategy_name, strategy_name=strategy.strategy_name,
lock=lock lock=lock,
gateway_name = gateway_name
) )
self.stop_orders[stop_orderid] = stop_order self.stop_orders[stop_orderid] = stop_order
@ -451,7 +520,8 @@ class CtaEngine(BaseEngine):
price: float, price: float,
volume: float, volume: float,
stop: bool, stop: bool,
lock: bool lock: bool,
gateway_name: str = None
): ):
""" """
该方法供策略使用发送委托 该方法供策略使用发送委托
@ -462,18 +532,19 @@ class CtaEngine(BaseEngine):
strategy_name=strategy.name, strategy_name=strategy.name,
level=logging.ERROR) level=logging.ERROR)
return "" return ""
if contract.gateway_name and not gateway_name:
gateway_name = contract.gateway_name
# Round order price and volume to nearest incremental value # Round order price and volume to nearest incremental value
price = round_to(price, contract.pricetick) price = round_to(price, contract.pricetick)
volume = round_to(volume, contract.min_volume) volume = round_to(volume, contract.min_volume)
if stop: if stop:
if contract.stop_supported: if contract.stop_supported:
return self.send_server_stop_order(strategy, contract, direction, offset, price, volume, lock) return self.send_server_stop_order(strategy, contract, direction, offset, price, volume, lock, gateway_name)
else: else:
return self.send_local_stop_order(strategy, vt_symbol, direction, offset, price, volume, lock) return self.send_local_stop_order(strategy, vt_symbol, direction, offset, price, volume, lock, gateway_name)
else: else:
return self.send_limit_order(strategy, contract, direction, offset, price, volume, lock) return self.send_limit_order(strategy, contract, direction, offset, price, volume, lock, gateway_name)
def cancel_order(self, strategy: CtaTemplate, vt_orderid: str): def cancel_order(self, strategy: CtaTemplate, vt_orderid: str):
""" """
@ -494,7 +565,7 @@ class CtaEngine(BaseEngine):
for vt_orderid in copy(vt_orderids): for vt_orderid in copy(vt_orderids):
self.cancel_order(strategy, vt_orderid) self.cancel_order(strategy, vt_orderid)
def subscribe_symbol(self, strategy_name: str, vt_symbol: str): def subscribe_symbol(self, strategy_name: str, vt_symbol: str, gateway_name: str = '', is_bar: bool = False):
"""订阅合约""" """订阅合约"""
strategy = self.strategies.get(strategy_name, None) strategy = self.strategies.get(strategy_name, None)
if not strategy: if not strategy:
@ -502,24 +573,41 @@ class CtaEngine(BaseEngine):
contract = self.main_engine.get_contract(vt_symbol) contract = self.main_engine.get_contract(vt_symbol)
if contract: if contract:
if contract.gateway_name and not gateway_name:
gateway_name = contract.gateway_name
req = SubscribeRequest( req = SubscribeRequest(
symbol=contract.symbol, exchange=contract.exchange) symbol=contract.symbol, exchange=contract.exchange)
self.main_engine.subscribe(req, contract.gateway_name) self.main_engine.subscribe(req, gateway_name)
else:
self.write_log(msg=f"找不到合约{vt_symbol},添加到待订阅列表",
strategy_name=strategy.name)
self.pending_subcribe_symbol_map[f'{gateway_name}.{vt_symbol}'].add((strategy_name, is_bar))
try:
self.write_log(f'找不到合约{vt_symbol}信息,尝试请求所有接口')
symbol, exchange = extract_vt_symbol(vt_symbol)
req = SubscribeRequest(symbol=symbol, exchange=exchange)
req.is_bar = is_bar
self.main_engine.subscribe(req, gateway_name)
except Exception as ex:
self.write_error(u'重新订阅{}异常:{},{}'.format(vt_symbol, str(ex), traceback.format_exc()))
# 如果是订阅bar
if is_bar:
strategies = self.bar_strategy_map[vt_symbol]
if strategy not in strategies:
strategies.append(strategy)
self.bar_strategy_map.update({vt_symbol: strategies})
else:
# 添加 合约订阅 vt_symbol <=> 策略实例 strategy 映射. # 添加 合约订阅 vt_symbol <=> 策略实例 strategy 映射.
strategies = self.symbol_strategy_map[vt_symbol] strategies = self.symbol_strategy_map[vt_symbol]
strategies.append(strategy) strategies.append(strategy)
# 添加 策略名 strategy_name <=> 合约订阅 vt_symbol 的映射 # 添加 策略名 strategy_name <=> 合约订阅 vt_symbol 的映射
subscribe_symbol_set = self.strategy_symbol_map[strategy.name] subscribe_symbol_set = self.strategy_symbol_map[strategy.name]
subscribe_symbol_set.add(contract.vt_symbol) subscribe_symbol_set.add(contract.vt_symbol)
return True
else: return True
self.write_log(msg=f"行情订阅失败,找不到合约{vt_symbol}",
strategy_name=strategy.name,
level=logging.CRITICAL)
return False
@lru_cache() @lru_cache()
def get_size(self, vt_symbol: str): def get_size(self, vt_symbol: str):
@ -559,6 +647,19 @@ class CtaEngine(BaseEngine):
return None return None
def get_account(self, vt_accountid: str):
""" 查询账号的资金"""
return self.main_engine.get_account(vt_accountid)
def get_position(self, vt_symbol: str, direction: Direction, gateway_name: str = ''):
""" 查询合约在账号的持仓,需要指定方向"""
vt_position_id = f"{gateway_name}.{vt_symbol}.{direction.value}"
return self.main_engine.get_position(vt_position_id)
def get_position_holding(self, vt_symbol: str, gateway_name: str = ''):
""" 查询合约在账号的持仓(包含多空)"""
return self.offset_converter.get_position_holding(vt_symbol, gateway_name)
def get_engine_type(self): def get_engine_type(self):
"""""" """"""
return self.engine_type return self.engine_type

View File

@ -121,7 +121,7 @@ class IndexTickPublisher(BaseEngine):
c -= 1 c -= 1
self.fail_ip_dict.update({k: c}) self.fail_ip_dict.update({k: c})
self.checkStatus() self.check_status()
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def ping(self, ip, port=7709): def ping(self, ip, port=7709):
@ -144,7 +144,7 @@ class IndexTickPublisher(BaseEngine):
self.write_log(u'该服务器IP {}无响应.'.format(ip)) self.write_log(u'该服务器IP {}无响应.'.format(ip))
return timedelta(seconds=10).total_seconds() * 1000 return timedelta(seconds=10).total_seconds() * 1000
except Exception as ex: except Exception as ex:
self.write_error(u'tdx ping服务器{},异常的响应{}'.format(ip,str(ex))) self.write_error(u'tdx ping服务器{},异常的响应{}'.format(ip, str(ex)))
return timedelta(seconds=10).total_seconds() * 1000 return timedelta(seconds=10).total_seconds() * 1000
def sort_ip_speed(self): def sort_ip_speed(self):
@ -232,7 +232,7 @@ class IndexTickPublisher(BaseEngine):
# 更新 symbol_exchange_dict , symbol_market_dict # 更新 symbol_exchange_dict , symbol_market_dict
self.write_log(u'查询合约') self.write_log(u'查询合约')
self.qryInstrument() self.qry_instrument()
self.conf.update(rabbit_config) self.conf.update(rabbit_config)
self.create_publisher(self.conf) self.create_publisher(self.conf)
@ -277,7 +277,7 @@ class IndexTickPublisher(BaseEngine):
self.write_log(u'退出rabbitMQ 发布器') self.write_log(u'退出rabbitMQ 发布器')
self.pub.exit() self.pub.exit()
def checkStatus(self): def check_status(self):
# self.write_log(u'检查tdx接口状态') # self.write_log(u'检查tdx接口状态')
# 若还没有启动连接,就启动连接 # 若还没有启动连接,就启动连接
@ -290,7 +290,7 @@ class IndexTickPublisher(BaseEngine):
# self.write_log(u'tdx接口状态正常') # self.write_log(u'tdx接口状态正常')
def qryInstrument(self): def qry_instrument(self):
""" """
查询/更新合约信息 查询/更新合约信息
:return: :return:

View File

@ -0,0 +1,7 @@
# encoding: UTF-8
HEIGHT_LIST = [3, 5, 10, 'K3', 'K5', 'K10']
FUTURE_RENKO_DB_NAME = 'FutureRenko_Db'
STOCK_RENKO_DB_NAME = 'StockRenko_Db'

View File

@ -0,0 +1,34 @@
# flake8: noqa
import os
import sys
vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
if vnpy_root not in sys.path:
sys.path.append(vnpy_root)
os.environ["VNPY_TESTING"] = "1"
from vnpy.data.renko.rebuild_future import *
# Mongo数据库得地址renko数据库名tick文件缓存目录
setting = {
"host": "192.168.0.207",
"db_name": FUTURE_RENKO_DB_NAME,
"cache_folder": os.path.join(vnpy_root, 'tick_data', 'tdx', 'future')
}
builder = FutureRenkoRebuilder(setting)
# 生成单个
# builder.start(symbol='RB99',min_diff=1, height=10, start_date='2019-04-01', end_date='2019-09-10')
# 生成多个
builder.start(symbol='J99', price_tick=0.5, height=[10], start_date='2016-01-01', end_date='2016-02-16')
# 导出csv
# builder.export(symbol='RB99',height=10, start_date='2019-04-01', end_date='2019-09-10')
# 生成批量更新脚本
# builder.export_scripts()
# builder.check_all_index()
exit(0)

View File

@ -0,0 +1,35 @@
# flake8: noqa
import os
import sys
vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
if vnpy_root not in sys.path:
sys.path.append(vnpy_root)
os.environ["VNPY_TESTING"] = "1"
from vnpy.data.renko.rebuild_stock import *
# Mongo数据库得地址renko数据库名tick文件缓存目录
setting = {
"host": "192.168.0.207",
"db_name": STOCK_RENKO_DB_NAME,
"cache_folder": os.path.join(vnpy_root, 'tick_data', 'tdx', 'stock')
}
builder = StockRenkoRebuilder(setting)
# 生成单个
# builder.start(symbol='600410',min_diff=0.01, height=10, start_date='2019-04-01', end_date='2019-09-10')
# 生成多个
builder.start(symbol='123022', price_tick=0.001, height=[10, 'K3', 'K5'], start_date='2019-01-01',
end_date='2019-12-31')
# 导出csv
# builder.export(symbol='600410',height=10, start_date='2019-04-01', end_date='2019-09-10')
# 生成批量更新脚本
# builder.export_scripts()
builder.check_all_index()
exit(0)

View File

@ -1,7 +1,6 @@
# flake8: noqa # flake8: noqa
""" """
下载通达信指数合约1分钟bar => vnpy项目目录/bar_data/ 下载通达信指数合约1分钟bar => vnpy项目目录/bar_data/
""" """
import os import os
import sys import sys
@ -29,24 +28,7 @@ api_01 = TdxFutureData()
# 更新本地合约缓存信息 # 更新本地合约缓存信息
api_01.update_mi_contracts() api_01.update_mi_contracts()
# 逐一指数合约下载并更新
def bar_to_dict(bar_data: BarData):
d = OrderedDict({
'datetime': bar_data.datetime,
'symbol': bar_data.symbol,
'vt_symbol': bar_data.vt_symbol,
'exchange': bar_data.exchange.value,
'open': bar_data.open_price,
'close': bar_data.close_price,
'high': bar_data.high_price,
'low': bar_data.low_price,
'volume': bar_data.volume,
'open_interest': bar_data.open_interest,
'trading_day': bar_data.trading_day
})
return d
for underlying_symbol in api_01.future_contracts.keys(): for underlying_symbol in api_01.future_contracts.keys():
index_symbol = underlying_symbol + '99' index_symbol = underlying_symbol + '99'
print(f'开始更新:{index_symbol}') print(f'开始更新:{index_symbol}')

View File

@ -1276,6 +1276,7 @@ class TdxMdApi():
self.gateway.on_tick(tick) self.gateway.on_tick(tick)
self.gateway.on_custom_tick(tick) self.gateway.on_custom_tick(tick)
class SubMdApi(): class SubMdApi():
""" """
RabbitMQ Subscriber 数据行情接收API RabbitMQ Subscriber 数据行情接收API
@ -1360,7 +1361,7 @@ class SubMdApi():
self.gateway.write_log(u'关闭订阅器接收线程') self.gateway.write_log(u'关闭订阅器接收线程')
self.thread.join() self.thread.join()
except Exception as ex: except Exception as ex:
self.gateway.write_error(u'退出rabbitMQ行情api异常') self.gateway.write_error(u'退出rabbitMQ行情api异常:{}'.format(str(ex)))
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def subscribe(self, subscribeReq): def subscribe(self, subscribeReq):
@ -1453,22 +1454,22 @@ class TickCombiner(object):
# 以下情况,基本为单腿涨跌停,不合成价差/价格比 Tick # 以下情况,基本为单腿涨跌停,不合成价差/价格比 Tick
if (self.last_leg1_tick.ask_price_1 == 0 or self.last_leg1_tick.bid_price_1 == self.last_leg1_tick.upperLimit) \ if (self.last_leg1_tick.ask_price_1 == 0 or self.last_leg1_tick.bid_price_1 == self.last_leg1_tick.upperLimit) \
and self.last_leg1_tick.askVolume1 == 0: and self.last_leg1_tick.ask_volume_1 == 0:
self.gateway.write_log( self.gateway.write_log(
u'leg1:{0}涨停{1}不合成价差Tick'.format(self.last_leg1_tick.vtSymbol, self.last_leg1_tick.bid_price_1)) u'leg1:{0}涨停{1}不合成价差Tick'.format(self.last_leg1_tick.vtSymbol, self.last_leg1_tick.bid_price_1))
return return
if (self.last_leg1_tick.bid_price_1 == 0 or self.last_leg1_tick.ask_price_1 == self.last_leg1_tick.lowerLimit) \ if (self.last_leg1_tick.bid_price_1 == 0 or self.last_leg1_tick.ask_price_1 == self.last_leg1_tick.lowerLimit) \
and self.last_leg1_tick.bidVolume1 == 0: and self.last_leg1_tick.bid_volume_1 == 0:
self.gateway.write_log( self.gateway.write_log(
u'leg1:{0}跌停{1}不合成价差Tick'.format(self.last_leg1_tick.vtSymbol, self.last_leg1_tick.ask_price_1)) u'leg1:{0}跌停{1}不合成价差Tick'.format(self.last_leg1_tick.vtSymbol, self.last_leg1_tick.ask_price_1))
return return
if (self.last_leg2_tick.ask_price_1 == 0 or self.last_leg2_tick.bid_price_1 == self.last_leg2_tick.upperLimit) \ if (self.last_leg2_tick.ask_price_1 == 0 or self.last_leg2_tick.bid_price_1 == self.last_leg2_tick.upperLimit) \
and self.last_leg2_tick.askVolume1 == 0: and self.last_leg2_tick.ask_volume_1 == 0:
self.gateway.write_log( self.gateway.write_log(
u'leg2:{0}涨停{1}不合成价差Tick'.format(self.last_leg2_tick.vtSymbol, self.last_leg2_tick.bid_price_1)) u'leg2:{0}涨停{1}不合成价差Tick'.format(self.last_leg2_tick.vtSymbol, self.last_leg2_tick.bid_price_1))
return return
if (self.last_leg2_tick.bid_price_1 == 0 or self.last_leg2_tick.ask_price_1 == self.last_leg2_tick.lowerLimit) \ if (self.last_leg2_tick.bid_price_1 == 0 or self.last_leg2_tick.ask_price_1 == self.last_leg2_tick.lowerLimit) \
and self.last_leg2_tick.bidVolume1 == 0: and self.last_leg2_tick.bid_volume_1 == 0:
self.gateway.write_log( self.gateway.write_log(
u'leg2:{0}跌停{1}不合成价差Tick'.format(self.last_leg2_tick.vtSymbol, self.last_leg2_tick.ask_price_1)) u'leg2:{0}跌停{1}不合成价差Tick'.format(self.last_leg2_tick.vtSymbol, self.last_leg2_tick.ask_price_1))
return return
@ -1509,16 +1510,21 @@ class TickCombiner(object):
value=self.last_leg1_tick.pre_close * self.leg1_ratio - self.last_leg2_tick.pre_close * self.leg2_ratio) value=self.last_leg1_tick.pre_close * self.leg1_ratio - self.last_leg2_tick.pre_close * self.leg2_ratio)
# 开盘价 # 开盘价
if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0: if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0:
spread_tick.openPrice = round_to(target=self.price_tick, spread_tick.open_price = round_to(target=self.price_tick,
value=self.last_leg1_tick.open_price * self.leg1_ratio - self.last_leg2_tick.open_price * self.leg2_ratio) value=self.last_leg1_tick.open_price * self.leg1_ratio - self.last_leg2_tick.open_price * self.leg2_ratio)
# 最高价 # 最高价
self.spread_high = spread_tick.ask_price_1 if self.spread_high is None else max(self.spread_high, if self.spread_high:
spread_tick.ask_price_1) self.spread_high = max(self.spread_high, spread_tick.ask_price_1)
else:
self.spread_high = spread_tick.ask_price_1
spread_tick.high_price = self.spread_high spread_tick.high_price = self.spread_high
# 最低价 # 最低价
self.spread_low = spread_tick.bid_price_1 if self.spread_low is None else min(self.spread_low, if self.spread_low:
spread_tick.bid_price_1) self.spread_low = min(self.spread_low, spread_tick.bid_price_1)
else:
self.spread_low = spread_tick.bid_price_1
spread_tick.low_price = self.spread_low spread_tick.low_price = self.spread_low
self.gateway.on_tick(spread_tick) self.gateway.on_tick(spread_tick)
@ -1537,33 +1543,38 @@ class TickCombiner(object):
ratio_tick.ask_price_1 = round_to(target=self.price_tick, ratio_tick.ask_price_1 = round_to(target=self.price_tick,
value=100 * self.last_leg1_tick.ask_price_1 * self.leg1_ratio / ( value=100 * self.last_leg1_tick.ask_price_1 * self.leg1_ratio / (
self.last_leg2_tick.bid_price_1 * self.leg2_ratio)) self.last_leg2_tick.bid_price_1 * self.leg2_ratio))
ratio_tick.askVolume1 = min(self.last_leg1_tick.askVolume1, self.last_leg2_tick.bidVolume1) ratio_tick.ask_volume_1 = min(self.last_leg1_tick.ask_volume_1, self.last_leg2_tick.bid_volume_1)
ratio_tick.bid_price_1 = round_to(target=self.price_tick, ratio_tick.bid_price_1 = round_to(target=self.price_tick,
value=100 * self.last_leg1_tick.bid_price_1 * self.leg1_ratio / ( value=100 * self.last_leg1_tick.bid_price_1 * self.leg1_ratio / (
self.last_leg2_tick.ask_price_1 * self.leg2_ratio)) self.last_leg2_tick.ask_price_1 * self.leg2_ratio))
ratio_tick.bidVolume1 = min(self.last_leg1_tick.bidVolume1, self.last_leg2_tick.askVolume1) ratio_tick.bid_volume_1 = min(self.last_leg1_tick.bid_volume_1, self.last_leg2_tick.ask_volume_1)
ratio_tick.lastPrice = round_to(target=self.price_tick, ratio_tick.lastPrice = round_to(target=self.price_tick,
value=(ratio_tick.ask_price_1 + ratio_tick.bid_price_1) / 2) value=(ratio_tick.ask_price_1 + ratio_tick.bid_price_1) / 2)
# 昨收盘价 # 昨收盘价
if self.last_leg2_tick.preClosePrice > 0 and self.last_leg1_tick.preClosePrice > 0: if self.last_leg2_tick.pre_close > 0 and self.last_leg1_tick.pre_close > 0:
ratio_tick.preClosePrice = round_to(target=self.price_tick, ratio_tick.pre_close = round_to(target=self.price_tick,
value=100 * self.last_leg1_tick.preClosePrice * self.leg1_ratio / ( value=100 * self.last_leg1_tick.pre_close * self.leg1_ratio / (
self.last_leg2_tick.preClosePrice * self.leg2_ratio)) self.last_leg2_tick.pre_close * self.leg2_ratio))
# 开盘价 # 开盘价
if self.last_leg2_tick.openPrice > 0 and self.last_leg1_tick.openPrice > 0: if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0:
ratio_tick.openPrice = round_to(target=self.price_tick, ratio_tick.open_price = round_to(target=self.price_tick,
value=100 * self.last_leg1_tick.openPrice * self.leg1_ratio / ( value=100 * self.last_leg1_tick.open_price * self.leg1_ratio / (
self.last_leg2_tick.openPrice * self.leg2_ratio)) self.last_leg2_tick.open_price * self.leg2_ratio))
# 最高价 # 最高价
self.ratio_high = ratio_tick.ask_price_1 if self.ratio_high is None else max(self.ratio_high, if self.ratio_high:
ratio_tick.ask_price_1) self.ratio_high = max(self.ratio_high, ratio_tick.ask_price_1)
else:
self.ratio_high = ratio_tick.ask_price_1
ratio_tick.high_price = self.spread_high ratio_tick.high_price = self.spread_high
# 最低价 # 最低价
self.ratio_low = ratio_tick.bid_price_1 if self.ratio_low is None else min(self.ratio_low, if self.ratio_low:
ratio_tick.bid_price_1) self.ratio_low = min(self.ratio_low, ratio_tick.bid_price_1)
else:
self.ratio_low = ratio_tick.bid_price_1
ratio_tick.low_price = self.spread_low ratio_tick.low_price = self.spread_low
self.gateway.on_tick(ratio_tick) self.gateway.on_tick(ratio_tick)

View File

@ -0,0 +1,93 @@
# flake8: noqa
import sys
import os
import traceback
from time import sleep
vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
if vnpy_root not in sys.path:
print(u'append {}'.format(vnpy_root))
sys.path.append(vnpy_root)
from vnpy.gateway.ctptest import CtptestGateway
from vnpy.event import EventEngine
from vnpy.trader.constant import Exchange
from vnpy.trader.event import (
EVENT_TICK,
EVENT_ORDER,
EVENT_TRADE,
EVENT_POSITION,
EVENT_ACCOUNT,
EVENT_LOG,
)
from vnpy.trader.object import (
SubscribeRequest,
)
# 这里放期货公司需要你连接的测试系统的相关信息
ctp_setting = {
"用户名": "xxx",
"密码": "xxx",
"经纪商代码": "9999",
"交易服务器": "tcp://180.168.146.187:10100",
"行情服务器": "tcp://180.168.146.187:10110",
"产品名称": "simnow_client_test",
"授权编码": "0000000000000000",
"产品信息": ""
}
def test():
"""测试"""
from qtpy import QtCore
import sys
def print_log(event):
log = event.data
print(f'{log.time}: {log.msg}\n')
def print_event(event):
data = event.data
print(f'{data.__dict__}')
app = QtCore.QCoreApplication(sys.argv)
event_engine = EventEngine()
event_engine.register(EVENT_LOG, print_log)
event_engine.register(EVENT_TICK, print_event)
event_engine.register(EVENT_ACCOUNT, print_event)
event_engine.register(EVENT_ORDER, print_event)
event_engine.register(EVENT_TRADE, print_event)
event_engine.register(EVENT_POSITION, print_event)
event_engine.start()
gateway = CtptestGateway(event_engine)
gateway.connect(ctp_setting)
# gateway.connect()
auto_subscribe_symbols = ['rb2010']
for symbol in auto_subscribe_symbols:
print(u'自动订阅合约:{}'.format(symbol))
sub = SubscribeRequest(symbol=symbol, exchange=Exchange.SHFE)
sub.symbol = symbol
gateway.subscribe(sub)
couter = 20
gateway.init_query()
while couter > 0:
print(u'{}'.format(couter))
sleep(1)
couter -= 1
sys.exit(app.exec_())
if __name__ == '__main__':
try:
test()
except Exception as ex:
print(u'异常:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr)
print('Finished')

View File

@ -13,7 +13,9 @@ from vnpy.trader.constant import (Direction, Offset, Exchange)
class OffsetConverter: class OffsetConverter:
"""""" """
仓位转换
"""
def __init__(self, main_engine: MainEngine): def __init__(self, main_engine: MainEngine):
"""""" """"""
@ -25,7 +27,7 @@ class OffsetConverter:
if not self.is_convert_required(position.vt_symbol): if not self.is_convert_required(position.vt_symbol):
return return
holding = self.get_position_holding(position.vt_symbol) holding = self.get_position_holding(position.vt_symbol, position.gateway_name)
holding.update_position(position) holding.update_position(position)
def update_trade(self, trade: TradeData): def update_trade(self, trade: TradeData):
@ -33,7 +35,7 @@ class OffsetConverter:
if not self.is_convert_required(trade.vt_symbol): if not self.is_convert_required(trade.vt_symbol):
return return
holding = self.get_position_holding(trade.vt_symbol) holding = self.get_position_holding(trade.vt_symbol, trade.gateway_name)
holding.update_trade(trade) holding.update_trade(trade)
def update_order(self, order: OrderData): def update_order(self, order: OrderData):
@ -41,32 +43,33 @@ class OffsetConverter:
if not self.is_convert_required(order.vt_symbol): if not self.is_convert_required(order.vt_symbol):
return return
holding = self.get_position_holding(order.vt_symbol) holding = self.get_position_holding(order.vt_symbol, order.gateway_name)
holding.update_order(order) holding.update_order(order)
def update_order_request(self, req: OrderRequest, vt_orderid: str): def update_order_request(self, req: OrderRequest, vt_orderid: str, gateway_name: str = ''):
"""""" """"""
if not self.is_convert_required(req.vt_symbol): if not self.is_convert_required(req.vt_symbol):
return return
holding = self.get_position_holding(req.vt_symbol) holding = self.get_position_holding(req.vt_symbol, gateway_name)
holding.update_order_request(req, vt_orderid) holding.update_order_request(req, vt_orderid)
def get_position_holding(self, vt_symbol: str): def get_position_holding(self, vt_symbol: str, gateway_name: str = ''):
"""""" """获取持仓信息"""
holding = self.holdings.get(vt_symbol, None) k = f'{gateway_name}.{vt_symbol}'
holding = self.holdings.get(k, None)
if not holding: if not holding:
contract = self.main_engine.get_contract(vt_symbol) contract = self.main_engine.get_contract(vt_symbol)
holding = PositionHolding(contract) holding = PositionHolding(contract)
self.holdings[vt_symbol] = holding self.holdings[k] = holding
return holding return holding
def convert_order_request(self, req: OrderRequest, lock: bool): def convert_order_request(self, req: OrderRequest, lock: bool, gateway_name: str = ''):
"""""" """"""
if not self.is_convert_required(req.vt_symbol): if not self.is_convert_required(req.vt_symbol):
return [req] return [req]
holding = self.get_position_holding(req.vt_symbol) holding = self.get_position_holding(req.vt_symbol, gateway_name)
if lock: if lock:
return holding.convert_order_request_lock(req) return holding.convert_order_request_lock(req)

View File

@ -162,10 +162,16 @@ class MainEngine:
def subscribe(self, req: SubscribeRequest, gateway_name: str): def subscribe(self, req: SubscribeRequest, gateway_name: str):
""" """
Subscribe tick data update of a specific gateway. Subscribe tick data update of a specific gateway.
如果没有指定gateway那么所有的gateway都会接收改订阅请求
""" """
gateway = self.get_gateway(gateway_name) if gateway_name:
if gateway: gateway = self.get_gateway(gateway_name)
gateway.subscribe(req) if gateway:
gateway.subscribe(req)
else:
for gateway in self.gateways.items():
if gateway:
gateway.subscribe(req)
def send_order(self, req: OrderRequest, gateway_name: str): def send_order(self, req: OrderRequest, gateway_name: str):
""" """

View File

@ -5,6 +5,7 @@ Event type string used in VN Trader.
from vnpy.event import EVENT_TIMER # noqa from vnpy.event import EVENT_TIMER # noqa
EVENT_TICK = "eTick." EVENT_TICK = "eTick."
EVENT_BAR = "eBar."
EVENT_TRADE = "eTrade." EVENT_TRADE = "eTrade."
EVENT_ORDER = "eOrder." EVENT_ORDER = "eOrder."
EVENT_POSITION = "ePosition." EVENT_POSITION = "ePosition."

View File

@ -206,7 +206,7 @@ class PositionData(BaseData):
def __post_init__(self): def __post_init__(self):
"""""" """"""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}" self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
self.vt_positionid = f"{self.vt_symbol}.{self.direction.value}" self.vt_positionid = f"{self.gateway_name}.{self.vt_symbol}.{self.direction.value}"
@dataclass @dataclass
@ -273,11 +273,12 @@ class ContractData(BaseData):
@dataclass @dataclass
class SubscribeRequest: class SubscribeRequest:
""" """
Request sending to specific gateway for subscribing tick data update. Request sending to specific gateway for subscribing tick/bar data update.
""" """
symbol: str symbol: str
exchange: Exchange exchange: Exchange
is_bar: bool = False
def __post_init__(self): def __post_init__(self):
"""""" """"""