Structured Streaming知识梳理

Structured Streaming知识梳理

Structured Streaming是一个基于Spark SQL引擎的可扩展、容错的流处理引擎

Spark SQL:接收sql转化成算子,对数据进行计算。意义是:降低学习、开发难度/成本。

SparkStreaming:实现数据的实时计算。意义:提高数据的计算效率。

能否实现通过sql    计算实时数据???

答案:能(Structured Streaming)

特点:

不用去考虑是流式计算,还是批处理,只要使用同样的方式来编写计算操作即可

快速、可扩展、容错、端到端的一次性流处理

1.简洁的模型: 一个流想象成是无限增长的表格。

Structured Streaming基于SparkSQL(DF/DS)

DF=rdd+schema(结构)è 表

DS=rdd+schema +类型è  表

2.一致的 API:和 Spark SQL 共用大部分 API,批处理和流处理程序还可以共用代码

3.卓越的性能

4.多语言支持:Scala,Java,Python,R 和 SQL

编程模型

一个流的数据源从逻辑上来说就是一个不断增长的动态表格,新数据被持续不断地添加到表格的末尾

对动态数据源进行实时查询,就是对当前的表格内容执行一次 SQL 查询。

www.zeeklog.com  - Structured Streaming知识梳理

数据查询方式

触发器(Trigger)

定执行周期

核心的思想

将实时到达的数据不断追加到unbound table无界表

应用场景

将数据源 映射   为类   似于关系数据库中的表,(SparkSQL中的DF/DS)

然后将经过计算得到的结果映射为另一张表

www.zeeklog.com  - Structured Streaming知识梳理

一个流的输出有多种模式

查询后的完整结果

与上次查询相比的差异

追加最新的结果

StructStreaming计算WordCount图示

www.zeeklog.com  - Structured Streaming知识梳理

如图所示,

第一行表示从socket不断接收数据,

第二行可以看成是之前提到的“unbound table",

第三行为最终的wordCounts是结果集。

当有新的数据到达时,Spark会执行“增量"查询,并更新结果集;

Structured Streaming实战

Socket source (for testing): 从socket连接中读取文本内容。

File source: 以数据流的方式读取一个目录中的文件。支持text、csv、json、parquet等文件类型。

Kafka source: 从Kafka中拉取数据,与0.10或以上的版本兼容,后面单独整合Kafka

  /*Socket source (for testing): 从socket连接中读取文本内容。*/
 
object demo01 {
  def main(args: Array[String]): Unit = {
    //1.创建
    val spark = SparkSession.builder().appName("").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    //2.接入/读取最新数据
    val socketDate: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node01")
      .option("port", "9999").load()
    //3.根据业务进行预处理  和计算
    import spark.implicits._
    val socketDatasastring = socketDate.as[String]
    //对数据进行拆分
    val word: Dataset[String] = socketDatasastring.flatMap(a=>a.split(" "))
    val wordCount: Dataset[Row] = word.groupBy("value").count().sort($"count".desc)

    //4.计算结果输出
    wordCount.writeStream.format("console")//数据输出到哪里
      .outputMode("complete")//输出所有数据
      .trigger(Trigger.ProcessingTime(0))//尽快计算   默认
      .start()//开始任务
      .awaitTermination()//等待关闭
  }
}
/ *      File source: 以数据流的方式读取一个目录中的文件。支持text、csv、json、parquet等文件类型。
  */
object demo02 {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet
    val spark: SparkSession =
      SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    val Schema: StructType = new StructType()
      .add("name","string")
      .add("age","integer")
      .add("hobby","string")
    //2.接收数据
    import spark.implicits._
    // Schema must be specified when creating a streaming source DataFrame.
    val dataDF: DataFrame =                 //需要指定目录
      spark.readStream.schema(Schema).json("D:\\dev\\dashuju\\spark\\第八天\\json")
    //3.处理数据
    val result: Dataset[Row] =
      dataDF.filter($"age" < 25).groupBy("hobby").count().sort($"count".desc)
    //4.输出结果
    result.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}
/*Kafka source: 从Kafka中拉取数据,与0.10或以上的版本兼容,后面单独整合Kafka*/
object demo03 {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark = SparkSession.builder().appName("").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    //2.连接kafka
    val dataFrame = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "node01:9092")
      .option("subscribe", "18liuhneg")
      .load()

    //3.处理数据
    import spark.implicits._
    val dataFrameString: Dataset[String] = dataFrame.selectExpr("CAST(value AS String)").as[String]
    val word = dataFrameString.flatMap(_.split(" "))
    val wordcount = word.groupBy("value").count().sort($"count".desc)

    //4.打印数据
    wordcount.writeStream.format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}