[Add] New bitmex gateway for crypto currency futures trading

This commit is contained in:
vn.py 2019-01-16 15:22:44 +08:00
parent 32867a97c2
commit d5a39a7d7f
12 changed files with 702 additions and 37 deletions

View File

@ -3,6 +3,7 @@ from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy.gateway.ib import IbGateway
from vnpy.gateway.futu import FutuGateway
from vnpy.gateway.bitmex import BitmexGateway
import os
import logging
@ -18,6 +19,7 @@ def main():
main_engine = MainEngine(event_engine)
main_engine.add_gateway(IbGateway)
main_engine.add_gateway(FutuGateway)
main_engine.add_gateway(BitmexGateway)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()

View File

@ -1 +1 @@
from .RestClient import Request, RequestStatus, RestClient
from .rest_client import Request, RequestStatus, RestClient

View File

@ -94,13 +94,19 @@ class RestClient(object):
self._queue = Queue()
self._pool = None # type: Pool
def init(self, url_base: str):
self.proxies = None
def init(self, url_base: str, proxy_host: str = "", proxy_port: int = 0):
"""
Init rest client with url_base which is the API root address.
e.g. 'https://www.bitmex.com/api/v1/'
"""
self.url_base = url_base
if proxy_host and proxy_port:
proxy = f"{proxy_host}:{proxy_port}"
self.proxies = {"http": proxy, "https": proxy}
def _create_session(self):
""""""
return requests.session()
@ -153,10 +159,17 @@ class RestClient(object):
:param extra: Any extra data which can be used when handling callback
:return: Request
"""
request = Request(method, path, params, data, headers, callback)
request.extra = extra
request.on_failed = on_failed
request.on_error = on_error
request = Request(
method,
path,
params,
data,
headers,
callback,
extra,
on_failed,
on_error
)
self._queue.put(request)
return request
@ -249,7 +262,8 @@ class RestClient(object):
url,
headers=request.headers,
params=request.params,
data=request.data
data=request.data,
proxies=self.proxies
)
request.response = response

View File

@ -1 +1 @@
from .WebsocketClient import WebsocketClient
from .websocket_client import WebsocketClient

View File

@ -43,14 +43,21 @@ class WebsocketClient(object):
self._ping_thread = None
self._active = False
self.proxy_host = None
self.proxy_port = None
# For debugging
self._last_sent_text = None
self._last_received_text = None
def init(self, host: str):
def init(self, host: str, proxy_host: str = "", proxy_port: int = 0):
""""""
self.host = host
if proxy_host and proxy_port:
self.proxy_host = proxy_host
self.proxy_port = proxy_port
def start(self):
"""
Start the client and on_connected function is called after webscoket
@ -116,7 +123,9 @@ class WebsocketClient(object):
""""""
self._ws = self._create_connection(
self.host,
sslopt={'cert_reqs': ssl.CERT_NONE}
sslopt={'cert_reqs': ssl.CERT_NONE},
http_proxy_host=self.proxy_host,
http_proxy_port=self.proxy_port
)
self.on_connected()
@ -219,13 +228,13 @@ class WebsocketClient(object):
pass
@staticmethod
def on_packet(packet):
def on_packet(packet: dict):
"""
Callback when receiving data from server.
"""
pass
def on_error(self, exception_type, exception_value, tb):
def on_error(self, exception_type: type, exception_value: Exception, tb):
"""
Callback when exception raised.
"""
@ -236,7 +245,12 @@ class WebsocketClient(object):
)
return sys.excepthook(exception_type, exception_value, tb)
def exception_detail(self, exception_type, exception_value, tb):
def exception_detail(
self,
exception_type: type,
exception_value: Exception,
tb
):
"""
Print detailed exception information.
"""
@ -256,13 +270,13 @@ class WebsocketClient(object):
)
return text
def _record_last_sent_text(self, text):
def _record_last_sent_text(self, text: str):
"""
Record last sent text for debug purpose.
"""
self._last_sent_text = text[:1000]
def _record_last_received_text(self, text):
def _record_last_received_text(self, text: str):
"""
Record last received text for debug purpose.
"""

View File

@ -0,0 +1 @@
from .bitmex_gateway import BitmexGateway

View File

@ -0,0 +1,618 @@
# encoding: UTF-8
"""
"""
from __future__ import print_function
import logging
import os
import json
import hashlib
import hmac
import sys
import time
import traceback
from datetime import datetime, timedelta
from copy import copy
from math import pow
from urllib.parse import urlencode
from requests import ConnectionError
from vnpy.api.rest import RestClient, Request
from vnpy.api.websocket import WebsocketClient
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (
SubscribeRequest,
OrderRequest,
CancelRequest,
TickData,
OrderData,
TradeData,
PositionData,
AccountData,
ContractData
)
from vnpy.trader.constant import Direction, Status, PriceType, Exchange, Product
REST_HOST = "https://www.bitmex.com/api/v1"
WEBSOCKET_HOST = "wss://www.bitmex.com/realtime"
TESTNET_REST_HOST = "https://testnet.bitmex.com/api/v1"
TESTNET_WEBSOCKET_HOST = "wss://testnet.bitmex.com/realtime"
STATUS_BITMEX2VT = {
"New": Status.NOTTRADED,
"Partially filled": Status.PARTTRADED,
"Filled": Status.ALLTRADED,
"Canceled": Status.CANCELLED,
"Rejected": Status.REJECTED
}
DIRECTION_VT2BITMEX = {Direction.LONG: "Buy", Direction.SHORT: "Sell"}
DIRECTION_BITMEX2VT = {v: k for k, v in DIRECTION_VT2BITMEX.items()}
PRICETYPE_VT2BITMEX = {PriceType.LIMIT: "Limit", PriceType.MARKET: "Market"}
class BitmexGateway(BaseGateway):
"""
VN Trader Gateway for BitMEX connection.
"""
default_setting = {
"key": "",
"secret": "",
"session": 3,
"server": ["REAL",
"TESTNET"],
"proxy_host": "127.0.0.1",
"proxy_port": 1080
}
def __init__(self, event_engine):
"""Constructor"""
super(BitmexGateway, self).__init__(event_engine, "BITMEX")
self.rest_api = BitmexRestApi(self)
self.ws_api = BitmexWebsocketApi(self)
def connect(self, setting: dict):
""""""
key = setting["key"]
secret = setting["secret"]
session = setting["session"]
server = setting["server"]
proxy_host = setting["proxy_host"]
proxy_port = setting["proxy_port"]
self.rest_api.connect(
key,
secret,
session,
server,
proxy_host,
proxy_port
)
self.ws_api.connect(key, secret, server, proxy_host, proxy_port)
def subscribe(self, req: SubscribeRequest):
""""""
self.ws_api.subscribe(req)
def send_order(self, req):
""""""
return self.rest_api.send_order(req)
def cancel_order(self, req):
""""""
self.rest_api.cancel_order(req)
def query_account(self):
""""""
pass
def query_position(self):
""""""
pass
def close(self):
""""""
self.rest_api.stop()
self.ws_api.stop()
class BitmexRestApi(RestClient):
"""
BitMEX REST API
"""
def __init__(self, gateway: BaseGateway):
""""""
super(BitmexRestApi, self).__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.key = ""
self.secret = ""
self.order_count = 1000000
self.connect_time = 0
def sign(self, request):
"""
Generate BitMEX signature.
"""
# Sign
expires = int(time.time() + 5)
if request.params:
query = urlencode(request.params)
path = request.path + "?" + query
else:
path = request.path
if request.data:
request.data = urlencode(request.data)
else:
request.data = ""
msg = request.method + "/api/v1" + path + str(expires) + request.data
signature = hmac.new(
self.secret,
msg,
digestmod=hashlib.sha256
).hexdigest()
# Add headers
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
"api-key": self.key,
"api-expires": str(expires),
"api-signature": signature
}
request.headers = headers
return request
def connect(
self,
key: str,
secret: str,
session: int,
server: str,
proxy_host: str,
proxy_port: int
):
"""
Initialize connection to REST server.
"""
self.key = key
self.secret = secret
self.connect_time = int(
datetime.now().strftime("%y%m%d%H%M%S")
) * self.order_count
if server == "REAL":
self.init(REST_HOST, proxy_host, proxy_port)
else:
self.init(TESTNET_REST_HOST, proxy_host, proxy_port)
self.start(session)
self.gateway.write_log(u"REST API启动成功")
def send_order(self, req: SubscribeRequest):
""""""
self.order_count += 1
orderid = str(self.connect_time + self.order_count)
data = {
"symbol": req.symbol,
"side": DIRECTION_VT2BITMEX[req.direction],
"ordType": PRICETYPE_VT2BITMEX[req.price_type],
"price": req.price,
"orderQty": req.volume,
"clOrdID": orderid
}
# Only add price for limit order.
if req.price_type == PriceType.LIMIT:
data["price"] = req.price
order = req.create_order_data(orderid, self.gateway_name)
self.add_request(
"POST",
"/order",
callback=self.on_send_order,
data=data,
extra=order,
on_failed=self.on_send_order_failed,
on_error=self.on_send_order_error,
)
self.gateway.on_order(order)
return order.vt_orderid
def cancel_order(self, req: CancelRequest):
""""""
orderid = req.orderid
if orderID.isdigit():
params = {"clOrdID": orderid}
else:
params = {"orderID": orderid}
self.add_request(
"DELETE",
"/order",
callback=self.on_cancel_order,
params=params,
on_error=self.on_cancel_order_error,
)
def on_send_order_failed(self, _, request: Request):
"""
Callback when sending order failed on server.
"""
order = request.extra
order.status = Status.REJECTED
self.gateway.on_order(order)
def on_send_order_error(
self,
exception_type: type,
exception_value: Exception,
tb,
request: Request
):
"""
Callback when sending order caused exception.
"""
order = request.extra
order.status = Status.REJECTED
self.gateway.on_order(order)
# Record exception if not ConnectionError
if not issubclass(exception_type, ConnectionError):
self.on_error(exception_type, exception_value, tb, request)
def on_send_order(self, data, request):
""""""
pass
def on_cancel_order_error(
self,
exception_type: type,
exception_value: Exception,
tb,
request: Request
):
"""
Callback when cancelling order failed on server.
"""
# Record exception if not ConnectionError
if not issubclass(exception_type, Connection_error):
self.on_error(exception_type, exception_value, tb, request)
def on_cancel_order(self, data, request):
""""""
pass
def on_failed(self, status_code: int, request: Request):
"""
Callback to handle request failed.
"""
msg = f"请求失败,状态码:{status_code},信息:{request.response.text}"
self.gateway.write_log(msg)
def on_error(
self,
exception_type: type,
exception_value: Exception,
tb,
request: Request
):
"""
Callback to handler request exception.
"""
msg = f"触发异常,状态码:{exception_type},信息:{exception_value}"
self.gateway.write_log(msg)
sys.stderr.write(
self.exception_detail(exception_type,
exception_value,
tb,
request)
)
class BitmexWebsocketApi(WebsocketClient):
""""""
def __init__(self, gateway):
""""""
super(BitmexWebsocketApi, self).__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.key = ""
self.secret = ""
self.callbacks = {
"trade": self.on_tick,
"orderBook10": self.on_depth,
"execution": self.on_trade,
"order": self.on_order,
"position": self.on_position,
"margin": self.on_account,
"instrument": self.on_contract
}
self.ticks = {}
self.accounts = {}
self.orders = {}
self.trades = set()
def connect(
self,
key: str,
secret: str,
server: str,
proxy_host: str,
proxy_port: int
):
""""""
self.key = key
self.secret = secret.encode()
if server == "REAL":
self.init(WEBSOCKET_HOST, proxy_host, proxy_port)
else:
self.init(TESTNET_WEBSOCKET_HOST, proxy_host, proxy_port)
self.start()
def subscribe(self, req: SubscribeRequest):
"""
Subscribe to tick data upate.
"""
tick = TickData(
symbol=req.symbol,
exchange=req.exchange,
name=req.symbol,
datetime=datetime.now(),
gateway_name=self.gateway_name
)
self.ticks[req.symbol] = tick
def on_connected(self):
""""""
self.gateway.write_log(u"Websocket API连接成功")
self.authenticate()
def on_disconnected(self):
""""""
self.gateway.write_log(u"Websocket API连接断开")
def on_packet(self, packet: dict):
""""""
if "error" in packet:
self.gateway.write_log(u"Websocket API报错%s" % packet["error"])
if "not valid" in packet["error"]:
self.active = False
elif "request" in packet:
req = packet["request"]
success = packet["success"]
if success:
if req["op"] == "authKey":
self.gateway.write_log(u"Websocket API验证授权成功")
self.subscribe_topic()
elif "table" in packet:
name = packet["table"]
callback = self.callbacks[name]
if isinstance(packet["data"], list):
for d in packet["data"]:
callback(d)
else:
callback(packet["data"])
def on_error(self, exception_type: type, exception_value: Exception, tb):
""""""
msg = f"触发异常,状态码:{exception_type},信息:{exception_value}"
self.gateway.write_log(msg)
sys.stderr.write(
self.exception_detail(exception_type,
exception_value,
tb)
)
def authenticate(self):
"""
Authenticate websockey connection to subscribe private topic.
"""
expires = int(time.time())
method = "GET"
path = "/realtime"
msg = method + path + str(expires)
signature = hmac.new(
self.secret,
msg.encode(),
digestmod=hashlib.sha256
).hexdigest()
req = {"op": "authKey", "args": [self.key, expires, signature]}
self.send_packet(req)
def subscribe_topic(self):
"""
Subscribe to all private topics.
"""
req = {
"op":
"subscribe",
"args": [
"instrument",
"trade",
"orderBook10",
"execution",
"order",
"position",
"margin"
]
}
self.send_packet(req)
def on_tick(self, d):
""""""
symbol = d["symbol"]
tick = self.ticks.get(symbol, None)
if not tick:
return
tick.last_price = d["price"]
tick.datetime = datetime.strptime(d["timestamp"], "%Y-%m-%d %H:%M:%SZ")
self.gateway.on_tick(copy(tick))
def on_depth(self, d):
""""""
symbol = d["symbol"]
tick = self.ticks.get(symbol, None)
if not tick:
return
for n, buf in enumerate(d["bids"][:5]):
price, volume = buf
tick.__setattr__("bid_price_%s" % (n + 1), price)
tick.__setattr__("bid_volume_%s" % (n + 1), volume)
for n, buf in enumerate(d["asks"][:5]):
price, volume = buf
tick.__setattr__("ask_price_%s" % (n + 1), price)
tick.__setattr__("ask_volume_%s" % (n + 1), volume)
tick.datetime = datetime.strptime(d["timestamp"], "%Y-%m-%d %H:%M:%SZ")
self.gateway.on_tick(copy(tick))
def on_trade(self, d):
""""""
if not d["lastQty"]:
return
tradeid = d["execID"]
if tradeid in self.trades:
return
self.trades.add(tradeid)
if d["clOrdID"]:
orderid = d["clOrdID"]
else:
orderid = d["orderID"]
trade = TradeData(
symbol=d["symbol"],
exchange=Exchange.BITMEX,
orderid=orderid,
tradeid=tradeid,
direction=DIRECTION_BITMEX2VT[d["side"]],
price=d["lastPx"],
volume=d["lastQty"],
time=d["timestamp"][0:10].replace("-",
""),
gateway_name=self.gateway_name
)
self.gateway.on_trade(trade)
def on_order(self, d):
""""""
if "ordStatus" not in d:
return
sysid = d["orderID"]
order = self.orders.get(sysid, None)
if not order:
if d["clOrdID"]:
orderid = d["clOrdID"]
else:
orderid = sysid
time = d["timestamp"][0:10].replace("-", ""),
order = OrderData(
symbol=d["symbol"],
exchange=Exchange.BITMEX,
orderid=orderid,
direction=DIRECTION_BITMEX2VT[d["side"]],
price=d["price"],
volume=d["orderQty"],
time=time,
gateway_name=self.gateway_name
)
self.orders[sysid] = order
order.traded = d.get("cumQty", order.tradedVolume)
order.status = STATUS_BITMEX2VT.get(d["ordStatus"], STATUS_UNKNOWN)
self.gateway.on_order(copy(order))
def on_position(self, d):
""""""
position = PositionData(
symbol=d["symbol"],
exchange=EXCHANGE_BITMEX,
direction=DIRECTION_NET,
position=d["currentQty"],
gateway_name=self.gateway_name
)
self.gateway.on_position(position)
def on_account(self, d):
""""""
accountid = str(d["account"])
account = self.accounts.get(accountid, None)
if not account:
account = AccountData(
accountid=accountid,
gateway_name=self.gateway_name
)
self.accounts[accountid] = account
account.balance = d.get("marginBalance", account.balance)
account.available = d.get("availableMargin", account.available)
account.frozen = account.balance - account.available
self.gateway.on_account(copy(account))
def on_contract(self, d):
""""""
if "tickSize" not in d:
return
if not d["lotSize"]:
return
contract = ContractData(
symbol=d["symbol"],
exchange=Exchange.BITMEX,
name=d["symbol"],
product=Product.FUTURES,
pricetick=d["tickSize"],
size=d["lotSize"],
gateway_name=self.gateway_name
)
self.gateway.on_contract(contract)

View File

@ -261,8 +261,9 @@ class FutuGateway(BaseGateway):
for ix, row in data.iterrows():
orderid = str(row["order_id"])
vt_orderid = f"{self.gateway_name}.{orderid}"
return vt_orderid
order = req.create_order_data(orderid, self.gateway_name)
self.gateway.on_order(order)
return order.vt_orderid
def cancel_order(self, req):
""""""

View File

@ -581,8 +581,9 @@ class IbApi(EWrapper):
self.client.placeOrder(self.orderid, ib_contract, ib_order)
self.client.reqIds(1)
vt_orderid = f"{self.gateway_name}.{self.orderid}"
return vt_orderid
order = req.create_order_data(str(self.orderid), self.gateway_name)
self.gateway.on_order(order)
return order.vt_orderid
def cancel_order(self, req: CancelRequest):
"""

View File

@ -18,6 +18,7 @@ class Offset(Enum):
"""
Offset of order/trade.
"""
NONE = ""
OPEN = ""
CLOSE = ""
CLOSETODAY = "平今"
@ -73,6 +74,7 @@ class Exchange(Enum):
"""
Exchange.
"""
# Chinese
CFFEX = "CFFEX"
SHFE = "SHFE"
CZCE = "CZCE"
@ -81,6 +83,8 @@ class Exchange(Enum):
SSE = "SSE"
SZSE = "SZSE"
SGE = "SGE"
# Global
SMART = "SMART"
NYMEX = "NYMEX"
GLOBEX = "GLOBEX"
@ -90,6 +94,9 @@ class Exchange(Enum):
SEHK = "SEHK"
HKFE = "HKFE"
# CryptoCurrency
BITMEX = "BITMEX"
class Currency(Enum):
"""

View File

@ -105,7 +105,7 @@ class OrderData(BaseData):
orderid: str
direction: Direction = ""
offset: Offset = None
offset: Offset = Offset.NONE
price: float = 0
volume: float = 0
traded: float = 0
@ -150,7 +150,7 @@ class TradeData(BaseData):
tradeid: str
direction: Direction = ""
offset: Offset = None
offset: Offset = Offset.NONE
price: float = 0
volume: float = 0
time: str = ""
@ -258,12 +258,28 @@ class OrderRequest:
volume: float
exchange: Exchange
price: float = 0
offset: Offset = ''
offset: Offset = Offset.NONE
def __post_init__(self):
""""""
self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
def create_order_data(self, orderid: str, gateway_name: str):
"""
Create order data from request.
"""
order = OrderData(
symbol=self.symbol,
exchange=self.exchange,
orderid=orderid,
direction=self.direction,
offset=self.offset,
price=self.price,
volume=self.volume,
gateway_name=gateway_name
)
return order
@dataclass
class CancelRequest:

View File

@ -739,11 +739,7 @@ class TradingWidget(QtWidgets.QWidget):
"""
signal_tick = QtCore.pyqtSignal(Event)
exchange_map = {exchange.value:exchange for exchange in Exchange}
direction_map = {direction.value:direction for direction in Direction}
offset_map = {offset.value:offset for offset in Offset}
price_type_map = {price_type.value:price_type for price_type in PriceType}
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(TradingWidget, self).__init__()
@ -949,7 +945,7 @@ class TradingWidget(QtWidgets.QWidget):
# Subscribe tick data
req = SubscribeRequest(
symbol=symbol,
exchange=self.exchange_map[exchange_value]
exchange=Exchange(exchange_value)
)
self.main_engine.subscribe(req, gateway_name)
@ -1012,19 +1008,14 @@ class TradingWidget(QtWidgets.QWidget):
else:
price = float(price_text)
exchange = self.exchange_map[str(self.exchange_combo.currentText())]
direction = self.direction_map[str(self.direction_combo.currentText())]
price_type = self.price_type_map[str(self.price_type_combo.currentText())]
offset = self.offset_map[str(self.offset_combo.currentText())]
req = OrderRequest(
symbol=symbol,
exchange=exchange,
direction=direction,
price_type=price_type,
exchange=Exchange(str(self.exchange_combo.currentText())),
direction=Direction(str(self.direction_combo.currentText())),
price_type=PriceType(str(self.price_type_combo.currentText())),
volume=volume,
price=price,
offset=offset
offset=Offset(str(self.offset_combo.currentText()))
)
gateway_name = str(self.gateway_combo.currentText())