From 762a838b3d0acaa07f29c26f4afb6eb8f0211d20 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Fri, 1 Jun 2018 09:34:01 +0800 Subject: [PATCH] =?UTF-8?q?[Add]=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E5=90=8E=E7=9A=84OKEX=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/api/okex/README.md | 14 + vnpy/api/okex/__init__.py | 4 + vnpy/api/okex/vnokex.py | 332 ++++++++++ .../gateway/okexGateway/OKEX_connect.json | 6 + vnpy/trader/gateway/okexGateway/__init__.py | 12 + .../trader/gateway/okexGateway/okexGateway.py | 611 ++++++++++++++++++ 6 files changed, 979 insertions(+) create mode 100644 vnpy/api/okex/README.md create mode 100644 vnpy/api/okex/__init__.py create mode 100644 vnpy/api/okex/vnokex.py create mode 100644 vnpy/trader/gateway/okexGateway/OKEX_connect.json create mode 100644 vnpy/trader/gateway/okexGateway/__init__.py create mode 100644 vnpy/trader/gateway/okexGateway/okexGateway.py diff --git a/vnpy/api/okex/README.md b/vnpy/api/okex/README.md new file mode 100644 index 00000000..f98f155c --- /dev/null +++ b/vnpy/api/okex/README.md @@ -0,0 +1,14 @@ +### 简介 + +OKEX的比特币交易接口,基于Websocket API开发,实现了以下功能: + +1. 发送、撤销委托 + +2. 查询委托、持仓、资金、成交历史 + +3. 实时行情、成交、资金更新的推送 + +### API信息 + +链接:[https://www.okex.com/ws_getStarted.html](https://www.okex.com/ws_getStarted.html) + diff --git a/vnpy/api/okex/__init__.py b/vnpy/api/okex/__init__.py new file mode 100644 index 00000000..2b7ddb71 --- /dev/null +++ b/vnpy/api/okex/__init__.py @@ -0,0 +1,4 @@ +# encoding: UTF-8 + +from __future__ import absolute_import +from .vnokex import OkexSpotApi, SPOT_CURRENCY, SPOT_SYMBOL, OKEX_SPOT_HOST \ No newline at end of file diff --git a/vnpy/api/okex/vnokex.py b/vnpy/api/okex/vnokex.py new file mode 100644 index 00000000..b2b86db0 --- /dev/null +++ b/vnpy/api/okex/vnokex.py @@ -0,0 +1,332 @@ +# encoding: UTF-8 + +from __future__ import print_function + +import hashlib +import json +from threading import Thread +from time import sleep + +import websocket + +# 常量定义 +OKEX_SPOT_HOST = 'wss://real.okex.com:10441/websocket' + + +SPOT_CURRENCY = ["usdt", + "btc", + "ltc", + "eth", + "etc", + "bch"] + +SPOT_SYMBOL = ["ltc_btc", + "eth_btc", + "etc_btc", + "bch_btc", + "btc_usdt", + "eth_usdt", + "ltc_usdt", + "etc_usdt", + "bch_usdt", + "etc_eth", + "bt1_btc", + "bt2_btc", + "btg_btc", + "qtum_btc", + "hsr_btc", + "neo_btc", + "gas_btc", + "qtum_usdt", + "hsr_usdt", + "neo_usdt", + "gas_usdt"] + +KLINE_PERIOD = ["1min", + "3min", + "5min", + "15min", + "30min", + "1hour", + "2hour", + "4hour", + "6hour", + "12hour", + "day", + "3day", + "week"] + + +######################################################################## +class OkexApi(object): + """交易接口""" + + #---------------------------------------------------------------------- + def __init__(self): + """Constructor""" + self.host = '' # 服务器 + self.apiKey = '' # 用户名 + self.secretKey = '' # 密码 + + self.active = False # 工作状态 + self.ws = None # websocket应用对象 + self.wsThread = None # websocket工作线程 + + self.heartbeatCount = 0 # 心跳计数 + self.heartbeatThread = None # 心跳线程 + self.heartbeatReceived = True # 心跳是否收到 + + #---------------------------------------------------------------------- + def heartbeat(self): + """""" + while self.active: + self.heartbeatCount += 1 + + if self.heartbeatCount < 30: + sleep(1) + else: + self.heartbeatCount = 0 + + if not self.heartbeatReceived: + self.reconnect() + else: + d = {'event': 'ping'} + j = json.dumps(d) + self.ws.send(j) + self.heartbeatReceived = False + + #---------------------------------------------------------------------- + def reconnect(self): + """重新连接""" + self.close() # 首先关闭之前的连接 + self.initWebsocket() + + #---------------------------------------------------------------------- + def connect(self, host, apiKey, secretKey, trace=False): + """连接""" + self.host = host + self.apiKey = apiKey + self.secretKey = secretKey + + websocket.enableTrace(trace) + + self.initWebsocket() + self.active = True + + #---------------------------------------------------------------------- + def initWebsocket(self): + """""" + self.ws = websocket.WebSocketApp(self.host, + on_message=self.onMessageCallback, + on_error=self.onErrorCallback, + on_close=self.onCloseCallback, + on_open=self.onOpenCallback) + + self.wsThread = Thread(target=self.ws.run_forever) + self.wsThread.start() + + #---------------------------------------------------------------------- + def readData(self, evt): + """解码推送收到的数据""" + data = json.loads(evt) + return data + + #---------------------------------------------------------------------- + def close(self): + """关闭接口""" + if self.heartbeatThread and self.heartbeatThread.isAlive(): + self.active = False + self.heartbeatThread.join() + + if self.wsThread and self.wsThread.isAlive(): + self.ws.close() + self.wsThread.join() + + #---------------------------------------------------------------------- + def onMessage(self, data): + """信息推送""" + print('onMessage') + print(evt) + + #---------------------------------------------------------------------- + def onError(self, data): + """错误推送""" + print('onError') + print(evt) + + #---------------------------------------------------------------------- + def onClose(self): + """接口断开""" + print('onClose') + + #---------------------------------------------------------------------- + def onOpen(self): + """接口打开""" + print('onOpen') + + #---------------------------------------------------------------------- + def onMessageCallback(self, ws, evt): + """""" + data = self.readData(evt) + if 'event' in data: + self.heartbeatReceived = True + else: + self.onMessage(data[0]) + + #---------------------------------------------------------------------- + def onErrorCallback(self, ws, evt): + """""" + self.onError(evt) + + #---------------------------------------------------------------------- + def onCloseCallback(self, ws): + """""" + self.onClose() + + #---------------------------------------------------------------------- + def onOpenCallback(self, ws): + """""" + self.heartbeatThread = Thread(target=self.heartbeat) + self.heartbeatThread.start() + + self.onOpen() + + #---------------------------------------------------------------------- + def generateSign(self, params): + """生成签名""" + l = [] + for key in sorted(params.keys()): + l.append('%s=%s' %(key, params[key])) + l.append('secret_key=%s' %self.secretKey) + sign = '&'.join(l) + return hashlib.md5(sign.encode('utf-8')).hexdigest().upper() + + #---------------------------------------------------------------------- + def sendRequest(self, channel, params=None): + """发送请求""" + # 生成请求 + d = {} + d['event'] = 'addChannel' + d['channel'] = channel + + # 如果有参数,在参数字典中加上api_key和签名字段 + if params is not None: + params['api_key'] = self.apiKey + params['sign'] = self.generateSign(params) + d['parameters'] = params + + # 使用json打包并发送 + j = json.dumps(d) + + # 若触发异常则重连 + try: + self.ws.send(j) + except websocket.WebSocketConnectionClosedException: + pass + + #---------------------------------------------------------------------- + def login(self): + params = {} + params['api_key'] = self.apiKey + params['sign'] = self.generateSign(params) + + # 生成请求 + d = {} + d['event'] = 'login' + d['parameters'] = params + j = json.dumps(d) + + # 若触发异常则重连 + try: + self.ws.send(j) + return True + except websocket.WebSocketConnectionClosedException: + return False + + +######################################################################## +class OkexSpotApi(OkexApi): + """现货交易接口""" + + #---------------------------------------------------------------------- + def __init__(self): + """Constructor""" + super(OkexSpotApi, self).__init__() + + #---------------------------------------------------------------------- + def subscribeSpotTicker(self, symbol): + """订阅现货的Tick""" + channel = 'ok_sub_spot_%s_ticker' %symbol + self.sendRequest(channel) + + #---------------------------------------------------------------------- + def subscribeSpotDepth(self, symbol, depth=0): + """订阅现货的深度""" + channel = 'ok_sub_spot_%s_depth' %symbol + if depth: + channel = channel + '_' + str(depth) + self.sendRequest(channel) + + #---------------------------------------------------------------------- + def subscribeSpotDeals(self, symbol): + channel = 'ok_sub_spot_%s_deals' %symbol + self.sendRequest(channel) + + #---------------------------------------------------------------------- + def subscribeSpotKlines(self, symbol, period): + channel = 'ok_sub_spot_%s_kline_%s' %(symbol, period) + self.sendRequest(channel) + + #---------------------------------------------------------------------- + def spotOrder(self, symbol, type_, price, amount): + """现货委托""" + params = {} + params['symbol'] = str(symbol) + params['type'] = str(type_) + params['price'] = str(price) + params['amount'] = str(amount) + + channel = 'ok_spot_order' + + self.sendRequest(channel, params) + + #---------------------------------------------------------------------- + def spotCancelOrder(self, symbol, orderid): + """现货撤单""" + params = {} + params['symbol'] = str(symbol) + params['order_id'] = str(orderid) + + channel = 'ok_spot_cancel_order' + + self.sendRequest(channel, params) + + #---------------------------------------------------------------------- + def spotUserInfo(self): + """查询现货账户""" + channel = 'ok_spot_userinfo' + self.sendRequest(channel, {}) + + #---------------------------------------------------------------------- + def spotOrderInfo(self, symbol, orderid): + """查询现货委托信息""" + params = {} + params['symbol'] = str(symbol) + params['order_id'] = str(orderid) + + channel = 'ok_spot_orderinfo' + + self.sendRequest(channel, params) + + #---------------------------------------------------------------------- + def subSpotOrder(self, symbol): + """订阅委托推送""" + channel = 'ok_sub_spot_%s_order' %symbol + self.sendRequest(channel) + + #---------------------------------------------------------------------- + def subSpotBalance(self, symbol): + """订阅资金推送""" + channel = 'ok_sub_spot_%s_balance' %symbol + self.sendRequest(channel) + diff --git a/vnpy/trader/gateway/okexGateway/OKEX_connect.json b/vnpy/trader/gateway/okexGateway/OKEX_connect.json new file mode 100644 index 00000000..5c77f027 --- /dev/null +++ b/vnpy/trader/gateway/okexGateway/OKEX_connect.json @@ -0,0 +1,6 @@ +{ + "apiKey": "", + "secretKey": "", + "trace": false, + "symbols": ["eth_btc", "btc_usdt", "eth_usdt"] +} \ No newline at end of file diff --git a/vnpy/trader/gateway/okexGateway/__init__.py b/vnpy/trader/gateway/okexGateway/__init__.py new file mode 100644 index 00000000..5f0265a9 --- /dev/null +++ b/vnpy/trader/gateway/okexGateway/__init__.py @@ -0,0 +1,12 @@ +# encoding: UTF-8 + +from __future__ import absolute_import +from vnpy.trader import vtConstant +from .okexGateway import OkexGateway + +gatewayClass = OkexGateway +gatewayName = 'OKEX' +gatewayDisplayName = u'OKEX' +gatewayType = vtConstant.GATEWAYTYPE_BTC +gatewayQryEnabled = True + diff --git a/vnpy/trader/gateway/okexGateway/okexGateway.py b/vnpy/trader/gateway/okexGateway/okexGateway.py new file mode 100644 index 00000000..9ee1cf79 --- /dev/null +++ b/vnpy/trader/gateway/okexGateway/okexGateway.py @@ -0,0 +1,611 @@ +# encoding: UTF-8 + +''' +vnpy.api.okex的gateway接入 +''' +from __future__ import print_function + + +import os +import json +from datetime import datetime +from time import sleep +from copy import copy +from threading import Condition +from queue import Queue, Empty +from threading import Thread +from time import sleep + +from vnpy.api.okex import OkexSpotApi, OKEX_SPOT_HOST +from vnpy.trader.vtGateway import * +from vnpy.trader.vtFunction import getJsonPath + +# 价格类型映射 +# 买卖类型: 限价单(buy/sell) 市价单(buy_market/sell_market) +priceTypeMap = {} +priceTypeMap['buy'] = (DIRECTION_LONG, PRICETYPE_LIMITPRICE) +priceTypeMap['buy_market'] = (DIRECTION_LONG, PRICETYPE_MARKETPRICE) +priceTypeMap['sell'] = (DIRECTION_SHORT, PRICETYPE_LIMITPRICE) +priceTypeMap['sell_market'] = (DIRECTION_SHORT, PRICETYPE_MARKETPRICE) +priceTypeMapReverse = {v: k for k, v in priceTypeMap.items()} + +# 委托状态印射 +statusMap = {} +statusMap[-1] = STATUS_CANCELLED +statusMap[0] = STATUS_NOTTRADED +statusMap[1] = STATUS_PARTTRADED +statusMap[2] = STATUS_ALLTRADED +statusMap[4] = STATUS_UNKNOWN + + +######################################################################## +class OkexGateway(VtGateway): + """OKEX交易接口""" + + #---------------------------------------------------------------------- + def __init__(self, eventEngine, gatewayName='OKEX'): + """Constructor""" + super(OkexGateway, self).__init__(eventEngine, gatewayName) + + self.spotApi = SpotApi(self) + # self.api_contract = Api_contract(self) + + self.leverage = 0 + self.connected = False + + self.fileName = self.gatewayName + '_connect.json' + self.filePath = getJsonPath(self.fileName, __file__) + + #---------------------------------------------------------------------- + def connect(self): + """连接""" + # 载入json文件 + try: + f = file(self.filePath) + except IOError: + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = u'读取连接配置出错,请检查' + self.onLog(log) + return + + # 解析json文件 + setting = json.load(f) + try: + apiKey = str(setting['apiKey']) + secretKey = str(setting['secretKey']) + trace = setting['trace'] + symbols = setting['symbols'] + except KeyError: + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = u'连接配置缺少字段,请检查' + self.onLog(log) + return + + # 初始化接口 + self.spotApi.init(apiKey, secretKey, trace, symbols) + + #---------------------------------------------------------------------- + def subscribe(self, subscribeReq): + """订阅行情""" + pass + + #---------------------------------------------------------------------- + def sendOrder(self, orderReq): + """发单""" + return self.spotApi.sendOrder(orderReq) + + #---------------------------------------------------------------------- + def cancelOrder(self, cancelOrderReq): + """撤单""" + self.spotApi.cancelOrder(cancelOrderReq) + + #---------------------------------------------------------------------- + def qryAccount(self): + """查询账户资金""" + pass + + #---------------------------------------------------------------------- + def qryPosition(self): + """查询持仓""" + self.spotApi.spotUserInfo() + + #---------------------------------------------------------------------- + def close(self): + """关闭""" + self.spotApi.close() + + #---------------------------------------------------------------------- + def initQuery(self): + """初始化连续查询""" + if self.qryEnabled: + # 需要循环的查询函数列表 + self.qryFunctionList = [self.qryPosition] + + self.qryCount = 0 # 查询触发倒计时 + self.qryTrigger = 2 # 查询触发点 + self.qryNextFunction = 0 # 上次运行的查询函数索引 + + self.startQuery() + + #---------------------------------------------------------------------- + def query(self, event): + """注册到事件处理引擎上的查询函数""" + self.qryCount += 1 + + if self.qryCount > self.qryTrigger: + # 清空倒计时 + self.qryCount = 0 + + # 执行查询函数 + function = self.qryFunctionList[self.qryNextFunction] + function() + + # 计算下次查询函数的索引,如果超过了列表长度,则重新设为0 + self.qryNextFunction += 1 + if self.qryNextFunction == len(self.qryFunctionList): + self.qryNextFunction = 0 + + #---------------------------------------------------------------------- + def startQuery(self): + """启动连续查询""" + self.eventEngine.register(EVENT_TIMER, self.query) + + #---------------------------------------------------------------------- + def setQryEnabled(self, qryEnabled): + """设置是否要启动循环查询""" + self.qryEnabled = qryEnabled + + +######################################################################## +class SpotApi(OkexSpotApi): + """OKEX的API实现""" + + #---------------------------------------------------------------------- + def __init__(self, gateway): + """Constructor""" + super(SpotApi, self).__init__() + + self.gateway = gateway # gateway对象 + self.gatewayName = gateway.gatewayName # gateway对象名称 + + self.cbDict = {} + self.tickDict = {} + self.orderDict = {} + + self.channelSymbolMap = {} + + self.localNo = 0 # 本地委托号 + self.localNoQueue = Queue() # 未收到系统委托号的本地委托号队列 + self.localNoDict = {} # key为本地委托号,value为系统委托号 + self.localOrderDict = {} # key为本地委托号, value为委托对象 + self.orderIdDict = {} # key为系统委托号,value为本地委托号 + self.cancelDict = {} # key为本地委托号,value为撤单请求 + + self.recordOrderId_BefVolume = {} # 记录的之前处理的量 + + self.cache_some_order = {} + self.tradeID = 0 + + self.registerSymbolPairArray = set([]) + + #---------------------------------------------------------------------- + def onMessage(self, data): + """信息推送""" + channel = data.get('channel', '') + if not channel: + return + + if channel in self.cbDict: + callback = self.cbDict[channel] + callback(data) + + #---------------------------------------------------------------------- + def onError(self, data): + """错误推送""" + error = VtErrorData() + error.gatewayName = self.gatewayName + error.errorMsg = str(data) + self.gateway.onError(error) + + #---------------------------------------------------------------------- + def onClose(self): + """接口断开""" + self.gateway.connected = False + self.writeLog(u'服务器连接断开') + + #---------------------------------------------------------------------- + def onOpen(self): + """连接成功""" + self.gateway.connected = True + self.writeLog(u'服务器连接成功') + + self.login() + + # 推送合约数据 + for symbol in self.symbols: + contract = VtContractData() + contract.gatewayName = self.gatewayName + contract.symbol = symbol + contract.exchange = EXCHANGE_OKEX + contract.vtSymbol = '.'.join([contract.symbol, contract.exchange]) + contract.name = symbol + contract.size = 0.00001 + contract.priceTick = 0.00001 + contract.productClass = PRODUCT_SPOT + self.gateway.onContract(contract) + + #---------------------------------------------------------------------- + def initCallback(self): + """初始化回调函数""" + for symbol in self.symbols: + # channel和symbol映射 + self.channelSymbolMap["ok_sub_spot_%s_ticker" % symbol] = symbol + self.channelSymbolMap["ok_sub_spot_%s_depth_5" % symbol] = symbol + + # channel和callback映射 + self.cbDict["ok_sub_spot_%s_ticker" % symbol] = self.onTicker + self.cbDict["ok_sub_spot_%s_depth_5" % symbol] = self.onDepth + self.cbDict["ok_sub_spot_%s_order" % symbol] = self.onSubSpotOrder + self.cbDict["ok_sub_spot_%s_balance" % symbol] = self.onSubSpotBalance + + self.cbDict['ok_spot_userinfo'] = self.onSpotUserInfo + self.cbDict['ok_spot_orderinfo'] = self.onSpotOrderInfo + self.cbDict['ok_spot_order'] = self.onSpotOrder + self.cbDict['ok_spot_cancel_order'] = self.onSpotCancelOrder + self.cbDict['login'] = self.onLogin + + #---------------------------------------------------------------------- + def onLogin(self, data): + """""" + self.writeLog(u'服务器登录成功') + + # 查询持仓 + self.spotUserInfo() + + # 订阅推送 + for symbol in self.symbols: + self.subscribe(symbol) + + #---------------------------------------------------------------------- + def onTicker(self, data): + """""" + channel = data['channel'] + symbol = self.channelSymbolMap[channel] + + if symbol not in self.tickDict: + tick = VtTickData() + tick.symbol = symbol + tick.exchange = EXCHANGE_OKEX + tick.vtSymbol = '.'.join([tick.symbol, tick.exchange]) + tick.gatewayName = self.gatewayName + + self.tickDict[symbol] = tick + else: + tick = self.tickDict[symbol] + + d = data['data'] + tick.highPrice = float(d['high']) + tick.lowPrice = float(d['low']) + tick.lastPrice = float(d['last']) + tick.volume = float(d['vol'].replace(',', '')) + tick.date, tick.time = self.generateDateTime(d['timestamp']) + + if tick.bidPrice1: + newtick = copy(tick) + self.gateway.onTick(newtick) + + #---------------------------------------------------------------------- + def onDepth(self, data): + """""" + channel = data['channel'] + symbol = self.channelSymbolMap[channel] + + if symbol not in self.tickDict: + tick = VtTickData() + tick.symbol = symbol + tick.exchange = EXCHANGE_OKEX + tick.vtSymbol = '.'.join([tick.symbol, tick.exchange]) + tick.gatewayName = self.gatewayName + + self.tickDict[symbol] = tick + else: + tick = self.tickDict[symbol] + + d = data['data'] + + tick.bidPrice1, tick.bidVolume1 = d['bids'][0] + tick.bidPrice2, tick.bidVolume2 = d['bids'][1] + tick.bidPrice3, tick.bidVolume3 = d['bids'][2] + tick.bidPrice4, tick.bidVolume4 = d['bids'][3] + tick.bidPrice5, tick.bidVolume5 = d['bids'][4] + + tick.askPrice1, tick.askVolume1 = d['asks'][-1] + tick.askPrice2, tick.askVolume2 = d['asks'][-2] + tick.askPrice3, tick.askVolume3 = d['asks'][-3] + tick.askPrice4, tick.askVolume4 = d['asks'][-4] + tick.askPrice5, tick.askVolume5 = d['asks'][-5] + + tick.date, tick.time = self.generateDateTime(d['timestamp']) + + if tick.lastPrice: + newtick = copy(tick) + self.gateway.onTick(newtick) + + #---------------------------------------------------------------------- + def onSpotOrder(self, data): + """""" + # 如果委托失败,则通知委托被拒单的信息 + if self.checkDataError(data): + try: + localNo = self.localNoQueue.get_nowait() + except Empty: + return + + order = self.localOrderDict[localNo] + order.status = STATUS_REJECTED + self.gateway.onOrder(order) + + #---------------------------------------------------------------------- + def onSpotCancelOrder(self, data): + """""" + self.checkDataError(data) + + #---------------------------------------------------------------------- + def onSpotUserInfo(self, data): + """现货账户资金推送""" + if self.checkDataError(data): + return + + funds = data['data']['info']['funds'] + free = funds['free'] + freezed = funds['freezed'] + + # 持仓信息 + for symbol in free.keys(): + frozen = float(freezed[symbol]) + available = float(free[symbol]) + + if frozen or available: + pos = VtPositionData() + pos.gatewayName = self.gatewayName + + pos.symbol = symbol + pos.exchange = EXCHANGE_OKEX + pos.vtSymbol = '.'.join([pos.symbol, pos.exchange]) + pos.direction = DIRECTION_LONG + pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction]) + + pos.frozen = frozen + pos.position = frozen + available + + self.gateway.onPosition(pos) + + self.writeLog(u'持仓信息查询成功') + + # 查询委托 + for symbol in self.symbols: + self.spotOrderInfo(symbol, '-1') + + #---------------------------------------------------------------------- + def onSpotOrderInfo(self, data): + """委托信息查询回调""" + if self.checkDataError(data): + return + + rawData = data['data'] + + for d in rawData['orders']: + self.localNo += 1 + localNo = str(self.localNo) + orderId = str(d['order_id']) + + self.localNoDict[localNo] = orderId + self.orderIdDict[orderId] = localNo + + if orderId not in self.orderDict: + order = VtOrderData() + order.gatewayName = self.gatewayName + + order.symbol = d['symbol'] + order.exchange = EXCHANGE_OKEX + order.vtSymbol = '.'.join([order.symbol, order.exchange]) + + order.orderID = localNo + order.vtOrderID = '.'.join([self.gatewayName, order.orderID]) + + order.price = d['price'] + order.totalVolume = d['amount'] + order.direction, priceType = priceTypeMap[d['type']] + date, order.orderTime = self.generateDateTime(d['create_date']) + + self.orderDict[orderId] = order + else: + order = self.orderDict[orderId] + + order.tradedVolume = d['deal_amount'] + order.status = statusMap[d['status']] + + self.gateway.onOrder(copy(order)) + + #---------------------------------------------------------------------- + def onSubSpotOrder(self, data): + """交易数据""" + rawData = data["data"] + orderId = str(rawData['orderId']) + + # 获取本地委托号 + if orderId in self.orderIdDict: + localNo = self.orderIdDict[orderId] + else: + try: + localNo = self.localNoQueue.get_nowait() + except Empty: + self.localNo += 1 + localNo = str(self.localNo) + + self.localNoDict[localNo] = orderId + self.orderIdDict[orderId] = localNo + + # 获取委托对象 + if orderId in self.orderDict: + order = self.orderDict[orderId] + else: + order = VtOrderData() + order.gatewayName = self.gatewayName + order.symbol = rawData['symbol'] + order.exchange = EXCHANGE_OKEX + order.vtSymbol = '.'.join([order.symbol, order.exchange]) + order.orderID = localNo + order.vtOrderID = '.'.join([self.gatewayName, localNo]) + order.direction, priceType = priceTypeMap[rawData['tradeType']] + order.price = float(rawData['tradeUnitPrice']) + order.totalVolume = float(rawData['tradeAmount']) + date, order.orderTime = self.generateDateTime(rawData['createdDate']) + + lastTradedVolume = order.tradedVolume + + order.status = statusMap[rawData['status']] + order.tradedVolume = float(rawData['completedTradeAmount']) + self.gateway.onOrder(copy(order)) + + # 成交信息 + if order.tradedVolume > lastTradedVolume: + trade = VtTradeData() + trade.gatewayName = self.gatewayName + + trade.symbol = order.symbol + trade.exchange = order.exchange + trade.vtSymbol = order.vtSymbol + + self.tradeID += 1 + trade.tradeID = str(self.tradeID) + trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID]) + + trade.orderID = order.orderID + trade.vtOrderID = order.vtOrderID + + trade.direction = order.direction + trade.price = float(rawData['averagePrice']) + trade.volume = order.tradedVolume - lastTradedVolume + + trade.tradeTime = datetime.now().strftime('%H:%M:%S') + self.gateway.onTrade(trade) + + # 撤单 + if localNo in self.cancelDict: + req = self.cancelDict[localNo] + self.spotCancel(req) + del self.cancelDict[localNo] + + #---------------------------------------------------------------------- + def onSubSpotBalance(self, data): + """""" + rawData = data['data'] + free = rawData['info']['free'] + freezed = rawData['info']['freezed'] + + for symbol in free.keys(): + pos = VtPositionData() + pos.gatewayName = self.gatewayName + pos.symbol = symbol + pos.exchange = EXCHANGE_OKEX + pos.vtSymbol = '.'.join([pos.symbol, pos.exchange]) + pos.direction = DIRECTION_LONG + pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction]) + pos.frozen = float(freezed[symbol]) + pos.position = pos.frozen + float(free[symbol]) + + self.gateway.onPosition(pos) + + #---------------------------------------------------------------------- + def init(self, apiKey, secretKey, trace, symbols): + """初始化接口""" + self.symbols = symbols + self.initCallback() + + self.connect(OKEX_SPOT_HOST, apiKey, secretKey, trace) + self.writeLog(u'接口初始化成功') + + #---------------------------------------------------------------------- + def sendOrder(self, req): + """发单""" + type_ = priceTypeMapReverse[(req.direction, req.priceType)] + req.volume = 0.001 + self.spotOrder(req.symbol, type_, str(req.price), str(req.volume)) + + # 本地委托号加1,并将对应字符串保存到队列中,返回基于本地委托号的vtOrderID + self.localNo += 1 + self.localNoQueue.put(str(self.localNo)) + vtOrderID = '.'.join([self.gatewayName, str(self.localNo)]) + + # 缓存委托信息 + order = VtOrderData() + order.gatewayName = self.gatewayName + + order.symbol = req.symbol + order.exchange = EXCHANGE_OKEX + order.vtSymbol = '.'.join([order.symbol, order.exchange]) + order.orderID= str(self.localNo) + order.vtOrderID = vtOrderID + order.direction = req.direction + order.price = req.price + order.totalVolume = req.volume + + self.localOrderDict[str(self.localNo)] = order + + return vtOrderID + + #---------------------------------------------------------------------- + def cancelOrder(self, req): + """撤单""" + localNo = req.orderID + if localNo in self.localNoDict: + orderID = self.localNoDict[localNo] + self.spotCancelOrder(req.symbol, orderID) + else: + # 如果在系统委托号返回前客户就发送了撤单请求,则保存 + # 在cancelDict字典中,等待返回后执行撤单任务 + self.cancelDict[localNo] = req + + #---------------------------------------------------------------------- + def generateDateTime(self, s): + """生成时间""" + dt = datetime.fromtimestamp(float(s)/1e3) + time = dt.strftime("%H:%M:%S.%f") + date = dt.strftime("%Y%m%d") + return date, time + + #---------------------------------------------------------------------- + def writeLog(self, content): + """快速记录日志""" + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = content + self.gateway.onLog(log) + + #---------------------------------------------------------------------- + def checkDataError(self, data): + """检查回报是否存在错误""" + rawData = data['data'] + if 'error_code' not in rawData: + return False + else: + error = VtErrorData() + error.gatewayName = self.gatewayName + error.errorID = rawData['error_code'] + error.errorMsg = u'请求失败,功能:%s' %data['channel'] + self.gateway.onError(error) + return True + + #---------------------------------------------------------------------- + def subscribe(self, symbol): + """订阅行情""" + symbol = symbol + + self.subscribeSpotTicker(symbol) + self.subscribeSpotDepth(symbol, 5) + self.subSpotOrder(symbol) + self.subSpotBalance(symbol) + \ No newline at end of file