
1. Project Overview and Architecture
1.1 Core Design Philosophy
The system follows an "AI-first, human-fallback" hybrid interaction model: it aims to handle roughly 80% of routine queries automatically while seamlessly escalating complex issues to human agents. It is built on a microservice architecture for high availability, scalability, and modularity.
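In practice, "AI-first, human-fallback" reduces to a per-message routing decision. A minimal sketch of that decision, assuming the NLU layer yields an intent confidence and a sentiment label (the names and threshold here are illustrative, not from the original design):

AI_CONFIDENCE_THRESHOLD = 0.75  # illustrative default; tune per deployment

def route_message(intent_confidence: float, sentiment: str) -> str:
    """Return "ai" when the assistant should answer, "agent" for human handling."""
    if sentiment == "negative":
        return "agent"   # frustrated customers go straight to a human
    if intent_confidence >= AI_CONFIDENCE_THRESHOLD:
        return "ai"      # high-confidence routine query
    return "agent"       # uncertain: fall back to a human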
1.2 Overall Architecture
┌──────────────────────────────────────────────────────────────┐
│             Client Layer (multi-channel access)              │
│   [ Web chat ]  [ Mobile app ]  [ WeChat ]  [ Telephony ]    │
└───────────────────────────┬──────────────────────────────────┘
                            │ HTTPS / WebSocket
┌───────────────────────────┴──────────────────────────────────┐
│               API Gateway Layer (unified entry)              │
│  auth │ rate limiting │ protocol translation │ routing │ LB  │
└───────────────────────────┬──────────────────────────────────┘
                            │ internal RPC / gRPC
┌───────────────────────────┴──────────────────────────────────┐
│             Business Logic Layer (core services)             │
│   [ Dialogue management ]   [ Intent recognition ]           │
│   [ Knowledge retrieval ]   [ Agent collaboration ]          │
└───────────────────────────┬──────────────────────────────────┘
                            │ message queue / database access
┌───────────────────────────┴──────────────────────────────────┐
│             Data & AI Layer (capability support)             │
│   [ Vector DB: FAISS/Chroma ]   [ RDBMS: PostgreSQL/MySQL ]  │
│   [ Cache: Redis ]              [ AI model services ]        │
└──────────────────────────────────────────────────────────────┘
1.3 Technology Stack
| Component | Choice | Rationale |
|---|---|---|
| Backend framework | FastAPI + Python 3.10+ | Async performance, auto-generated API docs |
| Dialogue engine | Rasa 3.x + custom extensions | Open-source NLU framework, flexible and controllable |
| AI models | Sentence Transformers, BERT, GPT-2/3 | Balance of performance and quality |
| Vector database | FAISS + PostgreSQL (pgvector) | High-performance similarity search |
| Real-time messaging | WebSocket + Redis Pub/Sub | Low-latency message delivery |
| Frontend framework | React + TypeScript + TailwindCSS | Modern, component-based UI |
| Agent desktop | Electron + React | Cross-platform desktop app |
| Deployment | Docker + Kubernetes | Containerized, easy to scale |
2. Environment Setup and Project Initialization
2.1 Development Environment
# 1. Create the project directory
mkdir -p ai-human-customer-service
cd ai-human-customer-service
# 2. Create a Python virtual environment
python3.10 -m venv venv
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate   # Windows
# 3. Install base dependencies
pip install --upgrade pip
pip install fastapi==0.104.1
pip install "uvicorn[standard]==0.24.0"
pip install sqlalchemy==2.0.23
pip install asyncpg==0.29.0
pip install redis==5.0.1
pip install pydantic==2.5.0
pip install pydantic-settings==2.1.0
pip install python-multipart==0.0.6
pip install websockets==12.0
pip install aiohttp==3.9.1
pip install jwt==1.3.1
pip install python-dateutil==2.8.2
pip install loguru==0.7.2
# 4. Create the project structure
mkdir -p {src,tests,docs,scripts,deploy}
mkdir -p src/{api,core,models,schemas,services,utils,ai_components}
mkdir -p src/api/{endpoints,middleware}
mkdir -p src/core/{config,database,security}
mkdir -p src/ai_components/{nlp,retrieval,models}
mkdir -p tests/{unit,integration}
2.2 Configuration Design
Create src/core/config.py:
""" 系统配置管理模块
使用 pydantic-settings 进行配置管理,支持环境变量覆盖
"""
from typing import Optional, List, Dict, Any
from pydantic_settings import BaseSettings
from pydantic import Field, validator
import secrets
class Settings(BaseSettings):
"""应用配置类"""
# 应用基础配置
APP_NAME: str = "AI-Human Customer Service"
APP_VERSION: str = "1.0.0"
DEBUG: bool = False
ENVIRONMENT: str = "development" # development, staging, production
# API 配置
API_V1_STR: str = "/api/v1"
PROJECT_NAME: str = "AI Human Customer Service"
BACKEND_CORS_ORIGINS: List[str] = ["http://localhost:3000"]
# 安全配置
SECRET_KEY: str = Field(default_factory=lambda: secrets.token_urlsafe(32))
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 7 # 7 天
# 数据库配置
POSTGRES_SERVER: str = "localhost"
POSTGRES_USER: str =
POSTGRES_PASSWORD: =
POSTGRES_DB: =
POSTGRES_PORT: =
DATABASE_URL: [] =
() -> :
(v, ):
v
()
REDIS_HOST: =
REDIS_PORT: =
REDIS_DB: =
REDIS_PASSWORD: [] =
REDIS_URL: [] =
() -> :
(v, ):
v
password = values.get()
auth_part = password
()
AI_MODEL_PATH: =
SENTENCE_TRANSFORMER_MODEL: =
BERT_MODEL_PATH: =
GPT_MODEL_PATH: =
USE_GPU: =
AI_MODEL_CACHE_SIZE: =
MAX_CONVERSATION_HISTORY: =
SESSION_TIMEOUT_MINUTES: =
DEFAULT_AI_CONFIDENCE_THRESHOLD: =
MAX_AGENTS_PER_QUEUE: =
AGENT_HEARTBEAT_INTERVAL: =
AGENT_AWAY_TIMEOUT: =
RABBITMQ_HOST: =
RABBITMQ_PORT: =
RABBITMQ_USER: =
RABBITMQ_PASSWORD: =
ENABLE_METRICS: =
METRICS_PORT: =
LOG_LEVEL: =
:
env_file =
case_sensitive =
extra =
settings = Settings()
ENV_TEMPLATE =
():
(, ) f:
secret_key = secrets.token_urlsafe()
f.write(ENV_TEMPLATE.(secret_key=secret_key))
()
__name__ == :
create_env_file()
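Because every field is an ordinary pydantic setting, deployment-specific values can be supplied without code changes, either via .env or plain environment variables (values below are illustrative):

export POSTGRES_PASSWORD="s3cret"
export DEBUG=true
python -c "from src.core.config import settings; print(settings.DATABASE_URL)"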
3. Core Data Models
3.1 Database Model Definitions
Create src/models/base.py:
""" 数据库基础模型定义
使用 SQLAlchemy ORM 进行数据建模
"""
from datetime import datetime
from typing import Any, Dict, Optional
from sqlalchemy import Column, DateTime, Integer, String, Boolean, Text, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
Base = declarative_base()
class TimestampMixin:
"""时间戳混入类"""
created_at = Column(DateTime, default=func.now(), nullable=False)
updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), nullable=False)
deleted_at = Column(DateTime, nullable=True)
class Customer(Base, TimestampMixin):
"""客户模型"""
__tablename__ = "customers"
id = Column(String(36), primary_key=True, index=True)
external_id = Column(String(100), unique=True, nullable=True, index=True)
name = Column(String(200), nullable=True)
email = Column(String(254), nullable=True, index=True)
phone = Column(String(50), nullable=True, index=True)
metadata = Column(JSON, nullable=True, default=dict)
tags = Column(JSON, nullable=True, default=list)
segment = Column(String(), nullable=)
lifetime_value = Column(Integer, default=)
last_interaction_at = Column(DateTime, nullable=)
is_active = Column(Boolean, default=)
notes = Column(Text, nullable=)
() -> [, ]:
{
: .,
: .external_id,
: .name,
: .email,
: .phone,
: .metadata {},
: .tags [],
: .segment,
: .lifetime_value,
: .last_interaction_at.isoformat() .last_interaction_at ,
: .is_active,
: .notes,
: .created_at.isoformat(),
: .updated_at.isoformat()
}
class Conversation(Base, TimestampMixin, SerializerMixin):
    """A customer service conversation."""
    __tablename__ = "conversations"

    id = Column(String(36), primary_key=True, index=True)
    customer_id = Column(String(36), index=True, nullable=False)
    channel = Column(String(50), nullable=False)
    status = Column(String(50), default="active")
    assigned_agent_id = Column(String(36), nullable=True, index=True)
    assigned_ai_model = Column(String(100), nullable=True)
    meta = Column("metadata", JSON, nullable=True, default=dict)
    sentiment_score = Column(Float, nullable=True)       # scores live in [0, 1], hence Float
    satisfaction_score = Column(Integer, nullable=True)  # e.g. 1-5 survey rating
    closed_at = Column(DateTime, nullable=True)
    close_reason = Column(String(200), nullable=True)


class Message(Base, TimestampMixin, SerializerMixin):
    """A single message within a conversation."""
    __tablename__ = "messages"

    id = Column(String(36), primary_key=True, index=True)
    conversation_id = Column(String(36), index=True, nullable=False)
    sender_type = Column(String(20), nullable=False)  # customer / agent / ai
    sender_id = Column(String(36), nullable=True)
    content = Column(Text, nullable=False)
    content_type = Column(String(20), default="text")
    meta = Column("metadata", JSON, nullable=True, default=dict)
    intent = Column(String(100), nullable=True)
    confidence = Column(Float, nullable=True)
    is_read = Column(Boolean, default=False)
    read_at = Column(DateTime, nullable=True)
    ai_model_used = Column(String(100), nullable=True)
    ai_confidence = Column(Float, nullable=True)
    ai_metadata = Column(JSON, nullable=True, default=dict)


class Agent(Base, TimestampMixin, SerializerMixin):
    """A human customer service agent."""
    __tablename__ = "agents"

    id = Column(String(36), primary_key=True, index=True)
    user_id = Column(String(36), unique=True, nullable=False)
    name = Column(String(200), nullable=False)
    email = Column(String(254), unique=True, nullable=True)
    status = Column(String(20), default="offline")  # online / away / offline
    skills = Column(JSON, nullable=True, default=list)
    current_conversation_ids = Column(JSON, nullable=True, default=list)
    max_concurrent_chats = Column(Integer, default=5)
    meta = Column("metadata", JSON, nullable=True, default=dict)
    last_active_at = Column(DateTime, nullable=True)
    availability_schedule = Column(JSON, nullable=True)
    performance_score = Column(Float, default=0.0)
    is_active = Column(Boolean, default=True)


class KnowledgeBase(Base, TimestampMixin, SerializerMixin):
    """A knowledge-base article."""
    __tablename__ = "knowledge_base"

    id = Column(String(36), primary_key=True, index=True)
    title = Column(String(500), nullable=False)
    content = Column(Text, nullable=False)
    category = Column(String(100), nullable=True, index=True)
    tags = Column(JSON, nullable=True, default=list)
    language = Column(String(10), default="zh")
    is_active = Column(Boolean, default=True)
    vector_embedding = Column(JSON, nullable=True)
    meta = Column("metadata", JSON, nullable=True, default=dict)
    usage_count = Column(Integer, default=0)
    last_used_at = Column(DateTime, nullable=True)


class Intent(Base, TimestampMixin, SerializerMixin):
    """An intent definition used by the NLU layer."""
    __tablename__ = "intents"

    id = Column(String(36), primary_key=True, index=True)
    name = Column(String(100), nullable=False, unique=True)
    description = Column(Text, nullable=True)
    examples = Column(JSON, nullable=True, default=list)
    handler_type = Column(String(20), default="ai")  # ai / agent / hybrid
    confidence_threshold = Column(Float, default=0.7)
    meta = Column("metadata", JSON, nullable=True, default=dict)
    is_active = Column(Boolean, default=True)


class AIInteraction(Base, TimestampMixin, SerializerMixin):
    """One AI model call, kept for auditing and offline evaluation."""
    __tablename__ = "ai_interactions"

    id = Column(String(36), primary_key=True, index=True)
    conversation_id = Column(String(36), index=True, nullable=False)
    message_id = Column(String(36), index=True, nullable=True)
    ai_model = Column(String(100), nullable=False)
    prompt = Column(Text, nullable=True)
    response = Column(Text, nullable=True)
    confidence = Column(Float, nullable=True)
    processing_time_ms = Column(Integer, nullable=True)
    tokens_used = Column(Integer, nullable=True)
    meta = Column("metadata", JSON, nullable=True, default=dict)
3.2 Database Initialization Script
Create scripts/init_database.py:
#!/usr/bin/env python3
""" 数据库初始化脚本
创建数据库表结构和初始数据
"""
import asyncio
import sys
from pathlib import Path
# 添加项目根目录到 Python 路径
sys.path.append(str(Path(__file__).parent.parent))
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text
from src.core.config import settings
from src.models.base import Base
from src.models import *
# 导入所有模型
async def init_database():
"""初始化数据库"""
print("🚀 开始初始化数据库...")
# 创建异步引擎
database_url = settings.DATABASE_URL.replace("postgresql+asyncpg", "postgresql")
sync_engine = create_async_engine(
database_url,
echo=settings.DEBUG,
pool_pre_ping=True,
pool_size=20,
max_overflow=30
)
# 创建所有表
print("📦 创建数据库表...")
async with sync_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
print("✅ 数据库表创建完成")
# 创建初始数据
print("📝 创建初始数据...")
# 使用异步会话
async_session = sessionmaker(
sync_engine,
class_=AsyncSession,
expire_on_commit=
)
async_session() session:
:
sample_intents = [
{: , : , : ,
: [, , , , ],
: , : },
{: , : , : ,
: [, , , , ],
: , : },
{: , : , : ,
: [, , , , ],
: , : },
{: , : , : ,
: [, , , , ],
: , : },
{: , : , : ,
: [, , , , ],
: , : }
]
intent_data sample_intents:
session.execute(text(), intent_data)
knowledge_entries = [
{: , : ,
: ,
: , : [, ], : },
{: , : ,
: ,
: , : [, , ], : },
{: , : ,
: ,
: , : [, , ], : },
{: , : ,
: ,
: , : [, , ], : }
]
kb_data knowledge_entries:
session.execute(text(), kb_data)
session.commit()
()
Exception e:
session.rollback()
()
()
():
()
:
database_url = settings.DATABASE_URL.replace(, )
engine = create_async_engine(database_url)
engine.connect() conn:
result = conn.execute(text())
version = result.scalar()
()
engine.dispose()
Exception e:
()
():
(*)
()
(*)
check_database_connection():
()
init_database()
__name__ == :
asyncio.run(main())
4. Core AI Components
4.1 NLP Processor (Intent Recognition and Sentiment Analysis)
Create src/ai_components/nlp/processor.py:
""" NLP 处理器
负责意图识别、情感分析、实体提取等自然语言处理任务
"""
import asyncio
import logging
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
import numpy as np
from datetime import datetime
# Import the optional NLP libraries
try:
    import jieba
    import jieba.analyse
    JIEBA_AVAILABLE = True
except ImportError:
    JIEBA_AVAILABLE = False
    print("⚠️ jieba is not installed; Chinese word segmentation will be limited")

try:
    from transformers import (
        AutoTokenizer, AutoModelForSequenceClassification, pipeline,
        BertTokenizer, BertModel,
    )
    from sentence_transformers import SentenceTransformer
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    print("⚠️ transformers/sentence-transformers is not installed")
from src.core.config import settings
@dataclass
class IntentResult:
    """Intent recognition result."""
    intent: str
    confidence: float
    entities: Dict[str, Any]
    alternatives: List[Dict[str, Any]]


@dataclass
class SentimentResult:
    """Sentiment analysis result."""
    sentiment: str  # positive / neutral / negative
    score: float
    confidence: float
    emotions: Dict[str, float]


@dataclass
class NLPAnalysis:
    """Aggregated NLP analysis of a single text."""
    text: str
    intent_result: IntentResult
    sentiment_result: SentimentResult
    entities: Dict[str, Any]
    keywords: List[str]
    language: str
    processing_time_ms: float


class NLPProcessor:
    """Intent recognition, sentiment analysis and entity extraction."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.initialized = False
        self.models = {}
        self.tokenizers = {}
        self.confidence_threshold = settings.DEFAULT_AI_CONFIDENCE_THRESHOLD
        self.max_sequence_length = 512

    async def initialize(self):
        """Load models lazily on first use."""
        if self.initialized:
            return
        self.logger.info("Initializing NLP processor...")
        try:
            if JIEBA_AVAILABLE:
                jieba.initialize()
            if TRANSFORMERS_AVAILABLE:
                self.logger.info("Loading sentence-transformer model...")
                self.sentence_model = SentenceTransformer(
                    settings.SENTENCE_TRANSFORMER_MODEL,
                    cache_folder=settings.AI_MODEL_PATH,
                )
                self.logger.info("Loading sentiment pipeline...")
                self.sentiment_analyzer = pipeline(
                    "sentiment-analysis",
                    device=0 if settings.USE_GPU else -1,
                )
                self.logger.info("Loading BERT model...")
                self.bert_tokenizer = BertTokenizer.from_pretrained(
                    settings.BERT_MODEL_PATH, cache_dir=settings.AI_MODEL_PATH)
                self.bert_model = BertModel.from_pretrained(
                    settings.BERT_MODEL_PATH, cache_dir=settings.AI_MODEL_PATH)
                if settings.USE_GPU:
                    self.bert_model.cuda()
            self.intent_patterns = self._load_intent_patterns()
            self.initialized = True
            self.logger.info("NLP processor initialized")
        except Exception as e:
            self.logger.error(f"NLP processor initialization failed: {e}")
            raise

    def _load_intent_patterns(self) -> Dict[str, List[str]]:
        """Keyword patterns per intent. Illustrative; normally loaded from the intents table."""
        return {
            "greeting": ["你好", "您好", "在吗", "hi", "hello"],
            "order_inquiry": ["订单", "物流", "发货", "到哪了", "快递"],
            "refund_request": ["退款", "退货", "退钱", "取消订单"],
            "complaint": ["投诉", "差评", "太差", "生气", "经理"],
            "product_question": ["怎么用", "功能", "参数", "规格"],
            "human_handoff": ["人工", "转人工", "真人", "客服"],
            "goodbye": ["再见", "拜拜", "谢谢", "没事了"],
            "other": [],
        }

    async def analyze_text(self, text: str, context: Optional[Dict[str, Any]] = None) -> NLPAnalysis:
        """Run intent, sentiment, entity and keyword analysis concurrently."""
        start_time = datetime.now()
        if not self.initialized:
            await self.initialize()

        language = self._detect_language(text)

        intent_result, sentiment_result, entities, keywords = await asyncio.gather(
            self.detect_intent(text, context),
            self.analyze_sentiment(text),
            self.extract_entities(text),
            self.extract_keywords(text, language),
        )

        processing_time = (datetime.now() - start_time).total_seconds() * 1000
        return NLPAnalysis(
            text=text,
            intent_result=intent_result,
            sentiment_result=sentiment_result,
            entities=entities,
            keywords=keywords,
            language=language,
            processing_time_ms=processing_time,
        )

    async def detect_intent(self, text: str, context: Optional[Dict[str, Any]] = None) -> IntentResult:
        """Fuse rule-based and similarity-based intent detection."""
        try:
            rule_based_result = self._rule_based_intent_detection(text)
            similarity_based_result = self._similarity_based_intent_detection(text)
            final_intent, final_confidence, alternatives = self._fuse_intent_results(
                rule_based_result, similarity_based_result)
            entities = await self.extract_entities(text)
            if context:
                final_intent = self._apply_context_rules(final_intent, context)
            return IntentResult(intent=final_intent, confidence=final_confidence,
                                entities=entities, alternatives=alternatives)
        except Exception as e:
            self.logger.error(f"Intent detection failed: {e}")
            return IntentResult(intent="other", confidence=0.0, entities={}, alternatives=[])

    def _rule_based_intent_detection(self, text: str) -> Tuple[str, float]:
        """Score intents by keyword hits, normalized by pattern count."""
        text_lower = text.lower()
        best_intent, best_score = "other", 0.0
        for intent, patterns in self.intent_patterns.items():
            score = sum(1 for pattern in patterns if pattern in text_lower)
            if score > 0:
                normalized_score = score / len(patterns)
                if normalized_score > best_score:
                    best_score, best_intent = normalized_score, intent
        return best_intent, best_score

    def _similarity_based_intent_detection(self, text: str) -> Tuple[str, float]:
        """Embedding similarity between the text and each intent's examples."""
        if not TRANSFORMERS_AVAILABLE:
            return "other", 0.0
        try:
            text_embedding = self.sentence_model.encode(text)
            intent_scores = {}
            for intent, examples in self.intent_patterns.items():
                if examples:
                    example_embeddings = self.sentence_model.encode(examples)
                    similarities = np.dot(example_embeddings, text_embedding.T)
                    max_similarity = float(np.max(similarities))
                    # Map roughly from [-1, 1] into [0, 1]
                    intent_scores[intent] = (max_similarity + 1) / 2
            if intent_scores:
                best_intent = max(intent_scores, key=intent_scores.get)
                return best_intent, intent_scores[best_intent]
            return "other", 0.0
        except Exception as e:
            self.logger.error(f"Similarity-based intent detection failed: {e}")
            return "other", 0.0

    def _fuse_intent_results(self, rule_result: Tuple[str, float],
                             similarity_result: Tuple[str, float]) -> Tuple[str, float, List[Dict[str, Any]]]:
        """Weighted fusion of the two detectors (weights are assumptions)."""
        rule_intent, rule_score = rule_result
        sim_intent, sim_score = similarity_result
        rule_weight, sim_weight = 0.4, 0.6

        scores: Dict[str, float] = {}
        scores[rule_intent] = rule_score * rule_weight
        scores[sim_intent] = scores.get(sim_intent, 0.0) + sim_score * sim_weight

        best_intent = max(scores, key=scores.get)
        best_score = scores[best_intent]
        alternatives = [
            {"intent": intent, "confidence": score}
            for intent, score in scores.items()
            if intent != best_intent and score > 0.1
        ]
        alternatives.sort(key=lambda x: x["confidence"], reverse=True)
        return best_intent, best_score, alternatives

    def _apply_context_rules(self, intent: str, context: Dict[str, Any]) -> str:
        """Context overrides; e.g. an in-progress refund keeps the refund intent (illustrative rule)."""
        if intent == "other" and context.get("active_intent") == "refund_request":
            return "refund_request"
        return intent

    async def analyze_sentiment(self, text: str) -> SentimentResult:
        try:
            if not TRANSFORMERS_AVAILABLE:
                return self._simple_sentiment_analysis(text)
            results = self.sentiment_analyzer(text)
            if results:
                result = results[0]
                # Label names depend on the pipeline's default model
                label = result["label"].upper()
                score = float(result["score"])
                if label in ("POSITIVE", "POS", "5 STARS"):
                    sentiment = "positive"
                elif label in ("NEGATIVE", "NEG", "1 STAR"):
                    sentiment = "negative"
                else:
                    sentiment = "neutral"
                emotions = self._analyze_emotions(text)
                return SentimentResult(sentiment=sentiment, score=score,
                                       confidence=score, emotions=emotions)
            return SentimentResult("neutral", 0.5, 0.0, {})
        except Exception as e:
            self.logger.error(f"Sentiment analysis failed: {e}")
            return SentimentResult("neutral", 0.5, 0.0, {})

    def _simple_sentiment_analysis(self, text: str) -> SentimentResult:
        """Keyword-counting fallback when transformers is unavailable."""
        positive_words = ["好", "满意", "谢谢", "不错", "棒", "喜欢"]
        negative_words = ["差", "烂", "生气", "投诉", "退款", "糟糕"]
        positive_count = sum(1 for word in positive_words if word in text)
        negative_count = sum(1 for word in negative_words if word in text)
        total = positive_count + negative_count
        if total == 0:
            return SentimentResult("neutral", 0.5, 0.0, {})
        sentiment_score = positive_count / total
        if sentiment_score > 0.6:
            sentiment = "positive"
        elif sentiment_score < 0.4:
            sentiment = "negative"
        else:
            sentiment = "neutral"
        return SentimentResult(
            sentiment=sentiment,
            score=sentiment_score,
            confidence=max(sentiment_score, 1 - sentiment_score),
            emotions={},
        )

    def _analyze_emotions(self, text: str) -> Dict[str, float]:
        """Rough emotion scores from keyword hit ratios (illustrative lexicon)."""
        emotion_keywords = {
            "anger": ["生气", "愤怒", "气死", "火大"],
            "joy": ["开心", "高兴", "满意", "太好了"],
            "sadness": ["难过", "伤心", "失望", "郁闷"],
            "fear": ["担心", "害怕", "紧张"],
            "surprise": ["惊讶", "没想到", "居然"],
            "disgust": ["恶心", "讨厌", "烦"],
        }
        emotions = {}
        for emotion, keywords in emotion_keywords.items():
            count = sum(1 for keyword in keywords if keyword in text)
            if count > 0:
                emotions[emotion] = count / len(keywords)
        return emotions

    async def extract_entities(self, text: str) -> Dict[str, Any]:
        """Lightweight regex/keyword entity extraction (numbers, products, dates)."""
        import re
        entities: Dict[str, Any] = {"numbers": [], "products": [], "dates": []}

        entities["numbers"] = re.findall(r"\d+", text)

        # Product mentions: keep a short text window around known keywords
        product_keywords = ["手机", "电脑", "耳机", "充电器", "平板"]
        for keyword in product_keywords:
            if keyword in text:
                idx = text.find(keyword)
                start = max(0, idx - 5)
                end = min(len(text), idx + len(keyword) + 5)
                entities["products"].append(text[start:end])

        date_patterns = [r"\d{4}年\d{1,2}月\d{1,2}日", r"\d{4}-\d{1,2}-\d{1,2}", "今天", "明天"]
        for pattern in date_patterns:
            entities["dates"].extend(re.findall(pattern, text))
        return entities

    async def extract_keywords(self, text: str, language: str = "zh") -> List[str]:
        if language == "zh" and JIEBA_AVAILABLE:
            # TF-IDF keyword extraction restricted to content words
            return jieba.analyse.extract_tags(
                text, topK=10, withWeight=False,
                allowPOS=("n", "nr", "ns", "nt", "nz", "v", "vn"))
        # Simple fallback for non-Chinese text
        words = text.lower().split()
        stop_words = {"the", "a", "an", "is", "are", "to", "of", "and", "or", "in", "on"}
        keywords = [word for word in words if word not in stop_words]
        return keywords[:10]

    def _detect_language(self, text: str) -> str:
        import re
        if re.search(r"[\u4e00-\u9fff]", text):
            return "zh"
        if re.search(r"[a-zA-Z]", text):
            return "en"
        return "unknown"

    def get_text_embedding(self, text: str) -> np.ndarray:
        if not TRANSFORMERS_AVAILABLE:
            raise RuntimeError("sentence-transformers is required for embeddings")
        if not self.initialized:
            raise RuntimeError("call initialize() before requesting embeddings")
        return self.sentence_model.encode(text)

    async def batch_analyze(self, texts: List[str]) -> List[NLPAnalysis]:
        tasks = [self.analyze_text(text) for text in texts]
        return await asyncio.gather(*tasks)


# Module-level singleton
nlp_processor = NLPProcessor()
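A quick smoke test of the processor, runnable from the project root once the optional model dependencies are installed (the sample utterance is illustrative):

import asyncio
from src.ai_components.nlp.processor import nlp_processor

async def demo():
    analysis = await nlp_processor.analyze_text("我的订单到哪了?太慢了!")
    print(analysis.intent_result.intent, analysis.sentiment_result.sentiment, analysis.keywords)

asyncio.run(demo())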
4.2 Knowledge Retrieval System
Create src/ai_components/retrieval/engine.py:
"""
Knowledge retrieval engine.

Retrieves relevant knowledge-base entries by vector similarity.
"""
import asyncio
import logging
import json
from typing import List, Dict, Optional, Any, Tuple
from datetime import datetime
import numpy as np
from dataclasses import dataclass
# Import the optional vector-search libraries
try:
    import faiss
    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False
    print("⚠️ faiss is not installed; vector search will be limited")

try:
    import chromadb
    CHROMA_AVAILABLE = True
except ImportError:
    CHROMA_AVAILABLE = False
    print("⚠️ chromadb is not installed; vector database features will be limited")
from src.core.config import settings
from src.ai_components.nlp.processor import nlp_processor
@dataclass
class SearchResult:
    """A single search hit."""
    id: str
    content: str
    score: float
    metadata: Dict[str, Any]
    source: str  # faiss / chroma / keyword / intent


@dataclass
class RetrievalResponse:
    """Full response for one retrieval query."""
    query: str
    results: List[SearchResult]
    suggested_questions: List[str]
    processing_time_ms: float


class KnowledgeRetrievalEngine:
    """Hybrid retrieval over the knowledge base: vector, keyword and intent-based."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.initialized = False
        self.faiss_index = None
        self.id_to_content: Dict[str, Dict[str, Any]] = {}
        self.embedding_dim = 384  # matches MiniLM-style sentence transformers
        self.chroma_client = None
        self.chroma_collection = None
        self.cache: Dict[str, RetrievalResponse] = {}
        self.cache_size = 1000

    async def initialize(self):
        if self.initialized:
            return
        self.logger.info("Initializing knowledge retrieval engine...")
        try:
            await nlp_processor.initialize()
            if CHROMA_AVAILABLE:
                self._initialize_chromadb()
            if FAISS_AVAILABLE:
                self._initialize_faiss()
            await self._load_knowledge_base()
            self.initialized = True
            self.logger.info("Knowledge retrieval engine initialized")
        except Exception as e:
            self.logger.error(f"Retrieval engine initialization failed: {e}")
            raise

    def _initialize_chromadb(self):
        self.logger.info("Initializing ChromaDB...")
        # In-memory client for simplicity; configure persistence for production
        self.chroma_client = chromadb.Client()
        collection_name = "knowledge_base"
        try:
            self.chroma_collection = self.chroma_client.get_collection(collection_name)
        except Exception:
            self.chroma_collection = self.chroma_client.create_collection(name=collection_name)

    def _initialize_faiss(self):
        self.logger.info("Initializing FAISS index...")
        # Inner-product index; embeddings should be normalized for cosine similarity
        self.faiss_index = faiss.IndexFlatIP(self.embedding_dim)
        if settings.USE_GPU and hasattr(faiss, "StandardGpuResources"):
            res = faiss.StandardGpuResources()
            self.faiss_index = faiss.index_cpu_to_gpu(res, 0, self.faiss_index)

    async def _load_knowledge_base(self):
        """Load documents into the indexes; illustrative samples stand in for a DB load."""
        self.logger.info("Loading knowledge base...")
        sample_knowledge = [
            {"id": "kb-1", "content": "退货政策:自签收之日起 7 天内支持无理由退货。",
             "metadata": {"category": "after_sales"}},
            {"id": "kb-2", "content": "配送时效:一般订单 48 小时内发货,3-5 天送达。",
             "metadata": {"category": "shipping"}},
            {"id": "kb-3", "content": "支付方式:支持微信、支付宝及主流银行卡支付。",
             "metadata": {"category": "payment"}},
            {"id": "kb-4", "content": "发票说明:支持开具电子发票,下单时可选择。",
             "metadata": {"category": "billing"}},
        ]
        await self.add_documents(sample_knowledge)
        self.logger.info(f"Loaded {len(sample_knowledge)} documents")

    async def add_documents(self, documents: List[Dict[str, Any]]):
        """Index documents for dense (FAISS/Chroma) and keyword retrieval."""
        if not documents:
            return
        contents = [doc["content"] for doc in documents]

        embeddings = None
        if TRANSFORMERS_AVAILABLE:
            embeddings = await self._generate_embeddings(contents)

        if embeddings is not None and FAISS_AVAILABLE and self.faiss_index is not None:
            embeddings_np = np.array(embeddings).astype("float32")
            self.faiss_index.add(embeddings_np)

        # Always keep a plain-text copy for keyword and intent search.
        # "index" is the FAISS position; valid as long as documents are only appended.
        for doc in documents:
            self.id_to_content[doc["id"]] = {
                "content": doc["content"],
                "metadata": doc.get("metadata", {}),
                "index": len(self.id_to_content),
            }

        if embeddings is not None and CHROMA_AVAILABLE and self.chroma_collection is not None:
            try:
                self.chroma_collection.add(
                    embeddings=embeddings,
                    documents=contents,
                    metadatas=[doc.get("metadata", {}) for doc in documents],
                    ids=[doc["id"] for doc in documents],
                )
            except Exception as e:
                self.logger.error(f"Chroma indexing failed: {e}")

        self.logger.info(f"Indexed {len(documents)} documents")

    async def search(self, query: str, top_k: int = 5, threshold: float = 0.3) -> RetrievalResponse:
        """Hybrid search combining vector, keyword and intent-based results."""
        start_time = datetime.now()
        if not self.initialized:
            await self.initialize()

        cache_key = f"{query}:{top_k}:{threshold}"
        if cache_key in self.cache:
            return self.cache[cache_key]

        query_analysis = await nlp_processor.analyze_text(query)

        results: List[SearchResult] = []
        if TRANSFORMERS_AVAILABLE:
            query_embedding = nlp_processor.get_text_embedding(query)
            results.extend(self._vector_search(query_embedding, top_k, threshold))
        results.extend(self._keyword_search(query_analysis.keywords, top_k, threshold))
        results.extend(self._intent_based_search(query_analysis.intent_result.intent, top_k))

        unique_results = self._deduplicate_and_sort(results)
        final_results = unique_results[:top_k]
        suggested_questions = self._generate_suggested_questions(query_analysis)

        processing_time = (datetime.now() - start_time).total_seconds() * 1000
        response = RetrievalResponse(query=query, results=final_results,
                                     suggested_questions=suggested_questions,
                                     processing_time_ms=processing_time)
        self._add_to_cache(cache_key, response)
        return response

    def _vector_search(self, query_embedding, top_k: int, threshold: float) -> List[SearchResult]:
        results = []
        if FAISS_AVAILABLE and self.faiss_index is not None and self.faiss_index.ntotal > 0:
            try:
                query_vector = np.array([query_embedding]).astype("float32")
                k = min(top_k * 2, self.faiss_index.ntotal)
                distances, indices = self.faiss_index.search(query_vector, k)
                # Invert the id -> position map once per query
                position_to_id = {info["index"]: doc_id
                                  for doc_id, info in self.id_to_content.items()}
                for distance, idx in zip(distances[0], indices[0]):
                    # Map the inner-product score roughly into [0, 1]
                    score = float((distance + 1) / 2)
                    doc_id = position_to_id.get(int(idx))
                    if doc_id is not None and score >= threshold:
                        info = self.id_to_content[doc_id]
                        results.append(SearchResult(id=doc_id, content=info["content"],
                                                    score=score, metadata=info["metadata"],
                                                    source="faiss"))
            except Exception as e:
                self.logger.error(f"FAISS search failed: {e}")
        elif CHROMA_AVAILABLE and self.chroma_collection is not None:
            try:
                chroma_results = self.chroma_collection.query(
                    query_embeddings=[query_embedding.tolist()],
                    n_results=top_k,
                    include=["documents", "metadatas", "distances"],
                )
                ids = chroma_results.get("ids", [[]])[0]
                docs = chroma_results.get("documents", [[]])[0]
                metas = chroma_results.get("metadatas", [[]])[0]
                dists = chroma_results.get("distances", [[]])[0]
                for doc_id, doc, metadata, distance in zip(ids, docs, metas, dists):
                    score = max(0.0, 1.0 - float(distance))
                    if score >= threshold:
                        results.append(SearchResult(id=doc_id, content=doc, score=score,
                                                    metadata=metadata or {}, source="chroma"))
            except Exception as e:
                self.logger.error(f"Chroma search failed: {e}")
        return results

    def _keyword_search(self, keywords: List[str], top_k: int, threshold: float) -> List[SearchResult]:
        """Score documents by the fraction of query keywords they contain."""
        results = []
        if not keywords:
            return results
        for doc_id, doc_info in self.id_to_content.items():
            content = doc_info["content"].lower()
            match_count = sum(1 for keyword in keywords if keyword.lower() in content)
            if match_count > 0:
                score = match_count / len(keywords)
                if score >= threshold:
                    results.append(SearchResult(id=doc_id, content=doc_info["content"],
                                                score=score, metadata=doc_info["metadata"],
                                                source="keyword"))
        results.sort(key=lambda x: x.score, reverse=True)
        return results[:top_k]

    def _intent_based_search(self, intent: str, top_k: int) -> List[SearchResult]:
        """Surface documents whose category matches the detected intent (illustrative mapping)."""
        intent_categories = {
            "order_inquiry": ["shipping"],
            "refund_request": ["after_sales"],
            "product_question": ["product"],
            "complaint": ["after_sales"],
        }
        categories = intent_categories.get(intent, [])
        results = []
        if not categories:
            return results
        for doc_id, doc_info in self.id_to_content.items():
            doc_categories = doc_info["metadata"].get("category", [])
            if isinstance(doc_categories, str):
                doc_categories = [doc_categories]
            if any(cat in doc_categories for cat in categories):
                results.append(SearchResult(id=doc_id, content=doc_info["content"],
                                            score=0.5, metadata=doc_info["metadata"],
                                            source="intent"))
        return results[:top_k]

    def _deduplicate_and_sort(self, results: List[SearchResult]) -> List[SearchResult]:
        """Drop near-duplicates (same content prefix), keeping the best-scoring hits."""
        seen_contents = set()
        unique_results = []
        for result in sorted(results, key=lambda x: x.score, reverse=True):
            content_prefix = result.content[:50]
            if content_prefix not in seen_contents:
                seen_contents.add(content_prefix)
                unique_results.append(result)
        return unique_results

    def _generate_suggested_questions(self, query_analysis) -> List[str]:
        """Follow-up question suggestions keyed off the detected intent (illustrative)."""
        intent_suggestions = {
            "order_inquiry": ["如何修改收货地址?", "可以加急配送吗?"],
            "refund_request": ["退款多久到账?", "运费谁承担?"],
            "product_question": ["有保修吗?", "支持哪些配件?"],
            "complaint": ["如何联系人工客服?", "投诉处理需要多久?"],
        }
        suggestions = intent_suggestions.get(query_analysis.intent_result.intent, [])
        # Preserve order while deduplicating
        return list(dict.fromkeys(suggestions))[:5]

    async def _generate_embeddings(self, texts: List[str]) -> List[List[float]]:
        return [nlp_processor.get_text_embedding(text).tolist() for text in texts]

    def _add_to_cache(self, key: str, value: RetrievalResponse):
        # Naive FIFO eviction
        if len(self.cache) >= self.cache_size:
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
        self.cache[key] = value

    async def update_document(self, doc_id: str, content: str, metadata: Dict[str, Any]):
        """Re-index a document; the cache is cleared as a conservative invalidation."""
        self.cache.clear()
        await self.add_documents([{"id": doc_id, "content": content, "metadata": metadata}])

    async def delete_document(self, doc_id: str):
        self.cache.clear()
        self.id_to_content.pop(doc_id, None)
        if CHROMA_AVAILABLE and self.chroma_collection is not None:
            try:
                self.chroma_collection.delete(ids=[doc_id])
            except Exception as e:
                self.logger.error(f"Chroma delete failed: {e}")


# Module-level singleton
retrieval_engine = KnowledgeRetrievalEngine()
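A matching smoke test for the retrieval engine (the query text is illustrative; results come from the sample knowledge loaded above):

import asyncio
from src.ai_components.retrieval.engine import retrieval_engine

async def demo():
    response = await retrieval_engine.search("怎么申请退货?", top_k=3)
    for r in response.results:
        print(f"[{r.source}] {r.score:.2f} {r.content}")
    print("Suggested:", response.suggested_questions)

asyncio.run(demo())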
5. Dialogue Management System
5.1 Conversation State Management
Create src/services/conversation/manager.py:
"""
Conversation manager.

Maintains conversation state and context and controls the dialogue flow.
"""
import asyncio
import json
import logging
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime, timedelta
from enum import Enum
import uuid
from src.core.config import settings
from src.models.base import Conversation, Message
from src.ai_components.nlp.processor import nlp_processor, NLPAnalysis
from src.ai_components.retrieval.engine import retrieval_engine, RetrievalResponse
class ConversationState(Enum):
    """Conversation lifecycle states."""
    ACTIVE = "active"              # live conversation
    PENDING = "pending"            # waiting for a human agent
    TRANSFERRING = "transferring"  # hand-off in progress
    CLOSED = "closed"              # finished
    ESCALATED = "escalated"        # escalated to a supervisor


class HandlerType(Enum):
    """Who produces the next response."""
    AI = "ai"          # fully automated
    AGENT = "agent"    # human agent
    HYBRID = "hybrid"  # AI drafts, human reviews
class ConversationContext:
    """In-memory context for a single conversation."""

    def __init__(self, conversation_id: str, customer_id: str):
        self.conversation_id = conversation_id
        self.customer_id = customer_id
        self.messages: List[Dict[str, Any]] = []
        self.state_history: List[Dict[str, Any]] = []
        self.intent_history: List[Dict[str, Any]] = []
        self.sentiment_history: List[Dict[str, Any]] = []
        self.metadata: Dict[str, Any] = {}
        self.created_at = datetime.now()
        self.updated_at = datetime.now()
        self.message_count = 0
        self.ai_response_count = 0
        self.agent_response_count = 0
        self.transfer_count = 0
        self.escalation_count = 0
        self.last_message_time: Optional[datetime] = None
        self.last_ai_response_time: Optional[datetime] = None
        self.last_agent_response_time: Optional[datetime] = None
        self.cached_responses: Dict[str, Any] = {}

    def add_message(self, message: Dict[str, Any]):
        self.messages.append(message)
        self.message_count += 1
        self.last_message_time = datetime.now()
        self.updated_at = datetime.now()
        # Bound the in-memory history
        if len(self.messages) > settings.MAX_CONVERSATION_HISTORY:
            self.messages = self.messages[-settings.MAX_CONVERSATION_HISTORY:]

    def add_state_change(self, old_state: str, new_state: str, reason: str = ""):
        self.state_history.append({
            "timestamp": datetime.now().isoformat(),
            "from": old_state,
            "to": new_state,
            "reason": reason,
        })

    def add_intent(self, intent: str, confidence: float, text: str):
        self.intent_history.append({
            "timestamp": datetime.now().isoformat(),
            "intent": intent,
            "confidence": confidence,
            "text": text,
        })

    def add_sentiment(self, sentiment: str, score: float):
        self.sentiment_history.append({
            "timestamp": datetime.now().isoformat(),
            "sentiment": sentiment,
            "score": score,
        })

    def get_recent_messages(self, count: int = 10) -> List[Dict[str, Any]]:
        return self.messages[-count:] if self.messages else []

    def get_conversation_summary(self) -> Dict[str, Any]:
        return {
            "conversation_id": self.conversation_id,
            "customer_id": self.customer_id,
            "message_count": self.message_count,
            "ai_response_count": self.ai_response_count,
            "agent_response_count": self.agent_response_count,
            "transfer_count": self.transfer_count,
            "escalation_count": self.escalation_count,
            "status": self.metadata.get("status", ConversationState.ACTIVE.value),
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "duration_seconds": (datetime.now() - self.created_at).total_seconds(),
        }

    def is_timed_out(self) -> bool:
        if not self.last_message_time:
            return False
        timeout_delta = timedelta(minutes=settings.SESSION_TIMEOUT_MINUTES)
        return datetime.now() - self.last_message_time > timeout_delta

    def to_dict(self) -> Dict[str, Any]:
        """Full export, e.g. for persistence or hand-off to the agent UI."""
        return {
            "conversation_id": self.conversation_id,
            "customer_id": self.customer_id,
            "messages": self.messages,
            "state_history": self.state_history,
            "intent_history": self.intent_history,
            "sentiment_history": self.sentiment_history,
            "metadata": self.metadata,
            "stats": self.get_conversation_summary(),
        }


class ConversationManager:
    """Creates conversations, routes messages and manages their lifecycle."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.conversations: Dict[str, ConversationContext] = {}
        self.customer_conversations: Dict[str, List[str]] = {}
        self.max_conversations = 10000
        self.cleanup_interval = 300  # seconds
        self._cleanup_started = False

    def _ensure_cleanup_task(self):
        # Lazily start the cleanup loop: asyncio.create_task would fail at
        # import time because the module-level singleton has no running loop.
        if not self._cleanup_started:
            asyncio.create_task(self._cleanup_task())
            self._cleanup_started = True

    async def create_conversation(self, customer_id: str, channel: str = "web",
                                  metadata: Optional[Dict[str, Any]] = None) -> ConversationContext:
        self._ensure_cleanup_task()
        conversation_id = str(uuid.uuid4())
        context = ConversationContext(conversation_id, customer_id)
        context.metadata.update({
            "channel": channel,
            "status": ConversationState.ACTIVE.value,
            "extra": metadata or {},
        })
        self.conversations[conversation_id] = context

        self.customer_conversations.setdefault(customer_id, []).append(conversation_id)
        # Keep only the most recent conversations per customer
        if len(self.customer_conversations[customer_id]) > 5:
            old_conv_id = self.customer_conversations[customer_id].pop(0)
            self.conversations.pop(old_conv_id, None)

        self.logger.info(f"Created conversation {conversation_id} for customer {customer_id}")
        return context

    async def process_message(self, conversation_id: str, message: Dict[str, Any]) -> Dict[str, Any]:
        start_time = datetime.now()
        context = self.conversations.get(conversation_id)
        if not context:
            raise ValueError(f"Conversation {conversation_id} not found")

        context.add_message(message)
        text = message.get("content", "")

        nlp_analysis = await nlp_processor.analyze_text(text)
        context.add_intent(nlp_analysis.intent_result.intent,
                           nlp_analysis.intent_result.confidence, text)
        context.add_sentiment(nlp_analysis.sentiment_result.sentiment,
                              nlp_analysis.sentiment_result.score)

        handler_type = self._determine_handler_type(context, nlp_analysis, message)
        if handler_type == HandlerType.AI:
            response = await self._handle_with_ai(context, nlp_analysis, message)
            context.ai_response_count += 1
            context.last_ai_response_time = datetime.now()
        elif handler_type == HandlerType.AGENT:
            response = await self._handle_with_agent(context, nlp_analysis, message)
            context.agent_response_count += 1
            context.last_agent_response_time = datetime.now()
        else:
            response = await self._handle_hybrid(context, nlp_analysis, message)

        self._update_conversation_state(context, response)
        processing_time = (datetime.now() - start_time).total_seconds() * 1000

        return {
            "conversation_id": conversation_id,
            "message_id": message.get("id"),
            "handler_type": handler_type.value,
            "response": response,
            "analysis": {
                "intent": nlp_analysis.intent_result.intent,
                "intent_confidence": nlp_analysis.intent_result.confidence,
                "sentiment": nlp_analysis.sentiment_result.sentiment,
                "sentiment_score": nlp_analysis.sentiment_result.score,
                "entities": nlp_analysis.entities,
                "keywords": nlp_analysis.keywords,
            },
            "processing_time_ms": processing_time,
            "context_summary": context.get_conversation_summary(),
        }

    def _determine_handler_type(self, context: ConversationContext,
                                nlp_analysis, message: Dict[str, Any]) -> HandlerType:
        """Routing policy: explicit requests and strong negative sentiment go to a
        human; otherwise route by per-intent confidence thresholds."""
        current_state = context.metadata.get("status")
        if current_state in (ConversationState.PENDING.value,
                             ConversationState.TRANSFERRING.value,
                             ConversationState.ESCALATED.value):
            return HandlerType.AGENT

        # Explicit requests for a human agent
        text = message.get("content", "").lower()
        if any(keyword in text for keyword in ["人工", "转人工", "真人", "投诉"]):
            return HandlerType.AGENT

        # Strongly negative sentiment escalates toward a human
        if nlp_analysis.sentiment_result.sentiment == "negative":
            if nlp_analysis.sentiment_result.score > 0.8:
                return HandlerType.AGENT
            if nlp_analysis.sentiment_result.score > 0.6:
                return HandlerType.HYBRID

        # Per-intent routing configuration (thresholds are illustrative)
        intent_configs = {
            "complaint":        {"threshold": 0.5, "handler": HandlerType.AGENT},
            "refund_request":   {"threshold": 0.7, "handler": HandlerType.HYBRID},
            "order_inquiry":    {"threshold": 0.7, "handler": HandlerType.AI},
            "product_question": {"threshold": 0.7, "handler": HandlerType.AI},
            "greeting":         {"threshold": 0.5, "handler": HandlerType.AI},
            "other":            {"threshold": 0.8, "handler": HandlerType.HYBRID},
        }
        config = intent_configs.get(nlp_analysis.intent_result.intent, intent_configs["other"])

        if nlp_analysis.intent_result.confidence < config["threshold"]:
            return HandlerType.HYBRID
        if context.transfer_count > 1:
            return HandlerType.AGENT
        return config["handler"]

    async def _handle_with_ai(self, context, nlp_analysis, message) -> Dict[str, Any]:
        try:
            text = message.get("content", "")
            retrieval_result = await retrieval_engine.search(text)
            prompt = self._build_ai_prompt(context, text, retrieval_result)
            ai_response = self._generate_ai_response(prompt, retrieval_result)
            return {
                "type": "ai_response",
                "content": ai_response,
                "confidence": nlp_analysis.intent_result.confidence,
                "knowledge_used": [r.content[:100] for r in retrieval_result.results[:3]],
                "suggested_questions": retrieval_result.suggested_questions,
            }
        except Exception as e:
            self.logger.error(f"AI handling failed: {e}")
            return {"type": "fallback",
                    "content": "抱歉,我暂时无法回答,正在为您转接人工客服。"}

    async def _handle_with_agent(self, context, nlp_analysis, message) -> Dict[str, Any]:
        try:
            available_agents = self._find_available_agents(nlp_analysis)
            if available_agents:
                assigned_agent = self._assign_agent(context, available_agents[0])
                return {
                    "type": "transfer",
                    "content": "正在为您转接人工客服,请稍候。",
                    "agent_id": assigned_agent["id"],
                    "agent_name": assigned_agent["name"],
                }
            return {
                "type": "queue",
                "content": "当前人工坐席繁忙,您已进入排队,请耐心等待。",
                "queue_position": self._get_queue_position(context),
            }
        except Exception as e:
            self.logger.error(f"Agent handling failed: {e}")
            return {"type": "fallback", "content": "转接失败,请稍后重试。"}

    async def _handle_hybrid(self, context, nlp_analysis, message) -> Dict[str, Any]:
        """AI drafts an answer while a human agent is lined up to review or take over."""
        try:
            text = message.get("content", "")
            retrieval_result = await retrieval_engine.search(text)
            prompt = self._build_ai_prompt(context, text, retrieval_result)
            ai_response = self._generate_ai_response(prompt, retrieval_result)
            available_agents = self._find_available_agents(nlp_analysis)
            response = {
                "type": "hybrid",
                "content": ai_response,
                "agent_available": len(available_agents) > 0,
                "confidence": nlp_analysis.intent_result.confidence,
                "knowledge_used": [r.content[:100] for r in retrieval_result.results[:3]],
            }
            if available_agents:
                response["suggested_agent"] = available_agents[0]["id"]
            return response
        except Exception as e:
            self.logger.error(f"Hybrid handling failed: {e}")
            return await self._handle_with_ai(context, nlp_analysis, message)

    def _build_ai_prompt(self, context, text: str, retrieval_result) -> str:
        """Assemble recent history plus retrieved knowledge into a generation prompt."""
        prompt = "你是一名客服助手,请根据上下文和知识回答用户问题。\n\n对话历史:\n"
        for msg in context.get_recent_messages(5):
            sender = "用户" if msg.get("sender_type") == "customer" else "客服"
            prompt += f"{sender}: {msg.get('content', '')}\n"
        if retrieval_result.results:
            prompt += "\n相关知识:\n"
            for i, result in enumerate(retrieval_result.results[:3], 1):
                prompt += f"{i}. {result.content}\n"
        prompt += f"\n用户: {text}\n客服:"
        return prompt

    def _generate_ai_response(self, prompt: str, retrieval_result) -> str:
        """Placeholder generator: returns the best knowledge hit. In production
        this would call an LLM with the prompt built above."""
        if retrieval_result.results:
            return retrieval_result.results[0].content
        return "您好,请问还有什么可以帮您?"

    def _find_available_agents(self, nlp_analysis) -> List[Dict[str, Any]]:
        """Mock agent directory; in production this queries the agents service."""
        skills_needed = []
        if nlp_analysis.intent_result.intent == "refund_request":
            skills_needed = ["after_sales"]
        elif nlp_analysis.intent_result.intent == "complaint":
            skills_needed = ["complaint_handling"]

        mock_agents = [
            {"id": "agent-001", "name": "张客服", "skills": ["after_sales", "general"],
             "status": "online", "current_chats": 1, "max_chats": 5},
            {"id": "agent-002", "name": "李客服", "skills": ["complaint_handling", "general"],
             "status": "online", "current_chats": 3, "max_chats": 5},
        ]
        available = []
        for agent in mock_agents:
            if agent["status"] == "online" and agent["current_chats"] < agent["max_chats"]:
                if not skills_needed or any(s in agent["skills"] for s in skills_needed):
                    available.append(agent)
        return available

    def _assign_agent(self, context, agent: Dict[str, Any]) -> Dict[str, Any]:
        context.metadata["assigned_agent_id"] = agent["id"]
        context.metadata["assigned_agent_name"] = agent["name"]
        context.transfer_count += 1
        return agent

    def _get_queue_position(self, context) -> int:
        """Simplified; production code would ask the queueing service."""
        return 1

    def _update_conversation_state(self, context, response: Dict[str, Any]):
        old_state = context.metadata.get("status", ConversationState.ACTIVE.value)
        new_state = old_state
        response_type = response.get("type", "")
        if response_type == "transfer":
            new_state = ConversationState.TRANSFERRING.value
        elif response_type == "queue":
            new_state = ConversationState.PENDING.value
        elif "escalat" in response_type.lower():
            new_state = ConversationState.ESCALATED.value
        if new_state != old_state:
            context.metadata["status"] = new_state
            context.add_state_change(old_state, new_state, response_type)
            self.logger.info(f"Conversation {context.conversation_id}: {old_state} -> {new_state}")

    def get_conversation(self, conversation_id: str) -> Optional[ConversationContext]:
        return self.conversations.get(conversation_id)

    def close_conversation(self, conversation_id: str, reason: str = "resolved"):
        context = self.conversations.get(conversation_id)
        if context:
            old_state = context.metadata.get("status", ConversationState.ACTIVE.value)
            context.metadata["status"] = ConversationState.CLOSED.value
            context.add_state_change(old_state, ConversationState.CLOSED.value, reason)
            context.metadata["close_reason"] = reason
            context.metadata["closed_at"] = datetime.now().isoformat()
            self.logger.info(f"Closed conversation {conversation_id}: {reason}")

    async def _cleanup_task(self):
        """Periodically close timed-out conversations and cap memory usage."""
        while True:
            try:
                await asyncio.sleep(self.cleanup_interval)
                expired = [cid for cid, ctx in self.conversations.items() if ctx.is_timed_out()]
                for conv_id in expired:
                    self.close_conversation(conv_id, "timeout")
                    del self.conversations[conv_id]
                if expired:
                    self.logger.info(f"Cleaned up {len(expired)} expired conversations")

                if len(self.conversations) > self.max_conversations:
                    excess = len(self.conversations) - self.max_conversations
                    for conv_id in list(self.conversations.keys())[:excess]:
                        self.close_conversation(conv_id, "evicted")
                        del self.conversations[conv_id]
            except Exception as e:
                self.logger.error(f"Cleanup task error: {e}")


# Module-level singleton; the cleanup loop starts lazily with the first conversation
conversation_manager = ConversationManager()
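Tying the pieces together, a minimal end-to-end exercise of the manager (ids and message content are illustrative):

import asyncio
from src.services.conversation.manager import conversation_manager

async def demo():
    ctx = await conversation_manager.create_conversation(customer_id="cust-1", channel="web")
    result = await conversation_manager.process_message(ctx.conversation_id, {
        "id": "msg-1", "content": "我要退款", "sender_type": "customer",
    })
    print(result["handler_type"], result["response"]["type"])

asyncio.run(demo())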
6. API Services
6.1 Main API Service
Create src/api/main.py:
"""
Main API service.

Exposes the RESTful API endpoints.
"""
import logging
from typing import List, Optional
from fastapi import FastAPI, HTTPException, Depends, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from src.core.config import settings
from src.api.endpoints import conversations, customers, agents, knowledge, analytics
# Logging configuration
logging.basicConfig(
    level=getattr(logging, settings.LOG_LEVEL),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
# Create the FastAPI application
app = FastAPI(
    title=settings.PROJECT_NAME,
    version=settings.APP_VERSION,
    openapi_url=f"{settings.API_V1_STR}/openapi.json",
    docs_url="/docs" if settings.DEBUG else None,
    redoc_url="/redoc" if settings.DEBUG else None,
)
# Bearer-token security scheme
security = HTTPBearer()
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the access token.

    Production code should verify a signed JWT here; the static token check
    below is a development placeholder."""
    token = credentials.credentials
    if not token or token != "dev-token":
        raise HTTPException(
            status_code=401,
            detail="Invalid or missing token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return {"sub": "dev-user"}


# CORS and trusted-host middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.BACKEND_CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["*"] if settings.DEBUG else ["localhost"],  # adjust for your domains
)


@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc: HTTPException):
    return JSONResponse(
        status_code=exc.status_code,
        content={"error": {"code": exc.status_code, "message": exc.detail}},
    )


@app.exception_handler(Exception)
async def unhandled_exception_handler(request, exc: Exception):
    logger.error("Unhandled exception", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={"error": {"code": 500, "message": "Internal server error",
                           "detail": str(exc) if settings.DEBUG else None}},
    )
app.include_router(
    conversations.router,
    prefix=f"{settings.API_V1_STR}/conversations",
    tags=["conversations"],
    dependencies=[Depends(verify_token)],
)
app.include_router(
    customers.router,
    prefix=f"{settings.API_V1_STR}/customers",
    tags=["customers"],
    dependencies=[Depends(verify_token)],
)
app.include_router(
    agents.router,
    prefix=f"{settings.API_V1_STR}/agents",
    tags=["agents"],
    dependencies=[Depends(verify_token)],
)
app.include_router(
    knowledge.router,
    prefix=f"{settings.API_V1_STR}/knowledge",
    tags=["knowledge"],
    dependencies=[Depends(verify_token)],
)
app.include_router(
    analytics.router,
    prefix=f"{settings.API_V1_STR}/analytics",
    tags=["analytics"],
    dependencies=[Depends(verify_token)],
)
@app.websocket("/ws/{conversation_id}")
async def websocket_endpoint(websocket: WebSocket, conversation_id: str,
                             token: Optional[str] = None):
    """Basic WebSocket echo loop; the full conversation stream lives in the
    conversations router. The token travels as a query parameter because
    WebSocket clients cannot always set headers."""
    await websocket.accept()
    try:
        if not token or token != "dev-token":  # placeholder auth, as in verify_token
            await websocket.send_json({"type": "error", "message": "unauthorized"})
            await websocket.close()
            return
        while True:
            data = await websocket.receive_json()
            await websocket.send_json({
                "type": "echo",
                "conversation_id": conversation_id,
                "content": data.get("content"),
                "metadata": data.get("metadata"),
            })
    except WebSocketDisconnect:
        logger.info(f"WebSocket disconnected: {conversation_id}")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")


@app.get("/")
async def root():
    return {"status": "ok", "name": settings.PROJECT_NAME,
            "version": settings.APP_VERSION, "environment": settings.ENVIRONMENT}


@app.get("/health")
async def health():
    return {"status": "healthy", "version": settings.APP_VERSION,
            "debug": settings.DEBUG, "environment": settings.ENVIRONMENT}


@app.on_event("startup")
async def startup_event():
    logger.info(f"Starting {settings.PROJECT_NAME} v{settings.APP_VERSION}")
    try:
        from src.ai_components.nlp.processor import nlp_processor
        from src.ai_components.retrieval.engine import retrieval_engine
        await nlp_processor.initialize()
        await retrieval_engine.initialize()
        logger.info("AI components initialized")
    except Exception as e:
        logger.error(f"AI component initialization failed: {e}")


@app.on_event("shutdown")
async def shutdown_event():
    logger.info("Shutting down")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("src.api.main:app", host="0.0.0.0", port=8000,
                reload=settings.DEBUG, log_level=settings.LOG_LEVEL.lower())
6.2 Conversation API Endpoints
Create src/api/endpoints/conversations.py:
"""
Conversation API endpoints.

Handles conversation-related HTTP and WebSocket requests.
"""
import logging
from typing import List, Optional, Dict, Any
from datetime import datetime
from fastapi import APIRouter, HTTPException, Depends, WebSocket
from pydantic import BaseModel, Field
import uuid
from src.core.config import settings
from src.services.conversation.manager import conversation_manager, ConversationContext
logger = logging.getLogger(__name__)
router = APIRouter()
# Request/response models
class MessageRequest(BaseModel):
    """An incoming customer message."""
    content: str = Field(..., min_length=1, max_length=2000)
    content_type: str = Field(default="text")
    metadata: Optional[Dict[str, Any]] = Field(default=None)
    customer_id: Optional[str] = Field(default=None)
    channel: str = Field(default="web")


class ConversationCreateRequest(BaseModel):
    """Request to create a conversation."""
    customer_id: str
    channel: str = Field(default="web")
    metadata: Optional[Dict[str, Any]] = Field(default=None)
    initial_message: Optional[str] = Field(default=None)


class ConversationResponse(BaseModel):
    """Conversation details returned to clients."""
    conversation_id: str
    customer_id: str
    channel: str
    status: str
    created_at: str
    updated_at: str
    message_count: int
    metadata: Dict[str, Any]
    summary: Optional[Dict[str, Any]] = None


class MessageResponse(BaseModel):
    """A single message in a conversation."""
    message_id: str
    conversation_id: str
    sender_type: str
    sender_id: Optional[str]
    content: str
    content_type: str
    created_at: str
    metadata: Dict[str, Any]
    intent: Optional[str] = None
    confidence: Optional[float] = None


class ProcessMessageResponse(BaseModel):
    """Result of routing one message through the conversation manager."""
    conversation_id: str
    message_id: Optional[str]
    handler_type: str
    response: Dict[str, Any]
    analysis: Dict[str, Any]
    processing_time_ms: float
    context_summary: Dict[str, Any]
@router.post("", response_model=ConversationResponse)
async def create_conversation(request: ConversationCreateRequest):
    """Create a conversation, optionally processing an initial message."""
    try:
        context = await conversation_manager.create_conversation(
            customer_id=request.customer_id,
            channel=request.channel,
            metadata=request.metadata,
        )
        if request.initial_message:
            message = {
                "id": str(uuid.uuid4()),
                "content": request.initial_message,
                "content_type": "text",
                "sender_type": "customer",
                "sender_id": request.customer_id,
                "metadata": request.metadata or {},
                "created_at": datetime.now().isoformat(),
            }
            await conversation_manager.process_message(context.conversation_id, message)

        return ConversationResponse(
            conversation_id=context.conversation_id,
            customer_id=context.customer_id,
            channel=context.metadata.get("channel", request.channel),
            status=context.metadata.get("status", "active"),
            created_at=context.created_at.isoformat(),
            updated_at=context.updated_at.isoformat(),
            message_count=context.message_count,
            metadata=context.metadata,
            summary=context.get_conversation_summary(),
        )
    except Exception as e:
        logger.error(f"Failed to create conversation: {e}")
        raise HTTPException(status_code=500, detail="Failed to create conversation")


@router.post("/{conversation_id}/messages", response_model=ProcessMessageResponse)
async def send_message(conversation_id: str, request: MessageRequest):
    """Send a customer message and return the routed response."""
    try:
        message = {
            "id": str(uuid.uuid4()),
            "content": request.content,
            "content_type": request.content_type,
            "sender_type": "customer",
            "sender_id": request.customer_id,
            "metadata": request.metadata or {},
            "created_at": datetime.now().isoformat(),
        }
        result = await conversation_manager.process_message(conversation_id, message)
        return ProcessMessageResponse(**result)
    except ValueError as e:
        logger.error(f"Conversation not found: {e}")
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to process message: {e}")
        raise HTTPException(status_code=500, detail="Failed to process message")


@router.get("/{conversation_id}", response_model=ConversationResponse)
async def get_conversation(conversation_id: str):
    """Fetch one conversation's details."""
    context = conversation_manager.get_conversation(conversation_id)
    if not context:
        raise HTTPException(status_code=404, detail="Conversation not found")
    return ConversationResponse(
        conversation_id=context.conversation_id,
        customer_id=context.customer_id,
        channel=context.metadata.get("channel", "web"),
        status=context.metadata.get("status", "active"),
        created_at=context.created_at.isoformat(),
        updated_at=context.updated_at.isoformat(),
        message_count=context.message_count,
        metadata=context.metadata,
        summary=context.get_conversation_summary(),
    )


@router.get("/{conversation_id}/messages", response_model=List[MessageResponse])
async def get_messages(conversation_id: str, limit: int = 50, offset: int = 0):
    """Fetch messages with simple offset pagination."""
    context = conversation_manager.get_conversation(conversation_id)
    if not context:
        raise HTTPException(status_code=404, detail="Conversation not found")
    messages = context.get_recent_messages(limit + offset)
    messages = messages[offset:offset + limit]
    return [
        MessageResponse(
            message_id=msg.get("id", str(uuid.uuid4())),
            conversation_id=conversation_id,
            sender_type=msg.get("sender_type", "customer"),
            sender_id=msg.get("sender_id"),
            content=msg.get("content", ""),
            content_type=msg.get("content_type", "text"),
            created_at=msg.get("created_at", datetime.now().isoformat()),
            metadata=msg.get("metadata", {}),
            intent=msg.get("intent"),
            confidence=msg.get("confidence"),
        )
        for msg in messages
    ]


@router.post("/{conversation_id}/close")
async def close_conversation(conversation_id: str, reason: str = "resolved"):
    """Close a conversation."""
    conversation_manager.close_conversation(conversation_id, reason)
    return {
        "status": "closed",
        "conversation_id": conversation_id,
        "reason": reason,
        "closed_at": datetime.now().isoformat(),
    }


@router.get("", response_model=List[ConversationResponse])
async def list_conversations():
    """List conversations (in-memory store; filtering and pagination omitted for brevity)."""
    return []


@router.post("/{conversation_id}/transfer")
async def transfer_to_agent(conversation_id: str, agent_id: Optional[str] = None):
    """Force a transfer to a human agent by injecting a handoff message."""
    context = conversation_manager.get_conversation(conversation_id)
    if not context:
        raise HTTPException(status_code=404, detail="Conversation not found")
    message = {
        "id": str(uuid.uuid4()),
        "content": "转人工",
        "content_type": "text",
        "sender_type": "customer",
        "metadata": {"forced_transfer": True, "requested_agent_id": agent_id},
        "created_at": datetime.now().isoformat(),
    }
    result = await conversation_manager.process_message(conversation_id, message)
    return {
        "status": "transfer_requested",
        "conversation_id": conversation_id,
        "result": result,
    }


@router.websocket("/{conversation_id}/ws")
async def conversation_websocket(websocket: WebSocket, conversation_id: str,
                                 token: Optional[str] = None):
    """Bidirectional message stream for one conversation."""
    await websocket.accept()
    try:
        context = conversation_manager.get_conversation(conversation_id)
        if not context:
            await websocket.send_json({"type": "error", "message": "conversation not found"})
            await websocket.close()
            return
        if not token or token != "dev-token":  # placeholder auth, as in main.py
            await websocket.send_json({"type": "error", "message": "unauthorized"})
            await websocket.close()
            return

        await websocket.send_json({
            "type": "connected",
            "conversation_id": conversation_id,
            "customer_id": context.customer_id,
            "timestamp": datetime.now().isoformat(),
        })

        while True:
            data = await websocket.receive_json()
            message_type = data.get("type")

            if message_type == "message":
                message = {
                    "id": str(uuid.uuid4()),
                    "content": data.get("content", ""),
                    "content_type": "text",
                    "sender_type": data.get("sender_type", "customer"),
                    "sender_id": data.get("sender_id"),
                    "metadata": data.get("metadata", {}),
                    "created_at": datetime.now().isoformat(),
                }
                if message["sender_type"] == "customer":
                    # Customer messages go through the full routing pipeline
                    result = await conversation_manager.process_message(conversation_id, message)
                    await websocket.send_json({"type": "response", "data": result})
                else:
                    # Agent messages are appended and acknowledged as-is
                    context.add_message(message)
                    await websocket.send_json({"type": "message_ack", "data": message})
            elif message_type == "ping":
                await websocket.send_json({"type": "pong",
                                           "timestamp": datetime.now().isoformat()})
            elif message_type == "close":
                await websocket.close()
                break
            else:
                await websocket.send_json({"type": "error", "message": "unknown message type"})
    except WebSocketDisconnect:
        logger.info(f"WebSocket disconnected: {conversation_id}")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
        try:
            await websocket.send_json({"type": "error", "message": "internal error"})
            await websocket.close()
        except Exception:
            pass
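With the service running, the conversation endpoints can be exercised from the command line. The dev-token value matches the placeholder check in verify_token; substitute your real token scheme:

# Create a conversation with an initial message
curl -X POST http://localhost:8000/api/v1/conversations \
  -H "Authorization: Bearer dev-token" \
  -H "Content-Type: application/json" \
  -d '{"customer_id": "cust-1", "channel": "web", "initial_message": "你好"}'

# Send a follow-up message (replace <conversation_id> with the id returned above)
curl -X POST http://localhost:8000/api/v1/conversations/<conversation_id>/messages \
  -H "Authorization: Bearer dev-token" \
  -H "Content-Type: application/json" \
  -d '{"content": "我的订单到哪了?"}'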
7. Agent Desktop (Simplified Frontend)
7.1 Agent Dashboard HTML Template
Create src/static/agent_dashboard.html:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AI-人工客服坐席桌面</title>
<link href="https://cdn.jsdelivr.net/npm/[email protected]/dist/tailwind.min.css" rel="stylesheet">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css">
    <style>
        .conversation-active { border-left: 4px solid #10B981; }
        .conversation-pending { border-left: 4px solid #F59E0B; }
        .conversation-transferring { border-left: 4px solid #3B82F6; }
        .message-customer { background-color: #E5E7EB; align-self: flex-start; }
        /* Colors below are illustrative */
        .message-agent { background-color: #3B82F6; color: white; align-self: flex-end; }
        .message-ai { background-color: #8B5CF6; color: white; align-self: flex-end; }
        /* Three-dot typing indicator */
        .typing-dot {
            display: inline-block; width: 8px; height: 8px; border-radius: 50%;
            background-color: #9CA3AF; margin: 0 1px;
            animation: typing 1.2s infinite ease-in-out both;
        }
        .typing-dot:nth-child(2) { animation-delay: -0.2s; }
        .typing-dot:nth-child(3) { animation-delay: -0.4s; }
        @keyframes typing {
            0%, 80%, 100% { transform: scale(0.6); }
            40% { transform: scale(1); }
        }
    </style>
</head>
<body class="bg-gray-100">
    <!-- Skeleton layout; widget internals are omitted for brevity. -->
    <div class="flex h-screen">
        <!-- Left sidebar: agent identity and presence (在线 / 离开 / 离线),
             today's stats (进行中 / 今日总计 / 等待中 / 平均响应),
             the active-conversation list and quick replies (问候语 / 请稍等 / 结束语) -->
        <aside id="sidebar" class="w-80 bg-white border-r"></aside>

        <!-- Center: chat panel with the customer profile (客户姓名 / 联系电话 / 电子邮件 / 历史记录),
             the message stream, AI-suggested replies, a 0/1000-character input box,
             and the actions AI 协助 / 转接 / 结束 -->
        <main id="chat-panel" class="flex-1 flex flex-col"></main>

        <!-- Right: AI analysis panel with 情感分析 (负面 / 中性 / 正面), 意图识别,
             相关知识, 对话摘要 and AI 建议 with an 应用建议 button -->
        <aside id="ai-panel" class="w-96 bg-white border-l"></aside>
    </div>
</body>
</html>
8. Deployment and Operations
8.1 Docker Deployment
Create Dockerfile:
# Official Python base image
FROM python:3.10-slim

# Working directory
WORKDIR /app

# Environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV DEBIAN_FRONTEND=noninteractive

# System dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    build-essential \
    curl \
    git \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies
COPY requirements.txt .
RUN pip install --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt

# Project files
COPY . .

# Run as a non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Expose the API port
EXPOSE 8000

# Start command
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]
Create docker-compose.yml:
version: '3.8'

services:
  # PostgreSQL database
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: ${POSTGRES_USER:-postgres}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-password}
      POSTGRES_DB: ${POSTGRES_DB:-customer_service}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./scripts/init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - ai-customer-service

  # Redis
  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD:-password}
    volumes:
      - redis_data:/data
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "-a", "${REDIS_PASSWORD:-password}", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - ai-customer-service

  # Application API, built from the Dockerfile above (sketch; adjust env as needed)
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      POSTGRES_SERVER: postgres
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-password}
      REDIS_HOST: redis
      REDIS_PASSWORD: ${REDIS_PASSWORD:-password}
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    networks:
      - ai-customer-service

volumes:
  postgres_data:
  redis_data:

networks:
  ai-customer-service:
Create the scripts/init.sql database bootstrap script:
-- Create extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- The pgvector extension is named "vector"; it must be available in the image
-- (e.g. use pgvector/pgvector:pg15 instead of plain postgres:15)
CREATE EXTENSION IF NOT EXISTS vector;
-- Tables are created by SQLAlchemy at startup; this script only prepares extensions
8.2 Monitoring and Logging
Create src/core/monitoring.py:
"""
Monitoring configuration.

Metric collection, logging and performance tracking.
"""
import logging
import time
from typing import Dict, Any, Optional
from contextlib import contextmanager
from datetime import datetime
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge, Summary
class MetricsCollector:
    """Prometheus metrics collector."""

    def __init__(self):
        self.request_count = Counter('http_requests_total', 'Total HTTP requests',
                                     ['method', 'endpoint', 'status'])
        self.request_duration = Histogram('http_request_duration_seconds',
                                          'HTTP request duration in seconds',
                                          ['method', 'endpoint'])
        self.conversation_count = Gauge('conversations_active',
                                        'Number of active conversations')
        self.message_count = Counter('messages_processed_total', 'Total messages processed',
                                     ['handler_type', 'intent'])
        self.ai_response_time = Summary('ai_response_time_seconds',
                                        'AI response time in seconds')
        self.error_count = Counter('errors_total', 'Total errors',
                                   ['error_type', 'component'])
        # Illustrative metric names; rename to fit your conventions
        self.customer_satisfaction = Gauge('customer_satisfaction_score',
                                           'Rolling customer satisfaction score')
        self.agent_performance = Gauge('agent_performance_score',
                                       'Per-agent performance score', ['agent_id'])
        self.ai_confidence = Histogram('ai_confidence', 'Distribution of AI confidence',
                                       buckets=[0.1, 0.3, 0.5, 0.7, 0.9, 1.0])

    def record_request(self, method: str, endpoint: str, status: str, duration: float):
        self.request_count.labels(method, endpoint, status).inc()
        self.request_duration.labels(method, endpoint).observe(duration)

    def record_message(self, handler_type: str, intent: str):
        self.message_count.labels(handler_type, intent).inc()

    def record_ai_response_time(self, duration: float):
        self.ai_response_time.observe(duration)

    def record_error(self, error_type: str, component: str):
        self.error_count.labels(error_type, component).inc()

    def set_active_conversations(self, count: int):
        self.conversation_count.set(count)

    def set_customer_satisfaction(self, score: float):
        self.customer_satisfaction.set(score)

    def set_agent_performance(self, agent_id: str, score: float):
        self.agent_performance.labels(agent_id).set(score)

    def record_ai_confidence(self, confidence: float):
        self.ai_confidence.observe(confidence)


class PerformanceMonitor:
    """Context-manager helpers for timing requests and AI calls."""

    def __init__(self):
        self.metrics = MetricsCollector()
        self.logger = logging.getLogger(__name__)

    @contextmanager
    def track_request(self, method: str, endpoint: str):
        start_time = time.time()
        status = "200"
        try:
            yield
        except Exception as e:
            status = "500"
            self.metrics.record_error(type(e).__name__, "api")
            raise
        finally:
            duration = time.time() - start_time
            self.metrics.record_request(method, endpoint, status, duration)

    @contextmanager
    def track_ai_call(self):
        start_time = time.time()
        try:
            yield
        finally:
            duration = time.time() - start_time
            self.metrics.record_ai_response_time(duration)


monitor = PerformanceMonitor()


def setup_logging():
    """Console plus file logging with a shared format; quiet noisy third-party loggers."""
    log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(log_format))
    file_handler = logging.FileHandler("app.log")
    file_handler.setFormatter(logging.Formatter(log_format))
    logging.basicConfig(level=logging.INFO, handlers=[console_handler, file_handler])
    for noisy in ("urllib3", "asyncio", "websockets"):
        logging.getLogger(noisy).setLevel(logging.WARNING)


def start_metrics_server(port: int = 9090):
    """Expose Prometheus metrics on a separate HTTP port."""
    from prometheus_client import start_http_server
    try:
        start_http_server(port)
        logging.info(f"Metrics server listening on :{port}")
    except Exception as e:
        logging.error(f"Failed to start metrics server: {e}")
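Typical usage wraps request handling and model calls in the context managers (endpoint names are illustrative):

from src.core.monitoring import monitor

def handle_request():
    with monitor.track_request("POST", "/api/v1/conversations"):
        pass  # handle the request here

def call_model():
    with monitor.track_ai_call():
        pass  # run inference here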
8.3 Test Scripts
Create tests/test_integration.py:
""" 集成测试
测试系统各个组件的集成
"""
import asyncio
import pytest
import json
from datetime import datetime
from typing import Dict, Any
from src.services.conversation.manager import ConversationManager
from src.ai_components.nlp.processor import NLPProcessor
from src.ai_components.retrieval.engine import KnowledgeRetrievalEngine
class TestIntegration:
    """Integration tests across the main components."""

    @pytest.fixture(autouse=True)
    async def setup(self):
        """Per-test setup: fresh component instances, initialized once."""
        self.conversation_manager = ConversationManager()
        self.nlp_processor = NLPProcessor()
        self.retrieval_engine = KnowledgeRetrievalEngine()

        # Initialize the components
        await self.nlp_processor.initialize()
        await self.retrieval_engine.initialize()
        yield
        # Cleanup
        self.conversation_manager.conversations.clear()

    @pytest.mark.asyncio
    async def test_end_to_end_conversation(self):
        """End-to-end conversation flow."""
        # 1. Create a conversation
        context = await self.conversation_manager.create_conversation(
            customer_id="test-customer",
            channel="web",
            metadata={"source": "integration-test"},
        )
        assert context.conversation_id
        assert context.customer_id == "test-customer"

        # 2. Process a customer message
        message = {
            "id": "msg-1",
            "content": "你好,我的订单到哪了?",
            "content_type": "text",
            "sender_type": "customer",
            "sender_id": "test-customer",
            "metadata": {},
            "created_at": datetime.now().isoformat(),
        }
        result = await self.conversation_manager.process_message(
            context.conversation_id, message)

        assert result["conversation_id"] == context.conversation_id
        assert result["handler_type"] in ("ai", "agent", "hybrid")
        assert "response" in result
        assert "analysis" in result

        nlp_analysis = result["analysis"]
        assert "intent" in nlp_analysis
        assert "sentiment" in nlp_analysis
        assert "keywords" in nlp_analysis

        # 3. Knowledge retrieval works for a related query
        retrieval_result = await self.retrieval_engine.search("订单物流")
        assert retrieval_result.query == "订单物流"
        assert isinstance(retrieval_result.results, list)
        assert retrieval_result.processing_time_ms >= 0

        # 4. Close the conversation
        self.conversation_manager.close_conversation(context.conversation_id, "test_done")
        closed_context = self.conversation_manager.get_conversation(context.conversation_id)
        assert closed_context.metadata["status"] == "closed"
        assert closed_context.metadata["close_reason"] == "test_done"

    @pytest.mark.asyncio
    async def test_nlp_processing(self):
        """Intent and sentiment expectations on sample utterances.
        The expected intents assume the illustrative patterns from section 4.1."""
        test_cases = [
            {"text": "你好", "intent": "greeting", "min_confidence": 0.1},
            {"text": "我要退款", "intent": "refund_request", "min_confidence": 0.1},
            {"text": "转人工", "intent": "human_handoff", "min_confidence": 0.1},
        ]
        for test_case in test_cases:
            analysis = await self.nlp_processor.analyze_text(test_case["text"])
            assert analysis.text == test_case["text"]
            assert analysis.intent_result.intent == test_case["intent"]
            assert analysis.intent_result.confidence >= test_case["min_confidence"]
            assert analysis.sentiment_result.sentiment in ("positive", "neutral", "negative")
            assert analysis.processing_time_ms >= 0

    @pytest.mark.asyncio
    async def test_knowledge_retrieval(self):
        """Documents added to the engine should be retrievable."""
        test_documents = [
            {"id": "doc-1", "content": "测试文档:退货流程说明。",
             "metadata": {"category": "after_sales"}},
            {"id": "doc-2", "content": "测试文档:配送时效说明。",
             "metadata": {"category": "shipping"}},
        ]
        await self.retrieval_engine.add_documents(test_documents)

        result = await self.retrieval_engine.search("退货流程")
        assert result.query == "退货流程"
        for search_result in result.results:
            assert search_result.id
            assert search_result.content
            assert 0 <= search_result.score <= 1
            assert isinstance(search_result.metadata, dict)

    @pytest.mark.asyncio
    async def test_conversation_state_management(self):
        """Creating and closing several conversations updates state correctly."""
        conversations = []
        for i in range(3):
            context = await self.conversation_manager.create_conversation(
                customer_id=f"customer-{i}", channel="web")
            conversations.append(context)
        assert len(self.conversation_manager.conversations) == 3

        for conv in conversations:
            self.conversation_manager.close_conversation(conv.conversation_id, "test")
            closed = self.conversation_manager.get_conversation(conv.conversation_id)
            assert closed.metadata["status"] == "closed"

    @pytest.mark.asyncio
    async def test_error_handling(self):
        """Unknown conversation ids raise; empty messages are still processed."""
        with pytest.raises(ValueError):
            await self.conversation_manager.process_message("missing-id", {"content": "hi"})

        context = await self.conversation_manager.create_conversation(
            customer_id="err-customer", channel="web")
        empty_message = {"id": "m-0", "content": "", "sender_type": "customer"}
        result = await self.conversation_manager.process_message(
            context.conversation_id, empty_message)
        assert result["conversation_id"] == context.conversation_id


if __name__ == "__main__":
    # Prefer: pytest tests/test_integration.py -v
    import sys
    sys.exit(pytest.main([__file__, "-v"]))
9. Summary and Extension Ideas
9.1 Summary
The implementation above yields a complete system that blends AI and human customer service, with the following characteristics:
- Modular architecture: a clean microservice layout with well-separated responsibilities, easy to maintain and extend.
- Intelligent dialogue management: message routing driven by intent recognition, sentiment analysis and conversation context.
- Seamless human-AI collaboration: smooth handover between AI and human agents, with the AI supplying real-time reply suggestions.
- Knowledge-driven responses: a vector-retrieval knowledge base provides accurate, timely answers.
- Real-time communication: WebSocket support for a fluid chat experience.
- Monitoring and operations: logging, performance metrics and health checks are built in.
9.2 Extension Ideas
- Stronger AI models:
- Integrate large language models such as GPT-4 or Claude to raise answer quality
- Add multimodal understanding (images, voice, video)
- Add text-to-speech (TTS) and speech recognition (ASR)
- Feature extensions:
- Real-time translation for multilingual support
- Emotion-aware speech synthesis for more natural AI replies
- Predictive analytics to anticipate customer needs
- Workflow automation and CRM integration
- Performance:
- Model quantization to reduce memory usage
- More caching layers to cut response latency
- Load balancing and horizontal scaling
- Security:
- End-to-end encryption
- Stricter access control
- Audit logging and compliance checks
- Deployment:
- Kubernetes orchestration with autoscaling
- CI/CD pipelines for automated releases
- Multi-region deployment for higher availability
9.3 Quick Start
- Prepare the environment:
# Clone the project
git clone <repository-url>
cd ai-human-customer-service
# Install dependencies
pip install -r requirements.txt
# Create the environment file
python -c "from src.core.config import create_env_file; create_env_file()"
# Edit .env to configure the database and related settings
- Start the services:
# With Docker Compose (recommended)
docker-compose up -d
# Or run the API directly, as a module so that package imports resolve
python -m src.api.main
- Access the services:
- API docs: http://localhost:8000/docs (served when DEBUG=true)
- Agent dashboard: http://localhost:3000
- Monitoring dashboard: http://localhost:3001 (if enabled)
- Run the tests:
pytest tests/ -v
This walkthrough provides a complete foundation for an AI customer service solution that can be customized and extended to fit specific needs. The design accounts for the availability, scalability and maintainability concerns of a production deployment, and is intended as a solid base to build on rather than a turnkey product.


