vnpy/tutorial/performance/Performance of Receiving Tick Data.ipynb

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Performance Testing and Optimization of Market Data Reception in vnpy"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"by Jerry He, 2016.12\n",
"\n",
"Discussion: https://zhuanlan.zhihu.com/p/24662087"
]
},
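{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because it is open source, feature-rich, easy to develop with, and highly customizable, the quantitative trading platform vnpy has recently come into wide use.\n",
"\n",
"Persisting market data is a basic problem that every quantitative trading platform must solve. The stored data serves two purposes: analysis and backtesting during strategy development, and recent history for live trading programs. The former can be covered by more efficient conventional implementations (for example, we have a C++/leveldb program that receives, forwards, and serves historical market data) or by buying data from a vendor. For the latter, however, recording recent data directly in vnpy is the simpler approach.\n",
"\n",
"vnpy includes a market data recording module, dataRecorder, which already saves tick data and 1-minute bar data.\n",
"This work covers:\n",
"\n",
"- a performance evaluation of vnpy's original recording function\n",
"- fixes and optimizations of the original recording function for the CTP interface\n",
"\n",
"All timings below are in milliseconds per call.\n",
"\n",
"Tests were run on Windows 7 with an Intel Core i7 at 3.4 GHz."
]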
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from datetime import datetime, time, timedelta  # timedelta is used by the improved version\n",
"import time as gtime  # renamed to avoid clashing with datetime.time\n",
"import pymongo\n",
"from dateutil.parser import parse"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Refactoring vnpy's tick-handling code for testing"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"TICK_DB_NAME='Test'\n",
"\n",
"EMPTY_STRING = ''\n",
"EMPTY_UNICODE = u''\n",
"EMPTY_INT = 0\n",
"EMPTY_FLOAT = 0.0\n",
"\n",
"class DrTickData(object):\n",
"    \"\"\"Tick data\"\"\"\n",
"\n",
"    #----------------------------------------------------------------------\n",
"    def __init__(self):\n",
"        \"\"\"Constructor\"\"\"\n",
"        self.vtSymbol = EMPTY_STRING        # vt system symbol\n",
"        self.symbol = EMPTY_STRING          # contract symbol\n",
"        self.exchange = EMPTY_STRING        # exchange code\n",
"\n",
"        # Trade data\n",
"        self.lastPrice = EMPTY_FLOAT        # last traded price\n",
"        self.volume = EMPTY_INT             # latest volume\n",
"        self.openInterest = EMPTY_INT       # open interest\n",
"\n",
"        self.upperLimit = EMPTY_FLOAT       # upper price limit\n",
"        self.lowerLimit = EMPTY_FLOAT       # lower price limit\n",
"\n",
"        # Tick time\n",
"        self.date = EMPTY_STRING            # date\n",
"        self.time = EMPTY_STRING            # time\n",
"        self.datetime = None                # Python datetime object\n",
"\n",
"        # Five levels of market depth\n",
"        self.bidPrice1 = EMPTY_FLOAT\n",
"        self.bidPrice2 = EMPTY_FLOAT\n",
"        self.bidPrice3 = EMPTY_FLOAT\n",
"        self.bidPrice4 = EMPTY_FLOAT\n",
"        self.bidPrice5 = EMPTY_FLOAT\n",
"\n",
"        self.askPrice1 = EMPTY_FLOAT\n",
"        self.askPrice2 = EMPTY_FLOAT\n",
"        self.askPrice3 = EMPTY_FLOAT\n",
"        self.askPrice4 = EMPTY_FLOAT\n",
"        self.askPrice5 = EMPTY_FLOAT\n",
"\n",
"        self.bidVolume1 = EMPTY_INT\n",
"        self.bidVolume2 = EMPTY_INT\n",
"        self.bidVolume3 = EMPTY_INT\n",
"        self.bidVolume4 = EMPTY_INT\n",
"        self.bidVolume5 = EMPTY_INT\n",
"\n",
"        self.askVolume1 = EMPTY_INT\n",
"        self.askVolume2 = EMPTY_INT\n",
"        self.askVolume3 = EMPTY_INT\n",
"        self.askVolume4 = EMPTY_INT\n",
"        self.askVolume5 = EMPTY_INT\n",
"\n",
"def insertData(db,collection,data):\n",
"    # Uses the global MongoClient `client` created in the next cell.\n",
"    # Collection.insert is the PyMongo 2.x/3.x API; PyMongo 4 removed it in favour of insert_one.\n",
"    client[db][collection].insert(data.__dict__)\n",
"\n",
"def procecssTickEvent(tick, insertDB=False):\n",
"    \"\"\"Handle a pushed market data tick\"\"\"\n",
"    vtSymbol = tick.vtSymbol\n",
"\n",
"    # Convert the tick format\n",
"    drTick = DrTickData()\n",
"    d = drTick.__dict__\n",
"    for key in d.keys():\n",
"        if key != 'datetime':\n",
"            d[key] = tick.__dict__[key]\n",
"    drTick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')\n",
"\n",
"    # Save the tick data\n",
"    if insertDB:\n",
"        insertData(TICK_DB_NAME, vtSymbol, drTick)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Creating a tick for testing"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{u'askPrice2': 0.0, u'lastPrice': 2977.0, u'exchange': u'UNKNOWN', u'bidVolume5': 0, u'bidVolume4': 0, u'bidVolume3': 0, u'bidVolume2': 0, u'bidVolume1': 1551, u'datetime': datetime.datetime(2016, 12, 28, 21, 27, 36, 500000), u'askVolume1': 120, u'askVolume3': 0, u'askVolume2': 0, u'askVolume5': 0, u'askVolume4': 0, u'date': u'20161228', u'askPrice5': 0.0, u'volume': 392068, u'lowerLimit': 2725.0, u'bidPrice5': 0.0, u'bidPrice4': 0.0, u'bidPrice1': 2976.0, u'bidPrice3': 0.0, u'bidPrice2': 0.0, u'vtSymbol': u'rb1705', u'time': u'21:27:36.5', u'openInterest': 2304294.0, u'askPrice4': 0.0, u'askPrice3': 0.0, u'symbol': u'rb1705', u'askPrice1': 2977.0, u'upperLimit': 3136.0}\n"
]
}
],
"source": [
"client=pymongo.MongoClient()\n",
"data=client['VnTrader_Tick_Db']['rb1705'].find_one({})\n",
"del data['_id']\n",
"\n",
"class InputTick: pass\n",
"tick=InputTick()\n",
"tick.__dict__.update(data)\n",
"print tick.__dict__"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Benchmarking the original function"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def profiling(count,func=None):\n",
"    \"\"\"Call func() count times and return the average cost per call in milliseconds.\"\"\"\n",
"    if func is None: func=lambda: procecssTickEvent(tick)\n",
"    t0=gtime.time()\n",
"    for i in range(count):\n",
"        func()\n",
"    total_time=(gtime.time()-t0)\n",
"    return total_time*1000/count\n",
"\n",
"test_count=10000"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Original, without saving to MongoDB: 0.0255 ms per call\n",
"Original, with saving to MongoDB: 0.2334 ms per call\n"
]
}
],
"source": [
"original_nodb=profiling(test_count)\n",
"client.drop_database(TICK_DB_NAME)  # start from an empty test database\n",
"original_db=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))\n",
"print 'Original, without saving to MongoDB: %.4f ms per call' %original_nodb\n",
"print 'Original, with saving to MongoDB: %.4f ms per call' %original_db"
]
},
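{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"## Improved version\n",
"\n",
"When the original function is used to record futures data from the CTP interface, it has several problems:\n",
"\n",
"- Stray data received outside trading hours is not filtered out.\n",
"- The date field supplied by the exchanges is inconsistent: some use the calendar date and others the trading day, so the computed datetime field is wrong as well.\n",
"\n",
"The improved version below addresses these problems:"
]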
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Time ranges to filter out. Note that call-auction ticks are filtered out as well.\n",
"invalid_sections=[(time(2,30,59),time(9,0,0)),\n",
"                  (time(11,30,59),time(13,0,0)),\n",
"                  (time(15,15,0),time(21,0,0))]\n",
"\n",
"# Ticks received while the local time is inside this range are ignored, because futures\n",
"# brokers sometimes misbehave and re-push the whole day's data.\n",
"invalid_local_section=(time(5,0,0),time(8,30,0))\n",
"\n",
"def procecssTickEvent(tick, insertDB=False):\n",
"    \"\"\"Handle a pushed market data tick\"\"\"\n",
"    # 1. Check the local time\n",
"    local_datetime=datetime.now()\n",
"    local_time=local_datetime.time()\n",
"    if local_time>invalid_local_section[0] and local_time<invalid_local_section[1]:\n",
"        return\n",
"\n",
"    # 2. Convert the tick format\n",
"    drTick = DrTickData()\n",
"    d = drTick.__dict__\n",
"    for key in d.keys():\n",
"        if key != 'datetime':\n",
"            d[key] = tick.__dict__[key]\n",
"\n",
"    # Guard against a time string without the leading zero, e.g. \"9:00:00.5\"\n",
"    if tick.time[2] != ':':\n",
"        tick.time = '0' + tick.time\n",
"\n",
"    # Take the date from the local clock instead of the unreliable tick.date,\n",
"    # correcting it when the tick and the local clock straddle midnight\n",
"    tick_hour = int(tick.time[0:2])\n",
"    local_hour = local_time.hour\n",
"    real_date=local_datetime\n",
"    if tick_hour == 23 and local_hour == 0:    # tick time lags behind system time\n",
"        real_date+=timedelta(-1)\n",
"    elif tick_hour == 0 and local_hour == 23:  # system time lags behind tick time\n",
"        real_date+=timedelta(1)\n",
"\n",
"    # Pad to HH:MM:SS.fff and build the datetime by slicing, which is much faster than strptime\n",
"    tick.time = tick.time.ljust(12,'0')\n",
"    drTick.datetime = datetime(real_date.year,real_date.month,real_date.day,\n",
"                               int(tick.time[0:2]), int(tick.time[3:5]), int(tick.time[6:8]), int(tick.time[9:12])*1000)\n",
"\n",
"    # Drop ticks whose timestamp falls in a non-trading window\n",
"    tmpTime=drTick.datetime.time()\n",
"    for sec in invalid_sections:\n",
"        if tmpTime>sec[0] and tmpTime<sec[1]:\n",
"            return\n",
"\n",
"    # 3. Save the tick data\n",
"    if insertDB:\n",
"        insertData(TICK_DB_NAME, tick.vtSymbol, drTick)\n",
"\n",
"procecssTickEvent(tick)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"New, without saving to MongoDB: 0.0160 ms per call\n",
"New, with saving to MongoDB: 0.2174 ms per call\n"
]
}
],
"source": [
"new_nodb=profiling(test_count)\n",
"client.drop_database(TICK_DB_NAME)\n",
"new_db=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))\n",
"print 'New, without saving to MongoDB: %.4f ms per call' %new_nodb\n",
"print 'New, with saving to MongoDB: %.4f ms per call' %new_db"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Efficiency of saving to a text file"
]
},
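{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def insertData(db,collection,data):\n",
"    # Replaces the MongoDB version: writes every field followed by a comma to the\n",
"    # open file fout (no newline is written between ticks)\n",
"    for key in data.__dict__:\n",
"        fout.write(str(data.__dict__[key])+',')"
]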
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"New, with saving to a text file: 0.0362 ms per call\n"
]
}
],
"source": [
"fout=open('D:/test.txt','w')\n",
"new_db_text=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))\n",
"print 'New, with saving to a text file: %.4f ms per call' %new_db_text\n",
"fout.close()"
]
},
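{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Time conversion efficiency\n",
"\n",
"Note that in the runs without database writes, the new version is actually faster than the original (0.0160 ms vs. 0.0255 ms per call) even though it does more checking. This is mainly due to the rewritten time conversion.\n",
"\n",
"The following three time conversion methods differ greatly in speed:"
]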
},
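{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Conversion method 1 (dateutil parse): 0.0560 ms per call\n",
"Conversion method 2 (datetime.strptime): 0.0122 ms per call\n",
"Conversion method 3 (string slicing): 0.0032 ms per call\n"
]
}
],
"source": [
"# Method 1: dateutil's generic parser\n",
"time_convert1=profiling(10000,lambda:parse('20161212 21:21:21.5'))\n",
"# Method 2: datetime.strptime with an explicit format\n",
"time_convert2=profiling(10000,lambda:datetime.strptime('20161212 21:21:21.5', '%Y%m%d %H:%M:%S.%f'))\n",
"# Method 3: pad to a fixed width and slice out each field (the approach used in the new version)\n",
"def customized_parse(s):\n",
"    s=s.ljust(21,'0')\n",
"    return datetime(int(s[0:4]),int(s[4:6]),int(s[6:8]),int(s[9:11]), int(s[12:14]), int(s[15:17]), int(s[18:21])*1000 )\n",
"time_convert3=profiling(10000,lambda:customized_parse('20161212 21:21:21.5'))\n",
"print 'Conversion method 1 (dateutil parse): %.4f ms per call' %time_convert1\n",
"print 'Conversion method 2 (datetime.strptime): %.4f ms per call' %time_convert2\n",
"print 'Conversion method 3 (string slicing): %.4f ms per call' %time_convert3"
]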
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"# Summary"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>MongoDB write</th>\n",
" <th>Text file write</th>\n",
" <th>No write</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>Original</th>\n",
" <td>0.2334</td>\n",
" <td>NaN</td>\n",
" <td>0.0255</td>\n",
" </tr>\n",
" <tr>\n",
" <th>New</th>\n",
" <td>0.2174</td>\n",
" <td>0.0362</td>\n",
" <td>0.0160</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
"          MongoDB write  Text file write  No write\n",
"Original         0.2334              NaN    0.0255\n",
"New              0.2174           0.0362    0.0160"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"# Average cost per call in milliseconds\n",
"df=pd.DataFrame([{u'No write':original_nodb, u'MongoDB write':original_db},\n",
"                 {u'No write':new_nodb, u'MongoDB write':new_db, u'Text file write':new_db_text}],\n",
"                index=['Original','New'],\n",
"                columns=[u'MongoDB write', u'Text file write', u'No write'])\n",
"df"
]
},
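{
"cell_type": "markdown",
"metadata": {},
"source": [
"Overall, persisting one tick takes about 0.2 ms in both the original and the new version. Most of the cost is the MongoDB insert, which accounts for roughly 90% of the total (for the original version, 0.2334 - 0.0255 = 0.2079 ms of the 0.2334 ms). Using plain text writes as the storage method instead cuts the cost sharply, to about 0.04 ms per tick, and more efficient formats could reduce it further. However, since processing alone without any write already takes about 0.02 ms, the best achievable cost is around 0.02 ms.\n",
"\n",
"In short, the MongoDB-based approach can store on the order of hundreds of entries per second in real time, and further optimization might reach the thousands. This should be enough for the vast majority of needs."
]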
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
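The date correction and trading-hours filter in the notebook's "Improved version" cell are hard to test without a live CTP feed, because they read `datetime.now()` directly. One way around this is to pull the logic out into pure functions and pass the local clock in as an argument, so the midnight cases can be checked deterministically. The sketch below assumes that refactoring. `normalize_time`, `tick_datetime` and `in_invalid_section` are hypothetical helper names, not part of vnpy. The filter windows are copied from `invalid_sections`.

```python
from datetime import datetime, time, timedelta

# Same non-trading windows as invalid_sections in the notebook
INVALID_SECTIONS = [(time(2, 30, 59), time(9, 0, 0)),
                    (time(11, 30, 59), time(13, 0, 0)),
                    (time(15, 15, 0), time(21, 0, 0))]


def normalize_time(t):
    """Pad a tick time string to 'HH:MM:SS.fff' (hypothetical helper)."""
    if t[2] != ':':        # missing leading zero, e.g. '9:00:00.5'
        t = '0' + t
    if len(t) == 8:        # no fractional part, e.g. '21:27:36'
        t += '.'
    return t.ljust(12, '0')


def tick_datetime(tick_time, local_dt):
    """Tick time of day + local calendar date, corrected across midnight."""
    t = normalize_time(tick_time)
    tick_hour = int(t[0:2])
    real_date = local_dt
    if tick_hour == 23 and local_dt.hour == 0:    # tick lags the local clock
        real_date -= timedelta(days=1)
    elif tick_hour == 0 and local_dt.hour == 23:  # local clock lags the tick
        real_date += timedelta(days=1)
    return datetime(real_date.year, real_date.month, real_date.day,
                    int(t[0:2]), int(t[3:5]), int(t[6:8]), int(t[9:12]) * 1000)


def in_invalid_section(dt):
    """True if the tick falls inside a non-trading window and should be dropped."""
    tod = dt.time()
    return any(start < tod < end for start, end in INVALID_SECTIONS)


print(tick_datetime('23:59:59.5', datetime(2016, 12, 29, 0, 0, 1)))  # 2016-12-28 23:59:59.500000
print(in_invalid_section(datetime(2016, 12, 28, 20, 59, 0)))          # True (call-auction tick)
```

Passing `local_dt` explicitly is the only behavioural change from the notebook version. In a recorder it would simply be `datetime.now()`.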
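The notebook's `profiling` helper times one loop with `gtime.time()`, so a single run can be skewed by other processes on the machine. A common alternative keeps the fastest of several runs, using the standard library's `timeit.repeat`. The sketch below assumes a best-of-several figure is what you want. `ms_per_call` is a hypothetical name. It also checks that the slicing parser from the time-conversion cell returns the same value as `strptime` before their speeds are compared.

```python
import timeit
from datetime import datetime


def ms_per_call(func, number=10000, repeat=5):
    """Average milliseconds per func() call, taken from the fastest of `repeat` runs."""
    # timeit.repeat accepts a zero-argument callable as the statement to time
    runs = timeit.repeat(func, number=number, repeat=repeat)
    return min(runs) * 1000.0 / number


def customized_parse(s):
    """Slicing parser from the notebook: 'YYYYMMDD HH:MM:SS.f' -> datetime."""
    s = s.ljust(21, '0')
    return datetime(int(s[0:4]), int(s[4:6]), int(s[6:8]), int(s[9:11]),
                    int(s[12:14]), int(s[15:17]), int(s[18:21]) * 1000)


if __name__ == '__main__':
    stamp = '20161212 21:21:21.5'
    fmt = '%Y%m%d %H:%M:%S.%f'
    # Make sure both methods agree before comparing their speed
    assert customized_parse(stamp) == datetime.strptime(stamp, fmt)
    print('strptime: %.4f ms' % ms_per_call(lambda: datetime.strptime(stamp, fmt)))
    print('slicing:  %.4f ms' % ms_per_call(lambda: customized_parse(stamp)))
```

Absolute numbers will differ from the notebook's because hardware and Python version differ. The relative ordering of the methods is the useful result.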
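The summary's "roughly 90%" figure can be reproduced from the table. The share of the per-tick cost spent on the MongoDB insert is (with write - without write) / with write. A per-tick cost in milliseconds also converts to a ceiling of 1000 / cost ticks per second. The sketch below does that arithmetic on values copied from the table. The throughput figures are ceilings for a single thread doing nothing but this work, computed here rather than measured in the notebook. A running recorder presumably also spends time on event dispatch and gateway callbacks, which is an assumption about the live setup.

```python
# Average cost per call in ms, copied from the summary table
COSTS = {
    'Original': {'MongoDB write': 0.2334, 'No write': 0.0255},
    'New': {'MongoDB write': 0.2174, 'No write': 0.0160},
}


def write_share(with_write, without_write):
    """Fraction of the per-tick cost spent on the write itself."""
    return (with_write - without_write) / with_write


def ticks_per_second(ms_per_tick):
    """Upper bound on ticks/s if one thread did nothing but this work."""
    return 1000.0 / ms_per_tick


for version in ('Original', 'New'):
    c = COSTS[version]
    share = write_share(c['MongoDB write'], c['No write'])
    print('%s: MongoDB share %.0f%%, ceiling %d ticks/s'
          % (version, share * 100, int(ticks_per_second(c['MongoDB write']))))
# Original: MongoDB share 89%, ceiling 4284 ticks/s
# New: MongoDB share 93%, ceiling 4599 ticks/s
```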