Kafka JavaAPI Consumer内容示例
基础的Consumer内容示例:
package com.czxy.demo02;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class Consumer02 {
public static void main(String[] args){
//.properties文件的另一种形式
Properties properties = new Properties();
properties.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
// 消费者组id为test
properties.put("group.id", "test");
//自动提交offset
properties.put("enable.auto.commit", "true");
//自动提交偏移量的时间间隔
properties.put("auto.commit.interval.ms", "1000");
//从哪儿消费
properties.put("auto.offset.reset", "earliest");
//earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
//latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
//none : topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
//会话超时时间
properties.put("session.timeout.ms", "30000");
//反序列化器
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//实例一个Consumer
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
//订阅 teacher主题
kafkaConsumer.subscribe(Arrays.asList("teacher"));
while (true) {
// jdk queue(Java的JDK 中) .offer()插入、.poll()获取元素
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
//输出获得的消息的偏移量和值(offset 和 value)
System.out.printf("offset = %d, value = %s", record.offset(), record.value());
System.out.println();
}
}
}
}
需要注意的是:properties.put("auto.offset.reset", "earliest");
- earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
- none : topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常