From eedea7708bb82db1eee774a31414095e4067df5c Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 27 Aug 2019 23:18:51 +0800 Subject: [PATCH 01/10] [Mod] update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4ed0dc18..9e55708c 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ vn.py是一套基于Python的开源量化交易系统开发框架,于2015年1月正式发布,在开源社区5年持续不断的贡献下一步步成长为全功能量化交易平台,目前国内外金融机构用户已经超过300家,包括:私募基金、证券自营和资管、期货资管和子公司、高校研究机构、自营交易公司、交易所、Token Fund等。 -**傻瓜式入门教程**已经在官方微信公众号[**vnpy-community**]全新上线,新手使用过程中有任何疑问看这个解决是最快的,后续会不断增加进阶经验、发布公告、活动报名等功能,请扫描下方二维码关注: +全新的《vn.py全实战进阶》在线课程,已经在官方微信公众号[**vnpy-community**]上线,50节内容覆盖从策略设计开发、参数回测优化,到最终实盘自动交易的完整CTA量化业务流程。购买请扫描下方二维码关注后,点击菜单栏的【进阶课程】按钮即可:

From 32639a1ed732c884ae35f28a099a62a8989e6860 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 3 Sep 2019 10:15:02 +0800 Subject: [PATCH 02/10] [Fix] close #2046 --- vnpy/gateway/ib/ib_gateway.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/vnpy/gateway/ib/ib_gateway.py b/vnpy/gateway/ib/ib_gateway.py index 4b8996cf..bcd2f65e 100644 --- a/vnpy/gateway/ib/ib_gateway.py +++ b/vnpy/gateway/ib/ib_gateway.py @@ -64,11 +64,14 @@ EXCHANGE_VT2IB = { EXCHANGE_IB2VT = {v: k for k, v in EXCHANGE_VT2IB.items()} STATUS_IB2VT = { - "Submitted": Status.NOTTRADED, - "Filled": Status.ALLTRADED, - "Cancelled": Status.CANCELLED, + "ApiPending": Status.SUBMITTING, "PendingSubmit": Status.SUBMITTING, "PreSubmitted": Status.NOTTRADED, + "Submitted": Status.NOTTRADED, + "ApiCancelled": Status.CANCELLED, + "Cancelled": Status.CANCELLED, + "Filled": Status.ALLTRADED, + "Inactive": Status.REJECTED, } PRODUCT_VT2IB = { @@ -353,9 +356,13 @@ class IbApi(EWrapper): orderid = str(orderId) order = self.orders.get(orderid, None) - order.status = STATUS_IB2VT[status] order.traded = filled + # To filter PendingCancel status + order_status = STATUS_IB2VT.get(status, None) + if order_status: + order.status = order_status + self.gateway.on_order(copy(order)) def openOrder( # pylint: disable=invalid-name From 0da8b784eb4ab9fa9411382507aa691bc0075a3d Mon Sep 17 00:00:00 2001 From: K Date: Tue, 10 Sep 2019 10:19:59 +0800 Subject: [PATCH 03/10] adding bitstamp gateway from Wudi --- vnpy/gateway/bitstamp/__init__.py | 1 + vnpy/gateway/bitstamp/bitstamp_gateway.py | 1281 +++++++++++++++++++++ 2 files changed, 1282 insertions(+) create mode 100644 vnpy/gateway/bitstamp/__init__.py create mode 100644 vnpy/gateway/bitstamp/bitstamp_gateway.py diff --git a/vnpy/gateway/bitstamp/__init__.py b/vnpy/gateway/bitstamp/__init__.py new file mode 100644 index 00000000..575a904b --- /dev/null +++ b/vnpy/gateway/bitstamp/__init__.py @@ -0,0 +1 @@ +from .bitstamp_gateway import BitstampGateway diff --git a/vnpy/gateway/bitstamp/bitstamp_gateway.py b/vnpy/gateway/bitstamp/bitstamp_gateway.py new file mode 100644 index 00000000..f6308bed --- /dev/null +++ b/vnpy/gateway/bitstamp/bitstamp_gateway.py @@ -0,0 +1,1281 @@ +""" +Author: Wudi +bitstamp合约接口 +""" + +import hashlib +import hmac +import sys +import time +import re +import hmac +import hashlib +from copy import copy +from datetime import datetime, timedelta +from urllib.parse import urlencode +from vnpy.api.rest import Request, RestClient +from vnpy.api.websocket import WebsocketClient +from vnpy.event import Event +from urllib import parse +from time import sleep + +from vnpy.trader.constant import ( + Direction, + Exchange, + OrderType, + Product, + Status, + Interval +) +from vnpy.trader.gateway import BaseGateway, LocalOrderManager +from vnpy.trader.object import ( + TickData, + OrderData, + TradeData, + BarData, + PositionData, + AccountData, + ContractData, + OrderRequest, + CancelRequest, + SubscribeRequest, + HistoryRequest +) + +from vnpy.trader.event import EVENT_TIMER + + +REST_HOST = "https://www.bitstamp.net/api/v2/" +REST_HOST_V1 = "https://www.bitstamp.net/api/" +WEBSOCKET_HOST = "wss://ws.bitstamp.net" +HISTORY_HOST = "https://api.blockchain.info/" + +STATUS_BITSTAMP2VT = { + "ACTIVE": Status.NOTTRADED, + "PARTIALLY FILLED": Status.PARTTRADED, + "EXECUTED": Status.ALLTRADED, + "CANCELED": Status.CANCELLED, +} + +ORDERTYPE_VT2BITSTAMP = { + OrderType.LIMIT: "EXCHANGE LIMIT", + OrderType.MARKET: "EXCHANGE MARKET", +} + +DIRECTION_VT2BITSTAMP = { + Direction.LONG: "Buy", + Direction.SHORT: "Sell", +} + +DIRECTION_BITSTAMP2VT = { + "0": Direction.LONG, + "1": Direction.SHORT, +} + +INTERVAL_VT2BITSTAMP = { + Interval.MINUTE: "60", + Interval.HOUR: "3600", + Interval.DAILY: "86400", +} + +TIMEDELTA_MAP = { + Interval.MINUTE: timedelta(minutes=1), + Interval.HOUR: timedelta(hours=1), + Interval.DAILY: timedelta(days=1), +} + +bitstamp_symbols = set() + +symbol_name_map = {} + +SYMBOL_BITSTAMP2VT = { + 'bchusd': "BCH/USD", 'bcheur': "BCH/EUR", + 'xrpusd': "XRP/USD", 'ltcusd': "LTC/USD", + 'eurusd': "EUR/USD", 'etheur': "ETH/EUR", + 'xrpeur': "XRP/EUR", 'btceur': "BTC/EUR", + 'ltcbtc': "LTC/BTC", 'btcusd': "BTC/USD", + 'ltceur': "LTC/EUR", 'ethusd': "ETH/USD", + 'xrpbtc': "XRP/BTC", 'bchbtc': "BCH/BTC", + 'ethbtc': "ETH/BTC", +} + + +class BitstampGateway(BaseGateway): + """ + VN Trader Gateway for BITSTAMP connection. + """ + + default_setting = { + "key": "", + "secret": "", + "username": "", + "session": 3, + "proxy_host": "127.0.0.1", + "proxy_port": 1087, + } + + exchanges = [Exchange.BITSTAMP] + + def __init__(self, event_engine): + """Constructor""" + super(BitstampGateway, self).__init__(event_engine, "BITSTAMP") + + self.order_manager = LocalOrderManager(self) + + self.rest_api = BitstampRestApi(self) + self.rest_api_v1 = BitstampRestApiV1(self) + self.ws_data_api = BitstampDataWebsocketApi(self) + self.history_api = BitstampHistoryApi(self) + self.orders = {} + + def connect(self, setting: dict): + """""" + key = setting["key"] + secret = setting["secret"] + username = setting["username"] + session = setting["session"] + proxy_host = setting["proxy_host"] + proxy_port = setting["proxy_port"] + + self.rest_api.connect(key, secret, username, + session, proxy_host, proxy_port) + self.ws_data_api.connect(proxy_host, proxy_port) + self.history_api.connect(key, session, proxy_host, proxy_port) + self.rest_api_v1.connect( + key, secret, username, session, proxy_host, proxy_port) + + # 循环监听账户balance + self.init_query() + + def subscribe(self, req: SubscribeRequest): + """""" + # self.ws_api.subscribe(req) + self.ws_data_api.subscribe(req) + + def send_order(self, req: OrderRequest): + """""" + return self.rest_api.send_order(req) + # return self.ws_data_api.send_order(req) + + def cancel_order(self, req: CancelRequest): + """""" + # print(f"show cancel req {req}") + return self.rest_api.cancel_order(req) + + def get_order(self, orderid: str): + """""" + return self.orders.get(orderid, None) + + def check_order(self): + ids = self.rest_api.get_submit_ids() + # print(f" show check order ids {ids}") + for i in ids: + + self.rest_api_v1.order_status(i) + data = self.rest_api_v1.get_callback_data(i) + self.rest_api.update_trade(i, data) + + + def query_account(self): + """""" + self.rest_api.query_account_balance() + + def query_position(self): + """""" + pass + + def query_history(self, req: HistoryRequest): + """""" + return self.history_api.query_history(req) + + def close(self): + """""" + self.rest_api.stop() + self.rest_api_v1.stop() + self.ws_data_api.stop() + self.history_api.stop() + + def process_timer_event(self, event: Event): + """""" + self.count += 1 + if self.count < 5: + return + + self.count = 0 + self.check_order() + + def init_query(self): + """""" + self.count = 0 + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + + +class BitstampRestApiV1(RestClient): + """Bitstap public api""" + + def __init__(self, gateway: BaseGateway): + super(BitstampRestApiV1, self).__init__() + self.gateway = gateway + self.gateway_name = gateway.gateway_name + self.order_manager = gateway.order_manager + + self.key = "" + self.secret = "" + self.username = "" + self.order_count = 1_000_000 + self.connect_time = 0 + self.ticks = {} + self.push_callback_data = {} + + def connect( + self, + key: str, + secret: str, + username: str, + session: int, + proxy_host: str, + proxy_port: int, + ): + """ + Initialize connection to REST server. + """ + self.key = key + self.secret = secret.encode() + self.username = username + self.connect_time = ( + int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count + ) + + self.init(REST_HOST_V1, proxy_host, proxy_port) + self.start(session) + + self.gateway.write_log("REST API V1 启动成功") + # self.order_status() + # self.query_account_balance() + # self.cancel_all_orders() + + # self.order_status() + def get_nonce(self): + """""" + nonce = getattr(self, '_nonce', 0) + print(f"nonce ====== {nonce}") + if nonce: + nonce += 1 + # If the unix time is greater though, use that instead (helps low + # concurrency multi-threaded apps always call with the largest nonce). + self._nonce = max(int(time.time()), nonce) + return self._nonce + + def default_data(self, *args, **kwargs): + """ + Generate a one-time signature and other data required to send a secure + POST request to the Bitstamp API. + """ + data = {} + data['key'] = self.key + # nonce = self.get_nonce() + nonce = str(int(round(time.time() * 1000))) + + msg = str(nonce) + self.username + self.key + print(f"usrname {self.username} nonce {nonce}") + print(f"msg {msg.encode('utf-8')}") + signature = hmac.new( + self.secret, + msg=msg.encode('utf-8'), + digestmod=hashlib.sha256).hexdigest().upper() + data['signature'] = signature + data['nonce'] = nonce + return data + + def sign(self, request): + """ + Generate Bitstamp signature. + """ + # Sign + # nonce = str(int(round(time.time() * 1000000))) + + if request.params: + query = urlencode(request.params) + path = request.path + "?" + query + else: + path = request.path + + if request.data: + request.data = urlencode(request.data) + else: + request.data = "" + + # print(request) + + # msg = request.method + \ + # "/api/{}{}".format(path, request.data) + # print(msg) + headers = {"Content-Type": "application/x-www-form-urlencoded"} + + request.headers = headers + return request + + def order_status(self, id): + """""" + + data = self.default_data() + data["id"] = id + # data["id"] = 4002801278 + # print(f"ordre status show order {self.order_manager.__dict__}") + self.add_request( + method="POST", + path="order_status/", + # params=data, + data=data, + callback=self.on_order_status, + extra=id + ) + + def on_order_status(self, data, request): + """""" + self.push_callback_data = {} + + print(f"v1 On order status {data}, {data.keys()}") + print(f" error in data.keys() {'error' in data.keys()} ") + if "error" in data.keys(): + error_data = data["error"] + msg = f"{request.path} 请求失败,信息: {error_data}" + print(msg) + if error_data == 'Invalid nonce': + self.order_status(request.extra) + return + + self.push_callback_data[str(request.extra)] = data + + def get_callback_data(self, id): + # print(f"get info {id} {self.push_callback_data}") + if str(id) in self.push_callback_data.keys(): + return self.push_callback_data[str(id)] + else: + return {"status": "error", "reason": "id not find"} + + def on_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback to handler request exception. + """ + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" + self.gateway.write_log(msg) + + sys.stderr.write( + self.exception_detail(exception_type, exception_value, tb, request) + ) + + +class BitstampHistoryApi(RestClient): + + def __init__(self, gateway: BaseGateway): + super(BitstampHistoryApi, self).__init__() + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + def connect( + self, + key: str, + # secret: str, + # username: str, + session: int, + proxy_host: str, + proxy_port: int, + ): + """ + Initialize connection to REST server. + """ + self.key = key + self.init(HISTORY_HOST, proxy_host, proxy_port) + self.start(session) + + self.gateway.write_log("REST history API启动成功") + # self.order_status() + # self.query_account_balance() + # self.cancel_all_orders() + + def query_history(self, req: HistoryRequest): + """ + https://www.bitstamp.net/ajax/tradeview/price-history/?step=1800¤cy_pair=BTC%2FUSD&start_datetime=2019-06-13T00:02:17.000Z&end_datetime=2019-06-29T13:02:15.000Z + :params {step currency_pair start_datetime end_datetime + "https://api.blockchain.info/price/bar-series?exchange=bitstamp&base=BTC"e=USD&start=1523111200&scale=60&end=1523439439" + :params { "exchange" : exchange, "base" : base, "quote" : quote, "start" : start timestamp, "scale": 60 86400... , "end" : timestamp} + + HistoryRequest = { + symbol: str + exchange: Exchange + start: datetime + end: datetime = None + interval: Interval = None + } + """ + print(f"History {req}") + + history = [] + limit = 1000 + step = int(INTERVAL_VT2BITSTAMP[req.interval]) + symbol = SYMBOL_BITSTAMP2VT[req.symbol] + base, quote = symbol.split("/") + start_time = int(datetime.timestamp(req.start)) + path = "price/bar-series" + + while True: + if req.end: + # print(f"start time {start_time}") + end_time = start_time + \ + int(INTERVAL_VT2BITSTAMP[req.interval]) * 1000 + # print(f"end time {end_time}") + + # Create query params + params = { + "exchange": "bitstamp", + "base": base, + "quote": quote, + "start": start_time, + "end": end_time, + "scale": step + } + """ + btcusd.BITSTAMP + """ + # Get response from server + resp = self.request( + "GET", + path, + params=params, + ) + + # Break if request failed with other status code + if resp.status_code // 100 != 2: + msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" + self.gateway.write_log(msg) + break + else: + data = resp.json() + if not data: + msg = f"获取历史数据为空,开始时间:{start_time}" + self.gateway.write_log(msg) + break + + buf = [] + + for l in data: + + dt = datetime.fromtimestamp(l["start"]) + bar = BarData( + symbol=req.symbol, + exchange=req.exchange, + datetime=dt, + interval=req.interval, + volume=l["volume"], + open_price=l["open"], + high_price=l["high"], + low_price=l["low"], + close_price=l["close"], + gateway_name=self.gateway_name + ) + buf.append(bar) + + history.extend(buf) + + begin = buf[0].datetime + end = buf[-1].datetime + msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{begin} - {end}" + self.gateway.write_log(msg) + + # # Break if total data count less than 1000 (latest date collected) + # if len(data) < limit: + # break + + if int(datetime.timestamp(req.end)) < end_time: + break + + # Update start time + start_time = int(datetime.timestamp(end)) + print(f"update start time {start_time}") + # start_time = datetime.timestamp(bar.datetime) + int(TIMEDELTA_MAP[req.interval]) + + return history + + +class BitstampRestApi(RestClient): + """ + Bitstamp REST API + """ + + def __init__(self, gateway: BaseGateway): + """""" + super(BitstampRestApi, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + self.order_manager = gateway.order_manager + + self.key = "" + self.secret = "" + self.username = "" + self.order_count = 1_000_000 + self.connect_time = 0 + self.ticks = {} + self.trade_id = 1_000_000 + self.submit_ids = [] # sys_id + self.trades = [] + self.position = {} + + def connect( + self, + key: str, + secret: str, + username: str, + session: int, + proxy_host: str, + proxy_port: int, + ): + """ + Initialize connection to REST server. + """ + self.key = key + self.secret = secret.encode() + self.username = username + self.connect_time = ( + int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count + ) + + self.init(REST_HOST, proxy_host, proxy_port) + # self.api_v1.connect(key, secret, username, session, proxy_host, proxy_port) + + self.start(session) + + self.gateway.write_log("REST API启动成功") + self.query_contract() + self.query_account_balance() + # self.query_ticker() + self.open_orders() + self.user_transactions() + + def get_signature(self): + nonce = str(int(round(time.time() * 1000000))) + message = nonce + self.username + self.key + print(message) + signature = hmac.new( + self.secret, + msg=message.encode('utf-8'), + digestmod=hashlib.sha256 + ).hexdigest().upper() + print(signature) + + return signature + + def get_nonce(self): + """""" + nonce = getattr(self, '_nonce', 0) + print(f"nonce ====== {nonce}") + if nonce: + nonce += 1 + # If the unix time is greater though, use that instead (helps low + # concurrency multi-threaded apps always call with the largest nonce). + self._nonce = max(int(time.time()), nonce) + return self._nonce + + def default_data(self, *args, **kwargs): + """ + Generate a one-time signature and other data required to send a secure + POST request to the Bitstamp API. + """ + data = {} + data['key'] = self.key + # nonce = self.get_nonce() + nonce = str(int(round(time.time() * 1000))) + + msg = str(nonce) + self.username + self.key + print(f"usrname {self.username} nonce {nonce}") + print(f"msg {msg.encode('utf-8')}") + signature = hmac.new( + self.secret, + msg=msg.encode('utf-8'), + digestmod=hashlib.sha256).hexdigest().upper() + data['signature'] = signature + data['nonce'] = nonce + return data + + def sign(self, request): + """ + Generate BitfineX signature. + """ + # Sign + # nonce = str(int(round(time.time() * 1000000))) + + if request.params: + query = urlencode(request.params) + path = request.path + "?" + query + else: + path = request.path + + if request.data: + request.data = urlencode(request.data) + else: + request.data = "" + + # print(request) + # + # msg = request.method + \ + # "/api/v2/{}{}".format(path, request.data) + # print(msg) + headers = {"Content-Type": "application/x-www-form-urlencoded"} + + request.headers = headers + return request + + def user_transactions(self, offset=0, limit=100, descending=True, + symbol=""): + """账户转账记录""" + data = self.default_data() + data["offset"] = offset + data["limit"] = limit + data["sort"] = 'desc' if descending else 'asc' + + path = "user_transactions/" + + self.add_request( + method="POST", + path=path + symbol, + data=data, + callback=self.on_user_transactions, + ) + + def on_user_transactions(self, data, request): + """ + Transaction type: 0 - deposit; 1 - withdrawal; 2 - market trade; 14 - sub account transfer. + :param data: + :param request: + :return: + """ + # print(f"transaction {data}") + if not data: + self.user_transactions() + self.gateway.write_log("无 transcations 数据重新请求") + return + + self.on_trade(data) + + + + def on_trade(self, data): + """""" + # self.trade_id += 1 + + for d in data: + if int(d["type"]) == 2: # 交易记录 + btc = d["btc"] + usd = d["usd"] + btcusd = d["btc_usd"] + tradetime = d["datetime"] + fee = d["fee"] + tradeid = d["id"] + orderid = d["order_id"] + + if tradeid in self.trades: + # print(f"交易记录已存在") + continue + + if float(btc) > 0: + direction = Direction.LONG + else: + direction = Direction.SHORT + symbol = "BTC/USD" if float(btc) != 0 else "" + + trade = TradeData( + symbol=symbol, + exchange=Exchange.BITSTAMP, + orderid=orderid, + tradeid=tradeid, + direction=direction, + price=btcusd, + volume=abs(float(btc)), + time=tradetime, + gateway_name=self.gateway_name, + ) + self.trades.append(tradeid) + self.gateway.on_trade(trade) + + def update_trade(self, id, data): + """ + check_order 中使用 + 更新order成交状态 + """ + # print(f"update trade data {data}") + if "error" in data.keys(): + self.gateway.write_log(f"Update Trade Error, Info: {data}") + return + + order = self.order_manager.get_order_with_sys_orderid(id) + local_id = self.order_manager.get_local_orderid(id) + status = data["status"] + if status == "Open": + print(f"LocalId {local_id} still Submiting ") + return + + if status == "Finished": + + btc_volume = sum(float(x["btc"]) for x in data["transactions"]) + usd_volume = sum(float(x["usd"]) for x in data["transactions"]) + + if btc_volume == order.volume: + order.status = Status.ALLTRADED + order.traded = btc_volume + self.gateway.write_log(f"委托成交完成:成交量 {btc_volume} 本地单号 {local_id} 系统单号 {id}") + self.order_manager.on_order(order) + """保留 后续验证是否需要提交到on order信息中""" + self.gateway.on_order(order) + + """提交结束,移除检查订单id""" + self.submit_ids.remove(str(id)) + + else: + order.status = Status.PARTTRADED + order.traded = btc_volume + self.gateway.write_log(f"委托部分成交:成交量 {btc_volume} 本地单号 {local_id} 系统单号 {id}") + self.order_manager.on_order(order) + """保留 后续验证是否需要提交到on order信息中""" + self.gateway.on_order(order) + + self.query_account_balance() + """更新线上交易记录""" + self.user_transactions() + + + def open_orders(self, symbol="all"): + """ + id Transaction ID. + datetime Date and time. + type Type: 0 - buy; 1 - sell. + price Price. + amount Amount. + currency_pair (if all currency pairs) + :param base: + :param quote: + :return: + """ + + path = "open_orders/" + symbol + "/" + data = self.default_data() + self.add_request( + method="POST", + path=path, + data=data, + callback=self.on_open_order, + ) + + def on_open_order(self, data, request): + """获取委托订单""" + + if type(data) == "dict": + if self.check_error(data, "查询委托"): + return + for d in data: + sys_orderid = d["id"] + local_orderid = self.order_manager.get_local_orderid(sys_orderid) + direction = DIRECTION_BITSTAMP2VT[d["type"]] + if sys_orderid not in self.submit_ids: + self.submit_ids.append(sys_orderid) + + print(f"on open ids {d}") + order = OrderData( + # orderid=d["id"], + orderid=local_orderid, + symbol=d["currency_pair"], + exchange=Exchange.BITSTAMP, + price=float(d["price"]), + volume=float(d["amount"]), + traded=float(0), + direction=direction, + time=d["datetime"], + gateway_name=self.gateway_name, + ) + + self.order_manager.on_order(order) + """保留 后续验证是否需要提交到on order信息中""" + self.gateway.on_order(order) + + self.gateway.write_log("委托信息查询成功") + + def cancel_order(self, req: CancelRequest): + """""" + path = "cancel_order/" + data = self.default_data() + + # 测试 nonce 报错使用 + # if req.exchange != None: + # data["nonce"] = 1234123 + + sys_orderid = self.order_manager.get_sys_orderid(req.orderid) + data["id"] = sys_orderid + + # print(f"request cancel order id {req.orderid}, {sys_orderid}") + # print(self.order_manager) + + self.add_request( + method="POST", + path=path, + data=data, + callback=self.on_cancel_order, + extra=req + ) + + def on_cancel_order(self, data, request): + """""" + cancel_request = request.extra + local_orderid = cancel_request.orderid + local_order = self.order_manager.get_order_with_local_orderid( + local_orderid) + sys_orderid = self.order_manager.get_sys_orderid(local_orderid) + if self.check_error(data, "撤单"): + local_order.status = Status.REJECTED + else: + local_order.status = Status.CANCELLED + + self.gateway.write_log( + f"委托撤单成功:本地单号 {local_order.orderid} 系统单号 {sys_orderid}") + + # 更新本地 order 数据 + self.order_manager.on_order(local_order) + # 更新 gate order 数据 + self.gateway.on_order(copy(local_order)) + # 更新账户资金数据 + self.query_account_balance() + + # # 移除提交状态id list + # print(f"cancel order {sys_orderid}") + # print(f"submit_ids {self.submit_ids}") + if str(sys_orderid) in self.submit_ids: + self.submit_ids.remove(str(sys_orderid)) + else: + self.gateway.write_log(f"本地订单号不存在 {self.submit_ids}") + + def on_cancel_order_error(self, data, request): + print(f"cancel_order {data}") + error_msg = data["error"] + self.gateway.write_log(f"撤单请求出错,信息:{error_msg}") + + def cancel_all_orders(self): + """path ="https://www.bitstamp.net/api/cancel_all_orders/""" + pass + + def send_order(self, req: OrderRequest): + """ + only limit order + :param req: + :return: + """ + print(f"send order req {req}") + data = self.default_data() + data["amount"] = req.volume + data["price"] = req.price + symbol = req.symbol + side = DIRECTION_VT2BITSTAMP[req.direction].lower() + # print(f"in gate_way {symbol}, direction {req.direction}, \ + # dir info {DIRECTION_VT2BITSTAMP[req.direction]}, \ + # type{req.type, ORDERTYPE_VT2BITSTAMP[req.type]}, \ + # bitstmap_type {req}") + + # data = {'amount': 0.0001, 'price': 11000} + path = side + "/" + symbol + "/" + local_orderid = self.order_manager.new_local_orderid() + order = req.create_order_data( + local_orderid, + self.gateway_name + ) + order.time = datetime.now().strftime("%H:%M:%S") + + self.add_request( + method="POST", + path=path, + data=data, + callback=self.on_send_order, + extra=order, + # on_error=self.on_send_order_error, + ) + self.order_manager.on_order(order) + return order.vt_orderid + + def on_send_order(self, data, request): + """""" + local_order = request.extra + local_order_id = local_order.orderid + # order = self.order_manager.get_order_with_local_orderid(local_order_id) + # print(f"[in on send order] get local order{order}") + if self.check_error(data, "委托"): + local_order.status = Status.REJECTED + self.order_manager.on_order(local_order) + return + + # local_order.orderid = data["id"] + # 提交后切换为未成交 , 直到trade 返回获取成交后切换为成交状态 + local_order.status = Status.NOTTRADED + self.order_manager.update_orderid_map(local_order_id, data["id"]) + self.order_manager.on_order(local_order) + self.gateway.on_order(local_order) + self.gateway.write_log( + f"委托提交成功: 系统单号 {data['id']} :本地单号 {local_order_id}") + + # 添加已经提交订单监控列表 + print(f"新增订单 {data['id']}") + if data["id"] not in self.submit_ids: + self.submit_ids.append(data["id"]) + + # 更新资金数据 + self.query_account_balance() + + def get_submit_ids(self): + """更新订单状态""" + return self.submit_ids + + def find_id_by_status(self, data): + pass + + def check_error(self, data: dict, func: str = ""): + """""" + if "status" in data.keys(): + error_msg = data["reason"]["__all__"] + self.gateway.write_log(f"{func}请求出错,信息:{error_msg}") + return True + else: + return False + + def on_send_order_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback when sending order caused exception. + """ + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def query_account_balance(self): + """""" + path = "balance/" + data = self.default_data() + self.add_request( + method="POST", + path=path, + data=data, + callback=self.on_query_account_balance + ) + + def on_query_account_balance(self, data, request): + """""" + # print(f"on_balance {data}") + + for d in data: + if "balance" in d: + currency = d.replace("_balance", "") + account = AccountData( + accountid=currency, + balance=float(data[currency + "_balance"]), + frozen=float(data[currency + "_reserved"]), + # available=float(data[currency + "_available"]), + gateway_name=self.gateway_name + ) + + self.gateway.on_account(account) + + self.on_position(data) + + def on_position(self, data): + # print(data) + position = PositionData( + symbol="btcusd", + exchange=Exchange.BITSTAMP, + direction=Direction.NET, + # volume = float(data.get("btc_balance", 0)), + # notional = float(data.get("usd_balance", 0)), + notional=float(data.get("btc_balance", 0)), + # notional=float(data.get("usd_balance", 0)), + #last_notional = round( (d.get("currentQty", 0.0) / d.get("lastPrice", 0.0)) if ( not d.get("lastPrice", 0.0) and not d.get("currentQty", 0.0)) else 0, 8), + gateway_name=self.gateway_name, + + ) + self.gateway.on_position(position) + + def query_contract(self): + """ + 查询合约信息 + :return: + """ + self.add_request( + method="get", + path="trading-pairs-info/", + callback=self.on_query_contract, + ) + + def query_ticker(self, symbol): + """查询ticker 信息""" + # print(f"qurey ticker{symbol}") + self.add_request( + method="get", + path="ticker/" + symbol, + callback=self.on_query_ticker, + ) + + def on_query_ticker(self, data, request): + + symbol = request.path.replace("ticker/", "") + ticker_rest_data = data + print(symbol) + if symbol in self.ticks: + tick = self.ticks[symbol] + else: + tick = TickData( + symbol=symbol, + exchange=Exchange.BITFINEX, + name=symbol, + datetime=datetime.now(), + gateway_name=self.gateway_name, + ) + + self.ticks[symbol] = tick + + tick.volume = float(ticker_rest_data['volume']) + tick.high_price = float(ticker_rest_data['high']) + tick.low_price = float(ticker_rest_data['low']) + tick.open_price = float(ticker_rest_data['open']) + + self.gateway.on_tick(copy(tick)) + + def on_query_contract(self, data, request): + """""" + for d in data: + contract = ContractData( + symbol=d["url_symbol"], + exchange=Exchange.BITSTAMP, + name=d["name"].upper(), + product=Product.SPOT, + size=1, + pricetick=1 / pow(10, d["base_decimals"]), + min_volume=float(d["minimum_order"].split(" ")[0]), + history_data=True, + gateway_name=self.gateway_name, + ) + self.gateway.on_contract(contract) + + bitstamp_symbols.add(contract.symbol) + symbol_name_map[contract.symbol] = contract.name + + self.gateway.write_log("交易对查询成功") + + def on_failed(self, status_code: int, request: Request): + """ + Callback to handle request failed. + """ + print(f"on_failed {request}") + + reason = request.response.json()["reason"] + code = request.response.json()["code"] + msg = f"{request.path} 请求失败,状态码:{status_code},信息: {reason} code: {code}" + self.gateway.write_log(msg) + + # print(f"reason: {reason} code: {code}") + path = request.path + if code in ["API0004"]: + # nonce 错误重新执行此请求 + if path == "user_transactions/": + self.user_transactions() + self.gateway.write_log("重新获取 Transactions 数据") + + elif path == "open_orders/all/": + self.open_orders() + self.gateway.write_log("重新获取委托数据") + + elif path == "cancel_order/": + self.cancel_order(request.extra) + self.gateway.write_log(f"重新提交{request.extra.orderid}撤单请求") + elif path == "balance/": + self.query_account_balance() + self.gateway.write_log(f"重新获取balance撤单请求") + + elif ("sell" in path) or ("buy" in path): + order_data = request.extra + + # update order status + order_data.status = Status.REJECTED + self.order_manager.on_order(order_data) + + req = OrderRequest( + symbol=order_data.symbol, + exchange=order_data.exchange, + direction=order_data.direction, + type=order_data.type, + volume=order_data.volume, + price=order_data.price, + offset=order_data.offset + ) + + self.send_order(req) + self.gateway.write_log("重新提交委托请求") + + def on_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback to handler request exception. + """ + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" + self.gateway.write_log(msg) + + sys.stderr.write( + self.exception_detail(exception_type, exception_value, tb, request) + ) + + def buy_market_order(self, amount, base="btc", quote="usd"): + """""" + pass + + def buy_limit_order(self, amount, base="btc", quote="usd"): + """""" + pass + + def sell_limit_order(self, amount, price, base="btc", quote="usd", limit_price=None, ioc_order=False): + """""" + pass + + def sell_market_order(self, amount, base="btc", quote="usd"): + """""" + pass + + +class BitstampDataWebsocketApi(WebsocketClient): + def __init__(self, gateway): + """""" + super(BitstampDataWebsocketApi, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.ticks = {} + + def connect(self, proxy_host: str, proxy_port: int): + """""" + self.init(WEBSOCKET_HOST, proxy_host, proxy_port) + self.start() + + def post_connected(self, req: SubscribeRequest): + """ + 测试默认连接btcusd + :return: + """ + sleep(1) + d = { + "event": "bts:subscribe", + "data": { + # "channel": "diff_order_book_" + req.symbol # "live_trades_btcusd" + "channel": "order_book_" + req.symbol # "live_trades_btcusd" + } + } + self.send_packet(d) + sleep(1) + + d = { + "event": "bts:subscribe", + "data": { + "channel": "live_trades_" + req.symbol # "live_trades_btcusd" + } + } + self.send_packet(d) + + def on_connected(self): + """""" + self.gateway.write_log("行情Websocket API连接刷新") + # 测试默认提交 + # self.post_connected() + # self.subscribe(req) + + def subscribe(self, req: SubscribeRequest): + """""" + # print(f"webocket subscribe req {req}") + # Create tick buf data + + tick = TickData( + symbol=req.symbol, + # name=symbol_name_map.get(req.symbol, ""), + name=req.symbol, + exchange=Exchange.BITSTAMP, + datetime=datetime.now(), + gateway_name=self.gateway_name, + ) + self.ticks[req.symbol.lower()] = tick + + # 默认使用btcusd 连接 + self.post_connected(req) + + def on_packet(self, packet): + """""" + # print(f"on_packet {packet}") + if "bts:request_reconnect" == packet["event"]: + # 重新连接 + self.post_connected() + elif "data" == packet["event"]: + return self.on_market_depth(packet) + else: + self.on_data(packet) + + def on_data(self, packet): + """""" + if packet["event"] == "trade": + self.on_trade_update(packet) + # print("data : {}".format(packet)) + + def on_trade_update(self, packet): + channel = packet["channel"] + data = packet["data"] + symbol = str(re.sub("live_.*_", "", channel)) + tick = self.ticks[symbol] + tick.last_price = float(data["price"]) + + def on_market_depth(self, packet): + """行情深度推送 """ + + channel = packet["channel"] + data = packet["data"] + # symbol = str(re.sub("live_.*_","", channel)) #live order channel + symbol = str(re.sub("order_book_", "", channel)) + # print(f"market_detph {data}") + tick = self.ticks[symbol] + tick.datetime = datetime.fromtimestamp(int(data['timestamp']) / 1000) + + if symbol in self.ticks: + tick = self.ticks[symbol] + else: + tick = TickData( + symbol=symbol, + exchange=Exchange.BITFINEX, + name=symbol, + datetime=datetime.now(), + gateway_name=self.gateway_name, + ) + + self.ticks[symbol] = tick + + if len(data) == 0: + print("请求数据为空") + return + + bids = data["bids"] + + for n in range(5): + # for n in range(len(bids)): + price, volume = bids[n] + tick.__setattr__("bid_price_" + str(n + 1), float(price)) + tick.__setattr__("bid_volume_" + str(n + 1), float(volume)) + + asks = data["asks"] + # print(f"bids count {len(bids)} , asks count {len(asks)}") + for n in range(5): + # for n in range(len(asks)): + price, volume = asks[n] + tick.__setattr__("ask_price_" + str(n + 1), float(price)) + tick.__setattr__("ask_volume_" + str(n + 1), float(volume)) + + self.gateway.on_tick(copy(tick)) From 9dd329e4d549f2c7663654e526a8e9a7c18e8fce Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Wed, 11 Sep 2019 20:59:49 +0800 Subject: [PATCH 04/10] [Mod] flake8 code quality improve --- vnpy/gateway/bitstamp/bitstamp_gateway.py | 37 +++++++++-------------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/vnpy/gateway/bitstamp/bitstamp_gateway.py b/vnpy/gateway/bitstamp/bitstamp_gateway.py index f6308bed..234b36a7 100644 --- a/vnpy/gateway/bitstamp/bitstamp_gateway.py +++ b/vnpy/gateway/bitstamp/bitstamp_gateway.py @@ -8,15 +8,12 @@ import hmac import sys import time import re -import hmac -import hashlib from copy import copy from datetime import datetime, timedelta from urllib.parse import urlencode from vnpy.api.rest import Request, RestClient from vnpy.api.websocket import WebsocketClient from vnpy.event import Event -from urllib import parse from time import sleep from vnpy.trader.constant import ( @@ -175,7 +172,6 @@ class BitstampGateway(BaseGateway): data = self.rest_api_v1.get_callback_data(i) self.rest_api.update_trade(i, data) - def query_account(self): """""" self.rest_api.query_account_balance() @@ -294,11 +290,11 @@ class BitstampRestApiV1(RestClient): # Sign # nonce = str(int(round(time.time() * 1000000))) - if request.params: - query = urlencode(request.params) - path = request.path + "?" + query - else: - path = request.path + # if request.params: + # query = urlencode(request.params) + # path = request.path + "?" + query + # else: + # path = request.path if request.data: request.data = urlencode(request.data) @@ -414,7 +410,7 @@ class BitstampHistoryApi(RestClient): print(f"History {req}") history = [] - limit = 1000 + # limit = 1000 step = int(INTERVAL_VT2BITSTAMP[req.interval]) symbol = SYMBOL_BITSTAMP2VT[req.symbol] base, quote = symbol.split("/") @@ -607,11 +603,11 @@ class BitstampRestApi(RestClient): # Sign # nonce = str(int(round(time.time() * 1000000))) - if request.params: - query = urlencode(request.params) - path = request.path + "?" + query - else: - path = request.path + # if request.params: + # query = urlencode(request.params) + # path = request.path + "?" + query + # else: + # path = request.path if request.data: request.data = urlencode(request.data) @@ -660,8 +656,6 @@ class BitstampRestApi(RestClient): self.on_trade(data) - - def on_trade(self, data): """""" # self.trade_id += 1 @@ -669,10 +663,10 @@ class BitstampRestApi(RestClient): for d in data: if int(d["type"]) == 2: # 交易记录 btc = d["btc"] - usd = d["usd"] + # usd = d["usd"] btcusd = d["btc_usd"] tradetime = d["datetime"] - fee = d["fee"] + # fee = d["fee"] tradeid = d["id"] orderid = d["order_id"] @@ -720,7 +714,7 @@ class BitstampRestApi(RestClient): if status == "Finished": btc_volume = sum(float(x["btc"]) for x in data["transactions"]) - usd_volume = sum(float(x["usd"]) for x in data["transactions"]) + # usd_volume = sum(float(x["usd"]) for x in data["transactions"]) if btc_volume == order.volume: order.status = Status.ALLTRADED @@ -745,7 +739,6 @@ class BitstampRestApi(RestClient): """更新线上交易记录""" self.user_transactions() - def open_orders(self, symbol="all"): """ id Transaction ID. @@ -994,7 +987,7 @@ class BitstampRestApi(RestClient): # notional = float(data.get("usd_balance", 0)), notional=float(data.get("btc_balance", 0)), # notional=float(data.get("usd_balance", 0)), - #last_notional = round( (d.get("currentQty", 0.0) / d.get("lastPrice", 0.0)) if ( not d.get("lastPrice", 0.0) and not d.get("currentQty", 0.0)) else 0, 8), + # last_notional = round( (d.get("currentQty", 0.0) / d.get("lastPrice", 0.0)) if ( not d.get("lastPrice", 0.0) and not d.get("currentQty", 0.0)) else 0, 8), gateway_name=self.gateway_name, ) From 6b753f58a55b842038800c8dccdba759441c2a5a Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Wed, 11 Sep 2019 22:20:56 +0800 Subject: [PATCH 05/10] [Mod]simplify bitstamp gateway --- vnpy/gateway/bitstamp/bitstamp_gateway.py | 954 ++++------------------ vnpy/trader/constant.py | 2 + 2 files changed, 152 insertions(+), 804 deletions(-) diff --git a/vnpy/gateway/bitstamp/bitstamp_gateway.py b/vnpy/gateway/bitstamp/bitstamp_gateway.py index 234b36a7..6fcd94ca 100644 --- a/vnpy/gateway/bitstamp/bitstamp_gateway.py +++ b/vnpy/gateway/bitstamp/bitstamp_gateway.py @@ -10,10 +10,8 @@ import time import re from copy import copy from datetime import datetime, timedelta -from urllib.parse import urlencode from vnpy.api.rest import Request, RestClient from vnpy.api.websocket import WebsocketClient -from vnpy.event import Event from time import sleep from vnpy.trader.constant import ( @@ -42,10 +40,8 @@ from vnpy.trader.object import ( from vnpy.trader.event import EVENT_TIMER -REST_HOST = "https://www.bitstamp.net/api/v2/" -REST_HOST_V1 = "https://www.bitstamp.net/api/" +REST_HOST = "https://www.bitstamp.net/api/v2" WEBSOCKET_HOST = "wss://ws.bitstamp.net" -HISTORY_HOST = "https://api.blockchain.info/" STATUS_BITSTAMP2VT = { "ACTIVE": Status.NOTTRADED, @@ -108,22 +104,19 @@ class BitstampGateway(BaseGateway): "username": "", "session": 3, "proxy_host": "127.0.0.1", - "proxy_port": 1087, + "proxy_port": 1080, } exchanges = [Exchange.BITSTAMP] def __init__(self, event_engine): """Constructor""" - super(BitstampGateway, self).__init__(event_engine, "BITSTAMP") + super().__init__(event_engine, "BITSTAMP") self.order_manager = LocalOrderManager(self) self.rest_api = BitstampRestApi(self) - self.rest_api_v1 = BitstampRestApiV1(self) - self.ws_data_api = BitstampDataWebsocketApi(self) - self.history_api = BitstampHistoryApi(self) - self.orders = {} + self.ws_api = BitstampWebsocketApi(self) def connect(self, setting: dict): """""" @@ -136,45 +129,23 @@ class BitstampGateway(BaseGateway): self.rest_api.connect(key, secret, username, session, proxy_host, proxy_port) - self.ws_data_api.connect(proxy_host, proxy_port) - self.history_api.connect(key, session, proxy_host, proxy_port) - self.rest_api_v1.connect( - key, secret, username, session, proxy_host, proxy_port) - - # 循环监听账户balance - self.init_query() + self.ws_api.connect(proxy_host, proxy_port) def subscribe(self, req: SubscribeRequest): """""" - # self.ws_api.subscribe(req) - self.ws_data_api.subscribe(req) + self.ws_api.subscribe(req) def send_order(self, req: OrderRequest): """""" return self.rest_api.send_order(req) - # return self.ws_data_api.send_order(req) def cancel_order(self, req: CancelRequest): """""" - # print(f"show cancel req {req}") - return self.rest_api.cancel_order(req) - - def get_order(self, orderid: str): - """""" - return self.orders.get(orderid, None) - - def check_order(self): - ids = self.rest_api.get_submit_ids() - # print(f" show check order ids {ids}") - for i in ids: - - self.rest_api_v1.order_status(i) - data = self.rest_api_v1.get_callback_data(i) - self.rest_api.update_trade(i, data) + self.rest_api.cancel_order(req) def query_account(self): """""" - self.rest_api.query_account_balance() + pass def query_position(self): """""" @@ -182,318 +153,12 @@ class BitstampGateway(BaseGateway): def query_history(self, req: HistoryRequest): """""" - return self.history_api.query_history(req) + return self.rest_api.query_history(req) def close(self): """""" self.rest_api.stop() - self.rest_api_v1.stop() - self.ws_data_api.stop() - self.history_api.stop() - - def process_timer_event(self, event: Event): - """""" - self.count += 1 - if self.count < 5: - return - - self.count = 0 - self.check_order() - - def init_query(self): - """""" - self.count = 0 - self.event_engine.register(EVENT_TIMER, self.process_timer_event) - - -class BitstampRestApiV1(RestClient): - """Bitstap public api""" - - def __init__(self, gateway: BaseGateway): - super(BitstampRestApiV1, self).__init__() - self.gateway = gateway - self.gateway_name = gateway.gateway_name - self.order_manager = gateway.order_manager - - self.key = "" - self.secret = "" - self.username = "" - self.order_count = 1_000_000 - self.connect_time = 0 - self.ticks = {} - self.push_callback_data = {} - - def connect( - self, - key: str, - secret: str, - username: str, - session: int, - proxy_host: str, - proxy_port: int, - ): - """ - Initialize connection to REST server. - """ - self.key = key - self.secret = secret.encode() - self.username = username - self.connect_time = ( - int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count - ) - - self.init(REST_HOST_V1, proxy_host, proxy_port) - self.start(session) - - self.gateway.write_log("REST API V1 启动成功") - # self.order_status() - # self.query_account_balance() - # self.cancel_all_orders() - - # self.order_status() - def get_nonce(self): - """""" - nonce = getattr(self, '_nonce', 0) - print(f"nonce ====== {nonce}") - if nonce: - nonce += 1 - # If the unix time is greater though, use that instead (helps low - # concurrency multi-threaded apps always call with the largest nonce). - self._nonce = max(int(time.time()), nonce) - return self._nonce - - def default_data(self, *args, **kwargs): - """ - Generate a one-time signature and other data required to send a secure - POST request to the Bitstamp API. - """ - data = {} - data['key'] = self.key - # nonce = self.get_nonce() - nonce = str(int(round(time.time() * 1000))) - - msg = str(nonce) + self.username + self.key - print(f"usrname {self.username} nonce {nonce}") - print(f"msg {msg.encode('utf-8')}") - signature = hmac.new( - self.secret, - msg=msg.encode('utf-8'), - digestmod=hashlib.sha256).hexdigest().upper() - data['signature'] = signature - data['nonce'] = nonce - return data - - def sign(self, request): - """ - Generate Bitstamp signature. - """ - # Sign - # nonce = str(int(round(time.time() * 1000000))) - - # if request.params: - # query = urlencode(request.params) - # path = request.path + "?" + query - # else: - # path = request.path - - if request.data: - request.data = urlencode(request.data) - else: - request.data = "" - - # print(request) - - # msg = request.method + \ - # "/api/{}{}".format(path, request.data) - # print(msg) - headers = {"Content-Type": "application/x-www-form-urlencoded"} - - request.headers = headers - return request - - def order_status(self, id): - """""" - - data = self.default_data() - data["id"] = id - # data["id"] = 4002801278 - # print(f"ordre status show order {self.order_manager.__dict__}") - self.add_request( - method="POST", - path="order_status/", - # params=data, - data=data, - callback=self.on_order_status, - extra=id - ) - - def on_order_status(self, data, request): - """""" - self.push_callback_data = {} - - print(f"v1 On order status {data}, {data.keys()}") - print(f" error in data.keys() {'error' in data.keys()} ") - if "error" in data.keys(): - error_data = data["error"] - msg = f"{request.path} 请求失败,信息: {error_data}" - print(msg) - if error_data == 'Invalid nonce': - self.order_status(request.extra) - return - - self.push_callback_data[str(request.extra)] = data - - def get_callback_data(self, id): - # print(f"get info {id} {self.push_callback_data}") - if str(id) in self.push_callback_data.keys(): - return self.push_callback_data[str(id)] - else: - return {"status": "error", "reason": "id not find"} - - def on_error( - self, exception_type: type, exception_value: Exception, tb, request: Request - ): - """ - Callback to handler request exception. - """ - msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" - self.gateway.write_log(msg) - - sys.stderr.write( - self.exception_detail(exception_type, exception_value, tb, request) - ) - - -class BitstampHistoryApi(RestClient): - - def __init__(self, gateway: BaseGateway): - super(BitstampHistoryApi, self).__init__() - self.gateway = gateway - self.gateway_name = gateway.gateway_name - - def connect( - self, - key: str, - # secret: str, - # username: str, - session: int, - proxy_host: str, - proxy_port: int, - ): - """ - Initialize connection to REST server. - """ - self.key = key - self.init(HISTORY_HOST, proxy_host, proxy_port) - self.start(session) - - self.gateway.write_log("REST history API启动成功") - # self.order_status() - # self.query_account_balance() - # self.cancel_all_orders() - - def query_history(self, req: HistoryRequest): - """ - https://www.bitstamp.net/ajax/tradeview/price-history/?step=1800¤cy_pair=BTC%2FUSD&start_datetime=2019-06-13T00:02:17.000Z&end_datetime=2019-06-29T13:02:15.000Z - :params {step currency_pair start_datetime end_datetime - "https://api.blockchain.info/price/bar-series?exchange=bitstamp&base=BTC"e=USD&start=1523111200&scale=60&end=1523439439" - :params { "exchange" : exchange, "base" : base, "quote" : quote, "start" : start timestamp, "scale": 60 86400... , "end" : timestamp} - - HistoryRequest = { - symbol: str - exchange: Exchange - start: datetime - end: datetime = None - interval: Interval = None - } - """ - print(f"History {req}") - - history = [] - # limit = 1000 - step = int(INTERVAL_VT2BITSTAMP[req.interval]) - symbol = SYMBOL_BITSTAMP2VT[req.symbol] - base, quote = symbol.split("/") - start_time = int(datetime.timestamp(req.start)) - path = "price/bar-series" - - while True: - if req.end: - # print(f"start time {start_time}") - end_time = start_time + \ - int(INTERVAL_VT2BITSTAMP[req.interval]) * 1000 - # print(f"end time {end_time}") - - # Create query params - params = { - "exchange": "bitstamp", - "base": base, - "quote": quote, - "start": start_time, - "end": end_time, - "scale": step - } - """ - btcusd.BITSTAMP - """ - # Get response from server - resp = self.request( - "GET", - path, - params=params, - ) - - # Break if request failed with other status code - if resp.status_code // 100 != 2: - msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" - self.gateway.write_log(msg) - break - else: - data = resp.json() - if not data: - msg = f"获取历史数据为空,开始时间:{start_time}" - self.gateway.write_log(msg) - break - - buf = [] - - for l in data: - - dt = datetime.fromtimestamp(l["start"]) - bar = BarData( - symbol=req.symbol, - exchange=req.exchange, - datetime=dt, - interval=req.interval, - volume=l["volume"], - open_price=l["open"], - high_price=l["high"], - low_price=l["low"], - close_price=l["close"], - gateway_name=self.gateway_name - ) - buf.append(bar) - - history.extend(buf) - - begin = buf[0].datetime - end = buf[-1].datetime - msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{begin} - {end}" - self.gateway.write_log(msg) - - # # Break if total data count less than 1000 (latest date collected) - # if len(data) < limit: - # break - - if int(datetime.timestamp(req.end)) < end_time: - break - - # Update start time - start_time = int(datetime.timestamp(end)) - print(f"update start time {start_time}") - # start_time = datetime.timestamp(bar.datetime) + int(TIMEDELTA_MAP[req.interval]) - - return history + self.ws_api.stop() class BitstampRestApi(RestClient): @@ -512,13 +177,9 @@ class BitstampRestApi(RestClient): self.key = "" self.secret = "" self.username = "" + self.order_count = 1_000_000 self.connect_time = 0 - self.ticks = {} - self.trade_id = 1_000_000 - self.submit_ids = [] # sys_id - self.trades = [] - self.position = {} def connect( self, @@ -535,248 +196,63 @@ class BitstampRestApi(RestClient): self.key = key self.secret = secret.encode() self.username = username + self.connect_time = ( int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count ) self.init(REST_HOST, proxy_host, proxy_port) - # self.api_v1.connect(key, secret, username, session, proxy_host, proxy_port) - self.start(session) self.gateway.write_log("REST API启动成功") - self.query_contract() - self.query_account_balance() - # self.query_ticker() - self.open_orders() - self.user_transactions() - def get_signature(self): - nonce = str(int(round(time.time() * 1000000))) - message = nonce + self.username + self.key - print(message) + self.query_contract() + self.query_account() + self.query_order() + + def sign(self, request: Request): + """ + Sign Bitstamp request. + """ + # Sign + nonce = int(round(time.time() * 1000000)) + message = f"{nonce}{self.username}{self.key}" + signature = hmac.new( self.secret, msg=message.encode('utf-8'), digestmod=hashlib.sha256 ).hexdigest().upper() - print(signature) - return signature + if request.method == "POST": + if request.data is None: + request.data = {} - def get_nonce(self): - """""" - nonce = getattr(self, '_nonce', 0) - print(f"nonce ====== {nonce}") - if nonce: - nonce += 1 - # If the unix time is greater though, use that instead (helps low - # concurrency multi-threaded apps always call with the largest nonce). - self._nonce = max(int(time.time()), nonce) - return self._nonce + request.data["key"] = self.key + request.data["nonce"] = nonce + request.data["signature"] = signature - def default_data(self, *args, **kwargs): - """ - Generate a one-time signature and other data required to send a secure - POST request to the Bitstamp API. - """ - data = {} - data['key'] = self.key - # nonce = self.get_nonce() - nonce = str(int(round(time.time() * 1000))) - - msg = str(nonce) + self.username + self.key - print(f"usrname {self.username} nonce {nonce}") - print(f"msg {msg.encode('utf-8')}") - signature = hmac.new( - self.secret, - msg=msg.encode('utf-8'), - digestmod=hashlib.sha256).hexdigest().upper() - data['signature'] = signature - data['nonce'] = nonce - return data - - def sign(self, request): - """ - Generate BitfineX signature. - """ - # Sign - # nonce = str(int(round(time.time() * 1000000))) - - # if request.params: - # query = urlencode(request.params) - # path = request.path + "?" + query - # else: - # path = request.path - - if request.data: - request.data = urlencode(request.data) - else: - request.data = "" - - # print(request) - # - # msg = request.method + \ - # "/api/v2/{}{}".format(path, request.data) - # print(msg) - headers = {"Content-Type": "application/x-www-form-urlencoded"} - - request.headers = headers return request - def user_transactions(self, offset=0, limit=100, descending=True, - symbol=""): - """账户转账记录""" - data = self.default_data() - data["offset"] = offset - data["limit"] = limit - data["sort"] = 'desc' if descending else 'asc' - - path = "user_transactions/" - - self.add_request( - method="POST", - path=path + symbol, - data=data, - callback=self.on_user_transactions, - ) - - def on_user_transactions(self, data, request): - """ - Transaction type: 0 - deposit; 1 - withdrawal; 2 - market trade; 14 - sub account transfer. - :param data: - :param request: - :return: - """ - # print(f"transaction {data}") - if not data: - self.user_transactions() - self.gateway.write_log("无 transcations 数据重新请求") - return - - self.on_trade(data) - - def on_trade(self, data): + def query_order(self): """""" - # self.trade_id += 1 + path = "/open_orders/all/" - for d in data: - if int(d["type"]) == 2: # 交易记录 - btc = d["btc"] - # usd = d["usd"] - btcusd = d["btc_usd"] - tradetime = d["datetime"] - # fee = d["fee"] - tradeid = d["id"] - orderid = d["order_id"] - - if tradeid in self.trades: - # print(f"交易记录已存在") - continue - - if float(btc) > 0: - direction = Direction.LONG - else: - direction = Direction.SHORT - symbol = "BTC/USD" if float(btc) != 0 else "" - - trade = TradeData( - symbol=symbol, - exchange=Exchange.BITSTAMP, - orderid=orderid, - tradeid=tradeid, - direction=direction, - price=btcusd, - volume=abs(float(btc)), - time=tradetime, - gateway_name=self.gateway_name, - ) - self.trades.append(tradeid) - self.gateway.on_trade(trade) - - def update_trade(self, id, data): - """ - check_order 中使用 - 更新order成交状态 - """ - # print(f"update trade data {data}") - if "error" in data.keys(): - self.gateway.write_log(f"Update Trade Error, Info: {data}") - return - - order = self.order_manager.get_order_with_sys_orderid(id) - local_id = self.order_manager.get_local_orderid(id) - status = data["status"] - if status == "Open": - print(f"LocalId {local_id} still Submiting ") - return - - if status == "Finished": - - btc_volume = sum(float(x["btc"]) for x in data["transactions"]) - # usd_volume = sum(float(x["usd"]) for x in data["transactions"]) - - if btc_volume == order.volume: - order.status = Status.ALLTRADED - order.traded = btc_volume - self.gateway.write_log(f"委托成交完成:成交量 {btc_volume} 本地单号 {local_id} 系统单号 {id}") - self.order_manager.on_order(order) - """保留 后续验证是否需要提交到on order信息中""" - self.gateway.on_order(order) - - """提交结束,移除检查订单id""" - self.submit_ids.remove(str(id)) - - else: - order.status = Status.PARTTRADED - order.traded = btc_volume - self.gateway.write_log(f"委托部分成交:成交量 {btc_volume} 本地单号 {local_id} 系统单号 {id}") - self.order_manager.on_order(order) - """保留 后续验证是否需要提交到on order信息中""" - self.gateway.on_order(order) - - self.query_account_balance() - """更新线上交易记录""" - self.user_transactions() - - def open_orders(self, symbol="all"): - """ - id Transaction ID. - datetime Date and time. - type Type: 0 - buy; 1 - sell. - price Price. - amount Amount. - currency_pair (if all currency pairs) - :param base: - :param quote: - :return: - """ - - path = "open_orders/" + symbol + "/" - data = self.default_data() self.add_request( method="POST", path=path, - data=data, - callback=self.on_open_order, + callback=self.on_query_order ) - def on_open_order(self, data, request): + def on_query_order(self, data, request): """获取委托订单""" - - if type(data) == "dict": - if self.check_error(data, "查询委托"): - return for d in data: sys_orderid = d["id"] local_orderid = self.order_manager.get_local_orderid(sys_orderid) - direction = DIRECTION_BITSTAMP2VT[d["type"]] - if sys_orderid not in self.submit_ids: - self.submit_ids.append(sys_orderid) - print(f"on open ids {d}") + direction = DIRECTION_BITSTAMP2VT[d["type"]] + order = OrderData( - # orderid=d["id"], orderid=local_orderid, symbol=d["currency_pair"], exchange=Exchange.BITSTAMP, @@ -789,179 +265,21 @@ class BitstampRestApi(RestClient): ) self.order_manager.on_order(order) - """保留 后续验证是否需要提交到on order信息中""" - self.gateway.on_order(order) self.gateway.write_log("委托信息查询成功") - def cancel_order(self, req: CancelRequest): + def query_account(self): """""" - path = "cancel_order/" - data = self.default_data() - - # 测试 nonce 报错使用 - # if req.exchange != None: - # data["nonce"] = 1234123 - - sys_orderid = self.order_manager.get_sys_orderid(req.orderid) - data["id"] = sys_orderid - - # print(f"request cancel order id {req.orderid}, {sys_orderid}") - # print(self.order_manager) + path = "/balance/" self.add_request( method="POST", path=path, - data=data, - callback=self.on_cancel_order, - extra=req + callback=self.on_query_account ) - def on_cancel_order(self, data, request): + def on_query_account(self, data, request): """""" - cancel_request = request.extra - local_orderid = cancel_request.orderid - local_order = self.order_manager.get_order_with_local_orderid( - local_orderid) - sys_orderid = self.order_manager.get_sys_orderid(local_orderid) - if self.check_error(data, "撤单"): - local_order.status = Status.REJECTED - else: - local_order.status = Status.CANCELLED - - self.gateway.write_log( - f"委托撤单成功:本地单号 {local_order.orderid} 系统单号 {sys_orderid}") - - # 更新本地 order 数据 - self.order_manager.on_order(local_order) - # 更新 gate order 数据 - self.gateway.on_order(copy(local_order)) - # 更新账户资金数据 - self.query_account_balance() - - # # 移除提交状态id list - # print(f"cancel order {sys_orderid}") - # print(f"submit_ids {self.submit_ids}") - if str(sys_orderid) in self.submit_ids: - self.submit_ids.remove(str(sys_orderid)) - else: - self.gateway.write_log(f"本地订单号不存在 {self.submit_ids}") - - def on_cancel_order_error(self, data, request): - print(f"cancel_order {data}") - error_msg = data["error"] - self.gateway.write_log(f"撤单请求出错,信息:{error_msg}") - - def cancel_all_orders(self): - """path ="https://www.bitstamp.net/api/cancel_all_orders/""" - pass - - def send_order(self, req: OrderRequest): - """ - only limit order - :param req: - :return: - """ - print(f"send order req {req}") - data = self.default_data() - data["amount"] = req.volume - data["price"] = req.price - symbol = req.symbol - side = DIRECTION_VT2BITSTAMP[req.direction].lower() - # print(f"in gate_way {symbol}, direction {req.direction}, \ - # dir info {DIRECTION_VT2BITSTAMP[req.direction]}, \ - # type{req.type, ORDERTYPE_VT2BITSTAMP[req.type]}, \ - # bitstmap_type {req}") - - # data = {'amount': 0.0001, 'price': 11000} - path = side + "/" + symbol + "/" - local_orderid = self.order_manager.new_local_orderid() - order = req.create_order_data( - local_orderid, - self.gateway_name - ) - order.time = datetime.now().strftime("%H:%M:%S") - - self.add_request( - method="POST", - path=path, - data=data, - callback=self.on_send_order, - extra=order, - # on_error=self.on_send_order_error, - ) - self.order_manager.on_order(order) - return order.vt_orderid - - def on_send_order(self, data, request): - """""" - local_order = request.extra - local_order_id = local_order.orderid - # order = self.order_manager.get_order_with_local_orderid(local_order_id) - # print(f"[in on send order] get local order{order}") - if self.check_error(data, "委托"): - local_order.status = Status.REJECTED - self.order_manager.on_order(local_order) - return - - # local_order.orderid = data["id"] - # 提交后切换为未成交 , 直到trade 返回获取成交后切换为成交状态 - local_order.status = Status.NOTTRADED - self.order_manager.update_orderid_map(local_order_id, data["id"]) - self.order_manager.on_order(local_order) - self.gateway.on_order(local_order) - self.gateway.write_log( - f"委托提交成功: 系统单号 {data['id']} :本地单号 {local_order_id}") - - # 添加已经提交订单监控列表 - print(f"新增订单 {data['id']}") - if data["id"] not in self.submit_ids: - self.submit_ids.append(data["id"]) - - # 更新资金数据 - self.query_account_balance() - - def get_submit_ids(self): - """更新订单状态""" - return self.submit_ids - - def find_id_by_status(self, data): - pass - - def check_error(self, data: dict, func: str = ""): - """""" - if "status" in data.keys(): - error_msg = data["reason"]["__all__"] - self.gateway.write_log(f"{func}请求出错,信息:{error_msg}") - return True - else: - return False - - def on_send_order_error( - self, exception_type: type, exception_value: Exception, tb, request: Request - ): - """ - Callback when sending order caused exception. - """ - # Record exception if not ConnectionError - if not issubclass(exception_type, ConnectionError): - self.on_error(exception_type, exception_value, tb, request) - - def query_account_balance(self): - """""" - path = "balance/" - data = self.default_data() - self.add_request( - method="POST", - path=path, - data=data, - callback=self.on_query_account_balance - ) - - def on_query_account_balance(self, data, request): - """""" - # print(f"on_balance {data}") - for d in data: if "balance" in d: currency = d.replace("_balance", "") @@ -977,87 +295,128 @@ class BitstampRestApi(RestClient): self.on_position(data) - def on_position(self, data): - # print(data) - position = PositionData( - symbol="btcusd", - exchange=Exchange.BITSTAMP, - direction=Direction.NET, - # volume = float(data.get("btc_balance", 0)), - # notional = float(data.get("usd_balance", 0)), - notional=float(data.get("btc_balance", 0)), - # notional=float(data.get("usd_balance", 0)), - # last_notional = round( (d.get("currentQty", 0.0) / d.get("lastPrice", 0.0)) if ( not d.get("lastPrice", 0.0) and not d.get("currentQty", 0.0)) else 0, 8), - gateway_name=self.gateway_name, - - ) - self.gateway.on_position(position) - def query_contract(self): - """ - 查询合约信息 - :return: - """ + """""" self.add_request( - method="get", - path="trading-pairs-info/", + method="GET", + path="/trading-pairs-info/", callback=self.on_query_contract, ) - def query_ticker(self, symbol): - """查询ticker 信息""" - # print(f"qurey ticker{symbol}") - self.add_request( - method="get", - path="ticker/" + symbol, - callback=self.on_query_ticker, - ) - - def on_query_ticker(self, data, request): - - symbol = request.path.replace("ticker/", "") - ticker_rest_data = data - print(symbol) - if symbol in self.ticks: - tick = self.ticks[symbol] - else: - tick = TickData( - symbol=symbol, - exchange=Exchange.BITFINEX, - name=symbol, - datetime=datetime.now(), - gateway_name=self.gateway_name, - ) - - self.ticks[symbol] = tick - - tick.volume = float(ticker_rest_data['volume']) - tick.high_price = float(ticker_rest_data['high']) - tick.low_price = float(ticker_rest_data['low']) - tick.open_price = float(ticker_rest_data['open']) - - self.gateway.on_tick(copy(tick)) - def on_query_contract(self, data, request): """""" for d in data: + pricetick = 1 / pow(10, d["base_decimals"]) + min_volume = float(d["minimum_order"]) + contract = ContractData( symbol=d["url_symbol"], exchange=Exchange.BITSTAMP, - name=d["name"].upper(), + name=d["name"], product=Product.SPOT, size=1, - pricetick=1 / pow(10, d["base_decimals"]), - min_volume=float(d["minimum_order"].split(" ")[0]), + pricetick=pricetick, + min_volume=min_volume, history_data=True, gateway_name=self.gateway_name, ) self.gateway.on_contract(contract) - bitstamp_symbols.add(contract.symbol) - symbol_name_map[contract.symbol] = contract.name + self.gateway.write_log("合约信息查询成功") - self.gateway.write_log("交易对查询成功") + def cancel_order(self, req: CancelRequest): + """""" + path = "/cancel_order/" + + sys_orderid = self.order_manager.get_sys_orderid(req.orderid) + + data = {"id": sys_orderid} + + self.add_request( + method="POST", + path=path, + data=data, + callback=self.on_cancel_order, + extra=req + ) + + def on_cancel_order(self, data, request): + """""" + cancel_request = request.extra + local_orderid = cancel_request.orderid + order = self.order_manager.get_order_with_local_orderid(local_orderid) + + if "error" in data: + local_order.status = Status.REJECTED + else: + local_order.status = Status.CANCELLED + + self.gateway.write_log(f"委托撤单成功:{order.orderid}) + + self.order_manager.on_order(order) + + def on_cancel_order_error(self, data, request): + print(f"cancel_order {data}") + error_msg = data["error"] + self.gateway.write_log(f"撤单请求出错,信息:{error_msg}") + + def send_order(self, req: OrderRequest): + """""" + + local_orderid = self.order_manager.new_local_orderid() + order = req.create_order_data( + local_orderid, + self.gateway_name + ) + order.time = datetime.now().strftime("%H:%M:%S") + + data = { + "amount": req.volume, + "price": req.price + } + + if req.direction == Direction.LONG: + if req.type == OrderType.LIMIT: + path = f"/buy/{req.symbol}/" + elif req.type == OrderType.MARKET: + path = f"/buy/market/{req.symbol}/" + else: + if req.type == OrderType.LIMIT: + path = f"/sell/{req.symbol}/" + elif req.type == OrderType.MARKET: + path = f"/sell/market/{req.symbol}/" + + self.add_request( + method="POST", + path=path, + data=data, + callback=self.on_send_order, + extra=order, + ) + self.order_manager.on_order(order) + return order.vt_orderid + + def on_send_order(self, data, request): + """""" + order = request.extra + + if ["reason"] in data: + order.status = Status.REJECTED + self.order_manager.on_order(order) + return + + sys_orderid = data["id"] + self.order_manager.update_orderid_map(order.orderid, sys_orderid) + + def on_send_order_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback when sending order caused exception. + """ + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) def on_failed(self, status_code: int, request: Request): """ @@ -1078,15 +437,15 @@ class BitstampRestApi(RestClient): self.user_transactions() self.gateway.write_log("重新获取 Transactions 数据") - elif path == "open_orders/all/": - self.open_orders() + elif path == "query_order/all/": + self.query_order() self.gateway.write_log("重新获取委托数据") elif path == "cancel_order/": self.cancel_order(request.extra) self.gateway.write_log(f"重新提交{request.extra.orderid}撤单请求") elif path == "balance/": - self.query_account_balance() + self.query_account() self.gateway.write_log(f"重新获取balance撤单请求") elif ("sell" in path) or ("buy" in path): @@ -1122,30 +481,17 @@ class BitstampRestApi(RestClient): self.exception_detail(exception_type, exception_value, tb, request) ) - def buy_market_order(self, amount, base="btc", quote="usd"): - """""" - pass - def buy_limit_order(self, amount, base="btc", quote="usd"): - """""" - pass +class BitstampWebsocketApi(WebsocketClient): + """""" - def sell_limit_order(self, amount, price, base="btc", quote="usd", limit_price=None, ioc_order=False): - """""" - pass - - def sell_market_order(self, amount, base="btc", quote="usd"): - """""" - pass - - -class BitstampDataWebsocketApi(WebsocketClient): def __init__(self, gateway): """""" - super(BitstampDataWebsocketApi, self).__init__() + super().__init__() self.gateway = gateway self.gateway_name = gateway.gateway_name + self.order_manager = gateway.order_manager self.ticks = {} diff --git a/vnpy/trader/constant.py b/vnpy/trader/constant.py index 7bce5e10..53fc8f9f 100644 --- a/vnpy/trader/constant.py +++ b/vnpy/trader/constant.py @@ -116,6 +116,8 @@ class Exchange(Enum): BITFINEX = "BITFINEX" BINANCE = "BINANCE" COINBASE = "COINBASE" + BITSTAMP = "BITSTAMP" + class Currency(Enum): """ From 04ad1cb9815cfa01cde18f380164bcd87e0e0b02 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 24 Sep 2019 23:00:50 +0800 Subject: [PATCH 06/10] [Mod] Reimplement BitstampGateway --- vnpy/gateway/bitstamp/bitstamp_gateway.py | 340 ++++++++++------------ 1 file changed, 157 insertions(+), 183 deletions(-) diff --git a/vnpy/gateway/bitstamp/bitstamp_gateway.py b/vnpy/gateway/bitstamp/bitstamp_gateway.py index 6fcd94ca..8e71adb8 100644 --- a/vnpy/gateway/bitstamp/bitstamp_gateway.py +++ b/vnpy/gateway/bitstamp/bitstamp_gateway.py @@ -8,11 +8,14 @@ import hmac import sys import time import re +import uuid from copy import copy from datetime import datetime, timedelta +from urllib.parse import urlencode +from typing import Dict, Set + from vnpy.api.rest import Request, RestClient from vnpy.api.websocket import WebsocketClient -from time import sleep from vnpy.trader.constant import ( Direction, @@ -77,21 +80,9 @@ TIMEDELTA_MAP = { Interval.DAILY: timedelta(days=1), } -bitstamp_symbols = set() symbol_name_map = {} -SYMBOL_BITSTAMP2VT = { - 'bchusd': "BCH/USD", 'bcheur': "BCH/EUR", - 'xrpusd': "XRP/USD", 'ltcusd': "LTC/USD", - 'eurusd': "EUR/USD", 'etheur': "ETH/EUR", - 'xrpeur': "XRP/EUR", 'btceur': "BTC/EUR", - 'ltcbtc': "LTC/BTC", 'btcusd': "BTC/USD", - 'ltceur': "LTC/EUR", 'ethusd': "ETH/USD", - 'xrpbtc': "XRP/BTC", 'bchbtc': "BCH/BTC", - 'ethbtc': "ETH/BTC", -} - class BitstampGateway(BaseGateway): """ @@ -176,7 +167,7 @@ class BitstampRestApi(RestClient): self.key = "" self.secret = "" - self.username = "" + self.username = "qxfe9863" self.order_count = 1_000_000 self.connect_time = 0 @@ -214,23 +205,47 @@ class BitstampRestApi(RestClient): """ Sign Bitstamp request. """ - # Sign - nonce = int(round(time.time() * 1000000)) - message = f"{nonce}{self.username}{self.key}" + if request.method == "GET": + return request + + timestamp = str(int(round(time.time() * 1000))) + nonce = str(uuid.uuid4()) + content_type = "application/x-www-form-urlencoded" + + # Empty post data leads to API0020 error, + # so use this offset dict instead. + if not request.data: + request.data = {"offset": "1"} + payload_str = urlencode(request.data) + + message = "BITSTAMP " + self.key + \ + request.method + \ + "www.bitstamp.net/api/v2" + \ + request.path + \ + "" + \ + content_type + \ + nonce + \ + timestamp + \ + "v2" + \ + payload_str + message = message.encode("utf-8") signature = hmac.new( self.secret, - msg=message.encode('utf-8'), + msg=message, digestmod=hashlib.sha256 ).hexdigest().upper() - if request.method == "POST": - if request.data is None: - request.data = {} - - request.data["key"] = self.key - request.data["nonce"] = nonce - request.data["signature"] = signature + request.headers = { + "X-Auth": "BITSTAMP " + self.key, + "X-Auth-Signature": signature, + "X-Auth-Nonce": nonce, + "X-Auth-Timestamp": timestamp, + "X-Auth-Version": "v2", + "Content-Type": content_type + } + print(payload_str) + request.data = payload_str return request @@ -280,20 +295,18 @@ class BitstampRestApi(RestClient): def on_query_account(self, data, request): """""" - for d in data: - if "balance" in d: - currency = d.replace("_balance", "") - account = AccountData( - accountid=currency, - balance=float(data[currency + "_balance"]), - frozen=float(data[currency + "_reserved"]), - # available=float(data[currency + "_available"]), - gateway_name=self.gateway_name - ) + for key in data.keys(): + if "balance" not in key: + continue + currency = key.replace("_balance", "") - self.gateway.on_account(account) - - self.on_position(data) + account = AccountData( + accountid=currency, + balance=float(data[currency + "_balance"]), + frozen=float(data[currency + "_reserved"]), + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) def query_contract(self): """""" @@ -306,8 +319,8 @@ class BitstampRestApi(RestClient): def on_query_contract(self, data, request): """""" for d in data: - pricetick = 1 / pow(10, d["base_decimals"]) - min_volume = float(d["minimum_order"]) + pricetick = 1 / pow(10, d["counter_decimals"]) + min_volume = 1 / pow(10, d["base_decimals"]) contract = ContractData( symbol=d["url_symbol"], @@ -322,6 +335,8 @@ class BitstampRestApi(RestClient): ) self.gateway.on_contract(contract) + symbol_name_map[contract.symbol] = contract.name + self.gateway.write_log("合约信息查询成功") def cancel_order(self, req: CancelRequest): @@ -347,22 +362,21 @@ class BitstampRestApi(RestClient): order = self.order_manager.get_order_with_local_orderid(local_orderid) if "error" in data: - local_order.status = Status.REJECTED + order.status = Status.REJECTED else: - local_order.status = Status.CANCELLED + order.status = Status.CANCELLED - self.gateway.write_log(f"委托撤单成功:{order.orderid}) + self.gateway.write_log(f"委托撤单成功:{order.orderid}") self.order_manager.on_order(order) def on_cancel_order_error(self, data, request): - print(f"cancel_order {data}") + """""" error_msg = data["error"] self.gateway.write_log(f"撤单请求出错,信息:{error_msg}") def send_order(self, req: OrderRequest): """""" - local_orderid = self.order_manager.new_local_orderid() order = req.create_order_data( local_orderid, @@ -398,6 +412,7 @@ class BitstampRestApi(RestClient): def on_send_order(self, data, request): """""" + print("on_send", data) order = request.extra if ["reason"] in data: @@ -422,52 +437,13 @@ class BitstampRestApi(RestClient): """ Callback to handle request failed. """ - print(f"on_failed {request}") + data = request.response.json() + reason = data["reason"] + code = data["code"] - reason = request.response.json()["reason"] - code = request.response.json()["code"] msg = f"{request.path} 请求失败,状态码:{status_code},信息: {reason} code: {code}" self.gateway.write_log(msg) - # print(f"reason: {reason} code: {code}") - path = request.path - if code in ["API0004"]: - # nonce 错误重新执行此请求 - if path == "user_transactions/": - self.user_transactions() - self.gateway.write_log("重新获取 Transactions 数据") - - elif path == "query_order/all/": - self.query_order() - self.gateway.write_log("重新获取委托数据") - - elif path == "cancel_order/": - self.cancel_order(request.extra) - self.gateway.write_log(f"重新提交{request.extra.orderid}撤单请求") - elif path == "balance/": - self.query_account() - self.gateway.write_log(f"重新获取balance撤单请求") - - elif ("sell" in path) or ("buy" in path): - order_data = request.extra - - # update order status - order_data.status = Status.REJECTED - self.order_manager.on_order(order_data) - - req = OrderRequest( - symbol=order_data.symbol, - exchange=order_data.exchange, - direction=order_data.direction, - type=order_data.type, - volume=order_data.volume, - price=order_data.price, - offset=order_data.offset - ) - - self.send_order(req) - self.gateway.write_log("重新提交委托请求") - def on_error( self, exception_type: type, exception_value: Exception, tb, request: Request ): @@ -493,128 +469,126 @@ class BitstampWebsocketApi(WebsocketClient): self.gateway_name = gateway.gateway_name self.order_manager = gateway.order_manager - self.ticks = {} + self.subscribed: Dict[str, SubscribeRequest] = {} + self.ticks: Dict[str, TickData] = {} def connect(self, proxy_host: str, proxy_port: int): """""" self.init(WEBSOCKET_HOST, proxy_host, proxy_port) self.start() - def post_connected(self, req: SubscribeRequest): - """ - 测试默认连接btcusd - :return: - """ - sleep(1) - d = { - "event": "bts:subscribe", - "data": { - # "channel": "diff_order_book_" + req.symbol # "live_trades_btcusd" - "channel": "order_book_" + req.symbol # "live_trades_btcusd" - } - } - self.send_packet(d) - sleep(1) - - d = { - "event": "bts:subscribe", - "data": { - "channel": "live_trades_" + req.symbol # "live_trades_btcusd" - } - } - self.send_packet(d) - def on_connected(self): """""" - self.gateway.write_log("行情Websocket API连接刷新") - # 测试默认提交 - # self.post_connected() - # self.subscribe(req) + self.gateway.write_log("Websocket API连接成功") + + # Auto re-subscribe market data after reconnected + for req in self.subscribed.values(): + self.subscribe(req) def subscribe(self, req: SubscribeRequest): """""" - # print(f"webocket subscribe req {req}") - # Create tick buf data + self.subscribed[req.symbol] = req + if not self._active: + return tick = TickData( symbol=req.symbol, - # name=symbol_name_map.get(req.symbol, ""), - name=req.symbol, + name=symbol_name_map.get(req.symbol, ""), exchange=Exchange.BITSTAMP, datetime=datetime.now(), gateway_name=self.gateway_name, ) - self.ticks[req.symbol.lower()] = tick - # 默认使用btcusd 连接 - self.post_connected(req) + for prefix in [ + "order_book_", + "live_trades_", + "live_orders_" + ]: + channel = f"{prefix}{req.symbol}" + d = { + "event": "bts:subscribe", + "data": { + "channel": channel + } + } + self.ticks[channel] = tick + self.send_packet(d) def on_packet(self, packet): """""" - # print(f"on_packet {packet}") - if "bts:request_reconnect" == packet["event"]: - # 重新连接 - self.post_connected() - elif "data" == packet["event"]: - return self.on_market_depth(packet) - else: - self.on_data(packet) + event = packet["event"] - def on_data(self, packet): + if event == "trade": + self.on_market_trade(packet) + elif event == "data": + self.on_market_depth(packet) + elif "order_" in event: + self.on_market_order(packet) + elif event == "bts:request_reconnect": + self._disconnect() # Server requires to reconnect + + def on_market_trade(self, packet): """""" - if packet["event"] == "trade": - self.on_trade_update(packet) - # print("data : {}".format(packet)) - - def on_trade_update(self, packet): channel = packet["channel"] data = packet["data"] - symbol = str(re.sub("live_.*_", "", channel)) - tick = self.ticks[symbol] - tick.last_price = float(data["price"]) - def on_market_depth(self, packet): - """行情深度推送 """ - - channel = packet["channel"] - data = packet["data"] - # symbol = str(re.sub("live_.*_","", channel)) #live order channel - symbol = str(re.sub("order_book_", "", channel)) - # print(f"market_detph {data}") - tick = self.ticks[symbol] - tick.datetime = datetime.fromtimestamp(int(data['timestamp']) / 1000) - - if symbol in self.ticks: - tick = self.ticks[symbol] - else: - tick = TickData( - symbol=symbol, - exchange=Exchange.BITFINEX, - name=symbol, - datetime=datetime.now(), - gateway_name=self.gateway_name, - ) - - self.ticks[symbol] = tick - - if len(data) == 0: - print("请求数据为空") - return - - bids = data["bids"] - - for n in range(5): - # for n in range(len(bids)): - price, volume = bids[n] - tick.__setattr__("bid_price_" + str(n + 1), float(price)) - tick.__setattr__("bid_volume_" + str(n + 1), float(volume)) - - asks = data["asks"] - # print(f"bids count {len(bids)} , asks count {len(asks)}") - for n in range(5): - # for n in range(len(asks)): - price, volume = asks[n] - tick.__setattr__("ask_price_" + str(n + 1), float(price)) - tick.__setattr__("ask_volume_" + str(n + 1), float(volume)) + tick = self.ticks[channel] + tick.last_price = data["price"] + tick.last_volume = data["amount"] + tick.datetime = datetime.fromtimestamp(int(data["timestamp"])) self.gateway.on_tick(copy(tick)) + + buy_orderid = data["buy_order_id"] + sell_orderid = data["sell_order_id"] + + for sys_orderid in [buy_orderid, sell_orderid]: + order = self.order_manager.get_order_with_sys_orderid(sys_orderid) + + if order: + order.traded += data["amount"] + + if order.traded < order.volume: + order.status = Status.PARTTRADED + else: + order.status = Status.ALLTRADED + + self.order_manager.on_order(copy(order)) + + def on_market_depth(self, packet): + """""" + channel = packet["channel"] + data = packet["data"] + + tick = self.ticks[channel] + tick.datetime = datetime.fromtimestamp(int(data["timestamp"])) + + bids = data["bids"] + asks = data["asks"] + + for n in range(5): + ix = n + 1 + + bid_price, bid_volume = bids[n] + tick.__setattr__(f"bid_price_{ix}", float(bid_price)) + tick.__setattr__(f"bid_volume_{ix}", float(bid_volume)) + + ask_price, ask_volume = asks[n] + tick.__setattr__(f"ask_price_{ix}", float(ask_price)) + tick.__setattr__(f"ask_volume_{ix}", float(ask_volume)) + + self.gateway.on_tick(copy(tick)) + + def on_market_order(self, packet): + """""" + event = packet["event"] + data = packet["data"] + + if event != "order_deleted": + return + + sys_orderid = data["id"] + order = self.order_manager.get_order_with_sys_orderid(sys_orderid) + if order: + order.status = Status.CANCELLED + self.order_manager.on_order(copy(order)) From d741221c418804bb75fdc808ee57b2231fc08a7d Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Wed, 25 Sep 2019 15:34:13 +0800 Subject: [PATCH 07/10] [Mod] complete send/cancel order function --- vnpy/gateway/bitstamp/bitstamp_gateway.py | 78 +++++++++++++++++++---- 1 file changed, 66 insertions(+), 12 deletions(-) diff --git a/vnpy/gateway/bitstamp/bitstamp_gateway.py b/vnpy/gateway/bitstamp/bitstamp_gateway.py index 8e71adb8..d4d9051a 100644 --- a/vnpy/gateway/bitstamp/bitstamp_gateway.py +++ b/vnpy/gateway/bitstamp/bitstamp_gateway.py @@ -7,14 +7,15 @@ import hashlib import hmac import sys import time -import re import uuid from copy import copy from datetime import datetime, timedelta from urllib.parse import urlencode -from typing import Dict, Set +from typing import Dict -from vnpy.api.rest import Request, RestClient +import requests + +from vnpy.api.rest import Request, RestClient, RequestStatus from vnpy.api.websocket import WebsocketClient from vnpy.trader.constant import ( @@ -31,7 +32,6 @@ from vnpy.trader.object import ( OrderData, TradeData, BarData, - PositionData, AccountData, ContractData, OrderRequest, @@ -82,6 +82,7 @@ TIMEDELTA_MAP = { symbol_name_map = {} +name_symbol_map = {} class BitstampGateway(BaseGateway): @@ -199,7 +200,6 @@ class BitstampRestApi(RestClient): self.query_contract() self.query_account() - self.query_order() def sign(self, request: Request): """ @@ -244,11 +244,54 @@ class BitstampRestApi(RestClient): "X-Auth-Version": "v2", "Content-Type": content_type } - print(payload_str) request.data = payload_str return request + def _process_request( + self, request: Request + ): + """ + Sending request to server and get result. + """ + try: + request = self.sign(request) + + url = self.make_full_url(request.path) + + response = requests.request( + request.method, + url, + headers=request.headers, + params=request.params, + data=request.data, + proxies=self.proxies, + ) + request.response = response + status_code = response.status_code + if status_code // 100 == 2: # 2xx codes are all successful + if status_code == 204: + json_body = None + else: + json_body = response.json() + + request.callback(json_body, request) + request.status = RequestStatus.success + else: + request.status = RequestStatus.failed + + if request.on_failed: + request.on_failed(status_code, request) + else: + self.on_failed(status_code, request) + except Exception: + request.status = RequestStatus.error + t, v, tb = sys.exc_info() + if request.on_error: + request.on_error(t, v, tb, request) + else: + self.on_error(t, v, tb, request) + def query_order(self): """""" path = "/open_orders/all/" @@ -266,19 +309,21 @@ class BitstampRestApi(RestClient): local_orderid = self.order_manager.get_local_orderid(sys_orderid) direction = DIRECTION_BITSTAMP2VT[d["type"]] + name = d["currency_pair"] + symbol = name_symbol_map[name] order = OrderData( orderid=local_orderid, - symbol=d["currency_pair"], + symbol=symbol, exchange=Exchange.BITSTAMP, price=float(d["price"]), volume=float(d["amount"]), traded=float(0), direction=direction, + status=Status.NOTTRADED, time=d["datetime"], gateway_name=self.gateway_name, ) - self.order_manager.on_order(order) self.gateway.write_log("委托信息查询成功") @@ -336,9 +381,12 @@ class BitstampRestApi(RestClient): self.gateway.on_contract(contract) symbol_name_map[contract.symbol] = contract.name + name_symbol_map[contract.name] = contract.symbol self.gateway.write_log("合约信息查询成功") + self.query_order() + def cancel_order(self, req: CancelRequest): """""" path = "/cancel_order/" @@ -366,7 +414,7 @@ class BitstampRestApi(RestClient): else: order.status = Status.CANCELLED - self.gateway.write_log(f"委托撤单成功:{order.orderid}") + self.gateway.write_log(f"撤单成功:{order.orderid}") self.order_manager.on_order(order) @@ -412,17 +460,23 @@ class BitstampRestApi(RestClient): def on_send_order(self, data, request): """""" - print("on_send", data) order = request.extra - if ["reason"] in data: + status = data.get("status", None) + if status and status == "error": order.status = Status.REJECTED self.order_manager.on_order(order) + + msg = data["reason"]["__all__"][0] + self.gateway.write_log(msg) return sys_orderid = data["id"] self.order_manager.update_orderid_map(order.orderid, sys_orderid) + order.status = Status.NOTTRADED + self.order_manager.on_order(order) + def on_send_order_error( self, exception_type: type, exception_value: Exception, tb, request: Request ): @@ -441,7 +495,7 @@ class BitstampRestApi(RestClient): reason = data["reason"] code = data["code"] - msg = f"{request.path} 请求失败,状态码:{status_code},信息: {reason} code: {code}" + msg = f"{request.path} 请求失败,状态码:{status_code},错误信息:{reason},错误代码: {code}" self.gateway.write_log(msg) def on_error( From d868bdf0da7c5cf4b77e2b31ae977c5a63f1811a Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Wed, 25 Sep 2019 16:00:24 +0800 Subject: [PATCH 08/10] [Mod] use bitstamp gateway --- examples/vn_trader/run.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/vn_trader/run.py b/examples/vn_trader/run.py index 74e3ca1f..298cfbb1 100644 --- a/examples/vn_trader/run.py +++ b/examples/vn_trader/run.py @@ -29,6 +29,7 @@ from vnpy.gateway.okexs import OkexsGateway # from vnpy.gateway.alpaca import AlpacaGateway from vnpy.gateway.da import DaGateway from vnpy.gateway.coinbase import CoinbaseGateway +from vnpy.gateway.bitstamp import BitstampGateway from vnpy.app.cta_strategy import CtaStrategyApp # from vnpy.app.csv_loader import CsvLoaderApp @@ -72,7 +73,8 @@ def main(): # main_engine.add_gateway(AlpacaGateway) # main_engine.add_gateway(OkexsGateway) # main_engine.add_gateway(DaGateway) - main_engine.add_gateway(CoinbaseGateway) + # main_engine.add_gateway(CoinbaseGateway) + main_engine.add_gateway(BitstampGateway) main_engine.add_app(CtaStrategyApp) main_engine.add_app(CtaBacktesterApp) From e0c15d4eee57a49613dd64e863382be68f28186d Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Wed, 25 Sep 2019 22:53:13 +0800 Subject: [PATCH 09/10] [Mod] complete test of order/trade push update --- vnpy/gateway/bitstamp/bitstamp_gateway.py | 53 ++++++++++++++++++----- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/vnpy/gateway/bitstamp/bitstamp_gateway.py b/vnpy/gateway/bitstamp/bitstamp_gateway.py index d4d9051a..9296b821 100644 --- a/vnpy/gateway/bitstamp/bitstamp_gateway.py +++ b/vnpy/gateway/bitstamp/bitstamp_gateway.py @@ -216,6 +216,7 @@ class BitstampRestApi(RestClient): # so use this offset dict instead. if not request.data: request.data = {"offset": "1"} + payload_str = urlencode(request.data) message = "BITSTAMP " + self.key + \ @@ -252,7 +253,9 @@ class BitstampRestApi(RestClient): self, request: Request ): """ - Sending request to server and get result. + Bistamp API server does not support keep-alive connection. + So when using session.request will cause header related error. + Reimplement this method to use requests.request instead. """ try: request = self.sign(request) @@ -387,6 +390,14 @@ class BitstampRestApi(RestClient): self.query_order() + def query_history(self): + """""" + self.add_request( + method="GET", + path="/trading-pairs-info/", + callback=self.on_query_contract, + ) + def cancel_order(self, req: CancelRequest): """""" path = "/cancel_order/" @@ -405,18 +416,20 @@ class BitstampRestApi(RestClient): def on_cancel_order(self, data, request): """""" + error = data.get("error", "") + if error: + self.gateway.write_log(error) + return + cancel_request = request.extra local_orderid = cancel_request.orderid order = self.order_manager.get_order_with_local_orderid(local_orderid) - if "error" in data: - order.status = Status.REJECTED - else: + if order.is_active: order.status = Status.CANCELLED + self.order_manager.on_order(order) - self.gateway.write_log(f"撤单成功:{order.orderid}") - - self.order_manager.on_order(order) + self.gateway.write_log(f"撤单成功:{order.orderid}") def on_cancel_order_error(self, data, request): """""" @@ -593,11 +606,13 @@ class BitstampWebsocketApi(WebsocketClient): self.gateway.on_tick(copy(tick)) - buy_orderid = data["buy_order_id"] - sell_orderid = data["sell_order_id"] + # Order status check + buy_orderid = str(data["buy_order_id"]) + sell_orderid = str(data["sell_order_id"]) for sys_orderid in [buy_orderid, sell_orderid]: - order = self.order_manager.get_order_with_sys_orderid(sys_orderid) + order = self.order_manager.get_order_with_sys_orderid( + sys_orderid) if order: order.traded += data["amount"] @@ -609,6 +624,19 @@ class BitstampWebsocketApi(WebsocketClient): self.order_manager.on_order(copy(order)) + trade = TradeData( + symbol=order.symbol, + exchange=order.exchange, + orderid=order.orderid, + tradeid=data["id"], + direction=order.direction, + price=data["price"], + volume=data["amount"], + time=tick.datetime.strftime("%H:%M:%S"), + gateway_name=self.gateway_name + ) + self.gateway.on_trade(trade) + def on_market_depth(self, packet): """""" channel = packet["channel"] @@ -641,8 +669,9 @@ class BitstampWebsocketApi(WebsocketClient): if event != "order_deleted": return - sys_orderid = data["id"] + sys_orderid = str(data["id"]) order = self.order_manager.get_order_with_sys_orderid(sys_orderid) - if order: + + if order and order.is_active(): order.status = Status.CANCELLED self.order_manager.on_order(copy(order)) From 9139d3b01e15f9fe8831ba670f74e84fcdfdceb1 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Wed, 25 Sep 2019 23:08:33 +0800 Subject: [PATCH 10/10] [Mod] continuous query account data --- vnpy/gateway/bitstamp/bitstamp_gateway.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/vnpy/gateway/bitstamp/bitstamp_gateway.py b/vnpy/gateway/bitstamp/bitstamp_gateway.py index 9296b821..d356ae52 100644 --- a/vnpy/gateway/bitstamp/bitstamp_gateway.py +++ b/vnpy/gateway/bitstamp/bitstamp_gateway.py @@ -123,6 +123,8 @@ class BitstampGateway(BaseGateway): session, proxy_host, proxy_port) self.ws_api.connect(proxy_host, proxy_port) + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + def subscribe(self, req: SubscribeRequest): """""" self.ws_api.subscribe(req) @@ -145,13 +147,17 @@ class BitstampGateway(BaseGateway): def query_history(self, req: HistoryRequest): """""" - return self.rest_api.query_history(req) + pass def close(self): """""" self.rest_api.stop() self.ws_api.stop() + def process_timer_event(self, event): + """""" + self.rest_api.query_account() + class BitstampRestApi(RestClient): """ @@ -378,7 +384,7 @@ class BitstampRestApi(RestClient): size=1, pricetick=pricetick, min_volume=min_volume, - history_data=True, + history_data=False, gateway_name=self.gateway_name, ) self.gateway.on_contract(contract) @@ -390,14 +396,6 @@ class BitstampRestApi(RestClient): self.query_order() - def query_history(self): - """""" - self.add_request( - method="GET", - path="/trading-pairs-info/", - callback=self.on_query_contract, - ) - def cancel_order(self, req: CancelRequest): """""" path = "/cancel_order/"