From d0a26081c386796b183afa885bb44272101abe8d Mon Sep 17 00:00:00 2001 From: 1122455801 Date: Sat, 20 Apr 2019 18:19:06 +0800 Subject: [PATCH] Create okexf_gateway.py --- vnpy/gateway/okexf/okexf_gateway.py | 823 ++++++++++++++++++++++++++++ 1 file changed, 823 insertions(+) create mode 100644 vnpy/gateway/okexf/okexf_gateway.py diff --git a/vnpy/gateway/okexf/okexf_gateway.py b/vnpy/gateway/okexf/okexf_gateway.py new file mode 100644 index 00000000..7d775871 --- /dev/null +++ b/vnpy/gateway/okexf/okexf_gateway.py @@ -0,0 +1,823 @@ +# encoding: UTF-8 +""" +Author: qqqlyx +""" + +import hashlib +import hmac +import sys +import time +import json +import base64 +import zlib +from copy import copy +from datetime import datetime +from threading import Lock +from urllib.parse import urlencode + +from requests import ConnectionError + +from vnpy.api.rest import Request, RestClient +from vnpy.api.websocket import WebsocketClient +from vnpy.trader.constant import ( + Direction, + Exchange, + OrderType, + Product, + Status, + Offset, +) +from vnpy.trader.gateway import BaseGateway +from vnpy.trader.object import ( + TickData, + OrderData, + TradeData, + AccountData, + ContractData, + OrderRequest, + CancelRequest, + SubscribeRequest, + PositionData, +) +REST_HOST = "https://www.okex.com" +WEBSOCKET_HOST = "wss://real.okex.com:10442/ws/v3" + +STATUS_OKEXF2VT = { + "0": Status.NOTTRADED, + "1": Status.PARTTRADED, + "2": Status.ALLTRADED, + "-1": Status.CANCELLED, +} + +ORDERTYPE_OKEXF2VT = { + "0": OrderType.LIMIT, + "1": OrderType.MARKET, +} + +TYPE_OKEXF2VT = { + "1": (Offset.OPEN, Direction.LONG), + "2": (Offset.OPEN, Direction.SHORT), + "3": (Offset.CLOSE, Direction.SHORT), + "4": (Offset.CLOSE, Direction.LONG), +} +TYPE_VT2OKEXF = {v: k for k, v in TYPE_OKEXF2VT.items()} + +instruments = set() +currencies = set() + + +class OkexfGateway(BaseGateway): + """ + VN Trader Gateway for OKEX connection. + """ + + default_setting = { + "API Key": "", + "Secret Key": "", + "Passphrase": "", + "Leverage": 10, + "会话数": 3, + "代理地址": "", + "代理端口": "", + } + + def __init__(self, event_engine): + """Constructor""" + super(OkexfGateway, self).__init__(event_engine, "OKEXF") + + self.rest_api = OkexfRestApi(self) + self.ws_api = OkexfWebsocketApi(self) + + self.orders = {} + + def connect(self, setting: dict): + """""" + key = setting["API Key"] + secret = setting["Secret Key"] + passphrase = setting["Passphrase"] + session_number = setting["会话数"] + leverage = setting["Leverage"] + proxy_host = setting["代理地址"] + proxy_port = setting["代理端口"] + + if proxy_port.isdigit(): + proxy_port = int(proxy_port) + else: + proxy_port = 0 + + self.rest_api.connect(key, secret, passphrase, leverage, + session_number, proxy_host, proxy_port) + self.ws_api.connect(key, secret, passphrase, proxy_host, proxy_port) + + def subscribe(self, req: SubscribeRequest): + """""" + self.ws_api.subscribe(req) + + def send_order(self, req: OrderRequest): + """""" + return self.rest_api.send_order(req) + + def cancel_order(self, req: CancelRequest): + """""" + self.rest_api.cancel_order(req) + + def query_account(self): + """""" + pass + + def query_position(self): + """""" + pass + + def close(self): + """""" + self.rest_api.stop() + self.ws_api.stop() + + def on_order(self, order: OrderData): + """""" + self.orders[order.orderid] = order + super().on_order(order) + + def get_order(self, orderid: str): + """""" + return self.orders.get(orderid, None) + + +class OkexfRestApi(RestClient): + """ + OKEXF REST API + """ + + def __init__(self, gateway: BaseGateway): + """""" + super(OkexfRestApi, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.key = "" + self.secret = "" + self.passphrase = "" + self.leverage = 0 + + self.order_count = 10000 + self.order_count_lock = Lock() + + self.connect_time = 0 + + def sign(self, request): + """ + Generate OKEXF signature. + """ + # Sign + timestamp = get_timestamp() + request.data = json.dumps(request.data) + + if request.params: + path = request.path + "?" + urlencode(request.params) + else: + path = request.path + + msg = timestamp + request.method + path + request.data + signature = generate_signature(msg, self.secret) + + # Add headers + request.headers = { + "OK-ACCESS-KEY": self.key, + "OK-ACCESS-SIGN": signature, + "OK-ACCESS-TIMESTAMP": timestamp, + "OK-ACCESS-PASSPHRASE": self.passphrase, + "Content-Type": "application/json" + } + return request + + def connect( + self, + key: str, + secret: str, + passphrase: str, + leverage: int, + session_number: int, + proxy_host: str, + proxy_port: int, + ): + """ + Initialize connection to REST server. + """ + self.key = key + self.secret = secret.encode() + self.passphrase = passphrase + self.leverage = leverage + + self.connect_time = int(datetime.now().strftime("%y%m%d%H%M%S")) + + self.init(REST_HOST, proxy_host, proxy_port) + self.start(session_number) + self.gateway.write_log("REST API启动成功") + + self.query_time() + self.query_contract() + self.query_account() + self.query_position() + + def _new_order_id(self): + with self.order_count_lock: + self.order_count += 1 + return self.order_count + + def send_order(self, req: OrderRequest): + """""" + orderid = f"a{self.connect_time}{self._new_order_id()}" + + data = { + "client_oid": orderid, + "type": TYPE_VT2OKEXF[(req.offset, req.direction)], + "instrument_id": req.symbol, + "price": str(req.price), + "size": str(int(req.volume)), + "leverage": self.leverage, + } + + if req.type == OrderType.MARKET: + data["match_price"] = "1" + else: + data["match_price"] = "0" + + order = req.create_order_data(orderid, self.gateway_name) + + self.add_request( + "POST", + "/api/futures/v3/order", + callback=self.on_send_order, + data=data, + extra=order, + on_failed=self.on_send_order_failed, + on_error=self.on_send_order_error, + ) + + self.gateway.on_order(order) + return order.vt_orderid + + def cancel_order(self, req: CancelRequest): + """""" + path = "/api/futures/v3/cancel_order/%s/%s" % (req.symbol, req.orderid) + self.add_request( + "POST", + path, + callback=self.on_cancel_order, + on_error=self.on_cancel_order_error, + on_failed=self.on_cancel_order_failed, + extra=req + ) + + def query_contract(self): + """""" + self.add_request( + "GET", + "/api/futures/v3/instruments", + callback=self.on_query_contract + ) + + def query_account(self): + """""" + self.add_request( + "GET", + "/api/futures/v3/accounts", + callback=self.on_query_account + ) + + def query_order(self): + """""" + for code in instruments: + + # get waiting orders + self.add_request( + "GET", + "/api/futures/v3/orders/%s?status=0" % (code), + callback=self.on_query_order + ) + + # get part traded orders + self.add_request( + "GET", + "/api/futures/v3/orders/%s?status=1" % (code), + callback=self.on_query_order + ) + + def query_position(self): + """""" + self.add_request( + "GET", + "/api/futures/v3/position", + callback=self.on_query_position + ) + + def query_time(self): + """""" + self.add_request( + "GET", + "/api/general/v3/time", + callback=self.on_query_time + ) + + def on_query_contract(self, data, request): + """""" + for instrument_data in data: + symbol = instrument_data["instrument_id"] + contract = ContractData( + symbol=symbol, + exchange=Exchange.OKEX, + name=symbol, + product=Product.FUTURES, + size=int(instrument_data["trade_increment"]), + pricetick=float(instrument_data["tick_size"]), + gateway_name=self.gateway_name, + ) + self.gateway.on_contract(contract) + + instruments.add(instrument_data["instrument_id"]) + currencies.add(instrument_data["underlying_index"]) + currencies.add(instrument_data["quote_currency"]) + + self.gateway.write_log("合约信息查询成功") + + # Start websocket api after instruments data collected + self.gateway.ws_api.start() + + # and query pending orders + self.query_order() + + def on_query_account(self, data, request): + """""" + + for currency, d in data["info"].items(): + account = AccountData( + accountid=currency.upper(), + balance=float(d["equity"]), + frozen=float(d.get("margin_for_unfilled", 0)), + gateway_name=self.gateway_name, + ) + self.gateway.on_account(account) + self.gateway.write_log("账户资金查询成功") + + def on_query_position(self, data, request): + """""" + if not data["holding"]: + return + + for pos_data in data["holding"][0]: + if float(pos_data["long_qty"]) > 0: + pos = PositionData( + symbol=pos_data["instrument_id"].upper(), + exchange=Exchange.OKEX, + direction=Direction.LONG, + volume=pos_data["long_qty"], + frozen=float(pos_data["long_qty"]) - float(pos_data["long_avail_qty"]), + price=pos_data["long_avg_cost"], + pnl=pos_data["realised_pnl"], + gateway_name=self.gateway_name, + ) + self.gateway.on_position(pos) + + if float(pos_data["short_qty"]) > 0: + pos = PositionData( + symbol=pos_data["instrument_id"], + exchange=Exchange.OKEX, + direction=Direction.SHORT, + volume=pos_data["short_qty"], + frozen=float(pos_data["short_qty"]) - float(pos_data["short_avail_qty"]), + price=pos_data["short_avg_cost"], + pnl=pos_data["realised_pnl"], + gateway_name=self.gateway_name, + ) + self.gateway.on_position(pos) + + def on_query_order(self, data, request): + """""" + for order_data in data["order_info"]: + offset, direction = TYPE_OKEXF2VT[order_data["type"]] + order = OrderData( + symbol=order_data["instrument_id"], + exchange=Exchange.OKEX, + type=ORDERTYPE_OKEXF2VT[order_data["order_type"]], + orderid=order_data["client_oid"], + direction=direction, + offset=offset, + traded=int(order_data["filled_qty"]), + price=float(order_data["price"]), + volume=float(order_data["size"]), + time=UTC2LOCAL(order_data["timestamp"]).strftime("%H:%M:%S"), + status=STATUS_OKEXF2VT[order_data["status"]], + gateway_name=self.gateway_name, + ) + self.gateway.on_order(order) + + def on_query_time(self, data, request): + """""" + server_time = data["iso"] + local_time = datetime.utcnow().isoformat() + msg = f"服务器时间:{server_time},本机时间:{local_time}" + self.gateway.write_log(msg) + + def on_send_order_failed(self, status_code: str, request: Request): + """ + Callback when sending order failed on server. + """ + + order = request.extra + order.status = Status.REJECTED + order.time = datetime.now().strftime("%H:%M:%S.%f") + self.gateway.on_order(order) + msg = f"委托失败,状态码:{status_code},信息:{request.response.text}" + self.gateway.write_log(msg) + + def on_send_order_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback when sending order caused exception. + """ + + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_send_order(self, data, request): + """ + Websocket will push a new order status + """ + order = request.extra + error_msg = data["error_message"] + if error_msg: + order.status = Status.REJECTED + self.gateway.on_order(order) + + self.gateway.write_log(f"委托失败:{error_msg}") + + def on_cancel_order_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback when cancelling order failed on server. + """ + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_cancel_order(self, data, request): + """ + Websocket will push a new order status + """ + pass + + def on_cancel_order_failed(self, status_code: int, request: Request): + """ + If cancel failed, mark order status to be rejected. + """ + req = request.extra + order = self.gateway.get_order(req.orderid) + if order: + order.status = Status.REJECTED + self.gateway.on_order(order) + + def on_failed(self, status_code: int, request: Request): + """ + Callback to handle request failed. + """ + msg = f"请求失败,状态码:{status_code},信息:{request.response.text}" + self.gateway.write_log(msg) + + def on_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback to handler request exception. + """ + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" + self.gateway.write_log(msg) + + sys.stderr.write( + self.exception_detail(exception_type, exception_value, tb, request) + ) + + +class OkexfWebsocketApi(WebsocketClient): + """""" + + def __init__(self, gateway): + """""" + super(OkexfWebsocketApi, self).__init__() + self.ping_interval = 20 # OKEX use 30 seconds for ping + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.key = "" + self.secret = "" + self.passphrase = "" + + self.trade_count = 10000 + self.connect_time = 0 + + self.callbacks = {} + self.ticks = {} + + def connect( + self, + key: str, + secret: str, + passphrase: str, + proxy_host: str, + proxy_port: int + ): + """""" + self.key = key + self.secret = secret.encode() + self.passphrase = passphrase + + self.connect_time = int(datetime.now().strftime("%y%m%d%H%M%S")) + + self.init(WEBSOCKET_HOST, proxy_host, proxy_port) + + def unpack_data(self, data): + """""" + return json.loads(zlib.decompress(data, -zlib.MAX_WBITS)) + + def subscribe(self, req: SubscribeRequest): + """ + Subscribe to tick data upate. + """ + tick = TickData( + symbol=req.symbol, + exchange=req.exchange, + name=req.symbol, + datetime=datetime.now(), + gateway_name=self.gateway_name, + ) + self.ticks[req.symbol] = tick + + channel_ticker = f"futures/ticker:{req.symbol}" + channel_depth = f"futures/depth5:{req.symbol}" + + self.callbacks[channel_ticker] = self.on_ticker + self.callbacks[channel_depth] = self.on_depth + + req = { + "op": "subscribe", + "args": [channel_ticker, channel_depth] + } + self.send_packet(req) + + def on_connected(self): + """""" + self.gateway.write_log("Websocket API连接成功") + self.login() + + def on_disconnected(self): + """""" + self.gateway.write_log("Websocket API连接断开") + + def on_packet(self, packet: dict): + """""" + if "event" in packet: + event = packet["event"] + if event == "subscribe": + return + elif event == "error": + msg = packet["message"] + self.gateway.write_log(f"Websocket API请求异常:{msg}") + elif event == "login": + self.on_login(packet) + else: + channel = packet["table"] + data = packet["data"] + callback = self.callbacks.get(channel, None) + + if callback: + for d in data: + callback(d) + + def on_error(self, exception_type: type, exception_value: Exception, tb): + """""" + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" + self.gateway.write_log(msg) + + sys.stderr.write(self.exception_detail(exception_type, exception_value, tb)) + + def login(self): + """ + Need to login befores subscribe to websocket topic. + """ + timestamp = str(time.time()) + + msg = timestamp + "GET" + "/users/self/verify" + signature = generate_signature(msg, self.secret) + + req = { + "op": "login", + "args": [ + self.key, + self.passphrase, + timestamp, + signature.decode("utf-8") + ] + } + self.send_packet(req) + self.callbacks["login"] = self.on_login + + def subscribe_topic(self): + """ + Subscribe to all private topics. + """ + self.callbacks["futures/ticker"] = self.on_ticker + self.callbacks["futures/depth5"] = self.on_depth + self.callbacks["futures/account"] = self.on_account + self.callbacks["futures/order"] = self.on_order + self.callbacks["futures/position"] = self.on_position + + # Subscribe to order update + channels = [] + for instrument_id in instruments: + channel = f"futures/order:{instrument_id}" + channels.append(channel) + + req = { + "op": "subscribe", + "args": channels + } + self.send_packet(req) + + # Subscribe to account update + channels = [] + for currency in currencies: + if currency != "USD": + channel = f"futures/account:{currency}" + channels.append(channel) + + req = { + "op": "subscribe", + "args": channels + } + self.send_packet(req) + + # Subscribe to position update + channels = [] + for instrument_id in instruments: + channel = f"futures/position:{instrument_id}" + channels.append(channel) + + req = { + "op": "subscribe", + "args": channels + } + self.send_packet(req) + + def on_login(self, data: dict): + """""" + success = data.get("success", False) + + if success: + self.gateway.write_log("Websocket API登录成功") + self.subscribe_topic() + else: + self.gateway.write_log("Websocket API登录失败") + + def on_ticker(self, d): + """""" + symbol = d["instrument_id"] + tick = self.ticks.get(symbol, None) + if not tick: + return + + tick.last_price = d["last"] + tick.high_price = d["high_24h"] + tick.low_price = d["low_24h"] + tick.volume = d["volume_24h"] + tick.datetime = UTC2LOCAL(d["timestamp"]) + + self.gateway.on_tick(copy(tick)) + + def on_depth(self, d): + """""" + for tick_data in d: + symbol = d["instrument_id"] + tick = self.ticks.get(symbol, None) + if not tick: + return + + bids = d["bids"] + asks = d["asks"] + for n, buf in enumerate(bids): + price, volume, _, __ = buf + tick.__setattr__("bid_price_%s" % (n + 1), price) + tick.__setattr__("bid_volume_%s" % (n + 1), volume) + + for n, buf in enumerate(asks): + price, volume, _, __ = buf + tick.__setattr__("ask_price_%s" % (n + 1), price) + tick.__setattr__("ask_volume_%s" % (n + 1), volume) + + tick.datetime = UTC2LOCAL(d["timestamp"]) + self.gateway.on_tick(copy(tick)) + + def on_order(self, d): + """""" + offset, direction = TYPE_OKEXF2VT[d["type"]] + order = OrderData( + symbol=d["instrument_id"], + exchange=Exchange.OKEX, + type=ORDERTYPE_OKEXF2VT[d["order_type"]], + orderid=d["client_oid"], + offset=offset, + direction=direction, + price=float(d["price"]), + volume=float(d["size"]), + traded=float(d["filled_qty"]), + time=UTC2LOCAL(d["timestamp"]).strftime("%H:%M:%S"), + status=STATUS_OKEXF2VT[d["status"]], + gateway_name=self.gateway_name, + ) + self.gateway.on_order(copy(order)) + + trade_volume = d.get("last_fill_qty", 0) + if not trade_volume or float(trade_volume) == 0: + return + + self.trade_count += 1 + tradeid = f"{self.connect_time}{self.trade_count}" + + trade = TradeData( + symbol=order.symbol, + exchange=order.exchange, + orderid=order.orderid, + tradeid=tradeid, + direction=order.direction, + offset=order.offset, + price=float(d["last_fill_px"]), + volume=float(trade_volume), + time=order.time, + gateway_name=self.gateway_name, + ) + self.gateway.on_trade(trade) + + def on_account(self, d): + """""" + for key in d: + account_data = d[key] + account = AccountData( + accountid=key, + balance=float(account_data["equity"]), + frozen=float(d.get("margin_for_unfilled", 0)), + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) + + def on_position(self, d): + """""" + pos = PositionData( + symbol=d["instrument_id"], + exchange=Exchange.OKEX, + direction=Direction.LONG, + volume=d["long_qty"], + frozen=float(d["long_qty"]) - float(d["long_avail_qty"]), + price=d["long_avg_cost"], + pnl=d["realised_pnl"], + gateway_name=self.gateway_name, + ) + self.gateway.on_position(pos) + + pos = PositionData( + symbol=d["instrument_id"], + exchange=Exchange.OKEX, + direction=Direction.SHORT, + volume=d["short_qty"], + frozen=float(d["short_qty"]) - float(d["short_avail_qty"]), + price=d["short_avg_cost"], + pnl=d["realised_pnl"], + gateway_name=self.gateway_name, + ) + self.gateway.on_position(pos) + + +def generate_signature(msg: str, secret_key: str): + """OKEX V3 signature""" + return base64.b64encode(hmac.new(secret_key, msg.encode(), hashlib.sha256).digest()) + + +def get_timestamp(): + """""" + now = datetime.utcnow() + timestamp = now.isoformat("T", "milliseconds") + return timestamp + "Z" + + +def UTC2LOCAL(timestamp): + import datetime + time = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") + utc_time = time + datetime.timedelta(hours=8) + return utc_time