Building a Distributed Intelligent Recommendation System with Spring Cloud: A Hands-On Guide
Design and implementation of a distributed intelligent recommendation system on a Spring Cloud microservice architecture. Covers service decomposition, decoupling feature engineering from model inference, integrating a Python model service, and asynchronous invocation with circuit-breaker fallbacks on the Java side. Docker-based deployment and JVM tuning address performance bottlenecks under high concurrency, giving an end-to-end path from training to production.
Background and Challenges
In the digital era, recommendation systems have become a core competitive advantage for e-commerce platforms, content distribution platforms, and social networks. From Taobao's "Guess You Like" to Netflix's movie recommendations, a good recommender not only improves user retention and conversion but also delivers substantial business value. However, as business scale grows and algorithms become more complex, a traditional monolith gradually hits its limits:
Complex logic: user profiling, real-time behavior collection, feature engineering, and model inference are hard to manage inside a single application.
Concurrency pressure: under massive traffic, a single-machine deployment cannot scale elastically.
Slow iteration: AI models are updated frequently, and deploying a monolith usually means restarting the whole application, hurting availability.
Strategy isolation: A/B testing needs flexible traffic isolation and strategy switching, which is hard to achieve in a monolith.
Combining a Spring Cloud microservice architecture with distributed AI services has become the mainstream answer. This architecture decouples compute from storage and feature engineering from model inference, supports on-demand horizontal scaling, and absorbs traffic spikes gracefully. This article walks through building a highly available, scalable intelligent recommendation system from scratch on Spring Boot 3 and Spring Cloud 2023.
Overall Architecture Design
A complete recommendation system handles the full path from user request to returned results. Under a microservice architecture, we split the system into single-responsibility services that cooperate to produce recommendations.
Service Decomposition and Responsibilities
user-service: maintains basic user information and profile data, exposing registration, login, and profile-query APIs. Profiles include static attributes such as gender and age, plus dynamic attributes such as historical preference tags.
item-service: manages item/content metadata such as category, brand, price, and stock, and maintains item feature vectors for similarity computation.
recommendation-service: the core coordinator. It receives requests, calls upstream and downstream services to gather context, assembles feature vectors, and finally calls the model service for scores.
feature-engine: the feature-engineering service, responsible for real-time feature extraction and processing, e.g. live user behavior, item popularity, and contextual features.
model-serving: the model-inference service, usually implemented in Python on top of PyTorch/TensorFlow and exposed over REST or gRPC.
event-collector: collects exposure, click, and conversion events and publishes them to a message queue for offline training and online learning.
Infrastructure Components
Service registry and discovery: Nacos or Eureka, with health checks and automatic eviction of failed instances.
API gateway: Spring Cloud Gateway as the single entry point, handling routing, authentication, rate limiting, and circuit breaking.
Configuration center: Nacos Config or Spring Cloud Config, for centralized configuration with dynamic refresh.
Message queue: Kafka or RocketMQ for asynchronous event streams.
Cache layer: Redis, caching hot recommendation results and feature data.
System Call Relationships
┌─────────────────────────────────────────────────────────────────────┐
│                 API Gateway (Spring Cloud Gateway)                  │
│          Auth | Rate limiting | Routing | Circuit breaking          │
└──────────────────────────────────┬──────────────────────────────────┘
                                   ▼
┌─────────────────────────────────────────────────────────────────────┐
│                       Recommendation Service                        │
│        ┌─────────────────────────────────────────────┐              │
│        │  Controller → Service → Feature Assembler   │              │
│        └─────────────────────────────────────────────┘              │
└─────┬───────────────┬───────────────┬───────────────┬───────────────┘
      │               │               │               │
      ▼               ▼               ▼               ▼
┌───────────┐   ┌───────────┐   ┌───────────┐   ┌─────────────────────┐
│  User     │   │  Item     │   │  Feature  │   │   Model Serving     │
│  Service  │   │  Service  │   │  Engine   │   │  (Python/PyTorch)   │
│           │   │           │   │           │   │                     │
│ • profile │   │ • metadata│   │ • realtime│   │ • NCF/DeepFM        │
│ • tags    │   │ • features│   │   behavior│   │ • deep learning     │
└─────┬─────┘   └─────┬─────┘   └─────┬─────┘   └──────────┬──────────┘
      │               │               │                    │
      └───────┬───────┴───────┬───────┴────────────────────┘
              ▼               ▼
┌───────────────────────────────────────────────────┐
│                 Nacos (registry)                  │
│ • service discovery  • config center  • health    │
└───────────────────────────────────────────────────┘
                      ▼
┌───────────────────────────────────────────────────┐
│               Kafka (message queue)               │
│ • exposure events  • click events  • conversions  │
└───────────────────────────────────────────────────┘
                      ▼
┌───────────────────────────────────────────────────┐
│              Event Collector Service              │
│ • real-time log collection  • online feedback     │
└───────────────────────────────────────────────────┘
Architecture Highlights
Features decoupled from models: feature engineering and model inference are separate microservices that iterate in parallel without interfering.
Horizontal scalability: each service scales independently with load, e.g. adding recommendation and model-serving instances during big promotions.
Fault tolerance and degradation: Resilience4j provides circuit breaking; when the model service is unavailable, the system falls back to rule-based recommendations.
Canary-release support: traffic routing at the gateway enables canary testing of different algorithm versions.
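The circuit-breaking behavior above follows a closed → open → half-open cycle. A minimal Python sketch of a count-based sliding-window policy, mirroring the Resilience4j-style settings used later in this article (window 10, minimum 5 calls, 50% failure threshold, 10 s open state); the class and its API are illustrative, not Resilience4j's:

```python
import time
from collections import deque

class CircuitBreaker:
    """Toy count-based circuit breaker: opens when the failure rate over the
    last `window` calls crosses `failure_rate`, and re-closes after
    `open_seconds` (half-open simplified to closed)."""
    def __init__(self, window=10, min_calls=5, failure_rate=0.5, open_seconds=10.0):
        self.results = deque(maxlen=window)   # True = failed call
        self.min_calls = min_calls
        self.failure_rate = failure_rate
        self.open_seconds = open_seconds
        self.opened_at = None                 # None means CLOSED

    def allow(self, now=None):
        """May a call proceed? Rejects while the breaker is open."""
        if self.opened_at is None:
            return True
        now = time.monotonic() if now is None else now
        if now - self.opened_at >= self.open_seconds:
            self.opened_at = None
            self.results.clear()
            return True
        return False

    def record(self, failed, now=None):
        """Record a call outcome; trip the breaker if the threshold is crossed."""
        self.results.append(failed)
        if (len(self.results) >= self.min_calls
                and sum(self.results) / len(self.results) >= self.failure_rate):
            self.opened_at = time.monotonic() if now is None else now
```

When the breaker is open, the caller would serve the rule-based fallback instead of invoking the model service.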
AI Model Selection and Training
Choosing a recommendation algorithm means weighing the business scenario, data volume, and latency requirements. Common options include collaborative filtering, matrix factorization, and deep learning models.
Algorithm Selection Analysis
Collaborative filtering: the classic approach, in user-based and item-based variants. Simple to implement and easy to interpret, but suffers from cold start and data sparsity.
Matrix factorization: decomposes the rating matrix into low-dimensional latent factors that capture hidden relationships. SVD++ is a representative algorithm and handles sparsity better.
Deep learning models: e.g. NCF, DeepFM, Wide&Deep. They capture high-order feature interactions and usually outperform traditional methods.
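Before reaching for deep models, the collaborative filtering described above fits in a few lines. A toy user-based CF over sparse rating dicts (cosine similarity, similarity-weighted rating average; the data shapes and function names are illustrative):

```python
from math import sqrt

def cosine(a, b):
    """Cosine similarity between two sparse rating dicts {item_id: rating}."""
    common = set(a) & set(b)
    num = sum(a[i] * b[i] for i in common)
    den = sqrt(sum(v * v for v in a.values())) * sqrt(sum(v * v for v in b.values()))
    return num / den if den else 0.0

def recommend(user_ratings, target, k=2):
    """Score items the target has not seen by the similarity-weighted
    ratings of other users, then return the top-k item ids."""
    target_r = user_ratings[target]
    scores, weights = {}, {}
    for other, ratings in user_ratings.items():
        if other == target:
            continue
        sim = cosine(target_r, ratings)
        if sim <= 0:
            continue
        for item, r in ratings.items():
            if item in target_r:
                continue  # only recommend unseen items
            scores[item] = scores.get(item, 0.0) + sim * r
            weights[item] = weights.get(item, 0.0) + sim
    ranked = sorted(((s / weights[i], i) for i, s in scores.items()), reverse=True)
    return [i for _, i in ranked[:k]]
```

The cold-start weakness is visible here: a user with no ratings has no neighbors and gets nothing, which is exactly why production systems back CF with popularity fallbacks.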
Model Training and Export
In practice, training is usually done with PyTorch from the Python ecosystem. Below is a simplified NCF implementation in PyTorch:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader

class NCFModel(nn.Module):
    def __init__(self, num_users, num_items, embedding_dim=32):
        super().__init__()
        self.user_embedding = nn.Embedding(num_users, embedding_dim)
        self.item_embedding = nn.Embedding(num_items, embedding_dim)
        self.mlp = nn.Sequential(
            nn.Linear(embedding_dim * 2, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )

    def forward(self, user_ids, item_ids):
        user_emb = self.user_embedding(user_ids)
        item_emb = self.item_embedding(item_ids)
        concat = torch.cat([user_emb, item_emb], dim=-1)
        return self.mlp(concat)

class RecommendationDataset(Dataset):
    def __init__(self, user_ids, item_ids, labels):
        self.user_ids = torch.LongTensor(user_ids)
        self.item_ids = torch.LongTensor(item_ids)
        self.labels = torch.FloatTensor(labels)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.user_ids[idx], self.item_ids[idx], self.labels[idx]

def train_model(train_data, val_data, num_users, num_items, epochs=10):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = NCFModel(num_users, num_items).to(device)
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    train_loader = DataLoader(train_data, batch_size=256, shuffle=True)
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for user_ids, item_ids, labels in train_loader:
            user_ids, item_ids, labels = user_ids.to(device), item_ids.to(device), labels.to(device)
            optimizer.zero_grad()
            predictions = model(user_ids, item_ids).squeeze()
            loss = criterion(predictions, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f'Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.4f}')
    return model

def export_to_onnx(model, output_path='ncf_model.onnx'):
    model.eval()
    model = model.cpu()  # move off GPU so the dummy inputs below match the model's device
    dummy_user_ids = torch.LongTensor([0])
    dummy_item_ids = torch.LongTensor([0])
    torch.onnx.export(
        model, (dummy_user_ids, dummy_item_ids), output_path,
        input_names=['user_ids', 'item_ids'],
        output_names=['prediction'],
        dynamic_axes={'user_ids': {0: 'batch_size'}, 'item_ids': {0: 'batch_size'}, 'prediction': {0: 'batch_size'}},
        opset_version=14
    )
    print(f'Model exported to {output_path}')

if __name__ == '__main__':
    num_users = 10000
    num_items = 50000
    num_samples = 100000
    user_ids = np.random.randint(0, num_users, num_samples)
    item_ids = np.random.randint(0, num_items, num_samples)
    labels = np.random.randint(0, 2, num_samples).astype(float)
    split_idx = int(0.8 * num_samples)
    train_data = RecommendationDataset(user_ids[:split_idx], item_ids[:split_idx], labels[:split_idx])
    val_data = RecommendationDataset(user_ids[split_idx:], item_ids[split_idx:], labels[split_idx:])
    model = train_model(train_data, val_data, num_users, num_items, epochs=5)
    export_to_onnx(model)
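One gap worth noting: train_model receives val_data but never evaluates on it. In practice you would score the validation set each epoch and track a ranking metric such as AUC. A framework-free sketch of AUC as a rank statistic (the probability that a randomly chosen positive outscores a randomly chosen negative, ties counting half):

```python
def auc(labels, scores):
    """AUC via the rank statistic P(score_pos > score_neg); ties count 0.5.
    O(P*N) pairwise form, fine for a validation-set sketch."""
    pos = [s for l, s in zip(labels, scores) if l == 1]
    neg = [s for l, s in zip(labels, scores) if l == 0]
    if not pos or not neg:
        return 0.5  # undefined with one class; return the chance level
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))
```

On the random labels of the demo above, AUC should hover near 0.5; on real interaction data a useful NCF model typically lands well above it.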
Serving the Model
Once training completes, the model is deployed as a service for the Java side to call. Below is an inference service implemented with FastAPI:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import onnxruntime as ort
import numpy as np
from typing import List

app = FastAPI(title='Recommendation Model API')
session = ort.InferenceSession('ncf_model.onnx')

class PredictionRequest(BaseModel):
    user_ids: List[int]
    item_ids: List[int]

class PredictionResponse(BaseModel):
    predictions: List[float]

@app.post('/api/v1/predict', response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        # The ONNX graph was exported with 1-D inputs of shape (batch,),
        # so feed flat int64 vectors rather than column vectors
        user_ids = np.array(request.user_ids, dtype=np.int64)
        item_ids = np.array(request.item_ids, dtype=np.int64)
        inputs = {'user_ids': user_ids, 'item_ids': item_ids}
        predictions = session.run(None, inputs)[0]
        # Output has shape (batch, 1); flatten to a list of floats
        return PredictionResponse(predictions=predictions.flatten().tolist())
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get('/health')
async def health_check():
    return {'status': 'healthy'}

if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host='0.0.0.0', port=8000)
The model service exposes a REST API that Java applications can call over HTTP. For latency-sensitive scenarios, gRPC is also worth considering.
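For a quick smoke test of the contract above, the request can be assembled the same way the Java client later does: one user id repeated against N candidate items, with predictions zipped back onto the items. A small stdlib-only Python sketch (the URL and timeout are assumptions about the local deployment):

```python
import json
from urllib.request import Request, urlopen

MODEL_URL = 'http://localhost:8000/api/v1/predict'  # assumed local address

def build_payload(user_id, item_ids):
    """One user scored against N candidates: the user id is repeated per item,
    matching the parallel-list contract of the FastAPI service above."""
    return {'user_ids': [user_id] * len(item_ids), 'item_ids': list(item_ids)}

def score_items(user_id, item_ids, timeout=3.0):
    """POST to the model service and zip predictions back onto item ids."""
    req = Request(MODEL_URL,
                  data=json.dumps(build_payload(user_id, item_ids)).encode(),
                  headers={'Content-Type': 'application/json'})
    with urlopen(req, timeout=timeout) as resp:
        preds = json.load(resp)['predictions']
    return dict(zip(item_ids, preds))
```

Example: `score_items(42, [1001, 1002, 1003])` returns a dict mapping each candidate item id to its predicted score, ready for sorting.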
Core Microservice Implementation
This section walks through the core microservices: inter-service calls, feature assembly, model-inference integration, asynchronous event collection, and circuit breaking with degradation. All code targets Spring Boot 3 and Spring Cloud 2023.
Project Structure and Dependencies
<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.recommendation</groupId>
    <artifactId>recommendation-system</artifactId>
    <version>1.0.0</version>
    <packaging>pom</packaging>
    <modules>
        <module>user-service</module>
        <module>item-service</module>
        <module>recommendation-service</module>
        <module>feature-engine</module>
        <module>common</module>
    </modules>
    <properties>
        <java.version>17</java.version>
        <spring-boot.version>3.2.0</spring-boot.version>
        <spring-cloud.version>2023.0.0</spring-cloud.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>
User Service Implementation
package com.example.user.controller;

import com.example.user.dto.UserProfileDTO;
import com.example.user.service.UserService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/api/users")
@RequiredArgsConstructor
public class UserController {
    private final UserService userService;

    @GetMapping("/{userId}/profile")
    public UserProfileDTO getUserProfile(@PathVariable Long userId) {
        return userService.getUserProfile(userId);
    }

    @PostMapping("/profiles/batch")
    public Map<Long, UserProfileDTO> getUserProfilesBatch(@RequestBody List<Long> userIds) {
        return userService.getUserProfilesBatch(userIds);
    }
}
package com.example.user.service;

import com.example.user.dto.UserProfileDTO;
import com.example.user.entity.UserProfile;
import com.example.user.repository.UserProfileRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Service
@RequiredArgsConstructor
public class UserService {
    private final UserProfileRepository userProfileRepository;

    @Cacheable(value = "userProfiles", key = "#userId")
    public UserProfileDTO getUserProfile(Long userId) {
        UserProfile profile = userProfileRepository.findById(userId)
                .orElseThrow(() -> new RuntimeException("User not found: " + userId));
        return toDTO(profile);
    }

    public Map<Long, UserProfileDTO> getUserProfilesBatch(List<Long> userIds) {
        return userProfileRepository.findAllById(userIds).stream()
                .collect(Collectors.toMap(UserProfile::getUserId, this::toDTO));
    }

    private UserProfileDTO toDTO(UserProfile profile) {
        return UserProfileDTO.builder()
                .userId(profile.getUserId())
                .gender(profile.getGender())
                .age(profile.getAge())
                .city(profile.getCity())
                .membershipLevel(profile.getMembershipLevel())
                .interestTags(profile.getInterestTags())
                .build();
    }
}
server:
  port: 8081
spring:
  application:
    name: user-service
  datasource:
    url: jdbc:mysql://localhost:3306/user_db
    username: root
    password: password
  data:
    redis:
      host: localhost
      port: 6379
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
        namespace: public
      config:
        server-addr: localhost:8848
        file-extension: yml
Recommendation Service Implementation
The recommendation service is the heart of the system, coordinating the other services to produce recommendations. First, define the Feign clients:
package com.example.recommendation.client;

import com.example.common.dto.UserProfileDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

import java.util.List;
import java.util.Map;

// UserClientFallback implements this interface directly,
// so it is registered via `fallback`, not `fallbackFactory`
@FeignClient(name = "user-service", path = "/api/users", fallback = UserClientFallback.class)
public interface UserClient {
    @GetMapping("/{userId}/profile")
    UserProfileDTO getUserProfile(@PathVariable("userId") Long userId);

    @PostMapping("/profiles/batch")
    Map<Long, UserProfileDTO> getUserProfilesBatch(@RequestBody List<Long> userIds);
}
package com.example.recommendation.client;

import com.example.common.dto.ItemDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

import java.util.List;

@FeignClient(name = "item-service", path = "/api/items", fallback = ItemClientFallback.class)
public interface ItemClient {
    @GetMapping("/{itemId}")
    ItemDTO getItem(@PathVariable("itemId") Long itemId);

    @PostMapping("/batch")
    List<ItemDTO> getItemsBatch(@RequestBody List<Long> itemIds);

    @GetMapping("/category/{category}")
    List<ItemDTO> getItemsByCategory(@PathVariable("category") String category);
}
package com.example.recommendation.client;

import com.example.common.dto.UserProfileDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
public class UserClientFallback implements UserClient {

    @Override
    public UserProfileDTO getUserProfile(Long userId) {
        log.warn("User service fallback triggered for userId: {}", userId);
        return UserProfileDTO.builder()
                .userId(userId)
                .gender("unknown")
                .age(25)
                .city("unknown")
                .membershipLevel("NORMAL")
                .interestTags(Collections.emptyList())
                .build();
    }

    @Override
    public Map<Long, UserProfileDTO> getUserProfilesBatch(List<Long> userIds) {
        log.warn("User service batch fallback triggered");
        return Collections.emptyMap();
    }
}
package com.example.recommendation.service;

import com.example.common.dto.*;
import com.example.recommendation.client.FeatureEngineClient;
import com.example.recommendation.client.ItemClient;
import com.example.recommendation.client.ModelClient;
import com.example.recommendation.client.UserClient;
import com.example.recommendation.config.RecommendationProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@Service
@RequiredArgsConstructor
public class RecommendationService {
private final UserClient userClient;
private final ItemClient itemClient;
private final ModelClient modelClient;
private final FeatureEngineClient featureEngineClient;
private final RedisTemplate<String, Object> redisTemplate;
private final KafkaTemplate<String, Object> kafkaTemplate;
private final RecommendationProperties properties;
    public RecommendationResult getRecommendations(Long userId, String scenario, int size) {
        String cacheKey = String.format("recommendation:%d:%s", userId, scenario);
        @SuppressWarnings("unchecked")
        List<RecommendedItem> cachedResult = (List<RecommendedItem>) redisTemplate.opsForValue().get(cacheKey);
        if (cachedResult != null) {
            log.info("Cache hit for user: {}, scenario: {}", userId, scenario);
            return RecommendationResult.builder()
                    .userId(userId)
                    .scenario(scenario)
                    .items(cachedResult)
                    .source("CACHE")
                    .build();
        }
UserProfileDTO userProfile = userClient.getUserProfile(userId);
List<Long> candidateItemIds = getCandidateItems(userProfile, scenario, size * 10 );
List<ItemDTO> candidateItems = itemClient.getItemsBatch(candidateItemIds);
FeatureVector featureVector = buildFeatureVector(userId, userProfile, candidateItems);
Map<Long, Double> itemScores = modelClient.predict(userId, candidateItemIds, featureVector);
        List<RecommendedItem> recommendedItems = candidateItems.stream()
                .filter(item -> itemScores.containsKey(item.getItemId()))
                .sorted((a, b) -> Double.compare(itemScores.get(b.getItemId()), itemScores.get(a.getItemId())))
                .limit(size)
                .map(item -> RecommendedItem.builder()
                        .itemId(item.getItemId())
                        .itemName(item.getItemName())
                        .category(item.getCategory())
                        .price(item.getPrice())
                        .score(itemScores.get(item.getItemId()))
                        .reason("Recommended based on your interests")
                        .build())
                .collect(Collectors.toList());
redisTemplate.opsForValue().set(cacheKey, recommendedItems, Duration.ofMinutes(properties.getCacheExpireMinutes()));
recordExposureEvent(userId, scenario, recommendedItems);
return RecommendationResult.builder()
.userId(userId)
.scenario(scenario)
.items(recommendedItems)
.source("MODEL" )
.build();
}
    private List<Long> getCandidateItems(UserProfileDTO userProfile, String scenario, int poolSize) {
        List<String> interests = userProfile.getInterestTags();
        if (interests.isEmpty()) {
            interests = List.of("popular");  // default to a hot category when no tags exist
        }
        return itemClient.getItemsByCategory(interests.get(0)).stream()
                .limit(poolSize)
                .map(ItemDTO::getItemId)
                .collect(Collectors.toList());
    }
private FeatureVector buildFeatureVector (Long userId, UserProfileDTO userProfile, List<ItemDTO> items) {
return FeatureVector.builder()
.userId(userId)
.userGender(userProfile.getGender())
.userAge(userProfile.getAge())
.userCity(userProfile.getCity())
.membershipLevel(userProfile.getMembershipLevel())
.interestTags(userProfile.getInterestTags())
.itemCategories(items.stream().map(ItemDTO::getCategory).distinct().collect(Collectors.toList()))
.hourOfDay(LocalDateTime.now().getHour())
.dayOfWeek(LocalDateTime.now().getDayOfWeek().getValue())
.build();
}
    private void recordExposureEvent(Long userId, String scenario, List<RecommendedItem> items) {
        List<Long> itemIds = items.stream().map(RecommendedItem::getItemId).collect(Collectors.toList());
        ExposureEvent event = ExposureEvent.builder()
                .userId(userId)
                .scenario(scenario)
                .itemIds(itemIds)
                .timestamp(System.currentTimeMillis())
                .build();
        kafkaTemplate.send("recommendation-exposure", event);
        log.debug("Exposure event sent for user: {}", userId);
    }

    public void recordClickEvent(Long userId, Long itemId, String scenario) {
        ClickEvent event = ClickEvent.builder()
                .userId(userId)
                .itemId(itemId)
                .scenario(scenario)
                .timestamp(System.currentTimeMillis())
                .build();
        kafkaTemplate.send("recommendation-click", event);
        log.info("Click event recorded for user: {}, item: {}", userId, itemId);
    }
}
The model-inference client, which calls the Python model service via WebClient:
package com.example.recommendation.client;

import com.example.common.dto.FeatureVector;
import com.example.recommendation.dto.ModelPredictRequest;
import com.example.recommendation.dto.ModelPredictResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Slf4j
@Component
@RequiredArgsConstructor
public class ModelClient {
    private final WebClient.Builder webClientBuilder;

    @Value("${model.service.url}")
    private String modelServiceUrl;

    @Value("${model.service.timeout}")
    private int timeoutMs;

    public Map<Long, Double> predict(Long userId, List<Long> itemIds, FeatureVector featureVector) {
        WebClient webClient = webClientBuilder.baseUrl(modelServiceUrl).build();
        // The model scores one (user, item) pair per row, so repeat the user id per candidate
        List<Integer> userIds = Collections.nCopies(itemIds.size(), userId.intValue());
        ModelPredictRequest request = ModelPredictRequest.builder()
                .userIds(userIds)
                .itemIds(itemIds.stream().map(Long::intValue).collect(Collectors.toList()))
                .featureVector(featureVector)
                .build();
        Map<Long, Double> scores = new HashMap<>();
        try {
            ModelPredictResponse response = webClient.post()
                    .uri("/api/v1/predict")
                    .bodyValue(request)
                    .retrieve()
                    .bodyToMono(ModelPredictResponse.class)
                    .timeout(Duration.ofMillis(timeoutMs))
                    .retryWhen(Retry.backoff(3, Duration.ofMillis(100)))
                    .block();
            if (response != null && response.getPredictions() != null) {
                for (int i = 0; i < itemIds.size(); i++) {
                    scores.put(itemIds.get(i), response.getPredictions().get(i));
                }
            }
        } catch (Exception e) {
            log.error("Model prediction failed for userId: {}, error: {}", userId, e.getMessage());
            itemIds.forEach(id -> scores.put(id, 0.5));  // neutral score as a degraded default
        }
        return scores;
    }
}
server:
  port: 8083
spring:
  application:
    name: recommendation-service
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848
    openfeign:
      client:
        config:
          default:
            connectTimeout: 2000
            readTimeout: 5000
            loggerLevel: basic
      circuitbreaker:
        enabled: true
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# Resilience4j properties live at the top level, not under spring.cloud
resilience4j:
  circuitbreaker:
    configs:
      default:
        slidingWindowSize: 10
        minimumNumberOfCalls: 5
        failureRateThreshold: 50
        waitDurationInOpenState: 10s
        permittedNumberOfCallsInHalfOpenState: 3
model:
  service:
    url: http://localhost:8000
    timeout: 3000
recommendation:
  cache:
    expire-minutes: 30
  candidate:
    pool-size: 500
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  # Spring Boot 3 property path for the Prometheus registry
  prometheus:
    metrics:
      export:
        enabled: true
Circuit Breaking and Degradation
Resilience4j provides circuit breaking and fallback behavior:
package com.example.recommendation.config;

import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
public class ResilienceConfig {

    @Bean
    public CircuitBreakerRegistry circuitBreakerRegistry() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
                .slidingWindowSize(10)
                .minimumNumberOfCalls(5)
                .failureRateThreshold(50)
                .waitDurationInOpenState(Duration.ofSeconds(10))
                .permittedNumberOfCallsInHalfOpenState(3)
                .slowCallDurationThreshold(Duration.ofSeconds(3))
                .slowCallRateThreshold(50)
                .build();
        return CircuitBreakerRegistry.of(config);
    }

    @Bean
    public TimeLimiterConfig timeLimiterConfig() {
        return TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(5)).build();
    }
}
package com.example.recommendation.fallback;

import com.example.common.dto.RecommendationResult;
import com.example.common.dto.RecommendedItem;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;

@Slf4j
@Component
public class RecommendationFallback {

    public RecommendationResult getHotItemsFallback(Long userId, String scenario) {
        log.warn("Model service degraded, using hot items fallback for user: {}", userId);
        List<RecommendedItem> hotItems = getHotItems(scenario);
        return RecommendationResult.builder()
                .userId(userId)
                .scenario(scenario)
                .items(hotItems)
                .source("FALLBACK_HOT_ITEMS")
                .build();
    }

    private List<RecommendedItem> getHotItems(String scenario) {
        return Arrays.asList(
                RecommendedItem.builder()
                        .itemId(1001L)
                        .itemName("Hot item 1")
                        .category("Electronics")
                        .price(299.0)
                        .score(0.95)
                        .reason("Popular pick")
                        .build(),
                RecommendedItem.builder()
                        .itemId(1002L)
                        .itemName("Hot item 2")
                        .category("Apparel")
                        .price(199.0)
                        .score(0.92)
                        .reason("Popular pick")
                        .build()
        );
    }
}
Asynchronous Event Collection
User behavior events are published through Kafka for online learning and model updates:
package com.example.recommendation.producer;

import com.example.common.dto.ExposureEvent;
import com.example.common.dto.ClickEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class EventProducer {
    private final KafkaTemplate<String, Object> kafkaTemplate;

    // Spring Kafka 3.x (Spring Boot 3) returns CompletableFuture from send(),
    // replacing the removed ListenableFuture/ListenableFutureCallback API
    public void sendExposureEvent(ExposureEvent event) {
        kafkaTemplate.send("recommendation-exposure", event).whenComplete((result, ex) -> {
            if (ex == null) {
                log.debug("Exposure event sent successfully: {}", event);
            } else {
                log.error("Failed to send exposure event: {}", event, ex);
            }
        });
    }

    public void sendClickEvent(ClickEvent event) {
        kafkaTemplate.send("recommendation-click", event).whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("Click event sent successfully: {}", event);
            } else {
                log.error("Failed to send click event: {}", event, ex);
            }
        });
    }
}
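Downstream of these producers, a consumer on the exposure and click topics can fold the stream into per-item click-through rates for online feedback. A minimal sketch of that aggregation, with the Kafka consumption itself elided; the event dicts mirror the payloads above, and the 'type' discriminator is an assumption about how the collector tags records:

```python
from collections import defaultdict

def aggregate_ctr(events):
    """Fold a mixed stream of exposure/click events into per-item CTR.
    An exposure event carries a list of item ids (one recommendation page);
    a click event carries a single item id."""
    exposures = defaultdict(int)
    clicks = defaultdict(int)
    for e in events:
        if e['type'] == 'exposure':
            for item_id in e['itemIds']:
                exposures[item_id] += 1
        elif e['type'] == 'click':
            clicks[e['itemId']] += 1
    # CTR only for items that were actually exposed
    return {i: clicks[i] / exposures[i] for i in exposures}
```

In a real pipeline these aggregates would be windowed (e.g. per hour) and fed back as features or training labels rather than computed over the whole stream.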
Deployment and Performance Optimization
With the core services implemented, the next step is deploying them to production and tuning for high availability and low latency.
Docker Containerization
First, create a Dockerfile for each service. Using recommendation-service as an example:
# recommendation-service/Dockerfile
FROM eclipse-temurin:17-jre-alpine
WORKDIR /app
# Copy the application JAR
COPY target/recommendation-service-*.jar app.jar
# Default JVM options (override via the environment at run time)
ENV JAVA_OPTS="-Xms512m -Xmx1g -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
EXPOSE 8083
ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar app.jar"]
Use Docker Compose to orchestrate the local development environment:
version: '3.8'
services:
  nacos:
    image: nacos/nacos-server:v2.2.3
    ports:
      - "8848:8848"
    environment:
      MODE: standalone
  mysql:
    image: mysql:8.0
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_DATABASE: recommendation_db
    volumes:
      - mysql-data:/var/lib/mysql
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertise the in-network hostname so other containers can reach the broker
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
  user-service:
    build: ./user-service
    ports:
      - "8081:8081"
    environment:
      SPRING_CLOUD_NACOS_DISCOVERY_SERVER_ADDR: nacos:8848
      SPRING_DATASOURCE_URL: jdbc:mysql://mysql:3306/user_db
      SPRING_DATA_REDIS_HOST: redis
    depends_on:
      - nacos
      - mysql
      - redis
  item-service:
    build: ./item-service
    ports:
      - "8082:8082"
    environment:
      SPRING_CLOUD_NACOS_DISCOVERY_SERVER_ADDR: nacos:8848
      SPRING_DATASOURCE_URL: jdbc:mysql://mysql:3306/item_db
    depends_on:
      - nacos
      - mysql
  recommendation-service:
    build: ./recommendation-service
    ports:
      - "8083:8083"
    environment:
      SPRING_CLOUD_NACOS_DISCOVERY_SERVER_ADDR: nacos:8848
      SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      MODEL_SERVICE_URL: http://model-service:8000
    depends_on:
      - nacos
      - kafka
      - user-service
      - item-service
      - model-service
  model-service:
    build: ./model-service
    ports:
      - "8000:8000"
    environment:
      - MODEL_PATH=/app/models/ncf_model.onnx
    volumes:
      - ./models:/app/models
volumes:
  mysql-data:
  redis-data:
Kubernetes Orchestration
For production, use Kubernetes for container orchestration. Below is the deployment configuration for recommendation-service:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: recommendation-service
  labels:
    app: recommendation-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: recommendation-service
  template:
    metadata:
      labels:
        app: recommendation-service
    spec:
      containers:
        - name: recommendation-service
          image: your-registry/recommendation-service:1.0.0
          ports:
            - containerPort: 8083
          env:
            - name: SPRING_PROFILES_ACTIVE
              value: "prod"
            - name: SPRING_CLOUD_NACOS_DISCOVERY_SERVER_ADDR
              value: "nacos-service:8848"
            - name: SPRING_KAFKA_BOOTSTRAP_SERVERS
              value: "kafka-service:9092"
            - name: MODEL_SERVICE_URL
              value: "http://model-service:8000"
            - name: JAVA_OPTS
              value: "-Xms1g -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /actuator/health
              port: 8083
            initialDelaySeconds: 60
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /actuator/health
              port: 8083
            initialDelaySeconds: 30
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: recommendation-service
spec:
  selector:
    app: recommendation-service
  ports:
    - protocol: TCP
      port: 8083
      targetPort: 8083
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: recommendation-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: recommendation-service
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
Performance Optimization Strategies
1. GPU Acceleration for the Model Service
For deep learning models, a GPU significantly speeds up inference. Below is a GPU-enabled model-service image:
# model-service/Dockerfile
FROM nvidia/cuda:11.8.0-runtime-ubuntu22.04
RUN apt-get update && apt-get install -y \
    python3.10 \
    python3-pip \
    && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
COPY model_server.py .
COPY models/ ./models/
EXPOSE 8000
CMD ["python3", "model_server.py"]
resources:
limits:
nvidia.com/gpu: 1
2. Multi-Level Caching Strategy
package com.example.recommendation.config;

import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableCaching
public class CacheConfiguration {

    @Bean
    public RedisCacheManager redisCacheManager(RedisConnectionFactory factory) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofMinutes(30))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
        return RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .withInitialCacheConfigurations(getCacheConfigurations())
                .build();
    }

    private Map<String, RedisCacheConfiguration> getCacheConfigurations() {
        Map<String, RedisCacheConfiguration> configMap = new HashMap<>();
        configMap.put("userProfiles", RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofHours(1)));
        configMap.put("recommendations", RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(30)));
        configMap.put("hotItems", RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(10)));
        return configMap;
    }
}
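The cache-aside pattern behind @Cacheable and the per-cache TTLs above can be illustrated without Redis. A toy in-process store with per-entry expiry (the class and helper are illustrative, not Spring's or Redis's API):

```python
import time

class TTLCache:
    """Toy cache with per-entry TTL, mimicking the Redis TTLs configured
    above (userProfiles 1h, recommendations 30min, hotItems 10min)."""
    def __init__(self):
        self._store = {}

    def get(self, key, now=None):
        now = time.monotonic() if now is None else now
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires = entry
        if now >= expires:
            del self._store[key]  # lazy eviction on read
            return None
        return value

    def set(self, key, value, ttl_seconds, now=None):
        now = time.monotonic() if now is None else now
        self._store[key] = (value, now + ttl_seconds)

def get_or_load(cache, key, loader, ttl, now=None):
    """Cache-aside: a hit returns the cached value; a miss calls the
    loader (standing in for the database) and populates the cache."""
    value = cache.get(key, now=now)
    if value is None:
        value = loader()
        cache.set(key, value, ttl, now=now)
    return value
```

Differentiated TTLs matter: profiles change slowly and can live an hour, while hot-item lists go stale quickly and get only ten minutes.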
3. JVM Tuning
Tune JVM parameters to match the memory and CPU profile of the recommendation service:
# Java 17: unified logging (-Xlog) replaces the removed PrintGCDetails/PrintGCDateStamps/-Xloggc flags
JAVA_OPTS="-Xms2g -Xmx4g \
  -XX:+UseG1GC -XX:MaxGCPauseMillis=200 \
  -XX:G1ReservePercent=20 -XX:InitiatingHeapOccupancyPercent=35 \
  -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/logs/heapdump.hprof \
  -Xlog:gc*:file=/logs/gc.log:time,uptime \
  -Duser.timezone=Asia/Shanghai -Dfile.encoding=UTF-8"
4. Database Optimization
Add indexes on frequently queried columns
Use read/write splitting to offload the primary database
Cache hot data such as user profiles in Redis
CREATE INDEX idx_user_id ON user_profile(user_id);
CREATE INDEX idx_membership ON user_profile(membership_level);
CREATE INDEX idx_city ON user_profile(city);
CREATE INDEX idx_item_category ON item(category);
CREATE INDEX idx_item_brand ON item(brand);
CREATE INDEX idx_item_created ON item(created_at);
Load Test Results
Pressure tests were run with JMeter; the scenario is concurrent requests for recommendations:
Recommendation service: 3 instances, 2 CPU / 4 GB each
Model service: 2 instances, 4 CPU / 8 GB each, with GPU
Redis: 1 primary, 2 replicas
Kafka: 3-node cluster
Baseline:

Concurrency   QPS     Avg latency   P95      P99      Error rate
100           850     115ms         180ms    250ms    0%
500           3200    155ms         280ms    420ms    0%
1000          4800    205ms         450ms    680ms    0.1%
2000          5200    380ms         850ms    1200ms   2%

After caching and JVM tuning:

Concurrency   QPS     Avg latency   P95      P99      Error rate
100           1500    65ms          95ms     130ms    0%
500           5800    85ms          150ms    220ms    0%
1000          9500    105ms         200ms    320ms    0%
2000          12000   165ms         350ms    520ms    0.05%
With caching and JVM tuning in place, throughput rose by roughly 76-131% depending on concurrency, and P99 latency dropped by about half.
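The improvement figures follow directly from the QPS columns of the two tables:

```python
def pct_gain(before, after):
    """Percentage QPS gain, rounded to the nearest whole percent."""
    return round((after - before) / before * 100)

# QPS by concurrency level, copied from the tables above
qps_baseline = {100: 850, 500: 3200, 1000: 4800, 2000: 5200}
qps_tuned = {100: 1500, 500: 5800, 1000: 9500, 2000: 12000}

gains = {c: pct_gain(qps_baseline[c], qps_tuned[c]) for c in qps_baseline}
# roughly 76% at low concurrency up to about 131% at 2000 concurrent users
```

Note the gain grows with concurrency: caching helps most exactly where the model service would otherwise saturate.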
Summary and Outlook
This article walked through building an intelligent recommendation system on a Spring Cloud microservice architecture, from architecture design and model selection to service implementation and deployment tuning, showing end to end how to integrate AI capability into the enterprise Java ecosystem. The microservice split decouples feature engineering from model inference so components can be developed, deployed, and scaled independently; circuit breaking and degradation keep the system available; and multi-level caching plus JVM tuning deliver a substantial performance boost.
Combining Spring Cloud with AI brings intelligence to traditional Java applications while letting developers stay in a familiar ecosystem. The advantages are threefold: flexibility of the stack, with Java developers focusing on business logic and data flow while Python algorithm engineers focus on model development and training; scalability, since each service scales independently to absorb traffic spikes; and operational convenience, since containerization and orchestration enable fast deployment and elastic scaling. Several directions are worth exploring next:
Real-time recommendation: the current system relies mostly on batch, offline computation. Stream processing (e.g. Flink) would enable millisecond-level updates driven by live user behavior: as soon as a user views an item, subsequent recommendations adjust, improving experience and conversion.
Multi-modal feature fusion: with the growth of multimedia content, recommenders must handle text, image, and video features. Visual models such as CLIP or ResNet enable search-by-image and cross-modal recommendation, e.g. a user uploads a photo and the system recommends similar items.
Federated learning and privacy: as data privacy becomes critical, federated learning trains models on user devices and uploads only model parameters rather than raw data, enabling personalization while protecting privacy.
AutoML and automated model iteration: AutoML can automate training, evaluation, and deployment. When a new algorithm appears, the system can run A/B tests automatically and promote the best model, closing the algorithm-iteration loop.
LLM-enhanced recommendation: the understanding capability of large language models can sharpen user-intent recognition and generate recommendation explanations, tailoring results to conversation history while providing interpretable reasons.
Rapid progress in AI brings both opportunities and challenges for recommender systems. As Java developers we should stay open-minded, learn and adopt new techniques, and integrate AI into existing architectures gracefully without sacrificing stability and maintainability. I hope this article offers a useful reference for your own technical upgrades in the AI era.