要在 Kafka 中发送消息,首先需要确保集群环境就绪。在 Linux 环境下,依次启动 ZooKeeper 和 Kafka 服务。
zkServer.sh start
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
环境搭建完成后,建议先通过命令行工具验证连接是否正常。例如启动一个控制台消费者监听 pet 主题:
kafka-console-consumer.sh --topic pet --bootstrap-server localhost:9092
接下来准备代码所需的配置文件。将 producer.properties 和 consumer.properties 放入项目的 resources 目录下,确保路径可被类加载器访问。
核心逻辑在于使用 Scala 编写生产者客户端。这里需要注意资源的管理,发送完数据后务必关闭生产者实例以释放连接。
package com.example.kafka
import java.io.InputStream
import java.util.Properties
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
object KafkaHelper {
def main(args: Array[String]): Unit = {
// 1. 加载配置文件
val stream: InputStream = KafkaHelper.getClass.getClassLoader.getResourceAsStream("producer.properties")
val properties: Properties = new Properties()
properties.load(stream)
// 2. 初始化生产者对象
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](properties)
// 3. 构建消息并发送
// send 方法返回 Future[RecordMetadata],表示异步发送结果
val send: Future[RecordMetadata] = producer.send(new ProducerRecord[String, String]("pet", "Hello Kafka"))
// 4. 关闭资源
producer.close()
}
}
运行上述代码后,如果配置正确且网络通畅,即可在消费者端看到消息内容。实际开发中,建议对 Future 的结果进行回调处理或阻塞等待,以确保消息确认投递。

