From 8cf23cea4ba0392fad791e0d02f3f77fb58d1e82 Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Wed, 5 Jun 2019 23:33:53 +0800 Subject: [PATCH 01/20] add query_account and query_positions function , debug OK --- vnpy/gateway/alpaca/__init__.py | 1 + vnpy/gateway/alpaca/alpaca_gateway.py | 426 ++++++++++++++++++++++++++ 2 files changed, 427 insertions(+) create mode 100644 vnpy/gateway/alpaca/__init__.py create mode 100644 vnpy/gateway/alpaca/alpaca_gateway.py diff --git a/vnpy/gateway/alpaca/__init__.py b/vnpy/gateway/alpaca/__init__.py new file mode 100644 index 00000000..16a9a0b9 --- /dev/null +++ b/vnpy/gateway/alpaca/__init__.py @@ -0,0 +1 @@ +from .alpaca_gateway import AlpacaGateway diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py new file mode 100644 index 00000000..d102403b --- /dev/null +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -0,0 +1,426 @@ +# encoding: UTF-8 +""" + Author: vigarbuaa +""" + +import hashlib +import hmac +import sys +import time +from copy import copy +from threading import Lock +from datetime import datetime +from urllib.parse import urlencode +from vnpy.api.rest import Request, RestClient +from vnpy.api.websocket import WebsocketClient + +from vnpy.trader.constant import ( + Direction, + Exchange, + OrderType, + Product, + Status, + Offset, + Interval +) +from vnpy.trader.gateway import BaseGateway +from vnpy.trader.object import ( + TickData, + OrderData, + TradeData, + PositionData, AccountData, + ContractData, + OrderRequest, + CancelRequest, + SubscribeRequest, +) + +REST_HOST = "https://api.alpaca.markets" +PAPER_REST_HOST = "https://paper-api.alpaca.markets" +KEY = "" +SECRET = "" + +STATUS_ALPACA2VT = { + "new": Status.SUBMITTING, + "open": Status.NOTTRADED, + "part_filled": Status.PARTTRADED, + "filled": Status.ALLTRADED, + "cancelled": Status.CANCELLED, + "cancelling": Status.CANCELLED, + "failure": Status.REJECTED, +} + +DIRECTION_VT2ALPACA = {Direction.LONG: "buy", Direction.SHORT: "sell"} +DIRECTION_ALPACA2VT = {"buy": Direction.LONG, "sell": Direction.SHORT, + "long": Direction.LONG, "short": Direction.SHORT} + +ORDERTYPE_VT2ALPACA = { + OrderType.LIMIT: "limit", + OrderType.MARKET: "market" +} +ORDERTYPE_ALPACA2VT = {v: k for k, v in ORDERTYPE_VT2ALPACA.items()} + +class AlpacaGateway(BaseGateway): + """ + VN Trader Gateway for Alpaca connection. + """ + + default_setting = { + "key": "", + "secret": "", + "session": 3, + "proxy_host": "127.0.0.1", + "proxy_port": 1080, + } + + def __init__(self, event_engine): + """Constructor""" + super(AlpacaGateway, self).__init__(event_engine, "ALPACA") + + self.rest_api = AlpacaRestApi(self) + #self.ws_api = AlpacaWebsocketApi(self) + self.order_map = {} + + def connect(self, setting: dict): + """""" + key = setting["key"] + secret = setting["secret"] + session = setting["session"] + proxy_host = setting["proxy_host"] + proxy_port = setting["proxy_port"] + + self.rest_api.connect(key, secret, session, proxy_host, proxy_port) + + #self.ws_api.connect(key, secret, proxy_host, proxy_port) + + def subscribe(self, req: SubscribeRequest): + """""" + #self.ws_api.subscribe(req) + pass + + def send_order(self, req: OrderRequest): + """""" + return self.rest_api.send_order(req) + + def cancel_order(self, req: CancelRequest): + """""" + #self.ws_api.cancel_order(req) + pass + + def query_account(self): + """""" + self.rest_api.query_account() + + def query_position(self): + """""" + self.rest_api.query_position() + + def close(self): + """""" + self.rest_api.stop() + self.ws_api.stop() + + +class AlpacaRestApi(RestClient): + """ + Alpaca REST API + """ + + def __init__(self, gateway: BaseGateway): + """""" + super(AlpacaRestApi, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.key = "" + self.secret = "" + + self.order_count = 1_000_000 + self.order_count_lock = Lock() + self.connect_time = 0 + print( + self.gateway_name, + self.key, + self.secret, + self.order_count, + self.connect_time) + + def query_account(self): + print("call query_account") + path = f"/v1/account" + self.add_request( + method="GET", + path=path, + callback=self.on_query_account + ) + + def on_query_account(self, data, request): + print("debug on_query_account: ", data, request) + account = AccountData( + accountid=data['id'], + balance=float(data['cash']), + frozen=float(data['cash']) - float(data['buying_power']), + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) + + def query_position(self): + print("call query_position") + path = f"/v1/positions" + self.add_request( + method="GET", + path=path, + callback=self.on_query_position + ) + + def on_query_position(self, data, request): + print("debug on_query_position: ", data,"!!!", request) + for d in data: + position = PositionData( + symbol=d['symbol'], + exchange=Exchange.ALPACA, + direction=DIRECTION_ALPACA2VT[d['side']], + volume=d['qty'], + price=d['avg_entry_price'], + pnl=d['unrealized_pl'], + gateway_name=self.gateway_name, + ) + self.gateway.on_position(position) + + + + def sign(self, request): + """ + Generate Alpaca signature. + """ + #msg = request.method + \ + # "/api/v1/{}{}{}".format(path, nonce, request.data) + headers = { + "APCA-API-KEY-ID": self.key, + "APCA-API-SECRET-KEY": self.secret, + } + + request.headers = headers + request.allow_redirects=False + return request + + def _new_order_id(self): + with self.order_count_lock: + self.order_count += 1 + return self.order_count + + def connect( + self, + key: str, + secret: str, + session_num: int, + proxy_host: str, + proxy_port: int, + ): + """ + Initialize connection to REST server. + """ + self.key = key + self.secret = secret + + self.init(PAPER_REST_HOST, proxy_host, proxy_port) + print("rest connect: ", PAPER_REST_HOST, proxy_host, proxy_port) + self.start(session_num) + self.connect_time = ( + int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count + ) + print("rest client connected",self.connect_time) + self.gateway.write_log("ALPACA REST API启动成功") + self.query_account() + self.query_position() + + def on_send_order(self, status_code: int, request: Request): + print("debug on send order: ", status_code, request) + pass + + def on_failed(self, status_code: int, request: Request): + """ + Callback to handle request failed. + """ + msg = f"请求失败,状态码:{status_code},信息:{request.response.text}" + print('debug on_failed', msg) + self.gateway.write_log(msg) + + def on_error( + self, exception_type: type, exception_value: Exception, tb, request: Request + ): + """ + Callback to handler request exception. + """ + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" + print('debug on_error', msg) + self.gateway.write_log(msg) + + sys.stderr.write( + self.exception_detail(exception_type, exception_value, tb, request) + ) + + def send_order(self, req: OrderRequest): + orderid = str(self.connect_time + self._new_order_id()) + data = { + "symbol": req.symbol, + "qty":2, + "side":'buy', + "type":'limit', + "time_in_force":'opg', + "limit_price":20.50 +# "side": DIRECTION_VT2BITMEX[req.direction], +# "ordType": ORDERTYPE_VT2BITMEX[req.type], +# "orderQty": int(req.volume), +# "clOrdID": orderid, + } + order = req.create_order_data(orderid, self.gateway_name) + print("debug detail order: ",order) + self.add_request( + "POST", + "/v1/orders", + callback=self.on_send_order, + data=data, + extra=order, + on_failed=self.on_failed, + on_error=self.on_error, + ) + self.gateway.on_order(order) + return order.vt_orderid + + +class AlpacaWebsocketApi(WebsocketClient): + """""" + + def __init__(self, gateway): + """""" + super(AlpacaWebsocketApi, self).__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + self.order_id = 1_000_000 + # self.date = int(datetime.now().strftime('%y%m%d%H%M%S')) * self.orderId + self.key = "" + self.secret = "" + + self.callbacks = { + "trade": self.on_tick, + "orderBook10": self.on_depth, + "execution": self.on_trade, + "order": self.on_order, + "position": self.on_position, + "margin": self.on_account, + "instrument": self.on_contract, + } + + self.ticks = {} + self.accounts = {} + self.orders = {} + self.trades = set() + self.tickDict = {} + self.bidDict = {} + self.askDict = {} + self.orderLocalDict = {} + self.channelDict = {} # ChannelID : (Channel, Symbol) + + def connect( + self, key: str, secret: str, proxy_host: str, proxy_port: int + ): + """""" + self.key = key + self.secret = secret.encode() + + self.init(WEBSOCKET_HOST, proxy_host, proxy_port) + + self.start() + + def subscribe(self, req: SubscribeRequest): + pass + + def send_order(self, req: OrderRequest): + pass + + # ---------------------------------------------------------------------- + def cancel_order(self, req: CancelRequest): + """""" + pass + + def on_connected(self): + """""" + self.gateway.write_log("Websocket API连接成功") + self.authenticate() + + def on_disconnected(self): + """""" + self.gateway.write_log("Websocket API连接断开") + + # need debug 20190404 + def on_packet(self, packet: dict): + """""" + print("debug on_packet: " , packet) + + # ---------------------------------------------------------------------- + def on_response(self, data): + """""" + pass + # ---------------------------------------------------------------------- + def on_update(self, data): + """""" + pass + + # ---------------------------------------------------------------------- + def on_wallet(self, data): + """""" + pass + + # ---------------------------------------------------------------------- + def on_trade_update(self, data): + """""" + pass + + def on_error(self, exception_type: type, exception_value: Exception, tb): + """""" + print("on_error: " ,type,Exception,tb) + pass + + # debug OK , 0405 + def authenticate(self): + pass + + def subscribe_topic(self): + pass + + def on_tick(self, d): + """""" + pass + + def on_depth(self, d): + """""" + pass + + def on_trade(self, d): + """""" + pass + + def generateDateTime(self, s): + """生成时间""" + dt = datetime.fromtimestamp(s / 1000.0) + time = dt.strftime("%H:%M:%S.%f") + return time + + def on_order(self, data): + """""" + pass + + def on_position(self, d): + """""" + pass + + def on_account(self, d): + """""" + pass + + def on_contract(self, d): + """""" + pass \ No newline at end of file From 21cc72ac3f43deab545f0bbcae70f357c16aca82 Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Thu, 6 Jun 2019 23:13:39 +0800 Subject: [PATCH 02/20] add send_order function 1. add send_order function 2. post by json param, need to modify rest_client.py to support json field --- vnpy/api/rest/rest_client.py | 11 ++++++++++- vnpy/gateway/alpaca/alpaca_gateway.py | 16 ++++++---------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/vnpy/api/rest/rest_client.py b/vnpy/api/rest/rest_client.py index 7bd4568e..ead3992f 100644 --- a/vnpy/api/rest/rest_client.py +++ b/vnpy/api/rest/rest_client.py @@ -49,6 +49,10 @@ class Request(object): self.response = None self.status = RequestStatus.ready + self.json=None + + def set_json(self, json_str: dict): + self.json=json_str def __str__(self): if self.response is None: @@ -61,6 +65,7 @@ class Request(object): "headers: {}\n" "params: {}\n" "data: {}\n" + "json: {}\n" "response:" "{}\n".format( self.method, @@ -70,6 +75,7 @@ class Request(object): self.headers, self.params, self.data, + self.json, "" if self.response is None else self.response.text, ) ) @@ -145,6 +151,7 @@ class RestClient(object): on_failed: Callable = None, on_error: Callable = None, extra: Any = None, + json_str: dict = None, ): """ Add a new request. @@ -170,6 +177,8 @@ class RestClient(object): on_error, extra, ) + if json_str is not None: + request.set_json(json_str) self._queue.put(request) return request @@ -253,10 +262,10 @@ class RestClient(object): headers=request.headers, params=request.params, data=request.data, + json=request.json, proxies=self.proxies, ) request.response = response - status_code = response.status_code if status_code // 100 == 2: # 2xx都算成功,尽管交易所都用200 jsonBody = response.json() diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index d102403b..3d938b44 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -265,26 +265,22 @@ class AlpacaRestApi(RestClient): orderid = str(self.connect_time + self._new_order_id()) data = { "symbol": req.symbol, - "qty":2, - "side":'buy', - "type":'limit', + "qty": int(req.volume), + "side": DIRECTION_VT2ALPACA[req.direction], + "type": ORDERTYPE_VT2ALPACA[req.type], "time_in_force":'opg', - "limit_price":20.50 -# "side": DIRECTION_VT2BITMEX[req.direction], -# "ordType": ORDERTYPE_VT2BITMEX[req.type], -# "orderQty": int(req.volume), -# "clOrdID": orderid, + "limit_price": float(req.price) } order = req.create_order_data(orderid, self.gateway_name) - print("debug detail order: ",order) self.add_request( "POST", "/v1/orders", callback=self.on_send_order, - data=data, + #data=data, extra=order, on_failed=self.on_failed, on_error=self.on_error, + json_str=data, ) self.gateway.on_order(order) return order.vt_orderid From 4e30095ac37870e5f00617b9c89bab2da8dd622e Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Thu, 6 Jun 2019 23:33:17 +0800 Subject: [PATCH 03/20] modify send_order func to fill limit price order --- vnpy/gateway/alpaca/alpaca_gateway.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index 3d938b44..aa58dc98 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -263,14 +263,17 @@ class AlpacaRestApi(RestClient): def send_order(self, req: OrderRequest): orderid = str(self.connect_time + self._new_order_id()) - data = { + raw_dict={ "symbol": req.symbol, "qty": int(req.volume), "side": DIRECTION_VT2ALPACA[req.direction], "type": ORDERTYPE_VT2ALPACA[req.type], - "time_in_force":'opg', - "limit_price": float(req.price) + "time_in_force":'day', } + if raw_dict['type'] == "limit": + raw_dict['limit_price'] = float(req.price) + + data = raw_dict order = req.create_order_data(orderid, self.gateway_name) self.add_request( "POST", From 9fbac9dfba89dc39b1d387a7e3b8cb12736fa3bd Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Sat, 8 Jun 2019 06:40:09 +0800 Subject: [PATCH 04/20] format --- vnpy/gateway/alpaca/alpaca_gateway.py | 71 ++++++++++++++------------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index aa58dc98..8b2b5aa1 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -29,7 +29,7 @@ from vnpy.trader.object import ( OrderData, TradeData, PositionData, AccountData, - ContractData, + ContractData, OrderRequest, CancelRequest, SubscribeRequest, @@ -60,6 +60,7 @@ ORDERTYPE_VT2ALPACA = { } ORDERTYPE_ALPACA2VT = {v: k for k, v in ORDERTYPE_VT2ALPACA.items()} + class AlpacaGateway(BaseGateway): """ VN Trader Gateway for Alpaca connection. @@ -95,7 +96,7 @@ class AlpacaGateway(BaseGateway): def subscribe(self, req: SubscribeRequest): """""" - #self.ws_api.subscribe(req) + # self.ws_api.subscribe(req) pass def send_order(self, req: OrderRequest): @@ -104,8 +105,7 @@ class AlpacaGateway(BaseGateway): def cancel_order(self, req: CancelRequest): """""" - #self.ws_api.cancel_order(req) - pass + self.rest_api.cancel_order(req) def query_account(self): """""" @@ -139,12 +139,7 @@ class AlpacaRestApi(RestClient): self.order_count = 1_000_000 self.order_count_lock = Lock() self.connect_time = 0 - print( - self.gateway_name, - self.key, - self.secret, - self.order_count, - self.connect_time) + self.order_dict={} def query_account(self): print("call query_account") @@ -154,7 +149,7 @@ class AlpacaRestApi(RestClient): path=path, callback=self.on_query_account ) - + def on_query_account(self, data, request): print("debug on_query_account: ", data, request) account = AccountData( @@ -164,7 +159,7 @@ class AlpacaRestApi(RestClient): gateway_name=self.gateway_name ) self.gateway.on_account(account) - + def query_position(self): print("call query_position") path = f"/v1/positions" @@ -175,7 +170,7 @@ class AlpacaRestApi(RestClient): ) def on_query_position(self, data, request): - print("debug on_query_position: ", data,"!!!", request) + print("debug on_query_position: ", data, "!!!", request) for d in data: position = PositionData( symbol=d['symbol'], @@ -188,28 +183,24 @@ class AlpacaRestApi(RestClient): ) self.gateway.on_position(position) - - def sign(self, request): """ Generate Alpaca signature. """ - #msg = request.method + \ - # "/api/v1/{}{}{}".format(path, nonce, request.data) headers = { "APCA-API-KEY-ID": self.key, "APCA-API-SECRET-KEY": self.secret, } request.headers = headers - request.allow_redirects=False + request.allow_redirects = False return request def _new_order_id(self): with self.order_count_lock: self.order_count += 1 return self.order_count - + def connect( self, key: str, @@ -230,29 +221,36 @@ class AlpacaRestApi(RestClient): self.connect_time = ( int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count ) - print("rest client connected",self.connect_time) + print("rest client connected", self.connect_time) self.gateway.write_log("ALPACA REST API启动成功") self.query_account() self.query_position() def on_send_order(self, status_code: int, request: Request): print("debug on send order: ", status_code, request) - pass + order = request.extra + self.gateway.on_order(order) - def on_failed(self, status_code: int, request: Request): + def on_failed_order(self, status_code: int, request: Request): """ Callback to handle request failed. """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) msg = f"请求失败,状态码:{status_code},信息:{request.response.text}" print('debug on_failed', msg) self.gateway.write_log(msg) - def on_error( + def on_error_order( self, exception_type: type, exception_value: Exception, tb, request: Request ): """ Callback to handler request exception. """ + order = request.extra + order.status = Status.REJECTED + self.gateway.on_order(order) msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" print('debug on_error', msg) self.gateway.write_log(msg) @@ -261,14 +259,19 @@ class AlpacaRestApi(RestClient): self.exception_detail(exception_type, exception_value, tb, request) ) + def cancel_order(self, req: CancelRequest): + """""" + order_id = req.orderid + print("come to cancel_order", order_id) + def send_order(self, req: OrderRequest): orderid = str(self.connect_time + self._new_order_id()) - raw_dict={ + raw_dict = { "symbol": req.symbol, "qty": int(req.volume), "side": DIRECTION_VT2ALPACA[req.direction], "type": ORDERTYPE_VT2ALPACA[req.type], - "time_in_force":'day', + "time_in_force": 'day', } if raw_dict['type'] == "limit": raw_dict['limit_price'] = float(req.price) @@ -279,15 +282,14 @@ class AlpacaRestApi(RestClient): "POST", "/v1/orders", callback=self.on_send_order, - #data=data, + # data=data, extra=order, - on_failed=self.on_failed, - on_error=self.on_error, + on_failed=self.on_failed_order, + on_error=self.on_error_order, json_str=data, ) - self.gateway.on_order(order) return order.vt_orderid - + class AlpacaWebsocketApi(WebsocketClient): """""" @@ -329,9 +331,7 @@ class AlpacaWebsocketApi(WebsocketClient): """""" self.key = key self.secret = secret.encode() - self.init(WEBSOCKET_HOST, proxy_host, proxy_port) - self.start() def subscribe(self, req: SubscribeRequest): @@ -357,13 +357,14 @@ class AlpacaWebsocketApi(WebsocketClient): # need debug 20190404 def on_packet(self, packet: dict): """""" - print("debug on_packet: " , packet) + print("debug on_packet: ", packet) # ---------------------------------------------------------------------- def on_response(self, data): """""" pass # ---------------------------------------------------------------------- + def on_update(self, data): """""" pass @@ -380,7 +381,7 @@ class AlpacaWebsocketApi(WebsocketClient): def on_error(self, exception_type: type, exception_value: Exception, tb): """""" - print("on_error: " ,type,Exception,tb) + print("on_error: ", type, Exception, tb) pass # debug OK , 0405 @@ -422,4 +423,4 @@ class AlpacaWebsocketApi(WebsocketClient): def on_contract(self, d): """""" - pass \ No newline at end of file + pass From 5e5137f9abac82e3826703e0c9fb5bc4b4b93f84 Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Sat, 8 Jun 2019 07:17:25 +0800 Subject: [PATCH 05/20] add order_dict for cancel_order function order_dict {local_order_id: remote_order_id} --- vnpy/gateway/alpaca/alpaca_gateway.py | 28 +++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index 8b2b5aa1..e82caf4f 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -226,9 +226,13 @@ class AlpacaRestApi(RestClient): self.query_account() self.query_position() - def on_send_order(self, status_code: int, request: Request): - print("debug on send order: ", status_code, request) + def on_send_order(self, data, request: Request): + print("debug on send order: ", data, request) order = request.extra + lcoal_order_id= order.orderId + remote_order_id = data['id'] + self.order_dict[local_order_id]=remote_order_id + print('debug on_send_order: ', local_order_id, remote_order_id) self.gateway.on_order(order) def on_failed_order(self, status_code: int, request: Request): @@ -259,11 +263,31 @@ class AlpacaRestApi(RestClient): self.exception_detail(exception_type, exception_value, tb, request) ) + # need debug 0608 def cancel_order(self, req: CancelRequest): """""" order_id = req.orderid + remote_order_id = self.order_dict['order_id'] + if remote_order_id is None: + print("[error]: can not get remote_order_id from local dict!") + return + path="/v1/orders/"+str(remote_order_id) + self.add_request( + "DELETE", + path, + callback=self.on_cancel_order, + on_error=self.on_cancel_order_error, + extra=req + ) print("come to cancel_order", order_id) + def on_cancel_order(self, data, request): + """Websocket will push a new order status""" + pass + + def on_cancel_order_error(self, data, request): + pass + def send_order(self, req: OrderRequest): orderid = str(self.connect_time + self._new_order_id()) raw_dict = { From 7be2a90e4c76e1cca0864cc3b770ac184b5b06f2 Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Sat, 15 Jun 2019 11:19:15 +0800 Subject: [PATCH 06/20] add websocket into alpaca --- vnpy/gateway/alpaca/alpaca_gateway.py | 97 +++++++++++++++++++++------ 1 file changed, 77 insertions(+), 20 deletions(-) diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index e82caf4f..528457e5 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -37,6 +37,7 @@ from vnpy.trader.object import ( REST_HOST = "https://api.alpaca.markets" PAPER_REST_HOST = "https://paper-api.alpaca.markets" +WEBSOCKET_HOST = "wss://paper-api.alpaca.markets/stream" # Market Data KEY = "" SECRET = "" @@ -79,7 +80,7 @@ class AlpacaGateway(BaseGateway): super(AlpacaGateway, self).__init__(event_engine, "ALPACA") self.rest_api = AlpacaRestApi(self) - #self.ws_api = AlpacaWebsocketApi(self) + self.ws_api = AlpacaWebsocketApi(self) self.order_map = {} def connect(self, setting: dict): @@ -92,7 +93,7 @@ class AlpacaGateway(BaseGateway): self.rest_api.connect(key, secret, session, proxy_host, proxy_port) - #self.ws_api.connect(key, secret, proxy_host, proxy_port) + self.ws_api.connect(key, secret, proxy_host, proxy_port) def subscribe(self, req: SubscribeRequest): """""" @@ -139,7 +140,7 @@ class AlpacaRestApi(RestClient): self.order_count = 1_000_000 self.order_count_lock = Lock() self.connect_time = 0 - self.order_dict={} + self.order_list = [] def query_account(self): print("call query_account") @@ -151,7 +152,6 @@ class AlpacaRestApi(RestClient): ) def on_query_account(self, data, request): - print("debug on_query_account: ", data, request) account = AccountData( accountid=data['id'], balance=float(data['cash']), @@ -170,7 +170,6 @@ class AlpacaRestApi(RestClient): ) def on_query_position(self, data, request): - print("debug on_query_position: ", data, "!!!", request) for d in data: position = PositionData( symbol=d['symbol'], @@ -225,14 +224,14 @@ class AlpacaRestApi(RestClient): self.gateway.write_log("ALPACA REST API启动成功") self.query_account() self.query_position() + #self.query_contracts() - def on_send_order(self, data, request: Request): - print("debug on send order: ", data, request) - order = request.extra - lcoal_order_id= order.orderId + def on_send_order(self, data, request ): + print("debug on_send_order data is {} request is {} ---".format( data, request)) remote_order_id = data['id'] - self.order_dict[local_order_id]=remote_order_id - print('debug on_send_order: ', local_order_id, remote_order_id) + order = request.extra + self.order_list.append(remote_order_id) + print('debug on_send_order: local is {} ---'.format(self.order_list)) self.gateway.on_order(order) def on_failed_order(self, status_code: int, request: Request): @@ -266,12 +265,13 @@ class AlpacaRestApi(RestClient): # need debug 0608 def cancel_order(self, req: CancelRequest): """""" + """ order_id = req.orderid remote_order_id = self.order_dict['order_id'] if remote_order_id is None: print("[error]: can not get remote_order_id from local dict!") return - path="/v1/orders/"+str(remote_order_id) + path = "/v1/orders/" + str(remote_order_id) self.add_request( "DELETE", path, @@ -280,11 +280,13 @@ class AlpacaRestApi(RestClient): extra=req ) print("come to cancel_order", order_id) +""" + pass def on_cancel_order(self, data, request): """Websocket will push a new order status""" pass - + def on_cancel_order_error(self, data, request): pass @@ -314,6 +316,38 @@ class AlpacaRestApi(RestClient): ) return order.vt_orderid + def on_query_contracts(self, data, request: Request): + for instrument_data in data: + symbol = instrument_data['symbol'] + contract = ContractData( + symbol=symbol, + exchange=Exchange.ALPACA, + name=symbol, + product=Product.SPOT, + size=1, # need debug + pricetick=0.01, # need debug + gateway_name=self.gateway_name + ) + self.on_contract(contract) + + def on_failed_query_contracts(self, status_code: int, request: Request): + pass + + def on_error_query_contracts(self, exception_type: type, exception_value: Exception, tb, request: Request): + pass + + def query_contracts(self): + params = {"status": "active"} + self.add_request( + "GET", + "/v1/assets", + params = params, + callback=self.on_query_contracts, + on_failed=self.on_failed_query_contracts, + on_error=self.on_error_query_contracts, + # data=data, + ) + class AlpacaWebsocketApi(WebsocketClient): """""" @@ -354,10 +388,26 @@ class AlpacaWebsocketApi(WebsocketClient): ): """""" self.key = key - self.secret = secret.encode() + self.secret = secret self.init(WEBSOCKET_HOST, proxy_host, proxy_port) self.start() + def login(self): + """""" + params={"action":"authenticate", "data": { + "key_id":self.key,"secret_key":self.secret + }} + print("string is {} ===".format(params)) + self.send_packet(params) + + def on_login(self, packet): + """""" + print("websocket authenticate success!!! msg: ", packet) + params={"action":"listen", "data": { + "streams":["account_updates", "trade_updates"] + }} + self.send_packet(params) + def subscribe(self, req: SubscribeRequest): pass @@ -372,7 +422,7 @@ class AlpacaWebsocketApi(WebsocketClient): def on_connected(self): """""" self.gateway.write_log("Websocket API连接成功") - self.authenticate() + self.login() def on_disconnected(self): """""" @@ -382,6 +432,17 @@ class AlpacaWebsocketApi(WebsocketClient): def on_packet(self, packet: dict): """""" print("debug on_packet: ", packet) + if "ping" in packet: + # has ping??? + req = {"pong": packet["ping"]} + print("ping response: ", req) + self.send_packet(req) + elif "stream" in packet and "data" in packet: + # on_data + pass + else: + print("unrecognize mesg", packet) + #parse authenticate/trade/other data here # ---------------------------------------------------------------------- def on_response(self, data): @@ -408,10 +469,6 @@ class AlpacaWebsocketApi(WebsocketClient): print("on_error: ", type, Exception, tb) pass - # debug OK , 0405 - def authenticate(self): - pass - def subscribe_topic(self): pass @@ -447,4 +504,4 @@ class AlpacaWebsocketApi(WebsocketClient): def on_contract(self, d): """""" - pass + pass \ No newline at end of file From e8e2e1121d91bac59c710bf390d2481dc3b73d63 Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Sat, 15 Jun 2019 13:45:50 +0800 Subject: [PATCH 07/20] websocket stream and auth and listening stream --- vnpy/gateway/alpaca/alpaca_gateway.py | 53 +++++++++++++++++++-------- 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index 528457e5..a033a384 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -392,7 +392,7 @@ class AlpacaWebsocketApi(WebsocketClient): self.init(WEBSOCKET_HOST, proxy_host, proxy_port) self.start() - def login(self): + def authenticate(self): """""" params={"action":"authenticate", "data": { "key_id":self.key,"secret_key":self.secret @@ -400,12 +400,13 @@ class AlpacaWebsocketApi(WebsocketClient): print("string is {} ===".format(params)) self.send_packet(params) - def on_login(self, packet): + def on_authenticate(self): """""" - print("websocket authenticate success!!! msg: ", packet) + print("websocket authenticate success!!! msg: " ) params={"action":"listen", "data": { "streams":["account_updates", "trade_updates"] }} + print("on_authenticate func", params) self.send_packet(params) def subscribe(self, req: SubscribeRequest): @@ -422,27 +423,45 @@ class AlpacaWebsocketApi(WebsocketClient): def on_connected(self): """""" self.gateway.write_log("Websocket API连接成功") - self.login() + self.authenticate() def on_disconnected(self): """""" self.gateway.write_log("Websocket API连接断开") - # need debug 20190404 def on_packet(self, packet: dict): """""" print("debug on_packet: ", packet) - if "ping" in packet: - # has ping??? - req = {"pong": packet["ping"]} - print("ping response: ", req) - self.send_packet(req) - elif "stream" in packet and "data" in packet: - # on_data - pass + if "stream" in packet and "data" in packet: + stream_ret = packet['stream'] + data_ret = packet['data'] + if(stream_ret == "authorization"): + self.handle_auth(packet) + elif(stream_ret == "listening"): + self.gateway.write_log("listening {}".format(data_ret)) + else: + self.ondata(packet) else: - print("unrecognize mesg", packet) - #parse authenticate/trade/other data here + print("unrecognize msg", packet) + + # ---------------------------------------------------------------------- + def on_data(self, data): + pass + + # ---------------------------------------------------------------------- + def handle_auth(self, data): + stream_ret = data['stream'] + data_ret = data['data'] + print("stream is {}, data is {}".format(stream_ret,data_ret)) + if (data_ret['status'] == "authorized"): + print("authorization success!!!") + self.gateway.write_log("authorization success!!!") + self.on_authenticate() + elif (data_ret['status'] == "unauthorized"): + print("authorization failed!!!") + self.gateway.write_log("authorization failed!!!") + else: + print("??unhandled status: ",data) # ---------------------------------------------------------------------- def on_response(self, data): @@ -467,7 +486,9 @@ class AlpacaWebsocketApi(WebsocketClient): def on_error(self, exception_type: type, exception_value: Exception, tb): """""" print("on_error: ", type, Exception, tb) - pass + sys.stderr.write( + self.exception_detail(exception_type, exception_value, tb ) + ) def subscribe_topic(self): pass From 1e0248ce5cc858e8a44e93d5b28a242155d938de Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Sat, 15 Jun 2019 15:07:07 +0800 Subject: [PATCH 08/20] add swicht {real, paper} url function --- vnpy/gateway/alpaca/alpaca_gateway.py | 69 +++++++++++++++++++-------- 1 file changed, 49 insertions(+), 20 deletions(-) diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index a033a384..20bc75f6 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -36,19 +36,19 @@ from vnpy.trader.object import ( ) REST_HOST = "https://api.alpaca.markets" +WEBSOCKET_HOST = "wss://api.alpaca.markets/stream" # Market Data PAPER_REST_HOST = "https://paper-api.alpaca.markets" -WEBSOCKET_HOST = "wss://paper-api.alpaca.markets/stream" # Market Data +PAPER_WEBSOCKET_HOST = "wss://paper-api.alpaca.markets/stream" # Market Data KEY = "" SECRET = "" STATUS_ALPACA2VT = { "new": Status.SUBMITTING, - "open": Status.NOTTRADED, - "part_filled": Status.PARTTRADED, - "filled": Status.ALLTRADED, + "partial_fill": Status.PARTTRADED, + "fill": Status.ALLTRADED, "cancelled": Status.CANCELLED, - "cancelling": Status.CANCELLED, - "failure": Status.REJECTED, + #"done_for_day": Status.CANCELLED, + "expired": Status.NOTTRADED } DIRECTION_VT2ALPACA = {Direction.LONG: "buy", Direction.SHORT: "sell"} @@ -71,6 +71,7 @@ class AlpacaGateway(BaseGateway): "key": "", "secret": "", "session": 3, + "服务器": ["REAL", "PAPER"], "proxy_host": "127.0.0.1", "proxy_port": 1080, } @@ -85,15 +86,17 @@ class AlpacaGateway(BaseGateway): def connect(self, setting: dict): """""" + print("[debug] gateway setting: ",setting) key = setting["key"] secret = setting["secret"] session = setting["session"] proxy_host = setting["proxy_host"] proxy_port = setting["proxy_port"] - - self.rest_api.connect(key, secret, session, proxy_host, proxy_port) - - self.ws_api.connect(key, secret, proxy_host, proxy_port) + env=setting['服务器'] + rest_url = REST_HOST if env == "REAL" else PAPER_REST_HOST + websocket_url = WEBSOCKET_HOST if env == "REAL" else PAPER_WEBSOCKET_HOST + self.rest_api.connect(key, secret, session, proxy_host, proxy_port,rest_url) + self.ws_api.connect(key, secret, proxy_host, proxy_port,websocket_url) def subscribe(self, req: SubscribeRequest): """""" @@ -152,6 +155,7 @@ class AlpacaRestApi(RestClient): ) def on_query_account(self, data, request): + print("on_query_account debug: " , data) account = AccountData( accountid=data['id'], balance=float(data['cash']), @@ -207,15 +211,15 @@ class AlpacaRestApi(RestClient): session_num: int, proxy_host: str, proxy_port: int, + url: str, ): """ Initialize connection to REST server. """ self.key = key self.secret = secret - - self.init(PAPER_REST_HOST, proxy_host, proxy_port) - print("rest connect: ", PAPER_REST_HOST, proxy_host, proxy_port) + self.init(url, proxy_host, proxy_port) + print("rest connect: ", url, proxy_host, proxy_port) self.start(session_num) self.connect_time = ( int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count @@ -227,7 +231,6 @@ class AlpacaRestApi(RestClient): #self.query_contracts() def on_send_order(self, data, request ): - print("debug on_send_order data is {} request is {} ---".format( data, request)) remote_order_id = data['id'] order = request.extra self.order_list.append(remote_order_id) @@ -384,12 +387,12 @@ class AlpacaWebsocketApi(WebsocketClient): self.channelDict = {} # ChannelID : (Channel, Symbol) def connect( - self, key: str, secret: str, proxy_host: str, proxy_port: int + self, key: str, secret: str, proxy_host: str, proxy_port: int,url:str ): """""" self.key = key self.secret = secret - self.init(WEBSOCKET_HOST, proxy_host, proxy_port) + self.init(url, proxy_host, proxy_port) self.start() def authenticate(self): @@ -397,16 +400,13 @@ class AlpacaWebsocketApi(WebsocketClient): params={"action":"authenticate", "data": { "key_id":self.key,"secret_key":self.secret }} - print("string is {} ===".format(params)) self.send_packet(params) def on_authenticate(self): """""" - print("websocket authenticate success!!! msg: " ) params={"action":"listen", "data": { "streams":["account_updates", "trade_updates"] }} - print("on_authenticate func", params) self.send_packet(params) def subscribe(self, req: SubscribeRequest): @@ -446,7 +446,36 @@ class AlpacaWebsocketApi(WebsocketClient): # ---------------------------------------------------------------------- def on_data(self, data): - pass + print("on_data is {}".format(data)) + stream_ret = data['stream'] + data_ret = data['data'] + if(stream_ret == "account_updates"): + #handle account + account = AccountData( + accountid=data_ret['id'], + balance=float(data_ret['cash']), + frozen=float(data_ret['cash']) - float(data_ret['cash_withdrawable']), + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) + elif(stream_ret == "trade_updates"): + d=data_ret['order'] + trade = TradeData( + symbol=d["symbol"], + exchange=Exchange.ALPACA, + orderid=d['id'], + tradeid=None, + direction=DIRECTION_ALPACA2VT[d["side"]], + price=data_ret["price"], + volume=data_ret["qty"], + time=data_ret["timestamp"][11:19], + gateway_name=self.gateway_name, + ) + self.gateway.on_trade(trade) + #self.gateway.on_order(order) # udpate order status + else: + pass + # ---------------------------------------------------------------------- def handle_auth(self, data): From eb691525fc78cd00408cb66bea06df440a60f237 Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Tue, 18 Jun 2019 23:13:41 +0800 Subject: [PATCH 09/20] cancel order successful send cancel req by rest_client receive reponse by websocket ... need to debug --- vnpy/gateway/alpaca/alpaca_gateway.py | 50 ++++++++++++++++----------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index 20bc75f6..02d8709b 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -143,7 +143,7 @@ class AlpacaRestApi(RestClient): self.order_count = 1_000_000 self.order_count_lock = Lock() self.connect_time = 0 - self.order_list = [] + self.order_dict ={} def query_account(self): print("call query_account") @@ -230,11 +230,14 @@ class AlpacaRestApi(RestClient): self.query_position() #self.query_contracts() - def on_send_order(self, data, request ): + def on_send_order(self, data, request: Request ): + print("debug on_send_order data: ", data) + print("debug on_send_order request: ", request) + print("***debug on_send_order request: ", request.extra) remote_order_id = data['id'] order = request.extra - self.order_list.append(remote_order_id) - print('debug on_send_order: local is {} ---'.format(self.order_list)) + self.order_dict[order.orderid]=remote_order_id + print("+++debug on_send_order request: ", self.order_dict) self.gateway.on_order(order) def on_failed_order(self, status_code: int, request: Request): @@ -268,9 +271,9 @@ class AlpacaRestApi(RestClient): # need debug 0608 def cancel_order(self, req: CancelRequest): """""" - """ order_id = req.orderid - remote_order_id = self.order_dict['order_id'] + remote_order_id = self.order_dict[order_id] + print("debug cancel order: order id ", order_id, "---", remote_order_id) if remote_order_id is None: print("[error]: can not get remote_order_id from local dict!") return @@ -283,7 +286,6 @@ class AlpacaRestApi(RestClient): extra=req ) print("come to cancel_order", order_id) -""" pass def on_cancel_order(self, data, request): @@ -307,6 +309,7 @@ class AlpacaRestApi(RestClient): data = raw_dict order = req.create_order_data(orderid, self.gateway_name) + print("debug send_order orderBody extra: ",order) self.add_request( "POST", "/v1/orders", @@ -440,7 +443,7 @@ class AlpacaWebsocketApi(WebsocketClient): elif(stream_ret == "listening"): self.gateway.write_log("listening {}".format(data_ret)) else: - self.ondata(packet) + self.on_data(packet) else: print("unrecognize msg", packet) @@ -460,23 +463,28 @@ class AlpacaWebsocketApi(WebsocketClient): self.gateway.on_account(account) elif(stream_ret == "trade_updates"): d=data_ret['order'] - trade = TradeData( - symbol=d["symbol"], - exchange=Exchange.ALPACA, - orderid=d['id'], - tradeid=None, - direction=DIRECTION_ALPACA2VT[d["side"]], - price=data_ret["price"], - volume=data_ret["qty"], - time=data_ret["timestamp"][11:19], - gateway_name=self.gateway_name, - ) - self.gateway.on_trade(trade) + if (data_ret['event'] == "fill"): + trade = TradeData( + symbol=d["symbol"], + exchange=Exchange.ALPACA, + orderid=d['id'], + tradeid=None, + direction=DIRECTION_ALPACA2VT[d["side"]], + price=d["filled_avg_price"], + volume=d["filled_qty"], + time=data_ret["timestamp"][11:19], + gateway_name=self.gateway_name, + ) + self.gateway.on_trade(trade) + elif (data_ret['event'] == "canceled"): + + self.gateway.on_order(order) + else: + print("unhandled trade_update msg, ", data_ret['event']) #self.gateway.on_order(order) # udpate order status else: pass - # ---------------------------------------------------------------------- def handle_auth(self, data): stream_ret = data['stream'] From 04b0da5ce53a825126f132bfcefa7bf817eeb43f Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Thu, 20 Jun 2019 22:24:03 +0800 Subject: [PATCH 10/20] add try/catch to json parse func MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit rest_client认为每个返回都可解为json,在处理alpaca的撤单操作时报错 --- vnpy/api/rest/rest_client.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/vnpy/api/rest/rest_client.py b/vnpy/api/rest/rest_client.py index ead3992f..1af29631 100644 --- a/vnpy/api/rest/rest_client.py +++ b/vnpy/api/rest/rest_client.py @@ -268,9 +268,12 @@ class RestClient(object): request.response = response status_code = response.status_code if status_code // 100 == 2: # 2xx都算成功,尽管交易所都用200 - jsonBody = response.json() - request.callback(jsonBody, request) - request.status = RequestStatus.success + try: + jsonBody = response.json() + request.callback(jsonBody, request) + request.status = RequestStatus.success + except: + print("not json body from request, check body: ",response.text) else: request.status = RequestStatus.failed From 0753e3cfe2689fab49972c025ae05c75228f7a2a Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Thu, 20 Jun 2019 22:24:18 +0800 Subject: [PATCH 11/20] add cancel order fun for alpaca --- vnpy/gateway/alpaca/alpaca_gateway.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index 02d8709b..95c5e28a 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -60,7 +60,7 @@ ORDERTYPE_VT2ALPACA = { OrderType.MARKET: "market" } ORDERTYPE_ALPACA2VT = {v: k for k, v in ORDERTYPE_VT2ALPACA.items()} - +GLOBAL_ORDER={} class AlpacaGateway(BaseGateway): """ @@ -239,6 +239,8 @@ class AlpacaRestApi(RestClient): self.order_dict[order.orderid]=remote_order_id print("+++debug on_send_order request: ", self.order_dict) self.gateway.on_order(order) + GLOBAL_ORDER[remote_order_id]=order + print("===+++ debug update global_order_dict: ", GLOBAL_ORDER) def on_failed_order(self, status_code: int, request: Request): """ @@ -286,14 +288,15 @@ class AlpacaRestApi(RestClient): extra=req ) print("come to cancel_order", order_id) - pass def on_cancel_order(self, data, request): """Websocket will push a new order status""" pass - def on_cancel_order_error(self, data, request): - pass + def on_cancel_order_error( self, exception_type: type, exception_value: Exception, tb, request: Request): + # Record exception if not ConnectionError + if not issubclass(exception_type, ConnectionError): + self.on_error(exception_type, exception_value, tb, request) def send_order(self, req: OrderRequest): orderid = str(self.connect_time + self._new_order_id()) @@ -320,6 +323,7 @@ class AlpacaRestApi(RestClient): on_error=self.on_error_order, json_str=data, ) + print("debug send_order ret val : ", order.vt_orderid) return order.vt_orderid def on_query_contracts(self, data, request: Request): @@ -341,7 +345,8 @@ class AlpacaRestApi(RestClient): def on_error_query_contracts(self, exception_type: type, exception_value: Exception, tb, request: Request): pass - + + # need debug def query_contracts(self): params = {"status": "active"} self.add_request( @@ -477,8 +482,11 @@ class AlpacaWebsocketApi(WebsocketClient): ) self.gateway.on_trade(trade) elif (data_ret['event'] == "canceled"): - + order_id = d['id'] + order=GLOBAL_ORDER[order_id] + order.status = Status.CANCELLED self.gateway.on_order(order) + print("^^^^debug cancel order id, ",order_id," body: ",order) else: print("unhandled trade_update msg, ", data_ret['event']) #self.gateway.on_order(order) # udpate order status From 2dd33b5ba04326dbbe421adc72b02180e6be760c Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Fri, 21 Jun 2019 13:26:40 +0800 Subject: [PATCH 12/20] add subscribe func --- vnpy/gateway/alpaca/alpaca_gateway.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index 95c5e28a..08bb6fb7 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -36,9 +36,9 @@ from vnpy.trader.object import ( ) REST_HOST = "https://api.alpaca.markets" -WEBSOCKET_HOST = "wss://api.alpaca.markets/stream" # Market Data +WEBSOCKET_HOST = "wss://api.alpaca.markets/stream" # Market Data PAPER_REST_HOST = "https://paper-api.alpaca.markets" -PAPER_WEBSOCKET_HOST = "wss://paper-api.alpaca.markets/stream" # Market Data +PAPER_WEBSOCKET_HOST = "wss://paper-api.alpaca.markets/stream" # Market Data KEY = "" SECRET = "" @@ -100,7 +100,7 @@ class AlpacaGateway(BaseGateway): def subscribe(self, req: SubscribeRequest): """""" - # self.ws_api.subscribe(req) + self.ws_api.subscribe(req) pass def send_order(self, req: OrderRequest): @@ -393,6 +393,7 @@ class AlpacaWebsocketApi(WebsocketClient): self.askDict = {} self.orderLocalDict = {} self.channelDict = {} # ChannelID : (Channel, Symbol) + self.channels=["account_updates", "trade_updates"] def connect( self, key: str, secret: str, proxy_host: str, proxy_port: int,url:str @@ -413,12 +414,16 @@ class AlpacaWebsocketApi(WebsocketClient): def on_authenticate(self): """""" params={"action":"listen", "data": { - "streams":["account_updates", "trade_updates"] + "streams":self.channels }} self.send_packet(params) def subscribe(self, req: SubscribeRequest): - pass + self.channels.append(req.symbol) + params={"action":"listen", "data": { + "streams":self.channels + }} + self.send_packet(params) def send_order(self, req: OrderRequest): pass @@ -468,6 +473,8 @@ class AlpacaWebsocketApi(WebsocketClient): self.gateway.on_account(account) elif(stream_ret == "trade_updates"): d=data_ret['order'] + order_id = d['id'] + order=GLOBAL_ORDER[order_id] if (data_ret['event'] == "fill"): trade = TradeData( symbol=d["symbol"], @@ -481,9 +488,9 @@ class AlpacaWebsocketApi(WebsocketClient): gateway_name=self.gateway_name, ) self.gateway.on_trade(trade) + order.status = Status.ALLTRADED + self.gateway.on_order(order) elif (data_ret['event'] == "canceled"): - order_id = d['id'] - order=GLOBAL_ORDER[order_id] order.status = Status.CANCELLED self.gateway.on_order(order) print("^^^^debug cancel order id, ",order_id," body: ",order) From 9124302ccad9147595c6f3770a5168a888354893 Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Fri, 21 Jun 2019 22:31:30 +0800 Subject: [PATCH 13/20] add process_timer_event to query account and position --- vnpy/gateway/alpaca/alpaca_gateway.py | 63 +++++++++++++++++---------- 1 file changed, 40 insertions(+), 23 deletions(-) diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index 08bb6fb7..21e26b2b 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -13,6 +13,8 @@ from datetime import datetime from urllib.parse import urlencode from vnpy.api.rest import Request, RestClient from vnpy.api.websocket import WebsocketClient +from vnpy.event import Event +from vnpy.trader.event import EVENT_TIMER from vnpy.trader.constant import ( Direction, @@ -97,11 +99,11 @@ class AlpacaGateway(BaseGateway): websocket_url = WEBSOCKET_HOST if env == "REAL" else PAPER_WEBSOCKET_HOST self.rest_api.connect(key, secret, session, proxy_host, proxy_port,rest_url) self.ws_api.connect(key, secret, proxy_host, proxy_port,websocket_url) + self.init_query() def subscribe(self, req: SubscribeRequest): """""" self.ws_api.subscribe(req) - pass def send_order(self, req: OrderRequest): """""" @@ -123,6 +125,32 @@ class AlpacaGateway(BaseGateway): """""" self.rest_api.stop() self.ws_api.stop() + + def process_timer_event(self, event: Event): + """""" + self.count += 1 + if self.count < 5: + return + + self.query_account() + self.query_position() + + def init_query(self): + """""" + self.count = 0 + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + + + def process_timer_event(self, event: Event): + """""" + self.count += 1 + if self.count < 5: + return + else: + self.count = 0 + + self.query_account() + self.query_position() class AlpacaRestApi(RestClient): @@ -419,7 +447,9 @@ class AlpacaWebsocketApi(WebsocketClient): self.send_packet(params) def subscribe(self, req: SubscribeRequest): - self.channels.append(req.symbol) + self.channels.append("A."+req.symbol) + self.channels.append("Q."+req.symbol) + print("debug subscribe: {} ".format(self.channels)) params={"action":"listen", "data": { "streams":self.channels }} @@ -435,7 +465,7 @@ class AlpacaWebsocketApi(WebsocketClient): def on_connected(self): """""" - self.gateway.write_log("Websocket API连接成功") + self.gateway.write_log("Websocket API连接成功") self.authenticate() def on_disconnected(self): @@ -464,6 +494,7 @@ class AlpacaWebsocketApi(WebsocketClient): data_ret = data['data'] if(stream_ret == "account_updates"): #handle account + print("!!!!!!!@@@@@@@@ account_udpate is {}".format(data)) account = AccountData( accountid=data_ret['id'], balance=float(data_ret['cash']), @@ -475,12 +506,15 @@ class AlpacaWebsocketApi(WebsocketClient): d=data_ret['order'] order_id = d['id'] order=GLOBAL_ORDER[order_id] - if (data_ret['event'] == "fill"): + local_order_id = order.orderid + order.status = STATUS_ALPACA2VT[data_ret['event']] + self.gateway.on_order(order) + if (data_ret['event'] == "fill" or data_ret['event'] == "partial_fill"): trade = TradeData( symbol=d["symbol"], exchange=Exchange.ALPACA, - orderid=d['id'], - tradeid=None, + orderid=local_order_id, + tradeid=d['id'], direction=DIRECTION_ALPACA2VT[d["side"]], price=d["filled_avg_price"], volume=d["filled_qty"], @@ -488,15 +522,6 @@ class AlpacaWebsocketApi(WebsocketClient): gateway_name=self.gateway_name, ) self.gateway.on_trade(trade) - order.status = Status.ALLTRADED - self.gateway.on_order(order) - elif (data_ret['event'] == "canceled"): - order.status = Status.CANCELLED - self.gateway.on_order(order) - print("^^^^debug cancel order id, ",order_id," body: ",order) - else: - print("unhandled trade_update msg, ", data_ret['event']) - #self.gateway.on_order(order) # udpate order status else: pass @@ -526,14 +551,6 @@ class AlpacaWebsocketApi(WebsocketClient): pass # ---------------------------------------------------------------------- - def on_wallet(self, data): - """""" - pass - - # ---------------------------------------------------------------------- - def on_trade_update(self, data): - """""" - pass def on_error(self, exception_type: type, exception_value: Exception, tb): """""" From 08d3b26a5d5060a0b2a4f5c43ef23e799d3b3368 Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Sat, 22 Jun 2019 10:22:04 +0800 Subject: [PATCH 14/20] remove debug print info --- vnpy/gateway/alpaca/alpaca_gateway.py | 28 ++++++--------------------- 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index 21e26b2b..a34e8529 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -88,7 +88,6 @@ class AlpacaGateway(BaseGateway): def connect(self, setting: dict): """""" - print("[debug] gateway setting: ",setting) key = setting["key"] secret = setting["secret"] session = setting["session"] @@ -174,7 +173,6 @@ class AlpacaRestApi(RestClient): self.order_dict ={} def query_account(self): - print("call query_account") path = f"/v1/account" self.add_request( method="GET", @@ -183,7 +181,6 @@ class AlpacaRestApi(RestClient): ) def on_query_account(self, data, request): - print("on_query_account debug: " , data) account = AccountData( accountid=data['id'], balance=float(data['cash']), @@ -193,7 +190,6 @@ class AlpacaRestApi(RestClient): self.gateway.on_account(account) def query_position(self): - print("call query_position") path = f"/v1/positions" self.add_request( method="GET", @@ -208,7 +204,7 @@ class AlpacaRestApi(RestClient): exchange=Exchange.ALPACA, direction=DIRECTION_ALPACA2VT[d['side']], volume=d['qty'], - price=d['avg_entry_price'], + price=round(d['avg_entry_price'],3), pnl=d['unrealized_pl'], gateway_name=self.gateway_name, ) @@ -221,6 +217,7 @@ class AlpacaRestApi(RestClient): headers = { "APCA-API-KEY-ID": self.key, "APCA-API-SECRET-KEY": self.secret, + 'Content-Type': 'application/json' } request.headers = headers @@ -247,28 +244,21 @@ class AlpacaRestApi(RestClient): self.key = key self.secret = secret self.init(url, proxy_host, proxy_port) - print("rest connect: ", url, proxy_host, proxy_port) self.start(session_num) self.connect_time = ( int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count ) - print("rest client connected", self.connect_time) self.gateway.write_log("ALPACA REST API启动成功") self.query_account() self.query_position() #self.query_contracts() def on_send_order(self, data, request: Request ): - print("debug on_send_order data: ", data) - print("debug on_send_order request: ", request) - print("***debug on_send_order request: ", request.extra) remote_order_id = data['id'] order = request.extra self.order_dict[order.orderid]=remote_order_id - print("+++debug on_send_order request: ", self.order_dict) self.gateway.on_order(order) GLOBAL_ORDER[remote_order_id]=order - print("===+++ debug update global_order_dict: ", GLOBAL_ORDER) def on_failed_order(self, status_code: int, request: Request): """ @@ -278,7 +268,6 @@ class AlpacaRestApi(RestClient): order.status = Status.REJECTED self.gateway.on_order(order) msg = f"请求失败,状态码:{status_code},信息:{request.response.text}" - print('debug on_failed', msg) self.gateway.write_log(msg) def on_error_order( @@ -298,12 +287,10 @@ class AlpacaRestApi(RestClient): self.exception_detail(exception_type, exception_value, tb, request) ) - # need debug 0608 def cancel_order(self, req: CancelRequest): """""" order_id = req.orderid remote_order_id = self.order_dict[order_id] - print("debug cancel order: order id ", order_id, "---", remote_order_id) if remote_order_id is None: print("[error]: can not get remote_order_id from local dict!") return @@ -345,11 +332,11 @@ class AlpacaRestApi(RestClient): "POST", "/v1/orders", callback=self.on_send_order, - # data=data, + data=data, extra=order, on_failed=self.on_failed_order, on_error=self.on_error_order, - json_str=data, + #json_str=data, ) print("debug send_order ret val : ", order.vt_orderid) return order.vt_orderid @@ -362,8 +349,8 @@ class AlpacaRestApi(RestClient): exchange=Exchange.ALPACA, name=symbol, product=Product.SPOT, - size=1, # need debug - pricetick=0.01, # need debug + size=1, + pricetick=0.01, gateway_name=self.gateway_name ) self.on_contract(contract) @@ -374,7 +361,6 @@ class AlpacaRestApi(RestClient): def on_error_query_contracts(self, exception_type: type, exception_value: Exception, tb, request: Request): pass - # need debug def query_contracts(self): params = {"status": "active"} self.add_request( @@ -384,7 +370,6 @@ class AlpacaRestApi(RestClient): callback=self.on_query_contracts, on_failed=self.on_failed_query_contracts, on_error=self.on_error_query_contracts, - # data=data, ) @@ -529,7 +514,6 @@ class AlpacaWebsocketApi(WebsocketClient): def handle_auth(self, data): stream_ret = data['stream'] data_ret = data['data'] - print("stream is {}, data is {}".format(stream_ret,data_ret)) if (data_ret['status'] == "authorized"): print("authorization success!!!") self.gateway.write_log("authorization success!!!") From 17fe9bf914a98612cd80bff328d958b259531903 Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Sat, 22 Jun 2019 10:51:50 +0800 Subject: [PATCH 15/20] remove not used rest_client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. "json的内容可以通过设置Content-Type为application/json达成,没必要单独增加一个json字段。" remove not used code 2. 在alpaca撤单调试过程中,发生返回值200,但body.text是json的情况,致rest_client解析json报错,加了try/catch做处理 --- vnpy/api/rest/rest_client.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/vnpy/api/rest/rest_client.py b/vnpy/api/rest/rest_client.py index 1af29631..ce2fb60a 100644 --- a/vnpy/api/rest/rest_client.py +++ b/vnpy/api/rest/rest_client.py @@ -49,10 +49,6 @@ class Request(object): self.response = None self.status = RequestStatus.ready - self.json=None - - def set_json(self, json_str: dict): - self.json=json_str def __str__(self): if self.response is None: @@ -65,7 +61,6 @@ class Request(object): "headers: {}\n" "params: {}\n" "data: {}\n" - "json: {}\n" "response:" "{}\n".format( self.method, @@ -75,7 +70,6 @@ class Request(object): self.headers, self.params, self.data, - self.json, "" if self.response is None else self.response.text, ) ) @@ -151,7 +145,6 @@ class RestClient(object): on_failed: Callable = None, on_error: Callable = None, extra: Any = None, - json_str: dict = None, ): """ Add a new request. @@ -177,8 +170,6 @@ class RestClient(object): on_error, extra, ) - if json_str is not None: - request.set_json(json_str) self._queue.put(request) return request @@ -262,7 +253,6 @@ class RestClient(object): headers=request.headers, params=request.params, data=request.data, - json=request.json, proxies=self.proxies, ) request.response = response From 985c06254973f48f0dddac10ab0791b22161673e Mon Sep 17 00:00:00 2001 From: vigarbuaa Date: Sat, 22 Jun 2019 10:56:03 +0800 Subject: [PATCH 16/20] format code --- vnpy/gateway/alpaca/alpaca_gateway.py | 90 +++++++++++++-------------- 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index a34e8529..31b37d24 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -49,7 +49,7 @@ STATUS_ALPACA2VT = { "partial_fill": Status.PARTTRADED, "fill": Status.ALLTRADED, "cancelled": Status.CANCELLED, - #"done_for_day": Status.CANCELLED, + # "done_for_day": Status.CANCELLED, "expired": Status.NOTTRADED } @@ -62,7 +62,8 @@ ORDERTYPE_VT2ALPACA = { OrderType.MARKET: "market" } ORDERTYPE_ALPACA2VT = {v: k for k, v in ORDERTYPE_VT2ALPACA.items()} -GLOBAL_ORDER={} +GLOBAL_ORDER = {} + class AlpacaGateway(BaseGateway): """ @@ -93,11 +94,12 @@ class AlpacaGateway(BaseGateway): session = setting["session"] proxy_host = setting["proxy_host"] proxy_port = setting["proxy_port"] - env=setting['服务器'] - rest_url = REST_HOST if env == "REAL" else PAPER_REST_HOST - websocket_url = WEBSOCKET_HOST if env == "REAL" else PAPER_WEBSOCKET_HOST - self.rest_api.connect(key, secret, session, proxy_host, proxy_port,rest_url) - self.ws_api.connect(key, secret, proxy_host, proxy_port,websocket_url) + env = setting['服务器'] + rest_url = REST_HOST if env == "REAL" else PAPER_REST_HOST + websocket_url = WEBSOCKET_HOST if env == "REAL" else PAPER_WEBSOCKET_HOST + self.rest_api.connect(key, secret, session, + proxy_host, proxy_port, rest_url) + self.ws_api.connect(key, secret, proxy_host, proxy_port, websocket_url) self.init_query() def subscribe(self, req: SubscribeRequest): @@ -124,7 +126,7 @@ class AlpacaGateway(BaseGateway): """""" self.rest_api.stop() self.ws_api.stop() - + def process_timer_event(self, event: Event): """""" self.count += 1 @@ -139,7 +141,6 @@ class AlpacaGateway(BaseGateway): self.count = 0 self.event_engine.register(EVENT_TIMER, self.process_timer_event) - def process_timer_event(self, event: Event): """""" self.count += 1 @@ -170,7 +171,7 @@ class AlpacaRestApi(RestClient): self.order_count = 1_000_000 self.order_count_lock = Lock() self.connect_time = 0 - self.order_dict ={} + self.order_dict = {} def query_account(self): path = f"/v1/account" @@ -204,7 +205,7 @@ class AlpacaRestApi(RestClient): exchange=Exchange.ALPACA, direction=DIRECTION_ALPACA2VT[d['side']], volume=d['qty'], - price=round(d['avg_entry_price'],3), + price=round(d['avg_entry_price'], 3), pnl=d['unrealized_pl'], gateway_name=self.gateway_name, ) @@ -251,14 +252,14 @@ class AlpacaRestApi(RestClient): self.gateway.write_log("ALPACA REST API启动成功") self.query_account() self.query_position() - #self.query_contracts() + # self.query_contracts() - def on_send_order(self, data, request: Request ): + def on_send_order(self, data, request: Request): remote_order_id = data['id'] order = request.extra - self.order_dict[order.orderid]=remote_order_id + self.order_dict[order.orderid] = remote_order_id self.gateway.on_order(order) - GLOBAL_ORDER[remote_order_id]=order + GLOBAL_ORDER[remote_order_id] = order def on_failed_order(self, status_code: int, request: Request): """ @@ -308,7 +309,7 @@ class AlpacaRestApi(RestClient): """Websocket will push a new order status""" pass - def on_cancel_order_error( self, exception_type: type, exception_value: Exception, tb, request: Request): + def on_cancel_order_error(self, exception_type: type, exception_value: Exception, tb, request: Request): # Record exception if not ConnectionError if not issubclass(exception_type, ConnectionError): self.on_error(exception_type, exception_value, tb, request) @@ -327,7 +328,7 @@ class AlpacaRestApi(RestClient): data = raw_dict order = req.create_order_data(orderid, self.gateway_name) - print("debug send_order orderBody extra: ",order) + print("debug send_order orderBody extra: ", order) self.add_request( "POST", "/v1/orders", @@ -336,20 +337,20 @@ class AlpacaRestApi(RestClient): extra=order, on_failed=self.on_failed_order, on_error=self.on_error_order, - #json_str=data, + # json_str=data, ) print("debug send_order ret val : ", order.vt_orderid) return order.vt_orderid def on_query_contracts(self, data, request: Request): - for instrument_data in data: + for instrument_data in data: symbol = instrument_data['symbol'] contract = ContractData( symbol=symbol, - exchange=Exchange.ALPACA, + exchange=Exchange.ALPACA, name=symbol, product=Product.SPOT, - size=1, + size=1, pricetick=0.01, gateway_name=self.gateway_name ) @@ -360,13 +361,13 @@ class AlpacaRestApi(RestClient): def on_error_query_contracts(self, exception_type: type, exception_value: Exception, tb, request: Request): pass - + def query_contracts(self): params = {"status": "active"} self.add_request( "GET", "/v1/assets", - params = params, + params=params, callback=self.on_query_contracts, on_failed=self.on_failed_query_contracts, on_error=self.on_error_query_contracts, @@ -406,10 +407,10 @@ class AlpacaWebsocketApi(WebsocketClient): self.askDict = {} self.orderLocalDict = {} self.channelDict = {} # ChannelID : (Channel, Symbol) - self.channels=["account_updates", "trade_updates"] + self.channels = ["account_updates", "trade_updates"] def connect( - self, key: str, secret: str, proxy_host: str, proxy_port: int,url:str + self, key: str, secret: str, proxy_host: str, proxy_port: int, url: str ): """""" self.key = key @@ -419,24 +420,24 @@ class AlpacaWebsocketApi(WebsocketClient): def authenticate(self): """""" - params={"action":"authenticate", "data": { - "key_id":self.key,"secret_key":self.secret + params = {"action": "authenticate", "data": { + "key_id": self.key, "secret_key": self.secret }} self.send_packet(params) def on_authenticate(self): """""" - params={"action":"listen", "data": { - "streams":self.channels + params = {"action": "listen", "data": { + "streams": self.channels }} self.send_packet(params) - + def subscribe(self, req: SubscribeRequest): - self.channels.append("A."+req.symbol) - self.channels.append("Q."+req.symbol) + self.channels.append("A." + req.symbol) + self.channels.append("Q." + req.symbol) print("debug subscribe: {} ".format(self.channels)) - params={"action":"listen", "data": { - "streams":self.channels + params = {"action": "listen", "data": { + "streams": self.channels }} self.send_packet(params) @@ -450,7 +451,7 @@ class AlpacaWebsocketApi(WebsocketClient): def on_connected(self): """""" - self.gateway.write_log("Websocket API连接成功") + self.gateway.write_log("Websocket API连接成功") self.authenticate() def on_disconnected(self): @@ -463,9 +464,9 @@ class AlpacaWebsocketApi(WebsocketClient): if "stream" in packet and "data" in packet: stream_ret = packet['stream'] data_ret = packet['data'] - if(stream_ret == "authorization"): + if(stream_ret == "authorization"): self.handle_auth(packet) - elif(stream_ret == "listening"): + elif(stream_ret == "listening"): self.gateway.write_log("listening {}".format(data_ret)) else: self.on_data(packet) @@ -478,19 +479,18 @@ class AlpacaWebsocketApi(WebsocketClient): stream_ret = data['stream'] data_ret = data['data'] if(stream_ret == "account_updates"): - #handle account - print("!!!!!!!@@@@@@@@ account_udpate is {}".format(data)) account = AccountData( accountid=data_ret['id'], balance=float(data_ret['cash']), - frozen=float(data_ret['cash']) - float(data_ret['cash_withdrawable']), + frozen=float(data_ret['cash']) - + float(data_ret['cash_withdrawable']), gateway_name=self.gateway_name ) self.gateway.on_account(account) elif(stream_ret == "trade_updates"): - d=data_ret['order'] + d = data_ret['order'] order_id = d['id'] - order=GLOBAL_ORDER[order_id] + order = GLOBAL_ORDER[order_id] local_order_id = order.orderid order.status = STATUS_ALPACA2VT[data_ret['event']] self.gateway.on_order(order) @@ -522,7 +522,7 @@ class AlpacaWebsocketApi(WebsocketClient): print("authorization failed!!!") self.gateway.write_log("authorization failed!!!") else: - print("??unhandled status: ",data) + print("??unhandled status: ", data) # ---------------------------------------------------------------------- def on_response(self, data): @@ -540,7 +540,7 @@ class AlpacaWebsocketApi(WebsocketClient): """""" print("on_error: ", type, Exception, tb) sys.stderr.write( - self.exception_detail(exception_type, exception_value, tb ) + self.exception_detail(exception_type, exception_value, tb) ) def subscribe_topic(self): @@ -578,4 +578,4 @@ class AlpacaWebsocketApi(WebsocketClient): def on_contract(self, d): """""" - pass \ No newline at end of file + pass From af94e1e186a5ff75be7ea7cc6ed1108e74e7dc57 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 25 Jun 2019 10:37:40 +0800 Subject: [PATCH 17/20] [Del] remove unnecessary try...catch --- vnpy/api/rest/rest_client.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/vnpy/api/rest/rest_client.py b/vnpy/api/rest/rest_client.py index ce2fb60a..59ff4a42 100644 --- a/vnpy/api/rest/rest_client.py +++ b/vnpy/api/rest/rest_client.py @@ -257,13 +257,10 @@ class RestClient(object): ) request.response = response status_code = response.status_code - if status_code // 100 == 2: # 2xx都算成功,尽管交易所都用200 - try: - jsonBody = response.json() - request.callback(jsonBody, request) - request.status = RequestStatus.success - except: - print("not json body from request, check body: ",response.text) + if status_code // 100 == 2: # 2xx codes are all successful + json_body = response.json() + request.callback(json_body, request) + request.status = RequestStatus.success else: request.status = RequestStatus.failed From 1b139f34a1cfc1dfa1993f95731ca34d21270dd7 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 25 Jun 2019 10:47:27 +0800 Subject: [PATCH 18/20] [Mod] improve code readability --- vnpy/gateway/alpaca/alpaca_gateway.py | 63 ++++++--------------------- 1 file changed, 13 insertions(+), 50 deletions(-) diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index 31b37d24..b111ea27 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -3,14 +3,9 @@ Author: vigarbuaa """ -import hashlib -import hmac import sys -import time -from copy import copy from threading import Lock from datetime import datetime -from urllib.parse import urlencode from vnpy.api.rest import Request, RestClient from vnpy.api.websocket import WebsocketClient from vnpy.event import Event @@ -21,16 +16,15 @@ from vnpy.trader.constant import ( Exchange, OrderType, Product, - Status, - Offset, - Interval + Status ) from vnpy.trader.gateway import BaseGateway from vnpy.trader.object import ( TickData, OrderData, TradeData, - PositionData, AccountData, + PositionData, + AccountData, ContractData, OrderRequest, CancelRequest, @@ -74,7 +68,7 @@ class AlpacaGateway(BaseGateway): "key": "", "secret": "", "session": 3, - "服务器": ["REAL", "PAPER"], + "server": ["REAL", "PAPER"], "proxy_host": "127.0.0.1", "proxy_port": 1080, } @@ -94,7 +88,7 @@ class AlpacaGateway(BaseGateway): session = setting["session"] proxy_host = setting["proxy_host"] proxy_port = setting["proxy_port"] - env = setting['服务器'] + env = setting['server'] rest_url = REST_HOST if env == "REAL" else PAPER_REST_HOST websocket_url = WEBSOCKET_HOST if env == "REAL" else PAPER_WEBSOCKET_HOST self.rest_api.connect(key, secret, session, @@ -127,15 +121,6 @@ class AlpacaGateway(BaseGateway): self.rest_api.stop() self.ws_api.stop() - def process_timer_event(self, event: Event): - """""" - self.count += 1 - if self.count < 5: - return - - self.query_account() - self.query_position() - def init_query(self): """""" self.count = 0 @@ -146,8 +131,7 @@ class AlpacaGateway(BaseGateway): self.count += 1 if self.count < 5: return - else: - self.count = 0 + self.count = 0 self.query_account() self.query_position() @@ -440,15 +424,7 @@ class AlpacaWebsocketApi(WebsocketClient): "streams": self.channels }} self.send_packet(params) - - def send_order(self, req: OrderRequest): - pass - - # ---------------------------------------------------------------------- - def cancel_order(self, req: CancelRequest): - """""" - pass - + def on_connected(self): """""" self.gateway.write_log("Websocket API连接成功") @@ -472,18 +448,18 @@ class AlpacaWebsocketApi(WebsocketClient): self.on_data(packet) else: print("unrecognize msg", packet) - - # ---------------------------------------------------------------------- + def on_data(self, data): print("on_data is {}".format(data)) stream_ret = data['stream'] data_ret = data['data'] if(stream_ret == "account_updates"): + frozen = float(data_ret['cash']) - float(data_ret['cash_withdrawable']) + account = AccountData( accountid=data_ret['id'], balance=float(data_ret['cash']), - frozen=float(data_ret['cash']) - - float(data_ret['cash_withdrawable']), + frozen=frozen, gateway_name=self.gateway_name ) self.gateway.on_account(account) @@ -509,10 +485,9 @@ class AlpacaWebsocketApi(WebsocketClient): self.gateway.on_trade(trade) else: pass - - # ---------------------------------------------------------------------- + def handle_auth(self, data): - stream_ret = data['stream'] + # stream_ret = data['stream'] data_ret = data['data'] if (data_ret['status'] == "authorized"): print("authorization success!!!") @@ -524,18 +499,6 @@ class AlpacaWebsocketApi(WebsocketClient): else: print("??unhandled status: ", data) - # ---------------------------------------------------------------------- - def on_response(self, data): - """""" - pass - # ---------------------------------------------------------------------- - - def on_update(self, data): - """""" - pass - - # ---------------------------------------------------------------------- - def on_error(self, exception_type: type, exception_value: Exception, tb): """""" print("on_error: ", type, Exception, tb) From a25051087d102031643e944df765bdda1c660fb7 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 25 Jun 2019 16:25:34 +0800 Subject: [PATCH 19/20] [Mod] complete test of AlpacaGateway --- examples/vn_trader/run.py | 82 +-- vnpy/api/rest/rest_client.py | 6 +- vnpy/gateway/alpaca/alpaca_gateway.py | 747 +++++++++++++++----------- 3 files changed, 474 insertions(+), 361 deletions(-) diff --git a/examples/vn_trader/run.py b/examples/vn_trader/run.py index c7cd65c5..a364d98d 100644 --- a/examples/vn_trader/run.py +++ b/examples/vn_trader/run.py @@ -3,30 +3,31 @@ from vnpy.event import EventEngine from vnpy.trader.engine import MainEngine from vnpy.trader.ui import MainWindow, create_qapp -from vnpy.gateway.binance import BinanceGateway -from vnpy.gateway.bitmex import BitmexGateway -from vnpy.gateway.futu import FutuGateway -from vnpy.gateway.ib import IbGateway -from vnpy.gateway.ctp import CtpGateway +# from vnpy.gateway.binance import BinanceGateway +# from vnpy.gateway.bitmex import BitmexGateway +# from vnpy.gateway.futu import FutuGateway +# from vnpy.gateway.ib import IbGateway +# from vnpy.gateway.ctp import CtpGateway # from vnpy.gateway.ctptest import CtptestGateway -from vnpy.gateway.femas import FemasGateway -from vnpy.gateway.tiger import TigerGateway +# from vnpy.gateway.femas import FemasGateway +# from vnpy.gateway.tiger import TigerGateway # from vnpy.gateway.oes import OesGateway -from vnpy.gateway.okex import OkexGateway -from vnpy.gateway.huobi import HuobiGateway -from vnpy.gateway.bitfinex import BitfinexGateway -from vnpy.gateway.onetoken import OnetokenGateway -from vnpy.gateway.okexf import OkexfGateway +# from vnpy.gateway.okex import OkexGateway +# from vnpy.gateway.huobi import HuobiGateway +# from vnpy.gateway.bitfinex import BitfinexGateway +# from vnpy.gateway.onetoken import OnetokenGateway +# from vnpy.gateway.okexf import OkexfGateway # from vnpy.gateway.xtp import XtpGateway -from vnpy.gateway.hbdm import HbdmGateway -from vnpy.gateway.tap import TapGateway +# from vnpy.gateway.hbdm import HbdmGateway +# from vnpy.gateway.tap import TapGateway +from vnpy.gateway.alpaca import AlpacaGateway -from vnpy.app.cta_strategy import CtaStrategyApp -from vnpy.app.csv_loader import CsvLoaderApp -from vnpy.app.algo_trading import AlgoTradingApp -from vnpy.app.cta_backtester import CtaBacktesterApp -from vnpy.app.data_recorder import DataRecorderApp -from vnpy.app.risk_manager import RiskManagerApp +# from vnpy.app.cta_strategy import CtaStrategyApp +# from vnpy.app.csv_loader import CsvLoaderApp +# from vnpy.app.algo_trading import AlgoTradingApp +# from vnpy.app.cta_backtester import CtaBacktesterApp +# from vnpy.app.data_recorder import DataRecorderApp +# from vnpy.app.risk_manager import RiskManagerApp def main(): @@ -37,30 +38,31 @@ def main(): main_engine = MainEngine(event_engine) - main_engine.add_gateway(BinanceGateway) - main_engine.add_gateway(CtpGateway) + # main_engine.add_gateway(BinanceGateway) + # main_engine.add_gateway(CtpGateway) # main_engine.add_gateway(CtptestGateway) - main_engine.add_gateway(FemasGateway) - main_engine.add_gateway(IbGateway) - main_engine.add_gateway(FutuGateway) - main_engine.add_gateway(BitmexGateway) - main_engine.add_gateway(TigerGateway) + # main_engine.add_gateway(FemasGateway) + # main_engine.add_gateway(IbGateway) + # main_engine.add_gateway(FutuGateway) + # main_engine.add_gateway(BitmexGateway) + # main_engine.add_gateway(TigerGateway) # main_engine.add_gateway(OesGateway) - main_engine.add_gateway(OkexGateway) - main_engine.add_gateway(HuobiGateway) - main_engine.add_gateway(BitfinexGateway) - main_engine.add_gateway(OnetokenGateway) - main_engine.add_gateway(OkexfGateway) - main_engine.add_gateway(HbdmGateway) + # main_engine.add_gateway(OkexGateway) + # main_engine.add_gateway(HuobiGateway) + # main_engine.add_gateway(BitfinexGateway) + # main_engine.add_gateway(OnetokenGateway) + # main_engine.add_gateway(OkexfGateway) + # main_engine.add_gateway(HbdmGateway) # main_engine.add_gateway(XtpGateway) - main_engine.add_gateway(TapGateway) + # main_engine.add_gateway(TapGateway) + main_engine.add_gateway(AlpacaGateway) - main_engine.add_app(CtaStrategyApp) - main_engine.add_app(CtaBacktesterApp) - main_engine.add_app(CsvLoaderApp) - main_engine.add_app(AlgoTradingApp) - main_engine.add_app(DataRecorderApp) - main_engine.add_app(RiskManagerApp) + # main_engine.add_app(CtaStrategyApp) + # main_engine.add_app(CtaBacktesterApp) + # main_engine.add_app(CsvLoaderApp) + # main_engine.add_app(AlgoTradingApp) + # main_engine.add_app(DataRecorderApp) + # main_engine.add_app(RiskManagerApp) main_window = MainWindow(main_engine, event_engine) main_window.showMaximized() diff --git a/vnpy/api/rest/rest_client.py b/vnpy/api/rest/rest_client.py index 59ff4a42..4c3e1e50 100644 --- a/vnpy/api/rest/rest_client.py +++ b/vnpy/api/rest/rest_client.py @@ -258,7 +258,11 @@ class RestClient(object): request.response = response status_code = response.status_code if status_code // 100 == 2: # 2xx codes are all successful - json_body = response.json() + if status_code == 204: + json_body = None + else: + json_body = response.json() + request.callback(json_body, request) request.status = RequestStatus.success else: diff --git a/vnpy/gateway/alpaca/alpaca_gateway.py b/vnpy/gateway/alpaca/alpaca_gateway.py index b111ea27..036708c3 100644 --- a/vnpy/gateway/alpaca/alpaca_gateway.py +++ b/vnpy/gateway/alpaca/alpaca_gateway.py @@ -1,9 +1,10 @@ # encoding: UTF-8 """ - Author: vigarbuaa +Author: vigarbuaa """ import sys +import json from threading import Lock from datetime import datetime from vnpy.api.rest import Request, RestClient @@ -23,7 +24,7 @@ from vnpy.trader.object import ( TickData, OrderData, TradeData, - PositionData, + PositionData, AccountData, ContractData, OrderRequest, @@ -31,32 +32,42 @@ from vnpy.trader.object import ( SubscribeRequest, ) -REST_HOST = "https://api.alpaca.markets" -WEBSOCKET_HOST = "wss://api.alpaca.markets/stream" # Market Data -PAPER_REST_HOST = "https://paper-api.alpaca.markets" -PAPER_WEBSOCKET_HOST = "wss://paper-api.alpaca.markets/stream" # Market Data -KEY = "" -SECRET = "" + +REST_HOST = "https://api.alpaca.markets" # Live trading +WEBSOCKET_HOST = "wss://api.alpaca.markets/stream" +PAPER_REST_HOST = "https://paper-api.alpaca.markets" # Paper Trading +PAPER_WEBSOCKET_HOST = "wss://paper-api.alpaca.markets/stream" + +DATA_REST_HOST = "https://data.alpaca.markets" + STATUS_ALPACA2VT = { - "new": Status.SUBMITTING, - "partial_fill": Status.PARTTRADED, - "fill": Status.ALLTRADED, - "cancelled": Status.CANCELLED, - # "done_for_day": Status.CANCELLED, - "expired": Status.NOTTRADED + "new": Status.NOTTRADED, + "partially_filled": Status.PARTTRADED, + "filled": Status.ALLTRADED, + "canceled": Status.CANCELLED, + "expired": Status.CANCELLED, + "rejected": Status.REJECTED } -DIRECTION_VT2ALPACA = {Direction.LONG: "buy", Direction.SHORT: "sell"} -DIRECTION_ALPACA2VT = {"buy": Direction.LONG, "sell": Direction.SHORT, - "long": Direction.LONG, "short": Direction.SHORT} +DIRECTION_VT2ALPACA = { + Direction.LONG: "buy", + Direction.SHORT: "sell" +} +DIRECTION_ALPACA2VT = { + "buy": Direction.LONG, + "sell": Direction.SHORT, + "long": Direction.LONG, + "short": Direction.SHORT +} ORDERTYPE_VT2ALPACA = { OrderType.LIMIT: "limit", OrderType.MARKET: "market" } ORDERTYPE_ALPACA2VT = {v: k for k, v in ORDERTYPE_VT2ALPACA.items()} -GLOBAL_ORDER = {} + +LOCAL_SYS_MAP = {} class AlpacaGateway(BaseGateway): @@ -65,40 +76,41 @@ class AlpacaGateway(BaseGateway): """ default_setting = { - "key": "", - "secret": "", - "session": 3, - "server": ["REAL", "PAPER"], - "proxy_host": "127.0.0.1", - "proxy_port": 1080, + "KEY ID": "", + "Secret Key": "", + "会话数": 10, + "服务器": ["REAL", "PAPER"] } + exchanges = [Exchange.SMART] + def __init__(self, event_engine): """Constructor""" - super(AlpacaGateway, self).__init__(event_engine, "ALPACA") + super().__init__(event_engine, "ALPACA") self.rest_api = AlpacaRestApi(self) self.ws_api = AlpacaWebsocketApi(self) - self.order_map = {} + self.data_rest_api = AlpacaDataRestApi(self) def connect(self, setting: dict): """""" - key = setting["key"] - secret = setting["secret"] - session = setting["session"] - proxy_host = setting["proxy_host"] - proxy_port = setting["proxy_port"] - env = setting['server'] - rest_url = REST_HOST if env == "REAL" else PAPER_REST_HOST - websocket_url = WEBSOCKET_HOST if env == "REAL" else PAPER_WEBSOCKET_HOST - self.rest_api.connect(key, secret, session, - proxy_host, proxy_port, rest_url) - self.ws_api.connect(key, secret, proxy_host, proxy_port, websocket_url) + key = setting["KEY ID"] + secret = setting["Secret Key"] + session = setting["会话数"] + server = setting["服务器"] + + rest_url = REST_HOST if server == "REAL" else PAPER_REST_HOST + websocket_url = WEBSOCKET_HOST if server == "REAL" else PAPER_WEBSOCKET_HOST + + self.rest_api.connect(key, secret, session, rest_url) + self.data_rest_api.connect(key, secret, session) + self.ws_api.connect(key, secret, websocket_url) + self.init_query() def subscribe(self, req: SubscribeRequest): """""" - self.ws_api.subscribe(req) + self.data_rest_api.subscribe(req) def send_order(self, req: OrderRequest): """""" @@ -119,6 +131,7 @@ class AlpacaGateway(BaseGateway): def close(self): """""" self.rest_api.stop() + self.data_rest_api.stop() self.ws_api.stop() def init_query(self): @@ -128,6 +141,8 @@ class AlpacaGateway(BaseGateway): def process_timer_event(self, event: Event): """""" + self.data_rest_api.query_bar() + self.count += 1 if self.count < 5: return @@ -142,9 +157,9 @@ class AlpacaRestApi(RestClient): Alpaca REST API """ - def __init__(self, gateway: BaseGateway): + def __init__(self, gateway: AlpacaGateway): """""" - super(AlpacaRestApi, self).__init__() + super().__init__() self.gateway = gateway self.gateway_name = gateway.gateway_name @@ -154,46 +169,10 @@ class AlpacaRestApi(RestClient): self.order_count = 1_000_000 self.order_count_lock = Lock() + self.connect_time = 0 - self.order_dict = {} - def query_account(self): - path = f"/v1/account" - self.add_request( - method="GET", - path=path, - callback=self.on_query_account - ) - - def on_query_account(self, data, request): - account = AccountData( - accountid=data['id'], - balance=float(data['cash']), - frozen=float(data['cash']) - float(data['buying_power']), - gateway_name=self.gateway_name - ) - self.gateway.on_account(account) - - def query_position(self): - path = f"/v1/positions" - self.add_request( - method="GET", - path=path, - callback=self.on_query_position - ) - - def on_query_position(self, data, request): - for d in data: - position = PositionData( - symbol=d['symbol'], - exchange=Exchange.ALPACA, - direction=DIRECTION_ALPACA2VT[d['side']], - volume=d['qty'], - price=round(d['avg_entry_price'], 3), - pnl=d['unrealized_pl'], - gateway_name=self.gateway_name, - ) - self.gateway.on_position(position) + self.cancel_reqs = {} def sign(self, request): """ @@ -202,60 +181,226 @@ class AlpacaRestApi(RestClient): headers = { "APCA-API-KEY-ID": self.key, "APCA-API-SECRET-KEY": self.secret, - 'Content-Type': 'application/json' + "Content-Type": "application/json" } request.headers = headers request.allow_redirects = False + request.data = json.dumps(request.data) return request - def _new_order_id(self): - with self.order_count_lock: - self.order_count += 1 - return self.order_count - def connect( self, key: str, secret: str, session_num: int, - proxy_host: str, - proxy_port: int, url: str, ): """ - Initialize connection to REST server. + Initialize connection to REST server. """ self.key = key self.secret = secret - self.init(url, proxy_host, proxy_port) - self.start(session_num) + self.connect_time = ( int(datetime.now().strftime("%y%m%d%H%M%S")) * self.order_count ) - self.gateway.write_log("ALPACA REST API启动成功") + + self.init(url) + self.start(session_num) + + self.gateway.write_log("REST API启动成功") + self.query_contract() self.query_account() self.query_position() - # self.query_contracts() + self.query_order() + + def query_contract(self): + """""" + params = {"status": "active"} + + self.add_request( + "GET", + "/v2/assets", + params=params, + callback=self.on_query_contract + ) + + def query_account(self): + """""" + self.add_request( + method="GET", + path="/v2/account", + callback=self.on_query_account + ) + + def query_position(self): + """""" + self.add_request( + method="GET", + path="/v2/positions", + callback=self.on_query_position + ) + + def query_order(self): + """""" + params = { + "status": "open" + } + + self.add_request( + method="GET", + path="/v2/orders", + params=params, + callback=self.on_query_order + ) + + def _new_order_id(self): + """""" + with self.order_count_lock: + self.order_count += 1 + return self.order_count + + def send_order(self, req: OrderRequest): + """""" + local_orderid = str(self.connect_time + self._new_order_id()) + + data = { + "symbol": req.symbol, + "qty": str(req.volume), + "side": DIRECTION_VT2ALPACA[req.direction], + "type": ORDERTYPE_VT2ALPACA[req.type], + "time_in_force": "day", + "client_order_id": local_orderid + } + + if data["type"] == "limit": + data["limit_price"] = str(req.price) + + order = req.create_order_data(local_orderid, self.gateway_name) + self.gateway.on_order(order) + + self.add_request( + "POST", + "/v2/orders", + callback=self.on_send_order, + data=data, + extra=order, + on_failed=self.on_send_order_failed, + on_error=self.on_send_order_error, + ) + + return order.vt_orderid + + def cancel_order(self, req: CancelRequest): + """""" + sys_orderid = LOCAL_SYS_MAP.get(req.orderid, None) + if not sys_orderid: + self.cancel_reqs[req.orderid] = req + return + + path = f"/v2/orders/{sys_orderid}" + + self.add_request( + "DELETE", + path, + callback=self.on_cancel_order, + extra=req + ) + + def on_query_contract(self, data, request: Request): + """""" + for d in data: + symbol = d["symbol"] + + contract = ContractData( + symbol=symbol, + exchange=Exchange.SMART, + name=symbol, + product=Product.SPOT, + size=1, + pricetick=0.01, + gateway_name=self.gateway_name + ) + self.gateway.on_contract(contract) + + self.gateway.write_log("合约信息查询成功") + + def on_query_account(self, data, request): + """""" + account = AccountData( + accountid=data["id"], + balance=float(data["equity"]), + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) + + def on_query_position(self, data, request): + """""" + for d in data: + position = PositionData( + symbol=d["symbol"], + exchange=Exchange.SMART, + direction=DIRECTION_ALPACA2VT[d["side"]], + volume=int(d["qty"]), + price=float(d["avg_entry_price"]), + pnl=float(d["unrealized_pl"]), + gateway_name=self.gateway_name, + ) + self.gateway.on_position(position) + + def update_order(self, d: dict): + """""" + sys_orderid = d["id"] + local_orderid = d["client_order_id"] + LOCAL_SYS_MAP[local_orderid] = sys_orderid + + direction = DIRECTION_ALPACA2VT[d["side"]] + order_type = ORDERTYPE_ALPACA2VT[d["type"]] + + order = OrderData( + orderid=local_orderid, + symbol=d["symbol"], + exchange=Exchange.SMART, + price=float(d["limit_price"]), + volume=float(d["qty"]), + type=order_type, + direction=direction, + traded=float(d["filled_qty"]), + status=STATUS_ALPACA2VT.get(d["status"], Status.SUBMITTING), + time=d["created_at"], + gateway_name=self.gateway_name, + ) + self.gateway.on_order(order) + + def on_query_order(self, data, request): + """""" + for d in data: + self.update_order(d) + + self.gateway.write_log("委托信息查询成功") def on_send_order(self, data, request: Request): - remote_order_id = data['id'] - order = request.extra - self.order_dict[order.orderid] = remote_order_id - self.gateway.on_order(order) - GLOBAL_ORDER[remote_order_id] = order + """""" + self.update_order(data) - def on_failed_order(self, status_code: int, request: Request): + order = request.extra + if order.orderid in self.cancel_reqs: + req = self.cancel_reqs.pop(order.orderid) + self.cancel_order(req) + + def on_send_order_failed(self, status_code: int, request: Request): """ Callback to handle request failed. """ order = request.extra order.status = Status.REJECTED self.gateway.on_order(order) + msg = f"请求失败,状态码:{status_code},信息:{request.response.text}" self.gateway.write_log(msg) - def on_error_order( + def on_send_order_error( self, exception_type: type, exception_value: Exception, tb, request: Request ): """ @@ -264,167 +409,73 @@ class AlpacaRestApi(RestClient): order = request.extra order.status = Status.REJECTED self.gateway.on_order(order) + msg = f"触发异常,状态码:{exception_type},信息:{exception_value}" - print('debug on_error', msg) self.gateway.write_log(msg) sys.stderr.write( self.exception_detail(exception_type, exception_value, tb, request) ) - def cancel_order(self, req: CancelRequest): - """""" - order_id = req.orderid - remote_order_id = self.order_dict[order_id] - if remote_order_id is None: - print("[error]: can not get remote_order_id from local dict!") - return - path = "/v1/orders/" + str(remote_order_id) - self.add_request( - "DELETE", - path, - callback=self.on_cancel_order, - on_error=self.on_cancel_order_error, - extra=req - ) - print("come to cancel_order", order_id) - def on_cancel_order(self, data, request): - """Websocket will push a new order status""" - pass - - def on_cancel_order_error(self, exception_type: type, exception_value: Exception, tb, request: Request): - # Record exception if not ConnectionError - if not issubclass(exception_type, ConnectionError): - self.on_error(exception_type, exception_value, tb, request) - - def send_order(self, req: OrderRequest): - orderid = str(self.connect_time + self._new_order_id()) - raw_dict = { - "symbol": req.symbol, - "qty": int(req.volume), - "side": DIRECTION_VT2ALPACA[req.direction], - "type": ORDERTYPE_VT2ALPACA[req.type], - "time_in_force": 'day', - } - if raw_dict['type'] == "limit": - raw_dict['limit_price'] = float(req.price) - - data = raw_dict - order = req.create_order_data(orderid, self.gateway_name) - print("debug send_order orderBody extra: ", order) - self.add_request( - "POST", - "/v1/orders", - callback=self.on_send_order, - data=data, - extra=order, - on_failed=self.on_failed_order, - on_error=self.on_error_order, - # json_str=data, - ) - print("debug send_order ret val : ", order.vt_orderid) - return order.vt_orderid - - def on_query_contracts(self, data, request: Request): - for instrument_data in data: - symbol = instrument_data['symbol'] - contract = ContractData( - symbol=symbol, - exchange=Exchange.ALPACA, - name=symbol, - product=Product.SPOT, - size=1, - pricetick=0.01, - gateway_name=self.gateway_name - ) - self.on_contract(contract) - - def on_failed_query_contracts(self, status_code: int, request: Request): - pass - - def on_error_query_contracts(self, exception_type: type, exception_value: Exception, tb, request: Request): - pass - - def query_contracts(self): - params = {"status": "active"} - self.add_request( - "GET", - "/v1/assets", - params=params, - callback=self.on_query_contracts, - on_failed=self.on_failed_query_contracts, - on_error=self.on_error_query_contracts, - ) + """""" + req = request.extra + msg = f"撤单成功,委托号:{req.orderid}" + self.gateway.write_log(msg) class AlpacaWebsocketApi(WebsocketClient): """""" - def __init__(self, gateway): + def __init__(self, gateway: AlpacaGateway): """""" - super(AlpacaWebsocketApi, self).__init__() + super().__init__() self.gateway = gateway self.gateway_name = gateway.gateway_name - self.order_id = 1_000_000 - # self.date = int(datetime.now().strftime('%y%m%d%H%M%S')) * self.orderId + + self.trade_count = 0 + self.key = "" self.secret = "" - self.callbacks = { - "trade": self.on_tick, - "orderBook10": self.on_depth, - "execution": self.on_trade, - "order": self.on_order, - "position": self.on_position, - "margin": self.on_account, - "instrument": self.on_contract, - } - - self.ticks = {} - self.accounts = {} - self.orders = {} - self.trades = set() - self.tickDict = {} - self.bidDict = {} - self.askDict = {} - self.orderLocalDict = {} - self.channelDict = {} # ChannelID : (Channel, Symbol) - self.channels = ["account_updates", "trade_updates"] - def connect( - self, key: str, secret: str, proxy_host: str, proxy_port: int, url: str + self, key: str, secret: str, url: str ): """""" self.key = key self.secret = secret - self.init(url, proxy_host, proxy_port) + + self.init(url) self.start() def authenticate(self): """""" - params = {"action": "authenticate", "data": { - "key_id": self.key, "secret_key": self.secret - }} + params = { + "action": "authenticate", + "data": { + "key_id": self.key, + "secret_key": self.secret + } + } self.send_packet(params) - def on_authenticate(self): + def on_authenticate(self, data): """""" - params = {"action": "listen", "data": { - "streams": self.channels - }} + if data["status"] == "authorized": + self.gateway.write_log("Websocket API登录成功") + else: + self.gateway.write_log("Websocket API登录失败") + return + + params = { + "action": "listen", + "data": { + "streams": ["trade_updates", "account_updates"] + } + } self.send_packet(params) - def subscribe(self, req: SubscribeRequest): - self.channels.append("A." + req.symbol) - self.channels.append("Q." + req.symbol) - print("debug subscribe: {} ".format(self.channels)) - params = {"action": "listen", "data": { - "streams": self.channels - }} - self.send_packet(params) - def on_connected(self): """""" self.gateway.write_log("Websocket API连接成功") @@ -436,109 +487,165 @@ class AlpacaWebsocketApi(WebsocketClient): def on_packet(self, packet: dict): """""" - print("debug on_packet: ", packet) - if "stream" in packet and "data" in packet: - stream_ret = packet['stream'] - data_ret = packet['data'] - if(stream_ret == "authorization"): - self.handle_auth(packet) - elif(stream_ret == "listening"): - self.gateway.write_log("listening {}".format(data_ret)) - else: - self.on_data(packet) - else: - print("unrecognize msg", packet) - - def on_data(self, data): - print("on_data is {}".format(data)) - stream_ret = data['stream'] - data_ret = data['data'] - if(stream_ret == "account_updates"): - frozen = float(data_ret['cash']) - float(data_ret['cash_withdrawable']) + stream = packet["stream"] + data = packet["data"] - account = AccountData( - accountid=data_ret['id'], - balance=float(data_ret['cash']), - frozen=frozen, - gateway_name=self.gateway_name - ) - self.gateway.on_account(account) - elif(stream_ret == "trade_updates"): - d = data_ret['order'] - order_id = d['id'] - order = GLOBAL_ORDER[order_id] - local_order_id = order.orderid - order.status = STATUS_ALPACA2VT[data_ret['event']] - self.gateway.on_order(order) - if (data_ret['event'] == "fill" or data_ret['event'] == "partial_fill"): - trade = TradeData( - symbol=d["symbol"], - exchange=Exchange.ALPACA, - orderid=local_order_id, - tradeid=d['id'], - direction=DIRECTION_ALPACA2VT[d["side"]], - price=d["filled_avg_price"], - volume=d["filled_qty"], - time=data_ret["timestamp"][11:19], - gateway_name=self.gateway_name, - ) - self.gateway.on_trade(trade) - else: - pass - - def handle_auth(self, data): - # stream_ret = data['stream'] - data_ret = data['data'] - if (data_ret['status'] == "authorized"): - print("authorization success!!!") - self.gateway.write_log("authorization success!!!") - self.on_authenticate() - elif (data_ret['status'] == "unauthorized"): - print("authorization failed!!!") - self.gateway.write_log("authorization failed!!!") - else: - print("??unhandled status: ", data) + if stream == "authorization": + self.on_authenticate(data) + elif stream == "listening": + streams = data["streams"] - def on_error(self, exception_type: type, exception_value: Exception, tb): - """""" - print("on_error: ", type, Exception, tb) - sys.stderr.write( - self.exception_detail(exception_type, exception_value, tb) - ) + if "trade_updates" in streams: + self.gateway.write_log("委托成交推送订阅成功") - def subscribe_topic(self): - pass + if "account_updates" in streams: + self.gateway.write_log("资金变化推送订阅成功") - def on_tick(self, d): - """""" - pass - - def on_depth(self, d): - """""" - pass - - def on_trade(self, d): - """""" - pass - - def generateDateTime(self, s): - """生成时间""" - dt = datetime.fromtimestamp(s / 1000.0) - time = dt.strftime("%H:%M:%S.%f") - return time + elif stream == "trade_updates": + self.on_order(data) + elif stream == "account_updates": + self.on_account(data) def on_order(self, data): """""" - pass + # Update order + d = data["order"] + sys_orderid = d["id"] + local_orderid = d["client_order_id"] + LOCAL_SYS_MAP[local_orderid] = sys_orderid - def on_position(self, d): - """""" - pass + direction = DIRECTION_ALPACA2VT[d["side"]] + order_type = ORDERTYPE_ALPACA2VT[d["type"]] - def on_account(self, d): - """""" - pass + order = OrderData( + orderid=local_orderid, + symbol=d["symbol"], + exchange=Exchange.SMART, + price=float(d["limit_price"]), + volume=float(d["qty"]), + type=order_type, + direction=direction, + traded=float(d["filled_qty"]), + status=STATUS_ALPACA2VT.get(d["status"], Status.SUBMITTING), + time=d["created_at"], + gateway_name=self.gateway_name, + ) + self.gateway.on_order(order) - def on_contract(self, d): + # Update Trade + event = data.get("event", "") + if event != "fill": + return + + self.trade_count += 1 + + trade = TradeData( + symbol=order.symbol, + exchange=order.exchange, + orderid=order.orderid, + tradeid=str(self.trade_count), + direction=order.direction, + price=float(data["price"]), + volume=int(data["qty"]), + time=data["timestamp"], + gateway_name=self.gateway_name + ) + self.gateway.on_trade(trade) + + def on_account(self, data): """""" - pass + account = AccountData( + accountid=data["id"], + balance=float(data["equity"]), + gateway_name=self.gateway_name + ) + self.gateway.on_account(account) + + +class AlpacaDataRestApi(RestClient): + """ + Alpaca Market Data REST API + """ + + def __init__(self, gateway: AlpacaGateway): + """""" + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.key = "" + self.secret = "" + + self.symbols = set() + + def sign(self, request): + """ + Generate Alpaca signature. + """ + headers = { + "APCA-API-KEY-ID": self.key, + "APCA-API-SECRET-KEY": self.secret, + "Content-Type": "application/json" + } + + request.headers = headers + request.allow_redirects = False + return request + + def connect( + self, + key: str, + secret: str, + session_num: int + ): + """ + Initialize connection to REST server. + """ + self.key = key + self.secret = secret + + self.init(DATA_REST_HOST) + self.start(session_num) + + self.gateway.write_log("行情REST API启动成功") + + def subscribe(self, req: SubscribeRequest): + """""" + self.symbols.add(req.symbol) + + def query_bar(self): + """""" + if not self._active or not self.symbols: + return + + params = { + "symbols": ",".join(list(self.symbols)), + "limit": 1 + } + + self.add_request( + method="GET", + path="/v1/bars/1Min", + params=params, + callback=self.on_query_bar + ) + + def on_query_bar(self, data, request): + """""" + for symbol, buf in data.items(): + d = buf[0] + + tick = TickData( + symbol=symbol, + exchange=Exchange.SMART, + datetime=datetime.now(), + name=symbol, + open_price=d["o"], + high_price=d["h"], + low_price=d["l"], + last_price=d["c"], + gateway_name=self.gateway_name + ) + + self.gateway.on_tick(tick) From d0bb68fc479f2775616b7d2433fddaa419f8a772 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Wed, 26 Jun 2019 21:43:06 +0800 Subject: [PATCH 20/20] [Mod] complete test of ToraGateway --- examples/vn_trader/run.py | 10 ++-- vnpy/gateway/tora/md.py | 18 +++++- vnpy/gateway/tora/td.py | 91 ++++++++++++++++++++++++------- vnpy/gateway/tora/tora_gateway.py | 12 ++-- vnpy/trader/ui/mainwindow.py | 1 + 5 files changed, 99 insertions(+), 33 deletions(-) diff --git a/examples/vn_trader/run.py b/examples/vn_trader/run.py index c7cd65c5..aaab4352 100644 --- a/examples/vn_trader/run.py +++ b/examples/vn_trader/run.py @@ -9,7 +9,7 @@ from vnpy.gateway.futu import FutuGateway from vnpy.gateway.ib import IbGateway from vnpy.gateway.ctp import CtpGateway # from vnpy.gateway.ctptest import CtptestGateway -from vnpy.gateway.femas import FemasGateway +# from vnpy.gateway.femas import FemasGateway from vnpy.gateway.tiger import TigerGateway # from vnpy.gateway.oes import OesGateway from vnpy.gateway.okex import OkexGateway @@ -19,7 +19,8 @@ from vnpy.gateway.onetoken import OnetokenGateway from vnpy.gateway.okexf import OkexfGateway # from vnpy.gateway.xtp import XtpGateway from vnpy.gateway.hbdm import HbdmGateway -from vnpy.gateway.tap import TapGateway +# from vnpy.gateway.tap import TapGateway +from vnpy.gateway.tora import ToraGateway from vnpy.app.cta_strategy import CtaStrategyApp from vnpy.app.csv_loader import CsvLoaderApp @@ -40,7 +41,7 @@ def main(): main_engine.add_gateway(BinanceGateway) main_engine.add_gateway(CtpGateway) # main_engine.add_gateway(CtptestGateway) - main_engine.add_gateway(FemasGateway) + # main_engine.add_gateway(FemasGateway) main_engine.add_gateway(IbGateway) main_engine.add_gateway(FutuGateway) main_engine.add_gateway(BitmexGateway) @@ -53,7 +54,8 @@ def main(): main_engine.add_gateway(OkexfGateway) main_engine.add_gateway(HbdmGateway) # main_engine.add_gateway(XtpGateway) - main_engine.add_gateway(TapGateway) + # main_engine.add_gateway(TapGateway) + main_engine.add_gateway(ToraGateway) main_engine.add_app(CtaStrategyApp) main_engine.add_app(CtaBacktesterApp) diff --git a/vnpy/gateway/tora/md.py b/vnpy/gateway/tora/md.py index b35a5b51..ef10176c 100644 --- a/vnpy/gateway/tora/md.py +++ b/vnpy/gateway/tora/md.py @@ -23,22 +23,28 @@ def parse_datetime(date: str, time: str): class ToraMdSpi(CTORATstpMdSpi): + """""" def __init__(self, api: "ToraMdApi", gateway: "BaseGateway"): + """""" super().__init__() self.gateway = gateway self._api = api def OnFrontConnected(self) -> Any: + """""" self.gateway.write_log("行情服务器连接成功") def OnFrontDisconnected(self, error_code: int) -> Any: - self.gateway.write_log(f"行情服务器连接断开({error_code}):{get_error_msg(error_code)}") + """""" + self.gateway.write_log( + f"行情服务器连接断开({error_code}):{get_error_msg(error_code)}") def OnRspError( self, error_info: CTORATstpRspInfoField, request_id: int, is_last: bool ) -> Any: + """""" error_id = error_info.ErrorID error_msg = error_info.ErrorMsg self.gateway.write_log(f"行情服务收到错误消息({error_id}):{error_msg}") @@ -50,6 +56,7 @@ class ToraMdSpi(CTORATstpMdSpi): request_id: int, is_last: bool, ) -> Any: + """""" error_id = error_info.ErrorID if error_id != 0: error_msg = error_info.ErrorMsg @@ -64,6 +71,7 @@ class ToraMdSpi(CTORATstpMdSpi): request_id: int, is_last: bool, ) -> Any: + """""" error_id = error_info.ErrorID if error_id != 0: error_msg = error_info.ErrorMsg @@ -72,6 +80,7 @@ class ToraMdSpi(CTORATstpMdSpi): self.gateway.write_log("行情服务器登出成功") def OnRtnDepthMarketData(self, data: CTORATstpMarketDataField) -> Any: + """""" if data.ExchangeID not in EXCHANGE_TORA2VT: return tick_data = TickData( @@ -114,8 +123,10 @@ class ToraMdSpi(CTORATstpMdSpi): class ToraMdApi: + """""" def __init__(self, gateway: BaseGateway): + """""" self.gateway = gateway self.md_address = "" @@ -151,10 +162,13 @@ class ToraMdApi: return True def subscribe(self, symbols: List[str], exchange: Exchange): - err = self._native_api.SubscribeMarketData(symbols, EXCHANGE_VT2TORA[exchange]) + """""" + err = self._native_api.SubscribeMarketData( + symbols, EXCHANGE_VT2TORA[exchange]) self._if_error_write_log(err, "subscribe") def _if_error_write_log(self, error_code: int, function_name: str): + """""" if error_code != 0: error_msg = get_error_msg(error_code) msg = f'在执行 {function_name} 时发生错误({error_code}): {error_msg}' diff --git a/vnpy/gateway/tora/td.py b/vnpy/gateway/tora/td.py index db3b3e44..1d0926af 100644 --- a/vnpy/gateway/tora/td.py +++ b/vnpy/gateway/tora/td.py @@ -47,7 +47,8 @@ def _check_error(none_return: bool = True, def wrapped(self, info, error_info, *args): function_name = func.__name__ if print_function_name: - print(function_name, "info" if info else "None", error_info.ErrorID) + print(function_name, "info" if info else "None", + error_info.ErrorID) # print if errors error_code = error_info.ErrorID @@ -72,8 +73,10 @@ def _check_error(none_return: bool = True, class QueryLoop: + """""" def __init__(self, gateway: "BaseGateway"): + """""" self.event_engine = gateway.event_engine self._seconds_left = 0 @@ -84,6 +87,7 @@ class QueryLoop: self.event_engine.register(EVENT_TIMER, self._process_timer_event) def stop(self): + """""" self.event_engine.unregister(EVENT_TIMER, self._process_timer_event) def _process_timer_event(self, event): @@ -96,7 +100,8 @@ class QueryLoop: self._seconds_left = 2 # get the last one and re-queue it - func = self._query_functions.pop(0) # works fine if there is no so much items + # works fine if there is no so much items + func = self._query_functions.pop(0) self._query_functions.append(func) # call it @@ -107,11 +112,13 @@ OrdersType = Dict[str, "OrderInfo"] class ToraTdSpi(CTORATstpTraderSpi): + """""" def __init__(self, session_info: "SessionInfo", api: "ToraTdApi", gateway: "BaseGateway", orders: OrdersType): + """""" super().__init__() self.session_info = session_info self.gateway = gateway @@ -120,6 +127,7 @@ class ToraTdSpi(CTORATstpTraderSpi): self._api: "ToraTdApi" = api def OnRtnTrade(self, info: CTORATstpTradeField) -> None: + """""" try: trade_data = TradeData( gateway_name=self.gateway.gateway_name, @@ -138,6 +146,7 @@ class ToraTdSpi(CTORATstpTraderSpi): return def OnRtnOrder(self, info: CTORATstpOrderField) -> None: + """""" self._api.update_last_local_order_id(int(info.OrderRef)) try: @@ -155,6 +164,7 @@ class ToraTdSpi(CTORATstpTraderSpi): @_check_error(error_return=False, write_log=False, print_function_name=False) def OnErrRtnOrderInsert(self, info: CTORATstpInputOrderField, error_info: CTORATstpRspInfoField) -> None: + """""" try: self._api.update_last_local_order_id(int(info.OrderRef)) except ValueError: @@ -173,24 +183,27 @@ class ToraTdSpi(CTORATstpTraderSpi): @_check_error(error_return=False, write_log=False, print_function_name=False) def OnErrRtnOrderAction(self, info: CTORATstpOrderActionField, error_info: CTORATstpRspInfoField) -> None: + """""" pass @_check_error() def OnRtnCondOrder(self, info: CTORATstpConditionOrderField) -> None: + """""" pass @_check_error() def OnRspOrderAction(self, info: CTORATstpInputOrderActionField, error_info: CTORATstpRspInfoField, request_id: int, is_last: bool) -> None: - print("order action succeed!") + pass @_check_error() def OnRspOrderInsert(self, info: CTORATstpInputOrderField, error_info: CTORATstpRspInfoField, request_id: int, is_last: bool) -> None: + """""" try: order_data = self.parse_order_field(info) except KeyError: - self.gateway.write_log(f"收到无法识别的下单回执({info.OrderRef})!") + self.gateway.write_log(f"收到无法识别的下单回执({info.OrderRef})") return self.gateway.on_order(order_data) @@ -208,14 +221,17 @@ class ToraTdSpi(CTORATstpTraderSpi): @_check_error(print_function_name=False) def OnRspQryPosition(self, info: CTORATstpPositionField, error_info: CTORATstpRspInfoField, request_id: int, is_last: bool) -> None: + """""" if info.InvestorID != self.session_info.investor_id: - self.gateway.write_log("OnRspQryPosition:收到其他账户的仓位信息!") + self.gateway.write_log("OnRspQryPosition:收到其他账户的仓位信息") return if info.ExchangeID not in EXCHANGE_TORA2VT: - self.gateway.write_log(f"OnRspQryPosition:忽略不支持的交易所:{info.ExchangeID}") + self.gateway.write_log( + f"OnRspQryPosition:忽略不支持的交易所:{info.ExchangeID}") return volume = info.CurrentPosition - frozen = info.HistoryPosFrozen + info.TodayBSFrozen + info.TodayPRFrozen + info.TodaySMPosFrozen + frozen = info.HistoryPosFrozen + info.TodayBSFrozen + \ + info.TodayPRFrozen + info.TodaySMPosFrozen position_data = PositionData( gateway_name=self.gateway.gateway_name, symbol=info.SecurityID, @@ -224,7 +240,8 @@ class ToraTdSpi(CTORATstpTraderSpi): volume=volume, # verify this: which one should vnpy use? frozen=frozen, # verify this: which one should i use? price=info.TotalPosCost / volume, - pnl=info.LastPrice * volume - info.TotalPosCost, # verify this: is this formula correct + # verify this: is this formula correct + pnl=info.LastPrice * volume - info.TotalPosCost, yd_volume=info.HistoryPos, ) self.gateway.on_position(position_data) @@ -233,7 +250,7 @@ class ToraTdSpi(CTORATstpTraderSpi): def OnRspQryTradingAccount(self, info: CTORATstpTradingAccountField, error_info: CTORATstpRspInfoField, request_id: int, is_last: bool) -> None: - + """""" self.session_info.account_id = info.AccountID account_data = AccountData( gateway_name=self.gateway.gateway_name, @@ -247,19 +264,22 @@ class ToraTdSpi(CTORATstpTraderSpi): def OnRspQryShareholderAccount(self, info: CTORATstpShareholderAccountField, error_info: CTORATstpRspInfoField, request_id: int, is_last: bool) -> None: + """""" exchange = EXCHANGE_TORA2VT[info.ExchangeID] self.session_info.shareholder_ids[exchange] = info.ShareholderID @_check_error(print_function_name=False) def OnRspQryInvestor(self, info: CTORATstpInvestorField, error_info: CTORATstpRspInfoField, request_id: int, is_last: bool) -> None: + """""" self.session_info.investor_id = info.InvestorID @_check_error(none_return=False, print_function_name=False) def OnRspQrySecurity(self, info: CTORATstpSecurityField, error_info: CTORATstpRspInfoField, request_id: int, is_last: bool) -> None: + """""" if is_last: - self.gateway.write_log("合约信息查询成功!") + self.gateway.write_log("合约信息查询成功") if not info: return @@ -283,12 +303,14 @@ class ToraTdSpi(CTORATstpTraderSpi): self.gateway.on_contract(contract_data) def OnFrontConnected(self) -> None: + """""" self.gateway.write_log("交易服务器连接成功") self._api.login() @_check_error(print_function_name=False) def OnRspUserLogin(self, info: CTORATstpRspUserLoginField, error_info: CTORATstpRspInfoField, request_id: int, is_last: bool) -> None: + """""" self._api.update_last_local_order_id(int(info.MaxOrderRef)) self.session_info.front_id = info.FrontID self.session_info.session_id = info.SessionID @@ -298,7 +320,9 @@ class ToraTdSpi(CTORATstpTraderSpi): self._api.start_query_loop() # stop at ToraTdApi.stop() def OnFrontDisconnected(self, error_code: int) -> None: - self.gateway.write_log(f"交易服务器连接断开({error_code}):{get_error_msg(error_code)}") + """""" + self.gateway.write_log( + f"交易服务器连接断开({error_code}):{get_error_msg(error_code)}") def parse_order_field(self, info): """ @@ -330,6 +354,7 @@ class ToraTdSpi(CTORATstpTraderSpi): class ToraTdApi: def __init__(self, gateway: BaseGateway): + """""" self.gateway = gateway self.username = "" @@ -347,13 +372,16 @@ class ToraTdApi: self._next_local_order_id = int(1e5) def get_shareholder_id(self, exchange: Exchange): + """""" return self.session_info.shareholder_ids[exchange] def update_last_local_order_id(self, new_val: int): + """""" cur = self._next_local_order_id self._next_local_order_id = max(cur, new_val + 1) def _if_error_write_log(self, error_code: int, function_name: str): + """""" if error_code != 0: error_msg = get_error_msg(error_code) msg = f'在执行 {function_name} 时发生错误({error_code}): {error_msg}' @@ -361,21 +389,25 @@ class ToraTdApi: return True def _get_new_req_id(self): + """""" req_id = self._last_req_id self._last_req_id += 1 return req_id def _get_new_order_id(self) -> str: + """""" order_id = self._next_local_order_id self._next_local_order_id += 1 return str(order_id) def query_contracts(self): + """""" info = CTORATstpQrySecurityField() err = self._native_api.ReqQrySecurity(info, self._get_new_req_id()) self._if_error_write_log(err, "query_contracts") def query_exchange(self, exchange: Exchange): + """""" info = CTORATstpQryExchangeField() info.ExchangeID = EXCHANGE_VT2TORA[exchange] err = self._native_api.ReqQryExchange(info, self._get_new_req_id()) @@ -383,6 +415,7 @@ class ToraTdApi: self._if_error_write_log(err, "query_exchange") def query_market_data(self, symbol: str, exchange: Exchange): + """""" info = CTORATstpQryMarketDataField() info.ExchangeID = EXCHANGE_VT2TORA[exchange] info.SecurityID = symbol @@ -390,6 +423,7 @@ class ToraTdApi: self._if_error_write_log(err, "query_market_data") def stop(self): + """""" self.stop_query_loop() if self._native_api: @@ -419,16 +453,21 @@ class ToraTdApi: :return: """ flow_path = str(get_folder_path(self.gateway.gateway_name.lower())) - self._native_api = CTORATstpTraderApi.CreateTstpTraderApi(flow_path, True) - self._spi = ToraTdSpi(self.session_info, self, self.gateway, self.orders) + self._native_api = CTORATstpTraderApi.CreateTstpTraderApi( + flow_path, True) + self._spi = ToraTdSpi(self.session_info, self, + self.gateway, self.orders) self._native_api.RegisterSpi(self._spi) self._native_api.RegisterFront(self.td_address) - self._native_api.SubscribePublicTopic(TORA_TE_RESUME_TYPE.TORA_TERT_RESTART) - self._native_api.SubscribePrivateTopic(TORA_TE_RESUME_TYPE.TORA_TERT_RESTART) + self._native_api.SubscribePublicTopic( + TORA_TE_RESUME_TYPE.TORA_TERT_RESTART) + self._native_api.SubscribePrivateTopic( + TORA_TE_RESUME_TYPE.TORA_TERT_RESTART) self._native_api.Init() return True def send_order(self, req: OrderRequest): + """""" if req.type is OrderType.STOP: raise NotImplementedError() if req.type is OrderType.FAK or req.type is OrderType.FOK: @@ -465,16 +504,19 @@ class ToraTdApi: self.session_info.session_id, self.session_info.front_id, ) - self.gateway.on_order(req.create_order_data(order_id, self.gateway.gateway_name)) + self.gateway.on_order(req.create_order_data( + order_id, self.gateway.gateway_name)) # err = self._native_api.ReqCondOrderInsert(info, self._get_new_req_id()) err = self._native_api.ReqOrderInsert(info, self._get_new_req_id()) self._if_error_write_log(err, "send_order:ReqOrderInsert") def cancel_order(self, req: CancelRequest): + """""" info = CTORATstpInputOrderActionField() info.InvestorID = self.session_info.investor_id - info.ExchangeID = EXCHANGE_VT2TORA[req.exchange] # 没有的话:(16608):VIP:未知的交易所代码 + # 没有的话:(16608):VIP:未知的交易所代码 + info.ExchangeID = EXCHANGE_VT2TORA[req.exchange] info.SecurityID = req.symbol # info.OrderActionRef = str(self._get_new_req_id()) @@ -491,6 +533,7 @@ class ToraTdApi: self._if_error_write_log(err, "cancel_order:ReqOrderAction") def query_initialize_status(self): + """""" self.query_contracts() self.query_investors() self.query_shareholder_ids() @@ -500,41 +543,51 @@ class ToraTdApi: self.query_trades() def query_accounts(self): + """""" info = CTORATstpQryTradingAccountField() - err = self._native_api.ReqQryTradingAccount(info, self._get_new_req_id()) + err = self._native_api.ReqQryTradingAccount( + info, self._get_new_req_id()) self._if_error_write_log(err, "query_accounts") def query_shareholder_ids(self): + """""" info = CTORATstpQryShareholderAccountField() - err = self._native_api.ReqQryShareholderAccount(info, self._get_new_req_id()) + err = self._native_api.ReqQryShareholderAccount( + info, self._get_new_req_id()) self._if_error_write_log(err, "query_shareholder_ids") def query_investors(self): + """""" info = CTORATstpQryInvestorField() err = self._native_api.ReqQryInvestor(info, self._get_new_req_id()) self._if_error_write_log(err, "query_investors") def query_positions(self): + """""" info = CTORATstpQryPositionField() err = self._native_api.ReqQryPosition(info, self._get_new_req_id()) self._if_error_write_log(err, "query_positions") def query_orders(self): + """""" info = CTORATstpQryOrderField() err = self._native_api.ReqQryOrder(info, self._get_new_req_id()) self._if_error_write_log(err, "query_orders") def query_trades(self): + """""" info = CTORATstpQryTradeField() err = self._native_api.ReqQryTrade(info, self._get_new_req_id()) self._if_error_write_log(err, "query_trades") def start_query_loop(self): + """""" if not self._query_loop: self._query_loop = QueryLoop(self.gateway) self._query_loop.start() def stop_query_loop(self): + """""" if self._query_loop: self._query_loop.stop() self._query_loop = None diff --git a/vnpy/gateway/tora/tora_gateway.py b/vnpy/gateway/tora/tora_gateway.py index 76501a94..c09acee3 100644 --- a/vnpy/gateway/tora/tora_gateway.py +++ b/vnpy/gateway/tora/tora_gateway.py @@ -5,7 +5,8 @@ TODO: * Linux support """ -from vnpy.api.tora.vntora import (AsyncDispatchException, set_async_callback_exception_handler) +from vnpy.api.tora.vntora import ( + AsyncDispatchException, set_async_callback_exception_handler) from vnpy.event import EventEngine from vnpy.trader.gateway import BaseGateway @@ -20,6 +21,8 @@ def is_valid_front_address(address: str): class ToraGateway(BaseGateway): + """""" + default_setting = { "账号": "", "密码": "", @@ -86,13 +89,6 @@ class ToraGateway(BaseGateway): """""" self._td_api.query_positions() - def write_log(self, msg: str): - """ - for easier test - """ - print(msg) - super().write_log(msg) - def _async_callback_exception_handler(self, e: AsyncDispatchException): error_str = f"发生内部错误:\n" f"位置:{e.instance}.{e.function_name}" f"详细信息:{e.what}" self.write_log(error_str) diff --git a/vnpy/trader/ui/mainwindow.py b/vnpy/trader/ui/mainwindow.py index 3313c780..8d2ab6c5 100644 --- a/vnpy/trader/ui/mainwindow.py +++ b/vnpy/trader/ui/mainwindow.py @@ -163,6 +163,7 @@ class MainWindow(QtWidgets.QMainWindow): def init_toolbar(self): """""" self.toolbar = QtWidgets.QToolBar(self) + self.toolbar.setObjectName("工具栏") self.toolbar.setFloatable(False) self.toolbar.setMovable(False)