跳到主要内容
基于 Spring Cloud 的分布式智能推荐系统架构与实践 | 极客日志
Java AI java 算法
基于 Spring Cloud 的分布式智能推荐系统架构与实践 推荐系统在电商和内容平台中至关重要,但传统单体架构难以应对高并发与复杂算法迭代。本文基于 Spring Cloud 微服务架构,结合 Python AI 模型,构建分布式智能推荐系统。通过特征工程与模型推理解耦、服务熔断降级、多级缓存及 GPU 加速等策略,实现了高可用与高性能。内容涵盖架构设计、模型训练导出、核心服务实现(Feign、Resilience4j)、容器化部署及性能压测优化,为 Java 生态集成 AI 能力提供完整落地方案。
abccba 发布于 2026/3/29 更新于 2026/4/25 2 浏览引言
在当今数字化时代,推荐系统已成为电商平台、内容分发平台及社交网络的核心竞争力。从淘宝的'猜你喜欢'到抖音的精准推送,优秀的推荐系统不仅能显著提升用户留存率和转化率,更能为企业带来可观的商业价值。据统计,亚马逊约 35% 的销售额来自推荐系统。
然而,随着业务规模增长和算法复杂化,传统单体架构逐渐暴露瓶颈。推荐系统涉及用户画像、实时行为收集、特征工程、模型推理等多个环节,单体应用难以应对复杂的业务逻辑;海量并发请求要求弹性伸缩能力;AI 模型频繁迭代需要热更新机制,而单体部署往往导致服务中断。此外,A/B 测试验证不同策略也需要灵活的流量隔离。
基于上述挑战,采用 Spring Cloud 微服务架构结合分布式 AI 服务成为主流解决方案。这种架构实现了计算与存储解耦、特征工程与模型推理分离,支持按需水平扩展。同时,微服务天然支持模型热更新和灰度发布。本文将基于 Spring Boot 3 和 Spring Cloud 2022,详细介绍如何从零构建高可用、可扩展的智能推荐系统,重点探讨如何在 Java 生态中安全、高效地集成 AI 能力。
整体架构设计
一个完整的推荐系统需处理从用户请求到结果返回的全链路流程。在微服务架构下,我们将系统拆分为多个职责单一的服务,通过协作完成推荐任务。核心服务包括用户服务、商品服务、特征工程服务、推荐服务和模型推理服务。
服务划分与职责
user-service :负责用户基础信息和画像数据维护,提供注册、登录、画像查询接口。画像包含性别、年龄、地域等静态属性及历史偏好标签等动态属性。
item-service :管理商品元数据,包括分类、品牌、价格、库存、标签等。同时维护商品特征向量,用于相似度计算。
recommendation-service :核心协调服务。接收请求,调用上下游服务获取上下文,组装特征向量,最终调用模型服务获取推荐结果。
feature-engine :特征工程服务,负责实时特征提取和处理,如用户实时行为、商品热度、上下文特征(时间、设备、地理位置)等。
model-serving :模型推理服务,通常使用 Python 实现,加载训练好的推荐模型并提供推理接口,通过 REST 或 gRPC 暴露。
event-collector :事件收集服务,负责收集曝光、点击、转化等行为数据,发送至消息队列,用于离线训练和在线学习。
基础设施组件
服务注册与发现 :使用 Nacos 或 Eureka,支持健康检查和故障剔除。
API 网关 :Spring Cloud Gateway 作为统一入口,负责路由转发、认证授权、限流熔断。
配置中心 :Nacos Config 集中管理配置文件,支持动态刷新。
消息队列 :Kafka 或 RocketMQ 处理异步事件流。
缓存层 :Redis 缓存热点推荐结果和特征数据。
系统调用关系
┌─────────────────────────────────────────────────────────────────────┐
│ API Gateway (Spring Cloud Gateway) │
│ 认证 | 限流 | 路由 | 熔断 │
└─────────────────────────────────────────────────────────────────────┘
│ ▼
┌─────────────────────────────────────────────────────────────────────┐
│ Recommendation Service (推荐服务) │
│ ┌─────────────────────────────────────────────┐ │
│ │ Controller → Service → Feature Assembler │ │
│ └─────────────────────────────────────────────┘ │
└─────┬───────────────┬───────────────┬───────────────┬───────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐ ┌─────────────────────────┐
│ User │ │ Item │ │ Feature │ │ Model Serving │
│ Service │ │ Service │ │ Engine │ │ (Python/PyTorch) │
│ │ │ │ │ │ │ • 协同过滤 │
│ • 用户信息│ │ • 商品信息│ │ • 实时行为│ │ • 矩阵分解 │
│ • 用户画像│ │ • 商品特征│ │ • 特征组装│ │ • NCF/DeepFM │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────────┬───────────────┘
│ │ │ │
└─────────────┴─────────────┴─────────────────┘
▼
┌───────────────────────────┐
│ Nacos (注册中心) │
│ • 服务注册与发现 │
│ • 配置中心 │
│ • 健康检查 │
└───────────────────────────┘
▼
┌───────────────────────────┐
│ Kafka (消息队列) │
│ • 曝光事件 │
│ • 点击事件 │
│ • 转化事件 │
└───────────────────────────┘
▼
┌───────────────────────────┐
│ Event Collector Service │
│ • 实时日志收集 │
│ • 在线学习反馈 │
└───────────────────────────┘
架构设计亮点
特征与模型解耦 :特征工程独立为微服务,模型推理独立为 Python 服务,两者通过标准化接口交互。这种设计使特征工程和模型开发可以并行迭代,互不影响。
水平扩展能力 :各服务可根据负载独立扩容,如大促期间可临时增加推荐服务和模型服务的实例数。
容错与降级 :通过 Resilience4j 实现服务熔断和降级,当模型服务不可用时,可降级到基于规则的推荐,确保核心业务不受影响。
灰度发布支持 :通过网关的流量路由能力,可实现不同算法版本的灰度测试。
AI 模型选型与训练 推荐算法的选择需综合考虑业务场景、数据规模、实时性要求和计算资源。对于电商和内容平台,常见的推荐算法包括基于协同过滤的传统方法、基于矩阵分解的隐因子模型,以及基于深度学习的神经网络模型。
算法选型分析
协同过滤(Collaborative Filtering) :分为基于用户和基于物品两种。实现简单、可解释性强,但存在冷启动和稀疏性问题。
矩阵分解(Matrix Factorization) :通过将用户 - 物品评分矩阵分解为低维矩阵,捕捉潜在因子关系。SVD 及其变体是代表算法,能更好地处理数据稀疏问题。
深度学习模型 :如 NCF、DeepFM、Wide&Deep 等能够捕捉高阶特征交互,效果通常优于传统方法。
模型训练与导出 在实际项目中,我们通常使用 Python 生态中的 PyTorch 进行模型训练。以下是一个使用 PyTorch 实现 NCF 模型的简化示例:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader
class NCFModel (nn.Module):
def __init__ (self, num_users, num_items, embedding_dim=32 ):
super (NCFModel, self ).__init__()
self .user_embedding = nn.Embedding(num_users, embedding_dim)
self .item_embedding = nn.Embedding(num_items, embedding_dim)
self .mlp = nn.Sequential(
nn.Linear(embedding_dim * 2 , 128 ),
nn.ReLU(),
nn.Dropout(0.2 ),
nn.Linear(128 , 64 ),
nn.ReLU(),
nn.Dropout(0.2 ),
nn.Linear(64 , 1 ),
nn.Sigmoid()
)
def forward (self, user_ids, item_ids ):
user_emb = self .user_embedding(user_ids)
item_emb = self .item_embedding(item_ids)
concat = torch.cat([user_emb, item_emb], dim=-1 )
return self .mlp(concat)
class RecommendationDataset (Dataset ):
def __init__ (self, user_ids, item_ids, labels ):
self .user_ids = torch.LongTensor(user_ids)
self .item_ids = torch.LongTensor(item_ids)
self .labels = torch.FloatTensor(labels)
def __len__ (self ):
return len (self .labels)
def __getitem__ (self, idx ):
return self .user_ids[idx], self .item_ids[idx], self .labels[idx]
def train_model (train_data, val_data, num_users, num_items, epochs=10 ):
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu' )
model = NCFModel(num_users, num_items).to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001 )
train_loader = DataLoader(train_data, batch_size=256 , shuffle=True )
for epoch in range (epochs):
model.train()
total_loss = 0
for user_ids, item_ids, labels in train_loader:
user_ids, item_ids, labels = user_ids.to(device), item_ids.to(device), labels.to(device)
optimizer.zero_grad()
predictions = model(user_ids, item_ids).squeeze()
loss = criterion(predictions, labels)
loss.backward()
optimizer.step()
total_loss += loss.item()
print (f'Epoch {epoch+1 } /{epochs} , Loss: {total_loss/len (train_loader):.4 f} ' )
return model
def export_to_onnx (model, output_path='ncf_model.onnx' ):
model.eval ()
dummy_user_ids = torch.LongTensor([0 ])
dummy_item_ids = torch.LongTensor([0 ])
torch.onnx.export(
model, (dummy_user_ids, dummy_item_ids), output_path,
input_names=['user_ids' , 'item_ids' ],
output_names=['prediction' ],
dynamic_axes={'user_ids' : {0 : 'batch_size' }, 'item_ids' : {0 : 'batch_size' }, 'prediction' : {0 : 'batch_size' }},
opset_version=14
)
print (f'Model exported to {output_path} ' )
if __name__ == '__main__' :
num_users = 10000
num_items = 50000
num_samples = 100000
user_ids = np.random.randint(0 , num_users, num_samples)
item_ids = np.random.randint(0 , num_items, num_samples)
labels = np.random.randint(0 , 2 , num_samples).astype(float )
split_idx = int (0.8 * num_samples)
train_data = RecommendationDataset(user_ids[:split_idx], item_ids[:split_idx], labels[:split_idx])
val_data = RecommendationDataset(user_ids[split_idx:], item_ids[split_idx:], labels[split_idx:])
model = train_model(train_data, val_data, num_users, num_items, epochs=5 )
export_to_onnx(model)
模型服务化 训练完成后,我们需要将模型部署为服务供 Java 应用调用。以下是使用 FastAPI 实现的模型推理服务:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import onnxruntime as ort
import numpy as np
from typing import List
app = FastAPI(title='Recommendation Model API' )
session = ort.InferenceSession('ncf_model.onnx' )
class PredictionRequest (BaseModel ):
user_ids: List [int ]
item_ids: List [int ]
class PredictionResponse (BaseModel ):
predictions: List [float ]
@app.post('/api/v1/predict' , response_model=PredictionResponse )
async def predict (request: PredictionRequest ):
try :
user_ids = np.array(request.user_ids, dtype=np.int64).reshape(-1 , 1 )
item_ids = np.array(request.item_ids, dtype=np.int64).reshape(-1 , 1 )
inputs = {'user_ids' : user_ids, 'item_ids' : item_ids}
predictions = session.run(None , inputs)[0 ]
return PredictionResponse(predictions=predictions.tolist())
except Exception as e:
raise HTTPException(status_code=500 , detail=str (e))
@app.get('/health' )
async def health_check ():
return {'status' : 'healthy' }
if __name__ == '__main__' :
import uvicorn
uvicorn.run(app, host='0.0.0.0' , port=8000 )
这个模型服务提供了 REST API,Java 应用可以通过 HTTP 调用。对于高性能场景,也可以考虑使用 gRPC 协议。
核心微服务实现 本节将详细介绍推荐系统中核心微服务的实现,包括服务间调用、特征组装、模型推理集成、异步日志收集和熔断降级等关键功能。所有代码基于 Spring Boot 3 和 Spring Cloud 2022。
项目结构与依赖
<project >
<groupId > com.example.recommendation</groupId >
<artifactId > recommendation-system</artifactId >
<version > 1.0.0</version >
<packaging > pom</packaging >
<modules >
<module > user-service</module >
<module > item-service</module >
<module > recommendation-service</module >
<module > feature-engine</module >
<module > common</module >
</modules >
<properties >
<java.version > 17</java.version >
<spring-boot.version > 3.2.0</spring-boot.version >
<spring-cloud.version > 2023.0.0</spring-cloud.version >
</properties >
<dependencyManagement >
<dependencies >
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-dependencies</artifactId >
<version > ${spring-boot.version}</version >
<type > pom</type >
<scope > import</scope >
</dependency >
<dependency >
<groupId > org.springframework.cloud</groupId >
<artifactId > spring-cloud-dependencies</artifactId >
<version > ${spring-cloud.version}</version >
<type > pom</type >
<scope > import</scope >
</dependency >
</dependencies >
</dependencyManagement >
</project >
User Service 实现
package com.example.user.controller;
import com.example.user.dto.UserProfileDTO;
import com.example.user.service.UserService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/users")
@RequiredArgsConstructor
public class UserController {
private final UserService userService;
@GetMapping("/{userId}/profile")
public UserProfileDTO getUserProfile (@PathVariable Long userId) {
return userService.getUserProfile(userId);
}
@PostMapping("/profiles/batch")
public Map<Long, UserProfileDTO> getUserProfilesBatch (@RequestBody List<Long> userIds) {
return userService.getUserProfilesBatch(userIds);
}
}
package com.example.user.service;
import com.example.user.dto.UserProfileDTO;
import com.example.user.entity.UserProfile;
import com.example.user.repository.UserProfileRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
public class UserService {
private final UserProfileRepository userProfileRepository;
@Cacheable(value = "userProfiles", key = "#userId")
public UserProfileDTO getUserProfile (Long userId) {
UserProfile profile = userProfileRepository.findById(userId).orElseThrow(() -> new RuntimeException ("User not found: " + userId));
return UserProfileDTO.builder()
.userId(profile.getUserId())
.gender(profile.getGender())
.age(profile.getAge())
.city(profile.getCity())
.membershipLevel(profile.getMembershipLevel())
.interestTags(profile.getInterestTags())
.build();
}
public Map<Long, UserProfileDTO> getUserProfilesBatch (List<Long> userIds) {
List<UserProfile> profiles = userProfileRepository.findAllById(userIds);
return profiles.stream().collect(Collectors.toMap(UserProfile::getUserId, profile -> UserProfileDTO.builder()
.userId(profile.getUserId())
.gender(profile.getGender())
.age(profile.getAge())
.city(profile.getCity())
.membershipLevel(profile.getMembershipLevel())
.interestTags(profile.getInterestTags())
.build()));
}
}
server:
port: 8081
spring:
application:
name: user-service
datasource:
url: jdbc:mysql://localhost:3306/user_db
username: root
password: password
data:
redis:
host: localhost
port: 6379
spring:
cloud:
nacos:
discovery:
server-addr: localhost:8848
namespace: public
config:
server-addr: localhost:8848
file-extension: yml
Recommendation Service 实现 推荐服务是整个系统的核心,负责协调各服务完成推荐流程。首先定义 Feign 客户端:
package com.example.recommendation.client;
import com.example.common.dto.UserProfileDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List;
import java.util.Map;
@FeignClient(name = "user-service", path = "/api/users", fallbackFactory = UserClientFallback.class)
public interface UserClient {
@GetMapping("/{userId}/profile")
UserProfileDTO getUserProfile (@PathVariable("userId") Long userId) ;
@PostMapping("/profiles/batch")
Map<Long, UserProfileDTO> getUserProfilesBatch (@RequestBody List<Long> userIds) ;
}
package com.example.recommendation.client;
import com.example.common.dto.UserProfileDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class UserClientFallback implements UserClient {
@Override
public UserProfileDTO getUserProfile (Long userId) {
log.warn("User service fallback triggered for userId: {}" , userId);
return UserProfileDTO.builder()
.userId(userId)
.gender("unknown" )
.age(25 )
.city("unknown" )
.membershipLevel("NORMAL" )
.interestTags(Collections.emptyList())
.build();
}
@Override
public Map<Long, UserProfileDTO> getUserProfilesBatch (List<Long> userIds) {
log.warn("User service batch fallback triggered" );
return Collections.emptyMap();
}
}
package com.example.recommendation.service;
import com.example.common.dto.*;
import com.example.recommendation.client.ItemClient;
import com.example.recommendation.client.UserClient;
import com.example.recommendation.client.ModelClient;
import com.example.recommendation.config.RecommendationProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@Service
@RequiredArgsConstructor
public class RecommendationService {
private final UserClient userClient;
private final ItemClient itemClient;
private final ModelClient modelClient;
private final FeatureEngineClient featureEngineClient;
private final RedisTemplate<String, Object> redisTemplate;
private final KafkaTemplate<String, Object> kafkaTemplate;
private final RecommendationProperties properties;
public RecommendationResult getRecommendations (Long userId, String scenario, int size) {
String cacheKey = String.format("recommendation:%d:%s" , userId, scenario);
List<RecommendedItem> cachedResult = (List<RecommendedItem>) redisTemplate.opsForValue().get(cacheKey);
if (cachedResult != null ) {
log.info("Cache hit for user: {}, scenario: {}" , userId, scenario);
return RecommendationResult.builder()
.userId(userId)
.scenario(scenario)
.items(cachedResult)
.source("CACHE" )
.build();
}
UserProfileDTO userProfile = userClient.getUserProfile(userId);
List<Long> candidateItemIds = getCandidateItems(userProfile, scenario, size * 10 );
List<ItemDTO> candidateItems = itemClient.getItemsBatch(candidateItemIds);
FeatureVector featureVector = buildFeatureVector(userId, userProfile, candidateItems);
Map<Long, Double> itemScores = modelClient.predict(userId, candidateItemIds, featureVector);
List<RecommendedItem> recommendedItems = candidateItems.stream()
.filter(item -> itemScores.containsKey(item.getItemId()))
.sorted((a, b) -> Double.compare(itemScores.get(b.getItemId()), itemScores.get(a.getItemId())))
.limit(size)
.map(item -> RecommendedItem.builder()
.itemId(item.getItemId())
.itemName(item.getItemName())
.category(item.getCategory())
.price(item.getPrice())
.score(itemScores.get(item.getItemId()))
.reason("基于您的兴趣推荐" )
.build())
.collect(Collectors.toList());
redisTemplate.opsForValue().set(cacheKey, recommendedItems, Duration.ofMinutes(properties.getCacheExpireMinutes()));
recordExposureEvent(userId, scenario, recommendedItems);
return RecommendationResult.builder()
.userId(userId)
.scenario(scenario)
.items(recommendedItems)
.source("MODEL" )
.build();
}
private List<Long> getCandidateItems (UserProfileDTO userProfile, String scenario, int poolSize) {
List<String> interests = userProfile.getInterestTags();
if (interests.isEmpty()) {
interests = Arrays.asList("热门" );
}
return itemClient.getItemsByCategory(interests.get(0 )).stream()
.limit(poolSize)
.map(ItemDTO::getItemId)
.collect(Collectors.toList());
}
private FeatureVector buildFeatureVector (Long userId, UserProfileDTO userProfile, List<ItemDTO> items) {
return FeatureVector.builder()
.userId(userId)
.userGender(userProfile.getGender())
.userAge(userProfile.getAge())
.userCity(userProfile.getCity())
.membershipLevel(userProfile.getMembershipLevel())
.interestTags(userProfile.getInterestTags())
.itemCategories(items.stream().map(ItemDTO::getCategory).distinct().collect(Collectors.toList()))
.hourOfDay(LocalDateTime.now().getHour())
.dayOfWeek(LocalDateTime.now().getDayOfWeek().getValue())
.build();
}
private void recordExposureEvent (Long userId, String scenario, List<RecommendedItem> items) {
List<Long> itemIds = items.stream().map(RecommendedItem::getItemId).collect(Collectors.toList());
ExposureEvent event = ExposureEvent.builder()
.userId(userId)
.scenario(scenario)
.itemIds(itemIds)
.timestamp(System.currentTimeMillis())
.build();
kafkaTemplate.send("recommendation-exposure" , event);
log.debug("Exposure event sent for user: {}" , userId);
}
public void recordClickEvent (Long userId, Long itemId, String scenario) {
ClickEvent event = ClickEvent.builder()
.userId(userId)
.itemId(itemId)
.scenario(scenario)
.timestamp(System.currentTimeMillis())
.build();
kafkaTemplate.send("recommendation-click" , event);
log.info("Click event recorded for user: {}, item: {}" , userId, itemId);
}
}
模型推理客户端(通过 WebClient 调用 Python 模型服务):
package com.example.recommendation.client;
import com.example.recommendation.dto.ModelPredictRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
@RequiredArgsConstructor
public class ModelClient {
private final WebClient.Builder webClientBuilder;
@Value("${model.service.url}")
private String modelServiceUrl;
@Value("${model.service.timeout}")
private int timeoutMs;
public Map<Long, Double> predict (Long userId, List<Long> itemIds, FeatureVector featureVector) {
WebClient webClient = webClientBuilder.baseUrl(modelServiceUrl).build();
List<Integer> userIds = Collections.nCopies(itemIds.size(), userId.intValue());
ModelPredictRequest request = ModelPredictRequest.builder()
.userIds(userIds)
.itemIds(itemIds.stream().map(Long::intValue).collect(Collectors.toList()))
.featureVector(featureVector)
.build();
Map<Long, Double> scores = new HashMap <>();
try {
ModelPredictResponse response = webClient.post()
.uri("/api/v1/predict" )
.bodyValue(request)
.retrieve()
.bodyToMono(ModelPredictResponse.class)
.timeout(Duration.ofMillis(timeoutMs))
.retryWhen(Retry.backoff(3 , Duration.ofMillis(100 )))
.block();
if (response != null && response.getPredictions() != null ) {
for (int i = 0 ; i < itemIds.size(); i++) {
scores.put(itemIds.get(i), response.getPredictions().get(i));
}
}
} catch (Exception e) {
log.error("Model prediction failed for userId: {}, error: {}" , userId, e.getMessage());
itemIds.forEach(id -> scores.put(id, 0.5 ));
}
return scores;
}
}
熔断与降级配置 使用 Resilience4j 实现服务熔断和降级:
package com.example.recommendation.config;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
@Configuration
public class ResilienceConfig {
@Bean
public CircuitBreakerRegistry circuitBreakerRegistry () {
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.slidingWindowSize(10 )
.minimumNumberOfCalls(5 )
.failureRateThreshold(50 )
.waitDurationInOpenState(Duration.ofSeconds(10 ))
.permittedNumberOfCallsInHalfOpenState(3 )
.slowCallDurationThreshold(Duration.ofSeconds(3 ))
.slowCallRateThreshold(50 )
.build();
return CircuitBreakerRegistry.of(config);
}
@Bean
public TimeLimiterConfig timeLimiterConfig () {
return TimeLimiterConfig.custom()
.timeoutDuration(Duration.ofSeconds(5 ))
.build();
}
}
package com.example.recommendation.fallback;
import com.example.common.dto.RecommendationResult;
import com.example.common.dto.RecommendedItem;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
@Slf4j
@Component
public class RecommendationFallback {
public RecommendationResult getHotItemsFallback (Long userId, String scenario) {
log.warn("Model service degraded, using hot items fallback for user: {}" , userId);
List<RecommendedItem> hotItems = getHotItems(scenario);
return RecommendationResult.builder()
.userId(userId)
.scenario(scenario)
.items(hotItems)
.source("FALLBACK_HOT_ITEMS" )
.build();
}
private List<RecommendedItem> getHotItems (String scenario) {
return Arrays.asList(
RecommendedItem.builder().itemId(1001L ).itemName("热门商品 1" ).category("电子" ).price(299.0 ).score(0.95 ).reason("热门推荐" ).build(),
RecommendedItem.builder().itemId(1002L ).itemName("热门商品 2" ).category("服装" ).price(199.0 ).score(0.92 ).reason("热门推荐" ).build()
);
}
}
异步日志收集 通过 Kafka 发送用户行为事件,用于在线学习和模型更新:
package com.example.recommendation.producer;
import com.example.common.dto.ExposureEvent;
import com.example.common.dto.ClickEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Slf4j
@Component
@RequiredArgsConstructor
public class EventProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void sendExposureEvent (ExposureEvent event) {
kafkaTemplate.send("recommendation-exposure" , event).addCallback(new ListenableFutureCallback <SendResult<String, Object>>() {
@Override
public void onSuccess (SendResult<String, Object> result) {
log.debug("Exposure event sent successfully: {}" , event);
}
@Override
public void onFailure (Throwable ex) {
log.error("Failed to send exposure event: {}" , event, ex);
}
});
}
public void sendClickEvent (ClickEvent event) {
kafkaTemplate.send("recommendation-click" , event).addCallback(new ListenableFutureCallback <SendResult<String, Object>>() {
@Override
public void onSuccess (SendResult<String, Object> result) {
log.info("Click event sent successfully: {}" , event);
}
@Override
public void onFailure (Throwable ex) {
log.error("Failed to send click event: {}" , event, ex);
}
});
}
}
部署与性能优化 完成了核心服务的开发后,接下来需要考虑如何将这些服务部署到生产环境,并进行性能优化以确保系统的高可用和低延迟。
Docker 容器化 首先为每个服务创建 Dockerfile。以下以 recommendation-service 为例:
# recommendation-service/Dockerfile
FROM eclipse-temurin:17-jre-alpine
WORKDIR /app
# 复制 JAR 文件
COPY target/recommendation-service-*.jar app.jar
# 设置 JVM 参数
ENV JAVA_OPTS="-Xms512m -Xmx1g -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
EXPOSE 8083
ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar app.jar"]
使用 Docker Compose 进行本地开发环境编排:
version: '3.8'
services:
nacos:
image: nacos/nacos-server:v2.2.3
ports:
- "8848:8848"
environment:
MODE: standalone
mysql:
image: mysql:8.0
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: password
MYSQL_DATABASE: recommendation_db
volumes:
- mysql-data:/var/lib/mysql
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
user-service:
build: ./user-service
ports:
- "8081:8081"
environment:
SPRING_CLOUD_NACOS_DISCOVERY_SERVER_ADDR: nacos:8848
SPRING_DATASOURCE_URL: jdbc:mysql://mysql:3306/user_db
SPRING_DATA_REDIS_HOST: redis
depends_on:
- nacos
- mysql
- redis
item-service:
build: ./item-service
ports:
- "8082:8082"
environment:
SPRING_CLOUD_NACOS_DISCOVERY_SERVER_ADDR: nacos:8848
SPRING_DATASOURCE_URL: jdbc:mysql://mysql:3306/item_db
depends_on:
- nacos
- mysql
recommendation-service:
build: ./recommendation-service
ports:
- "8083:8083"
environment:
SPRING_CLOUD_NACOS_DISCOVERY_SERVER_ADDR: nacos:8848
SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
MODEL_SERVICE_URL: http://model-service:8000
depends_on:
- nacos
- kafka
- user-service
- item-service
model-service:
build: ./model-service
ports:
- "8000:8000"
environment:
- MODEL_PATH=/app/models/ncf_model.onnx
volumes:
- ./models:/app/models
volumes:
mysql-data:
redis-data:
Kubernetes 编排 对于生产环境,使用 Kubernetes 进行容器编排。以下是 recommendation-service 的部署配置:
apiVersion: apps/v1
kind: Deployment
metadata:
name: recommendation-service
labels:
app: recommendation-service
spec:
replicas: 3
selector:
matchLabels:
app: recommendation-service
template:
metadata:
labels:
app: recommendation-service
spec:
containers:
- name: recommendation-service
image: your-registry/recommendation-service:1.0.0
ports:
- containerPort: 8083
env:
- name: SPRING_PROFILES_ACTIVE
value: "prod"
- name: SPRING_CLOUD_NACOS_DISCOVERY_SERVER_ADDR
value: "nacos-service:8848"
- name: SPRING_KAFKA_BOOTSTRAP_SERVERS
value: "kafka-service:9092"
- name: MODEL_SERVICE_URL
value: "http://model-service:8000"
- name: JAVA_OPTS
value: "-Xms1g -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /actuator/health
port: 8083
initialDelaySeconds: 60
periodSeconds: 10
readinessProbe:
httpGet:
path: /actuator/health
port: 8083
initialDelaySeconds: 30
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: recommendation-service
spec:
selector:
app: recommendation-service
ports:
- protocol: TCP
port: 8083
targetPort: 8083
type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: recommendation-service-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: recommendation-service
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
性能优化策略
1. 模型服务 GPU 加速 对于深度学习模型,使用 GPU 可以显著提升推理速度。以下是支持 GPU 的模型服务部署配置:
# model-service/Dockerfile
FROM nvidia/cuda:11.8.0-runtime-ubuntu22.04
RUN apt-get update && apt-get install -y \
python3.10 \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
COPY model_server.py .
COPY models/ ./models/
EXPOSE 8000
CMD ["python3", "model_server.py"]
resources:
limits:
nvidia.com/gpu: 1
2. 多级缓存策略
@Configuration
@EnableCaching
public class CacheConfiguration {
@Bean
public RedisCacheManager redisCacheManager (RedisConnectionFactory factory) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(30 ))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer ()))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer ()));
return RedisCacheManager.builder(factory)
.cacheDefaults(config)
.withInitialCacheConfigurations(getCacheConfigurations())
.build();
}
private Map<String, RedisCacheConfiguration> getCacheConfigurations () {
Map<String, RedisCacheConfiguration> configMap = new HashMap <>();
configMap.put("userProfiles" , RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofHours(1 )));
configMap.put("recommendations" , RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(30 )));
configMap.put("hotItems" , RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(10 )));
return configMap;
}
}
3. JVM 调优 针对推荐服务的内存和 CPU 特性进行 JVM 参数调优:
JAVA_OPTS="-Xms2g -Xmx4g -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:G1ReservePercent=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/logs/heapdump.hprof -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/logs/gc.log -Duser.timezone=Asia/Shanghai -Dfile.encoding=UTF-8"
4. 数据库优化
为高频查询字段添加索引
使用读写分离分担主库压力
对于用户画像等热点数据使用 Redis 缓存
CREATE INDEX idx_user_id ON user_profile(user_id);
CREATE INDEX idx_membership ON user_profile(membership_level);
CREATE INDEX idx_city ON user_profile(city);
CREATE INDEX idx_item_category ON item(category);
CREATE INDEX idx_item_brand ON item(brand);
CREATE INDEX idx_item_created ON item(created_at);
压测结果 使用 JMeter 进行压力测试,测试场景为并发获取推荐结果:
推荐服务:3 个实例,每实例 2C4G
模型服务:2 个实例,每实例 4C8G + GPU
Redis:1 主 2 从
Kafka:3 节点集群
并发数 QPS 平均响应时间 P95 响应时间 P99 响应时间 错误率 100 850 115ms 180ms 250ms 0% 500 3200 155ms 280ms 420ms 0% 1000 4800 205ms 450ms 680ms 0.1% 2000 5200 380ms 850ms 1200ms 2%
并发数 QPS 平均响应时间 P95 响应时间 P99 响应时间 错误率 100 1500 65ms 95ms 130ms 0% 500 5800 85ms 150ms 220ms 0% 1000 9500 105ms 200ms 320ms 0% 2000 12000 165ms 350ms 520ms 0.05%
通过缓存和 JVM 调优,系统性能提升了约 100-150%,P99 延迟降低了约 50%。
总结与展望 本文详细介绍了基于 Spring Cloud 微服务架构构建智能推荐系统的完整过程,从架构设计、模型选型、服务实现到部署优化,全方位展示了如何在企业级 Java 生态中集成 AI 能力。通过微服务架构,我们实现了特征工程与模型推理的解耦,使各组件可以独立开发、部署和扩展;通过服务熔断和降级策略,确保了系统的高可用性;通过多级缓存和 JVM 调优,显著提升了系统性能。
Spring Cloud 与 AI 的结合为传统 Java 应用注入了智能化能力,使开发者可以在熟悉的 Java 生态中轻松集成机器学习模型。这种架构的优势在于:一是技术栈的灵活性,Java 开发者专注于业务逻辑和数据流转,Python 算法工程师专注于模型开发和训练,各司其职;二是系统的可扩展性,各服务可根据负载独立扩容,应对流量洪峰;三是部署的便捷性,通过容器化和编排工具,可以实现快速部署和弹性伸缩。
实时推荐 :当前的系统主要基于离线计算的批量推荐,未来可以引入实时流处理技术(如 Flink),实现基于用户实时行为的毫秒级推荐更新。当用户浏览一个商品后,系统可以立即调整后续推荐内容,提升用户体验和转化率。
多模态特征融合 :随着多媒体内容的普及,推荐系统需要处理文本、图像、视频等多种模态的特征。可以引入视觉特征提取模型(如 CLIP、ResNet),实现'以图搜图'和跨模态推荐。例如,用户上传一张图片,系统可以推荐相似的商品。
联邦学习与隐私保护 :在数据隐私保护日益重要的背景下,联邦学习成为一种重要的技术方向。通过在用户设备本地进行模型训练,只上传模型参数而非原始数据,可以在保护用户隐私的同时实现个性化推荐。
AutoML 与模型自动迭代 :引入 AutoML 技术,实现模型的自动训练、评估和部署。当新算法出现时,系统可以自动进行 A/B 测试,选择最优模型上线,形成闭环的算法迭代体系。
大模型增强推荐 :利用大语言模型(LLM)的强大理解能力,实现更精准的用户意图识别和推荐理由生成。LLM 可以根据用户对话历史生成更符合用户偏好的推荐结果,同时提供可解释的推荐理由。
AI 技术的快速发展为推荐系统带来了新的机遇和挑战。作为 Java 开发者,我们需要保持开放的心态,积极学习和拥抱新技术,在保证系统稳定性和可维护性的前提下,将 AI 能力优雅地集成到现有架构中。希望本文能够为读者提供有价值的参考,助力大家在 AI 时代的技术升级之路。
相关免费在线工具 Keycode 信息 查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
Escape 与 Native 编解码 JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
JavaScript / HTML 格式化 使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
JavaScript 压缩与混淆 Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online