Merge pull request #1688 from 1122455801/data_recorder_01

[Add] datarecoder
This commit is contained in:
vn.py 2019-05-13 22:43:48 +08:00 committed by GitHub
commit 1973607262
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

265
docs/datarecoder.md Normal file
View File

@ -0,0 +1,265 @@
# 行情记录
行情记录模块用于实时行情的收录:
- 连接上接口后并且启动行情记录模块;
- 通过本地合约(vt_symbol)添加记录任务;
- 后台会自动调用行情API接口的suscribe()函数自动订阅行情;
- 行情信息通过database_manager模块的save_bar_data()函数/save_tick_data()函数载入到数据库中。
注意目前vnpy支持的数据库为SQLite/ MySQL/ PostgreSQL/ MongoDB。若用户使用MongoDB则行情记录数据直接载入到MongoDB中。
 
## 加载启动
进入VN Trader后首先登陆接口如连接CTP然后在菜单栏中点击“功能”->"行情记录“后,会弹出行情记录窗口,如图。
![](https://vnpy-community.oss-cn-shanghai.aliyuncs.com/forum_experience/yazhang/data_recoder/datarecoder.png)
此时行情记录模块的启动状态为True会启动while循环可以添加任务实现实时行情记录。
```
def start(self):
""""""
self.active = True
self.thread.start()
def run(self):
""""""
while self.active:
try:
task = self.queue.get(timeout=1)
task_type, data = task
if task_type == "tick":
database_manager.save_tick_data([data])
elif task_type == "bar":
database_manager.save_bar_data([data])
except Empty:
continue
```
 
## 开始收录
- 在“本地代码”选择输入需要订阅的行情如rb1905.SHFE
- 然后点击后边“K线记录”或者“Tick记录”中的“添加”选项会把记录特定品种任务添加到data_recorder_setting.json上并且显示到“K线记录列表”或者“Tick记录列表”中如图。
- 通过queue.put()与queue.get()异步完成收录行情信息任务。
![](https://vnpy-community.oss-cn-shanghai.aliyuncs.com/forum_experience/yazhang/data_recoder/start.png)
 
下面介绍行情收录的具体原理若无合约记录的历史用户需要先添加行情记录任务如连接CTP接口后记录rb1905.SHFE的tick数据然后调用add_tick_recording()函数执行下面工作:
1) 先创建tick_recordings字典
2) 调用接口的suscribe()函数订阅行情;
3 )保存该tick_recordings字典到json文件上
4) 推送行情记录事件。
```
def add_tick_recording(self, vt_symbol: str):
""""""
if vt_symbol in self.tick_recordings:
self.write_log(f"已在Tick记录列表中{vt_symbol}")
return
contract = self.main_engine.get_contract(vt_symbol)
if not contract:
self.write_log(f"找不到合约:{vt_symbol}")
return
self.tick_recordings[vt_symbol] = {}
"symbol": contract.symbol,
"exchange": contract.exchange.value,
"gateway_name": contract.gateway_name
}
self.subscribe(contract)
self.save_setting()
self.put_event()
self.write_log(f"添加Tick记录成功{vt_symbol}")
```
下面对add_tick_recording()函数里面调用的子函数进行扩展:
### 订阅行情
调用main_engine的suscribe()函数来订阅行情需要填入的信息为symbol、exchange、gateway_name
```
def subscribe(self, contract: ContractData):
""""""
req = SubscribeRequest(
symbol=contract.symbol,
exchange=contract.exchange
)
self.main_engine.subscribe(req, contract.gateway_name)
```
 
### 将订阅信息保存到json文件
- 主要把tick_recordings或者bar_recordings通过save_json()函数保存到C:\Users\Administrator\\.vntrader文件夹内的data_recorder_setting.json上。
- 该json文件用于存放行情记录的任务当每次启动行情模块后会调用load_setting()函数来得到tick_recordings和bar_recordings字典进而开始记录的任务。
```
setting_filename = "data_recorder_setting.json"
def save_setting(self):
""""""
setting = {
"tick": self.tick_recordings,
"bar": self.bar_recordings
}
save_json(self.setting_filename, setting)
def load_setting(self):
""""""
setting = load_json(self.setting_filename)
self.tick_recordings = setting.get("tick", {})
self.bar_recordings = setting.get("bar", {})
```
 
### 推送行情记录事件
- 创建行情记录列表tick_symbols和bar_symbols并且缓存在data字典里
- 创建evnte对象其类型是EVENT_RECORDER_UPDATE, 内容是data字典
- 调用event_engine的put()函数推送event事件。
```
def put_event(self):
""""""
tick_symbols = list(self.tick_recordings.keys())
tick_symbols.sort()
bar_symbols = list(self.bar_recordings.keys())
bar_symbols.sort()
data = {
"tick": tick_symbols,
"bar": bar_symbols
}
event = Event(
EVENT_RECORDER_UPDATE,
data
)
self.event_engine.put(event)
```
 
### 注册行情记录事件
register_event()函数分别注册2种事件EVENT_CONTRACT、EVENT_TICK
- EVENT_CONTRACT事件调用的是process_contract_event()函数: 从tick_recordings和bar_recordings字典获取需要订阅的合约品种然后使用subscribe()函数进行订阅行情。
- EVENT_TICK事件调用的是process_tick_event()函数从tick_recordings和bar_recordings字典获取需要订阅的合约品种然后使用record_tick()和record_bar()函数把行情记录任务推送到queue队列中等待执行。
```
def register_event(self):
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
def process_tick_event(self, event: Event):
""""""
tick = event.data
if tick.vt_symbol in self.tick_recordings:
self.record_tick(tick)
if tick.vt_symbol in self.bar_recordings:
bg = self.get_bar_generator(tick.vt_symbol)
bg.update_tick(tick)
def process_contract_event(self, event: Event):
""""""
contract = event.data
vt_symbol = contract.vt_symbol
if (vt_symbol in self.tick_recordings or vt_symbol in self.bar_recordings):
self.subscribe(contract)
def record_tick(self, tick: TickData):
""""""
task = ("tick", copy(tick))
self.queue.put(task)
def record_bar(self, bar: BarData):
""""""
task = ("bar", copy(bar))
self.queue.put(task)
def get_bar_generator(self, vt_symbol: str):
""""""
bg = self.bar_generators.get(vt_symbol, None)
if not bg:
bg = BarGenerator(self.record_bar)
self.bar_generators[vt_symbol] = bg
return bg
```
 
### 执行记录行情任务
在while循环中从queue队列读取任务调用save_tick_data()或者save_bar_data()函数来记录数据,并且载入到数据库中。
```
def run(self):
""""""
while self.active:
try:
task = self.queue.get(timeout=1)
task_type, data = task
if task_type == "tick":
database_manager.save_tick_data([data])
elif task_type == "bar":
database_manager.save_bar_data([data])
except Empty:
continue
```
 
## 移除记录
- 从tick_recordings字典移除vt_symbol
- 调用save_setting()函数保存json配置文件
- 推送最新的tick_recordings字典来继续记录行情原来移除合约品种不再记录。
```
def remove_tick_recording(self, vt_symbol: str):
""""""
if vt_symbol not in self.tick_recordings:
self.write_log(f"不在Tick记录列表中{vt_symbol}")
return
self.tick_recordings.pop(vt_symbol)
self.save_setting()
self.put_event()
self.write_log(f"移除Tick记录成功{vt_symbol}")
```
 
## 停止记录
- 记录行情状态改为False, 停止while循环
- 调用join()函数关掉线程。
```
def close(self):
""""""
self.active = False
if self.thread.isAlive():
self.thread.join()
```