[Add]新增用于连接TradeSim的tkproGateway

This commit is contained in:
vn.py 2017-12-04 14:55:48 +08:00
parent bb678b6842
commit c1ef3422a0
26 changed files with 3223 additions and 26 deletions

View File

@ -0,0 +1,7 @@
{
"brokerID": "9999",
"mdAddress": "tcp://180.168.146.187:10011",
"tdAddress": "tcp://180.168.146.187:10001",
"userID": "simnow申请",
"password": "simnow申请"
}

View File

@ -0,0 +1,6 @@
{
"accountID": "110100001088",
"password": "123456",
"mdAddress": "tcp://203.86.95.187:10915",
"tdAddress": "tcp://203.86.95.187:10910"
}

View File

@ -1,6 +0,0 @@
{
"accountID": "200100000085",
"password": "332036",
"mdAddress": "tcp://101.226.253.121:20915",
"tdAddress": "tcp://101.226.253.121:20910"
}

View File

@ -1 +1 @@
3PhIldq+vwFT0Ap6unvLxA==
lBUnZalf043zoNavP3G6rA==

View File

@ -16,7 +16,7 @@ from vnpy.trader.uiQt import createQApp
from vnpy.trader.uiMainWindow import MainWindow
# 加载底层接口
from vnpy.trader.gateway import (secGateway)
from vnpy.trader.gateway import (secGateway, ctpGateway)
# 加载上层应用
from vnpy.trader.app import (riskManager, optionMaster)
@ -36,6 +36,7 @@ def main():
# 添加交易接口
me.addGateway(secGateway)
me.addGateway(ctpGateway)
# 添加上层应用
me.addApp(riskManager)

View File

@ -1,7 +0,0 @@
{
"accessKey": "火币网站申请",
"secretKey": "火币网站申请",
"interval": 0.5,
"market": "cny",
"debug": false
}

View File

@ -1,7 +0,0 @@
{
"host": "CNY",
"apiKey": "OKCOIN网站申请",
"secretKey": "OKCOIN网站申请",
"trace": false,
"leverage": 20
}

View File

@ -0,0 +1,7 @@
{
"username": "请在quantos.org申请",
"token": "请在quantos.org申请",
"strategy": 625,
"tradeAddress": "tcp://gw.quantos.org:8901",
"dataAddress": "tcp://data.tushare.org:8910"
}

View File

@ -1 +0,0 @@
PurwJxAEvTeQ9X8HMnmMRw==

View File

@ -17,7 +17,7 @@ from vnpy.trader.uiMainWindow import MainWindow
# 加载底层接口
from vnpy.trader.gateway import (ctpGateway, oandaGateway, ibGateway,
huobiGateway, okcoinGateway)
tkproGateway)
if system == 'Windows':
from vnpy.trader.gateway import (femasGateway, xspeedGateway,
@ -44,10 +44,9 @@ def main():
# 添加交易接口
me.addGateway(ctpGateway)
me.addGateway(tkproGateway)
me.addGateway(oandaGateway)
me.addGateway(ibGateway)
me.addGateway(huobiGateway)
me.addGateway(okcoinGateway)
if system == 'Windows':
me.addGateway(femasGateway)

View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,44 @@
# DataApi
标准数据API定义。
# 安装步骤
## 1、安装Python环境
如果本地还没有安装Python环境强烈建议安装 [Anaconda](http://www.continuum.io/downloads "Anaconda")。
打开上面的网址选择相应的操作系统确定要按照的Python版本一般建议用Python 2.7。
下载完成以后按照图形界面步骤完成安装。在默认情况下Anaconda会自动设置PATH环境。
## 2、安装依赖包
如果Python环境不是类似Anaconda的集成开发环境我们需要单独安装依赖包在已经有pandas/numpy包前提下还需要有以下几个包
- pyzmq
- msgpack_python
- python-snappy
可以通过单个安装完成,例如: `pip install pyzmq`
需要注意的是python-snappy这个包在Windows上的安装需要比较多的编译依赖,建议从[这个网页](http://www.lfd.uci.edu/~gohlke/pythonlibs)下载编译好的包,然后安装:
```shell
pip install python_snappy-0.5.1-cp27-cp27m-win_amd64.whl
```
## 3、使用DataApi
```python
from .data_api import DataApi # 这里假设工作目录是项目根目录
api = DataApi(addr="tcp://data.tushare.org:8910")
result, msg = api.login("username", "token") # 示例账户用户需要改为自己在www.quantos.org上注册的账户
print(result)
print(msg)
```

View File

@ -0,0 +1,12 @@
# encoding: utf-8
"""
Core data api for fetching data from remote service.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from .data_api import DataApi
__all__ = ['DataApi']

View File

@ -0,0 +1,578 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from builtins import *
import time
import numpy as np
from . import jrpc_py
# import jrpc
from . import utils
# def set_log_dir(log_dir):
# if log_dir:
# jrpc.set_log_dir(log_dir)
class DataApiCallback(object):
"""DataApi Callback
def on_quote(quote):
pass
def on_connection()
"""
def __init__(self):
self.on_quote = None
class DataApi(object):
"""
Abstract base class providing both historic and live data
from various data sources.
Current API version: 1.0
Attributes
----------
Methods
-------
subscribe
quote
daily
bar
bar_quote
"""
def __init__(self, addr="tcp://data.tushare.org:8910", use_jrpc=False):
"""Create DataApi client.
If use_jrpc, try to load the C version of JsonRpc. If failed, use pure
Python version of JsonRpc.
"""
self._remote = None
# if use_jrpc:
# try:
# import jrpc
# self._remote = jrpc.JRpcClient()
# except Exception as e:
# print "Can't load jrpc", e.message
if not self._remote:
self._remote = jrpc_py.JRpcClient()
self._remote.on_rpc_callback = self._on_rpc_callback
self._remote.on_disconnected = self._on_disconnected
self._remote.on_connected = self._on_connected
self._remote.connect(addr)
self._on_jsq_callback = None
self._connected = False
self._loggined = False
self._username = ""
self._password = ""
self._data_format = "default"
self._callback = None
self._schema = []
self._schema_id = 0
self._schema_map = {}
self._sub_hash = ""
self._subscribed_set = set()
self._timeout = 20
def login(self, username, password):
"""
Login before using data api.
Parameters
----------
username : str
username
password : str
password
"""
for i in range(3):
if self._connected:
break
time.sleep(1)
if not self._connected:
return (None, "-1,no connection")
self._username = username
self._password = password
return self._do_login()
def logout(self):
"""
Logout to stop using the data api or switch users.
"""
self._loggined = None
rpc_params = {}
cr = self._remote.call("auth.logout", rpc_params)
return utils.extract_result(cr)
def close(self):
"""
Close the data api.
"""
self._remote.close()
# def set_callback(self, callback):
# self._callback = callback
def set_timeout(self, timeout):
"""
Set timeout for data api.
Default timeout is 20s.
Parameters
----------
timeout : int
the max waiting time for the api return
"""
self._timeout = timeout
def set_data_format(self, format):
"""Set queried data format.
Available formats are:
"" -- Don't convert data, usually the type is map
"pandas" -- Convert table likely data to DataFrame
"""
self._data_format = format
def set_heartbeat(self, interval, timeout):
self._remote.set_hearbeat_options(interval, timeout)
def quote(self, symbol, fields="", data_format="", **kwargs):
r, msg = self._call_rpc("jsq.query",
self._get_format(data_format, "pandas"),
"Quote",
_index_column="symbol",
symbol=str(symbol),
fields=fields,
**kwargs)
return (r, msg)
def bar(self, symbol, start_time=200000, end_time=160000,
trade_date=0, freq="1m", fields="", data_format="", **kwargs):
"""
Query minute bars of various type, return DataFrame.
Parameters
----------
symbol : str
support multiple securities, separated by comma.
start_time : int (HHMMSS) or str ('HH:MM:SS')
Default is market open time.
end_time : int (HHMMSS) or str ('HH:MM:SS')
Default is market close time.
trade_date : int (YYYMMDD) or str ('YYYY-MM-DD')
Default is current trade_date.
fields : str, optional
separated by comma ',', default "" (all fields included).
freq : trade.common.MINBAR_TYPE, optional
{'1m', '5m', '15m'}, Minute bar type, default is '1m'
Returns
-------
df : pd.DataFrame
columns:
symbol, code, date, time, trade_date, freq, open, high, low, close, volume, turnover, vwap, oi
msg : str
error code and error message joined by comma
Examples
--------
df, msg = api.bar("000001.SH,cu1709.SHF", start_time="09:56:00", end_time="13:56:00",
trade_date="20170823", fields="open,high,low,last,volume", freq="5m")
"""
begin_time = utils.to_time_int(start_time)
if (begin_time == -1):
return (-1, "Begin time format error")
end_time = utils.to_time_int(end_time)
if (end_time == -1):
return (-1, "End time format error")
trade_date = utils.to_date_int(trade_date)
if (trade_date == -1):
return (-1, "Trade date format error")
return self._call_rpc("jsi.query",
self._get_format(data_format, "pandas"),
"Bar",
symbol=str(symbol),
fields=fields,
freq=freq,
trade_date=trade_date,
begin_time=begin_time,
end_time=end_time,
**kwargs)
def bar_quote(self, symbol, start_time=200000, end_time=160000,
trade_date=0, freq="1m", fields="", data_format="", **kwargs):
"""
Query minute bars of various type, return DataFrame.
It will also return ask/bid informations of the last quote in this bar
Parameters
----------
symbol : str
support multiple securities, separated by comma.
start_time : int (HHMMSS) or str ('HH:MM:SS')
Default is market open time.
end_time : int (HHMMSS) or str ('HH:MM:SS')
Default is market close time.
trade_date : int (YYYMMDD) or str ('YYYY-MM-DD')
Default is current trade_date.
fields : str, optional
separated by comma ',', default "" (all fields included).
freq : trade.common.MINBAR_TYPE, optional
{'1m', '5m', '15m'}, Minute bar type, default is '1m'
Returns
-------
df : pd.DataFrame
columns:
symbol, code, date, time, trade_date, freq, open, high, low, close, volume, turnover, vwap, oi,
askprice1, askprice2, askprice3, askprice4, askprice5,
bidprice1, bidprice2, bidprice3, bidprice4, bidprice5,
askvolume1, askvolume2, askvolume3, askvolume4, askvolume5,
bidvolume1, bidvolume2, bidvolume3, bidvolume4, bidvolume5
msg : str
error code and error message joined by comma
Examples
--------
df, msg = api.bar_quote("000001.SH,cu1709.SHF", start_time="09:56:00", end_time="13:56:00",
trade_date="20170823", fields="open,high,low,last,volume", freq="5m")
"""
begin_time = utils.to_time_int(start_time)
if (begin_time == -1):
return (-1, "Begin time format error")
end_time = utils.to_time_int(end_time)
if (end_time == -1):
return (-1, "End time format error")
trade_date = utils.to_date_int(trade_date)
if (trade_date == -1):
return (-1, "Trade date format error")
return self._call_rpc("jsi.bar_view",
self._get_format(data_format, "pandas"),
"BarQuote",
symbol=str(symbol),
fields=fields,
freq=freq,
trade_date=trade_date,
begin_time=begin_time,
end_time=end_time,
**kwargs)
def daily(self, symbol, start_date, end_date,
adjust_mode=None, freq="1d", fields="",
data_format="", **kwargs):
"""
Query dar bar,
support auto-fill suspended securities data,
support auto-adjust for splits, dividends and distributions.
Parameters
----------
symbol : str
support multiple securities, separated by comma.
start_date : int or str
YYYMMDD or 'YYYY-MM-DD'
end_date : int or str
YYYMMDD or 'YYYY-MM-DD'
fields : str, optional
separated by comma ',', default "" (all fields included).
adjust_mode : str or None, optional
None for no adjust;
'pre' for forward adjust;
'post' for backward adjust.
Returns
-------
df : pd.DataFrame
columns:
symbol, code, trade_date, open, high, low, close, volume, turnover, vwap, oi, suspended
msg : str
error code and error message joined by comma
Examples
--------
df, msg = api.daily("000001.SH,cu1709.SHF",start_date=20170503, end_date=20170708,
fields="open,high,low,last,volume", adjust_mode = "post")
"""
if adjust_mode == None:
adjust_mode = "none"
begin_date = utils.to_date_int(start_date)
if (begin_date == -1):
return (-1, "Begin date format error")
end_date = utils.to_date_int(end_date)
if (end_date == -1):
return (-1, "End date format error")
return self._call_rpc("jsd.query",
self._get_format(data_format, "pandas"),
"Daily",
symbol=str(symbol),
fields=fields,
begin_date=begin_date,
end_date=end_date,
adjust_mode=adjust_mode,
freq=freq,
**kwargs)
def query(self, view, filter="", fields="", data_format="", **kwargs):
"""
Get various reference data.
Parameters
----------
view : str
data source.
fields : str
Separated by ','
filter : str
filter expressions.
kwargs
Returns
-------
df : pd.DataFrame
msg : str
error code and error message, joined by ','
Examples
--------
res3, msg3 = ds.query("lb.secDailyIndicator", fields="price_level,high_52w_adj,low_52w_adj",\
filter="start_date=20170907&end_date=20170907",\
data_format='pandas')
view does not change. fileds can be any field predefined in reference data api.
"""
return self._call_rpc("jset.query",
self._get_format(data_format, "pandas"),
"JSetData",
view=view,
fields=fields,
filter=filter,
**kwargs)
def subscribe(self, symbol, func=None, fields=""):
"""Subscribe securites
This function adds new securities to subscribed list on the server. If
success, return subscribed codes.
If securities is empty, return current subscribed codes.
"""
r, msg = self._check_session()
if not r:
return (r, msg)
if func:
self._on_jsq_callback = func
rpc_params = {"symbol": symbol,
"fields": fields}
cr = self._remote.call("jsq.subscribe", rpc_params)
rsp, msg = utils.extract_result(cr, data_format="", class_name="SubRsp")
if not rsp:
return (rsp, msg)
new_codes = [x.strip() for x in symbol.split(',') if x]
self._subscribed_set = self._subscribed_set.union(set(new_codes))
self._schema_id = rsp['schema_id']
self._schema = rsp['schema']
self._sub_hash = rsp['sub_hash']
self._make_schema_map()
return (rsp['symbols'], msg)
def unsubscribe(self, symbol):
"""Unsubscribe securities.
Unscribe codes and return list of subscribed code.
"""
assert False, "NOT IMPLEMENTED"
def __del__(self):
self._remote.close()
def _on_disconnected(self):
"""JsonRpc callback"""
# print "DataApi: _on_disconnected"
self._connected = False
if self._callback:
self._callback("connection", False)
def _on_connected(self):
"""JsonRpc callback"""
self._connected = True
self._do_login()
self._do_subscribe()
if self._callback:
self._callback("connection", True)
def _check_session(self):
if not self._connected:
return (False, "no connection")
elif self._loggined:
return (True, "")
elif self._username and self._password:
return self._do_login()
else:
return (False, "no login session")
def _get_format(self, format, default_format):
if format:
return format
elif self._data_format != "default":
return self._data_format
else:
return default_format
def set_callback(self, callback):
self._callback = callback
def _convert_quote_ind(self, quote_ind):
"""Convert original quote_ind to a map.
The original quote_ind contains field index instead of field name!
"""
if quote_ind['schema_id'] != self._schema_id:
return None
indicators = quote_ind['indicators']
values = quote_ind['values']
max_index = len(self._schema)
quote = {}
for i in range(len(indicators)):
if indicators[i] < max_index:
quote[self._schema_map[indicators[i]]['name']] = values[i]
else:
quote[str(indicators[i])] = values[i]
return quote
def _on_rpc_callback(self, method, data):
# print "_on_rpc_callback:", method, data
try:
if method == "jsq.quote_ind":
if self._on_jsq_callback:
q = self._convert_quote_ind(data)
if q:
self._on_jsq_callback("quote", q)
elif method == ".sys.heartbeat":
if 'sub_hash' in data:
if self._sub_hash and self._sub_hash != data['sub_hash']:
print("sub_hash is not same", self._sub_hash, data['sub_hash'])
self._do_subscribe()
except Exception as e:
print("Can't load jrpc", e.message)
def _call_rpc(self, method, data_format, data_class, **kwargs):
r, msg = self._check_session()
if not r:
return (r, msg)
index_column = None
rpc_params = {}
for key, value in kwargs.items():
if key == '_index_column':
index_column = value
else:
if isinstance(value, (int, np.integer)):
value = int(value)
rpc_params[key] = value
cr = self._remote.call(method, rpc_params, timeout=self._timeout)
return utils.extract_result(cr, data_format=data_format, index_column=index_column, class_name=data_class)
def _make_schema_map(self):
self._schema_map = {}
for schema in self._schema:
self._schema_map[schema['id']] = schema
def _do_login(self):
# Shouldn't check connected flag here. ZMQ is a mesageq queue!
# if !self._connected :
# return (False, "-1,no connection")
if self._username and self._password:
rpc_params = {"username": self._username,
"password": self._password}
cr = self._remote.call("auth.login", rpc_params)
r, msg = utils.extract_result(cr, data_format="", class_name="UserInfo")
self._loggined = r
return (r, msg)
else:
self._loggined = None
return (False, "-1,empty username or password")
def _do_subscribe(self):
"""Subscribe again when reconnected or hash_code is not same"""
if not self._subscribed_set: return
codes = list(self._subscribed_set)
codes.sort()
# XXX subscribe with default fields!
rpc_params = {"symbol": ",".join(codes),
"fields": ""}
cr = self._remote.call("jsq.subscribe", rpc_params)
rsp, msg = utils.extract_result(cr, data_format="", class_name="SubRsp")
if not rsp:
# return (rsp, msg)
return
self._schema_id = rsp['schema_id']
self._schema = rsp['schema']
self._sub_hash = rsp['sub_hash']
# return (rsp.securities, msg)
self._make_schema_map()

View File

@ -0,0 +1,309 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from builtins import *
import zmq
import time
import random
try:
import queue
except ImportError:
import queue as queue
import threading
import msgpack
import snappy
import copy
qEmpty = copy.copy(queue.Empty)
def _unpack(str):
if str.startswith(b'S'):
tmp = snappy.uncompress(str[1:])
# print "SNAPPY: ", len(str), len(tmp)
obj = msgpack.loads(tmp, encoding='utf-8')
elif str.startswith(b'\0'):
obj = msgpack.loads(str[1:], encoding='utf-8')
else:
return None
return obj
def _pack(obj):
# print "pack", obj
tmp = msgpack.dumps(obj, encoding='utf-8')
if len(tmp) > 1000:
return b'S' + snappy.compress(tmp)
else:
return b'\0' + tmp
class JRpcClient(object):
def __init__(self):
self._waiter_lock = threading.Lock()
self._waiter_map = {}
self._should_close = False
self._next_callid = 0
self._send_lock = threading.Lock()
self._callid_lock = threading.Lock()
self._last_heartbeat_rsp_time = 0
self._connected = False
self.on_disconnected = None
self.on_rpc_callback = None
self._callback_queue = queue.Queue()
self._call_wait_queue = queue.Queue()
self._ctx = zmq.Context()
self._pull_sock = self._ctx.socket(zmq.PULL)
self._pull_sock.bind("inproc://pull_sock")
self._push_sock = self._ctx.socket(zmq.PUSH)
self._push_sock.connect("inproc://pull_sock")
self._heartbeat_interval = 1
self._heartbeat_timeout = 3
self._addr = None
t = threading.Thread(target=self._recv_run)
t.setDaemon(True)
t.start()
self._recv_thread = t
t = threading.Thread(target=self._callback_run)
t.setDaemon(True)
t.start()
self._callback_thread = t
def __del__(self):
self.close()
def next_callid(self):
self._callid_lock.acquire()
self._next_callid += 1
callid = self._next_callid
self._callid_lock.release()
return callid
def set_heartbeat_options(self, interval, timeout):
self._heartbeat_interval = interval
self._heartbeat_timeout = timeout
def _recv_run(self):
heartbeat_time = 0
poller = zmq.Poller()
poller.register(self._pull_sock, zmq.POLLIN)
remote_sock = None
while not self._should_close:
try:
if self._connected and time.time() - self._last_heartbeat_rsp_time > self._heartbeat_timeout:
self._connected = False
if self.on_disconnected: self._async_call(self.on_disconnected)
if remote_sock and time.time() - heartbeat_time > self._heartbeat_interval:
self._send_hearbeat()
heartbeat_time = time.time()
socks = dict(poller.poll(500))
if self._pull_sock in socks and socks[self._pull_sock] == zmq.POLLIN:
cmd = self._pull_sock.recv()
if cmd == b"CONNECT":
# print time.ctime(), "CONNECT " + self._addr
if remote_sock:
poller.unregister(remote_sock)
remote_sock.close()
remote_sock = None
remote_sock = self._do_connect()
if remote_sock:
poller.register(remote_sock, zmq.POLLIN)
elif cmd.startswith(b"SEND:") and remote_sock:
# print time.ctime(), "SEND " + cmd[5:]
remote_sock.send(cmd[5:])
if remote_sock and remote_sock in socks and socks[remote_sock] == zmq.POLLIN:
data = remote_sock.recv()
if data:
# if not data.find("heartbeat"):
# print time.ctime(), "RECV", data
self._on_data_arrived(data)
except zmq.error.Again as e:
# print "RECV timeout: ", e
pass
except Exception as e:
print("_recv_run:", e)
def _callback_run(self):
while not self._should_close:
try:
r = self._callback_queue.get(timeout=1)
if r:
r()
except qEmpty as e:
pass
except TypeError as e:
if str(e) == "'NoneType' object is not callable":
pass
else:
print("_callback_run {}".format(r), type(e), e)
except Exception as e:
print("_callback_run {}".format(r), type(e), e)
def _async_call(self, func):
self._callback_queue.put(func)
def _send_request(self, json):
try:
self._send_lock.acquire()
self._push_sock.send(b"SEND:" + json)
finally:
self._send_lock.release()
def connect(self, addr):
self._addr = addr
self._push_sock.send_string('CONNECT', encoding='utf-8')
def _do_connect(self):
client_id = str(random.randint(1000000, 100000000))
socket = self._ctx.socket(zmq.DEALER)
identity = (client_id) + '$' + str(random.randint(1000000, 1000000000))
identity = identity.encode('utf-8')
socket.setsockopt(zmq.IDENTITY, identity)
socket.setsockopt(zmq.RCVTIMEO, 500)
socket.setsockopt(zmq.SNDTIMEO, 500)
socket.setsockopt(zmq.LINGER, 0)
socket.connect(self._addr)
return socket
def close(self):
self._should_close = True
self._callback_thread.join()
self._recv_thread.join()
def _on_data_arrived(self, str):
try:
msg = _unpack(str)
# print "RECV", msg
if not msg:
print("wrong message format")
return
if 'method' in msg and msg['method'] == '.sys.heartbeat':
self._last_heartbeat_rsp_time = time.time()
if not self._connected:
self._connected = True
if self.on_connected:
self._async_call(self.on_connected)
# Let user has a chance to check message in .sys.heartbeat
if 'result' in msg and self.on_rpc_callback:
self._async_call(lambda: self.on_rpc_callback(msg['method'], msg['result']))
elif 'id' in msg and msg['id']:
# Call result
id = int(msg['id'])
if self._waiter_lock.acquire():
if id in self._waiter_map:
q = self._waiter_map[id]
if q: q.put(msg)
self._waiter_lock.release()
else:
# Notification message
if 'method' in msg and 'result' in msg and self.on_rpc_callback:
self._async_call(lambda: self.on_rpc_callback(msg['method'], msg['result']))
except Exception as e:
print("_on_data_arrived:", e)
pass
def _send_hearbeat(self):
msg = {'jsonrpc': '2.0',
'method': '.sys.heartbeat',
'params': {'time': time.time()},
'id': str(self.next_callid())}
json_str = _pack(msg)
self._send_request(json_str)
def _alloc_wait_queue(self):
self._waiter_lock.acquire()
if self._call_wait_queue:
q = self._call_wait_queue
self._call_wait_queue = None
else:
q = queue.Queue()
self._waiter_lock.release()
return q
def _free_wait_queue(self, q):
self._waiter_lock.acquire()
if not self._call_wait_queue:
self._call_wait_queue = q
else:
del q
self._waiter_lock.release()
def call(self, method, params, timeout=6):
# print "call", method, params, timeout
callid = self.next_callid()
if timeout:
q = self._alloc_wait_queue()
self._waiter_lock.acquire()
self._waiter_map[callid] = q
self._waiter_lock.release()
msg = {'jsonrpc': '2.0',
'method': method,
'params': params,
'id': str(callid)}
# print "SEND", msg
json_str = _pack(msg)
self._send_request(json_str)
if timeout:
ret = {}
try:
r = q.get(timeout=timeout)
q.task_done()
except qEmpty:
r = None
self._waiter_lock.acquire()
self._waiter_map[callid] = None
self._waiter_lock.release()
self._free_wait_queue(q)
if r:
if 'result' in r:
ret['result'] = r['result']
if 'error' in r:
ret['error'] = r['error']
return ret if ret else {'error': {'error': -1, 'message': "timeout"}}
else:
return {'result': True}

View File

@ -0,0 +1,137 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from builtins import *
from collections import namedtuple
import datetime as dt
import pandas as pd
import numpy as np
long_nan = 9223372036854775807
def is_long_nan(v):
if v == long_nan:
return True
else:
return False
def to_nan(x):
if is_long_nan(x):
return np.nan
else:
return x
def _to_date(row):
date = int(row['DATE'])
return pd.datetime(year=date // 10000, month=date // 100 % 100, day=date % 100)
def _to_datetime(row):
date = int(row['DATE'])
time = int(row['TIME']) // 1000
return pd.datetime(year=date // 10000, month=date / 100 % 100, day=date % 100,
hour=time // 10000, minute=time // 100 % 100, second=time % 100)
def _to_dataframe(cloumset, index_func=None, index_column=None):
df = pd.DataFrame(cloumset)
for col in df.columns:
if df.dtypes.loc[col] == np.int64:
df.loc[:, col] = df.loc[:, col].apply(to_nan)
if index_func:
df.index = df.apply(index_func, axis=1)
elif index_column:
df.index = df[index_column]
del df.index.name
return df
def _error_to_str(error):
if error:
if 'message' in error:
return str(error['error']) + "," + error['message']
else:
return str(error['error']) + ","
else:
return ","
def to_obj(class_name, data):
try:
if type(data) == list or type(data) == tuple:
result = []
for d in data:
result.append(namedtuple(class_name, list(d.keys()))(*list(d.values())))
return result
elif type(data) == dict:
result = namedtuple(class_name, list(data.keys()))(*list(data.values()))
return result
else:
return data
except Exception as e:
print(class_name, data, e)
return data
def to_date_int(date):
if isinstance(date, str):
t = dt.datetime.strptime(date, "%Y-%m-%d")
date_int = t.year * 10000 + t.month * 100 + t.day
return date_int
elif isinstance(date, (int, np.integer)):
return date
else:
return -1
def to_time_int(time):
if isinstance(time, str):
t = dt.datetime.strptime(time, "%H:%M:%S")
time_int = t.hour * 10000 + t.minute * 100 + t.second
return time_int
elif isinstance(time, (int, np.integer)):
return time
else:
return -1
def extract_result(cr, data_format="", index_column=None, class_name=""):
"""
format supports pandas, obj.
"""
err = _error_to_str(cr['error']) if 'error' in cr else None
if 'result' in cr:
if data_format == "pandas":
if index_column:
return (_to_dataframe(cr['result'], None, index_column), err)
# if 'TIME' in cr['result']:
# return (_to_dataframe(cr['result'], _to_datetime), err)
# elif 'DATE' in cr['result']:
# return (_to_dataframe(cr['result'], _to_date), err)
else:
return (_to_dataframe(cr['result']), err)
elif data_format == "obj" and cr['result'] and class_name:
r = cr['result']
if type(r) == list or type(r) == tuple:
result = []
for d in r:
result.append(namedtuple(class_name, list(d.keys()))(*list(d.values())))
elif type(r) == dict:
result = namedtuple(class_name, list(r.keys()))(*list(r.values()))
else:
result = r
return (result, err)
else:
return (cr['result'], err)
else:
return (None, err)

View File

@ -0,0 +1,7 @@
{
"username": "请在quantos.org申请",
"token": "请在quantos.org申请",
"strategy": 625,
"tradeAddress": "tcp://gw.quantos.org:8901",
"dataAddress": "tcp://data.tushare.org:8910"
}

View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,48 @@
# TradeApi
标准交易API定义
# 安装步骤
1、安装Python环境
------
如果本地还没有安装Python环境强烈建议安装 [Anaconda](http://www.continuum.io/downloads "Anaconda")。
打开上面的网址选择相应的操作系统确定要按照的Python版本一般建议用Python 2.7。
下载完成以后按照图形界面步骤完成安装。在默认情况下Anaconda会自动设置PATH环境。
2、安装依赖包
----------------
如果Python环境不是类似Anaconda的集成开发环境我们需要单独安装依赖包在已经有pandas/numpy包前提下还需要有以下几个包
pyzmq
msgpack_python
python-snappy
可以通过单个安装完成,例如: pip install pyzmq
需要注意的是python-snappy和msgpack-python这两个包在Windows上的安装需要比较多的编译依赖,建议从[这个网页](http://www.lfd.uci.edu/~gohlke/pythonlibs)下载编译好的包,然后安装:
pip install msgpack_python-0.4.8-cp27-cp27m-win_amd64.whl
pip install python_snappy-0.5.1-cp27-cp27m-win_amd64.whl
3、使用TradeApi
--------
在项目目录验证TradeApi是否正常使用。
```python
from TradeApi import TradeApi
api = TradeApi(addr="tcp://gw.quantos.org:8901")
result, msg = api.login("username", "token") # 示例账户用户需要改为自己在www.quantos.org上注册的账户
print result
print msg
```

View File

@ -0,0 +1,12 @@
# encoding: utf-8
"""
Core trade api for simulated and live trading.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from .trade_api import TradeApi
__all__ = ['TradeApi']

View File

@ -0,0 +1,341 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import json
import random
import time
from builtins import *
import zmq
try:
import queue
except ImportError:
import queue as queue
import threading
import msgpack
import snappy
import copy
qEmpty = copy.copy(queue.Empty)
def _unpack_msgpack_snappy(str):
if str.startswith(b'S'):
tmp = snappy.uncompress(str[1:])
# print "SNAPPY: ", len(str), len(tmp)
obj = msgpack.loads(tmp, encoding='utf-8')
elif str.startswith(b'\0'):
obj = msgpack.loads(str[1:], encoding='utf-8')
else:
return None
return obj
def _pack_msgpack_snappy(obj):
# print "pack", obj
tmp = msgpack.dumps(obj, encoding='utf-8')
if len(tmp) > 1000:
return b'S' + snappy.compress(tmp)
else:
return b'\0' + tmp
def _unpack_msgpack(str):
return msgpack.loads(str, encoding='utf-8')
def _pack_msgpack(obj):
return msgpack.dumps(obj, encoding='utf-8')
def _unpack_json(str):
return json.loads(str, encoding='utf-8')
def _pack_json(obj):
return json.dumps(obj, encoding='utf-8')
class JRpcClient(object):
def __init__(self, data_format="msgpack_snappy"):
self._waiter_lock = threading.Lock()
self._waiter_map = {}
self._should_close = False
self._next_callid = 0
self._send_lock = threading.Lock()
self._callid_lock = threading.Lock()
self._last_heartbeat_rsp_time = 0
self._connected = False
self.on_disconnected = None
self.on_rpc_callback = None
self._callback_queue = queue.Queue()
self._call_wait_queue = queue.Queue()
self._ctx = zmq.Context()
self._pull_sock = self._ctx.socket(zmq.PULL)
self._pull_sock.bind("inproc://pull_sock")
self._push_sock = self._ctx.socket(zmq.PUSH)
self._push_sock.connect("inproc://pull_sock")
self._heartbeat_interval = 1
self._heartbeat_timeout = 3
self._addr = None
if data_format == "msgpack_snappy":
self._pack = _pack_msgpack_snappy
self._unpack = _unpack_msgpack_snappy
elif data_format == "msgpack":
self._pack = _pack_msgpack
self._unpack = _unpack_msgpack
elif data_format == "json":
self._pack = _pack_json
self._unpack = _unpack_json
else:
assert False, "unknown data_format " + data_format
t = threading.Thread(target=self._recv_run)
t.setDaemon(True)
t.start()
self._recv_thread = t
t = threading.Thread(target=self._callback_run)
t.setDaemon(True)
t.start()
self._callback_thread = t
def __del__(self):
self.close()
def next_callid(self):
self._callid_lock.acquire()
self._next_callid += 1
callid = self._next_callid
self._callid_lock.release()
return callid
def set_heartbeat_options(self, interval, timeout):
self._heartbeat_interval = interval
self._heartbeat_timeout = timeout
def _recv_run(self):
heartbeat_time = 0
poller = zmq.Poller()
poller.register(self._pull_sock, zmq.POLLIN)
remote_sock = None
while not self._should_close:
try:
if self._connected and time.time() - self._last_heartbeat_rsp_time > self._heartbeat_timeout:
self._connected = False
if self.on_disconnected: self._async_call(self.on_disconnected)
if remote_sock and time.time() - heartbeat_time > self._heartbeat_interval:
self._send_hearbeat()
heartbeat_time = time.time()
socks = dict(poller.poll(500))
if self._pull_sock in socks and socks[self._pull_sock] == zmq.POLLIN:
cmd = self._pull_sock.recv()
if cmd == b"CONNECT":
# print time.ctime(), "CONNECT " + self._addr
if remote_sock:
poller.unregister(remote_sock)
remote_sock.close()
remote_sock = None
remote_sock = self._do_connect()
if remote_sock:
poller.register(remote_sock, zmq.POLLIN)
elif cmd.startswith(b"SEND:") and remote_sock:
# print time.ctime(), "SEND " + cmd[5:]
remote_sock.send(cmd[5:])
if remote_sock and remote_sock in socks and socks[remote_sock] == zmq.POLLIN:
data = remote_sock.recv()
if data:
# if not data.find("heartbeat"):
# print time.ctime(), "RECV", data
self._on_data_arrived(data)
except zmq.error.Again as e:
# print "RECV timeout: ", e
pass
except Exception as e:
print("_recv_run:", e)
def _callback_run(self):
while not self._should_close:
try:
r = self._callback_queue.get(timeout=1)
if r:
r()
except qEmpty as e:
pass
except TypeError as e:
if str(e) == "'NoneType' object is not callable":
pass
else:
print("_callback_run {}".format(r), type(e), e)
except Exception as e:
print("_callback_run {}".format(r), type(e), e)
def _async_call(self, func):
self._callback_queue.put(func)
def _send_request(self, json):
try:
self._send_lock.acquire()
self._push_sock.send(b"SEND:" + json)
finally:
self._send_lock.release()
def connect(self, addr):
self._addr = addr
self._push_sock.send_string('CONNECT', encoding='utf-8')
def _do_connect(self):
client_id = str(random.randint(1000000, 100000000))
socket = self._ctx.socket(zmq.DEALER)
identity = (client_id) + '$' + str(random.randint(1000000, 1000000000))
identity = identity.encode('utf-8')
socket.setsockopt(zmq.IDENTITY, identity)
socket.setsockopt(zmq.RCVTIMEO, 500)
socket.setsockopt(zmq.SNDTIMEO, 500)
socket.setsockopt(zmq.LINGER, 0)
socket.connect(self._addr)
return socket
def close(self):
self._should_close = True
self._callback_thread.join()
self._recv_thread.join()
def _on_data_arrived(self, str):
try:
msg = self._unpack(str)
# print "RECV", msg
if not msg:
print("wrong message format")
return
if 'method' in msg and msg['method'] == '.sys.heartbeat':
self._last_heartbeat_rsp_time = time.time()
if not self._connected:
self._connected = True
if self.on_connected:
self._async_call(self.on_connected)
# Let user has a chance to check message in .sys.heartbeat
if 'result' in msg and self.on_rpc_callback:
self._async_call(lambda: self.on_rpc_callback(msg['method'], msg['result']))
elif 'id' in msg and msg['id']:
# Call result
id = int(msg['id'])
if self._waiter_lock.acquire():
if id in self._waiter_map:
q = self._waiter_map[id]
if q: q.put(msg)
self._waiter_lock.release()
else:
# Notification message
if 'method' in msg and 'result' in msg and self.on_rpc_callback:
self._async_call(lambda: self.on_rpc_callback(msg['method'], msg['result']))
except Exception as e:
print("_on_data_arrived:", e)
pass
def _send_hearbeat(self):
msg = {'jsonrpc': '2.0',
'method': '.sys.heartbeat',
'params': {'time': time.time()},
'id': str(self.next_callid())}
json_str = self._pack(msg)
self._send_request(json_str)
def _alloc_wait_queue(self):
self._waiter_lock.acquire()
if self._call_wait_queue:
q = self._call_wait_queue
self._call_wait_queue = None
else:
q = queue.Queue()
self._waiter_lock.release()
return q
def _free_wait_queue(self, q):
self._waiter_lock.acquire()
if not self._call_wait_queue:
self._call_wait_queue = q
else:
del q
self._waiter_lock.release()
def call(self, method, params, timeout=6):
# print "call", method, params, timeout
callid = self.next_callid()
if timeout:
q = self._alloc_wait_queue()
self._waiter_lock.acquire()
self._waiter_map[callid] = q
self._waiter_lock.release()
msg = {'jsonrpc': '2.0',
'method': method,
'params': params,
'id': str(callid)}
# print "SEND", msg
json_str = self._pack(msg)
self._send_request(json_str)
if timeout:
ret = {}
try:
r = q.get(timeout=timeout)
q.task_done()
except qEmpty:
r = None
self._waiter_lock.acquire()
self._waiter_map[callid] = None
self._waiter_lock.release()
self._free_wait_queue(q)
if r:
if 'result' in r:
ret['result'] = r['result']
if 'error' in r:
ret['error'] = r['error']
return ret if ret else {'error': {'error': -1, 'message': "timeout"}}
else:
return {'result': True}

View File

@ -0,0 +1,592 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import json
from builtins import *
import pandas as pd
from . import utils
class EntrustOrder(object):
def __init__(self, security, action, price, size):
self.security = security
self.action = action
self.price = price
self.size = size
def set_log_dir(log_dir):
if log_dir:
try:
import jrpc
jrpc.set_log_dir(log_dir)
except Exception as e:
print("Exception", e)
class TradeApi(object):
def __init__(self, addr, use_jrpc=True, prod_type="jzts"):
"""
use_jrpc:
True -- Use jrcp_client of C version, for jzts only
False -- Use pure python version
prod_type:
"jaqs" -- jrpc_msgpack_wth_snappy
"jzts" -- jrpc_msgpack
"""
self._remote = None
if prod_type == "jzts":
try:
if use_jrpc:
import jrpc
self._remote = jrpc.JsonRpcClient()
else:
from . import jrpc_py
self._remote = jrpc_py.JRpcClient(data_format="msgpack")
except Exception as e:
print("Exception", e)
if not self._remote:
from . import jrpc_py
self._remote = jrpc_py.JRpcClient(data_format="msgpack")
else:
from . import jrpc_py
self._remote = jrpc_py.JRpcClient(data_format="msgpack_snappy")
self._remote.on_rpc_callback = self._on_rpc_callback
self._remote.on_disconnected = self._on_disconnected
self._remote.on_connected = self._on_connected
self._remote.connect(addr)
self._ordstatus_callback = None
self._taskstatus_callback = None
self._internal_order_callback = None
self._trade_callback = None
self._on_connection_callback = None
self._connected = False
self._username = ""
self._password = ""
self._strategy_id = 0
self._strategy_selected = False
self._data_format = "default"
def __del__(self):
self._remote.close()
def _on_rpc_callback(self, method, data):
# print "_on_rpc_callback:", method, data
if method == "oms.orderstatus_ind":
if self._data_format == "obj":
data = utils.to_obj("Order", data)
if self._ordstatus_callback:
self._ordstatus_callback(data)
elif method == "oms.taskstatus_ind":
if self._data_format == "obj":
data = utils.to_obj("TaskStatus", data)
if self._taskstatus_callback:
self._taskstatus_callback(data)
elif method == "oms.trade_ind":
if self._data_format == "obj":
data = utils.to_obj("Trade", data)
if self._trade_callback:
self._trade_callback(data)
elif method == "oms.internal_order_ind":
if self._data_format == "obj":
data = utils.to_obj("QuoteOrder", data)
if self._internal_order_callback:
self._internal_order_callback(data)
def _on_disconnected(self):
print("TradeApi: _on_disconnected")
self._connected = False
self._strategy_selected = False
if self._on_connection_callback:
self._on_connection_callback(False)
def _on_connected(self):
print("TradeApi: _on_connected")
self._connected = True
self._do_login()
self._do_use_strategy()
if self._on_connection_callback:
self._on_connection_callback(True)
def _check_session(self):
if not self._connected:
return (False, "no connection")
if self._strategy_selected:
return (True, "")
r, msg = self._do_login()
if not r: return (r, msg)
if self._strategy_id:
return self._do_use_strategy()
else:
return (r, msg)
def set_data_format(self, format):
self._data_format = format
def set_connection_callback(self, callback):
self._on_connection_callback = callback
def set_ordstatus_callback(self, callback):
self._ordstatus_callback = callback
def set_trade_callback(self, callback):
self._trade_callback = callback
def set_task_callback(self, callback):
self._taskstatus_callback = callback
def set_quoteorder_callback(self, callback):
self._internal_order_callback = callback
def _get_format(self, format, default_format):
if format:
return format
elif self._data_format != "default":
return self._data_format
else:
return default_format
def login(self, username, password, format=""):
self._username = username
self._password = password
return self._do_login(format=format)
def _do_login(self, format=""):
# Shouldn't check connected flag here. ZMQ is a mesageq queue!
# if !self._connected :
# return (False, "-1,no connection")
if self._username and self._password:
rpc_params = {"username": self._username,
"password": self._password}
cr = self._remote.call("auth.login", rpc_params)
f = self._get_format(format, "")
if f != "obj" and f != "":
f = ""
return utils.extract_result(cr, format=f, class_name="UserInfo")
else:
return (False, "-1,empty username or password")
def logout(self):
rpc_params = {}
cr = self._remote.call("auth.logout", rpc_params)
return utils.extract_result(cr)
def close(self):
self._remote.close()
def use_strategy(self, strategy_id):
if strategy_id:
self._strategy_id = strategy_id
return self._do_use_strategy()
else:
# Query
rpc_params = {"account_id": 0}
cr = self._remote.call("auth.use_strategy", rpc_params)
r, msg = utils.extract_result(cr)
self._strategy_selected = r
return (r, msg)
def _do_use_strategy(self):
if self._strategy_id:
rpc_params = {"account_id": self._strategy_id}
cr = self._remote.call("auth.use_strategy", rpc_params)
r, msg = utils.extract_result(cr)
self._strategy_selected = r
return (r, msg)
else:
return (False, "-1,no strategy_id was specified")
def confirm_internal_order(self, task_id, confirmed):
"""
return (result, message)
if result is None, message contains error information
"""
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {"task_id": task_id,
"confirmed": confirmed}
cr = self._remote.call("oms.confirm_internal_order", rpc_params)
return utils.extract_result(cr)
def order(self, security, price, size, algo="", algo_param={}, userdata=""):
"""
return (result, message)
if result is None, message contains error information
"""
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {"security": security,
"price": price,
"size": int(size),
"algo": algo,
"algo_param": json.dumps(algo_param),
"user": self._username,
"userdata": userdata}
cr = self._remote.call("oms.order", rpc_params)
return utils.extract_result(cr)
def place_order(self, security, action, price, size, algo="", algo_param={}, userdata=""):
"""
return (result, message)
if result is None, message contains error information
"""
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {"security": security,
"action": action,
"price": price,
"size": int(size),
"algo": algo,
"algo_param": json.dumps(algo_param),
"user": self._username,
"userdata": userdata}
cr = self._remote.call("oms.place_order", rpc_params)
return utils.extract_result(cr)
def batch_order(self, orders, algo="", algo_param={}, userdata=""):
"""
orders format:
[ {"security": "000001.SZ", "action": "Buy", "price": 10.0, "size" : 100}, ... ]
return (result, message)
if result is None, message contains error information
"""
if not orders or not isinstance(orders, (list, tuple)):
return (None, "empty order")
if isinstance(orders[0], EntrustOrder):
tmp = []
for o in orders:
tmp.append({"security": o.security,
"price": o.price,
"size": int(o.size)})
orders = tmp
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {"orders": orders,
"algo": algo,
"algo_param": json.dumps(algo_param),
"user": self._username,
"userdata": userdata}
cr = self._remote.call("oms.batch_order", rpc_params)
return utils.extract_result(cr)
def place_batch_order(self, orders, algo="", algo_param={}, userdata=""):
"""
orders format:
[ {"security": "000001.SZ", "action": "Buy", "price": 10.0, "size" : 100}, ... ]
return (result, message)
if result is None, message contains error information
"""
if not orders or not isinstance(orders, (list, tuple)):
return (None, "empty order")
if isinstance(orders[0], EntrustOrder):
tmp = []
for o in orders:
tmp.append({"security": o.security,
"action": o.action,
"price": o.price,
"size": int(o.size)})
orders = tmp
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {"orders": orders,
"algo": algo,
"algo_param": json.dumps(algo_param),
"user": self._username,
"userdata": userdata}
cr = self._remote.call("oms.place_batch_order", rpc_params)
return utils.extract_result(cr)
def cancel_order(self, task_id):
"""
return (result, message)
if result is None, message contains error information
"""
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {"task_id": task_id}
cr = self._remote.call("oms.cancel_order", rpc_params)
return utils.extract_result(cr)
def query_account(self, format=""):
"""
return pd.dataframe
"""
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {}
data_format = self._get_format(format, "pandas")
if data_format == "pandas":
rpc_params["format"] = "columnset"
cr = self._remote.call("oms.query_account", rpc_params)
return utils.extract_result(cr, format=data_format, class_name="Account")
def query_position(self, mode="all", securities="", format=""):
"""
securities: seperate by ","
return pd.dataframe
"""
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {"mode": mode,
"security": securities}
data_format = self._get_format(format, "pandas")
if data_format == "pandas":
rpc_params["format"] = "columnset"
cr = self._remote.call("oms.query_position", rpc_params)
return utils.extract_result(cr, format=data_format, class_name="Position")
def query_net_position(self, mode="all", securities="", format=""):
"""
securities: seperate by ","
return pd.dataframe
"""
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {"mode": mode,
"security": securities}
data_format = self._get_format(format, "pandas")
if data_format == "pandas":
rpc_params["format"] = "columnset"
cr = self._remote.call("oms.query_net_position", rpc_params)
return utils.extract_result(cr, format=data_format, class_name="NetPosition")
def query_repo_contract(self, format=""):
"""
securities: seperate by ","
return pd.dataframe
"""
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {}
cr = self._remote.call("oms.query_repo_contract", rpc_params)
return utils.extract_result(cr, format=self._get_format(format, "pandas"), class_name="RepoContract")
def query_task(self, task_id=-1, format=""):
"""
task_id: -1 -- all
return pd.dataframe
"""
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {"task_id": task_id}
data_format = self._get_format(format, "pandas")
if data_format == "pandas":
rpc_params["format"] = "columnset"
cr = self._remote.call("oms.query_task", rpc_params)
return utils.extract_result(cr, format=data_format, class_name="Task")
def query_order(self, task_id=-1, format=""):
"""
task_id: -1 -- all
return pd.dataframe
"""
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {"task_id": task_id}
data_format = self._get_format(format, "pandas")
if data_format == "pandas":
rpc_params["format"] = "columnset"
cr = self._remote.call("oms.query_order", rpc_params)
return utils.extract_result(cr, format=data_format, class_name="Order")
def query_trade(self, task_id=-1, format=""):
"""
task_id: -1 -- all
return pd.dataframe
"""
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {"task_id": task_id}
data_format = self._get_format(format, "pandas")
if data_format == "pandas":
rpc_params["format"] = "columnset"
cr = self._remote.call("oms.query_trade", rpc_params)
return utils.extract_result(cr, format=data_format, class_name="Trade")
def query_portfolio(self, format=""):
"""
return pd.dataframe
"""
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {}
data_format = self._get_format(format, "pandas")
if data_format == "pandas":
rpc_params["format"] = "columnset"
cr = self._remote.call("pms.query_portfolio", rpc_params)
return utils.extract_result(cr, index_column="security", format=data_format, class_name="NetPosition")
def goal_portfolio(self, positions, algo="", algo_param={}, userdata=""):
"""
positions format:
[ {"security": "000001.SZ", "ref_price": 10.0, "size" : 100}, ...]
return (result, message)
if result is None, message contains error information
"""
r, msg = self._check_session()
if not r: return (False, msg)
if type(positions) is pd.core.frame.DataFrame:
tmp = []
for i in range(0, len(positions)):
tmp.append({'security': positions.index[i], 'ref_price': float(positions['ref_price'][i]),
"size": int(positions['size'][i])})
positions = tmp
rpc_params = {"positions": positions,
"algo": algo,
"algo_param": json.dumps(algo_param),
"user": self._username,
"userdata": userdata}
cr = self._remote.call("pms.goal_portfolio", rpc_params)
return utils.extract_result(cr)
def basket_order(self, orders, algo="", algo_param={}, userdata=""):
"""
orders format:
[ {"security": "000001.SZ", "ref_price": 10.0, "inc_size" : 100}, ...]
return (result, message)
if result is None, message contains error information
"""
r, msg = self._check_session()
if not r: return (False, msg)
if type(orders) is pd.core.frame.DataFrame:
tmp = []
for i in range(0, len(orders)):
tmp.append({'security': orders.index[i], 'ref_price': float(orders['ref_price'][i]),
"inc_size": int(orders['inc_size'][i])})
orders = tmp
rpc_params = {"orders": orders,
"algo": algo,
"algo_param": json.dumps(algo_param),
"user": self._username,
"userdata": userdata}
cr = self._remote.call("pms.basket_order", rpc_params)
return utils.extract_result(cr)
def stop_portfolio(self):
"""
return (result, message)
if result is None, message contains error information
"""
r, msg = self._check_session()
if not r: return (False, msg)
rpc_params = {}
cr = self._remote.call("pms.stop_portfolio", rpc_params)
return utils.extract_result(cr)
def query_universe(self, format=""):
r, msg = self._check_session()
if not r: return (None, msg)
rpc_params = {}
data_format = self._get_format(format, "pandas")
if data_format == "pandas":
rpc_params["format"] = "columnset"
cr = self._remote.call("oms.query_universe", rpc_params)
return utils.extract_result(cr, format=data_format, class_name="UniverseItem")
def set_heartbeat(self, interval, timeout):
self._remote.set_hearbeat_options(interval, timeout)
print("heartbeat_interval =", self._remote._heartbeat_interval, ", heartbeat_timeout =",
self._remote._heartbeat_timeout)

View File

@ -0,0 +1,95 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from builtins import *
from collections import namedtuple
import pandas as pd
def _to_date(row):
date = int(row['DATE'])
return pd.datetime(year=date // 10000, month=date // 100 % 100, day=date % 100)
def _to_datetime(row):
date = int(row['DATE'])
time = int(row['TIME']) // 1000
return pd.datetime(year=date // 10000, month=date // 100 % 100, day=date % 100,
hour=time // 10000, minute=time // 100 % 100, second=time % 100)
def _to_dataframe(cloumset, index_func=None, index_column=None):
df = pd.DataFrame(cloumset)
if index_func:
df.index = df.apply(index_func, axis=1)
elif index_column:
df.index = df[index_column]
del df.index.name
return df
def _error_to_str(error):
if error:
if 'message' in error:
return str(error['error']) + "," + error['message']
else:
return str(error['error']) + ","
else:
return ","
def to_obj(class_name, data):
try:
if isinstance(data, (list, tuple)):
result = []
for d in data:
result.append(namedtuple(class_name, list(d.keys()))(*list(d.values())))
return result
elif type(data) == dict:
result = namedtuple(class_name, list(data.keys()))(*list(data.values()))
return result
else:
return data
except Exception as e:
print(class_name, data, e)
return data
def extract_result(cr, format="", index_column=None, class_name=""):
"""
format supports pandas, obj.
"""
err = _error_to_str(cr['error']) if 'error' in cr else None
if 'result' in cr:
if format == "pandas":
if index_column:
return (_to_dataframe(cr['result'], None, index_column), err)
if 'TIME' in cr['result']:
return (_to_dataframe(cr['result'], _to_datetime), err)
elif 'DATE' in cr['result']:
return (_to_dataframe(cr['result'], _to_date), err)
else:
return (_to_dataframe(cr['result']), err)
elif format == "obj" and cr['result'] and class_name:
r = cr['result']
if isinstance(r, (list, tuple)):
result = []
for d in r:
result.append(namedtuple(class_name, list(d.keys()))(*list(d.values())))
elif isinstance(r, dict):
result = namedtuple(class_name, list(r.keys()))(*list(r.values()))
else:
result = r
return (result, err)
else:
return (cr['result'], err)
else:
return (None, err)

View File

@ -0,0 +1,10 @@
# encoding: UTF-8
from vnpy.trader import vtConstant
from .tkproGateway import TkproGateway
gatewayClass = TkproGateway
gatewayName = 'TKPRO'
gatewayDisplayName = 'TKPRO'
gatewayType = vtConstant.GATEWAYTYPE_EQUITY
gatewayQryEnabled = True

View File

@ -0,0 +1,611 @@
# encoding: UTF-8
'''
quantOS的TkPro系统接入
'''
import sys
import os
import json
import traceback
from datetime import datetime
from vnpy.trader.vtConstant import *
from vnpy.trader.vtObject import *
from vnpy.trader.vtGateway import VtGateway
from vnpy.trader.vtFunction import getJsonPath
from vnpy.trader.vtEvent import EVENT_TIMER
from .DataApi import DataApi
from .TradeApi import TradeApi
from collections import namedtuple
# 以下为一些VT类型和TkPro类型的映射字典
# 动作印射
actionMap = {}
actionMap[(DIRECTION_LONG, OFFSET_OPEN)] = "Buy"
actionMap[(DIRECTION_SHORT, OFFSET_OPEN)] = "Short"
actionMap[(DIRECTION_LONG, OFFSET_CLOSE)] = "Cover"
actionMap[(DIRECTION_SHORT, OFFSET_CLOSE)] = "Sell"
actionMap[(DIRECTION_LONG, OFFSET_CLOSEYESTERDAY)] = "CoverYesterday"
actionMap[(DIRECTION_SHORT, OFFSET_CLOSEYESTERDAY)] = "SellYesterday"
actionMap[(DIRECTION_LONG, OFFSET_CLOSETODAY)] = "CoverToday"
actionMap[(DIRECTION_SHORT, OFFSET_CLOSETODAY)] = "SellToday"
actionMapReverse = {v: k for k, v in actionMap.items()}
# 交易所类型映射
exchangeMap = {}
exchangeMap[EXCHANGE_CFFEX] = 'CFE'
exchangeMap[EXCHANGE_SHFE] = 'SHF'
exchangeMap[EXCHANGE_CZCE] = 'CZC'
exchangeMap[EXCHANGE_DCE] = 'DCE'
exchangeMap[EXCHANGE_SSE] = 'SH'
exchangeMap[EXCHANGE_SZSE] = 'SZ'
exchangeMapReverse = {v:k for k,v in exchangeMap.items()}
# 持仓类型映射
sideMap = {}
sideMap[DIRECTION_LONG] = 'Long'
sideMap[DIRECTION_SHORT] = 'Short'
sideMapReverse = {v:k for k,v in sideMap.items()}
# 产品类型映射
productClassMapReverse = {}
productClassMapReverse[1] = PRODUCT_EQUITY
productClassMapReverse[3] = PRODUCT_EQUITY
productClassMapReverse[4] = PRODUCT_EQUITY
productClassMapReverse[5] = PRODUCT_EQUITY
productClassMapReverse[8] = PRODUCT_BOND
productClassMapReverse[17] = PRODUCT_BOND
productClassMapReverse[101] = PRODUCT_FUTURES
productClassMapReverse[102] = PRODUCT_FUTURES
productClassMapReverse[103] = PRODUCT_FUTURES
# 委托状态映射
statusMapReverse = {}
statusMapReverse['New'] = STATUS_UNKNOWN
statusMapReverse['Accepted'] = STATUS_NOTTRADED
statusMapReverse['Cancelled'] = STATUS_CANCELLED
statusMapReverse['Filled'] = STATUS_ALLTRADED
statusMapReverse['Rejected'] = STATUS_REJECTED
########################################################################
class TkproGateway(VtGateway):
"""TkPro接口"""
#----------------------------------------------------------------------
def __init__(self, eventengine, gatewayName='TKPRO'):
"""Constructor"""
super(TkproGateway, self).__init__(eventengine, gatewayName)
self.dataApi = TkproDataApi(self) # 行情
self.tradeApi = TkproTradeApi(self) # 交易
self.qryEnabled = False # 是否要启动循环查询
self.fileName = self.gatewayName + '_connect.json'
self.filePath = getJsonPath(self.fileName, __file__)
#----------------------------------------------------------------------
def connect(self):
"""连接"""
try:
f = file(self.filePath)
except IOError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'无法加载配置'
self.onLog(log)
return
setting = json.load(f)
try:
username = str(setting['username'])
token = str(setting['token'])
strategy = int(setting['strategy'])
tradeAddress = str(setting['tradeAddress'])
dataAddress = str(setting['dataAddress'])
except KeyError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'连接配置缺少字段,请检查'
self.onLog(log)
return
# 创建行情和交易接口对象
self.dataApi.connect(dataAddress, username, token)
self.tradeApi.connect(tradeAddress, username, token, strategy)
# 初始化并启动查询
self.initQuery()
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅行情"""
self.dataApi.subscribe(subscribeReq)
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
"""发单"""
self.tradeApi.sendOrder(orderReq)
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq):
"""撤单"""
self.tradeApi.cancelOrder(cancelOrderReq)
#----------------------------------------------------------------------
def qryAccount(self):
"""查询账户资金"""
self.tradeApi.qryAccount()
#----------------------------------------------------------------------
def qryPosition(self):
"""查询持仓"""
self.tradeApi.qryPosition()
#----------------------------------------------------------------------
def close(self):
"""关闭"""
pass
#----------------------------------------------------------------------
def initQuery(self):
"""初始化连续查询"""
if self.qryEnabled:
# 需要循环的查询函数列表
self.qryFunctionList = [self.qryPosition, self.qryAccount]
self.qryCount = 0 # 查询触发倒计时
self.qryTrigger = 2 # 查询触发点
self.qryNextFunction = 0 # 上次运行的查询函数索引
self.startQuery()
#----------------------------------------------------------------------
def query(self, event):
"""注册到事件处理引擎上的查询函数"""
self.qryCount += 1
if self.qryCount > self.qryTrigger:
# 清空倒计时
self.qryCount = 0
# 执行查询函数
function = self.qryFunctionList[self.qryNextFunction]
function()
# 计算下次查询函数的索引如果超过了列表长度则重新设为0
self.qryNextFunction += 1
if self.qryNextFunction == len(self.qryFunctionList):
self.qryNextFunction = 0
#----------------------------------------------------------------------
def startQuery(self):
"""启动连续查询"""
self.eventEngine.register(EVENT_TIMER, self.query)
#----------------------------------------------------------------------
def setQryEnabled(self, qryEnabled):
"""设置是否要启动循环查询"""
self.qryEnabled = qryEnabled
########################################################################
class TkproTradeApi(object):
"""TkPro交易API实现"""
#----------------------------------------------------------------------
def __init__(self, gateway):
"""Constructor"""
super(TkproTradeApi, self).__init__()
self.gateway = gateway # gateway对象
self.gatewayName = gateway.gatewayName # gateway对象名称
self.api = None
#----------------------------------------------------------------------
def onOrderStatus(self, data):
"""委托信息推送"""
if isinstance(data, dict):
data = namedtuple('Order', list(data.keys()))(*list(data.values()))
order = VtOrderData()
order.gatewayName = self.gatewayName
symbol, exchange = data.security.split('.')
order.symbol = symbol
order.exchange = exchangeMapReverse[exchange]
order.vtSymbol = '.'.join([order.symbol, order.exchange])
order.orderID = str(data.entrust_no)
order.taskID = str(data.task_id)
order.vtOrderID = order.orderID
order.direction, order.offset = actionMapReverse.get(data.entrust_action, (DIRECTION_UNKNOWN, OFFSET_UNKNOWN))
order.totalVolume = data.entrust_size
order.tradedVolume = data.fill_size
order.price = data.entrust_price
order.status = statusMapReverse.get(data.order_status)
order.tradePrice = data.fill_price
t = str(data.entrust_time)
t = t.rjust(6, '0')
order.orderTime = '%s:%s:%s' %(t[0:2],t[2:4],t[4:])
self.gateway.onOrder(order)
#----------------------------------------------------------------------
def onTaskStatus(self, data):
""""""
pass
#----------------------------------------------------------------------
def onTrade(self, data):
"""成交信息推送"""
if isinstance(data, dict):
data = namedtuple('Trade', list(data.keys()))(*list(data.values()))
trade = VtTradeData()
trade.gatewayName = self.gatewayName
symbol, exchange = data.security.split('.')
trade.symbol = symbol
trade.exchange = exchangeMapReverse[exchange]
trade.vtSymbol = '.'.join([trade.symbol, trade.exchange])
trade.direction, trade.offset = actionMapReverse.get(data.entrust_action, (DIRECTION_UNKNOWN, OFFSET_UNKNOWN))
trade.tradeID = str(data.fill_no)
trade.vtTradeID = str(data.fill_no)
trade.orderID = str(data.entrust_no)
trade.vtOrderID = trade.orderID
trade.taskID = str(data.task_id)
trade.price = data.fill_price
trade.volume = data.fill_size
t = str(data.fill_time)
t = t.rjust(6, '0')
trade.tradeTime = '%s:%s:%s' %(t[0:2],t[2:4],t[4:])
self.gateway.onTrade(trade)
#----------------------------------------------------------------------
def onConnection(self, data):
""""""
self.writeLog(u'连接状态更新:%s' %data)
if data:
self.qryInstrument()
self.qryOrder()
self.qryTrade()
#----------------------------------------------------------------------
def connect(self, tradeAddress, username, token, strategy):
"""初始化连接"""
if self.api:
self.writeLog(u'交易已经连接')
return
self.api = TradeApi(tradeAddress)
self.api.set_data_format('obj')
# 登录
result, msg = self.api.login(username, token)
if not result:
self.writeLog(u'交易登录失败,错误信息:%s' %msg)
return
result, msg = self.api.use_strategy(strategy)
if result:
self.writeLog(u'选定策略号:%s' %strategy)
else:
self.writeLog(u'选定策略号失败')
self.api.set_ordstatus_callback(self.onOrderStatus)
self.api.set_trade_callback(self.onTrade)
self.api.set_task_callback(self.onTaskStatus)
self.api.set_connection_callback(self.onConnection)
#----------------------------------------------------------------------
def close(self):
"""关闭"""
pass
#----------------------------------------------------------------------
def writeLog(self, logContent):
"""记录日志"""
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = logContent
self.gateway.onLog(log)
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
"""发单"""
if not self.api:
return
exchange = exchangeMap.get(orderReq.exchange, '')
security = '.'.join([orderReq.symbol, exchange])
action = actionMap.get((orderReq.direction, orderReq.offset), '')
taskid, msg = self.api.place_order(security, action, orderReq.price, int(orderReq.volume))
if taskid is 0:
self.writeLog(u'委托失败,错误信息:%s' %msg)
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq):
"""撤单"""
if not self.api:
return
result, msg = self.api.cancel_order(cancelOrderReq.orderID)
if result is 0:
self.writeLog(u'撤单失败,错误信息:%s' %msg)
#----------------------------------------------------------------------
def qryPosition(self):
"""查询持仓"""
l, msg = self.api.query_position()
if l is None:
self.writeLog(u'查询持仓失败,错误信息:%s' %msg)
return False
for data in l:
position = VtPositionData()
position.gatewayName = self.gatewayName
symbol, exchange = data.security.split('.')
position.symbol = symbol
position.exchange = exchangeMapReverse[exchange]
position.vtSymbol = '.'.join([position.symbol, position.exchange])
position.direction = sideMapReverse.get(data.side, DIRECTION_UNKNOWN)
position.vtPositionName = '.'.join([position.vtSymbol, position.direction])
position.price = data.cost_price
position.ydPosition = data.pre_size
position.tdPosition = data.today_size
position.position = data.current_size
position.frozen = data.frozen_size
position.commission = data.commission
position.enable = data.enable_size
position.want = data.want_size
position.initPosition = data.init_size
position.trading = data.trading_pnl
position.holding = data.holding_pnl
position.last = data.last_price
self.gateway.onPosition(position)
return True
#----------------------------------------------------------------------
def qryAccount(self):
"""查询资金"""
l, msg = self.api.query_account()
if l is None:
self.writeLog(u'查询资金失败,错误信息:%s' %msg)
return False
for data in l:
account = VtAccountData()
account.gatewayName = self.gatewayName
account.accountID = '_'.join([str(data.id), data.type])
account.vtAccountID = '.'.join([account.accountID, account.gatewayName])
account.available = data.enable_balance
account.balance = account.available + data.frozen_balance
account.closeProfit = data.close_pnl
account.commission = data.commission
account.margin = data.margin
account.positionProfit = data.holding_pnl
account.preBalance = data.init_balance
self.gateway.onAccount(account)
#----------------------------------------------------------------------
def qryOrder(self):
"""查询委托"""
l, msg = self.api.query_order()
if l is None:
self.writeLog(u'查询委托失败,错误信息:%s' %msg)
else:
for data in l:
self.onOrderStatus(data)
self.writeLog(u'查询委托完成')
#----------------------------------------------------------------------
def qryTrade(self):
"""查询成交"""
l, msg = self.api.query_trade()
if l is None:
self.writeLog(u'查询成交失败,错误信息:%s' %msg)
return False
for data in l:
self.onTrade(data)
self.writeLog(u'查询成交完成')
return True
#----------------------------------------------------------------------
def qryInstrument(self):
"""查询合约"""
# 通过DataAPI查询所有信息
df, msg = self.gateway.dataApi.api.query(
view='jz.instrumentInfo',
fields='symbol,name,inst_type,buylot,pricetick,multiplier',
filter='inst_type=1',
data_format='pandas'
)
d = {}
for n, row in df.iterrows():
d[row.symbol] = row
# 查询所有信息
l, msg = self.api.query_universe()
if l is None:
self.writeLog(u'查询合约失败,错误信息:%s' %msg)
return False
for data in l:
row = d[data.security]
contract = VtContractData()
contract.gatewayName = self.gatewayName
symbol, exchange = data.security.split('.')
contract.symbol = symbol
contract.exchange = exchangeMapReverse[exchange]
contract.vtSymbol = '.'.join([contract.symbol, contract.exchange])
contract.productClass = PRODUCT_EQUITY
contract.name = unicode(row['name'])
contract.priceTick = float(row['pricetick'])
contract.size = int(row['multiplier'])
self.gateway.onContract(contract)
self.writeLog(u'查询合约完成')
return True
########################################################################
class TkproDataApi(object):
"""TkPro行情API实现"""
#----------------------------------------------------------------------
def __init__(self, gateway):
"""Constructor"""
super(TkproDataApi, self).__init__()
self.gateway = gateway
self.gatewayName = gateway.gatewayName
self.api = None
self.fields = "open,close,high,low,last,\
volume,turnover,oi,preclose,time,date,\
askprice1,askprice2,askprice3,askprice4,askprice5,\
bidprice1,bidprice2,bidprice3,bidprice4,bidprice5,\
askvolume1,askvolume2,askvolume3,askvolume4,askvolume5,\
bidvolume1,bidvolume2,bidvolume3,bidvolume4,bidvolume5,\
limit_up,limit_down"
#----------------------------------------------------------------------
def onMarketData(self, k, data):
"""行情推送"""
tick = VtTickData()
tick.gatewayName = self.gatewayName
try:
l = data['symbol'].split('.')
tick.symbol = l[0]
tick.exchange = exchangeMapReverse[l[1]]
tick.vtSymbol = '.'.join([tick.symbol, tick.exchange])
tick.openPrice = data['open']
tick.highPrice = data['high']
tick.lowPrice = data['low']
tick.volume = data['volume']
tick.turnover = data['turnover']
tick.lastPrice = data['last']
tick.openInterest = data['oi']
tick.preClosePrice = data['preclose']
tick.date = str(data['date'])
t = str(data['time'])
t = t.rjust(9, '0')
tick.time = '%s:%s:%s.%s' %(t[0:2],t[2:4],t[4:6],t[6:])
tick.bidPrice1 = data['bidprice1']
tick.askPrice1 = data['askprice1']
tick.bidVolume1 = data['bidvolume1']
tick.askVolume1 = data['askvolume1']
if 'bidprice2' in data:
tick.bidPrice2 = data['bidprice2']
tick.bidPrice3 = data['bidprice3']
tick.bidPrice4 = data['bidprice4']
tick.bidPrice5 = data['bidprice5']
tick.askPrice2 = data['askprice2']
tick.askPrice3 = data['askprice3']
tick.askPrice4 = data['askprice4']
tick.askPrice5 = data['askprice5']
tick.bidVolume2 = data['bidvolume2']
tick.bidVolume3 = data['bidvolume3']
tick.bidVolume4 = data['bidvolume4']
tick.bidVolume5 = data['bidvolume5']
tick.askVolume2 = data['askvolume2']
tick.askVolume3 = data['askvolume3']
tick.askVolume4 = data['askvolume4']
tick.askVolume5 = data['askvolume5']
tick.upperLimit = data['limit_up']
tick.lowerLimit = data['limit_down']
self.gateway.onTick(tick)
except Exception, e:
self.writeLog(u'行情更新失败,错误信息:%s' % str(e))
#----------------------------------------------------------------------
def connect(self, dataAddress, username, token):
"""连接"""
if self.api:
self.writeLog(u'行情已经连接')
return
self.api = DataApi(dataAddress)
result, msg = self.api.login(username, token)
if not result:
self.writeLog(u'行情登录失败,错误信息:%sa' %str(msg))
return
self.writeLog(u'行情连接成功')
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅行情"""
exchange = exchangeMap.get(subscribeReq.exchange, '')
security = '.'.join([subscribeReq.symbol, exchange])
subscribed, msg = self.api.subscribe(security, fields=self.fields, func=self.onMarketData)
if not subscribed:
self.writeLog(u'行情订阅失败,错误信息:%s' %str(msg))
#----------------------------------------------------------------------
def writeLog(self, logContent):
"""记录日志"""
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = logContent
self.gateway.onLog(log)