[Mod]使用RestClient和WebsocketClient重新实现

This commit is contained in:
vn.py 2018-10-24 16:54:12 +08:00
parent 309a7e2c0a
commit 8f83b6dcf2

View File

@ -1,8 +1,9 @@
# encoding: UTF-8
'''
vnpy.api.bitmex的gateway接入
'''
from __future__ import print_function
import os
@ -14,11 +15,21 @@ import traceback
from datetime import datetime, timedelta
from copy import copy
from math import pow
from urllib import urlencode
from vnpy.api.bitmex import BitmexRestApi, BitmexWebsocketApi
from vnpy.api.rest import RestClient
from vnpy.api.websocket import WebsocketClient
from vnpy.trader.vtGateway import *
from vnpy.trader.vtFunction import getJsonPath, getTempPath
REST_HOST = 'https://www.bitmex.com/api/v1'
WEBSOCKET_HOST = 'wss://www.bitmex.com/realtime'
TESTNET_REST_HOST = 'https://testnet.bitmex.com/api/v1'
TESTNET_WEBSOCKET_HOST = 'wss://testnet.bitmex.com/realtime'
# 委托状态类型映射
statusMapReverse = {}
statusMapReverse['New'] = STATUS_NOTTRADED
@ -49,8 +60,8 @@ class BitmexGateway(VtGateway):
"""Constructor"""
super(BitmexGateway, self).__init__(eventEngine, gatewayName)
self.restApi = RestApi(self)
self.wsApi = WebsocketApi(self)
self.restApi = BitmexRestApi(self)
self.wsApi = BitmexWebsocketApi(self)
self.qryEnabled = False # 是否要启动循环查询
@ -153,24 +164,69 @@ class BitmexGateway(VtGateway):
########################################################################
class RestApi(BitmexRestApi):
class BitmexRestApi(RestClient):
"""REST API实现"""
#----------------------------------------------------------------------
def __init__(self, gateway):
"""Constructor"""
super(RestApi, self).__init__()
super(BitmexRestApi, self).__init__()
self.gateway = gateway # gateway对象
self.gatewayName = gateway.gatewayName # gateway对象名称
self.orderId = 1000000
self.date = int(datetime.now().strftime('%y%m%d%H%M%S')) * self.orderId
self.apiKey = ''
self.apiSecret = ''
self.orderId = 1000000
self.loginTime = 0
#----------------------------------------------------------------------
def sign(self, request):
"""BitMEX的签名方案"""
# 生成签名
expires = int(time.time() + 5)
if request.params:
query = urlencode(request.params)
path = request.path + '?' + query
else:
path = request.path
if request.data:
request.data = urlencode(request.data)
else:
request.data = ''
msg = request.method + '/api/v1' + path + str(expires) + request.data
signature = hmac.new(self.apiSecret, msg,
digestmod=hashlib.sha256).hexdigest()
# 添加表头
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json',
'api-key': self.apiKey,
'api-expires': str(expires),
'api-signature': signature
}
request.headers = headers
return request
#----------------------------------------------------------------------
def connect(self, apiKey, apiSecret, sessionCount, testnet):
"""连接服务器"""
self.init(apiKey, apiSecret, testnet)
self.apiKey = apiKey
self.apiSecret = apiSecret
self.loginTime = int(datetime.now().strftime('%y%m%d%H%M%S')) * self.orderId
if not testnet:
self.init(REST_HOST)
else:
self.init(TESTNET_REST_HOST)
self.start(sessionCount)
self.writeLog(u'REST API启动成功')
@ -187,10 +243,10 @@ class RestApi(BitmexRestApi):
def sendOrder(self, orderReq):
""""""
self.orderId += 1
orderId = self.date + self.orderId
orderId = self.loginTime + self.orderId
vtOrderID = '.'.join([self.gatewayName, str(orderId)])
req = {
data = {
'symbol': orderReq.symbol,
'side': directionMap[orderReq.direction],
'ordType': priceTypeMap[orderReq.priceType],
@ -199,24 +255,24 @@ class RestApi(BitmexRestApi):
'clOrdID': str(orderId)
}
# 市价单不能有price字段
if orderReq.priceType == PRICETYPE_MARKETPRICE:
req.pop('price')
self.addReq('POST', '/order', self.onSendOrder, postdict=req)
# 只有限价单才有price字段
if orderReq.priceType == PRICETYPE_LIMITPRICE:
data['price'] = orderReq.price
self.addRequest('POST', '/order', self.onSendOrder, data=data)
return vtOrderID
#----------------------------------------------------------------------
def cancelOrder(self, cancelOrderReq):
""""""
orderID = cancelOrderReq.orderID
if orderID.isdigit():
req = {'clOrdID': orderID}
else:
req = {'orderID': orderID}
self.addReq('DELETE', '/order', self.onCancelOrder, params=req)
if orderID.isdigit():
params = {'clOrdID': orderID}
else:
params = {'orderID': orderID}
self.addRequest('DELETE', '/order', self.onCancelOrder, params=params)
#----------------------------------------------------------------------
def onSendOrder(self, data, reqid):
@ -229,22 +285,40 @@ class RestApi(BitmexRestApi):
pass
#----------------------------------------------------------------------
def onError(self, code, error):
""""""
def onFailed(self, httpStatusCode, request): # type:(int, Request)->None
"""
请求失败处理函数HttpStatusCode!=2xx.
默认行为是打印到stderr
"""
e = VtErrorData()
e.errorID = code
e.errorID = error
e.gatewayName = self.gatewayName
e.errorID = httpStatusCode
e.errorMsg = request.response.text
self.gateway.onError(e)
print(request.response.text)
#----------------------------------------------------------------------
def onError(self, exceptionType, exceptionValue, tb, request):
"""
Python内部错误处理默认行为是仍给excepthook
"""
e = VtErrorData()
e.gatewayName = self.gatewayName
e.errorID = exceptionType
e.errorMsg = exceptionValue
self.gateway.onError(e)
traceback.print_exc()
########################################################################
class WebsocketApi(BitmexWebsocketApi):
class BitmexWebsocketApi(WebsocketClient):
""""""
#----------------------------------------------------------------------
def __init__(self, gateway):
"""Constructor"""
super(WebsocketApi, self).__init__()
super(BitmexWebsocketApi, self).__init__()
self.gateway = gateway
self.gatewayName = gateway.gatewayName
@ -273,10 +347,15 @@ class WebsocketApi(BitmexWebsocketApi):
self.apiKey = apiKey
self.apiSecret = apiSecret
if not testnet:
self.init(WEBSOCKET_HOST)
else:
self.init(TESTNET_WEBSOCKET_HOST)
self.start()
for symbol in symbols:
self.subscribeMarketData(symbol)
self.start(testnet)
self.subscribeMarketData(symbol)
#----------------------------------------------------------------------
def subscribeMarketData(self, symbol):
@ -289,50 +368,55 @@ class WebsocketApi(BitmexWebsocketApi):
self.tickDict[symbol] = tick
#----------------------------------------------------------------------
def onConnect(self):
def onConnected(self):
"""连接回调"""
self.writeLog(u'Websocket API连接成功')
self.authenticate()
#----------------------------------------------------------------------
def onData(self, data):
def onDisconnected(self):
"""连接回调"""
self.writeLog(u'Websocket API连接断开')
self.authenticate()
#----------------------------------------------------------------------
def onPacket(self, packet):
"""数据回调"""
if 'error' in data:
self.writeLog(u'Websocket API报错%s' %data['error'])
if 'error' in packet:
self.writeLog(u'Websocket API报错%s' %packet['error'])
if 'not valid' in data['error']:
if 'not valid' in packet['error']:
self.active = False
elif 'request' in data:
req = data['request']
success = data['success']
elif 'request' in packet:
req = packet['request']
success = packet['success']
if success:
if req['op'] == 'authKey':
self.writeLog(u'Websocket API验证授权成功')
self.subscribe()
elif 'table' in data:
name = data['table']
elif 'table' in packet:
name = packet['table']
callback = self.callbackDict[name]
if isinstance(data['data'], list):
for d in data['data']:
if isinstance(packet['data'], list):
for d in packet['data']:
callback(d)
else:
callback(data['data'])
#if data['action'] == 'update' and data['table'] != 'instrument':
#callback(data['data'])
#elif data['action'] == 'partial':
#for d in data['data']:
#callback(d)
callback(packet['data'])
#----------------------------------------------------------------------
def onError(self, msg):
"""错误回调"""
self.writeLog(msg)
def onError(self, exceptionType, exceptionValue, tb):
"""Python错误回调"""
e = VtErrorData()
e.gatewayName = self.gatewayName
e.errorID = exceptionType
e.errorMsg = exceptionValue
self.gateway.onError(e)
traceback.print_exc()
#----------------------------------------------------------------------
def writeLog(self, content):
@ -355,7 +439,7 @@ class WebsocketApi(BitmexWebsocketApi):
'op': 'authKey',
'args': [self.apiKey, expires, signature]
}
self.sendReq(req)
self.sendPacket(req)
#----------------------------------------------------------------------
def subscribe(self):
@ -364,7 +448,7 @@ class WebsocketApi(BitmexWebsocketApi):
'op': 'subscribe',
'args': ['instrument', 'trade', 'orderBook10', 'execution', 'order', 'position', 'margin']
}
self.sendReq(req)
self.sendPacket(req)
#----------------------------------------------------------------------
def onTick(self, d):