AI Agent 框架:工作流与 Agent 运行时模块设计
探讨了 AI Agent 框架中运行时模块的核心设计。通过定义服务接口、执行计划、上下文管理及调度机制,实现了支持简单工作流、智能流程及多智能体协作的灵活架构。重点阐述了基于 Rust 语言的异步并发实现,包括线程安全的数据共享、状态流转控制以及错误恢复策略。该设计旨在提供无限嵌套的能力,满足开放域场景下的复杂业务需求,同时确保系统的高可用性与可扩展性。

探讨了 AI Agent 框架中运行时模块的核心设计。通过定义服务接口、执行计划、上下文管理及调度机制,实现了支持简单工作流、智能流程及多智能体协作的灵活架构。重点阐述了基于 Rust 语言的异步并发实现,包括线程安全的数据共享、状态流转控制以及错误恢复策略。该设计旨在提供无限嵌套的能力,满足开放域场景下的复杂业务需求,同时确保系统的高可用性与可扩展性。

在上一篇文章中,我们大致演示了 ai_agent 的基本使用方法。本文重点深入核心模块 runtime(运行时)的设计与实现。
无论是单个 Agent 还是 Workflow,其基础功能的实现相对简单。真正的挑战在于如何将它们有机地组合起来,使其按照特定的逻辑流转,并支持层层嵌套,从而实现无限的能力扩展。
为了更直观地理解这一概念,我们先分析几个典型的应用场景及其对应的流程设计需求。
我们可以使用一个简单的 Workflow 来处理特定场景,例如奶茶店的智能推荐系统。其基本流程如下:
graph LR
User --用户画像+query+上下文--> Prompt
Prompt --> LLM
LLM --> Answer
Answer --> Card
在这个流程中,我们将 LLM、Memory、Prompt 等视为独立的节点,按照图中定义的顺序依次执行,最终得到推荐结果。这是 Workflow 的基础功能体现,适用于线性任务。
对于某些复杂问题,我们无法预先完全预测执行流程。例如 Least-to-Most 和 Smartflow 场景,这些节点往往是在执行过程中动态生成的。
以'评阅大师'Agent 为例,目标是优化输入文案。其工作流程可能包含一个评分 LLM 和一个优化 LLM。评分 LLM 不断给出需要优化的点并追加到上下文中,优化 LLM 根据要求不断优化,直到评分达到阈值(如>80)。
graph TD
User -->|文案| PF[评分 LLM]
PF -->|评分>80| Output
PF -->|评分<80\n待优化方向| YHP[追加上下文]
YHP --> YH[优化 LLM]
YH --> PF
这种设计表明,我们的流程不能仅仅是有向无环图(DAG),因为可能存在循环依赖。运行时必须支持动态节点生成和状态回溯。
Multi-Agent 常用于解决复杂问题,对流程设计的要求更高。典型的场景是狼人杀游戏。流程可能涉及多个角色交互:
graph TD
Start[入夜] --> Host[主持人]
Host -->|第一轮| Wolf[狼人开刀]
Wolf --> Host
Host -->|第二轮| Witch[女巫药]
Witch --> Host
Host -->|第三轮| Seer[预言家查人]
Seer --> Host
Host -->|第四轮| Talk[所有人发言]
Talk --> Vote[归票]
Vote --> Host
Host --> End[结束]
虽然上图游戏规则并不严谨,但能感受到 Multi-Agent 的工作方式。每个角色都是一个独立的 Agent,拥有自己的 Prompt 和 Memory。主持人可以是固定的脚本或另一个 Agent。这个场景要求流程设计能够层层嵌套,将一个 Agent 放大后,它本身也应该是一个由无数节点拼接而来的 Workflow。
保持开放是至关重要的能力。一方面允许程序员直接编写脚本,另一方面大模型也能自行生成代码(NL2Code)。
例如在文档处理场景中,用户自由操作文档(归类、摘取),结果通过 text2sql 存入数据库。仅仅提供有限功能的节点是不够的,必须具备无限扩展的能力,允许运行时调用外部工具或解释器。
在开放域中,Agent 将具备更自由的意志。典型场景包括聊天室、游戏 NPC、虚拟宠物等。
在这种应用中,Agent 应被视为一个完备的 AI,具备更长的生命周期和更强的主观能动性。从运行到结束,要像人一样,生下来就有意识,直到死亡。
简单的做法是先构建一个 Single Agent,赋予其 Memory、Tool 和设定模块,采用多模态大模型做基座,然后不断循环调用。另一种方法是双循环:一个循环响应外界输入,另一个循环定时唤起 Agent 的主观意识,从而节能减排。
一个 Agent 的 Runtime 可以看作由调度、节点服务、执行计划和上下文四部分组成。
Service 可以理解为一个原子能力,比如 LLM 调用、工具函数或自定义 Function。我们需要对其进行抽象,确保线程安全和异步执行。
pub trait Service: Send + Sync {
/// 执行服务调用
async fn call(&self, flow: Flow) -> anyhow::Result<Output>;
}
这里的关键约束是 Send + Sync,这意味着 Service 可以在多线程环境中安全共享和传递。Flow 包含了当前执行的上下文信息,Output 是标准化的返回结果。
执行计划定义了 Service 的执行顺序和逻辑分支。我们需要按照计划不断执行 Service,并支持动态更新。
pub trait Plan: Send + Sync {
/// 获取下一个节点
fn next(&self, ctx: Arc<Context>, node_id: &str) -> NextNodeResult;
/// 设置节点列表
fn set(&self, nodes: Vec<PlanNode>);
/// 动态更新节点配置
fn update(
&self,
node_code: &str,
update: Box<dyn FnOnce(Option<&mut PlanNode>) -> anyhow::Result<()>>,
) -> anyhow::Result<()>;
}
next 方法负责根据当前上下文决定下一步走向,支持条件分支和循环。update 方法允许在运行时修改计划,这对于 Smartflow 场景至关重要。
Context 理解为执行一个任务的具体消息,包括任务编码、堆栈信息、状态等。它支持父子上下文的概念,即在一个 Service 内部可以执行子计划,父子上下文共享堆栈和部分状态。
pub struct Context {
/// 父任务编码,用于追踪调用链
pub parent_code: Option<String>,
/// 任务流名称
pub code: String,
/// 状态:0:init 1:running 2:success 3:error
pub status: AtomicU8,
/// 堆栈信息,保护并发访问
pub stack: Arc<Mutex<ContextStack>>,
/// 执行计划
pub plan: Arc<dyn Plan>,
/// 全局扩展字段,用于存储临时数据
pub extend: Mutex<HashMap<String, Box<dyn Any + Send + Sync + 'static>>>,
/// 结束时回调
pub over_callback: Option<Mutex<Vec<Box<dyn FnOnce(Arc<Context>) + Send + Sync + 'static>>>>,
/// 关联的运行时实例
pub(crate) runtime: Arc<Runtime>,
}
每个 Context 记录自己归属的运行时,确保多租户环境下的隔离性。AtomicU8 保证了状态变更的原子性,避免竞态条件。
调度器按照 Plan 不断执行 Service。这里做了中间层设计,每个 Service 都是'洋葱'结构,一层层进,一层层出。所有 Service 都是异步执行的,如果 Plan 存在并行结构,两个分支会同时执行。
fn exec_next_node(ctx: Arc<Context>, node_code: &str) {
let nodes = ctx.plan.next(ctx.clone(), node_code);
for i in nodes {
match ctx.runtime.nodes.get(i.node_type_id.as_str()) {
Some(service) => {
// 将 service 和 middle 封装成一个 flow 执行
let flow = Flow::new(i, ctx.clone(), middle);
tokio::spawn(async move {
let result = flow.call().await;
match result {
Ok(_) => {
// 执行成功则继续执行下一个
Runtime::exec_next_node(ctx, code.as_str());
}
Err(e) => {
// 错误处理与恢复
handle_error(ctx, e);
}
}
});
}
None => {
log::error!("Unknown node type: {}", i.node_type_id);
}
}
}
}
为了方便 Service 的实现,我们将 Service 再包装一层,提供自动 JSON 解析功能。这样入参和出参可以直接绑定到 Struct,减少样板代码。
pub trait ServiceLayer: Sync + Send {
type Config;
type Output;
async fn call(
&self,
code: String,
ctx: Arc<Context>,
cfg: Self::Config,
) -> anyhow::Result<Self::Output>;
}
impl<T, In, Out> Service for LayerJson<T, In, Out>
where
T: ServiceLayer<Config = In, Output = Out>,
In: for<'a> Deserialize<'a> + Send + Sync,
Out: Serialize + Send + Sync,
{
async fn call(&self, flow: Flow) -> anyhow::Result<Output> {
// 解析入参
let cfg = serde_json::from_str::<In>(flow.config.as_str())?;
// 调用执行过程
let output = self.handle.call(flow.id, flow.ctx, cfg).await?;
// 解析出参
let raw = serde_json::to_value(&output)?;
Ok(Output::new(raw))
}
}
在 Rust 实现的 Runtime 中,并发安全是核心考量。通过使用 Arc 和 Mutex,我们确保了上下文在多线程间的共享访问安全。
Arc 包裹 Context 和 Plan,允许多个协程持有引用而不增加拷贝开销。ContextStack 使用 Mutex 保护,但在高并发场景下,可考虑细粒度锁或无锁数据结构来减少竞争。anyhow::Result 统一错误类型,便于在异步任务间传递错误信息并进行统一处理。为了支持未来的业务扩展,Runtime 设计了以下机制:
update 方法支持在不重启服务的情况下调整流程逻辑。本设计通过模块化拆分,实现了灵活且强大的 AI Agent 运行时。支持从简单线性流程到复杂多智能体协作的各种场景。基于 Rust 的高性能特性,该方案在保证并发安全的同时,提供了良好的扩展性和可维护性,为构建企业级 AI 应用奠定了坚实基础。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online