Kafka JavaAPI Producer内容示例
如果你想创建一个Producer来生产数据,那么你需要有一个Topic。
可以执行以下命令创建Topic
//bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 2 --partitions 3 --topic student
--create
代表创建--zookeeper zkhost:port
代表zookeeper的端口,多个端口之间用逗号隔开 例如:--zookeeper hadoop01:2181,hadoop02:2181
--replication-factor
代表副本数--partitions
代表分区数
当你有了一个Topic之后,你就可以用Producer向这个Topic里发送数据了。
package com.czxy.demo01;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class Producer01 {
//模拟生产者,请写出代码向student主题中生产数据0-99
public static void main(String[] args) throws InterruptedException {
//.properties文件的另一种形式
Properties props = new Properties();
//kafka的broker集群地址,如果没有配置host文件,那么可以使用ip地址:端口
props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
//bootstrap.servers是Kafka集群的IP地址,如果Broker数量超过1个,则使用逗号分隔
//消息确认机制为all
props.put("acks", "all");
//重试次数为0
props.put("retries", 0);
//批处理消息字节数
props.put("batch.size", 16384);
//每条数据产生延迟
props.put("linger.ms", 1);
//缓冲区大小
props.put("buffer.memory", 33554432);
//序列化器:StringSerializer
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//创建生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
//发送数据
for (int i = 0; i < 100; i++) {
// 发送数据 ,需要一个producerRecord对象,
// 最少参数 String topic, V value
kafkaProducer.send(new ProducerRecord<String, String>("student", ""+i));
//停顿100ms
Thread.sleep(100);
}
}
}
这里的Properties props = new Properties();
其实是.properties文件的另一种形式,工作中大多还是使用.properties文件为主。