前言
ezdata 是一个基于 Vue3 前端、Python 后端及 LLM(大语言模型)开发的数据处理分析和任务调度系统。该系统旨在解决多数据源查询工具切换频繁、脚本部署杂乱以及定时任务管理困难等痛点。核心功能包括数据源管理、数据模型管理、数据集成、数据查询 API 接口、低代码自定义数据处理任务模版,以及单任务及 DAG(有向无环图)任务工作流调度。
系统集成了 LLM 模块实现 RAG(检索增强生成)知识库问答,能够链接各数据源数据进行数据对话问答,提供交互式数据分析功能。通过自然语言转 SQL 或 Python 代码的技术路径,降低数据分析门槛,提升数据利用效率。
背景与痛点
在实际业务场景中,经常需要进行数据抓取并存储到不同的数据源中,包括文件、关系型数据库、NoSQL 数据库、时序数据库、图数据库等,同时伴随大量的数据查询和分析工作。经过实践调研,发现以下主要问题:
- 工具割裂:不同数据源查询时需要使用不同的专用工具查看,对于简单的查询和分析需频繁切换可视化工具,操作繁琐且上下文丢失。
- 脚本管理混乱:Python 脚本部署杂乱,缺乏统一的监控和日志管理。定时任务的动态更新时间策略较麻烦,难以管理和监控执行状态。虽然调研了部分成熟的任务调度系统(如 DolphinScheduler、Airflow 等),但大多基于 Java 生态或需要部署较重的服务才可运行,资源消耗较大,不适合轻量级场景。
- 任务复用性差:同一脚本任务需根据不同参数部署多份,维护成本高。希望将任务固化成模版,通过表单化配置任务模版参数,稍作修改即可运行一个新的同类型的任务,提高开发效率。
基于上述问题,决定自行设计一套系统,方便进行数据的简单查询处理和脚本任务的动态配置、定时调度运行和状态监控。
技术选型
考虑到个人开发精力有限,遵循以下原则进行技术选型:
- 沿用成熟框架:尽量沿用现有的开源成熟框架和库,直接搬运相关模块代码到项目中或做少量修改,降低开发风险。
- 轻量级与可扩展:确保系统轻量级,支持单机部署在小型服务器上,后期可动态扩容到多台机器或 K8s 集群上分布式运行。
- 快速交付:前端采用成熟的低代码平台进行开发,减少页面组件和 RBAC 权限菜单模块的开发时间,暂时忽略 UI 优化,优先实现功能,后续迭代优化。
中间件层
- MySQL:作为业务数据库,存储元数据、任务配置及用户信息。
- Redis:提供任务队列和缓存相关的功能,加速高频访问数据的读取。
- 对象存储:文件存储可选择性使用本地存储或对接 MinIO、阿里云 OSS 等第三方服务,用于存储非结构化数据及日志文件。
- Elasticsearch (ES):引入 ES 作为日志中间件和大量数据时的数据存储、检索、对外服务的中间件,便于分析任务日志和提供基础的大数据量查询。
后端层
- Flask:使用 Flask 作为 API 服务框架,保持轻量灵活。
- Flask-APScheduler:引入该框架做任务调度,开放 API 接口方便前端做任务的动态配置和启停、定时触发等功能。
- Celery:使用 Celery 做分布式任务分发执行模块,借助 Celery Flower 模块开放的 API 接口实现任务队列管理、执行中任务监控等功能,确保异步任务的高可靠性。
前端层
- Vue3 + Ant Design:使用了基于 Vue3 和 Ant Design 的 Jeecg 低代码系统,省去了开发各种前端组件和 RBAC 权限菜单模块的工作,快速搭建出基础的系统前端页面。
数据模块设计
创建一个统一的数据模型概念,将各数据源类型抽象为统一数据模型。支持的数据源类型包括:文件、API、数据库表、数据库查询 SQL、Binlog 数据流、Kafka 数据流、NoSQL 文档记录等。
在统一数据模型上支持创建、删除、字段管理、查询取数、封装数据查询 API 接口等各种功能。不同数据模型具有不同的功能类型,并使用前端低代码系统快速拓展实现数据源的管理和创建新的数据模型类型和相关拓展功能。
查询与 ETL 流程
查询数据时,根据系统配置的相关连接参数,实例化对应的数据模型 Reader 对象。Reader 对象负责从源头获取原始数据,并根据前端配置的过滤条件、原生查询等表单参数对数据源进行查询和数据组装返回,实现数据对象的即时查询。
针对数据读取对象,通过内部配置的固定数据处理逻辑或编写自定义代码,即可将查询的原始数据转换为目标数据源所需的数据结构。随后实例化目标数据模型 Writer 对象,实现 ETL(抽取、转换、加载)数据集成管道功能。这种设计使得数据流转过程标准化,便于维护和扩展。
与大模型结合
随着 ChatGPT 等大模型应用的火爆,开始思考如何与现有数据查询模块结合实现自然语言到数据结果的转化。在调研了较多相关项目(ChatDB、NL2SQL、ChatBI 相关开源项目)并结合现有数据查询模块后,决定实现一套基于代码生成的 AI 数据分析工具。
核心实现逻辑
- Prompt 生成:首先,对实例化的数据模型 Reader 实现一个生成模型使用介绍 Prompt 的函数。该函数生成了该数据源相关数据样貌、表结构、前几条数据示例以及该 Reader 对象读取数据的 Python 代码示例等详细信息,供 LLM 学习上下文。
- RAG 检索增强:针对 MySQL 中有大量表或 Elasticsearch 中有大量索引,无法一次性给出所有表结构信息提示的情况,使用了 RAG 相关技术。将每个数据表结构或索引 Mapping 分割为一个个知识块,加入向量数据库。然后使用 RAG 检索增强功能,根据用户问题检索出相关所需知识段,作为提示词的一部分,减少 Token 消耗并提高准确性。
- 代码生成与执行:根据用户问题,RAG 检索相关知识库,加上 Reader 对象的使用提示,组合成代码生成目标结果的提示词。LLM 生成对应代码后,使用沙箱环境中的代码执行器获取对应数据结论、数据 DataFrame、统计报表 HTML 等各类型数据结果,传送给前端实现回答问题、导出数据表格、绘制数据报表等功能。
- 自我修正机制:若 LLM 生成代码执行错误,将错误信息反馈给 LLM,让其反思修复此代码,并使用新生成的代码重新生成数据结果,形成闭环优化。
效果优化及后续计划
经过开发,基本实现了针对各种数据源的自然语言查询数据和对话问答功能。使用了 RAG 技术将正确的回答进行了手动标记生成问答对知识段加入知识库,下次问到类似问题时,能更好更精准的实现对应的数据分析需求。
并开发了 LLM 对话的 RAG 知识库问答、Agent 工具调用等 AI 对话场景,将数据对话封装成为了其中的数据分析工具,拓展了对话系统的应用丰富度。后续将着重开发以下模块:
- AI 对话应用优化:使基础的 AI 对话具备对指定数据源的数据查询分析功能,无需额外配置即可直接调用。
- 服务化封装:将 AI 应用封装成服务,支持使用开放对话页面或 API 接口访问对应对话应用,便于第三方系统集成。
- 自动化 ETL:将 AI 应用到数据集成模块中,实现 AI 自动化的 ETL 数据管道搭建,根据数据特征自动推荐转换逻辑。
- 更多应用场景:探索更多数据和 AI 结合的应用案例,如智能预警、趋势预测等。
部署与维护建议
为了保证系统的稳定性和安全性,建议在部署时注意以下几点:
- 环境隔离:代码执行器应运行在独立的沙箱环境中,防止恶意代码对宿主机造成破坏。
- 密钥管理:数据库连接字符串、API Key 等敏感信息应存储在环境变量或专门的密钥管理服务中,避免硬编码在代码里。
- 监控告警:利用 Prometheus 和 Grafana 对 Celery Worker 和 Flask 服务进行监控,设置关键指标(如 CPU、内存、任务积压数)的告警阈值。
- 备份策略:定期备份 MySQL 数据库和 Redis 数据,确保在发生故障时能快速恢复。
总结
ezdata 项目通过整合传统数据处理技术与前沿的大模型能力,提供了一个轻量级、易扩展的数据分析与任务调度解决方案。它解决了多工具切换和脚本管理混乱的问题,并通过 RAG 和代码生成技术降低了数据分析的门槛。未来随着功能的完善,有望成为企业级数据智能化转型的有力工具。