跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
PythonAI

LangChain 消息处理:缓存、过滤、合并与流式输出

LangChain 消息处理机制,包括内存缓存、消息过滤与合并。重点介绍流式输出的同步与异步用法,对比性能差异,并提供 FastAPI 集成示例。同时指出 LangGraph 作为新版本的持久化替代方案,并总结常见问题与最佳实践。

链路追踪发布于 2026/4/6更新于 2026/5/2124 浏览
LangChain 消息处理:缓存、过滤、合并与流式输出

一、消息内存缓存

核心概念

通过 InMemoryChatMessageHistory 将对话历史存储在内存中,使模型能"记住"之前的对话内容。

关键组件
组件作用
InMemoryChatMessageHistory内存中的聊天记录存储器
RunnableWithMessageHistory将模型包装为支持历史记录的可运行对象
memory_store(字典)以 session_id 为 key 管理多个会话的历史
代码流程
# 1. 创建内存存储字典
memory_store = {}

# 2. 定义获取会话历史的函数(按 session_id 区分会话)
def get_session_history(session_id: str):
    if session_id not in memory_store:
        memory_store[session_id] = InMemoryChatMessageHistory()
    return memory_store[session_id]

# 3. 用 RunnableWithMessageHistory 包装模型
message_model = RunnableWithMessageHistory(model, get_session_history)

# 4. 通过 config 指定会话 ID
config = {"configurable": {"session_id": "123"}}

# 5. 多轮对话,模型自动记住上下文
response1 = message_model.invoke({"input": "你好,我是小明"}, config=config)
response2 = message_model.invoke({"input": "我叫什么名字?"}, config=config)
# → 模型能回答出"小明",因为历史被缓存了
运行效果
  • 第一轮:用户说"我是小明",AI 正常打招呼
  • 第二轮:用户问"我叫什么名字",AI 能从历史中回忆出"小明"

从 LangChain 的 v0.3 版本开始,官方建议 LangChain 用户不要使用 RunnableWithMessageHistory,而是利用 LangGraph 持久性来完成


二、消息过滤

核心概念

使用 filter_messages 函数对消息列表进行筛选,按类型或ID过滤消息。

关键函数
from langchain_core.messages import filter_messages 
过滤参数
参数作用示例
include_types只保留指定类型的消息["ai"] → 只保留 AI 消息
exclude_ids排除指定 ID 的消息["4"] → 排除 id 为'4'的消息
代码示例
messages = [
    HumanMessage(content="你好,我是小明", id="1"),
    AIMessage(content="你好,小明!很高兴认识你!", id="2"),
    HumanMessage(content="我想知道我之前的名字", id="3"),
    AIMessage(content="你之前的名字是小绿!", id="4"),
]
# 过滤:只保留 AI 消息,且排除 id=4 的消息
filtered_messages = filter_messages(
    messages, include_types=["ai"], exclude_ids=["4"],
)
# → 结果只剩 id=2 的 AIMessage: "你好,小明!很高兴认识你!"
过滤逻辑

原始消息 → include_types=["ai"] 筛掉 Human 消息 → exclude_ids=["4"] 再排除 id=4 → 最终结果

原始:[Human#1, AI#2, Human#3, AI#4]
↓ include_types=["ai"]
中间:[AI#2, AI#4]
↓ exclude_ids=["4"]
结果:[AI#2]

三、消息合并

核心概念

使用 merge_message_runs 将连续的同类型消息合并为一条,避免多条连续 Human 或 AI 消息导致模型报错或行为异常。

关键函数
from langchain_core.messages import merge_message_runs 
代码示例
messages = [
    HumanMessage(content="你好", id="1"),
    HumanMessage(content="我是小明", id="2"), # 连续两条 Human
    AIMessage(content="你好,小明!", id="3"),
    AIMessage(content="很高兴认识你!", id="4"), # 连续两条 AI
]
merged_messages = merge_message_runs(messages)
合并效果
合并前(4 条): human: 你好
human: 我是小明
ai: 你好,小明!
ai: 很高兴认识你!
合并后(2 条): human: 你好
我是小明
ai: 你好,小明!
很高兴认识你!
两种使用方式
# 方式一:直接调用函数合并后传给模型
merged_messages = merge_message_runs(messages)
model.invoke(merged_messages)

# 方式二:通过管道(pipe)操作,合并与模型调用串联
chain = merge_message_runs | model
response = chain.invoke(messages)

管道方式更简洁,适合在 LangChain 链式调用中使用。


四、流式输出

什么是流式输出

流式输出(Streaming) 是指 AI 模型逐字返回内容,而不是等待全部生成完毕后一次性返回。就像 ChatGPT 那样,文字一个个"打"出来,而不是突然全部出现。

为什么需要?

AI 生成长文本可能需要几秒甚至更长时间。传统方式用户需要等待整个响应完成才能看到内容,体验很差。流式输出实时展示生成过程,让用户感觉响应更快,交互更自然。

特性非流式流式
用户体验需要等待实时看到
适用场景短文本聊天对话、长文本
内存占用一次性加载逐块处理
可控性无法中断可随时停止
典型应用
  1. 聊天机器人:像 ChatGPT 一样逐字显示
  2. 文章生成:实时展示生成过程
  3. 代码生成:逐行显示代码
  4. 实时翻译:边翻译边显示

五、同步 vs 异步流式

LangChain 提供两种流式方式:同步(stream)和异步(astream)。

核心区别
特性同步 stream异步 astream
调用chain.stream()chain.astream()
循环for chunk inasync for chunk in
阻塞阻塞线程不阻塞,可并发
场景单个请求多个并发请求
性能一般更高
工作原理

同步流式: 阻塞当前线程,处理一个请求时无法处理其他请求。就像排队买咖啡,必须等前一个人买完。

异步流式: 使用协程机制,等待 AI 响应时可以切换到其他任务。就像服务员可以同时为多桌客人点单。

何时使用异步?

推荐:

  • 多用户 Web 应用
  • 高并发聊天机器人
  • 与其他异步操作结合

不需要:

  • 简单的单次调用
  • 学习测试阶段

六、流式输出基础用法

同步流式
from langchain_deepseek import ChatDeepSeek
from langchain_core.output_parsers import StrOutputParser

model = ChatDeepSeek(model="deepseek-chat", streaming=True)
parser = StrOutputParser()
chain = model | parser

for chunk in chain.stream("写一个关于程序员的笑话"):
    print(chunk, end="|", flush=True)

关键点:

  • streaming=True:必须设置
  • flush=True:立即刷新输出
异步流式
import asyncio

async def main():
    chain = model | parser
    async for chunk in chain.astream("写一个关于程序员的笑话"):
        print(chunk, end="|", flush=True)

if __name__ == "__main__":
    asyncio.run(main())

关键点:

  • async def:定义异步函数
  • async for:异步迭代
  • asyncio.run():运行入口

七、输出解析器

StrOutputParser 是最常用的解析器,将模型输出转换为纯文本。

作用:

  • 提取文本内容
  • 去除多余格式
  • 统一输出格式

自定义解析器:

def custom_parser(output: str) -> str:
    return output.strip().replace("。", "!")

chain = model | parser | custom_parser 

应用场景:

  • 格式转换(Markdown → HTML)
  • 内容过滤审核
  • 特殊字符处理

八、流式输出实际应用

1. 聊天机器人

用户发送消息后,AI 回复逐字显示,像真人打字。使用异步流式提高响应速度。

2. 多用户并发

Web 应用中多个用户同时请求,异步流式可以并发处理。

性能对比:

  • 同步:3 个请求需要 15 秒(串行)
  • 异步:3 个请求只需 5 秒(并发)
3. FastAPI 集成
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

@app.get("/chat")
async def chat_stream(question: str):
    async def generate():
        async for chunk in chain.astream(question):
            yield chunk
    return StreamingResponse(generate(), media_type="text/plain")

九、常见问题

1. 没有流式效果?

原因: 忘记 streaming=True 或 flush=True

2. async for 报错?

原因: 使用了 ainvoke() 而不是 astream()

ainvoke() 返回完整结果,astream() 返回流式迭代器。

3. 性能对比
  • 单个请求:同步和异步相近
  • 多个并发:异步快 3 倍

十、总结对比

功能函数/类用途
内存缓存InMemoryChatMessageHistory + RunnableWithMessageHistory让模型记住多轮对话上下文
消息过滤filter_messages按类型/ID 筛选消息
消息合并merge_message_runs合并连续同类型消息
流式输出stream / astream实时逐字返回,提升体验
输出解析StrOutputParser将模型输出转为纯文本
典型应用场景
  • 内存缓存:多轮对话场景,用户问"我之前说了什么"时模型能回答
  • 消息过滤:只提取 AI 回复做摘要、排除某些敏感消息
  • 消息合并:用户连续发了多条消息时,合并后再发给模型,避免格式错误
  • 流式输出:聊天机器人逐字显示、长文本生成、FastAPI 接口集成
流式输出要点
  1. 流式输出 = 实时返回,提升体验
  2. 同步 = 简单,适合学习
  3. 异步 = 高性能,适合生产
  4. 必须设置 streaming=True 和 flush=True

目录

  1. 一、消息内存缓存
  2. 核心概念
  3. 关键组件
  4. 代码流程
  5. 1. 创建内存存储字典
  6. 2. 定义获取会话历史的函数(按 session_id 区分会话)
  7. 3. 用 RunnableWithMessageHistory 包装模型
  8. 4. 通过 config 指定会话 ID
  9. 5. 多轮对话,模型自动记住上下文
  10. → 模型能回答出"小明",因为历史被缓存了
  11. 运行效果
  12. 二、消息过滤
  13. 核心概念
  14. 关键函数
  15. 过滤参数
  16. 代码示例
  17. 过滤:只保留 AI 消息,且排除 id=4 的消息
  18. → 结果只剩 id=2 的 AIMessage: "你好,小明!很高兴认识你!"
  19. 过滤逻辑
  20. 三、消息合并
  21. 核心概念
  22. 关键函数
  23. 代码示例
  24. 合并效果
  25. 两种使用方式
  26. 方式一:直接调用函数合并后传给模型
  27. 方式二:通过管道(pipe)操作,合并与模型调用串联
  28. 四、流式输出
  29. 什么是流式输出
  30. 为什么需要?
  31. 典型应用
  32. 五、同步 vs 异步流式
  33. 核心区别
  34. 工作原理
  35. 何时使用异步?
  36. 六、流式输出基础用法
  37. 同步流式
  38. 异步流式
  39. 七、输出解析器
  40. 八、流式输出实际应用
  41. 1\. 聊天机器人
  42. 2\. 多用户并发
  43. 3\. FastAPI 集成
  44. 九、常见问题
  45. 1\. 没有流式效果?
  46. 2\. async for 报错?
  47. 3\. 性能对比
  48. 十、总结对比
  49. 典型应用场景
  50. 流式输出要点
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • 青少年机器人编程系统化学习路径:从机械启蒙到人工智能
  • 企业级招聘数据采集:基于 Bright Data AI Studio 的自动化爬虫方案
  • Web3 入门:从比特币到以太坊智能合约
  • 设计支持万人并发抢购的秒杀系统架构方案
  • 激光雷达外参标定算法详解
  • Java IO 流核心概念与应用实践
  • 数据结构详解:堆、哈希表与字符串哈希
  • 医疗 AI 模型部署的现实困境与临床集成挑战
  • Stable Diffusion 详细使用教程:安装、配置与实战指南
  • Python YOLOv8 进阶教程
  • C++ 基础概念详解
  • Git 远程仓库同步实战:从克隆到推送避坑指南
  • AI 大模型基础:LLM 核心概念与架构解析
  • 高性能C++服务多线程资源调度优化实战
  • 基于 ECharts 与 Flask 的交互式数据可视化应用开发
  • C++ 转 C#:核心思维转变与实战要点
  • 安卓手机使用 Termux 部署 AstrBot 与 NapCat QQ 机器人
  • JVM 垃圾收集器:ParNew、CMS 与三色标记算法
  • 指针与数据结构:核心原理与实战解析
  • FPGA 实现 CAN 总线接口与数据帧解析

相关免费在线工具

  • RSA密钥对生成器

    生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online

  • Mermaid 预览与可视化编辑

    基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online

  • 随机西班牙地址生成器

    随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online

  • curl 转代码

    解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online