Merge pull request #1682 from vnpy/batch-trade

Batch trade
This commit is contained in:
vn.py 2019-05-10 14:31:38 +08:00 committed by GitHub
commit d1ac0379c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 337 additions and 26 deletions

View File

@ -124,7 +124,8 @@ install_requires = [
"tigeropen", "tigeropen",
"rqdatac", "rqdatac",
"ta-lib", "ta-lib",
"ibapi" "ibapi",
"deap"
] ]
if sys.version_info.minor < 7: if sys.version_info.minor < 7:
install_requires.append("dataclasses") install_requires.append("dataclasses")

View File

@ -521,6 +521,9 @@ class BacktesterChart(pg.GraphicsWindow):
def set_data(self, df): def set_data(self, df):
"""""" """"""
if df is None:
return
count = len(df) count = len(df)
self.dates.clear() self.dates.clear()

View File

@ -12,6 +12,7 @@ import hmac
from copy import copy from copy import copy
from datetime import datetime from datetime import datetime
from threading import Lock from threading import Lock
from typing import Sequence
from vnpy.event import Event from vnpy.event import Event
from vnpy.api.rest import RestClient, Request from vnpy.api.rest import RestClient, Request
@ -22,19 +23,22 @@ from vnpy.trader.constant import (
Exchange, Exchange,
Product, Product,
Status, Status,
OrderType OrderType,
Interval
) )
from vnpy.trader.gateway import BaseGateway from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import ( from vnpy.trader.object import (
TickData, TickData,
OrderData, OrderData,
TradeData, TradeData,
BarData,
AccountData, AccountData,
PositionData, PositionData,
ContractData, ContractData,
OrderRequest, OrderRequest,
CancelRequest, CancelRequest,
SubscribeRequest SubscribeRequest,
HistoryRequest
) )
from vnpy.trader.event import EVENT_TIMER from vnpy.trader.event import EVENT_TIMER
@ -56,7 +60,8 @@ ORDERTYPE_VT2HBDM = {
OrderType.LIMIT: "limit", OrderType.LIMIT: "limit",
} }
ORDERTYPE_HBDM2VT = {v: k for k, v in ORDERTYPE_VT2HBDM.items()} ORDERTYPE_HBDM2VT = {v: k for k, v in ORDERTYPE_VT2HBDM.items()}
ORDERTYPE_HBDM2VT[1] = OrderType.LIMIT
ORDERTYPE_HBDM2VT[3] = OrderType.MARKET
DIRECTION_VT2HBDM = { DIRECTION_VT2HBDM = {
Direction.LONG: "buy", Direction.LONG: "buy",
@ -70,6 +75,18 @@ OFFSET_VT2HBDM = {
} }
OFFSET_HBDM2VT = {v: k for k, v in OFFSET_VT2HBDM.items()} OFFSET_HBDM2VT = {v: k for k, v in OFFSET_VT2HBDM.items()}
INTERVAL_VT2HBDM = {
Interval.MINUTE: "1min",
Interval.HOUR: "60min",
Interval.DAILY: "1day"
}
CONTRACT_TYPE_MAP = {
"this_week": "CW",
"next_week": "NW",
"this_quarter": "CQ"
}
symbol_type_map = {} symbol_type_map = {}
@ -129,6 +146,10 @@ class HbdmGateway(BaseGateway):
"""""" """"""
self.rest_api.cancel_order(req) self.rest_api.cancel_order(req)
def send_orders(self, reqs: Sequence[OrderRequest]):
""""""
return self.rest_api.send_orders(reqs)
def query_account(self): def query_account(self):
"""""" """"""
self.rest_api.query_account() self.rest_api.query_account()
@ -137,6 +158,10 @@ class HbdmGateway(BaseGateway):
"""""" """"""
self.rest_api.query_position() self.rest_api.query_position()
def query_history(self, req: HistoryRequest):
""""""
return self.rest_api.query_history(req)
def close(self): def close(self):
"""""" """"""
self.rest_api.stop() self.rest_api.stop()
@ -169,7 +194,7 @@ class HbdmRestApi(RestClient):
self.gateway = gateway self.gateway = gateway
self.gateway_name = gateway.gateway_name self.gateway_name = gateway.gateway_name
self.host = "" self.host = ""
self.key = "" self.key = ""
self.secret = "" self.secret = ""
@ -249,12 +274,47 @@ class HbdmRestApi(RestClient):
def query_order(self): def query_order(self):
"""""" """"""
for currency in self.currencies: for currency in self.currencies:
# Open Orders
data = {"symbol": currency} data = {"symbol": currency}
self.add_request( self.add_request(
method="POST", method="POST",
path="/api/v1/contract_openorders", path="/api/v1/contract_openorders",
callback=self.on_query_order, callback=self.on_query_active_order,
data=data,
extra=currency
)
# History Orders
data = {
"symbol": currency,
"trade_type": 0,
"type": 2,
"status": 0,
"create_date": 7
}
self.add_request(
method="POST",
path="/api/v1/contract_hisorders",
callback=self.on_query_history_order,
data=data,
extra=currency
)
def query_trade(self):
""""""
for currency in self.currencies:
data = {
"symbol": currency,
"trade_type": 0,
"create_date": 7
}
self.add_request(
method="POST",
path="/api/v1/contract_matchresults",
callback=self.on_query_trade,
data=data, data=data,
extra=currency extra=currency
) )
@ -266,7 +326,67 @@ class HbdmRestApi(RestClient):
path="/api/v1/contract_contract_info", path="/api/v1/contract_contract_info",
callback=self.on_query_contract callback=self.on_query_contract
) )
def query_history(self, req: HistoryRequest):
""""""
# Convert symbol
contract_type = symbol_type_map.get(req.symbol, "")
buf = [i for i in req.symbol if not i.isdigit()]
symbol = "".join(buf)
ws_contract_type = CONTRACT_TYPE_MAP[contract_type]
ws_symbol = f"{symbol}_{ws_contract_type}"
# Create query params
params = {
"symbol": ws_symbol,
"period": INTERVAL_VT2HBDM[req.interval],
"size": 2000
}
# Get response from server
resp = self.request(
"GET",
"/market/history/kline",
params=params
)
# Break if request failed with other status code
history = []
if resp.status_code // 100 != 2:
msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}"
self.gateway.write_log(msg)
else:
data = resp.json()
if not data:
msg = f"获取历史数据为空"
self.gateway.write_log(msg)
else:
for d in data["data"]:
dt = datetime.fromtimestamp(d["id"])
bar = BarData(
symbol=req.symbol,
exchange=req.exchange,
datetime=dt,
interval=req.interval,
volume=d["vol"],
open_price=d["open"],
high_price=d["high"],
low_price=d["low"],
close_price=d["close"],
gateway_name=self.gateway_name
)
history.append(bar)
begin = history[0].datetime
end = history[-1].datetime
msg = f"获取历史数据成功,{req.symbol} - {req.interval.value}{begin} - {end}"
self.gateway.write_log(msg)
return history
def new_local_orderid(self): def new_local_orderid(self):
"""""" """"""
with self.order_count_lock: with self.order_count_lock:
@ -288,8 +408,8 @@ class HbdmRestApi(RestClient):
"client_order_id": int(local_orderid), "client_order_id": int(local_orderid),
"price": req.price, "price": req.price,
"volume": int(req.volume), "volume": int(req.volume),
"direction": DIRECTION_VT2HBDM[req.direction], "direction": DIRECTION_VT2HBDM.get(req.direction, ""),
"offset": OFFSET_VT2HBDM[req.offset], "offset": OFFSET_VT2HBDM.get(req.offset, ""),
"order_price_type": ORDERTYPE_VT2HBDM.get(req.type, ""), "order_price_type": ORDERTYPE_VT2HBDM.get(req.type, ""),
"lever_rate": 20 "lever_rate": 20
} }
@ -307,6 +427,53 @@ class HbdmRestApi(RestClient):
self.gateway.on_order(order) self.gateway.on_order(order)
return order.vt_orderid return order.vt_orderid
def send_orders(self, reqs: Sequence[OrderRequest]):
""""""
orders_data = []
orders = []
vt_orderids = []
for req in reqs:
local_orderid = self.new_local_orderid()
order = req.create_order_data(
local_orderid,
self.gateway_name
)
order.time = datetime.now().strftime("%H:%M:%S")
self.gateway.on_order(order)
d = {
"contract_code": req.symbol,
"client_order_id": int(local_orderid),
"price": req.price,
"volume": int(req.volume),
"direction": DIRECTION_VT2HBDM.get(req.direction, ""),
"offset": OFFSET_VT2HBDM.get(req.offset, ""),
"order_price_type": ORDERTYPE_VT2HBDM.get(req.type, ""),
"lever_rate": 20
}
orders_data.append(d)
orders.append(order)
vt_orderids.append(order.vt_orderid)
data = {
"orders_data": orders_data
}
self.add_request(
method="POST",
path="/api/v1/contract_batchorder",
callback=self.on_send_orders,
data=data,
extra=orders,
on_error=self.on_send_orders_error,
on_failed=self.on_send_orders_failed
)
return vt_orderids
def cancel_order(self, req: CancelRequest): def cancel_order(self, req: CancelRequest):
"""""" """"""
buf = [i for i in req.symbol if not i.isdigit()] buf = [i for i in req.symbol if not i.isdigit()]
@ -315,10 +482,11 @@ class HbdmRestApi(RestClient):
"symbol": "".join(buf), "symbol": "".join(buf),
} }
if req.orderid > 1000000: orderid = int(req.orderid)
data["client_order_id"] = int(req.orderid) if orderid > 1000000:
data["client_order_id"] = orderid
else: else:
data["order_id"] = int(req.orderid) data["order_id"] = orderid
self.add_request( self.add_request(
method="POST", method="POST",
@ -359,7 +527,7 @@ class HbdmRestApi(RestClient):
for d in data["data"]: for d in data["data"]:
key = f"{d['contract_code']}_{d['direction']}" key = f"{d['contract_code']}_{d['direction']}"
position = self.positions.get(key, None) position = self.positions.get(key, None)
if not position: if not position:
position = PositionData( position = PositionData(
symbol=d["contract_code"], symbol=d["contract_code"],
@ -373,17 +541,18 @@ class HbdmRestApi(RestClient):
position.frozen = d["frozen"] position.frozen = d["frozen"]
position.price = d["cost_hold"] position.price = d["cost_hold"]
position.pnl = d["profit"] position.pnl = d["profit"]
for position in self.positions.values(): for position in self.positions.values():
self.gateway.on_position(position) self.gateway.on_position(position)
def on_query_order(self, data, request): def on_query_active_order(self, data, request):
"""""" """"""
if self.check_error(data, "查询委托"): if self.check_error(data, "查询活动委托"):
return return
for d in data["data"]["orders"]: for d in data["data"]["orders"]:
dt = datetime.fromtimestamp(d["created_at"] / 1000) timestamp = d["created_at"]
dt = datetime.fromtimestamp(timestamp / 1000)
time = dt.strftime("%H:%M:%S") time = dt.strftime("%H:%M:%S")
if d["client_order_id"]: if d["client_order_id"]:
@ -407,7 +576,62 @@ class HbdmRestApi(RestClient):
) )
self.gateway.on_order(order) self.gateway.on_order(order)
self.gateway.write_log(f"{request.extra}委托信息查询成功") self.gateway.write_log(f"{request.extra}活动委托信息查询成功")
def on_query_history_order(self, data, request):
""""""
if self.check_error(data, "查询历史委托"):
return
for d in data["data"]["orders"]:
timestamp = d["create_date"]
dt = datetime.fromtimestamp(timestamp / 1000)
time = dt.strftime("%H:%M:%S")
orderid = d["order_id"]
order = OrderData(
orderid=orderid,
symbol=d["contract_code"],
exchange=Exchange.HUOBI,
price=d["price"],
volume=d["volume"],
type=ORDERTYPE_HBDM2VT[d["order_price_type"]],
direction=DIRECTION_HBDM2VT[d["direction"]],
offset=OFFSET_HBDM2VT[d["offset"]],
traded=d["trade_volume"],
status=STATUS_HBDM2VT[d["status"]],
time=time,
gateway_name=self.gateway_name,
)
self.gateway.on_order(order)
self.gateway.write_log(f"{request.extra}历史委托信息查询成功")
def on_query_trade(self, data, request):
""""""
if self.check_error(data, "查询成交"):
return
for d in data["data"]["trades"]:
dt = datetime.fromtimestamp(d["create_date"] / 1000)
time = dt.strftime("%H:%M:%S")
trade = TradeData(
tradeid=d["match_id"],
orderid=d["order_id"],
symbol=d["contract_code"],
exchange=Exchange.HUOBI,
price=d["trade_price"],
volume=d["trade_volume"],
direction=DIRECTION_HBDM2VT[d["direction"]],
offset=OFFSET_HBDM2VT[d["offset"]],
time=time,
gateway_name=self.gateway_name,
)
self.gateway.on_trade(trade)
self.gateway.write_log(f"{request.extra}成交信息查询成功")
def on_query_contract(self, data, request): # type: (dict, Request)->None def on_query_contract(self, data, request): # type: (dict, Request)->None
"""""" """"""
@ -425,6 +649,7 @@ class HbdmRestApi(RestClient):
size=int(d["contract_size"]), size=int(d["contract_size"]),
min_volume=1, min_volume=1,
product=Product.FUTURES, product=Product.FUTURES,
history_data=True,
gateway_name=self.gateway_name, gateway_name=self.gateway_name,
) )
self.gateway.on_contract(contract) self.gateway.on_contract(contract)
@ -434,6 +659,7 @@ class HbdmRestApi(RestClient):
self.gateway.write_log("合约信息查询成功") self.gateway.write_log("合约信息查询成功")
self.query_order() self.query_order()
self.query_trade()
def on_send_order(self, data, request): def on_send_order(self, data, request):
"""""" """"""
@ -479,6 +705,53 @@ class HbdmRestApi(RestClient):
msg = f"撤单失败,状态码:{status_code},信息:{request.response.text}" msg = f"撤单失败,状态码:{status_code},信息:{request.response.text}"
self.gateway.write_log(msg) self.gateway.write_log(msg)
def on_send_orders(self, data, request):
""""""
orders = request.extra
errors = data.get("errors", None)
if errors:
for d in errors:
ix = d["index"]
code = d["err_code"]
msg = d["err_msg"]
order = orders[ix]
order.status = Status.REJECTED
self.gateway.on_order(order)
msg = f"批量委托失败,状态码:{code},信息:{msg}"
self.gateway.write_log(msg)
def on_send_orders_failed(self, status_code: str, request: Request):
"""
Callback when sending order failed on server.
"""
orders = request.extra
for order in orders:
order.status = Status.REJECTED
self.gateway.on_order(order)
msg = f"批量委托失败,状态码:{status_code},信息:{request.response.text}"
self.gateway.write_log(msg)
def on_send_orders_error(
self, exception_type: type, exception_value: Exception, tb, request: Request
):
"""
Callback when sending order caused exception.
"""
orders = request.extra
for order in orders:
order.status = Status.REJECTED
self.gateway.on_order(order)
# Record exception if not ConnectionError
if not issubclass(exception_type, ConnectionError):
self.on_error(exception_type, exception_value, tb, request)
def check_error(self, data: dict, func: str = ""): def check_error(self, data: dict, func: str = ""):
"""""" """"""
if data["status"] != "error": if data["status"] != "error":
@ -670,11 +943,6 @@ class HbdmTradeWebsocketApi(HbdmWebsocketApiBase):
class HbdmDataWebsocketApi(HbdmWebsocketApiBase): class HbdmDataWebsocketApi(HbdmWebsocketApiBase):
"""""" """"""
CONTRACT_TYPE_MAP = {
"this_week": "CW",
"next_week": "NW",
"this_quarter": "CQ"
}
def __init__(self, gateway): def __init__(self, gateway):
"""""" """"""
@ -699,7 +967,7 @@ class HbdmDataWebsocketApi(HbdmWebsocketApiBase):
buf = [i for i in req.symbol if not i.isdigit()] buf = [i for i in req.symbol if not i.isdigit()]
symbol = "".join(buf) symbol = "".join(buf)
ws_contract_type = self.CONTRACT_TYPE_MAP[contract_type] ws_contract_type = CONTRACT_TYPE_MAP[contract_type]
ws_symbol = f"{symbol}_{ws_contract_type}" ws_symbol = f"{symbol}_{ws_contract_type}"
# Create tick data buffer # Create tick data buffer

View File

@ -8,7 +8,7 @@ from datetime import datetime
from email.message import EmailMessage from email.message import EmailMessage
from queue import Empty, Queue from queue import Empty, Queue
from threading import Thread from threading import Thread
from typing import Any from typing import Any, Sequence
from vnpy.event import Event, EventEngine from vnpy.event import Event, EventEngine
from .app import BaseApp from .app import BaseApp
@ -180,6 +180,22 @@ class MainEngine:
if gateway: if gateway:
gateway.cancel_order(req) gateway.cancel_order(req)
def send_orders(self, reqs: Sequence[OrderRequest], gateway_name: str):
"""
"""
gateway = self.get_gateway(gateway_name)
if gateway:
return gateway.send_orders(reqs)
else:
return ["" for req in reqs]
def cancel_orders(self, reqs: Sequence[CancelRequest], gateway_name: str):
"""
"""
gateway = self.get_gateway(gateway_name)
if gateway:
gateway.cancel_orders(reqs)
def query_history(self, req: HistoryRequest, gateway_name: str): def query_history(self, req: HistoryRequest, gateway_name: str):
""" """
Send cancel order request to a specific gateway. Send cancel order request to a specific gateway.

View File

@ -3,7 +3,7 @@
""" """
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any from typing import Any, Sequence
from copy import copy from copy import copy
from vnpy.event import Event, EventEngine from vnpy.event import Event, EventEngine
@ -210,6 +210,29 @@ class BaseGateway(ABC):
""" """
pass pass
def send_orders(self, reqs: Sequence[OrderRequest]):
"""
Send a batch of orders to server.
Use a for loop of send_order function by default.
Reimplement this function if batch order supported on server.
"""
vt_orderids = []
for req in reqs:
vt_orderid = self.send_order(req)
vt_orderids.append(vt_orderid)
return vt_orderids
def cancel_orders(self, reqs: Sequence[CancelRequest]):
"""
Cancel a batch of orders to server.
Use a for loop of cancel_order function by default.
Reimplement this function if batch cancel supported on server.
"""
for req in reqs:
self.cancel_order(req)
@abstractmethod @abstractmethod
def query_account(self): def query_account(self):
""" """