diff --git a/vnpy/api/okexfuture/OkexFutureApi.py b/vnpy/api/okexfuture/OkexFutureApi.py index b6d801c9..30bf7b03 100644 --- a/vnpy/api/okexfuture/OkexFutureApi.py +++ b/vnpy/api/okexfuture/OkexFutureApi.py @@ -1,7 +1,8 @@ # encoding: UTF-8 +from enum import Enum from typing import Any, Callable, List, Union -from vnpy.api.okexfuture.vnokexFuture import OkexFutureRestBase +from vnpy.api.okexfuture.vnokexFuture import OkexFutureRestBase, OkexFutureWebSocketBase from vnpy.api.rest import Request @@ -12,8 +13,20 @@ class _OkexFutureCustomExtra(object): self.onFailed = onFailed self.onSuccess = onSuccess self.extra = extra - - + + +######################################################################## +class OkexFutureEasySymbol(object): + BTC = 'btc' + LTC = 'ltc' + ETH = 'eth' + ETC = 'etc' + BCH = 'bch' + EOS = 'eos' + XRP = 'xrp' + BTG = 'btg' + + ######################################################################## class OkexFutureSymbol(object): BTC = 'btc_usd' @@ -114,6 +127,68 @@ class OkexFuturePositionDetail(object): self.contractType = None +######################################################################## +class OkexFutureTickInfo(object): + + #---------------------------------------------------------------------- + def __init__(self, symbol, remoteContractType, last, limitHigh, limitLow, vol, sell, buy, unitAmount, holdAmount, + contractId, high, low): + self.symbol = symbol + self.remoteContractType = remoteContractType + self.last = last + self.limitHigh = limitHigh # type: str # 最高买入限制价格 + self.limitLow = limitLow # type: str # 最低卖出限制价格 + self.vol = vol # type: float # 24 小时成交量 + self.sell = sell # type: float # 卖一价格 + self.buy = buy # type: float # 买一价格 + self.unitAmount = unitAmount # type: float # 合约价值 + self.holdAmount = holdAmount # type: float # 当前持仓量 + self.contractId = contractId # type: long # 合约ID + self.high = high # type: float # 24 小时最高价格 + self.low = low # type: float # 24 小时最低价格 + + +######################################################################## +class OkexFutureTradeInfo(object): + + #---------------------------------------------------------------------- + def __init__(self, symbol, remoteContractType, index, price, volume, time, direction, coinVolume): + self.symbol = symbol + self.remoteContractType = remoteContractType + self.index = index + self.price = price + self.volume = volume + self.time = time + self.direction = direction + self.coinVolume = coinVolume + + +######################################################################## +class OkexFutureUserTradeInfo(object): + + #---------------------------------------------------------------------- + def __init__(self, symbol, remoteContractType, amount, + contractName, createdDate, createDateStr, dealAmount, fee, + orderId, price, priceAvg, status, type, unitAmount, leverRate, systemType + ): + self.symbol = symbol # type: str # btcUsd ltcUsd ethUsd etcUsd bchUsd + self.remoteContractType = remoteContractType + self.amount = amount # type: float # 委托数量 + self.contractName = contractName # type: str # 合约名称 + self.createdDate = createdDate # type: long # 委托时间 + self.createDateStr = createDateStr # type: str # 委托时间字符串 + self.dealAmount = dealAmount # type: float # 成交数量 + self.fee = fee # type: float # 手续费 + self.remoteId = orderId # type: long # 订单ID + self.price = price # type: float # 订单价格 + self.priceAvg = priceAvg # type: float # 平均价格 + self.status = status # type: int # 订单状态(0等待成交 1部分成交 2全部成交 -1撤单 4撤单处理中) + self.type = type # type: int # 订单类型 1:开多 2:开空 3:平多 4:平空 + self.unitAmount = unitAmount # type: float # 合约面值 + self.leverRate = leverRate # type: float # 杠杆倍数 value:10/20 默认10 + self.systemType = systemType # type: int # 订单类型 0:普通 1:交割 2:强平 4:全平 5:系统反单 + + ######################################################################## class OkexFutureRestClient(OkexFutureRestBase): @@ -424,11 +499,104 @@ class OkexFutureRestClient(OkexFutureRestBase): #--------------------------------------------------------------------- @staticmethod def errorCodeToString(code): - assert code in errorCodeMap - return errorCodeMap[code] + assert code in restErrorCodeMap + return restErrorCodeMap[code] -errorCodeMap = { +######################################################################## +class OkexFutureWebSocketClient(OkexFutureWebSocketBase): + + #---------------------------------------------------------------------- + def __init__(self): + super(OkexFutureWebSocketClient, self).__init__() + self.onTick = self.defaultOnTick + self.onUserTrade = self.defaultOnUserTrade + + #---------------------------------------------------------------------- + def subscribe(self, easySymbol, contractType): # type: (OkexFutureEasySymbol, OkexFutureContractType)->None + self.sendPacket({ + 'event': 'addChannel', + 'channel': 'ok_sub_futureusd_' + easySymbol + '_ticker_' + contractType + }) + + #---------------------------------------------------------------------- + def subscribeUserTrade(self): + # todo: 没有测试条件 + self.sendPacket({ + 'event': 'addChannel', + 'channel': 'ok_sub_futureusd_trades' + }) + + #---------------------------------------------------------------------- + def defaultOnPacket(self, packets): + + for packet in packets: + print('packets:') + print(packets) + channelName = None + if 'channel' in packet: + channelName = packet['channel'] + if not channelName or channelName == 'addChannel': + return + + packet = packet['data'] + channel = parseChannel(channelName) # type: ExtraSymbolChannel + + if channel.type == ChannelType.Tick: + self.onTick(OkexFutureTickInfo( + symbol=channel.symbol, + remoteContractType=channel.remoteContractType, + last=packet['last'], # float # 最高买入限制价格 + limitHigh=packet['limitHigh'], # str # 最高买入限制价格 + limitLow=packet['limitLow'], # str # 最低卖出限制价格 + vol=packet['vol'], # float # 24 小时成交量 + sell=packet['sell'], # float # 卖一价格 + buy=packet['buy'], # float # 买一价格 + unitAmount=packet['unitAmount'], # float # 合约价值 + holdAmount=packet['hold_amount'], # float # 当前持仓量 + contractId=packet['contractId'], # long # 合约ID + high=packet['high'], # float # 24 小时最高价格 + low=packet['low'], # float # 24 小时最低价格 + )) + # elif channel.type == ChannelType.Trade: + # trades = [] + # for tradeInfo in packet: + # trades.append(OkexFutureTradeInfo( + # channel.symbol, channel.remoteContractType, *tradeInfo + # )) + # self.onTrades(trades) + + # todo: 没有测试条件 + elif channel.type == ChannelType.UserTrade: + self.onUserTrade(OkexFutureUserTradeInfo( + symbol=packet['symbol'], # str # btc_usd ltc_usd eth_usd etc_usd bch_usd + remoteContractType=packet['contract_type'], + amount=packet['amount'], # float # 委托数量 + contractName=packet['contract_name'], # str # 合约名称 + createdDate=packet['created_date'], # long # 委托时间 + createDateStr=packet['create_date_str'], # str # 委托时间字符串 + dealAmount=packet['deal_amount'], # float # 成交数量 + fee=packet['fee'], # float # 手续费 + orderId=packet['order_id'], # long # 订单ID + price=packet['price'], # float # 订单价格 + priceAvg=packet['price_avg'], # float # 平均价格 + status=packet['status'], # int # 订单状态(0等待成交 1部分成交 2全部成交 -1撤单 4撤单处理中) + type=packet['type'], # int # 订单类型 1:开多 2:开空 3:平多 4:平空 + unitAmount=packet['unit_amount'], # float # 合约面值 + leverRate=packet['lever_rate'], # float # 杠杆倍数 value:10/20 默认10 + systemType=packet['system_type'], # int # 订单类型 0:普通 1:交割 2:强平 4:全平 5:系统反单 + )) + + #---------------------------------------------------------------------- + def defaultOnTick(self, tick): # type: (OkexFutureTickInfo)->None + pass + + #---------------------------------------------------------------------- + def defaultOnUserTrade(self, tick): # type: (OkexFutureUserTradeInfo)->None + pass + + +restErrorCodeMap = { 0: '远程服务器并未给出错误代码', 20001: '用户不存在', @@ -473,3 +641,187 @@ errorCodeMap = { 21026: '您的账户已被限制开仓操作', 20119: '接口已下线或无法使用', } + +webSocketErrorCodeMap = { + 10000: '必填参数为空', + 10001: '参数错误', + 10002: '验证失败', + 10003: '该连接已经请求了其他用户的实时交易数据', + 10004: '该连接没有请求此用户的实时交易数据', + 10005: 'api_key或者sign不合法', + 10008: '非法参数', + 10009: '订单不存在', + 10010: '余额不足', + 10011: '卖的数量小于BTC/LTC最小买卖额度', + 10012: '当前网站暂时只支持btc_usd ltc_usd', + 10014: '下单价格不得≤0或≥1000000', + 10015: '暂不支持此channel订阅', + 10016: '币数量不足', + 10017: 'WebSocket鉴权失败', + 10100: '用户被冻结', + 10049: '小额委托(<0.15BTC)的未成交委托数量不得大于50个', + 10216: '非开放API', + 20001: '用户不存在', + 20002: '用户被冻结', + 20003: '用户被爆仓冻结', + 20004: '合约账户被冻结', + 20005: '用户合约账户不存在', + 20006: '必填参数为空', + 20007: '参数错误', + 20008: '合约账户余额为空', + 20009: '虚拟合约状态错误', + 20010: '合约风险率信息不存在', + 20011: '开仓前保证金率超过90%', + 20012: '开仓后保证金率超过90%', + 20013: '暂无对手价', + 20014: '系统错误', + 20015: '订单信息不存在', + 20016: '平仓数量是否大于同方向可用持仓数量', + 20017: '非本人操作', + 20018: '下单价格高于前一分钟的105%或低于95%', + 20019: '该IP限制不能请求该资源', + 20020: '密钥不存在', + 20021: '指数信息不存在', + 20022: '接口调用错误', + 20023: '逐仓用户', + 20024: 'sign签名不匹配', + 20025: '杠杆比率错误', + 20100: '请求超时', + 20101: '数据格式无效', + 20102: '登录无效', + 20103: '数据事件类型无效', + 20104: '数据订阅类型无效', + 20107: 'JSON格式错误', + 20115: 'quote参数未匹配到', + 20116: '参数不匹配', + 1002: '交易金额大于余额', + 1003: '交易金额小于最小交易值', + 1004: '交易金额小于0', + 1007: '没有交易市场信息', + 1008: '没有最新行情信息', + 1009: '没有订单', + 1010: '撤销订单与原订单用户不一致', + 1011: '没有查询到该用户', + 1013: '没有订单类型', + 1014: '没有登录', + 1015: '没有获取到行情深度信息', + 1017: '日期参数错误', + 1018: '下单失败', + 1019: '撤销订单失败', + 1024: '币种不存在', + 1025: '没有K线类型', + 1026: '没有基准币数量', + 1027: '参数不合法可能超出限制', + 1028: '保留小数位失败', + 1029: '正在准备中', + 1030: '有融资融币无法进行交易', + 1031: '转账余额不足', + 1032: '该币种不能转账', + 1035: '密码不合法', + 1036: '谷歌验证码不合法', + 1037: '谷歌验证码不正确', + 1038: '谷歌验证码重复使用', + 1039: '短信验证码输错限制', + 1040: '短信验证码不合法', + 1041: '短信验证码不正确', + 1042: '谷歌验证码输错限制', + 1043: '登陆密码不允许与交易密码一致', + 1044: '原密码错误', + 1045: '未设置二次验证', + 1046: '原密码未输入', + 1048: '用户被冻结', + 1050: '订单已撤销或者撤销中', + 1051: '订单已完成交易', + 1201: '账号零时删除', + 1202: '账号不存在', + 1203: '转账金额大于余额', + 1204: '不同种币种不能转账', + 1205: '账号不存在主从关系', + 1206: '提现用户被冻结', + 1207: '不支持转账', + 1208: '没有该转账用户', + 1209: '当前api不可用', +} + + +######################################################################## +class ChannelType(Enum): + Login = 1 + ForecastPrice = 2 + Tick = 3 + Depth = 4 + Trade = 5 + Index = 6 + UserTrade = 7 + UserInfo = 8 + + +######################################################################## +class Channel(object): + + #---------------------------------------------------------------------- + def __init__(self, type): + self.type = type + + +######################################################################## +class SymbolChannel(Channel): + + #---------------------------------------------------------------------- + def __init__(self, type, symbol): + super(SymbolChannel, self).__init__(type) + self.symbol = symbol + + +######################################################################## +class FutureSymbolChannel(SymbolChannel): + + #---------------------------------------------------------------------- + def __init__(self, type, symbol, remoteContractType): + super(FutureSymbolChannel, self).__init__(type, symbol) + self.remoteContractType = remoteContractType + + +######################################################################## +class ExtraSymbolChannel(FutureSymbolChannel): + + #---------------------------------------------------------------------- + def __init__(self, type, symbol, remoteContractType, extra): + super(ExtraSymbolChannel, self).__init__(type, symbol, remoteContractType) + self.extra = extra + + +#---------------------------------------------------------------------- +def parseChannel(channel): # type: (str)->Channel + if channel == 'login': + return Channel(ChannelType.Login) + + # 还未提供订阅的channel都注释掉 + # elif channel[4:12] == 'forecast': # eg: 'btc_forecast_price' + # return SymbolChannel(ChannelType.ForecastPrice, channel[:3]) + + sp = channel.split('_') + if sp[-1] == 'trades': # eg: 'ok_sub_futureusd_trades' + return Channel(ChannelType.UserTrade) + # if sp[-1] == 'userinfo': # eg: 'ok_sub_futureusd_btc_userinfo' + # return Channel(ChannelType.UserInfo) + # if sp[-1] == 'index': # eg: 'ok_sub_futureusd_btc_index' + # return SymbolChannel(ChannelType.Index, channel[17:20]) + + # if len(sp) == 9: + # _, _, _, easySymbol, crash, typeName, contractTypePrefix, _, depth = sp + # return ExtraSymbolChannel(ChannelType.Depth, easySymbol + '_' + crash, + # remotePrefixToRemoteContractType(contractTypePrefix), + # depth) + _, _, _, easySymbol, crash, typeName, contractTypePrefix, _ = sp + return FutureSymbolChannel(ChannelType.Tick, easySymbol + '_' + crash, + remotePrefixToRemoteContractType(contractTypePrefix)) + + +#---------------------------------------------------------------------- +def remotePrefixToRemoteContractType(prefix): + return _prefixForRemoteContractType[prefix] + + +_prefixForRemoteContractType = {v.split('_')[0]: v for k, v in OkexFutureContractType.__dict__.items() if + not k.startswith('_')} diff --git a/vnpy/api/okexfuture/__init__.py b/vnpy/api/okexfuture/__init__.py index 71802197..6bac0f36 100644 --- a/vnpy/api/okexfuture/__init__.py +++ b/vnpy/api/okexfuture/__init__.py @@ -1,2 +1,2 @@ -from .OkexFutureApi import OkexFutureSymbol, OkexFutureContractType, OkexFutureOrder, OkexFutureOrderStatus, OkexFuturePosition, \ - OkexFuturePositionDetail, OkexFuturePriceType, OkexFutureRestClient, OkexFutureUserInfo +from .OkexFutureApi import OkexFutureRestClient, OkexFutureWebSocketClient, OkexFutureSymbol, OkexFutureContractType, OkexFutureOrder, OkexFutureOrderStatus, OkexFuturePosition, \ + OkexFuturePositionDetail, OkexFuturePriceType, OkexFutureUserInfo diff --git a/vnpy/api/okexfuture/vnokexFuture.py b/vnpy/api/okexfuture/vnokexFuture.py index 0b3a7e2e..67c0a879 100644 --- a/vnpy/api/okexfuture/vnokexFuture.py +++ b/vnpy/api/okexfuture/vnokexFuture.py @@ -3,11 +3,23 @@ import hashlib import urllib from vnpy.api.rest import Request, RestClient +from vnpy.api.websocket import WebSocketClient + + +#---------------------------------------------------------------------- +def paramsToData(params): + return urllib.urlencode(sorted(params.items())) #---------------------------------------------------------------------- def sign(dataWithApiKey, apiSecret): """ + usage: + params = { ... , 'api_key': ...} + data = paramsToData(params) + signature = sign(data, apiSecret) + data += "&sign" + signature + :param dataWithApiKey: sorted urlencoded args with apiKey :return: param 'sign' for okex api """ @@ -41,10 +53,62 @@ class OkexFutureRestBase(RestClient): args.pop('sign') if 'apiKey' not in args: args['api_key'] = self.apiKey - data = urllib.urlencode(sorted(args.items())) + data = paramsToData(args) signature = sign(data, self.apiSecret) data += "&sign=" + signature req.headers = {'Content-Type': 'application/x-www-form-urlencoded'} req.data = data return req + + +######################################################################## +class OkexFutureWebSocketBase(WebSocketClient): + """ + Okex期货websocket客户端 + 实例化后使用init设置apiKey和secretKey(apiSecret) + """ + host = 'wss://real.okex.com:10440/websocket/okexapi' + + def __init__(self): + super(OkexFutureWebSocketBase, self).__init__() + super(OkexFutureWebSocketBase, self).init(OkexFutureWebSocketBase.host) + self.apiKey = None + self.apiSecret = None + self.autoLogin = True + + self.onConnected = self._onConnected + + #---------------------------------------------------------------------- + # noinspection PyMethodOverriding + def init(self, apiKey, secretKey, autoLogin=True): + + self.apiKey = apiKey + self.apiSecret = secretKey + self.autoLogin = autoLogin + + #---------------------------------------------------------------------- + def sendPacket(self, dictObj, authenticate=False): + if authenticate: + data = urllib.urlencode(sorted(dictObj.items())) + signature = sign(data, self.apiSecret) + dictObj['sign'] = signature + return super(OkexFutureWebSocketBase, self).sendPacket(dictObj) + + #---------------------------------------------------------------------- + def _login(self, ): + + params = {"api_key": self.apiKey, } + data = paramsToData(params) + signature = sign(data, self.apiSecret) + params['sign'] = signature + + self.sendPacket({ + "event": "login", + "parameters": params + }, authenticate=False) + + #---------------------------------------------------------------------- + def _onConnected(self): + if self.autoLogin: + self._login() diff --git a/vnpy/api/rest/RestClient.py b/vnpy/api/rest/RestClient.py index 420637b1..4560671d 100644 --- a/vnpy/api/rest/RestClient.py +++ b/vnpy/api/rest/RestClient.py @@ -3,7 +3,6 @@ import sys from Queue import Empty, Queue -from abc import abstractmethod from multiprocessing.dummy import Pool import requests @@ -79,10 +78,10 @@ class RestClient(object): """ HTTP 客户端。目前是为了对接各种RESTfulAPI而设计的。 - 如果需要给请求加上签名,请重载beforeRequest函数。 - 如果需要处理非200的请求,请重载onFailed函数。 + 如果需要给请求加上签名,请设置beforeRequest, 函数类型请参考defaultBeforeRequest。 + 如果需要处理非200的请求,请设置onFailed,函数类型请参考defaultOnFailed。 如果每一个请求的非200返回都需要单独处理,使用addReq函数的onFailed参数 - 如果捕获Python内部错误,例如网络连接失败等等,请重载onError函数。 + 如果捕获Python内部错误,例如网络连接失败等等,请设置onError,函数类型请参考defaultOnError """ #---------------------------------------------------------------------- @@ -92,6 +91,9 @@ class RestClient(object): """ self.urlBase = None # type: str self.sessionProvider = requestsSessionProvider + self.beforeRequest = self.defaultBeforeRequest # 任何请求在发送之前都会经过这个函数,让其加工 + self.onError = self.defaultOnError # Python内部错误处理 + self.onFailed = self.defaultOnFailed # statusCode != 2xx 时触发 self._active = False @@ -157,39 +159,40 @@ class RestClient(object): :return: Request """ - req = Request(method, path, callback, params, data, headers) - req.onFailed = onFailed - req.skipDefaultOnFailed = skipDefaultOnFailed - req.extra = extra - self._queue.put(req) - return req + request = Request(method, path, callback, params, data, headers) + request.onFailed = onFailed + request.skipDefaultOnFailed = skipDefaultOnFailed + request.extra = extra + self._queue.put(request) + return request #---------------------------------------------------------------------- def _run(self): session = self.sessionProvider() while self._active: try: - req = self._queue.get(timeout=1) + request = self._queue.get(timeout=1) try: - self._processRequest(req, session) + self._processRequest(request, session) finally: self._queue.task_done() except Empty: pass #---------------------------------------------------------------------- - @abstractmethod - def beforeRequest(self, req): # type: (Request)->Request + @staticmethod + def defaultBeforeRequest(request): # type: (Request)->Request """ 所有请求在发送之前都会经过这个函数 签名之类的前奏可以在这里面实现 需要对request进行什么修改就做什么修改吧 - @:return (req) + @:return (request) """ - return req + return request #---------------------------------------------------------------------- - def onFailed(self, httpStatusCode, req): # type:(int, Request)->None + @staticmethod + def defaultOnFailed(httpStatusCode, request): # type:(int, Request)->None """ 请求失败处理函数(HttpStatusCode!=200). 默认行为是打印到stderr @@ -200,51 +203,52 @@ class RestClient(object): "data: {}\n" "response:" "{}\n" - .format(req.method, req.path, httpStatusCode, - req.headers, - req.params, - req.data, - req._response.raw)) + .format(request.method, request.path, httpStatusCode, + request.headers, + request.params, + request.data, + request._response.raw)) #---------------------------------------------------------------------- - def onError(self, exceptionType, exceptionValue, tb, req): + @staticmethod + def defaultOnError(exceptionType, exceptionValue, tb, request): """ Python内部错误处理:默认行为是仍给excepthook """ - print("error in req : {}\n".format(req)) + print("error in request : {}\n".format(request)) sys.excepthook(exceptionType, exceptionValue, tb) #---------------------------------------------------------------------- - def _processRequest(self, req, session): # type: (Request, requests.Session)->None + def _processRequest(self, request, session): # type: (Request, requests.Session)->None """ 用于内部:将请求发送出去 """ try: - req = self.beforeRequest(req) + request = self.beforeRequest(request) - url = self.makeFullUrl(req.path) + url = self.makeFullUrl(request.path) - response = session.request(req.method, url, headers=req.headers, params=req.params, data=req.data) - req._response = response + response = session.request(request.method, url, headers=request.headers, params=request.params, data=request.data) + request._response = response httpStatusCode = response.status_code if httpStatusCode/100 == 2: jsonBody = response.json() - req.callback(jsonBody, req) - req._status = RequestStatus.success + request.callback(jsonBody, request) + request._status = RequestStatus.success else: - req._status = RequestStatus.failed + request._status = RequestStatus.failed - if req.onFailed: - req.onFailed(httpStatusCode, response.raw, req) + if request.onFailed: + request.onFailed(httpStatusCode, response.raw, request) # 若没有onFailed或者没设置skipDefaultOnFailed,则调用默认的处理函数 - if not req.onFailed or not req.skipDefaultOnFailed: - self.onFailed(httpStatusCode, req) + if not request.onFailed or not request.skipDefaultOnFailed: + self.onFailed(httpStatusCode, request) except: - req._status = RequestStatus.error + request._status = RequestStatus.error t, v, tb = sys.exc_info() - self.onError(t, v, tb, req) + self.onError(t, v, tb, request) def makeFullUrl(self, path): url = self.urlBase + path diff --git a/vnpy/api/websocket/WebSocketClient.py b/vnpy/api/websocket/WebSocketClient.py index 1ae22dae..c98bb339 100644 --- a/vnpy/api/websocket/WebSocketClient.py +++ b/vnpy/api/websocket/WebSocketClient.py @@ -3,38 +3,46 @@ ######################################################################## import json -import ssl import sys -import time -from abc import abstractmethod -from threading import Thread, Lock +import ssl +import time import websocket +from threading import Lock, Thread class WebSocketClient(object): """ Websocket API - 继承使用该类。 实例化之后,应调用start开始后台线程。调用start()函数会自动连接websocket。 若要终止后台线程,请调用stop()。 stop()函数会顺便断开websocket。 - 可以重写以下函数: + 该类默认打包方式为json,若从服务器返回的数据不为json,则会触发onError。 + + 可以覆盖以下回调: onConnected onDisconnected - onPacket + onPacket # 数据回调,只有在返回的数据帧为text并且内容为json时才会回调 onError 当然,为了不让用户随意自定义,用自己的init函数覆盖掉原本的init(host)也是个不错的选择。 - @note 继承使用该类 + 关于ping: + 在调用start()之后,该类每60s会自动发送一个ping帧至服务器。 """ #---------------------------------------------------------------------- def __init__(self): """Constructor""" self.host = None # type: str + + self.onConnected = self.defaultOnConnected + self.onDisconnected = self.defaultOnDisconnected + self.onPacket = self.defaultOnPacket + self.onError = self.defaultOnError + + self.createConnection = websocket.create_connection self._ws_lock = Lock() self._ws = None # type: websocket.WebSocket @@ -42,13 +50,12 @@ class WebSocketClient(object): self._workerThread = None # type: Thread self._pingThread = None # type: Thread self._active = False - - self.createConnection = websocket.create_connection #---------------------------------------------------------------------- def setCreateConnection(self, func): """ for internal usage + :param func: a function like websocket.create_connection """ self.createConnection = func @@ -68,8 +75,6 @@ class WebSocketClient(object): self._pingThread = Thread(target=self._runPing) self._pingThread.start() - self.onConnected() - #---------------------------------------------------------------------- def stop(self): """ @@ -136,6 +141,9 @@ class WebSocketClient(object): data = json.loads(stream) self.onPacket(data) + except websocket.WebSocketConnectionClosedException: + if self._active: + self._reconnect() except: et, ev, tb = sys.exc_info() self.onError(et, ev, tb) @@ -154,22 +162,24 @@ class WebSocketClient(object): return self._get_ws().send('ping', websocket.ABNF.OPCODE_PING) #---------------------------------------------------------------------- - def onConnected(self): + @staticmethod + def defaultOnConnected(): """ 连接成功回调 """ pass #---------------------------------------------------------------------- - def onDisconnected(self): + @staticmethod + def defaultOnDisconnected(): """ 连接断开回调 """ pass #---------------------------------------------------------------------- - @abstractmethod - def onPacket(self, packet): + @staticmethod + def defaultOnPacket(packet): """ 数据回调。 只有在数据为json包的时候才会触发这个回调 @@ -179,6 +189,7 @@ class WebSocketClient(object): pass #---------------------------------------------------------------------- - def onError(self, exceptionType, exceptionValue, tb): + @staticmethod + def defaultOnError(exceptionType, exceptionValue, tb): """Python错误回调""" return sys.excepthook(exceptionType, exceptionValue, tb) diff --git a/vnpy/trader/gateway/okexFutureGateway/okexFutureGateway.py b/vnpy/trader/gateway/okexFutureGateway/okexFutureGateway.py index 23bc9bed..f9346d9e 100644 --- a/vnpy/trader/gateway/okexFutureGateway/okexFutureGateway.py +++ b/vnpy/trader/gateway/okexFutureGateway/okexFutureGateway.py @@ -3,8 +3,7 @@ from __future__ import print_function import json -from abc import abstractmethod, abstractproperty -from datetime import datetime +from abc import abstractmethod from typing import Dict @@ -12,75 +11,6 @@ from vnpy.api.okexfuture.OkexFutureApi import * from vnpy.trader.vtFunction import getJsonPath from vnpy.trader.vtGateway import * -_orderTypeMap = { - (constant.DIRECTION_LONG, constant.OFFSET_OPEN): OkexFutureOrderType.OpenLong, - (constant.DIRECTION_SHORT, constant.OFFSET_OPEN): OkexFutureOrderType.OpenShort, - (constant.DIRECTION_LONG, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseLong, - (constant.DIRECTION_SHORT, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseShort, -} -_orderTypeMapReverse = {v: k for k, v in _orderTypeMap.items()} - -_contractTypeMap = { - 'THISWEEK': OkexFutureContractType.ThisWeek, - 'NEXTWEEK': OkexFutureContractType.NextWeek, - 'QUARTER': OkexFutureContractType.Quarter, -} -_contractTypeMapReverse = {v: k for k, v in _contractTypeMap.items()} - -_remoteSymbols = { - OkexFutureSymbol.BTC, - OkexFutureSymbol.LTC, - OkexFutureSymbol.ETH, - OkexFutureSymbol.ETC, - OkexFutureSymbol.BCH, -} - -# symbols for ui, -# keys:给用户看的symbols : f"{internalSymbol}_{contractType}" -# values: API接口使用的symbol和contractType字段 -_symbolsForUi = {(remoteSymbol.upper() + '_' + upperContractType.upper()): (remoteSymbol, remoteContractType) - for remoteSymbol in _remoteSymbols - for upperContractType, remoteContractType in - _contractTypeMap.items()} # type: Dict[str, List[str, str]] -_symbolsForUiReverse = {v: k for k, v in _symbolsForUi.items()} - - -#---------------------------------------------------------------------- -def localOrderTypeToRemote(direction, offset): # type: (str, str)->str - return _orderTypeMap[(direction, offset)] - - -#---------------------------------------------------------------------- -def remoteOrderTypeToLocal(orderType): # type: (str)->(str, str) - """ - :param orderType: - :return: direction, offset - """ - return _orderTypeMapReverse[orderType] - - -#---------------------------------------------------------------------- -def localContractTypeToRemote(localContractType): - return _contractTypeMap[localContractType] - - -#---------------------------------------------------------------------- -def remoteContractTypeToLocal(remoteContractType): - return _contractTypeMapReverse[remoteContractType] - - -#---------------------------------------------------------------------- -def localSymbolToRemote(symbol): # type: (str)->(OkexFutureSymbol, OkexFutureContractType) - """ - :return: remoteSymbol, remoteContractType - """ - return _symbolsForUi[symbol] - - -#---------------------------------------------------------------------- -def remoteSymbolToLocal(remoteSymbol, localContractType): - return remoteSymbol.upper() + localContractType - ######################################################################## class VnpyGateway(VtGateway): @@ -89,15 +19,6 @@ class VnpyGateway(VtGateway): 于是我设计了这个类,将重复代码抽取出来,简化gateway的实现 """ - #---------------------------------------------------------------------- - def __init__(self, eventEngine): - super(VnpyGateway, self).__init__(eventEngine, self.gatewayName) - - #---------------------------------------------------------------------- - @abstractproperty - def gatewayName(self): # type: ()->str - return 'VnpyGateway' - #---------------------------------------------------------------------- def readConfig(self): """ @@ -146,10 +67,16 @@ class OkexFutureGateway(VnpyGateway): #---------------------------------------------------------------------- def __init__(self, eventEngine, *_, **__): # args, kwargs is needed for compatibility """Constructor""" - super(OkexFutureGateway, self).__init__(eventEngine) + super(OkexFutureGateway, self).__init__(eventEngine, 'OkexFutureGateway') self.apiKey = None # type: str self.apiSecret = None # type: str - self.api = OkexFutureRestClient() + + self.restApi = OkexFutureRestClient() + + self.webSocket = OkexFutureWebSocketClient() + self.webSocket.onTick = self._onTick + self.webSocket.onUserTrade = self._onUserTrade + self.leverRate = 1 self.symbols = [] @@ -157,11 +84,6 @@ class OkexFutureGateway(VnpyGateway): self._orders = {} # type: Dict[str, _Order] self._remoteIds = {} # type: Dict[str, _Order] - #---------------------------------------------------------------------- - @property - def gatewayName(self): - return 'OkexFutureGateway' - #---------------------------------------------------------------------- @property def exchange(self): # type: ()->str @@ -190,20 +112,28 @@ class OkexFutureGateway(VnpyGateway): #---------------------------------------------------------------------- def connect(self): self.loadSetting() - self.api.init(self.apiKey, self.apiSecret) + self.restApi.init(self.apiKey, self.apiSecret) + self.webSocket.init(self.apiKey, self.apiSecret) + self.restApi.start() + self.webSocket.start() #---------------------------------------------------------------------- - def subscribe(self, subscribeReq): + def subscribe(self, subscribeReq): # type: (VtSubscribeReq)->None """订阅行情""" - pass + remoteSymbol, remoteContractType = localSymbolToRemote(subscribeReq.symbol) + return self.webSocket.subscribe(remoteSymbol, remoteContractType) #---------------------------------------------------------------------- def _getOrderByLocalId(self, localId): - return self._orders[localId] + if localId in self._orders: + return self._orders[localId] + return None #---------------------------------------------------------------------- def _getOrderByRemoteId(self, remoteId): - return self._remoteIds[remoteId] + if remoteId in self._remoteIds: + return self._remoteIds[remoteId] + return None #---------------------------------------------------------------------- def _saveRemoteId(self, remoteId, myorder): @@ -241,16 +171,16 @@ class OkexFutureGateway(VnpyGateway): if vtRequest.priceType == constant.PRICETYPE_MARKETPRICE: userMarketPrice = True - self.api.sendOrder(symbol=remoteSymbol, - contractType=remoteContractType, - orderType=orderType, - volume=vtRequest.volume, - price=vtRequest.price, - useMarketPrice=userMarketPrice, - leverRate=self.leverRate, - onSuccess=self._onOrderSent, - extra=None, - ) + self.restApi.sendOrder(symbol=remoteSymbol, + contractType=remoteContractType, + orderType=orderType, + volume=vtRequest.volume, + price=vtRequest.price, + useMarketPrice=userMarketPrice, + leverRate=self.leverRate, + onSuccess=self._onOrderSent, + extra=None, + ) return myorder.localId @@ -258,13 +188,15 @@ class OkexFutureGateway(VnpyGateway): def cancelOrder(self, vtCancel): # type: (VtCancelOrderReq)->None """撤单""" myorder = self._getOrderByLocalId(vtCancel.orderID) + assert myorder is not None, u"理论上是无法取消一个不存在的本地单的" + symbol, contractType = localSymbolToRemote(vtCancel.symbol) - self.api.cancelOrder(symbol=symbol, - contractType=contractType, - orderId=myorder.remoteId, - onSuccess=self._onOrderCanceled, - extra=myorder, - ) + self.restApi.cancelOrder(symbol=symbol, + contractType=contractType, + orderId=myorder.remoteId, + onSuccess=self._onOrderCanceled, + extra=myorder, + ) # cancelDict: 不存在的,没有localId就没有remoteId,没有remoteId何来cancel #---------------------------------------------------------------------- @@ -284,15 +216,15 @@ class OkexFutureGateway(VnpyGateway): remoteContractType = contractType localContractType = remoteContractTypeToLocal(remoteContractType) - self.api.queryOrders(symbol=symbol, - contractType=remoteContractType, - status=status, - onSuccess=self._onQueryOrders, - extra=localContractType) + self.restApi.queryOrders(symbol=symbol, + contractType=remoteContractType, + status=status, + onSuccess=self._onQueryOrders, + extra=localContractType) #---------------------------------------------------------------------- def qryAccount(self): - self.api.queryUserInfo(onSuccess=self._onQueryAccount) + self.restApi.queryUserInfo(onSuccess=self._onQueryAccount) """查询账户资金""" pass @@ -301,16 +233,17 @@ class OkexFutureGateway(VnpyGateway): """查询持仓""" for remoteSymbol in _remoteSymbols: for localContractType, remoteContractType in _contractTypeMap.items(): - self.api.queryPosition(remoteSymbol, - remoteContractType, - onSuccess=self._onQueryPosition, - extra=localContractType - ) + self.restApi.queryPosition(remoteSymbol, + remoteContractType, + onSuccess=self._onQueryPosition, + extra=localContractType + ) #---------------------------------------------------------------------- def close(self): """关闭""" - self.api.stop() + self.restApi.stop() + self.webSocket.stop() #---------------------------------------------------------------------- def _onOrderSent(self, remoteId, myorder): #type: (str, _Order)->None @@ -319,23 +252,6 @@ class OkexFutureGateway(VnpyGateway): self._saveRemoteId(remoteId, myorder) self.onOrder(myorder.vtOrder) - # #---------------------------------------------------------------------- - # def _pushOrderAsTraded(self, order): - # trade = VtTradeData() - # trade.gatewayName = order.gatewayName - # trade.symbol = order.symbol - # trade.vtSymbol = order.vtSymbol - # trade.orderID = order.orderID - # trade.vtOrderID = order.vtOrderID - # self.tradeID += 1 - # trade.tradeID = str(self.tradeID) - # trade.vtTradeID = '.'.join([self.gatewayName, trade.tradeID]) - # trade.direction = order.direction - # trade.price = order.price - # trade.volume = order.tradedVolume - # trade.tradeTime = datetime.now().strftime('%H:%M:%S') - # self.onTrade(trade) - #---------------------------------------------------------------------- @staticmethod def _onOrderCanceled(myorder): #type: (_Order)->None @@ -346,10 +262,10 @@ class OkexFutureGateway(VnpyGateway): localContractType = extra for order in orders: remoteId = order.remoteId - - if remoteId in self._remoteIds: + + myorder = self._getOrderByRemoteId(remoteId) + if myorder: # 如果订单已经缓存在本地,则尝试更新订单状态 - myorder = self._getOrderByRemoteId(remoteId) # 有新交易才推送更新 if order.tradedVolume != myorder.vtOrder.tradedVolume: @@ -367,12 +283,6 @@ class OkexFutureGateway(VnpyGateway): self._saveRemoteId(myorder.remoteId, myorder) self.onOrder(myorder.vtOrder) - # # 如果该订单已经交易完成,推送交易完成消息 - # # todo: 这样写会导致同一个订单产生多次交易完成消息 - # if order.status == OkexFutureOrderStatus.Finished: - # myorder.vtOrder.status = constant.STATUS_ALLTRADED - # self._pushOrderAsTraded(myorder.vtOrder) - #---------------------------------------------------------------------- def _onQueryAccount(self, infos, _): # type: (List[OkexFutureUserInfo], Any)->None for info in infos: @@ -390,27 +300,130 @@ class OkexFutureGateway(VnpyGateway): localContractType = extra for info in posinfo.holding: # 先生成多头持仓 - pos = VtPositionData() - pos.gatewayName = self.gatewayName - pos.symbol = remoteSymbolToLocal(pos.symbol, localContractType) - pos.exchange = self.exchange - pos.vtSymbol = '.'.join([pos.symbol, pos.exchange]) - - pos.direction = constant.DIRECTION_NET - pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction]) - pos.position = float(info.buyAmount) + pos = VtPositionData.createFromGateway( + gateway=self, + exchange=self.exchange, + symbol=remoteSymbolToLocal(info.symbol, localContractType), + direction=constant.DIRECTION_NET, + position=float(info.buyAmount), + ) self.onPosition(pos) # 再生存空头持仓 - pos = VtPositionData() - pos.gatewayName = self.gatewayName - pos.symbol = remoteSymbolToLocal(pos.symbol, localContractType) - pos.exchange = self.exchange - pos.vtSymbol = '.'.join([pos.symbol, pos.exchange]) - - pos.direction = constant.DIRECTION_SHORT - pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction]) - pos.position = float(info.sellAmount) + pos = VtPositionData.createFromGateway( + gateway=self, + exchange=self.exchange, + symbol=remoteSymbolToLocal(info.symbol, localContractType), + direction=constant.DIRECTION_SHORT, + position=float(info.sellAmount), + ) self.onPosition(pos) + + #---------------------------------------------------------------------- + def _onTick(self, info): # type: (OkexFutureTickInfo)->None + uiSymbol = remoteSymbolToLocal(info.symbol, remoteContractTypeToLocal(info.remoteContractType)) + self.onTick(VtTickData.createFromGateway( + gateway=self, + symbol=uiSymbol, + exchange=self.exchange, + lastPrice=info.last, + lastVolume=info.vol, + highPrice=info.high, + lowPrice=info.low, + openInterest=info.holdAmount, + lowerLimit=info.limitLow, + upperLimit=info.limitHigh, + )) + + def _onUserTrade(self, info): # type: (OkexFutureUserTradeInfo)->None + tradeID = str(self.tradeID) + self.tradeID += 1 + order = self._getOrderByRemoteId(info.remoteId) + if order: + self.onTrade(VtTradeData.createFromOrderData( + order=order.vtOrder, + tradeID=tradeID, + tradePrice=info.price, + tradeVolume=info.dealAmount # todo: 这里应该填写的到底是order总共成交了的数量,还是该次trade成交的数量 + )) + else: + # todo: 与order无关联的trade该如何处理? + # uiSymbol = remoteSymbolToLocal(info.symbol, remoteContractTypeToLocal(info.remoteContractType)) + pass + return + + +#---------------------------------------------------------------------- +def localOrderTypeToRemote(direction, offset): # type: (str, str)->str + return _orderTypeMap[(direction, offset)] + + +#---------------------------------------------------------------------- +def remoteOrderTypeToLocal(orderType): # type: (str)->(str, str) + """ + :param orderType: + :return: direction, offset + """ + return _orderTypeMapReverse[orderType] + + +#---------------------------------------------------------------------- +def localContractTypeToRemote(localContractType): + return _contractTypeMap[localContractType] + + +#---------------------------------------------------------------------- +def remoteContractTypeToLocal(remoteContractType): + return _contractTypeMapReverse[remoteContractType] + + +#---------------------------------------------------------------------- +def localSymbolToRemote(symbol): # type: (str)->(OkexFutureSymbol, OkexFutureContractType) + """ + :return: remoteSymbol, remoteContractType + """ + return _symbolsForUi[symbol] + + +#---------------------------------------------------------------------- +def remoteSymbolToLocal(remoteSymbol, localContractType): + return remoteSymbol.upper() + '_' + localContractType + + +_orderTypeMap = { + (constant.DIRECTION_LONG, constant.OFFSET_OPEN): OkexFutureOrderType.OpenLong, + (constant.DIRECTION_SHORT, constant.OFFSET_OPEN): OkexFutureOrderType.OpenShort, + (constant.DIRECTION_LONG, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseLong, + (constant.DIRECTION_SHORT, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseShort, +} +_orderTypeMapReverse = {v: k for k, v in _orderTypeMap.items()} + +_contractTypeMap = { + k.upper(): v for k, v in OkexFutureContractType.__dict__.items() if not k.startswith('_') +} +_contractTypeMapReverse = {v: k for k, v in _contractTypeMap.items()} + +_easySymbols = { + v for k, v in OkexFutureEasySymbol.__dict__.items() if not k.startswith('_') +} + +_remoteSymbols = { + v for k, v in OkexFutureSymbol.__dict__.items() if not k.startswith('_') +} + +# symbols for ui, +# keys:给用户看的symbols : f"{internalSymbol}_{contractType}" +# values: API接口使用的symbol和contractType字段 +_symbolsForUi = {(remoteSymbol.upper() + '_' + upperContractType.upper()): (remoteSymbol, remoteContractType) + for remoteSymbol in _remoteSymbols + for upperContractType, remoteContractType in + _contractTypeMap.items()} # type: Dict[str, List[str, str]] +_symbolsForUiReverse = {v: k for k, v in _symbolsForUi.items()} + +_channel_for_subscribe = { + 'ok_sub_futureusd_' + easySymbol + '_ticker_' + remoteContractType: (easySymbol, remoteContractType) + for easySymbol in _easySymbols + for remoteContractType in _contractTypeMap.values() +} diff --git a/vnpy/trader/vtObject.py b/vnpy/trader/vtObject.py index 1a844183..e5ac075f 100644 --- a/vnpy/trader/vtObject.py +++ b/vnpy/trader/vtObject.py @@ -1,8 +1,9 @@ # encoding: UTF-8 +from logging import INFO + import time from datetime import datetime -from logging import INFO from vnpy.trader.language import constant from vnpy.trader.vtConstant import (EMPTY_FLOAT, EMPTY_INT, EMPTY_STRING, EMPTY_UNICODE) @@ -74,8 +75,37 @@ class VtTickData(VtBaseData): self.askVolume2 = EMPTY_INT self.askVolume3 = EMPTY_INT self.askVolume4 = EMPTY_INT - self.askVolume5 = EMPTY_INT + self.askVolume5 = EMPTY_INT + #---------------------------------------------------------------------- + @staticmethod + def createFromGateway(gateway, symbol, exchange, + lastPrice, lastVolume, + highPrice, lowPrice, + openPrice=EMPTY_FLOAT, + openInterest=EMPTY_INT, + upperLimit=EMPTY_FLOAT, + lowerLimit=EMPTY_FLOAT): + tick = VtTickData() + tick.gatewayName = gateway.gatewayName + tick.symbol = symbol + tick.exchange = exchange + tick.vtSymbol = symbol + '.' + exchange + + tick.lastPrice = lastPrice + tick.lastVolume = lastVolume + tick.openInterest = openInterest + tick.datetime = datetime.now() + tick.date = tick.datetime.strftime('%Y%m%d') + tick.time = tick.datetime.strftime('%H:%M:%S') + + tick.openPrice = openPrice + tick.highPrice = highPrice + tick.lowPrice = lowPrice + tick.upperLimit = upperLimit + tick.lowerLimit = lowerLimit + return tick + ######################################################################## class VtBarData(VtBaseData): @@ -133,7 +163,27 @@ class VtTradeData(VtBaseData): self.price = EMPTY_FLOAT # 成交价格 self.volume = EMPTY_INT # 成交数量 self.tradeTime = EMPTY_STRING # 成交时间 + + #---------------------------------------------------------------------- + @staticmethod + def createFromGateway(gateway, symbol, exchange, tradeID, orderID, direction, tradePrice, tradeVolume): + trade = VtTradeData() + trade.gatewayName = gateway.gatewayName + trade.symbol = symbol + trade.exchange = exchange + trade.vtSymbol = symbol + '.' + exchange + + trade.orderID = orderID + trade.vtOrderID = trade.gatewayName + '.' + trade.tradeID + trade.tradeID = tradeID + trade.vtTradeID = trade.gatewayName + '.' + tradeID + + trade.direction = direction + trade.price = tradePrice + trade.volume = tradeVolume + trade.tradeTime = datetime.now().strftime('%H:%M:%S') + return trade #---------------------------------------------------------------------- @staticmethod def createFromOrderData(order, @@ -144,6 +194,7 @@ class VtTradeData(VtBaseData): trade.gatewayName = order.gatewayName trade.symbol = order.symbol trade.vtSymbol = order.vtSymbol + trade.orderID = order.orderID trade.vtOrderID = order.vtOrderID trade.tradeID = tradeID