From 5d046d360c893e6d3abcaf922104873d606d5ea6 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Thu, 5 Jul 2018 14:03:04 +0800 Subject: [PATCH] =?UTF-8?q?[Add]=E9=87=8D=E6=96=B0=E5=AE=9E=E7=8E=B0Lbank?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/CryptoTrader/LBANK_connect.json | 5 + examples/CryptoTrader/run.py | 6 +- vnpy/api/bigone/vnbigone.py | 2 +- vnpy/api/lbank/__init__.py | 2 +- vnpy/api/lbank/test.py | 103 +- vnpy/api/lbank/vnlbank.py | 397 +++----- vnpy/trader/gateway/fcoinGateway/__init__.py | 4 +- .../gateway/fcoinGateway/fcoinGateway.py | 4 +- .../gateway/lbankGateway/LBANK_connect.json | 7 +- .../gateway/lbankGateway/lbankGateway.py | 958 ++++++++++-------- 10 files changed, 758 insertions(+), 730 deletions(-) create mode 100644 examples/CryptoTrader/LBANK_connect.json diff --git a/examples/CryptoTrader/LBANK_connect.json b/examples/CryptoTrader/LBANK_connect.json new file mode 100644 index 00000000..59286a19 --- /dev/null +++ b/examples/CryptoTrader/LBANK_connect.json @@ -0,0 +1,5 @@ +{ + "apiKey": "", + "secretKey": "", + "symbols": ["eth_usdt", "sc_btc", "btc_usdt"] +} diff --git a/examples/CryptoTrader/run.py b/examples/CryptoTrader/run.py index 80ce604d..23868fa1 100644 --- a/examples/CryptoTrader/run.py +++ b/examples/CryptoTrader/run.py @@ -23,10 +23,10 @@ from vnpy.trader.uiMainWindow import MainWindow from vnpy.trader.gateway import (huobiGateway, okexGateway, binanceGateway, bitfinexGateway, bitmexGateway, fcoinGateway, - bigoneGateway) + bigoneGateway, lbankGateway) # 加载上层应用 -from vnpy.trader.app import (riskManager, algoTrading) +from vnpy.trader.app import (algoTrading) #---------------------------------------------------------------------- @@ -42,6 +42,7 @@ def main(): me = MainEngine(ee) # 添加交易接口 + me.addGateway(lbankGateway) me.addGateway(bigoneGateway) me.addGateway(fcoinGateway) me.addGateway(bitmexGateway) @@ -51,7 +52,6 @@ def main(): me.addGateway(bitfinexGateway) # 添加上层应用 - me.addApp(riskManager) me.addApp(algoTrading) # 创建主窗口 diff --git a/vnpy/api/bigone/vnbigone.py b/vnpy/api/bigone/vnbigone.py index 425868ce..cf82b685 100644 --- a/vnpy/api/bigone/vnbigone.py +++ b/vnpy/api/bigone/vnbigone.py @@ -95,7 +95,7 @@ class BigoneRestApi(object): if code == 200: callback(d, reqid) else: - self.onError(code, d) + self.onError(code, str(d)) except Exception as e: self.onError(type(e), e.message) diff --git a/vnpy/api/lbank/__init__.py b/vnpy/api/lbank/__init__.py index 2e68766f..080efd11 100644 --- a/vnpy/api/lbank/__init__.py +++ b/vnpy/api/lbank/__init__.py @@ -1,4 +1,4 @@ # encoding: UTF-8 from __future__ import absolute_import -from .vnlbank import LbankApi \ No newline at end of file +from .vnlbank import LbankRestApi, LbankWebsocketApi \ No newline at end of file diff --git a/vnpy/api/lbank/test.py b/vnpy/api/lbank/test.py index 463830e1..5391f15d 100644 --- a/vnpy/api/lbank/test.py +++ b/vnpy/api/lbank/test.py @@ -1,50 +1,67 @@ -# encoding: utf-8 - -from __future__ import absolute_import -from time import time, sleep +# encoding: UTF-8 from six.moves import input +from time import time -from .vnlbank import LbankApi +from vnlbank import LbankRestApi, LbankWebsocketApi + +API_KEY = '132a36ce-ad1c-409a-b48c-09b7877ae49b' +SECRET_KEY = '319320BF875297E7F4050E1195B880E8' + + +#---------------------------------------------------------------------- +def restTest(): + """""" + # 创建API对象并初始化 + api = LbankRestApi() + api.init(API_KEY, SECRET_KEY) + api.start(1) + + # 测试 + #api.addReq('GET', '/currencyPairs.do', {}, api.onData) + #api.addReq('GET', '/accuracy.do', {}, api.onData) + + #api.addReq('GET', '/ticker.do', {'symbol': 'eth_btc'}, api.onData) + #api.addReq('GET', '/depth.do', {'symbol': 'eth_btc', 'size': '5'}, api.onData) + + #api.addReq('post', '/user_info.do', {}, api.onData) + + req = { + 'symbol': 'sc_btc', + 'current_page': '1', + 'page_length': '50' + } + api.addReq('POST', '/orders_info_no_deal.do', req, api.onData) + + # 阻塞 + input() + + +#---------------------------------------------------------------------- +def wsTest(): + """""" + ws = LbankWebsocketApi() + ws.start() + + channels = [ + 'lh_sub_spot_eth_btc_depth_20', + 'lh_sub_spot_eth_btc_trades', + 'lh_sub_spot_eth_btc_ticker' + ] + + for channel in channels: + req = { + 'event': 'addChannel', + 'channel': channel + } + ws.sendReq(req) + + + # 阻塞 + input() if __name__ == '__main__': - apiKey = '' - secretKey = '' + restTest() - # 创建API对象并初始化 - api = LbankApi() - api.DEBUG = True - api.init(apiKey, secretKey, 2) - - # 查询行情 - api.getTicker('btc_cny') - - # 查询深度 - api.getDepth('btc_cny', '60', '1') - - # 查询历史成交 - #api.getTrades('btc_cny', '1', str(int(time()))) - - # 查询K线 - #t = int(time()) - #sleep(300) - #api.getKline('btc_cny', '20', 'minute1', str(t)) - - # 查询账户 - #api.getUserInfo() - - # 发送委托 - #api.createOrder('btc_cny', 'sell', '8000', '0.001') - - # 撤单 - #api.cancelOrder('btc_cny', '725bd2da-73aa-419f-8090-f68488074e8f') - - # 查询委托 - #api.getOrdersInfo('btc_cny', '725bd2da-73aa-419f-8090-f68488074e8f') - - # 查询委托历史 - #api.getOrdersInfoHistory('btc_cny', '0', '1', '100') - - # 阻塞 - input() + #wsTest() \ No newline at end of file diff --git a/vnpy/api/lbank/vnlbank.py b/vnpy/api/lbank/vnlbank.py index d0f9158a..1d04ecf6 100644 --- a/vnpy/api/lbank/vnlbank.py +++ b/vnpy/api/lbank/vnlbank.py @@ -3,306 +3,223 @@ from __future__ import print_function import urllib import hashlib +import ssl +import json +import traceback import requests from Queue import Queue, Empty from threading import Thread -from time import sleep +from multiprocessing.dummy import Pool +from time import time + +import websocket -API_ROOT ="https://api.lbank.info/v1/" - -FUNCTION_TICKER = ('ticker.do', 'get') -FUNCTION_DEPTH = ('depth.do', 'get') -FUNCTION_TRADES = ('trades.do', 'get') -FUNCTION_KLINE = ('kline.do', 'get') - -FUNCTION_USERINFO = ('user_info.do', 'post') -FUNCTION_CREATEORDER = ('create_order.do', 'post') -FUNCTION_CANCELORDER = ('cancel_order.do', 'post') -FUNCTION_ORDERSINFO = ('orders_info.do', 'post') -FUNCTION_ORDERSINFOHISTORY = ('orders_info_history.do', 'post') +REST_HOST = "https://api.lbank.info/v1" +WEBSOCKET_HOST = 'ws://api.lbank.info/ws' -#---------------------------------------------------------------------- -def signature(params, secretKey): - """生成签名""" - params = sorted(params.iteritems(), key=lambda d:d[0], reverse=False) - params.append(('secret_key', secretKey)) - message = urllib.urlencode(params) - - m = hashlib.md5() - m.update(message) - m.digest() - - sig=m.hexdigest() - return sig - ######################################################################## -class LbankApi(object): +class LbankRestApi(object): """""" - DEBUG = True - + #---------------------------------------------------------------------- def __init__(self): """Constructor""" self.apiKey = '' self.secretKey = '' - self.interval = 1 # 每次请求的间隔等待 self.active = False # API工作状态 self.reqID = 0 # 请求编号 - self.reqQueue = Queue() # 请求队列 - self.reqThread = Thread(target=self.processQueue) # 请求处理线程 - + self.queue = Queue() # 请求队列 + self.pool = None # 线程池 + self.sessionDict = {} # 连接池 + #---------------------------------------------------------------------- - def init(self, apiKey, secretKey, interval): + def init(self, apiKey, secretKey): """初始化""" self.apiKey = apiKey self.secretKey = secretKey - self.interval = interval - - self.active = True - self.reqThread.start() #---------------------------------------------------------------------- - def exit(self): + def start(self, n=10): + """""" + if self.active: + return + + self.active = True + self.pool = Pool(n) + self.pool.map_async(self.run, range(n)) + + #---------------------------------------------------------------------- + def close(self): """退出""" self.active = False - if self.reqThread.isAlive(): - self.reqThread.join() + if self.pool: + self.pool.close() + self.pool.join() #---------------------------------------------------------------------- - def processRequest(self, req): + def processReq(self, req, i): """处理请求""" # 读取方法和参数 - api, method = req['function'] - params = req['params'] - url = API_ROOT + api + method, path, params, callback, reqID = req + url = REST_HOST + path # 在参数中增加必须的字段 params['api_key'] = self.apiKey - - # 添加签名 - sign = signature(params, self.secretKey) - params['sign'] = sign + params['sign'] = self.generateSignature(params) # 发送请求 payload = urllib.urlencode(params) - - r = requests.request(method, url, params=payload) - if r.status_code == 200: - data = r.json() - return data - else: - return None + + try: + # 使用会话重用技术,请求延时降低80% + session = self.sessionDict[i] + resp = session.request(method, url, params=payload) + #resp = requests.request(method, url, params=payload) + + code = resp.status_code + d = resp.json() + + if code == 200: + callback(d, reqID) + else: + self.onError(code, str(d)) + + except Exception as e: + self.onError(type(e), e.message) #---------------------------------------------------------------------- - def processQueue(self): - """处理请求队列中的请求""" + def run(self, i): + """连续运行""" + self.sessionDict[i] = requests.Session() + while self.active: try: - req = self.reqQueue.get(block=True, timeout=1) # 获取请求的阻塞为一秒 - callback = req['callback'] - reqID = req['reqID'] - - data = self.processRequest(req) - - # 请求失败 - if data is None: - error = u'请求失败' - self.onError(error, req, reqID) - elif 'error_code' in data: - error = u'请求出错,错误代码:%s' % data['error_code'] - self.onError(error, req, reqID) - # 请求成功 - else: - if self.DEBUG: - print(callback.__name__) - callback(data, req, reqID) - - # 流控等待 - sleep(self.interval) - + req = self.queue.get(block=True, timeout=1) # 获取请求的阻塞为一秒 + self.processReq(req, i) except Empty: pass #---------------------------------------------------------------------- - def sendRequest(self, function, params, callback): + def addReq(self, method, path, params, callback): """发送请求""" # 请求编号加1 self.reqID += 1 # 生成请求字典并放入队列中 - req = {} - req['function'] = function - req['params'] = params - req['callback'] = callback - req['reqID'] = self.reqID - self.reqQueue.put(req) + req = (method, path, params, callback, self.reqID) + self.queue.put(req) # 返回请求编号 return self.reqID - + #---------------------------------------------------------------------- - def onError(self, error, req, reqID): + def generateSignature(self, params): + """生成签名""" + params = sorted(params.iteritems(), key=lambda d:d[0], reverse=False) + params.append(('secret_key', self.secretKey)) + message = urllib.urlencode(params) + + m = hashlib.md5() + m.update(message) + m.digest() + + sig = m.hexdigest() + return sig + + #---------------------------------------------------------------------- + def onError(self, code, msg): """错误推送""" - print(error, req, reqID) + print(code, msg) - ############################################### - # 行情接口 - ############################################### + #---------------------------------------------------------------------- + def onData(self, data, reqID): + """""" + print(data, reqID) + + +######################################################################## +class LbankWebsocketApi(object): + """Websocket API""" + + #---------------------------------------------------------------------- + def __init__(self): + """Constructor""" + self.ws = None + self.thread = None + self.active = False #---------------------------------------------------------------------- - def getTicker(self, symbol): - """查询行情""" - function = FUNCTION_TICKER - params = {'symbol': symbol} - callback = self.onGetTicker - return self.sendRequest(function, params, callback) - - # ---------------------------------------------------------------------- - def getDepth(self, symbol, size, merge): - """查询深度""" - function = FUNCTION_DEPTH - params = { - 'symbol': symbol, - 'size': size, - 'mege': merge - } - callback = self.onGetDepth - return self.sendRequest(function, params, callback) - - # ---------------------------------------------------------------------- - def getTrades(self, symbol, size, time): - """查询历史成交""" - function = FUNCTION_TRADES - params = { - 'symbol': symbol, - 'size': size, - 'time': time - } - callback = self.onGetTrades - return self.sendRequest(function, params, callback) - - # ---------------------------------------------------------------------- - def getKline(self, symbol, size, type_, time): - """查询K线""" - function = FUNCTION_KLINE - params = { - 'symbol': symbol, - 'size': size, - 'type': type_, - 'time': time - } - callback = self.onGetKline - return self.sendRequest(function, params, callback) - + def start(self): + """启动""" + self.ws = websocket.create_connection(WEBSOCKET_HOST, + sslopt={'cert_reqs': ssl.CERT_NONE}) + + self.active = True + self.thread = Thread(target=self.run) + self.thread.start() + + self.onConnect() + #---------------------------------------------------------------------- - def onGetTicker(self, data, req, reqID): - """查询行情回调""" - print(data, reqID) + def reconnect(self): + """重连""" + self.ws = websocket.create_connection(WEBSOCKET_HOST, + sslopt={'cert_reqs': ssl.CERT_NONE}) + + self.onConnect() + + #---------------------------------------------------------------------- + def run(self): + """运行""" + while self.active: + try: + stream = self.ws.recv() + data = json.loads(stream) + self.onData(data) + except: + msg = traceback.format_exc() + print(msg) + self.onError(msg) + self.reconnect() + + #---------------------------------------------------------------------- + def close(self): + """关闭""" + self.active = False + + if self.thread: + self.ws.shutdown() + self.thread.join() + + #---------------------------------------------------------------------- + def onConnect(self): + """连接回调""" + print('connected') + + #---------------------------------------------------------------------- + def onData(self, data): + """数据回调""" + print('-' * 30) + l = data.keys() + l.sort() + for k in l: + print(k, data[k]) + + #---------------------------------------------------------------------- + def onError(self, msg): + """错误回调""" + print(msg) + + #---------------------------------------------------------------------- + def sendReq(self, req): + """发出请求""" + self.ws.send(json.dumps(req)) - # ---------------------------------------------------------------------- - def onGetDepth(self, data, req, reqID): - """查询深度回调""" - print(data, reqID) - - # ---------------------------------------------------------------------- - def onGetTrades(self, data, req, reqID): - """查询历史成交""" - print(data, reqID) - - # ---------------------------------------------------------------------- - def onGetKline(self, data, req, reqID): - """查询K线回报""" - print(data, reqID) - - ############################################### - # 交易接口 - ############################################### - - # ---------------------------------------------------------------------- - def getUserInfo(self): - """查询账户信息""" - function = FUNCTION_USERINFO - params = {} - callback = self.onGetUserInfo - return self.sendRequest(function, params, callback) - - # ---------------------------------------------------------------------- - def createOrder(self, symbol, type_, price, amount): - """发送委托""" - function = FUNCTION_CREATEORDER - params = { - 'symbol': symbol, - 'type': type_, - 'price': price, - 'amount': amount - } - callback = self.onCreateOrder - return self.sendRequest(function, params, callback) - - # ---------------------------------------------------------------------- - def cancelOrder(self, symbol, orderId): - """撤单""" - function = FUNCTION_CANCELORDER - params = { - 'symbol': symbol, - 'order_id': orderId - } - callback = self.onCancelOrder - return self.sendRequest(function, params, callback) - - # ---------------------------------------------------------------------- - def getOrdersInfo(self, symbol, orderId): - """查询委托""" - function = FUNCTION_ORDERSINFO - params = { - 'symbol': symbol, - 'order_id': orderId - } - callback = self.onGetOrdersInfo - return self.sendRequest(function, params, callback) - - # ---------------------------------------------------------------------- - def getOrdersInfoHistory(self, symbol, status, currentPage, pageLength): - """撤单""" - function = FUNCTION_ORDERSINFOHISTORY - params = { - 'symbol': symbol, - 'status': status, - 'current_page': currentPage, - 'page_length': pageLength - } - callback = self.onGetOrdersInfoHistory - return self.sendRequest(function, params, callback) - - # ---------------------------------------------------------------------- - def onGetUserInfo(self, data, req, reqID): - """查询账户信息""" - print(data, reqID) - - # ---------------------------------------------------------------------- - def onCreateOrder(self, data, req, reqID): - """委托回报""" - print(data, reqID) - - # ---------------------------------------------------------------------- - def onCancelOrder(self, data, req, reqID): - """撤单回报""" - print(data, reqID) - - # ---------------------------------------------------------------------- - def onGetOrdersInfo(self, data, req, reqID): - """查询委托回报""" - print(data, reqID) - - # ---------------------------------------------------------------------- - def onGetOrdersInfoHistory(self, data, req, reqID): - """撤单回报""" - print(data, reqID) diff --git a/vnpy/trader/gateway/fcoinGateway/__init__.py b/vnpy/trader/gateway/fcoinGateway/__init__.py index 2c6a4ee6..bd76f422 100644 --- a/vnpy/trader/gateway/fcoinGateway/__init__.py +++ b/vnpy/trader/gateway/fcoinGateway/__init__.py @@ -1,9 +1,9 @@ # encoding: UTF-8 from vnpy.trader import vtConstant -from .fcoinGateway import FcoinGateay +from .fcoinGateway import FcoinGateway -gatewayClass = FcoinGateay +gatewayClass = FcoinGateway gatewayName = 'FCOIN' gatewayDisplayName = 'FCOIN' gatewayType = vtConstant.GATEWAYTYPE_BTC diff --git a/vnpy/trader/gateway/fcoinGateway/fcoinGateway.py b/vnpy/trader/gateway/fcoinGateway/fcoinGateway.py index 0832ab78..637b49e6 100644 --- a/vnpy/trader/gateway/fcoinGateway/fcoinGateway.py +++ b/vnpy/trader/gateway/fcoinGateway/fcoinGateway.py @@ -39,13 +39,13 @@ priceTypeMap[PRICETYPE_MARKETPRICE] = 'market' ######################################################################## -class FcoinGateay(VtGateway): +class FcoinGateway(VtGateway): """FCOIN接口""" #---------------------------------------------------------------------- def __init__(self, eventEngine, gatewayName=''): """Constructor""" - super(FcoinGateay, self).__init__(eventEngine, gatewayName) + super(FcoinGateway, self).__init__(eventEngine, gatewayName) self.restApi = RestApi(self) self.wsApi = WebsocketApi(self) diff --git a/vnpy/trader/gateway/lbankGateway/LBANK_connect.json b/vnpy/trader/gateway/lbankGateway/LBANK_connect.json index 5df1ee29..59286a19 100644 --- a/vnpy/trader/gateway/lbankGateway/LBANK_connect.json +++ b/vnpy/trader/gateway/lbankGateway/LBANK_connect.json @@ -1,6 +1,5 @@ { - "apiKey": "请在链行官网申请", - "secretKey": "请在链行官网申请", - "interval": 1, - "debug": false + "apiKey": "", + "secretKey": "", + "symbols": ["eth_usdt", "sc_btc", "btc_usdt"] } diff --git a/vnpy/trader/gateway/lbankGateway/lbankGateway.py b/vnpy/trader/gateway/lbankGateway/lbankGateway.py index 3cade80e..744eec64 100644 --- a/vnpy/trader/gateway/lbankGateway/lbankGateway.py +++ b/vnpy/trader/gateway/lbankGateway/lbankGateway.py @@ -1,60 +1,54 @@ # encoding: UTF-8 ''' -vn.lhang的gateway接入 +vnpy.api.lhang的gateway接入 ''' -from __future__ import print_function +from __future__ import print_function import os import json from datetime import datetime from time import sleep +from copy import copy -from vnpy.api.lbank import LbankApi +from vnpy.api.lbank import LbankRestApi, LbankWebsocketApi from vnpy.trader.vtGateway import * from vnpy.trader.vtFunction import getJsonPath -SYMBOL_BTCCNY = 'BTCCNY' -SYMBOL_ZECCNY = 'ZECCNY' +directionMap = {} +directionMap[DIRECTION_LONG] = 'buy' +directionMap[DIRECTION_SHORT] = 'sell' +directionMapReverse = {v:k for k,v in directionMap.items()} -SYMBOL_MAP = {} -SYMBOL_MAP['btc_cny'] = SYMBOL_BTCCNY -SYMBOL_MAP['zec_cny'] = SYMBOL_ZECCNY -SYMBOL_MAP_REVERSE = {v: k for k, v in SYMBOL_MAP.items()} - - -DIRECTION_MAP = {} -DIRECTION_MAP['buy'] = DIRECTION_LONG -DIRECTION_MAP['sell'] = DIRECTION_SHORT - -STATUS_MAP = {} -STATUS_MAP[0] = STATUS_NOTTRADED -STATUS_MAP[1] = STATUS_PARTTRADED -STATUS_MAP[2] = STATUS_ALLTRADED -STATUS_MAP[4] = STATUS_UNKNOWN -STATUS_MAP[-1] = STATUS_CANCELLED +statusMapReverse = {} +statusMapReverse[0] = STATUS_NOTTRADED +statusMapReverse[1] = STATUS_PARTTRADED +statusMapReverse[2] = STATUS_ALLTRADED +statusMapReverse[4] = STATUS_UNKNOWN +statusMapReverse[-1] = STATUS_CANCELLED ######################################################################## class LbankGateway(VtGateway): - """LBANK接口""" + """FCOIN接口""" #---------------------------------------------------------------------- - def __init__(self, eventEngine, gatewayName='LBANK'): + def __init__(self, eventEngine, gatewayName=''): """Constructor""" super(LbankGateway, self).__init__(eventEngine, gatewayName) + + self.restApi = RestApi(self) - self.api = LbankApi(self) - + self.qryEnabled = False # 是否要启动循环查询 + self.fileName = self.gatewayName + '_connect.json' - self.filePath = getJsonPath(self.fileName, __file__) - + self.filePath = getJsonPath(self.fileName, __file__) + #---------------------------------------------------------------------- def connect(self): """连接""" - # 载入json文件 try: f = file(self.filePath) except IOError: @@ -63,466 +57,562 @@ class LbankGateway(VtGateway): log.logContent = u'读取连接配置出错,请检查' self.onLog(log) return - + # 解析json文件 setting = json.load(f) try: - accessKey = str(setting['apiKey']) + apiKey = str(setting['apiKey']) secretKey = str(setting['secretKey']) - interval = setting['interval'] - debug = setting['debug'] + symbols = setting['symbols'] except KeyError: log = VtLogData() log.gatewayName = self.gatewayName log.logContent = u'连接配置缺少字段,请检查' self.onLog(log) - return - - # 初始化接口 - self.api.connect(accessKey, secretKey, interval, debug) - self.writeLog(u'接口初始化成功') - - # 启动查询 + return + + # 创建行情和交易接口对象 + self.restApi.connect(apiKey, secretKey, symbols) + + # 初始化并启动查询 self.initQuery() - self.startQuery() + + #---------------------------------------------------------------------- + def subscribe(self, subscribeReq): + """订阅行情""" + pass + + #---------------------------------------------------------------------- + def sendOrder(self, orderReq): + """发单""" + return self.restApi.sendOrder(orderReq) + + #---------------------------------------------------------------------- + def cancelOrder(self, cancelOrderReq): + """撤单""" + self.restApi.cancelOrder(cancelOrderReq) + + #---------------------------------------------------------------------- + def close(self): + """关闭""" + self.restApi.close() + #---------------------------------------------------------------------- + def initQuery(self): + """初始化连续查询""" + if self.qryEnabled: + # 需要循环的查询函数列表 + self.qryFunctionList = [self.restApi.qryAccount, + self.restApi.qryWorkingOrder, + self.restApi.qryCompletedOrder, + self.restApi.qryMarketData] + + self.qryCount = 0 # 查询触发倒计时 + self.qryTrigger = 1 # 查询触发点 + self.qryNextFunction = 0 # 上次运行的查询函数索引 + + self.startQuery() + + #---------------------------------------------------------------------- + def query(self, event): + """注册到事件处理引擎上的查询函数""" + self.qryCount += 1 + + if self.qryCount > self.qryTrigger: + # 清空倒计时 + self.qryCount = 0 + + # 执行查询函数 + function = self.qryFunctionList[self.qryNextFunction] + function() + + # 计算下次查询函数的索引,如果超过了列表长度,则重新设为0 + self.qryNextFunction += 1 + if self.qryNextFunction == len(self.qryFunctionList): + self.qryNextFunction = 0 + + #---------------------------------------------------------------------- + def startQuery(self): + """启动连续查询""" + self.eventEngine.register(EVENT_TIMER, self.query) + + #---------------------------------------------------------------------- + def setQryEnabled(self, qryEnabled): + """设置是否要启动循环查询""" + self.qryEnabled = qryEnabled + + +######################################################################## +class RestApi(LbankRestApi): + """REST API实现""" + + #---------------------------------------------------------------------- + def __init__(self, gateway): + """Constructor""" + super(RestApi, self).__init__() + + self.gateway = gateway # gateway对象 + self.gatewayName = gateway.gatewayName # gateway对象名称 + + self.localID = 0 + self.tradeID = 0 + + self.orderDict = {} # sysID:order + self.localSysDict = {} # localID:sysID + self.reqOrderDict = {} # reqID:order + self.cancelDict = {} # localID:req + + self.tickDict = {} # symbol:tick + self.reqSymbolDict = {} + + #---------------------------------------------------------------------- + def connect(self, apiKey, apiSecret, symbols): + """连接服务器""" + self.init(apiKey, apiSecret) + self.start() + + self.symbols = symbols + for symbol in symbols: + tick = VtTickData() + tick.gatewayName = self.gatewayName + tick.symbol = symbol + tick.exchange = EXCHANGE_LBANK + tick.vtSymbol = '.'.join([tick.symbol, tick.exchange]) + self.tickDict[symbol] = tick + + self.writeLog(u'REST API启动成功') + self.qryContract() + #---------------------------------------------------------------------- def writeLog(self, content): """发出日志""" log = VtLogData() log.gatewayName = self.gatewayName log.logContent = content - self.onLog(log) + self.gateway.onLog(log) - #---------------------------------------------------------------------- - def subscribe(self, subscribeReq): - """订阅行情,自动订阅全部行情,无需实现""" - pass - #---------------------------------------------------------------------- def sendOrder(self, orderReq): - """发单""" - self.api.sendOrder(orderReq) + """""" + self.localID += 1 + orderID = str(self.localID) + vtOrderID = '.'.join([self.gatewayName, orderID]) + req = { + 'symbol': orderReq.symbol, + 'type': directionMap[orderReq.direction], + 'price': orderReq.price, + 'amount': orderReq.volume + } + + reqid = self.addReq('POST', '/create_order.do', req, self.onSendOrder) + + # 缓存委托数据对象 + order = VtOrderData() + order.gatewayName = self.gatewayName + order.symbol = orderReq.symbol + order.exchange = EXCHANGE_LBANK + order.vtSymbol = '.'.join([order.symbol, order.exchange]) + order.orderID = orderID + order.vtOrderID = vtOrderID + order.price = orderReq.price + order.totalVolume = orderReq.volume + order.direction = orderReq.direction + order.status = STATUS_UNKNOWN + + self.reqOrderDict[reqid] = order + + return vtOrderID + #---------------------------------------------------------------------- def cancelOrder(self, cancelOrderReq): - """撤单""" - self.api.cancel(cancelOrderReq) + """""" + localID = cancelOrderReq.orderID + if localID in self.localSysDict: + sysID = self.localSysDict[localID] + order = self.orderDict[sysID] + req = { + 'symbol': order.symbol, + 'order_id': sysID + } + self.addReq('POST', '/cancel_order.do', req, self.onCancelOrder) + + else: + self.cancelDict[localID] = cancelOrderReq + + #---------------------------------------------------------------------- + def qryContract(self): + """""" + self.addReq('GET', '/accuracy.do', {}, self.onQryContract) + + #---------------------------------------------------------------------- + def qryCompletedOrder(self): + """""" + for symbol in self.symbols: + req = { + 'symbol': symbol, + 'current_page': '1', + 'page_length': '100' + } + self.addReq('POST', '/orders_info_history.do', req, self.onQryOrder) + + #---------------------------------------------------------------------- + def qryWorkingOrder(self): + """""" + for symbol in self.symbols: + req = { + 'symbol': symbol, + 'current_page': '1', + 'page_length': '100' + } + self.addReq('POST', '/orders_info_no_deal.do', req, self.onQryOrder) + #---------------------------------------------------------------------- def qryAccount(self): - """查询账户资金""" - pass - - #---------------------------------------------------------------------- - def qryPosition(self): - """查询持仓""" - pass - - #---------------------------------------------------------------------- - def close(self): - """关闭""" - self.api.exit() - - #---------------------------------------------------------------------- - def initQuery(self): - """初始化连续查询""" - if self.qryEnabled: - self.qryFunctionList = [self.api.queryPrice, - self.api.queryWorkingOrders, - self.api.queryAccount] - self.startQuery() + """""" + self.addReq('POST', '/user_info.do', {}, self.onQryAccount) #---------------------------------------------------------------------- - def query(self, event): - """注册到事件处理引擎上的查询函数""" - for function in self.qryFunctionList: - function() + def qryDepth(self): + """""" + for symbol in self.symbols: + req = { + 'symbol': symbol, + 'size': '5' + } + i = self.addReq('GET', '/depth.do', req, self.onQryDepth) + self.reqSymbolDict[i] = symbol + + #---------------------------------------------------------------------- + def qryTicker(self): + """""" + for symbol in self.symbols: + req = {'symbol': symbol} + i = self.addReq('GET', '/ticker.do', req, self.onQryTicker) + self.reqSymbolDict[i] = symbol + + #---------------------------------------------------------------------- + def qryMarketData(self): + """""" + self.qryDepth() + self.qryTicker() + + #---------------------------------------------------------------------- + def onSendOrder(self, data, reqid): + """""" + order = self.reqOrderDict[reqid] + localID = order.orderID + sysID = data['order_id'] + + self.localSysDict[localID] = sysID + self.orderDict[sysID] = order + + self.gateway.onOrder(order) + + # 发出等待的撤单委托 + if localID in self.cancelDict: + req = self.cancelDict[localID] + self.cancelOrder(req) + del self.cancelDict[localID] + + #---------------------------------------------------------------------- + def onCancelOrder(self, data, reqid): + """""" + pass + + #---------------------------------------------------------------------- + def onError(self, code, error): + """""" + msg = u'发生异常,错误代码:%s,错误信息:%s' %(code, error) + self.writeLog(msg) + + #---------------------------------------------------------------------- + def onQryOrder(self, data, reqid): + """""" + if 'orders' not in data: + return + + if not isinstance(data['orders'], list): + return + + data['orders'].reverse() + + for d in data['orders']: + orderUpdated = False + tradeUpdated = False + + # 获取委托对象 + sysID = d['order_id'] + if sysID in self.orderDict: + order = self.orderDict[sysID] + else: + order = VtOrderData() + order.gatewayName = self.gatewayName + order.symbol = d['symbol'] + order.exchange = EXCHANGE_LBANK + order.vtSymbol = '.'.join([order.symbol, order.exchange]) + + self.localID += 1 + localID = str(self.localID) + self.localSysDict[localID] = sysID + + order.orderID = localID + order.vtOrderID = '.'.join([order.gatewayName, order.orderID]) + + order.direction = directionMapReverse[d['type']] + order.price = float(d['price']) + order.totalVolume = float(d['amount']) + + dt = datetime.fromtimestamp(d['create_time']/1000) + order.orderTime = dt.strftime('%H:%M:%S') + + self.orderDict[sysID] = order + orderUpdated = True + + # 检查是否委托有变化 + newTradedVolume = float(d['deal_amount']) + newStatus = statusMapReverse[d['status']] + + if newTradedVolume != float(order.tradedVolume) or newStatus != order.status: + orderUpdated = True + + if newTradedVolume != float(order.tradedVolume): + tradeUpdated = True + newVolume = newTradedVolume - order.tradedVolume + + order.tradedVolume = newTradedVolume + order.status = newStatus + + # 若有更新才推送 + if orderUpdated: + self.gateway.onOrder(order) + + if tradeUpdated: + # 推送成交 + trade = VtTradeData() + trade.gatewayName = order.gatewayName + + trade.symbol = order.symbol + trade.vtSymbol = order.vtSymbol + + trade.orderID = order.orderID + trade.vtOrderID = order.vtOrderID + + self.tradeID += 1 + trade.tradeID = str(self.tradeID) + trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID]) + + trade.direction = order.direction + trade.price = order.price + trade.volume = newTradedVolume + trade.tradeTime = datetime.now().strftime('%H:%M:%S') + + self.gateway.onTrade(trade) + #---------------------------------------------------------------------- - def startQuery(self): - """启动连续查询""" - self.eventEngine.register(EVENT_TIMER, self.query) + def onQryAccount(self, data, reqid): + """""" + info = data['info'] + asset = info['asset'] + free = info['free'] + freeze = info['freeze'] + + for currency in asset.keys(): + account = VtAccountData() + account.gatewayName = self.gatewayName + + account.accountID = currency + account.vtAccountID = '.'.join([self.gatewayName, account.accountID]) + account.balance = float(asset[currency]) + account.available = float(free[currency]) + + self.gateway.onAccount(account) #---------------------------------------------------------------------- - def setQryEnabled(self, qryEnabled): - """设置是否要启动循环查询""" - self.qryEnabled = qryEnabled + def onQryContract(self, data, reqid): + """""" + for d in data: + contract = VtContractData() + contract.gatewayName = self.gatewayName + + contract.symbol = str(d['symbol']) + contract.exchange = EXCHANGE_LBANK + contract.vtSymbol = '.'.join([contract.symbol, contract.exchange]) + contract.name = contract.vtSymbol + contract.productClass = PRODUCT_SPOT + contract.priceTick = pow(10, -int(d['priceAccuracy'])) + contract.size = 1 + + self.gateway.onContract(contract) + + self.writeLog(u'合约信息查询完成') + #---------------------------------------------------------------------- + def onQryTicker(self, data, reqid): + """""" + ticker = data['ticker'] + + symbol = self.reqSymbolDict.pop(reqid) + tick = self.tickDict[symbol] + + tick.highPrice = float(ticker['high']) + tick.lowPrice = float(ticker['low']) + tick.lastPrice = float(ticker['latest']) + tick.volume = float(ticker['vol']) + + tick.datetime = datetime.fromtimestamp(int(data['timestamp']/1000)) + tick.date = tick.datetime.strftime('%Y%m%d') + tick.time = tick.datetime.strftime('%H:%M:%S') + + if tick.bidPrice1: + self.gateway.onTick(copy(tick)) + + #---------------------------------------------------------------------- + def onQryDepth(self, data, reqid): + """""" + symbol = self.reqSymbolDict.pop(reqid) + tick = self.tickDict[symbol] + + bids = data['bids'] + asks = data['asks'] + + tick.bidPrice1, tick.bidVolume1 = bids[0] + tick.bidPrice2, tick.bidVolume2 = bids[1] + tick.bidPrice3, tick.bidVolume3 = bids[2] + tick.bidPrice4, tick.bidVolume4 = bids[3] + tick.bidPrice5, tick.bidVolume5 = bids[4] + + tick.askPrice1, tick.askVolume1 = asks[0] + tick.askPrice2, tick.askVolume2 = asks[1] + tick.askPrice3, tick.askVolume3 = asks[2] + tick.askPrice4, tick.askVolume4 = asks[3] + tick.askPrice5, tick.askVolume5 = asks[4] + + if tick.lastPrice: + self.gateway.onTick(copy(tick)) + ######################################################################## -class LbankApi(LbankApi): +class WebsocketApi(LbankWebsocketApi): """""" #---------------------------------------------------------------------- def __init__(self, gateway): """Constructor""" - super(LbankApi, self).__init__() + super(WebsocketApi, self).__init__() self.gateway = gateway self.gatewayName = gateway.gatewayName - self.interval = 1 - - self.localID = 0 # 本地委托号 - self.localSystemDict = {} # key:localID, value:systemID - self.systemLocalDict = {} # key:systemID, value:localID - self.workingOrderDict = {} # key:localID, value:order - self.reqLocalDict = {} # key:reqID, value:localID - self.cancelDict = {} # key:localID, value:cancelOrderReq - - self.tradeID = 0 - - self.tickDict = {} # key:symbol, value:tick - + self.symbols = [] + self.channelTickDict = {} + #---------------------------------------------------------------------- - def onError(self, error, req, reqID): - """错误推送""" - err = VtErrorData() - err.gatewayName = self.gatewayName - err.errorMsg = str(error) - err.errorTime = datetime.now().strftime('%H:%M:%S.%f')[:-3] - self.gateway.onError(err) - + def connect(self, symbols): + """""" + self.symbols = symbols + self.start() + #---------------------------------------------------------------------- - def onGetTicker(self, data, req, reqID): - """查询行情回调""" - ticker = data['ticker'] - params = req['params'] - symbol = SYMBOL_MAP[params['symbol']] - - if symbol not in self.tickDict: + def onConnect(self): + """连接回调""" + self.writeLog(u'Websocket API连接成功') + self.subscribe() + + #---------------------------------------------------------------------- + def subscribe(self): + """""" + for symbol in self.symbols: tick = VtTickData() tick.gatewayName = self.gatewayName - tick.symbol = symbol tick.exchange = EXCHANGE_LBANK tick.vtSymbol = '.'.join([tick.symbol, tick.exchange]) - self.tickDict[symbol] = tick - else: - tick = self.tickDict[symbol] - - tick.highPrice = float(ticker['high']) - tick.lowPrice = float(ticker['low']) - tick.lastPrice = float(ticker['latest']) - tick.openPrice = tick.lastPrice - float(ticker['change']) - tick.volume = ticker['vol'] - - # ---------------------------------------------------------------------- - def onGetDepth(self, data, req, reqID): - """查询深度回调""" - params = req['params'] - symbol = SYMBOL_MAP[params['symbol']] - if symbol not in self.tickDict: - tick = VtTickData() - tick.gatewayName = self.gatewayName - - tick.symbol = symbol - tick.exchange = EXCHANGE_LBANK - tick.vtSymbol = '.'.join([tick.symbol, tick.exchange]) - self.tickDict[symbol] = tick - else: - tick = self.tickDict[symbol] - - tick.bidPrice1, tick.bidVolume1 = data['bids'][0] - tick.bidPrice2, tick.bidVolume2 = data['bids'][1] - tick.bidPrice3, tick.bidVolume3 = data['bids'][2] - tick.bidPrice4, tick.bidVolume4 = data['bids'][3] - tick.bidPrice5, tick.bidVolume5 = data['bids'][4] - - tick.askPrice1, tick.askVolume1 = data['asks'][0] - tick.askPrice2, tick.askVolume2 = data['asks'][1] - tick.askPrice3, tick.askVolume3 = data['asks'][2] - tick.askPrice4, tick.askVolume4 = data['asks'][3] - tick.askPrice5, tick.askVolume5 = data['asks'][4] - - now = datetime.now() - tick.time = now.strftime('%H:%M:%S.%f')[:-3] - tick.date = now.strftime('%Y%m%d') - - self.gateway.onTick(tick) - - # ---------------------------------------------------------------------- - def onGetTrades(self, data, req, reqID): - """查询历史成交""" - print(data, reqID) - - # ---------------------------------------------------------------------- - def onGetKline(self, data, req, reqID): - print(data, reqID) - - # ---------------------------------------------------------------------- - def onGetUserInfo(self, data, req, reqID): - """查询K线回报""" - d = data['info'] - account = VtAccountData() - account.gatewayName = self.gatewayName - account.accountID = self.gatewayName - account.vtAccountID = '.'.join([account.accountID, self.gatewayName]) - account.balance = d['asset']['net'] - self.gateway.onAccount(account) - - # 推送持仓数据 - posCny = VtPositionData() - posCny.gatewayName = self.gatewayName - posCny.symbol = 'CNY' - posCny.exchange = EXCHANGE_LBANK - posCny.vtSymbol = '.'.join([posCny.symbol, posCny.exchange]) - posCny.vtPositionName = posCny.vtSymbol - posCny.frozen = d['freeze']['cny'] - posCny.position = posCny.frozen + d['free']['cny'] - self.gateway.onPosition(posCny) - - posBtc = VtPositionData() - posBtc.gatewayName = self.gatewayName - posBtc.symbol = 'BTC' - posBtc.exchange = EXCHANGE_LBANK - posBtc.vtSymbol = '.'.join([posBtc.symbol, posBtc.exchange]) - posBtc.vtPositionName = posBtc.vtSymbol - posBtc.frozen = d['freeze']['btc'] - posBtc.position = posBtc.frozen + d['free']['btc'] - self.gateway.onPosition(posBtc) - - posZec = VtPositionData() - posZec.gatewayName = self.gatewayName - posZec.symbol = 'ZEC' - posZec.exchange = EXCHANGE_LBANK - posZec.vtSymbol = '.'.join([posZec.symbol, posZec.exchange]) - posZec.vtPositionName = posZec.vtSymbol - posZec.frozen = d['freeze']['zec'] - posZec.position = posZec.frozen + d['free']['zec'] - self.gateway.onPosition(posZec) - - # 查询历史委托 - self.queryOrders() - - # ---------------------------------------------------------------------- - def onCreateOrder(self, data, req, reqID): - """发单回调""" - localID = self.reqLocalDict[reqID] - systemID = data['id'] - self.localSystemDict[localID] = systemID - self.systemLocalDict[systemID] = localID - - # 撤单 - if localID in self.cancelDict: - req = self.cancelDict[localID] - self.cancel(req) - del self.cancelDict[localID] - - # 推送委托信息 - order = self.workingOrderDict[localID] - if data['result'] == 'success': - order.status = STATUS_NOTTRADED - self.gateway.onOrder(order) - - # ---------------------------------------------------------------------- - def onCancelOrder(self, data, req, reqID): - """撤单回调""" - if data['result'] == 'success': - systemID = req['params']['id'] - localID = self.systemLocalDict[systemID] - - order = self.workingOrderDict[localID] - order.status = STATUS_CANCELLED - - del self.workingOrderDict[localID] - self.gateway.onOrder(order) - - # ---------------------------------------------------------------------- - def onGetOrdersInfo(self, data, req, reqID): - """查询委托回报""" - if 'orders' in data: - for d in data['orders']: - systemID = d['order_id'] - localID = self.systemLocalDict[systemID] - order = self.workingOrderDict.get(localID, None) - if not order: - return - - # 记录最新成交的金额 - newTradeVolume = float(d['deal_amount']) - order.tradedVolume - if newTradeVolume: - trade = VtTradeData() - trade.gatewayName = self.gatewayName - trade.symbol = order.symbol - trade.vtSymbol = order.vtSymbol - - self.tradeID += 1 - trade.tradeID = str(self.tradeID) - trade.vtTradeID = '.'.join([trade.tradeID, trade.gatewayName]) - - trade.volume = newTradeVolume - trade.price = d['avg_price'] - trade.direction = order.direction - trade.offset = order.offset - trade.exchange = order.exchange - trade.tradeTime = datetime.now().strftime('%H:%M:%S.%f')[:-3] - - self.gateway.onTrade(trade) - - # 更新委托状态 - order.tradedVolume = float(d['deal_amount']) - order.status = STATUS_MAP.get(d['status'], STATUS_UNKNOWN) - - if newTradeVolume: - self.gateway.onOrder(order) - - if order.status == STATUS_ALLTRADED or order.status == STATUS_CANCELLED: - del self.workingOrderDict[order.orderID] - - # ---------------------------------------------------------------------- - def onGetOrdersInfoHistory(self, data, req, reqID): - """撤单回报""" - if 'orders' in data: - for d in data['orders']: - order = VtOrderData() - order.gatewayName = self.gatewayName - - order.symbol = SYMBOL_MAP[data['symbol']] - order.exchange = EXCHANGE_LBANK - order.vtSymbol = '.'.join([order.symbol, order.exchange]) - - systemID = d['order_id'] - self.localID += 1 - localID = str(self.localID) - self.systemLocalDict[systemID] = localID - self.localSystemDict[localID] = systemID - order.orderID = localID - order.vtOrderID = '.'.join([order.orderID, order.gatewayName]) - - order.totalVolume = float(d['amount']) - order.tradedVolume = float(d['deal_amount']) - order.price = float(d['price']) - order.direction = DIRECTION_MAP[d['type']] - order.offset = OFFSET_NONE - order.orderTime = datetime.fromtimestamp(d['create_time'], '%H:%M:%S') - - # 委托状态 - if order.tradedVolume == 0: - order.status = STATUS_NOTTRADED - else: - order.status = STATUS_PARTTRADED - - # 缓存病推送 - self.workingOrderDict[localID] = order - self.gateway.onOrder(order) - - #---------------------------------------------------------------------- - def connect(self, apiKey, secretKey, interval, debug): - """初始化""" - self.interval = interval - self.DEBUG = debug - - self.init(apiKey, secretKey, self.interval) - - # 推送合约信息 - contract = VtContractData() - contract.gatewayName = self.gatewayName - contract.symbol = SYMBOL_BTCCNY - contract.exchange = EXCHANGE_LBANK - contract.vtSymbol = '.'.join([contract.symbol, contract.exchange]) - contract.name = u'人民币现货BTC' - contract.size = 1 - contract.priceTick = 0.01 - contract.productClass = PRODUCT_SPOT - self.gateway.onContract(contract) - - contract = VtContractData() - contract.gatewayName = self.gatewayName - contract.symbol = SYMBOL_ZECCNY - contract.exchange = EXCHANGE_LBANK - contract.vtSymbol = '.'.join([contract.symbol, contract.exchange]) - contract.name = u'人民币现货ZEC' - contract.size = 1 - contract.priceTick = 0.01 - contract.productClass = PRODUCT_SPOT - self.gateway.onContract(contract) - - #---------------------------------------------------------------------- - def sendOrder(self, req): - """发单""" - # 检查是否填入了价格,禁止市价委托 - if req.priceType != PRICETYPE_LIMITPRICE: - err = VtErrorData() - err.gatewayName = self.gatewayName - err.errorMsg = u'LBANK接口仅支持限价单' - err.errorTime = datetime.now().strftime('%H:%M:%S.%f')[:-3] - self.gateway.onError(err) - return None - - # 发送限价委托 - s = SYMBOL_MAP_REVERSE[req.symbol] - - if req.direction == DIRECTION_LONG: - type_ = 'buy' - else: - type_ = 'sell' - - reqID = self.createOrder(s, type_, req.price, req.volume) - - self.localID += 1 - localID = str(self.localID) - self.reqLocalDict[reqID] = localID - - # 推送委托信息 - order = VtOrderData() - order.gatewayName = self.gatewayName - - order.symbol = req.symbol - order.exchange = EXCHANGE_LBANK - order.vtSymbol = '.'.join([order.symbol, order.exchange]) - - order.orderID = localID - order.vtOrderID = '.'.join([order.orderID, order.gatewayName]) - - order.direction = req.direction - order.offset = OFFSET_UNKNOWN - order.price = req.price - order.volume = req.volume - order.orderTime = datetime.now().strftime('%H:%M:%S.%f')[:-3] - order.status = STATUS_UNKNOWN - - self.workingOrderDict[localID] = order - self.gateway.onOrder(order) - - # 返回委托号 - return order.vtOrderID + + channelDepth = 'lh_sub_spot_%s_depth_20' %symbol + #channelTrades = 'lh_sub_spot_%s_ticker' %symbol + self.channelTickDict[channelDepth] = tick + # self.channelTickDict[channelTrades] = tick + + for channel in [channelDepth]: #, channelTrades]: + req = { + 'event': 'addChannel', + 'channel': channel + } + self.sendReq(req) #---------------------------------------------------------------------- - def cancel(self, req): - """撤单""" - localID = req.orderID - if localID in self.localSystemDict: - systemID = self.localSystemDict[localID] - s = SYMBOL_MAP_REVERSE[req.symbol] - self.cancelOrder(s, systemID) - else: - self.cancelDict[localID] = req + def onData(self, data): + """数据回调""" + channel = data['channel'] + print(data) + if 'trades' in channel: + self.onTick(data) + elif 'depth' in channel: + self.onDepth(data) + + #---------------------------------------------------------------------- + def onError(self, msg): + """错误回调""" + self.writeLog(msg) + + #---------------------------------------------------------------------- + def writeLog(self, content): + """发出日志""" + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = content + self.gateway.onLog(log) + + #---------------------------------------------------------------------- + def onTick(self, data): + """""" + if 'data' not in data: + return + + channel = data['channel'] + tick = self.channelTickDict[channel] + + fill = data['data'][0] + tick.lastPrice = float(fill[0]) + tick.datetime = datetime.fromtimestamp(int(fill[2])/1000) + tick.date = tick.datetime.strftime('%Y%m%d') + tick.time = tick.datetime.strftime('%H:%M:%S') + + self.gateway.onTick(copy(tick)) #---------------------------------------------------------------------- - def queryOrders(self): - """查询委托""" - for s in SYMBOL_MAP.keys(): - self.getOrdersInfoHistory(s, '0', '1', '200') + def onDepth(self, data): + """""" + if 'data' not in data: + return + + channel = data['channel'] + tick = self.channelTickDict[channel] - #---------------------------------------------------------------------- - def queryWorkingOrders(self): - """查询活动委托""" - for localID, order in self.workingOrderDict.items(): - if localID in self.localSystemDict: - systemID = self.localSystemDict[localID] - s = SYMBOL_MAP_REVERSE[order.symbol] - self.getOrdersInfo(s, systemID) + d = data['data'] + bids = d['bids'] + asks = d['asks'] + + tick.bidPrice1, tick.bidVolume1 = bids[0] + tick.bidPrice2, tick.bidVolume2 = bids[1] + tick.bidPrice3, tick.bidVolume3 = bids[2] + tick.bidPrice4, tick.bidVolume4 = bids[3] + tick.bidPrice5, tick.bidVolume5 = bids[4] + + tick.askPrice1, tick.askVolume1 = asks[0] + tick.askPrice2, tick.askVolume2 = asks[1] + tick.askPrice3, tick.askVolume3 = asks[2] + tick.askPrice4, tick.askVolume4 = asks[3] + tick.askPrice5, tick.askVolume5 = asks[4] + + tick.lastPrice = (tick.askPrice1 + tick.askPrice2) / 2 + + tick.datetime = datetime.fromtimestamp(d['timestamp']/1000) + tick.date = tick.datetime.strftime('%Y%m%d') + tick.time = tick.datetime.strftime('%H:%M:%S') + + self.gateway.onTick(copy(tick)) - #---------------------------------------------------------------------- - def queryPrice(self): - """查询行情""" - for s in SYMBOL_MAP.keys(): - self.getTicker(s) - self.getDepth(s, 5, 0) - - #---------------------------------------------------------------------- - def queryAccount(self): - """查询资金和资产""" - self.getUserInfo() \ No newline at end of file