diff --git a/vnpy/gateway/oes/__init__.py b/vnpy/gateway/oes/__init__.py new file mode 100644 index 00000000..1d13b91e --- /dev/null +++ b/vnpy/gateway/oes/__init__.py @@ -0,0 +1 @@ +from .oes_gateway import OesGateway diff --git a/vnpy/gateway/oes/config_template.ini b/vnpy/gateway/oes/config_template.ini new file mode 100644 index 00000000..c7db390f --- /dev/null +++ b/vnpy/gateway/oes/config_template.ini @@ -0,0 +1,135 @@ +# +# MDS API接口库的配置文件样例 +# + +############################################## +# 客户端配置 +############################################## + +[oes_client] +ordServer = 1 {td_ord_server} +rptServer = 1 {td_rpt_server} +qryServer = 1 {td_qry_server} + +username = {username} +# 密码支持明文和MD5两种格式 (如 txt:XXX 或 md5:XXX..., 不带前缀则默认为明文) +password = {password} +heartBtInt = 30 + +# 客户端环境号, 用于区分不同客户端实例上报的委托申报, 取值由客户端自行分配 +# 取值范围 [0~99] ([100~127] 为保留区间,客户端应避免使用) +clEnvId = 0 + +# 待订阅的客户端环境号 +# - 大于0, 区分环境号, 仅订阅环境号对应的回报数据 +# - 小于等于0, 不区分环境号, 订阅该客户下的所有回报数据 +rpt.subcribeEnvId = 0 + +# 待订阅的回报消息类型集合 +# - 0:默认 (等价于: 1,2,4,8,0x10,0x20,0x40), 0xFFFF:所有 +# - 1:OES业务拒绝, 2:OES委托已生成, 4:交易所委托回报, 8:交易所成交回报, 0x10:出入金委托执行报告, +# - 0x20:资金变动信息, 0x40:持仓变动信息, 0x80:市场状态信息 +# 要订阅多个数据种类, 可以用逗号或空格分隔, 或者设置为并集值 +# 比如想订阅所有委托、成交相关的回报消息,可以使用如下两种方式: +# - rpt.subcribeRptTypes = 1,4,8 +# - 或等价的: rpt.subcribeRptTypes = 0x0D +rpt.subcribeRptTypes = 0 + +# 服务器集群的集群类型 (1: 基于复制集的高可用集群, 2: 基于对等节点的服务器集群, 0: 默认为基于复制集的高可用集群) +clusterType = 0 + +# 套接字参数配置 (可选配置) +soRcvbuf = 8192 +soSndbuf = 8192 +connTimeoutMs = 5000 +tcpNodelay = 1 +quickAck = 1 +keepalive = 1 +keepIdle = 60 +keepIntvl = 5 +keepCnt = 9 + +[mds_client] +#udpServer.L1 = udp-mcast://232.200.151.100:5301 +#udpServer.L2 = udp-mcast://232.200.152.100:5302 +#udpServer.TickTrade = udp-mcast://232.200.153.100:5303 +#udpServer.TickOrder = udp-mcast://232.200.154.100:5304 + +tcpServer = {md_tcp_server} +qryServer = {md_qry_server} + +username = {username} +# 密码支持明文和MD5两种格式 (如 txt:XXX 或 md5:XXX..., 不带前缀则默认为明文) +password = {password} +heartBtInt = 30 + +sse.stock.enable = false +sse.stock.instrs = + +sse.index.enable = false +sse.index.instrs = + +sse.option.enable = false +sse.option.instrs = 10000001, 11001996 + +szse.stock.enable = false +szse.stock.instrs = + +szse.index.enable = false +szse.index.instrs = + +# 订阅模式 (0: 设置为订阅列表中的股票, 1: 增加订阅列表中的股票, 2: 删除订阅列表中的股票) +mktData.subMode = 0 + +# 数据模式 (0: 订阅最新快照并跳过过时数据, 1: 订阅最新快照并立即发送, 2: 订阅所有时点的行情快照) +mktData.tickType = 0 + +# 逐笔数据的过期时间类型 (0: 不过期, 1: 立即过期 (若落后于快照1秒则视为过期), 2: 及时过期 (3秒), 3: 超时过期 (30秒)) +mktData.tickExpireType = 0 + +# 订阅的数据种类 +# (0:所有, 1:L1快照/指数/期权, 2:L2快照, 4:L2委托队列, 8:L2逐笔成交, +# 0x10:L2逐笔委托, 0x20:L2虚拟集合竞价, 0x40:L2市场总览, 0x100:市场状态, 0x200:证券实时状态, +# 0x400:指数行情, 0x800:期权行情) +# 要订阅多个数据种类, 可以用逗号或空格分隔, 或者设置为并集值, 如: +# "mktData.dataTypes = 1,2,4" 或等价的 "mktData.dataTypes = 0x07" +mktData.dataTypes = 0 + +# 请求订阅的行情数据的起始时间 (格式: HHMMSS 或 HHMMSSsss) +# (-1: 从头开始获取, 0: 从最新位置开始获取实时行情, 大于0: 从指定的起始时间开始获取) +mktData.beginTime = 0 + +# 在推送实时行情数据之前, 是否需要推送已订阅产品的初始的行情快照 +#mktData.isRequireInitialMktData = 0 + +# 行情服务器集群的集群类型 (1: 基于复制集的高可用集群, 2: 基于对等节点的服务器集群, 0: 默认为基于复制集的高可用集群) +clusterType = 0 + +# 套接字参数配置 (可选配置) +soRcvbuf = 8192 +soSndbuf = 1024 +connTimeoutMs = 5000 +tcpNodelay = 1 +quickAck = 1 +keepalive = 1 +keepIdle = 60 +keepIntvl = 5 +keepCnt = 9 +#mcastInterfaceIp = 192.168.0.11 + + +############################################## +# 日志配置 +############################################## + +[log] +log.root_category = {log_level}, console_log +log.mode={log_mode} +log.threshold=TRACE +log.file={log_path} +log.file.max_file_length=300 +log.file.max_backup_index=3 + +[console_log] +log.mode=console +log.threshold=ERROR diff --git a/vnpy/gateway/oes/error_code.py b/vnpy/gateway/oes/error_code.py new file mode 100644 index 00000000..afb553c1 --- /dev/null +++ b/vnpy/gateway/oes/error_code.py @@ -0,0 +1,128 @@ +from vnoes import OesApi_GetErrorMsg, OesApi_GetLastError + + +def error_to_str(code: int): + try: + # return error_codes[code] + return OesApi_GetErrorMsg(code) + except KeyError: + return "Unknown error code!" + + +def get_last_error(): + code = OesApi_GetLastError() + return OesApi_GetErrorMsg(code) + + +error_codes = { + 1001: "报文格式错误", + 1002: "当前主机不是主节点", + 1003: "主存库操作失败", + 1004: "因状态等基础数据不匹配,无法更新数据", + 1005: "协议版本不兼容", + 1006: "数据不存在", + 1007: "未到达服务开放时间", + 1008: "非法的定位游标", + 1009: "非法的客户端登陆用户名称", + 1010: "非法的证券代码", + 1011: "非法的客户代码", + 1012: "非法的客户端类型", + 1013: "客户端已被禁用", + 1014: "客户端密码不正确", + 1015: "客户端重复登录", + 1016: "客户端连接数量过多", + 1017: "客户端未经授权操作他人账户", + 1018: "数据超出修改范围", + 1019: "非法的应用系统名称", + 1020: "请求条件有冲突", + 1021: "非法的客户端IP/MAC地址格式", + 1022: "尚不支持此业务", + 1023: "非法的客户端环境号", + 1024: "交易所拒绝", + 1025: "主柜拒绝", + 1026: "流量超出限制范围", + 1027: "禁止使用API登录", + 1028: "非法的私募基金产品代码", + 1029: "密码未改变", + 1030: "非法的来源分类", + 1031: "非法的加密类型", + 1032: "非法的客户端设备序列号", + 1033: "无可用节点", + 1101: "登录柜台失败", + 1102: "上报至柜台失败", + 1103: "从柜台获取状态失败", + 1201: "非法的证券账户代码", + 1202: "非法的资金账户代码", + 1203: "非法的出入金方向", + 1204: "非法的市场代码", + 1205: "非法的证券类别", + 1206: "非法的买卖类型", + 1207: "非法的币种", + 1208: "非法的委托类型", + 1209: "无效的账户状态", + 1210: "未找到委托信息", + 1211: "未找到持仓信息", + 1212: "未找到出入金流水", + 1213: "流水号重复", + 1214: "当前时段不能报价", + 1215: "没有操作权限", + 1216: "可用/可取资金余额不足", + 1217: "可用持仓不足", + 1218: "委托数量不在合法区间内", + 1219: "非数量单位的整数倍", + 1220: "非法的PBU代码", + 1221: "价格不在合法区间内", + 1222: "非价格单位的整数倍", + 1223: "无涨停价市价委托失败", + 1224: "当前时段不支持市价委托", + 1225: "无效的订单状态", + 1226: "撤单信息与原始委托不符", + 1227: "重复撤单", + 1228: "未通过限仓检查", + 1229: "未通过限购检查", + 1230: "超过了ETF最大现金替代比例", + 1231: "非行权日", + 1232: "产品(证券)停牌", + 1233: "合约限制开仓", + 1234: "当日累计申购或赎回数量超过限额", + 1235: "当日累计净申购或净赎回数量超过限额", + 1236: "找不到前收盘价", + 1237: "超过报撤比限制", + 1238: "委托请求过于频繁", + 1239: "非法的出入金转账金额", + 1240: "重复的认购委托", + 1241: "认购委托份数超过认购额度", + 1242: "出入金笔数超过限制", + 1243: "禁止同时做多笔出入金", + 1244: "非法的新股配号、中签记录类型", + 1245: "限制股东账户进行买交易", + 1246: "限制股东账户进行卖交易", + 1247: "限制股东账户进行逆回购交易", + 1248: "限制股东账户进行新股认购交易", + 1249: "股东账户没有市价委托交易的权限", + 1250: "股东账户没有交易创业板证券的权限", + 1251: "股东账户没有交易分级基金的权限", + 1252: "股东账户没有债券合格投资者的权限", + 1253: "客户风险评级低于交易证券需求的风险等级", + 1254: "股东账户没有交易风险警示证券的权限", + 1255: "股东账户没有交易退市整理证券的权限", + 1256: "股东账户没有交易单市场ETF的权限", + 1257: "股东账户没有交易跨市场ETF的权限", + 1258: "股东账户没有交易货币基金ETF的权限", + 1259: "股东账户没有交易跨境ETF的权限", + 1260: "仅允许合格投资者投资该证券", + 1261: "仅允许合格机构投资者投资该证券", + 1262: "出入金执行异常,待人工干预", + 1263: "交易日不在证券的发行期内", + 1264: "ETF产品禁止申购", + 1265: "ETF产品禁止赎回", + 1266: "限制股东账户进行撤指定", + 1267: "限制股东账户进行转托管", + 1268: "机构客户/主柜业务不支持银行转帐", + 1269: "不能买入被禁止开仓的证券", + 1270: "不能买入黑名单中的证券", + 1271: "股东账户没有交易存托凭证的权限", + 1272: "股东账户没有交易创新企业股票的权限", + 1273: "非法的出入金转账类型", + +} diff --git a/vnpy/gateway/oes/md.py b/vnpy/gateway/oes/md.py new file mode 100644 index 00000000..8d2ac0ad --- /dev/null +++ b/vnpy/gateway/oes/md.py @@ -0,0 +1,245 @@ +from datetime import datetime +from threading import Thread +# noinspection PyUnresolvedReferences +from typing import Any, Callable, Dict + +from vnoes import * + +from vnpy.trader.constant import Exchange +from vnpy.trader.gateway import BaseGateway +from vnpy.trader.object import SubscribeRequest, TickData +from .error_code import error_to_str + +EXCHANGE_MDS2VT = { + eMdsExchangeIdT.MDS_EXCH_SSE: Exchange.SSE, + eMdsExchangeIdT.MDS_EXCH_SZSE: Exchange.SZSE, +} +EXCHANGE_VT2MDS = {v: k for k, v in EXCHANGE_MDS2VT.items()} + + +class OesMdMessageLoop: + + def __init__(self, gateway: BaseGateway, env: MdsApiClientEnvT): + self.gateway = gateway + self.env = env + self.alive = False + self.th = Thread(target=self.message_loop) + + self.message_handlers: Dict[int, Callable[[dict], None]] = { + eMdsMsgTypeT.MDS_MSGTYPE_MARKET_DATA_SNAPSHOT_FULL_REFRESH: self.on_market_full_refresh, + eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_DATA_SNAPSHOT: self.on_l2_market_data_snapshot, + eMdsMsgTypeT.MDS_MSGTYPE_L2_ORDER: self.on_l2_order, + eMdsMsgTypeT.MDS_MSGTYPE_L2_TRADE: self.on_l2_trade, + + eMdsMsgTypeT.MDS_MSGTYPE_QRY_SECURITY_STATUS: self.on_security_status, + eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_DATA_INCREMENTAL: lambda x: 1, + eMdsMsgTypeT.MDS_MSGTYPE_L2_BEST_ORDERS_SNAPSHOT: self.on_best_orders_snapshot, + eMdsMsgTypeT.MDS_MSGTYPE_INDEX_SNAPSHOT_FULL_REFRESH: self.on_index_snapshot_full_refresh, + eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_OVERVIEW: self.on_l2_market_overview, + eMdsMsgTypeT.MDS_MSGTYPE_OPTION_SNAPSHOT_FULL_REFRESH: self.on_option_snapshot_ful_refresh, + eMdsMsgTypeT.MDS_MSGTYPE_TRADING_SESSION_STATUS: self.on_trading_session_status, + eMdsMsgTypeT.MDS_MSGTYPE_SECURITY_STATUS: self.on_security_status, + eMdsMsgTypeT.MDS_MSGTYPE_MARKET_DATA_REQUEST: self.on_market_data_request, + eMdsMsgTypeT.MDS_MSGTYPE_HEARTBEAT: lambda x: 1, + } + self.last_tick: Dict[str, TickData] = {} + self.symbol_to_exchange: Dict[str, Exchange] = {} + + def register_symbol(self, symbol: str, exchange: Exchange): + self.symbol_to_exchange[symbol] = exchange + + def get_last_tick(self, symbol): + try: + return self.last_tick[symbol] + except KeyError: + tick = TickData( + gateway_name=self.gateway.gateway_name, + symbol=symbol, + exchange=self.symbol_to_exchange[symbol], + # todo: use cache of something else to resolve exchange + datetime=datetime.utcnow() + ) + self.last_tick[symbol] = tick + return tick + + def start(self): + self.alive = True + self.th.start() + + def join(self): + self.th.join() + + def on_message(self, session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any): + if session_info.protocolType == SMSG_PROTO_BINARY: + b = cast.toMdsMktRspMsgBodyT(body) + if head.msgId in self.message_handlers: + # self.gateway.write_log( + # f"msg id : {head.msgId} {eMdsMsgTypeT(head.msgId)}") + self.message_handlers[head.msgId](b) + else: + self.gateway.write_log( + f"unknown msg id : {head.msgId} {eMdsMsgTypeT(head.msgId)}") + else: + self.gateway.write_log(f"unknown prototype : {session_info.protocolType}") + return 1 + + def message_loop(self): + tcp_channel = self.env.tcpChannel + timeout_ms = 1000 + is_timeout = SPlatform_IsNegEtimeout + is_disconnected = SPlatform_IsNegEpipe + while self.alive: + ret = MdsApi_WaitOnMsg(tcp_channel, + timeout_ms, + self.on_message) + if ret < 0: + if is_timeout(ret): + pass + if is_disconnected(ret): + # todo: handle disconnected + self.alive = False + break + pass + return + + def on_l2_market_data_snapshot(self, d: MdsMktRspMsgBodyT): + data: MdsL2StockSnapshotBodyT = d.mktDataSnapshot.l2Stock + symbol = str(data.SecurityID) + tick = self.get_last_tick(symbol) + tick.limit_up = data.HighPx / 10000 + tick.limit_down = data.LowPx / 10000 + tick.open_price = data.OpenPx / 10000 + tick.pre_close = data.ClosePx / 10000 + tick.high_price = data.HighPx / 10000 + tick.low_price = data.LowPx / 10000 + + for i in range(min(data.BidPriceLevel, 5)): + tick.__dict__['bid_price_' + str(i + 1)] = data.BidLevels[i].Price / 10000 + for i in range(min(data.OfferPriceLevel, 5)): + tick.__dict__['ask_price_' + str(i + 1)] = data.OfferLevels[i].Price / 10000 + self.gateway.on_tick(tick) + + def on_market_full_refresh(self, d: MdsMktRspMsgBodyT): + data: MdsStockSnapshotBodyT = d.mktDataSnapshot.stock + symbol = data.SecurityID + tick = self.get_last_tick(symbol) + tick.limit_up = data.HighPx / 10000 + tick.limit_down = data.LowPx / 10000 + tick.open_price = data.OpenPx / 10000 + tick.pre_close = data.ClosePx / 10000 + tick.high_price = data.HighPx / 10000 + tick.low_price = data.LowPx / 10000 + + for i in range(5): + tick.__dict__['bid_price_' + str(i + 1)] = data.BidLevels[i].Price / 10000 + for i in range(5): + tick.__dict__['ask_price_' + str(i + 1)] = data.OfferLevels[i].Price / 10000 + self.gateway.on_tick(tick) + pass + + def on_l2_trade(self, d: MdsMktRspMsgBodyT): + data = d.trade + symbol = data.SecurityID + tick = self.get_last_tick(symbol) + tick.datetime = datetime.utcnow() + tick.volume = data.TradeQty + tick.last_price = data.TradePrice / 10000 + self.gateway.on_tick(tick) + + def on_market_data_request(self, d: MdsMktRspMsgBodyT): + pass + + def on_trading_session_status(self, d: MdsMktRspMsgBodyT): + pass + + def on_l2_market_overview(self, d: MdsMktRspMsgBodyT): + pass + + def on_index_snapshot_full_refresh(self, d: MdsMktRspMsgBodyT): + pass + + def on_option_snapshot_ful_refresh(self, d: MdsMktRspMsgBodyT): + pass + + def on_best_orders_snapshot(self, d: MdsMktRspMsgBodyT): + pass + + def on_l2_order(self, d: MdsMktRspMsgBodyT): + pass + + def on_security_status(self, d: MdsMktRspMsgBodyT): + pass + + def stop(self): + self.alive = False + + +class OesMdApi: + + def __init__(self, gateway: BaseGateway): + self.gateway = gateway + self.env = MdsApiClientEnvT() + self.message_loop = OesMdMessageLoop(gateway, self.env) + + def connect(self, config_path: str): + if not MdsApi_InitAllByConvention(self.env, config_path): + pass + + if not MdsApi_IsValidTcpChannel(self.env.tcpChannel): + pass + if not MdsApi_IsValidQryChannel(self.env.qryChannel): + pass + + def start(self): + self.message_loop.start() + + def stop(self): + self.message_loop.stop() + if not MdsApi_LogoutAll(self.env, True): + pass # doc for this function is error + if not MdsApi_DestoryAll(self.env): + pass # doc for this function is error + + def join(self): + self.message_loop.join() + + # why isn't arg a ContractData? + def subscribe(self, req: SubscribeRequest): + mds_req = MdsMktDataRequestReqT() + entry = MdsMktDataRequestEntryT() + mds_req.subMode = eMdsSubscribeModeT.MDS_SUB_MODE_APPEND + mds_req.tickType = eMdsSubscribedTickExpireTypeT.MDS_TICK_EXPIRE_TYPE_TIMELY # 1s, 3s? + mds_req.sseStockFlag = eMdsMktSubscribeFlagT.MDS_MKT_SUB_FLAG_DEFAULT + mds_req.sseIndexFlag = eMdsMktSubscribeFlagT.MDS_MKT_SUB_FLAG_DEFAULT + mds_req.sseOptionFlag = eMdsMktSubscribeFlagT.MDS_MKT_SUB_FLAG_DEFAULT + mds_req.szseStockFlag = eMdsMktSubscribeFlagT.MDS_MKT_SUB_FLAG_DEFAULT + mds_req.szseIndexFlag = eMdsMktSubscribeFlagT.MDS_MKT_SUB_FLAG_DEFAULT + mds_req.szseOptionFlag = eMdsMktSubscribeFlagT.MDS_MKT_SUB_FLAG_DEFAULT + mds_req.isRequireInitialMktData = 1 + mds_req.tickExpireType = eMdsSubscribedTickExpireTypeT.MDS_TICK_EXPIRE_TYPE_TIMELY + mds_req.dataTypes = ( + eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L1_SNAPSHOT + | eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L2_SNAPSHOT + | eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L2_TRADE + | eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L2_ORDER + | eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_INDEX_SNAPSHOT + | eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_OPTION_SNAPSHOT + ) + mds_req.beginTime = 0 + mds_req.subSecurityCnt = 1 + + entry.exchId = EXCHANGE_VT2MDS[req.exchange] + entry.securityType = eMdsSecurityTypeT.MDS_SECURITY_TYPE_STOCK # todo: option and others + entry.instrId = int(req.symbol) + + self.message_loop.register_symbol(req.symbol, req.exchange) + ret = MdsApi_SubscribeMarketData( + self.env.tcpChannel, + mds_req, + entry) + if not ret: + self.gateway.write_log( + f"MdsApi_SubscribeByString failed with {ret}:{error_to_str(ret)}") + pass diff --git a/vnpy/gateway/oes/oes_gateway.py b/vnpy/gateway/oes/oes_gateway.py new file mode 100644 index 00000000..bd9ae897 --- /dev/null +++ b/vnpy/gateway/oes/oes_gateway.py @@ -0,0 +1,120 @@ +# encoding: UTF-8 +""" +""" +import hashlib +import json +import os +from threading import Thread +from typing import Any, Callable, Dict + +import vnoes + +from .md import OesMdApi +from .td import OesTdApi + +from vnpy.trader.gateway import BaseGateway +from vnpy.trader.object import (CancelRequest, OrderRequest, SubscribeRequest, ContractData, + AccountData, PositionData, OrderData, TradeData, TickData) +from vnpy.trader.utility import get_file_path +from .utils import config_template + + +class OesGateway(BaseGateway): + """ + VN Trader Gateway for BitMEX connection. + """ + + def on_tick(self, tick: TickData): + super().on_tick(tick) + + def on_trade(self, trade: TradeData): + super().on_trade(trade) + + def on_order(self, order: OrderData): + super().on_order(order) + + def on_position(self, position: PositionData): + super().on_position(position) + + def on_account(self, account: AccountData): + super().on_account(account) + + def on_contract(self, contract: ContractData): + super().on_contract(contract) + + default_setting = { + "td_ord_server": "tcp://106.15.58.119:6101", + "td_rpt_server": "tcp://106.15.58.119:6301", + "td_qry_server": "tcp://106.15.58.119:6401", + "md_tcp_server": "tcp://139.196.228.232:5103", + "md_qry_server": "tcp://139.196.228.232:5203", + "username": "", + "password": "", + } + + def __init__(self, event_engine): + """Constructor""" + super().__init__(event_engine, "OES") + + self.md_api = OesMdApi(self) + self.td_api = OesTdApi(self) + + def connect(self, setting: dict): + """""" + if not setting['password'].startswith("md5:"): + setting['password'] = "md5:" + hashlib.md5(setting['password'].encode()).hexdigest() + + config_path = get_file_path("vnoes.ini") + with open(config_path, "wt") as f: + if 'test' in setting: + log_level = 'DEBUG' + log_mode = 'console' + else: + log_level = 'WARNING' + log_mode = 'file' + log_dir = get_file_path('oes') + log_path = os.path.join(log_dir, 'log.log') + if os.path.exists(log_dir): + os.mkdir(log_dir) + content = config_template.format(**setting, + log_level=log_level, + log_mode=log_mode, + log_path=log_path) + f.write(content) + + self.td_api.connect(str(config_path)) + self.td_api.query_account() + self.td_api.query_contracts() + self.td_api.query_position() + self.td_api.init_query_orders() + self.td_api.start() + + self.md_api.connect(str(config_path)) + self.md_api.start() + + def subscribe(self, req: SubscribeRequest): + """""" + self.md_api.subscribe(req) + + def send_order(self, req: OrderRequest): + """""" + return self.td_api.send_order(req) + + def cancel_order(self, req: CancelRequest): + """""" + return self.td_api.cancel_order(req) + + def query_account(self): + """""" + return self.td_api.query_account() + + def query_position(self): + """""" + return self.query_position() + + def close(self): + """""" + self.md_api.stop() + self.td_api.stop() + pass + diff --git a/vnpy/gateway/oes/td.py b/vnpy/gateway/oes/td.py new file mode 100644 index 00000000..3b734a20 --- /dev/null +++ b/vnpy/gateway/oes/td.py @@ -0,0 +1,764 @@ +from copy import copy +from dataclasses import dataclass +from datetime import datetime +from threading import Thread +# noinspection PyUnresolvedReferences +from typing import Any, Callable, Dict + +from vnoes import * + +from vnpy.gateway.oes.error_code import error_to_str +from vnpy.trader.constant import Direction, Exchange, Offset, PriceType, Product, Status +from vnpy.trader.gateway import BaseGateway +from vnpy.trader.object import AccountData, CancelRequest, ContractData, OrderData, OrderRequest, \ + PositionData, TradeData + +EXCHANGE_OES2VT = { + eOesMarketIdT.OES_MKT_SH_ASHARE: Exchange.SSE, + eOesMarketIdT.OES_MKT_SZ_ASHARE: Exchange.SZSE, + eOesMarketIdT.OES_MKT_SH_OPTION: Exchange.SHFE, +} +EXCHANGE_VT2OES = {v: k for k, v in EXCHANGE_OES2VT.items()} + +PRODUCT_OES2VT = { + eOesMarketIdT.OES_MKT_SH_ASHARE: Product.EQUITY, + eOesMarketIdT.OES_MKT_SZ_ASHARE: Product.EQUITY, + eOesMarketIdT.OES_MKT_SH_OPTION: Product.FUTURES, +} + +# only limit price can match, all other price types are not perfectly match. +ORDER_TYPE_VT2OES = { + (Exchange.SSE, PriceType.LIMIT): eOesOrdTypeShT.OES_ORD_TYPE_SH_LMT, + (Exchange.SZSE, PriceType.LIMIT): eOesOrdTypeSzT.OES_ORD_TYPE_SZ_LMT, +} + +BUY_SELL_TYPE_VT2OES = { + (Exchange.SSE, Offset.OPEN, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_BUY, + (Exchange.SSE, Offset.OPEN, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_SELL, + (Exchange.SSE, Offset.OPEN, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_BUY, + (Exchange.SSE, Offset.CLOSE, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_SELL, + (Exchange.SSE, Offset.CLOSE, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_BUY, + (Exchange.SSE, Offset.CLOSE, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_SELL, + + (Exchange.SZSE, Offset.OPEN, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_BUY, + (Exchange.SZSE, Offset.OPEN, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_SELL, + (Exchange.SZSE, Offset.OPEN, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_BUY, + (Exchange.SZSE, Offset.CLOSE, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_SELL, + (Exchange.SZSE, Offset.CLOSE, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_BUY, + (Exchange.SZSE, Offset.CLOSE, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_SELL, + + (Exchange.SHFE, Offset.OPEN, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_BUY_OPEN, + (Exchange.SHFE, Offset.OPEN, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_SELL_OPEN, + (Exchange.SHFE, Offset.OPEN, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_BUY_OPEN, + (Exchange.SHFE, Offset.CLOSE, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_BUY_CLOSE, + (Exchange.SHFE, Offset.CLOSE, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_SELL_CLOSE, + (Exchange.SHFE, Offset.CLOSE, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_BUY_CLOSE, + # todo: eOesBuySellTypeT.OES_BS_TYPE_OPTION_EXERCISE == 行权 +} + +STATUS_OES2VT = { + eOesOrdStatusT.OES_ORD_STATUS_NEW: Status.NOTTRADED, + eOesOrdStatusT.OES_ORD_STATUS_DECLARED: Status.NOTTRADED, + eOesOrdStatusT.OES_ORD_STATUS_PARTIALLY_FILLED: Status.PARTTRADED, + eOesOrdStatusT.OES_ORD_STATUS_FILLED: Status.ALLTRADED, + + eOesOrdStatusT.OES_ORD_STATUS_CANCEL_DONE: Status.CANCELLED, + eOesOrdStatusT.OES_ORD_STATUS_PARTIALLY_CANCELED: Status.CANCELLED, + eOesOrdStatusT.OES_ORD_STATUS_CANCELED: Status.CANCELLED, + + eOesOrdStatusT.OES_ORD_STATUS_INVALID_OES: Status.REJECTED, + eOesOrdStatusT.OES_ORD_STATUS_INVALID_SH_F: Status.REJECTED, + eOesOrdStatusT.OES_ORD_STATUS_INVALID_SH_E: Status.REJECTED, + eOesOrdStatusT.OES_ORD_STATUS_INVALID_SH_COMM: Status.REJECTED, + eOesOrdStatusT.OES_ORD_STATUS_INVALID_SZ_F: Status.REJECTED, + eOesOrdStatusT.OES_ORD_STATUS_INVALID_SZ_E: Status.REJECTED, + eOesOrdStatusT.OES_ORD_STATUS_INVALID_SZ_REJECT: Status.REJECTED, + eOesOrdStatusT.OES_ORD_STATUS_INVALID_SZ_TRY_AGAIN: Status.REJECTED, +} + + +@dataclass +class InternalOrder: + order_id: int = None + vt_order: OrderData = None + req_data: OesOrdReqT = None + rpt_data: OesOrdCnfmT = None + + +class OrderManager: + + def __init__(self): + self.last_order_id = 100000000 + self._orders: Dict[int, InternalOrder] = {} + + # key tuple: seqNo, ordId, envId, userInfo + self._remote_created_orders: Dict[Tuple[int, int, int, int], InternalOrder] = {} + + @staticmethod + def hash_remote_order(data): + key = (data.origClSeqNo, data.origClOrdId, data.origClEnvId, data.userInfo) + return key + + @staticmethod + def hash_remote_trade(data: OesTrdCnfmT): + key = (data.clSeqNo, data.clOrdId, data.clEnvId, data.userInfo) + return key + + def new_local_id(self): + id = self.last_order_id + self.last_order_id += 1 + return id + + def new_remote_id(self): + id = self.last_order_id + self.last_order_id += 1 + return id + + def save_local_created(self, order_id: int, order: OrderData, oes_req: OesOrdReqT): + self._orders[order_id] = InternalOrder( + order_id=order_id, + vt_order=order, + req_data=oes_req + ) + + def save_remote_created(self, order_id: int, vt_order: OrderData, data: OesOrdCnfmT): + internal_order = InternalOrder( + order_id=order_id, + vt_order=vt_order, + rpt_data=data + ) + self._orders[order_id] = internal_order + key = self.hash_remote_order(data) + self._remote_created_orders[key] = internal_order + + def get_from_order_id(self, id: int): + return self._orders[id] + + def get_remote_created_order_from_oes_data(self, data): + """ + :return: internal_order if succeed else None, will check only remote created order + """ + try: + key = self.hash_remote_order(data) + except AttributeError: + key = self.hash_remote_trade(data) + try: + return self._remote_created_orders[key] + except KeyError: + return None + + def get_from_oes_data(self, data): + try: + key = self.hash_remote_order(data) + except AttributeError: + key = self.hash_remote_trade(data) + try: + return self._remote_created_orders[key] + except KeyError: + order_id = key[3] + return self._orders[order_id] + + +class OesTdMessageLoop: + + def __init__(self, + gateway: BaseGateway, + env: OesApiClientEnvT, + order_manager: OrderManager, + td: "OesTdApi" + ): + self.gateway = gateway + self.env = env + self.order_manager = order_manager + self.td = td + + self.alive = False + self.th = Thread(target=self.message_loop) + + self.message_handlers: Dict[int, Callable[[dict], None]] = { + eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_reject, + eOesMsgTypeT.OESMSG_RPT_ORDER_INSERT: self.on_order_inserted, + eOesMsgTypeT.OESMSG_RPT_ORDER_REPORT: self.on_order_report, + eOesMsgTypeT.OESMSG_RPT_TRADE_REPORT: self.on_trade_report, + eOesMsgTypeT.OESMSG_RPT_STOCK_HOLDING_VARIATION: self.on_stock_holding, + eOesMsgTypeT.OESMSG_RPT_OPTION_HOLDING_VARIATION: self.on_option_holding, + eOesMsgTypeT.OESMSG_RPT_CASH_ASSET_VARIATION: self.on_cash, + eOesMsgTypeT.OESMSG_SESS_HEARTBEAT: lambda x: x, + } + + def start(self): + self.alive = True + self.th.start() + + def join(self): + self.th.join() + + def on_message(self, session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any): + if session_info.protocolType == SMSG_PROTO_BINARY: + b = cast.toOesRspMsgBodyT(body) + if head.msgId in self.message_handlers: + self.message_handlers[head.msgId](b) + else: + self.gateway.write_log( + f"unknown msg id : {head.msgId} {eOesMsgTypeT(head.msgId)}") + else: + self.gateway.write_log(f"unknown prototype : {session_info.protocolType}") + return 1 + + def message_loop(self): + rtp_channel = self.env.rptChannel + timeout_ms = 1000 + is_timeout = SPlatform_IsNegEtimeout + is_disconnected = SPlatform_IsNegEpipe + + while self.alive: + ret = OesApi_WaitReportMsg(rtp_channel, + timeout_ms, + self.on_message) + if ret < 0: + if is_timeout(ret): + pass + if is_disconnected(ret): + # todo: handle disconnected + self.alive = False + break + pass + return + + def on_reject(self, d: OesRspMsgBodyT): + error_code = d.rptMsg.rptHead.ordRejReason + error_string = error_to_str(error_code) + data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp + i = self.order_manager.get_from_oes_data(data) + vt_order = i.vt_order + + if vt_order == Status.ALLTRADED: + return + + vt_order.status = Status.REJECTED + + self.gateway.on_order(vt_order) + self.gateway.write_log( + f"Order: {vt_order.vt_symbol}-{vt_order.vt_orderid} Code: {error_code} Rejected: {error_string}") + + def on_order_inserted(self, d: OesRspMsgBodyT): + data = d.rptMsg.rptBody.ordInsertRsp + + i = self.order_manager.get_from_oes_data(data) + vt_order = i.vt_order + vt_order.status = STATUS_OES2VT[data.ordStatus] + vt_order.volume = data.ordQty - data.canceledQty + vt_order.traded = data.cumQty + + self.gateway.on_order(vt_order) + + def on_order_report(self, d: OesRspMsgBodyT): + data: OesOrdCnfmT = d.rptMsg.rptBody.ordCnfm + + i = self.order_manager.get_from_oes_data(data) + vt_order = i.vt_order + vt_order.status = STATUS_OES2VT[data.ordStatus] + vt_order.volume = data.ordQty - data.canceledQty + vt_order.traded = data.cumQty + self.gateway.on_order(vt_order) + + def on_trade_report(self, d: OesRspMsgBodyT): + data: OesTrdCnfmT = d.rptMsg.rptBody.trdCnfm + + i = self.order_manager.get_from_oes_data(data) + vt_order = i.vt_order + # vt_order.status = STATUS_OES2VT[data.ordStatus] + + trade = TradeData( + gateway_name=self.gateway.gateway_name, + symbol=data.securityId, + exchange=EXCHANGE_OES2VT[data.mktId], + orderid=data.userInfo, + tradeid=data.exchTrdNum, + direction=vt_order.direction, + offset=vt_order.offset, + price=data.trdPrice / 10000, + volume=data.trdQty, + time=datetime.utcnow().isoformat() # strict + ) + self.gateway.on_trade(trade) + + # hack : + # Sometimes order_report is not received after a trade is received. + # (only trade msg but no order msg) + # This cause a problem that vt_order.traded stay 0 after a trade, which is a error state. + # So we have to query new status of order for every receiving of trade. + # BUT + # Oes have no async call to query order only. + # And calling sync function here will slow down vnpy. + # So we queue it into another thread. + self.td.schedule_query_order(i) + + def on_option_holding(self, d: OesRspMsgBodyT): + pass + + def on_stock_holding(self, d: OesRspMsgBodyT): + data = d.rptMsg.rptBody.stkHoldingRpt + position = PositionData( + gateway_name=self.gateway.gateway_name, + symbol=data.securityId, + exchange=EXCHANGE_OES2VT[data.mktId], + direction=Direction.NET, + volume=data.sumHld, + frozen=data.lockHld, + price=data.costPrice / 10000, + # pnl=data.costPrice - data.originalCostAmt, + pnl=0, # todo: oes只提供日初持仓价格信息,不提供最初持仓价格信息,所以pnl只有当日的 + yd_volume=data.originalHld, + ) + self.gateway.on_position(position) + + def on_cash(self, d: OesRspMsgBodyT): + data = d.rptMsg.rptBody.cashAssetRpt + + balance = data.currentTotalBal + availiable = data.currentAvailableBal + # drawable = data.currentDrawableBal + account_id = data.custId + account = AccountData( + gateway_name=self.gateway.gateway_name, + accountid=account_id, + balance=balance, + frozen=balance - availiable, + ) + self.gateway.on_account(account) + return 1 + + def stop(self): + self.alive = False + + +class OesTdApi: + + def __init__(self, gateway: BaseGateway): + self.gateway = gateway + self.env = OesApiClientEnvT() + + self.order_manager = OrderManager() + self.message_loop = OesTdMessageLoop(gateway, + self.env, + self.order_manager, + self) + + self.account_id = None + self.last_seq_index = 1 # 0 has special manning for oes + + def connect(self, config_path: str): + if not OesApi_InitAllByConvention(self.env, config_path, -1, self.last_seq_index): + pass + self.last_seq_index = self.env.ordChannel.lastOutMsgSeq + 1 + + if not OesApi_IsValidOrdChannel(self.env.ordChannel): + pass + if not OesApi_IsValidQryChannel(self.env.qryChannel): + pass + if not OesApi_IsValidRptChannel(self.env.rptChannel): + pass + + def start(self): + self.message_loop.start() + + def stop(self): + self.message_loop.stop() + if not OesApi_LogoutAll(self.env, True): + pass # doc for this function is error + if not OesApi_DestoryAll(self.env): + pass # doc for this function is error + + def join(self): + self.message_loop.join() + + def query_account(self) -> bool: + return self.query_cash_asset() + + def query_cash_asset(self) -> bool: + ret = OesApi_QueryCashAsset(self.env.qryChannel, + OesQryCashAssetFilterT(), + self.on_query_asset + ) + return ret >= 0 + + def on_query_asset(self, + session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any, + cursor: OesQryCursorT, + ): + data = cast.toOesCashAssetItemT(body) + balance = data.currentTotalBal / 10000 + availiable = data.currentAvailableBal / 10000 + # drawable = data.currentDrawableBal + account_id = data.custId + account = AccountData( + gateway_name=self.gateway.gateway_name, + accountid=account_id, + balance=balance, + frozen=balance - availiable, + ) + self.account_id = account_id + self.gateway.on_account(account) + return 1 + + def query_stock(self, ) -> bool: + # Thread(target=self._query_stock, ).start() + return self._query_stock() + + def _query_stock(self, ) -> bool: + f = OesQryStockFilterT() + ret = OesApi_QueryStock(self.env.qryChannel, f, self.on_query_stock) + return ret >= 0 + + def on_query_stock(self, + session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any, + cursor: OesQryCursorT, + ): + data: OesStockBaseInfoT = cast.toOesStockItemT(body) + contract = ContractData( + gateway_name=self.gateway.gateway_name, + symbol=data.securityId, + exchange=EXCHANGE_OES2VT[data.mktId], + name=data.securityName, + product=PRODUCT_OES2VT[data.mktId], + size=data.buyQtyUnit, + pricetick=data.priceUnit, + ) + self.gateway.on_contract(contract) + return 1 + + def query_option(self) -> bool: + f = OesQryOptionFilterT() + ret = OesApi_QueryOption(self.env.qryChannel, + f, + self.on_query_option + ) + return ret >= 0 + + def on_query_option(self, + session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any, + cursor: OesQryCursorT, + ): + data = cast.toOesOptionItemT(body) + contract = ContractData( + gateway_name=self.gateway.gateway_name, + symbol=data.securityId, + exchange=EXCHANGE_OES2VT[data.mktId], + name=data.securityName, + product=PRODUCT_OES2VT[data.mktId], + size=data.roundLot, + pricetick=data.tickSize, + ) + self.gateway.on_contract(contract) + return 1 + + def query_issue(self) -> bool: + f = OesQryIssueFilterT() + ret = OesApi_QueryIssue(self.env.qryChannel, + f, + self.on_query_issue + ) + return ret >= 0 + + def on_query_issue(self, + session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any, + cursor: OesQryCursorT, + ): + data = cast.toOesIssueItemT(body) + contract = ContractData( + gateway_name=self.gateway.gateway_name, + symbol=data.securityId, + exchange=EXCHANGE_OES2VT[data.mktId], + name=data.securityName, + product=PRODUCT_OES2VT[data.mktId], + size=data.qtyUnit, + pricetick=1, + ) + self.gateway.on_contract(contract) + return 1 + + def query_etf(self) -> bool: + f = OesQryEtfFilterT() + ret = OesApi_QueryEtf(self.env.qryChannel, + f, + self.on_query_etf + ) + return ret >= 0 + + def on_query_etf(self, + session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any, + cursor: OesQryCursorT, + ): + data = cast.toOesEtfItemT(body) + contract = ContractData( + gateway_name=self.gateway.gateway_name, + symbol=data.securityId, + exchange=EXCHANGE_OES2VT[data.mktId], + name=data.securityId, + product=PRODUCT_OES2VT[data.mktId], + size=data.creRdmUnit, # todo: to verify! creRdmUnit : 每个篮子 (最小申购、赎回单位) 对应的ETF份数 + pricetick=1, + ) + self.gateway.on_contract(contract) + return 1 + + def query_stock_holding(self) -> bool: + f = OesQryStkHoldingFilterT() + ret = OesApi_QueryStkHolding(self.env.qryChannel, + f, + self.on_query_stock_holding + ) + return ret >= 0 + + def on_query_stock_holding(self, + session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any, + cursor: OesQryCursorT, + ): + data = cast.toOesStkHoldingItemT(body) + + position = PositionData( + gateway_name=self.gateway.gateway_name, + symbol=data.securityId, + exchange=EXCHANGE_OES2VT[data.mktId], + direction=Direction.NET, + volume=data.sumHld, + frozen=data.lockHld, + price=data.costPrice / 10000, + # pnl=data.costPrice - data.originalCostAmt, + pnl=0, # todo: oes只提供日初持仓价格信息,不提供最初持仓价格信息,所以pnl只有当日的 + yd_volume=data.originalHld, + ) + self.gateway.on_position(position) + return 1 + + def query_option_holding(self) -> bool: + f = OesQryStkHoldingFilterT() + f.mktId = eOesMarketIdT.OES_MKT_ID_UNDEFINE + f.userInfo = 0 + ret = OesApi_QueryOptHolding(self.env.qryChannel, + f, + self.on_query_holding + ) + return ret >= 0 + + def on_query_holding(self, + session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any, + cursor: OesQryCursorT, + ): + data = cast.toOesOptHoldingItemT(body) + + # 权利 + pos_long = PositionData( + gateway_name=self.gateway.gateway_name, + symbol=data.securityId, + exchange=EXCHANGE_OES2VT[data.mktId], + direction=Direction.LONG, + volume=data.hldA, + frozen=data.hldRA, + price=0, + # pnl=data.costPrice - data.originalCostAmt, + pnl=0, + yd_volume=0, + ) + self.gateway.on_position(pos_long) + + # 义务 + pos_short = PositionData( + gateway_name=self.gateway.gateway_name, + symbol=data.securityId, + exchange=EXCHANGE_OES2VT[data.mktId], + direction=Direction.SHORT, + volume=data.hldB, + frozen=data.hldRB, + price=0, + # pnl=data.costPrice - data.originalCostAmt, + pnl=0, + yd_volume=0, + ) + self.gateway.on_position(pos_short) + + return 1 + + def query_contracts(self): + self.query_stock() + # self.query_option() + # self.query_issue() + pass + + def query_position(self): + self.query_stock_holding() + self.query_option_holding() + + def send_order(self, vt_req: OrderRequest): + seq_id = self.last_seq_index + self.last_seq_index += 1 # note: thread un-safe here, conflict with on_query_order + order_id = self.order_manager.new_local_id() + + oes_req = OesOrdReqT() + oes_req.clSeqNo = seq_id + oes_req.mktId = EXCHANGE_VT2OES[vt_req.exchange] + oes_req.ordType = ORDER_TYPE_VT2OES[(vt_req.exchange, vt_req.price_type)] + oes_req.bsType = BUY_SELL_TYPE_VT2OES[(vt_req.exchange, vt_req.offset, vt_req.direction)] + oes_req.invAcctId = "" + oes_req.securityId = vt_req.symbol + oes_req.ordQty = int(vt_req.volume) + oes_req.ordPrice = int(vt_req.price * 10000) + oes_req.userInfo = order_id + + ret = OesApi_SendOrderReq(self.env.ordChannel, + oes_req + ) + + order = vt_req.create_order_data(str(order_id), self.gateway.gateway_name) + order.direction = Direction.NET # fix direction into NET: stock only + if ret >= 0: + self.order_manager.save_local_created(order_id, order, oes_req) + self.gateway.on_order(order) + else: + self.gateway.write_log("Failed to send_order!") + + return order.vt_orderid + + def cancel_order(self, vt_req: CancelRequest): + seq_id = self.last_seq_index + self.last_seq_index += 1 # note: thread un-safe here + + oes_req = OesOrdCancelReqT() + order_id = int(vt_req.orderid) + internal_order = self.order_manager.get_from_order_id(order_id) + if internal_order.rpt_data: + data = internal_order.rpt_data + # oes_req.origClSeqNo = self.local_id_to_sys_id[int(order_id)] + oes_req.origClOrdId = data.clOrdId + oes_req.origClSeqNo = data.clSeqNo + oes_req.origClEnvId = data.origClEnvId + oes_req.mktId = data.mktId + # oes_req.invAcctId = data.invAcctId + # oes_req.mktId = data.mktId + # oes_req.securityId = data.securityId + else: + data = internal_order.req_data + oes_req.origClSeqNo = data.clSeqNo + oes_req.mktId = internal_order.req_data.mktId + + oes_req.clSeqNo = seq_id + oes_req.invAcctId = "" + oes_req.securityId = vt_req.symbol + oes_req.userInfo = order_id + ret = OesApi_SendOrderCancelReq(self.env.ordChannel, + oes_req) + + if ret >= 0: + pass + else: + pass + return + + def schedule_query_order(self, internal_order: InternalOrder)->Thread: + th = Thread(target=self.query_order, args=(internal_order, )) + th.start() + return th + + def query_order(self, internal_order: InternalOrder) -> bool: + f = OesQryOrdFilterT() + if internal_order.req_data: + f.clSeqNo = internal_order.req_data.clSeqNo + f.mktId = internal_order.req_data.mktId + f.invAcctId = internal_order.req_data.invAcctId + else: + f.clSeqNo = internal_order.rpt_data.origClSeqNo + f.clOrdId = internal_order.rpt_data.origClOrdId + f.clEnvId = internal_order.rpt_data.origClEnvId + f.mktId = internal_order.rpt_data.mktId + f.invAcctId = internal_order.rpt_data.invAcctId + ret = OesApi_QueryOrder(self.env.qryChannel, + f, + self.on_query_order + ) + return ret >= 0 + + def on_query_order(self, + session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any, + cursor: OesQryCursorT): + data: OesOrdCnfmT = cast.toOesOrdItemT(body) + i = self.order_manager.get_from_oes_data(data) + vt_order = i.vt_order + vt_order.status = STATUS_OES2VT[data.ordStatus] + vt_order.volume = data.ordQty - data.canceledQty + vt_order.traded = data.cumQty + self.gateway.on_order(vt_order) + return 1 + + def init_query_orders(self) -> bool: + """ + :note: this function can be called only before calling send_order + :return: + """ + f = OesQryOrdFilterT() + ret = OesApi_QueryOrder(self.env.qryChannel, + f, + self.on_init_query_orders + ) + return ret >= 0 + + def on_init_query_orders(self, + session_info: SGeneralClientChannelT, + head: SMsgHeadT, + body: Any, + cursor: OesQryCursorT, + ): + data: OesOrdCnfmT = cast.toOesOrdItemT(body) + i = self.order_manager.get_remote_created_order_from_oes_data(data) + if not i: + order_id = self.order_manager.new_remote_id() + + if data.bsType == eOesBuySellTypeT.OES_BS_TYPE_BUY: + offset = Offset.OPEN + else: + offset = Offset.CLOSE + + vt_order = OrderData( + gateway_name=self.gateway.gateway_name, + symbol=data.securityId, + exchange=EXCHANGE_OES2VT[data.mktId], + orderid=order_id if order_id else data.userInfo, # generated id + direction=Direction.NET, + offset=offset, + price=data.ordPrice / 10000, + volume=data.ordQty - data.canceledQty, + traded=data.cumQty, + status=STATUS_OES2VT[data.ordStatus], + + # this time should be generated automatically or by a static function + time=datetime.utcnow().isoformat(), + ) + self.order_manager.save_remote_created(order_id, vt_order, data) + self.gateway.on_order(vt_order) + return 1 + else: + vt_order = i.vt_order + vt_order.status = STATUS_OES2VT[data.ordStatus] + vt_order.volume = data.ordQty - data.canceledQty + vt_order.traded = data.cumQty + self.gateway.on_order(vt_order) + return 1 + diff --git a/vnpy/gateway/oes/utils.py b/vnpy/gateway/oes/utils.py new file mode 100644 index 00000000..3d8b3d83 --- /dev/null +++ b/vnpy/gateway/oes/utils.py @@ -0,0 +1,6 @@ +import os + +mydir = os.path.dirname(__file__) +config_template_path = os.path.join(mydir, "config_template.ini") +with open(config_template_path, "rt", encoding='utf-8') as f: + config_template = f.read()