Enterprise Solution for an AI-Powered Customer Service System That Combines AI and Human Agents

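1. Project Overview and Architecture Design
1.1 Core Design Philosophy
The system uses an "AI first, human fallback" hybrid interaction model. The design goal is for the AI to resolve about 80% of routine queries automatically and to hand complex issues over to human agents without interrupting the conversation. The system is built on a microservice architecture for high availability, scalability, and modularity.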
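The "AI first, human fallback" rule can be sketched as a single routing decision. This is a minimal illustration, not the system's actual router: the names `RoutingDecision` and `route_message` are hypothetical, and the default threshold of 0.7 and the `handler_type` values are assumptions borrowed from the configuration and seed data shown later in the article.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RoutingDecision:
    handler: str  # "ai" or "agent"
    reason: str


def route_message(confidence: float, handler_type: str = "ai",
                  threshold: float = 0.7) -> RoutingDecision:
    """Decide who answers: the AI first, a human agent as the fallback."""
    # Some intents (for example complaints) may be reserved for humans.
    if handler_type == "agent":
        return RoutingDecision("agent", "intent is reserved for human agents")
    # Low-confidence predictions fall back to a human agent.
    if confidence < threshold:
        return RoutingDecision("agent", "AI confidence below threshold")
    return RoutingDecision("ai", "AI confidence meets threshold")


if __name__ == "__main__":
    print(route_message(0.92))
    print(route_message(0.41))
    print(route_message(0.95, handler_type="agent"))
```

Keeping the decision in one pure function makes the hand-off policy easy to unit test and to tune per intent.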
1.2 Overall Architecture Diagram
```text
┌───────────────────────────────────────────────────────────────────────┐
│                  Client Layer (multi-channel access)                  │
│         Web chat │ Mobile app │ WeChat │ Telephony interface          │
└───────────────────────────────────┬───────────────────────────────────┘
                                    │ HTTPS/WebSocket
┌───────────────────────────────────────────────────────────────────────┐
│                API Gateway Layer (unified entry point)                │
│ Auth │ Rate limiting │ Protocol conversion │ Routing │ Load balancing │
└───────────────────────────────────┬───────────────────────────────────┘
                                    │ Internal RPC/gRPC
┌───────────────────────────────────────────────────────────────────────┐
│                 Business Logic Layer (core services)                  │
│       Dialogue management service │ Intent recognition service        │
│       Knowledge retrieval service │ Agent collaboration service       │
└───────────────────────────────────┬───────────────────────────────────┘
                                    │ Message queue / database access
┌───────────────────────────────────────────────────────────────────────┐
│              Data and AI Layer (supporting capabilities)              │
│    Vector store (FAISS/Chroma) │ Relational DB (PostgreSQL/MySQL)     │
│                   Cache (Redis) │ AI model service                    │
└───────────────────────────────────────────────────────────────────────┘
```
1.3 Technology Stack
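| Component | Technology | Rationale |
|---|---|---|
| Backend framework | FastAPI + Python 3.10+ | Asynchronous and fast; generates API documentation automatically |
| Dialogue engine | Rasa 3.x + custom extensions | Open-source NLU framework; flexible and controllable |
| AI models | Sentence Transformers, BERT, GPT-2/3 | Balances performance and quality |
| Vector database | FAISS + PostgreSQL (pgvector) | High-performance similarity search |
| Real-time communication | WebSocket + Redis Pub/Sub | Low-latency message delivery |
| Frontend framework | React + TypeScript + TailwindCSS | Modern, component-based UI |
| Agent desktop | Electron + React | Cross-platform desktop application |
| Deployment | Docker + Kubernetes | Containerized and easy to scale |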
2. Environment Setup and Project Initialization
2.1 Development Environment Configuration
```bash
# 1. Create the project directory
mkdir -p ai-human-customer-service
cd ai-human-customer-service

# 2. Create a Python virtual environment
python3.10 -m venv venv
source venv/bin/activate      # Linux/macOS
# venv\Scripts\activate       # Windows

# 3. Install the base dependencies
pip install --upgrade pip
pip install fastapi==0.104.1
pip install "uvicorn[standard]==0.24.0"
pip install sqlalchemy==2.0.23
pip install asyncpg==0.29.0
pip install redis==5.0.1
pip install pydantic==2.5.0
pip install pydantic-settings==2.1.0
pip install python-multipart==0.0.6
pip install websockets==12.0
pip install aiohttp==3.9.1
pip install jwt==1.3.1
pip install python-dateutil==2.8.2
pip install loguru==0.7.2

# 4. Create the project structure
mkdir -p {src,tests,docs,scripts,deploy}
mkdir -p src/{api,core,models,schemas,services,utils,ai_components}
mkdir -p src/api/{endpoints,middleware}
mkdir -p src/core/{config,database,security}
mkdir -p src/ai_components/{nlp,retrieval,models}
mkdir -p tests/{unit,integration}
```
2.2 Configuration File Design
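The core idea of the configuration module is "defaults in code, overrides from the environment". The standard-library sketch below isolates that idea for the database URL only. The helper name `build_database_url` and the `DEFAULTS` table are hypothetical; the default values mirror the ones used in this article's settings, and URL-encoding the password is an extra precaution assumed here, not something the original code does.

```python
import os
from typing import Mapping, Optional
from urllib.parse import quote

# Defaults used when an environment variable is absent (assumed values).
DEFAULTS = {
    "POSTGRES_USER": "postgres",
    "POSTGRES_PASSWORD": "password",
    "POSTGRES_SERVER": "localhost",
    "POSTGRES_PORT": "5432",
    "POSTGRES_DB": "customer_service",
}


def build_database_url(env: Optional[Mapping[str, str]] = None) -> str:
    """Return DATABASE_URL if set, otherwise assemble it from its parts."""
    env = os.environ if env is None else env
    explicit = env.get("DATABASE_URL")
    if explicit:
        return explicit
    cfg = {key: env.get(key, default) for key, default in DEFAULTS.items()}
    # Percent-encode the password so characters such as "@" keep the URL valid.
    password = quote(cfg["POSTGRES_PASSWORD"], safe="")
    return (
        f"postgresql+asyncpg://{cfg['POSTGRES_USER']}:{password}"
        f"@{cfg['POSTGRES_SERVER']}:{cfg['POSTGRES_PORT']}/{cfg['POSTGRES_DB']}"
    )


if __name__ == "__main__":
    print(build_database_url({}))
    print(build_database_url({"POSTGRES_PASSWORD": "p@ss"}))
```

Passing the environment as a mapping keeps the function testable without touching the real process environment.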
Create src/core/config.py:
```python
"""
System configuration module.

Uses pydantic-settings; every field can be overridden by an environment
variable or by an entry in the .env file.
"""
import secrets
from typing import List, Optional

from pydantic import Field, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings."""

    model_config = SettingsConfigDict(
        env_file=".env", case_sensitive=True, extra="ignore"
    )

    # Application
    APP_NAME: str = "AI-Human Customer Service"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False
    ENVIRONMENT: str = "development"  # development, staging, production

    # API
    API_V1_STR: str = "/api/v1"
    PROJECT_NAME: str = "AI Human Customer Service"
    BACKEND_CORS_ORIGINS: List[str] = ["http://localhost:3000"]

    # Security. The fallback key is regenerated on every start, so set
    # SECRET_KEY explicitly (for example in .env) to keep tokens valid.
    SECRET_KEY: str = Field(default_factory=lambda: secrets.token_urlsafe(32))
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 7  # 7 days

    # Database
    POSTGRES_SERVER: str = "localhost"
    POSTGRES_USER: str = "postgres"
    POSTGRES_PASSWORD: str = "password"
    POSTGRES_DB: str = "customer_service"
    POSTGRES_PORT: str = "5432"
    DATABASE_URL: Optional[str] = None

    # Redis
    REDIS_HOST: str = "localhost"
    REDIS_PORT: int = 6379
    REDIS_DB: int = 0
    REDIS_PASSWORD: Optional[str] = None
    REDIS_URL: Optional[str] = None

    # AI models
    AI_MODEL_PATH: str = "./models"
    SENTENCE_TRANSFORMER_MODEL: str = "all-MiniLM-L6-v2"
    BERT_MODEL_PATH: str = "bert-base-uncased"
    GPT_MODEL_PATH: str = "gpt2"
    USE_GPU: bool = True
    AI_MODEL_CACHE_SIZE: int = 100

    # Dialogue management
    MAX_CONVERSATION_HISTORY: int = 20
    SESSION_TIMEOUT_MINUTES: int = 30
    DEFAULT_AI_CONFIDENCE_THRESHOLD: float = 0.7

    # Agents
    MAX_AGENTS_PER_QUEUE: int = 10
    AGENT_HEARTBEAT_INTERVAL: int = 30  # seconds
    AGENT_AWAY_TIMEOUT: int = 300  # seconds

    # Message queue
    RABBITMQ_HOST: str = "localhost"
    RABBITMQ_PORT: int = 5672
    RABBITMQ_USER: str = "guest"
    RABBITMQ_PASSWORD: str = "guest"

    # Monitoring
    ENABLE_METRICS: bool = True
    METRICS_PORT: int = 9090
    LOG_LEVEL: str = "INFO"

    @model_validator(mode="after")
    def assemble_connection_urls(self) -> "Settings":
        """Build DATABASE_URL and REDIS_URL unless they were set explicitly.

        A per-field validator is skipped for an unset default, which would
        leave both URLs as None, so the URLs are assembled after all fields
        have been loaded.
        """
        if not self.DATABASE_URL:
            self.DATABASE_URL = (
                f"postgresql+asyncpg://{self.POSTGRES_USER}:"
                f"{self.POSTGRES_PASSWORD}@{self.POSTGRES_SERVER}:"
                f"{self.POSTGRES_PORT}/{self.POSTGRES_DB}"
            )
        if not self.REDIS_URL:
            auth_part = f":{self.REDIS_PASSWORD}@" if self.REDIS_PASSWORD else ""
            self.REDIS_URL = (
                f"redis://{auth_part}{self.REDIS_HOST}:"
                f"{self.REDIS_PORT}/{self.REDIS_DB}"
            )
        return self


settings = Settings()

# Template for the .env file
ENV_TEMPLATE = """# Application
ENVIRONMENT=development
DEBUG=True

# Database
POSTGRES_SERVER=localhost
POSTGRES_USER=postgres
POSTGRES_PASSWORD=your_password
POSTGRES_DB=customer_service
POSTGRES_PORT=5432

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0

# AI models
SENTENCE_TRANSFORMER_MODEL=all-MiniLM-L6-v2
USE_GPU=False

# Security
SECRET_KEY={secret_key}
"""


def create_env_file():
    """Write a .env file with a freshly generated secret key."""
    with open(".env", "w") as f:
        secret_key = secrets.token_urlsafe(32)
        f.write(ENV_TEMPLATE.format(secret_key=secret_key))
    print("✅ .env file created")


if __name__ == "__main__":
    create_env_file()
```
3. Core Data Model Design
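One detail worth making explicit before the models: the configuration expresses the AI confidence threshold as a float (0.7), while the database models in this section store confidence values and thresholds as integers from 0 to 100. The sketch below shows one way to bridge the two scales. The helper names `to_percent` and `meets_threshold` are hypothetical and not part of the original code.

```python
def to_percent(confidence: float) -> int:
    """Convert a 0.0-1.0 model confidence to the 0-100 integer scale."""
    if not 0.0 <= confidence <= 1.0:
        raise ValueError(f"confidence must be within [0, 1], got {confidence}")
    return round(confidence * 100)


def meets_threshold(confidence: float, threshold_percent: int) -> bool:
    """Compare a float confidence against an integer percent threshold."""
    return to_percent(confidence) >= threshold_percent


if __name__ == "__main__":
    print(to_percent(0.7))
    print(meets_threshold(0.72, 70))
    print(meets_threshold(0.6, 70))
```

Converting in one place avoids the easy mistake of comparing 0.7 against 70 directly.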
3.1 Database Model Definitions
Create src/models/base.py:
```python
"""
Base database models, defined with the SQLAlchemy ORM.
"""
from typing import Any, Dict

from sqlalchemy import JSON, Boolean, Column, DateTime, Integer, String, Text
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func

Base = declarative_base()


def _iso(value):
    """Format a datetime as ISO 8601; pass None through unchanged."""
    return value.isoformat() if value else None


class TimestampMixin:
    """Adds created/updated/deleted timestamps."""
    created_at = Column(DateTime, default=func.now(), nullable=False)
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), nullable=False)
    deleted_at = Column(DateTime, nullable=True)


# Note: "metadata" is a reserved attribute name on declarative classes, so
# each model exposes the "metadata" column as the attribute extra_metadata.


class Customer(Base, TimestampMixin):
    """Customer."""
    __tablename__ = "customers"

    id = Column(String(36), primary_key=True, index=True)
    external_id = Column(String(100), unique=True, nullable=True, index=True)
    name = Column(String(200), nullable=True)
    email = Column(String(254), nullable=True, index=True)
    phone = Column(String(50), nullable=True, index=True)
    extra_metadata = Column("metadata", JSON, nullable=True, default=dict)
    tags = Column(JSON, nullable=True, default=list)
    segment = Column(String(50), nullable=True)
    lifetime_value = Column(Integer, default=0)
    last_interaction_at = Column(DateTime, nullable=True)
    is_active = Column(Boolean, default=True)
    notes = Column(Text, nullable=True)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a dictionary."""
        return {
            "id": self.id,
            "external_id": self.external_id,
            "name": self.name,
            "email": self.email,
            "phone": self.phone,
            "metadata": self.extra_metadata or {},
            "tags": self.tags or [],
            "segment": self.segment,
            "lifetime_value": self.lifetime_value,
            "last_interaction_at": _iso(self.last_interaction_at),
            "is_active": self.is_active,
            "notes": self.notes,
            "created_at": _iso(self.created_at),
            "updated_at": _iso(self.updated_at),
        }


class Conversation(Base, TimestampMixin):
    """Conversation."""
    __tablename__ = "conversations"

    id = Column(String(36), primary_key=True, index=True)
    customer_id = Column(String(36), index=True, nullable=False)
    channel = Column(String(50), nullable=False)  # web, mobile, wechat, etc.
    status = Column(String(20), default="active")  # active, pending, closed, transferred
    assigned_agent_id = Column(String(36), nullable=True, index=True)
    assigned_ai_model = Column(String(100), nullable=True)
    extra_metadata = Column("metadata", JSON, nullable=True, default=dict)
    sentiment_score = Column(Integer, nullable=True)  # -100 to 100
    satisfaction_score = Column(Integer, nullable=True)  # 0-100
    closed_at = Column(DateTime, nullable=True)
    close_reason = Column(String(100), nullable=True)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a dictionary."""
        return {
            "id": self.id,
            "customer_id": self.customer_id,
            "channel": self.channel,
            "status": self.status,
            "assigned_agent_id": self.assigned_agent_id,
            "assigned_ai_model": self.assigned_ai_model,
            "metadata": self.extra_metadata or {},
            "sentiment_score": self.sentiment_score,
            "satisfaction_score": self.satisfaction_score,
            "closed_at": _iso(self.closed_at),
            "close_reason": self.close_reason,
            "created_at": _iso(self.created_at),
            "updated_at": _iso(self.updated_at),
        }


class Message(Base, TimestampMixin):
    """Message."""
    __tablename__ = "messages"

    id = Column(String(36), primary_key=True, index=True)
    conversation_id = Column(String(36), index=True, nullable=False)
    sender_type = Column(String(20), nullable=False)  # customer, agent, ai
    sender_id = Column(String(36), nullable=True)
    content = Column(Text, nullable=False)
    content_type = Column(String(20), default="text")  # text, image, file, etc.
    extra_metadata = Column("metadata", JSON, nullable=True, default=dict)
    intent = Column(String(100), nullable=True)
    confidence = Column(Integer, nullable=True)  # 0-100
    is_read = Column(Boolean, default=False)
    read_at = Column(DateTime, nullable=True)
    # AI-related fields
    ai_model_used = Column(String(100), nullable=True)
    ai_confidence = Column(Integer, nullable=True)
    ai_metadata = Column(JSON, nullable=True, default=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a dictionary."""
        return {
            "id": self.id,
            "conversation_id": self.conversation_id,
            "sender_type": self.sender_type,
            "sender_id": self.sender_id,
            "content": self.content,
            "content_type": self.content_type,
            "metadata": self.extra_metadata or {},
            "intent": self.intent,
            "confidence": self.confidence,
            "is_read": self.is_read,
            "read_at": _iso(self.read_at),
            "ai_model_used": self.ai_model_used,
            "ai_confidence": self.ai_confidence,
            "ai_metadata": self.ai_metadata or {},
            "created_at": _iso(self.created_at),
        }


class Agent(Base, TimestampMixin):
    """Human agent."""
    __tablename__ = "agents"

    id = Column(String(36), primary_key=True, index=True)
    user_id = Column(String(36), unique=True, nullable=False)
    name = Column(String(200), nullable=False)
    email = Column(String(254), unique=True, nullable=False)
    status = Column(String(20), default="offline")  # online, away, offline, busy
    skills = Column(JSON, nullable=True, default=list)
    current_conversation_ids = Column(JSON, nullable=True, default=list)
    max_concurrent_chats = Column(Integer, default=3)
    extra_metadata = Column("metadata", JSON, nullable=True, default=dict)
    last_active_at = Column(DateTime, nullable=True)
    availability_schedule = Column(JSON, nullable=True)  # shift schedule
    performance_score = Column(Integer, default=0)
    is_active = Column(Boolean, default=True)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a dictionary."""
        return {
            "id": self.id,
            "user_id": self.user_id,
            "name": self.name,
            "email": self.email,
            "status": self.status,
            "skills": self.skills or [],
            "current_conversation_ids": self.current_conversation_ids or [],
            "max_concurrent_chats": self.max_concurrent_chats,
            "metadata": self.extra_metadata or {},
            "last_active_at": _iso(self.last_active_at),
            "availability_schedule": self.availability_schedule or {},
            "performance_score": self.performance_score,
            "is_active": self.is_active,
            "created_at": _iso(self.created_at),
            "updated_at": _iso(self.updated_at),
        }


class KnowledgeBase(Base, TimestampMixin):
    """Knowledge-base entry."""
    __tablename__ = "knowledge_base"

    id = Column(String(36), primary_key=True, index=True)
    title = Column(String(500), nullable=False)
    content = Column(Text, nullable=False)
    category = Column(String(100), nullable=True, index=True)
    tags = Column(JSON, nullable=True, default=list)
    language = Column(String(10), default="zh")
    is_active = Column(Boolean, default=True)
    vector_embedding = Column(JSON, nullable=True)  # stored vector embedding
    extra_metadata = Column("metadata", JSON, nullable=True, default=dict)
    usage_count = Column(Integer, default=0)
    last_used_at = Column(DateTime, nullable=True)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a dictionary (the embedding is omitted)."""
        return {
            "id": self.id,
            "title": self.title,
            "content": self.content,
            "category": self.category,
            "tags": self.tags or [],
            "language": self.language,
            "is_active": self.is_active,
            "metadata": self.extra_metadata or {},
            "usage_count": self.usage_count,
            "last_used_at": _iso(self.last_used_at),
            "created_at": _iso(self.created_at),
            "updated_at": _iso(self.updated_at),
        }


class Intent(Base, TimestampMixin):
    """Intent definition."""
    __tablename__ = "intents"

    id = Column(String(36), primary_key=True, index=True)
    name = Column(String(100), nullable=False, unique=True)
    description = Column(Text, nullable=True)
    examples = Column(JSON, nullable=True, default=list)
    handler_type = Column(String(50), default="ai")  # ai, agent, hybrid
    confidence_threshold = Column(Integer, default=70)
    extra_metadata = Column("metadata", JSON, nullable=True, default=dict)
    is_active = Column(Boolean, default=True)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a dictionary."""
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "examples": self.examples or [],
            "handler_type": self.handler_type,
            "confidence_threshold": self.confidence_threshold,
            "metadata": self.extra_metadata or {},
            "is_active": self.is_active,
            "created_at": _iso(self.created_at),
            "updated_at": _iso(self.updated_at),
        }


class AIResponseLog(Base, TimestampMixin):
    """Log of AI responses."""
    __tablename__ = "ai_response_logs"

    id = Column(String(36), primary_key=True, index=True)
    conversation_id = Column(String(36), index=True, nullable=False)
    message_id = Column(String(36), index=True, nullable=False)
    ai_model = Column(String(100), nullable=False)
    prompt = Column(Text, nullable=False)
    response = Column(Text, nullable=False)
    confidence = Column(Integer, nullable=True)
    processing_time_ms = Column(Integer, nullable=True)
    tokens_used = Column(Integer, nullable=True)
    extra_metadata = Column("metadata", JSON, nullable=True, default=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a dictionary."""
        return {
            "id": self.id,
            "conversation_id": self.conversation_id,
            "message_id": self.message_id,
            "ai_model": self.ai_model,
            "prompt": self.prompt,
            "response": self.response,
            "confidence": self.confidence,
            "processing_time_ms": self.processing_time_ms,
            "tokens_used": self.tokens_used,
            "metadata": self.extra_metadata or {},
            "created_at": _iso(self.created_at),
        }
```
3.2 Database Initialization Script
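The initialization script in this section relies on idempotent seeding: running it twice must not duplicate rows. The sketch below demonstrates that property in isolation, using SQLite from the standard library as a stand-in for PostgreSQL. `INSERT OR IGNORE` plays the role of PostgreSQL's `ON CONFLICT DO NOTHING`, the list of examples is serialized to JSON text before binding, and the function name `seed_intents` and the reduced table layout are assumptions made for illustration.

```python
import json
import sqlite3
from typing import Dict, List


def seed_intents(conn: sqlite3.Connection, intents: List[Dict]) -> int:
    """Insert intents that do not exist yet; return how many rows were added."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS intents ("
        "id TEXT PRIMARY KEY, name TEXT NOT NULL UNIQUE, examples TEXT)"
    )
    inserted = 0
    for intent in intents:
        cursor = conn.execute(
            "INSERT OR IGNORE INTO intents (id, name, examples) VALUES (?, ?, ?)",
            (
                intent["id"],
                intent["name"],
                # Lists cannot be bound directly, so store them as JSON text.
                json.dumps(intent["examples"], ensure_ascii=False),
            ),
        )
        inserted += cursor.rowcount  # 0 when the row already existed
    conn.commit()
    return inserted


if __name__ == "__main__":
    connection = sqlite3.connect(":memory:")
    rows = [
        {"id": "greeting", "name": "greeting", "examples": ["你好", "嗨"]},
        {"id": "complaint", "name": "complaint", "examples": ["我要投诉"]},
    ]
    print(seed_intents(connection, rows))
    print(seed_intents(connection, rows))
```

Because the second call inserts nothing, the seeding step can safely run on every deployment.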
Create scripts/init_database.py:
```python
#!/usr/bin/env python3
"""
Database initialization script.

Creates the table structure and seeds the initial data.
"""
import asyncio
import sys
from pathlib import Path

# Make the project root importable when this file is run directly
sys.path.append(str(Path(__file__).parent.parent))

from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from src.core.config import settings
# Importing the module registers every model on Base.metadata
from src.models.base import Base, Intent, KnowledgeBase

SAMPLE_INTENTS = [
    {"id": "greeting", "name": "greeting", "description": "客户打招呼",
     "examples": ["你好", "嗨", "早上好", "在吗", "有人吗"],
     "handler_type": "ai", "confidence_threshold": 60},
    {"id": "product_inquiry", "name": "product_inquiry", "description": "产品咨询",
     "examples": ["这个产品多少钱", "有什么功能", "怎么使用", "有什么优惠", "什么时候发货"],
     "handler_type": "ai", "confidence_threshold": 70},
    {"id": "technical_support", "name": "technical_support", "description": "技术支持",
     "examples": ["无法登录", "系统错误", "怎么重置密码", "连接失败", "闪退问题"],
     "handler_type": "hybrid", "confidence_threshold": 65},
    {"id": "complaint", "name": "complaint", "description": "投诉建议",
     "examples": ["我要投诉", "服务太差", "质量有问题", "退款", "赔偿"],
     "handler_type": "agent", "confidence_threshold": 75},
    {"id": "payment_issue", "name": "payment_issue", "description": "支付问题",
     "examples": ["付款失败", "重复扣款", "退款未到账", "支付方式", "发票问题"],
     "handler_type": "hybrid", "confidence_threshold": 70},
]

KNOWLEDGE_ENTRIES = [
    {"id": "kb_welcome", "title": "欢迎信息",
     "content": "欢迎使用我们的客服系统!我是AI助手,可以帮您解答问题。如需人工服务,请告诉我。",
     "category": "general", "tags": ["welcome", "greeting"], "language": "zh"},
    {"id": "kb_product_info", "title": "产品基本信息",
     "content": "我们的产品提供以下功能:1. 智能对话 2. 多语言支持 3. 24小时在线 4. 人工转接。价格根据套餐不同有所差异。",
     "category": "product", "tags": ["product", "features", "pricing"], "language": "zh"},
    {"id": "kb_technical_help", "title": "常见技术问题",
     "content": "如果您遇到技术问题:1. 尝试刷新页面 2. 清除缓存 3. 检查网络连接 4. 重启应用。如果问题持续,请联系技术支持。",
     "category": "technical", "tags": ["troubleshooting", "help", "support"], "language": "zh"},
    {"id": "kb_refund_policy", "title": "退款政策",
     "content": "我们的退款政策:购买后30天内可申请退款,需提供订单号和退款原因。退款将在7-14个工作日内处理。",
     "category": "policy", "tags": ["refund", "policy", "money"], "language": "zh"},
]


async def init_database():
    """Create the schema and seed the initial data."""
    print("🚀 Initializing the database...")

    # Keep the postgresql+asyncpg URL unchanged: create_async_engine
    # requires an async driver.
    engine = create_async_engine(
        settings.DATABASE_URL,
        echo=settings.DEBUG,
        pool_pre_ping=True,
        pool_size=20,
        max_overflow=30,
    )

    try:
        print("📦 Creating tables...")
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        print("✅ Tables created")

        print("📝 Seeding initial data...")
        session_factory = async_sessionmaker(engine, expire_on_commit=False)
        async with session_factory() as session:
            try:
                # Inserting through the table objects serializes the JSON
                # columns and applies column defaults (is_active, created_at).
                for model, rows in ((Intent, SAMPLE_INTENTS),
                                    (KnowledgeBase, KNOWLEDGE_ENTRIES)):
                    for row in rows:
                        stmt = (
                            pg_insert(model.__table__)
                            .values(**row)
                            .on_conflict_do_nothing(index_elements=["id"])
                        )
                        await session.execute(stmt)
                await session.commit()
                print("✅ Initial data created")
            except Exception as e:
                await session.rollback()
                print(f"❌ Failed to create initial data: {e}")
                raise
    finally:
        await engine.dispose()

    print("🎉 Database initialization complete!")


async def check_database_connection():
    """Check that the database is reachable."""
    print("🔍 Checking the database connection...")
    try:
        engine = create_async_engine(settings.DATABASE_URL)
        async with engine.connect() as conn:
            result = await conn.execute(text("SELECT version()"))
            version = result.scalar()
            print(f"✅ Database connection succeeded: {version}")
        await engine.dispose()
        return True
    except Exception as e:
        print(f"❌ Database connection failed: {e}")
        return False


async def main():
    """Entry point."""
    print("=" * 50)
    print("Database initialization tool")
    print("=" * 50)

    if not await check_database_connection():
        print("Please check the database configuration and connection")
        return

    await init_database()


if __name__ == "__main__":
    asyncio.run(main())
```
4. AI Core Component Implementation
4.1 NLP Processor (Intent Recognition and Sentiment Analysis)
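The processor in this section combines a rule-based intent score with an embedding-similarity score using fixed weights (0.6 and 0.4). The weighted fusion can be sketched on its own with plain tuples. This is a simplified stand-in for the module's fusion step, and the function name `fuse_intents` is hypothetical.

```python
from typing import Dict, Tuple


def fuse_intents(rule: Tuple[str, float], similarity: Tuple[str, float],
                 rule_weight: float = 0.6,
                 sim_weight: float = 0.4) -> Tuple[str, float]:
    """Weight two (intent, score) predictions and return the best intent."""
    scores: Dict[str, float] = {}
    for (intent, score), weight in ((rule, rule_weight), (similarity, sim_weight)):
        # When both methods agree, their weighted scores add up.
        scores[intent] = scores.get(intent, 0.0) + score * weight
    best = max(scores, key=scores.get)
    return best, scores[best]


if __name__ == "__main__":
    # Both methods agree: 0.5 * 0.6 + 0.9 * 0.4
    print(fuse_intents(("greeting", 0.5), ("greeting", 0.9)))
    # The methods disagree: each intent keeps only its own weighted score
    print(fuse_intents(("other", 0.0), ("complaint", 0.8)))
```

Agreement between the two methods raises the fused score, while disagreement caps each candidate at its own weight, which is why fused confidences tend to sit below the raw model scores.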
Create src/ai_components/nlp/processor.py:
```python
"""
NLP processor.

Handles intent recognition, sentiment analysis, entity extraction, and
keyword extraction.
"""
import asyncio
import logging
import os
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

# Optional NLP libraries
try:
    import jieba
    import jieba.analyse
    JIEBA_AVAILABLE = True
except ImportError:
    JIEBA_AVAILABLE = False
    print("⚠️ jieba is not installed; Chinese word segmentation will be limited")

try:
    import torch
    from sentence_transformers import SentenceTransformer
    from transformers import BertModel, BertTokenizer, pipeline
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    print("⚠️ transformers/sentence-transformers is not installed")

from src.core.config import settings

USER_DICT_PATH = "./data/dict.txt"


@dataclass
class IntentResult:
    """Intent recognition result."""
    intent: str
    confidence: float
    entities: Dict[str, Any]
    alternatives: List[Dict[str, Any]]


@dataclass
class SentimentResult:
    """Sentiment analysis result."""
    sentiment: str  # positive, negative, neutral
    score: float
    confidence: float
    emotions: Dict[str, float]  # fine-grained emotions


@dataclass
class NLPAnalysis:
    """Combined NLP analysis result."""
    text: str
    intent_result: IntentResult
    sentiment_result: SentimentResult
    entities: Dict[str, Any]
    keywords: List[str]
    language: str
    processing_time_ms: float


class NLPProcessor:
    """Main NLP processor."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.initialized = False
        self.models = {}
        self.tokenizers = {}
        # Configuration
        self.confidence_threshold = 0.6
        self.max_sequence_length = 512

    async def initialize(self):
        """Load the NLP models."""
        if self.initialized:
            return
        self.logger.info("Initializing the NLP processor...")
        try:
            if JIEBA_AVAILABLE:
                # Load the custom user dictionary if it exists
                if os.path.exists(USER_DICT_PATH):
                    jieba.load_userdict(USER_DICT_PATH)
                jieba.initialize()

            if TRANSFORMERS_AVAILABLE:
                # Only use the GPU when CUDA is actually available
                use_gpu = settings.USE_GPU and torch.cuda.is_available()

                self.logger.info("Loading the Sentence Transformer model...")
                self.sentence_model = SentenceTransformer(
                    settings.SENTENCE_TRANSFORMER_MODEL,
                    cache_folder=settings.AI_MODEL_PATH,
                )

                self.logger.info("Loading the sentiment analysis model...")
                self.sentiment_analyzer = pipeline(
                    "sentiment-analysis",
                    model="nlptown/bert-base-multilingual-uncased-sentiment",
                    device=0 if use_gpu else -1,
                )

                # BERT model for feature extraction
                self.logger.info("Loading the BERT model...")
                self.bert_tokenizer = BertTokenizer.from_pretrained(
                    settings.BERT_MODEL_PATH, cache_dir=settings.AI_MODEL_PATH
                )
                self.bert_model = BertModel.from_pretrained(
                    settings.BERT_MODEL_PATH, cache_dir=settings.AI_MODEL_PATH
                )
                if use_gpu:
                    self.bert_model.cuda()

            # Intent patterns (an example; a real system needs a trained model)
            self.intent_patterns = self._load_intent_patterns()
            self.initialized = True
            self.logger.info("NLP processor initialized")
        except Exception as e:
            self.logger.error(f"NLP processor initialization failed: {e}")
            raise

    def _load_intent_patterns(self) -> Dict[str, List[str]]:
        """Load intent patterns (should come from the database in practice)."""
        return {
            "greeting": ["你好", "嗨", "早上好", "晚上好", "在吗", "有人吗"],
            "farewell": ["再见", "拜拜", "谢谢", "结束", "好了"],
            "product_inquiry": ["产品", "价格", "功能", "特性", "规格", "多少钱"],
            "technical_support": ["问题", "错误", "故障", "无法", "不能", "帮助"],
            "complaint": ["投诉", "不满", "差评", "退款", "赔偿", "生气"],
            "payment_issue": ["支付", "付款", "扣款", "退款", "发票", "账单"],
            "schedule": ["时间", "预约", "安排", "什么时候", "多久"],
            "location": ["地址", "位置", "在哪里", "怎么去", "门店"],
            "account": ["登录", "注册", "密码", "账户", "会员"],
            "other": [],
        }

    async def analyze_text(self, text: str, context: Optional[Dict] = None) -> NLPAnalysis:
        """
        Run the full analysis on a text.

        Args:
            text: text to analyze
            context: conversation context

        Returns:
            NLPAnalysis object
        """
        start_time = datetime.now()
        if not self.initialized:
            await self.initialize()

        language = self._detect_language(text)

        # Schedule the analyses together. The model calls are synchronous,
        # so they still run one after another on the event loop.
        intent_result, sentiment_result, entities, keywords = await asyncio.gather(
            self.detect_intent(text, context),
            self.analyze_sentiment(text),
            self.extract_entities(text),
            self.extract_keywords(text, language),
        )

        processing_time = (datetime.now() - start_time).total_seconds() * 1000
        return NLPAnalysis(
            text=text,
            intent_result=intent_result,
            sentiment_result=sentiment_result,
            entities=entities,
            keywords=keywords,
            language=language,
            processing_time_ms=processing_time,
        )

    async def detect_intent(self, text: str, context: Optional[Dict] = None) -> IntentResult:
        """
        Detect the user's intent.

        Args:
            text: user input
            context: conversation context

        Returns:
            IntentResult object
        """
        try:
            # Method 1: rule-based matching (fast, precise on known phrases)
            rule_based_result = self._rule_based_intent_detection(text)
            # Method 2: similarity matching with the sentence encoder
            similarity_based_result = await self._similarity_based_intent_detection(text)
            # Method 3: ML classification (if a trained model is available)
            # ml_based_result = await self._ml_based_intent_detection(text)

            # Fuse the results with a weighted sum
            final_intent, final_confidence, alternatives = self._fuse_intent_results(
                rule_based_result, similarity_based_result
            )
            entities = await self.extract_entities(text)

            if context:
                final_intent = self._apply_context_rules(final_intent, context)

            return IntentResult(
                intent=final_intent,
                confidence=final_confidence,
                entities=entities,
                alternatives=alternatives,
            )
        except Exception as e:
            self.logger.error(f"Intent detection failed: {e}")
            return IntentResult(intent="unknown", confidence=0.0, entities={}, alternatives=[])

    def _rule_based_intent_detection(self, text: str) -> Tuple[str, float]:
        """Rule-based intent detection: fraction of patterns found in the text."""
        text_lower = text.lower()
        best_intent = "other"
        best_score = 0.0
        for intent, patterns in self.intent_patterns.items():
            score = sum(1 for pattern in patterns if pattern in text_lower)
            if score > 0:
                normalized_score = score / len(patterns)
                if normalized_score > best_score:
                    best_score = normalized_score
                    best_intent = intent
        return best_intent, best_score

    async def _similarity_based_intent_detection(self, text: str) -> Tuple[str, float]:
        """Similarity-based intent detection."""
        if not TRANSFORMERS_AVAILABLE:
            return "other", 0.0
        try:
            # Normalized embeddings make the dot product a cosine similarity
            text_embedding = self.sentence_model.encode(text, normalize_embeddings=True)
            intent_scores = {}
            for intent, examples in self.intent_patterns.items():
                if not examples:
                    continue
                example_embeddings = self.sentence_model.encode(
                    examples, normalize_embeddings=True
                )
                similarities = np.dot(example_embeddings, text_embedding)
                max_similarity = np.max(similarities)
                # Map cosine similarity from [-1, 1] to [0, 1]
                intent_scores[intent] = float((max_similarity + 1) / 2)
            if intent_scores:
                best_intent = max(intent_scores, key=intent_scores.get)
                return best_intent, intent_scores[best_intent]
            return "other", 0.0
        except Exception as e:
            self.logger.error(f"Similarity-based intent detection failed: {e}")
            return "other", 0.0

    def _fuse_intent_results(
        self, rule_result: Tuple[str, float], similarity_result: Tuple[str, float]
    ) -> Tuple[str, float, List[Dict]]:
        """Fuse the results of the different methods."""
        rule_intent, rule_score = rule_result
        sim_intent, sim_score = similarity_result

        # Weights (tunable)
        rule_weight = 0.6
        sim_weight = 0.4

        scores = {}
        scores[rule_intent] = rule_score * rule_weight
        scores[sim_intent] = scores.get(sim_intent, 0) + sim_score * sim_weight

        best_intent = max(scores, key=scores.get)
        best_score = scores[best_intent]

        alternatives = [
            {"intent": intent, "score": score}
            for intent, score in scores.items()
            if intent != best_intent and score > 0.1
        ]
        alternatives.sort(key=lambda x: x["score"], reverse=True)
        return best_intent, best_score, alternatives

    def _apply_context_rules(self, intent: str, context: Dict) -> str:
        """Adjust the intent using conversation context."""
        # Example rule: three greetings in a row may indicate a problem
        if intent == "greeting" and context.get("greeting_count", 0) >= 3:
            return "technical_support"
        # Example rule: a time question right after a payment conversation
        # is probably about shipping time
        if intent == "schedule" and context.get("last_intent") == "payment_issue":
            return "shipping_inquiry"
        return intent

    async def analyze_sentiment(self, text: str) -> SentimentResult:
        """
        Analyze the sentiment of a text.

        Args:
            text: text to analyze

        Returns:
            SentimentResult object
        """
        try:
            if not TRANSFORMERS_AVAILABLE:
                # Fall back to keyword-based analysis
                return self._simple_sentiment_analysis(text)

            results = self.sentiment_analyzer(text, truncation=True)
            if results:
                result = results[0]
                label = result["label"]
                score = result["score"]
                if label in ["POSITIVE", "5 stars", "4 stars"]:
                    sentiment = "positive"
                elif label in ["NEGATIVE", "1 star", "2 stars"]:
                    sentiment = "negative"
                else:
                    sentiment = "neutral"
                emotions = self._analyze_emotions(text)
                return SentimentResult(
                    sentiment=sentiment, score=score, confidence=score, emotions=emotions
                )
            return SentimentResult(sentiment="neutral", score=0.5, confidence=0.0, emotions={})
        except Exception as e:
            self.logger.error(f"Sentiment analysis failed: {e}")
            return SentimentResult(sentiment="neutral", score=0.5, confidence=0.0, emotions={})

    def _simple_sentiment_analysis(self, text: str) -> SentimentResult:
        """Simple keyword-based sentiment analysis."""
        positive_words = ["好", "不错", "满意", "喜欢", "棒", "赞", "感谢"]
        negative_words = ["差", "不好", "不满意", "讨厌", "垃圾", "投诉", "生气"]
        positive_count = sum(1 for word in positive_words if word in text)
        negative_count = sum(1 for word in negative_words if word in text)
        total = positive_count + negative_count
        if total == 0:
            return SentimentResult(sentiment="neutral", score=0.5, confidence=0.0, emotions={})

        sentiment_score = positive_count / total
        if sentiment_score > 0.6:
            sentiment = "positive"
        elif sentiment_score < 0.4:
            sentiment = "negative"
        else:
            sentiment = "neutral"
        return SentimentResult(
            sentiment=sentiment,
            score=sentiment_score,
            # Confidence grows with the distance from the neutral midpoint
            confidence=abs(sentiment_score - 0.5) * 2,
            emotions={},
        )

    def _analyze_emotions(self, text: str) -> Dict[str, float]:
        """Score fine-grained emotions by keyword."""
        emotion_keywords = {
            "happy": ["开心", "高兴", "快乐", "愉快", "兴奋"],
            "angry": ["生气", "愤怒", "恼火", "不爽", "气愤"],
            "sad": ["伤心", "难过", "悲伤", "失望", "沮丧"],
            "surprise": ["惊讶", "惊奇", "意外", "没想到"],
            "fear": ["害怕", "担心", "恐惧", "紧张"],
            "disgust": ["恶心", "讨厌", "嫌弃", "反感"],
        }
        emotions = {}
        for emotion, keywords in emotion_keywords.items():
            count = sum(1 for keyword in keywords if keyword in text)
            if count > 0:
                emotions[emotion] = count / len(keywords)
        return emotions

    async def extract_entities(self, text: str) -> Dict[str, Any]:
        """
        Extract named entities.

        Args:
            text: text to analyze

        Returns:
            dictionary of entities
        """
        entities = {
            "products": [], "numbers": [], "dates": [],
            "locations": [], "people": [], "organizations": [],
        }
        # Simple extraction (a real system should use an NER model)
        entities["numbers"] = re.findall(r"\d+", text)

        # Product mentions: keep the text around each keyword
        product_keywords = ["产品", "服务", "软件", "系统", "应用"]
        for keyword in product_keywords:
            if keyword in text:
                idx = text.find(keyword)
                start = max(0, idx - 10)
                end = min(len(text), idx + 10)
                entities["products"].append(text[start:end])

        date_patterns = [
            r"\d{4}年\d{1,2}月\d{1,2}日",
            r"\d{1,2}月\d{1,2}日",
            r"\d{4}-\d{2}-\d{2}",
            r"今天|明天|后天|昨天",
        ]
        for pattern in date_patterns:
            entities["dates"].extend(re.findall(pattern, text))
        return entities

    async def extract_keywords(self, text: str, language: str = "zh") -> List[str]:
        """
        Extract keywords.

        Args:
            text: text to analyze
            language: language code

        Returns:
            list of keywords
        """
        if language == "zh" and JIEBA_AVAILABLE:
            return jieba.analyse.extract_tags(
                text, topK=10, withWeight=False,
                allowPOS=("n", "nr", "ns", "nt", "nz", "v", "a"),
            )
        # Simple whitespace tokenization (suitable for English)
        words = text.lower().split()
        stop_words = {"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for"}
        keywords = [word for word in words if word not in stop_words]
        return keywords[:10]

    def _detect_language(self, text: str) -> str:
        """Detect the language from character ranges."""
        if re.search(r"[\u4e00-\u9fff]", text):
            return "zh"
        if re.search(r"[a-zA-Z]", text):
            return "en"
        # Default to Chinese
        return "zh"

    async def get_text_embedding(self, text: str) -> np.ndarray:
        """
        Get the vector embedding of a text.

        Args:
            text: input text

        Returns:
            text vector
        """
        if not TRANSFORMERS_AVAILABLE:
            raise RuntimeError("The transformers library is not installed")
        if not self.initialized:
            await self.initialize()
        return self.sentence_model.encode(text)

    async def batch_analyze(self, texts: List[str]) -> List[NLPAnalysis]:
        """
        Analyze a batch of texts.

        Args:
            texts: list of texts

        Returns:
            list of analysis results
        """
        tasks = [self.analyze_text(text) for text in texts]
        return await asyncio.gather(*tasks)


# Global NLP processor instance
nlp_processor = NLPProcessor()
```
4.2 Knowledge Retrieval System
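The retrieval engine in this section ranks knowledge-base entries by vector similarity. An inner-product index such as FAISS `IndexFlatIP` only yields cosine similarity when the vectors are L2-normalized first, which the pure-Python sketch below makes explicit. The function names `normalize` and `top_k` and the toy two-dimensional vectors are assumptions for illustration; real embeddings from all-MiniLM-L6-v2 have 384 dimensions.

```python
import math
from typing import Dict, List, Sequence, Tuple


def normalize(vector: Sequence[float]) -> List[float]:
    """Scale a vector to unit length (zero vectors are returned unchanged)."""
    norm = math.sqrt(sum(x * x for x in vector))
    return [x / norm for x in vector] if norm else list(vector)


def top_k(query: Sequence[float], docs: Dict[str, Sequence[float]],
          k: int = 2, threshold: float = 0.5) -> List[Tuple[str, float]]:
    """Return up to k (doc_id, cosine similarity) pairs at or above threshold."""
    q = normalize(query)
    scored = []
    for doc_id, vector in docs.items():
        # Inner product of unit vectors equals cosine similarity.
        similarity = sum(a * b for a, b in zip(q, normalize(vector)))
        if similarity >= threshold:
            scored.append((doc_id, similarity))
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]


if __name__ == "__main__":
    documents = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0]}
    for doc_id, score in top_k([2.0, 0.0], documents):
        print(doc_id, round(score, 4))
```

A brute-force scan like this is fine for a few thousand entries; the FAISS and ChromaDB backends used in the engine serve the same purpose at larger scale.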
Create src/ai_components/retrieval/engine.py:
```python
"""
Knowledge retrieval engine.

Retrieves relevant entries from the knowledge base by vector similarity.
"""
import asyncio
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

# Optional vector-store libraries
try:
    import faiss
    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False
    print("⚠️ faiss is not installed; vector search will be limited")

try:
    import chromadb
    CHROMA_AVAILABLE = True
except ImportError:
    CHROMA_AVAILABLE = False
    print("⚠️ chromadb is not installed; vector database features will be limited")

from src.core.config import settings
from src.ai_components.nlp.processor import nlp_processor


@dataclass
class SearchResult:
    """A single search hit."""
    id: str
    content: str
    score: float
    metadata: Dict[str, Any]
    source: str  # knowledge_base, faq, document, etc.


@dataclass
class RetrievalResponse:
    """Response of a retrieval request."""
    query: str
    results: List[SearchResult]
    suggested_questions: List[str]
    processing_time_ms: float


class KnowledgeRetrievalEngine:
    """Knowledge retrieval engine."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.initialized = False
        # Vector index
        self.faiss_index = None
        self.id_to_content = {}
        self.embedding_dim = 384  # dimension of all-MiniLM-L6-v2
        # ChromaDB client
        self.chroma_client = None
        self.chroma_collection = None
        # Cache
        self.cache = {}
        self.cache_size = 1000

    async def initialize(self):
        """Initialize the retrieval engine."""
        if self.initialized:
            return
        self.logger.info("Initializing the knowledge retrieval engine...")
        try:
            await nlp_processor.initialize()
            if CHROMA_AVAILABLE:
                await self._initialize_chromadb()
            if FAISS_AVAILABLE:
                await self._initialize_faiss()
            await self._load_knowledge_base()
            self.initialized = True
            self.logger.info("Knowledge retrieval engine initialized")
        except Exception as e:
            self.logger.error(f"Knowledge retrieval engine initialization failed: {e}")
            raise

    async def _initialize_chromadb(self):
        """Initialize ChromaDB."""
        self.logger.info("Initializing ChromaDB...")
        # PersistentClient replaces the legacy Settings(chroma_db_impl=...) setup
        self.chroma_client = chromadb.PersistentClient(path="./data/chroma_db")
        collection_name = "knowledge_base"
        # Cosine space makes "1 - distance" a valid similarity score
        self.chroma_collection = self.chroma_client.get_or_create_collection(
            name=collection_name,
            metadata={"description": "Customer service knowledge base",
                      "hnsw:space": "cosine"},
        )
        self.logger.info(f"Collection ready: {collection_name}")

    async def _initialize_faiss(self):
        """Initialize the FAISS index."""
        self.logger.info("Initializing the FAISS index...")
        # Flat index: exact search, suitable for small and medium data sets
        self.faiss_index = faiss.IndexFlatIP(self.embedding_dim)
        # Use GPU acceleration when available
        if settings.USE_GPU and hasattr(faiss, "StandardGpuResources"):
            self.logger.info("Enabling FAISS GPU acceleration...")
            res = faiss.StandardGpuResources()
            self.faiss_index = faiss.index_cpu_to_gpu(res, 0, self.faiss_index)

    async def _load_knowledge_base(self):
        """Load the knowledge base."""
        self.logger.info("Loading knowledge base data...")
        # Sample data; a real system should query the database here
        sample_knowledge = [
            {"id": "kb_001",
             "content": "我们的工作时间是周一至周五9:00-18:00,周末休息。",
             "category": "general",
             "metadata": {"source": "faq", "importance": 5}},
            {"id": "kb_002",
             "content": "产品退款需要在购买后30天内申请,退款将在7-14个工作日内处理。",
             "category": "policy",
             "metadata": {"source": "policy", "importance": 8}},
            {"id": "kb_003",
             "content": "如果忘记密码,可以点击登录页面的'忘记密码'链接,通过邮箱重置。",
             "category": "account",
             "metadata": {"source": "faq", "importance": 7}},
            {"id": "kb_004",
             "content": "技术支持电话:400-123-4567,工作时间随时可拨打。",
             "category": "contact",
             "metadata": {"source": "contact", "importance": 9}},
            {"id": "kb_005",
             "content": "产品支持7天无理由退货,但需要商品完好、包装完整。",
             "category": "policy",
             "metadata": {"source": "policy", "importance": 6}},
        ]
        await self.add_documents(sample_knowledge)
        self.logger.info(f"Loaded {len(sample_knowledge)} knowledge base entries")

    async def add_documents(self, documents: List[Dict[str, Any]]):
        """
        Add documents to the knowledge base.

        Args:
            documents: list of documents, each with id, content, metadata
        """
        if not documents:
            return

        contents = [doc["content"] for doc in documents]
        embeddings = await self._generate_embeddings(contents)

        if FAISS_AVAILABLE and self.faiss_index is not None:
            embeddings_np = np.array(embeddings, dtype="float32")
            # Unit-length vectors turn the inner product into cosine similarity
            faiss.normalize_L2(embeddings_np)
            base_index = self.faiss_index.ntotal
            self.faiss_index.add(embeddings_np)
            # Remember which FAISS row belongs to which document
            for i, doc in enumerate(documents):
                self.id_to_content[doc["id"]] = {
                    "content": doc["content"],
                    "metadata": doc.get("metadata", {}),
                    "index": base_index + i,
                }

        if CHROMA_AVAILABLE and self.chroma_collection is not None:
            try:
                self.chroma_collection.add(
                    embeddings=embeddings,
                    documents=contents,
                    metadatas=[doc.get("metadata", {}) for doc in documents],
                    ids=[doc["id"] for doc in documents],
                )
                self.logger.info(f"Added {len(documents)} documents to ChromaDB")
            except Exception as e:
                self.logger.error(f"Adding to ChromaDB failed: {e}")

    async def search(self, query: str, top_k: int = 5,
                     threshold: float = 0.5) -> RetrievalResponse:
        """
        Search for relevant knowledge.

        Args:
            query: query text
            top_k: number of results to return
            threshold: similarity threshold

        Returns:
            RetrievalResponse object
        """
        start_time = datetime.now()
        if not self.initialized:
            await self.initialize()

        cache_key = f"{query}_{top_k}_{threshold}"
        if cache_key in self.cache:
            self.logger.debug("Returning search results from cache")
            return self.cache[cache_key]

        query_analysis = await nlp_processor.analyze_text(query)
        query_embedding = await nlp_processor.get_text_embedding(query)

        results = []
        # Method 1: vector similarity search
        vector_results = await self._vector_search(query_embedding, query, top_k, threshold)
        results.extend(vector_results)
        # Method 2: keyword matching
        keyword_results = await self._keyword_search(query_analysis.keywords, top_k, threshold)
        results.extend(keyword_results)
        # Method 3: intent-based search
        intent_results = await self._intent_based_search(
            query_analysis.intent_result.intent, top_k
        )
        results.extend(intent_results)

        # Deduplicate, sort, and keep the top_k results
        unique_results = self._deduplicate_and_sort(results)
        final_results = unique_results[:top_k]

        suggested_questions = await self._generate_suggested_questions(
            query, query_analysis, final_results
        )

        processing_time = (datetime.now() - start_time).total_seconds() * 1000
        response = RetrievalResponse(
            query=query,
            results=final_results,
            suggested_questions=suggested_questions,
            processing_time_ms=processing_time,
        )
        self._add_to_cache(cache_key, response)
        return response

    async def _vector_search(self, query_embedding: np.ndarray, query: str,
                             top_k: int, threshold: float) -> List[SearchResult]:
        """Vector similarity search."""
        results = []

        # FAISS search
        if FAISS_AVAILABLE and self.faiss_index is not None:
            try:
                query_vector = np.array([query_embedding], dtype="float32")
                faiss.normalize_L2(query_vector)
                similarities, indices = self.faiss_index.search(query_vector, top_k * 2)
                index_to_id = {
                    info["index"]: doc_id for doc_id, info in self.id_to_content.items()
                }
                for idx, similarity in zip(indices[0], similarities[0]):
                    if idx < 0:  # FAISS pads missing results with -1
                        continue
                    # Map cosine similarity from [-1, 1] to [0, 1]
                    score = float((similarity + 1) / 2)
                    doc_id = index_to_id.get(int(idx))
                    if score >= threshold and doc_id is not None:
                        doc_info = self.id_to_content[doc_id]
                        results.append(SearchResult(
                            id=doc_id,
                            content=doc_info["content"],
                            score=score,
                            metadata=doc_info["metadata"],
                            source="knowledge_base",
                        ))
                self.logger.debug(f"FAISS search found {len(results)} results")
            except Exception as e:
                self.logger.error(f"FAISS search failed: {e}")

        # ChromaDB search
        if CHROMA_AVAILABLE and self.chroma_collection is not None:
            try:
                chroma_results = self.chroma_collection.query(
                    query_embeddings=[query_embedding.tolist()],
                    n_results=top_k,
                    include=["documents", "metadatas", "distances"],
                )
                if chroma_results and chroma_results["documents"]:
                    ids = chroma_results["ids"][0]
                    for i, doc in enumerate(chroma_results["documents"][0]):
                        metadata = chroma_results["metadatas"][0][i]
                        distance = chroma_results["distances"][0][i]
                        # A distance of 0 is an exact match, so test for None
                        score = float(1.0 - distance) if distance is not None else 0.5
                        if score >= threshold:
                            results.append(SearchResult(
                                id=ids[i],
                                content=doc,
                                score=score,
                                metadata=metadata or {},
                                source="knowledge_base",
                            ))
                self.logger.debug(f"ChromaDB search found {len(results)} results")
            except Exception as e:
                self.logger.error(f"ChromaDB search failed: {e}")

        return results

    async def _keyword_search(self, keywords: List[str], top_k: int,
                              threshold: float) -> List[SearchResult]:
        """Keyword search."""
        results = []
        if not keywords:
            return results

        # Simple implementation: look for the keywords in the content
        for doc_id, doc_info in self.id_to_content.items():
            content = doc_info["content"].lower()
            metadata = doc_info["metadata"]
            # Count how many keywords match
            match_count = 0
            for keyword in keywords:
                if keyword.lower() in content:
                    match_count += 1
            if match_count > 0:
                score = match_count / len(keywords)
                if
```
score >= threshold: results.append(SearchResult(id=doc_id, content=doc_info["content"], score=score, metadata=metadata, source="knowledge_base"))# 限制结果数量 results.sort(key=lambda x: x.score, reverse=True)return results[:top_k]asyncdef_intent_based_search(self, intent:str, top_k:int)-> List[SearchResult]:"""基于意图的搜索""" results =[]# 根据意图类别搜索相关文档 intent_categories ={"product_inquiry":["product","features","specification"],"technical_support":["technical","help","support","troubleshooting"],"payment_issue":["payment","refund","billing","invoice"],"complaint":["complaint","issue","problem","dissatisfied"],"account":["account","login","password","register"]} categories = intent_categories.get(intent,[])ifnot categories:return results for doc_id, doc_info in self.id_to_content.items(): metadata = doc_info["metadata"] doc_categories = metadata.get("categories",[])ifisinstance(doc_categories,str): doc_categories =[doc_categories]# 检查类别匹配for cat in categories:if cat in doc_categories: results.append(SearchResult(id=doc_id, content=doc_info["content"], score=0.7,# 中等置信度 metadata=metadata, source="knowledge_base"))break# 限制结果数量return results[:top_k]def_deduplicate_and_sort(self, results: List[SearchResult])-> List[SearchResult]:"""去重和排序""" seen_contents =set() unique_results =[]for result in results:# 简单去重:基于内容前50个字符 content_prefix = result.content[:50]if content_prefix notin seen_contents: seen_contents.add(content_prefix) unique_results.append(result)# 按分数排序 unique_results.sort(key=lambda x: x.score, reverse=True)return unique_results asyncdef_generate_suggested_questions(self, query:str, query_analysis: Any, search_results: List[SearchResult])-> List[str]:"""生成建议问题""" suggestions =[]# 基于意图的建议 intent = query_analysis.intent_result.intent intent_suggestions 
={"product_inquiry":["这个产品有什么功能?","价格是多少?","有优惠活动吗?","怎么购买?"],"technical_support":["常见问题有哪些?","怎么联系技术支持?","系统要求是什么?","有用户手册吗?"],"payment_issue":["退款流程是怎样的?","支持哪些支付方式?","发票怎么开?","付款遇到问题怎么办?"],"account":["怎么注册账号?","忘记密码怎么办?","怎么修改个人信息?","账号安全如何保障?"]}if intent in intent_suggestions: suggestions.extend(intent_suggestions[intent][:2])# 基于搜索结果的建议for result in search_results[:2]:# 从内容中提取可能的问题 content = result.content sentences = content.split('。')[:2]for sentence in sentences:if sentence andlen(sentence)>10: suggestions.append(f"关于「{sentence[:20]}...」")# 去重 unique_suggestions =list(dict.fromkeys(suggestions))return unique_suggestions[:5]asyncdef_generate_embeddings(self, texts: List[str])-> List[List[float]]:"""生成文本向量""" embeddings =[]for text in texts: embedding =await nlp_processor.get_text_embedding(text) embeddings.append(embedding.tolist())return embeddings def_add_to_cache(self, key:str, value: RetrievalResponse):"""添加到缓存"""iflen(self.cache)>= self.cache_size:# 移除最旧的缓存项 oldest_key =next(iter(self.cache))del self.cache[oldest_key] self.cache[key]= value asyncdefupdate_document(self, doc_id:str, content:str, metadata: Dict):"""更新文档"""# 从缓存中删除相关查询 keys_to_remove =[k for k in self.cache.keys()if doc_id in k]for key in keys_to_remove:del self.cache[key]# 更新向量数据库(需要重新添加) document ={"id": doc_id,"content": content,"metadata": metadata }# 注意:实际实现需要先从索引中删除旧文档,再添加新文档await self.add_documents([document])asyncdefdelete_document(self, doc_id:str):"""删除文档"""# 从缓存中删除相关查询 keys_to_remove =[k for k in self.cache.keys()if doc_id in k]for key in keys_to_remove:del self.cache[key]# 从向量数据库中删除if CHROMA_AVAILABLE and self.chroma_collection isnotNone:try: self.chroma_collection.delete(ids=[doc_id]) self.logger.info(f"已从ChromaDB删除文档: {doc_id}")except Exception as e: self.logger.error(f"从ChromaDB删除文档失败: {e}")# 创建全局检索引擎实例 retrieval_engine = KnowledgeRetrievalEngine()5. 对话管理系统
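The "AI first, human fallback" routing that this section implements can be sketched as a pure function. This is a simplified illustration, not the manager's actual method: `choose_handler` and its single `confidence_threshold` parameter are hypothetical, and only the keyword list and the 0.7 / 0.5 sentiment cut-offs are taken from the manager code in this section.

```python
from enum import Enum


class HandlerType(Enum):
    AI = "ai"
    AGENT = "agent"
    HYBRID = "hybrid"


# Phrases that explicitly ask for a human:
# "human", "real person", "customer service", "transfer to a human"
HUMAN_KEYWORDS = ("人工", "真人", "客服", "转人工")


def choose_handler(text: str, sentiment: str, sentiment_score: float,
                   intent_confidence: float,
                   confidence_threshold: float = 0.5) -> HandlerType:
    """Hypothetical helper: pick who should answer a customer message."""
    # An explicit request for a human always wins
    if any(keyword in text for keyword in HUMAN_KEYWORDS):
        return HandlerType.AGENT

    # Strongly negative customers go to an agent, moderately negative to hybrid
    if sentiment == "negative":
        if sentiment_score > 0.7:
            return HandlerType.AGENT
        if sentiment_score > 0.5:
            return HandlerType.HYBRID

    # Assumption: a single threshold stands in for per-intent thresholds
    if intent_confidence < confidence_threshold:
        return HandlerType.HYBRID

    return HandlerType.AI
```

Keeping the decision free of I/O makes the routing rules easy to unit test before they are wired into conversation state.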
5.1 Conversation State Management
Create src/services/conversation/manager.py:
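A minimal sketch of the state transitions may help before reading the full manager. It reuses the `ConversationState` values and the response types (`agent_assigned`, `agent_queue`) from the manager code; the `next_state` helper and the rule that a closed conversation stays closed are illustrative assumptions rather than part of the original implementation.

```python
from enum import Enum


class ConversationState(Enum):
    ACTIVE = "active"
    PENDING = "pending"            # waiting for an agent
    TRANSFERRING = "transferring"  # being handed to an agent
    CLOSED = "closed"
    ESCALATED = "escalated"


def next_state(current: ConversationState, response_type: str) -> ConversationState:
    """Hypothetical helper: map a handler response type to the next state."""
    if current is ConversationState.CLOSED:
        return current  # assumption: closed conversations are terminal
    if response_type == "agent_assigned":
        return ConversationState.TRANSFERRING
    if response_type == "agent_queue":
        return ConversationState.PENDING
    if "escalat" in response_type.lower():
        return ConversationState.ESCALATED
    return current  # AI and hybrid responses leave the state unchanged
```

For example, an `agent_queue` response moves an active conversation to `PENDING`, while an `ai_response` leaves it where it was.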
""" 对话管理器 负责对话状态管理、上下文维护和流程控制 """import asyncio import json import logging from typing import Dict, List, Optional, Any, Tuple from datetime import datetime, timedelta from enum import Enum import uuid from src.core.config import settings from src.models.base import Conversation, Message from src.ai_components.nlp.processor import nlp_processor, NLPAnalysis from src.ai_components.retrieval.engine import retrieval_engine, RetrievalResponse classConversationState(Enum):"""对话状态枚举""" ACTIVE ="active"# 活跃状态 PENDING ="pending"# 等待人工 TRANSFERRING ="transferring"# 转接中 CLOSED ="closed"# 已关闭 ESCALATED ="escalated"# 已升级classHandlerType(Enum):"""处理器类型枚举""" AI ="ai"# AI处理 AGENT ="agent"# 人工处理 HYBRID ="hybrid"# 混合处理classConversationContext:"""对话上下文"""def__init__(self, conversation_id:str, customer_id:str): self.conversation_id = conversation_id self.customer_id = customer_id self.messages: List[Dict]=[] self.state_history: List[Dict]=[] self.intent_history: List[Dict]=[] self.sentiment_history: List[Dict]=[] self.metadata: Dict[str, Any]={} self.created_at = datetime.now() self.updated_at = datetime.now()# 统计信息 self.message_count =0 self.ai_response_count =0 self.agent_response_count =0 self.transfer_count =0 self.escalation_count =0# 时间相关 self.last_message_time =None self.last_ai_response_time =None self.last_agent_response_time =None# 缓存 self.cached_responses: Dict[str, Any]={}defadd_message(self, message: Dict):"""添加消息到上下文""" self.messages.append(message) self.message_count +=1 self.last_message_time = datetime.now() self.updated_at = datetime.now()# 限制历史消息数量iflen(self.messages)> settings.MAX_CONVERSATION_HISTORY: self.messages = self.messages[-settings.MAX_CONVERSATION_HISTORY:]defadd_state_change(self, old_state:str, new_state:str, reason:str):"""添加状态变更记录""" self.state_history.append({"timestamp": datetime.now().isoformat(),"old_state": old_state,"new_state": new_state,"reason": reason })defadd_intent(self, intent:str, confidence:float, text:str):"""添注意图记录""" 
self.intent_history.append({"timestamp": datetime.now().isoformat(),"intent": intent,"confidence": confidence,"text": text })defadd_sentiment(self, sentiment:str, score:float):"""添加情感记录""" self.sentiment_history.append({"timestamp": datetime.now().isoformat(),"sentiment": sentiment,"score": score })defget_recent_messages(self, count:int=10)-> List[Dict]:"""获取最近的消息"""return self.messages[-count:]if self.messages else[]defget_conversation_summary(self)-> Dict[str, Any]:"""获取对话摘要"""return{"conversation_id": self.conversation_id,"customer_id": self.customer_id,"message_count": self.message_count,"ai_response_count": self.ai_response_count,"agent_response_count": self.agent_response_count,"transfer_count": self.transfer_count,"escalation_count": self.escalation_count,"current_state": self.metadata.get("current_state","active"),"created_at": self.created_at.isoformat(),"updated_at": self.updated_at.isoformat(),"duration_seconds":(datetime.now()- self.created_at).total_seconds()}defis_timed_out(self)->bool:"""检查对话是否超时"""ifnot self.last_message_time:returnFalse timeout_minutes = settings.SESSION_TIMEOUT_MINUTES timeout_delta = timedelta(minutes=timeout_minutes)return datetime.now()- self.last_message_time > timeout_delta defto_dict(self)-> Dict[str, Any]:"""转换为字典"""return{"conversation_id": self.conversation_id,"customer_id": self.customer_id,"messages": self.messages,"state_history": self.state_history,"intent_history": self.intent_history,"sentiment_history": self.sentiment_history,"metadata": self.metadata,"statistics":{"message_count": self.message_count,"ai_response_count": self.ai_response_count,"agent_response_count": self.agent_response_count,"transfer_count": self.transfer_count,"escalation_count": self.escalation_count },"timestamps":{"created_at": self.created_at.isoformat(),"updated_at": self.updated_at.isoformat(),"last_message_time": self.last_message_time.isoformat()if self.last_message_time elseNone}}classConversationManager:"""对话管理器"""def__init__(self): 
self.logger = logging.getLogger(__name__) self.conversations: Dict[str, ConversationContext]={} self.customer_conversations: Dict[str, List[str]]={}# 配置 self.max_conversations =10000 self.cleanup_interval =300# 5分钟清理一次# 启动清理任务 asyncio.create_task(self._cleanup_task())asyncdefcreate_conversation(self, customer_id:str, channel:str, metadata: Optional[Dict]=None)-> ConversationContext:""" 创建新对话 Args: customer_id: 客户ID channel: 渠道 metadata: 元数据 Returns: 对话上下文 """ conversation_id =str(uuid.uuid4())# 创建上下文 context = ConversationContext(conversation_id, customer_id) context.metadata.update({"channel": channel,"current_state": ConversationState.ACTIVE.value,"created_channel": channel,"metadata": metadata or{}})# 保存到内存 self.conversations[conversation_id]= context # 更新客户对话映射if customer_id notin self.customer_conversations: self.customer_conversations[customer_id]=[] self.customer_conversations[customer_id].append(conversation_id)# 限制每个客户的对话数量iflen(self.customer_conversations[customer_id])>10:# 移除最旧的对话 old_conv_id = self.customer_conversations[customer_id].pop(0)if old_conv_id in self.conversations:del self.conversations[old_conv_id] self.logger.info(f"创建新对话: {conversation_id}, 客户: {customer_id}, 渠道: {channel}")return context asyncdefprocess_message(self, conversation_id:str, message: Dict)-> Dict[str, Any]:""" 处理消息 Args: conversation_id: 对话ID message: 消息数据 Returns: 处理结果 """ start_time = datetime.now()# 获取对话上下文 context = self.conversations.get(conversation_id)ifnot context:raise ValueError(f"对话不存在: {conversation_id}")# 更新最后消息时间 context.last_message_time = datetime.now()# 添加消息到上下文 context.add_message(message)# 分析消息 text = message.get("content","") nlp_analysis =await nlp_processor.analyze_text(text)# 更新上下文记录 context.add_intent( nlp_analysis.intent_result.intent, nlp_analysis.intent_result.confidence, text ) context.add_sentiment( nlp_analysis.sentiment_result.sentiment, nlp_analysis.sentiment_result.score )# 确定处理器类型 handler_type =await self._determine_handler_type( context, 
nlp_analysis, message )# 处理消息if handler_type == HandlerType.AI: response =await self._handle_with_ai( context, nlp_analysis, message ) context.ai_response_count +=1 context.last_ai_response_time = datetime.now()elif handler_type == HandlerType.AGENT: response =await self._handle_with_agent( context, nlp_analysis, message ) context.agent_response_count +=1 context.last_agent_response_time = datetime.now()elif handler_type == HandlerType.HYBRID: response =await self._handle_hybrid( context, nlp_analysis, message )else: response ={"type":"error","content":"无法确定处理方式","handler":"unknown"}# 更新对话状态await self._update_conversation_state(context, response)# 计算处理时间 processing_time =(datetime.now()- start_time).total_seconds()*1000# 构建响应 result ={"conversation_id": conversation_id,"message_id": message.get("id"),"handler_type": handler_type.value,"response": response,"analysis":{"intent": nlp_analysis.intent_result.intent,"confidence": nlp_analysis.intent_result.confidence,"sentiment": nlp_analysis.sentiment_result.sentiment,"sentiment_score": nlp_analysis.sentiment_result.score,"entities": nlp_analysis.entities,"keywords": nlp_analysis.keywords },"processing_time_ms": processing_time,"context_summary": context.get_conversation_summary()}return result asyncdef_determine_handler_type(self, context: ConversationContext, nlp_analysis: NLPAnalysis, message: Dict)-> HandlerType:""" 确定处理器类型 Args: context: 对话上下文 nlp_analysis: NLP分析结果 message: 消息 Returns: 处理器类型 """ current_state = context.metadata.get("current_state")# 如果已经在转接或人工状态,继续人工处理if current_state in[ConversationState.PENDING.value, ConversationState.TRANSFERRING.value, ConversationState.ESCALATED.value]:return HandlerType.AGENT # 检查是否明确要求人工 text = message.get("content","").lower()ifany(keyword in text for keyword in["人工","真人","客服","转人工"]):return HandlerType.AGENT # 检查情感:负面情感转人工if nlp_analysis.sentiment_result.sentiment =="negative": sentiment_score = nlp_analysis.sentiment_result.score if sentiment_score >0.7:# 强烈负面return 
HandlerType.AGENT elif sentiment_score >0.5:# 中度负面return HandlerType.HYBRID # 检查意图置信度 intent_confidence = nlp_analysis.intent_result.confidence intent_name = nlp_analysis.intent_result.intent # 获取意图配置(应该从数据库获取) intent_configs ={"complaint":{"threshold":0.7,"handler": HandlerType.AGENT},"technical_support":{"threshold":0.6,"handler": HandlerType.HYBRID},"payment_issue":{"threshold":0.65,"handler": HandlerType.HYBRID},"product_inquiry":{"threshold":0.5,"handler": HandlerType.AI},"greeting":{"threshold":0.4,"handler": HandlerType.AI},"other":{"threshold":0.3,"handler": HandlerType.AI}} config = intent_configs.get(intent_name, intent_configs["other"])# 如果置信度低于阈值,考虑转人工if intent_confidence < config["threshold"]:# 检查是否有历史对话可以提供上下文 recent_messages = context.get_recent_messages(3)iflen(recent_messages)>=2:# 如果最近有成功交互,继续AI处理return HandlerType.AI else:# 新对话且置信度低,转人工或混合return HandlerType.HYBRID # 检查对话长度:长对话可能需要人工介入if context.message_count >15:return HandlerType.HYBRID # 检查转移历史:频繁转移需要人工if context.transfer_count >2:return HandlerType.AGENT # 默认返回配置的处理器类型return config["handler"]asyncdef_handle_with_ai(self, context: ConversationContext, nlp_analysis: NLPAnalysis, message: Dict)-> Dict[str, Any]:"""使用AI处理消息"""try: text = message.get("content","")# 检索相关知识 retrieval_result =await retrieval_engine.search(text)# 构建AI提示 prompt = self._build_ai_prompt(context, text, nlp_analysis, retrieval_result)# 调用AI生成响应(这里简化,实际应该调用AI模型) ai_response =await self._generate_ai_response(prompt)# 构建响应 response ={"type":"ai_response","content": ai_response,"confidence": nlp_analysis.intent_result.confidence,"sources":[r.content[:100]for r in retrieval_result.results[:3]],"suggested_questions": retrieval_result.suggested_questions,"handler":"ai"}return response except Exception as e: self.logger.error(f"AI处理失败: {e}")return{"type":"error","content":"抱歉,AI暂时无法处理您的请求,正在为您转接人工客服。","handler":"ai_fallback"}asyncdef_handle_with_agent(self, context: ConversationContext, nlp_analysis: NLPAnalysis, message: Dict)-> 
Dict[str, Any]:"""使用人工处理消息"""try:# 获取可用的坐席 available_agents =await self._find_available_agents( context, nlp_analysis )if available_agents:# 分配坐席 assigned_agent =await self._assign_agent( context, available_agents[0]) response ={"type":"agent_assigned","content":f"正在为您转接人工客服,坐席 {assigned_agent['name']} 将为您服务。","agent_id": assigned_agent["id"],"agent_name": assigned_agent["name"],"estimated_wait_time":30,# 预估等待时间(秒)"handler":"agent_routing"}else:# 没有可用坐席 response ={"type":"agent_queue","content":"当前所有客服坐席繁忙,您已被加入等待队列。请稍候,我们会尽快为您服务。","queue_position":await self._get_queue_position(context),"estimated_wait_time":120,# 预估等待时间(秒)"handler":"agent_queue"}return response except Exception as e: self.logger.error(f"人工处理失败: {e}")return{"type":"error","content":"抱歉,人工客服暂时不可用,请稍后再试或留言。","handler":"agent_fallback"}asyncdef_handle_hybrid(self, context: ConversationContext, nlp_analysis: NLPAnalysis, message: Dict)-> Dict[str, Any]:"""混合处理:AI提供建议,人工确认"""try:# 先获取AI响应 text = message.get("content","") retrieval_result =await retrieval_engine.search(text) prompt = self._build_ai_prompt(context, text, nlp_analysis, retrieval_result) ai_response =await self._generate_ai_response(prompt)# 同时寻找人工坐席 available_agents =await self._find_available_agents( context, nlp_analysis ) response ={"type":"hybrid_response","content": ai_response,"ai_suggestion": ai_response,"requires_agent_confirmation":True,"agent_available":len(available_agents)>0,"confidence": nlp_analysis.intent_result.confidence,"sources":[r.content[:100]for r in retrieval_result.results[:2]],"handler":"hybrid"}# 如果坐席可用,准备转接if available_agents: response["agent_note"]="AI已提供建议,坐席可随时接管对话"return response except Exception as e: self.logger.error(f"混合处理失败: {e}")returnawait self._handle_with_ai(context, nlp_analysis, message)def_build_ai_prompt(self, context: ConversationContext, text:str, nlp_analysis: NLPAnalysis, retrieval_result: RetrievalResponse)->str:"""构建AI提示"""# 获取对话历史 recent_messages = context.get_recent_messages(5)# 构建提示模板 
prompt =f"""你是一个专业的客服AI助手,正在与客户对话。 客户信息: - 对话ID: {context.conversation_id} - 消息总数: {context.message_count} - 当前情感: {nlp_analysis.sentiment_result.sentiment} (分数: {nlp_analysis.sentiment_result.score}) - 检测意图: {nlp_analysis.intent_result.intent} (置信度: {nlp_analysis.intent_result.confidence}) 对话历史(最近5条): """for msg in recent_messages[-5:]: sender ="客户"if msg.get("sender_type")=="customer"else"客服" prompt +=f"{sender}: {msg.get('content','')}\n" prompt +=f"\n当前客户消息:{text}\n\n"# 添加相关知识if retrieval_result.results: prompt +="相关知识库信息:\n"for i, result inenumerate(retrieval_result.results[:3],1): prompt +=f"{i}. {result.content[:200]}...\n" prompt +=""" 请根据以上信息,用友好、专业、有帮助的语气回复客户。注意: 1. 如果问题需要人工处理,请说明原因并建议转人工 2. 如果信息不足,可以询问更多细节 3. 保持回复简洁明了 4. 如果是中文对话,请用中文回复 你的回复:"""return prompt asyncdef_generate_ai_response(self, prompt:str)->str:"""生成AI响应(简化版本)"""# 这里应该调用实际的AI模型# 示例:使用简单的规则生成响应# 实际应该使用GPT或其他模型# 为了示例,返回一个简单的响应if"你好"in prompt or"嗨"in prompt:return"您好!我是AI客服助手,很高兴为您服务。请问有什么可以帮助您的吗?"elif"谢谢"in prompt or"感谢"in prompt:return"不客气!很高兴能帮助您。如果还有其他问题,请随时告诉我。"elif"再见"in prompt or"拜拜"in prompt:return"感谢您的咨询,再见!祝您有美好的一天!"else:return"我理解您的问题,正在为您查找相关信息。请稍等..."asyncdef_find_available_agents(self, context: ConversationContext, nlp_analysis: NLPAnalysis)-> List[Dict]:"""查找可用坐席"""# 这里应该从数据库查询可用坐席# 示例:返回模拟数据 skills_needed =[]if nlp_analysis.intent_result.intent =="technical_support": skills_needed =["technical","support"]elif nlp_analysis.intent_result.intent =="payment_issue": skills_needed =["billing","finance"]# 模拟坐席数据 mock_agents =[{"id":"agent_001","name":"张客服","skills":["technical","support","product"],"status":"online","current_chats":2,"max_chats":5},{"id":"agent_002","name":"李客服","skills":["billing","finance","account"],"status":"online","current_chats":1,"max_chats":5},{"id":"agent_003","name":"王客服","skills":["general","sales","product"],"status":"away","current_chats":0,"max_chats":5}]# 过滤可用坐席 available_agents =[]for agent in mock_agents:if(agent["status"]=="online"and 
agent["current_chats"]< agent["max_chats"]):# 检查技能匹配if skills_needed:ifany(skill in agent["skills"]for skill in skills_needed): available_agents.append(agent)else: available_agents.append(agent)return available_agents asyncdef_assign_agent(self, context: ConversationContext, agent: Dict)-> Dict:"""分配坐席"""# 更新上下文 context.metadata["assigned_agent_id"]= agent["id"] context.metadata["assigned_agent_name"]= agent["name"] context.transfer_count +=1# 这里应该更新数据库中的对话记录# 并通知坐席端return agent asyncdef_get_queue_position(self, context: ConversationContext)->int:"""获取队列位置"""# 这里应该查询实际的队列系统return1asyncdef_update_conversation_state(self, context: ConversationContext, response: Dict):"""更新对话状态""" old_state = context.metadata.get("current_state", ConversationState.ACTIVE.value) new_state = old_state response_type = response.get("type","")if response_type =="agent_assigned": new_state = ConversationState.TRANSFERRING.value elif response_type =="agent_queue": new_state = ConversationState.PENDING.value elif"escalat"in response_type.lower(): new_state = ConversationState.ESCALATED.value if new_state != old_state: context.metadata["current_state"]= new_state context.add_state_change(old_state, new_state, response_type) self.logger.info(f"对话 {context.conversation_id} 状态变更: {old_state} -> {new_state}")asyncdefget_conversation(self, conversation_id:str)-> Optional[ConversationContext]:"""获取对话上下文"""return self.conversations.get(conversation_id)asyncdefclose_conversation(self, conversation_id:str, reason:str="client_closed"):"""关闭对话""" context = self.conversations.get(conversation_id)if context: old_state = context.metadata.get("current_state", ConversationState.ACTIVE.value) context.metadata["current_state"]= ConversationState.CLOSED.value context.add_state_change(old_state, ConversationState.CLOSED.value, reason) context.metadata["closed_reason"]= reason context.metadata["closed_at"]= datetime.now().isoformat() self.logger.info(f"关闭对话: {conversation_id}, 原因: 
{reason}")asyncdef_cleanup_task(self):"""清理任务:定期清理超时对话"""whileTrue:try:await asyncio.sleep(self.cleanup_interval) current_time = datetime.now() expired_conversations =[]# 查找超时对话for conv_id, context in self.conversations.items():if context.is_timed_out(): expired_conversations.append(conv_id)# 清理超时对话for conv_id in expired_conversations:await self.close_conversation(conv_id,"timeout")del self.conversations[conv_id]if expired_conversations: self.logger.info(f"清理了 {len(expired_conversations)} 个超时对话")# 限制总对话数量iflen(self.conversations)> self.max_conversations:# 移除最旧的对话 excess =len(self.conversations)- self.max_conversations old_ids =list(self.conversations.keys())[:excess]for conv_id in old_ids:await self.close_conversation(conv_id,"system_cleanup")del self.conversations[conv_id] self.logger.info(f"清理了 {excess} 个旧对话以控制内存")except Exception as e: self.logger.error(f"清理任务出错: {e}")# 创建全局对话管理器实例 conversation_manager = ConversationManager()6. API服务实现
6.1 Main API Service
Create src/api/main.py:
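The main service in this subsection leaves token validation as a placeholder that compares the bearer token with the literal `demo_token`. As a rough idea of what a real check involves, here is a sketch of HS256-signed token verification using only the standard library. The names `SECRET_KEY`, `sign_token` and `verify_signed_token` are hypothetical, and a production service would more likely rely on a maintained JWT library and proper key management.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

SECRET_KEY = b"change-me"  # assumption: loaded from configuration in practice


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that the compact encoding strips
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_token(payload: dict, key: bytes = SECRET_KEY) -> str:
    """Build a compact header.payload.signature token signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(key, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_signed_token(token: str, key: bytes = SECRET_KEY) -> Optional[dict]:
    """Return the payload if the signature is valid and unexpired, else None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = hmac.new(
            key, f"{header_b64}.{payload_b64}".encode("ascii"), hashlib.sha256
        ).digest()
        # Constant-time comparison avoids leaking how many bytes matched
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        header = json.loads(_b64url_decode(header_b64))
        payload = json.loads(_b64url_decode(payload_b64))
    except (ValueError, UnicodeError):
        return None  # malformed token

    if not isinstance(header, dict) or header.get("alg") != "HS256":
        return None
    if not isinstance(payload, dict):
        return None

    exp = payload.get("exp")
    if exp is not None and (not isinstance(exp, (int, float)) or time.time() >= exp):
        return None  # expired or invalid expiry claim
    return payload
```

A dependency such as `verify_token` could call `verify_signed_token(credentials.credentials)` and raise a 401 error when it returns `None`.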
""" 主API服务 提供RESTful API接口 """import logging from typing import List, Optional from fastapi import FastAPI, HTTPException, Depends, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware from fastapi.responses import JSONResponse from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from src.core.config import settings from src.api.endpoints import conversations, customers, agents, knowledge, analytics # 配置日志 logging.basicConfig( level=getattr(logging, settings.LOG_LEVEL),format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__)# 创建FastAPI应用 app = FastAPI( title=settings.PROJECT_NAME, version=settings.APP_VERSION, openapi_url=f"{settings.API_V1_STR}/openapi.json", docs_url="/docs"if settings.DEBUG elseNone, redoc_url="/redoc"if settings.DEBUG elseNone)# 安全验证 security = HTTPBearer()asyncdefverify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):"""验证访问令牌"""# 这里应该实现JWT验证逻辑# 简化示例:检查Bearer令牌 token = credentials.credentials ifnot token or token !="demo_token":# 实际应该验证JWTraise HTTPException( status_code=401, detail="无效的访问令牌", headers={"WWW-Authenticate":"Bearer"})return{"user_id":"demo_user"}# 中间件 app.add_middleware( CORSMiddleware, allow_origins=settings.BACKEND_CORS_ORIGINS, allow_credentials=True, allow_methods=["*"], allow_headers=["*"],) app.add_middleware( TrustedHostMiddleware, allowed_hosts=["*"]if settings.DEBUG else settings.BACKEND_CORS_ORIGINS )# 异常处理器@app.exception_handler(HTTPException)asyncdefhttp_exception_handler(request, exc):"""HTTP异常处理器"""return JSONResponse( status_code=exc.status_code, content={"error":{"code": exc.status_code,"message": exc.detail,"details":getattr(exc,"details",None)}})@app.exception_handler(Exception)asyncdefgeneral_exception_handler(request, exc):"""通用异常处理器""" logger.error(f"未处理异常: {exc}", exc_info=True)return JSONResponse( status_code=500, 
content={"error":{"code":500,"message":"内部服务器错误","details":str(exc)if settings.DEBUG elseNone}})# 路由 app.include_router( conversations.router, prefix=f"{settings.API_V1_STR}/conversations", tags=["conversations"], dependencies=[Depends(verify_token)]) app.include_router( customers.router, prefix=f"{settings.API_V1_STR}/customers", tags=["customers"], dependencies=[Depends(verify_token)]) app.include_router( agents.router, prefix=f"{settings.API_V1_STR}/agents", tags=["agents"], dependencies=[Depends(verify_token)]) app.include_router( knowledge.router, prefix=f"{settings.API_V1_STR}/knowledge", tags=["knowledge"], dependencies=[Depends(verify_token)]) app.include_router( analytics.router, prefix=f"{settings.API_V1_STR}/analytics", tags=["analytics"], dependencies=[Depends(verify_token)])# WebSocket端点@app.websocket("/ws/{conversation_id}")asyncdefwebsocket_endpoint( websocket: WebSocket, conversation_id:str, token: Optional[str]=None):"""WebSocket端点用于实时通信"""await websocket.accept()try:# 验证连接ifnot token or token !="demo_token":await websocket.send_json({"type":"error","message":"未授权连接"})await websocket.close()return# 这里应该实现WebSocket消息处理逻辑# 简化示例:回显消息whileTrue: data =await websocket.receive_json()# 处理消息 response ={"type":"echo","conversation_id": conversation_id,"message": data.get("message"),"timestamp": data.get("timestamp")}await websocket.send_json(response)except WebSocketDisconnect: logger.info(f"WebSocket连接断开: {conversation_id}")except Exception as e: logger.error(f"WebSocket错误: {e}")await websocket.close()# 健康检查端点@app.get("/health")asyncdefhealth_check():"""健康检查"""return{"status":"healthy","service": settings.PROJECT_NAME,"version": settings.APP_VERSION,"environment": settings.ENVIRONMENT }@app.get("/")asyncdefroot():"""根端点"""return{"message":f"欢迎使用{settings.PROJECT_NAME} API","version": settings.APP_VERSION,"docs":"/docs"if settings.DEBUG elseNone,"environment": settings.ENVIRONMENT }# 启动事件@app.on_event("startup")asyncdefstartup_event():"""启动事件""" 
logger.info(f"启动 {settings.PROJECT_NAME} v{settings.APP_VERSION}") logger.info(f"环境: {settings.ENVIRONMENT}") logger.info(f"调试模式: {settings.DEBUG}")# 初始化AI组件try:from src.ai_components.nlp.processor import nlp_processor from src.ai_components.retrieval.engine import retrieval_engine await nlp_processor.initialize()await retrieval_engine.initialize() logger.info("AI组件初始化完成")except Exception as e: logger.error(f"AI组件初始化失败: {e}")@app.on_event("shutdown")asyncdefshutdown_event():"""关闭事件""" logger.info("正在关闭服务...")if __name__ =="__main__":import uvicorn uvicorn.run("src.api.main:app", host="0.0.0.0", port=8000,reload=settings.DEBUG, log_level=settings.LOG_LEVEL.lower())6.2 对话API端点
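The message-history endpoint in this subsection pages through an in-memory message list with `limit` and `offset`. One way to make that paging explicit is a small slicing helper. The `page_messages` function below is hypothetical, and it assumes that `offset` counts back from the newest message while each page is returned in chronological order.

```python
from typing import Any, Dict, List


def page_messages(messages: List[Dict[str, Any]], limit: int = 50,
                  offset: int = 0) -> List[Dict[str, Any]]:
    """Return up to `limit` messages, skipping the newest `offset` messages."""
    if limit <= 0 or offset < 0:
        return []
    end = max(len(messages) - offset, 0)  # exclusive end of the page
    start = max(end - limit, 0)           # clamp at the oldest message
    return messages[start:end]
```

With ten stored messages, `page_messages(history, limit=3, offset=3)` returns the fifth to seventh messages, and an offset beyond the stored history yields an empty page instead of an error.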
Create src/api/endpoints/conversations.py:
"""
Conversation API endpoints.

Handles conversation-related HTTP and WebSocket requests.
"""
import logging
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
from pydantic import BaseModel, Field

from src.services.conversation.manager import conversation_manager

logger = logging.getLogger(__name__)
router = APIRouter()


# Data models
class MessageRequest(BaseModel):
    """Incoming message request."""
    content: str = Field(..., min_length=1, max_length=2000)
    content_type: str = Field(default="text")
    metadata: Optional[Dict[str, Any]] = Field(default=None)
    customer_id: Optional[str] = Field(default=None)
    channel: str = Field(default="web")


class ConversationCreateRequest(BaseModel):
    """Request to create a conversation."""
    customer_id: str
    channel: str = Field(default="web")
    metadata: Optional[Dict[str, Any]] = Field(default=None)
    initial_message: Optional[str] = Field(default=None)


class ConversationResponse(BaseModel):
    """Conversation details."""
    conversation_id: str
    customer_id: str
    channel: str
    status: str
    created_at: str
    updated_at: str
    message_count: int
    metadata: Dict[str, Any]
    summary: Optional[Dict[str, Any]] = None


class MessageResponse(BaseModel):
    """A single message in a conversation."""
    message_id: str
    conversation_id: str
    sender_type: str
    sender_id: Optional[str]
    content: str
    content_type: str
    created_at: str
    metadata: Dict[str, Any]
    intent: Optional[str] = None
    confidence: Optional[float] = None


class ProcessMessageResponse(BaseModel):
    """Result of processing a message."""
    conversation_id: str
    message_id: str
    handler_type: str
    response: Dict[str, Any]
    analysis: Dict[str, Any]
    processing_time_ms: float
    context_summary: Dict[str, Any]


def _build_conversation_response(context, fallback_channel: str) -> ConversationResponse:
    """Convert a conversation context into the API response model."""
    return ConversationResponse(
        conversation_id=context.conversation_id,
        customer_id=context.customer_id,
        channel=context.metadata.get("channel", fallback_channel),
        status=context.metadata.get("current_state", "active"),
        created_at=context.created_at.isoformat(),
        updated_at=context.updated_at.isoformat(),
        message_count=context.message_count,
        metadata=context.metadata,
        summary=context.get_conversation_summary(),
    )


# API endpoints
@router.post("/", response_model=ConversationResponse)
async def create_conversation(request: ConversationCreateRequest):
    """
    Create a new conversation.

    Args:
        request: conversation creation request

    Returns:
        The created conversation.
    """
    try:
        logger.info(f"Create conversation: customer={request.customer_id}, channel={request.channel}")

        # Create the conversation context
        context = await conversation_manager.create_conversation(
            customer_id=request.customer_id,
            channel=request.channel,
            metadata=request.metadata,
        )

        # Process the initial message, if one was supplied
        if request.initial_message:
            message = {
                "id": str(uuid.uuid4()),
                "content": request.initial_message,
                "content_type": "text",
                "sender_type": "customer",
                "sender_id": request.customer_id,
                "metadata": request.metadata or {},
                "timestamp": datetime.now().isoformat(),
            }
            await conversation_manager.process_message(context.conversation_id, message)

        return _build_conversation_response(context, request.channel)
    except Exception as e:
        logger.error(f"Failed to create conversation: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to create conversation: {e}")


@router.post("/{conversation_id}/messages", response_model=ProcessMessageResponse)
async def process_message(conversation_id: str, request: MessageRequest):
    """
    Process a customer message.

    Args:
        conversation_id: conversation ID
        request: message request

    Returns:
        The processing result.
    """
    try:
        logger.info(f"Process message: conversation={conversation_id}, length={len(request.content)}")

        # Build the message object
        message = {
            "id": str(uuid.uuid4()),
            "content": request.content,
            "content_type": request.content_type,
            "sender_type": "customer",
            "sender_id": request.customer_id,
            "metadata": request.metadata or {},
            "timestamp": datetime.now().isoformat(),
        }

        result = await conversation_manager.process_message(conversation_id, message)

        return ProcessMessageResponse(
            conversation_id=result["conversation_id"],
            message_id=result["message_id"],
            handler_type=result["handler_type"],
            response=result["response"],
            analysis=result["analysis"],
            processing_time_ms=result["processing_time_ms"],
            context_summary=result["context_summary"],
        )
    except ValueError as e:
        # The manager signals an unknown conversation with ValueError
        logger.error(f"Conversation not found: {conversation_id}")
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to process message: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to process message: {e}")


@router.get("/{conversation_id}", response_model=ConversationResponse)
async def get_conversation(conversation_id: str):
    """
    Get conversation details.

    Args:
        conversation_id: conversation ID

    Returns:
        Conversation details.
    """
    try:
        context = await conversation_manager.get_conversation(conversation_id)
        if not context:
            raise HTTPException(status_code=404, detail="Conversation not found")
        return _build_conversation_response(context, "unknown")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get conversation: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get conversation: {e}")


@router.get("/{conversation_id}/messages", response_model=List[MessageResponse])
async def get_conversation_messages(conversation_id: str, limit: int = 50, offset: int = 0):
    """
    Get the message history of a conversation.

    Args:
        conversation_id: conversation ID
        limit: maximum number of messages to return
        offset: number of messages to skip

    Returns:
        List of messages.
    """
    try:
        context = await conversation_manager.get_conversation(conversation_id)
        if not context:
            raise HTTPException(status_code=404, detail="Conversation not found")

        # Fetch the messages, then apply the offset/limit window
        messages = context.get_recent_messages(limit + offset)
        messages = messages[offset:offset + limit]

        return [
            MessageResponse(
                message_id=msg.get("id", str(uuid.uuid4())),
                conversation_id=conversation_id,
                sender_type=msg.get("sender_type", "customer"),
                sender_id=msg.get("sender_id"),
                content=msg.get("content", ""),
                content_type=msg.get("content_type", "text"),
                created_at=msg.get("timestamp", datetime.now().isoformat()),
                metadata=msg.get("metadata", {}),
                intent=msg.get("intent"),
                confidence=msg.get("confidence"),
            )
            for msg in messages
        ]
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to get messages: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get messages: {e}")


@router.post("/{conversation_id}/close")
async def close_conversation(conversation_id: str, reason: str = "client_closed"):
    """
    Close a conversation.

    Args:
        conversation_id: conversation ID
        reason: reason for closing

    Returns:
        The close result.
    """
    try:
        await conversation_manager.close_conversation(conversation_id, reason)
        return {
            "success": True,
            "conversation_id": conversation_id,
            "reason": reason,
            "closed_at": datetime.now().isoformat(),
        }
    except Exception as e:
        logger.error(f"Failed to close conversation: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to close conversation: {e}")


@router.get("/customer/{customer_id}/conversations", response_model=List[ConversationResponse])
async def get_customer_conversations(customer_id: str, limit: int = 20):
    """
    Get all conversations of a customer.

    Args:
        customer_id: customer ID
        limit: maximum number of conversations to return

    Returns:
        List of conversations.
    """
    try:
        # A real implementation would query the database here.
        # Simplified example: return an empty list.
        return []
    except Exception as e:
        logger.error(f"Failed to get customer conversations: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get customer conversations: {e}")


@router.post("/{conversation_id}/transfer")
async def transfer_to_agent(conversation_id: str, agent_id: Optional[str] = None):
    """
    Transfer a conversation to a human agent.

    Args:
        conversation_id: conversation ID
        agent_id: target agent ID (optional)

    Returns:
        The transfer result.
    """
    try:
        context = await conversation_manager.get_conversation(conversation_id)
        if not context:
            raise HTTPException(status_code=404, detail="Conversation not found")

        # Build a system message that carries the transfer request
        message = {
            "id": str(uuid.uuid4()),
            "content": "Customer requested transfer to a human agent",
            "content_type": "system",
            "sender_type": "system",
            "metadata": {"transfer_request": True, "requested_agent_id": agent_id},
            "timestamp": datetime.now().isoformat(),
        }

        result = await conversation_manager.process_message(conversation_id, message)

        return {
            "success": True,
            "conversation_id": conversation_id,
            "transfer_initiated": True,
            "result": result,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Transfer failed: {e}")
        raise HTTPException(status_code=500, detail=f"Transfer failed: {e}")


# WebSocket endpoint (real-time messaging)
@router.websocket("/{conversation_id}/ws")
async def conversation_websocket(
    websocket: WebSocket,
    conversation_id: str,
    token: Optional[str] = None,
):
    """
    Conversation WebSocket endpoint.

    Args:
        websocket: WebSocket connection
        conversation_id: conversation ID
        token: authentication token
    """
    await websocket.accept()

    try:
        # Validate the conversation
        context = await conversation_manager.get_conversation(conversation_id)
        if not context:
            await websocket.send_json({"type": "error", "message": "Conversation not found"})
            await websocket.close()
            return

        # Validate the token (simplified; replace with real authentication)
        if not token or token != "demo_token":
            await websocket.send_json({"type": "error", "message": "Unauthorized connection"})
            await websocket.close()
            return

        # Send a welcome message
        await websocket.send_json({
            "type": "welcome",
            "conversation_id": conversation_id,
            "customer_id": context.customer_id,
            "message": "WebSocket connection established",
            "timestamp": datetime.now().isoformat(),
        })

        # Message loop
        while True:
            data = await websocket.receive_json()
            message_type = data.get("type")

            if message_type == "message":
                # Handle a client message
                sender_type = data.get("sender_type", "customer")
                message = {
                    "id": str(uuid.uuid4()),
                    "content": data.get("content", ""),
                    "content_type": "text",
                    "sender_type": sender_type,
                    "sender_id": data.get("sender_id"),
                    "metadata": data.get("metadata", {}),
                    "timestamp": datetime.now().isoformat(),
                }

                # Add to the conversation context
                context.add_message(message)

                if sender_type == "customer":
                    # Customer messages go through the processing pipeline
                    result = await conversation_manager.process_message(conversation_id, message)
                    await websocket.send_json({"type": "response", "result": result})
                else:
                    # Other senders are echoed back directly
                    await websocket.send_json({"type": "echo", "message": message})

            elif message_type == "ping":
                # Heartbeat
                await websocket.send_json({"type": "pong", "timestamp": datetime.now().isoformat()})

            elif message_type == "close":
                # Close the connection
                await websocket.close()
                break

            else:
                await websocket.send_json({
                    "type": "error",
                    "message": f"Unknown message type: {message_type}",
                })

    except WebSocketDisconnect:
        logger.info(f"WebSocket disconnected: {conversation_id}")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
        try:
            await websocket.send_json({"type": "error", "message": f"Server error: {e}"})
            await websocket.close()
        except Exception:
            # The connection is already gone; nothing more to do
            pass

7. Agent Desktop Application (Simplified Frontend)
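The agent desktop in this section only simulates its connection, but a real client would exchange the same JSON frames that the conversation WebSocket endpoint handles: each frame carries a `type` of `message`, `ping`, or `close`. The following is a minimal sketch of that dispatch logic as a pure function, which makes the protocol easy to unit test without a running server. The name `dispatch_frame` and the `"process"` marker are illustrative assumptions, not part of the project's code; the real endpoint calls the conversation manager at that point.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def dispatch_frame(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Map one incoming JSON frame to the reply a server would send.

    Returns None when the connection should be closed.
    Hypothetical helper: it mirrors the frame types used by the endpoint
    but does not call the conversation manager.
    """
    frame_type = data.get("type")
    now = datetime.now(timezone.utc).isoformat()

    if frame_type == "ping":
        # Heartbeat: answer with a pong
        return {"type": "pong", "timestamp": now}

    if frame_type == "close":
        # Caller should close the socket and leave the loop
        return None

    if frame_type == "message":
        sender_type = data.get("sender_type", "customer")
        message = {
            "content": data.get("content", ""),
            "content_type": "text",
            "sender_type": sender_type,
            "sender_id": data.get("sender_id"),
            "metadata": data.get("metadata", {}),
            "timestamp": now,
        }
        if sender_type == "customer":
            # Assumption: the caller forwards this to the processing pipeline
            return {"type": "process", "message": message}
        # Non-customer senders are echoed back unchanged
        return {"type": "echo", "message": message}

    return {"type": "error", "message": f"Unknown message type: {frame_type}"}


if __name__ == "__main__":
    print(dispatch_frame({"type": "ping"}))
    print(dispatch_frame({"type": "message", "content": "hi", "sender_type": "agent"}))
```

Keeping the dispatch separate from the socket loop is a design choice rather than a requirement; it lets the frame handling be tested with plain dictionaries.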
7.1 Agent Dashboard HTML Template
Create src/static/agent_dashboard.html:
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>AI + Human Customer Service Agent Desktop</title>
  <link href="https://cdn.jsdelivr.net/npm/[email protected]/dist/tailwind.min.css" rel="stylesheet">
  <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css">
  <style>
    .conversation-active { border-left: 4px solid #10B981; }
    .conversation-pending { border-left: 4px solid #F59E0B; }
    .conversation-transferring { border-left: 4px solid #3B82F6; }
    .message-customer { background-color: #E5E7EB; align-self: flex-start; }
    .message-agent { background-color: #3B82F6; color: white; align-self: flex-end; }
    .message-ai { background-color: #10B981; color: white; align-self: flex-end; }
    .typing-indicator {
      display: inline-block;
      width: 10px;
      height: 10px;
      border-radius: 50%;
      background-color: #9CA3AF;
      margin: 0 2px;
      animation: typing 1.4s infinite ease-in-out both;
    }
    .typing-indicator:nth-child(1) { animation-delay: -0.32s; }
    .typing-indicator:nth-child(2) { animation-delay: -0.16s; }
    @keyframes typing {
      0%, 80%, 100% { transform: scale(0); }
      40% { transform: scale(1.0); }
    }
  </style>
</head>
<body class="bg-gray-100">
<div class="flex h-screen">
  <!-- Left sidebar -->
  <div class="w-80 bg-white border-r border-gray-200 flex flex-col">
    <!-- Agent status -->
    <div class="p-4 border-b border-gray-200">
      <div class="flex items-center justify-between">
        <div class="flex items-center">
          <div class="w-10 h-10 bg-blue-500 rounded-full flex items-center justify-center text-white font-bold">A</div>
          <div class="ml-3">
            <div class="font-semibold">Agent: Zhang</div>
            <div class="text-sm text-gray-500" id="agent-status">Offline</div>
          </div>
        </div>
        <div>
          <select id="status-select" class="border rounded px-2 py-1 text-sm">
            <option value="online">Online</option>
            <option value="away">Away</option>
            <option value="offline" selected>Offline</option>
          </select>
        </div>
      </div>
      <!-- Statistics -->
      <div class="mt-4 grid grid-cols-2 gap-2">
        <div class="bg-blue-50 p-2 rounded text-center">
          <div class="text-2xl font-bold text-blue-600" id="active-chats">0</div>
          <div class="text-xs text-gray-600">Active</div>
        </div>
        <div class="bg-green-50 p-2 rounded text-center">
          <div class="text-2xl font-bold text-green-600" id="total-chats">0</div>
          <div class="text-xs text-gray-600">Total today</div>
        </div>
        <div class="bg-yellow-50 p-2 rounded text-center">
          <div class="text-2xl font-bold text-yellow-600" id="waiting-chats">0</div>
          <div class="text-xs text-gray-600">Waiting</div>
        </div>
        <div class="bg-purple-50 p-2 rounded text-center">
          <div class="text-2xl font-bold text-purple-600" id="avg-response">0s</div>
          <div class="text-xs text-gray-600">Avg. response</div>
        </div>
      </div>
    </div>
    <!-- Conversation list -->
    <div class="flex-1 overflow-y-auto">
      <div class="p-4 border-b border-gray-200">
        <div class="flex justify-between items-center">
          <h3 class="font-semibold">Active conversations</h3>
          <span class="bg-blue-100 text-blue-800 text-xs px-2 py-1 rounded-full" id="conversation-count">0</span>
        </div>
        <div class="mt-2">
          <input type="text" placeholder="Search conversations..." class="w-full border rounded px-3 py-1 text-sm">
        </div>
      </div>
      <div id="conversation-list" class="divide-y divide-gray-100">
        <!-- The conversation list is rendered by JavaScript -->
        <div class="p-4 text-center text-gray-500" id="no-conversations">No active conversations</div>
      </div>
    </div>
    <!-- Quick replies -->
    <div class="p-4 border-t border-gray-200">
      <h4 class="font-semibold mb-2">Quick replies</h4>
      <div class="space-y-1">
        <button class="quick-reply-btn w-full text-left px-3 py-2 text-sm bg-gray-100 hover:bg-gray-200 rounded" data-reply="Hello, happy to help you!">Greeting</button>
        <button class="quick-reply-btn w-full text-left px-3 py-2 text-sm bg-gray-100 hover:bg-gray-200 rounded" data-reply="One moment please, let me look that up.">Please wait</button>
        <button class="quick-reply-btn w-full text-left px-3 py-2 text-sm bg-gray-100 hover:bg-gray-200 rounded" data-reply="Is there anything else I can help you with?">Closing</button>
      </div>
    </div>
  </div>
  <!-- Main chat area -->
  <div class="flex-1 flex flex-col">
    <!-- Chat header -->
    <div class="bg-white border-b border-gray-200 p-4">
      <div class="flex justify-between items-center">
        <div>
          <h2 class="text-xl font-semibold" id="current-customer">No conversation selected</h2>
          <div class="flex items-center mt-1">
            <span class="text-sm text-gray-500" id="conversation-info">-</span>
            <span class="ml-3 text-xs px-2 py-1 rounded-full" id="conversation-status-badge">-</span>
          </div>
        </div>
        <div class="flex space-x-2">
          <button id="ai-assist-btn" class="px-4 py-2 bg-green-100 text-green-800 rounded hover:bg-green-200"><i class="fas fa-robot mr-1"></i> AI assist</button>
          <button id="transfer-btn" class="px-4 py-2 bg-blue-100 text-blue-800 rounded hover:bg-blue-200"><i class="fas fa-exchange-alt mr-1"></i> Transfer</button>
          <button id="close-chat-btn" class="px-4 py-2 bg-red-100 text-red-800 rounded hover:bg-red-200"><i class="fas fa-times mr-1"></i> End</button>
        </div>
      </div>
      <!-- Customer info card -->
      <div id="customer-card" class="mt-4 hidden">
        <div class="bg-gray-50 p-3 rounded-lg">
          <div class="grid grid-cols-3 gap-4 text-sm">
            <div>
              <div class="text-gray-500">Customer name</div>
              <div id="customer-name" class="font-medium">-</div>
            </div>
            <div>
              <div class="text-gray-500">Phone</div>
              <div id="customer-phone" class="font-medium">-</div>
            </div>
            <div>
              <div class="text-gray-500">Email</div>
              <div id="customer-email" class="font-medium">-</div>
            </div>
          </div>
          <div class="mt-3">
            <div class="text-gray-500">History</div>
            <div class="text-sm"><span id="history-count">0</span> conversations | Last contact: <span id="last-contact">-</span></div>
          </div>
        </div>
      </div>
    </div>
    <!-- Chat messages -->
    <div class="flex-1 overflow-y-auto p-4 bg-gray-50" id="message-container">
      <div class="max-w-4xl mx-auto space-y-4">
        <!-- Welcome message -->
        <div class="text-center py-8" id="welcome-message">
          <div class="text-gray-400 mb-2"><i class="fas fa-comments text-4xl"></i></div>
          <h3 class="text-xl font-medium text-gray-600">Select a conversation on the left to start chatting</h3>
          <p class="text-gray-500 mt-2">The AI handles routine questions automatically; conversations that need a human appear here</p>
        </div>
        <!-- Messages are rendered by JavaScript -->
      </div>
    </div>
    <!-- Message input -->
    <div class="bg-white border-t border-gray-200 p-4">
      <div class="max-w-4xl mx-auto">
        <!-- AI suggestions -->
        <div id="ai-suggestions" class="mb-3 hidden">
          <div class="text-xs text-gray-500 mb-1">AI suggested replies:</div>
          <div class="flex flex-wrap gap-2">
            <button class="ai-suggestion-btn text-xs bg-green-100 text-green-800 px-3 py-1 rounded-full hover:bg-green-200">OK, I understand</button>
            <button class="ai-suggestion-btn text-xs bg-green-100 text-green-800 px-3 py-1 rounded-full hover:bg-green-200">Please provide more details</button>
            <button class="ai-suggestion-btn text-xs bg-green-100 text-green-800 px-3 py-1 rounded-full hover:bg-green-200">Let me look that up for you</button>
          </div>
        </div>
        <!-- Input box -->
        <div class="flex items-end space-x-2">
          <div class="flex-1 border rounded-lg">
            <div class="p-2">
              <textarea id="message-input" rows="2" placeholder="Type a message..." class="w-full border-none focus:outline-none resize-none"></textarea>
            </div>
            <div class="border-t px-2 py-1 flex justify-between items-center">
              <div class="flex space-x-2">
                <button class="text-gray-400 hover:text-gray-600"><i class="fas fa-paperclip"></i></button>
                <button class="text-gray-400 hover:text-gray-600"><i class="far fa-smile"></i></button>
              </div>
              <div class="text-xs text-gray-500"><span id="char-count">0</span>/1000</div>
            </div>
          </div>
          <button id="send-btn" class="px-6 py-3 bg-blue-600 text-white rounded-lg hover:bg-blue-700"><i class="fas fa-paper-plane"></i></button>
        </div>
        <!-- Status hints -->
        <div class="mt-2 text-xs text-gray-500 flex items-center">
          <div class="flex-1">
            <span id="typing-indicator" class="hidden">
              <span class="typing-indicator"></span><span class="typing-indicator"></span><span class="typing-indicator"></span>
              Customer is typing...
            </span>
          </div>
          <div>
            <span id="connection-status" class="text-green-600"><i class="fas fa-circle"></i> Connected</span>
          </div>
        </div>
      </div>
    </div>
  </div>
  <!-- Right sidebar (AI analysis) -->
  <div class="w-96 bg-white border-l border-gray-200 p-4 overflow-y-auto">
    <h3 class="font-semibold mb-4">AI analysis panel</h3>
    <!-- Sentiment analysis -->
    <div class="mb-6">
      <div class="flex justify-between items-center mb-2">
        <h4 class="font-medium">Sentiment</h4>
        <span class="text-xs text-gray-500" id="sentiment-score">-</span>
      </div>
      <div class="h-2 bg-gray-200 rounded-full overflow-hidden">
        <div id="sentiment-bar" class="h-full bg-green-500 w-1/2"></div>
      </div>
      <div class="flex justify-between text-xs mt-1">
        <span class="text-red-500">Negative</span>
        <span class="text-gray-500">Neutral</span>
        <span class="text-green-500">Positive</span>
      </div>
      <div class="mt-2 text-sm" id="sentiment-desc">No data</div>
    </div>
    <!-- Intent recognition -->
    <div class="mb-6">
      <h4 class="font-medium mb-2">Intent</h4>
      <div id="intent-tags" class="flex flex-wrap gap-1">
        <!-- Intent tags are rendered by JavaScript -->
      </div>
      <div class="mt-3 text-sm" id="intent-desc">No data</div>
    </div>
    <!-- Knowledge base suggestions -->
    <div class="mb-6">
      <div class="flex justify-between items-center mb-2">
        <h4 class="font-medium">Related knowledge</h4>
        <span class="text-xs text-gray-500" id="kb-match-score">-</span>
      </div>
      <div id="knowledge-suggestions" class="space-y-2">
        <!-- Knowledge entries are rendered by JavaScript -->
        <div class="text-sm text-gray-500 italic">No suggestions</div>
      </div>
    </div>
    <!-- Conversation summary -->
    <div class="mb-6">
      <h4 class="font-medium mb-2">Conversation summary</h4>
      <div class="text-sm bg-gray-50 p-3 rounded" id="conversation-summary">No summary</div>
    </div>
    <!-- Next-step recommendation -->
    <div>
      <h4 class="font-medium mb-2">AI recommendation</h4>
      <div class="space-y-2">
        <div class="flex items-start">
          <div class="w-6 h-6 bg-green-100 text-green-800 rounded-full flex items-center justify-center mr-2 mt-0.5"><i class="fas fa-robot text-xs"></i></div>
          <div class="text-sm flex-1">
            <div id="ai-recommendation">Waiting for analysis...</div>
            <button id="apply-recommendation" class="mt-1 text-xs text-blue-600 hover:text-blue-800 hidden"><i class="fas fa-check mr-1"></i> Apply recommendation</button>
          </div>
        </div>
      </div>
    </div>
  </div>
</div>
<!-- JavaScript -->
<script>
class AgentDashboard {
  constructor() {
    this.currentConversationId = null;
    this.wsConnection = null;
    this.agentId = 'agent_001';
    this.agentStatus = 'offline';
    this.conversations = [];
    this.messageInput = document.getElementById('message-input');
    this.sendBtn = document.getElementById('send-btn');
    this.messageContainer = document.getElementById('message-container');
    this.conversationList = document.getElementById('conversation-list');
    this.typingIndicator = document.getElementById('typing-indicator');
    this.init();
  }

  init() {
    this.initEventListeners();  // Register event listeners
    this.connectWebSocket();    // Connect the WebSocket
    this.loadMockData();        // Load mock data
    this.updateAgentStatus();   // Show the agent status
    this.startHeartbeat();      // Start the heartbeat
  }

  // Escape text before it is inserted through innerHTML
  escapeHtml(text) {
    const div = document.createElement('div');
    div.textContent = String(text);
    return div.innerHTML;
  }

  currentTime() {
    return new Date().toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' });
  }

  initEventListeners() {
    // Status selection
    document.getElementById('status-select').addEventListener('change', (e) => {
      this.agentStatus = e.target.value;
      this.updateAgentStatus();
    });

    // Send message (Enter sends, Shift+Enter inserts a line break)
    this.sendBtn.addEventListener('click', () => this.sendMessage());
    this.messageInput.addEventListener('keypress', (e) => {
      if (e.key === 'Enter' && !e.shiftKey) {
        e.preventDefault();
        this.sendMessage();
      }
    });

    // Character counter
    this.messageInput.addEventListener('input', (e) => {
      document.getElementById('char-count').textContent = e.target.value.length;
    });

    // Quick-reply buttons
    document.querySelectorAll('.quick-reply-btn').forEach(btn => {
      btn.addEventListener('click', () => {
        this.messageInput.value = btn.dataset.reply;
        this.messageInput.focus();
      });
    });

    // AI suggestion buttons (registered once, so clicks are not handled twice)
    document.querySelectorAll('.ai-suggestion-btn').forEach(btn => {
      btn.addEventListener('click', () => {
        this.messageInput.value = btn.textContent.trim();
        this.messageInput.focus();
      });
    });

    // AI assist, transfer and end-chat buttons
    document.getElementById('ai-assist-btn').addEventListener('click', () => this.requestAIAssistance());
    document.getElementById('transfer-btn').addEventListener('click', () => this.transferConversation());
    document.getElementById('close-chat-btn').addEventListener('click', () => this.closeConversation());
  }

  connectWebSocket() {
    // A real implementation would connect to the WebSocket server here.
    // Simplified example: simulate the connection.
    console.log('Connecting to WebSocket server...');
    setTimeout(() => {
      document.getElementById('connection-status').innerHTML = '<i class="fas fa-circle"></i> Connected';
    }, 1000);
  }

  loadMockData() {
    // Mock conversation data
    this.conversations = [
      { id: 'conv_001', customer_id: 'cust_001', customer_name: 'Zhang San', last_message: 'Hello, I would like to ask about product pricing', last_time: '10:20', unread: 2, status: 'active', sentiment: 'neutral', agent_assigned: true },
      { id: 'conv_002', customer_id: 'cust_002', customer_name: 'Li Si', last_message: 'The system shows an error and I cannot log in', last_time: '09:45', unread: 0, status: 'pending', sentiment: 'negative', agent_assigned: false },
      { id: 'conv_003', customer_id: 'cust_003', customer_name: 'Wang Wu', last_message: 'When will my refund arrive?', last_time: 'Yesterday', unread: 1, status: 'transferring', sentiment: 'negative', agent_assigned: true }
    ];
    this.updateConversationList();
    this.updateStatistics();
  }

  updateConversationList() {
    const container = this.conversationList;
    document.getElementById('conversation-count').textContent = this.conversations.length;

    if (this.conversations.length === 0) {
      // Re-create the placeholder: assigning innerHTML replaces the original element
      container.innerHTML = '<div class="p-4 text-center text-gray-500" id="no-conversations">No active conversations</div>';
      return;
    }

    let html = '';
    this.conversations.forEach(conv => {
      const statusClass = `conversation-${conv.status}`;
      const unreadBadge = conv.unread > 0 ? `<span>${conv.unread}</span>` : '';
      const sentimentClass =
        conv.sentiment === 'positive' ? 'bg-green-100 text-green-800' :
        conv.sentiment === 'negative' ? 'bg-red-100 text-red-800' : 'bg-gray-100 text-gray-800';
      const sentimentLabel =
        conv.sentiment === 'positive' ? 'Positive' :
        conv.sentiment === 'negative' ? 'Negative' : 'Neutral';
      html += `
        <div class="conversation-item ${statusClass}" data-conversation-id="${conv.id}" data-customer-id="${conv.customer_id}">
          <div>
            <div>
              <div>${this.escapeHtml(conv.customer_name)}</div>
              <div>${this.escapeHtml(conv.last_message)}</div>
            </div>
            <div>
              <div>${this.escapeHtml(conv.last_time)}</div>
              ${unreadBadge}
            </div>
          </div>
          <div>
            <span class="${sentimentClass}">${sentimentLabel}</span>
            ${conv.agent_assigned ? '<span>Assigned</span>' : '<span>Unassigned</span>'}
          </div>
        </div>`;
    });
    container.innerHTML = html;

    // Click handlers for the conversation items
    container.querySelectorAll('.conversation-item').forEach(item => {
      item.addEventListener('click', () => {
        this.selectConversation(item.dataset.conversationId, item.dataset.customerId);
      });
    });
  }

  selectConversation(conversationId, customerId) {
    this.currentConversationId = conversationId;
    const conversation = this.conversations.find(c => c.id === conversationId);
    if (!conversation) return;

    document.getElementById('current-customer').textContent = conversation.customer_name;
    document.getElementById('conversation-info').textContent = `Conversation ID: ${conversationId}`;

    // Status badge
    const statusBadge = document.getElementById('conversation-status-badge');
    statusBadge.textContent = this.getStatusText(conversation.status);
    statusBadge.className = `ml-3 text-xs px-2 py-1 rounded-full ${
      conversation.status === 'active' ? 'bg-green-100 text-green-800' :
      conversation.status === 'pending' ? 'bg-yellow-100 text-yellow-800' : 'bg-blue-100 text-blue-800'}`;

    this.showCustomerCard(customerId);
    document.getElementById('welcome-message').classList.add('hidden');
    this.loadConversationMessages(conversationId);
    this.updateAIAnalysis(conversation);
    this.markConversationAsRead(conversationId);
  }

  showCustomerCard(customerId) {
    document.getElementById('customer-card').classList.remove('hidden');

    // Mock customer data
    const mockCustomer = { name: 'Zhang San', phone: '13800138000', email: '[email protected]', history_count: 5, last_contact: '2024-01-10' };
    document.getElementById('customer-name').textContent = mockCustomer.name;
    document.getElementById('customer-phone').textContent = mockCustomer.phone;
    document.getElementById('customer-email').textContent = mockCustomer.email;
    document.getElementById('history-count').textContent = mockCustomer.history_count;
    document.getElementById('last-contact').textContent = mockCustomer.last_contact;
  }

  // Remove rendered messages but keep the welcome element, which other methods look up by id
  clearMessages() {
    this.messageContainer.querySelectorAll('.chat-message').forEach(el => el.remove());
  }

  loadConversationMessages(conversationId) {
    this.clearMessages();

    // Mock message data
    const mockMessages = [
      { id: 'msg_001', sender_type: 'customer', content: 'Hello, I would like to ask about your products', time: '10:15' },
      { id: 'msg_002', sender_type: 'ai', content: 'Hello! I am the AI assistant, happy to help. We offer several products; which aspect would you like to know about?', time: '10:16' },
      { id: 'msg_003', sender_type: 'customer', content: 'I would like to know the price and features of the Enterprise edition', time: '10:17' },
      { id: 'msg_004', sender_type: 'agent', content: 'Hello, this is Zhang from the support team. Enterprise pricing depends on the number of users; the basic features include...', time: '10:20' }
    ];
    mockMessages.forEach(msg => this.addMessageToUI(msg));
    this.scrollToBottom();
  }

  addMessageToUI(message) {
    const container = this.messageContainer.querySelector('.max-w-4xl');
    const isCustomer = message.sender_type === 'customer';
    const alignClass = isCustomer ? 'justify-start' : 'justify-end';
    const messageClass = isCustomer ? 'message-customer' :
      message.sender_type === 'agent' ? 'message-agent' : 'message-ai';
    const senderLabel = isCustomer ? 'Customer' :
      message.sender_type === 'agent' ? 'Agent' : 'AI assistant';

    const messageDiv = document.createElement('div');
    messageDiv.className = `chat-message flex ${alignClass}`;
    messageDiv.innerHTML = `
      <div>
        <div class="flex ${alignClass}">
          <span>${senderLabel} • ${this.escapeHtml(message.time)}</span>
        </div>
        <div class="${messageClass} rounded-2xl px-4 py-3">${this.escapeHtml(message.content)}</div>
      </div>`;
    container.appendChild(messageDiv);
  }

  sendMessage() {
    const content = this.messageInput.value.trim();
    if (!content || !this.currentConversationId) return;

    const message = { id: 'msg_' + Date.now(), sender_type: 'agent', content: content, time: this.currentTime() };
    this.addMessageToUI(message);

    // Clear the input box
    this.messageInput.value = '';
    document.getElementById('char-count').textContent = '0';
    this.scrollToBottom();

    // Simulate a customer reply after 3 seconds
    setTimeout(() => this.simulateCustomerReply(), 3000);
    this.updateAIAfterMessage(content);
  }

  simulateCustomerReply() {
    const mockReplies = ['OK, I understand', 'So what is the price?', 'Is there a trial version?', 'I need to discuss this with my team', 'Thanks for your help'];
    const randomReply = mockReplies[Math.floor(Math.random() * mockReplies.length)];
    const message = { id: 'msg_' + Date.now(), sender_type: 'customer', content: randomReply, time: this.currentTime() };

    // Show the "typing" indicator, then the message 2 seconds later
    this.showTypingIndicator();
    setTimeout(() => {
      this.hideTypingIndicator();
      this.addMessageToUI(message);
      this.scrollToBottom();
      this.updateConversationLastMessage(randomReply);
    }, 2000);
  }

  showTypingIndicator() { this.typingIndicator.classList.remove('hidden'); }
  hideTypingIndicator() { this.typingIndicator.classList.add('hidden'); }

  updateConversationLastMessage(message) {
    const conversation = this.conversations.find(c => c.id === this.currentConversationId);
    if (conversation) {
      conversation.last_message = message;
      conversation.last_time = this.currentTime();
      conversation.unread = 0;
      this.updateConversationList();
    }
  }

  updateAIAnalysis(conversation) {
    // Sentiment analysis
    const score = conversation.sentiment === 'positive' ? '75%' :
      conversation.sentiment === 'negative' ? '30%' : '50%';
    const bar = document.getElementById('sentiment-bar');
    document.getElementById('sentiment-score').textContent = score;
    bar.style.width = score;
    bar.className = `h-full ${
      conversation.sentiment === 'positive' ? 'bg-green-500' :
      conversation.sentiment === 'negative' ? 'bg-red-500' : 'bg-yellow-500'}`;
    document.getElementById('sentiment-desc').textContent =
      conversation.sentiment === 'positive' ? 'Customer sentiment is positive; the conversation is going well' :
      conversation.sentiment === 'negative' ? 'Customer sentiment is negative; this needs special attention' :
      'Customer sentiment is neutral; the conversation is proceeding normally';

    // Intent recognition
    document.getElementById('intent-tags').innerHTML = `
      <span>Product inquiry</span>
      <span>Pricing question</span>`;
    document.getElementById('intent-desc').textContent = 'The customer mainly cares about product price and features';

    // Knowledge base suggestions
    document.getElementById('knowledge-suggestions').innerHTML = `
      <div>
        <div>Enterprise edition features</div>
        <div>Supports up to 1000 users, includes advanced analytics, and comes with dedicated technical support.</div>
      </div>
      <div>
        <div>Pricing</div>
        <div>Basic is free, Professional is $29/month, Enterprise is quoted individually.</div>
      </div>`;
    document.getElementById('kb-match-score').textContent = '85% match';

    // Conversation summary
    document.getElementById('conversation-summary').textContent =
      'The customer asked about Enterprise features and pricing. The AI provided basic information and a human agent is now following up.';

    // AI recommendation
    document.getElementById('ai-recommendation').textContent =
      'Provide a detailed feature comparison and a quote, and ask about team size to price accurately.';
    document.getElementById('apply-recommendation').classList.remove('hidden');
  }

  updateAIAfterMessage(message) {
    // Update the AI analysis based on the sent message.
    // Smarter analysis logic can be added here.
    console.log('Analyzing message:', message);

    // Show the AI suggested replies
    document.getElementById('ai-suggestions').classList.remove('hidden');
  }

  requestAIAssistance() {
    if (!this.currentConversationId) {
      alert('Please select a conversation first');
      return;
    }

    // Simulated AI assistance
    const aiResponse = {
      id: 'ai_' + Date.now(),
      sender_type: 'ai',
      content: 'Based on the conversation, the customer mainly cares about pricing and feature comparison. Suggestions: 1. Provide a detailed quote 2. Arrange a product demo 3. Ask about the budget range',
      time: this.currentTime()
    };
    this.addMessageToUI(aiResponse);
    this.scrollToBottom();

    document.getElementById('ai-recommendation').textContent =
      'Conversation history analyzed; consider offering a personalized quote and a product demo invitation.';
  }

  transferConversation() {
    if (!this.currentConversationId) {
      alert('Please select a conversation first');
      return;
    }
    const targetAgent = prompt('Enter the target agent ID or department:');
    if (targetAgent) {
      alert(`Conversation transferred to ${targetAgent}`);
      // A real implementation would call the transfer API here
    }
  }

  closeConversation() {
    if (!this.currentConversationId) {
      alert('Please select a conversation first');
      return;
    }
    if (confirm('Are you sure you want to end this conversation?')) {
      const reason = prompt('Enter the reason for ending:', 'Issue resolved');
      if (reason !== null) {
        // A real implementation would call the close API here
        alert(`Conversation ended, reason: ${reason}`);

        // Remove from the list and refresh the UI
        this.conversations = this.conversations.filter(c => c.id !== this.currentConversationId);
        this.updateConversationList();
        this.updateStatistics();
        this.resetChatArea();
      }
    }
  }

  resetChatArea() {
    this.currentConversationId = null;

    // Reset the UI
    document.getElementById('current-customer').textContent = 'No conversation selected';
    document.getElementById('conversation-info').textContent = '-';
    document.getElementById('conversation-status-badge').textContent = '-';
    document.getElementById('customer-card').classList.add('hidden');
    document.getElementById('welcome-message').classList.remove('hidden');

    // Clear the message area and hide the AI suggestions
    this.clearMessages();
    document.getElementById('ai-suggestions').classList.add('hidden');
  }

  markConversationAsRead(conversationId) {
    const conversation = this.conversations.find(c => c.id === conversationId);
    if (conversation) {
      conversation.unread = 0;
      this.updateConversationList();
    }
  }

  updateStatistics() {
    const activeChats = this.conversations.filter(c => c.status === 'active').length;
    const waitingChats = this.conversations.filter(c => c.status === 'pending').length;
    document.getElementById('active-chats').textContent = activeChats;
    document.getElementById('total-chats').textContent = this.conversations.length;
    document.getElementById('waiting-chats').textContent = waitingChats;
    document.getElementById('avg-response').textContent = '45s';
  }

  updateAgentStatus() {
    document.getElementById('agent-status').textContent =
      this.agentStatus === 'online' ? 'Online' :
      this.agentStatus === 'away' ? 'Away' : 'Offline';
    // A real implementation would notify the server of the status change here
    console.log('Agent status changed to:', this.agentStatus);
  }

  getStatusText(status) {
    const statusMap = {
      'active': 'Active',
      'pending': 'Waiting',
      'transferring': 'Transferring',
      'closed': 'Closed',
      'escalated': 'Escalated'
    };
    return statusMap[status] || status;
  }

  scrollToBottom() {
    this.messageContainer.scrollTop = this.messageContainer.scrollHeight;
  }

  startHeartbeat() {
    // Send a ping every 30 seconds while the socket is open
    setInterval(() => {
      if (this.wsConnection && this.wsConnection.readyState === WebSocket.OPEN) {
        this.wsConnection.send(JSON.stringify({ type: 'ping' }));
      }
    }, 30000);
  }
}

// Initialize the application
document.addEventListener('DOMContentLoaded', () => {
  const dashboard = new AgentDashboard();
  window.dashboard = dashboard;  // Exposed for console debugging
});
</script>
</body>
</html>

8. Deployment and Operations
8.1 Docker Deployment Configuration
Create Dockerfile:
# Use the official Python image
FROM python:3.10-slim

# Set the working directory
WORKDIR /app

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV DEBIAN_FRONTEND=noninteractive

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    build-essential \
    curl \
    git \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt

# Copy the project files
COPY . .

# Create a non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Expose the port
EXPOSE 8000

# Start command
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]

Create docker-compose.yml:
version: '3.8'

services:
  # Database service
  postgres:
    # The stock postgres image does not ship pgvector; this image bundles it
    image: pgvector/pgvector:pg15
    environment:
      POSTGRES_USER: ${POSTGRES_USER:-postgres}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-password}
      POSTGRES_DB: ${POSTGRES_DB:-customer_service}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./scripts/init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-postgres}"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - ai-customer-service

  # Redis service
  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD:-password}
    volumes:
      - redis_data:/data
    ports:
      - "6379:6379"
    healthcheck:
      # Authenticate, because the server is started with --requirepass
      test: ["CMD-SHELL", "redis-cli -a ${REDIS_PASSWORD:-password} --no-auth-warning ping | grep -q PONG"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - ai-customer-service

  # Main API service
  api:
    build: .
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    environment:
      - ENVIRONMENT=production
      - POSTGRES_SERVER=postgres
      - POSTGRES_PORT=5432
      - REDIS_HOST=redis
      - REDIS_PORT=6379
    volumes:
      - ./models:/app/models
      - ./data:/app/data
    ports:
      - "8000:8000"
    command: >
      sh -c "python scripts/init_database.py &&
             uvicorn src.api.main:app --host 0.0.0.0 --port 8000 --workers 4"
    networks:
      - ai-customer-service

  # Frontend service (agent dashboard)
  frontend:
    build:
      context: .
      dockerfile: Dockerfile.frontend
    ports:
      - "3000:80"
    depends_on:
      - api
    networks:
      - ai-customer-service

  # Monitoring service (optional)
  monitoring:
    image: grafana/grafana:latest
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    ports:
      - "3001:3000"
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - ai-customer-service

  # Message queue (optional)
  rabbitmq:
    image: rabbitmq:3-management
    environment:
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER:-guest}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD:-guest}
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    networks:
      - ai-customer-service

networks:
  ai-customer-service:
    driver: bridge

volumes:
  postgres_data:
  redis_data:
  grafana_data:
  rabbitmq_data:

Create the database initialization script scripts/init.sql:
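-- Create extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- pgvector registers itself under the extension name "vector"
CREATE EXTENSION IF NOT EXISTS "vector";

-- Create tables (SQLAlchemy creates them automatically; this is a fallback script)
-- Note: the actual table structure is defined by the SQLAlchemy models

8.2 Monitoring and Logging Configuration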
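The monitoring module in this section wraps request handling in a context manager that records a duration and a status code even when the handler raises. The following is a minimal, dependency-free sketch of that pattern, with an in-memory list standing in for the Prometheus metrics. The `RequestTimer` class and its `records` attribute are illustrative assumptions, and the sketch uses `time.perf_counter()` as a monotonic clock, which is a substitution for illustration rather than what the module itself does.

```python
import time
from contextlib import contextmanager
from typing import Iterator, List, Tuple


class RequestTimer:
    """Collects (method, endpoint, status, duration) tuples in memory."""

    def __init__(self) -> None:
        self.records: List[Tuple[str, str, int, float]] = []

    @contextmanager
    def measure(self, method: str, endpoint: str) -> Iterator[None]:
        start = time.perf_counter()
        status = 200  # Assume success until the wrapped block raises
        try:
            yield
        except Exception:
            status = 500
            raise  # Re-raise so the caller still sees the error
        finally:
            # Runs on both success and failure, so every request is recorded
            duration = time.perf_counter() - start
            self.records.append((method, endpoint, status, duration))


timer = RequestTimer()

with timer.measure("GET", "/health"):
    pass  # Successful request

try:
    with timer.measure("POST", "/fail"):
        raise ValueError("boom")  # Failing request
except ValueError:
    pass

for method, endpoint, status, duration in timer.records:
    print(method, endpoint, status, f"{duration:.6f}s")
```

Recording in `finally` is what makes failed requests show up in the metrics; recording after the `yield` alone would silently drop them.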
Create src/core/monitoring.py:
""" 监控配置 配置指标收集、日志记录和性能监控 """import logging import time from typing import Dict, Any, Optional from contextlib import contextmanager from datetime import datetime import prometheus_client from prometheus_client import Counter, Histogram, Gauge, Summary classMetricsCollector:"""指标收集器"""def__init__(self): self.request_count = Counter('http_requests_total','Total HTTP requests',['method','endpoint','status']) self.request_duration = Histogram('http_request_duration_seconds','HTTP request duration in seconds',['method','endpoint']) self.conversation_count = Gauge('conversations_active','Number of active conversations') self.message_count = Counter('messages_processed_total','Total messages processed',['handler_type','intent']) self.ai_response_time = Summary('ai_response_time_seconds','AI response time in seconds') self.error_count = Counter('errors_total','Total errors',['error_type','component'])# 业务指标 self.customer_satisfaction = Gauge('customer_satisfaction_score','Customer satisfaction score') self.agent_performance = Gauge('agent_performance_score','Agent performance score',['agent_id']) self.ai_confidence = Histogram('ai_confidence_score','AI confidence scores', buckets=[0.1,0.3,0.5,0.7,0.9,1.0])defrecord_request(self, method:str, endpoint:str, status:int, duration:float):"""记录HTTP请求""" self.request_count.labels(method, endpoint, status).inc() self.request_duration.labels(method, endpoint).observe(duration)defrecord_message(self, handler_type:str, intent:str):"""记录消息处理""" self.message_count.labels(handler_type, intent).inc()defrecord_ai_response_time(self, duration:float):"""记录AI响应时间""" self.ai_response_time.observe(duration)defrecord_error(self, error_type:str, component:str):"""记录错误""" self.error_count.labels(error_type, component).inc()defupdate_conversation_count(self, count:int):"""更新对话计数""" self.conversation_count.set(count)defupdate_satisfaction(self, score:float):"""更新满意度分数""" self.customer_satisfaction.set(score)defupdate_agent_performance(self, 
agent_id:str, score:float):"""更新坐席表现""" self.agent_performance.labels(agent_id).set(score)defrecord_ai_confidence(self, confidence:float):"""记录AI置信度""" self.ai_confidence.observe(confidence)classPerformanceMonitor:"""性能监控器"""def__init__(self): self.metrics = MetricsCollector() self.logger = logging.getLogger(__name__)@contextmanagerdefmeasure_request(self, method:str, endpoint:str):"""测量请求性能""" start_time = time.time() status =200try:yieldexcept Exception as e: status =500 self.metrics.record_error(type(e).__name__,'request_handler')raisefinally: duration = time.time()- start_time self.metrics.record_request(method, endpoint, status, duration)@contextmanagerdefmeasure_ai_response(self):"""测量AI响应性能""" start_time = time.time()try:yieldfinally: duration = time.time()- start_time self.metrics.record_ai_response_time(duration)# 创建全局监控器实例 monitor = PerformanceMonitor()defsetup_logging():"""配置日志""" log_format ='%(asctime)s - %(name)s - %(levelname)s - %(message)s'# 控制台处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter(log_format))# 文件处理器 file_handler = logging.FileHandler('logs/app.log') file_handler.setFormatter(logging.Formatter(log_format))# 配置根日志器 logging.basicConfig( level=logging.INFO, handlers=[console_handler, file_handler],format=log_format )# 设置第三方库日志级别 logging.getLogger('uvicorn').setLevel(logging.WARNING) logging.getLogger('sqlalchemy').setLevel(logging.WARNING) logging.getLogger('aiosqlite').setLevel(logging.WARNING)defstart_metrics_server(port:int=9090):"""启动指标服务器"""from prometheus_client import start_http_server try: start_http_server(port) logging.info(f"指标服务器启动在端口 {port}")except Exception as e: logging.error(f"启动指标服务器失败: {e}")8.3 测试脚本
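Before running the full integration suite, the timing pattern used by `PerformanceMonitor.measure_request` can be checked in isolation, without a Prometheus registry. The sketch below is only an illustration under stated assumptions: `FakeMetrics`, the free function `measure_request`, and `run_checks` are hypothetical stand-ins written for this example and are not part of the project code. It shows that a successful block is recorded with status 200, while a failing block is recorded with status 500, logged as an error, and still re-raised.

```python
import time
from contextlib import contextmanager


class FakeMetrics:
    """Hypothetical in-memory stand-in for MetricsCollector (no Prometheus needed)."""

    def __init__(self):
        self.requests = []  # (method, endpoint, status, duration)
        self.errors = []    # (error_type, component)

    def record_request(self, method, endpoint, status, duration):
        self.requests.append((method, endpoint, status, duration))

    def record_error(self, error_type, component):
        self.errors.append((error_type, component))


@contextmanager
def measure_request(metrics, method, endpoint):
    """Same shape as PerformanceMonitor.measure_request, with the collector injected."""
    start = time.perf_counter()
    status = 200
    try:
        yield
    except Exception as exc:
        status = 500
        metrics.record_error(type(exc).__name__, "request_handler")
        raise  # the caller still sees the original exception
    finally:
        # Runs on both paths, so every request is counted exactly once
        metrics.record_request(method, endpoint, status, time.perf_counter() - start)


def run_checks():
    """Exercise one successful and one failing request; return the recorder."""
    metrics = FakeMetrics()

    with measure_request(metrics, "GET", "/health"):
        pass

    try:
        with measure_request(metrics, "POST", "/chat"):
            raise ValueError("boom")
    except ValueError:
        pass  # expected: the context manager re-raises

    return metrics
```

Injecting the collector, rather than using a module-level global, is a design choice made here purely to keep the check free of external dependencies.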
Create tests/test_integration.py:
```python
"""
Integration tests.

Exercise the system's components working together.
"""
from datetime import datetime

import pytest
import pytest_asyncio

from src.services.conversation.manager import ConversationManager
from src.ai_components.nlp.processor import NLPProcessor
from src.ai_components.retrieval.engine import KnowledgeRetrievalEngine


class TestIntegration:
    """Integration test suite."""

    # Async fixtures must use pytest_asyncio.fixture under pytest-asyncio's
    # strict mode; a plain pytest.fixture would not be awaited.
    @pytest_asyncio.fixture(autouse=True)
    async def setup(self):
        """Test setup."""
        self.conversation_manager = ConversationManager()
        self.nlp_processor = NLPProcessor()
        self.retrieval_engine = KnowledgeRetrievalEngine()

        # Initialize components
        await self.nlp_processor.initialize()
        await self.retrieval_engine.initialize()

        yield

        # Cleanup
        self.conversation_manager.conversations.clear()

    @pytest.mark.asyncio
    async def test_end_to_end_conversation(self):
        """Test the end-to-end conversation flow."""
        # 1. Create a conversation
        context = await self.conversation_manager.create_conversation(
            customer_id="test_customer_001", channel="web", metadata={"test": True})
        assert context.conversation_id is not None
        assert context.customer_id == "test_customer_001"

        # 2. Send a message ("Hello, I'd like to ask about product pricing")
        message = {
            "id": "test_msg_001",
            "content": "你好,我想咨询产品价格",
            "content_type": "text",
            "sender_type": "customer",
            "sender_id": "test_customer_001",
            "metadata": {},
            "timestamp": datetime.now().isoformat(),
        }
        result = await self.conversation_manager.process_message(
            context.conversation_id, message)
        assert result["conversation_id"] == context.conversation_id
        assert result["handler_type"] in ["ai", "hybrid", "agent"]
        assert "response" in result
        assert "analysis" in result

        # 3. Verify the NLP analysis
        nlp_analysis = result["analysis"]
        assert "intent" in nlp_analysis
        assert "confidence" in nlp_analysis
        assert "sentiment" in nlp_analysis

        # 4. Test knowledge retrieval ("product pricing")
        retrieval_result = await self.retrieval_engine.search("产品价格")
        assert retrieval_result.query == "产品价格"
        assert isinstance(retrieval_result.results, list)
        assert isinstance(retrieval_result.processing_time_ms, float)

        # 5. Close the conversation
        await self.conversation_manager.close_conversation(
            context.conversation_id, "test_complete")
        closed_context = await self.conversation_manager.get_conversation(
            context.conversation_id)
        assert closed_context.metadata["current_state"] == "closed"
        assert closed_context.metadata["closed_reason"] == "test_complete"

    @pytest.mark.asyncio
    async def test_nlp_processing(self):
        """Test NLP processing."""
        test_cases = [
            # "Hello, nice weather today"
            {"text": "你好,今天天气不错", "expected_intent": "greeting", "min_confidence": 0.5},
            # "I want to file a complaint, the service is terrible"
            {"text": "我要投诉,服务太差了", "expected_intent": "complaint", "min_confidence": 0.6},
            # "How do I reset my password?"
            {"text": "怎么重置密码?", "expected_intent": "technical_support", "min_confidence": 0.5},
        ]
        for test_case in test_cases:
            analysis = await self.nlp_processor.analyze_text(test_case["text"])
            assert analysis.text == test_case["text"]
            assert analysis.intent_result.intent == test_case["expected_intent"]
            assert analysis.intent_result.confidence >= test_case["min_confidence"]
            assert analysis.sentiment_result.sentiment in ["positive", "negative", "neutral"]
            assert analysis.processing_time_ms > 0

    @pytest.mark.asyncio
    async def test_knowledge_retrieval(self):
        """Test knowledge retrieval."""
        # Add test documents (pricing tiers; technical support contact details)
        test_documents = [
            {
                "id": "test_doc_001",
                "content": "产品价格分为三个档次:基础版免费,专业版每月29美元,企业版定制报价。",
                "metadata": {"category": "pricing", "importance": 9},
            },
            {
                "id": "test_doc_002",
                "content": "技术支持联系方式:电话400-123-4567,邮箱[email protected]。",
                "metadata": {"category": "contact", "importance": 8},
            },
        ]
        await self.retrieval_engine.add_documents(test_documents)

        # Queries: "what is the price", "how to contact technical support",
        # "product features and pricing"
        queries = [("价格是多少", 1), ("怎么联系技术支持", 1), ("产品功能和定价", 2)]
        for query, expected_min_results in queries:
            result = await self.retrieval_engine.search(query)
            assert result.query == query
            assert len(result.results) >= expected_min_results
            for search_result in result.results:
                assert search_result.id is not None
                assert search_result.content is not None
                assert 0 <= search_result.score <= 1
                assert isinstance(search_result.metadata, dict)

    @pytest.mark.asyncio
    async def test_conversation_state_management(self):
        """Test conversation state management."""
        # Create several conversations
        conversations = []
        for i in range(3):
            context = await self.conversation_manager.create_conversation(
                customer_id=f"test_customer_{i:03d}", channel="web")
            conversations.append(context)

        # Verify the conversation count
        assert len(self.conversation_manager.conversations) == 3

        # Timeout cleanup is not exercised here; a real test would need to
        # simulate the passage of time.

        # Close the conversations
        for conv in conversations:
            await self.conversation_manager.close_conversation(conv.conversation_id, "test")
            closed_conv = await self.conversation_manager.get_conversation(
                conv.conversation_id)
            assert closed_conv.metadata["current_state"] == "closed"

    @pytest.mark.asyncio
    async def test_error_handling(self):
        """Test error handling."""
        # A non-existent conversation should raise
        with pytest.raises(ValueError):
            await self.conversation_manager.process_message(
                "non_existent_conversation", {"content": "test"})

        # An empty message should still be handled (possibly with low confidence)
        context = await self.conversation_manager.create_conversation(
            customer_id="error_test", channel="web")
        empty_message = {"id": "empty_msg", "content": "", "sender_type": "customer"}
        result = await self.conversation_manager.process_message(
            context.conversation_id, empty_message)
        assert result is not None
        assert result["handler_type"] == "ai"  # default handler


if __name__ == "__main__":
    # Run through pytest so that the async fixture and markers are honored;
    # the fixture is an async generator and cannot simply be awaited by hand.
    raise SystemExit(pytest.main([__file__, "-v"]))
```
9. Project Summary and Extension Suggestions
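9.1 Project Summary
With the implementation above, we have built a complete system that combines AI customer service with human agents. Its main characteristics are:
- Modular architecture: the system uses a clear microservice architecture in which each component has a well-defined responsibility, making it easy to maintain and extend.
- Intelligent conversation management: conversations are routed based on intent recognition, sentiment analysis, and context understanding.
- Seamless human-AI collaboration: conversations can be handed over smoothly between the AI and human agents, and the AI provides agents with real-time suggestions.
- Knowledge-driven responses: a knowledge base built on vector retrieval supplies accurate, timely answers.
- Real-time communication: WebSocket support provides a responsive user experience.
- Monitoring and operations: the system includes logging, performance monitoring, and health checks.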
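To make the routing idea in the summary concrete, here is a minimal sketch of how intent, confidence, and sentiment could be combined into a handler decision. It is not the project's `ConversationManager` logic: the `Analysis` dataclass, the `route` function, the thresholds, and the escalation rule are all assumptions made for illustration. Only the handler names `"ai"`, `"hybrid"`, and `"agent"` are taken from the integration tests.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Analysis:
    """Hypothetical, simplified view of an NLP analysis result."""
    intent: str
    confidence: float  # 0.0 to 1.0
    sentiment: str     # "positive", "neutral", or "negative"


# Assumed thresholds; real values would have to be tuned on production traffic.
AI_CONFIDENCE = 0.7
HYBRID_CONFIDENCE = 0.4
ESCALATION_INTENTS = {"complaint"}
MAX_FAILED_AI_TURNS = 2


def route(analysis: Analysis, failed_ai_turns: int = 0) -> str:
    """Return "ai", "hybrid", or "agent" for the next message."""
    # Angry complaints go straight to a human, regardless of confidence.
    if analysis.intent in ESCALATION_INTENTS and analysis.sentiment == "negative":
        return "agent"
    # Context: repeated unsuccessful AI turns also trigger a hand-off.
    if failed_ai_turns >= MAX_FAILED_AI_TURNS:
        return "agent"
    if analysis.confidence >= AI_CONFIDENCE:
        return "ai"
    if analysis.confidence >= HYBRID_CONFIDENCE:
        return "hybrid"  # AI drafts a reply, a human agent reviews it
    return "agent"
```

For example, `route(Analysis("technical_support", 0.5, "neutral"))` falls into the middle band and yields `"hybrid"`. Keeping the rules in one pure function makes the hand-off policy easy to unit-test and to adjust without touching the conversation flow.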
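9.2 Extension Suggestions
- Advanced AI model integration:
  - Integrate large language models such as GPT-4 and Claude to improve dialogue quality
  - Add multimodal understanding (images, audio, video)
  - Add text-to-speech (TTS) and automatic speech recognition (ASR)
- Advanced features:
  - Real-time translation for multilingual customer service
  - Emotional speech synthesis, so that AI replies sound more expressive
  - Predictive analytics to anticipate customer needs
  - Automated workflows integrated with CRM systems
- Performance optimization:
  - Quantize models to reduce memory usage
  - Add a caching layer to improve response times
  - Implement load balancing and horizontal scaling
- Security hardening:
  - Add end-to-end encryption
  - Enforce stricter access control
  - Add audit logging and compliance checks
- Deployment optimization:
  - Kubernetes orchestration with autoscaling
  - A CI/CD pipeline for automated deployment
  - Multi-region deployment for higher availability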
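As one illustration of the caching-layer suggestion above, the following sketch wraps a search function with a small time-to-live cache. Everything in it is hypothetical: `TTLCache`, `cached_search`, and the injected `search_fn` are names invented for this example, and the cache lives in process memory for simplicity. In this system's stack, Redis would be the more natural backing store, but the expiry logic would follow the same idea.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple

_MISSING = object()  # sentinel, so that None can be a legitimate cached value


class TTLCache:
    """Hypothetical in-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable, so tests can simulate time passing
        self._store: Dict[Hashable, Tuple[float, Any]] = {}

    def get(self, key: Hashable) -> Any:
        """Return the cached value, or _MISSING if absent or expired."""
        entry = self._store.get(key)
        if entry is None:
            return _MISSING
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._store[key]  # drop the stale entry lazily
            return _MISSING
        return value

    def set(self, key: Hashable, value: Any) -> None:
        self._store[key] = (self._clock() + self._ttl, value)


def cached_search(cache: TTLCache, query: str, search_fn: Callable[[str], Any]) -> Any:
    """Return a cached result for query, calling search_fn only on a miss."""
    hit = cache.get(query)
    if hit is not _MISSING:
        return hit
    result = search_fn(query)
    cache.set(query, result)
    return result
```

The injectable clock is a deliberate choice: it lets expiry be tested by advancing a fake time source instead of sleeping. A short TTL keeps answers reasonably fresh when knowledge-base documents change.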
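9.3 Quick Start Guide
- Prepare the environment:
  ```bash
  # Clone the project
  git clone <repository-url>
  cd ai-human-customer-service

  # Install dependencies
  pip install -r requirements.txt

  # Create the environment file
  python -c "from src.core.config import create_env_file; create_env_file()"

  # Edit the .env file to configure the database and other settings
  ```
- Start the services:
  ```bash
  # With Docker Compose (recommended)
  docker-compose up -d

  # Or run directly
  python src/api/main.py
  ```
- Access the services:
  - API documentation: http://localhost:8000/docs
  - Agent console: http://localhost:3000
  - Monitoring dashboard: http://localhost:3001 (if enabled)
- Run the tests:
  ```bash
  pytest tests/ -v
  ```

This system provides a complete foundational framework for an AI customer service solution, and it can be customized and extended to fit specific requirements. Its design takes production availability, scalability, and maintainability into account, making it an enterprise-grade solution that can be put to use right away.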