[Add] TaskQueue增加terminate(),可以终止pop的等待

This commit is contained in:
nanoric 2019-02-18 03:12:20 -04:00
parent 3618044b36
commit 4dc63db71c
3 changed files with 815 additions and 793 deletions

View File

@ -25,6 +25,8 @@ struct Task
bool task_last; //是否为最后返回
};
class TerminatedError : std::exception
{};
class TaskQueue
{
@ -33,6 +35,8 @@ private:
mutex mutex_; //互斥锁
condition_variable cond_; //条件变量
bool _terminate = false;
public:
//存入新的任务
@ -48,15 +52,21 @@ public:
Task pop()
{
unique_lock<mutex> mlock(mutex_);
while (queue_.empty()) //当队列为空时
{
cond_.wait(mlock); //等待条件变量通知
}
cond_.wait(mlock, [&]() {
return !queue_.empty() || _terminate;
}); //等待条件变量通知
if (_terminate)
throw TerminatedError();
Task task = queue_.front(); //获取队列中的最后一个任务
queue_.pop(); //删除该任务
return task; //返回该任务
}
void terminate()
{
_terminate = true;
cond_.notify_all(); //通知正在阻塞等待的线程
}
};

View File

@ -206,85 +206,91 @@ void MdApi::OnRtnForQuoteRsp(CThostFtdcForQuoteRspField *pForQuoteRsp)
void MdApi::processTask()
{
while (this->active)
{
Task task = this->task_queue.pop();
switch (task.task_name)
{
case ONFRONTCONNECTED:
{
this->processFrontConnected(&task);
break;
}
try
{
while (this->active)
{
Task task = this->task_queue.pop();
switch (task.task_name)
{
case ONFRONTCONNECTED:
{
this->processFrontConnected(&task);
break;
}
case ONFRONTDISCONNECTED:
{
this->processFrontDisconnected(&task);
break;
}
case ONFRONTDISCONNECTED:
{
this->processFrontDisconnected(&task);
break;
}
case ONHEARTBEATWARNING:
{
this->processHeartBeatWarning(&task);
break;
}
case ONHEARTBEATWARNING:
{
this->processHeartBeatWarning(&task);
break;
}
case ONRSPUSERLOGIN:
{
this->processRspUserLogin(&task);
break;
}
case ONRSPUSERLOGIN:
{
this->processRspUserLogin(&task);
break;
}
case ONRSPUSERLOGOUT:
{
this->processRspUserLogout(&task);
break;
}
case ONRSPUSERLOGOUT:
{
this->processRspUserLogout(&task);
break;
}
case ONRSPERROR:
{
this->processRspError(&task);
break;
}
case ONRSPERROR:
{
this->processRspError(&task);
break;
}
case ONRSPSUBMARKETDATA:
{
this->processRspSubMarketData(&task);
break;
}
case ONRSPSUBMARKETDATA:
{
this->processRspSubMarketData(&task);
break;
}
case ONRSPUNSUBMARKETDATA:
{
this->processRspUnSubMarketData(&task);
break;
}
case ONRSPUNSUBMARKETDATA:
{
this->processRspUnSubMarketData(&task);
break;
}
case ONRSPSUBFORQUOTERSP:
{
this->processRspSubForQuoteRsp(&task);
break;
}
case ONRSPSUBFORQUOTERSP:
{
this->processRspSubForQuoteRsp(&task);
break;
}
case ONRSPUNSUBFORQUOTERSP:
{
this->processRspUnSubForQuoteRsp(&task);
break;
}
case ONRSPUNSUBFORQUOTERSP:
{
this->processRspUnSubForQuoteRsp(&task);
break;
}
case ONRTNDEPTHMARKETDATA:
{
this->processRtnDepthMarketData(&task);
break;
}
case ONRTNDEPTHMARKETDATA:
{
this->processRtnDepthMarketData(&task);
break;
}
case ONRTNFORQUOTERSP:
{
this->processRtnForQuoteRsp(&task);
break;
}
};
}
case ONRTNFORQUOTERSP:
{
this->processRtnForQuoteRsp(&task);
break;
}
};
}
}
catch (const TerminatedError&)
{
}
};
void MdApi::processFrontConnected(Task *task)

File diff suppressed because it is too large Load Diff