Merge pull request #1159 from nanoric/refactor.network

Refactor.network
This commit is contained in:
vn.py 2018-10-12 09:04:26 +08:00 committed by GitHub
commit 9d834b040e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1194 additions and 0 deletions

59
tests/Promise.py Normal file
View File

@ -0,0 +1,59 @@
# encoding: UTF-8
from Queue import Queue
from enum import Enum
class PromiseResultType(Enum):
Result = 1
Exception = 2
Traceback = 3
class Promise(object):
"""
用队列实现的一个简单的Promise类型
"""
def __init__(self):
self._queue = Queue()
def set_result(self, val):
self._queue.put((PromiseResultType.Result, val))
def get(self, timeout=None):
res = self._queue.get(timeout=timeout)
if res[0] == PromiseResultType.Result:
return res[1]
elif res[0] == PromiseResultType.Exception:
raise res[1]
else:
et, ev, tb = res[1]
raise et, ev, tb
def set_exception(self, valueOrType, val=None, tb=None):
if val is None:
self._queue.put((PromiseResultType.Exception, valueOrType))
else:
self._queue.put((PromiseResultType.Traceback, (valueOrType, val, tb)))
def catch(self):
"""
Usage :
with promise.catch():
raise Exception();
"""
return _PromiseCatchContext(self)
class _PromiseCatchContext(object):
def __init__(self, promise):
self.promise = promise
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.promise.set_exception(exc_type, exc_val, exc_tb)

0
tests/__init__.py Normal file
View File

8
tests/all_test.py Normal file
View File

@ -0,0 +1,8 @@
import unittest
# noinspection PyUnresolvedReferences
from tests.network.RestfulClientTest import *
from tests.network.WebSocketClientTest import *
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,86 @@
# encoding: UTF-8
import json
import unittest
from simplejson import JSONDecodeError
from Promise import Promise
from vnpy.network.RestClient import RestClient, Request
class FailedError(RuntimeError):
pass
class TestRestClient(RestClient):
def __init__(self):
urlBase = 'https://httpbin.org'
super(TestRestClient, self).__init__()
self.init(urlBase)
self.p = Promise()
def beforeRequest(self, req): #type: (Request)->Request
req.data = json.dumps(req.data)
req.headers = {'Content-Type': 'application/json'}
return req
def onError(self, exceptionType, exceptionValue, tb, req):
self.p.set_exception(exceptionType, exceptionValue, tb)
def onFailed(self, httpStatusCode, req):
with self.p.catch():
raise FailedError("request failed")
class RestfulClientTest(unittest.TestCase):
def setUp(self):
self.c = TestRestClient()
self.c.start()
def tearDown(self):
self.c.stop()
def test_addReq_get(self):
args = {'user': 'username',
'pw': 'password'}
def callback(data, req):
self.c.p.set_result(data['args'])
self.c.addReq('GET', '/get', callback, params=args)
res = self.c.p.get(3)
self.assertEqual(args, res)
def test_addReq_post(self):
body = {'user': 'username',
'pw': 'password'}
def callback(data, req):
self.c.p.set_result(data['json'])
self.c.addReq('POST', '/post', callback, data=body)
res = self.c.p.get(3)
self.assertEqual(body, res)
def test_addReq_onFailed(self):
def callback(data, req):
pass
self.c.addReq('POST', '/status/401', callback)
with self.assertRaises(FailedError):
self.c.p.get(3)
def test_addReq_jsonParseError(self):
def callback(data, req):
pass
self.c.addReq('GET', '/image/svg', callback)
with self.assertRaises(JSONDecodeError):
self.c.p.get(3)

View File

@ -0,0 +1,45 @@
# encoding: UTF-8
import json
import unittest
from Promise import Promise
from vnpy.network.WebSocketClient import WebsocketClient
class TestWebsocketClient(WebsocketClient):
def __init__(self):
host = 'wss://echo.websocket.org'
super(TestWebsocketClient, self).__init__()
self.init(host)
self.p = Promise()
def onMessage(self, packet):
self.p.set_result(packet)
pass
def onConnect(self):
pass
def onError(self, exceptionType, exceptionValue, tb):
self.p.set_exception(exceptionValue)
pass
class WebsocketClientTest(unittest.TestCase):
def setUp(self):
self.c = TestWebsocketClient()
self.c.start()
def tearDown(self):
self.c.stop()
def test_sendReq(self):
req = {
'name': 'val'
}
self.c.sendReq(req)
res = self.c.p.get(3)
self.assertEqual(res, req)

View File

View File

@ -0,0 +1,345 @@
# encoding: UTF-8
from enum import Enum
from typing import Any, Callable, List, Union
from vnpy.api.okexfuture.vnokexFuture import OkexFutureRestBase
from vnpy.network.RestClient import Request
class _OkexFutureCustomExtra(object):
def __init__(self, onSuccess, onFailed, extra):
self.onFailed = onFailed
self.onSuccess = onSuccess
self.extra = extra
class OkexFuturePriceType(Enum):
Buy = 'buy'
Sell = 'sell'
class OkexFutureContractType(Enum):
ThisWeek = 'this_week'
NextWeek = 'next_week'
Quarter = 'quarter'
class OkexFutureStatus(Enum):
NoTraded = '0'
PartialTraded = '1'
AllTraded = '2'
Canceled = '-1'
CancelProcessing = '4'
Canceling = '5'
class OkexFutureOrderType(Enum):
OpenLong = '1'
OpenShort = '2'
CloseLong = '3'
CloseShort = '4'
class OkexFutureOrder(object):
def __init__(self):
self.volume = None
self.contractName = None
self.createDate = None
self.tradedVolume = None
self.fee = None
self.leverRate = None
self.remoteId = None
self.price = None
self.priceAvg = None
self.status = None
self.symbol = None
self.orderType = None
self.unitAmount = None
class OkexFutureUserInfo(object):
def __init__(self):
self.accountRights = None
self.keepDeposit = None
self.profitReal = None
self.profitUnreal = None
self.riskRate = None
class OkexFuturePosition(object):
def __init__(self, ):
self.forceLiquidatePrice = None
self.holding = [] # type: List[OkexFuturePositionDetail]
class OkexFuturePositionDetail(object):
def __init__(self, ):
self.buyAmount = None
self.buyAvailable = None
self.buyPriceAvg = None
self.buyPriceCost = None
self.buyProfitReal = None
self.contractId = None
self.createDate = None
self.leverRate = None
self.sellAmount = None
self.sellAvailable = None
self.sellPriceAvg = None
self.sellPriceCost = None
self.sellProfitReal = None
self.symbol = None
self.contractType = None
########################################################################
class OkexFutureRestClient(OkexFutureRestBase):
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
super(OkexFutureRestClient, self).__init__()
self.client = ()
self._redirectedOnError = None # type: Callable[[object, object, object, Request], Any]
#----------------------------------------------------------------------
def setOnError(self, callback): # type: (Callable[[object, object, object, Request], Any])->None
self._redirectedOnError = callback
#----------------------------------------------------------------------
def onError(self, exceptionType, exceptionValue, tb, req):
if self._redirectedOnError:
self._redirectedOnError(exceptionType, exceptionValue, tb, req)
#----------------------------------------------------------------------
def onFailed(self, httpStatusCode, req):
super(OkexFutureRestClient, self).onFailed(httpStatusCode, req)
#----------------------------------------------------------------------
def sendOrder(self, symbol, contractType, orderType, volume,
onSuccess, onFailed=None,
price=None, useMarketPrice=False, leverRate=None,
extra=None): # type:(str, OkexFutureContractType, OkexFutureOrderType, float, Callable[[int, Any], Any], Callable[[Any], Any], float, bool, Union[int, None], Any)->Request
"""
:param symbol: str
:param contractType: OkexFutureContractType
:param orderType: OkexFutureOrderType
:param volume: float
:param onSuccess: (orderId: int)->Any
:param onFailed: ()->Any
:param price: float
:param useMarketPrice: bool
:param leverRate: int | None
:param extra: Any
:return: Request
"""
data = {}
if useMarketPrice:
data['match_price'] = 1
else:
data['price'] = price
data.update({
'symbol': symbol,
'contract_typ': contractType, # 合约类型:当周/下周/季度
'amount': volume,
'type': orderType,
})
if leverRate:
data['lever_rate'] = leverRate # 杠杆倍数
request = self.addReq('POST',
'/future_trade.do',
callback=self.onOrderSent,
data=data,
extra=_OkexFutureCustomExtra(onSuccess, onFailed, extra))
return request
#----------------------------------------------------------------------
def cancelOrder(self, symbol, contractType, orderId, onSuccess, onFailed=None,
extra=None): # type: (str, OkexFutureContractType, str, Callable[[object], Any], Callable[[Any], Any], Any)->Request
"""
:param symbol: str
:param contractType: OkexFutureContractType
:param orderId: str
:param onSuccess: ()->Any
:param onFailed: ()->Any
:param extra: Any
:return: Request
"""
data = {
'symbol': symbol,
'contractType': contractType,
'order_id': orderId
}
return self.addReq('POST',
'/future_cancel.do',
callback=self.onOrderCanceled,
data=data,
extra=_OkexFutureCustomExtra(onSuccess, onFailed, extra))
#----------------------------------------------------------------------
def queryOrder(self, symbol, contractType, orderId, onSuccess, onFailed=None,
extra=None): # type: (str, OkexFutureContractType, str, Callable[[OkexFutureOrder, Any], Any], Callable[[Any], Any], Any)->Request
"""
:param symbol: str
:param contractType: OkexFutureContractType
:param orderId: str
:param onSuccess: (OkexFutureOrder, extra:Any)->Any
:param onFailed: (extra: Any)->Any
:param extra: Any
:return: Request
"""
data = {
'symbol': symbol,
'contractType': contractType,
'order_id': orderId
}
return self.addReq('POST',
'/future_order_info.do',
callback=self.onOrder,
data=data,
extra=_OkexFutureCustomExtra(onSuccess, onFailed, extra))
#----------------------------------------------------------------------
def queryUserInfo(self, onSuccess, onFailed=None,
extra=None): # type: (Callable[[List[OkexFutureUserInfo], Any], Any], Callable[[Any], Any], Any)->Request
"""
查询用户信息
:param onSuccess: (userInfos: List[OkexFutureUserInfo], extra: Any)->Any
:param onFailed: (extra: Any)->Any
:param extra: Any
:return: Request
"""
return self.addReq('POST',
'/future_userinfo.do',
callback=self.onOrder,
extra=_OkexFutureCustomExtra(onSuccess, onFailed, extra))
#----------------------------------------------------------------------
def queryPosition(self, symbol, contractType,
onSuccess, onFailed=None,
extra=None): # type: (str, OkexFutureContractType, Callable[[OkexFuturePosition, Any], Any], Callable[[Any], Any], Any)->Request
data = {
'symbol': symbol,
'contractType': contractType
}
return self.addReq('POST',
'/future_position.do',
data=data,
callback=self.onPosition,
extra=_OkexFutureCustomExtra(onSuccess, onFailed, extra))
#----------------------------------------------------------------------
@staticmethod
def onOrderSent(data, req): # type: (dict, Request)->None
"""
下单回执一般用来保存sysId
"""
extra = req.extra # type: _OkexFutureCustomExtra
if data['result'] is True:
remoteId = data['order_id']
extra.onSuccess(remoteId, extra.extra)
else:
if extra.onFailed:
extra.onFailed(extra.extra)
#----------------------------------------------------------------------
@staticmethod
def onOrderCanceled(data, req): # type: (dict, Request)->None
"""
取消订单回执
"""
success = data['result']
extra = req.extra # type: _OkexFutureCustomExtra
if success:
extra.onSuccess(extra.extra)
else:
if extra.onFailed:
extra.onFailed(extra.extra)
#----------------------------------------------------------------------
@staticmethod
def onOrder(data, req): # type: (dict, Request)->None
success = data['result']
extra = req.extra # type: _OkexFutureCustomExtra
if success:
order = data['orders'][0]
okexOrder = OkexFutureOrder()
okexOrder.volume = order['amount']
okexOrder.contractName = order['contract_name']
okexOrder.createDate = order['create_date']
okexOrder.tradedVolume = order['deal_amount']
okexOrder.fee = order['fee']
okexOrder.leverRate = order['lever_rate']
okexOrder.remoteId = order['order_id']
okexOrder.price = order['price']
okexOrder.priceAvg = order['price_avg']
okexOrder.status = order['status']
okexOrder.orderType = order['type']
okexOrder.unitAmount = order['unit_amount']
okexOrder.symbol = order['symbol']
extra.onSuccess(okexOrder, extra.extra)
else:
if extra.onFailed:
extra.onFailed(extra.extra)
#----------------------------------------------------------------------
@staticmethod
def onUserInfo(data, req): # type: (dict, Request)->None
success = data['result']
extra = req.extra # type: _OkexFutureCustomExtra
if success:
infos = data['info']
uis = []
for symbol, info in infos.items(): # type: str, dict
ui = OkexFutureUserInfo()
ui.accountRights = info['account_rights']
ui.keepDeposit = info['keep_deposit']
ui.profitReal = info['profit_real']
ui.profitUnreal = info['profit_unreal']
ui.riskRate = info['risk_rate']
uis.append(ui)
extra.onSuccess(uis, extra.extra)
else:
if extra.onFailed:
extra.onFailed(extra.extra)
#----------------------------------------------------------------------
@staticmethod
def onPosition(data, req): # type: (dict, Request)->None
success = data['result']
extra = req.extra # type: _OkexFutureCustomExtra
if success:
pos = OkexFuturePosition()
pos.forceLiquidatePrice = data['force_liqu_price']
for item in data['holding']:
posDetail = OkexFuturePositionDetail()
posDetail.buyAmount = item['buy_amount']
posDetail.buyAvailable = item['buy_available']
posDetail.buyPriceAvg = item['buy_price_avg']
posDetail.buyPriceCost = item['buy_price_cost']
posDetail.buyProfitReal = item['buy_profit_real']
posDetail.contractId = item['contract_id']
posDetail.contractType = item['contract_type']
posDetail.createDate = item['create_date']
posDetail.leverRate = item['lever_rate']
posDetail.sellAmount = item['sell_amount']
posDetail.sellAvailable = item['sell_available']
posDetail.sellPriceAvg = item['sell_price_avg']
posDetail.sellPriceCost = item['sell_price_cost']
posDetail.sellProfitReal = item['sell_profit_real']
posDetail.symbol = item['symbol']
pos.holding.append(posDetail)
extra.onSuccess(pos, extra.extra)
else:
if extra.onFailed:
extra.onFailed(extra.extra)

View File

View File

@ -0,0 +1,43 @@
# encoding: UTF-8
import hashlib
import urllib
########################################################################
from vnpy.network.RestClient import RestClient, Request
########################################################################
class OkexFutureRestBase(RestClient):
#----------------------------------------------------------------------
def __init__(self):
super(OkexFutureRestBase, self).__init__()
self.apiKey = None
self.apiSecret = None
#----------------------------------------------------------------------
# noinspection PyMethodOverriding
def init(self, apiKey, apiSecret):
# type: (str, str) -> any
super(OkexFutureRestBase, self).init('https://www.okex.com/api/v1')
self.apiKey = apiKey
self.apiSecret = apiSecret
#----------------------------------------------------------------------
def beforeRequest(self, req): # type: (Request)->Request
args = req.params or {}
args.update(req.data or {})
if 'sign' in args:
args.pop('sign')
if 'apiKey' not in args:
args['api_key'] = self.apiKey
data = urllib.urlencode(sorted(args.items()))
data += "&secret_key=" + self.apiSecret
sign = hashlib.md5(data.encode()).hexdigest().upper()
data += "&sign=" + sign
req.headers = {'Content-Type': 'application/x-www-form-urlencoded'}
return req

241
vnpy/network/RestClient.py Normal file
View File

@ -0,0 +1,241 @@
# encoding: UTF-8
import sys
from Queue import Empty, Queue
from abc import abstractmethod
from multiprocessing.dummy import Pool
import requests
from enum import Enum
from typing import Any, Callable
class RequestStatus(Enum):
ready = 0 # 刚刚构建
success = 1 # 请求成功 code == 200
failed = 2
error = 3 # 发生错误 网络错误、json解析错误等等
########################################################################
class Request(object):
"""
表示一个内部的Request用于状态查询
"""
#----------------------------------------------------------------------
def __init__(self, method, path, callback, params, data, headers):
self.method = method # type: str
self.path = path # type: str
self.callback = callback # type: callable
self.params = params # type: dict #, bytes, str
self.data = data # type: dict #, bytes, str
self.headers = headers # type: dict
self.onFailed = None # type: callable
self.skipDefaultOnFailed = None # type: callable
self.extra = None # type: Any
self._response = None # type: requests.Response
self._status = RequestStatus.ready
#----------------------------------------------------------------------
@property
def success(self):
assert self.finished, "'success' property is only available after request is finished"
return self._status == RequestStatus.success
#----------------------------------------------------------------------
@property
def failed(self):
assert self.finished, "'failed' property is only available after request is finished"
return self._status == RequestStatus.failed
#----------------------------------------------------------------------
@property
def finished(self):
return self._status != RequestStatus.ready
#----------------------------------------------------------------------
@property
def error(self):
return self._status == RequestStatus.error
#----------------------------------------------------------------------
@property
def response(self): # type: ()->requests.Response
return self._response
#----------------------------------------------------------------------
def __str__(self):
statusCode = 'not finished'
if self._response:
statusCode = self._response.status_code
return "{} {} : {} {}\n".format(self.method, self.path, self._status, statusCode)
########################################################################
class RestClient(object):
"""
HTTP 客户端目前是为了对接各种RESTfulAPI而设计的
如果需要给请求加上签名请重载beforeRequest函数
如果需要处理非200的请求请重载onFailed函数
如果每一个请求的非200返回都需要单独处理使用addReq函数的onFailed参数
如果捕获Python内部错误例如网络连接失败等等请重载onError函数
"""
#----------------------------------------------------------------------
def __init__(self):
"""
:param urlBase: 路径前缀 例如'https://www.bitmex.com/api/v1/'
"""
self.urlBase = None # type: str
self.sessionProvider = requestsSessionProvider
self._active = False
self._queue = Queue()
self._pool = None # type: Pool
#----------------------------------------------------------------------
def init(self, urlBase):
self.urlBase = urlBase
#----------------------------------------------------------------------
def setSessionProvider(self, sessionProvider):
"""
设置sessionProvider可以使用自定义的requests实现
@:param sessionProvider: callable调用后应该返回一个对象带request函数的对象该request函数的用法应该和requests中的一致 \
每个工作线程会调用该函数一次以期获得一个独立的session实例
"""
self.sessionProvider = sessionProvider
#----------------------------------------------------------------------
def start(self, n=3):
"""启动"""
assert not self._active
self._active = True
self._pool = Pool(n)
self._pool.apply_async(self._run)
#----------------------------------------------------------------------
def stop(self):
"""
强制停止运行未发出的请求都会被暂停仍处于队列中
:return:
"""
self._active = False
#----------------------------------------------------------------------
def addReq(self, method, path, callback,
params=None, data=None, headers = None,
onFailed=None, skipDefaultOnFailed=True,
extra=None): # type: (str, str, Callable[[dict, Request], Any], dict, dict, dict, Callable[[dict, Request], Any], bool, Any)->Request
"""
发送一个请求
:param method: GET, POST, PUT, DELETE, QUERY
:param path:
:param callback: 请求成功后的回调(状态吗为2xx时认为请求成功) type: (dict, Request)
:param params: dict for query string
:param data: dict for body
:param headers: dict for headers
:param onFailed: 请求失败后的回调(状态吗不为2xx时认为请求失败) type: (code, dict, Request)
:param skipDefaultOnFailed: 仅当onFailed参数存在时有效忽略对虚函数onFailed的调用
:param extra: 返回值的extra字段会被设置为这个值当然你也可以在函数调用之后再设置这个字段
:return: Request
"""
req = Request(method, path, callback, params, data, headers)
req.onFailed = onFailed
req.skipDefaultOnFailed = skipDefaultOnFailed
req.extra = extra
self._queue.put(req)
return req
#----------------------------------------------------------------------
def _run(self):
session = self.sessionProvider()
while self._active:
try:
req = self._queue.get(timeout=1)
self.processReq(req, session)
except Empty:
pass
#----------------------------------------------------------------------
@abstractmethod
def beforeRequest(self, req): # type: (Request)->Request
"""
所有请求在发送之前都会经过这个函数
签名之类的前奏可以在这里面实现
需要对request进行什么修改就做什么修改吧
@:return (req)
"""
return req
#----------------------------------------------------------------------
def onFailed(self, httpStatusCode, req): # type:(int, Request)->None
"""
请求失败处理函数HttpStatusCode!=200.
默认行为是打印到stderr
"""
print("reuqest : {} {} failed with {}: \n"
"headers: {}\n"
"params: {}\n"
"data: {}\n"
"response:"
"{}\n"
.format(req.method, req.path, httpStatusCode,
req.headers,
req.params,
req.data,
req._response.raw))
#----------------------------------------------------------------------
def onError(self, exceptionType, exceptionValue, tb, req):
"""
Python内部错误处理默认行为是仍给excepthook
"""
print("error in req : {}\n".format(req))
sys.excepthook(exceptionType, exceptionValue, tb)
#----------------------------------------------------------------------
def processReq(self, req, session): # type: (Request, requests.Session)->None
"""处理请求"""
try:
req = self.beforeRequest(req)
url = self.makeFullUrl(req.path)
response = session.request(req.method, url, headers=req.headers, params=req.params, data=req.data)
req._response = response
httpStatusCode = response.status_code
if httpStatusCode/100 == 2:
jsonBody = response.json()
req.callback(jsonBody, req)
req._status = RequestStatus.success
else:
req._status = RequestStatus.failed
if req.onFailed:
req.onFailed(httpStatusCode, response.raw, req)
# 若没有onFailed或者没设置skipDefaultOnFailed则调用默认的处理函数
if not req.onFailed or not req.skipDefaultOnFailed:
self.onFailed(httpStatusCode, req)
except:
req._status = RequestStatus.error
t, v, tb = sys.exc_info()
self.onError(t, v, tb, req)
def makeFullUrl(self, path):
url = self.urlBase + path
return url
########################################################################
def requestsSessionProvider():
return requests.session()

View File

@ -0,0 +1,151 @@
# encoding: UTF-8
########################################################################
import json
import ssl
import sys
import time
from abc import abstractmethod
from threading import Thread, Lock
import websocket
class WebsocketClient(object):
"""Websocket API"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.host = None # type: str
self._ws_lock = Lock()
self._ws = None # type: websocket.WebSocket
self._workerThread = None # type: Thread
self._pingThread = None # type: Thread
self._active = False
#----------------------------------------------------------------------
def init(self, host):
self.host = host
#----------------------------------------------------------------------
def start(self):
"""启动"""
self._connect()
self._active = True
self._workerThread = Thread(target=self._run)
self._workerThread.start()
self._pingThread = Thread(target=self._runPing)
self._pingThread.start()
self.onConnect()
#----------------------------------------------------------------------
def stop(self):
"""
关闭
@note 不能从工作线程也就是websocket的回调中调用
"""
self._active = False
self._disconnect()
#----------------------------------------------------------------------
def sendReq(self, req): # type: (dict)->None
"""发出请求"""
return self._get_ws().send(json.dumps(req), opcode=websocket.ABNF.OPCODE_TEXT)
#----------------------------------------------------------------------
def sendText(self, text): # type: (str)->None
"""发出请求"""
return self._get_ws().send(text, opcode=websocket.ABNF.OPCODE_TEXT)
#----------------------------------------------------------------------
def sendData(self, data): # type: (bytes)->None
"""发出请求"""
return self._get_ws().send_binary(data)
#----------------------------------------------------------------------
def _reconnect(self):
"""重连"""
self._disconnect()
self._connect()
#----------------------------------------------------------------------
def _connect(self):
""""""
self._ws = websocket.create_connection(self.host, sslopt={'cert_reqs': ssl.CERT_NONE})
self.onConnect()
#----------------------------------------------------------------------
def _disconnect(self):
"""
断开连接
"""
with self._ws_lock:
if self._ws:
self._ws.close()
self._ws = None
#----------------------------------------------------------------------
def _get_ws(self):
with self._ws_lock:
return self._ws
#----------------------------------------------------------------------
def _run(self):
"""运行"""
ws = self._get_ws()
while self._active:
try:
stream = ws.recv()
if not stream:
if self._active:
self._reconnect()
continue
data = json.loads(stream)
self.onMessage(data)
except:
et, ev, tb = sys.exc_info()
self.onError(et, ev, tb)
#----------------------------------------------------------------------
def _runPing(self):
while self._active:
self._ping()
for i in range(60):
if not self._active:
break
time.sleep(1)
#----------------------------------------------------------------------
def _ping(self):
return self._get_ws().send('ping', websocket.ABNF.OPCODE_PING)
#----------------------------------------------------------------------
@abstractmethod
def onConnect(self):
"""连接回调"""
pass
#----------------------------------------------------------------------
@abstractmethod
def onMessage(self, packet):
"""
数据回调
只有在数据为json包的时候才会触发这个回调
@:param data: dict
@:return:
"""
pass
#----------------------------------------------------------------------
@abstractmethod
def onError(self, exceptionType, exceptionValue, tb):
"""Python错误回调"""
pass

0
vnpy/network/__init__.py Normal file
View File

View File

@ -0,0 +1,214 @@
# encoding: UTF-8
from __future__ import print_function
import json
from abc import abstractmethod, abstractproperty
from vnpy.api.okexfuture.OkexFutureApi import *
from vnpy.trader.vtFunction import getJsonPath
from vnpy.trader.vtGateway import *
orderTypeMap = {
(constant.DIRECTION_LONG, constant.OFFSET_OPEN): OkexFutureOrderType.OpenLong,
(constant.DIRECTION_SHORT, constant.OFFSET_OPEN): OkexFutureOrderType.OpenShort,
(constant.DIRECTION_LONG, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseLong,
(constant.DIRECTION_SHORT, constant.OFFSET_CLOSE): OkexFutureOrderType.CloseShort,
}
orderTypeMapReverse = {v: k for k, v in orderTypeMap.items()}
contracts = (
'btc_usd', 'ltc_usd', 'eth_usd', 'etc_usd', 'bch_usd',
)
contractTypeMap = {
'THISWEEK': OkexFutureContractType.ThisWeek,
'NEXTWEEK': OkexFutureContractType.NextWeek,
'QUARTER': OkexFutureContractType.Quarter,
}
# symbols for ui,
# keys:给用户看的symbols
# values: API接口使用的symbol和contractType字段
symbolsForUi = {} # type: dict[str, [str, str]]
for s in contracts:
for vtContractType, contractType_ in contractTypeMap.items():
vtSymbol = s + '_' + vtContractType
symbolsForUi[vtSymbol] = (s, contractType_)
########################################################################
class VnpyGateway(VtGateway):
"""
每个gateway有太多重复代码难以拓展和维护
于是我设计了这个类将重复代码抽取出来简化gateway的实现
"""
#----------------------------------------------------------------------
def __init__(self, eventEngine):
super(VnpyGateway, self).__init__(eventEngine, self.gatewayName)
#----------------------------------------------------------------------
@abstractproperty
def gatewayName(self): # type: ()->str
return 'VnpyGateway'
#----------------------------------------------------------------------
@abstractproperty
def exchange(self): # type: ()->str
return constant.EXCHANGE_UNKNOWN
#----------------------------------------------------------------------
def readConfig(self):
"""
从json文件中读取设置并将其内容返回为一个dict
:一个一个return:
"""
fileName = self.gatewayName + '_connect.json'
filePath = getJsonPath(fileName, __file__)
try:
with open(filePath, 'rt') as f:
return json.load(f)
except IOError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'读取连接配置出错,请检查'
# todo: pop a message box is better
self.onLog(log)
return
#----------------------------------------------------------------------
@abstractmethod
def loadSetting(self):
"""
载入设置在connect的时候会被调用到
"""
pass
class _Order(object):
_lastLocalId = 0
def __init__(self):
_Order._lastLocalId += 1
self.localId = str(_Order._lastLocalId)
self.remoteId = None
self.vtOrder = None # type: VtOrderData
########################################################################
class OkexFutureGateway(VnpyGateway):
"""OKEX期货交易接口"""
#----------------------------------------------------------------------
def __init__(self, eventEngine, *args, **kwargs): # args, kwargs is needed for compatibility
"""Constructor"""
super(OkexFutureGateway, self).__init__(eventEngine)
self.apiKey = None # type: str
self.apiSecret = None # type: str
self.api = OkexFutureRestClient()
self.leverRate = 1
self.symbols = []
self.orders =
#----------------------------------------------------------------------
@property
def gatewayName(self):
return 'OkexFutureGateway'
#----------------------------------------------------------------------
@abstractproperty
def exchange(self): # type: ()->str
return constant.EXCHANGE_OKEXFUTURE
#----------------------------------------------------------------------
def loadSetting(self):
setting = self.readConfig()
if setting:
"""连接"""
# 载入json文件
try:
# todo: check by predefined settings names and types
# or check by validator
self.apiKey = str(setting['apiKey'])
self.apiSecret = str(setting['secretKey'])
self.leverRate = setting['leverRate']
self.symbols = setting['symbols']
except KeyError:
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = u'连接配置缺少字段,请检查'
self.onLog(log)
return
#----------------------------------------------------------------------
def connect(self):
self.loadSetting()
self.api.init(self.apiKey, self.apiSecret)
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅行情"""
pass
#----------------------------------------------------------------------
def sendOrder(self, vtRequest): # type: (VtOrderReq)->str
"""发单"""
myorder = _Order()
localId = myorder.localId
vtOrder = VtOrderData()
vtOrder.orderID = localId
vtOrder.vtOrderID = ".".join([self.gatewayName, localId])
vtOrder.exchange = self.exchange
vtOrder.symbol = vtRequest.symbol
vtOrder.vtSymbol = '.'.join([vtOrder.symbol, vtOrder.exchange])
vtOrder.price = vtRequest.price
vtOrder.totalVolume = vtRequest.volume
vtOrder.direction = vtRequest.direction
myorder.vtOrder = vtOrder
symbol, contractType = symbolsForUi[vtRequest.symbol]
orderType = orderTypeMap[(vtRequest.priceType, vtRequest.offset)] # 开多、开空、平多、平空
userMarketPrice = False
if vtRequest.priceType == constant.PRICETYPE_MARKETPRICE:
userMarketPrice = True
self.api.sendOrder(symbol=symbol,
contractType=contractType,
orderType=orderType,
volume=vtRequest.volume,
price=vtRequest.price,
useMarketPrice=userMarketPrice,
leverRate=self.leverRate,
onSuccess=self.onOrderSent,
extra=None)
return myorder.localId
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq):
"""撤单"""
self.api.cancelOrder(cancelOrderReq)
#----------------------------------------------------------------------
def qryAccount(self):
"""查询账户资金"""
pass
#----------------------------------------------------------------------
def qryPosition(self):
"""查询持仓"""
self.api.spotUserInfo()
#----------------------------------------------------------------------
def close(self):
"""关闭"""
self.api.close()
def onOrderSent(self, remoteId, myorder): #type: (int, _Order)->None
myorder.remoteId = remoteId
myorder.vtOrder.status = constant.STATUS_NOTTRADED
self.onOrder(myorder.vtOrder)

View File

@ -87,6 +87,7 @@ EXCHANGE_HUOBI = 'HUOBI' # 火币比特币交易所
EXCHANGE_LBANK = 'LBANK' # LBANK比特币交易所
EXCHANGE_ZB = 'ZB' # 比特币中国比特币交易所
EXCHANGE_OKEX = 'OKEX' # OKEX比特币交易所
EXCHANGE_OKEXFUTURE = 'OKEXFUTURE' # OKEX比特币交易所-期货
EXCHANGE_BINANCE = "BINANCE" # 币安比特币交易所
EXCHANGE_BITFINEX = "BITFINEX" # Bitfinex比特币交易所
EXCHANGE_BITMEX = 'BITMEX' # BitMEX比特币交易所

View File

@ -83,6 +83,7 @@ EXCHANGE_HUOBI = 'HUOBI' # 火币比特币交易所
EXCHANGE_LBANK = 'LBANK' # LBANK比特币交易所
EXCHANGE_ZB = 'ZB' # 比特币中国比特币交易所
EXCHANGE_OKEX = 'OKEX' # OKEX比特币交易所
EXCHANGE_OKEXFUTURE = 'OKEXFUTURE' # OKEX比特币交易所-期货
EXCHANGE_BINANCE = "BINANCE" # 币安比特币交易所
EXCHANGE_BITFINEX = "BITFINEX" # Bitfinex比特币交易所
EXCHANGE_BITMEX = 'BITMEX' # BitMEX比特币交易所