前言
你有没有遇到过这种情况:
服务器CPU突然飙到100%,top 里看到一个进程,但不知道它到底在干什么。用 gdb attach 上去,程序卡住;用 strace,输出太多看不清。
你需要一张火焰图。
今天,我们动手写一个火焰图生成工具,彻底搞懂:
· 如何采样程序的调用栈
· 如何把采样数据变成可视化图片
· 怎么定位CPU的"热点函数"
---
一、火焰图原理
1. 核心思路
火焰图不是"实时监控",而是采样统计:
```
每隔10毫秒 → 记录当前程序在哪个函数里 → 重复一万次 → 画出统计结果
```
函数在采样中出现的次数越多,它在图上就越"胖",说明它占用的CPU时间越多。
2. 一张图看懂火焰图
```
▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄▄
▌ main █
▌ ├─ parse_config █
▌ │ └─ strcmp █
▌ └─ process_request █
▌ ├─ malloc █
▌ ├─ memcpy █
▌ └─ hash_compute ← 这个函数最宽,是瓶颈
```
· Y轴:调用栈深度(下面是被调用的,上面是调用者)
· X轴:不是时间线,是样本占比
· 宽度:函数占用CPU的时间比例
3. 需要的三个工具
工具 作用
采样器 每隔N毫秒记录一次调用栈
折叠器 把调用栈去重、计数、格式化
绘图器 把数据画成SVG图片
我们手写前两个,用现成的 FlameGraph 绘图。
---
二、完整代码实现
1. 采样器:用 ptrace 抓调用栈
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/ptrace.h>
#include <sys/wait.h>
#include <sys/user.h>
#include <execinfo.h>
#define SAMPLE_INTERVAL_US 10000 // 10毫秒采样一次
#define MAX_FRAMES 64
typedef struct {
void *addrs[MAX_FRAMES];
int depth;
int count; // 相同调用栈出现的次数
} stack_sample_t;
static stack_sample_t *samples = NULL;
static int sample_count = 0;
static int sample_capacity = 0;
// 添加或更新调用栈记录
void add_or_update_stack(void **addrs, int depth) {
for (int i = 0; i < sample_count; i++) {
if (samples[i].depth == depth) {
int match = 1;
for (int j = 0; j < depth; j++) {
if (samples[i].addrs[j] != addrs[j]) {
match = 0;
break;
}
}
if (match) {
samples[i].count++;
return;
}
}
}
// 新调用栈
if (sample_count >= sample_capacity) {
sample_capacity = sample_capacity ? sample_capacity * 2 : 1024;
samples = realloc(samples, sample_capacity * sizeof(stack_sample_t));
}
samples[sample_count].depth = depth;
samples[sample_count].count = 1;
for (int i = 0; i < depth; i++) {
samples[sample_count].addrs[i] = addrs[i];
}
sample_count++;
}
// 获取目标进程的调用栈(通过ptrace)
int get_callstack(pid_t target, void **buffer, int max_depth) {
struct user_regs_struct regs;
if (ptrace(PTRACE_GETREGS, target, NULL, ®s) == -1) {
return 0;
}
// x86_64: RSP是栈指针,RIP是指令指针
unsigned long rbp = regs.rbp;
unsigned long rip = regs.rip;
buffer[0] = (void*)rip;
int depth = 1;
// 沿着RBP链表向上回溯
for (int i = 1; i < max_depth && rbp != 0; i++) {
unsigned long ret_addr;
// 读取RBP+8处的返回地址
ret_addr = ptrace(PTRACE_PEEKDATA, target, rbp + 8, NULL);
if (ret_addr == 0 || ret_addr == (unsigned long)-1) break;
buffer[i] = (void*)ret_addr;
depth++;
// 移动到上一个栈帧
rbp = ptrace(PTRACE_PEEKDATA, target, rbp, NULL);
}
return depth;
}
// 采样器主循环
void sampler(pid_t target) {
printf("开始采样,目标PID: %d\n", target);
while (1) {
// 暂停目标进程
if (ptrace(PTRACE_ATTACH, target, NULL, NULL) == -1) {
perror("ptrace attach");
break;
}
waitpid(target, NULL, 0);
// 获取调用栈
void *stack[MAX_FRAMES];
int depth = get_callstack(target, stack, MAX_FRAMES);
if (depth > 0) {
add_or_update_stack(stack, depth);
}
// 恢复运行
ptrace(PTRACE_DETACH, target, NULL, NULL);
// 等待下一次采样
usleep(SAMPLE_INTERVAL_US);
}
}
int main(int argc, char *argv[]) {
if (argc < 2) {
fprintf(stderr, "用法: %s <PID>\n", argv[0]);
return 1;
}
pid_t target = atoi(argv[1]);
// 设置采样时间(默认10秒)
alarm(10);
signal(SIGALRM, exit);
sampler(target);
// 输出折叠格式的数据
for (int i = 0; i < sample_count; i++) {
// 打印调用栈,用分号分隔
for (int j = samples[i].depth - 1; j >= 0; j--) {
// 这里应该用 addr2line 解析符号
// 简化版:直接输出地址
printf("%lx", (unsigned long)samples[i].addrs[j]);
if (j > 0) printf(";");
}
printf(" %d\n", samples[i].count);
}
return 0;
}
```
2. 折叠器:简化版(用 addr2line 解析符号)
```bash
#!/bin/bash
# fold_stacks.sh
# 从采样器输出读取,解析符号
while read line; do
stack=$(echo "$line" | awk '{print $1}')
count=$(echo "$line" | awk '{print $2}')
# 用 addr2line 把地址转成函数名
folded=""
IFS=';' read -ra addrs <<< "$stack"
for addr in "${addrs[@]}"; do
func=$(addr2line -e "$1" -f "$addr" 2>/dev/null | head -1)
if [ -z "$func" ]; then
func="$addr"
fi
folded="${folded};${func}"
done
folded="${folded#;}" # 去掉开头的分号
# 输出折叠格式
echo "$folded $count"
done
```
3. 简化版:不用ptrace,用 backtrace 采样自己
如果你的目标是分析自己的程序,可以更简单:
```c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <execinfo.h>
#include <string.h>
#include <time.h>
#define MAX_SAMPLES 10000
#define MAX_DEPTH 64
typedef struct {
char stack[1024];
int count;
} sample_t;
sample_t samples[MAX_SAMPLES];
int sample_count = 0;
// 信号处理函数:记录调用栈
void sample_handler(int sig) {
void *buffer[MAX_DEPTH];
int depth = backtrace(buffer, MAX_DEPTH);
// 跳过 signal handler 自身
depth -= 2;
// 构建字符串表示
char stack_str[1024] = {0};
for (int i = 2; i < depth + 2; i++) {
char addr_str[32];
snprintf(addr_str, sizeof(addr_str), "%p", buffer[i]);
if (i > 2) strcat(stack_str, ";");
strcat(stack_str, addr_str);
}
// 查找或添加
for (int i = 0; i < sample_count; i++) {
if (strcmp(samples[i].stack, stack_str) == 0) {
samples[i].count++;
return;
}
}
// 新调用栈
strcpy(samples[sample_count].stack, stack_str);
samples[sample_count].count = 1;
sample_count++;
}
int main(int argc, char *argv[]) {
// 设置定时器
struct itimerval timer;
timer.it_value.tv_sec = 0;
timer.it_value.tv_usec = 10000; // 10毫秒
timer.it_interval.tv_sec = 0;
timer.it_interval.tv_usec = 10000;
signal(SIGPROF, sample_handler);
setitimer(ITIMER_PROF, &timer, NULL);
// 这里放你的业务代码
printf("开始采样,按Ctrl+C结束\n");
sleep(60); // 采样60秒
// 输出结果
for (int i = 0; i < sample_count; i++) {
printf("%s %d\n", samples[i].stack, samples[i].count);
}
return 0;
}
```
---
三、生成火焰图的完整流程
步骤1:编译目标程序(带调试符号)
```bash
gcc -g -O2 myprogram.c -o myprogram
```
步骤2:运行采样器
```bash
# 方法A:分析其他进程
sudo ./sampler $(pidof myprogram) > stacks.txt
# 方法B:分析自己
./self_sampler 2> stacks.txt
```
步骤3:折叠调用栈
```bash
./fold_stacks.sh myprogram < stacks.txt > folded.txt
```
步骤4:生成火焰图
```bash
git clone https://github.com/brendangregg/FlameGraph
./FlameGraph/flamegraph.pl folded.txt > flame.svg
```
步骤5:查看结果
用浏览器打开 flame.svg,点击函数可以缩放。
---
四、实战案例:定位CPU热点
问题程序
```c
// cpu_hotspot.c
#include <stdio.h>
#include <string.h>
void hash_compute(char *data, int len) {
// 故意写一个很慢的哈希
for (int i = 0; i < len; i++) {
for (int j = 0; j < 1000; j++) {
data[i] ^= (j * i) & 0xff;
}
}
}
void process_request(char *data, int len) {
hash_compute(data, len); // 瓶颈在这里
}
int main() {
char buffer[1024];
while (1) {
memset(buffer, 'A', sizeof(buffer));
process_request(buffer, sizeof(buffer));
}
return 0;
}
```
生成火焰图
```bash
gcc -g cpu_hotspot.c -o hotspot
./self_sampler 2> stacks.txt # 跑几秒后Ctrl+C
./flamegraph.pl stacks.txt > hotspot.svg
```
火焰图解读
```
main
└─ process_request
└─ hash_compute ← 这个函数占了90%的宽度,是瓶颈
```
结论:优化 hash_compute,或者用更快的哈希算法。
---
五、常见问题与优化
问题1:采样开销太大
采样频率 性能影响
1000次/秒 CPU占用增加5-10%
100次/秒 基本无感
建议:生产环境用100Hz就够了。
问题2:符号解析不到
```bash
# 确保程序编译时带 -g
gcc -g program.c -o program
# 如果被strip过,需要恢复
objdump --demangle -d program | grep function_name
```
问题3:权限不足
ptrace 需要root权限,或者:
```bash
# 允许非root使用ptrace
echo 0 > /proc/sys/kernel/yama/ptrace_scope
```
问题4:多线程程序
上面的代码只采样主线程。要采样所有线程:
```c
// 遍历 /proc/pid/task/
DIR *dir = opendir("/proc/12345/task");
struct dirent *entry;
while ((entry = readdir(dir))) {
pid_t tid = atoi(entry->d_name);
if (tid > 0) {
// 对每个线程采样
sample_thread(tid);
}
}
```
---
六、进阶:perf + 火焰图
手写采样器适合学习,生产环境推荐用 perf:
```bash
# 采样
perf record -F 99 -g -p 12345 -- sleep 30
# 生成火焰图
perf script | ./FlameGraph/stackcollapse-perf.pl | \
./FlameGraph/flamegraph.pl > perf_flame.svg
```
perf 的优势:
· 内核级采样,开销更小
· 支持符号解析
· 可以同时采样CPU、内存、磁盘I/O
---
七、完整的火焰图生成脚本
```bash
#!/bin/bash
# quick_flame.sh - 一键生成火焰图
PID=$1
DURATION=${2:-10}
OUTPUT=${3:-flame.svg}
if [ -z "$PID" ]; then
echo "用法: $0 <PID> [时长秒数] [输出文件名]"
exit 1
fi
echo "采样进程 $PID,时长 ${DURATION} 秒..."
# 采样
perf record -F 99 -g -p $PID -- sleep $DURATION
# 生成折叠数据
perf script | stackcollapse-perf.pl > out.folded
# 生成火焰图
flamegraph.pl out.folded > $OUTPUT
echo "火焰图已生成: $OUTPUT"
```
---
结语
通过这篇文章,你学会了:
· 火焰图的原理(采样 + 统计 + 可视化)
· 如何用 ptrace 手写一个采样器
· 如何用 backtrace 采样自己的程序
· 完整的火焰图生成流程
· 用 perf 快速定位CPU热点
下次CPU飙高,不要再 top 乱看了。生成一张火焰图,凶手一目了然。
下一篇预告:《手写一个简单的性能监控工具:CPU、内存、网络一网打尽》
---
评论区留下你用火焰图抓到过的"凶手"~