[新模块] 跟单APP

This commit is contained in:
msincenselee 2021-09-11 21:46:27 +08:00
parent 5b7d660304
commit b3c5a23563
4 changed files with 628 additions and 0 deletions

View File

@ -0,0 +1,443 @@
"""
vnpy 2.x版跟单应用
华富资产大佳
源帐号 => 跟单应用 => 目标帐号
配置步骤
1源帐号需要添加RpcServerApp并启动无界面或有界面都可以
2目标帐号交易程序添加TradeCopyApp配置源账号的Rep/Pub地址
跟单规则源帐号 仓位 * 倍率 => 目标帐号的目标仓位
"""
import os
import csv
from threading import Thread
from queue import Queue, Empty
from copy import copy
from collections import defaultdict, namedtuple
from datetime import datetime
import logging
from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.constant import Exchange
from vnpy.trader.object import (
SubscribeRequest,
OrderRequest,
Offset,
Direction,
OrderType,
TickData,
ContractData
)
from vnpy.rpc import RpcClient
from vnpy.trader.event import EVENT_TICK, EVENT_CONTRACT, EVENT_POSITION, EVENT_TIMER
from vnpy.trader.utility import load_json, save_json, extract_vt_symbol
from vnpy.app.spread_trading.base import EVENT_SPREAD_DATA, SpreadData
APP_NAME = "TradeCopy"
EVENT_TRADECOPY_LOG = "eTradeCopyLog"
EVENT_TRADECOPY = 'eTradeCopy'
class TradeCopyEngine(BaseEngine):
"""
跟单交易
source ==> trade_copy ==> target
"""
setting_filename = "trade_copy_setting.json"
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine, event_engine, APP_NAME)
# 是否激活跟单
self.active = False
# 持仓字典 合约+方向: vt_symbol, direction, src_volume, target_volume, cur_volume, count
self.pos_dict = {}
# 被跟单得rpc server得请求端口、广播端口
self.source_rpc_rep = 'tcp://localhost:2015'
self.source_rpc_pub = 'tcp://localhost:2018'
# 跟单比率(1 ~ n)
# target_pos_volume = round(source_pos_volume * copy_ratio)
# 简单得四舍五入
self.copy_ratio = 1
# 跟单检查间隔缺省5秒
self.copy_interval = 5
# 定时器
self.timer_count = 0
# RPC客户端用于连接源帐号
self.client = RpcClient()
# 回调函数
self.client.callback = self.client_callback
# 接受本地position event更新
self.accept_local = False
# 加载配置
self.load_setting()
# 注册事件
self.register_event()
def load_setting(self):
"""
加载配置
:return:
"""
try:
setting = load_json(self.setting_filename)
self.source_rpc_rep = setting.get("source_rpc_rep", "")
self.source_rpc_pub = setting.get("source_rpc_pub", "")
self.copy_ratio = setting.get('copy_ratio', 1)
self.copy_interval = setting.get('copy_interval', 5)
except Exception as ex:
self.write_log(f'{APP_NAME}加载配置文件{self.setting_filename}异常{str(ex)}')
def save_setting(self):
"""
保存设置
:return:
"""
setting = {
"source_rpc_rep": self.source_rpc_rep,
"source_rpc_pub": self.source_rpc_pub,
"copy_ratio": self.copy_ratio,
'copy_interval': self.copy_interval
}
save_json(self.setting_filename, setting)
self.write_log(f'保存设置完毕:{setting}')
def register_event(self):
"""
注册事件处理
:return:
"""
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def client_callback(self, topic: str, event: Event):
"""
rpc客户端源帐号得event回报
:param topic:
:param event:
:return:
"""
if event is None:
print("none event", topic, event)
return
# 只处理持仓事件
if event.type == EVENT_POSITION:
src_pos = event.data
# 不处理套利合约
if ' ' in src_pos.symbol or '&' in src_pos.symbol:
return
# key = 合约+方向
k = f'{src_pos.vt_symbol}.{src_pos.direction.value}'
pos = self.pos_dict.get(k, {})
# 更新持仓中属于source得部分, 计算出目标持仓
target_volume = int(round(src_pos.volume * self.copy_ratio))
pos.update({'vt_symbol': src_pos.vt_symbol,
'direction': src_pos.direction,
'name': src_pos.name,
'symbol': src_pos.symbol,
'src_volume': src_pos.volume,
'target_volume': target_volume,
'count': pos.get('count', 0) + 1,
'volume': pos.get('volume', 0),
'yd_volume': pos.get('yd_volume', 0),
'cur_price': pos.get('cur_price', 0),
'price': pos.get('price', 0),
'diff': pos.get('diff', target_volume - pos.get('volume', 0))
})
self.pos_dict.update({k: pos})
def process_position_event(self, event: Event):
"""
处理本地帐号得持仓更新事件
:param event:
:return:
"""
# 必须等到rpc的仓位都到齐了才接收本地仓位
if not self.accept_local:
return
cur_pos = event.data
# 不处理套利合约
if ' ' in cur_pos.symbol or '&' in cur_pos.symbol:
return
# key = 合约+方向
k = f'{cur_pos.vt_symbol}.{cur_pos.direction.value}'
pos = self.pos_dict.get(k, {})
# 更新持仓中属于source得部分
pos.update({
'cur_positionid': cur_pos.vt_positionid,
'volume': cur_pos.volume,
'yd_volume': cur_pos.yd_volume,
'price': cur_pos.price,
'cur_price': cur_pos.cur_price,
'diff': pos.get('target_volume', 0) - cur_pos.volume,
'count': pos.get('count', 0) + 1
})
if 'name' not in pos:
pos.update({'name': cur_pos.name})
if 'vt_symbol' not in pos:
pos.update({'vt_symbol': cur_pos.vt_symbol})
if 'direction' not in pos:
pos.update({'direction': cur_pos.direction})
if 'symbol' not in pos:
pos.update({'symbol': cur_pos.symbol})
if 'src_volume' not in pos:
pos.update({'src_volume': 0})
if 'target_volume' not in pos:
pos.update({'target_volume': 0})
self.pos_dict.update({k: pos})
def put_event(self):
"""
更细监控表
:return:
"""
for key in self.pos_dict.keys():
pos = self.pos_dict.get(key, {})
# 补充key
pos.update({'vt_positionid': key})
# dict => object
data = namedtuple("TradeCopy", pos.keys())(*pos.values())
# 推送事件
event = Event(
EVENT_TRADECOPY,
data
)
self.event_engine.put(event)
def process_timer_event(self, event: Event):
"""定时执行"""
self.timer_count += 1
if self.timer_count % 2 == 0:
self.put_event()
if self.timer_count < self.copy_interval:
return
self.timer_count = 0
# 未激活,不执行
if not self.active:
return
# 执行跟单仓位比较
for key in self.pos_dict.keys():
pos = self.pos_dict.get(key)
# 等到两次rpc pos后才激活本地持仓更新
if not self.accept_local and pos.get('count', 0) > 2:
self.accept_local = True
self.write_log(f'激活本地持仓更新')
# 需要20次更新pos才算有效
if pos.get('count', 0) <= 20:
continue
target_volume = pos.get('target_volume', 0)
cur_volume = pos.get('volume', 0)
direction = pos.get('direction')
# 目标仓位 > 当前仓位, 需要开仓
if target_volume > cur_volume >= 0:
volume = target_volume - cur_volume
self.open_pos(vt_symbol=pos.get('vt_symbol'),
direction=direction,
volume=volume)
continue
# 目标仓位 < 当前仓位, 需要减仓
if 0 <= target_volume < cur_volume:
# 减仓数量
volume = cur_volume - target_volume
# 平仓相反方向
if direction == Direction.LONG:
direction = Direction.SHORT
else:
direction = Direction.LONG
self.close_pos(vt_symbol=pos.get('vt_symbol'),
direction=direction,
volume=volume,
vt_positionid=pos.get('cur_positionid'))
def open_pos(self, vt_symbol, direction, volume):
"""
买入或做空
:param vt_symbol:
:param direction:
:param volume:
:return:
"""
cur_tick = self.main_engine.get_tick(vt_symbol)
contract = self.main_engine.get_contract(vt_symbol)
symbol, exchange = extract_vt_symbol(vt_symbol)
if contract is None:
self.write_log(f'异常,{vt_symbol}的合约信息不存在')
return
if cur_tick is None:
req = SubscribeRequest(
symbol=symbol,
exchange=exchange
)
self.main_engine.subscribe(req, contract.gateway_name)
self.write_log(f'订阅合约{vt_symbol}')
return
dt_now = datetime.now()
# 最新tick的时间与当前的时间超过间隔不处理例如休盘时间
if (dt_now - cur_tick.datetime).total_seconds() > self.copy_interval:
self.write_log(f'{vt_symbol} 最后tick时间{cur_tick.datetime}不满足开仓要求,当前时间:{dt_now}')
return
open_price = cur_tick.ask_price_1 if direction == Direction.LONG else cur_tick.bid_price_1
order = OrderRequest(
symbol=symbol,
exchange=exchange,
direction=direction,
offset=Offset.OPEN,
volume=volume,
price=open_price,
type=OrderType.FAK
)
self.write_log(f'发出委托开仓,{vt_symbol}, {direction.value},{volume},{open_price} ')
self.main_engine.send_order(order, contract.gateway_name)
def close_pos(self, vt_symbol, direction, volume, vt_positionid):
"""
sell or cover
:param vt_symbol:
:param direction:
:param volume:
:return:
"""
cur_tick = self.main_engine.get_tick(vt_symbol)
contract = self.main_engine.get_contract(vt_symbol)
cur_pos = self.main_engine.get_position(vt_positionid)
if contract is None:
self.write_log(f'异常,{vt_symbol}的合约信息不存在')
return
symbol, exchange = extract_vt_symbol(vt_symbol)
if cur_tick is None:
req = SubscribeRequest(
symbol=symbol,
exchange=exchange
)
self.main_engine.subscribe(req, contract.gateway_name)
self.write_log(f'订阅合约{vt_symbol}')
return
if cur_pos is None:
self.write_log(f'异常,{vt_positionid}的持仓信息不存在')
return
dt_now = datetime.now()
# 最新tick的时间与当前的时间超过间隔不处理例如休盘时间
if (dt_now - cur_tick.datetime).total_seconds() > self.copy_interval:
self.write_log(f'{vt_symbol} 最后tick时间{cur_tick.datetime}不满足开仓要求,当前时间:{dt_now}')
return
close_price = cur_tick.ask_price_1 if direction == Direction.LONG else cur_tick.bid_price_1
offset = Offset.CLOSE
if exchange in [Exchange.SHFE, Exchange.CFFEX and Exchange.INE]:
# 优先平昨仓
if cur_pos.yd_volume > 0:
# 平昨
offset = Offset.CLOSEYESTERDAY
# 如果平昨数量不够,平掉所有昨仓,剩余仓位,下次再平
if cur_pos.yd_volume < volume:
self.write_log(f'{vt_symbol} 平仓数量:{volume} => {cur_pos.yd_volume}')
volume = cur_pos.yd_volume
else:
offset = Offset.CLOSETODAY
order = OrderRequest(
symbol=symbol,
exchange=exchange,
direction=direction,
offset=offset,
volume=volume,
price=close_price,
type=OrderType.FAK
)
self.write_log(f'发出委托开仓,{vt_symbol}, {direction.value},{volume},{close_price} ')
self.main_engine.send_order(order, contract.gateway_name)
def start_copy(self, source_req_addr, source_pub_addr, copy_ratio, copy_interval):
"""
开始执行跟单
:return:
"""
# 订阅事件
self.client.subscribe_topic("")
if source_req_addr != self.source_rpc_rep:
self.source_rpc_rep = source_req_addr
if source_pub_addr != self.source_rpc_pub:
self.source_rpc_pub = source_pub_addr
if copy_ratio != self.copy_ratio:
self.copy_ratio = copy_ratio
if copy_interval != self.copy_interval and self.copy_interval >= 1:
self.copy_interval = copy_interval
self.write_log(f'保存设置')
self.save_setting()
# 连接rpc客户端
self.write_log(f'开始连接rpc客户端')
self.client.start(self.source_rpc_rep, self.source_rpc_pub)
# 激活
self.write_log(f'激活跟单')
self.active = True
def stop_copy(self):
"""
停止跟单
:return:
"""
self.active = False
self.write_log(f'停止跟单')
def write_log(self, msg: str, source: str = "", level: int = logging.DEBUG):
"""
更新日志
:param msg:
:param source:
:param level:
:return:
"""
self.event_engine.put(Event(EVENT_TRADECOPY_LOG, msg))
super().write_log(msg, source, level)

View File

@ -0,0 +1 @@
from .widget import TcManager

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

View File

@ -0,0 +1,184 @@
from datetime import datetime
from vnpy.event import Event, EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import QtCore, QtWidgets, QtGui
from vnpy.trader.ui.widget import BaseCell, EnumCell, PnlCell, BaseMonitor, DirectionCell
from ..engine import (
APP_NAME,
EVENT_TRADECOPY_LOG,
EVENT_TRADECOPY
)
class PositionCopyMonitor(BaseMonitor):
"""
Monitor for position copy data.
"""
event_type = EVENT_TRADECOPY
data_key = "vt_positionid"
sorting = True
headers = {
"name": {"display": "名称", "cell": BaseCell, "update": False},
"symbol": {"display": "代码", "cell": BaseCell, "update": False},
"direction": {"display": "方向", "cell": DirectionCell, "update": False},
"src_volume": {"display": "源数量", "cell": BaseCell, "update": True},
"target_volume": {"display": "目标", "cell": BaseCell, "update": True},
"volume": {"display": "数量", "cell": BaseCell, "update": True},
"yd_volume": {"display": "昨仓", "cell": BaseCell, "update": True},
"diff": {"display": "偏差", "cell": PnlCell, "update": True},
"cur_price": {"display": "当前价", "cell": BaseCell, "update": True},
"price": {"display": "均价", "cell": BaseCell, "update": True},
}
def process_event(self, event: Event):
super().process_event(event)
class TcManager(QtWidgets.QWidget):
"""跟单应用界面"""
# qt日志事件
signal_log = QtCore.pyqtSignal(Event)
default_source_rpc_rep = 'tcp://localhost:2015'
default_source_rpc_pub = 'tcp://localhost:2018'
default_copy_ratio = 1
default_copy_interval = 5
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
super().__init__()
self.main_engine = main_engine
self.event_engine = event_engine
self.tc_engine = self.main_engine.get_engine(APP_NAME)
self.init_ui()
self.register_event()
def init_ui(self):
""""""
self.setWindowTitle("跟单应用")
self.resize(1098, 800)
# 创建组件
# 源帐号得rpc rep地址
self.line_rep_addr = QtWidgets.QLineEdit(self.tc_engine.source_rpc_rep)
# 源帐号得rpc pub地址
self.line_pub_addr = QtWidgets.QLineEdit(self.tc_engine.source_rpc_pub)
# 跟单比率
validator = QtGui.QDoubleValidator()
validator.setBottom(0)
self.line_copy_ratio = QtWidgets.QLineEdit()
self.line_copy_ratio.setValidator(validator)
self.line_copy_ratio.setText(str(self.tc_engine.copy_ratio))
# 更新频率
validator2 = QtGui.QIntValidator()
validator2.setBottom(1)
self.line_interval = QtWidgets.QLineEdit()
self.line_interval.setValidator(validator2)
self.line_interval.setText(str(self.tc_engine.copy_interval))
# 跟单按钮动作
self.btn_start_copy = QtWidgets.QPushButton(u'连接&跟单')
self.btn_start_copy.clicked.connect(self.start_copy)
# 停止动作
self.btn_stop_engine = QtWidgets.QPushButton(u'停止')
self.btn_stop_engine.clicked.connect(self.stop_copy)
self.btn_stop_engine.setEnabled(False)
# 重置设置
self.btn_reset_addr = QtWidgets.QPushButton(u'重置配置')
self.btn_reset_addr.clicked.connect(self.reset_setting)
# 仓位差异组件
self.pos_monitor = PositionCopyMonitor(self.main_engine, self.event_engine)
# 日志组件
self.log_monitor = QtWidgets.QTextEdit()
self.log_monitor.setReadOnly(True)
self.widgetList = [
self.line_copy_ratio,
self.line_interval,
self.line_rep_addr,
self.line_pub_addr,
self.btn_stop_engine,
self.btn_start_copy,
self.btn_reset_addr
]
# 布局
QLabel = QtWidgets.QLabel
grid = QtWidgets.QGridLayout()
grid.addWidget(QLabel(u'响应地址'), 0, 0)
grid.addWidget(self.line_rep_addr, 0, 1)
grid.addWidget(QLabel(u'发布地址'), 1, 0)
grid.addWidget(self.line_pub_addr, 1, 1)
grid.addWidget(QLabel(u'发布间隔(秒)'), 0, 2)
grid.addWidget(self.line_interval, 0, 3)
grid.addWidget(QLabel(u'复制比例(倍)'), 1, 2)
grid.addWidget(self.line_copy_ratio, 1, 3)
grid.addWidget(self.btn_start_copy, 2, 2, 1, 2)
grid.addWidget(self.btn_stop_engine, 2, 0, 1, 2)
grid.addWidget(self.btn_reset_addr, 3, 2, 1, 2)
grid.addWidget(self.pos_monitor, 4, 0, 1, 4)
grid.addWidget(self.log_monitor, 5, 0, 1, 4)
self.setLayout(grid)
def register_event(self):
"""注册事件绑定"""
# qt信号 => 日志更新函数()
self.signal_log.connect(self.process_log_event)
self.event_engine.register(EVENT_TRADECOPY_LOG, self.signal_log.emit)
def process_log_event(self, event: Event):
"""处理日志"""
timestamp = datetime.now().strftime("%H:%M:%S")
msg = f"{timestamp}\t{event.data}"
self.log_monitor.append(msg)
def reset_setting(self):
"""
重置配置
:return:
"""
self.line_rep_addr.setText(self.default_source_rpc_pub)
self.line_pub_addr.setText(self.default_source_rpc_pub)
self.line_copy_ratio.setText(self.default_copy_ratio)
self.line_interval.setText(self.default_copy_interval)
def start_copy(self):
"""
连接源帐号RPC
:return:
"""
source_req_addr = str(self.line_rep_addr.text())
source_pub_addr = str(self.line_pub_addr.text())
copy_ratio = float(self.line_copy_ratio.text())
copy_interval = float(self.line_interval.text())
self.tc_engine.start_copy(source_req_addr, source_pub_addr, copy_ratio, copy_interval)
for widget in self.widgetList:
widget.setEnabled(False)
self.btn_stop_engine.setEnabled(True)
def stop_copy(self):
if self.tc_engine:
self.tc_engine.stop_copy()