diff --git a/tests/trader/run.py b/tests/trader/run.py index b4e1db91..130aa088 100644 --- a/tests/trader/run.py +++ b/tests/trader/run.py @@ -3,11 +3,12 @@ from vnpy.event import EventEngine from vnpy.trader.engine import MainEngine from vnpy.trader.ui import MainWindow, create_qapp +from vnpy.gateway.binance import BinanceGateway # from vnpy.gateway.bitmex import BitmexGateway # from vnpy.gateway.futu import FutuGateway # from vnpy.gateway.ib import IbGateway # from vnpy.gateway.ctp import CtpGateway -# # from vnpy.gateway.ctptest import CtptestGateway +# from vnpy.gateway.ctptest import CtptestGateway # from vnpy.gateway.femas import FemasGateway # from vnpy.gateway.tiger import TigerGateway # from vnpy.gateway.oes import OesGateway @@ -17,8 +18,7 @@ from vnpy.trader.ui import MainWindow, create_qapp # from vnpy.gateway.onetoken import OnetokenGateway # from vnpy.gateway.okexf import OkexfGateway # from vnpy.gateway.xtp import XtpGateway -from vnpy.gateway.hbdm import HbdmGateway -# from vnpy.gateway.tap import TapGateway +# from vnpy.gateway.hbdm import HbdmGateway # from vnpy.app.cta_strategy import CtaStrategyApp # from vnpy.app.csv_loader import CsvLoaderApp @@ -35,9 +35,11 @@ def main(): event_engine = EventEngine() main_engine = MainEngine(event_engine) + + main_engine.add_gateway(BinanceGateway) # main_engine.add_gateway(XtpGateway) # main_engine.add_gateway(CtpGateway) - # # main_engine.add_gateway(CtptestGateway) + # main_engine.add_gateway(CtptestGateway) # main_engine.add_gateway(FemasGateway) # main_engine.add_gateway(IbGateway) # main_engine.add_gateway(FutuGateway) @@ -49,8 +51,7 @@ def main(): # main_engine.add_gateway(BitfinexGateway) # main_engine.add_gateway(OnetokenGateway) # main_engine.add_gateway(OkexfGateway) - main_engine.add_gateway(HbdmGateway) - # main_engine.add_gateway(TapGateway) + # main_engine.add_gateway(HbdmGateway) # main_engine.add_app(CtaStrategyApp) # main_engine.add_app(CtaBacktesterApp) diff --git a/vnpy/gateway/binance/__init__.py b/vnpy/gateway/binance/__init__.py new file mode 100644 index 00000000..6b951a7a --- /dev/null +++ b/vnpy/gateway/binance/__init__.py @@ -0,0 +1 @@ +from .binance_gateway import BinanceGateway diff --git a/vnpy/gateway/binance/binance_gateway.py b/vnpy/gateway/binance/binance_gateway.py new file mode 100644 index 00000000..b3454d71 --- /dev/null +++ b/vnpy/gateway/binance/binance_gateway.py @@ -0,0 +1,679 @@ +""" +Gateway for Binance Crypto Exchange. +""" + +import urllib +import hashlib +import hmac +import time +from copy import copy +from datetime import datetime +from enum import Enum +from threading import Lock + +from vnpy.api.rest import RestClient, Request +from vnpy.api.websocket import WebsocketClient +from vnpy.trader.constant import ( + Direction, + Exchange, + Product, + Status, + OrderType +) +from vnpy.trader.gateway import BaseGateway +from vnpy.trader.object import ( + TickData, + OrderData, + TradeData, + AccountData, + ContractData, + OrderRequest, + CancelRequest, + SubscribeRequest +) +from vnpy.trader.event import EVENT_TIMER +from vnpy.event import Event + + +REST_HOST = "https://www.binance.com" +WEBSOCKET_TRADE_HOST = "wss://stream.binance.com:9443/ws/" +WEBSOCKET_DATA_HOST = "wss://stream.binance.com:9443/stream?streams=" + +STATUS_BINANCE2VT = { + "NEW": Status.NOTTRADED, + "PARTIALLY_FILLED": Status.PARTTRADED, + "FILLED": Status.ALLTRADED, + "CANCELED": Status.CANCELLED, + "REJECTED": Status.REJECTED +} + +ORDERTYPE_VT2BINANCE = { + OrderType.LIMIT: "LIMIT", + OrderType.MARKET: "MARKET" +} +ORDERTYPE_BINANCE2VT = {v: k for k, v in ORDERTYPE_VT2BINANCE.items()} + +DIRECTION_VT2BINANCE = { + Direction.LONG: "BUY", + Direction.SHORT: "SELL" +} +DIRECTION_BINANCE2VT = {v: k for k, v in DIRECTION_VT2BINANCE.items()} + + +class Security(Enum): + NONE = 0 + SIGNED = 1 + API_KEY = 2 + + +symbol_name_map = {} + + +class BinanceGateway(BaseGateway): + """ + VN Trader Gateway for Binance connection. + """ + + default_setting = { + "key": "", + "secret": "", + "session_number": 3, + "proxy_host": "", + "proxy_port": 0, + } + + exchanges = [Exchange.BINANCE] + + def __init__(self, event_engine): + """Constructor""" + super().__init__(event_engine, "BINANCE") + + self.trade_ws_api = BinanceTradeWebsocketApi(self) + self.market_ws_api = BinanceDataWebsocketApi(self) + self.rest_api = BinanceRestApi(self) + + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + + def connect(self, setting: dict): + """""" + key = setting["key"] + secret = setting["secret"] + session_number = setting["session_number"] + proxy_host = setting["proxy_host"] + proxy_port = setting["proxy_port"] + + self.rest_api.connect(key, secret, session_number, + proxy_host, proxy_port) + self.market_ws_api.connect(proxy_host, proxy_port) + + def subscribe(self, req: SubscribeRequest): + """""" + self.market_ws_api.subscribe(req) + + def send_order(self, req: OrderRequest): + """""" + return self.rest_api.send_order(req) + + def cancel_order(self, req: CancelRequest): + """""" + self.rest_api.cancel_order(req) + + def query_account(self): + """""" + pass + + def query_position(self): + """""" + pass + + def close(self): + """""" + self.rest_api.stop() + self.trade_ws_api.stop() + self.market_ws_api.stop() + + def process_timer_event(self, event: Event): + """""" + self.rest_api.keep_user_stream() + + +class BinanceRestApi(RestClient): + """ + BINANCE REST API + """ + + def __init__(self, gateway: BinanceGateway): + """""" + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.trade_ws_api = self.gateway.trade_ws_api + + self.key = "" + self.secret = "" + + self.user_stream_key = "" + self.keep_alive_count = 0 + self.recv_window = 5000 + self.time_offset = 0 + + self.order_count = 1_000_000 + self.order_count_lock = Lock() + self.connect_time = 0 + + def sign(self, request): + """ + Generate BINANCE signature. + """ + if request.params: + path = request.path + "?" + urllib.parse.urlencode(request.params) + else: + request.params = dict() + path = request.path + + security = request.data["security"] + + if security == Security.SIGNED: + timestamp = int(time.time() * 1000) + + if self.time_offset > 0: + timestamp -= abs(self.time_offset) + elif self.time_offset < 0: + timestamp += abs(self.time_offset) + + request.params["timestamp"] = timestamp + # request.params["recv_window"] = self.recv_window + + query = urllib.parse.urlencode(sorted(request.params.items())) + signature = hmac.new(self.secret, query.encode( + "utf-8"), hashlib.sha256).hexdigest() + + query += "&signature={}".format(signature) + path = request.path + "?" + query + + request.path = path + request.params = {} + request.data = {} + + # Add headers + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "Accept": "application/json", + "X-MBX-APIKEY": self.key + } + + if security == Security.SIGNED or security == Security.API_KEY: + request.headers = headers + + return request + + def connect( + self, + key: str, + secret: str, + session_number: int, + proxy_host: str, + proxy_port: int + ): + """ + Initialize connection to REST server. + """ + self.key = key + self.secret = secret.encode() + self.proxy_port = proxy_port + self.proxy_host = proxy_host + + self.connect_time = ( + int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count + ) + + self.init(REST_HOST, proxy_host, proxy_port) + self.start(session_number) + + self.gateway.write_log("REST API启动成功") + + self.query_time() + self.query_account() + self.query_order() + self.query_contract() + self.start_user_stream() + + def query_time(self): + """""" + data = { + "security": Security.NONE + } + path = "/api/v1/time" + + return self.add_request( + "GET", + path, + callback=self.on_query_time, + data=data + ) + + def query_account(self): + """""" + data = {"security": Security.SIGNED} + + self.add_request( + method="GET", + path="/api/v3/account", + callback=self.on_query_account, + data=data + ) + + def query_order(self): + """""" + data = {"security": Security.SIGNED} + + self.add_request( + method="GET", + path="/api/v3/openOrders", + callback=self.on_query_order, + data=data + ) + + def query_contract(self): + """""" + data = { + "security": Security.NONE + } + self.add_request( + method="GET", + path="/api/v1/exchangeInfo", + callback=self.on_query_contract, + data=data + ) + + def _new_order_id(self): + """""" + with self.order_count_lock: + self.order_count += 1 + return self.order_count + + def send_order(self, req: OrderRequest): + """""" + orderid = str(self.connect_time + self._new_order_id()) + order = req.create_order_data( + orderid, + self.gateway_name + ) + self.gateway.on_order(order) + + data = { + "security": Security.SIGNED + } + + params = { + "symbol": req.symbol, + "timeInForce": "GTC", + "side": DIRECTION_VT2BINANCE[req.direction], + "type": ORDERTYPE_VT2BINANCE[req.type], + "price": str(req.price), + "quantity": str(req.volume), + "newClientOrderId": orderid, + "newOrderRespType": "ACK" + } + + self.add_request( + method="POST", + path="/api/v3/order", + callback=self.on_send_order, + data=data, + params=params, + extra=order, + on_error=self.on_send_order_error, + on_failed=self.on_send_order_failed + ) + + return order.vt_orderid + + def cancel_order(self, req: CancelRequest): + """""" + data = { + "security": Security.SIGNED + } + + params = { + "symbol": req.symbol, + "origClientOrderId": req.orderid + } + + self.add_request( + method="DELETE", + path="/api/v3/order", + callback=self.on_cancel_order, + params=params, + data=data, + extra=req + ) + + def start_user_stream(self): + """""" + data = { + "security": Security.API_KEY + } + + self.add_request( + method="POST", + path="/api/v1/userDataStream", + callback=self.on_start_user_stream, + data=data + ) + + def keep_user_stream(self): + """""" + self.keep_alive_count += 1 + if self.keep_alive_count < 1800: + return + + data = { + "security": Security.SIGNED + } + + params = { + "listenKey": self.user_stream_key + } + + self.add_request( + method="PUT", + path="/api/v1/userDataStream", + callback=self.on_keep_user_stream, + params=params, + data=data + ) + + def on_query_time(self, data, request): + """""" + local_time = int(time.time() * 1000) + server_time = int(data["serverTime"]) + self.time_offset = local_time - server_time + + def on_query_account(self, data, request): + """""" + for account_data in data["balances"]: + account = AccountData( + accountid=account_data["asset"], + balance=float(account_data["free"]) + float(account_data["locked"]), + frozen=float(account_data["locked"]), + gateway_name=self.gateway_name + ) + + if account.balance: + self.gateway.on_account(account) + + self.gateway.write_log("账户资金查询成功") + + def on_query_order(self, data, request): + """""" + for d in data: + dt = datetime.fromtimestamp(d["time"] / 1000) + time = dt.strftime("%Y-%m-%d %H:%M:%S") + + order = OrderData( + orderid=d["clientOrderId"], + symbol=d["symbol"], + exchange=Exchange.BINANCE, + price=float(d["price"]), + volume=float(d["origQty"]), + type=ORDERTYPE_BINANCE2VT[d["type"]], + direction=DIRECTION_BINANCE2VT[d["side"]], + traded=float(d["executedQty"]), + status=STATUS_BINANCE2VT.get(d["status"], None), + time=time, + gateway_name=self.gateway_name, + ) + self.gateway.on_order(order) + + self.gateway.write_log("委托信息查询成功") + + def on_query_contract(self, data, request): + """""" + for d in data["symbols"]: + base_currency = d["baseAsset"] + quote_currency = d["quoteAsset"] + name = f"{base_currency.upper()}/{quote_currency.upper()}" + + pricetick = 0 + min_volume = 0 + + for f in d["filters"]: + if f["filterType"] == "PRICE_FILTER": + pricetick = f["tickSize"] + elif f["filterType"] == "LOT_SIZE": + min_volume = f["stepSize"] + + contract = ContractData( + symbol=d["symbol"], + exchange=Exchange.BINANCE, + name=name, + pricetick=pricetick, + size=1, + min_volume=min_volume, + product=Product.SPOT, + gateway_name=self.gateway_name, + ) + self.gateway.on_contract(contract) + + symbol_name_map[contract.symbol] = contract.name + + self.gateway.write_log("合约信息查询成功") + + def on_send_order(self, data, request): + """""" + pass + + def on_send_order_failed(self, status_code: str, request: Request): + """ + Callback when sending order failed on server. + """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + msg = f"委托失败,状态码:{status_code},信息:{request.response.text}" + self.gateway.write_log(msg) + + def on_send_order_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback when sending order caused exception. + """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_cancel_order(self, data, request): + """""" + pass + + def on_start_user_stream(self, data, request): + """""" + self.user_stream_key = data["listenKey"] + self.keep_alive_count = 0 + url = WEBSOCKET_TRADE_HOST + self.user_stream_key + + self.trade_ws_api.connect(url, self.proxy_host, self.proxy_port) + + def on_keep_user_stream(self, data, request): + """""" + pass + + +class BinanceTradeWebsocketApi(WebsocketClient): + """""" + + def __init__(self, gateway): + """""" + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + def connect(self, url, proxy_host, proxy_port): + """""" + self.init(url, proxy_host, proxy_port) + self.start() + + def on_connected(self): + """""" + self.gateway.write_log("交易Websocket API连接成功") + + def on_packet(self, packet: dict): # type: (dict)->None + """""" + if packet["e"] == "outboundAccountInfo": + self.on_account(packet) + else: + self.on_order(packet) + + def on_account(self, packet): + """""" + for d in packet["B"]: + account = AccountData( + accountid=d["a"], + balance=float(d["f"]) + float(d["l"]), + frozen=float(d["l"]), + gateway_name=self.gateway_name + ) + + if account.balance: + self.gateway.on_account(account) + + def on_order(self, packet: dict): + """""" + dt = datetime.fromtimestamp(packet["O"] / 1000) + time = dt.strftime("%Y-%m-%d %H:%M:%S") + + if packet["C"] == "null": + orderid = packet["c"] + else: + orderid = packet["C"] + + order = OrderData( + symbol=packet["s"], + exchange=Exchange.BINANCE, + orderid=orderid, + type=ORDERTYPE_BINANCE2VT[packet["o"]], + direction=DIRECTION_BINANCE2VT[packet["S"]], + price=float(packet["p"]), + volume=float(packet["q"]), + traded=float(packet["z"]), + status=STATUS_BINANCE2VT[packet["X"]], + time=time, + gateway_name=self.gateway_name + ) + + self.gateway.on_order(order) + + # Push trade event + trade_volume = float(packet["l"]) + if not trade_volume: + return + + trade_dt = datetime.fromtimestamp(packet["T"] / 1000) + trade_time = trade_dt.strftime("%Y-%m-%d %H:%M:%S") + + trade = TradeData( + symbol=order.symbol, + exchange=order.exchange, + orderid=order.orderid, + tradeid=packet["t"], + direction=order.direction, + price=float(packet["L"]), + volume=trade_volume, + time=trade_time, + gateway_name=self.gateway_name, + ) + self.gateway.on_trade(trade) + + +class BinanceDataWebsocketApi(WebsocketClient): + """""" + + def __init__(self, gateway): + """""" + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.ticks = {} + + def connect(self, proxy_host: str, proxy_port: int): + """""" + self.proxy_host = proxy_host + self.proxy_port = proxy_port + + def on_connected(self): + """""" + self.gateway.write_log("行情Websocket API连接刷新") + + def subscribe(self, req: SubscribeRequest): + """""" + if req.symbol not in symbol_name_map: + self.gateway.write_log(f"找不到该合约代码{req.symbol}") + return + + # Create tick buf data + tick = TickData( + symbol=req.symbol, + name=symbol_name_map.get(req.symbol, ""), + exchange=Exchange.BINANCE, + datetime=datetime.now(), + gateway_name=self.gateway_name, + ) + self.ticks[req.symbol.lower()] = tick + + # Close previous connection + if self._active: + self.stop() + self.join() + + # Create new connection + channels = [] + for ws_symbol in self.ticks.keys(): + channels.append(ws_symbol + "@ticker") + channels.append(ws_symbol + "@depth5") + + url = WEBSOCKET_DATA_HOST + "/".join(channels) + self.init(url, self.proxy_host, self.proxy_port) + self.start() + + def on_packet(self, packet): + """""" + stream = packet["stream"] + data = packet["data"] + + symbol, channel = stream.split("@") + tick = self.ticks[symbol] + + if channel == "ticker": + tick.volume = float(data['v']) + tick.open_price = float(data['o']) + tick.high_price = float(data['h']) + tick.low_price = float(data['l']) + tick.last_price = float(data['c']) + tick.datetime = datetime.fromtimestamp(float(data['E']) / 1000) + else: + bids = data["bids"] + for n in range(5): + price, volume = bids[n] + tick.__setattr__("bid_price_" + str(n + 1), float(price)) + tick.__setattr__("bid_volume_" + str(n + 1), float(volume)) + + asks = data["asks"] + for n in range(5): + price, volume = asks[n] + tick.__setattr__("ask_price_" + str(n + 1), float(price)) + tick.__setattr__("ask_volume_" + str(n + 1), float(volume)) + + if tick.last_price: + self.gateway.on_tick(copy(tick)) diff --git a/vnpy/gateway/bitmex/bitmex_gateway.py b/vnpy/gateway/bitmex/bitmex_gateway.py index e8248f29..72988f56 100644 --- a/vnpy/gateway/bitmex/bitmex_gateway.py +++ b/vnpy/gateway/bitmex/bitmex_gateway.py @@ -233,6 +233,7 @@ class BitmexRestApi(RestClient): self.gateway.write_log("REST API启动成功") def _new_order_id(self): + """""" with self.order_count_lock: self.order_count += 1 return self.order_count diff --git a/vnpy/trader/constant.py b/vnpy/trader/constant.py index 785ecabf..73ae1ee4 100644 --- a/vnpy/trader/constant.py +++ b/vnpy/trader/constant.py @@ -114,6 +114,7 @@ class Exchange(Enum): OKEX = "OKEX" HUOBI = "HUOBI" BITFINEX = "BITFINEX" + BINANCE = "BINANCE" class Currency(Enum):