跳到主要内容
LangChain 1.0 架构全景、Runnable 协议与 LCEL 声明式语法解析 | 极客日志
Python AI
LangChain 1.0 架构全景、Runnable 协议与 LCEL 声明式语法解析 综述由AI生成 介绍 LangChain 1.0 架构体系,涵盖 Runnable 协议统一抽象与 LCEL 声明式组合语法。内容包括环境配置、生态全景(应用层、编排层、链路层、监控层)、Agent 构建流程及核心设计理念(Provider-Agnostic、Middleware Driven)。详细讲解了 invoke、stream、batch 等调用方法,以及 LCEL 管道操作符、并行执行、条件分支、降级重试等高级特性。通过代码示例演示了 RAG 系统构建、性能优化及异步处理最佳实践,旨在帮助开发者快速掌握 LangChain 生产级开发模式。
利刃 发布于 2026/3/24 更新于 2026/5/4 15 浏览第一阶段:架构全景、Runnable 协议与 LCEL 声明式语法解析
版本要求 : 本教程基于 LangChain 1.0.7+、LangGraph 1.0.3+、Python 3.10+
更新日期 : 2025-12
📋 前置准备
环境配置
在开始学习之前,请确保完成以下环境配置:
1. Python 版本
python --version
2. 安装依赖
pip install langchain langchain-openai langgraph langchain-community
uv pip install langchain langchain-openai langgraph langchain-community
pip install "langchain>=1.0.7" "langchain-openai>=1.0.3" "langgraph>=1.0.3"
3. 环境变量配置
OPENAI_API_KEY=sk-your-api-key-here
LANGSMITH_API_KEY=your-langsmith-key
LANGSMITH_TRACING=true
from dotenv import load_dotenv
import os
load_dotenv()
required_vars = ["OPENAI_API_KEY" ]
for var in required_vars:
if not os.getenv(var):
raise EnvironmentError(f"缺少必需的环境变量:{var} " )
4. 依赖版本清单
[tool.poetry.dependencies]
python = "^3.10"
langchain = "^1.0.7"
=
=
=
=
=
=
langchain-openai
"^1.0.3"
langgraph
"^1.0.3"
langchain-community
"^0.3.0"
langchain-core
"^1.0.7"
langsmith
"^0.4.43"
python-dotenv
"^1.0.0"
# requirements.txt 格式
langchain>=1.0.7
langchain-openai>=1.0.3
langgraph>=1.0.3
langchain-community>=0.3.0
langchain-core>=1.0.7
langsmith>=0.4.43
python-dotenv>=1.0.0
前置知识
✅ Python 基础 (async/await、类型注解、装饰器)
✅ LLM 基本概念 (Prompt、Token、Temperature 等)
✅ API 调用基础
✅ JSON 数据格式
第 1 章:LangChain 生态全景
1.1 架构层次关系 应用层
Deep Agents / LangGraph Projects
外部资源
Models / APIs / Tools
LangChain 生态系统目前已形成'多层协同'的架构体系,既可支持快速原型开发,也可支撑生产级 LLM 应用。整体结构如下:
层级 核心组件 职责定位 典型场景 应用层 Deep Agents / LangGraph Projects 复杂自治 Agent、长期运行、多 Agent 协作 智能助手、自动化任务系统 编排层 LangGraph 状态化流程控制、节点执行、分支循环 多 Agent 编排、可视化状态流 链路层 LangChain / LCEL 模型调用、提示管理、工具集成 RAG、问答、对话 监控层 LangSmith 调试、观测、评估、成本追踪 DevOps、Evals、质量监控
1.1.1 LangChain 与 LangGraph 的关系 LangChain 专注于 链式逻辑与 Agent 封装 ;LangGraph 专注于 流程编排与状态管理 。
LangChain: 用于构建单条或线性 chain(Prompt→Model→Tool→Output)。
LangGraph: 用于管理含分支、循环、并发的复杂流程(可视化、持久化状态)。
二者可并用:LangGraph 中的节点可运行 LangChain 或 LCEL 构造的 chain。
图 1-2 LangChain 与 LangGraph 协作关系图
LangChain Chain
(Prompt→Model→Tool→Output)
↓
LangGraph Node
↓
LangGraph Flow
(多节点编排 / 状态持久化)
↓
LangGraph Studio(可视化与监控)
1.1.2 如何构建 Agent LangChain 1.0+ 提供统一的 Agent 构建接口:create_agent
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
@tool
def get_weather (city: str ) -> str :
"""获取指定城市的天气"""
return f"{city} 今天天气晴朗,温度 25°C"
@tool
def calculate (expression: str ) -> str :
"""计算数学表达式"""
try :
result = eval (expression)
return f"计算结果:{result} "
except Exception as e:
return f"计算错误:{str (e)} "
agent = create_agent(
model=ChatOpenAI(model="gpt-4" ),
tools=[get_weather, calculate],
system_prompt="你是一个有帮助的助手,可以查询天气和进行计算。"
)
result = agent.invoke({"messages" : [("user" , "北京天气如何?另外帮我算一下 25 * 4" )]})
print (result["messages" ][-1 ].content)
北京今天天气晴朗,温度 25°C。 25 * 4 的计算结果是 100。
用户输入 → Agent 接收 → LLM 分析 → 需要工具?
↓ Yes ↓ No
调用工具 生成回复
↓ ↓
获取结果 ← 返回用户
参数 类型 必需 说明 modelChatModel ✅ 使用的语言模型 toolsList[Tool] ✅ 可用的工具列表 system_promptstr ❌ 系统提示词,定义 Agent 行为 checkpointerCheckpointer ❌ 状态持久化(用于多轮对话) interrupt_beforeList[str] ❌ 在指定节点前暂停(需要人工确认) interrupt_afterList[str] ❌ 在指定节点后暂停
模型绑定 :指定使用的 LLM(如 GPT-4、Claude 等)
工具注册 :提供 Agent 可调用的工具集合
提示配置 :通过 system_prompt 定义 Agent 的角色和行为
决策执行 :LLM 基于 ReAct 模式自动决定是否调用工具
结果返回 :自动组合工具输出和 LLM 回复
监控追踪 :集成 LangSmith 实现全链路追踪
✅ 官方推荐 :LangChain 1.0+ 标准 API
✅ 简洁易用 :统一的接口,3 步即可创建 Agent
✅ 完整功能 :支持 middleware、cache、checkpointer
✅ 自动工具调用 :LLM 自动判断何时使用哪个工具
✅ 多轮对话 :支持状态持久化,实现上下文记忆
✅ 长期支持 :官方维护,持续更新
Prompt Template → LLM / ChatModel → Tool Selection → Tool Execution → Parser / Output Formatter → 返回结果
↓
LangSmith Callback / Tracing
1.1.3 LCEL 的定位与作用 LCEL(LangChain Expression Language)是 LangChain 的'声明式组合语法',用于 构建可并行、可流式、可追踪的 Runnable 链 。
核心概念:
RunnableSequence 顺序执行;
RunnableParallel 并行执行;
支持 async / stream / batch 统一调用;
可直接嵌入 LangGraph 节点。
价值: 在代码层面构建'数据流管线',如同 Node-RED 或 Airflow 的轻量化实现。
输入数据 → RunnableSequence(顺序执行) → RunnableParallel(并行执行) → 模型推理 / 工具调用 → 流式输出 / 结构化解析
1.1.4 LangSmith 的监控职责 LangSmith 是 LangChain 官方推出的可观测性与质量评估平台。
🔍 Tracing :追踪 Chain/Graph/Agent 每个调用节点。
📈 Metrics :监控延迟、Token 用量、错误率、成本。
🧪 Evaluation :对模型或 Agent 输出进行打分与对比。
⚙️ Integration :与 LangChain、LangGraph、Deep Agents 原生集成。
LangSmith
├── Tracing (链路追踪)
├── Metrics (性能&成本)
├── Evals (模型评估)
└── Dashboard / Report (开发者 / 团队协作)
1.2 核心设计理念 LangChain Design
├── Provider-Agnostic (多模型兼容,快速切换)
├── Runnable Protocol (统一接口,可组合执行)
├── Middleware Driven (Hook/Callback, Metrics/Retry)
└── Production First (稳定性,可观测性,成本控制)
1.2.1 Provider-Agnostic 设计 LangChain 通过统一接口屏蔽 LLM 提供商差异(OpenAI、Anthropic、Cohere、Azure 等),以'Provider 无关'的方式构建应用。
模型切换无需修改上层逻辑。
支持跨平台成本追踪与性能比较。
1.2.2 Runnable Protocol 统一抽象 Runnable 是 LangChain 的核心执行协议:
包括 Chain、Agent、Tool、Prompt 均实现该接口。
统一执行入口:invoke()、ainvoke()、stream()。
支持异步、批量、流式、可追踪调用。
所有 Runnable 可嵌套、组合、装饰。
Runnable
├── Chain
├── Agent
├── Tool
├── Prompt
└── LCEL 组合结构
1.2.3 Middleware-Driven 架构 LangChain 支持 Callback / Hook / Tracing 机制,可在执行流中插入中间件。
Token 计数与成本监控
日志与错误追踪
安全审查与访问控制
重试与超时控制
User → LangSmithMiddleware → Chain/Agent → User
↓
调用执行进入中间件 (token 计数/日志)
↓
上报监控数据
↓
返回监控结果
↓
执行主流程
↓
返回输出结果
1.2.4 Production-First 理念 LangChain 1.0 及 LangGraph 1.0 发布后,生态全面转向 生产级稳定性与可观测性 。核心目标包括:
长期兼容(向 2.0 平滑过渡)
成本可控(LangSmith 监控 + 自动计费)
模型热替换(Provider-agnostic)
完整 CI/CD 与 Evals 集成
开发阶段 (LangChain Prototype) → 测试阶段 (LangSmith 调试) → 部署阶段 (LangGraph 编排) → 监控阶段 (Metrics / Evals) → 持续优化 (模型&提示调整)
1.3 技术选型决策树 应用需求评估
↓
流程是否复杂?
├─ 是 → 使用 LangGraph 编排
└─ 否 → 使用 create_agent / LangChain 快速原型
↓
是否需要状态管理?
├─ 是 → 使用 LCEL 构建 chain
└─ 否 → 使用 Deep Agents / 结合 LangSmith 监控
↓
是否为长期运行/自治?
├─ 是 → 使用 Deep Agents
└─ 否 → 使用 LCEL
1.3.1 何时使用 create_agent
单 Agent 执行,流程线性;
需要快速实现 Tool 调用;
用于 RAG、问答、助手类场景。
1.3.2 何时深入 LangGraph
多 Agent 协作;
存在分支 / 循环 / 状态管理;
需可视化、可调试、持久化运行。
1.3.3 何时使用 Deep Agents
长期运行、自主决策 Agent;
复杂任务拆解、子 Agent 管理;
持续任务执行与周期性触发。
1.3.4 何时需要 Middleware
生产环境运行;
需要日志、指标、安全控制、回调。
推荐:所有 Chain/Agent 均启用 LangSmith Tracing + 自定义 Callback。
1.3.5 典型应用场景分析 场景 推荐技术 理由 A. 企业文档问答 create_agent + LCEL 快速构建 RAG 问答 B. 智能客服系统 LangChain Agent + Middleware 需多轮对话与监控 C. 自动化任务管理 LangGraph + Deep Agents + LangSmith 复杂 workflow + 自治 agent D. 内容摘要或转换 LCEL 轻量、高并行、可流式
本章小结
链式逻辑(LangChain) → 图式编排(LangGraph) → 监控评估(LangSmith) → 自治进化(Deep Agents)
Provider-Agnostic
Runnable 统一抽象
Middleware 可插架构
Production-First 部署思维
设计哲学上,从'玩具原型'走向'生产可观测'的工程系统。
思考与练习
1. 练习 1:业务场景技术选型决策路径 业务场景: 企业级智能客服系统(基于私有知识库的 RAG 系统)
技术选型决策路径图
数据接入层 (Ingestion)
数据源: PDF 还是 Wiki? → 决定 Loader (PyPDFLoader vs. ConfluenceLoader)
更新频率: 实时还是批处理? → 决定 Indexing API 策略
处理层 (Transformation)
文本分割: 语义分割还是字符分割? → 选 RecursiveCharacterTextSplitter
向量化: 开源 (BGE) 还是闭源 (OpenAI)? → 平衡成本与隐私
存储与检索层 (Retrieval)
向量库: 毫秒级搜索 (Pinecone) 还是本地部署 (Chroma/Milvus)?
检索策略: 向量搜索还是混合搜索 (Hybrid Search)? → 引入 BM25 回排
应用层 (Augmentation & Generation)
对话管理: 是否需要长短期记忆? → 选 LangGraph Checkpointer
推理模型: 复杂任务 (GPT-4o) 还是简单总结 (Llama-3-8B)?
2. 练习 2:LCEL 例程与 Middleware 插入点 这是一个标准的 LCEL 链式调用,模拟'根据关键词搜索并生成专业报告'的过程。
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
model = ChatOpenAI(model="gpt-4o" )
prompt = ChatPromptTemplate.from_template("请针对以下主题编写一份专业报告:{topic}" )
parser = StrOutputParser()
chain = (
{"topic" : RunnablePassthrough()}
| prompt
| model.with_retry(stop_after_attempt=3 )
| parser
)
3. 练习 3:长期运行 Agent 设计 (市场监控系统)
架构设计 (LangGraph + LangSmith)
A. LangGraph 流程节点
Start 节点 :初始化监控清单(行业关键词、竞争对手)。
Search 节点 :调用搜索工具获取最新新闻/数据。
Analyze 节点 :LLM 评估数据价值(是否有重要异动)。
Router (条件分支) :
如果有价值 → Report 节点 (生成摘要并推送)。
如果无价值 → Sleep 节点 (进入等待状态)。
End 节点 :归档当次扫描日志。
B. 监控与 Evals (LangSmith 实现)
状态追踪 (Traces) :通过 LangSmith 监控 Agent 在 Search 和 Analyze 节点的延迟,识别哪个环节最耗时。
持久化 (Persistence) :利用 LangGraph 的 checkpointer 将 Agent 的状态存储在数据库中,确保即使服务器重启,市场监控任务也能从中断点恢复。
评估 (Evals) :
单元测试 :针对 Analyze 节点,构建一个包含'真实异动'和'噪音数据'的数据集。
反馈回路 :在推送报告后,收集用户'有用/无用'的点赞反馈,并同步到 LangSmith 标注集中,用于微调 Prompt。
4. 思考题:LCEL 嵌套在 LangGraph 节点中的优势与代价
优势 (Advantages)
细粒度控制 :在 LangGraph 的单个节点内,你可以使用 LCEL 快速构建复杂的'小链条'(如:Prompt → LLM → Parser),保持节点代码的简洁。
标准接口 :LCEL 组件原生支持流式输出 (Streaming) 和异步调用 (Async),这使得 LangGraph 节点能自动继承这些特性。
高度可组合性 :可以轻松地在不同节点间复用相同的 LCEL 片段(例如:通用的格式化器)。
代价 (Trade-offs)
调试深度增加 :嵌套过深会导致 LangSmith 中的 Trace 层级非常复杂,排查报错时需要层层展开。
状态管理混乱 :LCEL 倾向于隐式传递数据,而 LangGraph 强调显式的 State 更新。如果在节点内过度使用 LCEL 修改数据,可能会导致全局状态更新不透明。
性能开销 :每一层 LCEL 封装都会带来微小的运行抽象开销,在对延迟极其敏感的场景下需谨慎。
第 2 章:核心抽象:Runnable 与 LCEL
2.1 Runnable Protocol
2.1.1 为什么需要统一抽象 在 LangChain 1.0 之前,不同组件(Prompt、Model、Tool、Chain)的调用方式各不相同,导致:
接口不一致 :学习成本高,难以组合
缺乏标准化 :无法统一追踪、监控
组合困难 :不同组件难以嵌套使用
LangChain 1.0 引入 Runnable 作为统一执行协议 ,所有组件均实现该接口:
from langchain_core.runnables import Runnable
class Runnable :
def invoke (self, input , config=None ): ...
def ainvoke (self, input , config=None ): ...
def stream (self, input , config=None ): ...
def astream (self, input , config=None ): ...
def batch (self, inputs, config=None ): ...
Runnable 统一抽象
├── 一致的调用方式
├── 可组合性
├── 可追踪性
├── 自动优化
└── 降低学习成本
LCEL 管道
LangSmith 集成
批处理/并行
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}" )
model = ChatOpenAI()
parser = StrOutputParser()
result = prompt.invoke({"topic" : "AI" })
result = model.invoke("Tell me a joke" )
result = parser.invoke("some text" )
2.1.2 核心方法:invoke、stream、batch from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4" )
response = model.invoke("What is LangChain?" )
print (response.content)
User → LangSmith → LLM → Runnable
↓
invoke(input)
↓
开始追踪
↓
发送请求
↓
返回完整结果
↓
记录结果
↓
返回输出
for chunk in model.stream("Tell me a long story" ):
print (chunk.content, end="" , flush=True )
✅ 降低首字延迟(TTFT - Time To First Token)
✅ 提升用户体验(实时显示)
✅ 减少超时风险
User Request
↓
Stream Token 1
↓
Stream Token 2
↓
Stream Token 3
↓
...
↓
Stream Complete
inputs = ["What is AI?" , "What is ML?" , "What is LLM?" ]
results = model.batch(inputs)
for result in results:
print (result.content)
✅ 自动并发控制
✅ 成本追踪聚合
✅ 错误处理优化
Batch Inputs
↓
并发控制器
↓
Request 1
Request 2
Request 3
↓
结果聚合
↓
Batch Results
在需要高并发处理大量请求时,abatch() 比同步 batch() 性能更好:
import asyncio
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
async def async_batch_example ():
inputs = ["What is AI?" , "What is ML?" , "What is LLM?" , "What is NLP?" , "What is DL?" ]
results = await model.abatch(inputs)
for i, result in enumerate (results):
print (f"Result {i+1 } : {result.content} " )
asyncio.run(async_batch_example())
方法 适用场景 优势 batch()中小批量(<50) 实现简单,无需 async/await abatch()大批量(50+)、I/O 密集 更高并发性能,资源利用率高
2.1.3 异步方法:ainvoke、astream import asyncio
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
async def main ():
response = await model.ainvoke("What is async programming?" )
print (response.content)
asyncio.run(main())
async def stream_example ():
async for chunk in model.astream("Tell me a story" ):
print (chunk.content, end="" , flush=True )
asyncio.run(stream_example())
def sync_batch ():
results = []
for query in queries:
results.append(model.invoke(query))
return results
async def async_batch ():
tasks = [model.ainvoke(query) for query in queries]
return await asyncio.gather(*tasks)
请求数 同步耗时 异步耗时 性能提升 10 30s 5s 6x 50 150s 15s 10x 100 300s 25s 12x
2.1.4 Runnable 类型:Lambda、Parallel、Branch、Fallbacks 将普通 Python 函数包装为 Runnable:
from langchain_core.runnables import RunnableLambda
def uppercase (text: str ) -> str :
return text.upper()
runnable_upper = RunnableLambda(uppercase)
result = runnable_upper.invoke("hello" )
同时执行多个 Runnable,结果以字典形式返回:
from langchain_core.runnables import RunnableParallel
parallel = RunnableParallel(
joke=ChatPromptTemplate.from_template("Tell a joke about {topic}" ) | model,
poem=ChatPromptTemplate.from_template("Write a poem about {topic}" ) | model
)
result = parallel.invoke({"topic" : "AI" })
print (result["joke" ])
print (result["poem" ])
Input: topic='AI'
↓
RunnableParallel
├── Joke Generator
└── Poem Generator
↓
Result Dict
↓
Output
from langchain_core.runnables import RunnableBranch
branch = RunnableBranch(
(lambda x: len (x) > 100 , long_text_handler),
(lambda x: len (x) > 10 , medium_text_handler),
short_text_handler
)
result = branch.invoke("some text" )
Input
↓
len > 100?
├─ Yes → Long Text Handler
└─ No
↓
len > 10?
├─ Yes → Medium Text Handler
└─ No → Short Text Handler
主 Runnable 失败时,自动切换到备用方案:
from langchain_openai import ChatOpenAI
primary_model = ChatOpenAI(model="gpt-4" )
fallback_model = ChatOpenAI(model="gpt-3.5-turbo" )
model_with_fallback = primary_model.with_fallbacks([fallback_model])
result = model_with_fallback.invoke("Hello" )
fallbacks: Sequence[Runnable] - 备用 Runnable 序列,按顺序尝试
exceptions_to_handle: Tuple[Type[BaseException], ...] - 需要处理的异常类型元组,默认为 (Exception,)
exception_key: Optional[str] - 可选的键名,用于将异常信息传递给备用方案。如为 None (默认),异常不传递给备用方案
model_with_fallback = primary_model.with_fallbacks(
fallbacks=[fallback_model],
exceptions_to_handle=(TimeoutError, ConnectionError)
)
from langchain_core.runnables import RunnableLambda
def handle_with_error_context (inputs ):
"""备用方案可以访问异常信息"""
if "error" in inputs:
print (f"Original error: {inputs['error' ]} " )
return fallback_model.invoke(inputs["input" ])
model_with_error_context = primary_model.with_fallbacks(
fallbacks=[RunnableLambda(handle_with_error_context)],
exception_key="error"
)
result = model_with_error_context.invoke({"input" : "Hello" })
def with_fallbacks (
self,
fallbacks: Sequence [Runnable[Input, Output]],
*,
exceptions_to_handle: Tuple [Type [BaseException], ...] = (Exception, ),
exception_key: Optional [str ] = None
) -> RunnableWithFallbacksT[Input, Output]:
...
Success
↓
Request
↓
Primary: GPT-4
↓
Return Result
Failure
↓
Fallback: GPT-3.5
2.2 LCEL 表达式语言
2.2.1 声明式组合理念 LCEL(LangChain Expression Language)是一种声明式 语法,用于组合 Runnable 对象。
def imperative_chain (input ):
step1_result = prompt.invoke(input )
step2_result = model.invoke(step1_result)
step3_result = parser.invoke(step2_result)
return step3_result
chain = prompt | model | parser
result = chain.invoke(input )
LCEL
├── 声明式
├── 代码简洁
├── 意图清晰
├── 可组合
├── 管道连接
├── 嵌套组合
├── 自动优化
├── 并行执行
├── 流式传输
├── 可追踪
├── LangSmith 集成
├── Debug 友好
2.2.2 管道操作符 | 与并行 {} from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
chain = (
ChatPromptTemplate.from_template("Tell me about {topic}" )
| ChatOpenAI()
| StrOutputParser()
)
result = chain.invoke({"topic" : "LangChain" })
Input
↓
Prompt Template
↓
ChatOpenAI
↓
StrOutputParser
↓
Output
from langchain_core.runnables import RunnablePassthrough
chain = {
"context" : retriever | format_docs,
"question" : RunnablePassthrough()
} | prompt | model
result = chain.invoke("What is LangChain?" )
Input
↓
RunnableParallel
├── context: retriever
└── question: passthrough
↓
Merge Results
↓
Prompt
↓
Model
RunnablePassthrough.assign() 是 LCEL 中最常用的操作之一,用于在链中添加或更新字段:
from langchain_core.runnables import RunnablePassthrough
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
vectorstore = Chroma.from_texts(
["LangChain 是一个 AI 应用框架" , "它支持 RAG 和 Agent" ],
embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
chain = (
RunnablePassthrough.assign(
context=retriever
)
| ChatPromptTemplate.from_template("基于以下上下文回答问题:\n{context}\n\n问题:{question}" )
| ChatOpenAI()
| StrOutputParser()
)
result = chain.invoke({"question" : "什么是 LangChain?" })
保留原始输入 :不覆盖已有字段
简化代码 :避免手动构造字典
链式组合 :可以多次调用
chain = (
RunnablePassthrough.assign(
context=retriever
).assign(
context_count=lambda x: len (x["context" ])
).assign(
timestamp=lambda x: "2025-11-17"
)
| prompt | model
)
rag_chain = (
RunnablePassthrough.assign(context=retriever)
| rag_prompt | model
)
multi_source_chain = (
RunnablePassthrough.assign(
docs=doc_retriever,
history=history_retriever,
metadata=metadata_fetcher
)
| prompt | model
)
transform_chain = (
RunnablePassthrough.assign(
upper_text=lambda x: x["text" ].upper(),
word_count=lambda x: len (x["text" ].split())
)
| processor
)
2.2.3 组合模式:顺序、并行、条件、循环
chain = step1 | step2 | step3
chain = ({"input" : RunnablePassthrough()} | prompt | model | {"output" : parser, "raw" : RunnablePassthrough()})
chain = RunnableParallel(
summary=summarize_chain,
keywords=extract_keywords_chain,
sentiment=sentiment_chain
)
from langchain_core.runnables import RunnableBranch
chain = RunnableBranch(
(lambda x: len (x["text" ]) > 1000 , long_text_chain),
(lambda x: len (x["text" ]) > 100 , medium_text_chain),
short_text_chain
)
def iterative_refine (input ):
result = input
for _ in range (3 ):
result = refine_chain.invoke(result)
return result
chain = RunnableLambda(iterative_refine)
2.3 高级特性
2.3.1 Fallback 降级与 Retry 重试
chain = (
primary_model
.with_fallbacks(fallbacks=[backup_model_1, backup_model_2])
)
chain = (
primary_model
.with_fallbacks(
fallbacks=[backup_model_1, backup_model_2],
exceptions_to_handle=(TimeoutError, ConnectionError)
)
)
Success
↓
Request
↓
Primary Model
↓
Return
Failure
↓
Backup Model 1
↓
Backup Model 2
chain = (
prompt | model | parser
).with_retry(
stop_after_attempt=3 ,
wait_exponential_jitter=True ,
retry_if_exception_type=(Exception,)
)
stop_after_attempt:最大重试次数,默认为 3
wait_exponential_jitter:是否使用指数退避 + 随机抖动,默认为 True
retry_if_exception_type:需要重试的异常类型元组,默认为 (Exception,)
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
chain = (
prompt | model | parser
).with_retry(
retry_if_exception_type=(TimeoutError, ConnectionError),
stop_after_attempt=5 ,
wait_exponential_jitter=True
)
chain = (
prompt | model | parser
).with_retry(
stop_after_attempt=3 ,
wait_exponential_jitter=False
)
指数退避:1s → 2s → 4s → 8s
最大重试次数:可自定义(默认 3 次)
重试条件:可指定异常类型(默认所有 Exception)
2.3.2 Timeout 超时控制 重要 : RunnableConfig 不支持 timeout 参数。超时控制应在模型层面配置。
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
model="gpt-4" ,
timeout=30 ,
max_retries=2
)
chain = prompt | model | parser
result = chain.invoke(input )
from langchain_openai import ChatOpenAI
slow_model = ChatOpenAI(model="gpt-4" , timeout=10 )
fast_model = ChatOpenAI(model="gpt-3.5-turbo" , timeout=5 )
chain = (
prompt | slow_model | parser
).with_fallbacks([
prompt | fast_model | parser
])
使用 RunnableConfig 配置其他参数 :
from langchain_core.runnables import RunnableConfig
result = chain.invoke(input , config=RunnableConfig(
max_concurrency=5 ,
tags=["production" ],
metadata={"user" : "alice" }
))
2.3.3 缓存与性能优化
💡 提示 : 本节介绍 Runnable Protocol 的基础性能 API。生产环境的深度性能调优、成本控制、缓存架构等内容,详见 第八篇《生产实践》第 21 章 。
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache
set_llm_cache(InMemoryCache())
model.invoke("What is AI?" )
model.invoke("What is AI?" )
chain = prompt | model.with_config({"max_concurrency" : 10 })
results = chain.batch(inputs)
for chunk in chain.stream(input ):
print (chunk, end="" )
特性 普通调用 优化后 性能提升 缓存 2s 50ms 40x 批处理 10s 2s 5x 流式 TTFT 2s TTFT 200ms 10x
本章小结
✅ 统一接口:invoke、stream、batch、ainvoke、astream
✅ 可组合性:Lambda、Parallel、Branch、Fallbacks
✅ 可追踪性:自动集成 LangSmith
✅ 性能优化:异步、批处理、缓存
✅ 声明式组合:| 管道、{} 并行
✅ 自动优化:并行执行、流式传输
✅ 高级特性:Fallback、Retry、Timeout、Cache
一切皆 Runnable,所有组件统一接口,声明式组合,自动优化执行。
思考与练习
练习 1:基础管道 (Prompt → Model → Parser) 这是最基础的 LCEL 组合方式,用于建立一个简单的问答流。
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI(model="gpt-4o-mini" )
prompt = ChatPromptTemplate.from_template("请用简洁的语言回答问题:{question}" )
parser = StrOutputParser()
chain = prompt | model | parser
练习 2:并行处理 (RunnableParallel) 利用 RunnableParallel 同步执行多个独立任务。
from langchain_core.runnables import RunnableParallel
joke_prompt = ChatPromptTemplate.from_template("讲一个关于 {topic} 的冷笑话" )
poem_prompt = ChatPromptTemplate.from_template("写一首关于 {topic} 的五言绝句" )
story_prompt = ChatPromptTemplate.from_template("写一个关于 {topic} 的 50 字微型小说" )
parallel_chain = RunnableParallel(
joke=joke_prompt | model | parser,
poem=poem_prompt | model | parser,
story=story_prompt | model | parser
)
练习 3:错误处理 (Fallback & Retry) 通过 with_fallbacks 增加系统的健壮性。
from langchain_openai import ChatOpenAI
primary_model = ChatOpenAI(model="invalid-model-name" )
backup_model = ChatOpenAI(model="gpt-4o-mini" )
chain_with_fallback = (
prompt | primary_model.with_fallbacks([backup_model]) | parser
)
练习 4:性能优化 (Sync vs Async Batch) import asyncio
import time
questions = [{"question" : f"数字 {i} 有什么特殊的数学含义?" } for i in range (10 )]
start_sync = time.time()
sync_results = chain.batch(questions)
end_sync = time.time()
print (f"同步批处理耗时:{end_sync - start_sync:.2 f} 秒" )
async def run_async_batch ():
start_async = time.time()
async_results = await chain.abatch(questions)
end_async = time.time()
print (f"异步批处理耗时:{end_async - start_async:.2 f} 秒" )
2. 思考题解答分析
Q1:什么场景下应该使用 stream 而不是 invoke?
提升用户体验 :在构建聊天机器人或 Web 界面时,LLM 生成文本较慢。使用 stream 可以让用户实时看到生成的文字(打字机效果),减少等待焦虑。
长文本生成 :当输出内容非常长(如写小说、写长报告)时,invoke 必须等待全部内容生成完毕,这可能导致 HTTP 请求超时。
中间步骤监控 :在复杂的 Agent 工作流中,通过流式输出可以观察链条执行的中间过程。
Q2:RunnableBranch 和简单的 if-else 有什么区别?
图结构与追踪 :RunnableBranch 是计算图的一部分,它可以被序列化,并在 LangSmith 等工具中清晰地展示逻辑分支。普通的 if-else 对 LangChain 的追踪系统来说是透明的黑盒。
组合性 :RunnableBranch 返回的是一个 Runnable 对象,它可以直接通过 | 符号与其他组件无缝连接。
运行时决策 :RunnableBranch 设计用于在管道流转过程中,根据前一个组件的输出动态决定路径。
Q3:如何在 LCEL 中实现循环逻辑? LCEL 本身是一个 DAG(有向无环图) ,它并不直接提供类似 while 或 for 的操作符。实现循环通常有以下几种方式:
Python 原生包装 :在外部使用 Python 的 for 或 while 循环多次调用 chain.invoke()。
递归 Runnable :定义一个 Runnable,其内部在满足特定条件时再次调用自身。
升级到 LangGraph :对于复杂的循环逻辑(如:Agent 思考 -> 行动 -> 观察 -> 再次思考),官方推荐使用 LangGraph 。LangGraph 允许在图结构中定义循环(Edges 回指),是解决循环逻辑的最佳实践。
相关免费在线工具 RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
Mermaid 预览与可视化编辑 基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
随机西班牙地址生成器 随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online