Spring Cloud 与 AI 结合构建分布式智能推荐系统
基于 Spring Cloud 微服务架构构建分布式智能推荐系统的完整方案。涵盖整体架构设计、AI 模型选型与训练(NCF)、核心微服务实现(用户、商品、推荐、特征工程)、熔断降级配置及部署优化策略。通过 Java 生态集成 Python AI 服务,实现了高可用、可扩展的智能推荐功能,并提供了性能压测结果与未来展望。

基于 Spring Cloud 微服务架构构建分布式智能推荐系统的完整方案。涵盖整体架构设计、AI 模型选型与训练(NCF)、核心微服务实现(用户、商品、推荐、特征工程)、熔断降级配置及部署优化策略。通过 Java 生态集成 Python AI 服务,实现了高可用、可扩展的智能推荐功能,并提供了性能压测结果与未来展望。

一个完整的推荐系统需要处理从用户请求到推荐结果返回的全链路流程。在微服务架构下,我们将整个系统拆分为多个职责单一的服务,通过服务间协作完成推荐任务。核心服务包括用户服务、商品服务、特征工程服务、推荐服务和模型推理服务。
服务划分与职责
基础设施组件
系统调用关系
┌─────────────────────────────────────────────────────────────────────┐
│ API Gateway (Spring Cloud Gateway) │
│ 认证 | 限流 | 路由 | 熔断 │
└─────────────────────────────────────────────────────────────────────┘
│ ▼
┌─────────────────────────────────────────────────────────────────────┐
│ Recommendation Service (推荐服务) │
│ ┌─────────────────────────────────────────────┐ │
│ │ Controller → Service → Feature Assembler │ │
│ └─────────────────────────────────────────────┘ │
└─────┬───────────────┬───────────────┬───────────────┬───────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐ ┌─────────────────────────┐
│ User │ │ Item │ │ Feature │ │ Model Serving │
│ Service │ │ Service │ │ Engine │ │ (Python/PyTorch) │
│ │ │ │ │ │
│ • 协同过滤 │ │ • 用户信息 │ │ • 商品信息 │ │ • 实时行为 │
│ • 矩阵分解 │ │ • 用户画像 │ │ • 商品特征 │ │ • 特征组装 │
│ • NCF/DeepFM │ │ │ │ │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────────┬───────────────┘
│ │ │ │
└───────────────┴───────────────┴────────────────────┘
│ ▼
┌───────────────────────────┐
│ Nacos (注册中心) │
│ • 服务注册与发现 │
│ • 配置中心 │
│ • 健康检查 │
└───────────────────────────┘
│ ▼
┌───────────────────────────┐
│ Kafka (消息队列) │
│ • 曝光事件 │
│ • 点击事件 │
│ • 转化事件 │
└───────────────────────────┘
│ ▼
┌───────────────────────────┐
│ Event Collector Service │
│ • 实时日志收集 │
│ • 在线学习反馈 │
└───────────────────────────┘
架构设计亮点
推荐算法的选择需要综合考虑业务场景、数据规模、实时性要求和计算资源。对于电商和内容平台,常见的推荐算法包括基于协同过滤的传统方法、基于矩阵分解的隐因子模型,以及基于深度学习的神经网络模型。
算法选型分析
模型训练与导出
在实际项目中,我们通常使用 Python 生态中的 PyTorch 或 TensorFlow 进行模型训练。以下是一个使用 PyTorch 实现 NCF 模型的简化示例:
# train_ncf_model.py
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader
# 定义 NCF 模型
class NCFModel(nn.Module):
def __init__(self, num_users, num_items, embedding_dim=32):
super(NCFModel, self).__init__()
# 用户嵌入层
self.user_embedding = nn.Embedding(num_users, embedding_dim)
# 物品嵌入层
self.item_embedding = nn.Embedding(num_items, embedding_dim)
# MLP 层
self.mlp = nn.Sequential(
nn.Linear(embedding_dim * 2, 128),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(128, 64),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(64, 1),
nn.Sigmoid()
)
def forward(self, user_ids, item_ids):
user_emb = self.user_embedding(user_ids)
item_emb = self.item_embedding(item_ids)
concat = torch.cat([user_emb, item_emb], dim=-1)
return self.mlp(concat)
# 自定义数据集
class RecommendationDataset(Dataset):
def __init__():
.user_ids = torch.LongTensor(user_ids)
.item_ids = torch.LongTensor(item_ids)
.labels = torch.FloatTensor(labels)
():
(.labels)
():
.user_ids[idx], .item_ids[idx], .labels[idx]
():
device = torch.device( torch.cuda.is_available() )
model = NCFModel(num_users, num_items).to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=)
train_loader = DataLoader(train_data, batch_size=, shuffle=)
epoch (epochs):
model.train()
total_loss =
user_ids, item_ids, labels train_loader:
user_ids, item_ids, labels = user_ids.to(device), item_ids.to(device), labels.to(device)
optimizer.zero_grad()
predictions = model(user_ids, item_ids).squeeze()
loss = criterion(predictions, labels)
loss.backward()
optimizer.step()
total_loss += loss.item()
()
model
():
model.()
dummy_user_ids = torch.LongTensor([])
dummy_item_ids = torch.LongTensor([])
torch.onnx.export(
model,
(dummy_user_ids, dummy_item_ids),
output_path,
input_names=[,],
output_names=[],
dynamic_axes={:{:},:{:},:{:}},
opset_version=
)
()
__name__ == :
num_users =
num_items =
num_samples =
user_ids = np.random.randint(, num_users, num_samples)
item_ids = np.random.randint(, num_items, num_samples)
labels = np.random.randint(,, num_samples).astype()
split_idx = (* num_samples)
train_data = RecommendationDataset(user_ids[:split_idx], user_ids[:split_idx], labels[:split_idx])
val_data = RecommendationDataset(user_ids[split_idx:], user_ids[split_idx:], labels[split_idx:])
model = train_model(train_data, val_data, num_users, num_items, epochs=)
export_to_onnx(model)
模型服务化
训练完成后,我们需要将模型部署为服务供 Java 应用调用。以下是使用 FastAPI 实现的模型推理服务:
# model_server.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch
import onnxruntime as ort
import numpy as np
from typing import List
app = FastAPI(title='Recommendation Model API')
# 加载 ONNX 模型
session = ort.InferenceSession('ncf_model.onnx')
class PredictionRequest(BaseModel):
user_ids: List[int]
item_ids: List[int]
class PredictionResponse(BaseModel):
predictions: List[float]
@app.post('/api/v1/predict', response_model=PredictionResponse)
async def predict(request: PredictionRequest):
try:
# 准备输入数据
user_ids = np.array(request.user_ids, dtype=np.int64).reshape(-1,1)
item_ids = np.array(request.item_ids, dtype=np.int64).reshape(-1,1)
# ONNX 推理
inputs = {'user_ids': user_ids,'item_ids': item_ids }
predictions = session.run(None, inputs)[0]
return PredictionResponse(predictions=predictions.tolist())
except Exception e:
HTTPException(status_code=, detail=(e))
():
{:}
__name__ == :
uvicorn
uvicorn.run(app, host=, port=)
这个模型服务提供了 REST API,Java 应用可以通过 HTTP 调用。对于高性能场景,也可以考虑使用 gRPC 协议,它比 REST 更高效,支持双向流式通信。
本节将详细介绍推荐系统中核心微服务的实现,包括服务间调用、特征组装、模型推理集成、异步日志收集和熔断降级等关键功能。所有代码基于 Spring Boot 3 和 Spring Cloud 2022。
首先,创建一个 Maven 多模块项目:
<!-- pom.xml -->
<project>
<groupId>com.example.recommendation</groupId>
<artifactId>recommendation-system</artifactId>
<version>1.0.0</version>
<packaging>pom</packaging>
<modules>
<module>user-service</module>
<module>item-service</module>
<module>recommendation-service</module>
<module>feature-engine</module>
<module>common</module>
</modules>
<properties>
<java.version>17</java.version>
<spring-boot.version>3.2.0</spring-boot.version>
<spring-cloud.version>2023.0.0</spring-cloud.version>
</properties>
org.springframework.boot
spring-boot-dependencies
${spring-boot.version}
pom
import
org.springframework.cloud
spring-cloud-dependencies
${spring-cloud.version}
pom
import
用户服务提供用户基础信息和画像查询接口:
// user-service/src/main/java/com/example/user/controller/UserController.java
package com.example.user.controller;
import com.example.user.dto.UserProfileDTO;
import com.example.user.service.UserService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/users")
@RequiredArgsConstructor
public class UserController {
private final UserService userService;
/**
* 获取用户画像
*/
@GetMapping("/{userId}/profile")
public UserProfileDTO getUserProfile(@PathVariable Long userId) {
return userService.getUserProfile(userId);
}
/**
* 批量获取用户画像
*/
@PostMapping("/profiles/batch")
public Map<Long,UserProfileDTO> getUserProfilesBatch(@RequestBody List<Long> userIds) {
return userService.getUserProfilesBatch(userIds);
}
}
// user-service/src/main/java/com/example/user/service/UserService.java
package com.example.user.service;
import com.example.user.dto.UserProfileDTO;
import com.example.user.entity.UserProfile;
import com.example.user.repository.UserProfileRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
public class UserService {
private final UserProfileRepository userProfileRepository;
/**
* 获取用户画像,使用 Redis 缓存
*/
@Cacheable(value = "userProfiles", key = "#userId")
public UserProfileDTO getUserProfile(Long userId) {
UserProfile profile = userProfileRepository.findById(userId).orElseThrow(() -> new RuntimeException("User not found: " + userId));
return UserProfileDTO.builder()
.userId(profile.getUserId())
.gender(profile.getGender())
.age(profile.getAge())
.city(profile.getCity())
.membershipLevel(profile.getMembershipLevel())
.interestTags(profile.getInterestTags())
.build();
}
/**
* 批量获取用户画像
*/
public Map<Long,UserProfileDTO> getUserProfilesBatch(List<Long> userIds) {
List<UserProfile> profiles = userProfileRepository.findAllById(userIds);
return profiles.stream().collect(Collectors.toMap(UserProfile::getUserId, profile -> UserProfileDTO.builder()
.userId(profile.getUserId())
.gender(profile.getGender())
.age(profile.getAge())
.city(profile.getCity())
.membershipLevel(profile.getMembershipLevel())
.interestTags(profile.getInterestTags())
.build()));
}
}
# user-service/src/main/resources/application.yml
server:
port: 8081
spring:
application:
name: user-service
datasource:
url: jdbc:mysql://localhost:3306/user_db
username: root
password: password
data:
redis:
host: localhost
port: 6379
# Nacos 注册中心配置
spring.cloud.nacos:
discovery:
server-addr: localhost:8848
namespace: public
config:
server-addr: localhost:8848
file-extension: yml
推荐服务是整个系统的核心,负责协调各服务完成推荐流程。首先定义 Feign 客户端:
// recommendation-service/src/main/java/com/example/recommendation/client/UserClient.java
package com.example.recommendation.client;
import com.example.common.dto.UserProfileDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List;
import java.util.Map;
@FeignClient(name = "user-service", path = "/api/users", fallbackFactory = UserClientFallback.class)
public interface UserClient {
@GetMapping("/{userId}/profile")
UserProfileDTO getUserProfile(@PathVariable("userId") Long userId);
@PostMapping("/profiles/batch")
Map<Long,UserProfileDTO> getUserProfilesBatch(@RequestBody List<Long> userIds);
}
// recommendation-service/src/main/java/com/example/recommendation/client/ItemClient.java
package com.example.recommendation.client;
import com.example.common.dto.ItemDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List;
import java.util.Map;
@FeignClient(name = "item-service", path = "/api/items", fallbackFactory = ItemClientFallback.class)
public interface ItemClient {
@GetMapping("/{itemId}")
ItemDTO getItem(@PathVariable("itemId") Long itemId);
@PostMapping("/batch")
List<ItemDTO> getItemsBatch(@RequestBody List<Long> itemIds);
@GetMapping("/category/{category}")
List<ItemDTO> getItemsByCategory(@PathVariable("category") String category);
}
实现降级工厂类:
// recommendation-service/src/main/java/com/example/recommendation/client/UserClientFallback.java
package com.example.recommendation.client;
import com.example.common.dto.UserProfileDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class UserClientFallback implements UserClient {
@Override
public UserProfileDTO getUserProfile(Long userId) {
log.warn("User service fallback triggered for userId: {}", userId);
// 返回默认用户画像
return UserProfileDTO.builder()
.userId(userId)
.gender("unknown")
.age(25)
.city("unknown")
.membershipLevel("NORMAL")
.interestTags(Collections.emptyList())
.build();
}
@Override
public Map<Long,UserProfileDTO> getUserProfilesBatch(List<Long> userIds) {
log.warn("User service batch fallback triggered");
return Collections.emptyMap();
}
}
推荐服务核心逻辑:
// recommendation-service/src/main/java/com/example/recommendation/service/RecommendationService.java
package com.example.recommendation.service;
import com.example.common.dto.*;
import com.example.recommendation.client.ItemClient;
import com.example.recommendation.client.UserClient;
import com.example.recommendation.client.ModelClient;
import com.example.recommendation.config.RecommendationProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@Service
@RequiredArgsConstructor
public class RecommendationService {
private final UserClient userClient;
private final ItemClient itemClient;
private final ModelClient modelClient;
private final FeatureEngineClient featureEngineClient;
private final RedisTemplate<String,Object> redisTemplate;
private final KafkaTemplate<String,Object> kafkaTemplate;
private final RecommendationProperties properties;
/**
* 获取个性化推荐
*/
public RecommendationResult getRecommendations(Long userId, String scenario, int size) {
String.format(, userId, scenario);
List<RecommendedItem> cachedResult = (List<RecommendedItem>) redisTemplate.opsForValue().get(cacheKey);
(cachedResult != ) {
log.info(, userId, scenario);
RecommendationResult.builder().userId(userId).scenario(scenario).items(cachedResult).source().build();
}
userClient.getUserProfile(userId);
List<Long> candidateItemIds = getCandidateItems(userProfile, scenario, size * );
List<ItemDTO> candidateItems = itemClient.getItemsBatch(candidateItemIds);
buildFeatureVector(userId, userProfile, candidateItems);
Map<Long,Double> itemScores = modelClient.predict(userId, candidateItemIds, featureVector);
List<RecommendedItem> recommendedItems = candidateItems.stream()
.filter(item -> itemScores.containsKey(item.getItemId()))
.sorted((a, b) -> Double.compare(itemScores.get(b.getItemId()), itemScores.get(a.getItemId())))
.limit(size)
.map(item -> RecommendedItem.builder()
.itemId(item.getItemId())
.itemName(item.getItemName())
.category(item.getCategory())
.price(item.getPrice())
.score(itemScores.get(item.getItemId()))
.reason()
.build())
.collect(Collectors.toList());
redisTemplate.opsForValue().set(cacheKey, recommendedItems, Duration.ofMinutes(properties.getCacheExpireMinutes()));
recordExposureEvent(userId, scenario, recommendedItems);
RecommendationResult.builder().userId(userId).scenario(scenario).items(recommendedItems).source().build();
}
List<Long> {
List<String> interests = userProfile.getInterestTags();
(interests.isEmpty()) {
interests = Arrays.asList();
}
itemClient.getItemsByCategory(interests.get()).stream()
.limit(poolSize)
.map(ItemDTO::getItemId)
.collect(Collectors.toList());
}
FeatureVector {
FeatureVector.builder()
.userId(userId)
.userGender(userProfile.getGender())
.userAge(userProfile.getAge())
.userCity(userProfile.getCity())
.membershipLevel(userProfile.getMembershipLevel())
.interestTags(userProfile.getInterestTags())
.itemCategories(items.stream().map(ItemDTO::getCategory).distinct().collect(Collectors.toList()))
.hourOfDay(LocalDateTime.now().getHour())
.dayOfWeek(LocalDateTime.now().getDayOfWeek().getValue())
.build();
}
{
List<Long> itemIds = items.stream().map(RecommendedItem::getItemId).collect(Collectors.toList());
ExposureEvent.builder()
.userId(userId)
.scenario(scenario)
.itemIds(itemIds)
.timestamp(System.currentTimeMillis())
.build();
kafkaTemplate.send(, event);
log.debug(, userId);
}
{
ClickEvent.builder()
.userId(userId)
.itemId(itemId)
.scenario(scenario)
.timestamp(System.currentTimeMillis())
.build();
kafkaTemplate.send(, event);
log.info(, userId, itemId);
}
}
模型推理客户端(通过 WebClient 调用 Python 模型服务):
// recommendation-service/src/main/java/com/example/recommendation/client/ModelClient.java
package com.example.recommendation.client;
import com.example.recommendation.dto.ModelPredictRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
@RequiredArgsConstructor
public class ModelClient {
private final WebClient.Builder webClientBuilder;
@Value("${model.service.url}")
private String modelServiceUrl;
@Value("${model.service.timeout}")
private int timeoutMs;
/**
* 调用模型服务获取预测分数
*/
public Map<Long,Double> predict(Long userId, List<Long> itemIds, FeatureVector featureVector) {
WebClient webClient = webClientBuilder.baseUrl(modelServiceUrl).build();
// 构建请求体
List<Integer> userIds = Collections.nCopies(itemIds.size(), userId.intValue());
ModelPredictRequest request = ModelPredictRequest.builder()
.userIds(userIds)
.itemIds(itemIds.stream().map(Long::intValue).collect(Collectors.toList()))
.featureVector(featureVector)
.build();
Map<Long,Double> scores = <>();
{
webClient.post()
.uri()
.bodyValue(request)
.retrieve()
.bodyToMono(ModelPredictResponse.class)
.timeout(Duration.ofMillis(timeoutMs))
.retryWhen(Retry.backoff(, Duration.ofMillis()))
.block();
(response != && response.getPredictions() != ) {
( ; i < itemIds.size(); i++) {
scores.put(itemIds.get(i), response.getPredictions().get(i));
}
}
} (Exception e) {
log.error(, userId, e.getMessage());
itemIds.forEach(id -> scores.put(id, ));
}
scores;
}
}
推荐服务配置:
# recommendation-service/src/main/resources/application.yml
server:
port: 8083
spring:
application:
name: recommendation-service
cloud:
nacos:
discovery:
server-addr: localhost:8848
openfeign:
client:
config:
default:
connectTimeout: 2000
readTimeout: 5000
loggerLevel: basic
circuitbreaker:
enabled: true
# Resilience4j 配置
circuitbreaker:
configs:
default:
slidingWindowSize: 10
minimumNumberOfCalls: 5
failureRateThreshold: 50
waitDurationInOpenState: 10s
permittedNumberOfCallsInHalfOpenState: 3
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# 模型服务配置
model:
使用 Resilience4j 实现服务熔断和降级:
// recommendation-service/src/main/java/com/example/recommendation/config/ResilienceConfig.java
package com.example.recommendation.config;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
@Configuration
public class ResilienceConfig {
@Bean
public CircuitBreakerRegistry circuitBreakerRegistry() {
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.slidingWindowSize(10)
.minimumNumberOfCalls(5)
.failureRateThreshold(50)
.waitDurationInOpenState(Duration.ofSeconds(10))
.permittedNumberOfCallsInHalfOpenState(3)
.slowCallDurationThreshold(Duration.ofSeconds(3))
.slowCallRateThreshold(50)
.build();
return CircuitBreakerRegistry.of(config);
}
@Bean
public TimeLimiterConfig timeLimiterConfig() {
return TimeLimiterConfig.custom()
.timeoutDuration(Duration.ofSeconds(5))
.build();
}
}
实现降级策略:
// recommendation-service/src/main/java/com/example/recommendation/fallback/RecommendationFallback.java
package com.example.recommendation.fallback;
import com.example.common.dto.RecommendationResult;
import com.example.common.dto.RecommendedItem;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@Slf4j
@Component
public class RecommendationFallback {
/**
* 模型服务降级:返回热门商品推荐
*/
public RecommendationResult getHotItemsFallback(Long userId, String scenario) {
log.warn("Model service degraded, using hot items fallback for user: {}", userId);
// 这里可以从缓存或数据库中获取热门商品
List<RecommendedItem> hotItems = getHotItems(scenario);
return RecommendationResult.builder()
.userId(userId)
.scenario(scenario)
.items(hotItems)
.source("FALLBACK_HOT_ITEMS")
.build();
}
/**
* 获取热门商品(实际实现中应从缓存或数据库获取)
*/
private List<RecommendedItem> getHotItems(String scenario) {
// 简化实现,返回固定的热门商品
return Arrays.asList(
RecommendedItem.builder().itemId(1001L).itemName("热门商品 1").category("电子").price(299.0).score(0.95).reason("热门推荐").build(),
RecommendedItem.builder().itemId(1002L).itemName("热门商品 2").category("服装").price(199.0).score().reason().build()
);
}
}
通过 Kafka 发送用户行为事件,用于在线学习和模型更新:
// recommendation-service/src/main/java/com/example/recommendation/producer/EventProducer.java
package com.example.recommendation.producer;
import com.example.common.dto.ExposureEvent;
import com.example.common.dto.ClickEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Slf4j
@Component
@RequiredArgsConstructor
public class EventProducer {
private final KafkaTemplate<String,Object> kafkaTemplate;
/**
* 发送曝光事件
*/
public void sendExposureEvent(ExposureEvent event) {
kafkaTemplate.send("recommendation-exposure", event).addCallback(new ListenableFutureCallback<SendResult<String,Object>>() {
@Override
public void onSuccess(SendResult<String,Object> result) {
log.debug("Exposure event sent successfully: {}", event);
}
@Override
public void onFailure(Throwable ex) {
log.error("Failed to send exposure event: {}", event, ex);
}
});
}
/**
* 发送点击事件
*/
public {
kafkaTemplate.send(, event).addCallback( <SendResult<String,Object>>() {
{
log.info(, event);
}
{
log.error(, event, ex);
}
});
}
}
完成了核心服务的开发后,接下来需要考虑如何将这些服务部署到生产环境,并进行性能优化以确保系统的高可用和低延迟。
首先为每个服务创建 Dockerfile。以下以 recommendation-service 为例:
# recommendation-service/Dockerfile
FROM eclipse-temurin:17-jre-alpine
WORKDIR /app
# 复制 JAR 文件
COPY target/recommendation-service-*.jar app.jar
# 设置 JVM 参数
ENV JAVA_OPTS="-Xms512m -Xmx1g -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
EXPOSE 8083
ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar app.jar"]
使用 Docker Compose 进行本地开发环境编排:
# docker-compose.yml
version: '3.8'
services:
# Nacos 注册中心
nacos:
image: nacos/nacos-server:v2.2.3
ports:
- "8848:8848"
environment:
MODE: standalone
# MySQL
mysql:
image: mysql:8.0
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: password
MYSQL_DATABASE: recommendation_db
volumes:
- mysql-data:/var/lib/mysql
# Redis
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
# Kafka
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:
对于生产环境,使用 Kubernetes 进行容器编排。以下是 recommendation-service 的部署配置:
# k8s/recommendation-service-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: recommendation-service
labels:
app: recommendation-service
spec:
replicas: 3
selector:
matchLabels:
app: recommendation-service
template:
metadata:
labels:
app: recommendation-service
spec:
containers:
- name: recommendation-service
image: your-registry/recommendation-service:1.0.0
ports:
- containerPort: 8083
env:
- name: SPRING_PROFILES_ACTIVE
value: "prod"
- name: SPRING_CLOUD_NACOS_DISCOVERY_SERVER_ADDR
value: "nacos-service:8848"
- name: SPRING_KAFKA_BOOTSTRAP_SERVERS
value: "kafka-service:9092"
- name:
1. 模型服务 GPU 加速
对于深度学习模型,使用 GPU 可以显著提升推理速度。以下是支持 GPU 的模型服务部署配置:
# model-service/Dockerfile
FROM nvidia/cuda:11.8.0-runtime-ubuntu22.04
RUN apt-get update && apt-get install -y \
python3.10 \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
COPY model_server.py .
COPY models/ ./models/
EXPOSE 8000
CMD ["python3", "model_server.py"]
在 Kubernetes 中使用 GPU:
resources:
limits:
nvidia.com/gpu: 1
2. 多级缓存策略
实现三级缓存来减少计算压力:
// CacheConfiguration.java
@Configuration
@EnableCaching
public class CacheConfiguration {
@Bean
public RedisCacheManager redisCacheManager(RedisConnectionFactory factory) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(30))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
return RedisCacheManager.builder(factory)
.cacheDefaults(config)
.withInitialCacheConfigurations(getCacheConfigurations())
.build();
}
private Map<String,RedisCacheConfiguration> getCacheConfigurations() {
Map<String,RedisCacheConfiguration> configMap = new HashMap<>();
// 用户画像缓存 - 1 小时
configMap.put("userProfiles", RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofHours(1)));
// 推荐结果缓存 - 30 分钟
configMap.put("recommendations", RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(30)));
// 热门商品缓存 - 10 分钟
configMap.put("hotItems", RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(10)));
return configMap;
}
}
3. JVM 调优
针对推荐服务的内存和 CPU 特性进行 JVM 参数调优:
# 推荐服务的推荐 JVM 参数
JAVA_OPTS=" -Xms2g -Xmx4g -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:G1ReservePercent=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/logs/heapdump.hprof -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/logs/gc.log -Duser.timezone=Asia/Shanghai -Dfile.encoding=UTF-8 "
4. 数据库优化
-- 用户画像表索引优化
CREATE INDEX idx_user_id ON user_profile(user_id);
CREATE INDEX idx_membership ON user_profile(membership_level);
CREATE INDEX idx_city ON user_profile(city);
-- 商品表索引优化
CREATE INDEX idx_item_category ON item(category);
CREATE INDEX idx_item_brand ON item(brand);
CREATE INDEX idx_item_created ON item(created_at);
使用 JMeter 进行压力测试,测试场景为并发获取推荐结果:
测试环境:
测试结果:
| 并发数 | QPS | 平均响应时间 | P95 响应时间 | P99 响应时间 | 错误率 |
|---|---|---|---|---|---|
| 100 | 850 | 115ms | 180ms | 250ms | 0% |
| 500 | 3200 | 155ms | 280ms | 420ms | 0% |
| 1000 | 4800 | 205ms | 450ms | 680ms | 0.1% |
| 2000 | 5200 | 380ms | 850ms | 1200ms | 2% |
优化后结果(启用多级缓存 + JVM 调优):
| 并发数 | QPS | 平均响应时间 | P95 响应时间 | P99 响应时间 | 错误率 |
|---|---|---|---|---|---|
| 100 | 1500 | 65ms | 95ms | 130ms | 0% |
| 500 | 5800 | 85ms | 150ms | 220ms | 0% |
| 1000 | 9500 | 105ms | 200ms | 320ms | 0% |
| 2000 | 12000 | 165ms | 350ms | 520ms | 0.05% |
通过缓存和 JVM 调优,系统性能提升了约 100-150%,P99 延迟降低了约 50%。
本文详细介绍了基于 Spring Cloud 微服务架构构建智能推荐系统的完整过程,从架构设计、模型选型、服务实现到部署优化,全方位展示了如何在企业级 Java 生态中集成 AI 能力。通过微服务架构,我们实现了特征工程与模型推理的解耦,使各组件可以独立开发、部署和扩展;通过服务熔断和降级策略,确保了系统的高可用性;通过多级缓存和 JVM 调优,显著提升了系统性能。
展望未来,推荐系统还有许多值得探索的方向:
1. 实时推荐:当前的系统主要基于离线计算的批量推荐,未来可以引入实时流处理技术(如 Flink),实现基于用户实时行为的毫秒级推荐更新。当用户浏览一个商品后,系统可以立即调整后续推荐内容,提升用户体验和转化率。
2. 多模态特征融合:随着多媒体内容的普及,推荐系统需要处理文本、图像、视频等多种模态的特征。可以引入视觉特征提取模型(如 CLIP、ResNet),实现'以图搜图'和跨模态推荐。例如,用户上传一张图片,系统可以推荐相似的商品。
3. 联邦学习与隐私保护:在数据隐私保护日益重要的背景下,联邦学习成为一种重要的技术方向。通过在用户设备本地进行模型训练,只上传模型参数而非原始数据,可以在保护用户隐私的同时实现个性化推荐。
4. AutoML 与模型自动迭代:引入 AutoML 技术,实现模型的自动训练、评估和部署。当新算法出现时,系统可以自动进行 A/B 测试,选择最优模型上线,形成闭环的算法迭代体系。
5. 大模型增强推荐:利用大语言模型(LLM)的强大理解能力,实现更精准的用户意图识别和推荐理由生成。LLM 可以根据用户对话历史生成更符合用户偏好的推荐结果,同时提供可解释的推荐理由。
AI 技术的快速发展为推荐系统带来了新的机遇和挑战。作为 Java 开发者,我们需要保持开放的心态,积极学习和拥抱新技术,在保证系统稳定性和可维护性的前提下,将 AI 能力优雅地集成到现有架构中。希望本文能够为读者提供有价值的参考,助力大家在 AI 时代的技术升级之路。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online