diff --git a/vnpy/api/da/vndafuture.pyd b/vnpy/api/da/vndafuture.pyd index 0bae339f..285eee87 100644 Binary files a/vnpy/api/da/vndafuture.pyd and b/vnpy/api/da/vndafuture.pyd differ diff --git a/vnpy/gateway/da/da_gateway.py b/vnpy/gateway/da/da_gateway.py index 978baf1c..ad6d0d00 100644 --- a/vnpy/gateway/da/da_gateway.py +++ b/vnpy/gateway/da/da_gateway.py @@ -2,7 +2,7 @@ """ from datetime import datetime -from threading import Thread +from copy import copy import wmi @@ -38,7 +38,7 @@ from vnpy.trader.event import EVENT_TIMER STATUS_DA2VT = { - "1": Status.SUBMITTING,, + "1": Status.SUBMITTING, "2": Status.NOTTRADED, "3": Status.PARTTRADED, "4": Status.ALLTRADED, @@ -78,7 +78,7 @@ EXCHANGE_DA2VT = { # "LME": Exchange.LME, # "CME_CBT": Exchange.CBOT, # "HKEX": Exchange.HKFE, - # "CME": Exchange.CME, + "CME": Exchange.CME, # "TOCOM": Exchange.TOCOM, # "KRX": Exchange.KRX, # "ICE": Exchange.ICE @@ -96,9 +96,10 @@ PRODUCT_DA2VT = { # } -symbol_exchange_map = {} symbol_name_map = {} -symbol_size_map = {} +symbol_currency_map = {} +currency_account_map = {} +account_currency_map = {} class DaGateway(BaseGateway): @@ -130,7 +131,7 @@ class DaGateway(BaseGateway): future_address = setting["交易服务器"] market_address = setting["行情服务器"] auth_code = setting["授权码"] - + if not future_address.startswith("tcp://"): future_address = "tcp://" + future_address if not market_address.startswith("tcp://"): @@ -224,50 +225,47 @@ class DaMarketApi(MarketApi): """ Callback of tick data update. """ - print('on rsp', data, error) symbol = data["TreatyCode"] exchange = EXCHANGE_DA2VT.get(data["ExchangeCode"], None) if not exchange: return - timestamp = f"{data['TradeDay']} {data['Time']}" - tick = TickData( symbol=symbol, exchange=exchange, - datetime=datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f"), - name=symbol_name_map[symbol], - volume=int(data["FilledNum"]), - open_interest=int(data["HoldNum"]), - limit_up=float(data["LimitUpPrice"]), - limit_down=float(data["LimitDownPrice"]), - last_price=float(data["CurrPrice"]), - open_price=float(data["Open"]), - high_price=float(data["High"]), - low_price=float(data["Low"]), - pre_close=float(data["PreSettlementPrice"]), - bid_price_1=float(data["BuyPrice"]), - bid_price_2=float(data["BuyPrice2"]), - bid_price_3=float(data["BuyPrice3"]), - bid_price_4=float(data["BuyPrice4"]), - bid_price_5=float(data["BuyPrice5"]), - ask_price_1=float(data["SalePrice"]), - ask_price_2=float(data["SalePrice2"]), - ask_price_3=float(data["SalePrice3"]), - ask_price_4=float(data["SalePrice4"]), - ask_price_5=float(data["SalePrice5"]), - bid_volume_1=int(data["BuyNumber"]), - bid_volume_2=int(data["BuyNumber2"]), - bid_volume_3=int(data["BuyNumber3"]), - bid_volume_4=int(data["BuyNumber4"]), - bid_volume_5=int(data["BuyNumber5"]), - ask_volume_1=int(data["SaleNumber"]), - ask_volume_2=int(data["SaleNumber2"]), - ask_volume_3=int(data["SaleNumber3"]), - ask_volume_4=int(data["SaleNumber4"]), - ask_volume_5=int(data["SaleNumber5"]), + datetime=datetime.strptime(data['Time'], "%Y-%m-%d %H:%M:%S"), + volume=to_int(data["FilledNum"]), + open_interest=to_int(data["HoldNum"]), + limit_up=to_float(data["LimitUpPrice"]), + limit_down=to_float(data["LimitDownPrice"]), + last_price=to_float(data["CurrPrice"]), + open_price=to_float(data["Open"]), + high_price=to_float(data["High"]), + low_price=to_float(data["Low"]), + pre_close=to_float(data["PreSettlementPrice"]), + bid_price_1=to_float(data["BuyPrice"]), + bid_price_2=to_float(data["BuyPrice2"]), + bid_price_3=to_float(data["BuyPrice3"]), + bid_price_4=to_float(data["BuyPrice4"]), + bid_price_5=to_float(data["BuyPrice5"]), + ask_price_1=to_float(data["SalePrice"]), + ask_price_2=to_float(data["SalePrice2"]), + ask_price_3=to_float(data["SalePrice3"]), + ask_price_4=to_float(data["SalePrice4"]), + ask_price_5=to_float(data["SalePrice5"]), + bid_volume_1=to_int(data["BuyNumber"]), + bid_volume_2=to_int(data["BuyNumber2"]), + bid_volume_3=to_int(data["BuyNumber3"]), + bid_volume_4=to_int(data["BuyNumber4"]), + bid_volume_5=to_int(data["BuyNumber5"]), + ask_volume_1=to_int(data["SaleNumber"]), + ask_volume_2=to_int(data["SaleNumber2"]), + ask_volume_3=to_int(data["SaleNumber3"]), + ask_volume_4=to_int(data["SaleNumber4"]), + ask_volume_5=to_int(data["SaleNumber5"]), gateway_name=self.gateway_name ) + tick.name = symbol_name_map[tick.vt_symbol] self.gateway.on_tick(tick) def connect(self, address: str, userid: str, password: str, auth_code: str): @@ -328,7 +326,6 @@ class DaMarketApi(MarketApi): } self.reqid += 1 i = self.reqMarketData(da_req, self.reqid) - print(i, da_req) self.subscribed[req.symbol] = req @@ -351,7 +348,7 @@ class DaFutureApi(FutureApi): self.gateway_name = gateway.gateway_name self.reqid = 0 - self.order_ref = 0 + self.local_no = int(datetime.now().strftime("%Y%m%d") + "000000") self.connect_status = False self.login_status = False @@ -363,6 +360,9 @@ class DaFutureApi(FutureApi): self.auth_code = "" self.mac_address = get_mac_address() + self.orders = {} + self.order_info = {} + def onFrontConnected(self): """""" self.gateway.write_log("交易服务器连接成功") @@ -382,6 +382,13 @@ class DaFutureApi(FutureApi): # 查询可交易合约 for exchange in EXCHANGE_DA2VT.values(): self.query_contract(exchange) + + # 查询账户信息 + self.query_account() + self.query_position() + self.query_order() + self.query_trade() + else: self.login_failed = True self.gateway.write_error("交易服务器登录失败", error) @@ -392,30 +399,25 @@ class DaFutureApi(FutureApi): def onRspOrderInsert(self, data: dict, error: dict, reqid: int, last: bool): """""" - order_ref = data["OrderRef"] - orderid = f"{self.frontid}_{self.sessionid}_{order_ref}" + errorid = error["ErrorID"] + orderid = data["LocalNo"] + order = self.orders[orderid] - symbol = data["InstrumentID"] - exchange = symbol_exchange_map[symbol] + if errorid: + order.status = Status.REJECTED + self.gateway.write_error("交易委托失败", error) + else: + order.time = data["OrderTime"] - order = OrderData( - symbol=symbol, - exchange=exchange, - orderid=orderid, - direction=DIRECTION_DA2VT[data["Direction"]], - offset=OFFSET_DA2VT.get(data["CombOffsetFlag"], Offset.NONE), - price=data["LimitPrice"], - volume=data["VolumeTotalOriginal"], - status=Status.REJECTED, - gateway_name=self.gateway_name - ) - self.gateway.on_order(order) + self.order_info[order.orderid] = (data["OrderNo"], data["SystemNo"]) + + self.gateway.on_order(copy(order)) - self.gateway.write_error("交易委托失败", error) - - def onRspOrderAction(self, data: dict, error: dict, reqid: int, last: bool): + def onRspOrderCancel(self, data: dict, error: dict, reqid: int, last: bool): """""" - self.gateway.write_error("交易撤单失败", error) + errorid = error["ErrorID"] + if errorid: + self.gateway.write_error("交易撤单失败", error) def onRspQueryMaxOrderVolume(self, data: dict, error: dict, reqid: int, last: bool): """""" @@ -430,79 +432,6 @@ class DaFutureApi(FutureApi): self.reqid += 1 self.reqQryInstrument({}, self.reqid) - def onRspQryInvestorPosition(self, data: dict, error: dict, reqid: int, last: bool): - """""" - if not data: - return - - # Get buffered position object - key = f"{data['InstrumentID'], data['PosiDirection']}" - position = self.positions.get(key, None) - if not position: - position = PositionData( - symbol=data["InstrumentID"], - exchange=symbol_exchange_map[data["InstrumentID"]], - direction=DIRECTION_DA2VT[data["PosiDirection"]], - gateway_name=self.gateway_name - ) - self.positions[key] = position - - # For SHFE position data update - if position.exchange == Exchange.SHFE: - if data["YdPosition"] and not data["TodayPosition"]: - position.yd_volume = data["Position"] - # For other exchange position data update - else: - position.yd_volume = data["Position"] - data["TodayPosition"] - - # Get contract size (spread contract has no size value) - size = symbol_size_map.get(position.symbol, 0) - - # Calculate previous position cost - cost = position.price * position.volume * size - - # Update new position volume - position.volume += data["Position"] - position.pnl += data["PositionProfit"] - - # Calculate average position price - if position.volume and size: - cost += data["PositionCost"] - position.price = cost / (position.volume * size) - - # Get frozen volume - if position.direction == Direction.LONG: - position.frozen += data["ShortFrozen"] - else: - position.frozen += data["LongFrozen"] - - if last: - for position in self.positions.values(): - self.gateway.on_position(position) - - self.positions.clear() - - def onRspQryTradingAccount(self, data: dict, error: dict, reqid: int, last: bool): - """""" - if "AccountID" not in data: - return - - account = AccountData( - accountid=data["AccountID"], - balance=data["Balance"], - frozen=data["FrozenMargin"] + data["FrozenCash"] + data["FrozenCommission"], - gateway_name=self.gateway_name - ) - account.available = data["Available"] - - self.gateway.on_account(account) - - def onRspQryExchange(self, data: dict, error: dict, reqid: int, last: bool): - """ - Callback of instrument query. - """ - print(data) - def onRspQryInstrument(self, data: dict, error: dict, reqid: int, last: bool): """ Callback of instrument query. @@ -519,6 +448,9 @@ class DaFutureApi(FutureApi): gateway_name=self.gateway_name ) + symbol_name_map[contract.vt_symbol] = contract.name + symbol_currency_map[contract.symbol] = data["CommodityFCurrencyNo"] + self.gateway.on_contract(contract) if last: @@ -542,17 +474,32 @@ class DaFutureApi(FutureApi): time=data["OrderTime"], gateway_name=self.gateway_name ) - self.gateway.on_order(order) + + self.local_no = max(self.local_no, int(data["LocalNo"])) + self.orders[order.orderid] = order + self.order_info[order.orderid] = (data["OrderNo"], data["SystemNo"]) + + self.gateway.on_order(copy(order)) + + if last: + self.gateway.write_log("委托信息查询成功") def onRspQryTrade(self, data: dict, error: dict, reqid: int, last: bool): """ Callback of trade query. """ + self.update_trade(data) + + if last: + self.gateway.write_log("成交信息查询成功") + + def update_trade(self, data: dict): + """""" trade = TradeData( symbol=data["TreatyCode"], exchange=EXCHANGE_DA2VT[data["ExchangeCode"]], orderid=data["LocalNo"], - tradeid=data["FilledNo"] + tradeid=data["FilledNo"], direction=DIRECTION_DA2VT[data["BuySale"]], offset=OFFSET_DA2VT[data["AddReduce"]], price=float(data["FilledPrice"]), @@ -567,70 +514,115 @@ class DaFutureApi(FutureApi): Callback of trade query. """ account = AccountData( - accountid=data["UserId"], - balance=float(data["TodayBalance"]) + accountid=data["CurrencyNo"], + balance=float(data["TodayBalance"]), frozen=float(data["FreezenMoney"]), gateway_name=self.gateway_name ) + + currency_account_map[data["CurrencyNo"]] = data["AccountNo"] + account_currency_map[data["AccountNo"]] = data["CurrencyNo"] self.gateway.on_account(account) - def onRspQryPosition(self, data: dict, error: dict, reqid: int, last: bool): - """ - Callback of trade query. - """ - position = PositionData( - symbol=data["ContCode"], - exchange=EXCHANGE_DA2VT[data["ExchangeNo"]], - direction=DIRECTION_DA2VT[data["Direct"]], - volume=data["HoldVol"], - price=data["HoldPrice"], - gateway_name=self.gateway_name - ) - self.gateway.on_position(position) + if last: + self.gateway.write_log("资金信息查询成功") - def onRtnOrder(self, data: dict): + def onRspQryTotalPosition(self, data: dict, error: dict, reqid: int, last: bool): + """ + Callback of position query. + """ + if data["TreatyCode"]: + long_position = PositionData( + symbol=data["TreatyCode"], + exchange=EXCHANGE_DA2VT[data["ExchangeNo"]], + direction=Direction.LONG, + volume=data["BuyHoldNumber"], + price=data["BuyHoldOpenPrice"], + gateway_name=self.gateway_name + ) + self.gateway.on_position(long_position) + + short_position = PositionData( + symbol=data["TreatyCode"], + exchange=EXCHANGE_DA2VT[data["ExchangeNo"]], + direction=Direction.SHORT, + volume=data["SaleHoldNumber"], + price=data["SaleHoldOpenPrice"], + gateway_name=self.gateway_name + ) + self.gateway.on_position(short_position) + + if last: + self.gateway.write_log("持仓信息查询成功") + + def onRtnOrder(self, data: dict, error: dict, reqid: int, last: bool): """ Callback of order status update. """ - pass + orderid = data["LocalOrderNo"] + self.local_no = max(self.local_no, int(orderid)) - def onRtnTrade(self, data: dict): + order = self.orders.get(orderid, None) + if not order: + return + + order.traded = data["FilledNumber"] + + if data["IsCanceled"] == "1": + order.status = Status.CANCELLED + elif order.traded == order.volume: + order.status = Status.ALLTRADED + elif order.traded > 0: + order.status = Status.PARTTRADED + else: + order.status = Status.NOTTRADED + + self.gateway.on_order(copy(order)) + + def onRtnTrade(self, data: dict, error: dict, reqid: int, last: bool): """ Callback of trade status update. """ - symbol = data["InstrumentID"] - exchange = symbol_exchange_map.get(symbol, "") - if not exchange: - self.trade_data.append(data) - return + self.update_trade(data) + + def onRtnCapital(self, data: dict, error: dict, reqid: int, last: bool): + """ + Callback of capital status update. + """ + currency = account_currency_map[data["AccountNo"]] - orderid = self.sysid_orderid_map[data["OrderSysID"]] - - trade = TradeData( - symbol=symbol, - exchange=exchange, - orderid=orderid, - tradeid=data["TradeID"], - direction=DIRECTION_DA2VT[data["Direction"]], - offset=OFFSET_DA2VT[data["OffsetFlag"]], - price=data["Price"], - volume=data["Volume"], - time=data["TradeTime"], + account = AccountData( + accountid=currency, + balance=data["TodayTotal"], + frozen=data["FrozenDeposit"], gateway_name=self.gateway_name ) - self.gateway.on_trade(trade) - def update_trade(self, data: dict): - """""" - pass + self.gateway.on_account(account) - def update_account(self, data: dict): - """""" - pass + def onRtnPosition(self, data: dict, error: dict, reqid: int, last: bool): + """ + Callback of position status update. + """ + long_position = PositionData( + symbol=data["TreatyCode"], + exchange=EXCHANGE_DA2VT[data["ExchangeNo"]], + direction=Direction.LONG, + volume=data["BuyHoldNumber"], + price=data["BuyHoldOpenPrice"], + gateway_name=self.gateway_name + ) + self.gateway.on_position(long_position) - def update_position(self, data: dict): - """""" - pass + short_position = PositionData( + symbol=data["TreatyCode"], + exchange=EXCHANGE_DA2VT[data["ExchangeNo"]], + direction=Direction.SHORT, + volume=data["SaleHoldNumber"], + price=data["SaleHoldOpenPrice"], + gateway_name=self.gateway_name + ) + self.gateway.on_position(short_position) def connect( self, @@ -689,50 +681,36 @@ class DaFutureApi(FutureApi): } self.reqid += 1 - n = self.reqUserLogin(req, self.reqid) - print("login", n) + self.reqUserLogin(req, self.reqid) def send_order(self, req: OrderRequest): """ Send new order. """ - self.order_ref += 1 + self.local_no += 1 + currency = symbol_currency_map[req.symbol] + account_no = currency_account_map[currency] + da_req = { - "InstrumentID": req.symbol, - "ExchangeID": req.exchange.value, - "LimitPrice": req.price, - "VolumeTotalOriginal": int(req.volume), - "OrderPriceType": ORDERTYPE_VT2DA.get(req.type, ""), - "Direction": DIRECTION_VT2DA.get(req.direction, ""), - "CombOffsetFlag": OFFSET_VT2DA.get(req.offset, ""), - "OrderRef": str(self.order_ref), - "InvestorID": self.userid, - "UserID": self.userid, - "BrokerID": self.brokerid, - "CombHedgeFlag": THOST_FTDC_HF_Speculation, - "ContingentCondition": THOST_FTDC_CC_Immediately, - "ForceCloseReason": THOST_FTDC_FCC_NotForceClose, - "IsAutoSuspend": 0, - "TimeCondition": THOST_FTDC_TC_GFD, - "VolumeCondition": THOST_FTDC_VC_AV, - "MinVolume": 1 + "UserId": self.userid, + "AccountNo": account_no, + "LocalNo": str(self.local_no), + "TradePwd": self.password, + "ExchangeCode": EXCHANGE_VT2DA[req.exchange], + "TreatyCode": req.symbol, + "BuySale": DIRECTION_VT2DA[req.direction], + "OrderPrice": str(req.price), + "OrderNumber": str(int(req.volume)), + "PriceType": ORDERTYPE_VT2DA[req.type] } - if req.type == OrderType.FAK: - da_req["OrderPriceType"] = THOST_FTDC_OPT_LimitPrice - da_req["TimeCondition"] = THOST_FTDC_TC_IOC - da_req["VolumeCondition"] = THOST_FTDC_VC_AV - elif req.type == OrderType.FOK: - da_req["OrderPriceType"] = THOST_FTDC_OPT_LimitPrice - da_req["TimeCondition"] = THOST_FTDC_TC_IOC - da_req["VolumeCondition"] = THOST_FTDC_VC_CV - self.reqid += 1 self.reqOrderInsert(da_req, self.reqid) - orderid = f"{self.frontid}_{self.sessionid}_{self.order_ref}" - order = req.create_order_data(orderid, self.gateway_name) + order = req.create_order_data(str(self.local_no), self.gateway_name) + + self.orders[order.orderid] = order self.gateway.on_order(order) return order.vt_orderid @@ -741,21 +719,26 @@ class DaFutureApi(FutureApi): """ Cancel existing order. """ - frontid, sessionid, order_ref = req.orderid.split("_") + order = self.orders[req.orderid] + + currency = symbol_currency_map[req.symbol] + account_no = currency_account_map[currency] + order_no, system_no = self.order_info[order.orderid] da_req = { - "InstrumentID": req.symbol, - "ExchangeID": req.exchange.value, - "OrderRef": order_ref, - "FrontID": int(frontid), - "SessionID": int(sessionid), - "ActionFlag": THOST_FTDC_AF_Delete, - "BrokerID": self.brokerid, - "InvestorID": self.userid + "UserId": self.userid, + "LocalNo": req.orderid, + "AccountNo": account_no, + "TradePwd": self.password, + "ExchangeCode": EXCHANGE_VT2DA[req.exchange], + "TreatyCode": req.symbol, + "BuySale": DIRECTION_VT2DA[order.direction], + "OrderNo": order_no, + "SystemNo": system_no } self.reqid += 1 - self.reqOrderAction(da_req, self.reqid) + self.reqOrderCancel(da_req, self.reqid) def query_account(self): """ @@ -775,15 +758,19 @@ class DaFutureApi(FutureApi): """ Query account balance data. """ + da_req = {"UserId": self.userid} + self.reqid += 1 - self.reqQryTrade({}, self.reqid) + self.reqQryTrade(da_req, self.reqid) def query_position(self): """ Query position holding data. """ + da_req = {"AccountNo": self.userid} + self.reqid += 1 - self.reqQryPosition({}, self.reqid) + self.reqQryTotalPosition(da_req, self.reqid) def query_contract(self, exchange, page=1): """ @@ -827,4 +814,21 @@ def get_mac_address(): if not interface: return "" - return interface.MACAddress \ No newline at end of file + return interface.MACAddress + + +def to_int(data: str) -> int: + """""" + if not data: + return 0 + else: + return int(data) + + +def to_float(data: str) -> float: + """""" + if not data: + return 0.0 + else: + return float(data) + \ No newline at end of file