跳到主要内容
Sim 基于 DAG 的 AI 智能体工作流编排引擎技术解析 | 极客日志
TypeScript SaaS AI
Sim 基于 DAG 的 AI 智能体工作流编排引擎技术解析 综述由AI生成 Sim 是一个基于有向无环图(DAG)的低代码 AI 智能体工作流编排与执行引擎。它通过声明式建模将复杂流程抽象为节点与边,支持循环、并行及人工介入控制。核心组件包括高性能执行引擎、状态持久化恢复机制及多工具集成处理器(含 MCP 协议)。技术实现采用 TypeScript,解决了传统脚本硬编码耦合度高、状态管理困难及工具集成繁琐等问题,适用于自动化客服、数据分析等需要串联多个 AI 步骤的场景。
DevOpsTeam 发布于 2026/4/10 更新于 2026/5/23 18 浏览Sim 基于 DAG 的 AI 智能体工作流编排引擎技术解析
1. 整体介绍
概要说明
项目地址 :https://github.com/simstudioai/sim
项目简介 :Sim 是一个基于有向无环图(DAG)的、声明式的低代码/无代码 AI 智能体工作流编排与执行引擎。它并非一个简单的'可视化工具',其核心是一个高性能、支持复杂流程控制(如循环、并行、暂停恢复)的运行时系统。代码展示了其核心执行引擎、DAG 构建器和智能体(Agent)处理器。
面临问题、场景与人群 :
问题 :
编排复杂性 :构建多步骤、具备条件判断、循环、并行及人工介入的 AI 应用逻辑复杂,传统代码编写工作量大,易出错。
状态管理困难 :AI 工作流执行时间长、可能中断(如等待人工审批),需要可靠的持久化与恢复机制。
工具集成繁琐 :需要将多种工具(API、函数、MCP 协议工具、自定义代码)统一封装,供 AI 智能体安全、便捷地调用。
开发效率瓶颈 :从原型到生产部署周期长,需要兼顾可视化设计和底层执行性能。
目标人群 :AI 应用开发者、产品经理、业务分析师,以及需要在业务流程中集成 AI 能力的企业团队。
核心场景 :自动化客服、内容生成流水线、数据分析与报告、复杂的决策支持系统等需要串联多个 AI 步骤和人工审核的流程。
解决方法与演进 :
传统方式 :开发人员使用脚本(如 Python)硬编码流程逻辑,耦合度高,状态管理需自行实现,可视化与执行引擎分离。
Sim 新方式 :
声明式建模 :将工作流抽象为 DAG,节点(Block)声明其功能,边(Edge)声明依赖关系。执行引擎负责解析 DAG 并调度。
可视化即代码 :画布操作直接生成可序列化的 DAG 描述(SerializedWorkflow),设计与执行使用同一套数据模型。
内置复杂控制流 :通过 LoopConstructor, ParallelConstructor 等将高级语义(循环、并行)编译为底层 DAG 结构。
执行快照与恢复 :ExecutionEngine 支持将运行时状态(如 pendingBlocks, remainingEdges)序列化,实现'暂停/恢复'。
商业价值预估 :
代码成本估算 :构建一个具备类似 DAG 执行引擎、状态恢复、多工具集成能力的系统,需要一个 5-8 人的资深前端与后端团队约 9-12 个月,粗略人力成本在数百万人民币级别。
覆盖问题空间效益 :Sim 通过提供标准化平台,将上述成本分摊给大量用户。它解决了 AI 应用开发中的'编排'、'集成'、'部署'三大核心痛点,覆盖了从原型验证到生产部署的全链路。其商业价值在于显著降低企业引入 AI 自动化的技术门槛和时间成本,潜在市场涵盖金融、教育、电商、客服等多个行业。其开源 + 托管模式,有助于快速建立生态,获取早期用户与场景反馈。
2. 详细功能拆解(产品 + 技术视角)
可视化 DAG 编辑器(产品层)
产品视角 :用户通过拖拽 Block、连线来设计工作流,直观表达'先做什么,后做什么,在什么条件下分支'。
技术支撑 :前端组件生成 SerializedWorkflow 数据。核心是后端的 DAGBuilder.build() 方法,它将用户友好的序列化数据,编译(Construction)为内部可执行的 图结构,处理了循环展开、并行路径识别等复杂转换。
DAG
高性能 DAG 执行引擎(技术层)
产品视角 :用户点击'运行',工作流自动执行,支持实时日志、中途暂停。
技术支撑 :ExecutionEngine 类是核心。它管理一个就绪队列(readyQueue)和正在执行的 Promise 集合(executing)。其 run() 方法循环检查'是否有工作'(hasWork()),从队列中取出就绪节点,通过 NodeExecutionOrchestrator 异步执行,并根据边条件(EdgeManager)将下游节点加入队列,实现并行推进。
状态持久化与恢复(技术层)
产品视角 :工作流可以暂停(如等待人工审批),之后从断点继续执行,状态不丢失。
技术支撑 :这是关键技术。ExecutionEngine 在 initializeQueue() 时检查是否有来自快照的 pendingBlocks 或 remainingEdges,并据此恢复队列和节点依赖状态。AgentBlockHandler 中的 _pauseMetadata 和 buildPausedResult() 方法共同实现了暂停点的创建与快照生成。
多工具集成与智能体调度(技术层)
产品视角 :在 Agent Block 中可配置 LLM 模型、提示词,并勾选各种工具(数据库查询、API、自定义函数)供其调用。
技术支撑 :AgentBlockHandler 是复杂度最高的处理器之一。它负责:
a. 工具格式化 :将配置的工具(ToolInput)统一格式化为 LLM 可识别的工具调用模式(formatTools)。
b. MCP 工具处理 :支持 Model Context Protocol 工具,包含缓存 Schema、服务发现、批量处理等优化(processMcpToolsBatched)。
c. 上下文构建 :整合系统提示、用户输入、记忆(Memory)服务,构建完整的对话历史(buildMessages)。
d. 供应商抽象 :将请求统一发送给 executeProviderRequest,屏蔽不同 AI 供应商(OpenAI, Anthropic, 本地 Ollama 等)的 API 差异。
3. 技术难点挖掘
动态 DAG 的构建与执行 :如何将用户定义的循环、并行等高级结构,正确无误地编译为静态 DAG,并确保执行时的语义正确性(如循环的迭代、并发的扇入扇出)。
有状态工作流的暂停与精确恢复 :如何在任意节点(特别是并行分支中)暂停,并序列化所有中间状态(变量、未完成的边、待执行节点队列),保证恢复后执行结果的一致性。
工具调用的安全性与性能 :如何安全地执行用户自定义代码(Function Block)、动态发现和调用外部工具(MCP),同时管理连接、超时和错误,并避免对 LLM 请求造成性能瓶颈。
混合执行模式的支持 :需要同时支持非流式响应、流式响应(Streaming),并在浏览器和服务器两种环境下都能稳定工作,对网络通信和错误处理要求高。
4. 详细设计图
4.1 核心架构图 (Component Diagram)
4.2 核心链路序列图:工作流执行 (Sequence Diagram) AI ProviderAgentBlockHandlerNodeOrchestratorDAGExecutionEngine 用户 AI ProviderAgentBlockHandlerNodeOrchestratorDAGExecutionEngine 用户 alt[需要调用工具]loop[处理就绪队列]run(workflowId)initializeQueue()get ready nodes[node1]executeNode(node)execute(ctx, block, inputs)formatTools(), buildMessages()携带工具的 LLM 请求响应 (可能含工具调用)executeFunction(toolCall) 工具执行结果发送工具结果最终响应 BlockOutputhandleNodeCompletion(output)edgeManager.processOutgoingEdges()获取新的就绪节点[node2, node3...]ExecutionResult
4.3 核心类图 (Class Diagram) typeScript
DAGBuilder
-pathConstructor
-loopConstructor
-nodeConstructor
-edgeConstructor
+build(SerializedWorkflow, triggerBlockId) : DAG
DAG
+nodes: Map<string, DAGNode>
+loopConfigs: Map<string, SerializedLoop>
DAGNode
+id: string
+block: SerializedBlock
+incomingEdges: Set<string>
+outgoingEdges: Map<string, DAGEdge>
+metadata: NodeMetadata
ExecutionEngine
-readyQueue: string[]
-executing: Set<Promise>
-context: ExecutionContext
-dag: DAG
+run(triggerBlockId) : Promise<ExecutionResult>
-processQueue()
-executeNodeAsync(nodeId)
-handleNodeCompletion()
-buildPausedResult()
ExecutionContext
+executedBlocks: Set<string>
+blockLogs: any[]
+metadata: ExecutionMetadata
+workflowVariables: Map
+pendingDynamicNodes: string[]
NodeExecutionOrchestrator
+executeNode(ctx, nodeId) : Promise<NodeResult>
AgentBlockHandler
+canHandle(block) : boolean
+execute(ctx, block, inputs) : Promise<BlockOutput>
-formatTools(tools)
-buildMessages(inputs)
-executeProviderRequest()
5. 核心函数解析
5.1 DAGBuilder.build() - DAG 编译核心 这是将用户定义的静态工作流编译成可执行 DAG 的核心过程,体现了'构造'而非'解释'的设计思想。
export class DAGBuilder {
build (workflow : SerializedWorkflow , triggerBlockId ?: string , savedIncomingEdges ?: Record <string , string []>): DAG {
const dag : DAG = {
nodes : new Map (),
loopConfigs : new Map (),
parallelConfigs : new Map ()
};
this .initializeConfigs (workflow, dag);
const reachableBlocks = this .pathConstructor .execute (workflow, triggerBlockId);
this .loopConstructor .execute (dag, reachableBlocks);
const { blocksInLoops, blocksInParallels, pauseTriggerMapping } = this .nodeConstructor .execute (workflow, dag, reachableBlocks);
this .edgeConstructor .execute (workflow, dag, blocksInParallels, blocksInLoops, reachableBlocks, pauseTriggerMapping);
if (savedIncomingEdges){
for (const [nodeId, incomingEdgeArray]of Object .entries (savedIncomingEdges)){
const node = dag.nodes .get (nodeId);
if (node){ node.incomingEdges = new Set (incomingEdgeArray);
}
}
return dag;
}
}
技术要点 :这是一个多阶段的编译器前端。pathConstructor 确保 DAG 只包含必要节点;loopConstructor 和 edgeConstructor 协同工作,将高级控制流语义'降低'为基本 DAG 结构;最后对 savedIncomingEdges 的处理,是支持状态恢复的关键,它直接修改了 DAG 节点的运行时依赖状态。
5.2 ExecutionEngine.run() - 执行调度核心 这是运行时的心脏,一个基于队列的异步调度器,负责驱动整个 DAG 的执行。
export class ExecutionEngine {
async run (triggerBlockId ?: string ): Promise <ExecutionResult > {
const startTime = Date .now ();
try {
this .initializeQueue (triggerBlockId);
while (this .hasWork ()){
await this .processQueue ();
}
await this .waitForAllExecutions ();
return { success : true , output : this .finalOutput , ...};
} catch (error) {
const executionResult : ExecutionResult = { success : false , ...};
if (error && typeof error === 'object' ){
(error as any ).executionResult = executionResult;
}
throw error;
}
}
private async processQueue (): Promise <void > {
while (this .readyQueue .length > 0 ){
const nodeId = this .dequeue ();
const promise = this .executeNodeAsync (nodeId);
this .trackExecution (promise);
}
if (this .executing .size > 0 ){
await this .waitForAnyExecution ();
}
}
private async executeNodeAsync (nodeId : string ): Promise <void > {
const wasAlreadyExecuted = this .context .executedBlocks .has (nodeId);
const result = await this .nodeOrchestrator .executeNode (this .context , nodeId);
if (!wasAlreadyExecuted){
await this .withQueueLock (async ()=>{
await this .handleNodeCompletion (nodeId, result.output , result.isFinalOutput );
});
}
}
private async handleNodeCompletion (nodeId : string , output : NormalizedBlockOutput , isFinalOutput : boolean ): Promise <void > {
const node = this .dag .nodes .get (nodeId);
if (output._pauseMetadata ){
this .pausedBlocks .set (pauseMetadata.contextId , output._pauseMetadata );
this .context .metadata .status = 'paused' ;
return ;
}
await this .nodeOrchestrator .handleNodeCompletion (this .context , nodeId, output);
const readyNodes = this .edgeManager .processOutgoingEdges (node, output, false );
this .addMultipleToQueue (readyNodes);
if (this .context .pendingDynamicNodes ?.length > 0 ){
this .addMultipleToQueue (this .context .pendingDynamicNodes );
this .context .pendingDynamicNodes = [];
}
}
}
队列驱动 :核心模型是持续消耗 readyQueue,并通过 edgeManager 的生产者 - 消费者模型。
并发控制 :通过 executing Set 和 Promise.race/Promise.all 管理并发度,实现自然的并行执行。
原子性更新 :withQueueLock 确保在处理节点完成、更新边状态、向队列添加新节点这一系列操作时,不会产生竞态条件。
暂停机制 :在 handleNodeCompletion 中优先检查 _pauseMetadata,一旦发现即停止推进,并构建暂停结果。这是一种'协作式'的中断。
5.3 AgentBlockHandler 中的工具格式化 展示了 Sim 如何将多样化的工具抽象为 LLM 可用的统一接口,这是其扩展性的关键。
private async formatTools (ctx : ExecutionContext , inputTools : ToolInput []): Promise <any []> {
const mcpTools : ToolInput [] = [];
const otherTools : ToolInput [] = [];
for (const tool of filtered){
if (tool.type === 'mcp' ){ mcpTools.push (tool); }
else { otherTools.push (tool); }
}
const otherResults = await Promise .all (otherTools.map (async (tool)=>{
if (tool.type === 'custom-tool' ){ return await this .createCustomTool (ctx, tool);
return this .transformBlockTool (ctx, tool);
}));
const mcpResults = await this .processMcpToolsBatched (ctx, mcpTools);
return [...otherResults,...mcpResults].filter (tool => tool != null );
}
private async createCustomTool (ctx : ExecutionContext , tool : ToolInput ): Promise <any > {
const toolId = `${AGENT.CUSTOM_TOOL_PREFIX} ${title} ` ;
return {
id : toolId,
name : schema.function .name ,
description : schema.function .description || '' ,
parameters : filteredSchema,
params : userProvidedParams,
usageControl : tool.usageControl || 'auto' ,
executeFunction : async (callParams : Record <string ,any >)=>{
const mergedParams = mergeToolParameters (userProvidedParams, callParams);
const result = await executeTool ('function_execute' , {
code,
mergedParams,
timeout : tool.timeout ,
envVars : ctx.environmentVariables || {},
workflowVariables : ctx.workflowVariables || {},
}, false , false , ctx);
if (!result.success ){ throw new Error (result.error ); }
return result.output ;
}
};
}
统一包装 :无论底层是代码片段、HTTP API 还是 MCP 协议,最终都包装成具有 executeFunction 方法的对象。
参数合并 :支持在工具配置时提供预设参数(params),并与 LLM 调用时产生的参数(callParams)安全合并。
上下文注入 :执行时自动注入工作流变量、环境变量等上下文(ctx),使工具能感知工作流状态。
MCP 优化 :processMcpToolsBatched 体现了对性能的考虑,通过按服务器分组发现工具,减少连接开销。
总结 Sim 项目的技术核心在于一个精心设计的 DAG 编译与执行引擎 。它成功地将可视化编程的易用性与底层执行引擎的严谨性(状态管理、并发控制、错误处理)结合起来。其架构清晰,通过构造器模式编译工作流,通过队列和 Promise 管理异步执行,通过统一的处理器接口集成多样化功能。代码中体现的'快照恢复'、'工具抽象'和'混合执行模式支持'是应对实际生产环境复杂性的关键设计,使其超越了简单的原型工具范畴,具备了支撑企业级 AI 应用的能力。
同类方案中,相较于 LangChain 等代码库,Sim 提供了更低门槛的可视化界面和开箱即用的状态管理;相较于 n8n 等通用自动化工具,它在 AI 智能体、工具集成和提示工程方面做了更深度的原生集成。其主要的考量点可能在于应对超大规模、高并发工作流时的性能优化,以及更复杂嵌套控制流的场景支持。
相关免费在线工具 RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
Mermaid 预览与可视化编辑 基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
随机西班牙地址生成器 随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online