Merge pull request #1170 from nanoric/okex.websocket

Okex.websocket
This commit is contained in:
vn.py 2018-10-17 21:01:08 +08:00 committed by GitHub
commit 4541673f58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 725 additions and 230 deletions

View File

@ -1,7 +1,8 @@
# encoding: UTF-8
from enum import Enum
from typing import Any, Callable, List, Union
from vnpy.api.okexfuture.vnokexFuture import OkexFutureRestBase
from vnpy.api.okexfuture.vnokexFuture import OkexFutureRestBase, OkexFutureWebSocketBase
from vnpy.api.rest import Request
@ -12,8 +13,20 @@ class _OkexFutureCustomExtra(object):
self.onFailed = onFailed
self.onSuccess = onSuccess
self.extra = extra
########################################################################
class OkexFutureEasySymbol(object):
BTC = 'btc'
LTC = 'ltc'
ETH = 'eth'
ETC = 'etc'
BCH = 'bch'
EOS = 'eos'
XRP = 'xrp'
BTG = 'btg'
########################################################################
class OkexFutureSymbol(object):
BTC = 'btc_usd'
@ -114,6 +127,68 @@ class OkexFuturePositionDetail(object):
self.contractType = None
########################################################################
class OkexFutureTickInfo(object):
#----------------------------------------------------------------------
def __init__(self, symbol, remoteContractType, last, limitHigh, limitLow, vol, sell, buy, unitAmount, holdAmount,
contractId, high, low):
self.symbol = symbol
self.remoteContractType = remoteContractType
self.last = last
self.limitHigh = limitHigh # type: str # 最高买入限制价格
self.limitLow = limitLow # type: str # 最低卖出限制价格
self.vol = vol # type: float # 24 小时成交量
self.sell = sell # type: float # 卖一价格
self.buy = buy # type: float # 买一价格
self.unitAmount = unitAmount # type: float # 合约价值
self.holdAmount = holdAmount # type: float # 当前持仓量
self.contractId = contractId # type: long # 合约ID
self.high = high # type: float # 24 小时最高价格
self.low = low # type: float # 24 小时最低价格
########################################################################
class OkexFutureTradeInfo(object):
#----------------------------------------------------------------------
def __init__(self, symbol, remoteContractType, index, price, volume, time, direction, coinVolume):
self.symbol = symbol
self.remoteContractType = remoteContractType
self.index = index
self.price = price
self.volume = volume
self.time = time
self.direction = direction
self.coinVolume = coinVolume
########################################################################
class OkexFutureUserTradeInfo(object):
#----------------------------------------------------------------------
def __init__(self, symbol, remoteContractType, amount,
contractName, createdDate, createDateStr, dealAmount, fee,
orderId, price, priceAvg, status, type, unitAmount, leverRate, systemType
):
self.symbol = symbol # type: str # btcUsd ltcUsd ethUsd etcUsd bchUsd
self.remoteContractType = remoteContractType
self.amount = amount # type: float # 委托数量
self.contractName = contractName # type: str # 合约名称
self.createdDate = createdDate # type: long # 委托时间
self.createDateStr = createDateStr # type: str # 委托时间字符串
self.dealAmount = dealAmount # type: float # 成交数量
self.fee = fee # type: float # 手续费
self.remoteId = orderId # type: long # 订单ID
self.price = price # type: float # 订单价格
self.priceAvg = priceAvg # type: float # 平均价格
self.status = status # type: int # 订单状态(0等待成交 1部分成交 2全部成交 -1撤单 4撤单处理中)
self.type = type # type: int # 订单类型 1开多 2开空 3平多 4平空
self.unitAmount = unitAmount # type: float # 合约面值
self.leverRate = leverRate # type: float # 杠杆倍数 value:10/20 默认10
self.systemType = systemType # type: int # 订单类型 0:普通 1:交割 2:强平 4:全平 5:系统反单
########################################################################
class OkexFutureRestClient(OkexFutureRestBase):
@ -424,11 +499,104 @@ class OkexFutureRestClient(OkexFutureRestBase):
#---------------------------------------------------------------------
@staticmethod
def errorCodeToString(code):
assert code in errorCodeMap
return errorCodeMap[code]
assert code in restErrorCodeMap
return restErrorCodeMap[code]
errorCodeMap = {
########################################################################
class OkexFutureWebSocketClient(OkexFutureWebSocketBase):
#----------------------------------------------------------------------
def __init__(self):
super(OkexFutureWebSocketClient, self).__init__()
self.onTick = self.defaultOnTick
self.onUserTrade = self.defaultOnUserTrade
#----------------------------------------------------------------------
def subscribe(self, easySymbol, contractType): # type: (OkexFutureEasySymbol, OkexFutureContractType)->None
self.sendPacket({
'event': 'addChannel',
'channel': 'ok_sub_futureusd_' + easySymbol + '_ticker_' + contractType
})
#----------------------------------------------------------------------
def subscribeUserTrade(self):
# todo: 没有测试条件
self.sendPacket({
'event': 'addChannel',
'channel': 'ok_sub_futureusd_trades'
})
#----------------------------------------------------------------------
def defaultOnPacket(self, packets):
for packet in packets:
print('packets:')
print(packets)
channelName = None
if 'channel' in packet:
channelName = packet['channel']
if not channelName or channelName == 'addChannel':
return
packet = packet['data']
channel = parseChannel(channelName) # type: ExtraSymbolChannel
if channel.type == ChannelType.Tick:
self.onTick(OkexFutureTickInfo(
symbol=channel.symbol,
remoteContractType=channel.remoteContractType,
last=packet['last'], # float # 最高买入限制价格
limitHigh=packet['limitHigh'], # str # 最高买入限制价格
limitLow=packet['limitLow'], # str # 最低卖出限制价格
vol=packet['vol'], # float # 24 小时成交量
sell=packet['sell'], # float # 卖一价格
buy=packet['buy'], # float # 买一价格
unitAmount=packet['unitAmount'], # float # 合约价值
holdAmount=packet['hold_amount'], # float # 当前持仓量
contractId=packet['contractId'], # long # 合约ID
high=packet['high'], # float # 24 小时最高价格
low=packet['low'], # float # 24 小时最低价格
))
# elif channel.type == ChannelType.Trade:
# trades = []
# for tradeInfo in packet:
# trades.append(OkexFutureTradeInfo(
# channel.symbol, channel.remoteContractType, *tradeInfo
# ))
# self.onTrades(trades)
# todo: 没有测试条件
elif channel.type == ChannelType.UserTrade:
self.onUserTrade(OkexFutureUserTradeInfo(
symbol=packet['symbol'], # str # btc_usd ltc_usd eth_usd etc_usd bch_usd
remoteContractType=packet['contract_type'],
amount=packet['amount'], # float # 委托数量
contractName=packet['contract_name'], # str # 合约名称
createdDate=packet['created_date'], # long # 委托时间
createDateStr=packet['create_date_str'], # str # 委托时间字符串
dealAmount=packet['deal_amount'], # float # 成交数量
fee=packet['fee'], # float # 手续费
orderId=packet['order_id'], # long # 订单ID
price=packet['price'], # float # 订单价格
priceAvg=packet['price_avg'], # float # 平均价格
status=packet['status'], # int # 订单状态(0等待成交 1部分成交 2全部成交 -1撤单 4撤单处理中)
type=packet['type'], # int # 订单类型 1开多 2开空 3平多 4平空
unitAmount=packet['unit_amount'], # float # 合约面值
leverRate=packet['lever_rate'], # float # 杠杆倍数 value:10/20 默认10
systemType=packet['system_type'], # int # 订单类型 0:普通 1:交割 2:强平 4:全平 5:系统反单
))
#----------------------------------------------------------------------
def defaultOnTick(self, tick): # type: (OkexFutureTickInfo)->None
pass
#----------------------------------------------------------------------
def defaultOnUserTrade(self, tick): # type: (OkexFutureUserTradeInfo)->None
pass
restErrorCodeMap = {
0: '远程服务器并未给出错误代码',
20001: '用户不存在',
@ -473,3 +641,187 @@ errorCodeMap = {
21026: '您的账户已被限制开仓操作',
20119: '接口已下线或无法使用',
}
webSocketErrorCodeMap = {
10000: '必填参数为空',
10001: '参数错误',
10002: '验证失败',
10003: '该连接已经请求了其他用户的实时交易数据',
10004: '该连接没有请求此用户的实时交易数据',
10005: 'api_key或者sign不合法',
10008: '非法参数',
10009: '订单不存在',
10010: '余额不足',
10011: '卖的数量小于BTC/LTC最小买卖额度',
10012: '当前网站暂时只支持btc_usd ltc_usd',
10014: '下单价格不得≤0或≥1000000',
10015: '暂不支持此channel订阅',
10016: '币数量不足',
10017: 'WebSocket鉴权失败',
10100: '用户被冻结',
10049: '小额委托(<0.15BTC)的未成交委托数量不得大于50个',
10216: '非开放API',
20001: '用户不存在',
20002: '用户被冻结',
20003: '用户被爆仓冻结',
20004: '合约账户被冻结',
20005: '用户合约账户不存在',
20006: '必填参数为空',
20007: '参数错误',
20008: '合约账户余额为空',
20009: '虚拟合约状态错误',
20010: '合约风险率信息不存在',
20011: '开仓前保证金率超过90%',
20012: '开仓后保证金率超过90%',
20013: '暂无对手价',
20014: '系统错误',
20015: '订单信息不存在',
20016: '平仓数量是否大于同方向可用持仓数量',
20017: '非本人操作',
20018: '下单价格高于前一分钟的105%或低于95%',
20019: '该IP限制不能请求该资源',
20020: '密钥不存在',
20021: '指数信息不存在',
20022: '接口调用错误',
20023: '逐仓用户',
20024: 'sign签名不匹配',
20025: '杠杆比率错误',
20100: '请求超时',
20101: '数据格式无效',
20102: '登录无效',
20103: '数据事件类型无效',
20104: '数据订阅类型无效',
20107: 'JSON格式错误',
20115: 'quote参数未匹配到',
20116: '参数不匹配',
1002: '交易金额大于余额',
1003: '交易金额小于最小交易值',
1004: '交易金额小于0',
1007: '没有交易市场信息',
1008: '没有最新行情信息',
1009: '没有订单',
1010: '撤销订单与原订单用户不一致',
1011: '没有查询到该用户',
1013: '没有订单类型',
1014: '没有登录',
1015: '没有获取到行情深度信息',
1017: '日期参数错误',
1018: '下单失败',
1019: '撤销订单失败',
1024: '币种不存在',
1025: '没有K线类型',
1026: '没有基准币数量',
1027: '参数不合法可能超出限制',
1028: '保留小数位失败',
1029: '正在准备中',
1030: '有融资融币无法进行交易',
1031: '转账余额不足',
1032: '该币种不能转账',
1035: '密码不合法',
1036: '谷歌验证码不合法',
1037: '谷歌验证码不正确',
1038: '谷歌验证码重复使用',
1039: '短信验证码输错限制',
1040: '短信验证码不合法',
1041: '短信验证码不正确',
1042: '谷歌验证码输错限制',
1043: '登陆密码不允许与交易密码一致',
1044: '原密码错误',
1045: '未设置二次验证',
1046: '原密码未输入',
1048: '用户被冻结',
1050: '订单已撤销或者撤销中',
1051: '订单已完成交易',
1201: '账号零时删除',
1202: '账号不存在',
1203: '转账金额大于余额',
1204: '不同种币种不能转账',
1205: '账号不存在主从关系',
1206: '提现用户被冻结',
1207: '不支持转账',
1208: '没有该转账用户',
1209: '当前api不可用',
}
########################################################################
class ChannelType(Enum):
Login = 1
ForecastPrice = 2
Tick = 3
Depth = 4
Trade = 5
Index = 6
UserTrade = 7
UserInfo = 8
########################################################################
class Channel(object):
#----------------------------------------------------------------------
def __init__(self, type):
self.type = type
########################################################################
class SymbolChannel(Channel):
#----------------------------------------------------------------------
def __init__(self, type, symbol):
super(SymbolChannel, self).__init__(type)
self.symbol = symbol
########################################################################
class FutureSymbolChannel(SymbolChannel):
#----------------------------------------------------------------------
def __init__(self, type, symbol, remoteContractType):
super(FutureSymbolChannel, self).__init__(type, symbol)
self.remoteContractType = remoteContractType
########################################################################
class ExtraSymbolChannel(FutureSymbolChannel):
#----------------------------------------------------------------------
def __init__(self, type, symbol, remoteContractType, extra):
super(ExtraSymbolChannel, self).__init__(type, symbol, remoteContractType)
self.extra = extra
#----------------------------------------------------------------------
def parseChannel(channel): # type: (str)->Channel
if channel == 'login':
return Channel(ChannelType.Login)
# 还未提供订阅的channel都注释掉
# elif channel[4:12] == 'forecast': # eg: 'btc_forecast_price'
# return SymbolChannel(ChannelType.ForecastPrice, channel[:3])
sp = channel.split('_')
if sp[-1] == 'trades': # eg: 'ok_sub_futureusd_trades'
return Channel(ChannelType.UserTrade)
# if sp[-1] == 'userinfo': # eg: 'ok_sub_futureusd_btc_userinfo'
# return Channel(ChannelType.UserInfo)
# if sp[-1] == 'index': # eg: 'ok_sub_futureusd_btc_index'
# return SymbolChannel(ChannelType.Index, channel[17:20])
# if len(sp) == 9:
# _, _, _, easySymbol, crash, typeName, contractTypePrefix, _, depth = sp
# return ExtraSymbolChannel(ChannelType.Depth, easySymbol + '_' + crash,
# remotePrefixToRemoteContractType(contractTypePrefix),
# depth)
_, _, _, easySymbol, crash, typeName, contractTypePrefix, _ = sp
return FutureSymbolChannel(ChannelType.Tick, easySymbol + '_' + crash,
remotePrefixToRemoteContractType(contractTypePrefix))
#----------------------------------------------------------------------
def remotePrefixToRemoteContractType(prefix):
return _prefixForRemoteContractType[prefix]
_prefixForRemoteContractType = {v.split('_')[0]: v for k, v in OkexFutureContractType.__dict__.items() if
not k.startswith('_')}

View File

@ -1,2 +1,2 @@
from .OkexFutureApi import OkexFutureSymbol, OkexFutureContractType, OkexFutureOrder, OkexFutureOrderStatus, OkexFuturePosition, \
OkexFuturePositionDetail, OkexFuturePriceType, OkexFutureRestClient, OkexFutureUserInfo
from .OkexFutureApi import OkexFutureRestClient, OkexFutureWebSocketClient, OkexFutureSymbol, OkexFutureContractType, OkexFutureOrder, OkexFutureOrderStatus, OkexFuturePosition, \
OkexFuturePositionDetail, OkexFuturePriceType, OkexFutureUserInfo

View File

@ -3,11 +3,23 @@ import hashlib
import urllib
from vnpy.api.rest import Request, RestClient
from vnpy.api.websocket import WebSocketClient
#----------------------------------------------------------------------
def paramsToData(params):
return urllib.urlencode(sorted(params.items()))
#----------------------------------------------------------------------
def sign(dataWithApiKey, apiSecret):
"""
usage:
params = { ... , 'api_key': ...}
data = paramsToData(params)
signature = sign(data, apiSecret)
data += "&sign" + signature
:param dataWithApiKey: sorted urlencoded args with apiKey
:return: param 'sign' for okex api
"""
@ -41,10 +53,62 @@ class OkexFutureRestBase(RestClient):
args.pop('sign')
if 'apiKey' not in args:
args['api_key'] = self.apiKey
data = urllib.urlencode(sorted(args.items()))
data = paramsToData(args)
signature = sign(data, self.apiSecret)
data += "&sign=" + signature
req.headers = {'Content-Type': 'application/x-www-form-urlencoded'}
req.data = data
return req
########################################################################
class OkexFutureWebSocketBase(WebSocketClient):
"""
Okex期货websocket客户端
实例化后使用init设置apiKey和secretKeyapiSecret
"""
host = 'wss://real.okex.com:10440/websocket/okexapi'
def __init__(self):
super(OkexFutureWebSocketBase, self).__init__()
super(OkexFutureWebSocketBase, self).init(OkexFutureWebSocketBase.host)
self.apiKey = None
self.apiSecret = None
self.autoLogin = True
self.onConnected = self._onConnected
#----------------------------------------------------------------------
# noinspection PyMethodOverriding
def init(self, apiKey, secretKey, autoLogin=True):
self.apiKey = apiKey
self.apiSecret = secretKey
self.autoLogin = autoLogin
#----------------------------------------------------------------------
def sendPacket(self, dictObj, authenticate=False):
if authenticate:
data = urllib.urlencode(sorted(dictObj.items()))
signature = sign(data, self.apiSecret)
dictObj['sign'] = signature
return super(OkexFutureWebSocketBase, self).sendPacket(dictObj)
#----------------------------------------------------------------------
def _login(self, ):
params = {"api_key": self.apiKey, }
data = paramsToData(params)
signature = sign(data, self.apiSecret)
params['sign'] = signature
self.sendPacket({
"event": "login",
"parameters": params
}, authenticate=False)
#----------------------------------------------------------------------
def _onConnected(self):
if self.autoLogin:
self._login()

View File

@ -3,7 +3,6 @@
import sys
from Queue import Empty, Queue
from abc import abstractmethod
from multiprocessing.dummy import Pool
import requests
@ -79,10 +78,10 @@ class RestClient(object):
"""
HTTP 客户端目前是为了对接各种RESTfulAPI而设计的
如果需要给请求加上签名重载beforeRequest函数
如果需要处理非200的请求重载onFailed函数
如果需要给请求加上签名设置beforeRequest, 函数类型请参考defaultBeforeRequest
如果需要处理非200的请求设置onFailed函数类型请参考defaultOnFailed
如果每一个请求的非200返回都需要单独处理使用addReq函数的onFailed参数
如果捕获Python内部错误例如网络连接失败等等重载onError函数
如果捕获Python内部错误例如网络连接失败等等设置onError函数类型请参考defaultOnError
"""
#----------------------------------------------------------------------
@ -92,6 +91,9 @@ class RestClient(object):
"""
self.urlBase = None # type: str
self.sessionProvider = requestsSessionProvider
self.beforeRequest = self.defaultBeforeRequest # 任何请求在发送之前都会经过这个函数,让其加工
self.onError = self.defaultOnError # Python内部错误处理
self.onFailed = self.defaultOnFailed # statusCode != 2xx 时触发
self._active = False
@ -157,39 +159,40 @@ class RestClient(object):
:return: Request
"""
req = Request(method, path, callback, params, data, headers)
req.onFailed = onFailed
req.skipDefaultOnFailed = skipDefaultOnFailed
req.extra = extra
self._queue.put(req)
return req
request = Request(method, path, callback, params, data, headers)
request.onFailed = onFailed
request.skipDefaultOnFailed = skipDefaultOnFailed
request.extra = extra
self._queue.put(request)
return request
#----------------------------------------------------------------------
def _run(self):
session = self.sessionProvider()
while self._active:
try:
req = self._queue.get(timeout=1)
request = self._queue.get(timeout=1)
try:
self._processRequest(req, session)
self._processRequest(request, session)
finally:
self._queue.task_done()
except Empty:
pass
#----------------------------------------------------------------------
@abstractmethod
def beforeRequest(self, req): # type: (Request)->Request
@staticmethod
def defaultBeforeRequest(request): # type: (Request)->Request
"""
所有请求在发送之前都会经过这个函数
签名之类的前奏可以在这里面实现
需要对request进行什么修改就做什么修改吧
@:return (req)
@:return (request)
"""
return req
return request
#----------------------------------------------------------------------
def onFailed(self, httpStatusCode, req): # type:(int, Request)->None
@staticmethod
def defaultOnFailed(httpStatusCode, request): # type:(int, Request)->None
"""
请求失败处理函数HttpStatusCode!=200.
默认行为是打印到stderr
@ -200,51 +203,52 @@ class RestClient(object):
"data: {}\n"
"response:"
"{}\n"
.format(req.method, req.path, httpStatusCode,
req.headers,
req.params,
req.data,
req._response.raw))
.format(request.method, request.path, httpStatusCode,
request.headers,
request.params,
request.data,
request._response.raw))
#----------------------------------------------------------------------
def onError(self, exceptionType, exceptionValue, tb, req):
@staticmethod
def defaultOnError(exceptionType, exceptionValue, tb, request):
"""
Python内部错误处理默认行为是仍给excepthook
"""
print("error in req : {}\n".format(req))
print("error in request : {}\n".format(request))
sys.excepthook(exceptionType, exceptionValue, tb)
#----------------------------------------------------------------------
def _processRequest(self, req, session): # type: (Request, requests.Session)->None
def _processRequest(self, request, session): # type: (Request, requests.Session)->None
"""
用于内部将请求发送出去
"""
try:
req = self.beforeRequest(req)
request = self.beforeRequest(request)
url = self.makeFullUrl(req.path)
url = self.makeFullUrl(request.path)
response = session.request(req.method, url, headers=req.headers, params=req.params, data=req.data)
req._response = response
response = session.request(request.method, url, headers=request.headers, params=request.params, data=request.data)
request._response = response
httpStatusCode = response.status_code
if httpStatusCode/100 == 2:
jsonBody = response.json()
req.callback(jsonBody, req)
req._status = RequestStatus.success
request.callback(jsonBody, request)
request._status = RequestStatus.success
else:
req._status = RequestStatus.failed
request._status = RequestStatus.failed
if req.onFailed:
req.onFailed(httpStatusCode, response.raw, req)
if request.onFailed:
request.onFailed(httpStatusCode, response.raw, request)
# 若没有onFailed或者没设置skipDefaultOnFailed则调用默认的处理函数
if not req.onFailed or not req.skipDefaultOnFailed:
self.onFailed(httpStatusCode, req)
if not request.onFailed or not request.skipDefaultOnFailed:
self.onFailed(httpStatusCode, request)
except:
req._status = RequestStatus.error
request._status = RequestStatus.error
t, v, tb = sys.exc_info()
self.onError(t, v, tb, req)
self.onError(t, v, tb, request)
def makeFullUrl(self, path):
url = self.urlBase + path

View File

@ -3,38 +3,46 @@
########################################################################
import json
import ssl
import sys
import time
from abc import abstractmethod
from threading import Thread, Lock
import ssl
import time
import websocket
from threading import Lock, Thread
class WebSocketClient(object):
"""
Websocket API
继承使用该类
实例化之后应调用start开始后台线程调用start()函数会自动连接websocket
若要终止后台线程请调用stop() stop()函数会顺便断开websocket
可以重写以下函数
该类默认打包方式为json若从服务器返回的数据不为json则会触发onError
可以覆盖以下回调
onConnected
onDisconnected
onPacket
onPacket # 数据回调只有在返回的数据帧为text并且内容为json时才会回调
onError
当然为了不让用户随意自定义用自己的init函数覆盖掉原本的init(host)也是个不错的选择
@note 继承使用该类
关于ping
在调用start()之后该类每60s会自动发送一个ping帧至服务器
"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.host = None # type: str
self.onConnected = self.defaultOnConnected
self.onDisconnected = self.defaultOnDisconnected
self.onPacket = self.defaultOnPacket
self.onError = self.defaultOnError
self.createConnection = websocket.create_connection
self._ws_lock = Lock()
self._ws = None # type: websocket.WebSocket
@ -42,13 +50,12 @@ class WebSocketClient(object):
self._workerThread = None # type: Thread
self._pingThread = None # type: Thread
self._active = False
self.createConnection = websocket.create_connection
#----------------------------------------------------------------------
def setCreateConnection(self, func):
"""
for internal usage
:param func: a function like websocket.create_connection
"""
self.createConnection = func
@ -68,8 +75,6 @@ class WebSocketClient(object):
self._pingThread = Thread(target=self._runPing)
self._pingThread.start()
self.onConnected()
#----------------------------------------------------------------------
def stop(self):
"""
@ -136,6 +141,9 @@ class WebSocketClient(object):
data = json.loads(stream)
self.onPacket(data)
except websocket.WebSocketConnectionClosedException:
if self._active:
self._reconnect()
except:
et, ev, tb = sys.exc_info()
self.onError(et, ev, tb)
@ -154,22 +162,24 @@ class WebSocketClient(object):
return self._get_ws().send('ping', websocket.ABNF.OPCODE_PING)
#----------------------------------------------------------------------
def onConnected(self):
@staticmethod
def defaultOnConnected():
"""
连接成功回调
"""
pass
#----------------------------------------------------------------------
def onDisconnected(self):
@staticmethod
def defaultOnDisconnected():
"""
连接断开回调
"""
pass
#----------------------------------------------------------------------
@abstractmethod
def onPacket(self, packet):
@staticmethod
def defaultOnPacket(packet):
"""
数据回调
只有在数据为json包的时候才会触发这个回调
@ -179,6 +189,7 @@ class WebSocketClient(object):
pass
#----------------------------------------------------------------------
def onError(self, exceptionType, exceptionValue, tb):
@staticmethod
def defaultOnError(exceptionType, exceptionValue, tb):
"""Python错误回调"""
return sys.excepthook(exceptionType, exceptionValue, tb)

View File

@ -3,8 +3,7 @@
from __future__ import print_function
import json
from abc import abstractmethod, abstractproperty
from datetime import datetime
from abc import abstractmethod
from typing import Dict
@ -12,75 +11,6 @@ from vnpy.api.okexfuture.OkexFutureApi import *
from vnpy.trader.vtFunction import getJsonPath
from vnpy.trader.vtGateway import *
_orderTypeMap = {
(constant.DIRECTION_LONG, constant.OFFSET_OPEN): OkexFutureOrderType.OpenLong,
(constant.DIRECTION_SHORT, constant.OFFSET_OPEN): OkexFutureOrderType.OpenShort,
(constant.DIRECTION_LONG, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseLong,
(constant.DIRECTION_SHORT, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseShort,
}
_orderTypeMapReverse = {v: k for k, v in _orderTypeMap.items()}
_contractTypeMap = {
'THISWEEK': OkexFutureContractType.ThisWeek,
'NEXTWEEK': OkexFutureContractType.NextWeek,
'QUARTER': OkexFutureContractType.Quarter,
}
_contractTypeMapReverse = {v: k for k, v in _contractTypeMap.items()}
_remoteSymbols = {
OkexFutureSymbol.BTC,
OkexFutureSymbol.LTC,
OkexFutureSymbol.ETH,
OkexFutureSymbol.ETC,
OkexFutureSymbol.BCH,
}
# symbols for ui,
# keys:给用户看的symbols : f"{internalSymbol}_{contractType}"
# values: API接口使用的symbol和contractType字段
_symbolsForUi = {(remoteSymbol.upper() + '_' + upperContractType.upper()): (remoteSymbol, remoteContractType)
for remoteSymbol in _remoteSymbols
for upperContractType, remoteContractType in
_contractTypeMap.items()} # type: Dict[str, List[str, str]]
_symbolsForUiReverse = {v: k for k, v in _symbolsForUi.items()}
#----------------------------------------------------------------------
def localOrderTypeToRemote(direction, offset): # type: (str, str)->str
return _orderTypeMap[(direction, offset)]
#----------------------------------------------------------------------
def remoteOrderTypeToLocal(orderType): # type: (str)->(str, str)
"""
:param orderType:
:return: direction, offset
"""
return _orderTypeMapReverse[orderType]
#----------------------------------------------------------------------
def localContractTypeToRemote(localContractType):
return _contractTypeMap[localContractType]
#----------------------------------------------------------------------
def remoteContractTypeToLocal(remoteContractType):
return _contractTypeMapReverse[remoteContractType]
#----------------------------------------------------------------------
def localSymbolToRemote(symbol): # type: (str)->(OkexFutureSymbol, OkexFutureContractType)
"""
:return: remoteSymbol, remoteContractType
"""
return _symbolsForUi[symbol]
#----------------------------------------------------------------------
def remoteSymbolToLocal(remoteSymbol, localContractType):
return remoteSymbol.upper() + localContractType
########################################################################
class VnpyGateway(VtGateway):
@ -89,15 +19,6 @@ class VnpyGateway(VtGateway):
于是我设计了这个类将重复代码抽取出来简化gateway的实现
"""
#----------------------------------------------------------------------
def __init__(self, eventEngine):
super(VnpyGateway, self).__init__(eventEngine, self.gatewayName)
#----------------------------------------------------------------------
@abstractproperty
def gatewayName(self): # type: ()->str
return 'VnpyGateway'
#----------------------------------------------------------------------
def readConfig(self):
"""
@ -146,10 +67,16 @@ class OkexFutureGateway(VnpyGateway):
#----------------------------------------------------------------------
def __init__(self, eventEngine, *_, **__): # args, kwargs is needed for compatibility
"""Constructor"""
super(OkexFutureGateway, self).__init__(eventEngine)
super(OkexFutureGateway, self).__init__(eventEngine, 'OkexFutureGateway')
self.apiKey = None # type: str
self.apiSecret = None # type: str
self.api = OkexFutureRestClient()
self.restApi = OkexFutureRestClient()
self.webSocket = OkexFutureWebSocketClient()
self.webSocket.onTick = self._onTick
self.webSocket.onUserTrade = self._onUserTrade
self.leverRate = 1
self.symbols = []
@ -157,11 +84,6 @@ class OkexFutureGateway(VnpyGateway):
self._orders = {} # type: Dict[str, _Order]
self._remoteIds = {} # type: Dict[str, _Order]
#----------------------------------------------------------------------
@property
def gatewayName(self):
return 'OkexFutureGateway'
#----------------------------------------------------------------------
@property
def exchange(self): # type: ()->str
@ -190,20 +112,28 @@ class OkexFutureGateway(VnpyGateway):
#----------------------------------------------------------------------
def connect(self):
self.loadSetting()
self.api.init(self.apiKey, self.apiSecret)
self.restApi.init(self.apiKey, self.apiSecret)
self.webSocket.init(self.apiKey, self.apiSecret)
self.restApi.start()
self.webSocket.start()
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
def subscribe(self, subscribeReq): # type: (VtSubscribeReq)->None
"""订阅行情"""
pass
remoteSymbol, remoteContractType = localSymbolToRemote(subscribeReq.symbol)
return self.webSocket.subscribe(remoteSymbol, remoteContractType)
#----------------------------------------------------------------------
def _getOrderByLocalId(self, localId):
return self._orders[localId]
if localId in self._orders:
return self._orders[localId]
return None
#----------------------------------------------------------------------
def _getOrderByRemoteId(self, remoteId):
return self._remoteIds[remoteId]
if remoteId in self._remoteIds:
return self._remoteIds[remoteId]
return None
#----------------------------------------------------------------------
def _saveRemoteId(self, remoteId, myorder):
@ -241,16 +171,16 @@ class OkexFutureGateway(VnpyGateway):
if vtRequest.priceType == constant.PRICETYPE_MARKETPRICE:
userMarketPrice = True
self.api.sendOrder(symbol=remoteSymbol,
contractType=remoteContractType,
orderType=orderType,
volume=vtRequest.volume,
price=vtRequest.price,
useMarketPrice=userMarketPrice,
leverRate=self.leverRate,
onSuccess=self._onOrderSent,
extra=None,
)
self.restApi.sendOrder(symbol=remoteSymbol,
contractType=remoteContractType,
orderType=orderType,
volume=vtRequest.volume,
price=vtRequest.price,
useMarketPrice=userMarketPrice,
leverRate=self.leverRate,
onSuccess=self._onOrderSent,
extra=None,
)
return myorder.localId
@ -258,13 +188,15 @@ class OkexFutureGateway(VnpyGateway):
def cancelOrder(self, vtCancel): # type: (VtCancelOrderReq)->None
"""撤单"""
myorder = self._getOrderByLocalId(vtCancel.orderID)
assert myorder is not None, u"理论上是无法取消一个不存在的本地单的"
symbol, contractType = localSymbolToRemote(vtCancel.symbol)
self.api.cancelOrder(symbol=symbol,
contractType=contractType,
orderId=myorder.remoteId,
onSuccess=self._onOrderCanceled,
extra=myorder,
)
self.restApi.cancelOrder(symbol=symbol,
contractType=contractType,
orderId=myorder.remoteId,
onSuccess=self._onOrderCanceled,
extra=myorder,
)
# cancelDict: 不存在的没有localId就没有remoteId没有remoteId何来cancel
#----------------------------------------------------------------------
@ -284,15 +216,15 @@ class OkexFutureGateway(VnpyGateway):
remoteContractType = contractType
localContractType = remoteContractTypeToLocal(remoteContractType)
self.api.queryOrders(symbol=symbol,
contractType=remoteContractType,
status=status,
onSuccess=self._onQueryOrders,
extra=localContractType)
self.restApi.queryOrders(symbol=symbol,
contractType=remoteContractType,
status=status,
onSuccess=self._onQueryOrders,
extra=localContractType)
#----------------------------------------------------------------------
def qryAccount(self):
self.api.queryUserInfo(onSuccess=self._onQueryAccount)
self.restApi.queryUserInfo(onSuccess=self._onQueryAccount)
"""查询账户资金"""
pass
@ -301,16 +233,17 @@ class OkexFutureGateway(VnpyGateway):
"""查询持仓"""
for remoteSymbol in _remoteSymbols:
for localContractType, remoteContractType in _contractTypeMap.items():
self.api.queryPosition(remoteSymbol,
remoteContractType,
onSuccess=self._onQueryPosition,
extra=localContractType
)
self.restApi.queryPosition(remoteSymbol,
remoteContractType,
onSuccess=self._onQueryPosition,
extra=localContractType
)
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.api.stop()
self.restApi.stop()
self.webSocket.stop()
#----------------------------------------------------------------------
def _onOrderSent(self, remoteId, myorder): #type: (str, _Order)->None
@ -319,23 +252,6 @@ class OkexFutureGateway(VnpyGateway):
self._saveRemoteId(remoteId, myorder)
self.onOrder(myorder.vtOrder)
# #----------------------------------------------------------------------
# def _pushOrderAsTraded(self, order):
# trade = VtTradeData()
# trade.gatewayName = order.gatewayName
# trade.symbol = order.symbol
# trade.vtSymbol = order.vtSymbol
# trade.orderID = order.orderID
# trade.vtOrderID = order.vtOrderID
# self.tradeID += 1
# trade.tradeID = str(self.tradeID)
# trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID])
# trade.direction = order.direction
# trade.price = order.price
# trade.volume = order.tradedVolume
# trade.tradeTime = datetime.now().strftime('%H:%M:%S')
# self.onTrade(trade)
#----------------------------------------------------------------------
@staticmethod
def _onOrderCanceled(myorder): #type: (_Order)->None
@ -346,10 +262,10 @@ class OkexFutureGateway(VnpyGateway):
localContractType = extra
for order in orders:
remoteId = order.remoteId
if remoteId in self._remoteIds:
myorder = self._getOrderByRemoteId(remoteId)
if myorder:
# 如果订单已经缓存在本地,则尝试更新订单状态
myorder = self._getOrderByRemoteId(remoteId)
# 有新交易才推送更新
if order.tradedVolume != myorder.vtOrder.tradedVolume:
@ -367,12 +283,6 @@ class OkexFutureGateway(VnpyGateway):
self._saveRemoteId(myorder.remoteId, myorder)
self.onOrder(myorder.vtOrder)
# # 如果该订单已经交易完成,推送交易完成消息
# # todo: 这样写会导致同一个订单产生多次交易完成消息
# if order.status == OkexFutureOrderStatus.Finished:
# myorder.vtOrder.status = constant.STATUS_ALLTRADED
# self._pushOrderAsTraded(myorder.vtOrder)
#----------------------------------------------------------------------
def _onQueryAccount(self, infos, _): # type: (List[OkexFutureUserInfo], Any)->None
for info in infos:
@ -390,27 +300,130 @@ class OkexFutureGateway(VnpyGateway):
localContractType = extra
for info in posinfo.holding:
# 先生成多头持仓
pos = VtPositionData()
pos.gatewayName = self.gatewayName
pos.symbol = remoteSymbolToLocal(pos.symbol, localContractType)
pos.exchange = self.exchange
pos.vtSymbol = '.'.join([pos.symbol, pos.exchange])
pos.direction = constant.DIRECTION_NET
pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction])
pos.position = float(info.buyAmount)
pos = VtPositionData.createFromGateway(
gateway=self,
exchange=self.exchange,
symbol=remoteSymbolToLocal(info.symbol, localContractType),
direction=constant.DIRECTION_NET,
position=float(info.buyAmount),
)
self.onPosition(pos)
# 再生存空头持仓
pos = VtPositionData()
pos.gatewayName = self.gatewayName
pos.symbol = remoteSymbolToLocal(pos.symbol, localContractType)
pos.exchange = self.exchange
pos.vtSymbol = '.'.join([pos.symbol, pos.exchange])
pos.direction = constant.DIRECTION_SHORT
pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction])
pos.position = float(info.sellAmount)
pos = VtPositionData.createFromGateway(
gateway=self,
exchange=self.exchange,
symbol=remoteSymbolToLocal(info.symbol, localContractType),
direction=constant.DIRECTION_SHORT,
position=float(info.sellAmount),
)
self.onPosition(pos)
#----------------------------------------------------------------------
def _onTick(self, info): # type: (OkexFutureTickInfo)->None
uiSymbol = remoteSymbolToLocal(info.symbol, remoteContractTypeToLocal(info.remoteContractType))
self.onTick(VtTickData.createFromGateway(
gateway=self,
symbol=uiSymbol,
exchange=self.exchange,
lastPrice=info.last,
lastVolume=info.vol,
highPrice=info.high,
lowPrice=info.low,
openInterest=info.holdAmount,
lowerLimit=info.limitLow,
upperLimit=info.limitHigh,
))
def _onUserTrade(self, info): # type: (OkexFutureUserTradeInfo)->None
tradeID = str(self.tradeID)
self.tradeID += 1
order = self._getOrderByRemoteId(info.remoteId)
if order:
self.onTrade(VtTradeData.createFromOrderData(
order=order.vtOrder,
tradeID=tradeID,
tradePrice=info.price,
tradeVolume=info.dealAmount # todo: 这里应该填写的到底是order总共成交了的数量还是该次trade成交的数量
))
else:
# todo: 与order无关联的trade该如何处理
# uiSymbol = remoteSymbolToLocal(info.symbol, remoteContractTypeToLocal(info.remoteContractType))
pass
return
#----------------------------------------------------------------------
def localOrderTypeToRemote(direction, offset): # type: (str, str)->str
return _orderTypeMap[(direction, offset)]
#----------------------------------------------------------------------
def remoteOrderTypeToLocal(orderType): # type: (str)->(str, str)
"""
:param orderType:
:return: direction, offset
"""
return _orderTypeMapReverse[orderType]
#----------------------------------------------------------------------
def localContractTypeToRemote(localContractType):
return _contractTypeMap[localContractType]
#----------------------------------------------------------------------
def remoteContractTypeToLocal(remoteContractType):
return _contractTypeMapReverse[remoteContractType]
#----------------------------------------------------------------------
def localSymbolToRemote(symbol): # type: (str)->(OkexFutureSymbol, OkexFutureContractType)
"""
:return: remoteSymbol, remoteContractType
"""
return _symbolsForUi[symbol]
#----------------------------------------------------------------------
def remoteSymbolToLocal(remoteSymbol, localContractType):
return remoteSymbol.upper() + '_' + localContractType
_orderTypeMap = {
(constant.DIRECTION_LONG, constant.OFFSET_OPEN): OkexFutureOrderType.OpenLong,
(constant.DIRECTION_SHORT, constant.OFFSET_OPEN): OkexFutureOrderType.OpenShort,
(constant.DIRECTION_LONG, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseLong,
(constant.DIRECTION_SHORT, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseShort,
}
_orderTypeMapReverse = {v: k for k, v in _orderTypeMap.items()}
_contractTypeMap = {
k.upper(): v for k, v in OkexFutureContractType.__dict__.items() if not k.startswith('_')
}
_contractTypeMapReverse = {v: k for k, v in _contractTypeMap.items()}
_easySymbols = {
v for k, v in OkexFutureEasySymbol.__dict__.items() if not k.startswith('_')
}
_remoteSymbols = {
v for k, v in OkexFutureSymbol.__dict__.items() if not k.startswith('_')
}
# symbols for ui,
# keys:给用户看的symbols : f"{internalSymbol}_{contractType}"
# values: API接口使用的symbol和contractType字段
_symbolsForUi = {(remoteSymbol.upper() + '_' + upperContractType.upper()): (remoteSymbol, remoteContractType)
for remoteSymbol in _remoteSymbols
for upperContractType, remoteContractType in
_contractTypeMap.items()} # type: Dict[str, List[str, str]]
_symbolsForUiReverse = {v: k for k, v in _symbolsForUi.items()}
_channel_for_subscribe = {
'ok_sub_futureusd_' + easySymbol + '_ticker_' + remoteContractType: (easySymbol, remoteContractType)
for easySymbol in _easySymbols
for remoteContractType in _contractTypeMap.values()
}

View File

@ -1,8 +1,9 @@
# encoding: UTF-8
from logging import INFO
import time
from datetime import datetime
from logging import INFO
from vnpy.trader.language import constant
from vnpy.trader.vtConstant import (EMPTY_FLOAT, EMPTY_INT, EMPTY_STRING, EMPTY_UNICODE)
@ -74,8 +75,37 @@ class VtTickData(VtBaseData):
self.askVolume2 = EMPTY_INT
self.askVolume3 = EMPTY_INT
self.askVolume4 = EMPTY_INT
self.askVolume5 = EMPTY_INT
self.askVolume5 = EMPTY_INT
#----------------------------------------------------------------------
@staticmethod
def createFromGateway(gateway, symbol, exchange,
lastPrice, lastVolume,
highPrice, lowPrice,
openPrice=EMPTY_FLOAT,
openInterest=EMPTY_INT,
upperLimit=EMPTY_FLOAT,
lowerLimit=EMPTY_FLOAT):
tick = VtTickData()
tick.gatewayName = gateway.gatewayName
tick.symbol = symbol
tick.exchange = exchange
tick.vtSymbol = symbol + '.' + exchange
tick.lastPrice = lastPrice
tick.lastVolume = lastVolume
tick.openInterest = openInterest
tick.datetime = datetime.now()
tick.date = tick.datetime.strftime('%Y%m%d')
tick.time = tick.datetime.strftime('%H:%M:%S')
tick.openPrice = openPrice
tick.highPrice = highPrice
tick.lowPrice = lowPrice
tick.upperLimit = upperLimit
tick.lowerLimit = lowerLimit
return tick
########################################################################
class VtBarData(VtBaseData):
@ -133,7 +163,27 @@ class VtTradeData(VtBaseData):
self.price = EMPTY_FLOAT # 成交价格
self.volume = EMPTY_INT # 成交数量
self.tradeTime = EMPTY_STRING # 成交时间
#----------------------------------------------------------------------
@staticmethod
def createFromGateway(gateway, symbol, exchange, tradeID, orderID, direction, tradePrice, tradeVolume):
trade = VtTradeData()
trade.gatewayName = gateway.gatewayName
trade.symbol = symbol
trade.exchange = exchange
trade.vtSymbol = symbol + '.' + exchange
trade.orderID = orderID
trade.vtOrderID = trade.gatewayName + '.' + trade.tradeID
trade.tradeID = tradeID
trade.vtTradeID = trade.gatewayName + '.' + tradeID
trade.direction = direction
trade.price = tradePrice
trade.volume = tradeVolume
trade.tradeTime = datetime.now().strftime('%H:%M:%S')
return trade
#----------------------------------------------------------------------
@staticmethod
def createFromOrderData(order,
@ -144,6 +194,7 @@ class VtTradeData(VtBaseData):
trade.gatewayName = order.gatewayName
trade.symbol = order.symbol
trade.vtSymbol = order.vtSymbol
trade.orderID = order.orderID
trade.vtOrderID = order.vtOrderID
trade.tradeID = tradeID