news 2026/6/13 13:32:43

【Kafka源码解读和使用指南】第49篇:Kafka副本机制源码解析(一)——副本的“世界观“

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【Kafka源码解读和使用指南】第49篇:Kafka副本机制源码解析(一)——副本的“世界观“

上一篇【第48篇】Kafka时间轮(TimingWheel)源码解析
下一篇【第50篇】副本机制源码解析(二)——ReplicaManager的复制流程


摘要

如果有人问你:"Kafka为什么敢说自己不丢数据?“答案就两个词——副本机制(Replication)。Kafka给每条消息都准备了"备胎”,一台Broker挂了?没关系,别的Broker上还有一模一样的副本,无缝顶上去。

本文带你深入副本的"世界观":我们不会一上来就讲复杂的复制流程,而是先把副本机制的几个核心概念掰开揉碎——Replica的三种状态(Leader/Follower/Offline)是怎么切换的、LEO(Log End Offset)和HW(High Watermark)这两条"生命线"的区别、ISR这个"精英俱乐部"的入会标准、以及Leader Epoch怎么巧妙解决"HW截断"这个老问题。这些都是看懂后续ReplicaManager源码的基础,建议先打完这篇"地基"再看下一篇的复制流程。


一、为什么需要副本——没有副本的Kafka就是"裸奔"

先看一张对比图,感受一下有副本和没副本的区别:

【单副本架构——一台Broker就是你的全部身家】 Producer Consumer │ ▲ ▼ │ ┌─────────────────────────────────────┐ │ Broker 1 │ │ ┌───────────────────────────────┐ │ │ │ Partition P0 │ │ │ │ [msg0][msg1][msg2][msg3] ... │ │ │ └───────────────────────────────┘ │ └─────────────────────────────────────┘ Broker 1 宕机 → 数据丢失 → 💀💀💀 【多副本架构——每个分区都有"备胎"】 ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │ ┌───────────┐ │ │ ┌───────────┐ │ │ ┌───────────┐ │ │ │ P0 [L] │──┼──┼─►│ P0 [F] │ │ │ │ P0 [F] │ │ │ │ [m0][m1] │ │ │ │ [m0][m1] │ │ │ │ [m0][m1] │ │ │ └───────────┘ │ │ └───────────┘ │ │ └───────────┘ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ Leader Follower Follower Broker 1 宕机 → Broker 2 Follower 晋升为 Leader → 数据完好 ✅

单副本模式就是"把所有鸡蛋放在一个篮子里",篮子翻了全没了。多副本模式下,每个Partition都有多个副本分布在不同的Broker上,一台机器挂了,别的机器上还有完整数据。

Kafka的副本配置很简单:

# server.properties default.replication.factor=3 # 默认副本数 # 创建Topic时指定 kafka-topics.sh --create \ --topic my-topic \ --partitions 6 \ --replication-factor 3 # 每个分区3个副本

replication-factor=3意味着每个分区的消息会被复制到3个不同的Broker上,这3个副本构成一个"副本组"。


二、副本的三种状态——Leader、Follower、离线

在Kafka的副本世界里,一个副本同一时刻只能是以下三种状态之一:

【副本状态转换图】 ┌──────────┐ │ Offline │──── Broker重启或恢复 ────┐ │ 副本 │ │ └──────────┘ │ │ │ Broker宕机 │ 或磁盘故障 ▼ │ ┌──────────────────┐ ▼ │ │ (副本不可用) │ NewReplica │ │ 新副本 │ │ │ └──────┬───────────┘ │ Controller指定角色 │ ┌────────────────────────┴────────────────────────┐ │ │ ▼ ▼ ┌──────────────────┐ ┌──────────────────┐ │ LeaderReplica │ │ FollowerReplica │ │ 主副本 │◄──── Leader宕机,重新选举 ────│ 从副本 │ │ │─── Controller重新分配角色 ──►│ │ │ 负责读写请求 │ │ 只从Leader同步 │ └──────────────────┘ └──────────────────┘

这些状态在Kafka源码中由Replica类管理。每个分区有一个Leader副本和多个Follower副本:

  • Leader(主副本):负责处理所有读写请求,是分区的"对外窗口"
  • Follower(从副本):只从Leader同步数据,不处理客户端请求,是分区的"备胎"
  • Offline(离线):Broker宕机或磁盘故障时,该Broker上的副本进入离线状态

一个关键设计是:只有Leader副本处理读写,Follower副本不提供服务。这保证了数据的一致性——所有写入都在Leader上先发生,然后同步到Follower。这种"主从复制"模型虽然对Leader的单点压力大,但避免了分布式系统中常见的"多写冲突"问题,实现简单且可靠。


三、副本的双重身份——Local Replica vs Remote Replica

这是Kafka副本设计中最容易让人困惑的地方——同一个副本在不同语境下有不同的称呼

【Local Replica vs Remote Replica】 Broker 1 视角: ┌──────────────────────────────────────────────────┐ │ Broker 1 │ │ │ │ ┌──────────────────────┐ ┌──────────────────┐ │ │ │ P0 Leader Replica │ │ P0 [Broker 2] │ │ │ │ (Local Replica) │ │ (Remote Replica) │ │ │ │ │ │ │ │ │ │ ● LEO = 1050 │ │ ● LEO = 1000 │ │ │ │ ● HW = 1000 │ │ ● 仅用于ISR计算 │ │ │ │ ● 实际存储数据 │ │ ● 不存储数据 │ │ │ └──────────────────────┘ └──────────────────┘ │ │ │ │ Local = 本机磁盘上有真实数据 │ │ Remote = 只是元数据记录,数据在其他Broker上 │ └──────────────────────────────────────────────────┘

在源码层面,Replica类有一个关键方法区分两者:

// Replica.scala - 判断是否为Local副本defisLocal():Boolean={// Local副本在本机磁盘上有真实的日志数据log.isDefined// log不为None说明是Local Replica}

Local副本在Broker本地有真正的日志文件(.log.index),可以是Leader也可以是Follower。Remote副本只是一个"影子"——Leader所在的Broker用Remote Replica对象来跟踪其他Broker上Follower的同步进度(LEO、lastCaughtUpTime等),但它自己不存储数据。每个RemoteReplica都对应另一个Broker上的一个LocalReplica

举个实际例子:Partition P0有3个副本,分布在Broker 1、2、3上,Broker 1是Leader。那么在Broker 1上:

  • P0有一个Local副本(Leader角色,本地有数据)
  • P0有两个Remote副本对象(记录Broker 2和Broker 3上Follower的同步状态,但不存数据)

而在Broker 2和Broker 3上,P0各有一个Local副本(Follower角色,本地有数据)。

这个设计让Leader能够精确感知每个Follower的同步进度,从而做出ISR扩容/缩容、HW推进等决策。


四、LEO与HW——两个最重要的"水位线"

如果说副本机制是Kafka的灵魂,那LEO和HW就是灵魂的两盏"红绿灯",控制着消息的可见性和可靠性。

【LEO与HW的关系图解】 Partition P0 (Leader on Broker 1) ┌────┬────┬────┬────┬────┬────┬────┬────┬────┬────┬───┬───┐ │msg │msg │msg │msg │msg │msg │msg │msg │msg │msg │ │ │ │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ 9 │ │ │ └────┴────┴────┴────┴────┴────┴────┴────┴────┴────┴───┴───┘ ▲ ▲ ▲ │ │ │ │ HW = 6 LEO = 10 │ (所有ISR已确认) (下一条消息写入位置) │ offset=0 ┌────────────────────────────────────────────────────────┐ │ 消费者只能消费 HW 之前的消息(msg0~msg5) │ │ 生产者写入的消息先追加到 LEO 位置 │ │ 当所有 ISR 副本都同步了 msg6 后,HW 推进到 7 │ └────────────────────────────────────────────────────────┘

这两个水位线的正式定义:

概念全称含义谁维护读可见性
LEOLog End Offset日志末尾的下一条消息的offset,即下一条写入消息的位置每个副本独立维护
HWHigh Watermark分区中所有ISR副本都已确认的"安全水位",消费者只能读到HW之前的消息Leader副本维护决定消费者能看到哪些消息

LEO始终大于等于HW(LEO >= HW)。当Leader收到新消息后,LEO先推进,HW不变;等所有ISR副本都同步了这条消息,HW才会追上LEO。

在源码中,Replica类通过以下字段实现这两个水位线:

// Replica.scala - HW与LEO的核心定义classReplica(val brokerId:Int,val partition:Partition,time:Time=SystemTime,initialHighWatermarkValue:Long=0L,val log:Option[Log]=None)extendsLogging{// LEO:通过logEndOffset方法获取(内部调用log.logEndOffset)def logEndOffset:LogOffsetMetadata={log match{caseSome(l)=>newLogOffsetMetadata(l.logEndOffset)// Local副本caseNone=>LogOffsetMetadata.UnknownOffsetMetadata// Remote副本}}// HW:记录所有ISR副本都已确认的offset位置@volatileprivatevarhighWatermarkMetadata:LogOffsetMetadata=newLogOffsetMetadata(initialHighWatermarkValue)def highWatermark:LogOffsetMetadata=highWatermarkMetadata// 设置HW的值(只允许增大,不能回退)def highWatermark_=(newHighWatermark:LogOffsetMetadata):Unit={if(newHighWatermark.messageOffset>highWatermarkMetadata.messageOffset){highWatermarkMetadata=newHighWatermark}}}

注意到highWatermark_=方法中的关键逻辑——HW只能增大,不能回退。这个约束保证了消费者看到的offset不会倒退,避免了重复消费的问题。

当Follower副本从Leader拉取消息时,需要更新自己的读取结果:

// Replica.scala - 更新Follower的读取状态defupdateLogReadResult(logReadResult:LogReadResult):Unit={// 更新LEOif(logReadResult.info.fetchOffsetMetadata.messageOffset>=this.logEndOffset.messageOffset){this.logEndOffset=logReadResult.info.fetchOffsetMetadata}// HW取Leader返回的HW值和当前本地HW的较小值if(logReadResult.hw<this.highWatermark.messageOffset){this.highWatermark=newLogOffsetMetadata(logReadResult.hw)}}
对比维度LEOHW
所属副本每个副本独立维护Leader维护全局值
变化方向单调递增单调递增
触发更新写入新消息后立即更新所有ISR副本确认后更新
消费者可见是(只读HW之前的消息)
Follower用途记录自己追到了哪里确认哪些消息可提交

五、Leader Epoch——HW截断问题的"救火队长"

上面说了HW只能前进不能后退,但这引发了一个经典问题:

【HW截断问题的场景图解】 时刻 T0: Broker 1是Leader, LEO=10, HW=6 ┌──────────────────────────────────────┐ │ Broker1 [L] [0][1][2][3][4][5]▌[6][7][8][9] │ HW=6 │ Broker2 [F] [0][1][2][3][4][5]▌ │ HW=6 └──────────────────────────────────────┘ 时刻 T1: Broker 1挂了,Broker 2 成为新Leader ┌──────────────────────────────────────┐ │ Broker2 [L] [0][1][2][3][4][5]▌ │ HW=6, LEO=6 │ Broker1 [F] [0][1][2][3][4][5]▌[6][7][8][9] │ 旧Leader恢复 └──────────────────────────────────────┘ 时刻 T2: Broker 1 作为Follower恢复,发现自己的LEO>新Leader的LEO 需要截断到新Leader的HW=6 问题:Broker 1上的 msg6~msg9 被截断了,但它们可能已被客户端消费! (如果原Leader在当时已推进了HW)

这个场景中,旧Leader恢复后变成Follower,发现自己比新Leader"多了一截尾巴",只能截断到新Leader的HW位置。但如果被截断的消息之前已经被消费者读走了,消费者重启后会发现"消息不见了",这就是消息丢失

Leader Epoch就是为了解决这个问题而引入的。它的核心思想是:给每一任Leader打上"任期号",每次Leader变更时Epoch递增。

【Leader Epoch 工作原理】 Epoch 0 (Broker1是Leader, LEO=10, HW=4) ┌────────────────────────────────────────────────┐ │ Broker1 [L] [0][1][2][3]▌[4][5][6][7][8][9] │ HW=4 └────────────────────────────────────────────────┘ Epoch 1 (Broker1挂了,Broker2成为新Leader, LEO=6) ┌────────────────────────────────────────────────┐ │ Broker2 [L] [0][1][2][3][4][5]▌ │ HW=6, Epoch=1 │ ▲ │ Epoch Change = (1, 4) ← 记录Leader变更时的offset └────────────────────────────────────────────────┘ Broker1 恢复后: 1. 查询 Epoch=1 对应的起始offset = 4 2. 将log截断到 offset=4 (而不是HW=6) 3. 从 offset=4 开始向新Leader同步 4. 结果:msg4~msg9全部被从Leader重新拉取,避免丢失

Leader Epoch在源码中的实现:

// LeaderEpochFileCache.scala - Leader Epoch核心逻辑classLeaderEpochFileCache{// epoch -> (startOffset, ...)privateval epochs:util.TreeMap[Integer,EpochEntry]=newutil.TreeMap()/** * 根据Leader Epoch确定Follower应该截断到哪个offset * * @param leaderEpoch Follower当前的Leader Epoch * @return 截断到哪个offset */defendOffsetFor(leaderEpoch:Int):Long={val entry=epochs.get(leaderEpoch)if(entry==null){// 没有找到对应epoch的记录,截断到最早的offsetundecidedOffset()}else{// 返回该Epoch结束时的offsetentry.endOffset}}// 记录新的Leader Epochdefassign(leaderEpoch:Int,startOffset:Long):Unit={val entry=newEpochEntry(leaderEpoch,startOffset)epochs.put(leaderEpoch,entry)// 清理旧的、已无用的Epoch记录truncateFromEnd(leaderEpoch)}}
对比维度HW机制(旧)Leader Epoch机制(新,0.11+)
截断依据Leader的HW值Leader Epoch对应的startOffset
消息丢失风险高(可能截断掉已消费消息)低(截断到Epoch变更点的offset)
实现复杂度简单中等(需维护Epoch→Offset映射)
Kafka版本0.10及之前0.11开始默认启用
存储位置无独立存储leader-epoch-checkpoint文件

六、ISR——"步调一致"的副本俱乐部

ISR(In-Sync Replicas,同步副本集)是Kafka副本机制最核心的概念之一。它本质上是一个动态维护的"精英副本俱乐部"——只有跟得上Leader步伐的副本才能留在ISR里。

// Partition.scala - ISR的核心定义classPartition(val topic:String,val partitionId:Int,time:Time,replicaManager:ReplicaManager)extendsLogging{// ISR集合:当前所有与Leader保持同步的副本@volatilevarinSyncReplicas:Set[Replica]=Set.empty[Replica]// AR集合:Assigned Replicas,该分区被分配的所有副本@volatilevarassignedReplicas:Set[Replica]=Set.empty[Replica]// Leader副本varleaderReplicaIdOpt:Option[Int]=None}

三个集合的关系非常重要:

【AR vs ISR vs OSR 的关系】 AR (Assigned Replicas) = 所有被分配了该分区副本的Broker ┌─────────────────────────────────────────────────┐ │ Broker 1 │ Broker 2 │ Broker 3 │ Broker 4│ │ [Leader] │ [Follower] │ [Follower] │[Follower]│ └─────────────────────────────────────────────────┘ ISR (In-Sync Replicas) = AR中跟上Leader步伐的副本 ┌──────────────────────────────────┐ │ Broker 1 │ Broker 2 │ Broker 3│ │ [Leader] │ [Follower] │[Follower]│ └──────────────────────────────────┘ OSR (Out-of-Sync Replicas) = AR - ISR = 掉队的副本 ┌─────────┐ │ Broker 4│ ← 网络慢 / 磁盘慢 / 挂了 → 踢出ISR └─────────┘

一个副本被踢出ISR的判断标准由两个参数控制:

参数默认值含义
replica.lag.time.max.ms10000msFollower超过10秒没有fetch请求或没追上Leader,踢出ISR
replica.lag.max.messages已废弃(0.9+)基于消息数判断滞后(早期Kafka的参数)

ISR的扩容缩容在源码中由Partition类的两个方法实现:

// Partition.scala - ISR扩容(maybeExpandIsr)defmaybeExpandIsr(replicaId:Int):Unit={// 获取或创建指定replicaId对应的副本对象val replica=getReplica(replicaId)if(replica==null)returnval leaderHWIncremented=inWriteLock(leaderIsrUpdateLock){// 条件1:该副本当前不在ISR中// 条件2:该副本的LEO >= Leader的HW(追上了Leader)if(!inSyncReplicas.contains(replica)&&replica.logEndOffset.messageOffset>=leaderReplica.highWatermark.messageOffset){// 将副本加入ISRval newInSyncReplicas=inSyncReplicas+replica inSyncReplicas=newInSyncReplicas// 记录ISR变更,触发后续的ISR-change通知isrChangeSet+=newTopicAndPartition(topic,partitionId)true}else{false}}// 如果ISR扩张,可能推进HWif(leaderHWIncremented){maybeIncrementLeaderHW(leaderReplica)}}// Partition.scala - ISR缩容(maybeShrinkIsr)defmaybeShrinkIsr(replicaMaxLagTimeMs:Long):Unit={val leaderHWIncremented=inWriteLock(leaderIsrUpdateLock){// 检查ISR中哪些Follower的lastCaughtUpTime超过了阈值val outOfSyncReplicas=inSyncReplicas.filter{replica=>replica.logEndOffset.messageOffset<leaderReplica.logEndOffset.messageOffset&&(time.milliseconds()-replica.lastCaughtUpTimeMs)>replicaMaxLagTimeMs}if(outOfSyncReplicas.nonEmpty){// 将滞后副本从ISR中移除inSyncReplicas=inSyncReplicas--outOfSyncReplicas// 记录ISR变更isrChangeSet+=newTopicAndPartition(topic,partitionId)true}else{false}}// 如果ISR收缩,可能需要回退HW(因为ISR变小了)if(leaderHWIncremented){maybeIncrementLeaderHW(leaderReplica)}}

ISR缩容的关键是lastCaughtUpTimeMs,它记录的是Follower副本最后一次赶上Leader LEO的时间。如果超过replica.lag.time.max.ms还没赶上,就会被踢出ISR。这个设计很精妙:它不看"落后多少条消息",只看"落后了多长时间",避免了大流量分区中频繁的ISR抖动。


本篇小结

这篇文章帮你建立了Kafka副本机制的"世界观":

  • 副本的三种状态:Leader(读写窗口)、Follower(备份跟班)、Offline(休假中),角色切换由Controller通过LeaderAndIsrRequest统一指挥
  • Local vs Remote:Local副本是"住持",在磁盘上有真实数据;Remote副本是"影子",Leader用来跟踪Follower同步进度但不存数据
  • LEO和HW:LEO是"写到哪里了",HW是"哪些消息安全了",消费者只能消费HW之前的消息,保证消费到的数据不会回退
  • Leader Epoch:解决HW截断的"连锁灾难",用任期号精确定位截断点,避免消息丢失
  • ISR俱乐部:动态管理的"精英副本集合",扩容标准是"LEO追上HW",缩容标准是"超时没跟上"

下一篇我们将接着看ReplicaManager——它怎么调度副本的角色切换、怎么管理ISR的变更、怎么协调Follower的同步线程。


上一篇【第48篇】Kafka时间轮(TimingWheel)源码解析
下一篇【第50篇】副本机制源码解析(二)——ReplicaManager的复制流程


版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/13 13:31:21

redis-windows 安装 redis 到 windows 电脑

目录 前言一、下载二、使用1.前台模式运行2.后台模式运行 前言 redis-windows 是一个 由官方 Redis Windows 源代码编译而成的软件&#xff0c;你可以使用 redis-windows 在 windows 系统快速安装 Redis 用于本地开发和学习。 如果你想在 windows 电脑上练习 Redis 命令&…

作者头像 李华
网站建设 2026/6/13 13:25:53

5分钟打造专属桌面伙伴:DyberPet让你的电脑桌面不再孤单

5分钟打造专属桌面伙伴&#xff1a;DyberPet让你的电脑桌面不再孤单 【免费下载链接】DyberPet Desktop Cyber Pet Framework based on PySide6 项目地址: https://gitcode.com/GitHub_Trending/dy/DyberPet 你是否厌倦了单调的电脑桌面&#xff1f;是否希望有个可爱的小…

作者头像 李华
网站建设 2026/6/13 13:24:40

工科毕设代码难题怎么破?百考通AI一站式解决代码开发痛点

对于计算机、电子信息、自动化、机械等工科专业的同学来说&#xff0c;毕业论文的核心难点往往不是理论撰写&#xff0c;而是程序代码开发。多数学生可以顺利梳理论文框架、完成文献综述与理论分析&#xff0c;却频频卡在代码环节&#xff1a;框架搭建无从下手、算法逻辑梳理混…

作者头像 李华
网站建设 2026/6/13 13:21:55

3D打印你的2026世界杯派对:奖杯、吉祥物模型合集来了

一觉醒来&#xff0c;世界杯来了。2026年国际足联世界杯已于北京时间6月12日凌晨正式开幕。作为世界杯历史上首次由美国、加拿大、墨西哥三国共同承办的赛事&#xff0c;本届世界杯将于2026年6月11日至7月19日举行&#xff0c;共有48支球队参赛&#xff0c;赛事总场次达到104场…

作者头像 李华
网站建设 2026/6/13 13:19:51

EdgeRemover终极解决方案:专业级Windows Edge浏览器管理工具

EdgeRemover终极解决方案&#xff1a;专业级Windows Edge浏览器管理工具 【免费下载链接】EdgeRemover A PowerShell script that correctly uninstalls or reinstalls Microsoft Edge on Windows 10 & 11. 项目地址: https://gitcode.com/gh_mirrors/ed/EdgeRemover …

作者头像 李华
网站建设 2026/6/13 13:18:53

终极指南:如何在WPS Office中无缝集成Zotero文献管理工具

终极指南&#xff1a;如何在WPS Office中无缝集成Zotero文献管理工具 【免费下载链接】WPS-Zotero An add-on for WPS Writer to integrate with Zotero. 项目地址: https://gitcode.com/gh_mirrors/wp/WPS-Zotero 还在为学术写作中的文献引用而头疼吗&#xff1f;WPS-Z…

作者头像 李华