Spark 学习笔记
一、Spark 核心基础
1.1 什么是 Spark
Apache Spark 是一个快速、通用、可扩展的分布式计算引擎,基于 Scala 语言开发,由加州大学伯克利分校 AMP 实验室于 2010 年开源,2013 年捐赠给 Apache 软件基金会,成为顶级开源项目。
核心定位:解决 Hadoop MapReduce 计算速度慢、迭代计算效率低的问题,提供内存计算能力,支持多种计算场景(批处理、流处理、机器学习、图计算等)。
1.2 Spark 与 Hadoop 的对比
| 对比维度 | Spark | Hadoop MapReduce |
|---|
| 计算模型 | 内存计算,中间结果可缓存,支持迭代计算 | 磁盘计算,中间结果写入磁盘,不支持迭代优化 |
| 计算速度 | 比 MapReduce 快 10-100 倍(内存计算优势) | 速度较慢,受磁盘 I/O 限制 |
| 适用场景 | 批处理、流处理、机器学习、图计算、交互式查询 | 仅适用于大规模批处理,场景单一 |
| 依赖关系 | 可独立运行,也可依赖 Hadoop 的 HDFS(存储)、YARN(调度) | 依赖 Hadoop 生态(HDFS 存储、YARN 调度) |
1.3 Spark 核心优势
- 快速:基于内存计算,减少磁盘 I/O 开销,迭代计算无需重复读写数据。
- 通用:提供统一的 API,支持批处理(Spark Core)、流处理(Spark Streaming/Structured Streaming)、机器学习(MLlib)、图计算(GraphX)四大核心模块。
- 可扩展:支持水平扩展,可部署在单机、集群(Standalone、YARN、Mesos、K8s),节点数量可动态调整。
- 易用:支持 Scala、Java、Python、R、SQL 等多种语言,API 简洁,开发效率高。
1.4 Spark 运行环境
1.4.1 运行模式分类
- Local 模式(本地模式):单机运行,适用于开发、测试、小规模数据处理(无需集群),核心参数为 local[N](N 为 CPU 核心数,local[*]表示使用全部核心)。
- Standalone 模式(独立集群模式):Spark 自带的集群调度模式,无需依赖其他组件,部署简单,适用于中小规模集群。
- YARN 模式(Hadoop YARN 模式):最常用的模式,依赖 Hadoop 的 YARN 作为资源调度器,HDFS 作为存储,适用于大规模集群,与 Hadoop 生态无缝集成。
- Mesos 模式:依赖 Apache Mesos 作为资源调度器,适用于多框架(Spark、Hadoop、Storm 等)共享集群资源的场景。
- K8s 模式(容器化模式):基于 Kubernetes 部署 Spark 集群,适用于容器化运维环境,可实现资源的动态调度和弹性伸缩。
1.4.2 核心运行环境配置(以 YARN 模式为例)
关键配置文件(spark/conf 目录):
spark-env.sh:配置环境变量(如 JAVA_HOME、HADOOP_HOME、YARN_CONF_DIR、Spark 集群节点信息)。
spark-defaults.conf:配置 Spark 默认参数(如指定 Master 为 yarn、指定 Driver/Executor 内存、指定日志存储路径)。
slaves(Standalone 模式用):配置集群从节点(Worker)的主机名。
二、Spark 核心组件与架构
2.1 Spark 核心架构组件
Spark 集群运行时,分为**Driver(驱动程序)和Executor(执行器)**两大核心角色,配合 Cluster Manager(集群管理器)实现资源调度和任务执行。
2.1.1 Driver(驱动节点)
核心职责:
- 负责解析用户提交的代码(如 Spark SQL、Scala/Java 代码),生成抽象语法树(AST),转化为 DAG(有向无环图)任务。
- 对 DAG 任务进行划分,拆分为多个 Stage(阶段),每个 Stage 包含多个 Task(任务)。
- 向 Cluster Manager 申请集群资源(CPU、内存),并将 Task 分配给 Executor 执行。
- 监控 Task 的执行状态,处理 Task 失败重试、任务中断等异常情况。
- 收集 Executor 执行后的结果,汇总并返回给用户。
2.1.2 Executor(执行节点)
核心职责:
- 接收 Driver 分配的 Task,执行具体的计算逻辑(如 Map、Reduce、Filter 等操作)。
- 负责本地数据的存储和缓存(使用内存或磁盘缓存中间结果,提升迭代计算效率)。
- 将 Task 执行的中间结果或最终结果反馈给 Driver。
- 每个 Executor 是一个独立的 JVM 进程,在集群的 Worker 节点上运行,可并行执行多个 Task(Task 数量由 CPU 核心数决定)。
2.1.3 Cluster Manager(集群管理器)
核心职责:负责集群资源(CPU、内存)的分配和管理,协调 Driver 和 Executor 之间的通信,不同运行模式对应不同的 Cluster Manager:
- Local 模式:内置简单的 Cluster Manager,无需单独部署。
- Standalone 模式:Spark 自带的 Cluster Manager(Master+Worker)。
- YARN 模式:Hadoop YARN 的 ResourceManager(资源管理)+ NodeManager(节点管理)。
- K8s 模式:Kubernetes 的 API Server、Scheduler 等组件。
2.2 DAG(有向无环图)与 Stage 划分
2.2.1 DAG 概念
Spark 将用户提交的计算任务转化为 DAG(有向无环图),其中每个节点代表一个 RDD(弹性分布式数据集)的转换操作,边代表 RDD 之间的依赖关系(血缘关系),无循环依赖,确保任务可有序执行。
2.2.2 Stage 划分规则
Driver 会根据 DAG 中 RDD 的依赖关系,将任务划分为多个 Stage(阶段),Stage 之间是串行执行,Stage 内部的 Task 是并行执行。
划分依据:宽依赖(Shuffle 依赖)和窄依赖(Narrow 依赖)。
- 窄依赖:一个父 RDD 的分区只对应一个子 RDD 的分区(如 Map、Filter、MapPartitions),无需 Shuffle,可在同一个 Stage 内完成。
- 宽依赖:一个父 RDD 的分区对应多个子 RDD 的分区(如 GroupByKey、ReduceByKey、Join),需要进行 Shuffle(数据重新分区、网络传输),宽依赖是 Stage 划分的边界,每个宽依赖对应一个新的 Stage。
2.2.3 Shuffle(洗牌)机制
Shuffle 是 Spark 中最耗时的操作,核心是将 Executor 之间的数据进行重新分区和网络传输,分为两个阶段:
- Map 阶段:Executor 执行 Task,将计算结果按照指定的分区规则(Hash 分区、Range 分区等)写入本地磁盘的临时文件。
- Reduce 阶段:每个 Reduce Task 从多个 Map Task 的临时文件中读取对应分区的数据,进行合并、计算,最终输出结果。
优化要点:减少 Shuffle 操作(如用 ReduceByKey 替代 GroupByKey)、调整 Shuffle 分区数、开启 Shuffle 缓存等。
2.3 RDD(弹性分布式数据集)核心概念
2.3.1 RDD 定义
RDD(Resilient Distributed Dataset)是 Spark 的核心抽象,代表一个不可变、可分区、可并行计算的分布式数据集,具备以下特性:
- 不可变:RDD 创建后无法修改,只能通过转换操作(Transformation)生成新的 RDD。
- 可分区:RDD 的数据被划分为多个分区(Partition),分布在集群的不同 Executor 上,分区数量可配置(默认与 HDFS 块大小一致,128M/256M)。
- 弹性:支持数据的容错(基于血缘关系自动恢复丢失的分区)、动态调整分区数量。
- 可并行:每个分区的计算可独立并行执行,互不干扰。
2.3.2 RDD 的创建方式
- 从本地集合创建(适用于 Local 模式测试):
parallelize():将本地集合转化为 RDD,可指定分区数。
makeRDD():与 parallelize() 功能类似,支持指定分区的偏好位置(Preferred Locations)。
- 从外部存储创建(最常用):
textFile():读取文本文件(本地文件、HDFS、S3 等),每行作为一个元素,返回 RDD[String]。
read.csv()/read.json():通过 Spark SQL 的 API 读取结构化文件,返回 DataFrame(可转换为 RDD)。
hadoopFile():读取 Hadoop 支持的文件格式(如 SequenceFile、MapFile)。
- 从其他 RDD 转换创建:通过 Transformation 操作(如 Map、Filter)从已有的 RDD 生成新的 RDD。
2.3.3 RDD 的两大操作类型
Spark 中 RDD 的操作分为Transformation(转换操作)和 Action(行动操作),核心区别:是否触发任务执行(惰性求值)。
(1)Transformation(转换操作)
特性:惰性求值,仅记录 RDD 之间的依赖关系(血缘关系),不立即执行计算,只有当 Action 操作被调用时,才会触发整个 DAG 的执行。
常用转换操作:
Map(func):对 RDD 中的每个元素应用 func 函数,返回一个新的 RDD(一对一映射)。
Filter(func):对 RDD 中的每个元素应用 func 函数,筛选出返回值为 true 的元素,返回新的 RDD。
FlatMap(func):对 RDD 中的每个元素应用 func 函数(返回一个迭代器),将迭代器中的元素扁平化,返回新的 RDD(一对多映射)。
MapPartitions(func):对 RDD 的每个分区应用 func 函数(输入是整个分区的迭代器),效率比 Map 高(减少函数调用次数)。
GroupByKey():按照元素的 key 进行分组,返回(key, 迭代器)形式的 RDD(宽依赖,有 Shuffle)。
ReduceByKey(func):按照元素的 key 进行分组,对每个组内的元素应用 func 函数进行聚合(宽依赖,有 Shuffle,比 GroupByKey 高效,因为会在本地先聚合)。
SortByKey(ascending=true):按照元素的 key 进行排序,ascending 指定升序/降序。
Join(otherRDD, how="inner"):将两个 RDD 按照 key 进行连接(inner、left、right、full outer join),宽依赖,有 Shuffle。
Union(otherRDD):将两个 RDD 合并,返回一个包含所有元素的新 RDD(分区数相加,不去重)。
Intersection(otherRDD):求两个 RDD 的交集,返回新的 RDD(去重,有 Shuffle)。
(2)Action(行动操作)
特性:立即触发计算,执行整个 DAG 任务,将结果返回给 Driver 或写入外部存储。
常用行动操作:
count():返回 RDD 中元素的个数。
first():返回 RDD 中的第一个元素。
take(n):返回 RDD 中的前 n 个元素。
collect():将 RDD 中的所有元素收集到 Driver 端(注意:数据量不能太大,否则会导致 Driver 内存溢出)。
reduce(func):对 RDD 中的元素应用 func 函数进行聚合,返回一个单一值(如求和、求最大值)。
foreach(func):对 RDD 中的每个元素应用 func 函数,无返回值(如写入外部存储)。
saveAsTextFile(path):将 RDD 中的元素写入文本文件(本地路径、HDFS 等),每个分区对应一个文件。
saveAsSequenceFile(path):将 RDD 中的元素写入 SequenceFile(Hadoop 支持的二进制文件格式)。
2.3.4 RDD 的缓存与持久化
核心目的:当 RDD 需要被多次使用(如迭代计算、多次 Action 操作)时,将其缓存到内存或磁盘,避免重复计算,提升效率。
(1)缓存 API
cache():默认将 RDD 缓存到内存(等价于 persist(StorageLevel.MEMORY_ONLY))。
persist(storageLevel):手动指定存储级别,支持多种存储策略:
MEMORY_ONLY:仅缓存到内存,内存不足时,不缓存,下次使用重新计算(默认)。
MEMORY_AND_DISK:优先缓存到内存,内存不足时,将剩余数据缓存到磁盘。
DISK_ONLY:仅缓存到磁盘。
MEMORY_ONLY_SER:将 RDD 元素序列化后缓存到内存(节省内存,序列化有开销)。
MEMORY_AND_DISK_SER:序列化后缓存到内存和磁盘。
unpersist():清除 RDD 的缓存(释放内存/磁盘资源)。
(2)注意事项
- 缓存是惰性的,只有当第一个 Action 操作触发时,才会将 RDD 缓存到指定存储介质。
- 缓存的 RDD 会保留血缘关系,当缓存数据丢失(如内存溢出、节点故障)时,可通过血缘关系重新计算恢复。
- 避免缓存不必要的 RDD,否则会浪费资源;对于一次性使用的 RDD,无需缓存。
2.3.5 RDD 的血缘关系与容错机制
- 血缘关系(Lineage):RDD 之间的依赖关系构成血缘关系,每个 RDD 都记录了自己的父 RDD 和转换操作,形成一个血缘链条。
- 容错机制:基于血缘关系实现容错,当某个 RDD 的分区丢失(如 Executor 节点故障)时,Spark 无需重新计算整个 RDD,只需根据血缘关系,重新计算丢失的分区即可,大大提升容错效率。
三、Spark SQL 核心知识
3.1 Spark SQL 简介
Spark SQL 是 Spark 用于处理结构化数据的模块,基于 Spark Core 构建,支持 SQL 查询和 DataFrame/Dataset API,可与 Spark 的其他模块(如 Spark Streaming、MLlib)无缝集成。
核心优势:
- 统一的 API:支持 SQL 和 DataFrame/Dataset 两种方式操作结构化数据,灵活易用。
- 优化的执行计划:内置 Catalyst 优化器,可对 SQL 语句和 DataFrame 操作进行优化(如谓词下推、列裁剪、Join 优化),提升执行效率。
- 兼容多种数据源:支持读取 HDFS、Hive、MySQL、PostgreSQL、JSON、CSV 等多种结构化数据源,也可写入多种存储。
- 与 Hive 兼容:可直接访问 Hive 的元数据(Metastore),执行 Hive SQL 语句,无需修改现有 Hive 数据。
3.2 DataFrame 与 Dataset
3.2.1 DataFrame
DataFrame 是一个分布式的表格型数据结构,包含行和列,每个列有名称和数据类型(Schema),类似于关系型数据库的表,也类似于 Pandas 的 DataFrame,但分布式存储在集群中。
特点:
- 有 Schema(结构信息):明确每个列的名称和数据类型(如 String、Int、Double),便于优化和类型检查。
- 不可变:与 RDD 一样,DataFrame 创建后无法修改,只能通过转换操作生成新的 DataFrame。
- 支持多种操作:可通过 SQL 语句操作,也可通过 DataFrame API(如 select、filter、groupBy)操作。
- 底层基于 RDD:DataFrame 本质上是一个包含 Schema 信息的 RDD[Row](Row 表示一行数据)。
3.2.2 Dataset
Dataset 是 Spark 1.6 版本引入的新抽象,结合了 RDD 的强类型和 DataFrame 的结构化优势,支持编译时类型检查,比 DataFrame 更安全、更灵活。
特点:
- 强类型:Dataset 中的元素是特定的 Java/Scala 对象(如 Case Class),编译时可检查类型错误,避免运行时类型异常。
- 有 Schema:与 DataFrame 一样,包含 Schema 信息,支持结构化查询。
- 兼容 RDD 和 DataFrame:可与 RDD、DataFrame 相互转换(Dataset <-> DataFrame <-> RDD)。
- 支持两种 API:Typed API(针对强类型对象的操作,如 map、filter)和 Untyped API(与 DataFrame API 一致,如 select、groupBy)。
3.2.3 RDD、DataFrame、Dataset 对比
| 对比维度 | RDD | DataFrame | Dataset |
|---|
| 类型安全 | 强类型(编译时检查) | 弱类型(运行时检查) | 强类型(编译时检查) |
| Schema 信息 | 无 Schema(仅存储数据) | 有 Schema(结构化) | 有 Schema(结构化) |
| 优化支持 | 无优化(依赖用户手动优化) | 支持 Catalyst 优化 | 支持 Catalyst 优化 |
| API 灵活性 | 高(支持任意复杂操作) | 中(结构化操作为主) | 高(结合 RDD 和 DataFrame 优势) |
| 适用场景 | 非结构化/半结构化数据、复杂计算 | 结构化数据、SQL 查询、快速分析 | 结构化数据、强类型需求、复杂业务逻辑 |
3.2.4 三者相互转换
- RDD → DataFrame:
- 通过 toDF() 方法(需导入 spark.implicits._),自动推断 Schema(或手动指定 Schema)。
- 示例:rdd.toDF("name", "age")
- DataFrame → RDD:
- 通过 rdd 属性,返回 RDD[Row]。
- 示例:df.rdd
- DataFrame → Dataset:
- 通过 as[CaseClass] 方法,指定强类型对象。
- 示例:df.as[Person](Person 为 Case Class)
- Dataset → DataFrame:
- 通过 toDF() 方法,或使用 selectExpr 指定列名。
- 示例:ds.toDF()
- Dataset → RDD:
- 通过 rdd 属性,返回 RDD[CaseClass]。
- 示例:ds.rdd
- RDD → Dataset:
- 先将 RDD 转换为 DataFrame,再转换为 Dataset;或直接通过 toDS() 方法(需导入 spark.implicits._)。
- 示例:rdd.map(x => Person(x._1, x._2)).toDS()
3.3 Spark SQL 实操要点
3.3.1 SparkSession(入口对象)
Spark SQL 的入口对象是 SparkSession,替代了 Spark 1.x 版本中的 SQLContext 和 HiveContext,负责创建 DataFrame、执行 SQL 查询、访问数据源等。
创建 SparkSession 示例(Scala):
import org.apache.spark.sql.SparkSession
object SparkSQLDemo {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("SparkSQLDemo")
.master("local[*]")
.enableHiveSupport()
.getOrCreate()
// 导入隐式转换(用于 RDD→DataFrame 等操作)
import spark.implicits._
// 执行 SQL 查询、操作 DataFrame/Dataset...
// 关闭 SparkSession
spark.stop()
}
}
3.3.2 读取结构化数据源
Spark SQL 支持读取多种结构化数据源,常用 API:
- 读取 CSV 文件:spark.read.csv("path").toDF("col1", "col2"),可指定分隔符、是否有表头(header=true)、数据类型(inferSchema=true)。
- 读取 JSON 文件:spark.read.json("path"),自动推断 Schema。
- 读取 Hive 表:spark.sql("select * from hive_db.hive_table"),需开启 enableHiveSupport()。
- 读取 JDBC 数据库(MySQL/PostgreSQL):
spark.read.format("jdbc")
.option("url", "jdbc:mysql://host:port/dbname")
.option("dbtable", "table_name")
.option("user", "username")
.option("password", "password")
.load()
3.3.3 执行 SQL 查询
Spark SQL 支持两种 SQL 查询方式:
- 直接执行 SQL 语句:通过 spark.sql("sql 语句"),返回 DataFrame。
- 示例:val df = spark.sql("select name, age from user where age > 18")
- 创建临时视图(Temp View)后查询:
- createTempView("view_name"):创建会话级临时视图,仅当前 SparkSession 有效,会话关闭后视图消失。
- createGlobalTempView("view_name"):创建全局临时视图,所有 SparkSession 共享,查询时需加 global_temp 前缀(如 select * from global_temp.view_name)。
- 示例:df.createTempView("user_view"); spark.sql("select * from user_view")
3.3.4 写入结构化数据源
常用写入 API:df.write.format("格式").option("参数").save("路径")
- 写入 CSV/JSON 文件:df.write.csv("path")、df.write.json("path"),可指定 mode(overwrite 覆盖、append 追加、ignore 忽略、errorifexists 报错)。
- 写入 Hive 表:df.write.mode("overwrite").saveAsTable("hive_db.hive_table")。
- 写入 JDBC 数据库:
df.write.format("jdbc")
.option("url", "jdbc:mysql://host:port/dbname")
.option("dbtable", "table_name")
.option("user", "username")
.option("password", "password")
.mode("append")
.save()
四、Spark Streaming 核心知识
4.1 Spark Streaming 简介
Spark Streaming 是 Spark 用于处理实时流数据的模块,基于 Spark Core 构建,采用'微批处理'(Micro-Batching)机制,将实时流数据切分为一系列小的批处理任务,批量处理数据,实现近实时计算(延迟通常在秒级)。
核心优势:
- 易用性:API 与 Spark Core/RDD 高度一致,熟悉 RDD 操作即可快速上手。
- 容错性:基于 RDD 的血缘关系实现容错,流数据的每个微批都对应一个 RDD,故障时可重新计算。
- 可扩展性:与 Spark 集群无缝集成,支持水平扩展,可处理大规模流数据。
- 集成性:可与 Spark SQL、MLlib、GraphX 等模块集成,实现流数据的实时分析、机器学习等场景。
注意:Spark Streaming 是'微批处理',并非真正的实时流处理(如 Flink 的流式处理),适用于延迟要求不高(秒级)的场景;若需要毫秒级延迟,建议使用 Flink。
4.2 Spark Streaming 核心概念
4.2.1 DStream(离散流)
DStream(Discretized Stream)是 Spark Streaming 的核心抽象,代表一个连续的流数据序列,本质上是一系列连续的 RDD(每个 RDD 对应一个微批的数据)。
特性:
- 不可变:与 RDD 一样,DStream 创建后无法修改,只能通过转换操作生成新的 DStream。
- 微批构成:每个 DStream 由多个微批的 RDD 组成,每个 RDD 包含一个时间间隔(Batch Interval)内的数据。
- 操作类型:支持转换操作(Transformation)和输出操作(Output Operation),与 RDD 的操作类似。
4.2.2 Batch Interval(批处理间隔)
核心参数,指定将流数据切分为微批的时间间隔(如 1 秒、5 秒),决定了 Spark Streaming 的延迟时间(延迟 ≈ 批处理间隔)。
注意:批处理间隔的设置需结合集群性能和业务需求,间隔太小(如 100ms)会导致任务过多,集群压力大;间隔太大(如 1 分钟)会导致延迟过高。
4.2.3 StreamingContext(流处理入口)
StreamingContext 是 Spark Streaming 的入口对象,负责创建 DStream、启动流处理、停止流处理,基于 SparkContext 创建。
创建 StreamingContext 示例(Scala):
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.SparkConf
object SparkStreamingDemo {
def main(args: Array[String]): Unit = {
// 1. 创建 SparkConf
val conf = new SparkConf().setAppName("SparkStreamingDemo").setMaster("local[*]")
// 2. 创建 StreamingContext,指定批处理间隔为 1 秒
val ssc = new StreamingContext(conf, Seconds(1))
// 3. 创建 DStream、执行操作...
// 4. 启动流处理(开始接收数据并处理)
ssc.start()
// 5. 等待流处理停止(阻塞当前线程,直到手动停止或发生异常)
ssc.awaitTermination()
// 6. 停止流处理(可选,手动停止)
ssc.stop()
}
}
4.3 DStream 的操作
4.3.1 转换操作(Transformation)
与 RDD 的转换操作类似,分为无状态转换和有状态转换。
(1)无状态转换
每个微批的处理独立于其他微批,不依赖历史数据,常用操作:
map(func):对 DStream 中的每个元素应用 func 函数,返回新的 DStream。
filter(func):筛选出符合条件的元素,返回新的 DStream。
flatMap(func):扁平化处理,与 RDD 的 flatMap 一致。
reduceByKey(func):按照 key 聚合,每个微批内独立聚合(不依赖历史微批的数据)。
join(otherDStream):将两个 DStream 按照 key 连接,每个微批内独立连接。
(2)有状态转换
处理当前微批时,依赖历史微批的数据(如累计计数、滑动窗口统计),常用操作:
updateStateByKey(func):维护每个 key 的状态,根据当前微批的数据和历史状态,更新 key 的新状态(如累计求和、计数)。
window(windowDuration, slideDuration):创建滑动窗口 DStream,窗口时长(windowDuration)指定窗口包含的微批数量,滑动步长(slideDuration)指定窗口滑动的微批数量(如窗口时长 10 秒,滑动步长 5 秒,即每 5 秒统计一次过去 10 秒的数据)。
reduceByKeyAndWindow(func, windowDuration, slideDuration):在滑动窗口内,按照 key 聚合数据。
注意:使用有状态转换时,需设置检查点(Checkpoint),用于保存历史状态数据,防止节点故障导致状态丢失。
4.3.2 输出操作(Output Operation)
触发 DStream 的计算,将处理结果输出到外部存储或控制台,常用操作:
print():将每个微批的前 10 条数据打印到控制台(适用于测试)。
foreachRDD(func):对 DStream 中的每个 RDD 应用 func 函数,可将数据写入外部存储(如 HDFS、MySQL、Kafka)。
saveAsTextFiles(prefix, suffix):将每个微批的 RDD 写入文本文件,文件名由前缀、时间戳、后缀组成。
4.4 数据源接入
Spark Streaming 支持接入多种实时数据源,分为基础数据源和高级数据源。
4.4.1 基础数据源
- 文件数据源:监控指定目录,当有新文件写入时,读取文件数据作为微批(支持 HDFS、本地文件等),示例:ssc.textFileStream("hdfs://path")。
- Socket 数据源:从指定的 Socket 端口接收数据(适用于测试),示例:ssc.socketTextStream("host", port)。
- RDD 队列数据源:将本地 RDD 队列作为数据源,适用于测试,示例:ssc.queueStream(rddQueue)。
4.4.2 高级数据源
需导入对应的依赖包(如 Kafka、Flume),常用:
- Kafka 数据源(最常用):通过 Spark Streaming 的 Kafka 集成包,接收 Kafka 主题中的数据,支持两种集成方式:Receiver-based(旧版本,效率低)和 Direct Stream(新版本,效率高,推荐)。
- Flume 数据源:接收 Flume 采集的日志数据,支持推模式(Flume push to Spark)和拉模式(Spark pull from Flume)。
- Kinesis 数据源:接收 Amazon Kinesis 的流数据。
4.5 检查点(Checkpoint)机制
4.5.1 核心作用
- 保存状态数据:用于有状态转换(如 updateStateByKey、window),保存历史状态,防止节点故障导致状态丢失。
- 恢复 Driver:当 Driver 节点故障时,可从检查点中恢复 Driver 的状态,重启流处理任务,无需重新启动。
4.5.2 配置检查点
通过 StreamingContext 的 checkpoint() 方法指定检查点存储路径(本地路径或 HDFS 路径),示例:
// 设置检查点路径(HDFS 路径,推荐)
ssc.checkpoint("hdfs://path/to/checkpoint")
注意:检查点路径需是可靠的存储(如 HDFS),本地路径仅适用于 Local 模式测试,集群模式下会导致节点故障后无法恢复。
五、Spark 常见问题与优化
5.1 常见问题及解决方案
5.1.1 Driver 内存溢出(OOM)
原因:Driver 内存不足,常见于 collect() 操作读取大量数据、广播变量过大、任务过多导致 Driver 负载过高。
解决方案:
- 避免使用 collect() 操作读取大量数据,改用 take()、saveAsTextFile() 等操作。
- 调整 Driver 内存:通过–driver-memory 参数指定(如 spark-submit --driver-memory 4g)。
- 优化广播变量:减少广播变量的大小,避免广播大量数据。
5.1.2 Executor 内存溢出(OOM)
原因:Executor 内存不足,常见于缓存大量 RDD、Shuffle 数据量过大、单个 Task 处理的数据过多。
解决方案:
- 调整 Executor 内存:通过–executor-memory 参数指定(如 spark-submit --executor-memory 8g)。
- 优化缓存策略:减少不必要的 RDD 缓存,使用 MEMORY_AND_DISK 存储级别,避免仅缓存到内存。
- 调整 Shuffle 分区数:增加 Shuffle 分区数(–conf spark.sql.shuffle.partitions=200),减少单个分区的数据量。
- 优化 Task 粒度:增加 Task 数量,使每个 Task 处理的数据量适中(建议每个 Task 处理 128M-256M 数据)。
5.1.3 Shuffle 性能低下
原因:Shuffle 操作涉及网络传输和磁盘 I/O,是 Spark 中最耗时的操作,常见于分区数不合理、数据倾斜、序列化效率低。
解决方案:
- 调整 Shuffle 分区数:根据集群 CPU 核心数和数据量调整,一般设置为 CPU 核心数的 2-3 倍(如 100-200 个分区)。
- 避免不必要的 Shuffle:用 ReduceByKey 替代 GroupByKey,用 Broadcast Join 替代普通 Join(小表 Join 大表时)。
- 解决数据倾斜:
- 对倾斜的 key 进行拆分(如加盐),分散到多个分区。
- 使用随机前缀,将倾斜的 key 分散到不同的 Reduce Task。
- 过滤掉异常倾斜的 key(如无效数据)。
- 优化序列化:使用 Kyro 序列化替代 Java 序列化(–conf spark.serializer=org.apache.spark.serializer.KryoSerializer),减少数据传输量和内存占用。
5.1.4 任务执行缓慢
原因:CPU 核心不足、内存分配不合理、数据倾斜、Task 数量过少。
解决方案:
- 增加集群节点或 CPU 核心数,提升并行计算能力。
- 合理分配内存:区分 Executor 的堆内存和堆外内存,避免内存浪费。
- 解决数据倾斜(参考上面的 Shuffle 优化)。
- 增加 Task 数量,提升并行度(Task 数量建议大于等于集群总 CPU 核心数)。
5.2 核心优化策略
5.2.1 资源优化
- Driver 优化:合理设置 Driver 内存(根据任务复杂度),避免 Driver 成为瓶颈。
- Executor 优化:
- 设置合适的 Executor 数量(–num-executors),一般为集群节点数的 2-3 倍。
- 设置合适的 Executor 内存(–executor-memory)和 CPU 核心数(–executor-cores),避免单 Executor 占用过多资源。
- 内存优化:开启堆外内存(–conf spark.executor.memoryOverhead),用于存储序列化数据、Shuffle 临时数据,避免堆内存溢出。
5.2.2 计算优化
- 减少 Shuffle 操作:优先使用窄依赖操作(如 Map、Filter),避免不必要的宽依赖(如 GroupByKey、Join),从源头减少网络传输和磁盘 I/O 开销,这是计算优化的核心前提(与 5.1.3 Shuffle 性能低下的优化逻辑呼应)。
- 优化 Task 粒度:合理设置 Task 数量,一般建议 Task 数量为集群总 CPU 核心数的 2-3 倍,确保每个 Task 处理的数据量控制在 128M-256M 之间。既避免 Task 过大导致单个 Executor 内存压力激增,也防止 Task 过小造成集群任务调度开销过高、资源利用率不足。
- 使用高效的算子:优先选择高性能算子替代低效算子,降低计算开销。例如用 ReduceByKey 替代 GroupByKey(ReduceByKey 支持本地预聚合,可大幅减少 Shuffle 阶段的数据量);简单筛选后映射场景,可用 filter+map 替代 flatMap 提升效率;去重操作可通过 groupByKey+first() 自定义实现,减少 distinct 算子的冗余计算。
- 优化数据格式:采用高效存储和序列化格式,进一步降低 I/O 和内存消耗。存储层面,用 Parquet、ORC 等列式存储格式替代文本格式,可减少不必要的字段读取,提升磁盘读取效率;序列化层面,推荐使用 Kyro 序列化替代默认的 Java 序列化,序列化效率可提升 30%-50%,同时大幅减少内存占用和网络传输的数据量。
- 避免数据倾斜:从计算层面进一步规避倾斜问题。可通过数据预处理过滤无效倾斜 key、大表与大表 Join 时采用分桶 Join 减少 Shuffle 压力、根据数据实时分布动态调整分区数等方式,确保计算任务均匀分配到各个 Executor,避免单个 Task 长期阻塞拖慢整体任务进度。
- 利用惰性求值特性:充分发挥 Spark 惰性求值的优势,合理规划 Transformation 和 Action 操作的执行顺序。将过滤、列裁剪等轻量级操作前置,提前筛选无效数据、剔除不必要字段,减少后续计算的数据量;同时避免频繁调用 Action 操作,尽量将多个关联的 Action 操作合并执行,减少 DAG 的重复构建和计算,提升整体执行效率。