StructStreaming 流直接保存结果到JSON文件
原理
将要保存的数据转为json格式,再保存。
这里我第一次接触StructSteaming输出JSON格式的文件,所以就用了foreachWriter,自定义了一个JSONSink,方便代码复用。
这种方法会重复的开关流,不推荐使用,仅供参考,如果能解决重复开关流的问题,就好了!。
第一种
package demo05
import java.io.FileWriter
import com.alibaba.fastjson.serializer.SerializerFeature
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
object HW01 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("HW01")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("WARN")
val socketDF: DataFrame = spark.readStream.format("socket")
.option("host", "192.168.121.201")
.option("port", "9999")
.load()
val JSONSink:JSONSink = new JSONSink
socketDF.writeStream.foreach(JSONSink)
.outputMode("append")
.trigger(Trigger.ProcessingTime(0))
.start()
.awaitTermination()
}
}
class JSONSink() extends ForeachWriter[Row]{
var fw:FileWriter = _
override def open(partitionId: Long, version: Long): Boolean = {
fw = new FileWriter("E:\\cache\\sparkCache\\20200419\\abc.json",true)
true
}
override def process(value: Row): Unit = {
val str = value.get(0).toString
val seq = (str,str.reverse)
val JsonString: String = JSON.toJSONString(seq, SerializerFeature.WriteMapNullValue)
println(JsonString)
fw.write(JsonString+"\n")
fw.flush()
}
override def close(errorOrNull: Throwable): Unit = {
fw.close()
}
}
第二种
package demo05
import org.apache.spark.sql.{DataFrame, SparkSession}
object HW0102 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("HW0102")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("WARN")
val socketDF: DataFrame = spark.readStream.format("socket")
.option("host", "192.168.121.201")
.option("port", "9999")
.load()
import spark.implicits._
socketDF.as[String].flatMap(line =>{
line.split("\\w+").map(word =>{//按空格切分
(word,word.reverse)
})
}).toDF("原单词","反转单词")
socketDF.writeStream.format("json")//支持 orc json csv
.outputMode("append")
.option("path","./file")//输出目录
.option("checkpointLocation","./ck1")//必须指定checkpoint目录
.start()
.awaitTermination()
}
}
两种的区别:
第一种比较自由,第二种必须指定ckpoint。
第一种不常用,第二种常用。
没有特殊原因不推荐使用第一种。