diff --git a/README.md b/README.md index e57e2566..89b65ebf 100644 --- a/README.md +++ b/README.md @@ -87,27 +87,29 @@ vn.py是基于Python的开源量化交易程序开发框架,起源于国内私 * AlgoTrading,算法交易模块,提供多种常用的智能交易算法:TWAP、Sniper、BestLimit、Iceberg、Arbitrage等等,支持数据库配置保存、CSV文件加载启动以及RPC跨进程算法交易服务 + * TradeCopy,复制交易模块,用户可以通过发布者Provider进程来对外提供交易策略信号(手动、策略均可),订阅者Subscriber进程根据收到的信号自动执行同步交易,简洁快速得实现一拖多账户交易功能 + * RiskManager,前端风控模块,负责在交易系统将任何交易请求发出到柜台前的一系列标准检查操作,支持用户自定义风控规则的扩展 * DataRecorder,实盘行情记录,支持Tick和K线数据的落地,用于策略开发回测以及实盘运行初始化 * RpcService,RPC跨进程调用服务,基于MainEngineProxy组件,用户可以如同开发单一进程应用搬开发多进程架构的复杂交易应用 - * RtdService,EXCEL RTD服务组件,通过pyxll模块提供EXCEL表格系统对VnTrader系统内所有数据的访问和功能调用(未完成) + * RtdService,EXCEL RTD服务组件,通过pyxll模块提供EXCEL表格系统对VN Trader系统内所有数据的访问 5. 数据相关的API接口(vnpy.data),用于构建和更新历史行情数据库,目前包括: - * 上海中期历史行情服务(shcifco) - - * 通联数据API下载服务(datayes) - - * 天勤行情数据接口(tq) + * 上海中期历史行情服务(shcifco) 6. 关于vn.py项目的应用演示(examples),对于新手而言可以从这里开始学习vn.py项目的使用方式 -8. vn.py项目的Docker镜像(docker),目前尚未完成 +8. vn.py项目的Docker镜像(docker): -9. [官方论坛](http://www.vnpie.com)和[知乎专栏](http://zhuanlan.zhihu.com/vn-py),内容包括vn.py项目的开发教程和Python在量化交易领域的应用研究等内容 + * web docker,在Docker中启动基于Web交易的交易服务器WebTrader,在浏览器中实现CTA策略的运维操作 + + * vnc docker,内嵌了完整的vn.py图形化运行环境(Linux),并通过VNC Server对外提供虚拟桌面访问 + +9. [社区论坛](http://www.vnpy.com)和[知乎专栏](http://zhuanlan.zhihu.com/vn-py),内容包括vn.py项目的开发教程和Python在量化交易领域的应用研究等内容 10. 官方交流QQ群262656087,管理较严格(定期清除长期潜水的成员) @@ -152,9 +154,9 @@ sudo /home/vnpy/anaconda2/bin/conda install -c quantopian ta-lib=0.4.9 1. 在[SimNow](http://simnow.com.cn/)注册CTP仿真账号,记下你的**账号、密码、经纪商编号**,然后下载快期查询你的**交易和行情服务器地址** -2. 找到vn.py应用示例目录examples,打开examples\VnTrader\CTP_connect.json,修改账号、密码、服务器等为上一步注册完成后你的信息(注意使用专门的编程编辑器,如Sublime Text等,防止json编码出错) +2. 找到vn.py应用示例目录examples,打开examples\VN Trader\CTP_connect.json,修改账号、密码、服务器等为上一步注册完成后你的信息(注意使用专门的编程编辑器,如Sublime Text等,防止json编码出错) -3. 找到VnTrader的启动入口run.py,并双击运行(若无法双击,则在当前目录按住Shift点鼠标右键,打开cmd输入python run.py运行),run.py内容如下: +3. 找到VN Trader的启动入口run.py,并双击运行(若无法双击,则在当前目录按住Shift点鼠标右键,打开cmd输入python run.py运行),run.py内容如下: ``` # encoding: UTF-8 diff --git a/vnpy/data/tq/__init__.py b/beta/__init__.py similarity index 100% rename from vnpy/data/tq/__init__.py rename to beta/__init__.py diff --git a/beta/gateway/__init__.py b/beta/gateway/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/beta/gateway/huobiGateway/__init__.py b/beta/gateway/huobiGateway/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/beta/gateway/huobiGateway/huobiGateway.py b/beta/gateway/huobiGateway/huobiGateway.py new file mode 100644 index 00000000..4c8c6ecd --- /dev/null +++ b/beta/gateway/huobiGateway/huobiGateway.py @@ -0,0 +1,302 @@ +# encoding: UTF-8 + +''' +''' + +from __future__ import print_function + +import base64 +import hashlib +import hmac +import json +import re +import urllib +import zlib + +from vnpy.api.rest import Request, RestClient +from vnpy.api.websocket import WebsocketClient +from vnpy.trader.vtGateway import * + +REST_HOST = 'https://api.huobipro.com' +WEBSOCKET_MARKET_HOST = 'wss://api.huobi.pro/ws' # Global站行情 +WEBSOCKET_ASSETS_HOST = 'wss://api.huobi.pro/ws/v1' # 资产和订单 +WEBSOCKET_CONTRACT_HOST = 'wss://www.hbdm.com/ws' # 合约站行情 + + +#---------------------------------------------------------------------- +def _split_url(url): + """ + 将url拆分为host和path + :return: host, path + """ + m = re.match('\w+://([^/]*)(.*)', url) + if m: + return m.group(1), m.group(2) + + +#---------------------------------------------------------------------- +def createSignature(apiKey, method, host, path, secretKey): + """创建签名""" + sortedParams = ( + ("AccessKeyId", apiKey), + ("SignatureMethod", 'HmacSHA256'), + ("SignatureVersion", "2"), + ("Timestamp", datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S')) + ) + encodeParams = urllib.urlencode(sortedParams) + + payload = [method, host, path, encodeParams] + payload = '\n'.join(payload) + payload = payload.encode(encoding='UTF8') + + secretKey = secretKey.encode(encoding='UTF8') + + digest = hmac.new(secretKey, payload, digestmod=hashlib.sha256).digest() + + signature = base64.b64encode(digest) + params = dict(sortedParams) + params["Signature"] = signature + return params + + +######################################################################## +class HuobiRestApi(RestClient): + + def __init__(self, gateway): # type: (VtGateway)->HuobiRestApi + super(HuobiRestApi, self).__init__() + self.gateway = gateway + self.gatewayName = gateway.gatewayName + + self.apiKey = "" + self.apiSecret = "" + self.signHost = "" + + #---------------------------------------------------------------------- + def sign(self, request): + request.headers = { + "User-Agent": + "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36"} + additionalParams = createSignature(self.apiKey, + request.method, + self.signHost, + request.path, + self.apiSecret) + if not request.params: + request.params = additionalParams + else: + request.params.update(additionalParams) + if request.method == "POST": + request.headers['Content-Type'] = 'application/json' + return request + + #---------------------------------------------------------------------- + def connect(self, apiKey, apiSecret, sessionCount=3): + """连接服务器""" + self.apiKey = apiKey + self.apiSecret = apiSecret + + host, path = _split_url(REST_HOST) + self.init(REST_HOST) + self.signHost = host + self.start(sessionCount) + + #---------------------------------------------------------------------- + def qeuryAccount(self): + self.addRequest('GET', '/v1/account/accounts', self.onAccount) + + #---------------------------------------------------------------------- + def onAccount(self, data, request): # type: (dict, Request)->None + pass + + #---------------------------------------------------------------------- + def cancelWithdraw(self, id): + self.addRequest('POST', + "/v1/dw/withdraw-virtual/" + str(id) + "/cancel", + self.onWithdrawCanceled + ) + + #---------------------------------------------------------------------- + def onWithdrawCanceled(self, data, request): # type: (dict, Request)->None + pass + + +######################################################################## +class HuobiWebsocketApiBase(WebsocketClient): + + #---------------------------------------------------------------------- + def __init__(self, gateway): + """Constructor""" + super(HuobiWebsocketApiBase, self).__init__() + + self.gateway = gateway + self.gatewayName = gateway.gatewayName + + self.apiKey = '' + self.apiSecret = '' + self.signHost = '' + self.path = '' + + #---------------------------------------------------------------------- + def connect(self, apiKey, apiSecret, url): + """""" + self.apiKey = apiKey + self.apiSecret = apiSecret + + host, path = _split_url(url) + + self.init(url) + self.signHost = host + self.path = path + self.start() + + #---------------------------------------------------------------------- + def login(self): + params = { + 'op': 'auth', + } + params.update( + createSignature(self.apiKey, + 'GET', + self.signHost, + self.path, + self.apiSecret) + ) + return self.sendPacket(params) + + #---------------------------------------------------------------------- + def onLogin(self, packet): + pass + + #---------------------------------------------------------------------- + @staticmethod + def unpackData(data): + return json.loads(zlib.decompress(data, 31)) + + #---------------------------------------------------------------------- + def onPacket(self, packet): + """ + 这里我新增了一个onHuobiPacket的函数,也可以让子类重写这个函数,然后调用super.onPacket + """ + if 'ping' in packet: + self.sendPacket({'pong': packet['ping']}) + return + + # todo: use another error handing method + if 'err-msg' in packet: + return self.onHuobiErrorPacket(packet) + + if "op" in packet and packet["op"] == "auth": + return self.onLogin(packet) + + self.onHuobiPacket(packet) + + #---------------------------------------------------------------------- + def onHuobiPacket(self, packet): # type: (dict)->None + pass + + #---------------------------------------------------------------------- + def onHuobiErrorPacket(self, packet): # type: (dict)->None + print("error : {}".format(packet)) + + +######################################################################## +class HuobiAssetsWebsocketApi(HuobiWebsocketApiBase): + + def connect(self, apiKey, apiSecret, host=WEBSOCKET_ASSETS_HOST): + """ + 这里我使用重写connect,添加了默认参数。这样写感觉~~不太好~~,不过目前想到的比较好的方式就是这样了 + 虽然在Python中可以直接把这个connect()写成不接收host和path的形式,但是PyCharm会提示重载错误,所以不接收host和path似乎不太好? + + 我觉得最好的写法应该是这个函数不接收host和path。同时为了让PyCharm不提示重载错误(减少歧义),应该给 + HuobiWebsocketApiBase.connect起另外一个名字。 + """ + return super(HuobiAssetsWebsocketApi, self). \ + connect(apiKey, apiSecret, host) + + #---------------------------------------------------------------------- + def onConnected(self): + self.login() + + #---------------------------------------------------------------------- + def subscribeAccount(self): + """ + :param symbol: str ethbtc, ltcbtc, etcbtc, bchbtc + :param period: str 1min, 5min, 15min, 30min, 60min, 1day, 1mon, 1week, 1year + """ + self.sendPacket({ + "op": "sub", + "cid": "any thing you want", + "topic": "accounts" + }) + + #---------------------------------------------------------------------- + def onHuobiPacket(self, packet): # type: (dict)->None + if 'op' in packet: + if packet['op'] == 'sub': + timestamp = packet['ts'] + topic = packet['topic'] + """ + "data": { + "event": "order.match|order.place|order.refund|order.cancel|order.fee-refund|margin.transfer|margin.loan|margin.interest|margin.repay|other", + "list": [ + { + "account-id": 419013, + "currency": "usdt", + "type": "trade", + "balance": "500009195917.4362872650" + }, + { + "account-id": 419013, + "currency": "btc", + "type": "frozen", + "balance": "9786.6783000000" + } + ] + } + """ + pass + + +######################################################################## +class HuobiMarketWebsocketApi(HuobiWebsocketApiBase): + + #---------------------------------------------------------------------- + def connect(self, apiKey, apiSecret, host=WEBSOCKET_MARKET_HOST): + """ + 这里我使用重写connect,添加了默认参数。这样写感觉~~不太好~~,不过目前想到的比较好的方式就是这样了 + 虽然在Python中可以直接把这个connect()写成不接收host和path的形式,但是PyCharm会提示重载错误,所以不接收host和path似乎不太好? + + 我觉得最好的写法应该是这个函数不接收host和path。同时为了让PyCharm不提示重载错误(减少歧义),应该给 + HuobiWebsocketApiBase.connect起另外一个名字。 + """ + return super(HuobiMarketWebsocketApi, self). \ + connect(apiKey, apiSecret, host) + + #---------------------------------------------------------------------- + def subscribeKLine(self, symbol, period): # type:(str, str)->None + """ + :param symbol: str ethbtc, ltcbtc, etcbtc, bchbtc + :param period: str 1min, 5min, 15min, 30min, 60min, 1day, 1mon, 1week, 1year + :return: + """ + self.sendPacket({ + "sub": "market." + symbol + ".kline." + period, + "id": "any thing you want" + }) + + #---------------------------------------------------------------------- + def onHuobiPacket(self, packet): # type: (dict)->None + # code for test purpose only + if 'ch' in packet: + if packet['ch'] == 'market.btcusdt.kline.1min': + timestamp = packet['ts'] + data = packet['tick'] + id = data['id'] + amount = data['amount'] + count = data['count'] + open = data['open'] + close = data['close'] + low = data['low'] + high = data['high'] + vol = data['vol'] + pass diff --git a/examples/CryptoTrader/uiCryptoWindow.py b/examples/CryptoTrader/uiCryptoWindow.py index b491563e..45298e94 100644 --- a/examples/CryptoTrader/uiCryptoWindow.py +++ b/examples/CryptoTrader/uiCryptoWindow.py @@ -37,7 +37,7 @@ class MainWindow(QtWidgets.QMainWindow): #---------------------------------------------------------------------- def initUi(self): """初始化界面""" - self.setWindowTitle('VnTrader') + self.setWindowTitle('VN Crypto') self.initCentral() self.initMenu() self.initStatusBar() @@ -309,7 +309,7 @@ class AboutWidget(QtWidgets.QDialog): #---------------------------------------------------------------------- def initUi(self): """""" - self.setWindowTitle(vtText.ABOUT + 'VnTrader') + self.setWindowTitle(vtText.ABOUT + 'VN Crypto') text = u""" Developed by Traders, for Traders. diff --git a/examples/DataService/CccDataService/dataService.py b/examples/DataService/CccDataService/dataService.py index e7ffc88a..c7a0d1b0 100644 --- a/examples/DataService/CccDataService/dataService.py +++ b/examples/DataService/CccDataService/dataService.py @@ -10,7 +10,7 @@ import requests from pymongo import MongoClient, ASCENDING from vnpy.trader.vtObject import VtBarData -from vnpy.trader.app.ctaStrategy.ctaBase import MINUTE_DB_NAME +from vnpy.trader.app.ctaStrategy.ctaBase import MINUTE_DB_NAME, DAILY_DB_NAME # 加载配置 @@ -23,7 +23,7 @@ SYMBOLS = setting['SYMBOLS'] mc = MongoClient(MONGO_HOST, MONGO_PORT) # Mongo连接 db = mc[MINUTE_DB_NAME] # 数据库 - +dbDaily = mc[DAILY_DB_NAME] #---------------------------------------------------------------------- def generateVtBar(vtSymbol, d): @@ -87,6 +87,49 @@ def downloadMinuteBarBySymbol(vtSymbol, end): datetime.datetime.fromtimestamp(l[-1]['time']), cost)) +#---------------------------------------------------------------------- +def downloadDailyBarBySymbol(vtSymbol): + """下载某一合约的分钟线数据""" + startTime = time.time() + + cl = dbDaily[vtSymbol] + cl.ensure_index([('datetime', ASCENDING)], unique=True) + + symbol, exchange = vtSymbol.split('.') + fsym, tsym = symbol.split('/') + + url = 'https://min-api.cryptocompare.com/data/histoday' + params = { + 'fsym': fsym, + 'tsym': tsym, + 'e': exchange, + 'limit': 2000 + } + resp = requests.get(url, headers={}, params=params) + + if resp.status_code != 200: + print(u'%s数据下载失败' %vtSymbol) + return + + j = resp.json() + l = j['Data'] + + for d in l: + bar = generateVtBar(vtSymbol, d) + d = bar.__dict__ + flt = {'datetime': bar.datetime} + cl.replace_one(flt, d, True) + + endTime = time.time() + cost = (endTime - startTime) * 1000 + + + print(u'合约%s数据下载完成%s - %s,耗时%s毫秒' %(vtSymbol, + datetime.datetime.fromtimestamp(l[0]['time']), + datetime.datetime.fromtimestamp(l[-1]['time']), + cost)) + + #---------------------------------------------------------------------- def downloadAllMinuteBar(end): """下载所有配置中的合约的分钟线数据""" diff --git a/examples/DataService/CccDataService/downloadData.py b/examples/DataService/CccDataService/downloadData.py index 1da0bc31..7c7d9cc9 100644 --- a/examples/DataService/CccDataService/downloadData.py +++ b/examples/DataService/CccDataService/downloadData.py @@ -8,7 +8,9 @@ from dataService import * if __name__ == '__main__': - #downMinuteBarBySymbol('BTC/USDT.OKEX', '20181012') - #downMinuteBarBySymbol('BTC/USDT.HUOBIPRO', '20181012') - #downMinuteBarBySymbol('BTC/USDT.BINANCE', '20181012') - downloadAllMinuteBar('20181012') \ No newline at end of file + #downloadMinuteBarBySymbol('BTC/USDT.OKEX', '20181012') + #downloadMinuteBarBySymbol('BTC/USDT.HUOBIPRO', '20181012') + #downloadMinuteBarBySymbol('BTC/USDT.BINANCE', '20181012') + #downloadAllMinuteBar('20181012') + + downloadDailyBarBySymbol('BTC/USDT.BINANCE') \ No newline at end of file diff --git a/examples/DataService/RqdataDataService/config.json b/examples/DataService/RqdataDataService/config.json deleted file mode 100644 index 6c85451a..00000000 --- a/examples/DataService/RqdataDataService/config.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "MONGO_HOST": "localhost", - "MONGO_PORT": 27017, - - "SYMBOLS": ["510050", "510300"], - - "USERNAME": "", - "PASSWORD": "" -} \ No newline at end of file diff --git a/examples/DataService/RqdataDataService/dataService.py b/examples/DataService/RqdataDataService/dataService.py deleted file mode 100644 index b6736e6d..00000000 --- a/examples/DataService/RqdataDataService/dataService.py +++ /dev/null @@ -1,124 +0,0 @@ -# encoding: UTF-8 - -from __future__ import print_function -import sys -import json -from datetime import datetime -from time import time, sleep - -from pymongo import MongoClient, ASCENDING - -from vnpy.trader.vtObject import VtBarData -from vnpy.trader.app.ctaStrategy.ctaBase import MINUTE_DB_NAME, DAILY_DB_NAME - -import rqdatac as rq - -# 加载配置 -config = open('config.json') -setting = json.load(config) - -MONGO_HOST = setting['MONGO_HOST'] -MONGO_PORT = setting['MONGO_PORT'] -SYMBOLS = setting['SYMBOLS'] - -mc = MongoClient(MONGO_HOST, MONGO_PORT) # Mongo连接 -db = mc[MINUTE_DB_NAME] # 数据库 -db2 = mc[DAILY_DB_NAME] - -USERNAME = setting['USERNAME'] -PASSWORD = setting['PASSWORD'] -rq.init(USERNAME, PASSWORD) - -FIELDS = ['open', 'high', 'low', 'close', 'volume'] - -#---------------------------------------------------------------------- -def generateVtBar(row, symbol): - """生成K线""" - bar = VtBarData() - - bar.symbol = symbol - bar.vtSymbol = symbol - bar.open = row['open'] - bar.high = row['high'] - bar.low = row['low'] - bar.close = row['close'] - bar.volume = row['volume'] - bar.datetime = row.name - bar.date = bar.datetime.strftime("%Y%m%d") - bar.time = bar.datetime.strftime("%H:%M:%S") - - return bar - -#---------------------------------------------------------------------- -def downloadMinuteBarBySymbol(symbol): - """下载某一合约的分钟线数据""" - start = time() - - cl = db[symbol] - cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引 - - df = rq.get_price(symbol, frequency='1m', fields=FIELDS) - - for ix, row in df.iterrows(): - bar = generateVtBar(row, symbol) - d = bar.__dict__ - flt = {'datetime': bar.datetime} - cl.replace_one(flt, d, True) - - end = time() - cost = (end - start) * 1000 - - print(u'合约%s数据下载完成%s - %s,耗时%s毫秒' %(symbol, df.index[0], df.index[-1], cost)) - -#---------------------------------------------------------------------- -def downloadDailyBarBySymbol(symbol): - """下载某一合约日线数据""" - start = time() - - cl = db2[symbol] - cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引 - - df = rq.get_price(symbol, frequency='1d', fields=FIELDS, end_date=datetime.now().strftime('%Y%m%d')) - - for ix, row in df.iterrows(): - bar = generateVtBar(row, symbol) - d = bar.__dict__ - flt = {'datetime': bar.datetime} - cl.replace_one(flt, d, True) - - end = time() - cost = (end - start) * 1000 - - print(u'合约%s数据下载完成%s - %s,耗时%s毫秒' %(symbol, df.index[0], df.index[-1], cost)) - - -#---------------------------------------------------------------------- -def downloadAllMinuteBar(): - """下载所有配置中的合约的分钟线数据""" - print('-' * 50) - print(u'开始下载合约分钟线数据') - print('-' * 50) - - # 添加下载任务 - for symbol in SYMBOLS: - downloadMinuteBarBySymbol(str(symbol)) - - print('-' * 50) - print(u'合约分钟线数据下载完成') - print('-' * 50) - -#---------------------------------------------------------------------- -def downloadAllDailyBar(): - """下载所有配置中的合约的日数据""" - print('-' * 50) - print(u'开始下载合约日线数据') - print('-' * 50) - - # 添加下载任务 - for symbol in SYMBOLS: - downloadDailyBarBySymbol(str(symbol)) - - print('-' * 50) - print(u'合约日线数据下载完成') - print('-' * 50) - \ No newline at end of file diff --git a/examples/DataService/RqdataDataService/downloadData.py b/examples/DataService/RqdataDataService/downloadData.py deleted file mode 100644 index f0263ffc..00000000 --- a/examples/DataService/RqdataDataService/downloadData.py +++ /dev/null @@ -1,14 +0,0 @@ -# encoding: UTF-8 - -""" -立即下载数据到数据库中,用于手动执行更新操作。 -""" - -from dataService import * - - -if __name__ == '__main__': - downloadMinuteBarBySymbol('CU99') - downloadDailyBarBySymbol('IF99') - downloadDailyBarBySymbol('TA99') - downloadDailyBarBySymbol('I99') \ No newline at end of file diff --git a/examples/DataService/RqdataDataService/runService.py b/examples/DataService/RqdataDataService/runService.py deleted file mode 100644 index 58c79e30..00000000 --- a/examples/DataService/RqdataDataService/runService.py +++ /dev/null @@ -1,33 +0,0 @@ -# encoding: UTF-8 - -""" -定时服务,可无人值守运行,实现每日自动下载更新历史行情数据到数据库中。 -""" -from __future__ import print_function - -import time -import datetime - -from dataService import downloadAllMinuteBar - - -if __name__ == '__main__': - taskCompletedDate = None - - # 生成一个随机的任务下载时间,用于避免所有用户在同一时间访问数据服务器 - taskTime = datetime.time(hour=17, minute=0) - - # 进入主循环 - while True: - t = datetime.datetime.now() - - # 每天到达任务下载时间后,执行数据下载的操作 - if t.time() > taskTime and (taskCompletedDate is None or t.date() != taskCompletedDate): - downloadAllMinuteBar() - - # 更新任务完成的日期 - taskCompletedDate = t.date() - else: - print(u'当前时间%s,任务定时%s' %(t, taskTime)) - - time.sleep(60) \ No newline at end of file diff --git a/examples/DataService/TqDataService/README.md b/examples/DataService/TqDataService/README.md deleted file mode 100644 index 422744db..00000000 --- a/examples/DataService/TqDataService/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# 天勤历史行情服务 - -请在[www.tq18.cn](www.tq18.cn)下载天勤行情终端,安装运行后,即可使用该服务。 \ No newline at end of file diff --git a/examples/DataService/TqDataService/config.json b/examples/DataService/TqDataService/config.json deleted file mode 100644 index 467e7857..00000000 --- a/examples/DataService/TqDataService/config.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "MONGO_HOST": "localhost", - "MONGO_PORT": 27017 -} \ No newline at end of file diff --git a/examples/DataService/TqDataService/dataService.py b/examples/DataService/TqDataService/dataService.py deleted file mode 100644 index d7f2ed28..00000000 --- a/examples/DataService/TqDataService/dataService.py +++ /dev/null @@ -1,107 +0,0 @@ -# encoding: UTF-8 - -from __future__ import print_function -import sys -import json -from datetime import datetime -from time import sleep - -from pymongo import MongoClient, ASCENDING - -from vnpy.data.tq.vntq import TqApi -from vnpy.trader.vtObject import VtBarData -from vnpy.trader.app.ctaStrategy.ctaBase import MINUTE_DB_NAME - - -# 加载配置 -config = open('config.json') -setting = json.load(config) - -MONGO_HOST = setting['MONGO_HOST'] -MONGO_PORT = setting['MONGO_PORT'] - -mc = MongoClient(MONGO_HOST, MONGO_PORT) # Mongo连接 -db = mc[MINUTE_DB_NAME] # 数据库 - -api = TqApi() # 历史行情服务API对象 -api.connect() # 连接 -taskList = [] # 下载任务列表 - - -#---------------------------------------------------------------------- -def generateVtBar(symbol, d): - """生成K线""" - bar = VtBarData() - - bar.symbol = symbol - bar.vtSymbol = symbol - bar.open = d['open'] - bar.high = d['high'] - bar.low = d['low'] - bar.close = d['close'] - bar.volume = d['volume'] - bar.openInterest = d['open_oi'] - bar.datetime = datetime.fromtimestamp(d['datetime']/1000000000) - bar.date = bar.datetime.strftime("%Y%m%d") - bar.time = bar.datetime.strftime("%H:%M:%S") - - return bar - -#---------------------------------------------------------------------- -def onChart(symbol, seconds): - """K线更新处理函数""" - # 避免重复记录已经完成的任务 - if symbol not in taskList: - return - - serial = api.get_kline_serial(symbol, seconds) - - cl = db[symbol] # 集合 - cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引 - - l = serial.values() - for d in l: - bar = generateVtBar(symbol, d) - d = bar.__dict__ - flt = {'datetime': bar.datetime} - cl.replace_one(flt, d, True) - - start = datetime.fromtimestamp(l[0]['datetime']/1000000000) - end = datetime.fromtimestamp(l[-1]['datetime']/1000000000) - print(u'合约%s下载完成%s - %s' %(symbol, start, end)) - - # 移除已经完成的任务 - if symbol in taskList: - taskList.remove(symbol) - -#---------------------------------------------------------------------- -def downMinuteBarBySymbol(symbol, num): - """下载某一合约的分钟线数据""" - api.subscribe_chart(symbol, 60, num, onChart) - -#---------------------------------------------------------------------- -def downloadAllMinuteBar(num, symbols): - """下载所有配置中的合约的分钟线数据""" - print('-' * 50) - print(u'开始下载合约分钟线数据') - print('-' * 50) - - # 添加下载任务 - taskList.extend(symbols) - - for symbol in symbols: - downMinuteBarBySymbol(str(symbol), num) - - while True: - sleep(2) - - # 如果任务列表为空,则说明数据已经全部下载完成 - if not taskList: - print('-' * 50) - print(u'合约分钟线数据下载完成') - print('-' * 50) - return - - - - \ No newline at end of file diff --git a/examples/DataService/TqDataService/downloadData.py b/examples/DataService/TqDataService/downloadData.py deleted file mode 100644 index 246df77a..00000000 --- a/examples/DataService/TqDataService/downloadData.py +++ /dev/null @@ -1,17 +0,0 @@ -# encoding: UTF-8 - -""" -立即下载数据到数据库中,用于手动执行更新操作。 - -注意: 请先在本机启动天勤终端 (0.8.0 以上版本) 并保持运行, 再执行本程序 -""" - -from dataService import * - - - -if __name__ == '__main__': - symbols = ["CFFEX.IF1710", "CFFEX.IF1711", "CFFEX.IF1712", "CFFEX.IF1803", - "CFFEX.IH1710", "CFFEX.IH1711", "CFFEX.IH1712", "CFFEX.IH1803", - "CFFEX.IC1710", "CFFEX.IC1711", "CFFEX.IC1712", "CFFEX.IC1803"] - downloadAllMinuteBar(1000, symbols) \ No newline at end of file diff --git a/examples/DataService/TqDataService/runService.py b/examples/DataService/TqDataService/runService.py deleted file mode 100644 index cddbc89e..00000000 --- a/examples/DataService/TqDataService/runService.py +++ /dev/null @@ -1,40 +0,0 @@ -# encoding: UTF-8 - -""" -定时服务,可无人值守运行,实现每日自动下载更新历史行情数据到数据库中。 - -注意: 请确保本程序运行时, 本机天勤终端 (0.8.0 以上版本)正在运行中 -""" -from __future__ import print_function - -import time -import datetime - -from dataService import downloadAllMinuteBar - - -if __name__ == '__main__': - taskCompletedDate = None - - # 生成一个随机的任务下载时间,用于避免所有用户在同一时间访问数据服务器 - taskTime = datetime.time(hour=17, minute=0) - - symbols = ["CFFEX.IF1710", "CFFEX.IF1711", "CFFEX.IF1712", "CFFEX.IF1803", - "CFFEX.IH1710", "CFFEX.IH1711", "CFFEX.IH1712", "CFFEX.IH1803", - "CFFEX.IC1710", "CFFEX.IC1711", "CFFEX.IC1712", "CFFEX.IC1803"] - - # 进入主循环 - while True: - t = datetime.datetime.now() - - # 每天到达任务下载时间后,执行数据下载的操作 - if t.time() > taskTime and (taskCompletedDate is None or t.date() != taskCompletedDate): - # 下载1000根分钟线数据,足以覆盖过去两天的行情 - downloadAllMinuteBar(1000, symbols) - - # 更新任务完成的日期 - taskCompletedDate = t.date() - else: - print(u'当前时间%s,任务定时%s' %(t, taskTime)) - - time.sleep(60) \ No newline at end of file diff --git a/examples/RQData/README.md b/examples/RQData/README.md new file mode 100644 index 00000000..81371b84 --- /dev/null +++ b/examples/RQData/README.md @@ -0,0 +1,10 @@ +### RQData数据自动更新服务 + +vn.py官方推荐的历史数据解决方案,由RiceQuant提供的高质量期货数据服务。 + +使用步骤: +1. 前往[RQData主页](https://www.ricequant.com/purchase#1),购买标准版账户或者申请试用 +2. 获得账户后将自动下载make.bat文件,在make.bat中找到name和password信息 +3. 打开config.json,在rqUsername和rqPassword中填入上述信息,并在product列表中,填入需要更新行情的期货合约产品代码(只包含英文字母即可) +4. 双击“启动更新服务.bat”,来启动RQData数据自动更新服务,点击右上角关闭按钮可最小化到右下方的托盘栏 +5. 在托盘栏图标上点击右键菜单中的“退出”可以退出程序 diff --git a/examples/RQData/config.json b/examples/RQData/config.json new file mode 100644 index 00000000..8780a990 --- /dev/null +++ b/examples/RQData/config.json @@ -0,0 +1,5 @@ +{ + "rqUsername": "", + "rqPassword": "", + "product": ["IF"] +} \ No newline at end of file diff --git a/examples/RQData/dataService.py b/examples/RQData/dataService.py new file mode 100644 index 00000000..33a1e9f4 --- /dev/null +++ b/examples/RQData/dataService.py @@ -0,0 +1,160 @@ +# encoding: UTF-8 + +from __future__ import print_function +import sys +import json +from datetime import datetime +from time import time, sleep + +from pymongo import MongoClient, ASCENDING +import pandas as pd + +from vnpy.trader.vtObject import VtBarData, VtTickData +from vnpy.trader.app.ctaStrategy.ctaBase import (MINUTE_DB_NAME, + DAILY_DB_NAME, + TICK_DB_NAME) + +import rqdatac as rq + +# 加载配置 +config = open('config.json') +setting = json.load(config) + +mc = MongoClient() # Mongo连接 +dbMinute = mc[MINUTE_DB_NAME] # 数据库 +dbDaily = mc[DAILY_DB_NAME] +dbTick = mc[TICK_DB_NAME] + +USERNAME = setting['rqUsername'] +PASSWORD = setting['rqPassword'] +rq.init(USERNAME, PASSWORD) + +FIELDS = ['open', 'high', 'low', 'close', 'volume'] + +#---------------------------------------------------------------------- +def generateVtBar(row, symbol): + """生成K线""" + bar = VtBarData() + + bar.symbol = symbol + bar.vtSymbol = symbol + bar.open = row['open'] + bar.high = row['high'] + bar.low = row['low'] + bar.close = row['close'] + bar.volume = row['volume'] + bar.datetime = row.name + bar.date = bar.datetime.strftime("%Y%m%d") + bar.time = bar.datetime.strftime("%H:%M:%S") + + return bar + +#---------------------------------------------------------------------- +def generateVtTick(row, symbol): + """生成K线""" + tick = VtTickData() + tick.symbol = symbol + tick.vtSymbol = symbol + + tick.lastPrice = row['last'] + tick.volume = row['volume'] + tick.openInterest = row['open_interest'] + tick.datetime = row.name + tick.openPrice = row['open'] + tick.highPrice = row['high'] + tick.lowPrice = row['low'] + tick.preClosePrice = row['prev_close'] + tick.upperLimit = row['limit_up'] + tick.lowerLimit = row['limit_down'] + + tick.bidPrice1 = row['b1'] + tick.bidPrice2 = row['b2'] + tick.bidPrice3 = row['b3'] + tick.bidPrice4 = row['b4'] + tick.bidPrice5 = row['b5'] + + tick.bidVolume1 = row['b1_v'] + tick.bidVolume2 = row['b2_v'] + tick.bidVolume3 = row['b3_v'] + tick.bidVolume4 = row['b4_v'] + tick.bidVolume5 = row['b5_v'] + + tick.askPrice1 = row['a1'] + tick.askPrice2 = row['a2'] + tick.askPrice3 = row['a3'] + tick.askPrice4 = row['a4'] + tick.askPrice5 = row['a5'] + + tick.askVolume1 = row['a1_v'] + tick.askVolume2 = row['a2_v'] + tick.askVolume3 = row['a3_v'] + tick.askVolume4 = row['a4_v'] + tick.askVolume5 = row['a5_v'] + + return tick + +#---------------------------------------------------------------------- +def downloadMinuteBarBySymbol(symbol): + """下载某一合约的分钟线数据""" + start = time() + + cl = dbMinute[symbol] + cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引 + + df = rq.get_price(symbol, frequency='1m', fields=FIELDS) + + for ix, row in df.iterrows(): + bar = generateVtBar(row, symbol) + d = bar.__dict__ + flt = {'datetime': bar.datetime} + cl.replace_one(flt, d, True) + + end = time() + cost = (end - start) * 1000 + + print(u'合约%s的分钟K线数据下载完成%s - %s,耗时%s毫秒' %(symbol, df.index[0], df.index[-1], cost)) + +#---------------------------------------------------------------------- +def downloadDailyBarBySymbol(symbol): + """下载某一合约日线数据""" + start = time() + + cl = dbDaily[symbol] + cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引 + + df = rq.get_price(symbol, frequency='1d', fields=FIELDS, end_date=datetime.now().strftime('%Y%m%d')) + + for ix, row in df.iterrows(): + bar = generateVtBar(row, symbol) + d = bar.__dict__ + flt = {'datetime': bar.datetime} + cl.replace_one(flt, d, True) + + end = time() + cost = (end - start) * 1000 + + print(u'合约%s的日K线数据下载完成%s - %s,耗时%s毫秒' %(symbol, df.index[0], df.index[-1], cost)) + +#---------------------------------------------------------------------- +def downloadTickBySymbol(symbol, date): + """下载某一合约日线数据""" + start = time() + + cl = dbTick[symbol] + cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引 + + df = rq.get_price(symbol, + frequency='tick', + start_date=date, + end_date=date) + + for ix, row in df.iterrows(): + tick = generateVtTick(row, symbol) + d = tick.__dict__ + flt = {'datetime': tick.datetime} + cl.replace_one(flt, d, True) + + end = time() + cost = (end - start) * 1000 + + print(u'合约%sTick数据下载完成%s - %s,耗时%s毫秒' %(symbol, df.index[0], df.index[-1], cost)) diff --git a/examples/RQData/downloadData.py b/examples/RQData/downloadData.py new file mode 100644 index 00000000..19cc7c95 --- /dev/null +++ b/examples/RQData/downloadData.py @@ -0,0 +1,15 @@ +# encoding: UTF-8 + +""" +立即下载数据到数据库中,用于手动执行更新操作。 +""" + +from dataService import * + + +if __name__ == '__main__': + #downloadMinuteBarBySymbol('CU99') + #downloadDailyBarBySymbol('IF99') + #downloadDailyBarBySymbol('TA99') + #downloadDailyBarBySymbol('I99') + downloadTickBySymbol('IF1901', '2018-12-21') \ No newline at end of file diff --git a/examples/RQData/run.py b/examples/RQData/run.py new file mode 100644 index 00000000..ce71f681 --- /dev/null +++ b/examples/RQData/run.py @@ -0,0 +1,274 @@ +# encoding: UTF-8 + +from __future__ import print_function + +import json +import ctypes +from datetime import datetime, timedelta, time +from time import sleep +from threading import Thread +from collections import OrderedDict + +import qdarkstyle +from pymongo import MongoClient, ASCENDING, DESCENDING +from pymongo.errors import ConnectionFailure + +from vnpy.trader.uiQt import QtCore, QtWidgets, QtGui +from vnpy.trader.vtObject import VtBarData +from vnpy.trader.app.ctaStrategy.ctaBase import MINUTE_DB_NAME, DAILY_DB_NAME + + +DAY_START = time(9, 0) # 日盘启动和停止时间 +DAY_END = time(15, 15) +NIGHT_START = time(21, 0) # 夜盘启动和停止时间 +NIGHT_END = time(2, 30) + + +######################################################################## +class RqDataManager(QtWidgets.QWidget): + """""" + signal = QtCore.Signal(str) + + #---------------------------------------------------------------------- + def __init__(self): + """Constructor""" + super(RqDataManager, self).__init__() + + self.client = None + self.rq = None + self.thread = Thread(target=self.run) + + self.productList = [] + self.symbolExchangeDict = OrderedDict() + + self.initUi() + + n1 = self.connectMongo() + if not n1: + return + + n2 = self.initRqData() + if not n2: + return + + self.count = 0 + self.active = True + self.thread.start() + + #---------------------------------------------------------------------- + def connectMongo(self): + """连接数据库""" + try: + self.client = MongoClient(serverSelectionTimeoutMS=10) + self.client.server_info() + self.writeLog(u'MongoDB连接成功') + return True + except ConnectionFailure: + self.client = None + self.writeLog(u'MongoDB连接失败') + return False + + #---------------------------------------------------------------------- + def initUi(self): + """初始化界面""" + self.setWindowTitle(u'RQData数据服务') + self.setWindowIcon(QtGui.QIcon('vnpy.ico')) + + self.setFixedHeight(500) + self.setFixedWidth(900) + + self.logMonitor = QtWidgets.QTextEdit() + self.logMonitor.setReadOnly(True) + + vbox = QtWidgets.QVBoxLayout() + vbox.addWidget(self.logMonitor) + self.setLayout(vbox) + + self.signal.connect(self.updateLog) + + # 托盘配置 + self.tray = QtWidgets.QSystemTrayIcon() + self.tray.setIcon(QtGui.QIcon('vnpy.ico')) + self.tray.activated.connect(self.showManager) + + restoreAction = QtWidgets.QAction(u'还原', self, triggered=self.show) + quitAction = QtWidgets.QAction(u'退出', self, triggered=self.exit) + + menu = QtWidgets.QMenu(QtWidgets.QApplication.desktop()) + menu.addAction(restoreAction) + menu.addAction(quitAction) + self.tray.setContextMenu(menu) + + self.tray.show() + + #---------------------------------------------------------------------- + def initRqData(self): + """""" + with open('config.json') as config: + setting = json.load(config) + + for product in setting['product']: + self.productList.append(product.upper()) + + # 检查是否填写了RQData配置 + username = setting.get('rqUsername', None) + password = setting.get('rqPassword', None) + if not username or not password: + self.writeLog(u'RQData的用户名和密码配置错误,请在config.json中修改') + return False + + # 加载RQData + try: + import rqdatac as rq + except ImportError: + self.writeLog(u'没有安装RQData客户端,请先安装rqdatac') + return False + + # 登录RQData + self.rq = rq + self.rq.init(username, password) + + # 获取本日可交易合约代码 + try: + df = self.rq.all_instruments(type='Future', date=datetime.now()) + for ix, row in df.iterrows(): + self.symbolExchangeDict[row['order_book_id']] = row['exchange'] + except RuntimeError: + self.writeLog(u'RQData的用户名和密码无效,请联系米筐申请试用或者购买') + return False + + self.writeLog(u'RQData客户端登录成功') + return True + + #---------------------------------------------------------------------- + def downloadBar(self, symbol, frequency): + """下载合约数据""" + if 'frequency' == '1m': + db = self.client[MINUTE_DB_NAME] + else: + db = self.client[DAILY_DB_NAME] + + # 上期所和大商所代码改为小写 + exchange = self.symbolExchangeDict[symbol] + if exchange in ['SHFE', 'DCE']: + localSymbol = symbol.lower() + else: + localSymbol = symbol + collection = db[localSymbol] + + # 获取本地数据库中最后一条记录的时间,并下载新数据 + result = collection.find_one(sort=[("datetime", DESCENDING)]) + if result: + startDate = result['datetime'] + else: + startDate = '20180101' + + if startDate: + self.writeLog(u'%s下载更新数据,开始时间:%s' %(localSymbol, startDate)) + else: + self.writeLog(u'%s初次下载数据,耗时可能较长,请耐心等待' %(localSymbol)) + + df = self.rq.get_price(symbol, + frequency=frequency, + fields=['open', 'high', 'low', 'close', 'volume'], + start_date=startDate, + end_date=datetime.now()) + + # 插入到数据库 + for ix, row in df.iterrows(): + bar = self.generateBar(row, localSymbol) + d = bar.__dict__ + flt = {'datetime': bar.datetime} + collection.replace_one(flt, d, True) + + self.writeLog(u'%s数据更新完成:%s - %s' %(localSymbol, df.index[0], df.index[-1])) + + #---------------------------------------------------------------------- + def generateBar(self, row, symbol): + """生成K线对象""" + bar = VtBarData() + + bar.symbol = symbol + bar.vtSymbol = symbol + bar.open = row['open'] + bar.high = row['high'] + bar.low = row['low'] + bar.close = row['close'] + bar.volume = row['volume'] + bar.datetime = row.name + bar.date = bar.datetime.strftime("%Y%m%d") + bar.time = bar.datetime.strftime("%H:%M:%S") + + return bar + + #---------------------------------------------------------------------- + def writeLog(self, msg): + """记录日志""" + self.signal.emit(msg) + + #---------------------------------------------------------------------- + def updateLog(self, msg): + """更新日志""" + dt = datetime.now() + msg = '%s: %s' %(dt, msg) + self.logMonitor.append(msg) + + #---------------------------------------------------------------------- + def run(self): + """运行""" + while self.active: + sleep(1) + + self.count += 1 + if self.count < 10: + continue + self.count = 0 + + now = datetime.now().time() + if ((DAY_START <= now <= DAY_END) or + (now >= NIGHT_START) or + (now <= NIGHT_END)): + for symbol in self.symbolExchangeDict.keys(): + download = False + for product in self.productList: + if product in symbol: + download = True + + if download: + self.downloadBar(symbol, '1m') + else: + self.writeLog(u'非交易时间段,不执行更新') + + #---------------------------------------------------------------------- + def showManager(self, reason): + """""" + self.show() + + #---------------------------------------------------------------------- + def closeEvent(self, event): + """""" + self.hide() + event.ignore() + + #---------------------------------------------------------------------- + def exit(self): + """""" + self.active = False + self.thread.join() + + QtWidgets.qApp.quit() + + +if __name__ == '__main__': + font = QtGui.QFont(u'微软雅黑', 12) + + app = QtWidgets.QApplication([]) + app.setFont(font) + app.setStyleSheet(qdarkstyle.load_stylesheet_from_environment()) + + ctypes.windll.shell32.SetCurrentProcessExplicitAppUserModelID('RQDataService') + + manager = RqDataManager() + manager.show() + + app.exec_() \ No newline at end of file diff --git a/examples/RQData/vnpy.ico b/examples/RQData/vnpy.ico new file mode 100644 index 00000000..f5146d56 Binary files /dev/null and b/examples/RQData/vnpy.ico differ diff --git a/examples/RQData/启动更新服务.bat b/examples/RQData/启动更新服务.bat new file mode 100644 index 00000000..900482a1 --- /dev/null +++ b/examples/RQData/启动更新服务.bat @@ -0,0 +1 @@ +pythonw run.py \ No newline at end of file diff --git a/examples/VnTrader/CTA_setting.json b/examples/VnTrader/CTA_setting.json index c1d08f4a..37bbd46d 100644 --- a/examples/VnTrader/CTA_setting.json +++ b/examples/VnTrader/CTA_setting.json @@ -2,18 +2,18 @@ { "name": "double ema", "className": "DoubleMaStrategy", - "vtSymbol": "rb1805" + "vtSymbol": "rb1905" }, { "name": "atr rsi", "className": "AtrRsiStrategy", - "vtSymbol": "IC1802" + "vtSymbol": "IC1901" }, { "name": "king keltner", "className": "KkStrategy", - "vtSymbol": "IH1802" + "vtSymbol": "IH1901" } ] \ No newline at end of file diff --git a/examples/VnTrader/VT_setting.json b/examples/VnTrader/VT_setting.json index 473017fb..01cefad2 100644 --- a/examples/VnTrader/VT_setting.json +++ b/examples/VnTrader/VT_setting.json @@ -16,5 +16,8 @@ "tdPenalty": ["IF", "IH", "IC"], - "maxDecimal": 4 + "maxDecimal": 4, + + "rqUsername": "", + "rqPassword": "" } \ No newline at end of file diff --git a/examples/VnTrader/run.py b/examples/VnTrader/run.py index b2975c27..53872f1f 100644 --- a/examples/VnTrader/run.py +++ b/examples/VnTrader/run.py @@ -35,7 +35,8 @@ elif system == 'Windows': # 加载上层应用 from vnpy.trader.app import (riskManager, ctaStrategy, - spreadTrading, algoTrading) + spreadTrading, algoTrading, + tradeCopy) #---------------------------------------------------------------------- @@ -67,6 +68,7 @@ def main(): me.addApp(ctaStrategy) me.addApp(spreadTrading) me.addApp(algoTrading) + me.addApp(tradeCopy) # 创建主窗口 mw = MainWindow(me, ee) diff --git a/requirements.txt b/requirements.txt index 6751b70d..9d87d871 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,18 +2,15 @@ pymongo websocket-client msgpack-python qdarkstyle -SortedContainers -futuquant wmi future flask-socketio flask-restful flask-cors gevent-websocket -pyjwt ccxt pyqtgraph qtpy psutil ta-lib - +futu-api diff --git a/vnpy/__init__.py b/vnpy/__init__.py index 708620bf..4d027381 100644 --- a/vnpy/__init__.py +++ b/vnpy/__init__.py @@ -1,4 +1,4 @@ # encoding: UTF-8 -__version__ = '1.9.0' +__version__ = '1.9.2' __author__ = 'Xiaoyou Chen' \ No newline at end of file diff --git a/vnpy/data/README.md b/vnpy/data/README.md index fd16a850..44dac7a5 100644 --- a/vnpy/data/README.md +++ b/vnpy/data/README.md @@ -1,6 +1,5 @@ # vn.data - 数据相关工具 ### 历史数据 -* datayes:通联数据接口 -* shcifco:上海中期接口 -* tq:天勤数据接口 \ No newline at end of file + +* shcifco:上海中期接口 \ No newline at end of file diff --git a/vnpy/data/datayes/__init__.py b/vnpy/data/datayes/__init__.py deleted file mode 100644 index 05395b56..00000000 --- a/vnpy/data/datayes/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from __future__ import absolute_import -from .vndatayes import DatayesApi \ No newline at end of file diff --git a/vnpy/data/datayes/vndatayes.py b/vnpy/data/datayes/vndatayes.py deleted file mode 100644 index ed6e44bd..00000000 --- a/vnpy/data/datayes/vndatayes.py +++ /dev/null @@ -1,53 +0,0 @@ -# encoding: UTF-8 - -'''一个简单的通联数据客户端,主要使用requests开发,比通联官网的python例子更为简洁。''' -from __future__ import print_function - -import os -import requests -import json - - -HTTP_OK = 200 - - -######################################################################## -class DatayesApi(object): - """通联数据API""" - - #---------------------------------------------------------------------- - def __init__(self, token, - domain="http://api.wmcloud.com/data", - version="v1"): - """Constructor""" - self.domain = domain # 主域名 - self.version = version # API版本 - self.token = token # 授权码 - - self.header = {} # http请求头部 - self.header['Connection'] = 'keep_alive' - self.header['Authorization'] = 'Bearer ' + self.token - - #---------------------------------------------------------------------- - def downloadData(self, path, params): - """下载数据""" - url = '/'.join([self.domain, self.version, path]) - r = requests.get(url=url, headers=self.header, params=params) - - if r.status_code != HTTP_OK: - print(u'http请求失败,状态代码%s' %r.status_code) - return None - else: - result = r.json() - if 'retMsg' in result and result['retMsg'] == 'Success': - return result['data'] - else: - if 'retMsg' in result: - print(u'查询失败,返回信息%s' %result['retMsg']) - elif 'message' in result: - print(u'查询失败,返回信息%s' %result['message']) - return None - - - - \ No newline at end of file diff --git a/vnpy/data/tq/test.py b/vnpy/data/tq/test.py deleted file mode 100644 index f91cb0a9..00000000 --- a/vnpy/data/tq/test.py +++ /dev/null @@ -1,49 +0,0 @@ -# encoding: UTF-8 - -from __future__ import print_function -from __future__ import absolute_import -from six import input - -from .vntq import TqApi - -# 接口对象 -api = None - - -#---------------------------------------------------------------------- -def onQuote(symbol): - """Tick更新""" - print('-' * 30) - print('onQuote') - quote = api.get_quote(symbol) - print(quote) - - -#---------------------------------------------------------------------- -def onChart(symbol, seconds): - """K线更新""" - print('-' * 30) - print('onChart') - - if seconds == 0: - serial = api.get_tick_serial(symbol) - else: - serial = api.get_kline_serial(symbol, seconds) - - print(serial) - - -if __name__ == "__main__": - symbol = 'CFFEX.IF1710' - api = TqApi() - api.connect() - - # 订阅Tick推送 - #api.subscribe_quote([symbol], onQuote) - - # 订阅Tick图表 - #api.subscribe_chart(symbol, 0, 100, onChart) - - # 订阅K线图表 - api.subscribe_chart(symbol, 60, 1000, onChart) - input() diff --git a/vnpy/data/tq/vntq.py b/vnpy/data/tq/vntq.py deleted file mode 100644 index e4ca49ea..00000000 --- a/vnpy/data/tq/vntq.py +++ /dev/null @@ -1,282 +0,0 @@ -# encoding: UTF-8 - -""" -对接天勤行情的网关接口,可以提供国内期货的报价/K线/Tick序列等数据的实时推送和历史仿真 -使用时需要在本机先启动一个天勤终端进程 -天勤行情终端: http://www.tq18.cn -天勤接口文档: http://doc.tq18.cn/tq/latest/extension/wsapi/index.html -""" -from __future__ import print_function - - -import json -import threading -import tornado -from tornado import websocket -from sortedcontainers import SortedDict - -######################################################################## -class TqApi(object): - """天勤行情接口""" - - #---------------------------------------------------------------------- - def __init__(self): - """Constructor""" - self.data = {} # 数据存储 - - self.client = None # websocket客户端 - self.requests = [] # 请求缓存 - - self.quote_callback_func = None # tick回调函数 - self.quote_ins_list = [] - self.chart_subscribes = {} # k线回调函数 - - #---------------------------------------------------------------------- - def connect(self): - """ - 建立行情连接。 - """ - self.start() - - # 启动tornado的IO线程 - loop_thread = threading.Thread(target=lambda: tornado.ioloop.IOLoop.current().start()) - loop_thread.setDaemon(True) - loop_thread.start() - - #---------------------------------------------------------------------- - def subscribe_quote(self, ins_list, callback_func=None): - """ - 订阅实时行情. - 指定一个合约列表,订阅其实时报价信息 - 每次调用此函数时,都会覆盖前一次的订阅设定,不在订阅列表里的合约,会停止行情推送 - :param ins_list: ins_list 是一个列表,列出全部需要实时行情的合约代码。注意:天勤接口从0.8版本开始,合约代码格式变更为 交易所代码.合约代码的格式. 交易所代码如下: - CFFEX: 中金所 - SHFE: 上期所 - DCE: 大商所 - CZCE: 郑商所 - INE: 能源交易所(原油) - :param callback_func (可选): callback_func 是一个回调函数,每当有报价数据变更时会触发。此函数应该接受一个参数 ins_id - :example: - 订阅 SHFE.cu1803,CZCE.SR709,CFFEX.IF1709 这三个合约的报价: subscribe_quote(["SHFE.cu1803", ”CZCE.SR709", "CFFEX.IF1709"]) - """ - if callback_func: - self.quote_callback_func = callback_func - self.quote_ins_list = ins_list - - req = { - "aid": "subscribe_quote", - "ins_list": ",".join(ins_list), - } - self.send_json(req) - - #---------------------------------------------------------------------- - def subscribe_chart(self, ins_id, duration_seconds, data_length=200, callback_func=None): - """ - 订阅历史行情序列. - 订阅指定合约及周期的历史行情序列(K线数据序列或Tick数据序列),这些序列数据会持续推送 - :param ins_id: 合约代码,需注意大小写 - :param duration_seconds: 历史数据周期,以秒为单位。目前支持的周期包括: - 3秒,5秒,10秒,15秒,20秒,30秒,1分钟,2分钟,3分钟,5分钟,10分钟,15分钟,20分钟,30分钟,1小时,2小时,4小时,1日 - 特别的,此值指定为0表示订阅tick序列。 - :param data_length: 需要获取的序列长度。每个序列最大支持请求 8964 个数据 - :param callback_func (可选): callback_func 是一个回调函数,每当序列数据变更时会触发。此函数应该接受2个参数 ins_id, duration_seconds - :example: - 订阅 SHFE.cu1803 的1分钟线: subscribe_chart("SHFE.cu1803", 60) - 订阅 CFFEX.IF1709 的tick线: subscribe_chart("CFFEX.IF1709", 0) - """ - chart_id = self._generate_chart_id(ins_id, duration_seconds) - - # 限制最大数据长度 - if data_length > 8964: - data_length = 8964 - - req = { - "aid": "set_chart", - "chart_id": chart_id, - "ins_list": ins_id, - "duration": duration_seconds * 1000000000, - "view_width": data_length, - } - self.send_json(req) - self.chart_subscribes[chart_id] = req - self.chart_subscribes[chart_id]["callback"] = callback_func - - #---------------------------------------------------------------------- - def get_quote(self, ins_id): - """ - 获取报价数据 - :param ins_id: 指定合约代码 - :return: 若指定的数据不存在,返回None,否则返回如下所示的一个dict - { - u'datetime': u'2017-07-26 23:04:21.000001',# tick从交易所发出的时间(按北京时区) - u'instrument_id': u'CZCE.SR801', # 合约代码 - u'last_price': 6122.0, # 最新价 - u'bid_price1': 6121.0, # 买一价 - u'ask_price1': 6122.0, # 卖一价 - u'bid_volume1': 54, # 买一量 - u'ask_volume1': 66, # 卖一量 - u'upper_limit': 6388.0, # 涨停价 - u'lower_limit': 5896.0, # 跌停价 - u'volume': 89252, # 成交量 - u'amount': 5461329880.0, # 成交额 - u'open_interest': 616424, # 持仓量 - u'highest': 6129.0, # 当日最高价 - u'lowest': 6101.0, # 当日最低价 - u'average': 6119.0, # 当日均价 - u'open': 6102.0, # 开盘价 - u'close': u'-', # 收盘价 - u'settlement': u'-', # 结算价 - u'pre_close': 6106.0, # 昨收盘价 - u'pre_settlement': 6142.0 # 昨结算价 - u'pre_open_interest': 616620, # 昨持仓量 - } - """ - return self.data.setdefault("quotes", {}).get(ins_id, None) - - #---------------------------------------------------------------------- - def get_tick_serial(self, ins_id): - """ - 获取tick序列数据 - :param ins_id: 指定合约代码 - :return: 若指定的序列数据不存在,返回None,否则返回如下所示的一个dict - { - u'485107':{ # 每个Tick都有一个唯一编号,在一个序列中,编号总是连续递增的 - u'datetime': 1501074872000000000L, # tick从交易所发出的时间(按北京时区),以nano epoch 方式表示(等于从1970-01-01时刻开始的纳秒数) - u'trading_day': 1501084800000000000L, #交易日, 格式同上 - u'last_price': 3887, # 最新价 - u'bid_price1': 3881, # 买一价 - u'ask_price1': 3886, # 卖一价 - u'bid_volume1': 5, # 买一量 - u'ask_volume1': 1, #卖一量 - u'highest': 3887, # 当日最高价 - u'lowest': 3886, # 当日最低价 - u'volume': 6, # 成交量 - u'open_interest': 1796 # 持仓量 - }, - u'485108': { - ... - } - } - """ - return self.data.setdefault("ticks", {}).setdefault(ins_id, {}).get("data", None) - - #---------------------------------------------------------------------- - def get_kline_serial(self, ins_id, duration_seconds): - """ - 获取k线序列数据 - :param ins_id: 指定合约代码 - :param duration_seconds: 指定K线周期 - :return: 若指定的序列数据不存在,返回None,否则返回如下所示的一个dict - { - u'494835': { # 每根K线都有一个唯一编号,在一个序列中,编号总是连续递增的 - u'datetime': 1501080715000000000L, # K线起点时间(按北京时区),以nano epoch 方式表示(等于从1970-01-01时刻开始的纳秒数) - u'open': 51450, # K线起始时刻的最新价 - u'high': 51450, # K线时间范围内的最高价 - u'low': 51450, # K线时间范围内的最低价 - u'close': 51450, # K线结束时刻的最新价 - u'volume': 0, # K线时间范围内的成交量 - u'open_oi': 27354, # K线起始时刻的持仓量 - u'close_oi': 27354 # K线结束时刻的持仓量 - }, - u'494836': { - ... - } - } - """ - dur_id = "%d" % (duration_seconds * 1000000000) - return self.data.setdefault("klines", {}).setdefault(ins_id, {}).setdefault(dur_id, {}).get("data", None) - - #---------------------------------------------------------------------- - @tornado.gen.coroutine - def start(self): - """启动websocket客户端""" - self.client = yield tornado.websocket.websocket_connect(url="ws://127.0.0.1:7777/") - - # 发出所有缓存的请求 - for req in self.requests: - self.client.write_message(req) - self.requests = [] - - # 协程式读取数据 - while True: - msg = yield self.client.read_message() - self.on_receive_msg(msg) - - #---------------------------------------------------------------------- - def send_json(self, obj): - """发送JSON内容""" - s = json.dumps(obj) - - # 如果已经创建了客户端则直接发出请求 - if self.client: - self.client.write_message(s) - # 否则缓存在请求缓存中 - else: - self.requests.append(s) - - #---------------------------------------------------------------------- - def on_receive_msg(self, msg): - """收到数据推送""" - pack = json.loads(msg) - - if 'data' in pack: - l = pack["data"] - else: - print(u'on_receive_msg收到的数据中没有data字段,数据内容%s' %str(pack)) - return - - for data in l: - # 合并更新数据字典 - self._merge_obj(self.data, data) - # 遍历更新内容并调用回调函数 - for selector, section in data.items(): - if selector == "quotes": - if self.quote_callback_func: - for ins_id in section.keys(): - if ins_id in self.quote_ins_list: - self.quote_callback_func(ins_id) - - elif selector == "ticks": - for ins_id in section.keys(): - chart_id = self._generate_chart_id(ins_id, 0) - sub_info = self.chart_subscribes.get(chart_id, None) - tick_serial = self.get_tick_serial(ins_id) - if tick_serial and sub_info: - while len(tick_serial) > sub_info["view_width"]: - tick_serial.popitem(last=False) - callback_func = sub_info["callback"] - if callback_func: - callback_func(ins_id, 0) - - elif selector == "klines": - for ins_id, sub_section in section.items(): - for dur_nanoseconds in sub_section.keys(): - dur_seconds = int(dur_nanoseconds) / 1000000000 - chart_id = self._generate_chart_id(ins_id, dur_seconds) - sub_info = self.chart_subscribes.get(chart_id, None) - kline_serial = self.get_kline_serial(ins_id, dur_seconds) - if kline_serial and sub_info: - while len(kline_serial) > sub_info["view_width"]: - kline_serial.popitem(last=False) - callback_func = sub_info["callback"] - if callback_func: - callback_func(ins_id, dur_seconds) - - #---------------------------------------------------------------------- - def _merge_obj(self, result, obj): - """合并对象""" - for key, value in obj.items(): - if value is None: - result.pop(key, None) - elif isinstance(value, dict): - target = result.setdefault(key, SortedDict()) - self._merge_obj(target, value) - else: - result[key] = value - - #---------------------------------------------------------------------- - def _generate_chart_id(self, ins_id, duration_seconds): - """生成图表编号""" - chart_id = "VN_%s_%d" % (ins_id, duration_seconds) - chart_id = chart_id.replace(".", "_") - return chart_id \ No newline at end of file diff --git a/vnpy/trader/VT_setting.json b/vnpy/trader/VT_setting.json index 473017fb..01cefad2 100644 --- a/vnpy/trader/VT_setting.json +++ b/vnpy/trader/VT_setting.json @@ -16,5 +16,8 @@ "tdPenalty": ["IF", "IH", "IC"], - "maxDecimal": 4 + "maxDecimal": 4, + + "rqUsername": "", + "rqPassword": "" } \ No newline at end of file diff --git a/vnpy/trader/app/algoTrading/algo/stopAlgo.py b/vnpy/trader/app/algoTrading/algo/stopAlgo.py index 0985d96b..df12514c 100644 --- a/vnpy/trader/app/algoTrading/algo/stopAlgo.py +++ b/vnpy/trader/app/algoTrading/algo/stopAlgo.py @@ -70,7 +70,7 @@ class StopAlgo(AlgoTemplate): func = self.sell - self.vtOrderID = func(self.vtSymbol, price, self.volume, offset=self.offset) + self.vtOrderID = func(self.vtSymbol, price, self.totalVolume, offset=self.offset) msg = u'停止单已触发,代码:%s,方向:%s, 价格:%s,数量:%s,开平:%s' %(self.vtSymbol, self.direction, diff --git a/vnpy/trader/app/ctaStrategy/ctaEngine.py b/vnpy/trader/app/ctaStrategy/ctaEngine.py index a45e1db2..385bb6eb 100644 --- a/vnpy/trader/app/ctaStrategy/ctaEngine.py +++ b/vnpy/trader/app/ctaStrategy/ctaEngine.py @@ -20,6 +20,7 @@ from vnpy.trader.vtObject import VtTickData, VtBarData from vnpy.trader.vtGateway import VtSubscribeReq, VtOrderReq, VtCancelOrderReq, VtLogData from vnpy.trader.vtFunction import todayDate, getJsonPath from vnpy.trader.app import AppEngine +from vnpy.trader.vtGlobal import globalSetting from .ctaBase import * from .strategy import STRATEGY_CLASS @@ -74,6 +75,15 @@ class CtaEngine(AppEngine): # 引擎类型为实盘 self.engineType = ENGINETYPE_TRADING + # RQData数据服务 + self.rq = None + + # RQData能获取的合约代码列表 + self.rqSymbolSet = set() + + # 初始化RQData服务 + self.initRqData() + # 注册日式事件类型 self.mainEngine.registerLogEvent(EVENT_CTA_LOG) @@ -343,6 +353,12 @@ class CtaEngine(AppEngine): #---------------------------------------------------------------------- def loadBar(self, dbName, collectionName, days): """从数据库中读取Bar数据,startDate是datetime对象""" + # 优先尝试从RQData获取数据 + if dbName == MINUTE_DB_NAME and collectionName.upper() in self.rqSymbolSet: + l = self.loadRqBar(collectionName, days) + return l + + # 如果没有则从数据库中读取数据 startDate = self.today - timedelta(days) d = {'datetime':{'$gte':startDate}} @@ -663,4 +679,60 @@ class CtaEngine(AppEngine): return contract.priceTick return 0 - \ No newline at end of file + #---------------------------------------------------------------------- + def initRqData(self): + """初始化RQData客户端""" + # 检查是否填写了RQData配置 + username = globalSetting.get('rqUsername', None) + password = globalSetting.get('rqPassword', None) + if not username or not password: + return + + # 加载RQData + try: + import rqdatac as rq + except ImportError: + return + + # 登录RQData + self.rq = rq + self.rq.init(username, password) + + # 获取本日可交易合约代码 + try: + df = self.rq.all_instruments(type='Future', date=datetime.now()) + for ix, row in df.iterrows(): + self.rqSymbolSet.add(row['order_book_id']) + except RuntimeError: + pass + + #---------------------------------------------------------------------- + def loadRqBar(self, symbol, days): + """从RQData加载K线数据""" + endDate = datetime.now() + startDate = endDate - timedelta(days) + + df = self.rq.get_price(symbol.upper(), + frequency='1m', + fields=['open', 'high', 'low', 'close', 'volume'], + start_date=startDate, + end_date=endDate) + + l = [] + + for ix, row in df.iterrows(): + bar = VtBarData() + bar.symbol = symbol + bar.vtSymbol = symbol + bar.open = row['open'] + bar.high = row['high'] + bar.low = row['low'] + bar.close = row['close'] + bar.volume = row['volume'] + bar.datetime = row.name + bar.date = bar.datetime.strftime("%Y%m%d") + bar.time = bar.datetime.strftime("%H:%M:%S") + + l.append(bar) + + return l \ No newline at end of file diff --git a/vnpy/trader/app/spreadTrading/stEngine.py b/vnpy/trader/app/spreadTrading/stEngine.py index 43339eab..69bdd3ab 100644 --- a/vnpy/trader/app/spreadTrading/stEngine.py +++ b/vnpy/trader/app/spreadTrading/stEngine.py @@ -35,7 +35,7 @@ class StDataEngine(object): # 腿、价差相关字典 self.legDict = {} # vtSymbol:StLeg - self.spreadDict = {} # name:StSpread + self.spreadDict = OrderedDict() # name:StSpread self.vtSymbolSpreadDict = {} # vtSymbol:StSpread self.registerEvent() diff --git a/vnpy/trader/app/spreadTrading/uiStWidget.py b/vnpy/trader/app/spreadTrading/uiStWidget.py index d206745e..d382b894 100644 --- a/vnpy/trader/app/spreadTrading/uiStWidget.py +++ b/vnpy/trader/app/spreadTrading/uiStWidget.py @@ -462,9 +462,10 @@ class StAlgoManager(QtWidgets.QTableWidget): algoEngine = self.algoEngine l = self.algoEngine.getAllAlgoParams() - self.setRowCount(len(l)) - for row, d in enumerate(l): + for d in l: + self.insertRow(0) + cellSpreadName = QtWidgets.QTableWidgetItem(d['spreadName']) cellAlgoName = QtWidgets.QTableWidgetItem(d['algoName']) cellNetPos = QtWidgets.QTableWidgetItem('0') @@ -477,17 +478,17 @@ class StAlgoManager(QtWidgets.QTableWidget): comboMode = StModeComboBox(algoEngine, d['spreadName'], d['mode']) buttonActive = StActiveButton(algoEngine, d['spreadName']) - self.setItem(row, 0, cellSpreadName) - self.setItem(row, 1, cellAlgoName) - self.setItem(row, 2, cellNetPos) - self.setCellWidget(row, 3, spinBuyPrice) - self.setCellWidget(row, 4, spinSellPrice) - self.setCellWidget(row, 5, spinCoverPrice) - self.setCellWidget(row, 6, spinShortPrice) - self.setCellWidget(row, 7, spinMaxOrderSize) - self.setCellWidget(row, 8, spinMaxPosSize) - self.setCellWidget(row, 9, comboMode) - self.setCellWidget(row, 10, buttonActive) + self.setItem(0, 0, cellSpreadName) + self.setItem(0, 1, cellAlgoName) + self.setItem(0, 2, cellNetPos) + self.setCellWidget(0, 3, spinBuyPrice) + self.setCellWidget(0, 4, spinSellPrice) + self.setCellWidget(0, 5, spinCoverPrice) + self.setCellWidget(0, 6, spinShortPrice) + self.setCellWidget(0, 7, spinMaxOrderSize) + self.setCellWidget(0, 8, spinMaxPosSize) + self.setCellWidget(0, 9, comboMode) + self.setCellWidget(0, 10, buttonActive) buttonActive.signalActive.connect(spinBuyPrice.algoActiveChanged) buttonActive.signalActive.connect(spinSellPrice.algoActiveChanged) diff --git a/vnpy/trader/app/tradeCopy/__init__.py b/vnpy/trader/app/tradeCopy/__init__.py new file mode 100644 index 00000000..6f445281 --- /dev/null +++ b/vnpy/trader/app/tradeCopy/__init__.py @@ -0,0 +1,10 @@ +# encoding: UTF-8 + +from .tcEngine import TcEngine +from .uiTcWidget import TcManager + +appName = 'TradeCopy' +appDisplayName = u'交易复制' +appEngine = TcEngine +appWidget = TcManager +appIco = 'tc.ico' \ No newline at end of file diff --git a/vnpy/trader/app/tradeCopy/tc.ico b/vnpy/trader/app/tradeCopy/tc.ico new file mode 100644 index 00000000..e7ee47a9 Binary files /dev/null and b/vnpy/trader/app/tradeCopy/tc.ico differ diff --git a/vnpy/trader/app/tradeCopy/tcEngine.py b/vnpy/trader/app/tradeCopy/tcEngine.py new file mode 100644 index 00000000..446b0471 --- /dev/null +++ b/vnpy/trader/app/tradeCopy/tcEngine.py @@ -0,0 +1,331 @@ +# encoding: UTF-8 + +from collections import defaultdict + +from vnpy.event import Event +from vnpy.rpc import RpcClient, RpcServer +from vnpy.trader.vtEvent import (EVENT_POSITION, EVENT_TRADE, + EVENT_TIMER, EVENT_ORDER) +from vnpy.trader.vtConstant import (DIRECTION_LONG, DIRECTION_SHORT, + OFFSET_OPEN, OFFSET_CLOSE, PRICETYPE_LIMITPRICE, + OFFSET_CLOSEYESTERDAY, OFFSET_CLOSETODAY, + STATUS_REJECTED) +from vnpy.trader.vtObject import VtOrderReq, VtCancelOrderReq, VtLogData, VtSubscribeReq + + +EVENT_TC_LOG = 'eTcLog' + + +######################################################################## +class TcEngine(object): + """交易复制引擎""" + MODE_PROVIDER = 1 + MODE_SUBSCRIBER = 2 + + #---------------------------------------------------------------------- + def __init__(self, mainEngine, eventEngine): + """Constructor""" + self.mainEngine = mainEngine + self.eventEngine = eventEngine + + self.mode = None # Subscriber/Provider + self.posDict = defaultdict(int) # vtPositionName:int + self.targetDict = defaultdict(int) # vtPositionName:int + self.copyRatio = 1 + self.interval = 1 + self.subscribeSet = set() + + self.count = 0 + self.server = None # RPC Server + self.client = None # RPC Client + + self.registerEvent() + + #---------------------------------------------------------------------- + def startProvider(self, repAddress, pubAddress, interval): + """""" + self.mode = self.MODE_PROVIDER + self.interval = interval + + if not self.server: + self.server = RpcServer(repAddress, pubAddress) + self.server.usePickle() + self.server.register(self.getPos) + self.server.start() + + self.writeLog(u'启动发布者模式(如需修改通讯地址请重启程序)') + + #---------------------------------------------------------------------- + def startSubscriber(self, reqAddress, subAddress, copyRatio): + """""" + self.mode = self.MODE_SUBSCRIBER + self.copyRatio = copyRatio + + if not self.client: + self.client = TcClient(self, reqAddress, subAddress) + self.client.usePickle() + self.client.subscribeTopic('') + self.client.start() + + self.writeLog(u'启动订阅者模式,运行时请不要执行其他交易操作') + self.initTarget() + + #---------------------------------------------------------------------- + def stop(self): + """""" + if self.client: + self.client.stop() + self.writeLog(u'订阅者模式已停止') + + if self.server: + self.server.stop() + self.writeLog(u'发布者模式已停止') + + self.mode = None + + #---------------------------------------------------------------------- + def registerEvent(self): + """""" + self.eventEngine.register(EVENT_POSITION, self.processPositionEvent) + self.eventEngine.register(EVENT_TRADE, self.processTradeEvent) + self.eventEngine.register(EVENT_TIMER, self.processTimerEvent) + self.eventEngine.register(EVENT_ORDER, self.processOrderEvent) + + #---------------------------------------------------------------------- + def checkAndTrade(self, vtSymbol): + """""" + if self.checkNoWorkingOrder(vtSymbol): + self.newOrder(vtSymbol) + else: + self.cancelOrder(vtSymbol) + + #---------------------------------------------------------------------- + def processTimerEvent(self, event): + """""" + if self.mode != self.MODE_PROVIDER: + return + + self.count += 1 + if self.count < self.interval: + return + self.count = 0 + + for vtPositionName in self.posDict.keys(): + self.publishPos(vtPositionName) + + #---------------------------------------------------------------------- + def processTradeEvent(self, event): + """""" + trade = event.dict_['data'] + + vtPositionName = '.'.join([trade.vtSymbol, trade.direction]) + + if trade.offset == OFFSET_OPEN: + self.posDict[vtPositionName] += trade.volume + else: + self.posDict[vtPositionName] -= trade.volume + + if self.mode == self.MODE_PROVIDER: + self.publishPos(vtPositionName) + + #---------------------------------------------------------------------- + def processPositionEvent(self, event): + """""" + position = event.dict_['data'] + self.posDict[position.vtPositionName] = position.position + + #---------------------------------------------------------------------- + def processOrderEvent(self, event): + """""" + order = event.dict_['data'] + if order.status == STATUS_REJECTED: + self.writeLog(u'监控到委托拒单,停止运行') + self.stop() + + #---------------------------------------------------------------------- + def publishPos(self, vtPositionName): + """""" + l = vtPositionName.split('.') + direction = l[-1] + vtSymbol = vtPositionName.replace('.' + direction, '') + + data = { + 'vtSymbol': vtSymbol, + 'vtPositionName': vtPositionName, + 'pos': self.posDict[vtPositionName] + } + self.server.publish('', data) + + #---------------------------------------------------------------------- + def updatePos(self, data): + """""" + vtSymbol = data['vtSymbol'] + if vtSymbol not in self.subscribeSet: + contract = self.mainEngine.getContract(vtSymbol) + + req = VtSubscribeReq() + req.symbol = contract.symbol + req.exchange = contract.exchange + self.mainEngine.subscribe(req, contract.gatewayName) + + vtPositionName = data['vtPositionName'] + target = int(data['pos'] * self.copyRatio) + self.targetDict[vtPositionName] = target + + self.checkAndTrade(vtSymbol) + + #---------------------------------------------------------------------- + def newOrder(self, vtSymbol): + """""" + for vtPositionName in self.targetDict.keys(): + if vtSymbol not in vtPositionName: + continue + + pos = self.posDict[vtPositionName] + target = self.targetDict[vtPositionName] + if pos == target: + continue + + contract = self.mainEngine.getContract(vtSymbol) + tick = self.mainEngine.getTick(vtSymbol) + if not tick: + return + + req = VtOrderReq() + req.symbol = contract.symbol + req.exchange = contract.exchange + req.priceType = PRICETYPE_LIMITPRICE + req.volume = abs(target - pos) + + # Open position + if target > pos: + req.offset = OFFSET_OPEN + + if DIRECTION_LONG in vtPositionName: + req.direction = DIRECTION_LONG + if tick.upperLimit: + req.price = tick.upperLimit + else: + req.price = tick.askPrice1 + elif DIRECTION_SHORT in vtPositionName: + req.direction = DIRECTION_SHORT + if tick.lowerLimit: + req.price = tick.lowerLimit + else: + req.price = tick.bidPrice1 + + self.mainEngine.sendOrder(req, contract.gatewayName) + + # Close position + elif target < pos: + req.offset = OFFSET_CLOSE + + if DIRECTION_LONG in vtPositionName: + req.direction = DIRECTION_SHORT + if tick.upperLimit: + req.price = tick.upperLimit + else: + req.price = tick.askPrice1 + + elif DIRECTION_SHORT in vtPositionName: + req.direction = DIRECTION_LONG + if tick.lowerLimit: + req.price = tick.lowerLimit + else: + req.price = tick.bidPrice1 + + # Use auto-convert for solving today/yesterday position problem + reqList = self.mainEngine.convertOrderReq(req) + for convertedReq in reqList: + self.mainEngine.sendOrder(convertedReq, contract.gatewayName) + + # Write log + msg = u'发出%s委托 %s%s %s@%s' %(vtSymbol, req.direction, req.offset, + req.price, req.volume) + self.writeLog(msg) + + #---------------------------------------------------------------------- + def cancelOrder(self, vtSymbol): + """ + Cancel all orders of a certain vtSymbol + """ + l = self.mainEngine.getAllWorkingOrders() + for order in l: + if order.vtSymbol == vtSymbol: + req = VtCancelOrderReq() + req.orderID = order.orderID + req.frontID = order.frontID + req.sessionID = order.sessionID + req.symbol = order.symbol + req.exchange = order.exchange + self.mainEngine.cancelOrder(req, order.gatewayName) + + self.writeLog(u'撤销%s全部活动中委托' %vtSymbol) + + #---------------------------------------------------------------------- + def checkNoWorkingOrder(self, vtSymbol): + """ + Check if there is still any working orders of a certain vtSymbol + """ + l = self.mainEngine.getAllWorkingOrders() + for order in l: + if order.vtSymbol == vtSymbol: + return False + + return True + + #---------------------------------------------------------------------- + def writeLog(self, msg): + """""" + log = VtLogData() + log.logContent = msg + + event = Event(EVENT_TC_LOG) + event.dict_['data'] = log + self.eventEngine.put(event) + + #---------------------------------------------------------------------- + def getPos(self): + """ + Get currenct position data of provider + """ + return dict(self.posDict) + + #---------------------------------------------------------------------- + def initTarget(self): + """ + Init target data of subscriber based on position data from provider + """ + d = self.client.getPos() + for vtPositionName, pos in d.items(): + l = vtPositionName.split('.') + direction = l[-1] + vtSymbol = vtPositionName.replace('.' + direction, '') + + data = { + 'vtPositionName': vtPositionName, + 'vtSymbol': vtSymbol, + 'pos': pos + } + self.updatePos(data) + + self.writeLog(u'目标仓位初始化完成') + + +######################################################################## +class TcClient(RpcClient): + """""" + + #---------------------------------------------------------------------- + def __init__(self, engine, reqAddress, subAddress): + """Constructor""" + super(TcClient, self).__init__(reqAddress, subAddress) + + self.engine = engine + + #---------------------------------------------------------------------- + def callback(self, topic, data): + """""" + self.engine.updatePos(data) + + \ No newline at end of file diff --git a/vnpy/trader/app/tradeCopy/uiTcWidget.py b/vnpy/trader/app/tradeCopy/uiTcWidget.py new file mode 100644 index 00000000..5cdfc9d4 --- /dev/null +++ b/vnpy/trader/app/tradeCopy/uiTcWidget.py @@ -0,0 +1,199 @@ +# encoding: UTF-8 + +import shelve + +from vnpy.event import Event +from vnpy.trader.uiQt import QtCore, QtGui, QtWidgets +from vnpy.trader.vtFunction import getTempPath + +from .tcEngine import EVENT_TC_LOG + + +######################################################################## +class TcManager(QtWidgets.QWidget): + """""" + REQ_ADDRESS = 'tcp://localhost:2015' + SUB_ADDRESS = 'tcp://localhost:2018' + REP_ADDRESS = 'tcp://*:2015' + PUB_ADDRESS = 'tcp://*:2018' + COPY_RATIO = '1' + INTERVAL = '1' + + settingFileName = 'TradeCopy.vt' + settingFilePath = getTempPath(settingFileName) + + signal = QtCore.Signal(type(Event())) + + #---------------------------------------------------------------------- + def __init__(self, tcEngine, eventEngine, parent=None): + """Constructor""" + super(TcManager, self).__init__(parent) + + self.tcEngine = tcEngine + self.eventEngine = eventEngine + + self.initUi() + self.loadSetting() + self.registerEvent() + + self.tcEngine.writeLog(u'欢迎使用TradeCopy交易复制模块') + + #---------------------------------------------------------------------- + def initUi(self): + """""" + self.setWindowTitle(u'交易复制') + self.setMinimumWidth(700) + self.setMinimumHeight(700) + + # 创建组件 + self.lineReqAddress = QtWidgets.QLineEdit(self.REQ_ADDRESS) + self.lineSubAddress= QtWidgets.QLineEdit(self.SUB_ADDRESS) + self.lineRepAddress = QtWidgets.QLineEdit(self.REP_ADDRESS) + self.linePubAddress = QtWidgets.QLineEdit(self.PUB_ADDRESS) + + validator = QtGui.QDoubleValidator() + validator.setBottom(0) + self.lineCopyRatio = QtWidgets.QLineEdit() + self.lineCopyRatio.setValidator(validator) + self.lineCopyRatio.setText(self.COPY_RATIO) + + validator2 = QtGui.QIntValidator() + validator2.setBottom(1) + self.lineInterval = QtWidgets.QLineEdit() + self.lineInterval.setValidator(validator2) + self.lineInterval.setText(self.INTERVAL) + + self.buttonProvider = QtWidgets.QPushButton(u'启动发布者') + self.buttonProvider.clicked.connect(self.startProvider) + + self.buttonSubscriber = QtWidgets.QPushButton(u'启动订阅者') + self.buttonSubscriber.clicked.connect(self.startSubscriber) + + self.buttonStopEngine = QtWidgets.QPushButton(u'停止') + self.buttonStopEngine.clicked.connect(self.stopEngine) + self.buttonStopEngine.setEnabled(False) + + self.buttonResetAddress = QtWidgets.QPushButton(u'重置地址') + self.buttonResetAddress.clicked.connect(self.resetAddress) + + self.logMonitor = QtWidgets.QTextEdit() + self.logMonitor.setReadOnly(True) + + self.widgetList = [ + self.lineCopyRatio, + self.lineInterval, + self.linePubAddress, + self.lineSubAddress, + self.lineRepAddress, + self.lineReqAddress, + self.buttonProvider, + self.buttonSubscriber, + self.buttonResetAddress + ] + + # 布局 + QLabel = QtWidgets.QLabel + grid = QtWidgets.QGridLayout() + + grid.addWidget(QLabel(u'响应地址'), 0, 0) + grid.addWidget(self.lineRepAddress, 0, 1) + grid.addWidget(QLabel(u'请求地址'), 0, 2) + grid.addWidget(self.lineReqAddress, 0, 3) + + grid.addWidget(QLabel(u'发布地址'), 1, 0) + grid.addWidget(self.linePubAddress, 1, 1) + grid.addWidget(QLabel(u'订阅地址'), 1, 2) + grid.addWidget(self.lineSubAddress, 1, 3) + + grid.addWidget(QLabel(u'发布间隔(秒)'), 2, 0) + grid.addWidget(self.lineInterval, 2, 1) + grid.addWidget(QLabel(u'复制比例(倍)'), 2, 2) + grid.addWidget(self.lineCopyRatio, 2, 3) + + grid.addWidget(self.buttonProvider, 3, 0, 1, 2) + grid.addWidget(self.buttonSubscriber, 3, 2, 1, 2) + grid.addWidget(self.buttonStopEngine, 4, 0, 1, 2) + grid.addWidget(self.buttonResetAddress, 4, 2, 1, 2) + + grid.addWidget(self.logMonitor, 5, 0, 1, 4) + + self.setLayout(grid) + + #---------------------------------------------------------------------- + def saveSetting(self): + """""" + f = shelve.open(self.settingFilePath) + f['repAddress'] = self.lineRepAddress.text() + f['reqAddress'] = self.lineReqAddress.text() + f['pubAddress'] = self.linePubAddress.text() + f['subAddress'] = self.lineSubAddress.text() + f['copyRatio'] = self.lineCopyRatio.text() + f['interval'] = self.lineInterval.text() + f.close() + + #---------------------------------------------------------------------- + def loadSetting(self): + """""" + f = shelve.open(self.settingFilePath) + if f: + self.lineRepAddress.setText(f['repAddress']) + self.lineReqAddress.setText(f['reqAddress']) + self.linePubAddress.setText(f['pubAddress']) + self.lineSubAddress.setText(f['subAddress']) + self.lineCopyRatio.setText(f['copyRatio']) + self.lineInterval.setText(f['interval']) + f.close() + + #---------------------------------------------------------------------- + def resetAddress(self): + """""" + self.lineReqAddress.setText(self.REQ_ADDRESS) + self.lineRepAddress.setText(self.REP_ADDRESS) + self.linePubAddress.setText(self.PUB_ADDRESS) + self.lineSubAddress.setText(self.SUB_ADDRESS) + + #---------------------------------------------------------------------- + def stopEngine(self): + """""" + self.tcEngine.stop() + + for widget in self.widgetList: + widget.setEnabled(True) + self.buttonStopEngine.setEnabled(False) + + #---------------------------------------------------------------------- + def registerEvent(self): + """""" + self.signal.connect(self.processLogEvent) + self.eventEngine.register(EVENT_TC_LOG, self.signal.emit) + + #---------------------------------------------------------------------- + def processLogEvent(self, event): + """""" + log = event.dict_['data'] + txt = '%s: %s' %(log.logTime, log.logContent) + self.logMonitor.append(txt) + + #---------------------------------------------------------------------- + def startProvider(self): + """""" + repAddress = str(self.lineRepAddress.text()) + pubAddress = str(self.linePubAddress.text()) + interval = int(self.lineInterval.text()) + self.tcEngine.startProvider(repAddress, pubAddress, interval) + + for widget in self.widgetList: + widget.setEnabled(False) + self.buttonStopEngine.setEnabled(True) + + #---------------------------------------------------------------------- + def startSubscriber(self): + """""" + reqAddress = str(self.lineReqAddress.text()) + subAddress = str(self.lineSubAddress.text()) + copyRatio = float(self.lineCopyRatio.text()) + self.tcEngine.startSubscriber(reqAddress, subAddress, copyRatio) + + for widget in self.widgetList: + widget.setEnabled(False) + self.buttonStopEngine.setEnabled(True) \ No newline at end of file diff --git a/vnpy/trader/gateway/bithumb/bithumbGateway.py b/vnpy/trader/gateway/bithumb/bithumbGateway.py deleted file mode 100644 index 42c3e8e5..00000000 --- a/vnpy/trader/gateway/bithumb/bithumbGateway.py +++ /dev/null @@ -1,742 +0,0 @@ -# encoding: UTF-8 - -''' -vnpy.api.bithumb的gateway接入 -''' -import json -from collections import defaultdict -from datetime import datetime - -from vnpy.api.bithumb import BithumbRestApi -from vnpy.trader.vtFunction import getJsonPath -from vnpy.trader.vtGateway import * - -# 方向映射 -directionMap = { - constant.DIRECTION_LONG: 'bid', - constant.DIRECTION_SHORT: 'ask' -} -directionMapReverse = {v: k for k, v in directionMap.items()} - -# 从https://www.bithumb.com/u1/US127中https://api.bithumb.com/trade/place的API说明中得到 -minimum_ticks = { - 'BTC': 0.001, - 'ETH': 0.01, - 'DASH': 0.01, - 'LTC': 0.01, - 'ETC': 0.1, - 'XRP': 10, - 'BCH': 0.001, - 'XMR': 0.01, - 'ZEC': 0.01, - 'QTUM': 0.1, - 'BTG': 0.1, - 'EOS': 0.1, - 'ICX': 1, - 'VEN': 1, - 'TRX': 100, - 'ELF': 10, - 'MITH': 10, - 'MCO': 10, - 'OMG': 0.1, - 'KNC': 1, - 'GNT': 10, - 'HSR': 1, - 'ZIL': 100, - 'ETHOS': 1, - 'PAY': 1, - 'WAX': 10, - 'POWR': 10, - 'LRC': 10, - 'GTO': 10, - 'STEEM': 10, - 'STRAT': 1, - 'ZRX': 1, - 'REP': 0.1, - 'AE': 1, - 'XEM': 10, - 'SNT': 10, - 'ADA': 10 -} - - -######################################################################## -class BithumbGateway(VtGateway): - - #---------------------------------------------------------------------- - def __init__(self, eventEngine, gatewayName='BithumbGateway'): - super(BithumbGateway, self).__init__(eventEngine, gatewayName) - - self.restApi = RestApi(self) # type: RestApi - - self.qryEnabled = False - - self.fileName = self.gatewayName + '_connect.json' - self.filePath = getJsonPath(self.fileName, __file__) - - #---------------------------------------------------------------------- - def connect(self): - """连接""" - try: - f = open(self.filePath) - except IOError: - log = VtLogData() - log.gatewayName = self.gatewayName - log.logContent = u'读取连接配置出错,请检查' - self.onLog(log) - return - - # 解析json文件 - setting = json.load(f) - f.close() - try: - apiKey = str(setting['apiKey']) - apiSecret = str(setting['apiSecret']) - # symbols = setting['symbols'] - except KeyError: - log = VtLogData() - log.gatewayName = self.gatewayName - log.logContent = u'连接配置缺少字段,请检查' - self.onLog(log) - return - - # 创建行情和交易接口对象 - self.restApi.connect(apiKey, apiSecret) - - # 初始化并启动查询 - self.initQuery() - - #---------------------------------------------------------------------- - def subscribe(self, subscribeReq): - """订阅行情""" - pass - - #---------------------------------------------------------------------- - def sendOrder(self, orderReq): - """发单""" - return self.restApi.sendOrder(orderReq) - - #---------------------------------------------------------------------- - def cancelOrder(self, cancelOrderReq): - """撤单""" - self.restApi.cancelOrder(cancelOrderReq) - - #---------------------------------------------------------------------- - def close(self): - """关闭""" - self.restApi.close() - - #---------------------------------------------------------------------- - def initQuery(self): - """初始化连续查询""" - # if self.qryEnabled: - # 需要循环的查询函数列表 - # self.qryFunctionList = [self.restApi.qryTickers, - # self.restApi.qryDepth, - # self.restApi.qryPosition, - # self.restApi.qryOrder] - # - # self.qryCount = 0 # 查询触发倒计时 - # self.qryTrigger = 1 # 查询触发点 - # self.qryNextFunction = 0 # 上次运行的查询函数索引 - # - # self.startQuery() - pass - - #---------------------------------------------------------------------- - def query(self, event): - """注册到事件处理引擎上的查询函数""" - # self.qryCount += 1 - # - # if self.qryCount > self.qryTrigger: - # # 清空倒计时 - # self.qryCount = 0 - # - # # 执行查询函数 - # function = self.qryFunctionList[self.qryNextFunction] - # function() - # - # # 计算下次查询函数的索引,如果超过了列表长度,则重新设为0 - # self.qryNextFunction += 1 - # if self.qryNextFunction == len(self.qryFunctionList): - # self.qryNextFunction = 0 - pass - - #---------------------------------------------------------------------- - def startQuery(self): - """启动连续查询""" - self.eventEngine.register(EVENT_TIMER, self.query) - - #---------------------------------------------------------------------- - def setQryEnabled(self, qryEnabled): - """设置是否要启动循环查询""" - self.qryEnabled = qryEnabled - - -######################################################################## -# noinspection PyUnusedLocal -class RestApi(BithumbRestApi): - """REST API实现""" - - #---------------------------------------------------------------------- - def __init__(self, gateway): - """Constructor""" - super(RestApi, self).__init__() - - self.gateway = gateway # type: BithumbGateway # gateway对象 - self.gatewayName = gateway.gatewayName # gateway对象名称 - - self.localID = 0 - self.tradeID = 0 - - self.orders = {} # type: dict[str, VtOrderData] # localID:order - self.sysLocalDict = {} # type: dict[str, str] # sysID: localID - self.localSysDict = {} # type: dict[str, str] # localID: sysID - self.reqOrderDict = {} # type: dict[int, VtOrderData] # reqID:order - self.cancelDict = {} # type: dict[str, VtCancelOrderReq] # localID:req - - self.tickDict = {} - - #---------------------------------------------------------------------- - def connect(self, apiKey, apiSecret): - """连接服务器""" - self.init(apiKey, apiSecret) - self.start() - - # self.symbols = symbols - self.writeLog(u'REST API启动成功') - - self.qryContract() - - #---------------------------------------------------------------------- - def writeLog(self, content): - """发出日志""" - log = VtLogData() - log.gatewayName = self.gatewayName - log.logContent = content - self.gateway.onLog(log) - - #---------------------------------------------------------------------- - def generateLocalOrder(self, ): - self.localID += 1 - localID = str(self.localID) - order = VtOrderData() - order.gatewayName = self.gatewayName - order.status = constant.STATUS_UNKNOWN - order.exchange = constant.EXCHANGE_BITHUMB - order.orderID = localID - order.vtOrderID = '.'.join([self.gatewayName, localID]) - self.orders[localID] = order - return order - - #---------------------------------------------------------------------- - def sendOrder(self, orderReq): - """下单""" - req = { - 'order_currency': orderReq.symbol, - 'Payment_currency': orderReq.currency, # todo: 无论如何服务器都会以KRW作为单位 - 'type': directionMap[orderReq.direction], - 'price': int(orderReq.price), - 'units': orderReq.volume - } - - reqid = self.addReq('POST', '/trade/place', self.onSendOrder, postdict=req) - - # 缓存委托数据对象 - order = self.generateLocalOrder() - self.fillLocalOrder(order, - orderReq.symbol, - orderReq.price, - orderReq.volume, - orderReq.direction) - - self.reqOrderDict[reqid] = order - return order.vtOrderID - - #---------------------------------------------------------------------- - def onSendOrder(self, data, reqid): # type: (dict, int)->None - """下单回执""" - if self.checkError(u'委托', data): - return - - order = self.reqOrderDict[reqid] - localID = order.orderID - sysID = data['order_id'] - - self.saveSysIDForOrder(order, sysID) - - self.gateway.onOrder(order) - - # 发出等待的撤单委托 - if localID in self.cancelDict: - req = self.cancelDict[localID] - self.cancelOrder(req) - del self.cancelDict[localID] - - #---------------------------------------------------------------------- - @staticmethod - def fillLocalOrder(order, symbol, price, totalVolume, direction): - order.symbol = symbol - order.vtSymbol = '.'.join([order.symbol, order.exchange]) - order.price = price - order.totalVolume = totalVolume - order.direction = direction - - #---------------------------------------------------------------------- - def saveSysIDForOrder(self, order, sysID): # type: (VtOrderData, str)->None - self.sysLocalDict[sysID] = order.orderID - self.localSysDict[order.orderID] = sysID - - #---------------------------------------------------------------------- - def qryOrder(self, order): # type: (VtOrderData)->None - sysID = self.getSysIDForOrder(order) - req = { - 'currency': order.symbol, - 'order_id': sysID - } - self.addReq('POST', '/info/orders', self.onQryOrders, postdict=req) - - #---------------------------------------------------------------------- - def qryOrders(self, currency='XML'): # type: (VtOrderData)->None - sysID = self.getSysIDForOrder(order) - req = { - 'currency': order.symbol, - } - self.addReq('POST', '/info/orders', self.onQryOrders, postdict=req) - - #---------------------------------------------------------------------- - def onQryOrders(self, data, reqid): - if self.checkError(u'订单查询', data): - return - orders = data['data'] - for detail in orders: - sysID = detail['order_id'] - order = self.getOrderBySysID(sysID) - if not order: - # 查询到了新的order(以前的order) - order = self.generateLocalOrder() - self.fillLocalOrder(order, - detail['order_currency'], - detail['price'], - detail['units'], - directionMapReverse[detail['type']]) - order.tradedVolume = order.totalVolume - detail['units_remaining'] - # todo: payment_currency - # payment_currency = detail['payment_currency'] - self.saveSysIDForOrder(order, sysID) - - # 推送 - self.gateway.onOrder(order) - continue - - originalTradeVolume = order.tradedVolume - order.tradedVolume = newTradeVolume = order.totalVolume - detail['units_remaining'] - - if newTradeVolume != originalTradeVolume: - # 推送更新 - self.gateway.onOrder(order) - - # 尝试更新状态 - # todo: 这一句还未测试,不知道成交之后date_completed是不是就会有值 - order.status = constant.STATUS_ALLTRADED if detail['date_completed'] else order.status - if order.status == constant.STATUS_ALLTRADED: - # 推送成交 - self.pushOrderAsTraded(order) - - #---------------------------------------------------------------------- - def pushOrderAsTraded(self, order): - trade = VtTradeData() - trade.gatewayName = order.gatewayName - trade.symbol = order.symbol - trade.vtSymbol = order.vtSymbol - trade.orderID = order.orderID - trade.vtOrderID = order.vtOrderID - self.tradeID += 1 - trade.tradeID = str(self.tradeID) - trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID]) - trade.direction = order.direction - trade.price = order.price - trade.volume = order.tradedVolume - trade.tradeTime = datetime.now().strftime('%H:%M:%S') - self.gateway.onTrade(trade) - - #---------------------------------------------------------------------- - def cancelOrder(self, cancelOrderReq): # type: (self, VtCancelOrderReq)->None - """""" - localID = cancelOrderReq.orderID - order = self.getOrderByLocalID(localID) - - if self.isOrderPosted(order): - sysID = self.getSysIDForOrder(order) - req = { - 'type': directionMap[order.direction], - 'order_id': sysID, - 'currency': cancelOrderReq.symbol - } - self.addReq('POST', - '/trade/cancel', - callback=lambda data, reqid: self.onCancelOrder(localID, data, reqid), - postdict=req) - else: - self.cancelDict[localID] = cancelOrderReq - - #---------------------------------------------------------------------- - def onCancelOrder(self, localID, data, reqid): - if self.checkError(u'撤单', data): - return - order = self.getOrderByLocalID(localID) - order.status = constant.STATUS_CANCELLED - - #---------------------------------------------------------------------- - def qryContract(self): - """""" - contract = VtContractData() - contract.gatewayName = self.gatewayName - - for symbol, tick in minimum_ticks.items(): - contract.symbol = symbol - contract.exchange = constant.EXCHANGE_BITHUMB - contract.vtSymbol = '.'.join([contract.symbol, contract.exchange]) - contract.name = contract.vtSymbol - contract.productClass = constant.PRODUCT_SPOT - contract.priceTick = tick - contract.size = 1 - self.gateway.onContract(contract) - - #---------------------------------------------------------------------- - def qryPublicTick(self, symbol='ALL'): - """ symbol 可以是'BTC', 'ETC'等等电子货币符号;也可以使用'ALL',表示要获取所有货币的行情""" - url = '/public/ticker/' + symbol - if symbol.upper() == 'ALL': - self.addReq('GET', url, self.onQryMultiPublicTicker) - else: - self.addReq('GET', url, - callback=lambda data, reqid: self.onQrySinglePublicTicker(symbol, data, id)) - - #---------------------------------------------------------------------- - def qryPublicOrderBook(self, symbol='ALL'): - """ symbol 可以是'BTC', 'ETC'等等电子货币符号;也可以使用'ALL',表示要获取所有货币的行情""" - url = '/public/orderbook/' + symbol - if symbol.upper() == 'ALL': - self.addReq('GET', url, self.onQryMultiPublicOrderBook) - else: - self.addReq('GET', url, - callback=lambda data, reqid: self.onQrySinglePublicOrderBook(symbol, data, id)) - - #---------------------------------------------------------------------- - def qryPrivateTick(self, symbol, currency='CNY'): - """""" - req = { - 'order_currency': symbol, - 'payment_currency': currency, - } - self.addReq('POST', '/info/ticker', self.onQryPrivateTicker, postdict=req) - - #---------------------------------------------------------------------- - def qryPosition(self, symbol='ALL'): - """""" - req = { - 'currency': symbol, - } - self.addReq('POST', '/info/balance', self.onQryPosition, postdict=req) - - pass - - #---------------------------------------------------------------------- - def onQryPosition(self, data, reqid): # type: (self, dict, int)->None - """""" - if self.checkError(u'查询持仓', data): - return - - datas = data['data'] # type: dict - - # 先分类一下 - infos = defaultdict(dict) # type: dict[str, dict[str, str]] - for key, val in datas.items(): # type: str, str - split_position = key.rfind('_') - infoType, symbol = key[:split_position], key[split_position+1:] - infos[symbol.upper()][infoType] = val - - for symbol in infos.keys(): - info = infos[symbol] - if symbol == u'LAST': # 过滤掉xcoin_last,这个值表示的是最后一次交易量 - continue - if symbol == u'KRW': - accountData = VtAccountData() - # todo: accountID必须从另一个API获取 - # accountData.accountID = - accountData.balance = info['total'] - accountData.available = info['available'] - self.gateway.onAccount(accountData) - pass - else: - pos = VtPositionData() - pos.gatewayName = self.gatewayName - - pos.symbol = symbol - pos.exchange = constant.EXCHANGE_BITHUMB - pos.vtSymbol = '.'.join([pos.symbol, pos.exchange]) - pos.direction = constant.DIRECTION_NET - pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction]) - pos.position = float(info['total']) - pos.frozen = float(info['in_use']) - - self.gateway.onPosition(pos) - - #---------------------------------------------------------------------- - def parsePublicTickerData(self, symbol, info): - dt = datetime.now() - date = dt.strftime('%Y%m%d') - time = dt.strftime('%H:%M:%S') - - tick = self.getTick(symbol) - - tick.openPrice = float(info['opening_price']) - tick.highPrice = float(info['max_price']) - tick.lowPrice = float(info['min_price']) - tick.lastPrice = float(info['closing_price']) # todo: 也许应该是'buy_price'? - tick.volume = float(info['volume_1day']) - tick.datetime = datetime - tick.date = date - tick.time = time - - # 只有订阅了深度行情才推送 - if tick.bidPrice1: - self.gateway.onTick(tick) - - #---------------------------------------------------------------------- - def onQrySinglePublicTicker(self, symbol, data, reqid): - if self.checkError(u'查询行情', data): - return - - info = data['data'] - self.parsePublicTickerData(symbol=symbol, info=info) - - #---------------------------------------------------------------------- - def onQryMultiPublicTicker(self, data, reqid): - if self.checkError(u'查询行情', data): - return - - for symbol, info in data['data'].items(): - # 里面可能会出现一对:date: int这样的值,所以要过滤掉 - if isinstance(info, dict): - self.parsePublicTickerData(symbol=symbol, info=info) - pass - - #---------------------------------------------------------------------- - def parsePublicOrderBookData(self, symbol, info): - dt = datetime.now() - date = dt.strftime('%Y%m%d') - time = dt.strftime('%H:%M:%S') - - tick = self.getTick(symbol) - - for i in range(5): - tick.__setattr__('askPrice' + str(i + 1), float(info['asks'][i]['price'])) - tick.__setattr__('askVolume' + str(i + 1), float(info['asks'][i]['quantity'])) - - for i in range(5): - tick.__setattr__('bidPrice' + str(i + 1), float(info['bids'][i]['price'])) - tick.__setattr__('bidVolume' + str(i + 1), float(info['bids'][i]['quantity'])) - - tick.datetime = datetime - tick.date = date - tick.time = time - - # 只有订阅了深度行情才推送 - if tick.bidPrice1: - self.gateway.onTick(tick) - - #---------------------------------------------------------------------- - def onQrySinglePublicOrderBook(self, symbol, data, reqid): - if self.checkError(u'五档行情', data): - return - - info = data['data'] - self.parsePublicTickerData(symbol=symbol, info=info) - pass - - #---------------------------------------------------------------------- - def onQryMultiPublicOrderBook(self, data, reqid): - if self.checkError(u'五档行情', data): - return - - for symbol, info in data['data'].items(): - self.parsePublicTickerData(symbol=symbol, info=info) - pass - - #---------------------------------------------------------------------- - def onQryPrivateTicker(self, data, reqid): - pass - - #---------------------------------------------------------------------- - def getTick(self, symbol): - """""" - tick = self.tickDict.get(symbol, None) # type: VtTickData - - if not tick: - tick = VtTickData() - tick.gatewayName = self.gatewayName - tick.symbol = symbol - tick.exchange = constant.EXCHANGE_BITHUMB - tick.vtSymbol = '.'.join([tick.symbol, tick.exchange]) - self.tickDict[symbol] = tick - - return tick - - #---------------------------------------------------------------------- - def checkError(self, name, data): - """""" - status = data.get('status', None) - if status == u'0000': - return False - elif not status: - self.writeLog(u'%s触发错误:%s' % (name, u"未知的响应报文 : %s".format(data))) - return True - - msg = data.get('message', u'unknown') - self.writeLog(u'%s触发错误:%s' % (name, msg)) - return True - - #---------------------------------------------------------------------- - def getOrderByLocalID(self, localID): # type: (str)->VtOrderData - """如果没有该订单,这个函数会出错""" - return self.orders[localID] - - #---------------------------------------------------------------------- - def getOrderByVtOrderID(self, vtOrderId): # type: (str)->VtOrderData - """如果没有该订单,这个函数会出错""" - localID = vtOrderId[vtOrderId.rfind('.') + 1:] - return self.getOrderByLocalID(localID) - - #---------------------------------------------------------------------- - def getOrderBySysID(self, sysID): # type: (str)->VtOrderData - return self.getOrderByLocalID(self.getLocalIDBySysID(sysID)) - - #---------------------------------------------------------------------- - def getLocalIDBySysID(self, sysID): # type: (str)->str - return self.sysLocalDict[sysID] - - #---------------------------------------------------------------------- - def isOrderPosted(self, order): # type: (VtOrderData)->bool - """检查服务器是否响应了一个下单请求,如果已经响应了返回True,否则False""" - return order.orderID in self.localSysDict - - #---------------------------------------------------------------------- - def getSysIDForOrder(self, order): # type: (VtOrderData)->str - return self.localSysDict[order.orderID] - - #---------------------------------------------------------------------- - def hasSysID(self, sysID): - return sysID in self.sysLocalDict - - - -if __name__ == '__main__': - # default test secret: - API_KEY = '0c2f5621ac18d26d51ce640b25eb44f9' - API_SECRET = '62bb8b4e263476f443f8d3dbf0aad6bc' - - api = BithumbRestApi() - api.init(apiKey=API_KEY, apiSecret=API_SECRET) - api.start(1) - - eventEngine = EventEngine2() - rest = RestApi(BithumbGateway(eventEngine=eventEngine)) - rest.connect(API_KEY, API_SECRET) - - translateDict = { - u'\uac70\ub798 \uccb4\uacb0\ub0b4\uc5ed\uc774 ' - u'\uc874\uc7ac\ud558\uc9c0 \uc54a\uc2b5\ub2c8\ub2e4.': "交易记录不存在", - - u'\uac70\ub798 \uc9c4\ud589\uc911\uc778 \ub0b4\uc5ed\uc774 ' - u'\uc874\uc7ac\ud558\uc9c0 \uc54a\uc2b5\ub2c8\ub2e4.': "没有正在进行的交易", - - u'\ub9e4\uc218\uae08\uc561\uc774 \uc0ac\uc6a9\uac00\ub2a5 KRW' - u' \ub97c \ucd08\uacfc\ud558\uc600\uc2b5\ub2c8\ub2e4.': "购买金额超过可用KRW", - - u'\ub9e4\uc218\uac74\uc758 \uc0c1\ud0dc\uac00 \uc9c4\ud589\uc911\uc774 \uc544\ub2d9\ub2c8\ub2e4. ' - u'\ucde8\uc18c\ud560 \uc218 \uc5c6\uc2b5\ub2c8\ub2e4.': "购买查询的状态未在进行中。 它无法取消。", - - u'\uc9c0\uc6d0\ud558\uc9c0 \uc54a\ub294 \ud654\ud3d0\uc785\ub2c8\ub2e4. [347]': "不支持该货币单位。[347]", - } - - def printError(jsonObj): - if rest.checkError('', data=jsonObj): - print('error : ') - msg = jsonObj['message'] - print(translateDict.get(msg, msg)) - - def manualCancelOrder(sysID): - def onTradeCancel(jsonObj, reqid): - print('onTradeCancel : \n{}'.format(jsonObj)) - printError(jsonObj) - - rest.addReq('POST', '/trade/cancel', onTradeCancel, - postdict={'type': 'bid', 'order_id': sysID, 'currency': 'XMR'}) - - def apiCancelOrder(localId): - cancelReq = VtCancelOrderReq() - cancelReq.symbol = order.symbol - cancelReq.orderID = localId - rest.cancelOrder(cancelReq) - - # query tick - rest.qryPublicTick('BTC') - rest.qryPublicTick('ALL') - rest.qryPosition('BTC') - rest.qryPosition('ALL') - - # send order - sendOrderReq = VtOrderReq() - sendOrderReq.symbol = 'XMR' - sendOrderReq.direction = constant.DIRECTION_LONG - sendOrderReq.volume = minimum_ticks['XMR'] - # sendOrderReq.price = 700 - # sendOrderReq.currency = 'CNY' # 不可用 - # sendOrderReq.price = 16.6461 - # sendOrderReq.currency = 'USD' # 不可用 - sendOrderReq.price = 17500 - sendOrderReq.currency = 'KRW' - - vtOrderId = rest.sendOrder(sendOrderReq) - order = rest.getOrderByVtOrderID(vtOrderId) - - # todo: order的状态表示不够清晰 - while not rest.isOrderPosted(order): - time.sleep(0.1) - - sysID = rest.getSysIDForOrder(order) - print("sysID : ") - print(sysID) - - def onOrders(jsonObj, reqid): - print('on_orders : \n{}'.format(jsonObj)) - printError(jsonObj) - - for detail in jsonObj['data']: - sysID = detail['order_id'] - if rest.hasSysID(sysID): - apiCancelOrder(rest.getLocalIDBySysID(sysID)) - else: - manualCancelOrder(sysID) - - after = '1531926544794' # 2018-07-18T15:09:04.794Z - # rest.addReq('POST', '/info/orders', on_orders, - # postdict={'order_id': sysID, 'type': 'bid', 'after': after, 'currency': 'XMR'}) # got - # - # rest.addReq('POST', '/info/orders', on_orders, - # postdict={'order_id': sysID, 'type': 'bid', 'currency': 'XMR'}) # got - # - # rest.addReq('POST', '/info/orders', on_orders, - # postdict={'after': after, 'currency': 'XMR'}) # got - rest.addReq('POST', '/info/orders', onOrders, postdict={'currency': 'XMR'}) # got - - # rest.addReq('POST', '/info/orders', on_orders, - # postdict={'order_id': sysID, 'type': 'bid', 'after': after}) # 没有正在进行的交易 - # - # rest.addReq('POST', '/info/orders', on_orders, postdict={'after': after}) # 没有正在进行的交易 - # rest.addReq('POST', '/info/orders', on_orders, postdict={}) # 没有正在进行的交易 - # rest.addReq('POST', '/info/orders', on_orders, postdict={'currency': 'ALL'}) # 不支持该货币单位 - - raw_input() diff --git a/vnpy/trader/gateway/ctpGateway/ctpGateway.py b/vnpy/trader/gateway/ctpGateway/ctpGateway.py index 3d726019..f681b247 100644 --- a/vnpy/trader/gateway/ctpGateway/ctpGateway.py +++ b/vnpy/trader/gateway/ctpGateway/ctpGateway.py @@ -379,7 +379,7 @@ class CtpMdApi(MdApi): if tick.exchange == EXCHANGE_DCE: tick.date = datetime.now().strftime('%Y%m%d') - # 上交所,SEE,股票期权相关 + # 上交所,SSE,股票期权相关 if tick.exchange == EXCHANGE_SSE: tick.bidPrice2 = data['BidPrice2'] tick.bidVolume2 = data['BidVolume2'] diff --git a/vnpy/trader/gateway/futuGateway/futuGateway.py b/vnpy/trader/gateway/futuGateway/futuGateway.py index bb7eb19c..861f93a2 100644 --- a/vnpy/trader/gateway/futuGateway/futuGateway.py +++ b/vnpy/trader/gateway/futuGateway/futuGateway.py @@ -11,11 +11,11 @@ from time import sleep from datetime import datetime from copy import copy -from futuquant import (OpenQuoteContext, OpenHKTradeContext, OpenUSTradeContext, - RET_ERROR, RET_OK, - TrdEnv, TrdSide, OrderType, OrderStatus, ModifyOrderOp, - StockQuoteHandlerBase, OrderBookHandlerBase, - TradeOrderHandlerBase, TradeDealHandlerBase) +from futu import (OpenQuoteContext, OpenHKTradeContext, OpenUSTradeContext, + RET_ERROR, RET_OK, + TrdEnv, TrdSide, OrderType, OrderStatus, ModifyOrderOp, + StockQuoteHandlerBase, OrderBookHandlerBase, + TradeOrderHandlerBase, TradeDealHandlerBase) from vnpy.trader.vtGateway import * from vnpy.trader.vtConstant import GATEWAYTYPE_INTERNATIONAL @@ -353,7 +353,7 @@ class FutuGateway(VtGateway): #---------------------------------------------------------------------- def qryTrade(self): """查询成交""" - code, data = self.tradeCtx.deal_list_query(self.env) + code, data = self.tradeCtx.deal_list_query("", trd_env=self.env) if code: self.writeError(code, u'查询成交失败:%s' %data) diff --git a/vnpy/trader/gateway/okexfGateway/okexfGateway.py b/vnpy/trader/gateway/okexfGateway/okexfGateway.py index a81ad856..a62fbb22 100644 --- a/vnpy/trader/gateway/okexfGateway/okexfGateway.py +++ b/vnpy/trader/gateway/okexfGateway/okexfGateway.py @@ -388,6 +388,9 @@ class OkexfRestApi(RestClient): #---------------------------------------------------------------------- def onQueryPosition(self, data, request): """""" + if not data['holding']: + return + for d in data['holding'][0]: longPosition = VtPositionData() longPosition.gatewayName = self.gatewayName @@ -454,7 +457,7 @@ class OkexfRestApi(RestClient): """ order = request.extra order.status = STATUS_REJECTED - self.gateway.onOrder(vtOrder) + self.gateway.onOrder(order) #---------------------------------------------------------------------- def onSendOrder(self, data, request): @@ -683,8 +686,8 @@ class OkexfWebsocketApi(WebsocketClient): for n, buf in enumerate(data['asks']): price, volume = buf[:2] - tick.__setattr__('askPrice%s' %(n+1), float(price)) - tick.__setattr__('askVolume%s' %(n+1), int(volume)) + tick.__setattr__('askPrice%s' %(5-n), float(price)) + tick.__setattr__('askVolume%s' %(5-n), int(volume)) dt = datetime.fromtimestamp(data['timestamp']/1000) tick.date = dt.strftime('%Y%m%d') diff --git a/vnpy/trader/uiMainWindow.py b/vnpy/trader/uiMainWindow.py index 8e0f555a..a03e32d3 100644 --- a/vnpy/trader/uiMainWindow.py +++ b/vnpy/trader/uiMainWindow.py @@ -36,7 +36,7 @@ class MainWindow(QtWidgets.QMainWindow): #---------------------------------------------------------------------- def initUi(self): """初始化界面""" - self.setWindowTitle('VnTrader') + self.setWindowTitle('VN Trader') self.initCentral() self.initMenu() self.initStatusBar() @@ -335,7 +335,7 @@ class AboutWidget(QtWidgets.QDialog): #---------------------------------------------------------------------- def initUi(self): """""" - self.setWindowTitle(vtText.ABOUT + 'VnTrader') + self.setWindowTitle(vtText.ABOUT + 'VN Trader') text = u""" Developed by Traders, for Traders. diff --git a/vnpy/trader/vtEngine.py b/vnpy/trader/vtEngine.py index ee683840..c910c555 100644 --- a/vnpy/trader/vtEngine.py +++ b/vnpy/trader/vtEngine.py @@ -113,6 +113,8 @@ class MainEngine(object): if gateway: gateway.connect() + + self.dbConnect() #---------------------------------------------------------------------- def subscribe(self, subscribeReq, gatewayName): @@ -196,7 +198,7 @@ class MainEngine(object): # 读取MongoDB的设置 try: # 设置MongoDB操作的超时时间为0.5秒 - self.dbClient = MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'], connectTimeoutMS=500) + self.dbClient = MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'], serverSelectionTimeoutMS=10) # 调用server_info查询服务器状态,防止服务器异常并未连接成功 self.dbClient.server_info() @@ -208,6 +210,7 @@ class MainEngine(object): self.eventEngine.register(EVENT_LOG, self.dbLogging) except ConnectionFailure: + self.dbClient = None self.writeLog(text.DATABASE_CONNECTING_FAILED) #----------------------------------------------------------------------