SpringBoot整合Flink CDC,实时追踪mysql数据变动

SpringBoot整合Flink CDC,实时追踪mysql数据变动

❃博主首页 :「码到三十五」 ,同名公众号 :「码到三十五」,wx号 : 「liwu0213」
☠博主专栏 :<mysql高手> <elasticsearch高手> <源码解读> <java核心> <面试攻关>
♝博主的话 :搬的每块砖,皆为峰峦之基;公众号搜索「码到三十五」关注这个爱发技术干货的coder,一起筑基


我们将整合Spring Boot和Apache Flink CDC(Change Data Capture)来实现实时数据追踪。下面是一个基本的实践流程代码,包括搭建Spring Boot项目、整合Flink CDC以及实现数据变动的实时追踪。

文章目录

前言

Flink CDC(Flink Change Data Capture)是一种基于数据库日志的CDC技术,它实现了一个全增量一体化的数据集成框架。与Flink计算框架相结合,Flink CDC能够高效地实现海量数据的实时集成。其核心功能在于实时监视数据库或数据流中的数据变动,并将这些变动抽取出来,以便进行进一步的处理和分析。借助Flink CDC,用户可以轻松地构建实时数据管道,实时响应和处理数据变动,为实时分析、实时报表和实时决策等场景提供有力支持。

Flink CDC的应用场景广泛,包括但不限于实时数据仓库更新、实时数据同步和迁移以及实时数据处理等。它还能确保数据一致性,并在数据发生变更时准确地进行捕获和处理。此外,Flink CDC支持与多种数据源进行集成,如MySQL、PostgreSQL、Oracle等,并提供了相应的连接器,便于数据的捕获和处理。

接下来,将详细介绍MySQL CDC的使用。MySQL CDC连接器允许从MySQL数据库中读取快照数据和增量数据。

1. MySQL开启Binlog

MySQL中开启binlog功能,需要修改配置文件中(如Linux的/etc/my.cnf或Windows的\my.ini)的[mysqld]部分设置相关参数:

[mysqld] server-id=1 # 设置日志格式为行级格式 binlog-format=Row # 设置binlog日志文件的前缀 log-bin=mysql-bin # 指定需要记录二进制日志的数据库 binlog_do_db=testjpa 

除了开启binlog功能外,还需要为Flink CDC配置相应的权限,以确保其能够正常连接到MySQL并读取数据。这包括授予Flink CDC连接MySQL的用户必要的权限,如SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。这些权限是Flink CDC读取数据和元数据所必需的。

检查是否已开启binlog功能:

mysql>SHOW VARIABLES LIKE'log_bin';+---------------+-------+| Variable_name |Value|+---------------+-------+| log_bin |ON|+---------------+-------+

至此,MySQL的相关配置已完成。

2. 创建Spring Boot项目

首先,你需要创建一个Spring Boot项目。可以使用Spring Initializr(https://start.spring.io/)来快速生成项目。

3. 添加依赖

pom.xml中添加Apache Flink和Flink CDC的依赖。以下是必要的依赖:

<dependencies><!-- Flink dependency --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><!-- Spring Boot dependencies --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency></dependencies>

4. 配置Flink和MySQL CDC

在Spring Boot的application.ymlapplication.properties文件中配置Flink和MySQL数据库连接:

flink:checkpoint:interval:10000parallelism:1spring:datasource:url: jdbc:mysql://localhost:3306/your_database username: your_username password: your_password 

5. 实现数据实时追踪

创建一个服务类来实现数据的实时追踪:

importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importorg.springframework.stereotype.Service;@ServicepublicclassFlinkCdcService{publicvoidstartDataStreaming(){finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();finalStreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);// 使用Flink CDC连接MySQLString name ="inventory"; tableEnv.executeSql("CREATE TABLE "+ name +" ("+" id INT,"+" name STRING,"+" description STRING,"+" weight DECIMAL(10, 3)"+") WITH ("+" 'connector' = 'mysql-cdc',"+" 'hostname' = 'localhost',"+" 'port' = '3306',"+" 'username' = 'your_username',"+" 'password' = 'your_password',"+" 'database-name' = 'your_database',"+" 'table-name' = 'your_table'"+")");// 查询并打印结果DataStream<String> dataStream = tableEnv.sqlQuery("SELECT * FROM "+ name).execute().print();try{ env.execute("Flink CDC Demo");}catch(Exception e){ e.printStackTrace();}}}

6. 启动Spring Boot应用

在你的Spring Boot应用的启动类中调用FlinkCdcServicestartDataStreaming方法来启动数据追踪:

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.CommandLineRunner;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublicclassFlinkCdcApplicationimplementsCommandLineRunner{@AutowiredprivateFlinkCdcService flinkCdcService;publicstaticvoidmain(String[] args){SpringApplication.run(FlinkCdcApplication.class, args);}@Overridepublicvoidrun(String... args)throwsException{ flinkCdcService.startDataStreaming();}}

7. 运行并测试

运行Spring Boot应用,并在MySQL数据库中做出一些数据变动。你应该能在控制台看到实时打印的数据变动。


关注公众号[码到三十五]获取更多技术干货 !

Read more

【亮数据 × Dify】零代码秒搭 AI 实时爬虫,数据伸手就来!

【亮数据 × Dify】零代码秒搭 AI 实时爬虫,数据伸手就来!

主要演示了如何用亮数据(Bright Data)+ Dify 零代码搭建一个 AI 实时爬虫工具,实现自动抓取网页数据并生成分析报告。核心流程如下: ✅ 总结: 1. 工具介绍 * Dify:开源的大语言模型应用开发平台,支持无代码搭建 AI 应用。 * 亮数据(Bright Data):提供网页抓取服务,支持 API 调用。 2. 操作步骤 步骤内容1. 登录 Dify 云需科学上网,支持 GitHub / 谷歌 / 邮箱登录。2. 安装插件在 Dify 插件市场通过 GitHub 链接安装“亮数据”插件。3. 创建应用新建空白应用 → 选择“工作流”模式 → 命名项目。4. 配置工作流构建

使用 VS Code 连接 MySQL 数据库

使用 VS Code 连接 MySQL 数据库

文章目录 * 前言 * VS Code下载安装 * 如何在VS Code上连接MySQL数据库 * 1、打开扩展 * 2、安装MySQL插件 * 3、连接 * 导入和导出表结构和数据 前言 提示:这里可以添加本文要记录的大概内容: 听说VS Code不要钱,功能还和 Navicat 差不多,还能在上面打游戏 但是没安装插件是不行的 发现一个非常牛的博主 还有一个非常牛的大佬 提示:以下是本篇文章正文内容,下面案例可供参考 VS Code下载安装 VS Code下载安装 如何在VS Code上连接MySQL数据库 本篇分享是在已有VS Code这个软件的基础上,数据库举的例子是MySQL 1、打开扩展 2、安装MySQL插件 在搜索框搜索 MySQL和 MySQL Syntax,下载这三个插件 点击下面的插件,选择【install】安装

阿里云全品类 8 折券限时领,建站 / AI / 存储通用 立即领取