From a25051087d102031643e944df765bdda1c660fb7 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 25 Jun 2019 16:25:34 +0800 Subject: [PATCH] [Mod] complete test of AlpacaGateway --- examples/vn_trader/run.py | 82 +-- vnpy/api/rest/rest_client.py | 6 +- vnpy/gateway/alpaca/alpaca_gateway.py | 747 +++++++++++++++----------- 3 files changed, 474 insertions(+), 361 deletions(-) diff --git a/examples/vn_trader/run.py b/examples/vn_trader/run.py index c7cd65c5..a364d98d 100644 --- a/examples/vn_trader/run.py +++ b/examples/vn_trader/run.py @@ -3,30 +3,31 @@ from vnpy.event import EventEngine from vnpy.trader.engine import MainEngine from vnpy.trader.ui import MainWindow, create_qapp -from vnpy.gateway.binance import BinanceGateway -from vnpy.gateway.bitmex import BitmexGateway -from vnpy.gateway.futu import FutuGateway -from vnpy.gateway.ib import IbGateway -from vnpy.gateway.ctp import CtpGateway +# from vnpy.gateway.binance import BinanceGateway +# from vnpy.gateway.bitmex import BitmexGateway +# from vnpy.gateway.futu import FutuGateway +# from vnpy.gateway.ib import IbGateway +# from vnpy.gateway.ctp import CtpGateway # from vnpy.gateway.ctptest import CtptestGateway -from vnpy.gateway.femas import FemasGateway -from vnpy.gateway.tiger import TigerGateway +# from vnpy.gateway.femas import FemasGateway +# from vnpy.gateway.tiger import TigerGateway # from vnpy.gateway.oes import OesGateway -from vnpy.gateway.okex import OkexGateway -from vnpy.gateway.huobi import HuobiGateway -from vnpy.gateway.bitfinex import BitfinexGateway -from vnpy.gateway.onetoken import OnetokenGateway -from vnpy.gateway.okexf import OkexfGateway +# from vnpy.gateway.okex import OkexGateway +# from vnpy.gateway.huobi import HuobiGateway +# from vnpy.gateway.bitfinex import BitfinexGateway +# from vnpy.gateway.onetoken import OnetokenGateway +# from vnpy.gateway.okexf import OkexfGateway # from vnpy.gateway.xtp import XtpGateway -from vnpy.gateway.hbdm import HbdmGateway -from vnpy.gateway.tap import TapGateway +# from vnpy.gateway.hbdm import HbdmGateway +# from vnpy.gateway.tap import TapGateway +from vnpy.gateway.alpaca import AlpacaGateway -from vnpy.app.cta_strategy import CtaStrategyApp -from vnpy.app.csv_loader import CsvLoaderApp -from vnpy.app.algo_trading import AlgoTradingApp -from vnpy.app.cta_backtester import CtaBacktesterApp -from vnpy.app.data_recorder import DataRecorderApp -from vnpy.app.risk_manager import RiskManagerApp +# from vnpy.app.cta_strategy import CtaStrategyApp +# from vnpy.app.csv_loader import CsvLoaderApp +# from vnpy.app.algo_trading import AlgoTradingApp +# from vnpy.app.cta_backtester import CtaBacktesterApp +# from vnpy.app.data_recorder import DataRecorderApp +# from vnpy.app.risk_manager import RiskManagerApp def main(): @@ -37,30 +38,31 @@ def main(): main_engine = MainEngine(event_engine) - main_engine.add_gateway(BinanceGateway) - main_engine.add_gateway(CtpGateway) + # main_engine.add_gateway(BinanceGateway) + # main_engine.add_gateway(CtpGateway) # main_engine.add_gateway(CtptestGateway) - main_engine.add_gateway(FemasGateway) - main_engine.add_gateway(IbGateway) - main_engine.add_gateway(FutuGateway) - main_engine.add_gateway(BitmexGateway) - main_engine.add_gateway(TigerGateway) + # main_engine.add_gateway(FemasGateway) + # main_engine.add_gateway(IbGateway) + # main_engine.add_gateway(FutuGateway) + # main_engine.add_gateway(BitmexGateway) + # main_engine.add_gateway(TigerGateway) # main_engine.add_gateway(OesGateway) - main_engine.add_gateway(OkexGateway) - main_engine.add_gateway(HuobiGateway) - main_engine.add_gateway(BitfinexGateway) - main_engine.add_gateway(OnetokenGateway) - main_engine.add_gateway(OkexfGateway) - main_engine.add_gateway(HbdmGateway) + # main_engine.add_gateway(OkexGateway) + # main_engine.add_gateway(HuobiGateway) + # main_engine.add_gateway(BitfinexGateway) + # main_engine.add_gateway(OnetokenGateway) + # main_engine.add_gateway(OkexfGateway) + # main_engine.add_gateway(HbdmGateway) # main_engine.add_gateway(XtpGateway) - main_engine.add_gateway(TapGateway) + # main_engine.add_gateway(TapGateway) + main_engine.add_gateway(AlpacaGateway) - main_engine.add_app(CtaStrategyApp) - main_engine.add_app(CtaBacktesterApp) - main_engine.add_app(CsvLoaderApp) - main_engine.add_app(AlgoTradingApp) - main_engine.add_app(DataRecorderApp) - main_engine.add_app(RiskManagerApp) + # main_engine.add_app(CtaStrategyApp) + # main_engine.add_app(CtaBacktesterApp) + # main_engine.add_app(CsvLoaderApp) + # main_engine.add_app(AlgoTradingApp) + # main_engine.add_app(DataRecorderApp) + # main_engine.add_app(RiskManagerApp) main_window = MainWindow(main_engine, event_engine) main_window.showMaximized() diff --git a/vnpy/api/rest/rest_client.py b/vnpy/api/rest/rest_client.py index 59ff4a42..4c3e1e50 100644 --- a/vnpy/api/rest/rest_client.py +++ b/vnpy/api/rest/rest_client.py @@ -258,7 +258,11 @@ class RestClient(object): request.response = response status_code = response.status_code if status_code // 100 == 2: # 2xx codes are all successful - json_body = response.json() + if status_code == 204: + json_body = None + else: + json_body = response.json() + request.callback(json_body, request) request.status = RequestStatus.success else: diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index b111ea27..036708c3 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -1,9 +1,10 @@ # encoding: UTF-8 """ - Author: vigarbuaa +Author: vigarbuaa """ import sys +import json from threading import Lock from datetime import datetime from vnpy.api.rest import Request, RestClient @@ -23,7 +24,7 @@ from vnpy.trader.object import ( TickData, OrderData, TradeData, - PositionData, + PositionData, AccountData, ContractData, OrderRequest, @@ -31,32 +32,42 @@ from vnpy.trader.object import ( SubscribeRequest, ) -REST_HOST = "https://api.alpaca.markets" -WEBSOCKET_HOST = "wss://api.alpaca.markets/stream" # Market Data -PAPER_REST_HOST = "https://paper-api.alpaca.markets" -PAPER_WEBSOCKET_HOST = "wss://paper-api.alpaca.markets/stream" # Market Data -KEY = "" -SECRET = "" + +REST_HOST = "https://api.alpaca.markets" # Live trading +WEBSOCKET_HOST = "wss://api.alpaca.markets/stream" +PAPER_REST_HOST = "https://paper-api.alpaca.markets" # Paper Trading +PAPER_WEBSOCKET_HOST = "wss://paper-api.alpaca.markets/stream" + +DATA_REST_HOST = "https://data.alpaca.markets" + STATUS_ALPACA2VT = { - "new": Status.SUBMITTING, - "partial_fill": Status.PARTTRADED, - "fill": Status.ALLTRADED, - "cancelled": Status.CANCELLED, - # "done_for_day": Status.CANCELLED, - "expired": Status.NOTTRADED + "new": Status.NOTTRADED, + "partially_filled": Status.PARTTRADED, + "filled": Status.ALLTRADED, + "canceled": Status.CANCELLED, + "expired": Status.CANCELLED, + "rejected": Status.REJECTED } -DIRECTION_VT2ALPACA = {Direction.LONG: "buy", Direction.SHORT: "sell"} -DIRECTION_ALPACA2VT = {"buy": Direction.LONG, "sell": Direction.SHORT, - "long": Direction.LONG, "short": Direction.SHORT} +DIRECTION_VT2ALPACA = { + Direction.LONG: "buy", + Direction.SHORT: "sell" +} +DIRECTION_ALPACA2VT = { + "buy": Direction.LONG, + "sell": Direction.SHORT, + "long": Direction.LONG, + "short": Direction.SHORT +} ORDERTYPE_VT2ALPACA = { OrderType.LIMIT: "limit", OrderType.MARKET: "market" } ORDERTYPE_ALPACA2VT = {v: k for k, v in ORDERTYPE_VT2ALPACA.items()} -GLOBAL_ORDER = {} + +LOCAL_SYS_MAP = {} class AlpacaGateway(BaseGateway): @@ -65,40 +76,41 @@ class AlpacaGateway(BaseGateway): """ default_setting = { - "key": "", - "secret": "", - "session": 3, - "server": ["REAL", "PAPER"], - "proxy_host": "127.0.0.1", - "proxy_port": 1080, + "KEY ID": "", + "Secret Key": "", + "会话数": 10, + "服务器": ["REAL", "PAPER"] } + exchanges = [Exchange.SMART] + def __init__(self, event_engine): """Constructor""" - super(AlpacaGateway, self).__init__(event_engine, "ALPACA") + super().__init__(event_engine, "ALPACA") self.rest_api = AlpacaRestApi(self) self.ws_api = AlpacaWebsocketApi(self) - self.order_map = {} + self.data_rest_api = AlpacaDataRestApi(self) def connect(self, setting: dict): """""" - key = setting["key"] - secret = setting["secret"] - session = setting["session"] - proxy_host = setting["proxy_host"] - proxy_port = setting["proxy_port"] - env = setting['server'] - rest_url = REST_HOST if env == "REAL" else PAPER_REST_HOST - websocket_url = WEBSOCKET_HOST if env == "REAL" else PAPER_WEBSOCKET_HOST - self.rest_api.connect(key, secret, session, - proxy_host, proxy_port, rest_url) - self.ws_api.connect(key, secret, proxy_host, proxy_port, websocket_url) + key = setting["KEY ID"] + secret = setting["Secret Key"] + session = setting["会话数"] + server = setting["服务器"] + + rest_url = REST_HOST if server == "REAL" else PAPER_REST_HOST + websocket_url = WEBSOCKET_HOST if server == "REAL" else PAPER_WEBSOCKET_HOST + + self.rest_api.connect(key, secret, session, rest_url) + self.data_rest_api.connect(key, secret, session) + self.ws_api.connect(key, secret, websocket_url) + self.init_query() def subscribe(self, req: SubscribeRequest): """""" - self.ws_api.subscribe(req) + self.data_rest_api.subscribe(req) def send_order(self, req: OrderRequest): """""" @@ -119,6 +131,7 @@ class AlpacaGateway(BaseGateway): def close(self): """""" self.rest_api.stop() + self.data_rest_api.stop() self.ws_api.stop() def init_query(self): @@ -128,6 +141,8 @@ class AlpacaGateway(BaseGateway): def process_timer_event(self, event: Event): """""" + self.data_rest_api.query_bar() + self.count += 1 if self.count < 5: return @@ -142,9 +157,9 @@ class AlpacaRestApi(RestClient): Alpaca REST API """ - def __init__(self, gateway: BaseGateway): + def __init__(self, gateway: AlpacaGateway): """""" - super(AlpacaRestApi, self).__init__() + super().__init__() self.gateway = gateway self.gateway_name = gateway.gateway_name @@ -154,46 +169,10 @@ class AlpacaRestApi(RestClient): self.order_count = 1_000_000 self.order_count_lock = Lock() + self.connect_time = 0 - self.order_dict = {} - def query_account(self): - path = f"/v1/account" - self.add_request( - method="GET", - path=path, - callback=self.on_query_account - ) - - def on_query_account(self, data, request): - account = AccountData( - accountid=data['id'], - balance=float(data['cash']), - frozen=float(data['cash']) - float(data['buying_power']), - gateway_name=self.gateway_name - ) - self.gateway.on_account(account) - - def query_position(self): - path = f"/v1/positions" - self.add_request( - method="GET", - path=path, - callback=self.on_query_position - ) - - def on_query_position(self, data, request): - for d in data: - position = PositionData( - symbol=d['symbol'], - exchange=Exchange.ALPACA, - direction=DIRECTION_ALPACA2VT[d['side']], - volume=d['qty'], - price=round(d['avg_entry_price'], 3), - pnl=d['unrealized_pl'], - gateway_name=self.gateway_name, - ) - self.gateway.on_position(position) + self.cancel_reqs = {} def sign(self, request): """ @@ -202,60 +181,226 @@ class AlpacaRestApi(RestClient): headers = { "APCA-API-KEY-ID": self.key, "APCA-API-SECRET-KEY": self.secret, - 'Content-Type': 'application/json' + "Content-Type": "application/json" } request.headers = headers request.allow_redirects = False + request.data = json.dumps(request.data) return request - def _new_order_id(self): - with self.order_count_lock: - self.order_count += 1 - return self.order_count - def connect( self, key: str, secret: str, session_num: int, - proxy_host: str, - proxy_port: int, url: str, ): """ - Initialize connection to REST server. + Initialize connection to REST server. """ self.key = key self.secret = secret - self.init(url, proxy_host, proxy_port) - self.start(session_num) + self.connect_time = ( int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count ) - self.gateway.write_log("ALPACA REST API启动成功") + + self.init(url) + self.start(session_num) + + self.gateway.write_log("REST API启动成功") + self.query_contract() self.query_account() self.query_position() - # self.query_contracts() + self.query_order() + + def query_contract(self): + """""" + params = {"status": "active"} + + self.add_request( + "GET", + "/v2/assets", + params=params, + callback=self.on_query_contract + ) + + def query_account(self): + """""" + self.add_request( + method="GET", + path="/v2/account", + callback=self.on_query_account + ) + + def query_position(self): + """""" + self.add_request( + method="GET", + path="/v2/positions", + callback=self.on_query_position + ) + + def query_order(self): + """""" + params = { + "status": "open" + } + + self.add_request( + method="GET", + path="/v2/orders", + params=params, + callback=self.on_query_order + ) + + def _new_order_id(self): + """""" + with self.order_count_lock: + self.order_count += 1 + return self.order_count + + def send_order(self, req: OrderRequest): + """""" + local_orderid = str(self.connect_time + self._new_order_id()) + + data = { + "symbol": req.symbol, + "qty": str(req.volume), + "side": DIRECTION_VT2ALPACA[req.direction], + "type": ORDERTYPE_VT2ALPACA[req.type], + "time_in_force": "day", + "client_order_id": local_orderid + } + + if data["type"] == "limit": + data["limit_price"] = str(req.price) + + order = req.create_order_data(local_orderid, self.gateway_name) + self.gateway.on_order(order) + + self.add_request( + "POST", + "/v2/orders", + callback=self.on_send_order, + data=data, + extra=order, + on_failed=self.on_send_order_failed, + on_error=self.on_send_order_error, + ) + + return order.vt_orderid + + def cancel_order(self, req: CancelRequest): + """""" + sys_orderid = LOCAL_SYS_MAP.get(req.orderid, None) + if not sys_orderid: + self.cancel_reqs[req.orderid] = req + return + + path = f"/v2/orders/{sys_orderid}" + + self.add_request( + "DELETE", + path, + callback=self.on_cancel_order, + extra=req + ) + + def on_query_contract(self, data, request: Request): + """""" + for d in data: + symbol = d["symbol"] + + contract = ContractData( + symbol=symbol, + exchange=Exchange.SMART, + name=symbol, + product=Product.SPOT, + size=1, + pricetick=0.01, + gateway_name=self.gateway_name + ) + self.gateway.on_contract(contract) + + self.gateway.write_log("合约信息查询成功") + + def on_query_account(self, data, request): + """""" + account = AccountData( + accountid=data["id"], + balance=float(data["equity"]), + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) + + def on_query_position(self, data, request): + """""" + for d in data: + position = PositionData( + symbol=d["symbol"], + exchange=Exchange.SMART, + direction=DIRECTION_ALPACA2VT[d["side"]], + volume=int(d["qty"]), + price=float(d["avg_entry_price"]), + pnl=float(d["unrealized_pl"]), + gateway_name=self.gateway_name, + ) + self.gateway.on_position(position) + + def update_order(self, d: dict): + """""" + sys_orderid = d["id"] + local_orderid = d["client_order_id"] + LOCAL_SYS_MAP[local_orderid] = sys_orderid + + direction = DIRECTION_ALPACA2VT[d["side"]] + order_type = ORDERTYPE_ALPACA2VT[d["type"]] + + order = OrderData( + orderid=local_orderid, + symbol=d["symbol"], + exchange=Exchange.SMART, + price=float(d["limit_price"]), + volume=float(d["qty"]), + type=order_type, + direction=direction, + traded=float(d["filled_qty"]), + status=STATUS_ALPACA2VT.get(d["status"], Status.SUBMITTING), + time=d["created_at"], + gateway_name=self.gateway_name, + ) + self.gateway.on_order(order) + + def on_query_order(self, data, request): + """""" + for d in data: + self.update_order(d) + + self.gateway.write_log("委托信息查询成功") def on_send_order(self, data, request: Request): - remote_order_id = data['id'] - order = request.extra - self.order_dict[order.orderid] = remote_order_id - self.gateway.on_order(order) - GLOBAL_ORDER[remote_order_id] = order + """""" + self.update_order(data) - def on_failed_order(self, status_code: int, request: Request): + order = request.extra + if order.orderid in self.cancel_reqs: + req = self.cancel_reqs.pop(order.orderid) + self.cancel_order(req) + + def on_send_order_failed(self, status_code: int, request: Request): """ Callback to handle request failed. """ order = request.extra order.status = Status.REJECTED self.gateway.on_order(order) + msg = f"请求失败,状态码:{status_code},信息:{request.response.text}" self.gateway.write_log(msg) - def on_error_order( + def on_send_order_error( self, exception_type: type, exception_value: Exception, tb, request: Request ): """ @@ -264,167 +409,73 @@ class AlpacaRestApi(RestClient): order = request.extra order.status = Status.REJECTED self.gateway.on_order(order) + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" - print('debug on_error', msg) self.gateway.write_log(msg) sys.stderr.write( self.exception_detail(exception_type, exception_value, tb, request) ) - def cancel_order(self, req: CancelRequest): - """""" - order_id = req.orderid - remote_order_id = self.order_dict[order_id] - if remote_order_id is None: - print("[error]: can not get remote_order_id from local dict!") - return - path = "/v1/orders/" + str(remote_order_id) - self.add_request( - "DELETE", - path, - callback=self.on_cancel_order, - on_error=self.on_cancel_order_error, - extra=req - ) - print("come to cancel_order", order_id) - def on_cancel_order(self, data, request): - """Websocket will push a new order status""" - pass - - def on_cancel_order_error(self, exception_type: type, exception_value: Exception, tb, request: Request): - # Record exception if not ConnectionError - if not issubclass(exception_type, ConnectionError): - self.on_error(exception_type, exception_value, tb, request) - - def send_order(self, req: OrderRequest): - orderid = str(self.connect_time + self._new_order_id()) - raw_dict = { - "symbol": req.symbol, - "qty": int(req.volume), - "side": DIRECTION_VT2ALPACA[req.direction], - "type": ORDERTYPE_VT2ALPACA[req.type], - "time_in_force": 'day', - } - if raw_dict['type'] == "limit": - raw_dict['limit_price'] = float(req.price) - - data = raw_dict - order = req.create_order_data(orderid, self.gateway_name) - print("debug send_order orderBody extra: ", order) - self.add_request( - "POST", - "/v1/orders", - callback=self.on_send_order, - data=data, - extra=order, - on_failed=self.on_failed_order, - on_error=self.on_error_order, - # json_str=data, - ) - print("debug send_order ret val : ", order.vt_orderid) - return order.vt_orderid - - def on_query_contracts(self, data, request: Request): - for instrument_data in data: - symbol = instrument_data['symbol'] - contract = ContractData( - symbol=symbol, - exchange=Exchange.ALPACA, - name=symbol, - product=Product.SPOT, - size=1, - pricetick=0.01, - gateway_name=self.gateway_name - ) - self.on_contract(contract) - - def on_failed_query_contracts(self, status_code: int, request: Request): - pass - - def on_error_query_contracts(self, exception_type: type, exception_value: Exception, tb, request: Request): - pass - - def query_contracts(self): - params = {"status": "active"} - self.add_request( - "GET", - "/v1/assets", - params=params, - callback=self.on_query_contracts, - on_failed=self.on_failed_query_contracts, - on_error=self.on_error_query_contracts, - ) + """""" + req = request.extra + msg = f"撤单成功,委托号:{req.orderid}" + self.gateway.write_log(msg) class AlpacaWebsocketApi(WebsocketClient): """""" - def __init__(self, gateway): + def __init__(self, gateway: AlpacaGateway): """""" - super(AlpacaWebsocketApi, self).__init__() + super().__init__() self.gateway = gateway self.gateway_name = gateway.gateway_name - self.order_id = 1_000_000 - # self.date = int(datetime.now().strftime('%y%m%d%H%M%S')) * self.orderId + + self.trade_count = 0 + self.key = "" self.secret = "" - self.callbacks = { - "trade": self.on_tick, - "orderBook10": self.on_depth, - "execution": self.on_trade, - "order": self.on_order, - "position": self.on_position, - "margin": self.on_account, - "instrument": self.on_contract, - } - - self.ticks = {} - self.accounts = {} - self.orders = {} - self.trades = set() - self.tickDict = {} - self.bidDict = {} - self.askDict = {} - self.orderLocalDict = {} - self.channelDict = {} # ChannelID : (Channel, Symbol) - self.channels = ["account_updates", "trade_updates"] - def connect( - self, key: str, secret: str, proxy_host: str, proxy_port: int, url: str + self, key: str, secret: str, url: str ): """""" self.key = key self.secret = secret - self.init(url, proxy_host, proxy_port) + + self.init(url) self.start() def authenticate(self): """""" - params = {"action": "authenticate", "data": { - "key_id": self.key, "secret_key": self.secret - }} + params = { + "action": "authenticate", + "data": { + "key_id": self.key, + "secret_key": self.secret + } + } self.send_packet(params) - def on_authenticate(self): + def on_authenticate(self, data): """""" - params = {"action": "listen", "data": { - "streams": self.channels - }} + if data["status"] == "authorized": + self.gateway.write_log("Websocket API登录成功") + else: + self.gateway.write_log("Websocket API登录失败") + return + + params = { + "action": "listen", + "data": { + "streams": ["trade_updates", "account_updates"] + } + } self.send_packet(params) - def subscribe(self, req: SubscribeRequest): - self.channels.append("A." + req.symbol) - self.channels.append("Q." + req.symbol) - print("debug subscribe: {} ".format(self.channels)) - params = {"action": "listen", "data": { - "streams": self.channels - }} - self.send_packet(params) - def on_connected(self): """""" self.gateway.write_log("Websocket API连接成功") @@ -436,109 +487,165 @@ class AlpacaWebsocketApi(WebsocketClient): def on_packet(self, packet: dict): """""" - print("debug on_packet: ", packet) - if "stream" in packet and "data" in packet: - stream_ret = packet['stream'] - data_ret = packet['data'] - if(stream_ret == "authorization"): - self.handle_auth(packet) - elif(stream_ret == "listening"): - self.gateway.write_log("listening {}".format(data_ret)) - else: - self.on_data(packet) - else: - print("unrecognize msg", packet) - - def on_data(self, data): - print("on_data is {}".format(data)) - stream_ret = data['stream'] - data_ret = data['data'] - if(stream_ret == "account_updates"): - frozen = float(data_ret['cash']) - float(data_ret['cash_withdrawable']) + stream = packet["stream"] + data = packet["data"] - account = AccountData( - accountid=data_ret['id'], - balance=float(data_ret['cash']), - frozen=frozen, - gateway_name=self.gateway_name - ) - self.gateway.on_account(account) - elif(stream_ret == "trade_updates"): - d = data_ret['order'] - order_id = d['id'] - order = GLOBAL_ORDER[order_id] - local_order_id = order.orderid - order.status = STATUS_ALPACA2VT[data_ret['event']] - self.gateway.on_order(order) - if (data_ret['event'] == "fill" or data_ret['event'] == "partial_fill"): - trade = TradeData( - symbol=d["symbol"], - exchange=Exchange.ALPACA, - orderid=local_order_id, - tradeid=d['id'], - direction=DIRECTION_ALPACA2VT[d["side"]], - price=d["filled_avg_price"], - volume=d["filled_qty"], - time=data_ret["timestamp"][11:19], - gateway_name=self.gateway_name, - ) - self.gateway.on_trade(trade) - else: - pass - - def handle_auth(self, data): - # stream_ret = data['stream'] - data_ret = data['data'] - if (data_ret['status'] == "authorized"): - print("authorization success!!!") - self.gateway.write_log("authorization success!!!") - self.on_authenticate() - elif (data_ret['status'] == "unauthorized"): - print("authorization failed!!!") - self.gateway.write_log("authorization failed!!!") - else: - print("??unhandled status: ", data) + if stream == "authorization": + self.on_authenticate(data) + elif stream == "listening": + streams = data["streams"] - def on_error(self, exception_type: type, exception_value: Exception, tb): - """""" - print("on_error: ", type, Exception, tb) - sys.stderr.write( - self.exception_detail(exception_type, exception_value, tb) - ) + if "trade_updates" in streams: + self.gateway.write_log("委托成交推送订阅成功") - def subscribe_topic(self): - pass + if "account_updates" in streams: + self.gateway.write_log("资金变化推送订阅成功") - def on_tick(self, d): - """""" - pass - - def on_depth(self, d): - """""" - pass - - def on_trade(self, d): - """""" - pass - - def generateDateTime(self, s): - """生成时间""" - dt = datetime.fromtimestamp(s / 1000.0) - time = dt.strftime("%H:%M:%S.%f") - return time + elif stream == "trade_updates": + self.on_order(data) + elif stream == "account_updates": + self.on_account(data) def on_order(self, data): """""" - pass + # Update order + d = data["order"] + sys_orderid = d["id"] + local_orderid = d["client_order_id"] + LOCAL_SYS_MAP[local_orderid] = sys_orderid - def on_position(self, d): - """""" - pass + direction = DIRECTION_ALPACA2VT[d["side"]] + order_type = ORDERTYPE_ALPACA2VT[d["type"]] - def on_account(self, d): - """""" - pass + order = OrderData( + orderid=local_orderid, + symbol=d["symbol"], + exchange=Exchange.SMART, + price=float(d["limit_price"]), + volume=float(d["qty"]), + type=order_type, + direction=direction, + traded=float(d["filled_qty"]), + status=STATUS_ALPACA2VT.get(d["status"], Status.SUBMITTING), + time=d["created_at"], + gateway_name=self.gateway_name, + ) + self.gateway.on_order(order) - def on_contract(self, d): + # Update Trade + event = data.get("event", "") + if event != "fill": + return + + self.trade_count += 1 + + trade = TradeData( + symbol=order.symbol, + exchange=order.exchange, + orderid=order.orderid, + tradeid=str(self.trade_count), + direction=order.direction, + price=float(data["price"]), + volume=int(data["qty"]), + time=data["timestamp"], + gateway_name=self.gateway_name + ) + self.gateway.on_trade(trade) + + def on_account(self, data): """""" - pass + account = AccountData( + accountid=data["id"], + balance=float(data["equity"]), + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) + + +class AlpacaDataRestApi(RestClient): + """ + Alpaca Market Data REST API + """ + + def __init__(self, gateway: AlpacaGateway): + """""" + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.key = "" + self.secret = "" + + self.symbols = set() + + def sign(self, request): + """ + Generate Alpaca signature. + """ + headers = { + "APCA-API-KEY-ID": self.key, + "APCA-API-SECRET-KEY": self.secret, + "Content-Type": "application/json" + } + + request.headers = headers + request.allow_redirects = False + return request + + def connect( + self, + key: str, + secret: str, + session_num: int + ): + """ + Initialize connection to REST server. + """ + self.key = key + self.secret = secret + + self.init(DATA_REST_HOST) + self.start(session_num) + + self.gateway.write_log("行情REST API启动成功") + + def subscribe(self, req: SubscribeRequest): + """""" + self.symbols.add(req.symbol) + + def query_bar(self): + """""" + if not self._active or not self.symbols: + return + + params = { + "symbols": ",".join(list(self.symbols)), + "limit": 1 + } + + self.add_request( + method="GET", + path="/v1/bars/1Min", + params=params, + callback=self.on_query_bar + ) + + def on_query_bar(self, data, request): + """""" + for symbol, buf in data.items(): + d = buf[0] + + tick = TickData( + symbol=symbol, + exchange=Exchange.SMART, + datetime=datetime.now(), + name=symbol, + open_price=d["o"], + high_price=d["h"], + low_price=d["l"], + last_price=d["c"], + gateway_name=self.gateway_name + ) + + self.gateway.on_tick(tick)