Merge pull request #1300 from vnpy/dev

发布v1.9.2 beta版本
This commit is contained in:
vn.py 2018-12-24 20:51:42 +08:00 committed by GitHub
commit af9b8bb179
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 1498 additions and 1540 deletions

View File

@ -87,27 +87,29 @@ vn.py是基于Python的开源量化交易程序开发框架起源于国内私
* AlgoTrading算法交易模块提供多种常用的智能交易算法TWAP、Sniper、BestLimit、Iceberg、Arbitrage等等支持数据库配置保存、CSV文件加载启动以及RPC跨进程算法交易服务
* TradeCopy复制交易模块用户可以通过发布者Provider进程来对外提供交易策略信号手动、策略均可订阅者Subscriber进程根据收到的信号自动执行同步交易简洁快速得实现一拖多账户交易功能
* RiskManager前端风控模块负责在交易系统将任何交易请求发出到柜台前的一系列标准检查操作支持用户自定义风控规则的扩展
* DataRecorder实盘行情记录支持Tick和K线数据的落地用于策略开发回测以及实盘运行初始化
* RpcServiceRPC跨进程调用服务基于MainEngineProxy组件用户可以如同开发单一进程应用搬开发多进程架构的复杂交易应用
* RtdServiceEXCEL RTD服务组件通过pyxll模块提供EXCEL表格系统对VnTrader系统内所有数据的访问和功能调用未完成
* RtdServiceEXCEL RTD服务组件通过pyxll模块提供EXCEL表格系统对VN Trader系统内所有数据的访问
5. 数据相关的API接口vnpy.data用于构建和更新历史行情数据库目前包括
* 上海中期历史行情服务shcifco
* 通联数据API下载服务datayes
* 天勤行情数据接口tq
* 上海中期历史行情服务shcifco
6. 关于vn.py项目的应用演示examples对于新手而言可以从这里开始学习vn.py项目的使用方式
8. vn.py项目的Docker镜像docker,目前尚未完成
8. vn.py项目的Docker镜像docker
9. [官方论坛](http://www.vnpie.com)和[知乎专栏](http://zhuanlan.zhihu.com/vn-py)内容包括vn.py项目的开发教程和Python在量化交易领域的应用研究等内容
* web docker在Docker中启动基于Web交易的交易服务器WebTrader在浏览器中实现CTA策略的运维操作
* vnc docker内嵌了完整的vn.py图形化运行环境Linux并通过VNC Server对外提供虚拟桌面访问
9. [社区论坛](http://www.vnpy.com)和[知乎专栏](http://zhuanlan.zhihu.com/vn-py)内容包括vn.py项目的开发教程和Python在量化交易领域的应用研究等内容
10. 官方交流QQ群262656087管理较严格定期清除长期潜水的成员
@ -152,9 +154,9 @@ sudo /home/vnpy/anaconda2/bin/conda install -c quantopian ta-lib=0.4.9
1. 在[SimNow](http://simnow.com.cn/)注册CTP仿真账号记下你的**账号、密码、经纪商编号**,然后下载快期查询你的**交易和行情服务器地址**
2. 找到vn.py应用示例目录examples打开examples\VnTrader\CTP_connect.json修改账号、密码、服务器等为上一步注册完成后你的信息注意使用专门的编程编辑器如Sublime Text等防止json编码出错
2. 找到vn.py应用示例目录examples打开examples\VN Trader\CTP_connect.json修改账号、密码、服务器等为上一步注册完成后你的信息注意使用专门的编程编辑器如Sublime Text等防止json编码出错
3. 找到VnTrader的启动入口run.py并双击运行若无法双击则在当前目录按住Shift点鼠标右键打开cmd输入python run.py运行run.py内容如下
3. 找到VN Trader的启动入口run.py并双击运行若无法双击则在当前目录按住Shift点鼠标右键打开cmd输入python run.py运行run.py内容如下
```
# encoding: UTF-8

0
beta/gateway/__init__.py Normal file
View File

View File

View File

@ -0,0 +1,302 @@
# encoding: UTF-8
'''
'''
from __future__ import print_function
import base64
import hashlib
import hmac
import json
import re
import urllib
import zlib
from vnpy.api.rest import Request, RestClient
from vnpy.api.websocket import WebsocketClient
from vnpy.trader.vtGateway import *
REST_HOST = 'https://api.huobipro.com'
WEBSOCKET_MARKET_HOST = 'wss://api.huobi.pro/ws' # Global站行情
WEBSOCKET_ASSETS_HOST = 'wss://api.huobi.pro/ws/v1' # 资产和订单
WEBSOCKET_CONTRACT_HOST = 'wss://www.hbdm.com/ws' # 合约站行情
#----------------------------------------------------------------------
def _split_url(url):
"""
将url拆分为host和path
:return: host, path
"""
m = re.match('\w+://([^/]*)(.*)', url)
if m:
return m.group(1), m.group(2)
#----------------------------------------------------------------------
def createSignature(apiKey, method, host, path, secretKey):
"""创建签名"""
sortedParams = (
("AccessKeyId", apiKey),
("SignatureMethod", 'HmacSHA256'),
("SignatureVersion", "2"),
("Timestamp", datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S'))
)
encodeParams = urllib.urlencode(sortedParams)
payload = [method, host, path, encodeParams]
payload = '\n'.join(payload)
payload = payload.encode(encoding='UTF8')
secretKey = secretKey.encode(encoding='UTF8')
digest = hmac.new(secretKey, payload, digestmod=hashlib.sha256).digest()
signature = base64.b64encode(digest)
params = dict(sortedParams)
params["Signature"] = signature
return params
########################################################################
class HuobiRestApi(RestClient):
def __init__(self, gateway): # type: (VtGateway)->HuobiRestApi
super(HuobiRestApi, self).__init__()
self.gateway = gateway
self.gatewayName = gateway.gatewayName
self.apiKey = ""
self.apiSecret = ""
self.signHost = ""
#----------------------------------------------------------------------
def sign(self, request):
request.headers = {
"User-Agent":
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36"}
additionalParams = createSignature(self.apiKey,
request.method,
self.signHost,
request.path,
self.apiSecret)
if not request.params:
request.params = additionalParams
else:
request.params.update(additionalParams)
if request.method == "POST":
request.headers['Content-Type'] = 'application/json'
return request
#----------------------------------------------------------------------
def connect(self, apiKey, apiSecret, sessionCount=3):
"""连接服务器"""
self.apiKey = apiKey
self.apiSecret = apiSecret
host, path = _split_url(REST_HOST)
self.init(REST_HOST)
self.signHost = host
self.start(sessionCount)
#----------------------------------------------------------------------
def qeuryAccount(self):
self.addRequest('GET', '/v1/account/accounts', self.onAccount)
#----------------------------------------------------------------------
def onAccount(self, data, request): # type: (dict, Request)->None
pass
#----------------------------------------------------------------------
def cancelWithdraw(self, id):
self.addRequest('POST',
"/v1/dw/withdraw-virtual/" + str(id) + "/cancel",
self.onWithdrawCanceled
)
#----------------------------------------------------------------------
def onWithdrawCanceled(self, data, request): # type: (dict, Request)->None
pass
########################################################################
class HuobiWebsocketApiBase(WebsocketClient):
#----------------------------------------------------------------------
def __init__(self, gateway):
"""Constructor"""
super(HuobiWebsocketApiBase, self).__init__()
self.gateway = gateway
self.gatewayName = gateway.gatewayName
self.apiKey = ''
self.apiSecret = ''
self.signHost = ''
self.path = ''
#----------------------------------------------------------------------
def connect(self, apiKey, apiSecret, url):
""""""
self.apiKey = apiKey
self.apiSecret = apiSecret
host, path = _split_url(url)
self.init(url)
self.signHost = host
self.path = path
self.start()
#----------------------------------------------------------------------
def login(self):
params = {
'op': 'auth',
}
params.update(
createSignature(self.apiKey,
'GET',
self.signHost,
self.path,
self.apiSecret)
)
return self.sendPacket(params)
#----------------------------------------------------------------------
def onLogin(self, packet):
pass
#----------------------------------------------------------------------
@staticmethod
def unpackData(data):
return json.loads(zlib.decompress(data, 31))
#----------------------------------------------------------------------
def onPacket(self, packet):
"""
这里我新增了一个onHuobiPacket的函数也可以让子类重写这个函数然后调用super.onPacket
"""
if 'ping' in packet:
self.sendPacket({'pong': packet['ping']})
return
# todo: use another error handing method
if 'err-msg' in packet:
return self.onHuobiErrorPacket(packet)
if "op" in packet and packet["op"] == "auth":
return self.onLogin(packet)
self.onHuobiPacket(packet)
#----------------------------------------------------------------------
def onHuobiPacket(self, packet): # type: (dict)->None
pass
#----------------------------------------------------------------------
def onHuobiErrorPacket(self, packet): # type: (dict)->None
print("error : {}".format(packet))
########################################################################
class HuobiAssetsWebsocketApi(HuobiWebsocketApiBase):
def connect(self, apiKey, apiSecret, host=WEBSOCKET_ASSETS_HOST):
"""
这里我使用重写connect添加了默认参数这样写感觉~~不太好~~不过目前想到的比较好的方式就是这样了
虽然在Python中可以直接把这个connect()写成不接收host和path的形式但是PyCharm会提示重载错误所以不接收host和path似乎不太好
我觉得最好的写法应该是这个函数不接收host和path同时为了让PyCharm不提示重载错误(减少歧义)应该给
HuobiWebsocketApiBase.connect起另外一个名字
"""
return super(HuobiAssetsWebsocketApi, self). \
connect(apiKey, apiSecret, host)
#----------------------------------------------------------------------
def onConnected(self):
self.login()
#----------------------------------------------------------------------
def subscribeAccount(self):
"""
:param symbol: str ethbtc, ltcbtc, etcbtc, bchbtc
:param period: str 1min, 5min, 15min, 30min, 60min, 1day, 1mon, 1week, 1year
"""
self.sendPacket({
"op": "sub",
"cid": "any thing you want",
"topic": "accounts"
})
#----------------------------------------------------------------------
def onHuobiPacket(self, packet): # type: (dict)->None
if 'op' in packet:
if packet['op'] == 'sub':
timestamp = packet['ts']
topic = packet['topic']
"""
"data": {
"event": "order.match|order.place|order.refund|order.cancel|order.fee-refund|margin.transfer|margin.loan|margin.interest|margin.repay|other",
"list": [
{
"account-id": 419013,
"currency": "usdt",
"type": "trade",
"balance": "500009195917.4362872650"
},
{
"account-id": 419013,
"currency": "btc",
"type": "frozen",
"balance": "9786.6783000000"
}
]
}
"""
pass
########################################################################
class HuobiMarketWebsocketApi(HuobiWebsocketApiBase):
#----------------------------------------------------------------------
def connect(self, apiKey, apiSecret, host=WEBSOCKET_MARKET_HOST):
"""
这里我使用重写connect添加了默认参数这样写感觉~~不太好~~不过目前想到的比较好的方式就是这样了
虽然在Python中可以直接把这个connect()写成不接收host和path的形式但是PyCharm会提示重载错误所以不接收host和path似乎不太好
我觉得最好的写法应该是这个函数不接收host和path同时为了让PyCharm不提示重载错误(减少歧义)应该给
HuobiWebsocketApiBase.connect起另外一个名字
"""
return super(HuobiMarketWebsocketApi, self). \
connect(apiKey, apiSecret, host)
#----------------------------------------------------------------------
def subscribeKLine(self, symbol, period): # type:(str, str)->None
"""
:param symbol: str ethbtc, ltcbtc, etcbtc, bchbtc
:param period: str 1min, 5min, 15min, 30min, 60min, 1day, 1mon, 1week, 1year
:return:
"""
self.sendPacket({
"sub": "market." + symbol + ".kline." + period,
"id": "any thing you want"
})
#----------------------------------------------------------------------
def onHuobiPacket(self, packet): # type: (dict)->None
# code for test purpose only
if 'ch' in packet:
if packet['ch'] == 'market.btcusdt.kline.1min':
timestamp = packet['ts']
data = packet['tick']
id = data['id']
amount = data['amount']
count = data['count']
open = data['open']
close = data['close']
low = data['low']
high = data['high']
vol = data['vol']
pass

View File

@ -37,7 +37,7 @@ class MainWindow(QtWidgets.QMainWindow):
#----------------------------------------------------------------------
def initUi(self):
"""初始化界面"""
self.setWindowTitle('VnTrader')
self.setWindowTitle('VN Crypto')
self.initCentral()
self.initMenu()
self.initStatusBar()
@ -309,7 +309,7 @@ class AboutWidget(QtWidgets.QDialog):
#----------------------------------------------------------------------
def initUi(self):
""""""
self.setWindowTitle(vtText.ABOUT + 'VnTrader')
self.setWindowTitle(vtText.ABOUT + 'VN Crypto')
text = u"""
Developed by Traders, for Traders.

View File

@ -10,7 +10,7 @@ import requests
from pymongo import MongoClient, ASCENDING
from vnpy.trader.vtObject import VtBarData
from vnpy.trader.app.ctaStrategy.ctaBase import MINUTE_DB_NAME
from vnpy.trader.app.ctaStrategy.ctaBase import MINUTE_DB_NAME, DAILY_DB_NAME
# 加载配置
@ -23,7 +23,7 @@ SYMBOLS = setting['SYMBOLS']
mc = MongoClient(MONGO_HOST, MONGO_PORT) # Mongo连接
db = mc[MINUTE_DB_NAME] # 数据库
dbDaily = mc[DAILY_DB_NAME]
#----------------------------------------------------------------------
def generateVtBar(vtSymbol, d):
@ -87,6 +87,49 @@ def downloadMinuteBarBySymbol(vtSymbol, end):
datetime.datetime.fromtimestamp(l[-1]['time']),
cost))
#----------------------------------------------------------------------
def downloadDailyBarBySymbol(vtSymbol):
"""下载某一合约的分钟线数据"""
startTime = time.time()
cl = dbDaily[vtSymbol]
cl.ensure_index([('datetime', ASCENDING)], unique=True)
symbol, exchange = vtSymbol.split('.')
fsym, tsym = symbol.split('/')
url = 'https://min-api.cryptocompare.com/data/histoday'
params = {
'fsym': fsym,
'tsym': tsym,
'e': exchange,
'limit': 2000
}
resp = requests.get(url, headers={}, params=params)
if resp.status_code != 200:
print(u'%s数据下载失败' %vtSymbol)
return
j = resp.json()
l = j['Data']
for d in l:
bar = generateVtBar(vtSymbol, d)
d = bar.__dict__
flt = {'datetime': bar.datetime}
cl.replace_one(flt, d, True)
endTime = time.time()
cost = (endTime - startTime) * 1000
print(u'合约%s数据下载完成%s - %s,耗时%s毫秒' %(vtSymbol,
datetime.datetime.fromtimestamp(l[0]['time']),
datetime.datetime.fromtimestamp(l[-1]['time']),
cost))
#----------------------------------------------------------------------
def downloadAllMinuteBar(end):
"""下载所有配置中的合约的分钟线数据"""

View File

@ -8,7 +8,9 @@ from dataService import *
if __name__ == '__main__':
#downMinuteBarBySymbol('BTC/USDT.OKEX', '20181012')
#downMinuteBarBySymbol('BTC/USDT.HUOBIPRO', '20181012')
#downMinuteBarBySymbol('BTC/USDT.BINANCE', '20181012')
downloadAllMinuteBar('20181012')
#downloadMinuteBarBySymbol('BTC/USDT.OKEX', '20181012')
#downloadMinuteBarBySymbol('BTC/USDT.HUOBIPRO', '20181012')
#downloadMinuteBarBySymbol('BTC/USDT.BINANCE', '20181012')
#downloadAllMinuteBar('20181012')
downloadDailyBarBySymbol('BTC/USDT.BINANCE')

View File

@ -1,9 +0,0 @@
{
"MONGO_HOST": "localhost",
"MONGO_PORT": 27017,
"SYMBOLS": ["510050", "510300"],
"USERNAME": "",
"PASSWORD": ""
}

View File

@ -1,124 +0,0 @@
# encoding: UTF-8
from __future__ import print_function
import sys
import json
from datetime import datetime
from time import time, sleep
from pymongo import MongoClient, ASCENDING
from vnpy.trader.vtObject import VtBarData
from vnpy.trader.app.ctaStrategy.ctaBase import MINUTE_DB_NAME, DAILY_DB_NAME
import rqdatac as rq
# 加载配置
config = open('config.json')
setting = json.load(config)
MONGO_HOST = setting['MONGO_HOST']
MONGO_PORT = setting['MONGO_PORT']
SYMBOLS = setting['SYMBOLS']
mc = MongoClient(MONGO_HOST, MONGO_PORT) # Mongo连接
db = mc[MINUTE_DB_NAME] # 数据库
db2 = mc[DAILY_DB_NAME]
USERNAME = setting['USERNAME']
PASSWORD = setting['PASSWORD']
rq.init(USERNAME, PASSWORD)
FIELDS = ['open', 'high', 'low', 'close', 'volume']
#----------------------------------------------------------------------
def generateVtBar(row, symbol):
"""生成K线"""
bar = VtBarData()
bar.symbol = symbol
bar.vtSymbol = symbol
bar.open = row['open']
bar.high = row['high']
bar.low = row['low']
bar.close = row['close']
bar.volume = row['volume']
bar.datetime = row.name
bar.date = bar.datetime.strftime("%Y%m%d")
bar.time = bar.datetime.strftime("%H:%M:%S")
return bar
#----------------------------------------------------------------------
def downloadMinuteBarBySymbol(symbol):
"""下载某一合约的分钟线数据"""
start = time()
cl = db[symbol]
cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引
df = rq.get_price(symbol, frequency='1m', fields=FIELDS)
for ix, row in df.iterrows():
bar = generateVtBar(row, symbol)
d = bar.__dict__
flt = {'datetime': bar.datetime}
cl.replace_one(flt, d, True)
end = time()
cost = (end - start) * 1000
print(u'合约%s数据下载完成%s - %s,耗时%s毫秒' %(symbol, df.index[0], df.index[-1], cost))
#----------------------------------------------------------------------
def downloadDailyBarBySymbol(symbol):
"""下载某一合约日线数据"""
start = time()
cl = db2[symbol]
cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引
df = rq.get_price(symbol, frequency='1d', fields=FIELDS, end_date=datetime.now().strftime('%Y%m%d'))
for ix, row in df.iterrows():
bar = generateVtBar(row, symbol)
d = bar.__dict__
flt = {'datetime': bar.datetime}
cl.replace_one(flt, d, True)
end = time()
cost = (end - start) * 1000
print(u'合约%s数据下载完成%s - %s,耗时%s毫秒' %(symbol, df.index[0], df.index[-1], cost))
#----------------------------------------------------------------------
def downloadAllMinuteBar():
"""下载所有配置中的合约的分钟线数据"""
print('-' * 50)
print(u'开始下载合约分钟线数据')
print('-' * 50)
# 添加下载任务
for symbol in SYMBOLS:
downloadMinuteBarBySymbol(str(symbol))
print('-' * 50)
print(u'合约分钟线数据下载完成')
print('-' * 50)
#----------------------------------------------------------------------
def downloadAllDailyBar():
"""下载所有配置中的合约的日数据"""
print('-' * 50)
print(u'开始下载合约日线数据')
print('-' * 50)
# 添加下载任务
for symbol in SYMBOLS:
downloadDailyBarBySymbol(str(symbol))
print('-' * 50)
print(u'合约日线数据下载完成')
print('-' * 50)

View File

@ -1,14 +0,0 @@
# encoding: UTF-8
"""
立即下载数据到数据库中用于手动执行更新操作
"""
from dataService import *
if __name__ == '__main__':
downloadMinuteBarBySymbol('CU99')
downloadDailyBarBySymbol('IF99')
downloadDailyBarBySymbol('TA99')
downloadDailyBarBySymbol('I99')

View File

@ -1,33 +0,0 @@
# encoding: UTF-8
"""
定时服务可无人值守运行实现每日自动下载更新历史行情数据到数据库中
"""
from __future__ import print_function
import time
import datetime
from dataService import downloadAllMinuteBar
if __name__ == '__main__':
taskCompletedDate = None
# 生成一个随机的任务下载时间,用于避免所有用户在同一时间访问数据服务器
taskTime = datetime.time(hour=17, minute=0)
# 进入主循环
while True:
t = datetime.datetime.now()
# 每天到达任务下载时间后,执行数据下载的操作
if t.time() > taskTime and (taskCompletedDate is None or t.date() != taskCompletedDate):
downloadAllMinuteBar()
# 更新任务完成的日期
taskCompletedDate = t.date()
else:
print(u'当前时间%s,任务定时%s' %(t, taskTime))
time.sleep(60)

View File

@ -1,3 +0,0 @@
# 天勤历史行情服务
请在[www.tq18.cn](www.tq18.cn)下载天勤行情终端,安装运行后,即可使用该服务。

View File

@ -1,4 +0,0 @@
{
"MONGO_HOST": "localhost",
"MONGO_PORT": 27017
}

View File

@ -1,107 +0,0 @@
# encoding: UTF-8
from __future__ import print_function
import sys
import json
from datetime import datetime
from time import sleep
from pymongo import MongoClient, ASCENDING
from vnpy.data.tq.vntq import TqApi
from vnpy.trader.vtObject import VtBarData
from vnpy.trader.app.ctaStrategy.ctaBase import MINUTE_DB_NAME
# 加载配置
config = open('config.json')
setting = json.load(config)
MONGO_HOST = setting['MONGO_HOST']
MONGO_PORT = setting['MONGO_PORT']
mc = MongoClient(MONGO_HOST, MONGO_PORT) # Mongo连接
db = mc[MINUTE_DB_NAME] # 数据库
api = TqApi() # 历史行情服务API对象
api.connect() # 连接
taskList = [] # 下载任务列表
#----------------------------------------------------------------------
def generateVtBar(symbol, d):
"""生成K线"""
bar = VtBarData()
bar.symbol = symbol
bar.vtSymbol = symbol
bar.open = d['open']
bar.high = d['high']
bar.low = d['low']
bar.close = d['close']
bar.volume = d['volume']
bar.openInterest = d['open_oi']
bar.datetime = datetime.fromtimestamp(d['datetime']/1000000000)
bar.date = bar.datetime.strftime("%Y%m%d")
bar.time = bar.datetime.strftime("%H:%M:%S")
return bar
#----------------------------------------------------------------------
def onChart(symbol, seconds):
"""K线更新处理函数"""
# 避免重复记录已经完成的任务
if symbol not in taskList:
return
serial = api.get_kline_serial(symbol, seconds)
cl = db[symbol] # 集合
cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引
l = serial.values()
for d in l:
bar = generateVtBar(symbol, d)
d = bar.__dict__
flt = {'datetime': bar.datetime}
cl.replace_one(flt, d, True)
start = datetime.fromtimestamp(l[0]['datetime']/1000000000)
end = datetime.fromtimestamp(l[-1]['datetime']/1000000000)
print(u'合约%s下载完成%s - %s' %(symbol, start, end))
# 移除已经完成的任务
if symbol in taskList:
taskList.remove(symbol)
#----------------------------------------------------------------------
def downMinuteBarBySymbol(symbol, num):
"""下载某一合约的分钟线数据"""
api.subscribe_chart(symbol, 60, num, onChart)
#----------------------------------------------------------------------
def downloadAllMinuteBar(num, symbols):
"""下载所有配置中的合约的分钟线数据"""
print('-' * 50)
print(u'开始下载合约分钟线数据')
print('-' * 50)
# 添加下载任务
taskList.extend(symbols)
for symbol in symbols:
downMinuteBarBySymbol(str(symbol), num)
while True:
sleep(2)
# 如果任务列表为空,则说明数据已经全部下载完成
if not taskList:
print('-' * 50)
print(u'合约分钟线数据下载完成')
print('-' * 50)
return

View File

@ -1,17 +0,0 @@
# encoding: UTF-8
"""
立即下载数据到数据库中用于手动执行更新操作
注意: 请先在本机启动天勤终端 (0.8.0 以上版本) 并保持运行, 再执行本程序
"""
from dataService import *
if __name__ == '__main__':
symbols = ["CFFEX.IF1710", "CFFEX.IF1711", "CFFEX.IF1712", "CFFEX.IF1803",
"CFFEX.IH1710", "CFFEX.IH1711", "CFFEX.IH1712", "CFFEX.IH1803",
"CFFEX.IC1710", "CFFEX.IC1711", "CFFEX.IC1712", "CFFEX.IC1803"]
downloadAllMinuteBar(1000, symbols)

View File

@ -1,40 +0,0 @@
# encoding: UTF-8
"""
定时服务可无人值守运行实现每日自动下载更新历史行情数据到数据库中
注意: 请确保本程序运行时, 本机天勤终端 (0.8.0 以上版本)正在运行中
"""
from __future__ import print_function
import time
import datetime
from dataService import downloadAllMinuteBar
if __name__ == '__main__':
taskCompletedDate = None
# 生成一个随机的任务下载时间,用于避免所有用户在同一时间访问数据服务器
taskTime = datetime.time(hour=17, minute=0)
symbols = ["CFFEX.IF1710", "CFFEX.IF1711", "CFFEX.IF1712", "CFFEX.IF1803",
"CFFEX.IH1710", "CFFEX.IH1711", "CFFEX.IH1712", "CFFEX.IH1803",
"CFFEX.IC1710", "CFFEX.IC1711", "CFFEX.IC1712", "CFFEX.IC1803"]
# 进入主循环
while True:
t = datetime.datetime.now()
# 每天到达任务下载时间后,执行数据下载的操作
if t.time() > taskTime and (taskCompletedDate is None or t.date() != taskCompletedDate):
# 下载1000根分钟线数据足以覆盖过去两天的行情
downloadAllMinuteBar(1000, symbols)
# 更新任务完成的日期
taskCompletedDate = t.date()
else:
print(u'当前时间%s,任务定时%s' %(t, taskTime))
time.sleep(60)

10
examples/RQData/README.md Normal file
View File

@ -0,0 +1,10 @@
### RQData数据自动更新服务
vn.py官方推荐的历史数据解决方案由RiceQuant提供的高质量期货数据服务。
使用步骤:
1. 前往[RQData主页](https://www.ricequant.com/purchase#1),购买标准版账户或者申请试用
2. 获得账户后将自动下载make.bat文件在make.bat中找到name和password信息
3. 打开config.json在rqUsername和rqPassword中填入上述信息并在product列表中填入需要更新行情的期货合约产品代码只包含英文字母即可
4. 双击“启动更新服务.bat”来启动RQData数据自动更新服务点击右上角关闭按钮可最小化到右下方的托盘栏
5. 在托盘栏图标上点击右键菜单中的“退出”可以退出程序

View File

@ -0,0 +1,5 @@
{
"rqUsername": "",
"rqPassword": "",
"product": ["IF"]
}

View File

@ -0,0 +1,160 @@
# encoding: UTF-8
from __future__ import print_function
import sys
import json
from datetime import datetime
from time import time, sleep
from pymongo import MongoClient, ASCENDING
import pandas as pd
from vnpy.trader.vtObject import VtBarData, VtTickData
from vnpy.trader.app.ctaStrategy.ctaBase import (MINUTE_DB_NAME,
DAILY_DB_NAME,
TICK_DB_NAME)
import rqdatac as rq
# 加载配置
config = open('config.json')
setting = json.load(config)
mc = MongoClient() # Mongo连接
dbMinute = mc[MINUTE_DB_NAME] # 数据库
dbDaily = mc[DAILY_DB_NAME]
dbTick = mc[TICK_DB_NAME]
USERNAME = setting['rqUsername']
PASSWORD = setting['rqPassword']
rq.init(USERNAME, PASSWORD)
FIELDS = ['open', 'high', 'low', 'close', 'volume']
#----------------------------------------------------------------------
def generateVtBar(row, symbol):
"""生成K线"""
bar = VtBarData()
bar.symbol = symbol
bar.vtSymbol = symbol
bar.open = row['open']
bar.high = row['high']
bar.low = row['low']
bar.close = row['close']
bar.volume = row['volume']
bar.datetime = row.name
bar.date = bar.datetime.strftime("%Y%m%d")
bar.time = bar.datetime.strftime("%H:%M:%S")
return bar
#----------------------------------------------------------------------
def generateVtTick(row, symbol):
"""生成K线"""
tick = VtTickData()
tick.symbol = symbol
tick.vtSymbol = symbol
tick.lastPrice = row['last']
tick.volume = row['volume']
tick.openInterest = row['open_interest']
tick.datetime = row.name
tick.openPrice = row['open']
tick.highPrice = row['high']
tick.lowPrice = row['low']
tick.preClosePrice = row['prev_close']
tick.upperLimit = row['limit_up']
tick.lowerLimit = row['limit_down']
tick.bidPrice1 = row['b1']
tick.bidPrice2 = row['b2']
tick.bidPrice3 = row['b3']
tick.bidPrice4 = row['b4']
tick.bidPrice5 = row['b5']
tick.bidVolume1 = row['b1_v']
tick.bidVolume2 = row['b2_v']
tick.bidVolume3 = row['b3_v']
tick.bidVolume4 = row['b4_v']
tick.bidVolume5 = row['b5_v']
tick.askPrice1 = row['a1']
tick.askPrice2 = row['a2']
tick.askPrice3 = row['a3']
tick.askPrice4 = row['a4']
tick.askPrice5 = row['a5']
tick.askVolume1 = row['a1_v']
tick.askVolume2 = row['a2_v']
tick.askVolume3 = row['a3_v']
tick.askVolume4 = row['a4_v']
tick.askVolume5 = row['a5_v']
return tick
#----------------------------------------------------------------------
def downloadMinuteBarBySymbol(symbol):
"""下载某一合约的分钟线数据"""
start = time()
cl = dbMinute[symbol]
cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引
df = rq.get_price(symbol, frequency='1m', fields=FIELDS)
for ix, row in df.iterrows():
bar = generateVtBar(row, symbol)
d = bar.__dict__
flt = {'datetime': bar.datetime}
cl.replace_one(flt, d, True)
end = time()
cost = (end - start) * 1000
print(u'合约%s的分钟K线数据下载完成%s - %s,耗时%s毫秒' %(symbol, df.index[0], df.index[-1], cost))
#----------------------------------------------------------------------
def downloadDailyBarBySymbol(symbol):
"""下载某一合约日线数据"""
start = time()
cl = dbDaily[symbol]
cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引
df = rq.get_price(symbol, frequency='1d', fields=FIELDS, end_date=datetime.now().strftime('%Y%m%d'))
for ix, row in df.iterrows():
bar = generateVtBar(row, symbol)
d = bar.__dict__
flt = {'datetime': bar.datetime}
cl.replace_one(flt, d, True)
end = time()
cost = (end - start) * 1000
print(u'合约%s的日K线数据下载完成%s - %s,耗时%s毫秒' %(symbol, df.index[0], df.index[-1], cost))
#----------------------------------------------------------------------
def downloadTickBySymbol(symbol, date):
"""下载某一合约日线数据"""
start = time()
cl = dbTick[symbol]
cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引
df = rq.get_price(symbol,
frequency='tick',
start_date=date,
end_date=date)
for ix, row in df.iterrows():
tick = generateVtTick(row, symbol)
d = tick.__dict__
flt = {'datetime': tick.datetime}
cl.replace_one(flt, d, True)
end = time()
cost = (end - start) * 1000
print(u'合约%sTick数据下载完成%s - %s,耗时%s毫秒' %(symbol, df.index[0], df.index[-1], cost))

View File

@ -0,0 +1,15 @@
# encoding: UTF-8
"""
立即下载数据到数据库中用于手动执行更新操作
"""
from dataService import *
if __name__ == '__main__':
#downloadMinuteBarBySymbol('CU99')
#downloadDailyBarBySymbol('IF99')
#downloadDailyBarBySymbol('TA99')
#downloadDailyBarBySymbol('I99')
downloadTickBySymbol('IF1901', '2018-12-21')

274
examples/RQData/run.py Normal file
View File

@ -0,0 +1,274 @@
# encoding: UTF-8
from __future__ import print_function
import json
import ctypes
from datetime import datetime, timedelta, time
from time import sleep
from threading import Thread
from collections import OrderedDict
import qdarkstyle
from pymongo import MongoClient, ASCENDING, DESCENDING
from pymongo.errors import ConnectionFailure
from vnpy.trader.uiQt import QtCore, QtWidgets, QtGui
from vnpy.trader.vtObject import VtBarData
from vnpy.trader.app.ctaStrategy.ctaBase import MINUTE_DB_NAME, DAILY_DB_NAME
DAY_START = time(9, 0) # 日盘启动和停止时间
DAY_END = time(15, 15)
NIGHT_START = time(21, 0) # 夜盘启动和停止时间
NIGHT_END = time(2, 30)
########################################################################
class RqDataManager(QtWidgets.QWidget):
""""""
signal = QtCore.Signal(str)
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
super(RqDataManager, self).__init__()
self.client = None
self.rq = None
self.thread = Thread(target=self.run)
self.productList = []
self.symbolExchangeDict = OrderedDict()
self.initUi()
n1 = self.connectMongo()
if not n1:
return
n2 = self.initRqData()
if not n2:
return
self.count = 0
self.active = True
self.thread.start()
#----------------------------------------------------------------------
def connectMongo(self):
"""连接数据库"""
try:
self.client = MongoClient(serverSelectionTimeoutMS=10)
self.client.server_info()
self.writeLog(u'MongoDB连接成功')
return True
except ConnectionFailure:
self.client = None
self.writeLog(u'MongoDB连接失败')
return False
#----------------------------------------------------------------------
def initUi(self):
"""初始化界面"""
self.setWindowTitle(u'RQData数据服务')
self.setWindowIcon(QtGui.QIcon('vnpy.ico'))
self.setFixedHeight(500)
self.setFixedWidth(900)
self.logMonitor = QtWidgets.QTextEdit()
self.logMonitor.setReadOnly(True)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(self.logMonitor)
self.setLayout(vbox)
self.signal.connect(self.updateLog)
# 托盘配置
self.tray = QtWidgets.QSystemTrayIcon()
self.tray.setIcon(QtGui.QIcon('vnpy.ico'))
self.tray.activated.connect(self.showManager)
restoreAction = QtWidgets.QAction(u'还原', self, triggered=self.show)
quitAction = QtWidgets.QAction(u'退出', self, triggered=self.exit)
menu = QtWidgets.QMenu(QtWidgets.QApplication.desktop())
menu.addAction(restoreAction)
menu.addAction(quitAction)
self.tray.setContextMenu(menu)
self.tray.show()
#----------------------------------------------------------------------
def initRqData(self):
""""""
with open('config.json') as config:
setting = json.load(config)
for product in setting['product']:
self.productList.append(product.upper())
# 检查是否填写了RQData配置
username = setting.get('rqUsername', None)
password = setting.get('rqPassword', None)
if not username or not password:
self.writeLog(u'RQData的用户名和密码配置错误请在config.json中修改')
return False
# 加载RQData
try:
import rqdatac as rq
except ImportError:
self.writeLog(u'没有安装RQData客户端请先安装rqdatac')
return False
# 登录RQData
self.rq = rq
self.rq.init(username, password)
# 获取本日可交易合约代码
try:
df = self.rq.all_instruments(type='Future', date=datetime.now())
for ix, row in df.iterrows():
self.symbolExchangeDict[row['order_book_id']] = row['exchange']
except RuntimeError:
self.writeLog(u'RQData的用户名和密码无效请联系米筐申请试用或者购买')
return False
self.writeLog(u'RQData客户端登录成功')
return True
#----------------------------------------------------------------------
def downloadBar(self, symbol, frequency):
"""下载合约数据"""
if 'frequency' == '1m':
db = self.client[MINUTE_DB_NAME]
else:
db = self.client[DAILY_DB_NAME]
# 上期所和大商所代码改为小写
exchange = self.symbolExchangeDict[symbol]
if exchange in ['SHFE', 'DCE']:
localSymbol = symbol.lower()
else:
localSymbol = symbol
collection = db[localSymbol]
# 获取本地数据库中最后一条记录的时间,并下载新数据
result = collection.find_one(sort=[("datetime", DESCENDING)])
if result:
startDate = result['datetime']
else:
startDate = '20180101'
if startDate:
self.writeLog(u'%s下载更新数据,开始时间:%s' %(localSymbol, startDate))
else:
self.writeLog(u'%s初次下载数据,耗时可能较长,请耐心等待' %(localSymbol))
df = self.rq.get_price(symbol,
frequency=frequency,
fields=['open', 'high', 'low', 'close', 'volume'],
start_date=startDate,
end_date=datetime.now())
# 插入到数据库
for ix, row in df.iterrows():
bar = self.generateBar(row, localSymbol)
d = bar.__dict__
flt = {'datetime': bar.datetime}
collection.replace_one(flt, d, True)
self.writeLog(u'%s数据更新完成:%s - %s' %(localSymbol, df.index[0], df.index[-1]))
#----------------------------------------------------------------------
def generateBar(self, row, symbol):
"""生成K线对象"""
bar = VtBarData()
bar.symbol = symbol
bar.vtSymbol = symbol
bar.open = row['open']
bar.high = row['high']
bar.low = row['low']
bar.close = row['close']
bar.volume = row['volume']
bar.datetime = row.name
bar.date = bar.datetime.strftime("%Y%m%d")
bar.time = bar.datetime.strftime("%H:%M:%S")
return bar
#----------------------------------------------------------------------
def writeLog(self, msg):
"""记录日志"""
self.signal.emit(msg)
#----------------------------------------------------------------------
def updateLog(self, msg):
"""更新日志"""
dt = datetime.now()
msg = '%s: %s' %(dt, msg)
self.logMonitor.append(msg)
#----------------------------------------------------------------------
def run(self):
"""运行"""
while self.active:
sleep(1)
self.count += 1
if self.count < 10:
continue
self.count = 0
now = datetime.now().time()
if ((DAY_START <= now <= DAY_END) or
(now >= NIGHT_START) or
(now <= NIGHT_END)):
for symbol in self.symbolExchangeDict.keys():
download = False
for product in self.productList:
if product in symbol:
download = True
if download:
self.downloadBar(symbol, '1m')
else:
self.writeLog(u'非交易时间段,不执行更新')
#----------------------------------------------------------------------
def showManager(self, reason):
""""""
self.show()
#----------------------------------------------------------------------
def closeEvent(self, event):
""""""
self.hide()
event.ignore()
#----------------------------------------------------------------------
def exit(self):
""""""
self.active = False
self.thread.join()
QtWidgets.qApp.quit()
if __name__ == '__main__':
font = QtGui.QFont(u'微软雅黑', 12)
app = QtWidgets.QApplication([])
app.setFont(font)
app.setStyleSheet(qdarkstyle.load_stylesheet_from_environment())
ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID('RQDataService')
manager = RqDataManager()
manager.show()
app.exec_()

BIN
examples/RQData/vnpy.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 66 KiB

View File

@ -0,0 +1 @@
pythonw run.py

View File

@ -2,18 +2,18 @@
{
"name": "double ema",
"className": "DoubleMaStrategy",
"vtSymbol": "rb1805"
"vtSymbol": "rb1905"
},
{
"name": "atr rsi",
"className": "AtrRsiStrategy",
"vtSymbol": "IC1802"
"vtSymbol": "IC1901"
},
{
"name": "king keltner",
"className": "KkStrategy",
"vtSymbol": "IH1802"
"vtSymbol": "IH1901"
}
]

View File

@ -16,5 +16,8 @@
"tdPenalty": ["IF", "IH", "IC"],
"maxDecimal": 4
"maxDecimal": 4,
"rqUsername": "",
"rqPassword": ""
}

View File

@ -35,7 +35,8 @@ elif system == 'Windows':
# 加载上层应用
from vnpy.trader.app import (riskManager, ctaStrategy,
spreadTrading, algoTrading)
spreadTrading, algoTrading,
tradeCopy)
#----------------------------------------------------------------------
@ -67,6 +68,7 @@ def main():
me.addApp(ctaStrategy)
me.addApp(spreadTrading)
me.addApp(algoTrading)
me.addApp(tradeCopy)
# 创建主窗口
mw = MainWindow(me, ee)

View File

@ -2,18 +2,15 @@ pymongo
websocket-client
msgpack-python
qdarkstyle
SortedContainers
futuquant
wmi
future
flask-socketio
flask-restful
flask-cors
gevent-websocket
pyjwt
ccxt
pyqtgraph
qtpy
psutil
ta-lib
futu-api

View File

@ -1,4 +1,4 @@
# encoding: UTF-8
__version__ = '1.9.0'
__version__ = '1.9.2'
__author__ = 'Xiaoyou Chen'

View File

@ -1,6 +1,5 @@
# vn.data - 数据相关工具
### 历史数据
* datayes通联数据接口
* shcifco上海中期接口
* tq天勤数据接口
* shcifco上海中期接口

View File

@ -1,2 +0,0 @@
from __future__ import absolute_import
from .vndatayes import DatayesApi

View File

@ -1,53 +0,0 @@
# encoding: UTF-8
'''一个简单的通联数据客户端主要使用requests开发比通联官网的python例子更为简洁。'''
from __future__ import print_function
import os
import requests
import json
HTTP_OK = 200
########################################################################
class DatayesApi(object):
"""通联数据API"""
#----------------------------------------------------------------------
def __init__(self, token,
domain="http://api.wmcloud.com/data",
version="v1"):
"""Constructor"""
self.domain = domain # 主域名
self.version = version # API版本
self.token = token # 授权码
self.header = {} # http请求头部
self.header['Connection'] = 'keep_alive'
self.header['Authorization'] = 'Bearer ' + self.token
#----------------------------------------------------------------------
def downloadData(self, path, params):
"""下载数据"""
url = '/'.join([self.domain, self.version, path])
r = requests.get(url=url, headers=self.header, params=params)
if r.status_code != HTTP_OK:
print(u'http请求失败状态代码%s' %r.status_code)
return None
else:
result = r.json()
if 'retMsg' in result and result['retMsg'] == 'Success':
return result['data']
else:
if 'retMsg' in result:
print(u'查询失败,返回信息%s' %result['retMsg'])
elif 'message' in result:
print(u'查询失败,返回信息%s' %result['message'])
return None

View File

@ -1,49 +0,0 @@
# encoding: UTF-8
from __future__ import print_function
from __future__ import absolute_import
from six import input
from .vntq import TqApi
# 接口对象
api = None
#----------------------------------------------------------------------
def onQuote(symbol):
"""Tick更新"""
print('-' * 30)
print('onQuote')
quote = api.get_quote(symbol)
print(quote)
#----------------------------------------------------------------------
def onChart(symbol, seconds):
"""K线更新"""
print('-' * 30)
print('onChart')
if seconds == 0:
serial = api.get_tick_serial(symbol)
else:
serial = api.get_kline_serial(symbol, seconds)
print(serial)
if __name__ == "__main__":
symbol = 'CFFEX.IF1710'
api = TqApi()
api.connect()
# 订阅Tick推送
#api.subscribe_quote([symbol], onQuote)
# 订阅Tick图表
#api.subscribe_chart(symbol, 0, 100, onChart)
# 订阅K线图表
api.subscribe_chart(symbol, 60, 1000, onChart)
input()

View File

@ -1,282 +0,0 @@
# encoding: UTF-8
"""
对接天勤行情的网关接口可以提供国内期货的报价/K线/Tick序列等数据的实时推送和历史仿真
使用时需要在本机先启动一个天勤终端进程
天勤行情终端: http://www.tq18.cn
天勤接口文档: http://doc.tq18.cn/tq/latest/extension/wsapi/index.html
"""
from __future__ import print_function
import json
import threading
import tornado
from tornado import websocket
from sortedcontainers import SortedDict
########################################################################
class TqApi(object):
"""天勤行情接口"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.data = {} # 数据存储
self.client = None # websocket客户端
self.requests = [] # 请求缓存
self.quote_callback_func = None # tick回调函数
self.quote_ins_list = []
self.chart_subscribes = {} # k线回调函数
#----------------------------------------------------------------------
def connect(self):
"""
建立行情连接
"""
self.start()
# 启动tornado的IO线程
loop_thread = threading.Thread(target=lambda: tornado.ioloop.IOLoop.current().start())
loop_thread.setDaemon(True)
loop_thread.start()
#----------------------------------------------------------------------
def subscribe_quote(self, ins_list, callback_func=None):
"""
订阅实时行情.
指定一个合约列表订阅其实时报价信息
每次调用此函数时都会覆盖前一次的订阅设定不在订阅列表里的合约会停止行情推送
:param ins_list: ins_list 是一个列表列出全部需要实时行情的合约代码注意天勤接口从0.8版本开始合约代码格式变更为 交易所代码.合约代码的格式. 交易所代码如下
CFFEX: 中金所
SHFE: 上期所
DCE: 大商所
CZCE: 郑商所
INE: 能源交易所(原油)
:param callback_func (可选): callback_func 是一个回调函数每当有报价数据变更时会触发此函数应该接受一个参数 ins_id
:example:
订阅 SHFE.cu1803,CZCE.SR709,CFFEX.IF1709 这三个合约的报价: subscribe_quote(["SHFE.cu1803", CZCE.SR709", "CFFEX.IF1709"])
"""
if callback_func:
self.quote_callback_func = callback_func
self.quote_ins_list = ins_list
req = {
"aid": "subscribe_quote",
"ins_list": ",".join(ins_list),
}
self.send_json(req)
#----------------------------------------------------------------------
def subscribe_chart(self, ins_id, duration_seconds, data_length=200, callback_func=None):
"""
订阅历史行情序列.
订阅指定合约及周期的历史行情序列K线数据序列或Tick数据序列这些序列数据会持续推送
:param ins_id: 合约代码需注意大小写
:param duration_seconds: 历史数据周期以秒为单位目前支持的周期包括
35101520301分钟2分钟3分钟5分钟10分钟15分钟20分钟30分钟1小时2小时4小时1
特别的此值指定为0表示订阅tick序列
:param data_length: 需要获取的序列长度每个序列最大支持请求 8964 个数据
:param callback_func (可选): callback_func 是一个回调函数每当序列数据变更时会触发此函数应该接受2个参数 ins_id, duration_seconds
:example:
订阅 SHFE.cu1803 的1分钟线 subscribe_chart("SHFE.cu1803", 60)
订阅 CFFEX.IF1709 的tick线 subscribe_chart("CFFEX.IF1709", 0)
"""
chart_id = self._generate_chart_id(ins_id, duration_seconds)
# 限制最大数据长度
if data_length > 8964:
data_length = 8964
req = {
"aid": "set_chart",
"chart_id": chart_id,
"ins_list": ins_id,
"duration": duration_seconds * 1000000000,
"view_width": data_length,
}
self.send_json(req)
self.chart_subscribes[chart_id] = req
self.chart_subscribes[chart_id]["callback"] = callback_func
#----------------------------------------------------------------------
def get_quote(self, ins_id):
"""
获取报价数据
:param ins_id: 指定合约代码
:return: 若指定的数据不存在返回None否则返回如下所示的一个dict
{
u'datetime': u'2017-07-26 23:04:21.000001',# tick从交易所发出的时间(按北京时区)
u'instrument_id': u'CZCE.SR801', # 合约代码
u'last_price': 6122.0, # 最新价
u'bid_price1': 6121.0, # 买一价
u'ask_price1': 6122.0, # 卖一价
u'bid_volume1': 54, # 买一量
u'ask_volume1': 66, # 卖一量
u'upper_limit': 6388.0, # 涨停价
u'lower_limit': 5896.0, # 跌停价
u'volume': 89252, # 成交量
u'amount': 5461329880.0, # 成交额
u'open_interest': 616424, # 持仓量
u'highest': 6129.0, # 当日最高价
u'lowest': 6101.0, # 当日最低价
u'average': 6119.0, # 当日均价
u'open': 6102.0, # 开盘价
u'close': u'-', # 收盘价
u'settlement': u'-', # 结算价
u'pre_close': 6106.0, # 昨收盘价
u'pre_settlement': 6142.0 # 昨结算价
u'pre_open_interest': 616620, # 昨持仓量
}
"""
return self.data.setdefault("quotes", {}).get(ins_id, None)
#----------------------------------------------------------------------
def get_tick_serial(self, ins_id):
"""
获取tick序列数据
:param ins_id: 指定合约代码
:return: 若指定的序列数据不存在返回None否则返回如下所示的一个dict
{
u'485107':{ # 每个Tick都有一个唯一编号在一个序列中编号总是连续递增的
u'datetime': 1501074872000000000L, # tick从交易所发出的时间(按北京时区)以nano epoch 方式表示(等于从1970-01-01时刻开始的纳秒数)
u'trading_day': 1501084800000000000L, #交易日, 格式同上
u'last_price': 3887, # 最新价
u'bid_price1': 3881, # 买一价
u'ask_price1': 3886, # 卖一价
u'bid_volume1': 5, # 买一量
u'ask_volume1': 1, #卖一量
u'highest': 3887, # 当日最高价
u'lowest': 3886, # 当日最低价
u'volume': 6, # 成交量
u'open_interest': 1796 # 持仓量
},
u'485108': {
...
}
}
"""
return self.data.setdefault("ticks", {}).setdefault(ins_id, {}).get("data", None)
#----------------------------------------------------------------------
def get_kline_serial(self, ins_id, duration_seconds):
"""
获取k线序列数据
:param ins_id: 指定合约代码
:param duration_seconds: 指定K线周期
:return: 若指定的序列数据不存在返回None否则返回如下所示的一个dict
{
u'494835': { # 每根K线都有一个唯一编号在一个序列中编号总是连续递增的
u'datetime': 1501080715000000000L, # K线起点时间(按北京时区)以nano epoch 方式表示(等于从1970-01-01时刻开始的纳秒数)
u'open': 51450, # K线起始时刻的最新价
u'high': 51450, # K线时间范围内的最高价
u'low': 51450, # K线时间范围内的最低价
u'close': 51450, # K线结束时刻的最新价
u'volume': 0, # K线时间范围内的成交量
u'open_oi': 27354, # K线起始时刻的持仓量
u'close_oi': 27354 # K线结束时刻的持仓量
},
u'494836': {
...
}
}
"""
dur_id = "%d" % (duration_seconds * 1000000000)
return self.data.setdefault("klines", {}).setdefault(ins_id, {}).setdefault(dur_id, {}).get("data", None)
#----------------------------------------------------------------------
@tornado.gen.coroutine
def start(self):
"""启动websocket客户端"""
self.client = yield tornado.websocket.websocket_connect(url="ws://127.0.0.1:7777/")
# 发出所有缓存的请求
for req in self.requests:
self.client.write_message(req)
self.requests = []
# 协程式读取数据
while True:
msg = yield self.client.read_message()
self.on_receive_msg(msg)
#----------------------------------------------------------------------
def send_json(self, obj):
"""发送JSON内容"""
s = json.dumps(obj)
# 如果已经创建了客户端则直接发出请求
if self.client:
self.client.write_message(s)
# 否则缓存在请求缓存中
else:
self.requests.append(s)
#----------------------------------------------------------------------
def on_receive_msg(self, msg):
"""收到数据推送"""
pack = json.loads(msg)
if 'data' in pack:
l = pack["data"]
else:
print(u'on_receive_msg收到的数据中没有data字段数据内容%s' %str(pack))
return
for data in l:
# 合并更新数据字典
self._merge_obj(self.data, data)
# 遍历更新内容并调用回调函数
for selector, section in data.items():
if selector == "quotes":
if self.quote_callback_func:
for ins_id in section.keys():
if ins_id in self.quote_ins_list:
self.quote_callback_func(ins_id)
elif selector == "ticks":
for ins_id in section.keys():
chart_id = self._generate_chart_id(ins_id, 0)
sub_info = self.chart_subscribes.get(chart_id, None)
tick_serial = self.get_tick_serial(ins_id)
if tick_serial and sub_info:
while len(tick_serial) > sub_info["view_width"]:
tick_serial.popitem(last=False)
callback_func = sub_info["callback"]
if callback_func:
callback_func(ins_id, 0)
elif selector == "klines":
for ins_id, sub_section in section.items():
for dur_nanoseconds in sub_section.keys():
dur_seconds = int(dur_nanoseconds) / 1000000000
chart_id = self._generate_chart_id(ins_id, dur_seconds)
sub_info = self.chart_subscribes.get(chart_id, None)
kline_serial = self.get_kline_serial(ins_id, dur_seconds)
if kline_serial and sub_info:
while len(kline_serial) > sub_info["view_width"]:
kline_serial.popitem(last=False)
callback_func = sub_info["callback"]
if callback_func:
callback_func(ins_id, dur_seconds)
#----------------------------------------------------------------------
def _merge_obj(self, result, obj):
"""合并对象"""
for key, value in obj.items():
if value is None:
result.pop(key, None)
elif isinstance(value, dict):
target = result.setdefault(key, SortedDict())
self._merge_obj(target, value)
else:
result[key] = value
#----------------------------------------------------------------------
def _generate_chart_id(self, ins_id, duration_seconds):
"""生成图表编号"""
chart_id = "VN_%s_%d" % (ins_id, duration_seconds)
chart_id = chart_id.replace(".", "_")
return chart_id

View File

@ -16,5 +16,8 @@
"tdPenalty": ["IF", "IH", "IC"],
"maxDecimal": 4
"maxDecimal": 4,
"rqUsername": "",
"rqPassword": ""
}

View File

@ -70,7 +70,7 @@ class StopAlgo(AlgoTemplate):
func = self.sell
self.vtOrderID = func(self.vtSymbol, price, self.volume, offset=self.offset)
self.vtOrderID = func(self.vtSymbol, price, self.totalVolume, offset=self.offset)
msg = u'停止单已触发,代码:%s,方向:%s, 价格:%s,数量:%s,开平:%s' %(self.vtSymbol,
self.direction,

View File

@ -20,6 +20,7 @@ from vnpy.trader.vtObject import VtTickData, VtBarData
from vnpy.trader.vtGateway import VtSubscribeReq, VtOrderReq, VtCancelOrderReq, VtLogData
from vnpy.trader.vtFunction import todayDate, getJsonPath
from vnpy.trader.app import AppEngine
from vnpy.trader.vtGlobal import globalSetting
from .ctaBase import *
from .strategy import STRATEGY_CLASS
@ -74,6 +75,15 @@ class CtaEngine(AppEngine):
# 引擎类型为实盘
self.engineType = ENGINETYPE_TRADING
# RQData数据服务
self.rq = None
# RQData能获取的合约代码列表
self.rqSymbolSet = set()
# 初始化RQData服务
self.initRqData()
# 注册日式事件类型
self.mainEngine.registerLogEvent(EVENT_CTA_LOG)
@ -343,6 +353,12 @@ class CtaEngine(AppEngine):
#----------------------------------------------------------------------
def loadBar(self, dbName, collectionName, days):
"""从数据库中读取Bar数据startDate是datetime对象"""
# 优先尝试从RQData获取数据
if dbName == MINUTE_DB_NAME and collectionName.upper() in self.rqSymbolSet:
l = self.loadRqBar(collectionName, days)
return l
# 如果没有则从数据库中读取数据
startDate = self.today - timedelta(days)
d = {'datetime':{'$gte':startDate}}
@ -663,4 +679,60 @@ class CtaEngine(AppEngine):
return contract.priceTick
return 0
#----------------------------------------------------------------------
def initRqData(self):
"""初始化RQData客户端"""
# 检查是否填写了RQData配置
username = globalSetting.get('rqUsername', None)
password = globalSetting.get('rqPassword', None)
if not username or not password:
return
# 加载RQData
try:
import rqdatac as rq
except ImportError:
return
# 登录RQData
self.rq = rq
self.rq.init(username, password)
# 获取本日可交易合约代码
try:
df = self.rq.all_instruments(type='Future', date=datetime.now())
for ix, row in df.iterrows():
self.rqSymbolSet.add(row['order_book_id'])
except RuntimeError:
pass
#----------------------------------------------------------------------
def loadRqBar(self, symbol, days):
"""从RQData加载K线数据"""
endDate = datetime.now()
startDate = endDate - timedelta(days)
df = self.rq.get_price(symbol.upper(),
frequency='1m',
fields=['open', 'high', 'low', 'close', 'volume'],
start_date=startDate,
end_date=endDate)
l = []
for ix, row in df.iterrows():
bar = VtBarData()
bar.symbol = symbol
bar.vtSymbol = symbol
bar.open = row['open']
bar.high = row['high']
bar.low = row['low']
bar.close = row['close']
bar.volume = row['volume']
bar.datetime = row.name
bar.date = bar.datetime.strftime("%Y%m%d")
bar.time = bar.datetime.strftime("%H:%M:%S")
l.append(bar)
return l

View File

@ -35,7 +35,7 @@ class StDataEngine(object):
# 腿、价差相关字典
self.legDict = {} # vtSymbol:StLeg
self.spreadDict = {} # name:StSpread
self.spreadDict = OrderedDict() # name:StSpread
self.vtSymbolSpreadDict = {} # vtSymbol:StSpread
self.registerEvent()

View File

@ -462,9 +462,10 @@ class StAlgoManager(QtWidgets.QTableWidget):
algoEngine = self.algoEngine
l = self.algoEngine.getAllAlgoParams()
self.setRowCount(len(l))
for row, d in enumerate(l):
for d in l:
self.insertRow(0)
cellSpreadName = QtWidgets.QTableWidgetItem(d['spreadName'])
cellAlgoName = QtWidgets.QTableWidgetItem(d['algoName'])
cellNetPos = QtWidgets.QTableWidgetItem('0')
@ -477,17 +478,17 @@ class StAlgoManager(QtWidgets.QTableWidget):
comboMode = StModeComboBox(algoEngine, d['spreadName'], d['mode'])
buttonActive = StActiveButton(algoEngine, d['spreadName'])
self.setItem(row, 0, cellSpreadName)
self.setItem(row, 1, cellAlgoName)
self.setItem(row, 2, cellNetPos)
self.setCellWidget(row, 3, spinBuyPrice)
self.setCellWidget(row, 4, spinSellPrice)
self.setCellWidget(row, 5, spinCoverPrice)
self.setCellWidget(row, 6, spinShortPrice)
self.setCellWidget(row, 7, spinMaxOrderSize)
self.setCellWidget(row, 8, spinMaxPosSize)
self.setCellWidget(row, 9, comboMode)
self.setCellWidget(row, 10, buttonActive)
self.setItem(0, 0, cellSpreadName)
self.setItem(0, 1, cellAlgoName)
self.setItem(0, 2, cellNetPos)
self.setCellWidget(0, 3, spinBuyPrice)
self.setCellWidget(0, 4, spinSellPrice)
self.setCellWidget(0, 5, spinCoverPrice)
self.setCellWidget(0, 6, spinShortPrice)
self.setCellWidget(0, 7, spinMaxOrderSize)
self.setCellWidget(0, 8, spinMaxPosSize)
self.setCellWidget(0, 9, comboMode)
self.setCellWidget(0, 10, buttonActive)
buttonActive.signalActive.connect(spinBuyPrice.algoActiveChanged)
buttonActive.signalActive.connect(spinSellPrice.algoActiveChanged)

View File

@ -0,0 +1,10 @@
# encoding: UTF-8
from .tcEngine import TcEngine
from .uiTcWidget import TcManager
appName = 'TradeCopy'
appDisplayName = u'交易复制'
appEngine = TcEngine
appWidget = TcManager
appIco = 'tc.ico'

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

View File

@ -0,0 +1,331 @@
# encoding: UTF-8
from collections import defaultdict
from vnpy.event import Event
from vnpy.rpc import RpcClient, RpcServer
from vnpy.trader.vtEvent import (EVENT_POSITION, EVENT_TRADE,
EVENT_TIMER, EVENT_ORDER)
from vnpy.trader.vtConstant import (DIRECTION_LONG, DIRECTION_SHORT,
OFFSET_OPEN, OFFSET_CLOSE, PRICETYPE_LIMITPRICE,
OFFSET_CLOSEYESTERDAY, OFFSET_CLOSETODAY,
STATUS_REJECTED)
from vnpy.trader.vtObject import VtOrderReq, VtCancelOrderReq, VtLogData, VtSubscribeReq
EVENT_TC_LOG = 'eTcLog'
########################################################################
class TcEngine(object):
"""交易复制引擎"""
MODE_PROVIDER = 1
MODE_SUBSCRIBER = 2
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
self.mode = None # Subscriber/Provider
self.posDict = defaultdict(int) # vtPositionName:int
self.targetDict = defaultdict(int) # vtPositionName:int
self.copyRatio = 1
self.interval = 1
self.subscribeSet = set()
self.count = 0
self.server = None # RPC Server
self.client = None # RPC Client
self.registerEvent()
#----------------------------------------------------------------------
def startProvider(self, repAddress, pubAddress, interval):
""""""
self.mode = self.MODE_PROVIDER
self.interval = interval
if not self.server:
self.server = RpcServer(repAddress, pubAddress)
self.server.usePickle()
self.server.register(self.getPos)
self.server.start()
self.writeLog(u'启动发布者模式(如需修改通讯地址请重启程序)')
#----------------------------------------------------------------------
def startSubscriber(self, reqAddress, subAddress, copyRatio):
""""""
self.mode = self.MODE_SUBSCRIBER
self.copyRatio = copyRatio
if not self.client:
self.client = TcClient(self, reqAddress, subAddress)
self.client.usePickle()
self.client.subscribeTopic('')
self.client.start()
self.writeLog(u'启动订阅者模式,运行时请不要执行其他交易操作')
self.initTarget()
#----------------------------------------------------------------------
def stop(self):
""""""
if self.client:
self.client.stop()
self.writeLog(u'订阅者模式已停止')
if self.server:
self.server.stop()
self.writeLog(u'发布者模式已停止')
self.mode = None
#----------------------------------------------------------------------
def registerEvent(self):
""""""
self.eventEngine.register(EVENT_POSITION, self.processPositionEvent)
self.eventEngine.register(EVENT_TRADE, self.processTradeEvent)
self.eventEngine.register(EVENT_TIMER, self.processTimerEvent)
self.eventEngine.register(EVENT_ORDER, self.processOrderEvent)
#----------------------------------------------------------------------
def checkAndTrade(self, vtSymbol):
""""""
if self.checkNoWorkingOrder(vtSymbol):
self.newOrder(vtSymbol)
else:
self.cancelOrder(vtSymbol)
#----------------------------------------------------------------------
def processTimerEvent(self, event):
""""""
if self.mode != self.MODE_PROVIDER:
return
self.count += 1
if self.count < self.interval:
return
self.count = 0
for vtPositionName in self.posDict.keys():
self.publishPos(vtPositionName)
#----------------------------------------------------------------------
def processTradeEvent(self, event):
""""""
trade = event.dict_['data']
vtPositionName = '.'.join([trade.vtSymbol, trade.direction])
if trade.offset == OFFSET_OPEN:
self.posDict[vtPositionName] += trade.volume
else:
self.posDict[vtPositionName] -= trade.volume
if self.mode == self.MODE_PROVIDER:
self.publishPos(vtPositionName)
#----------------------------------------------------------------------
def processPositionEvent(self, event):
""""""
position = event.dict_['data']
self.posDict[position.vtPositionName] = position.position
#----------------------------------------------------------------------
def processOrderEvent(self, event):
""""""
order = event.dict_['data']
if order.status == STATUS_REJECTED:
self.writeLog(u'监控到委托拒单,停止运行')
self.stop()
#----------------------------------------------------------------------
def publishPos(self, vtPositionName):
""""""
l = vtPositionName.split('.')
direction = l[-1]
vtSymbol = vtPositionName.replace('.' + direction, '')
data = {
'vtSymbol': vtSymbol,
'vtPositionName': vtPositionName,
'pos': self.posDict[vtPositionName]
}
self.server.publish('', data)
#----------------------------------------------------------------------
def updatePos(self, data):
""""""
vtSymbol = data['vtSymbol']
if vtSymbol not in self.subscribeSet:
contract = self.mainEngine.getContract(vtSymbol)
req = VtSubscribeReq()
req.symbol = contract.symbol
req.exchange = contract.exchange
self.mainEngine.subscribe(req, contract.gatewayName)
vtPositionName = data['vtPositionName']
target = int(data['pos'] * self.copyRatio)
self.targetDict[vtPositionName] = target
self.checkAndTrade(vtSymbol)
#----------------------------------------------------------------------
def newOrder(self, vtSymbol):
""""""
for vtPositionName in self.targetDict.keys():
if vtSymbol not in vtPositionName:
continue
pos = self.posDict[vtPositionName]
target = self.targetDict[vtPositionName]
if pos == target:
continue
contract = self.mainEngine.getContract(vtSymbol)
tick = self.mainEngine.getTick(vtSymbol)
if not tick:
return
req = VtOrderReq()
req.symbol = contract.symbol
req.exchange = contract.exchange
req.priceType = PRICETYPE_LIMITPRICE
req.volume = abs(target - pos)
# Open position
if target > pos:
req.offset = OFFSET_OPEN
if DIRECTION_LONG in vtPositionName:
req.direction = DIRECTION_LONG
if tick.upperLimit:
req.price = tick.upperLimit
else:
req.price = tick.askPrice1
elif DIRECTION_SHORT in vtPositionName:
req.direction = DIRECTION_SHORT
if tick.lowerLimit:
req.price = tick.lowerLimit
else:
req.price = tick.bidPrice1
self.mainEngine.sendOrder(req, contract.gatewayName)
# Close position
elif target < pos:
req.offset = OFFSET_CLOSE
if DIRECTION_LONG in vtPositionName:
req.direction = DIRECTION_SHORT
if tick.upperLimit:
req.price = tick.upperLimit
else:
req.price = tick.askPrice1
elif DIRECTION_SHORT in vtPositionName:
req.direction = DIRECTION_LONG
if tick.lowerLimit:
req.price = tick.lowerLimit
else:
req.price = tick.bidPrice1
# Use auto-convert for solving today/yesterday position problem
reqList = self.mainEngine.convertOrderReq(req)
for convertedReq in reqList:
self.mainEngine.sendOrder(convertedReq, contract.gatewayName)
# Write log
msg = u'发出%s委托 %s%s %s@%s' %(vtSymbol, req.direction, req.offset,
req.price, req.volume)
self.writeLog(msg)
#----------------------------------------------------------------------
def cancelOrder(self, vtSymbol):
"""
Cancel all orders of a certain vtSymbol
"""
l = self.mainEngine.getAllWorkingOrders()
for order in l:
if order.vtSymbol == vtSymbol:
req = VtCancelOrderReq()
req.orderID = order.orderID
req.frontID = order.frontID
req.sessionID = order.sessionID
req.symbol = order.symbol
req.exchange = order.exchange
self.mainEngine.cancelOrder(req, order.gatewayName)
self.writeLog(u'撤销%s全部活动中委托' %vtSymbol)
#----------------------------------------------------------------------
def checkNoWorkingOrder(self, vtSymbol):
"""
Check if there is still any working orders of a certain vtSymbol
"""
l = self.mainEngine.getAllWorkingOrders()
for order in l:
if order.vtSymbol == vtSymbol:
return False
return True
#----------------------------------------------------------------------
def writeLog(self, msg):
""""""
log = VtLogData()
log.logContent = msg
event = Event(EVENT_TC_LOG)
event.dict_['data'] = log
self.eventEngine.put(event)
#----------------------------------------------------------------------
def getPos(self):
"""
Get currenct position data of provider
"""
return dict(self.posDict)
#----------------------------------------------------------------------
def initTarget(self):
"""
Init target data of subscriber based on position data from provider
"""
d = self.client.getPos()
for vtPositionName, pos in d.items():
l = vtPositionName.split('.')
direction = l[-1]
vtSymbol = vtPositionName.replace('.' + direction, '')
data = {
'vtPositionName': vtPositionName,
'vtSymbol': vtSymbol,
'pos': pos
}
self.updatePos(data)
self.writeLog(u'目标仓位初始化完成')
########################################################################
class TcClient(RpcClient):
""""""
#----------------------------------------------------------------------
def __init__(self, engine, reqAddress, subAddress):
"""Constructor"""
super(TcClient, self).__init__(reqAddress, subAddress)
self.engine = engine
#----------------------------------------------------------------------
def callback(self, topic, data):
""""""
self.engine.updatePos(data)

View File

@ -0,0 +1,199 @@
# encoding: UTF-8
import shelve
from vnpy.event import Event
from vnpy.trader.uiQt import QtCore, QtGui, QtWidgets
from vnpy.trader.vtFunction import getTempPath
from .tcEngine import EVENT_TC_LOG
########################################################################
class TcManager(QtWidgets.QWidget):
""""""
REQ_ADDRESS = 'tcp://localhost:2015'
SUB_ADDRESS = 'tcp://localhost:2018'
REP_ADDRESS = 'tcp://*:2015'
PUB_ADDRESS = 'tcp://*:2018'
COPY_RATIO = '1'
INTERVAL = '1'
settingFileName = 'TradeCopy.vt'
settingFilePath = getTempPath(settingFileName)
signal = QtCore.Signal(type(Event()))
#----------------------------------------------------------------------
def __init__(self, tcEngine, eventEngine, parent=None):
"""Constructor"""
super(TcManager, self).__init__(parent)
self.tcEngine = tcEngine
self.eventEngine = eventEngine
self.initUi()
self.loadSetting()
self.registerEvent()
self.tcEngine.writeLog(u'欢迎使用TradeCopy交易复制模块')
#----------------------------------------------------------------------
def initUi(self):
""""""
self.setWindowTitle(u'交易复制')
self.setMinimumWidth(700)
self.setMinimumHeight(700)
# 创建组件
self.lineReqAddress = QtWidgets.QLineEdit(self.REQ_ADDRESS)
self.lineSubAddress= QtWidgets.QLineEdit(self.SUB_ADDRESS)
self.lineRepAddress = QtWidgets.QLineEdit(self.REP_ADDRESS)
self.linePubAddress = QtWidgets.QLineEdit(self.PUB_ADDRESS)
validator = QtGui.QDoubleValidator()
validator.setBottom(0)
self.lineCopyRatio = QtWidgets.QLineEdit()
self.lineCopyRatio.setValidator(validator)
self.lineCopyRatio.setText(self.COPY_RATIO)
validator2 = QtGui.QIntValidator()
validator2.setBottom(1)
self.lineInterval = QtWidgets.QLineEdit()
self.lineInterval.setValidator(validator2)
self.lineInterval.setText(self.INTERVAL)
self.buttonProvider = QtWidgets.QPushButton(u'启动发布者')
self.buttonProvider.clicked.connect(self.startProvider)
self.buttonSubscriber = QtWidgets.QPushButton(u'启动订阅者')
self.buttonSubscriber.clicked.connect(self.startSubscriber)
self.buttonStopEngine = QtWidgets.QPushButton(u'停止')
self.buttonStopEngine.clicked.connect(self.stopEngine)
self.buttonStopEngine.setEnabled(False)
self.buttonResetAddress = QtWidgets.QPushButton(u'重置地址')
self.buttonResetAddress.clicked.connect(self.resetAddress)
self.logMonitor = QtWidgets.QTextEdit()
self.logMonitor.setReadOnly(True)
self.widgetList = [
self.lineCopyRatio,
self.lineInterval,
self.linePubAddress,
self.lineSubAddress,
self.lineRepAddress,
self.lineReqAddress,
self.buttonProvider,
self.buttonSubscriber,
self.buttonResetAddress
]
# 布局
QLabel = QtWidgets.QLabel
grid = QtWidgets.QGridLayout()
grid.addWidget(QLabel(u'响应地址'), 0, 0)
grid.addWidget(self.lineRepAddress, 0, 1)
grid.addWidget(QLabel(u'请求地址'), 0, 2)
grid.addWidget(self.lineReqAddress, 0, 3)
grid.addWidget(QLabel(u'发布地址'), 1, 0)
grid.addWidget(self.linePubAddress, 1, 1)
grid.addWidget(QLabel(u'订阅地址'), 1, 2)
grid.addWidget(self.lineSubAddress, 1, 3)
grid.addWidget(QLabel(u'发布间隔(秒)'), 2, 0)
grid.addWidget(self.lineInterval, 2, 1)
grid.addWidget(QLabel(u'复制比例(倍)'), 2, 2)
grid.addWidget(self.lineCopyRatio, 2, 3)
grid.addWidget(self.buttonProvider, 3, 0, 1, 2)
grid.addWidget(self.buttonSubscriber, 3, 2, 1, 2)
grid.addWidget(self.buttonStopEngine, 4, 0, 1, 2)
grid.addWidget(self.buttonResetAddress, 4, 2, 1, 2)
grid.addWidget(self.logMonitor, 5, 0, 1, 4)
self.setLayout(grid)
#----------------------------------------------------------------------
def saveSetting(self):
""""""
f = shelve.open(self.settingFilePath)
f['repAddress'] = self.lineRepAddress.text()
f['reqAddress'] = self.lineReqAddress.text()
f['pubAddress'] = self.linePubAddress.text()
f['subAddress'] = self.lineSubAddress.text()
f['copyRatio'] = self.lineCopyRatio.text()
f['interval'] = self.lineInterval.text()
f.close()
#----------------------------------------------------------------------
def loadSetting(self):
""""""
f = shelve.open(self.settingFilePath)
if f:
self.lineRepAddress.setText(f['repAddress'])
self.lineReqAddress.setText(f['reqAddress'])
self.linePubAddress.setText(f['pubAddress'])
self.lineSubAddress.setText(f['subAddress'])
self.lineCopyRatio.setText(f['copyRatio'])
self.lineInterval.setText(f['interval'])
f.close()
#----------------------------------------------------------------------
def resetAddress(self):
""""""
self.lineReqAddress.setText(self.REQ_ADDRESS)
self.lineRepAddress.setText(self.REP_ADDRESS)
self.linePubAddress.setText(self.PUB_ADDRESS)
self.lineSubAddress.setText(self.SUB_ADDRESS)
#----------------------------------------------------------------------
def stopEngine(self):
""""""
self.tcEngine.stop()
for widget in self.widgetList:
widget.setEnabled(True)
self.buttonStopEngine.setEnabled(False)
#----------------------------------------------------------------------
def registerEvent(self):
""""""
self.signal.connect(self.processLogEvent)
self.eventEngine.register(EVENT_TC_LOG, self.signal.emit)
#----------------------------------------------------------------------
def processLogEvent(self, event):
""""""
log = event.dict_['data']
txt = '%s: %s' %(log.logTime, log.logContent)
self.logMonitor.append(txt)
#----------------------------------------------------------------------
def startProvider(self):
""""""
repAddress = str(self.lineRepAddress.text())
pubAddress = str(self.linePubAddress.text())
interval = int(self.lineInterval.text())
self.tcEngine.startProvider(repAddress, pubAddress, interval)
for widget in self.widgetList:
widget.setEnabled(False)
self.buttonStopEngine.setEnabled(True)
#----------------------------------------------------------------------
def startSubscriber(self):
""""""
reqAddress = str(self.lineReqAddress.text())
subAddress = str(self.lineSubAddress.text())
copyRatio = float(self.lineCopyRatio.text())
self.tcEngine.startSubscriber(reqAddress, subAddress, copyRatio)
for widget in self.widgetList:
widget.setEnabled(False)
self.buttonStopEngine.setEnabled(True)

View File

@ -1,742 +0,0 @@
# encoding: UTF-8
'''
vnpy.api.bithumb的gateway接入
'''
import json
from collections import defaultdict
from datetime import datetime
from vnpy.api.bithumb import BithumbRestApi
from vnpy.trader.vtFunction import getJsonPath
from vnpy.trader.vtGateway import *
# 方向映射
directionMap = {
constant.DIRECTION_LONG: 'bid',
constant.DIRECTION_SHORT: 'ask'
}
directionMapReverse = {v: k for k, v in directionMap.items()}
# 从https://www.bithumb.com/u1/US127中https://api.bithumb.com/trade/place的API说明中得到
minimum_ticks = {
'BTC': 0.001,
'ETH': 0.01,
'DASH': 0.01,
'LTC': 0.01,
'ETC': 0.1,
'XRP': 10,
'BCH': 0.001,
'XMR': 0.01,
'ZEC': 0.01,
'QTUM': 0.1,
'BTG': 0.1,
'EOS': 0.1,
'ICX': 1,
'VEN': 1,
'TRX': 100,
'ELF': 10,
'MITH': 10,
'MCO': 10,
'OMG': 0.1,
'KNC': 1,
'GNT': 10,
'HSR': 1,
'ZIL': 100,
'ETHOS': 1,
'PAY': 1,
'WAX': 10,
'POWR': 10,
'LRC': 10,
'GTO': 10,
'STEEM': 10,
'STRAT': 1,
'ZRX': 1,
'REP': 0.1,
'AE': 1,
'XEM': 10,
'SNT': 10,
'ADA': 10
}
########################################################################
class BithumbGateway(VtGateway):
#----------------------------------------------------------------------
def __init__(self, eventEngine, gatewayName='BithumbGateway'):
super(BithumbGateway, self).__init__(eventEngine, gatewayName)
self.restApi = RestApi(self) # type: RestApi
self.qryEnabled = False
self.fileName = self.gatewayName + '_connect.json'
self.filePath = getJsonPath(self.fileName, __file__)
#----------------------------------------------------------------------
def connect(self):
"""连接"""
try:
f = open(self.filePath)
except IOError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'读取连接配置出错,请检查'
self.onLog(log)
return
# 解析json文件
setting = json.load(f)
f.close()
try:
apiKey = str(setting['apiKey'])
apiSecret = str(setting['apiSecret'])
# symbols = setting['symbols']
except KeyError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'连接配置缺少字段,请检查'
self.onLog(log)
return
# 创建行情和交易接口对象
self.restApi.connect(apiKey, apiSecret)
# 初始化并启动查询
self.initQuery()
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅行情"""
pass
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
"""发单"""
return self.restApi.sendOrder(orderReq)
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq):
"""撤单"""
self.restApi.cancelOrder(cancelOrderReq)
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.restApi.close()
#----------------------------------------------------------------------
def initQuery(self):
"""初始化连续查询"""
# if self.qryEnabled:
# 需要循环的查询函数列表
# self.qryFunctionList = [self.restApi.qryTickers,
# self.restApi.qryDepth,
# self.restApi.qryPosition,
# self.restApi.qryOrder]
#
# self.qryCount = 0 # 查询触发倒计时
# self.qryTrigger = 1 # 查询触发点
# self.qryNextFunction = 0 # 上次运行的查询函数索引
#
# self.startQuery()
pass
#----------------------------------------------------------------------
def query(self, event):
"""注册到事件处理引擎上的查询函数"""
# self.qryCount += 1
#
# if self.qryCount > self.qryTrigger:
# # 清空倒计时
# self.qryCount = 0
#
# # 执行查询函数
# function = self.qryFunctionList[self.qryNextFunction]
# function()
#
# # 计算下次查询函数的索引如果超过了列表长度则重新设为0
# self.qryNextFunction += 1
# if self.qryNextFunction == len(self.qryFunctionList):
# self.qryNextFunction = 0
pass
#----------------------------------------------------------------------
def startQuery(self):
"""启动连续查询"""
self.eventEngine.register(EVENT_TIMER, self.query)
#----------------------------------------------------------------------
def setQryEnabled(self, qryEnabled):
"""设置是否要启动循环查询"""
self.qryEnabled = qryEnabled
########################################################################
# noinspection PyUnusedLocal
class RestApi(BithumbRestApi):
"""REST API实现"""
#----------------------------------------------------------------------
def __init__(self, gateway):
"""Constructor"""
super(RestApi, self).__init__()
self.gateway = gateway # type: BithumbGateway # gateway对象
self.gatewayName = gateway.gatewayName # gateway对象名称
self.localID = 0
self.tradeID = 0
self.orders = {} # type: dict[str, VtOrderData] # localID:order
self.sysLocalDict = {} # type: dict[str, str] # sysID: localID
self.localSysDict = {} # type: dict[str, str] # localID: sysID
self.reqOrderDict = {} # type: dict[int, VtOrderData] # reqID:order
self.cancelDict = {} # type: dict[str, VtCancelOrderReq] # localID:req
self.tickDict = {}
#----------------------------------------------------------------------
def connect(self, apiKey, apiSecret):
"""连接服务器"""
self.init(apiKey, apiSecret)
self.start()
# self.symbols = symbols
self.writeLog(u'REST API启动成功')
self.qryContract()
#----------------------------------------------------------------------
def writeLog(self, content):
"""发出日志"""
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = content
self.gateway.onLog(log)
#----------------------------------------------------------------------
def generateLocalOrder(self, ):
self.localID += 1
localID = str(self.localID)
order = VtOrderData()
order.gatewayName = self.gatewayName
order.status = constant.STATUS_UNKNOWN
order.exchange = constant.EXCHANGE_BITHUMB
order.orderID = localID
order.vtOrderID = '.'.join([self.gatewayName, localID])
self.orders[localID] = order
return order
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
"""下单"""
req = {
'order_currency': orderReq.symbol,
'Payment_currency': orderReq.currency, # todo: 无论如何服务器都会以KRW作为单位
'type': directionMap[orderReq.direction],
'price': int(orderReq.price),
'units': orderReq.volume
}
reqid = self.addReq('POST', '/trade/place', self.onSendOrder, postdict=req)
# 缓存委托数据对象
order = self.generateLocalOrder()
self.fillLocalOrder(order,
orderReq.symbol,
orderReq.price,
orderReq.volume,
orderReq.direction)
self.reqOrderDict[reqid] = order
return order.vtOrderID
#----------------------------------------------------------------------
def onSendOrder(self, data, reqid): # type: (dict, int)->None
"""下单回执"""
if self.checkError(u'委托', data):
return
order = self.reqOrderDict[reqid]
localID = order.orderID
sysID = data['order_id']
self.saveSysIDForOrder(order, sysID)
self.gateway.onOrder(order)
# 发出等待的撤单委托
if localID in self.cancelDict:
req = self.cancelDict[localID]
self.cancelOrder(req)
del self.cancelDict[localID]
#----------------------------------------------------------------------
@staticmethod
def fillLocalOrder(order, symbol, price, totalVolume, direction):
order.symbol = symbol
order.vtSymbol = '.'.join([order.symbol, order.exchange])
order.price = price
order.totalVolume = totalVolume
order.direction = direction
#----------------------------------------------------------------------
def saveSysIDForOrder(self, order, sysID): # type: (VtOrderData, str)->None
self.sysLocalDict[sysID] = order.orderID
self.localSysDict[order.orderID] = sysID
#----------------------------------------------------------------------
def qryOrder(self, order): # type: (VtOrderData)->None
sysID = self.getSysIDForOrder(order)
req = {
'currency': order.symbol,
'order_id': sysID
}
self.addReq('POST', '/info/orders', self.onQryOrders, postdict=req)
#----------------------------------------------------------------------
def qryOrders(self, currency='XML'): # type: (VtOrderData)->None
sysID = self.getSysIDForOrder(order)
req = {
'currency': order.symbol,
}
self.addReq('POST', '/info/orders', self.onQryOrders, postdict=req)
#----------------------------------------------------------------------
def onQryOrders(self, data, reqid):
if self.checkError(u'订单查询', data):
return
orders = data['data']
for detail in orders:
sysID = detail['order_id']
order = self.getOrderBySysID(sysID)
if not order:
# 查询到了新的order(以前的order)
order = self.generateLocalOrder()
self.fillLocalOrder(order,
detail['order_currency'],
detail['price'],
detail['units'],
directionMapReverse[detail['type']])
order.tradedVolume = order.totalVolume - detail['units_remaining']
# todo: payment_currency
# payment_currency = detail['payment_currency']
self.saveSysIDForOrder(order, sysID)
# 推送
self.gateway.onOrder(order)
continue
originalTradeVolume = order.tradedVolume
order.tradedVolume = newTradeVolume = order.totalVolume - detail['units_remaining']
if newTradeVolume != originalTradeVolume:
# 推送更新
self.gateway.onOrder(order)
# 尝试更新状态
# todo: 这一句还未测试不知道成交之后date_completed是不是就会有值
order.status = constant.STATUS_ALLTRADED if detail['date_completed'] else order.status
if order.status == constant.STATUS_ALLTRADED:
# 推送成交
self.pushOrderAsTraded(order)
#----------------------------------------------------------------------
def pushOrderAsTraded(self, order):
trade = VtTradeData()
trade.gatewayName = order.gatewayName
trade.symbol = order.symbol
trade.vtSymbol = order.vtSymbol
trade.orderID = order.orderID
trade.vtOrderID = order.vtOrderID
self.tradeID += 1
trade.tradeID = str(self.tradeID)
trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID])
trade.direction = order.direction
trade.price = order.price
trade.volume = order.tradedVolume
trade.tradeTime = datetime.now().strftime('%H:%M:%S')
self.gateway.onTrade(trade)
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq): # type: (self, VtCancelOrderReq)->None
""""""
localID = cancelOrderReq.orderID
order = self.getOrderByLocalID(localID)
if self.isOrderPosted(order):
sysID = self.getSysIDForOrder(order)
req = {
'type': directionMap[order.direction],
'order_id': sysID,
'currency': cancelOrderReq.symbol
}
self.addReq('POST',
'/trade/cancel',
callback=lambda data, reqid: self.onCancelOrder(localID, data, reqid),
postdict=req)
else:
self.cancelDict[localID] = cancelOrderReq
#----------------------------------------------------------------------
def onCancelOrder(self, localID, data, reqid):
if self.checkError(u'撤单', data):
return
order = self.getOrderByLocalID(localID)
order.status = constant.STATUS_CANCELLED
#----------------------------------------------------------------------
def qryContract(self):
""""""
contract = VtContractData()
contract.gatewayName = self.gatewayName
for symbol, tick in minimum_ticks.items():
contract.symbol = symbol
contract.exchange = constant.EXCHANGE_BITHUMB
contract.vtSymbol = '.'.join([contract.symbol, contract.exchange])
contract.name = contract.vtSymbol
contract.productClass = constant.PRODUCT_SPOT
contract.priceTick = tick
contract.size = 1
self.gateway.onContract(contract)
#----------------------------------------------------------------------
def qryPublicTick(self, symbol='ALL'):
""" symbol 可以是'BTC', 'ETC'等等电子货币符号;也可以使用'ALL',表示要获取所有货币的行情"""
url = '/public/ticker/' + symbol
if symbol.upper() == 'ALL':
self.addReq('GET', url, self.onQryMultiPublicTicker)
else:
self.addReq('GET', url,
callback=lambda data, reqid: self.onQrySinglePublicTicker(symbol, data, id))
#----------------------------------------------------------------------
def qryPublicOrderBook(self, symbol='ALL'):
""" symbol 可以是'BTC', 'ETC'等等电子货币符号;也可以使用'ALL',表示要获取所有货币的行情"""
url = '/public/orderbook/' + symbol
if symbol.upper() == 'ALL':
self.addReq('GET', url, self.onQryMultiPublicOrderBook)
else:
self.addReq('GET', url,
callback=lambda data, reqid: self.onQrySinglePublicOrderBook(symbol, data, id))
#----------------------------------------------------------------------
def qryPrivateTick(self, symbol, currency='CNY'):
""""""
req = {
'order_currency': symbol,
'payment_currency': currency,
}
self.addReq('POST', '/info/ticker', self.onQryPrivateTicker, postdict=req)
#----------------------------------------------------------------------
def qryPosition(self, symbol='ALL'):
""""""
req = {
'currency': symbol,
}
self.addReq('POST', '/info/balance', self.onQryPosition, postdict=req)
pass
#----------------------------------------------------------------------
def onQryPosition(self, data, reqid): # type: (self, dict, int)->None
""""""
if self.checkError(u'查询持仓', data):
return
datas = data['data'] # type: dict
# 先分类一下
infos = defaultdict(dict) # type: dict[str, dict[str, str]]
for key, val in datas.items(): # type: str, str
split_position = key.rfind('_')
infoType, symbol = key[:split_position], key[split_position+1:]
infos[symbol.upper()][infoType] = val
for symbol in infos.keys():
info = infos[symbol]
if symbol == u'LAST': # 过滤掉xcoin_last这个值表示的是最后一次交易量
continue
if symbol == u'KRW':
accountData = VtAccountData()
# todo: accountID必须从另一个API获取
# accountData.accountID =
accountData.balance = info['total']
accountData.available = info['available']
self.gateway.onAccount(accountData)
pass
else:
pos = VtPositionData()
pos.gatewayName = self.gatewayName
pos.symbol = symbol
pos.exchange = constant.EXCHANGE_BITHUMB
pos.vtSymbol = '.'.join([pos.symbol, pos.exchange])
pos.direction = constant.DIRECTION_NET
pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction])
pos.position = float(info['total'])
pos.frozen = float(info['in_use'])
self.gateway.onPosition(pos)
#----------------------------------------------------------------------
def parsePublicTickerData(self, symbol, info):
dt = datetime.now()
date = dt.strftime('%Y%m%d')
time = dt.strftime('%H:%M:%S')
tick = self.getTick(symbol)
tick.openPrice = float(info['opening_price'])
tick.highPrice = float(info['max_price'])
tick.lowPrice = float(info['min_price'])
tick.lastPrice = float(info['closing_price']) # todo: 也许应该是'buy_price'?
tick.volume = float(info['volume_1day'])
tick.datetime = datetime
tick.date = date
tick.time = time
# 只有订阅了深度行情才推送
if tick.bidPrice1:
self.gateway.onTick(tick)
#----------------------------------------------------------------------
def onQrySinglePublicTicker(self, symbol, data, reqid):
if self.checkError(u'查询行情', data):
return
info = data['data']
self.parsePublicTickerData(symbol=symbol, info=info)
#----------------------------------------------------------------------
def onQryMultiPublicTicker(self, data, reqid):
if self.checkError(u'查询行情', data):
return
for symbol, info in data['data'].items():
# 里面可能会出现一对date: int这样的值所以要过滤掉
if isinstance(info, dict):
self.parsePublicTickerData(symbol=symbol, info=info)
pass
#----------------------------------------------------------------------
def parsePublicOrderBookData(self, symbol, info):
dt = datetime.now()
date = dt.strftime('%Y%m%d')
time = dt.strftime('%H:%M:%S')
tick = self.getTick(symbol)
for i in range(5):
tick.__setattr__('askPrice' + str(i + 1), float(info['asks'][i]['price']))
tick.__setattr__('askVolume' + str(i + 1), float(info['asks'][i]['quantity']))
for i in range(5):
tick.__setattr__('bidPrice' + str(i + 1), float(info['bids'][i]['price']))
tick.__setattr__('bidVolume' + str(i + 1), float(info['bids'][i]['quantity']))
tick.datetime = datetime
tick.date = date
tick.time = time
# 只有订阅了深度行情才推送
if tick.bidPrice1:
self.gateway.onTick(tick)
#----------------------------------------------------------------------
def onQrySinglePublicOrderBook(self, symbol, data, reqid):
if self.checkError(u'五档行情', data):
return
info = data['data']
self.parsePublicTickerData(symbol=symbol, info=info)
pass
#----------------------------------------------------------------------
def onQryMultiPublicOrderBook(self, data, reqid):
if self.checkError(u'五档行情', data):
return
for symbol, info in data['data'].items():
self.parsePublicTickerData(symbol=symbol, info=info)
pass
#----------------------------------------------------------------------
def onQryPrivateTicker(self, data, reqid):
pass
#----------------------------------------------------------------------
def getTick(self, symbol):
""""""
tick = self.tickDict.get(symbol, None) # type: VtTickData
if not tick:
tick = VtTickData()
tick.gatewayName = self.gatewayName
tick.symbol = symbol
tick.exchange = constant.EXCHANGE_BITHUMB
tick.vtSymbol = '.'.join([tick.symbol, tick.exchange])
self.tickDict[symbol] = tick
return tick
#----------------------------------------------------------------------
def checkError(self, name, data):
""""""
status = data.get('status', None)
if status == u'0000':
return False
elif not status:
self.writeLog(u'%s触发错误:%s' % (name, u"未知的响应报文 : %s".format(data)))
return True
msg = data.get('message', u'unknown')
self.writeLog(u'%s触发错误:%s' % (name, msg))
return True
#----------------------------------------------------------------------
def getOrderByLocalID(self, localID): # type: (str)->VtOrderData
"""如果没有该订单,这个函数会出错"""
return self.orders[localID]
#----------------------------------------------------------------------
def getOrderByVtOrderID(self, vtOrderId): # type: (str)->VtOrderData
"""如果没有该订单,这个函数会出错"""
localID = vtOrderId[vtOrderId.rfind('.') + 1:]
return self.getOrderByLocalID(localID)
#----------------------------------------------------------------------
def getOrderBySysID(self, sysID): # type: (str)->VtOrderData
return self.getOrderByLocalID(self.getLocalIDBySysID(sysID))
#----------------------------------------------------------------------
def getLocalIDBySysID(self, sysID): # type: (str)->str
return self.sysLocalDict[sysID]
#----------------------------------------------------------------------
def isOrderPosted(self, order): # type: (VtOrderData)->bool
"""检查服务器是否响应了一个下单请求如果已经响应了返回True否则False"""
return order.orderID in self.localSysDict
#----------------------------------------------------------------------
def getSysIDForOrder(self, order): # type: (VtOrderData)->str
return self.localSysDict[order.orderID]
#----------------------------------------------------------------------
def hasSysID(self, sysID):
return sysID in self.sysLocalDict
if __name__ == '__main__':
# default test secret:
API_KEY = '0c2f5621ac18d26d51ce640b25eb44f9'
API_SECRET = '62bb8b4e263476f443f8d3dbf0aad6bc'
api = BithumbRestApi()
api.init(apiKey=API_KEY, apiSecret=API_SECRET)
api.start(1)
eventEngine = EventEngine2()
rest = RestApi(BithumbGateway(eventEngine=eventEngine))
rest.connect(API_KEY, API_SECRET)
translateDict = {
u'\uac70\ub798 \uccb4\uacb0\ub0b4\uc5ed\uc774 '
u'\uc874\uc7ac\ud558\uc9c0 \uc54a\uc2b5\ub2c8\ub2e4.': "交易记录不存在",
u'\uac70\ub798 \uc9c4\ud589\uc911\uc778 \ub0b4\uc5ed\uc774 '
u'\uc874\uc7ac\ud558\uc9c0 \uc54a\uc2b5\ub2c8\ub2e4.': "没有正在进行的交易",
u'\ub9e4\uc218\uae08\uc561\uc774 \uc0ac\uc6a9\uac00\ub2a5 KRW'
u' \ub97c \ucd08\uacfc\ud558\uc600\uc2b5\ub2c8\ub2e4.': "购买金额超过可用KRW",
u'\ub9e4\uc218\uac74\uc758 \uc0c1\ud0dc\uac00 \uc9c4\ud589\uc911\uc774 \uc544\ub2d9\ub2c8\ub2e4. '
u'\ucde8\uc18c\ud560 \uc218 \uc5c6\uc2b5\ub2c8\ub2e4.': "购买查询的状态未在进行中。 它无法取消。",
u'\uc9c0\uc6d0\ud558\uc9c0 \uc54a\ub294 \ud654\ud3d0\uc785\ub2c8\ub2e4. [347]': "不支持该货币单位。[347]",
}
def printError(jsonObj):
if rest.checkError('', data=jsonObj):
print('error : ')
msg = jsonObj['message']
print(translateDict.get(msg, msg))
def manualCancelOrder(sysID):
def onTradeCancel(jsonObj, reqid):
print('onTradeCancel : \n{}'.format(jsonObj))
printError(jsonObj)
rest.addReq('POST', '/trade/cancel', onTradeCancel,
postdict={'type': 'bid', 'order_id': sysID, 'currency': 'XMR'})
def apiCancelOrder(localId):
cancelReq = VtCancelOrderReq()
cancelReq.symbol = order.symbol
cancelReq.orderID = localId
rest.cancelOrder(cancelReq)
# query tick
rest.qryPublicTick('BTC')
rest.qryPublicTick('ALL')
rest.qryPosition('BTC')
rest.qryPosition('ALL')
# send order
sendOrderReq = VtOrderReq()
sendOrderReq.symbol = 'XMR'
sendOrderReq.direction = constant.DIRECTION_LONG
sendOrderReq.volume = minimum_ticks['XMR']
# sendOrderReq.price = 700
# sendOrderReq.currency = 'CNY' # 不可用
# sendOrderReq.price = 16.6461
# sendOrderReq.currency = 'USD' # 不可用
sendOrderReq.price = 17500
sendOrderReq.currency = 'KRW'
vtOrderId = rest.sendOrder(sendOrderReq)
order = rest.getOrderByVtOrderID(vtOrderId)
# todo: order的状态表示不够清晰
while not rest.isOrderPosted(order):
time.sleep(0.1)
sysID = rest.getSysIDForOrder(order)
print("sysID : ")
print(sysID)
def onOrders(jsonObj, reqid):
print('on_orders : \n{}'.format(jsonObj))
printError(jsonObj)
for detail in jsonObj['data']:
sysID = detail['order_id']
if rest.hasSysID(sysID):
apiCancelOrder(rest.getLocalIDBySysID(sysID))
else:
manualCancelOrder(sysID)
after = '1531926544794' # 2018-07-18T15:09:04.794Z
# rest.addReq('POST', '/info/orders', on_orders,
# postdict={'order_id': sysID, 'type': 'bid', 'after': after, 'currency': 'XMR'}) # got
#
# rest.addReq('POST', '/info/orders', on_orders,
# postdict={'order_id': sysID, 'type': 'bid', 'currency': 'XMR'}) # got
#
# rest.addReq('POST', '/info/orders', on_orders,
# postdict={'after': after, 'currency': 'XMR'}) # got
rest.addReq('POST', '/info/orders', onOrders, postdict={'currency': 'XMR'}) # got
# rest.addReq('POST', '/info/orders', on_orders,
# postdict={'order_id': sysID, 'type': 'bid', 'after': after}) # 没有正在进行的交易
#
# rest.addReq('POST', '/info/orders', on_orders, postdict={'after': after}) # 没有正在进行的交易
# rest.addReq('POST', '/info/orders', on_orders, postdict={}) # 没有正在进行的交易
# rest.addReq('POST', '/info/orders', on_orders, postdict={'currency': 'ALL'}) # 不支持该货币单位
raw_input()

View File

@ -379,7 +379,7 @@ class CtpMdApi(MdApi):
if tick.exchange == EXCHANGE_DCE:
tick.date = datetime.now().strftime('%Y%m%d')
# 上交所SEE股票期权相关
# 上交所SSE股票期权相关
if tick.exchange == EXCHANGE_SSE:
tick.bidPrice2 = data['BidPrice2']
tick.bidVolume2 = data['BidVolume2']

View File

@ -11,11 +11,11 @@ from time import sleep
from datetime import datetime
from copy import copy
from futuquant import (OpenQuoteContext, OpenHKTradeContext, OpenUSTradeContext,
RET_ERROR, RET_OK,
TrdEnv, TrdSide, OrderType, OrderStatus, ModifyOrderOp,
StockQuoteHandlerBase, OrderBookHandlerBase,
TradeOrderHandlerBase, TradeDealHandlerBase)
from futu import (OpenQuoteContext, OpenHKTradeContext, OpenUSTradeContext,
RET_ERROR, RET_OK,
TrdEnv, TrdSide, OrderType, OrderStatus, ModifyOrderOp,
StockQuoteHandlerBase, OrderBookHandlerBase,
TradeOrderHandlerBase, TradeDealHandlerBase)
from vnpy.trader.vtGateway import *
from vnpy.trader.vtConstant import GATEWAYTYPE_INTERNATIONAL
@ -353,7 +353,7 @@ class FutuGateway(VtGateway):
#----------------------------------------------------------------------
def qryTrade(self):
"""查询成交"""
code, data = self.tradeCtx.deal_list_query(self.env)
code, data = self.tradeCtx.deal_list_query("", trd_env=self.env)
if code:
self.writeError(code, u'查询成交失败:%s' %data)

View File

@ -388,6 +388,9 @@ class OkexfRestApi(RestClient):
#----------------------------------------------------------------------
def onQueryPosition(self, data, request):
""""""
if not data['holding']:
return
for d in data['holding'][0]:
longPosition = VtPositionData()
longPosition.gatewayName = self.gatewayName
@ -454,7 +457,7 @@ class OkexfRestApi(RestClient):
"""
order = request.extra
order.status = STATUS_REJECTED
self.gateway.onOrder(vtOrder)
self.gateway.onOrder(order)
#----------------------------------------------------------------------
def onSendOrder(self, data, request):
@ -683,8 +686,8 @@ class OkexfWebsocketApi(WebsocketClient):
for n, buf in enumerate(data['asks']):
price, volume = buf[:2]
tick.__setattr__('askPrice%s' %(n+1), float(price))
tick.__setattr__('askVolume%s' %(n+1), int(volume))
tick.__setattr__('askPrice%s' %(5-n), float(price))
tick.__setattr__('askVolume%s' %(5-n), int(volume))
dt = datetime.fromtimestamp(data['timestamp']/1000)
tick.date = dt.strftime('%Y%m%d')

View File

@ -36,7 +36,7 @@ class MainWindow(QtWidgets.QMainWindow):
#----------------------------------------------------------------------
def initUi(self):
"""初始化界面"""
self.setWindowTitle('VnTrader')
self.setWindowTitle('VN Trader')
self.initCentral()
self.initMenu()
self.initStatusBar()
@ -335,7 +335,7 @@ class AboutWidget(QtWidgets.QDialog):
#----------------------------------------------------------------------
def initUi(self):
""""""
self.setWindowTitle(vtText.ABOUT + 'VnTrader')
self.setWindowTitle(vtText.ABOUT + 'VN Trader')
text = u"""
Developed by Traders, for Traders.

View File

@ -113,6 +113,8 @@ class MainEngine(object):
if gateway:
gateway.connect()
self.dbConnect()
#----------------------------------------------------------------------
def subscribe(self, subscribeReq, gatewayName):
@ -196,7 +198,7 @@ class MainEngine(object):
# 读取MongoDB的设置
try:
# 设置MongoDB操作的超时时间为0.5秒
self.dbClient = MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'], connectTimeoutMS=500)
self.dbClient = MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'], serverSelectionTimeoutMS=10)
# 调用server_info查询服务器状态防止服务器异常并未连接成功
self.dbClient.server_info()
@ -208,6 +210,7 @@ class MainEngine(object):
self.eventEngine.register(EVENT_LOG, self.dbLogging)
except ConnectionFailure:
self.dbClient = None
self.writeLog(text.DATABASE_CONNECTING_FAILED)
#----------------------------------------------------------------------