跳到主要内容
极客日志极客日志
首页博客AI提示词GitHub精选代理工具
搜索
|注册
博客列表
Javajava

Spark 离线开发框架设计与实现

综述由AI生成Spark 离线开发框架针对数据回溯效率低、维护成本高的问题,设计了基础框架、可扩展工具及应用程序三层架构。通过配置分离与通用 UDF 封装简化开发流程,支持 SQL 与 Java 双模式应用。核心亮点在于回溯功能,通过合并环境准备与释放时间、支持断点续回及并行策略,将大规模数据回溯耗时从数天缩短至数小时,显著提升研发效能。

月光旅人发布于 2025/2/4更新于 2026/4/273 浏览
Spark 离线开发框架设计与实现

Spark 离线开发框架设计与实现

一、背景

随着 Spark 及其社区的持续演进,技术架构和性能优势日益凸显,目前已成为大数据处理的主流选择。Spark 支持 Scala、Java、SQL、Python 等多种语言开发。

Spark SQL 兼容 Hive,易于整合,上手快,适合简单数据处理。但在面对复杂逻辑或深度分析时,纯 SQL 方案往往显得力不从心,维护成本也较高,此时使用高级语言编写代码会更高效。

在日常数据仓库开发中,除了常规任务,还涉及大量数据回溯工作。对于创新型业务,口径变化频繁,数据回溯几个月甚至一年很常见。传统方式效率低,且需人力密切关注各任务状态。

针对上述痛点,我们设计了一套 Spark 离线开发框架。该框架不仅让开发变得简单高效,还能在无需额外开发的情况下,快速完成大规模数据回溯。

(注:此处展示框架解决的问题及解决方案对比)

二、框架设计

框架旨在封装重复性工作,降低开发门槛。整体架构分为基础框架、可扩展工具及应用程序三部分,开发者只需关注应用程序层即可快速实现代码。

(注:此处展示框架架构图)

2.1 基础框架

基础框架实现了代码与配置的分离机制。资源配置统一以 XML 文件形式保存,由框架解析处理。框架会根据配置自动创建 SparkSession、SparkContext 和 SparkConf,加载常用环境变量,并提供通用 UDF 函数(如 URL 参数解析等)。

所有应用继承自 Application 父类,开发者只需关注核心业务逻辑部分。

(注:此处展示 Application 处理流程图)

目前,离线框架支持的常用环境变量如下表所示。

(注:此处展示环境变量列表)

2.2 可扩展工具

可扩展工具包含大量服务于应用程序及基础框架的工具类,例如配置文件解析类、数据库读写工具类、日期工具类等。这些通用模块统称为可扩展工具,不再赘述。

2.3 应用程序

2.3.1 SQL 应用

对于 SQL 应用,只需编写 SQL 代码及资源配置。应用类为唯一类(已实现),供所有 SQL 应用复用,开发者无需关心具体实现。

配置示例如下,class 为固定类名,开发者主要关注 path 中的 SQL 路径及 conf 中的资源大小。

<?xml version="1.0" encoding="UTF-8"?>
<project name="test">
    <class>com.way.app.instance.SqlExecutor</class>
    <path>sql 文件路径</path>
    <!-- sparksession conf -->
    <conf>
        <spark.executor.memory>1G</spark.executor.memory>
        <>2
        1G
        20
    

spark.executor.cores
</spark.executor.cores>
<spark.driver.memory>
</spark.driver.memory>
<spark.executor.instances>
</spark.executor.instances>
</conf>
</project>
2.3.2 Java 应用

当 SQL 无法满足复杂数据处理需求时,支持 Java 程序编写。开发者需创建新类继承 Application 父类并实现 run 方法。

在 run 方法中,你只需关注数据处理逻辑。通用的 SparkSession 创建及关闭、输入输出逻辑均由框架封装。框架支持 HDFS 文件输入、SQL 输入等多种类型,调用相关处理函数即可。

以下是一个简单的 Java 数据处理应用示例。从配置可以看出,仍需配置资源大小,但相比 SQL 应用,需要定制化编写对应的 Java 类,并指定输入(input)和输出(output)参数。此例中输入为 SQL 代码,输出为 HDFS 文件。

<?xml version="1.0" encoding="UTF-8"?>
<project name="ecommerce_dwd_hanwuji_click_incr_day_domain">
    <class>com.way.app.instance.ecommerce.Test</class>
    <input>
        <type>table</type>
        <sql>select clk_url, clk_num from test_table where event_day='{DATE}' and click_pv > 0 and is_ubs_spam=0</sql>
    </input>
    <output>
        <type>afs_kp</type>
        <path>test/event_day={DATE}</path>
    </output>
    <conf>
        <spark.executor.memory>2G</spark.executor.memory>
        <spark.executor.cores>2</spark.executor.cores>
        <spark.driver.memory>2G</spark.driver.memory>
        <spark.executor.instances>10</spark.executor.instances>
    </conf>
</project>

对应 Java 实现类如下,开发者只需三步走:获取输入数据、逻辑处理、结果输出。

package com.way.app.instance.ecommerce;

import com.way.app.Application;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.api.java.function.FilterFunction;

import java.util.Map;

public class Test extends Application {
    @Override
    public void run() {
        // 输入
        Map<String, String> input = (Map<String, String>) property.get("input");
        Dataset<Row> ds = sparkSession.sql(getInput(input)).toDF("url", "num");

        // 逻辑处理(筛选特定站点日志)
        JavaRDD<String> outRdd = ds.filter((FilterFunction<Row>) row -> {
            String url = row.getAs("url").toString();
            return url.contains(".jd.com") || url.contains(".suning.com") || 
                   url.contains("pin.suning.com") || url.contains(".taobao.com") || 
                   url.contains("detail.tmall.hk") || url.contains(".amazon.cn") || 
                   url.contains(".kongfz.com") || url.contains(".gome.com.cn") || 
                   url.contains(".kaola.com") || url.contains(".dangdang.com") || 
                   url.contains("aisite.wejianzhan.com") || url.contains("w.weipaitang.com");
        }).toJavaRDD().map(row -> row.mkString("\001"));

        // 输出
        Map<String, String> output = (Map<String, String>) property.get("output");
        outRdd.saveAsTextFile(getOutPut(output));
    }
}
2.3.3 数据回溯应用

数据回溯应用是为了解决快速回溯、释放人力而研发的。开发者无需重构任务代码,与 SQL 应用相同,回溯应用类为唯一类,支持多种回溯方案。

方案设计

日常回溯中发现,单次任务存在严重的时间浪费。无论何种提交方式,都需经历执行环境申请及准备过程:Client 向 RS 申请启动 ApplicationMaster,AM 向 RS 申请 Executor 资源,注册 Task 等。这个过程统称为'执行环境准备',通常耗时 5-20 分钟,失败率约 10%。

整个流程分为三个阶段:执行环境准备、开始执行代码、释放资源。

离线开发框架回溯应用从节省时间和人力两方面考虑,设计方案如下:

(注:此处展示回溯时间优化方案图)

时间节省: 将所有回溯子任务的第一、第三步(环境准备及释放)压缩为一次。若回溯 30 个子任务,可节省 5-20 分钟 × 29。子任务越多,提效越明显。

人力节省: 开发者无需额外开发或添加回溯配置。框架启动的任务数量远小于传统方案,只需关注一个任务的执行状态,而非维护 N 个任务。

实测数据显示,回溯一年的串行任务,不使用框架在最理想情况下需 2.5 天;使用框架在最坏情况下仅需 6 小时,提效 90% 以上,且基本无需人力关注。

功能介绍

断点续回 Spark 计算虽快,但节点宕机、网络波动导致任务失败的情况难免。回溯任务动辄数月,失败后从断点处继续回溯至关重要。框架记录了已成功部分,重启后自动续回。

回溯顺序 根据业务需求确定顺序。依赖历史数据的增量数据通常从历史到现在;无依赖需求时,从现在到历史可更快满足指标了解。框架支持灵活配置。

并行回溯 回溯任务优先级通常低于例行任务,资源有限时需控制并发。框架默认为串行回溯,但也支持一定程度的并行度,开发者可根据资源情况调整。

创建一个回溯任务

回溯应用使用非常方便,无需新开发代码,配置回溯方案即可。

  • class:回溯应用唯一类,必填。
  • type:应用类型,默认 sql,Java 应用填类名。
  • path:回溯代码路径,必填。
  • limitdate:截止日期,必填。
  • startdate:开始日期,必填(断点续回或并行时可能无效)。
  • order:回溯顺序,默认倒序(-1),正序为 1。
  • distance:步长,用于并行回溯。
  • file:日志文件,记录成功日期,失败重启时以此为准。
  • conf:资源占用配置。
<?xml version="1.0" encoding="UTF-8"?>
<project name="ecommerce_ads_others_order_retain_incr_day">
    <class>com.way.app.instance.ecommerce.Huisu</class>
    <type>sql</type>
    <path>/sql/ecommerce/ecommerce_ads_others_order_retain_incr_day.sql</path>
    <limitdate>20220404</limitdate>
    <startdate>20210101</startdate>
    <order>1</order>
    <distance>-1</distance>
    <file>/user/ecommerce_ads_others_order_retain_incr_day/process</file>
    <conf>
        <spark.executor.memory>1G</spark.executor.memory>
        <spark.executor.cores>2</spark.executor.cores>
        <spark.executor.instances>30</spark.executor.instances>
        <spark.yarn.maxAppAttempts>1</spark.yarn.maxAppAttempts>
    </conf>
</project>

三、使用方式

3.1 使用介绍

使用离线框架开发时,开发者只需重点关注数据逻辑处理。打包提交后,每个应用主类相同(即 Application 父类),唯一变化的是接收的配置文件的相对路径。

3.2 使用对比

使用离线框架前后对比效果显著,开发效率大幅提升。

(注:此处展示使用前后对比图)

四、展望

目前框架仅支持 SQL 和 Java 语言。未来计划升级支持更多 Spark 原生语言,让开发者更方便、快速地进行大数据开发。

目录

  1. Spark 离线开发框架设计与实现
  2. 一、背景
  3. 二、框架设计
  4. 2.1 基础框架
  5. 2.2 可扩展工具
  6. 2.3 应用程序
  7. 2.3.1 SQL 应用
  8. 2.3.2 Java 应用
  9. 2.3.3 数据回溯应用
  10. 方案设计
  11. 功能介绍
  12. 创建一个回溯任务
  13. 三、使用方式
  14. 3.1 使用介绍
  15. 3.2 使用对比
  16. 四、展望
  • 💰 8折买阿里云服务器限时8折了解详情
  • 💰 8折买阿里云服务器限时8折购买
  • 🦞 5分钟部署阿里云小龙虾了解详情
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • 被工具定义的编程时代:VS Code 与 JetBrains 效率提升指南
  • 6 款主流免费 AI 写作工具实测:如何规避 AI 检测并提升留存率
  • 基于 Claude Code 的 AI 内容创作自动化工作流实践
  • Jetson Orin NX + Fast-LIO2 自主无人机完整部署方案
  • Whisper-large-v3 多任务并行服务实战:转录、翻译与摘要一体化
  • 如何构建高质量的大模型 RAG 系统
  • PAT 乙级 1043 输出 PATest 题目解析(Python 版)
  • Neeshck-Z-lMage_LYX_v2 LoRA 强度测试:0.0–1.5 生成质量衰减曲线
  • 基于 Python 的淘宝商品数据爬取与可视化系统
  • 数据链路层详解:LLC、MAC、局域网与广域网
  • 大模型技术从基础入门到实战应用指南
  • 隔板法指南:从分球入盒到不定方程,组合计数解题模板
  • Python Flask HTTP 微服务开发与数据库集成
  • Cursor 彻底卸载与设备标识重置指南
  • 网络安全工程师职业定义、核心技能与认证体系详解
  • Harness Engineering:AI 时代的工程最佳实践
  • Neo4j Desktop 2.0 安装及自定义路径配置指南
  • encrypt-labs 前端加密靶场环境搭建与关卡解析
  • 基于扣子平台搭建多功能 AI 女友机器人教程
  • Python 缓存过期机制指南:TTL 设置与 LRU 淘汰策略

相关免费在线工具

  • Keycode 信息

    查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online

  • Escape 与 Native 编解码

    JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online

  • JavaScript / HTML 格式化

    使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online

  • JavaScript 压缩与混淆

    Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online