文章目录

一、消息内存缓存
核心概念
通过 InMemoryChatMessageHistory 将对话历史存储在内存中,使模型能"记住"之前的对话内容。
文章目录 一、消息内存缓存 核心概念 关键组件 代码流程 运行效果 二、消息过滤 核心概念 关键函数 过滤参数 代码示例 过滤逻辑 三、消息合并 核心概念 关键函数 代码示例 合并效果 两种使用方式 四、流式输出 什么是流式输出 为什么需要? 典型应用 五、同步 vs 异步流式 核心区别 工作原理 何时使用异步? 六、流式输出基础用法 同步流式 异步流式 七、输出解析器 八、流式输出实际应用 1\…


通过 InMemoryChatMessageHistory 将对话历史存储在内存中,使模型能"记住"之前的对话内容。
| 组件 | 作用 |
|---|---|
InMemoryChatMessageHistory | 内存中的聊天记录存储器 |
RunnableWithMessageHistory | 将模型包装为支持历史记录的可运行对象 |
memory_store(字典) | 以 session_id 为 key 管理多个会话的历史 |
# 1. 创建内存存储字典 memory_store ={}# 2. 定义获取会话历史的函数(按 session_id 区分会话)defget_session_history(session_id:str):if session_id notin memory_store: memory_store[session_id]= InMemoryChatMessageHistory()return memory_store[session_id]# 3. 用 RunnableWithMessageHistory 包装模型 message_model = RunnableWithMessageHistory(model, get_session_history)# 4. 通过 config 指定会话 ID config ={"configurable":{"session_id":"123"}}# 5. 多轮对话,模型自动记住上下文 response1 = message_model.invoke({"input":"你好,我是小明"}, config=config) response2 = message_model.invoke({"input":"我叫什么名字?"}, config=config)# → 模型能回答出"小明",因为历史被缓存了
从LangChain的v0.3版本开始,官⽅建议LangChain⽤⼾不要使⽤
RunnableWithMessageHistory ,⽽是利⽤ LangGraph 持久性 来完成
使用 filter_messages 函数对消息列表进行筛选,按类型或ID过滤消息。
from langchain_core.messages import filter_messages
| 参数 | 作用 | 示例 |
|---|---|---|
include_types | 只保留指定类型的消息 | ["ai"] → 只保留 AI 消息 |
exclude_ids | 排除指定 ID 的消息 | ["4"] → 排除 id 为 '4' 的消息 |
messages =[ HumanMessage(content="你好,我是小明",id="1"), AIMessage(content="你好,小明!很高兴认识你!",id="2"), HumanMessage(content="我想知道我之前的名字",id="3"), AIMessage(content="你之前的名字是小绿!",id="4"),]# 过滤:只保留 AI 消息,且排除 的消息 filtered_messages = filter_messages( messages, include_types=["ai"], exclude_ids=["4"],)# → 结果只剩 的 AIMessage: "你好,小明!很高兴认识你!"
原始消息 → include_types=["ai"] 筛掉 Human 消息 → exclude_ids=["4"] 再排除 id=4 → 最终结果
原始: [Human#1, AI#2, Human#3, AI#4] ↓ include_types=["ai"] 中间: [AI#2, AI#4] ↓ exclude_ids=["4"] 结果: [AI#2]
使用 merge_message_runs 将连续的同类型消息合并为一条,避免多条连续 Human 或 AI 消息导致模型报错或行为异常。
from langchain_core.messages import merge_message_runs
messages =[ HumanMessage(content="你好",id="1"), HumanMessage(content="我是小明",id="2"),# 连续两条 Human AIMessage(content="你好,小明!",id="3"), AIMessage(content="很高兴认识你!",id="4"),# 连续两条 AI] merged_messages = merge_message_runs(messages)
合并前(4条): human: 你好 human: 我是小明 ai: 你好,小明! ai: 很高兴认识你! 合并后(2条): human: 你好\n我是小明 ai: 你好,小明!\n很高兴认识你!
# 方式一:直接调用函数合并后传给模型 merged_messages = merge_message_runs(messages) model.invoke(merged_messages)# 方式二:通过管道(pipe)操作,合并与模型调用串联 chain = merge_message_runs | model response = chain.invoke(messages)
管道方式更简洁,适合在 LangChain 链式调用中使用。
流式输出(Streaming) 是指 AI 模型逐字返回内容,而不是等待全部生成完毕后一次性返回。就像 ChatGPT 那样,文字一个个"打"出来,而不是突然全部出现。
AI 生成长文本可能需要几秒甚至更长时间。传统方式用户需要等待整个响应完成才能看到内容,体验很差。流式输出实时展示生成过程,让用户感觉响应更快,交互更自然。
| 特性 | 非流式 | 流式 |
|---|---|---|
| 用户体验 | 需要等待 | 实时看到 |
| 适用场景 | 短文本 | 聊天对话、长文本 |
| 内存占用 | 一次性加载 | 逐块处理 |
| 可控性 | 无法中断 | 可随时停止 |
LangChain 提供两种流式方式:同步(stream)和异步(astream)。
| 特性 | 同步 stream | 异步 astream |
|---|---|---|
| 调用 | chain.stream() | chain.astream() |
| 循环 | for chunk in | async for chunk in |
| 阻塞 | 阻塞线程 | 不阻塞,可并发 |
| 场景 | 单个请求 | 多个并发请求 |
| 性能 | 一般 | 更高 |
同步流式: 阻塞当前线程,处理一个请求时无法处理其他请求。就像排队买咖啡,必须等前一个人买完。
异步流式: 使用协程机制,等待 AI 响应时可以切换到其他任务。就像服务员可以同时为多桌客人点单。
推荐:
不需要:
from langchain_deepseek import ChatDeepSeek from langchain_core.output_parsers import StrOutputParser model = ChatDeepSeek(model="deepseek-chat", streaming=True) parser = StrOutputParser() chain = model | parser for chunk in chain.stream("写一个关于程序员的笑话"):print(chunk, end="|", flush=True)
关键点:
streaming=True:必须设置flush=True:立即刷新输出import asyncio asyncdefmain(): chain = model | parser asyncfor chunk in chain.astream("写一个关于程序员的笑话"):print(chunk, end="|", flush=True)if __name__ =="__main__": asyncio.run(main())
关键点:
async def:定义异步函数async for:异步迭代asyncio.run():运行入口StrOutputParser 是最常用的解析器,将模型输出转换为纯文本。
作用:
自定义解析器:
defcustom_parser(output:str)->str:return output.strip().replace("。","!") chain = model | parser | custom_parser
应用场景:
用户发送消息后,AI 回复逐字显示,像真人打字。使用异步流式提高响应速度。
Web 应用中多个用户同时请求,异步流式可以并发处理。
性能对比:
from fastapi import FastAPI from fastapi.responses import StreamingResponse @app.get("/chat")asyncdefchat_stream(question:str):asyncdefgenerate():asyncfor chunk in chain.astream(question):yield chunk return StreamingResponse(generate(), media_type="text/plain")
原因: 忘记 streaming=True 或 flush=True
原因: 使用了 ainvoke() 而不是 astream()
ainvoke() 返回完整结果,astream() 返回流式迭代器。
| 功能 | 函数/类 | 用途 |
|---|---|---|
| 内存缓存 | InMemoryChatMessageHistory + RunnableWithMessageHistory | 让模型记住多轮对话上下文 |
| 消息过滤 | filter_messages | 按类型/ID 筛选消息 |
| 消息合并 | merge_message_runs | 合并连续同类型消息 |
| 流式输出 | stream / astream | 实时逐字返回,提升体验 |
| 输出解析 | StrOutputParser | 将模型输出转为纯文本 |
streaming=True 和 flush=True
微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online