[Mod] complete function test of da gateway

This commit is contained in:
vn.py 2019-09-02 16:44:48 +08:00
parent 795eab7228
commit fa6c9e9975
2 changed files with 231 additions and 227 deletions

Binary file not shown.

View File

@ -2,7 +2,7 @@
""" """
from datetime import datetime from datetime import datetime
from threading import Thread from copy import copy
import wmi import wmi
@ -38,7 +38,7 @@ from vnpy.trader.event import EVENT_TIMER
STATUS_DA2VT = { STATUS_DA2VT = {
"1": Status.SUBMITTING,, "1": Status.SUBMITTING,
"2": Status.NOTTRADED, "2": Status.NOTTRADED,
"3": Status.PARTTRADED, "3": Status.PARTTRADED,
"4": Status.ALLTRADED, "4": Status.ALLTRADED,
@ -78,7 +78,7 @@ EXCHANGE_DA2VT = {
# "LME": Exchange.LME, # "LME": Exchange.LME,
# "CME_CBT": Exchange.CBOT, # "CME_CBT": Exchange.CBOT,
# "HKEX": Exchange.HKFE, # "HKEX": Exchange.HKFE,
# "CME": Exchange.CME, "CME": Exchange.CME,
# "TOCOM": Exchange.TOCOM, # "TOCOM": Exchange.TOCOM,
# "KRX": Exchange.KRX, # "KRX": Exchange.KRX,
# "ICE": Exchange.ICE # "ICE": Exchange.ICE
@ -96,9 +96,10 @@ PRODUCT_DA2VT = {
# } # }
symbol_exchange_map = {}
symbol_name_map = {} symbol_name_map = {}
symbol_size_map = {} symbol_currency_map = {}
currency_account_map = {}
account_currency_map = {}
class DaGateway(BaseGateway): class DaGateway(BaseGateway):
@ -130,7 +131,7 @@ class DaGateway(BaseGateway):
future_address = setting["交易服务器"] future_address = setting["交易服务器"]
market_address = setting["行情服务器"] market_address = setting["行情服务器"]
auth_code = setting["授权码"] auth_code = setting["授权码"]
if not future_address.startswith("tcp://"): if not future_address.startswith("tcp://"):
future_address = "tcp://" + future_address future_address = "tcp://" + future_address
if not market_address.startswith("tcp://"): if not market_address.startswith("tcp://"):
@ -224,50 +225,47 @@ class DaMarketApi(MarketApi):
""" """
Callback of tick data update. Callback of tick data update.
""" """
print('on rsp', data, error)
symbol = data["TreatyCode"] symbol = data["TreatyCode"]
exchange = EXCHANGE_DA2VT.get(data["ExchangeCode"], None) exchange = EXCHANGE_DA2VT.get(data["ExchangeCode"], None)
if not exchange: if not exchange:
return return
timestamp = f"{data['TradeDay']} {data['Time']}"
tick = TickData( tick = TickData(
symbol=symbol, symbol=symbol,
exchange=exchange, exchange=exchange,
datetime=datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f"), datetime=datetime.strptime(data['Time'], "%Y-%m-%d %H:%M:%S"),
name=symbol_name_map[symbol], volume=to_int(data["FilledNum"]),
volume=int(data["FilledNum"]), open_interest=to_int(data["HoldNum"]),
open_interest=int(data["HoldNum"]), limit_up=to_float(data["LimitUpPrice"]),
limit_up=float(data["LimitUpPrice"]), limit_down=to_float(data["LimitDownPrice"]),
limit_down=float(data["LimitDownPrice"]), last_price=to_float(data["CurrPrice"]),
last_price=float(data["CurrPrice"]), open_price=to_float(data["Open"]),
open_price=float(data["Open"]), high_price=to_float(data["High"]),
high_price=float(data["High"]), low_price=to_float(data["Low"]),
low_price=float(data["Low"]), pre_close=to_float(data["PreSettlementPrice"]),
pre_close=float(data["PreSettlementPrice"]), bid_price_1=to_float(data["BuyPrice"]),
bid_price_1=float(data["BuyPrice"]), bid_price_2=to_float(data["BuyPrice2"]),
bid_price_2=float(data["BuyPrice2"]), bid_price_3=to_float(data["BuyPrice3"]),
bid_price_3=float(data["BuyPrice3"]), bid_price_4=to_float(data["BuyPrice4"]),
bid_price_4=float(data["BuyPrice4"]), bid_price_5=to_float(data["BuyPrice5"]),
bid_price_5=float(data["BuyPrice5"]), ask_price_1=to_float(data["SalePrice"]),
ask_price_1=float(data["SalePrice"]), ask_price_2=to_float(data["SalePrice2"]),
ask_price_2=float(data["SalePrice2"]), ask_price_3=to_float(data["SalePrice3"]),
ask_price_3=float(data["SalePrice3"]), ask_price_4=to_float(data["SalePrice4"]),
ask_price_4=float(data["SalePrice4"]), ask_price_5=to_float(data["SalePrice5"]),
ask_price_5=float(data["SalePrice5"]), bid_volume_1=to_int(data["BuyNumber"]),
bid_volume_1=int(data["BuyNumber"]), bid_volume_2=to_int(data["BuyNumber2"]),
bid_volume_2=int(data["BuyNumber2"]), bid_volume_3=to_int(data["BuyNumber3"]),
bid_volume_3=int(data["BuyNumber3"]), bid_volume_4=to_int(data["BuyNumber4"]),
bid_volume_4=int(data["BuyNumber4"]), bid_volume_5=to_int(data["BuyNumber5"]),
bid_volume_5=int(data["BuyNumber5"]), ask_volume_1=to_int(data["SaleNumber"]),
ask_volume_1=int(data["SaleNumber"]), ask_volume_2=to_int(data["SaleNumber2"]),
ask_volume_2=int(data["SaleNumber2"]), ask_volume_3=to_int(data["SaleNumber3"]),
ask_volume_3=int(data["SaleNumber3"]), ask_volume_4=to_int(data["SaleNumber4"]),
ask_volume_4=int(data["SaleNumber4"]), ask_volume_5=to_int(data["SaleNumber5"]),
ask_volume_5=int(data["SaleNumber5"]),
gateway_name=self.gateway_name gateway_name=self.gateway_name
) )
tick.name = symbol_name_map[tick.vt_symbol]
self.gateway.on_tick(tick) self.gateway.on_tick(tick)
def connect(self, address: str, userid: str, password: str, auth_code: str): def connect(self, address: str, userid: str, password: str, auth_code: str):
@ -328,7 +326,6 @@ class DaMarketApi(MarketApi):
} }
self.reqid += 1 self.reqid += 1
i = self.reqMarketData(da_req, self.reqid) i = self.reqMarketData(da_req, self.reqid)
print(i, da_req)
self.subscribed[req.symbol] = req self.subscribed[req.symbol] = req
@ -351,7 +348,7 @@ class DaFutureApi(FutureApi):
self.gateway_name = gateway.gateway_name self.gateway_name = gateway.gateway_name
self.reqid = 0 self.reqid = 0
self.order_ref = 0 self.local_no = int(datetime.now().strftime("%Y%m%d") + "000000")
self.connect_status = False self.connect_status = False
self.login_status = False self.login_status = False
@ -363,6 +360,9 @@ class DaFutureApi(FutureApi):
self.auth_code = "" self.auth_code = ""
self.mac_address = get_mac_address() self.mac_address = get_mac_address()
self.orders = {}
self.order_info = {}
def onFrontConnected(self): def onFrontConnected(self):
"""""" """"""
self.gateway.write_log("交易服务器连接成功") self.gateway.write_log("交易服务器连接成功")
@ -382,6 +382,13 @@ class DaFutureApi(FutureApi):
# 查询可交易合约 # 查询可交易合约
for exchange in EXCHANGE_DA2VT.values(): for exchange in EXCHANGE_DA2VT.values():
self.query_contract(exchange) self.query_contract(exchange)
# 查询账户信息
self.query_account()
self.query_position()
self.query_order()
self.query_trade()
else: else:
self.login_failed = True self.login_failed = True
self.gateway.write_error("交易服务器登录失败", error) self.gateway.write_error("交易服务器登录失败", error)
@ -392,30 +399,25 @@ class DaFutureApi(FutureApi):
def onRspOrderInsert(self, data: dict, error: dict, reqid: int, last: bool): def onRspOrderInsert(self, data: dict, error: dict, reqid: int, last: bool):
"""""" """"""
order_ref = data["OrderRef"] errorid = error["ErrorID"]
orderid = f"{self.frontid}_{self.sessionid}_{order_ref}" orderid = data["LocalNo"]
order = self.orders[orderid]
symbol = data["InstrumentID"] if errorid:
exchange = symbol_exchange_map[symbol] order.status = Status.REJECTED
self.gateway.write_error("交易委托失败", error)
else:
order.time = data["OrderTime"]
order = OrderData( self.order_info[order.orderid] = (data["OrderNo"], data["SystemNo"])
symbol=symbol,
exchange=exchange, self.gateway.on_order(copy(order))
orderid=orderid,
direction=DIRECTION_DA2VT[data["Direction"]],
offset=OFFSET_DA2VT.get(data["CombOffsetFlag"], Offset.NONE),
price=data["LimitPrice"],
volume=data["VolumeTotalOriginal"],
status=Status.REJECTED,
gateway_name=self.gateway_name
)
self.gateway.on_order(order)
self.gateway.write_error("交易委托失败", error) def onRspOrderCancel(self, data: dict, error: dict, reqid: int, last: bool):
def onRspOrderAction(self, data: dict, error: dict, reqid: int, last: bool):
"""""" """"""
self.gateway.write_error("交易撤单失败", error) errorid = error["ErrorID"]
if errorid:
self.gateway.write_error("交易撤单失败", error)
def onRspQueryMaxOrderVolume(self, data: dict, error: dict, reqid: int, last: bool): def onRspQueryMaxOrderVolume(self, data: dict, error: dict, reqid: int, last: bool):
"""""" """"""
@ -430,79 +432,6 @@ class DaFutureApi(FutureApi):
self.reqid += 1 self.reqid += 1
self.reqQryInstrument({}, self.reqid) self.reqQryInstrument({}, self.reqid)
def onRspQryInvestorPosition(self, data: dict, error: dict, reqid: int, last: bool):
""""""
if not data:
return
# Get buffered position object
key = f"{data['InstrumentID'], data['PosiDirection']}"
position = self.positions.get(key, None)
if not position:
position = PositionData(
symbol=data["InstrumentID"],
exchange=symbol_exchange_map[data["InstrumentID"]],
direction=DIRECTION_DA2VT[data["PosiDirection"]],
gateway_name=self.gateway_name
)
self.positions[key] = position
# For SHFE position data update
if position.exchange == Exchange.SHFE:
if data["YdPosition"] and not data["TodayPosition"]:
position.yd_volume = data["Position"]
# For other exchange position data update
else:
position.yd_volume = data["Position"] - data["TodayPosition"]
# Get contract size (spread contract has no size value)
size = symbol_size_map.get(position.symbol, 0)
# Calculate previous position cost
cost = position.price * position.volume * size
# Update new position volume
position.volume += data["Position"]
position.pnl += data["PositionProfit"]
# Calculate average position price
if position.volume and size:
cost += data["PositionCost"]
position.price = cost / (position.volume * size)
# Get frozen volume
if position.direction == Direction.LONG:
position.frozen += data["ShortFrozen"]
else:
position.frozen += data["LongFrozen"]
if last:
for position in self.positions.values():
self.gateway.on_position(position)
self.positions.clear()
def onRspQryTradingAccount(self, data: dict, error: dict, reqid: int, last: bool):
""""""
if "AccountID" not in data:
return
account = AccountData(
accountid=data["AccountID"],
balance=data["Balance"],
frozen=data["FrozenMargin"] + data["FrozenCash"] + data["FrozenCommission"],
gateway_name=self.gateway_name
)
account.available = data["Available"]
self.gateway.on_account(account)
def onRspQryExchange(self, data: dict, error: dict, reqid: int, last: bool):
"""
Callback of instrument query.
"""
print(data)
def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool): def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool):
""" """
Callback of instrument query. Callback of instrument query.
@ -519,6 +448,9 @@ class DaFutureApi(FutureApi):
gateway_name=self.gateway_name gateway_name=self.gateway_name
) )
symbol_name_map[contract.vt_symbol] = contract.name
symbol_currency_map[contract.symbol] = data["CommodityFCurrencyNo"]
self.gateway.on_contract(contract) self.gateway.on_contract(contract)
if last: if last:
@ -542,17 +474,32 @@ class DaFutureApi(FutureApi):
time=data["OrderTime"], time=data["OrderTime"],
gateway_name=self.gateway_name gateway_name=self.gateway_name
) )
self.gateway.on_order(order)
self.local_no = max(self.local_no, int(data["LocalNo"]))
self.orders[order.orderid] = order
self.order_info[order.orderid] = (data["OrderNo"], data["SystemNo"])
self.gateway.on_order(copy(order))
if last:
self.gateway.write_log("委托信息查询成功")
def onRspQryTrade(self, data: dict, error: dict, reqid: int, last: bool): def onRspQryTrade(self, data: dict, error: dict, reqid: int, last: bool):
""" """
Callback of trade query. Callback of trade query.
""" """
self.update_trade(data)
if last:
self.gateway.write_log("成交信息查询成功")
def update_trade(self, data: dict):
""""""
trade = TradeData( trade = TradeData(
symbol=data["TreatyCode"], symbol=data["TreatyCode"],
exchange=EXCHANGE_DA2VT[data["ExchangeCode"]], exchange=EXCHANGE_DA2VT[data["ExchangeCode"]],
orderid=data["LocalNo"], orderid=data["LocalNo"],
tradeid=data["FilledNo"] tradeid=data["FilledNo"],
direction=DIRECTION_DA2VT[data["BuySale"]], direction=DIRECTION_DA2VT[data["BuySale"]],
offset=OFFSET_DA2VT[data["AddReduce"]], offset=OFFSET_DA2VT[data["AddReduce"]],
price=float(data["FilledPrice"]), price=float(data["FilledPrice"]),
@ -567,70 +514,115 @@ class DaFutureApi(FutureApi):
Callback of trade query. Callback of trade query.
""" """
account = AccountData( account = AccountData(
accountid=data["UserId"], accountid=data["CurrencyNo"],
balance=float(data["TodayBalance"]) balance=float(data["TodayBalance"]),
frozen=float(data["FreezenMoney"]), frozen=float(data["FreezenMoney"]),
gateway_name=self.gateway_name gateway_name=self.gateway_name
) )
currency_account_map[data["CurrencyNo"]] = data["AccountNo"]
account_currency_map[data["AccountNo"]] = data["CurrencyNo"]
self.gateway.on_account(account) self.gateway.on_account(account)
def onRspQryPosition(self, data: dict, error: dict, reqid: int, last: bool): if last:
""" self.gateway.write_log("资金信息查询成功")
Callback of trade query.
"""
position = PositionData(
symbol=data["ContCode"],
exchange=EXCHANGE_DA2VT[data["ExchangeNo"]],
direction=DIRECTION_DA2VT[data["Direct"]],
volume=data["HoldVol"],
price=data["HoldPrice"],
gateway_name=self.gateway_name
)
self.gateway.on_position(position)
def onRtnOrder(self, data: dict): def onRspQryTotalPosition(self, data: dict, error: dict, reqid: int, last: bool):
"""
Callback of position query.
"""
if data["TreatyCode"]:
long_position = PositionData(
symbol=data["TreatyCode"],
exchange=EXCHANGE_DA2VT[data["ExchangeNo"]],
direction=Direction.LONG,
volume=data["BuyHoldNumber"],
price=data["BuyHoldOpenPrice"],
gateway_name=self.gateway_name
)
self.gateway.on_position(long_position)
short_position = PositionData(
symbol=data["TreatyCode"],
exchange=EXCHANGE_DA2VT[data["ExchangeNo"]],
direction=Direction.SHORT,
volume=data["SaleHoldNumber"],
price=data["SaleHoldOpenPrice"],
gateway_name=self.gateway_name
)
self.gateway.on_position(short_position)
if last:
self.gateway.write_log("持仓信息查询成功")
def onRtnOrder(self, data: dict, error: dict, reqid: int, last: bool):
""" """
Callback of order status update. Callback of order status update.
""" """
pass orderid = data["LocalOrderNo"]
self.local_no = max(self.local_no, int(orderid))
def onRtnTrade(self, data: dict): order = self.orders.get(orderid, None)
if not order:
return
order.traded = data["FilledNumber"]
if data["IsCanceled"] == "1":
order.status = Status.CANCELLED
elif order.traded == order.volume:
order.status = Status.ALLTRADED
elif order.traded > 0:
order.status = Status.PARTTRADED
else:
order.status = Status.NOTTRADED
self.gateway.on_order(copy(order))
def onRtnTrade(self, data: dict, error: dict, reqid: int, last: bool):
""" """
Callback of trade status update. Callback of trade status update.
""" """
symbol = data["InstrumentID"] self.update_trade(data)
exchange = symbol_exchange_map.get(symbol, "")
if not exchange: def onRtnCapital(self, data: dict, error: dict, reqid: int, last: bool):
self.trade_data.append(data) """
return Callback of capital status update.
"""
currency = account_currency_map[data["AccountNo"]]
orderid = self.sysid_orderid_map[data["OrderSysID"]] account = AccountData(
accountid=currency,
trade = TradeData( balance=data["TodayTotal"],
symbol=symbol, frozen=data["FrozenDeposit"],
exchange=exchange,
orderid=orderid,
tradeid=data["TradeID"],
direction=DIRECTION_DA2VT[data["Direction"]],
offset=OFFSET_DA2VT[data["OffsetFlag"]],
price=data["Price"],
volume=data["Volume"],
time=data["TradeTime"],
gateway_name=self.gateway_name gateway_name=self.gateway_name
) )
self.gateway.on_trade(trade)
def update_trade(self, data: dict): self.gateway.on_account(account)
""""""
pass
def update_account(self, data: dict): def onRtnPosition(self, data: dict, error: dict, reqid: int, last: bool):
"""""" """
pass Callback of position status update.
"""
long_position = PositionData(
symbol=data["TreatyCode"],
exchange=EXCHANGE_DA2VT[data["ExchangeNo"]],
direction=Direction.LONG,
volume=data["BuyHoldNumber"],
price=data["BuyHoldOpenPrice"],
gateway_name=self.gateway_name
)
self.gateway.on_position(long_position)
def update_position(self, data: dict): short_position = PositionData(
"""""" symbol=data["TreatyCode"],
pass exchange=EXCHANGE_DA2VT[data["ExchangeNo"]],
direction=Direction.SHORT,
volume=data["SaleHoldNumber"],
price=data["SaleHoldOpenPrice"],
gateway_name=self.gateway_name
)
self.gateway.on_position(short_position)
def connect( def connect(
self, self,
@ -689,50 +681,36 @@ class DaFutureApi(FutureApi):
} }
self.reqid += 1 self.reqid += 1
n = self.reqUserLogin(req, self.reqid) self.reqUserLogin(req, self.reqid)
print("login", n)
def send_order(self, req: OrderRequest): def send_order(self, req: OrderRequest):
""" """
Send new order. Send new order.
""" """
self.order_ref += 1 self.local_no += 1
currency = symbol_currency_map[req.symbol]
account_no = currency_account_map[currency]
da_req = { da_req = {
"InstrumentID": req.symbol, "UserId": self.userid,
"ExchangeID": req.exchange.value, "AccountNo": account_no,
"LimitPrice": req.price, "LocalNo": str(self.local_no),
"VolumeTotalOriginal": int(req.volume), "TradePwd": self.password,
"OrderPriceType": ORDERTYPE_VT2DA.get(req.type, ""), "ExchangeCode": EXCHANGE_VT2DA[req.exchange],
"Direction": DIRECTION_VT2DA.get(req.direction, ""), "TreatyCode": req.symbol,
"CombOffsetFlag": OFFSET_VT2DA.get(req.offset, ""), "BuySale": DIRECTION_VT2DA[req.direction],
"OrderRef": str(self.order_ref), "OrderPrice": str(req.price),
"InvestorID": self.userid, "OrderNumber": str(int(req.volume)),
"UserID": self.userid, "PriceType": ORDERTYPE_VT2DA[req.type]
"BrokerID": self.brokerid,
"CombHedgeFlag": THOST_FTDC_HF_Speculation,
"ContingentCondition": THOST_FTDC_CC_Immediately,
"ForceCloseReason": THOST_FTDC_FCC_NotForceClose,
"IsAutoSuspend": 0,
"TimeCondition": THOST_FTDC_TC_GFD,
"VolumeCondition": THOST_FTDC_VC_AV,
"MinVolume": 1
} }
if req.type == OrderType.FAK:
da_req["OrderPriceType"] = THOST_FTDC_OPT_LimitPrice
da_req["TimeCondition"] = THOST_FTDC_TC_IOC
da_req["VolumeCondition"] = THOST_FTDC_VC_AV
elif req.type == OrderType.FOK:
da_req["OrderPriceType"] = THOST_FTDC_OPT_LimitPrice
da_req["TimeCondition"] = THOST_FTDC_TC_IOC
da_req["VolumeCondition"] = THOST_FTDC_VC_CV
self.reqid += 1 self.reqid += 1
self.reqOrderInsert(da_req, self.reqid) self.reqOrderInsert(da_req, self.reqid)
orderid = f"{self.frontid}_{self.sessionid}_{self.order_ref}" order = req.create_order_data(str(self.local_no), self.gateway_name)
order = req.create_order_data(orderid, self.gateway_name)
self.orders[order.orderid] = order
self.gateway.on_order(order) self.gateway.on_order(order)
return order.vt_orderid return order.vt_orderid
@ -741,21 +719,26 @@ class DaFutureApi(FutureApi):
""" """
Cancel existing order. Cancel existing order.
""" """
frontid, sessionid, order_ref = req.orderid.split("_") order = self.orders[req.orderid]
currency = symbol_currency_map[req.symbol]
account_no = currency_account_map[currency]
order_no, system_no = self.order_info[order.orderid]
da_req = { da_req = {
"InstrumentID": req.symbol, "UserId": self.userid,
"ExchangeID": req.exchange.value, "LocalNo": req.orderid,
"OrderRef": order_ref, "AccountNo": account_no,
"FrontID": int(frontid), "TradePwd": self.password,
"SessionID": int(sessionid), "ExchangeCode": EXCHANGE_VT2DA[req.exchange],
"ActionFlag": THOST_FTDC_AF_Delete, "TreatyCode": req.symbol,
"BrokerID": self.brokerid, "BuySale": DIRECTION_VT2DA[order.direction],
"InvestorID": self.userid "OrderNo": order_no,
"SystemNo": system_no
} }
self.reqid += 1 self.reqid += 1
self.reqOrderAction(da_req, self.reqid) self.reqOrderCancel(da_req, self.reqid)
def query_account(self): def query_account(self):
""" """
@ -775,15 +758,19 @@ class DaFutureApi(FutureApi):
""" """
Query account balance data. Query account balance data.
""" """
da_req = {"UserId": self.userid}
self.reqid += 1 self.reqid += 1
self.reqQryTrade({}, self.reqid) self.reqQryTrade(da_req, self.reqid)
def query_position(self): def query_position(self):
""" """
Query position holding data. Query position holding data.
""" """
da_req = {"AccountNo": self.userid}
self.reqid += 1 self.reqid += 1
self.reqQryPosition({}, self.reqid) self.reqQryTotalPosition(da_req, self.reqid)
def query_contract(self, exchange, page=1): def query_contract(self, exchange, page=1):
""" """
@ -827,4 +814,21 @@ def get_mac_address():
if not interface: if not interface:
return "" return ""
return interface.MACAddress return interface.MACAddress
def to_int(data: str) -> int:
""""""
if not data:
return 0
else:
return int(data)
def to_float(data: str) -> float:
""""""
if not data:
return 0.0
else:
return float(data)