[New] OesGateway
This commit is contained in:
parent
a5f9171713
commit
c06d9d066b
1
vnpy/gateway/oes/__init__.py
Normal file
1
vnpy/gateway/oes/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .oes_gateway import OesGateway
|
135
vnpy/gateway/oes/config_template.ini
Normal file
135
vnpy/gateway/oes/config_template.ini
Normal file
@ -0,0 +1,135 @@
|
||||
#
|
||||
# MDS API接口库的配置文件样例
|
||||
#
|
||||
|
||||
##############################################
|
||||
# 客户端配置
|
||||
##############################################
|
||||
|
||||
[oes_client]
|
||||
ordServer = 1 {td_ord_server}
|
||||
rptServer = 1 {td_rpt_server}
|
||||
qryServer = 1 {td_qry_server}
|
||||
|
||||
username = {username}
|
||||
# 密码支持明文和MD5两种格式 (如 txt:XXX 或 md5:XXX..., 不带前缀则默认为明文)
|
||||
password = {password}
|
||||
heartBtInt = 30
|
||||
|
||||
# 客户端环境号, 用于区分不同客户端实例上报的委托申报, 取值由客户端自行分配
|
||||
# 取值范围 [0~99] ([100~127] 为保留区间,客户端应避免使用)
|
||||
clEnvId = 0
|
||||
|
||||
# 待订阅的客户端环境号
|
||||
# - 大于0, 区分环境号, 仅订阅环境号对应的回报数据
|
||||
# - 小于等于0, 不区分环境号, 订阅该客户下的所有回报数据
|
||||
rpt.subcribeEnvId = 0
|
||||
|
||||
# 待订阅的回报消息类型集合
|
||||
# - 0:默认 (等价于: 1,2,4,8,0x10,0x20,0x40), 0xFFFF:所有
|
||||
# - 1:OES业务拒绝, 2:OES委托已生成, 4:交易所委托回报, 8:交易所成交回报, 0x10:出入金委托执行报告,
|
||||
# - 0x20:资金变动信息, 0x40:持仓变动信息, 0x80:市场状态信息
|
||||
# 要订阅多个数据种类, 可以用逗号或空格分隔, 或者设置为并集值
|
||||
# 比如想订阅所有委托、成交相关的回报消息,可以使用如下两种方式:
|
||||
# - rpt.subcribeRptTypes = 1,4,8
|
||||
# - 或等价的: rpt.subcribeRptTypes = 0x0D
|
||||
rpt.subcribeRptTypes = 0
|
||||
|
||||
# 服务器集群的集群类型 (1: 基于复制集的高可用集群, 2: 基于对等节点的服务器集群, 0: 默认为基于复制集的高可用集群)
|
||||
clusterType = 0
|
||||
|
||||
# 套接字参数配置 (可选配置)
|
||||
soRcvbuf = 8192
|
||||
soSndbuf = 8192
|
||||
connTimeoutMs = 5000
|
||||
tcpNodelay = 1
|
||||
quickAck = 1
|
||||
keepalive = 1
|
||||
keepIdle = 60
|
||||
keepIntvl = 5
|
||||
keepCnt = 9
|
||||
|
||||
[mds_client]
|
||||
#udpServer.L1 = udp-mcast://232.200.151.100:5301
|
||||
#udpServer.L2 = udp-mcast://232.200.152.100:5302
|
||||
#udpServer.TickTrade = udp-mcast://232.200.153.100:5303
|
||||
#udpServer.TickOrder = udp-mcast://232.200.154.100:5304
|
||||
|
||||
tcpServer = {md_tcp_server}
|
||||
qryServer = {md_qry_server}
|
||||
|
||||
username = {username}
|
||||
# 密码支持明文和MD5两种格式 (如 txt:XXX 或 md5:XXX..., 不带前缀则默认为明文)
|
||||
password = {password}
|
||||
heartBtInt = 30
|
||||
|
||||
sse.stock.enable = false
|
||||
sse.stock.instrs =
|
||||
|
||||
sse.index.enable = false
|
||||
sse.index.instrs =
|
||||
|
||||
sse.option.enable = false
|
||||
sse.option.instrs = 10000001, 11001996
|
||||
|
||||
szse.stock.enable = false
|
||||
szse.stock.instrs =
|
||||
|
||||
szse.index.enable = false
|
||||
szse.index.instrs =
|
||||
|
||||
# 订阅模式 (0: 设置为订阅列表中的股票, 1: 增加订阅列表中的股票, 2: 删除订阅列表中的股票)
|
||||
mktData.subMode = 0
|
||||
|
||||
# 数据模式 (0: 订阅最新快照并跳过过时数据, 1: 订阅最新快照并立即发送, 2: 订阅所有时点的行情快照)
|
||||
mktData.tickType = 0
|
||||
|
||||
# 逐笔数据的过期时间类型 (0: 不过期, 1: 立即过期 (若落后于快照1秒则视为过期), 2: 及时过期 (3秒), 3: 超时过期 (30秒))
|
||||
mktData.tickExpireType = 0
|
||||
|
||||
# 订阅的数据种类
|
||||
# (0:所有, 1:L1快照/指数/期权, 2:L2快照, 4:L2委托队列, 8:L2逐笔成交,
|
||||
# 0x10:L2逐笔委托, 0x20:L2虚拟集合竞价, 0x40:L2市场总览, 0x100:市场状态, 0x200:证券实时状态,
|
||||
# 0x400:指数行情, 0x800:期权行情)
|
||||
# 要订阅多个数据种类, 可以用逗号或空格分隔, 或者设置为并集值, 如:
|
||||
# "mktData.dataTypes = 1,2,4" 或等价的 "mktData.dataTypes = 0x07"
|
||||
mktData.dataTypes = 0
|
||||
|
||||
# 请求订阅的行情数据的起始时间 (格式: HHMMSS 或 HHMMSSsss)
|
||||
# (-1: 从头开始获取, 0: 从最新位置开始获取实时行情, 大于0: 从指定的起始时间开始获取)
|
||||
mktData.beginTime = 0
|
||||
|
||||
# 在推送实时行情数据之前, 是否需要推送已订阅产品的初始的行情快照
|
||||
#mktData.isRequireInitialMktData = 0
|
||||
|
||||
# 行情服务器集群的集群类型 (1: 基于复制集的高可用集群, 2: 基于对等节点的服务器集群, 0: 默认为基于复制集的高可用集群)
|
||||
clusterType = 0
|
||||
|
||||
# 套接字参数配置 (可选配置)
|
||||
soRcvbuf = 8192
|
||||
soSndbuf = 1024
|
||||
connTimeoutMs = 5000
|
||||
tcpNodelay = 1
|
||||
quickAck = 1
|
||||
keepalive = 1
|
||||
keepIdle = 60
|
||||
keepIntvl = 5
|
||||
keepCnt = 9
|
||||
#mcastInterfaceIp = 192.168.0.11
|
||||
|
||||
|
||||
##############################################
|
||||
# 日志配置
|
||||
##############################################
|
||||
|
||||
[log]
|
||||
log.root_category = {log_level}, console_log
|
||||
log.mode={log_mode}
|
||||
log.threshold=TRACE
|
||||
log.file={log_path}
|
||||
log.file.max_file_length=300
|
||||
log.file.max_backup_index=3
|
||||
|
||||
[console_log]
|
||||
log.mode=console
|
||||
log.threshold=ERROR
|
128
vnpy/gateway/oes/error_code.py
Normal file
128
vnpy/gateway/oes/error_code.py
Normal file
@ -0,0 +1,128 @@
|
||||
from vnoes import OesApi_GetErrorMsg, OesApi_GetLastError
|
||||
|
||||
|
||||
def error_to_str(code: int):
|
||||
try:
|
||||
# return error_codes[code]
|
||||
return OesApi_GetErrorMsg(code)
|
||||
except KeyError:
|
||||
return "Unknown error code!"
|
||||
|
||||
|
||||
def get_last_error():
|
||||
code = OesApi_GetLastError()
|
||||
return OesApi_GetErrorMsg(code)
|
||||
|
||||
|
||||
error_codes = {
|
||||
1001: "报文格式错误",
|
||||
1002: "当前主机不是主节点",
|
||||
1003: "主存库操作失败",
|
||||
1004: "因状态等基础数据不匹配,无法更新数据",
|
||||
1005: "协议版本不兼容",
|
||||
1006: "数据不存在",
|
||||
1007: "未到达服务开放时间",
|
||||
1008: "非法的定位游标",
|
||||
1009: "非法的客户端登陆用户名称",
|
||||
1010: "非法的证券代码",
|
||||
1011: "非法的客户代码",
|
||||
1012: "非法的客户端类型",
|
||||
1013: "客户端已被禁用",
|
||||
1014: "客户端密码不正确",
|
||||
1015: "客户端重复登录",
|
||||
1016: "客户端连接数量过多",
|
||||
1017: "客户端未经授权操作他人账户",
|
||||
1018: "数据超出修改范围",
|
||||
1019: "非法的应用系统名称",
|
||||
1020: "请求条件有冲突",
|
||||
1021: "非法的客户端IP/MAC地址格式",
|
||||
1022: "尚不支持此业务",
|
||||
1023: "非法的客户端环境号",
|
||||
1024: "交易所拒绝",
|
||||
1025: "主柜拒绝",
|
||||
1026: "流量超出限制范围",
|
||||
1027: "禁止使用API登录",
|
||||
1028: "非法的私募基金产品代码",
|
||||
1029: "密码未改变",
|
||||
1030: "非法的来源分类",
|
||||
1031: "非法的加密类型",
|
||||
1032: "非法的客户端设备序列号",
|
||||
1033: "无可用节点",
|
||||
1101: "登录柜台失败",
|
||||
1102: "上报至柜台失败",
|
||||
1103: "从柜台获取状态失败",
|
||||
1201: "非法的证券账户代码",
|
||||
1202: "非法的资金账户代码",
|
||||
1203: "非法的出入金方向",
|
||||
1204: "非法的市场代码",
|
||||
1205: "非法的证券类别",
|
||||
1206: "非法的买卖类型",
|
||||
1207: "非法的币种",
|
||||
1208: "非法的委托类型",
|
||||
1209: "无效的账户状态",
|
||||
1210: "未找到委托信息",
|
||||
1211: "未找到持仓信息",
|
||||
1212: "未找到出入金流水",
|
||||
1213: "流水号重复",
|
||||
1214: "当前时段不能报价",
|
||||
1215: "没有操作权限",
|
||||
1216: "可用/可取资金余额不足",
|
||||
1217: "可用持仓不足",
|
||||
1218: "委托数量不在合法区间内",
|
||||
1219: "非数量单位的整数倍",
|
||||
1220: "非法的PBU代码",
|
||||
1221: "价格不在合法区间内",
|
||||
1222: "非价格单位的整数倍",
|
||||
1223: "无涨停价市价委托失败",
|
||||
1224: "当前时段不支持市价委托",
|
||||
1225: "无效的订单状态",
|
||||
1226: "撤单信息与原始委托不符",
|
||||
1227: "重复撤单",
|
||||
1228: "未通过限仓检查",
|
||||
1229: "未通过限购检查",
|
||||
1230: "超过了ETF最大现金替代比例",
|
||||
1231: "非行权日",
|
||||
1232: "产品(证券)停牌",
|
||||
1233: "合约限制开仓",
|
||||
1234: "当日累计申购或赎回数量超过限额",
|
||||
1235: "当日累计净申购或净赎回数量超过限额",
|
||||
1236: "找不到前收盘价",
|
||||
1237: "超过报撤比限制",
|
||||
1238: "委托请求过于频繁",
|
||||
1239: "非法的出入金转账金额",
|
||||
1240: "重复的认购委托",
|
||||
1241: "认购委托份数超过认购额度",
|
||||
1242: "出入金笔数超过限制",
|
||||
1243: "禁止同时做多笔出入金",
|
||||
1244: "非法的新股配号、中签记录类型",
|
||||
1245: "限制股东账户进行买交易",
|
||||
1246: "限制股东账户进行卖交易",
|
||||
1247: "限制股东账户进行逆回购交易",
|
||||
1248: "限制股东账户进行新股认购交易",
|
||||
1249: "股东账户没有市价委托交易的权限",
|
||||
1250: "股东账户没有交易创业板证券的权限",
|
||||
1251: "股东账户没有交易分级基金的权限",
|
||||
1252: "股东账户没有债券合格投资者的权限",
|
||||
1253: "客户风险评级低于交易证券需求的风险等级",
|
||||
1254: "股东账户没有交易风险警示证券的权限",
|
||||
1255: "股东账户没有交易退市整理证券的权限",
|
||||
1256: "股东账户没有交易单市场ETF的权限",
|
||||
1257: "股东账户没有交易跨市场ETF的权限",
|
||||
1258: "股东账户没有交易货币基金ETF的权限",
|
||||
1259: "股东账户没有交易跨境ETF的权限",
|
||||
1260: "仅允许合格投资者投资该证券",
|
||||
1261: "仅允许合格机构投资者投资该证券",
|
||||
1262: "出入金执行异常,待人工干预",
|
||||
1263: "交易日不在证券的发行期内",
|
||||
1264: "ETF产品禁止申购",
|
||||
1265: "ETF产品禁止赎回",
|
||||
1266: "限制股东账户进行撤指定",
|
||||
1267: "限制股东账户进行转托管",
|
||||
1268: "机构客户/主柜业务不支持银行转帐",
|
||||
1269: "不能买入被禁止开仓的证券",
|
||||
1270: "不能买入黑名单中的证券",
|
||||
1271: "股东账户没有交易存托凭证的权限",
|
||||
1272: "股东账户没有交易创新企业股票的权限",
|
||||
1273: "非法的出入金转账类型",
|
||||
|
||||
}
|
245
vnpy/gateway/oes/md.py
Normal file
245
vnpy/gateway/oes/md.py
Normal file
@ -0,0 +1,245 @@
|
||||
from datetime import datetime
|
||||
from threading import Thread
|
||||
# noinspection PyUnresolvedReferences
|
||||
from typing import Any, Callable, Dict
|
||||
|
||||
from vnoes import *
|
||||
|
||||
from vnpy.trader.constant import Exchange
|
||||
from vnpy.trader.gateway import BaseGateway
|
||||
from vnpy.trader.object import SubscribeRequest, TickData
|
||||
from .error_code import error_to_str
|
||||
|
||||
EXCHANGE_MDS2VT = {
|
||||
eMdsExchangeIdT.MDS_EXCH_SSE: Exchange.SSE,
|
||||
eMdsExchangeIdT.MDS_EXCH_SZSE: Exchange.SZSE,
|
||||
}
|
||||
EXCHANGE_VT2MDS = {v: k for k, v in EXCHANGE_MDS2VT.items()}
|
||||
|
||||
|
||||
class OesMdMessageLoop:
|
||||
|
||||
def __init__(self, gateway: BaseGateway, env: MdsApiClientEnvT):
|
||||
self.gateway = gateway
|
||||
self.env = env
|
||||
self.alive = False
|
||||
self.th = Thread(target=self.message_loop)
|
||||
|
||||
self.message_handlers: Dict[int, Callable[[dict], None]] = {
|
||||
eMdsMsgTypeT.MDS_MSGTYPE_MARKET_DATA_SNAPSHOT_FULL_REFRESH: self.on_market_full_refresh,
|
||||
eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_DATA_SNAPSHOT: self.on_l2_market_data_snapshot,
|
||||
eMdsMsgTypeT.MDS_MSGTYPE_L2_ORDER: self.on_l2_order,
|
||||
eMdsMsgTypeT.MDS_MSGTYPE_L2_TRADE: self.on_l2_trade,
|
||||
|
||||
eMdsMsgTypeT.MDS_MSGTYPE_QRY_SECURITY_STATUS: self.on_security_status,
|
||||
eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_DATA_INCREMENTAL: lambda x: 1,
|
||||
eMdsMsgTypeT.MDS_MSGTYPE_L2_BEST_ORDERS_SNAPSHOT: self.on_best_orders_snapshot,
|
||||
eMdsMsgTypeT.MDS_MSGTYPE_INDEX_SNAPSHOT_FULL_REFRESH: self.on_index_snapshot_full_refresh,
|
||||
eMdsMsgTypeT.MDS_MSGTYPE_L2_MARKET_OVERVIEW: self.on_l2_market_overview,
|
||||
eMdsMsgTypeT.MDS_MSGTYPE_OPTION_SNAPSHOT_FULL_REFRESH: self.on_option_snapshot_ful_refresh,
|
||||
eMdsMsgTypeT.MDS_MSGTYPE_TRADING_SESSION_STATUS: self.on_trading_session_status,
|
||||
eMdsMsgTypeT.MDS_MSGTYPE_SECURITY_STATUS: self.on_security_status,
|
||||
eMdsMsgTypeT.MDS_MSGTYPE_MARKET_DATA_REQUEST: self.on_market_data_request,
|
||||
eMdsMsgTypeT.MDS_MSGTYPE_HEARTBEAT: lambda x: 1,
|
||||
}
|
||||
self.last_tick: Dict[str, TickData] = {}
|
||||
self.symbol_to_exchange: Dict[str, Exchange] = {}
|
||||
|
||||
def register_symbol(self, symbol: str, exchange: Exchange):
|
||||
self.symbol_to_exchange[symbol] = exchange
|
||||
|
||||
def get_last_tick(self, symbol):
|
||||
try:
|
||||
return self.last_tick[symbol]
|
||||
except KeyError:
|
||||
tick = TickData(
|
||||
gateway_name=self.gateway.gateway_name,
|
||||
symbol=symbol,
|
||||
exchange=self.symbol_to_exchange[symbol],
|
||||
# todo: use cache of something else to resolve exchange
|
||||
datetime=datetime.utcnow()
|
||||
)
|
||||
self.last_tick[symbol] = tick
|
||||
return tick
|
||||
|
||||
def start(self):
|
||||
self.alive = True
|
||||
self.th.start()
|
||||
|
||||
def join(self):
|
||||
self.th.join()
|
||||
|
||||
def on_message(self, session_info: SGeneralClientChannelT,
|
||||
head: SMsgHeadT,
|
||||
body: Any):
|
||||
if session_info.protocolType == SMSG_PROTO_BINARY:
|
||||
b = cast.toMdsMktRspMsgBodyT(body)
|
||||
if head.msgId in self.message_handlers:
|
||||
# self.gateway.write_log(
|
||||
# f"msg id : {head.msgId} {eMdsMsgTypeT(head.msgId)}")
|
||||
self.message_handlers[head.msgId](b)
|
||||
else:
|
||||
self.gateway.write_log(
|
||||
f"unknown msg id : {head.msgId} {eMdsMsgTypeT(head.msgId)}")
|
||||
else:
|
||||
self.gateway.write_log(f"unknown prototype : {session_info.protocolType}")
|
||||
return 1
|
||||
|
||||
def message_loop(self):
|
||||
tcp_channel = self.env.tcpChannel
|
||||
timeout_ms = 1000
|
||||
is_timeout = SPlatform_IsNegEtimeout
|
||||
is_disconnected = SPlatform_IsNegEpipe
|
||||
while self.alive:
|
||||
ret = MdsApi_WaitOnMsg(tcp_channel,
|
||||
timeout_ms,
|
||||
self.on_message)
|
||||
if ret < 0:
|
||||
if is_timeout(ret):
|
||||
pass
|
||||
if is_disconnected(ret):
|
||||
# todo: handle disconnected
|
||||
self.alive = False
|
||||
break
|
||||
pass
|
||||
return
|
||||
|
||||
def on_l2_market_data_snapshot(self, d: MdsMktRspMsgBodyT):
|
||||
data: MdsL2StockSnapshotBodyT = d.mktDataSnapshot.l2Stock
|
||||
symbol = str(data.SecurityID)
|
||||
tick = self.get_last_tick(symbol)
|
||||
tick.limit_up = data.HighPx / 10000
|
||||
tick.limit_down = data.LowPx / 10000
|
||||
tick.open_price = data.OpenPx / 10000
|
||||
tick.pre_close = data.ClosePx / 10000
|
||||
tick.high_price = data.HighPx / 10000
|
||||
tick.low_price = data.LowPx / 10000
|
||||
|
||||
for i in range(min(data.BidPriceLevel, 5)):
|
||||
tick.__dict__['bid_price_' + str(i + 1)] = data.BidLevels[i].Price / 10000
|
||||
for i in range(min(data.OfferPriceLevel, 5)):
|
||||
tick.__dict__['ask_price_' + str(i + 1)] = data.OfferLevels[i].Price / 10000
|
||||
self.gateway.on_tick(tick)
|
||||
|
||||
def on_market_full_refresh(self, d: MdsMktRspMsgBodyT):
|
||||
data: MdsStockSnapshotBodyT = d.mktDataSnapshot.stock
|
||||
symbol = data.SecurityID
|
||||
tick = self.get_last_tick(symbol)
|
||||
tick.limit_up = data.HighPx / 10000
|
||||
tick.limit_down = data.LowPx / 10000
|
||||
tick.open_price = data.OpenPx / 10000
|
||||
tick.pre_close = data.ClosePx / 10000
|
||||
tick.high_price = data.HighPx / 10000
|
||||
tick.low_price = data.LowPx / 10000
|
||||
|
||||
for i in range(5):
|
||||
tick.__dict__['bid_price_' + str(i + 1)] = data.BidLevels[i].Price / 10000
|
||||
for i in range(5):
|
||||
tick.__dict__['ask_price_' + str(i + 1)] = data.OfferLevels[i].Price / 10000
|
||||
self.gateway.on_tick(tick)
|
||||
pass
|
||||
|
||||
def on_l2_trade(self, d: MdsMktRspMsgBodyT):
|
||||
data = d.trade
|
||||
symbol = data.SecurityID
|
||||
tick = self.get_last_tick(symbol)
|
||||
tick.datetime = datetime.utcnow()
|
||||
tick.volume = data.TradeQty
|
||||
tick.last_price = data.TradePrice / 10000
|
||||
self.gateway.on_tick(tick)
|
||||
|
||||
def on_market_data_request(self, d: MdsMktRspMsgBodyT):
|
||||
pass
|
||||
|
||||
def on_trading_session_status(self, d: MdsMktRspMsgBodyT):
|
||||
pass
|
||||
|
||||
def on_l2_market_overview(self, d: MdsMktRspMsgBodyT):
|
||||
pass
|
||||
|
||||
def on_index_snapshot_full_refresh(self, d: MdsMktRspMsgBodyT):
|
||||
pass
|
||||
|
||||
def on_option_snapshot_ful_refresh(self, d: MdsMktRspMsgBodyT):
|
||||
pass
|
||||
|
||||
def on_best_orders_snapshot(self, d: MdsMktRspMsgBodyT):
|
||||
pass
|
||||
|
||||
def on_l2_order(self, d: MdsMktRspMsgBodyT):
|
||||
pass
|
||||
|
||||
def on_security_status(self, d: MdsMktRspMsgBodyT):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
self.alive = False
|
||||
|
||||
|
||||
class OesMdApi:
|
||||
|
||||
def __init__(self, gateway: BaseGateway):
|
||||
self.gateway = gateway
|
||||
self.env = MdsApiClientEnvT()
|
||||
self.message_loop = OesMdMessageLoop(gateway, self.env)
|
||||
|
||||
def connect(self, config_path: str):
|
||||
if not MdsApi_InitAllByConvention(self.env, config_path):
|
||||
pass
|
||||
|
||||
if not MdsApi_IsValidTcpChannel(self.env.tcpChannel):
|
||||
pass
|
||||
if not MdsApi_IsValidQryChannel(self.env.qryChannel):
|
||||
pass
|
||||
|
||||
def start(self):
|
||||
self.message_loop.start()
|
||||
|
||||
def stop(self):
|
||||
self.message_loop.stop()
|
||||
if not MdsApi_LogoutAll(self.env, True):
|
||||
pass # doc for this function is error
|
||||
if not MdsApi_DestoryAll(self.env):
|
||||
pass # doc for this function is error
|
||||
|
||||
def join(self):
|
||||
self.message_loop.join()
|
||||
|
||||
# why isn't arg a ContractData?
|
||||
def subscribe(self, req: SubscribeRequest):
|
||||
mds_req = MdsMktDataRequestReqT()
|
||||
entry = MdsMktDataRequestEntryT()
|
||||
mds_req.subMode = eMdsSubscribeModeT.MDS_SUB_MODE_APPEND
|
||||
mds_req.tickType = eMdsSubscribedTickExpireTypeT.MDS_TICK_EXPIRE_TYPE_TIMELY # 1s, 3s?
|
||||
mds_req.sseStockFlag = eMdsMktSubscribeFlagT.MDS_MKT_SUB_FLAG_DEFAULT
|
||||
mds_req.sseIndexFlag = eMdsMktSubscribeFlagT.MDS_MKT_SUB_FLAG_DEFAULT
|
||||
mds_req.sseOptionFlag = eMdsMktSubscribeFlagT.MDS_MKT_SUB_FLAG_DEFAULT
|
||||
mds_req.szseStockFlag = eMdsMktSubscribeFlagT.MDS_MKT_SUB_FLAG_DEFAULT
|
||||
mds_req.szseIndexFlag = eMdsMktSubscribeFlagT.MDS_MKT_SUB_FLAG_DEFAULT
|
||||
mds_req.szseOptionFlag = eMdsMktSubscribeFlagT.MDS_MKT_SUB_FLAG_DEFAULT
|
||||
mds_req.isRequireInitialMktData = 1
|
||||
mds_req.tickExpireType = eMdsSubscribedTickExpireTypeT.MDS_TICK_EXPIRE_TYPE_TIMELY
|
||||
mds_req.dataTypes = (
|
||||
eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L1_SNAPSHOT
|
||||
| eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L2_SNAPSHOT
|
||||
| eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L2_TRADE
|
||||
| eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_L2_ORDER
|
||||
| eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_INDEX_SNAPSHOT
|
||||
| eMdsSubscribeDataTypeT.MDS_SUB_DATA_TYPE_OPTION_SNAPSHOT
|
||||
)
|
||||
mds_req.beginTime = 0
|
||||
mds_req.subSecurityCnt = 1
|
||||
|
||||
entry.exchId = EXCHANGE_VT2MDS[req.exchange]
|
||||
entry.securityType = eMdsSecurityTypeT.MDS_SECURITY_TYPE_STOCK # todo: option and others
|
||||
entry.instrId = int(req.symbol)
|
||||
|
||||
self.message_loop.register_symbol(req.symbol, req.exchange)
|
||||
ret = MdsApi_SubscribeMarketData(
|
||||
self.env.tcpChannel,
|
||||
mds_req,
|
||||
entry)
|
||||
if not ret:
|
||||
self.gateway.write_log(
|
||||
f"MdsApi_SubscribeByString failed with {ret}:{error_to_str(ret)}")
|
||||
pass
|
120
vnpy/gateway/oes/oes_gateway.py
Normal file
120
vnpy/gateway/oes/oes_gateway.py
Normal file
@ -0,0 +1,120 @@
|
||||
# encoding: UTF-8
|
||||
"""
|
||||
"""
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
from threading import Thread
|
||||
from typing import Any, Callable, Dict
|
||||
|
||||
import vnoes
|
||||
|
||||
from .md import OesMdApi
|
||||
from .td import OesTdApi
|
||||
|
||||
from vnpy.trader.gateway import BaseGateway
|
||||
from vnpy.trader.object import (CancelRequest, OrderRequest, SubscribeRequest, ContractData,
|
||||
AccountData, PositionData, OrderData, TradeData, TickData)
|
||||
from vnpy.trader.utility import get_file_path
|
||||
from .utils import config_template
|
||||
|
||||
|
||||
class OesGateway(BaseGateway):
|
||||
"""
|
||||
VN Trader Gateway for BitMEX connection.
|
||||
"""
|
||||
|
||||
def on_tick(self, tick: TickData):
|
||||
super().on_tick(tick)
|
||||
|
||||
def on_trade(self, trade: TradeData):
|
||||
super().on_trade(trade)
|
||||
|
||||
def on_order(self, order: OrderData):
|
||||
super().on_order(order)
|
||||
|
||||
def on_position(self, position: PositionData):
|
||||
super().on_position(position)
|
||||
|
||||
def on_account(self, account: AccountData):
|
||||
super().on_account(account)
|
||||
|
||||
def on_contract(self, contract: ContractData):
|
||||
super().on_contract(contract)
|
||||
|
||||
default_setting = {
|
||||
"td_ord_server": "tcp://106.15.58.119:6101",
|
||||
"td_rpt_server": "tcp://106.15.58.119:6301",
|
||||
"td_qry_server": "tcp://106.15.58.119:6401",
|
||||
"md_tcp_server": "tcp://139.196.228.232:5103",
|
||||
"md_qry_server": "tcp://139.196.228.232:5203",
|
||||
"username": "",
|
||||
"password": "",
|
||||
}
|
||||
|
||||
def __init__(self, event_engine):
|
||||
"""Constructor"""
|
||||
super().__init__(event_engine, "OES")
|
||||
|
||||
self.md_api = OesMdApi(self)
|
||||
self.td_api = OesTdApi(self)
|
||||
|
||||
def connect(self, setting: dict):
|
||||
""""""
|
||||
if not setting['password'].startswith("md5:"):
|
||||
setting['password'] = "md5:" + hashlib.md5(setting['password'].encode()).hexdigest()
|
||||
|
||||
config_path = get_file_path("vnoes.ini")
|
||||
with open(config_path, "wt") as f:
|
||||
if 'test' in setting:
|
||||
log_level = 'DEBUG'
|
||||
log_mode = 'console'
|
||||
else:
|
||||
log_level = 'WARNING'
|
||||
log_mode = 'file'
|
||||
log_dir = get_file_path('oes')
|
||||
log_path = os.path.join(log_dir, 'log.log')
|
||||
if os.path.exists(log_dir):
|
||||
os.mkdir(log_dir)
|
||||
content = config_template.format(**setting,
|
||||
log_level=log_level,
|
||||
log_mode=log_mode,
|
||||
log_path=log_path)
|
||||
f.write(content)
|
||||
|
||||
self.td_api.connect(str(config_path))
|
||||
self.td_api.query_account()
|
||||
self.td_api.query_contracts()
|
||||
self.td_api.query_position()
|
||||
self.td_api.init_query_orders()
|
||||
self.td_api.start()
|
||||
|
||||
self.md_api.connect(str(config_path))
|
||||
self.md_api.start()
|
||||
|
||||
def subscribe(self, req: SubscribeRequest):
|
||||
""""""
|
||||
self.md_api.subscribe(req)
|
||||
|
||||
def send_order(self, req: OrderRequest):
|
||||
""""""
|
||||
return self.td_api.send_order(req)
|
||||
|
||||
def cancel_order(self, req: CancelRequest):
|
||||
""""""
|
||||
return self.td_api.cancel_order(req)
|
||||
|
||||
def query_account(self):
|
||||
""""""
|
||||
return self.td_api.query_account()
|
||||
|
||||
def query_position(self):
|
||||
""""""
|
||||
return self.query_position()
|
||||
|
||||
def close(self):
|
||||
""""""
|
||||
self.md_api.stop()
|
||||
self.td_api.stop()
|
||||
pass
|
||||
|
764
vnpy/gateway/oes/td.py
Normal file
764
vnpy/gateway/oes/td.py
Normal file
@ -0,0 +1,764 @@
|
||||
from copy import copy
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from threading import Thread
|
||||
# noinspection PyUnresolvedReferences
|
||||
from typing import Any, Callable, Dict
|
||||
|
||||
from vnoes import *
|
||||
|
||||
from vnpy.gateway.oes.error_code import error_to_str
|
||||
from vnpy.trader.constant import Direction, Exchange, Offset, PriceType, Product, Status
|
||||
from vnpy.trader.gateway import BaseGateway
|
||||
from vnpy.trader.object import AccountData, CancelRequest, ContractData, OrderData, OrderRequest, \
|
||||
PositionData, TradeData
|
||||
|
||||
EXCHANGE_OES2VT = {
|
||||
eOesMarketIdT.OES_MKT_SH_ASHARE: Exchange.SSE,
|
||||
eOesMarketIdT.OES_MKT_SZ_ASHARE: Exchange.SZSE,
|
||||
eOesMarketIdT.OES_MKT_SH_OPTION: Exchange.SHFE,
|
||||
}
|
||||
EXCHANGE_VT2OES = {v: k for k, v in EXCHANGE_OES2VT.items()}
|
||||
|
||||
PRODUCT_OES2VT = {
|
||||
eOesMarketIdT.OES_MKT_SH_ASHARE: Product.EQUITY,
|
||||
eOesMarketIdT.OES_MKT_SZ_ASHARE: Product.EQUITY,
|
||||
eOesMarketIdT.OES_MKT_SH_OPTION: Product.FUTURES,
|
||||
}
|
||||
|
||||
# only limit price can match, all other price types are not perfectly match.
|
||||
ORDER_TYPE_VT2OES = {
|
||||
(Exchange.SSE, PriceType.LIMIT): eOesOrdTypeShT.OES_ORD_TYPE_SH_LMT,
|
||||
(Exchange.SZSE, PriceType.LIMIT): eOesOrdTypeSzT.OES_ORD_TYPE_SZ_LMT,
|
||||
}
|
||||
|
||||
BUY_SELL_TYPE_VT2OES = {
|
||||
(Exchange.SSE, Offset.OPEN, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_BUY,
|
||||
(Exchange.SSE, Offset.OPEN, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_SELL,
|
||||
(Exchange.SSE, Offset.OPEN, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_BUY,
|
||||
(Exchange.SSE, Offset.CLOSE, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_SELL,
|
||||
(Exchange.SSE, Offset.CLOSE, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_BUY,
|
||||
(Exchange.SSE, Offset.CLOSE, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_SELL,
|
||||
|
||||
(Exchange.SZSE, Offset.OPEN, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_BUY,
|
||||
(Exchange.SZSE, Offset.OPEN, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_SELL,
|
||||
(Exchange.SZSE, Offset.OPEN, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_BUY,
|
||||
(Exchange.SZSE, Offset.CLOSE, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_SELL,
|
||||
(Exchange.SZSE, Offset.CLOSE, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_BUY,
|
||||
(Exchange.SZSE, Offset.CLOSE, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_SELL,
|
||||
|
||||
(Exchange.SHFE, Offset.OPEN, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_BUY_OPEN,
|
||||
(Exchange.SHFE, Offset.OPEN, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_SELL_OPEN,
|
||||
(Exchange.SHFE, Offset.OPEN, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_BUY_OPEN,
|
||||
(Exchange.SHFE, Offset.CLOSE, Direction.LONG): eOesBuySellTypeT.OES_BS_TYPE_BUY_CLOSE,
|
||||
(Exchange.SHFE, Offset.CLOSE, Direction.SHORT): eOesBuySellTypeT.OES_BS_TYPE_SELL_CLOSE,
|
||||
(Exchange.SHFE, Offset.CLOSE, Direction.NET): eOesBuySellTypeT.OES_BS_TYPE_BUY_CLOSE,
|
||||
# todo: eOesBuySellTypeT.OES_BS_TYPE_OPTION_EXERCISE == 行权
|
||||
}
|
||||
|
||||
STATUS_OES2VT = {
|
||||
eOesOrdStatusT.OES_ORD_STATUS_NEW: Status.NOTTRADED,
|
||||
eOesOrdStatusT.OES_ORD_STATUS_DECLARED: Status.NOTTRADED,
|
||||
eOesOrdStatusT.OES_ORD_STATUS_PARTIALLY_FILLED: Status.PARTTRADED,
|
||||
eOesOrdStatusT.OES_ORD_STATUS_FILLED: Status.ALLTRADED,
|
||||
|
||||
eOesOrdStatusT.OES_ORD_STATUS_CANCEL_DONE: Status.CANCELLED,
|
||||
eOesOrdStatusT.OES_ORD_STATUS_PARTIALLY_CANCELED: Status.CANCELLED,
|
||||
eOesOrdStatusT.OES_ORD_STATUS_CANCELED: Status.CANCELLED,
|
||||
|
||||
eOesOrdStatusT.OES_ORD_STATUS_INVALID_OES: Status.REJECTED,
|
||||
eOesOrdStatusT.OES_ORD_STATUS_INVALID_SH_F: Status.REJECTED,
|
||||
eOesOrdStatusT.OES_ORD_STATUS_INVALID_SH_E: Status.REJECTED,
|
||||
eOesOrdStatusT.OES_ORD_STATUS_INVALID_SH_COMM: Status.REJECTED,
|
||||
eOesOrdStatusT.OES_ORD_STATUS_INVALID_SZ_F: Status.REJECTED,
|
||||
eOesOrdStatusT.OES_ORD_STATUS_INVALID_SZ_E: Status.REJECTED,
|
||||
eOesOrdStatusT.OES_ORD_STATUS_INVALID_SZ_REJECT: Status.REJECTED,
|
||||
eOesOrdStatusT.OES_ORD_STATUS_INVALID_SZ_TRY_AGAIN: Status.REJECTED,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class InternalOrder:
|
||||
order_id: int = None
|
||||
vt_order: OrderData = None
|
||||
req_data: OesOrdReqT = None
|
||||
rpt_data: OesOrdCnfmT = None
|
||||
|
||||
|
||||
class OrderManager:
|
||||
|
||||
def __init__(self):
|
||||
self.last_order_id = 100000000
|
||||
self._orders: Dict[int, InternalOrder] = {}
|
||||
|
||||
# key tuple: seqNo, ordId, envId, userInfo
|
||||
self._remote_created_orders: Dict[Tuple[int, int, int, int], InternalOrder] = {}
|
||||
|
||||
@staticmethod
|
||||
def hash_remote_order(data):
|
||||
key = (data.origClSeqNo, data.origClOrdId, data.origClEnvId, data.userInfo)
|
||||
return key
|
||||
|
||||
@staticmethod
|
||||
def hash_remote_trade(data: OesTrdCnfmT):
|
||||
key = (data.clSeqNo, data.clOrdId, data.clEnvId, data.userInfo)
|
||||
return key
|
||||
|
||||
def new_local_id(self):
|
||||
id = self.last_order_id
|
||||
self.last_order_id += 1
|
||||
return id
|
||||
|
||||
def new_remote_id(self):
|
||||
id = self.last_order_id
|
||||
self.last_order_id += 1
|
||||
return id
|
||||
|
||||
def save_local_created(self, order_id: int, order: OrderData, oes_req: OesOrdReqT):
|
||||
self._orders[order_id] = InternalOrder(
|
||||
order_id=order_id,
|
||||
vt_order=order,
|
||||
req_data=oes_req
|
||||
)
|
||||
|
||||
def save_remote_created(self, order_id: int, vt_order: OrderData, data: OesOrdCnfmT):
|
||||
internal_order = InternalOrder(
|
||||
order_id=order_id,
|
||||
vt_order=vt_order,
|
||||
rpt_data=data
|
||||
)
|
||||
self._orders[order_id] = internal_order
|
||||
key = self.hash_remote_order(data)
|
||||
self._remote_created_orders[key] = internal_order
|
||||
|
||||
def get_from_order_id(self, id: int):
|
||||
return self._orders[id]
|
||||
|
||||
def get_remote_created_order_from_oes_data(self, data):
|
||||
"""
|
||||
:return: internal_order if succeed else None, will check only remote created order
|
||||
"""
|
||||
try:
|
||||
key = self.hash_remote_order(data)
|
||||
except AttributeError:
|
||||
key = self.hash_remote_trade(data)
|
||||
try:
|
||||
return self._remote_created_orders[key]
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
def get_from_oes_data(self, data):
|
||||
try:
|
||||
key = self.hash_remote_order(data)
|
||||
except AttributeError:
|
||||
key = self.hash_remote_trade(data)
|
||||
try:
|
||||
return self._remote_created_orders[key]
|
||||
except KeyError:
|
||||
order_id = key[3]
|
||||
return self._orders[order_id]
|
||||
|
||||
|
||||
class OesTdMessageLoop:
|
||||
|
||||
def __init__(self,
|
||||
gateway: BaseGateway,
|
||||
env: OesApiClientEnvT,
|
||||
order_manager: OrderManager,
|
||||
td: "OesTdApi"
|
||||
):
|
||||
self.gateway = gateway
|
||||
self.env = env
|
||||
self.order_manager = order_manager
|
||||
self.td = td
|
||||
|
||||
self.alive = False
|
||||
self.th = Thread(target=self.message_loop)
|
||||
|
||||
self.message_handlers: Dict[int, Callable[[dict], None]] = {
|
||||
eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_reject,
|
||||
eOesMsgTypeT.OESMSG_RPT_ORDER_INSERT: self.on_order_inserted,
|
||||
eOesMsgTypeT.OESMSG_RPT_ORDER_REPORT: self.on_order_report,
|
||||
eOesMsgTypeT.OESMSG_RPT_TRADE_REPORT: self.on_trade_report,
|
||||
eOesMsgTypeT.OESMSG_RPT_STOCK_HOLDING_VARIATION: self.on_stock_holding,
|
||||
eOesMsgTypeT.OESMSG_RPT_OPTION_HOLDING_VARIATION: self.on_option_holding,
|
||||
eOesMsgTypeT.OESMSG_RPT_CASH_ASSET_VARIATION: self.on_cash,
|
||||
eOesMsgTypeT.OESMSG_SESS_HEARTBEAT: lambda x: x,
|
||||
}
|
||||
|
||||
def start(self):
|
||||
self.alive = True
|
||||
self.th.start()
|
||||
|
||||
def join(self):
|
||||
self.th.join()
|
||||
|
||||
def on_message(self, session_info: SGeneralClientChannelT,
|
||||
head: SMsgHeadT,
|
||||
body: Any):
|
||||
if session_info.protocolType == SMSG_PROTO_BINARY:
|
||||
b = cast.toOesRspMsgBodyT(body)
|
||||
if head.msgId in self.message_handlers:
|
||||
self.message_handlers[head.msgId](b)
|
||||
else:
|
||||
self.gateway.write_log(
|
||||
f"unknown msg id : {head.msgId} {eOesMsgTypeT(head.msgId)}")
|
||||
else:
|
||||
self.gateway.write_log(f"unknown prototype : {session_info.protocolType}")
|
||||
return 1
|
||||
|
||||
def message_loop(self):
|
||||
rtp_channel = self.env.rptChannel
|
||||
timeout_ms = 1000
|
||||
is_timeout = SPlatform_IsNegEtimeout
|
||||
is_disconnected = SPlatform_IsNegEpipe
|
||||
|
||||
while self.alive:
|
||||
ret = OesApi_WaitReportMsg(rtp_channel,
|
||||
timeout_ms,
|
||||
self.on_message)
|
||||
if ret < 0:
|
||||
if is_timeout(ret):
|
||||
pass
|
||||
if is_disconnected(ret):
|
||||
# todo: handle disconnected
|
||||
self.alive = False
|
||||
break
|
||||
pass
|
||||
return
|
||||
|
||||
def on_reject(self, d: OesRspMsgBodyT):
|
||||
error_code = d.rptMsg.rptHead.ordRejReason
|
||||
error_string = error_to_str(error_code)
|
||||
data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp
|
||||
i = self.order_manager.get_from_oes_data(data)
|
||||
vt_order = i.vt_order
|
||||
|
||||
if vt_order == Status.ALLTRADED:
|
||||
return
|
||||
|
||||
vt_order.status = Status.REJECTED
|
||||
|
||||
self.gateway.on_order(vt_order)
|
||||
self.gateway.write_log(
|
||||
f"Order: {vt_order.vt_symbol}-{vt_order.vt_orderid} Code: {error_code} Rejected: {error_string}")
|
||||
|
||||
def on_order_inserted(self, d: OesRspMsgBodyT):
|
||||
data = d.rptMsg.rptBody.ordInsertRsp
|
||||
|
||||
i = self.order_manager.get_from_oes_data(data)
|
||||
vt_order = i.vt_order
|
||||
vt_order.status = STATUS_OES2VT[data.ordStatus]
|
||||
vt_order.volume = data.ordQty - data.canceledQty
|
||||
vt_order.traded = data.cumQty
|
||||
|
||||
self.gateway.on_order(vt_order)
|
||||
|
||||
def on_order_report(self, d: OesRspMsgBodyT):
|
||||
data: OesOrdCnfmT = d.rptMsg.rptBody.ordCnfm
|
||||
|
||||
i = self.order_manager.get_from_oes_data(data)
|
||||
vt_order = i.vt_order
|
||||
vt_order.status = STATUS_OES2VT[data.ordStatus]
|
||||
vt_order.volume = data.ordQty - data.canceledQty
|
||||
vt_order.traded = data.cumQty
|
||||
self.gateway.on_order(vt_order)
|
||||
|
||||
def on_trade_report(self, d: OesRspMsgBodyT):
|
||||
data: OesTrdCnfmT = d.rptMsg.rptBody.trdCnfm
|
||||
|
||||
i = self.order_manager.get_from_oes_data(data)
|
||||
vt_order = i.vt_order
|
||||
# vt_order.status = STATUS_OES2VT[data.ordStatus]
|
||||
|
||||
trade = TradeData(
|
||||
gateway_name=self.gateway.gateway_name,
|
||||
symbol=data.securityId,
|
||||
exchange=EXCHANGE_OES2VT[data.mktId],
|
||||
orderid=data.userInfo,
|
||||
tradeid=data.exchTrdNum,
|
||||
direction=vt_order.direction,
|
||||
offset=vt_order.offset,
|
||||
price=data.trdPrice / 10000,
|
||||
volume=data.trdQty,
|
||||
time=datetime.utcnow().isoformat() # strict
|
||||
)
|
||||
self.gateway.on_trade(trade)
|
||||
|
||||
# hack :
|
||||
# Sometimes order_report is not received after a trade is received.
|
||||
# (only trade msg but no order msg)
|
||||
# This cause a problem that vt_order.traded stay 0 after a trade, which is a error state.
|
||||
# So we have to query new status of order for every receiving of trade.
|
||||
# BUT
|
||||
# Oes have no async call to query order only.
|
||||
# And calling sync function here will slow down vnpy.
|
||||
# So we queue it into another thread.
|
||||
self.td.schedule_query_order(i)
|
||||
|
||||
def on_option_holding(self, d: OesRspMsgBodyT):
|
||||
pass
|
||||
|
||||
def on_stock_holding(self, d: OesRspMsgBodyT):
|
||||
data = d.rptMsg.rptBody.stkHoldingRpt
|
||||
position = PositionData(
|
||||
gateway_name=self.gateway.gateway_name,
|
||||
symbol=data.securityId,
|
||||
exchange=EXCHANGE_OES2VT[data.mktId],
|
||||
direction=Direction.NET,
|
||||
volume=data.sumHld,
|
||||
frozen=data.lockHld,
|
||||
price=data.costPrice / 10000,
|
||||
# pnl=data.costPrice - data.originalCostAmt,
|
||||
pnl=0, # todo: oes只提供日初持仓价格信息,不提供最初持仓价格信息,所以pnl只有当日的
|
||||
yd_volume=data.originalHld,
|
||||
)
|
||||
self.gateway.on_position(position)
|
||||
|
||||
def on_cash(self, d: OesRspMsgBodyT):
|
||||
data = d.rptMsg.rptBody.cashAssetRpt
|
||||
|
||||
balance = data.currentTotalBal
|
||||
availiable = data.currentAvailableBal
|
||||
# drawable = data.currentDrawableBal
|
||||
account_id = data.custId
|
||||
account = AccountData(
|
||||
gateway_name=self.gateway.gateway_name,
|
||||
accountid=account_id,
|
||||
balance=balance,
|
||||
frozen=balance - availiable,
|
||||
)
|
||||
self.gateway.on_account(account)
|
||||
return 1
|
||||
|
||||
def stop(self):
|
||||
self.alive = False
|
||||
|
||||
|
||||
class OesTdApi:
|
||||
|
||||
def __init__(self, gateway: BaseGateway):
|
||||
self.gateway = gateway
|
||||
self.env = OesApiClientEnvT()
|
||||
|
||||
self.order_manager = OrderManager()
|
||||
self.message_loop = OesTdMessageLoop(gateway,
|
||||
self.env,
|
||||
self.order_manager,
|
||||
self)
|
||||
|
||||
self.account_id = None
|
||||
self.last_seq_index = 1 # 0 has special manning for oes
|
||||
|
||||
def connect(self, config_path: str):
|
||||
if not OesApi_InitAllByConvention(self.env, config_path, -1, self.last_seq_index):
|
||||
pass
|
||||
self.last_seq_index = self.env.ordChannel.lastOutMsgSeq + 1
|
||||
|
||||
if not OesApi_IsValidOrdChannel(self.env.ordChannel):
|
||||
pass
|
||||
if not OesApi_IsValidQryChannel(self.env.qryChannel):
|
||||
pass
|
||||
if not OesApi_IsValidRptChannel(self.env.rptChannel):
|
||||
pass
|
||||
|
||||
def start(self):
|
||||
self.message_loop.start()
|
||||
|
||||
def stop(self):
|
||||
self.message_loop.stop()
|
||||
if not OesApi_LogoutAll(self.env, True):
|
||||
pass # doc for this function is error
|
||||
if not OesApi_DestoryAll(self.env):
|
||||
pass # doc for this function is error
|
||||
|
||||
def join(self):
|
||||
self.message_loop.join()
|
||||
|
||||
def query_account(self) -> bool:
|
||||
return self.query_cash_asset()
|
||||
|
||||
def query_cash_asset(self) -> bool:
|
||||
ret = OesApi_QueryCashAsset(self.env.qryChannel,
|
||||
OesQryCashAssetFilterT(),
|
||||
self.on_query_asset
|
||||
)
|
||||
return ret >= 0
|
||||
|
||||
def on_query_asset(self,
|
||||
session_info: SGeneralClientChannelT,
|
||||
head: SMsgHeadT,
|
||||
body: Any,
|
||||
cursor: OesQryCursorT,
|
||||
):
|
||||
data = cast.toOesCashAssetItemT(body)
|
||||
balance = data.currentTotalBal / 10000
|
||||
availiable = data.currentAvailableBal / 10000
|
||||
# drawable = data.currentDrawableBal
|
||||
account_id = data.custId
|
||||
account = AccountData(
|
||||
gateway_name=self.gateway.gateway_name,
|
||||
accountid=account_id,
|
||||
balance=balance,
|
||||
frozen=balance - availiable,
|
||||
)
|
||||
self.account_id = account_id
|
||||
self.gateway.on_account(account)
|
||||
return 1
|
||||
|
||||
def query_stock(self, ) -> bool:
|
||||
# Thread(target=self._query_stock, ).start()
|
||||
return self._query_stock()
|
||||
|
||||
def _query_stock(self, ) -> bool:
|
||||
f = OesQryStockFilterT()
|
||||
ret = OesApi_QueryStock(self.env.qryChannel, f, self.on_query_stock)
|
||||
return ret >= 0
|
||||
|
||||
def on_query_stock(self,
|
||||
session_info: SGeneralClientChannelT,
|
||||
head: SMsgHeadT,
|
||||
body: Any,
|
||||
cursor: OesQryCursorT,
|
||||
):
|
||||
data: OesStockBaseInfoT = cast.toOesStockItemT(body)
|
||||
contract = ContractData(
|
||||
gateway_name=self.gateway.gateway_name,
|
||||
symbol=data.securityId,
|
||||
exchange=EXCHANGE_OES2VT[data.mktId],
|
||||
name=data.securityName,
|
||||
product=PRODUCT_OES2VT[data.mktId],
|
||||
size=data.buyQtyUnit,
|
||||
pricetick=data.priceUnit,
|
||||
)
|
||||
self.gateway.on_contract(contract)
|
||||
return 1
|
||||
|
||||
def query_option(self) -> bool:
|
||||
f = OesQryOptionFilterT()
|
||||
ret = OesApi_QueryOption(self.env.qryChannel,
|
||||
f,
|
||||
self.on_query_option
|
||||
)
|
||||
return ret >= 0
|
||||
|
||||
def on_query_option(self,
|
||||
session_info: SGeneralClientChannelT,
|
||||
head: SMsgHeadT,
|
||||
body: Any,
|
||||
cursor: OesQryCursorT,
|
||||
):
|
||||
data = cast.toOesOptionItemT(body)
|
||||
contract = ContractData(
|
||||
gateway_name=self.gateway.gateway_name,
|
||||
symbol=data.securityId,
|
||||
exchange=EXCHANGE_OES2VT[data.mktId],
|
||||
name=data.securityName,
|
||||
product=PRODUCT_OES2VT[data.mktId],
|
||||
size=data.roundLot,
|
||||
pricetick=data.tickSize,
|
||||
)
|
||||
self.gateway.on_contract(contract)
|
||||
return 1
|
||||
|
||||
def query_issue(self) -> bool:
|
||||
f = OesQryIssueFilterT()
|
||||
ret = OesApi_QueryIssue(self.env.qryChannel,
|
||||
f,
|
||||
self.on_query_issue
|
||||
)
|
||||
return ret >= 0
|
||||
|
||||
def on_query_issue(self,
|
||||
session_info: SGeneralClientChannelT,
|
||||
head: SMsgHeadT,
|
||||
body: Any,
|
||||
cursor: OesQryCursorT,
|
||||
):
|
||||
data = cast.toOesIssueItemT(body)
|
||||
contract = ContractData(
|
||||
gateway_name=self.gateway.gateway_name,
|
||||
symbol=data.securityId,
|
||||
exchange=EXCHANGE_OES2VT[data.mktId],
|
||||
name=data.securityName,
|
||||
product=PRODUCT_OES2VT[data.mktId],
|
||||
size=data.qtyUnit,
|
||||
pricetick=1,
|
||||
)
|
||||
self.gateway.on_contract(contract)
|
||||
return 1
|
||||
|
||||
def query_etf(self) -> bool:
|
||||
f = OesQryEtfFilterT()
|
||||
ret = OesApi_QueryEtf(self.env.qryChannel,
|
||||
f,
|
||||
self.on_query_etf
|
||||
)
|
||||
return ret >= 0
|
||||
|
||||
def on_query_etf(self,
|
||||
session_info: SGeneralClientChannelT,
|
||||
head: SMsgHeadT,
|
||||
body: Any,
|
||||
cursor: OesQryCursorT,
|
||||
):
|
||||
data = cast.toOesEtfItemT(body)
|
||||
contract = ContractData(
|
||||
gateway_name=self.gateway.gateway_name,
|
||||
symbol=data.securityId,
|
||||
exchange=EXCHANGE_OES2VT[data.mktId],
|
||||
name=data.securityId,
|
||||
product=PRODUCT_OES2VT[data.mktId],
|
||||
size=data.creRdmUnit, # todo: to verify! creRdmUnit : 每个篮子 (最小申购、赎回单位) 对应的ETF份数
|
||||
pricetick=1,
|
||||
)
|
||||
self.gateway.on_contract(contract)
|
||||
return 1
|
||||
|
||||
def query_stock_holding(self) -> bool:
|
||||
f = OesQryStkHoldingFilterT()
|
||||
ret = OesApi_QueryStkHolding(self.env.qryChannel,
|
||||
f,
|
||||
self.on_query_stock_holding
|
||||
)
|
||||
return ret >= 0
|
||||
|
||||
def on_query_stock_holding(self,
|
||||
session_info: SGeneralClientChannelT,
|
||||
head: SMsgHeadT,
|
||||
body: Any,
|
||||
cursor: OesQryCursorT,
|
||||
):
|
||||
data = cast.toOesStkHoldingItemT(body)
|
||||
|
||||
position = PositionData(
|
||||
gateway_name=self.gateway.gateway_name,
|
||||
symbol=data.securityId,
|
||||
exchange=EXCHANGE_OES2VT[data.mktId],
|
||||
direction=Direction.NET,
|
||||
volume=data.sumHld,
|
||||
frozen=data.lockHld,
|
||||
price=data.costPrice / 10000,
|
||||
# pnl=data.costPrice - data.originalCostAmt,
|
||||
pnl=0, # todo: oes只提供日初持仓价格信息,不提供最初持仓价格信息,所以pnl只有当日的
|
||||
yd_volume=data.originalHld,
|
||||
)
|
||||
self.gateway.on_position(position)
|
||||
return 1
|
||||
|
||||
def query_option_holding(self) -> bool:
|
||||
f = OesQryStkHoldingFilterT()
|
||||
f.mktId = eOesMarketIdT.OES_MKT_ID_UNDEFINE
|
||||
f.userInfo = 0
|
||||
ret = OesApi_QueryOptHolding(self.env.qryChannel,
|
||||
f,
|
||||
self.on_query_holding
|
||||
)
|
||||
return ret >= 0
|
||||
|
||||
def on_query_holding(self,
|
||||
session_info: SGeneralClientChannelT,
|
||||
head: SMsgHeadT,
|
||||
body: Any,
|
||||
cursor: OesQryCursorT,
|
||||
):
|
||||
data = cast.toOesOptHoldingItemT(body)
|
||||
|
||||
# 权利
|
||||
pos_long = PositionData(
|
||||
gateway_name=self.gateway.gateway_name,
|
||||
symbol=data.securityId,
|
||||
exchange=EXCHANGE_OES2VT[data.mktId],
|
||||
direction=Direction.LONG,
|
||||
volume=data.hldA,
|
||||
frozen=data.hldRA,
|
||||
price=0,
|
||||
# pnl=data.costPrice - data.originalCostAmt,
|
||||
pnl=0,
|
||||
yd_volume=0,
|
||||
)
|
||||
self.gateway.on_position(pos_long)
|
||||
|
||||
# 义务
|
||||
pos_short = PositionData(
|
||||
gateway_name=self.gateway.gateway_name,
|
||||
symbol=data.securityId,
|
||||
exchange=EXCHANGE_OES2VT[data.mktId],
|
||||
direction=Direction.SHORT,
|
||||
volume=data.hldB,
|
||||
frozen=data.hldRB,
|
||||
price=0,
|
||||
# pnl=data.costPrice - data.originalCostAmt,
|
||||
pnl=0,
|
||||
yd_volume=0,
|
||||
)
|
||||
self.gateway.on_position(pos_short)
|
||||
|
||||
return 1
|
||||
|
||||
def query_contracts(self):
|
||||
self.query_stock()
|
||||
# self.query_option()
|
||||
# self.query_issue()
|
||||
pass
|
||||
|
||||
def query_position(self):
|
||||
self.query_stock_holding()
|
||||
self.query_option_holding()
|
||||
|
||||
def send_order(self, vt_req: OrderRequest):
|
||||
seq_id = self.last_seq_index
|
||||
self.last_seq_index += 1 # note: thread un-safe here, conflict with on_query_order
|
||||
order_id = self.order_manager.new_local_id()
|
||||
|
||||
oes_req = OesOrdReqT()
|
||||
oes_req.clSeqNo = seq_id
|
||||
oes_req.mktId = EXCHANGE_VT2OES[vt_req.exchange]
|
||||
oes_req.ordType = ORDER_TYPE_VT2OES[(vt_req.exchange, vt_req.price_type)]
|
||||
oes_req.bsType = BUY_SELL_TYPE_VT2OES[(vt_req.exchange, vt_req.offset, vt_req.direction)]
|
||||
oes_req.invAcctId = ""
|
||||
oes_req.securityId = vt_req.symbol
|
||||
oes_req.ordQty = int(vt_req.volume)
|
||||
oes_req.ordPrice = int(vt_req.price * 10000)
|
||||
oes_req.userInfo = order_id
|
||||
|
||||
ret = OesApi_SendOrderReq(self.env.ordChannel,
|
||||
oes_req
|
||||
)
|
||||
|
||||
order = vt_req.create_order_data(str(order_id), self.gateway.gateway_name)
|
||||
order.direction = Direction.NET # fix direction into NET: stock only
|
||||
if ret >= 0:
|
||||
self.order_manager.save_local_created(order_id, order, oes_req)
|
||||
self.gateway.on_order(order)
|
||||
else:
|
||||
self.gateway.write_log("Failed to send_order!")
|
||||
|
||||
return order.vt_orderid
|
||||
|
||||
def cancel_order(self, vt_req: CancelRequest):
|
||||
seq_id = self.last_seq_index
|
||||
self.last_seq_index += 1 # note: thread un-safe here
|
||||
|
||||
oes_req = OesOrdCancelReqT()
|
||||
order_id = int(vt_req.orderid)
|
||||
internal_order = self.order_manager.get_from_order_id(order_id)
|
||||
if internal_order.rpt_data:
|
||||
data = internal_order.rpt_data
|
||||
# oes_req.origClSeqNo = self.local_id_to_sys_id[int(order_id)]
|
||||
oes_req.origClOrdId = data.clOrdId
|
||||
oes_req.origClSeqNo = data.clSeqNo
|
||||
oes_req.origClEnvId = data.origClEnvId
|
||||
oes_req.mktId = data.mktId
|
||||
# oes_req.invAcctId = data.invAcctId
|
||||
# oes_req.mktId = data.mktId
|
||||
# oes_req.securityId = data.securityId
|
||||
else:
|
||||
data = internal_order.req_data
|
||||
oes_req.origClSeqNo = data.clSeqNo
|
||||
oes_req.mktId = internal_order.req_data.mktId
|
||||
|
||||
oes_req.clSeqNo = seq_id
|
||||
oes_req.invAcctId = ""
|
||||
oes_req.securityId = vt_req.symbol
|
||||
oes_req.userInfo = order_id
|
||||
ret = OesApi_SendOrderCancelReq(self.env.ordChannel,
|
||||
oes_req)
|
||||
|
||||
if ret >= 0:
|
||||
pass
|
||||
else:
|
||||
pass
|
||||
return
|
||||
|
||||
def schedule_query_order(self, internal_order: InternalOrder)->Thread:
|
||||
th = Thread(target=self.query_order, args=(internal_order, ))
|
||||
th.start()
|
||||
return th
|
||||
|
||||
def query_order(self, internal_order: InternalOrder) -> bool:
|
||||
f = OesQryOrdFilterT()
|
||||
if internal_order.req_data:
|
||||
f.clSeqNo = internal_order.req_data.clSeqNo
|
||||
f.mktId = internal_order.req_data.mktId
|
||||
f.invAcctId = internal_order.req_data.invAcctId
|
||||
else:
|
||||
f.clSeqNo = internal_order.rpt_data.origClSeqNo
|
||||
f.clOrdId = internal_order.rpt_data.origClOrdId
|
||||
f.clEnvId = internal_order.rpt_data.origClEnvId
|
||||
f.mktId = internal_order.rpt_data.mktId
|
||||
f.invAcctId = internal_order.rpt_data.invAcctId
|
||||
ret = OesApi_QueryOrder(self.env.qryChannel,
|
||||
f,
|
||||
self.on_query_order
|
||||
)
|
||||
return ret >= 0
|
||||
|
||||
def on_query_order(self,
|
||||
session_info: SGeneralClientChannelT,
|
||||
head: SMsgHeadT,
|
||||
body: Any,
|
||||
cursor: OesQryCursorT):
|
||||
data: OesOrdCnfmT = cast.toOesOrdItemT(body)
|
||||
i = self.order_manager.get_from_oes_data(data)
|
||||
vt_order = i.vt_order
|
||||
vt_order.status = STATUS_OES2VT[data.ordStatus]
|
||||
vt_order.volume = data.ordQty - data.canceledQty
|
||||
vt_order.traded = data.cumQty
|
||||
self.gateway.on_order(vt_order)
|
||||
return 1
|
||||
|
||||
def init_query_orders(self) -> bool:
|
||||
"""
|
||||
:note: this function can be called only before calling send_order
|
||||
:return:
|
||||
"""
|
||||
f = OesQryOrdFilterT()
|
||||
ret = OesApi_QueryOrder(self.env.qryChannel,
|
||||
f,
|
||||
self.on_init_query_orders
|
||||
)
|
||||
return ret >= 0
|
||||
|
||||
def on_init_query_orders(self,
|
||||
session_info: SGeneralClientChannelT,
|
||||
head: SMsgHeadT,
|
||||
body: Any,
|
||||
cursor: OesQryCursorT,
|
||||
):
|
||||
data: OesOrdCnfmT = cast.toOesOrdItemT(body)
|
||||
i = self.order_manager.get_remote_created_order_from_oes_data(data)
|
||||
if not i:
|
||||
order_id = self.order_manager.new_remote_id()
|
||||
|
||||
if data.bsType == eOesBuySellTypeT.OES_BS_TYPE_BUY:
|
||||
offset = Offset.OPEN
|
||||
else:
|
||||
offset = Offset.CLOSE
|
||||
|
||||
vt_order = OrderData(
|
||||
gateway_name=self.gateway.gateway_name,
|
||||
symbol=data.securityId,
|
||||
exchange=EXCHANGE_OES2VT[data.mktId],
|
||||
orderid=order_id if order_id else data.userInfo, # generated id
|
||||
direction=Direction.NET,
|
||||
offset=offset,
|
||||
price=data.ordPrice / 10000,
|
||||
volume=data.ordQty - data.canceledQty,
|
||||
traded=data.cumQty,
|
||||
status=STATUS_OES2VT[data.ordStatus],
|
||||
|
||||
# this time should be generated automatically or by a static function
|
||||
time=datetime.utcnow().isoformat(),
|
||||
)
|
||||
self.order_manager.save_remote_created(order_id, vt_order, data)
|
||||
self.gateway.on_order(vt_order)
|
||||
return 1
|
||||
else:
|
||||
vt_order = i.vt_order
|
||||
vt_order.status = STATUS_OES2VT[data.ordStatus]
|
||||
vt_order.volume = data.ordQty - data.canceledQty
|
||||
vt_order.traded = data.cumQty
|
||||
self.gateway.on_order(vt_order)
|
||||
return 1
|
||||
|
6
vnpy/gateway/oes/utils.py
Normal file
6
vnpy/gateway/oes/utils.py
Normal file
@ -0,0 +1,6 @@
|
||||
import os
|
||||
|
||||
mydir = os.path.dirname(__file__)
|
||||
config_template_path = os.path.join(mydir, "config_template.ini")
|
||||
with open(config_template_path, "rt", encoding='utf-8') as f:
|
||||
config_template = f.read()
|
Loading…
Reference in New Issue
Block a user