基于 FastAPI 的 AI 知识库 RAG 流程基础功能实现
基于 FastAPI 与异步 ORM 构建 AI 知识库,涵盖文档分块、大模型调用、流式响应及向量检索生成等核心功能。文章详细介绍了如何使用 DocumentChunk 处理文档,LLMEngine 对接多种大模型接口,以及通过向量化技术实现 QA 对的自动生成与存储。此外,补充了完整的检索增强生成(RAG)流程,包括问题嵌入、向量检索、上下文构建及最终回答生成,为开发者提供了一套可落地的 AI 知识库实现方案。

基于 FastAPI 与异步 ORM 构建 AI 知识库,涵盖文档分块、大模型调用、流式响应及向量检索生成等核心功能。文章详细介绍了如何使用 DocumentChunk 处理文档,LLMEngine 对接多种大模型接口,以及通过向量化技术实现 QA 对的自动生成与存储。此外,补充了完整的检索增强生成(RAG)流程,包括问题嵌入、向量检索、上下文构建及最终回答生成,为开发者提供了一套可落地的 AI 知识库实现方案。

本项目基于 FastAPI 与tortoise-orm 等异步方式结合 openai 实现。
当前仅支持 .docx 类型,后续补充 .pdf 等类型。
# /ExileChat/test/test_ai/test_document_chunk.py
from utils.ai.document_chunk import DocumentChunk
if __name__ == "__main__":
# 文档的路径或静态资源服务器的地址
file_path = "./test/测试文档.docx"
# 文档中图片存放的路径或图片服务器的地址,`image_base_path` 与 `image_base_url` 使用其一即可
image_base_path = "./test/test_ai"
image_base_url = "https://www.example.com"
dc = DocumentChunk(image_base_path=image_base_path, is_debug=True)
dc.process_file(file_path)
你需要查阅对应大模型的文档以及准备 api_key,按照例子来使用它。当前仅支持 OpenAI、AzureOpenAI、Moonshot,当然你可以自行添加其他模型,你需要根据以下代码示例实现对应的初始化工作。
# /ExileChat/utils/ai/llm_engine.py
# 代码片段
class ModelClient:
"""Models Client"""
api_key: str = None
kwargs: dict = {}
@classmethod
def open_ai(cls):
"""OpenAI Client"""
...
@classmethod
def azure_open_ai(cls):
"""AzureOpenAI Client"""
...
@classmethod
def moonshot(cls):
"""Moonshot Client"""
...
# 代码片段
class LLMEngine:
"""Large Language Models Engine"""
def __init__(self, model_name: str = None, api_key: str = None, is_debug: bool = False, *args, **kwargs):
self.model_name = model_name
self.api_key = api_key
self.is_debug = is_debug
self.args = args
self.kwargs = kwargs
ModelClient.api_key = api_key
ModelClient.kwargs = kwargs
self.client_dict = {
"open_ai": ModelClient.open_ai,
"azure_open_ai": ModelClient.azure_open_ai,
"moonshot": ModelClient.moonshot,
}
...
当然也提供了 websocket 与模型对话应答。
# /ExileChat/app/api/chat_ws/chat_ws.py
# 代码片段
# 把鉴权删除,替换模型与提示词即可。
@chat_ws_router.websocket("/{token}/{chat_id}")
async def chat(websocket: WebSocket, token: str, chat_id: str, user: dict = Depends(check_user)):
"""对话"""
await websocket.accept()
if not user:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
print("鉴权验证失败 ws 关闭...")
raise CustomException(status_code=403, detail="鉴权验证失败...", custom_code=10005)
try:
while True:
data = await websocket.receive_text()
# await websocket.send_text(f"chat_id: {chat_id} 用户:{user} token: {token} 消息:{data}")
# 测试代码
llm_engine = LLMEngine(model_name='azure_open_ai', api_key=api_key)
llm_engine.system_prompt = "你是一名 Python 专家"
response_generator = llm_engine.chat(input=data)
async for chunk in response_generator:
if isinstance(chunk, str):
await websocket.send_text(chunk)
else:
print(type(chunk), chunk)
await websocket.close()
break
except Exception as e:
print(f"WebSocket 连接发生异常:{e}")
await websocket.close()
# /ExileChat/test/test_ai/test_llm_engine.py
import asyncio
from utils.ai.llm_engine import LLMEngine
from api_key import api_key
async def main():
"""main"""
new_engine = LLMEngine(model_name='azure_open_ai', api_key=api_key)
new_engine.system_prompt = "你是一名 Python 专家"
response_generator = new_engine.chat(input="Python 是什么时候诞生的")
async for chunk in response_generator:
if isinstance(chunk, str):
print(f"Received: {chunk}")
else:
print(f"Received chunk type: {type(chunk)}, value: {chunk}")
if __name__ == "__main__":
asyncio.run(main())
你可以通过修改 stream 改变响应性方式。
# /ExileChat/test/test_ai/test_llm_engine_only.py
import asyncio
from utils.ai.llm_engine import LLMEngine
from api_key import api_key
async def main():
"""main"""
new_engine = LLMEngine(model_name='azure_open_ai', api_key=api_key)
generated_message = await new_engine.chat_only(prompt="你是强大的人工智能", input="你是谁?")
print(generated_message)
if __name__ == "__main__":
asyncio.run(main())
import asyncio
from utils.ai.llm_engine import LLMEngine
from api_key import api_key
async def main():
"""main"""
llm_engine = LLMEngine(model_name='azure_open_ai', api_key=api_key)
question = "1+1 等于多少"
answer = "等于 2"
question_embedding = await llm_engine.embedding(text=question)
answer_embedding = await llm_engine.embedding(text=answer)
return question_embedding, answer_embedding
if __name__ == "__main__":
asyncio.run(main())
组合使用:结合大模型生成 QA 段落并且向量化构造知识库数据。你完全可以不需要理会我项目的表结构和业务逻辑,你得到最终向量化的数据结构,结合自身的业务即可。
# /ExileChat/test/test_ai/test_document_vector.py
import asyncio
from api_key import api_key
from utils.ai.llm_engine import LLMEngine
from utils.ai.document_chunk import DocumentChunk
from utils.ai.document_vector import DocumentVector
"""
文档向量
1.接收已经处理好的文档字符串。
通过 `from utils.ai.document_chunk import DocumentChunk` 文档规整生成文档字符串,当然你可以使用其他更好方式生成它。
2.接收大模型引擎 `LLMEngine` 实例,例如以下使用了 `azure_open_ai` 作为模型。
llm_engine = LLMEngine(model_name='azure_open_ai', api_key=api_key, is_debug=is_debug)
3.使用 `LLMEngine` 结合 `prompt` 文档生成 `chunks`,同时 `self.chunks` 也会被赋值,如果你需要自定义可以修改它 `self.chunks=[...]`
dv = DocumentVector(...)
dv.gen_chunks()
*.得到例如以下结构的数据,自定义时需要按照以下数据结构。
[
{
"index": 1,
"chunk": "段落 1"
},
{
"index": 2,
"chunk": "段落 2"
}
...
]
4.使用 `LLMEngine` 结合 `prompt` 对段落生成 `QA`,同时 `self.qa` 也会被赋值,如果你需要自定义可以修改它 `self.qa=[...]`
dv.gen_qa()
*.得到例如以下结构的数据,自定义时需要按照以下数据结构。
[
{
"Q": "什么是 Python 中的协程?",
"A": "Python 中的协程是一种用于实现异步编程的机制,它允许程序在等待操作完成时挂起执行,从而可以执行其他任务。",
"chunks": [
"Python 中的协程是一种用于实现异步编程的机制,它允许程序在等待操作完成时挂起执行,从而可以执行其他任务。"
]
},
{
"Q": "协程在 Python 中的作用是什么?",
"A": "协程是 Python 语言中的一个特性,用于编写更高效和响应更快的代码。",
"chunks": [
"协程是 Python 语言中的一个特性,用于编写更高效和响应更快的代码。"
]
},
...
]
5.生成 `QA` 向量。
dv.gen_qa_vector()
*.最终会得到一个组装好的对象列表,例如以下结构的数据。
[
{
'able_id': 0,
'document_id': 0,
'answer': '1+1 等于 2',
'question': '1+1 等于几?',
'chunks': ["1+1=2", "..."],
'answer_embedding': [0.02715362422168255, -0.010939446277916431, 0.006153851747512817, -0.009505090303719044, ...],
'question_embedding': [0.02715362422168255, -0.010939446277916431, 0.006153851747512817, -0.009505090303719044, ...],
},
...
]
"""
if __name__ == '__main__':
is_debug = True
# 为何测试问答的精准,我使用了我的毕业论文作为测试,hhh。
# 你可以把下面 document_content = "1+1=2" 注释打开来进行测试。
# file_path = "./test/基于 Python+Vue 自动化测试平台的设计与实现.docx"
file_path = "./test/测试文档分段.docx"
dc = DocumentChunk(image_base_path="./test/test_ai", is_debug=is_debug)
document_content = dc.process_file(file_path)
# document_content = "1+1=2"
llm_engine = LLMEngine(model_name='azure_open_ai', api_key=api_key, is_debug=is_debug)
dv = DocumentVector(document_content=document_content, llm_engine=llm_engine, is_debug=is_debug)
asyncio.run(dv.gen_chunks())
asyncio.run(dv.gen_qa())
asyncio.run(dv.gen_qa_vector())
问题向量化通过数据库检索得到向量距离最相近的答案,结合大模型生成应答。以下是完整的检索与生成流程示例。
# /ExileChat/utils/ai/retrieval.py
import asyncio
from typing import List, Dict
from utils.ai.llm_engine import LLMEngine
async def retrieve_and_answer(question: str, top_k: int = 3):
"""
检索增强生成 (RAG) 核心函数
:param question: 用户问题
:param top_k: 检索数量
:return: 生成的回答
"""
# 1. 初始化引擎
llm_engine = LLMEngine(model_name='azure_open_ai', api_key=api_key)
# 2. 向量化问题
question_embedding = await llm_engine.embedding(text=question)
# 3. 检索相似向量 (此处模拟数据库查询)
# 实际场景中应调用 VectorDB.search(query_vector=question_embedding, limit=top_k)
retrieved_docs = await search_database_simulated(question_embedding, top_k)
# 4. 构建上下文
context = "\n".join([doc['answer'] for doc in retrieved_docs])
prompt = f"""基于以下知识库内容回答问题:
知识库:
{context}
问题:{question}
如果知识库中没有相关信息,请说明无法回答。
"""
# 5. 生成回答
response = await llm_engine.chat_only(prompt=prompt, input=question)
return response
async def search_database_simulated(embedding: List[float], limit: int):
"""模拟数据库检索"""
# 这里应替换为真实的向量数据库查询逻辑
# 例如 Milvus, Chroma, Pinecone 等
return []
if __name__ == "__main__":
asyncio.run(retrieve_and_answer("1+1 等于多少"))
通过以上步骤,即可完成从文档解析、向量化存储到检索问答的完整 RAG 流程闭环。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online