【Ubuntu datasophon1.2.1 二开之九:验证离线数据入湖】

【Ubuntu datasophon1.2.1 二开之九:验证离线数据入湖】

Ubuntu datasophon1.2.1 二开之九:验证离线数据入湖

背景

前面一篇已经验证在线数据入湖了,最后一步就是验证离线数据入湖。虽然已经做好坑坑洼洼的准备。但是困难比预想要多得多。花时间要多得多。为了最后一哆嗦,坚持坚持坚持,终于胜利!

环境准备

1. 在datasophon安装好dolphinscheduler 3.1.8

配置租户

在这里插入图片描述

hdfs就是linux用户

创建环境

在这里插入图片描述


告诉ds spark3位置,hdfs位置

修改配置文件

修改bin\env\dolphinscheduler_env.sh

在这里插入图片描述


在这里插入图片描述


总结一下:
1.修改数据库类型为mysql,指定数据库url,用户名及密码
2.修改zk地址
3.修改hadoop位置及它配置目录
4.jdk位置
5.hive及flink位置
修改 work-server\bin\start.sh
就修改一行:
原来:

$JAVA_HOME/bin/java $JAVA_OPTS \ -cp "$DOLPHINSCHEDULER_HOME/conf":"$DOLPHINSCHEDULER_HOME/libs/*" \ org.apache.dolphinscheduler.server.worker.WorkerServer 

改成:

$JAVA_HOME/bin/java $JAVA_OPTS \ -cp "$DOLPHINSCHEDULER_HOME/conf":"$DOLPHINSCHEDULER_HOME/libs/*":${HADOOP_CONF_DIR}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/hdfs/* \ org.apache.dolphinscheduler.server.worker.WorkerServer 

启动时传入hdfs配置目录及home目录

2. 升级spark3版本

datasophon 自带spark3.1.3 ,不支持paimon,换句话paimon,支持spark 从3.2版本开始。必须升级,否则报类找不到,这个问题耽搁我好长时间,paimon从0.7-0.9都试,还试了kafka-connect方式。

数据加工流向图

在这里插入图片描述

遇到坑及填平方法

1.现象: 经典的 NoClassDefFoundError,例如 org/apache/spark/kafka010/KafkaConfigUpdater 和 org/apache/spark/sql/connector/write/Write。

填平方案:

依赖分析: 通过错误栈和GitHub issue确认,KafkaConfigUpdater 类属于 Spark 而非 Paimon,且需要额外的 spark-token-provider-kafka 等JAR包。

版本匹配: 最终放弃在Spark 3.1.3上挣扎,将Spark升级到与Paimon 0.9.0兼容的3.2.4版本,从根本上解决了 Write 类找不到的API不兼容问题。

依赖管理: 明确需要通过 --jars 参数或DataSophon资源中心,将 paimon-spark-3.2-0.9.0.jar 等所有依赖JAR包完整地提供给Spark任务。

2. Spark与Paimon版本不兼容

现象: 使用Paimon 0.8.2或0.9.0时,均报告 Write 类找不到。

填平方案:

放弃低版本组合: 确认Paimon从某个版本开始强依赖Spark 3.2的DataSource V2 API,与你的Spark 3.1.3环境不兼容。

升级Spark: 决定将Spark从 3.1.3 升级到 3.2.4。这一步是解决问题的关键转折点,虽然需要升级组件,但一劳永逸地解决了兼容性问题。

3. HDFS权限问题

现象: 建表或写入时报 Permission denied,用户 root123 无法在 /user/paimon/warehouse 或 /user/hive/warehouse 下创建目录。

填平方案:

使用 sudo -u hdfs: 切换到HDFS超级用户执行权限修复命令。

递归授权: 执行 hdfs dfs -chown -R root123:supergroup /user/paimon 和 hdfs dfs -chmod -R 755 /user/paimon,将目录所有者改为当前用户并赋予写权限。

调整Hive仓库权限: 类似地,对 /user/hive/warehouse 目录也进行了权限调整,确保了元数据操作顺畅。

4. 元数据存储方式选择

现象: 使用Paimon默认的文件系统Catalog时,元数据直接存在HDFS上,导致并发问题和权限困扰。

填平方案:

切换为Hive Metastore: 最终决定使用 Hive Metastore 作为Paimon的Catalog。配置如下:

text
–conf spark.sql.catalog.paimon.metastore=hive
–conf spark.sql.catalog.paimon.uri=thrift://ddp1:9083
明确数据存储路径: 同时指定 spark.sql.catalog.paimon.warehouse,实现元数据(在MySQL/Hive)与数据文件(在HDFS)的分离,使整个架构更清晰、更稳定。

5. 环境与组件升级

现象: 旧版Spark 3.1.3成为兼容性瓶颈,且DataSophon管理着多个组件,升级有顾虑。

填平方案:

确认兼容性: 检查并确认Hadoop 3.3.3、Hive 3.1.3、Kafka 2.4.1等核心组件与Spark 3.2.4和JDK 11兼容。

分步升级: 选择保持JDK 8不变,仅升级Spark到3.2.4,将影响面降到最低,成功绕过JDK升级的风险。

6.Spark 找不到 Kafka 数据源

错误信息:

text
Failed to find data source: kafka
原因: Spark 默认不包含 Kafka 集成包。

解决方案:

bash

下载匹配版本的 Kafka 依赖

spark-sql-kafka-0-10_2.12-3.2.0.jar
kafka-clients-2.8.0.jar
commons-pool2-2.11.1.jar

在 DS Spark 任务的"选项参数"中添加 --jars

–jars /path/to/spark-sql-kafka-0-10_2.12-3.2.0.jar,/path/to/kafka-clients-2.8.0.jar,/path/to/commons-pool2-2.11.1.jar
关键点:

版本必须与 Spark 版本匹配

commons-pool2 是 Kafka 消费者的传递依赖,容易遗漏

7.Paimon 表创建失败(Derby 权限问题)

错误信息:

text
ERROR XBM0H: Directory /…/metastore_db cannot be created.
java.io.FileNotFoundException: derby.log (Permission denied)
原因: Spark 默认使用嵌入式 Derby 作为 Hive Metastore,没有配置 Hive Metastore 时会尝试在临时目录创建 Derby 数据库。

解决方案: 配置使用外部的 Hive Metastore

bash
–conf spark.sql.catalog.paimon.metastore=hive
–conf spark.sql.catalog.paimon.uri=thrift://hive-metastore-host:9083
–conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

8.Kafka Topic 不存在

错误信息:

text
UNKNOWN_TOPIC_OR_PARTITION
原因: 脚本中配置的 topic 名称与实际不符。

解决方案:

bash

查看所有 topic

kafka-topics.sh --bootstrap-server kafka-host:9092 --list

使用正确的 topic 名称

kafka_topic = “user_log_topic” # 而不是 “user_log”

9.Spark 版本不匹配导致类找不到

错误信息:

text
java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig
原因: Spark 版本与 Kafka 依赖版本不匹配。

解决方案:

Spark 3.2.0 → 使用 spark-sql-kafka-0-10_2.12-3.2.0.jar 和 kafka-clients-2.8.0.jar

不能混用 3.2.4 的包

10.DolphinScheduler 参数换行问题

错误信息:

text
–conf: command not found
原因: DS 的"选项参数"中换行导致命令被分割。

解决方案: 所有参数写在一行,不要换行

properties
–jars /path/to/jar1.jar,/path/to/jar2.jar --conf key1=value1 --conf key2=value2

11.Shell 脚本换行符问题

错误信息:

text
$‘\r’: command not found
原因: 在 Windows 上编辑的脚本上传后带有 \r\n 换行符。

解决方案:

bash

转换换行符

sed -i ‘s/\r$//’ script.sh

或直接在 DS 的脚本框中编写,不上传文件

12.Shell 脚本中 Spark 命令缺少配置

错误信息:

text
ERROR XBM0H: Directory /…/metastore_db cannot be created.
原因: Shell 脚本中的 spark-sql 命令没有配置 Hive Metastore。

解决方案: 在 Shell 脚本中的每个 spark-sql 命令都要加上完整配置:

bash
${SPARK_HOME}/bin/spark-sql
–conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog
–conf spark.sql.catalog.paimon.metastore=hive
–conf spark.sql.catalog.paimon.uri=thrift://ddp1:9083
–conf spark.sql.catalog.paimon.warehouse=hdfs://…/warehouse
–conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

13.CSV 导入 ClickHouse 格式错误

错误信息:

text
Code: 117. DB::Exception: Expected end of line: … (INCORRECT_DATA)
原因:

CSV 中的 JSON 字段包含逗号和引号,干扰了 CSV 解析

时间格式 2026-03-29T10:00:00.000Z 不是 ClickHouse 默认格式

解决方案: 使用 TSV 格式替代 CSV

bash

将 CSV 转换为 TSV(逗号 → 制表符)

cat data.csv | sed ‘s/,/\t/g’ | clickhouse client --query “INSERT INTO table FORMAT TSV”

14.Shell 脚本 while 循环只执行一次

错误信息: 循环内的命令导致循环提前退出。

原因: clickhouse-client 命令失败或 echo 干扰了循环变量。

解决方案: 放弃 while 循环,使用管道直接导入

bash

不推荐:while 循环逐行插入

while read line; do
clickhouse-client --query “INSERT …”
done < file.csv

推荐:管道批量导入

cat file.csv | clickhouse-client --query “INSERT INTO table FORMAT TSV”

15. Paimon 表写入失败(表不存在)

错误信息:

text
Schema file not found in location paimon.default.ods_user_log. Please create table first.
解决方案: 先创建表,再写入数据

sql
CREATE TABLE IF NOT EXISTS paimon.default.ods_user_log (…) USING paimon;
或在 Python 脚本中自动创建:

python
if not spark.catalog.tableExists(“paimon.default.ods_user_log”):
spark.sql(“CREATE TABLE …”)

最后

成果截图:

在这里插入图片描述
在这里插入图片描述

各个节点对应脚本
1.kafka_to_paimon

from pyspark.sql import SparkSession from pyspark.sql.functions import col, lit, from_json, to_timestamp from pyspark.sql.types import StructType, StructField, StringType, TimestampType import sys import os bizdate = sys.argv[1]iflen(sys.argv)>1else os.environ.get("bizdate","2026-03-29") spark = SparkSession.builder \ .appName(f"KafkaToPaimon_Batch_{bizdate}") \ .config("spark.sql.catalog.paimon","org.apache.paimon.spark.SparkCatalog") \ .config("spark.sql.catalog.paimon.warehouse","hdfs://nameservice1/user/paimon/warehouse") \ .config("spark.sql.catalog.paimon.metastore","hive") \ .config("spark.sql.catalog.paimon.uri","thrift://ddp1:9083") \ .config("spark.sql.extensions","org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") \ .getOrCreate()print(f"开始处理业务日期: {bizdate}") # 定义 schema json_schema =StructType([StructField("user_id",StringType(), True),StructField("event_time",StringType(), True),StructField("event_type",StringType(), True),StructField("data",StringType(), True)]) # Kafka 配置 kafka_bootstrap_servers ="ddp4:9092,ddp3:9092" kafka_topic ="user_log_topic" # 检查表是否存在,如果不存在则创建 print("检查表是否存在...")try: # 尝试查询表 spark.sql("SELECT 1 FROM paimon.default.ods_user_log LIMIT 1")print("表已存在") except Exception:print("表不存在,正在创建...") create_sql =""" CREATETABLE paimon.default.ods_user_log( user_id STRING, event_time TIMESTAMP, event_type STRING, data STRING, dt STRING)USING paimon PARTITIONEDBY(dt)TBLPROPERTIES('bucket'='4','file.format'='parquet')""" spark.sql(create_sql)print("表创建成功") # 从 Kafka 读取数据 df = spark.read \ .format("kafka") \ .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \ .option("subscribe", kafka_topic) \ .option("startingOffsets","earliest") \ .option("endingOffsets","latest") \ .load() \ .selectExpr("CAST(value AS STRING) as json_str") # 解析 JSON parsed_df = df.select(from_json(col("json_str"), json_schema).alias("data")).select(col("data.user_id"),to_timestamp(col("data.event_time")).alias("event_time"),col("data.event_type"),col("data.data"),lit(bizdate).alias("dt")).filter(col("user_id").isNotNull()) count = parsed_df.count()print(f"从 Kafka 读取到 {count} 条有效数据")if count >0:print("\n数据样例(前5条):") parsed_df.show(5, truncate=False) # 写入数据 parsed_df.write \ .format("paimon") \ .mode("append") \ .insertInto("paimon.default.ods_user_log")print(f"✅ 写入完成,共 {count} 条记录")else:print("没有数据需要写入") spark.stop()

2.paimon写入ck(实际是写入hdfs,忘记改名称)

from pyspark.sql import SparkSession bizdate ="2026-03-29" spark = SparkSession.builder \ .appName(f"PaimonToClickHouse_{bizdate}") \ .config("spark.sql.catalog.paimon","org.apache.paimon.spark.SparkCatalog") \ .config("spark.sql.catalog.paimon.metastore","hive") \ .config("spark.sql.catalog.paimon.uri","thrift://ddp1:9083") \ .config("spark.sql.catalog.paimon.warehouse","hdfs://nameservice1/user/paimon/warehouse") \ .config("spark.sql.extensions","org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") \ .getOrCreate()print("="*50)print("Paimon → ClickHouse 导入")print(f"业务日期: {bizdate}")print("="*50) # 读取数据 df = spark.sql(f""" SELECT user_id, event_time, event_type, data FROM paimon.default.ods_user_log WHERE dt ='{bizdate}'""") count = df.count()print(f"[1/2] Paimon 表中数据量: {count}")if count >0:print("[2/2] 写入 ClickHouse...") # 保存为 CSV 到 HDFS output_path = f"/tmp/paimon_export_{bizdate}" df.coalesce(1).write.mode("overwrite").option("header","false").csv(output_path)print(f"✅ 数据已导出到 HDFS: {output_path}")print(f" 请使用 clickhouse-client 导入数据")else:print("⚠️ 没有数据需要导入") spark.stop()

3.hdfs导入ck

#!/bin/bash # 环境变量 CH_CLIENT=/opt/datasophon/clickhouse/bin/clickhouse HOST=ddp3 PORT=9000USER=defaultBIZDATE="2026-03-29" echo "==========================================" echo "Paimon → ClickHouse 数据导入" echo "业务日期: ${BIZDATE}" echo "==========================================" # 1. 下载 CSV 文件 echo "[1/5] 下载 CSV 文件..." hdfs dfs -get -f /tmp/paimon_export_${BIZDATE}/part-*.csv /tmp/paimon_data_${BIZDATE}.csv 2>/dev/nullif[!-f /tmp/paimon_data_${BIZDATE}.csv ]; then echo "❌ CSV 文件不存在" exit 1 fi TOTAL_LINES=$(wc -l </tmp/paimon_data_${BIZDATE}.csv) echo "CSV 文件行数: ${TOTAL_LINES}" # 2. 创建 ClickHouse 表(如果不存在) echo "[2/5] 创建 ClickHouse 表(如果不存在)..." ${CH_CLIENT} client --host ${HOST}--port ${PORT}--user ${USER}--query " CREATETABLEIFNOTEXISTSdefault.user_log( user_id String, event_time String, event_type String, data String )ENGINE=MergeTree()ORDERBY user_id " 2>/dev/null # 3. 删除当前日期的旧数据(避免重复) echo "[3/5] 删除当前日期的旧数据..." ${CH_CLIENT} client --host ${HOST}--port ${PORT}--user ${USER}--query " ALTERTABLEdefault.user_log DELETEWHERE event_time LIKE'${BIZDATE}%' " 2>/dev/null # 4. 导入数据(使用 TSV 格式) echo "[4/5] 导入数据到 ClickHouse..." # 将 CSV 转换为 TSV(用制表符分隔)后导入 cat /tmp/paimon_data_${BIZDATE}.csv | sed 's/,/\t/g'| ${CH_CLIENT} client --host ${HOST}--port ${PORT}--user ${USER}--query "INSERT INTO default.user_log FORMAT TSV" # 5. 验证导入结果 echo "[5/5] 验证导入结果..."COUNT=$(${CH_CLIENT} client --host ${HOST}--port ${PORT}--user ${USER}--query "SELECT COUNT(*) FROM default.user_log WHERE event_time LIKE '${BIZDATE}%'"2>/dev/null)TOTAL=$(${CH_CLIENT} client --host ${HOST}--port ${PORT}--user ${USER}--query "SELECT COUNT(*) FROM default.user_log"2>/dev/null) echo "==========================================" echo "✅ 导入完成!" echo " 本次导入行数: ${TOTAL_LINES}" echo " 当前日期数据量: ${COUNT}" echo " ClickHouse 总数据量: ${TOTAL}" echo "==========================================" # 显示数据样例 if["${TOTAL_LINES}"-gt 0]; then echo "" echo "新增数据样例:" ${CH_CLIENT} client --host ${HOST}--port ${PORT}--user ${USER}--query "SELECT * FROM default.user_log WHERE event_time LIKE '${BIZDATE}%' LIMIT 3"--format PrettyCompact fi # 清理临时文件 rm -f /tmp/paimon_data_${BIZDATE}.csv echo "" echo "==========================================" echo "导入任务完成" echo "=========================================="

如需沟通:lita2lz

Read more

Codex / OpenCode / Cursor / OpenClaw 对比指南

前提说明:这四个工具并不处于同一维度。Cursor 和 Codex 更接近“主开发工作台”,OpenCode 是“开源终端 Agent”,OpenClaw 则更像“把 Agent 接入聊天软件的网关”。因此在横向对比前,先明确各自定位,才不会拿错标尺。 核心定位速览 工具本质定位主要使用界面模型 / 生态策略最适合谁CodexOpenAI 原生编程 Agent / 自动化开发平台CLI 终端、本地环境、云端任务流以 OpenAI 模型体系为核心已深度使用 ChatGPT / OpenAI,希望 Agent 直接读写仓库、执行命令、跑自动化流程CursorAI 原生 IDE(编辑器中心)编辑器主界面 + 内置终端 + Diff 审查支持 OpenAI / Anthropic / Google 等多模型,切换灵活追求一体化开发体验,希望写码、

HarmonyOS鸿蒙PC的QT应用开发:QT项目运行原理与 EmbeddedUIExtensionAbility介绍

HarmonyOS鸿蒙PC的QT应用开发:QT项目运行原理与 EmbeddedUIExtensionAbility介绍

好消息,2026年3.31日,QT官方正式发布鸿蒙版QT。本次开源发布正式推出面向鸿蒙系统平板和PC设备的Qt 5.12.12 LTS 适配版本,在完整保留 Qt 5.12.12 核心能力(含界面渲染、信号槽机制、跨平台 I/O、网络通信及数据库模块)的基础上,深度适配鸿蒙系统架构。本版本可降低开发者跨平台移植成本,加速 Qt 与鸿蒙生态融合,助力多场景鸿蒙应用高效开发。 QT官方鸿蒙版开源地址:https://wiki.qt.io/Qt5.12.12_Open_Source_Release_for_HarmonyOS_zh QT官方文档地址:https://wiki.qt.io/Qt_for_

vector

vector

vector * 1 vector的介绍 * 2 vector的使用 * 2.1 vector的定义 * 2.2 vector iterator的使用 * 2.3 vector空间 * 2.4 vector增删查改 * 2.5 迭代器失效问题 1 vector的介绍 vector是C++标准模板库(STL)中最常用的序列容器之一,封装了动态数组,可以自动管理内存,提供随机访问、动态扩容等功能,可以把vector理解成一个数组。 基本特性 动态数组: 大小可变,插入/删除元素时自动调整容量 连续存储: 元素在内存中连续存放,支持高效的随机访问 类型安全: 模板类,存储特定类型的对象 内存自动管理: 自动分配和释放内存,避免手动new[]/delete[] 2 vector的使用 2.

Python + Ollama 本地跑大模型:零成本打造私有 AI 助手

Python + Ollama 本地跑大模型:零成本打造私有 AI 助手

零 API 费用、零数据泄露风险、完全离线可用。本文带你从安装到实战,30 分钟跑起一个本地 AI 助手。 一、为什么要在本地跑大模型? 对比维度云端 API(ChatGPT / Claude)本地模型(Ollama)费用按量付费,$20/月起完全免费数据隐私数据上传到云端数据留在本地网络依赖必须联网离线可用模型选择固定自由切换开源模型硬件要求无需要一定配置 38%27%18%12%5%选择本地大模型的理由(2026年开发者调查)数据隐私与安全零成本长期使用离线可用可自由定制微调其他 二、Ollama 是什么? Ollama 是一个开源的本地大模型运行框架,核心特点: * 一键拉取模型:类似 docker pull 的体验 * 自动适配硬件:根据你的显存/内存自动量化 * 兼容 OpenAI API 格式:现有代码几乎不用改 * 跨平台:Windows