[Add]初步完成更新后的OKEX接口
This commit is contained in:
parent
c28380a6ea
commit
762a838b3d
14
vnpy/api/okex/README.md
Normal file
14
vnpy/api/okex/README.md
Normal file
@ -0,0 +1,14 @@
|
||||
### 简介
|
||||
|
||||
OKEX的比特币交易接口,基于Websocket API开发,实现了以下功能:
|
||||
|
||||
1. 发送、撤销委托
|
||||
|
||||
2. 查询委托、持仓、资金、成交历史
|
||||
|
||||
3. 实时行情、成交、资金更新的推送
|
||||
|
||||
### API信息
|
||||
|
||||
链接:[https://www.okex.com/ws_getStarted.html](https://www.okex.com/ws_getStarted.html)
|
||||
|
4
vnpy/api/okex/__init__.py
Normal file
4
vnpy/api/okex/__init__.py
Normal file
@ -0,0 +1,4 @@
|
||||
# encoding: UTF-8
|
||||
|
||||
from __future__ import absolute_import
|
||||
from .vnokex import OkexSpotApi, SPOT_CURRENCY, SPOT_SYMBOL, OKEX_SPOT_HOST
|
332
vnpy/api/okex/vnokex.py
Normal file
332
vnpy/api/okex/vnokex.py
Normal file
@ -0,0 +1,332 @@
|
||||
# encoding: UTF-8
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
|
||||
import websocket
|
||||
|
||||
# 常量定义
|
||||
OKEX_SPOT_HOST = 'wss://real.okex.com:10441/websocket'
|
||||
|
||||
|
||||
SPOT_CURRENCY = ["usdt",
|
||||
"btc",
|
||||
"ltc",
|
||||
"eth",
|
||||
"etc",
|
||||
"bch"]
|
||||
|
||||
SPOT_SYMBOL = ["ltc_btc",
|
||||
"eth_btc",
|
||||
"etc_btc",
|
||||
"bch_btc",
|
||||
"btc_usdt",
|
||||
"eth_usdt",
|
||||
"ltc_usdt",
|
||||
"etc_usdt",
|
||||
"bch_usdt",
|
||||
"etc_eth",
|
||||
"bt1_btc",
|
||||
"bt2_btc",
|
||||
"btg_btc",
|
||||
"qtum_btc",
|
||||
"hsr_btc",
|
||||
"neo_btc",
|
||||
"gas_btc",
|
||||
"qtum_usdt",
|
||||
"hsr_usdt",
|
||||
"neo_usdt",
|
||||
"gas_usdt"]
|
||||
|
||||
KLINE_PERIOD = ["1min",
|
||||
"3min",
|
||||
"5min",
|
||||
"15min",
|
||||
"30min",
|
||||
"1hour",
|
||||
"2hour",
|
||||
"4hour",
|
||||
"6hour",
|
||||
"12hour",
|
||||
"day",
|
||||
"3day",
|
||||
"week"]
|
||||
|
||||
|
||||
########################################################################
|
||||
class OkexApi(object):
|
||||
"""交易接口"""
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def __init__(self):
|
||||
"""Constructor"""
|
||||
self.host = '' # 服务器
|
||||
self.apiKey = '' # 用户名
|
||||
self.secretKey = '' # 密码
|
||||
|
||||
self.active = False # 工作状态
|
||||
self.ws = None # websocket应用对象
|
||||
self.wsThread = None # websocket工作线程
|
||||
|
||||
self.heartbeatCount = 0 # 心跳计数
|
||||
self.heartbeatThread = None # 心跳线程
|
||||
self.heartbeatReceived = True # 心跳是否收到
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def heartbeat(self):
|
||||
""""""
|
||||
while self.active:
|
||||
self.heartbeatCount += 1
|
||||
|
||||
if self.heartbeatCount < 30:
|
||||
sleep(1)
|
||||
else:
|
||||
self.heartbeatCount = 0
|
||||
|
||||
if not self.heartbeatReceived:
|
||||
self.reconnect()
|
||||
else:
|
||||
d = {'event': 'ping'}
|
||||
j = json.dumps(d)
|
||||
self.ws.send(j)
|
||||
self.heartbeatReceived = False
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def reconnect(self):
|
||||
"""重新连接"""
|
||||
self.close() # 首先关闭之前的连接
|
||||
self.initWebsocket()
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def connect(self, host, apiKey, secretKey, trace=False):
|
||||
"""连接"""
|
||||
self.host = host
|
||||
self.apiKey = apiKey
|
||||
self.secretKey = secretKey
|
||||
|
||||
websocket.enableTrace(trace)
|
||||
|
||||
self.initWebsocket()
|
||||
self.active = True
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def initWebsocket(self):
|
||||
""""""
|
||||
self.ws = websocket.WebSocketApp(self.host,
|
||||
on_message=self.onMessageCallback,
|
||||
on_error=self.onErrorCallback,
|
||||
on_close=self.onCloseCallback,
|
||||
on_open=self.onOpenCallback)
|
||||
|
||||
self.wsThread = Thread(target=self.ws.run_forever)
|
||||
self.wsThread.start()
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def readData(self, evt):
|
||||
"""解码推送收到的数据"""
|
||||
data = json.loads(evt)
|
||||
return data
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def close(self):
|
||||
"""关闭接口"""
|
||||
if self.heartbeatThread and self.heartbeatThread.isAlive():
|
||||
self.active = False
|
||||
self.heartbeatThread.join()
|
||||
|
||||
if self.wsThread and self.wsThread.isAlive():
|
||||
self.ws.close()
|
||||
self.wsThread.join()
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onMessage(self, data):
|
||||
"""信息推送"""
|
||||
print('onMessage')
|
||||
print(evt)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onError(self, data):
|
||||
"""错误推送"""
|
||||
print('onError')
|
||||
print(evt)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onClose(self):
|
||||
"""接口断开"""
|
||||
print('onClose')
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onOpen(self):
|
||||
"""接口打开"""
|
||||
print('onOpen')
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onMessageCallback(self, ws, evt):
|
||||
""""""
|
||||
data = self.readData(evt)
|
||||
if 'event' in data:
|
||||
self.heartbeatReceived = True
|
||||
else:
|
||||
self.onMessage(data[0])
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onErrorCallback(self, ws, evt):
|
||||
""""""
|
||||
self.onError(evt)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onCloseCallback(self, ws):
|
||||
""""""
|
||||
self.onClose()
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onOpenCallback(self, ws):
|
||||
""""""
|
||||
self.heartbeatThread = Thread(target=self.heartbeat)
|
||||
self.heartbeatThread.start()
|
||||
|
||||
self.onOpen()
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def generateSign(self, params):
|
||||
"""生成签名"""
|
||||
l = []
|
||||
for key in sorted(params.keys()):
|
||||
l.append('%s=%s' %(key, params[key]))
|
||||
l.append('secret_key=%s' %self.secretKey)
|
||||
sign = '&'.join(l)
|
||||
return hashlib.md5(sign.encode('utf-8')).hexdigest().upper()
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def sendRequest(self, channel, params=None):
|
||||
"""发送请求"""
|
||||
# 生成请求
|
||||
d = {}
|
||||
d['event'] = 'addChannel'
|
||||
d['channel'] = channel
|
||||
|
||||
# 如果有参数,在参数字典中加上api_key和签名字段
|
||||
if params is not None:
|
||||
params['api_key'] = self.apiKey
|
||||
params['sign'] = self.generateSign(params)
|
||||
d['parameters'] = params
|
||||
|
||||
# 使用json打包并发送
|
||||
j = json.dumps(d)
|
||||
|
||||
# 若触发异常则重连
|
||||
try:
|
||||
self.ws.send(j)
|
||||
except websocket.WebSocketConnectionClosedException:
|
||||
pass
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def login(self):
|
||||
params = {}
|
||||
params['api_key'] = self.apiKey
|
||||
params['sign'] = self.generateSign(params)
|
||||
|
||||
# 生成请求
|
||||
d = {}
|
||||
d['event'] = 'login'
|
||||
d['parameters'] = params
|
||||
j = json.dumps(d)
|
||||
|
||||
# 若触发异常则重连
|
||||
try:
|
||||
self.ws.send(j)
|
||||
return True
|
||||
except websocket.WebSocketConnectionClosedException:
|
||||
return False
|
||||
|
||||
|
||||
########################################################################
|
||||
class OkexSpotApi(OkexApi):
|
||||
"""现货交易接口"""
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def __init__(self):
|
||||
"""Constructor"""
|
||||
super(OkexSpotApi, self).__init__()
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def subscribeSpotTicker(self, symbol):
|
||||
"""订阅现货的Tick"""
|
||||
channel = 'ok_sub_spot_%s_ticker' %symbol
|
||||
self.sendRequest(channel)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def subscribeSpotDepth(self, symbol, depth=0):
|
||||
"""订阅现货的深度"""
|
||||
channel = 'ok_sub_spot_%s_depth' %symbol
|
||||
if depth:
|
||||
channel = channel + '_' + str(depth)
|
||||
self.sendRequest(channel)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def subscribeSpotDeals(self, symbol):
|
||||
channel = 'ok_sub_spot_%s_deals' %symbol
|
||||
self.sendRequest(channel)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def subscribeSpotKlines(self, symbol, period):
|
||||
channel = 'ok_sub_spot_%s_kline_%s' %(symbol, period)
|
||||
self.sendRequest(channel)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def spotOrder(self, symbol, type_, price, amount):
|
||||
"""现货委托"""
|
||||
params = {}
|
||||
params['symbol'] = str(symbol)
|
||||
params['type'] = str(type_)
|
||||
params['price'] = str(price)
|
||||
params['amount'] = str(amount)
|
||||
|
||||
channel = 'ok_spot_order'
|
||||
|
||||
self.sendRequest(channel, params)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def spotCancelOrder(self, symbol, orderid):
|
||||
"""现货撤单"""
|
||||
params = {}
|
||||
params['symbol'] = str(symbol)
|
||||
params['order_id'] = str(orderid)
|
||||
|
||||
channel = 'ok_spot_cancel_order'
|
||||
|
||||
self.sendRequest(channel, params)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def spotUserInfo(self):
|
||||
"""查询现货账户"""
|
||||
channel = 'ok_spot_userinfo'
|
||||
self.sendRequest(channel, {})
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def spotOrderInfo(self, symbol, orderid):
|
||||
"""查询现货委托信息"""
|
||||
params = {}
|
||||
params['symbol'] = str(symbol)
|
||||
params['order_id'] = str(orderid)
|
||||
|
||||
channel = 'ok_spot_orderinfo'
|
||||
|
||||
self.sendRequest(channel, params)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def subSpotOrder(self, symbol):
|
||||
"""订阅委托推送"""
|
||||
channel = 'ok_sub_spot_%s_order' %symbol
|
||||
self.sendRequest(channel)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def subSpotBalance(self, symbol):
|
||||
"""订阅资金推送"""
|
||||
channel = 'ok_sub_spot_%s_balance' %symbol
|
||||
self.sendRequest(channel)
|
||||
|
6
vnpy/trader/gateway/okexGateway/OKEX_connect.json
Normal file
6
vnpy/trader/gateway/okexGateway/OKEX_connect.json
Normal file
@ -0,0 +1,6 @@
|
||||
{
|
||||
"apiKey": "",
|
||||
"secretKey": "",
|
||||
"trace": false,
|
||||
"symbols": ["eth_btc", "btc_usdt", "eth_usdt"]
|
||||
}
|
12
vnpy/trader/gateway/okexGateway/__init__.py
Normal file
12
vnpy/trader/gateway/okexGateway/__init__.py
Normal file
@ -0,0 +1,12 @@
|
||||
# encoding: UTF-8
|
||||
|
||||
from __future__ import absolute_import
|
||||
from vnpy.trader import vtConstant
|
||||
from .okexGateway import OkexGateway
|
||||
|
||||
gatewayClass = OkexGateway
|
||||
gatewayName = 'OKEX'
|
||||
gatewayDisplayName = u'OKEX'
|
||||
gatewayType = vtConstant.GATEWAYTYPE_BTC
|
||||
gatewayQryEnabled = True
|
||||
|
611
vnpy/trader/gateway/okexGateway/okexGateway.py
Normal file
611
vnpy/trader/gateway/okexGateway/okexGateway.py
Normal file
@ -0,0 +1,611 @@
|
||||
# encoding: UTF-8
|
||||
|
||||
'''
|
||||
vnpy.api.okex的gateway接入
|
||||
'''
|
||||
from __future__ import print_function
|
||||
|
||||
|
||||
import os
|
||||
import json
|
||||
from datetime import datetime
|
||||
from time import sleep
|
||||
from copy import copy
|
||||
from threading import Condition
|
||||
from queue import Queue, Empty
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
|
||||
from vnpy.api.okex import OkexSpotApi, OKEX_SPOT_HOST
|
||||
from vnpy.trader.vtGateway import *
|
||||
from vnpy.trader.vtFunction import getJsonPath
|
||||
|
||||
# 价格类型映射
|
||||
# 买卖类型: 限价单(buy/sell) 市价单(buy_market/sell_market)
|
||||
priceTypeMap = {}
|
||||
priceTypeMap['buy'] = (DIRECTION_LONG, PRICETYPE_LIMITPRICE)
|
||||
priceTypeMap['buy_market'] = (DIRECTION_LONG, PRICETYPE_MARKETPRICE)
|
||||
priceTypeMap['sell'] = (DIRECTION_SHORT, PRICETYPE_LIMITPRICE)
|
||||
priceTypeMap['sell_market'] = (DIRECTION_SHORT, PRICETYPE_MARKETPRICE)
|
||||
priceTypeMapReverse = {v: k for k, v in priceTypeMap.items()}
|
||||
|
||||
# 委托状态印射
|
||||
statusMap = {}
|
||||
statusMap[-1] = STATUS_CANCELLED
|
||||
statusMap[0] = STATUS_NOTTRADED
|
||||
statusMap[1] = STATUS_PARTTRADED
|
||||
statusMap[2] = STATUS_ALLTRADED
|
||||
statusMap[4] = STATUS_UNKNOWN
|
||||
|
||||
|
||||
########################################################################
|
||||
class OkexGateway(VtGateway):
|
||||
"""OKEX交易接口"""
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def __init__(self, eventEngine, gatewayName='OKEX'):
|
||||
"""Constructor"""
|
||||
super(OkexGateway, self).__init__(eventEngine, gatewayName)
|
||||
|
||||
self.spotApi = SpotApi(self)
|
||||
# self.api_contract = Api_contract(self)
|
||||
|
||||
self.leverage = 0
|
||||
self.connected = False
|
||||
|
||||
self.fileName = self.gatewayName + '_connect.json'
|
||||
self.filePath = getJsonPath(self.fileName, __file__)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def connect(self):
|
||||
"""连接"""
|
||||
# 载入json文件
|
||||
try:
|
||||
f = file(self.filePath)
|
||||
except IOError:
|
||||
log = VtLogData()
|
||||
log.gatewayName = self.gatewayName
|
||||
log.logContent = u'读取连接配置出错,请检查'
|
||||
self.onLog(log)
|
||||
return
|
||||
|
||||
# 解析json文件
|
||||
setting = json.load(f)
|
||||
try:
|
||||
apiKey = str(setting['apiKey'])
|
||||
secretKey = str(setting['secretKey'])
|
||||
trace = setting['trace']
|
||||
symbols = setting['symbols']
|
||||
except KeyError:
|
||||
log = VtLogData()
|
||||
log.gatewayName = self.gatewayName
|
||||
log.logContent = u'连接配置缺少字段,请检查'
|
||||
self.onLog(log)
|
||||
return
|
||||
|
||||
# 初始化接口
|
||||
self.spotApi.init(apiKey, secretKey, trace, symbols)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def subscribe(self, subscribeReq):
|
||||
"""订阅行情"""
|
||||
pass
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def sendOrder(self, orderReq):
|
||||
"""发单"""
|
||||
return self.spotApi.sendOrder(orderReq)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def cancelOrder(self, cancelOrderReq):
|
||||
"""撤单"""
|
||||
self.spotApi.cancelOrder(cancelOrderReq)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def qryAccount(self):
|
||||
"""查询账户资金"""
|
||||
pass
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def qryPosition(self):
|
||||
"""查询持仓"""
|
||||
self.spotApi.spotUserInfo()
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def close(self):
|
||||
"""关闭"""
|
||||
self.spotApi.close()
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def initQuery(self):
|
||||
"""初始化连续查询"""
|
||||
if self.qryEnabled:
|
||||
# 需要循环的查询函数列表
|
||||
self.qryFunctionList = [self.qryPosition]
|
||||
|
||||
self.qryCount = 0 # 查询触发倒计时
|
||||
self.qryTrigger = 2 # 查询触发点
|
||||
self.qryNextFunction = 0 # 上次运行的查询函数索引
|
||||
|
||||
self.startQuery()
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def query(self, event):
|
||||
"""注册到事件处理引擎上的查询函数"""
|
||||
self.qryCount += 1
|
||||
|
||||
if self.qryCount > self.qryTrigger:
|
||||
# 清空倒计时
|
||||
self.qryCount = 0
|
||||
|
||||
# 执行查询函数
|
||||
function = self.qryFunctionList[self.qryNextFunction]
|
||||
function()
|
||||
|
||||
# 计算下次查询函数的索引,如果超过了列表长度,则重新设为0
|
||||
self.qryNextFunction += 1
|
||||
if self.qryNextFunction == len(self.qryFunctionList):
|
||||
self.qryNextFunction = 0
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def startQuery(self):
|
||||
"""启动连续查询"""
|
||||
self.eventEngine.register(EVENT_TIMER, self.query)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def setQryEnabled(self, qryEnabled):
|
||||
"""设置是否要启动循环查询"""
|
||||
self.qryEnabled = qryEnabled
|
||||
|
||||
|
||||
########################################################################
|
||||
class SpotApi(OkexSpotApi):
|
||||
"""OKEX的API实现"""
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def __init__(self, gateway):
|
||||
"""Constructor"""
|
||||
super(SpotApi, self).__init__()
|
||||
|
||||
self.gateway = gateway # gateway对象
|
||||
self.gatewayName = gateway.gatewayName # gateway对象名称
|
||||
|
||||
self.cbDict = {}
|
||||
self.tickDict = {}
|
||||
self.orderDict = {}
|
||||
|
||||
self.channelSymbolMap = {}
|
||||
|
||||
self.localNo = 0 # 本地委托号
|
||||
self.localNoQueue = Queue() # 未收到系统委托号的本地委托号队列
|
||||
self.localNoDict = {} # key为本地委托号,value为系统委托号
|
||||
self.localOrderDict = {} # key为本地委托号, value为委托对象
|
||||
self.orderIdDict = {} # key为系统委托号,value为本地委托号
|
||||
self.cancelDict = {} # key为本地委托号,value为撤单请求
|
||||
|
||||
self.recordOrderId_BefVolume = {} # 记录的之前处理的量
|
||||
|
||||
self.cache_some_order = {}
|
||||
self.tradeID = 0
|
||||
|
||||
self.registerSymbolPairArray = set([])
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onMessage(self, data):
|
||||
"""信息推送"""
|
||||
channel = data.get('channel', '')
|
||||
if not channel:
|
||||
return
|
||||
|
||||
if channel in self.cbDict:
|
||||
callback = self.cbDict[channel]
|
||||
callback(data)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onError(self, data):
|
||||
"""错误推送"""
|
||||
error = VtErrorData()
|
||||
error.gatewayName = self.gatewayName
|
||||
error.errorMsg = str(data)
|
||||
self.gateway.onError(error)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onClose(self):
|
||||
"""接口断开"""
|
||||
self.gateway.connected = False
|
||||
self.writeLog(u'服务器连接断开')
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onOpen(self):
|
||||
"""连接成功"""
|
||||
self.gateway.connected = True
|
||||
self.writeLog(u'服务器连接成功')
|
||||
|
||||
self.login()
|
||||
|
||||
# 推送合约数据
|
||||
for symbol in self.symbols:
|
||||
contract = VtContractData()
|
||||
contract.gatewayName = self.gatewayName
|
||||
contract.symbol = symbol
|
||||
contract.exchange = EXCHANGE_OKEX
|
||||
contract.vtSymbol = '.'.join([contract.symbol, contract.exchange])
|
||||
contract.name = symbol
|
||||
contract.size = 0.00001
|
||||
contract.priceTick = 0.00001
|
||||
contract.productClass = PRODUCT_SPOT
|
||||
self.gateway.onContract(contract)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def initCallback(self):
|
||||
"""初始化回调函数"""
|
||||
for symbol in self.symbols:
|
||||
# channel和symbol映射
|
||||
self.channelSymbolMap["ok_sub_spot_%s_ticker" % symbol] = symbol
|
||||
self.channelSymbolMap["ok_sub_spot_%s_depth_5" % symbol] = symbol
|
||||
|
||||
# channel和callback映射
|
||||
self.cbDict["ok_sub_spot_%s_ticker" % symbol] = self.onTicker
|
||||
self.cbDict["ok_sub_spot_%s_depth_5" % symbol] = self.onDepth
|
||||
self.cbDict["ok_sub_spot_%s_order" % symbol] = self.onSubSpotOrder
|
||||
self.cbDict["ok_sub_spot_%s_balance" % symbol] = self.onSubSpotBalance
|
||||
|
||||
self.cbDict['ok_spot_userinfo'] = self.onSpotUserInfo
|
||||
self.cbDict['ok_spot_orderinfo'] = self.onSpotOrderInfo
|
||||
self.cbDict['ok_spot_order'] = self.onSpotOrder
|
||||
self.cbDict['ok_spot_cancel_order'] = self.onSpotCancelOrder
|
||||
self.cbDict['login'] = self.onLogin
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onLogin(self, data):
|
||||
""""""
|
||||
self.writeLog(u'服务器登录成功')
|
||||
|
||||
# 查询持仓
|
||||
self.spotUserInfo()
|
||||
|
||||
# 订阅推送
|
||||
for symbol in self.symbols:
|
||||
self.subscribe(symbol)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onTicker(self, data):
|
||||
""""""
|
||||
channel = data['channel']
|
||||
symbol = self.channelSymbolMap[channel]
|
||||
|
||||
if symbol not in self.tickDict:
|
||||
tick = VtTickData()
|
||||
tick.symbol = symbol
|
||||
tick.exchange = EXCHANGE_OKEX
|
||||
tick.vtSymbol = '.'.join([tick.symbol, tick.exchange])
|
||||
tick.gatewayName = self.gatewayName
|
||||
|
||||
self.tickDict[symbol] = tick
|
||||
else:
|
||||
tick = self.tickDict[symbol]
|
||||
|
||||
d = data['data']
|
||||
tick.highPrice = float(d['high'])
|
||||
tick.lowPrice = float(d['low'])
|
||||
tick.lastPrice = float(d['last'])
|
||||
tick.volume = float(d['vol'].replace(',', ''))
|
||||
tick.date, tick.time = self.generateDateTime(d['timestamp'])
|
||||
|
||||
if tick.bidPrice1:
|
||||
newtick = copy(tick)
|
||||
self.gateway.onTick(newtick)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onDepth(self, data):
|
||||
""""""
|
||||
channel = data['channel']
|
||||
symbol = self.channelSymbolMap[channel]
|
||||
|
||||
if symbol not in self.tickDict:
|
||||
tick = VtTickData()
|
||||
tick.symbol = symbol
|
||||
tick.exchange = EXCHANGE_OKEX
|
||||
tick.vtSymbol = '.'.join([tick.symbol, tick.exchange])
|
||||
tick.gatewayName = self.gatewayName
|
||||
|
||||
self.tickDict[symbol] = tick
|
||||
else:
|
||||
tick = self.tickDict[symbol]
|
||||
|
||||
d = data['data']
|
||||
|
||||
tick.bidPrice1, tick.bidVolume1 = d['bids'][0]
|
||||
tick.bidPrice2, tick.bidVolume2 = d['bids'][1]
|
||||
tick.bidPrice3, tick.bidVolume3 = d['bids'][2]
|
||||
tick.bidPrice4, tick.bidVolume4 = d['bids'][3]
|
||||
tick.bidPrice5, tick.bidVolume5 = d['bids'][4]
|
||||
|
||||
tick.askPrice1, tick.askVolume1 = d['asks'][-1]
|
||||
tick.askPrice2, tick.askVolume2 = d['asks'][-2]
|
||||
tick.askPrice3, tick.askVolume3 = d['asks'][-3]
|
||||
tick.askPrice4, tick.askVolume4 = d['asks'][-4]
|
||||
tick.askPrice5, tick.askVolume5 = d['asks'][-5]
|
||||
|
||||
tick.date, tick.time = self.generateDateTime(d['timestamp'])
|
||||
|
||||
if tick.lastPrice:
|
||||
newtick = copy(tick)
|
||||
self.gateway.onTick(newtick)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onSpotOrder(self, data):
|
||||
""""""
|
||||
# 如果委托失败,则通知委托被拒单的信息
|
||||
if self.checkDataError(data):
|
||||
try:
|
||||
localNo = self.localNoQueue.get_nowait()
|
||||
except Empty:
|
||||
return
|
||||
|
||||
order = self.localOrderDict[localNo]
|
||||
order.status = STATUS_REJECTED
|
||||
self.gateway.onOrder(order)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onSpotCancelOrder(self, data):
|
||||
""""""
|
||||
self.checkDataError(data)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onSpotUserInfo(self, data):
|
||||
"""现货账户资金推送"""
|
||||
if self.checkDataError(data):
|
||||
return
|
||||
|
||||
funds = data['data']['info']['funds']
|
||||
free = funds['free']
|
||||
freezed = funds['freezed']
|
||||
|
||||
# 持仓信息
|
||||
for symbol in free.keys():
|
||||
frozen = float(freezed[symbol])
|
||||
available = float(free[symbol])
|
||||
|
||||
if frozen or available:
|
||||
pos = VtPositionData()
|
||||
pos.gatewayName = self.gatewayName
|
||||
|
||||
pos.symbol = symbol
|
||||
pos.exchange = EXCHANGE_OKEX
|
||||
pos.vtSymbol = '.'.join([pos.symbol, pos.exchange])
|
||||
pos.direction = DIRECTION_LONG
|
||||
pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction])
|
||||
|
||||
pos.frozen = frozen
|
||||
pos.position = frozen + available
|
||||
|
||||
self.gateway.onPosition(pos)
|
||||
|
||||
self.writeLog(u'持仓信息查询成功')
|
||||
|
||||
# 查询委托
|
||||
for symbol in self.symbols:
|
||||
self.spotOrderInfo(symbol, '-1')
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onSpotOrderInfo(self, data):
|
||||
"""委托信息查询回调"""
|
||||
if self.checkDataError(data):
|
||||
return
|
||||
|
||||
rawData = data['data']
|
||||
|
||||
for d in rawData['orders']:
|
||||
self.localNo += 1
|
||||
localNo = str(self.localNo)
|
||||
orderId = str(d['order_id'])
|
||||
|
||||
self.localNoDict[localNo] = orderId
|
||||
self.orderIdDict[orderId] = localNo
|
||||
|
||||
if orderId not in self.orderDict:
|
||||
order = VtOrderData()
|
||||
order.gatewayName = self.gatewayName
|
||||
|
||||
order.symbol = d['symbol']
|
||||
order.exchange = EXCHANGE_OKEX
|
||||
order.vtSymbol = '.'.join([order.symbol, order.exchange])
|
||||
|
||||
order.orderID = localNo
|
||||
order.vtOrderID = '.'.join([self.gatewayName, order.orderID])
|
||||
|
||||
order.price = d['price']
|
||||
order.totalVolume = d['amount']
|
||||
order.direction, priceType = priceTypeMap[d['type']]
|
||||
date, order.orderTime = self.generateDateTime(d['create_date'])
|
||||
|
||||
self.orderDict[orderId] = order
|
||||
else:
|
||||
order = self.orderDict[orderId]
|
||||
|
||||
order.tradedVolume = d['deal_amount']
|
||||
order.status = statusMap[d['status']]
|
||||
|
||||
self.gateway.onOrder(copy(order))
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onSubSpotOrder(self, data):
|
||||
"""交易数据"""
|
||||
rawData = data["data"]
|
||||
orderId = str(rawData['orderId'])
|
||||
|
||||
# 获取本地委托号
|
||||
if orderId in self.orderIdDict:
|
||||
localNo = self.orderIdDict[orderId]
|
||||
else:
|
||||
try:
|
||||
localNo = self.localNoQueue.get_nowait()
|
||||
except Empty:
|
||||
self.localNo += 1
|
||||
localNo = str(self.localNo)
|
||||
|
||||
self.localNoDict[localNo] = orderId
|
||||
self.orderIdDict[orderId] = localNo
|
||||
|
||||
# 获取委托对象
|
||||
if orderId in self.orderDict:
|
||||
order = self.orderDict[orderId]
|
||||
else:
|
||||
order = VtOrderData()
|
||||
order.gatewayName = self.gatewayName
|
||||
order.symbol = rawData['symbol']
|
||||
order.exchange = EXCHANGE_OKEX
|
||||
order.vtSymbol = '.'.join([order.symbol, order.exchange])
|
||||
order.orderID = localNo
|
||||
order.vtOrderID = '.'.join([self.gatewayName, localNo])
|
||||
order.direction, priceType = priceTypeMap[rawData['tradeType']]
|
||||
order.price = float(rawData['tradeUnitPrice'])
|
||||
order.totalVolume = float(rawData['tradeAmount'])
|
||||
date, order.orderTime = self.generateDateTime(rawData['createdDate'])
|
||||
|
||||
lastTradedVolume = order.tradedVolume
|
||||
|
||||
order.status = statusMap[rawData['status']]
|
||||
order.tradedVolume = float(rawData['completedTradeAmount'])
|
||||
self.gateway.onOrder(copy(order))
|
||||
|
||||
# 成交信息
|
||||
if order.tradedVolume > lastTradedVolume:
|
||||
trade = VtTradeData()
|
||||
trade.gatewayName = self.gatewayName
|
||||
|
||||
trade.symbol = order.symbol
|
||||
trade.exchange = order.exchange
|
||||
trade.vtSymbol = order.vtSymbol
|
||||
|
||||
self.tradeID += 1
|
||||
trade.tradeID = str(self.tradeID)
|
||||
trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID])
|
||||
|
||||
trade.orderID = order.orderID
|
||||
trade.vtOrderID = order.vtOrderID
|
||||
|
||||
trade.direction = order.direction
|
||||
trade.price = float(rawData['averagePrice'])
|
||||
trade.volume = order.tradedVolume - lastTradedVolume
|
||||
|
||||
trade.tradeTime = datetime.now().strftime('%H:%M:%S')
|
||||
self.gateway.onTrade(trade)
|
||||
|
||||
# 撤单
|
||||
if localNo in self.cancelDict:
|
||||
req = self.cancelDict[localNo]
|
||||
self.spotCancel(req)
|
||||
del self.cancelDict[localNo]
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def onSubSpotBalance(self, data):
|
||||
""""""
|
||||
rawData = data['data']
|
||||
free = rawData['info']['free']
|
||||
freezed = rawData['info']['freezed']
|
||||
|
||||
for symbol in free.keys():
|
||||
pos = VtPositionData()
|
||||
pos.gatewayName = self.gatewayName
|
||||
pos.symbol = symbol
|
||||
pos.exchange = EXCHANGE_OKEX
|
||||
pos.vtSymbol = '.'.join([pos.symbol, pos.exchange])
|
||||
pos.direction = DIRECTION_LONG
|
||||
pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction])
|
||||
pos.frozen = float(freezed[symbol])
|
||||
pos.position = pos.frozen + float(free[symbol])
|
||||
|
||||
self.gateway.onPosition(pos)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def init(self, apiKey, secretKey, trace, symbols):
|
||||
"""初始化接口"""
|
||||
self.symbols = symbols
|
||||
self.initCallback()
|
||||
|
||||
self.connect(OKEX_SPOT_HOST, apiKey, secretKey, trace)
|
||||
self.writeLog(u'接口初始化成功')
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def sendOrder(self, req):
|
||||
"""发单"""
|
||||
type_ = priceTypeMapReverse[(req.direction, req.priceType)]
|
||||
req.volume = 0.001
|
||||
self.spotOrder(req.symbol, type_, str(req.price), str(req.volume))
|
||||
|
||||
# 本地委托号加1,并将对应字符串保存到队列中,返回基于本地委托号的vtOrderID
|
||||
self.localNo += 1
|
||||
self.localNoQueue.put(str(self.localNo))
|
||||
vtOrderID = '.'.join([self.gatewayName, str(self.localNo)])
|
||||
|
||||
# 缓存委托信息
|
||||
order = VtOrderData()
|
||||
order.gatewayName = self.gatewayName
|
||||
|
||||
order.symbol = req.symbol
|
||||
order.exchange = EXCHANGE_OKEX
|
||||
order.vtSymbol = '.'.join([order.symbol, order.exchange])
|
||||
order.orderID= str(self.localNo)
|
||||
order.vtOrderID = vtOrderID
|
||||
order.direction = req.direction
|
||||
order.price = req.price
|
||||
order.totalVolume = req.volume
|
||||
|
||||
self.localOrderDict[str(self.localNo)] = order
|
||||
|
||||
return vtOrderID
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def cancelOrder(self, req):
|
||||
"""撤单"""
|
||||
localNo = req.orderID
|
||||
if localNo in self.localNoDict:
|
||||
orderID = self.localNoDict[localNo]
|
||||
self.spotCancelOrder(req.symbol, orderID)
|
||||
else:
|
||||
# 如果在系统委托号返回前客户就发送了撤单请求,则保存
|
||||
# 在cancelDict字典中,等待返回后执行撤单任务
|
||||
self.cancelDict[localNo] = req
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def generateDateTime(self, s):
|
||||
"""生成时间"""
|
||||
dt = datetime.fromtimestamp(float(s)/1e3)
|
||||
time = dt.strftime("%H:%M:%S.%f")
|
||||
date = dt.strftime("%Y%m%d")
|
||||
return date, time
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def writeLog(self, content):
|
||||
"""快速记录日志"""
|
||||
log = VtLogData()
|
||||
log.gatewayName = self.gatewayName
|
||||
log.logContent = content
|
||||
self.gateway.onLog(log)
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def checkDataError(self, data):
|
||||
"""检查回报是否存在错误"""
|
||||
rawData = data['data']
|
||||
if 'error_code' not in rawData:
|
||||
return False
|
||||
else:
|
||||
error = VtErrorData()
|
||||
error.gatewayName = self.gatewayName
|
||||
error.errorID = rawData['error_code']
|
||||
error.errorMsg = u'请求失败,功能:%s' %data['channel']
|
||||
self.gateway.onError(error)
|
||||
return True
|
||||
|
||||
#----------------------------------------------------------------------
|
||||
def subscribe(self, symbol):
|
||||
"""订阅行情"""
|
||||
symbol = symbol
|
||||
|
||||
self.subscribeSpotTicker(symbol)
|
||||
self.subscribeSpotDepth(symbol, 5)
|
||||
self.subSpotOrder(symbol)
|
||||
self.subSpotBalance(symbol)
|
||||
|
Loading…
Reference in New Issue
Block a user