Advanced RAG with LangChain: Routing, Query Construction, and Indexing/Retrieval Strategies
When building a retrieval-augmented generation (RAG) system, simple vector similarity search alone often falls short in complex scenarios. This article takes a deep dive into advanced retrieval techniques in the LangChain framework, covering routing, structured queries, multi-representation indexing, and re-ranking strategies, with the goal of helping developers improve accuracy and efficiency.
In summary: logical and semantic routing distinguish question types, metadata filters sharpen retrieval precision, multi-vector storage improves contextual relevance, and RAG-Fusion together with Cohere re-ranking boosts result relevance, all illustrated with complete Python examples and practical advice.

Once Query Translation is complete, the pipeline moves into the Routing stage. The point of routing is to apply a different handling strategy per question type: a question about a relational database should take an NL2SQL path, while a knowledge-base question should go to the vector-store query path.
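Before bringing in an LLM, the dispatch idea itself can be sketched with simple keyword rules. This is a toy baseline, not the article's method; the route names and keyword list are invented for illustration:

```python
# A minimal keyword-based router: dispatch a question to "sql" or "vectorstore".
# The keyword set is an illustrative placeholder, not a production heuristic.
SQL_KEYWORDS = {"table", "join", "average", "count", "revenue"}

def route_question(question: str) -> str:
    """Return 'sql' for database-style questions, else 'vectorstore'."""
    words = set(question.lower().split())
    return "sql" if words & SQL_KEYWORDS else "vectorstore"

print(route_question("What is the average revenue per region?"))  # sql
print(route_question("How does the retry policy work?"))          # vectorstore
```

Such rules break down quickly on paraphrases, which is exactly why the LLM-based classifier below is preferable.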
Classification via function calling is an effective way to implement routing. By defining a structured output model, the LLM can classify a user question into a specific data source.
from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI

# Data model
class RouteQuery(BaseModel):
    """Route a user query to the most relevant datasource."""
    datasource: Literal["python_docs", "js_docs", "golang_docs"] = Field(
        ...,
        description="Given a user question choose which datasource would be most relevant for answering their question",
    )

# LLM with function call
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
structured_llm = llm.with_structured_output(RouteQuery)

# Prompt
system = """You are an expert at routing a user question to the appropriate data source.
Based on the programming language the question is referring to, route it to the relevant data source."""
prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("human", "{question}"),
])

# Define router
router = prompt | structured_llm
Note: we use function calling to produce structured output, which keeps the routing decision stable.
question = """Why doesn't the following code work:
from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_messages(["human", "speak in {language}"])
prompt.invoke("french")
"""
result = router.invoke({"question": question})
# result.datasource -> 'python_docs'
def choose_route(result):
    if "python_docs" in result.datasource.lower():
        return "chain for python_docs"
    elif "js_docs" in result.datasource.lower():
        return "chain for js_docs"
    else:
        return "chain for golang_docs"

from langchain_core.runnables import RunnableLambda
full_chain = router | RunnableLambda(choose_route)
full_chain.invoke({"question": question})
# 'chain for python_docs'
Beyond rule-based logical routing, routing can also be driven by semantic similarity: embed a set of predefined prompt templates as vectors, compute the cosine similarity between the user query and each template vector, and pick the best-matching prompt.
from langchain.utils.math import cosine_similarity
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
# Two prompts
physics_template = """You are a very smart physics professor. You are great at answering questions about physics in a concise and easy to understand manner. When you don't know the answer to a question you admit that you don't know.
Here is a question:
{query}"""
math_template = """You are a very good mathematician. You are great at answering math questions. You are so good because you are able to break down hard problems into their component parts, answer the component parts, and then put them together to answer the broader question.
Here is a question:
{query}"""
# Embed prompts
embeddings = OpenAIEmbeddings()
prompt_templates = [physics_template, math_template]
prompt_embeddings = embeddings.embed_documents(prompt_templates)
# Route question to prompt
def prompt_router(input):
    # Embed question
    query_embedding = embeddings.embed_query(input["query"])
    # Compute similarity
    similarity = cosine_similarity([query_embedding], prompt_embeddings)[0]
    most_similar = prompt_templates[similarity.argmax()]
    # Chosen prompt
    print("Using MATH" if most_similar == math_template else "Using PHYSICS")
    return PromptTemplate.from_template(most_similar)

chain = (
    {"query": RunnablePassthrough()}
    | RunnableLambda(prompt_router)
    | ChatOpenAI()
    | StrOutputParser()
)
print(chain.invoke("What's a black hole"))
In the sample output, the system correctly identifies the question as physics-related and selects the corresponding prompt.
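Stripped of the LangChain wrapper, the semantic route is just an argmax over cosine similarities between the query embedding and the template embeddings. A self-contained sketch with toy 3-d vectors (the embedding values are invented for illustration):

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Toy embeddings standing in for the physics/math prompt templates.
prompt_vecs = {"physics": [1.0, 0.1, 0.0], "math": [0.0, 0.9, 0.4]}
query_vec = [0.9, 0.2, 0.1]  # pretend embedding of "What's a black hole"

best = max(prompt_vecs, key=lambda name: cosine(query_vec, prompt_vecs[name]))
print(best)  # physics
```

With real embeddings the mechanics are identical; only the vectors come from OpenAIEmbeddings instead of being written by hand.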
Many vector stores carry metadata fields, which makes it possible to filter chunks on metadata. Converting natural language into a structured search query is a key step toward higher retrieval precision.
Suppose we have a database of tutorial videos in which each document carries metadata such as title, description, publish date, and view count. We can define a Pydantic model that describes the query schema.
import datetime
from typing import Literal, Optional, Tuple
from langchain_core.pydantic_v1 import BaseModel, Field

class TutorialSearch(BaseModel):
    """Search over a database of tutorial videos about a software library."""
    content_search: str = Field(..., description="Similarity search query applied to video transcripts.")
    title_search: str = Field(..., description=("Alternate version of the content search query to apply to video titles. Should be succinct and only include key words that could be in a video title."))
    min_view_count: Optional[int] = Field(None, description="Minimum view count filter, inclusive. Only use if explicitly specified.")
    max_view_count: Optional[int] = Field(None, description="Maximum view count filter, exclusive. Only use if explicitly specified.")
    earliest_publish_date: Optional[datetime.date] = Field(None, description="Earliest publish date filter, inclusive. Only use if explicitly specified.")
    latest_publish_date: Optional[datetime.date] = Field(None, description="Latest publish date filter, exclusive. Only use if explicitly specified.")
    min_length_sec: Optional[int] = Field(None, description="Minimum video length in seconds, inclusive. Only use if explicitly specified.")
    max_length_sec: Optional[int] = Field(None, description="Maximum video length in seconds, exclusive. Only use if explicitly specified.")

    def pretty_print(self) -> None:
        for field in self.__fields__:
            if getattr(self, field) is not None and getattr(self, field) != getattr(self.__fields__[field], "default", None):
                print(f"{field}: {getattr(self, field)}")
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
system = """You are an expert at converting user questions into database queries. You have access to a database of tutorial videos about a software library for building LLM-powered applications. Given a question, return a database query optimized to retrieve the most relevant results.
If there are acronyms or words you are not familiar with, do not try to rephrase them."""
prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("human", "{question}"),
])
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
structured_llm = llm.with_structured_output(TutorialSearch)
query_analyzer = prompt | structured_llm
# Example usage
query_analyzer.invoke({"question": "rag from scratch"}).pretty_print()
In this way, the LLM understands the user's intent and turns it into concrete filter conditions such as date ranges and length limits, enabling more precise retrieval.
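Once the LLM emits a structured query, applying it is ordinary filtering. A self-contained sketch over toy video records (the field names mirror `TutorialSearch`; the data and the `apply_filters` helper are invented for illustration):

```python
import datetime

# Toy records standing in for the tutorial-video database.
videos = [
    {"title": "RAG from scratch", "views": 5000,
     "published": datetime.date(2023, 6, 1), "length_sec": 900},
    {"title": "Intro to agents", "views": 120,
     "published": datetime.date(2022, 1, 10), "length_sec": 300},
]

def apply_filters(videos, min_view_count=None, earliest_publish_date=None):
    """Apply only the filters the query analyzer actually populated."""
    out = []
    for v in videos:
        if min_view_count is not None and v["views"] < min_view_count:
            continue
        if earliest_publish_date is not None and v["published"] < earliest_publish_date:
            continue
        out.append(v)
    return out

hits = apply_filters(videos, min_view_count=1000,
                     earliest_publish_date=datetime.date(2023, 1, 1))
print([v["title"] for v in hits])  # ['RAG from scratch']
```

In production this translation from the Pydantic model to store-specific filter syntax is handled by the vector store's metadata-filter support rather than hand-written loops.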
An efficient indexing strategy is critical to RAG performance. A traditional single-vector index may fail to capture the multi-level semantics of a document.
Multi-representation indexing lets us index parent documents (the long texts) separately from their child documents (summaries or chunks). At retrieval time we first match a child document (the summary), then fetch the full parent document by ID. This combines the compactness of a summary with the completeness of the original text.
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
docs = loader.load()
loader = WebBaseLoader("https://lilianweng.github.io/posts/2024-02-05-human-data-quality/")
docs.extend(loader.load())
import uuid
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
chain = (
    {"doc": lambda x: x.page_content}
    | ChatPromptTemplate.from_template("Summarize the following document:\n\n{doc}")
    | ChatOpenAI(model="gpt-3.5-turbo", max_retries=0)
    | StrOutputParser()
)
summaries = chain.batch(docs, {"max_concurrency": 5})
from langchain.storage import InMemoryByteStore
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.retrievers.multi_vector import MultiVectorRetriever
# The vectorstore to use to index the child chunks
vectorstore = Chroma(
    collection_name="summaries",
    embedding_function=OpenAIEmbeddings(),
)
# The storage layer for the parent documents
store = InMemoryByteStore()
id_key = "doc_id"
# The retriever
retriever = MultiVectorRetriever(
    vectorstore=vectorstore,
    byte_store=store,
    id_key=id_key,
)
doc_ids = [str(uuid.uuid4()) for _ in docs]
# Docs linked to summaries
summary_docs = [
    Document(page_content=s, metadata={id_key: doc_ids[i]})
    for i, s in enumerate(summaries)
]
# Add
retriever.vectorstore.add_documents(summary_docs)
retriever.docstore.mset(list(zip(doc_ids, docs)))
query = "Memory in agents"
sub_docs = vectorstore.similarity_search(query, k=1)
sub_docs[0]
The retrieval result is the summary first; the full original document can then be fetched via retriever.get_relevant_documents.
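At its core the multi-vector pattern is just an ID indirection: search over the small representations, then resolve the stored doc_id back to the full parent document. A toy sketch with plain dicts (no real embeddings; the "search" is a naive substring stand-in):

```python
import uuid

parents = {}    # doc_id -> full parent document text
summaries = []  # (doc_id, summary) pairs standing in for vectorstore entries

for full, short in [
    ("FULL TEXT of the agents post", "agents and planning"),
    ("FULL TEXT of the data-quality post", "human data quality"),
]:
    doc_id = str(uuid.uuid4())
    parents[doc_id] = full
    summaries.append((doc_id, short))

def search(query: str) -> str:
    # Stand-in for similarity search: naive substring match over summaries.
    matched_id = next(doc_id for doc_id, s in summaries if query in s)
    # Resolve the child hit back to its parent document, as MultiVectorRetriever does.
    return parents[matched_id]

print(search("agents"))  # FULL TEXT of the agents post
```

MultiVectorRetriever implements exactly this indirection, with the vectorstore holding the children and the byte store holding the parents keyed by `doc_id`.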
ColBERT is a late-interaction retrieval model that produces a contextualized vector for every token in a passage. Compared with conventional single-vector retrieval, ColBERT captures fine-grained semantic matches much better.
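ColBERT's late-interaction score sums, over the query tokens, the maximum similarity to any document token (MaxSim). With toy per-token vectors the scoring rule can be sketched in plain Python (vectors invented for illustration):

```python
def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def maxsim_score(query_tokens, doc_tokens):
    """Sum over query tokens of the max dot-product with any doc token."""
    return sum(max(dot(q, d) for d in doc_tokens) for q in query_tokens)

# Toy 2-d token embeddings (real ColBERT vectors are 128-d and contextualized).
query = [[1.0, 0.0], [0.0, 1.0]]    # e.g. tokens "animation", "studio"
doc_a = [[0.9, 0.1], [0.1, 0.8]]    # overlaps with both query tokens
doc_b = [[-0.5, 0.2], [0.3, -0.4]]  # mostly unrelated

print(maxsim_score(query, doc_a))  # 1.7 -- higher, doc_a wins
print(maxsim_score(query, doc_b))  # 0.5
```

Because each query token independently finds its best-matching document token, a single rare term can dominate the score, which is what gives ColBERT its fine-grained matching behavior.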
from ragatouille import RAGPretrainedModel
RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
import requests
def get_wikipedia_page(title: str):
    URL = "https://en.wikipedia.org/w/api.php"
    params = {
        "action": "query",
        "format": "json",
        "titles": title,
        "prop": "extracts",
        "explaintext": True,
    }
    headers = {"User-Agent": "RAGatouille_tutorial/0.0.1"}
    response = requests.get(URL, params=params, headers=headers)
    data = response.json()
    page = next(iter(data["query"]["pages"].values()))
    return page["extract"] if "extract" in page else None
full_document = get_wikipedia_page("Hayao_Miyazaki")
RAG.index(
    collection=[full_document],
    index_name="Miyazaki-123",
    max_document_length=180,
    split_documents=True,
)
results = RAG.search(query="What animation studio did Miyazaki found?", k=3)
retriever = RAG.as_langchain_retriever(k=3)
retriever.invoke("What animation studio did Miyazaki found?")
Optimization at the retrieval stage directly affects final answer quality. Beyond base retrieval, re-ranking can markedly improve result relevance.
Re-ranking is usually two-stage: a coarse recall stage followed by a fine re-ranking stage. RAG-Fusion is a re-ranking method that needs no extra training; it generates several query variants and fuses their results.
import bs4
from langchain_community.document_loaders import WebBaseLoader
loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(
            class_=("post-content", "post-title", "post-header")
        )
    ),
)
blog_docs = loader.load()
from langchain.text_splitter import RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=300,
    chunk_overlap=50,
)
splits = text_splitter.split_documents(blog_docs)
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
vectorstore = Chroma.from_documents(
    documents=splits,
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()
from langchain.prompts import ChatPromptTemplate
template = """You are a helpful assistant that generates multiple search queries based on a single input query. \nGenerate multiple search queries related to: {question} \nOutput (4 queries):"""
prompt_rag_fusion = ChatPromptTemplate.from_template(template)
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
generate_queries = (
    prompt_rag_fusion
    | ChatOpenAI(temperature=0)
    | StrOutputParser()
    | (lambda x: x.split("\n"))
)
from langchain.load import dumps, loads
def reciprocal_rank_fusion(results: list[list], k=60):
    fused_scores = {}
    for docs in results:
        for rank, doc in enumerate(docs):
            doc_str = dumps(doc)
            if doc_str not in fused_scores:
                fused_scores[doc_str] = 0
            fused_scores[doc_str] += 1 / (rank + k)
    reranked_results = [
        (loads(doc), score)
        for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    ]
    return reranked_results
question = "What is task decomposition for LLM agents?"
retrieval_chain_rag_fusion = generate_queries | retriever.map() | reciprocal_rank_fusion
docs = retrieval_chain_rag_fusion.invoke({"question": question})
len(docs)
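The fusion step can be checked in isolation on toy ranked lists: each item's fused score is the sum of 1/(rank + k) over every list it appears in, so items ranked well in several lists rise to the top. A minimal sketch using plain string ids instead of Document objects:

```python
def rrf(rankings, k=60):
    """Fuse several ranked lists of ids into one list by reciprocal rank."""
    scores = {}
    for ranking in rankings:
        for rank, item in enumerate(ranking):
            # rank is 0-based, so the top item in each list scores 1/k.
            scores[item] = scores.get(item, 0.0) + 1.0 / (rank + k)
    return sorted(scores, key=scores.get, reverse=True)

# "B" and "C" each appear in both lists, so they outrank the one-list items.
print(rrf([["A", "B", "C"], ["B", "C", "D"]]))  # ['B', 'C', 'A', 'D']
```

The constant k=60 dampens the gap between adjacent ranks; with the default, appearing in two lists almost always beats a single top-rank appearance.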
Alternatively, a dedicated rerank model (such as Cohere Rerank) can re-score the initial retrieval results.
from langchain_community.llms import Cohere
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank
retriever = vectorstore.as_retriever(search_kwargs={"k": 10})
compressor = CohereRerank()
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor, base_retriever=retriever
)
compressed_docs = compression_retriever.get_relevant_documents(question)
This article has walked through the advanced components of a LangChain RAG system: routing, structured query construction, multi-representation indexing, and re-ranking.
By combining these techniques, developers can build RAG applications that are more robust, accurate, and efficient, and that meet the demands of complex enterprise scenarios.
