From d73be6d77c52e3428680adcca2b88d457bc0dfd0 Mon Sep 17 00:00:00 2001 From: 1122455801 Date: Fri, 15 Mar 2019 10:12:55 +0800 Subject: [PATCH 01/10] Update tiger_gateway.py --- vnpy/gateway/tiger/tiger_gateway.py | 81 ++++++++++++++++------------- 1 file changed, 44 insertions(+), 37 deletions(-) diff --git a/vnpy/gateway/tiger/tiger_gateway.py b/vnpy/gateway/tiger/tiger_gateway.py index d0c1f4ab..74d34bce 100644 --- a/vnpy/gateway/tiger/tiger_gateway.py +++ b/vnpy/gateway/tiger/tiger_gateway.py @@ -116,6 +116,8 @@ class TigerGateway(BaseGateway): self.queue = Queue() self.pool = None + self.ID_TIGER2VT = {} + self.ID_VT2TIGER = {} self.ticks = {} self.trades = set() self.contracts = {} @@ -125,18 +127,14 @@ class TigerGateway(BaseGateway): """""" while self.active: try: - func, arg = self.queue.get(timeout=0.1) - print(func, arg) - if arg: - func(arg) - else: - func() + func, args = self.queue.get(timeout=0.1) + func(*args) except Empty: pass - def add_task(self, func, arg=None): + def add_task(self, func, *args): """""" - self.queue.put((func, arg)) + self.queue.put((func, [*args])) def connect(self, setting: dict): """""" @@ -157,9 +155,6 @@ class TigerGateway(BaseGateway): self.add_task(self.connect_quote) self.add_task(self.connect_trade) self.add_task(self.connect_push) - self.write_log("行情接口连接成功") - - # self.thread.start() def init_client_config(self, sandbox=True): """""" @@ -183,6 +178,7 @@ class TigerGateway(BaseGateway): self.write_log("查询合约失败") return + self.write_log("行情接口连接成功") self.write_log("合约查询成功") def connect_trade(self): @@ -256,6 +252,8 @@ class TigerGateway(BaseGateway): def on_asset_change(self, tiger_account: str, data: list): """""" data = dict(data) + if "net_liquidation" not in data: + return account = AccountData( accountid=tiger_account, @@ -274,7 +272,7 @@ class TigerGateway(BaseGateway): symbol=symbol, exchange=exchange, direction=Direction.NET, - volume=data["quantity"], + volume=int(data["quantity"]), frozen=0.0, price=data["average_cost"], pnl=data["unrealized_pnl"], @@ -283,18 +281,16 @@ class TigerGateway(BaseGateway): self.on_position(pos) def on_order_change(self, tiger_account: str, data: list): - """""" - print("委托", data) - self.local_id += 1 + """""" data = dict(data) + print("委托推送", data["origin_symbol"], data["order_id"], data["filled"], data["status"]) symbol, exchange = convert_symbol_tiger2vt(data["origin_symbol"]) status = PUSH_STATUS_TIGER2VT[data["status"]] order = OrderData( symbol=symbol, exchange=exchange, - # orderid=data["order_id"], - orderid=self.local_id, + orderid=self.ID_TIGER2VT.get(str(data["order_id"]), self.get_new_local_id()), direction=Direction.NET, price=data.get("limit_price", 0), volume=data["quantity"], @@ -313,28 +309,30 @@ class TigerGateway(BaseGateway): exchange=exchange, direction=Direction.NET, tradeid=self.tradeid, - orderid=data["order_id"], + orderid=self.ID_TIGER2VT[str(data["order_id"])], price=data["avg_fill_price"], volume=data["filled"], time=datetime.fromtimestamp(data["trade_time"] / 1000).strftime("%H:%M:%S"), gateway_name=self.gateway_name, ) self.on_trade(trade) + + def get_new_local_id(self): + self.local_id += 1 + return self.local_id def send_order(self, req: OrderRequest): """""" - self.local_id += 1 - order = req.create_order_data(self.local_id, self.gateway_name) - return order.vt_orderid + local_id = self.get_new_local_id() + order = req.create_order_data(local_id, self.gateway_name) + self.on_order(order) + self.add_task(self._send_order, req, local_id) + return order.vt_orderid - self.add_task(self._send_order, req) - - def _send_order(self, req: OrderRequest): + def _send_order(self, req: OrderRequest, local_id): """""" currency = config_symbol_currency(req.symbol) - - # first, get contract try: contract = self.trade_client.get_contracts(symbol=req.symbol, currency=currency)[0] order = self.trade_client.create_order( @@ -345,8 +343,14 @@ class TigerGateway(BaseGateway): quantity=int(req.volume), limit_price=req.price, ) + self.ID_TIGER2VT[str(order.order_id)] = local_id + self.ID_VT2TIGER[local_id] = str(order.order_id) + self.trade_client.place_order(order) + print("发单:", order.contract.symbol, order.order_id, order.quantity, order.status) + except: # noqa + traceback.print_exc() self.write_log("发单失败") return @@ -357,7 +361,8 @@ class TigerGateway(BaseGateway): def _cancel_order(self, req: CancelRequest): """""" try: - data = self.trade_client.cancel_order(order_id=req.orderid) + order_id = self.ID_VT2TIGER[req.orderid] + data = self.trade_client.cancel_order(order_id=order_id) except ApiException: self.write_log(f"撤单失败:{req.orderid}") @@ -420,8 +425,7 @@ class TigerGateway(BaseGateway): for ix, row in contract_CN.iterrows(): symbol = row["symbol"] symbol, exchange = convert_symbol_tiger2vt(symbol) - if symbol == '600001': - print(f"symbol: {symbol} t:{type(symbol)} l:{len(symbol)} ex:{exchange} n:{row['name']}") + contract = ContractData( symbol=symbol, exchange=exchange, @@ -467,7 +471,7 @@ class TigerGateway(BaseGateway): symbol=symbol, exchange=exchange, direction=Direction.NET, - volume=i.quantity, + volume=int(i.quantity), frozen=0.0, price=i.average_cost, pnl=float(i.unrealized_pnl), @@ -500,12 +504,12 @@ class TigerGateway(BaseGateway): """""" for i in data: symbol, exchange = convert_symbol_tiger2vt(str(i.contract)) - self.local_id += 1 + local_id = self.get_new_local_id() + order = OrderData( symbol=symbol, exchange=exchange, - orderid=self.local_id, - # orderid=str(i.order_id), + orderid=local_id, direction=Direction.NET, price=i.limit_price if i.limit_price else 0.0, volume=i.quantity, @@ -514,17 +518,20 @@ class TigerGateway(BaseGateway): time=datetime.fromtimestamp(i.order_time / 1000).strftime("%H:%M:%S"), gateway_name=self.gateway_name, ) - + self.ID_TIGER2VT[str(i.order_id)] = local_id self.on_order(order) + + self.ID_VT2TIGER = {v: k for k, v in self.ID_TIGER2VT.items()} + print("原始委托字典", self.ID_TIGER2VT) + print("原始反向字典", self.ID_VT2TIGER) def process_deal(self, data): """ Process trade data for both query and update. """ - for i in reversed(data): + for i in data: if i.status == ORDER_STATUS.PARTIALLY_FILLED or i.status == ORDER_STATUS.FILLED: symbol, exchange = convert_symbol_tiger2vt(str(i.contract)) - self.local_id += 1 self.tradeid += 1 trade = TradeData( @@ -532,7 +539,7 @@ class TigerGateway(BaseGateway): exchange=exchange, direction=Direction.NET, tradeid=self.tradeid, - orderid=self.local_id, + orderid=self.ID_TIGER2VT[str(i.order_id)], price=i.avg_fill_price, volume=i.filled, time=datetime.fromtimestamp(i.trade_time / 1000).strftime("%H:%M:%S"), From 51c8ed3b0330e0d4ec32b52562318e03c44cffea Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 15 Mar 2019 02:24:58 -0400 Subject: [PATCH 02/10] [Add] some docs for BaseGateway --- vnpy/trader/gateway.py | 56 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index 45f12d84..598120c3 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -33,6 +33,28 @@ class BaseGateway(ABC): """ Abstract gateway class for creating gateways connection to different trading systems. + + # How to implement a gateway: + + A gateway should satisfies: + * this class should be thread-safe: + * all methods should be thread-safe + * no mutable shared properties between objects. + * all methods should be non-blocked + * satisfies all requirements written in docstring for every method and callbacks. + * automatically reconnect if connection lost. + + methods must implements: + all @abstractmethod + + callbacks must response manually: + * on_tick + * on_trade + * on_order + * on_position + * on_account + * on_contract + """ # Fields required in setting dict for connect function. @@ -113,6 +135,21 @@ class BaseGateway(ABC): def connect(self, setting: dict): """ Start gateway connection. + + to implement this method, you must: + * connect to server if necessary + * log connected if all necessary connection is established + * do the following query and response corresponding on_xxxx and write_log + * contracts : on_contract + * account asset : on_account + * account holding: on_position + * orders of account: on_order + * trades of account: on_trade + * if any of query above is failed, write log. + + future plan: + response callback/change status instead of write_log + """ pass @@ -131,9 +168,19 @@ class BaseGateway(ABC): pass @abstractmethod - def send_order(self, req: OrderRequest): + def send_order(self, req: OrderRequest) -> str: """ - Send a new order. + Send a new order to server. + + implementation should finish the tasks blow: + * create an OrderData from req using OrderRequest.create_order_data + * send request to server + * if request is sent, OrderData.status should be set to Status.SUBMITTING + * if request is failed to sent, OrderData.status should be set to Status.REJECTED + * response on_order: + * return OrderData.vt_orderid + + :return str vt_orderid for created OrderData """ pass @@ -141,6 +188,10 @@ class BaseGateway(ABC): def cancel_order(self, req: CancelRequest): """ Cancel an existing order. + implementation should finish the tasks blow: + * send request to server + + """ pass @@ -148,6 +199,7 @@ class BaseGateway(ABC): def query_account(self): """ Query account balance. + """ pass From 49e721844ed4952e29fc9de182e54a7ca175cb9e Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 15 Mar 2019 02:26:05 -0400 Subject: [PATCH 03/10] [Mod] change typing and docs for RestClient and WebSocketClient --- vnpy/api/rest/rest_client.py | 14 +++++++------- vnpy/api/websocket/websocket_client.py | 18 ++++++++++++------ 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/vnpy/api/rest/rest_client.py b/vnpy/api/rest/rest_client.py index ff8de574..24a6b80f 100644 --- a/vnpy/api/rest/rest_client.py +++ b/vnpy/api/rest/rest_client.py @@ -6,7 +6,7 @@ from datetime import datetime from enum import Enum from multiprocessing.dummy import Pool from queue import Empty, Queue -from typing import Any, Callable +from typing import Any, Callable, Optional import requests @@ -79,7 +79,7 @@ class RestClient(object): """ HTTP Client designed for all sorts of trading RESTFul API. - * Reimplement before_request function to add signature function. + * Reimplement sign function to add signature function. * Reimplement on_failed function to handle Non-2xx responses. * Use on_failed parameter in add_request function for individual Non-2xx response handling. * Reimplement on_error function to handle exception msg. @@ -88,7 +88,7 @@ class RestClient(object): def __init__(self): """ """ - self.url_base = None # type: str + self.url_base = '' # type: str self._active = False self._queue = Queue() @@ -208,7 +208,7 @@ class RestClient(object): exception_type: type, exception_value: Exception, tb, - request: Request, + request: Optional[Request], ): """ Default on_error handler for Python exception. @@ -223,7 +223,7 @@ class RestClient(object): exception_type: type, exception_value: Exception, tb, - request: Request, + request: Optional[Request], ): text = "[{}]: Unhandled RestClient Error:{}\n".format( datetime.now().isoformat(), exception_type @@ -236,8 +236,8 @@ class RestClient(object): return text def _process_request( - self, request: Request, session: requests.session - ): # type: (Request, requests.Session)->None + self, request: Request, session: requests.Session + ): """ Sending request to server and get result. """ diff --git a/vnpy/api/websocket/websocket_client.py b/vnpy/api/websocket/websocket_client.py index 61541594..2a1820a8 100644 --- a/vnpy/api/websocket/websocket_client.py +++ b/vnpy/api/websocket/websocket_client.py @@ -7,6 +7,7 @@ import traceback from datetime import datetime from threading import Lock, Thread from time import sleep +from typing import Any import websocket @@ -23,13 +24,16 @@ class WebsocketClient(object): Default serialization format is json. - Callbacks to reimplement: + Callbacks to overrides: + * unpack_data * on_connected * on_disconnected * on_packet * on_error After start() is called, the ping thread will ping server every 60 seconds. + + If you want to send anything other than JSON, override send_packet. """ def __init__(self): @@ -92,22 +96,24 @@ class WebsocketClient(object): def send_packet(self, packet: dict): """ Send a packet (dict data) to server + + override this if you want to send non-json packet """ text = json.dumps(packet) self._record_last_sent_text(text) - return self._get_ws().send(text, opcode=websocket.ABNF.OPCODE_TEXT) + return self._send_text(text) - def send_text(self, text: str): + def _send_text(self, text: str): """ Send a text string to server. """ return self._get_ws().send(text, opcode=websocket.ABNF.OPCODE_TEXT) - def send_binary(self, data: bytes): + def _send_binary(self, data: bytes): """ Send bytes data to server. """ - return self._get_ws().send_binary(data) + return self._get_ws()._send_binary(data) def _reconnect(self): """""" @@ -189,7 +195,7 @@ class WebsocketClient(object): """ Default serialization format is json. - Reimplement this method if you want to use other serialization format. + override this method if you want to use other serialization format. """ return json.loads(data) From 39effc29b4407f000d68d07c6b745b423c7a9208 Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 15 Mar 2019 02:28:54 -0400 Subject: [PATCH 04/10] [Mod] BtimexGateway: renaeme setting.session into session_number --- vnpy/gateway/bitmex/bitmex_gateway.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/vnpy/gateway/bitmex/bitmex_gateway.py b/vnpy/gateway/bitmex/bitmex_gateway.py index 638ee2a6..437564fe 100644 --- a/vnpy/gateway/bitmex/bitmex_gateway.py +++ b/vnpy/gateway/bitmex/bitmex_gateway.py @@ -62,7 +62,7 @@ class BitmexGateway(BaseGateway): default_setting = { "key": "", "secret": "", - "session": 3, + "session_number": 3, "server": ["REAL", "TESTNET"], "proxy_host": "127.0.0.1", "proxy_port": 1080, @@ -79,12 +79,12 @@ class BitmexGateway(BaseGateway): """""" key = setting["key"] secret = setting["secret"] - session = setting["session"] + session_number = setting["session_number"] server = setting["server"] proxy_host = setting["proxy_host"] proxy_port = setting["proxy_port"] - self.rest_api.connect(key, secret, session, + self.rest_api.connect(key, secret, session_number, server, proxy_host, proxy_port) self.ws_api.connect(key, secret, server, proxy_host, proxy_port) @@ -172,7 +172,7 @@ class BitmexRestApi(RestClient): self, key: str, secret: str, - session: int, + session_number: int, server: str, proxy_host: str, proxy_port: int, @@ -192,7 +192,7 @@ class BitmexRestApi(RestClient): else: self.init(TESTNET_REST_HOST, proxy_host, proxy_port) - self.start(session) + self.start(session_number) self.gateway.write_log("REST API启动成功") From 88f661615c398f56f1c6e4f3b9a07a224e297745 Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 15 Mar 2019 02:43:05 -0400 Subject: [PATCH 05/10] [Add] BitmexGateway: make send_order thread-safe --- vnpy/gateway/bitmex/bitmex_gateway.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/vnpy/gateway/bitmex/bitmex_gateway.py b/vnpy/gateway/bitmex/bitmex_gateway.py index 437564fe..e7fe1fc8 100644 --- a/vnpy/gateway/bitmex/bitmex_gateway.py +++ b/vnpy/gateway/bitmex/bitmex_gateway.py @@ -8,6 +8,7 @@ import sys import time from copy import copy from datetime import datetime +from threading import Lock from urllib.parse import urlencode from requests import ConnectionError @@ -131,6 +132,8 @@ class BitmexRestApi(RestClient): self.secret = "" self.order_count = 1_000_000 + self.order_count_lock = Lock() + self.connect_time = 0 def sign(self, request): @@ -196,10 +199,14 @@ class BitmexRestApi(RestClient): self.gateway.write_log("REST API启动成功") + def _new_order_id(self): + with self.order_count_lock: + self.order_count += 1 + return self.order_count + def send_order(self, req: OrderRequest): """""" - self.order_count += 1 - orderid = str(self.connect_time + self.order_count) + orderid = str(self.connect_time + self._new_order_id()) data = { "symbol": req.symbol, From fed48e1a061e8df3023d083ffc26df46d78ddb75 Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 15 Mar 2019 02:43:21 -0400 Subject: [PATCH 06/10] [Add] BitmexGateway: Added some comment to make logic more clear. --- vnpy/gateway/bitmex/bitmex_gateway.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/vnpy/gateway/bitmex/bitmex_gateway.py b/vnpy/gateway/bitmex/bitmex_gateway.py index e7fe1fc8..e610e32e 100644 --- a/vnpy/gateway/bitmex/bitmex_gateway.py +++ b/vnpy/gateway/bitmex/bitmex_gateway.py @@ -89,6 +89,7 @@ class BitmexGateway(BaseGateway): server, proxy_host, proxy_port) self.ws_api.connect(key, secret, server, proxy_host, proxy_port) + # websocket will push all account status on connected, including asset, position and orders. def subscribe(self, req: SubscribeRequest): """""" @@ -279,7 +280,7 @@ class BitmexRestApi(RestClient): self.on_error(exception_type, exception_value, tb, request) def on_send_order(self, data, request): - """""" + """Websocket will push a new order status""" pass def on_cancel_order_error( @@ -293,7 +294,7 @@ class BitmexRestApi(RestClient): self.on_error(exception_type, exception_value, tb, request) def on_cancel_order(self, data, request): - """""" + """Websocket will push a new order status""" pass def on_failed(self, status_code: int, request: Request): From f6d0cda6ce0e68eea982e6860a9c147a8013a8f1 Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 15 Mar 2019 03:00:28 -0400 Subject: [PATCH 07/10] [Mod] WebSocketClient: remove useless lock. --- vnpy/api/websocket/websocket_client.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/vnpy/api/websocket/websocket_client.py b/vnpy/api/websocket/websocket_client.py index 2a1820a8..52f3eb01 100644 --- a/vnpy/api/websocket/websocket_client.py +++ b/vnpy/api/websocket/websocket_client.py @@ -107,13 +107,17 @@ class WebsocketClient(object): """ Send a text string to server. """ - return self._get_ws().send(text, opcode=websocket.ABNF.OPCODE_TEXT) + ws = self._ws + if ws: + ws.send(text, opcode=websocket.ABNF.OPCODE_TEXT) def _send_binary(self, data: bytes): """ Send bytes data to server. """ - return self._get_ws()._send_binary(data) + ws = self._ws + if ws: + ws._send_binary(data) def _reconnect(self): """""" @@ -143,11 +147,6 @@ class WebsocketClient(object): self._ws.close() self._ws = None - def _get_ws(self): - """""" - with self._ws_lock: - return self._ws - def _run(self): """ Keep running till stop is called. @@ -158,7 +157,7 @@ class WebsocketClient(object): # todo: onDisconnect while self._active: try: - ws = self._get_ws() + ws = self._ws if ws: text = ws.recv() @@ -215,7 +214,7 @@ class WebsocketClient(object): def _ping(self): """""" - ws = self._get_ws() + ws = self._ws if ws: ws.send("ping", websocket.ABNF.OPCODE_PING) From c414f561f6d37d2e23c3c3e13decac9e3841168f Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 15 Mar 2019 03:07:01 -0400 Subject: [PATCH 08/10] =?UTF-8?q?[Add]=20BaseGateway:=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E8=AF=B4=E6=98=8E=E5=9B=9E=E8=B0=83=E5=87=BD=E6=95=B0on=5Fxxxx?= =?UTF-8?q?=E7=9A=84=E5=8F=82=E6=95=B0=E5=BF=85=E9=A1=BB=E4=B8=BA=E5=B8=B8?= =?UTF-8?q?=E9=87=8F=E7=9A=84=E8=A6=81=E6=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/trader/gateway.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index 598120c3..2442f990 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -36,6 +36,8 @@ class BaseGateway(ABC): # How to implement a gateway: + --- + ## Basics A gateway should satisfies: * this class should be thread-safe: * all methods should be thread-safe @@ -44,10 +46,12 @@ class BaseGateway(ABC): * satisfies all requirements written in docstring for every method and callbacks. * automatically reconnect if connection lost. - methods must implements: + --- + ## methods must implements: all @abstractmethod - callbacks must response manually: + --- + ## callbacks must response manually: * on_tick * on_trade * on_order @@ -55,6 +59,13 @@ class BaseGateway(ABC): * on_account * on_contract + All the XxxData passed to callback should be constant, which means that + the object should not be modified after passing to on_xxxx. + So if you use a cache to store reference of data, use copy.copy to create a new object + before passing that data into on_xxxx + + + """ # Fields required in setting dict for connect function. From 0eb9630b231e310bc8b5b36be4ca18e07fbc4cb0 Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 15 Mar 2019 03:37:42 -0400 Subject: [PATCH 09/10] [Add] BaseGateway Document: send_order.orderid should be unique --- vnpy/trader/gateway.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index 2442f990..d1a6248f 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -185,6 +185,7 @@ class BaseGateway(ABC): implementation should finish the tasks blow: * create an OrderData from req using OrderRequest.create_order_data + * assign a unique(gateway instance scope) id to OrderData.orderid * send request to server * if request is sent, OrderData.status should be set to Status.SUBMITTING * if request is failed to sent, OrderData.status should be set to Status.REJECTED From b8ab027081942cfbe2112e2cc33a0d776b1e90ce Mon Sep 17 00:00:00 2001 From: nanoric Date: Fri, 15 Mar 2019 05:45:53 -0400 Subject: [PATCH 10/10] [Fix] WebsocketClient: removed unused import: make flake8 happy. --- vnpy/api/websocket/websocket_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/vnpy/api/websocket/websocket_client.py b/vnpy/api/websocket/websocket_client.py index 52f3eb01..5fde1d82 100644 --- a/vnpy/api/websocket/websocket_client.py +++ b/vnpy/api/websocket/websocket_client.py @@ -7,7 +7,6 @@ import traceback from datetime import datetime from threading import Lock, Thread from time import sleep -from typing import Any import websocket