format code

This commit is contained in:
vigarbuaa 2019-06-22 10:56:03 +08:00
parent 17fe9bf914
commit 985c062549

View File

@ -49,7 +49,7 @@ STATUS_ALPACA2VT = {
"partial_fill": Status.PARTTRADED,
"fill": Status.ALLTRADED,
"cancelled": Status.CANCELLED,
#"done_for_day": Status.CANCELLED,
# "done_for_day": Status.CANCELLED,
"expired": Status.NOTTRADED
}
@ -62,7 +62,8 @@ ORDERTYPE_VT2ALPACA = {
OrderType.MARKET: "market"
}
ORDERTYPE_ALPACA2VT = {v: k for k, v in ORDERTYPE_VT2ALPACA.items()}
GLOBAL_ORDER={}
GLOBAL_ORDER = {}
class AlpacaGateway(BaseGateway):
"""
@ -93,11 +94,12 @@ class AlpacaGateway(BaseGateway):
session = setting["session"]
proxy_host = setting["proxy_host"]
proxy_port = setting["proxy_port"]
env=setting['服务器']
rest_url = REST_HOST if env == "REAL" else PAPER_REST_HOST
websocket_url = WEBSOCKET_HOST if env == "REAL" else PAPER_WEBSOCKET_HOST
self.rest_api.connect(key, secret, session, proxy_host, proxy_port,rest_url)
self.ws_api.connect(key, secret, proxy_host, proxy_port,websocket_url)
env = setting['服务器']
rest_url = REST_HOST if env == "REAL" else PAPER_REST_HOST
websocket_url = WEBSOCKET_HOST if env == "REAL" else PAPER_WEBSOCKET_HOST
self.rest_api.connect(key, secret, session,
proxy_host, proxy_port, rest_url)
self.ws_api.connect(key, secret, proxy_host, proxy_port, websocket_url)
self.init_query()
def subscribe(self, req: SubscribeRequest):
@ -124,7 +126,7 @@ class AlpacaGateway(BaseGateway):
""""""
self.rest_api.stop()
self.ws_api.stop()
def process_timer_event(self, event: Event):
""""""
self.count += 1
@ -139,7 +141,6 @@ class AlpacaGateway(BaseGateway):
self.count = 0
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def process_timer_event(self, event: Event):
""""""
self.count += 1
@ -170,7 +171,7 @@ class AlpacaRestApi(RestClient):
self.order_count = 1_000_000
self.order_count_lock = Lock()
self.connect_time = 0
self.order_dict ={}
self.order_dict = {}
def query_account(self):
path = f"/v1/account"
@ -204,7 +205,7 @@ class AlpacaRestApi(RestClient):
exchange=Exchange.ALPACA,
direction=DIRECTION_ALPACA2VT[d['side']],
volume=d['qty'],
price=round(d['avg_entry_price'],3),
price=round(d['avg_entry_price'], 3),
pnl=d['unrealized_pl'],
gateway_name=self.gateway_name,
)
@ -251,14 +252,14 @@ class AlpacaRestApi(RestClient):
self.gateway.write_log("ALPACA REST API启动成功")
self.query_account()
self.query_position()
#self.query_contracts()
# self.query_contracts()
def on_send_order(self, data, request: Request ):
def on_send_order(self, data, request: Request):
remote_order_id = data['id']
order = request.extra
self.order_dict[order.orderid]=remote_order_id
self.order_dict[order.orderid] = remote_order_id
self.gateway.on_order(order)
GLOBAL_ORDER[remote_order_id]=order
GLOBAL_ORDER[remote_order_id] = order
def on_failed_order(self, status_code: int, request: Request):
"""
@ -308,7 +309,7 @@ class AlpacaRestApi(RestClient):
"""Websocket will push a new order status"""
pass
def on_cancel_order_error( self, exception_type: type, exception_value: Exception, tb, request: Request):
def on_cancel_order_error(self, exception_type: type, exception_value: Exception, tb, request: Request):
# Record exception if not ConnectionError
if not issubclass(exception_type, ConnectionError):
self.on_error(exception_type, exception_value, tb, request)
@ -327,7 +328,7 @@ class AlpacaRestApi(RestClient):
data = raw_dict
order = req.create_order_data(orderid, self.gateway_name)
print("debug send_order orderBody extra: ",order)
print("debug send_order orderBody extra: ", order)
self.add_request(
"POST",
"/v1/orders",
@ -336,20 +337,20 @@ class AlpacaRestApi(RestClient):
extra=order,
on_failed=self.on_failed_order,
on_error=self.on_error_order,
#json_str=data,
# json_str=data,
)
print("debug send_order ret val : ", order.vt_orderid)
return order.vt_orderid
def on_query_contracts(self, data, request: Request):
for instrument_data in data:
for instrument_data in data:
symbol = instrument_data['symbol']
contract = ContractData(
symbol=symbol,
exchange=Exchange.ALPACA,
exchange=Exchange.ALPACA,
name=symbol,
product=Product.SPOT,
size=1,
size=1,
pricetick=0.01,
gateway_name=self.gateway_name
)
@ -360,13 +361,13 @@ class AlpacaRestApi(RestClient):
def on_error_query_contracts(self, exception_type: type, exception_value: Exception, tb, request: Request):
pass
def query_contracts(self):
params = {"status": "active"}
self.add_request(
"GET",
"/v1/assets",
params = params,
params=params,
callback=self.on_query_contracts,
on_failed=self.on_failed_query_contracts,
on_error=self.on_error_query_contracts,
@ -406,10 +407,10 @@ class AlpacaWebsocketApi(WebsocketClient):
self.askDict = {}
self.orderLocalDict = {}
self.channelDict = {} # ChannelID : (Channel, Symbol)
self.channels=["account_updates", "trade_updates"]
self.channels = ["account_updates", "trade_updates"]
def connect(
self, key: str, secret: str, proxy_host: str, proxy_port: int,url:str
self, key: str, secret: str, proxy_host: str, proxy_port: int, url: str
):
""""""
self.key = key
@ -419,24 +420,24 @@ class AlpacaWebsocketApi(WebsocketClient):
def authenticate(self):
""""""
params={"action":"authenticate", "data": {
"key_id":self.key,"secret_key":self.secret
params = {"action": "authenticate", "data": {
"key_id": self.key, "secret_key": self.secret
}}
self.send_packet(params)
def on_authenticate(self):
""""""
params={"action":"listen", "data": {
"streams":self.channels
params = {"action": "listen", "data": {
"streams": self.channels
}}
self.send_packet(params)
def subscribe(self, req: SubscribeRequest):
self.channels.append("A."+req.symbol)
self.channels.append("Q."+req.symbol)
self.channels.append("A." + req.symbol)
self.channels.append("Q." + req.symbol)
print("debug subscribe: {} ".format(self.channels))
params={"action":"listen", "data": {
"streams":self.channels
params = {"action": "listen", "data": {
"streams": self.channels
}}
self.send_packet(params)
@ -450,7 +451,7 @@ class AlpacaWebsocketApi(WebsocketClient):
def on_connected(self):
""""""
self.gateway.write_log("Websocket API连接成功")
self.gateway.write_log("Websocket API连接成功")
self.authenticate()
def on_disconnected(self):
@ -463,9 +464,9 @@ class AlpacaWebsocketApi(WebsocketClient):
if "stream" in packet and "data" in packet:
stream_ret = packet['stream']
data_ret = packet['data']
if(stream_ret == "authorization"):
if(stream_ret == "authorization"):
self.handle_auth(packet)
elif(stream_ret == "listening"):
elif(stream_ret == "listening"):
self.gateway.write_log("listening {}".format(data_ret))
else:
self.on_data(packet)
@ -478,19 +479,18 @@ class AlpacaWebsocketApi(WebsocketClient):
stream_ret = data['stream']
data_ret = data['data']
if(stream_ret == "account_updates"):
#handle account
print("!!!!!!!@@@@@@@@ account_udpate is {}".format(data))
account = AccountData(
accountid=data_ret['id'],
balance=float(data_ret['cash']),
frozen=float(data_ret['cash']) - float(data_ret['cash_withdrawable']),
frozen=float(data_ret['cash']) -
float(data_ret['cash_withdrawable']),
gateway_name=self.gateway_name
)
self.gateway.on_account(account)
elif(stream_ret == "trade_updates"):
d=data_ret['order']
d = data_ret['order']
order_id = d['id']
order=GLOBAL_ORDER[order_id]
order = GLOBAL_ORDER[order_id]
local_order_id = order.orderid
order.status = STATUS_ALPACA2VT[data_ret['event']]
self.gateway.on_order(order)
@ -522,7 +522,7 @@ class AlpacaWebsocketApi(WebsocketClient):
print("authorization failed!!!")
self.gateway.write_log("authorization failed!!!")
else:
print("??unhandled status: ",data)
print("??unhandled status: ", data)
# ----------------------------------------------------------------------
def on_response(self, data):
@ -540,7 +540,7 @@ class AlpacaWebsocketApi(WebsocketClient):
""""""
print("on_error: ", type, Exception, tb)
sys.stderr.write(
self.exception_detail(exception_type, exception_value, tb )
self.exception_detail(exception_type, exception_value, tb)
)
def subscribe_topic(self):
@ -578,4 +578,4 @@ class AlpacaWebsocketApi(WebsocketClient):
def on_contract(self, d):
""""""
pass
pass