diff --git a/vnpy/app/tick_recorder/__init__.py b/vnpy/app/tick_recorder/__init__.py new file mode 100644 index 00000000..65ebc750 --- /dev/null +++ b/vnpy/app/tick_recorder/__init__.py @@ -0,0 +1,18 @@ +from pathlib import Path + +from vnpy.trader.app import BaseApp +from vnpy.trader.constant import Direction +from vnpy.trader.object import TickData, BarData, TradeData, OrderData +from vnpy.trader.utility import BarGenerator, ArrayManager + +from .engine import TickRecorderEngine, APP_NAME + +class TickRecorderApp(BaseApp): + """""" + app_name = APP_NAME + app_module = __module__ + app_path = Path(__file__).parent + display_name = "Tick行情记录" + engine_class = TickRecorderEngine + widget_name = "RecorderManager" + icon_name = "recorder.ico" diff --git a/vnpy/app/tick_recorder/engine.py b/vnpy/app/tick_recorder/engine.py new file mode 100644 index 00000000..ed05d65f --- /dev/null +++ b/vnpy/app/tick_recorder/engine.py @@ -0,0 +1,270 @@ +""" +tick 文件记录 +华富资产 +""" +import os +import csv +from threading import Thread +from queue import Queue, Empty +from copy import copy +from collections import defaultdict +from datetime import datetime + +from vnpy.event import Event, EventEngine +from vnpy.trader.engine import BaseEngine, MainEngine +from vnpy.trader.constant import Exchange +from vnpy.trader.object import ( + SubscribeRequest, + TickData, + ContractData +) +from vnpy.trader.event import EVENT_TICK, EVENT_CONTRACT +from vnpy.trader.utility import load_json, save_json +from vnpy.app.spread_trading.base import EVENT_SPREAD_DATA, SpreadData + + +APP_NAME = "DataRecorder" + +EVENT_RECORDER_LOG = "eRecorderLog" +EVENT_RECORDER_UPDATE = "eRecorderUpdate" + + +class TickFileRecorder(object): + """ Tick 文件保存""" + def __init__(self, tick_folder: str): + + self.tick_dict = defaultdict(list) # symbol_hour_min: [] + + self.tick_folder = tick_folder + + self.last_minute = 0 + + def save_tick_data(self, tick_list: list = []): + """接收外部的保存tick请求""" + min = None + for tick in tick_list: + min = tick.datetime.minute + key = f'{tick.vt_symbol}_{tick.datetime.hour}-{tick.datetime.minute}' + save_list = self.tick_dict[key] + save_list.append(tick) + + if min is not None and min != self.last_minute: + self.last_minute = min + self.save_expire_datas() + + def save_expire_datas(self): + """保存超时得数据""" + dt = datetime.now() + for key in [key for key in self.tick_dict.keys() if not key.endswith(f'{dt.hour}-{dt.minute}')]: + vt_symbol = key.split('_')[0] + tick_list = self.tick_dict.pop(key) + + self.append_ticks_2_file(symbol=vt_symbol, tick_list=tick_list) + + def append_ticks_2_file(self, symbol: str, tick_list: list): + """创建/追加tick list 到csv文件""" + if len(tick_list) == 0: + return + + trading_day = tick_list[0].trading_day + + file_folder = os.path.abspath(os.path.join(self.tick_folder, trading_day.replace('-', '/'))) + if not os.path.exists(file_folder): + os.makedirs(file_folder) + + file_name = os.path.abspath(os.path.join(file_folder, f'{symbol}_{trading_day}.csv')) + + dict_fieldnames = sorted(list(tick_list[0].__dict__)) + + dict_fieldnames.remove('datetime') + + dict_fieldnames.insert(0, 'datetime') + + if not os.path.exists(file_name): + # 写入表头 + print(f'create and write data into {file_name}') + with open(file_name, 'a', encoding='utf8', newline='') as csvWriteFile: + writer = csv.DictWriter(f=csvWriteFile, fieldnames=dict_fieldnames, dialect='excel') + writer.writeheader() + for tick in tick_list: + d = tick.__dict__ + d.update({'datetime': tick.datetime.strftime('%Y-%m-%d %H:%M:%S.%f')}) + writer.writerow(d) + else: + # 写入数据 + print(f'write data into {file_name}') + with open(file_name, 'a', encoding='utf8', newline='') as csvWriteFile: + writer = csv.DictWriter(f=csvWriteFile, fieldnames=dict_fieldnames, dialect='excel', extrasaction='ignore') + for tick in tick_list: + d = tick.__dict__ + d.update({'datetime': tick.datetime.strftime('%Y-%m-%d %H:%M:%S.%f')}) + writer.writerow(d) + + +class TickRecorderEngine(BaseEngine): + """""" + setting_filename = "data_recorder_setting.json" + + def __init__(self, main_engine: MainEngine, event_engine: EventEngine): + """""" + super().__init__(main_engine, event_engine, APP_NAME) + + self.queue = Queue() + self.thread = Thread(target=self.run) + self.active = False + + self.tick_recordings = {} + self.tick_folder = '' + + self.load_setting() + + self.tick_recorder = TickFileRecorder(self.tick_folder) + + self.register_event() + self.start() + self.put_event() + + def load_setting(self): + """""" + setting = load_json(self.setting_filename) + self.tick_recordings = setting.get("tick", {}) + self.tick_folder = setting.get('tick_folder', os.getcwd()) + + def save_setting(self): + """""" + setting = { + "tick": self.tick_recordings + } + save_json(self.setting_filename, setting) + + def run(self): + """""" + while self.active: + try: + task = self.queue.get(timeout=1) + task_type, data = task + + if task_type == "tick": + self.tick_recorder.save_tick_data([data]) + + except Empty: + continue + + def close(self): + """""" + self.active = False + + if self.thread.isAlive(): + self.thread.join() + + def start(self): + """""" + self.active = True + self.thread.start() + + def add_tick_recording(self, vt_symbol: str): + """""" + if vt_symbol in self.tick_recordings: + self.write_log(f"已在Tick记录列表中:{vt_symbol}") + return + + # For normal contract + if Exchange.LOCAL.value not in vt_symbol: + contract = self.main_engine.get_contract(vt_symbol) + if not contract: + self.write_log(f"找不到合约:{vt_symbol}") + return + + self.tick_recordings[vt_symbol] = { + "symbol": contract.symbol, + "exchange": contract.exchange.value, + "gateway_name": contract.gateway_name + } + + self.subscribe(contract) + # No need to subscribe for spread data + else: + self.tick_recordings[vt_symbol] = {} + + self.save_setting() + self.put_event() + + self.write_log(f"添加Tick记录成功:{vt_symbol}") + + def remove_tick_recording(self, vt_symbol: str): + """""" + if vt_symbol not in self.tick_recordings: + self.write_log(f"不在Tick记录列表中:{vt_symbol}") + return + + self.tick_recordings.pop(vt_symbol) + self.save_setting() + self.put_event() + + self.write_log(f"移除Tick记录成功:{vt_symbol}") + + def register_event(self): + """""" + self.event_engine.register(EVENT_TICK, self.process_tick_event) + self.event_engine.register(EVENT_CONTRACT, self.process_contract_event) + self.event_engine.register( + EVENT_SPREAD_DATA, self.process_spread_event) + + def update_tick(self, tick: TickData): + """""" + if tick.vt_symbol in self.tick_recordings: + self.record_tick(tick) + + def process_tick_event(self, event: Event): + """""" + tick = event.data + self.update_tick(tick) + + def process_contract_event(self, event: Event): + """""" + contract = event.data + vt_symbol = contract.vt_symbol + + if vt_symbol in self.tick_recordings: + self.subscribe(contract) + + def process_spread_event(self, event: Event): + """""" + spread: SpreadData = event.data + tick = spread.to_tick() + + # Filter not inited spread data + if tick.datetime: + self.update_tick(tick) + + def write_log(self, msg: str): + """""" + print(msg) + + def put_event(self): + """""" + tick_symbols = list(self.tick_recordings.keys()) + tick_symbols.sort() + + data = { + "tick": tick_symbols + } + + event = Event( + EVENT_RECORDER_UPDATE, + data + ) + self.event_engine.put(event) + + def record_tick(self, tick: TickData): + """""" + task = ("tick", copy(tick)) + self.queue.put(task) + + def subscribe(self, contract: ContractData): + """""" + req = SubscribeRequest( + symbol=contract.symbol, + exchange=contract.exchange + ) + self.main_engine.subscribe(req, contract.gateway_name) diff --git a/vnpy/app/tick_recorder/ui/__init__.py b/vnpy/app/tick_recorder/ui/__init__.py new file mode 100644 index 00000000..7339f138 --- /dev/null +++ b/vnpy/app/tick_recorder/ui/__init__.py @@ -0,0 +1 @@ +from .widget import RecorderManager diff --git a/vnpy/app/tick_recorder/ui/recorder.ico b/vnpy/app/tick_recorder/ui/recorder.ico new file mode 100644 index 00000000..1ddc8b85 Binary files /dev/null and b/vnpy/app/tick_recorder/ui/recorder.ico differ diff --git a/vnpy/app/tick_recorder/ui/widget.py b/vnpy/app/tick_recorder/ui/widget.py new file mode 100644 index 00000000..724427b1 --- /dev/null +++ b/vnpy/app/tick_recorder/ui/widget.py @@ -0,0 +1,129 @@ +from datetime import datetime + + +from vnpy.event import Event, EventEngine +from vnpy.trader.engine import MainEngine +from vnpy.trader.ui import QtCore, QtWidgets +from vnpy.trader.event import EVENT_CONTRACT + +from ..engine import ( + APP_NAME, + EVENT_RECORDER_LOG, + EVENT_RECORDER_UPDATE +) + + +class RecorderManager(QtWidgets.QWidget): + """""" + + signal_log = QtCore.pyqtSignal(Event) + signal_update = QtCore.pyqtSignal(Event) + signal_contract = QtCore.pyqtSignal(Event) + + def __init__(self, main_engine: MainEngine, event_engine: EventEngine): + super().__init__() + + self.main_engine = main_engine + self.event_engine = event_engine + self.recorder_engine = main_engine.get_engine(APP_NAME) + + self.init_ui() + self.register_event() + self.recorder_engine.put_event() + + def init_ui(self): + """""" + self.setWindowTitle("Tick行情记录") + self.resize(1000, 600) + + # Create widgets + self.symbol_line = QtWidgets.QLineEdit() + self.symbol_line.setFixedHeight( + self.symbol_line.sizeHint().height() * 2) + + contracts = self.main_engine.get_all_contracts() + self.vt_symbols = [contract.vt_symbol for contract in contracts] + + self.symbol_completer = QtWidgets.QCompleter(self.vt_symbols) + self.symbol_completer.setFilterMode(QtCore.Qt.MatchContains) + self.symbol_completer.setCompletionMode( + self.symbol_completer.PopupCompletion) + self.symbol_line.setCompleter(self.symbol_completer) + + add_tick_button = QtWidgets.QPushButton("添加") + add_tick_button.clicked.connect(self.add_tick_recording) + + remove_tick_button = QtWidgets.QPushButton("移除") + remove_tick_button.clicked.connect(self.remove_tick_recording) + + self.tick_recording_edit = QtWidgets.QTextEdit() + self.tick_recording_edit.setReadOnly(True) + + self.log_edit = QtWidgets.QTextEdit() + self.log_edit.setReadOnly(True) + + # Set layout + grid = QtWidgets.QGridLayout() + grid.addWidget(QtWidgets.QLabel("Tick记录"), 0, 0) + grid.addWidget(add_tick_button, 0, 1) + grid.addWidget(remove_tick_button, 0, 2) + + hbox = QtWidgets.QHBoxLayout() + hbox.addWidget(QtWidgets.QLabel("本地代码")) + hbox.addWidget(self.symbol_line) + hbox.addWidget(QtWidgets.QLabel(" ")) + hbox.addLayout(grid) + hbox.addStretch() + + grid2 = QtWidgets.QGridLayout() + grid2.addWidget(QtWidgets.QLabel("Tick记录列表"), 0, 0) + grid2.addWidget(self.tick_recording_edit, 0, 1) + + vbox = QtWidgets.QVBoxLayout() + vbox.addLayout(hbox) + vbox.addLayout(grid2) + self.setLayout(vbox) + + def register_event(self): + """""" + self.signal_log.connect(self.process_log_event) + self.signal_contract.connect(self.process_contract_event) + self.signal_update.connect(self.process_update_event) + + self.event_engine.register(EVENT_CONTRACT, self.signal_contract.emit) + self.event_engine.register( + EVENT_RECORDER_LOG, self.signal_log.emit) + self.event_engine.register( + EVENT_RECORDER_UPDATE, self.signal_update.emit) + + def process_log_event(self, event: Event): + """""" + timestamp = datetime.now().strftime("%H:%M:%S") + msg = f"{timestamp}\t{event.data}" + self.log_edit.append(msg) + + def process_update_event(self, event: Event): + """""" + data = event.data + + self.tick_recording_edit.clear() + tick_text = "\n".join(data["tick"]) + self.tick_recording_edit.setText(tick_text) + + def process_contract_event(self, event: Event): + """""" + contract = event.data + self.vt_symbols.append(contract.vt_symbol) + + model = self.symbol_completer.model() + model.setStringList(self.vt_symbols) + + def add_tick_recording(self): + """""" + vt_symbol = self.symbol_line.text() + self.recorder_engine.add_tick_recording(vt_symbol) + + def remove_tick_recording(self): + """""" + vt_symbol = self.symbol_line.text() + self.recorder_engine.remove_tick_recording(vt_symbol)