Kafka JavaAPI Consumer内容示例

Kafka JavaAPI Consumer内容示例

基础的Consumer内容示例:

package com.czxy.demo02;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class Consumer02 {
    public static void main(String[] args){

		//.properties文件的另一种形式
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //	消费者组id为test
        properties.put("group.id", "test");
        //自动提交offset
        properties.put("enable.auto.commit", "true");
        //自动提交偏移量的时间间隔
        properties.put("auto.commit.interval.ms", "1000");

		//从哪儿消费
        properties.put("auto.offset.reset", "earliest");
        //earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        //latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
        //none : topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常


		//会话超时时间
        properties.put("session.timeout.ms", "30000");
        //反序列化器
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		
		//实例一个Consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

		//订阅 teacher主题
        kafkaConsumer.subscribe(Arrays.asList("teacher"));
        while (true) {
            // jdk queue(Java的JDK 中) .offer()插入、.poll()获取元素
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
            	//输出获得的消息的偏移量和值(offset 和 value)
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }

    }
}

需要注意的是:properties.put("auto.offset.reset", "earliest");

  • earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • none : topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常