【LangChain1.0】第一阶段:架构全景、Runnable 协议与 LCEL 声明式语法解析

第一阶段:架构全景、Runnable 协议与 LCEL 声明式语法解析

版本要求: 本教程基于 LangChain 1.0.7+、LangGraph 1.0.3+、Python 3.10+
更新日期: 2025-12

📋 前置准备

环境配置

在开始学习之前,请确保完成以下环境配置:

1. Python 版本
python --version # 需要 Python 3.10 或更高版本
2. 安装依赖
# 使用 pip 安装最新版本 pip install langchain langchain-openai langgraph langchain-community # 或使用 uv (推荐) uv pip install langchain langchain-openai langgraph langchain-community # 如需指定版本(推荐使用1.0.7或更高版本) pip install langchain>=1.0.7 langchain-openai>=1.0.3 langgraph>=1.0.3 
3. 环境变量配置
# 创建 .env 文件 OPENAI_API_KEY=sk-your-api-key-here LANGSMITH_API_KEY=your-langsmith-key # 可选,用于监控 LANGSMITH_TRACING=true # 可选# 在代码中加载from dotenv import load_dotenv import os load_dotenv()# 验证环境变量 required_vars =["OPENAI_API_KEY"]for var in required_vars:ifnot os.getenv(var):raise EnvironmentError(f"缺少必需的环境变量: {var}")
4. 依赖版本清单
# pyproject.toml 推荐配置 [tool.poetry.dependencies] python = "^3.10" langchain = "^1.0.7" langchain-openai = "^1.0.3" langgraph = "^1.0.3" langchain-community = "^0.3.0" langchain-core = "^1.0.7" langsmith = "^0.4.43" python-dotenv = "^1.0.0" # requirements.txt 格式 # langchain>=1.0.7 # langchain-openai>=1.0.3 # langgraph>=1.0.3 # langchain-community>=0.3.0 # langchain-core>=1.0.7 # langsmith>=0.4.43 # python-dotenv>=1.0.0 

前置知识

建议具备以下基础知识:

  • ✅ Python 基础 (async/await、类型注解、装饰器)
  • ✅ LLM 基本概念 (Prompt、Token、Temperature等)
  • ✅ API 调用基础
  • ✅ JSON 数据格式

第1章:LangChain 生态全景


1.1 架构层次关系

应用层
Deep Agents / LangGraph Projects

编排层
LangGraph

链路层
LangChain / LCEL

监控层
LangSmith

外部资源
Models / APIs / Tools

LangChain 生态系统目前已形成“多层协同”的架构体系,既可支持快速原型开发,也可支撑生产级 LLM 应用。整体结构如下:

层级核心组件职责定位典型场景
应用层Deep Agents / LangGraph Projects复杂自治 Agent、长期运行、多 Agent 协作智能助手、自动化任务系统
编排层LangGraph状态化流程控制、节点执行、分支循环多 Agent 编排、可视化状态流
链路层LangChain / LCEL模型调用、提示管理、工具集成RAG、问答、对话
监控层LangSmith调试、观测、评估、成本追踪DevOps、Evals、质量监控
1.1.1 LangChain 与 LangGraph 的关系

LangChain 专注于 链式逻辑与 Agent 封装;LangGraph 专注于 流程编排与状态管理

  • LangChain: 用于构建单条或线性 chain(Prompt→Model→Tool→Output)。
  • LangGraph: 用于管理含分支、循环、并发的复杂流程(可视化、持久化状态)。
  • 二者可并用:LangGraph 中的节点可运行 LangChain 或 LCEL 构造的 chain。
图 1-2 LangChain 与 LangGraph 协作关系图

LangChain Chain
(Prompt→Model→Tool→Output)

LangGraph Node

LangGraph Flow
(多节点编排 / 状态持久化)

LangGraph Studio(可视化与监控)

1.1.2 如何构建 Agent

LangChain 1.0+ 提供统一的 Agent 构建接口:create_agent

快速开始:创建你的第一个 Agent

from langchain.agents import create_agent from langchain_openai import ChatOpenAI from langchain_core.tools import tool # 步骤1: 定义工具@tooldefget_weather(city:str)->str:"""获取指定城市的天气"""returnf"{city}今天天气晴朗,温度25°C"@tooldefcalculate(expression:str)->str:"""计算数学表达式"""try: result =eval(expression)returnf"计算结果: {result}"except Exception as e:returnf"计算错误: {str(e)}"# 步骤2: 创建 Agent agent = create_agent( model=ChatOpenAI(model="gpt-4"), tools=[get_weather, calculate], system_prompt="你是一个有帮助的助手,可以查询天气和进行计算。")# 步骤3: 运行 Agent result = agent.invoke({"messages":[("user","北京天气如何?另外帮我算一下 25 * 4")]})# 查看结果print(result["messages"][-1].content)

输出示例

北京今天天气晴朗,温度25°C。 25 * 4 的计算结果是 100。 

核心概念

create_agent 的工作原理

用户输入

Agent 接收

LLM 分析

需要工具?

调用工具

获取结果

生成回复

返回用户

关键参数说明

参数类型必需说明
modelChatModel | str使用的语言模型
toolsList[Tool]可用的工具列表
system_promptstr系统提示词,定义 Agent 行为
checkpointerCheckpointer状态持久化(用于多轮对话)
interrupt_beforeList[str]在指定节点前暂停(需要人工确认)
interrupt_afterList[str]在指定节点后暂停

完整工作流程

  1. 模型绑定:指定使用的 LLM(如 GPT-4、Claude 等)
  2. 工具注册:提供 Agent 可调用的工具集合
  3. 提示配置:通过 system_prompt 定义 Agent 的角色和行为
  4. 决策执行:LLM 基于 ReAct 模式自动决定是否调用工具
  5. 结果返回:自动组合工具输出和 LLM 回复
  6. 监控追踪:集成 LangSmith 实现全链路追踪

关键特性

  • 官方推荐:LangChain 1.0+ 标准 API
  • 简洁易用:统一的接口,3步即可创建 Agent
  • 完整功能:支持 middleware、cache、checkpointer
  • 自动工具调用:LLM 自动判断何时使用哪个工具
  • 多轮对话:支持状态持久化,实现上下文记忆
  • 长期支持:官方维护,持续更新

Prompt Template

LLM / ChatModel

Tool Selection

Tool Execution

Parser / Output Formatter

返回结果

LangSmith Callback / Tracing


1.1.3 LCEL 的定位与作用

LCEL(LangChain Expression Language)是 LangChain 的“声明式组合语法”,用于 构建可并行、可流式、可追踪的 Runnable 链

  • 核心概念:
    • RunnableSequence 顺序执行;
    • RunnableParallel 并行执行;
    • 支持 async / stream / batch 统一调用;
    • 可直接嵌入 LangGraph 节点。
  • 价值: 在代码层面构建“数据流管线”,如同 Node-RED 或 Airflow 的轻量化实现。

输入数据

RunnableSequence(顺序执行)

RunnableParallel(并行执行)

模型推理 / 工具调用

流式输出 / 结构化解析

1.1.4 LangSmith 的监控职责

LangSmith 是 LangChain 官方推出的可观测性与质量评估平台。

主要职责:

  • 🔍 Tracing :追踪 Chain/Graph/Agent 每个调用节点。
  • 📈 Metrics :监控延迟、Token 用量、错误率、成本。
  • 🧪 Evaluation :对模型或 Agent 输出进行打分与对比。
  • ⚙️ Integration :与 LangChain 、LangGraph 、Deep Agents 原生集成。

LangSmith

Tracing
链路追踪

Metrics
性能&成本

Evals
模型评估

Dashboard / Report

开发者 / 团队协作

1.2 核心设计理念

LangChain Design

Provider-Agnostic

多模型兼容

快速切换

Runnable Protocol

统一接口

可组合执行

Middleware Driven

Hook/Callback

Metrics/Retry

Production First

稳定性

可观测性

成本控制

1.2.1 Provider-Agnostic 设计

LangChain 通过统一接口屏蔽 LLM 提供商差异(OpenAI、Anthropic、Cohere、Azure 等),
以 “Provider 无关” 的方式构建应用。

  • 模型切换无需修改上层逻辑。
  • 支持跨平台成本追踪与性能比较。
1.2.2 Runnable Protocol 统一抽象

Runnable 是 LangChain 的核心执行协议:

一切皆 Runnable。

包括 Chain、Agent、Tool、Prompt 均实现该接口。

  • 统一执行入口:invoke()ainvoke()stream()
  • 支持异步、批量、流式、可追踪调用。
  • 所有 Runnable 可嵌套、组合、装饰。

Runnable

Chain

Agent

Tool

Prompt

LCEL 组合结构

1.2.3 Middleware-Driven 架构

LangChain 支持 Callback / Hook / Tracing 机制,可在执行流中插入中间件。

常见中间件用途:

  • Token 计数与成本监控
  • 日志与错误追踪
  • 安全审查与访问控制
  • 重试与超时控制

LangSmithMiddlewareChain/AgentUserLangSmithMiddlewareChain/AgentUser调用执行进入中间件 (token计数/日志)上报监控数据返回监控结果执行主流程返回输出结果

1.2.4 Production-First 理念

LangChain 1.0 及 LangGraph 1.0 发布后,生态全面转向 生产级稳定性与可观测性
核心目标包括:

  • 长期兼容(向 2.0 平滑过渡)
  • 成本可控(LangSmith 监控 + 自动计费)
  • 模型热替换(Provider-agnostic)
  • 完整 CI/CD 与 Evals 集成

开发阶段
LangChain Prototype

测试阶段
LangSmith 调试

部署阶段
LangGraph 编排

监控阶段
Metrics / Evals

持续优化
模型&提示调整

1.3 技术选型决策树

应用需求评估

流程是否复杂?

使用 create_agent
LangChain 快速原型

是否需要状态管理?

使用 LCEL 构建 chain

是否为长期运行/自治?

使用 LangGraph 编排

使用 Deep Agents
结合 LangSmith 监控

1.3.1 何时使用 create_agent

适用场景:

  • 单 Agent 执行,流程线性;
  • 需要快速实现 Tool 调用;
  • 用于 RAG 、问答、助手类场景。

1.3.2 何时深入 LangGraph

适用场景:

  • 多 Agent 协作;
  • 存在分支 / 循环 / 状态管理;
  • 需可视化、可调试、持久化运行。

1.3.3 何时使用 Deep Agents

适用场景:

  • 长期运行、自主决策 Agent;
  • 复杂任务拆解、子 Agent 管理;
  • 持续任务执行与周期性触发。

1.3.4 何时需要 Middleware

适用场景:

  • 生产环境运行;
  • 需要日志、指标、安全控制、回调。
    推荐:所有 Chain/Agent 均启用 LangSmith Tracing + 自定义 Callback。

1.3.5 典型应用场景分析

场景推荐技术理由
A. 企业文档问答create_agent + LCEL快速构建 RAG 问答
B. 智能客服系统LangChain Agent + Middleware需多轮对话与监控
C. 自动化任务管理LangGraph + Deep Agents + LangSmith复杂 workflow + 自治 agent
D. 内容摘要或转换LCEL轻量、高并行、可流式

本章小结

LangChain 生态体系可概括为:

链式逻辑(LangChain) → 图式编排(LangGraph) → 监控评估(LangSmith) → 自治进化(Deep Agents)

核心理念:

  • Provider-Agnostic
  • Runnable 统一抽象
  • Middleware 可插架构
  • Production-First 部署思维

设计哲学上,从“玩具原型”走向“生产可观测”的工程系统。

思考与练习

1. 练习 1:业务场景技术选型决策路径

业务场景: 企业级智能客服系统(基于私有知识库的 RAG 系统)

技术选型决策路径图
  1. 数据接入层 (Ingestion)
    • 数据源: PDF 还是 Wiki? →\rightarrow→决定 Loader (PyPDFLoader vs. ConfluenceLoader)
    • 更新频率: 实时还是批处理? →\rightarrow→决定 Indexing API 策略
  2. 处理层 (Transformation)
    • 文本分割: 语义分割还是字符分割? →\rightarrow→选 RecursiveCharacterTextSplitter
    • 向量化: 开源 (BGE) 还是闭源 (OpenAI)? →\rightarrow→平衡成本与隐私
  3. 存储与检索层 (Retrieval)
    • 向量库: 毫秒级搜索 (Pinecone) 还是本地部署 (Chroma/Milvus)?
    • 检索策略: 向量搜索还是混合搜索 (Hybrid Search)? →\rightarrow→引入 BM25 回排
  4. 应用层 (Augmentation & Generation)
    • 对话管理: 是否需要长短期记忆? →\rightarrow→选 LangGraph Checkpointer
    • 推理模型: 复杂任务 (GPT-4o) 还是简单总结 (Llama-3-8B)?
2. 练习 2:LCEL 例程与 Middleware 插入点

这是一个标准的 LCEL 链式调用,模拟“根据关键词搜索并生成专业报告”的过程。

from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnablePassthrough # 1. 定义组件 model = ChatOpenAI(model="gpt-4o") prompt = ChatPromptTemplate.from_template("请针对以下主题编写一份专业报告: {topic}") parser = StrOutputParser()# 2. 构建 LCEL 链# 插入点 A: 在这里插入 Logging Middleware (Log输入参数) chain =({"topic": RunnablePassthrough()}| prompt # 插入点 B: 在这里插入 Retry/Fallback (容错处理)| model.with_retry(stop_after_attempt=3)# 插入点 C: 在这里插入 Output Validator (检查输出合法性)| parser )# 3. Middleware 标注说明# - A. Input Logger: 用于在 LangSmith 中记录原始用户的 Query。# - B. Fallback/Retry: 当 LLM 达到 Rate Limit 或 API 报错时,自动切换到备份模型或重试。# - C. Config/Callbacks: 插入 tracer 或 token 计数器。
3. 练习 3:长期运行 Agent 设计 (市场监控系统)
架构设计 (LangGraph + LangSmith)
A. LangGraph 流程节点
  1. Start 节点:初始化监控清单(行业关键词、竞争对手)。
  2. Search 节点:调用搜索工具获取最新新闻/数据。
  3. Analyze 节点:LLM 评估数据价值(是否有重要异动)。
  4. Router (条件分支)
    • 如果有价值 →\rightarrow→Report 节点(生成摘要并推送)。
    • 如果无价值 →\rightarrow→Sleep 节点(进入等待状态)。
  5. End 节点:归档当次扫描日志。
B. 监控与 Evals (LangSmith 实现)
  • 状态追踪 (Traces):通过 LangSmith 监控 Agent 在 SearchAnalyze 节点的延迟,识别哪个环节最耗时。
  • 持久化 (Persistence):利用 LangGraph 的 checkpointer 将 Agent 的状态存储在数据库中,确保即使服务器重启,市场监控任务也能从中断点恢复。
  • 评估 (Evals)
    • 单元测试:针对 Analyze 节点,构建一个包含“真实异动”和“噪音数据”的数据集。
    • 反馈回路:在推送报告后,收集用户“有用/无用”的点赞反馈,并同步到 LangSmith 标注集中,用于微调 Prompt。
4. 思考题:LCEL 嵌套在 LangGraph 节点中的优势与代价
优势 (Advantages)
  1. 细粒度控制:在 LangGraph 的单个节点内,你可以使用 LCEL 快速构建复杂的“小链条”(如:Prompt →\rightarrow→ LLM →\rightarrow→ Parser),保持节点代码的简洁。
  2. 标准接口:LCEL 组件原生支持流式输出 (Streaming) 和异步调用 (Async),这使得 LangGraph 节点能自动继承这些特性。
  3. 高度可组合性:可以轻松地在不同节点间复用相同的 LCEL 片段(例如:通用的格式化器)。
代价 (Trade-offs)
  1. 调试深度增加:嵌套过深会导致 LangSmith 中的 Trace 层级非常复杂,排查报错时需要层层展开。
  2. 状态管理混乱:LCEL 倾向于隐式传递数据,而 LangGraph 强调显式的 State 更新。如果在节点内过度使用 LCEL 修改数据,可能会导致全局状态更新不透明。
  3. 性能开销:每一层 LCEL 封装都会带来微小的运行抽象开销,在对延迟极其敏感的场景下需谨慎。

第2章:核心抽象:Runnable 与 LCEL

2.1 Runnable Protocol

2.1.1 为什么需要统一抽象

在 LangChain 1.0 之前,不同组件(Prompt、Model、Tool、Chain)的调用方式各不相同,导致:

  • 接口不一致:学习成本高,难以组合
  • 缺乏标准化:无法统一追踪、监控
  • 组合困难:不同组件难以嵌套使用

Runnable Protocol 解决方案

LangChain 1.0 引入 Runnable 作为统一执行协议,所有组件均实现该接口:

from langchain_core.runnables import Runnable # 所有组件均实现 Runnable 接口classRunnable:definvoke(self,input, config=None):...# 同步调用defainvoke(self,input, config=None):...# 异步调用defstream(self,input, config=None):...# 流式输出defastream(self,input, config=None):...# 异步流式defbatch(self, inputs, config=None):...# 批量处理

核心优势

Runnable 统一抽象

一致的调用方式

可组合性

可追踪性

自动优化

降低学习成本

LCEL 管道

LangSmith 集成

批处理/并行

实际应用示例

from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI from langchain_core.output_parsers import StrOutputParser # 所有组件都是 Runnable prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}") model = ChatOpenAI() parser = StrOutputParser()# 统一的调用方式 result = prompt.invoke({"topic":"AI"}) result = model.invoke("Tell me a joke") result = parser.invoke("some text")

2.1.2 核心方法:invoke、stream、batch

invoke() - 同步调用

最基础的调用方式,适用于单次请求:

from langchain_openai import ChatOpenAI model = ChatOpenAI(model="gpt-4")# 同步调用 response = model.invoke("What is LangChain?")print(response.content)

执行流程

LangSmithLLMRunnableUserLangSmithLLMRunnableUserinvoke(input)开始追踪发送请求返回完整结果记录结果返回输出

stream() - 流式输出

适用于需要实时反馈的场景(如聊天界面):

# 流式输出for chunk in model.stream("Tell me a long story"):print(chunk.content, end="", flush=True)

流式输出的优势

  • ✅ 降低首字延迟(TTFT - Time To First Token)
  • ✅ 提升用户体验(实时显示)
  • ✅ 减少超时风险

User Request

Stream Token 1

Stream Token 2

Stream Token 3

...

Stream Complete

batch() - 批量处理

适用于批量请求场景,自动优化并发:

# 批量处理(自动并发优化) inputs =["What is AI?","What is ML?","What is LLM?"] results = model.batch(inputs)for result in results:print(result.content)

批量处理的优势

  • ✅ 自动并发控制
  • ✅ 成本追踪聚合
  • ✅ 错误处理优化

Batch Inputs

并发控制器

Request 1

Request 2

Request 3

结果聚合

Batch Results

abatch() - 异步批量处理

在需要高并发处理大量请求时,abatch() 比同步 batch() 性能更好:

import asyncio from langchain_openai import ChatOpenAI model = ChatOpenAI()asyncdefasync_batch_example(): inputs =["What is AI?","What is ML?","What is LLM?","What is NLP?","What is DL?"]# 异步批量处理 results =await model.abatch(inputs)for i, result inenumerate(results):print(f"Result {i+1}: {result.content}")# 运行异步任务 asyncio.run(async_batch_example())

abatch 与 batch 的对比

方法适用场景优势
batch()中小批量(<50)实现简单,无需async/await
abatch()大批量(50+)、I/O密集更高并发性能,资源利用率高

2.1.3 异步方法:ainvoke、astream

在高并发场景下,异步方法可显著提升性能:

ainvoke() - 异步调用

import asyncio from langchain_openai import ChatOpenAI model = ChatOpenAI()asyncdefmain():# 异步调用 response =await model.ainvoke("What is async programming?")print(response.content) asyncio.run(main())

astream() - 异步流式

asyncdefstream_example():asyncfor chunk in model.astream("Tell me a story"):print(chunk.content, end="", flush=True) asyncio.run(stream_example())

并发性能对比

# ❌ 同步方式(串行执行,慢)defsync_batch(): results =[]for query in queries: results.append(model.invoke(query))return results # ✅ 异步方式(并发执行,快)asyncdefasync_batch(): tasks =[model.ainvoke(query)for query in queries]returnawait asyncio.gather(*tasks)

性能对比

请求数同步耗时异步耗时性能提升
1030s5s6x
50150s15s10x
100300s25s12x

2.1.4 Runnable 类型:Lambda、Parallel、Branch、Fallbacks

RunnableLambda - 自定义函数包装

将普通 Python 函数包装为 Runnable:

from langchain_core.runnables import RunnableLambda defuppercase(text:str)->str:return text.upper()# 包装为 Runnable runnable_upper = RunnableLambda(uppercase)# 统一调用方式 result = runnable_upper.invoke("hello")# "HELLO"

RunnableParallel - 并行执行

同时执行多个 Runnable,结果以字典形式返回:

from langchain_core.runnables import RunnableParallel parallel = RunnableParallel( joke=ChatPromptTemplate.from_template("Tell a joke about {topic}")| model, poem=ChatPromptTemplate.from_template("Write a poem about {topic}")| model )# 并行执行 result = parallel.invoke({"topic":"AI"})print(result["joke"])print(result["poem"])

Input: topic='AI'

RunnableParallel

Joke Generator

Poem Generator

Result Dict

Output

RunnableBranch - 条件分支

根据条件选择不同的执行路径:

from langchain_core.runnables import RunnableBranch branch = RunnableBranch((lambda x:len(x)>100, long_text_handler),(lambda x:len(x)>10, medium_text_handler), short_text_handler # 默认分支) result = branch.invoke("some text")

Yes

No

Yes

No

Input

len > 100?

Long Text Handler

len > 10?

Medium Text Handler

Short Text Handler

with_fallbacks() - 降级处理

主 Runnable 失败时,自动切换到备用方案:

from langchain_openai import ChatOpenAI primary_model = ChatOpenAI(model="gpt-4") fallback_model = ChatOpenAI(model="gpt-3.5-turbo")# 直接使用 with_fallbacks 方法,无需导入额外类 model_with_fallback = primary_model.with_fallbacks([fallback_model])# 如果 GPT-4 失败,自动使用 GPT-3.5 result = model_with_fallback.invoke("Hello")

参数说明 (基于官方API文档验证):

必需参数

  • fallbacks: Sequence[Runnable] - 备用 Runnable 序列,按顺序尝试

可选参数 (仅关键字参数):

  • exceptions_to_handle: Tuple[Type[BaseException], ...] - 需要处理的异常类型元组,默认为 (Exception,)
  • exception_key: Optional[str] - 可选的键名,用于将异常信息传递给备用方案。如为 None (默认),异常不传递给备用方案

完整参数示例

# ✅ 示例1: 只对特定异常类型执行降级 model_with_fallback = primary_model.with_fallbacks( fallbacks=[fallback_model],# ✅ 官方标准参数:fallbacks (复数,列表) exceptions_to_handle=(TimeoutError, ConnectionError),# ✅ 官方标准参数)# ✅ 示例2: 将异常信息传递给备用方案from langchain_core.runnables import RunnableLambda defhandle_with_error_context(inputs):"""备用方案可以访问异常信息"""if"error"in inputs:print(f"Original error: {inputs['error']}")return fallback_model.invoke(inputs["input"]) model_with_error_context = primary_model.with_fallbacks( fallbacks=[RunnableLambda(handle_with_error_context)],# ✅ 使用 fallbacks 参数名 exception_key="error"# ✅ 官方标准参数:异常会以 "error" 键传递)# ⚠️ 重要:使用 exception_key 时,主 Runnable 和所有备用方案都必须接受字典输入 result = model_with_error_context.invoke({"input":"Hello"})

API 规范总结

defwith_fallbacks( self, fallbacks: Sequence[Runnable[Input, Output]],# 必需*, exceptions_to_handle: Tuple[Type[BaseException],...]=(Exception,),# 可选 exception_key: Optional[str]=None# 可选)-> RunnableWithFallbacksT[Input, Output]:...

Success

Failure

Request

Primary: GPT-4

Return Result

Fallback: GPT-3.5


2.2 LCEL 表达式语言

2.2.1 声明式组合理念

LCEL(LangChain Expression Language)是一种声明式语法,用于组合 Runnable 对象。

命令式 vs 声明式

# ❌ 命令式(手动控制流程)defimperative_chain(input): step1_result = prompt.invoke(input) step2_result = model.invoke(step1_result) step3_result = parser.invoke(step2_result)return step3_result # ✅ 声明式(LCEL 管道) chain = prompt | model | parser result = chain.invoke(input)

LCEL 的核心优势

LCEL

声明式

代码简洁

意图清晰

可组合

管道连接

嵌套组合

自动优化

并行执行

流式传输

可追踪

LangSmith 集成

Debug 友好


2.2.2 管道操作符 | 与并行 {}

管道操作符 | - 顺序执行

将多个 Runnable 串联成管道:

from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI from langchain_core.output_parsers import StrOutputParser # 管道组合 chain =( ChatPromptTemplate.from_template("Tell me about {topic}")| ChatOpenAI()| StrOutputParser())# 自动按顺序执行 result = chain.invoke({"topic":"LangChain"})

执行流程

Input

Prompt Template

ChatOpenAI

StrOutputParser

Output

并行字典 {} - 并行执行

使用字典语法实现并行执行:

from langchain_core.runnables import RunnablePassthrough chain ={"context": retriever | format_docs,"question": RunnablePassthrough()}| prompt | model # context 和 question 并行处理 result = chain.invoke("What is LangChain?")

执行流程

Input

RunnableParallel

context: retriever

question: passthrough

Merge Results

Prompt

Model

assign() - 状态更新快捷方式

RunnablePassthrough.assign() 是 LCEL 中最常用的操作之一,用于在链中添加或更新字段:

from langchain_core.runnables import RunnablePassthrough from langchain_chroma import Chroma from langchain_openai import OpenAIEmbeddings, ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser # 创建向量检索器 vectorstore = Chroma.from_texts(["LangChain是一个AI应用框架","它支持RAG和Agent"], embedding=OpenAIEmbeddings()) retriever = vectorstore.as_retriever()# 使用 assign() 添加检索上下文 chain =( RunnablePassthrough.assign( context=retriever # 添加 context 字段)| ChatPromptTemplate.from_template("基于以下上下文回答问题:\n{context}\n\n问题: {question}")| ChatOpenAI()| StrOutputParser())# 输入只需要 question,context 会自动添加 result = chain.invoke({"question":"什么是LangChain?"})# 内部流程: {"question": "..."} -> {"question": "...", "context": [...]}

assign() 的优势

  1. 保留原始输入:不覆盖已有字段
  2. 简化代码:避免手动构造字典
  3. 链式组合:可以多次调用
# 多次 assign 叠加字段 chain =( RunnablePassthrough.assign( context=retriever # 添加检索结果).assign( context_count=lambda x:len(x["context"])# 添加统计信息).assign( timestamp=lambda x:"2025-11-17"# 添加时间戳)| prompt | model )# 输入: {"question": "..."}# 第一步后: {"question": "...", "context": [...]}# 第二步后: {"question": "...", "context": [...], "context_count": 3}# 第三步后: {"question": "...", "context": [...], "context_count": 3, "timestamp": "..."}

常见使用场景

# 场景1: RAG 添加检索上下文 rag_chain =( RunnablePassthrough.assign(context=retriever)| rag_prompt | model )# 场景2: 添加多个数据源 multi_source_chain =( RunnablePassthrough.assign( docs=doc_retriever, history=history_retriever, metadata=metadata_fetcher )| prompt | model )# 场景3: 数据转换 transform_chain =( RunnablePassthrough.assign( upper_text=lambda x: x["text"].upper(), word_count=lambda x:len(x["text"].split()))| processor )

2.2.3 组合模式:顺序、并行、条件、循环

顺序链接

# 简单顺序 chain = step1 | step2 | step3 # 复杂顺序 chain =({"input": RunnablePassthrough()}| prompt | model |{"output": parser,"raw": RunnablePassthrough()})

并行执行

# 并行获取多个信息 chain = RunnableParallel( summary=summarize_chain, keywords=extract_keywords_chain, sentiment=sentiment_chain )

条件分支

from langchain_core.runnables import RunnableBranch # 根据输入长度选择不同处理 chain = RunnableBranch((lambda x:len(x["text"])>1000, long_text_chain),(lambda x:len(x["text"])>100, medium_text_chain), short_text_chain )

循环迭代

# 使用 RunnableLambda 实现循环defiterative_refine(input): result =inputfor _ inrange(3): result = refine_chain.invoke(result)return result chain = RunnableLambda(iterative_refine)

2.3 高级特性

2.3.1 Fallback 降级与 Retry 重试

Fallback - 自动降级

# 多级降级 chain =( primary_model .with_fallbacks(fallbacks=[backup_model_1, backup_model_2])# ✅ 使用 fallbacks 参数名)# 只对特定异常执行降级 chain =( primary_model .with_fallbacks( fallbacks=[backup_model_1, backup_model_2],# ✅ 使用 fallbacks 参数名 exceptions_to_handle=(TimeoutError, ConnectionError)# ✅ 官方标准参数))

降级流程

Success

Failure

Success

Failure

Request

Primary Model

Return

Backup Model 1

Backup Model 2

Retry - 自动重试

# 直接使用 with_retry 方法,无需单独导入 chain =( prompt | model | parser ).with_retry( stop_after_attempt=3,# 最大重试次数 wait_exponential_jitter=True,# 指数退避 + 随机抖动 retry_if_exception_type=(Exception,)# 指定需要重试的异常类型)

参数说明(基于官方API文档验证):

  • stop_after_attempt:最大重试次数,默认为 3
  • wait_exponential_jitter:是否使用指数退避 + 随机抖动,默认为 True
  • retry_if_exception_type:需要重试的异常类型元组,默认为 (Exception,)

重试策略示例

from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI # 只对特定异常重试 chain =(prompt | model | parser).with_retry( retry_if_exception_type=(TimeoutError, ConnectionError), stop_after_attempt=5, wait_exponential_jitter=True)# 禁用指数退避(立即重试) chain =(prompt | model | parser).with_retry( stop_after_attempt=3, wait_exponential_jitter=False# 禁用指数退避,立即重试)

重试行为

  • 指数退避:1s → 2s → 4s → 8s
  • 最大重试次数:可自定义(默认3次)
  • 重试条件:可指定异常类型(默认所有 Exception)

2.3.2 Timeout 超时控制

重要: RunnableConfig 不支持 timeout 参数。超时控制应在模型层面配置。

from langchain_openai import ChatOpenAI # ✅ 正确:在模型构造时设置timeout model = ChatOpenAI( model="gpt-4", timeout=30,# 30秒超时 max_retries=2) chain = prompt | model | parser result = chain.invoke(input)

超时 + 降级组合策略

from langchain_openai import ChatOpenAI # 主模型:严格超时 slow_model = ChatOpenAI(model="gpt-4", timeout=10)# 降级模型:快速响应 fast_model = ChatOpenAI(model="gpt-3.5-turbo", timeout=5)# 组合超时 + 降级 chain =(prompt | slow_model | parser).with_fallbacks([ prompt | fast_model | parser ])

使用 RunnableConfig 配置其他参数

from langchain_core.runnables import RunnableConfig # RunnableConfig支持的参数 result = chain.invoke(input, config=RunnableConfig( max_concurrency=5,# 最大并发数 tags=["production"],# 标签(用于监控) metadata={"user":"alice"}# 元数据))

2.3.3 缓存与性能优化
💡 提示: 本节介绍 Runnable Protocol 的基础性能API。生产环境的深度性能调优、成本控制、缓存架构等内容,详见 第八篇《生产实践》第21章

LLM 缓存

from langchain_core.caches import InMemoryCache from langchain_core.globalsimport set_llm_cache # 启用缓存 set_llm_cache(InMemoryCache())# 相同请求直接返回缓存结果 model.invoke("What is AI?")# 调用 LLM model.invoke("What is AI?")# 返回缓存(不调用 LLM)

批处理优化

# 批处理优化(使用max_concurrency控制并发) chain = prompt | model.with_config({"max_concurrency":10})# 内部自动合并请求 results = chain.batch(inputs)

流式优化

# 流式传输减少延迟for chunk in chain.stream(input):print(chunk, end="")

性能对比

特性普通调用优化后性能提升
缓存2s50ms40x
批处理10s2s5x
流式TTFT 2sTTFT 200ms10x

本章小结

Runnable Protocol 核心要点

  • ✅ 统一接口:invoke、stream、batch、ainvoke、astream
  • ✅ 可组合性:Lambda、Parallel、Branch、Fallbacks
  • ✅ 可追踪性:自动集成 LangSmith
  • ✅ 性能优化:异步、批处理、缓存

LCEL 核心要点

  • ✅ 声明式组合:| 管道、{} 并行
  • ✅ 自动优化:并行执行、流式传输
  • ✅ 高级特性:Fallback、Retry、Timeout、Cache

设计哲学

一切皆 Runnable,所有组件统一接口,声明式组合,自动优化执行。

思考与练习

练习 1:基础管道 (Prompt → Model → Parser)

这是最基础的 LCEL 组合方式,用于建立一个简单的问答流。

from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser # 1. 定义组件 model = ChatOpenAI(model="gpt-4o-mini") prompt = ChatPromptTemplate.from_template("请用简洁的语言回答问题: {question}") parser = StrOutputParser()# 2. 构建管道 chain = prompt | model | parser # 3. 执行# response = chain.invoke({"question": "什么是量子纠缠?"})# print(response)
练习 2:并行处理 (RunnableParallel)

利用 RunnableParallel 同步执行多个独立任务。

from langchain_core.runnables import RunnableParallel # 定义三个不同的 Prompt joke_prompt = ChatPromptTemplate.from_template("讲一个关于 {topic} 的冷笑话") poem_prompt = ChatPromptTemplate.from_template("写一首关于 {topic} 的五言绝句") story_prompt = ChatPromptTemplate.from_template("写一个关于 {topic} 的 50 字微型小说")# 构建并行处理链 parallel_chain = RunnableParallel( joke=joke_prompt | model | parser, poem=poem_prompt | model | parser, story=story_prompt | model | parser )# 运行# results = parallel_chain.invoke({"topic": "AI"})# print(f"笑话: {results['joke']}\n诗歌: {results['poem']}\n故事: {results['story']}")
练习 3:错误处理 (Fallback & Retry)

通过 with_fallbacks 增加系统的健壮性。

from langchain_openai import ChatOpenAI # 模拟一个容易失败的主模型(例如使用错误的 API Key 或不存在的模型名) primary_model = ChatOpenAI(model="invalid-model-name")# 定义备用模型 backup_model = ChatOpenAI(model="gpt-4o-mini")# 创建带有备选方案的链 chain_with_fallback =( prompt | primary_model.with_fallbacks([backup_model])| parser )# 即使 primary_model 报错,系统也会自动切换到 backup_model# result = chain_with_fallback.invoke({"question": "如何学习 Python?"})
练习 4:性能优化 (Sync vs Async Batch)

对比同步和异步在处理批量任务时的效率。

import asyncio import time questions =[{"question":f"数字 {i} 有什么特殊的数学含义?"}for i inrange(10)]# 同步批处理 start_sync = time.time() sync_results = chain.batch(questions) end_sync = time.time()print(f"同步批处理耗时: {end_sync - start_sync:.2f} 秒")# 异步批处理asyncdefrun_async_batch(): start_async = time.time() async_results =await chain.abatch(questions) end_async = time.time()print(f"异步批处理耗时: {end_async - start_async:.2f} 秒")# asyncio.run(run_async_batch())
2. 思考题解答分析
Q1:什么场景下应该使用 stream 而不是 invoke
  • 提升用户体验:在构建聊天机器人或 Web 界面时,LLM 生成文本较慢。使用 stream 可以让用户实时看到生成的文字(打字机效果),减少等待焦虑。
  • 长文本生成:当输出内容非常长(如写小说、写长报告)时,invoke 必须等待全部内容生成完毕,这可能导致 HTTP 请求超时。
  • 中间步骤监控:在复杂的 Agent 工作流中,通过流式输出可以观察链条执行的中间过程。
Q2:RunnableBranch 和简单的 if-else 有什么区别?
  • 图结构与追踪RunnableBranch 是计算图的一部分,它可以被序列化,并在 LangSmith 等工具中清晰地展示逻辑分支。普通的 if-else 对 LangChain 的追踪系统来说是透明的黑盒。
  • 组合性RunnableBranch 返回的是一个 Runnable 对象,它可以直接通过 | 符号与其他组件无缝连接。
  • 运行时决策RunnableBranch 设计用于在管道流转过程中,根据前一个组件的输出动态决定路径。
Q3:如何在 LCEL 中实现循环逻辑?

LCEL 本身是一个 DAG(有向无环图),它并不直接提供类似 whilefor 的操作符。实现循环通常有以下几种方式:

  • Python 原生包装:在外部使用 Python 的 forwhile 循环多次调用 chain.invoke()
  • 递归 Runnable:定义一个 Runnable,其内部在满足特定条件时再次调用自身。
  • 升级到 LangGraph:对于复杂的循环逻辑(如:Agent 思考 -> 行动 -> 观察 -> 再次思考),官方推荐使用 LangGraph。LangGraph 允许在图结构中定义循环(Edges 回指),是解决循环逻辑的最佳实践。

Read more

Java-Spring入门指南(十四)利用IDEA教你构建第一个SpringMVC系统

Java-Spring入门指南(十四)利用IDEA教你构建第一个SpringMVC系统

Java-Spring入门指南(十四)SpringMVC项目实战搭建 * 前言 * 一、首先导入我们的Maven * 二、接着导入SpringMVC相关的包 * 三、创建Servlet_web环境 * (1)配置springmvc.xml * (2)配置web.xml里面的中央处理器 * (3)为什么需要配置前端控制器? * 五、配置最新的tomcat 11 * 六、运行项目 前言 * 在上一篇博客中,我们系统学习了SpringMVC的核心流程与组件分工,明确了DispatcherServlet(前端控制器)、HandlerMapping(处理器映射器)等组件的协作逻辑。 * 理论之后更需实践,如何从0到1搭建一个可运行的SpringMVC项目,如何将核心组件配置落地,是本次实战的核心目标。 * 本文将基于Maven+IDEA+Tomcat 11环境,一步步完成SpringMVC项目的搭建、配置与运行,让你直观感受“理论”到“实战”的转化过程。 我的个人主页,欢迎来阅读我的其他文章 https:

By Ne0inhk
Flutter 组件 freezed_collection 的鸿蒙化适配实战 - 驾驭极致集合不可变性大坝、构建 OpenHarmony 分布式端高性能、防篡改、类型安全的数据阵列方案

Flutter 组件 freezed_collection 的鸿蒙化适配实战 - 驾驭极致集合不可变性大坝、构建 OpenHarmony 分布式端高性能、防篡改、类型安全的数据阵列方案

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 组件 freezed_collection 的鸿蒙化适配实战 - 驾驭极致集合不可变性大坝、构建 OpenHarmony 分布式端高性能、防篡改、类型安全的数据阵列方案 前言 在鸿蒙(OpenHarmony)生态的工业级交付、重型金融结算以及对业务逻辑零缺陷容忍的跨端政务系统中。“集合数据的不可变性与深层防篡改维度”是衡量整个系统架构鲁棒性的最终质量门禁。面对包含数万个 SKU 商品详情、海量设备状态快照、甚至是金融流水大波次的 0308 批次工程大盘。如果仅仅依靠 Dart 原生的 List.unmodifiable 或者是干瘪的运行时报错。不仅会导致在定位多线程并发竞态(Race Condition)时让架构师如同在逻辑废墟中盲人摸象。更会因为缺乏编译期强制约束。令整个系统的状态管理在跨设备同步时陷入严重的混乱盲区。 我们需要一种“逻辑严丝合缝、操作物理隔离”的集合资产保护艺术。 freezed_collection 是一套专注于无缝整

By Ne0inhk
安利一款超实用的前端可视化打印设计器:Vue Print Designer

安利一款超实用的前端可视化打印设计器:Vue Print Designer

做前端开发的朋友应该都懂,业务开发中遇到打印需求真的头大 —— 手写分页逻辑繁琐、不同框架适配麻烦、票据 / 快递单这类定制化打印场景不好实现,找个趁手的打印插件更是难上加难。最近发现了一款开源的可视化打印设计器Vue Print Designer,完美解决了这些痛点,不管是快速开发还是企业级定制化需求都能满足,今天就跟大家详细聊聊这款工具。 一、Vue Print Designer 是什么? Vue Print Designer 是一款面向业务表单、标签、票据、快递单等打印场景的可视化设计器,核心主打模板化、变量化设计,还提供了静默打印、云打印能力,同时支持 PDF / 图片 / Blob 等多种导出方式,完全能覆盖日常开发中的各类打印需求。 它不是简单的打印插件,而是一套完整的打印解决方案,从可视化设计模板,到参数配置、多端打印,再到定制化扩展,一站式搞定,而且项目还在持续更新,最新版本已经支持英寸、厘米作为单位,对国际化和精细化设计更友好了。 项目地址:https://gitee.com/

By Ne0inhk
Spring Boot 实战:MyBatis 操作数据库(上)

Spring Boot 实战:MyBatis 操作数据库(上)

—JavaEE专栏— Spring Boot 实战:MyBatis 操作数据库(上) 摘要 本文深度解析了 Spring Boot 环境下 MyBatis 的集成与应用。通过回顾传统 JDBC 的局限性,详细展示了 MyBatis 在日志配置、CRUD 操作、自增主键返回及多表查询中的实战用法。同时,文章深入探讨了 #{} 与 ${} 的底层预编译差异及安全风险,并分享了企业级开发中的数据库命名规范与 Druid 连接池配置,助力开发者构建稳健的持久层架构。 文章目录 * Spring Boot 实战:MyBatis 操作数据库(上) * 摘要 * @[toc] * 1. 为什么持久层开发需要 MyBatis? * 1.1 传统 JDBC 的局限性 * 1.2

By Ne0inhk