From f16a87990569ad37b5e8155e71d72bb3b1d4c455 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Fri, 5 Apr 2019 12:32:48 +0800 Subject: [PATCH] [Add]LocalOrderManager and use it for async order id map in HuobiGateway --- vnpy/app/csv_loader/ui/widget.py | 4 +- vnpy/gateway/huobi/huobi_gateway.py | 133 +++++++++------------------- vnpy/trader/gateway.py | 122 +++++++++++++++++++++++++ 3 files changed, 165 insertions(+), 94 deletions(-) diff --git a/vnpy/app/csv_loader/ui/widget.py b/vnpy/app/csv_loader/ui/widget.py index 30160d77..79ef9e0c 100644 --- a/vnpy/app/csv_loader/ui/widget.py +++ b/vnpy/app/csv_loader/ui/widget.py @@ -27,8 +27,8 @@ class CsvLoaderWidget(QtWidgets.QWidget): self.setFixedWidth(300) self.setWindowFlags( - (self.windowFlags() | QtCore.Qt.CustomizeWindowHint) & - ~QtCore.Qt.WindowMaximizeButtonHint) + (self.windowFlags() | QtCore.Qt.CustomizeWindowHint) + & ~QtCore.Qt.WindowMaximizeButtonHint) file_button = QtWidgets.QPushButton("选择文件") file_button.clicked.connect(self.select_file) diff --git a/vnpy/gateway/huobi/huobi_gateway.py b/vnpy/gateway/huobi/huobi_gateway.py index b6dfd4bb..5572f5fe 100644 --- a/vnpy/gateway/huobi/huobi_gateway.py +++ b/vnpy/gateway/huobi/huobi_gateway.py @@ -24,7 +24,7 @@ from vnpy.trader.constant import ( Status, OrderType ) -from vnpy.trader.gateway import BaseGateway +from vnpy.trader.gateway import BaseGateway, LocalOrderManager from vnpy.trader.object import ( TickData, OrderData, @@ -81,12 +81,7 @@ class HuobiGateway(BaseGateway): """Constructor""" super(HuobiGateway, self).__init__(event_engine, "HUOBI") - self.order_count = 100000 - - self.local_huobi_map = {} # local orderid: huobi orderid - self.huobi_local_map = {} # huobi orderid: local orderid - self.local_order_map = {} # local orderid: order - self.huobi_order_data = {} # huobi orderid: data + self.order_manager = LocalOrderManager(self) self.rest_api = HuobiRestApi(self) self.trade_ws_api = HuobiTradeWebsocketApi(self) @@ -147,49 +142,6 @@ class HuobiGateway(BaseGateway): self.count = 0 self.event_engine.register(EVENT_TIMER, self.process_timer_event) - def get_local_orderid(self, huobi_orderid: str): - """""" - local_orderid = self.huobi_local_map.get(huobi_orderid, None) - - if not local_orderid: - local_orderid = self.new_local_orderid() - self.update_orderid_map(local_orderid, huobi_orderid) - - return local_orderid - - def get_huobi_orderid(self, local_orderid: str): - """""" - huobi_orderid = self.local_huobi_map.get(local_orderid, "") - return huobi_orderid - - def new_local_orderid(self): - """""" - self.order_count += 1 - return str(self.order_count) - - def update_orderid_map(self, local_orderid: str, huobi_orderid: str): - """""" - self.huobi_local_map[huobi_orderid] = local_orderid - self.local_huobi_map[local_orderid] = huobi_orderid - - if huobi_orderid in self.huobi_order_data: - data = self.huobi_order_data.pop(huobi_orderid) - self.trade_ws_api.on_order(data) - - def on_order(self, order: OrderData): - """""" - self.local_order_map[order.orderid] = order - - super().on_order(copy(order)) - - def get_order(self, huobi_orderid: str): - """""" - local_orderid = self.huobi_local_map.get(huobi_orderid, None) - if not local_orderid: - return None - else: - return self.local_order_map[local_orderid] - class HuobiRestApi(RestClient): """ @@ -202,6 +154,7 @@ class HuobiRestApi(RestClient): self.gateway = gateway self.gateway_name = gateway.gateway_name + self.order_manager = gateway.order_manager self.host = "" self.key = "" @@ -300,7 +253,7 @@ class HuobiRestApi(RestClient): (req.direction, req.type), "" ) - local_orderid = self.gateway.new_local_orderid() + local_orderid = self.order_manager.new_local_orderid() order = req.create_order_data( local_orderid, self.gateway_name @@ -324,19 +277,14 @@ class HuobiRestApi(RestClient): extra=order, ) - self.gateway.on_order(order) + self.order_manager.on_order(order) return order.vt_orderid def cancel_order(self, req: CancelRequest): """""" - local_id = req.orderid - huobi_orderid = self.gateway.get_huobi_orderid(local_id) + sys_orderid = self.order_manager.get_sys_orderid(req.orderid) - if not huobi_orderid: - self.cancel_requests[local_id] = req - return - - path = f"/v1/order/orders/{huobi_orderid}/submitcancel" + path = f"/v1/order/orders/{sys_orderid}/submitcancel" self.add_request( method="POST", path=path, @@ -344,9 +292,6 @@ class HuobiRestApi(RestClient): extra=req ) - if local_id in self.cancel_requests: - self.cancel_requests.pop(local_id) - def on_query_account(self, data, request): """""" if self.check_error(data, "查询账户"): @@ -387,8 +332,8 @@ class HuobiRestApi(RestClient): return for d in data["data"]: - huobi_orderid = d["id"] - local_orderid = self.gateway.get_local_orderid(huobi_orderid) + sys_orderid = d["id"] + local_orderid = self.order_manager.get_local_orderid(sys_orderid) direction, order_type = ORDERTYPE_HUOBI2VT[d["type"]] dt = datetime.fromtimestamp(d["created-at"] / 1000) @@ -408,7 +353,7 @@ class HuobiRestApi(RestClient): gateway_name=self.gateway_name, ) - self.gateway.on_order(order) + self.order_manager.on_order(order) self.gateway.write_log("委托信息查询成功") @@ -446,15 +391,11 @@ class HuobiRestApi(RestClient): if self.check_error(data, "委托"): order.status = Status.REJECTED - self.gateway.on_order(order) + self.order_manager.on_order(order) return - huobi_orderid = str(data["data"]) - self.gateway.update_orderid_map(order.orderid, huobi_orderid) - - req = self.cancel_requests.get(order.orderid, None) - if req: - self.cancel_order(req) + sys_orderid = data["data"] + self.order_manager.update_orderid_map(order.orderid, sys_orderid) def on_cancel_order(self, data, request): """""" @@ -463,12 +404,11 @@ class HuobiRestApi(RestClient): cancel_request = request.extra local_orderid = cancel_request.orderid - huobi_orderid = self.gateway.get_huobi_orderid(local_orderid) - order = self.gateway.get_order(huobi_orderid) + order = self.order_manager.get_order_with_local_orderid(local_orderid) order.status = Status.CANCELLED - self.gateway.on_order(copy(order)) + self.order_manager.on_order(order) self.gateway.write_log(f"委托撤单成功:{order.orderid}") def check_error(self, data: dict, func: str = ""): @@ -562,6 +502,9 @@ class HuobiTradeWebsocketApi(HuobiWebsocketApiBase): """""" super().__init__(gateway) + self.order_manager = gateway.order_manager + self.order_manager.push_data_callback = self.on_data + self.req_id = 0 def connect(self, key, secret, proxy_host, proxy_port): @@ -599,30 +542,36 @@ class HuobiTradeWebsocketApi(HuobiWebsocketApiBase): def on_order(self, data: dict): """""" - huobi_orderid = str(data["order-id"]) - order = self.gateway.get_order(huobi_orderid) + sys_orderid = str(data["order-id"]) + + order = self.order_manager.get_order_with_sys_orderid(sys_orderid) if not order: - self.gateway.huobi_order_data[huobi_orderid] = data + self.order_manager.add_push_data(sys_orderid, data) return traded_volume = float(data["filled-amount"]) + + # Push order event order.traded += traded_volume order.status = STATUS_HUOBI2VT.get(data["order-state"], None) - self.gateway.on_order(order) + self.order_manager.on_order(order) - if traded_volume: - trade = TradeData( - symbol=order.symbol, - exchange=Exchange.HUOBI, - orderid=order.orderid, - tradeid=str(data["seq-id"]), - direction=order.direction, - price=float(data["price"]), - volume=float(data["filled-amount"]), - time=datetime.now().strftime("%H:%M:%S"), - gateway_name=self.gateway_name, - ) - self.gateway.on_trade(trade) + # Push trade event + if not traded_volume: + return + + trade = TradeData( + symbol=order.symbol, + exchange=Exchange.HUOBI, + orderid=order.orderid, + tradeid=str(data["seq-id"]), + direction=order.direction, + price=float(data["price"]), + volume=float(data["filled-amount"]), + time=datetime.now().strftime("%H:%M:%S"), + gateway_name=self.gateway_name, + ) + self.gateway.on_trade(trade) class HuobiDataWebsocketApi(HuobiWebsocketApiBase): diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index d1a6248f..ce7aa591 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -4,6 +4,7 @@ from abc import ABC, abstractmethod from typing import Any +from copy import copy from vnpy.event import Event, EventEngine from .event import ( @@ -227,3 +228,124 @@ class BaseGateway(ABC): Return default setting dict. """ return self.default_setting + + +class LocalOrderManager: + """ + Management tool to support use local order id for trading. + """ + + def __init__(self, gateway: BaseGateway): + """""" + self.gateway = gateway + + # For generating local orderid + self.order_prefix = "" + self.order_count = 0 + self.orders = {} # local_orderid:order + + # Map between local and system orderid + self.local_sys_orderid_map = {} + self.sys_local_orderid_map = {} + + # Push order data buf + self.push_data_buf = {} # sys_orderid:data + + # Callback for processing push order data + self.push_data_callback = None + + # Cancel request buf + self.cancel_request_buf = {} # local_orderid:req + + def new_local_orderid(self): + """ + Generate a new local orderid. + """ + self.order_count += 1 + local_orderid = str(self.order_count).rjust(8, "0") + return local_orderid + + def get_local_orderid(self, sys_orderid: str): + """ + Get local orderid with sys orderid. + """ + local_orderid = self.sys_local_orderid_map.get(sys_orderid, "") + + if not local_orderid: + local_orderid = self.new_local_orderid() + self.update_orderid_map(local_orderid, sys_orderid) + + return local_orderid + + def get_sys_orderid(self, local_orderid: str): + """ + Get sys orderid with local orderid. + """ + sys_orderid = self.local_sys_orderid_map.get(local_orderid, "") + return sys_orderid + + def update_orderid_map(self, local_orderid: str, sys_orderid: str): + """ + Update orderid map. + """ + self.sys_local_orderid_map[sys_orderid] = local_orderid + self.local_sys_orderid_map[local_orderid] = sys_orderid + + self.check_cancel_request(local_orderid) + self.check_push_data(sys_orderid) + + def check_push_data(self, sys_orderid: str): + """ + Check if any order push data waiting. + """ + if sys_orderid not in self.push_data_buf: + return + + data = self.push_data_buf.pop(sys_orderid) + if self.push_data_callback: + self.push_data_callback(data) + + def add_push_data(self, sys_orderid: str, data: dict): + """ + Add push data into buf. + """ + self.push_data_buf[sys_orderid] = data + + def get_order_with_sys_orderid(self, sys_orderid: str): + """""" + local_orderid = self.sys_local_orderid_map.get(sys_orderid, None) + if not local_orderid: + return None + else: + return self.get_order_with_local_orderid(local_orderid) + + def get_order_with_local_orderid(self, local_orderid: str): + """""" + order = self.orders[local_orderid] + return copy(order) + + def on_order(self, order: OrderData): + """ + Keep an order buf before pushing it to gateway. + """ + self.orders[order.orderid] = copy(order) + self.gateway.on_order(order) + + def cancel_order(self, req: CancelRequest): + """ + """ + sys_orderid = self.get_sys_orderid(req.orderid) + if not sys_orderid: + self.cancel_request_buf[req.orderid] = req + return + + self.gateway.cancel_order(req) + + def check_cancel_request(self, local_orderid: str): + """ + """ + if local_orderid not in self.cancel_request_buf: + return + + req = self.cancel_request_buf.pop(local_orderid) + self.gateway.cancel_order(req)