本文共 3004 字,大约阅读时间需要 10 分钟。
ThreadPool::ThreadPool(const string& nameArg) : mutex_(), //线程锁 notEmpty_(mutex_), notFull_(mutex_), name_(nameArg), //线程池处理的最大的任务数 maxQueueSize_(0), //标识线程池是否正在运行 running_(false){ }
ThreadPool::~ThreadPool(){ //如果线程池正在运行,停止线程池 if (running_) { stop(); }}
void ThreadPool::start(int numThreads){ //判断线程池是否为空 assert(threads_.empty()); //线程池的运行状态设置为true running_ = true; //设置线程池的线程的个数 threads_.reserve(numThreads); //创建numsThreads 个线程,并启动这些线程 //这些线程在没有任务的时候进入睡眠状态,在Thread类中可以看出 for (int i = 0; i < numThreads; ++i) { char id[32]; snprintf(id, sizeof id, "%d", i+1); threads_.emplace_back(new muduo::Thread( std::bind(&ThreadPool::runInThread, this), name_+id)); //启动线程,在Thread类中,启动一个线程 threads_[i]->start(); } if (numThreads == 0 && threadInitCallback_) { threadInitCallback_(); }}
//用于回收所有的线程void ThreadPool::stop(){ { MutexLockGuard lock(mutex_); running_ = false; //唤醒所有的线程 notEmpty_.notifyAll(); } for (auto& thr : threads_) { thr->join(); }}
void ThreadPool::run(Task task){ if (threads_.empty()) { //如果没有子线程,就在主线程中执行该任务 task(); } else { //对线程池队列加锁 MutexLockGuard lock(mutex_); //如果任务队列满了,即线程池中的任务数达到了上限,就调用wait阻塞,直到可以添加任务位置 while (isFull()) { notFull_.wait(); } assert(!isFull()); //将任务放入队列 queue_.push_back(std::move(task)); //唤醒一个线程取执行任务 notEmpty_.notify(); }}
ThreadPool::Task ThreadPool::take(){ MutexLockGuard lock(mutex_); // always use a while-loop, due to spurious wakeup //如果任务队列为空,并且线程池处于运行状态 //则线程池中的线程wait 进入睡眠状态 while (queue_.empty() && running_) { //当前的子线程处于睡眠的状态 notEmpty_.wait(); } Task task; //任务队列不为空的时候 if (!queue_.empty()) { //获取队列首部的任务 task = queue_.front(); //出队列一个任务 queue_.pop_front(); if (maxQueueSize_ > 0) { //此时任务队列肯定不满,可以添加新的任务 notFull_.notify(); } } //将获得的任务返回执行 return task;}
void ThreadPool::runInThread(){ try { if (threadInitCallback_) { threadInitCallback_(); } while (running_) { //Task task(take()); 返回一个任务 Task task(take()); if (task) { //执行任务队列获得的任务 task(); } } } catch (const Exception& ex) { fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str()); fprintf(stderr, "reason: %s\n", ex.what()); fprintf(stderr, "stack trace: %s\n", ex.stackTrace()); abort(); } catch (const std::exception& ex) { fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str()); fprintf(stderr, "reason: %s\n", ex.what()); abort(); } catch (...) { fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str()); throw; // rethrow }}
转载地址:http://banwi.baihongyu.com/