[update] 一般更新

This commit is contained in:
msincenselee 2022-03-19 23:19:22 +08:00
parent b555a13943
commit 2dcbcab0a6
17 changed files with 1352 additions and 152 deletions

View File

@ -1,18 +1,20 @@
# encoding: UTF-8 # encoding: UTF-8
import pika import pika
import sys
class base_broker(): class base_broker():
def __init__(self, host='localhost', port=5672, user='guest', password='guest', def __init__(self, host='localhost', port=5672, user='guest', password='guest',
channel_number=1): channel_number=1):
""" """
:param host: 连接rabbitmq的服务器地址或者群集地址 :param host: 连接rabbitmq的服务器地址或者群集地址,或者多台主机地址使用;分隔开
:param port: 端口 :param port: 端口
:param user: 用户名 :param user: 用户名
:param password: 密码 :param password: 密码
:param channel_number: 频道的数字大于1 :param channel_number: 频道的数字大于1
""" """
self.host = host self.host = host
self.port = port self.port = port
self.user = user self.user = user
self.password = password self.password = password
@ -21,16 +23,26 @@ class base_broker():
# 身份鉴权 # 身份鉴权
self.credentials = pika.PlainCredentials(self.user, self.password, erase_on_connect=True) self.credentials = pika.PlainCredentials(self.user, self.password, erase_on_connect=True)
if ';' in self.host:
hosts = self.host.split(';')
else:
hosts = [self.host]
# 多个连接服务器时,使用
for _host_ in hosts:
try:
# 创建连接 # 创建连接
self.connection = pika.BlockingConnection( self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host, port=self.port, pika.ConnectionParameters(host=_host_, port=self.port,
credentials=self.credentials, credentials=self.credentials,
heartbeat=0, socket_timeout=5)) heartbeat=0, socket_timeout=5))
# 创建一个频道,或者指定频段数字编号 # 创建一个频道,或者指定频段数字编号
self.channel = self.connection.channel( self.channel = self.connection.channel(
channel_number=self.channel_number) channel_number=self.channel_number)
except:
print(f'pika rabbit connect to {_host_} {self.port} fail', file=sys.stderr)
else:
break
def reconnect(self): def reconnect(self):
""" """

View File

@ -31,13 +31,3 @@
} }
} }
} }
# 创建mongodb 索引,提高性能
db.today_orders.createIndex({'account_id':1,'vt_symbol':1,'sys_orderid':1,'order_date':1,'holder_id':1},{'name':'accountid_vtsymbol_sysorderid_order_date_holder_id','unique':true})
db.history_orders.createIndex({'account_id':1,'vt_symbol':1,'sys_orderid':1,'order_date':1,'holder_id':1},{'name':'history_accountid_vtsymbol_sysorderid_order_date_holder_id'})
db.today_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtSymbol_vt_tradeid_trade_date_holder_id','unique':true})
db.history_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtSymbol_vt_tradeid_trade_date_holder_id'})
db.today_positions.createIndex({'account_id':1,'vt_symbol':1,'direction':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtsymbol_direction_trade_date_holder_id'})
db.today_strategy_pos.createIndex({'account_id':1,'strategy_group':1,'strategy_name':1,'date':1},{'name':'accountid_strategy_group_strategy_name_date'})
db.strategy_snapshot.createIndex({'account_id':1,'strategy_group':1,'strategy':1,'guid':1,'datetime':1},{'name':'accountid_strategy_name_guid'})

View File

@ -42,7 +42,7 @@ from vnpy.trader.event import (
) )
from vnpy.trader.constant import Direction, Exchange, Status from vnpy.trader.constant import Direction, Exchange, Status
from vnpy.trader.engine import BaseEngine, MainEngine from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.utility import get_trading_date, load_json, save_json from vnpy.trader.utility import get_trading_date, load_json, save_json,print_dict
from vnpy.data.mongo.mongo_data import MongoData from vnpy.data.mongo.mongo_data import MongoData
# 入库 # 入库
@ -487,7 +487,7 @@ class AccountRecorder(BaseEngine):
price = self.main_engine.get_price(pos.vt_symbol) price = self.main_engine.get_price(pos.vt_symbol)
if price: if price:
data.update({'cur_price': price}) data.update({'cur_price': price})
# self.write_log('update position:{}'.format(print_dict(data)))
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_POSITION_COL, fld=fld, data=data) self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_POSITION_COL, fld=fld, data=data)
def update_strategy_snapshot(self, event: Event): def update_strategy_snapshot(self, event: Event):
@ -570,7 +570,7 @@ class AccountRecorder(BaseEngine):
pos_data = copy.copy(data) pos_data = copy.copy(data)
pos_data.update({'account_id': pos_data.get('accountid')}) pos_data.update({'account_id': pos_data.get('accountid')})
pos_data.update({'datetime': dt.strftime("%Y-%m-%d %H:%M:%S")}) pos_data.update({'datetime': dt.strftime("%Y-%m-%d %H:%M:%S")})
self.write_log(f'stratgy_pos event:{print_dict(pos_data)}')
self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_STRATEGY_POS_COL, fld=fld, data=pos_data) self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_STRATEGY_POS_COL, fld=fld, data=pos_data)
def process_gw_error(self, event: Event): def process_gw_error(self, event: Event):

View File

@ -63,6 +63,8 @@ from vnpy.trader.utility import (
from vnpy.trader.util_logger import setup_logger, logging from vnpy.trader.util_logger import setup_logger, logging
from vnpy.trader.util_wechat import send_wx_msg from vnpy.trader.util_wechat import send_wx_msg
from vnpy.data.mongo.mongo_data import MongoData
from vnpy.trader.setting import SETTINGS
from vnpy.trader.converter import OffsetConverter from vnpy.trader.converter import OffsetConverter
from .base import ( from .base import (
@ -120,6 +122,9 @@ class CtaEngine(BaseEngine):
# "trade_2_wx": true # 是否交易记录转发至微信通知 # "trade_2_wx": true # 是否交易记录转发至微信通知
# "event_log: false # 是否转发日志到event bus显示在图形界面 # "event_log: false # 是否转发日志到event bus显示在图形界面
# "snapshot2file": false # 是否保存切片到文件 # "snapshot2file": false # 是否保存切片到文件
# "compare_pos": false # False强制不进行 账号 <=> 引擎实例 得仓位比对。一般分布式RPC运行时其他得实例都不进行比对
# "get_pos_from_db": false # True使用数据库得 策略<=>pos 数据作为比较一般分布式RPC运行时其中一个使用即可; False使用当前引擎实例得 策略.pos进行比对
self.engine_config = {} self.engine_config = {}
# 是否激活 write_log写入event bus(比较耗资源) # 是否激活 write_log写入event bus(比较耗资源)
self.event_log = False self.event_log = False
@ -171,6 +176,26 @@ class CtaEngine(BaseEngine):
self.write_log("CTA策略引擎初始化成功") self.write_log("CTA策略引擎初始化成功")
if self.engine_config.get('get_pos_from_db', False):
self.write_log(f'激活数据库策略仓位比对模式')
self.init_mongo_data()
def init_mongo_data(self):
"""初始化hams数据库"""
host = SETTINGS.get('hams.host', 'localhost')
port = SETTINGS.get('hams.port', 27017)
self.write_log(f'初始化hams数据库连接:{host}:{port}')
try:
# Mongo数据连接客户端
self.mongo_data = MongoData(host=host, port=port)
if self.mongo_data and self.mongo_data.db_has_connected:
self.write_log(f'连接成功')
else:
self.write_error(f'HAMS数据库{host}:{port}连接异常.')
except Exception as ex:
self.write_error(f'HAMS数据库{host}:{port}连接异常.{str(ex)}')
def close(self): def close(self):
"""停止所属有的策略""" """停止所属有的策略"""
self.stop_all_strategies() self.stop_all_strategies()
@ -1713,7 +1738,8 @@ class CtaEngine(BaseEngine):
pos_list.append(leg1_pos) pos_list.append(leg1_pos)
pos_list.append(leg2_pos) pos_list.append(leg2_pos)
else:
pos_list.append(pos)
except Exception as ex: except Exception as ex:
self.write_error(f'分解SPD失败:{str(ex)}') self.write_error(f'分解SPD失败:{str(ex)}')
@ -1749,6 +1775,33 @@ class CtaEngine(BaseEngine):
return strategy_pos_list return strategy_pos_list
def get_all_strategy_pos_from_hams(self):
"""
获取hams中该账号下所有策略仓位明细
"""
strategy_pos_dict = {}
if not self.mongo_data:
self.init_mongo_data()
if self.mongo_data and self.mongo_data.db_has_connected:
filter = {'account_id': self.engine_config.get('accountid', '-')}
pos_list = self.mongo_data.db_query(
db_name='Account',
col_name='today_strategy_pos',
filter_dict=filter
)
for pos in pos_list:
s_name = pos.get('strategy_name', None)
if s_name:
if s_name not in strategy_pos_dict:
strategy_pos_dict[s_name] = pos
continue
if pos.get('datetime', '') > strategy_pos_dict[s_name].get('datetime', ''):
strategy_pos_dict[s_name] = pos
return list(strategy_pos_dict.values())
def get_strategy_class_parameters(self, class_name: str): def get_strategy_class_parameters(self, class_name: str):
""" """
Get default parameters of a strategy class. Get default parameters of a strategy class.
@ -1810,9 +1863,14 @@ class CtaEngine(BaseEngine):
self.write_log(u'开始对比账号&策略的持仓') self.write_log(u'开始对比账号&策略的持仓')
# 获取当前策略得持仓 # 获取hams数据库中所有运行实例得策略
if self.engine_config.get("get_pos_from_db", False):
strategy_pos_list = self.get_all_strategy_pos_from_hams()
else:
# 获取当前实例运行策略得持仓
if len(strategy_pos_list) == 0: if len(strategy_pos_list) == 0:
strategy_pos_list = self.get_all_strategy_pos() strategy_pos_list = self.get_all_strategy_pos()
self.write_log(u'策略持仓清单:{}'.format(strategy_pos_list)) self.write_log(u'策略持仓清单:{}'.format(strategy_pos_list))
none_strategy_pos = self.get_none_strategy_pos_list() none_strategy_pos = self.get_none_strategy_pos_list()
@ -1888,53 +1946,44 @@ class CtaEngine(BaseEngine):
pos_compare_result = '' pos_compare_result = ''
# 精简输出 # 精简输出
compare_info = '' compare_info = ''
diff_pos_dict = {} diff_pos_dict = {} # 短合约: {'long_diff':xx, 'short_diff',xx)
for vt_symbol in sorted(vt_symbols): for vt_symbol in sorted(vt_symbols):
# 发送不一致得结果 # 发送不一致得结果
symbol_pos = compare_pos.pop(vt_symbol, {}) symbol_pos = compare_pos.pop(vt_symbol, {})
#
d_long = { # # 股指期货: 帐号多/空轧差, vs 策略多空轧差 是否一致;
'account_id': self.engine_config.get('accountid', '-'), # # 其他期货:帐号多单 vs 除了多单, 空单 vs 空单
'vt_symbol': vt_symbol, # if vt_symbol.endswith(".CFFEX"):
'direction': Direction.LONG.value, # diff_match = (symbol_pos.get('账号多单', 0) - symbol_pos.get('账号空单', 0)) == (
'strategy_list': symbol_pos.get('多单策略', [])} # symbol_pos.get('策略多单', 0) - symbol_pos.get('策略空单', 0))
# pos_match = symbol_pos.get('账号空单', 0) == symbol_pos.get('策略空单', 0) and \
d_short = { # symbol_pos.get('账号多单', 0) == symbol_pos.get('策略多单', 0)
'account_id': self.engine_config.get('accountid', '-'), # match = diff_match
'vt_symbol': vt_symbol, # # 轧差一致,帐号/策略持仓不一致
'direction': Direction.SHORT.value, # if diff_match and not pos_match:
'strategy_list': symbol_pos.get('空单策略', [])} # if symbol_pos.get('账号多单', 0) > symbol_pos.get('策略多单', 0):
# self.write_log('{}轧差持仓:多:{},空:{} 大于 策略持仓 多:{},空:{}'.format(
# 股指期货: 帐号多/空轧差, vs 策略多空轧差 是否一致; # vt_symbol,
# 其他期货:帐号多单 vs 除了多单, 空单 vs 空单 # symbol_pos.get('账号多单', 0),
if vt_symbol.endswith(".CFFEX"): # symbol_pos.get('账号空单', 0),
diff_match = (symbol_pos.get('账号多单', 0) - symbol_pos.get('账号空单', 0)) == ( # symbol_pos.get('策略多单', 0),
symbol_pos.get('策略多单', 0) - symbol_pos.get('策略空单', 0)) # symbol_pos.get('策略空单', 0)
pos_match = symbol_pos.get('账号空单', 0) == symbol_pos.get('策略空单', 0) and \ # ))
symbol_pos.get('账号多单', 0) == symbol_pos.get('策略多单', 0) # diff_pos_dict.update({vt_symbol: {"long": symbol_pos.get('账号多单', 0) - symbol_pos.get('策略多单', 0),
match = diff_match # "short": symbol_pos.get('账号空单', 0) - symbol_pos.get('策略空单',
# 轧差一致,帐号/策略持仓不一致 # 0)}})
if diff_match and not pos_match: # else:
if symbol_pos.get('账号多单', 0) > symbol_pos.get('策略多单', 0):
self.write_log('{}轧差持仓:多:{},空:{} 大于 策略持仓 多:{},空:{}'.format(
vt_symbol,
symbol_pos.get('账号多单', 0),
symbol_pos.get('账号空单', 0),
symbol_pos.get('策略多单', 0),
symbol_pos.get('策略空单', 0)
))
diff_pos_dict.update({vt_symbol: {"long": symbol_pos.get('账号多单', 0) - symbol_pos.get('策略多单', 0),
"short": symbol_pos.get('账号空单', 0) - symbol_pos.get('策略空单',
0)}})
else:
match = round(symbol_pos.get('账号空单', 0), 7) == round(symbol_pos.get('策略空单', 0), 7) and \ match = round(symbol_pos.get('账号空单', 0), 7) == round(symbol_pos.get('策略空单', 0), 7) and \
round(symbol_pos.get('账号多单', 0), 7) == round(symbol_pos.get('策略多单', 0), 7) round(symbol_pos.get('账号多单', 0), 7) == round(symbol_pos.get('策略多单', 0), 7)
# 多空都一致 # 多空都一致
if match: if match:
msg = u'{}多空都一致.{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False)) msg = u'{}多空都一致.{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False))
self.write_log(msg) self.write_log(msg)
compare_info += msg compare_info += msg
else: else:
# 多空不一致
pos_compare_result += '\n{}: '.format(vt_symbol) pos_compare_result += '\n{}: '.format(vt_symbol)
# 判断是多单不一致? # 判断是多单不一致?
diff_long_volume = round(symbol_pos.get('账号多单', 0), 7) - round(symbol_pos.get('策略多单', 0), 7) diff_long_volume = round(symbol_pos.get('账号多单', 0), 7) - round(symbol_pos.get('策略多单', 0), 7)
@ -1946,8 +1995,16 @@ class CtaEngine(BaseEngine):
symbol_pos.get('策略多单')) symbol_pos.get('策略多单'))
pos_compare_result += msg pos_compare_result += msg
self.write_error(u'{}不一致:{}'.format(vt_symbol, msg)) self.write_log(u'{}不一致:{}'.format(vt_symbol, msg))
compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg) compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg)
# 登记短合约得差别
underlying_symbol = get_underlying_symbol(vt_symbol.split('.')[0])
diff_pos = diff_pos_dict.get(underlying_symbol, {})
diff_pos.update({'多单': diff_long_volume + diff_pos.get('多单', 0)})
diff_pos_dict[underlying_symbol] = diff_pos
# 自动平衡
if auto_balance: if auto_balance:
self.balance_pos(vt_symbol, Direction.LONG, diff_long_volume) self.balance_pos(vt_symbol, Direction.LONG, diff_long_volume)
@ -1961,13 +2018,24 @@ class CtaEngine(BaseEngine):
symbol_pos.get('空单策略'), symbol_pos.get('空单策略'),
symbol_pos.get('策略空单')) symbol_pos.get('策略空单'))
pos_compare_result += msg pos_compare_result += msg
self.write_error(u'{}不一致:{}'.format(vt_symbol, msg)) self.write_log(u'{}不一致:{}'.format(vt_symbol, msg))
compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg) compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg)
# 登记短合约得差别
underlying_symbol = get_underlying_symbol(vt_symbol.split('.')[0])
diff_pos = diff_pos_dict.get(underlying_symbol, {})
diff_pos.update({'空单': diff_short_volume + diff_pos.get('空单', 0)})
diff_pos_dict[underlying_symbol] = diff_pos
# 自动平衡仓位
if auto_balance: if auto_balance:
self.balance_pos(vt_symbol, Direction.SHORT, diff_short_volume) self.balance_pos(vt_symbol, Direction.SHORT, diff_short_volume)
# 统计所有轧差偏差得
diff_underlying = sum([d.get('多单', 0) - d.get('空单', 0) for d in list(diff_pos_dict.values())])
# 不匹配输入到stdErr通道 # 不匹配输入到stdErr通道
if pos_compare_result != '': if pos_compare_result != '' and diff_underlying != 0:
msg = u'账户{}持仓不匹配: {}' \ msg = u'账户{}持仓不匹配: {}' \
.format(self.engine_config.get('accountid', '-'), .format(self.engine_config.get('accountid', '-'),
pos_compare_result) pos_compare_result)
@ -1982,9 +2050,6 @@ class CtaEngine(BaseEngine):
return True, compare_info + ret_msg return True, compare_info + ret_msg
else: else:
self.write_log(u'账户持仓与策略一致') self.write_log(u'账户持仓与策略一致')
if len(diff_pos_dict) > 0:
for k, v in diff_pos_dict.items():
self.write_log(f'{k} 存在大于策略的轧差持仓:{v}')
return True, compare_info return True, compare_info
def balance_pos(self, vt_symbol, direction, volume): def balance_pos(self, vt_symbol, direction, volume):

View File

@ -13,7 +13,7 @@ from logging import INFO, ERROR
from datetime import datetime from datetime import datetime
from vnpy.trader.constant import Interval, Direction, Offset, Status, OrderType, Color, Exchange from vnpy.trader.constant import Interval, Direction, Offset, Status, OrderType, Color, Exchange
from vnpy.trader.object import BarData, TickData, OrderData, TradeData from vnpy.trader.object import BarData, TickData, OrderData, TradeData
from vnpy.trader.utility import virtual, append_data, extract_vt_symbol, get_underlying_symbol from vnpy.trader.utility import virtual, append_data, extract_vt_symbol, get_underlying_symbol,print_dict
from .base import StopOrder, EngineType from .base import StopOrder, EngineType
from vnpy.component.cta_grid_trade import CtaGrid, CtaGridTrade, LOCK_GRID from vnpy.component.cta_grid_trade import CtaGrid, CtaGridTrade, LOCK_GRID
@ -75,7 +75,7 @@ class CtaTemplate(ABC):
""" """
class_parameters = {} class_parameters = {}
for name in cls.parameters: for name in cls.parameters:
class_parameters[name] = getattr(cls, name) class_parameters[name] = getattr(cls, name,"")
return class_parameters return class_parameters
def get_parameters(self): def get_parameters(self):
@ -84,7 +84,7 @@ class CtaTemplate(ABC):
""" """
strategy_parameters = {} strategy_parameters = {}
for name in self.parameters: for name in self.parameters:
strategy_parameters[name] = getattr(self, name) strategy_parameters[name] = getattr(self, name, "")
return strategy_parameters return strategy_parameters
def get_variables(self): def get_variables(self):
@ -187,6 +187,31 @@ class CtaTemplate(ABC):
""" """
pass pass
def exist_order(self, vt_symbol, direction, offset):
"""
是否存在相同得委托
:param vt_symbol:
:param direction:
:param offset:
:return:
"""
if len(self.active_orders) == 0:
self.write_log(f'当前活动订单数量为零。查询条件:{vt_symbol},方向:{direction.value}, 开平:{offset.value}')
return False
for orderid, order in self.active_orders.items():
# self.write_log(f'当前活动订单:\n{print_dict(order)}')
if offset != Offset.OPEN: # 平昨、平今、平仓
offset_cond = order['offset'] != Offset.OPEN
else: # 开仓
offset_cond = order['offset'] == offset
if order['vt_symbol'] == vt_symbol and order['direction'] == direction and offset_cond:
self.write_log(f'存在相同活动订单。查询条件:{vt_symbol},方向:{direction.value}, 开平:{offset.value}')
return True
return False
def buy(self, price: float, volume: float, stop: bool = False, lock: bool = False, def buy(self, price: float, volume: float, stop: bool = False, lock: bool = False,
vt_symbol: str = '', order_type: OrderType = OrderType.LIMIT, vt_symbol: str = '', order_type: OrderType = OrderType.LIMIT,
order_time: datetime = None, grid: CtaGrid = None): order_time: datetime = None, grid: CtaGrid = None):

File diff suppressed because it is too large Load Diff

View File

@ -100,7 +100,8 @@ class IndexTickPublisherV2(BaseEngine):
exchange=conf.get('exchange', 'x_fanout_idx_tick')) exchange=conf.get('exchange', 'x_fanout_idx_tick'))
self.write_log(f'创建发布器成功') self.write_log(f'创建发布器成功')
except Exception as ex: except Exception as ex:
self.write_log(u'创建tick发布器异常:{}'.format(str(ex))) self.write_error(u'创建tick发布器异常:{}'.format(str(ex)))
self.write_error(traceback.format_exc())
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def registerEvent(self): def registerEvent(self):

View File

@ -259,6 +259,7 @@ class BinanceFutureData(RestClient):
contracts = load_json(f, auto_save=False) contracts = load_json(f, auto_save=False)
return contracts return contracts
def save_contracts(self): def save_contracts(self):
"""保存合约配置""" """保存合约配置"""
contracts = self.get_contracts() contracts = self.get_contracts()

View File

@ -181,8 +181,10 @@ class MongoData(object):
return [] return []
def db_query_by_sort(self, db_name, col_name, filter_dict, sort_name, sort_type, limitNum=0): def db_query_by_sort(self, db_name, col_name, filter_dict, sort_name, sort_type, limitNum=0):
"""从MongoDB中读取数据d是查询要求sortName是排序的字段,sortType是排序类型 """
返回的是数据库查询的指针""" 从MongoDB中读取数据d是查询要求sortName是排序的字段,sortType是排序类型,1正序-1倒序
返回的是数据库查询的指针
"""
try: try:
if self.db_client: if self.db_client:
db = self.db_client[db_name] db = self.db_client[db_name]

View File

@ -263,12 +263,25 @@ class TdxStockData(object):
return results return results
def get_name(self, code, market_id): def get_name(self, symbol, market_id=None):
symbol_info = self.symbol_dict.get(f'{code}_{market_id}') """
if symbol_info: 获取名称
return symbol_info.get('name', code) :param symbol: 代码 或者 代码.交易所
:param market_id: 如果存在代码.交易所时不使用该值
:return:
"""
if '.' in symbol:
symbol, exchange = symbol.split('.')
if exchange == Exchange.SSE.value:
market_id = 1
elif exchange == Exchange.SZSE.value:
market_id = 0
return code symbol_info = self.symbol_dict.get(f'{symbol}_{market_id}')
if symbol_info:
return symbol_info.get('name', symbol)
return symbol
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def get_bars(self, def get_bars(self,

View File

@ -963,6 +963,47 @@ class CtpTdApi(TdApi):
"""""" """"""
self.gateway.write_error("交易撤单失败", error) self.gateway.write_error("交易撤单失败", error)
def onRspParkedOrderInsert(self, data: dict, error: dict, reqid: int, last: bool):
""""""
self.gateway.write_log('预埋单回报')
order_ref = data["OrderRef"]
orderid = f"{self.frontid}_{self.sessionid}_{order_ref}"
symbol = data["InstrumentID"]
exchange = symbol_exchange_map[symbol]
order_type = OrderType.LIMIT
if data["OrderPriceType"] == THOST_FTDC_OPT_LimitPrice and data["TimeCondition"] == THOST_FTDC_TC_IOC:
if data["VolumeCondition"] == THOST_FTDC_VC_AV:
order_type = OrderType.FAK
elif data["VolumeCondition"] == THOST_FTDC_VC_CV:
order_type = OrderType.FOK
if data["OrderPriceType"] == THOST_FTDC_OPT_AnyPrice:
order_type = OrderType.MARKET
order = OrderData(
symbol=symbol,
exchange=exchange,
accountid=self.accountid,
orderid=orderid,
type=order_type,
direction=DIRECTION_CTP2VT[data["Direction"]],
offset=OFFSET_CTP2VT.get(data["CombOffsetFlag"], Offset.NONE),
price=data["LimitPrice"],
volume=data["VolumeTotalOriginal"],
status=Status.REJECTED,
gateway_name=self.gateway_name
)
self.gateway.on_order(order)
# self.gateway.write_error("交易委托失败", error)
def onRspParkedOrderAction(self, data: dict, error: dict, reqid: int, last: bool):
""""""
self.gateway.write_error("预埋单交易撤单失败", error)
def onRspQueryMaxOrderVolume(self, data: dict, error: dict, reqid: int, last: bool): def onRspQueryMaxOrderVolume(self, data: dict, error: dict, reqid: int, last: bool):
"""""" """"""
pass pass
@ -1424,6 +1465,33 @@ class CtpTdApi(TdApi):
ctp_req["VolumeCondition"] = THOST_FTDC_VC_CV ctp_req["VolumeCondition"] = THOST_FTDC_VC_CV
self.reqid += 1 self.reqid += 1
dt = datetime.now()
use_packed = False
if 3 < dt.hour < 8 or 15 < dt.hour < 20:
use_packed = True
else:
if dt.hour in [8,20] and dt.minute <55:
use_packed = True
if req.exchange != Exchange.CFFEX:
if dt.hour == 10 and 15< dt.minute<30:
use_packed = True
if dt.hour == 11 and dt.minute > 30:
use_packed = True
if dt.hour == 12:
use_packed = True
if dt.hour == 13 and dt.minute < 30:
use_packed = True
else:
if dt.hour == 9 and dt.minute < 25:
use_packed = True
if dt.hour == 11 and dt.minute > 30:
use_packed = True
if dt.hour == 12:
use_packed = True
if use_packed:
self.gateway.write_log(f'使用预埋单下单')
self.reqParkedOrderInsert(ctp_req, self.reqid)
else:
self.reqOrderInsert(ctp_req, self.reqid) self.reqOrderInsert(ctp_req, self.reqid)
orderid = f"{self.frontid}_{self.sessionid}_{self.order_ref}" orderid = f"{self.frontid}_{self.sessionid}_{self.order_ref}"

View File

@ -59,6 +59,9 @@ class OffsetConverter:
def get_position_holding(self, vt_symbol: str, gateway_name: str = '') -> "PositionHolding": def get_position_holding(self, vt_symbol: str, gateway_name: str = '') -> "PositionHolding":
"""获取持仓信息""" """获取持仓信息"""
if gateway_name is None or len(gateway_name) == 0: if gateway_name is None or len(gateway_name) == 0:
if len(self.main_engine.gateways.keys()) == 1:
gateway_name = list(self.main_engine.gateways.keys())[0]
else:
contract = self.main_engine.get_contract(vt_symbol) contract = self.main_engine.get_contract(vt_symbol)
if contract: if contract:
gateway_name = contract.gateway_name gateway_name = contract.gateway_name

View File

@ -153,7 +153,7 @@ class MainEngine:
if not gateway: if not gateway:
# 增加兼容得写法如果没有输入gateway_name但当前只有一个gateway时就使用当前gateway # 增加兼容得写法如果没有输入gateway_name但当前只有一个gateway时就使用当前gateway
if len(self.gateways.keys()) == 1: if len(self.gateways.keys()) == 1:
return self.gateways.values()[0] return list(self.gateways.values())[0]
self.write_error(f"{self.gateways.keys()}中找不到底层接口:{gateway_name}") self.write_error(f"{self.gateways.keys()}中找不到底层接口:{gateway_name}")
return gateway return gateway

View File

@ -447,6 +447,16 @@ class TickCombiner(object):
u'leg2:{0}跌停{1}不合成价差Tick'.format(self.last_leg2_tick.vt_symbol, self.last_leg2_tick.ask_price_1)) u'leg2:{0}跌停{1}不合成价差Tick'.format(self.last_leg2_tick.vt_symbol, self.last_leg2_tick.ask_price_1))
return return
# 忽略买卖价格差距过大的tick
if self.last_leg1_tick.ask_price_1 > 5 * self.last_leg1_tick.bid_price_1 > 10:
self.gateway.write_log(u'leg1:{0}买卖价格差距过大{1} {2}不合成价差Tick'.format(
self.last_leg1_tick.vt_symbol, self.last_leg1_tick.ask_price_1, self.last_leg1_tick.bid_price_1))
return
if self.last_leg2_tick.ask_price_1 > 5 * self.last_leg2_tick.bid_price_1 > 10:
self.gateway.write_log(u'leg2:{0}买卖价格差距过大{1} {2}不合成价差Tick'.format(
self.last_leg2_tick.vt_symbol, self.last_leg2_tick.ask_price_1, self.last_leg2_tick.bid_price_1))
return
if self.trading_day != tick.trading_day: if self.trading_day != tick.trading_day:
self.trading_day = tick.trading_day self.trading_day = tick.trading_day
self.spread_high = None self.spread_high = None

View File

@ -1374,6 +1374,9 @@ class KLineWidget(KeyWraper):
# 标记时间 # 标记时间
t_value = df_markup['datetime'].loc[idx] t_value = df_markup['datetime'].loc[idx]
if not isinstance(t_value, datetime) and isinstance(t_value, str): if not isinstance(t_value, datetime) and isinstance(t_value, str):
if '.' in t_value:
t_value = datetime.strptime(t_value, '%Y-%m-%d %H:%M:%S.%f')
else:
t_value = datetime.strptime(t_value, '%Y-%m-%d %H:%M:%S') t_value = datetime.strptime(t_value, '%Y-%m-%d %H:%M:%S')
price = df_markup['price'].loc[idx] price = df_markup['price'].loc[idx]

View File

@ -20,7 +20,7 @@ from ..event import (
EVENT_ACCOUNT, EVENT_ACCOUNT,
EVENT_LOG EVENT_LOG
) )
from ..object import OrderRequest, SubscribeRequest from ..object import OrderRequest, SubscribeRequest,LogData
from ..utility import load_json, save_json from ..utility import load_json, save_json
from ..setting import SETTING_FILENAME, SETTINGS from ..setting import SETTING_FILENAME, SETTINGS
@ -646,6 +646,30 @@ class TradingWidget(QtWidgets.QWidget):
cancel_button = QtWidgets.QPushButton("全撤") cancel_button = QtWidgets.QPushButton("全撤")
cancel_button.clicked.connect(self.cancel_all) cancel_button.clicked.connect(self.cancel_all)
algo_stop_button = QtWidgets.QPushButton("全停算法")
algo_stop_button.clicked.connect(self.stop_algo)
hbox_nomal = QtWidgets.QHBoxLayout()
hbox_nomal.addWidget(send_button)
hbox_nomal.addWidget(cancel_button)
hbox_nomal.addWidget(algo_stop_button)
algo_button = QtWidgets.QPushButton("算法单")
algo_button.clicked.connect(self.send_algo)
self.win_pips = QtWidgets.QLineEdit()
self.win_pips.setText('10')
self.stop_pips = QtWidgets.QLineEdit()
self.stop_pips.setText('5')
hbox_algo = QtWidgets.QHBoxLayout()
win_lable = QtWidgets.QLabel("止盈跳")
hbox_algo.addWidget(win_lable)
hbox_algo.addWidget(self.win_pips)
stop_lable = QtWidgets.QLabel("止损跳")
hbox_algo.addWidget(stop_lable)
hbox_algo.addWidget(self.stop_pips)
hbox_algo.addWidget(algo_button)
self.checkFixed = QtWidgets.QCheckBox("价格") # 价格固定选择框 self.checkFixed = QtWidgets.QCheckBox("价格") # 价格固定选择框
form1 = QtWidgets.QFormLayout() form1 = QtWidgets.QFormLayout()
@ -658,8 +682,8 @@ class TradingWidget(QtWidgets.QWidget):
form1.addRow(self.checkFixed, self.price_line) form1.addRow(self.checkFixed, self.price_line)
form1.addRow("数量", self.volume_line) form1.addRow("数量", self.volume_line)
form1.addRow("接口", self.gateway_combo) form1.addRow("接口", self.gateway_combo)
form1.addRow(send_button) form1.addRow(hbox_nomal)
form1.addRow(cancel_button) form1.addRow(hbox_algo)
# Market depth display area # Market depth display area
bid_color = "rgb(255,174,201)" bid_color = "rgb(255,174,201)"
@ -852,6 +876,75 @@ class TradingWidget(QtWidgets.QWidget):
self.ap4_label.setText("") self.ap4_label.setText("")
self.ap5_label.setText("") self.ap5_label.setText("")
def stop_algo(self) -> None:
if not self.main_engine.algo_engine:
QtWidgets.QMessageBox.critical(self, "算法引擎未启动", "请先启动算法引擎")
return
try:
self.main_engine.algo_engine.stop_all()
except Exception as ex:
QtWidgets.QMessageBox.critical(self, f"算法引擎异常{str(ex)}", "请查看详细日志")
def send_algo(self) ->None:
"""启动算法"""
if not self.main_engine.algo_engine:
QtWidgets.QMessageBox.critical(self, "算法引擎未启动", "请先启动算法引擎")
return
template_name = 'AutoStopWinAlgo'
algo_template = self.main_engine.algo_engine.algo_templates.get(template_name,None)
if algo_template is None:
QtWidgets.QMessageBox.critical(self, f"算法[{template_name}]不存在", "请先部署算法")
return
symbol = str(self.symbol_line.text())
if not symbol:
QtWidgets.QMessageBox.critical(self, "委托失败", "请输入合约代码")
return
volume_text = str(self.volume_line.text())
if not volume_text:
QtWidgets.QMessageBox.critical(self, "委托失败", "请输入委托数量")
return
volume = float(volume_text)
price_text = str(self.price_line.text())
if not price_text:
price = 0
else:
price = float(price_text)
exchange = Exchange(str(self.exchange_combo.currentText()))
win_pips = str(self.win_pips.text())
if int(win_pips) <=0:
QtWidgets.QMessageBox.critical(self, "止盈点数须大于0", "请输入正确止盈点数")
return
stop_pips = str(self.stop_pips.text())
if int(stop_pips) <= 0:
QtWidgets.QMessageBox.critical(self, "止损点数须大于0", "请输入正确止损点数")
return
offset = Offset(str(self.offset_combo.currentText()))
if offset != Offset.OPEN:
QtWidgets.QMessageBox.critical(self, "算法只支持开仓", "请选择开仓方式")
return
setting = {
"vt_symbol": f"{symbol}.{exchange.value}",
"direction": Direction(str(self.direction_combo.currentText())),
"open_price": price,
"win_pips": int(win_pips),
"stop_pips": int(stop_pips),
"volume": volume,
"near_pips": 2, # 价格接近多少个跳动才开始挂单(开仓)
"offset": offset
}
algo = algo_template.new(self.main_engine.algo_engine, setting)
algo.start()
self.main_engine.algo_engine.algos[algo.algo_name] = algo
self.main_engine.write_log(msg=f'算法{algo.algo_name}启动')
def send_order(self) -> None: def send_order(self) -> None:
""" """
Send new order manually. Send new order manually.

View File

@ -699,6 +699,16 @@ def save_data_to_pkb2(data: Any, pkb2_file_name):
pickle.dump(data, f) pickle.dump(data, f)
def get_months_diff(act_symbol, pas_symbol):
"""获取合约得相隔月份"""
pas_month = int(pas_symbol[-2:])
act_month = int(act_symbol[-2:])
if pas_month < act_month:
pas_month += 12
return max(1, pas_month - act_month)
class BarGenerator: class BarGenerator:
""" """
For: For: