StructStreaming 流直接保存结果到JSON文件

StructStreaming 流直接保存结果到JSON文件

原理

将要保存的数据转为json格式,再保存。

这里我第一次接触StructSteaming输出JSON格式的文件,所以就用了foreachWriter,自定义了一个JSONSink,方便代码复用。
这种方法会重复的开关流,不推荐使用,仅供参考,如果能解决重复开关流的问题,就好了!。

第一种

package demo05


import java.io.FileWriter

import com.alibaba.fastjson.serializer.SerializerFeature
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}


object HW01 {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("HW01")
      .master("local[*]")
      .getOrCreate()

    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    val socketDF: DataFrame = spark.readStream.format("socket")
      .option("host", "192.168.121.201")
      .option("port", "9999")
      .load()

    val JSONSink:JSONSink = new JSONSink
    socketDF.writeStream.foreach(JSONSink)
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()


  }
}


class JSONSink() extends ForeachWriter[Row]{
  var fw:FileWriter = _
  override def open(partitionId: Long, version: Long): Boolean = {
    fw = new FileWriter("E:\\cache\\sparkCache\\20200419\\abc.json",true)
    true
  }

  override def process(value: Row): Unit = {
    val str = value.get(0).toString
    val seq = (str,str.reverse)
    val JsonString: String = JSON.toJSONString(seq, SerializerFeature.WriteMapNullValue)
    println(JsonString)
    fw.write(JsonString+"\n")
    fw.flush()


  }

  override def close(errorOrNull: Throwable): Unit = {
    fw.close()
  }
}

第二种

package demo05

import org.apache.spark.sql.{DataFrame, SparkSession}

object HW0102 {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("HW0102")
      .master("local[*]")
      .getOrCreate()

    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    val socketDF: DataFrame = spark.readStream.format("socket")
      .option("host", "192.168.121.201")
      .option("port", "9999")
      .load()

    import spark.implicits._
    socketDF.as[String].flatMap(line =>{
      line.split("\\w+").map(word =>{//按空格切分
        (word,word.reverse)
      })
    }).toDF("原单词","反转单词")

    socketDF.writeStream.format("json")//支持 orc json csv
      .outputMode("append")
      .option("path","./file")//输出目录
      .option("checkpointLocation","./ck1")//必须指定checkpoint目录
      .start()
      .awaitTermination()


  }
}

两种的区别:

第一种比较自由,第二种必须指定ckpoint。
第一种不常用,第二种常用。
没有特殊原因不推荐使用第一种。