跳到主要内容
Spring Cloud+AI :实现分布式智能推荐系统 | 极客日志
Python
Spring Cloud+AI :实现分布式智能推荐系统 !在这里插入图片描述 欢迎文末添加好友交流,共同进步! ' 俺はモンキー・D・ルフィ。海贼王になる男だ!' !在这里插入图片描述 !在这里插入图片描述 引言 在当今数字化时代,推荐系统已成为电商平台、内容分发平台、社交网络等互联网产品的核心竞争力之一。从淘宝的"猜你喜欢"、抖音的精准内容推送,到 Netflix 的影视推荐,优秀的推荐系统不仅能显著提升用户留存率和转化率,更能为企业带来可观的商业价…
嘘 发布于 2026/4/6 更新于 2026/5/22 16K 浏览
欢迎文末添加好友交流,共同进步!
' 俺はモンキー・D・ルフィ。海贼王になる男だ!'
引言
在当今数字化时代,推荐系统已成为电商平台、内容分发平台、社交网络等互联网产品的核心竞争力之一。从淘宝的"猜你喜欢"、抖音的精准内容推送,到 Netflix 的影视推荐,优秀的推荐系统不仅能显著提升用户留存率和转化率,更能为企业带来可观的商业价值。据统计,亚马逊约 35% 的销售额来自推荐系统,Netflix 则通过推荐算法为用户节省了每年约 10 亿美元的搜索成本。
然而,随着业务规模的增长和推荐算法的复杂化,传统的单体架构逐渐暴露出诸多瓶颈。首先,推荐系统涉及用户画像构建、实时行为收集、特征工程、模型推理等多个环节,单体应用难以应对日益复杂的业务逻辑;其次,推荐服务需要处理海量并发请求,单机部署无法满足弹性伸缩的需求;再者,AI 模型的迭代更新日益频繁,单体架构下模型部署往往需要重启整个应用,严重影响线上服务稳定性;最后,企业需要支持 A/B 测试以验证不同算法策略的效果,单体架构难以实现灵活的流量隔离和策略切换。
基于上述挑战,采用 Spring Cloud 微服务架构结合分布式 AI 服务成为企业级推荐系统的主流解决方案。这种架构实现了计算与存储的解耦、特征工程与模型推理的分离,支持按需水平扩展,能够轻松应对流量洪峰。同时,微服务架构天然支持模型热更新和灰度发布,使算法迭代更加敏捷。本文将基于 Spring Boot 3 和 Spring Cloud 2022,详细介绍如何从零构建一个高可用、可扩展的智能推荐系统,重点探讨如何在 Java 生态中安全、高效地集成 AI 能力。
整体架构设计
一个完整的推荐系统需要处理从用户请求到推荐结果返回的全链路流程。在微服务架构下,我们将整个系统拆分为多个职责单一的服务,通过服务间协作完成推荐任务。核心服务包括用户服务、商品服务、特征工程服务、推荐服务和模型推理服务。
服务划分与职责
user-service :负责用户基础信息和画像数据的维护,提供用户注册、登录、画像查询等接口。用户画像包括性别、年龄、地域、会员等级等静态属性,以及历史偏好标签等动态属性。
item-service :管理商品/内容元数据,包括商品分类、品牌、价格、库存、标签等信息。同时维护商品特征向量,用于相似度计算。
recommendation-service :推荐系统的核心服务,负责协调其他服务完成推荐流程。它接收用户请求,调用用户和商品服务获取上下文,通过特征工程服务组装特征向量,最终调用模型服务获取推荐结果。
feature-engine :特征工程服务,负责实时特征提取和处理。包括用户实时行为特征(浏览、点击、购买)、商品热度特征、上下文特征(时间、设备、地理位置)等。
model-serving :模型推理服务,负责加载训练好的推荐模型并提供推理接口。该服务通常使用 Python 实现,利用 PyTorch/TensorFlow 的推理能力,通过 REST 或 gRPC 暴露服务。
event-collector :事件收集服务,负责收集用户曝光、点击、转化等行为数据,并将这些数据发送到消息队列(如 Kafka),用于后续的离线训练和在线学习。
服务注册与发现 :使用 Nacos 或 Eureka 作为注册中心,实现服务的动态注册与发现,支持服务健康检查和故障剔除。
API 网关 :使用 Spring Cloud Gateway 作为统一入口,负责路由转发、认证授权、限流熔断、协议转换等功能。
配置中心 :使用 Nacos Config 或 Spring Cloud Config 集中管理各服务的配置文件,支持配置的动态刷新。
消息队列 :使用 Kafka 或 RocketMQ 处理异步事件流,实现实时日志收集和在线学习。
缓存层 :使用 Redis 缓存热点推荐结果和特征数据,减少数据库和模型服务的压力。
┌─────────────────────────────────────────────────────────────────────┐ │ API Gateway (Spring Cloud Gateway) │ │ 认证 | 限流 | 路由 | 熔断 │ └─────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────┐ │ Recommendation Service (推荐服务) │ │ ┌─────────────────────────────────────────────┐ │ │ │ Controller → Service → Feature Assembler │ │ │ └─────────────────────────────────────────────┘ │ └─────┬───────────────┬───────────────┬───────────────┬───────────────┘ │ │ │ │ ▼ ▼ ▼ ▼ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌─────────────────────────┐ │ User │ │ Item │ │ Feature │ │ Model Serving │ │ Service │ │ Service │ │ Engine │ │ (Python/PyTorch) │ │ │ │ │ │ │ │ • 协同过滤 │ │ • 用户信息 │ │ • 商品信息 │ │ • 实时行为 │ │ • 矩阵分解 │ │ • 用户画像 │ │ • 商品特征 │ │ • 特征组装 │ │ • NCF/DeepFM │ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────────┬───────────────┘ │ │ │ │ └───────────────┴───────────────┴────────────────────┘ │ ▼ ┌───────────────────────────┐ │ Nacos (注册中心) │ │ • 服务注册与发现 │ │ • 配置中心 │ │ • 健康检查 │ └───────────────────────────┘ │ ▼ ┌───────────────────────────┐ │ Kafka (消息队列) │ │ • 曝光事件 │ │ • 点击事件 │ │ • 转化事件 │ └───────────────────────────┘ │ ▼ ┌───────────────────────────┐ │ Event Collector Service │ │ • 实时日志收集 │ │ • 在线学习反馈 │ └───────────────────────────┘
特征与模型解耦 :特征工程独立为微服务,模型推理独立为 Python 服务,两者通过标准化接口交互。这种设计使特征工程和模型开发可以并行迭代,互不影响。
水平扩展能力 :各服务可根据负载独立扩容,如双11大促期间可以临时增加推荐服务和模型服务的实例数,应对流量洪峰。
容错与降级 :通过 Resilience4j 实现服务熔断和降级,当模型服务不可用时,可以降级到基于规则的推荐(如热门商品推荐),确保核心业务不受影响。
灰度发布支持 :通过网关的流量路由能力,可以实现不同算法版本的灰度测试,验证新算法效果后再全量发布。
AI 模型选型与训练 推荐算法的选择需要综合考虑业务场景、数据规模、实时性要求和计算资源。对于电商和内容平台,常见的推荐算法包括基于协同过滤的传统方法、基于矩阵分解的隐因子模型,以及基于深度学习的神经网络模型。
协同过滤(Collaborative Filtering)是最经典的推荐算法之一,分为基于用户和基于物品两种。用户协同过滤假设相似用户有相似偏好,通过找到与目标用户相似的用户群,推荐这些用户喜欢的物品。物品协同过滤则基于物品间的相似度,推荐与用户历史行为中物品相似的其他物品。协同过滤实现简单、可解释性强,但存在冷启动和稀疏性问题。
矩阵分解(Matrix Factorization)通过将用户-物品评分矩阵分解为低维的用户矩阵和物品矩阵,捕捉潜在因子关系。SVD(奇异值分解)和其变体 SVD++ 是其中的代表算法。矩阵分解相比协同过滤能更好地处理数据稀疏问题,是目前工业界应用最广泛的算法之一。
深度学习模型如 NCF(Neural Collaborative Filtering)、DeepFM、Wide&Deep 等能够捕捉高阶特征交互,在效果上通常优于传统方法。NCF 用神经网络替代矩阵分解的内积操作,增强表达能力;DeepFM 结合了因子分解机和深度网络的优势;Wide&Deep 则通过联合训练线性模型和深度模型,平衡记忆与泛化能力。
在实际项目中,我们通常使用 Python 生态中的 PyTorch 或 TensorFlow 进行模型训练。以下是一个使用 PyTorch 实现 NCF 模型的简化示例:
# train_ncf_model.pyimport torch import torch.nn as nn import numpy as np from torch.utils .data import Dataset, DataLoader # 定义 NCF 模型classNCFModel(nn.Module ):def__init__ (self, num_users, num_items, embedding_dim=32 ):super (NCFModel, self).__init__ ()# 用户嵌入层 self.user_embedding = nn.Embedding (num_users, embedding_dim)# 物品嵌入层 self.item_embedding = nn.Embedding (num_items, embedding_dim)# MLP 层 self.mlp = nn.Sequential ( nn.Linear (embedding_dim *2 ,128 ), nn.ReLU (), nn.Dropout (0.2 ), nn.Linear (128 ,64 ), nn.ReLU (), nn.Dropout (0.2 ), nn.Linear (64 ,1 ), nn.Sigmoid ())defforward (self, user_ids, item_ids): user_emb = self.user_embedding (user_ids) item_emb = self.item_embedding (item_ids) concat = torch.cat ([user_emb, item_emb], dim=-1 )return self.mlp (concat)# 自定义数据集classRecommendationDataset (Dataset):def__init__ (self, user_ids, item_ids, labels): self.user_ids = torch.LongTensor (user_ids) self.item_ids = torch.LongTensor (item_ids) self.labels = torch.FloatTensor (labels)def__len__ (self):returnlen (self.labels)def__getitem__ (self, idx):return self.user_ids[idx], self.item_ids[idx], self.labels[idx]# 训练函数deftrain_model (train_data, val_data, num_users, num_items, epochs=10 ): device = torch.device ('cuda' if torch.cuda.is_available ()else'cpu' ) model = NCFModel (num_users, num_items).to (device) criterion = nn.BCELoss () optimizer = torch.optim.Adam (model.parameters (), lr=0.001 ) train_loader = DataLoader (train_data, batch_size=256 , shuffle=True)for epoch inrange (epochs): model.train () total_loss =0 for user_ids, item_ids, labels in train_loader: user_ids, item_ids, labels = user_ids.to (device), item_ids.to (device), labels.to (device) optimizer.zero_grad () predictions = model (user_ids, item_ids).squeeze () loss = criterion (predictions, labels) loss.backward () optimizer.step () total_loss += loss.item ()print (f'Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.4f}' )return model # 导出模型为 ONNX 格式defexport_to_onnx (model, output_path='ncf_model.onnx' ): model.eval () dummy_user_ids = torch.LongTensor ([0 ]) dummy_item_ids = torch.LongTensor ([0 ]) torch.onnx.export ( model,(dummy_user_ids, dummy_item_ids), output_path, input_names=['user_ids' ,'item_ids' ], output_names=['prediction' ], dynamic_axes={'user_ids':{0 :'batch_size' },'item_ids':{0 :'batch_size' },'prediction':{0 :'batch_size' }}, opset_version=14 )print(f'Model exported to {output_path}')if __name__ =='__main__':# 模拟训练数据 num_users =10000 num_items =50000 num_samples =100000 user_ids = np.random.randint (0 , num_users, num_samples) item_ids = np.random.randint (0 , num_items, num_samples) labels = np.random.randint (0 ,2 , num_samples).astype (float)# 划分训练集和验证集 split_idx =int (0.8 * num_samples) train_data = RecommendationDataset (user_ids[:split_idx], item_ids[:split_idx], labels[:split_idx]) val_data = RecommendationDataset (user_ids[split_idx:], item_ids[split_idx:], labels[split_idx:])# 训练模型 model = train_model (train_data, val_data, num_users, num_items, epochs=5 )# 导出为 ONNX 格式 export_to_onnx (model)
训练完成后,我们需要将模型部署为服务供 Java 应用调用。以下是使用 FastAPI 实现的模型推理服务:
# model_server.pyfrom fastapi import FastAPI, HTTPException from pydantic import BaseModel import torch import onnxruntime as ort import numpy as np from typing import List app = FastAPI(title='Recommendation Model API' )# 加载 ONNX 模型 session = ort.InferenceSession('ncf_model.onnx' )classPredictionRequest(BaseModel): user_ids: List[int] item_ids: List[int]classPredictionResponse(BaseModel): predictions: List[float]@app .post('/api/v1/predict' , response_model=PredictionResponse)asyncdefpredict(request: PredictionRequest):try :# 准备输入数据 user_ids = np.array(request.user_ids, dtype=np.int64).reshape(-1 ,1 ) item_ids = np.array(request.item_ids, dtype=np.int64).reshape(-1 ,1 )# ONNX 推理 inputs ={'user_ids' : user_ids,'item_ids' : item_ids } predictions = session.run(None, inputs)[0 ]return PredictionResponse(predictions=predictions.tolist())except Exception as e:raise HTTPException(status_code=500 , detail=str(e))@app .get ('/health' )asyncdefhealth_check():return {'status' :'healthy' }if __name__ =='__main__' :import uvicorn uvicorn.run(app, host='0.0.0.0' , port=8000 )
这个模型服务提供了 REST API,Java 应用可以通过 HTTP 调用。对于高性能场景,也可以考虑使用 gRPC 协议,它比 REST 更高效,支持双向流式通信。
核心微服务实现 本节将详细介绍推荐系统中核心微服务的实现,包括服务间调用、特征组装、模型推理集成、异步日志收集和熔断降级等关键功能。所有代码基于 Spring Boot 3 和 Spring Cloud 2022。
项目结构与依赖 <project > <groupId > com.example.recommendation</groupId > <artifactId > recommendation-system</artifactId > <version > 1.0.0</version > <packaging > pom</packaging > <modules > <module > user-service</module > <module > item-service</module > <module > recommendation-service</module > <module > feature-engine</module > <module > common</module > </modules > <properties > <java.version > 17</java.version > <spring-boot.version > 3.2.0</spring-boot.version > <spring-cloud.version > 2023.0.0</spring-cloud.version > </properties > <dependencyManagement > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-dependencies</artifactId > <version > ${spring-boot.version}</version > <type > pom</type > <scope > import</scope > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-dependencies</artifactId > <version > ${spring-cloud.version}</version > <type > pom</type > <scope > import</scope > </dependency > </dependencies > </dependencyManagement > </project >
User Service 实现 / / user - service/ src/ main/ java/ com/ example/ user / controller/ UserController.javapackagecom.example.user.controller;importcom.example.user.dto.UserProfileDTO;importcom.example.user.service.UserService;importlombok.RequiredArgsConstructor;importorg.springframework.web.bind.annotation.* ;@RestController @RequestMapping ("/api/users")@RequiredArgsConstructorpublicclassUserController {privatefinalUserService userService;@GetMapping ("/{userId}/profile")publicUserProfileDTOgetUserProfile(@PathVariableLong userId){return userService.getUserProfile(userId);}@PostMapping ("/profiles/batch")publicMap< Long,UserProfileDTO> getUserProfilesBatch(@RequestBodyList < Long> userIds){return userService.getUserProfilesBatch(userIds);}}
/ / user - service/ src/ main/ java/ com/ example/ user / service/ UserService.javapackagecom.example.user.service;importcom.example.user.dto.UserProfileDTO;importcom.example.user.entity.UserProfile;importcom.example.user.repository.UserProfileRepository;importlombok.RequiredArgsConstructor;importorg.springframework.cache.annotation.Cacheable;importorg.springframework.stereotype.Service;importjava.util.List;importjava.util.Map;importjava.util.stream.Collectors;@Service @RequiredArgsConstructorpublicclassUserService {privatefinalUserProfileRepository userProfileRepository;@Cacheable (value = "userProfiles", key = "#userId")publicUserProfileDTOgetUserProfile(Long userId){UserProfile profile = userProfileRepository.findById(userId).orElseThrow(()- > newRuntimeException("User not found: "+ userId));returnUserProfileDTO.builder().userId(profile.getUserId()).gender(profile.getGender()).age(profile.getAge()).city(profile.getCity()).membershipLevel(profile.getMembershipLevel()).interestTags(profile.getInterestTags()).build();}publicMap< Long,UserProfileDTO> getUserProfilesBatch(List< Long> userIds){List< UserProfile> profiles = userProfileRepository.findAllById(userIds);return profiles.stream().collect (Collectors.toMap(UserProfile::getUserId, profile - > UserProfileDTO.builder().userId(profile.getUserId()).gender(profile.getGender()).age(profile.getAge()).city(profile.getCity()).membershipLevel(profile.getMembershipLevel()).interestTags(profile.getInterestTags()).build()));}}
# user-service/src/main/resources/application.ymlserver:port:8081spring:application:name: user-service datasource:url: jdbc:mysql://localhost:3306/user_db username: root password: password data:redis:host: localhost port:6379# Nacos 注册中心配置spring.cloud.nacos:discovery:server-addr: localhost:8848namespace: public config:server-addr: localhost:8848file-extension: yml
Recommendation Service 实现 推荐服务是整个系统的核心,负责协调各服务完成推荐流程。首先定义 Feign 客户端:
// recommendation-service/src/main/java/com/example/recommendation/client/UserClient.javapackagecom.example.recommendation.client;importcom.example.common.dto.UserProfileDTO;importorg.springframework.cloud.openfeign.FeignClient;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.PostMapping;importorg.springframework.web.bind.annotation.RequestBody;importjava.util.List;importjava.util.Map;@FeignClient ( name = "user-service" , path = "/api/users" , fallbackFactory = UserClientFallback.class) publicinterfaceUserClient{ @GetMapping ( "/{userId}/profile" ) UserProfileDTOgetUserProfile( @PathVariable ( "userId" ) Long userId) ;@PostMapping ( "/profiles/batch" ) Map<Long,UserProfileDTO>getUserProfilesBatch( @RequestBodyList <Long> userIds) ;}
// recommendation-service/src/main/java/com/example/recommendation/client/ItemClient.javapackagecom.example.recommendation.client;importcom.example.common.dto.ItemDTO;importorg.springframework.cloud.openfeign.FeignClient;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.PostMapping;importorg.springframework.web.bind.annotation.RequestBody;importjava.util.List;importjava.util.Map;@FeignClient ( name = "item-service" , path = "/api/items" , fallbackFactory = ItemClientFallback.class) publicinterfaceItemClient{ @GetMapping ( "/{itemId}" ) ItemDTOgetItem( @PathVariable ( "itemId" ) Long itemId) ;@PostMapping ( "/batch" ) List<ItemDTO>getItemsBatch( @RequestBodyList <Long> itemIds) ;@GetMapping ( "/category/{category}" ) List<ItemDTO>getItemsByCategory( @PathVariable ( "category" ) String category) ;}
// recommendation-service/src/main/java/com/example/recommendation/client/UserClientFallback.javapackagecom.example.recommendation.client;importcom.example.common.dto.UserProfileDTO;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Component;importjava.util.Collections;importjava.util.List;importjava.util.Map;@Slf 4j@ComponentpublicclassUserClientFallbackimplementsUserClient {@OverridepublicUserProfileDTOgetUserProfile (Long userId){ log.warn("User service fallback triggered for userId: {}" , userId);// 返回默认用户画像returnUserProfileDTO.builder().userId(userId).gender("unknown" ).age(25 ).city("unknown" ).membershipLevel("NORMAL" ).interestTags(Collections.emptyList()).build();}@OverridepublicMap <Long,UserProfileDTO>getUserProfilesBatch(List<Long> userIds){ log.warn("User service batch fallback triggered" );returnCollections.emptyMap();}}
// recommendation-service/src /main /java/com/example/recommendation/service/RecommendationService.javapackagecom .example .recommendation .service ;importcom.example .common .dto .*;importcom.example .recommendation .client .ItemClient ;importcom.example .recommendation .client .UserClient ;importcom.example .recommendation .client .ModelClient ;importcom.example .recommendation .config .RecommendationProperties ;importlombok.RequiredArgsConstructor ;importlombok.extern .slf4j .Slf4j ;importorg.springframework .data .redis .core .RedisTemplate ;importorg.springframework .stereotype .Service ;importjava.time .Duration ;importjava.util .*;importjava.util .stream .Collectors ;@Slf4j @Service @RequiredArgsConstructorpublicclassRecommendationService {privatefinalUserClient userClient;privatefinalItemClient itemClient;privatefinalModelClient modelClient;privatefinalFeatureEngineClient featureEngineClient;privatefinalRedisTemplate<String,Object > redisTemplate;privatefinalKafkaTemplate<String,Object > kafkaTemplate;privatefinalRecommendationProperties properties;publicRecommendationResultgetRecommendations(Long userId,String scenario,int size){// 1 . 尝试从缓存获取String cacheKey =String.format ("recommendation:%d:%s", userId, scenario);List<RecommendedItem> cachedResult =(List<RecommendedItem>) redisTemplate.opsForValue().get(cacheKey);if(cachedResult !=null){ log.info(" Cache hit for user: {}, scenario: {}", userId, scenario);returnRecommendationResult.builder ().userId (userId).scenario (scenario).items (cachedResult).source ("CACHE").build ();}// 2 . 获取用户画像UserProfileDTO userProfile = userClient.getUserProfile (userId);// 3 . 获取候选商品集(从召回池中筛选)List<Long> candidateItemIds =getCandidateItems(userProfile, scenario, size *10 );// 4 . 批量获取商品信息List<ItemDTO> candidateItems = itemClient.getItemsBatch (candidateItemIds);// 5 . 特征工程FeatureVector featureVector =buildFeatureVector(userId, userProfile, candidateItems);// 6 . 模型推理评分Map<Long,Double> itemScores = modelClient.predict (userId, candidateItemIds, featureVector);// 7 . 排序并返回 Top -KList<RecommendedItem> recommendedItems = candidateItems.stream ().filter (item -> itemScores.containsKey (item.getItemId ())).sorted ((a , b )->Double.compare ( itemScores.get (b .getItemId ()), itemScores.get (a .getItemId ()))).limit (size).map (item ->RecommendedItem.builder ().itemId (item.getItemId ()).itemName (item.getItemName ()).category (item.getCategory ()).price (item.getPrice ()).score (itemScores.get (item.getItemId ())).reason ("基于您的兴趣推荐").build ()).collect (Collectors.toList ());// 8 . 缓存结果 redisTemplate.opsForValue ().set ( cacheKey, recommendedItems,Duration.ofMinutes (properties.getCacheExpireMinutes ()));// 9 . 异步记录曝光事件recordExposureEvent(userId, scenario, recommendedItems);returnRecommendationResult.builder ().userId (userId).scenario (scenario).items (recommendedItems).source ("MODEL").build ();}privateList<Long>getCandidateItems(UserProfileDTO userProfile,String scenario,int poolSize){// 实际实现中,这里可以从离线计算的召回池中获取// 这里简化为按用户兴趣标签获取相关商品List<String> interests = userProfile.getInterestTags ();if(interests.isEmpty ()){ interests =Arrays.asList ("热门");}return itemClient.getItemsByCategory (interests.get (0 )).stream ().limit (poolSize).map (ItemDTO::getItemId).collect (Collectors.toList ());}privateFeatureVectorbuildFeatureVector(Long userId,UserProfileDTO userProfile,List<ItemDTO> items){returnFeatureVector.builder ().userId (userId).userGender (userProfile.getGender ()).userAge (userProfile.getAge ()).userCity (userProfile.getCity ()).membershipLevel (userProfile.getMembershipLevel ()).interestTags (userProfile.getInterestTags ()).itemCategories (items.stream ().map (ItemDTO::getCategory).distinct ().collect (Collectors.toList ())).hourOfDay (LocalDateTime.now ().getHour ()).dayOfWeek (LocalDateTime.now ().getDayOfWeek ().getValue ()).build ();}privatevoidrecordExposureEvent(Long userId,String scenario,List<RecommendedItem> items){List<Long> itemIds = items.stream ().map (RecommendedItem::getItemId).collect (Collectors.toList ());ExposureEvent event =ExposureEvent.builder ().userId (userId).scenario (scenario).itemIds (itemIds).timestamp (System.currentTimeMillis ()).build (); kafkaTemplate.send ("recommendation-exposure", event); log.debug ("Exposure event sent for user: {}", userId);}publicvoidrecordClickEvent(Long userId,Long itemId,String scenario){ClickEvent event =ClickEvent.builder ().userId (userId).itemId (itemId).scenario (scenario).timestamp (System.currentTimeMillis ()).build (); kafkaTemplate.send ("recommendation-click", event); log.info ("Click event recorded for user: {}, item: {}", userId, itemId);}}
模型推理客户端(通过 WebClient 调用 Python 模型服务):
// recommendation-service/src /main /java/com/example/recommendation/client/ModelClient.javapackagecom .example .recommendation .client ;importcom.example .recommendation .dto .ModelPredictRequest ;importlombok.RequiredArgsConstructor ;importlombok.extern .slf4j .Slf4j ;importorg.springframework .beans .factory .annotation .Value ;importorg.springframework .stereotype .Component ;importorg.springframework .web .reactive .function .client .WebClient ;importreactor.core .publisher .Mono ;importreactor.util .retry .Retry ;importjava.time .Duration ;importjava.util .HashMap ;importjava.util .List ;importjava.util .Map ;@Slf4j @Component @RequiredArgsConstructorpublicclassModelClient {privatefinalWebClient.Builder webClientBuilder;@Value ("${model.service .url }")privateString modelServiceUrl;@Value ("${model.service .timeout }")privateint timeoutMs;publicMap<Long,Double>predict(Long userId,List<Long> itemIds,FeatureVector featureVector){WebClient webClient = webClientBuilder .baseUrl (modelServiceUrl).build ();// 构建请求体List<Integer> userIds =Collections.nCopies (itemIds.size (), userId.intValue ());ModelPredictRequest request =ModelPredictRequest.builder ().userIds (userIds).itemIds (itemIds.stream ().map (Long::intValue).collect (Collectors.toList ())).featureVector (featureVector).build ();// 发起请求并处理响应Map<Long,Double> scores =newHashMap<>();try{ModelPredictResponse response = webClient.post ().uri ("/api/v1/predict").bodyValue (request).retrieve ().bodyToMono (ModelPredictResponse.class ).timeout (Duration.ofMillis (timeoutMs)).retryWhen (Retry.backoff (3 ,Duration.ofMillis (100 ))).block ();if(response !=null&& response.getPredictions ()!=null){for(int i =0 ; i < itemIds.size (); i ++){ scores.put (itemIds.get (i ), response.getPredictions ().get (i ));}}}catch(Exception e){ log.error ("Model prediction failed for userId: {}, error: {}", userId, e.getMessage ());// 返回默认分数 itemIds.forEach (id -> scores.put (id,0.5 ));}return scores;}}
# recommendation- service/src/ main/resources/ application.ymlserver:port:8083spring:application:name: recommendation- service cloud:nacos:discovery:server- addr: localhost:8848 # Feign 配置openfeign:client:config:default :connectTimeout:2000readTimeout:5000loggerLevel: basic circuitbreaker:enabled:true # Resilience4j 配置circuitbreaker:configs:default :slidingWindowSize:10minimumNumberOfCalls:5failureRateThreshold:50waitDurationInOpenState: 10s permittedNumberOfCallsInHalfOpenState:3kafka:bootstrap- servers: localhost:9092producer:key- serializer: org.apache.kafka.common.serialization.StringSerializer value- serializer: org.springframework.kafka.support.serializer.JsonSerializer # 模型服务配置model:service:url: http:
熔断与降级配置 使用 Resilience4j 实现服务熔断和降级:
// recommendation-service/src /main /java/com/example/recommendation/config/ResilienceConfig.javapackagecom .example .recommendation .config ;importio.github .resilience4j .circuitbreaker .CircuitBreakerConfig ;importio.github .resilience4j .circuitbreaker .CircuitBreakerRegistry ;importio.github .resilience4j .timelimiter .TimeLimiterConfig ;importorg.springframework .context .annotation .Bean ;importorg.springframework .context .annotation .Configuration ;importjava.time .Duration ;@ConfigurationpublicclassResilienceConfig {@BeanpublicCircuitBreakerRegistrycircuitBreakerRegistry (){CircuitBreakerConfig config =CircuitBreakerConfig.custom ().slidingWindowSize (10 ).minimumNumberOfCalls (5 ).failureRateThreshold (50 ).waitDurationInOpenState (Duration.ofSeconds (10 )).permittedNumberOfCallsInHalfOpenState (3 ).slowCallDurationThreshold (Duration.ofSeconds (3 )).slowCallRateThreshold (50 ).build ();returnCircuitBreakerRegistry.of (config);}@BeanpublicTimeLimiterConfigtimeLimiterConfig (){returnTimeLimiterConfig.custom ().timeoutDuration (Duration.ofSeconds (5 )).build ();}}
// recommendation-service/src/main/java/com/example/recommendation/fallback/RecommendationFallback.javapackagecom.example.recommendation.fallback;importcom.example.common.dto.RecommendationResult;importcom.example.common.dto.RecommendedItem;importlombok.extern.slf4j.Slf4j;importorg.springframework.stereotype.Component;importjava.util.Arrays;importjava.util.Collections;importjava.util.List;@Slf 4j@ComponentpublicclassRecommendationFallback {/** * 模型服务降级:返回热门商品推荐 */pu blicRecommendationResultgetHotItemsFallback(Long userId,String scenario){ log.warn("Model service degraded, using hot items fallback for user: {}" , userId);// 这里可以从缓存或数据库中获取热门商品List<RecommendedItem> hotItems =getHotItems(scenario);returnRecommendationResult.builder().userId(userId).scenario(scenario).items(hotItems).source("FALLBACK_HOT_ITEMS" ).build();}/** * 获取热门商品(实际实现中应从缓存或数据库获取) */privateList<RecommendedItem>getHotItems(String scenario){/ / 简化实现,返回固定的热门商品returnArrays.asList(RecommendedItem.builder().itemId(1001L).itemName("热门商品1" ).category("电子" ).price(299.0 ).score(0.95 ).reason("热门推荐" ).build(),RecommendedItem.builder().itemId(1002L).itemName("热门商品2" ).category("服装" ).price(199.0 ).score(0.92 ).reason("热门推荐" ).build());}}
异步日志收集 通过 Kafka 发送用户行为事件,用于在线学习和模型更新:
// recommendation-service/src/main/java/com/example/recommendation/producer/EventProducer.javapackagecom.example.recommendation.producer;importcom.example.common.dto.ExposureEvent;importcom.example.common.dto.ClickEvent;importlombok.RequiredArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.support.SendResult;importorg.springframework.stereotype.Component;importorg.springframework.util.concurrent.ListenableFutureCallback;@Slf4j@Component@RequiredArgsConstructorpublicclassEventProducer{privatefinalKafkaTemplate<String ,Object > kafkaTemplate;/** * 发送曝光事件 */publicvoidsendExposureEvent(ExposureEvent event ){ kafkaTemplate.send("recommendation-exposure" , event ).addCallback(newListenableFutureCallback<SendResult<String ,Object >>(){@OverridepublicvoidonSuccess(SendResult<String ,Object > result){ log.debug("Exposure event sent successfully: {}" , event );}@OverridepublicvoidonFailure(Throwable ex){ log.error ("Failed to send exposure event: {}" , event , ex);}});}/** * 发送点击事件 */publicvoidsendClickEvent(ClickEvent event ){ kafkaTemplate.send("recommendation-click" , event ).addCallback(newListenableFutureCallback<SendResult<String ,Object >>(){@OverridepublicvoidonSuccess(SendResult<String ,Object > result){ log.info("Click event sent successfully: {}" , event );}@OverridepublicvoidonFailure(Throwable ex){ log.error ("Failed to send click event: {}" , event , ex);}});}}
部署与性能优化 完成了核心服务的开发后,接下来需要考虑如何将这些服务部署到生产环境,并进行性能优化以确保系统的高可用和低延迟。
Docker 容器化 首先为每个服务创建 Dockerfile。以下以 recommendation-service 为例:
# recommendation- service/ Dockerfile FROM eclipse- temurin:17 - jre- alpine WORKDIR / app # 复制 JAR 文件 COPY target/ recommendation- service- * .jar app.jar # 设置 JVM 参数 ENV JAVA_OPTS= "-Xms512m -Xmx1g -XX:+UseG1GC -XX:MaxGCPauseMillis=200" EXPOSE 8083 ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar app.jar"]
使用 Docker Compose 进行本地开发环境编排:
# docker-compose.ymlversion:'3.8 'services:# Nacos 注册中心nacos:image: nacos/nacos-server:v2.2.3 ports:-"8848:8848" environment:MODE: standalone # MySQLmysql:image: mysql:8 .0ports:-"3306:3306" environment:MYSQL_ROOT_PASSWORD: password MYSQL_DATABASE: recommendation_db volumes:- mysql-data :/var/lib/mysql # Redisredis:image: redis:7 -alpine ports:-"6379:6379" volumes:- redis-data :/data # Kafkakafka:image: confluentinc/cp-kafka:7.5 .0 ports:-"9092:9092" environment:KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:1depends_on:- zookeeper zookeeper:image: confluentinc/cp-zookeeper:7.5 .0 ports:-"2181:2181" environment:ZOOKEEPER_CLIENT_PORT:2181 # 用户服务user-service:build: ./user-service ports:-"8081:8081" environment:SPRING_CLOUD_NACOS_DISCOVERY_SERVER_ADDR: nacos:8848SPRING_DATASOURCE_URL: jdbc:mysql://mysql:3306 /user_db SPRING_DATA_REDIS_HOST: redis depends_on:- nacos - mysql - redis # 商品服务item-service:build: ./item-service ports:-"8082:8082" environment:SPRING_CLOUD_NACOS_DISCOVERY_SERVER_ADDR: nacos:8848SPRING_DATASOURCE_URL: jdbc:mysql://mysql:3306 /item_db depends_on:- nacos - mysql # 推荐服务recommendation-service:build: ./recommendation-service ports:-"8083:8083" environment:SPRING_CLOUD_NACOS_DISCOVERY_SERVER_ADDR: nacos:8848SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092MODEL_SERVICE_URL: http://model-service:8000depends_on:- nacos - kafka - user-service - item-service # 模型服务(Python)model-service:build: ./model-service ports:-"8000:8000" environment:- MODEL_PATH=/app/models/ncf_model.onnx volumes:- ./models:/app/models volumes:mysql-data :redis-data :
Kubernetes 编排 对于生产环境,使用 Kubernetes 进行容器编排。以下是 recommendation-service 的部署配置:
# k8s/recommendation-service-deployment.yamlapiVersion: apps/v1 kind: Deployment metadata:name: recommendation-service labels:app: recommendation-service spec:replicas:3selector:matchLabels:app: recommendation-service template:metadata:labels:app: recommendation-service spec:containers:-name: recommendation-service image: your-registry/recommendation-service:1.0.0 ports:-containerPort:8083env:-name: SPRING_PROFILES_ACTIVE value:"prod" -name: SPRING_CLOUD_NACOS_DISCOVERY_SERVER_ADDR value:"nacos-service:8848" -name: SPRING_KAFKA_BOOTSTRAP_SERVERS value:"kafka-service:9092" -name: MODEL_SERVICE_URL value:"http://model-service:8000" -name: JAVA_OPTS value:"-Xms1g -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=200" resources:requests:memory:"1Gi" cpu:"500m" limits:memory:"2Gi" cpu:"2000m" livenessProbe:httpGet:path: /actuator/health port:8083initialDelaySeconds:60periodSeconds:10readinessProbe:httpGet:path: /actuator/health port:8083initialDelaySeconds:30periodSeconds:5---apiVersion: v1 kind: Service metadata:name: recommendation-service spec:selector:app: recommendation-service ports:-protocol: TCP port:8083targetPort:8083type: ClusterIP ---apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler metadata:name: recommendation-service-hpa spec:scaleTargetRef:apiVersion: apps/v1 kind: Deployment name: recommendation-service minReplicas:3maxReplicas:10metrics:-type : Resource resource:name: cpu target:type : Utilization averageUtilization:70-type : Resource resource:name: memory target:type : Utilization averageUtilization:80
性能优化策略 对于深度学习模型,使用 GPU 可以显著提升推理速度。以下是支持 GPU 的模型服务部署配置:
# model-service/Dockerfile FROM nvidia/cuda:11.8.0-runtime-ubuntu22.04 RUN apt-get update && apt-get install -y \ python3.10 \ python3-pip \ && rm -rf /var/lib/apt/lists/* WORKDIR /app COPY requirements.txt . RUN pip3 install --no-cache-dir -r requirements.txt COPY model_server.py . COPY models/ ./models/ EXPOSE 8000 CMD ["python3" , "model_server.py" ]
resources:limits:nvidia.com/gpu:1
// CacheConfiguration.java@Configuration@EnableCachingpublicclassCacheConfiguration {@BeanpublicRedisCacheManagerredisCacheManager (RedisConnectionFactory factory){RedisCacheConfiguration config =RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(30 )).serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(newStringRedisSerializer())).serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(newGenericJackson2JsonRedisSerializer()));returnRedisCacheManager.builder(factory).cacheDefaults(config).withInitialCacheConfigurations(getCacheConfigurations()).build();}privateMap<String,RedisCacheConfiguration>getCacheConfigurations(){Map<String,RedisCacheConfiguration> configMap =newHashMap<>();// 用户画像缓存 - 1 小时 configMap.put("userProfiles" ,RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofHours(1 )));// 推荐结果缓存 - 30 分钟 configMap.put("recommendations" ,RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(30 )));// 热门商品缓存 - 10 分钟 configMap.put("hotItems" ,RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(10 )));return configMap;}}
针对推荐服务的内存和 CPU 特性进行 JVM 参数调优:
# 推荐服务的推荐 JVM 参数JAVA_OPTS=" -Xms2g -Xmx4g -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:G1ReservePercent=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/logs/heapdump.hprof -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/logs/gc.log -Duser.timezone=Asia/Shanghai -Dfile.encoding=UTF-8 "
为高频查询字段添加索引
使用读写分离分担主库压力
对于用户画像等热点数据使用 Redis 缓存
-- 用户画像表索引优化CREATEINDEX idx_user_id ON user_profile (user_id);CREATEINDEX idx_membership ON user_profile (membership_level);CREATEINDEX idx_city ON user_profile (city);-- 商品表索引优化CREATEINDEX idx_item_category ON item (category);CREATEINDEX idx_item_brand ON item (brand);CREATEINDEX idx_item_created ON item (created_at);
压测结果 使用 JMeter 进行压力测试,测试场景为并发获取推荐结果:
推荐服务:3 个实例,每实例 2C4G
模型服务:2 个实例,每实例 4C8G + GPU
Redis:1 主 2 从
Kafka:3 节点集群
并发数 QPS 平均响应时间 P95 响应时间 P99 响应时间 错误率 100 850 115ms 180ms 250ms 0% 500 3200 155ms 280ms 420ms 0% 1000 4800 205ms 450ms 680ms 0.1% 2000 5200 380ms 850ms 1200ms 2%
并发数 QPS 平均响应时间 P95 响应时间 P99 响应时间 错误率 100 1500 65ms 95ms 130ms 0% 500 5800 85ms 150ms 220ms 0% 1000 9500 105ms 200ms 320ms 0% 2000 12000 165ms 350ms 520ms 0.05%
通过缓存和 JVM 调优,系统性能提升了约 100-150%,P99 延迟降低了约 50%。
总结与展望
本文详细介绍了基于 Spring Cloud 微服务架构构建智能推荐系统的完整过程,从架构设计、模型选型、服务实现到部署优化,全方位展示了如何在企业级 Java 生态中集成 AI 能力。通过微服务架构,我们实现了特征工程与模型推理的解耦,使各组件可以独立开发、部署和扩展;通过服务熔断和降级策略,确保了系统的高可用性;通过多级缓存和 JVM 调优,显著提升了系统性能。
Spring Cloud 与 AI 的结合为传统 Java 应用注入了智能化能力,使开发者可以在熟悉的 Java 生态中轻松集成机器学习模型。这种架构的优势在于:一是技术栈的灵活性,Java 开发者专注于业务逻辑和数据流转,Python 算法工程师专注于模型开发和训练,各司其职;二是系统的可扩展性,各服务可根据负载独立扩容,应对流量洪峰;三是部署的便捷性,通过容器化和编排工具,可以实现快速部署和弹性伸缩。
1. 实时推荐 :当前的系统主要基于离线计算的批量推荐,未来可以引入实时流处理技术(如 Flink),实现基于用户实时行为的毫秒级推荐更新。当用户浏览一个商品后,系统可以立即调整后续推荐内容,提升用户体验和转化率。
2. 多模态特征融合 :随着多媒体内容的普及,推荐系统需要处理文本、图像、视频等多种模态的特征。可以引入视觉特征提取模型(如 CLIP、ResNet),实现"以图搜图"和跨模态推荐。例如,用户上传一张图片,系统可以推荐相似的商品。
3. 联邦学习与隐私保护 :在数据隐私保护日益重要的背景下,联邦学习成为一种重要的技术方向。通过在用户设备本地进行模型训练,只上传模型参数而非原始数据,可以在保护用户隐私的同时实现个性化推荐。
4. AutoML 与模型自动迭代 :引入 AutoML 技术,实现模型的自动训练、评估和部署。当新算法出现时,系统可以自动进行 A/B 测试,选择最优模型上线,形成闭环的算法迭代体系。
5. 大模型增强推荐 :利用大语言模型(LLM)的强大理解能力,实现更精准的用户意图识别和推荐理由生成。LLM 可以根据用户对话历史生成更符合用户偏好的推荐结果,同时提供可解释的推荐理由。
AI 技术的快速发展为推荐系统带来了新的机遇和挑战。作为 Java 开发者,我们需要保持开放的心态,积极学习和拥抱新技术,在保证系统稳定性和可维护性的前提下,将 AI 能力优雅地集成到现有架构中。希望本文能够为读者提供有价值的参考,助力大家在 AI 时代的技术升级之路。
✍️ 坚持用清晰易懂的图解+可落地的代码,让每个知识点都简单直观!💡 座右铭 :'道路是曲折的,前途是光明的!'
目录
引言 整体架构设计 AI 模型选型与训练 trainncfmodel.pyimport torch import torch.nn as nn import numpy as np from torch.utils.data import Dataset, DataLoader # 定义 NCF 模型classNCFModel(nn.Module):definit(self, numusers, numitems, embeddingdim=32):super(NCFModel, self).init()# 用户嵌入层 self.userembedding = nn.Embedding(numusers, embeddingdim)# 物品嵌入层 self.itemembedding = nn.Embedding(numitems, embeddingdim)# MLP 层 self.mlp = nn.Sequential( nn.Linear(embeddingdim 2,128), nn.ReLU(), nn.Dropout(0.2), nn.Linear(128,64), nn.ReLU(), nn.Dropout(0.2), nn.Linear(64,1), nn.Sigmoid())defforward(self, userids, itemids): useremb = self.userembedding(userids) itememb = self.itemembedding(itemids) concat = torch.cat([useremb, itememb], dim=-1)return self.mlp(concat)# 自定义数据集classRecommendationDataset(Dataset):definit(self, userids, itemids, labels): self.userids = torch.LongTensor(userids) self.itemids = torch.LongTensor(itemids) self.labels = torch.FloatTensor(labels)deflen(self):returnlen(self.labels)defgetitem(self, idx):return self.userids[idx], self.itemids[idx], self.labels[idx]# 训练函数deftrainmodel(traindata, valdata, numusers, numitems, epochs=10): device = torch.device('cuda'if torch.cuda.isavailable()else'cpu') model = NCFModel(numusers, numitems).to(device) criterion = nn.BCELoss() optimizer = torch.optim.Adam(model.parameters(), lr=0.001) trainloader = DataLoader(traindata, batchsize=256, shuffle=True)for epoch inrange(epochs): model.train() totalloss =0for userids, itemids, labels in trainloader: userids, itemids, labels = userids.to(device), itemids.to(device), labels.to(device) optimizer.zerograd() predictions = model(userids, itemids).squeeze() loss = criterion(predictions, labels) loss.backward() optimizer.step() totalloss += loss.item()print(f'Epoch {epoch+1}/{epochs}, Loss: {totalloss/len(trainloader):.4f}')return model # 导出模型为 ONNX 格式defexporttoonnx(model, outputpath='ncfmodel.onnx'): model.eval() dummyuserids = torch.LongTensor([0]) dummyitemids = torch.LongTensor([0]) torch.onnx.export( model,(dummyuserids, dummyitemids), outputpath, inputnames=['userids','itemids'], outputnames=['prediction'], dynamicaxes={'userids':{0:'batchsize'},'itemids':{0:'batchsize'},'prediction':{0:'batchsize'}}, opsetversion=14)print(f'Model exported to {outputpath}')if name =='main':# 模拟训练数据 numusers =10000 numitems =50000 numsamples =100000 userids = np.random.randint(0, numusers, numsamples) itemids = np.random.randint(0, numitems, numsamples) labels = np.random.randint(0,2, numsamples).astype(float)# 划分训练集和验证集 splitidx =int(0.8 numsamples) traindata = RecommendationDataset(userids[:splitidx], itemids[:splitidx], labels[:splitidx]) valdata = RecommendationDataset(userids[splitidx:], itemids[splitidx:], labels[splitidx:])# 训练模型 model = trainmodel(traindata, valdata, numusers, numitems, epochs=5)# 导出为 ONNX 格式 exportto_onnx(model) modelserver.pyfrom fastapi import FastAPI, HTTPException from pydantic import BaseModel import torch import onnxruntime as ort import numpy as np from typing import List app = FastAPI(title='Recommendation Model API')# 加载 ONNX 模型 session = ort.InferenceSession('ncfmodel.onnx')classPredictionRequest(BaseModel): userids: List[int] itemids: List[int]classPredictionResponse(BaseModel): predictions: List[float]@app.post('/api/v1/predict', responsemodel=PredictionResponse)asyncdefpredict(request: PredictionRequest):try:# 准备输入数据 userids = np.array(request.userids, dtype=np.int64).reshape(-1,1) itemids = np.array(request.itemids, dtype=np.int64).reshape(-1,1)# ONNX 推理 inputs ={'userids': userids,'itemids': itemids } predictions = session.run(None, inputs)[0]return PredictionResponse(predictions=predictions.tolist())except Exception as e:raise HTTPException(statuscode=500, detail=str(e))@app.get('/health')asyncdefhealth_check():return{'status':'healthy'}if name =='main':import uvicorn uvicorn.run(app, host='0.0.0.0', port=8000) 核心微服务实现 项目结构与依赖 User Service 实现 user-service/src/main/resources/application.ymlserver:port:8081spring:application:name: user-service datasource:url: jdbc:mysql://localhost:3306/user_db username: root password: password data:redis:host: localhost port:6379# Nacos 注册中心配置spring.cloud.nacos:discovery:server-addr: localhost:8848namespace: public config:server-addr: localhost:8848file-extension: yml Recommendation Service 实现 recommendation-service/src/main/resources/application.ymlserver:port:8083spring:application:name: recommendation-service cloud:nacos:discovery:server-addr: localhost:8848# Feign 配置openfeign:client:config:default:connectTimeout:2000readTimeout:5000loggerLevel: basic circuitbreaker:enabled:true# Resilience4j 配置circuitbreaker:configs:default:slidingWindowSize:10minimumNumberOfCalls:5failureRateThreshold:50waitDurationInOpenState: 10s permittedNumberOfCallsInHalfOpenState:3kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 模型服务配置model:service:url: http://localhost:8000timeout:3000# 推荐配置recommendation:cache:expire-minutes:30candidate:pool-size:500# 监控配置management:endpoints:web:exposure:include: health,metrics,prometheus metrics:export:prometheus:enabled:true 熔断与降级配置 异步日志收集 部署与性能优化 Docker 容器化 recommendation-service/Dockerfile FROM eclipse-temurin:17-jre-alpine WORKDIR /app # 复制 JAR 文件 COPY target/recommendation-service-*.jar app.jar # 设置 JVM 参数 ENV JAVAOPTS="-Xms512m -Xmx1g -XX:+UseG1GC -XX:MaxGCPauseMillis=200" EXPOSE 8083 ENTRYPOINT ["sh", "-c", "java $JAVAOPTS -jar app.jar"] docker-compose.ymlversion:'3.8'services:# Nacos 注册中心nacos:image: nacos/nacos-server:v2.2.3 ports:-"8848:8848"environment:MODE: standalone # MySQLmysql:image: mysql:8.0ports:-"3306:3306"environment:MYSQLROOTPASSWORD: password MYSQLDATABASE: recommendationdb volumes:- mysql-data:/var/lib/mysql # Redisredis:image: redis:7-alpine ports:-"6379:6379"volumes:- redis-data:/data # Kafkakafka:image: confluentinc/cp-kafka:7.5.0 ports:-"9092:9092"environment:KAFKAZOOKEEPERCONNECT: zookeeper:2181KAFKAADVERTISEDLISTENERS: PLAINTEXT://localhost:9092KAFKAOFFSETSTOPICREPLICATIONFACTOR:1dependson:- zookeeper zookeeper:image: confluentinc/cp-zookeeper:7.5.0 ports:-"2181:2181"environment:ZOOKEEPERCLIENTPORT:2181# 用户服务user-service:build: ./user-service ports:-"8081:8081"environment:SPRINGCLOUDNACOSDISCOVERYSERVERADDR: nacos:8848SPRINGDATASOURCEURL: jdbc:mysql://mysql:3306/userdb SPRINGDATAREDISHOST: redis dependson:- nacos - mysql - redis # 商品服务item-service:build: ./item-service ports:-"8082:8082"environment:SPRINGCLOUDNACOSDISCOVERYSERVERADDR: nacos:8848SPRINGDATASOURCEURL: jdbc:mysql://mysql:3306/itemdb dependson:- nacos - mysql # 推荐服务recommendation-service:build: ./recommendation-service ports:-"8083:8083"environment:SPRINGCLOUDNACOSDISCOVERYSERVERADDR: nacos:8848SPRINGKAFKABOOTSTRAPSERVERS: kafka:9092MODELSERVICEURL: http://model-service:8000dependson:- nacos - kafka - user-service - item-service # 模型服务(Python)model-service:build: ./model-service ports:-"8000:8000"environment:- MODELPATH=/app/models/ncf_model.onnx volumes:- ./models:/app/models volumes:mysql-data:redis-data: Kubernetes 编排 k8s/recommendation-service-deployment.yamlapiVersion: apps/v1 kind: Deployment metadata:name: recommendation-service labels:app: recommendation-service spec:replicas:3selector:matchLabels:app: recommendation-service template:metadata:labels:app: recommendation-service spec:containers:-name: recommendation-service image: your-registry/recommendation-service:1.0.0 ports:-containerPort:8083env:-name: SPRINGPROFILESACTIVE value:"prod"-name: SPRINGCLOUDNACOSDISCOVERYSERVERADDR value:"nacos-service:8848"-name: SPRINGKAFKABOOTSTRAPSERVERS value:"kafka-service:9092"-name: MODELSERVICEURL value:"http://model-service:8000"-name: JAVA_OPTS value:"-Xms1g -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=200"resources:requests:memory:"1Gi"cpu:"500m"limits:memory:"2Gi"cpu:"2000m"livenessProbe:httpGet:path: /actuator/health port:8083initialDelaySeconds:60periodSeconds:10readinessProbe:httpGet:path: /actuator/health port:8083initialDelaySeconds:30periodSeconds:5---apiVersion: v1 kind: Service metadata:name: recommendation-service spec:selector:app: recommendation-service ports:-protocol: TCP port:8083targetPort:8083type: ClusterIP ---apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler metadata:name: recommendation-service-hpa spec:scaleTargetRef:apiVersion: apps/v1 kind: Deployment name: recommendation-service minReplicas:3maxReplicas:10metrics:-type: Resource resource:name: cpu target:type: Utilization averageUtilization:70-type: Resource resource:name: memory target:type: Utilization averageUtilization:80 性能优化策略 model-service/Dockerfile FROM nvidia/cuda:11.8.0-runtime-ubuntu22.04 RUN apt-get update && apt-get install -y \ python3.10 \ python3-pip \ && rm -rf /var/lib/apt/lists/* WORKDIR /app COPY requirements.txt . RUN pip3 install --no-cache-dir -r requirements.txt COPY modelserver.py . COPY models/ ./models/ EXPOSE 8000 CMD ["python3", "modelserver.py"] 推荐服务的推荐 JVM 参数JAVA_OPTS=" -Xms2g -Xmx4g -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:G1ReservePercent=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/logs/heapdump.hprof -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/logs/gc.log -Duser.timezone=Asia/Shanghai -Dfile.encoding=UTF-8 " 压测结果 总结与展望 相关免费在线工具 curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
HTML转Markdown 将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
JSON 压缩 通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online