Spring Boot 数据仓库与ETL工具集成

Spring Boot 数据仓库与ETL工具集成

Spring Boot 数据仓库与ETL工具集成

在这里插入图片描述
26.1 学习目标与重点提示

学习目标:掌握Spring Boot数据仓库与ETL工具集成的核心概念与使用方法,包括数据仓库的定义与特点、ETL工具的定义与特点、Spring Boot与数据仓库的集成、Spring Boot与ETL工具的集成、Spring Boot的实际应用场景,学会在实际开发中处理数据仓库与ETL工具集成问题。
重点:数据仓库的定义与特点ETL工具的定义与特点Spring Boot与数据仓库的集成Spring Boot与ETL工具的集成Spring Boot的实际应用场景

26.2 数据仓库与ETL工具概述

数据仓库与ETL工具是Java开发中的重要组件。

26.2.1 数据仓库的定义

定义:数据仓库是一种用于存储和管理大量结构化数据的数据库系统,用于支持企业级数据分析和决策。
作用

  • 提供统一的数据存储。
  • 支持复杂的数据分析。
  • 提高决策效率。

常见的数据仓库

  • Apache Hive:Apache Hive是一种基于Hadoop的数据仓库工具。
  • Apache HBase:Apache HBase是一种基于Hadoop的列式数据库。
  • Amazon Redshift:Amazon Redshift是一种基于云计算的数据仓库。
  • Google BigQuery:Google BigQuery是一种基于云计算的数据仓库。

✅ 结论:数据仓库是一种用于存储和管理大量结构化数据的数据库系统,作用是提供统一的数据存储、支持复杂的数据分析、提高决策效率。

26.2.2 ETL工具的定义

定义:ETL工具是一种用于数据抽取(Extract)、转换(Transform)和加载(Load)的工具,用于将数据从源系统导入到数据仓库。
作用

  • 实现数据的抽取。
  • 实现数据的转换。
  • 实现数据的加载。

常见的ETL工具

  • Apache Spark:Apache Spark是一种开源的分布式计算框架,支持ETL操作。
  • Apache Flink:Apache Flink是一种开源的流处理框架,支持ETL操作。
  • Apache Airflow:Apache Airflow是一种开源的调度工具,用于调度ETL任务。
  • Talend:Talend是一种开源的ETL工具。

✅ 结论:ETL工具是一种用于数据抽取、转换和加载的工具,作用是实现数据的抽取、转换、加载。

26.3 Spring Boot与数据仓库的集成

Spring Boot与数据仓库的集成是Java开发中的重要内容。

26.3.1 集成Apache Hive的步骤

定义:集成Apache Hive的步骤是指使用Spring Boot与Apache Hive集成的方法。
步骤

  1. 创建Spring Boot项目。
  2. 添加所需的依赖。
  3. 配置Apache Hive。
  4. 创建数据访问层。
  5. 创建业务层。
  6. 创建控制器类。
  7. 测试应用。

示例
pom.xml文件中的依赖:

<dependencies><!-- Web依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Hive依赖 --><dependency><groupId>org.apache.hive</groupId><artifactId>hive-jdbc</artifactId><version>3.1.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.3.1</version></dependency><!-- 测试依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>

application.properties文件中的配置:

# 服务器端口 server.port=8080 # Hive连接信息 spring.datasource.url=jdbc:hive2://localhost:10000/default spring.datasource.driver-class-name=org.apache.hive.jdbc.HiveDriver spring.datasource.username=hive spring.datasource.password= 

实体类:

publicclassProduct{privateLong id;privateString productId;privateString productName;privatedouble price;privateint sales;publicProduct(){}publicProduct(Long id,String productId,String productName,double price,int sales){this.id = id;this.productId = productId;this.productName = productName;this.price = price;this.sales = sales;}// Getter和Setter方法publicLonggetId(){return id;}publicvoidsetId(Long id){this.id = id;}publicStringgetProductId(){return productId;}publicvoidsetProductId(String productId){this.productId = productId;}publicStringgetProductName(){return productName;}publicvoidsetProductName(String productName){this.productName = productName;}publicdoublegetPrice(){return price;}publicvoidsetPrice(double price){this.price = price;}publicintgetSales(){return sales;}publicvoidsetSales(int sales){this.sales = sales;}@OverridepublicStringtoString(){return"Product{"+"id="+ id +",+ productId +'\''+",+ productName +'\''+", price="+ price +", sales="+ sales +'}';}}

Repository接口:

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.jdbc.core.JdbcTemplate;importorg.springframework.jdbc.core.RowMapper;importorg.springframework.stereotype.Repository;importjava.sql.ResultSet;importjava.sql.SQLException;importjava.util.List;@RepositorypublicclassProductRepository{@AutowiredprivateJdbcTemplate jdbcTemplate;publicList<Product>getAllProducts(){String sql ="SELECT * FROM product";return jdbcTemplate.query(sql,newRowMapper<Product>(){@OverridepublicProductmapRow(ResultSet rs,int rowNum)throwsSQLException{Product product =newProduct(); product.setId(rs.getLong("id")); product.setProductId(rs.getString("product_id")); product.setProductName(rs.getString("product_name")); product.setPrice(rs.getDouble("price")); product.setSales(rs.getInt("sales"));return product;}});}publicProductgetProductById(Long id){String sql ="SELECT * FROM product WHERE id = ?";return jdbcTemplate.queryForObject(sql,newObject[]{id},newRowMapper<Product>(){@OverridepublicProductmapRow(ResultSet rs,int rowNum)throwsSQLException{Product product =newProduct(); product.setId(rs.getLong("id")); product.setProductId(rs.getString("product_id")); product.setProductName(rs.getString("product_name")); product.setPrice(rs.getDouble("price")); product.setSales(rs.getInt("sales"));return product;}});}publicvoidaddProduct(Product product){String sql ="INSERT INTO product (product_id, product_name, price, sales) VALUES (?, ?, ?, ?)"; jdbcTemplate.update(sql, product.getProductId(), product.getProductName(), product.getPrice(), product.getSales());}publicvoidupdateProduct(Product product){String sql ="UPDATE product SET product_id = ?, product_name = ?, price = ?, sales = ? WHERE id = ?"; jdbcTemplate.update(sql, product.getProductId(), product.getProductName(), product.getPrice(), product.getSales(), product.getId());}publicvoiddeleteProduct(Long id){String sql ="DELETE FROM product WHERE id = ?"; jdbcTemplate.update(sql, id);}}

Service类:

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjava.util.List;@ServicepublicclassProductService{@AutowiredprivateProductRepository productRepository;publicList<Product>getAllProducts(){return productRepository.getAllProducts();}publicProductgetProductById(Long id){return productRepository.getProductById(id);}publicvoidaddProduct(Product product){ productRepository.addProduct(product);}publicvoidupdateProduct(Product product){ productRepository.updateProduct(product);}publicvoiddeleteProduct(Long id){ productRepository.deleteProduct(id);}}

控制器类:

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.*;importjava.util.List;@RestController@RequestMapping("/api/products")publicclassProductController{@AutowiredprivateProductService productService;@GetMapping("/")publicList<Product>getAllProducts(){return productService.getAllProducts();}@GetMapping("/{id}")publicProductgetProductById(@PathVariableLong id){return productService.getProductById(id);}@PostMapping("/")publicvoidaddProduct(@RequestBodyProduct product){ productService.addProduct(product);}@PutMapping("/{id}")publicvoidupdateProduct(@PathVariableLong id,@RequestBodyProduct product){ product.setId(id); productService.updateProduct(product);}@DeleteMapping("/{id}")publicvoiddeleteProduct(@PathVariableLong id){ productService.deleteProduct(id);}}

应用启动类:

importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublicclassHiveApplication{publicstaticvoidmain(String[] args){SpringApplication.run(HiveApplication.class, args);}}

测试类:

importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.boot.test.web.client.TestRestTemplate;importorg.springframework.boot.web.server.LocalServerPort;importstaticorg.assertj.core.api.Assertions.assertThat;@SpringBootTest(webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)classHiveApplicationTests{@LocalServerPortprivateint port;@AutowiredprivateTestRestTemplate restTemplate;@TestvoidcontextLoads(){}@TestvoidtestGetAllProducts(){List<Product> products = restTemplate.getForObject("http://localhost:"+ port +"/api/products/",List.class);assertThat(products).isNotNull();assertThat(products.size()).isGreaterThanOrEqualTo(0);}@TestvoidtestAddProduct(){Product product =newProduct(null,"P001","手机",1000.0,100); restTemplate.postForObject("http://localhost:"+ port +"/api/products/", product,Product.class);List<Product> products = restTemplate.getForObject("http://localhost:"+ port +"/api/products/",List.class);assertThat(products).isNotNull();assertThat(products.size()).isGreaterThanOrEqualTo(1);}}

✅ 结论:集成Apache Hive的步骤包括创建Spring Boot项目、添加所需的依赖、配置Apache Hive、创建数据访问层、创建业务层、创建控制器类、测试应用。

26.4 Spring Boot与ETL工具的集成

Spring Boot与ETL工具的集成是Java开发中的重要内容。

26.4.1 集成Apache Spark的步骤

定义:集成Apache Spark的步骤是指使用Spring Boot与Apache Spark集成的方法。
步骤

  1. 创建Spring Boot项目。
  2. 添加所需的依赖。
  3. 配置Apache Spark。
  4. 创建ETL任务。
  5. 测试应用。

示例
pom.xml文件中的依赖:

<dependencies><!-- Web依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spark依赖 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.1.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.1.2</version></dependency><!-- 测试依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>

application.properties文件中的配置:

# 服务器端口 server.port=8080 # Spark配置 spark.master=local[*] spark.app.name=ETLExample 

ETL任务类:

importorg.apache.spark.sql.Dataset;importorg.apache.spark.sql.Row;importorg.apache.spark.sql.SparkSession;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;importjava.util.Properties;@ComponentpublicclassETLJob{@Value("${spark.master}")privateString master;@Value("${spark.app.name}")privateString appName;publicvoidrunETL(){SparkSession sparkSession =SparkSession.builder().master(master).appName(appName).getOrCreate();// 读取源数据Dataset<Row> sourceData = sparkSession.read().format("csv").option("header","true").option("inferSchema","true").load("src/main/resources/source-data.csv");// 数据转换Dataset<Row> transformedData = sourceData.select( sourceData.col("id"), sourceData.col("product_id"), sourceData.col("product_name"), sourceData.col("price"), sourceData.col("sales")).filter(sourceData.col("sales")>100);// 写入目标数据Properties connectionProperties =newProperties(); connectionProperties.put("user","hive"); connectionProperties.put("password",""); transformedData.write().mode("overwrite").jdbc("jdbc:hive2://localhost:10000/default","transformed_product", connectionProperties); sparkSession.stop();}}

控制器类:

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.*;@RestController@RequestMapping("/api/etl")publicclassETLController{@AutowiredprivateETLScheduler etlScheduler;@PostMapping("/run")publicStringrunETL(){ etlScheduler.runETL();return"ETL任务已启动";}}

调度器类:

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.scheduling.annotation.Scheduled;importorg.springframework.stereotype.Component;@ComponentpublicclassETLScheduler{@AutowiredprivateETLJob etlJob;@Scheduled(cron ="0 0 0 * * ?")// 每天凌晨0点执行publicvoidrunETL(){ etlJob.runETL();}publicvoidrunETLNow(){ etlJob.runETL();}}

应用启动类:

importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;importorg.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication@EnableSchedulingpublicclassETLApplication{publicstaticvoidmain(String[] args){SpringApplication.run(ETLApplication.class, args);}}

测试类:

importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.boot.test.web.client.TestRestTemplate;importorg.springframework.boot.web.server.LocalServerPort;importstaticorg.assertj.core.api.Assertions.assertThat;@SpringBootTest(webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)classETLApplicationTests{@LocalServerPortprivateint port;@AutowiredprivateTestRestTemplate restTemplate;@TestvoidcontextLoads(){}@TestvoidtestRunETL(){String response = restTemplate.postForObject("http://localhost:"+ port +"/api/etl/run",null,String.class);assertThat(response).contains("ETL任务已启动");}}

✅ 结论:集成Apache Spark的步骤包括创建Spring Boot项目、添加所需的依赖、配置Apache Spark、创建ETL任务、测试应用。

26.5 Spring Boot的实际应用场景

在实际开发中,Spring Boot数据仓库与ETL工具集成的应用场景非常广泛,如:

  • 实现产品信息的ETL任务。
  • 实现用户信息的ETL任务。
  • 实现订单信息的ETL任务。
  • 实现销售数据的ETL任务。

示例

importorg.apache.spark.sql.Dataset;importorg.apache.spark.sql.Row;importorg.apache.spark.sql.SparkSession;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;importjava.util.Properties;@ComponentclassETLJob{@Value("${spark.master}")privateString master;@Value("${spark.app.name}")privateString appName;publicvoidrunETL(){SparkSession sparkSession =SparkSession.builder().master(master).appName(appName).getOrCreate();// 读取源数据Dataset<Row> sourceData = sparkSession.read().format("csv").option("header","true").option("inferSchema","true").load("src/main/resources/source-data.csv");// 数据转换Dataset<Row> transformedData = sourceData.select( sourceData.col("id"), sourceData.col("product_id"), sourceData.col("product_name"), sourceData.col("price"), sourceData.col("sales")).filter(sourceData.col("sales")>100);// 写入目标数据Properties connectionProperties =newProperties(); connectionProperties.put("user","hive"); connectionProperties.put("password",""); transformedData.write().mode("overwrite").jdbc("jdbc:hive2://localhost:10000/default","transformed_product", connectionProperties); sparkSession.stop();}}@RestController@RequestMapping("/api/etl")classETLController{@AutowiredprivateETLScheduler etlScheduler;@PostMapping("/run")publicStringrunETL(){ etlScheduler.runETL();return"ETL任务已启动";}}@ComponentclassETLScheduler{@AutowiredprivateETLJob etlJob;@Scheduled(cron ="0 0 0 * * ?")// 每天凌晨0点执行publicvoidrunETL(){ etlJob.runETL();}publicvoidrunETLNow(){ etlJob.runETL();}}@SpringBootApplication@EnableSchedulingpublicclassETLApplication{publicstaticvoidmain(String[] args){SpringApplication.run(ETLApplication.class, args);}}// 测试类@SpringBootTest(webEnvironment =SpringBootTest.WebEnvironment.RANDOM_PORT)classETLApplicationTests{@LocalServerPortprivateint port;@AutowiredprivateTestRestTemplate restTemplate;@TestvoidcontextLoads(){}@TestvoidtestRunETL(){String response = restTemplate.postForObject("http://localhost:"+ port +"/api/etl/run",null,String.class);assertThat(response).contains("ETL任务已启动");}}

输出结果

  • 访问http://localhost:8080/api/etl/run:启动ETL任务。
  • 控制台输出:ETL任务已启动。

✅ 结论:在实际开发中,Spring Boot数据仓库与ETL工具集成的应用场景非常广泛,需要根据实际问题选择合适的数据仓库和ETL工具。

总结

本章我们学习了Spring Boot数据仓库与ETL工具集成,包括数据仓库的定义与特点、ETL工具的定义与特点、Spring Boot与数据仓库的集成、Spring Boot与ETL工具的集成、Spring Boot的实际应用场景,学会了在实际开发中处理数据仓库与ETL工具集成问题。其中,数据仓库的定义与特点、ETL工具的定义与特点、Spring Boot与数据仓库的集成、Spring Boot与ETL工具的集成、Spring Boot的实际应用场景是本章的重点内容。从下一章开始,我们将学习Spring Boot的其他组件、微服务等内容。

Read more

Flutter for OpenHarmony: Flutter 三方库 ntp 精准同步鸿蒙设备系统时间(分布式协同授时利器)

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net 前言 在进行 OpenHarmony 分布式开发、金融交易或具有严格时效性的业务(如:秒杀倒计时、双因素认证 OTP)时,开发者不能完全信任设备本地的系统时间。用户可能为了某种目的手动篡改时间,或者由于网络同步问题导致时间存在偏差。 ntp 软件包提供了一种直接与互联网授时中心(NTP 服务器)通信的能力。它能绕过本地系统时钟,获取绝对精准的 UTC 时间,并计算出本地时间与真实时间的“偏移量(Offset)”。 一、核心授时原理 ntp 通过测量往返网络延迟来消除误差。 发送 NTP 请求 (UDP) 返回高精度时间戳 鸿蒙 App 全球授时中枢 (pool.ntp.org) 计算网络往返耗时 (RTT) 得出绝对时间偏移量 生成鸿蒙业务专用准时 二、

By Ne0inhk

Flutter 三方库 r_tree 的鸿蒙化适配指南 - 实现极速空间检索与二 freel-dimensional 数据处理的鸿蒙架构实战

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 三方库 r_tree 的鸿蒙化适配指南 - 实现极速空间检索与二 freel-dimensional 数据处理的鸿蒙架构实战 在鸿蒙系统中的地图测绘、大屏可视化以及涉及大量点击坐标检测的复杂场景下,如何在庞大的点位数据中完成毫秒级的空间检索?r_tree 做为工业级的空间索引结构,为 Flutter for OpenHarmony 提供了一套成熟的高效搜索方案。本文将带您深入实战其鸿蒙化适配细节。 前言 什么是 R-tree?它是一种用于处理多维数据的平衡树索引结构,类似于 B-tree。在鸿蒙设备上,当我们需要从数万个电子围栏(Geo-fence)中实时判断当前位置位于哪个区域,或者在高密度的工业看板图中精确捕捉用户的每一个点击操作时,普通的遍历循环将导致严重的 UI 掉帧。r_tree 则是解决这一性能瓶颈的绝佳利器。 一、原理分析 / 概念介绍 1.1 空间索引模型 R-tree

By Ne0inhk

AI大模型实用(一)SpringAI接入deepseek示例

一、SpringAI接入deepseek所需环境 * JDK17+(JDK8无法支持) * SpringBoot 3.x * maven 3.6+ 官网地址 https://docs.spring.io/spring-ai/reference/api/chatclient.html SpringAI接入deepseek方式有很多。 下面演示SpringAI原生方式接入deepseek. 注:Spring AI调用Ollama+DeepSeek 二、pom依赖 1、 SpringBoot修改版本 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <

By Ne0inhk
Rust异步编程的错误处理艺术

Rust异步编程的错误处理艺术

Rust异步编程的错误处理艺术 一、异步错误的本质与分类 1.1 异步错误与同步错误的区别 💡在Rust同步编程中,错误通常是通过Result<T, E>类型返回的,Err变体包含了错误信息,程序会阻塞线程直到操作完成。而在异步编程中,操作的结果是一个Future<Output = Result<T, E>>,程序会暂停任务直到操作完成,Err变体可能是IO错误、超时错误、取消错误等异步场景特有的错误。 同步错误示例: usestd::fs::File;usestd::io::Read;// 同步读取文件,阻塞线程fnread_file_sync()->Result<String,std::io::Error>{letmut

By Ne0inhk