news 2026/4/21 1:45:56

C++ 笔记 高级线程同步原语与线程池实现

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
C++ 笔记 高级线程同步原语与线程池实现

std::thread基础上,C++11 还提供了std::condition_variable(条件变量)std::atomic(原子变量)两大高级同步原语,分别解决 “线程间协作通知” 和 “无锁数据竞争” 问题;而线程池则是对std::thread的高层封装,通过预创建线程池避免频繁创建 / 销毁线程的开销,是多线程开发的核心工具。本文将从底层原理、代码示例到完整实现,系统拆解这三大知识点。


一、std::condition_variable:线程间的条件同步

1.1 核心作用与原理

std::condition_variable用于线程间的协作通知:一个线程等待某个条件成立而阻塞,另一个线程在条件成立时通知阻塞线程继续执行。它必须与std::unique_lock<std::mutex>配合使用 ——mutex 保护共享条件,条件变量负责线程的阻塞与唤醒,二者结合解决 “忙等待(Busy Wait)” 问题(避免线程空转浪费 CPU)。

1.2 关键函数

表格

函数作用
wait(lock)阻塞当前线程,直到被通知;自动释放锁,被唤醒后重新获取锁
wait(lock, pred)带谓词的wait:阻塞直到被通知且pred()true(自动处理虚假唤醒)
notify_one()唤醒一个等待的线程
notify_all()唤醒所有等待的线程

1.3 经典示例:生产者 - 消费者模型

这是条件变量最典型的应用场景:生产者线程生成数据放入队列,消费者线程从队列取数据处理,队列满时生产者阻塞,队列空时消费者阻塞。

#include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <chrono> std::queue<int> data_queue; // 共享数据队列 std::mutex mtx; // 保护队列的互斥锁 std::condition_variable cv; // 条件变量 const int MAX_QUEUE_SIZE = 5; // 队列最大容量 // 生产者线程:生成数据放入队列 void producer() { for (int i = 1; i <= 10; ++i) { { std::unique_lock<std::mutex> lock(mtx); // 队列满时阻塞,等待消费者取数据(谓词防止虚假唤醒) cv.wait(lock, []() { return data_queue.size() < MAX_QUEUE_SIZE; }); data_queue.push(i); std::cout << "[生产者] 放入数据: " << i << ",队列大小: " << data_queue.size() << "\n"; } cv.notify_one(); // 通知一个消费者线程 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产耗时 } } // 消费者线程:从队列取数据处理 void consumer(int id) { while (true) { int data; { std::unique_lock<std::mutex> lock(mtx); // 队列空时阻塞,等待生产者放数据;同时检查是否生产结束 cv.wait(lock, []() { return !data_queue.empty(); }); data = data_queue.front(); data_queue.pop(); std::cout << "[消费者 " << id << "] 取出数据: " << data << ",队列大小: " << data_queue.size() << "\n"; } cv.notify_one(); // 通知一个生产者线程 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟消费耗时 if (data == 10) break; // 生产结束,退出循环 } } int main() { std::thread prod(producer); std::thread cons1(consumer, 1); std::thread cons2(consumer, 2); prod.join(); cons1.join(); cons2.join(); return 0; }

1.4 关键细节:虚假唤醒

wait()可能在没有被notify_one()/notify_all()调用时被唤醒(称为 “虚假唤醒”),因此必须用带谓词的wait,谓词用于验证条件是否真正成立,确保线程安全。


二、std::atomic:无锁原子操作

2.1 核心作用与原理

std::atomic提供无锁的原子操作,底层依赖硬件的原子指令(如 x86 的LOCK前缀),比std::mutex更轻量,适合简单的共享数据(如计数器、标志位),避免了锁的开销和死锁风险。

2.2 基本用法

#include <iostream> #include <thread> #include <atomic> #include <vector> // 1. 原子计数器:无锁实现,多线程安全 std::atomic<int> atomic_count(0); // 对比:非原子计数器(会有数据竞争) int unsafe_count = 0; void atomic_increment() { for (int i = 0; i < 10000; ++i) { atomic_count++; // 原子自增操作,底层是硬件原子指令 } } void unsafe_increment() { for (int i = 0; i < 10000; ++i) { unsafe_count++; // 非原子操作,多线程下会出错 } } int main() { // 测试原子计数器 std::vector<std::thread> threads1; for (int i = 0; i < 10; ++i) { threads1.emplace_back(atomic_increment); } for (auto& t : threads1) t.join(); std::cout << "[原子计数器] 最终结果: " << atomic_count << "(期望 100000,实际一致)\n"; // 测试非原子计数器 std::vector<std::thread> threads2; for (int i = 0; i < 10; ++i) { threads2.emplace_back(unsafe_increment); } for (auto& t : threads2) t.join(); std::cout << "[非原子计数器] 最终结果: " << unsafe_count << "(期望 100000,实际可能更小)\n"; // 2. 原子标志位:用于线程间的停止信号 std::atomic<bool> stop_flag(false); std::thread worker([&stop_flag]() { while (!stop_flag) { std::cout << "[工作线程] 运行中...\n"; std::this_thread::sleep_for(std::chrono::milliseconds(500)); } std::cout << "[工作线程] 收到停止信号,退出\n"; }); std::this_thread::sleep_for(std::chrono::seconds(2)); stop_flag = true; // 原子设置标志位,通知工作线程停止 worker.join(); return 0; }

2.3 内存序简介(进阶)

std::atomic支持不同的内存序(Memory Order),用于控制多线程下的指令重排和可见性,默认是std::memory_order_seq_cst(顺序一致性,最安全但性能稍低),其他内存序(如memory_order_relaxedmemory_order_acquirememory_order_release)可用于优化性能,但需谨慎使用(容易出错)。


三、线程池:高效的线程管理

3.1 为什么需要线程池?

直接使用std::thread有两个核心问题:

  1. 频繁创建 / 销毁线程开销大:线程创建需要分配栈空间、内核态切换,销毁也需要回收资源,短任务场景下开销甚至超过任务本身。
  2. 线程数量不可控:无限制创建线程会导致系统资源耗尽、CPU 调度开销过大。

线程池的核心思想是:预创建一组线程(固定数量或动态调整),将任务提交到任务队列,线程池中的线程循环从队列取任务执行,避免了频繁创建 / 销毁线程的开销,同时控制了线程数量。

3.2 线程池的核心组成

一个完整的线程池包含以下部分:

  1. 线程数组:预创建的工作线程集合。
  2. 任务队列:存储待执行任务的队列(需用 mutex 和条件变量保护)。
  3. 同步机制std::mutex保护任务队列,std::condition_variable通知工作线程有新任务。
  4. 任务提交接口:支持提交任意可调用对象(函数、lambda、成员函数等),并返回std::future获取任务结果。

3.3 完整实现:一个可提交任务、获取结果的线程池

#include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <vector> #include <functional> #include <future> #include <memory> #include <stdexcept> class ThreadPool { public: // 构造函数:创建指定数量的工作线程 ThreadPool(size_t num_threads) : stop(false) { for (size_t i = 0; i < num_threads; ++i) { // 每个工作线程循环执行:从任务队列取任务并执行 workers.emplace_back([this]() { while (true) { std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); // 阻塞直到:线程池停止 或 有新任务 this->condition.wait(lock, [this]() { return this->stop || !this->tasks.empty(); }); // 线程池停止且任务队列为空,退出线程 if (this->stop && this->tasks.empty()) return; // 从队列取任务 task = std::move(this->tasks.front()); this->tasks.pop(); } // 执行任务 task(); } }); } } // 析构函数:停止所有线程 ~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; // 设置停止标志 } condition.notify_all(); // 唤醒所有工作线程 for (std::thread& worker : workers) { worker.join(); // 等待所有线程退出 } } // 禁止拷贝和移动 ThreadPool(const ThreadPool&) = delete; ThreadPool& operator=(const ThreadPool&) = delete; // 核心接口:提交任务,返回 std::future 获取结果 template<class F, class... Args> auto submit(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; // 将任务包装为 std::packaged_task,支持获取返回值 auto task = std::make_shared<std::packaged_task<return_type()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); // 线程池停止后禁止提交任务 if (stop) throw std::runtime_error("submit on stopped ThreadPool"); // 将任务放入队列(包装为 std::function<void()>) tasks.emplace([task]() { (*task)(); }); } condition.notify_one(); // 通知一个工作线程有新任务 return res; } private: std::vector<std::thread> workers; // 工作线程数组 std::queue<std::function<void()>> tasks; // 任务队列 std::mutex queue_mutex; // 保护任务队列的互斥锁 std::condition_variable condition; // 条件变量:通知工作线程 bool stop; // 停止标志 }; // ------------------------------ 线程池使用示例 ------------------------------ // 示例1:简单的无返回值任务 void print_task(int id) { std::cout << "[线程池] 执行任务 " << id << ",线程ID: " << std::this_thread::get_id() << "\n"; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } // 示例2:有返回值的任务 int add_task(int a, int b) { std::this_thread::sleep_for(std::chrono::milliseconds(200)); return a + b; } int main() { // 创建线程池:4个工作线程 ThreadPool pool(4); std::cout << "[主线程] 线程池已启动,工作线程数量: 4\n"; // 提交10个无返回值任务 std::cout << "\n[主线程] 提交10个无返回值任务...\n"; for (int i = 0; i < 10; ++i) { pool.submit(print_task, i + 1); } // 提交3个有返回值任务,用 std::future 获取结果 std::cout << "\n[主线程] 提交3个有返回值任务...\n"; std::vector<std::future<int>> futures; futures.push_back(pool.submit(add_task, 10, 20)); futures.push_back(pool.submit(add_task, 30, 40)); futures.push_back(pool.submit(add_task, 50, 60)); // 获取并打印有返回值任务的结果 std::cout << "\n[主线程] 等待有返回值任务完成...\n"; for (size_t i = 0; i < futures.size(); ++i) { std::cout << "[主线程] 任务 " << i + 1 << " 结果: " << futures[i].get() << "\n"; } // 主线程等待一段时间,让无返回值任务执行完毕 std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << "\n[主线程] 所有任务执行完毕,线程池将自动析构\n"; return 0; } // 线程池析构,自动停止所有工作线程

3.4 关键细节

任务包装:用std::packaged_taskstd::bind将任意可调用对象包装为无参数的std::function<void()>,同时支持通过std::future获取返回值。

线程安全:任务队列的访问必须用std::mutex保护,条件变量用于通知工作线程有新任务或线程池停止。

析构安全:析构函数中先设置stop标志,再唤醒所有线程,最后join()等待线程退出,确保线程池安全销毁。


四、总结与最佳实践

  1. std::condition_variable:用于线程间协作通知,必须与std::unique_lock配合,优先用带谓词的wait()处理虚假唤醒,典型场景是生产者 - 消费者模型。
  2. std::atomic:用于无锁原子操作,比mutex轻量,适合简单共享数据(计数器、标志位),默认内存序memory_order_seq_cst最安全,进阶可按需调整。
  3. 线程池:是多线程开发的核心工具,通过预创建线程避免频繁创建 / 销毁开销,核心是 “任务队列 + 工作线程”,支持提交任意任务并获取结果,适合大量短任务场景。

掌握这三大知识点,能让你高效编写高性能、线程安全的 C++ 多线程程序。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/21 1:42:15

无线安灯系统解决自行车质检滞后问题

在自行车制造车间&#xff0c;质检环节曾是效率提升的“隐形瓶颈”。传统质检依赖人工巡检和纸质记录&#xff0c;问题发现滞后、反馈链条冗长&#xff0c;导致不良品积压、返工成本高企。如今&#xff0c;无线安灯系统的应用&#xff0c;正以实时响应、精准追溯的特性&#xf…

作者头像 李华
网站建设 2026/4/21 1:39:22

基于深度学习的YOLO11的河道垃圾识别 海洋垃圾检测与垃圾分类项目介绍

文章目录基于YOLOv8的河道及海洋垃圾检测与垃圾分类项目介绍一、YOLOv8简介二、项目背景与意义三、基于YOLOv8的垃圾检测与分类系统![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/2434d65e833b497ab5f750156f67c69e.png)四、数据集构建五、具体训练代码教程六、结论…

作者头像 李华
网站建设 2026/4/21 1:38:16

BilibiliDown:三步掌握B站视频下载与批量处理技巧

BilibiliDown&#xff1a;三步掌握B站视频下载与批量处理技巧 【免费下载链接】BilibiliDown (GUI-多平台支持) B站 哔哩哔哩 视频下载器。支持稍后再看、收藏夹、UP主视频批量下载|Bilibili Video Downloader &#x1f633; 项目地址: https://gitcode.com/gh_mirrors/bi/Bi…

作者头像 李华
网站建设 2026/4/21 1:32:42

基于无线通信的室内环境控制系统(有完整资料)

资料查找方式&#xff1a;特纳斯电子&#xff08;电子校园网&#xff09;&#xff1a;搜索下面编号即可编号&#xff1a;T1482310M设计简介&#xff1a;本设计是基于无线通信的室内环境控制系统&#xff0c;主要实现以下功能&#xff1a;通过温湿度传感器检测温湿度&#xff0c…

作者头像 李华
网站建设 2026/4/21 1:32:08

Rust 异步任务执行栈分析

Rust 异步任务执行栈分析 Rust 的异步编程模型凭借其高性能和低开销&#xff0c;在现代高并发系统中占据重要地位。异步任务执行栈作为其核心机制之一&#xff0c;直接影响任务的调度与执行效率。本文将深入分析 Rust 异步任务执行栈的工作原理&#xff0c;帮助开发者更好地理…

作者头像 李华
网站建设 2026/4/21 1:30:14

JAVA后端开发——为什么 Maven 在 IDEA 能成功,终端却报错?

在 Java 开发中&#xff0c;我们经常遇到一个“玄学”问题&#xff1a;在 IDEA 中双击 Maven 的 clean 再 install 一切正常&#xff0c;但一切换到终端执行 mvn clean install -DskipTests&#xff0c;却瞬间“满屏飘红”。这类问题看似偶发&#xff0c;实则背后有一套清晰的机…

作者头像 李华