跳到主要内容LlamaIndex 工作流核心组件与实战指南 | 极客日志PythonAI算法
LlamaIndex 工作流核心组件与实战指南
LlamaIndex Workflows 是构建复杂大模型应用的轻量级事件驱动框架。它将应用拆解为独立步骤,通过事件通信,支持并行执行、状态共享及可视化调试。核心组件如 Event、Context 及 Step 装饰器,涵盖分支循环、流式处理、并发执行及嵌套工作流等高级用法。结合智能客服实战案例,展示如何利用检查点机制保障长流程稳定性,帮助开发者在灵活性与可控性之间找到平衡,高效编排多智能体协作系统。
DataScient12 浏览 背景与概述
构建大语言模型(LLM)应用时,我们常面临复杂场景:需多次调用模型、查询多数据源、执行分支逻辑或人工介入。传统顺序代码耦合度高、难调试、扩展性差。
LlamaIndex Workflows 是专为解决这些问题设计的轻量级事件驱动框架。2025 年 6 月发布的 1.0 正式版作为独立 Python/TypeScript 包,核心理念是将 AI 应用拆解为独立'步骤'(Step),通过'事件'(Event)通信,由框架调度执行。这种设计带来模块化、易测试、支持并行及内置可视化调试等优势。
LlamaIndex 工作流
核心组件
理解核心概念前,把工作流比作智能工厂:事件是流转的'物料',步骤是处理物料的'工位',上下文则是共享的'信息白板'。
定义工作流事件
事件是基本通信单元。所有事件继承自 Event 类,本质是 Pydantic 模型,可携带结构化数据。
from llama_index.core.workflow import Event
from typing import List, Optional
class MessageEvent(Event):
content: str
class AnalysisEvent(Event):
topic: str
keywords: List[str]
confidence: float
框架提供两个特殊内置事件:
StartEvent:工作流入口,调用 workflow.run() 时参数自动封装成它。
StopEvent:工作流出口,返回它时流程终止,并返回 result 字段。
from llama_index.core.workflow import StartEvent, StopEvent
设置工作流类
创建继承自 Workflow 的类,定义各个步骤。
from llama_index.core.workflow import Workflow, step
from llama_index.llms.openai import OpenAI
class ():
llm = OpenAI(model=)
MyWorkflow
Workflow
"gpt-4o-mini"
工作流入口点
使用 @step 装饰器标记的方法是'步骤'。框架根据方法的参数类型注解和返回值类型注解,自动判断该步骤接收什么事件、产出什么事件。
from llama_index.core.workflow import step, StartEvent, StopEvent
class JokeWorkflow(Workflow):
llm = OpenAI(model="gpt-4o-mini")
@step
async def generate_joke(self, ev: StartEvent) -> JokeEvent:
topic = ev.topic
prompt = f"讲一个关于{topic}的笑话"
response = await self.llm.acomplete(prompt)
return JokeEvent(joke=str(response))
@step
async def critique_joke(self, ev: JokeEvent) -> StopEvent:
prompt = f"分析这个笑话的笑点:{ev.joke}"
response = await self.llm.acomplete(prompt)
return StopEvent(result=str(response))
工作流程退出
当任何步骤返回 StopEvent 时,整个工作流立即停止,并返回 StopEvent.result 的值。这意味着工作流可以有多个潜在的出口点——你可以根据不同条件在不同步骤中决定终止流程。
class ConditionalWorkflow(Workflow):
@step
async def step_one(self, ev: StartEvent) -> StopEvent | NextEvent:
if ev.input == "exit":
return StopEvent(result="流程提前结束")
return NextEvent(data=ev.input)
绘制工作流程
LlamaIndex 提供可视化工具帮助理解结构。需先安装额外包:
pip install llama-index-utils-workflow
from llama_index.utils.workflow import (
draw_all_possible_flows,
draw_most_recent_execution,
)
draw_all_possible_flows(JokeWorkflow, filename="workflow_structure.html")
workflow = JokeWorkflow()
await workflow.run(topic="程序员")
draw_most_recent_execution(workflow, filename="recent_execution.html")
动态执行图特别有用——你可以清楚看到哪些分支被实际执行了,哪一步耗时最长。
全局上下文/状态协同
当工作流变复杂时,单纯靠事件传递数据繁琐。例如步骤 A 和 D 都需要访问用户原始问题,若只通过事件传递,中间步骤都得负责转发。
Context 对象就是解决这个问题。它像全局'数据黑板',任何步骤可随时读取或写入数据。
from llama_index.core.workflow import Context
class StatefulWorkflow(Workflow):
@step
async def first_step(self, ctx: Context, ev: StartEvent) -> NextEvent:
await ctx.set("user_query", ev.query)
await ctx.set("start_time", datetime.now())
return NextEvent()
@step
async def second_step(self, ctx: Context, ev: NextEvent) -> StopEvent:
query = await ctx.get("user_query")
start_time = await ctx.get("start_time")
print(f"处理查询:{query}, 耗时:{datetime.now() - start_time}")
return StopEvent(result="完成")
注意:ctx.get() 和 ctx.set() 是异步方法,需要使用 await。
多事件等待
有时一个步骤需等待多个事件全部到达才能执行。例如同时获取'用户画像'和'商品推荐'结果再生成推送。Context 提供 collect_events() 方法。
class UserProfileEvent(Event):
profile: str
class RecommendationEvent(Event):
items: List[str]
class AggregationWorkflow(Workflow):
@step
async def fetch_profile(self, ctx: Context, ev: StartEvent) -> UserProfileEvent:
await asyncio.sleep(1)
return UserProfileEvent(profile="科技爱好者")
@step
async def fetch_recommendations(self, ctx: Context, ev: StartEvent) -> RecommendationEvent:
await asyncio.sleep(1)
return RecommendationEvent(items=["GPU", "机械键盘"])
@step
async def aggregate(self, ctx: Context, ev: StartEvent) -> StopEvent:
events = await ctx.collect_events(
ev, [UserProfileEvent, RecommendationEvent]
)
if events is None:
return None
profile_event, rec_event = events
result = f"为用户 {profile_event.profile} 推荐 {rec_event.items}"
return StopEvent(result=result)
手动事件触发
默认情况下,工作流自动将步骤返回值作为事件发送。若想在同一步骤发送多个事件或动态决定时机,可使用 ctx.send_event()。
class FanOutWorkflow(Workflow):
@step
async def broadcast(self, ctx: Context, ev: StartEvent) -> None:
for i in range(5):
ctx.send_event(ProcessEvent(index=i))
ctx.send_event(CompletionEvent())
@step
async def process(self, ctx: Context, ev: ProcessEvent) -> None:
print(f"处理任务 {ev.index}")
@step
async def on_complete(self, ctx: Context, ev: CompletionEvent) -> StopEvent:
return StopEvent(result="所有任务完成")
人机协同
人机协同(Human-in-the-loop)是常见需求。当模型不确定时,可暂停等待人工介入。通过 stepwise=True 模式或配合服务端框架实现。
handler = workflow.run(stepwise=True)
async for event in handler.stream_events():
if isinstance(event, HumanInputRequiredEvent):
user_input = await get_user_input(event.question)
handler.ctx.send_event(HumanResponseEvent(response=user_input))
await handler.run_step()
result = await handler
TypeScript 版本还可集成到 Hono/Express 等框架中,通过 API 端点实现交互。
逐步执行
逐步执行是调试利器。开启 stepwise 模式后,工作流每执行一个步骤就暂停,等待手动推进。
async def debug_workflow():
workflow = MyComplexWorkflow()
handler = workflow.run(stepwise=True)
events = await handler.run_step()
print(f"第一步产出的事件:{events}")
events = await handler.run_step()
print(f"第二步产出的事件:{events}")
final_result = await handler
print(f"最终结果:{final_result}")
检查点工作流程
对于长时间运行的工作流,崩溃不应从头开始。检查点(Checkpoint)机制允许保存完整状态并在以后恢复。
class CheckpointWorkflow(Workflow):
@step
async def critical_step(self, ctx: Context, ev: StartEvent) -> NextEvent:
result = await self.do_something()
await self.save_checkpoint(ctx, "after_critical_step")
return NextEvent(data=result)
workflow = CheckpointWorkflow()
handler = workflow.run()
try:
result = await handler
except Exception as e:
last_checkpoint = workflow.get_last_checkpoint()
new_handler = workflow.run_from(checkpoint=last_checkpoint)
result = await new_handler
部署工作流
工作流可轻松部署为 Web 服务。TypeScript 版本提供与 Hono 框架集成:
import { Hono } from "hono";
import { createHonoHandler } from "@llamaindex/workflow-core/interrupter/hono";
const app = new Hono();
app.post("/api/run", createHonoHandler(
myWorkflow,
async (ctx) => startEvent(await ctx.req.json()),
stopEvent
));
serve(app);
Python 版本也可使用 FastAPI 等框架封装。
Workflow 管道
基本工作流
最简单的线性工作流:StartEvent → Step1 → Step2 → StopEvent。
from llama_index.core.workflow import Workflow, step, StartEvent, StopEvent, Event
class Step1Event(Event):
data: str
class LinearWorkflow(Workflow):
@step
async def step_one(self, ev: StartEvent) -> Step1Event:
print(f"步骤 1 处理:{ev.input}")
return Step1Event(data=ev.input.upper())
@step
async def step_two(self, ev: Step1Event) -> StopEvent:
print(f"步骤 2 处理:{ev.data}")
return StopEvent(result=f"最终结果:{ev.data}")
async def main():
w = LinearWorkflow()
result = await w.run(input="hello world")
print(result)
工作流分支与循环
支持条件分支和循环,只需让步骤返回不同类型的事件。
class RouteEvent(Event):
value: int
class BranchAEvent(Event):
result: str
class BranchBEvent(Event):
result: str
class LoopEvent(Event):
counter: int
class BranchLoopWorkflow(Workflow):
@step
async def router(self, ev: StartEvent) -> RouteEvent:
return RouteEvent(value=ev.number)
@step
async def handle_positive(self, ev: RouteEvent) -> BranchAEvent | LoopEvent:
if ev.value > 0:
return BranchAEvent(result=f"正数:{ev.value}")
elif ev.value < 0:
return BranchBEvent(result=f"负数:{ev.value}")
else:
return LoopEvent(counter=1)
@step
async def retry_zero(self, ev: LoopEvent) -> StartEvent:
if ev.counter < 3:
print(f"遇到 0,第{ev.counter}次重试...")
return StartEvent(number=1)
return BranchAEvent(result="重试次数用尽,强制视为正数")
@step
async def finalize(self, ev: BranchAEvent | BranchBEvent) -> StopEvent:
return StopEvent(result=ev.result)
状态维护
Context 是维护状态的首选方式。它是可序列化的——可将工作流状态保存到磁盘或 Redis 后再恢复。
class StateMaintenanceWorkflow(Workflow):
@step
async def accumulate(self, ctx: Context, ev: StartEvent) -> StopEvent:
history = await ctx.get("history", default=[])
history.append(ev.message)
await ctx.set("history", history)
if len(history) >= 5:
return StopEvent(result=f"收集完毕:{history}")
return None
流媒体事件
LLM 生成长耗时操作,实时反馈进度能提升体验。ctx.write_event_to_stream() 允许向事件流写入中间进度,主流程异步监听。
class ProgressEvent(Event):
msg: str
class StreamingWorkflow(Workflow):
llm = OpenAI(model="gpt-4o-mini")
@step
async def generate(self, ctx: Context, ev: StartEvent) -> StopEvent:
ctx.write_event_to_stream(ProgressEvent(msg="开始生成..."))
full_response = ""
async for chunk in self.llm.astream_complete(ev.prompt):
full_response += chunk.delta
ctx.write_event_to_stream(ProgressEvent(msg=chunk.delta))
ctx.write_event_to_stream(ProgressEvent(msg="生成完成!"))
return StopEvent(result=full_response)
async def main():
w = StreamingWorkflow()
handler = w.run(prompt="写一首关于 AI 的诗")
async for ev in handler.stream_events():
if isinstance(ev, ProgressEvent):
print(f"进度:{ev.msg}", flush=True)
final = await handler
print(f"\n\n最终结果:\n{final}")
并发执行
工作流可轻松实现并发。当同一事件有多个步骤'感兴趣'时,它们会被并发执行。也可在单个步骤中手动发送多个事件触发并行处理。
class ParallelWorkflow(Workflow):
@step
async def kickoff(self, ev: StartEvent) -> ProcessEvent:
for i in range(3):
self.send_event(ProcessEvent(task_id=i, data=ev.data))
@step
async def worker(self, ev: ProcessEvent) -> ResultEvent:
await asyncio.sleep(1)
return ResultEvent(task_id=ev.task_id, result=f"任务{ev.task_id}完成")
@step
async def collector(self, ctx: Context, ev: ResultEvent) -> StopEvent | None:
results = await ctx.get("results", default=[])
results.append(ev.result)
await ctx.set("results", results)
if len(results) == 3:
return StopEvent(result=f"全部完成:{results}")
return None
子类化工作流
通过子类化已有工作流扩展行为,实现代码复用和定制化。
class BaseWorkflow(Workflow):
@step
async def common_step(self, ev: StartEvent) -> IntermediateEvent:
return IntermediateEvent(data=ev.input.strip())
@step
async def final_step(self, ev: IntermediateEvent) -> StopEvent:
return StopEvent(result=ev.data)
class CustomWorkflow(BaseWorkflow):
@step
async def final_step(self, ev: IntermediateEvent) -> StopEvent:
return StopEvent(result=f"自定义结果:{ev.data.upper()}")
嵌套工作流
工作流可嵌套调用——一个工作流的步骤中运行另一个工作流。构建模块化、可组合的系统。
class ReflectionWorkflow(Workflow):
@step
async def reflect(self, ev: StartEvent) -> StopEvent:
improved = f"改进后的查询:{ev.query} (经过反思优化)"
return StopEvent(result=improved)
class MainWorkflow(Workflow):
@step
async def start(self, ctx: Context, ev: StartEvent, reflection_wf: Workflow) -> ProcessEvent:
improved = await reflection_wf.run(query=ev.query)
return ProcessEvent(query=improved)
@step
async def process(self, ev: ProcessEvent) -> StopEvent:
return StopEvent(result=f"最终处理:{ev.query}")
main = MainWorkflow()
main.add_workflows(reflection_wf=ReflectionWorkflow())
result = await main.run(query="初始查询")
print(result)
可视化工作流
除了生成静态图片,工具还可输出 HTML 交互式图表,适合在 Jupyter Notebook 中展示调试。
from IPython.display import HTML
draw_all_possible_flows(MyWorkflow, filename="temp.html")
with open("temp.html", "r") as f:
display(HTML(f.read()))
实战练习
选择题
-
在 LlamaIndex Workflow 中,哪个事件是工作流的默认出口点?
A. StartEvent B. StopEvent C. Context D. ProgressEvent
答案:B
-
以下哪个方法用于在工作流步骤之间共享全局数据?
A. send_event() B. collect_events() C. ctx.set() / ctx.get() D. write_event_to_stream()
答案:C
-
想要实时获取 LLM 生成过程中的中间结果,应该使用什么机制?
A. 检查点 (Checkpoint) B. 流式事件 (Streaming Events) C. 嵌套工作流 D. 手动事件触发
答案:B
填空题
-
使用 @step 装饰器标记的方法,其返回值的类型注解决定了该步骤产出什么 ''。
答案:事件 (Event)
-
要从检查点恢复工作流执行,应使用 ''方法。
答案:run_from()
简答题
-
简述 Context 对象和事件传递在数据共享上的区别,以及各自的适用场景。
参考答案:
- 事件传递适用于步骤间的'一次性的、需要触发下游逻辑'的数据传递,是工作流的驱动机制。
Context 适用于'多个步骤需要访问的、全局性的'数据,如配置参数、数据库连接、用户会话信息等。Context 避免了数据在长链条中逐层传递的繁琐,使代码更清晰。
-
什么情况下需要使用嵌套工作流而不是简单的步骤组合?
参考答案:
- 某个子功能逻辑复杂,本身就是一个完整的工作流,希望独立开发和测试
- 需要在运行时动态替换子流程的实现(多态)
- 子工作流可能被多个不同的父工作流复用
- 希望保持主工作流的简洁性,将细节封装在子工作流中
实操题
-
实现一个'智能客服'工作流,要求:
- 接收用户问题
- 第一步:判断问题类型(售后/售前/投诉)
- 第二步:根据类型调用不同的处理流程
- 第三步:生成最终回复
- 要求使用
Context 在整个流程中记录日志
参考答案:
from llama_index.core.workflow import Workflow, step, StartEvent, StopEvent, Event, Context
from enum import Enum
from datetime import datetime
class QuestionType(Enum):
AFTER_SALES = "售后"
PRE_SALES = "售前"
COMPLAINT = "投诉"
class ClassifyEvent(Event):
qtype: QuestionType
question: str
class AfterSalesEvent(Event):
question: str
class PreSalesEvent(Event):
question: str
class ComplaintEvent(Event):
question: str
class CustomerServiceWorkflow(Workflow):
@step
async def classify(self, ctx: Context, ev: StartEvent) -> ClassifyEvent:
logs = await ctx.get("logs", default=[])
logs.append(f"[{datetime.now()}] 收到问题:{ev.question}")
await ctx.set("logs", logs)
question = ev.question.lower()
if "退货" in question or "维修" in question:
qtype = QuestionType.AFTER_SALES
elif "多少钱" in question or "价格" in question:
qtype = QuestionType.PRE_SALES
else:
qtype = QuestionType.COMPLAINT
return ClassifyEvent(qtype=qtype, question=ev.question)
@step
async def handle_after_sales(self, ctx: Context, ev: ClassifyEvent) -> AfterSalesEvent:
if ev.qtype != QuestionType.AFTER_SALES:
return None
logs = await ctx.get("logs")
logs.append(f"[{datetime.now()}] 进入售后流程")
await ctx.set("logs", logs)
return AfterSalesEvent(question=ev.question)
@step
async def handle_pre_sales(self, ctx: Context, ev: ClassifyEvent) -> PreSalesEvent:
if ev.qtype != QuestionType.PRE_SALES:
return None
logs = await ctx.get("logs")
logs.append(f"[{datetime.now()}] 进入售前流程")
await ctx.set("logs", logs)
return PreSalesEvent(question=ev.question)
@step
async def handle_complaint(self, ctx: Context, ev: ClassifyEvent) -> ComplaintEvent:
if ev.qtype != QuestionType.COMPLAINT:
return None
logs = await ctx.get("logs")
logs.append(f"[{datetime.now()}] 进入投诉流程(升级处理)")
await ctx.set("logs", logs)
return ComplaintEvent(question=ev.question)
@step
async def generate_response(self, ctx: Context, ev: AfterSalesEvent | PreSalesEvent | ComplaintEvent) -> StopEvent:
if isinstance(ev, AfterSalesEvent):
response = f"【售后】关于「{ev.question}」,请提供订单号,我们将为您安排退货/维修。"
elif isinstance(ev, PreSalesEvent):
response = f"【售前】关于「{ev.question}」,我们的产品售价为 299 元,当前有优惠活动。"
else:
response = f"【投诉】非常抱歉给您带来不便,关于「{ev.question}」,已转接人工客服,请稍候。"
logs = await ctx.get("logs")
logs.append(f"[{datetime.now()}] 生成回复:{response}")
await ctx.set("logs", logs)
print("\n=== 处理日志 ===")
for log in logs:
print(log)
print("===============\n")
return StopEvent(result=response)
async def test():
w = CustomerServiceWorkflow()
result = await w.run(question="我的产品坏了,想退货")
print(f"最终回复:{result}")
总结
LlamaIndex Workflows 提供了优雅的框架来编排复杂的 AI 应用。通过事件驱动的架构,我们可以将业务逻辑拆解为独立、可测试的步骤,并通过 Context 实现跨步骤的状态共享。可视化工具和检查点机制则为调试和生产环境下的稳定性提供了有力保障。
从简单的线性 RAG 流水线,到复杂的多智能体协作系统,再到需要人工介入的审批流程,Workflows 都能胜任。掌握这一工具,意味着你在构建 LLM 应用时,不再需要在'灵活性'和'可控性'之间做取舍——两者可以兼得。
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- 随机西班牙地址生成器
随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online