Merge pull request #2114 from vnpy/dev-bitstamp-gateway

Dev bitstamp gateway
This commit is contained in:
vn.py 2019-09-25 23:20:32 +08:00 committed by GitHub
commit cdbbb7867e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 693 additions and 6 deletions

View File

@ -14,7 +14,7 @@
vn.py是一套基于Python的开源量化交易系统开发框架于2015年1月正式发布在开源社区5年持续不断的贡献下一步步成长为全功能量化交易平台目前国内外金融机构用户已经超过300家包括私募基金、证券自营和资管、期货资管和子公司、高校研究机构、自营交易公司、交易所、Token Fund等。
**傻瓜式入门教程**已经在官方微信公众号[**vnpy-community**]全新上线,新手使用过程中有任何疑问看这个解决是最快的,后续会不断增加进阶经验、发布公告、活动报名等功能,请扫描下方二维码关注
全新的《vn.py全实战进阶》在线课程已经在官方微信公众号[**vnpy-community**]上线50节内容覆盖从策略设计开发、参数回测优化到最终实盘自动交易的完整CTA量化业务流程。购买请扫描下方二维码关注后点击菜单栏的【进阶课程】按钮即可
<p align="center">
<img src ="https://vnpy.oss-cn-shanghai.aliyuncs.com/vnpy_qr.jpg"/>

View File

@ -29,6 +29,7 @@ from vnpy.gateway.okexs import OkexsGateway
# from vnpy.gateway.alpaca import AlpacaGateway
from vnpy.gateway.da import DaGateway
from vnpy.gateway.coinbase import CoinbaseGateway
from vnpy.gateway.bitstamp import BitstampGateway
from vnpy.app.cta_strategy import CtaStrategyApp
# from vnpy.app.csv_loader import CsvLoaderApp
@ -73,7 +74,8 @@ def main():
# main_engine.add_gateway(AlpacaGateway)
# main_engine.add_gateway(OkexsGateway)
# main_engine.add_gateway(DaGateway)
main_engine.add_gateway(CoinbaseGateway)
# main_engine.add_gateway(CoinbaseGateway)
main_engine.add_gateway(BitstampGateway)
main_engine.add_app(CtaStrategyApp)
main_engine.add_app(CtaBacktesterApp)

View File

@ -0,0 +1 @@
from .bitstamp_gateway import BitstampGateway

View File

@ -0,0 +1,675 @@
"""
Author: Wudi
bitstamp合约接口
"""
import hashlib
import hmac
import sys
import time
import uuid
from copy import copy
from datetime import datetime, timedelta
from urllib.parse import urlencode
from typing import Dict
import requests
from vnpy.api.rest import Request, RestClient, RequestStatus
from vnpy.api.websocket import WebsocketClient
from vnpy.trader.constant import (
Direction,
Exchange,
OrderType,
Product,
Status,
Interval
)
from vnpy.trader.gateway import BaseGateway, LocalOrderManager
from vnpy.trader.object import (
TickData,
OrderData,
TradeData,
BarData,
AccountData,
ContractData,
OrderRequest,
CancelRequest,
SubscribeRequest,
HistoryRequest
)
from vnpy.trader.event import EVENT_TIMER
REST_HOST = "https://www.bitstamp.net/api/v2"
WEBSOCKET_HOST = "wss://ws.bitstamp.net"
STATUS_BITSTAMP2VT = {
"ACTIVE": Status.NOTTRADED,
"PARTIALLY FILLED": Status.PARTTRADED,
"EXECUTED": Status.ALLTRADED,
"CANCELED": Status.CANCELLED,
}
ORDERTYPE_VT2BITSTAMP = {
OrderType.LIMIT: "EXCHANGE LIMIT",
OrderType.MARKET: "EXCHANGE MARKET",
}
DIRECTION_VT2BITSTAMP = {
Direction.LONG: "Buy",
Direction.SHORT: "Sell",
}
DIRECTION_BITSTAMP2VT = {
"0": Direction.LONG,
"1": Direction.SHORT,
}
INTERVAL_VT2BITSTAMP = {
Interval.MINUTE: "60",
Interval.HOUR: "3600",
Interval.DAILY: "86400",
}
TIMEDELTA_MAP = {
Interval.MINUTE: timedelta(minutes=1),
Interval.HOUR: timedelta(hours=1),
Interval.DAILY: timedelta(days=1),
}
symbol_name_map = {}
name_symbol_map = {}
class BitstampGateway(BaseGateway):
"""
VN Trader Gateway for BITSTAMP connection.
"""
default_setting = {
"key": "",
"secret": "",
"username": "",
"session": 3,
"proxy_host": "127.0.0.1",
"proxy_port": 1080,
}
exchanges = [Exchange.BITSTAMP]
def __init__(self, event_engine):
"""Constructor"""
super().__init__(event_engine, "BITSTAMP")
self.order_manager = LocalOrderManager(self)
self.rest_api = BitstampRestApi(self)
self.ws_api = BitstampWebsocketApi(self)
def connect(self, setting: dict):
""""""
key = setting["key"]
secret = setting["secret"]
username = setting["username"]
session = setting["session"]
proxy_host = setting["proxy_host"]
proxy_port = setting["proxy_port"]
self.rest_api.connect(key, secret, username,
session, proxy_host, proxy_port)
self.ws_api.connect(proxy_host, proxy_port)
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def subscribe(self, req: SubscribeRequest):
""""""
self.ws_api.subscribe(req)
def send_order(self, req: OrderRequest):
""""""
return self.rest_api.send_order(req)
def cancel_order(self, req: CancelRequest):
""""""
self.rest_api.cancel_order(req)
def query_account(self):
""""""
pass
def query_position(self):
""""""
pass
def query_history(self, req: HistoryRequest):
""""""
pass
def close(self):
""""""
self.rest_api.stop()
self.ws_api.stop()
def process_timer_event(self, event):
""""""
self.rest_api.query_account()
class BitstampRestApi(RestClient):
"""
Bitstamp REST API
"""
def __init__(self, gateway: BaseGateway):
""""""
super(BitstampRestApi, self).__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.order_manager = gateway.order_manager
self.key = ""
self.secret = ""
self.username = "qxfe9863"
self.order_count = 1_000_000
self.connect_time = 0
def connect(
self,
key: str,
secret: str,
username: str,
session: int,
proxy_host: str,
proxy_port: int,
):
"""
Initialize connection to REST server.
"""
self.key = key
self.secret = secret.encode()
self.username = username
self.connect_time = (
int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count
)
self.init(REST_HOST, proxy_host, proxy_port)
self.start(session)
self.gateway.write_log("REST API启动成功")
self.query_contract()
self.query_account()
def sign(self, request: Request):
"""
Sign Bitstamp request.
"""
if request.method == "GET":
return request
timestamp = str(int(round(time.time() * 1000)))
nonce = str(uuid.uuid4())
content_type = "application/x-www-form-urlencoded"
# Empty post data leads to API0020 error,
# so use this offset dict instead.
if not request.data:
request.data = {"offset": "1"}
payload_str = urlencode(request.data)
message = "BITSTAMP " + self.key + \
request.method + \
"www.bitstamp.net/api/v2" + \
request.path + \
"" + \
content_type + \
nonce + \
timestamp + \
"v2" + \
payload_str
message = message.encode("utf-8")
signature = hmac.new(
self.secret,
msg=message,
digestmod=hashlib.sha256
).hexdigest().upper()
request.headers = {
"X-Auth": "BITSTAMP " + self.key,
"X-Auth-Signature": signature,
"X-Auth-Nonce": nonce,
"X-Auth-Timestamp": timestamp,
"X-Auth-Version": "v2",
"Content-Type": content_type
}
request.data = payload_str
return request
def _process_request(
self, request: Request
):
"""
Bistamp API server does not support keep-alive connection.
So when using session.request will cause header related error.
Reimplement this method to use requests.request instead.
"""
try:
request = self.sign(request)
url = self.make_full_url(request.path)
response = requests.request(
request.method,
url,
headers=request.headers,
params=request.params,
data=request.data,
proxies=self.proxies,
)
request.response = response
status_code = response.status_code
if status_code // 100 == 2: # 2xx codes are all successful
if status_code == 204:
json_body = None
else:
json_body = response.json()
request.callback(json_body, request)
request.status = RequestStatus.success
else:
request.status = RequestStatus.failed
if request.on_failed:
request.on_failed(status_code, request)
else:
self.on_failed(status_code, request)
except Exception:
request.status = RequestStatus.error
t, v, tb = sys.exc_info()
if request.on_error:
request.on_error(t, v, tb, request)
else:
self.on_error(t, v, tb, request)
def query_order(self):
""""""
path = "/open_orders/all/"
self.add_request(
method="POST",
path=path,
callback=self.on_query_order
)
def on_query_order(self, data, request):
"""获取委托订单"""
for d in data:
sys_orderid = d["id"]
local_orderid = self.order_manager.get_local_orderid(sys_orderid)
direction = DIRECTION_BITSTAMP2VT[d["type"]]
name = d["currency_pair"]
symbol = name_symbol_map[name]
order = OrderData(
orderid=local_orderid,
symbol=symbol,
exchange=Exchange.BITSTAMP,
price=float(d["price"]),
volume=float(d["amount"]),
traded=float(0),
direction=direction,
status=Status.NOTTRADED,
time=d["datetime"],
gateway_name=self.gateway_name,
)
self.order_manager.on_order(order)
self.gateway.write_log("委托信息查询成功")
def query_account(self):
""""""
path = "/balance/"
self.add_request(
method="POST",
path=path,
callback=self.on_query_account
)
def on_query_account(self, data, request):
""""""
for key in data.keys():
if "balance" not in key:
continue
currency = key.replace("_balance", "")
account = AccountData(
accountid=currency,
balance=float(data[currency + "_balance"]),
frozen=float(data[currency + "_reserved"]),
gateway_name=self.gateway_name
)
self.gateway.on_account(account)
def query_contract(self):
""""""
self.add_request(
method="GET",
path="/trading-pairs-info/",
callback=self.on_query_contract,
)
def on_query_contract(self, data, request):
""""""
for d in data:
pricetick = 1 / pow(10, d["counter_decimals"])
min_volume = 1 / pow(10, d["base_decimals"])
contract = ContractData(
symbol=d["url_symbol"],
exchange=Exchange.BITSTAMP,
name=d["name"],
product=Product.SPOT,
size=1,
pricetick=pricetick,
min_volume=min_volume,
history_data=False,
gateway_name=self.gateway_name,
)
self.gateway.on_contract(contract)
symbol_name_map[contract.symbol] = contract.name
name_symbol_map[contract.name] = contract.symbol
self.gateway.write_log("合约信息查询成功")
self.query_order()
def cancel_order(self, req: CancelRequest):
""""""
path = "/cancel_order/"
sys_orderid = self.order_manager.get_sys_orderid(req.orderid)
data = {"id": sys_orderid}
self.add_request(
method="POST",
path=path,
data=data,
callback=self.on_cancel_order,
extra=req
)
def on_cancel_order(self, data, request):
""""""
error = data.get("error", "")
if error:
self.gateway.write_log(error)
return
cancel_request = request.extra
local_orderid = cancel_request.orderid
order = self.order_manager.get_order_with_local_orderid(local_orderid)
if order.is_active:
order.status = Status.CANCELLED
self.order_manager.on_order(order)
self.gateway.write_log(f"撤单成功:{order.orderid}")
def on_cancel_order_error(self, data, request):
""""""
error_msg = data["error"]
self.gateway.write_log(f"撤单请求出错,信息:{error_msg}")
def send_order(self, req: OrderRequest):
""""""
local_orderid = self.order_manager.new_local_orderid()
order = req.create_order_data(
local_orderid,
self.gateway_name
)
order.time = datetime.now().strftime("%H:%M:%S")
data = {
"amount": req.volume,
"price": req.price
}
if req.direction == Direction.LONG:
if req.type == OrderType.LIMIT:
path = f"/buy/{req.symbol}/"
elif req.type == OrderType.MARKET:
path = f"/buy/market/{req.symbol}/"
else:
if req.type == OrderType.LIMIT:
path = f"/sell/{req.symbol}/"
elif req.type == OrderType.MARKET:
path = f"/sell/market/{req.symbol}/"
self.add_request(
method="POST",
path=path,
data=data,
callback=self.on_send_order,
extra=order,
)
self.order_manager.on_order(order)
return order.vt_orderid
def on_send_order(self, data, request):
""""""
order = request.extra
status = data.get("status", None)
if status and status == "error":
order.status = Status.REJECTED
self.order_manager.on_order(order)
msg = data["reason"]["__all__"][0]
self.gateway.write_log(msg)
return
sys_orderid = data["id"]
self.order_manager.update_orderid_map(order.orderid, sys_orderid)
order.status = Status.NOTTRADED
self.order_manager.on_order(order)
def on_send_order_error(
self, exception_type: type, exception_value: Exception, tb, request: Request
):
"""
Callback when sending order caused exception.
"""
# Record exception if not ConnectionError
if not issubclass(exception_type, ConnectionError):
self.on_error(exception_type, exception_value, tb, request)
def on_failed(self, status_code: int, request: Request):
"""
Callback to handle request failed.
"""
data = request.response.json()
reason = data["reason"]
code = data["code"]
msg = f"{request.path} 请求失败,状态码:{status_code},错误信息:{reason},错误代码: {code}"
self.gateway.write_log(msg)
def on_error(
self, exception_type: type, exception_value: Exception, tb, request: Request
):
"""
Callback to handler request exception.
"""
msg = f"触发异常,状态码:{exception_type},信息:{exception_value}"
self.gateway.write_log(msg)
sys.stderr.write(
self.exception_detail(exception_type, exception_value, tb, request)
)
class BitstampWebsocketApi(WebsocketClient):
""""""
def __init__(self, gateway):
""""""
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.order_manager = gateway.order_manager
self.subscribed: Dict[str, SubscribeRequest] = {}
self.ticks: Dict[str, TickData] = {}
def connect(self, proxy_host: str, proxy_port: int):
""""""
self.init(WEBSOCKET_HOST, proxy_host, proxy_port)
self.start()
def on_connected(self):
""""""
self.gateway.write_log("Websocket API连接成功")
# Auto re-subscribe market data after reconnected
for req in self.subscribed.values():
self.subscribe(req)
def subscribe(self, req: SubscribeRequest):
""""""
self.subscribed[req.symbol] = req
if not self._active:
return
tick = TickData(
symbol=req.symbol,
name=symbol_name_map.get(req.symbol, ""),
exchange=Exchange.BITSTAMP,
datetime=datetime.now(),
gateway_name=self.gateway_name,
)
for prefix in [
"order_book_",
"live_trades_",
"live_orders_"
]:
channel = f"{prefix}{req.symbol}"
d = {
"event": "bts:subscribe",
"data": {
"channel": channel
}
}
self.ticks[channel] = tick
self.send_packet(d)
def on_packet(self, packet):
""""""
event = packet["event"]
if event == "trade":
self.on_market_trade(packet)
elif event == "data":
self.on_market_depth(packet)
elif "order_" in event:
self.on_market_order(packet)
elif event == "bts:request_reconnect":
self._disconnect() # Server requires to reconnect
def on_market_trade(self, packet):
""""""
channel = packet["channel"]
data = packet["data"]
tick = self.ticks[channel]
tick.last_price = data["price"]
tick.last_volume = data["amount"]
tick.datetime = datetime.fromtimestamp(int(data["timestamp"]))
self.gateway.on_tick(copy(tick))
# Order status check
buy_orderid = str(data["buy_order_id"])
sell_orderid = str(data["sell_order_id"])
for sys_orderid in [buy_orderid, sell_orderid]:
order = self.order_manager.get_order_with_sys_orderid(
sys_orderid)
if order:
order.traded += data["amount"]
if order.traded < order.volume:
order.status = Status.PARTTRADED
else:
order.status = Status.ALLTRADED
self.order_manager.on_order(copy(order))
trade = TradeData(
symbol=order.symbol,
exchange=order.exchange,
orderid=order.orderid,
tradeid=data["id"],
direction=order.direction,
price=data["price"],
volume=data["amount"],
time=tick.datetime.strftime("%H:%M:%S"),
gateway_name=self.gateway_name
)
self.gateway.on_trade(trade)
def on_market_depth(self, packet):
""""""
channel = packet["channel"]
data = packet["data"]
tick = self.ticks[channel]
tick.datetime = datetime.fromtimestamp(int(data["timestamp"]))
bids = data["bids"]
asks = data["asks"]
for n in range(5):
ix = n + 1
bid_price, bid_volume = bids[n]
tick.__setattr__(f"bid_price_{ix}", float(bid_price))
tick.__setattr__(f"bid_volume_{ix}", float(bid_volume))
ask_price, ask_volume = asks[n]
tick.__setattr__(f"ask_price_{ix}", float(ask_price))
tick.__setattr__(f"ask_volume_{ix}", float(ask_volume))
self.gateway.on_tick(copy(tick))
def on_market_order(self, packet):
""""""
event = packet["event"]
data = packet["data"]
if event != "order_deleted":
return
sys_orderid = str(data["id"])
order = self.order_manager.get_order_with_sys_orderid(sys_orderid)
if order and order.is_active():
order.status = Status.CANCELLED
self.order_manager.on_order(copy(order))

View File

@ -69,11 +69,14 @@ EXCHANGE_VT2IB = {
EXCHANGE_IB2VT = {v: k for k, v in EXCHANGE_VT2IB.items()}
STATUS_IB2VT = {
"Submitted": Status.NOTTRADED,
"Filled": Status.ALLTRADED,
"Cancelled": Status.CANCELLED,
"ApiPending": Status.SUBMITTING,
"PendingSubmit": Status.SUBMITTING,
"PreSubmitted": Status.NOTTRADED,
"Submitted": Status.NOTTRADED,
"ApiCancelled": Status.CANCELLED,
"Cancelled": Status.CANCELLED,
"Filled": Status.ALLTRADED,
"Inactive": Status.REJECTED,
}
PRODUCT_VT2IB = {
@ -358,9 +361,13 @@ class IbApi(EWrapper):
orderid = str(orderId)
order = self.orders.get(orderid, None)
order.status = STATUS_IB2VT[status]
order.traded = filled
# To filter PendingCancel status
order_status = STATUS_IB2VT.get(status, None)
if order_status:
order.status = order_status
self.gateway.on_order(copy(order))
def openOrder( # pylint: disable=invalid-name

View File

@ -121,6 +121,8 @@ class Exchange(Enum):
BINANCE = "BINANCE"
BYBIT = "BYBIT" # bybit.com
COINBASE = "COINBASE"
BITSTAMP = "BITSTAMP"
# Special Function
LOCAL = "LOCAL" # For local generated data