跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
PythonAI

LangChain 消息处理实战:缓存、过滤、合并与流式输出

LangChain 消息处理涵盖内存缓存、消息过滤、消息合并及流式输出等核心功能。通过 InMemoryChatMessageHistory 实现多轮对话上下文记忆,利用 filter_messages 按类型或 ID 筛选消息,使用 merge_message_runs 合并连续同类型消息以避免异常。流式输出分为同步与异步两种模式,能实时返回内容提升用户体验,特别适用于聊天机器人及高并发场景。结合 FastAPI 可实现高效接口集成,StrOutputParser 用于统一输出格式。

GRACE Grace发布于 2026/4/7更新于 2026/5/2019 浏览
LangChain 消息处理实战:缓存、过滤、合并与流式输出

一、消息内存缓存

核心概念

通过 InMemoryChatMessageHistory 将对话历史存储在内存中,使模型能记住之前的对话内容。

关键组件

组件作用
InMemoryChatMessageHistory内存中的聊天记录存储器
RunnableWithMessageHistory将模型包装为支持历史记录的可运行对象
memory_store(字典)以 session_id 为 key 管理多个会话的历史

代码流程

# 1. 创建内存存储字典
memory_store = {}

# 2. 定义获取会话历史的函数(按 session_id 区分会话)
def get_session_history(session_id: str):
    if session_id not in memory_store:
        memory_store[session_id] = InMemoryChatMessageHistory()
    return memory_store[session_id]

# 3. 用 RunnableWithMessageHistory 包装模型
message_model = RunnableWithMessageHistory(model, get_session_history)

# 4. 通过 config 指定会话 ID
config = {"configurable": {"session_id": "123"}}

# 5. 多轮对话,模型自动记住上下文
response1 = message_model.invoke({"input": "你好,我是小明"}, config=config)
response2 = message_model.invoke({"input": "我叫什么名字?"}, config=config)
# → 模型能回答出'小明',因为历史被缓存了

运行效果

  • 第一轮:用户说'我是小明',AI 正常打招呼
  • 第二轮:用户问'我叫什么名字',AI 能从历史中回忆出'小明'

从 LangChain 的 v0.3 版本开始,官方建议 LangChain 用户不要使用 RunnableWithMessageHistory,而是利用 LangGraph 持久性来完成。


二、消息过滤

核心概念

使用 filter_messages 函数对消息列表进行筛选,按类型或 ID 过滤消息。

关键函数

from langchain_core.messages import filter_messages 

过滤参数

参数作用示例
include_types只保留指定类型的消息"ai" → 只保留 AI 消息
exclude_ids排除指定 ID 的消息"4" → 排除 id 为'4'的消息

代码示例

messages = [
    HumanMessage(content="你好,我是小明", id="1"),
    AIMessage(content="你好,小明!很高兴认识你!", id="2"),
    HumanMessage(content="我想知道我之前的名字", id="3"),
    AIMessage(content="你之前的名字是小绿!", id="4"),
]
# 过滤:只保留 AI 消息,且排除 id=4 的消息
filtered_messages = filter_messages(
    messages, include_types=["ai"], exclude_ids=["4"]
)
# → 结果只剩 id=2 的 AIMessage: "你好,小明!很高兴认识你!"

过滤逻辑

原始消息 → include_types=["ai"] 筛掉 Human 消息 → exclude_ids=["4"] 再排除 id=4 → 最终结果

原始:[Human#1, AI#2, Human#3, AI#4]
↓ include_types=["ai"]
中间:[AI#2, AI#4]
↓ exclude_ids=["4"]
结果:[AI#2]

三、消息合并

核心概念

使用 merge_message_runs 将连续的同类型消息合并为一条,避免多条连续 Human 或 AI 消息导致模型报错或行为异常。

关键函数

from langchain_core.messages import merge_message_runs 

代码示例

messages = [
    HumanMessage(content="你好", id="1"),
    HumanMessage(content="我是小明", id="2"), # 连续两条 Human
    AIMessage(content="你好,小明!", id="3"),
    AIMessage(content="很高兴认识你!", id="4"), # 连续两条 AI
]
merged_messages = merge_message_runs(messages)

合并效果

合并前(4 条):
human: 你好
human: 我是小明
ai: 你好,小明!
ai: 很高兴认识你!

合并后(2 条):
human: 你好\n我是小明
ai: 你好,小明!\n很高兴认识你!

两种使用方式

# 方式一:直接调用函数合并后传给模型
merged_messages = merge_message_runs(messages)
model.invoke(merged_messages)

# 方式二:通过管道(pipe)操作,合并与模型调用串联
chain = merge_message_runs | model
response = chain.invoke(messages)

管道方式更简洁,适合在 LangChain 链式调用中使用。


四、流式输出

什么是流式输出

流式输出(Streaming) 是指 AI 模型逐字返回内容,而不是等待全部生成完毕后一次性返回。就像 ChatGPT 那样,文字一个个'打'出来,而不是突然全部出现。

为什么需要?

AI 生成长文本可能需要几秒甚至更长时间。传统方式用户需要等待整个响应完成才能看到内容,体验很差。流式输出实时展示生成过程,让用户感觉响应更快,交互更自然。

特性非流式流式
用户体验需要等待实时看到
适用场景短文本聊天对话、长文本
内存占用一次性加载逐块处理
可控性无法中断可随时停止

典型应用

  1. 聊天机器人:像 ChatGPT 一样逐字显示
  2. 文章生成:实时展示生成过程
  3. 代码生成:逐行显示代码
  4. 实时翻译:边翻译边显示

五、同步 vs 异步流式

LangChain 提供两种流式方式:同步(stream)和异步(astream)。

核心区别

特性同步 stream异步 astream
调用chain.stream()chain.astream()
循环for chunk inasync for chunk in
阻塞阻塞线程不阻塞,可并发
场景单个请求多个并发请求
性能一般更高

工作原理

同步流式: 阻塞当前线程,处理一个请求时无法处理其他请求。就像排队买咖啡,必须等前一个人买完。

异步流式: 使用协程机制,等待 AI 响应时可以切换到其他任务。就像服务员可以同时为多桌客人点单。

何时使用异步?

推荐:

  • 多用户 Web 应用
  • 高并发聊天机器人
  • 与其他异步操作结合

不需要:

  • 简单的单次调用
  • 学习测试阶段

六、流式输出基础用法

同步流式

from langchain_deepseek import ChatDeepSeek
from langchain_core.output_parsers import StrOutputParser

model = ChatDeepSeek(model="deepseek-chat", streaming=True)
parser = StrOutputParser()
chain = model | parser

for chunk in chain.stream("写一个关于程序员的笑话"):
    print(chunk, end="|", flush=True)

关键点:

  • streaming=True:必须设置
  • flush=True:立即刷新输出

异步流式

import asyncio

async def main():
    chain = model | parser
    async for chunk in chain.astream("写一个关于程序员的笑话"):
        print(chunk, end="|", flush=True)

if __name__ == "__main__":
    asyncio.run(main())

关键点:

  • async def:定义异步函数
  • async for:异步迭代
  • asyncio.run():运行入口

七、输出解析器

StrOutputParser 是最常用的解析器,将模型输出转换为纯文本。

作用:

  • 提取文本内容
  • 去除多余格式
  • 统一输出格式

自定义解析器:

def custom_parser(output: str) -> str:
    return output.strip().replace("。", "!")

chain = model | parser | custom_parser

应用场景:

  • 格式转换(Markdown → HTML)
  • 内容过滤审核
  • 特殊字符处理

八、流式输出实际应用

1. 聊天机器人

用户发送消息后,AI 回复逐字显示,像真人打字。使用异步流式提高响应速度。

2. 多用户并发

Web 应用中多个用户同时请求,异步流式可以并发处理。

性能对比:

  • 同步:3 个请求需要 15 秒(串行)
  • 异步:3 个请求只需 5 秒(并发)

3. FastAPI 集成

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

@app.get("/chat")
async def chat_stream(question: str):
    async def generate():
        async for chunk in chain.astream(question):
            yield chunk
    return StreamingResponse(generate(), media_type="text/plain")

九、常见问题

1. 没有流式效果?

原因: 忘记 streaming=True 或 flush=True

2. async for 报错?

原因: 使用了 ainvoke() 而不是 astream()

ainvoke() 返回完整结果,astream() 返回流式迭代器。

3. 性能对比

  • 单个请求:同步和异步相近
  • 多个并发:异步快 3 倍

十、总结对比

功能函数/类用途
内存缓存InMemoryChatMessageHistory + RunnableWithMessageHistory让模型记住多轮对话上下文
消息过滤filter_messages按类型/ID 筛选消息
消息合并merge_message_runs合并连续同类型消息
流式输出stream / astream实时逐字返回,提升体验
输出解析StrOutputParser将模型输出转为纯文本

典型应用场景

  • 内存缓存:多轮对话场景,用户问'我之前说了什么'时模型能回答
  • 消息过滤:只提取 AI 回复做摘要、排除某些敏感消息
  • 消息合并:用户连续发了多条消息时,合并后再发给模型,避免格式错误
  • 流式输出:聊天机器人逐字显示、长文本生成、FastAPI 接口集成

流式输出要点

  1. 流式输出 = 实时返回,提升体验
  2. 同步 = 简单,适合学习
  3. 异步 = 高性能,适合生产
  4. 必须设置 streaming=True 和 flush=True

目录

  1. 一、消息内存缓存
  2. 核心概念
  3. 关键组件
  4. 代码流程
  5. 1. 创建内存存储字典
  6. 2. 定义获取会话历史的函数(按 session_id 区分会话)
  7. 3. 用 RunnableWithMessageHistory 包装模型
  8. 4. 通过 config 指定会话 ID
  9. 5. 多轮对话,模型自动记住上下文
  10. → 模型能回答出“小明”,因为历史被缓存了
  11. 运行效果
  12. 二、消息过滤
  13. 核心概念
  14. 关键函数
  15. 过滤参数
  16. 代码示例
  17. 过滤:只保留 AI 消息,且排除 id=4 的消息
  18. → 结果只剩 id=2 的 AIMessage: "你好,小明!很高兴认识你!"
  19. 过滤逻辑
  20. 三、消息合并
  21. 核心概念
  22. 关键函数
  23. 代码示例
  24. 合并效果
  25. 两种使用方式
  26. 方式一:直接调用函数合并后传给模型
  27. 方式二:通过管道(pipe)操作,合并与模型调用串联
  28. 四、流式输出
  29. 什么是流式输出
  30. 为什么需要?
  31. 典型应用
  32. 五、同步 vs 异步流式
  33. 核心区别
  34. 工作原理
  35. 何时使用异步?
  36. 六、流式输出基础用法
  37. 同步流式
  38. 异步流式
  39. 七、输出解析器
  40. 八、流式输出实际应用
  41. 1. 聊天机器人
  42. 2. 多用户并发
  43. 3. FastAPI 集成
  44. 九、常见问题
  45. 1. 没有流式效果?
  46. 2. async for 报错?
  47. 3. 性能对比
  48. 十、总结对比
  49. 典型应用场景
  50. 流式输出要点
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • 苍穹外卖实战:SpringTask 定时任务与 WebSocket 实时通信
  • AI 商业价值与盈利趋势深度解析
  • 链表两两交换:Java 递归与迭代三种实现方案
  • Android 注册登录界面实现示例代码
  • Java 入门指南:环境配置与第一个程序
  • LeetCode 141: 环形链表判断算法详解
  • LeetCode 141:环形链表判断的两种经典解法
  • AI 产品经理转型指南:从传统产品到智能产品的进阶路径
  • LeetCode 141:环形链表判断的两种经典解法
  • TypeTale 免费 AIGC 视频创作工具简介
  • 超大规模多模态交通数据集:320TB+ 行车视频与道路监控资源
  • Ollama 本地大模型运行指南
  • 大模型(LLM)学习路线与核心资料整理
  • LeetCode 141:环形链表检测的经典解法
  • Qwen3-VL 基于 Llama-Factory 的 QLoRA 微调与部署全流程实战
  • Python 有望告别 GIL 锁:PEP-703 提案深度解析与影响评估
  • 6 款 AI 测试技能工具,助力自动化测试提效
  • 2026 年 3 月 GESP C++ 一级真题解析:数字替换
  • C++ STL list 容器详解:使用与模拟实现
  • Qwen2.5 技术报告详解:架构、训练与长文本能力

相关免费在线工具

  • RSA密钥对生成器

    生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online

  • Mermaid 预览与可视化编辑

    基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online

  • 随机西班牙地址生成器

    随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online

  • curl 转代码

    解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online