add subscribe func

This commit is contained in:
vigarbuaa 2019-06-21 13:26:40 +08:00
parent 0753e3cfe2
commit 2dd33b5ba0

View File

@ -100,7 +100,7 @@ class AlpacaGateway(BaseGateway):
def subscribe(self, req: SubscribeRequest): def subscribe(self, req: SubscribeRequest):
"""""" """"""
# self.ws_api.subscribe(req) self.ws_api.subscribe(req)
pass pass
def send_order(self, req: OrderRequest): def send_order(self, req: OrderRequest):
@ -393,6 +393,7 @@ class AlpacaWebsocketApi(WebsocketClient):
self.askDict = {} self.askDict = {}
self.orderLocalDict = {} self.orderLocalDict = {}
self.channelDict = {} # ChannelID : (Channel, Symbol) self.channelDict = {} # ChannelID : (Channel, Symbol)
self.channels=["account_updates", "trade_updates"]
def connect( def connect(
self, key: str, secret: str, proxy_host: str, proxy_port: int,url:str self, key: str, secret: str, proxy_host: str, proxy_port: int,url:str
@ -413,12 +414,16 @@ class AlpacaWebsocketApi(WebsocketClient):
def on_authenticate(self): def on_authenticate(self):
"""""" """"""
params={"action":"listen", "data": { params={"action":"listen", "data": {
"streams":["account_updates", "trade_updates"] "streams":self.channels
}} }}
self.send_packet(params) self.send_packet(params)
def subscribe(self, req: SubscribeRequest): def subscribe(self, req: SubscribeRequest):
pass self.channels.append(req.symbol)
params={"action":"listen", "data": {
"streams":self.channels
}}
self.send_packet(params)
def send_order(self, req: OrderRequest): def send_order(self, req: OrderRequest):
pass pass
@ -468,6 +473,8 @@ class AlpacaWebsocketApi(WebsocketClient):
self.gateway.on_account(account) self.gateway.on_account(account)
elif(stream_ret == "trade_updates"): elif(stream_ret == "trade_updates"):
d=data_ret['order'] d=data_ret['order']
order_id = d['id']
order=GLOBAL_ORDER[order_id]
if (data_ret['event'] == "fill"): if (data_ret['event'] == "fill"):
trade = TradeData( trade = TradeData(
symbol=d["symbol"], symbol=d["symbol"],
@ -481,9 +488,9 @@ class AlpacaWebsocketApi(WebsocketClient):
gateway_name=self.gateway_name, gateway_name=self.gateway_name,
) )
self.gateway.on_trade(trade) self.gateway.on_trade(trade)
order.status = Status.ALLTRADED
self.gateway.on_order(order)
elif (data_ret['event'] == "canceled"): elif (data_ret['event'] == "canceled"):
order_id = d['id']
order=GLOBAL_ORDER[order_id]
order.status = Status.CANCELLED order.status = Status.CANCELLED
self.gateway.on_order(order) self.gateway.on_order(order)
print("^^^^debug cancel order id, ",order_id," body: ",order) print("^^^^debug cancel order id, ",order_id," body: ",order)