From 6d3135ce055db244e56b48c197170025315fcdac Mon Sep 17 00:00:00 2001 From: Jerry Date: Mon, 16 Jan 2017 23:06:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0vn.py=20tick=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E4=BF=9D=E5=AD=98=E6=80=A7=E8=83=BD=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E5=92=8C=E5=AE=8C=E5=96=84=E6=94=B9=E8=BF=9B=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Performance of Receiving Tick Data.ipynb | 498 ++++++++++++++++++ 1 file changed, 498 insertions(+) create mode 100644 vn.how/performance/Performance of Receiving Tick Data.ipynb diff --git a/vn.how/performance/Performance of Receiving Tick Data.ipynb b/vn.how/performance/Performance of Receiving Tick Data.ipynb new file mode 100644 index 00000000..da78784a --- /dev/null +++ b/vn.how/performance/Performance of Receiving Tick Data.ipynb @@ -0,0 +1,498 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#vnpy接收行情数据性能测试与改进优化" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "by Jerry He, 2016.12,\n", + "讨论:https://zhuanlan.zhihu.com/p/24662087" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "近来,量化交易平台vnpy因其开源、功能强大、开发容易、可定制性强的特点,目前已经被广泛应用在量化交易中。\n", + "行情数据落地是量化交易平台必须解决的一个基础问题,它有两个方面的作用:一是供策略开发时进行分析、回测;二是为实盘程序时提供近期的历史数据。前者可以通过传统效率更高的实现方式(比如我们有基于C++和leveldb实现的行情数据接收、转发、历史数据获取程序)实现,也可以通过向数据提供方购买获取。但是对于后者,直接基于vnpy落地近期的数据是更为简易的方式。\n", + "\n", + "vnpy包含行情落地模块dataRecorder,已经实现了tick数据、分钟bar数据保存功能。\n", + "本工作主要包括:\n", + "- vnpy原落地函数的性能考查\n", + "- 针对CTP接口,原落地函数的修正与优化\n", + "\n", + "以下所有性能测试时间单位均为毫秒。\n", + "\n", + "测试基于windows 7, i7 3.4GHz." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from datetime import datetime, time\n", + "import time as gtime\n", + "import pymongo\n", + "from dateutil.parser import parse" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 重构vnpy接收行情数据代码,以用于测试" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "TICK_DB_NAME='Test'\n", + "\n", + "EMPTY_STRING = ''\n", + "EMPTY_UNICODE = u''\n", + "EMPTY_INT = 0\n", + "EMPTY_FLOAT = 0.0\n", + "\n", + "class DrTickData(object):\n", + " \"\"\"Tick数据\"\"\"\n", + "\n", + " #----------------------------------------------------------------------\n", + " def __init__(self):\n", + " \"\"\"Constructor\"\"\" \n", + " self.vtSymbol = EMPTY_STRING # vt系统代码\n", + " self.symbol = EMPTY_STRING # 合约代码\n", + " self.exchange = EMPTY_STRING # 交易所代码\n", + "\n", + " # 成交数据\n", + " self.lastPrice = EMPTY_FLOAT # 最新成交价\n", + " self.volume = EMPTY_INT # 最新成交量\n", + " self.openInterest = EMPTY_INT # 持仓量\n", + " \n", + " self.upperLimit = EMPTY_FLOAT # 涨停价\n", + " self.lowerLimit = EMPTY_FLOAT # 跌停价\n", + " \n", + " # tick的时间\n", + " self.date = EMPTY_STRING # 日期\n", + " self.time = EMPTY_STRING # 时间\n", + " self.datetime = None # python的datetime时间对象\n", + " \n", + " # 五档行情\n", + " self.bidPrice1 = EMPTY_FLOAT\n", + " self.bidPrice2 = EMPTY_FLOAT\n", + " self.bidPrice3 = EMPTY_FLOAT\n", + " self.bidPrice4 = EMPTY_FLOAT\n", + " self.bidPrice5 = EMPTY_FLOAT\n", + " \n", + " self.askPrice1 = EMPTY_FLOAT\n", + " self.askPrice2 = EMPTY_FLOAT\n", + " self.askPrice3 = EMPTY_FLOAT\n", + " self.askPrice4 = EMPTY_FLOAT\n", + " self.askPrice5 = EMPTY_FLOAT \n", + " \n", + " self.bidVolume1 = EMPTY_INT\n", + " self.bidVolume2 = EMPTY_INT\n", + " self.bidVolume3 = EMPTY_INT\n", + " self.bidVolume4 = EMPTY_INT\n", + " self.bidVolume5 = EMPTY_INT\n", + " \n", + " self.askVolume1 = EMPTY_INT\n", + " self.askVolume2 = EMPTY_INT\n", + " self.askVolume3 = EMPTY_INT\n", + " self.askVolume4 = EMPTY_INT\n", + " self.askVolume5 = EMPTY_INT \n", + " \n", + "def insertData(db,collection,data):\n", + " client[db][collection].insert(data.__dict__)\n", + "\n", + "def procecssTickEvent(tick, insertDB=False):\n", + " \"\"\"处理行情推送\"\"\"\n", + " vtSymbol = tick.vtSymbol\n", + "\n", + " # 转化Tick格式\n", + " drTick = DrTickData()\n", + " d = drTick.__dict__\n", + " for key in d.keys():\n", + " if key != 'datetime':\n", + " d[key] = tick.__dict__[key]\n", + " drTick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f') \n", + " \n", + " # 更新Tick数据\n", + " if insertDB:\n", + " insertData(TICK_DB_NAME, vtSymbol, drTick) " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 创建一个用于测试的Tick数据" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{u'askPrice2': 0.0, u'lastPrice': 2977.0, u'exchange': u'UNKNOWN', u'bidVolume5': 0, u'bidVolume4': 0, u'bidVolume3': 0, u'bidVolume2': 0, u'bidVolume1': 1551, u'datetime': datetime.datetime(2016, 12, 28, 21, 27, 36, 500000), u'askVolume1': 120, u'askVolume3': 0, u'askVolume2': 0, u'askVolume5': 0, u'askVolume4': 0, u'date': u'20161228', u'askPrice5': 0.0, u'volume': 392068, u'lowerLimit': 2725.0, u'bidPrice5': 0.0, u'bidPrice4': 0.0, u'bidPrice1': 2976.0, u'bidPrice3': 0.0, u'bidPrice2': 0.0, u'vtSymbol': u'rb1705', u'time': u'21:27:36.5', u'openInterest': 2304294.0, u'askPrice4': 0.0, u'askPrice3': 0.0, u'symbol': u'rb1705', u'askPrice1': 2977.0, u'upperLimit': 3136.0}\n" + ] + } + ], + "source": [ + "client=pymongo.MongoClient()\n", + "data=client['VnTrader_Tick_Db']['rb1705'].find_one({})\n", + "del data['_id']\n", + "\n", + "class InputTick: pass\n", + "tick=InputTick()\n", + "tick.__dict__.update(data)\n", + "print tick.__dict__" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 测试原版函数性能" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "def profiling(count,func=None):\n", + " if func==None: func=lambda: procecssTickEvent(tick)\n", + " t0=gtime.time()\n", + " for i in range(count):\n", + " func()\n", + " total_time=(gtime.time()-t0)\n", + " return total_time*1000/count\n", + "\n", + "test_count=10000" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "原版不保存数据到mongodb单次耗时:0.0255\n", + "原版含保存数据到mongodb单次耗时:0.2334\n" + ] + } + ], + "source": [ + "original_nodb=profiling(test_count)\n", + "client.drop_database(TICK_DB_NAME)\n", + "original_db=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))\n", + "print '原版不保存数据到mongodb单次耗时:%.4f' %original_nodb\n", + "print '原版含保存数据到mongodb单次耗时:%.4f' %original_db" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false + }, + "source": [ + "##改进版本\n", + "\n", + "原版程序使用CTP接口保存期货数据时,存在几个问题:\n", + "- 非交易时间收到的野数据没有被过滤掉\n", + "- 当前各交易所提供的date字段混乱,有的使用真实日期,有的使用交易日,导致计算的datetime字段也是有问题的\n", + "\n", + "针对以上问题的改进版本如下:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "#过滤掉的时间区间,注意集合竞价tick被过滤了。\n", + "invalid_sections=[(time(2,30,59),time(9,0,0)),\n", + " (time(11,30,59),time(13,0,0)),\n", + " (time(15,15,0),time(21,0,0))]\n", + "\n", + "#本地时间在此区间时对收到的Tick数据不处理,避免有时期货公司会抽风把数据重推一次。\n", + "invalid_local_section=(time(5,0,0),time(8,30,0))\n", + "\n", + "def procecssTickEvent(tick, insertDB=False):\n", + " \"\"\"处理行情推送\"\"\"\n", + " # 1. 本地时间检查\n", + " local_datetime=datetime.now()\n", + " local_time=local_datetime.time()\n", + " if local_time>invalid_local_section[0] and local_timesec[0] and tmpTime\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
mongodb写入text文件写入无数据写入
原版0.2334NaN0.0255
新版0.21740.03620.0160
\n", + "" + ], + "text/plain": [ + " mongodb写入 text文件写入 无数据写入\n", + "原版 0.2334 NaN 0.0255\n", + "新版 0.2174 0.0362 0.0160" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import pandas as pd\n", + "df=pd.DataFrame([{u'无数据写入':original_nodb,u'mongodb写入':original_db},\n", + " {u'无数据写入': new_nodb, u'mongodb写入': new_db, u'text文件写入':new_db_text}\n", + " ],index=['原版','新版'])\n", + "df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "总的来看,行情数据落地原版与新版一次落地耗时都为0.2ms左右。函数中,耗时主要来源于mongodb的插入,占约为90%的耗时。通过尝试简单的text写入作为数据存储方式,耗时得到了大幅降低,获得了单次0.04ms耗时的效果,采取其它更高效的格式有望进一步降低耗时。但考虑到无数据写入时的耗时为约0.02ms,所以期望的最优耗时也就在0.02ms左右。\n", + "\n", + "总的来说,基于mongodb的方案能够实时存储的条目数在每秒几百条量级;进一步优化可能达到几千条量级。此水平应该己能满足绝大多数的需求。" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 2", + "language": "python", + "name": "python2" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.10" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +}