{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Performance testing and optimization of market-data reception in vnpy" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "by Jerry He, 2016.12,\n", "Discussion: https://zhuanlan.zhihu.com/p/24662087" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The quantitative trading platform vnpy is open source, powerful, easy to develop with, and highly customizable, and it is now widely used in quantitative trading.\n", "Persisting market data is a basic problem that every quantitative trading platform has to solve. It serves two purposes: first, it supplies data for analysis and backtesting during strategy development; second, it provides recent historical data to live trading programs. The first need can be met with traditional, more efficient implementations (for example, we have a C++ and leveldb based program that receives, forwards, and serves historical market data), or by buying data from a vendor. For the second need, however, persisting recent data directly through vnpy is the simpler approach.\n", "\n", "vnpy includes a market-data recording module, dataRecorder, which already saves tick data and minute-bar data.\n", "This work covers:\n", "- Measuring the performance of vnpy's original recording function\n", "- Correcting and optimizing the original recording function for the CTP interface\n", "\n", "All timings below are in milliseconds.\n", "\n", "Tests were run on Windows 7 with a 3.4 GHz i7." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from datetime import datetime, time\n", "import time as gtime\n", "import pymongo\n", "from dateutil.parser import parse" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Refactoring vnpy's market-data reception code for testing" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true }, "outputs": [], "source": [ "TICK_DB_NAME='Test'\n", "\n", "EMPTY_STRING = ''\n", "EMPTY_UNICODE = u''\n", "EMPTY_INT = 0\n", "EMPTY_FLOAT = 0.0\n", "\n", "class DrTickData(object):\n", "    \"\"\"Tick data\"\"\"\n", "\n", "    #----------------------------------------------------------------------\n", "    def __init__(self):\n", "        \"\"\"Constructor\"\"\"\n", "        self.vtSymbol = EMPTY_STRING      # vt system symbol\n", "        self.symbol = EMPTY_STRING        # contract symbol\n", "        self.exchange = EMPTY_STRING      # exchange code\n", "\n", "        # Trade data\n", "        self.lastPrice = EMPTY_FLOAT      # last traded price\n", "        self.volume = EMPTY_INT           # latest traded volume\n", "        self.openInterest = EMPTY_INT     # open interest\n", "\n", "        self.upperLimit = EMPTY_FLOAT     # upper limit price\n", "        self.lowerLimit = EMPTY_FLOAT     # lower limit price\n", "\n", "        # Tick time\n", "        self.date = EMPTY_STRING          # date\n", "        self.time = EMPTY_STRING          # time\n", "        self.datetime = None              # Python datetime object\n", "\n", "        # Five-level quotes\n", "        
self.bidPrice1 = EMPTY_FLOAT\n", "        self.bidPrice2 = EMPTY_FLOAT\n", "        self.bidPrice3 = EMPTY_FLOAT\n", "        self.bidPrice4 = EMPTY_FLOAT\n", "        self.bidPrice5 = EMPTY_FLOAT\n", "\n", "        self.askPrice1 = EMPTY_FLOAT\n", "        self.askPrice2 = EMPTY_FLOAT\n", "        self.askPrice3 = EMPTY_FLOAT\n", "        self.askPrice4 = EMPTY_FLOAT\n", "        self.askPrice5 = EMPTY_FLOAT\n", "\n", "        self.bidVolume1 = EMPTY_INT\n", "        self.bidVolume2 = EMPTY_INT\n", "        self.bidVolume3 = EMPTY_INT\n", "        self.bidVolume4 = EMPTY_INT\n", "        self.bidVolume5 = EMPTY_INT\n", "\n", "        self.askVolume1 = EMPTY_INT\n", "        self.askVolume2 = EMPTY_INT\n", "        self.askVolume3 = EMPTY_INT\n", "        self.askVolume4 = EMPTY_INT\n", "        self.askVolume5 = EMPTY_INT\n", "\n", "def insertData(db,collection,data):\n", "    # `client` is the module-level MongoClient created in a later cell\n", "    client[db][collection].insert(data.__dict__)\n", "\n", "def procecssTickEvent(tick, insertDB=False):\n", "    \"\"\"Handle a pushed tick event\"\"\"\n", "    vtSymbol = tick.vtSymbol\n", "\n", "    # Convert the tick format\n", "    drTick = DrTickData()\n", "    d = drTick.__dict__\n", "    for key in d.keys():\n", "        if key != 'datetime':\n", "            d[key] = tick.__dict__[key]\n", "    drTick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')\n", "\n", "    # Save the tick data\n", "    if insertDB:\n", "        insertData(TICK_DB_NAME, vtSymbol, drTick)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating a tick for testing" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{u'askPrice2': 0.0, u'lastPrice': 2977.0, u'exchange': u'UNKNOWN', u'bidVolume5': 0, u'bidVolume4': 0, u'bidVolume3': 0, u'bidVolume2': 0, u'bidVolume1': 1551, u'datetime': datetime.datetime(2016, 12, 28, 21, 27, 36, 500000), u'askVolume1': 120, u'askVolume3': 0, u'askVolume2': 0, u'askVolume5': 0, u'askVolume4': 0, u'date': u'20161228', u'askPrice5': 0.0, u'volume': 392068, u'lowerLimit': 2725.0, u'bidPrice5': 0.0, u'bidPrice4': 0.0, u'bidPrice1': 2976.0, u'bidPrice3': 0.0, 
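u'bidPrice2': 0.0, u'vtSymbol': u'rb1705', u'time': u'21:27:36.5', u'openInterest': 2304294.0, u'askPrice4': 0.0, u'askPrice3': 0.0, u'symbol': u'rb1705', u'askPrice1': 2977.0, u'upperLimit': 3136.0}\n" ] } ], "source": [ "client=pymongo.MongoClient()\n", "data=client['VnTrader_Tick_Db']['rb1705'].find_one({})\n", "del data['_id']\n", "\n", "class InputTick: pass\n", "tick=InputTick()\n", "tick.__dict__.update(data)\n", "print tick.__dict__" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Benchmarking the original function" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def profiling(count,func=None):\n", "    # Returns the average time per call, in milliseconds\n", "    if func is None: func=lambda: procecssTickEvent(tick)\n", "    t0=gtime.time()\n", "    for i in range(count):\n", "        func()\n", "    total_time=(gtime.time()-t0)\n", "    return total_time*1000/count\n", "\n", "test_count=10000" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Original, not saving to mongodb, time per call: 0.0255\n", "Original, saving to mongodb, time per call: 0.2334\n" ] } ], "source": [ "original_nodb=profiling(test_count)\n", "client.drop_database(TICK_DB_NAME)\n", "original_db=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))\n", "print 'Original, not saving to mongodb, time per call: %.4f' %original_nodb\n", "print 'Original, saving to mongodb, time per call: %.4f' %original_db" ] }, { "cell_type": "markdown", "metadata": { "collapsed": false }, "source": [ "## Improved version\n", "\n", "When the original program saves futures data through the CTP interface, it has several problems:\n", "- Stray data received outside trading hours is not filtered out\n", "- The date field is inconsistent across exchanges: some send the actual calendar date, others the trading day, so the computed datetime field is also unreliable\n", "\n", "The improved version that addresses these problems is as follows:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Time sections to filter out. Note that call-auction ticks are filtered as well.\n", "invalid_sections=[(time(2,30,59),time(9,0,0)),\n", "                  (time(11,30,59),time(13,0,0)),\n", "                  (time(15,15,0),time(21,0,0))]\n", "\n", "# Ticks received while the local time is in this section are ignored, because the futures broker sometimes glitches and pushes the data a second time.\n", 
"invalid_local_section=(time(5,0,0),time(8,30,0))\n", "\n", "def procecssTickEvent(tick, insertDB=False):\n", "    \"\"\"Handle a pushed tick event\"\"\"\n", "    # 1. Local time check\n", "    local_datetime=datetime.now()\n", "    local_time=local_datetime.time()\n", "    if local_time>invalid_local_section[0] and local_time<invalid_local_section[1]:\n", "        return\n", "    # [The rest of this function, and the cells that benchmarked the new version (new_nodb, new_db, new_db_text), are missing from the source. The surviving fragment compares tmpTime with sec[0].]" ] }, 
{ "cell_type": "code", "execution_count": 11, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "<div>\n", "<table border=\"1\" class=\"dataframe\">\n", "  <thead>\n", "    <tr style=\"text-align: right;\">\n", "      <th></th>\n", "      <th>mongodb write</th>\n", "      <th>text file write</th>\n", "      <th>without data write</th>\n", "    </tr>\n", "  </thead>\n", "  <tbody>\n", "    <tr>\n", "      <th>original</th>\n", "      <td>0.2334</td>\n", "      <td>NaN</td>\n", "      <td>0.0255</td>\n", "    </tr>\n", "    <tr>\n", "      <th>new</th>\n", "      <td>0.2174</td>\n", "      <td>0.0362</td>\n", "      <td>0.0160</td>\n", "    </tr>\n", "  </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "          mongodb write  text file write  without data write\n", "original         0.2334              NaN              0.0255\n", "new              0.2174           0.0362              0.0160" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import pandas as pd\n", "df=pd.DataFrame([{u'without data write':original_nodb,u'mongodb write':original_db},\n", "                 {u'without data write': new_nodb, u'mongodb write': new_db, u'text file write':new_db_text}\n", "                ],index=['original','new'])\n", "df" ] }, 
{ "cell_type": "markdown", "metadata": {}, "source": [ "Overall, both the original and the new version take about 0.2 ms to persist one tick. Most of that time is the mongodb insert, which accounts for roughly 90% of the total. Switching to a simple text-file write cuts the time substantially, to about 0.04 ms per tick, and a more efficient format could reduce it further. However, since the function already takes about 0.02 ms with no write at all, the best achievable time is around 0.02 ms.\n", "\n", "In summary, the mongodb-based approach can store on the order of hundreds of ticks per second in real time, and further optimization could reach the order of thousands. That level should already satisfy the vast majority of needs." ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.10" } }, "nbformat": 4, "nbformat_minor": 0 }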