跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
RustAI算法

从零写 Agent 框架:Workflow 与 Runtime 模块设计

综述由AI生成详细阐述了 AI Agent 框架中 Runtime 模块的核心设计与实现方案。内容涵盖 Workflow 与 Multi-agent 的典型场景分析,提出了基于 Service、Plan、Context 和 Scheduler 的四层架构模型。重点介绍了 Rust 语言下的 Trait 抽象、异步执行流程、JSON 序列化封装以及异常处理与性能优化策略。此外,补充了针对执行流程、并发场景的单元测试代码示例,旨在帮助开发者构建具备高扩展性与稳定性的 Agent 运行环境。

追风少年发布于 2025/2/7更新于 2026/6/320 浏览
从零写 Agent 框架:Workflow 与 Runtime 模块设计

前言

上篇文章我们大致演示了一下 ai_agent 的食用方法。这里我们做一下核心模块 runtime 的设计和实现。

一个 agent 也好,workflow 也好,它们单个实现起来并不复杂,困难的是如何将它们有机地组合起来,能够按照一定的逻辑流转起来。并且能够层层嵌套,能力无限。

理解起来比较抽象,我们先看几个重点方向,和现实中的例子:

1. 简单的 workflow 场景

我们可以使用一个简单的 workflow 来处理一些场景,比如奶茶店的智能推荐,那么它的一个流程大致如下:

graph LR
User --用户画像+query+上下文+其他--> prompt
prompt --> llm 
llm --> answer
answer --> 奶茶卡片
  • 这个流程就能看到,我们将 LLM、Memory、Prompt 都当做一个节点,按照图中的顺序执行最终会得到一个推荐结果。
  • 可以说这是一个 workflow 的基础功能。

2. Smartflow 场景

对于有些问题,我们是不能够将完全预测出执行流程,比如 Least-to-Most 和 smartflow 的场景,这些节点往往是在执行过程中慢慢生成出来的。

我们还是举个例子,假设有这样一个 agent:评阅大师,它的目标是评阅优化输入文案。那么它的工作流程可能是这样的。

graph TD
user -->|文案 | pf[评分 LLM]
pf -->|评分>80| 输出
pf -->|评分<80 \n 待优化方向 | yhp[追加上下文]
yhp --> yh[优化 LLM]
yh --> pf
  • 工作方式一目了然,可以理解为一个评分 LLM,一个优化 LLM。对于一个文案,评分 LLM 不断给出需要优化的点,并追加到上下文中,优化 LLM 则根据要求不断优化,直到评分>80。
  • 我们的流程设计肯定不能是 DAG,因为出现环了。

3. Multi-agent 场景

multi-agent 常用来解决复杂问题,对流程设计的要求也更高。比较典型的场景是狼人杀。它的流程可能长这样。

graph TD
xdyt[入夜] --> zcr
zcr[主持人] -->|第一轮 | lrfy[狼人开刀]
lrfy --> zcr
zcr[主持人 agent] -->|第二轮 | nwfy[女巫药]
nwfy --> zcr
zcr[主持人 agent] -->|第三轮 | yyjfy[预言家查人]
yyjfy --> zcr
zcr[主持人 agent] -->|第四轮 | syrfy[所有人发言]
syrfy --> gp[归票]
gp --> zcr
zcr -->|某个阵容胜利 | 结束
  • 上图画的游戏规则并不严谨,但大体能感受到一个 multi-agent 的工作方式。这里面的每个角色都可以理解为一个 agent,主持人可以是一个固定的脚本。
  • 这个场景要求每个 agent 都有自己的 prompt,有独立的 memory 等,将一个 agent 放大,那么这个 agent 也应该是一个 workflow,由无数的节点拼接而来。也就要求我们流程设计能够层层嵌套,能力无限。

4. NL2Code 场景

保持开放是一个非常重要的能力,一来是能够让程序员直接写脚本。二是大模型有时候也会自己写脚本,也就是 NL2Code 场景。

我之前参与过一个专项,其中的一个能力是用户自由操作文档,比如文档归类,概要摘取等,结果通过 text2sql 存到数据库中。

这种场景下,仅仅提供有限功能的节点是不足够的。必须具备无限扩展的能力。

5. 开放域

在开放域中,agent 将具备更自由的意志。举几个典型的场景:聊天室 游戏 NPC 虚拟宠物。

在这种应用中,agent 更应该是一个完备的 AI,具备更长的生命周期,更强的主观能动性。

agent 从运行到结束,要像人一样,生下来就有意识,直到死亡。

简单的做法是先构建一个 single agent,并让它至少有个 memory+tool+设定这几个模块,最好是采用多模态大模型做基座模型。然后不断循环调用这个 agent,从而达到和我一样的牛马状态。当然成本肯定极高。

另一种方法是做双循环,还是这个 agent,一个循环是外界有输入后再调用 agent,另一个循环是定时唤起 agent 的主观意识。从而节能减排。

不管是哪种方式,我更倾向于从 agent 外部解决。而非 agent 本身。

总结

现实中应用肯定不局限于这几种情况,但通过一定的流程编辑基本都可以解决,只是复杂性会比较高。

Runtime 设计

一个 agent 的 runtime 大概可以看成是由这几部分构成:调度 + 节点服务 + 执行计划 + 上下文。

节点 Service

一个 service 可以理解成一个原子能力,比如 LLM,比如 tool,也可能是 function。

我们这里做一个抽象:

pub trait Service: Send + Sync {
    async fn call(&self, flow: Flow) -> anyhow::Result<Output>;
}

执行计划

这个也很好理解,我们需要按照这个计划不断执行 service。

pub trait Plan: Send + Sync {
    fn next(&self, ctx: Arc<Context>, node_id: &str) -> NextNodeResult;
    fn set(&self, nodes: Vec<PlanNode>);
    fn update(
        &self,
        node_code: &str,
        update: Box<dyn FnOnce(Option<&mut PlanNode>) -> anyhow::Result<()>>,
    ) -> anyhow::Result<()>;
}

上下文 Context

这个所谓的上下文 Context 可以理解为 执行一个任务的具体消息,比如任务编码,堆栈信息,状态等。

  • 这里有一个子上下文的概念,也就是说在一个 service 里面可以执行一个子计划,也就是能力无限的实现。并且父子上下文会共享堆栈。todo: 也会共享状态。
  • 这里没有做共享设计,每个 Context 会记录自己归属的运行时。也就是每个运行时都是独立的,为了多租户设计的。
pub struct Context {
    pub parent_code: Option<String>,
    // 任务流名称
    pub code: String,
    // 状态
    pub status: AtomicU8, // 0:init 1:running, 2:success, 3:error
    // 堆栈信息
    pub stack: Arc<Mutex<ContextStack>>,
    // 执行计划
    pub plan: Arc<dyn Plan>,
    // 全局扩展字段
    pub extend: Mutex<HashMap<String, Box<dyn Any + Send + Sync + 'static>>>,
    // 结束时回调
    pub over_callback: Option<Mutex<Vec<Box<dyn FnOnce(Arc<Context>) + Send + Sync + 'static>>>>,
    pub(crate) runtime: Arc<Runtime>,
}

调度

调度也很简单,就是按照 plan 不断执行 service。

  • 这里做了中间层设计,也就是每个 service 都是一个'洋葱',一层层进,一层层出。
  • 每个 service 都是异步执行的,就是说如果你的 plan 存在并行结构,那么两个并行的分支会一起执行。
fn exec_next_node(ctx: Arc<Context>, node_code: &str) {
    let nodes = ctx.plan.next(ctx.clone(), node_code);
    ...
    for i in nodes {
        match ctx.runtime.nodes.get(i.node_type_id.as_str()) {
            ... // 获取服务实例
        };
        // 将 service 和 middle 封装成一个 flow 执行
        let flow = Flow::new(i, ctx.clone(), middle);
        tokio::spawn(async move {
            let ctx = flow.ctx.clone();
            let result = tokio::spawn(async move {
                // 处理一些堆栈信息
                // 执行 flow
                if let Err(e) = flow.call().await {
                    // 错误处理
                } else {
                    // 执行成功则继续执行下一个
                    Runtime::exec_next_node(ctx, code.as_str());
                }
            })
            .await;
            // 检查错误,和故障恢复
        });
    }
}

Service Layer

为了方便 service 的实现,我们将 service 再包装一层。如下:

pub trait ServiceLayer: Sync + Send {
    // service 的配置类型
    type Config;
    // 输出结果
    type Output;
    // 执行过程
    async fn call(
        &self,
        code: String,
        ctx: Arc<Context>,
        cfg: Self::Config,
    ) -> anyhow::Result<Self::Output>;
}

再为这个包装层加一个自动 json 解析的实现。这样入参和出参可以直接绑定到 struct。

impl<T, In, Out> Service for LayerJson<T, In, Out>
where
    T: ServiceLayer<Config = In, Output = Out>,
    In: for<'a> Deserialize<'a> + Send + Sync,
    Out: Serialize + Send + Sync,
{
    async fn call(&self, flow: Flow) -> anyhow::Result<Output> {
        // 类型解析和参数的初步处理
        // 解析入参
        let cfg = match serde_json::from_str::<In>(node_config.as_str()) {
            Ok(c) => c,
            Err(e) => return Err(anyhow!("config parse error: {}", e)),
        };
        // 调用执行过程
        let output = self.handle.call(code, ctx, cfg).await?;
        // 解析出参
        let raw = match serde_json::to_value(&output) {
            Ok(o) => o,
            Err(e) => return anyhow::anyhow!("code[{}],output json error:{}", node_info, e).into(),
        };
        Output::new(raw).raw_to_ctx().ok()
    }
}

异常处理与恢复

在复杂的 Agent 运行环境中,异常处理至关重要。除了基本的 Result 传播外,还需要考虑以下策略:

  1. 重试机制:对于网络波动或临时性 LLM 超时,应配置指数退避重试。
  2. 熔断机制:当某个 Service 连续失败超过阈值,暂时停止该节点的调度,防止雪崩效应。
  3. 状态回滚:在执行关键业务逻辑前保存 Checkpoint,若后续步骤失败,可尝试回滚至上一安全状态。
pub enum RetryStrategy {
    Fixed(u32),
    ExponentialBackoff { base_ms: u64, max_times: u32 },
}

impl RetryStrategy {
    pub async fn execute<F, Fut, T>(&self, func: F) -> anyhow::Result<T>
    where
        F: Fn() -> Fut,
        Fut: Future<Output = anyhow::Result<T>>,
    {
        // 实现重试逻辑
        todo!()
    }
}

性能考量

为了保证高并发下的稳定性,Runtime 设计还需关注以下性能点:

  • 内存管理:使用 Arc 共享不可变数据,减少拷贝。注意 Mutex 的持有时间,避免长时间阻塞。
  • 异步调度:利用 tokio 的多线程特性,将 IO 密集型任务(如 LLM 调用)与 CPU 密集型任务(如 Prompt 组装)分离。
  • 资源隔离:不同租户或任务的 Context 应严格隔离,防止数据泄露或资源争抢。

测试

单元测试是保证 Runtime 稳定性的关键。我们需要验证 Plan 的执行顺序、Context 的状态流转以及 Service 的调用逻辑。

Mock Service 实现

首先定义一个用于测试的 Mock Service:

struct MockService;

#[async_trait]
impl Service for MockService {
    async fn call(&self, flow: Flow) -> anyhow::Result<Output> {
        println!("MockService executed with input: {:?}", flow.input);
        Ok(Output::new("mock_result"))
    }
}

执行流程测试

使用 tokio::test 编写集成测试,模拟一个简单的线性流程:

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_simple_flow() {
        let runtime = Arc::new(Runtime::default());
        let mut context = Context::new("test_flow_001", runtime.clone());
        
        // 注册 Mock Service
        runtime.register_service("mock", Arc::new(MockService));
        
        // 设置执行计划
        let plan = SimplePlan::new(vec!["mock".to_string()]);
        context.set_plan(plan);
        
        // 启动执行
        let result = Runtime::exec_next_node(context, "mock").await;
        
        assert_eq!(context.status.load(Ordering::SeqCst), 2); // success
    }
}

并发场景测试

验证并行分支的正确执行:

#[tokio::test]
async fn test_parallel_branches() {
    let runtime = Arc::new(Runtime::default());
    let context = Arc::new(Context::new("parallel_test", runtime.clone()));
    
    // 模拟两个并行节点
    let nodes = vec!["service_a", "service_b"];
    
    // 实际测试中需确保两个任务同时完成且互不干扰
    // ... 省略具体实现细节
}

结语

整体 runtime 模块的设计核心在于解耦与扩展性。通过将 Service、Plan 和 Context 抽象化,我们能够灵活应对从简单工作流到复杂多智能体协作的各种场景。未来的优化方向包括引入更细粒度的权限控制、支持动态热更新 Plan 以及增强对分布式环境的适配能力。

希望这篇关于 Agent 框架 Runtime 设计的文章能为你构建自己的 AI 系统提供有价值的参考。

目录

  1. 前言
  2. 1. 简单的 workflow 场景
  3. 2. Smartflow 场景
  4. 3. Multi-agent 场景
  5. 4. NL2Code 场景
  6. 5. 开放域
  7. 总结
  8. Runtime 设计
  9. 节点 Service
  10. 执行计划
  11. 上下文 Context
  12. 调度
  13. Service Layer
  14. 异常处理与恢复
  15. 性能考量
  16. 测试
  17. Mock Service 实现
  18. 执行流程测试
  19. 并发场景测试
  20. 结语
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • Java 房屋租赁系统的设计与实现
  • Anything XL 教程:Streamlit 实时预览与生成进度可视化
  • nanobot 通过 webhook 对接钉钉和飞书实现跨平台消息同步
  • Python 全栈开发:FastAPI 高性能后端开发
  • Qwen3.5 开源发布:国产大模型性能与技术架构深度解读
  • AI 时代如何提前预见未来:技术人的前瞻性思维与职业策略
  • 网络安全入门教程:从基础理论到渗透测试实战
  • Web 自动化测试实战:常用函数全解析与场景化应用指南
  • 前端文件上传最佳实践:分片、断点续传及拖拽
  • 【花雕学编程】Arduino BLDC 之基于串口指令的远程控制工业巡检机器人
  • Windows 下安装与配置 ZeroClaw 本地机器人
  • 位运算实战:判断字符唯一性与查找丢失数字
  • 黑客技术零基础入门学习指南
  • 采摘机器人毕业设计:基于 ROS 2 与 STM32 的感知控制实现
  • PointWorld:面向野外机器人操作的 3D 世界模型规模化
  • 基于 Neo4j 与 py2neo 的知识图谱搭建实战
  • 联邦学习架构解析:多医院协作训练 AI 模型方案
  • Docker Compose 多实例 Tomcat 部署示例
  • 微信支付接入密码输入后转圈失败问题排查
  • 前端动画库对比与实战指南:GSAP / Lottie / Swiper / AOS

相关免费在线工具

  • 加密/解密文本

    使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online

  • RSA密钥对生成器

    生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online

  • Mermaid 预览与可视化编辑

    基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online

  • 随机西班牙地址生成器

    随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online

  • Gemini 图片去水印

    基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online