Pathway 实时数据框架
Pathway 是一个用于流处理、实时分析、LLM 管道和 RAG 的 Python ETL 框架。它拥有易于使用的 Python API,能无缝集成各类 Python ML 库,代码可在开发和生产环境中使用,有效处理批量和流式数据。
项目信息
- 项目地址:https://github.com/pathwaycom/pathway
- 主要语言:Python
- Stars: 55.9k

核心功能
- 流处理与实时分析:适用于事件处理和实时分析管道,如实时 ETL、事件驱动的警报管道、实时分析等。
- LLM 管道与 RAG:提供专门的 LLM 工具,可构建实时 LLM 和 RAG 管道,包含常见 LLM 服务的包装器和实用工具。
- 数据连接:具备多种连接器,可连接 Kafka、GDrive、PostgreSQL、SharePoint 等外部数据源,通过 Airbyte 连接器能连接 300 多种不同数据源,也可自定义连接器。
- 数据转换:支持无状态和有状态的转换,如连接、窗口化和排序,许多转换直接在 Rust 中实现,也可使用任意 Python 函数。
- 持久化:提供持久化功能,可保存计算状态,便于在更新或崩溃后重启管道。
- 一致性处理:自动处理时间问题,确保计算结果的一致性,能处理延迟和乱序数据点,免费版提供'至少一次'一致性,企业版提供'恰好一次'一致性。
- LLM 辅助工具:提供 LLM 扩展,包含集成 LLM 与数据管道所需的实用工具,如 LLM 包装器、解析器、嵌入器、拆分器,以及内存中的实时向量索引,还集成了 LlamaIndex 和 LangChain。
优势
- 易用性:提供简单的 Python API,方便用户使用,可轻松集成喜欢的 Python ML 库。
- 通用性:同一代码可用于本地开发、CI/CD 测试、批量作业、流重放和数据流处理。
- 高性能:由 Rust 引擎驱动,支持多线程、多进程和分布式计算,能突破 Python 的限制。
- 可扩展性:适用于不同规模的项目,企业版支持分布式计算和 Kubernetes 部署。
- 监控方便:自带监控仪表盘,可跟踪每个连接器发送的消息数量和系统延迟,还包含日志消息。
安装与使用
使用 pip install -U pathway 进行安装,要求 Python 3.10 或更高版本,支持 MacOS 和 Linux,其他系统需在虚拟机上运行。
示例代码如下:
import pathway as pw
# Define the schema of your data (Optional)
class InputSchema(pw.Schema):
value: int
# Connect to your data using connectors
input_table = pw.io.csv.read(, schema=InputSchema)
filtered_table = input_table.(input_table.value >= )
result_table = filtered_table.reduce(sum_value=pw.reducers.(filtered_table.value))
pw.io.jsonlines.write(result_table, )
pw.run()


