add zaif, coincheck, okex , zb api interface. And add okex,coincheckk gateway

This commit is contained in:
ipqhjjybj 2017-12-10 20:52:43 +08:00
parent 3c0309993d
commit b8c0d436d6
29 changed files with 3961 additions and 0 deletions

View File

@ -0,0 +1,24 @@
# vn.coincheck
### 简介
coincheck 的比特币交易接口基于Rest API开发实现了官方提供API的全部功能。
### 特点
1. 面向对象的API设计接近CTP API的结构对于国内用户而言更容易上手
2. 参考CTP API的设计主动函数调用的结果通过异步回调函数的方式推送到程序中适用于开发稳定可靠的实盘交易程序
链接:
https://coincheck.com/cn/documents/exchange/api
https://coincheck.com/cn/documents/exchange/api#public
https://coincheck.com/cn/documents/exchange/api#private
https://github.com/kmn/coincheck

View File

@ -0,0 +1,3 @@
# encoding: UTF-8
from vncoincheck import TradeApi, DataApi

View File

@ -0,0 +1,34 @@
<!DOCTYPE html>
<html>
<head>
<meta name="viewport" content="width=device-width" />
<title>WebSocket 客户端</title>
</head>
<body>
<script type="text/javascript">
var socket;
socket = new WebSocket("wss://ws-api.coincheck.com/");
socket.onopen = function() {
alert("Socket open");
socket.send(JSON.stringify({type: "subscribe", channel: "btc_jpy-orderbook"}));
//socket.send("这是来自客户端的消息" + location.href + new Date());
};
socket.onmessage = function(e){
var data = e.data;
//处理数据
alert(data);
};
</script>
</body>
</html>

View File

@ -0,0 +1,65 @@
# encoding: utf-8
from vncoincheck import *
def testTrade():
"""测试交易"""
accessKey = '你的accessKey'
secretKey = '你的secretKey'
# 创建API对象并初始化
api = TradeApi()
api.DEBUG = True
api.init(accessKey, secretKey)
# 查询账户,测试通过
#api.get_info()
# api.get_info()
#api.get_balance()
# api.get_balance()
#api.buy_btc_jpy(rate = 200 , amount = 0.005)
#api.cancel_orders("439397799")
for i in range(10):
api.buy_btc_jpy(rate = 200 , amount = 0.005)
#api.sell_btc_jpy(rate = 200000 , amount = 0.005)
#orders = api.list_orders()
#sleep(3)
# for d in orders:
# api.cancel_orders( d["id"])
#sleep(0.3)
# 查询委托,测试通过
#api.active_orders( currency_pair = SYMBOL_BTCJPY )
# 阻塞
input()
def testData():
"""测试行情接口"""
api = DataApi()
api.init(0.5 , 1)
# 订阅成交推送,测试通过
# api.subscribeTick(SYMBOL_BTCJPY)
# 订阅成交记录
# api.subscribeTrades(SYMBOL_BTCJPY)
# # 订阅十档行情
api.subscribeOrderbooks(SYMBOL_BTCJPY)
input()
if __name__ == '__main__':
testTrade()
#testTrade()
#testData()

View File

@ -0,0 +1,6 @@
# encoding: utf-8
from coincheck import order,market,account
ok = order.Order(access_key="你的accessKey", secret_key="你的secretKey")
print ok.buy_btc_jpy(amount=0.01,rate=200)

View File

@ -0,0 +1,31 @@
# encoding: utf-8
from vncoincheck import *
import socket
import json
import websocket
from websocket import create_connection
ws = None
def open():
global ws
print "open"
ws.send( json.dumps({"type": "subscribe", "channel": "btc_jpy-trades"}))
def testWebsocket():
global ws
while 1:
ws = create_connection("wss://ws-api.coincheck.com/", on_open=open)
if ws.connected:
print ws.recv()
sleep(5)
# 阻塞
input()
if __name__ == '__main__':
testWebsocket()

View File

@ -0,0 +1,18 @@
# encoding: utf-8
from vncoincheck import *
def test():
api = DataApiSocket()
api.connect("wss://ws-api.coincheck.com")
sleep(2)
api.sendOrderbookRequest()
api.sendTradesRequest()
raw_input()
if __name__ == '__main__':
test()

View File

@ -0,0 +1,461 @@
# encoding: utf-8
import urllib
import hashlib
import json
import requests
import hmac
import time
from datetime import datetime
from time import time, sleep , mktime
from Queue import Queue, Empty
from threading import Thread
import urllib
import websocket
import inspect
import requests
import cerberus
CURRENCY_JPY = "jpy"
SYMBOL_BTCJPY = 'btc_jpy'
FUNCTIONCODE_GET_INFO_COINCHECK = 'get_info'
FUNCTIONCODE_GET_BALANCE_COINCHECK = 'get_balance'
FUNCTIONCODE_LIST_ORDER_COINCHECK = 'list_order'
FUNCTIONCODE_BUY_ORDER_COINCHECK = 'buy'
FUNCTIONCODE_SELL_ORDER_COINCHECK = 'sell'
FUNCTIONCODE_CANCEL_ORDER_COINCHECK = 'cancel_order'
FUNCTIONCODE_HISTORY_ORDER_COINCHECK = 'history'
class TradeApi(object):
API_account = "https://coincheck.com/api/accounts"
API_balance = "https://coincheck.com/api/accounts/balance"
API_trade = "https://coincheck.com/api/exchange/orders"
API_list_order = "https://coincheck.com/api/exchange/orders/opens"
API_history_order = "https://coincheck.com/api/exchange/orders/transactions"
API_cancel_order = "https://coincheck.com/api/exchange/orders/%s"
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.accessKey = ''
self.secretKey = ''
self.active = False # API工作状态
self.reqID = 0 # 请求编号
#self.reqQueue = Queue() # 请求队列
self.reqQueue = [] # 请求的队列
self.reqThread = Thread(target=self.processQueue) # 请求处理线程
self.nonce = int(mktime(datetime.now().timetuple())) * 1000000000
def make_header(self , url , body = ""):
''' create request header function
:param url: URL for the new :class:`Request` object.
'''
self.nonce += 1
nonce = str(self.nonce)
message = nonce + url + body
signature = hmac.new(self.secretKey.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).hexdigest()
headers = {
'ACCESS-KEY' : self.accessKey,
'ACCESS-NONCE' : nonce,
'ACCESS-SIGNATURE': signature
}
return headers
#----------------------------------------------------------------------
def processRequest(self, req):
"""处理请求"""
# 读取方法和参数
url = req['url']
method = req['method']
r = None
headers = self.make_header(url)
if method in [FUNCTIONCODE_GET_INFO_COINCHECK , FUNCTIONCODE_GET_BALANCE_COINCHECK , FUNCTIONCODE_LIST_ORDER_COINCHECK , FUNCTIONCODE_HISTORY_ORDER_COINCHECK]:
r = requests.get(url , headers=headers)
elif method in [FUNCTIONCODE_CANCEL_ORDER_COINCHECK]:
r = requests.delete(url , headers=headers)
elif method in [FUNCTIONCODE_BUY_ORDER_COINCHECK , FUNCTIONCODE_SELL_ORDER_COINCHECK]:
kwargs = req["kwargs"]
payload = { 'rate': kwargs['rate'],
'amount': kwargs['amount'],
'order_type': method,
'pair': SYMBOL_BTCJPY}
body = 'rate={rate}&amount={amount}&order_type={order_type}&pair={pair}'.format(**payload)
headers = self.make_header(url , body)
r = requests.post(url,headers=headers,data=body)
# print r
# from coincheck import order,market,account
# ok = order.Order(access_key=self.accessKey, secret_key=self.secretKey)
# print "???"
# return ok.buy_btc_jpy(amount=0.01,rate=200)
#return self.create2( rate = 200 , amount = 0.01 , order_type = 'buy' , pair = 'btc_jpy')
#self.create2( rate = 200 , amount = 0.01 , order_type = 'buy' , pair = 'btc_jpy')
#r = self.create( rate = 200 , amount = 0.01 , order_type = 'buy' , pair = 'btc_jpy')
# print "!!!!!!!!!!!!!!" ,method
# if method in [FUNCTIONCODE_BUY_ORDER_COINCHECK , FUNCTIONCODE_SELL_ORDER_COINCHECK]:
# print "what?" , method , r.status_code
# elif method in [FUNCTIONCODE_LIST_ORDER_COINCHECK]:
# print "list?" , method , r.status_code
if r != None and r.status_code == 200:
data = r.json()
if data['success'] == 0:
print "error in coincheck %s" % method
return data
else:
return data
else:
try:
data = json.loads(r.text)
return data
except Exception,ex:
return None
#----------------------------------------------------------------------
def processQueue(self):
"""处理请求队列中的请求"""
while self.active:
try:
#print "processQueue"
#req = self.reqQueue.get(block=True, timeout=0.001) # 获取请求的阻塞为一秒
# out_arr = [s[0] for s in self.reqQueue]
# print out_arr
(Type , req) = self.reqQueue[0]
callback = req['callback']
reqID = req['reqID']
data = self.processRequest(req)
# 请求成功
if data != None :
if self.DEBUG:
print callback.__name__
callback(data, req, reqID)
self.reqQueue.pop(0)
sleep(0.1)
except Exception,ex:
pass
#----------------------------------------------------------------------
def sendRequest(self, url , method, callback, kwargs = None,optional=None):
"""发送请求"""
# 请求编号加1
self.reqID += 1
# 生成请求字典并放入队列中
req = {}
req['url'] = url
req['method'] = method
req['callback'] = callback
req['optional'] = optional
req['kwargs'] = kwargs
req['reqID'] = self.reqID
#if method in [FUNCTIONCODE_LIST_ORDER_COINCHECK , FUNCTIONCODE_GET_BALANCE_COINCHECK , FUNCTIONCODE_BUY_ORDER_COINCHECK , FUNCTIONCODE_SELL_ORDER_COINCHECK]:
if method in [FUNCTIONCODE_LIST_ORDER_COINCHECK , FUNCTIONCODE_GET_BALANCE_COINCHECK ]:
flag = False
for use_method ,r in self.reqQueue:
if use_method == method:
flag = True
break
if False == flag:
self.reqQueue.append( (method , req))
else:
self.reqQueue.append( (method , req))
#self.reqQueue.put(req)
# 返回请求编号
return self.reqID
####################################################
## 主动函数
####################################################
#----------------------------------------------------------------------
def init(self, accessKey, secretKey):
"""初始化"""
self.accessKey = accessKey
self.secretKey = secretKey
self.active = True
self.reqThread.start()
#----------------------------------------------------------------------
def exit(self):
"""退出"""
self.active = False
if self.reqThread.isAlive():
self.reqThread.join()
def get_info(self):
return self.sendRequest( self.API_account , FUNCTIONCODE_GET_INFO_COINCHECK , self.onGet_info , None)
def get_balance(self):
return self.sendRequest( self.API_balance , FUNCTIONCODE_GET_BALANCE_COINCHECK , self.onGet_balance , None)
def buy_btc_jpy(self , **kwargs):
print "buy_btc_jpy"
return self.sendRequest( self.API_trade , FUNCTIONCODE_BUY_ORDER_COINCHECK , self.onBuy_btc , kwargs = kwargs, optional = None)
def sell_btc_jpy(self, **kwargs):
print "sell_btc_jpy"
return self.sendRequest( self.API_trade , FUNCTIONCODE_SELL_ORDER_COINCHECK , self.onSell_btc , kwargs = kwargs, optional = None)
def list_orders(self):
return self.sendRequest( self.API_list_order , FUNCTIONCODE_LIST_ORDER_COINCHECK , self.onList_order , None)
def cancel_orders(self , order_id):
return self.sendRequest( self.API_cancel_order % (str(order_id)) , FUNCTIONCODE_CANCEL_ORDER_COINCHECK , self.onCancel_orders , None)
def history_orders(self):
return self.sendRequest( self.API_history_order , FUNCTIONCODE_HISTORY_ORDER_COINCHECK , self.onHistory_orders , None)
####################################################
## 回调函数
####################################################
def onGet_info(self, data, req, reqID):
print data
def onGet_balance(self, data, req, reqID):
print data
def onBuy_btc(self, data, req, reqID):
print data
def onSell_btc(self, data, req, reqID):
print data
def onList_order(self, data, req, reqID):
print data
def onCancel_orders(self, data, req, reqID):
print data
def onHistory_orders(self, data, req, reqID):
print data
class DataApiSocket(object):
"""基于websocket的API对象"""
#----------------------------------------------------------------------
def __init__(self ):
"""Constructor"""
self.host = '' # 服务器地址
self.currency = '' # 货币类型usd或者cny
self.ws = None # websocket应用对象
self.thread = None # 工作线程
#######################
## 通用函数
#######################
#----------------------------------------------------------------------
def onMessage(self, ws, evt):
"""信息推送"""
print 'onMessage'
print evt
#----------------------------------------------------------------------
def onError(self, ws, evt):
"""错误推送"""
print 'onError'
print evt
#----------------------------------------------------------------------
def onClose(self, ws):
"""接口断开"""
print 'onClose'
#----------------------------------------------------------------------
def onOpen(self, ws):
"""接口打开"""
print "onOpen"
self.sendOrderbookRequest()
self.sendTradesRequest()
#----------------------------------------------------------------------
def connect(self, host, trace=False):
"""连接服务器"""
self.host = host
self.currency = CURRENCY_JPY
websocket.enableTrace(trace)
self.ws = websocket.WebSocketApp(host,
on_message=self.onMessage,
on_error=self.onError,
on_close=self.onClose,
on_open=self.onOpen)
self.thread = Thread(target=self.ws.run_forever)
self.thread.start()
sleep(8)
#----------------------------------------------------------------------
def reconnect(self):
"""重新连接"""
# 首先关闭之前的连接
self.close()
# 再执行重连任务
self.ws = websocket.WebSocketApp(self.host,
on_message=self.onMessage,
on_error=self.onError,
on_close=self.onClose,
on_open=self.onOpen)
self.thread = Thread(target=self.ws.run_forever)
self.thread.start()
#----------------------------------------------------------------------
def exit(self):
self.close()
#----------------------------------------------------------------------
def close(self):
"""关闭接口"""
if self.thread and self.thread.isAlive():
self.ws.close()
self.thread.join()
#----------------------------------------------------------------------
def sendOrderbookRequest(self):
"""发送行情请求 , 订阅 orderbook"""
# 生成请求
d = {}
d['type'] = 'subscribe'
d['channel'] = "btc_jpy-orderbook"
# 使用json打包并发送
j = json.dumps(d)
# 若触发异常则重连
try:
self.ws.send(j)
except websocket.WebSocketConnectionClosedException:
pass
#----------------------------------------------------------------------
def sendTradesRequest(self):
"""订阅最近的交易记录 """
# 生成请求
d = {}
d['type'] = 'subscribe'
d['channel'] = "btc_jpy-trades"
# 使用json打包并发送
j = json.dumps(d)
# 若触发异常则重连
try:
self.ws.send(j)
except websocket.WebSocketConnectionClosedException:
pass
class DataApi(object):
'''get latest information of coincheck market'''
TICK_SYMBOL_URL = {
SYMBOL_BTCJPY: 'https://coincheck.com/api/ticker'
}
'''get latest deal history of coincheck market'''
TRADES_SYMBOL_URL = {
SYMBOL_BTCJPY: 'https://coincheck.com/api/trades'
}
'''get latest asks/bids information of coincheck market'''
ORDERBOOKERS_SYMBOL_URL = {
SYMBOL_BTCJPY: 'https://coincheck.com/api/order_books'
}
DEBUG = False
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.active = False
self.taskInterval = 0 # 每轮请求延时
self.taskList = [] # 订阅的任务列表
self.taskThread = Thread(target=self.run) # 处理任务的线程
#----------------------------------------------------------------------
def init(self, interval, debug):
"""初始化"""
self.taskInterval = interval
self.DEBUG = debug
self.active = True
self.taskThread.start()
#----------------------------------------------------------------------
def exit(self):
"""退出"""
self.active = False
if self.taskThread.isAlive():
self.taskThread.join()
#----------------------------------------------------------------------
def run(self):
"""连续运行"""
while self.active:
for url, callback in self.taskList:
try:
r = requests.get(url)
if r.status_code == 200:
data = r.json()
if self.DEBUG:
print callback.__name__
callback(data)
except Exception, e:
print e
sleep(self.taskInterval)
#----------------------------------------------------------------------
def subscribeTick(self, symbol):
"""订阅实时成交数据"""
url = self.TICK_SYMBOL_URL[symbol]
task = (url, self.onTick)
self.taskList.append(task)
#----------------------------------------------------------------------
def subscribeTrades(self, symbol):
"""订阅实时成交数据"""
url = self.TRADES_SYMBOL_URL[symbol]
task = (url, self.onTrades)
self.taskList.append(task)
#----------------------------------------------------------------------
def subscribeOrderbooks(self, symbol):
"""订阅实时成交数据"""
url = self.ORDERBOOKERS_SYMBOL_URL[symbol]
task = (url, self.onOrderbooks)
self.taskList.append(task)
#----------------------------------------------------------------------
def onTick(self, data):
"""实时成交推送"""
print data
#----------------------------------------------------------------------
def onTrades(self, data):
"""实时成交推送"""
print data
#----------------------------------------------------------------------
def onOrderbooks(self, data):
"""实时成交推送"""
print data
if __name__ == '__main__':
pass

View File

@ -0,0 +1,3 @@
# encoding: UTF-8
from vnokex import OKEX_Sub_Spot_Api , OKEX_Contract_Api , okex_all_symbol_pairs , okex_all_contract_symbol , okex_all_symbol_type

5
vnpy/api/okex/readme.txt Normal file
View File

@ -0,0 +1,5 @@
okex 的平台地址
https://www.okex.com/intro_apiOverview.html
api 文档 https://www.okex.com/ws_request.html

49
vnpy/api/okex/test.py Normal file
View File

@ -0,0 +1,49 @@
# encoding: UTF-8
from vnokex import *
# 在OkCoin网站申请这两个Key分别对应用户名和密码
apiKey = '你的accessKey'
secretKey = '你的secretKey'
# 创建API对象
api = OKEX_Sub_Spot_Api()
api.connect_Subpot(apiKey, secretKey, True)
sleep(3)
#api.login()
#api.subscribeSpotTicker("bch_btc")
#api.subscribeSpotDepth("bch_btc")
#api.subscribeSpotDepth5("bch_btc")
#api.subscribeSpotDeals("bch_btc")
#api.subscribeSpotKlines("bch_btc","30min")
api.spotTrade("etc_usdt","sell", "50" , "0.01")
#api.spotCancelOrder("etc_btc","44274138")
#api.spotUserInfo()
#api.spotOrderInfo("etc_btc", 44284731)
# api = OKEX_Contract_Api()
# api.connect_Contract(apiKey, secretKey, True)
# sleep(3)
#api.subsribeFutureTicker("btc","this_week")
#api.subscribeFutureKline("btc","this_week","30min")
#api.subscribeFutureDepth("btc","this_week")
#api.subscribeFutureDepth20("btc","this_week")
#api.subscribeFutureTrades("btc","this_week")
#api.subscribeFutureIndex("btc")
#api.subscribeFutureForecast_price("btc")
#api.login()
#api.futureTrade( "etc_usd", "this_week" ,"1" , 20 , 1 , _match_price = '0' , _lever_rate = '10') # 14245727693
#api.futureCancelOrder("etc_usd","14245727693" , "this_week")
#api.futureUserInfo()
#api.futureOrderInfo("etc_usd" , "14245727693" , "this_week" , '1', '1' , '10')
# api.subscribeFutureTrades()
'''
合约账户信息 持仓信息等在登录后都会自动推送官方文档这样写的还没实际验证过
'''

461
vnpy/api/okex/vnokex.py Normal file
View File

@ -0,0 +1,461 @@
# encoding: UTF-8
import hashlib
import zlib
import json
from time import sleep
from threading import Thread
import websocket
# OKEX网站
OKEX_USD_SUB_SPOT = 'wss://real.okex.com:10441/websocket' # OKEX 现货地址
OKEX_USD_CONTRACT = 'wss://real.okex.com:10440/websocket/okexapi' # OKEX 期货地址
okex_all_symbol_type = ["usdt","btc","ltc","eth","etc","bch"]
okex_all_symbol_pairs = ["ltc_btc","eth_btc","etc_btc","bch_btc","btc_usdt","eth_usdt",\
"ltc_usdt","etc_usdt","bch_usdt","etc_eth","bt1_btc","bt2_btc","btg_btc","qtum_btc",\
"hsr_btc","neo_btc","gas_btc","qtum_usdt","hsr_usdt","neo_usdt","gas_usdt"]
# for test
# okex_all_symbol_pairs = ['etc_usdt']
okex_all_k_line_periods = ["1min","3min","5min","15min","30min","1hour","2hour","4hour","6hour","12hour","day","3day","week"]
okex_all_contract_symbol = ["btc","ltc","eth","bch"]
okex_all_contract_type = ["this_week","next_week","quarter"]
class OKEX_Sub_Spot_Api(object):
"""基于Websocket的API对象"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.apiKey = '' # 用户名
self.secretKey = '' # 密码
self.ws_sub_spot = None # websocket应用对象 现货对象
#----------------------------------------------------------------------
def reconnect(self):
"""重新连接"""
# 首先关闭之前的连接
self.close()
# 再执行重连任务
self.ws_sub_spot = websocket.WebSocketApp(self.host,
on_message=self.onMessage,
on_error=self.onError,
on_close=self.onClose,
on_open=self.onOpen)
self.thread = Thread(target=self.ws_sub_spot.run_forever)
self.thread.start()
#----------------------------------------------------------------------
def connect_Subpot(self, apiKey , secretKey , trace = False):
self.host = OKEX_USD_SUB_SPOT
self.apiKey = apiKey
self.secretKey = secretKey
websocket.enableTrace(trace)
self.ws_sub_spot = websocket.WebSocketApp(self.host,
on_message=self.onMessage,
on_error=self.onError,
on_close=self.onClose,
on_open=self.onOpen)
self.thread = Thread(target=self.ws_sub_spot.run_forever)
self.thread.start()
#----------------------------------------------------------------------
def readData(self, evt):
"""解压缩推送收到的数据"""
# # 创建解压器
# decompress = zlib.decompressobj(-zlib.MAX_WBITS)
# # 将原始数据解压成字符串
# inflated = decompress.decompress(evt) + decompress.flush()
# 通过json解析字符串
#data = json.loads(inflated)
data = json.loads(evt)
return data
#----------------------------------------------------------------------
def close(self):
"""关闭接口"""
if self.thread and self.thread.isAlive():
self.ws_sub_spot.close()
self.thread.join()
#----------------------------------------------------------------------
def onMessage(self, ws, evt):
"""信息推送"""
print evt
#----------------------------------------------------------------------
def onError(self, ws, evt):
"""错误推送"""
print 'onError'
print evt
#----------------------------------------------------------------------
def onClose(self, ws):
"""接口断开"""
print 'onClose'
#----------------------------------------------------------------------
def onOpen(self, ws):
"""接口打开"""
print 'onOpen'
#----------------------------------------------------------------------
def subscribeSpotTicker(self, symbol_pair):
# 现货的 ticker
req = "{'event':'addChannel','channel':'ok_sub_spot_%s_ticker'}" % symbol_pair
self.ws_sub_spot.send(req)
#----------------------------------------------------------------------
def subscribeSpotDepth(self, symbol_pair):
# 现货的 市场深度
req = "{'event':'addChannel','channel':'ok_sub_spot_%s_depth'}" % symbol_pair
self.ws_sub_spot.send(req)
#----------------------------------------------------------------------
def subscribeSpotDepth5(self, symbol_pair):
# 现货的 市场深度 5
req = "{'event':'addChannel','channel':'ok_sub_spot_%s_depth_5'}" % symbol_pair
self.ws_sub_spot.send(req)
#----------------------------------------------------------------------
def subscribeSpotDeals(self, symbol_pair):
req = "{'event':'addChannel','channel':'ok_sub_spot_%s_deals'}" % symbol_pair
self.ws_sub_spot.send(req)
#----------------------------------------------------------------------
def subscribeSpotKlines(self, symbol_pair , time_period):
req = "{'event':'addChannel','channel':'ok_sub_spot_%s_kline_%s'}" % ( symbol_pair , time_period)
self.ws_sub_spot.send(req)
#----------------------------------------------------------------------
def generateSign(self, params):
"""生成签名"""
l = []
for key in sorted(params.keys()):
l.append('%s=%s' %(key, params[key]))
l.append('secret_key=%s' %self.secretKey)
sign = '&'.join(l)
return hashlib.md5(sign.encode('utf-8')).hexdigest().upper()
#----------------------------------------------------------------------
def sendTradingRequest(self, channel, params):
"""发送交易请求"""
# 在参数字典中加上api_key和签名字段
params['api_key'] = self.apiKey
params['sign'] = self.generateSign(params)
# 生成请求
d = {}
d['event'] = 'addChannel'
d['channel'] = channel
d['parameters'] = params
# 使用json打包并发送
j = json.dumps(d)
# 若触发异常则重连
try:
self.ws_sub_spot.send(j)
except websocket.WebSocketConnectionClosedException:
pass
#----------------------------------------------------------------------
def spotTrade(self, symbol_pair, type_, price, amount):
"""现货委托"""
params = {}
params['symbol'] = str(symbol_pair)
params['type'] = str(type_)
params['price'] = str(price)
params['amount'] = str(amount)
channel = 'ok_spot_order'
self.sendTradingRequest(channel, params)
#----------------------------------------------------------------------
def spotCancelOrder(self, symbol_pair, orderid):
"""现货撤单"""
params = {}
params['symbol'] = str(symbol_pair)
params['order_id'] = str(orderid)
channel = 'ok_spot_cancel_order'
self.sendTradingRequest(channel, params)
#----------------------------------------------------------------------
def spotUserInfo(self):
"""查询现货账户"""
channel = 'ok_spot_userinfo'
self.sendTradingRequest(channel, {})
#----------------------------------------------------------------------
def spotOrderInfo(self, symbol_pair, orderid):
"""查询现货委托信息"""
params = {}
params['symbol'] = str(symbol_pair)
params['order_id'] = str(orderid)
channel = 'ok_spot_orderinfo'
self.sendTradingRequest(channel, params)
#----------------------------------------------------------------------
# 这个 API 应该已经别废弃了
# def subscribeSpotTrades(self , symbol_pair):
# """订阅现货成交信息"""
# channel = 'ok_sub_spot_%s_trades' % ( symbol_pair)
# self.sendTradingRequest(channel, {})
#----------------------------------------------------------------------
# 这个 API 应该已经别废弃了
# def subscribeSpotUserInfo(self):
# """订阅现货账户信息"""
# channel = 'ok_sub_spot_userinfo'
# self.sendTradingRequest(channel, {})
#----------------------------------------------------------------------
def login(self):
params = {}
params['api_key'] = self.apiKey
params['sign'] = self.generateSign(params)
# 生成请求
d = {}
d['event'] = 'login'
d['parameters'] = params
# 使用json打包并发送
j = json.dumps(d)
# 若触发异常则重连
try:
self.ws_sub_spot.send(j)
except websocket.WebSocketConnectionClosedException:
pass
'''
OKEX 合约接口
[{
"channel": "btc_forecast_price",
"timestamp":"1490341322021",
"data": "998.8"
}]
data(string): 预估交割价格
timestamp(string): 时间戳
操作说明
无需订阅交割前一小时自动返回
这段数据交割前会自动返回
'''
class OKEX_Contract_Api(object):
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.apiKey = '' # 用户名
self.secretKey = '' # 密码
self.ws_contract = None # websocket应用对象 期货对象
#-----------------------------------------------------------------------
def connect_Contract(self, apiKey , secretKey , trace = False):
self.host = OKEX_USD_CONTRACT
self.apiKey = apiKey
self.secretKey = secretKey
websocket.enableTrace(trace)
self.ws_contract = websocket.WebSocketApp(self.host,
on_message=self.onMessage,
on_error=self.onError,
on_close=self.onClose,
on_open=self.onOpen)
self.thread = Thread(target=self.ws_contract.run_forever)
self.thread.start()
#-----------------------------------------------------------------------
#----------------------------------------------------------------------
def onMessage(self, ws, evt):
"""信息推送"""
print evt
#----------------------------------------------------------------------
def onError(self, ws, evt):
"""错误推送"""
print 'onError'
print evt
#----------------------------------------------------------------------
def onClose(self, ws):
"""接口断开"""
print 'onClose'
#----------------------------------------------------------------------
def onOpen(self, ws):
"""接口打开"""
print 'onOpen'
def subsribeFutureTicker(self, symbol , contract_type):
req = "{'event':'addChannel','channel':'ok_sub_futureusd_%s_ticker_%s'}" % (symbol , contract_type)
self.ws_contract.send(req)
def subscribeFutureKline(self, symbol , contract_type , time_period):
req = "{'event':'addChannel','channel':'ok_sub_futureusd_%s_kline_%s_%s'}" % (symbol , contract_type , time_period)
self.ws_contract.send(req)
def subscribeFutureDepth(self, symbol , contract_type):
req = "{'event':'addChannel','channel':'ok_sub_future_%s_depth_%s_usd'}" % (symbol , contract_type )
self.ws_contract.send(req)
def subscribeFutureDepth20(self, symbol , contract_type):
req = "{'event':'addChannel','channel':'ok_sub_futureusd_%s_depth_%s_20'}" % (symbol , contract_type)
self.ws_contract.send(req)
def subscribeFutureTrades(self, symbol , contract_type):
req = "{'event':'addChannel','channel':'ok_sub_futureusd_%s_trade_%s'}" % (symbol , contract_type)
self.ws_contract.send(req)
def subscribeFutureIndex(self, symbol ):
req = "{'event':'addChannel','channel':'ok_sub_futureusd_%s_index'}" % (symbol)
self.ws_contract.send(req)
#----------------------------------------------------------------------
def generateSign(self, params):
"""生成签名"""
l = []
for key in sorted(params.keys()):
l.append('%s=%s' %(key, params[key]))
l.append('secret_key=%s' %self.secretKey)
sign = '&'.join(l)
return hashlib.md5(sign.encode('utf-8')).hexdigest().upper()
#----------------------------------------------------------------------
def sendTradingRequest(self, channel, params):
"""发送交易请求"""
# 在参数字典中加上api_key和签名字段
params['api_key'] = self.apiKey
params['sign'] = self.generateSign(params)
# 生成请求
d = {}
d['event'] = 'addChannel'
d['channel'] = channel
d['parameters'] = params
# 使用json打包并发送
j = json.dumps(d)
# 若触发异常则重连
try:
self.ws_contract.send(j)
except websocket.WebSocketConnectionClosedException:
pass
#----------------------------------------------------------------------
def login(self):
params = {}
params['api_key'] = self.apiKey
params['sign'] = self.generateSign(params)
# 生成请求
d = {}
d['event'] = 'login'
d['parameters'] = params
# 使用json打包并发送
j = json.dumps(d)
# 若触发异常则重连
try:
self.ws_contract.send(j)
except websocket.WebSocketConnectionClosedException:
pass
#----------------------------------------------------------------------
def futureTrade(self, symbol_pair, contract_type ,type_, price, amount , _match_price = '0' , _lever_rate = '10'):
"""期货委托"""
params = {}
params['symbol'] = str(symbol_pair)
params['contract_type'] = str(contract_type)
params['price'] = str(price)
params['amount'] = str(amount)
params['type'] = type_ # 1:开多 2:开空 3:平多 4:平空
params['match_price'] = _match_price # 是否为对手价: 0:不是 1:是 当取值为1时,price无效
params['lever_rate'] = _lever_rate
channel = 'ok_futureusd_trade'
self.sendTradingRequest(channel, params)
#----------------------------------------------------------------------
def futureCancelOrder(self, symbol_pair, orderid , contract_type):
"""期货撤单"""
params = {}
params['symbol'] = str(symbol_pair)
params['order_id'] = str(orderid)
params['contract_type'] = str(contract_type)
channel = 'ok_futureusd_cancel_order'
self.sendTradingRequest(channel, params)
#----------------------------------------------------------------------
def futureUserInfo(self):
"""查询期货账户"""
channel = 'ok_futureusd_userinfo'
self.sendTradingRequest(channel, {})
#----------------------------------------------------------------------
def futureOrderInfo(self , symbol_pair , order_id , contract_type , status , current_page , page_length = 10):
"""查询期货委托"""
params = {}
params['symbol'] = str(symbol_pair)
params['order_id'] = str(order_id)
params['contract_type'] = str(contract_type)
params['status'] = str(status)
params['current_page'] = str(current_page)
params['page_length'] = str(page_length)
channel = 'ok_futureusd_orderinfo'
self.sendTradingRequest(channel, params)
#----------------------------------------------------------------------
def subscribeFutureTrades( self):
channel = 'ok_sub_futureusd_trades'
self.sendTradingRequest(channel, {})
#----------------------------------------------------------------------
def subscribeFutureUserInfo(self):
"""订阅期货账户信息"""
channel = 'ok_sub_futureusd_userinfo'
self.sendTradingRequest(channel, {})
#----------------------------------------------------------------------
def subscribeFuturePositions(self):
"""订阅期货持仓信息"""
channel = 'ok_sub_futureusd_positions'
self.sendTradingRequest(channel, {})

23
vnpy/api/zaif/README.txt Normal file
View File

@ -0,0 +1,23 @@
# vn.zaif
### 简介
Zaif 的比特币交易接口基于Rest API开发实现了官方提供API的全部功能。
### 特点
1. 面向对象的API设计接近CTP API的结构对于国内用户而言更容易上手
2. 参考CTP API的设计主动函数调用的结果通过异步回调函数的方式推送到程序中适用于开发稳定可靠的实盘交易程序
### API版本
日期2015-12-02
链接:
https://corp.zaif.jp/api-docs/
https://corp.zaif.jp/api-docs/trade-api/
https://github.com/E-Tsubo/zaifapi/blob/master/zaifapi/impl.py
https://github.com/tk1024/Zaif4PHP/blob/master/Zaif.php

View File

@ -0,0 +1,3 @@
# encoding: UTF-8
from vnzaif import TradeApi, DataApi

153
vnpy/api/zaif/test.php Normal file
View File

@ -0,0 +1,153 @@
<?php
use WebSocket\Client;
class Zaif {
const PUBLIC_BASE_URL = "https://api.zaif.jp/api/1";
const TRADE_BASE_URL = "https://api.zaif.jp/tapi";
const STREAMING_BASE_URL = "ws://api.zaif.jp:8888/stream";
private $key;
private $secret;
private $nonce;
public function __construct($key, $secret) {
$this->key = $key;
$this->secret = $secret;
$this->nonce = time();
$this->nonce = 1507466145;
}
public static function pub($endpoint, $prm) {
switch ($endpoint) {
case 'last_price':
case 'ticker':
case 'trades':
case 'depth':
break;
default:
throw new Exception('Argument has not been set.');
break;
}
switch ($prm) {
case 'btc_jpy':
case 'mona_jpy':
case 'mona_btc':
break;
default:
throw new Exception('Argument has not been set.');
break;
}
$url = self::PUBLIC_BASE_URL.'/'.$endpoint.'/'.$prm;
$data = self::get($url);
$data = json_decode( $data );
return $data;
}
public function trade($method, $prms=null) {
switch ($method) {
case 'get_info':
case 'get_info2':
case 'get_personal_info':
case 'trade_history':
case 'active_orders':
case 'trade' :
case 'cancel_order' :
case 'withdraw' :
case 'deposit_history' :
case 'withdraw_history' :
break;
default:
throw new Exception('Argument has not been set.');
break;
}
$postdata = array( "nonce" => $this->nonce++, "method" => $method );
if( !empty( $prms ) ) {
$postdata = array_merge( $postdata, $prms );
}
$postdata_query = http_build_query( $postdata );
echo $postdata_query;
echo "\n";
$sign = hash_hmac( 'sha512', $postdata_query, $this->secret);
echo $sign;
echo "\n";
$header = array( "Sign: {$sign}", "Key: {$this->key}", );
print_r($postdata_query);
echo "\n";
print_r($header);
$data = self::post( self::TRADE_BASE_URL, $header, $postdata_query );
$data = json_decode( $data );
return $data;
}
public static function streaming($prms, $callback) {
$file_path = dirname(__FILE__).'/vendor/autoload.php';
if (file_exists($file_path) && is_readable($file_path)) {
require_once $file_path ;
} else {
throw new Exception('You can not use Streaming API.You should check including libray.');
}
switch ($prms['currency_pair']) {
case 'btc_jpy':
case 'mona_jpy':
case 'mona_btc':
break;
default:
throw new Exception('Argument has not been set.');
return 0;
break;
}
$ws = self::STREAMING_BASE_URL.'?'.http_build_query($prms);
$client = new Client($ws);
while(true) {
try {
$json = $client->receive();
$data = json_decode($json);
$callback($data);
} catch (WebSocket\ConnectionException $e) {
$clinet = new Client($ws);
}
}
}
private static function get($url) {
$ch = curl_init();
$options = array(
CURLOPT_URL => $url,
CURLOPT_HEADER => false,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_SSL_VERIFYPEER => false,
);
curl_setopt_array($ch, $options);
$data = curl_exec($ch);
curl_close($ch);
return $data;
}
private static function post($url, $header, $postdata) {
$ch = curl_init();
$options = array(
CURLOPT_URL => $url,
CURLOPT_HEADER => false,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_SSL_VERIFYPEER => false,
CURLOPT_POST => true,
CURLOPT_POSTFIELDS => $postdata,
CURLOPT_HTTPHEADER => $header,
);
curl_setopt_array($ch, $options);
$data = curl_exec($ch);
curl_close($ch);
return $data;
}
}
$zaif = new Zaif("f8893900-b764-4cdd-9693-16c23b8a118e","049d3499-1788-4145-b033-dab66ea2feda");
//$info = $zaif -> trade( "get_info");
$info2 = $zaif -> trade( "get_info2");
// 1モナ100円で15モナ売り板に出す
// $ trade_ask = $ zaif - > trade “ trade ”,
// array ' currency_pair ' => ' mona_jpy ' ' action ' => ' ask ' ' price ' = > 100 ' amount ' => 15 ;
// MONA_JPYの现在有效な注文一覧を表示する
//$ active_orders = $ zaif - > trade “ active_orders ”, array ' currency_pair ' => ' mona_jpy ';
//出力
// var_dump( $info);
var_dump( $info2 );
?>

55
vnpy/api/zaif/test.py Normal file
View File

@ -0,0 +1,55 @@
# encoding: utf-8
from vnzaif import *
def testTrade():
"""测试交易"""
accessKey = '你的accessKey'
secretKey = '你的secretKey'
# 创建API对象并初始化
api = TradeApi()
api.DEBUG = True
api.init(accessKey, secretKey)
# 查询账户,测试通过
#api.get_info()
api.get_info2()
# 查询委托,测试通过
#api.active_orders( currency_pair = SYMBOL_BTCJPY )
# 阻塞
input()
def testData():
"""测试行情接口"""
api = DataApi()
api.init(0.5 , 1)
# 订阅成交推送,测试通过
api.subscribeTick(SYMBOL_BTCJPY)
# 订阅最新价推送,测试通过
#api.subscribeLast(SYMBOL_BTCJPY)
# 订阅深度推送,测试通过
api.subscribeDepth(SYMBOL_BTCJPY, 1)
api.subscribeTrades(SYMBOL_BTCJPY)
# 查询K线数据测试通过
#data = api.getKline(SYMBOL_BTCJPY, PERIOD_1MIN, 100)
#print data
input()
if __name__ == '__main__':
#testTrade()
#testTrade()
testData()

500
vnpy/api/zaif/vnzaif.py Normal file
View File

@ -0,0 +1,500 @@
# encoding: utf-8
import urllib
import hashlib
import json
import requests
import hmac
import time
from datetime import datetime
from time import time, sleep , mktime
from Queue import Queue, Empty
from threading import Thread
import urllib
import hashlib
import inspect
import requests
import cerberus
SYMBOL_BTCJPY = 'btc_jpy'
SYMBOL_ETHJPY = 'eth_jpy'
FUNCTIONCODE_GETINFO_ZAIF = 'get_info'
FUNCTIONCODE_GETINFO2_ZAIF = 'get_info2'
FUNCTIONCODE_GETPERSONALINFO_ZAIF = 'get_personal_info'
FUNCTIONCODE_TRADEHISTORY_ZAIF = 'trade_history'
FUNCTIONCODE_ACTIVEORDERS_ZAIF = 'active_orders'
FUNCTIONCODE_TRADE_ZAIF = 'trade'
FUNCTIONCODE_CANCEL_ORDER_ZAIF = 'cancel_order'
FUNCTIONCODE_WITHDRAL_ZAIF = 'withdraw'
FUNCTIONCODE_DEPOSIT_HISTORY_ZAIF = 'deposit_history'
FUNCTIONCODE_WITHDRAW_HISTORY_ZAIF = 'withdraw_history'
SCHEMA = {
'from_num': {
'type': 'integer'
},
'count': {
'type': 'integer'
},
'from_id': {
'type': 'integer'
},
'end_id': {
'type': ['string', 'integer']
},
'order': {
'type': 'string',
'allowed': ['ASC', 'DESC']
},
'since': {
'type': 'integer'
},
'end': {
'type': ['string', 'integer']
},
'currency_pair': {
'type': 'string',
'allowed': ['btc_jpy', 'xem_jpy', 'mona_jpy', 'mona_btc']
},
'currency': {
'required': True,
'type': 'string',
'allowed': ['jpy', 'btc', 'mona']
},
'address': {
'required': True,
'type': 'string'
},
'message': {
'type': 'string'
},
'amount': {
'required': True,
'type': 'number'
},
'opt_fee': {
'type': 'number'
},
'order_id': {
'required': True,
'type': 'integer'
},
'action': {
'required': True,
'type': 'string',
'allowed': ['bid', 'ask']
},
'price': {
'required': True,
'type': 'number'
},
'limit': {
'type': 'number'
}
}
########################################################################
class TradeApi(object):
"""交易接口"""
__API_URL = 'https://api.zaif.jp/tapi'
DEBUG = True
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.accessKey = ''
self.secretKey = ''
self.active = False # API工作状态
self.reqID = 0 # 请求编号
self.reqQueue = Queue() # 请求队列
self.reqThread = Thread(target=self.processQueue) # 请求处理线程
self.nonce = int(mktime(datetime.now().timetuple()))
def params_pre_processing(self, schema_keys, params):
schema = self.__get_schema(schema_keys)
self.__validate(schema, params)
return self.__edit_params(params)
@classmethod
def __get_schema(cls, keys):
schema = {}
for key in keys:
schema[key] = SCHEMA[key]
return schema
@classmethod
def __edit_params(cls, params):
if 'from_num' in params:
params['from'] = params['from_num']
del (params['from_num'])
return params
@classmethod
def __validate(cls, schema, param):
v = cerberus.Validator(schema)
if v.validate(param):
return
raise Exception(json.dumps(v.errors))
def __get_parameter(self , func_name, params):
params['method'] = func_name
params['nonce'] = self.nonce
self.nonce += 1
return urllib.urlencode(params)
def __get_header(self, params):
signature = hmac.new(bytearray(self.secretKey.encode('utf-8')), digestmod=hashlib.sha512)
signature.update(params.encode('utf-8'))
return {
'key': str(self.accessKey),
'sign': str(signature.hexdigest())
}
#----------------------------------------------------------------------
def processRequest(self, req):
"""处理请求"""
# 读取方法和参数
schema_keys = req['schema_keys']
method = req['method']
kwargs = req['kwargs']
optional = req['optional']
params = self.params_pre_processing(schema_keys, kwargs)
params = self.__get_parameter(method, params)
header = self.__get_header(params)
r = requests.post(self.__API_URL , data=params , headers=header)
if r != None:
try:
data = r.json()
if data['success'] == 0:
print "error in vnzaif %s" % method
return data
else:
return data
except Exception,ex:
return None
else:
return None
'''
def processRequest(self, req):
schema_keys = req['schema_keys']
kwargs = req['kwargs']
method = req['method']
optional = req['optional']
params = self.params_pre_processing(schema_keys, kwargs)
params = self.__get_parameter(method, params)
header = self.__get_header(params)
response = requests.post(self.__API_URL, data=params, headers=header)
if response.status_code != 200:
raise Exception('return status code is {}'.format(response.status_code))
res = json.loads(response.text)
if res['success'] == 0:
raise Exception(res['error'])
return res['return']
'''
#----------------------------------------------------------------------
def processQueue(self):
"""处理请求队列中的请求"""
while self.active:
try:
req = self.reqQueue.get(block=True, timeout=1) # 获取请求的阻塞为一秒
callback = req['callback']
reqID = req['reqID']
data = self.processRequest(req)
# 请求失败
if 'code' in data and 'message' in data:
error = u'错误信息:%s' %data['message']
self.onError(error, req, reqID)
# 请求成功
else:
if self.DEBUG:
print callback.__name__
callback(data, req, reqID)
except Empty:
pass
#----------------------------------------------------------------------
def sendRequest(self, method, schema_keys , kwargs , callback, optional=None):
"""发送请求"""
# 请求编号加1
self.reqID += 1
# 生成请求字典并放入队列中
req = {}
req['method'] = method
req['schema_keys'] = schema_keys
req['kwargs'] = kwargs
#req['params'] = params
req['callback'] = callback
req['optional'] = optional
req['reqID'] = self.reqID
self.reqQueue.put(req)
# 返回请求编号
return self.reqID
####################################################
## 主动函数
####################################################
#----------------------------------------------------------------------
def init(self, accessKey, secretKey):
"""初始化"""
self.accessKey = accessKey
self.secretKey = secretKey
self.active = True
self.reqThread.start()
#----------------------------------------------------------------------
def exit(self):
"""退出"""
self.active = False
if self.reqThread.isAlive():
self.reqThread.join()
#---------------------------------------------------
'''
def get_info(self):
print FUNCTIONCODE_GETINFO_ZAIF
method = FUNCTIONCODE_GETINFO_ZAIF
key = {}
params = {}
callback = self.onGet_info
optional = {}
print params
return self.sendRequest(method, key, params,callback, optional)
'''
def get_info2(self):
method = FUNCTIONCODE_GETINFO2_ZAIF
key = {}
params = {}
callback = self.onGet_info2
optional = {}
return self.sendRequest(method, key, params, callback, optional)
def trade_history(self , **kwargs):
method = FUNCTIONCODE_TRADEHISTORY_ZAIF
schema_keys = ['from_num', 'count', 'from_id', 'end_id', 'order', 'since', 'end', 'currency_pair']
callback = self.onTradeHistory
optional = {}
return self.sendRequest(method, schema_keys, kwargs, callback, optional)
def active_orders(self , **kwargs):
method = FUNCTIONCODE_ACTIVEORDERS_ZAIF
schema_keys = ['currency_pair']
callback = self.onActiveOrders
optional = {}
return self.sendRequest(method, schema_keys, kwargs, callback, optional)
def withdraw_history(self , **kwargs):
method = FUNCTIONCODE_WITHDRAW_HISTORY_ZAIF
schema_keys = ['currency', 'from_num', 'count', 'from_id', 'end_id', 'order', 'since', 'end']
callback = self.onWithdrawHistory
optional = {}
return self.sendRequest(method, schema_keys, kwargs, callback, optional)
def deposit_history(self , **kwargs):
method = FUNCTIONCODE_DEPOSIT_HISTORY_ZAIF
schema_keys = ['currency', 'from_num', 'count', 'from_id', 'end_id', 'order', 'since', 'end']
callback = self.onDepositHistory
optional = {}
return self.sendRequest(method, schema_keys, kwargs, callback, optional)
def withdraw(self , **kwargs):
method = FUNCTIONCODE_WITHDRAL_ZAIF
schema_keys = ['currency', 'address', 'message', 'amount', 'opt_fee']
callback = self.onWithdraw
optional = {}
return self.sendRequest(method, schema_keys, kwargs, callback, optional)
def cancel_order(self, **kwargs):
method = FUNCTIONCODE_CANCEL_ORDER_ZAIF
schema_keys = ['order_id']
callback = self.onCancelOrder
optional = {}
return self.sendRequest(method, schema_keys, kwargs, callback, optional)
def trade(self, **kwargs):
method = FUNCTIONCODE_TRADE_ZAIF
schema_keys = ['currency_pair', 'action', 'price', 'amount', 'limit']
callback = self.onTrade
optional = {}
return self.sendRequest(method, schema_keys, kwargs, callback, optional)
####################################################
## 回调函数
####################################################
'''
def onGet_info(self, data, req, reqID):
"""用户信息"""
print data
'''
def onTrade(self, data, req, reqID):
print data
def onCancelOrder(self, data, req, reqID):
print data
def onWithdraw(self, data, req, reqID):
print data
def onDepositHistory(self, data, req, reqID):
print data
def onWithdrawHistory(self, data, req, reqID):
print data
def onTradeHistory(self, data, req, reqID):
print data
def onActiveOrders(self, data, req, reqID):
print data
def onGet_info2(self, data, req, reqID):
"""用户信息"""
print data
class DataApi(object):
"""
行情接口
"""
TICK_SYMBOL_URL = {
SYMBOL_BTCJPY: 'https://api.zaif.jp/api/1/ticker/btc_jpy',
SYMBOL_ETHJPY: 'https://api.zaif.jp/api/1/ticker/eth_jpy'
}
TRADES_SYMBOL_URL = {
SYMBOL_BTCJPY: 'https://api.zaif.jp/api/1/trades/btc_jpy',
SYMBOL_ETHJPY: 'https://api.zaif.jp/api/1/trades/eth_jpy',
}
DEPTH_SYMBOL_URL = {
SYMBOL_BTCJPY: 'https://api.zaif.jp/api/1/depth/btc_jpy',
SYMBOL_ETHJPY: 'https://api.zaif.jp/api/1/depth/eth_jpy',
}
LAST_SYMBOL_URL = {
SYMBOL_BTCJPY: 'https://api.zaif.jp/api/1/last_price/btc_jpy',
SYMBOL_ETHJPY: 'https://api.zaif.jp/api/1/last_price/eth_jpy',
}
DEBUG = True
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.active = False
self.taskInterval = 0 # 每轮请求延时
self.taskList = [] # 订阅的任务列表
self.taskThread = Thread(target=self.run) # 处理任务的线程
#----------------------------------------------------------------------
def init(self, interval, debug):
"""初始化"""
self.taskInterval = interval
self.DEBUG = debug
self.active = True
self.taskThread.start()
#----------------------------------------------------------------------
def exit(self):
"""退出"""
self.active = False
if self.taskThread.isAlive():
self.taskThread.join()
#----------------------------------------------------------------------
def run(self):
"""连续运行"""
while self.active:
for url, callback in self.taskList:
try:
r = requests.get(url)
if r.status_code == 200:
data = r.json()
if self.DEBUG:
print callback.__name__
data = {"return_symbol": (url.split('/'))[-1].split('_')[0] , "data":data}
#data["return_symbol"] =
callback(data)
except Exception, e:
print e
sleep(self.taskInterval)
#----------------------------------------------------------------------
def subscribeTick(self, symbol):
"""订阅实时成交数据"""
url = self.TICK_SYMBOL_URL[symbol]
task = (url, self.onTick)
self.taskList.append(task)
#----------------------------------------------------------------------
def subscribeLast(self, symbol):
"""订阅实时成交数据"""
url = self.LAST_SYMBOL_URL[symbol]
task = (url, self.onLast)
self.taskList.append(task)
#----------------------------------------------------------------------
def subscribeTrades(self, symbol):
"""订阅实时报价数据"""
url = self.TRADES_SYMBOL_URL[symbol]
task = (url, self.onTrades)
self.taskList.append(task)
#----------------------------------------------------------------------
def subscribeDepth(self, symbol, level=0):
"""订阅深度数据"""
url = self.DEPTH_SYMBOL_URL[symbol]
if level:
url = url.replace('json', str(level))
task = (url, self.onDepth)
self.taskList.append(task)
#----------------------------------------------------------------------
def onTick(self, data):
"""实时成交推送"""
print data
#----------------------------------------------------------------------
def onLast(self, data):
"""实时深度推送"""
print data
#----------------------------------------------------------------------
def onTrades(self, data):
"""实时深度推送"""
print data
#----------------------------------------------------------------------
def onDepth(self, data):
"""实时深度推送"""
print data

3
vnpy/api/zb/__init__.py Normal file
View File

@ -0,0 +1,3 @@
# encoding: UTF-8
from vnzb import ZB_Sub_Spot_Api

53
vnpy/api/zb/reame.txt Normal file
View File

@ -0,0 +1,53 @@
API 文档地址
https://www.zb.com/i/developer/websocketApi#config
代码 描述
1000 调用成功
1001 一般错误提示
1002 内部错误
1003 验证不通过
1004 资金安全密码锁定
1005 资金安全密码错误,请确认后重新输入。
1006 实名认证等待审核或审核不通过
1007 频道为空
1008 事件为空
1009 此接口维护中
2001 人民币账户余额不足
2002 比特币账户余额不足
2003 莱特币账户余额不足
2005 以太币账户余额不足
2006 ETC币账户余额不足
2007 BTS币账户余额不足
2008 EOS币账户余额不足
2009 账户余额不足
3001 挂单没有找到
3002 无效的金额
3003 无效的数量
3004 用户不存在
3005 无效的参数
3006 无效的IP或与绑定的IP不一致
3007 请求时间已失效
3008 交易记录没有找到
4001 API接口被锁定或未启用
4002 请求过于频繁
WebSocket服务地址
ZB WebSocket服务连接地址wss://api.zb.com:9999/websocket
接口 描述
ticker行情
depth市场深度
trades历史成交
交易API
用于ZB快速进行交易
接口 描述
order委托下单
cancelOrder取消委托
getOrder获取委托买单或卖单
getorders获取多个委托买单或卖单每次请求返回10条记录
getordersignoretradetype取消tradeType字段过滤可同时获取买单和卖单每次请求返回pageSize<100条记录
getaccountinfo获取用户信息

33
vnpy/api/zb/test.py Normal file
View File

@ -0,0 +1,33 @@
# encoding: UTF-8
from vnzb import *
# 在OkCoin网站申请这两个Key分别对应用户名和密码
apiKey = '你的apiKey'
secretKey = '你的secreKey'
# 创建API对象
api = ZB_Sub_Spot_Api()
api.connect_Subpot(apiKey, secretKey, True)
sleep(3)
# api.login()
# api.subscribeSpotTicker("ltc_btc")
# api.subscribeSpotDepth("ltc_btc")
# api.subscribeSpotTrades("ltc_btc")
#api.subscribeSpotKlines("bch_btc","30min")
#api.spotTrade("btc_qc", 1 , "50" , "0.01")
'''
{"message":"鎿嶄綔鎴愬姛","no":"0","data":"{entrustId:2017121051685}","code":1000,"channel":"btcqc_order","success":true}
{"message":"鎿嶄綔鎴愬姛","no":"0","data":"{entrustId:2017121051713}","code":1000,"channel":"btcqc_order","success":true}
'''
#api.spotUserInfo()
#api.spotCancelOrder("btc_qc", "2017121051685")
#api.spotOrderInfo("btc_qc","2017121051713")
#api.spotGetOrders("btc_qc" , "1" , "1")
#api.spotGetOrderSignOrderTradeType("btc_qc", 1 , 10 , 1)

258
vnpy/api/zb/vnzb.py Normal file
View File

@ -0,0 +1,258 @@
# encoding: UTF-8
import hashlib
import zlib
import json
from time import sleep
from threading import Thread
import websocket
import urllib2, hashlib,struct,sha,time
# OKEX网站
zb_usd_url = "wss://api.zb.com:9999/websocket"
zb_all_symbols = ["ltc_btc"]
class ZB_Sub_Spot_Api(object):
"""基于Websocket的API对象"""
def __init__(self):
"""Constructor"""
self.apiKey = '' # 用户名
self.secretKey = '' # 密码
self.ws_sub_spot = None # websocket应用对象 现货对象
#----------------------------------------------------------------------
def reconnect(self):
"""重新连接"""
# 首先关闭之前的连接
self.close()
# 再执行重连任务
self.ws_sub_spot = websocket.WebSocketApp(self.host,
on_message=self.onMessage,
on_error=self.onError,
on_close=self.onClose,
on_open=self.onOpen)
self.thread = Thread(target=self.ws_sub_spot.run_forever)
self.thread.start()
#----------------------------------------------------------------------
def connect_Subpot(self, apiKey , secretKey , trace = False):
self.host = zb_usd_url
self.apiKey = apiKey
self.secretKey = secretKey
websocket.enableTrace(trace)
self.ws_sub_spot = websocket.WebSocketApp(self.host,
on_message=self.onMessage,
on_error=self.onError,
on_close=self.onClose,
on_open=self.onOpen)
self.thread = Thread(target=self.ws_sub_spot.run_forever)
self.thread.start()
#----------------------------------------------------------------------
def readData(self, evt):
"""解压缩推送收到的数据"""
# # 创建解压器
# decompress = zlib.decompressobj(-zlib.MAX_WBITS)
# # 将原始数据解压成字符串
# inflated = decompress.decompress(evt) + decompress.flush()
# 通过json解析字符串
data = json.loads(evt)
return data
#----------------------------------------------------------------------
def close(self):
"""关闭接口"""
if self.thread and self.thread.isAlive():
self.ws_sub_spot.close()
self.thread.join()
#----------------------------------------------------------------------
def onMessage(self, ws, evt):
"""信息推送"""
print evt
#----------------------------------------------------------------------
def onError(self, ws, evt):
"""错误推送"""
print 'onError'
print evt
#----------------------------------------------------------------------
def onClose(self, ws):
"""接口断开"""
print 'onClose'
#----------------------------------------------------------------------
def onOpen(self, ws):
"""接口打开"""
print 'onOpen'
#----------------------------------------------------------------------
def subscribeSpotTicker(self, symbol_pair):
# 现货的 ticker
symbol_pair = symbol_pair.replace('_','')
req = "{'event':'addChannel','channel':'%s_ticker'}" % symbol_pair
self.ws_sub_spot.send(req)
#----------------------------------------------------------------------
def subscribeSpotDepth(self, symbol_pair):
# 现货的 市场深度
symbol_pair = symbol_pair.replace('_','')
req = "{'event':'addChannel','channel':'%s_depth'}" % symbol_pair
self.ws_sub_spot.send(req)
#----------------------------------------------------------------------
def subscribeSpotTrades(self, symbol_pair):
symbol_pair = symbol_pair.replace('_','')
req = "{'event':'addChannel','channel':'%s_trades'}" % symbol_pair
self.ws_sub_spot.send(req)
#----------------------------------------------------------------------
def __fill(self, value, lenght, fillByte):
if len(value) >= lenght:
return value
else:
fillSize = lenght - len(value)
return value + chr(fillByte) * fillSize
#----------------------------------------------------------------------
def __doXOr(self, s, value):
slist = list(s)
for index in xrange(len(slist)):
slist[index] = chr(ord(slist[index]) ^ value)
return "".join(slist)
#----------------------------------------------------------------------
def __hmacSign(self, aValue, aKey):
keyb = struct.pack("%ds" % len(aKey), aKey)
value = struct.pack("%ds" % len(aValue), aValue)
k_ipad = self.__doXOr(keyb, 0x36)
k_opad = self.__doXOr(keyb, 0x5c)
k_ipad = self.__fill(k_ipad, 64, 54)
k_opad = self.__fill(k_opad, 64, 92)
m = hashlib.md5()
m.update(k_ipad)
m.update(value)
dg = m.digest()
m = hashlib.md5()
m.update(k_opad)
subStr = dg[0:16]
m.update(subStr)
dg = m.hexdigest()
return dg
#----------------------------------------------------------------------
def __digest(self, aValue):
value = struct.pack("%ds" % len(aValue), aValue)
h = sha.new()
h.update(value)
dg = h.hexdigest()
return dg
#----------------------------------------------------------------------
def generateSign(self, params):
"""生成签名"""
l = []
for key in sorted(params.keys()):
l.append('"%s":"%s"' %(key, params[key]))
sign = ','.join(l)
sign = '{' + sign + '}'
SHA_secret = self.__digest(self.secretKey)
return self.__hmacSign( sign, SHA_secret)
# return hashlib.md5(sign.encode('utf-8')).hexdigest().upper()
#----------------------------------------------------------------------
def sendTradingRequest(self, channel, params):
"""发送交易请求"""
# 在参数字典中加上api_key和签名字段
params['accesskey'] = self.apiKey
params['channel'] = channel
params['event'] = "addChannel"
params['sign'] = self.generateSign(params)
# 使用json打包并发送
j = json.dumps(params)
# 若触发异常则重连
try:
self.ws_sub_spot.send(j)
except websocket.WebSocketConnectionClosedException:
pass
#----------------------------------------------------------------------
def spotTrade(self, symbol_pair, type_, price, amount):
"""现货委托"""
symbol_pair = symbol_pair.replace('_','')
params = {}
params['tradeType'] = str(type_)
params['price'] = str(price)
params['amount'] = str(amount)
channel = symbol_pair.lower() + "_order"
self.sendTradingRequest(channel, params)
#----------------------------------------------------------------------
def spotCancelOrder(self, symbol_pair, orderid):
"""现货撤单"""
symbol_pair = symbol_pair.replace('_','')
params = {}
params['id'] = str(orderid)
channel = symbol_pair.lower() + "_cancelorder"
self.sendTradingRequest(channel, params)
#----------------------------------------------------------------------
def spotUserInfo(self):
"""查询现货账户"""
channel = 'getaccountinfo'
self.sendTradingRequest(channel, {})
#----------------------------------------------------------------------
def spotOrderInfo(self, symbol_pair, orderid):
"""查询现货委托信息"""
symbol_pair = symbol_pair.replace('_','')
params = {}
params['id'] = str(orderid)
channel = symbol_pair.lower() + "_getorder"
self.sendTradingRequest(channel, params)
#----------------------------------------------------------------------
def spotGetOrders(self, symbol_pair , pageIndex , type_):
"""查询现货所有委托信息"""
symbol_pair = symbol_pair.replace('_','')
params = {}
params['pageIndex'] = str(pageIndex)
params['tradeType'] = str(type_)
channel = symbol_pair.lower() + "_getorders"
self.sendTradingRequest(channel, params)
#----------------------------------------------------------------------
def spotGetOrderSignOrderTradeType(self , symbol_pair , pageIndex , pageSize , type_):
symbol_pair = symbol_pair.replace('_','')
params = {}
params['pageIndex'] = str(pageIndex)
params['pageSize'] = str(pageSize)
params['tradeType'] = str(type_)
channel = symbol_pair.lower() + "_getordersignoretradetype"
self.sendTradingRequest(channel, params)

View File

@ -0,0 +1,7 @@
{
"accountID": "你的账户ID",
"accessKey": "你的key",
"secretKey": "你的secretKey",
"interval": 0.5,
"debug": false
}

View File

@ -0,0 +1,11 @@
# encoding: UTF-8
from vnpy.trader import vtConstant
from coincheckGateway import CoincheckGateway, CoincheckTradeApi , CoincheckSocketDataApi
gatewayClass = CoincheckGateway
gatewayName = 'COINCHECK'
gatewayDisplayName = u'COINCHECK'
gatewayType = vtConstant.GATEWAYTYPE_BTC
gatewayQryEnabled = True

View File

@ -0,0 +1,741 @@
# encoding: UTF-8
'''
vn.coincheck的gateway接入
'''
import os
import json
from datetime import datetime
from copy import copy
from threading import Condition
from Queue import Queue
from threading import Thread
import json
from vnpy.api.coincheck import vncoincheck
from vnpy.trader.vtGateway import *
from vnpy.trader.vtFunction import getJsonPath
from datetime import datetime , timedelta
SYMBOL_BTCJPY = 'btc_jpy'
COINCHECK_HOSTS = "wss://ws-api.coincheck.com"
class CoincheckGateway(VtGateway):
"""coincheck 接口"""
#----------------------------------------------------------------------
def __init__(self, eventEngine, gatewayName='COINCHECK'):
"""Constructor"""
super(CoincheckGateway, self).__init__(eventEngine, gatewayName)
self.tradeApi = CoincheckTradeApi(self)
#self.dataApi = CoincheckDataApi(self)
self.dataApi = CoincheckSocketDataApi(self)
self.fileName = self.gatewayName + '_connect.json'
self.filePath = getJsonPath(self.fileName, __file__)
#----------------------------------------------------------------------
def connect(self):
"""连接"""
# 载入json文件
try:
f = file(self.filePath)
except IOError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'读取连接配置出错,请检查'
self.onLog(log)
return
# 解析json文件
setting = json.load(f)
try:
accessKey = str(setting['accessKey'])
secretKey = str(setting['secretKey'])
interval = setting['interval']
debug = setting['debug']
useAccountID = str(setting['accountID'])
except KeyError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'连接配置缺少字段,请检查'
self.onLog(log)
return
# 设置账户ID
self.tradeApi.setAccountID( useAccountID)
# 初始化接口
self.tradeApi.init(accessKey, secretKey)
self.writeLog(u'交易接口初始化成功')
#self.dataApi.connect(interval, debug)
self.dataApi.connect()
self.writeLog(u'行情接口初始化成功')
# 启动查询
self.initQuery()
self.startQuery()
#----------------------------------------------------------------------
def writeLog(self, content):
"""发出日志"""
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = content
self.onLog(log)
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅行情,自动订阅全部行情,无需实现"""
pass
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
"""发单"""
return self.tradeApi.sendOrder(orderReq)
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq):
"""撤单"""
return self.tradeApi.cancel(cancelOrderReq)
#----------------------------------------------------------------------
def qryAccount(self):
"""查询账户资金"""
pass
#----------------------------------------------------------------------
def qryPosition(self):
"""查询持仓"""
pass
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.tradeApi.exit()
self.dataApi.exit()
#----------------------------------------------------------------------
def initQuery(self):
"""初始化连续查询"""
if self.qryEnabled:
self.qryFunctionList = [self.tradeApi.get_balance , self.tradeApi.list_orders]
#self.qryFunctionList = [self.tradeApi.queryWorkingOrders, self.tradeApi.queryAccount]
self.startQuery()
#----------------------------------------------------------------------
def query(self, event):
"""注册到事件处理引擎上的查询函数"""
for function in self.qryFunctionList:
function()
#----------------------------------------------------------------------
def startQuery(self):
"""启动连续查询"""
self.eventEngine.register(EVENT_TIMER, self.query)
#----------------------------------------------------------------------
def onListOrder(self, data):
print data
#----------------------------------------------------------------------
def setQryEnabled(self, qryEnabled):
"""设置是否要启动循环查询"""
self.qryEnabled = qryEnabled
class CoincheckTradeApi(vncoincheck.TradeApi):
def __init__(self, gateway):
super(CoincheckTradeApi , self).__init__()
self.gateway = gateway
self.gatewayName = gateway.gatewayName
self.accountID = "COINCHECK"
self.DEBUG = False
self.localID = 0 # 本地委托号
self.localSystemDict = {} # key:localID, value:systemID
self.systemLocalDict = {} # key:systemID, value:localID
self.workingOrderDict = {} # key:localID, value:order
self.reqLocalDict = {} # key:reqID, value:localID
self.cancelDict = {} # key:localID, value:cancelOrderReq
self.tradedVolumeDict = {} # key:localID, value:volume ,已经交易成功的数量
self.tradeID = 0 # 本地成交号
#--------------
def setAccountID(self, useAccountID):
self.accountID = useAccountID
#----------------------------------------------------------------------
def onError(self, method ,data):
print method , data
#
'''
"return" :
{u'lending_leverage': u'5.0', u'success': True, u'maker_fee': u'0.0', u'email': u'liyi.riki.thomas@g
mail.com', u'bitcoin_address': u'1Q73J2e46TrBv9cRCtfgcszqEcwNDsei53', u'taker_fee': u'0.0', u'identi
ty_status': u'identity_verified', u'id': 1007549}
'''
def onGet_info(self, data, req, reqID):
"""用户信息"""
print data
'''
{u'zec': u'0', u'rep_debt': u'0.0', u'xem': u'0', u'lsk': u'0', u'rep_lend_in_use': u'0.0', u'ltc_de
bt': u'0.0', u'xmr_reserved': u'0.0', u'cny': u'0', u'btc_reserved': u'0.0', u'dao_reserved': u'0.0'
, u'ltc_lent': u'0.0', u'dao_lend_in_use': u'0.0', u'xrp_reserved': u'0.0', u'zec_debt': u'0.0', u'b
ch_lent': u'0.0', u'dao_debt': u'0.0', u'xmr': u'0', u'rep_reserved': u'0.0', u'dao': u'0', u'xem_le
nd_in_use': u'0.0', u'fct_lent': u'0.0', u'jpy_reserved': u'0.0', u'success': True, u'fct_reserved':
u'0.0', u'xem_lent': u'0.0', u'rep_lent': u'0.0', u'eth_lend_in_use': u'0.0', u'btc': u'0', u'usd_l
end_in_use': u'0.0', u'zec_lent': u'0.0', u'rep': u'0', u'xmr_debt': u'0.0', u'bch_lend_in_use': u'0
.0', u'xrp_debt': u'0.0', u'etc_lend_in_use': u'0.0', u'dash_reserved': u'0.0', u'dash_lent': u'0.0'
, u'dash_debt': u'0.0', u'jpy_lend_in_use': u'0.0', u'lsk_lend_in_use': u'0.0', u'eth_lent': u'0.0',
u'ltc': u'0', u'etc': u'0', u'ltc_lend_in_use': u'0.0', u'eth': u'0', u'usd_debt': u'0.0', u'ltc_re
served': u'0.0', u'cny_reserved': u'0.0', u'xem_debt': u'0.0', u'eth_reserved': u'0.0', u'zec_reserv
ed': u'0.0', u'usd': u'0', u'cny_lend_in_use': u'0.0', u'lsk_debt': u'0.0', u'xmr_lend_in_use': u'0.
0', u'dash_lend_in_use': u'0.0', u'xrp_lent': u'0.0', u'bch_reserved': u'0.0', u'xmr_lent': u'0.0',
u'bch_debt': u'0.0', u'bch': u'0', u'jpy': u'0', u'fct_debt': u'0.0', u'btc_debt': u'0.0', u'usd_len
t': u'0.0', u'btc_lent': u'0.0', u'lsk_reserved': u'0.0', u'etc_debt': u'0.0', u'jpy_lent': u'0.0',
u'dash': u'0', u'cny_debt': u'0.0', u'xrp_lend_in_use': u'0.0', u'xem_reserved': u'0.0', u'dao_lent'
: u'0.0', u'lsk_lent': u'0.0', u'etc_lent': u'0.0', u'jpy_debt': u'0.0', u'xrp': u'0', u'fct': u'0',
u'etc_reserved': u'0.0', u'usd_reserved': u'0.0', u'fct_lend_in_use': u'0.0', u'btc_lend_in_use': u
'0.0', u'zec_lend_in_use': u'0.0', u'eth_debt': u'0.0', u'cny_lent': u'0.0'}
'''
def onGet_balance(self, data, req, reqID):
if data["success"] == 0:
print "Error in onGet_balance"
print data
else:
account = VtAccountData()
account.gatewayName = self.gatewayName
account.accountID = self.accountID
account.vtAccountID = '.'.join([ self.gatewayName , self.accountID])
account.balance = float(data['jpy'])
account.balance = float(data['jpy'])
account.available = float(data['jpy'])
account.margin = 1.0
account.closeProfit = 0.0
account.positionProfit = 0.0
account.commission = 0.0
account.now_has_hands = float(data['jpy'])
self.gateway.onAccount(account)
for symbol in ['btc']:
posObj = VtPositionData()
posObj.gatewayName = self.gatewayName
posObj.symbol = symbol + "_jpy." + EXCHANGE_COINCHECK
posObj.exchange = EXCHANGE_COINCHECK
posObj.vtSymbol = posObj.symbol
posObj.direction = DIRECTION_LONG
posObj.vtPositionName = '.'.join( [posObj.vtSymbol, posObj.direction])
posObj.ydPosition = float(data[symbol])
posObj.position = float(data[symbol]) + float(data[symbol + "_reserved"])
posObj.frozen = float(data[symbol + "_reserved"])
posObj.positionProfit = 0
self.gateway.onPosition(posObj)
'''
发送系统委托
'''
def sendOrder(self, req):
"""发送委托"""
# 检查是否填入了价格,禁止市价委托
if req.priceType != PRICETYPE_LIMITPRICE:
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorMsg = u'Coincheck接口仅支持限价单'
err.errorTime = datetime.now().strftime('%H:%M:%S')
self.gateway.onError(err)
return None
symbol = req.vtSymbol
if req.direction == DIRECTION_LONG:
reqID = self.buy_btc_jpy( rate = req.price , amount = req.volume )
else:
reqID = self.sell_btc_jpy( rate = req.price , amount = req.volume )
self.localID += 1
localID = str(self.localID)
self.reqLocalDict[reqID] = localID
# 推送委托信息
order = VtOrderData()
order.gatewayName = self.gatewayName
order.symbol = req.symbol
order.exchange = EXCHANGE_COINCHECK
order.vtSymbol = order.symbol
order.orderID = localID
order.vtOrderID = '.'.join([order.orderID, order.gatewayName])
order.direction = req.direction
if req.direction == DIRECTION_LONG:
order.offset = OFFSET_OPEN
else:
order.offset = OFFSET_CLOSE
order.price = req.price
order.volume = req.volume
order.orderTime = datetime.now().strftime('%H:%M:%S')
order.status = STATUS_UNKNOWN
self.workingOrderDict[localID] = order
self.gateway.onOrder(order)
# 返回委托号
return order.vtOrderID
'''
{u'market_buy_amount': None, u'order_type': u'buy', u'success': True, u'created_at': u'2017-10-16T13
:53:01.678Z', u'rate': u'100.0', u'amount': u'0.005', u'pair': u'btc_jpy', u'stop_loss_rate': None,
u'id': 324141928}
'''
def onBuy_btc(self, data, req, reqID):
# print "onBuy_btc"
# print data
if data["success"] == 0:
print "Error in onBuy_btc"
print data
else:
localID = self.reqLocalDict[reqID]
systemID = data['id']
self.localSystemDict[localID] = systemID
self.systemLocalDict[systemID] = localID
# 撤单
if localID in self.cancelDict:
req = self.cancelDict[localID]
self.cancel(req)
del self.cancelDict[localID]
# 推送委托信息
order = self.workingOrderDict[localID]
if data['success'] != 0:
order.status = STATUS_NOTTRADED
self.tradedVolumeDict[localID] = 0.0
self.gateway.onOrder(order)
def onSell_btc(self, data, req, reqID):
# print "onSell_btc"
# print data
"""卖出回调"""
if data["success"] == 0:
print "Error in onSell_btc"
else:
localID = self.reqLocalDict[reqID]
systemID = data['id']
self.localSystemDict[localID] = systemID
self.systemLocalDict[systemID] = localID
# 撤单
if localID in self.cancelDict:
req = self.cancelDict[localID]
self.cancel(req)
del self.cancelDict[localID]
# 推送委托信息
order = self.workingOrderDict[localID]
if data['success'] != 0:
order.status = STATUS_NOTTRADED
self.tradedVolumeDict[localID] = 0.0
self.gateway.onOrder(order)
'''
{u'orders': [{u'order_type': u'buy', u'created_at': u'2017-10-16T13:51:41.000Z', u'pending_market_bu
y_amount': None, u'rate': u'200.0', u'pair': u'btc_jpy', u'stop_loss_rate': None, u'id': 324139122,
u'pending_amount': u'0.005'}, {u'order_type': u'buy', u'created_at': u'2017-10-16T13:53:01.000Z', u'
pending_market_buy_amount': None, u'rate': u'100.0', u'pair': u'btc_jpy', u'stop_loss_rate': None, u
'id': 324141928, u'pending_amount': u'0.005'}], u'success': True}
只显示 未结算的 订单 如果订单被结算了说明已经成交了
'''
def onList_order(self, data, req, reqID):
# print "onList_order"
# self.gateway.onListOrder( data)
if data["success"] == 0:
pass
else:
orders = data["orders"]
now_datetime = datetime.now()
ten_seconds_before = now_datetime + timedelta(seconds=-10)
ten_seconds_str = (ten_seconds_before.strftime("%Y-%m-%dT%H:%M:%S.%f"))[:-3] + "Z"
stile_live_order_system_id = [ x["id"] for x in orders]
#print "stile_live_order_system_id", stile_live_order_system_id
local_system_dict_keys = self.systemLocalDict.keys()
# 对系统中有的订单,进行
for bef_system_id in local_system_dict_keys:
if bef_system_id not in stile_live_order_system_id:
# 说明这个单子成交完毕了!
# 或者就是取消了
localID = self.systemLocalDict[bef_system_id]
order = self.workingOrderDict.get(localID, None)
if order != None:
bef_has_volume = self.tradedVolumeDict.get(localID , 0.0)
newTradeVolume = order.volume - bef_has_volume
trade = VtTradeData()
trade.gatewayName = self.gatewayName
trade.symbol = order.symbol
trade.vtSymbol = order.vtSymbol
self.tradeID += 1
trade.tradeID = str(self.tradeID)
trade.vtTradeID = '.'.join([trade.tradeID, trade.gatewayName])
trade.orderID = order.orderID
trade.vtOrderID = order.vtOrderID
trade.volume = newTradeVolume
trade.price = order.price
trade.direction = order.direction
trade.offset = order.offset
trade.exchange = order.exchange
trade.tradeTime = datetime.now().strftime('%H:%M:%S')
self.gateway.onTrade(trade)
order.status = STATUS_ALLTRADED
self.gateway.onOrder(order)
del self.tradedVolumeDict[localID]
del self.systemLocalDict[bef_system_id]
del self.workingOrderDict[localID]
for d in orders:
coinID = d["id"]
if coinID in local_system_dict_keys:
localID = self.systemLocalDict[coinID]
order = self.workingOrderDict.get(localID, None)
if order != None:
bef_has_volume = self.tradedVolumeDict.get(localID , 0.0)
has_traded_volume = d["pending_market_buy_amount"]
if has_traded_volume == None:
has_traded_volume = 0.0
newTradeVolume = float(has_traded_volume) - float(bef_has_volume)
if newTradeVolume > 0.00000001:
trade = VtTradeData()
trade.gatewayName = self.gatewayName
trade.symbol = order.symbol
trade.vtSymbol = order.vtSymbol
self.tradeID += 1
trade.tradeID = str(self.tradeID)
trade.vtTradeID = '.'.join([trade.tradeID, trade.gatewayName])
trade.orderID = order.orderID
trade.vtOrderID = order.vtOrderID
trade.volume = newTradeVolume
trade.price = order.price
trade.direction = order.direction
trade.offset = order.offset
trade.exchange = order.exchange
trade.tradeTime = datetime.now().strftime('%H:%M:%S')
self.gateway.onTrade(trade)
order.status = STATUS_PARTTRADED
self.gateway.onOrder(order)
else:
# 说明这是一个 不知道 哪里来的订单
# 推送委托信息
# 订单有两种可能
# 1、人工发的单
# 2、前面取消失败的单 # 总有些订单是取消失败的 , 如果出现了,那么就取消掉这些交易
# 所以对于订单进行判断如果订单时间超过10秒 那么取消掉
if order.orderTime < ten_seconds_str :
# 判断为需要取消的单子
self.cancel_orders( coinID )
else:
self.localID += 1
localID = str(self.localID)
symbol_pair = d['pair'] # btc_jpy
order = VtOrderData()
order.gatewayName = self.gatewayName
order.symbol = symbol_pair + "." + self.gatewayName
order.exchange = EXCHANGE_COINCHECK
order.vtSymbol = order.symbol
order.orderID = localID
order.vtOrderID = '.'.join(["mistake", order.gatewayName])
order.direction = DIRECTION_LONG
order.offset = OFFSET_OPEN
order.price = float(d["rate"])
order.volume = float(d["pending_amount"])
order.orderTime = d["created_at"]
order.status = STATUS_MISTAKE
self.workingOrderDict[localID] = order
self.systemLocalDict[coinID] = localID
self.localSystemDict[localID] = coinID
self.gateway.onOrder(order)
'''
{
"success": true,
"id": 12345
}
'''
def onCancel_orders(self, data, req, reqID):
# self.gateway.onCancelOrder( data)
if data['success'] != 0:
systemID = data["id"]
localID = self.systemLocalDict[systemID]
order = self.workingOrderDict[localID]
order.status = STATUS_CANCELLED
del self.workingOrderDict[localID]
del self.systemLocalDict[systemID]
del self.localSystemDict[localID]
self.gateway.onOrder(order)
def onHistory_orders(self, data, req, reqID):
print data
def cancel(self, req):
localID = req.orderID
if localID in self.localSystemDict:
systemID = self.localSystemDict[localID]
self.cancel_orders( systemID )
else:
self.cancelDict[localID] = req
class CoincheckSocketDataApi(vncoincheck.DataApiSocket):
"""基于websocket的TICK数据获得对象"""
#----------------------------------------------------------------------
def __init__(self, gateway):
super(CoincheckSocketDataApi, self).__init__()
self.market = 'jpy'
self.gateway = gateway
self.gatewayName = gateway.gatewayName
self.tickDict = {} # key:symbol, value:tick
self.period_flag = False
def connect(self ):
super(CoincheckSocketDataApi, self).connect( COINCHECK_HOSTS)
def onOrderbooks(self, data):
symbol = SYMBOL_BTCJPY
if symbol not in self.tickDict:
tick = VtTickData()
tick.gatewayName = self.gatewayName
tick.exchange = EXCHANGE_COINCHECK
tick.symbol = '.'.join([symbol, tick.exchange])
tick.vtSymbol = '.'.join([symbol, tick.exchange])
self.tickDict[symbol] = tick
else:
tick = self.tickDict[symbol]
data = json.loads(data)
load_symbol , dic = data
if load_symbol == symbol:
bids = dic["bids"]
asks = dic["asks"]
bids = [ (float(x[0]) , float(x[1])) for x in bids ]
asks = [ (float(x[0]) , float(x[1])) for x in asks ]
tick.bidPrice1, tick.bidVolume1 = [0 , 0]
tick.bidPrice2, tick.bidVolume2 = [0 , 0]
tick.bidPrice3, tick.bidVolume3 = [0 , 0]
tick.bidPrice4, tick.bidVolume4 = [0 , 0]
tick.bidPrice5, tick.bidVolume5 = [0 , 0]
tick.askPrice1, tick.askVolume1 = [0 , 0]
tick.askPrice2, tick.askVolume2 = [0 , 0]
tick.askPrice3, tick.askVolume3 = [0 , 0]
tick.askPrice4, tick.askVolume4 = [0 , 0]
tick.askPrice5, tick.askVolume5 = [0 , 0]
try:
tick.bids = bids
tick.asks = asks
tick.bidPrice1, tick.bidVolume1 = bids[0]
tick.bidPrice2, tick.bidVolume2 = bids[1]
tick.bidPrice3, tick.bidVolume3 = bids[2]
tick.bidPrice4, tick.bidVolume4 = bids[3]
tick.bidPrice5, tick.bidVolume5 = bids[4]
except Exception,ex:
pass
try:
tick.askPrice1, tick.askVolume1 = asks[0]
tick.askPrice2, tick.askVolume2 = asks[1]
tick.askPrice3, tick.askVolume3 = asks[2]
tick.askPrice4, tick.askVolume4 = asks[3]
tick.askPrice5, tick.askVolume5 = asks[4]
except Exception,ex:
pass
now = datetime.now()
tick.time = now.strftime('%H:%M:%S')
tick.date = now.strftime('%Y%m%d')
tick.datetime = now
self.gateway.onTick(tick)
self.period_flag = False
def onTrade(self , data):
orderId, symbol , price , volume , direction = data
price = float(price)
volume = float(volume)
if symbol not in self.tickDict:
tick = VtTickData()
tick.gatewayName = self.gatewayName
tick.exchange = EXCHANGE_COINCHECK
tick.symbol = '.'.join([symbol, tick.exchange])
tick.vtSymbol = '.'.join([symbol, tick.exchange])
tick.volume = 0
self.tickDict[symbol] = tick
else:
tick = self.tickDict[symbol]
if self.period_flag == False:
self.period_flag = True
tick.highPrice = price
tick.lowPrice = price
tick.lastPrice = price
else:
tick.highPrice = max(tick.highPrice , price)
tick.lowPrice = min(tick.lowPrice , price)
tick.lastPrice = price
tick.volume += volume
def onMessage(self, ws , evt):
if evt:
data = json.loads(evt)
cclen = len(data)
if cclen == 2:
self.onOrderbooks( evt)
elif cclen == 5:
self.onTrade(data)
class CoincheckDataApi(vncoincheck.DataApi):
#----------------------------------------------------------------------
def __init__(self, gateway):
"""Constructor"""
super(CoincheckDataApi, self).__init__()
self.market = 'jpy'
self.gateway = gateway
self.gatewayName = gateway.gatewayName
self.tickDict = {} # key:symbol, value:tick
def connect(self, interval , market , debug = False):
self.init(interval , debug)
# 订阅行情并推送合约信息
if self.market == 'jpy':
self.subscribeTick(SYMBOL_BTCJPY)
self.subscribeOrderbooks(SYMBOL_BTCJPY)
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = SYMBOL_BTCJPY
contract.exchange = EXCHANGE_COINCHECK
contract.vtSymbol = '.'.join([contract.symbol, contract.exchange])
contract.name = u'日元coincheck现货BTC'
contract.size = 0.0001
contract.priceTick = 0.0001
contract.productClass = PRODUCT_SPOT
self.gateway.onContract(contract)
#----------------------------------------------------------------------
def onTick(self, data):
"""实时成交推送"""
symbol = SYMBOL_BTCJPY
if symbol not in self.tickDict:
tick = VtTickData()
tick.gatewayName = self.gatewayName
tick.exchange = EXCHANGE_COINCHECK
tick.symbol = '.'.join([symbol, tick.exchange])
tick.vtSymbol = '.'.join([symbol, tick.exchange])
self.tickDict[symbol] = tick
else:
tick = self.tickDict[symbol]
tick.highPrice = float(data['high'])
tick.lowPrice = float(data['low'])
tick.lastPrice = float(data['last'])
tick.volume = float(data['volume'])
now = datetime.now()
tick.time = now.strftime('%H:%M:%S')
tick.date = now.strftime('%Y%m%d')
tick.datetime = now
#----------------------------------------------------------------------
def onTrades(self, data):
"""实时成交推送"""
print data
#----------------------------------------------------------------------
def onOrderbooks(self, data):
"""实时成交推送"""
symbol = SYMBOL_BTCJPY
bids = data["bids"]
asks = data["asks"]
if symbol not in self.tickDict:
tick = VtTickData()
tick.gatewayName = self.gatewayName
tick.symbol = symbol
tick.exchange = EXCHANGE_COINCHECK
tick.vtSymbol = '.'.join([tick.symbol, tick.exchange])
self.tickDict[symbol] = tick
else:
tick = self.tickDict[symbol]
bids = [ (float(x[0]) , float(x[1])) for x in bids ]
asks = [ (float(x[0]) , float(x[1])) for x in asks ]
tick.bidPrice1, tick.bidVolume1 = bids[0]
tick.bidPrice2, tick.bidVolume2 = bids[1]
tick.bidPrice3, tick.bidVolume3 = bids[2]
tick.bidPrice4, tick.bidVolume4 = bids[3]
tick.bidPrice5, tick.bidVolume5 = bids[4]
tick.askPrice1, tick.askVolume1 = asks[0]
tick.askPrice2, tick.askVolume2 = asks[1]
tick.askPrice3, tick.askVolume3 = asks[2]
tick.askPrice4, tick.askVolume4 = asks[3]
tick.askPrice5, tick.askVolume5 = asks[4]
now = datetime.now()
tick.time = now.strftime('%H:%M:%S')
tick.date = now.strftime('%Y%m%d')
tick.datetime = now
self.gateway.onTick(tick)

View File

@ -0,0 +1,6 @@
{
"apiKey": "你的apiKey",
"secretKey": "你的secretKey",
"trace": false,
"leverage": 10
}

View File

@ -0,0 +1,11 @@
# encoding: UTF-8
from vnpy.trader import vtConstant
from okexGateway import okexGateway
gatewayClass = okexGateway
gatewayName = 'OKEX'
gatewayDisplayName = u'OKEX'
gatewayType = vtConstant.GATEWAYTYPE_BTC
gatewayQryEnabled = True

View File

@ -0,0 +1,934 @@
# encoding: UTF-8
'''
vn.okex的gateway接入
注意
1. 前仅支持USD 现货交易以及usd的期货交易
'''
import os
import json
from datetime import datetime
from time import sleep
from copy import copy
from threading import Condition
from Queue import Queue
from threading import Thread
from time import sleep
from vnpy.api.okex import OKEX_Sub_Spot_Api , OKEX_Contract_Api , okex_all_symbol_pairs , okex_all_contract_symbol , okex_all_symbol_type
from vnpy.trader.vtGateway import *
from vnpy.trader.vtFunction import getJsonPath
# 价格类型映射
# 买卖类型: 限价单buy/sell 市价单buy_market/sell_market
priceTypeMap = {}
priceTypeMap['buy'] = (DIRECTION_LONG, PRICETYPE_LIMITPRICE)
priceTypeMap['buy_market'] = (DIRECTION_LONG, PRICETYPE_MARKETPRICE)
priceTypeMap['sell'] = (DIRECTION_SHORT, PRICETYPE_LIMITPRICE)
priceTypeMap['sell_market'] = (DIRECTION_SHORT, PRICETYPE_MARKETPRICE)
priceTypeMapReverse = {v: k for k, v in priceTypeMap.items()}
# 委托状态印射
statusMap = {}
statusMap[-1] = STATUS_CANCELLED
statusMap[0] = STATUS_NOTTRADED
statusMap[1] = STATUS_PARTTRADED
statusMap[2] = STATUS_ALLTRADED
statusMap[4] = STATUS_UNKNOWN
########################################################################
class okexGateway(VtGateway):
#----------------------------------------------------------------------
def __init__(self, eventEngine, gatewayName='OKEX'):
"""Constructor"""
super(okexGateway, self).__init__(eventEngine, gatewayName)
self.api_spot = Api_Spot(self)
# self.api_contract = Api_contract(self)
self.leverage = 0
self.connected = False
self.fileName = self.gatewayName + '_connect.json'
self.filePath = getJsonPath(self.fileName, __file__)
#----------------------------------------------------------------------
def connect(self):
"""连接"""
# 载入json文件
try:
f = file(self.filePath)
except IOError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'读取连接配置出错,请检查'
self.onLog(log)
return
# 解析json文件
setting = json.load(f)
try:
apiKey = str(setting['apiKey'])
secretKey = str(setting['secretKey'])
trace = setting['trace']
leverage = setting['leverage']
except KeyError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'连接配置缺少字段,请检查'
self.onLog(log)
return
# 初始化接口
self.leverage = leverage
self.api_spot.active = True
self.api_spot.connect_Subpot( apiKey, secretKey, trace)
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'接口初始化成功'
self.onLog(log)
# 启动查询
# self.initQuery()
# self.startQuery()
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅行情"""
pass
#----------------------------------------------------------------------
def sendOrder(self, orderReq):
"""发单"""
return self.api_spot.spotSendOrder(orderReq)
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq):
"""撤单"""
self.api_spot.spotCancel(cancelOrderReq)
#----------------------------------------------------------------------
def qryAccount(self):
"""查询账户资金"""
self.api_spot.spotUserInfo()
#----------------------------------------------------------------------
def qryOrderInfo(self):
self.api_spot.spotAllOrders()
#----------------------------------------------------------------------
def qryPosition(self):
"""查询持仓"""
pass
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.api_spot.active = False
self.api_spot.close()
#----------------------------------------------------------------------
def initQuery(self):
"""初始化连续查询"""
if self.qryEnabled:
# 需要循环的查询函数列表
#self.qryFunctionList = [self.qryAccount , self.qryOrderInfo]
self.qryFunctionList = [ self.qryOrderInfo]
#self.qryFunctionList = []
self.qryCount = 0 # 查询触发倒计时
self.qryTrigger = 2 # 查询触发点
self.qryNextFunction = 0 # 上次运行的查询函数索引
self.startQuery()
#----------------------------------------------------------------------
def query(self, event):
"""注册到事件处理引擎上的查询函数"""
self.qryCount += 1
if self.qryCount > self.qryTrigger:
# 清空倒计时
self.qryCount = 0
# 执行查询函数
function = self.qryFunctionList[self.qryNextFunction]
function()
# 计算下次查询函数的索引如果超过了列表长度则重新设为0
self.qryNextFunction += 1
if self.qryNextFunction == len(self.qryFunctionList):
self.qryNextFunction = 0
#----------------------------------------------------------------------
def startQuery(self):
"""启动连续查询"""
self.eventEngine.register(EVENT_TIMER, self.query)
#----------------------------------------------------------------------
def setQryEnabled(self, qryEnabled):
"""设置是否要启动循环查询"""
self.qryEnabled = qryEnabled
########################################################################
class Api_Spot(OKEX_Sub_Spot_Api):
"""okex的API实现"""
#----------------------------------------------------------------------
def __init__(self, gateway):
"""Constructor"""
super(Api_Spot, self).__init__()
self.gateway = gateway # gateway对象
self.gatewayName = gateway.gatewayName # gateway对象名称
self.active = False # 若为True则会在断线后自动重连
self.cbDict = {}
self.tickDict = {}
self.orderDict = {}
self.channelSymbolMap = {}
self.localNo = 0 # 本地委托号
self.localNoQueue = Queue() # 未收到系统委托号的本地委托号队列
self.localNoDict = {} # key为本地委托号value为系统委托号
self.orderIdDict = {} # key为系统委托号value为本地委托号
self.cancelDict = {} # key为本地委托号value为撤单请求
self.recordOrderId_BefVolume = {} # 记录的之前处理的量
self.cache_some_order = {}
self.tradeID = 0
self.initCallback()
'''
登录后每次订单执行撤销后又这样的 推送不知道干啥的先过滤掉了
{u'binary': 1, u'product': u'spot', u'type': u'order', u'base': u'etc'
, u'quote': u'usdt', u'data': {u'status': -1, u'orderType': 0, u'price': u'25.4050', u'modifyTime':
1512288275000L, u'userId': 6548935, u'createTime': 1512288275000L, u'source': 0, u'quoteSize': u'0.0
0000000', u'executedValue': u'0.00000000', u'id': 62877909, u'filledSize': u'0.00000000', u'side': 1
, u'size': u'0.01000000'}}
'''
#----------------------------------------------------------------------
def onMessage(self, ws, evt):
"""信息推送"""
# print evt
data = self.readData(evt)[0]
try:
channel = data['channel']
except Exception,ex:
channel = None
if channel == None:
return
# try:
if channel == "addChannel" and 'data' in data:
channel = data['data']["channel"]
if channel != "addChannel" and 'future' not in channel and channel != 'login':
# print channel
callback = self.cbDict[channel]
callback(data)
# if 'depth' not in channel and 'ticker' not in channel and 'deals' not in channel and 'userinfo' not in channel and 'future' not in channel:
# print data
# except Exception,ex:
# print "Error in callback cbDict ", channel
#print self.cbDict
#----------------------------------------------------------------------
def onError(self, ws, evt):
"""错误推送"""
error = VtErrorData()
error.gatewayName = self.gatewayName
error.errorMsg = str(evt)
self.gateway.onError(error)
#----------------------------------------------------------------------
def onError(self, data):
error = VtErrorData()
error.gatewayName = self.gatewayName
error.errorMsg = str(data["data"]["error_code"])
self.gateway.onError(error)
#----------------------------------------------------------------------
def onClose(self, ws):
"""接口断开"""
# 如果尚未连上,则忽略该次断开提示
if not self.gateway.connected:
return
self.gateway.connected = False
self.writeLog(u'服务器连接断开')
# 重新连接
if self.active:
def reconnect():
while not self.gateway.connected:
self.writeLog(u'等待10秒后重新连接')
sleep(10)
if not self.gateway.connected:
self.reconnect()
t = Thread(target=reconnect)
t.start()
#----------------------------------------------------------------------
def spotAllOrders(self):
for symbol in okex_all_symbol_pairs:
self.spotOrderInfo(symbol , '-1')
for orderId in self.orderIdDict.keys():
order = self.orderDict.get(orderId , None)
if order != None:
symbol_pair = (order.symbol.split('.'))[0]
self.spotOrderInfo(symbol_pair , orderId)
#----------------------------------------------------------------------
def onOpen(self, ws):
"""连接成功"""
self.gateway.connected = True
self.writeLog(u'服务器连接成功')
self.login()
# 连接后查询账户和委托数据
self.spotUserInfo()
for symbol_pair in okex_all_symbol_pairs:
self.spotOrderInfo(symbol_pair , '-1')
for symbol in okex_all_symbol_pairs:
self.subscribeSpotTicker(symbol)
self.subscribeSpotDepth5(symbol)
self.subscribeSpotDeals(symbol)
#Ticker数据
self.channelSymbolMap["ok_sub_spot_%s_ticker" % symbol] = symbol
#盘口的深度
self.channelSymbolMap["ok_sub_spot_%s_depth_5" % symbol] = symbol
#所有人的交易数据
self.channelSymbolMap["ok_sub_spot_%s_deals" % symbol] = symbol
contract = VtContractData()
contract.gatewayName = self.gatewayName
contract.symbol = symbol
contract.exchange = EXCHANGE_OKEX
contract.vtSymbol = '.'.join([contract.symbol, contract.exchange])
contract.name = u'OKEX现货%s' % symbol
contract.size = 0.00001
contract.priceTick = 0.00001
contract.productClass = PRODUCT_SPOT
self.gateway.onContract(contract)
'''
[{
"channel":"ok_sub_spot_bch_btc_deals",
"data":[["1001","2463.86","0.052","16:34:07","ask"]]
}]
'''
#----------------------------------------------------------------------
def onSpotSubDeals(self, data):
if 'data' not in data:
return
rawData = data["data"]
# print rawData
#----------------------------------------------------------------------
def writeLog(self, content):
"""快速记录日志"""
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = content
self.gateway.onLog(log)
#----------------------------------------------------------------------
def initCallback(self):
"""初始化回调函数"""
# USD_SPOT
for symbol_pair in okex_all_symbol_pairs:
self.cbDict["ok_sub_spot_%s_ticker" % symbol_pair] = self.onTicker
self.cbDict["ok_sub_spot_%s_depth_5" % symbol_pair] = self.onDepth
self.cbDict["ok_sub_spot_%s_deals" % symbol_pair] = self.onSpotSubDeals
self.cbDict["ok_sub_spot_%s_order" % symbol_pair] = self.onSpotSubOrder
self.cbDict["ok_sub_spot_%s_balance" % symbol_pair] = self.onSpotBalance
self.cbDict['ok_spot_userinfo'] = self.onSpotUserInfo
self.cbDict['ok_spot_orderinfo'] = self.onSpotOrderInfo
# 下面这两个好像废弃了
#self.cbDict['ok_sub_spot_userinfo'] = self.onSpotSubUserInfo
#self.cbDict['ok_sub_spot_trades'] = self.onSpotSubTrades
self.cbDict['ok_spot_order'] = self.onSpotOrder
self.cbDict['ok_spot_cancel_order'] = self.onSpotCancelOrder
'''
[
{
"binary": 0,
"channel": "ok_sub_spot_bch_btc_ticker",
"data": {
"high": "10000",
"vol": "185.03743858",
"last": "111",
"low": "0.00000001",
"buy": "115",
"change": "101",
"sell": "115",
"dayLow": "0.00000001",
"dayHigh": "10000",
"timestamp": 1500444626000
}
}
]
'''
#----------------------------------------------------------------------
def onTicker(self, data):
""""""
if 'data' not in data:
return
channel = data['channel']
if channel == 'addChannel':
return
try:
symbol = self.channelSymbolMap[channel]
if symbol not in self.tickDict:
tick = VtTickData()
tick.exchange = EXCHANGE_OKEX
tick.symbol = '.'.join([symbol, tick.exchange])
tick.vtSymbol = '.'.join([symbol, tick.exchange])
tick.gatewayName = self.gatewayName
self.tickDict[symbol] = tick
else:
tick = self.tickDict[symbol]
rawData = data['data']
tick.highPrice = float(rawData['high'])
tick.lowPrice = float(rawData['low'])
tick.lastPrice = float(rawData['last'])
tick.volume = float(rawData['vol'].replace(',', ''))
# tick.date, tick.time = self.generateDateTime(rawData['timestamp'])
# print "ticker", tick.date , tick.time
# newtick = copy(tick)
# self.gateway.onTick(newtick)
except Exception,ex:
print "Error in onTicker " , channel
#----------------------------------------------------------------------
def onDepth(self, data):
""""""
if 'data' not in data:
return
try:
channel = data['channel']
symbol = self.channelSymbolMap[channel]
except Exception,ex:
symbol = None
if symbol == None:
return
if symbol not in self.tickDict:
tick = VtTickData()
tick.symbol = symbol
tick.vtSymbol = symbol
tick.gatewayName = self.gatewayName
self.tickDict[symbol] = tick
else:
tick = self.tickDict[symbol]
if 'data' not in data:
return
rawData = data['data']
tick.bidPrice1, tick.bidVolume1 = rawData['bids'][0]
tick.bidPrice2, tick.bidVolume2 = rawData['bids'][1]
tick.bidPrice3, tick.bidVolume3 = rawData['bids'][2]
tick.bidPrice4, tick.bidVolume4 = rawData['bids'][3]
tick.bidPrice5, tick.bidVolume5 = rawData['bids'][4]
tick.askPrice1, tick.askVolume1 = rawData['asks'][-1]
tick.askPrice2, tick.askVolume2 = rawData['asks'][-2]
tick.askPrice3, tick.askVolume3 = rawData['asks'][-3]
tick.askPrice4, tick.askVolume4 = rawData['asks'][-4]
tick.askPrice5, tick.askVolume5 = rawData['asks'][-5]
tick.date, tick.time = self.generateDateTime(rawData['timestamp'])
# print "Depth", tick.date , tick.time
newtick = copy(tick)
self.gateway.onTick(newtick)
'''
[
{
"base": "bch",
"binary": 0,
"channel": "ok_sub_spot_bch_btc_balance",
"data": {
"info": {
"free": {
"btc": 5814.850605790395
},
"freezed": {
"btc": 7341
}
}
},
"product": "spot",
"quote": "btc",
"type": "order"
}
]
'''
def onSpotBalance(self, data):
"""交易发生金额变动之后会触发这个函数"""
# print data
rawData = data['data']
info = rawData['info']
for symbol in info["freezed"].keys():
pos = VtPositionData()
pos.gatewayName = self.gatewayName
pos.symbol = symbol
pos.vtSymbol = symbol
pos.direction = DIRECTION_NET
pos.frozen = float(info['freezed'][symbol])
pos.position = pos.frozen + float(info['free'][symbol])
self.gateway.onPosition(pos)
'''
[{"binary":0,"channel":"ok_spot_userinfo","data":{"result":true,"info":{"funds":{"borrow":{"dgd":"0"
,"bcd":"0","bcc":"0","bch":"0","hsr":"0","xuc":"0","omg":"0","eos":"0","qtum":"0","btc":"0","act":"0
","bcs":"0","btg":"0","etc":"0","eth":"0","usdt":"0","gas":"0","zec":"0","neo":"0","ltc":"0","bt1":"
0","bt2":"0","iota":"0","pay":"0","storj":"0","gnt":"0","snt":"0","dash":"0"},"free":{"dgd":"0","bcd
":"0","bcc":"0","bch":"0","hsr":"0","xuc":"3","omg":"0","eos":"0","qtum":"0","btc":"0.00266884258369
","act":"0","bcs":"0","btg":"0","etc":"7.9909635","eth":"0","usdt":"0","gas":"0","zec":"0","neo":"0"
,"ltc":"0","bt1":"0","bt2":"0","iota":"0","pay":"0","storj":"0","gnt":"0","snt":"0","dash":"0"},"fre
ezed":{"dgd":"0","bcd":"0","bcc":"0","bch":"0","hsr":"0","xuc":"0","omg":"0","eos":"0","qtum":"0","b
tc":"0","act":"0","bcs":"0","btg":"0","etc":"0","eth":"0","usdt":"0","gas":"0","zec":"0","neo":"0","
ltc":"0","bt1":"0","bt2":"0","iota":"0","pay":"0","storj":"0","gnt":"0","snt":"0","dash":"0"}}}}}]
{u'binary': 0, u'data': {u'info': {u'funds': {u'freezed': {u'zec': u'0', u'usdt': u'0', u'btg': u'0'
, u'btc': u'0', u'bt1': u'0', u'neo': u'0', u'pay': u'0', u'storj': u'0', u'iota': u'0', u'omg': u'0
', u'dgd': u'0', u'bt2': u'0', u'xuc': u'0', u'gas': u'0', u'hsr': u'0', u'snt': u'0', u'dash': u'0'
, u'bch': u'0', u'gnt': u'0', u'bcd': u'0', u'qtum': u'0', u'bcc': u'0', u'eos': u'0', u'etc': u'0',
u'act': u'0', u'eth': u'0', u'ltc': u'0', u'bcs': u'0'}, u'borrow': {u'zec': u'0', u'usdt': u'0', u
'btg': u'0', u'btc': u'0', u'bt1': u'0', u'neo': u'0', u'pay': u'0', u'storj': u'0', u'iota': u'0',
u'omg': u'0', u'dgd': u'0', u'bt2': u'0', u'xuc': u'0', u'gas': u'0', u'hsr': u'0', u'snt': u'0', u'
dash': u'0', u'bch': u'0', u'gnt': u'0', u'bcd': u'0', u'qtum': u'0', u'bcc': u'0', u'eos': u'0', u'
etc': u'0', u'act': u'0', u'eth': u'0', u'ltc': u'0', u'bcs': u'0'}, u'free': {u'zec': u'0', u'usdt'
: u'0', u'btg': u'0', u'btc': u'0.00266884258369', u'bt1': u'0', u'neo': u'0', u'pay': u'0', u'storj
': u'0', u'iota': u'0', u'omg': u'0', u'dgd': u'0', u'bt2': u'0', u'xuc': u'3', u'gas': u'0', u'hsr'
: u'0', u'snt': u'0', u'dash': u'0', u'bch': u'0', u'gnt': u'0', u'bcd': u'0', u'qtum': u'0', u'bcc'
: u'0', u'eos': u'0', u'etc': u'7.9909635', u'act': u'0', u'eth': u'0', u'ltc': u'0', u'bcs': u'0'}}
}, u'result': True}, u'channel': u'ok_spot_userinfo'}
'''
#----------------------------------------------------------------------
def onSpotUserInfo(self, data):
"""现货账户资金推送"""
rawData = data['data']
info = rawData['info']
funds = rawData['info']['funds']
# 持仓信息
#for symbol in ['btc', 'ltc','eth', self.currency]:
for symbol in okex_all_symbol_type:
if symbol in funds['free']:
pos = VtPositionData()
pos.gatewayName = self.gatewayName
pos.symbol = symbol
pos.vtSymbol = symbol
pos.vtPositionName = symbol
pos.direction = DIRECTION_NET
pos.frozen = float(funds['freezed'][symbol])
pos.position = pos.frozen + float(funds['free'][symbol])
self.gateway.onPosition(pos)
# 账户资金
account = VtAccountData()
account.gatewayName = self.gatewayName
account.accountID = self.gatewayName
account.vtAccountID = account.accountID
account.balance = 0.0
#account.balance = float(funds['asset']['net'])
self.gateway.onAccount(account)
#----------------------------------------------------------------------
# 这个 API 现在文档没找到。。 好像废弃了
def onSpotSubUserInfo(self, data):
"""现货账户资金推送"""
if 'data' not in data:
return
rawData = data['data']
info = rawData['info']
# 持仓信息
#for symbol in ['btc', 'ltc','eth', self.currency]:
for symbol in okex_all_symbol_type:
if symbol in info['free']:
pos = VtPositionData()
pos.gatewayName = self.gatewayName
pos.symbol = symbol
pos.vtSymbol = symbol
pos.vtPositionName = symbol
pos.direction = DIRECTION_NET
pos.frozen = float(info['freezed'][symbol])
pos.position = pos.frozen + float(info['free'][symbol])
self.gateway.onPosition(pos)
'''
交易数据
[
{
"base": "bch",
"binary": 0,
"channel": "ok_sub_spot_bch_btc_order",
"data": {
"symbol": "bch_btc",
"tradeAmount": "1.00000000",
"createdDate": "1504530228987",
"orderId": 6191,
"completedTradeAmount": "0.00000000",
"averagePrice": "0",
"tradePrice": "0.00000000",
"tradeType": "buy",
"status": 0,
"tradeUnitPrice": "113.00000000"
},
"product": "spot",
"quote": "btc",
"type": "balance"
}
]
{u'binary': 0, u'data': {u'orderId': 62870564, u'status': 0, u'tradeType': u'sell', u'tradeUnitPrice
': u'25.3500', u'symbol': u'etc_usdt', u'tradePrice': u'0.0000', u'createdDate': u'1512287172393', u
'averagePrice': u'0', u'tradeAmount': u'0.01000000', u'completedTradeAmount': u'0.00000000'}, u'chan
nel': u'ok_sub_spot_etc_usdt_order'}
'''
#----------------------------------------------------------------------
def onSpotSubOrder(self, data):
"""交易数据"""
if 'data' not in data:
return
rawData = data["data"]
# 本地和系统委托号
orderId = str(rawData['orderId'])
# 这时候出现None , 情况是 已经发出了单子,但是系统这里还没建立 索引
# 先这样返回试一下
# 因为 发完单订单变化是先推送的。。导致不清楚他的localID
# 现在的处理方式是, 先缓存这里的信息,等到出现了 localID再来处理这一段
localNo = self.orderIdDict.get(orderId , None)
if localNo == None:
arr = self.cache_some_order.get(orderId , None)
if arr == None:
arr = []
arr.append( data)
self.cache_some_order[orderId] = arr
else:
arr.append( data)
return
# 委托信息
if orderId not in self.orderDict:
order = VtOrderData()
order.gatewayName = self.gatewayName
order.symbol = '.'.join([rawData['symbol'] , EXCHANGE_OKEX])
#order.symbol = spotSymbolMap[rawData['symbol']]
order.vtSymbol = order.symbol
order.orderID = localNo
order.vtOrderID = '.'.join([self.gatewayName, order.orderID])
order.price = float(rawData['tradeUnitPrice'])
order.totalVolume = float(rawData['tradeAmount'])
order.direction, priceType = priceTypeMap[rawData['tradeType']]
self.orderDict[orderId] = order
else:
order = self.orderDict[orderId]
order.tradedVolume = float(rawData['completedTradeAmount'])
order.status = statusMap[rawData['status']]
self.gateway.onOrder(copy(order))
bef_volume = self.recordOrderId_BefVolume.get( orderId , 0.0 )
now_volume = float(rawData['completedTradeAmount']) - bef_volume
if now_volume > 0.000001:
trade = VtTradeData()
trade.gatewayName = self.gatewayName
trade.symbol = order.symbol
trade.vtSymbol = order.symbol
self.tradeID += 1
trade.tradeID = str(self.tradeID)
trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID])
trade.orderID = localNo
trade.vtOrderID = '.'.join([self.gatewayName, trade.orderID])
trade.price = float(rawData['tradeUnitPrice'])
trade.volume = float(now_volume)
trade.direction, priceType = priceTypeMap[rawData['tradeType']]
trade.tradeTime = datetime.now().strftime('%H:%M:%S')
self.gateway.onTrade(trade)
"""
原来的OK coin方式不过数据一直没有 所以换一种方式
# 成交信息
if 'sigTradeAmount' in rawData and float(rawData['sigTradeAmount'])>0:
trade = VtTradeData()
trade.gatewayName = self.gatewayName
trade.symbol = spotSymbolMap[rawData['symbol']]
trade.vtSymbol = order.symbol
trade.tradeID = str(rawData['id'])
trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID])
trade.orderID = localNo
trade.vtOrderID = '.'.join([self.gatewayName, trade.orderID])
trade.price = float(rawData['sigTradePrice'])
trade.volume = float(rawData['sigTradeAmount'])
trade.direction, priceType = priceTypeMap[rawData['tradeType']]
trade.tradeTime = datetime.now().strftime('%H:%M:%S')
self.gateway.onTrade(trade)
"""
'''
[
{
"binary": 0,
"channel": "ok_spot_orderinfo",
"data": {
"result": true,
"orders": [
{
"symbol": "bch_btc",
"amount": "0.10000000",
"price": "1.00000000",
"avg_price": 0,
"create_date": 1504529828000,
"type": "buy",
"deal_amount": 0,
"order_id": 6189,
"status": -1
}
]
}
}
]
'''
#----------------------------------------------------------------------
def onSpotOrderInfo(self, data):
"""委托信息查询回调"""
rawData = data['data']
for d in rawData['orders']:
self.localNo += 1
localNo = str(self.localNo)
orderId = str(d['order_id'])
self.localNoDict[localNo] = orderId
self.orderIdDict[orderId] = localNo
if orderId not in self.orderDict:
order = VtOrderData()
order.gatewayName = self.gatewayName
#order.symbol = spotSymbolMap[d['symbol']]
order.symbol = '.'.join([d["symbol"] , EXCHANGE_OKEX])
order.vtSymbol = order.symbol
order.orderID = localNo
order.vtOrderID = '.'.join([self.gatewayName, order.orderID])
order.price = d['price']
order.totalVolume = d['amount']
order.direction, priceType = priceTypeMap[d['type']]
self.orderDict[orderId] = order
else:
order = self.orderDict[orderId]
order.tradedVolume = d['deal_amount']
order.status = statusMap[d['status']]
self.gateway.onOrder(copy(order))
'''
[
{
"binary": 0,
"channel": "ok_spot_order",
"data": {
"result": true,
"order_id": 6189
}
}
]
'''
def onSpotOrder(self, data):
rawData = data['data']
orderId = str(rawData['order_id'])
# 尽管websocket接口的委托号返回是异步的但经过测试是
# 符合先发现回的规律因此这里通过queue获取之前发送的
# 本地委托号,并把它和推送的系统委托号进行映射
# localNo = self.orderIdDict.get(orderId , None)
# if localNo == None:
localNo = self.localNoQueue.get_nowait()
self.localNoDict[localNo] = orderId
self.orderIdDict[orderId] = localNo
# print orderId , self.cache_some_order
if orderId in self.cache_some_order.keys():
arr = self.cache_some_order[orderId]
for d in arr:
self.onSpotSubOrder(d)
# 处理完就删除掉这里
del self.cache_some_order[orderId]
# 检查是否有系统委托号返回前就发出的撤单请求,若有则进
# 行撤单操作
if localNo in self.cancelDict:
req = self.cancelDict[localNo]
self.spotCancel(req)
del self.cancelDict[localNo]
'''
[
{
"binary": 0,
"channel": "ok_spot_cancel_order",
"data": {
"result": true,
"order_id": "125433027"
}
}
]
'''
#----------------------------------------------------------------------
def onSpotCancelOrder(self, data):
"""撤单回报"""
if 'data' not in data:
return
if 'error' in data["data"].keys():
self.onError(data)
return
rawData = data['data']
orderId = str(rawData['order_id'])
localNo = self.orderIdDict[orderId]
if orderId not in self.orderDict:
order = VtOrderData()
order.gatewayName = self.gatewayName
order.symbol = ','.join([rawData['symbol'] , EXCHANGE_OKEX])
order.vtSymbol = order.symbol
order.orderID = localNo
order.vtOrderID = '.'.join([self.gatewayName, order.orderID])
self.orderDict[orderId] = order
else:
order = self.orderDict[orderId]
order.status = STATUS_CANCELLED
self.gateway.onOrder(order)
del self.orderDict[orderId]
del self.orderIdDict[orderId]
del self.localNoDict[localNo]
if orderId in self.cache_some_order.keys():
del self.cache_some_order[orderId]
#----------------------------------------------------------------------
def spotSendOrder(self, req):
"""发单"""
#symbol = spotSymbolMapReverse[req.symbol][:4]
symbol = (req.symbol.split('.'))[0]
type_ = priceTypeMapReverse[(req.direction, req.priceType)]
self.spotTrade(symbol, type_, str(req.price), str(req.volume))
# 本地委托号加1并将对应字符串保存到队列中返回基于本地委托号的vtOrderID
self.localNo += 1
self.localNoQueue.put(str(self.localNo))
vtOrderID = '.'.join([self.gatewayName, str(self.localNo)])
return vtOrderID
#----------------------------------------------------------------------
def spotCancel(self, req):
"""撤单"""
#symbol = spotSymbolMapReverse[req.symbol][:4]
symbol = (req.symbol.split('.'))[0]
localNo = req.orderID
if localNo in self.localNoDict:
orderID = self.localNoDict[localNo]
self.spotCancelOrder(symbol, orderID)
else:
# 如果在系统委托号返回前客户就发送了撤单请求,则保存
# 在cancelDict字典中等待返回后执行撤单任务
self.cancelDict[localNo] = req
#----------------------------------------------------------------------
def generateDateTime(self, s):
"""生成时间"""
dt = datetime.fromtimestamp(float(s)/1e3)
time = dt.strftime("%H:%M:%S.%f")
date = dt.strftime("%Y%m%d")
return date, time

View File

@ -85,6 +85,11 @@ EXCHANGE_OKCOIN = 'OKCOIN' # OKCOIN比特币交易所
EXCHANGE_HUOBI = 'HUOBI' # 火币比特币交易所
EXCHANGE_LHANG = 'LHANG' # 链行比特币交易所
EXCHANGE_ZB = 'ZB' # ZB 中国比特币交易所 (比特币中国)
EXCHANGE_OKEX = 'OKEX' # OKEX 中国比特币交易所 (okcoin)
EXCHANGE_ZAIF = "ZAIF" # ZAIF 日本比特币交易所
EXCHANGE_COINCHECK = "COINCHECK" # COINCHECK 日本比特币交易所
# 货币类型
CURRENCY_USD = 'USD' # 美元
CURRENCY_CNY = 'CNY' # 人民币

View File

@ -81,6 +81,11 @@ EXCHANGE_OKCOIN = 'OKCOIN' # OKCOIN比特币交易所
EXCHANGE_HUOBI = 'HUOBI' # 火币比特币交易所
EXCHANGE_LHANG = 'LHANG' # 链行比特币交易所
EXCHANGE_ZB = 'ZB' # ZB 中国比特币交易所 (比特币中国)
EXCHANGE_OKEX = 'OKEX' # OKEX 中国比特币交易所 (okcoin)
EXCHANGE_ZAIF = "ZAIF" # ZAIF 日本比特币交易所
EXCHANGE_COINCHECK = "COINCHECK" # COINCHECK 日本比特币交易所
# 货币类型
CURRENCY_USD = 'USD' # 美元
CURRENCY_CNY = 'CNY' # 人民币