This commit is contained in:
nanoric 2019-03-07 23:13:18 -04:00
parent a54699b11d
commit a280758809
3 changed files with 77 additions and 215 deletions

View File

@ -114,8 +114,6 @@ class OesMdMessageLoop:
data: MdsL2StockSnapshotBodyT = d.mktDataSnapshot.l2Stock
symbol = str(data.SecurityID)
tick = self.get_last_tick(symbol)
tick.limit_up = data.HighPx / 10000
tick.limit_down = data.LowPx / 10000
tick.open_price = data.OpenPx / 10000
tick.pre_close = data.ClosePx / 10000
tick.high_price = data.HighPx / 10000
@ -131,8 +129,6 @@ class OesMdMessageLoop:
data: MdsStockSnapshotBodyT = d.mktDataSnapshot.stock
symbol = data.SecurityID
tick = self.get_last_tick(symbol)
tick.limit_up = data.HighPx / 10000
tick.limit_down = data.LowPx / 10000
tick.open_price = data.OpenPx / 10000
tick.pre_close = data.ClosePx / 10000
tick.high_price = data.HighPx / 10000

View File

@ -19,30 +19,12 @@ class OesGateway(BaseGateway):
VN Trader Gateway for BitMEX connection.
"""
def on_tick(self, tick: TickData):
super().on_tick(tick)
def on_trade(self, trade: TradeData):
super().on_trade(trade)
def on_order(self, order: OrderData):
super().on_order(order)
def on_position(self, position: PositionData):
super().on_position(position)
def on_account(self, account: AccountData):
super().on_account(account)
def on_contract(self, contract: ContractData):
super().on_contract(contract)
default_setting = {
"td_ord_server": "tcp://106.15.58.119:6101",
"td_rpt_server": "tcp://106.15.58.119:6301",
"td_qry_server": "tcp://106.15.58.119:6401",
"md_tcp_server": "tcp://139.196.228.232:5103",
"md_qry_server": "tcp://139.196.228.232:5203",
"td_ord_server": "",
"td_rpt_server": "",
"td_qry_server": "",
"md_tcp_server": "",
"md_qry_server": "",
"username": "",
"password": "",
}
@ -81,10 +63,12 @@ class OesGateway(BaseGateway):
f.write(content)
self.td_api.connect(str(config_path))
self.td_api.query_account()
self.td_api.query_contracts()
self.td_api.query_position()
self.td_api.init_query_orders()
self.td_api.start()
self.md_api.connect(str(config_path))
@ -103,18 +87,17 @@ class OesGateway(BaseGateway):
def cancel_order(self, req: CancelRequest):
""""""
return self.td_api.cancel_order(req)
self.td_api.cancel_order(req)
def query_account(self):
""""""
return self.td_api.query_account()
self.td_api.query_account()
def query_position(self):
""""""
return self.query_position()
self.td_api.query_position()
def close(self):
""""""
self.md_api.stop()
self.td_api.stop()
pass

View File

@ -89,8 +89,6 @@ STATUS_OES2VT = {
class InternalOrder:
order_id: int = None
vt_order: OrderData = None
req_data: OesOrdReqT = None
rpt_data: OesOrdCnfmT = None
class OrderManager:
@ -99,9 +97,6 @@ class OrderManager:
self.last_order_id = 100000000
self._orders: Dict[int, InternalOrder] = {}
# key tuple: seqNo, ordId, envId, userInfo
self._remote_created_orders: Dict[Tuple[int, int, int, int], InternalOrder] = {}
@staticmethod
def hash_remote_order(data):
key = (data.origClSeqNo, data.origClOrdId, data.origClEnvId, data.userInfo)
@ -112,59 +107,24 @@ class OrderManager:
key = (data.clSeqNo, data.clOrdId, data.clEnvId, data.userInfo)
return key
def new_local_id(self):
id = self.last_order_id
self.last_order_id += 1
return id
def new_remote_id(self):
id = self.last_order_id
self.last_order_id += 1
return id
def save_local_created(self, order_id: int, order: OrderData, oes_req: OesOrdReqT):
def save_local_created(self, order_id: int, order: OrderData):
print(f"saved order, id:{order_id}")
self._orders[order_id] = InternalOrder(
order_id=order_id,
vt_order=order,
req_data=oes_req
)
def save_remote_created(self, order_id: int, vt_order: OrderData, data: OesOrdCnfmT):
internal_order = InternalOrder(
order_id=order_id,
vt_order=vt_order,
rpt_data=data
)
self._orders[order_id] = internal_order
key = self.hash_remote_order(data)
self._remote_created_orders[key] = internal_order
def save_remote_created(self, order_id: int, vt_order: OrderData):
return self.save_local_created(order_id, vt_order)
def get_from_order_id(self, id: int):
return self._orders[id]
def get_remote_created_order_from_oes_data(self, data):
"""
:return: internal_order if succeed else None, will check only remote created order
"""
try:
key = self.hash_remote_order(data)
except AttributeError:
key = self.hash_remote_trade(data)
try:
return self._remote_created_orders[key]
except KeyError:
return None
def get_from_oes_data(self, data):
try:
key = self.hash_remote_order(data)
except AttributeError:
key = self.hash_remote_trade(data)
try:
return self._remote_created_orders[key]
except KeyError:
order_id = key[3]
return self._orders[order_id]
return data.clSeqNo
def get_order_id_from_data(self, data):
return self.get_from_order_id(self.get_from_oes_data(data))
class OesTdMessageLoop:
@ -184,7 +144,7 @@ class OesTdMessageLoop:
self.th = Thread(target=self.message_loop)
self.message_handlers: Dict[int, Callable[[dict], None]] = {
eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_reject,
eOesMsgTypeT.OESMSG_RPT_BUSINESS_REJECT: self.on_order_rejected,
eOesMsgTypeT.OESMSG_RPT_ORDER_INSERT: self.on_order_inserted,
eOesMsgTypeT.OESMSG_RPT_ORDER_REPORT: self.on_order_report,
eOesMsgTypeT.OESMSG_RPT_TRADE_REPORT: self.on_trade_report,
@ -216,13 +176,13 @@ class OesTdMessageLoop:
return 1
def message_loop(self):
rtp_channel = self.env.rptChannel
rpt_channel = self.env.rptChannel
timeout_ms = 1000
is_timeout = SPlatform_IsNegEtimeout
is_disconnected = SPlatform_IsNegEpipe
while self.alive:
ret = OesApi_WaitReportMsg(rtp_channel,
ret = OesApi_WaitReportMsg(rpt_channel,
timeout_ms,
self.on_message)
if ret < 0:
@ -232,14 +192,14 @@ class OesTdMessageLoop:
# todo: handle disconnected
self.alive = False
break
pass
return
def on_reject(self, d: OesRspMsgBodyT):
def on_order_rejected(self, d: OesRspMsgBodyT):
error_code = d.rptMsg.rptHead.ordRejReason
error_string = error_to_str(error_code)
data: OesOrdRejectT = d.rptMsg.rptBody.ordRejectRsp
i = self.order_manager.get_from_oes_data(data)
if not data.origClSeqNo:
i = self.order_manager.get_from_order_id(data.clSeqNo)
vt_order = i.vt_order
if vt_order == Status.ALLTRADED:
@ -250,6 +210,8 @@ class OesTdMessageLoop:
self.gateway.on_order(vt_order)
self.gateway.write_log(
f"Order: {vt_order.vt_symbol}-{vt_order.vt_orderid} Code: {error_code} Rejected: {error_string}")
else:
self.gateway.write_log(f"Failed to cancel Order, id: {data.origClSeqNo}")
def on_order_inserted(self, d: OesRspMsgBodyT):
data = d.rptMsg.rptBody.ordInsertRsp
@ -257,7 +219,7 @@ class OesTdMessageLoop:
i = self.order_manager.get_from_oes_data(data)
vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.volume = data.ordQty - data.canceledQty
vt_order.volume = data.ordQty
vt_order.traded = data.cumQty
self.gateway.on_order(vt_order)
@ -268,7 +230,7 @@ class OesTdMessageLoop:
i = self.order_manager.get_from_oes_data(data)
vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.volume = data.ordQty - data.canceledQty
vt_order.volume = data.ordQty
vt_order.traded = data.cumQty
self.gateway.on_order(vt_order)
@ -279,6 +241,7 @@ class OesTdMessageLoop:
vt_order = i.vt_order
# vt_order.status = STATUS_OES2VT[data.ordStatus]
trade = TradeData(
gateway_name=self.gateway.gateway_name,
symbol=data.securityId,
@ -315,10 +278,10 @@ class OesTdMessageLoop:
exchange=EXCHANGE_OES2VT[data.mktId],
direction=Direction.NET,
volume=data.sumHld,
frozen=data.lockHld,
frozen=data.lockHld, # todo: to verify
price=data.costPrice / 10000,
# pnl=data.costPrice - data.originalCostAmt,
pnl=0, # todo: oes只提供日初持仓价格信息不提供最初持仓价格信息所以pnl只有当日的
pnl=0,
yd_volume=data.originalHld,
)
self.gateway.on_position(position)
@ -329,7 +292,7 @@ class OesTdMessageLoop:
balance = data.currentTotalBal
availiable = data.currentAvailableBal
# drawable = data.currentDrawableBal
account_id = data.custId
account_id = data.cashAcctId
account = AccountData(
gateway_name=self.gateway.gateway_name,
accountid=account_id,
@ -356,12 +319,19 @@ class OesTdApi:
self)
self.account_id = None
self.last_seq_index = 1 # 0 has special manning for oes
self.last_seq_index = 1000000 # 0 has special manning for oes
def get_new_seq_index(self):
"""note: not thread safe currently"""
# todo: add lock
index = self.last_seq_index
self.last_seq_index += 1
return index
def connect(self, config_path: str):
if not OesApi_InitAllByConvention(self.env, config_path, -1, self.last_seq_index):
pass
self.last_seq_index = self.env.ordChannel.lastOutMsgSeq + 1
self.last_seq_index = max(self.last_seq_index, self.env.ordChannel.lastOutMsgSeq + 1)
if not OesApi_IsValidOrdChannel(self.env.ordChannel):
pass
@ -383,15 +353,11 @@ class OesTdApi:
def join(self):
self.message_loop.join()
def query_account(self) -> bool:
return self.query_cash_asset()
def query_cash_asset(self) -> bool:
ret = OesApi_QueryCashAsset(self.env.qryChannel,
def query_account(self):
OesApi_QueryCashAsset(self.env.qryChannel,
OesQryCashAssetFilterT(),
self.on_query_asset
)
return ret >= 0
def on_query_asset(self,
session_info: SGeneralClientChannelT,
@ -403,7 +369,7 @@ class OesTdApi:
balance = data.currentTotalBal / 10000
availiable = data.currentAvailableBal / 10000
# drawable = data.currentDrawableBal
account_id = data.custId
account_id = data.cashAcctId
account = AccountData(
gateway_name=self.gateway.gateway_name,
accountid=account_id,
@ -469,60 +435,6 @@ class OesTdApi:
self.gateway.on_contract(contract)
return 1
def query_issue(self) -> bool:
f = OesQryIssueFilterT()
ret = OesApi_QueryIssue(self.env.qryChannel,
f,
self.on_query_issue
)
return ret >= 0
def on_query_issue(self,
session_info: SGeneralClientChannelT,
head: SMsgHeadT,
body: Any,
cursor: OesQryCursorT,
):
data = cast.toOesIssueItemT(body)
contract = ContractData(
gateway_name=self.gateway.gateway_name,
symbol=data.securityId,
exchange=EXCHANGE_OES2VT[data.mktId],
name=data.securityName,
product=PRODUCT_OES2VT[data.mktId],
size=data.qtyUnit,
pricetick=1,
)
self.gateway.on_contract(contract)
return 1
def query_etf(self) -> bool:
f = OesQryEtfFilterT()
ret = OesApi_QueryEtf(self.env.qryChannel,
f,
self.on_query_etf
)
return ret >= 0
def on_query_etf(self,
session_info: SGeneralClientChannelT,
head: SMsgHeadT,
body: Any,
cursor: OesQryCursorT,
):
data = cast.toOesEtfItemT(body)
contract = ContractData(
gateway_name=self.gateway.gateway_name,
symbol=data.securityId,
exchange=EXCHANGE_OES2VT[data.mktId],
name=data.securityId,
product=PRODUCT_OES2VT[data.mktId],
size=data.creRdmUnit, # todo: to verify! creRdmUnit : 每个篮子 (最小申购、赎回单位) 对应的ETF份数
pricetick=1,
)
self.gateway.on_contract(contract)
return 1
def query_stock_holding(self) -> bool:
f = OesQryStkHoldingFilterT()
ret = OesApi_QueryStkHolding(self.env.qryChannel,
@ -608,16 +520,14 @@ class OesTdApi:
self.query_stock()
# self.query_option()
# self.query_issue()
pass
def query_position(self):
self.query_stock_holding()
self.query_option_holding()
def send_order(self, vt_req: OrderRequest):
seq_id = self.last_seq_index
self.last_seq_index += 1 # note: thread un-safe here, conflict with on_query_order
order_id = self.order_manager.new_local_id()
seq_id = self.get_new_seq_index()
order_id = seq_id
oes_req = OesOrdReqT()
oes_req.clSeqNo = seq_id
@ -628,16 +538,16 @@ class OesTdApi:
oes_req.securityId = vt_req.symbol
oes_req.ordQty = int(vt_req.volume)
oes_req.ordPrice = int(vt_req.price * 10000)
oes_req.userInfo = order_id
oes_req.origClOrdId = order_id
order = vt_req.create_order_data(str(order_id), self.gateway.gateway_name)
order.direction = Direction.NET # fix direction into NET: stock only
self.order_manager.save_local_created(order_id, order)
ret = OesApi_SendOrderReq(self.env.ordChannel,
oes_req
)
order = vt_req.create_order_data(str(order_id), self.gateway.gateway_name)
order.direction = Direction.NET # fix direction into NET: stock only
if ret >= 0:
self.order_manager.save_local_created(order_id, order, oes_req)
self.gateway.on_order(order)
else:
self.gateway.write_log("Failed to send_order!")
@ -645,40 +555,21 @@ class OesTdApi:
return order.vt_orderid
def cancel_order(self, vt_req: CancelRequest):
seq_id = self.last_seq_index
self.last_seq_index += 1 # note: thread un-safe here
seq_id = self.get_new_seq_index()
oes_req = OesOrdCancelReqT()
order_id = int(vt_req.orderid)
internal_order = self.order_manager.get_from_order_id(order_id)
if internal_order.rpt_data:
data = internal_order.rpt_data
# oes_req.origClSeqNo = self.local_id_to_sys_id[int(order_id)]
oes_req.origClOrdId = data.clOrdId
oes_req.origClSeqNo = data.clSeqNo
oes_req.origClEnvId = data.origClEnvId
oes_req.mktId = data.mktId
# oes_req.invAcctId = data.invAcctId
# oes_req.mktId = data.mktId
# oes_req.securityId = data.securityId
else:
data = internal_order.req_data
oes_req.origClSeqNo = data.clSeqNo
oes_req.mktId = internal_order.req_data.mktId
oes_req.origClOrdId = internal_order.order_id
oes_req.mktId = EXCHANGE_VT2OES[vt_req.exchange]
oes_req.clSeqNo = seq_id
oes_req.origClSeqNo = order_id
oes_req.invAcctId = ""
oes_req.securityId = vt_req.symbol
oes_req.userInfo = order_id
ret = OesApi_SendOrderCancelReq(self.env.ordChannel,
OesApi_SendOrderCancelReq(self.env.ordChannel,
oes_req)
if ret >= 0:
pass
else:
pass
return
def schedule_query_order(self, internal_order: InternalOrder) -> Thread:
th = Thread(target=self.query_order, args=(internal_order,))
th.start()
@ -686,16 +577,8 @@ class OesTdApi:
def query_order(self, internal_order: InternalOrder) -> bool:
f = OesQryOrdFilterT()
if internal_order.req_data:
f.clSeqNo = internal_order.req_data.clSeqNo
f.mktId = internal_order.req_data.mktId
f.invAcctId = internal_order.req_data.invAcctId
else:
f.clSeqNo = internal_order.rpt_data.origClSeqNo
f.clOrdId = internal_order.rpt_data.origClOrdId
f.clEnvId = internal_order.rpt_data.origClEnvId
f.mktId = internal_order.rpt_data.mktId
f.invAcctId = internal_order.rpt_data.invAcctId
f.mktId = EXCHANGE_VT2OES[internal_order.vt_order.exchange]
f.clSeqNo = internal_order.order_id
ret = OesApi_QueryOrder(self.env.qryChannel,
f,
self.on_query_order
@ -735,9 +618,16 @@ class OesTdApi:
cursor: OesQryCursorT,
):
data: OesOrdCnfmT = cast.toOesOrdItemT(body)
i = self.order_manager.get_remote_created_order_from_oes_data(data)
if not i:
order_id = self.order_manager.new_remote_id()
try:
i = self.order_manager.get_from_oes_data(data)
vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.volume = data.ordQty - data.canceledQty
vt_order.traded = data.cumQty
self.gateway.on_order(vt_order)
except KeyError:
# order_id = self.order_manager.new_remote_id()
order_id = self.order_manager.get_order_id_from_data(data)
if data.bsType == eOesBuySellTypeT.OES_BS_TYPE_BUY:
offset = Offset.OPEN
@ -748,7 +638,7 @@ class OesTdApi:
gateway_name=self.gateway.gateway_name,
symbol=data.securityId,
exchange=EXCHANGE_OES2VT[data.mktId],
orderid=order_id if order_id else data.userInfo, # generated id
orderid=order_id if order_id else data.origClSeqNo, # generated id
direction=Direction.NET,
offset=offset,
price=data.ordPrice / 10000,
@ -759,13 +649,6 @@ class OesTdApi:
# this time should be generated automatically or by a static function
time=datetime.utcnow().isoformat(),
)
self.order_manager.save_remote_created(order_id, vt_order, data)
self.gateway.on_order(vt_order)
return 1
else:
vt_order = i.vt_order
vt_order.status = STATUS_OES2VT[data.ordStatus]
vt_order.volume = data.ordQty - data.canceledQty
vt_order.traded = data.cumQty
self.order_manager.save_remote_created(order_id, vt_order)
self.gateway.on_order(vt_order)
return 1