跳到主要内容
LangChain 消息处理:缓存、过滤、合并与流式输出实战 | 极客日志
Python AI
LangChain 消息处理:缓存、过滤、合并与流式输出实战 LangChain 消息处理技术。涵盖 InMemoryChatMessageHistory 实现对话历史缓存,filter_messages 按类型或 ID 筛选消息,merge_message_runs 合并连续同类型消息以避免异常。重点讲解同步与异步流式输出(stream/astream)的区别及适用场景,结合 FastAPI 实现高并发聊天机器人。总结各功能组件用途及典型应用案例。
星落 发布于 2026/4/6 更新于 2026/5/23 34 浏览
一、消息内存缓存
核心概念
通过 InMemoryChatMessageHistory 将对话历史存储在内存中,使模型能"记住"之前的对话内容。
关键组件
组件 作用 InMemoryChatMessageHistory内存中的聊天记录存储器 RunnableWithMessageHistory将模型包装为支持历史记录的可运行对象 memory_store(字典)以 session_id 为 key 管理多个会话的历史
代码流程
memory_store = {}
def get_session_history (session_id: str ):
if session_id not in memory_store:
memory_store[session_id] = InMemoryChatMessageHistory()
return memory_store[session_id]
message_model = RunnableWithMessageHistory(model, get_session_history)
config = {"configurable" : {"session_id" : "123" }}
response1 = message_model.invoke({"input" : "你好,我是小明" }, config=config)
response2 = message_model.invoke({"input" : "我叫什么名字?" }, config=config)
运行效果
第一轮:用户说"我是小明",AI 正常打招呼
第二轮:用户问"我叫什么名字",AI 能从历史中回忆出"小明"
从 LangChain 的 v0.3 版本开始,官方建议 LangChain 用户不要使用 RunnableWithMessageHistory,而是利用 LangGraph 持久性来完成。
二、消息过滤
核心概念 使用 filter_messages 函数对消息列表进行筛选,按类型 或ID 过滤消息。
关键函数 from langchain_core.messages import filter_messages
过滤参数 参数 作用 示例 include_types只保留指定类型的消息 ["ai"] → 只保留 AI 消息exclude_ids排除指定 ID 的消息 ["4"] → 排除 id 为"4"的消息
代码示例 messages = [
HumanMessage(content="你好,我是小明" , id ="1" ),
AIMessage(content="你好,小明!很高兴认识你!" , id ="2" ),
HumanMessage(content="我想知道我之前的名字" , id ="3" ),
AIMessage(content="你之前的名字是小绿!" , id ="4" ),
]
filtered_messages = filter_messages(
messages,
include_types=["ai" ],
exclude_ids=["4" ],
)
过滤逻辑 原始消息 → include_types=["ai"] 筛掉 Human 消息 → exclude_ids=["4"] 再排除 id=4 → 最终结果
原始:[Human#1, AI#2, Human#3, AI#4]
↓ include_types =["ai" ]
中间:[AI#2, AI#4]
↓ exclude_ids =["4" ]
结果:[AI#2]
三、消息合并
核心概念 使用 merge_message_runs 将连续的同类型消息 合并为一条,避免多条连续 Human 或 AI 消息导致模型报错或行为异常。
关键函数 from langchain_core.messages import merge_message_runs
代码示例 messages = [
HumanMessage(content="你好" , id ="1" ),
HumanMessage(content="我是小明" , id ="2" ),
AIMessage(content="你好,小明!" , id ="3" ),
AIMessage(content="很高兴认识你!" , id ="4" ),
]
merged_messages = merge_message_runs(messages)
合并效果 合并前(4 条): human: 你好
human: 我是小明
ai: 你好,小明!
ai: 很高兴认识你!
合并后(2 条): human: 你好\n我是小明
ai: 你好,小明!\n很高兴认识你!
两种使用方式
merged_messages = merge_message_runs(messages)
model.invoke(merged_messages)
chain = merge_message_runs | model
response = chain.invoke(messages)
管道方式更简洁,适合在 LangChain 链式调用中使用。
四、流式输出
什么是流式输出 流式输出(Streaming) 是指 AI 模型逐字返回内容,而不是等待全部生成完毕后一次性返回。就像 ChatGPT 那样,文字一个个"打"出来,而不是突然全部出现。
为什么需要? AI 生成长文本可能需要几秒甚至更长时间。传统方式用户需要等待整个响应完成才能看到内容,体验很差。流式输出实时展示生成过程,让用户感觉响应更快,交互更自然。
特性 非流式 流式 用户体验 需要等待 实时看到 适用场景 短文本 聊天对话、长文本 内存占用 一次性加载 逐块处理 可控性 无法中断 可随时停止
典型应用
聊天机器人 :像 ChatGPT 一样逐字显示
文章生成 :实时展示生成过程
代码生成 :逐行显示代码
实时翻译 :边翻译边显示
五、同步 vs 异步流式 LangChain 提供两种流式方式:同步(stream)和异步(astream)。
核心区别 特性 同步 stream 异步 astream 调用 chain.stream()chain.astream()循环 for chunk inasync for chunk in阻塞 阻塞线程 不阻塞,可并发 场景 单个请求 多个并发请求 性能 一般 更高
工作原理 同步流式: 阻塞当前线程,处理一个请求时无法处理其他请求。就像排队买咖啡,必须等前一个人买完。
异步流式: 使用协程机制,等待 AI 响应时可以切换到其他任务。就像服务员可以同时为多桌客人点单。
何时使用异步?
多用户 Web 应用
高并发聊天机器人
与其他异步操作结合
六、流式输出基础用法
同步流式 from langchain_deepseek import ChatDeepSeek
from langchain_core.output_parsers import StrOutputParser
model = ChatDeepSeek(model="deepseek-chat" , streaming=True )
parser = StrOutputParser()
chain = model | parser
for chunk in chain.stream("写一个关于程序员的笑话" ):
print (chunk, end="|" , flush=True )
streaming=True:必须设置
flush=True:立即刷新输出
异步流式 import asyncio
async def main ():
chain = model | parser
async for chunk in chain.astream("写一个关于程序员的笑话" ):
print (chunk, end="|" , flush=True )
if __name__ == "__main__" :
asyncio.run(main())
async def:定义异步函数
async for:异步迭代
asyncio.run():运行入口
七、输出解析器 StrOutputParser 是最常用的解析器,将模型输出转换为纯文本。
def custom_parser (output: str ) -> str :
return output.strip().replace("。" , "!" )
chain = model | parser | custom_parser
格式转换(Markdown → HTML)
内容过滤审核
特殊字符处理
八、流式输出实际应用
1. 聊天机器人 用户发送消息后,AI 回复逐字显示,像真人打字。使用异步流式提高响应速度。
2. 多用户并发 Web 应用中多个用户同时请求,异步流式可以并发处理。
同步:3 个请求需要 15 秒(串行)
异步:3 个请求只需 5 秒(并发)
3. FastAPI 集成 from fastapi import FastAPI
from fastapi.responses import StreamingResponse
@app.get("/chat" )
async def chat_stream (question: str ):
async def generate ():
async for chunk in chain.astream(question):
yield chunk
return StreamingResponse(generate(), media_type="text/plain" )
九、常见问题
1. 没有流式效果? 原因: 忘记 streaming=True 或 flush=True
2. async for 报错? 原因: 使用了 ainvoke() 而不是 astream()
ainvoke() 返回完整结果,astream() 返回流式迭代器。
3. 性能对比
单个请求:同步和异步相近
多个并发:异步快 3 倍
十、总结对比 功能 函数/类 用途 内存缓存 InMemoryChatMessageHistory + RunnableWithMessageHistory让模型记住多轮对话上下文 消息过滤 filter_messages按类型/ID 筛选消息 消息合并 merge_message_runs合并连续同类型消息 流式输出 stream / astream实时逐字返回,提升体验 输出解析 StrOutputParser将模型输出转为纯文本
典型应用场景
内存缓存 :多轮对话场景,用户问"我之前说了什么"时模型能回答
消息过滤 :只提取 AI 回复做摘要、排除某些敏感消息
消息合并 :用户连续发了多条消息时,合并后再发给模型,避免格式错误
流式输出 :聊天机器人逐字显示、长文本生成、FastAPI 接口集成
流式输出要点
流式输出 = 实时返回,提升体验
同步 = 简单,适合学习
异步 = 高性能,适合生产
必须设置 streaming=True 和 flush=True
相关免费在线工具 RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
Mermaid 预览与可视化编辑 基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
随机西班牙地址生成器 随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online