跳到主要内容Eino ADK 核心解析:为什么 Agent 必须是一层独立抽象 | 极客日志Go / GolangAI
Eino ADK 核心解析:为什么 Agent 必须是一层独立抽象
Eino ADK 中的 Agent 抽象并非简单的 Prompt 包装,而是统一输入、事件流与行为协议的核心运行对象。解析 Agent 接口设计原理,涵盖 Name、Description、Run 方法职责,剖析 AgentInput 上下文机制及 AsyncIterator 事件消费模式。通过从零实现 ConceptTutorAgent 实战代码,展示如何构建请求级选项与自定义事件流,帮助开发者理解多 Agent 协作下的链路治理与状态管理基础。
观心14 浏览 Eino ADK 核心解析:为什么 Agent 必须是一层独立抽象
很多人真正没看懂的,不是 Name、Description、Run,而是这套协议到底统一了什么。
本篇只讲一点:为什么 ADK 一定要单独定义 Agent 这层抽象?
本文只做四件事:
- 讲清
Agent 有什么用,为什么它不是 Prompt 包装器
- 讲透
AgentInput / AgentRunOption / AsyncIterator / AgentEvent
- 给一个 零外部依赖 的自定义 Agent demo
- 帮你把后面
Workflow / Runner / Interrupt 的地基先打好
1. 为什么 Agent 抽象是必要的
如果没有 Agent 这一层,AI 应用很容易长成一堆分散的模型调用:这里直接调 ChatModel,那里自己拼 Messages,Tool 结果自己处理。多 Agent 协作时,每一层都重新定义输入输出,中断、恢复、链路追踪、状态注入散在业务代码里。
但只要系统开始复杂一点,问题就来了:谁是这次执行单元的身份标识?别的 Agent 怎么知道它能做什么?调用方拿到的是最终字符串,还是过程事件?某个请求级参数该影响谁?
所以 Agent 抽象真正解决的,不是'怎么调模型',而是:怎么把一次智能体执行,统一成一个可运行、可组合、可治理的对象。
你只要先记住一个判断就够了:Agent 不单独存在,而是和 AgentInput、AgentRunOption、AsyncIterator、AgentEvent 一起构成运行协议。
2. Agent 接口:为什么这三个方法都不能少
官方定义很短,但每个字段都有深意:
type Agent interface {
Name(ctx context.Context) string
Description(ctx context.Context) string
Run(ctx context.Context, input *AgentInput, opts ...AgentRunOption) *AsyncIterator[*AgentEvent]
}
Name
Name 不只是'取个名字'。它至少承担三件事:Agent 的身份标识、执行链路里的节点名、以及 DesignateAgent(...) 这类定向 option 的匹配目标。
Description
Description 也不只是注释。它更像对外公开的职责声明:给人看,知道这个 Agent 会什么;给别的 Agent 看,判断该不该把任务转给它。
Run
Run 才是核心。这一个签名,直接把四件事统一了:
- 一次 Agent 执行必须带
context.Context
- 输入统一走
AgentInput
- 请求级调参统一走
AgentRunOption
- 输出统一走事件流
AsyncIterator[*AgentEvent]
所以 Run 不是普通函数,它是在规定:ADK 里的一次 Agent 执行,应该以什么协议被启动、被调整、被消费。
3. AgentInput:为什么输入是 Messages,不是一个字符串
type AgentInput struct {
Messages []Message
EnableStreaming bool
}
type Message = *schema.Message
很多人第一次看到这里,会下意识理解成'用户问题 + 一个流式开关'。这个理解太轻了。
Messages 是任务上下文,不是单条 prompt
Messages 里可以放的不只是用户这一句。它可以承载当前问题、对话历史、上游 Agent 结果、背景知识、样例数据、系统约束。
也就是说,Messages 的意义不是'聊天格式',真正的价值是:把一次任务所需的上下文统一收紧。
如果输入只是一条 string prompt,那每个 Agent 都得自己决定历史怎么塞、系统约束怎么塞、Tool 结果怎么塞,输入协议就会发散。
EnableStreaming 是建议,不是强制
这是一个特别容易踩的点。很多人会误以为 EnableStreaming=true 就一定流式,EnableStreaming=false 就一定非流式。但官方文档强调得很清楚,它只是一个 建议。
它只会影响那些'同时支持流和非流'的组件,比如 ChatModel。如果某个组件天然只支持一种输出方式,比如很多 Tool,它不会因为这个字段就突然变成流式。
这句最好直接背下来:EnableStreaming 控制的是偏好,不是强制转换器。实际输出到底是不是流,请看后面的 MessageVariant.IsStreaming。
4. AgentRunOption 和 AgentWithOptions
| 能力 | 作用时机 | 你可以先怎么理解 |
|---|
AgentRunOption | 请求期 | 这一次运行怎么调 |
AgentWithOptions | 运行前 | 这个 Agent 先被怎么包装 |
AgentRunOption
它是传给 Run() 的。官方内置给了两个很典型的通用 option:WithSessionValues(设置跨 Agent 读写数据)和 WithSkipTransferMessages(某些 Transfer 消息不进入 History)。
除此之外,ADK 还给了两个很实用的扩展点:adk.WrapImplSpecificOptFn(...) 和 adk.GetImplSpecificOptions(...)。这套设计的价值很直接:每个 Agent 都可以扩展出自己的请求级参数,而不用把所有行为都塞进一套全局 option。
比如后面 demo 里的 WithAudience("newbie") 和 WithAudience("interview"),它就能证明 AgentRunOption 真的是'这次运行怎么调',而不是静态配置。
AgentWithOptions
它是这样用的:func AgentWithOptions(ctx context.Context, agent Agent, opts ...AgentOption) Agent。
官方当前内置支持的两个点是 WithDisallowTransferToParent 和 WithHistoryRewriter。它们都不属于'这一次运行怎么调',它们属于:在真正执行前,先把 Agent 包一层通用行为。
5. AsyncIterator:为什么 Agent 不直接返回字符串
type AsyncIterator[T any] struct { ... }
func (ai *AsyncIterator[T]) Next() (T, bool)
ADK 这里的一个关键设计是:Agent 不是'输入一个值,输出一个值'的普通函数。一次 Agent 执行,除了最终文本,还可能产生中间输出、Tool 消息、跳转行为、中断行为、错误。如果只返回 string,这些信息根本没地方放。
所以 ADK 选择的是:不直接给终值,而是给一串按顺序消费的事件。
Next() 为什么重要
Next() 是阻塞式的。也就是每次调用时,只会等两种结果:等到一个新的 AgentEvent,或者等到迭代器关闭,返回 ok=false。
for {
event, ok := iter.Next()
if !ok {
break
}
}
NewAsyncIteratorPair + goroutine 为什么是常见写法
官方给了这套基础设施:iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]()。
iter 给调用方消费
gen 给 Agent 内部发事件
自定义 Agent 常见实现会开 goroutine,不是为了炫技,而是因为 Run() 的目标不是等所有事做完再返回,而是先把事件出口交出去,然后内部异步地产生事件。如果不这么做,你会把'事件流协议'重新写回'阻塞函数返回值'。
6. AgentEvent / AgentOutput / AgentAction:一次执行到底吐出了什么
type AgentEvent struct {
AgentName string
RunPath []RunStep
Output *AgentOutput
Action *AgentAction
Err error
}
这部分只要抓住'事件里到底装了哪几类信息'就够了。
AgentName 和 RunPath
AgentName:是谁发出的当前事件
RunPath:这个事件是沿着哪条调用链走到这里的
在单 Agent 场景里你可能感受不强。但一到多 Agent 场景,这两个字段就是链路上下文。
AgentOutput
type AgentOutput struct {
MessageOutput *MessageVariant
CustomizedOutput any
}
这说明 ADK 默认把'消息输出'当成第一公民,同时也允许你挂自定义输出。而 MessageVariant 的价值是把流式和非流式统一起来。
IsStreaming:当前到底是不是流
Role:当前是 Assistant 还是 Tool
ToolName:如果是 Tool,工具名是什么
AgentAction
很多人看 AgentEvent 时,只盯着 Output。但 ADK 还专门留了一条'行为输出通道':
type AgentAction struct {
Exit bool
Interrupted *InterruptInfo
TransferToAgent *TransferToAgentAction
BreakLoop *BreakLoopAction
CustomizedAction any
}
它的意义很直接:Agent 不只会'说什么',还会'决定接下来怎么跑'。
NewExitAction():立刻退出
NewTransferToAgentAction(name):跳到目标 Agent
Interrupted:通知 Runner 当前中断
BreakLoop:让 LoopAgent 结束循环
gen.Send(&adk.AgentEvent{
Action: adk.NewExitAction(),
})
Err
否则很容易出现一种假象:看起来'好像有输出',但实际执行已经坏了。
7. 自定义 Agent 实战:从零实现一个 ConceptTutorAgent
这段代码的目标不是做知识推理,而是跑通 Agent 协议。它想证明 4 件事:自定义 Agent 本质上就是实现 Agent 接口;Run() 返回的是事件流,不是字符串;AgentRunOption 可以做请求级调参;不接模型 API,也能把 Agent 协议本身跑通。
完整代码
package main
import (
"context"
"fmt"
"log"
"os"
"strings"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/schema"
)
type audienceOptions struct {
audience string
}
func WithAudience(audience string) adk.AgentRunOption {
return adk.WrapImplSpecificOptFn(func(o *audienceOptions) {
o.audience = audience
})
}
type ConceptTutorAgent struct{}
func (a *ConceptTutorAgent) Name(ctx context.Context) string {
return "ConceptTutorAgent"
}
func (a *ConceptTutorAgent) Description(ctx context.Context) string {
return "负责把一个技术概念讲成新手能听懂的三段话"
}
func (a *ConceptTutorAgent) Run(ctx context.Context, input *adk.AgentInput, opts ...adk.AgentRunOption) *adk.AsyncIterator[*adk.AgentEvent] {
iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]()
go func() {
defer gen.Close()
if err := ctx.Err(); err != nil {
gen.Send(&adk.AgentEvent{Err: err})
return
}
if input == nil || len(input.Messages) == 0 {
gen.Send(&adk.AgentEvent{Err: fmt.Errorf("agent input messages is empty")})
return
}
cfg := adk.GetImplSpecificOptions(&audienceOptions{audience: "newbie"}, opts...)
concept := lastUserMessage(input.Messages)
if strings.TrimSpace(concept) == "" {
gen.Send(&adk.AgentEvent{Err: fmt.Errorf("last user message is empty")})
return
}
reply := buildReply(concept, cfg.audience, input.EnableStreaming)
gen.Send(adk.EventFromMessage(schema.AssistantMessage(reply, nil), nil, schema.Assistant, ""))
}()
return iter
}
func lastUserMessage(messages []adk.Message) string {
for i := len(messages) - 1; i >= 0; i-- {
msg := messages[i]
if msg != nil && msg.Role == schema.User {
return msg.Content
}
}
return ""
}
func buildReply(concept, audience string, enableStreaming bool) string {
prefix := "面向新手"
if audience == "interview" {
prefix = "面向面试复盘"
}
streamingHint := "这次我没有实现流式输出,所以会一次性返回完整结果。"
if !enableStreaming {
streamingHint = "这次按非流式方式返回完整结果。"
}
return fmt.Sprintf("%s\n\n一句话定义:这里把'%s'当成当前要讲解的概念。\n为什么重要:这个 demo 不是在做真实知识推理,而是在演示 Agent 如何围绕输入、事件和 option 组织一次执行。\n常见坑:别把 Messages 理解成单条 prompt,它其实承载的是任务上下文。\n补充:%s", prefix, concept, streamingHint)
}
func main() {
ctx := context.Background()
concept := "Agent 抽象"
if len(os.Args) > 1 {
concept = strings.Join(os.Args[1:], " ")
}
agent := &ConceptTutorAgent{}
input := &adk.AgentInput{
Messages: []adk.Message{
schema.SystemMessage("你是一个负责解释技术概念的教学 Agent。"),
schema.UserMessage(concept),
},
EnableStreaming: true,
}
fmt.Printf("agent=%s\n", agent.Name(ctx))
fmt.Printf("description=%s\n\n", agent.Description(ctx))
iter := agent.Run(ctx, input, WithAudience("newbie"))
for {
event, ok := iter.Next()
if !ok {
break
}
if event.Err != nil {
log.Fatalf("agent failed: %v", event.Err)
}
if event.Output == nil || event.Output.MessageOutput == nil {
continue
}
mv := event.Output.MessageOutput
if mv.Message == nil {
continue
}
fmt.Printf("assistant>\n%s\n", mv.Message.Content)
}
}
运行
go mod init concept-tutor-demo
go get github.com/cloudwego/eino@latest
go run . -- "AsyncIterator"
agent=ConceptTutorAgent
description=负责把一个技术概念讲成新手能听懂的三段话
assistant> 面向新手 一句话定义:这里把'AsyncIterator'当成当前要讲解的概念。 为什么重要:这个 demo 不是在做真实知识推理,而是在演示 Agent 如何围绕输入、事件和 option 组织一次执行。 常见坑:别把 Messages 理解成单条 prompt,它其实承载的是任务上下文。 补充:这次我没有实现流式输出,所以会一次性返回完整结果。
这段代码对应了哪些抽象
Name():给 Agent 身份
Description():给 Agent 职责描述
Run():按统一协议执行
AgentInput.Messages:承载任务上下文
WithAudience(...):演示请求级 option
NewAsyncIteratorPair():建立生产者和消费者
EventFromMessage(...):把输出装进 AgentEvent
iter.Next():调用方按事件流消费
进阶补充:流式长什么样
这次 demo 故意没实现流式,就是为了说明:EnableStreaming=true 不意味着你这个 Agent 必须流式输出。
如果你只想看'流式 MessageVariant 怎么发',一个最小片段是:
stream := schema.StreamReaderFromArray([]adk.Message{
schema.AssistantMessage("第一段。", nil),
schema.AssistantMessage("第二段。", nil),
})
gen.Send(adk.EventFromMessage(nil, stream, schema.Assistant, ""))
IsStreaming = true
Message = nil
MessageStream != nil
总结
Agent 不是一段配置,而是一套统一输入、统一事件流、统一行为协议的运行对象。
相关免费在线工具
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- 随机西班牙地址生成器
随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online