[Add]重新实现Lbank接口

This commit is contained in:
vn.py 2018-07-05 14:03:04 +08:00
parent 04065369e3
commit 5d046d360c
10 changed files with 758 additions and 730 deletions

View File

@ -0,0 +1,5 @@
{
"apiKey": "",
"secretKey": "",
"symbols": ["eth_usdt", "sc_btc", "btc_usdt"]
}

View File

@ -23,10 +23,10 @@ from vnpy.trader.uiMainWindow import MainWindow
from vnpy.trader.gateway import (huobiGateway, okexGateway,
binanceGateway, bitfinexGateway,
bitmexGateway, fcoinGateway,
bigoneGateway)
bigoneGateway, lbankGateway)
# 加载上层应用
from vnpy.trader.app import (riskManager, algoTrading)
from vnpy.trader.app import (algoTrading)
#----------------------------------------------------------------------
@ -42,6 +42,7 @@ def main():
me = MainEngine(ee)
# 添加交易接口
me.addGateway(lbankGateway)
me.addGateway(bigoneGateway)
me.addGateway(fcoinGateway)
me.addGateway(bitmexGateway)
@ -51,7 +52,6 @@ def main():
me.addGateway(bitfinexGateway)
# 添加上层应用
me.addApp(riskManager)
me.addApp(algoTrading)
# 创建主窗口

View File

@ -95,7 +95,7 @@ class BigoneRestApi(object):
if code == 200:
callback(d, reqid)
else:
self.onError(code, d)
self.onError(code, str(d))
except Exception as e:
self.onError(type(e), e.message)

View File

@ -1,4 +1,4 @@
# encoding: UTF-8
from __future__ import absolute_import
from .vnlbank import LbankApi
from .vnlbank import LbankRestApi, LbankWebsocketApi

View File

@ -1,50 +1,67 @@
# encoding: utf-8
from __future__ import absolute_import
from time import time, sleep
# encoding: UTF-8
from six.moves import input
from time import time
from .vnlbank import LbankApi
from vnlbank import LbankRestApi, LbankWebsocketApi
API_KEY = '132a36ce-ad1c-409a-b48c-09b7877ae49b'
SECRET_KEY = '319320BF875297E7F4050E1195B880E8'
if __name__ == '__main__':
apiKey = ''
secretKey = ''
#----------------------------------------------------------------------
def restTest():
""""""
# 创建API对象并初始化
api = LbankApi()
api.DEBUG = True
api.init(apiKey, secretKey, 2)
api = LbankRestApi()
api.init(API_KEY, SECRET_KEY)
api.start(1)
# 查询行情
api.getTicker('btc_cny')
# 测试
#api.addReq('GET', '/currencyPairs.do', {}, api.onData)
#api.addReq('GET', '/accuracy.do', {}, api.onData)
# 查询深度
api.getDepth('btc_cny', '60', '1')
#api.addReq('GET', '/ticker.do', {'symbol': 'eth_btc'}, api.onData)
#api.addReq('GET', '/depth.do', {'symbol': 'eth_btc', 'size': '5'}, api.onData)
# 查询历史成交
#api.getTrades('btc_cny', '1', str(int(time())))
#api.addReq('post', '/user_info.do', {}, api.onData)
# 查询K线
#t = int(time())
#sleep(300)
#api.getKline('btc_cny', '20', 'minute1', str(t))
# 查询账户
#api.getUserInfo()
# 发送委托
#api.createOrder('btc_cny', 'sell', '8000', '0.001')
# 撤单
#api.cancelOrder('btc_cny', '725bd2da-73aa-419f-8090-f68488074e8f')
# 查询委托
#api.getOrdersInfo('btc_cny', '725bd2da-73aa-419f-8090-f68488074e8f')
# 查询委托历史
#api.getOrdersInfoHistory('btc_cny', '0', '1', '100')
req = {
'symbol': 'sc_btc',
'current_page': '1',
'page_length': '50'
}
api.addReq('POST', '/orders_info_no_deal.do', req, api.onData)
# 阻塞
input()
#----------------------------------------------------------------------
def wsTest():
""""""
ws = LbankWebsocketApi()
ws.start()
channels = [
'lh_sub_spot_eth_btc_depth_20',
'lh_sub_spot_eth_btc_trades',
'lh_sub_spot_eth_btc_ticker'
]
for channel in channels:
req = {
'event': 'addChannel',
'channel': channel
}
ws.sendReq(req)
# 阻塞
input()
if __name__ == '__main__':
restTest()
#wsTest()

View File

@ -3,47 +3,28 @@
from __future__ import print_function
import urllib
import hashlib
import ssl
import json
import traceback
import requests
from Queue import Queue, Empty
from threading import Thread
from time import sleep
from multiprocessing.dummy import Pool
from time import time
import websocket
API_ROOT ="https://api.lbank.info/v1/"
REST_HOST = "https://api.lbank.info/v1"
WEBSOCKET_HOST = 'ws://api.lbank.info/ws'
FUNCTION_TICKER = ('ticker.do', 'get')
FUNCTION_DEPTH = ('depth.do', 'get')
FUNCTION_TRADES = ('trades.do', 'get')
FUNCTION_KLINE = ('kline.do', 'get')
FUNCTION_USERINFO = ('user_info.do', 'post')
FUNCTION_CREATEORDER = ('create_order.do', 'post')
FUNCTION_CANCELORDER = ('cancel_order.do', 'post')
FUNCTION_ORDERSINFO = ('orders_info.do', 'post')
FUNCTION_ORDERSINFOHISTORY = ('orders_info_history.do', 'post')
#----------------------------------------------------------------------
def signature(params, secretKey):
"""生成签名"""
params = sorted(params.iteritems(), key=lambda d:d[0], reverse=False)
params.append(('secret_key', secretKey))
message = urllib.urlencode(params)
m = hashlib.md5()
m.update(message)
m.digest()
sig=m.hexdigest()
return sig
########################################################################
class LbankApi(object):
class LbankRestApi(object):
""""""
DEBUG = True
#----------------------------------------------------------------------
def __init__(self):
@ -51,258 +32,194 @@ class LbankApi(object):
self.apiKey = ''
self.secretKey = ''
self.interval = 1 # 每次请求的间隔等待
self.active = False # API工作状态
self.reqID = 0 # 请求编号
self.reqQueue = Queue() # 请求队列
self.reqThread = Thread(target=self.processQueue) # 请求处理线程
self.queue = Queue() # 请求队列
self.pool = None # 线程池
self.sessionDict = {} # 连接池
#----------------------------------------------------------------------
def init(self, apiKey, secretKey, interval):
def init(self, apiKey, secretKey):
"""初始化"""
self.apiKey = apiKey
self.secretKey = secretKey
self.interval = interval
self.active = True
self.reqThread.start()
#----------------------------------------------------------------------
def exit(self):
def start(self, n=10):
""""""
if self.active:
return
self.active = True
self.pool = Pool(n)
self.pool.map_async(self.run, range(n))
#----------------------------------------------------------------------
def close(self):
"""退出"""
self.active = False
if self.reqThread.isAlive():
self.reqThread.join()
if self.pool:
self.pool.close()
self.pool.join()
#----------------------------------------------------------------------
def processRequest(self, req):
def processReq(self, req, i):
"""处理请求"""
# 读取方法和参数
api, method = req['function']
params = req['params']
url = API_ROOT + api
method, path, params, callback, reqID = req
url = REST_HOST + path
# 在参数中增加必须的字段
params['api_key'] = self.apiKey
# 添加签名
sign = signature(params, self.secretKey)
params['sign'] = sign
params['sign'] = self.generateSignature(params)
# 发送请求
payload = urllib.urlencode(params)
r = requests.request(method, url, params=payload)
if r.status_code == 200:
data = r.json()
return data
else:
return None
try:
# 使用会话重用技术请求延时降低80%
session = self.sessionDict[i]
resp = session.request(method, url, params=payload)
#resp = requests.request(method, url, params=payload)
code = resp.status_code
d = resp.json()
if code == 200:
callback(d, reqID)
else:
self.onError(code, str(d))
except Exception as e:
self.onError(type(e), e.message)
#----------------------------------------------------------------------
def processQueue(self):
"""处理请求队列中的请求"""
def run(self, i):
"""连续运行"""
self.sessionDict[i] = requests.Session()
while self.active:
try:
req = self.reqQueue.get(block=True, timeout=1) # 获取请求的阻塞为一秒
callback = req['callback']
reqID = req['reqID']
data = self.processRequest(req)
# 请求失败
if data is None:
error = u'请求失败'
self.onError(error, req, reqID)
elif 'error_code' in data:
error = u'请求出错,错误代码:%s' % data['error_code']
self.onError(error, req, reqID)
# 请求成功
else:
if self.DEBUG:
print(callback.__name__)
callback(data, req, reqID)
# 流控等待
sleep(self.interval)
req = self.queue.get(block=True, timeout=1) # 获取请求的阻塞为一秒
self.processReq(req, i)
except Empty:
pass
#----------------------------------------------------------------------
def sendRequest(self, function, params, callback):
def addReq(self, method, path, params, callback):
"""发送请求"""
# 请求编号加1
self.reqID += 1
# 生成请求字典并放入队列中
req = {}
req['function'] = function
req['params'] = params
req['callback'] = callback
req['reqID'] = self.reqID
self.reqQueue.put(req)
req = (method, path, params, callback, self.reqID)
self.queue.put(req)
# 返回请求编号
return self.reqID
#----------------------------------------------------------------------
def onError(self, error, req, reqID):
def generateSignature(self, params):
"""生成签名"""
params = sorted(params.iteritems(), key=lambda d:d[0], reverse=False)
params.append(('secret_key', self.secretKey))
message = urllib.urlencode(params)
m = hashlib.md5()
m.update(message)
m.digest()
sig = m.hexdigest()
return sig
#----------------------------------------------------------------------
def onError(self, code, msg):
"""错误推送"""
print(error, req, reqID)
###############################################
# 行情接口
###############################################
print(code, msg)
#----------------------------------------------------------------------
def getTicker(self, symbol):
"""查询行情"""
function = FUNCTION_TICKER
params = {'symbol': symbol}
callback = self.onGetTicker
return self.sendRequest(function, params, callback)
def onData(self, data, reqID):
""""""
print(data, reqID)
# ----------------------------------------------------------------------
def getDepth(self, symbol, size, merge):
"""查询深度"""
function = FUNCTION_DEPTH
params = {
'symbol': symbol,
'size': size,
'mege': merge
}
callback = self.onGetDepth
return self.sendRequest(function, params, callback)
# ----------------------------------------------------------------------
def getTrades(self, symbol, size, time):
"""查询历史成交"""
function = FUNCTION_TRADES
params = {
'symbol': symbol,
'size': size,
'time': time
}
callback = self.onGetTrades
return self.sendRequest(function, params, callback)
# ----------------------------------------------------------------------
def getKline(self, symbol, size, type_, time):
"""查询K线"""
function = FUNCTION_KLINE
params = {
'symbol': symbol,
'size': size,
'type': type_,
'time': time
}
callback = self.onGetKline
return self.sendRequest(function, params, callback)
########################################################################
class LbankWebsocketApi(object):
"""Websocket API"""
#----------------------------------------------------------------------
def onGetTicker(self, data, req, reqID):
"""查询行情回调"""
print(data, reqID)
def __init__(self):
"""Constructor"""
self.ws = None
self.thread = None
self.active = False
# ----------------------------------------------------------------------
def onGetDepth(self, data, req, reqID):
"""查询深度回调"""
print(data, reqID)
#----------------------------------------------------------------------
def start(self):
"""启动"""
self.ws = websocket.create_connection(WEBSOCKET_HOST,
sslopt={'cert_reqs': ssl.CERT_NONE})
# ----------------------------------------------------------------------
def onGetTrades(self, data, req, reqID):
"""查询历史成交"""
print(data, reqID)
self.active = True
self.thread = Thread(target=self.run)
self.thread.start()
# ----------------------------------------------------------------------
def onGetKline(self, data, req, reqID):
"""查询K线回报"""
print(data, reqID)
self.onConnect()
###############################################
# 交易接口
###############################################
#----------------------------------------------------------------------
def reconnect(self):
"""重连"""
self.ws = websocket.create_connection(WEBSOCKET_HOST,
sslopt={'cert_reqs': ssl.CERT_NONE})
# ----------------------------------------------------------------------
def getUserInfo(self):
"""查询账户信息"""
function = FUNCTION_USERINFO
params = {}
callback = self.onGetUserInfo
return self.sendRequest(function, params, callback)
self.onConnect()
# ----------------------------------------------------------------------
def createOrder(self, symbol, type_, price, amount):
"""发送委托"""
function = FUNCTION_CREATEORDER
params = {
'symbol': symbol,
'type': type_,
'price': price,
'amount': amount
}
callback = self.onCreateOrder
return self.sendRequest(function, params, callback)
#----------------------------------------------------------------------
def run(self):
"""运行"""
while self.active:
try:
stream = self.ws.recv()
data = json.loads(stream)
self.onData(data)
except:
msg = traceback.format_exc()
print(msg)
self.onError(msg)
self.reconnect()
# ----------------------------------------------------------------------
def cancelOrder(self, symbol, orderId):
"""撤单"""
function = FUNCTION_CANCELORDER
params = {
'symbol': symbol,
'order_id': orderId
}
callback = self.onCancelOrder
return self.sendRequest(function, params, callback)
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.active = False
# ----------------------------------------------------------------------
def getOrdersInfo(self, symbol, orderId):
"""查询委托"""
function = FUNCTION_ORDERSINFO
params = {
'symbol': symbol,
'order_id': orderId
}
callback = self.onGetOrdersInfo
return self.sendRequest(function, params, callback)
if self.thread:
self.ws.shutdown()
self.thread.join()
# ----------------------------------------------------------------------
def getOrdersInfoHistory(self, symbol, status, currentPage, pageLength):
"""撤单"""
function = FUNCTION_ORDERSINFOHISTORY
params = {
'symbol': symbol,
'status': status,
'current_page': currentPage,
'page_length': pageLength
}
callback = self.onGetOrdersInfoHistory
return self.sendRequest(function, params, callback)
#----------------------------------------------------------------------
def onConnect(self):
"""连接回调"""
print('connected')
# ----------------------------------------------------------------------
def onGetUserInfo(self, data, req, reqID):
"""查询账户信息"""
print(data, reqID)
#----------------------------------------------------------------------
def onData(self, data):
"""数据回调"""
print('-' * 30)
l = data.keys()
l.sort()
for k in l:
print(k, data[k])
# ----------------------------------------------------------------------
def onCreateOrder(self, data, req, reqID):
"""委托回报"""
print(data, reqID)
#----------------------------------------------------------------------
def onError(self, msg):
"""错误回调"""
print(msg)
# ----------------------------------------------------------------------
def onCancelOrder(self, data, req, reqID):
"""撤单回报"""
print(data, reqID)
#----------------------------------------------------------------------
def sendReq(self, req):
"""发出请求"""
self.ws.send(json.dumps(req))
# ----------------------------------------------------------------------
def onGetOrdersInfo(self, data, req, reqID):
"""查询委托回报"""
print(data, reqID)
# ----------------------------------------------------------------------
def onGetOrdersInfoHistory(self, data, req, reqID):
"""撤单回报"""
print(data, reqID)

View File

@ -1,9 +1,9 @@
# encoding: UTF-8
from vnpy.trader import vtConstant
from .fcoinGateway import FcoinGateay
from .fcoinGateway import FcoinGateway
gatewayClass = FcoinGateay
gatewayClass = FcoinGateway
gatewayName = 'FCOIN'
gatewayDisplayName = 'FCOIN'
gatewayType = vtConstant.GATEWAYTYPE_BTC

View File

@ -39,13 +39,13 @@ priceTypeMap[PRICETYPE_MARKETPRICE] = 'market'
########################################################################
class FcoinGateay(VtGateway):
class FcoinGateway(VtGateway):
"""FCOIN接口"""
#----------------------------------------------------------------------
def __init__(self, eventEngine, gatewayName=''):
"""Constructor"""
super(FcoinGateay, self).__init__(eventEngine, gatewayName)
super(FcoinGateway, self).__init__(eventEngine, gatewayName)
self.restApi = RestApi(self)
self.wsApi = WebsocketApi(self)

View File

@ -1,6 +1,5 @@
{
"apiKey": "请在链行官网申请",
"secretKey": "请在链行官网申请",
"interval": 1,
"debug": false
"apiKey": "",
"secretKey": "",
"symbols": ["eth_usdt", "sc_btc", "btc_usdt"]
}

File diff suppressed because it is too large Load Diff