add process_timer_event to query account and position

This commit is contained in:
vigarbuaa 2019-06-21 22:31:30 +08:00
parent 2dd33b5ba0
commit 9124302cca

View File

@ -13,6 +13,8 @@ from datetime import datetime
from urllib.parse import urlencode
from vnpy.api.rest import Request, RestClient
from vnpy.api.websocket import WebsocketClient
from vnpy.event import Event
from vnpy.trader.event import EVENT_TIMER
from vnpy.trader.constant import (
Direction,
@ -97,11 +99,11 @@ class AlpacaGateway(BaseGateway):
websocket_url = WEBSOCKET_HOST if env == "REAL" else PAPER_WEBSOCKET_HOST
self.rest_api.connect(key, secret, session, proxy_host, proxy_port,rest_url)
self.ws_api.connect(key, secret, proxy_host, proxy_port,websocket_url)
self.init_query()
def subscribe(self, req: SubscribeRequest):
""""""
self.ws_api.subscribe(req)
pass
def send_order(self, req: OrderRequest):
""""""
@ -123,6 +125,32 @@ class AlpacaGateway(BaseGateway):
""""""
self.rest_api.stop()
self.ws_api.stop()
def process_timer_event(self, event: Event):
""""""
self.count += 1
if self.count < 5:
return
self.query_account()
self.query_position()
def init_query(self):
""""""
self.count = 0
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def process_timer_event(self, event: Event):
""""""
self.count += 1
if self.count < 5:
return
else:
self.count = 0
self.query_account()
self.query_position()
class AlpacaRestApi(RestClient):
@ -419,7 +447,9 @@ class AlpacaWebsocketApi(WebsocketClient):
self.send_packet(params)
def subscribe(self, req: SubscribeRequest):
self.channels.append(req.symbol)
self.channels.append("A."+req.symbol)
self.channels.append("Q."+req.symbol)
print("debug subscribe: {} ".format(self.channels))
params={"action":"listen", "data": {
"streams":self.channels
}}
@ -435,7 +465,7 @@ class AlpacaWebsocketApi(WebsocketClient):
def on_connected(self):
""""""
self.gateway.write_log("Websocket API连接成功")
self.gateway.write_log("Websocket API连接成功")
self.authenticate()
def on_disconnected(self):
@ -464,6 +494,7 @@ class AlpacaWebsocketApi(WebsocketClient):
data_ret = data['data']
if(stream_ret == "account_updates"):
#handle account
print("!!!!!!!@@@@@@@@ account_udpate is {}".format(data))
account = AccountData(
accountid=data_ret['id'],
balance=float(data_ret['cash']),
@ -475,12 +506,15 @@ class AlpacaWebsocketApi(WebsocketClient):
d=data_ret['order']
order_id = d['id']
order=GLOBAL_ORDER[order_id]
if (data_ret['event'] == "fill"):
local_order_id = order.orderid
order.status = STATUS_ALPACA2VT[data_ret['event']]
self.gateway.on_order(order)
if (data_ret['event'] == "fill" or data_ret['event'] == "partial_fill"):
trade = TradeData(
symbol=d["symbol"],
exchange=Exchange.ALPACA,
orderid=d['id'],
tradeid=None,
orderid=local_order_id,
tradeid=d['id'],
direction=DIRECTION_ALPACA2VT[d["side"]],
price=d["filled_avg_price"],
volume=d["filled_qty"],
@ -488,15 +522,6 @@ class AlpacaWebsocketApi(WebsocketClient):
gateway_name=self.gateway_name,
)
self.gateway.on_trade(trade)
order.status = Status.ALLTRADED
self.gateway.on_order(order)
elif (data_ret['event'] == "canceled"):
order.status = Status.CANCELLED
self.gateway.on_order(order)
print("^^^^debug cancel order id, ",order_id," body: ",order)
else:
print("unhandled trade_update msg, ", data_ret['event'])
#self.gateway.on_order(order) # udpate order status
else:
pass
@ -526,14 +551,6 @@ class AlpacaWebsocketApi(WebsocketClient):
pass
# ----------------------------------------------------------------------
def on_wallet(self, data):
""""""
pass
# ----------------------------------------------------------------------
def on_trade_update(self, data):
""""""
pass
def on_error(self, exception_type: type, exception_value: Exception, tb):
""""""