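Enterprise Architecture Design for an Integrated AI and Human Customer Service System
This article describes an integrated AI and human customer service system built with Python, FastAPI, and Rasa. It uses a microservice architecture and combines NLP intent recognition, sentiment analysis, and vector-based knowledge retrieval. The system supports multi-channel access, real-time WebSocket communication, and agent collaboration. It is deployed with Docker for high availability and scalability, and comes with a complete monitoring and operations plan.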
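1. Project Overview and Architecture Design
1.1 Core Design Philosophy
The system follows an "AI first, human fallback" hybrid interaction model: the AI is meant to handle about 80% of routine queries automatically, while complex issues are handed over to human agents without interrupting the conversation. It is built on a microservice architecture to provide high availability, scalability, and modularity.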
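The "AI first, human fallback" rule can be pictured as a small routing function. The sketch below is illustrative only: `Route`, `Prediction`, `route_message`, the 0.7 threshold, and the choice of `complaint` as an agent-only intent are assumptions made for this example, not the system's actual dispatcher.

```python
from dataclasses import dataclass
from enum import Enum


class Route(str, Enum):
    AI = "ai"
    AGENT = "agent"


@dataclass
class Prediction:
    intent: str
    confidence: float  # expected range: 0.0 to 1.0


def route_message(
    prediction: Prediction,
    threshold: float = 0.7,  # hypothetical default
    agent_only_intents: frozenset = frozenset({"complaint"}),  # hypothetical
) -> Route:
    """Let the AI answer unless the intent is reserved for humans or confidence is low."""
    if prediction.intent in agent_only_intents:
        return Route.AGENT
    if prediction.confidence < threshold:
        return Route.AGENT
    return Route.AI


if __name__ == "__main__":
    for p in (Prediction("greeting", 0.92),
              Prediction("product_inquiry", 0.41),
              Prediction("complaint", 0.99)):
        print(p.intent, "->", route_message(p).value)
```

Keeping the decision in one pure function makes the hand-off policy easy to unit-test and to tune per intent.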
1.2 Overall Architecture
┌───────────────────────────────────────────────────────────────────────────────┐
│                      Client layer (multi-channel access)                      │
│  ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐  │
│  │    Web chat    │ │   Mobile app   │ │     WeChat     │ │   Telephony    │  │
│  └────────────────┘ └────────────────┘ └────────────────┘ └────────────────┘  │
└───────────────────────────────────────┬───────────────────────────────────────┘
                                        │ HTTPS/WebSocket
┌───────────────────────────────────────┴───────────────────────────────────────┐
│                    API gateway layer (unified entry point)                    │
│  ┌─────────────────────────────────────────────────────────────────────────┐  │
│  │  Auth │ Rate limiting │ Protocol conversion │ Routing │ Load balancing  │  │
│  └─────────────────────────────────────────────────────────────────────────┘  │
└───────────────────────────────────────┬───────────────────────────────────────┘
                                        │ Internal RPC/gRPC
┌───────────────────────────────────────┴───────────────────────────────────────┐
│                     Business logic layer (core services)                      │
│  ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐  │
│  │    Dialogue    │ │     Intent     │ │   Knowledge    │ │     Agent      │  │
│  │   management   │ │  recognition   │ │   retrieval    │ │ collaboration  │  │
│  └────────────────┘ └────────────────┘ └────────────────┘ └────────────────┘  │
└───────────────────────────────────────┬───────────────────────────────────────┘
                                        │ Message queue / database access
┌───────────────────────────────────────┴───────────────────────────────────────┐
│                  Data and AI layer (supporting capabilities)                  │
│  ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐  │
│  │   Vector DB    │ │ Relational DB  │ │     Cache      │ │    AI model    │  │
│  │  FAISS/Chroma  │ │PostgreSQL/MySQL│ │     Redis      │ │    services    │  │
│  └────────────────┘ └────────────────┘ └────────────────┘ └────────────────┘  │
└───────────────────────────────────────────────────────────────────────────────┘
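1.3 Technology Stack
| Component | Technology | Rationale |
|---|---|---|
| Backend framework | FastAPI + Python 3.10+ | Asynchronous and high-performance; generates API documentation automatically |
| Dialogue engine | Rasa 3.x + custom extensions | Open-source NLU framework; flexible and controllable |
| AI models | Sentence Transformers, BERT, GPT-2/3 | |
| Vector database | FAISS + PostgreSQL (pgvector) | High-performance similarity search |
| Real-time communication | WebSocket + Redis Pub/Sub | Low-latency message delivery |
| Frontend framework | React + TypeScript + TailwindCSS | Modern, component-based UI |
| Agent desktop | Electron + React | Cross-platform desktop application |
| Deployment | Docker + Kubernetes | Containerized and easy to scale |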
2. Environment Setup and Project Initialization
2.1 Development Environment
mkdir -p ai-human-customer-service
cd ai-human-customer-service
python3.10 -m venv venv
source venv/bin/activate
pip install --upgrade pip
pip install fastapi==0.104.1
pip install uvicorn[standard]==0.24.0
pip install sqlalchemy==2.0.23
pip install asyncpg==0.29.0
pip install redis==5.0.1
pip install pydantic==2.5.0
pip install pydantic-settings==2.1.0
pip install python-multipart==0.0.6
pip install websockets==12.0
pip install aiohttp==3.9.1
pip install jwt==1.3.1
pip install python-dateutil==2.8.2
pip install loguru==0.7.2
mkdir -p {src,tests,docs,scripts,deploy}
mkdir -p src/{api,core,models,schemas,services,utils,ai_components}
mkdir -p src/api/{endpoints,middleware}
mkdir -p src/core/{config,database,security}
mkdir -p src/ai_components/{nlp,retrieval,models}
mkdir -p tests/{unit,integration}
2.2 Configuration File Design
"""System configuration module.
Uses pydantic-settings for configuration management; every field can be overridden by an environment variable.
"""
import secrets
from typing import List, Optional
from pydantic import Field, ValidationInfo, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Application settings."""
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=True, extra="ignore")
    # Application
    APP_NAME: str = "AI-Human Customer Service"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False
    ENVIRONMENT: str = "development"
    API_V1_STR: str = "/api/v1"
    PROJECT_NAME: str = "AI Human Customer Service"
    BACKEND_CORS_ORIGINS: List[str] = ["http://localhost:3000"]
    # Security. Set SECRET_KEY in the environment: the generated default changes on every start.
    SECRET_KEY: str = Field(default_factory=lambda: secrets.token_urlsafe(32))
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 7  # 7 days
    # PostgreSQL
    POSTGRES_SERVER: str = "localhost"
    POSTGRES_USER: str = "postgres"
    POSTGRES_PASSWORD: str = "password"
    POSTGRES_DB: str = "customer_service"
    POSTGRES_PORT: str = "5432"
    # validate_default=True makes the validator run even when the variable is unset.
    DATABASE_URL: Optional[str] = Field(default=None, validate_default=True)
    @field_validator("DATABASE_URL", mode="before")
    @classmethod
    def assemble_db_connection(cls, v: Optional[str], info: ValidationInfo) -> str:
        """Build the database URL from the POSTGRES_* fields unless one is given."""
        if isinstance(v, str):
            return v
        data = info.data
        return (
            f"postgresql+asyncpg://{data.get('POSTGRES_USER')}:"
            f"{data.get('POSTGRES_PASSWORD')}@{data.get('POSTGRES_SERVER')}:"
            f"{data.get('POSTGRES_PORT')}/{data.get('POSTGRES_DB')}"
        )
    # Redis
    REDIS_HOST: str = "localhost"
    REDIS_PORT: int = 6379
    REDIS_DB: int = 0
    REDIS_PASSWORD: Optional[str] = None
    REDIS_URL: Optional[str] = Field(default=None, validate_default=True)
    @field_validator("REDIS_URL", mode="before")
    @classmethod
    def assemble_redis_connection(cls, v: Optional[str], info: ValidationInfo) -> str:
        """Build the Redis URL from the REDIS_* fields unless one is given."""
        if isinstance(v, str):
            return v
        data = info.data
        password = data.get("REDIS_PASSWORD")
        auth_part = f":{password}@" if password else ""
        return (
            f"redis://{auth_part}{data.get('REDIS_HOST')}:"
            f"{data.get('REDIS_PORT')}/{data.get('REDIS_DB')}"
        )
    # AI models
    AI_MODEL_PATH: str = "./models"
    SENTENCE_TRANSFORMER_MODEL: str = "all-MiniLM-L6-v2"
    BERT_MODEL_PATH: str = "bert-base-uncased"
    GPT_MODEL_PATH: str = "gpt2"
    USE_GPU: bool = True
    AI_MODEL_CACHE_SIZE: int = 100
    # Conversations
    MAX_CONVERSATION_HISTORY: int = 20
    SESSION_TIMEOUT_MINUTES: int = 30
    DEFAULT_AI_CONFIDENCE_THRESHOLD: float = 0.7
    # Agents
    MAX_AGENTS_PER_QUEUE: int = 10
    AGENT_HEARTBEAT_INTERVAL: int = 30
    AGENT_AWAY_TIMEOUT: int = 300
    # Message queue
    RABBITMQ_HOST: str = "localhost"
    RABBITMQ_PORT: int = 5672
    RABBITMQ_USER: str = "guest"
    RABBITMQ_PASSWORD: str = "guest"
    # Monitoring and logging
    ENABLE_METRICS: bool = True
    METRICS_PORT: int = 9090
    LOG_LEVEL: str = "INFO"
settings = Settings()
ENV_TEMPLATE = """# Application
ENVIRONMENT=development
DEBUG=True
# Database
POSTGRES_SERVER=localhost
POSTGRES_USER=postgres
POSTGRES_PASSWORD=your_password
POSTGRES_DB=customer_service
POSTGRES_PORT=5432
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
# AI models
SENTENCE_TRANSFORMER_MODEL=all-MiniLM-L6-v2
USE_GPU=False
# Security
SECRET_KEY={secret_key}
"""
def create_env_file():
    """Write a .env file containing a freshly generated SECRET_KEY."""
    with open(".env", "w", encoding="utf-8") as f:
        secret_key = secrets.token_urlsafe(32)
        f.write(ENV_TEMPLATE.format(secret_key=secret_key))
    print("✅ .env file created")
if __name__ == "__main__":
    create_env_file()
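One detail worth keeping in mind when a connection URL is assembled from separate fields: a password containing characters such as `@`, `:`, or `/` has to be percent-encoded, otherwise the URL parses incorrectly. The helper below is a minimal, standard-library-only sketch of that idea; `build_postgres_url` is a hypothetical name and is not part of the configuration module above.

```python
from urllib.parse import quote_plus


def build_postgres_url(user: str, password: str, host: str, port: int, db: str,
                       driver: str = "asyncpg") -> str:
    """Assemble a SQLAlchemy-style PostgreSQL URL, escaping the credentials."""
    return (
        f"postgresql+{driver}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{db}"
    )


if __name__ == "__main__":
    # The password below is made up to show the escaping.
    print(build_postgres_url("postgres", "p@ss:word/1", "localhost", 5432, "customer_service"))
```

The same escaping could be applied inside the `DATABASE_URL` validator if passwords with special characters are expected.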
3. Core Data Model Design
3.1 Database Model Definitions
"""Base database model definitions.
Data modeling uses the SQLAlchemy ORM.
"""
from datetime import datetime
from typing import Any, Dict, Optional
from sqlalchemy import Column, DateTime, Integer, String, Boolean, Text, JSON
# declarative_base lives in sqlalchemy.orm; the sqlalchemy.ext.declarative path is deprecated in 2.0.
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func
Base = declarative_base()
class TimestampMixin:
    """Mixin that adds timestamp columns."""
    created_at = Column(DateTime, default=func.now(), nullable=False)
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now(), nullable=False)
    deleted_at = Column(DateTime, nullable=True)
class Customer(Base, TimestampMixin):
    """Customer model."""
    __tablename__ = "customers"
    id = Column(String(36), primary_key=True, index=True)
    external_id = Column(String(100), unique=True, nullable=True, index=True)
    name = Column(String(200), nullable=True)
    email = Column(String(254), nullable=True, index=True)
    phone = Column(String(50), nullable=True, index=True)
    # "metadata" is reserved by SQLAlchemy's declarative API, so the attribute
    # is renamed while the database column keeps the name "metadata".
    extra_metadata = Column("metadata", JSON, nullable=True, default=dict)
    tags = Column(JSON, nullable=True, default=list)
    segment = Column(String(50), nullable=True)
    lifetime_value = Column(Integer, default=0)
    last_interaction_at = Column(DateTime, nullable=True)
    is_active = Column(Boolean, default=True)
    notes = Column(Text, nullable=True)
    def to_dict(self) -> Dict[str, Any]:
        """Convert the row to a dictionary."""
        return {
            "id": self.id,
            "external_id": self.external_id,
            "name": self.name,
            "email": self.email,
            "phone": self.phone,
            "metadata": self.extra_metadata or {},
            "tags": self.tags or [],
            "segment": self.segment,
            "lifetime_value": self.lifetime_value,
            "last_interaction_at": self.last_interaction_at.isoformat() if self.last_interaction_at else None,
            "is_active": self.is_active,
            "notes": self.notes,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }
class Conversation(Base, TimestampMixin):
    """Conversation model."""
    __tablename__ = "conversations"
    id = Column(String(36), primary_key=True, index=True)
    customer_id = Column(String(36), index=True, nullable=False)
    channel = Column(String(50), nullable=False)
    status = Column(String(20), default="active")
    assigned_agent_id = Column(String(36), nullable=True, index=True)
    assigned_ai_model = Column(String(100), nullable=True)
    # "metadata" is reserved by SQLAlchemy's declarative API, so the attribute
    # is renamed while the database column keeps the name "metadata".
    extra_metadata = Column("metadata", JSON, nullable=True, default=dict)
    sentiment_score = Column(Integer, nullable=True)
    satisfaction_score = Column(Integer, nullable=True)
    closed_at = Column(DateTime, nullable=True)
    close_reason = Column(String(100), nullable=True)
    def to_dict(self) -> Dict[str, Any]:
        """Convert the row to a dictionary."""
        return {
            "id": self.id,
            "customer_id": self.customer_id,
            "channel": self.channel,
            "status": self.status,
            "assigned_agent_id": self.assigned_agent_id,
            "assigned_ai_model": self.assigned_ai_model,
            "metadata": self.extra_metadata or {},
            "sentiment_score": self.sentiment_score,
            "satisfaction_score": self.satisfaction_score,
            "closed_at": self.closed_at.isoformat() if self.closed_at else None,
            "close_reason": self.close_reason,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }
class Message(Base, TimestampMixin):
    """Message model."""
    __tablename__ = "messages"
    id = Column(String(36), primary_key=True, index=True)
    conversation_id = Column(String(36), index=True, nullable=False)
    sender_type = Column(String(20), nullable=False)
    sender_id = Column(String(36), nullable=True)
    content = Column(Text, nullable=False)
    content_type = Column(String(20), default="text")
    # "metadata" is reserved by SQLAlchemy's declarative API, so the attribute
    # is renamed while the database column keeps the name "metadata".
    extra_metadata = Column("metadata", JSON, nullable=True, default=dict)
    intent = Column(String(100), nullable=True)
    confidence = Column(Integer, nullable=True)
    is_read = Column(Boolean, default=False)
    read_at = Column(DateTime, nullable=True)
    ai_model_used = Column(String(100), nullable=True)
    ai_confidence = Column(Integer, nullable=True)
    ai_metadata = Column(JSON, nullable=True, default=dict)
    def to_dict(self) -> Dict[str, Any]:
        """Convert the row to a dictionary."""
        return {
            "id": self.id,
            "conversation_id": self.conversation_id,
            "sender_type": self.sender_type,
            "sender_id": self.sender_id,
            "content": self.content,
            "content_type": self.content_type,
            "metadata": self.extra_metadata or {},
            "intent": self.intent,
            "confidence": self.confidence,
            "is_read": self.is_read,
            "read_at": self.read_at.isoformat() if self.read_at else None,
            "ai_model_used": self.ai_model_used,
            "ai_confidence": self.ai_confidence,
            "ai_metadata": self.ai_metadata or {},
            "created_at": self.created_at.isoformat(),
        }
class Agent(Base, TimestampMixin):
    """Agent (human operator) model."""
    __tablename__ = "agents"
    id = Column(String(36), primary_key=True, index=True)
    user_id = Column(String(36), unique=True, nullable=False)
    name = Column(String(200), nullable=False)
    email = Column(String(254), unique=True, nullable=False)
    status = Column(String(20), default="offline")
    skills = Column(JSON, nullable=True, default=list)
    current_conversation_ids = Column(JSON, nullable=True, default=list)
    max_concurrent_chats = Column(Integer, default=3)
    # "metadata" is reserved by SQLAlchemy's declarative API, so the attribute
    # is renamed while the database column keeps the name "metadata".
    extra_metadata = Column("metadata", JSON, nullable=True, default=dict)
    last_active_at = Column(DateTime, nullable=True)
    availability_schedule = Column(JSON, nullable=True)
    performance_score = Column(Integer, default=0)
    is_active = Column(Boolean, default=True)
    def to_dict(self) -> Dict[str, Any]:
        """Convert the row to a dictionary."""
        return {
            "id": self.id,
            "user_id": self.user_id,
            "name": self.name,
            "email": self.email,
            "status": self.status,
            "skills": self.skills or [],
            "current_conversation_ids": self.current_conversation_ids or [],
            "max_concurrent_chats": self.max_concurrent_chats,
            "metadata": self.extra_metadata or {},
            "last_active_at": self.last_active_at.isoformat() if self.last_active_at else None,
            "availability_schedule": self.availability_schedule or {},
            "performance_score": self.performance_score,
            "is_active": self.is_active,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }
class KnowledgeBase(Base, TimestampMixin):
    """Knowledge base entry model."""
    __tablename__ = "knowledge_base"
    id = Column(String(36), primary_key=True, index=True)
    title = Column(String(500), nullable=False)
    content = Column(Text, nullable=False)
    category = Column(String(100), nullable=True, index=True)
    tags = Column(JSON, nullable=True, default=list)
    language = Column(String(10), default="zh")
    is_active = Column(Boolean, default=True)
    vector_embedding = Column(JSON, nullable=True)
    # "metadata" is reserved by SQLAlchemy's declarative API, so the attribute
    # is renamed while the database column keeps the name "metadata".
    extra_metadata = Column("metadata", JSON, nullable=True, default=dict)
    usage_count = Column(Integer, default=0)
    last_used_at = Column(DateTime, nullable=True)
    def to_dict(self) -> Dict[str, Any]:
        """Convert the row to a dictionary (the embedding is left out)."""
        return {
            "id": self.id,
            "title": self.title,
            "content": self.content,
            "category": self.category,
            "tags": self.tags or [],
            "language": self.language,
            "is_active": self.is_active,
            "metadata": self.extra_metadata or {},
            "usage_count": self.usage_count,
            "last_used_at": self.last_used_at.isoformat() if self.last_used_at else None,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }
class Intent(Base, TimestampMixin):
    """Intent model."""
    __tablename__ = "intents"
    id = Column(String(36), primary_key=True, index=True)
    name = Column(String(100), nullable=False, unique=True)
    description = Column(Text, nullable=True)
    examples = Column(JSON, nullable=True, default=list)
    handler_type = Column(String(50), default="ai")
    confidence_threshold = Column(Integer, default=70)  # percentage (0-100)
    # "metadata" is reserved by SQLAlchemy's declarative API, so the attribute
    # is renamed while the database column keeps the name "metadata".
    extra_metadata = Column("metadata", JSON, nullable=True, default=dict)
    is_active = Column(Boolean, default=True)
    def to_dict(self) -> Dict[str, Any]:
        """Convert the row to a dictionary."""
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "examples": self.examples or [],
            "handler_type": self.handler_type,
            "confidence_threshold": self.confidence_threshold,
            "metadata": self.extra_metadata or {},
            "is_active": self.is_active,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }
class AIResponseLog(Base, TimestampMixin):
    """AI response log."""
    __tablename__ = "ai_response_logs"
    id = Column(String(36), primary_key=True, index=True)
    conversation_id = Column(String(36), index=True, nullable=False)
    message_id = Column(String(36), index=True, nullable=False)
    ai_model = Column(String(100), nullable=False)
    prompt = Column(Text, nullable=False)
    response = Column(Text, nullable=False)
    confidence = Column(Integer, nullable=True)
    processing_time_ms = Column(Integer, nullable=True)
    tokens_used = Column(Integer, nullable=True)
    # "metadata" is reserved by SQLAlchemy's declarative API, so the attribute
    # is renamed while the database column keeps the name "metadata".
    extra_metadata = Column("metadata", JSON, nullable=True, default=dict)
    def to_dict(self) -> Dict[str, Any]:
        """Convert the row to a dictionary."""
        return {
            "id": self.id,
            "conversation_id": self.conversation_id,
            "message_id": self.message_id,
            "ai_model": self.ai_model,
            "prompt": self.prompt,
            "response": self.response,
            "confidence": self.confidence,
            "processing_time_ms": self.processing_time_ms,
            "tokens_used": self.tokens_used,
            "metadata": self.extra_metadata or {},
            "created_at": self.created_at.isoformat(),
        }
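Every model above uses a `String(36)` primary key, which happens to be the length of a canonical UUID string. The article does not show where IDs are generated, so the helper below is only a sketch under the assumption that they are created in application code; `new_id` is a hypothetical name.

```python
import uuid


def new_id() -> str:
    """Return a random UUID4 in its canonical 36-character form."""
    return str(uuid.uuid4())


if __name__ == "__main__":
    generated = new_id()
    print(generated, len(generated))
```

Such a function could also be passed as a column default (`default=new_id`) so rows created without an explicit ID still receive one.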
3.2 Database Initialization Script
Create scripts/init_database.py:
"""Database initialization script.
Creates the table structure and seeds the initial data.
"""
import asyncio
import json
import sys
from pathlib import Path
# Make the project root importable when the script is run directly.
sys.path.append(str(Path(__file__).parent.parent))
from sqlalchemy import text
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from src.core.config import settings
from src.models.base import Base
from src.models import *  # registers every model on Base.metadata
async def init_database():
    """Create the tables and seed the initial data."""
    print("🚀 Initializing database...")
    # create_async_engine needs an async driver, so the postgresql+asyncpg://
    # URL is used unchanged.
    engine = create_async_engine(
        settings.DATABASE_URL,
        echo=settings.DEBUG,
        pool_pre_ping=True,
        pool_size=20,
        max_overflow=30,
    )
    print("📦 Creating tables...")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    print("✅ Tables created")
    print("📝 Seeding initial data...")
    async_session = async_sessionmaker(engine, expire_on_commit=False)
    async with async_session() as session:
        try:
            sample_intents = [
                {"id": "greeting", "name": "greeting", "description": "Customer greeting",
                 "examples": ["你好", "嗨", "早上好", "在吗", "有人吗"],
                 "handler_type": "ai", "confidence_threshold": 60},
                {"id": "product_inquiry", "name": "product_inquiry", "description": "Product inquiry",
                 "examples": ["这个产品多少钱", "有什么功能", "怎么使用", "有什么优惠", "什么时候发货"],
                 "handler_type": "ai", "confidence_threshold": 70},
                {"id": "technical_support", "name": "technical_support", "description": "Technical support",
                 "examples": ["无法登录", "系统错误", "怎么重置密码", "连接失败", "闪退问题"],
                 "handler_type": "hybrid", "confidence_threshold": 65},
                {"id": "complaint", "name": "complaint", "description": "Complaints and suggestions",
                 "examples": ["我要投诉", "服务太差", "质量有问题", "退款", "赔偿"],
                 "handler_type": "agent", "confidence_threshold": 75},
                {"id": "payment_issue", "name": "payment_issue", "description": "Payment issues",
                 "examples": ["付款失败", "重复扣款", "退款未到账", "支付方式", "发票问题"],
                 "handler_type": "hybrid", "confidence_threshold": 70},
            ]
            for intent_data in sample_intents:
                # Raw SQL bypasses the ORM's JSON handling, so serialize the list first.
                params = {**intent_data, "examples": json.dumps(intent_data["examples"], ensure_ascii=False)}
                await session.execute(text("""
                    INSERT INTO intents (id, name, description, examples, handler_type, confidence_threshold, created_at, updated_at)
                    VALUES (:id, :name, :description, :examples, :handler_type, :confidence_threshold, NOW(), NOW())
                    ON CONFLICT (id) DO NOTHING
                """), params)
            knowledge_entries = [
                {"id": "kb_welcome", "title": "欢迎信息",
                 "content": "欢迎使用我们的客服系统!我是 AI 助手,可以帮您解答问题。如需人工服务,请告诉我。",
                 "category": "general", "tags": ["welcome", "greeting"], "language": "zh"},
                {"id": "kb_product_info", "title": "产品基本信息",
                 "content": "我们的产品提供以下功能:1. 智能对话 2. 多语言支持 3. 24 小时在线 4. 人工转接。价格根据套餐不同有所差异。",
                 "category": "product", "tags": ["product", "features", "pricing"], "language": "zh"},
                {"id": "kb_technical_help", "title": "常见技术问题",
                 "content": "如果您遇到技术问题:1. 尝试刷新页面 2. 清除缓存 3. 检查网络连接 4. 重启应用。如果问题持续,请联系技术支持。",
                 "category": "technical", "tags": ["troubleshooting", "help", "support"], "language": "zh"},
                {"id": "kb_refund_policy", "title": "退款政策",
                 "content": "我们的退款政策:购买后 30 天内可申请退款,需提供订单号和退款原因。退款将在 7-14 个工作日内处理。",
                 "category": "policy", "tags": ["refund", "policy", "money"], "language": "zh"},
            ]
            for kb_data in knowledge_entries:
                params = {**kb_data, "tags": json.dumps(kb_data["tags"], ensure_ascii=False)}
                await session.execute(text("""
                    INSERT INTO knowledge_base (id, title, content, category, tags, language, is_active, created_at, updated_at)
                    VALUES (:id, :title, :content, :category, :tags, :language, true, NOW(), NOW())
                    ON CONFLICT (id) DO NOTHING
                """), params)
            await session.commit()
            print("✅ Initial data created")
        except Exception as e:
            await session.rollback()
            print(f"❌ Failed to create initial data: {e}")
            raise
    await engine.dispose()
    print("🎉 Database initialization complete!")
async def check_database_connection():
    """Check that the database is reachable."""
    print("🔍 Checking database connection...")
    try:
        engine = create_async_engine(settings.DATABASE_URL)
        async with engine.connect() as conn:
            result = await conn.execute(text("SELECT version()"))
            version = result.scalar()
        print(f"✅ Database connection succeeded: {version}")
        await engine.dispose()
        return True
    except Exception as e:
        print(f"❌ Database connection failed: {e}")
        return False
async def main():
    """Entry point."""
    print("=" * 50)
    print("Database initialization tool")
    print("=" * 50)
    if not await check_database_connection():
        print("Please check the database configuration and connection")
        return
    await init_database()
if __name__ == "__main__":
    asyncio.run(main())
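When seed rows are written with raw SQL rather than through the ORM, Python lists and dicts destined for JSON columns generally have to be serialized by hand, since the driver is then handed plain parameters. The helper below sketches that preparation step using only the standard library; `encode_json_params` is a hypothetical name, and whether a given driver needs this depends on its JSON codec configuration.

```python
import json
from typing import Any, Dict, Iterable


def encode_json_params(row: Dict[str, Any], json_fields: Iterable[str]) -> Dict[str, Any]:
    """Return a copy of row with the named fields serialized to JSON text."""
    encoded = dict(row)
    for field in json_fields:
        if field in encoded and not isinstance(encoded[field], str):
            # ensure_ascii=False keeps Chinese examples readable in the database.
            encoded[field] = json.dumps(encoded[field], ensure_ascii=False)
    return encoded


if __name__ == "__main__":
    row = {"id": "greeting", "examples": ["你好", "嗨"]}
    print(encode_json_params(row, ["examples"]))
```

The input dictionary is left untouched, so the same seed data can still be reused elsewhere in its structured form.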
4. Core AI Component Implementation
4.1 NLP Processor (Intent Recognition and Sentiment Analysis)
Create src/ai_components/nlp/processor.py:
"""NLP processor.
Handles intent recognition, sentiment analysis, entity extraction, and related natural language processing tasks.
"""
import asyncio
import logging
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
import numpy as np
from datetime import datetime
try:
    import jieba
    import jieba.analyse
    JIEBA_AVAILABLE = True
except ImportError:
    JIEBA_AVAILABLE = False
    print("⚠️ jieba is not installed; Chinese word segmentation will be limited")
try:
    from transformers import (
        AutoTokenizer, AutoModelForSequenceClassification, pipeline, BertTokenizer, BertModel
    )
    from sentence_transformers import SentenceTransformer
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    print("⚠️ transformers/sentence-transformers is not installed")
from src.core.config import settings
@dataclass
class IntentResult:
    """Intent recognition result."""
    intent: str
    confidence: float
    entities: Dict[str, Any]
    alternatives: List[Dict[str, Any]]  # each item: {"intent": str, "score": float}
@dataclass
class SentimentResult:
    """Sentiment analysis result."""
    sentiment: str
    score: float
    confidence: float
    emotions: Dict[str, float]
@dataclass
class NLPAnalysis:
    """Combined NLP analysis result."""
    text: str
    intent_result: IntentResult
    sentiment_result: SentimentResult
    entities: Dict[str, Any]
    keywords: List[str]
    language: str
    processing_time_ms: float
class NLPProcessor:
    """Main NLP processor class."""
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.initialized = False
        self.models = {}
        self.tokenizers = {}
        self.confidence_threshold = 0.6
        self.max_sequence_length = 512
    async def initialize(self):
        """Load the NLP models (idempotent)."""
        if self.initialized:
            return
        self.logger.info("Initializing NLP processor...")
        try:
            if JIEBA_AVAILABLE:
                # ./data/dict.txt must exist; jieba raises if the file is missing.
                jieba.set_dictionary('./data/dict.txt')
                jieba.initialize()
            if TRANSFORMERS_AVAILABLE:
                self.logger.info("Loading Sentence Transformer model...")
                self.sentence_model = SentenceTransformer(
                    settings.SENTENCE_TRANSFORMER_MODEL,
                    cache_folder=settings.AI_MODEL_PATH
                )
                self.logger.info("Loading sentiment analysis model...")
                self.sentiment_analyzer = pipeline(
                    "sentiment-analysis",
                    model="nlptown/bert-base-multilingual-uncased-sentiment",
                    device=0 if settings.USE_GPU else -1,
                )
                self.logger.info("Loading BERT model...")
                self.bert_tokenizer = BertTokenizer.from_pretrained(
                    settings.BERT_MODEL_PATH,
                    cache_dir=settings.AI_MODEL_PATH
                )
                self.bert_model = BertModel.from_pretrained(
                    settings.BERT_MODEL_PATH,
                    cache_dir=settings.AI_MODEL_PATH
                )
                if settings.USE_GPU:
                    self.bert_model.cuda()
            self.intent_patterns = self._load_intent_patterns()
            self.initialized = True
            self.logger.info("NLP processor initialized")
        except Exception as e:
            self.logger.error(f"NLP processor initialization failed: {e}")
            raise
    def _load_intent_patterns(self) -> Dict[str, List[str]]:
        """Return the intent keyword patterns (in production these should come from the database)."""
        return {
            "greeting": ["你好", "嗨", "早上好", "晚上好", "在吗", "有人吗"],
            "farewell": ["再见", "拜拜", "谢谢", "结束", "好了"],
            "product_inquiry": ["产品", "价格", "功能", "特性", "规格", "多少钱"],
            "technical_support": ["问题", "错误", "故障", "无法", "不能", "帮助"],
            "complaint": ["投诉", "不满", "差评", "退款", "赔偿", "生气"],
            "payment_issue": ["支付", "付款", "扣款", "退款", "发票", "账单"],
            "schedule": ["时间", "预约", "安排", "什么时候", "多久"],
            "location": ["地址", "位置", "在哪里", "怎么去", "门店"],
            "account": ["登录", "注册", "密码", "账户", "会员"],
            "other": []
        }
    async def analyze_text(self, text: str, context: Optional[Dict] = None) -> NLPAnalysis:
        """
        Run the full analysis on a piece of text.
        Args:
            text: text to analyze
            context: conversation context
        Returns:
            NLPAnalysis object
        """
        start_time = datetime.now()
        if not self.initialized:
            await self.initialize()
        language = self._detect_language(text)
        # Run the four analyses together and collect their results in order.
        intent_result, sentiment_result, entities, keywords = await asyncio.gather(
            self.detect_intent(text, context),
            self.analyze_sentiment(text),
            self.extract_entities(text),
            self.extract_keywords(text, language),
        )
        processing_time = (datetime.now() - start_time).total_seconds() * 1000
        return NLPAnalysis(
            text=text,
            intent_result=intent_result,
            sentiment_result=sentiment_result,
            entities=entities,
            keywords=keywords,
            language=language,
            processing_time_ms=processing_time
        )
    async def detect_intent(self, text: str, context: Optional[Dict] = None) -> IntentResult:
        """
        Detect the user's intent.
        Args:
            text: user input
            context: conversation context
        Returns:
            IntentResult object
        """
        try:
            # Combine keyword rules with embedding similarity.
            rule_based_result = self._rule_based_intent_detection(text)
            similarity_based_result = await self._similarity_based_intent_detection(text)
            final_intent, final_confidence, alternatives = self._fuse_intent_results(
                rule_based_result, similarity_based_result
            )
            entities = await self.extract_entities(text)
            if context:
                final_intent = self._apply_context_rules(final_intent, context)
            return IntentResult(
                intent=final_intent,
                confidence=final_confidence,
                entities=entities,
                alternatives=alternatives
            )
        except Exception as e:
            self.logger.error(f"Intent detection failed: {e}")
            return IntentResult(intent="unknown", confidence=0.0, entities={}, alternatives=[])
    def _rule_based_intent_detection(self, text: str) -> Tuple[str, float]:
        """Rule-based intent detection: score each intent by the share of its keywords found in the text."""
        text_lower = text.lower()
        best_intent = "other"
        best_score = 0.0
        for intent, patterns in self.intent_patterns.items():
            score = 0
            for pattern in patterns:
                if pattern in text_lower:
                    score += 1
            if score > 0:
                pattern_count = len(patterns)
                normalized_score = score / pattern_count
                if normalized_score > best_score:
                    best_score = normalized_score
                    best_intent = intent
        return best_intent, best_score
    async def _similarity_based_intent_detection(self, text: str) -> Tuple[str, float]:
        """Similarity-based intent detection using sentence embeddings."""
        if not TRANSFORMERS_AVAILABLE:
            return "other", 0.0
        try:
            # Unit-length embeddings make the dot product a cosine similarity in [-1, 1].
            text_embedding = self.sentence_model.encode(text, normalize_embeddings=True)
            intent_scores = {}
            for intent, examples in self.intent_patterns.items():
                if not examples:
                    continue
                example_embeddings = self.sentence_model.encode(examples, normalize_embeddings=True)
                similarities = np.dot(example_embeddings, text_embedding)
                max_similarity = np.max(similarities)
                # Map [-1, 1] to [0, 1].
                normalized_score = float((max_similarity + 1) / 2)
                intent_scores[intent] = normalized_score
            if intent_scores:
                best_intent = max(intent_scores, key=intent_scores.get)
                best_score = intent_scores[best_intent]
                return best_intent, best_score
            return "other", 0.0
        except Exception as e:
            self.logger.error(f"Similarity-based intent detection failed: {e}")
            return "other", 0.0
    def _fuse_intent_results(self, rule_result: Tuple[str, float], similarity_result: Tuple[str, float]) -> Tuple[str, float, List[Dict]]:
        """Fuse the rule-based and similarity-based results with fixed weights."""
        rule_intent, rule_score = rule_result
        sim_intent, sim_score = similarity_result
        rule_weight = 0.6
        sim_weight = 0.4
        scores = {}
        scores[rule_intent] = rule_score * rule_weight
        # When both methods agree, their weighted scores add up.
        scores[sim_intent] = scores.get(sim_intent, 0) + sim_score * sim_weight
        best_intent = max(scores, key=scores.get)
        best_score = scores[best_intent]
        alternatives = [
            {"intent": intent, "score": score}
            for intent, score in scores.items()
            if intent != best_intent and score > 0.1
        ]
        alternatives.sort(key=lambda x: x["score"], reverse=True)
        return best_intent, best_score, alternatives
    def _apply_context_rules(self, intent: str, context: Dict) -> str:
        """Adjust the intent using conversation context rules."""
        if intent == "greeting" and context.get("greeting_count", 0) >= 3:
            return "technical_support"
        if intent == "schedule" and context.get("last_intent") == "payment_issue":
            return "shipping_inquiry"
        return intent
    async def analyze_sentiment(self, text: str) -> SentimentResult:
        """
        Analyze the sentiment of a piece of text.
        Args:
            text: text to analyze
        Returns:
            SentimentResult object
        """
        try:
            if not TRANSFORMERS_AVAILABLE:
                return self._simple_sentiment_analysis(text)
            # truncation=True keeps long inputs within the model's maximum length.
            results = self.sentiment_analyzer(text, truncation=True)
            if results:
                result = results[0]
                label = result['label']
                score = result['score']
                # The nlptown model returns "1 star" .. "5 stars"; "3 stars" falls through to neutral.
                if label in ["POSITIVE", "5 stars", "4 stars"]:
                    sentiment = "positive"
                elif label in ["NEGATIVE", "1 star", "2 stars"]:
                    sentiment = "negative"
                else:
                    sentiment = "neutral"
                emotions = self._analyze_emotions(text)
                return SentimentResult(
                    sentiment=sentiment,
                    score=score,
                    confidence=score,
                    emotions=emotions
                )
            return SentimentResult(sentiment="neutral", score=0.5, confidence=0.0, emotions={})
        except Exception as e:
            self.logger.error(f"Sentiment analysis failed: {e}")
            return SentimentResult(sentiment="neutral", score=0.5, confidence=0.0, emotions={})
    def _simple_sentiment_analysis(self, text: str) -> SentimentResult:
        """Simple keyword-based sentiment analysis (fallback)."""
        positive_words = ["好", "不错", "满意", "喜欢", "棒", "赞", "感谢"]
        negative_words = ["差", "不好", "不满意", "讨厌", "垃圾", "投诉", "生气"]
        positive_count = sum(1 for word in positive_words if word in text)
        negative_count = sum(1 for word in negative_words if word in text)
        total = positive_count + negative_count
        if total == 0:
            return SentimentResult(sentiment="neutral", score=0.5, confidence=0.0, emotions={})
        sentiment_score = positive_count / total
        if sentiment_score > 0.6:
            sentiment = "positive"
        elif sentiment_score < 0.4:
            sentiment = "negative"
        else:
            sentiment = "neutral"
        return SentimentResult(
            sentiment=sentiment,
            score=sentiment_score,
            # Confidence grows with the distance from the ambiguous midpoint 0.5.
            confidence=abs(sentiment_score - 0.5) * 2,
            emotions={}
        )
    def _analyze_emotions(self, text: str) -> Dict[str, float]:
        """Score fine-grained emotions by keyword matching."""
        emotion_keywords = {
            "happy": ["开心", "高兴", "快乐", "愉快", "兴奋"],
            "angry": ["生气", "愤怒", "恼火", "不爽", "气愤"],
            "sad": ["伤心", "难过", "悲伤", "失望", "沮丧"],
            "surprise": ["惊讶", "惊奇", "意外", "没想到"],
            "fear": ["害怕", "担心", "恐惧", "紧张"],
            "disgust": ["恶心", "讨厌", "嫌弃", "反感"]
        }
        emotions = {}
        for emotion, keywords in emotion_keywords.items():
            count = sum(1 for keyword in keywords if keyword in text)
            if count > 0:
                emotions[emotion] = count / len(keywords)
        return emotions
    async def extract_entities(self, text: str) -> Dict[str, Any]:
        """
        Extract named entities.
        Args:
            text: text to analyze
        Returns:
            dictionary of entities
        """
        import re
        entities = {"products": [], "numbers": [], "dates": [], "locations": [], "people": [], "organizations": []}
        numbers = re.findall(r'\d+', text)
        entities["numbers"] = numbers
        product_keywords = ["产品", "服务", "软件", "系统", "应用"]
        for keyword in product_keywords:
            if keyword in text:
                # Keep a window of up to 10 characters on each side of the keyword.
                idx = text.find(keyword)
                start = max(0, idx - 10)
                end = min(len(text), idx + 10)
                entities["products"].append(text[start:end])
        date_patterns = [
            # Optional year, so a full date is not also matched a second time as month/day.
            r'(?:\d{4}年)?\d{1,2}月\d{1,2}日',
            r'\d{4}-\d{2}-\d{2}',
            # No spaces around "|": a space would become part of each alternative.
            r'今天|明天|后天|昨天',
        ]
        for pattern in date_patterns:
            dates = re.findall(pattern, text)
            entities["dates"].extend(dates)
        return entities
    async def extract_keywords(self, text: str, language: str = "zh") -> List[str]:
        """
        Extract keywords.
        Args:
            text: text to analyze
            language: language code
        Returns:
            list of keywords
        """
        if language == "zh" and JIEBA_AVAILABLE:
            keywords = jieba.analyse.extract_tags(
                text, topK=10, withWeight=False, allowPOS=('n', 'nr', 'ns', 'nt', 'nz', 'v', 'a'))
            return keywords
        else:
            words = text.lower().split()
            stop_words = {"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for"}
            keywords = [word for word in words if word not in stop_words]
            return keywords[:10]
    def _detect_language(self, text: str) -> str:
        """Detect the text language ("zh" if any CJK character is present, else "en" for Latin letters)."""
        import re
        if re.search(r'[\u4e00-\u9fff]', text):
            return "zh"
        if re.search(r'[a-zA-Z]', text):
            return "en"
        return "zh"
    async def get_text_embedding(self, text: str) -> np.ndarray:
        """
        Get the vector embedding of a piece of text.
        Args:
            text: input text
        Returns:
            embedding vector
        """
        if not TRANSFORMERS_AVAILABLE:
            raise RuntimeError("the transformers library is not installed")
        if not self.initialized:
            await self.initialize()
        embedding = self.sentence_model.encode(text)
        return embedding
    async def batch_analyze(self, texts: List[str]) -> List[NLPAnalysis]:
        """
        Analyze a batch of texts.
        Args:
            texts: list of texts
        Returns:
            list of analysis results
        """
        tasks = [self.analyze_text(text) for text in texts]
        results = await asyncio.gather(*tasks)
        return results
nlp_processor = NLPProcessor()
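To make the intent fusion step easier to follow, here is a small standalone walk-through of the rule-based scoring and the 0.6/0.4 weighted fusion used by the processor. It is a simplified sketch: the pattern table is trimmed, `rule_score` and `fuse` are hypothetical names, and the similarity scores are made-up inputs rather than real model output.

```python
from typing import Dict, List, Tuple

# Trimmed, illustrative subset of the keyword patterns.
INTENT_PATTERNS: Dict[str, List[str]] = {
    "greeting": ["你好", "嗨", "在吗"],
    "payment_issue": ["支付", "付款", "扣款", "退款"],
    "complaint": ["投诉", "不满", "退款", "赔偿"],
}


def rule_score(text: str) -> Tuple[str, float]:
    """Pick the intent with the largest share of its keywords present in the text."""
    best_intent, best = "other", 0.0
    for intent, patterns in INTENT_PATTERNS.items():
        hits = sum(1 for p in patterns if p in text)
        score = hits / len(patterns)
        if score > best:
            best_intent, best = intent, score
    return best_intent, best


def fuse(rule: Tuple[str, float], sim: Tuple[str, float],
         rule_weight: float = 0.6, sim_weight: float = 0.4) -> Tuple[str, float]:
    """Weighted fusion: scores add up when both methods name the same intent."""
    scores: Dict[str, float] = {}
    for (intent, score), weight in ((rule, rule_weight), (sim, sim_weight)):
        scores[intent] = scores.get(intent, 0.0) + score * weight
    best = max(scores, key=scores.get)
    return best, scores[best]


if __name__ == "__main__":
    rule = rule_score("我要投诉,要求退款和赔偿")
    print(rule)
    print(fuse(rule, ("complaint", 0.8)))       # both methods agree (similarity score is made up)
    print(fuse(rule, ("payment_issue", 0.9)))   # the methods disagree
```

Because the rule score is a fraction of matched keywords, intents with long keyword lists rarely reach high scores; this is one reason a fused confidence may stay well below 1.0 even for clear-cut inputs.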
4.2 Knowledge Retrieval System
Create src/ai_components/retrieval/engine.py:
"""Knowledge retrieval engine.
Retrieves relevant information from the knowledge base by vector similarity.
"""
import asyncio
import logging
import json
from typing import List, Dict, Optional, Any, Tuple
from datetime import datetime
import numpy as np
from dataclasses import dataclass
try:
    import faiss
    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False
    print("⚠️ faiss is not installed; vector search will be limited")
try:
    import chromadb
    from chromadb.config import Settings
    CHROMA_AVAILABLE = True
except ImportError:
    CHROMA_AVAILABLE = False
    print("⚠️ chromadb is not installed; vector database features will be limited")
from src.core.config import settings
from src.ai_components.nlp.processor import nlp_processor
@dataclass
class SearchResult:
    """A single search hit."""
    id: str
    content: str
    score: float
    metadata: Dict[str, Any]
    source: str
@dataclass
class RetrievalResponse:
    """Retrieval response."""
    query: str
    results: List[SearchResult]
    suggested_questions: List[str]
    processing_time_ms: float
class KnowledgeRetrievalEngine:
    """Knowledge retrieval engine."""
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.initialized = False
        self.faiss_index = None
        self.id_to_content = {}
        self.embedding_dim = 384  # output size of all-MiniLM-L6-v2
        self.chroma_client = None
        self.chroma_collection = None
        self.cache = {}
        self.cache_size = 1000
    async def initialize(self):
        """Initialize the retrieval engine (idempotent)."""
        if self.initialized:
            return
        self.logger.info("Initializing knowledge retrieval engine...")
        try:
            await nlp_processor.initialize()
            if CHROMA_AVAILABLE:
                await self._initialize_chromadb()
            if FAISS_AVAILABLE:
                await self._initialize_faiss()
            await self._load_knowledge_base()
            self.initialized = True
            self.logger.info("Knowledge retrieval engine initialized")
        except Exception as e:
            self.logger.error(f"Knowledge retrieval engine initialization failed: {e}")
            raise
    async def _initialize_chromadb(self):
        """Initialize ChromaDB."""
        self.logger.info("Initializing ChromaDB...")
        # chromadb >= 0.4 replaced Settings(chroma_db_impl="duckdb+parquet") with
        # PersistentClient; on older releases keep the legacy Client(Settings(...)) call.
        self.chroma_client = chromadb.PersistentClient(path="./data/chroma_db")
        collection_name = "knowledge_base"
        # Loads the collection if it exists, otherwise creates it.
        self.chroma_collection = self.chroma_client.get_or_create_collection(
            name=collection_name,
            metadata={"description": "Customer service knowledge base"}
        )
        self.logger.info(f"Collection ready: {collection_name}")
    async def _initialize_faiss(self):
        """Initialize the FAISS index."""
        self.logger.info("Initializing FAISS index...")
        # Inner-product index; it equals cosine similarity only for unit-length vectors.
        self.faiss_index = faiss.IndexFlatIP(self.embedding_dim)
        if settings.USE_GPU and hasattr(faiss, 'StandardGpuResources'):
            self.logger.info("Enabling FAISS GPU acceleration...")
            res = faiss.StandardGpuResources()
            self.faiss_index = faiss.index_cpu_to_gpu(res, 0, self.faiss_index)
async def _load_knowledge_base(self):
"""从数据库加载知识库"""
self.logger.info("加载知识库数据...")
sample_knowledge = [
{"id": "kb_001", "content": "我们的工作时间是周一至周五 9:00-18:00,周末休息。",
"category": "general", "metadata": {"source": "faq", "importance": 5}},
{"id": "kb_002", "content": "产品退款需要在购买后 30 天内申请,退款将在 7-14 个工作日内处理。",
"category": "policy", "metadata": {"source": "policy", "importance": 8}},
{"id": "kb_003", "content": "如果忘记密码,可以点击登录页面的'忘记密码'链接,通过邮箱重置。",
"category": "account", "metadata": {"source": "faq", "importance": 7}},
{"id": "kb_004", "content": "技术支持电话:400-123-4567,工作时间随时可拨打。",
"category": "contact", "metadata": {"source": "contact", "importance": 9}},
{"id": "kb_005", "content": "产品支持 7 天无理由退货,但需要商品完好、包装完整。",
"category": "policy", "metadata": {"source": "policy", "importance": 6}}
]
await self.add_documents(sample_knowledge)
self.logger.info(f"已加载 {len(sample_knowledge)} 条知识库数据")
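    async def add_documents(self, documents: List[Dict[str, Any]]):
        """
        Add documents to the knowledge base.

        Args:
            documents: list of documents; each has id and content, plus an
                optional category and metadata.
        """
        if not documents:
            return
        contents = [doc["content"] for doc in documents]
        embeddings = await self._generate_embeddings(contents)
        # Carry the top-level "category" into metadata: _intent_based_search
        # reads metadata["categories"], which the sample data never set.
        metadatas = []
        for doc in documents:
            metadata = dict(doc.get("metadata") or {})
            if "category" in doc:
                metadata.setdefault("categories", doc["category"])
            metadatas.append(metadata)
        faiss_ready = FAISS_AVAILABLE and self.faiss_index is not None
        # Row that the first new vector will occupy in the FAISS index.
        next_row = self.faiss_index.ntotal if faiss_ready else len(self.id_to_content)
        if faiss_ready:
            embeddings_np = np.array(embeddings, dtype="float32")
            # IndexFlatIP returns inner products; unit-length vectors turn
            # them into cosine similarities in [-1, 1].
            faiss.normalize_L2(embeddings_np)
            self.faiss_index.add(embeddings_np)
        # Populate the in-memory store even without FAISS, because keyword
        # and intent search read from it.
        for offset, (doc, metadata) in enumerate(zip(documents, metadatas)):
            self.id_to_content[doc["id"]] = {
                "content": doc["content"],
                "metadata": metadata,
                "index": next_row + offset
            }
        if CHROMA_AVAILABLE and self.chroma_collection is not None:
            try:
                # upsert (rather than add) allows an existing id to be re-submitted.
                self.chroma_collection.upsert(
                    embeddings=embeddings,
                    documents=contents,
                    metadatas=metadatas,
                    ids=[doc["id"] for doc in documents]
                )
                self.logger.info(f"Added {len(documents)} documents to ChromaDB")
            except Exception as e:
                self.logger.error(f"Failed to add documents to ChromaDB: {e}")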
async def search(self, query: str, top_k: int = 5, threshold: float = 0.5) -> RetrievalResponse:
"""
搜索相关知识
Args:
query: 查询文本
top_k: 返回结果数量
threshold: 相似度阈值
Returns:
RetrievalResponse 对象
"""
start_time = datetime.now()
if not self.initialized:
await self.initialize()
cache_key = f"{query}_{top_k}_{threshold}"
if cache_key in self.cache:
self.logger.debug("从缓存返回搜索结果")
return self.cache[cache_key]
query_analysis = await nlp_processor.analyze_text(query)
query_embedding = await nlp_processor.get_text_embedding(query)
results = []
vector_results = await self._vector_search(query_embedding, query, top_k, threshold)
results.extend(vector_results)
keyword_results = await self._keyword_search(query_analysis.keywords, top_k, threshold)
results.extend(keyword_results)
intent_results = await self._intent_based_search(query_analysis.intent_result.intent, top_k)
results.extend(intent_results)
unique_results = self._deduplicate_and_sort(results)
final_results = unique_results[:top_k]
suggested_questions = await self._generate_suggested_questions(query, query_analysis, final_results)
processing_time = (datetime.now() - start_time).total_seconds() * 1000
response = RetrievalResponse(
query=query,
results=final_results,
suggested_questions=suggested_questions,
processing_time_ms=processing_time
)
self._add_to_cache(cache_key, response)
return response
    async def _vector_search(self, query_embedding: np.ndarray, query: str, top_k: int, threshold: float) -> List[SearchResult]:
        """Vector similarity search over FAISS and ChromaDB."""
        results = []
        if FAISS_AVAILABLE and self.faiss_index is not None:
            try:
                # Copy before normalizing so the caller's embedding is left untouched.
                query_vector = np.array(query_embedding, dtype="float32").reshape(1, -1)
                faiss.normalize_L2(query_vector)
                similarities, indices = self.faiss_index.search(query_vector, top_k * 2)
                # Build the row -> doc_id map once instead of scanning per hit.
                row_to_id = {info["index"]: doc_id for doc_id, info in self.id_to_content.items()}
                for similarity, idx in zip(similarities[0], indices[0]):
                    doc_id = row_to_id.get(int(idx))
                    if doc_id is None:
                        # FAISS pads with -1 when it has fewer than k vectors.
                        continue
                    # Map the inner product from [-1, 1] to [0, 1]. This is a cosine
                    # score only if the indexed vectors are unit length as well.
                    score = float((similarity + 1) / 2)
                    if score >= threshold:
                        doc_info = self.id_to_content[doc_id]
                        results.append(SearchResult(
                            id=doc_id,
                            content=doc_info["content"],
                            score=score,
                            metadata=doc_info["metadata"],
                            source="knowledge_base"
                        ))
                self.logger.debug(f"FAISS search found {len(results)} results")
            except Exception as e:
                self.logger.error(f"FAISS search failed: {e}")
        if CHROMA_AVAILABLE and self.chroma_collection is not None:
            try:
                chroma_results = self.chroma_collection.query(
                    query_embeddings=[np.asarray(query_embedding).tolist()],
                    n_results=top_k,
                    include=["documents", "metadatas", "distances"]
                )
                if chroma_results and chroma_results["documents"]:
                    # Use the stored ids (always returned by query) rather than
                    # synthetic "chroma_{i}" labels.
                    hits = zip(
                        chroma_results["ids"][0],
                        chroma_results["documents"][0],
                        chroma_results["metadatas"][0],
                        chroma_results["distances"][0]
                    )
                    for doc_id, doc, metadata, distance in hits:
                        # `if distance` would treat an exact match (0.0) as missing.
                        score = float(1.0 - distance) if distance is not None else 0.5
                        if score >= threshold:
                            results.append(SearchResult(
                                id=doc_id,
                                content=doc,
                                score=score,
                                metadata=metadata or {},
                                source="knowledge_base"
                            ))
                self.logger.debug(f"Vector search found {len(results)} results in total")
            except Exception as e:
                self.logger.error(f"ChromaDB search failed: {e}")
        return results
async def _keyword_search(self, keywords: List[str], top_k: int, threshold: float) -> List[SearchResult]:
"""关键词搜索"""
results = []
if not keywords:
return results
for doc_id, doc_info in self.id_to_content.items():
content = doc_info["content"].lower()
metadata = doc_info["metadata"]
match_count = 0
for keyword in keywords:
if keyword.lower() in content:
match_count += 1
if match_count > 0:
score = match_count / len(keywords)
if score >= threshold:
results.append(SearchResult(
id=doc_id,
content=doc_info["content"],
score=score,
metadata=metadata,
source="knowledge_base"
))
results.sort(key=lambda x: x.score, reverse=True)
return results[:top_k]
async def _intent_based_search(self, intent: str, top_k: int) -> List[SearchResult]:
"""基于意图的搜索"""
results = []
intent_categories = {
"product_inquiry": ["product", "features", "specification"],
"technical_support": ["technical", "help", "support", "troubleshooting"],
"payment_issue": ["payment", "refund", "billing", "invoice"],
"complaint": ["complaint", "issue", "problem", "dissatisfied"],
"account": ["account", "login", "password", "register"]
}
categories = intent_categories.get(intent, [])
if not categories:
return results
for doc_id, doc_info in self.id_to_content.items():
metadata = doc_info["metadata"]
doc_categories = metadata.get("categories", [])
if isinstance(doc_categories, str):
doc_categories = [doc_categories]
for cat in categories:
if cat in doc_categories:
results.append(SearchResult(
id=doc_id,
content=doc_info["content"],
score=0.7,
metadata=metadata,
source="knowledge_base"
))
break
return results[:top_k]
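    def _deduplicate_and_sort(self, results: List[SearchResult]) -> List[SearchResult]:
        """Deduplicate results and return them sorted by score, highest first."""
        # Sort before deduplicating so that, among duplicates, the
        # highest-scoring copy is the one that is kept.
        ordered = sorted(results, key=lambda x: x.score, reverse=True)
        seen_contents = set()
        unique_results = []
        for result in ordered:
            # Results from different backends carry different ids, so compare
            # on the first 50 characters of the content instead.
            content_prefix = result.content[:50]
            if content_prefix not in seen_contents:
                seen_contents.add(content_prefix)
                unique_results.append(result)
        return unique_results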
async def _generate_suggested_questions(self, query: str, query_analysis: Any, search_results: List[SearchResult]) -> List[str]:
"""生成建议问题"""
suggestions = []
intent = query_analysis.intent_result.intent
intent_suggestions = {
"product_inquiry": ["这个产品有什么功能?", "价格是多少?", "有优惠活动吗?", "怎么购买?"],
"technical_support": ["常见问题有哪些?", "怎么联系技术支持?", "系统要求是什么?", "有用户手册吗?"],
"payment_issue": ["退款流程是怎样的?", "支持哪些支付方式?", "发票怎么开?", "付款遇到问题怎么办?"],
"account": ["怎么注册账号?", "忘记密码怎么办?", "怎么修改个人信息?", "账号安全如何保障?"]
}
if intent in intent_suggestions:
suggestions.extend(intent_suggestions[intent][:2])
for result in search_results[:2]:
content = result.content
sentences = content.split('。')[:2]
for sentence in sentences:
if sentence and len(sentence) > 10:
suggestions.append(f"关于「{sentence[:20]}...」")
unique_suggestions = list(dict.fromkeys(suggestions))
return unique_suggestions[:5]
async def _generate_embeddings(self, texts: List[str]) -> List[List[float]]:
"""生成文本向量"""
embeddings = []
for text in texts:
embedding = await nlp_processor.get_text_embedding(text)
embeddings.append(embedding.tolist())
return embeddings
def _add_to_cache(self, key: str, value: RetrievalResponse):
"""添加到缓存"""
if len(self.cache) >= self.cache_size:
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
self.cache[key] = value
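    async def update_document(self, doc_id: str, content: str, metadata: Dict):
        """Update a document by replacing its stored copy."""
        # Remove the old copy first so no store keeps a stale duplicate.
        await self.delete_document(doc_id)
        document = {"id": doc_id, "content": content, "metadata": metadata}
        await self.add_documents([document])

    async def delete_document(self, doc_id: str):
        """Delete a document from the cache, the in-memory store, FAISS, and ChromaDB."""
        # Cache keys are built from the query text, not from document ids, so
        # filtering keys by doc_id never matches the affected entries. Drop all.
        self.cache.clear()
        removed = self.id_to_content.pop(doc_id, None)
        if removed is not None and FAISS_AVAILABLE and self.faiss_index is not None:
            try:
                row = removed["index"]
                self.faiss_index.remove_ids(np.array([row], dtype="int64"))
                # A flat index shifts later rows down by one after a removal.
                for info in self.id_to_content.values():
                    if info["index"] > row:
                        info["index"] -= 1
            except Exception as e:
                self.logger.error(f"Failed to delete document from FAISS: {e}")
        if CHROMA_AVAILABLE and self.chroma_collection is not None:
            try:
                self.chroma_collection.delete(ids=[doc_id])
                self.logger.info(f"Deleted document from ChromaDB: {doc_id}")
            except Exception as e:
                self.logger.error(f"Failed to delete document from ChromaDB: {e}")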
retrieval_engine = KnowledgeRetrievalEngine()
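The engine scores FAISS hits with `(inner_product + 1) / 2`, which is only a meaningful 0 to 1 similarity when both the query and the stored vectors have unit length. The following is a minimal pure-Python sketch of that idea, not the engine's actual code path: `normalize`, `inner_product`, and `rank_by_cosine` are hypothetical helper names introduced for illustration, and the two-dimensional vectors stand in for real 384-dimensional embeddings.

```python
import math
from typing import Dict, List, Tuple


def normalize(vec: List[float]) -> List[float]:
    """Scale a vector to unit length; a zero vector is returned unchanged."""
    norm = math.sqrt(sum(x * x for x in vec))
    if norm == 0.0:
        return list(vec)
    return [x / norm for x in vec]


def inner_product(a: List[float], b: List[float]) -> float:
    """Plain dot product, the quantity an inner-product index returns."""
    return sum(x * y for x, y in zip(a, b))


def rank_by_cosine(
    query: List[float],
    docs: Dict[str, List[float]],
    top_k: int = 5,
    threshold: float = 0.5,
) -> List[Tuple[str, float]]:
    """Rank documents by cosine similarity mapped from [-1, 1] to [0, 1]."""
    q = normalize(query)
    scored = []
    for doc_id, vec in docs.items():
        # On unit vectors the inner product equals the cosine similarity.
        cosine = inner_product(q, normalize(vec))
        score = (cosine + 1.0) / 2.0
        if score >= threshold:
            scored.append((doc_id, score))
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:top_k]


if __name__ == "__main__":
    # Vector length differs per document, but only direction affects the score.
    sample = {"same": [2.0, 0.0], "orthogonal": [0.0, 3.0], "opposite": [-1.0, 0.0]}
    for doc_id, score in rank_by_cosine([1.0, 0.0], sample):
        print(doc_id, round(score, 3))
```

Without the normalization step, a long document vector could outrank a better-aligned short one, and the default `threshold` of 0.5 would no longer correspond to "orthogonal or better".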
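5. Conversation Management System
5.1 Conversation State Management
Create src/services/conversation/manager.py:
"""Conversation manager.

Handles conversation state, context maintenance, and flow control.
"""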
import asyncio
import json
import logging
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime, timedelta
from enum import Enum
import uuid
from src.core.config import settings
from src.models.base import Conversation, Message
from src.ai_components.nlp.processor import nlp_processor, NLPAnalysis
from src.ai_components.retrieval.engine import retrieval_engine, RetrievalResponse
class ConversationState(Enum):
    """Conversation states."""
    ACTIVE = "active"
    PENDING = "pending"
    TRANSFERRING = "transferring"
    CLOSED = "closed"
    ESCALATED = "escalated"


class HandlerType(Enum):
    """Who handles a message: the AI, a human agent, or both."""
    AI = "ai"
    AGENT = "agent"
    HYBRID = "hybrid"
class ConversationContext:
"""对话上下文"""
def __init__(self, conversation_id: str, customer_id: str):
self.conversation_id = conversation_id
self.customer_id = customer_id
self.messages: List[Dict] = []
self.state_history: List[Dict] = []
self.intent_history: List[Dict] = []
self.sentiment_history: List[Dict] = []
self.metadata: Dict[str, Any] = {}
self.created_at = datetime.now()
self.updated_at = datetime.now()
self.message_count = 0
self.ai_response_count = 0
self.agent_response_count = 0
self.transfer_count = 0
self.escalation_count = 0
self.last_message_time = None
self.last_ai_response_time = None
self.last_agent_response_time = None
self.cached_responses: Dict[str, Any] = {}
def add_message(self, message: Dict):
"""添加消息到上下文"""
self.messages.append(message)
self.message_count += 1
self.last_message_time = datetime.now()
self.updated_at = datetime.now()
if len(self.messages) > settings.MAX_CONVERSATION_HISTORY:
self.messages = self.messages[-settings.MAX_CONVERSATION_HISTORY:]
def add_state_change(self, old_state: str, new_state: str, reason: str):
"""添加状态变更记录"""
self.state_history.append({
"timestamp": datetime.now().isoformat(),
"old_state": old_state,
"new_state": new_state,
"reason": reason
})
    def add_intent(self, intent: str, confidence: float, text: str):
        """Record a detected intent."""
        self.intent_history.append({
            "timestamp": datetime.now().isoformat(),
            "intent": intent,
            "confidence": confidence,
            "text": text
        })
def add_sentiment(self, sentiment: str, score: float):
"""添加情感记录"""
self.sentiment_history.append({
"timestamp": datetime.now().isoformat(),
"sentiment": sentiment,
"score": score
})
def get_recent_messages(self, count: int = 10) -> List[Dict]:
"""获取最近的消息"""
return self.messages[-count:] if self.messages else []
def get_conversation_summary(self) -> Dict[str, Any]:
"""获取对话摘要"""
return {
"conversation_id": self.conversation_id,
"customer_id": self.customer_id,
"message_count": self.message_count,
"ai_response_count": self.ai_response_count,
"agent_response_count": self.agent_response_count,
"transfer_count": self.transfer_count,
"escalation_count": self.escalation_count,
"current_state": self.metadata.get("current_state", "active"),
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"duration_seconds": (datetime.now() - self.created_at).total_seconds()
}
def is_timed_out(self) -> bool:
"""检查对话是否超时"""
if not self.last_message_time:
return False
timeout_minutes = settings.SESSION_TIMEOUT_MINUTES
timeout_delta = timedelta(minutes=timeout_minutes)
return datetime.now() - self.last_message_time > timeout_delta
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"conversation_id": self.conversation_id,
"customer_id": self.customer_id,
"messages": self.messages,
"state_history": self.state_history,
"intent_history": self.intent_history,
"sentiment_history": self.sentiment_history,
"metadata": self.metadata,
"statistics": {
"message_count": self.message_count,
"ai_response_count": self.ai_response_count,
"agent_response_count": self.agent_response_count,
"transfer_count": self.transfer_count,
"escalation_count": self.escalation_count
},
"timestamps": {
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"last_message_time": self.last_message_time.isoformat() if self.last_message_time else None
}
}
class ConversationManager:
"""对话管理器"""
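    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.conversations: Dict[str, ConversationContext] = {}
        self.customer_conversations: Dict[str, List[str]] = {}
        self.max_conversations = 10000
        self.cleanup_interval = 300
        # The module-level singleton is built at import time, before any event
        # loop is running, so calling asyncio.create_task() here would raise
        # RuntimeError. The cleanup loop is started lazily instead.
        self._cleanup_handle: Optional[asyncio.Task] = None

    def _ensure_cleanup_task(self) -> None:
        """Start the background cleanup loop once, from inside a running loop."""
        if self._cleanup_handle is None or self._cleanup_handle.done():
            self._cleanup_handle = asyncio.create_task(self._cleanup_task())

    async def create_conversation(self, customer_id: str, channel: str, metadata: Optional[Dict] = None) -> ConversationContext:
        """
        Create a new conversation.

        Args:
            customer_id: customer ID
            channel: channel the customer came in through
            metadata: optional metadata

        Returns:
            The new conversation context.
        """
        self._ensure_cleanup_task()
        conversation_id = str(uuid.uuid4())
        context = ConversationContext(conversation_id, customer_id)
        context.metadata.update({
            "channel": channel,
            "current_state": ConversationState.ACTIVE.value,
            "created_channel": channel,
            "metadata": metadata or {}
        })
        self.conversations[conversation_id] = context
        if customer_id not in self.customer_conversations:
            self.customer_conversations[customer_id] = []
        self.customer_conversations[customer_id].append(conversation_id)
        # Keep at most 10 conversations per customer; evict the oldest.
        if len(self.customer_conversations[customer_id]) > 10:
            old_conv_id = self.customer_conversations[customer_id].pop(0)
            if old_conv_id in self.conversations:
                del self.conversations[old_conv_id]
        self.logger.info(f"Created conversation: {conversation_id}, customer: {customer_id}, channel: {channel}")
        return context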
async def process_message(self, conversation_id: str, message: Dict) -> Dict[str, Any]:
"""
处理消息
Args:
conversation_id: 对话 ID
message: 消息数据
Returns:
处理结果
"""
start_time = datetime.now()
context = self.conversations.get(conversation_id)
if not context:
raise ValueError(f"对话不存在:{conversation_id}")
context.last_message_time = datetime.now()
context.add_message(message)
text = message.get("content", "")
nlp_analysis = await nlp_processor.analyze_text(text)
context.add_intent(
nlp_analysis.intent_result.intent,
nlp_analysis.intent_result.confidence,
text
)
context.add_sentiment(
nlp_analysis.sentiment_result.sentiment,
nlp_analysis.sentiment_result.score
)
handler_type = await self._determine_handler_type(context, nlp_analysis, message)
if handler_type == HandlerType.AI:
response = await self._handle_with_ai(context, nlp_analysis, message)
context.ai_response_count += 1
context.last_ai_response_time = datetime.now()
elif handler_type == HandlerType.AGENT:
response = await self._handle_with_agent(context, nlp_analysis, message)
context.agent_response_count += 1
context.last_agent_response_time = datetime.now()
elif handler_type == HandlerType.HYBRID:
response = await self._handle_hybrid(context, nlp_analysis, message)
else:
response = {"type": "error", "content": "无法确定处理方式", "handler": "unknown"}
await self._update_conversation_state(context, response)
processing_time = (datetime.now() - start_time).total_seconds() * 1000
result = {
"conversation_id": conversation_id,
"message_id": message.get("id"),
"handler_type": handler_type.value,
"response": response,
"analysis": {
"intent": nlp_analysis.intent_result.intent,
"confidence": nlp_analysis.intent_result.confidence,
"sentiment": nlp_analysis.sentiment_result.sentiment,
"sentiment_score": nlp_analysis.sentiment_result.score,
"entities": nlp_analysis.entities,
"keywords": nlp_analysis.keywords
},
"processing_time_ms": processing_time,
"context_summary": context.get_conversation_summary()
}
return result
async def _determine_handler_type(self, context: ConversationContext, nlp_analysis: NLPAnalysis, message: Dict) -> HandlerType:
"""
确定处理器类型
Args:
context: 对话上下文
nlp_analysis: NLP 分析结果
message: 消息
Returns:
处理器类型
"""
current_state = context.metadata.get("current_state")
if current_state in [ConversationState.PENDING.value, ConversationState.TRANSFERRING.value, ConversationState.ESCALATED.value]:
return HandlerType.AGENT
text = message.get("content", "").lower()
if any(keyword in text for keyword in ["人工", "真人", "客服", "转人工"]):
return HandlerType.AGENT
if nlp_analysis.sentiment_result.sentiment == "negative":
sentiment_score = nlp_analysis.sentiment_result.score
if sentiment_score > 0.7:
return HandlerType.AGENT
elif sentiment_score > 0.5:
return HandlerType.HYBRID
intent_confidence = nlp_analysis.intent_result.confidence
intent_name = nlp_analysis.intent_result.intent
intent_configs = {
"complaint": {"threshold": 0.7, "handler": HandlerType.AGENT},
"technical_support": {"threshold": 0.6, "handler": HandlerType.HYBRID},
"payment_issue": {"threshold": 0.65, "handler": HandlerType.HYBRID},
"product_inquiry": {"threshold": 0.5, "handler": HandlerType.AI},
"greeting": {"threshold": 0.4, "handler": HandlerType.AI},
"other": {"threshold": 0.3, "handler": HandlerType.AI}
}
config = intent_configs.get(intent_name, intent_configs["other"])
if intent_confidence < config["threshold"]:
recent_messages = context.get_recent_messages(3)
if len(recent_messages) >= 2:
return HandlerType.AI
else:
return HandlerType.HYBRID
if context.message_count > 15:
return HandlerType.HYBRID
if context.transfer_count > 2:
return HandlerType.AGENT
return config["handler"]
async def _handle_with_ai(self, context: ConversationContext, nlp_analysis: NLPAnalysis, message: Dict) -> Dict[str, Any]:
"""使用 AI 处理消息"""
try:
text = message.get("content", "")
retrieval_result = await retrieval_engine.search(text)
prompt = self._build_ai_prompt(context, text, nlp_analysis, retrieval_result)
ai_response = await self._generate_ai_response(prompt)
response = {
"type": "ai_response",
"content": ai_response,
"confidence": nlp_analysis.intent_result.confidence,
"sources": [r.content[:100] for r in retrieval_result.results[:3]],
"suggested_questions": retrieval_result.suggested_questions,
"handler": "ai"
}
return response
except Exception as e:
self.logger.error(f"AI 处理失败:{e}")
return {"type": "error", "content": "抱歉,AI 暂时无法处理您的请求,正在为您转接人工客服。", "handler": "ai_fallback"}
async def _handle_with_agent(self, context: ConversationContext, nlp_analysis: NLPAnalysis, message: Dict) -> Dict[str, Any]:
"""使用人工处理消息"""
try:
available_agents = await self._find_available_agents(context, nlp_analysis)
if available_agents:
assigned_agent = await self._assign_agent(context, available_agents[0])
response = {
"type": "agent_assigned",
"content": f"正在为您转接人工客服,坐席 {assigned_agent['name']} 将为您服务。",
"agent_id": assigned_agent["id"],
"agent_name": assigned_agent["name"],
"estimated_wait_time": 30,
"handler": "agent_routing"
}
else:
response = {
"type": "agent_queue",
"content": "当前所有客服坐席繁忙,您已被加入等待队列。请稍候,我们会尽快为您服务。",
"queue_position": await self._get_queue_position(context),
"estimated_wait_time": 120,
"handler": "agent_queue"
}
return response
except Exception as e:
self.logger.error(f"人工处理失败:{e}")
return {"type": "error", "content": "抱歉,人工客服暂时不可用,请稍后再试或留言。", "handler": "agent_fallback"}
async def _handle_hybrid(self, context: ConversationContext, nlp_analysis: NLPAnalysis, message: Dict) -> Dict[str, Any]:
"""混合处理:AI 提供建议,人工确认"""
try:
text = message.get("content", "")
retrieval_result = await retrieval_engine.search(text)
prompt = self._build_ai_prompt(context, text, nlp_analysis, retrieval_result)
ai_response = await self._generate_ai_response(prompt)
available_agents = await self._find_available_agents(context, nlp_analysis)
response = {
"type": "hybrid_response",
"content": ai_response,
"ai_suggestion": ai_response,
"requires_agent_confirmation": True,
"agent_available": len(available_agents) > 0,
"confidence": nlp_analysis.intent_result.confidence,
"sources": [r.content[:100] for r in retrieval_result.results[:2]],
"handler": "hybrid"
}
if available_agents:
response["agent_note"] = "AI 已提供建议,坐席可随时接管对话"
return response
except Exception as e:
self.logger.error(f"混合处理失败:{e}")
return await self._handle_with_ai(context, nlp_analysis, message)
def _build_ai_prompt(self, context: ConversationContext, text: str, nlp_analysis: NLPAnalysis, retrieval_result: RetrievalResponse) -> str:
"""构建 AI 提示"""
recent_messages = context.get_recent_messages(5)
prompt = f"""你是一个专业的客服 AI 助手,正在与客户对话。
客户信息:
- 对话 ID: {context.conversation_id}
- 消息总数:{context.message_count}
- 当前情感:{nlp_analysis.sentiment_result.sentiment} (分数:{nlp_analysis.sentiment_result.score})
- 检测意图:{nlp_analysis.intent_result.intent} (置信度:{nlp_analysis.intent_result.confidence})
对话历史(最近 5 条):
"""
for msg in recent_messages[-5:]:
sender = "客户" if msg.get("sender_type") == "customer" else "客服"
prompt += f"{sender}: {msg.get('content','')}\n"
prompt += f"\n当前客户消息:{text}\n\n"
if retrieval_result.results:
prompt += "相关知识库信息:\n"
for i, result in enumerate(retrieval_result.results[:3], 1):
prompt += f"{i}. {result.content[:200]}...\n"
prompt += """ 请根据以上信息,用友好、专业、有帮助的语气回复客户。注意:
1. 如果问题需要人工处理,请说明原因并建议转人工
2. 如果信息不足,可以询问更多细节
3. 保持回复简洁明了
4. 如果是中文对话,请用中文回复
你的回复:"""
return prompt
async def _generate_ai_response(self, prompt: str) -> str:
"""生成 AI 响应(简化版本)"""
if "你好" in prompt or "嗨" in prompt:
return "您好!我是 AI 客服助手,很高兴为您服务。请问有什么可以帮助您的吗?"
elif "谢谢" in prompt or "感谢" in prompt:
return "不客气!很高兴能帮助您。如果还有其他问题,请随时告诉我。"
elif "再见" in prompt or "拜拜" in prompt:
return "感谢您的咨询,再见!祝您有美好的一天!"
else:
return "我理解您的问题,正在为您查找相关信息。请稍等..."
async def _find_available_agents(self, context: ConversationContext, nlp_analysis: NLPAnalysis) -> List[Dict]:
"""查找可用坐席"""
skills_needed = []
if nlp_analysis.intent_result.intent == "technical_support":
skills_needed = ["technical", "support"]
elif nlp_analysis.intent_result.intent == "payment_issue":
skills_needed = ["billing", "finance"]
mock_agents = [
{"id": "agent_001", "name": "张客服", "skills": ["technical", "support", "product"], "status": "online", "current_chats": 2, "max_chats": 5},
{"id": "agent_002", "name": "李客服", "skills": ["billing", "finance", "account"], "status": "online", "current_chats": 1, "max_chats": 5},
{"id": "agent_003", "name": "王客服", "skills": ["general", "sales", "product"], "status": "away", "current_chats": 0, "max_chats": 5}
]
available_agents = []
for agent in mock_agents:
if (agent["status"] == "online" and agent["current_chats"] < agent["max_chats"]):
if skills_needed:
if any(skill in agent["skills"] for skill in skills_needed):
available_agents.append(agent)
else:
available_agents.append(agent)
return available_agents
async def _assign_agent(self, context: ConversationContext, agent: Dict) -> Dict:
"""分配坐席"""
context.metadata["assigned_agent_id"] = agent["id"]
context.metadata["assigned_agent_name"] = agent["name"]
context.transfer_count += 1
return agent
async def _get_queue_position(self, context: ConversationContext) -> int:
"""获取队列位置"""
return 1
async def _update_conversation_state(self, context: ConversationContext, response: Dict):
"""更新对话状态"""
old_state = context.metadata.get("current_state", ConversationState.ACTIVE.value)
new_state = old_state
response_type = response.get("type", "")
if response_type == "agent_assigned":
new_state = ConversationState.TRANSFERRING.value
elif response_type == "agent_queue":
new_state = ConversationState.PENDING.value
elif "escalat" in response_type.lower():
new_state = ConversationState.ESCALATED.value
if new_state != old_state:
context.metadata["current_state"] = new_state
context.add_state_change(old_state, new_state, response_type)
self.logger.info(f"对话 {context.conversation_id} 状态变更:{old_state} -> {new_state}")
async def get_conversation(self, conversation_id: str) -> Optional[ConversationContext]:
"""获取对话上下文"""
return self.conversations.get(conversation_id)
async def close_conversation(self, conversation_id: str, reason: str = "client_closed"):
"""关闭对话"""
context = self.conversations.get(conversation_id)
if context:
old_state = context.metadata.get("current_state", ConversationState.ACTIVE.value)
context.metadata["current_state"] = ConversationState.CLOSED.value
context.add_state_change(old_state, ConversationState.CLOSED.value, reason)
context.metadata["closed_reason"] = reason
context.metadata["closed_at"] = datetime.now().isoformat()
self.logger.info(f"关闭对话:{conversation_id}, 原因:{reason}")
    async def _cleanup_task(self):
        """Background task: periodically close and evict timed-out conversations."""
        while True:
            try:
                await asyncio.sleep(self.cleanup_interval)
                # Collect ids first; deleting while iterating over the dict would raise.
                expired_conversations = [
                    conv_id
                    for conv_id, context in self.conversations.items()
                    if context.is_timed_out()
                ]
                for conv_id in expired_conversations:
                    await self.close_conversation(conv_id, "timeout")
                    del self.conversations[conv_id]
                if expired_conversations:
                    self.logger.info(f"Cleaned up {len(expired_conversations)} timed-out conversations")
                if len(self.conversations) > self.max_conversations:
                    excess = len(self.conversations) - self.max_conversations
                    # Dicts preserve insertion order, so the first keys are the oldest.
                    old_ids = list(self.conversations.keys())[:excess]
                    for conv_id in old_ids:
                        await self.close_conversation(conv_id, "system_cleanup")
                        del self.conversations[conv_id]
                    self.logger.info(f"Evicted {excess} old conversations to bound memory use")
            except Exception as e:
                self.logger.error(f"Cleanup task error: {e}")
conversation_manager = ConversationManager()
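The routing rules in `_determine_handler_type` are easier to reason about, and to unit test, as a pure function. The sketch below is a condensed restatement under stated assumptions rather than a replacement: `choose_handler`, `Handler`, `HUMAN_KEYWORDS`, and `INTENT_RULES` are hypothetical names, the thresholds are copied from the manager above, and the message-count and transfer-count rules are left out for brevity.

```python
from enum import Enum
from typing import Dict, Tuple


class Handler(Enum):
    AI = "ai"
    AGENT = "agent"
    HYBRID = "hybrid"


# Phrases that mean the customer is asking for a human.
HUMAN_KEYWORDS = ("人工", "真人", "客服", "转人工")

# intent -> (minimum confidence, handler used when confidence is sufficient)
INTENT_RULES: Dict[str, Tuple[float, Handler]] = {
    "complaint": (0.7, Handler.AGENT),
    "technical_support": (0.6, Handler.HYBRID),
    "payment_issue": (0.65, Handler.HYBRID),
    "product_inquiry": (0.5, Handler.AI),
    "greeting": (0.4, Handler.AI),
    "other": (0.3, Handler.AI),
}


def choose_handler(
    text: str,
    sentiment: str,
    sentiment_score: float,
    intent: str,
    confidence: float,
    history_len: int = 1,
    state: str = "active",
) -> Handler:
    """Apply the routing rules in priority order and return the first match."""
    # 1. A conversation already handed to a human stays with a human.
    if state in ("pending", "transferring", "escalated"):
        return Handler.AGENT
    # 2. An explicit request for a human always wins.
    if any(keyword in text for keyword in HUMAN_KEYWORDS):
        return Handler.AGENT
    # 3. Strongly negative sentiment escalates; mildly negative gets both.
    if sentiment == "negative":
        if sentiment_score > 0.7:
            return Handler.AGENT
        if sentiment_score > 0.5:
            return Handler.HYBRID
    # 4. Otherwise route by intent, falling back when confidence is low.
    threshold, handler = INTENT_RULES.get(intent, INTENT_RULES["other"])
    if confidence < threshold:
        return Handler.AI if history_len >= 2 else Handler.HYBRID
    return handler


if __name__ == "__main__":
    print(choose_handler("我要转人工", "neutral", 0.0, "greeting", 0.9))
    print(choose_handler("产品有什么功能", "neutral", 0.1, "product_inquiry", 0.8))
```

Because rules are checked top to bottom, an explicit request for a human overrides even a high-confidence intent that would otherwise go to the AI.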
6. API Service Implementation
6.1 Main API Service
"""Main API service.

Exposes the RESTful API.
"""
import logging
from typing import List, Optional
from fastapi import FastAPI, HTTPException, Depends, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from src.core.config import settings
from src.api.endpoints import conversations, customers, agents, knowledge, analytics
logging.basicConfig(
level=getattr(logging, settings.LOG_LEVEL),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
app = FastAPI(
title=settings.PROJECT_NAME,
version=settings.APP_VERSION,
openapi_url=f"{settings.API_V1_STR}/openapi.json",
docs_url="/docs" if settings.DEBUG else None,
redoc_url="/redoc" if settings.DEBUG else None
)
security = HTTPBearer()
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""验证访问令牌"""
token = credentials.credentials
if not token or token != "demo_token":
raise HTTPException(
status_code=401,
detail="无效的访问令牌",
headers={"WWW-Authenticate": "Bearer"}
)
return {"user_id": "demo_user"}
app.add_middleware(
CORSMiddleware,
allow_origins=settings.BACKEND_CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
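# TrustedHostMiddleware matches bare hostnames (for example "example.com"),
# while CORS origins include a scheme, so derive hostnames from the origins.
from urllib.parse import urlparse
trusted_hosts = [urlparse(str(origin)).hostname or str(origin) for origin in settings.BACKEND_CORS_ORIGINS]
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["*"] if settings.DEBUG else trusted_hosts
)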
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
"""HTTP 异常处理器"""
return JSONResponse(
status_code=exc.status_code,
content={"error": {"code": exc.status_code, "message": exc.detail, "details": getattr(exc, "details", None)}}
)
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
"""通用异常处理器"""
logger.error(f"未处理异常:{exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={"error": {"code": 500, "message": "内部服务器错误", "details": str(exc) if settings.DEBUG else None}}
)
app.include_router(
conversations.router,
prefix=f"{settings.API_V1_STR}/conversations",
tags=["conversations"],
dependencies=[Depends(verify_token)]
)
app.include_router(
customers.router,
prefix=f"{settings.API_V1_STR}/customers",
tags=["customers"],
dependencies=[Depends(verify_token)]
)
app.include_router(
agents.router,
prefix=f"{settings.API_V1_STR}/agents",
tags=["agents"],
dependencies=[Depends(verify_token)]
)
app.include_router(
knowledge.router,
prefix=f"{settings.API_V1_STR}/knowledge",
tags=["knowledge"],
dependencies=[Depends(verify_token)]
)
app.include_router(
analytics.router,
prefix=f"{settings.API_V1_STR}/analytics",
tags=["analytics"],
dependencies=[Depends(verify_token)]
)
@app.websocket("/ws/{conversation_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    conversation_id: str,
    token: Optional[str] = None
):
    """WebSocket endpoint for real-time communication (demo: echoes messages back)."""
    await websocket.accept()
    try:
        # Demo-only check against a hard-coded token passed as a query parameter.
        if not token or token != "demo_token":
            await websocket.send_json({"type": "error", "message": "未授权连接"})
            # 1008 is the WebSocket "policy violation" close code.
            await websocket.close(code=1008)
            return
        while True:
            data = await websocket.receive_json()
            response = {
                "type": "echo",
                "conversation_id": conversation_id,
                "message": data.get("message"),
                "timestamp": data.get("timestamp")
            }
            await websocket.send_json(response)
    except WebSocketDisconnect:
        logger.info(f"WebSocket disconnected: {conversation_id}")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
        try:
            # 1011 signals an internal server error to the client.
            await websocket.close(code=1011)
        except RuntimeError:
            # The connection was already closed.
            pass
@app.get("/health")
async def health_check():
"""健康检查"""
return {"status": "healthy", "service": settings.PROJECT_NAME, "version": settings.APP_VERSION, "environment": settings.ENVIRONMENT}
@app.get("/")
async def root():
"""根端点"""
return {"message": f"欢迎使用{settings.PROJECT_NAME} API", "version": settings.APP_VERSION, "docs": "/docs" if settings.DEBUG else None, "environment": settings.ENVIRONMENT}
@app.on_event("startup")
async def startup_event():
"""启动事件"""
logger.info(f"启动 {settings.PROJECT_NAME} v{settings.APP_VERSION}")
logger.info(f"环境:{settings.ENVIRONMENT}")
logger.info(f"调试模式:{settings.DEBUG}")
try:
from src.ai_components.nlp.processor import nlp_processor
from src.ai_components.retrieval.engine import retrieval_engine
await nlp_processor.initialize()
await retrieval_engine.initialize()
logger.info("AI 组件初始化完成")
except Exception as e:
logger.error(f"AI 组件初始化失败:{e}")
@app.on_event("shutdown")
async def shutdown_event():
"""关闭事件"""
logger.info("正在关闭服务...")
if __name__ == "__main__":
import uvicorn
uvicorn.run("src.api.main:app", host="0.0.0.0", port=8000, reload=settings.DEBUG, log_level=settings.LOG_LEVEL.lower())
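`verify_token` above compares the bearer token with the literal `"demo_token"`, which is fine for a demo only. A minimal sketch of a slightly safer check follows, assuming the expected token comes from configuration and is compared in constant time. `is_valid_token` and the `API_ACCESS_TOKEN` environment variable are hypothetical names that do not appear in the original project.

```python
import hmac
import os
from typing import Optional


def is_valid_token(presented: Optional[str], expected: Optional[str] = None) -> bool:
    """Return True only when the presented token matches the expected one.

    If `expected` is not given, it is read from the (hypothetical)
    API_ACCESS_TOKEN environment variable.
    """
    if expected is None:
        expected = os.environ.get("API_ACCESS_TOKEN", "")
    # Reject empty values so an unset secret can never match an empty token.
    if not presented or not expected:
        return False
    # compare_digest takes time independent of where the strings differ,
    # which avoids leaking the token prefix through response timing.
    return hmac.compare_digest(presented.encode("utf-8"), expected.encode("utf-8"))


if __name__ == "__main__":
    print(is_valid_token("demo_token", expected="demo_token"))
    print(is_valid_token("wrong", expected="demo_token"))
```

Inside `verify_token`, the condition `not token or token != "demo_token"` could then become `not is_valid_token(token)`. A production system would more likely validate a signed token such as a JWT, which is out of scope here.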
6.2 Conversation API Endpoints
Create src/api/endpoints/conversations.py:
"""Conversation API endpoints.

Handles conversation-related HTTP requests.
"""
import logging
from typing import List, Optional, Dict, Any
from datetime import datetime
from fastapi import APIRouter, HTTPException, Depends, WebSocket
from pydantic import BaseModel, Field
import uuid
from src.core.config import settings
from src.services.conversation.manager import conversation_manager, ConversationContext
logger = logging.getLogger(__name__)
router = APIRouter()
class MessageRequest(BaseModel):
"""消息请求"""
content: str = Field(..., min_length=1, max_length=2000)
content_type: str = Field(default="text")
metadata: Optional[Dict[str, Any]] = Field(default=None)
customer_id: Optional[str] = Field(default=None)
channel: str = Field(default="web")
class ConversationCreateRequest(BaseModel):
"""创建对话请求"""
customer_id: str
channel: str = Field(default="web")
metadata: Optional[Dict[str, Any]] = Field(default=None)
initial_message: Optional[str] = Field(default=None)
class ConversationResponse(BaseModel):
"""对话响应"""
conversation_id: str
customer_id: str
channel: str
status: str
created_at: str
updated_at: str
message_count: int
metadata: Dict[str, Any]
summary: Optional[Dict[str, Any]] = None
class MessageResponse(BaseModel):
"""消息响应"""
message_id: str
conversation_id: str
sender_type: str
sender_id: Optional[str]
content: str
content_type: str
created_at: str
metadata: Dict[str, Any]
intent: Optional[str] = None
confidence: Optional[float] = None
class ProcessMessageResponse(BaseModel):
"""处理消息响应"""
conversation_id: str
message_id: str
handler_type: str
response: Dict[str, Any]
analysis: Dict[str, Any]
processing_time_ms: float
context_summary: Dict[str, Any]
@router.post("/", response_model=ConversationResponse)
async def create_conversation(request: ConversationCreateRequest):
"""
创建新对话
Args:
request: 创建对话请求
Returns:
创建的对话信息
"""
try:
logger.info(f"创建对话请求:客户={request.customer_id}, 渠道={request.channel}")
context = await conversation_manager.create_conversation(
customer_id=request.customer_id,
channel=request.channel,
metadata=request.metadata
)
if request.initial_message:
message = {
"id": str(uuid.uuid4()),
"content": request.initial_message,
"content_type": "text",
"sender_type": "customer",
"sender_id": request.customer_id,
"metadata": request.metadata or {},
"timestamp": datetime.now().isoformat()
}
await conversation_manager.process_message(
context.conversation_id,
message
)
response = ConversationResponse(
conversation_id=context.conversation_id,
customer_id=context.customer_id,
channel=context.metadata.get("channel", request.channel),
status=context.metadata.get("current_state", "active"),
created_at=context.created_at.isoformat(),
updated_at=context.updated_at.isoformat(),
message_count=context.message_count,
metadata=context.metadata,
summary=context.get_conversation_summary()
)
return response
except Exception as e:
logger.error(f"创建对话失败:{e}")
raise HTTPException(status_code=500, detail=f"创建对话失败:{str(e)}")
@router.post("/{conversation_id}/messages", response_model=ProcessMessageResponse)
async def process_message(conversation_id: str, request: MessageRequest):
"""
处理对话消息
Args:
conversation_id: 对话 ID
request: 消息请求
Returns:
处理结果
"""
try:
logger.info(f"处理消息:对话={conversation_id}, 内容长度={len(request.content)}")
message_id = str(uuid.uuid4())
message = {
"id": message_id,
"content": request.content,
"content_type": request.content_type,
"sender_type": "customer",
"sender_id": request.customer_id,
"metadata": request.metadata or {},
"timestamp": datetime.now().isoformat()
}
result = await conversation_manager.process_message(
conversation_id,
message
)
response = ProcessMessageResponse(
conversation_id=result["conversation_id"],
message_id=result["message_id"],
handler_type=result["handler_type"],
response=result["response"],
analysis=result["analysis"],
processing_time_ms=result["processing_time_ms"],
context_summary=result["context_summary"]
)
return response
except ValueError as e:
logger.error(f"对话不存在:{conversation_id}")
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"处理消息失败:{e}")
raise HTTPException(status_code=500, detail=f"处理消息失败:{str(e)}")
@router.get("/{conversation_id}", response_model=ConversationResponse)
async def get_conversation(conversation_id: str):
"""
获取对话详情
Args:
conversation_id: 对话 ID
Returns:
对话详情
"""
try:
context = await conversation_manager.get_conversation(conversation_id)
if not context:
raise HTTPException(status_code=404, detail="对话不存在")
response = ConversationResponse(
conversation_id=context.conversation_id,
customer_id=context.customer_id,
channel=context.metadata.get("channel", "unknown"),
status=context.metadata.get("current_state", "active"),
created_at=context.created_at.isoformat(),
updated_at=context.updated_at.isoformat(),
message_count=context.message_count,
metadata=context.metadata,
summary=context.get_conversation_summary()
)
return response
except HTTPException:
raise
except Exception as e:
logger.error(f"获取对话失败:{e}")
raise HTTPException(status_code=500, detail=f"获取对话失败:{str(e)}")
@router.get("/{conversation_id}/messages", response_model=List[MessageResponse])
async def get_conversation_messages(conversation_id: str, limit: int = 50, offset: int = 0):
    """
    Get the message history of a conversation.

    Args:
        conversation_id: Conversation ID
        limit: Maximum number of messages to return
        offset: Number of most recent messages to skip

    Returns:
        List of messages
    """
    try:
        context = await conversation_manager.get_conversation(conversation_id)
        if not context:
            raise HTTPException(status_code=404, detail="对话不存在")
        # Assumes get_recent_messages(n) returns the latest n messages in
        # chronological order. Skip the newest `offset` messages, then keep up
        # to `limit` messages before them. Slicing [offset:offset + limit]
        # instead returns the newest window regardless of offset once the
        # history is longer than limit + offset.
        recent = context.get_recent_messages(limit + offset)
        end = max(0, len(recent) - offset)
        messages = recent[max(0, end - limit):end]
        response_messages = []
        for msg in messages:
            response_messages.append(MessageResponse(
                message_id=msg.get("id", str(uuid.uuid4())),
                conversation_id=conversation_id,
                sender_type=msg.get("sender_type", "customer"),
                sender_id=msg.get("sender_id"),
                content=msg.get("content", ""),
                content_type=msg.get("content_type", "text"),
                created_at=msg.get("timestamp", datetime.now().isoformat()),
                metadata=msg.get("metadata", {}),
                intent=msg.get("intent"),
                confidence=msg.get("confidence")
            ))
        return response_messages
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"获取消息失败:{e}")
        raise HTTPException(status_code=500, detail=f"获取消息失败:{str(e)}")
@router.post("/{conversation_id}/close")
async def close_conversation(conversation_id: str, reason: str = "client_closed"):
"""
关闭对话
Args:
conversation_id: 对话 ID
reason: 关闭原因
Returns:
关闭结果
"""
try:
await conversation_manager.close_conversation(conversation_id, reason)
return {
"success": True,
"conversation_id": conversation_id,
"reason": reason,
"closed_at": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"关闭对话失败:{e}")
raise HTTPException(status_code=500, detail=f"关闭对话失败:{str(e)}")
@router.get("/customer/{customer_id}/conversations", response_model=List[ConversationResponse])
async def get_customer_conversations(customer_id: str, limit: int = 20):
    """
    List a customer's conversations.

    Args:
        customer_id: Customer ID
        limit: Maximum number of conversations to return

    Returns:
        List of conversations
    """
    # Placeholder: lookup by customer is not implemented yet, so this
    # endpoint always returns an empty list.
    return []
@router.post("/{conversation_id}/transfer")
async def transfer_to_agent(conversation_id: str, agent_id: Optional[str] = None):
"""
转接到人工坐席
Args:
conversation_id: 对话 ID
agent_id: 指定的坐席 ID(可选)
Returns:
转接结果
"""
try:
context = await conversation_manager.get_conversation(conversation_id)
if not context:
raise HTTPException(status_code=404, detail="对话不存在")
message_id = str(uuid.uuid4())
message = {
"id": message_id,
"content": "客户请求转接人工客服",
"content_type": "system",
"sender_type": "system",
"metadata": {"transfer_request": True, "requested_agent_id": agent_id},
"timestamp": datetime.now().isoformat()
}
result = await conversation_manager.process_message(
conversation_id,
message
)
return {
"success": True,
"conversation_id": conversation_id,
"transfer_initiated": True,
"result": result
}
except HTTPException:
raise
except Exception as e:
logger.error(f"转接失败:{e}")
raise HTTPException(status_code=500, detail=f"转接失败:{str(e)}")
@router.websocket("/{conversation_id}/ws")
async def conversation_websocket(
    websocket: WebSocket,
    conversation_id: str,
    token: Optional[str] = None
):
    """
    Conversation WebSocket endpoint.

    Args:
        websocket: WebSocket connection
        conversation_id: Conversation ID
        token: Authentication token
    """
    await websocket.accept()
    try:
        context = await conversation_manager.get_conversation(conversation_id)
        if not context:
            await websocket.send_json({"type": "error", "message": "对话不存在"})
            await websocket.close()
            return
        # Demo-only check: replace the hard-coded token with real authentication.
        if not token or token != "demo_token":
            await websocket.send_json({"type": "error", "message": "未授权连接"})
            await websocket.close()
            return
        await websocket.send_json({
            "type": "welcome",
            "conversation_id": conversation_id,
            "customer_id": context.customer_id,
            "message": "WebSocket 连接已建立",
            "timestamp": datetime.now().isoformat()
        })
        while True:
            data = await websocket.receive_json()
            message_type = data.get("type")
            if message_type == "message":
                content = data.get("content", "")
                sender_type = data.get("sender_type", "customer")
                message = {
                    "id": str(uuid.uuid4()),
                    "content": content,
                    "content_type": "text",
                    "sender_type": sender_type,
                    "sender_id": data.get("sender_id"),
                    "metadata": data.get("metadata", {}),
                    "timestamp": datetime.now().isoformat()
                }
                context.add_message(message)
                if sender_type == "customer":
                    # Customer messages go through the AI/agent routing pipeline.
                    result = await conversation_manager.process_message(
                        conversation_id,
                        message
                    )
                    await websocket.send_json({"type": "response", "result": result})
                else:
                    # Agent and system messages are stored and echoed back.
                    await websocket.send_json({"type": "echo", "message": message})
            elif message_type == "ping":
                await websocket.send_json({"type": "pong", "timestamp": datetime.now().isoformat()})
            elif message_type == "close":
                await websocket.close()
                break
            else:
                await websocket.send_json({"type": "error", "message": f"未知的消息类型:{message_type}"})
    except WebSocketDisconnect:
        logger.info(f"WebSocket 连接断开:{conversation_id}")
    except Exception as e:
        logger.error(f"WebSocket 错误:{e}")
        try:
            await websocket.send_json({"type": "error", "message": f"服务器错误:{str(e)}"})
            await websocket.close()
        except Exception:
            # The socket may already be closed; there is nothing left to report.
            pass
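The WebSocket endpoint speaks a small JSON protocol: inbound frames of type message, ping, or close, answered with response, echo, pong, or error. The sketch below isolates that dispatch as a plain coroutine so it can be exercised without a socket. The names dispatch_frame, build_message, and the process callback are hypothetical and not part of the original module; process stands in for conversation_manager.process_message. Unlike the endpoint, the sketch uses timezone-aware UTC timestamps, which is an assumption rather than the original behavior.

```python
import asyncio
import uuid
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Dict, Optional

Frame = Dict[str, Any]


def build_message(data: Frame) -> Frame:
    """Normalize an inbound 'message' frame into the dict shape the endpoints use."""
    return {
        "id": str(uuid.uuid4()),
        "content": data.get("content", ""),
        "content_type": "text",
        "sender_type": data.get("sender_type", "customer"),
        "sender_id": data.get("sender_id"),
        "metadata": data.get("metadata") or {},
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


async def dispatch_frame(
    data: Frame,
    process: Callable[[Frame], Awaitable[Frame]],
) -> Optional[Frame]:
    """Return the reply for one inbound frame, or None when the client asks to close."""
    frame_type = data.get("type")
    if frame_type == "message":
        message = build_message(data)
        if message["sender_type"] == "customer":
            # Only customer messages are routed through the processing pipeline.
            return {"type": "response", "result": await process(message)}
        return {"type": "echo", "message": message}
    if frame_type == "ping":
        return {"type": "pong", "timestamp": datetime.now(timezone.utc).isoformat()}
    if frame_type == "close":
        return None
    return {"type": "error", "message": f"unknown message type: {frame_type}"}


async def _fake_process(message: Frame) -> Frame:
    # Stand-in for conversation_manager.process_message (assumption).
    return {"handler_type": "ai", "echoed": message["content"]}


def run(frame: Frame) -> Optional[Frame]:
    """Drive dispatch_frame synchronously for quick experiments."""
    return asyncio.run(dispatch_frame(frame, _fake_process))
```

Keeping the dispatch separate from the socket loop means the endpoint only has to receive a frame, call the dispatcher, and send the reply or close when it gets None.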
7. Agent Desktop Application (Simplified Frontend)
7.1 Agent Dashboard HTML Template
Create src/static/agent_dashboard.html:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AI-人工客服坐席桌面</title>
<link href="https://cdn.jsdelivr.net/npm/[email protected]/dist/tailwind.min.css" rel="stylesheet">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css">
<style>
.conversation-active { border-left: 4px solid #10B981; }
.conversation-pending { border-left: 4px solid #F59E0B; }
.conversation-transferring { border-left: 4px solid #3B82F6; }
.message-customer { background-color: #E5E7EB; align-self: flex-start; }
.message-agent { background-color: #3B82F6; color: white; align-self: flex-end; }
.message-ai { background-color: #10B981; color: white; align-self: flex-end; }
.typing-indicator { display: inline-block; width: 10px; height: 10px; border-radius: 50%; background-color: #9CA3AF; margin: 0 2px; animation: typing 1.4s infinite ease-in-out both; }
.typing-indicator:nth-child(1) { animation-delay: -0.32s; }
.typing-indicator:nth-child(2) { animation-delay: -0.16s; }
@keyframes typing { 0%, 80%, 100% { transform: scale(0); } 40% { transform: scale(1.0); } }
</style>
</head>
<body class="bg-gray-100">
<div class="flex h-screen">
<div class="w-80 bg-white border-r border-gray-200 flex flex-col">
<div class="p-4 border-b border-gray-200">
<div class="flex items-center justify-between">
<div class="flex items-center">
<div class="w-10 h-10 bg-blue-500 rounded-full flex items-center justify-center text-white font-bold"> A </div>
<div class="ml-3">
<div class="font-semibold">坐席:张客服</div>
<div class="text-sm text-gray-500" id="agent-status">离线</div>
</div>
</div>
<div>
<select id="status-select" class="border rounded px-2 py-1 text-sm">
<option value="online">在线</option>
<option value="away">离开</option>
<option value="offline" selected>离线</option>
</select>
</div>
</div>
</div>
<div class="mt-4 grid grid-cols-2 gap-2">
<div class="bg-blue-50 p-2 rounded text-center">
<div class="text-2xl font-bold text-blue-600" id="active-chats">0</div>
<div class="text-xs text-gray-600">进行中</div>
</div>
<div class="bg-green-50 p-2 rounded text-center">
<div class="text-2xl font-bold text-green-600" id="total-chats">0</div>
<div class="text-xs text-gray-600">今日总计</div>
</div>
<div class="bg-yellow-50 p-2 rounded text-center">
<div class="text-2xl font-bold text-yellow-600" id="waiting-chats">0</div>
<div class="text-xs text-gray-600">等待中</div>
</div>
<div class="bg-purple-50 p-2 rounded text-center">
<div class="text-2xl font-bold text-purple-600" id="avg-response">0s</div>
<div class="text-xs text-gray-600">平均响应</div>
</div>
</div>
<div class="flex-1 overflow-y-auto">
<div class="p-4 border-b border-gray-200">
<div class="flex justify-between items-center">
<h3 class="font-semibold">进行中对话</h3>
<span class="bg-blue-100 text-blue-800 text-xs px-2 py-1 rounded-full" id="conversation-count">0</span>
</div>
<div class="mt-2">
<input type="text" placeholder="搜索对话..." class="w-full border rounded px-3 py-1 text-sm">
</div>
</div>
<div id="conversation-list" class="divide-y divide-gray-100">
<div class="p-4 text-center text-gray-500" id="no-conversations">暂无进行中的对话</div>
</div>
</div>
<div class="p-4 border-t border-gray-200">
<h4 class="font-semibold mb-2">快捷回复</h4>
<div class="space-y-1">
<button class="quick-reply-btn w-full text-left px-3 py-2 text-sm bg-gray-100 hover:bg-gray-200 rounded" data-reply="您好,很高兴为您服务!">问候语</button>
<button class="quick-reply-btn w-full text-left px-3 py-2 text-sm bg-gray-100 hover:bg-gray-200 rounded" data-reply="请稍等,我为您查询一下。">请稍等</button>
<button class="quick-reply-btn w-full text-left px-3 py-2 text-sm bg-gray-100 hover:bg-gray-200 rounded" data-reply="请问您还有其他问题吗?">结束语</button>
</div>
</div>
</div>
<div class="flex-1 flex flex-col">
<div class="bg-white border-b border-gray-200 p-4">
<div class="flex justify-between items-center">
<div>
<h2 class="text-xl font-semibold" id="current-customer">未选择对话</h2>
<div class="flex items-center mt-1">
<span class="text-sm text-gray-500" id="conversation-info">-</span>
<span class="ml-3 text-xs px-2 py-1 rounded-full" id="conversation-status-badge">-</span>
</div>
</div>
<div class="flex space-x-2">
<button id="ai-assist-btn" class="px-4 py-2 bg-green-100 text-green-800 rounded hover:bg-green-200"><i class="fas fa-robot mr-1"></i> AI 协助</button>
<button id="transfer-btn" class="px-4 py-2 bg-blue-100 text-blue-800 rounded hover:bg-blue-200"><i class="fas fa-exchange-alt mr-1"></i> 转接</button>
<button id="close-chat-btn" class="px-4 py-2 bg-red-100 text-red-800 rounded hover:bg-red-200"><i class="fas fa-times mr-1"></i> 结束</button>
</div>
</div>
<div id="customer-card" class="mt-4 hidden">
<div class="bg-gray-50 p-3 rounded-lg">
<div class="grid grid-cols-3 gap-4 text-sm">
<div><div class="text-gray-500">客户姓名</div><div id="customer-name" class="font-medium">-</div></div>
<div><div class="text-gray-500">联系电话</div><div id="customer-phone" class="font-medium">-</div></div>
<div><div class="text-gray-500">电子邮件</div><div id="customer-email" class="font-medium">-</div></div>
</div>
<div class="mt-3">
<div class="text-gray-500">历史记录</div>
<div class="text-sm"><span id="history-count">0</span> 次对话 | 最后联系:<span id="last-contact">-</span></div>
</div>
</div>
</div>
</div>
<div class="flex-1 overflow-y-auto p-4 bg-gray-50" id="message-container">
<div class="max-w-4xl mx-auto space-y-4">
<div class="text-center py-8" id="welcome-message">
<div class="text-gray-400 mb-2"><i class="fas fa-comments text-4xl"></i></div>
<h3 class="text-xl font-medium text-gray-600">选择左侧的对话开始聊天</h3>
<p class="text-gray-500 mt-2">AI 已自动处理常规问题,需要人工介入的对话会显示在这里</p>
</div>
</div>
</div>
<div class="bg-white border-t border-gray-200 p-4">
<div class="max-w-4xl mx-auto">
<div id="ai-suggestions" class="mb-3 hidden">
<div class="text-xs text-gray-500 mb-1">AI 建议回复:</div>
<div class="flex flex-wrap gap-2">
<button class="ai-suggestion-btn text-xs bg-green-100 text-green-800 px-3 py-1 rounded-full hover:bg-green-200">好的,我明白了</button>
<button class="ai-suggestion-btn text-xs bg-green-100 text-green-800 px-3 py-1 rounded-full hover:bg-green-200">请提供更多细节</button>
<button class="ai-suggestion-btn text-xs bg-green-100 text-green-800 px-3 py-1 rounded-full hover:bg-green-200">我帮您查询一下</button>
</div>
</div>
<div class="flex items-end space-x-2">
<div class="flex-1 border rounded-lg">
<div class="p-2">
<textarea id="message-input" rows="2" placeholder="输入消息..." class="w-full border-none focus:outline-none resize-none"></textarea>
</div>
<div class="border-t px-2 py-1 flex justify-between items-center">
<div class="flex space-x-2">
<button class="text-gray-400 hover:text-gray-600"><i class="fas fa-paperclip"></i></button>
<button class="text-gray-400 hover:text-gray-600"><i class="far fa-smile"></i></button>
</div>
<div class="text-xs text-gray-500"><span id="char-count">0</span>/1000</div>
</div>
</div>
<button id="send-btn" class="px-6 py-3 bg-blue-600 text-white rounded-lg hover:bg-blue-700"><i class="fas fa-paper-plane"></i></button>
</div>
<div class="mt-2 text-xs text-gray-500 flex items-center">
<div class="flex-1">
<span id="typing-indicator" class="hidden"><span class="typing-indicator"></span><span class="typing-indicator"></span><span class="typing-indicator"></span> 客户正在输入...</span>
</div>
<div><span id="connection-status" class="text-green-600"><i class="fas fa-circle"></i> 连接正常</span></div>
</div>
</div>
</div>
</div>
<div class="w-96 bg-white border-l border-gray-200 p-4 overflow-y-auto">
<h3 class="font-semibold mb-4">AI 分析面板</h3>
<div class="mb-6">
<div class="flex justify-between items-center mb-2">
<h4 class="font-medium">情感分析</h4>
<span class="text-xs text-gray-500" id="sentiment-score">-</span>
</div>
<div class="h-2 bg-gray-200 rounded-full overflow-hidden">
<div id="sentiment-bar" class="h-full bg-green-500 w-1/2"></div>
</div>
<div class="flex justify-between text-xs mt-1">
<span class="text-red-500">负面</span>
<span class="text-gray-500">中性</span>
<span class="text-green-500">正面</span>
</div>
<div class="mt-2 text-sm" id="sentiment-desc">暂无数据</div>
</div>
<div class="mb-6">
<h4 class="font-medium mb-2">意图识别</h4>
<div id="intent-tags" class="flex flex-wrap gap-1">
</div>
<div class="mt-3 text-sm" id="intent-desc">暂无数据</div>
</div>
<div class="mb-6">
<div class="flex justify-between items-center mb-2">
<h4 class="font-medium">相关知识</h4>
<span class="text-xs text-gray-500" id="kb-match-score">-</span>
</div>
<div id="knowledge-suggestions" class="space-y-2">
<div class="text-sm text-gray-500 italic">暂无相关建议</div>
</div>
</div>
<div class="mb-6">
<h4 class="font-medium mb-2">对话摘要</h4>
<div class="text-sm bg-gray-50 p-3 rounded" id="conversation-summary">暂无摘要</div>
</div>
<div>
<h4 class="font-medium mb-2">AI 建议</h4>
<div class="space-y-2">
<div class="flex items-start">
<div class="w-6 h-6 bg-green-100 text-green-800 rounded-full flex items-center justify-center mr-2 mt-0.5"><i class="fas fa-robot text-xs"></i></div>
<div class="text-sm flex-1">
<div id="ai-recommendation">等待分析...</div>
<button id="apply-recommendation" class="mt-1 text-xs text-blue-600 hover:text-blue-800 hidden"><i class="fas fa-check mr-1"></i> 应用建议</button>
</div>
</div>
</div>
</div>
</div>
</div>
<script>
class AgentDashboard {
constructor() {
this.currentConversationId = null;
this.wsConnection = null;
this.agentId = 'agent_001';
this.agentStatus = 'offline';
this.conversations = [];
this.messageInput = document.getElementById('message-input');
this.sendBtn = document.getElementById('send-btn');
this.messageContainer = document.getElementById('message-container');
this.conversationList = document.getElementById('conversation-list');
this.typingIndicator = document.getElementById('typing-indicator');
this.init();
}
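    init() {
        this.initEventListeners();
        this.initConnection();
        this.loadConversations();
        this.updateAgentStatus();
        this.startHeartbeat();
    }
    // Wire up the static controls once.
    initEventListeners() {
        document.getElementById('status-select').addEventListener('change', (e) => {
            this.agentStatus = e.target.value;
            this.updateAgentStatus();
        });
        this.sendBtn.addEventListener('click', () => this.sendMessage());
        this.messageInput.addEventListener('keydown', (e) => {
            // Enter sends, Shift+Enter inserts a line break.
            if (e.key === 'Enter' && !e.shiftKey) {
                e.preventDefault();
                this.sendMessage();
            }
        });
        this.messageInput.addEventListener('input', (e) => {
            const count = e.target.value.length;
            document.getElementById('char-count').textContent = count;
        });
        document.querySelectorAll('.quick-reply-btn').forEach((btn) => {
            btn.addEventListener('click', (e) => {
                const reply = e.currentTarget.dataset.reply;
                this.messageInput.value = reply;
                this.messageInput.focus();
            });
        });
        document.querySelectorAll('.ai-suggestion-btn').forEach((btn) => {
            btn.addEventListener('click', (e) => {
                this.messageInput.value = e.currentTarget.textContent;
                this.messageInput.focus();
            });
        });
        document.getElementById('ai-assist-btn').addEventListener('click', () => {
            this.requestAIAssist();
        });
        document.getElementById('transfer-btn').addEventListener('click', () => {
            this.transferConversation();
        });
        document.getElementById('close-chat-btn').addEventListener('click', () => {
            this.closeConversation();
        });
    }
    // Demo only: no real WebSocket is opened; the status line is refreshed after a short delay.
    initConnection() {
        console.log('Connecting to the conversation service...');
        setTimeout(() => {
            document.getElementById('connection-status').innerHTML = '<i class="fas fa-circle"></i> 连接正常';
        }, 1000);
    }
    // Illustrative mock data; a real dashboard would load it from the conversation API.
    loadConversations() {
        this.conversations = [
            { id: 'conv_001', customerId: 'cust_001', customerName: '客户 A', channel: 'web', status: 'active', sentiment: 'neutral', intent: 'product_inquiry', lastMessage: '示例消息 1', lastTime: '10:30', unread: 2 },
            { id: 'conv_002', customerId: 'cust_002', customerName: '客户 B', channel: 'web', status: 'pending', sentiment: 'negative', intent: 'complaint', lastMessage: '示例消息 2', lastTime: '10:25', unread: 1 },
            { id: 'conv_003', customerId: 'cust_003', customerName: '客户 C', channel: 'web', status: 'transferring', sentiment: 'positive', intent: 'technical_support', lastMessage: '示例消息 3', lastTime: '10:20', unread: 0 }
        ];
        this.renderConversationList();
    }
    renderConversationList() {
        const container = this.conversationList;
        if (this.conversations.length === 0) {
            container.innerHTML = '<div class="p-4 text-center text-gray-500" id="no-conversations">暂无进行中的对话</div>';
        } else {
            container.innerHTML = '';
            this.conversations.forEach((conv) => {
                const item = document.createElement('div');
                item.className = `conversation-item p-4 cursor-pointer hover:bg-gray-50 conversation-${conv.status}`;
                const unreadBadge = conv.unread > 0 ? ` (${conv.unread})` : '';
                // textContent keeps customer-supplied text from being parsed as HTML.
                item.textContent = `${conv.customerName}${unreadBadge} · ${conv.lastMessage} · ${conv.lastTime}`;
                item.addEventListener('click', () => {
                    this.selectConversation(conv.id, conv.customerId);
                });
                container.appendChild(item);
            });
        }
        document.getElementById('conversation-count').textContent = this.conversations.length;
        this.updateStats();
    }
    selectConversation(conversationId, customerId) {
        this.currentConversationId = conversationId;
        const conversation = this.conversations.find((c) => c.id === conversationId);
        if (conversation) {
            document.getElementById('current-customer').textContent = conversation.customerName;
            document.getElementById('conversation-info').textContent = `渠道:${conversation.channel}`;
            const statusBadge = document.getElementById('conversation-status-badge');
            statusBadge.textContent = this.getStatusText(conversation.status);
            statusBadge.className = 'ml-3 text-xs px-2 py-1 rounded-full bg-blue-100 text-blue-800';
            this.loadCustomerInfo(customerId);
            document.getElementById('welcome-message').classList.add('hidden');
            this.loadMessages(conversationId);
            this.updateAIAnalysis(conversation);
            this.markAsRead(conversationId);
        }
    }
    loadCustomerInfo(customerId) {
        document.getElementById('customer-card').classList.remove('hidden');
        // Illustrative mock record; a real dashboard would fetch it by customerId.
        const mockCustomer = { name: '示例客户', phone: '-', email: '-', historyCount: 0, lastContact: '-' };
        document.getElementById('customer-name').textContent = mockCustomer.name;
        document.getElementById('customer-phone').textContent = mockCustomer.phone;
        document.getElementById('customer-email').textContent = mockCustomer.email;
        document.getElementById('history-count').textContent = mockCustomer.historyCount;
        document.getElementById('last-contact').textContent = mockCustomer.lastContact;
    }
    getMessageList() {
        return this.messageContainer.querySelector('.max-w-4xl');
    }
    clearMessages() {
        this.getMessageList().querySelectorAll('.message-row').forEach((el) => el.remove());
    }
    loadMessages(conversationId) {
        this.clearMessages();
        // Illustrative mock history for the selected conversation.
        const mockMessages = [
            { id: 'msg_1', sender: 'customer', content: '示例:客户提问', time: '10:20' },
            { id: 'msg_2', sender: 'ai', content: '示例:AI 自动回复', time: '10:20' },
            { id: 'msg_3', sender: 'customer', content: '示例:客户请求人工客服', time: '10:22' },
            { id: 'msg_4', sender: 'agent', content: '示例:坐席接入对话', time: '10:23' }
        ];
        mockMessages.forEach((msg) => {
            this.addMessage(msg);
        });
        this.scrollToBottom();
    }
    addMessage(message) {
        const messageDiv = document.createElement('div');
        messageDiv.className = 'message-row flex flex-col';
        const messageClass = message.sender === 'customer' ? 'message-customer' : message.sender === 'agent' ? 'message-agent' : 'message-ai';
        const senderLabel = message.sender === 'customer' ? '客户' : message.sender === 'agent' ? '坐席' : 'AI 助手';
        const bubble = document.createElement('div');
        bubble.className = `${messageClass} max-w-lg px-4 py-2 rounded-lg`;
        bubble.textContent = message.content;
        const meta = document.createElement('div');
        meta.className = 'text-xs text-gray-500 mt-1';
        meta.textContent = `${senderLabel} · ${message.time}`;
        messageDiv.append(bubble, meta);
        this.getMessageList().appendChild(messageDiv);
    }
    formatTime() {
        return new Date().toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' });
    }
    sendMessage() {
        const content = this.messageInput.value.trim();
        if (!content || !this.currentConversationId) return;
        const message = {
            id: 'msg_' + Date.now(),
            sender: 'agent',
            content: content,
            time: this.formatTime()
        };
        this.addMessage(message);
        this.messageInput.value = '';
        document.getElementById('char-count').textContent = '0';
        this.scrollToBottom();
        // Demo only: simulate the customer answering after a short pause.
        setTimeout(() => {
            this.simulateCustomerReply();
        }, 1000);
        this.updateConversationPreview(content);
    }
    simulateCustomerReply() {
        const conversationId = this.currentConversationId;
        const mockReplies = ['好的,谢谢', '我明白了', '还有一个问题想请教', '请问大概需要多久?', '麻烦您了'];
        const randomReply = mockReplies[Math.floor(Math.random() * mockReplies.length)];
        const message = {
            id: 'msg_' + Date.now(),
            sender: 'customer',
            content: randomReply,
            time: this.formatTime()
        };
        this.showTypingIndicator();
        setTimeout(() => {
            this.hideTypingIndicator();
            // Drop the reply if the agent switched or closed the conversation meanwhile.
            if (!conversationId || this.currentConversationId !== conversationId) return;
            this.addMessage(message);
            this.scrollToBottom();
            this.updateAISuggestions(randomReply);
        }, 1500);
    }
    showTypingIndicator() {
        this.typingIndicator.classList.remove('hidden');
    }
    hideTypingIndicator() {
        this.typingIndicator.classList.add('hidden');
    }
    updateConversationPreview(message) {
        const conversation = this.conversations.find((c) => c.id === this.currentConversationId);
        if (conversation) {
            conversation.lastMessage = message;
            conversation.lastTime = this.formatTime();
            conversation.unread = 0;
            this.renderConversationList();
        }
    }
    updateAIAnalysis(conversation) {
        const label = conversation.sentiment === 'positive' ? '正面' : conversation.sentiment === 'negative' ? '负面' : '中性';
        document.getElementById('sentiment-score').textContent = label;
        document.getElementById('sentiment-bar').style.width = conversation.sentiment === 'positive' ? '75%' : conversation.sentiment === 'negative' ? '25%' : '50%';
        document.getElementById('sentiment-desc').textContent = `当前客户情绪:${label}`;
        const intentTags = document.getElementById('intent-tags');
        intentTags.innerHTML = '';
        const tag = document.createElement('span');
        tag.className = 'text-xs bg-blue-100 text-blue-800 px-2 py-1 rounded-full';
        tag.textContent = conversation.intent;
        intentTags.appendChild(tag);
        document.getElementById('intent-desc').textContent = `识别到的意图:${conversation.intent}`;
        document.getElementById('conversation-summary').textContent = `${conversation.customerName}:${conversation.lastMessage}`;
        document.getElementById('ai-recommendation').textContent = conversation.sentiment === 'negative' ? '建议优先安抚客户情绪,必要时升级处理' : '可使用快捷回复或 AI 建议回复';
        document.getElementById('apply-recommendation').classList.remove('hidden');
    }
    updateAISuggestions(message) {
        console.log('Updating AI suggestions for:', message);
        const suggestions = document.getElementById('ai-suggestions');
        suggestions.classList.remove('hidden');
    }
    requestAIAssist() {
        if (!this.currentConversationId) {
            alert('请先选择一个对话');
            return;
        }
        const aiResponse = {
            id: 'msg_' + Date.now(),
            sender: 'ai',
            content: '示例:AI 根据知识库生成的建议回复',
            time: this.formatTime()
        };
        this.addMessage(aiResponse);
        this.scrollToBottom();
        document.getElementById('ai-recommendation').textContent = 'AI 已提供协助回复';
    }
    transferConversation() {
        if (!this.currentConversationId) {
            alert('请先选择一个对话');
            return;
        }
        const targetAgent = prompt('请输入要转接的坐席 ID:');
        if (targetAgent) {
            alert(`对话已转接给 ${targetAgent}`);
        }
    }
    closeConversation() {
        if (!this.currentConversationId) {
            alert('请先选择一个对话');
            return;
        }
        if (confirm('确定要结束当前对话吗?')) {
            const reason = prompt('请输入结束原因:', '问题已解决');
            if (reason !== null) {
                alert('对话已结束');
                this.conversations = this.conversations.filter((c) => c.id !== this.currentConversationId);
                this.clearCurrentConversation();
                this.renderConversationList();
            }
        }
    }
    clearCurrentConversation() {
        this.currentConversationId = null;
        document.getElementById('current-customer').textContent = '未选择对话';
        document.getElementById('conversation-info').textContent = '-';
        document.getElementById('conversation-status-badge').textContent = '-';
        document.getElementById('customer-card').classList.add('hidden');
        document.getElementById('ai-suggestions').classList.add('hidden');
        this.clearMessages();
        document.getElementById('welcome-message').classList.remove('hidden');
    }
    markAsRead(conversationId) {
        const conversation = this.conversations.find((c) => c.id === conversationId);
        if (conversation) {
            conversation.unread = 0;
            this.renderConversationList();
        }
    }
    updateStats() {
        const activeChats = this.conversations.filter((c) => c.status === 'active').length;
        const waitingChats = this.conversations.filter((c) => c.status === 'pending').length;
        document.getElementById('active-chats').textContent = activeChats;
        document.getElementById('total-chats').textContent = this.conversations.length;
        document.getElementById('waiting-chats').textContent = waitingChats;
    }
    updateAgentStatus() {
        document.getElementById('agent-status').textContent = this.agentStatus === 'online' ? '在线' : this.agentStatus === 'away' ? '离开' : '离线';
        console.log('Agent status:', this.agentStatus);
    }
    getStatusText(status) {
        const statusMap = { active: '进行中', pending: '等待中', transferring: '转接中', resolved: '已解决', closed: '已结束' };
        return statusMap[status] || status;
    }
    scrollToBottom() {
        const container = document.getElementById('message-container');
        container.scrollTop = container.scrollHeight;
    }
    startHeartbeat() {
        // Sends a ping every 30 seconds once a real WebSocket is assigned to wsConnection.
        setInterval(() => {
            if (this.wsConnection && this.wsConnection.readyState === WebSocket.OPEN) {
                this.wsConnection.send(JSON.stringify({ type: 'ping' }));
            }
        }, 30000);
    }
}
document.addEventListener('DOMContentLoaded', () => {
    const dashboard = new AgentDashboard();
    window.dashboard = dashboard;
});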
</script>
</body>
</html>
8. Deployment and Operations
8.1 Docker Deployment Configuration
Create Dockerfile:
# Use the official Python image
FROM python:3.10-slim
# Set the working directory
WORKDIR /app
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV DEBIAN_FRONTEND=noninteractive
# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    build-essential \
    curl \
    git \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt
# Copy the project files
COPY . .
# Create a non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
# Expose the service port
EXPOSE 8000
# Start command
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]
Create docker-compose.yml:
version: '3.8'
services:
  postgres:
    # pgvector-enabled build of PostgreSQL 15; the stock postgres image does not ship the extension
    image: pgvector/pgvector:pg15
    environment:
      POSTGRES_USER: ${POSTGRES_USER:-postgres}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-password}
      POSTGRES_DB: ${POSTGRES_DB:-customer_service}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./scripts/init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"
    healthcheck:
      # Probe with the same user the database is configured with
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-postgres}"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - ai-customer-service
  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD:-password}
    volumes:
      - redis_data:/data
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - ai-customer-service
  api:
    build: .
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    environment:
      - ENVIRONMENT=production
      - POSTGRES_SERVER=postgres
      - POSTGRES_PORT=5432
      - REDIS_HOST=redis
      - REDIS_PORT=6379
    volumes:
      - ./models:/app/models
      - ./data:/app/data
    ports:
      - "8000:8000"
    command: >
      sh -c "python scripts/init_database.py && uvicorn src.api.main:app --host 0.0.0.0 --port 8000 --workers 4"
    networks:
      - ai-customer-service
  frontend:
    build:
      context: .
      dockerfile: Dockerfile.frontend
    ports:
      - "3000:80"
    depends_on:
      - api
    networks:
      - ai-customer-service
  monitoring:
    image: grafana/grafana:latest
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    ports:
      - "3001:3000"
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - ai-customer-service
  rabbitmq:
    image: rabbitmq:3-management
    environment:
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER:-guest}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD:-guest}
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    networks:
      - ai-customer-service
networks:
  ai-customer-service:
    driver: bridge
volumes:
  postgres_data:
  redis_data:
  grafana_data:
  rabbitmq_data:
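On the application side, the variables from the Compose file have to be turned into connection URLs. The sketch below is one minimal way to do that with the standard library only. ServiceSettings and load_settings are hypothetical names, not the project's actual settings module, and the sketch assumes that POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB, and REDIS_PASSWORD are also exported to the api container, which the Compose file above does not yet do.

```python
import os
from dataclasses import dataclass
from typing import Mapping
from urllib.parse import quote


@dataclass(frozen=True)
class ServiceSettings:
    """Connection URLs derived from environment variables (hypothetical helper)."""
    postgres_dsn: str
    redis_url: str


def load_settings(env: Mapping[str, str] = os.environ) -> ServiceSettings:
    """Build connection URLs from the variable names used in the Compose file."""
    pg_user = env.get("POSTGRES_USER", "postgres")
    pg_password = env.get("POSTGRES_PASSWORD", "password")
    pg_host = env.get("POSTGRES_SERVER", "localhost")
    pg_port = int(env.get("POSTGRES_PORT", "5432"))
    pg_db = env.get("POSTGRES_DB", "customer_service")

    redis_host = env.get("REDIS_HOST", "localhost")
    redis_port = int(env.get("REDIS_PORT", "6379"))
    redis_password = env.get("REDIS_PASSWORD", "")

    # Percent-encode credentials so characters such as '@' or '/' cannot break the URL.
    postgres_dsn = (
        f"postgresql://{quote(pg_user, safe='')}:{quote(pg_password, safe='')}"
        f"@{pg_host}:{pg_port}/{pg_db}"
    )
    auth = f":{quote(redis_password, safe='')}@" if redis_password else ""
    redis_url = f"redis://{auth}{redis_host}:{redis_port}/0"
    return ServiceSettings(postgres_dsn=postgres_dsn, redis_url=redis_url)
```

Passing the environment as a mapping keeps the function easy to test: a plain dict can replace os.environ.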
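Create the database initialization script scripts/init.sql:
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- pgvector registers its extension under the name "vector". It must be installed
-- in the PostgreSQL image; the stock postgres image does not ship it.
CREATE EXTENSION IF NOT EXISTS vector;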
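With the extension enabled, pgvector can rank rows by vector distance; its `<=>` operator computes cosine distance, that is, 1 minus cosine similarity. The pure-Python sketch below shows what that ranking amounts to on a handful of in-memory vectors. The function names and the mapping of distance onto a 0 to 1 score are assumptions made for illustration, not the retrieval engine's actual scoring.

```python
import math
from typing import List, Sequence, Tuple


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine distance (1 - cosine similarity); ranges from 0 to 2."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine distance is undefined for zero vectors")
    return 1.0 - dot / (norm_a * norm_b)


def top_k(
    query: Sequence[float],
    docs: Sequence[Tuple[str, Sequence[float]]],
    k: int = 3,
) -> List[Tuple[str, float]]:
    """Return the k most similar documents as (id, score) pairs.

    The score rescales the distance from [0, 2] to [0, 1] (assumed mapping),
    so 1.0 means identical direction and 0.0 means opposite direction.
    """
    scored = [(doc_id, 1.0 - cosine_distance(query, vec) / 2.0) for doc_id, vec in docs]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]
```

In the database the same ordering would come from an ORDER BY on the distance expression, with an index doing the work this sketch does by brute force.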
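8.2 Monitoring and Logging Configuration
Create src/core/monitoring.py:
"""Monitoring configuration.

Sets up metrics collection, logging, and performance monitoring.
"""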
import logging
import time
from typing import Dict, Any, Optional
from contextlib import contextmanager
from datetime import datetime
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge, Summary
class MetricsCollector:
"""指标收集器"""
def __init__(self):
self.request_count = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
self.request_duration = Histogram('http_request_duration_seconds', 'HTTP request duration in seconds', ['method', 'endpoint'])
self.conversation_count = Gauge('conversations_active', 'Number of active conversations')
self.message_count = Counter('messages_processed_total', 'Total messages processed', ['handler_type', 'intent'])
self.ai_response_time = Summary('ai_response_time_seconds', 'AI response time in seconds')
self.error_count = Counter('errors_total', 'Total errors', ['error_type', 'component'])
self.customer_satisfaction = Gauge('customer_satisfaction_score', 'Customer satisfaction score')
self.agent_performance = Gauge('agent_performance_score', 'Agent performance score', ['agent_id'])
self.ai_confidence = Histogram('ai_confidence_score', 'AI confidence scores', buckets=[0.1, 0.3, 0.5, 0.7, 0.9, 1.0])
def record_request(self, method: str, endpoint: str, status: int, duration: float):
"""记录 HTTP 请求"""
self.request_count.labels(method, endpoint, status).inc()
self.request_duration.labels(method, endpoint).observe(duration)
def record_message(self, handler_type: str, intent: str):
"""记录消息处理"""
self.message_count.labels(handler_type, intent).inc()
def record_ai_response_time(self, duration: float):
"""记录 AI 响应时间"""
self.ai_response_time.observe(duration)
def record_error(self, error_type: str, component: str):
"""记录错误"""
self.error_count.labels(error_type, component).inc()
def update_conversation_count(self, count: int):
"""更新对话计数"""
self.conversation_count.set(count)
def update_satisfaction(self, score: float):
"""更新满意度分数"""
self.customer_satisfaction.set(score)
def update_agent_performance(self, agent_id: str, score: float):
"""更新坐席表现"""
self.agent_performance.labels(agent_id).set(score)
def record_ai_confidence(self, confidence: float):
"""记录 AI 置信度"""
self.ai_confidence.observe(confidence)
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.metrics = MetricsCollector()
self.logger = logging.getLogger(__name__)
@contextmanager
def measure_request(self, method: str, endpoint: str):
"""测量请求性能"""
start_time = time.time()
status = 200
try:
yield
except Exception as e:
status = 500
self.metrics.record_error(type(e).__name__, 'request_handler')
raise
finally:
duration = time.time() - start_time
self.metrics.record_request(method, endpoint, status, duration)
@contextmanager
def measure_ai_response(self):
"""测量 AI 响应性能"""
start_time = time.time()
try:
yield
finally:
duration = time.time() - start_time
self.metrics.record_ai_response_time(duration)
monitor = PerformanceMonitor()
def setup_logging():
    """Configure logging."""
    import os  # local import keeps this function self-contained
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    # FileHandler does not create missing directories, so make sure logs/ exists.
    os.makedirs('logs', exist_ok=True)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(log_format))
    file_handler = logging.FileHandler('logs/app.log')
    file_handler.setFormatter(logging.Formatter(log_format))
    # Both handlers already carry a formatter, so no format argument is needed here.
    logging.basicConfig(
        level=logging.INFO,
        handlers=[console_handler, file_handler]
    )
    # Quieten noisy third-party loggers.
    logging.getLogger('uvicorn').setLevel(logging.WARNING)
    logging.getLogger('sqlalchemy').setLevel(logging.WARNING)
    logging.getLogger('aiosqlite').setLevel(logging.WARNING)
def start_metrics_server(port: int = 9090):
"""启动指标服务器"""
from prometheus_client import start_http_server
try:
start_http_server(port)
logging.info(f"指标服务器启动在端口 {port}")
except Exception as e:
logging.error(f"启动指标服务器失败:{e}")
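The measure_request context manager relies on a try/except/finally pattern: the status defaults to success, flips to 500 when the wrapped block raises, and the duration is recorded either way. The sketch below reproduces that pattern with the standard library only. A plain list named recorded stands in for the Prometheus metrics, and time.perf_counter replaces time.time because it is monotonic; both are substitutions made for this illustration, not the module's actual code.

```python
import time
from contextlib import contextmanager
from typing import Iterator, List, Tuple

# (method, endpoint, status, duration_seconds); stands in for the Prometheus metrics.
recorded: List[Tuple[str, str, int, float]] = []


@contextmanager
def measure_request(method: str, endpoint: str) -> Iterator[None]:
    """Record status and duration of the wrapped block, even when it raises."""
    start = time.perf_counter()
    status = 200
    try:
        yield
    except Exception:
        status = 500
        raise  # re-raise so the caller still sees the original error
    finally:
        recorded.append((method, endpoint, status, time.perf_counter() - start))


def handle_ok() -> str:
    with measure_request("GET", "/health"):
        return "ok"


def handle_failing() -> None:
    with measure_request("POST", "/boom"):
        raise RuntimeError("simulated failure")
```

Because the measurement lives in finally, a handler that returns early or raises is still counted exactly once.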
8.3 Test Script
Create tests/test_integration.py:
"""Integration tests.

Exercise the system's components working together.
"""
from datetime import datetime
import pytest
import pytest_asyncio
from src.services.conversation.manager import ConversationManager
from src.ai_components.nlp.processor import NLPProcessor
from src.ai_components.retrieval.engine import KnowledgeRetrievalEngine
class TestIntegration:
    """Integration test suite."""
    # In pytest-asyncio's default strict mode, async fixtures are only awaited
    # when declared with pytest_asyncio.fixture; a plain pytest.fixture is not enough.
    @pytest_asyncio.fixture(autouse=True)
    async def setup(self):
        """Create fresh components for every test and clean up afterwards."""
        self.conversation_manager = ConversationManager()
        self.nlp_processor = NLPProcessor()
        self.retrieval_engine = KnowledgeRetrievalEngine()
        await self.nlp_processor.initialize()
        await self.retrieval_engine.initialize()
        yield
        self.conversation_manager.conversations.clear()
@pytest.mark.asyncio
async def test_end_to_end_conversation(self):
"""测试端到端对话流程"""
context = await self.conversation_manager.create_conversation(
customer_id="test_customer_001",
channel="web",
metadata={"test": True}
)
assert context.conversation_id is not None
assert context.customer_id == "test_customer_001"
message = {
"id": "test_msg_001",
"content": "你好,我想咨询产品价格",
"content_type": "text",
"sender_type": "customer",
"sender_id": "test_customer_001",
"metadata": {},
"timestamp": datetime.now().isoformat()
}
result = await self.conversation_manager.process_message(
context.conversation_id,
message
)
assert result["conversation_id"] == context.conversation_id
assert result["handler_type"] in ["ai", "hybrid", "agent"]
assert "response" in result
assert "analysis" in result
nlp_analysis = result["analysis"]
assert "intent" in nlp_analysis
assert "confidence" in nlp_analysis
assert "sentiment" in nlp_analysis
retrieval_result = await self.retrieval_engine.search("产品价格")
assert retrieval_result.query == "产品价格"
assert isinstance(retrieval_result.results, list)
assert isinstance(retrieval_result.processing_time_ms, float)
await self.conversation_manager.close_conversation(
context.conversation_id,
"test_complete"
)
closed_context = await self.conversation_manager.get_conversation(
context.conversation_id
)
assert closed_context.metadata["current_state"] == "closed"
assert closed_context.metadata["closed_reason"] == "test_complete"
@pytest.mark.asyncio
async def test_nlp_processing(self):
"""测试 NLP 处理"""
test_cases = [
{"text": "你好,今天天气不错", "expected_intent": "greeting", "min_confidence": 0.5},
{"text": "我要投诉,服务太差了", "expected_intent": "complaint", "min_confidence": 0.6},
{"text": "怎么重置密码?", "expected_intent": "technical_support", "min_confidence": 0.5}
]
for test_case in test_cases:
analysis = await self.nlp_processor.analyze_text(test_case["text"])
assert analysis.text == test_case["text"]
assert analysis.intent_result.intent == test_case["expected_intent"]
assert analysis.intent_result.confidence >= test_case["min_confidence"]
assert analysis.sentiment_result.sentiment in ["positive", "negative", "neutral"]
assert analysis.processing_time_ms > 0
@pytest.mark.asyncio
async def test_knowledge_retrieval(self):
"""测试知识检索"""
test_documents = [
{"id": "test_doc_001", "content": "产品价格分为三个档次:基础版免费,专业版每月 29 美元,企业版定制报价。", "metadata": {"category": "pricing", "importance": 9}},
{"id": "test_doc_002", "content": "技术支持联系方式:电话 400-123-4567,邮箱 [email protected]。", "metadata": {"category": "contact", "importance": 8}}
]
await self.retrieval_engine.add_documents(test_documents)
queries = [("价格是多少", 1), ("怎么联系技术支持", 1), ("产品功能和定价", 2)]
for query, expected_min_results in queries:
result = await self.retrieval_engine.search(query)
assert result.query == query
assert len(result.results) >= expected_min_results
for search_result in result.results:
assert search_result.id is not None
assert search_result.content is not None
assert 0 <= search_result.score <= 1
assert isinstance(search_result.metadata, dict)

    @pytest.mark.asyncio
    async def test_conversation_state_management(self):
        """Test conversation state management."""
        conversations = []
        for i in range(3):
            context = await self.conversation_manager.create_conversation(
                customer_id=f"test_customer_{i:03d}",
                channel="web"
            )
            conversations.append(context)
        assert len(self.conversation_manager.conversations) == 3

        for conv in conversations:
            await self.conversation_manager.close_conversation(
                conv.conversation_id,
                "test"
            )
            closed_conv = await self.conversation_manager.get_conversation(
                conv.conversation_id
            )
            assert closed_conv.metadata["current_state"] == "closed"

    @pytest.mark.asyncio
    async def test_error_handling(self):
        """Test error handling."""
        # A message for an unknown conversation must raise ValueError.
        with pytest.raises(ValueError):
            await self.conversation_manager.process_message("non_existent_conversation", {"content": "test"})

        # An empty message must still be handled, and by the AI.
        context = await self.conversation_manager.create_conversation(
            customer_id="error_test",
            channel="web"
        )
        empty_message = {"id": "empty_msg", "content": "", "sender_type": "customer"}
        result = await self.conversation_manager.process_message(
            context.conversation_id,
            empty_message
        )
        assert result is not None
        assert result["handler_type"] == "ai"
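

if __name__ == "__main__":
    import asyncio

    async def run_tests():
        tester = TestIntegration()
        tests = [
            ("end-to-end conversation", tester.test_end_to_end_conversation),
            ("NLP processing", tester.test_nlp_processing),
            ("knowledge retrieval", tester.test_knowledge_retrieval),
            ("conversation state management", tester.test_conversation_state_management),
            ("error handling", tester.test_error_handling),
        ]
        for name, test in tests:
            # Re-run setup before every test (assuming setup() rebuilds the
            # components) so that state left by an earlier test cannot break
            # the "exactly 3 conversations" assertion in the state test.
            await tester.setup()
            print(f"Running {name} test...")
            await test()
        print("All tests passed!")

    asyncio.run(run_tests())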
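The state-management and error-handling tests above exercise a small contract: creating a conversation registers it, closing it records the state and reason in `metadata`, and processing a message for an unknown ID raises `ValueError`. The sketch below is a minimal in-memory stand-in that satisfies that contract. It is not the article's conversation manager: `InMemoryConversationManager`, `ConversationContext`, the `conv_` ID format, and the shape of the returned dict are all hypothetical names chosen for illustration.

```python
import asyncio
import uuid
from dataclasses import dataclass, field


@dataclass
class ConversationContext:
    """Hypothetical stand-in for the conversation context used by the tests."""
    conversation_id: str
    customer_id: str
    channel: str
    metadata: dict = field(default_factory=dict)


class InMemoryConversationManager:
    """Illustrative stub only; it keeps all conversations in a dict."""

    def __init__(self):
        self.conversations = {}

    async def create_conversation(self, customer_id, channel):
        context = ConversationContext(
            conversation_id=f"conv_{uuid.uuid4().hex[:8]}",
            customer_id=customer_id,
            channel=channel,
            metadata={"current_state": "active"},
        )
        self.conversations[context.conversation_id] = context
        return context

    async def get_conversation(self, conversation_id):
        # Returns None for unknown IDs instead of raising.
        return self.conversations.get(conversation_id)

    async def close_conversation(self, conversation_id, reason):
        context = self._require(conversation_id)
        context.metadata["current_state"] = "closed"
        context.metadata["closed_reason"] = reason

    async def process_message(self, conversation_id, message):
        self._require(conversation_id)
        # Assumption: the AI handles every message, including empty ones.
        return {"handler_type": "ai", "message_id": message.get("id")}

    def _require(self, conversation_id):
        context = self.conversations.get(conversation_id)
        if context is None:
            raise ValueError(f"unknown conversation: {conversation_id}")
        return context
```

A stub like this can be handy for running the tests without loading models or connecting to a database, at the cost of not covering the real routing logic.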
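9. Project Summary and Extension Suggestions
9.1 Project Summary
The implementation above yields a complete system that combines AI customer service with human agents. Its main characteristics are:
- Modular architecture: the system follows a microservice architecture in which each component has a clear responsibility, which keeps it easy to maintain and extend.
- Intelligent conversation management: conversations are routed based on intent recognition, sentiment analysis, and context understanding.
- Seamless human-AI collaboration: conversations switch smoothly between the AI and human agents, and the AI gives agents real-time suggestions.
- Knowledge-driven responses: a knowledge base built on vector retrieval supplies accurate, timely answers.
- Real-time communication: WebSocket support provides a responsive user experience.
- Monitoring and operations: logging, performance monitoring, and health checks are included.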
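The routing idea in the list above (intent, sentiment, and context deciding between AI and human handling) can be illustrated with a small decision function. This is a sketch under stated assumptions, not the routing logic implemented in the article: `Analysis`, `choose_handler`, the `"human"` label, the escalation set, and the 0.5 confidence threshold are all hypothetical. Only the intent and sentiment labels are taken from the tests.

```python
from dataclasses import dataclass


@dataclass
class Analysis:
    intent: str
    confidence: float
    sentiment: str  # "positive", "negative", or "neutral"


# Assumed policy values for illustration; tune them against real traffic.
ESCALATION_INTENTS = {"complaint"}
MIN_AI_CONFIDENCE = 0.5


def choose_handler(analysis: Analysis, failed_ai_turns: int = 0) -> str:
    """Return "ai" or "human" following an AI-first, human-fallback policy."""
    # Some intents go straight to a human agent.
    if analysis.intent in ESCALATION_INTENTS:
        return "human"
    # Context: an unhappy customer whom the AI already failed once escalates.
    if analysis.sentiment == "negative" and failed_ai_turns >= 1:
        return "human"
    # If the intent classifier is unsure, do not let the AI guess.
    if analysis.confidence < MIN_AI_CONFIDENCE:
        return "human"
    return "ai"
```

Keeping the policy in one pure function makes it cheap to unit-test and to adjust without touching the NLP or messaging code.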
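9.2 Extension Suggestions
- Advanced AI model integration:
  - Integrate large language models such as GPT-4 and Claude to improve conversation quality
  - Add multimodal understanding (images, speech, video)
  - Add text-to-speech (TTS) and automatic speech recognition (ASR)
- Advanced features:
  - Real-time translation for multilingual customer service
  - Emotional speech synthesis so that AI replies sound more expressive
  - Predictive analytics to anticipate customer needs
  - Automated workflows with CRM integration
- Performance optimization:
  - Quantize models to reduce memory usage
  - Add a caching layer to improve response times
  - Add load balancing and horizontal scaling
- Security hardening:
  - Add end-to-end encryption
  - Enforce stricter access control
  - Add audit logging and compliance checks
- Deployment optimization:
  - Kubernetes orchestration with autoscaling
  - CI/CD pipelines for automated deployment
  - Multi-region deployment for higher availability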
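As one illustration of the caching suggestion above, the sketch below wraps an async search call with a small in-process cache whose entries expire after a fixed time. It is an assumption-laden example rather than part of the article's code: `TTLCache`, `cached_search`, and the default TTL and size are hypothetical, and a multi-instance deployment would more likely use the Redis service already in the stack.

```python
import asyncio
import time


class TTLCache:
    """Tiny in-process cache with per-entry expiry (illustrative only)."""

    def __init__(self, ttl_seconds=60.0, max_entries=1024, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._max = max_entries
        self._clock = clock  # injectable so tests can control time
        self._store = {}     # key -> (expires_at, value)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._store[key]  # drop the stale entry
            return None
        return value

    def set(self, key, value):
        if key not in self._store and len(self._store) >= self._max:
            # Evict the earliest-inserted key (dicts preserve insertion order).
            self._store.pop(next(iter(self._store)))
        self._store[key] = (self._clock() + self._ttl, value)


async def cached_search(cache, search, query):
    """Return a cached result for `query`, calling `search` only on a miss."""
    hit = cache.get(query)
    if hit is not None:
        return hit
    result = await search(query)
    cache.set(query, result)
    return result
```

Here `search` stands for any async callable that takes a query string, for example a retrieval engine's search method. Results that are `None` are never cached, so a search that legitimately returns `None` would be repeated on every call.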
9.3 Quick Start Guide
git clone <repository-url>
cd ai-human-customer-service
pip install -r requirements.txt
python -c "from src.core.config import create_env_file; create_env_file()"
docker-compose up -d
python src/api/main.py
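This system provides a complete foundation for an AI customer service solution that can be customized and extended to fit specific needs. Its design takes production availability, scalability, and maintainability into account, making it an enterprise-grade solution that can be put to use right away.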