Kafka ISR 与 AR 深度解析:副本同步机制核心概念
在 Kafka 的分布式架构中,副本机制是保证高可用和数据一致性的基石。而 ISR(In-Sync Replicas) 和 AR(Assigned Replicas) 则是理解副本同步机制的两个核心概念。
一、核心概念定义
1.1 AR:分区的所有副本
AR(Assigned Replicas):一个分区分配的所有副本集合,包括 Leader 和所有 Follower。
- Leader 副本:负责读写
- Follower 副本:同步数据
- AR = Leader + 所有 Follower
1.2 ISR:与 Leader 保持同步的副本
ISR(In-Sync Replicas):与 Leader 保持同步的 Follower 集合,包括 Leader 自身。只有 ISR 中的副本才有资格被选举为新的 Leader。
- ISR 动态集合:例如 [0, 1, 2, 3]
- 主题分区 AR:例如 [0, 1, 2, 3, 4]
- Leader 0:始终在 ISR 中
- Follower 1/2/3:同步正常,在 ISR 中
- Follower 4:同步落后,不在 ISR 中
1.3 官方定义
| 概念 | 全称 | 定义 | 包含 |
|---|
| AR | Assigned Replicas | 分区分配的所有副本 | Leader + 所有 Follower |
| ISR | In-Sync Replicas | 与 Leader 保持同步的副本 | Leader + 同步的 Follower |
二、ISR 的判定标准
2.1 同步的判断条件
class ReplicaManager {
def isInSync(replica: Replica): Boolean = {
val maxLagMs = config.replicaLagTimeMaxMs
val lastCaughtUpTimeMs = replica.lastCaughtUpTimeMs
val currentTimeMs = time.milliseconds()
(currentTimeMs - lastCaughtUpTimeMs) <= maxLagMs
}
}
2.2 关键参数
replica.lag.time.max.ms=30000
min.insync.replicas=2
2.3 ISR 的动态变化
Kafka ISR 动态变化示例:
- 正常状态:Leader 0,ISR: [0, 1, 2, 3, 4]
- Follower 变慢:Follower 4 网络延迟/GC/负载高,同步落后
- 从 ISR 中移除:超过
replica.lag.time.max.ms 阈值,Follower 4 被踢出
- Follower 恢复:重新同步,追上 Leader 的 offset,重新加入 ISR
三、ISR 的核心作用
3.1 作用一:Leader 选举的选民池
当 Leader 宕机时,Controller 检测到故障并开始选举。
- 检查 ISR 列表:当前 ISR: [1, 2, 3]
- 选举规则:优先从 ISR 中选择,确保数据不丢失
- 特殊情况:ISR 为空时启用脏选举(unclean leader election),可能丢失数据
为什么只能用 ISR 选举?
- ISR 中的副本数据与 Leader 基本一致
- 选它们做 Leader 不会丢失数据
- 不在 ISR 中的副本数据落后,当选会导致数据丢失
3.2 作用二:消息确认的基准
Properties props = new Properties();
props.put("acks", "0");
props.put("acks", "1");
props.put("acks", "all");
acks=all 的工作原理:
- 消息需写入本地并同步给所有 ISR 副本
- 等待 min.insync.replicas 个确认即可返回成功
- 若同步慢,可能导致超时
3.3 作用三:保证数据一致性
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--alter --topic my-topic --config min.insync.replicas=2
props.put("acks", "all");
四、AR 与 ISR 的关系图解
4.1 集合关系图
- AR: 所有副本 (Leader + Follower 1, 2, 3...)
- ISR: 同步副本集合 (Leader + 同步 Follower)
- OSR: 不同步副本 (AR - ISR)
4.2 公式表示
AR = {Leader, Follower1, Follower2, Follower3, ...}
ISR = {Leader, Follower_i, Follower_j, ...} 其中 Follower 满足同步条件
OSR = AR - ISR # Out-of-Sync Replicas,同步落后的副本
AR = ISR ∪ OSR
ISR ∩ OSR = ∅
4.3 动态变化示例
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic my-topic
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0,1,2,3,4 Isr: 0,1,2,3
五、ISR 相关的关键配置
5.1 Broker 级别配置
# server.properties
# Follower 落后多长时间被踢出 ISR(默认 30 秒)
replica.lag.time.max.ms=30000
# 是否允许不在 ISR 中的副本被选举为 Leader
# 如果设为 true,可能会丢数据;设为 false,提高数据一致性
unclean.leader.election.enable=false
# 最小同步副本数(也可在 Topic 级别设置)
min.insync.replicas=1
5.2 Topic 级别配置
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic my-topic --partitions 3 --replication-factor 3 \
--config min.insync.replicas=2
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--alter --topic my-topic --config min.insync.replicas=2
5.3 Producer 级别配置
Properties props = new Properties();
props.put("acks", "all");
props.put("retries", "3");
props.put("max.in.flight.requests.per.connection", "1");
props.put("acks", "1");
props.put("retries", "0");
六、ISR 的实际应用场景
6.1 场景一:Leader 选举
public class LeaderElection {
public void electNewLeader(Partition partition) {
List<Replica> isr = partition.getInSyncReplicas();
if (!isr.isEmpty()) {
Replica newLeader = isr.get(0);
partition.setLeader(newLeader);
} else {
if (uncleanLeaderElectionEnabled) {
Replica newLeader = chooseFromOSR();
partition.setLeader(newLeader);
} else {
throw new NoReplicaAvailableException();
}
}
}
}
6.2 场景二:生产者的可靠性保证
public class ReliableProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", "1");
props.put("enable.idempotence", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
if (exception != null) {
handleException(exception);
}
});
}
}
6.3 场景三:监控 ISR 变化
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic my-topic --under-replicated-partitions
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --under-replicated-partitions
七、面试高频问题
Q1:ISR 和 AR 分别代表什么?
答:
- AR:分区的所有副本集合(包括 Leader 和所有 Follower)
- ISR:与 Leader 保持同步的副本集合(包括 Leader 和同步的 Follower)
Q2:副本被踢出 ISR 的条件是什么?
答:Follower 超过 replica.lag.time.max.ms(默认 30 秒)没有向 Leader 拉取消息,就会被踢出 ISR。这表示该 Follower 已经落后太多,不能保证数据一致性。
Q3:为什么 Leader 选举只能从 ISR 中选?
答:因为 ISR 中的副本数据与 Leader 基本一致,选它们做 Leader 不会丢失数据。如果从 OSR 中选举,选出的 Leader 可能缺少大量数据,导致数据不一致。
Q4:min.insync.replicas 的作用是什么?
答:当 Producer 设置 acks=all 时,消息必须写入 ISR 中至少 min.insync.replicas 个副本才算成功。这个参数保证了最小的数据冗余度,防止在 ISR 数量不足时仍然写入成功导致数据风险。
Q5:什么是 unclean leader election?有什么风险?
答:当 ISR 为空时,是否允许从 OSR(不同步副本)中选举 Leader。如果允许(unclean.leader.election.enable=true),可能会选出数据落后的副本作为 Leader,导致数据丢失。这是用一致性换可用性。
八、总结
8.1 核心概念关系
- AR:所有副本集合
- ISR:同步副本集合(可被选举)
- OSR:落后副本集合(不可被选举)
8.2 关键公式
AR = ISR ∪ OSR
ISR ⊆ AR
Leader ∈ ISR
8.3 一句话总结
ISR 是 Kafka 保证数据一致性的核心机制,它圈定了'可用且可靠'的副本集合,是 Leader 选举和数据确认的基准。