Structured Streaming知识梳理
Structured Streaming是一个基于Spark SQL引擎的可扩展、容错的流处理引擎
Spark SQL:接收sql转化成算子,对数据进行计算。意义是:降低学习、开发难度/成本。
SparkStreaming:实现数据的实时计算。意义:提高数据的计算效率。
能否实现通过sql 计算实时数据???
答案:能(Structured Streaming)
特点:
不用去考虑是流式计算,还是批处理,只要使用同样的方式来编写计算操作即可
快速、可扩展、容错、端到端的一次性流处理
1.简洁的模型: 一个流想象成是无限增长的表格。
Structured Streaming基于SparkSQL(DF/DS)
DF=rdd+schema(结构)è 表
DS=rdd+schema +类型è 表
2.一致的 API:和 Spark SQL 共用大部分 API,批处理和流处理程序还可以共用代码
3.卓越的性能
4.多语言支持:Scala,Java,Python,R 和 SQL
编程模型
一个流的数据源从逻辑上来说就是一个不断增长的动态表格,新数据被持续不断地添加到表格的末尾
对动态数据源进行实时查询,就是对当前的表格内容执行一次 SQL 查询。
数据查询方式
触发器(Trigger)
定执行周期
核心的思想
将实时到达的数据不断追加到unbound table无界表
应用场景
将数据源 映射 为类 似于关系数据库中的表,(SparkSQL中的DF/DS)
然后将经过计算得到的结果映射为另一张表
一个流的输出有多种模式
查询后的完整结果
与上次查询相比的差异
追加最新的结果
StructStreaming计算WordCount图示
如图所示,
第一行表示从socket不断接收数据,
第二行可以看成是之前提到的“unbound table",
第三行为最终的wordCounts是结果集。
当有新的数据到达时,Spark会执行“增量"查询,并更新结果集;
Structured Streaming实战
Socket source (for testing): 从socket连接中读取文本内容。
File source: 以数据流的方式读取一个目录中的文件。支持text、csv、json、parquet等文件类型。
Kafka source: 从Kafka中拉取数据,与0.10或以上的版本兼容,后面单独整合Kafka
/*Socket source (for testing): 从socket连接中读取文本内容。*/
object demo01 {
def main(args: Array[String]): Unit = {
//1.创建
val spark = SparkSession.builder().appName("").master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
//2.接入/读取最新数据
val socketDate: DataFrame = spark.readStream
.format("socket")
.option("host", "node01")
.option("port", "9999").load()
//3.根据业务进行预处理 和计算
import spark.implicits._
val socketDatasastring = socketDate.as[String]
//对数据进行拆分
val word: Dataset[String] = socketDatasastring.flatMap(a=>a.split(" "))
val wordCount: Dataset[Row] = word.groupBy("value").count().sort($"count".desc)
//4.计算结果输出
wordCount.writeStream.format("console")//数据输出到哪里
.outputMode("complete")//输出所有数据
.trigger(Trigger.ProcessingTime(0))//尽快计算 默认
.start()//开始任务
.awaitTermination()//等待关闭
}
} |
/ * File source: 以数据流的方式读取一个目录中的文件。支持text、csv、json、parquet等文件类型。 */ object demo02 { def main(args: Array[String]): Unit = { //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") val Schema: StructType = new StructType() .add("name","string") .add("age","integer") .add("hobby","string") //2.接收数据 import spark.implicits._ // Schema must be specified when creating a streaming source DataFrame. val dataDF: DataFrame = //需要指定目录 spark.readStream.schema(Schema).json("D:\\dev\\dashuju\\spark\\第八天\\json") //3.处理数据 val result: Dataset[Row] = dataDF.filter($"age" < 25).groupBy("hobby").count().sort($"count".desc) //4.输出结果 result.writeStream .format("console") .outputMode("complete") .trigger(Trigger.ProcessingTime(0)) .start() .awaitTermination() } } |
/*Kafka source: 从Kafka中拉取数据,与0.10或以上的版本兼容,后面单独整合Kafka*/ object demo03 { def main(args: Array[String]): Unit = { //1.创建SparkSession val spark = SparkSession.builder().appName("").master("local[*]").getOrCreate() spark.sparkContext.setLogLevel("WARN") //2.连接kafka val dataFrame = spark.readStream.format("kafka") .option("kafka.bootstrap.servers", "node01:9092") .option("subscribe", "18liuhneg") .load() //3.处理数据 import spark.implicits._ val dataFrameString: Dataset[String] = dataFrame.selectExpr("CAST(value AS String)").as[String] val word = dataFrameString.flatMap(_.split(" ")) val wordcount = word.groupBy("value").count().sort($"count".desc) //4.打印数据 wordcount.writeStream.format("console") .outputMode("complete") .trigger(Trigger.ProcessingTime(0)) .start() .awaitTermination() } } |