从一个最普通的 SQL 问题说起
数据分析中最常见的需求,不是找到某一行具体的记录,而是从大量数据中提炼出规律。比如:各部门的销售总额是多少?哪些 IP 地址的请求量最高?过去一周每天的错误率有什么变化?
这类问题,在 SQL 里对应的就是聚合查询,也就是带有GROUP BY的查询。
SELECTdepartment,sum(value)FROMsalesGROUPBYdepartmentORDERBYsum(value)DESCLIMIT10这条语句的语义很直观:把所有销售记录按部门分组,计算每个部门的销售总额,取前 10 名降序返回。
然而,当这张sales表存储在 PB 级的对象存储上,由数百万个 Parquet 文件组成,分散在全球多个计算节点时,执行这条查询就不再是一件简单的事了。
Cloudflare 在 2025 年 12 月宣布,R2 SQL 正式支持聚合查询,包括GROUP BY、SUM、COUNT、HAVING等常用聚合操作。这篇博客的核心,就是讲清楚这件事在工程上是如何实现的。
聚合查询的基本逻辑:把数据装进桶
理解后续内容之前,先把聚合查询的执行模型建立清楚。
GROUP BY的核心动作是分桶:把表中所有的行,按照某个列的值分配进不同的桶。每个桶有一个标签(分组键的值),桶里装着所有具有相同标签的行。分桶完成之后,再对每个桶内的行执行聚合函数——比如sum(value)是把桶里所有行的value字段加起来,count(*)是数桶里有多少行。
ORDER BY和LIMIT进一步限定了返回结果的范围——只看聚合结果最大的前几名。
HAVING则是一个作用于聚合结果的过滤器,在分桶和计算完成之后,丢弃不满足条件的桶:
SELECTdepartment,sum(value),count(*)FROMsalesGROUPBYdepartmentHAVINGcount(*)>5ORDERBYsum(value)DESCLIMIT10这条查询的含义是:只统计销售记录超过 5 条的部门,按总销售额降序取前 10 名。
聚合查询的内在特殊性:计算时机的两难
聚合查询有一个有趣的特性:它们可以引用那些并不存储在任何地方的列。
department这一列直接存在 Parquet 文件里,读出来就能用。但sum(value)和count(*)这两列,是查询引擎在运行时动态计算出来的,文件里没有现成的值。
这个区别带来了一个执行顺序上的约束:如果查询在HAVING或ORDER BY中引用了聚合函数的结果,那么这个聚合结果必须先被计算出来,才能用它做排序或过滤。
反过来,如果查询只在SELECT子句中使用聚合函数,而不在HAVING或ORDER BY中依赖它,那么可以等到最后一刻再计算,执行过程中只维护一个中间状态,最终合并时才得出最终值。
这两种情况对应两种截然不同的执行策略。
策略一:散射-聚合(Scatter-Gather)
对于不需要根据聚合结果做排序或过滤的查询,R2 SQL 采用散射-聚合模式,整体流程与过滤查询类似:
协调者节点(Coordinator)分析查询,结合 Iceberg 元数据确定哪些 Parquet 行组(Row Group)可能包含相关数据,然后把这些行组作为独立的工作单元分发给多个工作节点(Worker)并行处理。
区别在于:工作节点不只是过滤行,还需要计算预聚合(Pre-aggregate)。
预聚合是什么?它是聚合函数的中间状态——一份对部分数据做了初步计算的不完整结果,多个预聚合可以合并,最终得到聚合函数的完整值。
几个典型的预聚合形式:
count(*)的预聚合:一个整数,表示这批数据里有多少行。最终合并时,把所有工作节点的整数加起来即可。sum(value)的预聚合:一个数值,表示这批数据里value的局部求和。最终合并时同样相加。avg(value)的预聚合:两个数值——sum(value)和count(*)。最终合并时,把所有节点的sum加总,把所有节点的count加总,再相除得到平均值。
工作节点计算完预聚合后,把结果流式传送给协调者。协调者汇总所有预聚合,计算最终值,返回给用户。
这个模式效率很高——协调者收到的每个节点的数据量都很小,整体内存占用与 R2 中存储的数据量无关。
散射-聚合的致命缺陷
然而,一旦查询引入了基于聚合结果的ORDER BY,散射-聚合就会给出错误的答案。
考虑这个查询:找出销售额最高的前 2 个部门。
SELECTdepartment,sum(sales)FROMsalesGROUPBYdepartmentORDERBYsum(sales)DESCLIMIT2要确定全局前 2 名,必须知道每个部门在全部数据上的销售总额。但 Parquet 文件是随机分布在各工作节点上的,同一个部门的销售记录可能分散在许多不同的文件里,也就分散在不同的工作节点上。
假设"工程部"在全局是销售额第一,但因为它的记录均匀分布在 10 个工作节点上,在每个节点上看起来销售额都不突出,无法进入该节点的本地前 2 名。这样一来,协调者收到的结果里根本没有"工程部"的数据,全局第一名就这样被错误地丢弃了。
这不是实现质量的问题,而是散射-聚合这个模式本身的边界:当排序依赖全局聚合结果时,局部聚合结果无法代表全局。
策略二:洗牌聚合(Shuffling Aggregation)
要解决这个问题,需要在最终聚合之前,先把同一分组键的所有数据汇聚到同一个节点上——这就是洗牌(Shuffle)。
确定性哈希分区:数据主动找到归属节点
洗牌的路由机制依赖确定性哈希分区:对于每一行数据,工作节点对其GROUP BY列的值做哈希运算,哈希结果决定这行数据应该被发送到哪个工作节点。
关键在于"确定性"三个字——相同的输入永远映射到相同的输出。所有工作节点使用同一个哈希函数,无需任何中心化注册表,就能各自独立地得出一致的路由决策。如果"工程部"哈希到节点 5,那么集群里每一个工作节点都知道,凡是遇到"工程部"的行,就发给节点 5。
洗牌完成之后,节点 5 持有全局所有"工程部"的记录,可以独立计算出准确的全局销售总额。
同步屏障:确保数据到齐再计算
洗牌引入了一个时序依赖:如果节点 5 在节点 3 还没把"工程部"的数据发完之前就开始计算,结果就是不完整的。
解决方案是一个严格的同步屏障(Synchronization Barrier)。协调者追踪整个集群的进度,各工作节点通过 gRPC 流把待发数据全部冲刷(Flush)出去。只有当每个工作节点都向协调者确认"我的输入文件已处理完毕,所有数据已发送",协调者才下达继续执行的指令。
这个屏障保证了:当下一阶段开始时,每个节点持有的数据集是完整且准确的。
本地最终化:把计算下推到工作节点
同步屏障之后,每个工作节点都持有其负责分组的完整数据,可以在本地独立完成最终计算。这个时候可以做的事情很多:
- 本地计算聚合函数的最终值;
- 本地执行
HAVING过滤,丢弃不满足条件的分组; - 本地排序。
这些操作原本在散射-聚合模式中是协调者的负担,现在被分散到了各工作节点上并行完成,避免了协调者成为单点瓶颈。
流式归并:协调者只做轻量合并
最后,协调者的角色从"重计算节点"变成了"轻量合并节点"。
由于各工作节点已经在本地完成了聚合和排序,协调者只需要做一次k 路归并(k-way merge):同时从所有工作节点拉取已排好序的结果流,逐行比较,按照排序规则挑选"胜出者",组成最终结果。
对于带有LIMIT的查询,这个设计格外高效——协调者凑够所需的行数后立即停止,不需要把所有剩余数据都加载进内存。
两种策略的适用边界
| 特征 | 散射-聚合(Scatter-Gather) | 洗牌聚合(Shuffling) |
|---|---|---|
| 适用场景 | SELECT中含聚合,但HAVING/ORDER BY不依赖聚合结果 | HAVING或ORDER BY依赖聚合结果 |
| 协调者压力 | 极低(只合并小数据) | 低(只做 k 路归并) |
| 工作节点通信 | 节点→协调者,单向 | 节点→节点,全量交换(All-to-All) |
| 正确性保证 | 高基数分组键下无法保证排序正确性 | 始终正确 |
适合LIMIT | 是 | 是,归并阶段即可提前终止 |
总结
这篇博客看起来是在宣布一个产品功能的上线,但它真正有价值的地方,是把分布式聚合查询这个数据库领域的经典难题,用一种清晰的方式讲了出来。
散射-聚合和洗牌聚合不是 R2 SQL 的独创,Spark、Trino、Flink 里都有类似的概念。R2 SQL 的特殊之处在于:它把这套分布式执行能力,建立在完全无服务器的基础设施上,运行在 Cloudflare 遍布全球的边缘网络里,不需要用户预置任何集群,用完即释放。
两种策略背后的核心判断逻辑值得单独记住:如果聚合结果需要在后续步骤中被引用(排序、过滤),就不能依赖局部聚合——必须先把数据按分组键聚拢,才能保证结果的正确性。这是一个与具体实现无关的、分布式聚合计算的基本原则。
R2 SQL 的聚合查询支持已正式上线,可通过官方文档查看语法参考。
参考来源:Cloudflare Blog — “Announcing support for GROUP BY, SUM, and other aggregation queries in R2 SQL”