add websocket into alpaca

This commit is contained in:
vigarbuaa 2019-06-15 11:19:15 +08:00
parent 5e5137f9ab
commit 7be2a90e4c

View File

@ -37,6 +37,7 @@ from vnpy.trader.object import (
REST_HOST = "https://api.alpaca.markets"
PAPER_REST_HOST = "https://paper-api.alpaca.markets"
WEBSOCKET_HOST = "wss://paper-api.alpaca.markets/stream" # Market Data
KEY = ""
SECRET = ""
@ -79,7 +80,7 @@ class AlpacaGateway(BaseGateway):
super(AlpacaGateway, self).__init__(event_engine, "ALPACA")
self.rest_api = AlpacaRestApi(self)
#self.ws_api = AlpacaWebsocketApi(self)
self.ws_api = AlpacaWebsocketApi(self)
self.order_map = {}
def connect(self, setting: dict):
@ -92,7 +93,7 @@ class AlpacaGateway(BaseGateway):
self.rest_api.connect(key, secret, session, proxy_host, proxy_port)
#self.ws_api.connect(key, secret, proxy_host, proxy_port)
self.ws_api.connect(key, secret, proxy_host, proxy_port)
def subscribe(self, req: SubscribeRequest):
""""""
@ -139,7 +140,7 @@ class AlpacaRestApi(RestClient):
self.order_count = 1_000_000
self.order_count_lock = Lock()
self.connect_time = 0
self.order_dict={}
self.order_list = []
def query_account(self):
print("call query_account")
@ -151,7 +152,6 @@ class AlpacaRestApi(RestClient):
)
def on_query_account(self, data, request):
print("debug on_query_account: ", data, request)
account = AccountData(
accountid=data['id'],
balance=float(data['cash']),
@ -170,7 +170,6 @@ class AlpacaRestApi(RestClient):
)
def on_query_position(self, data, request):
print("debug on_query_position: ", data, "!!!", request)
for d in data:
position = PositionData(
symbol=d['symbol'],
@ -225,14 +224,14 @@ class AlpacaRestApi(RestClient):
self.gateway.write_log("ALPACA REST API启动成功")
self.query_account()
self.query_position()
#self.query_contracts()
def on_send_order(self, data, request: Request):
print("debug on send order: ", data, request)
order = request.extra
lcoal_order_id= order.orderId
def on_send_order(self, data, request ):
print("debug on_send_order data is {} request is {} ---".format( data, request))
remote_order_id = data['id']
self.order_dict[local_order_id]=remote_order_id
print('debug on_send_order: ', local_order_id, remote_order_id)
order = request.extra
self.order_list.append(remote_order_id)
print('debug on_send_order: local is {} ---'.format(self.order_list))
self.gateway.on_order(order)
def on_failed_order(self, status_code: int, request: Request):
@ -266,12 +265,13 @@ class AlpacaRestApi(RestClient):
# need debug 0608
def cancel_order(self, req: CancelRequest):
""""""
"""
order_id = req.orderid
remote_order_id = self.order_dict['order_id']
if remote_order_id is None:
print("[error]: can not get remote_order_id from local dict!")
return
path="/v1/orders/"+str(remote_order_id)
path = "/v1/orders/" + str(remote_order_id)
self.add_request(
"DELETE",
path,
@ -280,6 +280,8 @@ class AlpacaRestApi(RestClient):
extra=req
)
print("come to cancel_order", order_id)
"""
pass
def on_cancel_order(self, data, request):
"""Websocket will push a new order status"""
@ -314,6 +316,38 @@ class AlpacaRestApi(RestClient):
)
return order.vt_orderid
def on_query_contracts(self, data, request: Request):
for instrument_data in data:
symbol = instrument_data['symbol']
contract = ContractData(
symbol=symbol,
exchange=Exchange.ALPACA,
name=symbol,
product=Product.SPOT,
size=1, # need debug
pricetick=0.01, # need debug
gateway_name=self.gateway_name
)
self.on_contract(contract)
def on_failed_query_contracts(self, status_code: int, request: Request):
pass
def on_error_query_contracts(self, exception_type: type, exception_value: Exception, tb, request: Request):
pass
def query_contracts(self):
params = {"status": "active"}
self.add_request(
"GET",
"/v1/assets",
params = params,
callback=self.on_query_contracts,
on_failed=self.on_failed_query_contracts,
on_error=self.on_error_query_contracts,
# data=data,
)
class AlpacaWebsocketApi(WebsocketClient):
""""""
@ -354,10 +388,26 @@ class AlpacaWebsocketApi(WebsocketClient):
):
""""""
self.key = key
self.secret = secret.encode()
self.secret = secret
self.init(WEBSOCKET_HOST, proxy_host, proxy_port)
self.start()
def login(self):
""""""
params={"action":"authenticate", "data": {
"key_id":self.key,"secret_key":self.secret
}}
print("string is {} ===".format(params))
self.send_packet(params)
def on_login(self, packet):
""""""
print("websocket authenticate success!!! msg: ", packet)
params={"action":"listen", "data": {
"streams":["account_updates", "trade_updates"]
}}
self.send_packet(params)
def subscribe(self, req: SubscribeRequest):
pass
@ -372,7 +422,7 @@ class AlpacaWebsocketApi(WebsocketClient):
def on_connected(self):
""""""
self.gateway.write_log("Websocket API连接成功")
self.authenticate()
self.login()
def on_disconnected(self):
""""""
@ -382,6 +432,17 @@ class AlpacaWebsocketApi(WebsocketClient):
def on_packet(self, packet: dict):
""""""
print("debug on_packet: ", packet)
if "ping" in packet:
# has ping???
req = {"pong": packet["ping"]}
print("ping response: ", req)
self.send_packet(req)
elif "stream" in packet and "data" in packet:
# on_data
pass
else:
print("unrecognize mesg", packet)
#parse authenticate/trade/other data here
# ----------------------------------------------------------------------
def on_response(self, data):
@ -408,10 +469,6 @@ class AlpacaWebsocketApi(WebsocketClient):
print("on_error: ", type, Exception, tb)
pass
# debug OK , 0405
def authenticate(self):
pass
def subscribe_topic(self):
pass