前言
上篇文章我们大致演示了一下 ai_agent 的食用方法。这里我们做一下核心模块 runtime 的设计和实现。
一个 agent 也好,workflow 也好,它们单个实现起来并不复杂,困难的是如何将它们有机地组合起来,能够按照一定的逻辑流转起来。并且能够层层嵌套,能力无限。
理解起来比较抽象,我们先看几个重点方向,和现实中的例子:
1. 简单的 workflow 场景
我们可以使用一个简单的 workflow 来处理一些场景,比如奶茶店的智能推荐,那么它的一个流程大致如下:
graph LR
User --用户画像+query+上下文+其他--> prompt
prompt --> llm
llm --> answer
answer --> 奶茶卡片
- 这个流程就能看到,我们将 LLM、Memory、Prompt 都当做一个节点,按照图中的顺序执行最终会得到一个推荐结果。
- 可以说这是一个 workflow 的基础功能。
2. Smartflow 场景
对于有些问题,我们是不能够将完全预测出执行流程,比如 Least-to-Most 和 smartflow 的场景,这些节点往往是在执行过程中慢慢生成出来的。
我们还是举个例子,假设有这样一个 agent:评阅大师,它的目标是评阅优化输入文案。那么它的工作流程可能是这样的。
graph TD
user -->|文案 | pf[评分 LLM]
pf -->|评分>80| 输出
pf -->|评分<80 \n 待优化方向 | yhp[追加上下文]
yhp --> yh[优化 LLM]
yh --> pf
- 工作方式一目了然,可以理解为一个评分 LLM,一个优化 LLM。对于一个文案,评分 LLM 不断给出需要优化的点,并追加到上下文中,优化 LLM 则根据要求不断优化,直到评分>80。
- 我们的流程设计肯定不能是 DAG,因为出现环了。
3. Multi-agent 场景
multi-agent 常用来解决复杂问题,对流程设计的要求也更高。比较典型的场景是狼人杀。它的流程可能长这样。
graph TD
xdyt[入夜] --> zcr
zcr[主持人] -->|第一轮 | lrfy[狼人开刀]
lrfy --> zcr
zcr[主持人 agent] -->|第二轮 | nwfy[女巫药]
nwfy --> zcr
zcr[主持人 agent] -->|第三轮 | yyjfy[预言家查人]
yyjfy --> zcr
zcr[主持人 agent] -->|第四轮 | syrfy[所有人发言]
syrfy --> gp[归票]
gp --> zcr
zcr -->|某个阵容胜利 | 结束
- 上图画的游戏规则并不严谨,但大体能感受到一个
multi-agent 的工作方式。这里面的每个角色都可以理解为一个 agent,主持人可以是一个固定的脚本。
- 这个场景要求每个 agent 都有自己的 prompt,有独立的 memory 等,将一个 agent 放大,那么这个 agent 也应该是一个 workflow,由无数的节点拼接而来。也就要求我们流程设计能够层层嵌套,能力无限。
4. NL2Code 场景
保持开放是一个非常重要的能力,一来是能够让程序员直接写脚本。二是大模型有时候也会自己写脚本,也就是 NL2Code 场景。
我之前参与过一个专项,其中的一个能力是用户自由操作文档,比如文档归类,概要摘取等,结果通过 text2sql 存到数据库中。
这种场景下,仅仅提供有限功能的节点是不足够的。必须具备无限扩展的能力。
5. 开放域
在开放域中,agent 将具备更自由的意志。举几个典型的场景:聊天室 游戏 NPC 虚拟宠物。
在这种应用中,agent 更应该是一个完备的 AI,具备更长的生命周期,更强的主观能动性。
agent 从运行到结束,要像人一样,生下来就有意识,直到死亡。
简单的做法是先构建一个 single agent,并让它至少有个 memory+tool+设定这几个模块,最好是采用多模态大模型做基座模型。然后不断循环调用这个 agent,从而达到和我一样的牛马状态。当然成本肯定极高。
另一种方法是做双循环,还是这个 agent,一个循环是外界有输入后再调用 agent,另一个循环是定时唤起 agent 的主观意识。从而节能减排。
不管是哪种方式,我更倾向于从 agent 外部解决。而非 agent 本身。
总结
现实中应用肯定不局限于这几种情况,但通过一定的流程编辑基本都可以解决,只是复杂性会比较高。
Runtime 设计
一个 agent 的 runtime 大概可以看成是由这几部分构成:调度 + 节点服务 + 执行计划 + 上下文。
节点 Service
一个 service 可以理解成一个原子能力,比如 LLM,比如 tool,也可能是 function。
我们这里做一个抽象:
pub trait Service: Send + Sync {
async fn call(&self, flow: Flow) -> anyhow::Result<Output>;
}
执行计划
这个也很好理解,我们需要按照这个计划不断执行 service。
pub trait Plan: Send + Sync {
fn next(&self, ctx: Arc<Context>, node_id: &str) -> NextNodeResult;
fn set(&self, nodes: Vec<PlanNode>);
fn update(
&self,
node_code: &str,
update: Box<dyn FnOnce(Option<&mut PlanNode>) -> anyhow::Result<()>>,
) -> anyhow::Result<()>;
}
上下文 Context
这个所谓的上下文 Context 可以理解为 执行一个任务的具体消息,比如任务编码,堆栈信息,状态等。
- 这里有一个子上下文的概念,也就是说在一个
service 里面可以执行一个子计划,也就是能力无限的实现。并且父子上下文会共享堆栈。todo: 也会共享状态。
- 这里没有做共享设计,每个
Context 会记录自己归属的运行时。也就是每个运行时都是独立的,为了多租户设计的。
pub struct Context {
pub parent_code: Option<String>,
pub code: String,
pub status: AtomicU8,
pub stack: Arc<Mutex<ContextStack>>,
pub plan: Arc<dyn Plan>,
pub extend: Mutex<HashMap<String, Box<dyn Any + Send + Sync + 'static>>>,
pub over_callback: Option<Mutex<Vec<Box<dyn FnOnce(Arc<Context>) + Send + Sync + 'static>>>>,
pub(crate) runtime: Arc<Runtime>,
}
调度
调度也很简单,就是按照 plan 不断执行 service。
- 这里做了中间层设计,也就是每个 service 都是一个'洋葱',一层层进,一层层出。
- 每个 service 都是异步执行的,就是说如果你的
plan 存在并行结构,那么两个并行的分支会一起执行。
fn exec_next_node(ctx: Arc<Context>, node_code: &str) {
let nodes = ctx.plan.next(ctx.clone(), node_code);
...
for i in nodes {
match ctx.runtime.nodes.get(i.node_type_id.as_str()) {
...
};
let flow = Flow::new(i, ctx.clone(), middle);
tokio::spawn(async move {
let ctx = flow.ctx.clone();
let result = tokio::spawn(async move {
if let Err(e) = flow.call().await {
} else {
Runtime::exec_next_node(ctx, code.as_str());
}
})
.await;
});
}
}
Service Layer
为了方便 service 的实现,我们将 service 再包装一层。如下:
pub trait ServiceLayer: Sync + Send {
type Config;
type Output;
async fn call(
&self,
code: String,
ctx: Arc<Context>,
cfg: Self::Config,
) -> anyhow::Result<Self::Output>;
}
再为这个包装层加一个自动 json 解析的实现。这样入参和出参可以直接绑定到 struct。
impl<T, In, Out> Service for LayerJson<T, In, Out>
where
T: ServiceLayer<Config = In, Output = Out>,
In: for<'a> Deserialize<'a> + Send + Sync,
Out: Serialize + Send + Sync,
{
async fn call(&self, flow: Flow) -> anyhow::Result<Output> {
let cfg = match serde_json::from_str::<In>(node_config.as_str()) {
Ok(c) => c,
Err(e) => return Err(anyhow!("config parse error: {}", e)),
};
let output = self.handle.call(code, ctx, cfg).await?;
let raw = match serde_json::to_value(&output) {
Ok(o) => o,
Err(e) => return anyhow::anyhow!("code[{}],output json error:{}", node_info, e).into(),
};
Output::new(raw).raw_to_ctx().ok()
}
}
异常处理与恢复
在复杂的 Agent 运行环境中,异常处理至关重要。除了基本的 Result 传播外,还需要考虑以下策略:
- 重试机制:对于网络波动或临时性 LLM 超时,应配置指数退避重试。
- 熔断机制:当某个 Service 连续失败超过阈值,暂时停止该节点的调度,防止雪崩效应。
- 状态回滚:在执行关键业务逻辑前保存 Checkpoint,若后续步骤失败,可尝试回滚至上一安全状态。
pub enum RetryStrategy {
Fixed(u32),
ExponentialBackoff { base_ms: u64, max_times: u32 },
}
impl RetryStrategy {
pub async fn execute<F, Fut, T>(&self, func: F) -> anyhow::Result<T>
where
F: Fn() -> Fut,
Fut: Future<Output = anyhow::Result<T>>,
{
todo!()
}
}
性能考量
为了保证高并发下的稳定性,Runtime 设计还需关注以下性能点:
- 内存管理:使用
Arc 共享不可变数据,减少拷贝。注意 Mutex 的持有时间,避免长时间阻塞。
- 异步调度:利用
tokio 的多线程特性,将 IO 密集型任务(如 LLM 调用)与 CPU 密集型任务(如 Prompt 组装)分离。
- 资源隔离:不同租户或任务的 Context 应严格隔离,防止数据泄露或资源争抢。
测试
单元测试是保证 Runtime 稳定性的关键。我们需要验证 Plan 的执行顺序、Context 的状态流转以及 Service 的调用逻辑。
Mock Service 实现
首先定义一个用于测试的 Mock Service:
struct MockService;
#[async_trait]
impl Service for MockService {
async fn call(&self, flow: Flow) -> anyhow::Result<Output> {
println!("MockService executed with input: {:?}", flow.input);
Ok(Output::new("mock_result"))
}
}
执行流程测试
使用 tokio::test 编写集成测试,模拟一个简单的线性流程:
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_simple_flow() {
let runtime = Arc::new(Runtime::default());
let mut context = Context::new("test_flow_001", runtime.clone());
runtime.register_service("mock", Arc::new(MockService));
let plan = SimplePlan::new(vec!["mock".to_string()]);
context.set_plan(plan);
let result = Runtime::exec_next_node(context, "mock").await;
assert_eq!(context.status.load(Ordering::SeqCst), 2);
}
}
并发场景测试
验证并行分支的正确执行:
#[tokio::test]
async fn test_parallel_branches() {
let runtime = Arc::new(Runtime::default());
let context = Arc::new(Context::new("parallel_test", runtime.clone()));
let nodes = vec!["service_a", "service_b"];
}
结语
整体 runtime 模块的设计核心在于解耦与扩展性。通过将 Service、Plan 和 Context 抽象化,我们能够灵活应对从简单工作流到复杂多智能体协作的各种场景。未来的优化方向包括引入更细粒度的权限控制、支持动态热更新 Plan 以及增强对分布式环境的适配能力。
希望这篇关于 Agent 框架 Runtime 设计的文章能为你构建自己的 AI 系统提供有价值的参考。