[新功能] tick记录服务,记录至文件csv

This commit is contained in:
msincenselee 2019-12-26 15:08:55 +08:00
parent 3912b31e83
commit fa13aaf273
5 changed files with 418 additions and 0 deletions

View File

@ -0,0 +1,18 @@
from pathlib import Path
from vnpy.trader.app import BaseApp
from vnpy.trader.constant import Direction
from vnpy.trader.object import TickData, BarData, TradeData, OrderData
from vnpy.trader.utility import BarGenerator, ArrayManager
from .engine import TickRecorderEngine, APP_NAME
class TickRecorderApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "Tick行情记录"
engine_class = TickRecorderEngine
widget_name = "RecorderManager"
icon_name = "recorder.ico"

View File

@ -0,0 +1,270 @@
"""
tick 文件记录
华富资产
"""
import os
import csv
from threading import Thread
from queue import Queue, Empty
from copy import copy
from collections import defaultdict
from datetime import datetime
from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.constant import Exchange
from vnpy.trader.object import (
SubscribeRequest,
TickData,
ContractData
)
from vnpy.trader.event import EVENT_TICK, EVENT_CONTRACT
from vnpy.trader.utility import load_json, save_json
from vnpy.app.spread_trading.base import EVENT_SPREAD_DATA, SpreadData
APP_NAME = "DataRecorder"
EVENT_RECORDER_LOG = "eRecorderLog"
EVENT_RECORDER_UPDATE = "eRecorderUpdate"
class TickFileRecorder(object):
""" Tick 文件保存"""
def __init__(self, tick_folder: str):
self.tick_dict = defaultdict(list) # symbol_hour_min: []
self.tick_folder = tick_folder
self.last_minute = 0
def save_tick_data(self, tick_list: list = []):
"""接收外部的保存tick请求"""
min = None
for tick in tick_list:
min = tick.datetime.minute
key = f'{tick.vt_symbol}_{tick.datetime.hour}-{tick.datetime.minute}'
save_list = self.tick_dict[key]
save_list.append(tick)
if min is not None and min != self.last_minute:
self.last_minute = min
self.save_expire_datas()
def save_expire_datas(self):
"""保存超时得数据"""
dt = datetime.now()
for key in [key for key in self.tick_dict.keys() if not key.endswith(f'{dt.hour}-{dt.minute}')]:
vt_symbol = key.split('_')[0]
tick_list = self.tick_dict.pop(key)
self.append_ticks_2_file(symbol=vt_symbol, tick_list=tick_list)
def append_ticks_2_file(self, symbol: str, tick_list: list):
"""创建/追加tick list 到csv文件"""
if len(tick_list) == 0:
return
trading_day = tick_list[0].trading_day
file_folder = os.path.abspath(os.path.join(self.tick_folder, trading_day.replace('-', '/')))
if not os.path.exists(file_folder):
os.makedirs(file_folder)
file_name = os.path.abspath(os.path.join(file_folder, f'{symbol}_{trading_day}.csv'))
dict_fieldnames = sorted(list(tick_list[0].__dict__))
dict_fieldnames.remove('datetime')
dict_fieldnames.insert(0, 'datetime')
if not os.path.exists(file_name):
# 写入表头
print(f'create and write data into {file_name}')
with open(file_name, 'a', encoding='utf8', newline='') as csvWriteFile:
writer = csv.DictWriter(f=csvWriteFile, fieldnames=dict_fieldnames, dialect='excel')
writer.writeheader()
for tick in tick_list:
d = tick.__dict__
d.update({'datetime': tick.datetime.strftime('%Y-%m-%d %H:%M:%S.%f')})
writer.writerow(d)
else:
# 写入数据
print(f'write data into {file_name}')
with open(file_name, 'a', encoding='utf8', newline='') as csvWriteFile:
writer = csv.DictWriter(f=csvWriteFile, fieldnames=dict_fieldnames, dialect='excel', extrasaction='ignore')
for tick in tick_list:
d = tick.__dict__
d.update({'datetime': tick.datetime.strftime('%Y-%m-%d %H:%M:%S.%f')})
writer.writerow(d)
class TickRecorderEngine(BaseEngine):
""""""
setting_filename = "data_recorder_setting.json"
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__(main_engine, event_engine, APP_NAME)
self.queue = Queue()
self.thread = Thread(target=self.run)
self.active = False
self.tick_recordings = {}
self.tick_folder = ''
self.load_setting()
self.tick_recorder = TickFileRecorder(self.tick_folder)
self.register_event()
self.start()
self.put_event()
def load_setting(self):
""""""
setting = load_json(self.setting_filename)
self.tick_recordings = setting.get("tick", {})
self.tick_folder = setting.get('tick_folder', os.getcwd())
def save_setting(self):
""""""
setting = {
"tick": self.tick_recordings
}
save_json(self.setting_filename, setting)
def run(self):
""""""
while self.active:
try:
task = self.queue.get(timeout=1)
task_type, data = task
if task_type == "tick":
self.tick_recorder.save_tick_data([data])
except Empty:
continue
def close(self):
""""""
self.active = False
if self.thread.isAlive():
self.thread.join()
def start(self):
""""""
self.active = True
self.thread.start()
def add_tick_recording(self, vt_symbol: str):
""""""
if vt_symbol in self.tick_recordings:
self.write_log(f"已在Tick记录列表中{vt_symbol}")
return
# For normal contract
if Exchange.LOCAL.value not in vt_symbol:
contract = self.main_engine.get_contract(vt_symbol)
if not contract:
self.write_log(f"找不到合约:{vt_symbol}")
return
self.tick_recordings[vt_symbol] = {
"symbol": contract.symbol,
"exchange": contract.exchange.value,
"gateway_name": contract.gateway_name
}
self.subscribe(contract)
# No need to subscribe for spread data
else:
self.tick_recordings[vt_symbol] = {}
self.save_setting()
self.put_event()
self.write_log(f"添加Tick记录成功{vt_symbol}")
def remove_tick_recording(self, vt_symbol: str):
""""""
if vt_symbol not in self.tick_recordings:
self.write_log(f"不在Tick记录列表中{vt_symbol}")
return
self.tick_recordings.pop(vt_symbol)
self.save_setting()
self.put_event()
self.write_log(f"移除Tick记录成功{vt_symbol}")
def register_event(self):
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
self.event_engine.register(
EVENT_SPREAD_DATA, self.process_spread_event)
def update_tick(self, tick: TickData):
""""""
if tick.vt_symbol in self.tick_recordings:
self.record_tick(tick)
def process_tick_event(self, event: Event):
""""""
tick = event.data
self.update_tick(tick)
def process_contract_event(self, event: Event):
""""""
contract = event.data
vt_symbol = contract.vt_symbol
if vt_symbol in self.tick_recordings:
self.subscribe(contract)
def process_spread_event(self, event: Event):
""""""
spread: SpreadData = event.data
tick = spread.to_tick()
# Filter not inited spread data
if tick.datetime:
self.update_tick(tick)
def write_log(self, msg: str):
""""""
print(msg)
def put_event(self):
""""""
tick_symbols = list(self.tick_recordings.keys())
tick_symbols.sort()
data = {
"tick": tick_symbols
}
event = Event(
EVENT_RECORDER_UPDATE,
data
)
self.event_engine.put(event)
def record_tick(self, tick: TickData):
""""""
task = ("tick", copy(tick))
self.queue.put(task)
def subscribe(self, contract: ContractData):
""""""
req = SubscribeRequest(
symbol=contract.symbol,
exchange=contract.exchange
)
self.main_engine.subscribe(req, contract.gateway_name)

View File

@ -0,0 +1 @@
from .widget import RecorderManager

Binary file not shown.

After

Width:  |  Height:  |  Size: 66 KiB

View File

@ -0,0 +1,129 @@
from datetime import datetime
from vnpy.event import Event, EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import QtCore, QtWidgets
from vnpy.trader.event import EVENT_CONTRACT
from ..engine import (
APP_NAME,
EVENT_RECORDER_LOG,
EVENT_RECORDER_UPDATE
)
class RecorderManager(QtWidgets.QWidget):
""""""
signal_log = QtCore.pyqtSignal(Event)
signal_update = QtCore.pyqtSignal(Event)
signal_contract = QtCore.pyqtSignal(Event)
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
super().__init__()
self.main_engine = main_engine
self.event_engine = event_engine
self.recorder_engine = main_engine.get_engine(APP_NAME)
self.init_ui()
self.register_event()
self.recorder_engine.put_event()
def init_ui(self):
""""""
self.setWindowTitle("Tick行情记录")
self.resize(1000, 600)
# Create widgets
self.symbol_line = QtWidgets.QLineEdit()
self.symbol_line.setFixedHeight(
self.symbol_line.sizeHint().height() * 2)
contracts = self.main_engine.get_all_contracts()
self.vt_symbols = [contract.vt_symbol for contract in contracts]
self.symbol_completer = QtWidgets.QCompleter(self.vt_symbols)
self.symbol_completer.setFilterMode(QtCore.Qt.MatchContains)
self.symbol_completer.setCompletionMode(
self.symbol_completer.PopupCompletion)
self.symbol_line.setCompleter(self.symbol_completer)
add_tick_button = QtWidgets.QPushButton("添加")
add_tick_button.clicked.connect(self.add_tick_recording)
remove_tick_button = QtWidgets.QPushButton("移除")
remove_tick_button.clicked.connect(self.remove_tick_recording)
self.tick_recording_edit = QtWidgets.QTextEdit()
self.tick_recording_edit.setReadOnly(True)
self.log_edit = QtWidgets.QTextEdit()
self.log_edit.setReadOnly(True)
# Set layout
grid = QtWidgets.QGridLayout()
grid.addWidget(QtWidgets.QLabel("Tick记录"), 0, 0)
grid.addWidget(add_tick_button, 0, 1)
grid.addWidget(remove_tick_button, 0, 2)
hbox = QtWidgets.QHBoxLayout()
hbox.addWidget(QtWidgets.QLabel("本地代码"))
hbox.addWidget(self.symbol_line)
hbox.addWidget(QtWidgets.QLabel(" "))
hbox.addLayout(grid)
hbox.addStretch()
grid2 = QtWidgets.QGridLayout()
grid2.addWidget(QtWidgets.QLabel("Tick记录列表"), 0, 0)
grid2.addWidget(self.tick_recording_edit, 0, 1)
vbox = QtWidgets.QVBoxLayout()
vbox.addLayout(hbox)
vbox.addLayout(grid2)
self.setLayout(vbox)
def register_event(self):
""""""
self.signal_log.connect(self.process_log_event)
self.signal_contract.connect(self.process_contract_event)
self.signal_update.connect(self.process_update_event)
self.event_engine.register(EVENT_CONTRACT, self.signal_contract.emit)
self.event_engine.register(
EVENT_RECORDER_LOG, self.signal_log.emit)
self.event_engine.register(
EVENT_RECORDER_UPDATE, self.signal_update.emit)
def process_log_event(self, event: Event):
""""""
timestamp = datetime.now().strftime("%H:%M:%S")
msg = f"{timestamp}\t{event.data}"
self.log_edit.append(msg)
def process_update_event(self, event: Event):
""""""
data = event.data
self.tick_recording_edit.clear()
tick_text = "\n".join(data["tick"])
self.tick_recording_edit.setText(tick_text)
def process_contract_event(self, event: Event):
""""""
contract = event.data
self.vt_symbols.append(contract.vt_symbol)
model = self.symbol_completer.model()
model.setStringList(self.vt_symbols)
def add_tick_recording(self):
""""""
vt_symbol = self.symbol_line.text()
self.recorder_engine.add_tick_recording(vt_symbol)
def remove_tick_recording(self):
""""""
vt_symbol = self.symbol_line.text()
self.recorder_engine.remove_tick_recording(vt_symbol)