跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
Javajava

Spring Boot 数据仓库与 ETL 工具集成实战

Spring Boot 数据仓库与 ETL 工具集成涉及 Apache Hive 存储管理与 Apache Spark 数据处理。通过配置依赖与连接参数,可实现结构化数据的抽取、转换与加载。文中提供完整代码示例,涵盖 JdbcTemplate 操作及 SparkSession 调度,助力构建企业级数据链路。

宁静发布于 2026/3/22更新于 2026/6/1219 浏览
Spring Boot 数据仓库与 ETL 工具集成实战

Spring Boot 数据仓库与 ETL 工具集成

架构示意图

在构建企业级应用时,将 Spring Boot 与数据仓库及 ETL(抽取、转换、加载)工具结合是常见需求。这不仅能实现数据的集中管理,还能通过自动化流程提升数据处理效率。本文将深入探讨如何集成 Apache Hive 和 Apache Spark,并提供完整的代码示例。

核心概念简述

数据仓库主要用于存储和管理大量结构化数据,支持复杂的企业级分析决策。常见的选择包括基于 Hadoop 的 Apache Hive、HBase,以及云原生的 Amazon Redshift、Google BigQuery 等。

ETL 工具则负责将数据从源系统导入到目标仓库。Apache Spark 适合大规模批处理,Flink 擅长流处理,而 Airflow 常用于任务调度。Spring Boot 作为后端框架,可以很好地封装这些逻辑,提供统一的 API 接口。

集成 Apache Hive

集成 Hive 的核心在于配置 JDBC 驱动并建立连接。Spring Boot 的 JdbcTemplate 足以应对大多数基础 CRUD 场景。

1. 依赖与配置

首先在 pom.xml 中引入必要的依赖:

<dependencies>
    <!-- Web 启动器 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Hive JDBC 驱动 -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>3.1.2</version>
    </>
    
    
        org.apache.hadoop
        hadoop-common
        3.3.1
    
    
    
        org.springframework.boot
        spring-boot-starter-test
        test
    

dependency
<!-- Hadoop 通用库 -->
<dependency>
<groupId>
</groupId>
<artifactId>
</artifactId>
<version>
</version>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupId>
</groupId>
<artifactId>
</artifactId>
<scope>
</scope>
</dependency>
</dependencies>

接着在 application.properties 中配置数据源:

server.port=8080
# Hive 连接信息
spring.datasource.url=jdbc:hive2://localhost:10000/default
spring.datasource.driver-class-name=org.apache.hive.jdbc.HiveDriver
spring.datasource.username=hive
spring.datasource.password=

2. 实体类定义

我们需要一个对应的 Java 对象来映射表结构。注意这里使用了标准的 Getter/Setter 模式。

public class Product {
    private Long id;
    private String productId;
    private String productName;
    private double price;
    private int sales;

    public Product() {}

    public Product(Long id, String productId, String productName, double price, int sales) {
        this.id = id;
        this.productId = productId;
        this.productName = productName;
        this.price = price;
        this.sales = sales;
    }

    // Getters and Setters
    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getProductId() { return productId; }
    public void setProductId(String productId) { this.productId = productId; }
    public String getProductName() { return productName; }
    public void setProductName(String productName) { this.productName = productName; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
    public int getSales() { return sales; }
    public void setSales(int sales) { this.sales = sales; }

    @Override
    public String toString() {
        return "Product{" +
                "id=" + id +
                ", productId='" + productId + '\'' +
                ", productName='" + productName + '\'' +
                ", price=" + price +
                ", sales=" + sales +
                '}';
    }
}

3. 数据访问层 (Repository)

使用 JdbcTemplate 直接操作数据库是最轻量级的方案。虽然 Hive 通常用于分析,但在某些场景下也支持事务性写入。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

@Repository
public class ProductRepository {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public List<Product> getAllProducts() {
        String sql = "SELECT * FROM product";
        return jdbcTemplate.query(sql, new RowMapper<Product>() {
            @Override
            public Product mapRow(ResultSet rs, int rowNum) throws SQLException {
                Product product = new Product();
                product.setId(rs.getLong("id"));
                product.setProductId(rs.getString("product_id"));
                product.setProductName(rs.getString("product_name"));
                product.setPrice(rs.getDouble("price"));
                product.setSales(rs.getInt("sales"));
                return product;
            }
        });
    }

    public Product getProductById(Long id) {
        String sql = "SELECT * FROM product WHERE id = ?";
        return jdbcTemplate.queryForObject(sql, new Object[]{id}, new RowMapper<Product>() {
            @Override
            public Product mapRow(ResultSet rs, int rowNum) throws SQLException {
                Product product = new Product();
                product.setId(rs.getLong("id"));
                product.setProductId(rs.getString("product_id"));
                product.setProductName(rs.getString("product_name"));
                product.setPrice(rs.getDouble("price"));
                product.setSales(rs.getInt("sales"));
                return product;
            }
        });
    }

    public void addProduct(Product product) {
        String sql = "INSERT INTO product (product_id, product_name, price, sales) VALUES (?, ?, ?, ?)";
        jdbcTemplate.update(sql, product.getProductId(), product.getProductName(), product.getPrice(), product.getSales());
    }

    public void updateProduct(Product product) {
        String sql = "UPDATE product SET product_id = ?, product_name = ?, price = ?, sales = ? WHERE id = ?";
        jdbcTemplate.update(sql, product.getProductId(), product.getProductName(), product.getPrice(), product.getSales(), product.getId());
    }

    public void deleteProduct(Long id) {
        String sql = "DELETE FROM product WHERE id = ?";
        jdbcTemplate.update(sql, id);
    }
}

4. 业务层与控制器

Service 层负责编排逻辑,Controller 暴露 RESTful 接口。

// ProductService.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;

@Service
public class ProductService {
    @Autowired
    private ProductRepository productRepository;

    public List<Product> getAllProducts() {
        return productRepository.getAllProducts();
    }

    public Product getProductById(Long id) {
        return productRepository.getProductById(id);
    }

    public void addProduct(Product product) {
        productRepository.addProduct(product);
    }

    public void updateProduct(Product product) {
        productRepository.updateProduct(product);
    }

    public void deleteProduct(Long id) {
        productRepository.deleteProduct(id);
    }
}
// ProductController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;

@RestController
@RequestMapping("/api/products")
public class ProductController {
    @Autowired
    private ProductService productService;

    @GetMapping("/")
    public List<Product> getAllProducts() {
        return productService.getAllProducts();
    }

    @GetMapping("/{id}")
    public Product getProductById(@PathVariable Long id) {
        return productService.getProductById(id);
    }

    @PostMapping("/")
    public void addProduct(@RequestBody Product product) {
        productService.addProduct(product);
    }

    @PutMapping("/{id}")
    public void updateProduct(@PathVariable Long id, @RequestBody Product product) {
        product.setId(id);
        productService.updateProduct(product);
    }

    @DeleteMapping("/{id}")
    public void deleteProduct(@PathVariable Long id) {
        productService.deleteProduct(id);
    }
}

集成 Apache Spark

对于更复杂的数据清洗和转换任务,Spring Boot 可以作为控制入口,调用 Spark 进行计算。

1. 依赖与配置

Spark 需要特定的序列化版本依赖,这里以 Scala 2.12 为例。

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

配置文件设置 Spark 运行模式和应用名称:

server.port=8080
spark.master=local[*]
spark.app.name=ETLExample

2. ETL 任务执行

创建一个组件类来封装 SparkSession 的生命周期和数据流。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.Properties;

@Component
public class ETLJob {

    @Value("${spark.master}")
    private String master;

    @Value("${spark.app.name}")
    private String appName;

    public void runETL() {
        SparkSession sparkSession = SparkSession.builder()
                .master(master)
                .appName(appName)
                .getOrCreate();

        // 读取源数据 CSV
        Dataset<Row> sourceData = sparkSession.read()
                .format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .load("src/main/resources/source-data.csv");

        // 数据转换:筛选销量大于 100 的商品
        Dataset<Row> transformedData = sourceData.select(
                sourceData.col("id"),
                sourceData.col("product_id"),
                sourceData.col("product_name"),
                sourceData.col("price"),
                sourceData.col("sales")
        ).filter(sourceData.col("sales").gt(100));

        // 写入 Hive 数据仓库
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "hive");
        connectionProperties.put("password", "");

        transformedData.write().mode("overwrite")
                .jdbc("jdbc:hive2://localhost:10000/default", "transformed_product", connectionProperties);

        sparkSession.stop();
    }
}

3. 任务调度与触发

我们可以利用 Spring 的定时任务功能自动执行 ETL,或者通过 HTTP 接口手动触发。

// ETLController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/etl")
public class ETLController {

    @Autowired
    private ETLScheduler etlScheduler;

    @PostMapping("/run")
    public String runETL() {
        etlScheduler.runETL();
        return "ETL 任务已启动";
    }
}
// ETLScheduler.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class ETLScheduler {
    @Autowired
    private ETLJob etlJob;

    // 每天凌晨 0 点执行
    @Scheduled(cron = "0 0 0 * * ?")
    public void runETL() {
        etlJob.runETL();
    }

    public void runETLNow() {
        etlJob.runETL();
    }
}

主启动类需开启定时任务支持:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class ETLApplication {
    public static void main(String[] args) {
        SpringApplication.run(ETLApplication.class, args);
    }
}

实际应用场景

这种架构非常适合以下场景:

  • 产品库存同步:将电商订单数据定期清洗后写入 Hive,供 BI 报表使用。
  • 用户行为分析:收集前端日志,经过 Spark 过滤异常值后存入数仓。
  • 销售数据聚合:按日/周维度汇总销售额,更新到数据仓库供管理层查看。

通过上述步骤,我们不仅实现了数据的持久化,还构建了可扩展的 ETL 流水线。在实际开发中,根据数据量大小选择合适的工具组合是关键——小数据量用 JDBC 直连即可,大数据量则务必引入 Spark 等分布式引擎。

目录

  1. Spring Boot 数据仓库与 ETL 工具集成
  2. 核心概念简述
  3. 集成 Apache Hive
  4. 1. 依赖与配置
  5. Hive 连接信息
  6. 2. 实体类定义
  7. 3. 数据访问层 (Repository)
  8. 4. 业务层与控制器
  9. 集成 Apache Spark
  10. 1. 依赖与配置
  11. 2. ETL 任务执行
  12. 3. 任务调度与触发
  13. 实际应用场景
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • AI 辅助构建高可用电商系统核心架构实战
  • Spring Boot + jQuery 前后端分离图书管理系统:接口设计与调试
  • Linux 基本使用与 Java 程序部署指南
  • Spring AI 框架入门与核心功能详解
  • 飞算 JavaAI 智能开发助手:工程代码一键生成功能解析
  • SpringBoot 结合 Redis+Caffeine 多级缓存架构实践
  • llama.cpp 量化大模型部署与运行指南
  • Windows 11 配置 CUDA 版 llama.cpp 实现系统全局调用
  • 宇树机器人 G1 二次开发:导航仿真与地图转换教程
  • 2025 年 9 月 GESP C++ 三级真题解析
  • 第 n 个丑数:从暴力枚举到动态规划 + 多指针
  • 基于C++的DPU医疗领域编程初探
  • 动态规划解题思路与常见题型总结
  • 从Colab到生产:Llama Factory进阶迁移指南
  • ROS1 机器人 SLAM:Gmapping 算法详解与实战
  • CentOS 搭建私人漫画库:Teemii + cpolar 公网访问
  • CLIP 论文阅读笔记
  • LazyLLM 代码专家智能体实战与测评
  • Windows 配置 NFS 客户端
  • 基于 AI 的全栈开发新路径:自动生成 UI 设计稿与 H5 原型

相关免费在线工具

  • Keycode 信息

    查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online

  • Escape 与 Native 编解码

    JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online

  • JavaScript / HTML 格式化

    使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online

  • JavaScript 压缩与混淆

    Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online