可视化编排 + AI Copilot + 私有知识库:Sim如何打造下一代AI智能体开发平台?
simstudioai/sim:一个用于快速构建和部署AI智能体工作流的平台
1. 整体介绍
概要说明
项目地址:https://github.com/simstudioai/sim
项目简介:Sim 是一个基于有向无环图(DAG)的、声明式的低代码/无代码 AI 智能体工作流编排与执行引擎。它并非一个简单的“可视化工具”,其核心是一个高性能、支持复杂流程控制(如循环、并行、暂停恢复)的运行时系统。代码展示了其核心执行引擎、DAG 构建器和智能体(Agent)处理器。
面临问题、场景与人群:
- 问题:
- 编排复杂性:构建多步骤、具备条件判断、循环、并行及人工介入的 AI 应用逻辑复杂,传统代码编写工作量大,易出错。
- 状态管理困难:AI 工作流执行时间长、可能中断(如等待人工审批),需要可靠的持久化与恢复机制。
- 工具集成繁琐:需要将多种工具(API、函数、MCP 协议工具、自定义代码)统一封装,供 AI 智能体安全、便捷地调用。
- 开发效率瓶颈:从原型到生产部署周期长,需要兼顾可视化设计和底层执行性能。
- 目标人群:AI 应用开发者、产品经理、业务分析师,以及需要在业务流程中集成 AI 能力的企业团队。
- 核心场景:自动化客服、内容生成流水线、数据分析与报告、复杂的决策支持系统等需要串联多个 AI 步骤和人工审核的流程。
解决方法与演进:
- 传统方式:开发人员使用脚本(如 Python)硬编码流程逻辑,耦合度高,状态管理需自行实现,可视化与执行引擎分离。
- Sim 新方式:
- 声明式建模:将工作流抽象为 DAG,节点(Block)声明其功能,边(Edge)声明依赖关系。执行引擎负责解析 DAG 并调度。
- 可视化即代码:画布操作直接生成可序列化的 DAG 描述(
SerializedWorkflow),设计与执行使用同一套数据模型。 - 内置复杂控制流:通过
LoopConstructor,ParallelConstructor等将高级语义(循环、并行)编译为底层 DAG 结构。 - 执行快照与恢复:
ExecutionEngine支持将运行时状态(如pendingBlocks,remainingEdges)序列化,实现“暂停/恢复”。
商业价值预估:
- 代码成本估算:构建一个具备类似 DAG 执行引擎、状态恢复、多工具集成能力的系统,需要一个 5-8 人的资深前端与后端团队约 9-12 个月,粗略人力成本在数百万人民币级别。
- 覆盖问题空间效益:Sim 通过提供标准化平台,将上述成本分摊给大量用户。它解决了 AI 应用开发中的 “编排”、“集成”、“部署” 三大核心痛点,覆盖了从原型验证到生产部署的全链路。其商业价值在于显著降低企业引入 AI 自动化的技术门槛和时间成本,潜在市场涵盖金融、教育、电商、客服等多个行业。其开源+托管模式,有助于快速建立生态,获取早期用户与场景反馈。
2. 详细功能拆解(产品+技术视角)
- 可视化 DAG 编辑器(产品层)
- 产品视角:用户通过拖拽 Block、连线来设计工作流,直观表达“先做什么,后做什么,在什么条件下分支”。
- 技术支撑:前端组件生成
SerializedWorkflow数据。核心是后端的DAGBuilder.build()方法,它将用户友好的序列化数据,编译(Construction)为内部可执行的DAG图结构,处理了循环展开、并行路径识别等复杂转换。
- 高性能 DAG 执行引擎(技术层)
- 产品视角:用户点击“运行”,工作流自动执行,支持实时日志、中途暂停。
- 技术支撑:
ExecutionEngine类是核心。它管理一个就绪队列(readyQueue)和正在执行的 Promise 集合(executing)。其run()方法循环检查“是否有工作”(hasWork()),从队列中取出就绪节点,通过NodeExecutionOrchestrator异步执行,并根据边条件(EdgeManager)将下游节点加入队列,实现并行推进。
- 状态持久化与恢复(技术层)
- 产品视角:工作流可以暂停(如等待人工审批),之后从断点继续执行,状态不丢失。
- 技术支撑:这是关键技术。
ExecutionEngine在initializeQueue()时检查是否有来自快照的pendingBlocks或remainingEdges,并据此恢复队列和节点依赖状态。AgentBlockHandler中的_pauseMetadata和buildPausedResult()方法共同实现了暂停点的创建与快照生成。
- 多工具集成与智能体调度(技术层)
- 产品视角:在 Agent Block 中可配置 LLM 模型、提示词,并勾选各种工具(数据库查询、API、自定义函数)供其调用。
- 技术支撑:
AgentBlockHandler是复杂度最高的处理器之一。它负责:
a. 工具格式化:将配置的工具(ToolInput)统一格式化为 LLM 可识别的工具调用模式(formatTools)。
b. MCP 工具处理:支持 Model Context Protocol 工具,包含缓存 Schema、服务发现、批量处理等优化(processMcpToolsBatched)。
c. 上下文构建:整合系统提示、用户输入、记忆(Memory)服务,构建完整的对话历史(buildMessages)。
d. 供应商抽象:将请求统一发送给executeProviderRequest,屏蔽不同 AI 供应商(OpenAI, Anthropic, 本地 Ollama 等)的 API 差异。
3. 技术难点挖掘
- 动态 DAG 的构建与执行:如何将用户定义的循环、并行等高级结构,正确无误地编译为静态 DAG,并确保执行时的语义正确性(如循环的迭代、并发的扇入扇出)。
- 有状态工作流的暂停与精确恢复:如何在任意节点(特别是并行分支中)暂停,并序列化所有中间状态(变量、未完成的边、待执行节点队列),保证恢复后执行结果的一致性。
- 工具调用的安全性与性能:如何安全地执行用户自定义代码(Function Block)、动态发现和调用外部工具(MCP),同时管理连接、超时和错误,并避免对 LLM 请求造成性能瓶颈。
- 混合执行模式的支持:需要同时支持非流式响应、流式响应(Streaming),并在浏览器和服务器两种环境下都能稳定工作,对网络通信和错误处理要求高。
4. 详细设计图
4.1 核心架构图 (Component Diagram)
4.2 核心链路序列图:工作流执行 (Sequence Diagram)
AI ProviderAgentBlockHandlerNodeOrchestratorDAGExecutionEngine用户AI ProviderAgentBlockHandlerNodeOrchestratorDAGExecutionEngine用户alt[需要调用工具]loop[处理就绪队列]run(workflowId)initializeQueue()get ready nodes[node1]executeNode(node)execute(ctx, block, inputs)formatTools(), buildMessages()携带工具的LLM请求响应(可能含工具调用)executeFunction(toolCall)工具执行结果发送工具结果最终响应BlockOutputhandleNodeCompletion(output)edgeManager.processOutgoingEdges()获取新的就绪节点[node2, node3...]ExecutionResult
4.3 核心类图 (Class Diagram)
构建
持有
持有
委托执行
路由到
DAGBuilder
-pathConstructor
-loopConstructor
-nodeConstructor
-edgeConstructor
+build(SerializedWorkflow, triggerBlockId) : DAG
DAG
+nodes: Map<string, DAGNode>
+loopConfigs: Map<string, SerializedLoop>
DAGNode
+id: string
+block: SerializedBlock
+incomingEdges: Set<string>
+outgoingEdges: Map<string, DAGEdge>
+metadata: NodeMetadata
ExecutionEngine
-readyQueue: string[]
-executing: Set<Promise>
-context: ExecutionContext
-dag: DAG
+run(triggerBlockId) : Promise<ExecutionResult>
-processQueue()
-executeNodeAsync(nodeId)
-handleNodeCompletion()
-buildPausedResult()
ExecutionContext
+executedBlocks: Set<string>
+blockLogs: any[]
+metadata: ExecutionMetadata
+workflowVariables: Map
+pendingDynamicNodes: string[]
NodeExecutionOrchestrator
+executeNode(ctx, nodeId) : Promise<NodeResult>
AgentBlockHandler
+canHandle(block) : boolean
+execute(ctx, block, inputs) : Promise<BlockOutput>
-formatTools(tools)
-buildMessages(inputs)
-executeProviderRequest()
5. 核心函数解析
5.1 DAGBuilder.build() - DAG 编译核心
这是将用户定义的静态工作流编译成可执行 DAG 的核心过程,体现了“构造”而非“解释”的设计思想。
// apps/sim/executor/dag/builder.ts - 简化伪代码exportclassDAGBuilder{build(workflow: SerializedWorkflow, triggerBlockId?:string, savedIncomingEdges?: Record<string,string[]>):DAG{const dag:DAG={ nodes:newMap(), loopConfigs:newMap(), parallelConfigs:newMap()};// 阶段1:初始化配置this.initializeConfigs(workflow, dag);// 提取循环、并行配置// 阶段2:路径分析const reachableBlocks =this.pathConstructor.execute(workflow, triggerBlockId);// 关键:基于触发点或起点,计算工作流中实际可达的所有区块,实现“按需执行”。// 阶段3:循环结构展开this.loopConstructor.execute(dag, reachableBlocks);// 关键:将逻辑上的循环节点,在DAG中展开为实际的节点和边结构,为执行引擎创造明确的路径。// 阶段4:节点实例化与元数据标记const{ blocksInLoops, blocksInParallels, pauseTriggerMapping }=this.nodeConstructor.execute(workflow, dag, reachableBlocks);// 关键:为每个SerializedBlock创建对应的DAGNode,并标记其元数据(如是否在循环内、是否为恢复触发点等)。// 阶段5:边构建this.edgeConstructor.execute(workflow, dag, blocksInParallels, blocksInLoops, reachableBlocks, pauseTriggerMapping);// 关键:根据区块间的连接关系,创建DAGEdge,并可能根据循环/并行上下文调整边的源和目标。// 阶段6:状态恢复(如从快照恢复)if(savedIncomingEdges){for(const[nodeId, incomingEdgeArray]of Object.entries(savedIncomingEdges)){const node = dag.nodes.get(nodeId);if(node){ node.incomingEdges =newSet(incomingEdgeArray);// 恢复节点的依赖边状态}}}return dag;}}技术要点:这是一个多阶段的编译器前端。pathConstructor 确保 DAG 只包含必要节点;loopConstructor 和 edgeConstructor 协同工作,将高级控制流语义“降低”为基本 DAG 结构;最后对 savedIncomingEdges 的处理,是支持状态恢复的关键,它直接修改了 DAG 节点的运行时依赖状态。
5.2 ExecutionEngine.run() - 执行调度核心
这是运行时的心脏,一个基于队列的异步调度器,负责驱动整个 DAG 的执行。
// apps/sim/executor/execution/engine.ts - 简化伪代码exportclassExecutionEngine{asyncrun(triggerBlockId?:string):Promise<ExecutionResult>{const startTime = Date.now();try{// 1. 队列初始化:可能从快照恢复,或从触发点/起点开始this.initializeQueue(triggerBlockId);// 2. 主调度循环:只要有待处理工作就继续while(this.hasWork()){// 检查 readyQueue 或 executingawaitthis.processQueue();// 处理就绪节点,并等待任意一个执行完成}// 3. 收尾:等待所有异步执行彻底结束awaitthis.waitForAllExecutions();// 4. 构建成功结果return{ success:true, output:this.finalOutput,...};}catch(error){// 5. 错误处理:包装错误信息,保留执行上下文const executionResult: ExecutionResult ={ success:false,...};if(error &&typeof error ==='object'){(error asany).executionResult = executionResult;// 将结果附加到原始错误上,便于调试}throw error;// 重新抛出,由上层捕获}}privateasyncprocessQueue():Promise<void>{// 批量执行所有当前就绪的节点(实现隐式并行)while(this.readyQueue.length >0){const nodeId =this.dequeue();const promise =this.executeNodeAsync(nodeId);// 异步执行,不等待this.trackExecution(promise);// 加入执行跟踪集合}// 关键:等待至少一个正在执行的任务完成,以释放资源并可能产生新的就绪节点if(this.executing.size >0){awaitthis.waitForAnyExecution();// 使用 Promise.race}}privateasyncexecuteNodeAsync(nodeId:string):Promise<void>{const wasAlreadyExecuted =this.context.executedBlocks.has(nodeId);const result =awaitthis.nodeOrchestrator.executeNode(this.context, nodeId);if(!wasAlreadyExecuted){// 关键:节点完成后的回调,需要加锁(queueLock)确保状态更新原子性awaitthis.withQueueLock(async()=>{awaitthis.handleNodeCompletion(nodeId, result.output, result.isFinalOutput);});}}privateasynchandleNodeCompletion(nodeId:string, output: NormalizedBlockOutput, isFinalOutput:boolean):Promise<void>{const node =this.dag.nodes.get(nodeId);// 关键点1: 检查暂停if(output._pauseMetadata){this.pausedBlocks.set(pauseMetadata.contextId, output._pauseMetadata);this.context.metadata.status ='paused';return;// 不再处理下游,工作流进入暂停状态}// 关键点2: 处理节点输出(如变量赋值)awaitthis.nodeOrchestrator.handleNodeCompletion(this.context, nodeId, output);// 关键点3: 边处理,激活下游const readyNodes =this.edgeManager.processOutgoingEdges(node, output,false);this.addMultipleToQueue(readyNodes);// 下游节点进入就绪队列// 关键点4: 处理动态节点(如并行展开新增的节点)if(this.context.pendingDynamicNodes?.length >0){this.addMultipleToQueue(this.context.pendingDynamicNodes);this.context.pendingDynamicNodes =[];}}}技术要点:
- 队列驱动:核心模型是持续消耗
readyQueue,并通过edgeManager的生产者-消费者模型。 - 并发控制:通过
executingSet 和Promise.race/Promise.all管理并发度,实现自然的并行执行。 - 原子性更新:
withQueueLock确保在处理节点完成、更新边状态、向队列添加新节点这一系列操作时,不会产生竞态条件。 - 暂停机制:在
handleNodeCompletion中优先检查_pauseMetadata,一旦发现即停止推进,并构建暂停结果。这是一种“协作式”的中断。
5.3 AgentBlockHandler 中的工具格式化
展示了 Sim 如何将多样化的工具抽象为 LLM 可用的统一接口,这是其扩展性的关键。
// apps/sim/executor/handlers/agent/agent-handler.ts - 节选privateasyncformatTools(ctx: ExecutionContext, inputTools: ToolInput[]):Promise<any[]>{// ... 过滤 usageControl 等 ...const mcpTools: ToolInput[]=[];const otherTools: ToolInput[]=[];// 1. 分类工具for(const tool of filtered){if(tool.type ==='mcp'){ mcpTools.push(tool);}else{ otherTools.push(tool);}}// 2. 并行处理非MCP工具(自定义工具、内置工具转换)const otherResults =awaitPromise.all(otherTools.map(async(tool)=>{if(tool.type ==='custom-tool'){returnawaitthis.createCustomTool(ctx, tool);// 创建可执行函数}returnthis.transformBlockTool(ctx, tool);// 转换内置工具(如API块)}));// 3. 批量处理MCP工具(性能优化)const mcpResults =awaitthis.processMcpToolsBatched(ctx, mcpTools);// 4. 合并并返回return[...otherResults,...mcpResults].filter(tool => tool !=null);}privateasynccreateCustomTool(ctx: ExecutionContext, tool: ToolInput):Promise<any>{// ... 获取schema和code ...const toolId =`${AGENT.CUSTOM_TOOL_PREFIX}${title}`;return{ id: toolId, name: schema.function.name, description: schema.function.description ||'', parameters: filteredSchema,// 过滤掉已由params提供的参数 params: userProvidedParams,// 预设参数 usageControl: tool.usageControl ||'auto',// 关键:定义执行函数,在LLM调用时被执行executeFunction:async(callParams: Record<string,any>)=>{const mergedParams =mergeToolParameters(userProvidedParams, callParams);const result =awaitexecuteTool('function_execute',{ code,// 用户定义的函数代码...mergedParams, timeout: tool.timeout, envVars: ctx.environmentVariables ||{}, workflowVariables: ctx.workflowVariables ||{},},false,false, ctx);if(!result.success){thrownewError(result.error);}return result.output;}};}技术要点:
- 统一包装:无论底层是代码片段、HTTP API 还是 MCP 协议,最终都包装成具有
executeFunction方法的对象。 - 参数合并:支持在工具配置时提供预设参数(
params),并与 LLM 调用时产生的参数(callParams)安全合并。 - 上下文注入:执行时自动注入工作流变量、环境变量等上下文(
ctx),使工具能感知工作流状态。 - MCP 优化:
processMcpToolsBatched体现了对性能的考虑,通过按服务器分组发现工具,减少连接开销。
总结
Sim 项目的技术核心在于一个精心设计的 DAG 编译与执行引擎。它成功地将可视化编程的易用性与底层执行引擎的严谨性(状态管理、并发控制、错误处理)结合起来。其架构清晰,通过构造器模式编译工作流,通过队列和 Promise 管理异步执行,通过统一的处理器接口集成多样化功能。代码中体现的“快照恢复”、“工具抽象”和“混合执行模式支持”是应对实际生产环境复杂性的关键设计,使其超越了简单的原型工具范畴,具备了支撑企业级 AI 应用的能力。
同类方案中,相较于 LangChain 等代码库,Sim 提供了更低门槛的可视化界面和开箱即用的状态管理;相较于 n8n 等通用自动化工具,它在 AI 智能体、工具集成和提示工程方面做了更深度的原生集成。其主要的考量点可能在于应对超大规模、高并发工作流时的性能优化,以及更复杂嵌套控制流的场景支持。