[Add]LocalOrderManager and use it for async order id map in HuobiGateway

This commit is contained in:
vn.py 2019-04-05 12:32:48 +08:00
parent 81454e7dfb
commit f16a879905
3 changed files with 165 additions and 94 deletions

View File

@ -27,8 +27,8 @@ class CsvLoaderWidget(QtWidgets.QWidget):
self.setFixedWidth(300)
self.setWindowFlags(
(self.windowFlags() | QtCore.Qt.CustomizeWindowHint) &
~QtCore.Qt.WindowMaximizeButtonHint)
(self.windowFlags() | QtCore.Qt.CustomizeWindowHint)
& ~QtCore.Qt.WindowMaximizeButtonHint)
file_button = QtWidgets.QPushButton("选择文件")
file_button.clicked.connect(self.select_file)

View File

@ -24,7 +24,7 @@ from vnpy.trader.constant import (
Status,
OrderType
)
from vnpy.trader.gateway import BaseGateway
from vnpy.trader.gateway import BaseGateway, LocalOrderManager
from vnpy.trader.object import (
TickData,
OrderData,
@ -81,12 +81,7 @@ class HuobiGateway(BaseGateway):
"""Constructor"""
super(HuobiGateway, self).__init__(event_engine, "HUOBI")
self.order_count = 100000
self.local_huobi_map = {} # local orderid: huobi orderid
self.huobi_local_map = {} # huobi orderid: local orderid
self.local_order_map = {} # local orderid: order
self.huobi_order_data = {} # huobi orderid: data
self.order_manager = LocalOrderManager(self)
self.rest_api = HuobiRestApi(self)
self.trade_ws_api = HuobiTradeWebsocketApi(self)
@ -147,49 +142,6 @@ class HuobiGateway(BaseGateway):
self.count = 0
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def get_local_orderid(self, huobi_orderid: str):
""""""
local_orderid = self.huobi_local_map.get(huobi_orderid, None)
if not local_orderid:
local_orderid = self.new_local_orderid()
self.update_orderid_map(local_orderid, huobi_orderid)
return local_orderid
def get_huobi_orderid(self, local_orderid: str):
""""""
huobi_orderid = self.local_huobi_map.get(local_orderid, "")
return huobi_orderid
def new_local_orderid(self):
""""""
self.order_count += 1
return str(self.order_count)
def update_orderid_map(self, local_orderid: str, huobi_orderid: str):
""""""
self.huobi_local_map[huobi_orderid] = local_orderid
self.local_huobi_map[local_orderid] = huobi_orderid
if huobi_orderid in self.huobi_order_data:
data = self.huobi_order_data.pop(huobi_orderid)
self.trade_ws_api.on_order(data)
def on_order(self, order: OrderData):
""""""
self.local_order_map[order.orderid] = order
super().on_order(copy(order))
def get_order(self, huobi_orderid: str):
""""""
local_orderid = self.huobi_local_map.get(huobi_orderid, None)
if not local_orderid:
return None
else:
return self.local_order_map[local_orderid]
class HuobiRestApi(RestClient):
"""
@ -202,6 +154,7 @@ class HuobiRestApi(RestClient):
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.order_manager = gateway.order_manager
self.host = ""
self.key = ""
@ -300,7 +253,7 @@ class HuobiRestApi(RestClient):
(req.direction, req.type), ""
)
local_orderid = self.gateway.new_local_orderid()
local_orderid = self.order_manager.new_local_orderid()
order = req.create_order_data(
local_orderid,
self.gateway_name
@ -324,19 +277,14 @@ class HuobiRestApi(RestClient):
extra=order,
)
self.gateway.on_order(order)
self.order_manager.on_order(order)
return order.vt_orderid
def cancel_order(self, req: CancelRequest):
""""""
local_id = req.orderid
huobi_orderid = self.gateway.get_huobi_orderid(local_id)
sys_orderid = self.order_manager.get_sys_orderid(req.orderid)
if not huobi_orderid:
self.cancel_requests[local_id] = req
return
path = f"/v1/order/orders/{huobi_orderid}/submitcancel"
path = f"/v1/order/orders/{sys_orderid}/submitcancel"
self.add_request(
method="POST",
path=path,
@ -344,9 +292,6 @@ class HuobiRestApi(RestClient):
extra=req
)
if local_id in self.cancel_requests:
self.cancel_requests.pop(local_id)
def on_query_account(self, data, request):
""""""
if self.check_error(data, "查询账户"):
@ -387,8 +332,8 @@ class HuobiRestApi(RestClient):
return
for d in data["data"]:
huobi_orderid = d["id"]
local_orderid = self.gateway.get_local_orderid(huobi_orderid)
sys_orderid = d["id"]
local_orderid = self.order_manager.get_local_orderid(sys_orderid)
direction, order_type = ORDERTYPE_HUOBI2VT[d["type"]]
dt = datetime.fromtimestamp(d["created-at"] / 1000)
@ -408,7 +353,7 @@ class HuobiRestApi(RestClient):
gateway_name=self.gateway_name,
)
self.gateway.on_order(order)
self.order_manager.on_order(order)
self.gateway.write_log("委托信息查询成功")
@ -446,15 +391,11 @@ class HuobiRestApi(RestClient):
if self.check_error(data, "委托"):
order.status = Status.REJECTED
self.gateway.on_order(order)
self.order_manager.on_order(order)
return
huobi_orderid = str(data["data"])
self.gateway.update_orderid_map(order.orderid, huobi_orderid)
req = self.cancel_requests.get(order.orderid, None)
if req:
self.cancel_order(req)
sys_orderid = data["data"]
self.order_manager.update_orderid_map(order.orderid, sys_orderid)
def on_cancel_order(self, data, request):
""""""
@ -463,12 +404,11 @@ class HuobiRestApi(RestClient):
cancel_request = request.extra
local_orderid = cancel_request.orderid
huobi_orderid = self.gateway.get_huobi_orderid(local_orderid)
order = self.gateway.get_order(huobi_orderid)
order = self.order_manager.get_order_with_local_orderid(local_orderid)
order.status = Status.CANCELLED
self.gateway.on_order(copy(order))
self.order_manager.on_order(order)
self.gateway.write_log(f"委托撤单成功:{order.orderid}")
def check_error(self, data: dict, func: str = ""):
@ -562,6 +502,9 @@ class HuobiTradeWebsocketApi(HuobiWebsocketApiBase):
""""""
super().__init__(gateway)
self.order_manager = gateway.order_manager
self.order_manager.push_data_callback = self.on_data
self.req_id = 0
def connect(self, key, secret, proxy_host, proxy_port):
@ -599,30 +542,36 @@ class HuobiTradeWebsocketApi(HuobiWebsocketApiBase):
def on_order(self, data: dict):
""""""
huobi_orderid = str(data["order-id"])
order = self.gateway.get_order(huobi_orderid)
sys_orderid = str(data["order-id"])
order = self.order_manager.get_order_with_sys_orderid(sys_orderid)
if not order:
self.gateway.huobi_order_data[huobi_orderid] = data
self.order_manager.add_push_data(sys_orderid, data)
return
traded_volume = float(data["filled-amount"])
# Push order event
order.traded += traded_volume
order.status = STATUS_HUOBI2VT.get(data["order-state"], None)
self.gateway.on_order(order)
self.order_manager.on_order(order)
if traded_volume:
trade = TradeData(
symbol=order.symbol,
exchange=Exchange.HUOBI,
orderid=order.orderid,
tradeid=str(data["seq-id"]),
direction=order.direction,
price=float(data["price"]),
volume=float(data["filled-amount"]),
time=datetime.now().strftime("%H:%M:%S"),
gateway_name=self.gateway_name,
)
self.gateway.on_trade(trade)
# Push trade event
if not traded_volume:
return
trade = TradeData(
symbol=order.symbol,
exchange=Exchange.HUOBI,
orderid=order.orderid,
tradeid=str(data["seq-id"]),
direction=order.direction,
price=float(data["price"]),
volume=float(data["filled-amount"]),
time=datetime.now().strftime("%H:%M:%S"),
gateway_name=self.gateway_name,
)
self.gateway.on_trade(trade)
class HuobiDataWebsocketApi(HuobiWebsocketApiBase):

View File

@ -4,6 +4,7 @@
from abc import ABC, abstractmethod
from typing import Any
from copy import copy
from vnpy.event import Event, EventEngine
from .event import (
@ -227,3 +228,124 @@ class BaseGateway(ABC):
Return default setting dict.
"""
return self.default_setting
class LocalOrderManager:
"""
Management tool to support use local order id for trading.
"""
def __init__(self, gateway: BaseGateway):
""""""
self.gateway = gateway
# For generating local orderid
self.order_prefix = ""
self.order_count = 0
self.orders = {} # local_orderid:order
# Map between local and system orderid
self.local_sys_orderid_map = {}
self.sys_local_orderid_map = {}
# Push order data buf
self.push_data_buf = {} # sys_orderid:data
# Callback for processing push order data
self.push_data_callback = None
# Cancel request buf
self.cancel_request_buf = {} # local_orderid:req
def new_local_orderid(self):
"""
Generate a new local orderid.
"""
self.order_count += 1
local_orderid = str(self.order_count).rjust(8, "0")
return local_orderid
def get_local_orderid(self, sys_orderid: str):
"""
Get local orderid with sys orderid.
"""
local_orderid = self.sys_local_orderid_map.get(sys_orderid, "")
if not local_orderid:
local_orderid = self.new_local_orderid()
self.update_orderid_map(local_orderid, sys_orderid)
return local_orderid
def get_sys_orderid(self, local_orderid: str):
"""
Get sys orderid with local orderid.
"""
sys_orderid = self.local_sys_orderid_map.get(local_orderid, "")
return sys_orderid
def update_orderid_map(self, local_orderid: str, sys_orderid: str):
"""
Update orderid map.
"""
self.sys_local_orderid_map[sys_orderid] = local_orderid
self.local_sys_orderid_map[local_orderid] = sys_orderid
self.check_cancel_request(local_orderid)
self.check_push_data(sys_orderid)
def check_push_data(self, sys_orderid: str):
"""
Check if any order push data waiting.
"""
if sys_orderid not in self.push_data_buf:
return
data = self.push_data_buf.pop(sys_orderid)
if self.push_data_callback:
self.push_data_callback(data)
def add_push_data(self, sys_orderid: str, data: dict):
"""
Add push data into buf.
"""
self.push_data_buf[sys_orderid] = data
def get_order_with_sys_orderid(self, sys_orderid: str):
""""""
local_orderid = self.sys_local_orderid_map.get(sys_orderid, None)
if not local_orderid:
return None
else:
return self.get_order_with_local_orderid(local_orderid)
def get_order_with_local_orderid(self, local_orderid: str):
""""""
order = self.orders[local_orderid]
return copy(order)
def on_order(self, order: OrderData):
"""
Keep an order buf before pushing it to gateway.
"""
self.orders[order.orderid] = copy(order)
self.gateway.on_order(order)
def cancel_order(self, req: CancelRequest):
"""
"""
sys_orderid = self.get_sys_orderid(req.orderid)
if not sys_orderid:
self.cancel_request_buf[req.orderid] = req
return
self.gateway.cancel_order(req)
def check_cancel_request(self, local_orderid: str):
"""
"""
if local_orderid not in self.cancel_request_buf:
return
req = self.cancel_request_buf.pop(local_orderid)
self.gateway.cancel_order(req)