AI 知识库:基于 FastAPI 与 LLM 实现 RAG 流程基础功能
AI 知识库基于 RAG 流程,利用 FastAPI 异步框架结合 OpenAI 等大模型实现。涵盖文档分块、LLM 引擎调用(流式/阻塞)、向量生成及检索应答。通过 DocumentChunk 处理 docx 文件,LLMEngine 管理模型交互,DocumentVector 构建 QA 对并生成嵌入向量。检索阶段计算问题与答案向量距离,匹配最相似片段后由大模型生成最终回复,实现垂直领域智能问答系统。

AI 知识库基于 RAG 流程,利用 FastAPI 异步框架结合 OpenAI 等大模型实现。涵盖文档分块、LLM 引擎调用(流式/阻塞)、向量生成及检索应答。通过 DocumentChunk 处理 docx 文件,LLMEngine 管理模型交互,DocumentVector 构建 QA 对并生成嵌入向量。检索阶段计算问题与答案向量距离,匹配最相似片段后由大模型生成最终回复,实现垂直领域智能问答系统。

本文介绍如何基于 FastAPI 与 tortoise-orm 等异步方式结合 openai 等大模型服务,实现 RAG(检索增强生成)流程的基础功能。系统支持文档解析、向量化存储及智能问答。
当前主要支持 .docx 类型文件,后续计划补充 .pdf 等格式。文档处理模块负责将非结构化文本转换为适合大模型处理的片段。
# /ExileChat/test/test_ai/test_document_chunk.py
from utils.ai.document_chunk import DocumentChunk
if __name__ == "__main__":
# 文档的路径或静态资源服务器的地址
file_path = "./data/测试文档.docx"
# 文档中图片存放的路径或图片服务器的地址,image_base_path 与 image_base_url 使用其一即可
image_base_path = "./data/images"
image_base_url = "https://example.com/images"
dc = DocumentChunk(image_base_path=image_base_path, is_debug=True)
content = dc.process_file(file_path)
print(f"Extracted content length: {len(content)}")
你需要查阅对应大模型的文档以及准备 api_key,按照示例代码进行初始化。当前支持 OpenAI、AzureOpenAI、Moonshot,你可以根据以下代码结构自行添加其他模型。
# /ExileChat/utils/ai/llm_engine.py
class ModelClient:
"""Models Client"""
api_key: str = None
kwargs: dict = {}
@classmethod
def open_ai(cls):
"""OpenAI Client"""
# 初始化 OpenAI 客户端逻辑
pass
@classmethod
def azure_open_ai(cls):
"""AzureOpenAI Client"""
# 初始化 Azure OpenAI 客户端逻辑
pass
@classmethod
def moonshot(cls):
"""Moonshot Client"""
# 初始化 Moonshot 客户端逻辑
pass
class LLMEngine:
"""Large Language Models Engine"""
def __init__(self, model_name: str = None, api_key: str = None, is_debug: bool = False, *args, **kwargs):
self.model_name = model_name
self.api_key = api_key
self.is_debug = is_debug
self.args = args
self.kwargs = kwargs
ModelClient.api_key = api_key
ModelClient.kwargs = kwargs
self.client_dict = {
"open_ai": ModelClient.open_ai,
"azure_open_ai": ModelClient.azure_open_ai,
"moonshot": ModelClient.moonshot,
}
self.client = self.client_dict.get(model_name)()
系统提供了 websocket 接口用于模型对话应答,支持鉴权与流式输出。
# /ExileChat/app/api/chat_ws/chat_ws.py
from fastapi import APIRouter, WebSocket, Depends
from starlette.websockets import WebSocketDisconnect
chat_ws_router = APIRouter()
@chat_ws_router.websocket("/{token}/{chat_id}")
async def chat(websocket: WebSocket, token: str, chat_id: str, user: dict = Depends(check_user)):
"""WebSocket 对话接口"""
await websocket.accept()
if not user:
await websocket.close(code=1008)
raise Exception("鉴权验证失败")
try:
while True:
data = await websocket.receive_text()
llm_engine = LLMEngine(model_name='azure_open_ai', api_key=api_key)
llm_engine.system_prompt = "你是一名 Python 专家"
response_generator = llm_engine.chat(input=data)
async for chunk in response_generator:
if isinstance(chunk, str):
await websocket.send_text(chunk)
else:
print(type(chunk), chunk)
await websocket.close()
break
except Exception as e:
()
websocket.close()
支持异步迭代接收大模型生成的 Token 流。
# /ExileChat/test/test_ai/test_llm_engine.py
import asyncio
from utils.ai.llm_engine import LLMEngine
async def main():
new_engine = LLMEngine(model_name='azure_open_ai', api_key="your_api_key")
new_engine.system_prompt = "你是一名 Python 专家"
response_generator = new_engine.chat(input="Python 是什么时候诞生的")
async for chunk in response_generator:
if isinstance(chunk, str):
print(f"Received: {chunk}")
else:
print(f"Received chunk type: {type(chunk)}, value: {chunk}")
if __name__ == "__main__":
asyncio.run(main())
通过修改参数可切换为同步阻塞模式,适用于不需要流式输出的场景。
# /ExileChat/test/test_ai/test_llm_engine_only.py
import asyncio
from utils.ai.llm_engine import LLMEngine
async def main():
new_engine = LLMEngine(model_name='azure_open_ai', api_key="your_api_key")
generated_message = await new_engine.chat_only(prompt="你是强大的人工智能", input="你是谁?")
print(generated_message)
if __name__ == "__main__":
asyncio.run(main())
向量化是将文本转换为高维向量空间中的点,以便进行相似度计算。
import asyncio
from utils.ai.llm_engine import LLMEngine
async def main():
llm_engine = LLMEngine(model_name='azure_open_ai', api_key="your_api_key")
question = "1+1 等于多少"
answer = "等于 2"
question_embedding = await llm_engine.embedding(text=question)
answer_embedding = await llm_engine.embedding(text=answer)
return question_embedding, answer_embedding
if __name__ == "__main__":
asyncio.run(main())
结合大模型生成 QA 段落并构造知识库数据。你可以参考以下表结构和业务逻辑,得到最终向量化的数据结构后结合自身的业务需求进行调整。
DocumentChunk 处理后的内容,或使用其他方式生成。LLMEngine 实例。# /ExileChat/test/test_ai/test_document_vector.py
import asyncio
from utils.ai.document_chunk import DocumentChunk
from utils.ai.document_vector import DocumentVector
from utils.ai.llm_engine import LLMEngine
async def main():
is_debug = True
file_path = "./data/测试文档分段.docx"
# 1. 文档分块
dc = DocumentChunk(image_base_path="./data/images", is_debug=is_debug)
document_content = dc.process_file(file_path)
# 2. 初始化引擎
llm_engine = LLMEngine(model_name='azure_open_ai', api_key="your_api_key", is_debug=is_debug)
# 3. 构建向量对象
dv = DocumentVector(document_content=document_content, llm_engine=llm_engine, is_debug=is_debug)
# 4. 执行生成任务
await dv.gen_chunks() # 生成段落
await dv.gen_qa() # 生成 QA 对
await dv.gen_qa_vector() # 生成向量
if __name__ == '__main__':
asyncio.run(main())
Chunks 结构:
[
{"index": 1, "chunk": "段落 1"},
{"index": 2, "chunk": "段落 2"}
]
QA 结构:
[
{
"Q": "什么是 Python 中的协程?",
"A": "Python 中的协程是一种用于实现异步编程的机制...",
"chunks": ["Python 中的协程是一种用于实现异步编程的机制..."]
}
]
最终向量对象:
{
'able_id': 0,
'document_id': 0,
'answer': '1+1 等于 2',
'question': '1+1 等于几?',
'chunks': ["1+1=2", "..."],
'answer_embedding': [0.027..., -0.010..., ...],
'question_embedding': [0.027..., -0.010..., ...]
}
这是 RAG 流程的核心环节。当用户提问时,系统将问题向量化,在数据库中检索最相似的向量记录,获取上下文片段,最后由大模型生成回答。
# /ExileChat/utils/ai/rag_retriever.py
import asyncio
from typing import List, Dict
from utils.ai.llm_engine import LLMEngine
from database.vector_store import VectorStore # 假设存在的向量存储类
class RagRetriever:
def __init__(self, llm_engine: LLMEngine, vector_store: VectorStore):
self.llm_engine = llm_engine
self.vector_store = vector_store
async def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
"""检索相关片段"""
# 1. 问题向量化
query_embedding = await self.llm_engine.embedding(text=query)
# 2. 向量检索
results = await self.vector_store.search(query_embedding, limit=top_k)
return results
async def generate_answer(self, query: str, context: List[str]) -> str:
"""基于上下文生成回答"""
prompt_template = f"""
请根据以下提供的背景信息回答问题。如果背景信息不足以回答问题,请直接说明。
背景信息:
{chr(10).join(context)}
问题:
回答:
"""
response = .llm_engine.chat_only(prompt=prompt_template, =)
response
() -> :
context_list = []
results = .retrieve(query)
item results:
context_list.append(item[])
context_list.extend(item[])
.generate_answer(query, context_list)
():
llm_engine = LLMEngine(model_name=, api_key=)
vector_store = VectorStore(collection_name=)
retriever = RagRetriever(llm_engine, vector_store)
final_answer = retriever.process_query()
(final_answer)
__name__ == :
asyncio.run(test_rag())
本指南详细阐述了构建 AI 知识库的技术路径。通过 FastAPI 提供异步接口,结合 LLM 引擎处理自然语言交互,利用向量化技术实现语义检索。开发者可根据实际业务调整文档解析策略、向量模型选择及数据库方案,以构建高性能的智能问答系统。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online