From 194ccdb09c0736a14bade1e3029e087e2300d95f Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Wed, 17 Apr 2019 13:16:34 +0800 Subject: [PATCH] [Add]websocket api for OnetokenGateway --- tests/trader/run.py | 2 + vnpy/gateway/onetoken/onetoken_gateway.py | 471 ++++++++++++++++++---- 2 files changed, 404 insertions(+), 69 deletions(-) diff --git a/tests/trader/run.py b/tests/trader/run.py index e9d5d892..007720c7 100644 --- a/tests/trader/run.py +++ b/tests/trader/run.py @@ -13,6 +13,7 @@ from vnpy.gateway.oes import OesGateway from vnpy.gateway.okex import OkexGateway from vnpy.gateway.huobi import HuobiGateway from vnpy.gateway.bitfinex import BitfinexGateway +from vnpy.gateway.onetoken import OnetokenGateway from vnpy.app.cta_strategy import CtaStrategyApp from vnpy.app.csv_loader import CsvLoaderApp @@ -36,6 +37,7 @@ def main(): main_engine.add_gateway(OkexGateway) main_engine.add_gateway(HuobiGateway) main_engine.add_gateway(BitfinexGateway) + main_engine.add_gateway(OnetokenGateway) main_engine.add_app(CtaStrategyApp) main_engine.add_app(CtaBacktesterApp) diff --git a/vnpy/gateway/onetoken/onetoken_gateway.py b/vnpy/gateway/onetoken/onetoken_gateway.py index 772a5913..e245fd85 100644 --- a/vnpy/gateway/onetoken/onetoken_gateway.py +++ b/vnpy/gateway/onetoken/onetoken_gateway.py @@ -3,16 +3,19 @@ """ import hashlib +import sys import hmac import json import time from datetime import datetime from threading import Lock from urllib.parse import urlparse +from copy import copy from requests import ConnectionError from vnpy.api.rest import Request, RestClient +from vnpy.api.websocket import WebsocketClient from vnpy.trader.constant import ( Direction, Exchange, @@ -22,20 +25,34 @@ from vnpy.trader.constant import ( ) from vnpy.trader.gateway import BaseGateway from vnpy.trader.object import ( + TickData, PositionData, AccountData, OrderRequest, CancelRequest, SubscribeRequest, - ContractData + ContractData, + OrderData, + TradeData ) +from vnpy.trader.event import EVENT_TIMER -REST_HOST = 'https://1token.trade/api' + +REST_HOST = "https://1token.trade/api" +DATA_WEBSOCKET_HOST = "wss://1token.trade/api/v1/ws/tick" +TRADE_WEBSOCKET_HOST = "wss://1token.trade/api/v1/ws/trade" DIRECTION_VT2ONETOKEN = {Direction.LONG: "b", Direction.SHORT: "s"} DIRECTION_ONETOKEN2VT = {v: k for k, v in DIRECTION_VT2ONETOKEN.items()} +EXCHANGE_VT2ONETOKEN = { + Exchange.OKEX: "okex", + Exchange.HUOBI: "huobi" +} +EXCHANGE_ONETOKEN2VT = {v: k for k, v in EXCHANGE_VT2ONETOKEN.items()} + + class OnetokenGateway(BaseGateway): """ VN Trader Gateway for 1Token connection @@ -44,7 +61,7 @@ class OnetokenGateway(BaseGateway): default_setting = { "OT Key": "", "OT Secret": "", - "交易所": "", + "交易所": ["BINANCE", "BITMEX", "OKEX", "OKEF"], "账户": "", "会话数": 3, "代理地址": "127.0.0.1", @@ -53,9 +70,13 @@ class OnetokenGateway(BaseGateway): def __init__(self, event_engine): """Constructor""" - super(OnetokenGateway, self).__init__(event_engine, "1Token") + super(OnetokenGateway, self).__init__(event_engine, "1TOKEN") self.rest_api = OnetokenRestApi(self) + self.data_ws_api = OnetokenDataWebsocketApi(self) + self.trade_ws_api = OnetokenTradeWebsocketApi(self) + + self.count = 0 def connect(self, setting: dict): """""" @@ -66,13 +87,18 @@ class OnetokenGateway(BaseGateway): account = setting["账户"] proxy_host = setting["代理地址"] proxy_port = setting["代理端口"] + self.rest_api.connect(key, secret, session_number, exchange, account, proxy_host, proxy_port) + self.data_ws_api.connect(proxy_host, proxy_port) + self.trade_ws_api.connect( + key, secret, exchange, account, proxy_host, proxy_port) + + self.init_ping() def subscribe(self, req: SubscribeRequest): """""" - pass - # self.ws_api.subscribe(req) + self.data_ws_api.subscribe(req) def send_order(self, req: OrderRequest): """""" @@ -93,6 +119,22 @@ class OnetokenGateway(BaseGateway): def close(self): """""" self.rest_api.stop() + self.data_ws_api.stop() + self.trade_ws_api.stop() + + def process_timer_event(self, event): + """""" + self.count += 1 + if self.count < 20: + return + self.count = 0 + + self.data_ws_api.ping() + self.trade_ws_api.ping() + + def init_ping(self): + """""" + self.event_engine.register(EVENT_TIMER, self.process_timer_event) class OnetokenRestApi(RestClient): @@ -123,23 +165,24 @@ class OnetokenRestApi(RestClient): """ method = request.method - endpoint = '/' + request.path.split('/', 3)[3] + endpoint = "/" + request.path.split("/", 3)[3] # v1/trade/okex/mock-example/info -> okex/mock-example/info parsed_url = urlparse(endpoint) path = parsed_url.path nonce = str(int(time.time() * 1e6)) data = request.data - json_str = data if data else '' + json_str = data if data else "" message = method + path + nonce + json_str - signature = hmac.new(bytes(self.secret, 'utf8'), bytes(message, 'utf8'), digestmod=hashlib.sha256).hexdigest() + signature = hmac.new(bytes(self.secret, "utf8"), bytes( + message, "utf8"), digestmod=hashlib.sha256).hexdigest() - headers = {'Api-Nonce': nonce, - 'Api-Key': self.key, - 'Api-Signature': signature, - 'Content-Type': 'application/json'} + headers = {"Api-Nonce": nonce, + "Api-Key": self.key, + "Api-Signature": signature, + "Content-Type": "application/json"} request.headers = headers return request @@ -174,58 +217,13 @@ class OnetokenRestApi(RestClient): self.query_time() self.query_contract() - self.query_account() + # self.query_account() def _new_order_id(self): with self.order_count_lock: self.order_count += 1 return self.order_count - def query_account(self): # get balance and positions at the same time - """""" - self.add_request( - "GET", - "/v1/trade/{}/{}/info".format(self.exchange, self.account), - callback=self.on_query_account - ) - - def on_query_account(self, data, request): - """This is for WS Example""" - for account_data in data["position"]: - _type = account_data['type'] - if 'spot' in _type: # 统计balance - account = AccountData( - accountid=account_data["contract"], - balance=float(account_data["total_amount"]), - frozen=float(account_data["frozen"]), - gateway_name=self.gateway_name - ) - self.gateway.on_account(account) - elif _type == 'future': # 期货合约 - long_position = PositionData( - symbol=account_data["contract"], - exchange=Exchange.OKEX, # todo add Exchange - direction=Direction.LONG, - volume=account_data['total_amount_long'], - frozen=account_data['total_amount_long'] - account_data['available_long'], - gateway_name=self.gateway_name, - # yd_volume=? - ) - short_position = PositionData( - symbol=account_data["contract"], - exchange=Exchange.OKEX, # todo add Exchange - direction=Direction.SHORT, - volume=account_data['total_amount_short'], - frozen=account_data['total_amount_short'] - account_data['available_short'], - gateway_name=self.gateway_name, - # yd_volume=? - ) - self.gateway.on_position(long_position) - self.gateway.on_position(short_position) - - self.gateway.write_log("账户资金查询成功") - self.gateway.write_log("账户持仓查询成功") - def query_time(self): """""" self.add_request( @@ -238,7 +236,7 @@ class OnetokenRestApi(RestClient): """""" server_timestamp = data["server_time"] dt = datetime.utcfromtimestamp(server_timestamp) - server_time = dt.isoformat() + 'Z' + server_time = dt.isoformat() + "Z" local_time = datetime.utcnow().isoformat() msg = f"服务器时间:{server_time},本机时间:{local_time}" self.gateway.write_log(msg) @@ -260,30 +258,31 @@ class OnetokenRestApi(RestClient): exchange=Exchange.OKEX, # todo name=symbol, product=Product.SPOT, # todo - size=float(instrument_data['min_amount']), - pricetick=float(instrument_data['unit_amount']), + size=float(instrument_data["min_amount"]), + pricetick=float(instrument_data["unit_amount"]), gateway_name=self.gateway_name ) self.gateway.on_contract(contract) self.gateway.write_log("合约信息查询成功") # Start websocket api after instruments data collected - # self.gateway.ws_api.start() + self.gateway.data_ws_api.start() + self.gateway.trade_ws_api.start() def send_order(self, req: OrderRequest): """""" orderid = str(self.connect_time + self._new_order_id()) data = { - 'contract': self.exchange + '/' + req.symbol, - 'price': float(req.price), + "contract": self.exchange + "/" + req.symbol, + "price": float(req.price), "bs": DIRECTION_VT2ONETOKEN[req.direction], - 'amount': float(req.volume), - 'client_oid': orderid + "amount": float(req.volume), + "client_oid": orderid } if req.offset == Offset.CLOSE: - data['options'] = {'close': True} + data["options"] = {"close": True} data = json.dumps(data) order = req.create_order_data(orderid, self.gateway_name) @@ -304,7 +303,7 @@ class OnetokenRestApi(RestClient): def cancel_order(self, req: CancelRequest): """""" params = { - 'client_oid': req.orderid + "client_oid": req.orderid } self.add_request( @@ -358,3 +357,337 @@ class OnetokenRestApi(RestClient): # Record exception if not ConnectionError if not issubclass(exception_type, ConnectionError): self.on_error(exception_type, exception_value, tb, request) + + +class OnetokenDataWebsocketApi(WebsocketClient): + """""" + + def __init__(self, gateway): + """""" + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.ticks = {} + self.callbacks = { + "auth": self.on_login, + "single-tick-verbose": self.on_tick + } + + def connect( + self, + proxy_host: str, + proxy_port: int + ): + """""" + self.init(DATA_WEBSOCKET_HOST, proxy_host, proxy_port) + + def subscribe(self, req: SubscribeRequest): + """ + Subscribe to tick data upate. + """ + tick = TickData( + symbol=req.symbol, + exchange=req.exchange, + name=req.symbol, + datetime=datetime.now(), + gateway_name=self.gateway_name, + ) + + contract_symbol = f"{req.exchange.value.lower()}/{req.symbol.lower()}" + self.ticks[contract_symbol] = tick + + req = { + "uri": "subscribe-single-tick-verbose", + "contract": contract_symbol + } + self.send_packet(req) + + def on_connected(self): + """""" + self.gateway.write_log("行情Websocket API连接成功") + self.login() + + def on_disconnected(self): + """""" + self.gateway.write_log("行情Websocket API连接断开") + + def on_packet(self, packet: dict): + """""" + channel = packet.get("uri", "") + if not channel: + return + + data = packet.get("data", None) + callback = self.callbacks.get(channel, None) + + if callback: + callback(data) + + def on_error(self, exception_type: type, exception_value: Exception, tb): + """""" + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" + self.gateway.write_log(msg) + + sys.stderr.write(self.exception_detail( + exception_type, exception_value, tb)) + + def login(self): + """ + Need to login befores subscribe to websocket topic. + """ + req = {"uri": "auth"} + self.send_packet(req) + + self.callbacks["auth"] = self.on_login + + def on_login(self, data: dict): + """""" + self.gateway.write_log("行情Websocket API登录成功") + + def on_tick(self, data: dict): + """""" + contract_symbol = data["contract"] + tick = self.ticks.get(contract_symbol, None) + if not tick: + return + + tick.last_price = data["last"] + tick.datetime = datetime.strptime( + data["time"][:-6], "%Y-%m-%dT%H:%M:%S.%f") + + bids = data["bids"] + asks = data["asks"] + for n, buf in enumerate(bids): + tick.__setattr__("bid_price_%s" % (n + 1), buf["price"]) + tick.__setattr__("bid_volume_%s" % (n + 1), buf["volume"]) + + for n, buf in enumerate(asks): + tick.__setattr__("ask_price_%s" % (n + 1), buf["price"]) + tick.__setattr__("ask_volume_%s" % (n + 1), buf["volume"]) + + self.gateway.on_tick(copy(tick)) + + def ping(self): + """""" + self.send_packet({"uri": "ping"}) + + +class OnetokenTradeWebsocketApi(WebsocketClient): + """""" + + def __init__(self, gateway): + """""" + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.key = "" + self.secret = "" + self.exchange = "" + self.account = "" + + self.trade_count = 0 + + self.callbacks = { + "sub-info": self.on_subscribe_info, + "sub-order": self.on_subscribe_order, + "info": self.on_info, + "order": self.on_order + } + + def connect( + self, + key: str, + secret: str, + exchange: str, + account: str, + proxy_host: str, + proxy_port: int + ): + """""" + self.key = key + self.secret = secret + self.exchange = exchange + self.account = account + + # Create header for ws connection + nonce = str(int(time.time() * 1e6)) + path = f"/ws/{self.account}" + message = "GET" + path + nonce + + signature = hmac.new(bytes(self.secret, "utf8"), bytes( + message, "utf8"), digestmod=hashlib.sha256).hexdigest() + + header = { + "Api-Nonce": nonce, + "Api-Key": self.key, + "Api-Signature": signature + } + + host = f"{TRADE_WEBSOCKET_HOST}/{self.exchange}/{self.account}" + + self.init(host, proxy_host, proxy_port, header=header) + + def subscribe_info(self): + """ + Subscribe to account update. + """ + self.send_packet({"uri": "sub-info"}) + + def subscribe_order(self): + """ + Subscribe to order update. + """ + self.send_packet({"uri": "sub-order"}) + + def on_connected(self): + """""" + self.gateway.write_log("交易Websocket API连接成功") + self.subscribe_info() + self.subscribe_order() + + def on_disconnected(self): + """""" + self.gateway.write_log("交易Websocket API连接断开") + + def on_packet(self, packet: dict): + """""" + if "uri" in packet: + channel = packet["uri"] + + if "data" in packet: + data = packet["data"] + elif "code" in packet: + data = packet["code"] + else: + data = None + + elif "action" in packet: + channel = packet["action"] + data = packet.get("data", None) + else: + print(packet) + return + + callback = self.callbacks.get(channel, None) + if callback: + callback(data) + + def on_error(self, exception_type: type, exception_value: Exception, tb): + """""" + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" + self.gateway.write_log(msg) + + sys.stderr.write(self.exception_detail( + exception_type, exception_value, tb)) + + def on_subscribe_info(self, data: str): + """""" + if data == "success": + self.gateway.write_log("账户资金推送订阅成功") + + def on_subscribe_order(self, data: str): + """""" + if data == "success": + self.gateway.write_log("委托更新推送订阅成功") + + def on_info(self, data: dict): + """""" + for account_data in data["position"]: + _type = account_data["type"] + + # Spot + if "spot" in _type: + account = AccountData( + accountid=account_data["contract"], + balance=float(account_data["total_amount"]), + frozen=float(account_data["frozen"]), + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) + + # Futures + elif _type == "future": + long_position = PositionData( + symbol=account_data["contract"], + exchange=Exchange.OKEX, # todo add Exchange + direction=Direction.LONG, + volume=account_data["total_amount_long"], + frozen=account_data["total_amount_long"] - \ + account_data["available_long"], + gateway_name=self.gateway_name, + ) + short_position = PositionData( + symbol=account_data["contract"], + exchange=Exchange.OKEX, # todo add Exchange + direction=Direction.SHORT, + volume=account_data["total_amount_short"], + frozen=account_data["total_amount_short"] - \ + account_data["available_short"], + gateway_name=self.gateway_name, + ) + self.gateway.on_position(long_position) + self.gateway.on_position(short_position) + + def on_order(self, data: dict): + """""" + print("--------------------------") + for order_data in data: + print(order_data) + contract_symbol = order_data["contract"] + exchange_str, symbol = contract_symbol.split("/") + timestamp = order_data["entrust_time"][11:19] + + orderid = order_data["options"]["client_oid"] + + order = OrderData( + symbol=symbol, + exchange=EXCHANGE_ONETOKEN2VT[exchange_str], + orderid=orderid, + direction=DIRECTION_ONETOKEN2VT[order_data["bs"]], + price=order_data["entrust_price"], + volume=order_data["entrust_amount"], + traded=order_data["dealt_amount"], + time=timestamp, + gateway_name=self.gateway_name + ) + + if order_data["canceled_time"]: + order.status = Status.CANCELLED + else: + if order.traded == order.volume: + order.status = Status.ALLTRADED + elif not order.traded: + order.status = Status.NOTTRADED + else: + order.status = Status.PARTTRADED + + self.gateway.on_order(order) + + # Push trade data + if not order_data["last_dealt_amount"]: + return + + trade_timestamp = order_data["last_update"][11:19] + self.trade_count += 1 + + trade = TradeData( + symbol=order.symbol, + exchange=order.exchange, + orderid=order.orderid, + tradeid=str(self.trade_count), + direction=order.direction, + price=order_data["average_dealt_price"], + volume=order_data["last_dealt_amount"], + gateway_name=self.gateway_name, + time=trade_timestamp + ) + + self.gateway.on_trade(trade) + + def ping(self): + """""" + self.send_packet({"uri": "ping"})