RAG 落地场景中的关键优化技巧与实战经验
本文深入探讨了 RAG 技术在落地场景中的关键优化技巧与实战经验。内容涵盖知识加工流程(加载、切片、多元信息抽取、存储)及检索链路优化(查询改写、多策略召回、重排序)。重点介绍了知识图谱、Doc Tree、元数据过滤等增强手段,以及静态知识与动态工具结合的 RAG 架构。通过运维与金融财报两个实际案例,展示了如何通过严谨的流程设计与混合召回机制提升系统专业性与准确性,并提供了相应的评测指标建议。

本文深入探讨了 RAG 技术在落地场景中的关键优化技巧与实战经验。内容涵盖知识加工流程(加载、切片、多元信息抽取、存储)及检索链路优化(查询改写、多策略召回、重排序)。重点介绍了知识图谱、Doc Tree、元数据过滤等增强手段,以及静态知识与动态工具结合的 RAG 架构。通过运维与金融财报两个实际案例,展示了如何通过严谨的流程设计与混合召回机制提升系统专业性与准确性,并提供了相应的评测指标建议。

在过去两年中,检索增强生成(RAG, Retrieval-Augmented Generation)技术逐渐成为提升智能体的核心组成部分。通过结合检索与生成的双重能力,RAG 能够引入外部知识,从而为大模型在复杂场景中的应用提供更多可能性。但是在实际落地场景中,往往会存在检索准确率低,噪音干扰多,召回完整性,专业性不够,导致 LLM 幻觉严重的问题。本文将聚焦 RAG 在实际落地场景中的知识加工和检索细节,如何去优化 RAG Pipeline 链路,最终提升召回准确率。
快速搭建一个 RAG 智能问答应用很简单,但是在实际业务场景落地还需要做大量的准备工作。

主要分为知识加工和RAG 检索部分关键流程:

知识加载 -> 知识切片 -> 信息抽取 -> 知识加工 (embedding/graph/keywords) -> 知识存储

# 知识工厂进行实例化
KnowledgeFactory -> create() -> load() -> Document
支持格式包括:knowledge, markdown, pdf, docx, txt, html, pptx, url 等。
如何扩展:
class Knowledge(ABC):
def load(self) -> List[Document]:
"""Load knowledge from data loader."""
@classmethod
def document_type(cls) -> Any:
"""Get document type."""
def support_chunk_strategy(cls) -> List[ChunkStrategy]:
"""Return supported chunk strategy."""
return [
ChunkStrategy.CHUNK_BY_SIZE,
ChunkStrategy.CHUNK_BY_PAGE,
ChunkStrategy.CHUNK_BY_PARAGRAPH,
ChunkStrategy.CHUNK_BY_MARKDOWN_HEADER,
ChunkStrategy.CHUNK_BY_SEPARATOR,
]
@classmethod
def default_chunk_strategy(cls) -> ChunkStrategy:
"""Return default chunk strategy."""
return ChunkStrategy.CHUNK_BY_SIZE

ChunkManager: 通过加载后的知识数据,根据用户指定的分片策略和分片参数路由到对应的分片处理器进行分配。
class ChunkManager:
"""Manager for chunks."""
def __init__(
self,
knowledge: Knowledge,
chunk_parameter: Optional[ChunkParameters] = None,
extractor: Optional[Extractor] = None,
):
"""Create a new ChunkManager with the given knowledge."""
self._knowledge = knowledge
self._extractor = extractor
self._chunk_parameters = chunk_parameter or ChunkParameters()
self._chunk_strategy = (
chunk_parameter.chunk_strategy
if chunk_parameter and chunk_parameter.chunk_strategy
else self._knowledge.default_chunk_strategy().name
)
self._text_splitter = self._chunk_parameters.text_splitter
self._splitter_type = self._chunk_parameters.splitter_type
如何扩展:如果你想在界面上自定义一个新的分片策略,需新增切片策略并新增 Splitter 实现逻辑。
class ChunkStrategy(Enum):
"""Chunk Strategy Enum."""
CHUNK_BY_SIZE: _STRATEGY_ENUM_TYPE = (
RecursiveCharacterTextSplitter,
[
{
"param_name": "chunk_size",
"param_type": "int",
"default_value": 512,
"description": "The size of the data chunks used in processing.",
},
{
"param_name": "chunk_overlap",
"param_type": "int",
"default_value": 50,
"description": "The amount of overlap between adjacent data chunks.",
},
],
"chunk size",
"split document by chunk size",
)
# ... other strategies like CHUNK_BY_PAGE, CHUNK_BY_PARAGRAPH etc.
向量抽取: embedding, 实现 Embeddings 接口。
@abstractmethod
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Embed search docs."""
@abstractmethod
def embed_query(self, text: str) -> List[float]:
"""Embed query text."""
配置示例:
# EMBEDDING_MODEL=proxy_openai
# proxy_openai_proxy_server_url=https://api.openai.com/v1
# proxy_openai_proxy_api_key={your-openai-sk}
# proxy_openai_proxy_backend=text-embedding-ada-002
知识图谱抽取: knowledge graph
class TripletExtractor(LLMExtractor):
"""TripletExtractor class."""
def __init__(self, llm_client: LLMClient, model_name: str):
super().__init__(llm_client, model_name, TRIPLET_EXTRACT_PT)
TRIPLET_EXTRACT_PT = (
"Some text is provided below. Given the text, "
"extract up to knowledge triplets as more as possible "
"in the form of (subject, predicate, object).\n"
"Avoid stopwords. The subject, predicate, object can not be none.\n"
"---------------------\n"
"Example:\n"
"Text: Alice is Bob's mother.\n"
"Triplets:\n(Alice, is mother of, Bob)\n"
# ... more examples
)
倒排索引抽取: keywords 分词。可以用 es 默认的分词库,也可以使用 es 的插件模式自定义分词。
整个知识持久化统一实现了 IndexStoreBase 接口,目前提供了向量数据库、图数据库、全文索引三类实现。

load_document(),包括索引 schema 创建,向量数据分批写入等等。class VectorStoreBase(IndexStoreBase, ABC):
"""Vector store base class."""
@abstractmethod
def load_document(self, chunks: List[Chunk]) -> List[str]:
"""Load document in index database."""
@abstractmethod
async def aload_document(self, chunks: List[Chunk]) -> List[str]:
"""Load document in index database."""
@abstractmethod
def similar_search_with_scores(
self,
text,
topk,
score_threshold: float,
filters: Optional[MetadataFilters] = None,
) -> List[Chunk]:
"""Similar search with scores in index database."""
TuGraphStore 会根据三元组生成具体的 Cypher 语句并执行。def insert_triplet(self, subj: str, rel: str, obj: str) -> None:
"""Add triplet."""
subj_query = f"MERGE (n1:{self._node_label} {{id:'{subj}'}})"
obj_query = f"MERGE (n1:{self._node_label} {{id:'{obj}'}})"
rel_query = (
f"MERGE (n1:{self._node_label} {{id:'{subj}'}})"
f"-[r:{self._edge_label} {{id:'{rel}'}}]->"
f"(n2:{self._node_label} {{id:'{obj}'}})"
)
self.conn.run(query=subj_query)
self.conn.run(query=obj_query)
self.conn.run(query=rel_query)
{
"analysis": {"analyzer": {"default": {"type": "standard"}}},
"similarity": {
"custom_bm25": {
"type": "BM25",
"k1": self._k1,
"b": self._b,
}
}
}
question -> rewrite -> similarity_search -> rerank -> context_candidates
接下来是知识检索,目前社区的检索逻辑主要分为这几步。如果你设置了查询改写参数,目前会通过大模型给你进行一轮问题改写,然后会根据你的知识加工方式路由到对应的检索器。如果你是通过向量进行加工的,那就会通过 EmbeddingRetriever 进行检索;如果你构建方式是通过知识图谱构建的,就会按照知识图谱方式进行检索;如果你设置了 rerank 模型,会给粗筛后的候选值进行精筛,让候选值和用户问题更有关联。

class EmbeddingRetriever(BaseRetriever):
"""Embedding retriever."""
def __init__(
self,
index_store: IndexStoreBase,
top_k: int = 4,
query_rewrite: Optional[QueryRewrite] = None,
rerank: Optional[Ranker] = None,
retrieve_strategy: Optional[RetrieverStrategy] = RetrieverStrategy.EMBEDDING,
):
pass
async def _aretrieve_with_score(
self,
query: str,
score_threshold: float,
filters: Optional[MetadataFilters] = None,
) -> List[Chunk]:
"""Retrieve knowledge chunks with score."""
queries = [query]
new_queries = await self._query_rewrite.rewrite(
origin_query=query, context=context, nums=1
)
queries.extend(new_queries)
candidates_with_score = [
self._similarity_search_with_score(
query, score_threshold, filters, root_tracer.get_current_span_id()
)
for query in queries
]
new_candidates_with_score = await self._rerank.arank(
new_candidates_with_score, query
)
return new_candidates_with_score
参数说明:
index_store: 具体的向量数据库top_k: 返回的具体候选 chunk 个数query_rewrite: 查询改写函数rerank: 重排序函数score_threshold: 得分,我们默认会把相似度得分小于阈值的上下文信息给过滤掉filters: Optional[MetadataFilters], 元数据信息过滤器,主要是可以用来前置通过属性信息筛掉一些不匹配的候选信息。class MetadataFilter(BaseModel):
key: str = Field(..., description="The key of metadata to filter.")
operator: FilterOperator = Field(default=FilterOperator.EQ, description="The operator of metadata filter.")
value: Union[str, int, float, List[str], List[int], List[float]] = Field(..., description="The value of metadata to filter.")

首先通过模型进行关键词抽取,这里可以通过传统的 nlp 技术进行分词,也可以通过大模型进行分词,然后进行关键词按照同义词做扩充,找到关键词的候选列表,最后根据关键词候选列表调用 explore 方法召回局部子图。
KEYWORD_EXTRACT_PT = (
"A question is provided below. Given the question, extract up to "
"keywords from the text. Focus on extracting the keywords that we can use "
"to best lookup answers to the question.\n"
"Generate as more as possible synonyms or alias of the keywords "
"considering possible cases of capitalization, pluralization, "
"common expressions, etc.\n"
"Avoid stopwords.\n"
"Provide the keywords and synonyms in comma-separated format."
"Formatted keywords and synonyms text should be separated by a semicolon.\n"
"---------------------\n"
"Example:\n"
"Text: Alice is Bob's mother.\n"
"Keywords:\nAlice,mother,Bob;mummy\n"
# ... more examples
)
def explore(
self,
subs: List[str],
direct: Direction = Direction.BOTH,
depth: Optional[int] = None,
fan: Optional[int] = None,
limit: Optional[int] = None,
) -> Graph:
"""Explore on graph."""
这部分是 ChatData 场景的 schema-linking 检索。主要是通过 schema-linking 方式通过二阶段相似度检索,首先先找到最相关的表,然后再最相关的字段信息。优点:这种二阶段检索也是为了解决社区反馈的大宽表体验的问题。
def _similarity_search(
self, query, filters: Optional[MetadataFilters] = None
) -> List[Chunk]:
"""Similar search."""
table_chunks = self._table_vector_store_connector.similar_search_with_scores(
query, self._top_k, 0, filters
)
not_sep_chunks = [
chunk for chunk in table_chunks if not chunk.metadata.get("separated")
]
separated_chunks = [
chunk for chunk in table_chunks if chunk.metadata.get("separated")
]
if not separated_chunks:
return not_sep_chunks
tasks = [
lambda c=chunk: self._retrieve_field(c, query) for chunk in separated_chunks
]
separated_result = run_tasks(tasks, concurrency_limit=3)
return not_sep_chunks + separated_result
目前 RAG 智能问答应用几个痛点:
非结构化/半结构化/结构化数据的处理,准备决定着 RAG 应用的上限,因此首先需要在知识处理,索引阶段做大量的细粒度的 ETL 工作,主要优化的思路方向:非结构化 -> 结构化:有条理地组织知识信息;提取更加丰富的,多元化的语义信息。
目的:需要对文档进行精确的解析,更多元化的识别到不同类型的数据。
优化建议:
目的:保存上下文完整性和相关性,这直接关乎回复准确率。
优化建议:
除了对文档进行 Embedding 向量抽取外,其他多元化的信息抽取能够对文档进行数据增强,显著提升 RAG 召回效果。
这篇文章讲了个啥,总结一下 等全局问题场景。通过 mapreduce 等方式分段抽取,通过模型为每段 chunk 提取摘要信息。目前知识库提供了文档上传 -> 解析 -> 切片 -> Embedding -> 知识图谱三元组抽取 -> 向量数据库存储 -> 图数据库存储等知识加工的能力,但是不具备对文档进行复杂的个性化的信息抽取能力,因此希望通过构建知识加工工作流模版来完成复杂的,可视化的,用户可自定义的知识抽取,转换,加工流程。
RAG 流程的优化我们又分为了静态文档的 RAG 和动态数据获取的 RAG。目前大部分涉及到的 RAG 只覆盖了非结构化的文档静态资产,但是实际业务很多场景的问答是通过工具获取动态数据 + 静态知识数据共同回答的场景,不仅需要检索到静态的知识,同时需要 RAG 检索到工具资产库里面工具信息并执行获取动态数据。
目的:澄清用户语义,将用户的原始问题从模糊的,意图不清晰的查询优化为含义更丰富的一个可检索的 Query。
LLMExtractor) 或 构建 embedding+ 逻辑回归实现双塔模型。当我们把索引分成许多 chunks 并且都存储在相同的知识空间里面,检索效率会成为问题。比如用户问 "浙江我武科技公司" 相关信息时,并不想召回其他公司的信息。因此,如果可以通过公司名称元数据属性先进行过滤,就会大大提升效率和相关度。
async def aretrieve(
self, query: str, filters: Optional[MetadataFilters] = None
) -> List[Chunk]:
"""Retrieve knowledge chunks."""
return await self._aretrieve(query, filters)
经过粗筛候选列表后,怎么通过精筛过滤噪音呢?
使用相关重排序模型进行精筛,可以使用开源的模型,也可以使用带业务语义微调的模型。
## Rerank model
#RERANK_MODEL=bce-reranker-base
#RERANK_TOP_K=5
根据不同索引召回的内容进行业务 RRF 加权综合打分剔除。
文档类知识是相对静态的,无法回答个性化以及动态的信息,需要依赖一些第三方平台工具才可以回答。基于这种情况,我们需要一些动态 RAG 的方法,通过工具资产定义 -> 工具选择 -> 工具校验 -> 工具执行获取动态数据。
构建企业领域工具资产库,将散落到各个平台的工具 API,工具脚本进行整合,进而提供智能体端到端的使用能力。
工具召回沿用静态知识的 RAG 召回的思路,再通过完整的工具执行生命周期来获取工具执行结果。
在评估智能问答流程时,需要单独对召回相关性准确率以及模型问答的相关性进行评估,然后再综合考虑,以判断 RAG 流程在哪些方面仍需改进。
评价指标:
RetrieverHitRateMetric: 命中率衡量的是 RAG retriever 召回出现在检索结果前 top-k 个文档中的比例。RetrieverMRRMetric: Mean Reciprocal Rank,通过分析最相关文档在检索结果里的排名来计算每个查询的准确性。RetrieverSimilarityMetric: 相似度指标计算,计算召回内容与预测内容的相似度。AnswerRelevancyMetric: 智能体答案相关性指标,通过智能体答案与用户提问的匹配程度。在数据基础设施领域,有很多运维 SRE,每天会接收到大量的告警,因此很多时间来需要响应应急事件,进而进行故障诊断,然后故障复盘,进而进行经验沉淀。另外一部分时间又需要响应用户咨询,需要他们用他们的知识以及工具使用经验进行答疑。
传统的 RAG + Agent 技术可以解决通用的,确定性没那么高的,单步任务场景。但是面对数据基础设施领域的专业场景,整个检索过程必须是确定,专业和真实的,并且是需要一步一步推理的。
通用的智能体:传统的 RAG 对知识的严谨和专业性要求没那么高,适用于客服,旅游,平台答疑机器人这样的一些业务场景。 数据基础设施智能体:RAG 流程是严谨和专业的,需要专属的 RAG 工作流程,上下文包括 (告警->定位->止血->恢复),并且需要对专家沉淀的问答和应急经验,进行结构化的抽取,建立层次关系。因此我们选择知识图谱来作为数据承载。
基于数据基础设施的确定性和特殊性,我们选择通过结合知识图谱来作为诊断应急经验的知识承载。我们通过 SRE 沉淀下来的应急排查事件知识经验 结合应急复盘流程,建立了 DB 应急事件驱动的知识图谱,我们以 DB 抖动为例,影响 DB 抖动的几个事件,包括慢 SQL 问题,容量问题,我们在各个应急事件间建立了关系。
在智能体检索阶段,我们使用 GraphRAG 作为静态知识检索的承载,因此识别到 DB 抖动异常后,找到了与 DB 抖动异常节点相关的节点作为我们分析依据。由于在知识抽取阶段每一个节点还保留了每个事件的一些元数据信息,包括事件名,事件描述,相关工具,工具参数等等。因此我们可以通过执行工具的执行生命周期链路来获取返回结果拿到动态数据来作为应急诊断的排查依据。通过这种动静结合的混合召回的方式比纯朴素的 RAG 召回,保障了数据基础设施智能体执行的确定性,专业性和严谨性。
最后通过社区 AWEL+AGENT 技术,通过 AGENT 编排的范式,打造了从意图专家-> 应急诊断专家 -> 诊断根因分析专家。每个 Agent 的职能都是不一样的,意图专家负责识别解析用户的意图和识别告警信息,诊断专家需要通过 GraphRAG 定位到需要分析的根因节点,以及获取具体的根因信息,分析专家需要结合各个根因节点的数据 + 历史分析复盘报告生成诊断分析报告。
可以围绕各自领域构建属于自己的领域资产库包括,知识资产,工具资产以及知识图谱资产。领域资产:领域资产包括了知识库,API,工具脚本。资产处理,整个资产数据链路涉及了领域资产加工,领域资产检索和领域资产评估。
RAG 技术的落地不仅仅是简单的检索加生成,更需要精细化的知识加工与检索策略优化。通过多元化的信息抽取(如知识图谱、QA 对)、多策略混合召回、元数据过滤以及动态工具调用,可以显著提升 RAG 系统的准确性与实用性。在实际应用中,应结合具体业务场景选择合适的优化方案,并建立完善的评测体系持续迭代,以实现真正智能化的问答体验。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online