diff --git a/examples/OptionMaster/CTP_connect.json b/examples/OptionMaster/CTP_connect.json new file mode 100644 index 00000000..e4bfa6b8 --- /dev/null +++ b/examples/OptionMaster/CTP_connect.json @@ -0,0 +1,7 @@ +{ + "brokerID": "9999", + "mdAddress": "tcp://180.168.146.187:10011", + "tdAddress": "tcp://180.168.146.187:10001", + "userID": "simnow申请", + "password": "simnow申请" +} \ No newline at end of file diff --git a/examples/OptionMaster/SEC_connect.json b/examples/OptionMaster/SEC_connect.json new file mode 100644 index 00000000..205c28db --- /dev/null +++ b/examples/OptionMaster/SEC_connect.json @@ -0,0 +1,6 @@ +{ + "accountID": "110100001088", + "password": "123456", + "mdAddress": "tcp://203.86.95.187:10915", + "tdAddress": "tcp://203.86.95.187:10910" +} \ No newline at end of file diff --git a/examples/OptionMaster/SEC_connect.json123 b/examples/OptionMaster/SEC_connect.json123 deleted file mode 100644 index b5e05452..00000000 --- a/examples/OptionMaster/SEC_connect.json123 +++ /dev/null @@ -1,6 +0,0 @@ -{ - "accountID": "200100000085", - "password": "332036", - "mdAddress": "tcp://101.226.253.121:20915", - "tdAddress": "tcp://101.226.253.121:20910" -} \ No newline at end of file diff --git a/examples/OptionMaster/data/TRADE_MARKET_110100001088sop.dat b/examples/OptionMaster/data/TRADE_MARKET_110100001088sop.dat index 3faff51b..14e7a965 100644 --- a/examples/OptionMaster/data/TRADE_MARKET_110100001088sop.dat +++ b/examples/OptionMaster/data/TRADE_MARKET_110100001088sop.dat @@ -1 +1 @@ -3PhIldq+vwFT0Ap6unvLxA== \ No newline at end of file +lBUnZalf043zoNavP3G6rA== \ No newline at end of file diff --git a/examples/OptionMaster/run.py b/examples/OptionMaster/run.py index bc6948b0..fac0164f 100644 --- a/examples/OptionMaster/run.py +++ b/examples/OptionMaster/run.py @@ -16,7 +16,7 @@ from vnpy.trader.uiQt import createQApp from vnpy.trader.uiMainWindow import MainWindow # 加载底层接口 -from vnpy.trader.gateway import (secGateway) +from vnpy.trader.gateway import (secGateway, ctpGateway) # 加载上层应用 from vnpy.trader.app import (riskManager, optionMaster) @@ -36,6 +36,7 @@ def main(): # 添加交易接口 me.addGateway(secGateway) + me.addGateway(ctpGateway) # 添加上层应用 me.addApp(riskManager) diff --git a/examples/VnTrader/HUOBI_connect.json b/examples/VnTrader/HUOBI_connect.json deleted file mode 100644 index 3c427281..00000000 --- a/examples/VnTrader/HUOBI_connect.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "accessKey": "火币网站申请", - "secretKey": "火币网站申请", - "interval": 0.5, - "market": "cny", - "debug": false -} \ No newline at end of file diff --git a/examples/VnTrader/OKCOIN_connect.json b/examples/VnTrader/OKCOIN_connect.json deleted file mode 100644 index 8240e55e..00000000 --- a/examples/VnTrader/OKCOIN_connect.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "host": "CNY", - "apiKey": "OKCOIN网站申请", - "secretKey": "OKCOIN网站申请", - "trace": false, - "leverage": 20 -} \ No newline at end of file diff --git a/examples/VnTrader/TKPRO_connect.json b/examples/VnTrader/TKPRO_connect.json new file mode 100644 index 00000000..38b5a354 --- /dev/null +++ b/examples/VnTrader/TKPRO_connect.json @@ -0,0 +1,7 @@ +{ + "username": "请在quantos.org申请", + "token": "请在quantos.org申请", + "strategy": 625, + "tradeAddress": "tcp://gw.quantos.org:8901", + "dataAddress": "tcp://data.tushare.org:8910" +} \ No newline at end of file diff --git a/examples/VnTrader/data/TRADE_MARKET_110100001088sop.dat b/examples/VnTrader/data/TRADE_MARKET_110100001088sop.dat deleted file mode 100644 index 78625f8b..00000000 --- a/examples/VnTrader/data/TRADE_MARKET_110100001088sop.dat +++ /dev/null @@ -1 +0,0 @@ -PurwJxAEvTeQ9X8HMnmMRw== \ No newline at end of file diff --git a/examples/VnTrader/data/TRADE_MARKET_110100001088stock.dat b/examples/VnTrader/data/TRADE_MARKET_110100001088stock.dat deleted file mode 100644 index e69de29b..00000000 diff --git a/examples/VnTrader/run.py b/examples/VnTrader/run.py index b771eff9..e9bf3d04 100644 --- a/examples/VnTrader/run.py +++ b/examples/VnTrader/run.py @@ -17,7 +17,7 @@ from vnpy.trader.uiMainWindow import MainWindow # 加载底层接口 from vnpy.trader.gateway import (ctpGateway, oandaGateway, ibGateway, - huobiGateway, okcoinGateway) + tkproGateway) if system == 'Windows': from vnpy.trader.gateway import (femasGateway, xspeedGateway, @@ -44,10 +44,9 @@ def main(): # 添加交易接口 me.addGateway(ctpGateway) + me.addGateway(tkproGateway) me.addGateway(oandaGateway) me.addGateway(ibGateway) - me.addGateway(huobiGateway) - me.addGateway(okcoinGateway) if system == 'Windows': me.addGateway(femasGateway) diff --git a/vnpy/trader/gateway/tkproGateway/DataApi/LICENSE b/vnpy/trader/gateway/tkproGateway/DataApi/LICENSE new file mode 100644 index 00000000..261eeb9e --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/DataApi/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vnpy/trader/gateway/tkproGateway/DataApi/README.md b/vnpy/trader/gateway/tkproGateway/DataApi/README.md new file mode 100644 index 00000000..44280d62 --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/DataApi/README.md @@ -0,0 +1,44 @@ + +# DataApi + +标准数据API定义。 + +# 安装步骤 + +## 1、安装Python环境 + +如果本地还没有安装Python环境,强烈建议安装 [Anaconda](http://www.continuum.io/downloads "Anaconda")。 + +打开上面的网址,选择相应的操作系统,确定要按照的Python版本,一般建议用Python 2.7。 + +下载完成以后,按照图形界面步骤完成安装。在默认情况下,Anaconda会自动设置PATH环境。 + +## 2、安装依赖包 + +如果Python环境不是类似Anaconda的集成开发环境,我们需要单独安装依赖包,在已经有pandas/numpy包前提下,还需要有以下几个包: + +- pyzmq +- msgpack_python +- python-snappy + +可以通过单个安装完成,例如: `pip install pyzmq` + +需要注意的是,python-snappy这个包在Windows上的安装需要比较多的编译依赖,建议从[这个网页](http://www.lfd.uci.edu/~gohlke/pythonlibs)下载编译好的包,然后安装: + +```shell +pip install python_snappy-0.5.1-cp27-cp27m-win_amd64.whl +``` + + +## 3、使用DataApi + +```python +from .data_api import DataApi # 这里假设工作目录是项目根目录 + +api = DataApi(addr="tcp://data.tushare.org:8910") +result, msg = api.login("username", "token") # 示例账户,用户需要改为自己在www.quantos.org上注册的账户 +print(result) +print(msg) + +``` + diff --git a/vnpy/trader/gateway/tkproGateway/DataApi/__init__.py b/vnpy/trader/gateway/tkproGateway/DataApi/__init__.py new file mode 100644 index 00000000..d56c4b78 --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/DataApi/__init__.py @@ -0,0 +1,12 @@ +# encoding: utf-8 +""" +Core data api for fetching data from remote service. +""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from .data_api import DataApi + +__all__ = ['DataApi'] diff --git a/vnpy/trader/gateway/tkproGateway/DataApi/data_api.py b/vnpy/trader/gateway/tkproGateway/DataApi/data_api.py new file mode 100644 index 00000000..9c01e748 --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/DataApi/data_api.py @@ -0,0 +1,578 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from builtins import * +import time + +import numpy as np + +from . import jrpc_py +# import jrpc +from . import utils + + +# def set_log_dir(log_dir): +# if log_dir: +# jrpc.set_log_dir(log_dir) + +class DataApiCallback(object): + """DataApi Callback + + def on_quote(quote): + pass + + def on_connection() + """ + + def __init__(self): + self.on_quote = None + + +class DataApi(object): + """ + Abstract base class providing both historic and live data + from various data sources. + Current API version: 1.0 + + Attributes + ---------- + + + Methods + ------- + subscribe + quote + daily + bar + bar_quote + + """ + + def __init__(self, addr="tcp://data.tushare.org:8910", use_jrpc=False): + """Create DataApi client. + + If use_jrpc, try to load the C version of JsonRpc. If failed, use pure + Python version of JsonRpc. + """ + self._remote = None + # if use_jrpc: + # try: + # import jrpc + # self._remote = jrpc.JRpcClient() + # except Exception as e: + # print "Can't load jrpc", e.message + + if not self._remote: + self._remote = jrpc_py.JRpcClient() + + self._remote.on_rpc_callback = self._on_rpc_callback + self._remote.on_disconnected = self._on_disconnected + self._remote.on_connected = self._on_connected + self._remote.connect(addr) + + self._on_jsq_callback = None + + self._connected = False + self._loggined = False + self._username = "" + self._password = "" + self._data_format = "default" + self._callback = None + self._schema = [] + self._schema_id = 0 + self._schema_map = {} + self._sub_hash = "" + self._subscribed_set = set() + self._timeout = 20 + + def login(self, username, password): + + """ + Login before using data api. + + Parameters + ---------- + username : str + username + password : str + password + """ + for i in range(3): + if self._connected: + break + time.sleep(1) + + if not self._connected: + return (None, "-1,no connection") + + self._username = username + self._password = password + return self._do_login() + + def logout(self): + """ + Logout to stop using the data api or switch users. + + """ + self._loggined = None + + rpc_params = {} + + cr = self._remote.call("auth.logout", rpc_params) + return utils.extract_result(cr) + + def close(self): + """ + Close the data api. + + """ + + self._remote.close() + + # def set_callback(self, callback): + + # self._callback = callback + + def set_timeout(self, timeout): + """ + Set timeout for data api. + Default timeout is 20s. + + Parameters + ---------- + timeout : int + the max waiting time for the api return + + """ + self._timeout = timeout + + def set_data_format(self, format): + """Set queried data format. + + Available formats are: + "" -- Don't convert data, usually the type is map + "pandas" -- Convert table likely data to DataFrame + """ + self._data_format = format + + def set_heartbeat(self, interval, timeout): + self._remote.set_hearbeat_options(interval, timeout) + + def quote(self, symbol, fields="", data_format="", **kwargs): + + r, msg = self._call_rpc("jsq.query", + self._get_format(data_format, "pandas"), + "Quote", + _index_column="symbol", + symbol=str(symbol), + fields=fields, + **kwargs) + return (r, msg) + + def bar(self, symbol, start_time=200000, end_time=160000, + trade_date=0, freq="1m", fields="", data_format="", **kwargs): + + """ + Query minute bars of various type, return DataFrame. + + Parameters + ---------- + symbol : str + support multiple securities, separated by comma. + start_time : int (HHMMSS) or str ('HH:MM:SS') + Default is market open time. + end_time : int (HHMMSS) or str ('HH:MM:SS') + Default is market close time. + trade_date : int (YYYMMDD) or str ('YYYY-MM-DD') + Default is current trade_date. + fields : str, optional + separated by comma ',', default "" (all fields included). + freq : trade.common.MINBAR_TYPE, optional + {'1m', '5m', '15m'}, Minute bar type, default is '1m' + + Returns + ------- + df : pd.DataFrame + columns: + symbol, code, date, time, trade_date, freq, open, high, low, close, volume, turnover, vwap, oi + msg : str + error code and error message joined by comma + + Examples + -------- + df, msg = api.bar("000001.SH,cu1709.SHF", start_time="09:56:00", end_time="13:56:00", + trade_date="20170823", fields="open,high,low,last,volume", freq="5m") + + """ + + begin_time = utils.to_time_int(start_time) + if (begin_time == -1): + return (-1, "Begin time format error") + end_time = utils.to_time_int(end_time) + if (end_time == -1): + return (-1, "End time format error") + trade_date = utils.to_date_int(trade_date) + if (trade_date == -1): + return (-1, "Trade date format error") + + return self._call_rpc("jsi.query", + self._get_format(data_format, "pandas"), + "Bar", + symbol=str(symbol), + fields=fields, + freq=freq, + trade_date=trade_date, + begin_time=begin_time, + end_time=end_time, + **kwargs) + + def bar_quote(self, symbol, start_time=200000, end_time=160000, + trade_date=0, freq="1m", fields="", data_format="", **kwargs): + """ + Query minute bars of various type, return DataFrame. + It will also return ask/bid informations of the last quote in this bar + + + Parameters + ---------- + symbol : str + support multiple securities, separated by comma. + start_time : int (HHMMSS) or str ('HH:MM:SS') + Default is market open time. + end_time : int (HHMMSS) or str ('HH:MM:SS') + Default is market close time. + trade_date : int (YYYMMDD) or str ('YYYY-MM-DD') + Default is current trade_date. + fields : str, optional + separated by comma ',', default "" (all fields included). + freq : trade.common.MINBAR_TYPE, optional + {'1m', '5m', '15m'}, Minute bar type, default is '1m' + + Returns + ------- + df : pd.DataFrame + columns: + symbol, code, date, time, trade_date, freq, open, high, low, close, volume, turnover, vwap, oi, + askprice1, askprice2, askprice3, askprice4, askprice5, + bidprice1, bidprice2, bidprice3, bidprice4, bidprice5, + askvolume1, askvolume2, askvolume3, askvolume4, askvolume5, + bidvolume1, bidvolume2, bidvolume3, bidvolume4, bidvolume5 + + msg : str + error code and error message joined by comma + + Examples + -------- + df, msg = api.bar_quote("000001.SH,cu1709.SHF", start_time="09:56:00", end_time="13:56:00", + trade_date="20170823", fields="open,high,low,last,volume", freq="5m") + + """ + begin_time = utils.to_time_int(start_time) + if (begin_time == -1): + return (-1, "Begin time format error") + end_time = utils.to_time_int(end_time) + if (end_time == -1): + return (-1, "End time format error") + trade_date = utils.to_date_int(trade_date) + if (trade_date == -1): + return (-1, "Trade date format error") + + return self._call_rpc("jsi.bar_view", + self._get_format(data_format, "pandas"), + "BarQuote", + symbol=str(symbol), + fields=fields, + freq=freq, + trade_date=trade_date, + begin_time=begin_time, + end_time=end_time, + **kwargs) + + def daily(self, symbol, start_date, end_date, + adjust_mode=None, freq="1d", fields="", + data_format="", **kwargs): + + """ + Query dar bar, + support auto-fill suspended securities data, + support auto-adjust for splits, dividends and distributions. + + Parameters + ---------- + symbol : str + support multiple securities, separated by comma. + start_date : int or str + YYYMMDD or 'YYYY-MM-DD' + end_date : int or str + YYYMMDD or 'YYYY-MM-DD' + fields : str, optional + separated by comma ',', default "" (all fields included). + adjust_mode : str or None, optional + None for no adjust; + 'pre' for forward adjust; + 'post' for backward adjust. + + Returns + ------- + df : pd.DataFrame + columns: + symbol, code, trade_date, open, high, low, close, volume, turnover, vwap, oi, suspended + msg : str + error code and error message joined by comma + + Examples + -------- + df, msg = api.daily("000001.SH,cu1709.SHF",start_date=20170503, end_date=20170708, + fields="open,high,low,last,volume", adjust_mode = "post") + + """ + + if adjust_mode == None: + adjust_mode = "none" + + begin_date = utils.to_date_int(start_date) + if (begin_date == -1): + return (-1, "Begin date format error") + end_date = utils.to_date_int(end_date) + if (end_date == -1): + return (-1, "End date format error") + + return self._call_rpc("jsd.query", + self._get_format(data_format, "pandas"), + "Daily", + symbol=str(symbol), + fields=fields, + begin_date=begin_date, + end_date=end_date, + adjust_mode=adjust_mode, + freq=freq, + **kwargs) + + def query(self, view, filter="", fields="", data_format="", **kwargs): + """ + Get various reference data. + + Parameters + ---------- + view : str + data source. + fields : str + Separated by ',' + filter : str + filter expressions. + kwargs + + Returns + ------- + df : pd.DataFrame + msg : str + error code and error message, joined by ',' + + Examples + -------- + res3, msg3 = ds.query("lb.secDailyIndicator", fields="price_level,high_52w_adj,low_52w_adj",\ + filter="start_date=20170907&end_date=20170907",\ + data_format='pandas') + view does not change. fileds can be any field predefined in reference data api. + + """ + return self._call_rpc("jset.query", + self._get_format(data_format, "pandas"), + "JSetData", + view=view, + fields=fields, + filter=filter, + **kwargs) + + def subscribe(self, symbol, func=None, fields=""): + """Subscribe securites + + This function adds new securities to subscribed list on the server. If + success, return subscribed codes. + + If securities is empty, return current subscribed codes. + """ + r, msg = self._check_session() + if not r: + return (r, msg) + + if func: + self._on_jsq_callback = func + + rpc_params = {"symbol": symbol, + "fields": fields} + + cr = self._remote.call("jsq.subscribe", rpc_params) + + rsp, msg = utils.extract_result(cr, data_format="", class_name="SubRsp") + if not rsp: + return (rsp, msg) + + new_codes = [x.strip() for x in symbol.split(',') if x] + + self._subscribed_set = self._subscribed_set.union(set(new_codes)) + self._schema_id = rsp['schema_id'] + self._schema = rsp['schema'] + self._sub_hash = rsp['sub_hash'] + self._make_schema_map() + return (rsp['symbols'], msg) + + def unsubscribe(self, symbol): + """Unsubscribe securities. + + Unscribe codes and return list of subscribed code. + """ + assert False, "NOT IMPLEMENTED" + + def __del__(self): + self._remote.close() + + def _on_disconnected(self): + """JsonRpc callback""" + # print "DataApi: _on_disconnected" + self._connected = False + + if self._callback: + self._callback("connection", False) + + def _on_connected(self): + """JsonRpc callback""" + self._connected = True + + self._do_login() + self._do_subscribe() + + if self._callback: + self._callback("connection", True) + + def _check_session(self): + if not self._connected: + return (False, "no connection") + elif self._loggined: + return (True, "") + elif self._username and self._password: + return self._do_login() + else: + return (False, "no login session") + + def _get_format(self, format, default_format): + if format: + return format + elif self._data_format != "default": + return self._data_format + else: + return default_format + + def set_callback(self, callback): + self._callback = callback + + def _convert_quote_ind(self, quote_ind): + """Convert original quote_ind to a map. + + The original quote_ind contains field index instead of field name! + """ + + if quote_ind['schema_id'] != self._schema_id: + return None + + indicators = quote_ind['indicators'] + values = quote_ind['values'] + + max_index = len(self._schema) + + quote = {} + for i in range(len(indicators)): + if indicators[i] < max_index: + quote[self._schema_map[indicators[i]]['name']] = values[i] + else: + quote[str(indicators[i])] = values[i] + + return quote + + def _on_rpc_callback(self, method, data): + # print "_on_rpc_callback:", method, data + + try: + if method == "jsq.quote_ind": + if self._on_jsq_callback: + q = self._convert_quote_ind(data) + if q: + self._on_jsq_callback("quote", q) + + elif method == ".sys.heartbeat": + if 'sub_hash' in data: + if self._sub_hash and self._sub_hash != data['sub_hash']: + print("sub_hash is not same", self._sub_hash, data['sub_hash']) + self._do_subscribe() + + except Exception as e: + print("Can't load jrpc", e.message) + + def _call_rpc(self, method, data_format, data_class, **kwargs): + + r, msg = self._check_session() + if not r: + return (r, msg) + + index_column = None + rpc_params = {} + for key, value in kwargs.items(): + if key == '_index_column': + index_column = value + else: + if isinstance(value, (int, np.integer)): + value = int(value) + rpc_params[key] = value + + cr = self._remote.call(method, rpc_params, timeout=self._timeout) + + return utils.extract_result(cr, data_format=data_format, index_column=index_column, class_name=data_class) + + def _make_schema_map(self): + self._schema_map = {} + for schema in self._schema: + self._schema_map[schema['id']] = schema + + def _do_login(self): + # Shouldn't check connected flag here. ZMQ is a mesageq queue! + # if !self._connected : + # return (False, "-1,no connection") + + if self._username and self._password: + rpc_params = {"username": self._username, + "password": self._password} + + cr = self._remote.call("auth.login", rpc_params) + r, msg = utils.extract_result(cr, data_format="", class_name="UserInfo") + self._loggined = r + return (r, msg) + else: + self._loggined = None + return (False, "-1,empty username or password") + + def _do_subscribe(self): + """Subscribe again when reconnected or hash_code is not same""" + if not self._subscribed_set: return + + codes = list(self._subscribed_set) + codes.sort() + + # XXX subscribe with default fields! + rpc_params = {"symbol": ",".join(codes), + "fields": ""} + + cr = self._remote.call("jsq.subscribe", rpc_params) + + rsp, msg = utils.extract_result(cr, data_format="", class_name="SubRsp") + if not rsp: + # return (rsp, msg) + return + + self._schema_id = rsp['schema_id'] + self._schema = rsp['schema'] + self._sub_hash = rsp['sub_hash'] + # return (rsp.securities, msg) + + self._make_schema_map() diff --git a/vnpy/trader/gateway/tkproGateway/DataApi/jrpc_py.py b/vnpy/trader/gateway/tkproGateway/DataApi/jrpc_py.py new file mode 100644 index 00000000..da3cd059 --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/DataApi/jrpc_py.py @@ -0,0 +1,309 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from builtins import * + +import zmq +import time +import random + +try: + import queue +except ImportError: + import queue as queue +import threading +import msgpack +import snappy +import copy + +qEmpty = copy.copy(queue.Empty) + + +def _unpack(str): + if str.startswith(b'S'): + tmp = snappy.uncompress(str[1:]) + # print "SNAPPY: ", len(str), len(tmp) + obj = msgpack.loads(tmp, encoding='utf-8') + elif str.startswith(b'\0'): + obj = msgpack.loads(str[1:], encoding='utf-8') + else: + return None + + return obj + + +def _pack(obj): + # print "pack", obj + tmp = msgpack.dumps(obj, encoding='utf-8') + if len(tmp) > 1000: + return b'S' + snappy.compress(tmp) + else: + return b'\0' + tmp + + +class JRpcClient(object): + def __init__(self): + self._waiter_lock = threading.Lock() + self._waiter_map = {} + + self._should_close = False + self._next_callid = 0 + self._send_lock = threading.Lock() + self._callid_lock = threading.Lock() + + self._last_heartbeat_rsp_time = 0 + self._connected = False + + self.on_disconnected = None + self.on_rpc_callback = None + self._callback_queue = queue.Queue() + self._call_wait_queue = queue.Queue() + + self._ctx = zmq.Context() + self._pull_sock = self._ctx.socket(zmq.PULL) + self._pull_sock.bind("inproc://pull_sock") + self._push_sock = self._ctx.socket(zmq.PUSH) + self._push_sock.connect("inproc://pull_sock") + + self._heartbeat_interval = 1 + self._heartbeat_timeout = 3 + + self._addr = None + + t = threading.Thread(target=self._recv_run) + t.setDaemon(True) + t.start() + self._recv_thread = t + + t = threading.Thread(target=self._callback_run) + t.setDaemon(True) + t.start() + self._callback_thread = t + + def __del__(self): + self.close() + + def next_callid(self): + self._callid_lock.acquire() + self._next_callid += 1 + callid = self._next_callid + self._callid_lock.release() + return callid + + def set_heartbeat_options(self, interval, timeout): + self._heartbeat_interval = interval + self._heartbeat_timeout = timeout + + def _recv_run(self): + + heartbeat_time = 0 + + poller = zmq.Poller() + poller.register(self._pull_sock, zmq.POLLIN) + + remote_sock = None + + while not self._should_close: + + try: + if self._connected and time.time() - self._last_heartbeat_rsp_time > self._heartbeat_timeout: + self._connected = False + if self.on_disconnected: self._async_call(self.on_disconnected) + + if remote_sock and time.time() - heartbeat_time > self._heartbeat_interval: + self._send_hearbeat() + heartbeat_time = time.time() + + socks = dict(poller.poll(500)) + if self._pull_sock in socks and socks[self._pull_sock] == zmq.POLLIN: + cmd = self._pull_sock.recv() + if cmd == b"CONNECT": + # print time.ctime(), "CONNECT " + self._addr + if remote_sock: + poller.unregister(remote_sock) + remote_sock.close() + remote_sock = None + + remote_sock = self._do_connect() + + if remote_sock: + poller.register(remote_sock, zmq.POLLIN) + + elif cmd.startswith(b"SEND:") and remote_sock: + # print time.ctime(), "SEND " + cmd[5:] + remote_sock.send(cmd[5:]) + + if remote_sock and remote_sock in socks and socks[remote_sock] == zmq.POLLIN: + data = remote_sock.recv() + if data: + # if not data.find("heartbeat"): + # print time.ctime(), "RECV", data + self._on_data_arrived(data) + + except zmq.error.Again as e: + # print "RECV timeout: ", e + pass + except Exception as e: + print("_recv_run:", e) + + def _callback_run(self): + while not self._should_close: + try: + r = self._callback_queue.get(timeout=1) + if r: + r() + except qEmpty as e: + pass + except TypeError as e: + if str(e) == "'NoneType' object is not callable": + pass + else: + print("_callback_run {}".format(r), type(e), e) + except Exception as e: + print("_callback_run {}".format(r), type(e), e) + + def _async_call(self, func): + self._callback_queue.put(func) + + def _send_request(self, json): + + try: + self._send_lock.acquire() + self._push_sock.send(b"SEND:" + json) + + finally: + self._send_lock.release() + + def connect(self, addr): + self._addr = addr + self._push_sock.send_string('CONNECT', encoding='utf-8') + + def _do_connect(self): + + client_id = str(random.randint(1000000, 100000000)) + + socket = self._ctx.socket(zmq.DEALER) + identity = (client_id) + '$' + str(random.randint(1000000, 1000000000)) + identity = identity.encode('utf-8') + socket.setsockopt(zmq.IDENTITY, identity) + socket.setsockopt(zmq.RCVTIMEO, 500) + socket.setsockopt(zmq.SNDTIMEO, 500) + socket.setsockopt(zmq.LINGER, 0) + socket.connect(self._addr) + + return socket + + def close(self): + self._should_close = True + self._callback_thread.join() + self._recv_thread.join() + + def _on_data_arrived(self, str): + try: + msg = _unpack(str) + # print "RECV", msg + + if not msg: + print("wrong message format") + return + + if 'method' in msg and msg['method'] == '.sys.heartbeat': + self._last_heartbeat_rsp_time = time.time() + if not self._connected: + self._connected = True + if self.on_connected: + self._async_call(self.on_connected) + + # Let user has a chance to check message in .sys.heartbeat + if 'result' in msg and self.on_rpc_callback: + self._async_call(lambda: self.on_rpc_callback(msg['method'], msg['result'])) + + elif 'id' in msg and msg['id']: + + # Call result + id = int(msg['id']) + + if self._waiter_lock.acquire(): + if id in self._waiter_map: + q = self._waiter_map[id] + if q: q.put(msg) + self._waiter_lock.release() + else: + # Notification message + if 'method' in msg and 'result' in msg and self.on_rpc_callback: + self._async_call(lambda: self.on_rpc_callback(msg['method'], msg['result'])) + + except Exception as e: + print("_on_data_arrived:", e) + pass + + def _send_hearbeat(self): + msg = {'jsonrpc': '2.0', + 'method': '.sys.heartbeat', + 'params': {'time': time.time()}, + 'id': str(self.next_callid())} + json_str = _pack(msg) + self._send_request(json_str) + + def _alloc_wait_queue(self): + self._waiter_lock.acquire() + if self._call_wait_queue: + q = self._call_wait_queue + self._call_wait_queue = None + else: + q = queue.Queue() + self._waiter_lock.release() + return q + + def _free_wait_queue(self, q): + self._waiter_lock.acquire() + if not self._call_wait_queue: + self._call_wait_queue = q + else: + del q + self._waiter_lock.release() + + def call(self, method, params, timeout=6): + # print "call", method, params, timeout + callid = self.next_callid() + if timeout: + q = self._alloc_wait_queue() + + self._waiter_lock.acquire() + self._waiter_map[callid] = q + self._waiter_lock.release() + + msg = {'jsonrpc': '2.0', + 'method': method, + 'params': params, + 'id': str(callid)} + + # print "SEND", msg + json_str = _pack(msg) + self._send_request(json_str) + + if timeout: + ret = {} + try: + r = q.get(timeout=timeout) + q.task_done() + except qEmpty: + r = None + + self._waiter_lock.acquire() + self._waiter_map[callid] = None + self._waiter_lock.release() + self._free_wait_queue(q) + + if r: + if 'result' in r: + ret['result'] = r['result'] + + if 'error' in r: + ret['error'] = r['error'] + + return ret if ret else {'error': {'error': -1, 'message': "timeout"}} + else: + return {'result': True} diff --git a/vnpy/trader/gateway/tkproGateway/DataApi/utils.py b/vnpy/trader/gateway/tkproGateway/DataApi/utils.py new file mode 100644 index 00000000..32fa05c5 --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/DataApi/utils.py @@ -0,0 +1,137 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from builtins import * +from collections import namedtuple +import datetime as dt +import pandas as pd +import numpy as np + +long_nan = 9223372036854775807 + + +def is_long_nan(v): + if v == long_nan: + return True + else: + return False + + +def to_nan(x): + if is_long_nan(x): + return np.nan + else: + return x + + +def _to_date(row): + date = int(row['DATE']) + return pd.datetime(year=date // 10000, month=date // 100 % 100, day=date % 100) + + +def _to_datetime(row): + date = int(row['DATE']) + time = int(row['TIME']) // 1000 + return pd.datetime(year=date // 10000, month=date / 100 % 100, day=date % 100, + hour=time // 10000, minute=time // 100 % 100, second=time % 100) + + +def _to_dataframe(cloumset, index_func=None, index_column=None): + df = pd.DataFrame(cloumset) + for col in df.columns: + if df.dtypes.loc[col] == np.int64: + df.loc[:, col] = df.loc[:, col].apply(to_nan) + if index_func: + df.index = df.apply(index_func, axis=1) + elif index_column: + df.index = df[index_column] + del df.index.name + + return df + + +def _error_to_str(error): + if error: + if 'message' in error: + return str(error['error']) + "," + error['message'] + else: + return str(error['error']) + "," + else: + return "," + + +def to_obj(class_name, data): + try: + if type(data) == list or type(data) == tuple: + result = [] + for d in data: + result.append(namedtuple(class_name, list(d.keys()))(*list(d.values()))) + return result + + elif type(data) == dict: + result = namedtuple(class_name, list(data.keys()))(*list(data.values())) + return result + else: + return data + except Exception as e: + print(class_name, data, e) + return data + + +def to_date_int(date): + if isinstance(date, str): + t = dt.datetime.strptime(date, "%Y-%m-%d") + date_int = t.year * 10000 + t.month * 100 + t.day + return date_int + elif isinstance(date, (int, np.integer)): + return date + else: + return -1 + + +def to_time_int(time): + if isinstance(time, str): + t = dt.datetime.strptime(time, "%H:%M:%S") + time_int = t.hour * 10000 + t.minute * 100 + t.second + return time_int + elif isinstance(time, (int, np.integer)): + return time + else: + return -1 + + +def extract_result(cr, data_format="", index_column=None, class_name=""): + """ + format supports pandas, obj. + """ + + err = _error_to_str(cr['error']) if 'error' in cr else None + if 'result' in cr: + if data_format == "pandas": + if index_column: + return (_to_dataframe(cr['result'], None, index_column), err) + # if 'TIME' in cr['result']: + # return (_to_dataframe(cr['result'], _to_datetime), err) + # elif 'DATE' in cr['result']: + # return (_to_dataframe(cr['result'], _to_date), err) + else: + return (_to_dataframe(cr['result']), err) + + elif data_format == "obj" and cr['result'] and class_name: + r = cr['result'] + if type(r) == list or type(r) == tuple: + result = [] + for d in r: + result.append(namedtuple(class_name, list(d.keys()))(*list(d.values()))) + elif type(r) == dict: + result = namedtuple(class_name, list(r.keys()))(*list(r.values())) + else: + result = r + + return (result, err) + else: + return (cr['result'], err) + else: + return (None, err) diff --git a/vnpy/trader/gateway/tkproGateway/TKPRO_connect.json b/vnpy/trader/gateway/tkproGateway/TKPRO_connect.json new file mode 100644 index 00000000..38b5a354 --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/TKPRO_connect.json @@ -0,0 +1,7 @@ +{ + "username": "请在quantos.org申请", + "token": "请在quantos.org申请", + "strategy": 625, + "tradeAddress": "tcp://gw.quantos.org:8901", + "dataAddress": "tcp://data.tushare.org:8910" +} \ No newline at end of file diff --git a/vnpy/trader/gateway/tkproGateway/TradeApi/LICENSE b/vnpy/trader/gateway/tkproGateway/TradeApi/LICENSE new file mode 100644 index 00000000..261eeb9e --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/TradeApi/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vnpy/trader/gateway/tkproGateway/TradeApi/README.md b/vnpy/trader/gateway/tkproGateway/TradeApi/README.md new file mode 100644 index 00000000..d0f90f99 --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/TradeApi/README.md @@ -0,0 +1,48 @@ +# TradeApi + +标准交易API定义 + +# 安装步骤 + +1、安装Python环境 +------ + +如果本地还没有安装Python环境,强烈建议安装 [Anaconda](http://www.continuum.io/downloads "Anaconda")。 + +打开上面的网址,选择相应的操作系统,确定要按照的Python版本,一般建议用Python 2.7。 + +下载完成以后,按照图形界面步骤完成安装。在默认情况下,Anaconda会自动设置PATH环境。 + +2、安装依赖包 +---------------- + +如果Python环境不是类似Anaconda的集成开发环境,我们需要单独安装依赖包,在已经有pandas/numpy包前提下,还需要有以下几个包: + + pyzmq + msgpack_python + python-snappy + +可以通过单个安装完成,例如: pip install pyzmq + +需要注意的是,python-snappy和msgpack-python这两个包在Windows上的安装需要比较多的编译依赖,建议从[这个网页](http://www.lfd.uci.edu/~gohlke/pythonlibs)下载编译好的包,然后安装: + + pip install msgpack_python-0.4.8-cp27-cp27m-win_amd64.whl + + pip install python_snappy-0.5.1-cp27-cp27m-win_amd64.whl + + + +3、使用TradeApi +-------- + +在项目目录,验证TradeApi是否正常使用。 + +```python +from TradeApi import TradeApi + +api = TradeApi(addr="tcp://gw.quantos.org:8901") +result, msg = api.login("username", "token") # 示例账户,用户需要改为自己在www.quantos.org上注册的账户 +print result +print msg + +``` diff --git a/vnpy/trader/gateway/tkproGateway/TradeApi/__init__.py b/vnpy/trader/gateway/tkproGateway/TradeApi/__init__.py new file mode 100644 index 00000000..a7f82e5d --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/TradeApi/__init__.py @@ -0,0 +1,12 @@ +# encoding: utf-8 +""" +Core trade api for simulated and live trading. +""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from .trade_api import TradeApi + +__all__ = ['TradeApi'] diff --git a/vnpy/trader/gateway/tkproGateway/TradeApi/jrpc_py.py b/vnpy/trader/gateway/tkproGateway/TradeApi/jrpc_py.py new file mode 100644 index 00000000..5d9ff0af --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/TradeApi/jrpc_py.py @@ -0,0 +1,341 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +import json +import random +import time +from builtins import * + +import zmq + +try: + import queue +except ImportError: + import queue as queue +import threading +import msgpack +import snappy +import copy + +qEmpty = copy.copy(queue.Empty) + + +def _unpack_msgpack_snappy(str): + if str.startswith(b'S'): + tmp = snappy.uncompress(str[1:]) + # print "SNAPPY: ", len(str), len(tmp) + obj = msgpack.loads(tmp, encoding='utf-8') + elif str.startswith(b'\0'): + obj = msgpack.loads(str[1:], encoding='utf-8') + else: + return None + + return obj + + +def _pack_msgpack_snappy(obj): + # print "pack", obj + tmp = msgpack.dumps(obj, encoding='utf-8') + if len(tmp) > 1000: + return b'S' + snappy.compress(tmp) + else: + return b'\0' + tmp + + +def _unpack_msgpack(str): + return msgpack.loads(str, encoding='utf-8') + + +def _pack_msgpack(obj): + return msgpack.dumps(obj, encoding='utf-8') + + +def _unpack_json(str): + return json.loads(str, encoding='utf-8') + + +def _pack_json(obj): + return json.dumps(obj, encoding='utf-8') + + +class JRpcClient(object): + def __init__(self, data_format="msgpack_snappy"): + self._waiter_lock = threading.Lock() + self._waiter_map = {} + + self._should_close = False + self._next_callid = 0 + self._send_lock = threading.Lock() + self._callid_lock = threading.Lock() + + self._last_heartbeat_rsp_time = 0 + self._connected = False + + self.on_disconnected = None + self.on_rpc_callback = None + self._callback_queue = queue.Queue() + self._call_wait_queue = queue.Queue() + + self._ctx = zmq.Context() + self._pull_sock = self._ctx.socket(zmq.PULL) + self._pull_sock.bind("inproc://pull_sock") + self._push_sock = self._ctx.socket(zmq.PUSH) + self._push_sock.connect("inproc://pull_sock") + + self._heartbeat_interval = 1 + self._heartbeat_timeout = 3 + + self._addr = None + + if data_format == "msgpack_snappy": + self._pack = _pack_msgpack_snappy + self._unpack = _unpack_msgpack_snappy + + elif data_format == "msgpack": + self._pack = _pack_msgpack + self._unpack = _unpack_msgpack + + elif data_format == "json": + self._pack = _pack_json + self._unpack = _unpack_json + + else: + assert False, "unknown data_format " + data_format + + t = threading.Thread(target=self._recv_run) + t.setDaemon(True) + t.start() + self._recv_thread = t + + t = threading.Thread(target=self._callback_run) + t.setDaemon(True) + t.start() + self._callback_thread = t + + def __del__(self): + self.close() + + def next_callid(self): + self._callid_lock.acquire() + self._next_callid += 1 + callid = self._next_callid + self._callid_lock.release() + return callid + + def set_heartbeat_options(self, interval, timeout): + self._heartbeat_interval = interval + self._heartbeat_timeout = timeout + + def _recv_run(self): + + heartbeat_time = 0 + + poller = zmq.Poller() + poller.register(self._pull_sock, zmq.POLLIN) + + remote_sock = None + + while not self._should_close: + + try: + if self._connected and time.time() - self._last_heartbeat_rsp_time > self._heartbeat_timeout: + self._connected = False + if self.on_disconnected: self._async_call(self.on_disconnected) + + if remote_sock and time.time() - heartbeat_time > self._heartbeat_interval: + self._send_hearbeat() + heartbeat_time = time.time() + + socks = dict(poller.poll(500)) + if self._pull_sock in socks and socks[self._pull_sock] == zmq.POLLIN: + cmd = self._pull_sock.recv() + if cmd == b"CONNECT": + # print time.ctime(), "CONNECT " + self._addr + if remote_sock: + poller.unregister(remote_sock) + remote_sock.close() + remote_sock = None + + remote_sock = self._do_connect() + + if remote_sock: + poller.register(remote_sock, zmq.POLLIN) + + elif cmd.startswith(b"SEND:") and remote_sock: + # print time.ctime(), "SEND " + cmd[5:] + remote_sock.send(cmd[5:]) + + if remote_sock and remote_sock in socks and socks[remote_sock] == zmq.POLLIN: + data = remote_sock.recv() + if data: + # if not data.find("heartbeat"): + # print time.ctime(), "RECV", data + self._on_data_arrived(data) + + except zmq.error.Again as e: + # print "RECV timeout: ", e + pass + except Exception as e: + print("_recv_run:", e) + + def _callback_run(self): + while not self._should_close: + try: + r = self._callback_queue.get(timeout=1) + if r: + r() + except qEmpty as e: + pass + except TypeError as e: + if str(e) == "'NoneType' object is not callable": + pass + else: + print("_callback_run {}".format(r), type(e), e) + except Exception as e: + print("_callback_run {}".format(r), type(e), e) + + def _async_call(self, func): + self._callback_queue.put(func) + + def _send_request(self, json): + + try: + self._send_lock.acquire() + self._push_sock.send(b"SEND:" + json) + + finally: + self._send_lock.release() + + def connect(self, addr): + self._addr = addr + self._push_sock.send_string('CONNECT', encoding='utf-8') + + def _do_connect(self): + + client_id = str(random.randint(1000000, 100000000)) + + socket = self._ctx.socket(zmq.DEALER) + identity = (client_id) + '$' + str(random.randint(1000000, 1000000000)) + identity = identity.encode('utf-8') + socket.setsockopt(zmq.IDENTITY, identity) + socket.setsockopt(zmq.RCVTIMEO, 500) + socket.setsockopt(zmq.SNDTIMEO, 500) + socket.setsockopt(zmq.LINGER, 0) + socket.connect(self._addr) + + return socket + + def close(self): + self._should_close = True + self._callback_thread.join() + self._recv_thread.join() + + def _on_data_arrived(self, str): + try: + msg = self._unpack(str) + # print "RECV", msg + + if not msg: + print("wrong message format") + return + + if 'method' in msg and msg['method'] == '.sys.heartbeat': + self._last_heartbeat_rsp_time = time.time() + if not self._connected: + self._connected = True + if self.on_connected: + self._async_call(self.on_connected) + + # Let user has a chance to check message in .sys.heartbeat + if 'result' in msg and self.on_rpc_callback: + self._async_call(lambda: self.on_rpc_callback(msg['method'], msg['result'])) + + elif 'id' in msg and msg['id']: + + # Call result + id = int(msg['id']) + + if self._waiter_lock.acquire(): + if id in self._waiter_map: + q = self._waiter_map[id] + if q: q.put(msg) + self._waiter_lock.release() + else: + # Notification message + if 'method' in msg and 'result' in msg and self.on_rpc_callback: + self._async_call(lambda: self.on_rpc_callback(msg['method'], msg['result'])) + + except Exception as e: + print("_on_data_arrived:", e) + pass + + def _send_hearbeat(self): + msg = {'jsonrpc': '2.0', + 'method': '.sys.heartbeat', + 'params': {'time': time.time()}, + 'id': str(self.next_callid())} + json_str = self._pack(msg) + self._send_request(json_str) + + def _alloc_wait_queue(self): + self._waiter_lock.acquire() + if self._call_wait_queue: + q = self._call_wait_queue + self._call_wait_queue = None + else: + q = queue.Queue() + self._waiter_lock.release() + return q + + def _free_wait_queue(self, q): + self._waiter_lock.acquire() + if not self._call_wait_queue: + self._call_wait_queue = q + else: + del q + self._waiter_lock.release() + + def call(self, method, params, timeout=6): + # print "call", method, params, timeout + callid = self.next_callid() + if timeout: + q = self._alloc_wait_queue() + + self._waiter_lock.acquire() + self._waiter_map[callid] = q + self._waiter_lock.release() + + msg = {'jsonrpc': '2.0', + 'method': method, + 'params': params, + 'id': str(callid)} + + # print "SEND", msg + json_str = self._pack(msg) + self._send_request(json_str) + + if timeout: + ret = {} + try: + r = q.get(timeout=timeout) + q.task_done() + except qEmpty: + r = None + + self._waiter_lock.acquire() + self._waiter_map[callid] = None + self._waiter_lock.release() + self._free_wait_queue(q) + + if r: + if 'result' in r: + ret['result'] = r['result'] + + if 'error' in r: + ret['error'] = r['error'] + + return ret if ret else {'error': {'error': -1, 'message': "timeout"}} + else: + return {'result': True} diff --git a/vnpy/trader/gateway/tkproGateway/TradeApi/trade_api.py b/vnpy/trader/gateway/tkproGateway/TradeApi/trade_api.py new file mode 100644 index 00000000..58f928d2 --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/TradeApi/trade_api.py @@ -0,0 +1,592 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +import json +from builtins import * + +import pandas as pd + +from . import utils + + +class EntrustOrder(object): + def __init__(self, security, action, price, size): + self.security = security + self.action = action + self.price = price + self.size = size + + +def set_log_dir(log_dir): + if log_dir: + try: + import jrpc + jrpc.set_log_dir(log_dir) + except Exception as e: + print("Exception", e) + + +class TradeApi(object): + def __init__(self, addr, use_jrpc=True, prod_type="jzts"): + """ + use_jrpc: + True -- Use jrcp_client of C version, for jzts only + False -- Use pure python version + prod_type: + "jaqs" -- jrpc_msgpack_wth_snappy + "jzts" -- jrpc_msgpack + """ + + self._remote = None + if prod_type == "jzts": + try: + if use_jrpc: + import jrpc + self._remote = jrpc.JsonRpcClient() + else: + from . import jrpc_py + self._remote = jrpc_py.JRpcClient(data_format="msgpack") + except Exception as e: + print("Exception", e) + + if not self._remote: + from . import jrpc_py + self._remote = jrpc_py.JRpcClient(data_format="msgpack") + + else: + from . import jrpc_py + self._remote = jrpc_py.JRpcClient(data_format="msgpack_snappy") + + self._remote.on_rpc_callback = self._on_rpc_callback + self._remote.on_disconnected = self._on_disconnected + self._remote.on_connected = self._on_connected + self._remote.connect(addr) + + self._ordstatus_callback = None + self._taskstatus_callback = None + self._internal_order_callback = None + self._trade_callback = None + self._on_connection_callback = None + self._connected = False + self._username = "" + self._password = "" + self._strategy_id = 0 + self._strategy_selected = False + self._data_format = "default" + + def __del__(self): + self._remote.close() + + def _on_rpc_callback(self, method, data): + # print "_on_rpc_callback:", method, data + + if method == "oms.orderstatus_ind": + if self._data_format == "obj": + data = utils.to_obj("Order", data) + + if self._ordstatus_callback: + self._ordstatus_callback(data) + + elif method == "oms.taskstatus_ind": + if self._data_format == "obj": + data = utils.to_obj("TaskStatus", data) + + if self._taskstatus_callback: + self._taskstatus_callback(data) + + elif method == "oms.trade_ind": + if self._data_format == "obj": + data = utils.to_obj("Trade", data) + + if self._trade_callback: + self._trade_callback(data) + + elif method == "oms.internal_order_ind": + if self._data_format == "obj": + data = utils.to_obj("QuoteOrder", data) + + if self._internal_order_callback: + self._internal_order_callback(data) + + def _on_disconnected(self): + print("TradeApi: _on_disconnected") + self._connected = False + self._strategy_selected = False + if self._on_connection_callback: + self._on_connection_callback(False) + + def _on_connected(self): + print("TradeApi: _on_connected") + self._connected = True + self._do_login() + self._do_use_strategy() + if self._on_connection_callback: + self._on_connection_callback(True) + + def _check_session(self): + if not self._connected: + return (False, "no connection") + + if self._strategy_selected: + return (True, "") + + r, msg = self._do_login() + if not r: return (r, msg) + if self._strategy_id: + return self._do_use_strategy() + else: + return (r, msg) + + def set_data_format(self, format): + self._data_format = format + + def set_connection_callback(self, callback): + self._on_connection_callback = callback + + def set_ordstatus_callback(self, callback): + self._ordstatus_callback = callback + + def set_trade_callback(self, callback): + self._trade_callback = callback + + def set_task_callback(self, callback): + self._taskstatus_callback = callback + + def set_quoteorder_callback(self, callback): + self._internal_order_callback = callback + + def _get_format(self, format, default_format): + if format: + return format + elif self._data_format != "default": + return self._data_format + else: + return default_format + + def login(self, username, password, format=""): + self._username = username + self._password = password + return self._do_login(format=format) + + def _do_login(self, format=""): + # Shouldn't check connected flag here. ZMQ is a mesageq queue! + # if !self._connected : + # return (False, "-1,no connection") + + if self._username and self._password: + rpc_params = {"username": self._username, + "password": self._password} + + cr = self._remote.call("auth.login", rpc_params) + f = self._get_format(format, "") + if f != "obj" and f != "": + f = "" + return utils.extract_result(cr, format=f, class_name="UserInfo") + else: + return (False, "-1,empty username or password") + + def logout(self): + rpc_params = {} + + cr = self._remote.call("auth.logout", rpc_params) + return utils.extract_result(cr) + + def close(self): + self._remote.close() + + def use_strategy(self, strategy_id): + if strategy_id: + self._strategy_id = strategy_id + return self._do_use_strategy() + else: + # Query + rpc_params = {"account_id": 0} + + cr = self._remote.call("auth.use_strategy", rpc_params) + r, msg = utils.extract_result(cr) + self._strategy_selected = r + + return (r, msg) + + def _do_use_strategy(self): + if self._strategy_id: + rpc_params = {"account_id": self._strategy_id} + + cr = self._remote.call("auth.use_strategy", rpc_params) + r, msg = utils.extract_result(cr) + self._strategy_selected = r + + return (r, msg) + else: + return (False, "-1,no strategy_id was specified") + + def confirm_internal_order(self, task_id, confirmed): + """ + return (result, message) + if result is None, message contains error information + """ + + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {"task_id": task_id, + "confirmed": confirmed} + + cr = self._remote.call("oms.confirm_internal_order", rpc_params) + return utils.extract_result(cr) + + def order(self, security, price, size, algo="", algo_param={}, userdata=""): + """ + return (result, message) + if result is None, message contains error information + """ + + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {"security": security, + "price": price, + "size": int(size), + "algo": algo, + "algo_param": json.dumps(algo_param), + "user": self._username, + "userdata": userdata} + + cr = self._remote.call("oms.order", rpc_params) + return utils.extract_result(cr) + + def place_order(self, security, action, price, size, algo="", algo_param={}, userdata=""): + """ + return (result, message) + if result is None, message contains error information + """ + + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {"security": security, + "action": action, + "price": price, + "size": int(size), + "algo": algo, + "algo_param": json.dumps(algo_param), + "user": self._username, + "userdata": userdata} + + cr = self._remote.call("oms.place_order", rpc_params) + return utils.extract_result(cr) + + def batch_order(self, orders, algo="", algo_param={}, userdata=""): + """ + orders format: + [ {"security": "000001.SZ", "action": "Buy", "price": 10.0, "size" : 100}, ... ] + return (result, message) + if result is None, message contains error information + """ + + if not orders or not isinstance(orders, (list, tuple)): + return (None, "empty order") + + if isinstance(orders[0], EntrustOrder): + tmp = [] + for o in orders: + tmp.append({"security": o.security, + "price": o.price, + "size": int(o.size)}) + + orders = tmp + + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {"orders": orders, + "algo": algo, + "algo_param": json.dumps(algo_param), + "user": self._username, + "userdata": userdata} + + cr = self._remote.call("oms.batch_order", rpc_params) + return utils.extract_result(cr) + + def place_batch_order(self, orders, algo="", algo_param={}, userdata=""): + """ + orders format: + [ {"security": "000001.SZ", "action": "Buy", "price": 10.0, "size" : 100}, ... ] + return (result, message) + if result is None, message contains error information + """ + + if not orders or not isinstance(orders, (list, tuple)): + return (None, "empty order") + + if isinstance(orders[0], EntrustOrder): + tmp = [] + for o in orders: + tmp.append({"security": o.security, + "action": o.action, + "price": o.price, + "size": int(o.size)}) + + orders = tmp + + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {"orders": orders, + "algo": algo, + "algo_param": json.dumps(algo_param), + "user": self._username, + "userdata": userdata} + + cr = self._remote.call("oms.place_batch_order", rpc_params) + return utils.extract_result(cr) + + def cancel_order(self, task_id): + """ + return (result, message) + if result is None, message contains error information + """ + + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {"task_id": task_id} + + cr = self._remote.call("oms.cancel_order", rpc_params) + return utils.extract_result(cr) + + def query_account(self, format=""): + """ + return pd.dataframe + """ + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {} + + data_format = self._get_format(format, "pandas") + if data_format == "pandas": + rpc_params["format"] = "columnset" + + cr = self._remote.call("oms.query_account", rpc_params) + + return utils.extract_result(cr, format=data_format, class_name="Account") + + def query_position(self, mode="all", securities="", format=""): + """ + securities: seperate by "," + return pd.dataframe + """ + + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {"mode": mode, + "security": securities} + + data_format = self._get_format(format, "pandas") + if data_format == "pandas": + rpc_params["format"] = "columnset" + + cr = self._remote.call("oms.query_position", rpc_params) + + return utils.extract_result(cr, format=data_format, class_name="Position") + + def query_net_position(self, mode="all", securities="", format=""): + """ + securities: seperate by "," + return pd.dataframe + """ + + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {"mode": mode, + "security": securities} + + data_format = self._get_format(format, "pandas") + if data_format == "pandas": + rpc_params["format"] = "columnset" + + cr = self._remote.call("oms.query_net_position", rpc_params) + + return utils.extract_result(cr, format=data_format, class_name="NetPosition") + + def query_repo_contract(self, format=""): + """ + securities: seperate by "," + return pd.dataframe + """ + + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {} + + cr = self._remote.call("oms.query_repo_contract", rpc_params) + + return utils.extract_result(cr, format=self._get_format(format, "pandas"), class_name="RepoContract") + + def query_task(self, task_id=-1, format=""): + """ + task_id: -1 -- all + return pd.dataframe + """ + + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {"task_id": task_id} + + data_format = self._get_format(format, "pandas") + if data_format == "pandas": + rpc_params["format"] = "columnset" + + cr = self._remote.call("oms.query_task", rpc_params) + + return utils.extract_result(cr, format=data_format, class_name="Task") + + def query_order(self, task_id=-1, format=""): + """ + task_id: -1 -- all + return pd.dataframe + """ + + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {"task_id": task_id} + + data_format = self._get_format(format, "pandas") + if data_format == "pandas": + rpc_params["format"] = "columnset" + + cr = self._remote.call("oms.query_order", rpc_params) + + return utils.extract_result(cr, format=data_format, class_name="Order") + + def query_trade(self, task_id=-1, format=""): + """ + task_id: -1 -- all + return pd.dataframe + """ + + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {"task_id": task_id} + + data_format = self._get_format(format, "pandas") + if data_format == "pandas": + rpc_params["format"] = "columnset" + + cr = self._remote.call("oms.query_trade", rpc_params) + + return utils.extract_result(cr, format=data_format, class_name="Trade") + + def query_portfolio(self, format=""): + """ + return pd.dataframe + """ + + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {} + + data_format = self._get_format(format, "pandas") + if data_format == "pandas": + rpc_params["format"] = "columnset" + + cr = self._remote.call("pms.query_portfolio", rpc_params) + + return utils.extract_result(cr, index_column="security", format=data_format, class_name="NetPosition") + + def goal_portfolio(self, positions, algo="", algo_param={}, userdata=""): + """ + positions format: + [ {"security": "000001.SZ", "ref_price": 10.0, "size" : 100}, ...] + return (result, message) + if result is None, message contains error information + """ + + r, msg = self._check_session() + if not r: return (False, msg) + + if type(positions) is pd.core.frame.DataFrame: + tmp = [] + for i in range(0, len(positions)): + tmp.append({'security': positions.index[i], 'ref_price': float(positions['ref_price'][i]), + "size": int(positions['size'][i])}) + positions = tmp + + rpc_params = {"positions": positions, + "algo": algo, + "algo_param": json.dumps(algo_param), + "user": self._username, + "userdata": userdata} + + cr = self._remote.call("pms.goal_portfolio", rpc_params) + return utils.extract_result(cr) + + def basket_order(self, orders, algo="", algo_param={}, userdata=""): + """ + orders format: + [ {"security": "000001.SZ", "ref_price": 10.0, "inc_size" : 100}, ...] + return (result, message) + if result is None, message contains error information + """ + + r, msg = self._check_session() + if not r: return (False, msg) + + if type(orders) is pd.core.frame.DataFrame: + tmp = [] + for i in range(0, len(orders)): + tmp.append({'security': orders.index[i], 'ref_price': float(orders['ref_price'][i]), + "inc_size": int(orders['inc_size'][i])}) + orders = tmp + + rpc_params = {"orders": orders, + "algo": algo, + "algo_param": json.dumps(algo_param), + "user": self._username, + "userdata": userdata} + + cr = self._remote.call("pms.basket_order", rpc_params) + return utils.extract_result(cr) + + def stop_portfolio(self): + """ + return (result, message) + if result is None, message contains error information + """ + + r, msg = self._check_session() + if not r: return (False, msg) + + rpc_params = {} + + cr = self._remote.call("pms.stop_portfolio", rpc_params) + return utils.extract_result(cr) + + def query_universe(self, format=""): + + r, msg = self._check_session() + if not r: return (None, msg) + + rpc_params = {} + data_format = self._get_format(format, "pandas") + if data_format == "pandas": + rpc_params["format"] = "columnset" + + cr = self._remote.call("oms.query_universe", rpc_params) + + return utils.extract_result(cr, format=data_format, class_name="UniverseItem") + + def set_heartbeat(self, interval, timeout): + self._remote.set_hearbeat_options(interval, timeout) + print("heartbeat_interval =", self._remote._heartbeat_interval, ", heartbeat_timeout =", + self._remote._heartbeat_timeout) diff --git a/vnpy/trader/gateway/tkproGateway/TradeApi/utils.py b/vnpy/trader/gateway/tkproGateway/TradeApi/utils.py new file mode 100644 index 00000000..9a1d3f28 --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/TradeApi/utils.py @@ -0,0 +1,95 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from builtins import * +from collections import namedtuple + +import pandas as pd + + +def _to_date(row): + date = int(row['DATE']) + return pd.datetime(year=date // 10000, month=date // 100 % 100, day=date % 100) + + +def _to_datetime(row): + date = int(row['DATE']) + time = int(row['TIME']) // 1000 + return pd.datetime(year=date // 10000, month=date // 100 % 100, day=date % 100, + hour=time // 10000, minute=time // 100 % 100, second=time % 100) + + +def _to_dataframe(cloumset, index_func=None, index_column=None): + df = pd.DataFrame(cloumset) + if index_func: + df.index = df.apply(index_func, axis=1) + elif index_column: + df.index = df[index_column] + del df.index.name + + return df + + +def _error_to_str(error): + if error: + if 'message' in error: + return str(error['error']) + "," + error['message'] + else: + return str(error['error']) + "," + else: + return "," + + +def to_obj(class_name, data): + try: + if isinstance(data, (list, tuple)): + result = [] + for d in data: + result.append(namedtuple(class_name, list(d.keys()))(*list(d.values()))) + return result + + elif type(data) == dict: + result = namedtuple(class_name, list(data.keys()))(*list(data.values())) + return result + else: + return data + except Exception as e: + print(class_name, data, e) + return data + + +def extract_result(cr, format="", index_column=None, class_name=""): + """ + format supports pandas, obj. + """ + + err = _error_to_str(cr['error']) if 'error' in cr else None + if 'result' in cr: + if format == "pandas": + if index_column: + return (_to_dataframe(cr['result'], None, index_column), err) + if 'TIME' in cr['result']: + return (_to_dataframe(cr['result'], _to_datetime), err) + elif 'DATE' in cr['result']: + return (_to_dataframe(cr['result'], _to_date), err) + else: + return (_to_dataframe(cr['result']), err) + + elif format == "obj" and cr['result'] and class_name: + r = cr['result'] + if isinstance(r, (list, tuple)): + result = [] + for d in r: + result.append(namedtuple(class_name, list(d.keys()))(*list(d.values()))) + elif isinstance(r, dict): + result = namedtuple(class_name, list(r.keys()))(*list(r.values())) + else: + result = r + + return (result, err) + else: + return (cr['result'], err) + else: + return (None, err) diff --git a/vnpy/trader/gateway/tkproGateway/__init__.py b/vnpy/trader/gateway/tkproGateway/__init__.py new file mode 100644 index 00000000..b747a8de --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/__init__.py @@ -0,0 +1,10 @@ +# encoding: UTF-8 + +from vnpy.trader import vtConstant +from .tkproGateway import TkproGateway + +gatewayClass = TkproGateway +gatewayName = 'TKPRO' +gatewayDisplayName = 'TKPRO' +gatewayType = vtConstant.GATEWAYTYPE_EQUITY +gatewayQryEnabled = True diff --git a/vnpy/trader/gateway/tkproGateway/tkproGateway.py b/vnpy/trader/gateway/tkproGateway/tkproGateway.py new file mode 100644 index 00000000..85d406cc --- /dev/null +++ b/vnpy/trader/gateway/tkproGateway/tkproGateway.py @@ -0,0 +1,611 @@ +# encoding: UTF-8 + +''' +quantOS的TkPro系统接入 +''' + +import sys +import os +import json +import traceback +from datetime import datetime + +from vnpy.trader.vtConstant import * +from vnpy.trader.vtObject import * +from vnpy.trader.vtGateway import VtGateway +from vnpy.trader.vtFunction import getJsonPath +from vnpy.trader.vtEvent import EVENT_TIMER + +from .DataApi import DataApi +from .TradeApi import TradeApi +from collections import namedtuple + + + +# 以下为一些VT类型和TkPro类型的映射字典 + +# 动作印射 +actionMap = {} +actionMap[(DIRECTION_LONG, OFFSET_OPEN)] = "Buy" +actionMap[(DIRECTION_SHORT, OFFSET_OPEN)] = "Short" +actionMap[(DIRECTION_LONG, OFFSET_CLOSE)] = "Cover" +actionMap[(DIRECTION_SHORT, OFFSET_CLOSE)] = "Sell" +actionMap[(DIRECTION_LONG, OFFSET_CLOSEYESTERDAY)] = "CoverYesterday" +actionMap[(DIRECTION_SHORT, OFFSET_CLOSEYESTERDAY)] = "SellYesterday" +actionMap[(DIRECTION_LONG, OFFSET_CLOSETODAY)] = "CoverToday" +actionMap[(DIRECTION_SHORT, OFFSET_CLOSETODAY)] = "SellToday" +actionMapReverse = {v: k for k, v in actionMap.items()} + +# 交易所类型映射 +exchangeMap = {} +exchangeMap[EXCHANGE_CFFEX] = 'CFE' +exchangeMap[EXCHANGE_SHFE] = 'SHF' +exchangeMap[EXCHANGE_CZCE] = 'CZC' +exchangeMap[EXCHANGE_DCE] = 'DCE' +exchangeMap[EXCHANGE_SSE] = 'SH' +exchangeMap[EXCHANGE_SZSE] = 'SZ' +exchangeMapReverse = {v:k for k,v in exchangeMap.items()} + + +# 持仓类型映射 +sideMap = {} +sideMap[DIRECTION_LONG] = 'Long' +sideMap[DIRECTION_SHORT] = 'Short' +sideMapReverse = {v:k for k,v in sideMap.items()} + +# 产品类型映射 +productClassMapReverse = {} +productClassMapReverse[1] = PRODUCT_EQUITY +productClassMapReverse[3] = PRODUCT_EQUITY +productClassMapReverse[4] = PRODUCT_EQUITY +productClassMapReverse[5] = PRODUCT_EQUITY +productClassMapReverse[8] = PRODUCT_BOND +productClassMapReverse[17] = PRODUCT_BOND +productClassMapReverse[101] = PRODUCT_FUTURES +productClassMapReverse[102] = PRODUCT_FUTURES +productClassMapReverse[103] = PRODUCT_FUTURES + +# 委托状态映射 +statusMapReverse = {} +statusMapReverse['New'] = STATUS_UNKNOWN +statusMapReverse['Accepted'] = STATUS_NOTTRADED +statusMapReverse['Cancelled'] = STATUS_CANCELLED +statusMapReverse['Filled'] = STATUS_ALLTRADED +statusMapReverse['Rejected'] = STATUS_REJECTED + + + +######################################################################## +class TkproGateway(VtGateway): + """TkPro接口""" + + #---------------------------------------------------------------------- + def __init__(self, eventengine, gatewayName='TKPRO'): + """Constructor""" + super(TkproGateway, self).__init__(eventengine, gatewayName) + + self.dataApi = TkproDataApi(self) # 行情 + self.tradeApi = TkproTradeApi(self) # 交易 + + self.qryEnabled = False # 是否要启动循环查询 + + self.fileName = self.gatewayName + '_connect.json' + self.filePath = getJsonPath(self.fileName, __file__) + + #---------------------------------------------------------------------- + def connect(self): + """连接""" + try: + f = file(self.filePath) + except IOError: + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = u'无法加载配置' + self.onLog(log) + return + + setting = json.load(f) + try: + username = str(setting['username']) + token = str(setting['token']) + strategy = int(setting['strategy']) + tradeAddress = str(setting['tradeAddress']) + dataAddress = str(setting['dataAddress']) + except KeyError: + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = u'连接配置缺少字段,请检查' + self.onLog(log) + return + + # 创建行情和交易接口对象 + self.dataApi.connect(dataAddress, username, token) + self.tradeApi.connect(tradeAddress, username, token, strategy) + + # 初始化并启动查询 + self.initQuery() + + #---------------------------------------------------------------------- + def subscribe(self, subscribeReq): + """订阅行情""" + self.dataApi.subscribe(subscribeReq) + + #---------------------------------------------------------------------- + def sendOrder(self, orderReq): + """发单""" + self.tradeApi.sendOrder(orderReq) + + #---------------------------------------------------------------------- + def cancelOrder(self, cancelOrderReq): + """撤单""" + self.tradeApi.cancelOrder(cancelOrderReq) + + #---------------------------------------------------------------------- + def qryAccount(self): + """查询账户资金""" + self.tradeApi.qryAccount() + + #---------------------------------------------------------------------- + def qryPosition(self): + """查询持仓""" + self.tradeApi.qryPosition() + + #---------------------------------------------------------------------- + def close(self): + """关闭""" + pass + + #---------------------------------------------------------------------- + def initQuery(self): + """初始化连续查询""" + if self.qryEnabled: + # 需要循环的查询函数列表 + self.qryFunctionList = [self.qryPosition, self.qryAccount] + + self.qryCount = 0 # 查询触发倒计时 + self.qryTrigger = 2 # 查询触发点 + self.qryNextFunction = 0 # 上次运行的查询函数索引 + + self.startQuery() + + #---------------------------------------------------------------------- + def query(self, event): + """注册到事件处理引擎上的查询函数""" + self.qryCount += 1 + + if self.qryCount > self.qryTrigger: + # 清空倒计时 + self.qryCount = 0 + + # 执行查询函数 + function = self.qryFunctionList[self.qryNextFunction] + function() + + # 计算下次查询函数的索引,如果超过了列表长度,则重新设为0 + self.qryNextFunction += 1 + if self.qryNextFunction == len(self.qryFunctionList): + self.qryNextFunction = 0 + + #---------------------------------------------------------------------- + def startQuery(self): + """启动连续查询""" + self.eventEngine.register(EVENT_TIMER, self.query) + + #---------------------------------------------------------------------- + def setQryEnabled(self, qryEnabled): + """设置是否要启动循环查询""" + self.qryEnabled = qryEnabled + + +######################################################################## +class TkproTradeApi(object): + """TkPro交易API实现""" + + #---------------------------------------------------------------------- + def __init__(self, gateway): + """Constructor""" + super(TkproTradeApi, self).__init__() + + self.gateway = gateway # gateway对象 + self.gatewayName = gateway.gatewayName # gateway对象名称 + + self.api = None + + #---------------------------------------------------------------------- + def onOrderStatus(self, data): + """委托信息推送""" + if isinstance(data, dict): + data = namedtuple('Order', list(data.keys()))(*list(data.values())) + + order = VtOrderData() + order.gatewayName = self.gatewayName + + symbol, exchange = data.security.split('.') + order.symbol = symbol + order.exchange = exchangeMapReverse[exchange] + order.vtSymbol = '.'.join([order.symbol, order.exchange]) + + order.orderID = str(data.entrust_no) + order.taskID = str(data.task_id) + order.vtOrderID = order.orderID + + order.direction, order.offset = actionMapReverse.get(data.entrust_action, (DIRECTION_UNKNOWN, OFFSET_UNKNOWN)) + order.totalVolume = data.entrust_size + order.tradedVolume = data.fill_size + order.price = data.entrust_price + order.status = statusMapReverse.get(data.order_status) + order.tradePrice = data.fill_price + + t = str(data.entrust_time) + t = t.rjust(6, '0') + order.orderTime = '%s:%s:%s' %(t[0:2],t[2:4],t[4:]) + + self.gateway.onOrder(order) + + #---------------------------------------------------------------------- + def onTaskStatus(self, data): + """""" + pass + + #---------------------------------------------------------------------- + def onTrade(self, data): + """成交信息推送""" + if isinstance(data, dict): + data = namedtuple('Trade', list(data.keys()))(*list(data.values())) + + trade = VtTradeData() + trade.gatewayName = self.gatewayName + + symbol, exchange = data.security.split('.') + trade.symbol = symbol + trade.exchange = exchangeMapReverse[exchange] + trade.vtSymbol = '.'.join([trade.symbol, trade.exchange]) + + trade.direction, trade.offset = actionMapReverse.get(data.entrust_action, (DIRECTION_UNKNOWN, OFFSET_UNKNOWN)) + + trade.tradeID = str(data.fill_no) + trade.vtTradeID = str(data.fill_no) + + trade.orderID = str(data.entrust_no) + trade.vtOrderID = trade.orderID + trade.taskID = str(data.task_id) + + trade.price = data.fill_price + trade.volume = data.fill_size + + t = str(data.fill_time) + t = t.rjust(6, '0') + trade.tradeTime = '%s:%s:%s' %(t[0:2],t[2:4],t[4:]) + + self.gateway.onTrade(trade) + + #---------------------------------------------------------------------- + def onConnection(self, data): + """""" + self.writeLog(u'连接状态更新:%s' %data) + + if data: + self.qryInstrument() + self.qryOrder() + self.qryTrade() + + #---------------------------------------------------------------------- + def connect(self, tradeAddress, username, token, strategy): + """初始化连接""" + if self.api: + self.writeLog(u'交易已经连接') + return + + self.api = TradeApi(tradeAddress) + self.api.set_data_format('obj') + + # 登录 + result, msg = self.api.login(username, token) + + if not result: + self.writeLog(u'交易登录失败,错误信息:%s' %msg) + return + + result, msg = self.api.use_strategy(strategy) + + if result: + self.writeLog(u'选定策略号:%s' %strategy) + else: + self.writeLog(u'选定策略号失败') + + self.api.set_ordstatus_callback(self.onOrderStatus) + self.api.set_trade_callback(self.onTrade) + self.api.set_task_callback(self.onTaskStatus) + self.api.set_connection_callback(self.onConnection) + + #---------------------------------------------------------------------- + def close(self): + """关闭""" + pass + + #---------------------------------------------------------------------- + def writeLog(self, logContent): + """记录日志""" + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = logContent + self.gateway.onLog(log) + + #---------------------------------------------------------------------- + def sendOrder(self, orderReq): + """发单""" + if not self.api: + return + + exchange = exchangeMap.get(orderReq.exchange, '') + security = '.'.join([orderReq.symbol, exchange]) + action = actionMap.get((orderReq.direction, orderReq.offset), '') + + taskid, msg = self.api.place_order(security, action, orderReq.price, int(orderReq.volume)) + + if taskid is 0: + self.writeLog(u'委托失败,错误信息:%s' %msg) + + #---------------------------------------------------------------------- + def cancelOrder(self, cancelOrderReq): + """撤单""" + if not self.api: + return + + result, msg = self.api.cancel_order(cancelOrderReq.orderID) + if result is 0: + self.writeLog(u'撤单失败,错误信息:%s' %msg) + + #---------------------------------------------------------------------- + def qryPosition(self): + """查询持仓""" + l, msg = self.api.query_position() + + if l is None: + self.writeLog(u'查询持仓失败,错误信息:%s' %msg) + return False + + for data in l: + position = VtPositionData() + position.gatewayName = self.gatewayName + + symbol, exchange = data.security.split('.') + position.symbol = symbol + position.exchange = exchangeMapReverse[exchange] + position.vtSymbol = '.'.join([position.symbol, position.exchange]) + + position.direction = sideMapReverse.get(data.side, DIRECTION_UNKNOWN) + position.vtPositionName = '.'.join([position.vtSymbol, position.direction]) + + position.price = data.cost_price + position.ydPosition = data.pre_size + position.tdPosition = data.today_size + position.position = data.current_size + position.frozen = data.frozen_size + + position.commission = data.commission + position.enable = data.enable_size + position.want = data.want_size + position.initPosition = data.init_size + position.trading = data.trading_pnl + position.holding = data.holding_pnl + position.last = data.last_price + + self.gateway.onPosition(position) + + return True + + #---------------------------------------------------------------------- + def qryAccount(self): + """查询资金""" + l, msg = self.api.query_account() + + if l is None: + self.writeLog(u'查询资金失败,错误信息:%s' %msg) + return False + + for data in l: + account = VtAccountData() + account.gatewayName = self.gatewayName + + account.accountID = '_'.join([str(data.id), data.type]) + account.vtAccountID = '.'.join([account.accountID, account.gatewayName]) + account.available = data.enable_balance + account.balance = account.available + data.frozen_balance + account.closeProfit = data.close_pnl + account.commission = data.commission + account.margin = data.margin + account.positionProfit = data.holding_pnl + account.preBalance = data.init_balance + + self.gateway.onAccount(account) + + #---------------------------------------------------------------------- + def qryOrder(self): + """查询委托""" + l, msg = self.api.query_order() + + if l is None: + self.writeLog(u'查询委托失败,错误信息:%s' %msg) + else: + for data in l: + self.onOrderStatus(data) + + self.writeLog(u'查询委托完成') + + #---------------------------------------------------------------------- + def qryTrade(self): + """查询成交""" + l, msg = self.api.query_trade() + + if l is None: + self.writeLog(u'查询成交失败,错误信息:%s' %msg) + return False + + for data in l: + self.onTrade(data) + + self.writeLog(u'查询成交完成') + return True + + #---------------------------------------------------------------------- + def qryInstrument(self): + """查询合约""" + # 通过DataAPI查询所有信息 + df, msg = self.gateway.dataApi.api.query( + view='jz.instrumentInfo', + fields='symbol,name,inst_type,buylot,pricetick,multiplier', + filter='inst_type=1', + data_format='pandas' + ) + + d = {} + for n, row in df.iterrows(): + d[row.symbol] = row + + # 查询所有信息 + l, msg = self.api.query_universe() + + if l is None: + self.writeLog(u'查询合约失败,错误信息:%s' %msg) + return False + + for data in l: + row = d[data.security] + + contract = VtContractData() + contract.gatewayName = self.gatewayName + + symbol, exchange = data.security.split('.') + contract.symbol = symbol + contract.exchange = exchangeMapReverse[exchange] + contract.vtSymbol = '.'.join([contract.symbol, contract.exchange]) + contract.productClass = PRODUCT_EQUITY + contract.name = unicode(row['name']) + contract.priceTick = float(row['pricetick']) + contract.size = int(row['multiplier']) + + self.gateway.onContract(contract) + + self.writeLog(u'查询合约完成') + return True + + +######################################################################## +class TkproDataApi(object): + """TkPro行情API实现""" + + #---------------------------------------------------------------------- + def __init__(self, gateway): + """Constructor""" + super(TkproDataApi, self).__init__() + + self.gateway = gateway + self.gatewayName = gateway.gatewayName + + self.api = None + + self.fields = "open,close,high,low,last,\ + volume,turnover,oi,preclose,time,date,\ + askprice1,askprice2,askprice3,askprice4,askprice5,\ + bidprice1,bidprice2,bidprice3,bidprice4,bidprice5,\ + askvolume1,askvolume2,askvolume3,askvolume4,askvolume5,\ + bidvolume1,bidvolume2,bidvolume3,bidvolume4,bidvolume5,\ + limit_up,limit_down" + + #---------------------------------------------------------------------- + def onMarketData(self, k, data): + """行情推送""" + tick = VtTickData() + tick.gatewayName = self.gatewayName + + try: + l = data['symbol'].split('.') + tick.symbol = l[0] + tick.exchange = exchangeMapReverse[l[1]] + tick.vtSymbol = '.'.join([tick.symbol, tick.exchange]) + + tick.openPrice = data['open'] + tick.highPrice = data['high'] + tick.lowPrice = data['low'] + tick.volume = data['volume'] + tick.turnover = data['turnover'] + tick.lastPrice = data['last'] + + tick.openInterest = data['oi'] + tick.preClosePrice = data['preclose'] + tick.date = str(data['date']) + + t = str(data['time']) + t = t.rjust(9, '0') + tick.time = '%s:%s:%s.%s' %(t[0:2],t[2:4],t[4:6],t[6:]) + + tick.bidPrice1 = data['bidprice1'] + tick.askPrice1 = data['askprice1'] + tick.bidVolume1 = data['bidvolume1'] + tick.askVolume1 = data['askvolume1'] + + if 'bidprice2' in data: + tick.bidPrice2 = data['bidprice2'] + tick.bidPrice3 = data['bidprice3'] + tick.bidPrice4 = data['bidprice4'] + tick.bidPrice5 = data['bidprice5'] + + tick.askPrice2 = data['askprice2'] + tick.askPrice3 = data['askprice3'] + tick.askPrice4 = data['askprice4'] + tick.askPrice5 = data['askprice5'] + + tick.bidVolume2 = data['bidvolume2'] + tick.bidVolume3 = data['bidvolume3'] + tick.bidVolume4 = data['bidvolume4'] + tick.bidVolume5 = data['bidvolume5'] + + tick.askVolume2 = data['askvolume2'] + tick.askVolume3 = data['askvolume3'] + tick.askVolume4 = data['askvolume4'] + tick.askVolume5 = data['askvolume5'] + + tick.upperLimit = data['limit_up'] + tick.lowerLimit = data['limit_down'] + + self.gateway.onTick(tick) + except Exception, e: + self.writeLog(u'行情更新失败,错误信息:%s' % str(e)) + + #---------------------------------------------------------------------- + def connect(self, dataAddress, username, token): + """连接""" + if self.api: + self.writeLog(u'行情已经连接') + return + + self.api = DataApi(dataAddress) + + result, msg = self.api.login(username, token) + + if not result: + self.writeLog(u'行情登录失败,错误信息:%sa' %str(msg)) + return + + self.writeLog(u'行情连接成功') + + #---------------------------------------------------------------------- + def subscribe(self, subscribeReq): + """订阅行情""" + exchange = exchangeMap.get(subscribeReq.exchange, '') + security = '.'.join([subscribeReq.symbol, exchange]) + + subscribed, msg = self.api.subscribe(security, fields=self.fields, func=self.onMarketData) + + if not subscribed: + self.writeLog(u'行情订阅失败,错误信息:%s' %str(msg)) + + #---------------------------------------------------------------------- + def writeLog(self, logContent): + """记录日志""" + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = logContent + self.gateway.onLog(log) +