跳到主要内容AI 知识库:基于 FastAPI 与 LLM 实现 RAG 流程基础功能 | 极客日志PythonAI算法
AI 知识库:基于 FastAPI 与 LLM 实现 RAG 流程基础功能
AI 知识库基于 RAG 流程,利用 FastAPI 异步框架结合 OpenAI 等大模型实现。涵盖文档分块、LLM 引擎调用(流式/阻塞)、向量生成及检索应答。通过 DocumentChunk 处理 docx 文件,LLMEngine 管理模型交互,DocumentVector 构建 QA 对并生成嵌入向量。检索阶段计算问题与答案向量距离,匹配最相似片段后由大模型生成最终回复,实现垂直领域智能问答系统。
二进制18 浏览 AI 知识库 - RAG 流程基础功能实现
本文介绍如何基于 FastAPI 与 tortoise-orm 等异步方式结合 openai 等大模型服务,实现 RAG(检索增强生成)流程的基础功能。系统支持文档解析、向量化存储及智能问答。
文档规整
当前主要支持 .docx 类型文件,后续计划补充 .pdf 等格式。文档处理模块负责将非结构化文本转换为适合大模型处理的片段。
from utils.ai.document_chunk import DocumentChunk
if __name__ == "__main__":
file_path = "./data/测试文档.docx"
image_base_path = "./data/images"
image_base_url = "https://example.com/images"
dc = DocumentChunk(image_base_path=image_base_path, is_debug=True)
content = dc.process_file(file_path)
print(f"Extracted content length: {len(content)}")
大模型调用
你需要查阅对应大模型的文档以及准备 api_key,按照示例代码进行初始化。当前支持 OpenAI、AzureOpenAI、Moonshot,你可以根据以下代码结构自行添加其他模型。
模型客户端封装
class ModelClient:
"""Models Client"""
api_key: str = None
kwargs: dict = {}
@classmethod
def open_ai(cls):
"""OpenAI Client"""
pass
@classmethod
():
():
:
():
.model_name = model_name
.api_key = api_key
.is_debug = is_debug
.args = args
.kwargs = kwargs
ModelClient.api_key = api_key
ModelClient.kwargs = kwargs
.client_dict = {
: ModelClient.open_ai,
: ModelClient.azure_open_ai,
: ModelClient.moonshot,
}
.client = .client_dict.get(model_name)()
def
azure_open_ai
cls
"""AzureOpenAI Client"""
pass
@classmethod
def
moonshot
cls
"""Moonshot Client"""
pass
class
LLMEngine
"""Large Language Models Engine"""
def
__init__
self, model_name: str = None, api_key: str = None, is_debug: bool = False, *args, **kwargs
self
self
self
self
self
self
"open_ai"
"azure_open_ai"
"moonshot"
self
self
WebSocket 实时对话
系统提供了 websocket 接口用于模型对话应答,支持鉴权与流式输出。
from fastapi import APIRouter, WebSocket, Depends
from starlette.websockets import WebSocketDisconnect
chat_ws_router = APIRouter()
@chat_ws_router.websocket("/{token}/{chat_id}")
async def chat(websocket: WebSocket, token: str, chat_id: str, user: dict = Depends(check_user)):
"""WebSocket 对话接口"""
await websocket.accept()
if not user:
await websocket.close(code=1008)
raise Exception("鉴权验证失败")
try:
while True:
data = await websocket.receive_text()
llm_engine = LLMEngine(model_name='azure_open_ai', api_key=api_key)
llm_engine.system_prompt = "你是一名 Python 专家"
response_generator = llm_engine.chat(input=data)
async for chunk in response_generator:
if isinstance(chunk, str):
await websocket.send_text(chunk)
else:
print(type(chunk), chunk)
await websocket.close()
break
except Exception as e:
print(f"WebSocket 连接发生异常:{e}")
await websocket.close()
多轮对话与流式响应
import asyncio
from utils.ai.llm_engine import LLMEngine
async def main():
new_engine = LLMEngine(model_name='azure_open_ai', api_key="your_api_key")
new_engine.system_prompt = "你是一名 Python 专家"
response_generator = new_engine.chat(input="Python 是什么时候诞生的")
async for chunk in response_generator:
if isinstance(chunk, str):
print(f"Received: {chunk}")
else:
print(f"Received chunk type: {type(chunk)}, value: {chunk}")
if __name__ == "__main__":
asyncio.run(main())
一次对话,阻塞响应
通过修改参数可切换为同步阻塞模式,适用于不需要流式输出的场景。
import asyncio
from utils.ai.llm_engine import LLMEngine
async def main():
new_engine = LLMEngine(model_name='azure_open_ai', api_key="your_api_key")
generated_message = await new_engine.chat_only(prompt="你是强大的人工智能", input="你是谁?")
print(generated_message)
if __name__ == "__main__":
asyncio.run(main())
向量化处理
向量化是将文本转换为高维向量空间中的点,以便进行相似度计算。
import asyncio
from utils.ai.llm_engine import LLMEngine
async def main():
llm_engine = LLMEngine(model_name='azure_open_ai', api_key="your_api_key")
question = "1+1 等于多少"
answer = "等于 2"
question_embedding = await llm_engine.embedding(text=question)
answer_embedding = await llm_engine.embedding(text=answer)
return question_embedding, answer_embedding
if __name__ == "__main__":
asyncio.run(main())
生成向量化数据结构
结合大模型生成 QA 段落并构造知识库数据。你可以参考以下表结构和业务逻辑,得到最终向量化的数据结构后结合自身的业务需求进行调整。
核心流程
- 接收文档字符串:通过
DocumentChunk 处理后的内容,或使用其他方式生成。
- 初始化引擎:传入
LLMEngine 实例。
- 生成 Chunk:利用 Prompt 将长文本切分为语义完整的段落。
- 生成 QA 对:利用 Prompt 从段落中提取问题和答案。
- 生成向量:对问题和答案分别进行 Embedding。
import asyncio
from utils.ai.document_chunk import DocumentChunk
from utils.ai.document_vector import DocumentVector
from utils.ai.llm_engine import LLMEngine
async def main():
is_debug = True
file_path = "./data/测试文档分段.docx"
dc = DocumentChunk(image_base_path="./data/images", is_debug=is_debug)
document_content = dc.process_file(file_path)
llm_engine = LLMEngine(model_name='azure_open_ai', api_key="your_api_key", is_debug=is_debug)
dv = DocumentVector(document_content=document_content, llm_engine=llm_engine, is_debug=is_debug)
await dv.gen_chunks()
await dv.gen_qa()
await dv.gen_qa_vector()
if __name__ == '__main__':
asyncio.run(main())
数据结构说明
[
{"index": 1, "chunk": "段落 1"},
{"index": 2, "chunk": "段落 2"}
]
[
{
"Q": "什么是 Python 中的协程?",
"A": "Python 中的协程是一种用于实现异步编程的机制...",
"chunks": ["Python 中的协程是一种用于实现异步编程的机制..."]
}
]
{
'able_id': 0,
'document_id': 0,
'answer': '1+1 等于 2',
'question': '1+1 等于几?',
'chunks': ["1+1=2", "..."],
'answer_embedding': [0.027..., -0.010..., ...],
'question_embedding': [0.027..., -0.010..., ...]
}
向量化应答 (RAG 检索)
这是 RAG 流程的核心环节。当用户提问时,系统将问题向量化,在数据库中检索最相似的向量记录,获取上下文片段,最后由大模型生成回答。
检索逻辑
- 问题嵌入:将用户输入的问题转换为向量。
- 相似度搜索:在向量数据库(如 Milvus, Chroma, Faiss)中计算余弦相似度或欧氏距离。
- Top-K 召回:选取距离最近的 K 个文档片段作为上下文。
- Prompt 组装:将原始问题与检索到的上下文组合成新的 Prompt。
- 模型生成:调用 LLM 根据上下文回答问题。
代码示例
import asyncio
from typing import List, Dict
from utils.ai.llm_engine import LLMEngine
from database.vector_store import VectorStore
class RagRetriever:
def __init__(self, llm_engine: LLMEngine, vector_store: VectorStore):
self.llm_engine = llm_engine
self.vector_store = vector_store
async def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
"""检索相关片段"""
query_embedding = await self.llm_engine.embedding(text=query)
results = await self.vector_store.search(query_embedding, limit=top_k)
return results
async def generate_answer(self, query: str, context: List[str]) -> str:
"""基于上下文生成回答"""
prompt_template = f"""
请根据以下提供的背景信息回答问题。如果背景信息不足以回答问题,请直接说明。
背景信息:
{chr(10).join(context)}
问题:{query}
回答:
"""
response = await self.llm_engine.chat_only(prompt=prompt_template, input="")
return response
async def process_query(self, query: str) -> str:
"""完整流程"""
context_list = []
results = await self.retrieve(query)
for item in results:
context_list.append(item['answer'])
context_list.extend(item['chunks'])
return await self.generate_answer(query, context_list)
async def test_rag():
llm_engine = LLMEngine(model_name='azure_open_ai', api_key="your_api_key")
vector_store = VectorStore(collection_name="knowledge_base")
retriever = RagRetriever(llm_engine, vector_store)
final_answer = await retriever.process_query("1+1 等于多少?")
print(final_answer)
if __name__ == "__main__":
asyncio.run(test_rag())
总结
本指南详细阐述了构建 AI 知识库的技术路径。通过 FastAPI 提供异步接口,结合 LLM 引擎处理自然语言交互,利用向量化技术实现语义检索。开发者可根据实际业务调整文档解析策略、向量模型选择及数据库方案,以构建高性能的智能问答系统。
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- 随机西班牙地址生成器
随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online