如何安装 libunifex。
libunifex 是一个 C++ 库,实现了统一的异步执行模型。以下是几种常见的安装方法:
从源码编译安装
# 克隆仓库gitclone https://github.com/facebookexperimental/libunifex.gitcdlibunifex# 创建构建目录mkdirbuildcdbuild# 配置和编译STRING=20很重要cmake..-DCMAKE_CXX_STANDARD:STRING=20cmake --build.-j$(nproc)# 安装(可能需要 sudo)sudocmake --install.使用包管理器
使用 vcpkg:
vcpkginstalllibunifex使用 Conan:
conaninstalllibunifex/0.4.0@依赖要求
- C++17 或更高版本的编译器
- CMake 3.12 或更高版本
在项目中使用
安装后,在你的 CMakeLists.txt 中:
find_package(unifex REQUIRED) target_link_libraries(your_target PRIVATE unifex::unifex)我选择的是源码编译安装
编译libunifex 时会自动下载googletest 源码
如果下载googletest可以吧url 改成gitee 的地址
- ExternalProject_Add(googletest
- GIT_REPOSITORY https://gitee.com/zhuzi677/edk2-googletest.git
一、SVG 图对应的传统异步栈
SVG 图展示了三个线程的栈信息:
- IO Thread(粉色块):
- 栈顶
#0 async_read_some_at(...) - 中间
#3 ctx.run() - 栈底
#5 __clone - 图中箭头指向 Threadpool,表示逻辑调用流转。
- 栈顶
- Threadpool threads(绿色块):
- 栈顶
#0 process_file(...) - 中间
#5 pool.run() - 栈底
#10 __clone - 箭头指向 Main thread。
- 栈顶
- Main thread(橙色块):
- 栈顶
#0 unifex::sync_wait(...) - 中间
#4 main() - 栈底
#7 __libc_start_main()
- 栈顶
特点
- 每个线程栈只显示自己线程的调用片段。
- 如果想理解整体调用链,需要跨线程分析箭头。
- 调试时不直观,异常无法直接定位到逻辑调用源。
二、改进后的异步栈(Better Async Stacks)
#0 async_read_some_at(...) ... #5 process_file(...) ... #12 unifex::sync_wait(...) ... #16 main() ... #19 __libc_start_main()特点
- 连续逻辑调用链:
- 异步操作的整个流程被“展开”成一条连续栈,从主线程到 IO 操作。
- 不再受线程边界限制。
- 跨线程可见:
- 虽然
async_read_some_at在 IO 线程,process_file在 Threadpool,sync_wait在 Main thread,但栈信息把它们按逻辑顺序拼接。
- 虽然
- 易于调试:
- 异常可以直接追溯到触发点。
- 调试人员无需手动合并线程栈。
三、数学类比
可以把异步调用链抽象为函数映射:
- 传统异步栈是分段函数:
- 每个线程为一段:
fIO(t),fPool(t),fMain(t) f_\text{IO}(t), \quad f_\text{Pool}(t), \quad f_\text{Main}(t)fIO(t),fPool(t),fMain(t) - 各段独立,无法直接得到整体逻辑。
- 每个线程为一段:
- 改进异步栈是拼接的连续函数:
F(t)=fMain(t)→fPool(t)→fIO(t) F(t) = f_\text{Main}(t) \to f_\text{Pool}(t) \to f_\text{IO}(t)F(t)=fMain(t)→fPool(t)→fIO(t)- 用箭头→\to→表示逻辑调用流。
- 可以直接看到完整的调用路径。
四、总结
| 特性 | 传统异步栈 | 改进异步栈 |
|---|---|---|
| 线程边界 | 有,显示线程局部栈 | 跨线程,按逻辑顺序显示 |
| 可读性 | 低,需要手动合并 | 高,一条连续栈 |
| 调试便利性 | 异常难追踪 | 异常可直接定位 |
| 数学类比 | 分段函数fi(t)f_i(t)fi(t) | 拼接函数F(t)F(t)F(t) |
核心理解:
传统栈是“线程切片”,改进栈是“逻辑展开”,它把异步操作的调用链抽象成一条连续的路径,使调试和理解流程直观化。
一、结构化并发(Structured Concurrency)
1. 概念
- 传统异步:任务创建和销毁的生命周期可能散落在不同线程和函数作用域中,导致异步栈难以完整追踪。
- 结构化并发:任务的生命周期被限制在一个明确的作用域内,从而可以形成完整的逻辑调用链。
- Lewis Baker 在 2020 年将这个概念引入 Folly。
- 你们团队将其引入 Unifex,使得异步栈可以连续可读。
数学类比:
- 每个任务可以看作一个函数fif_ifi,其生命周期由作用域SSS限定:
fi:S→R f_i : S \to Rfi:S→R - 当任务在作用域结束时自动完成或取消,多个任务形成嵌套关系:
Fstructured(S)=f1→f2→⋯→fn F_\text{structured}(S) = f_1 \to f_2 \to \dots \to f_nFstructured(S)=f1→f2→⋯→fn
这样就能形成完整的逻辑栈。
二、代码解析
intmain(intargc,char**argv){unifex::static_thread_pool pool;io_uring_context ctx;unifex::task<void>task=async_main({argv+1,argc-1},pool,ctx);unifex::sync_wait(std::move(task));return0;}1. 栈和线程关系
- 线程池创建:
unifex::static_thread_pool pool;- 创建一个静态线程池,供异步任务执行。
- 可以看作逻辑上任务执行的“容器”。
- IO 上下文创建:
io_uring_context ctx;- 用于处理高性能异步 IO。
- 逻辑上属于异步任务的一部分。
- 创建异步任务:
unifex::task<void>task=async_main({argv+1,argc-1},pool,ctx);async_main返回一个unifex::task<void>对象。- 任务会在
pool和ctx上下文中执行。 - 这里任务的生命周期受限于
task对象所在作用域(结构化并发)。
- 同步等待任务完成:
unifex::sync_wait(std::move(task));- 阻塞主线程直到
task完成。 - 由于结构化并发,整个异步调用链可以被追踪并展开成完整异步栈:
main()→sync_wait(task)→async_main(...)→pool/ctx tasks→… \text{main()} \to \text{sync\_wait(task)} \to \text{async\_main(...)} \to \text{pool/ctx tasks} \to \dotsmain()→sync_wait(task)→async_main(...)→pool/ctx tasks→…
- 阻塞主线程直到
2. 异步栈的形成机制
- 每个异步操作在创建时被注册到父作用域,形成任务树(Task Tree)。
- 当
sync_wait等待时,Unifex 可以沿任务树构建逻辑栈。 - 这样,即使任务在不同线程执行,异步栈仍然可以连续可读。
数学类比: - 异步调用关系形成有向图G=(V,E)G=(V,E)G=(V,E),其中:
- VVV是任务节点
- EEE是“父子任务”边
sync_wait使图沿着父子关系线性化:
G→linearizeFstacked(t) G \xrightarrow{\text{linearize}} F_\text{stacked}(t)GlinearizeFstacked(t)- 最终形成连续的逻辑栈FstackedF_\text{stacked}Fstacked,可用于调试。
3. 核心理解
- 结构化并发使异步栈可能:
- 限定任务作用域,形成清晰的父子关系。
- Unifex 的实现:
- 通过 task 树和 sync_wait 构建逻辑栈。
- 调试优势:
- 异步栈不再碎片化,跨线程调用链完整可见。
- 类似顺序执行栈,但实际执行可能是并发的。
#include<iostream>// std::cout, std::endl#include<vector>// std::vector#include<string>// std::string#include<unifex/static_thread_pool.hpp>// Unifex 静态线程池#include<unifex/task.hpp>// unifex::task 协程任务#include<unifex/sync_wait.hpp>// unifex::sync_wait 同步等待任务完成#include<unifex/scheduler_concepts.hpp>// scheduler 概念#include<unifex/on.hpp>// schedule 等调度函数// ============================================================================// 模拟 IO 上下文结构体// 在实际应用中可以封装 io_uring 或其他异步 IO// ============================================================================structio_uring_context{io_uring_context(){// 构造函数:初始化 IO 上下文// 可以在这里创建 io_uring 实例或其他异步资源}~io_uring_context(){// 析构函数:清理资源// 确保异步任务完成后释放底层资源}};// ============================================================================// async_main:核心异步任务函数// 参数:// args - 写死的参数列表// pool - 静态线程池,用于异步任务调度// ctx - IO 上下文(未使用,只是演示接口)// 返回值:unifex::task<void>,表示可等待的异步操作// ============================================================================unifex::task<void>async_main(conststd::vector<std::string>&args,unifex::static_thread_pool&pool,io_uring_context&/*ctx*/){// -------------------------------// 将任务调度到线程池执行// -------------------------------// pool.get_scheduler() 返回线程池调度器// co_await unifex::schedule(...) 会将协程挂起并安排在线程池线程执行co_awaitunifex::schedule(pool.get_scheduler());// -------------------------------// 异步处理逻辑// -------------------------------std::cout<<"Processing "<<args.size()<<" arguments asynchronously:\n";for(constauto&arg:args){std::cout<<" - "<<arg<<"\n";}// -------------------------------// 协程返回// -------------------------------co_return;}// ============================================================================// main 函数// ============================================================================intmain(){// -------------------------------// 创建静态线程池// -------------------------------// 线程池中有固定数量线程,用于调度 async_main 内部的协程unifex::static_thread_pool pool;// -------------------------------// 创建 IO 上下文// -------------------------------io_uring_context ctx;// -------------------------------// 参数写死在代码里// -------------------------------std::vector<std::string>args={"param1","param2","param3"};// -------------------------------// 创建异步任务// -------------------------------// async_main 返回 unifex::task<void> 协程任务unifex::task<void>task=async_main(args,pool,ctx);// -------------------------------// 同步等待异步任务完成// -------------------------------// sync_wait 会阻塞当前线程,直到 task 执行完成// 在结构化并发模型下,这保证了任务生命周期可追踪unifex::sync_wait(std::move(task));return0;}理解总结
- 静态线程池
- 提供固定线程数,用于调度异步协程。
- 调度器通过
pool.get_scheduler()获取,co_await schedule(...)将任务挂起并在线程池执行。
- 异步任务 async_main
- 返回
unifex::task<void>,表示可等待的异步协程任务。 co_await暂停协程,实现异步行为。co_return完成协程。
- 返回
- 同步等待
unifex::sync_wait阻塞当前线程直到任务完成。- 结合协程和线程池,实现跨线程异步调用链。
- 结构化并发
- 协程栈完整可追踪。
- 从
main()→async_main()→ 线程池调度的调用链在调试时可以清晰看到。
- 参数写死
- 不再依赖命令行参数。
- 输出示例:
Processing 3 arguments asynchronously: - param1 - param2 - param3
cmake
cmake_minimum_required(VERSION 3.22) project(AsyncUnifexDemo LANGUAGES CXX) # 使用 C++20 协程支持 set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) # 如果需要开启编译优化,可取消注释 # set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O2") # 查找 Unifex 库 # 假设你已经安装了 Unifex,并且提供了 find_package 支持 find_package(unifex REQUIRED) # 可执行文件 add_executable(async_demo main.cpp # 你的示例代码文件名 ) # 链接 Unifex 库 target_link_libraries(async_demo PRIVATE unifex::unifex) # 如果需要 pthread 支持(Linux 下通常需要) set(THREADS_PREFER_PTHREAD_FLAG ON) find_package(Threads REQUIRED) target_link_libraries(async_demo PRIVATE unifex::unifex Threads::Threads ${CMAKE_DL_LIBS} ) # 可选:开启编译器诊断信息 target_compile_options(async_demo PRIVATE $<$<CXX_COMPILER_ID:GNU>:-Wall -Wextra -Wpedantic> $<$<CXX_COMPILER_ID:Clang>:-Wall -Wextra -Wpedantic> )完整的Unifex 异步文件处理示例
#include<unifex/file_concepts.hpp>// 文件相关概念#include<unifex/inplace_stop_token.hpp>// 停止 token,用于取消异步任务#include<unifex/io_concepts.hpp>// IO 概念#include<unifex/linux/io_uring_context.hpp>// Linux io_uring 上下文#include<unifex/on.hpp>// 将任务调度到指定 scheduler#include<unifex/scheduler_concepts.hpp>// Scheduler 概念#include<unifex/span.hpp>// span 容器视图#include<unifex/static_thread_pool.hpp>// 静态线程池#include<unifex/sync_wait.hpp>// 同步等待异步任务完成#include<unifex/task.hpp>// 协程任务 task#include<unifex/then.hpp>// 链式 then#include<unifex/via.hpp>// 通过 scheduler 执行#include<unifex/when_all_range.hpp>// 等待一组任务完成#include<algorithm>#include<array>#include<cstdio>#include<filesystem>#include<iterator>#include<ranges>#include<thread>#include<utility>#include<vector>namespace{// 为文件系统、范围、视图创建别名namespacefs=std::filesystem;namespaceranges=std::ranges;namespaceviews=ranges::views;// -----------------------------------------------------------------------------// io_uring 上下文封装// -----------------------------------------------------------------------------// 负责异步 IO 调度与线程管理structio_uring_context{io_uring_context()=default;~io_uring_context(){// 请求停止 io_uring 线程stopSource_.request_stop();t_.join();// 等待线程退出,确保资源安全释放}// 获取 scheduler 用于调度异步任务autoget_scheduler()noexcept{returnctx_.get_scheduler();}private:unifex::inplace_stop_source stopSource_;// 停止信号源unifex::linuxos::io_uring_context ctx_;// Linux io_uring 上下文std::thread t_{[this]{ctx_.run(stopSource_.get_token());}};// 后台线程执行 IO};// -----------------------------------------------------------------------------// 异步读取文件函数// -----------------------------------------------------------------------------// file:文件对象// offset:偏移量// buffer:缓冲区unifex::senderautoasync_read_some_at(auto&file,int64_toffset,auto&buffer){// 将 buffer 转换为字节视图autooutputSpan=unifex::as_writable_bytes(unifex::span(buffer));// 返回一个 async_read_some_at sender,用于协程 co_awaitreturnunifex::async_read_some_at(file,offset,outputSpan);}// -----------------------------------------------------------------------------// 打开文件(只读)// -----------------------------------------------------------------------------// ioCtx:io_uring 上下文// filename:文件路径autoopen_file_read_only(auto&ioCtx,fs::path filename){returnunifex::open_file_read_only(ioCtx.get_scheduler(),filename);}// -----------------------------------------------------------------------------// 文件统计结构体// -----------------------------------------------------------------------------// chars:字符数(不含换行符)// lines:行数structword_stats{unsignedlongchars{};unsignedlonglines{};};// -----------------------------------------------------------------------------// 异步处理文件// -----------------------------------------------------------------------------// file:异步文件对象unifex::task<word_stats>process_file(autofile){word_stats result;std::array<char,4096>buffer;// 4KB 缓冲区int64_toffset=0;// 循环异步读取文件,直到 EOFwhile(std::size_t bytesRead=co_awaitasync_read_some_at(file,offset,buffer)){// 获取有效数据autovalidBytes=unifex::span(buffer.data(),bytesRead);// 统计当前缓冲区的换行符数autonewlines=ranges::count(validBytes,'\n');// 更新统计信息result.lines+=newlines;// 行数result.chars+=(bytesRead-newlines);// 字符数(减去换行符)offset+=bytesRead;// 更新文件偏移}co_returnresult;// 返回统计结果}// -----------------------------------------------------------------------------// 异步主函数// -----------------------------------------------------------------------------// args:命令行参数文件名列表// pool:线程池// io:io_uring 上下文unifex::task<void>async_main(unifex::span<char*>args,auto&pool,auto&io){// 将每个文件名映射为 process_file 异步任务autojobs=args|views::transform([&](fs::path fileName){autofile=open_file_read_only(io,fileName);returnprocess_file(std::move(file));});// 将所有任务调度到线程池执行,并等待全部完成// when_all_range 返回 std::vector<word_stats>autostats=co_awaitunifex::on(pool.get_scheduler(),unifex::when_all_range(jobs.begin(),jobs.end()));// 遍历结果,计算每个文件的平均单词长度for(std::size_t i=0;i<stats.size();++i){doublemean=(double)stats[i].chars/(double)stats[i].lines;// 数学公式表示:// $$ \text{mean}_i = \frac{\text{chars}_i}{\text{lines}_i} $$std::printf("Average word length in %s is %g\n",args[i],mean);}}}// namespace// -----------------------------------------------------------------------------// 主程序入口// -----------------------------------------------------------------------------intmain(intargc,char**argv){unifex::static_thread_pool pool;// 创建线程池io_uring_context ctx;// 创建 IO 上下文// 构造异步任务unifex::task<void>task=async_main({argv+1,argc-1},pool,ctx);// 同步等待异步任务完成unifex::sync_wait(std::move(task));return0;}理解总结
- Structured Concurrency
- 所有子任务都在父协程的
co_await下完成。 - 主协程
async_main会等待所有process_file任务完成。 - 类似数学公式表示为:
async_main=wait(process_file1∥process_file2∥⋯∥process_filen) \text{async\_main} = \text{wait}\big(\text{process\_file}_1 \parallel \text{process\_file}_2 \parallel \dots \parallel \text{process\_file}_n\big)async_main=wait(process_file1∥process_file2∥⋯∥process_filen)
- 所有子任务都在父协程的
- 异步文件读取
- 每次读取BBB字节(这里B=4096B=4096B=4096):
bytesRead=co_await async_read_some_at(file, offset, buffer) \text{bytesRead} = \text{co\_await async\_read\_some\_at(file, offset, buffer)}bytesRead=co_await async_read_some_at(file, offset, buffer) - 更新字符数与行数:
KaTeX parse error: Undefined control sequence: \n at position 31: …= \text{count('\̲n̲')} \ \text{cha…
- 每次读取BBB字节(这里B=4096B=4096B=4096):
- 线程池调度
unifex::on(pool.get_scheduler(), ...)将任务绑定到线程池。- 确保任务异步执行但在主协程中可等待。
- 协程/异步栈
- 每个
co_await是挂起点,协程状态存储在task<T>中。 - 结构化并发保证不会出现未捕获的异步任务。
如果你需要,我可以画出async stack 可视化图,显示main -> async_main -> process_file的调用关系以及协程挂起点,这样理解异步堆栈和 Structured Concurrency 会更直观。
你希望我画这个图吗?
好的,我们来对你贴出的代码进行详细的理解分析,并结合数学公式用.........和.........表示。你的代码核心是Structured Concurrency和Unifex 异步任务系统,主要实现了异步读取文件并统计字符和行数的功能。
- 每个
1⃣main函数
intmain(intargc,char**argv){unifex::static_thread_pool pool;io_uring_context ctx;unifex::task<void>task=async_main({argv+1,argc-1},pool,ctx);unifex::sync_wait(std::move(task));return0;}理解:
unifex::static_thread_pool pool;
创建一个静态线程池,用于调度异步任务。io_uring_context ctx;
创建 IO 上下文(基于io_uring),提供异步文件操作能力。async_main({argv + 1, argc - 1}, pool, ctx);- 将命令行参数转换为
unifex::span<char*>,跳过程序名。 - 调用异步主函数
async_main,返回unifex::task<void>。
- 将命令行参数转换为
unifex::sync_wait(std::move(task));- 阻塞当前线程直到异步任务完成。
- 这里体现了Structured Concurrency:主线程等待所有子任务完成再退出。
数学类比:
如果有nnn个文件任务fif_ifi并行处理,则sync_wait相当于在主线程上等待:
result=wait(f1∥f2∥⋯∥fn) \text{result} = \text{wait}\big(f_1 \parallel f_2 \parallel \dots \parallel f_n\big)result=wait(f1∥f2∥⋯∥fn)
2⃣async_main函数
unifex::task<void>async_main(unifex::span<char*>args,auto&pool,auto&io){autojobs=args|views::transform([io](fs::path fileName)->unifex::task<word_stats>{autofile=unifex::open_file_read_only(io,fileName);returnprocess_file(std::move(file));});autostats=co_awaitunifex::on(pool.get_scheduler(),unifex::when_all_range(jobs.begin(),jobs.end()));for(std::size_t i=0;i<stats.size();++i){doublemean=(double)stats[i].chars/(double)stats[i].lines;std::printf("Average word length in %s is %g\n",args[i],mean);}}理解:
jobs = args | views::transform(...)- 对每个文件名创建一个异步任务
process_file(file)。 views::transform返回一个延迟计算的 range,不立即执行。
- 对每个文件名创建一个异步任务
co_await unifex::on(...)- 将所有任务调度到线程池。
when_all_range表示等待所有异步任务完成,得到stats,类型是std::vector<word_stats>。- 结构化并发保证所有子任务都完成后,才继续下一步。
- 平均单词长度计算:
doublemean=(double)stats[i].chars/(double)stats[i].lines;用数学公式表示:
meani=charsilinesi \text{mean}_i = \frac{\text{chars}_i}{\text{lines}_i}meani=linesicharsi
- charsi\text{chars}_icharsi是第iii个文件的总字符数(不包括换行)。
- linesi\text{lines}_ilinesi是第iii个文件的总行数。
3⃣process_file函数
structword_stats{unsignedlongchars{};unsignedlonglines{}};unifex::task<word_stats>process_file(autofile){word_stats result;std::array<char,4096>buffer;int64_toffset=0;while(std::size_t bytesRead=co_awaitasync_read_some_at(file,offset,buffer)){autovalidBytes=unifex::span(buffer.data(),bytesRead);autonewlines=ranges::count(validBytes,'\n');result.lines+=newlines;result.chars+=(bytesRead-newlines);offset+=bytesRead;}co_returnresult;}理解:
- 定义
word_stats结构体,用于存储文件统计信息。 co_await async_read_some_at(file, offset, buffer)- 异步读取文件的一部分到
buffer。 - 返回读取的字节数
bytesRead。
- 异步读取文件的一部分到
ranges::count(validBytes, '\n')- 统计本次读取中换行符的数量。
- 更新结果:
lines+=newlines chars+=(bytesRead−newlines) \text{lines} += \text{newlines} \ \text{chars} += (\text{bytesRead} - \text{newlines})lines+=newlineschars+=(bytesRead−newlines) - 移动偏移量:
offset+=bytesRead \text{offset} += \text{bytesRead}offset+=bytesRead
- 循环直到文件读完。
co_return result;- 将统计信息返回给调用者,继续 async stack 的下一个 awaitable。
4⃣ 异步栈(Async Stack)原理
- 每个
co_await创建一个协程 frame,存储挂起状态。 Structured Concurrency保证子协程不会泄露,父协程等待所有子协程完成。- 异步堆栈类似:
main() └─ async_main() ├─ process_file(file1) ├─ process_file(file2) └─ process_file(file3)co_await挂起点相当于:
resume point=frame pointer+awaitable state \text{resume point} = \text{frame pointer} + \text{awaitable state}resume point=frame pointer+awaitable state- Unifex 的
task<T>封装了这个状态,并可在 scheduler 上恢复执行。
5⃣ 总结
- 结构化并发:父任务等待所有子任务完成,形成清晰 async stack。
- 异步任务调度:通过
unifex::on(pool.get_scheduler(), ...)将任务调度到线程池。 - 文件统计:异步读取文件,统计字符和行数,计算平均单词长度:
Average word length=charslines \text{Average word length} = \frac{\text{chars}}{\text{lines}}Average word length=lineschars - 协程实现:
co_await用于挂起异步操作。co_return返回协程结果。task<T>保存协程 frame,支持 resume。
一、第一张 SVG:线程之间的宏观调度关系
这张图的核心目的是:
“一个异步任务在 Main / ThreadPool / IO 三个执行域之间如何流转”
1⃣ 三大区域总览
左侧:IO Thread
IO Thread: async_read_some_at(...) ...对应代码里的:
unifex::linuxos::io_uring_context ctx_;std::thread t_{[this]{ctx_.run(stopSource_.get_token());}};含义:
- 这是io_uring 专用线程
- 只做一件事:
- 提交 IO 请求
- 等待内核完成
- 完成后触发 sender 的
set_value / set_error
关键点
async_read_some_at()
永远不会在调用它的线程真正执行 IO
而是:
提交 IO→io_uring 线程→完成回调 \text{提交 IO} \to \text{io\_uring 线程} \to \text{完成回调}提交IO→io_uring线程→完成回调
中间:Threadpool threads
Threadpool threads: process_file(...) ...对应:
unifex::static_thread_pool pool;co_awaitunifex::on(pool.get_scheduler(),when_all_range(...))含义:
process_file()协程:- 最初
- 每次 IO 完成后恢复
- 都运行在线程池线程上
这里非常重要的一点:
co_await async_read_some_at(...)
会让出线程池线程
线程池线程不会阻塞,而是去跑别的任务。
右侧:Main thread
Main thread: unifex::sync_wait(...) main() __libc_start_main()对应:
unifex::sync_wait(std::move(task));含义:
- 主线程:
- 启动 async graph
- 阻塞等待最终完成
- 不参与 IO
- 不参与协程执行
sync_wait=同步世界 ↔ 异步世界的边界
2⃣ 箭头含义(非常关键)
➡ IO → ThreadPool
async_read_some_at(...) ---> process_file(...)表示:
io_uring 完成一个 read
→ 调用 receiver.set_value
→ 恢复process_file协程
→调度到线程池
数学/抽象模型是:
IO 完成⇒continuation resume \text{IO 完成} \Rightarrow \text{continuation resume}IO完成⇒continuation resume
➡ ThreadPool → Main
process_file(...) ---> sync_wait(...)表示:
所有
when_all_range的 sender 完成
→sync_wait被唤醒
→ main 返回
3⃣ 这张图想告诉你的核心事实
不是:
一个线程从头跑到尾
而是:
一个逻辑任务在多个执行域之间跳转
| 逻辑阶段 | 实际线程 |
|---|---|
| 启动 | main |
| 计算 | pool |
| IO | io_uring |
| 继续计算 | pool |
| 汇总 | pool |
| 等待 | main |
二、第二张 SVG:协程 & Sender 的“真实调用栈”
这张图是给“调试器 / 栈回溯”看的
如果你用gdb / lldb看 backtrace,
看到的就是这张图的内容。
1⃣ 左:Pool thread
process_file(...) coro::resume() set_value() pool.run() __clone()含义分解
pool.run()
线程池 worker 的主循环:
while(true){task=pop();task();}coro::resume()
协程被恢复的瞬间
等价于:
handle.resume();process_file(...)
你的协程函数体:
while(...){bytesRead=co_awaitasync_read_some_at(...);}set_value()
这是Sender/Receiver 的完成回调
来自:
async_read_some_at(...)关键理解
Sender 并不会“返回值”
而是通过 set_value 触发 continuation
2⃣ 中间:Async frames
inject_stop_request_thunk() async_main()::lambda async_main() ...这是最容易让人迷惑,但也是最关键的部分
这些不是线程栈,而是:
异步组合层生成的“中间 continuation”
例如:
on(pool,when_all_range(...))等价于一个巨大的函数组合:
fn(fn−1(…f1(sender))) f_n(f_{n-1}(\dots f_1(sender)))fn(fn−1(…f1(sender)))
这些lambda / thunk:
- 负责
- 停止传播(stop_token)
- scheduler 切换
- sender → receiver glue
- 不是你写的
- 但真实存在于调用栈中
inject_stop_request_thunk()
来自:
unifex::inplace_stop_token负责:
- 如果
sync_wait请求取消 - 把 stop 请求注入到 sender graph
3⃣ 右:Main thread
pthread_wait sync_wait main __libc_start_main核心理解
sync_wait()的实现本质是:
block current thread until receiver.set_value() \text{block current thread until receiver.set\_value()}block current thread until receiver.set_value()
所以:
- main 线程:
- 一直阻塞
- 不参与任何协程执行
- 直到:
when_all_range的最后一个 sender 完成
4⃣ 箭头解释
➡ Pool → Async frames
set_value() → async_main lambda表示:
线程池线程调用 receiver.set_value
→ 执行 sender graph 中的 continuation
➡ Async → Main
async_main → sync_wait表示:
顶层 sender 完成
→ 唤醒 main thread
三、把两张图合起来的“终极理解”
1⃣ 你写的是“顺序代码”
while(...){bytesRead=co_awaitasync_read_some_at(...);}2⃣ 实际执行是“分布式状态机”
每个co_await都会:
- 挂起协程
- 线程返回线程池
- IO 在另一个线程执行
- 完成后恢复协程
数学抽象为:
Coroutine=State Machine \text{Coroutine} = \text{State Machine}Coroutine=State Machine
3⃣ 为什么 Unifex 的栈这么“深”
因为:
- Sender/Receiver 是组合模型
- 每个算法(on / via / when_all / then)
- 都是一个“函数层”
- 编译期展开
- 运行期体现为 thunk / lambda
四、如果你愿意,我可以继续做的事
一、整体一句话总结(先给结论)
这段程序是一个典型的 Unifex「协程 + Sender/Receiver + 多执行域」示例:
- 主线程只负责启动和等待
- 线程池负责 CPU 计算与协程恢复
- io_uring 线程负责真正的异步 IO
- 所有
co_await本质上都是状态机切换 + continuation 传递
数学上可以把整个程序抽象为:
Program=MainWait∘SchedulerSwitch∘AsyncIO∘CoroutineFSM \text{Program} = \text{MainWait} \circ \text{SchedulerSwitch} \circ \text{AsyncIO} \circ \text{CoroutineFSM}Program=MainWait∘SchedulerSwitch∘AsyncIO∘CoroutineFSM
二、代码层:你到底写了什么?
我们从静态代码结构开始。
1⃣io_uring_context:IO 执行域
structio_uring_context{unifex::inplace_stop_source stopSource_;unifex::linuxos::io_uring_context ctx_;std::thread t_{[this]{ctx_.run(stopSource_.get_token());}};};含义
ctx_:Unifex 封装的 io_uring 事件循环t_:专用 IO 线程run():- 阻塞等待内核完成事件
- 完成后调用 receiver 的
set_value / set_error
关键点
任何
async_read_some_at的真正 IO 都只会在这个线程发生
2⃣process_file:协程 = 状态机
unifex::task<word_stats>process_file(autofile)逻辑上你写的是
while(read){统计}实际上编译器生成的是
Coroutine≡State Machine \text{Coroutine} \equiv \text{State Machine}Coroutine≡State Machine
每个co_await都会:
- 保存局部状态(
offset,result,buffer) - 挂起协程
- 把 continuation 注册给 sender
- 当前线程立刻返回
这一行是“灵魂”
std::size_t bytesRead=co_awaitasync_read_some_at(...)它不是阻塞读,而是:
submit IO⇒suspend⇒resume later \text{submit IO} \Rightarrow \text{suspend} \Rightarrow \text{resume later}submit IO⇒suspend⇒resume later
3⃣async_main:Sender 图的构造器
autojobs=args|views::transform(...);co_awaitunifex::on(pool.get_scheduler(),unifex::when_all_range(jobs.begin(),jobs.end()));你在语义上写的是
“把所有文件丢给线程池并发处理,等全部完成”
实际构造的是一个Sender Graph
on └── when_all_range ├── process_file(file1) ├── process_file(file2) ├── ...重要
on(...)≠ “创建线程”- 它只是指定 continuation 在哪个 scheduler 上 resume
4⃣main + sync_wait:同步 / 异步边界
unifex::sync_wait(std::move(task));语义上等价于:
block main thread until final receiver.set_value() \text{block main thread until final receiver.set\_value()}block main thread until final receiver.set_value()
三、第一张 SVG:线程之间发生了什么?
这是宏观调度视角
左:IO Thread
SVG 中的:
async_read_some_at(...) ...对应代码:
unifex::async_read_some_at(file,offset,span)实际流程
- 协程提交 IO 请求
- io_uring 线程:
io_uring_wait_cqe- 内核完成
- 调用 receiver 的
set_value(bytesRead)
协程不在这里运行,只是 IO 完成通知
中:Threadpool threads
SVG 中:
process_file(...) ...对应:
unifex::static_thread_pool pool;发生的事
- 所有协程恢复都在这里
- 每次 IO 完成:
set_valuecoro::resume- 继续执行
process_file
线程池线程 = 协程执行者
右:Main thread
SVG 中:
unifex::sync_wait(...) main() __libc_start_main()主线程的真实状态
main() └── sync_wait() └── pthread_wait() └── (blocked)主线程不跑协程、不跑 IO
➡ 箭头解释
IO → Threadpool
IO 完成 → set_value → 协程恢复数学表达:
IO completion⇒continuation resume \text{IO completion} \Rightarrow \text{continuation resume}IO completion⇒continuation resume
Threadpool → Main
when_all_range 完成 → sync_wait 唤醒四、第二张 SVG:为什么栈这么“乱”?
这是调试器 / backtrace 视角
Pool thread 栈
process_file(...) coro::resume() set_value() pool.run() __clone()含义拆解
| 栈帧 | 来源 |
|---|---|
process_file | 你的协程 |
coro::resume | 协程恢复点 |
set_value | Sender/Receiver |
pool.run | 线程池 worker |
__clone | OS 线程 |
| 这是真实 CPU 调用栈 |
🔵 Async frames(最关键)
inject_stop_request_thunk() async_main()::lambda async_main()它们是什么?
Sender 组合产生的 continuation glue
来源包括:
onwhen_all_rangesync_wait- stop_token 注入
它们不是线程切换,也不是魔法
而是:
fn(fn−1(…f1(receiver))) f_n(f_{n-1}(\dots f_1(receiver)))fn(fn−1(…f1(receiver)))
inject_stop_request_thunk()
来自:
unifex::inplace_stop_token负责:
- main 请求取消
- 向 sender graph 广播 stop
Main thread 栈
pthread_wait sync_wait main __libc_start_main完全符合:
同步等待异步完成
➡ 箭头解释
Pool → Async frames
set_value() → continuation 链Async → Main
最终 set_value → sync_wait 唤醒五、把三层视角合成一张“心智模型”
1⃣ 你写的
co_awaitasync_read_some_at(...)2⃣ 编译器看到的
Suspend→Register continuation→Resume later \text{Suspend} \rightarrow \text{Register continuation} \rightarrow \text{Resume later}Suspend→Register continuation→Resume later
3⃣ 运行时发生的
| 阶段 | 线程 |
|---|---|
| 提交 IO | pool |
| 等待 IO | io_uring |
| 恢复协程 | pool |
| 汇总结果 | pool |
| 等待 | main |
六、为什么这个设计“高级但正确”
- 无阻塞
- 无忙等
- 执行域隔离
- 组合可扩展
- 符合 C++20 协程语义
这正是Unifex 的设计目标。
下面这段内容本质上是在回答一个非常“硬核”的问题:
在 C++20 协程 + Unifex 这种 sender/receiver 体系里,
我们如何把“真实 CPU 栈”和“逻辑异步调用栈”重新拼成一个人类能理解的调用链?
你给出的backtrace + SVG正好是这个问题的“解剖图”。
我按问题 → 原理 → 逐帧解释 → SVG 对照 → 可执行结论的顺序来讲。
一、问题到底是什么?
你在调试时看到的是这样的栈(简化):
0:unifex::task<word_stats>process_file<…>(async_read_only_file)()at main.cpp:70 // 用户定义的协程函数,返回 unifex::task<word_stats>// 这是当前“正在执行”的异步业务逻辑帧,也是 async 逻辑栈的最顶端 // 实际代码运行点在这里,但它的调用者并不在传统 C++ 调用栈中1:std::coroutine_handle<void>::resume[abi:ne180100]()const()at coroutine_handle.h:69 // C++20 协程 ABI 层入口 // 由外部调度器或 continuation 调用,用来恢复 process_file 协程帧执行 // 这是从“同步世界”跳入“协程帧”的标准桥梁2:unifex::continuation_handle<void>::resume()()at continuations.hpp:220 // Unifex 的 continuation 句柄 // 表示“上一个异步 sender 完成后继续执行”的抽象 // 开始进入 Unifex 自己维护的异步控制流3:unifex::continuation_handle<…_promise<word_stats>>::resume()()at continuations.hpp:311 // 带具体 promise 类型的 continuation // 知道如何恢复 task<word_stats>对应的 promise / 协程状态 // 这是 Unifex 将 sender/receiver 模型映射回协程的关键层4:…_sr_thunk_task<…>…inject_stop_request_thunk<…>(…_sa_task<word_stats>)()at task.hpp:824 // Unifex 自动生成的 thunk(中间包装层) // 在协程恢复前注入 stop_token / cancellation 逻辑 // 该层完全是库内部机制,不对应任何用户源代码5:auto unifex::connect_awaitable(task<void>, receiver auto&)()at connect_awaitable.hpp:234 // 将一个 awaitable(这里是 task<void>) // 连接为 sender → receiver 的执行关系 // 是“co_await 表达式”向 Unifex sender 体系过渡的关键节点6:unifex::task<void>async_main<…>(span<char*>, auto&, auto&)::'lambda'at main.cpp:85 // async_main 内部为 co_await 构造的匿名 lambda 协程 // 表示“当前 await 完成后应该继续执行的代码块”7:unifex::task<void>async_main<…>(span<char*>, auto&, auto&)::'lambda'at main.cpp:85 // 同一个 lambda 协程在不同恢复阶段出现的栈帧 // 反映了协程被多次 resume,而不是递归调用8:unifex::task<void>async_main<…>(span<char*>, auto&, auto&)()at main.cpp:89 // async_main 本身的协程函数 // 整个程序的异步顶层逻辑入口,相当于“异步版 main”9:auto unifex::on(scheduler auto&&, sender auto&&)const()at on.hpp:51 // unifex::on 定制点 // 指定 sender 必须在某个 scheduler(如 io_uring、线程池)上运行 // 这是执行上下文(线程 / 事件循环)切换的核心位置10: decltype(auto)tag_invoke(…)()at sender_for.hpp:61 // C++ 的 tag_invoke 定制点分发机制 // Unifex 通过它选择 on / connect / start 的具体实现 // 决定最终使用哪个调度器和执行策略11: auto unifex::_wsa::_make_sender<…>(auto&&, auto&&)()at with_scheduler_affinity.hpp:44 // with_scheduler_affinity 的内部实现 // 构造一个“绑定了 scheduler 的 sender” // 用来保证后续操作始终在指定调度器上执行12: decltype(auto)…_promise<void>::await_transform(…)()at task.hpp:384 // promise_type::await_transform // C++ 协程语义点:将 co_await 表达式转换为 awaiter // 这是语言级协程和 Unifex awaitable 的交汇处13: auto unifex::connect_awaitable(task<void>, receiver auto&)()at connect_awaitable.hpp:234 // 再一次出现的 connect_awaitable // 用于把 async_main 自身连接成最外层 sender // 建立整个异步执行链的根节点14: main()at main.cpp:103 // 传统同步 C++ main 函数 // 启动 async_main,并驱动其完成(run loop / sync_wait 等)15: main()at main.cpp:103 // 编译器生成的重复内联或展开帧 // 对理解逻辑无额外意义,可视为同一层16: main()at main.cpp:103 // 同上,属于同步调用栈展开细节17: main()at main.cpp:105 // main 函数的不同源代码行 // 仍然处于同步世界18: __libc_start_main()at ???:0 // C 运行时入口 // 操作系统 → C runtime → main 的启动路径终点process_file // 用户定义的协程函数(unifex::task<word_stats>) // 当前真正执行业务逻辑的地方;这是 async 栈中“最上面”的逻辑帧 coroutine_handle::resume // C++20 协程 ABI 入口 // 由外部调度器/continuation 调用,用来恢复 process_file 的协程帧执行 continuation_handle::resume // Unifex 的 continuation 机制 // 把“上一个 sender 的完成”继续传播到下一个 awaiter // 这是 Unifex 异步栈开始显式出现的地方 inject_stop_request_thunk // Unifex 内部生成的中间 thunk // 用于在协程恢复前注入 stop_token / cancellation 语义 // 不属于用户代码,但影响控制流 connect_awaitable // 把一个 awaitable(task / sender) // 连接成 sender-receiver 形式 // 这是“协程 await 语义 → sender 模型”的桥梁 async_main::lambda // async_main 内部为 co_await 构造的匿名协程 continuation // 表示“await 这个操作完成后继续做什么” async_main // 顶层用户异步入口(unifex::task<void>) // 通常是 async 版本的 main // 管理整个异步程序生命周期 on(...)// unifex::on(scheduler, sender)// 指定 sender 在某个 scheduler(如 io_uring、thread_pool)上运行 // 这是执行上下文切换的关键节点 tag_invoke // C++ 定制点机制(Customization Point Object) // Unifex 用它分派 on / connect / start 等操作 // 决定最终调用哪个平台/调度器实现 await_transform // promise_type::await_transform // 把 co_await 表达式转换成具体 awaiter // 是“语言级 co_await → 库级 sender”的入口 connect_awaitable // 第二次出现: // async_main 自身被连接成最外层 sender // 建立整个异步执行链的根节点 main // 同步世界的入口 // 启动 async_main,并阻塞或 run-loop 等待其完成 __libc_start_main // C 运行时入口 // 操作系统 → C runtime → main 的最后一步但你真正想知道的是:
“process_file 是从 main 调用过来的吗?”
“中间这些 async / continuation 到底算不算调用关系?”
而普通的stack unwinding 只知道“谁调用了谁”,
完全不知道“谁在逻辑上 await 了谁”。
二、这说什么?
开头三句是整个方法论的总结:
So what do we need?
- Walk the regular stack → 直到第一个 async frame
- Walk the async stack → 直到 async 链的末端
- Walk the regular stack back to main
这三步本质是在做:
Logical Stack====================CPU Stack Prefix⊕Async Continuation Chain⊕CPU Stack Suffix \text{Logical Stack} \\ ==================== \\ \text{CPU Stack Prefix} \\ \oplus \text{Async Continuation Chain}\\ \oplus \text{CPU Stack Suffix} \\Logical Stack====================CPU Stack Prefix⊕Async Continuation Chain⊕CPU Stack Suffix
三、什么是「regular stack」?什么是「async frame」?
1⃣ Regular stack(普通栈帧)
这是硬件 + ABI 意义上的栈:
ret*:返回地址prev*:上一帧指针data:局部变量
你的 SVG 里画的正是x86 / AArch64 通用栈模型。
特点:- 严格的 LIFO
- 由
call / ret控制 gdb backtrace天然支持
2⃣ Async frame(异步帧)
不在 CPU 栈上,而是:
- 协程 promise 对象
- continuation 节点
- sender/receiver glue
典型代表:
inject_stop_request_thunk connect_awaitable continuation_handle::resume特点:
- 不遵循 call/ret
- 通过函数对象 + 状态机连接
- 需要“人为识别”
四、从你的 backtrace 开始逐帧拆解
我们直接用你给的编号。
Frame 0:真正的“当前执行点”
0 : unifex::task<word_stats> process_file(...) at main.cpp:70这是用户代码
这是协程恢复后的执行位置
在 SVG 中对应:
process_fileFrame 1:协程恢复入口(分界点!)
1 : std::coroutine_handle<void>::resume()这是第一个“async frame”
为什么?
- 这是ABI 规定的协程恢复入口
- 再往下已经不是“谁调用了谁”
- 而是“谁 resume 了谁”
这是你“停止普通栈回溯”的地方
Frames 2–4:Unifex continuation 链
2 : continuation_handle<void>::resume 3 : continuation_handle<..._promise>::resume 4 : inject_stop_request_thunk它们的角色
这是一条异步 continuation 链:
IO completion → continuation_handle → promise.resume → inject_stop_request_thunk数学上可以看成:
resume=f4∘f3∘f2 resume = f_4 \circ f_3 \circ f_2resume=f4∘f3∘f2
它们在逻辑上是“await 的调用者”
但在 CPU 栈上不是父子关系
Frames 5–13:async_main 的 await 链
connect_awaitable async_main::lambda async_main on(...) tag_invoke await_transform connect_awaitable这一整段对应:
co_awaiton(pool.get_scheduler(),when_all_range(...));也就是:
process_file ↑ when_all_range ↑ on(scheduler) ↑ async_main这是你真正想要看到的“逻辑调用栈”
Frames 14–18:回到普通世界
14 : main ... 18 : __libc_start_main再次回到regular stack
程序入口
五、SVG 图如何对应这一切?
左 → 中 → 右 = 栈时间轴
左:未知旧栈帧(省略)
...中:分界点(关键)
coro::resume这正是:
std::coroutine_handle::resume它既是 CPU 栈的一帧,又是 async 世界的入口
右:当前协程帧
process_fileframe* / instr* 是什么?
frame*
- 指向当前 coroutine frame
- 存储在 promise / continuation 内部
- 不是 prev*(不是栈)
instr*
- 当前 resume 的指令地址
- 类似“逻辑 PC”
这就是async stack walking必须额外读取的元信息。
六、为什么普通 backtrace 不够?
因为:
Async Call Graph⊄CPU Stack \text{Async Call Graph} \not\subset \text{CPU Stack}Async Call Graph⊂CPU Stack
| 问题 | 普通 backtrace |
|---|---|
| 谁 await 了我 | |
| 协程逻辑父子关系 | |
| Sender 组合链 | |
| stop_token 注入 |
七、真正完整的“异步栈”应该长这样
人类理解版本:
main └── async_main └── on(thread_pool) └── when_all_range └── process_file └── co_await async_read_some_at而CPU 真实栈只是:
process_file coro::resume continuation_handle::resume ... pool.run八、结论(重点)
这张 slide 想教你的只有一件事:
异步栈 ≠ CPU 栈
而你要做的是:
- 先走 CPU 栈
→ 找到第一个coroutine_handle::resume - 再走 async continuation 链
→ promise / continuation / await_transform - 最后接回 main 的 CPU 栈
一句话总结
Unifex 的异步调用栈是“拼出来的”,不是“走出来的”。
0:unifex::task<word_stats>process_file<…>(async_read_only_file)()at main.cpp:701:std::coroutine_handle<void>::resume[abi:ne180100]()const()at coroutine_handle.h:692:unifex::continuation_handle<void>::resume()()at continuations.hpp:2203:unifex::continuation_handle<…_promise<word_stats>>::resume()()at continuations.hpp:3114:…_sr_thunk_task<…>…inject_stop_request_thunk<…>(…_sa_task<word_stats>)()at task.hpp:8245:autounifex::connect_awaitable(task<void>,receiverauto&)()at connect_awaitable.hpp:2346:unifex::task<void>async_main<…>(span<char*>,auto&,auto&)::'lambda'at main.cpp:857:unifex::task<void>async_main<…>(span<char*>,auto&,auto&)::'lambda'at main.cpp:858:unifex::task<void>async_main<…>(span<char*>,auto&,auto&)()at main.cpp:899:autounifex::on(schedulerauto&&,senderauto&&)const()at on.hpp:5110:decltype(auto)tag_invoke(…)()at sender_for.hpp:6111:autounifex::_wsa::_make_sender<…>(auto&&,auto&&)()at with_scheduler_affinity.hpp:4412:decltype(auto)…_promise<void>::await_transform(…)()at task.hpp:38413:autounifex::connect_awaitable(task<void>,receiverauto&)()at connect_awaitable.hpp:23414:main()at main.cpp:10315:main()at main.cpp:10316:main()at main.cpp:103一、你反复看到这些“逐步增长的栈”说明了什么?
贴出的多组栈,其实不是不同调用路径,而是:
同一次逻辑执行,在不同“恢复阶段”被采样到的 CPU 栈
也就是说:
- 协程每
resume()一次 - CPU 栈都会重新从某个点“长出来”
- 每多执行一步,就多“暴露”一层同步包装帧
所以你看到的是:
process_file ↓ coroutine_handle::resume ↓ continuation_handle::resume ↓ inject_stop_request_thunk ↓ connect_awaitable ↓ async_main::lambda ↓ async_main ↓ on(...) ↓ tag_invoke ↓ await_transform ↓ connect_awaitable ↓ main这不是递归,而是协程恢复链被逐层“揭开”。
二、核心问题:为什么 async 调用链不在普通栈上?
1⃣ 同步函数的世界(你熟悉的)
同步调用满足:
f0→f1→f2→… f_0 \rightarrow f_1 \rightarrow f_2 \rightarrow \dotsf0→f1→f2→…
特点:
- 每一次调用:
ret*= 返回地址prev*= 上一个栈帧
- 所有信息都在CPU 栈上
- 用
backtrace()就能完整看到
2⃣ 协程 / Unifex 的世界(完全不同)
协程不是调用,而是:
resume⇒jump into saved state \text{resume} \Rightarrow \text{jump into saved state}resume⇒jump into saved state
关键点:
- 协程帧不在 CPU 栈上
- 它在:
- heap
- TLS
- scheduler 私有内存
- CPU 栈上只有一个“入口点”
所以:
CPU 栈 ≠ 逻辑调用栈
三、你 SVG 里画的东西,其实非常“对”
我们把你图里的元素逐个对齐到现实。
ret*
ret*含义:
- 正常函数返回地址
- CPU 执行完当前函数后跳回哪里
只存在于同步栈帧
prev*
prev*含义:
- 上一个栈帧的指针(frame pointer)
- 用来“向上走栈”
只能在“连续的同步调用”中使用
frame*
frame*你在图中把它单独画出来,这非常关键。
它代表:
协程帧的“逻辑父帧”指针
也就是:
- 不在 CPU 栈上
- 指向:
promise_type- continuation
- async caller
这是异步栈的“prev”*
aframe*
aframe*这是最重要的概念之一。
它表示:
async frame(异步逻辑帧)
特点:
- 存在于:
- heap
- TLS
- scheduler 内部
- 通过 Unifex 的 continuation 链串起来
- 完全脱离 CPU 栈
TLS magic*
你在右侧标了:
TLS magic*这是非常准确的。
现实中:
- Unifex / coroutine runtime必须把“当前协程”存到 TLS
- 因为:
- CPU 栈无法表达 async 关系
- TLS 里通常放:
- 当前 coroutine handle
- 当前 continuation
- 当前 scheduler 上下文
这是 async stack walking 的“锚点”
四、为什么要 “drop / keep”?
你图里写了:
drop keep这代表栈裁剪策略(stack pruning)。
drop(丢弃)
- 纯同步包装
- 框架 glue code
- 对用户无意义
例如:
std::coroutine_handle::resume continuation_handle::resume inject_stop_request_thunkkeep(保留)
- 真正的“逻辑边界”
- 用户关心的 async 关系
例如:
process_file async_main main这正是 async backtrace 工具要做的事
五、inject_stop_request_thunk 是什么“怪物”?
这一帧你反复看到:
inject_stop_request_thunk它的本质是:
一个“插在同步栈里的异步桥梁”
作用:
- 在 resume 前:
- 注入 cancellation / stop_token
- 在 resume 后:
- 恢复 continuation
从结构上:
- 恢复 continuation
CPU stack ↓ thunk ↓ async frame所以你在图中把它画成instr* 是完全正确的:
- 它不是逻辑函数
- 它是“跳板”
六、为什么这些栈会“重复打印”?
你看到多次:
process_file coroutine_handle::resume continuation_handle::resume ...这是因为:
协程执行=∑iresumei \text{协程执行} = \sum_i \text{resume}_i协程执行=i∑resumei
每一次:
- 协程挂起(co_await)
- CPU 栈清空
- 某个事件完成
- 再次 resume
- 栈重新生长
所以你捕获到的是“时间切片视角的栈”
七、你现在“实际上已经理解了什么”
说一句实话:
你已经理解了 async debugging 90% 的核心难点。
你现在已经知道:
- 为什么普通 backtrace 不够
- async frame 在哪里
- continuation 是 async 的 prev*
- TLS 是 async 栈的入口
- thunk / connect_awaitable 是桥梁
- drop / keep 的意义
这正是:
LLVM async stacktrace / Java async stack / Rust async backtrace
解决的同一个问题
八、一句话总结(非常重要)
同步栈告诉你“CPU 正在干什么”,
异步栈告诉你“逻辑上是谁调用了谁”,
Unifex 把两者用 continuation + TLS 粘在一起。
0:unifex::task<word_stats>process_file<…>(async_read_only_file)()at main.cpp:701:std::coroutine_handle<void>::resume[abi:ne180100]()const()at coroutine_handle.h:692:unifex::continuation_handle<void>::resume()()at continuations.hpp:2203:unifex::continuation_handle<…_promise<word_stats>>::resume()()at continuations.hpp:3114:…_sr_thunk_task<…>…inject_stop_request_thunk<…>(…_sa_task<word_stats>)()at task.hpp:8245:autounifex::connect_awaitable(task<void>,receiverauto&)()at connect_awaitable.hpp:2346:unifex::task<void>async_main<…>(span<char*>,auto&,auto&)::'lambda'at main.cpp:857:unifex::task<void>async_main<…>(span<char*>,auto&,auto&)::'lambda'at main.cpp:858:unifex::task<void>async_main<…>(span<char*>,auto&,auto&)()at main.cpp:899:autounifex::on(schedulerauto&&,senderauto&&)const()at on.hpp:5110:decltype(auto)tag_invoke(…)()at sender_for.hpp:6111:autounifex::_wsa::_make_sender<…>(auto&&,auto&&)()at with_scheduler_affinity.hpp:4412:decltype(auto)…_promise<void>::await_transform(…)()at task.hpp:38413:autounifex::connect_awaitable(task<void>,receiverauto&)()at connect_awaitable.hpp:23414:main()at main.cpp:10315:main()at main.cpp:10316:main()at main.cpp:10317:main()at main.cpp:10518:__libc_start_main()at???:0一、先给结论版“心智模型”
在 Unifex 中:
CPU 同步调用栈 ≠ 协程调用栈
- 同步栈:由
ret* / prev* / instr*串起来 - 异步栈(async stack):由一组heap 上的协程帧(aframe)串起来
- 唯一的桥:
std::coroutine_handle::resume()
你看到的现象是:
调试器打印的是“同步栈”,
而 SVG 画的是“同步栈 + 异步栈 + 两者之间的指针关系”
二、逐行解释这段栈(但作为一个整体)
0 : unifex::task<word_stats> process_file<…>(async_read_only_file) () at main.cpp:70这是你真正关心的业务逻辑:
process_file是一个协程函数- 返回
unifex::task<word_stats> - 它的执行状态不在当前 CPU 栈中
- 它的局部变量、暂停点保存在协程帧(aframe)中
SVG 里对应的是最右侧标着process_file / aframe* 的盒子。
1 : std::coroutine_handle<void>::resume()这是整个故事里最重要的一行。
它表示:
CPU 正在从普通函数调用栈
跳转执行一个已经存在的协程帧
用一句公式化的话说:
resume():CPU stack→async frame \text{resume()} : \text{CPU stack} \rightarrow \text{async frame}resume():CPU stack→async frame
SVG 中所有从frame* / instr*指向aframe*的箭头,本质上都代表这一步。
2 : unifex::continuation_handle<void>::resume() 3 : unifex::continuation_handle<…_promise<word_stats>>::resume()这是Unifex 自己封装的“协程恢复层”:
continuation_handle是对coroutine_handle的轻量包装- 目的是:
- 统一 sender / receiver 调度
- 把“下一个要恢复的协程”串成链
在 SVG 中,它们对应的不是新的 CPU 栈帧,而是:
async frame 之间的 prev指针*
也就是你看到的async prev*。
4 : …inject_stop_request_thunk…这一层是 Unifex 的取消语义注入点:
- 它不是用户代码
- 也不是单纯调度
- 它是一个额外插入的协程帧
目的只有一个:
把
stop_token / cancellation
注入进 async 调用链
所以在 SVG 里你看到一个单独标出来的:
inject…thunk- 有自己的
instr* / prev* / magic*
这是一个真实存在的 async frame,只是你没写。
5 : auto unifex::connect_awaitable(task<void>, receiver auto&)这是async 世界的“连线操作”:
connect_awaitable并不执行协程- 它做的是:
- 把一个
task和一个receiver绑在一起 - 建立continuation 关系
在 SVG 中,它对应的是那些:
- 把一个
magic*- 指向 TLS 或下一个 async frame 的箭头
可以理解为:
connect:future→continuation \text{connect} : \text{future} \rightarrow \text{continuation}connect:future→continuation
6,7,8 : unifex::task<void> async_main …这是async 世界里的“main”:
async_main本身是一个协程- 它
co_await process_file(...) - 所以它:
- 是
process_file的父协程 - 自己也有一个 async frame
你看到同一个 lambda 出现两次,是因为:
- 是
一个协程帧可以多次被 resume
但 CPU 栈帧每次都是新的
9 : auto unifex::on(scheduler, sender) 10: tag_invoke 11: _make_sender这是Unifex 的调度绑定阶段:
on(...):指定执行在哪个 scheduler(线程 / io_uring / 线程池)tag_invoke:CPO 分发(无虚函数)_make_sender:把 scheduler 信息封进 sender
这些在 SVG 中体现为:- async frame 右侧指向CPU / TLS
magic*指向 TLS 的箭头
含义是:
恢复协程时,CPU 从 TLS 拿到“该恢复哪个 async frame”
12 : promise<void>::await_transform这是C++ 语法 → Unifex 语义的桥:
- 每一个
co_await都会先走这里 - Unifex 在这里:
- 拦截 awaitable
- 改写成 sender/receiver 体系
所以这是:
语言级协程 ↔ 库级 async 框架 的交界面
13 : connect_awaitable (再次出现)说明:
async_main自己也被“更外层”连接- async 调用链继续向上传播
14–17 : main 18 : __libc_start_main这是纯同步世界:
- 正常 C 运行时启动
main调用 async runtime- 然后就不再直接控制执行流程
在 SVG 左侧,就是这些传统: ret*prev*data
三、如何把三张 SVG 合成一个理解
你这三张图其实是同一件事的不同放大级别:
1⃣ 同步栈视角(调试器)
__libc_start_main main connect_awaitable resume process_file只能看到 CPU 栈
2⃣ 混合视角(你画的主图)
- 左:同步栈(ret / prev)
- 中:跳转点(frame* / instr*)
- 右:async frame(aframe / magic)
第一次把 async frame 画出来
3⃣ 完整执行模型(最后一张)
- 多个 async frame 串成异步调用链
- 同步栈只是“恢复执行的跳板”
- TLS 保存“下一步恢复谁”
真正的 async 栈
四、一句话把所有图和栈“锁死”
CPU 栈只负责“跳转”,
真正的调用关系存在于 async frame 的 prev链中。*
或者更硬核一点:
Logical Call Stack≠Physical Call Stack \text{Logical Call Stack} \neq \text{Physical Call Stack}Logical Call Stack=Physical Call Stack
- Physical stack:你在 backtrace 里看到的
- Logical stack:你在 SVG 里画的 async frames
如果你愿意,下一步我可以: - 把这套模型抽象成一张最简“Unifex async 执行定律图”
- 或者直接对照GDB / LLDB 如何手动 walk async 栈给你一个算法级解释
一、先建立统一的名词对照(后面反复用)
在你的图里反复出现这些标记,它们不是随便画的,而是精确对应底层机制:
- ret*:返回地址(return address)
- prev*:上一帧的栈指针(frame chain)
- frame*:当前 CPU 栈帧基址(frame pointer)
- instr*:即将执行的指令地址(instruction pointer / resume point)
- aframe*:异步帧(协程帧,分配在 heap)
- magic*:Unifex 用来串 async 链的“控制指针”
- TLS magic*:线程局部存储中,当前活跃 async continuation 的入口
一句话总结关系是:
同步栈→resume异步帧(aframe)→magic*下一个异步帧 \text{同步栈} \xrightarrow{\text{resume}} \text{异步帧(aframe)} \xrightarrow{\text{magic*}} \text{下一个异步帧}同步栈resume异步帧(aframe)magic*下一个异步帧
二、最左侧:纯同步世界(__libc → main → drop)
在三张图中,左侧和右侧的长方形堆叠都是同一类东西:
CPU 的普通调用栈帧
以你图中的__libc… → main → drop为例:
- 每一个栈帧都严格符合 ABI:
ret*:函数返回后 CPU 要跳转的位置prev*:上一层栈帧data:局部变量、保存寄存器等
- CPU只认识这种结构
- 调试器(gdb / lldb)默认也只能 walk 这种链
因此在没有协程时,调用关系是一个简单链:
__libc_start_main→main→drop \text{\_\_libc\_start\_main} \rightarrow \text{main} \rightarrow \text{drop}__libc_start_main→main→drop
三、关键断点:drop 栈帧里的 frame/ aframe**
你在三张图里都画了一个特殊的drop帧,它和普通栈帧不一样:
- 上半部分仍然是同步栈结构
- 下半部分开始出现:
frame*aframe*
这正是同步 → 异步的边界点。
这里发生了什么?
当代码执行到类似:
co_awaitsome_task;编译器会生成:
- 一个协程帧(aframe),放在 heap 上
- 当前函数不再“直接调用”下一步逻辑
- 而是:
- 保存当前状态
- 调用
coroutine_handle::resume()去恢复另一个 aframe
这一步可以写成:
CPU stack frame→coroutine_handle::resumeaframe \text{CPU stack frame} \xrightarrow{\text{coroutine\_handle::resume}} \text{aframe}CPU stack framecoroutine_handle::resumeaframe
因此你看到:
frame*:告诉 CPU“我从哪个同步帧进入异步”aframe*:指向真正存储协程状态的内存块
四、中间区域:async frame(main / inject…thunk / connect_await)
你在中间画了多个小盒子,例如:
main(async)inject…thunkconnect_await
它们有一个共同特征:
它们不是 CPU 栈帧,而是 async frame
每个 async frame 都包含:
instr*:恢复时从哪条指令继续prev*:逻辑上的“上一个 async 调用者”magic*:Unifex 私有的 continuation 指针
重要区别
- 同步栈:
prev*是物理栈指针 - 异步栈:
prev*是逻辑调用关系
也就是说:
async prev*≠CPU prev* \text{async prev*} \neq \text{CPU prev*}async prev*=CPU prev*
这正是你画虚线、圆点、弯箭头的原因。
五、inject…thunk:为什么它一定存在?
你单独高亮了inject…thunk,这是非常关键的一层。
它的作用不是业务逻辑,而是:
- 把
stop_token - 把 cancellation
- 把 Unifex 的生命周期控制
注入到 async 调用链中
从模型上看,它是:
父协程→inject thunk→子协程 \text{父协程} \rightarrow \text{inject thunk} \rightarrow \text{子协程}父协程→inject thunk→子协程
所以它: - 有自己完整的
instr* / prev* / magic* - 必须出现在 async 栈中
- 但你永远不会“显式写”它
六、TLS magic*:为什么 async 还能“回来”?
在右下角,你反复画了:
TLS magic*这是Unifex 能“walk async stack”的核心秘密。
执行流程是:
- CPU 正在执行同步代码
- 某个点调用
resume() resume():- 从 TLS 里取出
magic* - 找到当前要恢复的 aframe
- 从 TLS 里取出
- 跳转到该 aframe 的
instr*
可以抽象为:
TLS.magic*⇒current async frame \text{TLS.magic*} \Rightarrow \text{current async frame}TLS.magic*⇒current async frame
因此:
- 同步栈负责“谁来调度”
- TLS 负责“调度谁”
七、三张图之间的关系(非常重要)
你这三张 SVG 并不是三种不同设计,而是:
1⃣第一张:
强调CPU ↔ async frame ↔ TLS的整体连接关系
2⃣第二张:
强调同步栈如何在 drop 点“撕开”,插入 async frame
3⃣第三张:
把左右两侧的同步栈 + 中间 async 链 + TLS全部展开
可以把它们理解为:
图 1⊂图 2⊂图 3 \text{图 1} \subset \text{图 2} \subset \text{图 3}图1⊂图2⊂图3
八、最终一句话总结(把所有图“锁死”)
调试器看到的是 CPU 的“物理栈”,
而 Unifex 真正执行的是 async frame 的“逻辑栈”。
两者通过coroutine_handle::resume()和 TLS 中的magic*连接。
或者更形式化地说:
Physical Call Stack;≠;Logical Async Call Stack \text{Physical Call Stack} ;\neq; \text{Logical Async Call Stack}Physical Call Stack;=;Logical Async Call Stack
但:
resume()+TLS.magic*;⇒;二者可相互跳转 \text{resume()} + \text{TLS.magic*} ;\Rightarrow; \text{二者可相互跳转}resume()+TLS.magic*;⇒;二者可相互跳转
一、一句话结论(先给答案)
magic* 本质上就是一个AsyncStackRoot*
它是“同步栈 ↔ 异步栈” 的锚点(anchor),用来:
把 CPU 的物理调用栈,和 Unifex 的逻辑 async 调用栈连接起来
形式化一点说:
magic*≡AsyncStackRoot* \text{magic*} \equiv \text{AsyncStackRoot*}magic*≡AsyncStackRoot*
它不是魔法,也不是黑科技,而是一个明确的数据结构入口点。
二、AsyncStackRoot:magic* 指向的“根”
你给出的结构体是整个谜题的核心:
structAsyncStackRoot{atomic<AsyncStackFrame*>topFrame;AsyncStackRoot*nextRoot;frame_ptr stackFramePtr;instruction_ptr returnAddress;};我们逐个字段解释,并说明为什么它必须存在。
1⃣topFrame—— 当前线程的 async 栈顶
atomic<AsyncStackFrame*>topFrame;含义:
- 指向当前活跃的 async frame
- 是逻辑 async 调用栈的“栈顶”
- 用
atomic是因为:- async continuation 可能跨线程恢复
- 需要 lock-free 安全更新
抽象关系是:
AsyncStackRoot.topFrame→AsyncStackFramecurrent \text{AsyncStackRoot.topFrame} \rightarrow \text{AsyncStackFrame}_\text{current}AsyncStackRoot.topFrame→AsyncStackFramecurrent
这就是你图中反复看到的:
magic* → aframe*
2⃣nextRoot—— async roots 的链表
AsyncStackRoot*nextRoot;这是一个非常容易被忽略但极其关键的字段。
它表示:
- 一个线程的同步栈上
- 可能嵌套多个 async 根
例如:
main()└─on(scheduler,task)└─sync_wait(...)每一层都会在同步栈上放一个AsyncStackRoot,于是形成:
Root0→Root1→Root2 \text{Root}_0 \rightarrow \text{Root}_1 \rightarrow \text{Root}_2Root0→Root1→Root2
这也是为什么 async 栈是“森林 + 根链表”,而不是单一栈。
3⃣stackFramePtr—— 同步栈锚点
frame_ptr stackFramePtr;这就是magic能“回到同步世界”的原因*。
它保存的是:
- CPU 的frame pointer(如 RBP)
- 指向某个真实存在的同步栈帧
于是你得到一个关键等式:
AsyncStackRoot.stackFramePtr⇒CPU stack frame \text{AsyncStackRoot.stackFramePtr} \Rightarrow \text{CPU stack frame}AsyncStackRoot.stackFramePtr⇒CPU stack frame
也就是说:
async 栈不是悬浮在空中的
它明确知道自己是从哪一个同步栈帧“分叉”出来的
4⃣returnAddress—— 回到同步代码的位置
instruction_ptr returnAddress;这是恢复同步执行所需的指令地址。
当 async 链跑完,需要“回到”:
autox=sync_wait(task);这条语句之后时,CPU 就靠:
(stackFramePtr,returnAddress) (\text{stackFramePtr}, \text{returnAddress})(stackFramePtr,returnAddress)
来完成一次非普通 return 的控制流恢复。
三、AsyncStackFrame:真正的 async “栈帧”
再看你给出的第二个结构:
structAsyncStackFrame{AsyncStackFrame*parentFrame;instruction_ptr instructionPointer;AsyncStackRoot*stackRoot;};这才是你图中 aframe* 对应的实体。
1⃣parentFrame—— 逻辑 async 调用关系
AsyncStackFrame*parentFrame;这不是 CPU 的调用关系,而是:
“是谁 co_await 了我”
即:
childFrame.parentFrame=awaitingFrame \text{childFrame.parentFrame} = \text{awaitingFrame}childFrame.parentFrame=awaitingFrame
这正是 async stack 和 sync stack 最大的本质差异:
| 对比项 | 同步栈 | 异步栈 |
|---|---|---|
| 上一帧 | 物理调用者 | 逻辑 await 者 |
| 存储位置 | CPU 栈 | heap |
| 自动 unwind | 是 | 否 |
2⃣instructionPointer—— resume 的精确位置
instruction_ptr instructionPointer;等价于:
协程挂起点的 resume 地址
当你看到:
coroutine_handle::resume()真正跳转到的就是这里。
形式化表示:
resume()⇒jump to instructionPointer \text{resume()} \Rightarrow \text{jump to instructionPointer}resume()⇒jump to instructionPointer
3⃣stackRoot—— 回到 magic*
AsyncStackRoot*stackRoot;这是双向绑定:
AsyncStackRoot.topFrame → AsyncStackFrameAsyncStackFrame.stackRoot → AsyncStackRoot
于是形成一个闭环:
Root↔Frame \text{Root} \leftrightarrow \text{Frame}Root↔Frame
这保证了:- 无论从同步侧还是异步侧
- 都能找到对方
四、所以 magic* 究竟“魔法”在哪里?
现在可以非常明确地回答你反复问的那句话:
So what was that magic?*
答案是:
magic不是“魔法指针”,而是AsyncStackRoot*,
它是 async 栈的根节点,同时锚定了一个真实的同步栈帧。*
换句话说:
magic*=AsyncStackRoot*=sync ↔ async 的连接点 \text{magic*} = \text{AsyncStackRoot*} = \text{sync ↔ async 的连接点}magic*=AsyncStackRoot*=sync ↔ async的连接点
五、Unifex 是如何“把 async stack 带进来”的?
你最后那几条 bullet point,现在可以逐条精确解释了:
Every async operation owns an AsyncStackFrame
- 每一个 sender / task
- 都在 heap 上分配一个
AsyncStackFrame - 用它记录:
- resume 点
- 逻辑父子关系
Threads executing part of an async operation have an AsyncStackRoot on a frame in their stack
- 当 async 在某个线程执行时
- 在该线程的同步栈上放一个
AsyncStackRoot - 这就是你看到的:
- TLS
- magic*
- frame*
它把 async execution锚定到具体线程、具体栈帧。
Every sender algorithm captures its return address
这是最精妙的一点。
sender 在构造 continuation 时,会:
- 捕获:
- 当前 frame pointer
- 当前 return address
- 填入
AsyncStackRoot
于是 async 执行完后,可以:
恢复 CPU 栈⇒跳回原同步代码 \text{恢复 CPU 栈} \Rightarrow \text{跳回原同步代码}恢复CPU栈⇒跳回原同步代码
而不是简单return。
六、最终总结(把所有东西锁死)
magic* 的完整身份是:
**一个存在于同步栈上的
AsyncStackRoot*,
它保存了:
- async 栈顶
- 同步栈锚点
- 返回指令地址
从而把 C++20 协程的“逻辑调用栈”安全地嫁接到传统 CPU 调用栈上。**
用一句公式收尾:
AsyncStackRoot={Async stack entrySync stack anchor \text{AsyncStackRoot} = \begin{cases} \text{Async stack entry} \\ \text{Sync stack anchor} \end{cases}AsyncStackRoot={Async stack entrySync stack anchor