Kafka双机KRaft集群部署

Kafka 双机 KRaft 集群部署文档

1. 方案说明

本文档介绍如何部署 Kafka 双节点 KRaft 集群,实现 Kafka 高可用运行。

Kafka 3.x 版本开始推荐使用 KRaft 模式,无需依赖 Zookeeper。

本方案特点:

  • 双节点 Kafka 集群
  • 数据副本同步
  • Broker 宕机自动 Leader 迁移
  • 支持 Producer / Consumer 高可用访问

2. 集群架构

 Producer | v +-------------+ | Kafka集群 | | | | 170 180 | +------+------+ | Consumer Group 

服务器规划:

服务器IP角色
Kafka-1192.168.239.170Broker + Controller
Kafka-2192.168.239.180Broker + Controller
VIP(可选)192.168.239.200应用访问

3. 环境准备

3.1 操作系统

推荐:

CentOS 7+ Rocky Linux Ubuntu 20+ 

3.2 Java 环境

Kafka 需要 Java 运行环境。

安装 OpenJDK:

yum install-y java-17-openjdk 

验证:

java-version

输出示例:

openjdk version "17" 

4. Kafka 安装

4.1 下载 Kafka

下载 Kafka 安装包:

https://kafka.apache.org/downloads 

示例:

wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz 

解压:

tar-xzf kafka_2.13-3.7.0.tgz mv kafka_2.13-3.7.0 /opt/kafka 

4.2 创建数据目录

mkdir-p /data/ 

4.3 开放防火墙端口

firewall-cmd --zone=public --add-port=9092/tcp --permanent firewall-cmd --zone=public --add-port=9093/tcp --permanent firewall-cmd --reload

5. KRaft 集群配置

Kafka 3.x 使用 KRaft 模式替代 Zookeeper。

配置文件:

/opt/kafka/config/kraft/server.properties 

6. Kafka 节点配置

6.1 Kafka-1 配置

服务器:

192.168.239.170 

编辑配置:

vi /opt/kafka/config/kraft/server.properties 
# 当前节点在 Kafka 集群中的角色 # broker:消息存储与处理 # controller:负责集群元数据管理和 leader 选举 process.roles=broker,controller # 当前节点的唯一ID # 每个Kafka节点必须唯一 node.id=1 # Controller 集群投票节点 # 格式: # nodeId@host:controllerPort # 用于 KRaft controller 选举 [email protected]:9093,[email protected]:9093 # 定义 Kafka 监听地址 # PLAINTEXT:客户端连接端口 # CONTROLLER:KRaft Controller 通信端口 listeners=PLAINTEXT://192.168.239.170:9092,CONTROLLER://192.168.239.170:9093 # Broker 之间通信使用的 listener inter.broker.listener.name=PLAINTEXT # 对外公布的地址 # Producer 和 Consumer 会通过这个地址连接 Kafka advertised.listeners=PLAINTEXT://192.168.239.170:9092 # 指定 Controller 使用的 listener 名称 controller.listener.names=CONTROLLER # 定义 listener 与安全协议映射关系 # 默认使用 PLAINTEXT(无加密) listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # Kafka 网络线程数 # 用于处理客户端网络请求 num.network.threads=3 # Kafka IO 线程数 # 处理磁盘读写操作 num.io.threads=8 # Socket 发送缓冲区大小(字节) socket.send.buffer.bytes=102400 # Socket 接收缓冲区大小(字节) socket.receive.buffer.bytes=102400 # Kafka 单个请求最大大小 # 默认 100MB socket.request.max.bytes=104857600 # Kafka 数据日志存储目录 # Topic 数据、分区日志、offset 等都会存放在这里 log.dirs=/data/kraft-combined-logs # Topic 默认分区数量 # 创建 Topic 时未指定 partition 时使用 num.partitions=3 # 每个数据目录用于恢复日志的线程数 num.recovery.threads.per.data.dir=1 # 消费者 offset 主题副本数量 # 建议 >=2 offsets.topic.replication.factor=2 # 事务日志副本数量 transaction.state.log.replication.factor=2 # 事务日志最小 ISR 数量 transaction.state.log.min.isr=1 # Kafka 日志保留时间(小时) # 默认 7 天 log.retention.hours=168 # 单个日志 segment 文件大小 # 默认 1GB log.segment.bytes=1073741824 # Kafka 日志清理检查间隔 # 每5分钟检查一次是否需要删除过期日志 log.retention.check.interval.ms=300000 

6.2 Kafka-2 配置

服务器:

192.168.239.180 

编辑配置:

vi /opt/kafka/config/kraft/server.properties 
# 当前节点在 Kafka 集群中的角色 # broker:消息存储与处理 # controller:负责集群元数据管理和 leader 选举 process.roles=broker,controller # 当前节点的唯一ID # 每个Kafka节点必须唯一 node.id=2 # Controller 集群投票节点 # 格式: # nodeId@host:controllerPort # 用于 KRaft controller 选举 [email protected]:9093,[email protected]:9093 # 定义 Kafka 监听地址 # PLAINTEXT:客户端连接端口 # CONTROLLER:KRaft Controller 通信端口 listeners=PLAINTEXT://192.168.239.180:9092,CONTROLLER://192.168.239.180:9093 # Broker 之间通信使用的 listener inter.broker.listener.name=PLAINTEXT # 对外公布的地址 # Producer 和 Consumer 会通过这个地址连接 Kafka advertised.listeners=PLAINTEXT://192.168.239.180:9092 # 指定 Controller 使用的 listener 名称 controller.listener.names=CONTROLLER # 定义 listener 与安全协议映射关系 # 默认使用 PLAINTEXT(无加密) listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # Kafka 网络线程数 # 用于处理客户端网络请求 num.network.threads=3 # Kafka IO 线程数 # 处理磁盘读写操作 num.io.threads=8 # Socket 发送缓冲区大小(字节) socket.send.buffer.bytes=102400 # Socket 接收缓冲区大小(字节) socket.receive.buffer.bytes=102400 # Kafka 单个请求最大大小 # 默认 100MB socket.request.max.bytes=104857600 # Kafka 数据日志存储目录 # Topic 数据、分区日志、offset 等都会存放在这里 log.dirs=/data/kraft-combined-logs # Topic 默认分区数量 # 创建 Topic 时未指定 partition 时使用 num.partitions=3 # 每个数据目录用于恢复日志的线程数 num.recovery.threads.per.data.dir=1 # 消费者 offset 主题副本数量 # 建议 >=2 offsets.topic.replication.factor=2 # 事务日志副本数量 transaction.state.log.replication.factor=2 # 事务日志最小 ISR 数量 transaction.state.log.min.isr=1 # Kafka 日志保留时间(小时) # 默认 7 天 log.retention.hours=168 # 单个日志 segment 文件大小 # 默认 1GB log.segment.bytes=1073741824 # Kafka 日志清理检查间隔 # 每5分钟检查一次是否需要删除过期日志 log.retention.check.interval.ms=300000 

7. 初始化 Kafka 存储

生成 ClusterID:

/opt/kafka/bin/kafka-storage.sh random-uuid 

示例:

q1Sh-9_ISia_zwGINzRvyQ 

初始化 Kafka:

Kafka-1:

/opt/kafka/bin/kafka-storage.sh format\-t q1Sh-9_ISia_zwGINzRvyQ \-c /opt/kafka/config/kraft/server.properties 

Kafka-2:

/opt/kafka/bin/kafka-storage.sh format\-t q1Sh-9_ISia_zwGINzRvyQ \-c /opt/kafka/config/kraft/server.properties 

8. 启动 Kafka

Kafka-1:

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties 

Kafka-2:

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties 

查看进程:

jps 

应看到:

Kafka 

9. 集群验证

查看 Broker:

/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.239.170:9092 

输出示例:

192.168.239.170:9092 (id:1) 192.168.239.180:9092 (id:2) 

说明 Kafka 集群已建立。


10. 创建 Topic

创建 Topic:

/opt/kafka/bin/kafka-topics.sh \--create\--topic sync-test \ --bootstrap-server 192.168.239.170:9092 \--partitions3\ --replication-factor 2

查看 Topic:

/opt/kafka/bin/kafka-topics.sh \--describe\--topic sync-test \ --bootstrap-server 192.168.239.170:9092 

输出示例:

Partition:0 Leader:1 Replicas:1,2 Isr:1,2 Partition:1 Leader:2 Replicas:2,1 Isr:2,1 

说明副本同步正常。


11. 测试生产消费

11.1 生产消息

/opt/kafka/bin/kafka-console-producer.sh \ --bootstrap-server 192.168.239.170:9092 \--topic sync-test 

输入:

123 456 789 

11.2 消费消息

/opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server 192.168.239.170:9092 \--topic sync-test \ --from-beginning 

输出:

123 456 789 

12. 故障切换验证

关闭 Kafka-1:

shutdown-h now 

再次消费:

kafka-console-consumer.sh \ --bootstrap-server 192.168.239.180:9092 \--topic sync-test \ --from-beginning 

如果仍然能消费,说明:

  • Leader 自动迁移成功
  • Kafka 集群高可用生效

13. Java 客户端连接示例

Producer / Consumer 推荐配置:

bootstrap.servers= 192.168.239.170:9092, 192.168.239.180:9092 

Java 示例:

Properties props = new Properties(); props.put("bootstrap.servers", "192.168.239.170:9092,192.168.239.180:9092"); props.put("group.id","demo-consumer"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

14. 注意事项

  1. Kafka 数据目录不要使用 /tmp
  2. 建议使用独立磁盘 /data/kafka-logs
  3. 生产环境建议 3 节点 Kafka 集群
  4. Producer 推荐配置:
acks=all retries=3 

15. 目录说明

Kafka 数据目录示例:

/data/kafka-logs/ sync-test-0 sync-test-1 sync-test-2 __consumer_offsets 

Kafka 数据以 日志文件形式存储


16. 总结

本方案实现:

  • Kafka KRaft 双节点集群
  • 副本数据同步
  • Broker 故障自动切换
  • Producer / Consumer 高可用访问

适用于:

  • 中小规模 Kafka 部署
  • 开发测试环境
  • 轻量生产环境
Could not load content