[Mod] complete test of AlpacaGateway

This commit is contained in:
vn.py 2019-06-25 16:25:34 +08:00
parent 1b139f34a1
commit a25051087d
3 changed files with 474 additions and 361 deletions

View File

@ -3,30 +3,31 @@ from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy.gateway.binance import BinanceGateway
from vnpy.gateway.bitmex import BitmexGateway
from vnpy.gateway.futu import FutuGateway
from vnpy.gateway.ib import IbGateway
from vnpy.gateway.ctp import CtpGateway
# from vnpy.gateway.binance import BinanceGateway
# from vnpy.gateway.bitmex import BitmexGateway
# from vnpy.gateway.futu import FutuGateway
# from vnpy.gateway.ib import IbGateway
# from vnpy.gateway.ctp import CtpGateway
# from vnpy.gateway.ctptest import CtptestGateway
from vnpy.gateway.femas import FemasGateway
from vnpy.gateway.tiger import TigerGateway
# from vnpy.gateway.femas import FemasGateway
# from vnpy.gateway.tiger import TigerGateway
# from vnpy.gateway.oes import OesGateway
from vnpy.gateway.okex import OkexGateway
from vnpy.gateway.huobi import HuobiGateway
from vnpy.gateway.bitfinex import BitfinexGateway
from vnpy.gateway.onetoken import OnetokenGateway
from vnpy.gateway.okexf import OkexfGateway
# from vnpy.gateway.okex import OkexGateway
# from vnpy.gateway.huobi import HuobiGateway
# from vnpy.gateway.bitfinex import BitfinexGateway
# from vnpy.gateway.onetoken import OnetokenGateway
# from vnpy.gateway.okexf import OkexfGateway
# from vnpy.gateway.xtp import XtpGateway
from vnpy.gateway.hbdm import HbdmGateway
from vnpy.gateway.tap import TapGateway
# from vnpy.gateway.hbdm import HbdmGateway
# from vnpy.gateway.tap import TapGateway
from vnpy.gateway.alpaca import AlpacaGateway
from vnpy.app.cta_strategy import CtaStrategyApp
from vnpy.app.csv_loader import CsvLoaderApp
from vnpy.app.algo_trading import AlgoTradingApp
from vnpy.app.cta_backtester import CtaBacktesterApp
from vnpy.app.data_recorder import DataRecorderApp
from vnpy.app.risk_manager import RiskManagerApp
# from vnpy.app.cta_strategy import CtaStrategyApp
# from vnpy.app.csv_loader import CsvLoaderApp
# from vnpy.app.algo_trading import AlgoTradingApp
# from vnpy.app.cta_backtester import CtaBacktesterApp
# from vnpy.app.data_recorder import DataRecorderApp
# from vnpy.app.risk_manager import RiskManagerApp
def main():
@ -37,30 +38,31 @@ def main():
main_engine = MainEngine(event_engine)
main_engine.add_gateway(BinanceGateway)
main_engine.add_gateway(CtpGateway)
# main_engine.add_gateway(BinanceGateway)
# main_engine.add_gateway(CtpGateway)
# main_engine.add_gateway(CtptestGateway)
main_engine.add_gateway(FemasGateway)
main_engine.add_gateway(IbGateway)
main_engine.add_gateway(FutuGateway)
main_engine.add_gateway(BitmexGateway)
main_engine.add_gateway(TigerGateway)
# main_engine.add_gateway(FemasGateway)
# main_engine.add_gateway(IbGateway)
# main_engine.add_gateway(FutuGateway)
# main_engine.add_gateway(BitmexGateway)
# main_engine.add_gateway(TigerGateway)
# main_engine.add_gateway(OesGateway)
main_engine.add_gateway(OkexGateway)
main_engine.add_gateway(HuobiGateway)
main_engine.add_gateway(BitfinexGateway)
main_engine.add_gateway(OnetokenGateway)
main_engine.add_gateway(OkexfGateway)
main_engine.add_gateway(HbdmGateway)
# main_engine.add_gateway(OkexGateway)
# main_engine.add_gateway(HuobiGateway)
# main_engine.add_gateway(BitfinexGateway)
# main_engine.add_gateway(OnetokenGateway)
# main_engine.add_gateway(OkexfGateway)
# main_engine.add_gateway(HbdmGateway)
# main_engine.add_gateway(XtpGateway)
main_engine.add_gateway(TapGateway)
# main_engine.add_gateway(TapGateway)
main_engine.add_gateway(AlpacaGateway)
main_engine.add_app(CtaStrategyApp)
main_engine.add_app(CtaBacktesterApp)
main_engine.add_app(CsvLoaderApp)
main_engine.add_app(AlgoTradingApp)
main_engine.add_app(DataRecorderApp)
main_engine.add_app(RiskManagerApp)
# main_engine.add_app(CtaStrategyApp)
# main_engine.add_app(CtaBacktesterApp)
# main_engine.add_app(CsvLoaderApp)
# main_engine.add_app(AlgoTradingApp)
# main_engine.add_app(DataRecorderApp)
# main_engine.add_app(RiskManagerApp)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()

View File

@ -258,7 +258,11 @@ class RestClient(object):
request.response = response
status_code = response.status_code
if status_code // 100 == 2: # 2xx codes are all successful
json_body = response.json()
if status_code == 204:
json_body = None
else:
json_body = response.json()
request.callback(json_body, request)
request.status = RequestStatus.success
else:

View File

@ -1,9 +1,10 @@
# encoding: UTF-8
"""
Author: vigarbuaa
Author: vigarbuaa
"""
import sys
import json
from threading import Lock
from datetime import datetime
from vnpy.api.rest import Request, RestClient
@ -23,7 +24,7 @@ from vnpy.trader.object import (
TickData,
OrderData,
TradeData,
PositionData,
PositionData,
AccountData,
ContractData,
OrderRequest,
@ -31,32 +32,42 @@ from vnpy.trader.object import (
SubscribeRequest,
)
REST_HOST = "https://api.alpaca.markets"
WEBSOCKET_HOST = "wss://api.alpaca.markets/stream" # Market Data
PAPER_REST_HOST = "https://paper-api.alpaca.markets"
PAPER_WEBSOCKET_HOST = "wss://paper-api.alpaca.markets/stream" # Market Data
KEY = ""
SECRET = ""
REST_HOST = "https://api.alpaca.markets" # Live trading
WEBSOCKET_HOST = "wss://api.alpaca.markets/stream"
PAPER_REST_HOST = "https://paper-api.alpaca.markets" # Paper Trading
PAPER_WEBSOCKET_HOST = "wss://paper-api.alpaca.markets/stream"
DATA_REST_HOST = "https://data.alpaca.markets"
STATUS_ALPACA2VT = {
"new": Status.SUBMITTING,
"partial_fill": Status.PARTTRADED,
"fill": Status.ALLTRADED,
"cancelled": Status.CANCELLED,
# "done_for_day": Status.CANCELLED,
"expired": Status.NOTTRADED
"new": Status.NOTTRADED,
"partially_filled": Status.PARTTRADED,
"filled": Status.ALLTRADED,
"canceled": Status.CANCELLED,
"expired": Status.CANCELLED,
"rejected": Status.REJECTED
}
DIRECTION_VT2ALPACA = {Direction.LONG: "buy", Direction.SHORT: "sell"}
DIRECTION_ALPACA2VT = {"buy": Direction.LONG, "sell": Direction.SHORT,
"long": Direction.LONG, "short": Direction.SHORT}
DIRECTION_VT2ALPACA = {
Direction.LONG: "buy",
Direction.SHORT: "sell"
}
DIRECTION_ALPACA2VT = {
"buy": Direction.LONG,
"sell": Direction.SHORT,
"long": Direction.LONG,
"short": Direction.SHORT
}
ORDERTYPE_VT2ALPACA = {
OrderType.LIMIT: "limit",
OrderType.MARKET: "market"
}
ORDERTYPE_ALPACA2VT = {v: k for k, v in ORDERTYPE_VT2ALPACA.items()}
GLOBAL_ORDER = {}
LOCAL_SYS_MAP = {}
class AlpacaGateway(BaseGateway):
@ -65,40 +76,41 @@ class AlpacaGateway(BaseGateway):
"""
default_setting = {
"key": "",
"secret": "",
"session": 3,
"server": ["REAL", "PAPER"],
"proxy_host": "127.0.0.1",
"proxy_port": 1080,
"KEY ID": "",
"Secret Key": "",
"会话数": 10,
"服务器": ["REAL", "PAPER"]
}
exchanges = [Exchange.SMART]
def __init__(self, event_engine):
"""Constructor"""
super(AlpacaGateway, self).__init__(event_engine, "ALPACA")
super().__init__(event_engine, "ALPACA")
self.rest_api = AlpacaRestApi(self)
self.ws_api = AlpacaWebsocketApi(self)
self.order_map = {}
self.data_rest_api = AlpacaDataRestApi(self)
def connect(self, setting: dict):
""""""
key = setting["key"]
secret = setting["secret"]
session = setting["session"]
proxy_host = setting["proxy_host"]
proxy_port = setting["proxy_port"]
env = setting['server']
rest_url = REST_HOST if env == "REAL" else PAPER_REST_HOST
websocket_url = WEBSOCKET_HOST if env == "REAL" else PAPER_WEBSOCKET_HOST
self.rest_api.connect(key, secret, session,
proxy_host, proxy_port, rest_url)
self.ws_api.connect(key, secret, proxy_host, proxy_port, websocket_url)
key = setting["KEY ID"]
secret = setting["Secret Key"]
session = setting["会话数"]
server = setting["服务器"]
rest_url = REST_HOST if server == "REAL" else PAPER_REST_HOST
websocket_url = WEBSOCKET_HOST if server == "REAL" else PAPER_WEBSOCKET_HOST
self.rest_api.connect(key, secret, session, rest_url)
self.data_rest_api.connect(key, secret, session)
self.ws_api.connect(key, secret, websocket_url)
self.init_query()
def subscribe(self, req: SubscribeRequest):
""""""
self.ws_api.subscribe(req)
self.data_rest_api.subscribe(req)
def send_order(self, req: OrderRequest):
""""""
@ -119,6 +131,7 @@ class AlpacaGateway(BaseGateway):
def close(self):
""""""
self.rest_api.stop()
self.data_rest_api.stop()
self.ws_api.stop()
def init_query(self):
@ -128,6 +141,8 @@ class AlpacaGateway(BaseGateway):
def process_timer_event(self, event: Event):
""""""
self.data_rest_api.query_bar()
self.count += 1
if self.count < 5:
return
@ -142,9 +157,9 @@ class AlpacaRestApi(RestClient):
Alpaca REST API
"""
def __init__(self, gateway: BaseGateway):
def __init__(self, gateway: AlpacaGateway):
""""""
super(AlpacaRestApi, self).__init__()
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
@ -154,46 +169,10 @@ class AlpacaRestApi(RestClient):
self.order_count = 1_000_000
self.order_count_lock = Lock()
self.connect_time = 0
self.order_dict = {}
def query_account(self):
path = f"/v1/account"
self.add_request(
method="GET",
path=path,
callback=self.on_query_account
)
def on_query_account(self, data, request):
account = AccountData(
accountid=data['id'],
balance=float(data['cash']),
frozen=float(data['cash']) - float(data['buying_power']),
gateway_name=self.gateway_name
)
self.gateway.on_account(account)
def query_position(self):
path = f"/v1/positions"
self.add_request(
method="GET",
path=path,
callback=self.on_query_position
)
def on_query_position(self, data, request):
for d in data:
position = PositionData(
symbol=d['symbol'],
exchange=Exchange.ALPACA,
direction=DIRECTION_ALPACA2VT[d['side']],
volume=d['qty'],
price=round(d['avg_entry_price'], 3),
pnl=d['unrealized_pl'],
gateway_name=self.gateway_name,
)
self.gateway.on_position(position)
self.cancel_reqs = {}
def sign(self, request):
"""
@ -202,60 +181,226 @@ class AlpacaRestApi(RestClient):
headers = {
"APCA-API-KEY-ID": self.key,
"APCA-API-SECRET-KEY": self.secret,
'Content-Type': 'application/json'
"Content-Type": "application/json"
}
request.headers = headers
request.allow_redirects = False
request.data = json.dumps(request.data)
return request
def _new_order_id(self):
with self.order_count_lock:
self.order_count += 1
return self.order_count
def connect(
self,
key: str,
secret: str,
session_num: int,
proxy_host: str,
proxy_port: int,
url: str,
):
"""
Initialize connection to REST server.
Initialize connection to REST server.
"""
self.key = key
self.secret = secret
self.init(url, proxy_host, proxy_port)
self.start(session_num)
self.connect_time = (
int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count
)
self.gateway.write_log("ALPACA REST API启动成功")
self.init(url)
self.start(session_num)
self.gateway.write_log("REST API启动成功")
self.query_contract()
self.query_account()
self.query_position()
# self.query_contracts()
self.query_order()
def query_contract(self):
""""""
params = {"status": "active"}
self.add_request(
"GET",
"/v2/assets",
params=params,
callback=self.on_query_contract
)
def query_account(self):
""""""
self.add_request(
method="GET",
path="/v2/account",
callback=self.on_query_account
)
def query_position(self):
""""""
self.add_request(
method="GET",
path="/v2/positions",
callback=self.on_query_position
)
def query_order(self):
""""""
params = {
"status": "open"
}
self.add_request(
method="GET",
path="/v2/orders",
params=params,
callback=self.on_query_order
)
def _new_order_id(self):
""""""
with self.order_count_lock:
self.order_count += 1
return self.order_count
def send_order(self, req: OrderRequest):
""""""
local_orderid = str(self.connect_time + self._new_order_id())
data = {
"symbol": req.symbol,
"qty": str(req.volume),
"side": DIRECTION_VT2ALPACA[req.direction],
"type": ORDERTYPE_VT2ALPACA[req.type],
"time_in_force": "day",
"client_order_id": local_orderid
}
if data["type"] == "limit":
data["limit_price"] = str(req.price)
order = req.create_order_data(local_orderid, self.gateway_name)
self.gateway.on_order(order)
self.add_request(
"POST",
"/v2/orders",
callback=self.on_send_order,
data=data,
extra=order,
on_failed=self.on_send_order_failed,
on_error=self.on_send_order_error,
)
return order.vt_orderid
def cancel_order(self, req: CancelRequest):
""""""
sys_orderid = LOCAL_SYS_MAP.get(req.orderid, None)
if not sys_orderid:
self.cancel_reqs[req.orderid] = req
return
path = f"/v2/orders/{sys_orderid}"
self.add_request(
"DELETE",
path,
callback=self.on_cancel_order,
extra=req
)
def on_query_contract(self, data, request: Request):
""""""
for d in data:
symbol = d["symbol"]
contract = ContractData(
symbol=symbol,
exchange=Exchange.SMART,
name=symbol,
product=Product.SPOT,
size=1,
pricetick=0.01,
gateway_name=self.gateway_name
)
self.gateway.on_contract(contract)
self.gateway.write_log("合约信息查询成功")
def on_query_account(self, data, request):
""""""
account = AccountData(
accountid=data["id"],
balance=float(data["equity"]),
gateway_name=self.gateway_name
)
self.gateway.on_account(account)
def on_query_position(self, data, request):
""""""
for d in data:
position = PositionData(
symbol=d["symbol"],
exchange=Exchange.SMART,
direction=DIRECTION_ALPACA2VT[d["side"]],
volume=int(d["qty"]),
price=float(d["avg_entry_price"]),
pnl=float(d["unrealized_pl"]),
gateway_name=self.gateway_name,
)
self.gateway.on_position(position)
def update_order(self, d: dict):
""""""
sys_orderid = d["id"]
local_orderid = d["client_order_id"]
LOCAL_SYS_MAP[local_orderid] = sys_orderid
direction = DIRECTION_ALPACA2VT[d["side"]]
order_type = ORDERTYPE_ALPACA2VT[d["type"]]
order = OrderData(
orderid=local_orderid,
symbol=d["symbol"],
exchange=Exchange.SMART,
price=float(d["limit_price"]),
volume=float(d["qty"]),
type=order_type,
direction=direction,
traded=float(d["filled_qty"]),
status=STATUS_ALPACA2VT.get(d["status"], Status.SUBMITTING),
time=d["created_at"],
gateway_name=self.gateway_name,
)
self.gateway.on_order(order)
def on_query_order(self, data, request):
""""""
for d in data:
self.update_order(d)
self.gateway.write_log("委托信息查询成功")
def on_send_order(self, data, request: Request):
remote_order_id = data['id']
order = request.extra
self.order_dict[order.orderid] = remote_order_id
self.gateway.on_order(order)
GLOBAL_ORDER[remote_order_id] = order
""""""
self.update_order(data)
def on_failed_order(self, status_code: int, request: Request):
order = request.extra
if order.orderid in self.cancel_reqs:
req = self.cancel_reqs.pop(order.orderid)
self.cancel_order(req)
def on_send_order_failed(self, status_code: int, request: Request):
"""
Callback to handle request failed.
"""
order = request.extra
order.status = Status.REJECTED
self.gateway.on_order(order)
msg = f"请求失败,状态码:{status_code},信息:{request.response.text}"
self.gateway.write_log(msg)
def on_error_order(
def on_send_order_error(
self, exception_type: type, exception_value: Exception, tb, request: Request
):
"""
@ -264,167 +409,73 @@ class AlpacaRestApi(RestClient):
order = request.extra
order.status = Status.REJECTED
self.gateway.on_order(order)
msg = f"触发异常,状态码:{exception_type},信息:{exception_value}"
print('debug on_error', msg)
self.gateway.write_log(msg)
sys.stderr.write(
self.exception_detail(exception_type, exception_value, tb, request)
)
def cancel_order(self, req: CancelRequest):
""""""
order_id = req.orderid
remote_order_id = self.order_dict[order_id]
if remote_order_id is None:
print("[error]: can not get remote_order_id from local dict!")
return
path = "/v1/orders/" + str(remote_order_id)
self.add_request(
"DELETE",
path,
callback=self.on_cancel_order,
on_error=self.on_cancel_order_error,
extra=req
)
print("come to cancel_order", order_id)
def on_cancel_order(self, data, request):
"""Websocket will push a new order status"""
pass
def on_cancel_order_error(self, exception_type: type, exception_value: Exception, tb, request: Request):
# Record exception if not ConnectionError
if not issubclass(exception_type, ConnectionError):
self.on_error(exception_type, exception_value, tb, request)
def send_order(self, req: OrderRequest):
orderid = str(self.connect_time + self._new_order_id())
raw_dict = {
"symbol": req.symbol,
"qty": int(req.volume),
"side": DIRECTION_VT2ALPACA[req.direction],
"type": ORDERTYPE_VT2ALPACA[req.type],
"time_in_force": 'day',
}
if raw_dict['type'] == "limit":
raw_dict['limit_price'] = float(req.price)
data = raw_dict
order = req.create_order_data(orderid, self.gateway_name)
print("debug send_order orderBody extra: ", order)
self.add_request(
"POST",
"/v1/orders",
callback=self.on_send_order,
data=data,
extra=order,
on_failed=self.on_failed_order,
on_error=self.on_error_order,
# json_str=data,
)
print("debug send_order ret val : ", order.vt_orderid)
return order.vt_orderid
def on_query_contracts(self, data, request: Request):
for instrument_data in data:
symbol = instrument_data['symbol']
contract = ContractData(
symbol=symbol,
exchange=Exchange.ALPACA,
name=symbol,
product=Product.SPOT,
size=1,
pricetick=0.01,
gateway_name=self.gateway_name
)
self.on_contract(contract)
def on_failed_query_contracts(self, status_code: int, request: Request):
pass
def on_error_query_contracts(self, exception_type: type, exception_value: Exception, tb, request: Request):
pass
def query_contracts(self):
params = {"status": "active"}
self.add_request(
"GET",
"/v1/assets",
params=params,
callback=self.on_query_contracts,
on_failed=self.on_failed_query_contracts,
on_error=self.on_error_query_contracts,
)
""""""
req = request.extra
msg = f"撤单成功,委托号:{req.orderid}"
self.gateway.write_log(msg)
class AlpacaWebsocketApi(WebsocketClient):
""""""
def __init__(self, gateway):
def __init__(self, gateway: AlpacaGateway):
""""""
super(AlpacaWebsocketApi, self).__init__()
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.order_id = 1_000_000
# self.date = int(datetime.now().strftime('%y%m%d%H%M%S')) * self.orderId
self.trade_count = 0
self.key = ""
self.secret = ""
self.callbacks = {
"trade": self.on_tick,
"orderBook10": self.on_depth,
"execution": self.on_trade,
"order": self.on_order,
"position": self.on_position,
"margin": self.on_account,
"instrument": self.on_contract,
}
self.ticks = {}
self.accounts = {}
self.orders = {}
self.trades = set()
self.tickDict = {}
self.bidDict = {}
self.askDict = {}
self.orderLocalDict = {}
self.channelDict = {} # ChannelID : (Channel, Symbol)
self.channels = ["account_updates", "trade_updates"]
def connect(
self, key: str, secret: str, proxy_host: str, proxy_port: int, url: str
self, key: str, secret: str, url: str
):
""""""
self.key = key
self.secret = secret
self.init(url, proxy_host, proxy_port)
self.init(url)
self.start()
def authenticate(self):
""""""
params = {"action": "authenticate", "data": {
"key_id": self.key, "secret_key": self.secret
}}
params = {
"action": "authenticate",
"data": {
"key_id": self.key,
"secret_key": self.secret
}
}
self.send_packet(params)
def on_authenticate(self):
def on_authenticate(self, data):
""""""
params = {"action": "listen", "data": {
"streams": self.channels
}}
if data["status"] == "authorized":
self.gateway.write_log("Websocket API登录成功")
else:
self.gateway.write_log("Websocket API登录失败")
return
params = {
"action": "listen",
"data": {
"streams": ["trade_updates", "account_updates"]
}
}
self.send_packet(params)
def subscribe(self, req: SubscribeRequest):
self.channels.append("A." + req.symbol)
self.channels.append("Q." + req.symbol)
print("debug subscribe: {} ".format(self.channels))
params = {"action": "listen", "data": {
"streams": self.channels
}}
self.send_packet(params)
def on_connected(self):
""""""
self.gateway.write_log("Websocket API连接成功")
@ -436,109 +487,165 @@ class AlpacaWebsocketApi(WebsocketClient):
def on_packet(self, packet: dict):
""""""
print("debug on_packet: ", packet)
if "stream" in packet and "data" in packet:
stream_ret = packet['stream']
data_ret = packet['data']
if(stream_ret == "authorization"):
self.handle_auth(packet)
elif(stream_ret == "listening"):
self.gateway.write_log("listening {}".format(data_ret))
else:
self.on_data(packet)
else:
print("unrecognize msg", packet)
def on_data(self, data):
print("on_data is {}".format(data))
stream_ret = data['stream']
data_ret = data['data']
if(stream_ret == "account_updates"):
frozen = float(data_ret['cash']) - float(data_ret['cash_withdrawable'])
stream = packet["stream"]
data = packet["data"]
account = AccountData(
accountid=data_ret['id'],
balance=float(data_ret['cash']),
frozen=frozen,
gateway_name=self.gateway_name
)
self.gateway.on_account(account)
elif(stream_ret == "trade_updates"):
d = data_ret['order']
order_id = d['id']
order = GLOBAL_ORDER[order_id]
local_order_id = order.orderid
order.status = STATUS_ALPACA2VT[data_ret['event']]
self.gateway.on_order(order)
if (data_ret['event'] == "fill" or data_ret['event'] == "partial_fill"):
trade = TradeData(
symbol=d["symbol"],
exchange=Exchange.ALPACA,
orderid=local_order_id,
tradeid=d['id'],
direction=DIRECTION_ALPACA2VT[d["side"]],
price=d["filled_avg_price"],
volume=d["filled_qty"],
time=data_ret["timestamp"][11:19],
gateway_name=self.gateway_name,
)
self.gateway.on_trade(trade)
else:
pass
def handle_auth(self, data):
# stream_ret = data['stream']
data_ret = data['data']
if (data_ret['status'] == "authorized"):
print("authorization success!!!")
self.gateway.write_log("authorization success!!!")
self.on_authenticate()
elif (data_ret['status'] == "unauthorized"):
print("authorization failed!!!")
self.gateway.write_log("authorization failed!!!")
else:
print("??unhandled status: ", data)
if stream == "authorization":
self.on_authenticate(data)
elif stream == "listening":
streams = data["streams"]
def on_error(self, exception_type: type, exception_value: Exception, tb):
""""""
print("on_error: ", type, Exception, tb)
sys.stderr.write(
self.exception_detail(exception_type, exception_value, tb)
)
if "trade_updates" in streams:
self.gateway.write_log("委托成交推送订阅成功")
def subscribe_topic(self):
pass
if "account_updates" in streams:
self.gateway.write_log("资金变化推送订阅成功")
def on_tick(self, d):
""""""
pass
def on_depth(self, d):
""""""
pass
def on_trade(self, d):
""""""
pass
def generateDateTime(self, s):
"""生成时间"""
dt = datetime.fromtimestamp(s / 1000.0)
time = dt.strftime("%H:%M:%S.%f")
return time
elif stream == "trade_updates":
self.on_order(data)
elif stream == "account_updates":
self.on_account(data)
def on_order(self, data):
""""""
pass
# Update order
d = data["order"]
sys_orderid = d["id"]
local_orderid = d["client_order_id"]
LOCAL_SYS_MAP[local_orderid] = sys_orderid
def on_position(self, d):
""""""
pass
direction = DIRECTION_ALPACA2VT[d["side"]]
order_type = ORDERTYPE_ALPACA2VT[d["type"]]
def on_account(self, d):
""""""
pass
order = OrderData(
orderid=local_orderid,
symbol=d["symbol"],
exchange=Exchange.SMART,
price=float(d["limit_price"]),
volume=float(d["qty"]),
type=order_type,
direction=direction,
traded=float(d["filled_qty"]),
status=STATUS_ALPACA2VT.get(d["status"], Status.SUBMITTING),
time=d["created_at"],
gateway_name=self.gateway_name,
)
self.gateway.on_order(order)
def on_contract(self, d):
# Update Trade
event = data.get("event", "")
if event != "fill":
return
self.trade_count += 1
trade = TradeData(
symbol=order.symbol,
exchange=order.exchange,
orderid=order.orderid,
tradeid=str(self.trade_count),
direction=order.direction,
price=float(data["price"]),
volume=int(data["qty"]),
time=data["timestamp"],
gateway_name=self.gateway_name
)
self.gateway.on_trade(trade)
def on_account(self, data):
""""""
pass
account = AccountData(
accountid=data["id"],
balance=float(data["equity"]),
gateway_name=self.gateway_name
)
self.gateway.on_account(account)
class AlpacaDataRestApi(RestClient):
"""
Alpaca Market Data REST API
"""
def __init__(self, gateway: AlpacaGateway):
""""""
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.key = ""
self.secret = ""
self.symbols = set()
def sign(self, request):
"""
Generate Alpaca signature.
"""
headers = {
"APCA-API-KEY-ID": self.key,
"APCA-API-SECRET-KEY": self.secret,
"Content-Type": "application/json"
}
request.headers = headers
request.allow_redirects = False
return request
def connect(
self,
key: str,
secret: str,
session_num: int
):
"""
Initialize connection to REST server.
"""
self.key = key
self.secret = secret
self.init(DATA_REST_HOST)
self.start(session_num)
self.gateway.write_log("行情REST API启动成功")
def subscribe(self, req: SubscribeRequest):
""""""
self.symbols.add(req.symbol)
def query_bar(self):
""""""
if not self._active or not self.symbols:
return
params = {
"symbols": ",".join(list(self.symbols)),
"limit": 1
}
self.add_request(
method="GET",
path="/v1/bars/1Min",
params=params,
callback=self.on_query_bar
)
def on_query_bar(self, data, request):
""""""
for symbol, buf in data.items():
d = buf[0]
tick = TickData(
symbol=symbol,
exchange=Exchange.SMART,
datetime=datetime.now(),
name=symbol,
open_price=d["o"],
high_price=d["h"],
low_price=d["l"],
last_price=d["c"],
gateway_name=self.gateway_name
)
self.gateway.on_tick(tick)