Merge pull request #1038 from vnpy/beta

发布v1.9.0正式版
This commit is contained in:
vn.py 2018-08-13 00:58:24 +08:00 committed by GitHub
commit 72da1ad5f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
278 changed files with 14843 additions and 7252 deletions

32
.gitignore vendored
View File

@ -7,6 +7,8 @@ Release/
*.exp
*.pdb
*.cd
*.o
*.out
# Python编译文件
*.pyc
@ -28,8 +30,6 @@ Release/
# 本地持久化文件
*.vn
# 其他文件
*.dump
*.vssettings
@ -41,15 +41,23 @@ Release/
*.temp
*.vt
*.log
vn.ctp/build/*
vn.lts/build/*
.idea
.vscode
.gitignore
vn.trader/ctaAlgo/data/*
vn.trader/build/*
vn.trader/dist/*
*.bak
# 编译临时文件夹
build/
# CMake临时文件
CMakeCache.txt
CMakeFiles/
build/cmake_install.cmake
build/Makefile
# 目录
.idea
.vscode
.python-version
.gitignore

View File

@ -4,9 +4,10 @@ cache: pip
python:
- 2.7
- 3.6
matrix:
allow_failures:
- python: 3.6
branches:
only:
- master
- dev
install:
- pip install -r requirements.txt
- pip install flake8 # pytest # add another testing frameworks later

View File

@ -6,92 +6,101 @@
The vn.py project is an open-source quantitative trading framework that is developed by traders, for traders. The project is mainly written in Python and uses C++ for low-layer and performance sensitive infrastructure.
Using the vn.py project, institutional investors and professional traders, such as hedge funds, prop trading firms and investment banks, can easily develop complex trading strategies with the Event Engine Strategy Module, and automatically route their orders to the most desired destinations, including equity, commodity, forex and many other financial markets.
### Project Structure
1. A large number of Broker and Market Data APIs, all in Python (vn.api):
**International Financial Markets**
- Interactive Brokersvn.ib
- OANDAvn.oanda
- Shanghai Zhida Futuresvn.shzd
**Chinese Futures Market**
- CTPvn.ctp
- Femasvn.femas
- Kingstar Optionvn.ksotp
- XSpeedvn.xspeed
**Chinese Equity Market**
- LTSvn.lts
- QDPvn.qdp
- CSHSHLPvn.cshshlp
**Chinese Precious Metal Market**
- SGITvn.sgit
- Kingstar Goldvn.ksgold
**Cryptocurrency Market**
- OKCOINvn.okcoin
- Huobivn.huobi
- Lhangvn.lhang
**Market Data**
- Datayesvn.datayes
2. Simple but powerful event engine module (vn.event), which can be used for developing complex quantitative trading strategies
3. RPC framework (vn.rpc) which also supports pushing data from server to client, aimed at implementing distributed trading systems.
4. Ready to use trading platform (vn.trader), which has integrated all the trading APIs in vn.api, and provides easy to use strategy engines for developing different types of quantitative strategies and trading algorithms.
5. Tutorials about how to use vn.py to solve real world trading issues.
6. [Official Website](http://vnpy.org) and [Github Repository](http://www.github.com/vnpy/vnpy)
Using the vn.py project, institutional investors and professional traders, such as hedge funds, prop trading firms and investment banks, can easily develop complex trading strategies with the Event Engine Strategy Module, and automatically route their orders to the most desired destinations, including equity, commodity, forex, cryptocurrency and many other financial markets.
---
### Quick Start
1. Prepare a computer with Windows 7 (64-bit) installed.
1. Prepare a computer with Windows 7/8/10/Server 2008 (64-bit) installed.
2. Install [Anaconda](http://www.continuum.io/downloads), please make sure you download **Anaconda 4.0.0 Python 2.7 (32-bit)**.
2. Install [Anaconda 5.2.0](http://www.continuum.io/downloads), please make sure you download **Python 2.7 (32-bit)**.
3. Install [MongoDB](https://www.mongodb.org/downloads#production)
3. Install [MongoDB](https://www.mongodb.org/downloads#production), please register MongoDB as Windows Service.
4. Install pymongo, just run "pip install pymongo" in cmd.
4. Install [Visual C++ Redistributable Packages for VS2013 (32-bit)](https://www.microsoft.com/en-gb/download/details.aspx?id=40784).
5. Register MongoDB as Windows Service and start it.
5. Run **install.bat** to install vn.py and all dependencies.
6. Install [Visual C++ Redistributable Packages for VS2013 (32-bit)](https://www.microsoft.com/en-gb/download/details.aspx?id=40784).
6. Go to folder **examples/CryptoTrader/** and edit those **ABC_connect.json** files with your exchange config. Taking BitMEX_connect.json as an example:
* apiKey: the API Key of your account provided by BitMEX
* secretKey: the Secret Key of your account provide by BitMEX
* sessionCount: number of sessions and threads you would like to use for RESTFul request
* symbols: symbols of contract you would like to receive data update from Websocket API
7. Click the "Download ZIP" button on this page to download the project source code, assume you unzip to C:\vnpy.
7. Change the "language" setting in **VT_setting.json** to "english" (otherwise you will see the GUI in Chinese).
8. Install the IB TWS software and configure it to allow trading API connection.
8. Start CryptoTrader by running "python run.py", connect to BitMEX and then you are ready to trade!
9. Use sublime text or any other text editors you like, to change related details in C:\vnpy\vn.trader\gateway\ibGateway\IB_connect.json to your data.
### Project Structure
10. Start IB TWS software and run C:\vnpy\vn.trader\vtMain.py, enjoy trading!
1. A large number of Broker and Market Data APIs, all in Python (vnpy.api):
**Cryptocurrency Market**
- BitMEX (bitmex)
- OKEXokex)
- Huobi Prohuobi)
- Binancebinance)
- Bitfinex (bitfinex)
- Coinbase Pro (coinbase)
- FCoin (fcoin)
- BigOne (bigone)
- LBanklbank
- CCXT (ccxt)
**International Financial Markets**
- Interactive Brokersib
- Shanghai Zhida Futuresshzd
- Futu Securities (futu)
**Chinese Futures Market**
- CTPctp
- Femasfemas
- Kingstar Optionksotp
- XSpeedxspeed
**Chinese Equity Market**
- LTSlts
- QDPqdp
- CSHSHLPcshshlp
- XSpeed Securities (sec)
**Chinese Precious Metal Market**
- SGITsgit
- Kingstar Goldksgold
2. Simple but powerful event engine module (vnpy.event), which can be used for developing complex quantitative trading strategies
3. RPC framework (vnpy.rpc) which also supports pushing data from server to client, aimed at implementing distributed trading systems.
4. Ready to use trading platform (vnpy.trader), which has integrated all the trading APIs in vnpy.api, and provides easy to use strategy engines for developing different types of quantitative strategies and trading algorithms.
5. Examples about how to use vn.py framework for solving real world trading issues (vnpy.examples).
6. [Official Forum](http://www.vnpie.org) and [Github Repository](http://www.github.com/vnpy/vnpy)
---
### Contact

122
README.md
View File

@ -9,57 +9,91 @@
vn.py是基于Python的开源量化交易程序开发框架起源于国内私募的自主量化交易系统。2015年初项目启动时只是单纯的交易API接口的Python封装。随着业内关注度的上升和社区不断的贡献目前已经成长为一套全功能的交易程序开发框架用户群体也日渐多样化包括私募基金、券商自营和资管、期货资管和子公司、高校研究机构和专业个人投资者等。
2018年中启动代号为vn.crypto的数字货币量化交易系统开发计划目前已完成第一阶段的开发提供针对数字货币交易所原生的REST/WebSocket API的高性能的Python接口封装设计、以及适合7x24小时长时间交易需求的算法交易AlgoTrading模块、面向币圈交易的前端应用示例CryptoTrader等后续会进一步完善打造功能全面的数字货币量化交易平台。
---
### 项目结构
1. 丰富的Python交易API接口vnpy.api基本覆盖了国内外所有常规交易品种股票、期货、期权、外汇、外盘、比特币),具体包括:
1. 丰富的Python交易API接口vnpy.api基本覆盖了国内外所有常规交易品种股票、期货、期权、外汇、外盘、数字货币),具体包括:
- CTPctp
- 传统金融
- 飞马femas
- CTPctp
- 中泰证券XTPxtp
- 飞马femas
- 中信证券期权cshshlp
- 中泰证券XTPxtp
- 金仕达黄金ksgold
- 中信证券期权cshshlp
- 金仕达期权ksotp
- 金仕达黄金ksgold
- 飞鼠sgit
- 金仕达期权ksotp
- 飞创xspeed
- 飞鼠sgit
- QDPqdp
- 飞创xspeed
- 上海直达期货shzd
- 飞创证券sec)
- Interactive Brokersib
- QDPqdp
- OANDAoanda
- 上海直达期货shzd
- 福汇fxcm
- Interactive Brokersib
- OKCOINokcoin
- 福汇fxcm
- 火币huobi
- LBanklbank
- 数字货币
- OKEXokex)
- 火币huobi)
- 币安binance)
- BitMEX (bitmex)
- Bitfinex (bitfinex)
- Coinbase Pro (coinbase)
- FCoin (fcoin)
- BigOne (bigone)
- LBanklbank
- CCXT (ccxt)
2. 简洁易用的事件驱动引擎vnpy.event作为事件驱动型交易程序的核心
3. 支持服务器端数据推送的RPC框架vnpy.rpc用于实现多进程分布式架构的交易系统
4. 开箱即用的实盘交易平台框架vnpy.trader整合了多种交易接口并针对具体策略算法和功能开发提供了简洁易用的API用于快速构建交易员所需的量化交易程序应用举例
4. 开箱即用的量化交易平台vnpy.trader整合了多种交易接口并针对具体策略算法和功能开发提供了简洁易用的API用于快速构建交易员所需的量化交易程序应用举例
* 同时登录多个交易接口,在一套界面上监控多种市场的行情和多种资产账户的资金、持仓、委托、成交情况
* 支持跨市场套利CTP期货和XTP证券、境内外套利CTP期货和IB外盘、多市场数据整合实时预测走势CTP的股指期货数据、IB的外盘A50数据、Wind的行业指数数据等策略应用
* CTA策略引擎模块在保持易用性的同时允许用户针对CTA类策略运行过程中委托的报撤行为进行细粒度控制降低交易滑点、实现高频策略
* CtaStrategyCTA策略引擎模块在保持易用性的同时允许用户针对CTA类策略运行过程中委托的报撤行为进行细粒度控制降低交易滑点、实现高频策略
* 实盘行情记录支持Tick和K线数据的落地用于策略开发回测以及实盘运行初始化
* SpreadTrading价差交易模块根据用户的配置自动实现价差组合的深度行情以及持仓变化计算同时内置的交易算法SniperAlgo可以满足大部分到价成交策略的需求用户也可以基于AlgoTemplate开发更复杂的价差算法
* OptionMaster期权交易模块强大的期权投资组合管理功能结合基于Cython开发的高效期权定价模型支持毫秒级别的整体希腊值持仓风险计算用户可以基于期权交易引擎OmEngine快速开发各类复杂期权交易应用
* AlgoTrading算法交易模块提供多种常用的智能交易算法TWAP、Sniper、BestLimit、Iceberg、Arbitrage等等支持数据库配置保存、CSV文件加载启动以及RPC跨进程算法交易服务
* RiskManager前端风控模块负责在交易系统将任何交易请求发出到柜台前的一系列标准检查操作支持用户自定义风控规则的扩展
* DataRecorder实盘行情记录支持Tick和K线数据的落地用于策略开发回测以及实盘运行初始化
* RpcServiceRPC跨进程调用服务基于MainEngineProxy组件用户可以如同开发单一进程应用搬开发多进程架构的复杂交易应用
* RtdServiceEXCEL RTD服务组件通过pyxll模块提供EXCEL表格系统对VnTrader系统内所有数据的访问和功能调用未完成
5. 数据相关的API接口vnpy.data用于构建和更新历史行情数据库目前包括
@ -73,7 +107,7 @@ vn.py是基于Python的开源量化交易程序开发框架起源于国内私
8. vn.py项目的Docker镜像docker目前尚未完成
9. [网](http://vnpy.org)和[知乎专栏](http://zhuanlan.zhihu.com/vn-py)内容包括vn.py项目的开发教程和Python在量化交易领域的应用研究等内容
9. [方论坛](http://www.vnpie.com)和[知乎专栏](http://zhuanlan.zhihu.com/vn-py)内容包括vn.py项目的开发教程和Python在量化交易领域的应用研究等内容
10. 官方交流QQ群262656087管理较严格定期清除长期潜水的成员
@ -86,7 +120,7 @@ vn.py是基于Python的开源量化交易程序开发框架起源于国内私
1. 支持的操作系统Windows 7/8/10/Server 2008
2. 安装[MongoDB](https://www.mongodb.org/downloads#production),并[将MongoDB配置为系统服务](https://docs.mongodb.com/manual/tutorial/install-mongodb-on-windows/#configure-a-windows-service-for-mongodb-community-edition)
3. 安装[Anaconda](http://www.continuum.io/downloads)**注意必须是Python 2.7 32位版本**
3. 安装[Anaconda 5.2.0](http://www.continuum.io/downloads)**注意必须是Python 2.7 32位版本**
4. 安装[Visual C++ Redistributable Packages for VS2013 x86版本](https://support.microsoft.com/en-us/help/3138367/update-for-visual-c-2013-and-visual-c-redistributable-package)
**Ubuntu**
@ -125,14 +159,8 @@ sudo /home/vnpy/anaconda2/bin/conda install -c quantopian ta-lib=0.4.9
```
# encoding: UTF-8
# 重载sys模块设置默认字符串编码方式为utf8
import sys
reload(sys)
sys.setdefaultencoding('utf8')
# 判断操作系统
import platform
system = platform.system()
# vn.trader模块
from vnpy.event import EventEngine
@ -141,18 +169,11 @@ from vnpy.trader.uiQt import createQApp
from vnpy.trader.uiMainWindow import MainWindow
# 加载底层接口
from vnpy.trader.gateway import (ctpGateway, oandaGateway, ibGateway,
tkproGateway)
if system == 'Windows':
from vnpy.trader.gateway import (femasGateway, xspeedGateway,
futuGateway, secGateway)
if system == 'Linux':
from vnpy.trader.gateway import xtpGateway
from vnpy.trader.gateway import ctpGateway, ibGateway
# 加载上层应用
from vnpy.trader.app import (riskManager, ctaStrategy, spreadTrading)
from vnpy.trader.app import (riskManager, ctaStrategy,
spreadTrading, algoTrading)
#----------------------------------------------------------------------
@ -160,37 +181,27 @@ def main():
"""主程序入口"""
# 创建Qt应用对象
qApp = createQApp()
# 创建事件引擎
ee = EventEngine()
# 创建主引擎
me = MainEngine(ee)
# 添加交易接口
me.addGateway(ctpGateway)
me.addGateway(tkproGateway)
me.addGateway(oandaGateway)
me.addGateway(ibGateway)
if system == 'Windows':
me.addGateway(femasGateway)
me.addGateway(xspeedGateway)
me.addGateway(secGateway)
me.addGateway(futuGateway)
if system == 'Linux':
me.addGateway(xtpGateway)
# 添加上层应用
me.addApp(riskManager)
me.addApp(ctaStrategy)
me.addApp(spreadTrading)
me.addApp(algoTrading)
# 创建主窗口
mw = MainWindow(me, ee)
mw.showMaximized()
# 在主线程中启动Qt事件循环
sys.exit(qApp.exec_())
@ -252,6 +263,7 @@ vn.py使用github托管其源代码如果希望贡献代码请使用github的
捐赠方式支付宝3216630132@qq.com*晓优)
计划长期维护一份捐赠清单,所以请在留言中注明是项目捐赠以及捐赠人的名字(当然想匿名的用户就随意了)。

View File

@ -1,4 +0,0 @@
# encoding: UTF-8
from __future__ import absolute_import
from .vnokex import OkexSpotApi, OkexFuturesApi, CONTRACT_SYMBOL, SPOT_CURRENCY

View File

@ -1,52 +0,0 @@
# encoding: UTF-8
from __future__ import absolute_import
from .vnokex import *
# 在OkCoin网站申请这两个Key分别对应用户名和密码
apiKey = '你的accessKey'
secretKey = '你的secretKey'
# 创建API对象
api = OkexSpotApi()
api.connect(apiKey, secretKey, True)
sleep(3)
#api.login()
api.subscribeSpotTicker("bch_btc")
api.subscribeSpotDepth("bch_btc")
api.subscribeSpotDepth("bch_btc", 5)
api.subscribeSpotDeals("bch_btc")
api.subscribeSpotKlines("bch_btc","30min")
#api.spotTrade("etc_usdt","sell", "50" , "0.01")
#api.spotCancelOrder("etc_btc","44274138")
#api.spotUserInfo()
#api.spotOrderInfo("etc_btc", 44284731)
# api = OkexFuturesApi()
# api.connect(apiKey, secretKey, True)
# sleep(3)
#api.subsribeFutureTicker("btc","this_week")
#api.subscribeFutureKline("btc","this_week", "30min")
#api.subscribeFutureDepth("btc","this_week")
#api.subscribeFutureDepth("btc","this_week", 5)
#api.subscribeFutureTrades("btc","this_week")
#api.subscribeFutureIndex("btc")
#api.subscribeFutureForecast_price("btc")
#api.login()
#api.futureTrade( "etc_usd", "this_week" ,"1" , 20 , 1 , _match_price = '0' , _lever_rate = '10') # 14245727693
#api.futureCancelOrder("etc_usd","14245727693" , "this_week")
#api.futureUserInfo()
#api.futureOrderInfo("etc_usd" , "14245727693" , "this_week" , '1', '1' , '10')
# api.subscribeFutureTrades()
'''
合约账户信息 持仓信息等在登录后都会自动推送官方文档这样写的还没实际验证过
'''
input()

View File

@ -1,6 +0,0 @@
{
"apiKey": "你的apiKey",
"secretKey": "你的secretKey",
"trace": false,
"leverage": 10
}

View File

@ -1,965 +0,0 @@
# encoding: UTF-8
'''
vnpy.api.okex的gateway接入
注意
1. 目前仅支持USD现货交易
'''
from __future__ import print_function
import os
import json
from datetime import datetime
from time import sleep
from copy import copy
from threading import Condition
from Queue import Queue
from threading import Thread
from time import sleep
from vnpy.api.okex import OkexSpotApi, CONTRACT_SYMBOL, SPOT_CURRENCY
from vnpy.trader.vtGateway import *
from vnpy.trader.vtFunction import getJsonPath
# 价格类型映射
# 买卖类型: 限价单buy/sell 市价单buy_market/sell_market
priceTypeMap = {}
priceTypeMap['buy'] = (DIRECTION_LONG, PRICETYPE_LIMITPRICE)
priceTypeMap['buy_market'] = (DIRECTION_LONG, PRICETYPE_MARKETPRICE)
priceTypeMap['sell'] = (DIRECTION_SHORT, PRICETYPE_LIMITPRICE)
priceTypeMap['sell_market'] = (DIRECTION_SHORT, PRICETYPE_MARKETPRICE)
priceTypeMapReverse = {v: k for k, v in priceTypeMap.items()}
# 委托状态印射
statusMap = {}
statusMap[-1] = STATUS_CANCELLED
statusMap[0] = STATUS_NOTTRADED
statusMap[1] = STATUS_PARTTRADED
statusMap[2] = STATUS_ALLTRADED
statusMap[4] = STATUS_UNKNOWN
okex_all_symbol_pairs = ['ref_usdt', 'soc_usdt', 'light_usdt', 'avt_usdt', 'of_usdt', 'brd_usdt', 'ast_usdt', 'int_usdt', 'zrx_usdt', 'ctr_usdt', 'dgd_usdt', 'aidoc_usdt', 'wtc_usdt', 'swftc_usdt', 'wrc_usdt', 'sub_usdt', 'dna_usdt', 'knc_usdt', 'kcash_usdt', 'mdt_usdt', 'theta_usdt', 'ppt_usdt', 'utk_usdt', 'qvt_usdt', 'salt_usdt', 'la_usdt', 'itc_usdt', 'fair_usdt', 'yee_usdt', '1st_usdt', 'fun_usdt', 'iost_usdt', 'mkr_usdt', 'tio_usdt', 'req_usdt', 'ubtc_usdt', 'icx_usdt', 'tct_usdt', 'san_usdt', 'lrc_usdt', 'icn_usdt', 'cvc_usdt', 'eth_usdt', 'poe_usdt', 'xlm_usdt', 'iota_usdt', 'eos_usdt', 'nuls_usdt', 'mot_usdt', 'neo_usdt', 'gnx_usdt', 'dgb_usdt', 'evx_usdt', 'ltc_usdt', 'mda_usdt', 'etc_usdt', 'dpy_usdt', 'tnb_usdt', 'nas_usdt', 'btc_usdt', 'smt_usdt', 'ssc_usdt', 'oax_usdt', 'yoyo_usdt', 'snc_usdt', 'sngls_usdt', 'bch_usdt', 'mana_usdt', 'mof_usdt', 'mco_usdt', 'vib_usdt', 'topc_usdt', 'pra_usdt', 'bnt_usdt', 'xmr_usdt', 'edo_usdt', 'snt_usdt', 'eng_usdt', 'stc_usdt', 'qtum_usdt', 'key_usdt', 'ins_usdt', 'rnt_usdt', 'bcd_usdt', 'amm_usdt', 'lend_usdt', 'btm_usdt', 'elf_usdt', 'xuc_usdt', 'cag_usdt', 'snm_usdt', 'act_usdt', 'dash_usdt', 'zec_usdt', 'storj_usdt', 'pay_usdt', 'vee_usdt', 'show_usdt', 'trx_usdt', 'atl_usdt', 'ark_usdt', 'ost_usdt', 'gnt_usdt', 'dat_usdt', 'rcn_usdt', 'qun_usdt', 'mth_usdt', 'rct_usdt', 'read_usdt', 'gas_usdt', 'btg_usdt', 'mtl_usdt', 'cmt_usdt', 'xrp_usdt', 'spf_usdt', 'aac_usdt', 'can_usdt', 'omg_usdt', 'hsr_usdt', 'link_usdt', 'dnt_usdt', 'true_usdt', 'ukg_usdt', 'xem_usdt', 'ngc_usdt', 'lev_usdt', 'rdn_usdt', 'ace_usdt', 'ipc_usdt', 'ugc_usdt', 'viu_usdt', 'mag_usdt', 'hot_usdt', 'pst_usdt',
'ref_btc', 'soc_btc', 'light_btc', 'avt_btc', 'of_btc', 'brd_btc', 'ast_btc', 'int_btc', 'zrx_btc', 'ctr_btc', 'dgd_btc', 'aidoc_btc', 'wtc_btc', 'swftc_btc', 'wrc_btc', 'sub_btc', 'dna_btc', 'knc_btc', 'kcash_btc', 'mdt_btc', 'theta_btc', 'ppt_btc', 'utk_btc', 'qvt_btc', 'salt_btc', 'la_btc', 'itc_btc', 'fair_btc', 'yee_btc', '1st_btc', 'fun_btc', 'iost_btc', 'mkr_btc', 'tio_btc', 'req_btc', 'ubtc_btc', 'icx_btc', 'tct_btc', 'san_btc', 'lrc_btc', 'icn_btc', 'cvc_btc', 'eth_btc', 'poe_btc', 'xlm_btc', 'iota_btc', 'eos_btc', 'nuls_btc', 'mot_btc', 'neo_btc', 'gnx_btc', 'dgb_btc', 'evx_btc', 'ltc_btc', 'mda_btc', 'etc_btc', 'dpy_btc', 'tnb_btc', 'nas_btc', 'btc_btc', 'smt_btc', 'ssc_btc', 'oax_btc', 'yoyo_btc', 'snc_btc', 'sngls_btc', 'bch_btc', 'mana_btc', 'mof_btc', 'mco_btc', 'vib_btc', 'topc_btc', 'pra_btc', 'bnt_btc', 'xmr_btc', 'edo_btc', 'snt_btc', 'eng_btc', 'stc_btc', 'qtum_btc', 'key_btc', 'ins_btc', 'rnt_btc', 'bcd_btc', 'amm_btc', 'lend_btc', 'btm_btc', 'elf_btc', 'xuc_btc', 'cag_btc', 'snm_btc', 'act_btc', 'dash_btc', 'zec_btc', 'storj_btc', 'pay_btc', 'vee_btc', 'show_btc', 'trx_btc', 'atl_btc', 'ark_btc', 'ost_btc', 'gnt_btc', 'dat_btc', 'rcn_btc', 'qun_btc', 'mth_btc', 'rct_btc', 'read_btc', 'gas_btc', 'btg_btc', 'mtl_btc', 'cmt_btc', 'xrp_btc', 'spf_btc', 'aac_btc', 'can_btc', 'omg_btc', 'hsr_btc', 'link_btc', 'dnt_btc', 'true_btc', 'ukg_btc', 'xem_btc', 'ngc_btc', 'lev_btc', 'rdn_btc', 'ace_btc', 'ipc_btc', 'ugc_btc', 'viu_btc', 'mag_btc', 'hot_btc', 'pst_btc']
########################################################################
class OkexGateway(VtGateway):
"""OKEX交易接口"""
#----------------------------------------------------------------------
def __init__(self, eventEngine, gatewayName='OKEX'):
"""Constructor"""
super(OkexGateway, self).__init__(eventEngine, gatewayName)
self.api_spot = SpotApi(self)
# self.api_contract = Api_contract(self)
self.leverage = 0
self.connected = False
self.fileName = self.gatewayName + '_connect.json'
self.filePath = getJsonPath(self.fileName, __file__)
#----------------------------------------------------------------------
def connect(self):
"""连接"""
# 载入json文件
try:
f = file(self.filePath)
except IOError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'读取连接配置出错,请检查'
self.onLog(log)
return
# 解析json文件
setting = json.load(f)
try:
apiKey = str(setting['apiKey'])
secretKey = str(setting['secretKey'])
trace = setting['trace']
leverage = setting['leverage']
except KeyError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'连接配置缺少字段,请检查'
self.onLog(log)
return
# 初始化接口
self.leverage = leverage
self.api_spot.active = True
self.api_spot.connect(apiKey, secretKey, trace)
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'接口初始化成功'
self.onLog(log)
# 启动查询
# self.initQuery()
# self.startQuery()
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅行情"""
self.api_spot.subscribe(subscribeReq)
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
"""发单"""
return self.api_spot.spotSendOrder(orderReq)
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq):
"""撤单"""
self.api_spot.spotCancel(cancelOrderReq)
#----------------------------------------------------------------------
def qryAccount(self):
"""查询账户资金"""
self.api_spot.spotUserInfo()
#----------------------------------------------------------------------
def qryOrderInfo(self):
self.api_spot.spotAllOrders()
#----------------------------------------------------------------------
def qryPosition(self):
"""查询持仓"""
pass
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.api_spot.active = False
self.api_spot.close()
#----------------------------------------------------------------------
def initQuery(self):
"""初始化连续查询"""
if self.qryEnabled:
# 需要循环的查询函数列表
#self.qryFunctionList = [self.qryAccount, self.qryOrderInfo]
self.qryFunctionList = [ self.qryOrderInfo]
#self.qryFunctionList = []
self.qryCount = 0 # 查询触发倒计时
self.qryTrigger = 2 # 查询触发点
self.qryNextFunction = 0 # 上次运行的查询函数索引
self.startQuery()
#----------------------------------------------------------------------
def query(self, event):
"""注册到事件处理引擎上的查询函数"""
self.qryCount += 1
if self.qryCount > self.qryTrigger:
# 清空倒计时
self.qryCount = 0
# 执行查询函数
function = self.qryFunctionList[self.qryNextFunction]
function()
# 计算下次查询函数的索引如果超过了列表长度则重新设为0
self.qryNextFunction += 1
if self.qryNextFunction == len(self.qryFunctionList):
self.qryNextFunction = 0
#----------------------------------------------------------------------
def startQuery(self):
"""启动连续查询"""
self.eventEngine.register(EVENT_TIMER, self.query)
#----------------------------------------------------------------------
def setQryEnabled(self, qryEnabled):
"""设置是否要启动循环查询"""
self.qryEnabled = qryEnabled
########################################################################
class SpotApi(OkexSpotApi):
"""okex的API实现"""
#----------------------------------------------------------------------
def __init__(self, gateway):
"""Constructor"""
super(SpotApi, self).__init__()
self.gateway = gateway # gateway对象
self.gatewayName = gateway.gatewayName # gateway对象名称
self.active = False # 若为True则会在断线后自动重连
self.cbDict = {}
self.tickDict = {}
self.orderDict = {}
self.channelSymbolMap = {}
self.localNo = 0 # 本地委托号
self.localNoQueue = Queue() # 未收到系统委托号的本地委托号队列
self.localNoDict = {} # key为本地委托号value为系统委托号
self.orderIdDict = {} # key为系统委托号value为本地委托号
self.cancelDict = {} # key为本地委托号value为撤单请求
self.recordOrderId_BefVolume = {} # 记录的之前处理的量
self.cache_some_order = {}
self.tradeID = 0
self.registerSymbolPairArray = set([])
self.initCallback()
'''
登录后每次订单执行撤销后又这样的 推送不知道干啥的先过滤掉了
{u'binary': 1, u'product': u'spot', u'type': u'order', u'base': u'etc'
, u'quote': u'usdt', u'data': {u'status': -1, u'orderType': 0, u'price': u'25.4050', u'modifyTime':
1512288275000L, u'userId': 6548935, u'createTime': 1512288275000L, u'source': 0, u'quoteSize': u'0.0
0000000', u'executedValue': u'0.00000000', u'id': 62877909, u'filledSize': u'0.00000000', u'side': 1
, u'size': u'0.01000000'}}
'''
#----------------------------------------------------------------------
def onMessage(self, ws, evt):
"""信息推送"""
# print evt
data = self.readData(evt)[0]
try:
channel = data['channel']
except Exception as ex:
channel = None
if channel == None:
return
# try:
if channel == "addChannel" and 'data' in data:
channel = data['data']["channel"]
if channel != "addChannel" and 'future' not in channel and channel != 'login':
# print channel
callback = self.cbDict[channel]
callback(data)
# if 'depth' not in channel and 'ticker' not in channel and 'deals' not in channel and 'userinfo' not in channel and 'future' not in channel:
# print data
# except Exception,ex:
# print "Error in callback cbDict ", channel
#print self.cbDict
#----------------------------------------------------------------------
def onError(self, ws, evt):
"""错误推送"""
error = VtErrorData()
error.gatewayName = self.gatewayName
error.errorMsg = str(evt)
self.gateway.onError(error)
#----------------------------------------------------------------------
def onError(self, data):
error = VtErrorData()
error.gatewayName = self.gatewayName
error.errorMsg = str(data["data"]["error_code"])
self.gateway.onError(error)
#----------------------------------------------------------------------
def onClose(self, ws):
"""接口断开"""
# 如果尚未连上,则忽略该次断开提示
if not self.gateway.connected:
return
self.gateway.connected = False
self.writeLog(u'服务器连接断开')
# 重新连接
if self.active:
def reconnect():
while not self.gateway.connected:
self.writeLog(u'等待10秒后重新连接')
sleep(10)
if not self.gateway.connected:
self.reconnect()
t = Thread(target=reconnect)
t.start()
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
symbol_pair_gateway = subscribeReq.symbol
arr = symbol_pair_gateway.split('.')
symbol_pair = arr[0]
if symbol_pair not in self.registerSymbolPairArray:
self.registerSymbolPairArray.add(symbol_pair)
self.subscribeSingleSymbol(symbol_pair)
self.spotOrderInfo(symbol_pair, '-1')
#----------------------------------------------------------------------
def subscribeSingleSymbol(self, symbol):
if symbol in okex_all_symbol_pairs:
self.subscribeSpotTicker(symbol)
self.subscribeSpotDepth5(symbol)
#self.subscribeSpotDeals(symbol)
#----------------------------------------------------------------------
def spotAllOrders(self):
print(spotAllOrders)
for symbol in registerSymbolPairArray:
if symbol in okex_all_symbol_pairs:
self.spotOrderInfo(symbol, '-1')
for orderId in self.orderIdDict.keys():
order = self.orderDict.get(orderId, None)
if order != None:
symbol_pair = (order.symbol.split('.'))[0]
self.spotOrderInfo(symbol_pair, orderId)
#----------------------------------------------------------------------
def onOpen(self, ws):
"""连接成功"""
self.gateway.connected = True
self.writeLog(u'服务器连接成功')
self.login()
# 连接后查询账户和委托数据
self.spotUserInfo()
self.subscribeSingleSymbol("etc_usdt")
for symbol in okex_all_symbol_pairs:
# self.subscribeSpotTicker(symbol)
# self.subscribeSpotDepth5(symbol)
# self.subscribeSpotDeals(symbol)
#Ticker数据
self.channelSymbolMap["ok_sub_spot_%s_ticker" % symbol] = symbol
#盘口的深度
self.channelSymbolMap["ok_sub_spot_%s_depth_5" % symbol] = symbol
#所有人的交易数据
self.channelSymbolMap["ok_sub_spot_%s_deals" % symbol] = symbol
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = symbol
contract.exchange = EXCHANGE_OKEX
contract.vtSymbol = '.'.join([contract.symbol, contract.exchange])
contract.name = u'OKEX现货%s' % symbol
contract.size = 0.00001
contract.priceTick = 0.00001
contract.productClass = PRODUCT_SPOT
self.gateway.onContract(contract)
'''
[{
"channel":"ok_sub_spot_bch_btc_deals",
"data":[["1001","2463.86","0.052","16:34:07","ask"]]
}]
'''
#----------------------------------------------------------------------
def onSpotSubDeals(self, data):
if 'data' not in data:
return
rawData = data["data"]
# print rawData
#----------------------------------------------------------------------
def writeLog(self, content):
"""快速记录日志"""
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = content
self.gateway.onLog(log)
#----------------------------------------------------------------------
def initCallback(self):
"""初始化回调函数"""
# USD_SPOT
for symbol_pair in okex_all_symbol_pairs:
self.cbDict["ok_sub_spot_%s_ticker" % symbol_pair] = self.onTicker
self.cbDict["ok_sub_spot_%s_depth_5" % symbol_pair] = self.onDepth
self.cbDict["ok_sub_spot_%s_deals" % symbol_pair] = self.onSpotSubDeals
self.cbDict["ok_sub_spot_%s_order" % symbol_pair] = self.onSpotSubOrder
self.cbDict["ok_sub_spot_%s_balance" % symbol_pair] = self.onSpotBalance
self.cbDict['ok_spot_userinfo'] = self.onSpotUserInfo
self.cbDict['ok_spot_orderinfo'] = self.onSpotOrderInfo
# 下面这两个好像废弃了
#self.cbDict['ok_sub_spot_userinfo'] = self.onSpotSubUserInfo
#self.cbDict['ok_sub_spot_trades'] = self.onSpotSubTrades
self.cbDict['ok_spot_order'] = self.onSpotOrder
self.cbDict['ok_spot_cancel_order'] = self.onSpotCancelOrder
'''
[
{
"binary": 0,
"channel": "ok_sub_spot_bch_btc_ticker",
"data": {
"high": "10000",
"vol": "185.03743858",
"last": "111",
"low": "0.00000001",
"buy": "115",
"change": "101",
"sell": "115",
"dayLow": "0.00000001",
"dayHigh": "10000",
"timestamp": 1500444626000
}
}
]
'''
#----------------------------------------------------------------------
def onTicker(self, data):
""""""
if 'data' not in data:
return
channel = data['channel']
if channel == 'addChannel':
return
try:
symbol = self.channelSymbolMap[channel]
if symbol not in self.tickDict:
tick = VtTickData()
tick.exchange = EXCHANGE_OKEX
tick.symbol = '.'.join([symbol, tick.exchange])
tick.vtSymbol = '.'.join([symbol, tick.exchange])
tick.gatewayName = self.gatewayName
self.tickDict[symbol] = tick
else:
tick = self.tickDict[symbol]
rawData = data['data']
tick.highPrice = float(rawData['high'])
tick.lowPrice = float(rawData['low'])
tick.lastPrice = float(rawData['last'])
tick.volume = float(rawData['vol'].replace(',', ''))
# tick.date, tick.time = self.generateDateTime(rawData['timestamp'])
# print "ticker", tick.date, tick.time
# newtick = copy(tick)
# self.gateway.onTick(newtick)
except Exception as ex:
print("Error in onTicker ", channel)
#----------------------------------------------------------------------
def onDepth(self, data):
""""""
if 'data' not in data:
return
try:
channel = data['channel']
symbol = self.channelSymbolMap[channel]
except Exception as ex:
symbol = None
if symbol == None:
return
if symbol not in self.tickDict:
tick = VtTickData()
tick.symbol = symbol
tick.vtSymbol = symbol
tick.gatewayName = self.gatewayName
self.tickDict[symbol] = tick
else:
tick = self.tickDict[symbol]
if 'data' not in data:
return
rawData = data['data']
tick.bidPrice1, tick.bidVolume1 = rawData['bids'][0]
tick.bidPrice2, tick.bidVolume2 = rawData['bids'][1]
tick.bidPrice3, tick.bidVolume3 = rawData['bids'][2]
tick.bidPrice4, tick.bidVolume4 = rawData['bids'][3]
tick.bidPrice5, tick.bidVolume5 = rawData['bids'][4]
tick.askPrice1, tick.askVolume1 = rawData['asks'][-1]
tick.askPrice2, tick.askVolume2 = rawData['asks'][-2]
tick.askPrice3, tick.askVolume3 = rawData['asks'][-3]
tick.askPrice4, tick.askVolume4 = rawData['asks'][-4]
tick.askPrice5, tick.askVolume5 = rawData['asks'][-5]
tick.date, tick.time = self.generateDateTime(rawData['timestamp'])
# print "Depth", tick.date, tick.time
newtick = copy(tick)
self.gateway.onTick(newtick)
'''
[
{
"base": "bch",
"binary": 0,
"channel": "ok_sub_spot_bch_btc_balance",
"data": {
"info": {
"free": {
"btc": 5814.850605790395
},
"freezed": {
"btc": 7341
}
}
},
"product": "spot",
"quote": "btc",
"type": "order"
}
]
'''
def onSpotBalance(self, data):
"""交易发生金额变动之后会触发这个函数"""
# print data
rawData = data['data']
info = rawData['info']
for symbol in info["freezed"].keys():
pos = VtPositionData()
pos.gatewayName = self.gatewayName
pos.symbol = symbol + "." + EXCHANGE_OKEX
pos.vtSymbol = symbol + "." + EXCHANGE_OKEX
pos.direction = DIRECTION_NET
pos.frozen = float(info['freezed'][symbol])
pos.position = pos.frozen + float(info['free'][symbol])
self.gateway.onPosition(pos)
'''
[{"binary":0,"channel":"ok_spot_userinfo","data":{"result":true,"info":{"funds":{"borrow":{"dgd":"0"
,"bcd":"0","bcc":"0","bch":"0","hsr":"0","xuc":"0","omg":"0","eos":"0","qtum":"0","btc":"0","act":"0
","bcs":"0","btg":"0","etc":"0","eth":"0","usdt":"0","gas":"0","zec":"0","neo":"0","ltc":"0","bt1":"
0","bt2":"0","iota":"0","pay":"0","storj":"0","gnt":"0","snt":"0","dash":"0"},"free":{"dgd":"0","bcd
":"0","bcc":"0","bch":"0","hsr":"0","xuc":"3","omg":"0","eos":"0","qtum":"0","btc":"0.00266884258369
","act":"0","bcs":"0","btg":"0","etc":"7.9909635","eth":"0","usdt":"0","gas":"0","zec":"0","neo":"0"
,"ltc":"0","bt1":"0","bt2":"0","iota":"0","pay":"0","storj":"0","gnt":"0","snt":"0","dash":"0"},"fre
ezed":{"dgd":"0","bcd":"0","bcc":"0","bch":"0","hsr":"0","xuc":"0","omg":"0","eos":"0","qtum":"0","b
tc":"0","act":"0","bcs":"0","btg":"0","etc":"0","eth":"0","usdt":"0","gas":"0","zec":"0","neo":"0","
ltc":"0","bt1":"0","bt2":"0","iota":"0","pay":"0","storj":"0","gnt":"0","snt":"0","dash":"0"}}}}}]
{u'binary': 0, u'data': {u'info': {u'funds': {u'freezed': {u'zec': u'0', u'usdt': u'0', u'btg': u'0'
, u'btc': u'0', u'bt1': u'0', u'neo': u'0', u'pay': u'0', u'storj': u'0', u'iota': u'0', u'omg': u'0
', u'dgd': u'0', u'bt2': u'0', u'xuc': u'0', u'gas': u'0', u'hsr': u'0', u'snt': u'0', u'dash': u'0'
, u'bch': u'0', u'gnt': u'0', u'bcd': u'0', u'qtum': u'0', u'bcc': u'0', u'eos': u'0', u'etc': u'0',
u'act': u'0', u'eth': u'0', u'ltc': u'0', u'bcs': u'0'}, u'borrow': {u'zec': u'0', u'usdt': u'0', u
'btg': u'0', u'btc': u'0', u'bt1': u'0', u'neo': u'0', u'pay': u'0', u'storj': u'0', u'iota': u'0',
u'omg': u'0', u'dgd': u'0', u'bt2': u'0', u'xuc': u'0', u'gas': u'0', u'hsr': u'0', u'snt': u'0', u'
dash': u'0', u'bch': u'0', u'gnt': u'0', u'bcd': u'0', u'qtum': u'0', u'bcc': u'0', u'eos': u'0', u'
etc': u'0', u'act': u'0', u'eth': u'0', u'ltc': u'0', u'bcs': u'0'}, u'free': {u'zec': u'0', u'usdt'
: u'0', u'btg': u'0', u'btc': u'0.00266884258369', u'bt1': u'0', u'neo': u'0', u'pay': u'0', u'storj
': u'0', u'iota': u'0', u'omg': u'0', u'dgd': u'0', u'bt2': u'0', u'xuc': u'3', u'gas': u'0', u'hsr'
: u'0', u'snt': u'0', u'dash': u'0', u'bch': u'0', u'gnt': u'0', u'bcd': u'0', u'qtum': u'0', u'bcc'
: u'0', u'eos': u'0', u'etc': u'7.9909635', u'act': u'0', u'eth': u'0', u'ltc': u'0', u'bcs': u'0'}}
}, u'result': True}, u'channel': u'ok_spot_userinfo'}
'''
#----------------------------------------------------------------------
def onSpotUserInfo(self, data):
"""现货账户资金推送"""
rawData = data['data']
info = rawData['info']
funds = rawData['info']['funds']
# 持仓信息
for symbol in ['btc', 'ltc','eth', self.currency]:
#for symbol in :
if symbol in funds['free']:
pos = VtPositionData()
pos.gatewayName = self.gatewayName
pos.symbol = symbol + "." + EXCHANGE_OKEX
pos.vtSymbol = symbol + "." + EXCHANGE_OKEX
pos.vtPositionName = symbol
pos.direction = DIRECTION_NET
pos.frozen = float(funds['freezed'][symbol])
pos.position = pos.frozen + float(funds['free'][symbol])
self.gateway.onPosition(pos)
# 账户资金
account = VtAccountData()
account.gatewayName = self.gatewayName
account.accountID = self.gatewayName
account.vtAccountID = account.accountID
account.balance = 0.0
#account.balance = float(funds['asset']['net'])
self.gateway.onAccount(account)
#----------------------------------------------------------------------
# 这个 API 现在文档没找到。。 好像废弃了
def onSpotSubUserInfo(self, data):
"""现货账户资金推送"""
if 'data' not in data:
return
rawData = data['data']
info = rawData['info']
# 持仓信息
#for symbol in ['btc', 'ltc','eth', self.currency]:
for symbol in SPOT_CURRENCY:
if symbol in info['free']:
pos = VtPositionData()
pos.gatewayName = self.gatewayName
pos.symbol = symbol + "." + EXCHANGE_OKEX
pos.vtSymbol = symbol + "." + EXCHANGE_OKEX
pos.vtPositionName = symbol
pos.direction = DIRECTION_NET
pos.frozen = float(info['freezed'][symbol])
pos.position = pos.frozen + float(info['free'][symbol])
self.gateway.onPosition(pos)
'''
交易数据
[
{
"base": "bch",
"binary": 0,
"channel": "ok_sub_spot_bch_btc_order",
"data": {
"symbol": "bch_btc",
"tradeAmount": "1.00000000",
"createdDate": "1504530228987",
"orderId": 6191,
"completedTradeAmount": "0.00000000",
"averagePrice": "0",
"tradePrice": "0.00000000",
"tradeType": "buy",
"status": 0,
"tradeUnitPrice": "113.00000000"
},
"product": "spot",
"quote": "btc",
"type": "balance"
}
]
{u'binary': 0, u'data': {u'orderId': 62870564, u'status': 0, u'tradeType': u'sell', u'tradeUnitPrice
': u'25.3500', u'symbol': u'etc_usdt', u'tradePrice': u'0.0000', u'createdDate': u'1512287172393', u
'averagePrice': u'0', u'tradeAmount': u'0.01000000', u'completedTradeAmount': u'0.00000000'}, u'chan
nel': u'ok_sub_spot_etc_usdt_order'}
'''
#----------------------------------------------------------------------
def onSpotSubOrder(self, data):
"""交易数据"""
if 'data' not in data:
return
rawData = data["data"]
# 本地和系统委托号
orderId = str(rawData['orderId'])
# 这时候出现None, 情况是 已经发出了单子,但是系统这里还没建立 索引
# 先这样返回试一下
# 因为 发完单订单变化是先推送的。。导致不清楚他的localID
# 现在的处理方式是, 先缓存这里的信息,等到出现了 localID再来处理这一段
localNo = self.orderIdDict.get(orderId, None)
if localNo == None:
arr = self.cache_some_order.get(orderId, None)
if arr == None:
arr = []
arr.append(data)
self.cache_some_order[orderId] = arr
else:
arr.append(data)
return
# 委托信息
if orderId not in self.orderDict:
order = VtOrderData()
order.gatewayName = self.gatewayName
order.symbol = '.'.join([rawData['symbol'], EXCHANGE_OKEX])
#order.symbol = spotSymbolMap[rawData['symbol']]
order.vtSymbol = order.symbol
order.orderID = localNo
order.vtOrderID = '.'.join([self.gatewayName, order.orderID])
order.price = float(rawData['tradeUnitPrice'])
order.totalVolume = float(rawData['tradeAmount'])
order.direction, priceType = priceTypeMap[rawData['tradeType']]
self.orderDict[orderId] = order
else:
order = self.orderDict[orderId]
order.tradedVolume = float(rawData['completedTradeAmount'])
order.status = statusMap[rawData['status']]
self.gateway.onOrder(copy(order))
bef_volume = self.recordOrderId_BefVolume.get(orderId, 0.0 )
now_volume = float(rawData['completedTradeAmount']) - bef_volume
if now_volume > 0.000001:
trade = VtTradeData()
trade.gatewayName = self.gatewayName
trade.symbol = order.symbol
trade.vtSymbol = order.symbol
self.tradeID += 1
trade.tradeID = str(self.tradeID)
trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID])
trade.orderID = localNo
trade.vtOrderID = '.'.join([self.gatewayName, trade.orderID])
trade.price = float(rawData['tradeUnitPrice'])
trade.volume = float(now_volume)
trade.direction, priceType = priceTypeMap[rawData['tradeType']]
trade.tradeTime = datetime.now().strftime('%H:%M:%S')
self.gateway.onTrade(trade)
"""
原来的OK coin方式不过数据一直没有 所以换一种方式
# 成交信息
if 'sigTradeAmount' in rawData and float(rawData['sigTradeAmount'])>0:
trade = VtTradeData()
trade.gatewayName = self.gatewayName
trade.symbol = spotSymbolMap[rawData['symbol']]
trade.vtSymbol = order.symbol
trade.tradeID = str(rawData['id'])
trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID])
trade.orderID = localNo
trade.vtOrderID = '.'.join([self.gatewayName, trade.orderID])
trade.price = float(rawData['sigTradePrice'])
trade.volume = float(rawData['sigTradeAmount'])
trade.direction, priceType = priceTypeMap[rawData['tradeType']]
trade.tradeTime = datetime.now().strftime('%H:%M:%S')
self.gateway.onTrade(trade)
"""
'''
[
{
"binary": 0,
"channel": "ok_spot_orderinfo",
"data": {
"result": true,
"orders": [
{
"symbol": "bch_btc",
"amount": "0.10000000",
"price": "1.00000000",
"avg_price": 0,
"create_date": 1504529828000,
"type": "buy",
"deal_amount": 0,
"order_id": 6189,
"status": -1
}
]
}
}
]
'''
#----------------------------------------------------------------------
def onSpotOrderInfo(self, data):
"""委托信息查询回调"""
if "error_code" in data.keys():
print(data)
return
rawData = data['data']
for d in rawData['orders']:
self.localNo += 1
localNo = str(self.localNo)
orderId = str(d['order_id'])
self.localNoDict[localNo] = orderId
self.orderIdDict[orderId] = localNo
if orderId not in self.orderDict:
order = VtOrderData()
order.gatewayName = self.gatewayName
#order.symbol = spotSymbolMap[d['symbol']]
order.symbol = '.'.join([d["symbol"], EXCHANGE_OKEX])
order.vtSymbol = order.symbol
order.orderID = localNo
order.vtOrderID = '.'.join([self.gatewayName, order.orderID])
order.price = d['price']
order.totalVolume = d['amount']
order.direction, priceType = priceTypeMap[d['type']]
self.orderDict[orderId] = order
else:
order = self.orderDict[orderId]
order.tradedVolume = d['deal_amount']
order.status = statusMap[d['status']]
self.gateway.onOrder(copy(order))
'''
[
{
"binary": 0,
"channel": "ok_spot_order",
"data": {
"result": true,
"order_id": 6189
}
}
]
'''
def onSpotOrder(self, data):
rawData = data['data']
if 'error_code' in rawData.keys():
print(data)
return
orderId = str(rawData['order_id'])
# 尽管websocket接口的委托号返回是异步的但经过测试是
# 符合先发现回的规律因此这里通过queue获取之前发送的
# 本地委托号,并把它和推送的系统委托号进行映射
# localNo = self.orderIdDict.get(orderId, None)
# if localNo == None:
localNo = self.localNoQueue.get_nowait()
self.localNoDict[localNo] = orderId
self.orderIdDict[orderId] = localNo
# print orderId, self.cache_some_order
if orderId in self.cache_some_order.keys():
arr = self.cache_some_order[orderId]
for d in arr:
self.onSpotSubOrder(d)
# 处理完就删除掉这里
del self.cache_some_order[orderId]
# 检查是否有系统委托号返回前就发出的撤单请求,若有则进
# 行撤单操作
if localNo in self.cancelDict:
req = self.cancelDict[localNo]
self.spotCancel(req)
del self.cancelDict[localNo]
'''
[
{
"binary": 0,
"channel": "ok_spot_cancel_order",
"data": {
"result": true,
"order_id": "125433027"
}
}
]
'''
#----------------------------------------------------------------------
def onSpotCancelOrder(self, data):
"""撤单回报"""
if 'data' not in data:
return
if 'error' in data["data"].keys():
self.onError(data)
return
rawData = data['data']
orderId = str(rawData['order_id'])
localNo = self.orderIdDict[orderId]
if orderId not in self.orderDict:
order = VtOrderData()
order.gatewayName = self.gatewayName
order.symbol = '.'.join([rawData['symbol'], EXCHANGE_OKEX])
order.vtSymbol = order.symbol
order.orderID = localNo
order.vtOrderID = '.'.join([self.gatewayName, order.orderID])
self.orderDict[orderId] = order
else:
order = self.orderDict[orderId]
order.status = STATUS_CANCELLED
self.gateway.onOrder(order)
del self.orderDict[orderId]
del self.orderIdDict[orderId]
del self.localNoDict[localNo]
if orderId in self.cache_some_order.keys():
del self.cache_some_order[orderId]
#----------------------------------------------------------------------
def spotSendOrder(self, req):
"""发单"""
#symbol = spotSymbolMapReverse[req.symbol][:4]
symbol = (req.symbol.split('.'))[0]
type_ = priceTypeMapReverse[(req.direction, req.priceType)]
self.spotTrade(symbol, type_, str(req.price), str(req.volume))
# 本地委托号加1并将对应字符串保存到队列中返回基于本地委托号的vtOrderID
self.localNo += 1
self.localNoQueue.put(str(self.localNo))
vtOrderID = '.'.join([self.gatewayName, str(self.localNo)])
return vtOrderID
#----------------------------------------------------------------------
def spotCancel(self, req):
"""撤单"""
#symbol = spotSymbolMapReverse[req.symbol][:4]
symbol = (req.symbol.split('.'))[0]
localNo = req.orderID
if localNo in self.localNoDict:
orderID = self.localNoDict[localNo]
self.spotCancelOrder(symbol, orderID)
else:
# 如果在系统委托号返回前客户就发送了撤单请求,则保存
# 在cancelDict字典中等待返回后执行撤单任务
self.cancelDict[localNo] = req
#----------------------------------------------------------------------
def generateDateTime(self, s):
"""生成时间"""
dt = datetime.fromtimestamp(float(s)/1e3)
time = dt.strftime("%H:%M:%S.%f")
date = dt.strftime("%Y%m%d")
return date, time

View File

@ -62,7 +62,6 @@ def generateVtBar(row):
bar.low = row['low']
bar.close = row['close']
bar.volume = row['volume']
bar.date = str(row['date'])
bar.time = str(row['time']).rjust(6, '0')
@ -70,6 +69,19 @@ def generateVtBar(row):
hour=bar.time[0:2]
minute=bar.time[2:4]
sec=bar.time[4:6]
# ------------------------------add by yzl :start
# print(row.date, type(row.date), row.time, type(row.time))# add by yzl to show the date type and value
# 20180328 < type'long' > 93400 < type'long' >
# 最佳改进方法: 构建一个datetime,然后滞后一分钟不能简单00000处理日期减一弊端:处理量太大
# 改进2找出 000,此时日期回退一天
if int(hour) == 0 and int(minute) == 0:
temp_date = datetime(int(bar.date[:4]), int(bar.date[4:6]), int(bar.date[6:])).date()
temp_date = temp_date - timedelta(days=1)
bar.date = temp_date.strftime("%Y%m%d")
# -------------------------------add by yzl :end
if minute=="00":
minute="59"
@ -81,6 +93,8 @@ def generateVtBar(row):
else:
minute=str(int(minute)-1).rjust(2,'0')
bar.time=hour+minute+sec
bar.datetime = datetime.strptime(' '.join([bar.date, bar.time]), '%Y%m%d %H%M%S')

View File

@ -1,7 +1,8 @@
# encoding: UTF-8
from jsEngine import JsEngine
from uiJsWidget import JsEngineManager
from __future__ import absolute_import
from .jsEngine import JsEngine
from .uiJsWidget import JsEngineManager
appName = 'JaqsService'
appDisplayName = u'Jaqs服务'

View File

@ -1,3 +1,4 @@
from __future__ import print_function
import zmq
import Queue
import threading
@ -108,11 +109,11 @@ class JRpcServer :
#client_addr_map[client_id] = identity
self._on_data_arrived(identity, data)
except zmq.error.Again, e:
except zmq.error.Again as e:
#print "RECV timeout: ", e
pass
except Exception, e:
print("_recv_run:", e)
except Exception as e:
print(("_recv_run:", e))
def _callback_run(self):
while not self._should_close:
@ -120,12 +121,12 @@ class JRpcServer :
r = self._callback_queue.get(timeout = 1)
if r :
r()
except Queue.Empty, e:
except Queue.Empty as e:
pass
except Exception, e:
except Exception as e:
traceback.print_exc(e)
print "_callback_run", type(e), e
print("_callback_run", type(e), e)
def _async_call(self, func):
self._callback_queue.put( func )
@ -164,12 +165,12 @@ class JRpcServer :
#print "RECV", msg
if not msg:
print "wrong message format"
print("wrong message format")
return
method = msg['method'] if msg.has_key('method') else None
method = msg['method'] if 'method' in msg else None
call_id = msg['id'] if msg.has_key('id') and msg['id'] else None
call_id = msg['id'] if 'id' in msg and msg['id'] else None
if call_id and method:
if method == ".sys.heartbeat":
@ -183,8 +184,8 @@ class JRpcServer :
if self.on_call :
self._async_call( lambda : self.on_call(identity, msg))
except Exception, e:
print( "_on_data_arrived:", e)
except Exception as e:
print(( "_on_data_arrived:", e))
pass

View File

Before

Width:  |  Height:  |  Size: 21 KiB

After

Width:  |  Height:  |  Size: 21 KiB

View File

@ -1,9 +1,10 @@
# encoding: UTF-8
from __future__ import absolute_import
import json
from collections import defaultdict
import jrpc_server
from . import jrpc_server
from vnpy.event import Event
from vnpy.trader.vtFunction import getJsonPath

View File

@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
import jrpc_server
from __future__ import print_function
from __future__ import absolute_import
from . import jrpc_server
import time
import pandas as pd
from qdata.database import DatabaseConn
@ -15,7 +17,7 @@ db = None
def on_call(client_id, req):
if req['method'] != '.sys.heartbeat':
print "on_call", req
print("on_call", req)
if req['method'] == 'auth.login':
server.send_rsp(client_id, req, result = { "username" : "fixme", "name": "fixme" })
@ -25,7 +27,7 @@ def on_call(client_id, req):
server.send_rsp(client_id, req, error=[-1, "unknown method"])
return
if not req.has_key('params'):
if 'params' not in req:
server.send_rsp(client_id, req, error=[-1, "missing params"])
return
@ -55,7 +57,7 @@ def run():
server = jrpc_server.JRpcServer()
server.on_call = on_call
addr = "tcp://%s:%s"%(st.HOST, st.PORT)
print "listen at " + addr
print("listen at " + addr)
server.listen(addr)
while True:

View File

@ -570,7 +570,7 @@ class TkproDataApi(object):
tick.lowerLimit = data['limit_down']
self.gateway.onTick(tick)
except Exception, e:
except Exception as e:
self.writeLog(u'行情更新失败,错误信息:%s' % str(e))
#----------------------------------------------------------------------

View File

@ -0,0 +1,11 @@
{
"MONGO_HOST": "localhost",
"MONGO_PORT": 27017,
"APIKEY": "",
"SYMBOLS": [
"BINANCE_SPOT_BTC_USDT",
"BINANCE_SPOT_ETH_USDT"
]
}

View File

@ -0,0 +1,103 @@
# encoding: UTF-8
from __future__ import print_function
import json
import time
import datetime
import requests
from pymongo import MongoClient, ASCENDING
from vnpy.trader.vtObject import VtBarData
from vnpy.trader.app.ctaStrategy.ctaBase import MINUTE_DB_NAME
# 加载配置
config = open('config.json')
setting = json.load(config)
MONGO_HOST = setting['MONGO_HOST']
MONGO_PORT = setting['MONGO_PORT']
APIKEY = setting['APIKEY']
SYMBOLS = setting['SYMBOLS']
mc = MongoClient(MONGO_HOST, MONGO_PORT) # Mongo连接
db = mc[MINUTE_DB_NAME] # 数据库
headers = {'X-CoinAPI-Key': APIKEY}
#----------------------------------------------------------------------
def generateVtBar(symbol, d):
"""生成K线"""
bar = VtBarData()
bar.symbol = symbol
bar.vtSymbol = symbol
bar.datetime = datetime.datetime.strptime(d['time_open'], '%Y-%m-%dT%H:%M:%S.%f0Z')
bar.date = bar.datetime.strftime('%Y%m%d')
bar.time = bar.datetime.strftime('%H:%M:%S')
bar.open = d['price_open']
bar.high = d['price_high']
bar.low = d['price_low']
bar.close = d['price_close']
bar.volume = d['volume_traded']
return bar
#----------------------------------------------------------------------
def downMinuteBarBySymbol(symbol, period, start, end):
"""下载某一合约的分钟线数据"""
startTime = time.time()
cl = db[symbol] # 集合
cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引
startDt = datetime.datetime.strptime(start, '%Y%m%d')
endDt = datetime.datetime.strptime(end, '%Y%m%d')
url = 'https://rest.coinapi.io/v1/ohlcv/%s/history' %symbol
params = {
'period_id': period,
'time_start': startDt.strftime('%Y-%m-%dT%H:%M:%S.%f0Z'),
'time_end': endDt.strftime('%Y-%m-%dT%H:%M:%S.%f0Z'),
'limit': 10000
}
resp = requests.get(url, headers=headers, params=params)
if resp.status_code != 200:
print(u'%s数据下载失败' %symbol)
return
l = resp.json()
for d in l:
bar = generateVtBar(symbol, d)
d = bar.__dict__
flt = {'datetime': bar.datetime}
cl.replace_one(flt, d, True)
endTime = time.time()
cost = (endTime - startTime) * 1000
print(u'合约%s数据下载完成%s - %s,耗时%s毫秒' %(symbol, l[0]['time_period_start'],
l[-1]['time_period_end'], cost))
#----------------------------------------------------------------------
def downloadAllMinuteBar(start, end):
"""下载所有配置中的合约的分钟线数据"""
print('-' * 50)
print(u'开始下载合约分钟线数据')
print('-' * 50)
for symbol in SYMBOLS:
downMinuteBarBySymbol(symbol, '1MIN', start, end)
time.sleep(1)
print('-' * 50)
print(u'合约分钟线数据下载完成')
print('-' * 50)

View File

@ -0,0 +1,11 @@
# encoding: UTF-8
"""
立即下载数据到数据库中用于手动执行更新操作
"""
from dataService import *
if __name__ == '__main__':
downMinuteBarBySymbol('BINANCE_SPOT_BTC_USDT', '1MIN', '20180725', '20180726')

View File

@ -0,0 +1,33 @@
# encoding: UTF-8
"""
定时服务可无人值守运行实现每日自动下载更新历史行情数据到数据库中
"""
from __future__ import print_function
from dataService import *
if __name__ == '__main__':
taskCompletedDate = None
taskTime = datetime.time(hour=22, minute=0)
# 进入主循环
while True:
t = datetime.datetime.now()
# 每天到达任务下载时间后,执行数据下载的操作
if t.time() > taskTime and (taskCompletedDate is None or t.date() != taskCompletedDate):
end = t.strftime('%Y%m%d')
start = (t - datetime.timedelta(1)).strftime('%Y%m%d')
# 下载过去24小时的K线数据
downloadAllMinuteBar(start, end)
# 更新任务完成的日期
taskCompletedDate = t.date()
else:
print(u'当前时间%s,任务定时%s' %(t, taskTime))
time.sleep(60)

View File

@ -0,0 +1,5 @@
{
"apiKey": "",
"apiSecret": "",
"symbols": ["BTC-USDT", "ETH-USDT", "EOS-USDT"]
}

View File

@ -0,0 +1,5 @@
{
"apiKey": "",
"secretKey": "",
"symbols": ["BTCUSDT", "ETHUSDT", "ETHBTC"]
}

View File

@ -0,0 +1,5 @@
{
"apiKey": "",
"secretKey": "",
"symbols": ["BTCUSD", "ETHUSD", "ETHBTC"]
}

View File

@ -0,0 +1,6 @@
{
"apiKey": "",
"apiSecret": "",
"sessionCount": 3,
"symbols": ["XBTUSD", "EOSM18", "XRPM18"]
}

View File

@ -0,0 +1,6 @@
{
"exchange": "huobipro",
"apiKey": "",
"apiSecret": "",
"symbols": ["THETA/BTC", "BTC/USDT", "ETH/USDT"]
}

View File

@ -0,0 +1,7 @@
{
"apiKey": "",
"secretKey": "",
"passphrase": "",
"sessionCount": 10,
"symbols": ["ETH-USD"]
}

View File

@ -0,0 +1,5 @@
{
"apiKey": "",
"apiSecret": "",
"symbols": ["ethusdt"]
}

View File

@ -0,0 +1,6 @@
{
"exchange": "huobi",
"accessKey": "",
"secretKey": "",
"symbols": ["btcusdt","ethusdt","ethbtc"]
}

View File

@ -0,0 +1,5 @@
{
"apiKey": "",
"secretKey": "",
"symbols": ["eth_usdt", "sc_btc", "btc_usdt"]
}

View File

@ -0,0 +1,6 @@
{
"apiKey": "",
"secretKey": "",
"trace": false,
"symbols": ["eth_btc", "btc_usdt", "eth_usdt"]
}

View File

@ -0,0 +1,20 @@
{
"fontFamily": "微软雅黑",
"fontSize": 12,
"mongoHost": "localhost",
"mongoPort": 27017,
"mongoLogging": true,
"darkStyle": true,
"language": "chinese",
"logActive": false,
"logLevel": "debug",
"logConsole": true,
"logFile": true,
"tdPenalty": [],
"maxDecimal": 10
}

View File

@ -0,0 +1,3 @@
templateName,settingName,vtSymbol,direction,targetPrice,totalVolume,time,interval,priceLevel,minVolume
TWAP,BUY_BTC_TWAP,BTCUSD.BITFINEX,多,4000,10,200,10,3,1
TWAP,SELL_BTC_TWAP,BTCUSD.BITFINEX,空,9000,10,200,10,3,1
1 templateName settingName vtSymbol direction targetPrice totalVolume time interval priceLevel minVolume
2 TWAP BUY_BTC_TWAP BTCUSD.BITFINEX 4000 10 200 10 3 1
3 TWAP SELL_BTC_TWAP BTCUSD.BITFINEX 9000 10 200 10 3 1

View File

@ -0,0 +1,70 @@
# encoding: UTF-8
# 重载sys模块设置默认字符串编码方式为utf8
try:
reload # Python 2
except NameError: # Python 3
from importlib import reload
import sys
reload(sys)
sys.setdefaultencoding('utf8')
# 判断操作系统
import platform
system = platform.system()
# vn.trader模块
from vnpy.event import EventEngine
from vnpy.trader.vtEngine import MainEngine
from vnpy.trader.uiQt import createQApp
# 加载底层接口
from vnpy.trader.gateway import (huobiGateway, okexGateway,
binanceGateway, bitfinexGateway,
bitmexGateway, fcoinGateway,
bigoneGateway, lbankGateway,
coinbaseGateway, ccxtGateway)
# 加载上层应用
from vnpy.trader.app import (algoTrading)
# 当前目录组件
from uiCryptoWindow import MainWindow
#----------------------------------------------------------------------
def main():
"""主程序入口"""
# 创建Qt应用对象
qApp = createQApp()
# 创建事件引擎
ee = EventEngine()
# 创建主引擎
me = MainEngine(ee)
# 添加交易接口
me.addGateway(ccxtGateway)
me.addGateway(coinbaseGateway)
me.addGateway(lbankGateway)
me.addGateway(bigoneGateway)
me.addGateway(fcoinGateway)
me.addGateway(bitmexGateway)
me.addGateway(huobiGateway)
me.addGateway(okexGateway)
me.addGateway(binanceGateway)
me.addGateway(bitfinexGateway)
# 添加上层应用
me.addApp(algoTrading)
# 创建主窗口
mw = MainWindow(me, ee)
mw.showMaximized()
# 在主线程中启动Qt事件循环
sys.exit(qApp.exec_())
if __name__ == '__main__':
main()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,333 @@
# encoding: UTF-8
import psutil
import traceback
from vnpy.trader.vtFunction import loadIconPath
from vnpy.trader.vtGlobal import globalSetting
from uiCryptoWidget import *
########################################################################
class MainWindow(QtWidgets.QMainWindow):
"""主窗口"""
signalStatusBar = QtCore.Signal(type(Event()))
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
super(MainWindow, self).__init__()
self.mainEngine = mainEngine
self.eventEngine = eventEngine
l = self.mainEngine.getAllGatewayDetails()
self.gatewayNameList = [d['gatewayName'] for d in l]
self.widgetDict = {} # 用来保存子窗口的字典
# 获取主引擎中的上层应用信息
self.appDetailList = self.mainEngine.getAllAppDetails()
self.initUi()
self.loadWindowSettings('custom')
#----------------------------------------------------------------------
def initUi(self):
"""初始化界面"""
self.setWindowTitle('VnTrader')
self.initCentral()
self.initMenu()
self.initStatusBar()
#----------------------------------------------------------------------
def initCentral(self):
"""初始化中心区域"""
widgetTradingW, dockTradingW = self.createDock(TradingWidget, vtText.TRADING, QtCore.Qt.RightDockWidgetArea)
widgetMarketM, dockMarketM = self.createDock(MarketMonitor, vtText.MARKET_DATA, QtCore.Qt.LeftDockWidgetArea)
widgetOrderM, dockOrderM = self.createDock(OrderMonitor, vtText.ORDER, QtCore.Qt.LeftDockWidgetArea)
widgetWorkingOrderM, dockWorkingOrderM = self.createDock(WorkingOrderMonitor, vtText.WORKING_ORDER, QtCore.Qt.LeftDockWidgetArea)
widgetTradeM, dockTradeM = self.createDock(TradeMonitor, vtText.TRADE, QtCore.Qt.LeftDockWidgetArea)
widgetAccountM, dockAccountM = self.createDock(AccountMonitor, vtText.ACCOUNT, QtCore.Qt.RightDockWidgetArea)
widgetPositionM, dockPositionM = self.createDock(PositionMonitor, vtText.POSITION, QtCore.Qt.RightDockWidgetArea)
widgetLogM, dockLogM = self.createDock(LogMonitor, vtText.LOG, QtCore.Qt.RightDockWidgetArea)
self.tabifyDockWidget(dockOrderM, dockWorkingOrderM)
self.tabifyDockWidget(dockPositionM, dockAccountM)
# 保存默认设置
self.saveWindowSettings('default')
#----------------------------------------------------------------------
def initMenu(self):
"""初始化菜单"""
# 创建菜单
menubar = self.menuBar()
# 设计为只显示存在的接口
gatewayDetails = self.mainEngine.getAllGatewayDetails()
sysMenu = menubar.addMenu(vtText.SYSTEM)
for d in gatewayDetails:
if d['gatewayType'] == GATEWAYTYPE_FUTURES:
self.addConnectAction(sysMenu, d['gatewayName'], d['gatewayDisplayName'])
sysMenu.addSeparator()
for d in gatewayDetails:
if d['gatewayType'] == GATEWAYTYPE_EQUITY:
self.addConnectAction(sysMenu, d['gatewayName'], d['gatewayDisplayName'])
sysMenu.addSeparator()
for d in gatewayDetails:
if d['gatewayType'] == GATEWAYTYPE_INTERNATIONAL:
self.addConnectAction(sysMenu, d['gatewayName'], d['gatewayDisplayName'])
sysMenu.addSeparator()
for d in gatewayDetails:
if d['gatewayType'] == GATEWAYTYPE_BTC:
self.addConnectAction(sysMenu, d['gatewayName'], d['gatewayDisplayName'])
sysMenu.addSeparator()
for d in gatewayDetails:
if d['gatewayType'] == GATEWAYTYPE_DATA:
self.addConnectAction(sysMenu, d['gatewayName'], d['gatewayDisplayName'])
sysMenu.addSeparator()
sysMenu.addAction(self.createAction(vtText.CONNECT_DATABASE, self.mainEngine.dbConnect, loadIconPath('database.ico')))
sysMenu.addSeparator()
sysMenu.addAction(self.createAction(vtText.EXIT, self.close, loadIconPath('exit.ico')))
# 功能应用
appMenu = menubar.addMenu(vtText.APPLICATION)
for appDetail in self.appDetailList:
# 如果没有应用界面,则不添加菜单按钮
if not appDetail['appWidget']:
continue
function = self.createOpenAppFunction(appDetail)
action = self.createAction(appDetail['appDisplayName'], function, loadIconPath(appDetail['appIco']))
appMenu.addAction(action)
# 帮助
helpMenu = menubar.addMenu(vtText.HELP)
helpMenu.addAction(self.createAction(vtText.CONTRACT_SEARCH, self.openContract, loadIconPath('contract.ico')))
helpMenu.addAction(self.createAction(vtText.EDIT_SETTING, self.openSettingEditor, loadIconPath('editor.ico')))
helpMenu.addSeparator()
helpMenu.addAction(self.createAction(vtText.RESTORE, self.restoreWindow, loadIconPath('restore.ico')))
helpMenu.addAction(self.createAction(vtText.ABOUT, self.openAbout, loadIconPath('about.ico')))
helpMenu.addSeparator()
helpMenu.addAction(self.createAction(vtText.TEST, self.test, loadIconPath('test.ico')))
#----------------------------------------------------------------------
def initStatusBar(self):
"""初始化状态栏"""
self.statusLabel = QtWidgets.QLabel()
self.statusLabel.setAlignment(QtCore.Qt.AlignLeft)
self.statusBar().addPermanentWidget(self.statusLabel)
self.statusLabel.setText(self.getCpuMemory())
self.sbCount = 0
self.sbTrigger = 10 # 10秒刷新一次
self.signalStatusBar.connect(self.updateStatusBar)
self.eventEngine.register(EVENT_TIMER, self.signalStatusBar.emit)
#----------------------------------------------------------------------
def updateStatusBar(self, event):
"""在状态栏更新CPU和内存信息"""
self.sbCount += 1
if self.sbCount == self.sbTrigger:
self.sbCount = 0
self.statusLabel.setText(self.getCpuMemory())
#----------------------------------------------------------------------
def getCpuMemory(self):
"""获取CPU和内存状态信息"""
cpuPercent = psutil.cpu_percent()
memoryPercent = psutil.virtual_memory().percent
return vtText.CPU_MEMORY_INFO.format(cpu=cpuPercent, memory=memoryPercent)
#----------------------------------------------------------------------
def addConnectAction(self, menu, gatewayName, displayName=''):
"""增加连接功能"""
if gatewayName not in self.gatewayNameList:
return
def connect():
self.mainEngine.connect(gatewayName)
if not displayName:
displayName = gatewayName
actionName = vtText.CONNECT + displayName
connectAction = self.createAction(actionName, connect,
loadIconPath('connect.ico'))
menu.addAction(connectAction)
#----------------------------------------------------------------------
def createAction(self, actionName, function, iconPath=''):
"""创建操作功能"""
action = QtWidgets.QAction(actionName, self)
action.triggered.connect(function)
if iconPath:
icon = QtGui.QIcon(iconPath)
action.setIcon(icon)
return action
#----------------------------------------------------------------------
def createOpenAppFunction(self, appDetail):
"""创建打开应用UI的函数"""
def openAppFunction():
appName = appDetail['appName']
try:
self.widgetDict[appName].show()
except KeyError:
appEngine = self.mainEngine.getApp(appName)
self.widgetDict[appName] = appDetail['appWidget'](appEngine, self.eventEngine)
self.widgetDict[appName].show()
return openAppFunction
#----------------------------------------------------------------------
def test(self):
"""测试按钮用的函数"""
# 有需要使用手动触发的测试函数可以写在这里
pass
#----------------------------------------------------------------------
def openAbout(self):
"""打开关于"""
try:
self.widgetDict['aboutW'].show()
except KeyError:
self.widgetDict['aboutW'] = AboutWidget(self)
self.widgetDict['aboutW'].show()
#----------------------------------------------------------------------
def openContract(self):
"""打开合约查询"""
try:
self.widgetDict['contractM'].show()
except KeyError:
self.widgetDict['contractM'] = ContractManager(self.mainEngine)
self.widgetDict['contractM'].show()
#----------------------------------------------------------------------
def openSettingEditor(self):
"""打开配置编辑"""
try:
self.widgetDict['settingEditor'].show()
except KeyError:
self.widgetDict['settingEditor'] = SettingEditor(self.mainEngine)
self.widgetDict['settingEditor'].show()
#----------------------------------------------------------------------
def closeEvent(self, event):
"""关闭事件"""
reply = QtWidgets.QMessageBox.question(self, vtText.EXIT,
vtText.CONFIRM_EXIT, QtWidgets.QMessageBox.Yes |
QtWidgets.QMessageBox.No, QtWidgets.QMessageBox.No)
if reply == QtWidgets.QMessageBox.Yes:
for widget in self.widgetDict.values():
widget.close()
self.saveWindowSettings('custom')
self.mainEngine.exit()
event.accept()
else:
event.ignore()
#----------------------------------------------------------------------
def createDock(self, widgetClass, widgetName, widgetArea):
"""创建停靠组件"""
widget = widgetClass(self.mainEngine, self.eventEngine)
dock = QtWidgets.QDockWidget(widgetName)
dock.setWidget(widget)
dock.setObjectName(widgetName)
dock.setFeatures(dock.DockWidgetFloatable|dock.DockWidgetMovable)
self.addDockWidget(widgetArea, dock)
return widget, dock
#----------------------------------------------------------------------
def saveWindowSettings(self, settingName):
"""保存窗口设置"""
settings = QtCore.QSettings('vn.trader', settingName)
settings.setValue('state', self.saveState())
settings.setValue('geometry', self.saveGeometry())
#----------------------------------------------------------------------
def loadWindowSettings(self, settingName):
"""载入窗口设置"""
settings = QtCore.QSettings('vn.trader', settingName)
state = settings.value('state')
geometry = settings.value('geometry')
# 尚未初始化
if state is None:
return
# 老版PyQt
elif isinstance(state, QtCore.QVariant):
self.restoreState(state.toByteArray())
self.restoreGeometry(geometry.toByteArray())
# 新版PyQt
elif isinstance(state, QtCore.QByteArray):
self.restoreState(state)
self.restoreGeometry(geometry)
# 异常
else:
content = u'载入窗口配置异常,请检查'
self.mainEngine.writeLog(content)
#----------------------------------------------------------------------
def restoreWindow(self):
"""还原默认窗口设置(还原停靠组件位置)"""
self.loadWindowSettings('default')
self.showMaximized()
########################################################################
class AboutWidget(QtWidgets.QDialog):
"""显示关于信息"""
#----------------------------------------------------------------------
def __init__(self, parent=None):
"""Constructor"""
super(AboutWidget, self).__init__(parent)
self.initUi()
#----------------------------------------------------------------------
def initUi(self):
""""""
self.setWindowTitle(vtText.ABOUT + 'VnTrader')
text = u"""
Developed by Traders, for Traders.
LicenseMIT
Websitewww.vnpy.org
Githubwww.github.com/vnpy/vnpy
"""
label = QtWidgets.QLabel()
label.setText(text)
label.setMinimumWidth(500)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(label)
self.setLayout(vbox)

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,3 @@
from vnpy.trader.app.ctaStrategy.ctaBacktesting import runHistoryDataServer
runHistoryDataServer()

View File

@ -1,5 +1,6 @@
{
"working": true,
"marketCloseTime": "15:05:00",
"tick":
[

View File

@ -0,0 +1,19 @@
[
{
"name": "double ema",
"className": "DoubleMaStrategy",
"vtSymbol": "rb1805"
},
{
"name": "atr rsi",
"className": "AtrRsiStrategy",
"vtSymbol": "IC1802"
},
{
"name": "king keltner",
"className": "KkStrategy",
"vtSymbol": "IH1802"
}
]

View File

@ -0,0 +1,7 @@
{
"host": "127.0.0.1",
"port": 11111,
"market": "HK",
"password": "321321",
"env": "REAL"
}

View File

@ -0,0 +1,10 @@
{
"active": false,
"orderFlowLimit": 50,
"orderFlowClear": 1,
"orderSizeLimit": 100,
"tradeLimit": 1000,
"workingOrderLimit": 20,
"orderCancelLimit": 10,
"marginRatioLimit": 0.85
}

View File

@ -0,0 +1,20 @@
{
"fontFamily": "微软雅黑",
"fontSize": 12,
"mongoHost": "localhost",
"mongoPort": 27017,
"mongoLogging": true,
"darkStyle": true,
"language": "chinese",
"logActive": true,
"logLevel": "debug",
"logConsole": true,
"logFile": true,
"tdPenalty": ["IF", "IH", "IC"],
"maxDecimal": 4
}

View File

@ -0,0 +1,46 @@
# encoding: UTF-8
import sys
# vn.trader模块
from vnpy.event import EventEngine
from vnpy.trader.vtEngine import MainEngine
from vnpy.trader.uiQt import createQApp
from vnpy.trader.uiMainWindow import MainWindow
# 加载底层接口
from vnpy.trader.gateway import futuGateway
# 加载上层应用
from vnpy.trader.app import riskManager, ctaStrategy
#----------------------------------------------------------------------
def main():
"""主程序入口"""
# 创建Qt应用对象
qApp = createQApp()
# 创建事件引擎
ee = EventEngine()
# 创建主引擎
me = MainEngine(ee)
# 添加交易接口
me.addGateway(futuGateway)
# 添加上层应用
me.addApp(riskManager)
me.addApp(ctaStrategy)
# 创建主窗口
mw = MainWindow(me, ee)
mw.showMaximized()
# 在主线程中启动Qt事件循环
sys.exit(qApp.exec_())
if __name__ == '__main__':
main()

Binary file not shown.

View File

@ -0,0 +1 @@
'data', (0, 1462865)

View File

@ -2,12 +2,16 @@
本文件夹中的内容主要是关于如何在交易业务中使用vn.py的示例
* VnTrader最常用的vn.py图形交易界面
* CryptoTradervn.crypto数字货币交易平台
* VnTrader最常用的vn.py图形交易系统
* OptionMaster: 期权量化交易系统
* WebTrader使用Web前端作为监控的交易系统
* FutuTrader针对富途证券futuquant的交易系统只支持Python 3
* DataRecording全自动行情记录工具无需用户每日定时重启
* CtaTrading无图形界面模式的CTA策略交易
@ -24,4 +28,4 @@
* FutuDataService富途证券历史行情服务美股、港股
* QuantosDataService: quantOS历史行情服务A股、期货
* CoinapiDataServiceCoinAPI.io历史行情服务数字货币

View File

@ -3,5 +3,5 @@
"mdAddress": "tcp://180.168.146.187:10031",
"tdAddress": "tcp://180.168.146.187:10030",
"userID": "000300",
"password": "19890624"
"password": "123456789"
}

View File

@ -1,5 +0,0 @@
{
"token": "请在OANDA网站申请",
"accountId": "请在OANDA网站申请",
"settingName": "practice"
}

View File

@ -5,9 +5,14 @@ try:
reload # Python 2
except NameError: # Python 3
from importlib import reload
import sys
reload(sys)
sys.setdefaultencoding('utf8')
try:
sys.setdefaultencoding('utf8')
except AttributeError:
pass
# 判断操作系统
import platform
@ -20,17 +25,17 @@ from vnpy.trader.uiQt import createQApp
from vnpy.trader.uiMainWindow import MainWindow
# 加载底层接口
from vnpy.trader.gateway import (ctpGateway, oandaGateway,
ibGateway)
from vnpy.trader.gateway import (ctpGateway, ibGateway)
if system == 'Linux':
from vnpy.trader.gateway import xtpGateway
elif system == 'Windows':
from vnpy.trader.gateway import (femasGateway, xspeedGateway,
futuGateway, secGateway)
secGateway)
# 加载上层应用
from vnpy.trader.app import (riskManager, ctaStrategy, spreadTrading)
from vnpy.trader.app import (riskManager, ctaStrategy,
spreadTrading, algoTrading)
#----------------------------------------------------------------------
@ -47,14 +52,12 @@ def main():
# 添加交易接口
me.addGateway(ctpGateway)
me.addGateway(oandaGateway)
me.addGateway(ibGateway)
if system == 'Windows':
me.addGateway(femasGateway)
me.addGateway(xspeedGateway)
me.addGateway(secGateway)
me.addGateway(futuGateway)
if system == 'Linux':
me.addGateway(xtpGateway)
@ -63,6 +66,7 @@ def main():
me.addApp(riskManager)
me.addApp(ctaStrategy)
me.addApp(spreadTrading)
me.addApp(algoTrading)
# 创建主窗口
mw = MainWindow(me, ee)

View File

@ -6,17 +6,17 @@
1. 修改CTP_connect.json中的账号和服务器地址
2. 修改WEB_setting.json中的网页登录用户名和密码
3. 打开cmd运行python server.py
4. 打开另一个cmd运行python run.py
5. 用浏览器推荐Chrome访问http://localhost:5000/
6. 输入2中的用户名和密码登录后点击左下角的“连接CTP”
7. 网页前端的使用方法和常规版本的VnTrader基本一致
8. 如需运行CTA策略请修改CTA_setting.json中的配置
3. 打开cmd运行python run.py
4. 浏览器将会自动打开并访问http://127.0.0.1:5000/
5. 输入2中的用户名和密码登录后点击左下角的“连接CTP”
6. 网页前端的使用方法和常规版本的VnTrader基本一致
7. 如需运行CTA策略请修改CTA_setting.json中的配置
## 文件功能
* server.py基于vnpy.rpc模块实现的交易服务器包含CTP接口和CTA策略模块
* run.py基于Flask实现的Web服务器内部通过vnpy.rpc客户端来访问交易服务器
* tradingServer.py基于vnpy.rpc模块实现的交易服务器包含CTP接口和CTA策略模块
* webServer.py基于Flask实现的Web服务器内部通过vnpy.rpc客户端来访问交易服务器
* run.py: 无人值守服务
## 架构设计

View File

@ -9,6 +9,8 @@ from time import sleep
from datetime import datetime, time
from multiprocessing import Process
import webbrowser
from webServer import run as runWebServer
from tradingServer import main as runTradingServer
from vnpy.trader.vtEngine import LogEngine
@ -28,6 +30,9 @@ if __name__ == '__main__':
pWeb = None
pTrading = None
import os
print(os.getpid())
while True:
le.info('-'*30)
@ -48,6 +53,8 @@ if __name__ == '__main__':
pWeb = Process(target=runWebServer)
pWeb.start()
le.info(u'启动WEB服务器进程')
webbrowser.open('http://127.0.0.1:5000')
else:
le.info(u'当前处于非交易时间段')

View File

@ -50,8 +50,8 @@ with open("WEB_setting.json") as f:
# 创建Flask对象
from flask import Flask, send_file
from flask.ext.restful import Api, Resource, reqparse
from flask.ext.socketio import SocketIO
from flask_restful import Api, Resource, reqparse
from flask_socketio import SocketIO
from flask_cors import *
app = Flask(__name__)
@ -510,6 +510,7 @@ class CtaStrategyStop(Resource):
engine.stopStrategy(name)
return {'result_code':'success','data':''}
########################################################################
class CtaStrategyName(Resource):
"""»ñÈ¡²ßÂÔÃû"""
@ -533,6 +534,7 @@ class CtaStrategyName(Resource):
l = engine.getStrategyNames()
return {'result_code':'success','data':l}
########################################################################
class CtaStrategyLoad(Resource):
"""加载策略"""
@ -672,7 +674,11 @@ ee.register(EVENT_CTA_STRATEGY, handleEvent)
#----------------------------------------------------------------------
def run():
"""启动Web服务"""
socketio.run(app,debug=True,host='0.0.0.0',port=5000)
socketio.run(app,
debug=True,
host='0.0.0.0',
port=5000,
use_reloader=False)
if __name__ == '__main__':

View File

@ -2,7 +2,7 @@
@"%SystemRoot%\System32\WindowsPowerShell\v1.0\powershell.exe" -NoProfile -InputFormat None -ExecutionPolicy Bypass -Command "iex ((New-Object System.Net.WebClient).DownloadString('https://chocolatey.org/install.ps1'))" && SET "PATH=%PATH%;%ALLUSERSPROFILE%\chocolatey\bin"
::°²×°Anaconda
choco install anaconda2 --version 4.0.0 --x86 -y --ignorechecksum --params="'/AddToPath=1'"
choco install anaconda2 --version 5.2.0 --x86 -y --ignorechecksum --params="'/AddToPath=1'"
setx PATH "%PATH%;C:\Program Files\Anaconda2\;C:\Program Files\Anaconda2\Scripts\"
::°²×°VC Redist

View File

@ -6,7 +6,5 @@ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/f
conda config --set show_channel_urls yes
conda install -c quantopian ta-lib=0.4.9 -y
::conda install -c conda-forge python-snappy -y
:: Install vn.py
python setup.py install

View File

@ -24,7 +24,6 @@ pip install -r requirements.txt
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --set show_channel_urls yes
conda install -c quantopian ta-lib=0.4.9
#conda install -c conda-forge python-snappy
#Install vn.py
python setup.py install

View File

@ -9,4 +9,6 @@ future
flask-socketio
flask-restful
flask-cors
gevent-websocket
gevent-websocket
pyjwt
ccxt

View File

@ -1,4 +1,4 @@
# encoding: UTF-8
__version__ = '1.8.1'
__version__ = '1.9.0'
__author__ = 'Xiaoyou Chen'

View File

@ -1,24 +0,0 @@
# vn.api - API接口的Python封装
### 内盘
* vn.ctpCTP接口
* vn.lts华宝LTS接口
* vn.femas飞马接口
* vn.xspeed飞创接口
* vn.qdp量投QDP接口
* vn.sgit飞鼠接口
* vn.ksotp金仕达期权接口
* vn.ksgold金仕达黄金接口
### 外盘
* vn.ibInteractive Brokers接口
* vn.oandaOANDA接口
* vn.shzd直达期货接口
### 比特币
* vn.okcoin币行接口
* vn.huobi火币接口
* vn.lhang链行接口
### 数据下载
* vn.datayes通联数据接口

View File

@ -0,0 +1 @@
from .vnbigone import BigoneRestApi

154
vnpy/api/bigone/vnbigone.py Normal file
View File

@ -0,0 +1,154 @@
# encoding: UTF-8
from __future__ import print_function
import hashlib
import hmac
import json
import ssl
import traceback
import base64
from queue import Queue, Empty
from multiprocessing.dummy import Pool
from time import time
from urlparse import urlparse
from copy import copy
from urllib import urlencode
from threading import Thread
import requests
from jwt import PyJWS
from six.moves import input
REST_HOST = 'https://big.one/api/v2/'
########################################################################
class BigoneRestApi(object):
"""REST API"""
jws = PyJWS()
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.apiKey = ''
self.apiSecret = ''
self.active = False
self.reqid = 0
self.queue = Queue()
self.pool = None
self.sessionDict = {} # 会话对象字典
#----------------------------------------------------------------------
def init(self, apiKey, apiSecret):
"""初始化"""
self.apiKey = str(apiKey)
self.apiSecret = str(apiSecret)
#----------------------------------------------------------------------
def start(self, n=10):
"""启动"""
if self.active:
return
self.active = True
self.pool = Pool(n)
self.pool.map_async(self.run, range(n))
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.active = False
if self.pool:
self.pool.close()
self.pool.join()
#----------------------------------------------------------------------
def addReq(self, method, path, callback, params=None, postdict=None):
"""添加请求"""
self.reqid += 1
req = (method, path, callback, params, postdict, self.reqid)
self.queue.put(req)
return self.reqid
#----------------------------------------------------------------------
def processReq(self, req, i):
"""处理请求"""
method, path, callback, params, postdict, reqid = req
url = REST_HOST + path
header = {}
header['Authorization'] = 'Bearer ' + self.generateSignature()
try:
# 使用长连接的session比短连接的耗时缩短20%
session = self.sessionDict[i]
resp = session.request(method, url, headers=header, params=params, json=postdict)
#resp = requests.request(method, url, headers=header, params=params, data=postdict)
code = resp.status_code
d = resp.json()
if code == 200:
callback(d, reqid)
else:
self.onError(code, str(d))
except Exception as e:
self.onError(type(e), e.message)
#----------------------------------------------------------------------
def run(self, i):
"""连续运行"""
self.sessionDict[i] = requests.Session()
while self.active:
try:
req = self.queue.get(timeout=1)
self.processReq(req, i)
except Empty:
pass
#----------------------------------------------------------------------
def generateSignature(self):
"""生成签名"""
payload = '{"type":"OpenAPI","sub":"%s","nonce":%s}' %(self.apiKey, time()*1000000000)
signature = self.jws.encode(payload, self.apiSecret)
return signature
#----------------------------------------------------------------------
def onError(self, code, error):
"""错误回调"""
print('on error')
print(code, error)
#----------------------------------------------------------------------
def onData(self, data, reqid):
"""通用回调"""
print('on data')
print(data, reqid)
if __name__ == '__main__':
from datetime import datetime
from time import sleep
API_KEY = ''
API_SECRET = ''
# REST测试
rest = BigoneRestApi()
rest.init(API_KEY, API_SECRET)
rest.start(1)
#rest.addReq('GET', '/markets/EOS-BTC/depth', rest.onData)
rest.addReq('GET', '/viewer/orders', rest.onData)
input()

View File

@ -0,0 +1 @@
from .vnbinance import BinanceApi

46
vnpy/api/binance/test.py Normal file
View File

@ -0,0 +1,46 @@
from __future__ import absolute_import
from time import sleep
from six.moves import input
from .vnbinance import BinanceApi
if __name__ == '__main__':
apiKey = ''
secretKey = ''
api = BinanceApi()
api.init(apiKey, secretKey)
api.start()
#api.queryPing()
#api.queryTime()
#api.queryExchangeInfo()
api.queryDepth('BTCUSDT')
#api.queryDepth('BTCUSDT')
#api.queryTrades('BTCUSDT')
#api.queryAggTrades('BTCUSDT')
#api.queryKlines('BTCUSDT', '1m')
#api.queryTicker24HR()
#api.queryTickerPrice()
#api.queryBookTicker()
api.queryAccount()
#api.queryOrder('BTCUSDT', '1231231')
#api.queryOpenOrders('BTCUSDT')
#api.queryAllOrders('BTCUSDT')
#api.queryMyTrades('BTCUSDT')
#api.startStream()
#api.keepaliveStream('12312312')
#api.closeStream('123213')
#api.newOrder('BTCUSDT', 'BUY', 'LIMIT', 3000, 1, 'GTC')
#api.cancelOrder('BTCUSDT', '132213123')
#api.initDataStream(['btcusdt@ticker', 'btcusdt@depth5'])
#api.initUserStream('NXvaiFwZz2LuKqINVerKOnWaQQG1JhdQNejiZKY9Kmgk4lZgTvm3nRAnXJK7')
input()

View File

@ -0,0 +1,630 @@
# encoding: UTF-8
from __future__ import print_function
import json
import requests
import hmac
import hashlib
import traceback
import ssl
from queue import Queue, Empty
from threading import Thread
from multiprocessing.dummy import Pool
from time import time, sleep
from urllib import urlencode
from websocket import create_connection
REST_ENDPOINT = 'https://www.binance.com'
DATASTREAM_ENDPOINT = 'wss://stream.binance.com:9443/stream?streams='
USERSTREAM_ENDPOINT = 'wss://stream.binance.com:9443/ws/'
########################################################################
class BinanceApi(object):
""""""
###################################################
## Basic Function
###################################################
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.apiKey = ''
self.secretKey = ''
self.active = False
self.reqid = 0
self.queue = Queue()
self.pool = None
self.headers = {}
self.secret = ''
self.recvWindow = 5000
self.dataStreamNameList = []
self.dataStreamUrl = ''
self.dataStreamActive = False
self.dataStreamWs = None
self.dataStreamThread = None
self.userStreamKey = ''
self.userStreamUrl = ''
self.userStreamActive = False
self.userStreamWs = None
self.userStreamThread = None
self.keepaliveCount = 0
self.keepaliveThread = None
#----------------------------------------------------------------------
def init(self, apiKey, secretKey, recvWindow=5000):
""""""
self.apiKey = apiKey
self.secretKey = secretKey
self.headers['X-MBX-APIKEY'] = apiKey
self.secret = bytes(secretKey.encode('utf-8'))
self.recvWindow = recvWindow
#----------------------------------------------------------------------
def start(self, n=10):
""""""
if self.active:
return
self.active = True
self.pool = Pool(n)
self.pool.map_async(self.run, range(n))
#----------------------------------------------------------------------
def close(self):
""""""
self.active = False
if self.pool:
self.pool.close()
self.pool.join()
#----------------------------------------------------------------------
def request(self, method, path, params=None, signed=False, stream=False):
""""""
if not signed:
url = REST_ENDPOINT + path
headers = {}
else:
if not stream:
params['recvWindow'] = self.recvWindow
params['timestamp'] = int(time()*1000)
query = urlencode(sorted(params.items()))
signature = hmac.new(self.secret, query.encode('utf-8'),
hashlib.sha256).hexdigest()
query += "&signature={}".format(signature)
url = REST_ENDPOINT + path + '?' + query
params = None # 参数添加到query中后清空参数字典
else:
if params:
query = urlencode(sorted(params.items()))
url = REST_ENDPOINT + path + '?' + query
params = None
else:
url = REST_ENDPOINT + path
headers = self.headers
try:
resp = requests.request(method, url, params=params, headers=headers)
if resp.status_code == 200:
return True, resp.json()
else:
error = {
'method': method,
'params': params,
'code': resp.status_code,
'msg': resp.json()['msg']
}
return False, error
except Exception as e:
error = {
'method': method,
'params': params,
'code': e,
'msg': traceback.format_exc()
}
return False, error
#----------------------------------------------------------------------
def addReq(self, method, path, params, callback, signed=False, stream=False):
"""添加请求"""
self.reqid += 1
req = (method, path, params, callback, signed, stream, self.reqid)
self.queue.put(req)
return self.reqid
#----------------------------------------------------------------------
def processReq(self, req):
""""""
method, path, params, callback, signed, stream, reqid = req
result, data = self.request(method, path, params, signed, stream)
if result:
callback(data, reqid)
else:
self.onError(data, reqid)
#----------------------------------------------------------------------
def run(self, n):
""""""
while self.active:
try:
req = self.queue.get(timeout=1)
self.processReq(req)
except Empty:
pass
###################################################
## REST Function
###################################################
#----------------------------------------------------------------------
def queryPing(self):
""""""
path = '/api/v1/ping'
return self.addReq('GET', path, {}, self.onQueryPing)
#----------------------------------------------------------------------
def queryTime(self):
""""""
path = '/api/v1/time'
return self.addReq('GET', path, {}, self.onQueryTime)
#----------------------------------------------------------------------
def queryExchangeInfo(self):
""""""
path = '/api/v1/exchangeInfo'
return self.addReq('GET', path, {}, self.onQueryExchangeInfo)
#----------------------------------------------------------------------
def queryDepth(self, symbol, limit=0):
""""""
path = '/api/v1/depth'
params = {'symbol': symbol}
if limit:
params['limit'] = limit
return self.addReq('GET', path, params, self.onQueryDepth)
#----------------------------------------------------------------------
def queryTrades(self, symbol, limit=0):
""""""
path = '/api/v1/trades'
params = {'symbol': symbol}
if limit:
params['limit'] = limit
return self.addReq('GET', path, params, self.onQueryTrades)
#----------------------------------------------------------------------
def queryAggTrades(self, symbol, fromId=0, startTime=0, endTime=0, limit=0):
""""""
path = '/api/v1/aggTrades'
params = {'symbol': symbol}
if fromId:
params['fromId'] = fromId
if startTime:
params['startTime'] = startTime
if endTime:
params['endTime'] = endTime
if limit:
params['limit'] = limit
return self.addReq('GET', path, params, self.onQueryAggTrades)
#----------------------------------------------------------------------
def queryKlines(self, symbol, interval, limit=0, startTime=0, endTime=0):
""""""
path = '/api/v1/klines'
params = {
'symbol': symbol,
'interval': interval
}
if limit:
params['limit'] = limit
if startTime:
params['startTime'] = startTime
if endTime:
params['endTime'] = endTime
return self.addReq('GET', path, params, self.onQueryKlines)
#----------------------------------------------------------------------
def queryTicker24HR(self, symbol=''):
""""""
path = '/api/v1/ticker/24hr'
params = {}
if symbol:
params['symbol'] = symbol
return self.addReq('GET', path, params, self.onQueryTicker24HR)
#----------------------------------------------------------------------
def queryTickerPrice(self, symbol=''):
""""""
path = '/api/v3/ticker/price'
params = {}
if symbol:
params['symbol'] = symbol
return self.addReq('GET', path, params, self.onQueryTickerPrice)
#----------------------------------------------------------------------
def queryBookTicker(self, symbol=''):
""""""
path = '/api/v3/ticker/bookTicker'
params = {}
if symbol:
params['symbol'] = symbol
return self.addReq('GET', path, params, self.onQueryBookTicker)
#----------------------------------------------------------------------
def newOrder(self, symbol, side, type_, price, quantity, timeInForce,
newClientOrderId='', stopPrice=0, icebergQty=0, newOrderRespType=''):
""""""
path = '/api/v3/order'
params = {
'symbol': symbol,
'side': side,
'type': type_,
'price': price,
'quantity': quantity,
'timeInForce': timeInForce
}
if newClientOrderId:
params['newClientOrderId'] = newClientOrderId
if timeInForce:
params['timeInForce'] = timeInForce
if stopPrice:
params['stopPrice'] = stopPrice
if icebergQty:
params['icebergQty'] = icebergQty
if newOrderRespType:
params['newOrderRespType'] = newOrderRespType
return self.addReq('POST', path, params, self.onNewOrder, signed=True)
#----------------------------------------------------------------------
def queryOrder(self, symbol, orderId=0, origClientOrderId=0):
""""""
path = '/api/v3/order'
params = {'symbol': symbol}
if orderId:
params['orderId'] = orderId
if origClientOrderId:
params['origClientOrderId'] = origClientOrderId
return self.addReq('GET', path, params, self.onQueryOrder, signed=True)
#----------------------------------------------------------------------
def cancelOrder(self, symbol, orderId=0, origClientOrderId='',
newClientOrderId=''):
""""""
path = '/api/v3/order'
params = {'symbol': symbol}
if orderId:
params['orderId'] = orderId
if origClientOrderId:
params['origClientOrderId'] = origClientOrderId
if newClientOrderId:
params['newClientOrderId'] = newClientOrderId
return self.addReq('DELETE', path, params, self.onCancelOrder, signed=True)
#----------------------------------------------------------------------
def queryOpenOrders(self, symbol=''):
""""""
path = '/api/v3/openOrders'
params = {}
if symbol:
params['symbol'] = symbol
return self.addReq('GET', path, params, self.onQueryOpenOrders, signed=True)
#----------------------------------------------------------------------
def queryAllOrders(self, symbol, orderId=0, limit=0):
""""""
path = '/api/v3/allOrders'
params = {'symbol': symbol}
if orderId:
params['orderId'] = orderId
if limit:
params['limit'] = limit
return self.addReq('GET', path, params, self.onQueryAllOrders, signed=True)
#----------------------------------------------------------------------
def queryAccount(self):
""""""
path = '/api/v3/account'
params = {}
return self.addReq('GET', path, params, self.onQueryAccount, signed=True)
#----------------------------------------------------------------------
def queryMyTrades(self, symbol, limit=0, fromId=0):
""""""
path = '/api/v3/myTrades'
params = {'symbol': symbol}
if limit:
params['limit'] = limit
if fromId:
params['fromId'] = fromId
return self.addReq('GET', path, params, self.onQueryMyTrades, signed=True)
#----------------------------------------------------------------------
def startStream(self):
""""""
path = '/api/v1/userDataStream'
return self.addReq('POST', path, {}, self.onStartStream, signed=True, stream=True)
#----------------------------------------------------------------------
def keepaliveStream(self, listenKey):
""""""
path = '/api/v1/userDataStream'
params = {'listenKey': listenKey}
return self.addReq('PUT', path, params, self.onKeepaliveStream, signed=True, stream=True)
#----------------------------------------------------------------------
def closeStream(self, listenKey):
""""""
path = '/api/v1/userDataStream'
params = {'listenKey': listenKey}
return self.addReq('DELETE', path, params, self.onCloseStream, signed=True, stream=True)
###################################################
## REST Callback
###################################################
#----------------------------------------------------------------------
def onError(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryPing(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryTime(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryExchangeInfo(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryDepth(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryTrades(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryAggTrades(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryKlines(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryTicker24HR(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryTickerPrice(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryBookTicker(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onNewOrder(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryOrder(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onCancelOrder(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryOpenOrders(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryAllOrders(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryAccount(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onQueryMyTrades(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onStartStream(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onKeepaliveStream(self, data, reqid):
""""""
print((data, reqid))
#----------------------------------------------------------------------
def onCloseStream(self, data, reqid):
""""""
print((data, reqid))
###################################################
## Websocket Function
###################################################
#----------------------------------------------------------------------
def initDataStream(self, nameList=None):
""""""
if nameList:
self.dataStreamNameList = nameList
s = '/'.join(self.dataStreamNameList)
self.dataStreamUrl = DATASTREAM_ENDPOINT + s
result = self.connectDataStream()
if result:
self.dataStreamActive = True
self.dataStreamThread = Thread(target=self.runDataStream)
self.dataStreamThread.start()
#----------------------------------------------------------------------
def runDataStream(self):
""""""
while self.dataStreamActive:
try:
stream = self.dataStreamWs.recv()
data = json.loads(stream)
self.onMarketData(data)
except:
self.onDataStreamError('Data stream connection lost')
result = self.connectDataStream()
if not result:
self.onDataStreamError(u'Waiting 3 seconds to reconnect')
sleep(3)
else:
self.onDataStreamError(u'Data stream reconnected')
#----------------------------------------------------------------------
def closeDataStream(self):
""""""
self.dataStreamActive = False
self.dataStreamThread.join()
#----------------------------------------------------------------------
def connectDataStream(self):
""""""
try:
self.dataStreamWs = create_connection(self.dataStreamUrl,
sslopt={'cert_reqs': ssl.CERT_NONE})
return True
except:
msg = traceback.format_exc()
self.onDataStreamError('Connecting data stream falied: %s' %msg)
return False
#----------------------------------------------------------------------
def onDataStreamError(self, msg):
""""""
print(msg)
#----------------------------------------------------------------------
def onMarketData(self, data):
""""""
print(data)
#----------------------------------------------------------------------
def initUserStream(self, key):
""""""
self.userStreamKey = key
self.userStreamUrl = USERSTREAM_ENDPOINT + key
result = self.connectUserStream()
if result:
self.userStreamActive = True
self.userStreamThread = Thread(target=self.runUserStream)
self.userStreamThread.start()
self.keepaliveThread = Thread(target=self.runKeepalive)
self.keepaliveThread.start()
#----------------------------------------------------------------------
def runUserStream(self):
""""""
while self.userStreamActive:
try:
stream = self.userStreamWs.recv()
data = json.loads(stream)
self.onUserData(data)
except:
self.onUserStreamError('User stream connection lost')
result = self.connectUserStream()
if not result:
self.onUserStreamError(u'Waiting 3 seconds to reconnect')
sleep(3)
else:
self.onUserStreamError(u'User stream reconnected')
#----------------------------------------------------------------------
def closeUserStream(self):
""""""
self.userStreamActive = False
self.userStreamThread.join()
self.keepaliveThread.join()
#----------------------------------------------------------------------
def connectUserStream(self):
""""""
try:
self.userStreamWs = create_connection(self.userStreamUrl,
sslopt={'cert_reqs': ssl.CERT_NONE})
return True
except:
msg = traceback.format_exc()
self.onUserStreamError('Connecting user stream falied: %s' %msg)
return False
#----------------------------------------------------------------------
def onUserStreamError(self, msg):
""""""
print(msg)
#----------------------------------------------------------------------
def onUserData(self, data):
""""""
print(data)
#----------------------------------------------------------------------
def runKeepalive(self):
""""""
while self.userStreamActive:
self.keepaliveCount += 1
if self.keepaliveCount >= 1800:
self.keepaliveCount = 0
self.keepaliveStream(self.userStreamKey)
sleep(1)

View File

@ -0,0 +1 @@
from .vnbitfinex import BitfinexApi

View File

@ -0,0 +1,136 @@
# encoding: UTF-8
from __future__ import print_function
import json
import requests
import traceback
import ssl
from threading import Thread
from queue import Queue, Empty
import websocket
from six.moves import input
WEBSOCKET_V2_URL = 'wss://api.bitfinex.com/ws/2'
RESTFUL_V1_URL = 'https://api.bitfinex.com/v1'
########################################################################
class BitfinexApi(object):
""""""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.ws = None
self.thread = None
self.active = False
self.restQueue = Queue()
self.restThread = None
#----------------------------------------------------------------------
def start(self):
""""""
self.ws = websocket.create_connection(WEBSOCKET_V2_URL,
sslopt={'cert_reqs': ssl.CERT_NONE})
self.active = True
self.thread = Thread(target=self.run)
self.thread.start()
self.restThread = Thread(target=self.runRest)
self.restThread.start()
self.onConnect()
#----------------------------------------------------------------------
def reconnect(self):
""""""
self.ws = websocket.create_connection(WEBSOCKET_V2_URL,
sslopt={'cert_reqs': ssl.CERT_NONE})
self.onConnect()
#----------------------------------------------------------------------
def run(self):
""""""
while self.active:
try:
stream = self.ws.recv()
data = json.loads(stream)
self.onData(data)
except:
msg = traceback.format_exc()
self.onError(msg)
self.reconnect()
#----------------------------------------------------------------------
def close(self):
""""""
self.active = False
if self.thread:
self.thread.join()
if self.restThread:
self.thread.join()
#----------------------------------------------------------------------
def onConnect(self):
""""""
print('connected')
#----------------------------------------------------------------------
def onData(self, data):
""""""
print(data)
#----------------------------------------------------------------------
def onError(self, msg):
""""""
print(msg)
#----------------------------------------------------------------------
def sendReq(self, req):
""""""
self.ws.send(json.dumps(req))
#----------------------------------------------------------------------
def sendRestReq(self, path, callback):
""""""
self.restQueue.put((path, callback))
#----------------------------------------------------------------------
def runRest(self):
""""""
while self.active:
try:
path, callback = self.restQueue.get(timeout=1)
self.httpGet(path, callback)
except Empty:
pass
#----------------------------------------------------------------------
def httpGet(self, path, callback):
""""""
url = RESTFUL_V1_URL + path
resp = requests.get(url)
callback(resp.json())
if __name__ == '__main__':
api = BitfinexApi()
api.start()
d = {
'event': 'subscribe',
'channel': 'book',
'symbol': 'BTCUSD'
}
api.sendReq(d)
input()

View File

@ -0,0 +1 @@
from .vnbithumb import BithumbRestApi

View File

@ -0,0 +1,166 @@
# encoding: UTF-8
import base64
import hashlib
import hmac
import urllib
from multiprocessing.dummy import Pool
from time import time
import requests
from six.moves import input
from queue import Queue, Empty
REST_HOST = 'https://api.bithumb.com'
########################################################################
class BithumbRestApi(object):
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.apiKey = ''
self.apiSecret = ''
self.active = False
self.reqid = 0
self.queue = Queue()
self.pool = None # type: Pool
self.sessionDict = {}
#----------------------------------------------------------------------
def init(self, apiKey, apiSecret):
"""初始化"""
self.apiKey = str(apiKey)
self.apiSecret = str(apiSecret)
#----------------------------------------------------------------------
def start(self, n=10):
"""启动"""
if self.active:
return
self.active = True
self.pool = Pool(n)
self.pool.map_async(self.run, range(n))
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.active = False
if self.pool:
self.pool.close()
self.pool.join()
#----------------------------------------------------------------------
def addReq(self, method, path, callback, params=None, postdict=None):
"""添加请求"""
self.reqid += 1
req = (method, path, callback, params, postdict, self.reqid)
self.queue.put(req)
return self.reqid
def processReq(self, req, i):
"""处理请求"""
method, path, callback, params, postdict, reqid = req
url = REST_HOST + path
body = ''
header = {}
# 如果调用的是需要签名的API则加上签名
# 不是以/public/开头的API都需要签名
if path[:8] != '/public/':
nonce = BithumbRestApi.generateNonce()
# 如果有参数使用urlencode编码参数
body = urllib.urlencode(postdict) if postdict else ''
# 加上签名
header = {
'Api-Key': self.apiKey,
'Api-Sign': self.generateSignature(path, body, nonce),
'Api-Nonce': nonce,
'Content-Type': 'application/x-www-form-urlencoded'
}
try:
# 使用长连接的session比短连接的耗时缩短20%
session = self.sessionDict[i]
resp = session.request(method, url, headers=header, params=params, data=body)
code = resp.status_code
d = resp.json()
if code == 200:
callback(d, reqid)
else:
self.onError(code, str(d))
except Exception as e:
self.onError(type(e), e.message)
#----------------------------------------------------------------------
def run(self, i):
"""连续运行"""
self.sessionDict[i] = requests.Session()
while self.active:
try:
req = self.queue.get(timeout=1)
self.processReq(req, i)
except Empty:
pass
#----------------------------------------------------------------------
def generateSignature(self, path, body, nonce):
"""生成签名"""
# 要签名的数据包括pathbody和nonce用\x00隔开
data = path + chr(0) + body + chr(0) + nonce
# 签名的核心方法hmac-sha512
# 签名流程base64(hex(hmac-sha512(要签名的数据)))
return base64.b64encode(hmac.new(bytes(self.apiSecret), data, hashlib.sha512).hexdigest())
#----------------------------------------------------------------------
def onError(self, code, error):
"""错误回调"""
print('on error')
print(code, error)
#----------------------------------------------------------------------
def onData(self, data, reqid):
"""通用回调"""
print('on data')
print(data, reqid)
#----------------------------------------------------------------------
@staticmethod # 静态函数:不依赖于self的函数
def generateNonce():
"""生成时间戳"""
return str(int(time() * 1000))
if __name__ == '__main__':
API_KEY = '0c2f5621ac18d26d51ce640b25eb44f9'
API_SECRET = '62bb8b4e263476f443f8d3dbf0aad6bc'
rest = BithumbRestApi()
rest.init(apiKey=API_KEY, apiSecret=API_SECRET)
rest.start(1)
def onBtcTick(jsonObj, reqid):
print('on_btc_tick : \n{}'.format(jsonObj))
pass
def onAccountInfo(jsonObj, reqid):
print('on_account_info : \n{}'.format(jsonObj))
pass
rest.addReq('POST', '/public/ticker/BTC', onBtcTick)
rest.addReq('POST', '/info/account', onAccountInfo, postdict={'currency': 'BTC'})
input()

View File

@ -0,0 +1 @@
from .vnbitmex import BitmexRestApi, BitmexWebsocketApi

273
vnpy/api/bitmex/vnbitmex.py Normal file
View File

@ -0,0 +1,273 @@
# encoding: UTF-8
from __future__ import print_function
import hashlib
import hmac
import json
import ssl
import traceback
from queue import Queue, Empty
from multiprocessing.dummy import Pool
from time import time
from urlparse import urlparse
from copy import copy
from urllib import urlencode
from threading import Thread
from six.moves import input
import requests
import websocket
REST_HOST = 'https://www.bitmex.com/api/v1'
WEBSOCKET_HOST = 'wss://www.bitmex.com/realtime'
########################################################################
class BitmexRestApi(object):
"""REST API"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.apiKey = ''
self.apiSecret = ''
self.active = False
self.reqid = 0
self.queue = Queue()
self.pool = None
self.sessionDict = {} # 会话对象字典
self.header = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json'
}
#----------------------------------------------------------------------
def init(self, apiKey, apiSecret):
"""初始化"""
self.apiKey = apiKey
self.apiSecret = apiSecret
#----------------------------------------------------------------------
def start(self, n=3):
"""启动"""
if self.active:
return
self.active = True
self.pool = Pool(n)
self.pool.map_async(self.run, range(n))
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.active = False
if self.pool:
self.pool.close()
self.pool.join()
#----------------------------------------------------------------------
def addReq(self, method, path, callback, params=None, postdict=None):
"""添加请求"""
self.reqid += 1
req = (method, path, callback, params, postdict, self.reqid)
self.queue.put(req)
return self.reqid
#----------------------------------------------------------------------
def processReq(self, req, i):
"""处理请求"""
method, path, callback, params, postdict, reqid = req
url = REST_HOST + path
expires = int(time() + 5)
rq = requests.Request(url=url, data=postdict)
p = rq.prepare()
header = copy(self.header)
header['api-expires'] = str(expires)
header['api-key'] = self.apiKey
header['api-signature'] = self.generateSignature(method, path, expires, params, body=p.body)
# 使用长连接的session比短连接的耗时缩短80%
session = self.sessionDict[i]
resp = session.request(method, url, headers=header, params=params, data=postdict)
#resp = requests.request(method, url, headers=header, params=params, data=postdict)
code = resp.status_code
d = resp.json()
if code == 200:
callback(d, reqid)
else:
self.onError(code, d)
#----------------------------------------------------------------------
def run(self, i):
"""连续运行"""
self.sessionDict[i] = requests.Session()
while self.active:
try:
req = self.queue.get(timeout=1)
self.processReq(req, i)
except Empty:
pass
#----------------------------------------------------------------------
def generateSignature(self, method, path, expires, params=None, body=None):
"""生成签名"""
# 对params在HTTP报文路径中以请求字段方式序列化
if params:
query = urlencode(sorted(params.items()))
path = path + '?' + query
if body is None:
body = ''
msg = method + '/api/v1' + path + str(expires) + body
signature = hmac.new(self.apiSecret, msg,
digestmod=hashlib.sha256).hexdigest()
return signature
#----------------------------------------------------------------------
def onError(self, code, error):
"""错误回调"""
print('on error')
print(code, error)
#----------------------------------------------------------------------
def onData(self, data, reqid):
"""通用回调"""
print('on data')
print(data, reqid)
########################################################################
class BitmexWebsocketApi(object):
"""Websocket API"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.ws = None
self.thread = None
self.active = False
#----------------------------------------------------------------------
def start(self):
"""启动"""
self.ws = websocket.create_connection(WEBSOCKET_HOST,
sslopt={'cert_reqs': ssl.CERT_NONE})
self.active = True
self.thread = Thread(target=self.run)
self.thread.start()
self.onConnect()
#----------------------------------------------------------------------
def reconnect(self):
"""重连"""
self.ws = websocket.create_connection(WEBSOCKET_HOST,
sslopt={'cert_reqs': ssl.CERT_NONE})
self.onConnect()
#----------------------------------------------------------------------
def run(self):
"""运行"""
while self.active:
try:
stream = self.ws.recv()
data = json.loads(stream)
self.onData(data)
except:
msg = traceback.format_exc()
self.onError(msg)
self.reconnect()
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.active = False
if self.thread:
self.thread.join()
#----------------------------------------------------------------------
def onConnect(self):
"""连接回调"""
print('connected')
#----------------------------------------------------------------------
def onData(self, data):
"""数据回调"""
print('-' * 30)
l = data.keys()
l.sort()
for k in l:
print(k, data[k])
#----------------------------------------------------------------------
def onError(self, msg):
"""错误回调"""
print(msg)
#----------------------------------------------------------------------
def sendReq(self, req):
"""发出请求"""
self.ws.send(json.dumps(req))
if __name__ == '__main__':
API_KEY = ''
API_SECRET = ''
## REST测试
rest = BitmexRestApi()
rest.init(API_KEY, API_SECRET)
rest.start(3)
data = {
'symbol': 'XBTUSD'
}
rest.addReq('POST', '/position/isolate', rest.onData, postdict=data)
#rest.addReq('GET', '/instrument', rest.onData)
# WEBSOCKET测试
#ws = BitmexWebsocketApi()
#ws.start()
#req = {"op": "subscribe", "args": ['order', 'trade', 'position', 'margin']}
#ws.sendReq(req)
#expires = int(time())
#method = 'GET'
#path = '/realtime'
#msg = method + path + str(expires)
#signature = hmac.new(API_SECRET, msg, digestmod=hashlib.sha256).hexdigest()
#req = {
#'op': 'authKey',
#'args': [API_KEY, expires, signature]
#}
#ws.sendReq(req)
#req = {"op": "subscribe", "args": ['order', 'execution', 'position', 'margin']}
#req = {"op": "subscribe", "args": ['instrument']}
#ws.sendReq(req)
input()

View File

@ -0,0 +1 @@
from .vncoinbase import CoinbaseRestApi, CoinbaseWebsocketApi

View File

@ -0,0 +1,294 @@
# encoding: UTF-8
from __future__ import print_function
import hashlib
import hmac
import base64
import json
import ssl
import traceback
from queue import Queue, Empty
from multiprocessing.dummy import Pool
from time import time
from urlparse import urlparse
from copy import copy
from urllib import urlencode
from threading import Thread
from six.moves import input
import requests
import websocket
REST_HOST = 'https://api-public.sandbox.pro.coinbase.com'
WEBSOCKET_HOST = 'wss://ws-feed-public.sandbox.pro.coinbase.com'
########################################################################
class CoinbaseRestApi(object):
"""REST API"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.apiKey = ''
self.secretKey = ''
self.passphrase = ''
self.hmacKey = ''
self.active = False
self.reqid = 0
self.queue = Queue()
self.pool = None
self.sessionDict = {} # 会话对象字典
self.header = {
'Content-Type': 'Application/JSON'
}
#----------------------------------------------------------------------
def init(self, apiKey, secretKey, passphrase):
"""初始化"""
self.apiKey = apiKey
self.secretKey = secretKey
self.passphrase = passphrase
self.hmacKey = base64.b64decode(self.secretKey)
#----------------------------------------------------------------------
def start(self, n=10):
"""启动"""
if self.active:
return
self.active = True
self.pool = Pool(n)
self.pool.map_async(self.run, range(n))
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.active = False
if self.pool:
self.pool.close()
self.pool.join()
#----------------------------------------------------------------------
def addReq(self, method, path, callback, params=None, postdict=None):
"""添加请求"""
self.reqid += 1
req = (method, path, callback, params, postdict, self.reqid)
self.queue.put(req)
return self.reqid
#----------------------------------------------------------------------
def processReq(self, req, i):
"""处理请求"""
method, path, callback, params, postdict, reqid = req
url = REST_HOST + path
timestamp = str(time())
if postdict:
rq = requests.Request(url=url, data=json.dumps(postdict))
else:
rq = requests.Request(url=url)
p = rq.prepare()
header = copy(self.header)
header['CB-ACCESS-KEY'] = self.apiKey
header['CB-ACCESS-PASSPHRASE'] = self.passphrase
header['CB-ACCESS-TIMESTAMP'] = timestamp
header['CB-ACCESS-SIGN'] = self.generateSignature(method, path, timestamp, params, body=p.body)
# 使用长连接的session比短连接的耗时缩短80%
session = self.sessionDict[i]
if postdict:
resp = session.request(method, url, headers=header, params=params, data=json.dumps(postdict))
else:
resp = session.request(method, url, headers=header, params=params)
#resp = requests.request(method, url, headers=header, params=params, data=postdict)
code = resp.status_code
d = resp.json()
if code == 200:
callback(d, reqid)
else:
self.onError(code, d)
#----------------------------------------------------------------------
def run(self, i):
"""连续运行"""
self.sessionDict[i] = requests.Session()
while self.active:
try:
req = self.queue.get(timeout=1)
self.processReq(req, i)
except Empty:
pass
#----------------------------------------------------------------------
def generateSignature(self, method, path, timestamp, params=None, body=None):
"""生成签名"""
# 对params在HTTP报文路径中以请求字段方式序列化
if params:
query = urlencode(sorted(params.items()))
path = path + '?' + query
if body is None:
body = ''
msg = timestamp + method + path + body
msg = msg.encode('ascii')
signature = hmac.new(self.hmacKey, msg, hashlib.sha256)
signature64 = base64.b64encode(signature.digest()).decode('utf-8')
return signature64
#----------------------------------------------------------------------
def onError(self, code, error):
"""错误回调"""
print('on error')
print(code, error)
#----------------------------------------------------------------------
def onData(self, data, reqid):
"""通用回调"""
print('on data')
print(data, reqid)
########################################################################
class CoinbaseWebsocketApi(object):
"""Websocket API"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.ws = None
self.thread = None
self.active = False
#----------------------------------------------------------------------
def start(self):
"""启动"""
self.ws = websocket.create_connection(WEBSOCKET_HOST,
sslopt={'cert_reqs': ssl.CERT_NONE})
self.active = True
self.thread = Thread(target=self.run)
self.thread.start()
self.onConnect()
#----------------------------------------------------------------------
def reconnect(self):
"""重连"""
self.ws = websocket.create_connection(WEBSOCKET_HOST,
sslopt={'cert_reqs': ssl.CERT_NONE})
self.onConnect()
#----------------------------------------------------------------------
def run(self):
"""运行"""
while self.active:
try:
stream = self.ws.recv()
data = json.loads(stream)
self.onData(data)
except:
msg = traceback.format_exc()
self.onError(msg)
self.reconnect()
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.active = False
if self.thread:
self.thread.join()
#----------------------------------------------------------------------
def onConnect(self):
"""连接回调"""
print('connected')
#----------------------------------------------------------------------
def onData(self, data):
"""数据回调"""
print('-' * 30)
l = data.keys()
l.sort()
for k in l:
print(k, data[k])
#----------------------------------------------------------------------
def onError(self, msg):
"""错误回调"""
print(msg)
#----------------------------------------------------------------------
def sendReq(self, req):
"""发出请求"""
self.ws.send(json.dumps(req))
if __name__ == '__main__':
API_KEY = '2982e190ce2785b862c36f7748ec6864'
API_SECRET = 'sUXjm5HZKA+Dru9+dtekGF6DlfQnHvbQCs+DaTuOTSBFR+vvMIiWkpPTwHcfZwNapSRpFhjNerrb111hojazIA=='
PASSPHRASE = 'vnpytesting'
# REST测试
rest = CoinbaseRestApi()
rest.init(API_KEY, API_SECRET, PASSPHRASE)
rest.start(1)
#data = {
#'symbol': 'XBTUSD'
#}
#rest.addReq('POST', '/position/isolate', rest.onData, postdict=data)
rest.addReq('GET', '/orders', rest.onData, {'status': 'all'})
## WEBSOCKET测试
#ws = CoinbaseWebsocketApi()
#ws.start()
#req = {
#'type': 'subscribe',
#"product_ids": [
#"ETH-USD"
#],
#"channels": ['level2']
#}
#ws.sendReq(req)
#expires = int(time())
#method = 'GET'
#path = '/realtime'
#msg = method + path + str(expires)
#signature = hmac.new(API_SECRET, msg, digestmod=hashlib.sha256).hexdigest()
#req = {
#'op': 'authKey',
#'args': [API_KEY, expires, signature]
#}
#ws.sendReq(req)
#req = {"op": "subscribe", "args": ['order', 'execution', 'position', 'margin']}
#req = {"op": "subscribe", "args": ['instrument']}
#ws.sendReq(req)
input()

View File

@ -0,0 +1 @@
from .vnfcoin import FcoinRestApi, FcoinWebsocketApi

294
vnpy/api/fcoin/vnfcoin.py Normal file
View File

@ -0,0 +1,294 @@
# encoding: UTF-8
from __future__ import print_function
import hashlib
import hmac
import json
import ssl
import traceback
import base64
from queue import Queue, Empty
from multiprocessing.dummy import Pool
from time import time
from urlparse import urlparse
from copy import copy
from urllib import urlencode
from threading import Thread
import requests
import websocket
from six.moves import input
REST_HOST = 'https://api.fcoin.com/v2'
WEBSOCKET_HOST = 'wss://api.fcoin.com/v2/ws'
########################################################################
class FcoinRestApi(object):
"""REST API"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.apiKey = ''
self.apiSecret = ''
self.active = False
self.reqid = 0
self.queue = Queue()
self.pool = None
self.sessionDict = {} # 会话对象字典
#----------------------------------------------------------------------
def init(self, apiKey, apiSecret):
"""初始化"""
self.apiKey = str(apiKey)
self.apiSecret = str(apiSecret)
#----------------------------------------------------------------------
def start(self, n=10):
"""启动"""
if self.active:
return
self.active = True
self.pool = Pool(n)
self.pool.map_async(self.run, range(n))
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.active = False
if self.pool:
self.pool.close()
self.pool.join()
#----------------------------------------------------------------------
def addReq(self, method, path, callback, params=None, postdict=None):
"""添加请求"""
self.reqid += 1
req = (method, path, callback, params, postdict, self.reqid)
self.queue.put(req)
return self.reqid
#----------------------------------------------------------------------
def processReq(self, req, i):
"""处理请求"""
method, path, callback, params, postdict, reqid = req
url = REST_HOST + path
timestamp = str(int(time()) * 1000)
header = {}
header['FC-ACCESS-TIMESTAMP'] = timestamp
header['FC-ACCESS-KEY'] = self.apiKey
header['FC-ACCESS-SIGNATURE'] = self.generateSignature(method, url, timestamp, params, postdict)
try:
# 使用长连接的session比短连接的耗时缩短80%
session = self.sessionDict[i]
resp = session.request(method, url, headers=header, params=params, json=postdict)
#resp = requests.request(method, url, headers=header, params=params, data=postdict)
#if method != 'GET':
#print '-' * 30
#print 'method', method
#print 'url', url
#print 'header', header
#print 'params', params
#print 'postdict', postdict
code = resp.status_code
d = resp.json()
if code == 200:
callback(d, reqid)
else:
self.onError(code, d)
except Exception as e:
self.onError(type(e), e.message)
#----------------------------------------------------------------------
def run(self, i):
"""连续运行"""
self.sessionDict[i] = requests.Session()
while self.active:
try:
req = self.queue.get(timeout=1)
self.processReq(req, i)
except Empty:
pass
#----------------------------------------------------------------------
def generateSignature(self, method, path, timestamp, params=None, postdict=None):
"""生成签名"""
# 对params在HTTP报文路径中以请求字段方式序列化
if params:
query = urlencode(sorted(params.items()))
path = path + '?' + query
if postdict:
post = urlencode(sorted(postdict.items()))
else:
post = ''
msg = method + path + timestamp + post
msg = base64.b64encode(msg)
signature = hmac.new(self.apiSecret, msg, digestmod=hashlib.sha1).digest()
signature = base64.b64encode(signature)
return signature
#----------------------------------------------------------------------
def onError(self, code, error):
"""错误回调"""
print('on error')
print(code, error)
#----------------------------------------------------------------------
def onData(self, data, reqid):
"""通用回调"""
print('on data')
print(data, reqid)
########################################################################
class FcoinWebsocketApi(object):
"""Websocket API"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.ws = None
self.thread = None
self.active = False
#----------------------------------------------------------------------
def start(self):
"""启动"""
self.ws = websocket.create_connection(WEBSOCKET_HOST,
sslopt={'cert_reqs': ssl.CERT_NONE})
self.active = True
self.thread = Thread(target=self.run)
self.thread.start()
self.onConnect()
#----------------------------------------------------------------------
def reconnect(self):
"""重连"""
self.ws = websocket.create_connection(WEBSOCKET_HOST,
sslopt={'cert_reqs': ssl.CERT_NONE})
self.onConnect()
#----------------------------------------------------------------------
def run(self):
"""运行"""
while self.active:
try:
stream = self.ws.recv()
data = json.loads(stream)
self.onData(data)
except:
msg = traceback.format_exc()
self.onError(msg)
self.reconnect()
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.active = False
if self.thread:
self.thread.join()
#----------------------------------------------------------------------
def onConnect(self):
"""连接回调"""
print('connected')
#----------------------------------------------------------------------
def onData(self, data):
"""数据回调"""
print('-' * 30)
l = data.keys()
l.sort()
for k in l:
print(k, data[k])
#----------------------------------------------------------------------
def onError(self, msg):
"""错误回调"""
print(msg)
#----------------------------------------------------------------------
def sendReq(self, req):
"""发出请求"""
self.ws.send(json.dumps(req))
if __name__ == '__main__':
from datetime import datetime
from time import sleep
API_KEY = '88893f839fbd49f4b5fcb03e7c15c015'
API_SECRET = 'ef383295cf4e4c128e6d18d7e9564b12'
# REST测试
rest = FcoinRestApi()
rest.init(API_KEY, API_SECRET)
rest.start(3)
#rest.addReq('GET', '/accounts/balance', rest.onData)
# 查委托
#states = ['submitted', 'partial_filled', 'partial_canceled',
#'filled', 'canceled', 'pending_cancel']
#req = {
#'symbol': 'ethusdt',
#'start': datetime.now().strftime('%Y%m%d'),
#'states': 'submitted',
#'limit': 500
#}
#for i in range(10):
#rest.addReq('GET', '/orders', rest.onData, params=req)
#sleep(2)
req = {
'symbol': 'ethusdt',
'side': 'buy',
'type': 'limit',
'price': 300,
'amount': 0.01
}
rest.addReq('POST', '/orders', rest.onData, postdict=req)
#sleep(1)
#rest.addReq('POST', '/orders', rest.onData, params=req)
## WS测试
#ws = FcoinWebsocketApi()
#ws.start()
#req = {
#'cmd': 'sub',
#'args': ['depth.L20.btcusdt'],
#'id': 1
#}
#ws.sendReq(req)
input()

View File

@ -1,3 +1,4 @@
# encoding: UTF-8
from vnhuobi import TradeApi, DataApi
from __future__ import absolute_import
from .vnhuobi import TradeApi, DataApi

View File

@ -5,7 +5,8 @@
#import zlib
#import time
from vnhuobi import DataApi
from __future__ import absolute_import
from .vnhuobi import DataApi
#if __name__ == '__main__':
#while(1):

Some files were not shown because too many files have changed in this diff Show More