diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index a34e8529..31b37d24 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -49,7 +49,7 @@ STATUS_ALPACA2VT = { "partial_fill": Status.PARTTRADED, "fill": Status.ALLTRADED, "cancelled": Status.CANCELLED, - #"done_for_day": Status.CANCELLED, + # "done_for_day": Status.CANCELLED, "expired": Status.NOTTRADED } @@ -62,7 +62,8 @@ ORDERTYPE_VT2ALPACA = { OrderType.MARKET: "market" } ORDERTYPE_ALPACA2VT = {v: k for k, v in ORDERTYPE_VT2ALPACA.items()} -GLOBAL_ORDER={} +GLOBAL_ORDER = {} + class AlpacaGateway(BaseGateway): """ @@ -93,11 +94,12 @@ class AlpacaGateway(BaseGateway): session = setting["session"] proxy_host = setting["proxy_host"] proxy_port = setting["proxy_port"] - env=setting['服务器'] - rest_url = REST_HOST if env == "REAL" else PAPER_REST_HOST - websocket_url = WEBSOCKET_HOST if env == "REAL" else PAPER_WEBSOCKET_HOST - self.rest_api.connect(key, secret, session, proxy_host, proxy_port,rest_url) - self.ws_api.connect(key, secret, proxy_host, proxy_port,websocket_url) + env = setting['服务器'] + rest_url = REST_HOST if env == "REAL" else PAPER_REST_HOST + websocket_url = WEBSOCKET_HOST if env == "REAL" else PAPER_WEBSOCKET_HOST + self.rest_api.connect(key, secret, session, + proxy_host, proxy_port, rest_url) + self.ws_api.connect(key, secret, proxy_host, proxy_port, websocket_url) self.init_query() def subscribe(self, req: SubscribeRequest): @@ -124,7 +126,7 @@ class AlpacaGateway(BaseGateway): """""" self.rest_api.stop() self.ws_api.stop() - + def process_timer_event(self, event: Event): """""" self.count += 1 @@ -139,7 +141,6 @@ class AlpacaGateway(BaseGateway): self.count = 0 self.event_engine.register(EVENT_TIMER, self.process_timer_event) - def process_timer_event(self, event: Event): """""" self.count += 1 @@ -170,7 +171,7 @@ class AlpacaRestApi(RestClient): self.order_count = 1_000_000 self.order_count_lock = Lock() self.connect_time = 0 - self.order_dict ={} + self.order_dict = {} def query_account(self): path = f"/v1/account" @@ -204,7 +205,7 @@ class AlpacaRestApi(RestClient): exchange=Exchange.ALPACA, direction=DIRECTION_ALPACA2VT[d['side']], volume=d['qty'], - price=round(d['avg_entry_price'],3), + price=round(d['avg_entry_price'], 3), pnl=d['unrealized_pl'], gateway_name=self.gateway_name, ) @@ -251,14 +252,14 @@ class AlpacaRestApi(RestClient): self.gateway.write_log("ALPACA REST API启动成功") self.query_account() self.query_position() - #self.query_contracts() + # self.query_contracts() - def on_send_order(self, data, request: Request ): + def on_send_order(self, data, request: Request): remote_order_id = data['id'] order = request.extra - self.order_dict[order.orderid]=remote_order_id + self.order_dict[order.orderid] = remote_order_id self.gateway.on_order(order) - GLOBAL_ORDER[remote_order_id]=order + GLOBAL_ORDER[remote_order_id] = order def on_failed_order(self, status_code: int, request: Request): """ @@ -308,7 +309,7 @@ class AlpacaRestApi(RestClient): """Websocket will push a new order status""" pass - def on_cancel_order_error( self, exception_type: type, exception_value: Exception, tb, request: Request): + def on_cancel_order_error(self, exception_type: type, exception_value: Exception, tb, request: Request): # Record exception if not ConnectionError if not issubclass(exception_type, ConnectionError): self.on_error(exception_type, exception_value, tb, request) @@ -327,7 +328,7 @@ class AlpacaRestApi(RestClient): data = raw_dict order = req.create_order_data(orderid, self.gateway_name) - print("debug send_order orderBody extra: ",order) + print("debug send_order orderBody extra: ", order) self.add_request( "POST", "/v1/orders", @@ -336,20 +337,20 @@ class AlpacaRestApi(RestClient): extra=order, on_failed=self.on_failed_order, on_error=self.on_error_order, - #json_str=data, + # json_str=data, ) print("debug send_order ret val : ", order.vt_orderid) return order.vt_orderid def on_query_contracts(self, data, request: Request): - for instrument_data in data: + for instrument_data in data: symbol = instrument_data['symbol'] contract = ContractData( symbol=symbol, - exchange=Exchange.ALPACA, + exchange=Exchange.ALPACA, name=symbol, product=Product.SPOT, - size=1, + size=1, pricetick=0.01, gateway_name=self.gateway_name ) @@ -360,13 +361,13 @@ class AlpacaRestApi(RestClient): def on_error_query_contracts(self, exception_type: type, exception_value: Exception, tb, request: Request): pass - + def query_contracts(self): params = {"status": "active"} self.add_request( "GET", "/v1/assets", - params = params, + params=params, callback=self.on_query_contracts, on_failed=self.on_failed_query_contracts, on_error=self.on_error_query_contracts, @@ -406,10 +407,10 @@ class AlpacaWebsocketApi(WebsocketClient): self.askDict = {} self.orderLocalDict = {} self.channelDict = {} # ChannelID : (Channel, Symbol) - self.channels=["account_updates", "trade_updates"] + self.channels = ["account_updates", "trade_updates"] def connect( - self, key: str, secret: str, proxy_host: str, proxy_port: int,url:str + self, key: str, secret: str, proxy_host: str, proxy_port: int, url: str ): """""" self.key = key @@ -419,24 +420,24 @@ class AlpacaWebsocketApi(WebsocketClient): def authenticate(self): """""" - params={"action":"authenticate", "data": { - "key_id":self.key,"secret_key":self.secret + params = {"action": "authenticate", "data": { + "key_id": self.key, "secret_key": self.secret }} self.send_packet(params) def on_authenticate(self): """""" - params={"action":"listen", "data": { - "streams":self.channels + params = {"action": "listen", "data": { + "streams": self.channels }} self.send_packet(params) - + def subscribe(self, req: SubscribeRequest): - self.channels.append("A."+req.symbol) - self.channels.append("Q."+req.symbol) + self.channels.append("A." + req.symbol) + self.channels.append("Q." + req.symbol) print("debug subscribe: {} ".format(self.channels)) - params={"action":"listen", "data": { - "streams":self.channels + params = {"action": "listen", "data": { + "streams": self.channels }} self.send_packet(params) @@ -450,7 +451,7 @@ class AlpacaWebsocketApi(WebsocketClient): def on_connected(self): """""" - self.gateway.write_log("Websocket API连接成功") + self.gateway.write_log("Websocket API连接成功") self.authenticate() def on_disconnected(self): @@ -463,9 +464,9 @@ class AlpacaWebsocketApi(WebsocketClient): if "stream" in packet and "data" in packet: stream_ret = packet['stream'] data_ret = packet['data'] - if(stream_ret == "authorization"): + if(stream_ret == "authorization"): self.handle_auth(packet) - elif(stream_ret == "listening"): + elif(stream_ret == "listening"): self.gateway.write_log("listening {}".format(data_ret)) else: self.on_data(packet) @@ -478,19 +479,18 @@ class AlpacaWebsocketApi(WebsocketClient): stream_ret = data['stream'] data_ret = data['data'] if(stream_ret == "account_updates"): - #handle account - print("!!!!!!!@@@@@@@@ account_udpate is {}".format(data)) account = AccountData( accountid=data_ret['id'], balance=float(data_ret['cash']), - frozen=float(data_ret['cash']) - float(data_ret['cash_withdrawable']), + frozen=float(data_ret['cash']) - + float(data_ret['cash_withdrawable']), gateway_name=self.gateway_name ) self.gateway.on_account(account) elif(stream_ret == "trade_updates"): - d=data_ret['order'] + d = data_ret['order'] order_id = d['id'] - order=GLOBAL_ORDER[order_id] + order = GLOBAL_ORDER[order_id] local_order_id = order.orderid order.status = STATUS_ALPACA2VT[data_ret['event']] self.gateway.on_order(order) @@ -522,7 +522,7 @@ class AlpacaWebsocketApi(WebsocketClient): print("authorization failed!!!") self.gateway.write_log("authorization failed!!!") else: - print("??unhandled status: ",data) + print("??unhandled status: ", data) # ---------------------------------------------------------------------- def on_response(self, data): @@ -540,7 +540,7 @@ class AlpacaWebsocketApi(WebsocketClient): """""" print("on_error: ", type, Exception, tb) sys.stderr.write( - self.exception_detail(exception_type, exception_value, tb ) + self.exception_detail(exception_type, exception_value, tb) ) def subscribe_topic(self): @@ -578,4 +578,4 @@ class AlpacaWebsocketApi(WebsocketClient): def on_contract(self, d): """""" - pass \ No newline at end of file + pass