Apache Spark JavaAPI WordCount示例

Apache Spark JavaAPI WordCount示例

MAVEN项目 需要先引入pom文件

本地运行

package demo02

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCountLocal {
  def main(args: Array[String]): Unit = {
    //1.创建SparkContext
    val config = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")
    //2.读取文件
    //A Resilient Distributed Dataset (RDD)弹性分布式数据集
    //可以简单理解为分布式的集合,但是spark对它做了很多的封装,
    //让程序员使用起来就像操作本地集合一样简单,这样大家就很happy了
    val fileRDD: RDD[String] = sc.textFile("E:\\cache\\sparkCache\\20200403\\word.txt")
    //3.处理数据
    //3.1对每一行按空切分并压平形成一个新的集合中装的一个个的单词
    //flatMap是对集合中的每一个元素进行操作,再进行压平

    val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
    //3.2每个单词记为1

    val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
    //3.3根据key进行聚合,统计每个单词的数量
    //wordAndOneRDD.reduceByKey((a,b)=>a+b)
    //第一个_:之前累加的结果
    //第二个_:当前进来的数据
    val wordAndCount: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)
    //4.收集结果
    val result: Array[(String, Int)] = wordAndCount.collect()
    result.foreach(println)
  }
}

集群运行

package demo02

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    //1.创建SparkContext
    val config = new SparkConf().setAppName("wc") //.setMaster("local[*]")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")
    //2.读取文件
    //A Resilient Distributed Dataset (RDD)弹性分布式数据集
    //可以简单理解为分布式的集合,但是spark对它做了很多的封装,
    //让程序员使用起来就像操作本地集合一样简单,这样大家就很happy了
    val fileRDD: RDD[String] = sc.textFile(args(0)) //文件输入路径
    //3.处理数据
    //3.1对每一行按空切分并压平形成一个新的集合中装的一个个的单词
    //flatMap是对集合中的每一个元素进行操作,再进行压平
    val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
    //3.2每个单词记为1
    val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
    //3.3根据key进行聚合,统计每个单词的数量
    //wordAndOneRDD.reduceByKey((a,b)=>a+b)
    //第一个_:之前累加的结果
    //第二个_:当前进来的数据
    val wordAndCount: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)
    wordAndCount.saveAsTextFile(args(1)) //文件输出路径
    //4.收集结果
    //val result: Array[(String, Int)] = wordAndCount.collect()
    //result.foreach(println)
  }
}

写完代码后需要打包上传到集群运行。

提交到集群的参考代码:

  • 执行命令提交到Spark-HA集群
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class cn.itcast.sparkhello.WordCount \
--master spark://node01:7077,node02:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
/root/wc.jar \
hdfs://node01:8020/aa.txt \
hdfs://node01:8020/cc
  • 执行命令提交到YARN集群
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class cn.itcast.sparkhello.WordCount \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
/root/wc.jar \
hdfs://node01:8020/wordcount/input/words.txt \
hdfs://node01:8020/wordcount/output5

java8版(了解)

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCount_Java {
    public static void main(String[] args){
        SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> fileRDD = jsc.textFile("D:\\授课\\190429\\资料\\data\\words.txt");
        JavaRDD<String> wordRDD = fileRDD.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
        JavaPairRDD<String, Integer> wordAndOne = wordRDD.mapToPair(w -> new Tuple2<>(w, 1));
        JavaPairRDD<String, Integer> wordAndCount = wordAndOne.reduceByKey((a, b) -> a + b);
        //wordAndCount.collect().forEach(t->System.out.println(t));
        wordAndCount.collect().forEach(System.out::println);
        //函数式编程的核心思想:行为参数化!
    }
}

 

public class Test {
    public static void main(String[] args){
      new Thread(
         new Runnable() {
          @Override
          public void run() {
              System.out.println("java8");
          }
        }
      ).start();

      //接下来使用Java8的lambda表达式(函数式编程)
      new Thread(
              ()->System.out.println("java8")
      ).start();
    }
}