Kafka双机KRaft集群部署
Kafka 双机 KRaft 集群部署文档
1. 方案说明
本文档介绍如何部署 Kafka 双节点 KRaft 集群,实现 Kafka 高可用运行。
Kafka 3.x 版本开始推荐使用 KRaft 模式,无需依赖 Zookeeper。
本方案特点:
- 双节点 Kafka 集群
- 数据副本同步
- Broker 宕机自动 Leader 迁移
- 支持 Producer / Consumer 高可用访问
2. 集群架构
Producer | v +-------------+ | Kafka集群 | | | | 170 180 | +------+------+ | Consumer Group 服务器规划:
| 服务器 | IP | 角色 |
|---|---|---|
| Kafka-1 | 192.168.239.170 | Broker + Controller |
| Kafka-2 | 192.168.239.180 | Broker + Controller |
| VIP(可选) | 192.168.239.200 | 应用访问 |
3. 环境准备
3.1 操作系统
推荐:
CentOS 7+ Rocky Linux Ubuntu 20+ 3.2 Java 环境
Kafka 需要 Java 运行环境。
安装 OpenJDK:
yum install-y java-17-openjdk 验证:
java-version输出示例:
openjdk version "17" 4. Kafka 安装
4.1 下载 Kafka
下载 Kafka 安装包:
https://kafka.apache.org/downloads 示例:
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz 解压:
tar-xzf kafka_2.13-3.7.0.tgz mv kafka_2.13-3.7.0 /opt/kafka 4.2 创建数据目录
mkdir-p /data/ 4.3 开放防火墙端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent firewall-cmd --zone=public --add-port=9093/tcp --permanent firewall-cmd --reload5. KRaft 集群配置
Kafka 3.x 使用 KRaft 模式替代 Zookeeper。
配置文件:
/opt/kafka/config/kraft/server.properties 6. Kafka 节点配置
6.1 Kafka-1 配置
服务器:
192.168.239.170 编辑配置:
vi /opt/kafka/config/kraft/server.properties # 当前节点在 Kafka 集群中的角色 # broker:消息存储与处理 # controller:负责集群元数据管理和 leader 选举 process.roles=broker,controller # 当前节点的唯一ID # 每个Kafka节点必须唯一 node.id=1 # Controller 集群投票节点 # 格式: # nodeId@host:controllerPort # 用于 KRaft controller 选举 [email protected]:9093,[email protected]:9093 # 定义 Kafka 监听地址 # PLAINTEXT:客户端连接端口 # CONTROLLER:KRaft Controller 通信端口 listeners=PLAINTEXT://192.168.239.170:9092,CONTROLLER://192.168.239.170:9093 # Broker 之间通信使用的 listener inter.broker.listener.name=PLAINTEXT # 对外公布的地址 # Producer 和 Consumer 会通过这个地址连接 Kafka advertised.listeners=PLAINTEXT://192.168.239.170:9092 # 指定 Controller 使用的 listener 名称 controller.listener.names=CONTROLLER # 定义 listener 与安全协议映射关系 # 默认使用 PLAINTEXT(无加密) listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # Kafka 网络线程数 # 用于处理客户端网络请求 num.network.threads=3 # Kafka IO 线程数 # 处理磁盘读写操作 num.io.threads=8 # Socket 发送缓冲区大小(字节) socket.send.buffer.bytes=102400 # Socket 接收缓冲区大小(字节) socket.receive.buffer.bytes=102400 # Kafka 单个请求最大大小 # 默认 100MB socket.request.max.bytes=104857600 # Kafka 数据日志存储目录 # Topic 数据、分区日志、offset 等都会存放在这里 log.dirs=/data/kraft-combined-logs # Topic 默认分区数量 # 创建 Topic 时未指定 partition 时使用 num.partitions=3 # 每个数据目录用于恢复日志的线程数 num.recovery.threads.per.data.dir=1 # 消费者 offset 主题副本数量 # 建议 >=2 offsets.topic.replication.factor=2 # 事务日志副本数量 transaction.state.log.replication.factor=2 # 事务日志最小 ISR 数量 transaction.state.log.min.isr=1 # Kafka 日志保留时间(小时) # 默认 7 天 log.retention.hours=168 # 单个日志 segment 文件大小 # 默认 1GB log.segment.bytes=1073741824 # Kafka 日志清理检查间隔 # 每5分钟检查一次是否需要删除过期日志 log.retention.check.interval.ms=300000 6.2 Kafka-2 配置
服务器:
192.168.239.180 编辑配置:
vi /opt/kafka/config/kraft/server.properties # 当前节点在 Kafka 集群中的角色 # broker:消息存储与处理 # controller:负责集群元数据管理和 leader 选举 process.roles=broker,controller # 当前节点的唯一ID # 每个Kafka节点必须唯一 node.id=2 # Controller 集群投票节点 # 格式: # nodeId@host:controllerPort # 用于 KRaft controller 选举 [email protected]:9093,[email protected]:9093 # 定义 Kafka 监听地址 # PLAINTEXT:客户端连接端口 # CONTROLLER:KRaft Controller 通信端口 listeners=PLAINTEXT://192.168.239.180:9092,CONTROLLER://192.168.239.180:9093 # Broker 之间通信使用的 listener inter.broker.listener.name=PLAINTEXT # 对外公布的地址 # Producer 和 Consumer 会通过这个地址连接 Kafka advertised.listeners=PLAINTEXT://192.168.239.180:9092 # 指定 Controller 使用的 listener 名称 controller.listener.names=CONTROLLER # 定义 listener 与安全协议映射关系 # 默认使用 PLAINTEXT(无加密) listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # Kafka 网络线程数 # 用于处理客户端网络请求 num.network.threads=3 # Kafka IO 线程数 # 处理磁盘读写操作 num.io.threads=8 # Socket 发送缓冲区大小(字节) socket.send.buffer.bytes=102400 # Socket 接收缓冲区大小(字节) socket.receive.buffer.bytes=102400 # Kafka 单个请求最大大小 # 默认 100MB socket.request.max.bytes=104857600 # Kafka 数据日志存储目录 # Topic 数据、分区日志、offset 等都会存放在这里 log.dirs=/data/kraft-combined-logs # Topic 默认分区数量 # 创建 Topic 时未指定 partition 时使用 num.partitions=3 # 每个数据目录用于恢复日志的线程数 num.recovery.threads.per.data.dir=1 # 消费者 offset 主题副本数量 # 建议 >=2 offsets.topic.replication.factor=2 # 事务日志副本数量 transaction.state.log.replication.factor=2 # 事务日志最小 ISR 数量 transaction.state.log.min.isr=1 # Kafka 日志保留时间(小时) # 默认 7 天 log.retention.hours=168 # 单个日志 segment 文件大小 # 默认 1GB log.segment.bytes=1073741824 # Kafka 日志清理检查间隔 # 每5分钟检查一次是否需要删除过期日志 log.retention.check.interval.ms=300000 7. 初始化 Kafka 存储
生成 ClusterID:
/opt/kafka/bin/kafka-storage.sh random-uuid 示例:
q1Sh-9_ISia_zwGINzRvyQ 初始化 Kafka:
Kafka-1:
/opt/kafka/bin/kafka-storage.sh format\-t q1Sh-9_ISia_zwGINzRvyQ \-c /opt/kafka/config/kraft/server.properties Kafka-2:
/opt/kafka/bin/kafka-storage.sh format\-t q1Sh-9_ISia_zwGINzRvyQ \-c /opt/kafka/config/kraft/server.properties 8. 启动 Kafka
Kafka-1:
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties Kafka-2:
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties 查看进程:
jps 应看到:
Kafka 9. 集群验证
查看 Broker:
/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.239.170:9092 输出示例:
192.168.239.170:9092 (id:1) 192.168.239.180:9092 (id:2) 说明 Kafka 集群已建立。
10. 创建 Topic
创建 Topic:
/opt/kafka/bin/kafka-topics.sh \--create\--topic sync-test \ --bootstrap-server 192.168.239.170:9092 \--partitions3\ --replication-factor 2查看 Topic:
/opt/kafka/bin/kafka-topics.sh \--describe\--topic sync-test \ --bootstrap-server 192.168.239.170:9092 输出示例:
Partition:0 Leader:1 Replicas:1,2 Isr:1,2 Partition:1 Leader:2 Replicas:2,1 Isr:2,1 说明副本同步正常。
11. 测试生产消费
11.1 生产消息
/opt/kafka/bin/kafka-console-producer.sh \ --bootstrap-server 192.168.239.170:9092 \--topic sync-test 输入:
123 456 789 11.2 消费消息
/opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server 192.168.239.170:9092 \--topic sync-test \ --from-beginning 输出:
123 456 789 12. 故障切换验证
关闭 Kafka-1:
shutdown-h now 再次消费:
kafka-console-consumer.sh \ --bootstrap-server 192.168.239.180:9092 \--topic sync-test \ --from-beginning 如果仍然能消费,说明:
- Leader 自动迁移成功
- Kafka 集群高可用生效
13. Java 客户端连接示例
Producer / Consumer 推荐配置:
bootstrap.servers= 192.168.239.170:9092, 192.168.239.180:9092 Java 示例:
Properties props = new Properties(); props.put("bootstrap.servers", "192.168.239.170:9092,192.168.239.180:9092"); props.put("group.id","demo-consumer"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 14. 注意事项
- Kafka 数据目录不要使用
/tmp - 建议使用独立磁盘
/data/kafka-logs - 生产环境建议 3 节点 Kafka 集群
- Producer 推荐配置:
acks=all retries=3 15. 目录说明
Kafka 数据目录示例:
/data/kafka-logs/ sync-test-0 sync-test-1 sync-test-2 __consumer_offsets Kafka 数据以 日志文件形式存储。
16. 总结
本方案实现:
- Kafka KRaft 双节点集群
- 副本数据同步
- Broker 故障自动切换
- Producer / Consumer 高可用访问
适用于:
- 中小规模 Kafka 部署
- 开发测试环境
- 轻量生产环境