From b0e75c4e529635c3e110b8acf976b3d3163c2230 Mon Sep 17 00:00:00 2001 From: msincenselee Date: Tue, 10 Mar 2020 15:44:50 +0800 Subject: [PATCH] =?UTF-8?q?[=E5=A2=9E=E5=8A=A0]=20=E5=B8=81=E5=AE=89?= =?UTF-8?q?=E5=90=88=E7=BA=A6=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/app/cta_strategy_pro/__init__.py | 12 +- vnpy/app/cta_strategy_pro/template.py | 1 + vnpy/component/base.py | 2 +- vnpy/gateway/binance/binance_gateway.py | 4 +- vnpy/gateway/binancef/__init__.py | 1 + vnpy/gateway/binancef/binancef_gateway.py | 848 ++++++++++++++++++++++ vnpy/gateway/ctp/ctp_gateway.py | 2 +- 7 files changed, 865 insertions(+), 5 deletions(-) create mode 100644 vnpy/gateway/binancef/__init__.py create mode 100644 vnpy/gateway/binancef/binancef_gateway.py diff --git a/vnpy/app/cta_strategy_pro/__init__.py b/vnpy/app/cta_strategy_pro/__init__.py index 08cf7428..32b57c6a 100644 --- a/vnpy/app/cta_strategy_pro/__init__.py +++ b/vnpy/app/cta_strategy_pro/__init__.py @@ -5,7 +5,17 @@ from .base import APP_NAME, StopOrder from .engine import CtaEngine -from .template import CtaTemplate, CtaSignal, TargetPosTemplate, CtaProTemplate, CtaProFutureTemplate +from .template import ( + Direction, + Offset, + Status, + TickData, + BarData, + TradeData, + OrderData, + CtaTemplate, CtaSignal, TargetPosTemplate, CtaProTemplate, CtaProFutureTemplate) # noqa +from vnpy.trader.utility import BarGenerator, ArrayManager # noqa + from .template_spread import CtaSpreadTemplate diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py index c5699a79..a40a9995 100644 --- a/vnpy/app/cta_strategy_pro/template.py +++ b/vnpy/app/cta_strategy_pro/template.py @@ -18,6 +18,7 @@ from vnpy.trader.utility import virtual, append_data, extract_vt_symbol, get_und from .base import StopOrder, EngineType from vnpy.component.cta_grid_trade import CtaGrid, CtaGridTrade, LOCK_GRID from vnpy.component.cta_position import CtaPosition +from vnpy.component.cta_policy import CtaPolicy class CtaTemplate(ABC): diff --git a/vnpy/component/base.py b/vnpy/component/base.py index 500fd31f..75381dad 100644 --- a/vnpy/component/base.py +++ b/vnpy/component/base.py @@ -5,7 +5,7 @@ import sys from abc import ABC from enum import Enum from logging import INFO, ERROR - +from vnpy.trader.constant import Direction class Area(Enum): """ Kline area """ diff --git a/vnpy/gateway/binance/binance_gateway.py b/vnpy/gateway/binance/binance_gateway.py index b60b3f7b..da1ea1f2 100644 --- a/vnpy/gateway/binance/binance_gateway.py +++ b/vnpy/gateway/binance/binance_gateway.py @@ -99,9 +99,9 @@ class BinanceGateway(BaseGateway): exchanges = [Exchange.BINANCE] - def __init__(self, event_engine): + def __init__(self, event_engine, gateway_name="BINANCE"): """Constructor""" - super().__init__(event_engine, "BINANCE") + super().__init__(event_engine, gateway_name) self.trade_ws_api = BinanceTradeWebsocketApi(self) self.market_ws_api = BinanceDataWebsocketApi(self) diff --git a/vnpy/gateway/binancef/__init__.py b/vnpy/gateway/binancef/__init__.py new file mode 100644 index 00000000..396f6074 --- /dev/null +++ b/vnpy/gateway/binancef/__init__.py @@ -0,0 +1 @@ +from .binancef_gateway import BinancefGateway diff --git a/vnpy/gateway/binancef/binancef_gateway.py b/vnpy/gateway/binancef/binancef_gateway.py new file mode 100644 index 00000000..2f11276f --- /dev/null +++ b/vnpy/gateway/binancef/binancef_gateway.py @@ -0,0 +1,848 @@ +""" +Gateway for Binance Crypto Exchange. +""" + +import urllib +import hashlib +import hmac +import time +from copy import copy +from datetime import datetime, timedelta +from enum import Enum +from threading import Lock +from typing import Dict, List + +from vnpy.api.rest import RestClient, Request +from vnpy.api.websocket import WebsocketClient +from vnpy.trader.constant import ( + Direction, + Exchange, + Product, + Status, + OrderType, + Interval +) +from vnpy.trader.gateway import BaseGateway +from vnpy.trader.object import ( + TickData, + OrderData, + TradeData, + AccountData, + ContractData, + PositionData, + BarData, + OrderRequest, + CancelRequest, + SubscribeRequest, + HistoryRequest +) +from vnpy.trader.event import EVENT_TIMER +from vnpy.event import Event, EventEngine + +REST_HOST: str = "https://fapi.binance.com" +WEBSOCKET_TRADE_HOST: str = "wss://fstream.binance.com/ws/" +WEBSOCKET_DATA_HOST: str = "wss://fstream.binance.com/stream?streams=" + +TESTNET_RESTT_HOST: str = "https://testnet.binancefuture.com" +TESTNET_WEBSOCKET_TRADE_HOST: str = "wss://stream.binancefuture.com/ws/" +TESTNET_WEBSOCKET_DATA_HOST: str = "wss://stream.binancefuture.com/stream?streams=" + +STATUS_BINANCEF2VT: Dict[str, Status] = { + "NEW": Status.NOTTRADED, + "PARTIALLY_FILLED": Status.PARTTRADED, + "FILLED": Status.ALLTRADED, + "CANCELED": Status.CANCELLED, + "REJECTED": Status.REJECTED +} + +ORDERTYPE_VT2BINANCEF: Dict[OrderType, str] = { + OrderType.LIMIT: "LIMIT", + OrderType.MARKET: "MARKET" +} +ORDERTYPE_BINANCEF2VT: Dict[str, OrderType] = {v: k for k, v in ORDERTYPE_VT2BINANCEF.items()} + +DIRECTION_VT2BINANCEF: Dict[Direction, str] = { + Direction.LONG: "BUY", + Direction.SHORT: "SELL" +} +DIRECTION_BINANCEF2VT: Dict[str, Direction] = {v: k for k, v in DIRECTION_VT2BINANCEF.items()} + +INTERVAL_VT2BINANCEF: Dict[Interval, str] = { + Interval.MINUTE: "1m", + Interval.HOUR: "1h", + Interval.DAILY: "1d", +} + +TIMEDELTA_MAP: Dict[Interval, timedelta] = { + Interval.MINUTE: timedelta(minutes=1), + Interval.HOUR: timedelta(hours=1), + Interval.DAILY: timedelta(days=1), +} + + +class Security(Enum): + NONE: int = 0 + SIGNED: int = 1 + API_KEY: int = 2 + + +symbol_name_map: Dict[str, str] = {} + + +class BinancefGateway(BaseGateway): + """ + VN Trader Gateway for Binance connection. + """ + + default_setting = { + "key": "", + "secret": "", + "session_number": 3, + "server": ["TESTNET", "REAL"], + "proxy_host": "", + "proxy_port": 0, + } + + exchanges: Exchange = [Exchange.BINANCE] + + def __init__(self, event_engine: EventEngine, gateway_name="BINANCEF"): + """Constructor""" + super().__init__(event_engine, gateway_name) + + self.trade_ws_api = BinancefTradeWebsocketApi(self) + self.market_ws_api = BinancefDataWebsocketApi(self) + self.rest_api = BinancefRestApi(self) + + def connect(self, setting: dict) -> None: + """""" + key = setting["key"] + secret = setting["secret"] + session_number = setting["session_number"] + server = setting["server"] + proxy_host = setting["proxy_host"] + proxy_port = setting["proxy_port"] + + self.rest_api.connect(key, secret, session_number, server, + proxy_host, proxy_port) + self.market_ws_api.connect(proxy_host, proxy_port, server) + + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + + def subscribe(self, req: SubscribeRequest) -> None: + """""" + self.market_ws_api.subscribe(req) + + def send_order(self, req: OrderRequest) -> str: + """""" + return self.rest_api.send_order(req) + + def cancel_order(self, req: CancelRequest) -> Request: + """""" + self.rest_api.cancel_order(req) + + def query_account(self) -> None: + """""" + pass + + def query_position(self) -> None: + """""" + pass + + def query_history(self, req: HistoryRequest) -> List[BarData]: + """""" + return self.rest_api.query_history(req) + + def close(self) -> None: + """""" + self.rest_api.stop() + self.trade_ws_api.stop() + self.market_ws_api.stop() + + def process_timer_event(self, event: Event) -> None: + """""" + self.rest_api.keep_user_stream() + + +class BinancefRestApi(RestClient): + """ + BINANCE REST API + """ + + def __init__(self, gateway: BinancefGateway): + """""" + super().__init__() + + self.gateway: BinancefGateway = gateway + self.gateway_name: str = gateway.gateway_name + + self.trade_ws_api: BinancefTradeWebsocketApi = self.gateway.trade_ws_api + + self.key: str = "" + self.secret: str = "" + + self.user_stream_key: str = "" + self.keep_alive_count: int = 0 + self.recv_window: int = 5000 + self.time_offset: int = 0 + + self.order_count: int = 1_000_000 + self.order_count_lock: Lock = Lock() + self.connect_time: int = 0 + + def sign(self, request: Request) -> Request: + """ + Generate BINANCE signature. + """ + security = request.data["security"] + if security == Security.NONE: + request.data = None + return request + + if request.params: + path = request.path + "?" + urllib.parse.urlencode(request.params) + else: + request.params = dict() + path = request.path + + if security == Security.SIGNED: + timestamp = int(time.time() * 1000) + + if self.time_offset > 0: + timestamp -= abs(self.time_offset) + elif self.time_offset < 0: + timestamp += abs(self.time_offset) + + request.params["timestamp"] = timestamp + + query = urllib.parse.urlencode(sorted(request.params.items())) + signature = hmac.new(self.secret, query.encode( + "utf-8"), hashlib.sha256).hexdigest() + + query += "&signature={}".format(signature) + path = request.path + "?" + query + + request.path = path + request.params = {} + request.data = {} + + # Add headers + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "Accept": "application/json", + "X-MBX-APIKEY": self.key + } + + if security in [Security.SIGNED, Security.API_KEY]: + request.headers = headers + + return request + + def connect( + self, + key: str, + secret: str, + session_number: int, + server: str, + proxy_host: str, + proxy_port: int + ) -> None: + """ + Initialize connection to REST server. + """ + self.key = key + self.secret = secret.encode() + self.proxy_port = proxy_port + self.proxy_host = proxy_host + self.server = server + + self.connect_time = ( + int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count + ) + + if self.server == "REAL": + self.init(REST_HOST, proxy_host, proxy_port) + else: + self.init(TESTNET_RESTT_HOST, proxy_host, proxy_port) + + self.start(session_number) + + self.gateway.write_log("REST API启动成功") + + self.query_time() + self.query_account() + self.query_position() + self.query_order() + self.query_contract() + self.start_user_stream() + + def query_time(self) -> Request: + """""" + data = { + "security": Security.NONE + } + path = "/fapi/v1/time" + + return self.add_request( + "GET", + path, + callback=self.on_query_time, + data=data + ) + + def query_account(self) -> Request: + """""" + data = {"security": Security.SIGNED} + + self.add_request( + method="GET", + path="/fapi/v1/account", + callback=self.on_query_account, + data=data + ) + + def query_position(self) -> Request: + """""" + data = {"security": Security.SIGNED} + + self.add_request( + method="GET", + path="/fapi/v1/positionRisk", + callback=self.on_query_position, + data=data + ) + + def query_order(self) -> Request: + """""" + data = {"security": Security.SIGNED} + + self.add_request( + method="GET", + path="/fapi/v1/openOrders", + callback=self.on_query_order, + data=data + ) + + def query_contract(self) -> Request: + """""" + data = { + "security": Security.NONE + } + self.add_request( + method="GET", + path="/fapi/v1/exchangeInfo", + callback=self.on_query_contract, + data=data + ) + + def _new_order_id(self) -> int: + """""" + with self.order_count_lock: + self.order_count += 1 + return self.order_count + + def send_order(self, req: OrderRequest) -> str: + """""" + orderid = str(self.connect_time + self._new_order_id()) + order = req.create_order_data( + orderid, + self.gateway_name + ) + self.gateway.on_order(order) + + data = { + "security": Security.SIGNED + } + + params = { + "symbol": req.symbol, + "timeInForce": "GTC", + "side": DIRECTION_VT2BINANCEF[req.direction], + "type": ORDERTYPE_VT2BINANCEF[req.type], + "price": float(req.price), + "quantity": int(req.volume), + "newClientOrderId": orderid, + "newOrderRespType": "ACK" + } + + self.add_request( + method="POST", + path="/fapi/v1/order", + callback=self.on_send_order, + data=data, + params=params, + extra=order, + on_error=self.on_send_order_error, + on_failed=self.on_send_order_failed + ) + + return order.vt_orderid + + def cancel_order(self, req: CancelRequest) -> Request: + """""" + data = { + "security": Security.SIGNED + } + + params = { + "symbol": req.symbol, + "origClientOrderId": req.orderid + } + + self.add_request( + method="DELETE", + path="/fapi/v1/order", + callback=self.on_cancel_order, + params=params, + data=data, + extra=req + ) + + def start_user_stream(self) -> Request: + """""" + data = { + "security": Security.API_KEY + } + + self.add_request( + method="POST", + path="/fapi/v1/listenKey", + callback=self.on_start_user_stream, + data=data + ) + + def keep_user_stream(self) -> Request: + """""" + self.keep_alive_count += 1 + if self.keep_alive_count < 1800: + return + + data = { + "security": Security.API_KEY + } + + params = { + "listenKey": self.user_stream_key + } + + self.add_request( + method="PUT", + path="/fapi/v1/listenKey", + callback=self.on_keep_user_stream, + params=params, + data=data + ) + + def on_query_time(self, data: dict, request: Request) -> None: + """""" + local_time = int(time.time() * 1000) + server_time = int(data["serverTime"]) + self.time_offset = local_time - server_time + + def on_query_account(self, data: dict, request: Request) -> None: + """""" + for asset in data["assets"]: + account = AccountData( + accountid=asset["asset"], + balance=float(asset["walletBalance"]), + frozen=float(asset["maintMargin"]), + gateway_name=self.gateway_name + ) + + if account.balance: + self.gateway.on_account(account) + + self.gateway.write_log("账户资金查询成功") + + def on_query_position(self, data: dict, request: Request) -> None: + """""" + for d in data: + position = PositionData( + symbol=d["symbol"], + exchange=Exchange.BINANCE, + direction=Direction.NET, + volume=int(float(d["positionAmt"])), + price=float(d["entryPrice"]), + pnl=float(d["unRealizedProfit"]), + gateway_name=self.gateway_name, + ) + + if position.volume: + self.gateway.on_position(position) + + self.gateway.write_log("持仓信息查询成功") + + def on_query_order(self, data: dict, request: Request) -> None: + """""" + for d in data: + dt = datetime.fromtimestamp(d["time"] / 1000) + time = dt.strftime("%Y-%m-%d %H:%M:%S") + + order = OrderData( + orderid=d["clientOrderId"], + symbol=d["symbol"], + exchange=Exchange.BINANCE, + price=float(d["price"]), + volume=float(d["origQty"]), + type=ORDERTYPE_BINANCEF2VT[d["type"]], + direction=DIRECTION_BINANCEF2VT[d["side"]], + traded=float(d["executedQty"]), + status=STATUS_BINANCEF2VT.get(d["status"], None), + time=time, + gateway_name=self.gateway_name, + ) + self.gateway.on_order(order) + + self.gateway.write_log("委托信息查询成功") + + def on_query_contract(self, data: dict, request: Request) -> None: + """处理合约配置""" + import json + rate_limits = data.get('rateLimits') + rate_limits = json.dumps(rate_limits, indent=2) + self.gateway.write_log(f'速率限制:{rate_limits}') + + for d in data["symbols"]: + base_currency = d["baseAsset"] + quote_currency = d["quoteAsset"] + name = f"{base_currency.upper()}/{quote_currency.upper()}" + + pricetick = 1 + min_volume = 1 + + for f in d["filters"]: + if f["filterType"] == "PRICE_FILTER": + pricetick = float(f["tickSize"]) + elif f["filterType"] == "LOT_SIZE": + min_volume = float(f["stepSize"]) + + contract = ContractData( + symbol=d["symbol"], + exchange=Exchange.BINANCE, + name=name, + pricetick=pricetick, + size=1, + min_volume=min_volume, + product=Product.FUTURES, + history_data=True, + gateway_name=self.gateway_name, + ) + self.gateway.on_contract(contract) + + symbol_name_map[contract.symbol] = contract.name + + self.gateway.write_log("合约信息查询成功") + + def on_send_order(self, data: dict, request: Request) -> None: + """""" + pass + + def on_send_order_failed(self, status_code: str, request: Request) -> None: + """ + Callback when sending order failed on server. + """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + msg = f"委托失败,状态码:{status_code},信息:{request.response.text}" + self.gateway.write_log(msg) + + def on_send_order_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ) -> None: + """ + Callback when sending order caused exception. + """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) + + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) + + def on_cancel_order(self, data: dict, request: Request) -> None: + """""" + pass + + def on_start_user_stream(self, data: dict, request: Request) -> None: + """""" + self.user_stream_key = data["listenKey"] + self.keep_alive_count = 0 + + if self.server == "REAL": + url = WEBSOCKET_TRADE_HOST + self.user_stream_key + else: + url = TESTNET_WEBSOCKET_TRADE_HOST + self.user_stream_key + + self.trade_ws_api.connect(url, self.proxy_host, self.proxy_port) + + def on_keep_user_stream(self, data: dict, request: Request) -> None: + """""" + pass + + def query_history(self, req: HistoryRequest) -> List[OrderData]: + """""" + history = [] + limit = 1000 + start_time = int(datetime.timestamp(req.start)) + + while True: + # Create query params + params = { + "symbol": req.symbol, + "interval": INTERVAL_VT2BINANCEF[req.interval], + "limit": limit, + "startTime": start_time * 1000, # convert to millisecond + } + + # Add end time if specified + if req.end: + end_time = int(datetime.timestamp(req.end)) + params["endTime"] = end_time * 1000 # convert to millisecond + + # Get response from server + resp = self.request( + "GET", + "/api/v1/klines", + data={"security": Security.NONE}, + params=params + ) + + # Break if request failed with other status code + if resp.status_code // 100 != 2: + msg = f"获取历史数据失败,状态码:{resp.status_code},信息:{resp.text}" + self.gateway.write_log(msg) + break + else: + data = resp.json() + if not data: + msg = f"获取历史数据为空,开始时间:{start_time}" + self.gateway.write_log(msg) + break + + buf = [] + + for l in data: + dt = datetime.fromtimestamp(l[0] / 1000) # convert to second + + bar = BarData( + symbol=req.symbol, + exchange=req.exchange, + datetime=dt, + interval=req.interval, + volume=float(l[5]), + open_price=float(l[1]), + high_price=float(l[2]), + low_price=float(l[3]), + close_price=float(l[4]), + gateway_name=self.gateway_name + ) + buf.append(bar) + + history.extend(buf) + + begin = buf[0].datetime + end = buf[-1].datetime + msg = f"获取历史数据成功,{req.symbol} - {req.interval.value},{begin} - {end}" + self.gateway.write_log(msg) + + # Break if total data count less than limit (latest date collected) + if len(data) < limit: + break + + # Update start time + start_dt = bar.datetime + TIMEDELTA_MAP[req.interval] + start_time = int(datetime.timestamp(start_dt)) + + return history + + +class BinancefTradeWebsocketApi(WebsocketClient): + """""" + + def __init__(self, gateway: BinancefGateway): + """""" + super().__init__() + + self.gateway: BinancefGateway = gateway + self.gateway_name: str = gateway.gateway_name + + def connect(self, url: str, proxy_host: str, proxy_port: int) -> None: + """""" + self.init(url, proxy_host, proxy_port) + self.start() + + def on_connected(self) -> None: + """""" + self.gateway.write_log("交易Websocket API连接成功") + + def on_packet(self, packet: dict) -> None: # type: (dict)->None + """""" + if packet["e"] == "ACCOUNT_UPDATE": + self.on_account(packet) + elif packet["e"] == "ORDER_TRADE_UPDATE": + self.on_order(packet) + + def on_account(self, packet: dict) -> None: + """""" + for acc_data in packet["a"]["B"]: + account = AccountData( + accountid=acc_data["a"], + balance=float(acc_data["wb"]), + frozen=float(acc_data["wb"]) - float(acc_data["cw"]), + gateway_name=self.gateway_name + ) + + if account.balance: + self.gateway.on_account(account) + + for pos_data in packet["a"]["P"]: + position = PositionData( + symbol=pos_data["s"], + exchange=Exchange.BINANCE, + direction=Direction.NET, + volume=int(float(pos_data["pa"])), + price=float(pos_data["ep"]), + pnl=float(pos_data["cr"]), + gateway_name=self.gateway_name, + ) + self.gateway.on_position(position) + + def on_order(self, packet: dict) -> None: + """""" + dt = datetime.fromtimestamp(packet["E"] / 1000) + time = dt.strftime("%Y-%m-%d %H:%M:%S") + + ord_data = packet["o"] + + order = OrderData( + symbol=ord_data["s"], + exchange=Exchange.BINANCE, + orderid=str(ord_data["c"]), + type=ORDERTYPE_BINANCEF2VT[ord_data["o"]], + direction=DIRECTION_BINANCEF2VT[ord_data["S"]], + price=float(ord_data["p"]), + volume=float(ord_data["q"]), + traded=float(ord_data["z"]), + status=STATUS_BINANCEF2VT[ord_data["X"]], + time=time, + gateway_name=self.gateway_name + ) + + self.gateway.on_order(order) + + # Push trade event + trade_volume = float(ord_data["l"]) + if not trade_volume: + return + + trade_dt = datetime.fromtimestamp(ord_data["T"] / 1000) + trade_time = trade_dt.strftime("%Y-%m-%d %H:%M:%S") + + trade = TradeData( + symbol=order.symbol, + exchange=order.exchange, + orderid=order.orderid, + tradeid=ord_data["t"], + direction=order.direction, + price=float(ord_data["L"]), + volume=trade_volume, + time=trade_time, + gateway_name=self.gateway_name, + ) + self.gateway.on_trade(trade) + + +class BinancefDataWebsocketApi(WebsocketClient): + """""" + + def __init__(self, gateway: BinancefGateway): + """""" + super().__init__() + + self.gateway: BinancefGateway = gateway + self.gateway_name: str = gateway.gateway_name + + self.ticks: Dict[str, TickData] = {} + + def connect( + self, + proxy_host: str, + proxy_port: int, + server: str + ) -> None: + """""" + self.proxy_host = proxy_host + self.proxy_port = proxy_port + self.server = server + + def on_connected(self) -> None: + """""" + self.gateway.write_log("行情Websocket API连接刷新") + + def subscribe(self, req: SubscribeRequest) -> None: + """""" + if req.symbol not in symbol_name_map: + self.gateway.write_log(f"找不到该合约代码{req.symbol}") + return + + # Create tick buf data + tick = TickData( + symbol=req.symbol, + name=symbol_name_map.get(req.symbol, ""), + exchange=Exchange.BINANCE, + datetime=datetime.now(), + gateway_name=self.gateway_name, + ) + self.ticks[req.symbol.lower()] = tick + + # Close previous connection + if self._active: + self.stop() + self.join() + + # Create new connection + channels = [] + for ws_symbol in self.ticks.keys(): + channels.append(ws_symbol + "@ticker") + channels.append(ws_symbol + "@depth5") + + if self.server == "REAL": + url = WEBSOCKET_DATA_HOST + "/".join(channels) + else: + url = TESTNET_WEBSOCKET_DATA_HOST + "/".join(channels) + + self.init(url, self.proxy_host, self.proxy_port) + self.start() + + def on_packet(self, packet: dict) -> None: + """""" + stream = packet["stream"] + data = packet["data"] + + symbol, channel = stream.split("@") + tick = self.ticks[symbol] + + if channel == "ticker": + tick.volume = float(data['v']) + tick.open_price = float(data['o']) + tick.high_price = float(data['h']) + tick.low_price = float(data['l']) + tick.last_price = float(data['c']) + tick.datetime = datetime.fromtimestamp(float(data['E']) / 1000) + else: + bids = data["b"] + for n in range(5): + price, volume = bids[n] + tick.__setattr__("bid_price_" + str(n + 1), float(price)) + tick.__setattr__("bid_volume_" + str(n + 1), float(volume)) + + asks = data["a"] + for n in range(5): + price, volume = asks[n] + tick.__setattr__("ask_price_" + str(n + 1), float(price)) + tick.__setattr__("ask_volume_" + str(n + 1), float(volume)) + + if tick.last_price: + self.gateway.on_tick(copy(tick)) diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index 46aef7eb..05efa88a 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -84,7 +84,7 @@ from vnpy.data.tdx.tdx_common import ( get_cache_json, save_cache_json, TDX_FUTURE_CONFIG) -from vnpy.app.cta_strategy_pro.base import ( +from vnpy.component.base import ( MARKET_DAY_ONLY, NIGHT_MARKET_23, NIGHT_MARKET_SQ2 )