最近 LlamaIndex 推出了一项仍处于 beta 阶段的新特性:Workflows。它提供了一种事件驱动的编排方式,专门用来构建复杂的 RAG 与 Agent 工作流,和 LangChain 的 LangGraph(基于图的编排)思路很不一样。我第一时间上手玩了一下,感觉这种设计在处理多分支、循环以及需要精细控制中间过程的场景时,比之前的 Query Pipeline 灵活太多。
这篇文章是系列的上篇,主要聊聊 Workflows 的基本思想和简单用法。下篇会用更复杂的多分支与循环案例深入。
旧手段的局限
无论是 LangChain 还是 LlamaIndex,早期的做法无非两类:一是提供更高层的封装,比如 RouterRetriever、带重排参数的 QueryEngine;二是引入链式或 DAG 结构,如 LangChain 的 LCEL、LlamaIndex 的 Query Pipeline。这些方式应付简单串联没问题,但碰上需要循环或复杂条件分支的场景就很吃力。比如你希望 Agent 能在生成、反思、工具调用之间反复跳转,用链式表达几乎不可能;更别提调试时,黑盒组件里到底发生了什么,你很难插手。
因此,更强大的编排能力成了刚需,LangGraph 和 LlamaIndex Workflows 都是为了解决这个痛点而生。
Workflows 的核心:事件驱动,而非图调度
LlamaIndex Workflows 的设计核心是事件驱动。它把工作流中的每个处理环节定义为 step(一个 Python 函数),step 可以'订阅'特定的 event。当一个 event 被发出时,订阅了它的 step 就会自动执行。这种机制让组件之间的连接不再由框架预先定义死的边来控制,而是由 step 自己决定要接收什么、产出什么。
几个关键概念:
- Workflow:代表整个端到端流程,调用
run()启动。 - Step:工作流中的一个任务,用
@step装饰器标记,函数签名通过输入参数声明要接收的 event 类型。 - Event:Pydantic 对象,step 之间传递数据的载体。系统预定义了
StartEvent(起始)和StopEvent(结束)。 - Context:可选的全局状态,所有 step 都能读写,但多数数据流转直接通过 event 更清晰。
和 LangGraph 对比:LangGraph 显式定义节点(node)和边(edge),由编译器决定执行顺序;Workflows 则是隐式路由,step 之间的触发关系由 event 类型匹配自动完成。这带来的好处是,重写一个 step 不影响其他部分,分支逻辑可以分散在各 step 内部,不需要维护一张集中式图。
一个示例:带重排的 RAG 工作流
我们用 Workflows 实现一个最简单的 RAG 加 Rerank 流程,包含索引和生成两个阶段。流程图大致如下(实际运行时框架会动态调度):
- 索引阶段:加载文档并创建向量索引。
- 生成阶段:检索 → 重排 → 生成答案。
这两个阶段通过 StartEvent 是否携带 query 参数来分流。索引 step 没有 query 就直接执行并返回 StopEvent;检索 step 只有收到带 query 的 StartEvent 才干活。
代码直接看下面:
from llama_index.core.workflow import Event, Context, Workflow, StartEvent, StopEvent, step
from llama_index.core.schema import NodeWithScore
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.response_synthesizers CompactAndRefine
llama_index.core.postprocessor.llm_rerank LLMRerank
llama_index.llms.openai OpenAI
llama_index.embeddings.openai OpenAIEmbedding
():
nodes: [NodeWithScore]
():
nodes: [NodeWithScore]
():
() -> StopEvent | :
ev.get():
documents = SimpleDirectoryReader(input_files=[]).load_data()
index = VectorStoreIndex.from_documents(documents=documents)
()
StopEvent(result=index)
() -> RetrieverEvent | :
ev.get():
query = ev.get()
index = ev.get()
ctx.(, query)
retriever = index.as_retriever(similarity_top_k=)
nodes = retriever.aretrieve(query)
RetrieverEvent(nodes=nodes)
() -> RerankEvent:
ranker = LLMRerank(choice_batch_size=, top_n=, llm=OpenAI(model=))
new_nodes = ranker.postprocess_nodes(ev.nodes, query_str= ctx.get(, default=))
RerankEvent(nodes=new_nodes)
() -> StopEvent:
llm = OpenAI(model=)
summarizer = CompactAndRefine(llm=llm, streaming=, verbose=)
query = ctx.get(, default=)
response = summarizer.asynthesize(query, nodes=ev.nodes)
StopEvent(result=response)


