以下内容是Apache Kafka中ReplicaManager类(或其子类)的一部分,主要负责管理副本(replica)的状态、日志、高水位(High Watermark)、故障处理、选举等核心功能。下面我将逐段解释其作用和逻辑,帮助你理解整体设计。
1.leaderPartitionsIterator
privatedefleaderPartitionsIterator:Iterator[Partition]=nonOfflinePartitionsIterator.filter(_.leaderLogIfLocal.isDefined)- 作用:返回当前 broker 上作为Leader的所有分区(Partition)的迭代器。
- 关键点:
nonOfflinePartitionsIterator:遍历所有未下线的分区。.leaderLogIfLocal.isDefined:表示该分区在本机有 Leader 日志(即本机是该分区的 Leader)。
✅ 简单说:找出本机是 Leader 的那些分区。
2.getLogEndOffset
defgetLogEndOffset(topicPartition:TopicPartition):Option[Long]=nonOfflinePartition(topicPartition).flatMap(_.leaderLogIfLocal.map(_.logEndOffset))- 作用:获取指定分区的日志末端偏移量(Log End Offset, LEO)。
- 前提:该分区必须在线,且本机是 Leader。
- 返回
None表示该分区不在线,或者本机不是 Leader。
✅ 用于读取当前 Leader 分区写入的最新 offset。
3.checkpointHighWatermarks
defcheckpointHighWatermarks():Unit={...}- 作用:将所有分区的高水位(High Watermark, HW)持久化到磁盘上的
highwatermark文件中。 - 流程:
- 遍历所有非下线分区(包括主日志
log和未来日志futureLog,后者用于副本迁移)。 - 按日志目录(
logDir)分组,收集每个分区的 HW。 - 调用
checkpoints.write(hws)将 HW 写入对应目录的 checkpoint 文件。
- 遍历所有非下线分区(包括主日志
- 异常处理:若写入失败(如磁盘损坏),记录错误。
✅ 保证 broker 重启后能恢复正确的 HW,避免数据重复消费。
4.markPartitionOffline(仅测试用)
defmarkPartitionOffline(tp:TopicPartition):Unit=...- 作用:将指定分区标记为Offline(下线)。
- 清除相关指标(metrics)。
- 注意:注释说明“仅用于测试”,生产环境应通过日志目录故障等方式触发下线。
5.handleLogDirFailure
defhandleLogDirFailure(dir:String,sendZkNotification:Boolean=true):Unit={...}- 作用:当某个日志目录(磁盘)发生故障时,执行故障隔离。
- 关键步骤:
- 找出该目录下的所有主日志分区(
newOfflinePartitions)和未来日志分区(partitionsWithOfflineFutureReplica)。 - 停止对这些分区的副本拉取(fetcher)和日志迁移(alter log dirs)。
- 移除 future log(如果存在)。
- 调用
markPartitionOffline将主日志分区标记为离线。 - 清理相关指标和 high watermark checkpoint。
- 通知 ZooKeeper(或 KRaft 控制器)该 broker 的日志目录故障。
- 调用
logManager.handleLogDirFailure进一步处理(如标记目录 offline)。
- 找出该目录下的所有主日志分区(
✅ 核心容错机制:磁盘故障 → 自动隔离受影响分区 → 通知集群 → 触发副本重平衡。
6.removeMetrics
defremoveMetrics():Unit={...}- 作用:清理与副本管理相关的 JMX 指标(如 Leader 数、Under Replicated 分区数等)。
- 在 shutdown 时调用,避免指标残留。
7.shutdown
defshutdown(checkpointHW:Boolean=true):Unit={...}- 作用:优雅关闭
ReplicaManager。 - 流程:
- 清理指标。
- 关闭各种管理器(fetcher、log dir alter、延迟操作 purgatory)。
- 可选:持久化高水位(默认开启,测试时可关闭)。
- 关闭副本选择器(ReplicaSelector)。
✅ 确保 broker 关闭时状态一致,避免数据丢失。
8. 工厂方法(用于可扩展性)
protecteddefcreateReplicaFetcherManager(...)=...protecteddefcreateReplicaAlterLogDirsManager(...)=...protecteddefcreateReplicaSelector()=...- 允许子类重写,自定义 fetcher、log dir manager 或副本选择策略(如 rack-aware)。
9.lastOffsetForLeaderEpoch
deflastOffsetForLeaderEpoch(...):Map[...]={...}- 作用:响应
OffsetsForLeaderEpoch请求(用于副本同步、事务恢复等)。 - 根据分区状态返回:
- Online:调用分区的
lastOffsetForLeaderEpoch(查找指定 epoch 的最大 offset)。 - Offline:返回
KAFKA_STORAGE_ERROR。 - None(未知分区):返回
UNKNOWN_TOPIC_OR_PARTITION或NOT_LEADER_OR_FOLLOWER。
- Online:调用分区的
✅ 支持精确的 epoch-based offset 查询,防止日志截断(log truncation)问题。
10.electLeaders
defelectLeaders(controller,partitions,electionType,callback,timeout)={...}- 作用:触发分区 Leader 选举(如 Preferred Leader Election 或 Unclean Leader Election)。
- 流程:
- 调用
controller.electLeaders发起选举。 - 选举结果回调中:
- 若有成功选举的分区,放入
DelayedElectLeader并加入delayedElectLeaderPurgatory等待 ISR 同步或超时。 - 若全部失败,立即返回错误。
- 若有成功选举的分区,放入
- 调用
- 使用DelayedOperationPurgatory机制实现异步等待。
✅ 支持控制器驱动的 Leader 重选举,提升可用性。
总结:这段代码的核心职责
| 功能 | 说明 |
|---|---|
| Leader 管理 | 识别本机 Leader 分区,提供 LEO/HW |
| 持久化 | 定期 checkpoint 高水位 |
| 容错 | 处理磁盘故障,自动下线分区 |
| 指标 | 暴露副本健康状态 |
| 关闭 | 优雅 shutdown,保证一致性 |
| 扩展性 | 可插拔的 fetcher / selector |
| 协议支持 | 支持 LeaderEpoch 查询、Leader 选举 |
如果你是在阅读 Kafka 源码、调试副本问题、或开发自定义存储层,理解这些方法非常关键。需要更深入某一部分(比如DelayedElectLeader或ReplicaFetcherManager),可以继续提问!