[Mod] complete test of gateio gateway

This commit is contained in:
vn.py 2019-10-18 20:10:35 +08:00
parent 4336ccf065
commit 8f7ddd985a

View File

@ -8,9 +8,8 @@ import sys
import time import time
from copy import copy from copy import copy
from datetime import datetime, timedelta from datetime import datetime, timedelta
from threading import Lock
from urllib.parse import urlencode from urllib.parse import urlencode
from typing import List from typing import List, Dict
from vnpy.api.rest import Request, RestClient from vnpy.api.rest import Request, RestClient
from vnpy.api.websocket import WebsocketClient from vnpy.api.websocket import WebsocketClient
@ -31,11 +30,6 @@ REST_HOST = "https://api.gateio.ws"
TESTNET_WEBSOCKET_HOST = "wss://fx-ws-testnet.gateio.ws/v4/ws" TESTNET_WEBSOCKET_HOST = "wss://fx-ws-testnet.gateio.ws/v4/ws"
WEBSOCKET_HOST = "wss://fx-ws.gateio.ws/v4/ws" WEBSOCKET_HOST = "wss://fx-ws.gateio.ws/v4/ws"
STATUS_GATEIO2VT = {
"open": Status.NOTTRADED,
"finished": Status.ALLTRADED,
}
INTERVAL_VT2GATEIO = { INTERVAL_VT2GATEIO = {
Interval.MINUTE: "1m", Interval.MINUTE: "1m",
Interval.HOUR: "1h", Interval.HOUR: "1h",
@ -72,7 +66,7 @@ class GateiosGateway(BaseGateway):
self.ws_api = GateiosWebsocketApi(self) self.ws_api = GateiosWebsocketApi(self)
self.rest_api = GateiosRestApi(self) self.rest_api = GateiosRestApi(self)
def connect(self, setting: dict): def connect(self, setting: Dict):
"""""" """"""
key = setting["API Key"] key = setting["API Key"]
secret = setting["Secret Key"] secret = setting["Secret Key"]
@ -99,7 +93,7 @@ class GateiosGateway(BaseGateway):
def cancel_order(self, req: CancelRequest): def cancel_order(self, req: CancelRequest):
"""""" """"""
self.order_manager.cancel_order(req) self.rest_api.cancel_order(req)
def query_account(self): def query_account(self):
"""""" """"""
@ -320,12 +314,15 @@ class GateiosRestApi(RestClient):
on_failed=self.on_send_order_failed on_failed=self.on_send_order_failed
) )
self.gateway.on_order(order) self.order_manager.on_order(order)
return order.vt_orderid return order.vt_orderid
def cancel_order(self, req: CancelRequest): def cancel_order(self, req: CancelRequest):
"""""" """"""
sys_orderid = self.order_manager.get_sys_orderid(req.orderid) sys_orderid = self.order_manager.get_sys_orderid(req.orderid)
if not sys_orderid:
self.write_log("撤单失败,找不到对应系统委托号{}".format(req.orderid))
return
self.add_request( self.add_request(
method="DELETE", method="DELETE",
@ -367,7 +364,7 @@ class GateiosRestApi(RestClient):
def on_query_order(self, data, request): def on_query_order(self, data, request):
"""""" """"""
for d in data: for d in data:
local_orderid = self.new_local_orderid() local_orderid = self.order_manager.new_local_orderid()
sys_orderid = str(d["id"]) sys_orderid = str(d["id"])
self.order_manager.update_orderid_map( self.order_manager.update_orderid_map(
@ -375,12 +372,15 @@ class GateiosRestApi(RestClient):
sys_orderid=sys_orderid sys_orderid=sys_orderid
) )
volume = d["size"] if d["size"] > 0:
if volume > 0:
direction = Direction.LONG direction = Direction.LONG
else: else:
direction = Direction.SHORT direction = Direction.SHORT
volume = abs(d["size"])
traded = abs(d["size"] - d["left"])
status = get_order_status(d["status"], volume, traded)
dt = datetime.fromtimestamp(d["create_time"]) dt = datetime.fromtimestamp(d["create_time"])
order = OrderData( order = OrderData(
@ -391,7 +391,7 @@ class GateiosRestApi(RestClient):
volume=abs(volume), volume=abs(volume),
type=OrderType.LIMIT, type=OrderType.LIMIT,
direction=direction, direction=direction,
status=STATUS_GATEIO2VT[d["status"]], status=status,
time=dt.strftime("%H:%M:%S"), time=dt.strftime("%H:%M:%S"),
gateway_name=self.gateway_name, gateway_name=self.gateway_name,
) )
@ -434,16 +434,7 @@ class GateiosRestApi(RestClient):
def on_send_order(self, data, request): def on_send_order(self, data, request):
"""""" """"""
order = request.extra order = request.extra
order.status = STATUS_GATEIO2VT[data["status"]] sys_orderid = str(data["id"])
if order.status == Status.ALLTRADED:
order.traded = order.volume
dt = datetime.fromtimestamp(data["create_time"])
order.time = dt.strftime("%H:%M:%S")
sys_orderid = data["id"]
self.order_manager.on_order(order)
self.order_manager.update_orderid_map(order.orderid, sys_orderid) self.order_manager.update_orderid_map(order.orderid, sys_orderid)
def on_send_order_failed(self, status_code: str, request: Request): def on_send_order_failed(self, status_code: str, request: Request):
@ -473,17 +464,10 @@ class GateiosRestApi(RestClient):
def on_cancel_order(self, data, request): def on_cancel_order(self, data, request):
"""""" """"""
cancel_request = request.extra
local_orderid = cancel_request.orderid
order = self.order_manager.get_order_with_local_orderid(local_orderid)
if data["status"] == "error": if data["status"] == "error":
error_code = data["err_code"] error_code = data["err_code"]
error_msg = data["err_msg"] error_msg = data["err_msg"]
self.gateway.write_log(f"撤单失败,错误代码:{error_code},信息:{error_msg}") self.gateway.write_log(f"撤单失败,错误代码:{error_code},信息:{error_msg}")
else:
order.status = Status.CANCELLED
self.order_manager.on_order(order)
def on_cancel_order_failed(self, status_code: str, request: Request): def on_cancel_order_failed(self, status_code: str, request: Request):
""" """
@ -540,19 +524,16 @@ class GateiosWebsocketApi(WebsocketClient):
self.gateway.write_log("Websocket API连接成功") self.gateway.write_log("Websocket API连接成功")
for symbol in self.symbols: for symbol in self.symbols:
update_order = self.generate_req( for channel in [
channel="futures.orders", "futures.orders",
"futures.usertrades"
]:
req = self.generate_req(
channel=channel,
event="subscribe", event="subscribe",
pay_load=[self.account_id, symbol] pay_load=[self.account_id, symbol]
) )
self.send_packet(update_order) self.send_packet(req)
update_position = self.generate_req(
channel="futures.position_closes",
event="subscribe",
pay_load=[self.account_id, symbol]
)
self.send_packet(update_position)
def subscribe(self, req: SubscribeRequest): def subscribe(self, req: SubscribeRequest):
""" """
@ -585,7 +566,7 @@ class GateiosWebsocketApi(WebsocketClient):
"""""" """"""
self.gateway.write_log("Websocket API连接断开") self.gateway.write_log("Websocket API连接断开")
def on_packet(self, packet: dict): def on_packet(self, packet: Dict):
"""""" """"""
timestamp = packet["time"] timestamp = packet["time"]
channel = packet["channel"] channel = packet["channel"]
@ -597,17 +578,14 @@ class GateiosWebsocketApi(WebsocketClient):
self.gateway.write_log("Websocket API报错%s" % error) self.gateway.write_log("Websocket API报错%s" % error)
return return
print(packet) if channel == "futures.tickers" and event == "update":
if channel == "futures.tickers":
if event == "update":
self.on_tick(result, timestamp) self.on_tick(result, timestamp)
elif channel == "futures.order_book": elif channel == "futures.order_book" and event == "all":
if event == "all":
self.on_depth(result, timestamp) self.on_depth(result, timestamp)
elif channel == "futures.orders": elif channel == "futures.orders" and event == "update":
if event == "update":
self.on_order(result, timestamp) self.on_order(result, timestamp)
elif channel == "futures.usertrades" and event == "update":
self.on_trade(result, timestamp)
def on_error(self, exception_type: type, exception_value: Exception, tb): def on_error(self, exception_type: type, exception_value: Exception, tb):
"""""" """"""
@ -617,10 +595,11 @@ class GateiosWebsocketApi(WebsocketClient):
sys.stderr.write(self.exception_detail( sys.stderr.write(self.exception_detail(
exception_type, exception_value, tb)) exception_type, exception_value, tb))
def generate_req(self, channel: str, event: str, pay_load: list): def generate_req(self, channel: str, event: str, pay_load: List):
"""""" """"""
expires = int(time.time()) expires = int(time.time())
signature = generate_websocket_sign(self.secret, channel, event, expires) signature = generate_websocket_sign(
self.secret, channel, event, expires)
req = { req = {
"time": expires, "time": expires,
@ -636,7 +615,7 @@ class GateiosWebsocketApi(WebsocketClient):
return req return req
def on_tick(self, l: list, t: int): def on_tick(self, l: List, t: int):
"""""" """"""
d = l[0] d = l[0]
symbol = d["contract"] symbol = d["contract"]
@ -645,10 +624,11 @@ class GateiosWebsocketApi(WebsocketClient):
return return
tick.last_price = float(d["last"]) tick.last_price = float(d["last"])
tick.volume = int(d["volume_24h"])
tick.datetime = datetime.fromtimestamp(t) tick.datetime = datetime.fromtimestamp(t)
self.gateway.on_tick(copy(tick)) self.gateway.on_tick(copy(tick))
def on_depth(self, d: dict, t: int): def on_depth(self, d: Dict, t: int):
"""""" """"""
symbol = d["contract"] symbol = d["contract"]
tick = self.ticks.get(symbol, None) tick = self.ticks.get(symbol, None)
@ -671,18 +651,21 @@ class GateiosWebsocketApi(WebsocketClient):
self.gateway.on_tick(copy(tick)) self.gateway.on_tick(copy(tick))
def on_order(self, l: list, t: int): def on_order(self, l: List, t: int):
"""""" """"""
d = l[0] d = l[0]
local_orderid = str(d["text"])[2:] local_orderid = str(d["text"])[2:]
volume = d["size"] if d["size"] > 0:
if volume > 0:
direction = Direction.LONG direction = Direction.LONG
else: else:
direction = Direction.SHORT direction = Direction.SHORT
volume = abs(d["size"])
traded = abs(d["size"] - d["left"])
status = get_order_status(d["status"], volume, traded)
order = OrderData( order = OrderData(
orderid=local_orderid, orderid=local_orderid,
symbol=d["contract"], symbol=d["contract"],
@ -691,26 +674,31 @@ class GateiosWebsocketApi(WebsocketClient):
volume=abs(volume), volume=abs(volume),
type=OrderType.LIMIT, type=OrderType.LIMIT,
direction=direction, direction=direction,
status=STATUS_GATEIO2VT[d["status"]], status=status,
time=datetime.fromtimestamp(t).strftime("%H:%M:%S"), time=datetime.fromtimestamp(t).strftime("%H:%M:%S"),
gateway_name=self.gateway_name, gateway_name=self.gateway_name,
) )
self.order_manager.on_order(copy(order)) self.order_manager.on_order(order)
# Update trade def on_trade(self, l: List, t: int):
if order.status == Status.ALLTRADED: """"""
self.trade_count += 1 d = l[0]
sys_orderid = d["order_id"]
order = self.order_manager.get_order_with_sys_orderid(sys_orderid)
if not order:
return
trade = TradeData( trade = TradeData(
symbol=order.symbol, symbol=order.symbol,
exchange=order.exchange, exchange=order.exchange,
orderid=order.orderid, orderid=order.orderid,
tradeid=str(self.trade_count).rjust(8, "0"), tradeid=d["id"],
direction=order.direction, direction=order.direction,
price=float(d["fill_price"]), price=float(d["price"]),
volume=order.volume, volume=abs(d["size"]),
time=order.time, time=datetime.fromtimestamp(d["create_time"]).strftime("%H:%M:%S"),
gateway_name=self.gateway_name, gateway_name=self.gateway_name,
) )
self.gateway.on_trade(trade) self.gateway.on_trade(trade)
@ -768,3 +756,17 @@ def generate_websocket_sign(secret, channel, event, time):
).hexdigest() ).hexdigest()
return signature return signature
def get_order_status(status: str, volume: int, traded: int):
""""""
if status == "open":
if traded:
return Status.PARTTRADED
else:
return Status.NOTTRADED
else:
if traded == volume:
return Status.ALLTRADED
else:
return Status.CANCELLED