Kafka ISR与AR深度解析:副本同步机制核心概念
Kafka ISR与AR深度解析:副本同步机制核心概念
🌺The Begin🌺点点关注,收藏不迷路🌺 |
关键词:Kafka、ISR、AR、副本同步、Leader选举、高可用、数据一致性
在Kafka的分布式架构中,副本机制是保证高可用和数据一致性的基石。而**ISR(In-Sync Replicas)和AR(Assigned Replicas)**则是理解副本同步机制的两个核心概念。
今天,我们将深入剖析这两个概念的本质、作用以及它们在Kafka高可用架构中的关键地位。
一、核心概念定义
1.1 AR:分区的所有副本
AR(Assigned Replicas):一个分区分配的所有副本集合,包括Leader和所有Follower。
一个分区的AR
Leader副本
负责读写
AR集合
Follower副本1
同步数据
Follower副本2
同步数据
AR = Leader + 所有Follower
1.2 ISR:与Leader保持同步的副本
ISR(In-Sync Replicas):与Leader保持同步的Follower集合,包括Leader自身。只有ISR中的副本才有资格被选举为新的Leader。
ISR与AR的关系
ISR动态集合
ISR: 副本0,1,2,3
与Leader同步的4个副本
主题分区
AR: 副本0,1,2,3,4
分配的所有5个副本
Leader 0
始终在ISR中
Follower 1
同步正常
Follower 2
同步正常
Follower 3
同步正常
Follower 4
同步落后
不在ISR中
1.3 官方定义
| 概念 | 全称 | 定义 | 包含 |
|---|---|---|---|
| AR | Assigned Replicas | 分区分配的所有副本 | Leader + 所有Follower |
| ISR | In-Sync Replicas | 与Leader保持同步的副本 | Leader + 同步的Follower |
二、ISR的判定标准
2.1 同步的判断条件
// Kafka源码中的判断逻辑(概念性)classReplicaManager{ def isInSync(replica:Replica):Boolean={// 1. 必须是Follower(或Leader)// 2. 必须与Leader保持同步// 主要判断依据:// - Follower的LEO(Log End Offset)要接近Leader的LEO// - Follower的拉取请求要及时// - 在replica.lag.time.max.ms时间内有拉取请求 val maxLagMs = config.replicaLagTimeMaxMs val lastCaughtUpTimeMs = replica.lastCaughtUpTimeMs val currentTimeMs = time.milliseconds()// 如果最后一次追上Leader的时间在允许范围内(currentTimeMs - lastCaughtUpTimeMs)<= maxLagMs }}2.2 关键参数
# Kafka Broker配置 # 1. 副本落后最大时间阈值 replica.lag.time.max.ms=30000 # 默认30秒 # 如果Follower超过30秒没有拉取消息,就会被踢出ISR # 2. 最小同步副本数 min.insync.replicas=2 # Topic级别或Broker级别配置 # 生产者在acks=all时需要ISR中至少有这么多个副本 2.3 ISR的动态变化
Kafka ISR (In-Sync Replicas) 动态变化
ISR缩减
正常状态
Follower恢复
Follower4恢复
重新开始同步
追上Leader的offset
重新加入ISR
ISR: [0,1,2,3,4]
恢复完整
ISR作用:
• 确保数据一致性
• 决定可用性
• 影响acks配置
Leader: 0
ISR: [0,1,2,3,4]
参数配置:
• replica.lag.time.max.ms
• min.insync.replicas
ISR: [0,1,2,3]
Follower4被踢出
剩余副本
不同步
Leader 0
Follower 1
Follower 2
Follower 3
Follower4
(非ISR)
Follower变慢
同步请求
超过阈值
Follower4
网络延迟/GC/负载高
Leader 0
Follower4
同步落后
replica.lag.time.max.ms
(默认30秒)
从ISR中移除
副本状态
Leader 0
(接收读写)
Follower 1
(同步中)
Follower 2
(同步中)
Follower 3
(同步中)
Follower 4
(同步中)
ISR: 与Leader保持同步的副本集合
三、ISR的核心作用
3.1 作用一:Leader选举的选民池
Kafka Leader选举流程
选出新Leader
选举过程
故障发生
初始状态
候选人对比
副本状态
数据落后
❌ 宕机
被选为
新Leader 1
Leader 0
(当前Leader)
Follower 1
(ISR中)
Follower 2
(ISR中)
Follower 3
(ISR中)
Follower 4
(不在ISR)
Leader 0 不可用
Controller检测到
Leader故障
开始选举
检查ISR列表
当前ISR: [1,2,3]
Follower 1
✅ ISR中
数据最新
Follower 2
✅ ISR中
数据最新
Follower 3
✅ ISR中
数据最新
Follower 4
❌ 不在ISR
数据落后
更新ISR: [1,2,3]
选举完成
选举规则:
• 优先从ISR中选择
• ISR中所有副本数据一致
• 保证数据不丢失
特殊情况:
• ISR为空时启用脏选举
• 可能丢失数据
• 配合unclean.leader.election
Topic: test
分区: 0
副本: 0,1,2,3,4
基于ISR的Leader选举机制
为什么只能用ISR选举?
- ISR中的副本数据与Leader基本一致
- 选它们做Leader不会丢失数据
- 不在ISR中的副本数据落后,当选会导致数据丢失
3.2 作用二:消息确认的基准
// Producer的acks参数Properties props =newProperties();// acks=0:不等待确认 props.put("acks","0");// 可能丢数据,性能最高// acks=1:Leader确认即可 props.put("acks","1");// Leader写入成功就返回// acks=all:ISR全部确认 props.put("acks","all");// 最安全,性能最低// 或 acks=-1(等同于all)acks=all的工作原理:
Follower3 (非ISR)Follower2 (ISR)Follower1 (ISR)LeaderProducerFollower3 (非ISR)Follower2 (ISR)Follower1 (ISR)LeaderProducermin.insync.replicas=2所以2个ISR确认就够了发送消息写入本地同步同步同步(慢,可能超时)ackack返回成功(所有ISR已确认)
3.3 作用三:保证数据一致性
// 结合min.insync.replicas保证最小同步数// Topic配置 bin/kafka-topics.sh \ --bootstrap-server localhost:9092 \ --alter \ --topic my-topic \ --config min.insync.replicas=2// Producer配置 props.put("acks","all");// 要求所有ISR确认// 当ISR数量小于min.insync.replicas时// 比如:副本数=3,ISR=[0,1](2个),min.insync.replicas=2 ✅ 可用// 如果ISR=[0](1个),min.insync.replicas=2 ❌ 不可用,Producer会收到异常四、AR与ISR的关系图解
4.1 集合关系图
副本集合关系
ISR: 同步副本集合
Leader
Follower 1
Follower 2
AR: 所有副本
Follower 3
同步落后
4.2 公式表示
AR = {Leader, Follower1, Follower2, Follower3, ...} ISR = {Leader, Follower_i, Follower_j, ...} 其中 Follower 满足同步条件 OSR = AR - ISR # Out-of-Sync Replicas,同步落后的副本 AR = ISR ∪ OSR ISR ∩ OSR = ∅ 4.3 动态变化示例
# 查看副本状态 bin/kafka-topics.sh \ --bootstrap-server localhost:9092 \--describe\--topic my-topic # 输出示例 Topic: my-topic Partition: 0 Leader: 0 Replicas: 0,1,2,3,4 Isr: 0,1,2,3 # AR: [0,1,2,3,4]# ISR: [0,1,2,3]# OSR: [4]五、ISR相关的关键配置
5.1 Broker级别配置
# server.properties # Follower落后多长时间被踢出ISR(默认30秒) replica.lag.time.max.ms=30000 # 是否允许不在ISR中的副本被选举为Leader # 如果设为true,可能会丢数据;设为false,提高数据一致性 unclean.leader.election.enable=false # 最小同步副本数(也可在Topic级别设置) min.insync.replicas=1 5.2 Topic级别配置
# 创建Topic时设置 bin/kafka-topics.sh \ --bootstrap-server localhost:9092 \--create\--topic my-topic \--partitions3\ --replication-factor 3\--configmin.insync.replicas=2# 修改现有Topic bin/kafka-topics.sh \ --bootstrap-server localhost:9092 \--alter\--topic my-topic \--configmin.insync.replicas=25.3 Producer级别配置
// Producer配置影响ISR的使用Properties props =newProperties();// 一致性要求高的场景 props.put("acks","all");// 等待所有ISR确认 props.put("retries","3");// 重试次数 props.put("max.in.flight.requests.per.connection","1");// 保证顺序// 性能优先的场景 props.put("acks","1");// Leader确认即可 props.put("retries","0");// 不重试六、ISR的实际应用场景
6.1 场景一:Leader选举
// 当Leader宕机时,Kafka的选举逻辑publicclassLeaderElection{publicvoidelectNewLeader(Partition partition){// 1. 获取当前ISR列表List<Replica> isr = partition.getInSyncReplicas();// 2. 优先从ISR中选举if(!isr.isEmpty()){// 选择ISR中的第一个作为新LeaderReplica newLeader = isr.get(0); partition.setLeader(newLeader);}else{// 如果ISR为空,根据配置决定if(uncleanLeaderElectionEnabled){// 允许从OSR中选举(可能丢数据)Replica newLeader =chooseFromOSR(); partition.setLeader(newLeader);}else{// 不可用thrownewNoReplicaAvailableException();}}}}6.2 场景二:生产者的可靠性保证
// 需要高可靠性的生产者配置publicclassReliableProducer{publicstaticvoidmain(String[] args){Properties props =newProperties(); props.put("bootstrap.servers","localhost:9092"); props.put("acks","all"); props.put("retries",Integer.MAX_VALUE); props.put("max.in.flight.requests.per.connection","1"); props.put("enable.idempotence","true");// 开启幂等性// 还需要Topic配置min.insync.replicas=2KafkaProducer<String,String> producer =newKafkaProducer<>(props);// 发送消息 producer.send(newProducerRecord<>("my-topic","key","value"),(metadata, exception)->{if(exception !=null){// 如果ISR不足,会收到NotEnoughReplicasExceptionhandleException(exception);}});}}6.3 场景三:监控ISR变化
# 监控ISR收缩和扩张 bin/kafka-topics.sh \ --bootstrap-server localhost:9092 \--describe\--topic my-topic \ --under-replicated-partitions # 查看所有未完全同步的分区 bin/kafka-topics.sh \ --bootstrap-server localhost:9092 \--describe\ --under-replicated-partitions 七、面试高频问题
Q1:ISR和AR分别代表什么?
答:
- AR:分区的所有副本集合(包括Leader和所有Follower)
- ISR:与Leader保持同步的副本集合(包括Leader和同步的Follower)
Q2:副本被踢出ISR的条件是什么?
答:Follower超过replica.lag.time.max.ms(默认30秒)没有向Leader拉取消息,就会被踢出ISR。这表示该Follower已经落后太多,不能保证数据一致性。
Q3:为什么Leader选举只能从ISR中选?
答:因为ISR中的副本数据与Leader基本一致,选它们做Leader不会丢失数据。如果从OSR中选举,选出的Leader可能缺少大量数据,导致数据不一致。
Q4:min.insync.replicas的作用是什么?
答:当Producer设置acks=all时,消息必须写入ISR中至少min.insync.replicas个副本才算成功。这个参数保证了最小的数据冗余度,防止在ISR数量不足时仍然写入成功导致数据风险。
Q5:什么是unclean leader election?有什么风险?
答:当ISR为空时,是否允许从OSR(不同步副本)中选举Leader。如果允许(unclean.leader.election.enable=true),可能会选出数据落后的副本作为Leader,导致数据丢失。这是用一致性换可用性。
八、总结
8.1 核心概念关系
root(Kafka副本集合)
AR(所有副本)
Leader(读写)
Follower(同步)
ISR(同步副本)
数据一致
可被选举
OSR(落后副本)
数据落后
不能选举
8.2 关键公式
AR = ISR ∪ OSR
ISR ⊆ AR
Leader ∈ ISR
8.3 一句话总结
ISR是Kafka保证数据一致性的核心机制,它圈定了"可用且可靠"的副本集合,是Leader选举和数据确认的基准。
掌握了ISR和AR的概念,你就理解了Kafka高可用架构的精髓!
思考题:在Kafka 2.8+中引入的Raft-based KRaft模式,ISR的概念有什么变化?和ZooKeeper模式下的ISR有什么异同?欢迎在评论区讨论!
🌺The End🌺点点关注,收藏不迷路🌺 |