Kafka JavaAPI Producer内容示例

Kafka JavaAPI Producer内容示例

如果你想创建一个Producer来生产数据,那么你需要有一个Topic。
可以执行以下命令创建Topic

//bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 2 --partitions 3 --topic student
  • --create代表创建
  • --zookeeper zkhost:port 代表zookeeper的端口,多个端口之间用逗号隔开 例如:--zookeeper hadoop01:2181,hadoop02:2181
  • --replication-factor 代表副本数
  • --partitions 代表分区数

当你有了一个Topic之后,你就可以用Producer向这个Topic里发送数据了。

package com.czxy.demo01;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class Producer01 {
    //模拟生产者,请写出代码向student主题中生产数据0-99
    

    public static void main(String[] args) throws InterruptedException {

		//.properties文件的另一种形式
        Properties props = new Properties();
        //kafka的broker集群地址,如果没有配置host文件,那么可以使用ip地址:端口
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //bootstrap.servers是Kafka集群的IP地址,如果Broker数量超过1个,则使用逗号分隔
   		
   		//消息确认机制为all
        props.put("acks", "all");
        
        //重试次数为0
        props.put("retries", 0);
		
		//批处理消息字节数
        props.put("batch.size", 16384);

		//每条数据产生延迟
        props.put("linger.ms", 1);

		//缓冲区大小
        props.put("buffer.memory", 33554432);

		//序列化器:StringSerializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		
		//创建生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        //发送数据
        for (int i = 0; i < 100; i++) {
            // 发送数据 ,需要一个producerRecord对象,
            // 最少参数 String topic, V value
            kafkaProducer.send(new ProducerRecord<String, String>("student", ""+i));
            //停顿100ms
            Thread.sleep(100);
        }

    }
}

这里的Properties props = new Properties();其实是.properties文件的另一种形式,工作中大多还是使用.properties文件为主。