From 338db92c03cba2bc0f03be80580dbe14a9ee59d1 Mon Sep 17 00:00:00 2001 From: msincenselee Date: Thu, 16 Jul 2020 20:41:07 +0800 Subject: [PATCH] [bug fix] --- vnpy/app/account_recorder/README.md | 10 + vnpy/app/algo_trading/template.py | 12 +- vnpy/app/cta_stock/engine.py | 1 - vnpy/app/cta_strategy_pro/template.py | 10 +- vnpy/gateway/ctp/ctp_gateway.py | 2 +- vnpy/gateway/pb/pb_gateway.py | 999 ++++++++++++++++++++++++-- 6 files changed, 957 insertions(+), 77 deletions(-) diff --git a/vnpy/app/account_recorder/README.md b/vnpy/app/account_recorder/README.md index ef5254a2..5de99508 100644 --- a/vnpy/app/account_recorder/README.md +++ b/vnpy/app/account_recorder/README.md @@ -31,3 +31,13 @@ } } } + +# 创建mongodb 索引,提高性能 + +db.today_orders.createIndex({'account_id':1,'vt_symbol':1,'sys_orderid':1,'order_date':1,'holder_id':1},{'name':'accountid_vtsymbol_sysorderid_order_date_holder_id','unique':true}) +db.history_orders.createIndex({'account_id':1,'vt_symbol':1,'sys_orderid':1,'order_date':1,'holder_id':1},{'name':'history_accountid_vtsymbol_sysorderid_order_date_holder_id'}) +db.today_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtSymbol_vt_tradeid_trade_date_holder_id','unique':true}) +db.history_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtSymbol_vt_tradeid_trade_date_holder_id'}) +db.today_positions.createIndex({'account_id':1,'vt_symbol':1,'direction':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtsymbol_direction_trade_date_holder_id'}) +db.today_strategy_pos.createIndex({'account_id':1,'strategy_group':1,'strategy_name':1,'date':1},{'name':'accountid_strategy_group_strategy_name_date'}) +db.strategy_snapshot.createIndex({'account_id':1,'strategy_group':1,'strategy':1,'guid':1},{'name':'accountid_strategy_name_guid'}) diff --git a/vnpy/app/algo_trading/template.py b/vnpy/app/algo_trading/template.py index 31f30a86..65604a9b 100644 --- a/vnpy/app/algo_trading/template.py +++ b/vnpy/app/algo_trading/template.py @@ -117,7 +117,11 @@ class AlgoTemplate: offset: Offset = Offset.NONE ): """""" - msg = f"委托买入{vt_symbol}:{volume}@{price}" + if offset in [Offset.CLOSE]: + msg = f"委托买平{vt_symbol}:{volume}@{price}" + else: + msg = f"委托买入{vt_symbol}:{volume}@{price}" + self.write_log(msg) return self.algo_engine.send_order( @@ -139,7 +143,11 @@ class AlgoTemplate: offset: Offset = Offset.NONE ): """""" - msg = f"委托卖出{vt_symbol}:{volume}@{price}" + if offset in [Offset.NONE, Offset.CLOSE]: + msg = f"委托卖出{vt_symbol}:{volume}@{price}" + else: + msg = f"委托开空{vt_symbol}:{volume}@{price}" + self.write_log(msg) return self.algo_engine.send_order( diff --git a/vnpy/app/cta_stock/engine.py b/vnpy/app/cta_stock/engine.py index 962a9a8d..e876a9e6 100644 --- a/vnpy/app/cta_stock/engine.py +++ b/vnpy/app/cta_stock/engine.py @@ -234,7 +234,6 @@ class CtaEngine(BaseEngine): # 比对仓位,使用上述获取得持仓信息,不用重复获取 self.compare_pos(strategy_pos_list=copy(all_strategy_pos)) - # 推送到事件 self.put_all_strategy_pos_event(all_strategy_pos) diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py index 71289af8..19e14052 100644 --- a/vnpy/app/cta_strategy_pro/template.py +++ b/vnpy/app/cta_strategy_pro/template.py @@ -804,8 +804,10 @@ class CtaProTemplate(CtaTemplate): self.display_grids() if not self.backtesting: - pos_symbols.add(self.vt_symbol) - pos_symbols.add(self.idx_symbol) + if self.vt_symbol not in pos_symbols: + pos_symbols.add(self.vt_symbol) + if self.idx_symbol not in pos_symbols: + pos_symbols.add(self.idx_symbol) # 如果持仓的合约,不在self.vt_symbol中,需要订阅 for symbol in list(pos_symbols): self.write_log(f'新增订阅合约:{symbol}') @@ -1807,7 +1809,7 @@ class CtaProFutureTemplate(CtaProTemplate): # 当前没有昨仓,采用锁仓处理 else: - self.write_log(u'昨仓多单:{}不满足条件,创建对锁仓'.format(grid_pos.longYd)) + self.write_log(u'昨仓多单:{}不满足条件,创建对锁仓'.format(grid_pos.long_yd)) dist_record = dict() dist_record['datetime'] = self.cur_datetime dist_record['symbol'] = sell_symbol @@ -1907,7 +1909,7 @@ class CtaProFutureTemplate(CtaProTemplate): # 当前没有昨仓,采用锁仓处理 else: - self.write_log(u'昨仓空单:{}不满足条件,建立对锁仓'.format(grid_pos.shortYd)) + self.write_log(u'昨仓空单:{}不满足条件,建立对锁仓'.format(grid_pos.short_yd)) dist_record = dict() dist_record['datetime'] = self.cur_datetime dist_record['symbol'] = cover_symbol diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index 7e937093..ad251513 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -1875,7 +1875,7 @@ class TickCombiner(object): ratio_tick.date = tick.date ratio_tick.time = tick.time - # 比率tick + # 比率tick = (腿1 * 腿1 手数 / 腿2价格 * 腿2手数) 百分比 ratio_tick.ask_price_1 = 100 * self.last_leg1_tick.ask_price_1 * self.leg1_ratio \ / (self.last_leg2_tick.bid_price_1 * self.leg2_ratio) # noqa ratio_tick.ask_price_1 = round_to( diff --git a/vnpy/gateway/pb/pb_gateway.py b/vnpy/gateway/pb/pb_gateway.py index 9b60e42e..e0ec75e6 100644 --- a/vnpy/gateway/pb/pb_gateway.py +++ b/vnpy/gateway/pb/pb_gateway.py @@ -1,11 +1,15 @@ # 恒投交易客户端 文件接口 +# 1. 支持csv/dbf文件的读写 +# 2. 采用tdx作为行情数据源 # 华富资产 李来佳 28888502 import os import sys import copy import csv +import dbf import traceback +import pandas as pd from typing import Any, Dict, List from datetime import datetime, timedelta from time import sleep @@ -36,6 +40,7 @@ from vnpy.trader.object import ( AccountData ) from vnpy.trader.utility import get_folder_path, print_dict, extract_vt_symbol, get_stock_exchange, append_data +from vnpy.data.tdx.tdx_common import get_stock_type_sz, get_stock_type_sh # 代码 <=> 中文名称 symbol_name_map: Dict[str, str] = {} @@ -87,6 +92,58 @@ CANCEL_ORDER_FIELDS = OrderedDict({ "BYZD": "C2", # 备用字段 "BYZD2": "C16", # 备用字段2 }) + +# 通用接口_委托查询 +UPDATE_ORDER_FIELDS = OrderedDict({ + "WTRQ": "N8", # 委托日期 + "WTSJ": "N6", # 委托时间 + "WTXH": "N8", # 委托序号 + "WBZDYXH": "N9", # 第三方系统自定义号 + "CPBH": "C32", # 产品(账户)编号 + "ZCDYBH": "C16", # 资产单元编号 + "ZHBH": "C16", # 组合编号 + "GDDM": "C20", # 股东代码 + "JYSC": "C3", # 交易市场 + "ZQDM": "C16", # 证券代码 + "WTFX": "C4", # 委托方向 + "WTJGLX": "C1", # 委托价格类型 + "WTJG": "N11.4", # 委托价格 + "WTSL": "N12", # 委托数量 + "YMDJJE": "N16.2", # 预买冻结金额 + "YMSRJE": "N16.2", # 预卖收入金额 + "WTZT": "C1", # 委托状态 + "WTCCSL": "N12", # 委托撤成数量 + "FDYY": "C254", # 废单原因 + "JYSSBBH": "C64", # 交易所申报编号 + "CLBZ": "C1", # 处理标志 + "BYZD": "C2", # 备用字段 + "WTJE": "N16.2", # 委托金额 + "TSBS": "C64", # 特殊标识 +}) + +# 通用接口_成交查询 +UPDATE_TRADE_FIELDS = OrderedDict({ + "CJRQ": "N8", # 成交日期 + "CJBH": "C64", # 成交序号 + "WTXH": "N8", # 委托序号 + "WBZDYXH": "N9", # 第三方系统自定义号 + "CPBH": "C32", # 产品(账户)编号 + "ZCDYBH": "C16", # 资产单元编号 + "ZHBH": "C16", # 组合编号 + "GDDM": "C20", # 股东代码 + "JYSC": "C3", # 交易市场 + "ZQDM": "C16", # 证券代码 + "WTFX": "C4", # 委托方向 + "CJSL": "N16", # 成交数量 + "CJJG": "N11.4", # 成交价格 + "CJJE": "N16.2", # 成交金额 + "ZFY": "N16.2", # 总费用 + "CJSJ": "N6", # 成交时间 + "CLBZ": "C1", # 处理标志 + "BYZD": "C2", # 备用字段 + "TSBS": "C64", # 特殊标识 + "JYSCJBH": "C64", # 成交编号 +}) # 交易所id <=> Exchange EXCHANGE_PB2VT: Dict[str, Exchange] = { "1": Exchange.SSE, @@ -115,8 +172,19 @@ DIRECTION_STOCK_PB2VT: Dict[str, Any] = { DIRECTION_STOCK_VT2PB: Dict[Any, str] = {v: k for k, v in DIRECTION_STOCK_PB2VT.items()} DIRECTION_STOCK_NAME2VT: Dict[str, Any] = { "卖出": Direction.SHORT, - "买入": Direction.LONG + "买入": Direction.LONG, + "债券买入": Direction.LONG, + "债券卖出": Direction.SHORT, + "申购": Direction.LONG } + +DIRECTION_ORDER_PB2VT: Dict[str, Any] = { + "1": Direction.LONG, + "2": Direction.SHORT, + "3": Direction.LONG, + "4": Direction.SHORT +} + # 持仓方向 <=> Direction POSITION_DIRECTION_PB2VT = { "1": Direction.LONG, @@ -207,6 +275,22 @@ STATUS_NAME2VT: Dict[str, Status] = { "未审批即撤销": Status.UNKNOWN, } +STATUS_PB2VT: Dict[str, Status] = { + "1": Status.SUBMITTING, + "2": Status.SUBMITTING, + "3": Status.SUBMITTING, + "4": Status.NOTTRADED, + "5": Status.REJECTED, + "6": Status.PARTTRADED, + "7": Status.ALLTRADED, + "8": Status.CANCELLED, + "9": Status.CANCELLED, + "a": Status.CANCELLING, + "b": Status.UNKNOWN, + "c": Status.UNKNOWN, + "d": Status.UNKNOWN, +} + STOCK_CONFIG_FILE = 'tdx_stock_config.pkb2' from pytdx.hq import TdxHq_API # 通达信股票行情 @@ -222,7 +306,10 @@ class PbGateway(BaseGateway): "产品编号": "", "单元编号": "", "股东代码_沪": "", - "股东代码_深": "" + "股东代码_深": "", + "文件格式": "dbf", + "导出子目录": "数据导出", + "pb版本": "2018" } # 接口支持得交易所清单 @@ -239,6 +326,9 @@ class PbGateway(BaseGateway): self.tdx_connected = False # 通达信行情API得连接状态 + self.file_type = 'dbf' + self.pb_version = '2018' + def connect(self, setting: dict) -> None: """""" userid = setting["资金账号"] @@ -249,8 +339,19 @@ class PbGateway(BaseGateway): Exchange.SSE: setting["股东代码_沪"], Exchange.SZSE: setting["股东代码_深"] } + self.file_type = setting.get('文件格式', 'dbf') + self.pb_version = setting.get('pb版本', '2018') + + # 2019版,导出目录,自动增加一个‘数据导出’的子文件夹 + # 2018版,导出目录,无自动增加的子目录 + export_sub_folder = setting.get('导出子目录', '数据导出') + if len(export_sub_folder) > 0: + # 2019款 + export_folder = os.path.abspath(os.path.join(csv_folder, export_sub_folder)) + else: + # 2018款 + export_folder = csv_folder - export_folder = os.path.abspath(os.path.join(csv_folder, "数据导出")) self.md_api.connect() self.td_api.connect(user_id=userid, order_folder=csv_folder, @@ -305,17 +406,17 @@ class PbGateway(BaseGateway): def init_query(self) -> None: """""" self.count = 0 - self.query_functions = [self.query_account, self.query_position, self.query_orders, self.query_trades] + self.query_functions = [self.query_account, self.query_position] self.event_engine.register(EVENT_TIMER, self.process_timer_event) class PbMdApi(object): - def __init__(self, gateway: BaseGateway): + def __init__(self, gateway: PbGateway): """""" super().__init__() - self.gateway: BaseGateway = gateway + self.gateway: PbGateway = gateway self.gateway_name: str = gateway.gateway_name self.connect_status: bool = False @@ -337,10 +438,6 @@ class PbMdApi(object): self.commission_dict = {} self.contract_dict = {} - if len(self.symbol_dict) == 0: # or self.cache_time < datetime.now() - timedelta(days=1): - # self.cache_config() - self.gateway.write_error(f'本地没有股票信息的缓存配置文件') - # self.queue = Queue() # 请求队列 self.pool = None # 线程池 # self.req_thread = None # 定时器线程 @@ -427,6 +524,11 @@ class PbMdApi(object): self.gateway.write_log(u'开始通达信行情服务器') + if len(self.symbol_dict) == 0: + self.gateway.write_error(f'本地没有股票信息的缓存配置文件') + else: + self.cov_contracts() + # 选取最佳服务器 if self.best_ip['ip'] is None and self.best_ip['port'] is None: self.best_ip = self.select_best_ip() @@ -448,6 +550,9 @@ class PbMdApi(object): self.connection_status = True self.security_count = c + # if len(symbol_name_map) == 0: + # self.get_stock_list() + except Exception as ex: self.gateway.write_error(u'连接服务器tdx[{}]异常:{},{}'.format(i, str(ex), traceback.format_exc())) return @@ -559,6 +664,112 @@ class PbMdApi(object): # df = pd.DataFrame(all_contacts) # df.to_csv(export_file) + def cov_contracts(self): + """转换本地缓存=》合约信息推送""" + for symbol_marketid, info in self.symbol_dict.items(): + symbol, market_id = symbol_marketid.split('_') + exchange = info.get('exchange', '') + if len(exchange) == 0: + continue + exchange = Exchange(exchange) + if info['stock_type'] == 'stock_cn': + product = Product.EQUITY + elif info['stock_type'] in ['bond_cn', 'cb_cn']: + product = Product.BOND + elif info['stock_type'] == 'index_cn': + product = Product.INDEX + elif info['stock_type'] == 'etf_cn': + product = Product.ETF + else: + product = Product.EQUITY + + volume_tick = info['volunit'] + if symbol.startswith('688'): + volume_tick = 200 + + contract = ContractData( + gateway_name=self.gateway_name, + symbol=symbol, + exchange=exchange, + name=info['name'], + product=product, + pricetick=round(0.1 ** info['decimal_point'], info['decimal_point']), + size=1, + min_volume=volume_tick, + margin_rate=1 + ) + # 缓存 合约 =》 中文名 + symbol_name_map.update({contract.symbol: contract.name}) + + # 缓存代码和交易所的印射关系 + symbol_exchange_map[contract.symbol] = contract.exchange + + self.contract_dict.update({contract.symbol: contract}) + self.contract_dict.update({contract.vt_symbol: contract}) + # 推送 + self.gateway.on_contract(contract) + + def get_stock_list(self): + """股票所有的code&name列表""" + + api = self.api_dict.get(0) + if api is None: + self.gateway.write_log(u'取不到api连接,更新合约信息失败') + return None + + self.gateway.write_log(f'查询所有的股票信息') + + data = pd.concat( + [pd.concat([api.to_df(api.get_security_list(j, i * 1000)).assign(sse='sz' if j == 0 else 'sh').set_index( + ['code', 'sse'], drop=False) for i in range(int(api.get_security_count(j) / 1000) + 1)], axis=0) for j + in range(2)], axis=0) + sz = data.query('sse=="sz"') + sh = data.query('sse=="sh"') + sz = sz.assign(sec=sz.code.apply(get_stock_type_sz)) + sh = sh.assign(sec=sh.code.apply(get_stock_type_sh)) + + temp_df = pd.concat([sz, sh]).query('sec in ["stock_cn","etf_cn","bond_cn","cb_cn"]').sort_index().assign( + name=data['name'].apply(lambda x: str(x)[0:6])) + hq_codelist = temp_df.loc[:, ['code', 'name']].set_index(['code'], drop=False) + + for i in range(0, len(temp_df)): + row = temp_df.iloc[i] + if row['sec'] == 'etf_cn': + product = Product.ETF + elif row['sec'] in ['bond_cn', 'cb_cn']: + product = Product.BOND + else: + product = Product.EQUITY + + volume_tick = 100 if product != Product.BOND else 10 + if row['code'].startswith('688'): + volume_tick = 200 + + contract = ContractData( + gateway_name=self.gateway_name, + symbol=row['code'], + exchange=Exchange.SSE if row['sse'] == 'sh' else Exchange.SZSE, + name=row['name'], + product=product, + pricetick=round(0.1 ** row['decimal_point'], row['decimal_point']), + size=1, + min_volume=volume_tick, + margin_rate=1 + + ) + # 缓存 合约 =》 中文名 + symbol_name_map.update({contract.symbol: contract.name}) + + # 缓存代码和交易所的印射关系 + symbol_exchange_map[contract.symbol] = contract.exchange + + self.contract_dict.update({contract.symbol: contract}) + self.contract_dict.update({contract.vt_symbol: contract}) + # 推送 + self.gateway.on_contract(contract) + + return hq_codelist + def run(self, i): """ 版本1:Pool内得线程,持续运行,每个线程从queue中获取一个请求并处理 @@ -699,7 +910,7 @@ class PbMdApi(object): if d.get('bid5'): tick.bid_price_2 = round(d.get('bid2') / (10 ** (decimal_point - 2)), decimal_point) tick.bid_volume_2 = d.get('bid_vol2') - tick.askPrice2 = round(d.get('ask2') / (10 ** (decimal_point - 2)), decimal_point) + tick.ask_price_2 = round(d.get('ask2') / (10 ** (decimal_point - 2)), decimal_point) tick.ask_volume_2 = d.get('ask_vol2') tick.bid_price_3 = round(d.get('bid3') / (10 ** (decimal_point - 2)), decimal_point) @@ -785,11 +996,11 @@ class PbMdApi(object): class PbTdApi(object): - def __init__(self, gateway: BaseGateway): + def __init__(self, gateway: PbGateway): """""" super().__init__() self._active = False - self.gateway: BaseGateway = gateway + self.gateway: PbGateway = gateway self.gateway_name: str = gateway.gateway_name self.userid: str = "" # 资金账号 @@ -809,6 +1020,8 @@ class PbTdApi(object): # 所有交易 self.trades = {} # tradeid: trade + # 本gateway以外的委托 + self.orders = {} # sys_orderid: order # 未获取本地更新检查的orderid清单 self.unchecked_orderids = [] @@ -831,28 +1044,48 @@ class PbTdApi(object): if os.path.exists(self.account_folder): self.login_status = True + # 仅查询一次 + self.query_trades() + # 仅全局查询一次 + self.query_orders() + + # 首次连接时,优先全部撤单 + self.cancel_all() + + if self.gateway.file_type == 'dbf': + self.gateway.query_functions.append(self.query_update_trades_dbf) + self.gateway.query_functions.append(self.query_update_orders_dbf) + def get_data(self, file_path, field_names=None): """获取文件内容""" if not os.path.exists(file_path): return None results = [] - with open(file=file_path, mode='r', encoding='gbk', ) as f: - reader = csv.DictReader(f=f, fieldnames=field_names, delimiter=",") - for row in reader: - results.append(row) - + try: + with open(file=file_path, mode='r', encoding='gbk', ) as f: + reader = csv.DictReader(f=f, fieldnames=field_names, delimiter=",") + for row in reader: + results.append(row) + except Exception as ex: + self.gateway.write_error(f'读取csv文件数据异常:{str(ex)}') return results def query_account(self): """获取资金账号信息""" - - # 账号的文件 - accounts_csv = os.path.abspath(os.path.join(self.account_folder, - self.trading_date, - '{}{}.csv'.format( - PB_FILE_NAMES.get('accounts'), - self.trading_date))) + if self.gateway.pb_version == '2018': + # 账号的文件 + accounts_csv = os.path.abspath(os.path.join(self.account_folder, + '{}{}.csv'.format( + PB_FILE_NAMES.get('accounts'), + self.trading_date))) + else: + # 账号的文件 + accounts_csv = os.path.abspath(os.path.join(self.account_folder, + self.trading_date, + '{}{}.csv'.format( + PB_FILE_NAMES.get('accounts'), + self.trading_date))) # csv => 所有账号资金清单 account_list = self.get_data(accounts_csv) if not account_list: @@ -864,8 +1097,8 @@ class PbTdApi(object): account = AccountData( gateway_name=self.gateway_name, accountid=self.userid, - balance=float(data["产品净值"]), - frozen=float(data["产品净值"]) - float(data["可用余额"]), + balance=float(data["单元净值"]), + frozen=float(data["单元净值"]) - float(data["可用余额"]), currency="人民币", trading_day=self.trading_day ) @@ -873,13 +1106,19 @@ class PbTdApi(object): def query_position(self): """获取持仓信息""" - - # 持仓的文件 - positions_csv = os.path.abspath(os.path.join(self.account_folder, - self.trading_date, - '{}{}.csv'.format( - PB_FILE_NAMES.get('positions'), - self.trading_date))) + if self.gateway.pb_version == '2018': + # 持仓的文件 + positions_csv = os.path.abspath(os.path.join(self.account_folder, + '{}{}.csv'.format( + PB_FILE_NAMES.get('positions'), + self.trading_date))) + else: + # 持仓的文件 + positions_csv = os.path.abspath(os.path.join(self.account_folder, + self.trading_date, + '{}{}.csv'.format( + PB_FILE_NAMES.get('positions'), + self.trading_date))) # csv => 所有持仓清单 position_list = self.get_data(positions_csv) if not position_list: @@ -922,11 +1161,17 @@ class PbTdApi(object): def query_orders(self): """获取所有委托""" # 所有委托的文件 - orders_csv = os.path.abspath(os.path.join(self.account_folder, - self.trading_date, - '{}{}.csv'.format( - PB_FILE_NAMES.get('orders'), - self.trading_date))) + if self.gateway.pb_version == '2018': + orders_csv = os.path.abspath(os.path.join(self.account_folder, + '{}{}.csv'.format( + PB_FILE_NAMES.get('orders'), + self.trading_date))) + else: + orders_csv = os.path.abspath(os.path.join(self.account_folder, + self.trading_date, + '{}{}.csv'.format( + PB_FILE_NAMES.get('orders'), + self.trading_date))) # csv => 所有委托记录 order_list = self.get_data(orders_csv) @@ -944,18 +1189,32 @@ class PbTdApi(object): order_date = data["委托日期"] order_time = data["委托时间"] order_status = STATUS_NAME2VT.get(data["委托状态"]) - if order is None: - local_orderid = self.gateway.order_manager.get_local_orderid(sys_orderid) + + # 检查是否存在本地orders缓存中(系统级别的委托单) + sys_order = self.orders.get(sys_orderid, None) + + if order is not None: + continue + # 委托单不存在本地映射库,说明是其他地方下的单子,不是通过本接口下单 + if sys_order is None: + + # 不处理以下状态 + if order_status in [Status.SUBMITTING, Status.REJECTED, Status.CANCELLED, Status.CANCELLING]: + continue + order_dt = datetime.strptime(f'{order_date} {order_time}', "%Y%m%d %H%M%S") - order = OrderData( + direction = DIRECTION_STOCK_NAME2VT.get(data["委托方向"]) + if direction is None: + direction = Direction.NET + sys_order = OrderData( gateway_name=self.gateway_name, symbol=data["证券代码"], exchange=EXCHANGE_NAME2VT.get(data["交易市场"]), - orderid=local_orderid, + orderid=sys_orderid, sys_orderid=sys_orderid, accountid=self.userid, type=ORDERTYPE_NAME2VT.get(data["价格类型"], OrderType.LIMIT), - direction=DIRECTION_STOCK_NAME2VT.get(data["委托方向"]), + direction=direction, offset=Offset.NONE, price=float(data["委托价格"]), volume=float(data["委托数量"]), @@ -964,23 +1223,193 @@ class PbTdApi(object): datetime=order_dt, time=order_dt.strftime('%H:%M:%S') ) - self.gateway.order_manager.on_order(order) + # 直接发出订单更新事件 + self.gateway.write_log(f'账号订单查询,新增:{sys_order.__dict__}') + self.orders.update({sys_order.sys_orderid: sys_order}) + self.gateway.on_order(sys_order) continue + + # 存在账号缓存,判断状态是否更新 else: - if order.status != order_status or order.traded != float(data["成交数量"]): - order.traded = float(data["成交数量"]) + # 暂不处理,交给XHPT_WTCX模块处理 + if sys_order.status != order_status or sys_order.traded != float(data["成交数量"]): + sys_order.traded = float(data["成交数量"]) + sys_order.status = order_status + self.orders.update({sys_order.sys_orderid: sys_order}) + self.gateway.write_log(f'账号订单查询,更新:{sys_order.__dict__}') + self.gateway.on_order(sys_order) + continue + + def query_update_orders_dbf(self): + """扫描批量下单的委托查询(dbf文件格式)""" + # XHPT_WTCX委托的dbf文件 + orders_dbf = os.path.abspath(os.path.join(self.order_folder, + '{}{}.dbf'.format( + PB_FILE_NAMES.get('update_orders'), + self.trading_date))) + # dbf => 所有委托记录 + try: + # dbf => 所有成交记录 + self.gateway.write_log(f'扫描所有委托查询:{orders_dbf}') + table = dbf.Table(orders_dbf, codepage='cp936') + table.open(dbf.READ_ONLY) + + for data in table: + # 第三方系统自定义号 + local_orderid = str(data.wbzdyxh) + if len(local_orderid) == 0: + self.gateway.write_log(f'获取不到本地委托号:{print_dict(data.__dict__)}') + continue + # 如果不足8位,自动补充0 + if len(local_orderid) < 8: + local_orderid = local_orderid.rjust(8, '0') + + # 委托状态=> + order_status = STATUS_PB2VT.get(str(data.wtzt)) + # 恒生平台返回的委托序号 + sys_orderid = str(data.wtxh) + if len(sys_orderid) == 0: + self.gateway.write_log(f'获取不到恒生平台的委托序号:{print_dict(data.__dict__)}') + continue + + # 通过本地委托编号,检查是否存在本地订单列表中 + order = self.gateway.order_manager.get_order_with_local_orderid(local_orderid) + if order is None: + self.gateway.write_log(f'本地委托编号{local_orderid}不在本地订单中') + + direction = DIRECTION_STOCK_NAME2VT.get(str(data.wtfx).strip()) + if direction is None: + direction = Direction.NET + if order_status == Status.ALLTRADED: + traded = data.wtsl + else: + traded = 0 + order_dt = datetime.strptime(f'{data.wtrq} {data.wtsj}', "%Y%m%d %H%M%S") + exchange = EXCHANGE_PB2VT.get(str(data.jysc).strip()) + new_order = OrderData( + gateway_name=self.gateway_name, + symbol=str(data.zqdm).strip(), + exchange=exchange, + orderid=local_orderid, + sys_orderid=sys_orderid, + accountid=self.userid, + type=ORDERTYPE_PB2VT.get(str(data.wtjglx).strip(), OrderType.LIMIT), + direction=direction, + offset=Offset.NONE, + price=float(data.wtjg), + volume=float(data.wtsl), + traded=traded, + status=order_status, + datetime=order_dt, + time=order_dt.strftime('%H:%M:%S') + ) + self.gateway.write_log(f'补充委托记录:{print_dict(new_order.__dict__)}') + self.gateway.order_manager.on_order(new_order) + + continue + + if order.sys_orderid != sys_orderid: + pre_sys_orderid = order.sys_orderid + order.sys_orderid = sys_orderid + self.gateway.order_manager.update_orderid_map(local_orderid=local_orderid, sys_orderid=sys_orderid) + self.gateway.write_log( + f'绑定local_orderid:{local_orderid}, <=> 系统委托号:{pre_sys_orderid}=>{sys_orderid}') + + if local_orderid in self.unchecked_orderids: + self.unchecked_orderids.remove(local_orderid) + + # 如果委托状态是已经撤单,拒单,已成交,就不处理 + if order.status in [Status.CANCELLED, Status.REJECTED, Status.ALLTRADED]: + continue + + if order.status != order_status: + self.gateway.write_log(f'{local_orderid} 状态:{order.status.value} => {order_status.value}') order.status = order_status + if order.status == Status.CANCELLED: + order.cancel_time = datetime.now().strftime('%H:%M:%S') + if order.status == Status.ALLTRADED and order.traded != order.volume: + self.gateway.write_log(f'dbf批量下单,委托单全成交,成交数:{order.traded}=>{order.volume}') + order.traded = order.volume + + self.gateway.write_log(f'dbf批量下单,委托单更新:{order.__dict__}') self.gateway.order_manager.on_order(order) continue + table.close() + except Exception as ex: + self.gateway.write_error(f'dbf查询委托库异常:{str(ex)}') + self.gateway.write_error(traceback.format_exc()) + + def query_update_orders_csv(self): + """扫描批量下单的委托查询(csv文件格式)""" + # XHPT_WTCX委托的CSV文件 + orders_csv = os.path.abspath(os.path.join(self.order_folder, + '{}{}.csv'.format( + PB_FILE_NAMES.get('update_orders'), + self.trading_date))) + # csv => 所有委托记录 + order_list = self.get_data(orders_csv, field_names=UPDATE_ORDER_FIELDS.keys()) + if not order_list: + return + + for data in order_list: + # 第三方系统自定义号 + local_orderid = str(data["WBZDYXH"]).lstrip() + if len(local_orderid) == 0: + continue + if len(local_orderid) < 8: + local_orderid = local_orderid.rjust(8, '0') + + order = self.gateway.order_manager.get_order_with_local_orderid(local_orderid) + if order is None: + continue + + # 恒生平台返回的委托序号 + sys_orderid = str(data['WTXH']).lstrip() + if len(sys_orderid) == 0: + continue + + if order.sys_orderid != sys_orderid: + pre_sys_orderid = order.sys_orderid + order.sys_orderid = sys_orderid + self.gateway.order_manager.update_orderid_map(local_orderid=local_orderid, sys_orderid=sys_orderid) + self.gateway.write_log(f'绑定local_orderid:{local_orderid}, <=> 系统委托号:{pre_sys_orderid}=>{sys_orderid}') + if local_orderid in self.unchecked_orderids: + self.unchecked_orderids.remove(local_orderid) + + # 如果委托状态是已经撤单,拒单,已成交,就不处理 + if order.status in [Status.CANCELLED, Status.REJECTED, Status.ALLTRADED]: + continue + + order_status = STATUS_PB2VT.get(data["WTZT"]) + + if order.status != order_status: + self.gateway.write_log(f'{local_orderid} 状态:{order.status.value} => {order_status.value}') + order.status = order_status + if order.status == Status.CANCELLED: + order.cancel_time = datetime.now().strftime('%H:%M:%S') + if order.status == Status.ALLTRADED and order.traded != order.volume: + self.gateway.write_log(f'csv批量下单,委托单全成交,成交数:{order.traded}=>{order.volume}') + order.traded = order.volume + + self.gateway.write_log(f'csv批量下单,委托更新:{order.__dict__}') + self.gateway.order_manager.on_order(order) + continue + def query_trades(self): """获取所有成交""" # 所有成交的文件 - trades_csv = os.path.abspath(os.path.join(self.account_folder, - self.trading_date, - '{}{}.csv'.format( - PB_FILE_NAMES.get('trades'), - self.trading_date))) + if self.gateway.pb_version == '2018': + trades_csv = os.path.abspath(os.path.join(self.account_folder, + '{}{}.csv'.format( + PB_FILE_NAMES.get('trades'), + self.trading_date))) + else: + trades_csv = os.path.abspath(os.path.join(self.account_folder, + self.trading_date, + '{}{}.csv'.format( + PB_FILE_NAMES.get('trades'), + self.trading_date))) # csv => 所有成交记录 trade_list = self.get_data(trades_csv) @@ -996,9 +1425,10 @@ class PbTdApi(object): # 检查是否存在本地trades缓存中 trade = self.trades.get(sys_tradeid, None) + order = self.gateway.order_manager.get_order_with_sys_orderid(sys_orderid) - if trade is None: - local_orderid = self.gateway.order_manager.get_local_orderid(sys_orderid) + # 如果交易不再本地映射关系 + if trade is None and order is None: trade_date = data["成交日期"] trade_time = data["成交时间"] trade_dt = datetime.strptime(f'{trade_date} {trade_time}', "%Y%m%d %H%M%S") @@ -1006,7 +1436,7 @@ class PbTdApi(object): gateway_name=self.gateway_name, symbol=data["证券代码"], exchange=EXCHANGE_NAME2VT.get(data["交易市场"]), - orderid=local_orderid, + orderid=sys_tradeid, tradeid=sys_tradeid, sys_orderid=sys_orderid, accountid=self.userid, @@ -1023,8 +1453,207 @@ class PbTdApi(object): self.gateway.on_trade(copy.copy(trade)) continue - def check_send_order(self): - """检查更新委托文件""" + def query_update_trades_dbf(self): + """获取接口的dbf成交更新""" + # 所有成交的dbf文件 + trades_dbf = os.path.abspath(os.path.join(self.order_folder, + '{}{}.dbf'.format( + PB_FILE_NAMES.get('update_trades'), + self.trading_date))) + + try: + # dbf => 所有成交记录 + self.gateway.write_log(f'扫描所有成交记录:{trades_dbf}') + table = dbf.Table(trades_dbf, codepage='cp936') + table.open(dbf.READ_ONLY) + + for data in table: + # 本地委托号 + local_orderid = str(data.wbzdyxh).strip() + if 0 < len(local_orderid) < 8: + local_orderid = local_orderid.rjust(8, '0') + # 系统委托号 + sys_orderid = str(data.wtxh).strip() + # 系统交易号 + sys_tradeid = str(data.cjbh).strip() + + # 检查是否存在本地trades缓存中 + trade = self.trades.get(sys_tradeid, None) + order = self.gateway.order_manager.get_order_with_sys_orderid(sys_orderid) + + # 如果交易不再本地映射关系 + if trade is None and order: + trade_date = str(data.cjrq).strip() + trade_time = str(data.cjsj).strip() + trade_dt = datetime.strptime(f'{trade_date} {trade_time}', "%Y%m%d %H%M%S") + + trade = TradeData( + gateway_name=self.gateway_name, + symbol=str(data.zqdm).strip(), + exchange=EXCHANGE_PB2VT.get(str(data.jysc).strip()), + orderid=local_orderid, + tradeid=sys_tradeid, + sys_orderid=sys_orderid, + accountid=self.userid, + direction=DIRECTION_ORDER_PB2VT.get(str(data.wtfx).strip()), + offset=Offset.NONE, + price=float(data.cjjg), + volume=int(data.cjsl), + datetime=trade_dt, + time=trade_dt.strftime('%H:%M:%S'), + trade_amount=float(data.cjje), + commission=float(data.zfy), + holder_id=str(data.gddm).strip() + ) + # 保存交易记录 + self.trades[sys_tradeid] = trade + + # 更新订单的成交数量 + if order.volume >= order.traded + trade.volume: + pre_traded = order.traded + order.traded += trade.volume + self.gateway.write_log( + f'{local_orderid}/{sys_orderid} 成交数量:{pre_traded} =>{order.traded} ,目标:{order.volume}') + + # 发送成交更新 + self.gateway.on_trade(copy.copy(trade)) + continue + + table.close() + except Exception as ex: + self.gateway.write_error(f'dbf查询成交库异常:{str(ex)}') + self.gateway.write_error(traceback.format_exc()) + + def query_update_trades_csv(self): + """获取接口的csv成交更新""" + # 所有成交的csv文件 + trades_csv = os.path.abspath(os.path.join(self.order_folder, + '{}{}.csv'.format( + PB_FILE_NAMES.get('update_trades'), + self.trading_date))) + + # csv => 所有成交记录 + trade_list = self.get_data(trades_csv, field_names=UPDATE_TRADE_FIELDS.keys()) + if not trade_list: + return + + for data in trade_list: + local_orderid = str(data["WBZDYXH"]).lstrip() + if len(local_orderid) < 8: + local_orderid = local_orderid.rjust(8, '0') + + sys_orderid = str(data["WTXH"]).lstrip() + sys_tradeid = str(data["CJBH"]).lstrip() + + # 检查是否存在本地trades缓存中 + trade = self.trades.get(sys_tradeid, None) + order = self.gateway.order_manager.get_order_with_sys_orderid(sys_orderid) + + # 如果交易不再本地映射关系 + if trade is None and order: + trade_date = str(data["CJRQ"]).lstrip() + trade_time = str(data["CJSJ"]).lstrip() + trade_dt = datetime.strptime(f'{trade_date} {trade_time}', "%Y%m%d %H%M%S") + + trade = TradeData( + gateway_name=self.gateway_name, + symbol=str(data["ZQDM"]).lstrip(), + exchange=EXCHANGE_PB2VT.get(str(data["JYSC"]).lstrip()), + orderid=local_orderid, + tradeid=sys_tradeid, + sys_orderid=sys_orderid, + accountid=self.userid, + direction=DIRECTION_ORDER_PB2VT.get(str(data["WTFX"]).lstrip()), + offset=Offset.NONE, + price=float(str(data["CJJG"]).lstrip()), + volume=float(str(data["CJSL"]).lstrip()), + datetime=trade_dt, + time=trade_dt.strftime('%H:%M:%S'), + trade_amount=float(str(data["CJJE"]).lstrip()), + commission=float(str(data["ZFY"]).lstrip()), + holder_id=str(data['GDDM']).lstrip() + ) + # 保存交易记录 + self.trades[sys_tradeid] = trade + + # 更新订单的成交数量 + if order.volume >= order.traded + trade.volume: + pre_traded = order.traded + order.traded += trade.volume + self.gateway.write_log( + f'{local_orderid}/{sys_orderid} 成交数量:{pre_traded} =>{order.traded} ,目标:{order.volume}') + + # 发送成交更新 + self.gateway.on_trade(copy.copy(trade)) + continue + + def check_send_order_dbf(self): + """检查更新委托文件csv""" + + dbf_file = os.path.abspath(os.path.join(self.order_folder, + '{}{}.dbf'.format(PB_FILE_NAMES.get('send_order'), self.trading_date))) + try: + table = dbf.Table(dbf_file, codepage='cp936') + table.open(dbf.READ_ONLY) + for record in table: + local_orderid = str(record.wbzdyxh) + if len(local_orderid) < 8: + local_orderid = local_orderid.rjust(8, '0') + + if local_orderid not in self.unchecked_orderids: + continue + + # 从本地order_manager中获取order + order = self.gateway.order_manager.get_order_with_local_orderid(local_orderid) + # 判断order取不到,或者order状态不是SUBMITTING + if order is None or order.status != Status.SUBMITTING: + continue + + # 检查是否具有系统委托编号 + if order.sys_orderid == "": + sys_orderid = str(getattr(record, 'wtxh', '')) + if len(sys_orderid) == 0: + continue + + # 委托失败标志 + if sys_orderid == "0": + err_msg = record.sbyy + if isinstance(err_msg, bytes): + err_msg = err_msg.decode('gbk') + + if len(err_msg) == 0 or record.wtsbdm == 0: + self.gateway.write_log(f'收到失败标准,又没有失败原因:{print_dict(record.__dict__)}') + continue + + err_id = str(getattr(record, 'wtsbdm', '')).strip() + order.status = Status.REJECTED + self.gateway.write_log(f'dbf批量下单,委托被拒:{order.__dict__}') + self.gateway.order_manager.on_order(order) + self.gateway.write_error(msg=err_msg, error={"ErrorID": err_id, "ErrorMsg": "委托失败"}) + + if sys_orderid != '0': + self.gateway.order_manager.update_orderid_map(local_orderid=local_orderid, + sys_orderid=sys_orderid) + order.sys_orderid = sys_orderid + order.status = Status.NOTTRADED + self.gateway.write_log(f'绑定本地local_orderid:{local_orderid} <=>sys_orderid:{sys_orderid}') + self.gateway.write_log(f'dbf批量下单,委托接受:{order.__dict__}') + self.gateway.order_manager.on_order(order) + self.gateway.write_log(f'委托成功') + + # 移除检查的id + self.gateway.write_log(f'本地委托单更新检查完毕,移除{local_orderid}') + self.unchecked_orderids.remove(local_orderid) + + table.close() + + except Exception as ex: + self.gateway.write_error(f'dbf查询系统委托号异常:{str(ex)}') + self.gateway.write_error(traceback.format_exc()) + + def check_send_order_csv(self): + """检查更新委托文件csv""" + # 当日send_order的文件 send_order_csv = os.path.abspath(os.path.join(self.order_folder, '{}{}.csv'.format( @@ -1053,18 +1682,25 @@ class PbTdApi(object): sys_orderid = data.get('WTXH', '').lstrip(' ') if len(sys_orderid) == 0: continue + err_msg = data.get('SBYY', '').lstrip(' ') # 委托失败标志 if sys_orderid == "0": - err_msg = data.get('SBYY', '').lstrip(' ') + if len(err_msg) == 0: + self.gateway.write_log(f'收到失败标准,又没有失败原因:{print_dict(data.__dict__)}') + continue + err_id = data.get('WTSBDM', '').lstrip(' ') order.status = Status.REJECTED + self.gateway.write_log(f'csv批量下单,委托被拒:{order.__dict__}') self.gateway.order_manager.on_order(order) self.gateway.write_error(msg=err_msg, error={"ErrorID": err_id, "ErrorMsg": "委托失败"}) - else: + + if sys_orderid != '0': self.gateway.order_manager.update_orderid_map(local_orderid=local_orderid, sys_orderid=sys_orderid) order.sys_orderid = sys_orderid order.status = Status.NOTTRADED + self.gateway.write_log(f'csv批量下单,委托被接受:{order.__dict__}') self.gateway.order_manager.on_order(order) self.gateway.write_log(f'委托成功') @@ -1073,7 +1709,99 @@ class PbTdApi(object): self.unchecked_orderids.remove(local_orderid) def send_order(self, req: OrderRequest): - """委托""" + """委托发单""" + if self.gateway.file_type == 'dbf': + return self.send_order_dbf(req) + else: + return self.send_order_csv(req) + + def send_order_dbf(self, req: OrderRequest): + """通过dbf文件进行发单""" + # 发生委托,才添加批量埋单接口的委托、成交检查 + if self.query_update_trades_dbf not in self.gateway.query_functions: + self.gateway.query_functions.append(self.query_update_trades_dbf) + if self.query_update_orders_dbf not in self.gateway.query_functions: + self.gateway.query_functions.append(self.query_update_orders_dbf) + + # 创建本地orderid(str格式, HHMM+00序列号) + local_orderid = self.gateway.order_manager.new_local_orderid() + + # req => order + order = req.create_order_data(orderid=local_orderid, gateway_name=self.gateway_name) + + dbf_file = os.path.abspath(os.path.join(self.order_folder, + '{}{}.dbf'.format(PB_FILE_NAMES.get('send_order'), self.trading_date))) + # 股票买卖,强制offset = Offset.NONE + order.offset = Offset.NONE + + contract = self.gateway.md_api.contract_dict.get(f'{order.symbol}.{order.exchange.value}') + direction = DIRECTION_STOCK_VT2PB.get((order.direction, order.offset)) # 委托方向 + if contract: + if contract.product == Product.BOND: + if direction == '1': + direction = '3' + else: + direction = '4' + + data = ( + self.product_id, # "CPBH": "C32", # 产品代码/基金代码 <-- 输入参数 --> + self.unit_id, # "ZCDYBH": "C16", # 单元编号/组合编号 + self.unit_id, # "ZHBH": "C16", # 组合编号 + self.holder_ids.get(order.exchange), # "GDDM": "C20", # 股东代码 + EXCHANGE_VT2PB.get(order.exchange), # "JYSC": "C3", # 交易市场 + order.symbol, # "ZQDM": "C16", # 证券代码 + direction, # "WTFX": "C4", # 委托方向 + get_pb_order_type(order.exchange, order.type), # "WTJGLX": "C1", # 委托价格类型 + round(order.price, 2), # "WTJG": "N11.4", # 委托价格 + int(order.volume), # "WTSL": "N12", # 委托数量 + local_orderid, # "WBZDYXH": "N9", # 第三方系统自定义号( 如果字符串不是数字,会报错,如果前面有0,自动去掉) + None, # "WTXH": "N8", # 委托序号 <-- 输出参数 --> + None, # "WTSBDM": "N8", # 委托失败代码 + "", # "SBYY": "C254", # 失败原因 + "", # "CLBZ": "C1", # 处理标志 <-- 内部自用字段 --> + "", # "BYZD": "C2", # 备用字段 + 0, # "WTJE": "N16.2", # 委托金额 <-- 扩充参数 --> + "", # "TSBS": "C64", # 特殊标识 + "" # "YWBS": "C2", # 业务标识 + ) + try: + # 打开dbf文件=》table + table = dbf.Table(dbf_file) + # 读取、写入模式 + table.open(dbf.READ_WRITE) + # 写入数据 + table.append(data) + # 关闭dbf文件 + table.close() + except Exception as ex: + self.gateway.write_error(f'dbf添加发单记录异常:{str(ex)}') + self.gateway.write_error(traceback.format_exc()) + return "" + + # 设置状态为提交中 + order.status = Status.SUBMITTING + # 添加待检查列表 + self.unchecked_orderids.append(local_orderid) + # 登记并发送on_order事件 + self.gateway.write_log(f'send_order,提交dbf委托:{order.__dict__}') + self.gateway.order_manager.on_order(order) + + # 添加定时检查任务 + if self.check_send_order_dbf not in self.gateway.query_functions: + self.gateway.write_log(f'添加扫描系统委托号任务到任务队列中') + self.gateway.query_functions.append(self.check_send_order_dbf) + + return order.vt_orderid + + def send_order_csv(self, req: OrderRequest): + """csv文件格式委托""" + + # 发生委托,才添加批量埋单接口的委托、成交检查 + if self.query_update_trades_csv not in self.gateway.query_functions: + self.gateway.query_functions.append(self.query_update_trades_csv) + if self.query_update_orders_csv not in self.gateway.query_functions: + self.gateway.query_functions.append(self.query_update_orders_csv) + # 创建本地orderid local_orderid = self.gateway.order_manager.new_local_orderid() @@ -1085,6 +1813,14 @@ class PbTdApi(object): # 股票买卖,强制offset = Offset.NONE order.offset = Offset.NONE + contract = self.gateway.md_api.contract_dict.get(f'{order.symbol}.{order.exchange.value}') + direction = DIRECTION_STOCK_VT2PB.get((order.direction, order.offset)) # 委托方向 + if contract: + if contract.product == Product.BOND: + if direction == '1': + direction = '3' + else: + direction = '4' data = { "CPBH": self.product_id, # 产品代码/基金代码 <-- 输入参数 --> "ZCDYBH": self.unit_id, # 单元编号/组合编号 @@ -1092,7 +1828,7 @@ class PbTdApi(object): "GDDM": self.holder_ids.get(order.exchange), # 股东代码 "JYSC": EXCHANGE_VT2PB.get(order.exchange), # 交易市场 "ZQDM": order.symbol, # 证券代码 - "WTFX": DIRECTION_STOCK_VT2PB.get((order.direction, order.offset)), # 委托方向 + "WTFX": direction, "WTJGLX": get_pb_order_type(order.exchange, order.type), # 委托价格类型 "WTJG": round(order.price, 4), # 委托价格 "WTSL": int(order.volume), # 委托数量 @@ -1104,7 +1840,7 @@ class PbTdApi(object): append_data(file_name=csv_file, dict_data=order_data, - field_names=SEND_ORDER_FIELDS.keys(), + field_names=list(SEND_ORDER_FIELDS.keys()), auto_header=False, encoding='gbk') @@ -1113,24 +1849,98 @@ class PbTdApi(object): # 添加待检查列表 self.unchecked_orderids.append(local_orderid) # 登记并发送on_order事件 + self.gateway.write_log(f'send_order,提交csv下单:{order.__dict__}') self.gateway.order_manager.on_order(order) # 添加定时检查任务 - if self.check_send_order not in self.gateway.query_functions: + if self.check_send_order_csv not in self.gateway.query_functions: self.gateway.write_log(f'添加定时检查到任务队列中') - self.gateway.query_functions.append(self.check_send_order) + self.gateway.query_functions.append(self.check_send_order_csv) + + return order.vt_orderid def cancel_order(self, req: CancelRequest): - """撤单""" + if self.gateway.file_type == 'dbf': + return self.cancel_order_dbf(req) + else: + return self.cancel_order_csv(req) + + def cancel_order_dbf(self, req: CancelRequest): + """ + dbf文件撤单 + :param req: + :return: + """ + self.gateway.write_log(f'dbf委托撤单:{req.__dict__}') + try: + # 获取订单 + order = self.gateway.order_manager.get_order_with_local_orderid(local_orderid=req.orderid) + + # 订单不存在 + if order is None: + self.gateway.write_error(f'订单{req.orderid}不存在, 撤单失败') + return False + + # 或者已经全部成交,已经被拒单,已经撤单 + if order.status in [Status.ALLTRADED, Status.REJECTED, Status.CANCELLING, + Status.CANCELLED]: + self.gateway.write_error(f'订单{req.orderid}存在, 状态为:{order.status}, 不能再撤单') + return False + + sys_orderid = self.gateway.order_manager.get_sys_orderid(req.orderid) + + if sys_orderid is None or len(sys_orderid) == 0: + self.gateway.write_error(f'订单{req.orderid}=》系统委托id不存在,撤单失败') + return False + + data = ( + int(sys_orderid), # 委托序号 + None, # "JYSC": "C3", # 交易市场 + None, # "ZQDM": "C16", # 证券代码 + None, # "CDCGBZ": "C1", # 撤单成功标志 + None, # "SBYY": "C254", # 失败原因 + None, # "CLBZ": "C1", # 处理标志 + None, # "BYZD": "C2", # 备用字段 + None # "BYZD2": "C16", # 备用字段2 + ) + + dbf_file = os.path.abspath(os.path.join(self.order_folder, + '{}{}.dbf'.format(PB_FILE_NAMES.get('cancel_order'), + self.trading_date))) + + + # 打开dbf文件=》table + table = dbf.Table(dbf_file) + # 读取、写入模式 + table.open(dbf.READ_WRITE) + # 写入数据 + table.append(data) + # 关闭dbf文件 + table.close() + return True + except Exception as ex: + self.gateway.write_error(f'dbf委托撤单异常:{str(ex)}') + self.gateway.write_error(traceback.format_exc()) + return False + + def cancel_order_csv(self, req: CancelRequest): + """csv文件撤单""" + self.gateway.write_log(f'处理撤单请求{req.__dict__}') # 获取订单 order = self.gateway.order_manager.get_order_with_local_orderid(local_orderid=req.orderid) - # 订单不存在,或者已经全部成交,已经被拒单,已经撤单 - if order is None or order.status in [Status.ALLTRADED, Status.REJECTED, Status.CANCELLING, Status.CANCELLED]: + # 订单不存在 + if order is None: self.gateway.write_log(f'订单{req.orderid}不存在, 撤单失败') return False + # 或者已经全部成交,已经被拒单,已经撤单 + if order.status in [Status.ALLTRADED, Status.REJECTED, Status.CANCELLING, + Status.CANCELLED]: + self.gateway.write_log(f'订单{req.orderid}存在, 状态为:{order.status}, 不能再撤单') + return False + sys_orderid = self.gateway.order_manager.get_sys_orderid(req.orderid) if len(sys_orderid) == 0: @@ -1144,10 +1954,61 @@ class PbTdApi(object): cancel_data = format_dict(data, CANCEL_ORDER_FIELDS) csv_file = os.path.abspath(os.path.join(self.order_folder, - '{}{}.csv'.format(PB_FILE_NAMES.get('cancel_order'), self.trading_date))) + '{}{}.csv'.format(PB_FILE_NAMES.get('cancel_order'), + self.trading_date))) append_data(file_name=csv_file, dict_data=cancel_data, - field_names=CANCEL_ORDER_FIELDS.keys(), + field_names=list(CANCEL_ORDER_FIELDS.keys()), auto_header=False, encoding='gbk') return True + + def cancel_all(self): + if self.gateway.file_type == 'dbf': + return self.cancel_all_dbf() + else: + return self.cancel_all_csv() + + def cancel_all_dbf(self): + """dbf文件全策略单d""" + # XHPT_WTCX委托的dbf文件 + orders_dbf = os.path.abspath(os.path.join(self.order_folder, + '{}{}.dbf'.format( + PB_FILE_NAMES.get('update_orders'), + self.trading_date))) + + cancel_dbf = os.path.abspath(os.path.join(self.order_folder, + '{}{}.dbf'.format(PB_FILE_NAMES.get('cancel_order'), + self.trading_date))) + + # dbf => 所有委托记录 + try: + # dbf => 所有成交记录 + self.gateway.write_log(f'全撤单,扫描所有委托查询记录:{orders_dbf}') + orders_table = dbf.Table(orders_dbf, codepage='cp936') + orders_table.open(dbf.READ_ONLY) + + cancel_table = dbf.Table(cancel_dbf, codepage='cp936') + cancel_table.open(dbf.READ_WRITE) + + for data in orders_table: + # 委托状态=> + order_status = STATUS_PB2VT.get(str(data.wtzt)) + # 恒生平台返回的委托序号 + sys_orderid = str(data.wtxh) + + if order_status in [Status.NOTTRADED] and len(sys_orderid) > 0: + self.gateway.write_log(f'撤单:{data.__dict__}') + cancel_data = (int(sys_orderid), None, None, None, None, None, None, None) + cancel_table.append(cancel_data) + + orders_table.close() + cancel_table.close() + + except Exchange as ex: + self.gateway.write_error(f'dbf全委托撤单异常:{str(ex)}') + self.gateway.write_error(traceback.format_exc()) + return False + + def cancel_all_csv(self): + pass