From be039c314d328fbc1d93ffa79322e3ba905f141a Mon Sep 17 00:00:00 2001 From: LimingFang Date: Tue, 3 Sep 2019 10:55:27 +0800 Subject: [PATCH 1/8] coinbase_gateway --- vnpy/gateway/coinbase/__init__.py | 1 + vnpy/gateway/coinbase/coinbase_gateway.py | 885 ++++++++++++++++++++++ 2 files changed, 886 insertions(+) create mode 100644 vnpy/gateway/coinbase/__init__.py create mode 100644 vnpy/gateway/coinbase/coinbase_gateway.py diff --git a/vnpy/gateway/coinbase/__init__.py b/vnpy/gateway/coinbase/__init__.py new file mode 100644 index 00000000..5b34d7bb --- /dev/null +++ b/vnpy/gateway/coinbase/__init__.py @@ -0,0 +1 @@ +from .coinbase_gateway import CoinbaseGateway diff --git a/vnpy/gateway/coinbase/coinbase_gateway.py b/vnpy/gateway/coinbase/coinbase_gateway.py new file mode 100644 index 00000000..9c3a6f29 --- /dev/null +++ b/vnpy/gateway/coinbase/coinbase_gateway.py @@ -0,0 +1,885 @@ +import hashlib +import hmac +import json +import sys +import time +from copy import copy +from datetime import datetime, timedelta +from threading import Lock +import base64 +import uuid + +from typing import List, Sequence + +from requests import ConnectionError + +from vnpy.event import Event +from vnpy.api.rest import Request, RestClient +from vnpy.api.websocket import WebsocketClient +from vnpy.trader.event import EVENT_TIMER +from vnpy.trader.constant import ( + Direction, + Exchange, + OrderType, + Product, + Status, + Offset, + Interval +) +from vnpy.trader.gateway import BaseGateway +from vnpy.trader.object import ( + TickData, + OrderData, + TradeData, + PositionData, + AccountData, + ContractData, + BarData, + OrderRequest, + CancelRequest, + SubscribeRequest, + HistoryRequest +) + +REST_HOST = "https://api.pro.coinbase.com" +WEBSOCKET_HOST = "wss://ws-feed.pro.coinbase.com" +SANDBOX_REST_HOST = "https://api-public.sandbox.pro.coinbase.com" +SANDBOX_WEBSOCKET_HOST = "wss://ws-feed-public.sandbox.pro.coinbase.com" + +DIRECTION_VT2COINBASE = {Direction.LONG: "buy", Direction.SHORT: "sell"} +DIRECTION_COINBASE2VT = {v: k for k, v in DIRECTION_VT2COINBASE.items()} + +STOP_VT2COINBASE = {Direction.LONG: "entry", Direction.SHORT: "loss"} + +ORDERTYPE_VT2COINBASE = { + OrderType.LIMIT: "limit", + OrderType.MARKET: "market", +} +ORDERTYPE_COINBASE2VT = {v: k for k, v in ORDERTYPE_VT2COINBASE.items()} + +INTERVAL_VT2COINBASE = { + Interval.MINUTE: "1m", + Interval.HOUR: "1h", + Interval.DAILY: "1d", +} + +TIMEDELTA_MAP = { + Interval.MINUTE: timedelta(minutes=1), + Interval.HOUR: timedelta(hours=1), + Interval.DAILY: timedelta(days=1), +} + +cancelDict = {} # orderid:cancelreq +orderDict = {} # sysid:order +orderSysDict = {} # orderid:sysid + + +class CoinbaseGateway(BaseGateway): + """ + VN Trader Gateway for coinbase connection + """ + + default_setting = { + "ID": "", + "Secret": "", + "passphrase": "", + "会话数": 3, + "server": ["REAL", "SANDBOX"], + "proxy_host": "", + "proxy_port": "", + } + exchanges = [Exchange.COINBASE] + + def __init__(self, event_engine): + """Constructor""" + super(CoinbaseGateway, self).__init__(event_engine, "COINBASE") + self.rest_api = CoinbaseRestApi(self) + self.ws_api = CoinbaseWebsocketApi(self) + + self.rest_api_inited = False + + self.product_id = [] + self.received_instrument = False + + event_engine.register(EVENT_TIMER, self.process_timer_event) + + def connect(self, setting: dict): + """""" + key = setting["ID"] + secret = setting["Secret"] + session_number = setting["会话数"] + proxy_host = setting["proxy_host"] + proxy_port = setting["proxy_port"] + server = setting['server'] + passphrase = setting['passphrase'] + + if proxy_port.isdigit(): + proxy_port = int(proxy_port) + else: + proxy_port = 0 + + self.rest_api.connect(key, secret, passphrase, session_number, server, + proxy_host, proxy_port) + while not self.received_instrument: + time.sleep(0.5) + + self.write_log("合约查询成功") + self.ws_api.connect( + key, + secret, + passphrase, + server, + proxy_host, + proxy_port) + + def subscribe(self, req: SubscribeRequest): + """""" + self.ws_api.subscribe(req) + + def send_order(self, req: OrderRequest): + """""" + return self.rest_api.send_order(req) + + def cancel_order(self, req: CancelRequest): + """""" + self.rest_api.cancel_order(req) + + def query_account(self): + """""" + return self.rest_api.qry_account() + + def query_position(self): + """ + Query holding positions. + """ + pass + + def query_history(self, req: HistoryRequest): + """ + Query bar history data. + """ + pass + + def close(self): + """""" + self.rest_api.stop() + self.ws_api.stop() + + def process_timer_event(self, event: Event): + """""" + self.rest_api.reset_rate_limit() + if self.rest_api_inited: + self.init_query() + + def init_query(self): + """""" + self.rest_api.qry_account() + + +class CoinbaseWebsocketApi(WebsocketClient): + """ + Coinbase WEBSOCKET API + """ + + def __init__(self, gateway: BaseGateway): + """""" + super(CoinbaseWebsocketApi, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.key = "" + self.secret = "" + self.passphrase = "" + + self.callbacks = { + "ticker": self.on_orderbook, + "l2update": self.on_orderbook, + "snapshot": self.on_orderbook, + "received": self.on_order_received, + "open": self.on_order_open, + "done": self.on_order_done, + "match": self.on_order_match, + } + self.ticks = {} + + self.accounts = {} + self.orderbooks = {} + + def connect( + self, + key: str, + secret: str, + passphrase: str, + server: str, + proxy_host: str, + proxy_port: int): + """""" + self.gateway.write_log("开始连接ws接口") + self.key = key + self.secret = secret.encode() + self.passphrase = passphrase + + if server == "REAL": + self.init(WEBSOCKET_HOST, proxy_host, proxy_port) + else: + self.init(SANDBOX_WEBSOCKET_HOST, proxy_host, proxy_port) + self.start() + + def subscribe(self, req: SubscribeRequest): + """""" + symbol = req.symbol + exchange = req.exchange + + orderbook = OrderBook(symbol, exchange, self.gateway) + self.orderbooks[symbol] = orderbook + + sub_req = { + "type": "subscribe", + "product_ids": [symbol], + "channels": ["user", "level2", "ticker"], + } + + timestamp = str(time.time()) + message = timestamp + 'GET' + '/users/self/verify' + auth_headers = get_auth_header( + timestamp, + message, + self.key, + self.secret, + self.passphrase) + sub_req['signature'] = auth_headers['CB-ACCESS-SIGN'] + sub_req['key'] = auth_headers['CB-ACCESS-KEY'] + sub_req['passphrase'] = auth_headers['CB-ACCESS-PASSPHRASE'] + sub_req['timestamp'] = auth_headers['CB-ACCESS-TIMESTAMP'] + + self.send_packet(sub_req) + + def on_connected(self): + """ + callback when connection is established + """ + self.gateway.write_log("Websocket API连接成功") + + def on_disconnected(self): + """""" + self.gateway.write_log("Websocket API连接断开") + + def on_packet(self, packet: dict): + """ + callback when data is received and unpacked + """ + + if packet['type'] == 'error': + self.gateway.write_log("Websocket API报错: %s" % packet['message']) + self.gateway.write_log("Websocket API报错原因是: %s" % packet['reason']) + self.active = False + + else: + callback = self.callbacks.get(packet['type'], None) + if callback: + if packet['type'] not in ['ticker', 'snapshot', 'l2update']: + callback(packet) + else: + product_id = packet['product_id'] + callback(packet, product_id) + + def on_orderbook(self, packet: dict, product_id: str): + """ + Call back when data is used to update orderbook + """ + orderbook = self.orderbooks[product_id] + orderbook.on_message(packet) + + def on_order_received(self, packet: dict): + """ + Call back when order is received by Coinbase + """ + client_oid = packet['client_oid'] + sysid = packet['order_id'] + + order = OrderData( + symbol=packet['product_id'], + exchange=Exchange.COINBASE, + type=ORDERTYPE_COINBASE2VT[packet['order_type']], + orderid=sysid, + direction=DIRECTION_COINBASE2VT[packet['side']], + price=float(packet['price']), + volume=float(packet['size']), + time=packet['time'], + gateway_name=self.gateway_name, + ) + + order.traded = 0 + order.status = Status.NOTTRADED + + orderSysDict[client_oid] = sysid + orderDict[sysid] = order + + if client_oid in cancelDict: + req = cancelDict[client_oid] + self.gateway.cancel_order(req) + + self.gateway.on_order(copy(order)) + + def on_order_open(self, packet: dict): + """ + Call back when the order is on the orderbook + """ + orderid = packet['order_id'] + order = orderDict.get(orderid) + order.traded = float(order.volume) - float(packet['remaining_size']) + if order.traded: + order.status = Status.PARTTRADED + + self.gateway.on_order(copy(order)) + + def on_order_done(self, packet: dict): + """ + Call back when the order is done + """ + order = orderDict.get(packet['order_id'], None) + if not order: + return + order.traded = order.volume - float(packet['remaining_size']) + if packet['reason'] == 'filled': + order.status = Status.ALLTRADED + else: + order.status = Status.CANCELLED + self.gateway.on_order(copy(order)) + + def on_order_match(self, packet: dict): + """""" + if packet['maker_order_id'] in orderDict: + order = orderDict[packet['maker_order_id']] + else: + order = orderDict[packet['taker_order_id']] + + trade = TradeData( + symbol=packet['product_id'], + exchange=Exchange.COINBASE, + orderid=order.orderid, + tradeid=packet['trade_id'], + direction=DIRECTION_COINBASE2VT[packet['side']], + price=packet['price'], + volume=packet['size'], + time=datetime.strptime( + packet['time'], "%Y-%m-%dT%H:%M:%S.%fZ"), + gateway_name=self.gateway_name, + ) + self.gateway.on_trade(trade) + + +class OrderBook(): + """ + Used to maintain orderbook of coinbase data + """ + + def __init__(self, symbol: str, exchange: Exchange, gateway: BaseGateway): + """ + one symbol per orderbook + """ + + self.asks = dict() + self.bids = dict() + self.gateway = gateway + self.newest_tick = TickData( + "COINBASE", symbol, exchange, datetime.now()) + + def on_message(self, d: dict): + """ + callback by websocket when server send orderbook data + """ + if d['type'] == 'l2update': + dt = datetime.strptime( + d["time"][:-4] + d['time'][-1], "%Y-%m-%dT%H:%M:%S.%fZ") + self.on_update(d['changes'][0], dt) + elif d['type'] == 'snapshot': + self.on_snapshot(d['asks'], d['bids']) + else: + self.on_ticker(d) + + def on_update(self, d: list, dt): + """ + call back when type is 12update + """ + size = d[2] + price = d[1] + side = d[0] + + if side == 'buy': + if float(price) in self.bids: + if size == 0: + del self.bids[float(price)] + else: + self.bids[float(price)] = float(size) + else: + self.bids[float(price)] = float(size) + else: + if float(price) in self.asks: + if size == 0: + del self.asks[float(price)] + else: + self.asks[float(price)] = float(size) + else: + self.asks[float(price)] = float(size) + + self.generate_tick(dt) + + def on_ticker(self, d: dict): + """ + call back when type is ticker + """ + tick = self.newest_tick + + tick.openPrice = float(d['open_24h']) + tick.highPrice = float(d['high_24h']) + tick.lowPrice = float(d['low_24h']) + tick.lastPrice = float(d['price']) + tick.volume = float(d['volume_24h']) + + dt = datetime.strptime( + d['time'], "%Y-%m-%dT%H:%M:%S.%fZ") + + self.gateway.on_tick(copy(tick)) + + def on_snapshot(self, asks: Sequence[List], bids: Sequence[List]): + """ + call back when type is snapshot + """ + for price, size in asks: + self.asks[float(price)] = float(size) + + for price, size in bids: + self.bids[float(price)] = float(size) + + def generate_tick(self, dt: datetime): + """""" + tick = self.newest_tick + + bids_keys = self.bids.keys() + bids_keys = sorted(bids_keys, reverse=True) + + tick.bid_price_1 = bids_keys[0] + tick.bid_price_2 = bids_keys[1] + tick.bid_price_3 = bids_keys[2] + tick.bid_price_4 = bids_keys[3] + tick.bid_price_5 = bids_keys[4] + + tick.bid_volume_1 = self.bids[bids_keys[0]] + tick.bid_volume_2 = self.bids[bids_keys[1]] + tick.bid_volume_3 = self.bids[bids_keys[2]] + tick.bid_volume_4 = self.bids[bids_keys[3]] + tick.bid_volume_5 = self.bids[bids_keys[4]] + + asks_keys = self.asks.keys() + asks_keys = sorted(asks_keys) + + tick.ask_price_1 = asks_keys[0] + tick.ask_price_2 = asks_keys[1] + tick.ask_price_3 = asks_keys[2] + tick.ask_price_4 = asks_keys[3] + tick.ask_price_5 = asks_keys[4] + + tick.ask_volume_1 = self.asks[asks_keys[0]] + tick.ask_volume_2 = self.asks[asks_keys[1]] + tick.ask_volume_3 = self.asks[asks_keys[2]] + tick.ask_volume_4 = self.asks[asks_keys[3]] + tick.ask_volume_5 = self.asks[asks_keys[4]] + + tick.datetime = dt + self.gateway.on_tick(copy(tick)) + + +class CoinbaseRestApi(RestClient): + """ + Coinbase REST API + """ + + def __init__(self, gateway: BaseGateway): + """""" + super(CoinbaseRestApi, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.key = "" + self.secret = "" + self.passphrase = "" + + self.order_count = 1_000_000 + self.order_count_lock = Lock() + + self.connect_time = 0 + + self.accounts = {} + + self.rate_limit = 5 + self.rate_limit_remaining = 5 + + def sign(self, request): + """ + Generate Coinbase signature + """ + timestamp = str(time.time()) + message = ''.join([timestamp, request.method, + request.path, request.data or ""]) + request.headers = (get_auth_header(timestamp, message, + self.key, + self.secret, + self.passphrase)) + return request + + def connect( + self, + key: str, + secret: str, + passphrase: str, + session_number: int, + server: str, + proxy_host: str, + proxy_port: int, + ): + """ + Initialize connection to REST server + """ + self.key = key + self.secret = secret.encode() + self.passphrase = passphrase + + self.connect_time = ( + int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count + ) + if server == "REAL": + self.init(REST_HOST, proxy_host, proxy_port) + else: + self.init(SANDBOX_REST_HOST, proxy_host, proxy_port) + + self.start(session_number) + self.gateway.rest_api_inited = True + + self.qry_instrument() + # self.qry_orders() + + self.gateway.write_log("REST API启动成功") + + def qry_instrument(self): + """ + Get the instrument of Coinbase + """ + self.add_request( + "GET", + "/products", + callback=self.on_qry_instrument, + params={}, + on_error=self.on_qry_instrument_error, + ) + + def qry_orders(self): + """""" + params = {"status": "all"} + self.add_request( + "GET", + "/orders", + callback=self.on_qry_orders, + params=params, + on_error=self.on_qry_orders_error, + ) + + def qry_account(self): + """""" + self.add_request( + "GET", + "/accounts", + callback=self.on_qry_account, + params={}, + on_error=self.on_qry_account_error, + ) + + def on_qry_account(self, data, request): + """""" + for acc in data: + account_id = str(acc['id']) + account = self.accounts.get(account_id, None) + if not account: + account = AccountData(accountid=account_id, + gateway_name=self.gateway_name) + self.accounts[account_id] = account + + account.balance = acc.get("balance", account.balance) + account.available = acc.get("available", account.available) + account.frozen = acc.get("hold", account.frozen) + + self.gateway.on_account(copy(account)) + + def on_qry_account_error( + self, + exception_type: type, + exception_value: Exception, + tb, + request): + """""" + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_qry_orders_error( + self, + exception_type: type, + exception_value: Exception, + tb, + request): + """""" + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_qry_orders(self, data, request): + """""" + for d in data: + date, time = d['created_at'].split('T') + if d['status'] == 'open': + if not d['filled_size']: + status = Status.NOTTRADED + else: + status = Status.PARTTRADED + else: + if d['size'] == d['filled_size']: + status = Status.ALLTRADED + else: + status = Status.CANCELLED + order = OrderData( + symbol=d['product_id'], + gateway_name=self.gateway_name, + exchange=Exchange.COINBASE, + orderid=d['id'], + direction=DIRECTION_COINBASE2VT[d['side']], + price=float(d['price']), + volume=float(d['size']), + traded=float(d['filled_size']), + time=time.replace('Z', ""), + status=status, + ) + self.gateway.on_order(copy(order)) + + orderDict[order.orderid] = order + orderSysDict[order.orderid] = order.orderID + + self.gateway.writeLog(u'委托信息查询成功') + + def on_qry_instrument_error( + self, + exception_type: type, + exception_value: Exception, + tb, + request: Request): + """ + Callback when sending order caused exception. + """ + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_qry_instrument(self, data, request): + """""" + for d in data: + contract = ContractData( + symbol=d['id'], + exchange=Exchange.COINBASE, + name=d['display_name'], + product=Product.SPOT, + pricetick=d['quote_increment'], + size=d['base_min_size'], + stop_supported=(not d['limit_only']), + net_position=True, + history_data=False, + gateway_name=self.gateway_name, + ) + + self.gateway.on_contract(contract) + + self.gateway.product_id.append(d['id']) + self.gateway.received_instrument = True + + def send_order(self, req: OrderRequest): + """""" + if not self.check_rate_limit(): + return + + orderid = str(uuid.uuid1()) + + data = { + "size": req.volume, + "product_id": req.symbol, + "side": DIRECTION_VT2COINBASE[req.direction], + "type": ORDERTYPE_VT2COINBASE[req.type], + "client_oid": orderid, + } + + if req.type == OrderType.LIMIT: + data['price'] = req.price + elif req.type == OrderType.STOP: + data['stop_price'] = req.price + data['stop'] = STOP_VT2COINBASE[req.Direction] + + order = req.create_order_data(orderid, self.gateway_name) + self.add_request( + "POST", + "/orders", + callback=self.on_send_order, + data=json.dumps(data), + params={}, + extra=order, + on_failed=self.on_send_order_failed, + on_error=self.on_send_order_error, + ) + + return order.vt_orderid + + def on_send_order_failed(self, status_code: str, request: Request): + """ + Callback when sending order failed on server. + """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + if request.response.text: + data = request.response.json() + error = data["message"] + msg = f"委托失败,状态码:{status_code},信息:{error}" + else: + msg = f"委托失败,状态码:{status_code}" + + self.gateway.write_log(msg) + + def on_send_order_error( + self, + exception_type: type, + exception_value: Exception, + tb, + request: Request): + """ + Callback when sending order caused exception. + """ + + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_send_order(self, data, request): + """""" + pass + + def cancel_order(self, req: CancelRequest): + """""" + if not self.check_rate_limit(): + return + + orderid = req.orderid + + if orderid not in orderSysDict: + cancelDict[orderid] = req + + self.add_request( + "DELETE", + "/orders/" + orderid, + callback=self.on_cancel_order, + params={}, + on_error=self.on_cancel_order_error, + on_failed=self.on_cancel_order_failed, + ) + + def on_cancel_order_error( + self, + exception_type: type, + exception_value: Exception, + tb, + request: Request): + """ + Callback when cancelling order failed on server. + """ + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_cancel_order(self, data, request): + """Websocket will push a new order status""" + pass + + def on_cancel_order_failed(self, status_code: str, request: Request): + """ + Callback when sending order failed on server. + """ + if request.response.text: + data = request.response.json() + error = data["message"] + msg = f"委托失败,状态码:{status_code},信息:{error}" + else: + msg = f"委托失败,状态码:{status_code}" + + self.gateway.write_log(msg) + + def on_failed(self, status_code: int, request: Request): + """ + Callback to handle request failed. + """ + data = request.response.json() + error = data['message'] + msg = f"请求失败,状态码:{status_code},信息:{error}" + self.gateway.write_log(msg) + + def on_error( + self, + exception_type: type, + exception_value: Exception, + tb, + request: Request): + """ + Callback to handler request exception. + """ + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" + self.gateway.write_log(msg) + + sys.stderr.write( + self.exception_detail(exception_type, exception_value, tb, request) + ) + + def reset_rate_limit(self): + """ + reset the rate limit every 1 sec + """ + self.rate_limit_remaining = 5 + + def check_rate_limit(self): + """ + Called before send requests + """ + if self.rate_limit_remaining: + self.rate_limit_remaining -= 1 + return True + else: + self.gateway.write_log("已超出请求速率上限,请稍后重试") + return False + + +def get_auth_header( + timestamp, + message, + api_key, + secret_key, + passphrase): + """""" + message = message.encode("ascii") + hmac_key = base64.b64decode(secret_key) + signature = hmac.new(hmac_key, message, hashlib.sha256) + signature_b64 = base64.b64encode(signature.digest()).decode('utf-8') + return{ + 'Content-Type': 'Application/JSON', + 'CB-ACCESS-SIGN': signature_b64, + 'CB-ACCESS-TIMESTAMP': timestamp, + 'CB-ACCESS-KEY': api_key, + 'CB-ACCESS-PASSPHRASE': passphrase + } From 2ab2e49121f1a063cff7a71ea6a7227c41ab69e2 Mon Sep 17 00:00:00 2001 From: LimingFang <28479651+LimingFang@users.noreply.github.com> Date: Wed, 4 Sep 2019 15:51:35 +0800 Subject: [PATCH 2/8] update coinbase --- vnpy/gateway/coinbase/coinbase_gateway.py | 99 +++++++++-------------- 1 file changed, 40 insertions(+), 59 deletions(-) diff --git a/vnpy/gateway/coinbase/coinbase_gateway.py b/vnpy/gateway/coinbase/coinbase_gateway.py index 9c3a6f29..a2c32567 100644 --- a/vnpy/gateway/coinbase/coinbase_gateway.py +++ b/vnpy/gateway/coinbase/coinbase_gateway.py @@ -23,7 +23,6 @@ from vnpy.trader.constant import ( OrderType, Product, Status, - Offset, Interval ) from vnpy.trader.gateway import BaseGateway @@ -31,7 +30,6 @@ from vnpy.trader.object import ( TickData, OrderData, TradeData, - PositionData, AccountData, ContractData, BarData, @@ -49,8 +47,6 @@ SANDBOX_WEBSOCKET_HOST = "wss://ws-feed-public.sandbox.pro.coinbase.com" DIRECTION_VT2COINBASE = {Direction.LONG: "buy", Direction.SHORT: "sell"} DIRECTION_COINBASE2VT = {v: k for k, v in DIRECTION_VT2COINBASE.items()} -STOP_VT2COINBASE = {Direction.LONG: "entry", Direction.SHORT: "loss"} - ORDERTYPE_VT2COINBASE = { OrderType.LIMIT: "limit", OrderType.MARKET: "market", @@ -96,13 +92,6 @@ class CoinbaseGateway(BaseGateway): self.rest_api = CoinbaseRestApi(self) self.ws_api = CoinbaseWebsocketApi(self) - self.rest_api_inited = False - - self.product_id = [] - self.received_instrument = False - - event_engine.register(EVENT_TIMER, self.process_timer_event) - def connect(self, setting: dict): """""" key = setting["ID"] @@ -120,10 +109,7 @@ class CoinbaseGateway(BaseGateway): self.rest_api.connect(key, secret, passphrase, session_number, server, proxy_host, proxy_port) - while not self.received_instrument: - time.sleep(0.5) - self.write_log("合约查询成功") self.ws_api.connect( key, secret, @@ -146,7 +132,7 @@ class CoinbaseGateway(BaseGateway): def query_account(self): """""" - return self.rest_api.qry_account() + return self.rest_api.query_account() def query_position(self): """ @@ -168,12 +154,11 @@ class CoinbaseGateway(BaseGateway): def process_timer_event(self, event: Event): """""" self.rest_api.reset_rate_limit() - if self.rest_api_inited: - self.init_query() + self.init_query() def init_query(self): """""" - self.rest_api.qry_account() + self.rest_api.query_account() class CoinbaseWebsocketApi(WebsocketClient): @@ -226,6 +211,8 @@ class CoinbaseWebsocketApi(WebsocketClient): self.init(SANDBOX_WEBSOCKET_HOST, proxy_host, proxy_port) self.start() + self.gateway.event_engine.register(EVENT_TIMER, self.gateway.process_timer_event) + def subscribe(self, req: SubscribeRequest): """""" symbol = req.symbol @@ -269,10 +256,11 @@ class CoinbaseWebsocketApi(WebsocketClient): """ callback when data is received and unpacked """ - if packet['type'] == 'error': - self.gateway.write_log("Websocket API报错: %s" % packet['message']) - self.gateway.write_log("Websocket API报错原因是: %s" % packet['reason']) + self.gateway.write_log( + "Websocket API报错: %s" % packet['message']) + self.gateway.write_log( + "Websocket API报错原因是: %s" % packet['reason']) self.active = False else: @@ -385,6 +373,7 @@ class OrderBook(): self.gateway = gateway self.newest_tick = TickData( "COINBASE", symbol, exchange, datetime.now()) + self.first_update = False def on_message(self, d: dict): """ @@ -426,21 +415,19 @@ class OrderBook(): self.generate_tick(dt) + def on_ticker(self, d: dict): """ call back when type is ticker """ tick = self.newest_tick - tick.openPrice = float(d['open_24h']) - tick.highPrice = float(d['high_24h']) - tick.lowPrice = float(d['low_24h']) - tick.lastPrice = float(d['price']) + tick.open_price = float(d['open_24h']) + tick.high_price = float(d['high_24h']) + tick.low_price = float(d['low_24h']) + tick.last_price = float(d['price']) tick.volume = float(d['volume_24h']) - dt = datetime.strptime( - d['time'], "%Y-%m-%dT%H:%M:%S.%fZ") - self.gateway.on_tick(copy(tick)) def on_snapshot(self, asks: Sequence[List], bids: Sequence[List]): @@ -556,47 +543,44 @@ class CoinbaseRestApi(RestClient): self.init(SANDBOX_REST_HOST, proxy_host, proxy_port) self.start(session_number) - self.gateway.rest_api_inited = True - - self.qry_instrument() - # self.qry_orders() + self.query_instrument() + self.query_orders() self.gateway.write_log("REST API启动成功") - def qry_instrument(self): + def query_instrument(self): """ Get the instrument of Coinbase """ self.add_request( "GET", "/products", - callback=self.on_qry_instrument, + callback=self.on_query_instrument, params={}, - on_error=self.on_qry_instrument_error, + on_error=self.on_query_instrument_error, ) - def qry_orders(self): + def query_orders(self): """""" - params = {"status": "all"} self.add_request( "GET", - "/orders", - callback=self.on_qry_orders, - params=params, - on_error=self.on_qry_orders_error, + "/orders?status=all", + callback=self.on_query_orders, + params={}, + on_error=self.on_query_orders_error, ) - def qry_account(self): + def query_account(self): """""" self.add_request( "GET", "/accounts", - callback=self.on_qry_account, + callback=self.on_query_account, params={}, - on_error=self.on_qry_account_error, + on_error=self.on_query_account_error, ) - def on_qry_account(self, data, request): + def on_query_account(self, data, request): """""" for acc in data: account_id = str(acc['id']) @@ -612,7 +596,7 @@ class CoinbaseRestApi(RestClient): self.gateway.on_account(copy(account)) - def on_qry_account_error( + def on_query_account_error( self, exception_type: type, exception_value: Exception, @@ -622,7 +606,7 @@ class CoinbaseRestApi(RestClient): if not issubclass(exception_type, ConnectionError): self.on_error(exception_type, exception_value, tb, request) - def on_qry_orders_error( + def on_query_orders_error( self, exception_type: type, exception_value: Exception, @@ -632,12 +616,12 @@ class CoinbaseRestApi(RestClient): if not issubclass(exception_type, ConnectionError): self.on_error(exception_type, exception_value, tb, request) - def on_qry_orders(self, data, request): + def on_query_orders(self, data, request): """""" for d in data: date, time = d['created_at'].split('T') if d['status'] == 'open': - if not d['filled_size']: + if not float(d['filled_size']): status = Status.NOTTRADED else: status = Status.PARTTRADED @@ -661,11 +645,11 @@ class CoinbaseRestApi(RestClient): self.gateway.on_order(copy(order)) orderDict[order.orderid] = order - orderSysDict[order.orderid] = order.orderID + orderSysDict[order.orderid] = order.orderid - self.gateway.writeLog(u'委托信息查询成功') + self.gateway.write_log(u'委托信息查询成功') - def on_qry_instrument_error( + def on_query_instrument_error( self, exception_type: type, exception_value: Exception, @@ -678,7 +662,7 @@ class CoinbaseRestApi(RestClient): if not issubclass(exception_type, ConnectionError): self.on_error(exception_type, exception_value, tb, request) - def on_qry_instrument(self, data, request): + def on_query_instrument(self, data, request): """""" for d in data: contract = ContractData( @@ -688,7 +672,7 @@ class CoinbaseRestApi(RestClient): product=Product.SPOT, pricetick=d['quote_increment'], size=d['base_min_size'], - stop_supported=(not d['limit_only']), + stop_supported=False, net_position=True, history_data=False, gateway_name=self.gateway_name, @@ -696,8 +680,7 @@ class CoinbaseRestApi(RestClient): self.gateway.on_contract(contract) - self.gateway.product_id.append(d['id']) - self.gateway.received_instrument = True + self.gateway.write_log("") def send_order(self, req: OrderRequest): """""" @@ -716,9 +699,7 @@ class CoinbaseRestApi(RestClient): if req.type == OrderType.LIMIT: data['price'] = req.price - elif req.type == OrderType.STOP: - data['stop_price'] = req.price - data['stop'] = STOP_VT2COINBASE[req.Direction] + order = req.create_order_data(orderid, self.gateway_name) self.add_request( From 2908442fd8ad1f53f13b91ec55ef1c4454fc0da1 Mon Sep 17 00:00:00 2001 From: LimingFang <28479651+LimingFang@users.noreply.github.com> Date: Wed, 4 Sep 2019 15:52:41 +0800 Subject: [PATCH 3/8] update constant coinbase --- vnpy/trader/constant.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vnpy/trader/constant.py b/vnpy/trader/constant.py index 73ae1ee4..7bce5e10 100644 --- a/vnpy/trader/constant.py +++ b/vnpy/trader/constant.py @@ -115,7 +115,7 @@ class Exchange(Enum): HUOBI = "HUOBI" BITFINEX = "BITFINEX" BINANCE = "BINANCE" - + COINBASE = "COINBASE" class Currency(Enum): """ From 019da35a554ed46ba47c7a057fc259fec1fb27a2 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Wed, 4 Sep 2019 22:41:33 +0800 Subject: [PATCH 4/8] [Fix] close #2055 --- vnpy/chart/widget.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/vnpy/chart/widget.py b/vnpy/chart/widget.py index 45c0b26c..8d6ff343 100644 --- a/vnpy/chart/widget.py +++ b/vnpy/chart/widget.py @@ -106,6 +106,10 @@ class ChartWidget(pg.PlotWidget): # Store plot object in dict self._plots[plot_name] = plot + # Add plot onto the layout + self._layout.nextRow() + self._layout.addItem(plot) + def add_item( self, item_class: Type[ChartItem], @@ -120,10 +124,8 @@ class ChartWidget(pg.PlotWidget): plot = self._plots.get(plot_name) plot.addItem(item) - self._item_plot_map[item] = plot - self._layout.nextRow() - self._layout.addItem(plot) + self._item_plot_map[item] = plot def get_plot(self, plot_name: str) -> pg.PlotItem: """ From 77c2b52842aee8f676d7fad55ffb368477790f06 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Thu, 5 Sep 2019 11:23:37 +0800 Subject: [PATCH 5/8] [Mod] complete general test of coinbase gateway --- examples/vn_trader/run.py | 6 ++- vnpy/gateway/coinbase/coinbase_gateway.py | 55 +++++++++++++++-------- 2 files changed, 41 insertions(+), 20 deletions(-) diff --git a/examples/vn_trader/run.py b/examples/vn_trader/run.py index 66450992..74e3ca1f 100644 --- a/examples/vn_trader/run.py +++ b/examples/vn_trader/run.py @@ -28,6 +28,7 @@ from vnpy.gateway.okexs import OkexsGateway # from vnpy.gateway.tora import ToraGateway # from vnpy.gateway.alpaca import AlpacaGateway from vnpy.gateway.da import DaGateway +from vnpy.gateway.coinbase import CoinbaseGateway from vnpy.app.cta_strategy import CtaStrategyApp # from vnpy.app.csv_loader import CsvLoaderApp @@ -69,8 +70,9 @@ def main(): # main_engine.add_gateway(TapGateway) # main_engine.add_gateway(ToraGateway) # main_engine.add_gateway(AlpacaGateway) - main_engine.add_gateway(OkexsGateway) - main_engine.add_gateway(DaGateway) + # main_engine.add_gateway(OkexsGateway) + # main_engine.add_gateway(DaGateway) + main_engine.add_gateway(CoinbaseGateway) main_engine.add_app(CtaStrategyApp) main_engine.add_app(CtaBacktesterApp) diff --git a/vnpy/gateway/coinbase/coinbase_gateway.py b/vnpy/gateway/coinbase/coinbase_gateway.py index a2c32567..6438aba6 100644 --- a/vnpy/gateway/coinbase/coinbase_gateway.py +++ b/vnpy/gateway/coinbase/coinbase_gateway.py @@ -41,6 +41,7 @@ from vnpy.trader.object import ( REST_HOST = "https://api.pro.coinbase.com" WEBSOCKET_HOST = "wss://ws-feed.pro.coinbase.com" + SANDBOX_REST_HOST = "https://api-public.sandbox.pro.coinbase.com" SANDBOX_WEBSOCKET_HOST = "wss://ws-feed-public.sandbox.pro.coinbase.com" @@ -68,6 +69,7 @@ TIMEDELTA_MAP = { cancelDict = {} # orderid:cancelreq orderDict = {} # sysid:order orderSysDict = {} # orderid:sysid +symbol_name_map = {} class CoinbaseGateway(BaseGateway): @@ -84,11 +86,13 @@ class CoinbaseGateway(BaseGateway): "proxy_host": "", "proxy_port": "", } + exchanges = [Exchange.COINBASE] def __init__(self, event_engine): """Constructor""" super(CoinbaseGateway, self).__init__(event_engine, "COINBASE") + self.rest_api = CoinbaseRestApi(self) self.ws_api = CoinbaseWebsocketApi(self) @@ -107,8 +111,15 @@ class CoinbaseGateway(BaseGateway): else: proxy_port = 0 - self.rest_api.connect(key, secret, passphrase, session_number, server, - proxy_host, proxy_port) + self.rest_api.connect( + key, + secret, + passphrase, + session_number, + server, + proxy_host, + proxy_port + ) self.ws_api.connect( key, @@ -186,8 +197,8 @@ class CoinbaseWebsocketApi(WebsocketClient): "done": self.on_order_done, "match": self.on_order_match, } - self.ticks = {} + self.ticks = {} self.accounts = {} self.orderbooks = {} @@ -349,10 +360,9 @@ class CoinbaseWebsocketApi(WebsocketClient): orderid=order.orderid, tradeid=packet['trade_id'], direction=DIRECTION_COINBASE2VT[packet['side']], - price=packet['price'], - volume=packet['size'], - time=datetime.strptime( - packet['time'], "%Y-%m-%dT%H:%M:%S.%fZ"), + price=float(packet['price']), + volume=float(packet['size']), + time=packet['time'], gateway_name=self.gateway_name, ) self.gateway.on_trade(trade) @@ -372,7 +382,12 @@ class OrderBook(): self.bids = dict() self.gateway = gateway self.newest_tick = TickData( - "COINBASE", symbol, exchange, datetime.now()) + symbol=symbol, + exchange=exchange, + name=symbol_name_map.get(symbol, ""), + datetime=datetime.now(), + gateway_name=gateway.gateway_name, + ) self.first_update = False def on_message(self, d: dict): @@ -583,16 +598,19 @@ class CoinbaseRestApi(RestClient): def on_query_account(self, data, request): """""" for acc in data: - account_id = str(acc['id']) + account_id = str(acc['currency']) + account = self.accounts.get(account_id, None) if not account: - account = AccountData(accountid=account_id, - gateway_name=self.gateway_name) + account = AccountData( + accountid=account_id, + gateway_name=self.gateway_name + ) self.accounts[account_id] = account - account.balance = acc.get("balance", account.balance) - account.available = acc.get("available", account.available) - account.frozen = acc.get("hold", account.frozen) + account.balance = float(acc.get("balance", account.balance)) + account.available = float(acc.get("available", account.available)) + account.frozen = float(acc.get("hold", account.frozen)) self.gateway.on_account(copy(account)) @@ -670,17 +688,18 @@ class CoinbaseRestApi(RestClient): exchange=Exchange.COINBASE, name=d['display_name'], product=Product.SPOT, - pricetick=d['quote_increment'], - size=d['base_min_size'], - stop_supported=False, + pricetick=float(d['quote_increment']), + size=1, + min_volume=float(d['base_min_size']), net_position=True, history_data=False, gateway_name=self.gateway_name, ) self.gateway.on_contract(contract) + symbol_name_map[contract.symbol] = contract.name - self.gateway.write_log("") + self.gateway.write_log("合约信息查询成功") def send_order(self, req: OrderRequest): """""" From eec5d0a7692b8ddac92199c0b87eec4588fae0d9 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Thu, 5 Sep 2019 13:13:39 +0800 Subject: [PATCH 6/8] [Mod] improve code quality of coinbase gateway --- vnpy/gateway/coinbase/coinbase_gateway.py | 210 +++++++--------------- 1 file changed, 66 insertions(+), 144 deletions(-) diff --git a/vnpy/gateway/coinbase/coinbase_gateway.py b/vnpy/gateway/coinbase/coinbase_gateway.py index 6438aba6..6c262cae 100644 --- a/vnpy/gateway/coinbase/coinbase_gateway.py +++ b/vnpy/gateway/coinbase/coinbase_gateway.py @@ -5,7 +5,6 @@ import sys import time from copy import copy from datetime import datetime, timedelta -from threading import Lock import base64 import uuid @@ -112,12 +111,12 @@ class CoinbaseGateway(BaseGateway): proxy_port = 0 self.rest_api.connect( - key, - secret, - passphrase, - session_number, + key, + secret, + passphrase, + session_number, server, - proxy_host, + proxy_host, proxy_port ) @@ -127,7 +126,10 @@ class CoinbaseGateway(BaseGateway): passphrase, server, proxy_host, - proxy_port) + proxy_port + ) + + self.init_query() def subscribe(self, req: SubscribeRequest): """""" @@ -164,12 +166,11 @@ class CoinbaseGateway(BaseGateway): def process_timer_event(self, event: Event): """""" - self.rest_api.reset_rate_limit() - self.init_query() + self.rest_api.query_account() def init_query(self): """""" - self.rest_api.query_account() + self.event_engine.register(EVENT_TIMER, self.process_timer_event) class CoinbaseWebsocketApi(WebsocketClient): @@ -198,18 +199,17 @@ class CoinbaseWebsocketApi(WebsocketClient): "match": self.on_order_match, } - self.ticks = {} - self.accounts = {} self.orderbooks = {} def connect( - self, - key: str, - secret: str, - passphrase: str, - server: str, - proxy_host: str, - proxy_port: int): + self, + key: str, + secret: str, + passphrase: str, + server: str, + proxy_host: str, + proxy_port: int + ): """""" self.gateway.write_log("开始连接ws接口") self.key = key @@ -220,9 +220,8 @@ class CoinbaseWebsocketApi(WebsocketClient): self.init(WEBSOCKET_HOST, proxy_host, proxy_port) else: self.init(SANDBOX_WEBSOCKET_HOST, proxy_host, proxy_port) - self.start() - self.gateway.event_engine.register(EVENT_TIMER, self.gateway.process_timer_event) + self.start() def subscribe(self, req: SubscribeRequest): """""" @@ -240,12 +239,15 @@ class CoinbaseWebsocketApi(WebsocketClient): timestamp = str(time.time()) message = timestamp + 'GET' + '/users/self/verify' + auth_headers = get_auth_header( timestamp, message, self.key, self.secret, - self.passphrase) + self.passphrase + ) + sub_req['signature'] = auth_headers['CB-ACCESS-SIGN'] sub_req['key'] = auth_headers['CB-ACCESS-KEY'] sub_req['passphrase'] = auth_headers['CB-ACCESS-PASSPHRASE'] @@ -340,11 +342,14 @@ class CoinbaseWebsocketApi(WebsocketClient): order = orderDict.get(packet['order_id'], None) if not order: return + order.traded = order.volume - float(packet['remaining_size']) + if packet['reason'] == 'filled': order.status = Status.ALLTRADED else: order.status = Status.CANCELLED + self.gateway.on_order(copy(order)) def on_order_match(self, packet: dict): @@ -381,13 +386,15 @@ class OrderBook(): self.asks = dict() self.bids = dict() self.gateway = gateway - self.newest_tick = TickData( - symbol=symbol, - exchange=exchange, + + self.tick = TickData( + symbol=symbol, + exchange=exchange, name=symbol_name_map.get(symbol, ""), datetime=datetime.now(), gateway_name=gateway.gateway_name, ) + self.first_update = False def on_message(self, d: dict): @@ -430,12 +437,11 @@ class OrderBook(): self.generate_tick(dt) - def on_ticker(self, d: dict): """ call back when type is ticker """ - tick = self.newest_tick + tick = self.tick tick.open_price = float(d['open_24h']) tick.high_price = float(d['high_24h']) @@ -457,7 +463,7 @@ class OrderBook(): def generate_tick(self, dt: datetime): """""" - tick = self.newest_tick + tick = self.tick bids_keys = self.bids.keys() bids_keys = sorted(bids_keys, reverse=True) @@ -509,16 +515,8 @@ class CoinbaseRestApi(RestClient): self.secret = "" self.passphrase = "" - self.order_count = 1_000_000 - self.order_count_lock = Lock() - - self.connect_time = 0 - self.accounts = {} - self.rate_limit = 5 - self.rate_limit_remaining = 5 - def sign(self, request): """ Generate Coinbase signature @@ -549,17 +547,15 @@ class CoinbaseRestApi(RestClient): self.secret = secret.encode() self.passphrase = passphrase - self.connect_time = ( - int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count - ) if server == "REAL": self.init(REST_HOST, proxy_host, proxy_port) else: self.init(SANDBOX_REST_HOST, proxy_host, proxy_port) self.start(session_number) + self.query_instrument() - self.query_orders() + self.query_order() self.gateway.write_log("REST API启动成功") @@ -570,19 +566,15 @@ class CoinbaseRestApi(RestClient): self.add_request( "GET", "/products", - callback=self.on_query_instrument, - params={}, - on_error=self.on_query_instrument_error, + callback=self.on_query_instrument ) - def query_orders(self): + def query_order(self): """""" self.add_request( "GET", - "/orders?status=all", - callback=self.on_query_orders, - params={}, - on_error=self.on_query_orders_error, + "/orders?status=open", + callback=self.on_query_order ) def query_account(self): @@ -591,8 +583,6 @@ class CoinbaseRestApi(RestClient): "GET", "/accounts", callback=self.on_query_account, - params={}, - on_error=self.on_query_account_error, ) def on_query_account(self, data, request): @@ -614,30 +604,11 @@ class CoinbaseRestApi(RestClient): self.gateway.on_account(copy(account)) - def on_query_account_error( - self, - exception_type: type, - exception_value: Exception, - tb, - request): - """""" - if not issubclass(exception_type, ConnectionError): - self.on_error(exception_type, exception_value, tb, request) - - def on_query_orders_error( - self, - exception_type: type, - exception_value: Exception, - tb, - request): - """""" - if not issubclass(exception_type, ConnectionError): - self.on_error(exception_type, exception_value, tb, request) - - def on_query_orders(self, data, request): + def on_query_order(self, data, request): """""" for d in data: date, time = d['created_at'].split('T') + if d['status'] == 'open': if not float(d['filled_size']): status = Status.NOTTRADED @@ -648,6 +619,7 @@ class CoinbaseRestApi(RestClient): status = Status.ALLTRADED else: status = Status.CANCELLED + order = OrderData( symbol=d['product_id'], gateway_name=self.gateway_name, @@ -667,19 +639,6 @@ class CoinbaseRestApi(RestClient): self.gateway.write_log(u'委托信息查询成功') - def on_query_instrument_error( - self, - exception_type: type, - exception_value: Exception, - tb, - request: Request): - """ - Callback when sending order caused exception. - """ - # Record exception if not ConnectionError - if not issubclass(exception_type, ConnectionError): - self.on_error(exception_type, exception_value, tb, request) - def on_query_instrument(self, data, request): """""" for d in data: @@ -703,9 +662,6 @@ class CoinbaseRestApi(RestClient): def send_order(self, req: OrderRequest): """""" - if not self.check_rate_limit(): - return - orderid = str(uuid.uuid1()) data = { @@ -719,7 +675,6 @@ class CoinbaseRestApi(RestClient): if req.type == OrderType.LIMIT: data['price'] = req.price - order = req.create_order_data(orderid, self.gateway_name) self.add_request( "POST", @@ -732,6 +687,7 @@ class CoinbaseRestApi(RestClient): on_error=self.on_send_order_error, ) + self.gateway.on_order(order) return order.vt_orderid def on_send_order_failed(self, status_code: str, request: Request): @@ -752,11 +708,12 @@ class CoinbaseRestApi(RestClient): self.gateway.write_log(msg) def on_send_order_error( - self, - exception_type: type, - exception_value: Exception, - tb, - request: Request): + self, + exception_type: type, + exception_value: Exception, + tb, + request: Request + ): """ Callback when sending order caused exception. """ @@ -775,36 +732,15 @@ class CoinbaseRestApi(RestClient): def cancel_order(self, req: CancelRequest): """""" - if not self.check_rate_limit(): - return - orderid = req.orderid - if orderid not in orderSysDict: - cancelDict[orderid] = req - self.add_request( "DELETE", - "/orders/" + orderid, + f"/orders/client:{orderid}", callback=self.on_cancel_order, - params={}, - on_error=self.on_cancel_order_error, on_failed=self.on_cancel_order_failed, ) - def on_cancel_order_error( - self, - exception_type: type, - exception_value: Exception, - tb, - request: Request): - """ - Callback when cancelling order failed on server. - """ - # Record exception if not ConnectionError - if not issubclass(exception_type, ConnectionError): - self.on_error(exception_type, exception_value, tb, request) - def on_cancel_order(self, data, request): """Websocket will push a new order status""" pass @@ -816,9 +752,9 @@ class CoinbaseRestApi(RestClient): if request.response.text: data = request.response.json() error = data["message"] - msg = f"委托失败,状态码:{status_code},信息:{error}" + msg = f"撤单失败,状态码:{status_code},信息:{error}" else: - msg = f"委托失败,状态码:{status_code}" + msg = f"撤单失败,状态码:{status_code}" self.gateway.write_log(msg) @@ -832,11 +768,12 @@ class CoinbaseRestApi(RestClient): self.gateway.write_log(msg) def on_error( - self, - exception_type: type, - exception_value: Exception, - tb, - request: Request): + self, + exception_type: type, + exception_value: Exception, + tb, + request: Request + ): """ Callback to handler request exception. """ @@ -847,35 +784,20 @@ class CoinbaseRestApi(RestClient): self.exception_detail(exception_type, exception_value, tb, request) ) - def reset_rate_limit(self): - """ - reset the rate limit every 1 sec - """ - self.rate_limit_remaining = 5 - - def check_rate_limit(self): - """ - Called before send requests - """ - if self.rate_limit_remaining: - self.rate_limit_remaining -= 1 - return True - else: - self.gateway.write_log("已超出请求速率上限,请稍后重试") - return False - def get_auth_header( - timestamp, - message, - api_key, - secret_key, - passphrase): + timestamp, + message, + api_key, + secret_key, + passphrase +): """""" message = message.encode("ascii") hmac_key = base64.b64decode(secret_key) signature = hmac.new(hmac_key, message, hashlib.sha256) signature_b64 = base64.b64encode(signature.digest()).decode('utf-8') + return{ 'Content-Type': 'Application/JSON', 'CB-ACCESS-SIGN': signature_b64, From 8152682a2bc536d6e04c204fc6d0857203192715 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Thu, 5 Sep 2019 22:18:57 +0800 Subject: [PATCH 7/8] [Add] query history data for coinbase gateway --- vnpy/gateway/coinbase/coinbase_gateway.py | 90 +++++++++++++++++++++-- 1 file changed, 84 insertions(+), 6 deletions(-) diff --git a/vnpy/gateway/coinbase/coinbase_gateway.py b/vnpy/gateway/coinbase/coinbase_gateway.py index 6c262cae..b4acb408 100644 --- a/vnpy/gateway/coinbase/coinbase_gateway.py +++ b/vnpy/gateway/coinbase/coinbase_gateway.py @@ -54,9 +54,9 @@ ORDERTYPE_VT2COINBASE = { ORDERTYPE_COINBASE2VT = {v: k for k, v in ORDERTYPE_VT2COINBASE.items()} INTERVAL_VT2COINBASE = { - Interval.MINUTE: "1m", - Interval.HOUR: "1h", - Interval.DAILY: "1d", + Interval.MINUTE: 60, + Interval.HOUR: 3600, + Interval.DAILY: 86400, } TIMEDELTA_MAP = { @@ -145,7 +145,7 @@ class CoinbaseGateway(BaseGateway): def query_account(self): """""" - return self.rest_api.query_account() + self.rest_api.query_account() def query_position(self): """ @@ -157,7 +157,7 @@ class CoinbaseGateway(BaseGateway): """ Query bar history data. """ - pass + return self.rest_api.query_history(req) def close(self): """""" @@ -651,7 +651,7 @@ class CoinbaseRestApi(RestClient): size=1, min_volume=float(d['base_min_size']), net_position=True, - history_data=False, + history_data=True, gateway_name=self.gateway_name, ) @@ -784,6 +784,84 @@ class CoinbaseRestApi(RestClient): self.exception_detail(exception_type, exception_value, tb, request) ) + def query_history(self, req: HistoryRequest): + """""" + history = [] + count = 300 + start = req.start + path = f"/products/{req.symbol}/candles" + time_delta = TIMEDELTA_MAP[req.interval] + + while True: + # Break if start time later than end time + if start > req.end: + break + + # Calculate start and end time for this query + start_time = start.isoformat() + + end = start + time_delta * count + end = min(end, req.end) + end_time = end.isoformat() + + # Create query params + params = { + "start": start_time, + "end": end_time, + "granularity": INTERVAL_VT2COINBASE[req.interval], + } + + # Get response from server + resp = self.request( + "GET", + path, + params=params + ) + + # Break if request failed with other status code + if resp.status_code // 100 != 2: + msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" + self.gateway.write_log(msg) + break + else: + data = resp.json() + if not data: + msg = f"获取历史数据为空,开始时间:{start_time},数量:{count}" + break + + # Reverse data list + data.reverse() + buf = [] + + for l in data[1:]: + dt = datetime.fromtimestamp(l[0]) + o, h, l, c, v = l[1:] + bar = BarData( + symbol=req.symbol, + exchange=req.exchange, + datetime=dt, + interval=req.interval, + volume=v, + open_price=o, + high_price=h, + low_price=l, + close_price=c, + gateway_name=self.gateway_name + ) + buf.append(bar) + + history.extend(buf) + + begin = buf[0].datetime + end = buf[-1].datetime + msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{begin} - {end}" + self.gateway.write_log(msg) + + # Update start time + start = bar.datetime + TIMEDELTA_MAP[req.interval] + + return history + def get_auth_header( timestamp, From 6b0bb3cbdb7941ecf991b02d7e95092289f22f49 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Thu, 5 Sep 2019 22:55:03 +0800 Subject: [PATCH 8/8] [Mod] change order lifecycle management detail of coinbase gateway --- vnpy/gateway/coinbase/coinbase_gateway.py | 65 +++++++++++++---------- 1 file changed, 36 insertions(+), 29 deletions(-) diff --git a/vnpy/gateway/coinbase/coinbase_gateway.py b/vnpy/gateway/coinbase/coinbase_gateway.py index b4acb408..1730eb72 100644 --- a/vnpy/gateway/coinbase/coinbase_gateway.py +++ b/vnpy/gateway/coinbase/coinbase_gateway.py @@ -65,9 +65,7 @@ TIMEDELTA_MAP = { Interval.DAILY: timedelta(days=1), } -cancelDict = {} # orderid:cancelreq -orderDict = {} # sysid:order -orderSysDict = {} # orderid:sysid +sys_order_map = {} symbol_name_map = {} @@ -296,30 +294,23 @@ class CoinbaseWebsocketApi(WebsocketClient): """ Call back when order is received by Coinbase """ - client_oid = packet['client_oid'] + orderid = packet['client_oid'] sysid = packet['order_id'] order = OrderData( symbol=packet['product_id'], exchange=Exchange.COINBASE, type=ORDERTYPE_COINBASE2VT[packet['order_type']], - orderid=sysid, + orderid=orderid, direction=DIRECTION_COINBASE2VT[packet['side']], price=float(packet['price']), volume=float(packet['size']), time=packet['time'], + status=Status.NOTTRADED, gateway_name=self.gateway_name, ) - order.traded = 0 - order.status = Status.NOTTRADED - - orderSysDict[client_oid] = sysid - orderDict[sysid] = order - - if client_oid in cancelDict: - req = cancelDict[client_oid] - self.gateway.cancel_order(req) + sys_order_map[sysid] = order self.gateway.on_order(copy(order)) @@ -327,9 +318,10 @@ class CoinbaseWebsocketApi(WebsocketClient): """ Call back when the order is on the orderbook """ - orderid = packet['order_id'] - order = orderDict.get(orderid) - order.traded = float(order.volume) - float(packet['remaining_size']) + sysid = packet['order_id'] + order = sys_order_map[sysid] + + order.traded = order.volume - float(packet['remaining_size']) if order.traded: order.status = Status.PARTTRADED @@ -339,8 +331,10 @@ class CoinbaseWebsocketApi(WebsocketClient): """ Call back when the order is done """ - order = orderDict.get(packet['order_id'], None) - if not order: + sysid = packet['order_id'] + order = sys_order_map[sysid] + + if order.status == Status.CANCELLED: return order.traded = order.volume - float(packet['remaining_size']) @@ -354,10 +348,10 @@ class CoinbaseWebsocketApi(WebsocketClient): def on_order_match(self, packet: dict): """""" - if packet['maker_order_id'] in orderDict: - order = orderDict[packet['maker_order_id']] + if packet['maker_order_id'] in sys_order_map: + order = sys_order_map[packet['maker_order_id']] else: - order = orderDict[packet['taker_order_id']] + order = sys_order_map[packet['taker_order_id']] trade = TradeData( symbol=packet['product_id'], @@ -634,8 +628,7 @@ class CoinbaseRestApi(RestClient): ) self.gateway.on_order(copy(order)) - orderDict[order.orderid] = order - orderSysDict[order.orderid] = order.orderid + sys_order_map[order.orderid] = order self.gateway.write_log(u'委托信息查询成功') @@ -663,6 +656,8 @@ class CoinbaseRestApi(RestClient): def send_order(self, req: OrderRequest): """""" orderid = str(uuid.uuid1()) + order = req.create_order_data(orderid, self.gateway_name) + self.gateway.on_order(order) data = { "size": req.volume, @@ -675,7 +670,6 @@ class CoinbaseRestApi(RestClient): if req.type == OrderType.LIMIT: data['price'] = req.price - order = req.create_order_data(orderid, self.gateway_name) self.add_request( "POST", "/orders", @@ -687,7 +681,6 @@ class CoinbaseRestApi(RestClient): on_error=self.on_send_order_error, ) - self.gateway.on_order(order) return order.vt_orderid def on_send_order_failed(self, status_code: str, request: Request): @@ -734,16 +727,30 @@ class CoinbaseRestApi(RestClient): """""" orderid = req.orderid + # For open orders from previous trading session, use sysid to cancel + if orderid in sys_order_map: + path = f"/orders/{orderid}" + # For open orders from currenct trading session, use client_oid to cancel + else: + path = f"/orders/client:{orderid}" + self.add_request( "DELETE", - f"/orders/client:{orderid}", + path, callback=self.on_cancel_order, on_failed=self.on_cancel_order_failed, ) def on_cancel_order(self, data, request): - """Websocket will push a new order status""" - pass + """ + Callback when order cancelled + """ + sysid = data[0] + order = sys_order_map[sysid] + + if order.status != Status.CANCELLED: + order.status = Status.CANCELLED + self.gateway.on_order(copy(order)) def on_cancel_order_failed(self, status_code: str, request: Request): """