上一篇【第48篇】Kafka时间轮(TimingWheel)源码解析
下一篇【第50篇】副本机制源码解析(二)——ReplicaManager的复制流程
摘要
如果有人问你:"Kafka为什么敢说自己不丢数据?“答案就两个词——副本机制(Replication)。Kafka给每条消息都准备了"备胎”,一台Broker挂了?没关系,别的Broker上还有一模一样的副本,无缝顶上去。
本文带你深入副本的"世界观":我们不会一上来就讲复杂的复制流程,而是先把副本机制的几个核心概念掰开揉碎——Replica的三种状态(Leader/Follower/Offline)是怎么切换的、LEO(Log End Offset)和HW(High Watermark)这两条"生命线"的区别、ISR这个"精英俱乐部"的入会标准、以及Leader Epoch怎么巧妙解决"HW截断"这个老问题。这些都是看懂后续ReplicaManager源码的基础,建议先打完这篇"地基"再看下一篇的复制流程。
一、为什么需要副本——没有副本的Kafka就是"裸奔"
先看一张对比图,感受一下有副本和没副本的区别:
【单副本架构——一台Broker就是你的全部身家】 Producer Consumer │ ▲ ▼ │ ┌─────────────────────────────────────┐ │ Broker 1 │ │ ┌───────────────────────────────┐ │ │ │ Partition P0 │ │ │ │ [msg0][msg1][msg2][msg3] ... │ │ │ └───────────────────────────────┘ │ └─────────────────────────────────────┘ Broker 1 宕机 → 数据丢失 → 💀💀💀 【多副本架构——每个分区都有"备胎"】 ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │ ┌───────────┐ │ │ ┌───────────┐ │ │ ┌───────────┐ │ │ │ P0 [L] │──┼──┼─►│ P0 [F] │ │ │ │ P0 [F] │ │ │ │ [m0][m1] │ │ │ │ [m0][m1] │ │ │ │ [m0][m1] │ │ │ └───────────┘ │ │ └───────────┘ │ │ └───────────┘ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ Leader Follower Follower Broker 1 宕机 → Broker 2 Follower 晋升为 Leader → 数据完好 ✅单副本模式就是"把所有鸡蛋放在一个篮子里",篮子翻了全没了。多副本模式下,每个Partition都有多个副本分布在不同的Broker上,一台机器挂了,别的机器上还有完整数据。
Kafka的副本配置很简单:
# server.properties default.replication.factor=3 # 默认副本数 # 创建Topic时指定 kafka-topics.sh --create \ --topic my-topic \ --partitions 6 \ --replication-factor 3 # 每个分区3个副本replication-factor=3意味着每个分区的消息会被复制到3个不同的Broker上,这3个副本构成一个"副本组"。
二、副本的三种状态——Leader、Follower、离线
在Kafka的副本世界里,一个副本同一时刻只能是以下三种状态之一:
【副本状态转换图】 ┌──────────┐ │ Offline │──── Broker重启或恢复 ────┐ │ 副本 │ │ └──────────┘ │ │ │ Broker宕机 │ 或磁盘故障 ▼ │ ┌──────────────────┐ ▼ │ │ (副本不可用) │ NewReplica │ │ 新副本 │ │ │ └──────┬───────────┘ │ Controller指定角色 │ ┌────────────────────────┴────────────────────────┐ │ │ ▼ ▼ ┌──────────────────┐ ┌──────────────────┐ │ LeaderReplica │ │ FollowerReplica │ │ 主副本 │◄──── Leader宕机,重新选举 ────│ 从副本 │ │ │─── Controller重新分配角色 ──►│ │ │ 负责读写请求 │ │ 只从Leader同步 │ └──────────────────┘ └──────────────────┘这些状态在Kafka源码中由Replica类管理。每个分区有一个Leader副本和多个Follower副本:
- Leader(主副本):负责处理所有读写请求,是分区的"对外窗口"
- Follower(从副本):只从Leader同步数据,不处理客户端请求,是分区的"备胎"
- Offline(离线):Broker宕机或磁盘故障时,该Broker上的副本进入离线状态
一个关键设计是:只有Leader副本处理读写,Follower副本不提供服务。这保证了数据的一致性——所有写入都在Leader上先发生,然后同步到Follower。这种"主从复制"模型虽然对Leader的单点压力大,但避免了分布式系统中常见的"多写冲突"问题,实现简单且可靠。
三、副本的双重身份——Local Replica vs Remote Replica
这是Kafka副本设计中最容易让人困惑的地方——同一个副本在不同语境下有不同的称呼。
【Local Replica vs Remote Replica】 Broker 1 视角: ┌──────────────────────────────────────────────────┐ │ Broker 1 │ │ │ │ ┌──────────────────────┐ ┌──────────────────┐ │ │ │ P0 Leader Replica │ │ P0 [Broker 2] │ │ │ │ (Local Replica) │ │ (Remote Replica) │ │ │ │ │ │ │ │ │ │ ● LEO = 1050 │ │ ● LEO = 1000 │ │ │ │ ● HW = 1000 │ │ ● 仅用于ISR计算 │ │ │ │ ● 实际存储数据 │ │ ● 不存储数据 │ │ │ └──────────────────────┘ └──────────────────┘ │ │ │ │ Local = 本机磁盘上有真实数据 │ │ Remote = 只是元数据记录,数据在其他Broker上 │ └──────────────────────────────────────────────────┘在源码层面,Replica类有一个关键方法区分两者:
// Replica.scala - 判断是否为Local副本defisLocal():Boolean={// Local副本在本机磁盘上有真实的日志数据log.isDefined// log不为None说明是Local Replica}Local副本在Broker本地有真正的日志文件(.log和.index),可以是Leader也可以是Follower。Remote副本只是一个"影子"——Leader所在的Broker用Remote Replica对象来跟踪其他Broker上Follower的同步进度(LEO、lastCaughtUpTime等),但它自己不存储数据。每个RemoteReplica都对应另一个Broker上的一个LocalReplica。
举个实际例子:Partition P0有3个副本,分布在Broker 1、2、3上,Broker 1是Leader。那么在Broker 1上:
- P0有一个Local副本(Leader角色,本地有数据)
- P0有两个Remote副本对象(记录Broker 2和Broker 3上Follower的同步状态,但不存数据)
而在Broker 2和Broker 3上,P0各有一个Local副本(Follower角色,本地有数据)。
这个设计让Leader能够精确感知每个Follower的同步进度,从而做出ISR扩容/缩容、HW推进等决策。
四、LEO与HW——两个最重要的"水位线"
如果说副本机制是Kafka的灵魂,那LEO和HW就是灵魂的两盏"红绿灯",控制着消息的可见性和可靠性。
【LEO与HW的关系图解】 Partition P0 (Leader on Broker 1) ┌────┬────┬────┬────┬────┬────┬────┬────┬────┬────┬───┬───┐ │msg │msg │msg │msg │msg │msg │msg │msg │msg │msg │ │ │ │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ 9 │ │ │ └────┴────┴────┴────┴────┴────┴────┴────┴────┴────┴───┴───┘ ▲ ▲ ▲ │ │ │ │ HW = 6 LEO = 10 │ (所有ISR已确认) (下一条消息写入位置) │ offset=0 ┌────────────────────────────────────────────────────────┐ │ 消费者只能消费 HW 之前的消息(msg0~msg5) │ │ 生产者写入的消息先追加到 LEO 位置 │ │ 当所有 ISR 副本都同步了 msg6 后,HW 推进到 7 │ └────────────────────────────────────────────────────────┘这两个水位线的正式定义:
| 概念 | 全称 | 含义 | 谁维护 | 读可见性 |
|---|---|---|---|---|
| LEO | Log End Offset | 日志末尾的下一条消息的offset,即下一条写入消息的位置 | 每个副本独立维护 | — |
| HW | High Watermark | 分区中所有ISR副本都已确认的"安全水位",消费者只能读到HW之前的消息 | Leader副本维护 | 决定消费者能看到哪些消息 |
LEO始终大于等于HW(LEO >= HW)。当Leader收到新消息后,LEO先推进,HW不变;等所有ISR副本都同步了这条消息,HW才会追上LEO。
在源码中,Replica类通过以下字段实现这两个水位线:
// Replica.scala - HW与LEO的核心定义classReplica(val brokerId:Int,val partition:Partition,time:Time=SystemTime,initialHighWatermarkValue:Long=0L,val log:Option[Log]=None)extendsLogging{// LEO:通过logEndOffset方法获取(内部调用log.logEndOffset)def logEndOffset:LogOffsetMetadata={log match{caseSome(l)=>newLogOffsetMetadata(l.logEndOffset)// Local副本caseNone=>LogOffsetMetadata.UnknownOffsetMetadata// Remote副本}}// HW:记录所有ISR副本都已确认的offset位置@volatileprivatevarhighWatermarkMetadata:LogOffsetMetadata=newLogOffsetMetadata(initialHighWatermarkValue)def highWatermark:LogOffsetMetadata=highWatermarkMetadata// 设置HW的值(只允许增大,不能回退)def highWatermark_=(newHighWatermark:LogOffsetMetadata):Unit={if(newHighWatermark.messageOffset>highWatermarkMetadata.messageOffset){highWatermarkMetadata=newHighWatermark}}}注意到highWatermark_=方法中的关键逻辑——HW只能增大,不能回退。这个约束保证了消费者看到的offset不会倒退,避免了重复消费的问题。
当Follower副本从Leader拉取消息时,需要更新自己的读取结果:
// Replica.scala - 更新Follower的读取状态defupdateLogReadResult(logReadResult:LogReadResult):Unit={// 更新LEOif(logReadResult.info.fetchOffsetMetadata.messageOffset>=this.logEndOffset.messageOffset){this.logEndOffset=logReadResult.info.fetchOffsetMetadata}// HW取Leader返回的HW值和当前本地HW的较小值if(logReadResult.hw<this.highWatermark.messageOffset){this.highWatermark=newLogOffsetMetadata(logReadResult.hw)}}| 对比维度 | LEO | HW |
|---|---|---|
| 所属副本 | 每个副本独立维护 | Leader维护全局值 |
| 变化方向 | 单调递增 | 单调递增 |
| 触发更新 | 写入新消息后立即更新 | 所有ISR副本确认后更新 |
| 消费者可见 | 否 | 是(只读HW之前的消息) |
| Follower用途 | 记录自己追到了哪里 | 确认哪些消息可提交 |
五、Leader Epoch——HW截断问题的"救火队长"
上面说了HW只能前进不能后退,但这引发了一个经典问题:
【HW截断问题的场景图解】 时刻 T0: Broker 1是Leader, LEO=10, HW=6 ┌──────────────────────────────────────┐ │ Broker1 [L] [0][1][2][3][4][5]▌[6][7][8][9] │ HW=6 │ Broker2 [F] [0][1][2][3][4][5]▌ │ HW=6 └──────────────────────────────────────┘ 时刻 T1: Broker 1挂了,Broker 2 成为新Leader ┌──────────────────────────────────────┐ │ Broker2 [L] [0][1][2][3][4][5]▌ │ HW=6, LEO=6 │ Broker1 [F] [0][1][2][3][4][5]▌[6][7][8][9] │ 旧Leader恢复 └──────────────────────────────────────┘ 时刻 T2: Broker 1 作为Follower恢复,发现自己的LEO>新Leader的LEO 需要截断到新Leader的HW=6 问题:Broker 1上的 msg6~msg9 被截断了,但它们可能已被客户端消费! (如果原Leader在当时已推进了HW)这个场景中,旧Leader恢复后变成Follower,发现自己比新Leader"多了一截尾巴",只能截断到新Leader的HW位置。但如果被截断的消息之前已经被消费者读走了,消费者重启后会发现"消息不见了",这就是消息丢失。
Leader Epoch就是为了解决这个问题而引入的。它的核心思想是:给每一任Leader打上"任期号",每次Leader变更时Epoch递增。
【Leader Epoch 工作原理】 Epoch 0 (Broker1是Leader, LEO=10, HW=4) ┌────────────────────────────────────────────────┐ │ Broker1 [L] [0][1][2][3]▌[4][5][6][7][8][9] │ HW=4 └────────────────────────────────────────────────┘ Epoch 1 (Broker1挂了,Broker2成为新Leader, LEO=6) ┌────────────────────────────────────────────────┐ │ Broker2 [L] [0][1][2][3][4][5]▌ │ HW=6, Epoch=1 │ ▲ │ Epoch Change = (1, 4) ← 记录Leader变更时的offset └────────────────────────────────────────────────┘ Broker1 恢复后: 1. 查询 Epoch=1 对应的起始offset = 4 2. 将log截断到 offset=4 (而不是HW=6) 3. 从 offset=4 开始向新Leader同步 4. 结果:msg4~msg9全部被从Leader重新拉取,避免丢失Leader Epoch在源码中的实现:
// LeaderEpochFileCache.scala - Leader Epoch核心逻辑classLeaderEpochFileCache{// epoch -> (startOffset, ...)privateval epochs:util.TreeMap[Integer,EpochEntry]=newutil.TreeMap()/** * 根据Leader Epoch确定Follower应该截断到哪个offset * * @param leaderEpoch Follower当前的Leader Epoch * @return 截断到哪个offset */defendOffsetFor(leaderEpoch:Int):Long={val entry=epochs.get(leaderEpoch)if(entry==null){// 没有找到对应epoch的记录,截断到最早的offsetundecidedOffset()}else{// 返回该Epoch结束时的offsetentry.endOffset}}// 记录新的Leader Epochdefassign(leaderEpoch:Int,startOffset:Long):Unit={val entry=newEpochEntry(leaderEpoch,startOffset)epochs.put(leaderEpoch,entry)// 清理旧的、已无用的Epoch记录truncateFromEnd(leaderEpoch)}}| 对比维度 | HW机制(旧) | Leader Epoch机制(新,0.11+) |
|---|---|---|
| 截断依据 | Leader的HW值 | Leader Epoch对应的startOffset |
| 消息丢失风险 | 高(可能截断掉已消费消息) | 低(截断到Epoch变更点的offset) |
| 实现复杂度 | 简单 | 中等(需维护Epoch→Offset映射) |
| Kafka版本 | 0.10及之前 | 0.11开始默认启用 |
| 存储位置 | 无独立存储 | leader-epoch-checkpoint文件 |
六、ISR——"步调一致"的副本俱乐部
ISR(In-Sync Replicas,同步副本集)是Kafka副本机制最核心的概念之一。它本质上是一个动态维护的"精英副本俱乐部"——只有跟得上Leader步伐的副本才能留在ISR里。
// Partition.scala - ISR的核心定义classPartition(val topic:String,val partitionId:Int,time:Time,replicaManager:ReplicaManager)extendsLogging{// ISR集合:当前所有与Leader保持同步的副本@volatilevarinSyncReplicas:Set[Replica]=Set.empty[Replica]// AR集合:Assigned Replicas,该分区被分配的所有副本@volatilevarassignedReplicas:Set[Replica]=Set.empty[Replica]// Leader副本varleaderReplicaIdOpt:Option[Int]=None}三个集合的关系非常重要:
【AR vs ISR vs OSR 的关系】 AR (Assigned Replicas) = 所有被分配了该分区副本的Broker ┌─────────────────────────────────────────────────┐ │ Broker 1 │ Broker 2 │ Broker 3 │ Broker 4│ │ [Leader] │ [Follower] │ [Follower] │[Follower]│ └─────────────────────────────────────────────────┘ ISR (In-Sync Replicas) = AR中跟上Leader步伐的副本 ┌──────────────────────────────────┐ │ Broker 1 │ Broker 2 │ Broker 3│ │ [Leader] │ [Follower] │[Follower]│ └──────────────────────────────────┘ OSR (Out-of-Sync Replicas) = AR - ISR = 掉队的副本 ┌─────────┐ │ Broker 4│ ← 网络慢 / 磁盘慢 / 挂了 → 踢出ISR └─────────┘一个副本被踢出ISR的判断标准由两个参数控制:
| 参数 | 默认值 | 含义 |
|---|---|---|
replica.lag.time.max.ms | 10000ms | Follower超过10秒没有fetch请求或没追上Leader,踢出ISR |
replica.lag.max.messages | 已废弃(0.9+) | 基于消息数判断滞后(早期Kafka的参数) |
ISR的扩容和缩容在源码中由Partition类的两个方法实现:
// Partition.scala - ISR扩容(maybeExpandIsr)defmaybeExpandIsr(replicaId:Int):Unit={// 获取或创建指定replicaId对应的副本对象val replica=getReplica(replicaId)if(replica==null)returnval leaderHWIncremented=inWriteLock(leaderIsrUpdateLock){// 条件1:该副本当前不在ISR中// 条件2:该副本的LEO >= Leader的HW(追上了Leader)if(!inSyncReplicas.contains(replica)&&replica.logEndOffset.messageOffset>=leaderReplica.highWatermark.messageOffset){// 将副本加入ISRval newInSyncReplicas=inSyncReplicas+replica inSyncReplicas=newInSyncReplicas// 记录ISR变更,触发后续的ISR-change通知isrChangeSet+=newTopicAndPartition(topic,partitionId)true}else{false}}// 如果ISR扩张,可能推进HWif(leaderHWIncremented){maybeIncrementLeaderHW(leaderReplica)}}// Partition.scala - ISR缩容(maybeShrinkIsr)defmaybeShrinkIsr(replicaMaxLagTimeMs:Long):Unit={val leaderHWIncremented=inWriteLock(leaderIsrUpdateLock){// 检查ISR中哪些Follower的lastCaughtUpTime超过了阈值val outOfSyncReplicas=inSyncReplicas.filter{replica=>replica.logEndOffset.messageOffset<leaderReplica.logEndOffset.messageOffset&&(time.milliseconds()-replica.lastCaughtUpTimeMs)>replicaMaxLagTimeMs}if(outOfSyncReplicas.nonEmpty){// 将滞后副本从ISR中移除inSyncReplicas=inSyncReplicas--outOfSyncReplicas// 记录ISR变更isrChangeSet+=newTopicAndPartition(topic,partitionId)true}else{false}}// 如果ISR收缩,可能需要回退HW(因为ISR变小了)if(leaderHWIncremented){maybeIncrementLeaderHW(leaderReplica)}}ISR缩容的关键是lastCaughtUpTimeMs,它记录的是Follower副本最后一次赶上Leader LEO的时间。如果超过replica.lag.time.max.ms还没赶上,就会被踢出ISR。这个设计很精妙:它不看"落后多少条消息",只看"落后了多长时间",避免了大流量分区中频繁的ISR抖动。
本篇小结
这篇文章帮你建立了Kafka副本机制的"世界观":
- 副本的三种状态:Leader(读写窗口)、Follower(备份跟班)、Offline(休假中),角色切换由Controller通过LeaderAndIsrRequest统一指挥
- Local vs Remote:Local副本是"住持",在磁盘上有真实数据;Remote副本是"影子",Leader用来跟踪Follower同步进度但不存数据
- LEO和HW:LEO是"写到哪里了",HW是"哪些消息安全了",消费者只能消费HW之前的消息,保证消费到的数据不会回退
- Leader Epoch:解决HW截断的"连锁灾难",用任期号精确定位截断点,避免消息丢失
- ISR俱乐部:动态管理的"精英副本集合",扩容标准是"LEO追上HW",缩容标准是"超时没跟上"
下一篇我们将接着看ReplicaManager——它怎么调度副本的角色切换、怎么管理ISR的变更、怎么协调Follower的同步线程。
上一篇【第48篇】Kafka时间轮(TimingWheel)源码解析
下一篇【第50篇】副本机制源码解析(二)——ReplicaManager的复制流程