From 7be2a90e4c76e1cca0864cc3b770ac184b5b06f2 Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Sat, 15 Jun 2019 11:19:15 +0800 Subject: [PATCH] add websocket into alpaca --- vnpy/gateway/alpaca/alpaca_gateway.py | 97 +++++++++++++++++++++------ 1 file changed, 77 insertions(+), 20 deletions(-) diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index e82caf4f..528457e5 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -37,6 +37,7 @@ from vnpy.trader.object import ( REST_HOST = "https://api.alpaca.markets" PAPER_REST_HOST = "https://paper-api.alpaca.markets" +WEBSOCKET_HOST = "wss://paper-api.alpaca.markets/stream" # Market Data KEY = "" SECRET = "" @@ -79,7 +80,7 @@ class AlpacaGateway(BaseGateway): super(AlpacaGateway, self).__init__(event_engine, "ALPACA") self.rest_api = AlpacaRestApi(self) - #self.ws_api = AlpacaWebsocketApi(self) + self.ws_api = AlpacaWebsocketApi(self) self.order_map = {} def connect(self, setting: dict): @@ -92,7 +93,7 @@ class AlpacaGateway(BaseGateway): self.rest_api.connect(key, secret, session, proxy_host, proxy_port) - #self.ws_api.connect(key, secret, proxy_host, proxy_port) + self.ws_api.connect(key, secret, proxy_host, proxy_port) def subscribe(self, req: SubscribeRequest): """""" @@ -139,7 +140,7 @@ class AlpacaRestApi(RestClient): self.order_count = 1_000_000 self.order_count_lock = Lock() self.connect_time = 0 - self.order_dict={} + self.order_list = [] def query_account(self): print("call query_account") @@ -151,7 +152,6 @@ class AlpacaRestApi(RestClient): ) def on_query_account(self, data, request): - print("debug on_query_account: ", data, request) account = AccountData( accountid=data['id'], balance=float(data['cash']), @@ -170,7 +170,6 @@ class AlpacaRestApi(RestClient): ) def on_query_position(self, data, request): - print("debug on_query_position: ", data, "!!!", request) for d in data: position = PositionData( symbol=d['symbol'], @@ -225,14 +224,14 @@ class AlpacaRestApi(RestClient): self.gateway.write_log("ALPACA REST API启动成功") self.query_account() self.query_position() + #self.query_contracts() - def on_send_order(self, data, request: Request): - print("debug on send order: ", data, request) - order = request.extra - lcoal_order_id= order.orderId + def on_send_order(self, data, request ): + print("debug on_send_order data is {} request is {} ---".format( data, request)) remote_order_id = data['id'] - self.order_dict[local_order_id]=remote_order_id - print('debug on_send_order: ', local_order_id, remote_order_id) + order = request.extra + self.order_list.append(remote_order_id) + print('debug on_send_order: local is {} ---'.format(self.order_list)) self.gateway.on_order(order) def on_failed_order(self, status_code: int, request: Request): @@ -266,12 +265,13 @@ class AlpacaRestApi(RestClient): # need debug 0608 def cancel_order(self, req: CancelRequest): """""" + """ order_id = req.orderid remote_order_id = self.order_dict['order_id'] if remote_order_id is None: print("[error]: can not get remote_order_id from local dict!") return - path="/v1/orders/"+str(remote_order_id) + path = "/v1/orders/" + str(remote_order_id) self.add_request( "DELETE", path, @@ -280,11 +280,13 @@ class AlpacaRestApi(RestClient): extra=req ) print("come to cancel_order", order_id) +""" + pass def on_cancel_order(self, data, request): """Websocket will push a new order status""" pass - + def on_cancel_order_error(self, data, request): pass @@ -314,6 +316,38 @@ class AlpacaRestApi(RestClient): ) return order.vt_orderid + def on_query_contracts(self, data, request: Request): + for instrument_data in data: + symbol = instrument_data['symbol'] + contract = ContractData( + symbol=symbol, + exchange=Exchange.ALPACA, + name=symbol, + product=Product.SPOT, + size=1, # need debug + pricetick=0.01, # need debug + gateway_name=self.gateway_name + ) + self.on_contract(contract) + + def on_failed_query_contracts(self, status_code: int, request: Request): + pass + + def on_error_query_contracts(self, exception_type: type, exception_value: Exception, tb, request: Request): + pass + + def query_contracts(self): + params = {"status": "active"} + self.add_request( + "GET", + "/v1/assets", + params = params, + callback=self.on_query_contracts, + on_failed=self.on_failed_query_contracts, + on_error=self.on_error_query_contracts, + # data=data, + ) + class AlpacaWebsocketApi(WebsocketClient): """""" @@ -354,10 +388,26 @@ class AlpacaWebsocketApi(WebsocketClient): ): """""" self.key = key - self.secret = secret.encode() + self.secret = secret self.init(WEBSOCKET_HOST, proxy_host, proxy_port) self.start() + def login(self): + """""" + params={"action":"authenticate", "data": { + "key_id":self.key,"secret_key":self.secret + }} + print("string is {} ===".format(params)) + self.send_packet(params) + + def on_login(self, packet): + """""" + print("websocket authenticate success!!! msg: ", packet) + params={"action":"listen", "data": { + "streams":["account_updates", "trade_updates"] + }} + self.send_packet(params) + def subscribe(self, req: SubscribeRequest): pass @@ -372,7 +422,7 @@ class AlpacaWebsocketApi(WebsocketClient): def on_connected(self): """""" self.gateway.write_log("Websocket API连接成功") - self.authenticate() + self.login() def on_disconnected(self): """""" @@ -382,6 +432,17 @@ class AlpacaWebsocketApi(WebsocketClient): def on_packet(self, packet: dict): """""" print("debug on_packet: ", packet) + if "ping" in packet: + # has ping??? + req = {"pong": packet["ping"]} + print("ping response: ", req) + self.send_packet(req) + elif "stream" in packet and "data" in packet: + # on_data + pass + else: + print("unrecognize mesg", packet) + #parse authenticate/trade/other data here # ---------------------------------------------------------------------- def on_response(self, data): @@ -408,10 +469,6 @@ class AlpacaWebsocketApi(WebsocketClient): print("on_error: ", type, Exception, tb) pass - # debug OK , 0405 - def authenticate(self): - pass - def subscribe_topic(self): pass @@ -447,4 +504,4 @@ class AlpacaWebsocketApi(WebsocketClient): def on_contract(self, d): """""" - pass + pass \ No newline at end of file