一、无锁队列
1.1、有锁队列和无锁队列
- 有锁队列:通过互斥锁或其他同步机制保证线程安全的队列,属于阻塞队列
- 无锁队列:通过原子操作实现线程安全的队列,属于非阻塞队列
1.2、锁的局限
- 线程阻塞带来的上下文切换开销
- 死锁风险
- 性能瓶颈,高并发下锁竞争激烈,吞吐量下降
1.3、非阻塞队列的层次
- 无阻塞(
Obstruction-free):- 系统整体不会因为任何线程挂起而阻塞
- 可能有个别线程需要重试
- 是最弱的非阻塞保证
- 无锁(
lock-free):- 至少一个线程成功,其他可能重试
- 依赖
cas等原子操作
- 无等待(
wait-free):- 所有线程都能成功,无重试
- 依赖
exchange等原子操作
1.4、设计队列
队列一般涉及到生产者和消费者,需要根据生产者和消费者的个数,来设计队列
- 单生产者单消费者:
SPSC- 最简单的无锁队列
- 一般使用
acquire,release内存序即可 - 性能最高
- 多生产者单消费者:
MPSC- 多个生产者竞争写入,一个消费者读取
- 单生产者多消费者:
SPMC- 一个生产者竞争写入,多个消费者读取
- 多生产者多消费者:
MPMC- 多个生产者竞争写入,多个消费者读取
1.5、理解volatile,内存屏障,原子操作之间的关系
volatile:- 防止编译器优化,确保每次访问都从内存读取/写入
- 不保证内存可见性,不提供多线程间的内存可见性保证
- 适用场景:
- 硬件寄存器访问(内存映射IO)
- 信号处理
- setjmp/longjmp场景
- 不适用场景:多线程同步(应该适用
std::atomic)
- 内存屏障(
std::atomic_thread_fence):解决可见性(同步性)与顺序性- 防止指令重排序
- 保证内存可见性
- 标准库:
std::atomic_thread_fenceLoad-Load屏障:确保屏障前的读操作先于屏障后的读操作Store-Store屏障:确保屏障前的写操作先于屏障后的写操作Load-Store屏障:确保屏障前的读操作先于屏障后的写操作Store-Load屏障:确保屏障前的写操作先于屏障后的读操作
- 原子操作:
- 提供不可分割的操作,确保多线程安全访问
- 标准库:
std::atomic - 内存序:
memory_order_relaxed:不保证内存可见性和顺序性,只保证原子性memory_order_acquire: 本线程后续的读操作不会被重排到此操作之前memory_order_release: 本线程之前的写操作不会被重排到此操作之后memory_order_acq_rel: 本线程之前的写操作不会被重排到此操作之后,本线程后续的读操作不会被重排到此操作之前memory_order_seq_cst: 全局顺序,保证内存可见性和顺序性,但性能较低
三者之间的关系
volatile≠ 原子操作 ≠ 内存屏障- 多线程同步的正确组合:
std::atomic+ 合适的内存序 volatile在多线程中几乎从不用于同步目的
1.6、无锁队列(lock-free)设计
ringbuffer:环形缓冲区数据结构:
- 可以使用数组:
- 基本实现:固定大小的数组 + 头尾索引(
head与tail) push操作,移动tail指针pop操作,移动head指针- 缺点:需要为最坏的情况预留空间,浪费内存
- 基本实现:固定大小的数组 + 头尾索引(
- 可以使用链表
- 可以进行动态伸缩
- 混合队列(
hybrid):- 即固定大小的小数组的链表,结合两者优点,负责度最高
- 可以使用数组:
操作:
push:进行写操作,需要进行判满pop:进行读操作,需要进行判空
队列需要存储的数据类型:
- 简单的数据类型(POD):兼容C的普通数据类型,如
int、char、float等 - 复杂的数据类型(非POD):如
struct、class(可能存在虚函数)等
注意:
为了兼顾,所以使用模板类
- 简单的数据类型(POD):兼容C的普通数据类型,如
要避免伪共享
- 什么是伪共享:
struct RingBuffer { std::atomic<int> producer_counter; // 生产者 std::atomic<int> consumer_counter; // 消费者 // 这两个变量可能在同一个64字节缓存行中 };一个生产者操作A,一个消费者操作B;二者没有数据竞争,但是:
- CPU的缓存是以缓存行为单位来加载数据的,缓存行的大小通常是64字节
- 线程1修改A或者线程2修改B,都会造成缓存行标记为脏,然后CPU会自动通过缓存一致性协议(MESI),将脏缓存行同步到其他核心
MESI频繁同步,会导致性能急剧下降
解决方案:
通过内存对齐,将A和B放在不同的缓存行当中
// 在C++当中,可以通过`alignas`来指定对齐方式 struct RingBuffer { alignas(64) std::atomic<int> producer_counter; // 生产者 alignas(64) std::atomic<int> consumer_counter; // 消费者 };优化:
- CPU对于位运算的优化比加减乘除要快,所以可以使用位运算来代替加减乘除
- 对
Capacity的大小,优化成2的幂次方 - 取模运算:
next = (curr + 1) % Capacity-------------->next = (curr + 1) & (Capacity - 1)
接口设计:
push:入队- 先进行判满,再进行入队操作
- 对于简单数据类型(
POD),直接使用std::atomic即可 - 对于复杂的数据类型(
非POD),可以考虑用到完美转发
pop:出队- 先进行判空,再进行出队操作
size:获取队列大小
// 入队 template<typename T> bool push(U&& value) { // 1.先判满 const std::size_t w = m_write.load(std::memory_order_relaxed); // SPSC, 考虑到性能最高,使用relaxed const std::size_t next_w = (w + 1) & (Capacity - 1); // 哨兵 if (next_w == m_read.load(std::memory_order_acquire)) return false; // acquire 保证读到最新的值 // 2.写入数据 new (&buffers[w]) T(std::forward<U>(value)); m_write.store(next_w, std::memory_order_release); // release 保证写入的值是最新的 return true; } bool pop(T& value) { // 1.先判空 const std::size_t r = m_read.load(std::memory_order_relaxed); if (r == m_write.load(std::memory_order_acquire)) return false; // 2.读取数据 value = std::move(*reinterpret_cast<T*>(&buffers[r])); // reinterpret_cast 对内存进行重新解释 reinterpret_cast<T *>(&buffers[r])->~T(); m_read.store((r + 1) & (Capacity - 1), std::memory_order_release); return true; } std::size_t size()const { const std::size_t r = m_read.load(std::memory_order_acquire); const std::size_t w = m_write.load(std::memory_order_acquire); return w >= r ? (w - r) : (Capacity - r + w); }二、无锁编程中的问题
2.1、ABA问题
- 什么是ABA问题:
- ABA问题是指,线程1读取到变量A,线程2将变量A修改为B,再修改回A,线程1再次读取到变量A,但线程1无法判断变量A是否被修改过。
- ABA问题的危害:
- 可能导致数据不一致
- 可能导致逻辑错误
- 在内存回收中尤其危险
- 解决方案:
- 使用版本号,每次修改变量时,增加版本号
struct VersionedValue { T value; std::atomic<uint64_t> version; };- 使用指针,每次修改变量时,修改指针指向的地址
struct TaggedPointer { void* ptr; uint64_t tag; // 每次修改递增 };- 使用原子引用,每次修改变量时,修改引用的值
三、有锁队列 VS 无锁队列
相对于无锁队列,设计就比较简单,基本上对于任何操作进行加锁,释放锁即可;但头疼点也随之而来,容易造成死锁。
3.1、选择容器
有锁队列的选择多种多样,可以直接选择STL容器,如std::queue、std::deque、std::list等,进行动态扩容。
3.2、实现
// 还是采用模板类方式构造 template<typename T, typename StorageType = std::deque<T>> class LockedQueue { private: std::mutex m_mutex; StorageType m_storage; public: LockedQueue() = default; ~LockedQueue() = default; void add(const T& item) { lock(); _queue.push_back(item); unlock(); } void lock() { this->_lock.lock(); } void unlock() { this->_lock.unlock(); } void pop_front() { std::lock_guard<std::mutex> lock(_lock); _queue.pop_front(); } };自旋锁和互斥锁的区别
- 自旋锁:
- 自旋锁是一种用户态忙等待锁,当线程A持有自旋锁,线程B尝试获取自旋锁时,线程B会一直循环等待,直到线程A释放自旋锁。
- 无上下文切换
- 自旋锁适用于锁持有时间短的场景,如果锁持有时间过长,会导致CPU资源浪费。
- 互斥锁:
- 互斥锁是一种内核态锁,当线程A持有互斥锁,线程B尝试获取互斥锁时,线程B会被挂起,直到线程A释放互斥锁。
- 有上下文切换
- 互斥锁适用于锁持有时间长的场景,如果锁持有时间短,会导致上下文切换的开销。
如何判断锁的持有时间?
- 通过锁的持有时间 > 用户态切内核态,内核态切用户态的时间和,那么使用互斥锁;反之,使用自旋锁。
四、 总结:
- 环形队列是一种逻辑数据结构,并非物理数据结构,实际上没有环形的内存空间。
- 无锁队列为什么要使用环形队列?
- 环形队列可以充分利用内存空间,避免内存碎片(
malloc/free的开销) - 对于原子操作友好,如果使用
STL队列,比如queue<int> q,入队可能会涉及到内存分配、节点构造、指针修改等操作,这些操作很难进行原子化,一般都是加锁。 - 对于常用的
STL-vector,在push_back时,如果空间不足,会进行扩容,扩容会触发重新分配内存;无法保证环形访问;而且删除效率低下。
- 环形队列可以充分利用内存空间,避免内存碎片(
- 无锁队列性能对比
| 类型 | 时间复杂度 | 适用场景 | 性能 |
|---|---|---|---|
| SPSC | O(1) | 流水线处理 | 最高 |
| MPSC | O(n) | 日志收集 | 中等 |
| SPMC | O(n) | 事件广播 | 中等 |
| MPMC | O(n²) | 通用队列 | 最低 |
- 无锁队列 VS 有锁队列
| 特性 | 无锁队列 | 有锁队列 |
|---|---|---|
| 开销 | 低(无线程上下文切换) | 高(有线程上下文切换) |
| 实现复杂度 | 高 | 低 |
| 内存开销 | 高 | 低 |
| 适用场景 | 高并发、低延迟 | 通用 |