diff --git a/vnpy/gateway/gateios/gateios_gateway.py b/vnpy/gateway/gateios/gateios_gateway.py index c42a6825..a2a84d7a 100644 --- a/vnpy/gateway/gateios/gateios_gateway.py +++ b/vnpy/gateway/gateios/gateios_gateway.py @@ -8,9 +8,8 @@ import sys import time from copy import copy from datetime import datetime, timedelta -from threading import Lock from urllib.parse import urlencode -from typing import List +from typing import List, Dict from vnpy.api.rest import Request, RestClient from vnpy.api.websocket import WebsocketClient @@ -31,11 +30,6 @@ REST_HOST = "https://api.gateio.ws" TESTNET_WEBSOCKET_HOST = "wss://fx-ws-testnet.gateio.ws/v4/ws" WEBSOCKET_HOST = "wss://fx-ws.gateio.ws/v4/ws" -STATUS_GATEIO2VT = { - "open": Status.NOTTRADED, - "finished": Status.ALLTRADED, -} - INTERVAL_VT2GATEIO = { Interval.MINUTE: "1m", Interval.HOUR: "1h", @@ -72,7 +66,7 @@ class GateiosGateway(BaseGateway): self.ws_api = GateiosWebsocketApi(self) self.rest_api = GateiosRestApi(self) - def connect(self, setting: dict): + def connect(self, setting: Dict): """""" key = setting["API Key"] secret = setting["Secret Key"] @@ -99,7 +93,7 @@ class GateiosGateway(BaseGateway): def cancel_order(self, req: CancelRequest): """""" - self.order_manager.cancel_order(req) + self.rest_api.cancel_order(req) def query_account(self): """""" @@ -320,12 +314,15 @@ class GateiosRestApi(RestClient): on_failed=self.on_send_order_failed ) - self.gateway.on_order(order) + self.order_manager.on_order(order) return order.vt_orderid def cancel_order(self, req: CancelRequest): """""" sys_orderid = self.order_manager.get_sys_orderid(req.orderid) + if not sys_orderid: + self.write_log("撤单失败,找不到对应系统委托号{}".format(req.orderid)) + return self.add_request( method="DELETE", @@ -367,7 +364,7 @@ class GateiosRestApi(RestClient): def on_query_order(self, data, request): """""" for d in data: - local_orderid = self.new_local_orderid() + local_orderid = self.order_manager.new_local_orderid() sys_orderid = str(d["id"]) self.order_manager.update_orderid_map( @@ -375,12 +372,15 @@ class GateiosRestApi(RestClient): sys_orderid=sys_orderid ) - volume = d["size"] - if volume > 0: + if d["size"] > 0: direction = Direction.LONG else: direction = Direction.SHORT + volume = abs(d["size"]) + traded = abs(d["size"] - d["left"]) + status = get_order_status(d["status"], volume, traded) + dt = datetime.fromtimestamp(d["create_time"]) order = OrderData( @@ -391,7 +391,7 @@ class GateiosRestApi(RestClient): volume=abs(volume), type=OrderType.LIMIT, direction=direction, - status=STATUS_GATEIO2VT[d["status"]], + status=status, time=dt.strftime("%H:%M:%S"), gateway_name=self.gateway_name, ) @@ -434,16 +434,7 @@ class GateiosRestApi(RestClient): def on_send_order(self, data, request): """""" order = request.extra - order.status = STATUS_GATEIO2VT[data["status"]] - - if order.status == Status.ALLTRADED: - order.traded = order.volume - - dt = datetime.fromtimestamp(data["create_time"]) - order.time = dt.strftime("%H:%M:%S") - - sys_orderid = data["id"] - self.order_manager.on_order(order) + sys_orderid = str(data["id"]) self.order_manager.update_orderid_map(order.orderid, sys_orderid) def on_send_order_failed(self, status_code: str, request: Request): @@ -473,17 +464,10 @@ class GateiosRestApi(RestClient): def on_cancel_order(self, data, request): """""" - cancel_request = request.extra - local_orderid = cancel_request.orderid - order = self.order_manager.get_order_with_local_orderid(local_orderid) - if data["status"] == "error": error_code = data["err_code"] error_msg = data["err_msg"] self.gateway.write_log(f"撤单失败,错误代码:{error_code},信息:{error_msg}") - else: - order.status = Status.CANCELLED - self.order_manager.on_order(order) def on_cancel_order_failed(self, status_code: str, request: Request): """ @@ -540,19 +524,16 @@ class GateiosWebsocketApi(WebsocketClient): self.gateway.write_log("Websocket API连接成功") for symbol in self.symbols: - update_order = self.generate_req( - channel="futures.orders", - event="subscribe", - pay_load=[self.account_id, symbol] - ) - self.send_packet(update_order) - - update_position = self.generate_req( - channel="futures.position_closes", - event="subscribe", - pay_load=[self.account_id, symbol] - ) - self.send_packet(update_position) + for channel in [ + "futures.orders", + "futures.usertrades" + ]: + req = self.generate_req( + channel=channel, + event="subscribe", + pay_load=[self.account_id, symbol] + ) + self.send_packet(req) def subscribe(self, req: SubscribeRequest): """ @@ -585,7 +566,7 @@ class GateiosWebsocketApi(WebsocketClient): """""" self.gateway.write_log("Websocket API连接断开") - def on_packet(self, packet: dict): + def on_packet(self, packet: Dict): """""" timestamp = packet["time"] channel = packet["channel"] @@ -597,17 +578,14 @@ class GateiosWebsocketApi(WebsocketClient): self.gateway.write_log("Websocket API报错:%s" % error) return - print(packet) - - if channel == "futures.tickers": - if event == "update": - self.on_tick(result, timestamp) - elif channel == "futures.order_book": - if event == "all": - self.on_depth(result, timestamp) - elif channel == "futures.orders": - if event == "update": - self.on_order(result, timestamp) + if channel == "futures.tickers" and event == "update": + self.on_tick(result, timestamp) + elif channel == "futures.order_book" and event == "all": + self.on_depth(result, timestamp) + elif channel == "futures.orders" and event == "update": + self.on_order(result, timestamp) + elif channel == "futures.usertrades" and event == "update": + self.on_trade(result, timestamp) def on_error(self, exception_type: type, exception_value: Exception, tb): """""" @@ -617,10 +595,11 @@ class GateiosWebsocketApi(WebsocketClient): sys.stderr.write(self.exception_detail( exception_type, exception_value, tb)) - def generate_req(self, channel: str, event: str, pay_load: list): + def generate_req(self, channel: str, event: str, pay_load: List): """""" expires = int(time.time()) - signature = generate_websocket_sign(self.secret, channel, event, expires) + signature = generate_websocket_sign( + self.secret, channel, event, expires) req = { "time": expires, @@ -636,7 +615,7 @@ class GateiosWebsocketApi(WebsocketClient): return req - def on_tick(self, l: list, t: int): + def on_tick(self, l: List, t: int): """""" d = l[0] symbol = d["contract"] @@ -645,10 +624,11 @@ class GateiosWebsocketApi(WebsocketClient): return tick.last_price = float(d["last"]) + tick.volume = int(d["volume_24h"]) tick.datetime = datetime.fromtimestamp(t) self.gateway.on_tick(copy(tick)) - def on_depth(self, d: dict, t: int): + def on_depth(self, d: Dict, t: int): """""" symbol = d["contract"] tick = self.ticks.get(symbol, None) @@ -671,18 +651,21 @@ class GateiosWebsocketApi(WebsocketClient): self.gateway.on_tick(copy(tick)) - def on_order(self, l: list, t: int): + def on_order(self, l: List, t: int): """""" d = l[0] local_orderid = str(d["text"])[2:] - volume = d["size"] - if volume > 0: + if d["size"] > 0: direction = Direction.LONG else: direction = Direction.SHORT + volume = abs(d["size"]) + traded = abs(d["size"] - d["left"]) + status = get_order_status(d["status"], volume, traded) + order = OrderData( orderid=local_orderid, symbol=d["contract"], @@ -691,29 +674,34 @@ class GateiosWebsocketApi(WebsocketClient): volume=abs(volume), type=OrderType.LIMIT, direction=direction, - status=STATUS_GATEIO2VT[d["status"]], + status=status, time=datetime.fromtimestamp(t).strftime("%H:%M:%S"), gateway_name=self.gateway_name, ) - self.order_manager.on_order(copy(order)) + self.order_manager.on_order(order) - # Update trade - if order.status == Status.ALLTRADED: - self.trade_count += 1 + def on_trade(self, l: List, t: int): + """""" + d = l[0] - trade = TradeData( - symbol=order.symbol, - exchange=order.exchange, - orderid=order.orderid, - tradeid=str(self.trade_count).rjust(8, "0"), - direction=order.direction, - price=float(d["fill_price"]), - volume=order.volume, - time=order.time, - gateway_name=self.gateway_name, - ) - self.gateway.on_trade(trade) + sys_orderid = d["order_id"] + order = self.order_manager.get_order_with_sys_orderid(sys_orderid) + if not order: + return + + trade = TradeData( + symbol=order.symbol, + exchange=order.exchange, + orderid=order.orderid, + tradeid=d["id"], + direction=order.direction, + price=float(d["price"]), + volume=abs(d["size"]), + time=datetime.fromtimestamp(d["create_time"]).strftime("%H:%M:%S"), + gateway_name=self.gateway_name, + ) + self.gateway.on_trade(trade) def generate_sign(key, secret, method, path, get_params=None, get_data=None): @@ -768,3 +756,17 @@ def generate_websocket_sign(secret, channel, event, time): ).hexdigest() return signature + + +def get_order_status(status: str, volume: int, traded: int): + """""" + if status == "open": + if traded: + return Status.PARTTRADED + else: + return Status.NOTTRADED + else: + if traded == volume: + return Status.ALLTRADED + else: + return Status.CANCELLED