Merge remote-tracking branch 'vnpy/master'

This commit is contained in:
msincenselee 2015-08-31 20:47:05 +08:00
commit a4c2d3b4dd
36 changed files with 7766 additions and 12 deletions

195
vn.datayes/README.md Normal file
View File

@ -0,0 +1,195 @@
#VN.DATAYES - Welcome!
##0. Tutorial - ipythonNotebook
http://nbviewer.ipython.org/github/zedyang/vn.past-demo/blob/master/static/tutorial.ipynb
##1. Preface
###1.1
vn.datayes是一个从属于vnpy的开源历史数据模块使用通联数据API以及MongoDB进行数据的下载和存储管理。项目目前与将来主要解决\准备解决以下问题:
* 从通联数据等API高效地爬取、更新、清洗历史数据。
* 基于MongoDB的数据库管理、快速查询、转换输出格式支持自定义符合需求的行情历史数据库。
* 基于Python.Matplotlib或R.ggplot2快速绘制K线图等可视化对象。
项目目前主要包括了通联API开发者试用方案中大部分的市场行情日线数据股票、期货、期权、指数、基金等以及部分基本面数据。数据下载与更新主要采用多线程设计测试效率如下
| 数据集举例 | 数据集容量 | 下载时间估计 |
| :-------------: | :-------------: | :-------------: |
| 股票日线数据2800个交易代码2013年1月1日至2015年8月1日 | 2800个collection约500条/each | 7-10分钟 |
| 股票分钟线数据2个交易代码2013年1月1日至2015年8月1日 | 2个collection约20万条/each | 1-2分钟 |
| 股票日线数据更新任务2800个交易代码2015年8月1日至2015年8月15日 | 2800个collection约10条/each | 1-2分钟 |
vn.datayes基于MongoDB数据库通过一个json配置文件简化数据库的初始化、设置、动态更新过程。较为精细的数据库操作仍需编写脚本进行。若对MongoDB与pymongo不熟悉推荐使用Robomongo等窗口化查看工具作为辅助。
###1.2 主要依赖:
pymongo, pandas, requests, json
###1.3 开发测试环境:
Mac OS X 10.10; Windows 7 || Anaconda.Python 2.7
##2. Get Started
###2.1 准备
* 下载并安装MongoDB: https://www.mongodb.org/downloads
* 获取API token以通联数据为例。
![fig1](static/figs/fig1.png)
* 更新pymongo至3.0以上版本; 更新requests等包。
```
~$ pip install pymongo --upgrade
~$ pip install requests --upgrade
```
* [ ! 注意本模块需要pymongo3.0新加入的部分方法使用vnpy本体所用的2.7版本对应方法将无法正常插入数据。依赖冲突的问题会尽快被解决目前推荐制作一个virtual environment来单独运行这个模块或者暴力切换pymongo的版本]
```
~$ pip install pymongo==3.0.3 # this module.
~$ pip install pymongo==2.7.2 # pymongo 2.7.
```
* 启动MongoDB
```
~$ mongod
```
###2.2 数据库初始化与下载
* **api.Config** 对象包含了向API进行数据请求所需的信息我们需要一个用户token来初始化这个对象。
```
from storage import *
myConfig = Config(head="Zed's Config",
token='7c2e59e212dbff90ffd6b382c7afb57bc987a99307d382b058af6748f591d723')
myConfig.body
```
```
{'domain': 'api.wmcloud.com/data',
'header': {'Authorization': 'Bearer 7c2e59e212dbff90ffd6b382c7afb57bc987a99307d382b058af6748f591d723',
'Connection': 'keep-alive'},
'ssl': False,
'version': 'v1'}
```
* **storage.DBConfig** 对象包含了数据库配置。我们需要自己编写一个json字典来填充这个对象。举例来说我们希望下载股票日线数据和指数日线数据数据库名称为DATAYES_EQUITY_D1和DATAYES_INDEX_D1index为日期“date”。那么json字典是这样的
```
client = pymongo.MongoClient() # pymongo.connection object.
body = {
'client': client, # connection object.
'dbs': {
'EQU_D1': { # in-python alias: 'EQU_D1'
'self': client['DATAYES_EQUITY_D1'], # pymongo.database[name] object.
'index': 'date', # index name.
'collNames': 'equTicker' # what are collection names consist of.
},
'IDX_D1': { # Another database
'self': client['DATAYES_INDEX_D1'],
'index': 'date',
'collNames': 'idxTicker'
}
},
'dbNames': ['EQU_D1','IDX_D1'] # List of alias.
}
myDbConfig_ = DBConfig(body=body)
# 这看上去有些麻烦不想这么做的话可以直接使用DBConfig的默认构造函数。
myDbConfig = DBConfig()
myDbConfig.body
```
```
{'client': MongoClient('localhost', 27017),
'dbNames': ['EQU_M1', 'EQU_D1', 'FUT_D1', 'OPT_D1', 'FUD_D1', 'IDX_D1'],
'dbs': {'EQU_D1': {'collNames': 'equTicker',
'index': 'date',
'self': Database(MongoClient('localhost', 27017), u'DATAYES_EQUITY_D1')},
'EQU_M1': {'collNames': 'secID',
'index': 'dateTime',
'self': Database(MongoClient('localhost', 27017), u'DATAYES_EQUITY_M1')},
'FUD_D1': {'collNames': 'fudTicker',
'index': 'date',
'self': Database(MongoClient('localhost', 27017), u'DATAYES_FUND_D1')},
'FUT_D1': {'collNames': 'futTicker',
'index': 'date',
'self': Database(MongoClient('localhost', 27017), u'DATAYES_FUTURE_D1')},
'IDX_D1': {'collNames': 'idxTicker',
'index': 'date',
'self': Database(MongoClient('localhost', 27017), u'DATAYES_INDEX_D1')},
'OPT_D1': {'collNames': 'optTicker',
'index': 'date',
'self': Database(MongoClient('localhost', 27017), u'DATAYES_OPTION_D1')}}}
```
* **api.PyApi**是向网络数据源进行请求的主要对象。**storage.MongodController**是进行数据库管理的对象。当我们完成了配置对象的构造即可初始化PyApi与MongodController。**MongodController._get_coll_names()** 和**MongodController._ensure_index()** 是数据库初始化所调用的方法,为了模块开发的方便,它们暂时没有被放进构造函数中自动执行。
```
myApi = PyApi(myConfig) # construct PyApi object.
mc = MongodController(api=myApi, config=myDbConfig) # construct MongodController object,
# on the top of PyApi.
mc._get_coll_names() # get names of collections.
mc._ensure_index() # ensure collection indices.
```
```
[MONGOD]: Collection names gotten.
[MONGOD]: MongoDB index set.
```
![fig2](static/figs/fig2.png)
* 使用**MongodController.download#()**方法进行下载。
mc.download_index_D1('20150101','20150801')
![fig3](static/figs/fig3.png)
###2.3 数据库更新
* 使用**MongodController.update#()**方法进行更新。脚本会自动寻找数据库中的最后一日并更新至最新交易日。
```
from datetime import datetime
datetime.now()
```
```
datetime.datetime(2015, 8, 17, 10, 49, 21, 37758)
```
```
mc.update_index_D1()
```
![fig4](static/figs/fig4.png)
###2.4 Mac OS或Linux下的下载与更新
模块中包含了一些shell脚本方面在linux-like os下的数据下载、更新。
```
~$ cd path/of/vn/datayes
~$ chmod +x prepare.sh
~$ ./prepare.sh
```
![fig5](static/figs/fig5.png)
![fig6](static/figs/fig6.png)

0
vn.datayes/__init__.py Normal file
View File

1561
vn.datayes/api.py Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

51
vn.datayes/download.sh Executable file
View File

@ -0,0 +1,51 @@
#!/bin/bash
echo [API]: Prepare to construct DATAYES_FUTURE_D1, {20150101, 20150801}...
python - << EOF
from storage import *
dc = DBConfig()
api = PyApi(Config())
mc = MongodController(dc, api)
mc.download_future_D1('20150101','20150801')
EOF
echo [MONGOD]: DATAYES_FUTURE_D1 constructed.
echo [API]: Prepare to construct DATAYES_OPTION_D1, {20150101, 20150801}...
python - << EOF
from storage import *
dc = DBConfig()
api = PyApi(Config())
mc = MongodController(dc, api)
mc.download_option_D1('20150101','20150801')
EOF
echo [MONGOD]: DATAYES_OPTION_D1 constructed.
echo [API]: Prepare to construct DATAYES_INDEX_D1, {20150101, 20150801}...
python - << EOF
from storage import *
dc = DBConfig()
api = PyApi(Config())
mc = MongodController(dc, api)
mc.download_index_D1('20150101','20150801')
EOF
echo [MONGOD]: DATAYES_INDEX_D1 constructed.
echo [API]: Prepare to construct DATAYES_FUND_D1, {20150101, 20150801}...
python - << EOF
from storage import *
dc = DBConfig()
api = PyApi(Config())
mc = MongodController(dc, api)
mc.download_fund_D1('20150101','20150801')
EOF
echo [MONGOD]: DATAYES_FUND_D1 constructed.
echo [API]: Prepare to construct DATAYES_EQUITY_D1, {20130101, 20150801}...
python - << EOF
from storage import *
dc = DBConfig()
api = PyApi(Config())
mc = MongodController(dc, api)
mc.download_equity_D1('20130101','20150801')
EOF
echo [MONGOD]: DATAYES_EQUITY_D1 constructed.

33
vn.datayes/errors.py Normal file
View File

@ -0,0 +1,33 @@
class VNPAST_ConfigError(Exception):
"""
Config file error, raised when config.json or python object
is broken or invalid.
"""
pass
class VNPAST_RequestError(Exception):
"""
HTTP Request Error, raised when response is not properly gotten:
* GET response.status code != 200.
* POST response.status code != 201.
* Connection timed out.
* ...
"""
pass
class VNPAST_DatabaseError(Exception):
"""
"""
pass
class VNPAST_DataConstructorError(Exception):
"""
"""
pass

58
vn.datayes/fun/fetch.R Normal file
View File

@ -0,0 +1,58 @@
ensure_pkgs = function(){
if (!'data.table' %in% installed.packages()){
install.packages('data.table')
}
if (!'rmongodb' %in% installed.packages()){
install.packages('rmongodb')
}
require(data.table)
require(rmongodb)
return(1)
}
get_connection = function(){
client = mongo.create()
return(client)
}
get_dbs = function(){
client = mongo.create()
if(mongo.is.connected(client) == TRUE) {
dbs = mongo.get.databases(client)
}
return(dbs)
}
get_colls = function(db){
client = mongo.create()
if(mongo.is.connected(client) == TRUE) {
colls = mongo.get.database.collections(client, db)
}
return(colls)
}
view_doc = function(coll, one=1){
client = mongo.create()
if(mongo.is.connected(client) == TRUE) {
if(one==1){
doc = mongo.find.one(client, coll)
}
else{
doc = mongo.find.all(client, coll)
}
}
return(doc)
}
fetch = function(coll, start, end){
client = mongo.create()
if(mongo.is.connected(client) == TRUE) {
docs = mongo.find.all(client, coll,
query = list(
'date'=list('lte'=end),
'date'=list('gte'=start)
))
}
return(docs)
}

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1 @@
["150001", "150008", "150009", "150012", "150013", "150016", "150017", "150018", "150019", "150020", "150021", "150022", "150023", "150027", "150028", "150029", "150030", "150031", "150032", "150033", "150034", "150035", "150036", "150037", "150039", "150040", "150042", "150047", "150048", "150049", "150050", "150051", "150052", "150053", "150054", "150055", "150056", "150057", "150058", "150059", "150060", "150064", "150065", "150066", "150067", "150073", "150075", "150076", "150077", "150080", "150083", "150084", "150085", "150086", "150088", "150089", "150090", "150091", "150092", "150093", "150094", "150095", "150096", "150097", "150098", "150099", "150100", "150101", "150102", "150104", "150105", "150106", "150107", "150108", "150109", "150112", "150113", "150114", "150117", "150118", "150120", "150121", "150122", "150123", "150124", "150128", "150129", "150130", "150131", "150133", "150134", "150135", "150136", "150137", "150138", "150139", "150140", "150141", "150142", "150143", "150144", "150145", "150146", "150147", "150148", "150149", "150150", "150151", "150152", "150153", "150154", "150156", "150157", "150158", "150160", "150161", "150164", "150165", "150167", "150168", "150169", "150170", "150171", "150172", "150173", "150174", "150175", "150176", "150177", "150178", "150179", "150180", "150181", "150182", "150184", "150185", "150186", "150187", "150188", "150189", "150190", "150191", "150192", "150193", "150194", "150195", "150196", "150197", "150198", "150199", "150200", "150201", "150203", "150204", "150205", "150206", "150207", "150208", "150209", "150210", "150211", "150212", "150213", "150214", "150215", "150216", "150217", "150218", "150219", "150220", "150221", "150222", "150223", "150224", "150227", "150228", "150229", "150230", "150241", "150242", "159001", "159003", "159005", "159901", "159902", "159903", "159905", "159906", "159907", "159908", "159909", "159910", "159911", "159912", "159913", "159915", "159916", "159917", "159918", "159919", "159920", "159921", "159922", "159923", "159924", "159925", "159926", "159927", "159928", "159929", "159930", "159931", "159932", "159933", "159934", "159935", "159936", "159937", "159938", "159939", "159940", "160105", "160106", "160119", "160123", "160125", "160128", "160130", "160131", "160133", "160211", "160212", "160215", "160216", "160220", "160311", "160314", "160415", "160416", "160505", "160512", "160513", "160515", "160607", "160610", "160611", "160613", "160615", "160616", "160617", "160618", "160621", "160706", "160716", "160717", "160719", "160720", "160805", "160807", "160810", "160812", "160813", "160910", "160915", "160916", "160918", "160919", "161005", "161010", "161015", "161017", "161019", "161115", "161116", "161117", "161119", "161210", "161213", "161216", "161217", "161219", "161222", "161224", "161505", "161607", "161610", "161614", "161706", "161713", "161714", "161716", "161810", "161813", "161815", "161820", "161821", "161831", "161903", "161907", "161908", "161911", "162006", "162105", "162207", "162307", "162308", "162411", "162605", "162607", "162703", "162711", "162712", "162715", "163001", "163110", "163208", "163210", "163302", "163402", "163407", "163409", "163412", "163415", "163503", "163801", "163819", "163821", "163824", "163827", "164105", "164208", "164606", "164701", "164702", "164705", "164808", "164810", "164814", "164815", "164902", "165309", "165311", "165313", "165508", "165509", "165510", "165512", "165513", "165516", "165517", "165705", "165806", "166001", "166006", "166007", "166008", "166009", "166011", "166401", "166902", "166904", "167901", "169101", "184721", "184722", "184728", "500038", "500056", "500058", "502000", "502001", "502002", "502048", "502049", "502050", "505888", "510010", "510020", "510030", "510050", "510060", "510070", "510090", "510110", "510120", "510130", "510150", "510160", "510170", "510180", "510190", "510210", "510220", "510230", "510260", "510270", "510280", "510290", "510300", "510310", "510330", "510410", "510420", "510430", "510440", "510450", "510500", "510510", "510520", "510610", "510620", "510630", "510650", "510660", "510680", "510700", "510880", "510900", "511010", "511210", "511220", "511800", "511810", "511860", "511880", "511990", "512010", "512070", "512110", "512120", "512210", "512220", "512230", "512300", "512310", "512600", "512610", "512640", "512990", "513030", "513100", "513500", "513600", "513660", "518800", "518880"]

View File

@ -0,0 +1 @@
["a1505", "a1507", "a1509", "a1511", "a1601", "a1603", "a1605", "a1607", "a1609", "Ag(T+D)", "ag1505", "ag1506", "ag1507", "ag1508", "ag1509", "ag1510", "ag1511", "ag1512", "ag1601", "ag1602", "ag1603", "ag1604", "al1505", "al1506", "al1507", "al1508", "al1509", "al1510", "al1511", "al1512", "al1601", "al1602", "al1603", "al1604", "Au(T+D)", "au1505", "au1506", "au1507", "au1508", "au1510", "au1512", "au1602", "au1604", "b1505", "b1507", "b1509", "b1511", "b1601", "b1603", "bb1505", "bb1506", "bb1507", "bb1508", "bb1509", "bb1510", "bb1511", "bb1512", "bb1601", "bb1602", "bb1603", "bb1604", "bu1505", "bu1506", "bu1507", "bu1508", "bu1509", "bu1510", "bu1512", "bu1603", "bu1606", "bu1609", "bu1612", "bu1703", "c1505", "c1507", "c1509", "c1511", "c1601", "c1603", "CBOZF", "CBOZL", "CBOZN", "CBOZT", "CF505", "CF507", "CF509", "CF511", "CF601", "CF603", "COMGC", "COMHG", "COMSI", "cs1505", "cs1507", "cs1509", "cs1511", "cs1601", "cs1603", "cu1505", "cu1506", "cu1507", "cu1508", "cu1509", "cu1510", "cu1511", "cu1512", "cu1601", "cu1602", "cu1603", "cu1604", "fb1505", "fb1506", "fb1507", "fb1508", "fb1509", "fb1510", "fb1511", "fb1512", "fb1601", "fb1602", "fb1603", "fb1604", "FG505", "FG506", "FG507", "FG508", "FG509", "FG510", "FG511", "FG512", "FG601", "FG602", "FG603", "FG604", "fu1506", "fu1507", "fu1508", "fu1509", "fu1510", "fu1511", "fu1512", "fu1601", "fu1603", "fu1604", "fu1605", "hc1505", "hc1506", "hc1507", "hc1508", "hc1509", "hc1510", "hc1511", "hc1512", "hc1601", "hc1602", "hc1603", "hc1604", "i1505", "i1506", "i1507", "i1508", "i1509", "i1510", "i1511", "i1512", "i1601", "i1602", "i1603", "i1604", "IC1505", "IC1506", "IC1509", "IC1512", "IF1505", "IF1506", "IF1509", "IF1512", "IH1505", "IH1506", "IH1509", "IH1512", "j1505", "j1506", "j1507", "j1508", "j1509", "j1510", "j1511", "j1512", "j1601", "j1602", "j1603", "j1604", "jd1505", "jd1506", "jd1509", "jd1510", "jd1511", "jd1512", "jd1601", "jd1602", "jd1603", "jd1604", "jm1505", "jm1506", "jm1507", "jm1508", "jm1509", "jm1510", "jm1511", "jm1512", "jm1601", "jm1602", "jm1603", "jm1604", "JR505", "JR507", "JR509", "JR511", "JR601", "JR603", "l1505", "l1506", "l1507", "l1508", "l1509", "l1510", "l1511", "l1512", "l1601", "l1602", "l1603", "l1604", "LLG", "LR505", "LR507", "LR509", "LR511", "LR601", "LR603", "m1505", "m1507", "m1508", "m1509", "m1511", "m1512", "m1601", "m1603", "MA506", "MA507", "MA508", "MA509", "MA510", "MA511", "MA512", "MA601", "MA602", "MA603", "MA604", "ME505", "ni1507", "ni1508", "ni1509", "ni1510", "ni1511", "ni1512", "ni1601", "ni1602", "ni1603", "ni1604", "NYMBZ", "NYMNG", "OI505", "OI507", "OI509", "OI511", "OI601", "OI603", "p1505", "p1506", "p1507", "p1508", "p1509", "p1510", "p1511", "p1512", "p1601", "p1602", "p1603", "p1604", "pb1505", "pb1506", "pb1507", "pb1508", "pb1509", "pb1510", "pb1511", "pb1512", "pb1601", "pb1602", "pb1603", "pb1604", "PM505", "PM507", "PM509", "PM511", "PM601", "PM603", "pp1505", "pp1506", "pp1507", "pp1508", "pp1509", "pp1510", "pp1511", "pp1512", "pp1601", "pp1602", "pp1603", "pp1604", "rb1505", "rb1506", "rb1507", "rb1508", "rb1509", "rb1510", "rb1511", "rb1512", "rb1601", "rb1602", "rb1603", "rb1604", "RI505", "RI507", "RI509", "RI511", "RI601", "RI603", "RM505", "RM507", "RM508", "RM509", "RM511", "RM601", "RM603", "RS507", "RS508", "RS509", "RS511", "ru1505", "ru1506", "ru1507", "ru1508", "ru1509", "ru1510", "ru1511", "ru1601", "ru1603", "ru1604", "SF505", "SF506", "SF507", "SF508", "SF509", "SF510", "SF511", "SF512", "SF601", "SF602", "SF603", "SF604", "SM505", "SM506", "SM507", "SM508", "SM509", "SM510", "SM511", "SM512", "SM601", "SM602", "SM603", "SM604", "sn1507", "sn1508", "sn1509", "sn1510", "sn1511", "sn1512", "sn1601", "sn1602", "sn1603", "sn1604", "SR505", "SR507", "SR509", "SR511", "SR601", "SR603", "SR605", "SR607", "SR609", "T1509", "T1512", "T1603", "TA505", "TA506", "TA507", "TA508", "TA509", "TA510", "TA511", "TA512", "TA601", "TA602", "TA603", "TA604", "TC506", "TC507", "TC508", "TC509", "TC510", "TC511", "TC512", "TC601", "TC602", "TC603", "TC604", "TF1506", "TF1509", "TF1512", "TOCRU", "v1505", "v1506", "v1507", "v1508", "v1509", "v1510", "v1511", "v1512", "v1601", "v1602", "v1603", "v1604", "WH505", "WH507", "WH509", "WH511", "WH601", "WH603", "wr1505", "wr1506", "wr1507", "wr1508", "wr1509", "wr1510", "wr1511", "wr1512", "wr1601", "wr1602", "wr1603", "wr1604", "y1505", "y1507", "y1508", "y1509", "y1511", "y1512", "y1601", "y1603", "zn1505", "zn1506", "zn1507", "zn1508", "zn1509", "zn1510", "zn1511", "zn1512", "zn1601", "zn1602", "zn1603", "zn1604"]

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1 @@
["510050C1512M02950", "510050P1512M03500", "510050C1512M02900", "510050P1512M02950", "510050P1512M02900", "510050C1505M03200", "510050P1505M03200", "510050C1506M03200", "510050P1506M03200", "510050C1509M03200", "510050P1509M03200", "510050C1506M03500", "510050P1505M03500", "510050C1509M03500", "510050P1506M03500", "510050P1512M03300", "510050P1512M03200", "510050C1505M03500", "510050P1512M03400", "510050P1506M02500", "510050C1509M02500", "510050P1509M02500", "510050C1512M03500", "510050P1509M03500", "510050C1506M02500", "510050C1506M03400", "510050P1505M03400", "510050C1509M03300", "510050P1506M03300", "510050C1506M03300", "510050P1505M03300", "510050C1505M03400", "510050P1509M03300", "510050C1505M03300", "510050P1509M02450", "510050C1509M02450", "510050P1506M02450", "510050C1506M02450", "510050P1509M02400", "510050P1506M03000", "510050C1509M03000", "510050P1505M03000", "510050C1506M03000", "510050C1505M03000", "510050P1509M02950", "510050C1509M02950", "510050P1509M02900", "510050P1509M02600", "510050P1506M02600", "510050C1509M02600", "510050C1506M02650", "510050P1506M02650", "510050C1509M02650", "510050P1509M03000", "510050C1505M03100", "510050C1506M03100", "510050P1505M03100", "510050C1509M03100", "510050P1506M03100", "510050P1509M03100", "510050C1506M02750", "510050P1506M02750", "510050C1509M02750", "510050P1509M02750", "510050P1509M02550", "510050C1506M02550", "510050C1509M02550", "510050P1506M02550", "510050C1506M02600", "510050P1506M02850", "510050C1509M02850", "510050P1509M02850", "510050P1509M02700", "510050C1505M02850", "510050P1505M02800", "510050P1505M02850", "510050C1506M02850", "510050P1509M02650", "510050C1509M02700", "510050P1506M02700", "510050C1506M02700", "510050P1509M02200", "510050P1509M02250", "510050C1509M02350", "510050C1509M02400", "510050C1509M02250", "510050C1509M02300", "510050P1506M02400", "510050C1509M02200", "510050P1509M02300", "510050P1509M02350", "510050P1506M02900", "510050C1506M02950", "510050C1506M02900", "510050P1505M02950", "510050P1505M02900", "510050C1505M02950", "510050C1505M02900", "510050C1509M02900", "510050P1506M02950", "510050C1505M02650", "510050C1505M02700", "510050P1506M02800", "510050C1509M02800", "510050C1506M02800", "510050C1505M02550", "510050C1505M02600", "510050P1509M02800", "510050C1505M02500", "510050P1509M03400", "510050P1506M02300", "510050C1505M02800", "510050C1505M02750", "510050P1505M02700", "510050P1505M02750", "510050P1505M02550", "510050P1505M02500", "510050P1505M02650", "510050P1505M02600", "510050C1506M02300", "510050C1506M02250", "510050P1506M02350", "510050C1512M03000", "510050P1506M03400", "510050C1509M03400", "510050C1512M03300", "510050C1512M03400", "510050C1512M03100", "510050C1512M03200", "510050C1506M02200", "510050P1512M03000", "510050P1512M03100", "510050C1506M02400", "510050C1506M02350", "510050P1506M02250", "510050P1506M02200"]

File diff suppressed because one or more lines are too long

84
vn.datayes/prepare.sh Executable file
View File

@ -0,0 +1,84 @@
#!/bin/bash
dir_n=names
if [ ! -d $dir_n ]; then
mkdir $dir_n
fi
dir_c=config
if [ ! -d $dir_c ]; then
mkdir $dir_c
fi
echo [vn-past]: Configuration starts.
python - << EOF
from storage import *
import pandas as pd
import os
dc = DBConfig()
api = PyApi(Config())
mc = MongodController(dc, api)
mc._collNames['equTicker'] = mc._allEquTickers()
print '[MONGOD]: Equity tickers collected.'
mc._collNames['secID'] = mc._allSecIds()
print '[MONGOD]: Security IDs collected.'
mc._collNames['futTicker'] = mc._allFutTickers()
print '[MONGOD]: Future tickers collected.'
mc._collNames['optTicker'] = mc._allOptTickers()
print '[MONGOD]: Option symbols collected.'
mc._collNames['fudTicker'] = mc._allFudTickers()
print '[MONGOD]: Mutual Fund symbols collected.'
mc._collNames['idxTicker'] = mc._allIdxTickers()
print '[MONGOD]: Index symbols collected.'
mc._ensure_index()
EOF
echo [vn-past]: Configuration finished.
echo [vn-past]: Selected databases:
cd ./names
ls -l
echo [vn-past]: Prepare to construct[c]/update[u] databases...
read -r -p "[vn-past]: Waiting for orders[c/u]: " response
if [[ $response =~ ^([uU][pP][dD][aA][tT][eE]|[uU])$ ]]
then
echo [API]: Prepare to update data...
read -r -p "[API]: Confirm? [y/N] " response
if [[ $response =~ ^([yY][eE][sS]|[yY])$ ]]
then
cd -
chmod +x update.sh
./update.sh
else
echo [vn-past]: Do not update.
:
fi
else
echo [API]: Prepare to download Bars...
read -r -p "[API]: Confirm? [y/N] " response
if [[ $response =~ ^([yY][eE][sS]|[yY])$ ]]
then
cd -
chmod +x download.sh
./download.sh
else
echo [vn-past]: Do not download.
:
fi
fi
echo [vn-past]: Finished.

Binary file not shown.

After

Width:  |  Height:  |  Size: 618 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 234 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 465 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 465 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 320 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 319 KiB

View File

@ -0,0 +1,348 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#VN.DATAYES - Welcome!\n",
"***\n",
"##1. Preface\n",
"\n",
"###1.1\n",
"vn.datayes是一个从属于vnpy的开源历史数据模块使用通联数据API以及MongoDB进行数据的下载和存储管理。项目目前与将来主要解决\\准备解决以下问题:\n",
"\n",
"* 从通联数据等API高效地爬取、更新、清洗历史数据。\n",
"* 基于MongoDB的数据库管理、快速查询、转换输出格式支持自定义符合需求的行情历史数据库。\n",
"* 基于Python.Matplotlib或R.ggplot2快速绘制K线图等可视化对象。\n",
"\n",
"项目目前主要包括了通联API开发者试用方案中大部分的市场行情日线数据股票、期货、期权、指数、基金等以及部分基本面数据。数据下载与更新主要采用多线程设计测试效率如下\n",
"\n",
"| 数据集举例 | 数据集容量 | 下载时间估计 |\n",
"| :-------------: | :-------------: | :-------------: |\n",
"| 股票日线数据2800个交易代码2013年1月1日至2015年8月1日 | 2800个collection约500条/each | 7-10分钟 |\n",
"| 股票分钟线数据2个交易代码2013年1月1日至2015年8月1日 | 2个collection约20万条/each | 1-2分钟 |\n",
"| 股票日线数据更新任务2800个交易代码2015年8月1日至2015年8月15日 | 2800个collection约10条/each | 1-2分钟 |\n",
"\n",
"vn.datayes基于MongoDB数据库通过一个json配置文件简化数据库的初始化、设置、动态更新过程。较为精细的数据库操作仍需编写脚本进行。若对MongoDB与pymongo不熟悉推荐使用Robomongo等窗口化查看工具作为辅助。\n",
"\n",
"###1.2 主要依赖:\n",
"pymongo, pandas, requests, json\n",
"###1.3 开发测试环境:\n",
"Mac OS X 10.10; Windows 7 || Anaconda.Python 2.7"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"* * *\n",
"##2. Get Started\n",
"###2.1 准备\n",
"\n",
"* 下载并安装MongoDB: https://www.mongodb.org/downloads\n",
"* 获取API token以通联数据为例。\n",
"\n",
"![fig1](figs/fig1.png)\n",
"\n",
"* 更新pymongo至3.0以上版本; 更新requests等包。 \n",
"```\n",
"~$ pip install pymongo --upgrade\n",
"~$ pip install requests --upgrade\n",
"```\n",
"\n",
"* [ ! 注意本模块需要pymongo3.0新加入的部分方法使用vnpy本体所用的2.7版本对应方法将无法正常插入数据。依赖冲突的问题会尽快被解决目前推荐制作一个virtual environment来单独运行这个模块或者暴力切换pymongo的版本]\n",
"```\n",
"~$ pip install pymongo==3.0.3 # this module.\n",
"~$ pip install pymongo==2.7.2 # pymongo 2.7.\n",
"```\n",
"\n",
"* 启动MongoDB\n",
"```\n",
"~$ mongod\n",
"```\n",
"\n",
"\n",
"###2.2 数据库初始化与下载\n",
"* **api.Config** 对象包含了向API进行数据请求所需的信息我们需要一个用户token来初始化这个对象。"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"{'domain': 'api.wmcloud.com/data',\n",
" 'header': {'Authorization': 'Bearer 7c2e59e212dbff90ffd6b382c7afb57bc987a99307d382b058af6748f591d723',\n",
" 'Connection': 'keep-alive'},\n",
" 'ssl': False,\n",
" 'version': 'v1'}"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from storage import *\n",
"\n",
"myConfig = Config(head=\"Zed's Config\", \n",
" token='7c2e59e212dbff90ffd6b382c7afb57bc987a99307d382b058af6748f591d723')\n",
"myConfig.body"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"* * *\n",
"* **storage.DBConfig** 对象包含了数据库配置。我们需要自己编写一个json字典来填充这个对象。举例来说我们希望下载股票日线数据和指数日线数据数据库名称为DATAYES_EQUITY_D1和DATAYES_INDEX_D1index为日期“date”。那么json字典是这样的"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"{'client': MongoClient('localhost', 27017),\n",
" 'dbNames': ['EQU_M1', 'EQU_D1', 'FUT_D1', 'OPT_D1', 'FUD_D1', 'IDX_D1'],\n",
" 'dbs': {'EQU_D1': {'collNames': 'equTicker',\n",
" 'index': 'date',\n",
" 'self': Database(MongoClient('localhost', 27017), u'DATAYES_EQUITY_D1')},\n",
" 'EQU_M1': {'collNames': 'secID',\n",
" 'index': 'dateTime',\n",
" 'self': Database(MongoClient('localhost', 27017), u'DATAYES_EQUITY_M1')},\n",
" 'FUD_D1': {'collNames': 'fudTicker',\n",
" 'index': 'date',\n",
" 'self': Database(MongoClient('localhost', 27017), u'DATAYES_FUND_D1')},\n",
" 'FUT_D1': {'collNames': 'futTicker',\n",
" 'index': 'date',\n",
" 'self': Database(MongoClient('localhost', 27017), u'DATAYES_FUTURE_D1')},\n",
" 'IDX_D1': {'collNames': 'idxTicker',\n",
" 'index': 'date',\n",
" 'self': Database(MongoClient('localhost', 27017), u'DATAYES_INDEX_D1')},\n",
" 'OPT_D1': {'collNames': 'optTicker',\n",
" 'index': 'date',\n",
" 'self': Database(MongoClient('localhost', 27017), u'DATAYES_OPTION_D1')}}}"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"client = pymongo.MongoClient() # pymongo.connection object.\n",
"\n",
"body = {\n",
" 'client': client, # connection object.\n",
" 'dbs': {\n",
" 'EQU_D1': { # in-python alias: 'EQU_D1'\n",
" 'self': client['DATAYES_EQUITY_D1'], # pymongo.database[name] object.\n",
" 'index': 'date', # index name.\n",
" 'collNames': 'equTicker' # what are collection names consist of.\n",
" },\n",
" 'IDX_D1': { # Another database\n",
" 'self': client['DATAYES_INDEX_D1'],\n",
" 'index': 'date',\n",
" 'collNames': 'idxTicker'\n",
" }\n",
" },\n",
" 'dbNames': ['EQU_D1','IDX_D1'] # List of alias.\n",
"}\n",
"\n",
"myDbConfig_ = DBConfig(body=body)\n",
"\n",
"# 这看上去有些麻烦不想这么做的话可以直接使用DBConfig的默认构造函数。\n",
"\n",
"myDbConfig = DBConfig()\n",
"\n",
"myDbConfig.body"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"* * *\n",
"* **api.PyApi**是向网络数据源进行请求的主要对象。**storage.MongodController**是进行数据库管理的对象。当我们完成了配置对象的构造即可初始化PyApi与MongodController。**MongodController._get_coll_names()** 和**MongodController._ensure_index()** 是数据库初始化所调用的方法,为了模块开发的方便,它们暂时没有被放进构造函数中自动执行。"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[MONGOD]: Collection names gotten.\n",
"[MONGOD]: MongoDB index set.\n"
]
},
{
"data": {
"text/plain": [
"1"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"myApi = PyApi(myConfig) # construct PyApi object.\n",
"mc = MongodController(api=myApi, config=myDbConfig) # construct MongodController object, \n",
" # on the top of PyApi.\n",
"mc._get_coll_names() # get names of collections.\n",
"mc._ensure_index() # ensure collection indices."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![fig2](figs/fig2.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"* 使用**MongodController.download#()**方法进行下载。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"mc.download_index_D1('20150101','20150801')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![fig3](figs/fig3.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"###2.3 数据库更新\n",
"* 使用**MongodController.update#()**方法进行更新。脚本会自动寻找数据库中的最后一日并更新至最新交易日。"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"datetime.datetime(2015, 8, 17, 10, 49, 21, 37758)"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from datetime import datetime\n",
"datetime.now()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"mc.update_index_D1()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![fig4](figs/fig4.png)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"###2.4 Mac OS或Linux下的下载与更新\n",
"模块中包含了一些shell脚本方面在linux-like os下的数据下载、更新。\n",
"```\n",
"~$ cd path/of/vn/datayes\n",
"~$ chmod +x prepare.sh\n",
"~$ ./prepare.sh\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"![fig5](figs/fig5.png)\n",
"![fig6](figs/fig6.png)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

View File

@ -0,0 +1,194 @@
#VN.DATAYES - Welcome!
***
##1. Preface
###1.1
vn.datayes是一个从属于vnpy的开源历史数据模块使用通联数据API以及MongoDB进行数据的下载和存储管理。项目目前与将来主要解决\准备解决以下问题:
* 从通联数据等API高效地爬取、更新、清洗历史数据。
* 基于MongoDB的数据库管理、快速查询、转换输出格式支持自定义符合需求的行情历史数据库。
* 基于Python.Matplotlib或R.ggplot2快速绘制K线图等可视化对象。
项目目前主要包括了通联API开发者试用方案中大部分的市场行情日线数据股票、期货、期权、指数、基金等以及部分基本面数据。数据下载与更新主要采用多线程设计测试效率如下
| 数据集举例 | 数据集容量 | 下载时间估计 |
| :-------------: | :-------------: | :-------------: |
| 股票日线数据2800个交易代码2013年1月1日至2015年8月1日 | 2800个collection约500条/each | 7-10分钟 |
| 股票分钟线数据2个交易代码2013年1月1日至2015年8月1日 | 2个collection约20万条/each | 1-2分钟 |
| 股票日线数据更新任务2800个交易代码2015年8月1日至2015年8月15日 | 2800个collection约10条/each | 1-2分钟 |
vn.datayes基于MongoDB数据库通过一个json配置文件简化数据库的初始化、设置、动态更新过程。较为精细的数据库操作仍需编写脚本进行。若对MongoDB与pymongo不熟悉推荐使用Robomongo等窗口化查看工具作为辅助。
###1.2 主要依赖:
pymongo, pandas, requests, json
###1.3 开发测试环境:
Mac OS X 10.10; Windows 7 || Anaconda.Python 2.7
* * *
##2. Get Started
###2.1 准备
* 下载并安装MongoDB: https://www.mongodb.org/downloads
* 获取API token以通联数据为例。
![fig1](figs/fig1.png)
* 更新pymongo至3.0以上版本; 更新requests等包。
```
~$ pip install pymongo --upgrade
~$ pip install requests --upgrade
```
* [ ! 注意本模块需要pymongo3.0新加入的部分方法使用vnpy本体所用的2.7版本对应方法将无法正常插入数据。依赖冲突的问题会尽快被解决目前推荐制作一个virtual environment来单独运行这个模块或者暴力切换pymongo的版本]
```
~$ pip install pymongo==3.0.3 # this module.
~$ pip install pymongo==2.7.2 # pymongo 2.7.
```
* 启动MongoDB
```
~$ mongod
```
###2.2 数据库初始化与下载
* **api.Config** 对象包含了向API进行数据请求所需的信息我们需要一个用户token来初始化这个对象。
from storage import *
myConfig = Config(head="Zed's Config",
token='7c2e59e212dbff90ffd6b382c7afb57bc987a99307d382b058af6748f591d723')
myConfig.body
{'domain': 'api.wmcloud.com/data',
'header': {'Authorization': 'Bearer 7c2e59e212dbff90ffd6b382c7afb57bc987a99307d382b058af6748f591d723',
'Connection': 'keep-alive'},
'ssl': False,
'version': 'v1'}
* * *
* **storage.DBConfig** 对象包含了数据库配置。我们需要自己编写一个json字典来填充这个对象。举例来说我们希望下载股票日线数据和指数日线数据数据库名称为DATAYES_EQUITY_D1和DATAYES_INDEX_D1index为日期“date”。那么json字典是这样的
client = pymongo.MongoClient() # pymongo.connection object.
body = {
'client': client, # connection object.
'dbs': {
'EQU_D1': { # in-python alias: 'EQU_D1'
'self': client['DATAYES_EQUITY_D1'], # pymongo.database[name] object.
'index': 'date', # index name.
'collNames': 'equTicker' # what are collection names consist of.
},
'IDX_D1': { # Another database
'self': client['DATAYES_INDEX_D1'],
'index': 'date',
'collNames': 'idxTicker'
}
},
'dbNames': ['EQU_D1','IDX_D1'] # List of alias.
}
myDbConfig_ = DBConfig(body=body)
# 这看上去有些麻烦不想这么做的话可以直接使用DBConfig的默认构造函数。
myDbConfig = DBConfig()
myDbConfig.body
{'client': MongoClient('localhost', 27017),
'dbNames': ['EQU_M1', 'EQU_D1', 'FUT_D1', 'OPT_D1', 'FUD_D1', 'IDX_D1'],
'dbs': {'EQU_D1': {'collNames': 'equTicker',
'index': 'date',
'self': Database(MongoClient('localhost', 27017), u'DATAYES_EQUITY_D1')},
'EQU_M1': {'collNames': 'secID',
'index': 'dateTime',
'self': Database(MongoClient('localhost', 27017), u'DATAYES_EQUITY_M1')},
'FUD_D1': {'collNames': 'fudTicker',
'index': 'date',
'self': Database(MongoClient('localhost', 27017), u'DATAYES_FUND_D1')},
'FUT_D1': {'collNames': 'futTicker',
'index': 'date',
'self': Database(MongoClient('localhost', 27017), u'DATAYES_FUTURE_D1')},
'IDX_D1': {'collNames': 'idxTicker',
'index': 'date',
'self': Database(MongoClient('localhost', 27017), u'DATAYES_INDEX_D1')},
'OPT_D1': {'collNames': 'optTicker',
'index': 'date',
'self': Database(MongoClient('localhost', 27017), u'DATAYES_OPTION_D1')}}}
* * *
* **api.PyApi**是向网络数据源进行请求的主要对象。**storage.MongodController**是进行数据库管理的对象。当我们完成了配置对象的构造即可初始化PyApi与MongodController。**MongodController._get_coll_names()** 和**MongodController._ensure_index()** 是数据库初始化所调用的方法,为了模块开发的方便,它们暂时没有被放进构造函数中自动执行。
myApi = PyApi(myConfig) # construct PyApi object.
mc = MongodController(api=myApi, config=myDbConfig) # construct MongodController object,
# on the top of PyApi.
mc._get_coll_names() # get names of collections.
mc._ensure_index() # ensure collection indices.
[MONGOD]: Collection names gotten.
[MONGOD]: MongoDB index set.
1
![fig2](figs/fig2.png)
* 使用**MongodController.download#()**方法进行下载。
mc.download_index_D1('20150101','20150801')
![fig3](figs/fig3.png)
###2.3 数据库更新
* 使用**MongodController.update#()**方法进行更新。脚本会自动寻找数据库中的最后一日并更新至最新交易日。
from datetime import datetime
datetime.now()
datetime.datetime(2015, 8, 17, 10, 49, 21, 37758)
mc.update_index_D1()
![fig4](figs/fig4.png)
###2.4 Mac OS或Linux下的下载与更新
模块中包含了一些shell脚本方面在linux-like os下的数据下载、更新。
```
~$ cd path/of/vn/datayes
~$ chmod +x prepare.sh
~$ ./prepare.sh
```
![fig5](figs/fig5.png)
![fig6](figs/fig6.png)

657
vn.datayes/storage.py Normal file
View File

@ -0,0 +1,657 @@
import os
import json
import pymongo
import pandas as pd
from datetime import datetime, timedelta
from api import Config, PyApi
from api import BaseDataContainer, History, Bar
from errors import (VNPAST_ConfigError, VNPAST_RequestError,
VNPAST_DataConstructorError, VNPAST_DatabaseError)
class DBConfig(Config):
"""
Json-like config object; inherits from Config()
Contains all kinds of settings relating to database settings.
privates
--------
Inherited from api.Config, plus:
* client: pymongo.MongoClient object, the connection
that is to be used for this session.
* body: dictionary; the main content of config.
- client: pymongo.MongoClient(), refers to self.client.
- dbs: dictionary, is a mapping from database alias
to another dictionary, which inclues configurations
and themselves(i.e. pymongo.database entity)
Concretely, dbs has the structure like:
{
alias1 : {
'self': client[dbName1],
'index': dbIndex1,
'collNames': collectionNameType1
},
alias2 : {
'self': client[dbName2],
'index': dbIndex2,
'collNames': collectionNameType2
}, ...
}
where alias#: string;
dbs.alias#.self: pymongo.database;
dbs.alias#.index: string;
dbs.alias#.collNames: string;
- dbNames: list; a list of database alias.
"""
head = 'DB config'
client = pymongo.MongoClient()
body = {
'client': client,
'dbs': {
'EQU_M1': {
'self': client['DATAYES_EQUITY_M1'],
'index': 'dateTime',
'collNames': 'secID'
},
'EQU_D1': {
'self': client['DATAYES_EQUITY_D1'],
'index': 'date',
'collNames': 'equTicker'
},
'FUT_D1': {
'self': client['DATAYES_FUTURE_D1'],
'index': 'date',
'collNames': 'futTicker'
},
'OPT_D1': {
'self': client['DATAYES_OPTION_D1'],
'index': 'date',
'collNames': 'optTicker'
},
'FUD_D1': {
'self': client['DATAYES_FUND_D1'],
'index': 'date',
'collNames': 'fudTicker'
},
'IDX_D1': {
'self': client['DATAYES_INDEX_D1'],
'index': 'date',
'collNames': 'idxTicker'
}
},
'dbNames': ['EQU_M1', 'EQU_D1', 'FUT_D1',
'OPT_D1', 'FUD_D1', 'IDX_D1']
}
def __init__(self, head=None, token=None, body=None):
"""
Inherited constructor.
parameters
----------
* head: string; the name of config file. Default is None.
* token: string; user's token.
* body: dictionary; the main content of config
"""
super(DBConfig, self).__init__(head, token, body)
def view(self):
""" Reloaded Prettify printing method. """
config_view = {
'dbConfig_head' : self.head,
'dbConfig_body' : str(self.body),
}
print json.dumps(config_view,
indent=4,
sort_keys=True)
#----------------------------------------------------------------------
# MongoDB Controller class
class MongodController(object):
"""
The MongoDB controller interface.
MongodController is initialized with a DBConfig configuration
object and a PyApi object, which has already been contructed with
its own Config json. The default version of constructor actually
does nothing special about the database. Yet if user executes shell
script prepare.sh to prepare the connection, MongodController will
firstly gather symbols that are going to become collection names
in corresponding databases. This process is done one database by another,
user can skip useless databases by editing the scripts.
Then, it ensures the index of each collection due to the 'index' value
in DBConfig.body.dbs. Concretely, for D1 bars, the index will be 'date',
and for intraday bars, it will be 'dateTime'; both take the form of
datetime.datetime timestamp.
download() and update() methods of controller dynamically construct
and maintain the databases, requesting data via PyApi. Once the database
is constructed, MongodController can access required data via its fetch()
method.
privates
--------
* _config: DBConfig object; a container of all useful settings for the
databases.
* _api: PyApi object; is responsible for making requests.
* _client: pymongo.MongoClient object; the connection to MongoDB.
* _dbs: dictionary; a mapping from database names to another dictionary,
which includes configurations of the database and the pymongo.database
entity. Inherited from _config.body.['dbs']. Note that keys
self._dbs are mere strings, only self._dbs[key]['self'] refers to the
pymongo.Database object.
* _dbNames: list; a list of names of databases.
* _collNames: dictionary; mapping from self._db[key]['collNames'] attribute
to the names of collections(i.e. tickers) within.
- example: _collNames['equTicker'] = ['000001', '000002', ...]
* _connected: boolean; whether the MongoClient was connected to or not.
* _mapTickersToSecIDs: dictionary; mapping from stock tickers to
its security ID.
example
-------
>> myApi = PyApi(Config())
>> mydbs = DBConfig()
>> controller = MongodController(mydbs, myApi)
>> controller._get_coll_names()
>> controller._ensure_index()
>> controller.download_equity_D1(20130101, 20150801)
>> controller.update_equity_D1()
"""
_config = DBConfig()
_api = None
_client = None
_dbs = None
_dbNames = []
_collNames = dict()
_connected = False
_mapTickersToSecIDs = dict()
def __init__(self, config, api):
"""
Constructor.
parameters
----------
* config: DBConfig object; specifies database configs.
* api: PyApi object.
"""
self._api = api # Set Datayes PyApi.
if config.body:
try:
self._config = config.body
self._client = config.body['client']
self._dbs = config.body['dbs']
self._dbNames = config.body['dbNames']
self._connected = True
except KeyError:
msg = '[MONGOD]: Unable to configure database; ' + \
'config file is incomplete.'
raise VNPAST_ConfigError(msg)
except Exception,e:
msg = '[MONGOD]: Unable to configure database; ' + str(e)
raise VNPAST_ConfigError(msg)
if self._connected:
#self._get_coll_names()
#self._ensure_index()
pass
def view(self):
"""
NOT IMPLEMENTED
"""
return
#----------------------------------------------------------------------
# Get collection names methods.
"""
Decorator;
Targeting at path dName, if exists, read data from this file;
if not, execute handle() which returns a json-like data and
stores the data at dName path.
parameters
----------
* dName: string; the specific path of file that __md looks at.
"""
def __md(dName):
def _md(get):
def handle(*args, **kwargs):
try:
if os.path.isfile(dName):
# if directory exists, read from it.
jsonFile = open(dName,'r')
data = json.loads(jsonFile.read())
jsonFile.close()
else:
# if not, get data via *get method,
# then write to the file.
data = get(*args, **kwargs)
jsonFile = open(dName, 'w+')
jsonFile.write(json.dumps(data))
jsonFile.close()
#print data
return data
except Exception,e:
raise e
return handle
return _md
@__md('names/equTicker.json')
def _allEquTickers(self):
"""get all equity tickers, decorated by @__md()."""
data = self._api.get_equity_D1()
allEquTickers = list(data.body['ticker'])
return allEquTickers
@__md('names/secID.json')
def _allSecIds(self):
"""get all security IDs, decorated by @__md()."""
data = self._api.get_equity_D1()
allTickers = list(data.body['ticker'])
exchangeCDs = list(data.body['exchangeCD'])
allSecIds = [allTickers[k]+'.'+exchangeCDs[k] for k in range(
len(allTickers))]
return allSecIds
@__md('names/futTicker.json')
def _allFutTickers(self):
"""get all future tickers, decorated by @__md()."""
data = self._api.get_future_D1()
allFutTickers = list(data.body['ticker'])
return allFutTickers
@__md('names/optTicker.json')
def _allOptTickers(self):
"""get all option tickers, decorated by @__md()."""
data = self._api.get_option_D1()
allOptTickers = list(data.body['ticker'])
return allOptTickers
@__md('names/fudTicker.json')
def _allFudTickers(self):
"""get all fund tickers, decorated by @__md()."""
data = self._api.get_fund_D1()
allFudTickers = list(data.body['ticker'])
return allFudTickers
@__md('names/idxTicker.json')
def _allIdxTickers(self):
"""get all index tickers, decorated by @__md()."""
data = self._api.get_index_D1()
allIdxTickers = list(data.body['ticker'])
return allIdxTickers
@__md('names/bndTicker.json')
def _allBndTickers(self):
"""get all bond tickers, decorated by @__md()."""
data = self._api.get_bond_D1()
allBndTickers = list(data.body['ticker'])
return allBndTickers
def _get_coll_names(self):
"""
get all instruments'names and store them in self._collNames.
"""
try:
if not os.path.exists('names'):
os.makedirs('names')
self._collNames['equTicker'] = self._allEquTickers()
self._collNames['fudTicker'] = self._allFudTickers()
self._collNames['secID'] = self._allSecIds()
self._collNames['futTicker'] = self._allFutTickers()
self._collNames['optTicker'] = self._allOptTickers()
self._collNames['idxTicker'] = self._allIdxTickers()
print '[MONGOD]: Collection names gotten.'
return 1
except AssertionError:
warning = '[MONGOD]: Warning, collection names ' + \
'is an empty list.'
print warning
except Exception, e:
msg = '[MONGOD]: Unable to set collection names; ' + \
str(e)
raise VNPAST_DatabaseError(msg)
#----------------------------------------------------------------------
# Ensure collection index method.
def _ensure_index(self):
"""
Ensure indices for all databases and collections.
first access self._dbs config to get index column names;
then get collection names from self._collNames and loop
over all collections.
"""
if self._collNames and self._dbs:
try:
for dbName in self._dbs:
# Iterate over database configurations.
db = self._dbs[dbName]
dbSelf = db['self']
index = db['index']
collNames = self._collNames[db['collNames']]
# db['self'] is the pymongo.Database object.
for name in collNames:
coll = dbSelf[name]
coll.ensure_index([(index,
pymongo.DESCENDING)], unique=True)
print '[MONGOD]: MongoDB index set.'
return 1
except KeyError:
msg = '[MONGOD]: Unable to set collection indices; ' + \
'infomation in Config.body["dbs"] is incomplete.'
raise VNPAST_DatabaseError(msg)
except Exception, e:
msg = '[MONGOD]: Unable to set collection indices; ' + str(e)
raise VNPAST_DatabaseError(msg)
#----------------------------------------------------------------------
# Download method.
def download_equity_D1(self, start, end, sessionNum=30):
"""
"""
try:
db = self._dbs['EQU_D1']['self']
self._api.get_equity_D1_mongod(db, start, end, sessionNum)
except Exception, e:
msg = '[MONGOD]: Unable to download data; ' + str(e)
raise VNPAST_DatabaseError(msg)
def download_equity_M1(self, tasks, startYr=2012, endYr=2015):
"""
"""
try:
# map equity tickers to security IDs.
if self._mapTickersToSecIDs:
maps = self._mapTickersToSecIDs
else:
assert os.isfile('./names/secID.json')
jsonFile = open(dName,'r')
allSecIds = json.loads(jsonFile.read())
jsonFile.close()
allTickers = [s.split('.')[0] for s in allSecIds]
maps = dict(zip(allTickers, allSecIds))
self._mapTickersToSecIDs = maps
tasks_ = [maps[task] for task in tasks]
db = self._dbs['EQU_M1']['self']
self._api.get_equity_M1_interMonth(db, id=1,
startYr = startYr,
endYr = endYr,
tasks = tasks_)
except AssertionError:
msg = '[MONGOD]: Cannot map tickers to secIDs; ' + \
'secID.json does not exist.'
raise VNPAST_DatabaseError(msg)
except Exception, e:
msg = '[MONGOD]: Unable to download data; ' + str(e)
raise VNPAST_DatabaseError(msg)
def download_bond_D1(self, start, end, sessionNum=30):
"""
"""
pass
def download_future_D1(self, start, end, sessionNum=30):
"""
"""
try:
db = self._dbs['FUT_D1']['self']
self._api.get_future_D1_mongod(db, start, end, sessionNum)
except Exception, e:
msg = '[MONGOD]: Unable to download data; ' + str(e)
raise VNPAST_DatabaseError(msg)
def download_option_D1(self, start, end, sessionNum=30):
"""
"""
try:
db = self._dbs['OPT_D1']['self']
self._api.get_option_D1_mongod(db, start, end, sessionNum)
except Exception, e:
msg = '[MONGOD]: Unable to download data; ' + str(e)
raise VNPAST_DatabaseError(msg)
def download_index_D1(self, start, end, sessionNum=30):
"""
"""
try:
db = self._dbs['IDX_D1']['self']
self._api.get_index_D1_mongod(db, start, end, sessionNum)
except Exception, e:
msg = '[MONGOD]: Unable to download data; ' + str(e)
raise VNPAST_DatabaseError(msg)
def download_fund_D1(self, start, end, sessionNum=30):
"""
"""
try:
db = self._dbs['FUD_D1']['self']
self._api.get_fund_D1_mongod(db, start, end, sessionNum)
except Exception, e:
msg = '[MONGOD]: Unable to download data; ' + str(e)
raise VNPAST_DatabaseError(msg)
#----------------------------------------------------------------------
# Update methods.
def __update(self, key, target1, target2, sessionNum):
"""
Basic update method.
Looks into the database specified by 'key', find the latest
record in the collection of it. Then update the collections
till last trading date.
parameters
----------
* key: string; a database alias (refer to the database config)
e.g., 'EQU_D1'.
* target1: method; pointer to the function with which controller
obtain all tickers in the database. Concretely, target1 are
self._all#Tickers methods.
* target2: method; pointer to the api overlord requesting functions
i.e. self._api.get_###_mongod methods.
* sessionNum: integer; the number of threads.
"""
try:
# get databases and tickers
db = self._dbs[key]['self']
index = self._dbs[key]['index']
allTickers = target1()
coll = db[allTickers[0]]
# find the latest timestamp in collection.
latest = coll.find_one(
sort=[(index, pymongo.DESCENDING)])[index]
start = datetime.strftime(
latest + timedelta(days=1),'%Y%m%d')
end = datetime.strftime(datetime.now(), '%Y%m%d')
# then download.
target2(db, start, end, sessionNum)
return db
except Exception, e:
msg = '[MONGOD]: Unable to update data; ' + str(e)
raise VNPAST_DatabaseError(msg)
def update_equity_D1(self, sessionNum=30):
"""
"""
db = self.__update(key = 'EQU_D1',
target1 = self._allEquTickers,
target2 = self._api.get_equity_D1_mongod,
sessionNum = sessionNum)
return db
def update_future_D1(self, sessionNum=30):
"""
"""
db = self.__update(key = 'FUT_D1',
target1 = self._allFutTickers,
target2 = self._api.get_future_D1_mongod,
sessionNum = sessionNum)
return db
def update_option_D1(self, sessionNum=30):
"""
"""
db = self.__update(key = 'OPT_D1',
target1 = self._allOptTickers,
target2 = self._api.get_option_D1_mongod,
sessionNum = sessionNum)
return db
def update_index_D1(self, sessionNum=30):
"""
"""
db = self.__update(key = 'IDX_D1',
target1 = self._allIdxTickers,
target2 = self._api.get_index_D1_mongod,
sessionNum = sessionNum)
return db
def update_fund_D1(self, sessionNum=30):
"""
"""
db = self.__update(key = 'FUD_D1',
target1 = self._allFudTickers,
target2 = self._api.get_fund_D1_mongod,
sessionNum = sessionNum)
return db
#----------------------------------------------------------------------#
# stuff that will be deprecated
def update_equity_D1_(self, sessionNum=30):
"""
"""
try:
# set databases and tickers
db = self._dbs['EQU_D1']['self']
index = self._dbs['EQU_D1']['index']
allEquTickers = self._allEquTickers()
coll = db[allEquTickers[0]]
# find the latest timestamp in collection.
latest = coll.find_one(
sort=[(index, pymongo.DESCENDING)])[index]
start = datetime.strftime(latest + timedelta(days=1),'%Y%m%d')
end = datetime.strftime(datetime.now(), '%Y%m%d')
# then download.
self._api.get_equity_D1_mongod(db, start, end, sessionNum)
except Exception, e:
msg = '[MONGOD]: Unable to update data; ' + str(e)
raise VNPAST_DatabaseError(msg)
def update_equity_M1(self):
"""
"""
pass
#----------------------------------------------------------------------
# Fetch method.
def fetch(self, dbName, ticker, start, end, output='list'):
"""
"""
# check inputs' validity.
if output not in ['df', 'list', 'json']:
raise ValueError('[MONGOD]: Unsupported output type.')
if dbName not in self._dbNames:
raise ValueError('[MONGOD]: Unable to locate database name.')
db = self._dbs[dbName]
dbSelf = db['self']
dbIndex = db['index']
try:
coll = db[ticker]
if len(start)==8 and len(end)==8:
# yyyymmdd, len()=8
start = datetime.strptime(start, '%Y%m%d')
end = datetime.strptime(end, '%Y%m%d')
elif len(start)==14 and len(end)==14:
# yyyymmdd HH:MM, len()=14
start = datetime.strptime(start, '%Y%m%d %H:%M')
end = datetime.strptime(end, '%Y%m%d %H:%M')
else:
pass
docs = []
# find in MongoDB.
for doc in coll.find(filter={dbIndex: {'$lte': end,
'$gte': start}}, projection={'_id': False}):
docs.append(doc)
if output == 'list':
return docs[::-1]
except Exception, e:
msg = '[MONGOD]: Error encountered when fetching data' + \
'from MongoDB; '+ str(e)
return -1
if __name__ == '__main__':
dc = DBConfig()
api = PyApi(Config())
mc = MongodController(dc, api)
mc.update_index_D1()

119
vn.datayes/tests.py Normal file
View File

@ -0,0 +1,119 @@
from api import *
def test_config():
cfig = Config()
print cfig.body, cfig.head, cfig.token
cfig.view()
def test_mktbar_D1():
api = PyApi(Config())
data = api.get_equity_D1()
print data.body
def test_mktbar_M1():
api = PyApi(Config())
data = api.get_equity_M1()
print data.body.tail()
def test_bond_D1():
api = PyApi(Config())
data = api.get_bond_D1()
print data.body.tail()
def test_fut_D1():
api = PyApi(Config())
data = api.get_future_D1()
print data.body
def test_fund_D1():
api = PyApi(Config())
data = api.get_fund_D1()
print data.body
def test_index_D1():
api = PyApi(Config())
data = api.get_index_D1()
print data.body
def test_option_D1():
api = PyApi(Config())
data = api.get_option_D1()
print data.body
def test_factors_D1():
api = PyApi(Config())
data = api.get_stockFactor_D1()
print data.body
def test_bs():
api = PyApi(Config())
data = api.get_balanceSheet()
print data.body
def test_cf():
api = PyApi(Config())
data = api.get_cashFlow()
print data.body
def test_is():
api = PyApi(Config())
data = api.get_incomeStatement()
print data.body
def test_output():
api = PyApi(Config())
data = api.get_equity_D1(ticker='000001',output='list')
print data
def test_mongod_get_drudgery():
c = MongoClient()
db = c['test_dy']
api = PyApi(Config())
api.get_equity_D1_drudgery(id=1, db=db,
start='20130101', end='20150801',
tasks=['000001','000002'])
def test_mongod_get_all():
c = MongoClient()
db = c['test_dy']
api = PyApi(Config())
api.get_equity_D1_mongod(db=db, start='20130101', end='20150801')
def test_mktbar_M1_get_drudgery():
c = MongoClient()
db = c['test_dy_m1']
api = PyApi(Config())
api.get_equity_M1_drudgery(id=1, db=db,
start='20150701', end='20150801',
tasks=['000001.XSHE','000002.XSHE'])
def test_mktbar_M1_get_all():
c = MongoClient()
db = c['test_dy_m1']
api = PyApi(Config())
api.get_equity_M1_mongod(db=db)
def test_mktbar_M1_get_interM():
c = MongoClient()
db = c['test_dy_m1']
api = PyApi(Config())
api.get_equity_M1_interMonth(db=db, id=0, tasks=['000001.XSHE','000002.XSHE'])
if __name__ == '__main__':
#test_config()
#test_mktbar_D1()
#test_bond_D1()
#test_fut_D1()
#test_fund_D1()
#test_index_D1()
#test_option_D1()
#test_factors_D1()
#test_mktbar_M1()
#test_bs()
#test_cf()
#test_is()
#test_output()
#test_mongod_get_all()
#test_mktbar_M1_get_drudgery()
#test_mktbar_M1_get_all()
test_mktbar_M1_get_interM()

52
vn.datayes/update.sh Executable file
View File

@ -0,0 +1,52 @@
#!/bin/bash
echo [API]: Prepare to update DATAYES_FUTURE_D1...
python - << EOF
from storage import *
dc = DBConfig()
api = PyApi(Config())
mc = MongodController(dc, api)
mc.update_future_D1()
EOF
echo [API]: DATAYES_FUTURE_D1 updated.
echo [API]: Prepare to update DATAYES_INDEX_D1...
python - << EOF
from storage import *
dc = DBConfig()
api = PyApi(Config())
mc = MongodController(dc, api)
mc.update_index_D1()
EOF
echo [API]: DATAYES_INDEX_D1 updated.
echo [API]: Prepare to update DATAYES_OPTION_D1...
python - << EOF
from storage import *
dc = DBConfig()
api = PyApi(Config())
mc = MongodController(dc, api)
mc.update_option_D1()
EOF
echo [API]: DATAYES_OPTION_D1 updated.
echo [API]: Prepare to update DATAYES_FUND_D1...
python - << EOF
from storage import *
dc = DBConfig()
api = PyApi(Config())
mc = MongodController(dc, api)
mc.update_fund_D1()
EOF
echo [API]: DATAYES_FUND_D1 updated.
echo [MONGOD]: Update finished.
echo [API]: Prepare to update DATAYES_EQUITY_D1...
python - << EOF
from storage import *
dc = DBConfig()
api = PyApi(Config())
mc = MongodController(dc, api)
mc.update_equity_D1()
EOF
echo [API]: DATAYES_EQUITY_D1 updated.

View File

@ -477,12 +477,13 @@ class DemoTdApi(TdApi):
#----------------------------------------------------------------------
def onRspQrySettlementInfo(self, data, error, n, last):
"""查询结算信息回报"""
event = Event(type_=EVENT_LOG)
log = u'结算信息查询完成'
event.dict_['log'] = log
self.__eventEngine.put(event)
self.confirmSettlement() # 查询完成后立即确认结算信息
if last:
event = Event(type_=EVENT_LOG)
log = u'结算信息查询完成'
event.dict_['log'] = log
self.__eventEngine.put(event)
self.confirmSettlement() # 查询完成后立即确认结算信息
#----------------------------------------------------------------------
def onRspQryTransferBank(self, data, error, n, last):

View File

@ -477,12 +477,13 @@ class DemoTdApi(TdApi):
#----------------------------------------------------------------------
def onRspQrySettlementInfo(self, data, error, n, last):
"""查询结算信息回报"""
event = Event(type_=EVENT_LOG)
log = u'结算信息查询完成'
event.dict_['log'] = log
self.__eventEngine.put(event)
self.confirmSettlement() # 查询完成后立即确认结算信息
if last:
event = Event(type_=EVENT_LOG)
log = u'结算信息查询完成'
event.dict_['log'] = log
self.__eventEngine.put(event)
self.confirmSettlement() # 查询完成后立即确认结算信息
#----------------------------------------------------------------------
def onRspQryTransferBank(self, data, error, n, last):

991
vn.trader/ctpGateway.py Normal file
View File

@ -0,0 +1,991 @@
# encoding: UTF-8
from vnctpmd import MdApi
from vnctptd import TdApi
from gateway import *
import os
########################################################################
class CtpGateway(VtGateway):
"""CTP接口"""
#----------------------------------------------------------------------
def __init__(self, eventEngine):
"""Constructor"""
super(CtpGateway, self).__init__(eventEngine)
self.mdApi = None # 行情API
self.tdApi = None # 交易API
self.mdConnected = False # 行情API连接状态
self.tdConnected = False # 交易API连接状态
########################################################################
class CtpMdApi(MdApi):
"""CTP行情API实现"""
#----------------------------------------------------------------------
def __init__(self, gateway, userID, password, brokerID, address):
"""Constructor"""
super(CtpMdApi, self).__init__()
self.gateway = gateway # gateway对象
self.gatewayName = gateway.gatewayName # gateway对象名称
self.reqID = EMPTY_INT # 操作请求编号
self.connectionStatus = False # 连接状态
self.loginStatus = False # 登录状态
self.userID = userID # 账号
self.password = password # 密码
self.brokerID = brokerID # 经纪商代码
self.address = address # 服务器地址
self.subscribedSymbols = set() # 已订阅合约代码
#----------------------------------------------------------------------
def onFrontConnected(self):
"""服务器连接"""
self.connectionStatus = True
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'行情服务器连接成功'
self.gateway.onLog(log)
self.login()
#----------------------------------------------------------------------
def onFrontDisconnected(self, n):
"""服务器断开"""
self.connectionStatus = False
self.loginStatus = False
self.gateway.mdConnected = False
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'行情服务器连接断开'
self.gateway.onLog(log)
#----------------------------------------------------------------------
def onHeartBeatWarning(self, n):
"""心跳报警"""
# 因为API的心跳报警比较常被触发且与API工作关系不大因此选择忽略
pass
#----------------------------------------------------------------------
def onRspError(self, error, n, last):
"""错误回报"""
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onRspUserLogin(self, data, error, n, last):
"""登陆回报"""
# 如果登录成功,推送日志信息
if error['ErrorID'] == 0:
self.loginStatus = True
self.gateway.mdConnected = True
log = VtLogData()
log.logContent = u'行情服务器登录完成'
self.gateway.onLog(log)
# 重新订阅之前订阅的合约
for subscribeReq in self.subscribedSymbols:
self.subscribe(subscribeReq)
# 否则,推送错误信息
else:
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onRspUserLogout(self, data, error, n, last):
"""登出回报"""
# 如果登出成功,推送日志信息
if error['ErrorID'] == 0:
self.loginStatus = False
self.gateway.tdConnected = False
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'行情服务器登出完成'
self.gateway.onLog(log)
# 否则,推送错误信息
else:
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onRspSubMarketData(self, data, error, n, last):
"""订阅合约回报"""
# 通常不在乎订阅错误,选择忽略
pass
#----------------------------------------------------------------------
def onRspUnSubMarketData(self, data, error, n, last):
"""退订合约回报"""
# 同上
pass
#----------------------------------------------------------------------
def onRtnDepthMarketData(self, data):
"""行情推送"""
tick = VtTickData()
tick.symbol = data['InstrumentID']
tick.vtSymbol = '.'.join([self.gatewayName, tick.symbol])
tick.lastPrice = data['LastPrice']
tick.volume = data['Volume']
tick.openInterest = data['OpenInterest']
tick.tickTime = '.'.join([data['UpdateTime'], str(data['UpdateMillisec']/100]))
# CTP只有一档行情
tick.bidPrice1 = data['BidPrice1']
tick.bidVolume1 = data['BidVolume1']
tick.askPrice1 = data['AskPrice1']
tick.askVolume1 = data['AskVolume1']
self.gateway.onTick(tick)
#----------------------------------------------------------------------
def onRspSubForQuoteRsp(self, data, error, n, last):
"""订阅期权询价"""
pass
#----------------------------------------------------------------------
def onRspUnSubForQuoteRsp(self, data, error, n, last):
"""退订期权询价"""
pass
#----------------------------------------------------------------------
def onRtnForQuoteRsp(self, data):
"""期权询价推送"""
pass
#----------------------------------------------------------------------
def connect(self):
"""初始化连接"""
# 如果尚未建立服务器连接,则进行连接
if not self.connectionStatus:
# 创建C++环境中的API对象这里传入的参数是需要用来保存.con文件的文件夹路径
path = os.getcwd() + '\\temp\\' + self.gatewayName + '\\'
if not os.path.exists(path):
os.makedirs(path)
self.createFtdcMdApi(path)
# 注册服务器地址
self.registerFront(self.address)
# 初始化连接成功会调用onFrontConnected
self.init()
# 若已经连接但尚未登录,则进行登录
else:
if not self.loginStatus:
self.login()
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅合约"""
self.subscribeMarketData(subscribeReq.symbol)
self.subscribedSymbols.add(subscribeReq)
#----------------------------------------------------------------------
def login(self):
"""登录"""
# 如果填入了用户名密码等,则登录
if self.userID and self.password and self.brokerID:
req = {}
req['UserID'] = self.userID
req['Password'] = self.password
req['BrokerID'] = self.brokerID
self.reqID += 1
self.reqUserLogin(req, self.reqID)
########################################################################
class CtpTdApi(TdApi):
"""CTP交易API实现"""
#----------------------------------------------------------------------
def __init__(self, gateway, userID, password, brokerID, address):
"""API对象的初始化函数"""
super(CtpTdApi, self).__init__()
self.gateway = gateway # gateway对象
self.gatewayName = gateway.gatewayName # gateway对象名称
self.reqID = EMPTY_INT # 操作请求编号
self.orderRef = EMPTY_INT # 订单编号
self.connectionStatus = False # 连接状态
self.loginStatus = False # 登录状态
self.userID = userID # 账号
self.password = password # 密码
self.brokerID = brokerID # 经纪商代码
self.address = address # 服务器地址
#----------------------------------------------------------------------
def onFrontConnected(self):
"""服务器连接"""
self.connectionStatus = True
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'交易服务器连接成功'
self.gateway.onLog(log)
self.login()
#----------------------------------------------------------------------
def onFrontDisconnected(self, n):
"""服务器断开"""
self.connectionStatus = False
self.loginStatus = False
self.gateway.tdConnected = False
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'交易服务器连接断开'
self.gateway.onLog(log)
#----------------------------------------------------------------------
def onHeartBeatWarning(self, n):
""""""
pass
#----------------------------------------------------------------------
def onRspAuthenticate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspUserLogin(self, data, error, n, last):
"""登陆回报"""
# 如果登录成功,推送日志信息
if error['ErrorID'] == 0:
self.loginStatus = True
self.gateway.mdConnected = True
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'交易服务器登录完成'
self.gateway.onLog(log)
# 确认结算信息
req = {}
req['BrokerID'] = self.brokerID
req['InvestorID'] = self.userID
self.reqID += 1
self.reqSettlementInfoConfirm(req, self.reqID)
# 否则,推送错误信息
else:
err = VtErrorData()
err.gatewayName = self.gateway
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onRspUserLogout(self, data, error, n, last):
"""登出回报"""
# 如果登出成功,推送日志信息
if error['ErrorID'] == 0:
self.loginStatus = False
self.gateway.tdConnected = False
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'交易服务器登出完成'
self.gateway.onLog(log)
# 否则,推送错误信息
else:
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onRspUserPasswordUpdate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspTradingAccountPasswordUpdate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspOrderInsert(self, data, error, n, last):
"""发单错误(柜台)"""
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onRspParkedOrderInsert(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspParkedOrderAction(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspOrderAction(self, data, error, n, last):
"""撤单错误(柜台)"""
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onRspQueryMaxOrderVolume(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspSettlementInfoConfirm(self, data, error, n, last):
"""确认结算信息回报"""
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'结算信息确认完成'
self.gateway.onLog(log)
# 查询合约代码
self.reqID += 1
self.reqQryInstrument({}, self.reqID)
#----------------------------------------------------------------------
def onRspRemoveParkedOrder(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspRemoveParkedOrderAction(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspExecOrderInsert(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspExecOrderAction(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspForQuoteInsert(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQuoteInsert(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQuoteAction(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryOrder(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryTrade(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryInvestorPosition(self, data, error, n, last):
"""持仓查询回报"""
if error['ErrorID'] == 0:
event = Event(type_=EVENT_POSITION)
event.dict_['data'] = data
self.__eventEngine.put(event)
else:
event = Event(type_=EVENT_LOG)
log = u'持仓查询回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRspQryTradingAccount(self, data, error, n, last):
"""资金账户查询回报"""
if error['ErrorID'] == 0:
event = Event(type_=EVENT_ACCOUNT)
event.dict_['data'] = data
self.__eventEngine.put(event)
else:
event = Event(type_=EVENT_LOG)
log = u'账户查询回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRspQryInvestor(self, data, error, n, last):
"""投资者查询回报"""
pass
#----------------------------------------------------------------------
def onRspQryTradingCode(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryInstrumentMarginRate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryInstrumentCommissionRate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryExchange(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryProduct(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryInstrument(self, data, error, n, last):
"""
合约查询回报
由于该回报的推送速度极快因此不适合全部存入队列中处理
选择先储存在一个本地字典中全部收集完毕后再推送到队列中
由于耗时过长目前使用其他进程读取
"""
if error['ErrorID'] == 0:
event = Event(type_=EVENT_INSTRUMENT)
event.dict_['data'] = data
event.dict_['last'] = last
self.__eventEngine.put(event)
else:
event = Event(type_=EVENT_LOG)
log = u'合约投资者回报,错误代码:' + unicode(error['ErrorID']) + u',' + u'错误信息:' + error['ErrorMsg'].decode('gbk')
event.dict_['log'] = log
self.__eventEngine.put(event)
#----------------------------------------------------------------------
def onRspQryDepthMarketData(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQrySettlementInfo(self, data, error, n, last):
"""查询结算信息回报"""
pass
#----------------------------------------------------------------------
def onRspQryTransferBank(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryInvestorPositionDetail(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryNotice(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQrySettlementInfoConfirm(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryInvestorPositionCombineDetail(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryCFMMCTradingAccountKey(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryEWarrantOffset(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryInvestorProductGroupMargin(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryExchangeMarginRate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryExchangeMarginRateAdjust(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryExchangeRate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQrySecAgentACIDMap(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryOptionInstrTradeCost(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryOptionInstrCommRate(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryExecOrder(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryForQuote(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryQuote(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryTransferSerial(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryAccountregister(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspError(self, error, n, last):
"""错误回报"""
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onRtnOrder(self, data):
"""报单回报"""
# 更新最大报单编号
newref = data['OrderRef']
self.orderRef = max(self.orderRef, int(newref))
# 创建报单数据对象
order = VtOrderData()
order.gatewayName = self.gatewayName
# 保存代码和报单号
order.symbol = data['InstrumentID']
order.vtSymbol = '.'.join([self.gatewayName, order.symbol])
order.orderID = data['OrderRef']
order.vtOrderID = '.'.join([self.gatewayName, order.orderID])
# 方向
if data['Direction'] == '0':
order.direction = DIRECTION_LONG
elif data['Direction'] == '1':
order.direction = DIRECTION_SHORT
else:
order.direction = DIRECTION_UNKNOWN
# 多空
if data['']
#----------------------------------------------------------------------
def onRtnTrade(self, data):
"""成交回报"""
# 常规成交事件
event1 = Event(type_=EVENT_TRADE)
event1.dict_['data'] = data
self.__eventEngine.put(event1)
# 特定合约成交事件
event2 = Event(type_=(EVENT_TRADE_CONTRACT+data['InstrumentID']))
event2.dict_['data'] = data
self.__eventEngine.put(event2)
#----------------------------------------------------------------------
def onErrRtnOrderInsert(self, data, error):
"""发单错误回报(交易所)"""
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onErrRtnOrderAction(self, data, error):
"""撤单错误回报(交易所)"""
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = error['ErrorID']
err.errorMsg = error['ErrorMsg'].decode('gbk')
self.gateway.onError(err)
#----------------------------------------------------------------------
def onRtnInstrumentStatus(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnTradingNotice(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnErrorConditionalOrder(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnExecOrder(self, data):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnExecOrderInsert(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnExecOrderAction(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnForQuoteInsert(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onRtnQuote(self, data):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnQuoteInsert(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnQuoteAction(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onRtnForQuoteRsp(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRspQryContractBank(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryParkedOrder(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryParkedOrderAction(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryTradingNotice(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryBrokerTradingParams(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQryBrokerTradingAlgos(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRtnFromBankToFutureByBank(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnFromFutureToBankByBank(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnRepealFromBankToFutureByBank(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnRepealFromFutureToBankByBank(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnFromBankToFutureByFuture(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnFromFutureToBankByFuture(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnRepealFromBankToFutureByFutureManual(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnRepealFromFutureToBankByFutureManual(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnQueryBankBalanceByFuture(self, data):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnBankToFutureByFuture(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnFutureToBankByFuture(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnRepealBankToFutureByFutureManual(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnRepealFutureToBankByFutureManual(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onErrRtnQueryBankBalanceByFuture(self, data, error):
""""""
pass
#----------------------------------------------------------------------
def onRtnRepealFromBankToFutureByFuture(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnRepealFromFutureToBankByFuture(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRspFromBankToFutureByFuture(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspFromFutureToBankByFuture(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRspQueryBankAccountMoneyByFuture(self, data, error, n, last):
""""""
pass
#----------------------------------------------------------------------
def onRtnOpenAccountByBank(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnCancelAccountByBank(self, data):
""""""
pass
#----------------------------------------------------------------------
def onRtnChangeAccountByBank(self, data):
""""""
pass
#----------------------------------------------------------------------
def login(self, address, userid, password, brokerid):
"""连接服务器"""
self.__userid = userid
self.__password = password
self.__brokerid = brokerid
# 数据重传模式设为从本日开始
self.subscribePrivateTopic(0)
self.subscribePublicTopic(0)
# 注册服务器地址
self.registerFront(address)
# 初始化连接成功会调用onFrontConnected
self.init()
#----------------------------------------------------------------------
def getInstrument(self):
"""查询合约"""
self.__reqid = self.__reqid + 1
self.reqQryInstrument({}, self.__reqid)
#----------------------------------------------------------------------
def getAccount(self):
"""查询账户"""
self.__reqid = self.__reqid + 1
self.reqQryTradingAccount({}, self.__reqid)
#----------------------------------------------------------------------
def getInvestor(self):
"""查询投资者"""
self.__reqid = self.__reqid + 1
self.reqQryInvestor({}, self.__reqid)
#----------------------------------------------------------------------
def getPosition(self):
"""查询持仓"""
self.__reqid = self.__reqid + 1
req = {}
req['BrokerID'] = self.__brokerid
req['InvestorID'] = self.__userid
self.reqQryInvestorPosition(req, self.__reqid)
#----------------------------------------------------------------------
def sendOrder(self, instrumentid, exchangeid, price, pricetype, volume, direction, offset):
"""发单"""
self.__reqid = self.__reqid + 1
req = {}
req['InstrumentID'] = instrumentid
req['OrderPriceType'] = pricetype
req['LimitPrice'] = price
req['VolumeTotalOriginal'] = volume
req['Direction'] = direction
req['CombOffsetFlag'] = offset
self.__orderref = self.__orderref + 1
req['OrderRef'] = str(self.__orderref)
req['InvestorID'] = self.__userid
req['UserID'] = self.__userid
req['BrokerID'] = self.__brokerid
req['CombHedgeFlag'] = defineDict['THOST_FTDC_HF_Speculation'] # 投机单
req['ContingentCondition'] = defineDict['THOST_FTDC_CC_Immediately'] # 立即发单
req['ForceCloseReason'] = defineDict['THOST_FTDC_FCC_NotForceClose'] # 非强平
req['IsAutoSuspend'] = 0 # 非自动挂起
req['TimeCondition'] = defineDict['THOST_FTDC_TC_GFD'] # 今日有效
req['VolumeCondition'] = defineDict['THOST_FTDC_VC_AV'] # 任意成交量
req['MinVolume'] = 1 # 最小成交量为1
self.reqOrderInsert(req, self.__reqid)
# 返回订单号,便于某些算法进行动态管理
return self.__orderref
#----------------------------------------------------------------------
def cancelOrder(self, instrumentid, exchangeid, orderref, frontid, sessionid):
"""撤单"""
self.__reqid = self.__reqid + 1
req = {}
req['InstrumentID'] = instrumentid
req['ExchangeID'] = exchangeid
req['OrderRef'] = orderref
req['FrontID'] = frontid
req['SessionID'] = sessionid
req['ActionFlag'] = defineDict['THOST_FTDC_AF_Delete']
req['BrokerID'] = self.__brokerid
req['InvestorID'] = self.__userid
self.reqOrderAction(req, self.__reqid)
#----------------------------------------------------------------------
def getSettlement(self):
"""查询结算信息"""
self.__reqid = self.__reqid + 1
req = {}
req['BrokerID'] = self.__brokerid
req['InvestorID'] = self.__userid
self.reqQrySettlementInfo(req, self.__reqid)
#----------------------------------------------------------------------
def confirmSettlement(self):
"""确认结算信息"""
self.__reqid = self.__reqid + 1
req = {}
req['BrokerID'] = self.__brokerid
req['InvestorID'] = self.__userid
self.reqSettlementInfoConfirm(req, self.__reqid)

196
vn.trader/eventEngine.py Normal file
View File

@ -0,0 +1,196 @@
# encoding: UTF-8
# 系统模块
from Queue import Queue, Empty
from threading import Thread
# 第三方模块
from PyQt4.QtCore import QTimer
# 自己开发的模块
from eventType import *
########################################################################
class EventEngine:
"""
事件驱动引擎
事件驱动引擎中所有的变量都设置为了私有这是为了防止不小心
从外部修改了这些变量的值或状态导致bug
变量说明
__queue私有变量事件队列
__active私有变量事件引擎开关
__thread私有变量事件处理线程
__timer私有变量计时器
__handlers私有变量事件处理函数字典
方法说明
__run: 私有方法事件处理线程连续运行用
__process: 私有方法处理事件调用注册在引擎中的监听函数
__onTimer私有方法计时器固定事件间隔触发后向事件队列中存入计时器事件
start: 公共方法启动引擎
stop公共方法停止引擎
register公共方法向引擎中注册监听函数
unregister公共方法向引擎中注销监听函数
put公共方法向事件队列中存入新的事件
事件监听函数必须定义为输入参数仅为一个event对象
函数
def func(event)
...
对象方法
def method(self, event)
...
"""
#----------------------------------------------------------------------
def __init__(self):
"""初始化事件引擎"""
# 事件队列
self.__queue = Queue()
# 事件引擎开关
self.__active = False
# 事件处理线程
self.__thread = Thread(target = self.__run)
# 计时器,用于触发计时器事件
self.__timer = QTimer()
self.__timer.timeout.connect(self.__onTimer)
# 这里的__handlers是一个字典用来保存对应的事件调用关系
# 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能
self.__handlers = {}
#----------------------------------------------------------------------
def __run(self):
"""引擎运行"""
while self.__active == True:
try:
event = self.__queue.get(block = True, timeout = 1) # 获取事件的阻塞时间设为1秒
self.__process(event)
except Empty:
pass
#----------------------------------------------------------------------
def __process(self, event):
"""处理事件"""
# 检查是否存在对该事件进行监听的处理函数
if event.type_ in self.__handlers:
# 若存在,则按顺序将事件传递给处理函数执行
[handler(event) for handler in self.__handlers[event.type_]]
# 以上语句为Python列表解析方式的写法对应的常规循环写法为
#for handler in self.__handlers[event.type_]:
#handler(event)
#----------------------------------------------------------------------
def __onTimer(self):
"""向事件队列中存入计时器事件"""
# 创建计时器事件
event = Event(type_=EVENT_TIMER)
# 向队列中存入计时器事件
self.put(event)
#----------------------------------------------------------------------
def start(self):
"""引擎启动"""
# 将引擎设为启动
self.__active = True
# 启动事件处理线程
self.__thread.start()
# 启动计时器计时器事件间隔默认设定为1秒
self.__timer.start(1000)
#----------------------------------------------------------------------
def stop(self):
"""停止引擎"""
# 将引擎设为停止
self.__active = False
# 停止计时器
self.__timer.stop()
# 等待事件处理线程退出
self.__thread.join()
#----------------------------------------------------------------------
def register(self, type_, handler):
"""注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则创建
try:
handlerList = self.__handlers[type_]
except KeyError:
handlerList = []
self.__handlers[type_] = handlerList
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler)
#----------------------------------------------------------------------
def unregister(self, type_, handler):
"""注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
try:
handlerList = self.handlers[type_]
# 如果该函数存在于列表中,则移除
if handler in handlerList:
handlerList.remove(handler)
# 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList:
del self.handlers[type_]
except KeyError:
pass
#----------------------------------------------------------------------
def put(self, event):
"""向事件队列中存入事件"""
self.__queue.put(event)
########################################################################
class Event:
"""事件对象"""
#----------------------------------------------------------------------
def __init__(self, type_=None):
"""Constructor"""
self.type_ = type_ # 事件类型
self.dict_ = {} # 字典用于保存具体的事件数据
#----------------------------------------------------------------------
def test():
"""测试函数"""
import sys
from datetime import datetime
from PyQt4.QtCore import QCoreApplication
def simpletest(event):
print u'处理每秒触发的计时器事件:%s' % str(datetime.now())
app = QCoreApplication(sys.argv)
ee = EventEngine()
ee.register(EVENT_TIMER, simpletest)
ee.start()
app.exec_()
# 直接运行脚本可以进行测试
if __name__ == '__main__':
test()

53
vn.trader/eventType.py Normal file
View File

@ -0,0 +1,53 @@
# encoding: UTF-8
'''
本文件仅用于存放对于事件类型常量的定义
由于python中不存在真正的常量概念因此选择使用全大写的变量名来代替常量
这里设计的命名规则以EVENT_前缀开头
常量的内容通常选择一个能够代表真实意义的字符串便于理解
建议将所有的常量定义放在该文件中便于检查是否存在重复的现象
'''
# 系统相关
EVENT_TIMER = 'eTimer' # 计时器事件每隔1秒发送一次
EVENT_LOG = 'eLog' # 日志事件,全局通用
# Gateway相关
EVENT_TICK = 'eTick.' # TICK行情事件可后接具体的vtSymbol
EVENT_TRADE = 'eTrade.' # 成交回报事件
EVENT_ORDER = 'eOrder.' # 报单回报事件
EVENT_POSITION = 'ePosition.' # 持仓回报事件
EVENT_ACCOUNT = 'eAccount.' # 账户回报事件
EVENT_ERROR = 'eError.' # 错误回报事件
#----------------------------------------------------------------------
def test():
"""检查是否存在内容重复的常量定义"""
check_dict = {}
global_dict = globals()
for key, value in global_dict.items():
if '__' not in key: # 不检查python内置对象
if value in check_dict:
check_dict[value].append(key)
else:
check_dict[value] = [key]
for key, value in check_dict.items():
if len(value)>1:
print u'存在重复的常量定义:' + str(key)
for name in value:
print name
print ''
print u'测试完毕'
# 直接运行脚本可以进行测试
if __name__ == '__main__':
test()

348
vn.trader/gateway.py Normal file
View File

@ -0,0 +1,348 @@
# encoding: UTF-8
from eventEngine import *
# 默认空值
EMPTY_STRING = ''
EMPTY_UNICODE = u''
EMPTY_INT = 0
EMPTY_FLOAT = 0.0
# 方向常量
DIRECTION_NONE = 'none'
DIRECTION_LONG = 'long'
DIRECTION_SHORT = 'short'
DIRECTION_UNKNOWN = 'unknown'
# 开平常量
OFFSET_NONE = 'none'
OFFSET_OPEN = 'open'
OFFSET_CLOSE = 'close'
OFFSET_UNKNOWN = 'unknown'
########################################################################
class VtGateway(object):
"""交易接口"""
#----------------------------------------------------------------------
def __init__(self, eventEngine):
"""Constructor"""
self.eventEngine = eventEngine
#----------------------------------------------------------------------
def onTick(self, tick):
"""市场行情推送"""
# 通用事件
event1 = Event(type_=EVENT_TICK)
event1.dict_['data'] = tick
self.eventEngine.put(event1)
# 特定合约代码的事件
event2 = Event(type_=EVENT_TICK+tick.vtSymbol)
event2.dict_['data'] = tick
self.eventEngine.put(event2)
#----------------------------------------------------------------------
def onTrade(self, trade):
"""成交信息推送"""
# 因为成交通常都是事后才会知道成交编号,因此只需要推送通用事件
event1 = Event(type_=EVENT_TRADE)
event1.dict_['data'] = trade
self.eventEngine.put(event1)
#----------------------------------------------------------------------
def onOrder(self, order):
"""订单变化推送"""
# 通用事件
event1 = Event(type_=EVENT_ORDER)
event1.dict_['data'] = order
self.eventEngine.put(event1)
# 特定订单编号的事件
event2 = Event(type_=EVENT_ORDER+order.vtOrderID)
event2.dict_['data'] = order
self.eventEngine.put(event2)
#----------------------------------------------------------------------
def onPosition(self, position):
"""持仓信息推送"""
# 通用事件
event1 = Event(type_=EVENT_POSITION)
event1.dict_['data'] = position
self.eventEngine.put(event1)
# 特定合约代码的事件
event2 = Event(type_=EVENT_POSITION+position.vtPositionName)
event2.dict_['data'] = position
self.eventEngine.put(event2)
#----------------------------------------------------------------------
def onAccount(self, account):
"""账户信息推送"""
# 通用事件
event1 = Event(type_=EVENT_ACCOUNT)
event1.dict_['data'] = account
self.eventEngine.put(event1)
# 特定合约代码的事件
event2 = Event(type_=EVENT_ACCOUNT+account.vtAccountID)
event2.dict_['data'] = account
self.eventEngine.put(event2)
#----------------------------------------------------------------------
def onError(self, error):
"""错误信息推送"""
# 通用事件
event1 = Event(type_=EVENT_ERROR)
event1.dict_['data'] = error
self.eventEngine.put(event1)
#----------------------------------------------------------------------
def onLog(self, log):
"""日志推送"""
# 通用事件
event1 = Event(type_=EVENT_LOG)
event1.dict_['data'] = log
self.eventEngine.put(event1)
#----------------------------------------------------------------------
def connect(self):
"""连接"""
pass
#----------------------------------------------------------------------
def subscribe(self):
"""订阅行情"""
pass
#----------------------------------------------------------------------
def sendOrder(self):
"""发单"""
pass
#----------------------------------------------------------------------
def cancelOrder(self):
"""撤单"""
pass
#----------------------------------------------------------------------
def close(self):
"""关闭"""
pass
########################################################################
class VtBaseData(object):
"""回调函数推送数据的基础类,其他数据类继承于此"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.gatewayName = EMPTY_STRING # Gateway名称
self.rawData = None # 原始数据
########################################################################
class VtTickData(VtBaseData):
"""Tick行情数据类"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
super(VtTickData, self).__init__()
# 代码相关
self.symbol = EMPTY_STRING # 合约代码
self.vtSymbol = EMPTY_STRING # 合约在vt系统中的唯一代码通常是 Gateway名.合约代码
# 成交数据
self.lastPrice = EMPTY_FLOAT # 最新成交价
self.volume = EMPTY_INT # 最新成交量
self.openInterest = EMPTY_INT # 持仓量
self.tickTime = EMPTY_STRING # 更新时间
# 五档行情
self.bidPrice1 = EMPTY_FLOAT
self.bidPrice2 = EMPTY_FLOAT
self.bidPrice3 = EMPTY_FLOAT
self.bidPrice4 = EMPTY_FLOAT
self.bidPrice5 = EMPTY_FLOAT
self.askPrice1 = EMPTY_FLOAT
self.askPrice2 = EMPTY_FLOAT
self.askPrice3 = EMPTY_FLOAT
self.askPrice4 = EMPTY_FLOAT
self.askPrice5 = EMPTY_FLOAT
self.bidVolume1 = EMPTY_INT
self.bidVolume2 = EMPTY_INT
self.bidVolume3 = EMPTY_INT
self.bidVolume4 = EMPTY_INT
self.bidVolume5 = EMPTY_INT
self.askVolume1 = EMPTY_INT
self.askVolume2 = EMPTY_INT
self.askVolume3 = EMPTY_INT
self.askVolume4 = EMPTY_INT
self.askVolume5 = EMPTY_INT
########################################################################
class VtTradeData(VtBaseData):
"""成交数据类"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
super(VtTradeData, self).__init__()
# 代码编号相关
self.symbol = EMPTY_STRING # 合约代码
self.vtSymbol = EMPTY_STRING # 合约在vt系统中的唯一代码通常是 Gateway名.合约代码
self.tradeID = EMPTY_STRING # 成交编号
self.vtTradeID = EMPTY_STRING # 成交在vt系统中的唯一编号通常是 Gateway名.成交编号
self.orderID = EMPTY_STRING # 订单编号
self.vtOrderID = EMPTY_STRING # 订单在vt系统中的唯一编号通常是 Gateway名.订单编号
# 成交相关
self.direction = EMPTY_STRING # 成交方向
self.offset = EMPTY_STRING # 成交开平仓
self.price = EMPTY_FLOAT # 成交价格
self.volume = EMPTY_INT # 成交数量
self.tradeTime = EMPTY_STRING # 成交时间
########################################################################
class VtOrderData(VtBaseData):
"""订单数据类"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
super(VtOrderData, self).__init__()
# 代码编号相关
self.symbol = EMPTY_STRING # 合约代码
self.vtSymbol = EMPTY_STRING # 合约在vt系统中的唯一代码通常是 Gateway名.合约代码
self.orderID = EMPTY_STRING # 订单编号
self.vtOrderID = EMPTY_STRING # 订单在vt系统中的唯一编号通常是 Gateway名.订单编号
# 报单相关
self.direction = EMPTY_STRING # 报单方向
self.offset = EMPTY_STRING # 报单开平仓
self.price = EMPTY_FLOAT # 报单价格
self.totalVolume = EMPTY_INT # 报单总数量
self.tradedVolume = EMPTY_INT # 报单成交数量
self.status = EMPTY_STRING # 报单状态
self.orderTime = EMPTY_STRING # 发单时间
self.cancelTime = EMPTY_STRING # 撤单时间
# CTP/LTS相关
self.frontID = EMPTY_INT # 前置机编号
self.sessionID = EMPTY_INT # 连接编号
########################################################################
class VtPositionData(VtBaseData):
"""持仓数据类"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
super(VtPositionData, self).__init__()
# 代码编号相关
self.symbol = EMPTY_STRING # 合约代码
self.vtSymbol = EMPTY_STRING # 合约在vt系统中的唯一代码通常是 Gateway名.合约代码
# 持仓相关
self.direction = EMPTY_STRING # 持仓方向
self.position = EMPTY_INT # 持仓量
self.frozen = EMPTY_INT # 冻结数量
self.price = EMPTY_FLOAT # 持仓均价
self.vtPositionName = EMPTY_STRING # 持仓在vt系统中的唯一代码通常是vtSymbol.方向
########################################################################
class VtAccountData(VtBaseData):
"""账户数据类"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
super(VtAccountData, self).__init__()
# 账号代码相关
self.accountID = EMPTY_STRING # 账户代码
self.vtAccountID = EMPTY_STRING # 账户在vt中的唯一代码通常是 Gateway名.账户代码
# 代码相关
self.preBalance = EMPTY_FLOAT # 昨日账户结算净值
self.balance = EMPTY_FLOAT # 账户净值
self.available = EMPTY_FLOAT # 可用资金
self.commission = EMPTY_FLOAT # 今日手续费
self.margin = EMPTY_FLOAT # 保证金占用
self.closeProfit = EMPTY_FLOAT # 平仓盈亏
self.positionProfit = EMPTY_FLOAT # 持仓盈亏
########################################################################
class VtErrorData(VtBaseData):
"""错误数据类"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
super(VtErrorData, self).__init__()
self.errorID = EMPTY_STRING # 错误代码
self.errorMsg = EMPTY_UNICODE # 错误信息
########################################################################
class VtLogData(VtBaseData):
"""日志数据类"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
super(VtLogData, self).__init__()
self.logContent = EMPTY_UNICODE # 日志信息
########################################################################
class VtContractData(VtBaseData):
"""合约详细信息类"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
super(VtBaseData, self).__init__()
self.priceTick = EMPTY_FLOAT
########################################################################
class VtSubscribeReq:
"""订阅行情时传入的对象类"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.symbol = EMPTY_STRING
self.exchange = EMPTY_STRING

5
vn.trader/test.json Normal file
View File

@ -0,0 +1,5 @@
{
"1": 1,
"b": 2,
"c": "_____"
}

Binary file not shown.

Binary file not shown.

BIN
vn.trader/vnctpmd.pyd Normal file

Binary file not shown.

BIN
vn.trader/vnctptd.pyd Normal file

Binary file not shown.