Spring WebFlux 深度实践指南

Spring WebFlux 深度实践指南
在这里插入图片描述

文章目录

在这里插入图片描述

Spring WebFlux 是 Spring Framework 5 引入的响应式 Web 框架,基于 Project Reactor 实现,支持非阻塞、函数式编程模型。本节将深入探讨 WebFlux 的核心功能,包括 REST API 构建、响应式数据库访问和实时通信。

4.3.1 构建 Reactive REST API

在这里插入图片描述
基础项目搭建

首先创建 Spring Boot WebFlux 项目(基于 Spring Initializr):

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>
响应式控制器

注解式控制器(与传统Spring MVC类似但支持响应式类型):

@RestController@RequestMapping("/products")publicclassProductController{privatefinalProductService productService;// 构造函数注入publicProductController(ProductService productService){this.productService = productService;}@GetMappingpublicFlux<Product>getAllProducts(){return productService.findAll();}@GetMapping("/{id}")publicMono<Product>getProductById(@PathVariableString id){return productService.findById(id);}@PostMapping@ResponseStatus(HttpStatus.CREATED)publicMono<Product>createProduct(@RequestBodyMono<Product> productMono){return productService.save(productMono);}@PutMapping("/{id}")publicMono<Product>updateProduct(@PathVariableString id,@RequestBodyMono<Product> productMono){return productService.update(id, productMono);}@DeleteMapping("/{id}")@ResponseStatus(HttpStatus.NO_CONTENT)publicMono<Void>deleteProduct(@PathVariableString id){return productService.delete(id);}}

函数式端点(RouterFunction方式):

@ConfigurationpublicclassProductRouter{@BeanpublicRouterFunction<ServerResponse>route(ProductHandler handler){returnRouterFunctions.route().GET("/fn/products", handler::getAll).GET("/fn/products/{id}", handler::getById).POST("/fn/products", handler::create).PUT("/fn/products/{id}", handler::update).DELETE("/fn/products/{id}", handler::delete).build();}}@ComponentpublicclassProductHandler{privatefinalProductService productService;publicProductHandler(ProductService productService){this.productService = productService;}publicMono<ServerResponse>getAll(ServerRequest request){returnServerResponse.ok().contentType(MediaType.APPLICATION_NDJSON).body(productService.findAll(),Product.class);}publicMono<ServerResponse>getById(ServerRequest request){String id = request.pathVariable("id");return productService.findById(id).flatMap(product ->ServerResponse.ok().bodyValue(product)).switchIfEmpty(ServerResponse.notFound().build());}publicMono<ServerResponse>create(ServerRequest request){return request.bodyToMono(Product.class).flatMap(productService::save).flatMap(product ->ServerResponse.created(URI.create("/fn/products/"+ product.getId())).bodyValue(product));}publicMono<ServerResponse>update(ServerRequest request){String id = request.pathVariable("id");return request.bodyToMono(Product.class).flatMap(product -> productService.update(id,Mono.just(product))).flatMap(product ->ServerResponse.ok().bodyValue(product)).switchIfEmpty(ServerResponse.notFound().build());}publicMono<ServerResponse>delete(ServerRequest request){String id = request.pathVariable("id");return productService.delete(id).then(ServerResponse.noContent().build());}}
高级特性
在这里插入图片描述


流式响应(Server-Sent Events):

@GetMapping(value ="/stream", produces =MediaType.TEXT_EVENT_STREAM_VALUE)publicFlux<ProductEvent>streamProducts(){returnFlux.interval(Duration.ofSeconds(1)).map(sequence ->newProductEvent("product-"+ sequence,"Event at "+Instant.now()));}

请求验证与异常处理

@ControllerAdvicepublicclassGlobalErrorHandlerextendsAbstractErrorWebExceptionHandler{publicGlobalErrorHandler(ErrorAttributes errorAttributes,WebProperties.Resources resources,ApplicationContext applicationContext,ServerCodecConfigurer serverCodecConfigurer){super(errorAttributes, resources, applicationContext);this.setMessageWriters(serverCodecConfigurer.getWriters());}@OverrideprotectedRouterFunction<ServerResponse>getRoutingFunction(ErrorAttributes errorAttributes){returnRouterFunctions.route(RequestPredicates.all(), request ->{Map<String,Object> errorProperties =getErrorAttributes(request,ErrorAttributeOptions.defaults());HttpStatus status =getHttpStatus(errorProperties);returnServerResponse.status(status).contentType(MediaType.APPLICATION_JSON).bodyValue(errorProperties);});}privateHttpStatusgetHttpStatus(Map<String,Object> errorProperties){returnHttpStatus.valueOf((Integer)errorProperties.get("status"));}}// 自定义验证publicclassProductValidator{publicstaticMono<Product>validate(Product product){returnMono.just(product).flatMap(p ->{List<String> errors =newArrayList<>();if(p.getName()==null|| p.getName().isEmpty()){ errors.add("Product name is required");}if(p.getPrice()<=0){ errors.add("Price must be positive");}if(!errors.isEmpty()){returnMono.error(newValidationException(errors));}returnMono.just(p);});}}

4.3.2 响应式数据库访问(R2DBC)

在这里插入图片描述
R2DBC 配置

添加依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-r2dbc</artifactId></dependency><dependency><groupId>io.r2dbc</groupId><artifactId>r2dbc-postgresql</artifactId><scope>runtime</scope></dependency>

配置 application.yml:

spring:r2dbc:url: r2dbc:postgresql://localhost:5432/mydb username: user password: pass pool:enabled:truemax-size:20
响应式Repository

定义实体:

@Data@Table("products")publicclassProduct{@IdprivateLong id;privateString name;privateString description;privateBigDecimal price;privateInstant createdAt;}

创建Repository接口:

publicinterfaceProductRepositoryextendsReactiveCrudRepository<Product,Long>{Flux<Product>findByNameContaining(String name);@Query("SELECT * FROM products WHERE price > :minPrice")Flux<Product>findByPriceGreaterThan(BigDecimal minPrice);@Modifying@Query("UPDATE products SET price = price * :factor")Mono<Integer>updateAllPrices(BigDecimal factor);}
复杂查询与事务
在这里插入图片描述


自定义查询实现

publicclassProductRepositoryImplimplementsCustomProductRepository{privatefinalDatabaseClient databaseClient;publicProductRepositoryImpl(DatabaseClient databaseClient){this.databaseClient = databaseClient;}@OverridepublicFlux<Product>complexSearch(ProductCriteria criteria){return databaseClient.sql(""" SELECT * FROM products WHERE name LIKE :name AND price BETWEEN :minPrice AND :maxPrice ORDER BY :sortField :sortDirection LIMIT :limit OFFSET :offset """).bind("name","%"+ criteria.getName()+"%").bind("minPrice", criteria.getMinPrice()).bind("maxPrice", criteria.getMaxPrice()).bind("sortField", criteria.getSortField()).bind("sortDirection", criteria.getSortDirection()).bind("limit", criteria.getPageSize()).bind("offset",(criteria.getPageNumber()-1)* criteria.getPageSize()).map((row, metadata)->toProduct(row)).all();}privateProducttoProduct(Row row){// 行到对象的转换逻辑}}

事务管理

@Service@RequiredArgsConstructorpublicclassProductService{privatefinalProductRepository productRepository;privatefinalTransactionalOperator transactionalOperator;publicMono<Void>transferStock(String fromId,String toId,int quantity){return transactionalOperator.execute(status -> productRepository.findById(fromId).flatMap(fromProduct ->{if(fromProduct.getStock()< quantity){returnMono.error(newInsufficientStockException());} fromProduct.setStock(fromProduct.getStock()- quantity);return productRepository.save(fromProduct).then(productRepository.findById(toId)).flatMap(toProduct ->{ toProduct.setStock(toProduct.getStock()+ quantity);return productRepository.save(toProduct);});}));}}
性能优化
  1. 连接池配置
spring:r2dbc:pool:max-size:20initial-size:5max-idle-time: 30m 
  1. 批处理操作
publicMono<Integer>batchInsert(List<Product> products){return databaseClient.inConnectionMany(connection ->{Batch batch = connection.createBatch(); products.forEach(product -> batch.add("INSERT INTO products(name, price) VALUES($1, $2)").bind(0, product.getName()).bind(1, product.getPrice()));returnFlux.from(batch.execute()).reduce(0,(count, result)-> count + result.getRowsUpdated());});}

4.3.3 WebSocket 实时通信

在这里插入图片描述
基础WebSocket配置

配置类:

@Configuration@EnableWebFluxpublicclassWebSocketConfigimplementsWebSocketMessageBrokerConfigurer{@OverridepublicvoidregisterStompEndpoints(StompEndpointRegistry registry){ registry.addEndpoint("/ws").setHandshakeHandler(newDefaultHandshakeHandler()).setAllowedOrigins("*");}@OverridepublicvoidconfigureMessageBroker(MessageBrokerRegistry registry){ registry.enableSimpleBroker("/topic"); registry.setApplicationDestinationPrefixes("/app");}}
响应式WebSocket处理

股票行情推送示例

@ControllerpublicclassStockTickerController{privatefinalFlux<StockQuote> stockQuoteFlux;publicStockTickerController(StockQuoteGenerator quoteGenerator){this.stockQuoteFlux =Flux.interval(Duration.ofMillis(500)).map(sequence -> quoteGenerator.generate()).share();// 热发布,多个订阅者共享数据}@MessageMapping("stocks.subscribe")@SendTo("/topic/stocks")publicFlux<StockQuote>subscribe(){return stockQuoteFlux;}@MessageMapping("stocks.filter")publicFlux<StockQuote>filter(@PayloadString symbol){return stockQuoteFlux.filter(quote -> quote.getSymbol().equals(symbol));}}

客户端连接示例

const socket =newSockJS('/ws');const stompClient = Stomp.over(socket); stompClient.connect({},()=>{ stompClient.subscribe('/topic/stocks',(message)=>{const quote =JSON.parse(message.body);updateStockTable(quote);}); stompClient.send("/app/stocks.filter",{},"AAPL");});
高级特性
在这里插入图片描述


RSocket集成(更强大的响应式协议):

@Controller@MessageMapping("stock.service")publicclassRSocketStockController{@MessageMapping("current")publicMono<StockQuote>current(String symbol){return stockService.getCurrent(symbol);}@MessageMapping("stream")publicFlux<StockQuote>stream(String symbol){return stockService.getStream(symbol);}@MessageMapping("channel")publicFlux<StockQuote>channel(Flux<String> symbols){return symbols.flatMap(stockService::getStream);}}

背压控制

@MessageMapping("large.data.stream")publicFlux<DataChunk>largeDataStream(){return dataService.streamLargeData().onBackpressureBuffer(50,// 缓冲区大小 chunk -> log.warn("Dropping chunk due to backpressure"));}
安全配置
@Configuration@EnableWebFluxSecuritypublicclassSecurityConfig{@BeanpublicSecurityWebFilterChainsecurityFilterChain(ServerHttpSecurity http){return http .authorizeExchange().pathMatchers("/ws/**").authenticated().anyExchange().permitAll().and().httpBasic().and().csrf().disable().build();}@BeanpublicMapReactiveUserDetailsServiceuserDetailsService(){UserDetails user =User.withUsername("user").password("{noop}password").roles("USER").build();returnnewMapReactiveUserDetailsService(user);}}

性能监控与最佳实践

在这里插入图片描述

监控端点配置

management:endpoints:web:exposure:include: health, metrics, prometheus metrics:tags:application: ${spring.application.name}

响应式应用监控

@BeanpublicMeterRegistryCustomizer<MeterRegistry>metricsCommonTags(){return registry -> registry.config().commonTags("application","reactive-demo");}// 自定义指标@BeanpublicWebFiltermetricsWebFilter(MeterRegistry registry){return(exchange, chain)->{String path = exchange.getRequest().getPath().toString();Timer.Sample sample =Timer.start(registry);return chain.filter(exchange).doOnSuccessOrError((done, ex)->{ sample.stop(registry.timer("http.requests","uri", path,"status", exchange.getResponse().getStatusCode().toString(),"method", exchange.getRequest().getMethodValue()));});};}

最佳实践总结

  1. 线程模型理解
    • WebFlux 默认使用 Netty 事件循环线程
    • 阻塞操作必须使用 publishOn 切换到弹性线程池
  2. 背压策略选择
    • UI 客户端:使用 onBackpressureDroponBackpressureLatest
    • 服务间通信:使用 onBackpressureBuffer 配合合理缓冲区大小
  3. 错误处理原则
    • 尽早处理错误
    • 为每个 Flux/Mono 链添加错误处理
    • 区分业务异常和系统异常
  4. 测试策略
    • 使用 StepVerifier 测试响应式流
    • 使用 WebTestClient 测试控制器
    • 虚拟时间测试长时间操作
  5. 性能调优
    • 合理配置连接池
    • 监控关键指标(延迟、吞吐量、资源使用率)
    • 使用响应式日志框架(如 Logback 异步Appender)

通过以上全面实践,您将能够构建高性能、可扩展的响应式 Web 应用,充分利用 WebFlux 的非阻塞特性,处理高并发场景下的各种挑战。

Read more

[DeepSeek] 入门详细指南(上)

[DeepSeek] 入门详细指南(上)

前言 今天的是 zty 写DeepSeek的第1篇文章,这个系列我也不知道能更多久,大约是一周一更吧,然后跟C++的知识详解换着更。 来冲个100赞兄弟们 最近啊,浙江出现了一匹AI界的黑马——DeepSeek。这个名字可能对很多人来说还比较陌生,但它已经在全球范围内引发了巨大的关注,甚至让一些科技巨头感到了压力。简单来说这 DeepSeek足以改变世界格局                                                   先   赞   后   看    养   成   习   惯  众所周知,一篇文章需要一个头图                                                   先   赞   后   看    养   成   习   惯   上面那行字怎么读呢,让大家来跟我一起读一遍吧,先~赞~后~看~养~成~习~惯~ 想要 DeepSeek从入门到精通.pdf 文件的加这个企鹅群:953793685(

By Ne0inhk
DeepFace深度学习库+OpenCV实现——情绪分析器

DeepFace深度学习库+OpenCV实现——情绪分析器

目录 应用场景 实现组件 1. 硬件组件 2. 软件库与依赖 3. 功能模块 代码详解(实现思路) 导入必要的库 打开摄像头并初始化变量 主循环 FPS计算 情绪分析及结果展示 显示FPS和图像 退出条件 编辑 完整代码 效果展示 自然的 开心的 伤心的 恐惧的 惊讶的  效果展示 自然的 开心的 伤心的 恐惧的 惊讶的   应用场景         应用场景比较广泛,尤其是在需要了解和分析人类情感反应的场合。: 1. 心理健康评估:在心理健康领域,可以通过长期监控和分析一个人的情绪变化来辅助医生进行诊断或治疗效果评估。 2. 用户体验研究:在产品设计、广告制作或网站开发过程中,通过观察用户在使用过程中的情绪反应,来优化产品的用户体验。 3. 互动娱乐:在游戏或虚拟现实应用中,根据玩家的情绪状态动态调整游戏难度或故事情节,以增加沉浸感和互动性。

By Ne0inhk
最全java面试题及答案(208道)

最全java面试题及答案(208道)

本文分为十九个模块,分别是:「Java 基础、容器、多线程、反射、对象拷贝、Java Web 、异常、网络、设计模式、Spring/Spring MVC、Spring Boot/Spring Cloud、Hibernate、MyBatis、RabbitMQ、Kafka、Zookeeper、MySQL、Redis、JVM」 ,如下图所示: 共包含 208 道面试题,本文的宗旨是为读者朋友们整理一份详实而又权威的面试清单,下面一起进入主题吧。 Java 基础 1. JDK 和 JRE 有什么区别? * JDK:Java Development Kit 的简称,Java 开发工具包,提供了 Java

By Ne0inhk
10分钟打造专属AI助手!ToDesk云电脑/顺网云/海马云操作DeepSeek哪家强?

10分钟打造专属AI助手!ToDesk云电脑/顺网云/海马云操作DeepSeek哪家强?

文章目录 * 一、引言 * 云计算平台概览 * ToDesk云电脑:随时随地用上高性能电脑 * 二 .云电脑初体验 * DeekSeek介绍 * 版本参数与特点 * 任务类型表现 * 1、ToDesk云电脑 * 2、顺网云电脑 * 3、海马云电脑 * 三、DeekSeek本地化实操和AIGC应用 * 1. ToDesk云电脑 * 2. 海马云电脑 * 3、顺网云电脑 * 四、结语 * 总结:云电脑如何选择? 一、引言 DeepSeek这些大模型让 AI 开发变得越来越有趣,但真要跑起来,可没那么简单! * 本地配置太麻烦:显卡不够、驱动难装、环境冲突,光是折腾这些就让人心态崩了。 * 云端性能参差不齐:选错云电脑,可能卡到爆、加载慢,还容易掉线,搞得效率直线下降。 * 成本难控:有的平台按小时计费,价格一会儿一个样,

By Ne0inhk