diff --git a/beta/api/korbit/vnkorbit.py b/beta/api/korbit/vnkorbit.py index 0f134325..ff123ed6 100644 --- a/beta/api/korbit/vnkorbit.py +++ b/beta/api/korbit/vnkorbit.py @@ -4,6 +4,7 @@ import urllib import hashlib import json +import logging import requests import hmac import time @@ -11,7 +12,7 @@ from datetime import datetime from time import time, sleep , mktime from Queue import Queue, Empty from threading import Thread -import urllib +import urllib import websocket import inspect @@ -49,9 +50,9 @@ class Korbit_TradeApi(object): # self.reqQueue = Queue() # 请求队列 self.reqQueue = [] # 请求的队列 - self.reqThread = Thread(target=self.processQueue) # 请求处理线程 + self.reqThread = Thread(target=self.processQueue) # 请求处理线程 - self.DEBUG = True + self.DEBUG = True ''' 直接发送 request ,获得身份 @@ -104,9 +105,9 @@ class Korbit_TradeApi(object): def exit(self): """退出""" self.active = False - + if self.reqThread.isAlive(): - self.reqThread.join() + self.reqThread.join() @property def nonce(self): @@ -173,7 +174,7 @@ class Korbit_TradeApi(object): return data except Exception,ex: print ex - return None + return None #---------------------------------------------------------------------- def processQueue(self): @@ -184,10 +185,10 @@ class Korbit_TradeApi(object): if len(self.reqQueue) > 0: (Type , req) = self.reqQueue[0] self.reqQueue.pop(0) - + callback = req['callback'] reqID = req['reqID'] - + data = self.processRequest(req) # 请求成功 @@ -195,18 +196,18 @@ class Korbit_TradeApi(object): if self.DEBUG: print callback.__name__ callback(data, req, reqID) - + sleep(0.1) except Exception,ex: print ex - + #---------------------------------------------------------------------- def sendRequest(self, url , method, callback, kwargs = None,optional=None): """发送请求""" # 请求编号加1 self.reqID += 1 - + # 生成请求字典并放入队列中 req = {} req['url'] = url @@ -227,26 +228,26 @@ class Korbit_TradeApi(object): else: self.reqQueue.append( (method , req)) #self.reqQueue.put(req) - + # 返回请求编号 return self.reqID #---------------------------------------------------------------------- def exit(self): """退出""" self.active = False - + if self.reqThread.isAlive(): - self.reqThread.join() + self.reqThread.join() #################################################### ## 主动函数 - #################################################### + #################################################### #---------------------------------------------------------------------- def init(self, accessKey, secretKey , username , password): """初始化""" self.accessKey = accessKey self.secretKey = secretKey - + self.create_token_directly( username , password) self.active = True @@ -293,7 +294,7 @@ class Korbit_TradeApi(object): class Korbit_DataApi(object): simple_ticker_url = korbit_host + "ticker" - detail_ticker_url = korbit_host + "ticker/detailed" + detail_ticker_url = korbit_host + "ticker/detailed" orderbook_url = korbit_host + "orderbook" transactions_url = korbit_host + "transactions" constants_url = korbit_host + "constants" @@ -310,7 +311,7 @@ class Korbit_DataApi(object): """初始化""" self.taskInterval = interval self.DEBUG = debug - + self.active = True self.taskThread.start() @@ -318,10 +319,10 @@ class Korbit_DataApi(object): def exit(self): """退出""" self.active = False - + if self.taskThread.isAlive(): self.taskThread.join() - + #---------------------------------------------------------------------- def run(self): """连续运行""" @@ -353,11 +354,11 @@ class Korbit_DataApi(object): url = self.transactions_url + "?currency_pair=" + symbol + "&time=" + "minute" task = (url, self.onTrades , symbol) self.taskList.append(task) - + #---------------------------------------------------------------------- def subscribeOrderbooks(self, symbol): """订阅实时成交数据""" - url = self.orderbook_url + "?currency_pair=" + symbol + url = self.orderbook_url + "?currency_pair=" + symbol task = (url, self.onOrderbooks , symbol) self.taskList.append(task) @@ -374,4 +375,3 @@ class Korbit_DataApi(object): def onOrderbooks(self, data): """实时成交推送""" print data - diff --git a/examples/TqDataService/config.json b/examples/TqDataService/config.json index ff747220..467e7857 100644 --- a/examples/TqDataService/config.json +++ b/examples/TqDataService/config.json @@ -1,8 +1,4 @@ { "MONGO_HOST": "localhost", - "MONGO_PORT": 27017, - - "SYMBOLS": ["IF1710", "IF1711", "IF1712", "IF1803", - "IH1710", "IH1711", "IH1712", "IH1803", - "IC1710", "IC1711", "IC1712", "IC1803"] + "MONGO_PORT": 27017 } \ No newline at end of file diff --git a/examples/TqDataService/dataService.py b/examples/TqDataService/dataService.py index 0861fe22..cfeb3b93 100644 --- a/examples/TqDataService/dataService.py +++ b/examples/TqDataService/dataService.py @@ -18,7 +18,6 @@ setting = json.load(config) MONGO_HOST = setting['MONGO_HOST'] MONGO_PORT = setting['MONGO_PORT'] -SYMBOLS = setting['SYMBOLS'] mc = MongoClient(MONGO_HOST, MONGO_PORT) # Mongo连接 db = mc[MINUTE_DB_NAME] # 数据库 @@ -80,16 +79,16 @@ def downMinuteBarBySymbol(symbol, num): api.subscribe_chart(symbol, 60, num, onChart) #---------------------------------------------------------------------- -def downloadAllMinuteBar(num): +def downloadAllMinuteBar(num, symbols): """下载所有配置中的合约的分钟线数据""" print '-' * 50 print u'开始下载合约分钟线数据' print '-' * 50 # 添加下载任务 - taskList.extend(SYMBOLS) + taskList.extend(symbols) - for symbol in SYMBOLS: + for symbol in symbols: downMinuteBarBySymbol(str(symbol), num) while True: diff --git a/examples/TqDataService/downloadData.py b/examples/TqDataService/downloadData.py index 3cf6f3cc..246df77a 100644 --- a/examples/TqDataService/downloadData.py +++ b/examples/TqDataService/downloadData.py @@ -2,10 +2,16 @@ """ 立即下载数据到数据库中,用于手动执行更新操作。 + +注意: 请先在本机启动天勤终端 (0.8.0 以上版本) 并保持运行, 再执行本程序 """ from dataService import * + if __name__ == '__main__': - downloadAllMinuteBar(1000) \ No newline at end of file + symbols = ["CFFEX.IF1710", "CFFEX.IF1711", "CFFEX.IF1712", "CFFEX.IF1803", + "CFFEX.IH1710", "CFFEX.IH1711", "CFFEX.IH1712", "CFFEX.IH1803", + "CFFEX.IC1710", "CFFEX.IC1711", "CFFEX.IC1712", "CFFEX.IC1803"] + downloadAllMinuteBar(1000, symbols) \ No newline at end of file diff --git a/examples/TqDataService/runService.py b/examples/TqDataService/runService.py index 17d6d165..3b5d356f 100644 --- a/examples/TqDataService/runService.py +++ b/examples/TqDataService/runService.py @@ -2,6 +2,8 @@ """ 定时服务,可无人值守运行,实现每日自动下载更新历史行情数据到数据库中。 + +注意: 请确保本程序运行时, 本机天勤终端 (0.8.0 以上版本)正在运行中 """ import time @@ -15,7 +17,11 @@ if __name__ == '__main__': # 生成一个随机的任务下载时间,用于避免所有用户在同一时间访问数据服务器 taskTime = datetime.time(hour=17, minute=0) - + + symbols = ["CFFEX.IF1710", "CFFEX.IF1711", "CFFEX.IF1712", "CFFEX.IF1803", + "CFFEX.IH1710", "CFFEX.IH1711", "CFFEX.IH1712", "CFFEX.IH1803", + "CFFEX.IC1710", "CFFEX.IC1711", "CFFEX.IC1712", "CFFEX.IC1803"] + # 进入主循环 while True: t = datetime.datetime.now() @@ -23,7 +29,7 @@ if __name__ == '__main__': # 每天到达任务下载时间后,执行数据下载的操作 if t.time() > taskTime and (taskCompletedDate is None or t.date() != taskCompletedDate): # 下载1000根分钟线数据,足以覆盖过去两天的行情 - downloadAllMinuteBar(1000) + downloadAllMinuteBar(1000, symbols) # 更新任务完成的日期 taskCompletedDate = t.date() diff --git a/vnpy/data/tq/vntq.py b/vnpy/data/tq/vntq.py index 8f3a9797..87268c75 100644 --- a/vnpy/data/tq/vntq.py +++ b/vnpy/data/tq/vntq.py @@ -277,4 +277,5 @@ class TqApi(object): def _generate_chart_id(self, ins_id, duration_seconds): """生成图表编号""" chart_id = "VN_%s_%d" % (ins_id, duration_seconds) + chart_id = chart_id.replace(".", "_") return chart_id \ No newline at end of file diff --git a/vnpy/rpc/testClient.py b/vnpy/rpc/testClient.py index e613cad5..d4d03fd3 100644 --- a/vnpy/rpc/testClient.py +++ b/vnpy/rpc/testClient.py @@ -1,5 +1,6 @@ # encoding: UTF-8 +from __future__ import print_function from time import sleep from vnrpc import RpcClient @@ -13,21 +14,21 @@ class TestClient(RpcClient): def __init__(self, reqAddress, subAddress): """Constructor""" super(TestClient, self).__init__(reqAddress, subAddress) - + #---------------------------------------------------------------------- def callback(self, topic, data): """回调函数实现""" - print 'client received topic:', topic, ', data:', data - + print('client received topic:', topic, ', data:', data) + if __name__ == '__main__': reqAddress = 'tcp://localhost:2014' subAddress = 'tcp://localhost:0602' - + tc = TestClient(reqAddress, subAddress) tc.subscribeTopic('') tc.start() - + while 1: - print tc.add(1, 3) - sleep(2) \ No newline at end of file + print(tc.add(1, 3)) + sleep(2) diff --git a/vnpy/rpc/testServer.py b/vnpy/rpc/testServer.py index f067603b..cb5760b1 100644 --- a/vnpy/rpc/testServer.py +++ b/vnpy/rpc/testServer.py @@ -1,5 +1,6 @@ # encoding: UTF-8 +from __future__ import print_function from time import sleep, time from vnrpc import RpcServer @@ -19,19 +20,19 @@ class TestServer(RpcServer): #---------------------------------------------------------------------- def add(self, a, b): """测试函数""" - print 'receiving: %s, %s' % (a,b) + print('receiving: %s, %s' % (a,b)) return a + b if __name__ == '__main__': repAddress = 'tcp://*:2014' pubAddress = 'tcp://*:0602' - + ts = TestServer(repAddress, pubAddress) ts.start() - + while 1: content = 'current server time is %s' % time() - print content + print(content) ts.publish('test', content) - sleep(2) \ No newline at end of file + sleep(2) diff --git a/vnpy/rpc/vnrpc.py b/vnpy/rpc/vnrpc.py index cc6d7275..7b533eab 100644 --- a/vnpy/rpc/vnrpc.py +++ b/vnpy/rpc/vnrpc.py @@ -21,16 +21,16 @@ signal.signal(signal.SIGINT, signal.SIG_DFL) class RpcObject(object): """ RPC对象 - + 提供对数据的序列化打包和解包接口,目前提供了json、msgpack、cPickle三种工具。 - + msgpack:性能更高,但通常需要安装msgpack相关工具; json:性能略低但通用性更好,大部分编程语言都内置了相关的库。 cPickle:性能一般且仅能用于Python,但是可以直接传送Python对象,非常方便。 - + 因此建议尽量使用msgpack,如果要和某些语言通讯没有提供msgpack时再使用json, 当传送的数据包含很多自定义的Python对象时建议使用cPickle。 - + 如果希望使用其他的序列化工具也可以在这里添加。 """ @@ -40,59 +40,59 @@ class RpcObject(object): # 默认使用msgpack作为序列化工具 #self.useMsgpack() self.usePickle() - + #---------------------------------------------------------------------- def pack(self, data): """打包""" pass - + #---------------------------------------------------------------------- def unpack(self, data): """解包""" pass - + #---------------------------------------------------------------------- def __jsonPack(self, data): """使用json打包""" return dumps(data) - + #---------------------------------------------------------------------- def __jsonUnpack(self, data): """使用json解包""" return loads(data) - + #---------------------------------------------------------------------- def __msgpackPack(self, data): """使用msgpack打包""" return packb(data) - + #---------------------------------------------------------------------- def __msgpackUnpack(self, data): """使用msgpack解包""" return unpackb(data) - + #---------------------------------------------------------------------- def __picklePack(self, data): """使用cPickle打包""" return pDumps(data) - + #---------------------------------------------------------------------- def __pickleUnpack(self, data): """使用cPickle解包""" return pLoads(data) - + #---------------------------------------------------------------------- def useJson(self): """使用json作为序列化工具""" self.pack = self.__jsonPack self.unpack = self.__jsonUnpack - + #---------------------------------------------------------------------- def useMsgpack(self): """使用msgpack作为序列化工具""" self.pack = self.__msgpackPack self.unpack = self.__msgpackUnpack - + #---------------------------------------------------------------------- def usePickle(self): """使用cPickle作为序列化工具""" @@ -108,43 +108,43 @@ class RpcServer(RpcObject): def __init__(self, repAddress, pubAddress): """Constructor""" super(RpcServer, self).__init__() - + # 保存功能函数的字典,key是函数名,value是函数对象 - self.__functions = {} + self.__functions = {} # zmq端口相关 self.__context = zmq.Context() - + self.__socketREP = self.__context.socket(zmq.REP) # 请求回应socket self.__socketREP.bind(repAddress) - + self.__socketPUB = self.__context.socket(zmq.PUB) # 数据广播socket self.__socketPUB.bind(pubAddress) - + # 工作线程相关 self.__active = False # 服务器的工作状态 self.__thread = threading.Thread(target=self.run) # 服务器的工作线程 - + #---------------------------------------------------------------------- def start(self): """启动服务器""" # 将服务器设为启动 self.__active = True - + # 启动工作线程 if not self.__thread.isAlive(): self.__thread.start() - + #---------------------------------------------------------------------- def stop(self): """停止服务器""" # 将服务器设为停止 self.__active = False - + # 等待工作线程退出 if self.__thread.isAlive(): self.__thread.join() - + #---------------------------------------------------------------------- def run(self): """服务器运行函数""" @@ -152,16 +152,16 @@ class RpcServer(RpcObject): # 使用poll来等待事件到达,等待1秒(1000毫秒) if not self.__socketREP.poll(1000): continue - + # 从请求响应socket收取请求数据 reqb = self.__socketREP.recv() - + # 序列化解包 req = self.unpack(reqb) - + # 获取函数名和参数 name, args, kwargs = req - + # 获取引擎中对应的函数对象,并执行调用,如果有异常则捕捉后返回 try: func = self.__functions[name] @@ -169,13 +169,13 @@ class RpcServer(RpcObject): rep = [True, r] except Exception as e: rep = [False, traceback.format_exc()] - + # 序列化打包 repb = self.pack(rep) - + # 通过请求响应socket返回调用结果 self.__socketREP.send(repb) - + #---------------------------------------------------------------------- def publish(self, topic, data): """ @@ -185,10 +185,10 @@ class RpcServer(RpcObject): """ # 序列化数据 datab = self.pack(data) - + # 通过广播socket发送数据 self.__socketPUB.send_multipart([topic, datab]) - + #---------------------------------------------------------------------- def register(self, func): """注册函数""" @@ -198,24 +198,24 @@ class RpcServer(RpcObject): ######################################################################## class RpcClient(RpcObject): """RPC客户端""" - + #---------------------------------------------------------------------- def __init__(self, reqAddress, subAddress): """Constructor""" super(RpcClient, self).__init__() - + # zmq端口相关 self.__reqAddress = reqAddress self.__subAddress = subAddress - + self.__context = zmq.Context() self.__socketREQ = self.__context.socket(zmq.REQ) # 请求发出socket - self.__socketSUB = self.__context.socket(zmq.SUB) # 广播订阅socket + self.__socketSUB = self.__context.socket(zmq.SUB) # 广播订阅socket # 工作线程相关,用于处理服务器推送的数据 self.__active = False # 客户端的工作状态 self.__thread = threading.Thread(target=self.run) # 客户端的工作线程 - + #---------------------------------------------------------------------- def __getattr__(self, name): """实现远程调用功能""" @@ -223,49 +223,49 @@ class RpcClient(RpcObject): def dorpc(*args, **kwargs): # 生成请求 req = [name, args, kwargs] - + # 序列化打包请求 reqb = self.pack(req) - + # 发送请求并等待回应 self.__socketREQ.send(reqb) repb = self.__socketREQ.recv() - + # 序列化解包回应 rep = self.unpack(repb) - + # 若正常则返回结果,调用失败则触发异常 if rep[0]: return rep[1] else: raise RemoteException(rep[1]) - + return dorpc - + #---------------------------------------------------------------------- def start(self): """启动客户端""" # 连接端口 self.__socketREQ.connect(self.__reqAddress) self.__socketSUB.connect(self.__subAddress) - + # 将服务器设为启动 self.__active = True - + # 启动工作线程 if not self.__thread.isAlive(): self.__thread.start() - + #---------------------------------------------------------------------- def stop(self): """停止客户端""" # 将客户端设为停止 self.__active = False - + # 等待工作线程退出 if self.__thread.isAlive(): self.__thread.join() - + #---------------------------------------------------------------------- def run(self): """客户端运行函数""" @@ -273,32 +273,32 @@ class RpcClient(RpcObject): # 使用poll来等待事件到达,等待1秒(1000毫秒) if not self.__socketSUB.poll(1000): continue - + # 从订阅socket收取广播数据 topic, datab = self.__socketSUB.recv_multipart() - + # 序列化解包 data = self.unpack(datab) # 调用回调函数处理 self.callback(topic, data) - + #---------------------------------------------------------------------- def callback(self, topic, data): """回调函数,必须由用户实现""" raise NotImplementedError - + #---------------------------------------------------------------------- def subscribeTopic(self, topic): """ 订阅特定主题的广播数据 - + 可以使用topic=''来订阅所有的主题 - + 注意topic必须是ascii编码 """ self.__socketSUB.setsockopt(zmq.SUBSCRIBE, topic) - + ######################################################################## class RemoteException(Exception): @@ -308,10 +308,8 @@ class RemoteException(Exception): def __init__(self, value): """Constructor""" self.__value = value - + #---------------------------------------------------------------------- def __str__(self): """输出错误信息""" return self.__value - - \ No newline at end of file