@startuml title Data Recording database mongoDB as db participant runDataRecord as rundr participant Process as run participant MainEngine as me participant LogEngine as runle participant DataEngine as dte participant LogEngine as le participant DrEngine as dr participant Queue as drq participant Thread as drt participant eventEngine as ee participant EventProcessThread as eet participant Queue as eeq participant TimerThread as eetm participant "ibGateway:vtGateway" as gw participant "IbWrapper:IbApi" as wrap participant "IB Gateway/IB TWS" as ib activate rundr loop rundr->rundr: 检查是否处于交易时间段\n非交易时间段不起动子进程 rundr->rundr: 若处于非交易时间段\n并且有子进程存在,\n就终止子进程运行 rundr->run ** : 若处于交易时间段\n并且不存在子进程,\ncreate process activate run rundr->rundr: sleep(5) end loop deactivate rundr run->runle ** : create runle ||| run->ee ** :create evnetEngine object activate ee ee->ee ++ : init() ee->eeq ** : Create Queue ee->eet ** : Create event process thread note right: 事件处理子线程 ee->eetm ** : Create timer thread note right: 定时器子线程 return return object ||| run->me ** : new mainEngine(ee) activate me me->me ++ : init() me->ee ++ : ee.start() note right: 启动ee的子线程 ee->eet ++ #red : thread.start() ee->eetm ++ #green : timer.start() ee-->me--: return me->dte ** :创建DataEngine() activate dte dte->dte ++ : init() dte->dte ++ : loadContract() return ||| dte->dte ++ : registerEvent() dte->ee ++ : ee.register(交易类事件, 对应的事件处理句柄) note right **以下事件处理句柄都在DataEngine中定义** (EVENT_CONTRACT, processContractEvent) 合约 (EVENT_ORDER, processOrderEvent) 订单 (EVENT_TRADE, processTradeEvent) 交易 (EVENT_POSITION, processPositionEvent) 头寸 (EVENT_ACCOUNT, processAccountEvent) 帐号 (EVENT_LOG, processLogEvent) LOG (EVENT_ERROR, processErrorEvent) ERROR end note return return return return object me->le ** :创建LogEngine() activate le le->le ++ : init() return return me-->me--:return me-->run--: object ||| run->me ++ : addGateway(ibGateway) me->gw ** : Create IbGateway(ee, "IB") activate gw gw->gw ++ : init() note right ** 将ee保存到私有变量 ** 内部变量初始化为空 ** 获取连接IB G/W的信息 end note gw->wrap ** : Create IbWrapper(self) note right IbApi是基于Boost.python的封装, 用C++实现,但是可以提供给Python程序使用。 IbApid的功能上面,主要提供了主动函数 和回调函数。 end note activate wrap wrap->wrap ++ : 构造函数() return object return object return return object return ||| run->me ++ : addApp() me->dr**: new DrEngine(me,ee) activate dr dr->dr++:init() dr->drq **:Create Queue dr->drt **:Create Thread activate drt #blue dr->dr ++ : loadSetting() note right 从配置文件中加载 订阅设置到字典对象 end note dr->me ++ : me.subscribe(req, gateway) note left 根据配置文件,订阅TICK数据和BAR数据。 end note me->gw ++ : 订阅市场行情\ngw.subscribe(req) gw->gw : 如果网关没有连接,就先缓存req; return return return dr->ee++:register(TICK, handler) note right: 登记EVENT_TICK事件 return dr-->dr -- : dr-->me--: object ||| me->me ++ : 初始化其他内部变量 return run<--me -- : return ||| run->ee: ee.regiser() 注册日志处理事件 activate ee note right 登记EVENT_LOG事件处理句柄 登记EVENT_ERROR事件处理句柄 end note return ||| run->me++: me.Connect(IB) note right: 连接IB网关 me->gw ++ : gw.connect() gw-> wrap ++ : ibwrapper.eConnect(host, port, \nclientId, false) wrap-\ib ++ : 调用IB的C++代码,\n发送连接请求数据包到IB网关 return return gw->wrap ++ : ibwrapper.reqCurrentTime() wrap->ib ++: 调用IB的C++代码,\n发送服务器时间查询数据包到IB网关 return return return ib-/ wrap ++ : connectAck()\nIB网关发回响应数据包到IB的C++代码,\n该代码回调函数 return ib->wrap ++ : currentTime()\nIB网关发回响应数据包到IB的C++代码,\n该代码回调函数 wrap->gw ++ : 将缓存的req取出,进行实际订阅\nsubscribe(subscribeReq) gw->wrap ++ : 请求合约详细\nreqContractDetails(tickerId, contract) return gw->wrap ++ : 请求市场数据\nreqMktData(tickerId, contract, '',\nFalse, TagValueList()) return ||| return ||| return ib->wrap ++ : contractDetails(reqId, contractDetails)\nIB网关响应的合约信息通过回调函数传入 wrap -> gw ++ : OnContract(contract)\n推送事件 gw->ee ++ : ee.put(event(EVENT_CONTRACT, contract)) return return return ib->wrap ++ : tickPrice(tickerId, field, price, canAutoExecute)\nIB网关响应的合约信息通过回调函数传入 wrap -> gw ++ : OnTick(tick)\n推送事件 gw->ee ++ : ee.put(event(EVENT_TICK, tick)) return return return me->me ++ : dbConnect() me->db ++ : MongoClient() return dbClinet me->ee ++ : ee.register(EVENT_LOG, self.dbLogging) return return run<--me--:return ||| run->ee ++ : ee.register(EVENT_LOG, le.processLogEvent) return ||| run->ee ++ : ee.register(EVENT_ERROR, le.processErrorEvent) return ||| loop run->run: sleep(1) end loop deactivate run ||| gw->eeq ++: queue.put(tick/bar) return ||| eetm->ee -- : ee.__runTimer() activate ee #green loop ee->eeq ++ : queue.put(定时器事件) return ee->ee : 休眠1秒 end loop deactivate ee ||| eet->ee--: ee.run() activate ee #red loop ee->eeq ++ :queue.get() return event ee->ee ++ #red : __process(event) ee->dr ++ #red :call procecssTickEvent() dr->dr ++ #red :OnTick() dr->drq ++: queue.put(tick) return return dr->dr ++ #red :OnBar() dr->drq ++: queue.put(bar) return return return return end loop deactivate ee drt->dr -- :run() activate dr #blue loop dr->drq ++ :queue.get() drq-->dr --:data dr->me:insertDB(data) end loop deactivate dr me->db: 写入mongoDB @enduml