diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index 08bb6fb7..21e26b2b 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -13,6 +13,8 @@ from datetime import datetime from urllib.parse import urlencode from vnpy.api.rest import Request, RestClient from vnpy.api.websocket import WebsocketClient +from vnpy.event import Event +from vnpy.trader.event import EVENT_TIMER from vnpy.trader.constant import ( Direction, @@ -97,11 +99,11 @@ class AlpacaGateway(BaseGateway): websocket_url = WEBSOCKET_HOST if env == "REAL" else PAPER_WEBSOCKET_HOST self.rest_api.connect(key, secret, session, proxy_host, proxy_port,rest_url) self.ws_api.connect(key, secret, proxy_host, proxy_port,websocket_url) + self.init_query() def subscribe(self, req: SubscribeRequest): """""" self.ws_api.subscribe(req) - pass def send_order(self, req: OrderRequest): """""" @@ -123,6 +125,32 @@ class AlpacaGateway(BaseGateway): """""" self.rest_api.stop() self.ws_api.stop() + + def process_timer_event(self, event: Event): + """""" + self.count += 1 + if self.count < 5: + return + + self.query_account() + self.query_position() + + def init_query(self): + """""" + self.count = 0 + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + + + def process_timer_event(self, event: Event): + """""" + self.count += 1 + if self.count < 5: + return + else: + self.count = 0 + + self.query_account() + self.query_position() class AlpacaRestApi(RestClient): @@ -419,7 +447,9 @@ class AlpacaWebsocketApi(WebsocketClient): self.send_packet(params) def subscribe(self, req: SubscribeRequest): - self.channels.append(req.symbol) + self.channels.append("A."+req.symbol) + self.channels.append("Q."+req.symbol) + print("debug subscribe: {} ".format(self.channels)) params={"action":"listen", "data": { "streams":self.channels }} @@ -435,7 +465,7 @@ class AlpacaWebsocketApi(WebsocketClient): def on_connected(self): """""" - self.gateway.write_log("Websocket API连接成功") + self.gateway.write_log("Websocket API连接成功") self.authenticate() def on_disconnected(self): @@ -464,6 +494,7 @@ class AlpacaWebsocketApi(WebsocketClient): data_ret = data['data'] if(stream_ret == "account_updates"): #handle account + print("!!!!!!!@@@@@@@@ account_udpate is {}".format(data)) account = AccountData( accountid=data_ret['id'], balance=float(data_ret['cash']), @@ -475,12 +506,15 @@ class AlpacaWebsocketApi(WebsocketClient): d=data_ret['order'] order_id = d['id'] order=GLOBAL_ORDER[order_id] - if (data_ret['event'] == "fill"): + local_order_id = order.orderid + order.status = STATUS_ALPACA2VT[data_ret['event']] + self.gateway.on_order(order) + if (data_ret['event'] == "fill" or data_ret['event'] == "partial_fill"): trade = TradeData( symbol=d["symbol"], exchange=Exchange.ALPACA, - orderid=d['id'], - tradeid=None, + orderid=local_order_id, + tradeid=d['id'], direction=DIRECTION_ALPACA2VT[d["side"]], price=d["filled_avg_price"], volume=d["filled_qty"], @@ -488,15 +522,6 @@ class AlpacaWebsocketApi(WebsocketClient): gateway_name=self.gateway_name, ) self.gateway.on_trade(trade) - order.status = Status.ALLTRADED - self.gateway.on_order(order) - elif (data_ret['event'] == "canceled"): - order.status = Status.CANCELLED - self.gateway.on_order(order) - print("^^^^debug cancel order id, ",order_id," body: ",order) - else: - print("unhandled trade_update msg, ", data_ret['event']) - #self.gateway.on_order(order) # udpate order status else: pass @@ -526,14 +551,6 @@ class AlpacaWebsocketApi(WebsocketClient): pass # ---------------------------------------------------------------------- - def on_wallet(self, data): - """""" - pass - - # ---------------------------------------------------------------------- - def on_trade_update(self, data): - """""" - pass def on_error(self, exception_type: type, exception_value: Exception, tb): """"""