基于 OpenAI、LangChain 和 MongoDB 构建 AI Agent
本文介绍如何使用 OpenAI、LangChain、LangGraph 和 MongoDB 构建具备工具调用和记忆功能的 AI Agent。通过配置环境变量,集成 Tavily 搜索工具,利用 LangGraph 编排节点流程,并结合 MongoDB 实现对话状态的持久化存储。最终实现一个能够自主决策、联网搜索并保存多会话记录的智能助手。

本文介绍如何使用 OpenAI、LangChain、LangGraph 和 MongoDB 构建具备工具调用和记忆功能的 AI Agent。通过配置环境变量,集成 Tavily 搜索工具,利用 LangGraph 编排节点流程,并结合 MongoDB 实现对话状态的持久化存储。最终实现一个能够自主决策、联网搜索并保存多会话记录的智能助手。

大语言模型(LLM)如 ChatGPT、Gemini 和 Claude 已成为企业不可或缺的工具。如今,开发定制化的 AI Agent 成为许多公司的需求。本文将详细介绍如何创建一个个性化的助手,该助手不仅能进行功能调用,还能将对话记录存储在数据库中,实现多会话的连续互动,同时能够执行网页搜索并总结相关信息。
为了优化组织结构并便于未来扩展,我们将使用 LangChain、LangGraph 和 LangSmith 这三个核心工具。LangChain 简化了流媒体处理、工具调用及多种 LLM 的支持;LangGraph 用于组织工具选择逻辑,让智能助手自主决策路径;LangSmith 则提供监控能力,帮助观察从提问到答案生成的全过程。
在开始之前,请确保准备好以下资源:
首先,在你的 .env 文件中添加关键的环境变量:
OPENAI_API_KEY=sk-proj-XXXXXX
TAVILY_API_KEY=tvly-XXXXXXXXXXXXXXXXXXXXXXXX
MONGO_URI=mongodb+srv://username:[email protected]/dbname
LANGCHAIN_TRACING_V2=true
LANGCHAIN_ENDPOINT=https://api.smith.langchain.com
LANGCHAIN_API_KEY=lsv2_pt_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx_xxxxxxxxxx
LANGCHAIN_PROJECT=ai-agent
接下来,将这些 Key 和 URI 配置到设置文件中,确保 langsmith、OpenAI key、Tavily key 和 Mongo URI 均已正确加载。
# app/utilities/settings.py
from pydantic import SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
openai_key: SecretStr
tavily_api_key: str
mongo_uri: SecretStr
# LangSmith
langchain_tracing_v2: bool = True
langchain_endpoint: str = "https://api.smith.langchain.com"
langchain_project: str = "ai-agent"
langchain_api_key: str
model_config = SettingsConfigDict(env_file=".env", extra="ignore")
settings = Settings()
使用设置中的 API Key 设置 OpenAI 客户端:
# app/main.py
from langchain_openai import ChatOpenAI
from .utilities.settings import settings
llm = ChatOpenAI(
openai_api_key=settings.openai_key,
model_name="gpt-4o-mini",
max_retries=2,
)
为了让聊天机器人的各个部分顺畅交流,需要设置一个全局'状态'变量,以便数据在各个节点间流通。我们定义一个 State 类来管理消息列表。
# app/main.py
from typing import Annotated, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.graph.state import CompiledStateGraph
class State(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
在 LangGraph 中,每个节点本质上是一个函数。我们创建一个 chatbot 函数,通过 llm.ainvoke 调用 OpenAI,将当前消息状态传入,获取新的 AI 消息并更新到状态中。
async def chatbot(state: State) -> State:
response_message = await llm.ainvoke(state["messages"])
return {"messages": [response_message]}
LangGraph 提供了同步和异步两种方法。本示例选择异步的 ainvoke 以提高性能。
我们需要将新写的函数加入到流程图中。节点之间的连线称为'边'。
async def get_graph() -> CompiledStateGraph:
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
graph = graph_builder.compile()
return graph
编译后的流程图结构如下:
[图片:基础流程图结构]
编写一个新异步函数 run_graph,通过 graph.astream 启动编译好的图。该函数会激活'chatbot'节点,并处理事件流以提取最后一条消息返回给用户。
系统提示(System Prompt)为 AI 助手提供上下文信息,例如当前日期,这对于处理时效性信息至关重要。
# app/main.py
from datetime import datetime, timezone
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
now = datetime.now(timezone.utc)
system_prompt = f"""
You are an AI assistant helping a user.
Current Date: {now}
"""
async def run_graph(question: str) -> None:
async for event in graph.astream(
{
"messages": [
SystemMessage(content=system_prompt),
HumanMessage(content=question),
]
}
):
for value in event.values():
print(value["messages"][-1].content)
主循环监听用户输入,支持退出命令(quit、exit 或 q)。
async def main() -> None:
while True:
question = input("q: ")
if question.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
await run_graph(question)
if __name__ == "__main__":
import anyio
anyio.run(main)
利用 TavilySearchResults,我们的 AI 助手可以联网搜索。我们将工具绑定到 AI 助手身上,使其具备自主决策是否需要调用工具的能力。
为了实现工具调用的自动路由,我们需要定义一个条件函数 tools_condition,判断 LLM 是否决定调用工具。
from langchain_core.tools import ToolNode
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
web_search = TavilySearchResults(max_results=2)
tools = [web_search]
llm_with_tools = llm.bind_tools(tools)
def tools_condition(state: State):
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and len(last_message.tool_calls) > 0:
return "tools"
return "end"
现在加入 tool_node,并根据条件边连接不同的节点。
async def get_graph() -> CompiledStateGraph:
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
tool_node = ToolNode(tools=tools)
graph_builder.add_node("tools", tool_node)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_conditional_edges("chatbot", tools_condition)
graph_builder.add_edge("tools", "chatbot")
graph = graph_builder.compile()
return graph
此时,如果 LLM 决定调用工具,流程将导向 tools 节点执行搜索,然后返回 chatbot 节点生成最终回答。
为了实现多会话的连续互动,我们需要将记忆持久化到 MongoDB 中。LangGraph 支持通过 Checkpointer 机制保存状态。
在代码顶部引入 MongoDB 异步客户端,并在 get_graph 函数中设置检查点。
# app/main.py
from motor.motor_asyncio import AsyncIOMotorClient
from langgraph.checkpoint.mongodb.aio import AsyncMongoDBSaver
async_mongodb_client: AsyncIOMotorClient = AsyncIOMotorClient(
settings.mongo_uri.get_secret_value()
)
async def get_graph() -> CompiledStateGraph:
checkpointer = AsyncMongoDBSaver(
client=async_mongodb_client,
db_name="ai",
checkpoint_collection_name="checkpoints",
writes_collection_name="checkpoint_writes",
)
graph_builder = StateGraph(State)
graph_builder.add_node("chatbot", chatbot)
tool_node = ToolNode(tools=tools)
graph_builder.add_node("tools", tool_node)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_conditional_edges("chatbot", tools_condition)
graph_builder.add_edge("tools", "chatbot")
graph = graph_builder.compile(checkpointer=checkpointer)
return graph
利用 MongoDB 设置 thread_id,可以保存不同聊天记录中的消息。这允许助手跨多个聊天会话记住之前的对话内容。为不同聊天创建独立记录时,只需使用不同的 thread_id。
from langchain_core.runnables import RunnableConfig
async def run_graph(config: RunnableConfig, question: str) -> None:
graph = await get_graph()
async for event in graph.astream(
{
"messages": [
SystemMessage(content=system_prompt),
HumanMessage(content=question),
]
},
config=config,
stream_mode="values",
):
event["messages"][-1].pretty_print()
async def main() -> None:
config = RunnableConfig(configurable={"thread_id": 1})
while True:
question = input("q: ")
if question.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
await run_graph(config=config, question=question)
if __name__ == "__main__":
import anyio
anyio.run(main)
通过以上步骤,我们打造了一个能够自主决定是否需要调用工具、能够在网上查找信息,并且能够为每个对话线程保存信息的 AI 助手。由于使用了 LangChain 生态,后续想要加入流媒体处理、支持多种模型或其他 AI 模型的功能也变得相当简单。结合 LangSmith,我们可以清晰地监控整个推理过程,便于调试和优化。
此架构不仅适用于简单的问答机器人,也可作为复杂工作流的基础,支持更高级的企业级应用场景。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online