[Add]huobi derivatives gateway

This commit is contained in:
vn.py 2019-05-02 19:28:42 +08:00
parent 4ec6ca2af7
commit 2da861d9e7
4 changed files with 825 additions and 10 deletions

View File

@ -17,6 +17,7 @@ from vnpy.gateway.bitfinex import BitfinexGateway
from vnpy.gateway.onetoken import OnetokenGateway
from vnpy.gateway.okexf import OkexfGateway
from vnpy.gateway.xtp import XtpGateway
from vnpy.gateway.hbdm import HbdmGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.csv_loader import CsvLoaderApp
@ -44,6 +45,7 @@ def main():
main_engine.add_gateway(BitfinexGateway)
main_engine.add_gateway(OnetokenGateway)
main_engine.add_gateway(OkexfGateway)
main_engine.add_gateway(HbdmGateway)
main_engine.add_app(CtaStrategyApp)
main_engine.add_app(CtaBacktesterApp)

View File

@ -0,0 +1 @@
from .hbdm_gateway import HbdmGateway

View File

@ -0,0 +1,819 @@
"""
火币合约接口
"""
import re
import urllib
import base64
import json
import zlib
import hashlib
import hmac
from copy import copy
from datetime import datetime
from threading import Lock
from vnpy.event import Event
from vnpy.api.rest import RestClient, Request
from vnpy.api.websocket import WebsocketClient
from vnpy.trader.constant import (
Direction,
Offset,
Exchange,
Product,
Status,
OrderType
)
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import (
TickData,
OrderData,
TradeData,
AccountData,
PositionData,
ContractData,
OrderRequest,
CancelRequest,
SubscribeRequest
)
from vnpy.trader.event import EVENT_TIMER
REST_HOST = "https://api.hbdm.com"
WEBSOCKET_DATA_HOST = "wss://www.hbdm.com/ws" # Market Data
WEBSOCKET_TRADE_HOST = "wss://api.hbdm.com/notification" # Account and Order
STATUS_HBDM2VT = {
3: Status.NOTTRADED,
4: Status.PARTTRADED,
5: Status.CANCELLED,
6: Status.ALLTRADED,
7: Status.CANCELLED,
}
ORDERTYPE_VT2HBDM = {
OrderType.MARKET: "opponent",
OrderType.LIMIT: "limit",
}
ORDERTYPE_HBDM2VT = {v: k for k, v in ORDERTYPE_VT2HBDM.items()}
DIRECTION_VT2HBDM = {
Direction.LONG: "buy",
Direction.SHORT: "sell",
}
DIRECTION_HBDM2VT = {v: k for k, v in DIRECTION_VT2HBDM.items()}
OFFSET_VT2HBDM = {
Offset.OPEN: "open",
Offset.CLOSE: "close",
}
OFFSET_HBDM2VT = {v: k for k, v in OFFSET_VT2HBDM.items()}
symbol_type_map = {}
class HbdmGateway(BaseGateway):
"""
VN Trader Gateway for Hbdm connection.
"""
default_setting = {
"API Key": "",
"Secret Key": "",
"会话数": 3,
"代理地址": "",
"代理端口": "",
}
def __init__(self, event_engine):
"""Constructor"""
super().__init__(event_engine, "HBDM")
self.rest_api = HbdmRestApi(self)
self.trade_ws_api = HbdmTradeWebsocketApi(self)
self.market_ws_api = HbdmDataWebsocketApi(self)
def connect(self, setting: dict):
""""""
key = setting["API Key"]
secret = setting["Secret Key"]
session_number = setting["会话数"]
proxy_host = setting["代理地址"]
proxy_port = setting["代理端口"]
if proxy_port.isdigit():
proxy_port = int(proxy_port)
else:
proxy_port = 0
self.rest_api.connect(key, secret, session_number,
proxy_host, proxy_port)
self.trade_ws_api.connect(key, secret, proxy_host, proxy_port)
self.market_ws_api.connect(key, secret, proxy_host, proxy_port)
self.init_query()
def subscribe(self, req: SubscribeRequest):
""""""
self.market_ws_api.subscribe(req)
def send_order(self, req: OrderRequest):
""""""
return self.rest_api.send_order(req)
def cancel_order(self, req: CancelRequest):
""""""
self.rest_api.cancel_order(req)
def query_account(self):
""""""
self.rest_api.query_account()
def query_position(self):
""""""
self.rest_api.query_position()
def close(self):
""""""
self.rest_api.stop()
self.trade_ws_api.stop()
self.market_ws_api.stop()
def process_timer_event(self, event: Event):
""""""
self.count += 1
if self.count < 3:
return
self.query_account()
self.query_position()
def init_query(self):
""""""
self.count = 0
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
class HbdmRestApi(RestClient):
"""
HBDM REST API
"""
def __init__(self, gateway: BaseGateway):
""""""
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.host = ""
self.key = ""
self.secret = ""
self.account_id = ""
self.order_count = 10000
self.order_count_lock = Lock()
self.connect_time = 0
self.positions = {}
self.currencies = set()
def sign(self, request):
"""
Generate HBDM signature.
"""
request.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36"
}
params_with_signature = create_signature(
self.key,
request.method,
self.host,
request.path,
self.secret,
request.params
)
request.params = params_with_signature
if request.method == "POST":
request.headers["Content-Type"] = "application/json"
if request.data:
request.data = json.dumps(request.data)
return request
def connect(
self,
key: str,
secret: str,
session_number: int,
proxy_host: str,
proxy_port: int
):
"""
Initialize connection to REST server.
"""
self.key = key
self.secret = secret
self.host, _ = _split_url(REST_HOST)
self.connect_time = int(datetime.now().strftime("%y%m%d%H%M%S"))
self.init(REST_HOST, proxy_host, proxy_port)
self.start(session_number)
self.gateway.write_log("REST API启动成功")
self.query_contract()
def query_account(self):
""""""
self.add_request(
method="POST",
path="/api/v1/contract_account_info",
callback=self.on_query_account
)
def query_position(self):
""""""
self.add_request(
method="POST",
path="/api/v1/contract_position_info",
callback=self.on_query_position
)
def query_order(self):
""""""
for currency in self.currencies:
data = {"symbol": currency}
self.add_request(
method="POST",
path="/api/v1/contract_openorders",
callback=self.on_query_order,
data=data,
extra=currency
)
def query_contract(self):
""""""
self.add_request(
method="GET",
path="/api/v1/contract_contract_info",
callback=self.on_query_contract
)
def new_local_orderid(self):
""""""
with self.order_count_lock:
self.order_count += 1
local_orderid = f"{self.connect_time}{self.order_count}"
return local_orderid
def send_order(self, req: OrderRequest):
""""""
local_orderid = self.new_local_orderid()
order = req.create_order_data(
local_orderid,
self.gateway_name
)
order.time = datetime.now().strftime("%H:%M:%S")
data = {
"contract_code": req.symbol,
"client_order_id": int(local_orderid),
"price": req.price,
"volume": int(req.volume),
"direction": DIRECTION_VT2HBDM[req.direction],
"offset": OFFSET_VT2HBDM[req.offset],
"order_price_type": ORDERTYPE_VT2HBDM.get(req.type, ""),
"lever_rate": 20
}
self.add_request(
method="POST",
path="/api/v1/contract_order",
callback=self.on_send_order,
data=data,
extra=order,
on_error=self.on_send_order_error,
on_failed=self.on_send_order_failed
)
self.gateway.on_order(order)
return order.vt_orderid
def cancel_order(self, req: CancelRequest):
""""""
buf = [i for i in req.symbol if not i.isdigit()]
data = {
"symbol": "".join(buf),
}
if req.orderid > 1000000:
data["client_order_id"] = int(req.orderid)
else:
data["order_id"] = int(req.orderid)
self.add_request(
method="POST",
path="/api/v1/contract_cancel",
callback=self.on_cancel_order,
on_failed=self.on_cancel_order_failed,
data=data,
extra=req
)
def on_query_account(self, data, request):
""""""
if self.check_error(data, "查询账户"):
return
for d in data["data"]:
account = AccountData(
accountid=d["symbol"],
balance=d["margin_balance"],
frozen=d["margin_frozen"],
gateway_name=self.gateway_name,
)
self.gateway.on_account(account)
def on_query_position(self, data, request):
""""""
if self.check_error(data, "查询持仓"):
return
# Clear all buf data
for position in self.positions.values():
position.volume = 0
position.frozen = 0
position.price = 0
position.pnl = 0
for d in data["data"]:
key = f"{d['contract_code']}_{d['direction']}"
position = self.positions.get(key, None)
if not position:
position = PositionData(
symbol=d["contract_code"],
exchange=Exchange.HUOBI,
direction=DIRECTION_HBDM2VT[d["direction"]],
gateway_name=self.gateway_name
)
self.positions[key] = position
position.volume = d["volume"]
position.frozen = d["frozen"]
position.price = d["cost_hold"]
position.pnl = d["profit"]
for position in self.positions.values():
self.gateway.on_position(position)
def on_query_order(self, data, request):
""""""
if self.check_error(data, "查询委托"):
return
for d in data["data"]["orders"]:
dt = datetime.fromtimestamp(d["created_at"] / 1000)
time = dt.strftime("%H:%M:%S")
if d["client_order_id"]:
orderid = d["client_order_id"]
else:
orderid = d["order_id"]
order = OrderData(
orderid=orderid,
symbol=d["contract_code"],
exchange=Exchange.HUOBI,
price=d["price"],
volume=d["volume"],
type=ORDERTYPE_HBDM2VT[d["order_price_type"]],
direction=DIRECTION_HBDM2VT[d["direction"]],
offset=OFFSET_HBDM2VT[d["offset"]],
traded=d["trade_volume"],
status=STATUS_HBDM2VT[d["status"]],
time=time,
gateway_name=self.gateway_name,
)
self.gateway.on_order(order)
self.gateway.write_log(f"{request.extra}委托信息查询成功")
def on_query_contract(self, data, request): # type: (dict, Request)->None
""""""
if self.check_error(data, "查询合约"):
return
for d in data["data"]:
self.currencies.add(d["symbol"])
contract = ContractData(
symbol=d["contract_code"],
exchange=Exchange.HUOBI,
name=d["contract_code"],
pricetick=d["price_tick"],
size=int(d["contract_size"]),
min_volume=1,
product=Product.FUTURES,
gateway_name=self.gateway_name,
)
self.gateway.on_contract(contract)
symbol_type_map[contract.symbol] = d['contract_type']
self.gateway.write_log("合约信息查询成功")
self.query_order()
def on_send_order(self, data, request):
""""""
order = request.extra
if self.check_error(data, "委托"):
order.status = Status.REJECTED
self.gateway.on_order(order)
def on_send_order_failed(self, status_code: str, request: Request):
"""
Callback when sending order failed on server.
"""
order = request.extra
order.status = Status.REJECTED
self.gateway.on_order(order)
msg = f"委托失败,状态码:{status_code},信息:{request.response.text}"
self.gateway.write_log(msg)
def on_send_order_error(
self, exception_type: type, exception_value: Exception, tb, request: Request
):
"""
Callback when sending order caused exception.
"""
order = request.extra
order.status = Status.REJECTED
self.gateway.on_order(order)
# Record exception if not ConnectionError
if not issubclass(exception_type, ConnectionError):
self.on_error(exception_type, exception_value, tb, request)
def on_cancel_order(self, data, request):
""""""
self.check_error(data, "撤单")
def on_cancel_order_failed(self, status_code: str, request: Request):
"""
Callback when canceling order failed on server.
"""
msg = f"撤单失败,状态码:{status_code},信息:{request.response.text}"
self.gateway.write_log(msg)
def check_error(self, data: dict, func: str = ""):
""""""
if data["status"] != "error":
return False
error_code = data["err_code"]
error_msg = data["err_msg"]
self.gateway.write_log(f"{func}请求出错,代码:{error_code},信息:{error_msg}")
return True
class HbdmWebsocketApiBase(WebsocketClient):
""""""
def __init__(self, gateway):
""""""
super(HbdmWebsocketApiBase, self).__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.key = ""
self.secret = ""
self.sign_host = ""
self.path = ""
self.req_id = 0
def connect(
self,
key: str,
secret: str,
url: str,
proxy_host: str,
proxy_port: int
):
""""""
self.key = key
self.secret = secret
host, path = _split_url(url)
self.sign_host = host
self.path = path
self.init(url, proxy_host, proxy_port)
self.start()
def login(self):
""""""
self.req_id += 1
params = {
"op": "auth",
"type": "api",
"cid": str(self.req_id),
}
params.update(create_signature(self.key, "GET", self.sign_host, self.path, self.secret))
return self.send_packet(params)
def on_login(self, packet):
""""""
pass
@staticmethod
def unpack_data(data):
""""""
return json.loads(zlib.decompress(data, 31))
def on_packet(self, packet):
""""""
if "ping" in packet:
req = {"pong": packet["ping"]}
self.send_packet(req)
elif "op" in packet and packet["op"] == "ping":
req = {
"op": "pong",
"ts": packet["ts"]
}
self.send_packet(req)
elif "err-msg" in packet:
return self.on_error_msg(packet)
elif "op" in packet and packet["op"] == "auth":
return self.on_login()
else:
self.on_data(packet)
def on_data(self, packet):
""""""
print("data : {}".format(packet))
def on_error_msg(self, packet):
""""""
msg = packet["err-msg"]
if msg == "invalid pong":
return
self.gateway.write_log(packet["err-msg"])
class HbdmTradeWebsocketApi(HbdmWebsocketApiBase):
""""""
def __init__(self, gateway):
""""""
super().__init__(gateway)
def connect(self, key, secret, proxy_host, proxy_port):
""""""
super().connect(key, secret, WEBSOCKET_TRADE_HOST, proxy_host, proxy_port)
def subscribe(self):
""""""
self.req_id += 1
req = {
"op": "sub",
"cid": str(self.req_id),
"topic": f"orders.*"
}
self.send_packet(req)
def on_connected(self):
""""""
self.gateway.write_log("交易Websocket API连接成功")
self.login()
def on_login(self):
""""""
self.gateway.write_log("交易Websocket API登录成功")
self.subscribe()
def on_data(self, packet): # type: (dict)->None
""""""
op = packet.get("op", None)
if op != "notify":
return
topic = packet["topic"]
if "orders" in topic:
self.on_order(packet)
def on_order(self, data: dict):
""""""
dt = datetime.fromtimestamp(data["created_at"] / 1000)
time = dt.strftime("%H:%M:%S")
if data["client_order_id"]:
orderid = data["client_order_id"]
else:
orderid = data["order_id"]
order = OrderData(
symbol=data["contract_code"],
exchange=Exchange.HUOBI,
orderid=orderid,
type=ORDERTYPE_HBDM2VT[data["order_price_type"]],
direction=DIRECTION_HBDM2VT[data["direction"]],
offset=OFFSET_HBDM2VT[data["offset"]],
price=data["price"],
volume=data["volume"],
traded=data["trade_volume"],
status=STATUS_HBDM2VT[data["status"]],
time=time,
gateway_name=self.gateway_name
)
self.gateway.on_order(order)
# Push trade event
trades = data["trade"]
if not trades:
return
for d in trades:
dt = datetime.fromtimestamp(d["created_at"] / 1000)
time = dt.strftime("%H:%M:%S")
trade = TradeData(
symbol=order.symbol,
exchange=Exchange.HUOBI,
orderid=order.orderid,
tradeid=str(d["trade_id"]),
direction=order.direction,
offset=order.offset,
price=d["trade_price"],
volume=d["trade_volume"],
time=time,
gateway_name=self.gateway_name,
)
self.gateway.on_trade(trade)
class HbdmDataWebsocketApi(HbdmWebsocketApiBase):
""""""
CONTRACT_TYPE_MAP = {
"this_week": "CW",
"next_week": "NW",
"this_quarter": "CQ"
}
def __init__(self, gateway):
""""""
super().__init__(gateway)
self.ticks = {}
def connect(self, key: str, secret: str, proxy_host: str, proxy_port: int):
""""""
super().connect(key, secret, WEBSOCKET_DATA_HOST, proxy_host, proxy_port)
def on_connected(self):
""""""
self.gateway.write_log("行情Websocket API连接成功")
def subscribe(self, req: SubscribeRequest):
""""""
contract_type = symbol_type_map.get(req.symbol, "")
if not contract_type:
return
buf = [i for i in req.symbol if not i.isdigit()]
symbol = "".join(buf)
ws_contract_type = self.CONTRACT_TYPE_MAP[contract_type]
ws_symbol = f"{symbol}_{ws_contract_type}"
# Create tick data buffer
tick = TickData(
symbol=req.symbol,
name=req.symbol,
exchange=Exchange.HUOBI,
datetime=datetime.now(),
gateway_name=self.gateway_name,
)
self.ticks[ws_symbol] = tick
# Subscribe to market depth update
self.req_id += 1
req = {
"sub": f"market.{ws_symbol}.depth.step0",
"id": str(self.req_id)
}
self.send_packet(req)
# Subscribe to market detail update
self.req_id += 1
req = {
"sub": f"market.{ws_symbol}.detail",
"id": str(self.req_id)
}
self.send_packet(req)
def on_data(self, packet): # type: (dict)->None
""""""
channel = packet.get("ch", None)
if channel:
if "depth.step" in channel:
self.on_market_depth(packet)
elif "detail" in channel:
self.on_market_detail(packet)
elif "err_code" in packet:
code = packet["err_code"]
msg = packet["err_msg"]
self.gateway.write_log(f"错误代码:{code}, 错误信息:{msg}")
def on_market_depth(self, data):
"""行情深度推送 """
ws_symbol = data["ch"].split(".")[1]
tick = self.ticks[ws_symbol]
tick.datetime = datetime.fromtimestamp(data["ts"] / 1000)
bids = data["tick"]["bids"]
for n in range(5):
price, volume = bids[n]
tick.__setattr__("bid_price_" + str(n + 1), float(price))
tick.__setattr__("bid_volume_" + str(n + 1), float(volume))
asks = data["tick"]["asks"]
for n in range(5):
price, volume = asks[n]
tick.__setattr__("ask_price_" + str(n + 1), float(price))
tick.__setattr__("ask_volume_" + str(n + 1), float(volume))
if tick.last_price:
self.gateway.on_tick(copy(tick))
def on_market_detail(self, data):
"""市场细节推送"""
ws_symbol = data["ch"].split(".")[1]
tick = self.ticks[ws_symbol]
tick.datetime = datetime.fromtimestamp(data["ts"] / 1000)
tick_data = data["tick"]
tick.open_price = tick_data["open"]
tick.high_price = tick_data["high"]
tick.low_price = tick_data["low"]
tick.last_price = tick_data["close"]
tick.volume = tick_data["vol"]
if tick.bid_price_1:
self.gateway.on_tick(copy(tick))
def _split_url(url):
"""
将url拆分为host和path
:return: host, path
"""
result = re.match("\w+://([^/]*)(.*)", url) # noqa
if result:
return result.group(1), result.group(2)
def create_signature(api_key, method, host, path, secret_key, get_params=None):
"""
创建签名
:param get_params: dict 使用GET方法时附带的额外参数(urlparams)
:return:
"""
sorted_params = [
("AccessKeyId", api_key),
("SignatureMethod", "HmacSHA256"),
("SignatureVersion", "2"),
("Timestamp", datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"))
]
if get_params:
sorted_params.extend(list(get_params.items()))
sorted_params = list(sorted(sorted_params))
encode_params = urllib.parse.urlencode(sorted_params)
payload = [method, host, path, encode_params]
payload = "\n".join(payload)
payload = payload.encode(encoding="UTF8")
secret_key = secret_key.encode(encoding="UTF8")
digest = hmac.new(secret_key, payload, digestmod=hashlib.sha256).digest()
signature = base64.b64encode(digest)
params = dict(sorted_params)
params["Signature"] = signature.decode("UTF8")
return params

View File

@ -1,5 +1,3 @@
# encoding: UTF-8
"""
火币交易接口
"""
@ -77,11 +75,9 @@ class HuobiGateway(BaseGateway):
"代理端口": "",
}
exchanges = [Exchange.HUOBI]
def __init__(self, event_engine):
"""Constructor"""
super(HuobiGateway, self).__init__(event_engine, "HUOBI")
super().__init__(event_engine, "HUOBI")
self.order_manager = LocalOrderManager(self)
@ -157,7 +153,7 @@ class HuobiRestApi(RestClient):
def __init__(self, gateway: BaseGateway):
""""""
super(HuobiRestApi, self).__init__()
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
@ -168,9 +164,6 @@ class HuobiRestApi(RestClient):
self.secret = ""
self.account_id = ""
self.cancel_requests = {}
self.orders = {}
def sign(self, request):
"""
Generate HUOBI signature.
@ -463,7 +456,7 @@ class HuobiWebsocketApiBase(WebsocketClient):
def __init__(self, gateway):
""""""
super(HuobiWebsocketApiBase, self).__init__()
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name