跳到主要内容Agent 框架开发(三):实现 LLM、Tool 及服务节点 | 极客日志RustAI算法
Agent 框架开发(三):实现 LLM、Tool 及服务节点
综述由AI生成Agent 框架的核心 Service 实现,涵盖 LLM 节点对接、Tool 抽象设计(支持 HTTP、Rust 函数及 Python 脚本)、以及流程控制节点(分支、注入、变量)。通过 ServiceLayer 接口统一执行入口,利用 CfgBound 机制处理变量引用与上下文传递。文章展示了如何注册服务层并启动 gRPC 服务,旨在构建模块化、可扩展的 AI Agent 运行时架构。
flc9 浏览 Agent 框架开发(三):实现 LLM、Tool 及服务节点
前言
在上一篇文章中,我们实现了一个基本的运行时,能够将 Service 按照 Plan 执行起来。本文我们将尝试实现一些基本节点,最终运行一个最简单的 Agent。
1. OpenAI-LLM
这里我们接入 OpenAI 的 LLM 能力。
首先定义模型的基本配置,包含 prompt、temperature、tools、context 等参数。
pub struct LLMNodeRequest {
#[serde(default = "String::default")]
pub prompt: String,
#[serde(default = "LLMNodeRequest::default_model_35")]
pub model: String,
#[serde(default = "Vec::default")]
pub tools: Vec<ChatCompletionTool>,
#[serde(default = "Vec::default")]
pub context: Vec<LLMContextMessage>,
#[serde(default = "LLMNodeRequest::max_tokens_length")]
pub max_tokens: u16,
#[serde(default = "LLMNodeRequest::default_temperature")]
pub temperature: f32,
#[serde(default = "bool::default")]
pub is_stream: bool,
pub query: String,
}
然后实现 Service,使用之前包装好的解析层:
impl agent_rt::ServiceLayer for OpenaiLLMService {
type Config = CfgBound<LLMNodeRequest>;
= LLMNodeResponse;
(
&,
_code: ,
ctx: Arc<Context>,
cfg: ::Config,
) anyhow::<::Output> {
= cfg.(&ctx)?;
= cfg.()?;
= .openai_client.().(req).?;
= LLMNodeResponse::();
(msg) = stream.(). {
= msg {
(o) => o,
(e) => (anyhow::Error::(e)),
};
msg.choices {
(s) = i.delta.content {
(s) = ::(&ctx, s) {
resp.(s.());
}
}
(tools) = i.delta.tool_calls {
resp.(tools);
}
}
}
(resp)
}
}
type
Output
async
fn
call
self
String
Self
->
Result
Self
let
cfg
bound
let
req
to_openai_chat_request
let
mut
stream
self
chat
create_stream
await
let
mut
resp
default
while
let
Some
next
await
let
msg
match
Ok
Err
return
Err
from
for
i
in
if
let
Some
if
let
Some
Self
try_send_to_channel
append_answer
as_str
if
let
Some
append_tools
Ok
2. Tool
我们采用通用设计,不具体实现某个 Tool,而是进行抽象。
pub trait ToolEvent: Send {
async fn call(&self, name: &str, args: String) -> anyhow::Result<String>;
}
pub struct ToolService {
loader: Box<dyn ToolEvent + Sync + 'static>,
}
实现 ServiceLayer,直接将事件调用出去:
impl ServiceLayer for ToolService {
type Config = CfgBound<LLMToolCallRequest>;
type Output = LLMToolCallResponse;
async fn call(
&self,
code: String,
ctx: Arc<Context>,
cfg: Self::Config,
) -> anyhow::Result<Self::Output> {
let cfg = cfg.bound(&ctx)?;
let LLMToolCallRequest { call_id, name, args } = cfg;
wd_log::log_debug_ln!("code[{}] exec tool[{}] args:{:?}", code, name, args);
let content = self.loader.call(name.as_str(), args).await?;
let resp = LLMToolCallResponse { call_id, content };
wd_log::log_debug_ln!("code[{}] exec tool[{}] result[{:?}]", code, name, resp);
Ok(resp)
}
}
具体的实现可能是本地 Rust 函数、API 接口或其他脚本,这里用枚举定义:
pub enum Tool {
Http(ToolHttp),
Python(ToolPython),
Custom(Arc<dyn ToolFunction + Sync + 'static>),
}
2.1 HTTP 实现
虽然大模型 Function Call 直接给出执行哪个函数,但多个 API 通常在一个服务里,且有相同的鉴权和签名方式。因此,对于 HTTP 接口,我们需要先按组(Plugin)包装。
在调用具体 API 之前,先找到 Plugin。具体实现可能是数据库或其他服务加载出来的。
pub trait PluginSchedule: Send {
async fn schedule(&self, plugin_name: &str, tool_name: &str) -> anyhow::Result<Plugin>;
}
pub struct PluginControl {
pub schedule: Box<dyn PluginSchedule + Sync + 'static>,
}
impl ToolEvent for PluginControl {
async fn call(&self, name: &str, args: String) -> anyhow::Result<String> {
let mut list = name.split('.').into_iter().rev().collect::<Vec<&str>>();
let plugin_name = list.pop().unwrap_or("");
let tool_name = list.pop().unwrap_or("");
let plugin = self.schedule.schedule(plugin_name, tool_name).await?;
plugin.call(tool_name, args).await
}
}
pub struct Plugin {
pub auth: Option<Oauth>,
pub server: Option<(String, u16)>,
pub tools: HashMap<String, Tool>,
}
HTTP 的具体实现逻辑主要是组装请求并发起调用,此处省略具体网络代码细节。
2.2 Rust 函数实现
定义一个 Trait,并为所有符合该 Trait 的函数做默认实现:
pub trait ToolFunction: Send {
async fn call(&self, args: String) -> anyhow::Result<String>;
}
impl<T, Fut> ToolFunction for T
where
T: Fn(String) -> Fut + Send + Sync + 'static,
Fut: Future<Output = anyhow::Result<String>> + Send,
{
async fn call(&self, args: String) -> anyhow::Result<String> {
(self)(args).await
}
}
2.3 Python 脚本实现
为什么是 Python?
虽然 Lua 短小轻快,但 Python 是大众选择,生态更丰富,便于集成现有库。
能用本地的 Python 执行吗?
不能。服务端运行的脚本需要沙盒环境,且需避免不同本地环境的版本差异和包依赖冲突。通常建议通过容器化或远程解释器服务来执行。
3. 流程节点
因为我们实现的是流程图策略,所以还需要实现一些功能无关的 Service,即流程图里面的节点。
3.1 分支
分支的定义非常简短,一个数组盛放判断条件和变量,成功走 true_goto 节点,失败走 false_goto 节点。
pub struct SelectorServiceConfig {
pub condition: String,
pub vars: Vec<Value>,
pub true_goto: String,
pub false_goto: String,
}
impl agent_rt::ServiceLayer for SelectorService {
type Config = CfgBound<SelectorServiceConfig>;
type Output = Value;
async fn call(
&self,
code: String,
ctx: Arc<Context>,
cfg: Self::Config,
) -> anyhow::Result<Self::Output> {
let cfg = cfg.bound(&ctx)?;
let mut result = false;
let all_true = cfg.condition == "且";
let mut vars = cfg.vars.into_iter().collect::<VecDeque<Value>>();
loop {
let ok = Self::judge(&mut vars)?;
if !ok && all_true {
break;
}
}
let go_next_node = if result {
cfg.true_goto
} else {
cfg.false_goto
};
ctx.plan.update(...)?;
Ok(Value::Null)
}
}
3.2 注入
这是为了修改某个节点的配置,比如给 LLM 追加上下文。实现也很简单:
impl agent_rt::ServiceLayer for InjectorService {
type Config = CfgBound<InjectorServiceConfig>;
type Output = Value;
async fn call(
&self,
_code: String,
ctx: Arc<Context>,
cfg: Self::Config,
) -> anyhow::Result<Self::Output> {
let InjectorServiceConfig {
from, to, operate, ..
} = cfg.bound(&ctx)?;
if to.is_empty() {
return Err(anyhow::anyhow!("InjectorService: from and to must have a value"));
}
Self::update(to, &ctx, |x| Self::operate(x, from, operate))?;
Ok(Value::Null)
}
}
3.3 变量
这种节点不做任何事情,就是生成一个变量,一般 start 节点和 end 节点都是变量节点。
impl agent_rt::ServiceLayer for VarFlowChartService {
type Config = CfgBound<Value>;
type Output = Value;
async fn call(
&self,
_code: String,
ctx: Arc<Context>,
cfg: Self::Config,
) -> anyhow::Result<Self::Output> {
let var = cfg.raw_bound_value(&ctx)?;
Ok(var)
}
}
4. CfgBound
上面的所有节点配置,都用的同样的绑定方式 CfgBound 来进行绑定。它用于处理变量引用,例如 {{llm.tools}} 表示这个变量来自 LLM 节点的执行结果里的 tools 字段。这种引用机制避免了每个 Service 重复实现变量解析逻辑。
测试与启动
上面实现的内容都在 wd_agent 中。作为服务提供服务时,需要和 agent_rt 结合一起运行,通过 gRPC 调用。
pub async fn start(addr: &str) {
let openai_llm = wd_agent::rt_node_service::OpenaiLLMService::default();
let var = wd_agent::rt_node_service::VarFlowChartService::default();
let python = PythonCodeService::new("http://127.0.0.1:50001")
.await
.unwrap();
let rt = agent_rt::Runtime::default()
.register_service_layer("openai_llm", openai_llm)
.register_service_layer("python", python)
.register_service_layer("flow_chart_selector", SelectorService::default())
.register_service_layer("flow_chart_injector", InjectorService::default())
.register_service_layer("workflow", WorkflowService::default())
.register_service_layer("flow_chart_var", var);
let app = serve_entity::AgentServeEntity::new(rt);
let addr = addr.parse().unwrap();
wd_log::log_debug_ln!("grpc.Server lister addr[{}]", addr);
tonic::transport::Server::builder()
.add_service(proto::agent_service_server::AgentServiceServer::new(app))
.serve(addr)
.await
.unwrap();
}
总结
本文演示了 Agent 框架中核心 Service 的实现方式,包括 LLM 交互、工具调用抽象、以及流程控制节点。通过 ServiceLayer 接口统一了不同功能的执行入口,利用 CfgBound 解决了上下文变量传递的问题。这种模块化设计使得后续扩展新的 Tool 类型或流程节点变得非常简单,为构建复杂的 AI Agent 应用奠定了坚实的基础。
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- 随机西班牙地址生成器
随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online