Spark Streaming createDirectStream: Saving Kafka Offsets (Java Implementation)
Problem description
I recently used Spark Streaming to process data from Kafka. Since the data volume was large, I used KafkaUtils.createDirectStream(). This approach reads directly from the partitions of the Kafka brokers, bypassing ZooKeeper and using no receiver: Spark tasks map directly onto Kafka topic partitions, which makes exactly-once semantics possible. However, because nothing goes through ZooKeeper, the topic offsets are never persisted; after a job restart, consumption can only begin from the latest offset, so messages produced while the job was down are lost.
Solution
In general, there are two ways to make Spark Streaming persist offsets: Spark's checkpoint mechanism, or offset-saving logic implemented in the application itself. Both are described below.
The checkpoint mechanism
A Spark Streaming job can use a checkpoint to save a snapshot of its execution state. The checkpoint contains all the information in the StreamingContext, including the offset of every Kafka topic partition. Checkpointing comes in two forms: checkpointing data as well as metadata, or checkpointing metadata only. In most cases saving the metadata is enough, so only metadata checkpointing is covered here.
Flow: Start → does a checkpoint exist? → yes: recover the StreamingContext from the checkpoint; no: create a new StreamingContext → checkpoint the StreamingContext to HDFS/Tachyon → read data → launch tasks and process the data → End.
Code

package com.nsfocus.bsa.example;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
/**
* Checkpoint example
*
* @author Shuai YUAN
* @date 2015/10/27
 */
public class CheckpointTest {

    private static String CHECKPOINT_DIR = "/checkpoint";

    public static void main(String[] args) {
        // get the JavaStreamingContext from the checkpoint dir, or create it from the SparkConf
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                return createContext();
            }
        });
        jssc.start();
        jssc.awaitTermination();
    }

    public static JavaStreamingContext createContext() {
        SparkConf sparkConf = new SparkConf().setAppName("tachyon-test-consumer");

        Set<String> topicSet = new HashSet<String>();
        topicSet.add("test_topic");

        HashMap<String, String> kafkaParam = new HashMap<String, String>();
        kafkaParam.put("metadata.broker.list", "test1:9092,test2:9092");

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        // checkpoint metadata to HDFS
        jssc.checkpoint(CHECKPOINT_DIR);

        JavaPairInputDStream<String, String> message =
                KafkaUtils.createDirectStream(
                        jssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParam,
                        topicSet
                );

        JavaDStream<String> valueDStream = message.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> v1) throws Exception {
                return v1._2();
            }
        });
        valueDStream.count().print();

        return jssc;
    }
}
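Two practical notes, which reflect standard Spark guidance rather than anything in the code above: in production the checkpoint directory should live on a fault-tolerant filesystem such as HDFS, and a checkpoint can only be recovered by the exact application code that wrote it, so deploying a modified jar invalidates the checkpoint and the job again starts from the latest offsets. That limitation is one reason to manage offsets yourself, as described next.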
Saving offsets in application code
Developers can also implement the offset-saving logic themselves, persisting offsets to ZooKeeper. The RDDs of a Spark Streaming direct stream can be cast to HasOffsetRanges, which exposes the offset range of every partition; a minimal sketch of that cast follows.
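The sketch below is a hedged illustration, assuming the same Spark 1.x direct-stream API as above and a JavaPairInputDStream<String, String> named messages (a stand-in for whatever createDirectStream returned); the cast only works on the stream's original RDDs, before any transformation such as map():

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka.OffsetRange;

    // Read the per-partition offset ranges of each micro-batch. Only the RDDs
    // produced by createDirectStream itself implement HasOffsetRanges, so the
    // cast must happen before any transformation.
    messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
        public Void call(JavaPairRDD<String, String> rdd) throws Exception {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            for (OffsetRange o : offsetRanges) {
                // topic, partition, and the [fromOffset, untilOffset) range of this batch
                System.out.println(o.topic() + " " + o.partition() + " "
                        + o.fromOffset() + " " + o.untilOffset());
            }
            return null;
        }
    });

(In Spark 1.6 and later the Function<..., Void> overload of foreachRDD is deprecated in favor of VoidFunction; the older form matches the API level this article uses.)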
Implementation flow: start → initialize the Kafka connection parameters → create a KafkaCluster object → fetch the stored offsets using those parameters → if an exception occurs (for example, no offsets have been stored yet), set the offsets to 0 → initialize the StreamingContext → create the Kafka DStream from those offsets → read the offsets of each RDD in the DStream → process the data → update the offsets in the Kafka cluster → end.
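Before the full listing, the last step of that flow, writing offsets back to the Kafka cluster, can be sketched as follows. This is a hedged sketch, not the article's code: it assumes Spark 1.x's org.apache.spark.streaming.kafka.KafkaCluster (imported by the listing below), the Scala/Java collection conversions that class requires, and a hypothetical helper class OffsetCommitter with a made-up consumer group id:

    import kafka.common.TopicAndPartition;
    import org.apache.spark.streaming.kafka.KafkaCluster;
    import org.apache.spark.streaming.kafka.OffsetRange;
    import scala.Predef;
    import scala.Tuple2;
    import scala.collection.JavaConversions;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical helper; KafkaCluster's constructor and setConsumerOffsets()
    // take scala.collection.immutable.Map, hence the JavaConversions plumbing.
    public class OffsetCommitter {

        // Build a KafkaCluster from the same Kafka params used for the stream.
        public static KafkaCluster createCluster(Map<String, String> kafkaParam) {
            return new KafkaCluster(
                    JavaConversions.mapAsScalaMap(kafkaParam).toMap(
                            Predef.<Tuple2<String, String>>conforms()));
        }

        // After a batch has been processed, write each partition's untilOffset
        // back; offsetRanges comes from the HasOffsetRanges cast shown earlier.
        public static void commit(KafkaCluster cluster, String groupId,
                                  OffsetRange[] offsetRanges) {
            Map<TopicAndPartition, Object> offsets =
                    new HashMap<TopicAndPartition, Object>();
            for (OffsetRange o : offsetRanges) {
                offsets.put(new TopicAndPartition(o.topic(), o.partition()),
                        o.untilOffset());
            }
            // the returned Either carries per-partition errors; ignored here for brevity
            cluster.setConsumerOffsets(groupId,
                    JavaConversions.mapAsScalaMap(offsets).toMap(
                            Predef.<Tuple2<TopicAndPartition, Object>>conforms()));
        }
    }

A call site would then look like OffsetCommitter.commit(kafkaCluster, "test_group", offsetRanges) at the end of each foreachRDD pass.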
Source code
A Scala implementation is easy to find with a web search, so here is a Java version.

package com.xueba207.test;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConversions;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;