Merge pull request #629 from vnpy/dev

Dev
This commit is contained in:
vn.py 2017-12-08 15:09:46 +08:00 committed by GitHub
commit f1a7856022
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 260 additions and 1395 deletions

View File

@ -11,7 +11,7 @@ vn.py是基于Python的开源量化交易程序开发框架起源于国内私
### 项目结构
1. 丰富的Python交易和数据API接口vnpy.api基本覆盖了国内外所有常规交易品种股票、期货、期权、外汇、外盘、比特币具体包括
1. 丰富的Python交易API接口vnpy.api基本覆盖了国内外所有常规交易品种股票、期货、期权、外汇、外盘、比特币具体包括
- CTPctp
@ -41,9 +41,7 @@ vn.py是基于Python的开源量化交易程序开发框架起源于国内私
- 火币huobi
- 链行lhang
- 通联数据datayes
- LBanklbank
2. 简洁易用的事件驱动引擎vnpy.event作为事件驱动型交易程序的核心
@ -53,7 +51,7 @@ vn.py是基于Python的开源量化交易程序开发框架起源于国内私
* 同时登录多个交易接口,在一套界面上监控多种市场的行情和多种资产账户的资金、持仓、委托、成交情况
* 支持跨市场套利CTP期货和LTS证券、境内外套利CTP期货和IB外盘、多市场数据整合实时预测走势CTP的股指期货数据、IB的外盘A50数据、Wind的行业指数数据等策略应用
* 支持跨市场套利CTP期货和XTP证券、境内外套利CTP期货和IB外盘、多市场数据整合实时预测走势CTP的股指期货数据、IB的外盘A50数据、Wind的行业指数数据等策略应用
* CTA策略引擎模块在保持易用性的同时允许用户针对CTA类策略运行过程中委托的报撤行为进行细粒度控制降低交易滑点、实现高频策略
@ -148,10 +146,11 @@ from vnpy.trader.uiMainWindow import MainWindow
# 加载底层接口
from vnpy.trader.gateway import (ctpGateway, oandaGateway, ibGateway,
huobiGateway, okcoinGateway)
tkproGateway)
if system == 'Windows':
from vnpy.trader.gateway import femasGateway, xspeedGateway
from vnpy.trader.gateway import (femasGateway, xspeedGateway,
futuGateway, secGateway)
if system == 'Linux':
from vnpy.trader.gateway import xtpGateway
@ -174,14 +173,15 @@ def main():
# 添加交易接口
me.addGateway(ctpGateway)
me.addGateway(tkproGateway)
me.addGateway(oandaGateway)
me.addGateway(ibGateway)
me.addGateway(huobiGateway)
me.addGateway(okcoinGateway)
if system == 'Windows':
me.addGateway(femasGateway)
me.addGateway(xspeedGateway)
me.addGateway(secGateway)
me.addGateway(futuGateway)
if system == 'Linux':
me.addGateway(xtpGateway)

View File

@ -0,0 +1,7 @@
{
"brokerID": "2001",
"mdAddress": "tcp://125.64.36.26:51213",
"tdAddress": "tcp://125.64.36.26:51205",
"userID": "请联系经纪商申请",
"password": "请联系经纪商申请"
}

View File

@ -1,7 +1,7 @@
{
"brokerID": "9999",
"mdAddress": "tcp://180.168.146.187:10011",
"tdAddress": "tcp://180.168.146.187:10001",
"userID": "simnow申请",
"password": "simnow申请"
"brokerID": "2001",
"mdAddress": "tcp://125.64.36.26:51213",
"tdAddress": "tcp://125.64.36.26:51205",
"userID": "请联系经纪商申请",
"password": "请联系经纪商申请"
}

View File

@ -0,0 +1,10 @@
# encoding: UTF-8
from vnpy.trader import vtConstant
from vnpy.trader.gateway.ctpGateway import CtpGateway
gatewayClass = CtpGateway
gatewayName = 'CTPSEC'
gatewayDisplayName = 'CTP证券'
gatewayType = vtConstant.GATEWAYTYPE_FUTURES
gatewayQryEnabled = True

View File

@ -1 +0,0 @@
tBr8RMewRABqiW7HBOkrkg==

View File

@ -18,6 +18,8 @@ from vnpy.trader.uiMainWindow import MainWindow
# 加载底层接口
from vnpy.trader.gateway import (secGateway, ctpGateway)
import ctpsecGateway
# 加载上层应用
from vnpy.trader.app import (riskManager, optionMaster)
@ -37,6 +39,7 @@ def main():
# 添加交易接口
me.addGateway(secGateway)
me.addGateway(ctpGateway)
me.addGateway(ctpsecGateway)
# 添加上层应用
me.addApp(riskManager)

View File

@ -0,0 +1,3 @@
# quantOS历史行情服务
请在[www.quantos.org](www.quantos.org)注册后将用户名和TOKEN输入到配置文件config.json中即可使用该服务。

View File

@ -0,0 +1,10 @@
{
"MONGO_HOST": "localhost",
"MONGO_PORT": 27017,
"DATA_SERVER": "tcp://data.tushare.org:8910",
"USERNAME": "",
"TOKEN": "",
"SYMBOLS": ["510050.SSE", "510300.SSE"]
}

View File

@ -0,0 +1,121 @@
# encoding: UTF-8
import sys
import json
from datetime import datetime, timedelta
from time import time, sleep
from pymongo import MongoClient, ASCENDING
from vnpy.trader.vtObject import VtBarData
from vnpy.trader.app.ctaStrategy.ctaBase import MINUTE_DB_NAME
from vnpy.trader.gateway.tkproGateway.DataApi import DataApi
# 交易所类型映射
exchangeMap = {}
exchangeMap['CFFEX'] = 'CFE'
exchangeMap['SHFE'] = 'SHF'
exchangeMap['CZCE'] = 'CZC'
exchangeMap['DCE'] = 'DCE'
exchangeMap['SSE'] = 'SH'
exchangeMap['SZSE'] = 'SZ'
exchangeMapReverse = {v:k for k,v in exchangeMap.items()}
# 加载配置
config = open('config.json')
setting = json.load(config)
config.close()
MONGO_HOST = setting['MONGO_HOST']
MONGO_PORT = setting['MONGO_PORT']
SYMBOLS = setting['SYMBOLS']
USERNAME = setting['USERNAME']
TOKEN = setting['TOKEN']
DATA_SERVER = setting['DATA_SERVER']
# 创建API对象
mc = MongoClient(MONGO_HOST, MONGO_PORT) # Mongo连接
db = mc[MINUTE_DB_NAME] # 数据库
#----------------------------------------------------------------------
def generateVtBar(row):
"""生成K线"""
bar = VtBarData()
symbol, exchange = row['symbol'].split('.')
bar.symbol = symbol
bar.exchange = exchangeMapReverse[exchange]
bar.vtSymbol = '.'.join([bar.symbol, bar.exchange])
bar.open = row['open']
bar.high = row['high']
bar.low = row['low']
bar.close = row['close']
bar.volume = row['volume']
bar.date = str(row['trade_date'])
bar.time = str(row['time'])
bar.datetime = datetime.strptime(' '.join([bar.date, bar.time]), '%Y%m%d %H%M%S')
return bar
#----------------------------------------------------------------------
def downMinuteBarBySymbol(api, vtSymbol, startDate):
"""下载某一合约的分钟线数据"""
start = time()
cl = db[vtSymbol]
cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引
dt = datetime.strptime(startDate, '%Y%m%d')
today = datetime.now()
delta = timedelta(1)
code, exchange = vtSymbol.split('.')
symbol = '.'.join([code, exchangeMap[exchange]])
while dt <= today:
d = int(dt.strftime('%Y%m%d'))
df, msg = api.bar(symbol, freq='1M', trade_date=d)
dt += delta
if df is None:
continue
for ix, row in df.iterrows():
bar = generateVtBar(row)
d = bar.__dict__
flt = {'datetime': bar.datetime}
cl.replace_one(flt, d, True)
end = time()
cost = (end - start) * 1000
print u'合约%s数据下载完成%s - %s,耗时%s毫秒' %(vtSymbol, startDate, today.strftime('%Y%m%d'), cost)
#----------------------------------------------------------------------
def downloadAllMinuteBar(api):
"""下载所有配置中的合约的分钟线数据"""
print '-' * 50
print u'开始下载合约分钟线数据'
print '-' * 50
startDt = datetime.today() - 10 * timedelta(1)
startDate = startDt.strftime('%Y%m%d')
# 添加下载任务
for symbol in SYMBOLS:
downMinuteBarBySymbol(api, str(symbol), startDate)
print '-' * 50
print u'合约分钟线数据下载完成'
print '-' * 50
if __name__ == '__main__':
downloadAllMinuteBar()

View File

@ -0,0 +1,22 @@
# encoding: UTF-8
"""
立即下载数据到数据库中用于手动执行更新操作
"""
import json
from vnpy.trader.gateway.tkproGateway.DataApi import DataApi
from dataService import *
if __name__ == '__main__':
# 创建API对象
api = DataApi(DATA_SERVER)
info, msg = api.login(USERNAME, TOKEN)
if not info:
print u'数据服务器登录失败,原因:%s' %msg
# 下载数据
downloadAllMinuteBar(api)

View File

@ -0,0 +1,39 @@
# encoding: UTF-8
"""
定时服务可无人值守运行实现每日自动下载更新历史行情数据到数据库中
"""
import datetime as ddt
from dataService import *
if __name__ == '__main__':
taskCompletedDate = None
# 生成一个随机的任务下载时间,用于避免所有用户在同一时间访问数据服务器
taskTime = ddt.time(hour=17, minute=0)
# 进入主循环
while True:
t = ddt.datetime.now()
# 每天到达任务下载时间后,执行数据下载的操作
if t.time() > taskTime and (taskCompletedDate is None or t.date() != taskCompletedDate):
# 创建API对象
api = DataApi(DATA_SERVER)
info, msg = api.login(USERNAME, TOKEN)
if not info:
print u'数据服务器登录失败,原因:%s' %msg
# 下载数据
downloadAllMinuteBar(api)
# 更新任务完成的日期
taskCompletedDate = t.date()
else:
print u'当前时间%s,任务定时%s' %(t, taskTime)
sleep(60)

View File

@ -4,6 +4,8 @@
* VnTrader最常用的vn.py图形交易界面
* OptionMaster: 期权量化交易系统
* DataRecording全自动行情记录工具无需用户每日定时重启
* CtaTrading无图形界面模式的CTA策略交易
@ -18,4 +20,6 @@
* TushareDataServiceTuShare历史行情服务A股
* FutuDataService富途证券历史行情服务美股、港股
* FutuDataService富途证券历史行情服务美股、港股
* QuantosDataService: quantOS历史行情服务A股、期货

View File

@ -1,5 +0,0 @@
# vn.py项目的实战应用指南
本文件夹下的内容主要是围绕vn.py在实际交易中的一系列具体应用包括说明文档和代码例子。
* performance[《百倍加速Python量化策略的算法性能提升指南》](http://zhuanlan.zhihu.com/p/24168485)

View File

@ -1,498 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#vnpy接收行情数据性能测试与改进优化"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"by Jerry He, 2016.12,\n",
"讨论https://zhuanlan.zhihu.com/p/24662087"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"近来量化交易平台vnpy因其开源、功能强大、开发容易、可定制性强的特点目前已经被广泛应用在量化交易中。\n",
"行情数据落地是量化交易平台必须解决的一个基础问题它有两个方面的作用一是供策略开发时进行分析、回测二是为实盘程序时提供近期的历史数据。前者可以通过传统效率更高的实现方式比如我们有基于C++和leveldb实现的行情数据接收、转发、历史数据获取程序实现也可以通过向数据提供方购买获取。但是对于后者直接基于vnpy落地近期的数据是更为简易的方式。\n",
"\n",
"vnpy包含行情落地模块dataRecorder已经实现了tick数据、分钟bar数据保存功能。\n",
"本工作主要包括:\n",
"- vnpy原落地函数的性能考查\n",
"- 针对CTP接口原落地函数的修正与优化\n",
"\n",
"以下所有性能测试时间单位均为毫秒。\n",
"\n",
"测试基于windows 7, i7 3.4GHz."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from datetime import datetime, time\n",
"import time as gtime\n",
"import pymongo\n",
"from dateutil.parser import parse"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 重构vnpy接收行情数据代码以用于测试"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"TICK_DB_NAME='Test'\n",
"\n",
"EMPTY_STRING = ''\n",
"EMPTY_UNICODE = u''\n",
"EMPTY_INT = 0\n",
"EMPTY_FLOAT = 0.0\n",
"\n",
"class DrTickData(object):\n",
" \"\"\"Tick数据\"\"\"\n",
"\n",
" #----------------------------------------------------------------------\n",
" def __init__(self):\n",
" \"\"\"Constructor\"\"\" \n",
" self.vtSymbol = EMPTY_STRING # vt系统代码\n",
" self.symbol = EMPTY_STRING # 合约代码\n",
" self.exchange = EMPTY_STRING # 交易所代码\n",
"\n",
" # 成交数据\n",
" self.lastPrice = EMPTY_FLOAT # 最新成交价\n",
" self.volume = EMPTY_INT # 最新成交量\n",
" self.openInterest = EMPTY_INT # 持仓量\n",
" \n",
" self.upperLimit = EMPTY_FLOAT # 涨停价\n",
" self.lowerLimit = EMPTY_FLOAT # 跌停价\n",
" \n",
" # tick的时间\n",
" self.date = EMPTY_STRING # 日期\n",
" self.time = EMPTY_STRING # 时间\n",
" self.datetime = None # python的datetime时间对象\n",
" \n",
" # 五档行情\n",
" self.bidPrice1 = EMPTY_FLOAT\n",
" self.bidPrice2 = EMPTY_FLOAT\n",
" self.bidPrice3 = EMPTY_FLOAT\n",
" self.bidPrice4 = EMPTY_FLOAT\n",
" self.bidPrice5 = EMPTY_FLOAT\n",
" \n",
" self.askPrice1 = EMPTY_FLOAT\n",
" self.askPrice2 = EMPTY_FLOAT\n",
" self.askPrice3 = EMPTY_FLOAT\n",
" self.askPrice4 = EMPTY_FLOAT\n",
" self.askPrice5 = EMPTY_FLOAT \n",
" \n",
" self.bidVolume1 = EMPTY_INT\n",
" self.bidVolume2 = EMPTY_INT\n",
" self.bidVolume3 = EMPTY_INT\n",
" self.bidVolume4 = EMPTY_INT\n",
" self.bidVolume5 = EMPTY_INT\n",
" \n",
" self.askVolume1 = EMPTY_INT\n",
" self.askVolume2 = EMPTY_INT\n",
" self.askVolume3 = EMPTY_INT\n",
" self.askVolume4 = EMPTY_INT\n",
" self.askVolume5 = EMPTY_INT \n",
" \n",
"def insertData(db,collection,data):\n",
" client[db][collection].insert(data.__dict__)\n",
"\n",
"def procecssTickEvent(tick, insertDB=False):\n",
" \"\"\"处理行情推送\"\"\"\n",
" vtSymbol = tick.vtSymbol\n",
"\n",
" # 转化Tick格式\n",
" drTick = DrTickData()\n",
" d = drTick.__dict__\n",
" for key in d.keys():\n",
" if key != 'datetime':\n",
" d[key] = tick.__dict__[key]\n",
" drTick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f') \n",
" \n",
" # 更新Tick数据\n",
" if insertDB:\n",
" insertData(TICK_DB_NAME, vtSymbol, drTick) "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 创建一个用于测试的Tick数据"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{u'askPrice2': 0.0, u'lastPrice': 2977.0, u'exchange': u'UNKNOWN', u'bidVolume5': 0, u'bidVolume4': 0, u'bidVolume3': 0, u'bidVolume2': 0, u'bidVolume1': 1551, u'datetime': datetime.datetime(2016, 12, 28, 21, 27, 36, 500000), u'askVolume1': 120, u'askVolume3': 0, u'askVolume2': 0, u'askVolume5': 0, u'askVolume4': 0, u'date': u'20161228', u'askPrice5': 0.0, u'volume': 392068, u'lowerLimit': 2725.0, u'bidPrice5': 0.0, u'bidPrice4': 0.0, u'bidPrice1': 2976.0, u'bidPrice3': 0.0, u'bidPrice2': 0.0, u'vtSymbol': u'rb1705', u'time': u'21:27:36.5', u'openInterest': 2304294.0, u'askPrice4': 0.0, u'askPrice3': 0.0, u'symbol': u'rb1705', u'askPrice1': 2977.0, u'upperLimit': 3136.0}\n"
]
}
],
"source": [
"client=pymongo.MongoClient()\n",
"data=client['VnTrader_Tick_Db']['rb1705'].find_one({})\n",
"del data['_id']\n",
"\n",
"class InputTick: pass\n",
"tick=InputTick()\n",
"tick.__dict__.update(data)\n",
"print tick.__dict__"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 测试原版函数性能"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def profiling(count,func=None):\n",
" if func==None: func=lambda: procecssTickEvent(tick)\n",
" t0=gtime.time()\n",
" for i in range(count):\n",
" func()\n",
" total_time=(gtime.time()-t0)\n",
" return total_time*1000/count\n",
"\n",
"test_count=10000"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"原版不保存数据到mongodb单次耗时0.0255\n",
"原版含保存数据到mongodb单次耗时0.2334\n"
]
}
],
"source": [
"original_nodb=profiling(test_count)\n",
"client.drop_database(TICK_DB_NAME)\n",
"original_db=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))\n",
"print '原版不保存数据到mongodb单次耗时%.4f' %original_nodb\n",
"print '原版含保存数据到mongodb单次耗时%.4f' %original_db"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"##改进版本\n",
"\n",
"原版程序使用CTP接口保存期货数据时存在几个问题\n",
"- 非交易时间收到的野数据没有被过滤掉\n",
"- 当前各交易所提供的date字段混乱有的使用真实日期有的使用交易日导致计算的datetime字段也是有问题的\n",
"\n",
"针对以上问题的改进版本如下:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"#过滤掉的时间区间注意集合竞价tick被过滤了。\n",
"invalid_sections=[(time(2,30,59),time(9,0,0)),\n",
" (time(11,30,59),time(13,0,0)),\n",
" (time(15,15,0),time(21,0,0))]\n",
"\n",
"#本地时间在此区间时对收到的Tick数据不处理避免有时期货公司会抽风把数据重推一次。\n",
"invalid_local_section=(time(5,0,0),time(8,30,0))\n",
"\n",
"def procecssTickEvent(tick, insertDB=False):\n",
" \"\"\"处理行情推送\"\"\"\n",
" # 1. 本地时间检查\n",
" local_datetime=datetime.now()\n",
" local_time=local_datetime.time()\n",
" if local_time>invalid_local_section[0] and local_time<invalid_local_section[1]:\n",
" return\n",
"\n",
" # 2. 转化Tick格式\n",
" drTick = DrTickData()\n",
" d = drTick.__dict__\n",
" for key in d.keys():\n",
" if key != 'datetime':\n",
" d[key] = tick.__dict__[key]\n",
"\n",
" #防御时间格式变为 ”9:00:00.5\"\n",
" if tick.time[2] != ':': \n",
" tick.time = '0' + tick.time\n",
" \n",
" tick_hour = int(tick.time[0:2]) \n",
" local_hour = local_time.hour\n",
" real_date=local_datetime\n",
" if tick_hour == 23 and local_hour == 0:#行情时间慢于系统时间\n",
" real_date+=timedelta(-1)\n",
" elif tick_hour == 0 and local_hour == 23:#系统时间慢于行情时间\n",
" real_date+=timedelta(1)\n",
"\n",
" tick.time = tick.time.ljust(12,'0')\n",
" drTick.datetime = datetime(real_date.year,real_date.month,real_date.day,\n",
" int(tick.time[0:2]), int(tick.time[3:5]), int(tick.time[6:8]), int(tick.time[9:12])*1000)\n",
"\n",
" tmpTime=drTick.datetime.time()\n",
" for sec in invalid_sections:\n",
" if tmpTime>sec[0] and tmpTime<sec[1]:\n",
" return\n",
" \n",
" # 3. 更新Tick数据\n",
" if insertDB:\n",
" insertData(TICK_DB_NAME, tick.vtSymbol, drTick) \n",
"\n",
"procecssTickEvent(tick)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"新版不保存数据到mongodb单次耗时0.0255\n",
"新版含保存数据到mongodb单次耗时0.2334\n"
]
}
],
"source": [
"new_nodb=profiling(test_count)\n",
"client.drop_database(TICK_DB_NAME)\n",
"new_db=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))\n",
"print '新版不保存数据到mongodb单次耗时%.4f' %original_nodb\n",
"print '新版含保存数据到mongodb单次耗时%.4f' %original_db"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 保存为文本文件效率"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def insertData(db,collection,data):\n",
" for key in data.__dict__:\n",
" fout.write(str(data.__dict__[key])+',')"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"新版含保存数据到text file单次耗时0.2334\n"
]
}
],
"source": [
"fout=open('D:/test.txt','w')\n",
"new_db_text=profiling(test_count,func=lambda: procecssTickEvent(tick,insertDB=True))\n",
"print '新版含保存数据到text file单次耗时%.4f' %original_db\n",
"fout.close()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 时间类型转化效率\n",
"\n",
"注意到不保存数据到数据的版本中,新版相比老版耗时反而降低了,这主要是由于时间转化函数的改写。\n",
"\n",
"如下三种时间转化方法效率差别巨大:"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"转化方法1耗时0.0560\n",
"转化方法2耗时0.0122\n",
"转化方法3耗时0.0032\n"
]
}
],
"source": [
"time_convert1=profiling(10000,lambda:parse('20161212 21:21:21.5'))\n",
"time_convert2=profiling(10000,lambda:datetime.strptime('20161212 21:21:21.5', '%Y%m%d %H:%M:%S.%f'))\n",
"def customized_parse(s):\n",
" s=s.ljust(21,'0')\n",
" return datetime(int(s[0:4]),int(s[4:6]),int(s[6:8]),int(s[9:11]), int(s[12:14]), int(s[15:17]), int(s[18:21])*1000 )\n",
"time_convert3=profiling(10000,lambda:customized_parse('20161212 21:21:21.5')) \n",
"print '转化方法1耗时%.4f' %time_convert1\n",
"print '转化方法2耗时%.4f' %time_convert2\n",
"print '转化方法3耗时%.4f' %time_convert3"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"#总结"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>mongodb写入</th>\n",
" <th>text文件写入</th>\n",
" <th>无数据写入</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>原版</th>\n",
" <td>0.2334</td>\n",
" <td>NaN</td>\n",
" <td>0.0255</td>\n",
" </tr>\n",
" <tr>\n",
" <th>新版</th>\n",
" <td>0.2174</td>\n",
" <td>0.0362</td>\n",
" <td>0.0160</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" mongodb写入 text文件写入 无数据写入\n",
"原版 0.2334 NaN 0.0255\n",
"新版 0.2174 0.0362 0.0160"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"df=pd.DataFrame([{u'无数据写入':original_nodb,u'mongodb写入':original_db},\n",
" {u'无数据写入': new_nodb, u'mongodb写入': new_db, u'text文件写入':new_db_text}\n",
" ],index=['原版','新版'])\n",
"df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"总的来看行情数据落地原版与新版一次落地耗时都为0.2ms左右。函数中耗时主要来源于mongodb的插入占约为90%的耗时。通过尝试简单的text写入作为数据存储方式耗时得到了大幅降低获得了单次0.04ms耗时的效果采取其它更高效的格式有望进一步降低耗时。但考虑到无数据写入时的耗时为约0.02ms所以期望的最优耗时也就在0.02ms左右。\n",
"\n",
"总的来说基于mongodb的方案能够实时存储的条目数在每秒几百条量级进一步优化可能达到几千条量级。此水平应该己能满足绝大多数的需求。"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

View File

@ -1,511 +0,0 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# 这个测试目标在于仿造一个类似于实盘中,不断有新的数据推送过来,\n",
"# 然后需要计算移动平均线数值,这么一个比较常见的任务。\n",
"\n",
"from __future__ import division\n",
"import time\n",
"import random\n",
"\n",
"# 生成测试用的数据\n",
"data = []\n",
"data_length = 100000 # 总数据量\n",
"ma_length = 500 # 移动均线的窗口\n",
"test_times = 10 # 测试次数\n",
"\n",
"for i in range(data_length):\n",
" data.append(random.randint(1, 100))"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"单次耗时1.16959998608秒\n",
"单个数据点耗时11.7547737294微秒\n",
"最后10个移动平均值 [49.804, 49.832, 49.8, 49.9, 49.892, 49.888, 49.928, 50.052, 50.106, 49.982]\n"
]
}
],
"source": [
"# 计算500期的移动均线并将结果保存到一个列表里返回\n",
"def ma_basic(data, ma_length):\n",
" \n",
" # 用于保存均线输出结果的列表\n",
" ma = []\n",
" \n",
" # 计算均线用的数据窗口\n",
" data_window = data[:ma_length]\n",
" \n",
" # 测试用数据(去除了之前初始化用的部分)\n",
" test_data = data[ma_length:]\n",
" \n",
" # 模拟实盘不断收到新数据推送的情景,遍历历史数据计算均线\n",
" for new_tick in test_data:\n",
" # 移除最老的数据点并增加最新的数据点\n",
" data_window.pop(0)\n",
" data_window.append(new_tick)\n",
" \n",
" # 遍历求均线\n",
" sum_tick = 0\n",
" for tick in data_window:\n",
" sum_tick += tick\n",
" ma.append(sum_tick/ma_length)\n",
" \n",
" # 返回数据\n",
" return ma\n",
"\n",
"# 运行测试\n",
"start = time.time()\n",
"\n",
"for i in range(test_times):\n",
" result = ma_basic(data, ma_length)\n",
"\n",
"time_per_test = (time.time()-start)/test_times\n",
"time_per_point = time_per_test/(data_length - ma_length)\n",
" \n",
"print u'单次耗时:%s秒' %time_per_test\n",
"print u'单个数据点耗时:%s微秒' %(time_per_point*1000000)\n",
"print u'最后10个移动平均值', result[-10:]\n"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"单次耗时2.11879999638秒\n",
"单个数据点耗时21.2944723254微秒\n",
"最后10个移动平均值 [49.804000000000002, 49.832000000000001, 49.799999999999997, 49.899999999999999, 49.892000000000003, 49.887999999999998, 49.927999999999997, 50.052, 50.106000000000002, 49.981999999999999]\n"
]
}
],
"source": [
"# 改用numpy首先是一种常见的错误用法\n",
"import numpy as np\n",
"\n",
"def ma_numpy_wrong(data, ma_length):\n",
" ma = []\n",
" data_window = data[:ma_length]\n",
" test_data = data[ma_length:]\n",
" \n",
" for new_tick in test_data:\n",
" data_window.pop(0)\n",
" data_window.append(new_tick)\n",
" \n",
" # 使用numpy求均线注意这里本质上每次循环\n",
" # 都在创建一个新的numpy数组对象开销很大\n",
" data_array = np.array(data_window)\n",
" ma.append(data_array.mean())\n",
" \n",
" return ma\n",
"\n",
"# 运行测试\n",
"start = time.time()\n",
"\n",
"for i in range(test_times):\n",
" result = ma_numpy_wrong(data, ma_length)\n",
" \n",
"time_per_test = (time.time()-start)/test_times\n",
"time_per_point = time_per_test/(data_length - ma_length)\n",
" \n",
"print u'单次耗时:%s秒' %time_per_test\n",
"print u'单个数据点耗时:%s微秒' %(time_per_point*1000000)\n",
"print u'最后10个移动平均值', result[-10:]"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"单次耗时0.614300012589秒\n",
"单个数据点耗时6.17386947325微秒\n",
"最后10个移动平均值 [49.804000000000002, 49.832000000000001, 49.799999999999997, 49.899999999999999, 49.892000000000003, 49.887999999999998, 49.927999999999997, 50.052, 50.106000000000002, 49.981999999999999]\n"
]
}
],
"source": [
"# numpy的正确用法\n",
"def ma_numpy_right(data, ma_length):\n",
" ma = []\n",
" \n",
" # 用numpy数组来缓存计算窗口内的数据\n",
" data_window = np.array(data[:ma_length])\n",
" \n",
" test_data = data[ma_length:]\n",
" \n",
" for new_tick in test_data:\n",
" # 使用numpy数组的底层数据偏移来实现数据更新\n",
" data_window[0:ma_length-1] = data_window[1:ma_length]\n",
" data_window[-1] = new_tick\n",
" ma.append(data_window.mean())\n",
" \n",
" return ma\n",
"\n",
"# 运行测试\n",
"start = time.time()\n",
"\n",
"for i in range(test_times):\n",
" result = ma_numpy_right(data, ma_length)\n",
" \n",
"time_per_test = (time.time()-start)/test_times\n",
"time_per_point = time_per_test/(data_length - ma_length)\n",
" \n",
"print u'单次耗时:%s秒' %time_per_test\n",
"print u'单个数据点耗时:%s微秒' %(time_per_point*1000000)\n",
"print u'最后10个移动平均值', result[-10:]"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"单次耗时0.043700003624秒\n",
"单个数据点耗时0.439196016321微秒\n",
"最后10个移动平均值 [49.804, 49.832, 49.8, 49.9, 49.892, 49.888, 49.928, 50.052, 50.106, 49.982]\n"
]
}
],
"source": [
"# 使用numba加速ma_numba函数和ma_basic完全一样\n",
"import numba\n",
"\n",
"@numba.jit\n",
"def ma_numba(data, ma_length):\n",
" ma = []\n",
" data_window = data[:ma_length]\n",
" test_data = data[ma_length:]\n",
" \n",
" for new_tick in test_data:\n",
" data_window.pop(0)\n",
" data_window.append(new_tick)\n",
" sum_tick = 0\n",
" for tick in data_window:\n",
" sum_tick += tick\n",
" ma.append(sum_tick/ma_length)\n",
"\n",
" return ma\n",
"\n",
"# 运行测试\n",
"start = time.time()\n",
"\n",
"for i in range(test_times):\n",
" result = ma_numba(data, ma_length)\n",
"\n",
"time_per_test = (time.time()-start)/test_times\n",
"time_per_point = time_per_test/(data_length - ma_length)\n",
" \n",
"print u'单次耗时:%s秒' %time_per_test\n",
"print u'单个数据点耗时:%s微秒' %(time_per_point*1000000)\n",
"print u'最后10个移动平均值', result[-10:]"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"单次耗时0.0348000049591秒\n",
"单个数据点耗时0.349748793559微秒\n",
"最后10个移动平均值 [49.804, 49.832, 49.8, 49.9, 49.892, 49.888, 49.928, 50.052, 50.106, 49.982]\n"
]
}
],
"source": [
"# 将均线计算改写为高速算法\n",
"def ma_online(data, ma_length):\n",
" ma = []\n",
" data_window = data[:ma_length]\n",
" test_data = data[ma_length:]\n",
" \n",
" # 缓存的窗口内数据求和结果\n",
" sum_buffer = 0\n",
" \n",
" for new_tick in test_data:\n",
" old_tick = data_window.pop(0)\n",
" data_window.append(new_tick)\n",
" \n",
" # 如果缓存结果为空,则先通过遍历求第一次结果\n",
" if not sum_buffer:\n",
" sum_tick = 0\n",
" for tick in data_window:\n",
" sum_tick += tick\n",
" ma.append(sum_tick/ma_length)\n",
" \n",
" # 将求和结果缓存下来\n",
" sum_buffer = sum_tick\n",
" else:\n",
" # 这里的算法将计算复杂度从O(n)降低到了O(1)\n",
" sum_buffer = sum_buffer - old_tick + new_tick\n",
" ma.append(sum_buffer/ma_length)\n",
" \n",
" return ma\n",
"\n",
"# 运行测试\n",
"start = time.time()\n",
"\n",
"for i in range(test_times):\n",
" result = ma_online(data, ma_length)\n",
" \n",
"time_per_test = (time.time()-start)/test_times\n",
"time_per_point = time_per_test/(data_length - ma_length)\n",
"\n",
"print u'单次耗时:%s秒' %time_per_test\n",
"print u'单个数据点耗时:%s微秒' %(time_per_point*1000000)\n",
"print u'最后10个移动平均值', result[-10:]"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"单次耗时0.0290000200272秒\n",
"单个数据点耗时0.29145748771微秒\n",
"最后10个移动平均值 [49.804, 49.832, 49.8, 49.9, 49.892, 49.888, 49.928, 50.052, 50.106, 49.982]\n"
]
}
],
"source": [
"# 高速算法和numba结合ma_online_numba函数和ma_online完全一样\n",
"@numba.jit\n",
"def ma_online_numba(data, ma_length):\n",
" ma = []\n",
" data_window = data[:ma_length]\n",
" test_data = data[ma_length:]\n",
" \n",
" sum_buffer = 0\n",
" \n",
" for new_tick in test_data:\n",
" old_tick = data_window.pop(0)\n",
" data_window.append(new_tick)\n",
" \n",
" if not sum_buffer:\n",
" sum_tick = 0\n",
" for tick in data_window:\n",
" sum_tick += tick\n",
" ma.append(sum_tick/ma_length)\n",
" sum_buffer = sum_tick\n",
" else:\n",
" sum_buffer = sum_buffer - old_tick + new_tick\n",
" ma.append(sum_buffer/ma_length)\n",
"\n",
" return ma\n",
"\n",
"# 运行测试\n",
"start = time.time()\n",
"\n",
"for i in range(test_times):\n",
" result = ma_online_numba(data, ma_length)\n",
" \n",
"time_per_test = (time.time()-start)/test_times\n",
"time_per_point = time_per_test/(data_length - ma_length)\n",
"\n",
"print u'单次耗时:%s秒' %time_per_test\n",
"print u'单个数据点耗时:%s微秒' %(time_per_point*1000000)\n",
"print u'最后10个移动平均值', result[-10:]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"\"\"\"\n",
"# 基础的cython加速\n",
"def ma_cython(data, ma_length):\n",
" ma = []\n",
" data_window = data[:ma_length]\n",
" test_data = data[ma_length:]\n",
" \n",
" for new_tick in test_data:\n",
" data_window.pop(0)\n",
" data_window.append(new_tick)\n",
" \n",
" sum_tick = 0\n",
" for tick in data_window:\n",
" sum_tick += tick\n",
" ma.append(sum_tick/ma_length)\n",
" \n",
" return ma\n",
" \n",
"\n",
"# cython和高速算法\n",
"def ma_cython_online(data, ma_length):\n",
" # 静态声明变量\n",
" cdef int sum_buffer, sum_tick, old_tick, new_tick\n",
"\n",
" ma = []\n",
" data_window = data[:ma_length]\n",
" test_data = data[ma_length:]\n",
" sum_buffer = 0\n",
" \n",
" for new_tick in test_data:\n",
" old_tick = data_window.pop(0)\n",
" data_window.append(new_tick)\n",
" \n",
" if not sum_buffer:\n",
" sum_tick = 0\n",
" for tick in data_window:\n",
" sum_tick += tick\n",
" ma.append(sum_tick/ma_length)\n",
" \n",
" sum_buffer = sum_tick\n",
" else:\n",
" sum_buffer = sum_buffer - old_tick + new_tick\n",
" ma.append(sum_buffer/ma_length)\n",
" \n",
" return ma\n",
"\"\"\""
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"单次耗时0.600800013542秒\n",
"单个数据点耗时6.03819109088微秒\n",
"最后10个移动平均值 [49.804, 49.832, 49.8, 49.9, 49.892, 49.888, 49.928, 50.052, 50.106, 49.982]\n"
]
}
],
"source": [
"# 基础cython加速\n",
"from test import ma_cython\n",
"\n",
"start = time.time()\n",
"\n",
"for i in range(test_times):\n",
" result = ma_cython(data, ma_length)\n",
" \n",
"time_per_test = (time.time()-start)/test_times\n",
"time_per_point = time_per_test/(data_length - ma_length)\n",
"\n",
"print u'单次耗时:%s秒' %time_per_test\n",
"print u'单个数据点耗时:%s微秒' %(time_per_point*1000000)\n",
"print u'最后10个移动平均值', result[-10:]"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"单次耗时0.00980000495911秒\n",
"单个数据点耗时0.0984925121518微秒\n",
"最后10个移动平均值 [49.804, 49.832, 49.8, 49.9, 49.892, 49.888, 49.928, 50.052, 50.106, 49.982]\n"
]
}
],
"source": [
"# 高速算法和cython结合\n",
"from test import ma_cython_online\n",
"\n",
"start = time.time()\n",
"\n",
"for i in range(test_times):\n",
" result = ma_cython_online(data, ma_length)\n",
"\n",
"time_per_test = (time.time()-start)/test_times\n",
"time_per_point = time_per_test/(data_length - ma_length)\n",
"\n",
"print u'单次耗时:%s秒' %time_per_test\n",
"print u'单个数据点耗时:%s微秒' %(time_per_point*1000000)\n",
"print u'最后10个移动平均值', result[-10:]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.11"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

View File

@ -1,16 +0,0 @@
# 使用说明
### 使用步骤
1. 在当前文件夹下打开cmd窗口
2. 输入ipython notebook运行
3. 打开Python Performance笔记本使用Shift+回车逐个Cell运行
### 编译Cython
打开cmd输入运行
> python setup.py build_ext --inplace
### 文件说明
* Python Performance.ipynbJupyter Notebook笔记本
* test.pyxCython模块的源代码
* test_setup.py编译test.pyx所需的配置文件
* test.pyd编译好的Cython模块可以在Python里直接import

Binary file not shown.

View File

@ -1,48 +0,0 @@
#encoding:utf-8
from __future__ import division
# 基础的cython加速
def ma_cython(data, ma_length):
ma = []
data_window = data[:ma_length]
test_data = data[ma_length:]
for new_tick in test_data:
data_window.pop(0)
data_window.append(new_tick)
sum_tick = 0
for tick in data_window:
sum_tick += tick
ma.append(sum_tick/ma_length)
return ma
# cython和高速算法
def ma_cython_online(data, ma_length):
# 静态声明变量
cdef int sum_buffer, sum_tick, old_tick, new_tick
ma = []
data_window = data[:ma_length]
test_data = data[ma_length:]
sum_buffer = 0
for new_tick in test_data:
old_tick = data_window.pop(0)
data_window.append(new_tick)
if not sum_buffer:
sum_tick = 0
for tick in data_window:
sum_tick += tick
ma.append(sum_tick/ma_length)
sum_buffer = sum_tick
else:
sum_buffer = sum_buffer - old_tick + new_tick
ma.append(sum_buffer/ma_length)
return ma

View File

@ -1,7 +0,0 @@
from distutils.core import setup
from Cython.Build import cythonize
setup(
name = 'cython test',
ext_modules = cythonize("test.pyx"),
)

View File

@ -1,4 +1,4 @@
# encoding: UTF-8
__version__ = '1.7.1'
__version__ = '1.7.2'
__author__ = 'Xiaoyou Chen'

View File

@ -211,7 +211,11 @@ class TqApi(object):
def on_receive_msg(self, msg):
"""收到数据推送"""
pack = json.loads(msg)
l = pack["data"]
if 'data' in pack:
l = pack["data"]
else:
print u'on_receive_msg收到的数据中没有data字段数据内容%s' %str(pack)
for data in l:
# 合并更新数据字典

View File

@ -145,7 +145,7 @@ class OmUnderlying(OmInstrument):
#----------------------------------------------------------------------
def calculatePosGreeks(self):
"""计算持仓希腊值"""
self.posDelta = self.theoDelta * self.netPos * self.size
self.posDelta = self.theoDelta * self.netPos
########################################################################
@ -218,21 +218,26 @@ class OmOption(OmInstrument):
if not underlyingPrice or not self.pricingImpv:
return
self.theoPrice, self.theoDelta, self.theoGamma, self.theoTheta, self.theoVega = self.calculateGreeks(underlyingPrice,
self.k,
self.r,
self.t,
self.pricingImpv,
self.cp)
self.theoPrice, delta, gamma, theta, vega = self.calculateGreeks(underlyingPrice,
self.k,
self.r,
self.t,
self.pricingImpv,
self.cp)
self.theoDelta = delta * self.size
self.theoGamma = gamma * self.size
self.theoTheta = theta * self.size
self.theoVega = vega * self.size
#----------------------------------------------------------------------
def calculatePosGreeks(self):
"""计算持仓希腊值"""
self.posValue = self.theoPrice * self.netPos * self.size
self.posDelta = self.theoDelta * self.netPos * self.size
self.posGamma = self.theoGamma * self.netPos * self.size
self.posTheta = self.theoTheta * self.netPos * self.size
self.posVega = self.theoVega * self.netPos * self.size
self.posDelta = self.theoDelta * self.netPos
self.posGamma = self.theoGamma * self.netPos
self.posTheta = self.theoTheta * self.netPos
self.posVega = self.theoVega * self.netPos
#----------------------------------------------------------------------
def newTick(self, tick):

View File

@ -35,11 +35,11 @@ class ScenarioValueMonitor(QtWidgets.QTableWidget):
# 设置表头
self.setColumnCount(len(priceChangeArray))
priceChangeHeaders = [('%s%%' %(priceChange*100)) for priceChange in priceChangeArray]
priceChangeHeaders = [('price %s%%' %(priceChange*100)) for priceChange in priceChangeArray]
self.setHorizontalHeaderLabels(priceChangeHeaders)
self.setRowCount(len(impvChangeArray))
impvChangeHeaders = [('%s%%' %(impvChange*100)) for impvChange in impvChangeArray]
impvChangeHeaders = [('impv %s%%' %(impvChange*100)) for impvChange in impvChangeArray]
self.setVerticalHeaderLabels(impvChangeHeaders)
# 设置数据

View File

@ -201,7 +201,7 @@ class ChainMonitor(QtWidgets.QTableWidget):
for underlying in portfolio.underlyingDict.values():
self.eventEngine.register(EVENT_TICK + underlying.vtSymbol, self.signalTick.emit)
self.eventEngine.register(EVENT_TRADE + underlying.vtSymbol, self.signalTick.emit)
self.eventEngine.register(EVENT_TRADE + underlying.vtSymbol, self.signalTrade.emit)
for chain in portfolio.chainDict.values():
for option in chain.optionDict.values():

Binary file not shown.

Before

Width:  |  Height:  |  Size: 66 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 59 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 66 KiB

View File

@ -1,178 +0,0 @@
# encoding: utf-8
# 重载sys模块设置默认字符串编码方式为utf8
import sys
reload(sys)
sys.setdefaultencoding('utf8')
# vn.trader模块
from vnpy.event import EventEngine2
from vnpy.rpc import RpcClient
from vnpy.trader.vtGlobal import globalSetting
from vnpy.trader.vtEngine import MainEngine
from vnpy.trader.uiQt import createQApp
from vnpy.trader.uiMainWindow import MainWindow
########################################################################
class VtClient(RpcClient):
"""vn.trader客户端"""
#----------------------------------------------------------------------
def __init__(self, reqAddress, subAddress, eventEngine):
"""Constructor"""
super(VtClient, self).__init__(reqAddress, subAddress)
self.eventEngine = eventEngine
self.usePickle()
#----------------------------------------------------------------------
def callback(self, topic, data):
"""回调函数"""
self.eventEngine.put(data)
########################################################################
class ClientEngine(MainEngine):
"""客户端引擎提供和MainEngine完全相同的API接口"""
#----------------------------------------------------------------------
def __init__(self, client, eventEngine):
"""Constructor"""
super(MainEngine, self).__init__(eventEngine)
self.client = client
self.eventEngine = eventEngine
# 扩展模块
self.ctaEngine = CtaEngine(self, self.eventEngine)
self.drEngine = DrEngine(self, self.eventEngine)
self.rmEngine = RmEngine(self, self.eventEngine)
#----------------------------------------------------------------------
def connect(self, gatewayName):
"""连接特定名称的接口"""
self.client.connect(gatewayName)
#----------------------------------------------------------------------
def subscribe(self, subscribeReq, gatewayName):
"""订阅特定接口的行情"""
self.client.subscribe(subscribeReq, gatewayName)
#----------------------------------------------------------------------
def sendOrder(self, orderReq, gatewayName):
"""对特定接口发单"""
return self.client.sendOrder(orderReq, gatewayName)
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq, gatewayName):
"""对特定接口撤单"""
self.client.cancelOrder(cancelOrderReq, gatewayName)
#----------------------------------------------------------------------
def qryAccont(self, gatewayName):
"""查询特定接口的账户"""
self.client.qryAccount(gatewayName)
#----------------------------------------------------------------------
def qryPosition(self, gatewayName):
"""查询特定接口的持仓"""
self.client.qryPosition(gatewayName)
#----------------------------------------------------------------------
def exit(self):
"""退出程序前调用,保证正常退出"""
# 停止事件引擎
self.eventEngine.stop()
# 关闭客户端的推送数据接收
self.client.stop()
# 停止数据记录引擎
self.drEngine.stop()
#----------------------------------------------------------------------
def writeLog(self, content):
"""快速发出日志事件"""
self.client.writeLog(content)
#----------------------------------------------------------------------
def dbConnect(self):
"""连接MongoDB数据库"""
self.client.dbConnect()
#----------------------------------------------------------------------
def dbInsert(self, dbName, collectionName, d):
"""向MongoDB中插入数据d是具体数据"""
self.client.dbInsert(dbName, collectionName, d)
#----------------------------------------------------------------------
def dbQuery(self, dbName, collectionName, d):
"""从MongoDB中读取数据d是查询要求返回的是数据库查询的数据列表"""
return self.client.dbQuery(dbName, collectionName, d)
#----------------------------------------------------------------------
def dbUpdate(self, dbName, collectionName, d, flt, upsert=False):
"""向MongoDB中更新数据d是具体数据flt是过滤条件upsert代表若无是否要插入"""
self.client.dbUpdate(dbName, collectionName, d, flt, upsert)
#----------------------------------------------------------------------
def getContract(self, vtSymbol):
"""查询合约"""
return self.client.getContract(vtSymbol)
#----------------------------------------------------------------------
def getAllContracts(self):
"""查询所有合约(返回列表)"""
return self.client.getAllContracts()
#----------------------------------------------------------------------
def getOrder(self, vtOrderID):
"""查询委托"""
return self.client.getOrder(vtOrderID)
#----------------------------------------------------------------------
def getAllWorkingOrders(self):
"""查询所有的活跃的委托(返回列表)"""
return self.client.getAllWorkingOrders()
#----------------------------------------------------------------------
def getAllGatewayDetails(self):
"""查询所有的接口名称"""
return self.client.getAllGatewayDetails()
#----------------------------------------------------------------------
def main():
"""客户端主程序入口"""
# 创建Qt对象
qApp = createQApp()
# 创建事件引擎
eventEngine = EventEngine2()
eventEngine.start(timer=False)
# 创建客户端
reqAddress = 'tcp://localhost:2014'
subAddress = 'tcp://localhost:0602'
client = VtClient(reqAddress, subAddress, eventEngine)
client.subscribeTopic('')
client.start()
# 初始化主引擎和主窗口对象
mainEngine = ClientEngine(client, eventEngine)
mainWindow = MainWindow(mainEngine, mainEngine.eventEngine)
mainWindow.showMaximized()
# 在主线程中启动Qt事件循环
sys.exit(qApp.exec_())
if __name__ == '__main__':
main()

View File

@ -1,100 +0,0 @@
# encoding: utf-8
import sys
import os
from datetime import datetime
from time import sleep
from threading import Thread
from vnpy.event import EventEngine2
from vnpy.rpc import RpcServer
from vnpy.trader.vtEngine import MainEngine
########################################################################
class VtServer(RpcServer):
"""vn.trader服务器"""
#----------------------------------------------------------------------
def __init__(self, repAddress, pubAddress):
"""Constructor"""
super(VtServer, self).__init__(repAddress, pubAddress)
self.usePickle()
# 创建事件引擎
self.ee = EventEngine2()
# 创建主引擎对象
self.engine = MainEngine(self.ee)
# 注册主引擎的方法到服务器的RPC函数
self.register(self.engine.connect)
self.register(self.engine.subscribe)
self.register(self.engine.sendOrder)
self.register(self.engine.cancelOrder)
self.register(self.engine.qryAccount)
self.register(self.engine.qryPosition)
self.register(self.engine.exit)
self.register(self.engine.writeLog)
self.register(self.engine.dbConnect)
self.register(self.engine.dbInsert)
self.register(self.engine.dbQuery)
self.register(self.engine.dbUpdate)
self.register(self.engine.getContract)
self.register(self.engine.getAllContracts)
self.register(self.engine.getOrder)
self.register(self.engine.getAllWorkingOrders)
self.register(self.engine.getAllGatewayDetails)
# 注册事件引擎发送的事件处理监听
self.engine.eventEngine.registerGeneralHandler(self.eventHandler)
#----------------------------------------------------------------------
def eventHandler(self, event):
"""事件处理"""
self.publish(event.type_, event)
#----------------------------------------------------------------------
def stopServer(self):
"""停止服务器"""
# 关闭引擎
self.engine.exit()
# 停止服务器线程
self.stop()
#----------------------------------------------------------------------
def printLog(content):
"""打印日志"""
print datetime.now().strftime("%H:%M:%S"), '\t', content
#----------------------------------------------------------------------
def runServer():
"""运行服务器"""
repAddress = 'tcp://*:2014'
pubAddress = 'tcp://*:0602'
# 创建并启动服务器
server = VtServer(repAddress, pubAddress)
server.start()
printLog('-'*50)
printLog(u'vn.trader服务器已启动')
# 进入主循环
while True:
printLog(u'请输入exit来关闭服务器')
if raw_input() != 'exit':
continue
printLog(u'确认关闭服务器yes|no')
if raw_input() == 'yes':
break
server.stopServer()
if __name__ == '__main__':
runServer()

View File

@ -48,6 +48,7 @@ exchangeMap[EXCHANGE_SHFE] = 'SHFE'
exchangeMap[EXCHANGE_CZCE] = 'CZCE'
exchangeMap[EXCHANGE_DCE] = 'DCE'
exchangeMap[EXCHANGE_SSE] = 'SSE'
exchangeMap[EXCHANGE_SZSE] = 'SZSE'
exchangeMap[EXCHANGE_INE] = 'INE'
exchangeMap[EXCHANGE_UNKNOWN] = ''
exchangeMapReverse = {v:k for k,v in exchangeMap.items()}