499 lines
16 KiB
Plaintext
499 lines
16 KiB
Plaintext
{
|
||
"cells": [
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {},
|
||
"source": [
|
||
"#vnpy接收行情数据性能测试与改进优化"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {},
|
||
"source": [
|
||
"by Jerry He, 2016.12,\n",
|
||
"讨论:https://zhuanlan.zhihu.com/p/24662087"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {},
|
||
"source": [
|
||
"近来,量化交易平台vnpy因其开源、功能强大、开发容易、可定制性强的特点,目前已经被广泛应用在量化交易中。\n",
|
||
"行情数据落地是量化交易平台必须解决的一个基础问题,它有两个方面的作用:一是供策略开发时进行分析、回测;二是为实盘程序时提供近期的历史数据。前者可以通过传统效率更高的实现方式(比如我们有基于C++和leveldb实现的行情数据接收、转发、历史数据获取程序)实现,也可以通过向数据提供方购买获取。但是对于后者,直接基于vnpy落地近期的数据是更为简易的方式。\n",
|
||
"\n",
|
||
"vnpy包含行情落地模块dataRecorder,已经实现了tick数据、分钟bar数据保存功能。\n",
|
||
"本工作主要包括:\n",
|
||
"- vnpy原落地函数的性能考查\n",
|
||
"- 针对CTP接口,原落地函数的修正与优化\n",
|
||
"\n",
|
||
"以下所有性能测试时间单位均为毫秒。\n",
|
||
"\n",
|
||
"测试基于windows 7, i7 3.4GHz."
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 1,
|
||
"metadata": {
|
||
"collapsed": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"from datetime import datetime, time\n",
|
||
"import time as gtime\n",
|
||
"import pymongo\n",
|
||
"from dateutil.parser import parse"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {},
|
||
"source": [
|
||
"## 重构vnpy接收行情数据代码,以用于测试"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 2,
|
||
"metadata": {
|
||
"collapsed": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"TICK_DB_NAME='Test'\n",
|
||
"\n",
|
||
"EMPTY_STRING = ''\n",
|
||
"EMPTY_UNICODE = u''\n",
|
||
"EMPTY_INT = 0\n",
|
||
"EMPTY_FLOAT = 0.0\n",
|
||
"\n",
|
||
"class DrTickData(object):\n",
|
||
" \"\"\"Tick数据\"\"\"\n",
|
||
"\n",
|
||
" #----------------------------------------------------------------------\n",
|
||
" def __init__(self):\n",
|
||
" \"\"\"Constructor\"\"\" \n",
|
||
" self.vtSymbol = EMPTY_STRING # vt系统代码\n",
|
||
" self.symbol = EMPTY_STRING # 合约代码\n",
|
||
" self.exchange = EMPTY_STRING # 交易所代码\n",
|
||
"\n",
|
||
" # 成交数据\n",
|
||
" self.lastPrice = EMPTY_FLOAT # 最新成交价\n",
|
||
" self.volume = EMPTY_INT # 最新成交量\n",
|
||
" self.openInterest = EMPTY_INT # 持仓量\n",
|
||
" \n",
|
||
" self.upperLimit = EMPTY_FLOAT # 涨停价\n",
|
||
" self.lowerLimit = EMPTY_FLOAT # 跌停价\n",
|
||
" \n",
|
||
" # tick的时间\n",
|
||
" self.date = EMPTY_STRING # 日期\n",
|
||
" self.time = EMPTY_STRING # 时间\n",
|
||
" self.datetime = None # python的datetime时间对象\n",
|
||
" \n",
|
||
" # 五档行情\n",
|
||
" self.bidPrice1 = EMPTY_FLOAT\n",
|
||
" self.bidPrice2 = EMPTY_FLOAT\n",
|
||
" self.bidPrice3 = EMPTY_FLOAT\n",
|
||
" self.bidPrice4 = EMPTY_FLOAT\n",
|
||
" self.bidPrice5 = EMPTY_FLOAT\n",
|
||
" \n",
|
||
" self.askPrice1 = EMPTY_FLOAT\n",
|
||
" self.askPrice2 = EMPTY_FLOAT\n",
|
||
" self.askPrice3 = EMPTY_FLOAT\n",
|
||
" self.askPrice4 = EMPTY_FLOAT\n",
|
||
" self.askPrice5 = EMPTY_FLOAT \n",
|
||
" \n",
|
||
" self.bidVolume1 = EMPTY_INT\n",
|
||
" self.bidVolume2 = EMPTY_INT\n",
|
||
" self.bidVolume3 = EMPTY_INT\n",
|
||
" self.bidVolume4 = EMPTY_INT\n",
|
||
" self.bidVolume5 = EMPTY_INT\n",
|
||
" \n",
|
||
" self.askVolume1 = EMPTY_INT\n",
|
||
" self.askVolume2 = EMPTY_INT\n",
|
||
" self.askVolume3 = EMPTY_INT\n",
|
||
" self.askVolume4 = EMPTY_INT\n",
|
||
" self.askVolume5 = EMPTY_INT \n",
|
||
" \n",
|
||
"def insertData(db,collection,data):\n",
|
||
" client[db][collection].insert(data.__dict__)\n",
|
||
"\n",
|
||
"def procecssTickEvent(tick, insertDB=False):\n",
|
||
" \"\"\"处理行情推送\"\"\"\n",
|
||
" vtSymbol = tick.vtSymbol\n",
|
||
"\n",
|
||
" # 转化Tick格式\n",
|
||
" drTick = DrTickData()\n",
|
||
" d = drTick.__dict__\n",
|
||
" for key in d.keys():\n",
|
||
" if key != 'datetime':\n",
|
||
" d[key] = tick.__dict__[key]\n",
|
||
" drTick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f') \n",
|
||
" \n",
|
||
" # 更新Tick数据\n",
|
||
" if insertDB:\n",
|
||
" insertData(TICK_DB_NAME, vtSymbol, drTick) "
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {},
|
||
"source": [
|
||
"## 创建一个用于测试的Tick数据"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 3,
|
||
"metadata": {
|
||
"collapsed": false
|
||
},
|
||
"outputs": [
|
||
{
|
||
"name": "stdout",
|
||
"output_type": "stream",
|
||
"text": [
|
||
"{u'askPrice2': 0.0, u'lastPrice': 2977.0, u'exchange': u'UNKNOWN', u'bidVolume5': 0, u'bidVolume4': 0, u'bidVolume3': 0, u'bidVolume2': 0, u'bidVolume1': 1551, u'datetime': datetime.datetime(2016, 12, 28, 21, 27, 36, 500000), u'askVolume1': 120, u'askVolume3': 0, u'askVolume2': 0, u'askVolume5': 0, u'askVolume4': 0, u'date': u'20161228', u'askPrice5': 0.0, u'volume': 392068, u'lowerLimit': 2725.0, u'bidPrice5': 0.0, u'bidPrice4': 0.0, u'bidPrice1': 2976.0, u'bidPrice3': 0.0, u'bidPrice2': 0.0, u'vtSymbol': u'rb1705', u'time': u'21:27:36.5', u'openInterest': 2304294.0, u'askPrice4': 0.0, u'askPrice3': 0.0, u'symbol': u'rb1705', u'askPrice1': 2977.0, u'upperLimit': 3136.0}\n"
|
||
]
|
||
}
|
||
],
|
||
"source": [
|
||
"client=pymongo.MongoClient()\n",
|
||
"data=client['VnTrader_Tick_Db']['rb1705'].find_one({})\n",
|
||
"del data['_id']\n",
|
||
"\n",
|
||
"class InputTick: pass\n",
|
||
"tick=InputTick()\n",
|
||
"tick.__dict__.update(data)\n",
|
||
"print tick.__dict__"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {},
|
||
"source": [
|
||
"## 测试原版函数性能"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 4,
|
||
"metadata": {
|
||
"collapsed": false
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"def profiling(count,func=None):\n",
|
||
" if func==None: func=lambda: procecssTickEvent(tick)\n",
|
||
" t0=gtime.time()\n",
|
||
" for i in range(count):\n",
|
||
" func()\n",
|
||
" total_time=(gtime.time()-t0)\n",
|
||
" return total_time*1000/count\n",
|
||
"\n",
|
||
"test_count=10000"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 5,
|
||
"metadata": {
|
||
"collapsed": false
|
||
},
|
||
"outputs": [
|
||
{
|
||
"name": "stdout",
|
||
"output_type": "stream",
|
||
"text": [
|
||
"原版不保存数据到mongodb单次耗时:0.0255\n",
|
||
"原版含保存数据到mongodb单次耗时:0.2334\n"
|
||
]
|
||
}
|
||
],
|
||
"source": [
|
||
"original_nodb=profiling(test_count)\n",
|
||
"client.drop_database(TICK_DB_NAME)\n",
|
||
"original_db=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))\n",
|
||
"print '原版不保存数据到mongodb单次耗时:%.4f' %original_nodb\n",
|
||
"print '原版含保存数据到mongodb单次耗时:%.4f' %original_db"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {
|
||
"collapsed": false
|
||
},
|
||
"source": [
|
||
"##改进版本\n",
|
||
"\n",
|
||
"原版程序使用CTP接口保存期货数据时,存在几个问题:\n",
|
||
"- 非交易时间收到的野数据没有被过滤掉\n",
|
||
"- 当前各交易所提供的date字段混乱,有的使用真实日期,有的使用交易日,导致计算的datetime字段也是有问题的\n",
|
||
"\n",
|
||
"针对以上问题的改进版本如下:"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 6,
|
||
"metadata": {
|
||
"collapsed": false
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"#过滤掉的时间区间,注意集合竞价tick被过滤了。\n",
|
||
"invalid_sections=[(time(2,30,59),time(9,0,0)),\n",
|
||
" (time(11,30,59),time(13,0,0)),\n",
|
||
" (time(15,15,0),time(21,0,0))]\n",
|
||
"\n",
|
||
"#本地时间在此区间时对收到的Tick数据不处理,避免有时期货公司会抽风把数据重推一次。\n",
|
||
"invalid_local_section=(time(5,0,0),time(8,30,0))\n",
|
||
"\n",
|
||
"def procecssTickEvent(tick, insertDB=False):\n",
|
||
" \"\"\"处理行情推送\"\"\"\n",
|
||
" # 1. 本地时间检查\n",
|
||
" local_datetime=datetime.now()\n",
|
||
" local_time=local_datetime.time()\n",
|
||
" if local_time>invalid_local_section[0] and local_time<invalid_local_section[1]:\n",
|
||
" return\n",
|
||
"\n",
|
||
" # 2. 转化Tick格式\n",
|
||
" drTick = DrTickData()\n",
|
||
" d = drTick.__dict__\n",
|
||
" for key in d.keys():\n",
|
||
" if key != 'datetime':\n",
|
||
" d[key] = tick.__dict__[key]\n",
|
||
"\n",
|
||
" #防御时间格式变为 ”9:00:00.5\"\n",
|
||
" if tick.time[2] != ':': \n",
|
||
" tick.time = '0' + tick.time\n",
|
||
" \n",
|
||
" tick_hour = int(tick.time[0:2]) \n",
|
||
" local_hour = local_time.hour\n",
|
||
" real_date=local_datetime\n",
|
||
" if tick_hour == 23 and local_hour == 0:#行情时间慢于系统时间\n",
|
||
" real_date+=timedelta(-1)\n",
|
||
" elif tick_hour == 0 and local_hour == 23:#系统时间慢于行情时间\n",
|
||
" real_date+=timedelta(1)\n",
|
||
"\n",
|
||
" tick.time = tick.time.ljust(12,'0')\n",
|
||
" drTick.datetime = datetime(real_date.year,real_date.month,real_date.day,\n",
|
||
" int(tick.time[0:2]), int(tick.time[3:5]), int(tick.time[6:8]), int(tick.time[9:12])*1000)\n",
|
||
"\n",
|
||
" tmpTime=drTick.datetime.time()\n",
|
||
" for sec in invalid_sections:\n",
|
||
" if tmpTime>sec[0] and tmpTime<sec[1]:\n",
|
||
" return\n",
|
||
" \n",
|
||
" # 3. 更新Tick数据\n",
|
||
" if insertDB:\n",
|
||
" insertData(TICK_DB_NAME, tick.vtSymbol, drTick) \n",
|
||
"\n",
|
||
"procecssTickEvent(tick)"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 7,
|
||
"metadata": {
|
||
"collapsed": false
|
||
},
|
||
"outputs": [
|
||
{
|
||
"name": "stdout",
|
||
"output_type": "stream",
|
||
"text": [
|
||
"新版不保存数据到mongodb单次耗时:0.0255\n",
|
||
"新版含保存数据到mongodb单次耗时:0.2334\n"
|
||
]
|
||
}
|
||
],
|
||
"source": [
|
||
"new_nodb=profiling(test_count)\n",
|
||
"client.drop_database(TICK_DB_NAME)\n",
|
||
"new_db=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))\n",
|
||
"print '新版不保存数据到mongodb单次耗时:%.4f' %original_nodb\n",
|
||
"print '新版含保存数据到mongodb单次耗时:%.4f' %original_db"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {},
|
||
"source": [
|
||
"## 保存为文本文件效率"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 8,
|
||
"metadata": {
|
||
"collapsed": true
|
||
},
|
||
"outputs": [],
|
||
"source": [
|
||
"def insertData(db,collection,data):\n",
|
||
" for key in data.__dict__:\n",
|
||
" fout.write(str(data.__dict__[key])+',')"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 9,
|
||
"metadata": {
|
||
"collapsed": false
|
||
},
|
||
"outputs": [
|
||
{
|
||
"name": "stdout",
|
||
"output_type": "stream",
|
||
"text": [
|
||
"新版含保存数据到text file单次耗时:0.2334\n"
|
||
]
|
||
}
|
||
],
|
||
"source": [
|
||
"fout=open('D:/test.txt','w')\n",
|
||
"new_db_text=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))\n",
|
||
"print '新版含保存数据到text file单次耗时:%.4f' %original_db\n",
|
||
"fout.close()"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {},
|
||
"source": [
|
||
"## 时间类型转化效率\n",
|
||
"\n",
|
||
"注意到不保存数据到数据的版本中,新版相比老版耗时反而降低了,这主要是由于时间转化函数的改写。\n",
|
||
"\n",
|
||
"如下三种时间转化方法效率差别巨大:"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 10,
|
||
"metadata": {
|
||
"collapsed": false
|
||
},
|
||
"outputs": [
|
||
{
|
||
"name": "stdout",
|
||
"output_type": "stream",
|
||
"text": [
|
||
"转化方法1耗时:0.0560\n",
|
||
"转化方法2耗时:0.0122\n",
|
||
"转化方法3耗时:0.0032\n"
|
||
]
|
||
}
|
||
],
|
||
"source": [
|
||
"time_convert1=profiling(10000,lambda:parse('20161212 21:21:21.5'))\n",
|
||
"time_convert2=profiling(10000,lambda:datetime.strptime('20161212 21:21:21.5', '%Y%m%d %H:%M:%S.%f'))\n",
|
||
"def customized_parse(s):\n",
|
||
" s=s.ljust(21,'0')\n",
|
||
" return datetime(int(s[0:4]),int(s[4:6]),int(s[6:8]),int(s[9:11]), int(s[12:14]), int(s[15:17]), int(s[18:21])*1000 )\n",
|
||
"time_convert3=profiling(10000,lambda:customized_parse('20161212 21:21:21.5')) \n",
|
||
"print '转化方法1耗时:%.4f' %time_convert1\n",
|
||
"print '转化方法2耗时:%.4f' %time_convert2\n",
|
||
"print '转化方法3耗时:%.4f' %time_convert3"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {
|
||
"collapsed": false
|
||
},
|
||
"source": [
|
||
"#总结"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "code",
|
||
"execution_count": 11,
|
||
"metadata": {
|
||
"collapsed": false
|
||
},
|
||
"outputs": [
|
||
{
|
||
"data": {
|
||
"text/html": [
|
||
"<div>\n",
|
||
"<table border=\"1\" class=\"dataframe\">\n",
|
||
" <thead>\n",
|
||
" <tr style=\"text-align: right;\">\n",
|
||
" <th></th>\n",
|
||
" <th>mongodb写入</th>\n",
|
||
" <th>text文件写入</th>\n",
|
||
" <th>无数据写入</th>\n",
|
||
" </tr>\n",
|
||
" </thead>\n",
|
||
" <tbody>\n",
|
||
" <tr>\n",
|
||
" <th>原版</th>\n",
|
||
" <td>0.2334</td>\n",
|
||
" <td>NaN</td>\n",
|
||
" <td>0.0255</td>\n",
|
||
" </tr>\n",
|
||
" <tr>\n",
|
||
" <th>新版</th>\n",
|
||
" <td>0.2174</td>\n",
|
||
" <td>0.0362</td>\n",
|
||
" <td>0.0160</td>\n",
|
||
" </tr>\n",
|
||
" </tbody>\n",
|
||
"</table>\n",
|
||
"</div>"
|
||
],
|
||
"text/plain": [
|
||
" mongodb写入 text文件写入 无数据写入\n",
|
||
"原版 0.2334 NaN 0.0255\n",
|
||
"新版 0.2174 0.0362 0.0160"
|
||
]
|
||
},
|
||
"execution_count": 11,
|
||
"metadata": {},
|
||
"output_type": "execute_result"
|
||
}
|
||
],
|
||
"source": [
|
||
"import pandas as pd\n",
|
||
"df=pd.DataFrame([{u'无数据写入':original_nodb,u'mongodb写入':original_db},\n",
|
||
" {u'无数据写入': new_nodb, u'mongodb写入': new_db, u'text文件写入':new_db_text}\n",
|
||
" ],index=['原版','新版'])\n",
|
||
"df"
|
||
]
|
||
},
|
||
{
|
||
"cell_type": "markdown",
|
||
"metadata": {},
|
||
"source": [
|
||
"总的来看,行情数据落地原版与新版一次落地耗时都为0.2ms左右。函数中,耗时主要来源于mongodb的插入,占约为90%的耗时。通过尝试简单的text写入作为数据存储方式,耗时得到了大幅降低,获得了单次0.04ms耗时的效果,采取其它更高效的格式有望进一步降低耗时。但考虑到无数据写入时的耗时为约0.02ms,所以期望的最优耗时也就在0.02ms左右。\n",
|
||
"\n",
|
||
"总的来说,基于mongodb的方案能够实时存储的条目数在每秒几百条量级;进一步优化可能达到几千条量级。此水平应该己能满足绝大多数的需求。"
|
||
]
|
||
}
|
||
],
|
||
"metadata": {
|
||
"kernelspec": {
|
||
"display_name": "Python 2",
|
||
"language": "python",
|
||
"name": "python2"
|
||
},
|
||
"language_info": {
|
||
"codemirror_mode": {
|
||
"name": "ipython",
|
||
"version": 2
|
||
},
|
||
"file_extension": ".py",
|
||
"mimetype": "text/x-python",
|
||
"name": "python",
|
||
"nbconvert_exporter": "python",
|
||
"pygments_lexer": "ipython2",
|
||
"version": "2.7.10"
|
||
}
|
||
},
|
||
"nbformat": 4,
|
||
"nbformat_minor": 0
|
||
}
|