跳到主要内容Spring Boot 数据仓库与 ETL 工具集成实战 | 极客日志Javajava
Spring Boot 数据仓库与 ETL 工具集成实战
Spring Boot 集成数据仓库与 ETL 工具涉及 Hive 连接配置、Spark 任务调度及实际业务场景落地。通过 JdbcTemplate 实现数据访问,结合 SparkSession 处理大数据转换,利用 Spring Scheduling 定时执行 ETL 流程。文章涵盖依赖引入、配置文件编写、Repository 层设计及 Controller 接口实现,提供从环境搭建到测试验证的完整实践路径,帮助开发者构建高效的数据处理管道。
Stephaine Walsh0 浏览 Spring Boot 数据仓库与 ETL 工具集成实战
在构建企业级应用时,将 Spring Boot 与数据仓库及 ETL(抽取、转换、加载)工具结合是处理海量数据的关键。本文将探讨如何整合 Apache Hive 进行数据存储,以及利用 Apache Spark 执行大数据任务,并提供完整的代码实践。
核心概念简述
数据仓库通常用于存储和管理大量结构化数据,支持复杂的分析决策。常见的选择包括基于 Hadoop 的 Apache Hive、列式数据库 HBase,以及云端的 Amazon Redshift 和 Google BigQuery。
ETL 工具负责数据的抽取、转换和加载流程。Apache Spark 作为分布式计算框架,非常适合处理大规模 ETL 操作;Apache Flink 则擅长流处理;而 Apache Airflow 常用于调度这些任务。
集成 Apache Hive
要在 Spring Boot 中连接 Hive,首先需要引入必要的依赖。除了基础的 Web 启动器,还需要 Hive JDBC 驱动和 Hadoop 通用库。
1. 配置依赖与连接
在 pom.xml 中添加以下依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
hadoop-common
3.3.1
org.springframework.boot
spring-boot-starter-test
test
<artifactId>
</artifactId>
<version>
</version>
</dependency>
<dependency>
<groupId>
</groupId>
<artifactId>
</artifactId>
<scope>
</scope>
</dependency>
</dependencies>
接着在 application.properties 中配置数据源:
server.port=8080
spring.datasource.url=jdbc:hive2://localhost:10000/default
spring.datasource.driver-class-name=org.apache.hive.jdbc.HiveDriver
spring.datasource.username=hive
spring.datasource.password=
2. 数据访问层实现
定义实体类 Product,包含商品的基本信息。注意字段映射需与数据库表结构一致。
public class Product {
private Long id;
private String productId;
private String productName;
private double price;
private int sales;
public Product() {}
public Product(Long id, String productId, String productName, double price, int sales) {
this.id = id;
this.productId = productId;
this.productName = productName;
this.price = price;
this.sales = sales;
}
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public String getProductId() { return productId; }
public void setProductId(String productId) { this.productId = productId; }
public String getProductName() { return productName; }
public void setProductName(String productName) { this.productName = productName; }
public double getPrice() { return price; }
public void setPrice(double price) { this.price = price; }
public int getSales() { return sales; }
public void setSales(int sales) { this.sales = sales; }
@Override
public String toString() {
return "Product{" +
"id=" + id +
", productId='" + productId + '\'' +
", productName='" + productName + '\'' +
", price=" + price +
", sales=" + sales +
'}';
}
}
使用 JdbcTemplate 编写 Repository 接口,封装 SQL 操作。这里需要注意参数绑定和结果集映射。
@Repository
public class ProductRepository {
@Autowired
private JdbcTemplate jdbcTemplate;
public List<Product> getAllProducts() {
String sql = "SELECT * FROM product";
return jdbcTemplate.query(sql, (rs, rowNum) -> {
Product product = new Product();
product.setId(rs.getLong("id"));
product.setProductId(rs.getString("product_id"));
product.setProductName(rs.getString("product_name"));
product.setPrice(rs.getDouble("price"));
product.setSales(rs.getInt("sales"));
return product;
});
}
public Product getProductById(Long id) {
String sql = "SELECT * FROM product WHERE id = ?";
return jdbcTemplate.queryForObject(sql, new Object[]{id}, (rs, rowNum) -> {
Product product = new Product();
product.setId(rs.getLong("id"));
product.setProductId(rs.getString("product_id"));
product.setProductName(rs.getString("product_name"));
product.setPrice(rs.getDouble("price"));
product.setSales(rs.getInt("sales"));
return product;
});
}
public void addProduct(Product product) {
String sql = "INSERT INTO product (product_id, product_name, price, sales) VALUES (?, ?, ?, ?)";
jdbcTemplate.update(sql, product.getProductId(), product.getProductName(), product.getPrice(), product.getSales());
}
public void updateProduct(Product product) {
String sql = "UPDATE product SET product_id = ?, product_name = ?, price = ?, sales = ? WHERE id = ?";
jdbcTemplate.update(sql, product.getProductId(), product.getProductName(), product.getPrice(), product.getSales(), product.getId());
}
public void deleteProduct(Long id) {
String sql = "DELETE FROM product WHERE id = ?";
jdbcTemplate.update(sql, id);
}
}
3. 业务层与控制器
Service 层主要负责调用 Repository,Controller 层暴露 REST API。这里采用标准的分层架构,便于维护。
@Service
public class ProductService {
@Autowired
private ProductRepository productRepository;
public List<Product> getAllProducts() {
return productRepository.getAllProducts();
}
public Product getProductById(Long id) {
return productRepository.getProductById(id);
}
public void addProduct(Product product) {
productRepository.addProduct(product);
}
public void updateProduct(Product product) {
productRepository.updateProduct(product);
}
public void deleteProduct(Long id) {
productRepository.deleteProduct(id);
}
}
@RestController
@RequestMapping("/api/products")
public class ProductController {
@Autowired
private ProductService productService;
@GetMapping("/")
public List<Product> getAllProducts() {
return productService.getAllProducts();
}
@GetMapping("/{id}")
public Product getProductById(@PathVariable Long id) {
return productService.getProductById(id);
}
@PostMapping("/")
public void addProduct(@RequestBody Product product) {
productService.addProduct(product);
}
@PutMapping("/{id}")
public void updateProduct(@PathVariable Long id, @RequestBody Product product) {
product.setId(id);
productService.updateProduct(product);
}
@DeleteMapping("/{id}")
public void deleteProduct(@PathVariable Long id) {
productService.deleteProduct(id);
}
}
启动类只需添加 @SpringBootApplication 注解即可。
集成 Apache Spark
对于更复杂的数据处理场景,Spring Boot 可以嵌入 Spark Session 来执行 ETL 任务。这种方式适合需要在 Java 应用中直接调用大数据能力的场景。
1. 依赖与配置
Spark 需要指定 Scala 版本,这里以 2.12 为例。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置文件设置 Spark Master 地址和应用名称:
server.port=8080
spark.master=local[*]
spark.app.name=ETLExample
2. ETL 任务实现
创建一个组件类来管理 SparkSession 的生命周期和执行逻辑。读取 CSV 源数据,过滤销售大于 100 的记录,并写入 Hive 表。
@Component
public class ETLJob {
@Value("${spark.master}")
private String master;
@Value("${spark.app.name}")
private String appName;
public void runETL() {
SparkSession sparkSession = SparkSession.builder()
.master(master)
.appName(appName)
.getOrCreate();
Dataset<Row> sourceData = sparkSession.read()
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("src/main/resources/source-data.csv");
Dataset<Row> transformedData = sourceData.select(
sourceData.col("id"),
sourceData.col("product_id"),
sourceData.col("product_name"),
sourceData.col("price"),
sourceData.col("sales")
).filter(sourceData.col("sales").gt(100));
Properties connectionProperties = new Properties();
connectionProperties.put("user", "hive");
connectionProperties.put("password", "");
transformedData.write().mode("overwrite")
.jdbc("jdbc:hive2://localhost:10000/default", "transformed_product", connectionProperties);
sparkSession.stop();
}
}
3. 调度与触发
为了自动化运行,可以使用 Spring 的定时任务功能。同时提供一个手动触发的接口供调试使用。
@Component
public class ETLScheduler {
@Autowired
private ETLJob etlJob;
@Scheduled(cron = "0 0 0 * * ?")
public void runETL() {
etlJob.runETL();
}
public void runETLNow() {
etlJob.runETL();
}
}
@RestController
@RequestMapping("/api/etl")
public class ETLController {
@Autowired
private ETLScheduler etlScheduler;
@PostMapping("/run")
public String runETL() {
etlScheduler.runETL();
return "ETL 任务已启动";
}
}
@SpringBootApplication
@EnableScheduling
public class ETLApplication {
public static void main(String[] args) {
SpringApplication.run(ETLApplication.class, args);
}
}
实际应用场景
- 产品信息同步:将交易系统的订单数据清洗后导入数仓。
- 用户画像分析:整合多源用户行为数据,生成统一视图。
- 销售报表生成:定期汇总销售数据,更新 BI 系统所需表。
通过上述步骤,你可以构建一个既具备 Web 服务能力,又能处理大数据任务的 Spring Boot 应用。关键在于合理管理资源(如 SparkSession),并确保数据一致性。在实际生产环境中,建议结合 Airflow 等外部调度器来管理更复杂的依赖关系。
相关免费在线工具
- Keycode 信息
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
- Escape 与 Native 编解码
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
- JavaScript / HTML 格式化
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
- JavaScript 压缩与混淆
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online