Apache Spark JavaAPI WordCount示例
MAVEN项目 需要先引入pom文件
本地运行
package demo02
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCountLocal {
def main(args: Array[String]): Unit = {
//1.创建SparkContext
val config = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
//2.读取文件
//A Resilient Distributed Dataset (RDD)弹性分布式数据集
//可以简单理解为分布式的集合,但是spark对它做了很多的封装,
//让程序员使用起来就像操作本地集合一样简单,这样大家就很happy了
val fileRDD: RDD[String] = sc.textFile("E:\\cache\\sparkCache\\20200403\\word.txt")
//3.处理数据
//3.1对每一行按空切分并压平形成一个新的集合中装的一个个的单词
//flatMap是对集合中的每一个元素进行操作,再进行压平
val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
//3.2每个单词记为1
val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
//3.3根据key进行聚合,统计每个单词的数量
//wordAndOneRDD.reduceByKey((a,b)=>a+b)
//第一个_:之前累加的结果
//第二个_:当前进来的数据
val wordAndCount: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)
//4.收集结果
val result: Array[(String, Int)] = wordAndCount.collect()
result.foreach(println)
}
}
集群运行
package demo02
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
//1.创建SparkContext
val config = new SparkConf().setAppName("wc") //.setMaster("local[*]")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
//2.读取文件
//A Resilient Distributed Dataset (RDD)弹性分布式数据集
//可以简单理解为分布式的集合,但是spark对它做了很多的封装,
//让程序员使用起来就像操作本地集合一样简单,这样大家就很happy了
val fileRDD: RDD[String] = sc.textFile(args(0)) //文件输入路径
//3.处理数据
//3.1对每一行按空切分并压平形成一个新的集合中装的一个个的单词
//flatMap是对集合中的每一个元素进行操作,再进行压平
val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
//3.2每个单词记为1
val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
//3.3根据key进行聚合,统计每个单词的数量
//wordAndOneRDD.reduceByKey((a,b)=>a+b)
//第一个_:之前累加的结果
//第二个_:当前进来的数据
val wordAndCount: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)
wordAndCount.saveAsTextFile(args(1)) //文件输出路径
//4.收集结果
//val result: Array[(String, Int)] = wordAndCount.collect()
//result.foreach(println)
}
}
写完代码后需要打包上传到集群运行。
提交到集群的参考代码:
- 执行命令提交到Spark-HA集群
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class cn.itcast.sparkhello.WordCount \
--master spark://node01:7077,node02:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
/root/wc.jar \
hdfs://node01:8020/aa.txt \
hdfs://node01:8020/cc
- 执行命令提交到YARN集群
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class cn.itcast.sparkhello.WordCount \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
/root/wc.jar \
hdfs://node01:8020/wordcount/input/words.txt \
hdfs://node01:8020/wordcount/output5
java8版(了解)
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
public class WordCount_Java {
public static void main(String[] args){
SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> fileRDD = jsc.textFile("D:\\授课\\190429\\资料\\data\\words.txt");
JavaRDD<String> wordRDD = fileRDD.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
JavaPairRDD<String, Integer> wordAndOne = wordRDD.mapToPair(w -> new Tuple2<>(w, 1));
JavaPairRDD<String, Integer> wordAndCount = wordAndOne.reduceByKey((a, b) -> a + b);
//wordAndCount.collect().forEach(t->System.out.println(t));
wordAndCount.collect().forEach(System.out::println);
//函数式编程的核心思想:行为参数化!
}
}
public class Test {
public static void main(String[] args){
new Thread(
new Runnable() {
@Override
public void run() {
System.out.println("java8");
}
}
).start();
//接下来使用Java8的lambda表达式(函数式编程)
new Thread(
()->System.out.println("java8")
).start();
}
}