Agent 框架开发(三):实现 LLM、Tool 及服务节点
Agent 框架的核心 Service 实现,涵盖 LLM 节点对接、Tool 抽象设计(支持 HTTP、Rust 函数及 Python 脚本)、以及流程控制节点(分支、注入、变量)。通过 ServiceLayer 接口统一执行入口,利用 CfgBound 机制处理变量引用与上下文传递。文章展示了如何注册服务层并启动 gRPC 服务,旨在构建模块化、可扩展的 AI Agent 运行时架构。

Agent 框架的核心 Service 实现,涵盖 LLM 节点对接、Tool 抽象设计(支持 HTTP、Rust 函数及 Python 脚本)、以及流程控制节点(分支、注入、变量)。通过 ServiceLayer 接口统一执行入口,利用 CfgBound 机制处理变量引用与上下文传递。文章展示了如何注册服务层并启动 gRPC 服务,旨在构建模块化、可扩展的 AI Agent 运行时架构。

在上一篇文章中,我们实现了一个基本的运行时,能够将 Service 按照 Plan 执行起来。本文我们将尝试实现一些基本节点,最终运行一个最简单的 Agent。
这里我们接入 OpenAI 的 LLM 能力。
首先定义模型的基本配置,包含 prompt、temperature、tools、context 等参数。
pub struct LLMNodeRequest {
#[serde(default = "String::default")]
pub prompt: String,
#[serde(default = "LLMNodeRequest::default_model_35")]
pub model: String,
#[serde(default = "Vec::default")]
pub tools: Vec<ChatCompletionTool>,
#[serde(default = "Vec::default")]
pub context: Vec<LLMContextMessage>,
#[serde(default = "LLMNodeRequest::max_tokens_length")]
pub max_tokens: u16,
#[serde(default = "LLMNodeRequest::default_temperature")]
pub temperature: f32,
#[serde(default = "bool::default")]
pub is_stream: bool,
pub query: String,
}
然后实现 Service,使用之前包装好的解析层:
impl agent_rt::ServiceLayer for OpenaiLLMService {
type Config = CfgBound<LLMNodeRequest>;
type Output = LLMNodeResponse;
async fn call(
&self,
_code: String,
ctx: Arc<Context>,
cfg: Self::Config,
) -> anyhow::Result<Self::Output> {
let cfg = cfg.bound(&ctx)?;
let req = cfg.to_openai_chat_request()?;
let mut stream = self.openai_client.chat().create_stream(req).await?;
let mut resp = LLMNodeResponse::default();
while let Some(msg) = stream.next().await {
let msg = match msg {
Ok(o) => o,
Err(e) => return Err(anyhow::Error::from(e)),
};
for i in msg.choices {
// 文本消息
if let Some(s) = i.delta.content {
if let Some(s) = Self::try_send_to_channel(&ctx, s) {
resp.append_answer(s.as_str());
}
}
// 工具调用
if let Some(tools) = i.delta.tool_calls {
resp.append_tools(tools);
}
}
}
Ok(resp)
}
}
我们采用通用设计,不具体实现某个 Tool,而是进行抽象。
// Tool 事件发生器
pub trait ToolEvent: Send {
async fn call(&self, name: &str, args: String) -> anyhow::Result<String>;
}
// Tool Service 实现的结构体
pub struct ToolService {
loader: Box<dyn ToolEvent + Sync + 'static>,
}
实现 ServiceLayer,直接将事件调用出去:
impl ServiceLayer for ToolService {
type Config = CfgBound<LLMToolCallRequest>;
type Output = LLMToolCallResponse;
async fn call(
&self,
code: String,
ctx: Arc<Context>,
cfg: Self::Config,
) -> anyhow::Result<Self::Output> {
let cfg = cfg.bound(&ctx)?;
let LLMToolCallRequest { call_id, name, args } = cfg;
wd_log::log_debug_ln!("code[{}] exec tool[{}] args:{:?}", code, name, args);
let content = self.loader.call(name.as_str(), args).await?;
let resp = LLMToolCallResponse { call_id, content };
wd_log::log_debug_ln!("code[{}] exec tool[{}] result[{:?}]", code, name, resp);
Ok(resp)
}
}
具体的实现可能是本地 Rust 函数、API 接口或其他脚本,这里用枚举定义:
pub enum Tool {
Http(ToolHttp),
Python(ToolPython),
Custom(Arc<dyn ToolFunction + Sync + 'static>),
}
虽然大模型 Function Call 直接给出执行哪个函数,但多个 API 通常在一个服务里,且有相同的鉴权和签名方式。因此,对于 HTTP 接口,我们需要先按组(Plugin)包装。
在调用具体 API 之前,先找到 Plugin。具体实现可能是数据库或其他服务加载出来的。
// 加载 Plugin 的抽象
pub trait PluginSchedule: Send {
async fn schedule(&self, plugin_name: &str, tool_name: &str) -> anyhow::Result<Plugin>;
}
// Plugin 调度器的实现的结构体
pub struct PluginControl {
pub schedule: Box<dyn PluginSchedule + Sync + 'static>,
}
// 实现上面提到的事件发生器
impl ToolEvent for PluginControl {
async fn call(&self, name: &str, args: String) -> anyhow::Result<String> {
let mut list = name.split('.').into_iter().rev().collect::<Vec<&str>>();
let plugin_name = list.pop().unwrap_or("");
let tool_name = list.pop().unwrap_or("");
= .schedule.(plugin_name, tool_name).?;
plugin.(tool_name, args).
}
}
{
auth: <Oauth>,
server: <(, )>,
tools: HashMap<, Tool>,
}
HTTP 的具体实现逻辑主要是组装请求并发起调用,此处省略具体网络代码细节。
定义一个 Trait,并为所有符合该 Trait 的函数做默认实现:
// 函数的抽象
pub trait ToolFunction: Send {
async fn call(&self, args: String) -> anyhow::Result<String>;
}
// 所有符合这个 Trait 的函数的默认实现
impl<T, Fut> ToolFunction for T
where
T: Fn(String) -> Fut + Send + Sync + 'static,
Fut: Future<Output = anyhow::Result<String>> + Send,
{
async fn call(&self, args: String) -> anyhow::Result<String> {
(self)(args).await
}
}
在实现之前需要考虑几个问题:
为什么是 Python? 虽然 Lua 短小轻快,但 Python 是大众选择,生态更丰富,便于集成现有库。
能用本地的 Python 执行吗? 不能。服务端运行的脚本需要沙盒环境,且需避免不同本地环境的版本差异和包依赖冲突。通常建议通过容器化或远程解释器服务来执行。
因为我们实现的是流程图策略,所以还需要实现一些功能无关的 Service,即流程图里面的节点。
分支的定义非常简短,一个数组盛放判断条件和变量,成功走 true_goto 节点,失败走 false_goto 节点。
pub struct SelectorServiceConfig {
// 分支判断的条件:或,且
pub condition: String,
// 三段式 var1 [comparator] var2
// comparator ==,!=,>,>=,<,<=,is_null,no_null,contain,no_contain
// if comparator is [is_null,no_null], do not need var2
pub vars: Vec<Value>,
pub true_goto: String,
pub false_goto: String,
}
具体实现同样遵循 ServiceLayer:
impl agent_rt::ServiceLayer for SelectorService {
type Config = CfgBound<SelectorServiceConfig>;
type Output = Value;
async fn call(
&self,
code: String,
ctx: Arc<Context>,
cfg: Self::Config,
) -> anyhow::Result<Self::Output> {
let cfg = cfg.bound(&ctx)?;
let mut result = false;
let all_true = cfg.condition == "且";
// 循环判断条件是否成立
let mut vars = cfg.vars.into_iter().collect::<VecDeque<Value>>();
loop {
// 具体的判断函数,根据判断条件判断是否成立,返回 bool 结果
let ok = Self::judge(&mut vars)?;
// 结果检查逻辑...
if !ok && all_true {
break;
}
}
// 找到下个执行的节点
let go_next_node = if result {
cfg.true_goto
} else {
cfg.false_goto
};
ctx.plan.update(...)?;
(Value::Null)
}
}
这是为了修改某个节点的配置,比如给 LLM 追加上下文。实现也很简单:
impl agent_rt::ServiceLayer for InjectorService {
type Config = CfgBound<InjectorServiceConfig>;
type Output = Value;
async fn call(
&self,
_code: String,
ctx: Arc<Context>,
cfg: Self::Config,
) -> anyhow::Result<Self::Output> {
let InjectorServiceConfig {
from, to, operate, ..
} = cfg.bound(&ctx)?;
if to.is_empty() {
return Err(anyhow::anyhow!("InjectorService: from and to must have a value"));
}
// from: 修改的结果
// operate: 按照什么方式修改,目前只实现了赋值和追加操作
// to: 修改位置,就是修改哪个变量?
Self::update(to, &ctx, |x| Self::operate(x, from, operate))?;
Ok(Value::Null)
}
}
这种节点不做任何事情,就是生成一个变量,一般 start 节点和 end 节点都是变量节点。
impl agent_rt::ServiceLayer for VarFlowChartService {
type Config = CfgBound<Value>;
type Output = Value;
async fn call(
&self,
_code: String,
ctx: Arc<Context>,
cfg: Self::Config,
) -> anyhow::Result<Self::Output> {
let var = cfg.raw_bound_value(&ctx)?;
Ok(var)
}
}
上面的所有节点配置,都用的同样的绑定方式 CfgBound 来进行绑定。它用于处理变量引用,例如 {{llm.tools}} 表示这个变量来自 LLM 节点的执行结果里的 tools 字段。这种引用机制避免了每个 Service 重复实现变量解析逻辑。
上面实现的内容都在 wd_agent 中。作为服务提供服务时,需要和 agent_rt 结合一起运行,通过 gRPC 调用。
总纲如下:
pub async fn start(addr: &str) {
// create service
let openai_llm = wd_agent::rt_node_service::OpenaiLLMService::default();
let var = wd_agent::rt_node_service::VarFlowChartService::default();
let python = PythonCodeService::new("http://127.0.0.1:50001")
.await
.unwrap();
// build agent runtime
let rt = agent_rt::Runtime::default()
.register_service_layer("openai_llm", openai_llm)
.register_service_layer("python", python)
.register_service_layer("flow_chart_selector", SelectorService::default())
.register_service_layer("flow_chart_injector", InjectorService::default())
.register_service_layer("workflow", WorkflowService::default())
.register_service_layer("flow_chart_var", var);
// 启动 rpc 服务
let app = serve_entity::AgentServeEntity::new(rt);
let addr = addr.parse().unwrap();
wd_log::log_debug_ln!("grpc.Server lister addr[{}]", addr);
tonic::transport::Server::builder()
.(proto::agent_service_server::AgentServiceServer::(app))
.(addr)
.
.();
}
本文演示了 Agent 框架中核心 Service 的实现方式,包括 LLM 交互、工具调用抽象、以及流程控制节点。通过 ServiceLayer 接口统一了不同功能的执行入口,利用 CfgBound 解决了上下文变量传递的问题。这种模块化设计使得后续扩展新的 Tool 类型或流程节点变得非常简单,为构建复杂的 AI Agent 应用奠定了坚实的基础。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online