From 2f7cba2680bd2d0ace84f65037d69d9973327b48 Mon Sep 17 00:00:00 2001 From: KeKe <45591222+1122455801@users.noreply.github.com> Date: Wed, 25 Sep 2019 17:10:30 +0800 Subject: [PATCH 1/6] Create __init__.py --- vnpy/gateway/gateio_f/__init__.py | 1 + 1 file changed, 1 insertion(+) create mode 100644 vnpy/gateway/gateio_f/__init__.py diff --git a/vnpy/gateway/gateio_f/__init__.py b/vnpy/gateway/gateio_f/__init__.py new file mode 100644 index 00000000..d39ab445 --- /dev/null +++ b/vnpy/gateway/gateio_f/__init__.py @@ -0,0 +1 @@ +from .gateiof_gateway import GateiofGateway From 04224b5e1387e60fd43d8a2b9f0d6be5ab8d8ba2 Mon Sep 17 00:00:00 2001 From: KeKe <45591222+1122455801@users.noreply.github.com> Date: Wed, 25 Sep 2019 17:12:13 +0800 Subject: [PATCH 2/6] Update constant.py --- vnpy/trader/constant.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vnpy/trader/constant.py b/vnpy/trader/constant.py index 41838766..ca1771d6 100644 --- a/vnpy/trader/constant.py +++ b/vnpy/trader/constant.py @@ -121,6 +121,7 @@ class Exchange(Enum): BINANCE = "BINANCE" BYBIT = "BYBIT" # bybit.com COINBASE = "COINBASE" + GATEIO = "GATEIO" # Special Function LOCAL = "LOCAL" # For local generated data From 4626deed62d0ea28de83da95746af25dcbcd926f Mon Sep 17 00:00:00 2001 From: KeKe <45591222+1122455801@users.noreply.github.com> Date: Wed, 25 Sep 2019 17:13:11 +0800 Subject: [PATCH 3/6] Create gateiof_gateway.py --- vnpy/gateway/gateio_f/gateiof_gateway.py | 868 +++++++++++++++++++++++ 1 file changed, 868 insertions(+) create mode 100644 vnpy/gateway/gateio_f/gateiof_gateway.py diff --git a/vnpy/gateway/gateio_f/gateiof_gateway.py b/vnpy/gateway/gateio_f/gateiof_gateway.py new file mode 100644 index 00000000..67a7911c --- /dev/null +++ b/vnpy/gateway/gateio_f/gateiof_gateway.py @@ -0,0 +1,868 @@ +""" +Gate.io 期货接口 +""" +import hashlib +import hmac +import json +import sys +import time +from copy import copy +from datetime import datetime, timedelta +from threading import Lock +from urllib.parse import urlencode + +from vnpy.api.rest import Request, RestClient +from vnpy.api.websocket import WebsocketClient +from vnpy.event import Event +from vnpy.trader.constant import (Direction, Exchange, Interval, OrderType, + Product, Status) +from vnpy.trader.event import EVENT_TIMER +from vnpy.trader.gateway import BaseGateway, LocalOrderManager +from vnpy.trader.object import (AccountData, BarData, CancelRequest, + ContractData, HistoryRequest, OrderData, + OrderRequest, PositionData, SubscribeRequest, + TickData, TradeData) + +TESTNET_REST_HOST = "https://fx-api-testnet.gateio.ws/" +REST_HOST = "https://api.gateio.ws" + +TESTNET_WEBSOCKET_HOST = "wss://fx-ws-testnet.gateio.ws/v4/ws" +WEBSOCKET_HOST = "wss://fx-ws.gateio.ws/v4/ws" + +STATUS_GATEIO2VT = { + "open": Status.NOTTRADED, + "finished": Status.ALLTRADED, +} + +INTERVAL_VT2GATEIO = { + Interval.MINUTE: "1m", + Interval.HOUR: "1h", + Interval.DAILY: "1d", +} + +TIMEDELTA_MAP = { + Interval.MINUTE: timedelta(minutes=1), + Interval.HOUR: timedelta(hours=1), + Interval.DAILY: timedelta(days=1), +} + + +class GateiofGateway(BaseGateway): + """ + VN Trader Gateway for Gate.io future connection. + """ + + default_setting = { + "API Key": "", + "Secret Key": "", + "会话数": 3, + "服务器": ["REAL", "TESTNET"], + "代理地址": "", + "代理端口": "", + } + + exchanges = [Exchange.GATEIO] + + def __init__(self, event_engine): + """Constructor""" + super(GateiofGateway, self).__init__(event_engine, "GATEIO") + self.order_manager = LocalOrderManager(self) + + self.ws_api = GateiofWebsocketApi(self) + self.rest_api = GateiofRestApi(self) + + def connect(self, setting: dict): + """""" + key = setting["API Key"] + secret = setting["Secret Key"] + session_number = setting["会话数"] + server = setting["服务器"] + proxy_host = setting["代理地址"] + proxy_port = setting["代理端口"] + + if proxy_port.isdigit(): + proxy_port = int(proxy_port) + else: + proxy_port = 0 + + self.rest_api.connect(key, secret, session_number, server, + proxy_host, proxy_port) + self.ws_api.connect(key, secret, server, proxy_host, proxy_port) + + self.query_account() + self.query_position() + self.cycle_query() + + def subscribe(self, req: SubscribeRequest): + """""" + self.ws_api.subscribe(req) + + def send_order(self, req: OrderRequest): + """""" + return self.rest_api.send_order(req) + + def cancel_order(self, req: CancelRequest): + """""" + self.rest_api.cancel_order(req) + + def query_account(self): + """""" + self.rest_api.query_account() + + def query_position(self): + """""" + self.rest_api.query_position() + + def query_history(self, req: HistoryRequest): + """""" + return self.rest_api.query_history(req) + + def close(self): + """""" + self.rest_api.stop() + self.ws_api.stop() + + def process_timer_event(self, event: Event): + """""" + if self.rest_api.cycle_query: + self.count += 1 + if self.count < 3: + return + + self.query_account() + self.query_position() + + def cycle_query(self): + """""" + self.count = 0 + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + + +class GateiofRestApi(RestClient): + """ + Gateiof REST API + """ + + def __init__(self, gateway: BaseGateway): + """""" + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + self.order_manager = gateway.order_manager + self.ws_api = self.gateway.ws_api + + self.key = "" + self.secret = "" + self.account_id = "" + + self.order_count = 10000 + self.trade_count = 0 + self.order_count_lock = Lock() + self.connect_date = 0 + + self.contracts = [] + + self.cycle_query = False + self.contract_query = False + + def sign(self, request): + """ + Generate HBDM signature. + """ + + headers_with_signature = generate_sign( + self.key, + self.secret, + request.method, + request.path, + get_params=request.params, + get_data=request.data + ) + + request.headers = headers_with_signature + + # if request.params: + # query = urlencode(request.params) + # path = request.path + "?" + query + # else: + # path = request.path + + if not request.data: + request.data = "" + + return request + + def connect( + self, + key: str, + secret: str, + session_number: int, + server: str, + proxy_host: str, + proxy_port: int + ): + """ + Initialize connection to REST server. + """ + self.key = key + self.secret = secret + + self.connect_date = int(datetime.now().strftime("%y%m%d")) + + if server == "REAL": + self.init(REST_HOST, proxy_host, proxy_port) + else: + self.init(TESTNET_REST_HOST, proxy_host, proxy_port) + + self.start(session_number) + + self.gateway.write_log("REST API启动成功") + + def query_account(self): + """""" + self.add_request( + method="GET", + path="/api/v4/futures/accounts", + callback=self.on_query_account + ) + + def query_position(self): + """""" + self.add_request( + method="GET", + path="/api/v4/futures/positions", + callback=self.on_query_position + ) + + def query_order(self): + """""" + for contract in self.contracts: + params = { + "contract": contract, + "status": "open", + } + + self.add_request( + method="GET", + path="/api/v4/futures/orders", + callback=self.on_query_order, + params=params + ) + + def query_trade(self): + """""" + self.add_request( + method="GET", + path="/api/v4/futures/my_trades", + callback=self.on_query_trade + ) + + def query_contract(self): + """""" + self.add_request( + method="GET", + path="/api/v4/futures/contracts", + callback=self.on_query_contract + ) + + def query_history(self, req: HistoryRequest): + """""" + history = [] + interval = INTERVAL_VT2GATEIO[req.interval] + + params = { + "contract": req.symbol, + "limit": 2000, + "interval": interval, + } + + resp = self.request( + method="GET", + path="/api/v4/futures/candlesticks", + params=params + ) + + if resp.status_code // 100 != 2: + msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" + self.gateway.write_log(msg) + else: + data = resp.json() + if not data: + msg = f"获取历史数据为空" + + for d in data: + bar = BarData( + symbol=req.symbol, + exchange=req.exchange, + datetime=datetime.fromtimestamp(d["t"]), + interval=req.interval, + volume=d["v"], + open_price=float(d["o"]), + high_price=float(d["h"]), + low_price=float(d["l"]), + close_price=float(d["c"]), + gateway_name=self.gateway_name + ) + history.append(bar) + + begin = datetime.fromtimestamp(data[0]["t"]) + end = datetime.fromtimestamp(data[-1]["t"]) + + msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{begin} - {end}" + + self.gateway.write_log(msg) + + return history + + def new_local_orderid(self): + """""" + with self.order_count_lock: + self.order_count += 1 + local_orderid = f"{self.connect_date}{self.order_count}" + return local_orderid + + def send_order(self, req: OrderRequest): + """""" + local_orderid = self.new_local_orderid() + + order = req.create_order_data( + local_orderid, + self.gateway_name + ) + + order.time = datetime.now().strftime("%H:%M:%S") + + if req.direction == Direction.SHORT: + volume = -int(req.volume) + else: + volume = int(req.volume) + + request_body = { + "contract": req.symbol, + "size": volume, + "price": str(req.price), + "close": False, + "tif": "gtc", + "text": f"t-{local_orderid}" + } + + data = json.dumps(request_body) + + self.add_request( + method="POST", + path="/api/v4/futures/orders", + callback=self.on_send_order, + data=data, + extra=order, + on_error=self.on_send_order_error, + on_failed=self.on_send_order_failed + ) + + self.gateway.on_order(order) + return order.vt_orderid + + def cancel_order(self, req: CancelRequest): + """""" + sys_orderid = self.order_manager.get_sys_orderid(req.orderid) + + self.add_request( + method="DELETE", + path=f"/api/v4/futures/orders/{sys_orderid}", + callback=self.on_cancel_order, + on_failed=self.on_cancel_order_failed, + extra=req + ) + + def on_query_account(self, data, request): + """""" + self.account_id = str(data["user"]) + + account = AccountData( + accountid=self.account_id, + balance=float(data["total"]), + frozen=float(data["total"]) - float(data["available"]), + gateway_name=self.gateway_name, + ) + self.gateway.on_account(account) + + if not self.contract_query: + self.query_contract() + + def on_query_position(self, data, request): + """""" + for d in data: + volume = float(d["size"]) + + position = PositionData( + symbol=d["contract"], + exchange=Exchange.GATEIO, + volume=volume, + direction=Direction.NET, + price=float(d["entry_price"]), + pnl=float(d["unrealised_pnl"]), + gateway_name=self.gateway_name, + ) + self.gateway.on_position(position) + + def on_query_trade(self, data, request): + """""" + data = list( + reversed(data) + ) + + for d in data: + local_orderid = self.new_local_orderid() + self.trade_count += 1 + + volume = d["size"] + if volume > 0: + direction = Direction.LONG + else: + direction = Direction.SHORT + + dt = datetime.fromtimestamp(d["create_time"]).strftime("%H:%M:%S") + + trade = TradeData( + symbol=d["contract"], + exchange=Exchange.GATEIO, + orderid=local_orderid, + tradeid=str(self.trade_count).rjust(8, "0"), + direction=direction, + price=d["price"], + volume=abs(volume), + time=dt, + gateway_name=self.gateway_name, + ) + self.gateway.on_trade(trade) + + self.gateway.write_log("成交查询成功") + self.ws_api.trade_count = self.trade_count + self.query_order() + + def on_query_order(self, data, request): + + """""" + if not data: + return + + for d in data: + + local_orderid = self.new_local_orderid() + sys_orderid = str(d["id"]) + + self.order_manager.update_orderid_map( + local_orderid=local_orderid, + sys_orderid=sys_orderid + ) + + volume = d["size"] + if volume > 0: + direction = Direction.LONG + else: + direction = Direction.SHORT + + dt = datetime.fromtimestamp(d["create_time"]).strftime("%H:%M:%S") + + order = OrderData( + orderid=local_orderid, + symbol=d["contract"], + exchange=Exchange.GATEIO, + price=float(d["price"]), + volume=abs(volume), + type=OrderType.LIMIT, + direction=direction, + status=STATUS_GATEIO2VT[d["status"]], + time=dt, + gateway_name=self.gateway_name, + ) + self.order_manager.on_order(order) + + self.gateway.write_log(f"{order.symbol}合约委托信息查询成功") + + def on_query_contract(self, data, request): # type: (dict, Request)->None + """""" + for d in data: + symbol = d["name"] + self.contracts.append(symbol) + + contract = ContractData( + symbol=symbol, + exchange=Exchange.GATEIO, + name=symbol, + pricetick=d["order_price_round"], + size=int(d["leverage_min"]), + min_volume=d["order_size_min"], + product=Product.FUTURES, + gateway_name=self.gateway_name, + history_data=True + ) + self.gateway.on_contract(contract) + + self.gateway.write_log("合约信息查询成功") + + self.contract_query = True + + self.query_trade() + self.update_info(self.contracts, self.account_id) + + def update_info(self, symbols, account_id): + """""" + self.cycle_query = True + + for symbol in symbols: + self.ws_api.update_info(symbol, account_id) + + def on_send_order(self, data, request): + """""" + order = request.extra + order.status = STATUS_GATEIO2VT[data["status"]] + + if order.status == Status.ALLTRADED: + order.traded = order.volume + + dt = datetime.fromtimestamp(data["create_time"]) + order.time = dt.strftime("%H:%M:%S") + + sys_orderid = data["id"] + self.order_manager.on_order(order) + self.order_manager.update_orderid_map(order.orderid, sys_orderid) + + def on_send_order_failed(self, status_code: str, request: Request): + """ + Callback when sending order failed on server. + """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + msg = f"委托失败,状态码:{status_code},信息:{request.response.text}" + + self.gateway.write_log(msg) + + def on_send_order_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback when sending order caused exception. + """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_cancel_order(self, data, request): + """""" + cancel_request = request.extra + local_orderid = cancel_request.orderid + order = self.order_manager.get_order_with_local_orderid(local_orderid) + + if self.check_error(data, "撤单"): + order.status = Status.REJECTED + else: + order.status = Status.CANCELLED + self.gateway.write_log(f"委托撤单成功:{order.orderid}") + + self.order_manager.on_order(order) + + def on_cancel_order_failed(self, status_code: str, request: Request): + """ + Callback when canceling order failed on server. + """ + + msg = f"撤单失败,状态码:{status_code},信息:{request.response.text}" + self.gateway.write_log(msg) + + def check_error(self, data: dict, func: str = ""): + """""" + if data["status"] != "error": + return False + + error_code = data["err_code"] + error_msg = data["err_msg"] + + self.gateway.write_log(f"{func}请求出错,代码:{error_code},信息:{error_msg}") + return True + + +class GateiofWebsocketApi(WebsocketClient): + """""" + + def __init__(self, gateway): + """""" + super(GateiofWebsocketApi, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + self.order_manager = gateway.order_manager + + self.key = "" + self.secret = "" + self.account_id = "" + self.trade_count = 0 + + self.ticks = {} + + def connect( + self, + key: str, + secret: str, + server: str, + proxy_host: str, + proxy_port: int + ): + """""" + self.key = key + self.secret = secret + + if server == "REAL": + self.init(WEBSOCKET_HOST, proxy_host, proxy_port) + else: + self.init(TESTNET_WEBSOCKET_HOST, proxy_host, proxy_port) + self.start() + + def on_connected(self): + """""" + self.gateway.write_log("Websocket API连接成功") + + def subscribe(self, req: SubscribeRequest): + """ + Subscribe to tick data upate. + """ + tick = TickData( + symbol=req.symbol, + exchange=req.exchange, + name=req.symbol, + datetime=datetime.now(), + gateway_name=self.gateway_name, + ) + self.ticks[req.symbol] = tick + + self.subscribe_data(req.symbol) + + def subscribe_data(self, symbol: str): + """""" + update_tick = self.construct_req( + channel="futures.tickers", + event="subscribe", + pay_load=[symbol] + ) + self.send_packet(update_tick) + + update_depth = self.construct_req( + channel="futures.order_book", + event="subscribe", + pay_load=[symbol, "5", "0"] + ) + self.send_packet(update_depth) + + def update_info(self, symbol: str, account_id: str): + """""" + + update_order = self.construct_req( + channel="futures.orders", + event="subscribe", + pay_load=[account_id, symbol] + ) + self.send_packet(update_order) + + update_position = self.construct_req( + channel="futures.position_closes", + event="subscribe", + pay_load=[account_id, symbol] + ) + self.send_packet(update_position) + + def on_disconnected(self): + """""" + self.gateway.write_log("Websocket API连接断开") + + def on_packet(self, packet: dict): + """""" + + timestamp = packet["time"] + channel = packet["channel"] + event = packet["event"] + result = packet["result"] + error = packet["error"] + + if error: + self.gateway.write_log("Websocket API报错:%s" % error) + return + + if channel == "futures.tickers": + if event == "update": + self.on_tick(result, timestamp) + + elif channel == "futures.order_book": + if event == "all": + self.on_depth(result, timestamp) + + elif channel == "futures.orders": + if event == "update": + self.on_order(result, timestamp) + + def on_error(self, exception_type: type, exception_value: Exception, tb): + """""" + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" + self.gateway.write_log(msg) + + sys.stderr.write(self.exception_detail( + exception_type, exception_value, tb)) + + def construct_req(self, channel: str, event: str, pay_load: list): + """""" + expires = int(time.time()) + signature = generate_websocket_sign( + self.secret, channel, event, expires) + + req = { + "time": expires, + "channel": channel, + "event": event, + "payload": pay_load, + "auth": { + "method": "api_key", + "KEY": self.key, + "SIGN": signature + } + } + + return req + + def on_tick(self, l: list, t: int): + """""" + + d = l[0] + symbol = d["contract"] + tick = self.ticks.get(symbol, None) + if not tick: + return + + tick.last_price = float(d["last"]) + tick.datetime = datetime.fromtimestamp(t) + self.gateway.on_tick(copy(tick)) + + def on_depth(self, d: dict, t: int): + """""" + symbol = d["contract"] + tick = self.ticks.get(symbol, None) + if not tick: + return + + for n, buf in enumerate(d["bids"][:5]): + price = float(buf["p"]) + volume = buf["s"] + tick.__setattr__("bid_price_%s" % (n + 1), price) + tick.__setattr__("bid_volume_%s" % (n + 1), volume) + + for n, buf in enumerate(d["asks"][:5]): + price = float(buf["p"]) + volume = buf["s"] + tick.__setattr__("ask_price_%s" % (n + 1), price) + tick.__setattr__("ask_volume_%s" % (n + 1), volume) + + tick.datetime = datetime.fromtimestamp(t) + + self.gateway.on_tick(copy(tick)) + + def on_order(self, l: list, t: int): + """""" + d = l[0] + + local_orderid = str(d["text"])[2:] + + volume = d["size"] + if volume > 0: + direction = Direction.LONG + else: + direction = Direction.SHORT + + order = OrderData( + orderid=local_orderid, + symbol=d["contract"], + exchange=Exchange.GATEIO, + price=float(d["price"]), + volume=abs(volume), + type=OrderType.LIMIT, + direction=direction, + status=STATUS_GATEIO2VT[d["status"]], + time=datetime.fromtimestamp(t).strftime("%H:%M:%S"), + gateway_name=self.gateway_name, + ) + + self.order_manager.on_order(copy(order)) + + # Update trade + if order.status == Status.ALLTRADED: + self.trade_count += 1 + + trade = TradeData( + symbol=order.symbol, + exchange=order.exchange, + orderid=order.orderid, + tradeid=str(self.trade_count).rjust(8, "0"), + direction=order.direction, + price=float(d["fill_price"]), + volume=order.volume, + time=order.time, + gateway_name=self.gateway_name, + ) + self.gateway.on_trade(trade) + + +def generate_sign(key, secret, method, path, get_params=None, get_data=None): + """""" + if get_params: + params = urlencode(get_params) + else: + params = "" + + hashed_data = get_hashed_data(get_data) + + timestamp = str(time.time()) + + pay_load = [method, path, params, hashed_data, timestamp] + pay_load = "\n".join(pay_load) + + signature = hmac.new( + secret.encode("utf-8"), + pay_load.encode("utf-8"), + hashlib.sha512 + ).hexdigest() + + headers = { + "Accept": "application/json", + "Content-Type": "application/json", + "KEY": key, + "Timestamp": str(timestamp), + "SIGN": signature + } + + return headers + + +def get_hashed_data(get_data): + """""" + hashed_data = hashlib.sha512() + if get_data: + data = get_data + hashed_data.update(data.encode("utf-8")) + + return hashed_data.hexdigest() + + +def generate_websocket_sign(secret, channel, event, time): + """""" + message = 'channel=%s&event=%s&time=%s' % (channel, event, time) + + signature = hmac.new( + secret.encode("utf-8"), + message.encode("utf-8"), + hashlib.sha512 + ).hexdigest() + + return signature From d518566e558c95007830555bf39de2c14d6bd58c Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Fri, 18 Oct 2019 19:12:20 +0800 Subject: [PATCH 4/6] [Mod] change gateio gateway name --- .../gateway/{gateio_f => gateios}/__init__.py | 0 .../gateios_gateway.py} | 202 ++++++------------ 2 files changed, 64 insertions(+), 138 deletions(-) rename vnpy/gateway/{gateio_f => gateios}/__init__.py (100%) rename vnpy/gateway/{gateio_f/gateiof_gateway.py => gateios/gateios_gateway.py} (84%) diff --git a/vnpy/gateway/gateio_f/__init__.py b/vnpy/gateway/gateios/__init__.py similarity index 100% rename from vnpy/gateway/gateio_f/__init__.py rename to vnpy/gateway/gateios/__init__.py diff --git a/vnpy/gateway/gateio_f/gateiof_gateway.py b/vnpy/gateway/gateios/gateios_gateway.py similarity index 84% rename from vnpy/gateway/gateio_f/gateiof_gateway.py rename to vnpy/gateway/gateios/gateios_gateway.py index 67a7911c..4ca2a6ae 100644 --- a/vnpy/gateway/gateio_f/gateiof_gateway.py +++ b/vnpy/gateway/gateios/gateios_gateway.py @@ -10,6 +10,7 @@ from copy import copy from datetime import datetime, timedelta from threading import Lock from urllib.parse import urlencode +from typing import List from vnpy.api.rest import Request, RestClient from vnpy.api.websocket import WebsocketClient @@ -23,6 +24,7 @@ from vnpy.trader.object import (AccountData, BarData, CancelRequest, OrderRequest, PositionData, SubscribeRequest, TickData, TradeData) + TESTNET_REST_HOST = "https://fx-api-testnet.gateio.ws/" REST_HOST = "https://api.gateio.ws" @@ -55,7 +57,6 @@ class GateiofGateway(BaseGateway): default_setting = { "API Key": "", "Secret Key": "", - "会话数": 3, "服务器": ["REAL", "TESTNET"], "代理地址": "", "代理端口": "", @@ -65,7 +66,7 @@ class GateiofGateway(BaseGateway): def __init__(self, event_engine): """Constructor""" - super(GateiofGateway, self).__init__(event_engine, "GATEIO") + super().__init__(event_engine, "GATEIOS") self.order_manager = LocalOrderManager(self) self.ws_api = GateiofWebsocketApi(self) @@ -75,7 +76,6 @@ class GateiofGateway(BaseGateway): """""" key = setting["API Key"] secret = setting["Secret Key"] - session_number = setting["会话数"] server = setting["服务器"] proxy_host = setting["代理地址"] proxy_port = setting["代理端口"] @@ -85,13 +85,9 @@ class GateiofGateway(BaseGateway): else: proxy_port = 0 - self.rest_api.connect(key, secret, session_number, server, - proxy_host, proxy_port) - self.ws_api.connect(key, secret, server, proxy_host, proxy_port) + self.rest_api.connect(key, secret, server, proxy_host, proxy_port) - self.query_account() - self.query_position() - self.cycle_query() + self.init_query() def subscribe(self, req: SubscribeRequest): """""" @@ -103,7 +99,7 @@ class GateiofGateway(BaseGateway): def cancel_order(self, req: CancelRequest): """""" - self.rest_api.cancel_order(req) + self.order_manager.cancel_order(req) def query_account(self): """""" @@ -124,17 +120,11 @@ class GateiofGateway(BaseGateway): def process_timer_event(self, event: Event): """""" - if self.rest_api.cycle_query: - self.count += 1 - if self.count < 3: - return + self.query_account() + self.query_position() - self.query_account() - self.query_position() - - def cycle_query(self): + def init_query(self): """""" - self.count = 0 self.event_engine.register(EVENT_TIMER, self.process_timer_event) @@ -150,28 +140,22 @@ class GateiofRestApi(RestClient): self.gateway = gateway self.gateway_name = gateway.gateway_name self.order_manager = gateway.order_manager - self.ws_api = self.gateway.ws_api + self.ws_api = gateway.ws_api self.key = "" self.secret = "" self.account_id = "" + self.server = "" + self.proxy_host = "" + self.proxy_port = 0 - self.order_count = 10000 - self.trade_count = 0 - self.order_count_lock = Lock() - self.connect_date = 0 - - self.contracts = [] - - self.cycle_query = False - self.contract_query = False + self.symbols = [] def sign(self, request): """ Generate HBDM signature. """ - - headers_with_signature = generate_sign( + request.headers = generate_sign( self.key, self.secret, request.method, @@ -180,14 +164,6 @@ class GateiofRestApi(RestClient): get_data=request.data ) - request.headers = headers_with_signature - - # if request.params: - # query = urlencode(request.params) - # path = request.path + "?" + query - # else: - # path = request.path - if not request.data: request.data = "" @@ -197,7 +173,6 @@ class GateiofRestApi(RestClient): self, key: str, secret: str, - session_number: int, server: str, proxy_host: str, proxy_port: int @@ -207,6 +182,9 @@ class GateiofRestApi(RestClient): """ self.key = key self.secret = secret + self.server = server + self.proxy_host = proxy_host + self.proxy_port = proxy_port self.connect_date = int(datetime.now().strftime("%y%m%d")) @@ -215,7 +193,7 @@ class GateiofRestApi(RestClient): else: self.init(TESTNET_REST_HOST, proxy_host, proxy_port) - self.start(session_number) + self.start(3) self.gateway.write_log("REST API启动成功") @@ -237,7 +215,7 @@ class GateiofRestApi(RestClient): def query_order(self): """""" - for contract in self.contracts: + for contract in self.symbols: params = { "contract": contract, "status": "open", @@ -250,14 +228,6 @@ class GateiofRestApi(RestClient): params=params ) - def query_trade(self): - """""" - self.add_request( - method="GET", - path="/api/v4/futures/my_trades", - callback=self.on_query_trade - ) - def query_contract(self): """""" self.add_request( @@ -315,22 +285,13 @@ class GateiofRestApi(RestClient): return history - def new_local_orderid(self): - """""" - with self.order_count_lock: - self.order_count += 1 - local_orderid = f"{self.connect_date}{self.order_count}" - return local_orderid - def send_order(self, req: OrderRequest): """""" - local_orderid = self.new_local_orderid() - + local_orderid = self.order_manager.new_local_orderid() order = req.create_order_data( local_orderid, self.gateway_name ) - order.time = datetime.now().strftime("%H:%M:%S") if req.direction == Direction.SHORT: @@ -386,18 +347,16 @@ class GateiofRestApi(RestClient): ) self.gateway.on_account(account) - if not self.contract_query: + if not self.symbols: self.query_contract() def on_query_position(self, data, request): """""" for d in data: - volume = float(d["size"]) - position = PositionData( symbol=d["contract"], exchange=Exchange.GATEIO, - volume=volume, + volume=float(d["size"]), direction=Direction.NET, price=float(d["entry_price"]), pnl=float(d["unrealised_pnl"]), @@ -405,49 +364,9 @@ class GateiofRestApi(RestClient): ) self.gateway.on_position(position) - def on_query_trade(self, data, request): - """""" - data = list( - reversed(data) - ) - - for d in data: - local_orderid = self.new_local_orderid() - self.trade_count += 1 - - volume = d["size"] - if volume > 0: - direction = Direction.LONG - else: - direction = Direction.SHORT - - dt = datetime.fromtimestamp(d["create_time"]).strftime("%H:%M:%S") - - trade = TradeData( - symbol=d["contract"], - exchange=Exchange.GATEIO, - orderid=local_orderid, - tradeid=str(self.trade_count).rjust(8, "0"), - direction=direction, - price=d["price"], - volume=abs(volume), - time=dt, - gateway_name=self.gateway_name, - ) - self.gateway.on_trade(trade) - - self.gateway.write_log("成交查询成功") - self.ws_api.trade_count = self.trade_count - self.query_order() - def on_query_order(self, data, request): - """""" - if not data: - return - for d in data: - local_orderid = self.new_local_orderid() sys_orderid = str(d["id"]) @@ -461,8 +380,8 @@ class GateiofRestApi(RestClient): direction = Direction.LONG else: direction = Direction.SHORT - - dt = datetime.fromtimestamp(d["create_time"]).strftime("%H:%M:%S") + + dt = datetime.fromtimestamp(d["create_time"]) order = OrderData( orderid=local_orderid, @@ -473,7 +392,7 @@ class GateiofRestApi(RestClient): type=OrderType.LIMIT, direction=direction, status=STATUS_GATEIO2VT[d["status"]], - time=dt, + time=dt.strftime("%H:%M:%S"), gateway_name=self.gateway_name, ) self.order_manager.on_order(order) @@ -484,7 +403,7 @@ class GateiofRestApi(RestClient): """""" for d in data: symbol = d["name"] - self.contracts.append(symbol) + self.symbols.append(symbol) contract = ContractData( symbol=symbol, @@ -501,23 +420,22 @@ class GateiofRestApi(RestClient): self.gateway.write_log("合约信息查询成功") - self.contract_query = True - - self.query_trade() - self.update_info(self.contracts, self.account_id) - - def update_info(self, symbols, account_id): - """""" - self.cycle_query = True - - for symbol in symbols: - self.ws_api.update_info(symbol, account_id) + # Connect websocket api after account id and contract symbols collected + self.ws_api.connect( + self.key, + self.secret, + self.server, + self.proxy_host, + self.proxy_port, + self.account_id, + self.symbols + ) def on_send_order(self, data, request): """""" order = request.extra order.status = STATUS_GATEIO2VT[data["status"]] - + if order.status == Status.ALLTRADED: order.traded = order.volume @@ -602,8 +520,9 @@ class GateiofWebsocketApi(WebsocketClient): self.key = "" self.secret = "" self.account_id = "" - self.trade_count = 0 + self.symbols = [] + self.trade_count = 0 self.ticks = {} def connect( @@ -612,22 +531,42 @@ class GateiofWebsocketApi(WebsocketClient): secret: str, server: str, proxy_host: str, - proxy_port: int + proxy_port: int, + account_id: str, + symbols: List[str] ): """""" self.key = key self.secret = secret + self.account_id = account_id + self.symbols = symbols if server == "REAL": self.init(WEBSOCKET_HOST, proxy_host, proxy_port) else: self.init(TESTNET_WEBSOCKET_HOST, proxy_host, proxy_port) + self.start() def on_connected(self): """""" self.gateway.write_log("Websocket API连接成功") + for symbol in self.symbols: + update_order = self.construct_req( + channel="futures.orders", + event="subscribe", + pay_load=[self.account_id, symbol] + ) + self.send_packet(update_order) + + update_position = self.construct_req( + channel="futures.position_closes", + event="subscribe", + pay_load=[self.account_id, symbol] + ) + self.send_packet(update_position) + def subscribe(self, req: SubscribeRequest): """ Subscribe to tick data upate. @@ -661,20 +600,7 @@ class GateiofWebsocketApi(WebsocketClient): def update_info(self, symbol: str, account_id: str): """""" - - update_order = self.construct_req( - channel="futures.orders", - event="subscribe", - pay_load=[account_id, symbol] - ) - self.send_packet(update_order) - - update_position = self.construct_req( - channel="futures.position_closes", - event="subscribe", - pay_load=[account_id, symbol] - ) - self.send_packet(update_position) + def on_disconnected(self): """""" @@ -809,7 +735,7 @@ class GateiofWebsocketApi(WebsocketClient): price=float(d["fill_price"]), volume=order.volume, time=order.time, - gateway_name=self.gateway_name, + gateway_name=self.gateway_name, ) self.gateway.on_trade(trade) @@ -841,7 +767,7 @@ def generate_sign(key, secret, method, path, get_params=None, get_data=None): "Timestamp": str(timestamp), "SIGN": signature } - + return headers From 4336ccf06504d0120d98a3d95ac3e9be2d1068fa Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Fri, 18 Oct 2019 19:22:35 +0800 Subject: [PATCH 5/6] [Mod] change function names --- examples/vn_trader/run.py | 2 + vnpy/gateway/gateios/__init__.py | 2 +- vnpy/gateway/gateios/gateios_gateway.py | 74 +++++++++---------------- 3 files changed, 28 insertions(+), 50 deletions(-) diff --git a/examples/vn_trader/run.py b/examples/vn_trader/run.py index 95ac966c..6aa6332c 100644 --- a/examples/vn_trader/run.py +++ b/examples/vn_trader/run.py @@ -30,6 +30,7 @@ from vnpy.gateway.okexs import OkexsGateway from vnpy.gateway.da import DaGateway from vnpy.gateway.coinbase import CoinbaseGateway from vnpy.gateway.bitstamp import BitstampGateway +from vnpy.gateway.gateios import GateiosGateway from vnpy.app.cta_strategy import CtaStrategyApp # from vnpy.app.csv_loader import CsvLoaderApp @@ -76,6 +77,7 @@ def main(): # main_engine.add_gateway(DaGateway) # main_engine.add_gateway(CoinbaseGateway) main_engine.add_gateway(BitstampGateway) + main_engine.add_gateway(GateiosGateway) main_engine.add_app(CtaStrategyApp) main_engine.add_app(CtaBacktesterApp) diff --git a/vnpy/gateway/gateios/__init__.py b/vnpy/gateway/gateios/__init__.py index d39ab445..0234aa20 100644 --- a/vnpy/gateway/gateios/__init__.py +++ b/vnpy/gateway/gateios/__init__.py @@ -1 +1 @@ -from .gateiof_gateway import GateiofGateway +from .gateios_gateway import GateiosGateway diff --git a/vnpy/gateway/gateios/gateios_gateway.py b/vnpy/gateway/gateios/gateios_gateway.py index 4ca2a6ae..c42a6825 100644 --- a/vnpy/gateway/gateios/gateios_gateway.py +++ b/vnpy/gateway/gateios/gateios_gateway.py @@ -49,7 +49,7 @@ TIMEDELTA_MAP = { } -class GateiofGateway(BaseGateway): +class GateiosGateway(BaseGateway): """ VN Trader Gateway for Gate.io future connection. """ @@ -69,8 +69,8 @@ class GateiofGateway(BaseGateway): super().__init__(event_engine, "GATEIOS") self.order_manager = LocalOrderManager(self) - self.ws_api = GateiofWebsocketApi(self) - self.rest_api = GateiofRestApi(self) + self.ws_api = GateiosWebsocketApi(self) + self.rest_api = GateiosRestApi(self) def connect(self, setting: dict): """""" @@ -128,9 +128,9 @@ class GateiofGateway(BaseGateway): self.event_engine.register(EVENT_TIMER, self.process_timer_event) -class GateiofRestApi(RestClient): +class GateiosRestApi(RestClient): """ - Gateiof REST API + Gateios REST API """ def __init__(self, gateway: BaseGateway): @@ -455,7 +455,6 @@ class GateiofRestApi(RestClient): self.gateway.on_order(order) msg = f"委托失败,状态码:{status_code},信息:{request.response.text}" - self.gateway.write_log(msg) def on_send_order_error( @@ -478,40 +477,28 @@ class GateiofRestApi(RestClient): local_orderid = cancel_request.orderid order = self.order_manager.get_order_with_local_orderid(local_orderid) - if self.check_error(data, "撤单"): - order.status = Status.REJECTED + if data["status"] == "error": + error_code = data["err_code"] + error_msg = data["err_msg"] + self.gateway.write_log(f"撤单失败,错误代码:{error_code},信息:{error_msg}") else: order.status = Status.CANCELLED - self.gateway.write_log(f"委托撤单成功:{order.orderid}") - - self.order_manager.on_order(order) + self.order_manager.on_order(order) def on_cancel_order_failed(self, status_code: str, request: Request): """ Callback when canceling order failed on server. """ - msg = f"撤单失败,状态码:{status_code},信息:{request.response.text}" self.gateway.write_log(msg) - def check_error(self, data: dict, func: str = ""): - """""" - if data["status"] != "error": - return False - error_code = data["err_code"] - error_msg = data["err_msg"] - - self.gateway.write_log(f"{func}请求出错,代码:{error_code},信息:{error_msg}") - return True - - -class GateiofWebsocketApi(WebsocketClient): +class GateiosWebsocketApi(WebsocketClient): """""" def __init__(self, gateway): """""" - super(GateiofWebsocketApi, self).__init__() + super(GateiosWebsocketApi, self).__init__() self.gateway = gateway self.gateway_name = gateway.gateway_name @@ -545,7 +532,7 @@ class GateiofWebsocketApi(WebsocketClient): self.init(WEBSOCKET_HOST, proxy_host, proxy_port) else: self.init(TESTNET_WEBSOCKET_HOST, proxy_host, proxy_port) - + self.start() def on_connected(self): @@ -553,14 +540,14 @@ class GateiofWebsocketApi(WebsocketClient): self.gateway.write_log("Websocket API连接成功") for symbol in self.symbols: - update_order = self.construct_req( + update_order = self.generate_req( channel="futures.orders", event="subscribe", pay_load=[self.account_id, symbol] ) self.send_packet(update_order) - update_position = self.construct_req( + update_position = self.generate_req( channel="futures.position_closes", event="subscribe", pay_load=[self.account_id, symbol] @@ -580,27 +567,19 @@ class GateiofWebsocketApi(WebsocketClient): ) self.ticks[req.symbol] = tick - self.subscribe_data(req.symbol) - - def subscribe_data(self, symbol: str): - """""" - update_tick = self.construct_req( + tick_req = self.generate_req( channel="futures.tickers", event="subscribe", - pay_load=[symbol] + pay_load=[req.symbol] ) - self.send_packet(update_tick) + self.send_packet(tick_req) - update_depth = self.construct_req( + depth_req = self.generate_req( channel="futures.order_book", event="subscribe", - pay_load=[symbol, "5", "0"] + pay_load=[req.symbol, "5", "0"] ) - self.send_packet(update_depth) - - def update_info(self, symbol: str, account_id: str): - """""" - + self.send_packet(depth_req) def on_disconnected(self): """""" @@ -608,7 +587,6 @@ class GateiofWebsocketApi(WebsocketClient): def on_packet(self, packet: dict): """""" - timestamp = packet["time"] channel = packet["channel"] event = packet["event"] @@ -619,14 +597,14 @@ class GateiofWebsocketApi(WebsocketClient): self.gateway.write_log("Websocket API报错:%s" % error) return + print(packet) + if channel == "futures.tickers": if event == "update": self.on_tick(result, timestamp) - elif channel == "futures.order_book": if event == "all": self.on_depth(result, timestamp) - elif channel == "futures.orders": if event == "update": self.on_order(result, timestamp) @@ -639,11 +617,10 @@ class GateiofWebsocketApi(WebsocketClient): sys.stderr.write(self.exception_detail( exception_type, exception_value, tb)) - def construct_req(self, channel: str, event: str, pay_load: list): + def generate_req(self, channel: str, event: str, pay_load: list): """""" expires = int(time.time()) - signature = generate_websocket_sign( - self.secret, channel, event, expires) + signature = generate_websocket_sign(self.secret, channel, event, expires) req = { "time": expires, @@ -661,7 +638,6 @@ class GateiofWebsocketApi(WebsocketClient): def on_tick(self, l: list, t: int): """""" - d = l[0] symbol = d["contract"] tick = self.ticks.get(symbol, None) From 8f7ddd985aafd0223c6a16f23c0ff1c45500f5f6 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Fri, 18 Oct 2019 20:10:35 +0800 Subject: [PATCH 6/6] [Mod] complete test of gateio gateway --- vnpy/gateway/gateios/gateios_gateway.py | 162 ++++++++++++------------ 1 file changed, 82 insertions(+), 80 deletions(-) diff --git a/vnpy/gateway/gateios/gateios_gateway.py b/vnpy/gateway/gateios/gateios_gateway.py index c42a6825..a2a84d7a 100644 --- a/vnpy/gateway/gateios/gateios_gateway.py +++ b/vnpy/gateway/gateios/gateios_gateway.py @@ -8,9 +8,8 @@ import sys import time from copy import copy from datetime import datetime, timedelta -from threading import Lock from urllib.parse import urlencode -from typing import List +from typing import List, Dict from vnpy.api.rest import Request, RestClient from vnpy.api.websocket import WebsocketClient @@ -31,11 +30,6 @@ REST_HOST = "https://api.gateio.ws" TESTNET_WEBSOCKET_HOST = "wss://fx-ws-testnet.gateio.ws/v4/ws" WEBSOCKET_HOST = "wss://fx-ws.gateio.ws/v4/ws" -STATUS_GATEIO2VT = { - "open": Status.NOTTRADED, - "finished": Status.ALLTRADED, -} - INTERVAL_VT2GATEIO = { Interval.MINUTE: "1m", Interval.HOUR: "1h", @@ -72,7 +66,7 @@ class GateiosGateway(BaseGateway): self.ws_api = GateiosWebsocketApi(self) self.rest_api = GateiosRestApi(self) - def connect(self, setting: dict): + def connect(self, setting: Dict): """""" key = setting["API Key"] secret = setting["Secret Key"] @@ -99,7 +93,7 @@ class GateiosGateway(BaseGateway): def cancel_order(self, req: CancelRequest): """""" - self.order_manager.cancel_order(req) + self.rest_api.cancel_order(req) def query_account(self): """""" @@ -320,12 +314,15 @@ class GateiosRestApi(RestClient): on_failed=self.on_send_order_failed ) - self.gateway.on_order(order) + self.order_manager.on_order(order) return order.vt_orderid def cancel_order(self, req: CancelRequest): """""" sys_orderid = self.order_manager.get_sys_orderid(req.orderid) + if not sys_orderid: + self.write_log("撤单失败,找不到对应系统委托号{}".format(req.orderid)) + return self.add_request( method="DELETE", @@ -367,7 +364,7 @@ class GateiosRestApi(RestClient): def on_query_order(self, data, request): """""" for d in data: - local_orderid = self.new_local_orderid() + local_orderid = self.order_manager.new_local_orderid() sys_orderid = str(d["id"]) self.order_manager.update_orderid_map( @@ -375,12 +372,15 @@ class GateiosRestApi(RestClient): sys_orderid=sys_orderid ) - volume = d["size"] - if volume > 0: + if d["size"] > 0: direction = Direction.LONG else: direction = Direction.SHORT + volume = abs(d["size"]) + traded = abs(d["size"] - d["left"]) + status = get_order_status(d["status"], volume, traded) + dt = datetime.fromtimestamp(d["create_time"]) order = OrderData( @@ -391,7 +391,7 @@ class GateiosRestApi(RestClient): volume=abs(volume), type=OrderType.LIMIT, direction=direction, - status=STATUS_GATEIO2VT[d["status"]], + status=status, time=dt.strftime("%H:%M:%S"), gateway_name=self.gateway_name, ) @@ -434,16 +434,7 @@ class GateiosRestApi(RestClient): def on_send_order(self, data, request): """""" order = request.extra - order.status = STATUS_GATEIO2VT[data["status"]] - - if order.status == Status.ALLTRADED: - order.traded = order.volume - - dt = datetime.fromtimestamp(data["create_time"]) - order.time = dt.strftime("%H:%M:%S") - - sys_orderid = data["id"] - self.order_manager.on_order(order) + sys_orderid = str(data["id"]) self.order_manager.update_orderid_map(order.orderid, sys_orderid) def on_send_order_failed(self, status_code: str, request: Request): @@ -473,17 +464,10 @@ class GateiosRestApi(RestClient): def on_cancel_order(self, data, request): """""" - cancel_request = request.extra - local_orderid = cancel_request.orderid - order = self.order_manager.get_order_with_local_orderid(local_orderid) - if data["status"] == "error": error_code = data["err_code"] error_msg = data["err_msg"] self.gateway.write_log(f"撤单失败,错误代码:{error_code},信息:{error_msg}") - else: - order.status = Status.CANCELLED - self.order_manager.on_order(order) def on_cancel_order_failed(self, status_code: str, request: Request): """ @@ -540,19 +524,16 @@ class GateiosWebsocketApi(WebsocketClient): self.gateway.write_log("Websocket API连接成功") for symbol in self.symbols: - update_order = self.generate_req( - channel="futures.orders", - event="subscribe", - pay_load=[self.account_id, symbol] - ) - self.send_packet(update_order) - - update_position = self.generate_req( - channel="futures.position_closes", - event="subscribe", - pay_load=[self.account_id, symbol] - ) - self.send_packet(update_position) + for channel in [ + "futures.orders", + "futures.usertrades" + ]: + req = self.generate_req( + channel=channel, + event="subscribe", + pay_load=[self.account_id, symbol] + ) + self.send_packet(req) def subscribe(self, req: SubscribeRequest): """ @@ -585,7 +566,7 @@ class GateiosWebsocketApi(WebsocketClient): """""" self.gateway.write_log("Websocket API连接断开") - def on_packet(self, packet: dict): + def on_packet(self, packet: Dict): """""" timestamp = packet["time"] channel = packet["channel"] @@ -597,17 +578,14 @@ class GateiosWebsocketApi(WebsocketClient): self.gateway.write_log("Websocket API报错:%s" % error) return - print(packet) - - if channel == "futures.tickers": - if event == "update": - self.on_tick(result, timestamp) - elif channel == "futures.order_book": - if event == "all": - self.on_depth(result, timestamp) - elif channel == "futures.orders": - if event == "update": - self.on_order(result, timestamp) + if channel == "futures.tickers" and event == "update": + self.on_tick(result, timestamp) + elif channel == "futures.order_book" and event == "all": + self.on_depth(result, timestamp) + elif channel == "futures.orders" and event == "update": + self.on_order(result, timestamp) + elif channel == "futures.usertrades" and event == "update": + self.on_trade(result, timestamp) def on_error(self, exception_type: type, exception_value: Exception, tb): """""" @@ -617,10 +595,11 @@ class GateiosWebsocketApi(WebsocketClient): sys.stderr.write(self.exception_detail( exception_type, exception_value, tb)) - def generate_req(self, channel: str, event: str, pay_load: list): + def generate_req(self, channel: str, event: str, pay_load: List): """""" expires = int(time.time()) - signature = generate_websocket_sign(self.secret, channel, event, expires) + signature = generate_websocket_sign( + self.secret, channel, event, expires) req = { "time": expires, @@ -636,7 +615,7 @@ class GateiosWebsocketApi(WebsocketClient): return req - def on_tick(self, l: list, t: int): + def on_tick(self, l: List, t: int): """""" d = l[0] symbol = d["contract"] @@ -645,10 +624,11 @@ class GateiosWebsocketApi(WebsocketClient): return tick.last_price = float(d["last"]) + tick.volume = int(d["volume_24h"]) tick.datetime = datetime.fromtimestamp(t) self.gateway.on_tick(copy(tick)) - def on_depth(self, d: dict, t: int): + def on_depth(self, d: Dict, t: int): """""" symbol = d["contract"] tick = self.ticks.get(symbol, None) @@ -671,18 +651,21 @@ class GateiosWebsocketApi(WebsocketClient): self.gateway.on_tick(copy(tick)) - def on_order(self, l: list, t: int): + def on_order(self, l: List, t: int): """""" d = l[0] local_orderid = str(d["text"])[2:] - volume = d["size"] - if volume > 0: + if d["size"] > 0: direction = Direction.LONG else: direction = Direction.SHORT + volume = abs(d["size"]) + traded = abs(d["size"] - d["left"]) + status = get_order_status(d["status"], volume, traded) + order = OrderData( orderid=local_orderid, symbol=d["contract"], @@ -691,29 +674,34 @@ class GateiosWebsocketApi(WebsocketClient): volume=abs(volume), type=OrderType.LIMIT, direction=direction, - status=STATUS_GATEIO2VT[d["status"]], + status=status, time=datetime.fromtimestamp(t).strftime("%H:%M:%S"), gateway_name=self.gateway_name, ) - self.order_manager.on_order(copy(order)) + self.order_manager.on_order(order) - # Update trade - if order.status == Status.ALLTRADED: - self.trade_count += 1 + def on_trade(self, l: List, t: int): + """""" + d = l[0] - trade = TradeData( - symbol=order.symbol, - exchange=order.exchange, - orderid=order.orderid, - tradeid=str(self.trade_count).rjust(8, "0"), - direction=order.direction, - price=float(d["fill_price"]), - volume=order.volume, - time=order.time, - gateway_name=self.gateway_name, - ) - self.gateway.on_trade(trade) + sys_orderid = d["order_id"] + order = self.order_manager.get_order_with_sys_orderid(sys_orderid) + if not order: + return + + trade = TradeData( + symbol=order.symbol, + exchange=order.exchange, + orderid=order.orderid, + tradeid=d["id"], + direction=order.direction, + price=float(d["price"]), + volume=abs(d["size"]), + time=datetime.fromtimestamp(d["create_time"]).strftime("%H:%M:%S"), + gateway_name=self.gateway_name, + ) + self.gateway.on_trade(trade) def generate_sign(key, secret, method, path, get_params=None, get_data=None): @@ -768,3 +756,17 @@ def generate_websocket_sign(secret, channel, event, time): ).hexdigest() return signature + + +def get_order_status(status: str, volume: int, traded: int): + """""" + if status == "open": + if traded: + return Status.PARTTRADED + else: + return Status.NOTTRADED + else: + if traded == volume: + return Status.ALLTRADED + else: + return Status.CANCELLED