跳到主要内容Spring Boot 数据仓库与 ETL 工具集成实战 | 极客日志Javajava
Spring Boot 数据仓库与 ETL 工具集成实战
Spring Boot 数据仓库与 ETL 工具集成涉及 Apache Hive 存储管理与 Apache Spark 数据处理。通过配置依赖与连接参数,可实现结构化数据的抽取、转换与加载。文中提供完整代码示例,涵盖 JdbcTemplate 操作及 SparkSession 调度,助力构建企业级数据链路。
宁静1 浏览 Spring Boot 数据仓库与 ETL 工具集成

在构建企业级应用时,将 Spring Boot 与数据仓库及 ETL(抽取、转换、加载)工具结合是常见需求。这不仅能实现数据的集中管理,还能通过自动化流程提升数据处理效率。本文将深入探讨如何集成 Apache Hive 和 Apache Spark,并提供完整的代码示例。
核心概念简述
数据仓库主要用于存储和管理大量结构化数据,支持复杂的企业级分析决策。常见的选择包括基于 Hadoop 的 Apache Hive、HBase,以及云原生的 Amazon Redshift、Google BigQuery 等。
ETL 工具则负责将数据从源系统导入到目标仓库。Apache Spark 适合大规模批处理,Flink 擅长流处理,而 Airflow 常用于任务调度。Spring Boot 作为后端框架,可以很好地封装这些逻辑,提供统一的 API 接口。
集成 Apache Hive
集成 Hive 的核心在于配置 JDBC 驱动并建立连接。Spring Boot 的 JdbcTemplate 足以应对大多数基础 CRUD 场景。
1. 依赖与配置
首先在 pom.xml 中引入必要的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>3.1.2</version>
</>
org.apache.hadoop
hadoop-common
3.3.1
org.springframework.boot
spring-boot-starter-test
test
dependency
<dependency>
<groupId>
</groupId>
<artifactId>
</artifactId>
<version>
</version>
</dependency>
<dependency>
<groupId>
</groupId>
<artifactId>
</artifactId>
<scope>
</scope>
</dependency>
</dependencies>
接着在 application.properties 中配置数据源:
server.port=8080
# Hive 连接信息
spring.datasource.url=jdbc:hive2://localhost:10000/default
spring.datasource.driver-class-name=org.apache.hive.jdbc.HiveDriver
spring.datasource.username=hive
spring.datasource.password=
2. 实体类定义
我们需要一个对应的 Java 对象来映射表结构。注意这里使用了标准的 Getter/Setter 模式。
public class Product {
private Long id;
private String productId;
private String productName;
private double price;
private int sales;
public Product() {}
public Product(Long id, String productId, String productName, double price, int sales) {
this.id = id;
this.productId = productId;
this.productName = productName;
this.price = price;
this.sales = sales;
}
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public String getProductId() { return productId; }
public void setProductId(String productId) { this.productId = productId; }
public String getProductName() { return productName; }
public void setProductName(String productName) { this.productName = productName; }
public double getPrice() { return price; }
public void setPrice(double price) { this.price = price; }
public int getSales() { return sales; }
public void setSales(int sales) { this.sales = sales; }
@Override
public String toString() {
return "Product{" +
"id=" + id +
", productId='" + productId + '\'' +
", productName='" + productName + '\'' +
", price=" + price +
", sales=" + sales +
'}';
}
}
3. 数据访问层 (Repository)
使用 JdbcTemplate 直接操作数据库是最轻量级的方案。虽然 Hive 通常用于分析,但在某些场景下也支持事务性写入。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
@Repository
public class ProductRepository {
@Autowired
private JdbcTemplate jdbcTemplate;
public List<Product> getAllProducts() {
String sql = "SELECT * FROM product";
return jdbcTemplate.query(sql, new RowMapper<Product>() {
@Override
public Product mapRow(ResultSet rs, int rowNum) throws SQLException {
Product product = new Product();
product.setId(rs.getLong("id"));
product.setProductId(rs.getString("product_id"));
product.setProductName(rs.getString("product_name"));
product.setPrice(rs.getDouble("price"));
product.setSales(rs.getInt("sales"));
return product;
}
});
}
public Product getProductById(Long id) {
String sql = "SELECT * FROM product WHERE id = ?";
return jdbcTemplate.queryForObject(sql, new Object[]{id}, new RowMapper<Product>() {
@Override
public Product mapRow(ResultSet rs, int rowNum) throws SQLException {
Product product = new Product();
product.setId(rs.getLong("id"));
product.setProductId(rs.getString("product_id"));
product.setProductName(rs.getString("product_name"));
product.setPrice(rs.getDouble("price"));
product.setSales(rs.getInt("sales"));
return product;
}
});
}
public void addProduct(Product product) {
String sql = "INSERT INTO product (product_id, product_name, price, sales) VALUES (?, ?, ?, ?)";
jdbcTemplate.update(sql, product.getProductId(), product.getProductName(), product.getPrice(), product.getSales());
}
public void updateProduct(Product product) {
String sql = "UPDATE product SET product_id = ?, product_name = ?, price = ?, sales = ? WHERE id = ?";
jdbcTemplate.update(sql, product.getProductId(), product.getProductName(), product.getPrice(), product.getSales(), product.getId());
}
public void deleteProduct(Long id) {
String sql = "DELETE FROM product WHERE id = ?";
jdbcTemplate.update(sql, id);
}
}
4. 业务层与控制器
Service 层负责编排逻辑,Controller 暴露 RESTful 接口。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class ProductService {
@Autowired
private ProductRepository productRepository;
public List<Product> getAllProducts() {
return productRepository.getAllProducts();
}
public Product getProductById(Long id) {
return productRepository.getProductById(id);
}
public void addProduct(Product product) {
productRepository.addProduct(product);
}
public void updateProduct(Product product) {
productRepository.updateProduct(product);
}
public void deleteProduct(Long id) {
productRepository.deleteProduct(id);
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
@RequestMapping("/api/products")
public class ProductController {
@Autowired
private ProductService productService;
@GetMapping("/")
public List<Product> getAllProducts() {
return productService.getAllProducts();
}
@GetMapping("/{id}")
public Product getProductById(@PathVariable Long id) {
return productService.getProductById(id);
}
@PostMapping("/")
public void addProduct(@RequestBody Product product) {
productService.addProduct(product);
}
@PutMapping("/{id}")
public void updateProduct(@PathVariable Long id, @RequestBody Product product) {
product.setId(id);
productService.updateProduct(product);
}
@DeleteMapping("/{id}")
public void deleteProduct(@PathVariable Long id) {
productService.deleteProduct(id);
}
}
集成 Apache Spark
对于更复杂的数据清洗和转换任务,Spring Boot 可以作为控制入口,调用 Spark 进行计算。
1. 依赖与配置
Spark 需要特定的序列化版本依赖,这里以 Scala 2.12 为例。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
server.port=8080
spark.master=local[*]
spark.app.name=ETLExample
2. ETL 任务执行
创建一个组件类来封装 SparkSession 的生命周期和数据流。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Properties;
@Component
public class ETLJob {
@Value("${spark.master}")
private String master;
@Value("${spark.app.name}")
private String appName;
public void runETL() {
SparkSession sparkSession = SparkSession.builder()
.master(master)
.appName(appName)
.getOrCreate();
Dataset<Row> sourceData = sparkSession.read()
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("src/main/resources/source-data.csv");
Dataset<Row> transformedData = sourceData.select(
sourceData.col("id"),
sourceData.col("product_id"),
sourceData.col("product_name"),
sourceData.col("price"),
sourceData.col("sales")
).filter(sourceData.col("sales").gt(100));
Properties connectionProperties = new Properties();
connectionProperties.put("user", "hive");
connectionProperties.put("password", "");
transformedData.write().mode("overwrite")
.jdbc("jdbc:hive2://localhost:10000/default", "transformed_product", connectionProperties);
sparkSession.stop();
}
}
3. 任务调度与触发
我们可以利用 Spring 的定时任务功能自动执行 ETL,或者通过 HTTP 接口手动触发。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/etl")
public class ETLController {
@Autowired
private ETLScheduler etlScheduler;
@PostMapping("/run")
public String runETL() {
etlScheduler.runETL();
return "ETL 任务已启动";
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class ETLScheduler {
@Autowired
private ETLJob etlJob;
@Scheduled(cron = "0 0 0 * * ?")
public void runETL() {
etlJob.runETL();
}
public void runETLNow() {
etlJob.runETL();
}
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class ETLApplication {
public static void main(String[] args) {
SpringApplication.run(ETLApplication.class, args);
}
}
实际应用场景
- 产品库存同步:将电商订单数据定期清洗后写入 Hive,供 BI 报表使用。
- 用户行为分析:收集前端日志,经过 Spark 过滤异常值后存入数仓。
- 销售数据聚合:按日/周维度汇总销售额,更新到数据仓库供管理层查看。
通过上述步骤,我们不仅实现了数据的持久化,还构建了可扩展的 ETL 流水线。在实际开发中,根据数据量大小选择合适的工具组合是关键——小数据量用 JDBC 直连即可,大数据量则务必引入 Spark 等分布式引擎。
相关免费在线工具
- Keycode 信息
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
- Escape 与 Native 编解码
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
- JavaScript / HTML 格式化
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
- JavaScript 压缩与混淆
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online