Apache Spark RDD累加器 accumulators 和 广播变量 broadcast variables

Apache Spark RDD累加器 accumulators 和 广播变量 broadcast variables

在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。

www.zeeklog.com  - Apache Spark RDD累加器 accumulators 和 广播变量 broadcast variables


(蓝色的就是副本)

但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。

为了满足这种需求,Spark提供了两种类型的变量:

1.累加器accumulators:累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)

2.广播变量broadcast variables:广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。

累加器

不使用累加器

    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //使用scala集合完成累加
    var counter1: Int = 0;
    var data = Seq(1,2,3)
    data.foreach(x => counter1 += x )
    println(counter1)//6

    println("+++++++++++++++++++++++++")

运行结果:6

	//使用RDD进行累加
    var counter2: Int = 0;
    val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]
    dataRDD.foreach(x => counter2 += x)
    println(counter2)//0

    println("-------------------------")

运行结果:0

上面的RDD操作运行结果是0
因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量
而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2
最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系

使用累加器

通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果。

    //使用累加器进行RDD累加
    val counter3: Accumulator[Int] = sc.accumulator(0)
    dataRDD.foreach(x => counter3 += x)
    println(counter3)//6

运行结果:6
这里sc.accumulator(0),这个参数0指的是初始值。
如果改为10,那么运行结果就是60

完整代码参考如下:

package demo06

import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}

object AccumulatorTest01 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //使用scala集合完成累加
    var counter1: Int = 0;
    var data = Seq(1,2,3)
    data.foreach(x => counter1 += x )
    println(counter1)//6

    println("+++++++++++++++++++++++++")

    //使用RDD进行累加
    var counter2: Int = 0;
    val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]
    dataRDD.foreach(x => counter2 += x)
    println(counter2)//0

    println("-------------------------")
    //注意:上面的RDD操作运行结果是0
    //因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量
    //而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2
    //最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系

    //使用累加器进行RDD累加
    val counter3: Accumulator[Int] = sc.accumulator(0)
    dataRDD.foreach(x => counter3 += x)
    println(counter3)//6
  }
}

广播变量

在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。

不使用广播变量

www.zeeklog.com  - Apache Spark RDD累加器 accumulators 和 广播变量 broadcast variables


使用广播变量之前,数据在各个task里

使用广播变量

www.zeeklog.com  - Apache Spark RDD累加器 accumulators 和 广播变量 broadcast variables


使用广播变量后,数据在Driver和Executor里共享,方便每个Task调用

代码