[Mod] Reimplement BitstampGateway

This commit is contained in:
vn.py 2019-09-24 23:00:50 +08:00
parent 6b753f58a5
commit 04ad1cb981

View File

@ -8,11 +8,14 @@ import hmac
import sys
import time
import re
import uuid
from copy import copy
from datetime import datetime, timedelta
from urllib.parse import urlencode
from typing import Dict, Set
from vnpy.api.rest import Request, RestClient
from vnpy.api.websocket import WebsocketClient
from time import sleep
from vnpy.trader.constant import (
Direction,
@ -77,21 +80,9 @@ TIMEDELTA_MAP = {
Interval.DAILY: timedelta(days=1),
}
bitstamp_symbols = set()
symbol_name_map = {}
SYMBOL_BITSTAMP2VT = {
'bchusd': "BCH/USD", 'bcheur': "BCH/EUR",
'xrpusd': "XRP/USD", 'ltcusd': "LTC/USD",
'eurusd': "EUR/USD", 'etheur': "ETH/EUR",
'xrpeur': "XRP/EUR", 'btceur': "BTC/EUR",
'ltcbtc': "LTC/BTC", 'btcusd': "BTC/USD",
'ltceur': "LTC/EUR", 'ethusd': "ETH/USD",
'xrpbtc': "XRP/BTC", 'bchbtc': "BCH/BTC",
'ethbtc': "ETH/BTC",
}
class BitstampGateway(BaseGateway):
"""
@ -176,7 +167,7 @@ class BitstampRestApi(RestClient):
self.key = ""
self.secret = ""
self.username = ""
self.username = "qxfe9863"
self.order_count = 1_000_000
self.connect_time = 0
@ -214,23 +205,47 @@ class BitstampRestApi(RestClient):
"""
Sign Bitstamp request.
"""
# Sign
nonce = int(round(time.time() * 1000000))
message = f"{nonce}{self.username}{self.key}"
if request.method == "GET":
return request
timestamp = str(int(round(time.time() * 1000)))
nonce = str(uuid.uuid4())
content_type = "application/x-www-form-urlencoded"
# Empty post data leads to API0020 error,
# so use this offset dict instead.
if not request.data:
request.data = {"offset": "1"}
payload_str = urlencode(request.data)
message = "BITSTAMP " + self.key + \
request.method + \
"www.bitstamp.net/api/v2" + \
request.path + \
"" + \
content_type + \
nonce + \
timestamp + \
"v2" + \
payload_str
message = message.encode("utf-8")
signature = hmac.new(
self.secret,
msg=message.encode('utf-8'),
msg=message,
digestmod=hashlib.sha256
).hexdigest().upper()
if request.method == "POST":
if request.data is None:
request.data = {}
request.data["key"] = self.key
request.data["nonce"] = nonce
request.data["signature"] = signature
request.headers = {
"X-Auth": "BITSTAMP " + self.key,
"X-Auth-Signature": signature,
"X-Auth-Nonce": nonce,
"X-Auth-Timestamp": timestamp,
"X-Auth-Version": "v2",
"Content-Type": content_type
}
print(payload_str)
request.data = payload_str
return request
@ -280,20 +295,18 @@ class BitstampRestApi(RestClient):
def on_query_account(self, data, request):
""""""
for d in data:
if "balance" in d:
currency = d.replace("_balance", "")
account = AccountData(
accountid=currency,
balance=float(data[currency + "_balance"]),
frozen=float(data[currency + "_reserved"]),
# available=float(data[currency + "_available"]),
gateway_name=self.gateway_name
)
for key in data.keys():
if "balance" not in key:
continue
currency = key.replace("_balance", "")
self.gateway.on_account(account)
self.on_position(data)
account = AccountData(
accountid=currency,
balance=float(data[currency + "_balance"]),
frozen=float(data[currency + "_reserved"]),
gateway_name=self.gateway_name
)
self.gateway.on_account(account)
def query_contract(self):
""""""
@ -306,8 +319,8 @@ class BitstampRestApi(RestClient):
def on_query_contract(self, data, request):
""""""
for d in data:
pricetick = 1 / pow(10, d["base_decimals"])
min_volume = float(d["minimum_order"])
pricetick = 1 / pow(10, d["counter_decimals"])
min_volume = 1 / pow(10, d["base_decimals"])
contract = ContractData(
symbol=d["url_symbol"],
@ -322,6 +335,8 @@ class BitstampRestApi(RestClient):
)
self.gateway.on_contract(contract)
symbol_name_map[contract.symbol] = contract.name
self.gateway.write_log("合约信息查询成功")
def cancel_order(self, req: CancelRequest):
@ -347,22 +362,21 @@ class BitstampRestApi(RestClient):
order = self.order_manager.get_order_with_local_orderid(local_orderid)
if "error" in data:
local_order.status = Status.REJECTED
order.status = Status.REJECTED
else:
local_order.status = Status.CANCELLED
order.status = Status.CANCELLED
self.gateway.write_log(f"委托撤单成功:{order.orderid})
self.gateway.write_log(f"委托撤单成功:{order.orderid}")
self.order_manager.on_order(order)
def on_cancel_order_error(self, data, request):
print(f"cancel_order {data}")
""""""
error_msg = data["error"]
self.gateway.write_log(f"撤单请求出错,信息:{error_msg}")
def send_order(self, req: OrderRequest):
""""""
local_orderid = self.order_manager.new_local_orderid()
order = req.create_order_data(
local_orderid,
@ -398,6 +412,7 @@ class BitstampRestApi(RestClient):
def on_send_order(self, data, request):
""""""
print("on_send", data)
order = request.extra
if ["reason"] in data:
@ -422,52 +437,13 @@ class BitstampRestApi(RestClient):
"""
Callback to handle request failed.
"""
print(f"on_failed {request}")
data = request.response.json()
reason = data["reason"]
code = data["code"]
reason = request.response.json()["reason"]
code = request.response.json()["code"]
msg = f"{request.path} 请求失败,状态码:{status_code},信息: {reason} code: {code}"
self.gateway.write_log(msg)
# print(f"reason {reason} code: {code}")
path = request.path
if code in ["API0004"]:
# nonce 错误重新执行此请求
if path == "user_transactions/":
self.user_transactions()
self.gateway.write_log("重新获取 Transactions 数据")
elif path == "query_order/all/":
self.query_order()
self.gateway.write_log("重新获取委托数据")
elif path == "cancel_order/":
self.cancel_order(request.extra)
self.gateway.write_log(f"重新提交{request.extra.orderid}撤单请求")
elif path == "balance/":
self.query_account()
self.gateway.write_log(f"重新获取balance撤单请求")
elif ("sell" in path) or ("buy" in path):
order_data = request.extra
# update order status
order_data.status = Status.REJECTED
self.order_manager.on_order(order_data)
req = OrderRequest(
symbol=order_data.symbol,
exchange=order_data.exchange,
direction=order_data.direction,
type=order_data.type,
volume=order_data.volume,
price=order_data.price,
offset=order_data.offset
)
self.send_order(req)
self.gateway.write_log("重新提交委托请求")
def on_error(
self, exception_type: type, exception_value: Exception, tb, request: Request
):
@ -493,128 +469,126 @@ class BitstampWebsocketApi(WebsocketClient):
self.gateway_name = gateway.gateway_name
self.order_manager = gateway.order_manager
self.ticks = {}
self.subscribed: Dict[str, SubscribeRequest] = {}
self.ticks: Dict[str, TickData] = {}
def connect(self, proxy_host: str, proxy_port: int):
""""""
self.init(WEBSOCKET_HOST, proxy_host, proxy_port)
self.start()
def post_connected(self, req: SubscribeRequest):
"""
测试默认连接btcusd
:return:
"""
sleep(1)
d = {
"event": "bts:subscribe",
"data": {
# "channel": "diff_order_book_" + req.symbol # "live_trades_btcusd"
"channel": "order_book_" + req.symbol # "live_trades_btcusd"
}
}
self.send_packet(d)
sleep(1)
d = {
"event": "bts:subscribe",
"data": {
"channel": "live_trades_" + req.symbol # "live_trades_btcusd"
}
}
self.send_packet(d)
def on_connected(self):
""""""
self.gateway.write_log("行情Websocket API连接刷新")
# 测试默认提交
# self.post_connected()
# self.subscribe(req)
self.gateway.write_log("Websocket API连接成功")
# Auto re-subscribe market data after reconnected
for req in self.subscribed.values():
self.subscribe(req)
def subscribe(self, req: SubscribeRequest):
""""""
# print(f"webocket subscribe req {req}")
# Create tick buf data
self.subscribed[req.symbol] = req
if not self._active:
return
tick = TickData(
symbol=req.symbol,
# name=symbol_name_map.get(req.symbol, ""),
name=req.symbol,
name=symbol_name_map.get(req.symbol, ""),
exchange=Exchange.BITSTAMP,
datetime=datetime.now(),
gateway_name=self.gateway_name,
)
self.ticks[req.symbol.lower()] = tick
# 默认使用btcusd 连接
self.post_connected(req)
for prefix in [
"order_book_",
"live_trades_",
"live_orders_"
]:
channel = f"{prefix}{req.symbol}"
d = {
"event": "bts:subscribe",
"data": {
"channel": channel
}
}
self.ticks[channel] = tick
self.send_packet(d)
def on_packet(self, packet):
""""""
# print(f"on_packet {packet}")
if "bts:request_reconnect" == packet["event"]:
# 重新连接
self.post_connected()
elif "data" == packet["event"]:
return self.on_market_depth(packet)
else:
self.on_data(packet)
event = packet["event"]
def on_data(self, packet):
if event == "trade":
self.on_market_trade(packet)
elif event == "data":
self.on_market_depth(packet)
elif "order_" in event:
self.on_market_order(packet)
elif event == "bts:request_reconnect":
self._disconnect() # Server requires to reconnect
def on_market_trade(self, packet):
""""""
if packet["event"] == "trade":
self.on_trade_update(packet)
# print("data : {}".format(packet))
def on_trade_update(self, packet):
channel = packet["channel"]
data = packet["data"]
symbol = str(re.sub("live_.*_", "", channel))
tick = self.ticks[symbol]
tick.last_price = float(data["price"])
def on_market_depth(self, packet):
"""行情深度推送 """
channel = packet["channel"]
data = packet["data"]
# symbol = str(re.sub("live_.*_","", channel)) #live order channel
symbol = str(re.sub("order_book_", "", channel))
# print(f"market_detph {data}")
tick = self.ticks[symbol]
tick.datetime = datetime.fromtimestamp(int(data['timestamp']) / 1000)
if symbol in self.ticks:
tick = self.ticks[symbol]
else:
tick = TickData(
symbol=symbol,
exchange=Exchange.BITFINEX,
name=symbol,
datetime=datetime.now(),
gateway_name=self.gateway_name,
)
self.ticks[symbol] = tick
if len(data) == 0:
print("请求数据为空")
return
bids = data["bids"]
for n in range(5):
# for n in range(len(bids)):
price, volume = bids[n]
tick.__setattr__("bid_price_" + str(n + 1), float(price))
tick.__setattr__("bid_volume_" + str(n + 1), float(volume))
asks = data["asks"]
# print(f"bids count {len(bids)} , asks count {len(asks)}")
for n in range(5):
# for n in range(len(asks)):
price, volume = asks[n]
tick.__setattr__("ask_price_" + str(n + 1), float(price))
tick.__setattr__("ask_volume_" + str(n + 1), float(volume))
tick = self.ticks[channel]
tick.last_price = data["price"]
tick.last_volume = data["amount"]
tick.datetime = datetime.fromtimestamp(int(data["timestamp"]))
self.gateway.on_tick(copy(tick))
buy_orderid = data["buy_order_id"]
sell_orderid = data["sell_order_id"]
for sys_orderid in [buy_orderid, sell_orderid]:
order = self.order_manager.get_order_with_sys_orderid(sys_orderid)
if order:
order.traded += data["amount"]
if order.traded < order.volume:
order.status = Status.PARTTRADED
else:
order.status = Status.ALLTRADED
self.order_manager.on_order(copy(order))
def on_market_depth(self, packet):
""""""
channel = packet["channel"]
data = packet["data"]
tick = self.ticks[channel]
tick.datetime = datetime.fromtimestamp(int(data["timestamp"]))
bids = data["bids"]
asks = data["asks"]
for n in range(5):
ix = n + 1
bid_price, bid_volume = bids[n]
tick.__setattr__(f"bid_price_{ix}", float(bid_price))
tick.__setattr__(f"bid_volume_{ix}", float(bid_volume))
ask_price, ask_volume = asks[n]
tick.__setattr__(f"ask_price_{ix}", float(ask_price))
tick.__setattr__(f"ask_volume_{ix}", float(ask_volume))
self.gateway.on_tick(copy(tick))
def on_market_order(self, packet):
""""""
event = packet["event"]
data = packet["data"]
if event != "order_deleted":
return
sys_orderid = data["id"]
order = self.order_manager.get_order_with_sys_orderid(sys_orderid)
if order:
order.status = Status.CANCELLED
self.order_manager.on_order(copy(order))