From 9f5ac0c90d5b4ab053f921b710762f3c6d87629a Mon Sep 17 00:00:00 2001 From: yangyang Date: Tue, 22 Aug 2017 10:33:39 +0800 Subject: [PATCH 1/3] =?UTF-8?q?tianqin=E8=A1=8C=E6=83=85=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=E5=8E=9F=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/TianQinDataService/demo.py | 45 ++++ vnpy/data/tianqin/__init__.py | 0 vnpy/data/tianqin/vntianqin.py | 312 ++++++++++++++++++++++++++++ 3 files changed, 357 insertions(+) create mode 100644 examples/TianQinDataService/demo.py create mode 100644 vnpy/data/tianqin/__init__.py create mode 100644 vnpy/data/tianqin/vntianqin.py diff --git a/examples/TianQinDataService/demo.py b/examples/TianQinDataService/demo.py new file mode 100644 index 00000000..ddcd97ce --- /dev/null +++ b/examples/TianQinDataService/demo.py @@ -0,0 +1,45 @@ +# encoding: UTF-8 + +# 重载sys模块,设置默认字符串编码方式为utf8 +import sys +reload(sys) +sys.setdefaultencoding('utf8') + +from vnpy.event import EventEngine +from vnpy.data.tianqin.vntianqin import TianQinGateway, DataBackEndMemory, DataBackendMongo +from vnpy.trader.uiQt import createQApp + + +class DemoApp(object): + # ---------------------------------------------------------------------- + def __init__(self): + """Constructor""" + self.eventEngine = EventEngine() + self.eventEngine.start() + self.tianqinGateway = TianQinGateway(self.eventEngine, back_end=DataBackEndMemory()) + # mc = MongoClient(MONGO_HOST, MONGO_PORT) # Mongo连接 + # self.tianqinGateway = TianQinGateway(self.eventEngine, back_end=DataBackendMongo(mc)) + + def start(self): + self.tianqinGateway.connect() + self.tianqinGateway.subscribe_quote(["cu1803", "SR801", "c1801", "IF1708"], self.on_quote_data) + self.tianqinGateway.subscribe_chart("cu1803", 5, 1000, self.on_chart_data) + self.tianqinGateway.subscribe_chart("au1712", 0, 1000, self.on_chart_data) + + def on_quote_data(self, ins_id): + quote = self.tianqinGateway.get_quote(ins_id) + print("quote_update", ins_id, quote) + + def on_chart_data(self, ins_id, dur_seconds): + if dur_seconds == 0: + tick_serial = self.tianqinGateway.get_tick_serial(ins_id) + print("tick_serial_update", tick_serial) + else: + kline_serial = self.tianqinGateway.get_kline_serial(ins_id, dur_seconds) + print("kline_serial_update", kline_serial) + +#---------------------------------------------------------------------- +if __name__ == '__main__': + app = DemoApp() + app.start() + sys.exit(createQApp().exec_()) diff --git a/vnpy/data/tianqin/__init__.py b/vnpy/data/tianqin/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vnpy/data/tianqin/vntianqin.py b/vnpy/data/tianqin/vntianqin.py new file mode 100644 index 00000000..be1c2438 --- /dev/null +++ b/vnpy/data/tianqin/vntianqin.py @@ -0,0 +1,312 @@ +# encoding: UTF-8 + +""" +对接天勤行情的网关接口,可以提供国内期货的 报价/K线/Tick序列 等数据的实时推送和历史仿真 +使用时需要在本机先启动一个天勤终端进程(http://www.tq18.cn) + +使用示例见: + + +""" + + +import json +import threading +import sortedcontainers +import tornado.ioloop +import tornado.iostream +import tornado.websocket +from tornado import gen + +from vnpy.event import * + + +######################################################################## +class DataBackEndMemory(object): + def __init__(self): + self.data = sortedcontainers.SortedDict() + + def input_data_pack(self, pack): + for data in pack: + self.data = self._dict_merge(self.data, data) + + def get_tick_serial(self, ins_id): + return self.data.setdefault("ticks", {}).setdefault(ins_id, {}).get("data", None) + + def get_kline_serial(self, ins_id, duration_seconds): + dur_id = "%d" % (duration_seconds * 1000000000) + return self.data.setdefault("klines", {}).setdefault(ins_id, {}).setdefault(dur_id, {}).get("data", None) + + def _dict_merge(self, *objs): + result = objs[0] + for obj in objs[1:]: + result = self._merge_obj(result, obj) + return result + + def _merge_obj(self, result, obj): + if not isinstance(result, dict): + result = {} + if not isinstance(obj, dict): + return obj + for key, value in obj.items(): + if isinstance(value, dict): + target = result.get(key) + if isinstance(target, dict): + self._merge_obj(target, value) + continue + result[key] = {} + self._merge_obj(result[key], value) + continue + if value is None: + result.pop(key, None) + continue + result[key] = value + return result + + +class DataBackendMongo(object): + def __init__(self, mc): + self.dbClient = mc + + def input_data_pack(self, pack): + for data in pack: + for selector, section in data.items(): + if selector == "ticks": + self.process_tick_data(section) + elif selector == "klines": + self.process_kline_data(section) + + def process_tick_data(self, section): + db_name = "TICK" + db = self.dbClient[db_name] # 数据库 + for ins_id, serials in section.items(): + cl = db[ins_id] + for tick_id, tick in serials: + flt = {'datetime': tick.datetime} + cl.replace_one(flt, tick, True) + + def get_tick_serial(self, ins_id): + collection = self.dbClient["TICK"][ins_id] + cursor = collection.find({}).sort('datetime') + return cursor + + def process_kline_data(self, section): + #todo + pass + + def get_kline_serial(self, ins_id, duration_seconds): + #todo + pass + + +class TianQinGateway(object): + """天勤行情服务""" + + # ---------------------------------------------------------------------- + def __init__(self, eventEngine, back_end=DataBackEndMemory()): + """Constructor""" + self.client = None + self.back_end = back_end + self.data = {} + self.requests = [] + self.eventEngine = eventEngine + self.eventEngine.register('eTianQin.', self._process_pack) + self.quote_callback_func = None + self.chart_callback_func = {} + + # ---------------------------------------------------------------------- + def connect(self): + """ + 建立行情连接。 + """ + self._start() + loop_thread = threading.Thread(target=lambda: tornado.ioloop.IOLoop.current().start()) + loop_thread.setDaemon(True) + loop_thread.start() + + # ---------------------------------------------------------------------- + def subscribe_quote(self, ins_list, notify_func=None): + """ + 订阅实时行情. + 指定一个合约列表,订阅其实时报价信息 + 每次调用此函数时,都会覆盖前一次的订阅设定,不在订阅列表里的合约,会停止行情推送 + :param ins_list: ins_list 是一个列表,列出全部需要实时行情的合约代码。 + :param notify_func (可选): callback_func 是一个回调函数,每当有报价数据变更时会触发。此函数应该接受一个参数 ins_id + :example: + 订阅 cu1803,SR709,IF1709 这三个合约的报价: subscribe_quote(["cu1803", ”SR709", "IF1709"]) + """ + if notify_func: + self.quote_callback_func = notify_func + req = { + "aid": "subscribe_quote", + "ins_list": ",".join(ins_list), + } + self._send_json(req) + + # ---------------------------------------------------------------------- + def subscribe_chart(self, ins_id, duration_seconds, data_length=200, notify_func=None): + """ + 订阅历史行情序列 + 订阅指定合约及周期的历史数据序列(K线数据序列或Tick数据序列),这些序列数据会持续推送 + :param ins_id: 合约代码,需注意大小写 + :param duration_seconds: 历史数据周期,以秒为单位。特别的,此值指定为0表示订阅tick序列。目前支持的周期包括: + 3秒,5秒,10秒,15秒,20秒,30秒,1分钟,2分钟,3分钟,5分钟,10分钟,15分钟,20分钟,30分钟,1小时,2小时,4小时,1日 + :param data_length: 需要获取的序列长度。每个序列最大支持请求 8964 个数据 + :param notify_func (可选): notify_func 是一个回调函数,每当序列数据变更时会触发。此函数应该接受2个参数 ins_id, duration_seconds + :example: + 订阅 cu1803 的1分钟线: subscribe_chart("cu1803", 60) + 订阅 IF1709 的tick线: subscribe_chart("IF1709", 0) + """ + chart_id = "VN_%s_%d" % (ins_id, duration_seconds) + if data_length > 8964: + data_length = 8964 + if notify_func: + self.chart_callback_func[chart_id] = notify_func + req = { + "aid": "set_chart", + "chart_id": chart_id, + "ins_list": ins_id, + "duration": duration_seconds * 1000000000, + "view_width": data_length, + } + self._send_json(req) + + # ---------------------------------------------------------------------- + def get_quote(self, ins_id): + """ + 获取报价数据 + :param ins_id: 指定合约代码 + :return: 若指定的数据不存在,返回None,否则返回如下所示的一个dict + { + u'datetime': u'2017-07-26 23:04:21.000001',# tick从交易所发出的时间(按北京时区) + u'instrument_id': u'SR801', # 合约代码 + u'last_price': 6122.0, # 最新价 + u'bid_price1': 6121.0, # 买一价 + u'ask_price1': 6122.0, # 卖一价 + u'bid_volume1': 54, # 买一量 + u'ask_volume1': 66, # 卖一量 + u'upper_limit': 6388.0, # 涨停价 + u'lower_limit': 5896.0, # 跌停价 + u'volume': 89252, # 成交量 + u'amount': 5461329880.0, # 成交额 + u'open_interest': 616424, # 持仓量 + u'highest': 6129.0, # 当日最高价 + u'lowest': 6101.0, # 当日最低价 + u'average': 6119.0, # 当日均价 + u'open': 6102.0, # 开盘价 + u'close': u'-', # 收盘价 + u'settlement': u'-', # 结算价 + u'pre_close': 6106.0, # 昨收盘价 + u'pre_settlement': 6142.0 # 昨结算价 + u'pre_open_interest': 616620, # 昨持仓量 + } + """ + return self.data.setdefault("quotes", {}).get(ins_id, None) + + # ---------------------------------------------------------------------- + def get_tick_serial(self, ins_id): + """ + 获取tick序列数据 + :param ins_id: 指定合约代码 + :return: 若指定的序列数据不存在,返回None,否则返回如下所示的一个dict + { + u'485107':{ # 每个Tick都有一个唯一编号,在一个序列中,编号总是连续递增的 + u'datetime': 1501074872000000000L, # tick从交易所发出的时间(按北京时区),以nano epoch 方式表示(等于从1970-01-01时刻开始的纳秒数) + u'trading_day': 1501084800000000000L, #交易日, 格式同上 + u'last_price': 3887, # 最新价 + u'bid_price1': 3881, # 买一价 + u'ask_price1': 3886, # 卖一价 + u'bid_volume1': 5, # 买一量 + u'ask_volume1': 1, #卖一量 + u'highest': 3887, # 当日最高价 + u'lowest': 3886, # 当日最低价 + u'volume': 6, # 成交量 + u'open_interest': 1796 # 持仓量 + }, + u'485108': { + ... + } + } + """ + return self.back_end.get_tick_serial(ins_id) + + # ---------------------------------------------------------------------- + def get_kline_serial(self, ins_id, duration_seconds): + """ + 获取k线序列数据 + :param ins_id: 指定合约代码 + :param duration_seconds: 指定K线周期 + :return: 若指定的序列数据不存在,返回None,否则返回如下所示的一个dict + { + u'494835': { # 每根K线都有一个唯一编号,在一个序列中,编号总是连续递增的 + u'datetime': 1501080715000000000L, # K线起点时间(按北京时区),以nano epoch 方式表示(等于从1970-01-01时刻开始的纳秒数) + u'open': 51450, # K线起始时刻的最新价 + u'high': 51450, # K线时间范围内的最高价 + u'low': 51450, # K线时间范围内的最低价 + u'close': 51450, # K线结束时刻的最新价 + u'volume': 0, # K线时间范围内的成交量 + u'open_oi': 27354, # K线起始时刻的持仓量 + u'close_oi': 27354 # K线结束时刻的持仓量 + }, + u'494836': { + ... + } + } + + """ + dur_id = "%d" % (duration_seconds * 1000000000) + return self.back_end.get_kline_serial(ins_id, duration_seconds) + + # ---------------------------------------------------------------------- + @gen.coroutine + def _start(self): + self.client = yield tornado.websocket.websocket_connect(url="ws://127.0.0.1:7777/") + print("connected") + for req in self.requests: + self.client.write_message(req) + self.requests = [] + while True: + msg = yield self.client.read_message() + self._on_receive_msg(msg) + + def _send_json(self, obj): + s = json.dumps(obj) + if self.client: + self.client.write_message(s) + else: + self.requests.append(s) + + def _on_receive_msg(self, msg): + print("msg", msg) + pack = json.loads(msg) + datas = pack["data"] + event1 = Event(type_='eTianQin.') + event1.dict_['data'] = datas + self.eventEngine.put(event1) + + def _process_pack(self, event): + datas = event.dict_['data'] + #更新数据到存储后端 + self.back_end.input_data_pack(datas) + #发出数据变更通知 + for data in datas: + for selector, section in data.items(): + if selector == "quotes": + if self.quote_callback_func: + for ins_id in section.keys(): + self.quote_callback_func(ins_id) + elif selector == "ticks": + for ins_id in section.keys(): + chart_id = "VN_%s_%d" % (ins_id, 0) + callback_func = self.chart_callback_func.get(chart_id, None) + if callback_func: + callback_func(ins_id, 0) + elif selector == "klines": + for ins_id, sub_section in section.items(): + for dur_nanoseconds in sub_section.keys(): + dur_seconds = int(dur_nanoseconds) / 1000000000 + chart_id = "VN_%s_%d" % (ins_id, dur_seconds) + callback_func = self.chart_callback_func.get(chart_id, None) + if callback_func: + callback_func(ins_id, dur_seconds) + From a63cb600b46822a54f46c13d4c85b352a5cacd75 Mon Sep 17 00:00:00 2001 From: yangyang Date: Wed, 30 Aug 2017 21:33:04 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0tianqin=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- setup.py | 3 + vnpy/data/README.md | 3 +- vnpy/data/tq/__init__.py | 0 vnpy/data/tq/test.py | 45 +++++++ vnpy/data/tq/vntq.py | 264 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 314 insertions(+), 1 deletion(-) create mode 100644 vnpy/data/tq/__init__.py create mode 100644 vnpy/data/tq/test.py create mode 100644 vnpy/data/tq/vntq.py diff --git a/setup.py b/setup.py index a584359f..a3c44866 100644 --- a/setup.py +++ b/setup.py @@ -60,4 +60,7 @@ setup( '*.h', '*.cpp', '*.bash', '*.txt', '*.dll', '*.lib', '*.so', '*.pyd', '*.dat', '*.ini', '*.pfx', '*.scc', '*.crt', '*.key']}, + extras_require={ + 'tq': ["tornado>=4.5.1", "sortedcontainers>=1.5.7"], + } ) \ No newline at end of file diff --git a/vnpy/data/README.md b/vnpy/data/README.md index b00cb83f..fd16a850 100644 --- a/vnpy/data/README.md +++ b/vnpy/data/README.md @@ -2,4 +2,5 @@ ### 历史数据 * datayes:通联数据接口 -* shcifco:上海中期接口 \ No newline at end of file +* shcifco:上海中期接口 +* tq:天勤数据接口 \ No newline at end of file diff --git a/vnpy/data/tq/__init__.py b/vnpy/data/tq/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vnpy/data/tq/test.py b/vnpy/data/tq/test.py new file mode 100644 index 00000000..2c8f254f --- /dev/null +++ b/vnpy/data/tq/test.py @@ -0,0 +1,45 @@ +# encoding: UTF-8 + + +from vntq import TqApi + + +# 接口对象 +api = None + + +#---------------------------------------------------------------------- +def onQuote(symbol): + """Tick更新""" + print '-' * 30 + print 'onQuote' + quote = api.get_quote(symbol) + print quote + +#---------------------------------------------------------------------- +def onChart(symbol, seconds): + """K线更新""" + print '-' * 30 + print 'onChart' + + if seconds == 0: + serial = api.get_tick_serial(symbol) + else: + serial = api.get_kline_serial(symbol, seconds) + + print serial + + +if __name__ == "__main__": + api = TqApi() + + api.connect() + + api.subscribe_quote(["RM801"], onQuote) + api.subscribe_chart("RM801", 0, 100, onChart) + + raw_input() + + + + \ No newline at end of file diff --git a/vnpy/data/tq/vntq.py b/vnpy/data/tq/vntq.py new file mode 100644 index 00000000..ba91c0f3 --- /dev/null +++ b/vnpy/data/tq/vntq.py @@ -0,0 +1,264 @@ +# encoding: UTF-8 + +""" +对接天勤行情的网关接口,可以提供国内期货的报价/K线/Tick序列等数据的实时推送和历史仿真 +使用时需要在本机先启动一个天勤终端进程 +天勤行情终端: http://www.tq18.cn +""" + + +import json +import threading +import tornado +from tornado import websocket +from sortedcontainers import SortedDict + +######################################################################## +class TqApi(object): + """天勤行情接口""" + + #---------------------------------------------------------------------- + def __init__(self): + """Constructor""" + self.data = {} # 数据存储 + + self.client = None # websocket客户端 + self.requests = [] # 请求缓存 + + self.quote_callback_func = None # tick回调函数 + self.chart_subscribes = {} # k线回调函数 + + #---------------------------------------------------------------------- + def connect(self): + """ + 建立行情连接。 + """ + self.start() + + # 启动tornado的IO线程 + loop_thread = threading.Thread(target=lambda : tornado.ioloop.IOLoop.current().start()) + loop_thread.setDaemon(True) + loop_thread.start() + + #---------------------------------------------------------------------- + def subscribe_quote(self, ins_list, callback_func=None): + """ + 订阅实时行情. + 指定一个合约列表,订阅其实时报价信息 + 每次调用此函数时,都会覆盖前一次的订阅设定,不在订阅列表里的合约,会停止行情推送 + :param ins_list: ins_list 是一个列表,列出全部需要实时行情的合约代码。 + :param callback_func (可选): callback_func 是一个回调函数,每当有报价数据变更时会触发。此函数应该接受一个参数 ins_id + :example: + 订阅 cu1803,SR709,IF1709 这三个合约的报价: subscribe_quote(["cu1803", ”SR709", "IF1709"]) + """ + if callback_func: + self.quote_callback_func = callback_func + + req = { + "aid": "subscribe_quote", + "ins_list": ",".join(ins_list), + } + self.send_json(req) + + #---------------------------------------------------------------------- + def subscribe_chart(self, ins_id, duration_seconds, data_length=200, callback_func=None): + """ + 订阅历史行情序列. + 订阅指定合约及周期的历史行情序列(K线数据序列或Tick数据序列),这些序列数据会持续推送 + :param ins_id: 合约代码,需注意大小写 + :param duration_seconds: 历史数据周期,以秒为单位。目前支持的周期包括: + 3秒,5秒,10秒,15秒,20秒,30秒,1分钟,2分钟,3分钟,5分钟,10分钟,15分钟,20分钟,30分钟,1小时,2小时,4小时,1日 + 特别的,此值指定为0表示订阅tick序列。 + :param data_length: 需要获取的序列长度。每个序列最大支持请求 8964 个数据 + :param callback_func (可选): callback_func 是一个回调函数,每当序列数据变更时会触发。此函数应该接受2个参数 ins_id, duration_seconds + :example: + 订阅 cu1803 的1分钟线: subscribe_chart("s1", "cu1803", 60) + 订阅 IF1709 的tick线: subscribe_chart("s2", "IF1709", 0) + """ + chart_id = self._generate_chart_id(ins_id, duration_seconds) + + # 限制最大数据长度 + if data_length > 8964: + data_length = 8964 + + req = { + "aid": "set_chart", + "chart_id": chart_id, + "ins_list": ins_id, + "duration": duration_seconds * 1000000000, + "view_width": data_length, + } + self.send_json(req) + self.chart_subscribes[chart_id] = req + self.chart_subscribes[chart_id]["callback"] = callback_func + + #---------------------------------------------------------------------- + def get_quote(self, ins_id): + """ + 获取报价数据 + :param ins_id: 指定合约代码 + :return: 若指定的数据不存在,返回None,否则返回如下所示的一个dict + { + u'datetime': u'2017-07-26 23:04:21.000001',# tick从交易所发出的时间(按北京时区) + u'instrument_id': u'SR801', # 合约代码 + u'last_price': 6122.0, # 最新价 + u'bid_price1': 6121.0, # 买一价 + u'ask_price1': 6122.0, # 卖一价 + u'bid_volume1': 54, # 买一量 + u'ask_volume1': 66, # 卖一量 + u'upper_limit': 6388.0, # 涨停价 + u'lower_limit': 5896.0, # 跌停价 + u'volume': 89252, # 成交量 + u'amount': 5461329880.0, # 成交额 + u'open_interest': 616424, # 持仓量 + u'highest': 6129.0, # 当日最高价 + u'lowest': 6101.0, # 当日最低价 + u'average': 6119.0, # 当日均价 + u'open': 6102.0, # 开盘价 + u'close': u'-', # 收盘价 + u'settlement': u'-', # 结算价 + u'pre_close': 6106.0, # 昨收盘价 + u'pre_settlement': 6142.0 # 昨结算价 + u'pre_open_interest': 616620, # 昨持仓量 + } + """ + return self.data.setdefault("quotes", {}).get(ins_id, None) + + #---------------------------------------------------------------------- + def get_tick_serial(self, ins_id): + """ + 获取tick序列数据 + :param ins_id: 指定合约代码 + :return: 若指定的序列数据不存在,返回None,否则返回如下所示的一个dict + { + u'485107':{ # 每个Tick都有一个唯一编号,在一个序列中,编号总是连续递增的 + u'datetime': 1501074872000000000L, # tick从交易所发出的时间(按北京时区),以nano epoch 方式表示(等于从1970-01-01时刻开始的纳秒数) + u'trading_day': 1501084800000000000L, #交易日, 格式同上 + u'last_price': 3887, # 最新价 + u'bid_price1': 3881, # 买一价 + u'ask_price1': 3886, # 卖一价 + u'bid_volume1': 5, # 买一量 + u'ask_volume1': 1, #卖一量 + u'highest': 3887, # 当日最高价 + u'lowest': 3886, # 当日最低价 + u'volume': 6, # 成交量 + u'open_interest': 1796 # 持仓量 + }, + u'485108': { + ... + } + } + """ + return self.data.setdefault("ticks", {}).setdefault(ins_id, {}).get("data", None) + + #---------------------------------------------------------------------- + def get_kline_serial(self, ins_id, duration_seconds): + """ + 获取k线序列数据 + :param ins_id: 指定合约代码 + :param duration_seconds: 指定K线周期 + :return: 若指定的序列数据不存在,返回None,否则返回如下所示的一个dict + { + u'494835': { # 每根K线都有一个唯一编号,在一个序列中,编号总是连续递增的 + u'datetime': 1501080715000000000L, # K线起点时间(按北京时区),以nano epoch 方式表示(等于从1970-01-01时刻开始的纳秒数) + u'open': 51450, # K线起始时刻的最新价 + u'high': 51450, # K线时间范围内的最高价 + u'low': 51450, # K线时间范围内的最低价 + u'close': 51450, # K线结束时刻的最新价 + u'volume': 0, # K线时间范围内的成交量 + u'open_oi': 27354, # K线起始时刻的持仓量 + u'close_oi': 27354 # K线结束时刻的持仓量 + }, + u'494836': { + ... + } + } + """ + dur_id = "%d" % (duration_seconds * 1000000000) + return self.data.setdefault("klines", {}).setdefault(ins_id, {}).setdefault(dur_id, {}).get("data", None) + + #---------------------------------------------------------------------- + @tornado.gen.coroutine + def start(self): + """启动websocket客户端""" + self.client = yield tornado.websocket.websocket_connect(url="ws://127.0.0.1:7777/") + + # 发出所有缓存的请求 + for req in self.requests: + self.client.write_message(req) + self.requests = [] + + # 协程式读取数据 + while True: + msg = yield self.client.read_message() + self.on_receive_msg(msg) + + #---------------------------------------------------------------------- + def send_json(self, obj): + """发送JSON内容""" + s = json.dumps(obj) + + # 如果已经创建了客户端则直接发出请求 + if self.client: + self.client.write_message(s) + # 否则缓存在请求缓存中 + else: + self.requests.append(s) + + #---------------------------------------------------------------------- + def on_receive_msg(self, msg): + """收到数据推送""" + pack = json.loads(msg) + l = pack["data"] + + for data in l: + # 合并更新数据字典 + self._merge_obj(self.data, data) + # 遍历更新内容并调用回调函数 + for selector, section in data.items(): + if selector == "quotes": + if self.quote_callback_func: + for ins_id in section.keys(): + self.quote_callback_func(ins_id) + + elif selector == "ticks": + for ins_id in section.keys(): + chart_id = self._generate_chart_id(ins_id, 0) + sub_info = self.chart_subscribes.get(chart_id) + tick_serial = self.get_tick_serial(ins_id) + while len(tick_serial) > sub_info["view_width"]: + tick_serial.popitem(last=False) + callback_func = sub_info["callback"] + if callback_func: + callback_func(ins_id, 0) + + elif selector == "klines": + for ins_id, sub_section in section.items(): + for dur_nanoseconds in sub_section.keys(): + dur_seconds = int(dur_nanoseconds) / 1000000000 + chart_id = self._generate_chart_id(ins_id, dur_seconds) + sub_info = self.chart_subscribes.get(chart_id) + kline_serial = self.get_kline_serial(ins_id, dur_seconds) + while len(kline_serial) > sub_info["view_width"]: + kline_serial.popitem(last=False) + callback_func = sub_info["callback"] + if callback_func: + callback_func(ins_id, dur_seconds) + + #---------------------------------------------------------------------- + def _merge_obj(self, result, obj): + """合并对象""" + for key, value in obj.items(): + if value is None: + result.pop(key, None) + elif isinstance(value, dict): + target = result.setdefault(key, SortedDict()) + self._merge_obj(target, value) + else: + result[key] = value + + #---------------------------------------------------------------------- + def _generate_chart_id(self, ins_id, duration_seconds): + """生成图表编号""" + chart_id = "VN_%s_%d" % (ins_id, duration_seconds) + return chart_id \ No newline at end of file From 62043eab2e1db45455b11e82086b35449b4040a1 Mon Sep 17 00:00:00 2001 From: yangyang Date: Thu, 31 Aug 2017 08:05:54 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=A4=A9=E5=8B=A4?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E5=8E=9F=E5=9E=8B=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/TianQinDataService/demo.py | 45 ---- vnpy/data/tianqin/__init__.py | 0 vnpy/data/tianqin/vntianqin.py | 312 ---------------------------- 3 files changed, 357 deletions(-) delete mode 100644 examples/TianQinDataService/demo.py delete mode 100644 vnpy/data/tianqin/__init__.py delete mode 100644 vnpy/data/tianqin/vntianqin.py diff --git a/examples/TianQinDataService/demo.py b/examples/TianQinDataService/demo.py deleted file mode 100644 index ddcd97ce..00000000 --- a/examples/TianQinDataService/demo.py +++ /dev/null @@ -1,45 +0,0 @@ -# encoding: UTF-8 - -# 重载sys模块,设置默认字符串编码方式为utf8 -import sys -reload(sys) -sys.setdefaultencoding('utf8') - -from vnpy.event import EventEngine -from vnpy.data.tianqin.vntianqin import TianQinGateway, DataBackEndMemory, DataBackendMongo -from vnpy.trader.uiQt import createQApp - - -class DemoApp(object): - # ---------------------------------------------------------------------- - def __init__(self): - """Constructor""" - self.eventEngine = EventEngine() - self.eventEngine.start() - self.tianqinGateway = TianQinGateway(self.eventEngine, back_end=DataBackEndMemory()) - # mc = MongoClient(MONGO_HOST, MONGO_PORT) # Mongo连接 - # self.tianqinGateway = TianQinGateway(self.eventEngine, back_end=DataBackendMongo(mc)) - - def start(self): - self.tianqinGateway.connect() - self.tianqinGateway.subscribe_quote(["cu1803", "SR801", "c1801", "IF1708"], self.on_quote_data) - self.tianqinGateway.subscribe_chart("cu1803", 5, 1000, self.on_chart_data) - self.tianqinGateway.subscribe_chart("au1712", 0, 1000, self.on_chart_data) - - def on_quote_data(self, ins_id): - quote = self.tianqinGateway.get_quote(ins_id) - print("quote_update", ins_id, quote) - - def on_chart_data(self, ins_id, dur_seconds): - if dur_seconds == 0: - tick_serial = self.tianqinGateway.get_tick_serial(ins_id) - print("tick_serial_update", tick_serial) - else: - kline_serial = self.tianqinGateway.get_kline_serial(ins_id, dur_seconds) - print("kline_serial_update", kline_serial) - -#---------------------------------------------------------------------- -if __name__ == '__main__': - app = DemoApp() - app.start() - sys.exit(createQApp().exec_()) diff --git a/vnpy/data/tianqin/__init__.py b/vnpy/data/tianqin/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/vnpy/data/tianqin/vntianqin.py b/vnpy/data/tianqin/vntianqin.py deleted file mode 100644 index be1c2438..00000000 --- a/vnpy/data/tianqin/vntianqin.py +++ /dev/null @@ -1,312 +0,0 @@ -# encoding: UTF-8 - -""" -对接天勤行情的网关接口,可以提供国内期货的 报价/K线/Tick序列 等数据的实时推送和历史仿真 -使用时需要在本机先启动一个天勤终端进程(http://www.tq18.cn) - -使用示例见: - - -""" - - -import json -import threading -import sortedcontainers -import tornado.ioloop -import tornado.iostream -import tornado.websocket -from tornado import gen - -from vnpy.event import * - - -######################################################################## -class DataBackEndMemory(object): - def __init__(self): - self.data = sortedcontainers.SortedDict() - - def input_data_pack(self, pack): - for data in pack: - self.data = self._dict_merge(self.data, data) - - def get_tick_serial(self, ins_id): - return self.data.setdefault("ticks", {}).setdefault(ins_id, {}).get("data", None) - - def get_kline_serial(self, ins_id, duration_seconds): - dur_id = "%d" % (duration_seconds * 1000000000) - return self.data.setdefault("klines", {}).setdefault(ins_id, {}).setdefault(dur_id, {}).get("data", None) - - def _dict_merge(self, *objs): - result = objs[0] - for obj in objs[1:]: - result = self._merge_obj(result, obj) - return result - - def _merge_obj(self, result, obj): - if not isinstance(result, dict): - result = {} - if not isinstance(obj, dict): - return obj - for key, value in obj.items(): - if isinstance(value, dict): - target = result.get(key) - if isinstance(target, dict): - self._merge_obj(target, value) - continue - result[key] = {} - self._merge_obj(result[key], value) - continue - if value is None: - result.pop(key, None) - continue - result[key] = value - return result - - -class DataBackendMongo(object): - def __init__(self, mc): - self.dbClient = mc - - def input_data_pack(self, pack): - for data in pack: - for selector, section in data.items(): - if selector == "ticks": - self.process_tick_data(section) - elif selector == "klines": - self.process_kline_data(section) - - def process_tick_data(self, section): - db_name = "TICK" - db = self.dbClient[db_name] # 数据库 - for ins_id, serials in section.items(): - cl = db[ins_id] - for tick_id, tick in serials: - flt = {'datetime': tick.datetime} - cl.replace_one(flt, tick, True) - - def get_tick_serial(self, ins_id): - collection = self.dbClient["TICK"][ins_id] - cursor = collection.find({}).sort('datetime') - return cursor - - def process_kline_data(self, section): - #todo - pass - - def get_kline_serial(self, ins_id, duration_seconds): - #todo - pass - - -class TianQinGateway(object): - """天勤行情服务""" - - # ---------------------------------------------------------------------- - def __init__(self, eventEngine, back_end=DataBackEndMemory()): - """Constructor""" - self.client = None - self.back_end = back_end - self.data = {} - self.requests = [] - self.eventEngine = eventEngine - self.eventEngine.register('eTianQin.', self._process_pack) - self.quote_callback_func = None - self.chart_callback_func = {} - - # ---------------------------------------------------------------------- - def connect(self): - """ - 建立行情连接。 - """ - self._start() - loop_thread = threading.Thread(target=lambda: tornado.ioloop.IOLoop.current().start()) - loop_thread.setDaemon(True) - loop_thread.start() - - # ---------------------------------------------------------------------- - def subscribe_quote(self, ins_list, notify_func=None): - """ - 订阅实时行情. - 指定一个合约列表,订阅其实时报价信息 - 每次调用此函数时,都会覆盖前一次的订阅设定,不在订阅列表里的合约,会停止行情推送 - :param ins_list: ins_list 是一个列表,列出全部需要实时行情的合约代码。 - :param notify_func (可选): callback_func 是一个回调函数,每当有报价数据变更时会触发。此函数应该接受一个参数 ins_id - :example: - 订阅 cu1803,SR709,IF1709 这三个合约的报价: subscribe_quote(["cu1803", ”SR709", "IF1709"]) - """ - if notify_func: - self.quote_callback_func = notify_func - req = { - "aid": "subscribe_quote", - "ins_list": ",".join(ins_list), - } - self._send_json(req) - - # ---------------------------------------------------------------------- - def subscribe_chart(self, ins_id, duration_seconds, data_length=200, notify_func=None): - """ - 订阅历史行情序列 - 订阅指定合约及周期的历史数据序列(K线数据序列或Tick数据序列),这些序列数据会持续推送 - :param ins_id: 合约代码,需注意大小写 - :param duration_seconds: 历史数据周期,以秒为单位。特别的,此值指定为0表示订阅tick序列。目前支持的周期包括: - 3秒,5秒,10秒,15秒,20秒,30秒,1分钟,2分钟,3分钟,5分钟,10分钟,15分钟,20分钟,30分钟,1小时,2小时,4小时,1日 - :param data_length: 需要获取的序列长度。每个序列最大支持请求 8964 个数据 - :param notify_func (可选): notify_func 是一个回调函数,每当序列数据变更时会触发。此函数应该接受2个参数 ins_id, duration_seconds - :example: - 订阅 cu1803 的1分钟线: subscribe_chart("cu1803", 60) - 订阅 IF1709 的tick线: subscribe_chart("IF1709", 0) - """ - chart_id = "VN_%s_%d" % (ins_id, duration_seconds) - if data_length > 8964: - data_length = 8964 - if notify_func: - self.chart_callback_func[chart_id] = notify_func - req = { - "aid": "set_chart", - "chart_id": chart_id, - "ins_list": ins_id, - "duration": duration_seconds * 1000000000, - "view_width": data_length, - } - self._send_json(req) - - # ---------------------------------------------------------------------- - def get_quote(self, ins_id): - """ - 获取报价数据 - :param ins_id: 指定合约代码 - :return: 若指定的数据不存在,返回None,否则返回如下所示的一个dict - { - u'datetime': u'2017-07-26 23:04:21.000001',# tick从交易所发出的时间(按北京时区) - u'instrument_id': u'SR801', # 合约代码 - u'last_price': 6122.0, # 最新价 - u'bid_price1': 6121.0, # 买一价 - u'ask_price1': 6122.0, # 卖一价 - u'bid_volume1': 54, # 买一量 - u'ask_volume1': 66, # 卖一量 - u'upper_limit': 6388.0, # 涨停价 - u'lower_limit': 5896.0, # 跌停价 - u'volume': 89252, # 成交量 - u'amount': 5461329880.0, # 成交额 - u'open_interest': 616424, # 持仓量 - u'highest': 6129.0, # 当日最高价 - u'lowest': 6101.0, # 当日最低价 - u'average': 6119.0, # 当日均价 - u'open': 6102.0, # 开盘价 - u'close': u'-', # 收盘价 - u'settlement': u'-', # 结算价 - u'pre_close': 6106.0, # 昨收盘价 - u'pre_settlement': 6142.0 # 昨结算价 - u'pre_open_interest': 616620, # 昨持仓量 - } - """ - return self.data.setdefault("quotes", {}).get(ins_id, None) - - # ---------------------------------------------------------------------- - def get_tick_serial(self, ins_id): - """ - 获取tick序列数据 - :param ins_id: 指定合约代码 - :return: 若指定的序列数据不存在,返回None,否则返回如下所示的一个dict - { - u'485107':{ # 每个Tick都有一个唯一编号,在一个序列中,编号总是连续递增的 - u'datetime': 1501074872000000000L, # tick从交易所发出的时间(按北京时区),以nano epoch 方式表示(等于从1970-01-01时刻开始的纳秒数) - u'trading_day': 1501084800000000000L, #交易日, 格式同上 - u'last_price': 3887, # 最新价 - u'bid_price1': 3881, # 买一价 - u'ask_price1': 3886, # 卖一价 - u'bid_volume1': 5, # 买一量 - u'ask_volume1': 1, #卖一量 - u'highest': 3887, # 当日最高价 - u'lowest': 3886, # 当日最低价 - u'volume': 6, # 成交量 - u'open_interest': 1796 # 持仓量 - }, - u'485108': { - ... - } - } - """ - return self.back_end.get_tick_serial(ins_id) - - # ---------------------------------------------------------------------- - def get_kline_serial(self, ins_id, duration_seconds): - """ - 获取k线序列数据 - :param ins_id: 指定合约代码 - :param duration_seconds: 指定K线周期 - :return: 若指定的序列数据不存在,返回None,否则返回如下所示的一个dict - { - u'494835': { # 每根K线都有一个唯一编号,在一个序列中,编号总是连续递增的 - u'datetime': 1501080715000000000L, # K线起点时间(按北京时区),以nano epoch 方式表示(等于从1970-01-01时刻开始的纳秒数) - u'open': 51450, # K线起始时刻的最新价 - u'high': 51450, # K线时间范围内的最高价 - u'low': 51450, # K线时间范围内的最低价 - u'close': 51450, # K线结束时刻的最新价 - u'volume': 0, # K线时间范围内的成交量 - u'open_oi': 27354, # K线起始时刻的持仓量 - u'close_oi': 27354 # K线结束时刻的持仓量 - }, - u'494836': { - ... - } - } - - """ - dur_id = "%d" % (duration_seconds * 1000000000) - return self.back_end.get_kline_serial(ins_id, duration_seconds) - - # ---------------------------------------------------------------------- - @gen.coroutine - def _start(self): - self.client = yield tornado.websocket.websocket_connect(url="ws://127.0.0.1:7777/") - print("connected") - for req in self.requests: - self.client.write_message(req) - self.requests = [] - while True: - msg = yield self.client.read_message() - self._on_receive_msg(msg) - - def _send_json(self, obj): - s = json.dumps(obj) - if self.client: - self.client.write_message(s) - else: - self.requests.append(s) - - def _on_receive_msg(self, msg): - print("msg", msg) - pack = json.loads(msg) - datas = pack["data"] - event1 = Event(type_='eTianQin.') - event1.dict_['data'] = datas - self.eventEngine.put(event1) - - def _process_pack(self, event): - datas = event.dict_['data'] - #更新数据到存储后端 - self.back_end.input_data_pack(datas) - #发出数据变更通知 - for data in datas: - for selector, section in data.items(): - if selector == "quotes": - if self.quote_callback_func: - for ins_id in section.keys(): - self.quote_callback_func(ins_id) - elif selector == "ticks": - for ins_id in section.keys(): - chart_id = "VN_%s_%d" % (ins_id, 0) - callback_func = self.chart_callback_func.get(chart_id, None) - if callback_func: - callback_func(ins_id, 0) - elif selector == "klines": - for ins_id, sub_section in section.items(): - for dur_nanoseconds in sub_section.keys(): - dur_seconds = int(dur_nanoseconds) / 1000000000 - chart_id = "VN_%s_%d" % (ins_id, dur_seconds) - callback_func = self.chart_callback_func.get(chart_id, None) - if callback_func: - callback_func(ins_id, dur_seconds) -