一、固定式线程池的概念
固定式线程池是指在创建时就确定好线程数量的线程池实现。池内维护一组预先创建好的工作线程,所有提交的任务不会立刻执行,而是放入一个任务队列中,由这些固定数量的线程依次取出并执行。
特点:
- 线程数量固定
- 任务队列通常是有界的,防止内存无线膨胀
- 线程复用:任务执行完后,线程不会销毁,而是继续等待下一个任务
- 生产者-消费者模式:调用方(生产者)提交任务,工作线程(消费者)执行任务
与“动态线程池”(CachedThreadPool)不同,固定式线程池不会根据任务量动态增减线程,更适合 CPU 密集型场景或需要严格控制并发度的系统。
二、为什么会设计出固定式线程池
直接在每次任务到来时 new std::thread 的做法有严重缺陷:
- 创建/销毁线程开销大:每次创建线程都需要内核分配栈空间、上下文切换,销毁同样耗时。在高并发场景下,频繁创建线程会导致“线程爆炸”,系统瞬间生成成百上千个线程池,CPU、内存被耗尽。
- 资源无法控制:操作系统对线程总数有限制(Linux默认几千个),超出后程序直接崩溃。
- 性能不可预测:线程越多,上下文切换越频繁,整体吞吐量反而下降。
固定式线程池的解决方案是:
- 提前创建固定数量的线程(通常设为CPU核心数),彻底消除创建/销毁开销。
- 通过有界任务队列缓冲多余的任务。实现“生产者阻塞”或“队列满等待”,从而天然限流。
- 资源可预测、可控:程序启动时就能知道最多占用多少线程和内存,适合服务器、游戏服务器、后台任务处理等生产环境。
三、代码设计
同步队列(SyncQueue):
#include<list>#include<mutex>#include<condition_variable>#include<iostream>usingnamespacestd;constintMaxTaskCount=200;template<classT>classSyncQueue{private:std::list<T>m_queue;mutablestd::mutex m_mutex;std::condition_variable m_notEmpty;std::condition_variable m_notFull;intm_maxSize;boolm_needStop;boolIsFull()const{boolfull=m_queue.size()>=m_maxSize;if(full){cout<<"m_queue 已经满了,需要等待..."<<endl;}returnfull;}boolIsEmpty()const{boolempty=m_queue.empty();if(empty){cout<<"m_queue 已经空了,需要等待..."<<endl;}returnempty;}template<classF>voidAdd(F&&task){std::unique_lock<std::mutex>locker(m_mutex);m_notFull.wait(locker,[this]{returnm_needStop||!IsFull();});if(m_needStop){return;}m_queue.push_back(std::forward<F>(task));m_notEmpty.notify_one();}public:SyncQueue(intmaxsize):m_maxSize(maxsize),m_needStop(false){}~SyncQueue(){}voidPut(constT&task){Add(task);}voidPut(T&&task){Add(std::forward<T>(task));}voidTake(std::list<T>&list){std::unique_lock<std::mutex>locker(m_mutex);m_notEmpty.wait(locker,[this]{returnm_needStop||!IsEmpty();});if(m_needStop){return;}list=std::move(m_queue);m_notFull.notify_one();}voidTake(T&task){std::unique_lock<std::mutex>locker(m_mutex);m_notEmpty.wait(locker,[this]{returnm_needStop||!IsEmpty();});if(m_needStop){return;}task=m_queue.front();m_queue.pop_front();m_notFull.notify_one();}voidStop(){{std::lock_guard<std::mutex>locker(m_mutex);m_needStop=true;}m_notFull.notify_all();m_notEmpty.notify_all();}boolEmpty()const{std::lock_guard<std::mutex>locker(m_mutex);returnm_queue.empty();}boolFull()const{std::lock_guard<std::mutex>locker(m_mutex);returnm_queue.size()>=m_maxSize;}size_tSize()const{std::lock_guard<std::mutex>locker(m_mutex);returnm_queue.size();}size_tCount()const{returnm_queue.size();}};同步队列框架讲解:
SyncQueue 是整个线程池的“心脏”,它实现了线程安全的生产者-消费者有界队列。
关键成员:
std::list<T>m_queue;// 实际存储任务(list 便于整体移动)mutablestd::mutex m_mutex;// 保护队列和所有状态std::condition_variable m_notEmpty;// 消费者等待“非空”std::condition_variable m_notFull;// 生产者等待“非满”intm_maxSize;// 队列上限boolm_needStop;// 优雅停止标志核心方法:
1.IsFull() / IsEmpty()
在 condition_variable::wait 的 lambda 中被调用。
这些函数在持有 mutex 的情况下被调用(wait 的 predicate 语义保证),所以里面直接访问 m_queue.size() 是安全的。
打印调试信息方便读者观察“队列满/空时阻塞”的行为。
2,Add模板+Put (生产者接口)
template<classF>voidAdd(F&&task){std::unique_lock<std::mutex>locker(m_mutex);m_notFull.wait(locker,[this]{returnm_needStop||!IsFull();});if(m_needStop)return;m_queue.push_back(std::forward<F>(task));m_notEmpty.notify_one();// 只唤醒一个消费者(效率最高)}完美转发支持左值/右值,零拷贝。
wait + predicate 防止虚假唤醒和队列已满。
m_needStop 判断让 Stop 后立即返回,避免析构时死锁。
3.Take(消费者接口)
Take(T& task):单任务取出(最常用)。
Take(std::list& list):一次性取出所有任务(批量消费,可扩展)。
同样使用 m_notEmpty.wait + predicate + if(m_needStop) return;。
取出后 m_notFull.notify_one() 唤醒生产者。
4.Stop() —— 优雅停止
voidStop(){{std::lock_guard<std::mutex>locker(m_mutex);m_needStop=true;}m_notFull.notify_all();m_notEmpty.notify_all();}先在锁内设置标志,再在锁外 notify_all,避免通知丢失。
让正在 wait 的生产者和消费者立刻醒来并退出。
为什么这样设计?
双条件变量(notEmpty + notFull)是经典的有界缓冲区实现,能同时支持生产者阻塞和消费者阻塞。
m_needStop + predicate 是线程池优雅关闭的标准技巧,避免析构时线程还在 wait 导致程序 hang。
使用 std::list 而非 std::queue 是因为 Take(list) 可以 std::move 整个队列,性能极高(批量消费时优势明显)。
所有查询方法(Empty/Full/Size)都加锁。
固定式线程池(FixedThreadPool):
#include"SyncQueue.hpp"#include<functional>classFixedThreadPool{public:usingTask=std::function<void(void)>;private:std::list<std::shared_ptr<std::thread>>m_threadgroup;SyncQueue<Task>m_queue;std::atomic_bool m_running;std::once_flag m_flag;voidStart(intnumthreads){m_running=true;for(inti=0;i<numthreads;++i){m_threadgroup.push_back(std::make_shared<std::thread>(&FixedThreadPool::RunInThread,this));//m_threadgroup.push_back(std::shared_ptr<std::thread>(new thread(&FixedThreadPool::RunInThread,this)))}}voidRunInThread(){while(m_running){Task task;m_queue.Take(task);if(m_running&&task){task();}}}voidStopThreadGroup(){m_queue.Stop();m_running=false;for(auto&thread:m_threadgroup){if(thread){thread->join();}}m_threadgroup.clear();}public:FixedThreadPool(intnumThreads=std::thread::hardware_concurrency()):m_queue(MaxTaskCount),m_running(false){Start(numThreads);}~FixedThreadPool(){Stop();}voidStop(){std::call_once(m_flag,[this]{StopThreadGroup();});}voidAddTask(Task&&task){m_queue.Put(std::forward<Task>(task));}voidAddTask(constTask&task){m_queue.Put(task);}};固定式线程池框架讲解:
FixedThreadPool 完全建立在 SyncQueue 之上,职责清晰分离:
std::list<std::shared_ptr<std::thread>>m_threadgroup;// 线程组(必须用 shared_ptr,因为 thread 不可复制)SyncQueue<Task>m_queue;// 任务队列(MaxTaskCount=200)std::atomic_bool m_running;// 原子运行标志std::once_flag m_flag;// 防止重复 Stop关键方法:
Start(int numThreads):创建固定数量线程,每个线程跑 RunInThread。
RunInThread()(工作线程主循环):
while(m_running){Task task;m_queue.Take(task);// 阻塞等待任务if(m_running&&task)// 双重检查!task();}if (m_running && task) 是防止 Stop 后还执行任务的防护。
StopThreadGroup():
m_queue.Stop() → 唤醒所有 Take/Put。
m_running = false。
join() 所有线程。
清空线程组。
Stop() 使用 std::call_once 保证只停止一次(析构和手动 Stop 都安全)。
AddTask 直接调用 m_queue.Put,支持完美转发。
为什么要这样设计:
职责分离:SyncQueue 只管“任务存储+同步”,FixedThreadPool 只管“线程生命周期”,代码清晰、可维护。
默认线程数 = hardware_concurrency():最合理的 CPU 密集型默认值。
shared_ptr + list 是 C++ 中存储动态线程组的标准做法(thread 不可复制、不可移动到 vector 后又 join)。
std::once_flag 解决“析构时重复 Stop”导致的二次 join 崩溃问题。
生产者(AddTask)可能阻塞(队列满),这正是固定线程池的“背压”机制,防止任务无限堆积。
测试代码
#include"FixedThreadPool.hpp"#include<future>FixedThreadPoolpool(4);std::mutex g_cout_mutex;voidAdd(inta,intb,std::promise<int>&c_promise){cout<<"add begin ..."<<endl;std::this_thread::sleep_for(std::chrono::milliseconds(2000));intc=a+b;c_promise.set_value(c);std::this_thread::sleep_for(std::chrono::milliseconds(1000));cout<<"add end ..."<<endl;}voidadd_a(){std::promise<int>c_promise;std::future<int>a_future=c_promise.get_future();std::function<void(void)>f=std::bind(Add,10,20,std::ref(c_promise));pool.AddTask(f);{std::unique_lock<std::mutex>locker(g_cout_mutex);cout<<"add_a:"<<a_future.get()<<endl;}}voidadd_b(){std::promise<int>c_promise;std::future<int>a_future=c_promise.get_future();std::function<void(void)>f=std::bind(Add,20,30,std::ref(c_promise));pool.AddTask(f);{std::unique_lock<std::mutex>locker(g_cout_mutex);cout<<"add_b:"<<a_future.get()<<endl;}}voidadd_c(){std::promise<int>c_promise;std::future<int>a_future=c_promise.get_future();std::function<void(void)>f=std::bind(Add,30,40,std::ref(c_promise));pool.AddTask(f);{std::unique_lock<std::mutex>locker(g_cout_mutex);cout<<"add_c:"<<a_future.get()<<endl;}}intmain(){std::threadtha(add_a);std::threadthb(add_b);std::threadthc(add_c);tha.join();thb.join();thc.join();return0;}测试代码的目的:
- 验证多生产者 + 多消费者并发:3 个线程(tha/thb/thc)同时调用 AddTask,池内 4 个工作线程同时执行。
- 验证异步执行 + 结果返回:每个任务用 std::promise + std::future 把计算结果(a+b)传回主线程。future.get() 会阻塞调用方,直到工作线程 set_value。
- 验证任务执行的耗时与交错:Add 函数里 sleep(2000ms) + sleep(1000ms),让 cout 输出 begin/end 明显交错,肉眼可见“任务在后台并行执行”。
- 验证线程池在析构时的正确停止:main 结束时 ~FixedThreadPool 调用 Stop(),此时所有任务已完成(因为 get() 已返回),不会丢失任务。
- 验证互斥输出:g_cout_mutex 保护打印 add_a:30 等结果,避免多线程 cout 乱序。