diff --git a/README.md b/README.md index 25959d96..e0f82d8b 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ vn.py是一套基于Python的开源量化交易系统开发框架,于2015年1月正式发布,在开源社区5年持续不断的贡献下一步步成长为全功能量化交易平台,目前国内外金融机构用户已经超过300家,包括:私募基金、证券自营和资管、期货资管和子公司、高校研究机构、自营交易公司、交易所、Token Fund等。 -**傻瓜式入门教程**已经在官方微信公众号[**vnpy-community**]全新上线,新手使用过程中有任何疑问看这个解决是最快的,后续会不断增加进阶经验、发布公告、活动报名等功能,请扫描下方二维码关注: +全新的《vn.py全实战进阶》在线课程,已经在官方微信公众号[**vnpy-community**]上线,50节内容覆盖从策略设计开发、参数回测优化,到最终实盘自动交易的完整CTA量化业务流程。购买请扫描下方二维码关注后,点击菜单栏的【进阶课程】按钮即可:
diff --git a/examples/vn_trader/run.py b/examples/vn_trader/run.py index f7c23125..95ac966c 100644 --- a/examples/vn_trader/run.py +++ b/examples/vn_trader/run.py @@ -29,6 +29,7 @@ from vnpy.gateway.okexs import OkexsGateway # from vnpy.gateway.alpaca import AlpacaGateway from vnpy.gateway.da import DaGateway from vnpy.gateway.coinbase import CoinbaseGateway +from vnpy.gateway.bitstamp import BitstampGateway from vnpy.app.cta_strategy import CtaStrategyApp # from vnpy.app.csv_loader import CsvLoaderApp @@ -73,7 +74,8 @@ def main(): # main_engine.add_gateway(AlpacaGateway) # main_engine.add_gateway(OkexsGateway) # main_engine.add_gateway(DaGateway) - main_engine.add_gateway(CoinbaseGateway) + # main_engine.add_gateway(CoinbaseGateway) + main_engine.add_gateway(BitstampGateway) main_engine.add_app(CtaStrategyApp) main_engine.add_app(CtaBacktesterApp) diff --git a/vnpy/gateway/bitstamp/__init__.py b/vnpy/gateway/bitstamp/__init__.py new file mode 100644 index 00000000..575a904b --- /dev/null +++ b/vnpy/gateway/bitstamp/__init__.py @@ -0,0 +1 @@ +from .bitstamp_gateway import BitstampGateway diff --git a/vnpy/gateway/bitstamp/bitstamp_gateway.py b/vnpy/gateway/bitstamp/bitstamp_gateway.py new file mode 100644 index 00000000..d356ae52 --- /dev/null +++ b/vnpy/gateway/bitstamp/bitstamp_gateway.py @@ -0,0 +1,675 @@ +""" +Author: Wudi +bitstamp合约接口 +""" + +import hashlib +import hmac +import sys +import time +import uuid +from copy import copy +from datetime import datetime, timedelta +from urllib.parse import urlencode +from typing import Dict + +import requests + +from vnpy.api.rest import Request, RestClient, RequestStatus +from vnpy.api.websocket import WebsocketClient + +from vnpy.trader.constant import ( + Direction, + Exchange, + OrderType, + Product, + Status, + Interval +) +from vnpy.trader.gateway import BaseGateway, LocalOrderManager +from vnpy.trader.object import ( + TickData, + OrderData, + TradeData, + BarData, + AccountData, + ContractData, + OrderRequest, + CancelRequest, + SubscribeRequest, + HistoryRequest +) + +from vnpy.trader.event import EVENT_TIMER + + +REST_HOST = "https://www.bitstamp.net/api/v2" +WEBSOCKET_HOST = "wss://ws.bitstamp.net" + +STATUS_BITSTAMP2VT = { + "ACTIVE": Status.NOTTRADED, + "PARTIALLY FILLED": Status.PARTTRADED, + "EXECUTED": Status.ALLTRADED, + "CANCELED": Status.CANCELLED, +} + +ORDERTYPE_VT2BITSTAMP = { + OrderType.LIMIT: "EXCHANGE LIMIT", + OrderType.MARKET: "EXCHANGE MARKET", +} + +DIRECTION_VT2BITSTAMP = { + Direction.LONG: "Buy", + Direction.SHORT: "Sell", +} + +DIRECTION_BITSTAMP2VT = { + "0": Direction.LONG, + "1": Direction.SHORT, +} + +INTERVAL_VT2BITSTAMP = { + Interval.MINUTE: "60", + Interval.HOUR: "3600", + Interval.DAILY: "86400", +} + +TIMEDELTA_MAP = { + Interval.MINUTE: timedelta(minutes=1), + Interval.HOUR: timedelta(hours=1), + Interval.DAILY: timedelta(days=1), +} + + +symbol_name_map = {} +name_symbol_map = {} + + +class BitstampGateway(BaseGateway): + """ + VN Trader Gateway for BITSTAMP connection. + """ + + default_setting = { + "key": "", + "secret": "", + "username": "", + "session": 3, + "proxy_host": "127.0.0.1", + "proxy_port": 1080, + } + + exchanges = [Exchange.BITSTAMP] + + def __init__(self, event_engine): + """Constructor""" + super().__init__(event_engine, "BITSTAMP") + + self.order_manager = LocalOrderManager(self) + + self.rest_api = BitstampRestApi(self) + self.ws_api = BitstampWebsocketApi(self) + + def connect(self, setting: dict): + """""" + key = setting["key"] + secret = setting["secret"] + username = setting["username"] + session = setting["session"] + proxy_host = setting["proxy_host"] + proxy_port = setting["proxy_port"] + + self.rest_api.connect(key, secret, username, + session, proxy_host, proxy_port) + self.ws_api.connect(proxy_host, proxy_port) + + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + + def subscribe(self, req: SubscribeRequest): + """""" + self.ws_api.subscribe(req) + + def send_order(self, req: OrderRequest): + """""" + return self.rest_api.send_order(req) + + def cancel_order(self, req: CancelRequest): + """""" + self.rest_api.cancel_order(req) + + def query_account(self): + """""" + pass + + def query_position(self): + """""" + pass + + def query_history(self, req: HistoryRequest): + """""" + pass + + def close(self): + """""" + self.rest_api.stop() + self.ws_api.stop() + + def process_timer_event(self, event): + """""" + self.rest_api.query_account() + + +class BitstampRestApi(RestClient): + """ + Bitstamp REST API + """ + + def __init__(self, gateway: BaseGateway): + """""" + super(BitstampRestApi, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + self.order_manager = gateway.order_manager + + self.key = "" + self.secret = "" + self.username = "qxfe9863" + + self.order_count = 1_000_000 + self.connect_time = 0 + + def connect( + self, + key: str, + secret: str, + username: str, + session: int, + proxy_host: str, + proxy_port: int, + ): + """ + Initialize connection to REST server. + """ + self.key = key + self.secret = secret.encode() + self.username = username + + self.connect_time = ( + int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count + ) + + self.init(REST_HOST, proxy_host, proxy_port) + self.start(session) + + self.gateway.write_log("REST API启动成功") + + self.query_contract() + self.query_account() + + def sign(self, request: Request): + """ + Sign Bitstamp request. + """ + if request.method == "GET": + return request + + timestamp = str(int(round(time.time() * 1000))) + nonce = str(uuid.uuid4()) + content_type = "application/x-www-form-urlencoded" + + # Empty post data leads to API0020 error, + # so use this offset dict instead. + if not request.data: + request.data = {"offset": "1"} + + payload_str = urlencode(request.data) + + message = "BITSTAMP " + self.key + \ + request.method + \ + "www.bitstamp.net/api/v2" + \ + request.path + \ + "" + \ + content_type + \ + nonce + \ + timestamp + \ + "v2" + \ + payload_str + message = message.encode("utf-8") + + signature = hmac.new( + self.secret, + msg=message, + digestmod=hashlib.sha256 + ).hexdigest().upper() + + request.headers = { + "X-Auth": "BITSTAMP " + self.key, + "X-Auth-Signature": signature, + "X-Auth-Nonce": nonce, + "X-Auth-Timestamp": timestamp, + "X-Auth-Version": "v2", + "Content-Type": content_type + } + request.data = payload_str + + return request + + def _process_request( + self, request: Request + ): + """ + Bistamp API server does not support keep-alive connection. + So when using session.request will cause header related error. + Reimplement this method to use requests.request instead. + """ + try: + request = self.sign(request) + + url = self.make_full_url(request.path) + + response = requests.request( + request.method, + url, + headers=request.headers, + params=request.params, + data=request.data, + proxies=self.proxies, + ) + request.response = response + status_code = response.status_code + if status_code // 100 == 2: # 2xx codes are all successful + if status_code == 204: + json_body = None + else: + json_body = response.json() + + request.callback(json_body, request) + request.status = RequestStatus.success + else: + request.status = RequestStatus.failed + + if request.on_failed: + request.on_failed(status_code, request) + else: + self.on_failed(status_code, request) + except Exception: + request.status = RequestStatus.error + t, v, tb = sys.exc_info() + if request.on_error: + request.on_error(t, v, tb, request) + else: + self.on_error(t, v, tb, request) + + def query_order(self): + """""" + path = "/open_orders/all/" + + self.add_request( + method="POST", + path=path, + callback=self.on_query_order + ) + + def on_query_order(self, data, request): + """获取委托订单""" + for d in data: + sys_orderid = d["id"] + local_orderid = self.order_manager.get_local_orderid(sys_orderid) + + direction = DIRECTION_BITSTAMP2VT[d["type"]] + name = d["currency_pair"] + symbol = name_symbol_map[name] + + order = OrderData( + orderid=local_orderid, + symbol=symbol, + exchange=Exchange.BITSTAMP, + price=float(d["price"]), + volume=float(d["amount"]), + traded=float(0), + direction=direction, + status=Status.NOTTRADED, + time=d["datetime"], + gateway_name=self.gateway_name, + ) + self.order_manager.on_order(order) + + self.gateway.write_log("委托信息查询成功") + + def query_account(self): + """""" + path = "/balance/" + + self.add_request( + method="POST", + path=path, + callback=self.on_query_account + ) + + def on_query_account(self, data, request): + """""" + for key in data.keys(): + if "balance" not in key: + continue + currency = key.replace("_balance", "") + + account = AccountData( + accountid=currency, + balance=float(data[currency + "_balance"]), + frozen=float(data[currency + "_reserved"]), + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) + + def query_contract(self): + """""" + self.add_request( + method="GET", + path="/trading-pairs-info/", + callback=self.on_query_contract, + ) + + def on_query_contract(self, data, request): + """""" + for d in data: + pricetick = 1 / pow(10, d["counter_decimals"]) + min_volume = 1 / pow(10, d["base_decimals"]) + + contract = ContractData( + symbol=d["url_symbol"], + exchange=Exchange.BITSTAMP, + name=d["name"], + product=Product.SPOT, + size=1, + pricetick=pricetick, + min_volume=min_volume, + history_data=False, + gateway_name=self.gateway_name, + ) + self.gateway.on_contract(contract) + + symbol_name_map[contract.symbol] = contract.name + name_symbol_map[contract.name] = contract.symbol + + self.gateway.write_log("合约信息查询成功") + + self.query_order() + + def cancel_order(self, req: CancelRequest): + """""" + path = "/cancel_order/" + + sys_orderid = self.order_manager.get_sys_orderid(req.orderid) + + data = {"id": sys_orderid} + + self.add_request( + method="POST", + path=path, + data=data, + callback=self.on_cancel_order, + extra=req + ) + + def on_cancel_order(self, data, request): + """""" + error = data.get("error", "") + if error: + self.gateway.write_log(error) + return + + cancel_request = request.extra + local_orderid = cancel_request.orderid + order = self.order_manager.get_order_with_local_orderid(local_orderid) + + if order.is_active: + order.status = Status.CANCELLED + self.order_manager.on_order(order) + + self.gateway.write_log(f"撤单成功:{order.orderid}") + + def on_cancel_order_error(self, data, request): + """""" + error_msg = data["error"] + self.gateway.write_log(f"撤单请求出错,信息:{error_msg}") + + def send_order(self, req: OrderRequest): + """""" + local_orderid = self.order_manager.new_local_orderid() + order = req.create_order_data( + local_orderid, + self.gateway_name + ) + order.time = datetime.now().strftime("%H:%M:%S") + + data = { + "amount": req.volume, + "price": req.price + } + + if req.direction == Direction.LONG: + if req.type == OrderType.LIMIT: + path = f"/buy/{req.symbol}/" + elif req.type == OrderType.MARKET: + path = f"/buy/market/{req.symbol}/" + else: + if req.type == OrderType.LIMIT: + path = f"/sell/{req.symbol}/" + elif req.type == OrderType.MARKET: + path = f"/sell/market/{req.symbol}/" + + self.add_request( + method="POST", + path=path, + data=data, + callback=self.on_send_order, + extra=order, + ) + self.order_manager.on_order(order) + return order.vt_orderid + + def on_send_order(self, data, request): + """""" + order = request.extra + + status = data.get("status", None) + if status and status == "error": + order.status = Status.REJECTED + self.order_manager.on_order(order) + + msg = data["reason"]["__all__"][0] + self.gateway.write_log(msg) + return + + sys_orderid = data["id"] + self.order_manager.update_orderid_map(order.orderid, sys_orderid) + + order.status = Status.NOTTRADED + self.order_manager.on_order(order) + + def on_send_order_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback when sending order caused exception. + """ + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_failed(self, status_code: int, request: Request): + """ + Callback to handle request failed. + """ + data = request.response.json() + reason = data["reason"] + code = data["code"] + + msg = f"{request.path} 请求失败,状态码:{status_code},错误信息:{reason},错误代码: {code}" + self.gateway.write_log(msg) + + def on_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback to handler request exception. + """ + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" + self.gateway.write_log(msg) + + sys.stderr.write( + self.exception_detail(exception_type, exception_value, tb, request) + ) + + +class BitstampWebsocketApi(WebsocketClient): + """""" + + def __init__(self, gateway): + """""" + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + self.order_manager = gateway.order_manager + + self.subscribed: Dict[str, SubscribeRequest] = {} + self.ticks: Dict[str, TickData] = {} + + def connect(self, proxy_host: str, proxy_port: int): + """""" + self.init(WEBSOCKET_HOST, proxy_host, proxy_port) + self.start() + + def on_connected(self): + """""" + self.gateway.write_log("Websocket API连接成功") + + # Auto re-subscribe market data after reconnected + for req in self.subscribed.values(): + self.subscribe(req) + + def subscribe(self, req: SubscribeRequest): + """""" + self.subscribed[req.symbol] = req + if not self._active: + return + + tick = TickData( + symbol=req.symbol, + name=symbol_name_map.get(req.symbol, ""), + exchange=Exchange.BITSTAMP, + datetime=datetime.now(), + gateway_name=self.gateway_name, + ) + + for prefix in [ + "order_book_", + "live_trades_", + "live_orders_" + ]: + channel = f"{prefix}{req.symbol}" + d = { + "event": "bts:subscribe", + "data": { + "channel": channel + } + } + self.ticks[channel] = tick + self.send_packet(d) + + def on_packet(self, packet): + """""" + event = packet["event"] + + if event == "trade": + self.on_market_trade(packet) + elif event == "data": + self.on_market_depth(packet) + elif "order_" in event: + self.on_market_order(packet) + elif event == "bts:request_reconnect": + self._disconnect() # Server requires to reconnect + + def on_market_trade(self, packet): + """""" + channel = packet["channel"] + data = packet["data"] + + tick = self.ticks[channel] + tick.last_price = data["price"] + tick.last_volume = data["amount"] + tick.datetime = datetime.fromtimestamp(int(data["timestamp"])) + + self.gateway.on_tick(copy(tick)) + + # Order status check + buy_orderid = str(data["buy_order_id"]) + sell_orderid = str(data["sell_order_id"]) + + for sys_orderid in [buy_orderid, sell_orderid]: + order = self.order_manager.get_order_with_sys_orderid( + sys_orderid) + + if order: + order.traded += data["amount"] + + if order.traded < order.volume: + order.status = Status.PARTTRADED + else: + order.status = Status.ALLTRADED + + self.order_manager.on_order(copy(order)) + + trade = TradeData( + symbol=order.symbol, + exchange=order.exchange, + orderid=order.orderid, + tradeid=data["id"], + direction=order.direction, + price=data["price"], + volume=data["amount"], + time=tick.datetime.strftime("%H:%M:%S"), + gateway_name=self.gateway_name + ) + self.gateway.on_trade(trade) + + def on_market_depth(self, packet): + """""" + channel = packet["channel"] + data = packet["data"] + + tick = self.ticks[channel] + tick.datetime = datetime.fromtimestamp(int(data["timestamp"])) + + bids = data["bids"] + asks = data["asks"] + + for n in range(5): + ix = n + 1 + + bid_price, bid_volume = bids[n] + tick.__setattr__(f"bid_price_{ix}", float(bid_price)) + tick.__setattr__(f"bid_volume_{ix}", float(bid_volume)) + + ask_price, ask_volume = asks[n] + tick.__setattr__(f"ask_price_{ix}", float(ask_price)) + tick.__setattr__(f"ask_volume_{ix}", float(ask_volume)) + + self.gateway.on_tick(copy(tick)) + + def on_market_order(self, packet): + """""" + event = packet["event"] + data = packet["data"] + + if event != "order_deleted": + return + + sys_orderid = str(data["id"]) + order = self.order_manager.get_order_with_sys_orderid(sys_orderid) + + if order and order.is_active(): + order.status = Status.CANCELLED + self.order_manager.on_order(copy(order)) diff --git a/vnpy/gateway/ib/ib_gateway.py b/vnpy/gateway/ib/ib_gateway.py index 2e4b0d87..795bb344 100644 --- a/vnpy/gateway/ib/ib_gateway.py +++ b/vnpy/gateway/ib/ib_gateway.py @@ -69,11 +69,14 @@ EXCHANGE_VT2IB = { EXCHANGE_IB2VT = {v: k for k, v in EXCHANGE_VT2IB.items()} STATUS_IB2VT = { - "Submitted": Status.NOTTRADED, - "Filled": Status.ALLTRADED, - "Cancelled": Status.CANCELLED, + "ApiPending": Status.SUBMITTING, "PendingSubmit": Status.SUBMITTING, "PreSubmitted": Status.NOTTRADED, + "Submitted": Status.NOTTRADED, + "ApiCancelled": Status.CANCELLED, + "Cancelled": Status.CANCELLED, + "Filled": Status.ALLTRADED, + "Inactive": Status.REJECTED, } PRODUCT_VT2IB = { @@ -358,9 +361,13 @@ class IbApi(EWrapper): orderid = str(orderId) order = self.orders.get(orderid, None) - order.status = STATUS_IB2VT[status] order.traded = filled + # To filter PendingCancel status + order_status = STATUS_IB2VT.get(status, None) + if order_status: + order.status = order_status + self.gateway.on_order(copy(order)) def openOrder( # pylint: disable=invalid-name diff --git a/vnpy/trader/constant.py b/vnpy/trader/constant.py index 41838766..b0cd6d12 100644 --- a/vnpy/trader/constant.py +++ b/vnpy/trader/constant.py @@ -121,6 +121,8 @@ class Exchange(Enum): BINANCE = "BINANCE" BYBIT = "BYBIT" # bybit.com COINBASE = "COINBASE" + BITSTAMP = "BITSTAMP" + # Special Function LOCAL = "LOCAL" # For local generated data