跳到主要内容
极客日志极客日志
首页博客AI提示词GitHub精选代理工具
搜索
|注册
博客列表
Javajava

SeaTunnel 数据集成实战:多场景数据同步配置指南

SeaTunnel 支持多种数据源连接器,涵盖 MySQL、Hive、Kafka 等。演示了三种典型同步场景:基于 JDBC 的全量批量同步、结合 Hive Metastore 的数据仓库写入、以及基于时间戳的增量抽取和 MySQL CDC 实时流处理。重点解析了配置文件中的关键参数,如连接信息、查询语句、作业模式及依赖包管理,帮助开发者快速搭建稳定可靠的数据管道。

FrontendX发布于 2025/2/7更新于 2026/4/250 浏览
SeaTunnel 数据集成实战:多场景数据同步配置指南

连接器类型概览

SeaTunnel 的连接器生态非常丰富,基本覆盖了主流的数据存储和计算组件。无论是关系型数据库、NoSQL 还是大数据组件,都能找到对应的 Source 和 Sink 实现。

SourceSink
ClickhouseClickhouse
ElasticsearchElasticsearch
FakeSourceFakeSource
FtpFtp
Github/GitlabGithub/Gitlab
GreenplumGreenplum
Hdfs fileHdfs file
HiveHive
HttpHttp
Hudi/IcebergHudi/Iceberg
JDBCJDBC
KuduKudu
MongoDBMongoDB
Mysql / MySQL CDCMysql / MySQL CDC
RedisRedis
KafkaKafka
StarRocksStarRocks
PhoenixPhoenix
......

MySQL 到 MySQL 全量同步

这是最基础也是最常用的场景。配置时主要关注 Source 端的查询语句和 Sink 端的写入策略。

核心参数包括连接 URL、驱动类名以及用户凭证。在 Source 端,query 字段决定了读取范围;Sink 端则通过 query 指定插入逻辑,支持预编译语句以提高性能。

env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        query = "select * from base_region limit 4"
    }
}

transform {
    # 此处可配置 SQL 转换插件
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    query = "insert into base_region(id,region_name) values(?,?)"
  }
}

启动作业只需指定配置文件路径:

./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf

MySQL 到 Hive 同步

将数据同步至数仓通常涉及引擎依赖问题。如果使用 Spark 或 Flink 引擎,需确保环境已集成 Hive;若使用 SeaTunnel Zeta 引擎,则需手动补充相关 Jar 包。

Zeta 引擎依赖:

  • seatunnel-hadoop3-3.1.4-uber.jar
  • hive-exec-2.3.9.jar

请将上述文件放入 $SEATUNNEL_HOME/lib/ 目录下。

配置示例如下,注意 metastore_uri 需指向正确的 Hive Metastore 地址。

env {
  job.mode = "BATCH"
}

source {
    Jdbc {
        url = "jdbc:mysql:///127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        query = "select * from source_user"
    }
}

sink {
  Hive {
    table_name = "ods.sink_user"
    metastore_uri = "thrift://bigdata101:9083"
  }
}

运行命令与之前类似:

./bin/seatunnel.sh --config ./config/mysql2hive.conf

增量同步策略

生产环境中往往需要按时间维度进行增量抽取。这里以订单明细表为例,根据 create_time 字段过滤数据。

关键点:

  1. 源端查询需包含时间范围判断。
  2. 利用 Shell 变量传递日期参数(如 ${etl_dt}),便于调度系统调用。
  3. 注意时间格式转换,避免字符串比较错误。
env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        query = "select * from t_order_detail where create_time >= REPLACE('\"${etl_dt}\"', 'T', ' ') and create_time < date_add(REPLACE('\"${etl_dt}\"', 'T', ' '),interval 1 day);"
    }
}

sink {
  jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
    query = "insert into ods_t_order_detail_di (id,order_id,sku_id,sku_name,img_url,order_price,sku_num,create_time) values(?,?,?,?,?,?,?,?)"
  }
}

执行时通过 -i 参数传入日期:

./bin/seatunnel.sh --config ./config/mysql2mysql_ods_t_order_detail_di.conf -i etl_dt='2024-02-05'

实时同步 (CDC)

对于对时效性要求极高的场景,可以开启流式模式 (STREAMING)。基于 MySQL CDC 可以实现准实时的数据变更捕获。

注意事项:

  • 必须设置 checkpoint.interval 以保证故障恢复能力。
  • Sink 端建议开启 generate_sink_sql = true,让框架自动生成插入语句,减少维护成本。
env {
  execution.parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    username = "user"
    password = "password"
    table-names = ["test.source_user"]
    base-url = "jdbc:mysql://127.0.0.1:3306/test"
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    username = "user"
    password = "password"
    generate_sink_sql = true
    database = "dw"
    table = "source_user_01"
    primary_keys = ["userid"]
  }
}

启动脚本:

./bin/seatunnel.sh --config ./config/mysql2mysql_rt.conf

目录

  1. 连接器类型概览
  2. MySQL 到 MySQL 全量同步
  3. MySQL 到 Hive 同步
  4. 增量同步策略
  5. 实时同步 (CDC)
  • 💰 8折买阿里云服务器限时8折了解详情
  • 💰 8折买阿里云服务器限时8折购买
  • 🦞 5分钟部署阿里云小龙虾了解详情
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog

更多推荐文章

查看全部
  • MySQL 表结构修改与数据查询基础
  • 码良:一款高扩展性的在线 Web 页面构建工具
  • DeepSeek 完整使用手册:功能、技巧与 API 集成指南
  • AI 产品经理工作全流程与模型构建实战指南
  • Android Binder 线程池机制详解
  • DeepSeek 20 个实用建议:普通人如何高效使用大模型
  • 2024 年人工智能 PC 行业趋势与消费者洞察白皮书
  • Agentic RAG:基于多文档的 AI Agent 智能体构建指南
  • 大模型本地部署指南:基于 llama.cpp 在 CPU 上运行 LLaMA2
  • AI 产品经理转行大模型指南:核心素质与学习路径
  • 基于 Hugging Face 与 TRL 微调大语言模型实战
  • 基于大模型构建本地知识库的技术实践
  • BERT 详解:自然语言处理 (NLP) 基础与进阶指南
  • NLP 基础:BERT 模型原理、训练与压缩技术详解
  • Embeddings 技术详解与 Word2Vec 模型训练指南
  • RAG 应用开发的 7 种主流 Embedding 模型解析
  • 使用 Mistral 和 Llama2 构建 AI 聊天机器人
  • 大模型评估 LLM Evals:5 种核心策略与实战应用指南
  • LLM 长文本处理技术综述:位置编码与注意力机制优化
  • OpenAI GPT-4o 免费策略分析与 AI 工具推荐

相关免费在线工具

  • Keycode 信息

    查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online

  • Escape 与 Native 编解码

    JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online

  • JavaScript / HTML 格式化

    使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online

  • JavaScript 压缩与混淆

    Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online