LangChain 核心组件 RunnableLambda 详解与实战
在之前的文章中,我们学习了 LangChain 的**链(Chain)**是由多个 对象通过 符号组合而成的(比如 )。但如果我们想在链中插入自己写的普通 Python 函数(比如数据清洗、日志记录、调用外部 API),直接放进去会报错——因为普通函数不遵守 协议,无法被 LangChain 的链识别和调用。
LangChain 核心组件 RunnableLambda 详解与实战 在之前的文章中,我们学习了 LangChain 的**链(Chain)**是由多个 Runnable 对象通过 | 符号组合而成的(比如 PromptTemplate | ChatModel | OutputParser)。但如果我们想在链中插入自己写的普通 Python 函数(比如数据清洗、日志记录、调用外部 API),直接…

在之前的文章中,我们学习了 LangChain 的**链(Chain)**是由多个 对象通过 符号组合而成的(比如 )。但如果我们想在链中插入自己写的普通 Python 函数(比如数据清洗、日志记录、调用外部 API),直接放进去会报错——因为普通函数不遵守 协议,无法被 LangChain 的链识别和调用。
Runnable|PromptTemplate | ChatModel | OutputParserRunnable这时候,RunnableLambda 就派上用场了!
它的核心定位是:将任意 Python 函数/可调用对象,快速转换为符合 LangChain Runnable 协议的对象,让普通函数拥有'通行证',无缝融入 LCEL(LangChain Expression Language)链中。

from langchain_core.runnables import RunnableLambda
# 本质:包装一个函数,让它继承 Runnable 的所有能力
class RunnableLambda(Runnable[Input, Output]):
def __init__(self, func: Callable[[Input], Output]):
self.func = func # 传入的普通函数
# 实现 Runnable 的核心方法(invoke/batch/ainvoke 等)
def invoke(self, input: Input, config=None) -> Output:
return self.func(input) # 调用原始函数并返回结果
简单说:RunnableLambda 就是一个'包装器',把普通函数包一层,让它拥有 invoke、batch、ainvoke 等 Runnable 的核心能力。
RunnableLambda 的价值不在于'创造新功能',而在于'打通兼容性',它的核心功能可以总结为 3 点:
| 核心功能 | 具体说明 | 应用场景 |
|---|---|---|
| 函数转 Runnable | 普通函数 → Runnable 对象 | 插入 LangChain 链中执行 |
| 无缝链式组合 | 支持 ` | ` 符号与其他 Runnable(PromptTemplate、Model、Parser)组合 |
| 原生支持高级特性 | 无需额外编码,自动支持异步(ainvoke)、批量(batch)、流式(stream) | 高并发、大数据量场景 |
很多同学会问:'我直接在链中调用函数不行吗?为什么要多包一层?' 下面用表格清晰对比:
| 特性 | 普通函数 | RunnableLambda 包装后 | 关键说明 |
|---|---|---|---|
| 可组合性 | ❌ 无法直接用 ` | ` 接入链 | ✅ 支持 ` |
| 类型校验 | ❌ 无原生支持 | ✅ 支持静态类型检查 | 配合 TypeHint,IDE 可自动提示参数/返回值类型 |
| 异步支持 | ❌ 需手动实现 async | ✅ 原生支持 ainvoke | 传入 async 函数即可异步执行,无需额外封装 |
| 批量处理 | ❌ 需手动循环 | ✅ 原生支持 batch | 自动优化批量请求(如批量调用大模型时减少网络开销) |
| LangChain 生态兼容 | ❌ 无法接入 Chain | ✅ 无缝集成所有 Runnable 链路 | 可与 PromptTemplate、Retriever、Parser 等直接组合 |

# 嵌套调用,繁琐且不易扩展
def clean_text(x):
return x.strip().lower()
input_text = " Hello LangChain! "
cleaned = clean_text(input_text)
result = model.invoke(cleaned) # 手动传递参数
# 链式组合,清晰且可扩展
chain = RunnableLambda(clean_text) | model
result = chain.invoke(" Hello LangChain! ") # 自动传递参数
from langchain_core.runnables import RunnableLambda
# 1. 定义普通函数
def log_input(x):
print(f"[日志] 输入内容:{x}")
return x # 必须返回值,供下一个环节使用
# 2. 包装为 RunnableLambda
log_runnable = RunnableLambda(log_input)
# 3. 调用(支持 invoke/batch/ainvoke)
# 单个调用
log_runnable.invoke("测试输入")
# 输出:[日志] 输入内容:测试输入
# 批量调用(自动优化)
log_runnable.batch(["输入 1", "输入 2", "输入 3"])
| 方法 | 作用 | 示例 |
|---|---|---|
invoke(input) | 单个输入执行 | runnable.invoke("hello") |
batch(inputs) | 批量输入执行 | runnable.batch(["a", "b", "c"]) |
ainvoke(input) | 异步单个执行(需传入 async 函数) | await runnable.ainvoke("hello") |
astream(input) | 异步流式输出 | async for chunk in runnable.astream("hello") |
None)结合之前学过的 LangChain 知识,用 5 个实战案例带你掌握 RunnableLambda 的核心用法!

需求:去除首尾空格 → 转为小写 → 过滤数字
from langchain_core.runnables import RunnableLambda
# 构建链式清洗流程
text_clean_chain = (
RunnableLambda(lambda x: x.strip()) # 去除空格
| RunnableLambda(lambda x: x.lower()) # 转为小写
| RunnableLambda(lambda x: ''.join([c for c in x if not c.isdigit()])) # 过滤数字
)
# 执行
result = text_clean_chain.invoke(" Hello123World456 ")
print(result) # 输出:helloworld
需求:解析用户输入 → 过滤敏感词 → 记录日志 → 调用大模型
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
import os
# 1. 配置模型(建议用环境变量存储密钥,避免硬编码)
model = ChatOpenAI(
model_name="qwen-plus",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
api_key=os.getenv("DASHSCOPE_API_KEY"), # 从环境变量读取
temperature=0.7
)
# 2. 自定义函数:过滤敏感词
def filter_sensitive(text: str) -> str:
sensitive_words = ["暴力", "色情", "赌博"]
for word in sensitive_words:
text = text.replace(word, "***")
return text
# 3. 自定义函数:记录请求日志
def log_request(text: str) -> str:
print(f"[用户请求] {text}")
return text
# 4. 构建链(解析输入→过滤→日志→模型)
chain = (
RunnableLambda(lambda x: x["user_input"]) # 从字典中提取输入
| RunnableLambda(filter_sensitive) # 过滤敏感词
| RunnableLambda(log_request) # 记录日志
| model # 调用大模型
)
# 执行
result = chain.invoke({"user_input": "这个电影包含暴力内容"})
print(result.content) # 输出:这个电影包含***内容(大模型处理后的结果)
需求:异步调用外部 API(如翻译接口),融入链中
import asyncio
import aiohttp
from langchain_core.runnables import RunnableLambda
from langchain_core.prompts import ChatPromptTemplate
# 1. 异步函数:调用外部翻译 API(示例用百度翻译测试)
async def translate_to_en(text: str) -> str:
async with aiohttp.ClientSession() as session:
url = "https://fanyi.baidu.com/sug"
params = {"kw": text}
async with session.get(url, params=params) as resp:
data = await resp.json()
return data["data"][0]["v"] # 简化处理,实际需异常捕获
# 2. 构建链:翻译→Prompt→模型
prompt = ChatPromptTemplate.from_template("解释这个英文短语:{text}")
chain = (
RunnableLambda(translate_to_en) # 异步翻译(传入 async 函数)
| prompt # 构建 Prompt
| model # 调用模型解释
)
# 3. 异步执行
async def main():
result = await chain.ainvoke("你好世界") # 用 ainvoke 异步调用
print(result.content)
asyncio.run(main())
需求:批量处理 10 条用户评论,清洗后批量调用模型生成回复
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
import os
model = ChatOpenAI(model_name="qwen-plus", api_key=os.getenv("DASHSCOPE_API_KEY"))
# 1. 文本清洗函数
def clean_comment(comment: str) -> str:
return comment.strip().replace("垃圾", "***").replace("卧槽", "**")
# 2. 构建链:清洗→生成回复
chain = (
RunnableLambda(clean_comment)
| ChatPromptTemplate.from_template("回复用户评论:{input}")
| model
)
# 3. 批量执行(自动优化请求,比循环 invoke 更高效)
comments = [
" 这个产品太垃圾了! ",
"卧槽,体验超差!",
"很好用,推荐!",
# ... 更多评论(共 10 条)
]
# 批量调用
results = chain.batch(comments)
for idx, res in enumerate(results):
print(f"回复{idx+1}:{res.content}")
需求:RAG 检索后,用 RunnableLambda 处理检索结果,再通过 PydanticOutputParser 格式化输出
from langchain_core.runnables import RunnableLambda
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from langchain_milvus import Milvus
from langchain_core.embeddings import FakeEmbeddings
# 1. 定义输出格式(Pydantic)
class CommentSummary(BaseModel):
sentiment: str = Field(description="情感倾向:正面/负面/中性")
reason: str = Field(description="判断理由")
reply: str = Field(description="回复建议")
parser = PydanticOutputParser(pydantic_object=CommentSummary)
# 2. RAG 检索(简化:用 FakeEmbeddings 模拟)
embeddings = FakeEmbeddings(size=100)
vector_db = Milvus(
embedding_function=embeddings,
connection_args={"uri": "http://localhost:19530"},
collection_name="comments",
)
# 3. 自定义函数:处理检索结果(提取前 3 条相关评论)
def process_retrieval(results):
comments = [doc.page_content for doc in results[:3]]
return "\n".join(comments)
# 4. 构建链:检索→处理结果→Prompt→模型→解析
chain = (
vector_db.as_retriever() # RAG 检索
| RunnableLambda(process_retrieval) # 处理检索结果
| ChatPromptTemplate.from_template("""
分析以下用户评论,按要求输出:
{comments}
{format_instructions}
""")
| model
| parser # 格式化输出
)
# 执行
result = chain.invoke("产品质量相关评论")
print(result.sentiment) # 输出:负面
print(result.reply) # 输出:针对质量问题的回复建议
def log(x): print(x)(无返回值)def log(x): print(x); return x(必须返回输入或处理后的值)batch 会自动循环调用(无需手动处理),但建议确保函数是'无状态'的(不依赖全局变量)。ainvoke 调用invoke 会报错,必须用 await ainvoke() 或 astream()。batch 而非循环 invoke(LangChain 会自动优化网络请求和资源占用)ainvoke),避免阻塞主线程RunnableLambda 是 LangChain 中'灵活性的关键'——它让我们摆脱了 LangChain 内置组件的限制,能够将任意自定义逻辑(数据处理、日志、外部系统调用)无缝融入 AI 链路中。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online