在分布式消息系统的版图上,Kafka 长期占据统治地位。然而随着云原生浪潮席卷整个技术栈,企业在消息基础设施上面临的痛点愈发清晰:存储与计算耦合导致弹性扩缩容困难、多租户隔离不彻底、跨地域复制依赖外部工具、消费模型单一无法同时支撑队列和流式场景。Apache Pulsar 正是在这样的背景下,带着「云原生优先」的设计哲学进入了主流视野。
本文将从 Pulsar 的核心理念出发,系统性拆解其架构、使用场景、实操方式,并给出真实的生产落地经验。全文约 3500 字,力求深入但不堆砌术语。
一、Pulsar 是什么
Apache Pulsar 是一个开源的分布式消息与流处理平台,由 Yahoo 于 2013 年开发,2016 年捐赠给 Apache 基金会,2018 年成为顶级项目。与 Kafka 的日志模型不同,Pulsar 在架构层面实现了计算与存储分离,因此天然具备弹性伸缩、多租户和跨地域复制的特性。
可以用一句话概括其设计理念:
Pulsar = 消息发布订阅 + 分布式日志存储 + 轻量级流处理函数
二、架构层次深度拆解
2.1 三层解耦:Broker / BookKeeper / ZooKeeper
┌─────────────────────────────────────────────┐ │ Producer │ └────────────────────┬────────────────────────┘ │ ┌────────────────────▼────────────────────────┐ │ Pulsar Broker │ │ (Stateless — 无状态服务层) │ │ ┌─────────┐ ┌──────────┐ ┌─────────────┐ │ │ │Topic Svr│ │Lookup Svr│ │Load Balancer│ │ │ └─────────┘ └──────────┘ └─────────────┘ │ └────────────────────┬────────────────────────┘ │ ┌────────────────────▼────────────────────────┐ │ Apache BookKeeper │ │ (Stateful — 持久化存储层) │ │ ┌──────────┐ ┌──────────┐ ┌──────────────┐│ │ │ Bookie 1 │ │ Bookie 2 │ │ Bookie 3 ... ││ │ └──────────┘ └──────────┘ └──────────────┘│ └─────────────────────────────────────────────┘
- Broker(无状态):负责接收客户端请求、管理 Topic 元数据、路由消息到对应的 BookKeeper 节点。Broker 本身不存储任何消息数据,启动和停止近乎瞬时。
- BookKeeper(有状态):Apache 旗下的另一个顶级项目,提供低延迟、持久化的分布式日志存储。每个 Topic 分区实际对应一组 BookKeeper Ledger。
- Metadata Store:默认为 ZooKeeper,3.0 版本后支持 etcd / RocksDB 替代方案,存储集群元数据与租户命名空间配置。
这种分层架构的直接收益是独立扩缩容:当写入流量突增,只需扩容 BookKeeper 节点;当连接数暴增,只需扩容 Broker 节点。两种资源不必等比例增长,避免了 Kafka 中磁盘与 CPU 同步扩容带来的浪费。
2.2 消息存储模型:Segment 与 Ledger
Pulsar 将一个分区内消息流按时间和大小切分为多个Segment,每个 Segment 对应 BookKeeper 中的一个Ledger。Ledger 又由多个Entry组成:
Partition ├── Segment 0 → Ledger-001 (Bookie A, C, E) ├── Segment 1 → Ledger-002 (Bookie B, D, F) ├── Segment 2 → Ledger-003 (Bookie A, D, E) └── ...
为什么这样设计?因为每个 Ledger 的写入集合(Ensemble)可以不同。这意味着写入负载可以在 BookKeeper 集群中的所有节点之间动态打散,而非绑定到少数节点。Segment 滚动关闭后,旧的 Ledger 变为只读,写入移至新的 Ledger,天然实现了数据分片与热写点的消散。
2.3 消费模型:四种订阅模式
| 订阅模式 | 行为 | 典型场景 |
|---|
| Exclusive | 一个订阅只允许一个 Consumer | 严格顺序消费 |
| Failover | 多 Consumer,同一时刻仅主 Consumer 活跃,故障时切换 | 高可用顺序消费 |
| Shared | 多 Consumer 以 Round-Robin 分摊消息 | 高吞吐并发处理 |
| Key_Shared | 相同 Key 的消息发送给同一 Consumer,不同 Key 可并发 | 有序分片处理 |
其中Key_Shared模式是 Pulsar 的原创设计。Kafka 的消费者组中,一个分区只能被组内一个消费者消费,并发度上限等于分区数。而 Pulsar 的 Key_Shared 允许多个消费者共同消费同一分区,只要保证同一 Key 不被并发处理即可。这意味着你可以同时获得高并发和有序性,而不必预先增加分区数。
三、使用场景全景图
3.1 实时数据管道
某电商平台的订单数据需要同步分发到推荐系统、风控引擎、实时数仓三个下游系统。传统方案下,要么维护三套 Kafka Topic 导致数据重复存储,要么让下游轮询同一 Topic 互相干扰。
Pulsar 的方案:创建一个 Topic,为其创建三个独立的Subscription(订阅)。每个订阅有自己独立的消费位点(Cursor),下游系统各自按自己的节奏消费,互不影响。消息只存储一份,却支持多路独立消费——这就是 Topic 与 Subscription 解耦的价值。
3.2 跨地域数据同步(Geo-Replication)
Pulsar 内置了原生的跨集群复制能力,不需要额外部署 MirrorMaker 之类的外部工具。你可以在集群 A 和集群 B 之间配置双向或单向复制,Topic 级别的粒度控制,支持异步复制和同步复制两种模式。
实际案例:一家全球 SaaS 服务商在 AWS 美东和美西分别部署了 Pulsar 集群,通过 Geo-Replication 实现了 Topic 级别的双向数据同步,延迟稳定在 200ms 以内。当美东集群故障时,美西集群自动接管,整个过程对业务透明。
3.3 多租户消息平台
Pulsar 的租户模型天生适合构建企业级统一消息平台:
Tenant (业务线) └── Namespace (环境/团队) └── Topic (具体业务)
每个租户可以分配独立的存储配额、带宽限制和权限策略。配合 Pulsar Functions 的轻量计算能力,租户还可以在消息通道内部署自己的 ETL 逻辑,无需额外计算集群。这一点在企业内部数据中台场景中尤为突出——你可以用一套 Pulsar 集群同时服务订单、支付、物流、营销四个业务域,每个域独立管控。
3.4 事件驱动微服务
在微服务架构中,服务间通信面临两个难题:可靠性和可追溯性。Pulsar 的Exclusive + Key_Shared订阅组合可以精确控制消费语义,而内置的 Schema Registry 则保证了消息格式的版本兼容性。
例如用户服务发布 user.created 事件时,Schema Registry 会校验消息是否符合预设的 Avro/JSON/Protobuf Schema。下游营销服务即使延迟升级,也能通过 Schema 兼容策略正常解析旧版本消息,避免序列化异常导致的消费中断。
四、具体使用方式:从零搭建到生产运行
4.1 本地开发环境(Docker)
# 拉取 Pulsar 官方镜像并启动 docker run -it \ -p 6650:6650 \ -p 8080:8080 \ --name pulsar-standalone \ apachepulsar/pulsar:3.3.0 \ bin/pulsar standalone启动后,6650 端口为 TCP 协议端口(用于生产消费),8080 端口为 HTTP 管理端口。
4.2 创建租户与命名空间
Pulsar 要求在发送消息前先创建好逻辑结构:
# 1. 创建租户 bin/pulsar-admin tenants create my-company \ --allowed-clusters standalone # 2. 创建命名空间 bin/pulsar-admin namespaces create my-company/order-service # 3. 设置消息保留策略(保留 72 小时或 5GB) bin/pulsar-admin namespaces set-retention my-company/order-service \ --size 5G \ --time 72h4.3 Java 客户端 — Producer
PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build(); Producer<byte[]> producer = client.newProducer() .topic("persistent://my-company/order-service/order-created") .compressionType(CompressionType.LZ4) .batchingMaxMessages(1000) .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) .create(); // 发送消息并确认 MessageId msgId = producer.newMessage() .key("order-2024-001") .value("{\"orderId\":\"001\",\"amount\":99.9}".getBytes()) .property("source", "order-service") .send(); // 同步等待 Broker 确认(持久化完成) // MessageId 可用于后续追踪和去重 producer.close(); client.close();这里有个容易踩的坑:batchingMaxPublishDelay 设得太大会导致低流量场景下消息延迟骤增。建议生产环境设为 5-10ms,高吞吐场景可放宽到 20ms。
4.4 Java 客户端 — Consumer
Consumer<byte[]> consumer = client.newConsumer() .topic("persistent://my-company/order-service/order-created") .subscriptionName("fraud-detection-sub") .subscriptionType(SubscriptionType.Key_Shared) .keySharedPolicy( KeySharedPolicy.stickyHashRange() .ranges(Range.of(0, 65535)) ) .subscribe(); while (true) { Message<byte[]> msg = consumer.receive(); try { String content = new String(msg.getData()); processOrder(content); // 业务逻辑 consumer.acknowledge(msg); // 确认消费 } catch (Exception e) { consumer.negativeAcknowledge(msg); // 否定确认,触发重投 } }4.5 消息重投与死信策略
Pulsar 提供了细粒度的消息重投控制:
consumer.negativeAcknowledge(msg); // 消息将在 60s(默认 nack 重投延迟)后重新投递当一条消息重复投递达到阈值后,自动进入死信队列:
# 配置死信策略 bin/pulsar-admin namespaces set-subscription-types-enabled \ my-company/order-service \ --enable true # 查看死信队列中的消息 bin/pulsar-admin topics peek-messages \ persistent://my-company/order-service/order-created-fraud-detection-sub-DLQ \ --count 104.6 Schema 管理
Pulsar 原生支持 Schema 注册,避免序列化格式不一致导致的生产事故:
// Producer 声明 Schema Producer<OrderEvent> producer = client.newProducer(Schema.AVRO(OrderEvent.class)) .topic("persistent://my-company/order-service/order-events") .create(); // Consumer 自动校验 Consumer<OrderEvent> consumer = client.newConsumer(Schema.AVRO(OrderEvent.class)) .topic("persistent://my-company/order-service/order-events") .subscriptionName("analytics-sub") .subscribe();Schema 兼容策略包含 FORWARD(新 Schema 可读旧数据)、BACKWARD(旧 Schema 可读新数据)和 FULL(双向兼容),建议从 BACKWARD 开始,后续逐步收敛到 FULL。
五、使用优点总结
| 维度 | Pulsar 的优势 |
|---|
| 弹性伸缩 | Broker 无状态,BookKeeper 独立扩容,无需数据重平衡(Rebalance) |
| 多租户隔离 | 原生租户(Tenant)→ 命名空间(Namespace)→ Topic 三级结构,配额和权限逐层可控 |
| 多订阅模式 | 一个 Topic 多份订阅,各自维护独立位点,支持 Exclusive / Failover / Shared / Key_Shared |
| Geo-Replication | 内置跨集群异步/同步复制,Topic 级粒度控制,无需外部 Mirroring 工具 |
| 分层存储 | 冷数据自动卸载到 S3 / HDFS / GCS,降低 BookKeeper 存储成本 |
| 消息存活策略 | 可按时间 / 存储大小 / 消费确认状态灵活配置消息的 TTL、Backlog 配额和保留策略 |
| Schema Registry | 内置消息格式校验和兼容性检查,防止序列化异常级联故障 |
| 轻量计算 | Pulsar Functions 无需单独部署 Flink/Spark,直接在 Broker 上运行消息 ETL |
六、使用方式的进阶技巧
6.1 分层存储配置
随着业务发展,BookKeeper 的 SSD 空间会迅速膨胀。Pulsar 支持将已关闭的 Ledger 自动卸载到对象存储:
# 配置命名空间开启分层存储 bin/pulsar-admin namespaces set-offload-policies my-company/order-service \ --offloadThresholdInBytes 10G \ --offloadDeletionLagMs 3600000 \ --driver aws-s3 \ --region us-east-1 \ --bucket pulsar-offload-data一条消息从写入到最终存储的生命周期:
消息写入 → BookKeeper (热存储, SSD) → Segment 关闭 (5分钟或 500MB) → 超过阈值 → 卸载到 S3 (冷存储) → S3 数据满 4 小时 → 从 BookKeeper 清除
6.2 消息去重
Pulsar 支持生产端幂等发送,通过 Broker 端缓存序列 ID 来避免网络重试导致的重复:
producer.newMessage() .key("order-2024-001") .sequenceId(sequenceId.getAndIncrement()) // 单调递增 .value(data) .send();Broker 收到消息后会比对 producerName + sequenceId 组合,如果发现重复则直接丢弃这条消息但返回成功确认——避免客户端超时重试带来的重复写入。
6.3 跨集群复制实战
配置 Topic 级别的跨地域复制:
# 1. 创建全局命名空间(同时存在于两个集群) bin/pulsar-admin namespaces create my-company/global-sync \ --clusters us-east,us-west # 2. 配置 Topic 级别复制 bin/pulsar-admin topics set-replication-clusters \ persistent://my-company/global-sync/order-events \ --clusters us-east,us-west # 3. 查看复制状态 bin/pulsar-admin topics stats-internal \ persistent://my-company/global-sync/order-events七、生产经验与避坑指南
7.1 BookKeeper 容量规划
BookKeeper 的每个 Bookie 节点建议 SSD 容量不超过 4TB,因为 BookKeeper 的恢复过程需要全量扫描所有 Entry Log,单节点数据量过大会导致故障恢复耗时过长。如果业务存储需求较大,宁可增加 Bookie 节点数,保持单节点数据量在可控范围。
7.2 不要滥用 Shared 订阅
Shared 订阅下消息没有顺序保证,如果你的业务需要严格有序处理,应使用 Failover 或 Key_Shared。一个经典的错误案例:某支付系统使用 Shared 订阅处理账户余额变更,导致同一账户的加钱和减钱操作被两个 Consumer 并发执行,出现了余额计算错误。
7.3 Cursor 位点保护
Pulsar 的 Cursor(消费位点)存储在 BookKeeper 中,与消息本身同等级别持久化。但这不意味着可以无视 Cursor 的保护。生产环境中建议开启Managed Ledger 的 Cursor 快照,防止大量积压时 Cursor 追赶性能下降:
bin/pulsar-admin namespaces set-subscription-expiration-time \ my-company/order-service \ --time 168h # 7 天未活动的订阅自动清理八、展望:Pulsar 3.x 及未来
2024 年发布的 Pulsar 3.3 版本引入了多项重要特性:非持久化 Topic 的跨 Broker 负载均衡优化、Oxia 元数据服务替代 ZooKeeper、以及PIP-307 事务日志压缩。其中 Oxia 是 Pulsar 社区自研的元数据服务,旨在彻底摆脱对 ZooKeeper 的依赖——这意味着部署复杂度将大幅降低,Pulsar 将成为一个真正的「自包含」系统。
长远来看,Pulsar 的「存储计算分离 + 原生计算能力」架构使其天然适配云原生和湖仓一体架构。当越来越多的企业从「消息队列」升级为「统一事件平台」时,Pulsar 的设计哲学将展现出更强的生命力。
结语
Apache Pulsar 不是一个「Kafka 替代品」,它是为云原生时代重新设计的事件流基础设施。其架构层面的创新——计算存储分离、多协议订阅、原生 Geo-Replication——并非为了差异化而差异化,而是切实解决了大规模分布式消息系统在实际运维中遇到的痛点。
如果你正在评估消息基础设施的技术选型,或者当前的 Kafka 集群已经遇到了扩缩容和跨地域复制的瓶颈,Pulsar 值得你花一周时间深入调研和 POC。毕竟架构选型的窗口期只有一次,选择一个与你的业务增长曲线相匹配的平台,是技术负责人最重要的决策之一。(内容由AI生成,仅供参考)