diff --git a/vnpy/gateway/bitstamp/bitstamp_gateway.py b/vnpy/gateway/bitstamp/bitstamp_gateway.py index 6fcd94ca..8e71adb8 100644 --- a/vnpy/gateway/bitstamp/bitstamp_gateway.py +++ b/vnpy/gateway/bitstamp/bitstamp_gateway.py @@ -8,11 +8,14 @@ import hmac import sys import time import re +import uuid from copy import copy from datetime import datetime, timedelta +from urllib.parse import urlencode +from typing import Dict, Set + from vnpy.api.rest import Request, RestClient from vnpy.api.websocket import WebsocketClient -from time import sleep from vnpy.trader.constant import ( Direction, @@ -77,21 +80,9 @@ TIMEDELTA_MAP = { Interval.DAILY: timedelta(days=1), } -bitstamp_symbols = set() symbol_name_map = {} -SYMBOL_BITSTAMP2VT = { - 'bchusd': "BCH/USD", 'bcheur': "BCH/EUR", - 'xrpusd': "XRP/USD", 'ltcusd': "LTC/USD", - 'eurusd': "EUR/USD", 'etheur': "ETH/EUR", - 'xrpeur': "XRP/EUR", 'btceur': "BTC/EUR", - 'ltcbtc': "LTC/BTC", 'btcusd': "BTC/USD", - 'ltceur': "LTC/EUR", 'ethusd': "ETH/USD", - 'xrpbtc': "XRP/BTC", 'bchbtc': "BCH/BTC", - 'ethbtc': "ETH/BTC", -} - class BitstampGateway(BaseGateway): """ @@ -176,7 +167,7 @@ class BitstampRestApi(RestClient): self.key = "" self.secret = "" - self.username = "" + self.username = "qxfe9863" self.order_count = 1_000_000 self.connect_time = 0 @@ -214,23 +205,47 @@ class BitstampRestApi(RestClient): """ Sign Bitstamp request. """ - # Sign - nonce = int(round(time.time() * 1000000)) - message = f"{nonce}{self.username}{self.key}" + if request.method == "GET": + return request + + timestamp = str(int(round(time.time() * 1000))) + nonce = str(uuid.uuid4()) + content_type = "application/x-www-form-urlencoded" + + # Empty post data leads to API0020 error, + # so use this offset dict instead. + if not request.data: + request.data = {"offset": "1"} + payload_str = urlencode(request.data) + + message = "BITSTAMP " + self.key + \ + request.method + \ + "www.bitstamp.net/api/v2" + \ + request.path + \ + "" + \ + content_type + \ + nonce + \ + timestamp + \ + "v2" + \ + payload_str + message = message.encode("utf-8") signature = hmac.new( self.secret, - msg=message.encode('utf-8'), + msg=message, digestmod=hashlib.sha256 ).hexdigest().upper() - if request.method == "POST": - if request.data is None: - request.data = {} - - request.data["key"] = self.key - request.data["nonce"] = nonce - request.data["signature"] = signature + request.headers = { + "X-Auth": "BITSTAMP " + self.key, + "X-Auth-Signature": signature, + "X-Auth-Nonce": nonce, + "X-Auth-Timestamp": timestamp, + "X-Auth-Version": "v2", + "Content-Type": content_type + } + print(payload_str) + request.data = payload_str return request @@ -280,20 +295,18 @@ class BitstampRestApi(RestClient): def on_query_account(self, data, request): """""" - for d in data: - if "balance" in d: - currency = d.replace("_balance", "") - account = AccountData( - accountid=currency, - balance=float(data[currency + "_balance"]), - frozen=float(data[currency + "_reserved"]), - # available=float(data[currency + "_available"]), - gateway_name=self.gateway_name - ) + for key in data.keys(): + if "balance" not in key: + continue + currency = key.replace("_balance", "") - self.gateway.on_account(account) - - self.on_position(data) + account = AccountData( + accountid=currency, + balance=float(data[currency + "_balance"]), + frozen=float(data[currency + "_reserved"]), + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) def query_contract(self): """""" @@ -306,8 +319,8 @@ class BitstampRestApi(RestClient): def on_query_contract(self, data, request): """""" for d in data: - pricetick = 1 / pow(10, d["base_decimals"]) - min_volume = float(d["minimum_order"]) + pricetick = 1 / pow(10, d["counter_decimals"]) + min_volume = 1 / pow(10, d["base_decimals"]) contract = ContractData( symbol=d["url_symbol"], @@ -322,6 +335,8 @@ class BitstampRestApi(RestClient): ) self.gateway.on_contract(contract) + symbol_name_map[contract.symbol] = contract.name + self.gateway.write_log("合约信息查询成功") def cancel_order(self, req: CancelRequest): @@ -347,22 +362,21 @@ class BitstampRestApi(RestClient): order = self.order_manager.get_order_with_local_orderid(local_orderid) if "error" in data: - local_order.status = Status.REJECTED + order.status = Status.REJECTED else: - local_order.status = Status.CANCELLED + order.status = Status.CANCELLED - self.gateway.write_log(f"委托撤单成功:{order.orderid}) + self.gateway.write_log(f"委托撤单成功:{order.orderid}") self.order_manager.on_order(order) def on_cancel_order_error(self, data, request): - print(f"cancel_order {data}") + """""" error_msg = data["error"] self.gateway.write_log(f"撤单请求出错,信息:{error_msg}") def send_order(self, req: OrderRequest): """""" - local_orderid = self.order_manager.new_local_orderid() order = req.create_order_data( local_orderid, @@ -398,6 +412,7 @@ class BitstampRestApi(RestClient): def on_send_order(self, data, request): """""" + print("on_send", data) order = request.extra if ["reason"] in data: @@ -422,52 +437,13 @@ class BitstampRestApi(RestClient): """ Callback to handle request failed. """ - print(f"on_failed {request}") + data = request.response.json() + reason = data["reason"] + code = data["code"] - reason = request.response.json()["reason"] - code = request.response.json()["code"] msg = f"{request.path} 请求失败,状态码:{status_code},信息: {reason} code: {code}" self.gateway.write_log(msg) - # print(f"reason: {reason} code: {code}") - path = request.path - if code in ["API0004"]: - # nonce 错误重新执行此请求 - if path == "user_transactions/": - self.user_transactions() - self.gateway.write_log("重新获取 Transactions 数据") - - elif path == "query_order/all/": - self.query_order() - self.gateway.write_log("重新获取委托数据") - - elif path == "cancel_order/": - self.cancel_order(request.extra) - self.gateway.write_log(f"重新提交{request.extra.orderid}撤单请求") - elif path == "balance/": - self.query_account() - self.gateway.write_log(f"重新获取balance撤单请求") - - elif ("sell" in path) or ("buy" in path): - order_data = request.extra - - # update order status - order_data.status = Status.REJECTED - self.order_manager.on_order(order_data) - - req = OrderRequest( - symbol=order_data.symbol, - exchange=order_data.exchange, - direction=order_data.direction, - type=order_data.type, - volume=order_data.volume, - price=order_data.price, - offset=order_data.offset - ) - - self.send_order(req) - self.gateway.write_log("重新提交委托请求") - def on_error( self, exception_type: type, exception_value: Exception, tb, request: Request ): @@ -493,128 +469,126 @@ class BitstampWebsocketApi(WebsocketClient): self.gateway_name = gateway.gateway_name self.order_manager = gateway.order_manager - self.ticks = {} + self.subscribed: Dict[str, SubscribeRequest] = {} + self.ticks: Dict[str, TickData] = {} def connect(self, proxy_host: str, proxy_port: int): """""" self.init(WEBSOCKET_HOST, proxy_host, proxy_port) self.start() - def post_connected(self, req: SubscribeRequest): - """ - 测试默认连接btcusd - :return: - """ - sleep(1) - d = { - "event": "bts:subscribe", - "data": { - # "channel": "diff_order_book_" + req.symbol # "live_trades_btcusd" - "channel": "order_book_" + req.symbol # "live_trades_btcusd" - } - } - self.send_packet(d) - sleep(1) - - d = { - "event": "bts:subscribe", - "data": { - "channel": "live_trades_" + req.symbol # "live_trades_btcusd" - } - } - self.send_packet(d) - def on_connected(self): """""" - self.gateway.write_log("行情Websocket API连接刷新") - # 测试默认提交 - # self.post_connected() - # self.subscribe(req) + self.gateway.write_log("Websocket API连接成功") + + # Auto re-subscribe market data after reconnected + for req in self.subscribed.values(): + self.subscribe(req) def subscribe(self, req: SubscribeRequest): """""" - # print(f"webocket subscribe req {req}") - # Create tick buf data + self.subscribed[req.symbol] = req + if not self._active: + return tick = TickData( symbol=req.symbol, - # name=symbol_name_map.get(req.symbol, ""), - name=req.symbol, + name=symbol_name_map.get(req.symbol, ""), exchange=Exchange.BITSTAMP, datetime=datetime.now(), gateway_name=self.gateway_name, ) - self.ticks[req.symbol.lower()] = tick - # 默认使用btcusd 连接 - self.post_connected(req) + for prefix in [ + "order_book_", + "live_trades_", + "live_orders_" + ]: + channel = f"{prefix}{req.symbol}" + d = { + "event": "bts:subscribe", + "data": { + "channel": channel + } + } + self.ticks[channel] = tick + self.send_packet(d) def on_packet(self, packet): """""" - # print(f"on_packet {packet}") - if "bts:request_reconnect" == packet["event"]: - # 重新连接 - self.post_connected() - elif "data" == packet["event"]: - return self.on_market_depth(packet) - else: - self.on_data(packet) + event = packet["event"] - def on_data(self, packet): + if event == "trade": + self.on_market_trade(packet) + elif event == "data": + self.on_market_depth(packet) + elif "order_" in event: + self.on_market_order(packet) + elif event == "bts:request_reconnect": + self._disconnect() # Server requires to reconnect + + def on_market_trade(self, packet): """""" - if packet["event"] == "trade": - self.on_trade_update(packet) - # print("data : {}".format(packet)) - - def on_trade_update(self, packet): channel = packet["channel"] data = packet["data"] - symbol = str(re.sub("live_.*_", "", channel)) - tick = self.ticks[symbol] - tick.last_price = float(data["price"]) - def on_market_depth(self, packet): - """行情深度推送 """ - - channel = packet["channel"] - data = packet["data"] - # symbol = str(re.sub("live_.*_","", channel)) #live order channel - symbol = str(re.sub("order_book_", "", channel)) - # print(f"market_detph {data}") - tick = self.ticks[symbol] - tick.datetime = datetime.fromtimestamp(int(data['timestamp']) / 1000) - - if symbol in self.ticks: - tick = self.ticks[symbol] - else: - tick = TickData( - symbol=symbol, - exchange=Exchange.BITFINEX, - name=symbol, - datetime=datetime.now(), - gateway_name=self.gateway_name, - ) - - self.ticks[symbol] = tick - - if len(data) == 0: - print("请求数据为空") - return - - bids = data["bids"] - - for n in range(5): - # for n in range(len(bids)): - price, volume = bids[n] - tick.__setattr__("bid_price_" + str(n + 1), float(price)) - tick.__setattr__("bid_volume_" + str(n + 1), float(volume)) - - asks = data["asks"] - # print(f"bids count {len(bids)} , asks count {len(asks)}") - for n in range(5): - # for n in range(len(asks)): - price, volume = asks[n] - tick.__setattr__("ask_price_" + str(n + 1), float(price)) - tick.__setattr__("ask_volume_" + str(n + 1), float(volume)) + tick = self.ticks[channel] + tick.last_price = data["price"] + tick.last_volume = data["amount"] + tick.datetime = datetime.fromtimestamp(int(data["timestamp"])) self.gateway.on_tick(copy(tick)) + + buy_orderid = data["buy_order_id"] + sell_orderid = data["sell_order_id"] + + for sys_orderid in [buy_orderid, sell_orderid]: + order = self.order_manager.get_order_with_sys_orderid(sys_orderid) + + if order: + order.traded += data["amount"] + + if order.traded < order.volume: + order.status = Status.PARTTRADED + else: + order.status = Status.ALLTRADED + + self.order_manager.on_order(copy(order)) + + def on_market_depth(self, packet): + """""" + channel = packet["channel"] + data = packet["data"] + + tick = self.ticks[channel] + tick.datetime = datetime.fromtimestamp(int(data["timestamp"])) + + bids = data["bids"] + asks = data["asks"] + + for n in range(5): + ix = n + 1 + + bid_price, bid_volume = bids[n] + tick.__setattr__(f"bid_price_{ix}", float(bid_price)) + tick.__setattr__(f"bid_volume_{ix}", float(bid_volume)) + + ask_price, ask_volume = asks[n] + tick.__setattr__(f"ask_price_{ix}", float(ask_price)) + tick.__setattr__(f"ask_volume_{ix}", float(ask_volume)) + + self.gateway.on_tick(copy(tick)) + + def on_market_order(self, packet): + """""" + event = packet["event"] + data = packet["data"] + + if event != "order_deleted": + return + + sys_orderid = data["id"] + order = self.order_manager.get_order_with_sys_orderid(sys_orderid) + if order: + order.status = Status.CANCELLED + self.order_manager.on_order(copy(order))