[Add]websocket api for OnetokenGateway

This commit is contained in:
vn.py 2019-04-17 13:16:34 +08:00
parent 94c4e0beda
commit 194ccdb09c
2 changed files with 404 additions and 69 deletions

View File

@ -13,6 +13,7 @@ from vnpy.gateway.oes import OesGateway
from vnpy.gateway.okex import OkexGateway
from vnpy.gateway.huobi import HuobiGateway
from vnpy.gateway.bitfinex import BitfinexGateway
from vnpy.gateway.onetoken import OnetokenGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.csv_loader import CsvLoaderApp
@ -36,6 +37,7 @@ def main():
main_engine.add_gateway(OkexGateway)
main_engine.add_gateway(HuobiGateway)
main_engine.add_gateway(BitfinexGateway)
main_engine.add_gateway(OnetokenGateway)
main_engine.add_app(CtaStrategyApp)
main_engine.add_app(CtaBacktesterApp)

View File

@ -3,16 +3,19 @@
"""
import hashlib
import sys
import hmac
import json
import time
from datetime import datetime
from threading import Lock
from urllib.parse import urlparse
from copy import copy
from requests import ConnectionError
from vnpy.api.rest import Request, RestClient
from vnpy.api.websocket import WebsocketClient
from vnpy.trader.constant import (
Direction,
Exchange,
@ -22,20 +25,34 @@ from vnpy.trader.constant import (
)
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (
TickData,
PositionData,
AccountData,
OrderRequest,
CancelRequest,
SubscribeRequest,
ContractData
ContractData,
OrderData,
TradeData
)
from vnpy.trader.event import EVENT_TIMER
REST_HOST = 'https://1token.trade/api'
REST_HOST = "https://1token.trade/api"
DATA_WEBSOCKET_HOST = "wss://1token.trade/api/v1/ws/tick"
TRADE_WEBSOCKET_HOST = "wss://1token.trade/api/v1/ws/trade"
DIRECTION_VT2ONETOKEN = {Direction.LONG: "b", Direction.SHORT: "s"}
DIRECTION_ONETOKEN2VT = {v: k for k, v in DIRECTION_VT2ONETOKEN.items()}
EXCHANGE_VT2ONETOKEN = {
Exchange.OKEX: "okex",
Exchange.HUOBI: "huobi"
}
EXCHANGE_ONETOKEN2VT = {v: k for k, v in EXCHANGE_VT2ONETOKEN.items()}
class OnetokenGateway(BaseGateway):
"""
VN Trader Gateway for 1Token connection
@ -44,7 +61,7 @@ class OnetokenGateway(BaseGateway):
default_setting = {
"OT Key": "",
"OT Secret": "",
"交易所": "",
"交易所": ["BINANCE", "BITMEX", "OKEX", "OKEF"],
"账户": "",
"会话数": 3,
"代理地址": "127.0.0.1",
@ -53,9 +70,13 @@ class OnetokenGateway(BaseGateway):
def __init__(self, event_engine):
"""Constructor"""
super(OnetokenGateway, self).__init__(event_engine, "1Token")
super(OnetokenGateway, self).__init__(event_engine, "1TOKEN")
self.rest_api = OnetokenRestApi(self)
self.data_ws_api = OnetokenDataWebsocketApi(self)
self.trade_ws_api = OnetokenTradeWebsocketApi(self)
self.count = 0
def connect(self, setting: dict):
""""""
@ -66,13 +87,18 @@ class OnetokenGateway(BaseGateway):
account = setting["账户"]
proxy_host = setting["代理地址"]
proxy_port = setting["代理端口"]
self.rest_api.connect(key, secret, session_number,
exchange, account, proxy_host, proxy_port)
self.data_ws_api.connect(proxy_host, proxy_port)
self.trade_ws_api.connect(
key, secret, exchange, account, proxy_host, proxy_port)
self.init_ping()
def subscribe(self, req: SubscribeRequest):
""""""
pass
# self.ws_api.subscribe(req)
self.data_ws_api.subscribe(req)
def send_order(self, req: OrderRequest):
""""""
@ -93,6 +119,22 @@ class OnetokenGateway(BaseGateway):
def close(self):
""""""
self.rest_api.stop()
self.data_ws_api.stop()
self.trade_ws_api.stop()
def process_timer_event(self, event):
""""""
self.count += 1
if self.count < 20:
return
self.count = 0
self.data_ws_api.ping()
self.trade_ws_api.ping()
def init_ping(self):
""""""
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
class OnetokenRestApi(RestClient):
@ -123,23 +165,24 @@ class OnetokenRestApi(RestClient):
"""
method = request.method
endpoint = '/' + request.path.split('/', 3)[3]
endpoint = "/" + request.path.split("/", 3)[3]
# v1/trade/okex/mock-example/info -> okex/mock-example/info
parsed_url = urlparse(endpoint)
path = parsed_url.path
nonce = str(int(time.time() * 1e6))
data = request.data
json_str = data if data else ''
json_str = data if data else ""
message = method + path + nonce + json_str
signature = hmac.new(bytes(self.secret, 'utf8'), bytes(message, 'utf8'), digestmod=hashlib.sha256).hexdigest()
signature = hmac.new(bytes(self.secret, "utf8"), bytes(
message, "utf8"), digestmod=hashlib.sha256).hexdigest()
headers = {'Api-Nonce': nonce,
'Api-Key': self.key,
'Api-Signature': signature,
'Content-Type': 'application/json'}
headers = {"Api-Nonce": nonce,
"Api-Key": self.key,
"Api-Signature": signature,
"Content-Type": "application/json"}
request.headers = headers
return request
@ -174,58 +217,13 @@ class OnetokenRestApi(RestClient):
self.query_time()
self.query_contract()
self.query_account()
# self.query_account()
def _new_order_id(self):
with self.order_count_lock:
self.order_count += 1
return self.order_count
def query_account(self): # get balance and positions at the same time
""""""
self.add_request(
"GET",
"/v1/trade/{}/{}/info".format(self.exchange, self.account),
callback=self.on_query_account
)
def on_query_account(self, data, request):
"""This is for WS Example"""
for account_data in data["position"]:
_type = account_data['type']
if 'spot' in _type: # 统计balance
account = AccountData(
accountid=account_data["contract"],
balance=float(account_data["total_amount"]),
frozen=float(account_data["frozen"]),
gateway_name=self.gateway_name
)
self.gateway.on_account(account)
elif _type == 'future': # 期货合约
long_position = PositionData(
symbol=account_data["contract"],
exchange=Exchange.OKEX, # todo add Exchange
direction=Direction.LONG,
volume=account_data['total_amount_long'],
frozen=account_data['total_amount_long'] - account_data['available_long'],
gateway_name=self.gateway_name,
# yd_volume=?
)
short_position = PositionData(
symbol=account_data["contract"],
exchange=Exchange.OKEX, # todo add Exchange
direction=Direction.SHORT,
volume=account_data['total_amount_short'],
frozen=account_data['total_amount_short'] - account_data['available_short'],
gateway_name=self.gateway_name,
# yd_volume=?
)
self.gateway.on_position(long_position)
self.gateway.on_position(short_position)
self.gateway.write_log("账户资金查询成功")
self.gateway.write_log("账户持仓查询成功")
def query_time(self):
""""""
self.add_request(
@ -238,7 +236,7 @@ class OnetokenRestApi(RestClient):
""""""
server_timestamp = data["server_time"]
dt = datetime.utcfromtimestamp(server_timestamp)
server_time = dt.isoformat() + 'Z'
server_time = dt.isoformat() + "Z"
local_time = datetime.utcnow().isoformat()
msg = f"服务器时间:{server_time},本机时间:{local_time}"
self.gateway.write_log(msg)
@ -260,30 +258,31 @@ class OnetokenRestApi(RestClient):
exchange=Exchange.OKEX, # todo
name=symbol,
product=Product.SPOT, # todo
size=float(instrument_data['min_amount']),
pricetick=float(instrument_data['unit_amount']),
size=float(instrument_data["min_amount"]),
pricetick=float(instrument_data["unit_amount"]),
gateway_name=self.gateway_name
)
self.gateway.on_contract(contract)
self.gateway.write_log("合约信息查询成功")
# Start websocket api after instruments data collected
# self.gateway.ws_api.start()
self.gateway.data_ws_api.start()
self.gateway.trade_ws_api.start()
def send_order(self, req: OrderRequest):
""""""
orderid = str(self.connect_time + self._new_order_id())
data = {
'contract': self.exchange + '/' + req.symbol,
'price': float(req.price),
"contract": self.exchange + "/" + req.symbol,
"price": float(req.price),
"bs": DIRECTION_VT2ONETOKEN[req.direction],
'amount': float(req.volume),
'client_oid': orderid
"amount": float(req.volume),
"client_oid": orderid
}
if req.offset == Offset.CLOSE:
data['options'] = {'close': True}
data["options"] = {"close": True}
data = json.dumps(data)
order = req.create_order_data(orderid, self.gateway_name)
@ -304,7 +303,7 @@ class OnetokenRestApi(RestClient):
def cancel_order(self, req: CancelRequest):
""""""
params = {
'client_oid': req.orderid
"client_oid": req.orderid
}
self.add_request(
@ -358,3 +357,337 @@ class OnetokenRestApi(RestClient):
# Record exception if not ConnectionError
if not issubclass(exception_type, ConnectionError):
self.on_error(exception_type, exception_value, tb, request)
class OnetokenDataWebsocketApi(WebsocketClient):
""""""
def __init__(self, gateway):
""""""
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.ticks = {}
self.callbacks = {
"auth": self.on_login,
"single-tick-verbose": self.on_tick
}
def connect(
self,
proxy_host: str,
proxy_port: int
):
""""""
self.init(DATA_WEBSOCKET_HOST, proxy_host, proxy_port)
def subscribe(self, req: SubscribeRequest):
"""
Subscribe to tick data upate.
"""
tick = TickData(
symbol=req.symbol,
exchange=req.exchange,
name=req.symbol,
datetime=datetime.now(),
gateway_name=self.gateway_name,
)
contract_symbol = f"{req.exchange.value.lower()}/{req.symbol.lower()}"
self.ticks[contract_symbol] = tick
req = {
"uri": "subscribe-single-tick-verbose",
"contract": contract_symbol
}
self.send_packet(req)
def on_connected(self):
""""""
self.gateway.write_log("行情Websocket API连接成功")
self.login()
def on_disconnected(self):
""""""
self.gateway.write_log("行情Websocket API连接断开")
def on_packet(self, packet: dict):
""""""
channel = packet.get("uri", "")
if not channel:
return
data = packet.get("data", None)
callback = self.callbacks.get(channel, None)
if callback:
callback(data)
def on_error(self, exception_type: type, exception_value: Exception, tb):
""""""
msg = f"触发异常,状态码:{exception_type},信息:{exception_value}"
self.gateway.write_log(msg)
sys.stderr.write(self.exception_detail(
exception_type, exception_value, tb))
def login(self):
"""
Need to login befores subscribe to websocket topic.
"""
req = {"uri": "auth"}
self.send_packet(req)
self.callbacks["auth"] = self.on_login
def on_login(self, data: dict):
""""""
self.gateway.write_log("行情Websocket API登录成功")
def on_tick(self, data: dict):
""""""
contract_symbol = data["contract"]
tick = self.ticks.get(contract_symbol, None)
if not tick:
return
tick.last_price = data["last"]
tick.datetime = datetime.strptime(
data["time"][:-6], "%Y-%m-%dT%H:%M:%S.%f")
bids = data["bids"]
asks = data["asks"]
for n, buf in enumerate(bids):
tick.__setattr__("bid_price_%s" % (n + 1), buf["price"])
tick.__setattr__("bid_volume_%s" % (n + 1), buf["volume"])
for n, buf in enumerate(asks):
tick.__setattr__("ask_price_%s" % (n + 1), buf["price"])
tick.__setattr__("ask_volume_%s" % (n + 1), buf["volume"])
self.gateway.on_tick(copy(tick))
def ping(self):
""""""
self.send_packet({"uri": "ping"})
class OnetokenTradeWebsocketApi(WebsocketClient):
""""""
def __init__(self, gateway):
""""""
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.key = ""
self.secret = ""
self.exchange = ""
self.account = ""
self.trade_count = 0
self.callbacks = {
"sub-info": self.on_subscribe_info,
"sub-order": self.on_subscribe_order,
"info": self.on_info,
"order": self.on_order
}
def connect(
self,
key: str,
secret: str,
exchange: str,
account: str,
proxy_host: str,
proxy_port: int
):
""""""
self.key = key
self.secret = secret
self.exchange = exchange
self.account = account
# Create header for ws connection
nonce = str(int(time.time() * 1e6))
path = f"/ws/{self.account}"
message = "GET" + path + nonce
signature = hmac.new(bytes(self.secret, "utf8"), bytes(
message, "utf8"), digestmod=hashlib.sha256).hexdigest()
header = {
"Api-Nonce": nonce,
"Api-Key": self.key,
"Api-Signature": signature
}
host = f"{TRADE_WEBSOCKET_HOST}/{self.exchange}/{self.account}"
self.init(host, proxy_host, proxy_port, header=header)
def subscribe_info(self):
"""
Subscribe to account update.
"""
self.send_packet({"uri": "sub-info"})
def subscribe_order(self):
"""
Subscribe to order update.
"""
self.send_packet({"uri": "sub-order"})
def on_connected(self):
""""""
self.gateway.write_log("交易Websocket API连接成功")
self.subscribe_info()
self.subscribe_order()
def on_disconnected(self):
""""""
self.gateway.write_log("交易Websocket API连接断开")
def on_packet(self, packet: dict):
""""""
if "uri" in packet:
channel = packet["uri"]
if "data" in packet:
data = packet["data"]
elif "code" in packet:
data = packet["code"]
else:
data = None
elif "action" in packet:
channel = packet["action"]
data = packet.get("data", None)
else:
print(packet)
return
callback = self.callbacks.get(channel, None)
if callback:
callback(data)
def on_error(self, exception_type: type, exception_value: Exception, tb):
""""""
msg = f"触发异常,状态码:{exception_type},信息:{exception_value}"
self.gateway.write_log(msg)
sys.stderr.write(self.exception_detail(
exception_type, exception_value, tb))
def on_subscribe_info(self, data: str):
""""""
if data == "success":
self.gateway.write_log("账户资金推送订阅成功")
def on_subscribe_order(self, data: str):
""""""
if data == "success":
self.gateway.write_log("委托更新推送订阅成功")
def on_info(self, data: dict):
""""""
for account_data in data["position"]:
_type = account_data["type"]
# Spot
if "spot" in _type:
account = AccountData(
accountid=account_data["contract"],
balance=float(account_data["total_amount"]),
frozen=float(account_data["frozen"]),
gateway_name=self.gateway_name
)
self.gateway.on_account(account)
# Futures
elif _type == "future":
long_position = PositionData(
symbol=account_data["contract"],
exchange=Exchange.OKEX, # todo add Exchange
direction=Direction.LONG,
volume=account_data["total_amount_long"],
frozen=account_data["total_amount_long"] - \
account_data["available_long"],
gateway_name=self.gateway_name,
)
short_position = PositionData(
symbol=account_data["contract"],
exchange=Exchange.OKEX, # todo add Exchange
direction=Direction.SHORT,
volume=account_data["total_amount_short"],
frozen=account_data["total_amount_short"] - \
account_data["available_short"],
gateway_name=self.gateway_name,
)
self.gateway.on_position(long_position)
self.gateway.on_position(short_position)
def on_order(self, data: dict):
""""""
print("--------------------------")
for order_data in data:
print(order_data)
contract_symbol = order_data["contract"]
exchange_str, symbol = contract_symbol.split("/")
timestamp = order_data["entrust_time"][11:19]
orderid = order_data["options"]["client_oid"]
order = OrderData(
symbol=symbol,
exchange=EXCHANGE_ONETOKEN2VT[exchange_str],
orderid=orderid,
direction=DIRECTION_ONETOKEN2VT[order_data["bs"]],
price=order_data["entrust_price"],
volume=order_data["entrust_amount"],
traded=order_data["dealt_amount"],
time=timestamp,
gateway_name=self.gateway_name
)
if order_data["canceled_time"]:
order.status = Status.CANCELLED
else:
if order.traded == order.volume:
order.status = Status.ALLTRADED
elif not order.traded:
order.status = Status.NOTTRADED
else:
order.status = Status.PARTTRADED
self.gateway.on_order(order)
# Push trade data
if not order_data["last_dealt_amount"]:
return
trade_timestamp = order_data["last_update"][11:19]
self.trade_count += 1
trade = TradeData(
symbol=order.symbol,
exchange=order.exchange,
orderid=order.orderid,
tradeid=str(self.trade_count),
direction=order.direction,
price=order_data["average_dealt_price"],
volume=order_data["last_dealt_amount"],
gateway_name=self.gateway_name,
time=trade_timestamp
)
self.gateway.on_trade(trade)
def ping(self):
""""""
self.send_packet({"uri": "ping"})