Spring WebFlux: An In-Depth Practical Guide

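Spring WebFlux is the reactive web framework introduced in Spring Framework 5. It is built on Project Reactor and supports non-blocking execution and a functional programming model. This section covers the core capabilities of WebFlux: building REST APIs, reactive database access, and real-time communication.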
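Reactor's Mono and Flux implement the Reactive Streams Publisher contract, and the JDK ships semantically equivalent interfaces in java.util.concurrent.Flow. The sketch below uses only the JDK to show the core idea behind "non-blocking with demand": a publisher emits nothing until the subscriber asks for it. RangePublisher and CollectingSubscriber are illustrative names invented for this example, and the publisher is synchronous and single-threaded to keep the behavior easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Synchronous publisher that emits the integers [0, count) strictly on demand. */
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) {
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private long demand = 0;
            private boolean emitting = false;
            private boolean done = false;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                demand += n;
                if (emitting) {
                    return; // re-entrant call from onNext: the outer loop picks up the new demand
                }
                emitting = true;
                while (demand > 0 && next < count && !done) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next == count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

/** Subscriber that records what it receives and only requests when told to. */
final class CollectingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    boolean completed = false;
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
    }

    @Override
    public void onError(Throwable error) {
        throw new IllegalStateException(error);
    }

    @Override
    public void onComplete() {
        completed = true;
    }

    void request(long n) {
        subscription.request(n);
    }
}
```

Subscribing alone delivers nothing; each call to request(n) releases at most n items. Reactor operators perform this demand bookkeeping internally, which is why application code rarely calls request directly.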
4.3.1 Building a Reactive REST API

Basic Project Setup
First, create a Spring Boot WebFlux project (for example with Spring Initializr) and add the WebFlux starter:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

Reactive Controllers
Annotated controllers (similar to traditional Spring MVC, but returning reactive types):

    @RestController
    @RequestMapping("/products")
    public class ProductController {

        private final ProductService productService;

        // Constructor injection
        public ProductController(ProductService productService) {
            this.productService = productService;
        }

        @GetMapping
        public Flux<Product> getAllProducts() {
            return productService.findAll();
        }

        @GetMapping("/{id}")
        public Mono<Product> getProductById(@PathVariable String id) {
            return productService.findById(id);
        }

        @PostMapping
        @ResponseStatus(HttpStatus.CREATED)
        public Mono<Product> createProduct(@RequestBody Mono<Product> productMono) {
            // save(Product) is the same signature the functional handler uses
            return productMono.flatMap(productService::save);
        }

        @PutMapping("/{id}")
        public Mono<Product> updateProduct(@PathVariable String id,
                                           @RequestBody Mono<Product> productMono) {
            return productService.update(id, productMono);
        }

        @DeleteMapping("/{id}")
        @ResponseStatus(HttpStatus.NO_CONTENT)
        public Mono<Void> deleteProduct(@PathVariable String id) {
            return productService.delete(id);
        }
    }

Functional endpoints (the RouterFunction style):
    @Configuration
    public class ProductRouter {

        @Bean
        public RouterFunction<ServerResponse> route(ProductHandler handler) {
            return RouterFunctions.route()
                    .GET("/fn/products", handler::getAll)
                    .GET("/fn/products/{id}", handler::getById)
                    .POST("/fn/products", handler::create)
                    .PUT("/fn/products/{id}", handler::update)
                    .DELETE("/fn/products/{id}", handler::delete)
                    .build();
        }
    }

    @Component
    public class ProductHandler {

        private final ProductService productService;

        public ProductHandler(ProductService productService) {
            this.productService = productService;
        }

        public Mono<ServerResponse> getAll(ServerRequest request) {
            // NDJSON streams one JSON document per line as elements arrive
            return ServerResponse.ok()
                    .contentType(MediaType.APPLICATION_NDJSON)
                    .body(productService.findAll(), Product.class);
        }

        public Mono<ServerResponse> getById(ServerRequest request) {
            String id = request.pathVariable("id");
            return productService.findById(id)
                    .flatMap(product -> ServerResponse.ok().bodyValue(product))
                    .switchIfEmpty(ServerResponse.notFound().build());
        }

        public Mono<ServerResponse> create(ServerRequest request) {
            return request.bodyToMono(Product.class)
                    .flatMap(productService::save)
                    .flatMap(product -> ServerResponse
                            .created(URI.create("/fn/products/" + product.getId()))
                            .bodyValue(product));
        }

        public Mono<ServerResponse> update(ServerRequest request) {
            String id = request.pathVariable("id");
            return request.bodyToMono(Product.class)
                    .flatMap(product -> productService.update(id, Mono.just(product)))
                    .flatMap(product -> ServerResponse.ok().bodyValue(product))
                    .switchIfEmpty(ServerResponse.notFound().build());
        }

        public Mono<ServerResponse> delete(ServerRequest request) {
            String id = request.pathVariable("id");
            return productService.delete(id)
                    .then(ServerResponse.noContent().build());
        }
    }

Advanced Features
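Server-Sent Events, which the streaming endpoint in this section relies on, are a line-oriented text format: each event is a group of field lines (id, event, data) terminated by a blank line, and a payload with several lines needs one data line per line. The helper below is a minimal plain-Java sketch of that framing, written only to make the wire format concrete. SseFrame is a hypothetical name; in a WebFlux application the framework writes these frames when an endpoint produces text/event-stream.

```java
/** Formats a single Server-Sent Event frame as it appears on the wire. */
final class SseFrame {
    private SseFrame() {
    }

    /** id and event are optional (pass null to omit them); data is required. */
    static String format(String id, String event, String data) {
        StringBuilder frame = new StringBuilder();
        if (id != null) {
            frame.append("id:").append(id).append('\n');
        }
        if (event != null) {
            frame.append("event:").append(event).append('\n');
        }
        // Every line of the payload gets its own "data:" field
        for (String line : data.split("\n", -1)) {
            frame.append("data:").append(line).append('\n');
        }
        frame.append('\n'); // a blank line terminates the event
        return frame.toString();
    }
}
```

For example, SseFrame.format(null, null, "a\nb") yields two data lines followed by the terminating blank line, which a browser's EventSource reassembles into the single payload "a\nb".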

Streaming responses (Server-Sent Events):

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ProductEvent> streamProducts() {
        // Emits one event per second until the client disconnects
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> new ProductEvent("product-" + sequence, "Event at " + Instant.now()));
    }

Request validation and exception handling:
    // Registered as a WebExceptionHandler bean; @ControllerAdvice does not apply to this base class
    @Component
    @Order(-2) // take precedence over the default WebFlux exception handlers
    public class GlobalErrorHandler extends AbstractErrorWebExceptionHandler {

        public GlobalErrorHandler(ErrorAttributes errorAttributes,
                                  WebProperties webProperties,
                                  ApplicationContext applicationContext,
                                  ServerCodecConfigurer serverCodecConfigurer) {
            // WebProperties.Resources is not a bean itself, so obtain it from WebProperties
            super(errorAttributes, webProperties.getResources(), applicationContext);
            setMessageWriters(serverCodecConfigurer.getWriters());
            setMessageReaders(serverCodecConfigurer.getReaders());
        }

        @Override
        protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
            return RouterFunctions.route(RequestPredicates.all(), request -> {
                Map<String, Object> errorProperties =
                        getErrorAttributes(request, ErrorAttributeOptions.defaults());
                HttpStatus status = getHttpStatus(errorProperties);
                return ServerResponse.status(status)
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(errorProperties);
            });
        }

        private HttpStatus getHttpStatus(Map<String, Object> errorProperties) {
            return HttpStatus.valueOf((Integer) errorProperties.get("status"));
        }
    }

    // Custom validation
    public class ProductValidator {

        public static Mono<Product> validate(Product product) {
            List<String> errors = new ArrayList<>();
            if (product.getName() == null || product.getName().isEmpty()) {
                errors.add("Product name is required");
            }
            // price is a BigDecimal, so compare with signum() rather than <=
            if (product.getPrice() == null || product.getPrice().signum() <= 0) {
                errors.add("Price must be positive");
            }
            // ValidationException is an application-defined exception that carries the error list
            return errors.isEmpty()
                    ? Mono.just(product)
                    : Mono.error(new ValidationException(errors));
        }
    }

4.3.2 Reactive Database Access (R2DBC)

R2DBC Configuration
Add the dependencies:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    </dependency>
    <!-- Recent driver releases are published under the org.postgresql groupId -->
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-postgresql</artifactId>
        <scope>runtime</scope>
    </dependency>

Configure application.yml:

    spring:
      r2dbc:
        url: r2dbc:postgresql://localhost:5432/mydb
        username: user
        password: pass
        pool:
          enabled: true
          max-size: 20

Reactive Repositories
Define the entity:

    @Data
    @Table("products")
    public class Product {
        @Id
        private Long id;
        private String name;
        private String description;
        private BigDecimal price;
        private Integer stock; // read and updated by the stock-transfer example
        private Instant createdAt;
    }

Create the repository interface:

    public interface ProductRepository extends ReactiveCrudRepository<Product, Long> {

        // Derived query: name LIKE %:name%
        Flux<Product> findByNameContaining(String name);

        @Query("SELECT * FROM products WHERE price > :minPrice")
        Flux<Product> findByPriceGreaterThan(BigDecimal minPrice);

        // Returns the number of affected rows
        @Modifying
        @Query("UPDATE products SET price = price * :factor")
        Mono<Integer> updateAllPrices(BigDecimal factor);
    }

Complex Queries and Transactions
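Two pieces of a dynamic search cannot simply be passed through from request parameters. SQL identifiers such as ORDER BY columns generally cannot be supplied as bind parameters, so the sort field has to be checked against a fixed list before it is concatenated into the statement, and the paging offset has to be derived from a 1-based page number. The sketch below shows both in plain Java; SearchSql, its sort keys, and the column names are assumptions made for illustration.

```java
import java.util.Map;

/** Builds the parts of a search statement that cannot be bound as parameters. */
final class SearchSql {
    // Hypothetical whitelist: API sort keys mapped to real column names
    private static final Map<String, String> SORTABLE = Map.of(
            "name", "name",
            "price", "price",
            "createdAt", "created_at");

    private SearchSql() {
    }

    /** Returns a safe ORDER BY clause, rejecting any field that is not whitelisted. */
    static String orderBy(String sortField, String sortDirection) {
        String column = SORTABLE.get(sortField);
        if (column == null) {
            throw new IllegalArgumentException("Unsupported sort field: " + sortField);
        }
        // Anything other than "desc" (case-insensitive) falls back to ascending
        String direction = "desc".equalsIgnoreCase(sortDirection) ? "DESC" : "ASC";
        return "ORDER BY " + column + " " + direction;
    }

    /** Converts a 1-based page number into a row offset. */
    static long offset(int pageNumber, int pageSize) {
        if (pageNumber < 1 || pageSize < 1) {
            throw new IllegalArgumentException("pageNumber and pageSize must be at least 1");
        }
        return (long) (pageNumber - 1) * pageSize;
    }
}
```

Values such as the name pattern, price range, limit, and offset remain ordinary bind parameters; only the ORDER BY clause is assembled as text, and only from whitelisted input.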

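Custom query implementation:

    public class ProductRepositoryImpl implements CustomProductRepository {

        // Identifiers cannot be bound as parameters, so sortable columns are whitelisted
        // (the column names here are assumed)
        private static final Set<String> SORTABLE_COLUMNS = Set.of("name", "price", "created_at");

        private final DatabaseClient databaseClient;

        public ProductRepositoryImpl(DatabaseClient databaseClient) {
            this.databaseClient = databaseClient;
        }

        @Override
        public Flux<Product> complexSearch(ProductCriteria criteria) {
            if (!SORTABLE_COLUMNS.contains(criteria.getSortField())) {
                return Flux.error(new IllegalArgumentException("Unsupported sort field"));
            }
            String direction =
                    "DESC".equalsIgnoreCase(String.valueOf(criteria.getSortDirection())) ? "DESC" : "ASC";
            String sql = """
                    SELECT * FROM products
                    WHERE name LIKE :name
                      AND price BETWEEN :minPrice AND :maxPrice
                    ORDER BY %s %s
                    LIMIT :limit OFFSET :offset
                    """.formatted(criteria.getSortField(), direction);
            return databaseClient.sql(sql)
                    .bind("name", "%" + criteria.getName() + "%")
                    .bind("minPrice", criteria.getMinPrice())
                    .bind("maxPrice", criteria.getMaxPrice())
                    .bind("limit", criteria.getPageSize())
                    .bind("offset", (criteria.getPageNumber() - 1) * criteria.getPageSize())
                    .map((row, metadata) -> toProduct(row))
                    .all();
        }

        private Product toProduct(Row row) {
            // Row-to-object mapping; the remaining columns map the same way
            Product product = new Product();
            product.setId(row.get("id", Long.class));
            product.setName(row.get("name", String.class));
            product.setPrice(row.get("price", BigDecimal.class));
            return product;
        }
    }

Transaction management: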
    @Service
    @RequiredArgsConstructor
    public class ProductService {

        private final ProductRepository productRepository;
        private final TransactionalOperator transactionalOperator;

        // IDs are Long to match ReactiveCrudRepository<Product, Long>
        public Mono<Void> transferStock(Long fromId, Long toId, int quantity) {
            return productRepository.findById(fromId)
                    .flatMap(fromProduct -> {
                        if (fromProduct.getStock() < quantity) {
                            return Mono.error(new InsufficientStockException());
                        }
                        fromProduct.setStock(fromProduct.getStock() - quantity);
                        return productRepository.save(fromProduct)
                                .then(productRepository.findById(toId))
                                .flatMap(toProduct -> {
                                    toProduct.setStock(toProduct.getStock() + quantity);
                                    return productRepository.save(toProduct);
                                });
                    })
                    .then()
                    // Both saves commit together; an error signal rolls the transaction back
                    .as(transactionalOperator::transactional);
        }
    }

Performance Optimization
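- Connection pool configuration:

    spring:
      r2dbc:
        pool:
          max-size: 20
          initial-size: 5
          max-idle-time: 30m

- Batch operations:

    public Mono<Long> batchInsert(List<Product> products) {
        if (products.isEmpty()) {
            return Mono.just(0L);
        }
        return databaseClient.inConnection(connection -> {
            // One parameterized statement with several binding sets;
            // Batch only accepts complete SQL strings and has no bind method
            Statement statement = connection.createStatement(
                    "INSERT INTO products(name, price) VALUES($1, $2)");
            for (int i = 0; i < products.size(); i++) {
                Product product = products.get(i);
                statement.bind(0, product.getName()).bind(1, product.getPrice());
                if (i < products.size() - 1) {
                    statement.add(); // close this binding set and start the next one
                }
            }
            return Flux.from(statement.execute())
                    .flatMap(result -> Flux.from(result.getRowsUpdated()))
                    .map(Number::longValue) // Integer before R2DBC SPI 1.0, Long since
                    .reduce(0L, Long::sum);
        });
    }

4.3.3 Real-Time Communication with WebSocket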

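Basic WebSocket Configuration
Configuration class:

    // STOMP broker support comes from spring-websocket and spring-messaging, which run on the
    // servlet stack and are enabled with @EnableWebSocketMessageBroker rather than @EnableWebFlux.
    // Native WebFlux endpoints implement WebSocketHandler instead.
    @Configuration
    @EnableWebSocketMessageBroker
    public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            registry.addEndpoint("/ws")
                    .setHandshakeHandler(new DefaultHandshakeHandler())
                    .setAllowedOriginPatterns("*")
                    .withSockJS(); // the sample client connects through SockJS
        }

        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            registry.enableSimpleBroker("/topic");
            registry.setApplicationDestinationPrefixes("/app");
        }
    }

Reactive WebSocket Handling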
Stock ticker push example:

    @Controller
    public class StockTickerController {

        private final Flux<StockQuote> stockQuoteFlux;

        public StockTickerController(StockQuoteGenerator quoteGenerator) {
            this.stockQuoteFlux = Flux.interval(Duration.ofMillis(500))
                    .map(sequence -> quoteGenerator.generate())
                    .share(); // hot publisher: all subscribers share one stream of quotes
        }

        @MessageMapping("stocks.subscribe")
        @SendTo("/topic/stocks")
        public Flux<StockQuote> subscribe() {
            return stockQuoteFlux;
        }

        @MessageMapping("stocks.filter")
        public Flux<StockQuote> filter(@Payload String symbol) {
            return stockQuoteFlux.filter(quote -> quote.getSymbol().equals(symbol));
        }
    }

Client connection example:

    const socket = new SockJS('/ws');
    const stompClient = Stomp.over(socket);

    stompClient.connect({}, () => {
        stompClient.subscribe('/topic/stocks', (message) => {
            const quote = JSON.parse(message.body);
            updateStockTable(quote);
        });
        stompClient.send("/app/stocks.filter", {}, "AAPL");
    });

Advanced Features

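RSocket integration (a more capable reactive protocol):

    @Controller
    @MessageMapping("stock.service")
    public class RSocketStockController {

        private final StockService stockService;

        public RSocketStockController(StockService stockService) {
            this.stockService = stockService;
        }

        // Request-response: one symbol in, one quote out
        @MessageMapping("current")
        public Mono<StockQuote> current(String symbol) {
            return stockService.getCurrent(symbol);
        }

        // Request-stream: one symbol in, a stream of quotes out
        @MessageMapping("stream")
        public Flux<StockQuote> stream(String symbol) {
            return stockService.getStream(symbol);
        }

        // Channel: a stream of symbols in, a merged stream of quotes out
        @MessageMapping("channel")
        public Flux<StockQuote> channel(Flux<String> symbols) {
            return symbols.flatMap(stockService::getStream);
        }
    }

Backpressure control:

    @MessageMapping("large.data.stream")
    public Flux<DataChunk> largeDataStream() {
        return dataService.streamLargeData()
                .onBackpressureBuffer(
                        50, // buffer size
                        chunk -> log.warn("Dropping chunk due to backpressure"),
                        // Without an explicit strategy, an overflow terminates the stream with an error
                        BufferOverflowStrategy.DROP_LATEST);
    }

Security Configuration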
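    @Configuration
    @EnableWebFluxSecurity
    public class SecurityConfig {

        @Bean
        public SecurityWebFilterChain securityFilterChain(ServerHttpSecurity http) {
            // Lambda DSL; the chained .and() style is deprecated in Spring Security 6.1
            return http
                    .authorizeExchange(exchanges -> exchanges
                            .pathMatchers("/ws/**").authenticated()
                            .anyExchange().permitAll())
                    .httpBasic(Customizer.withDefaults())
                    .csrf(ServerHttpSecurity.CsrfSpec::disable)
                    .build();
        }

        @Bean
        public MapReactiveUserDetailsService userDetailsService() {
            // {noop} stores the password in plain text; suitable for demos only
            UserDetails user = User.withUsername("user")
                    .password("{noop}password")
                    .roles("USER")
                    .build();
            return new MapReactiveUserDetailsService(user);
        }
    }

Performance Monitoring and Best Practices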

Monitoring Endpoint Configuration

    management:
      endpoints:
        web:
          exposure:
            include: health, metrics, prometheus
      metrics:
        tags:
          application: ${spring.application.name}

Monitoring a Reactive Application

    @Bean
    public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
        return registry -> registry.config().commonTags("application", "reactive-demo");
    }

    // Custom metric
    @Bean
    public WebFilter metricsWebFilter(MeterRegistry registry) {
        return (exchange, chain) -> {
            // Raw paths with IDs create one time series per URL; prefer route templates in production
            String path = exchange.getRequest().getPath().value();
            Timer.Sample sample = Timer.start(registry);
            // doFinally covers completion, error and cancellation;
            // doOnSuccessOrError was removed in Reactor 3.5
            return chain.filter(exchange)
                    .doFinally(signal -> sample.stop(registry.timer("http.requests",
                            "uri", path,
                            "status", String.valueOf(exchange.getResponse().getStatusCode()),
                            "method", String.valueOf(exchange.getRequest().getMethod()))));
        };
    }

Best Practices Summary
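- Understand the threading model:
  - WebFlux runs on Netty event-loop threads by default
  - Blocking operations must be moved off the event loop with publishOn or subscribeOn, onto an elastic pool such as Schedulers.boundedElastic()
- Choose a backpressure strategy:
  - UI clients: use onBackpressureDrop or onBackpressureLatest
  - Service-to-service communication: use onBackpressureBuffer with a sensible buffer size
- Error handling principles:
  - Handle errors as early as possible
  - Add error handling to every Flux/Mono chain
  - Distinguish business exceptions from system exceptions
- Testing strategy:
  - Use StepVerifier to test reactive streams
  - Use WebTestClient to test controllers
  - Use virtual time to test long-running operations
- Performance tuning:
  - Configure connection pools sensibly
  - Monitor key metrics (latency, throughput, resource utilization)
  - Keep logging off the event loop, for example with Logback's AsyncAppender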
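The backpressure strategies in the list differ in what happens to items that arrive while the consumer has no outstanding demand. The class below is a deliberately simplified, single-threaded model of that difference in plain Java. It is not Reactor's implementation, and OverflowModel is a name invented for this sketch. BUFFER queues up to a fixed capacity and fails on overflow (the default behavior of onBackpressureBuffer with a size limit), DROP discards the new item, and LATEST keeps only the most recent one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of what happens to items that arrive without downstream demand. */
final class OverflowModel {
    enum Policy { BUFFER, DROP, LATEST }

    private final Policy policy;
    private final int capacity; // only consulted by BUFFER
    private final Deque<Integer> held = new ArrayDeque<>();
    private final List<Integer> delivered = new ArrayList<>();
    private long demand = 0;

    OverflowModel(Policy policy, int capacity) {
        this.policy = policy;
        this.capacity = capacity;
    }

    /** Consumer side: allow n more items to be delivered, draining held items first. */
    void request(long n) {
        demand += n;
        while (demand > 0 && !held.isEmpty()) {
            demand--;
            delivered.add(held.poll());
        }
    }

    /** Producer side: push an item whether or not there is demand. */
    void emit(int item) {
        if (demand > 0) {
            // request() drains held items while demand remains, so nothing is queued here
            demand--;
            delivered.add(item);
            return;
        }
        switch (policy) {
            case DROP:
                break; // discard the new item
            case LATEST:
                held.clear(); // forget the older item, keep the newest
                held.add(item);
                break;
            case BUFFER:
                if (held.size() >= capacity) {
                    throw new IllegalStateException("buffer overflow");
                }
                held.add(item);
                break;
        }
    }

    List<Integer> delivered() {
        return delivered;
    }
}
```

Emitting 1 to 5 with no demand, then requesting 2, then emitting 6 delivers [6] under DROP, [5, 6] under LATEST, and [1, 2] under BUFFER with enough capacity. This is why drop or latest suits a UI that only cares about fresh values, while a buffer suits service calls where every item matters.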
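With these practices, you can build high-performance, scalable reactive web applications that take full advantage of WebFlux's non-blocking model to handle high-concurrency workloads.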