跳到主要内容
Spring Boot 消息队列与异步通信 | 极客日志
Java java
Spring Boot 消息队列与异步通信 Spring Boot 消息队列与异步通信涵盖消息队列定义、特点及 ActiveMQ、RabbitMQ、Kafka 集成方法。内容包括配置步骤、生产者消费者代码示例,以及@Async 注解和 CompletableFuture 异步通信实践。通过用户注册、订单处理等场景展示解耦与性能优化方案。
21.1 学习目标与重点提示
学习目标 :掌握 Spring Boot 消息队列与异步通信的核心概念与使用方法,包括消息队列的定义与特点、Spring Boot 与 ActiveMQ 的集成、Spring Boot 与 RabbitMQ 的集成、Spring Boot 与 Kafka 的集成、Spring Boot 异步通信的基本方法、Spring Boot 的实际应用场景,学会在实际开发中处理消息队列与异步通信问题。
重点 :消息队列的定义与特点 、Spring Boot 与 ActiveMQ 的集成 、Spring Boot 与 RabbitMQ 的集成 、Spring Boot 与 Kafka 的集成 、Spring Boot 异步通信的基本方法 、Spring Boot 的实际应用场景 。
21.2 消息队列概述
消息队列是 Java 开发中的重要组件。
21.2.1 消息队列的定义
定义 :消息队列是一种异步通信机制,用于在应用程序之间传递消息。
作用 :
实现应用程序之间的异步通信。
实现应用程序之间的解耦。
提高应用程序的性能。
常见的消息队列 :
ActiveMQ:Apache ActiveMQ 是一款开源的消息队列。
RabbitMQ:RabbitMQ 是一款开源的消息队列。
Kafka:Apache Kafka 是一款开源的消息队列。
✅ 结论 :消息队列是一种异步通信机制,作用是实现应用程序之间的异步通信、解耦、提高应用程序的性能。
21.2.2 消息队列的特点
定义 :消息队列的特点是指消息队列的特性。
特点 :
异步通信:消息发送者不需要等待消息接收者的响应。
解耦:消息发送者与消息接收者之间不需要直接通信。
可靠性:消息队列提供消息的可靠传输。
可扩展性:消息队列可以扩展到多个应用程序之间的通信。
✅ 结论 :消息队列的特点包括异步通信、解耦、可靠性、可扩展性。
21.3 Spring Boot 与 ActiveMQ 的集成
Spring Boot 与 ActiveMQ 的集成是 Java 开发中的重要内容。
21.3.1 集成 ActiveMQ 的步骤
定义 :集成 ActiveMQ 的步骤是指使用 Spring Boot 与 ActiveMQ 集成的方法。
步骤 :
创建 Spring Boot 项目。
添加所需的依赖。
配置 ActiveMQ。
创建消息生产者。
创建消息消费者。
测试应用。
示例 :
pom.xml 文件中的依赖:
<dependencies >
<dependency >
<groupId > org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-activemq
org.springframework.boot
spring-boot-starter-test
test
</groupId >
<artifactId >
</artifactId >
</dependency >
<dependency >
<groupId >
</groupId >
<artifactId >
</artifactId >
</dependency >
<dependency >
<groupId >
</groupId >
<artifactId >
</artifactId >
<scope >
</scope >
</dependency >
</dependencies >
application.properties 文件中的 ActiveMQ 配置:
# 服务器端口
server.port=8080
# ActiveMQ 配置
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {
@Autowired
private JmsTemplate jmsTemplate;
public void sendMessage (String destination, String message) {
jmsTemplate.convertAndSend(destination, message);
System.out.println("发送消息:" + message);
}
}
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {
@JmsListener(destination = "test-queue")
public void receiveMessage (String message) {
System.out.println("接收消息:" + message);
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
@Autowired
private MessageProducer messageProducer;
@GetMapping("/send")
public String sendMessage (@RequestParam String message) {
messageProducer.sendMessage("test-queue" , message);
return "消息发送成功" ;
}
}
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class ActiveMQApplicationTests {
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate restTemplate;
@Test
void contextLoads () {}
@Test
void testSendMessage () {
String message = "Hello, ActiveMQ!" ;
String response = restTemplate.getForObject("http://localhost:" + port + "/send?message=" + message, String.class);
assertThat(response).contains("消息发送成功" );
}
}
✅ 结论 :集成 ActiveMQ 的步骤包括创建 Spring Boot 项目、添加所需的依赖、配置 ActiveMQ、创建消息生产者、创建消息消费者、测试应用。
21.4 Spring Boot 与 RabbitMQ 的集成 Spring Boot 与 RabbitMQ 的集成是 Java 开发中的重要内容。
21.4.1 集成 RabbitMQ 的步骤 定义 :集成 RabbitMQ 的步骤是指使用 Spring Boot 与 RabbitMQ 集成的方法。
创建 Spring Boot 项目。
添加所需的依赖。
配置 RabbitMQ。
创建消息生产者。
创建消息消费者。
测试应用。
<dependencies >
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-web</artifactId >
</dependency >
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-amqp</artifactId >
</dependency >
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-test</artifactId >
<scope > test</scope >
</dependency >
</dependencies >
application.properties 文件中的 RabbitMQ 配置:
# 服务器端口
server.port=8080
# RabbitMQ 配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage (String exchange, String routingKey, String message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
System.out.println("发送消息:" + message);
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {
@RabbitListener(queues = "test-queue")
public void receiveMessage (String message) {
System.out.println("接收消息:" + message);
}
}
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public Queue testQueue () {
return new Queue ("test-queue" , true );
}
@Bean
public DirectExchange testExchange () {
return new DirectExchange ("test-exchange" );
}
@Bean
public Binding testBinding () {
return BindingBuilder.bind(testQueue()).to(testExchange()).with("test-routing-key" );
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
@Autowired
private MessageProducer messageProducer;
@GetMapping("/send")
public String sendMessage (@RequestParam String message) {
messageProducer.sendMessage("test-exchange" , "test-routing-key" , message);
return "消息发送成功" ;
}
}
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class RabbitMQApplicationTests {
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate restTemplate;
@Test
void contextLoads () {}
@Test
void testSendMessage () {
String message = "Hello, RabbitMQ!" ;
String response = restTemplate.getForObject("http://localhost:" + port + "/send?message=" + message, String.class);
assertThat(response).contains("消息发送成功" );
}
}
✅ 结论 :集成 RabbitMQ 的步骤包括创建 Spring Boot 项目、添加所需的依赖、配置 RabbitMQ、创建消息生产者、创建消息消费者、测试应用。
21.5 Spring Boot 与 Kafka 的集成 Spring Boot 与 Kafka 的集成是 Java 开发中的重要内容。
21.5.1 集成 Kafka 的步骤 定义 :集成 Kafka 的步骤是指使用 Spring Boot 与 Kafka 集成的方法。
创建 Spring Boot 项目。
添加所需的依赖。
配置 Kafka。
创建消息生产者。
创建消息消费者。
测试应用。
<dependencies >
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-web</artifactId >
</dependency >
<dependency >
<groupId > org.springframework.kafka</groupId >
<artifactId > spring-kafka</artifactId >
</dependency >
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-test</artifactId >
<scope > test</scope >
</dependency >
</dependencies >
application.properties 文件中的 Kafka 配置:
# 服务器端口
server.port=8080
# Kafka 配置
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage (String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("发送消息:" + message);
}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void receiveMessage (String message) {
System.out.println("接收消息:" + message);
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
@Autowired
private MessageProducer messageProducer;
@GetMapping("/send")
public String sendMessage (@RequestParam String message) {
messageProducer.sendMessage("test-topic" , message);
return "消息发送成功" ;
}
}
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class KafkaApplicationTests {
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate restTemplate;
@Test
void contextLoads () {}
@Test
void testSendMessage () {
String message = "Hello, Kafka!" ;
String response = restTemplate.getForObject("http://localhost:" + port + "/send?message=" + message, String.class);
assertThat(response).contains("消息发送成功" );
}
}
✅ 结论 :集成 Kafka 的步骤包括创建 Spring Boot 项目、添加所需的依赖、配置 Kafka、创建消息生产者、创建消息消费者、测试应用。
21.6 Spring Boot 异步通信的基本方法 Spring Boot 异步通信的基本方法包括使用@Async 注解、使用 CompletableFuture、使用消息队列。
21.6.1 使用@Async 注解 定义 :使用@Async 注解是指使用 Spring Boot 异步通信的基本方法之一。
<dependencies >
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-web</artifactId >
</dependency >
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-test</artifactId >
<scope > test</scope >
</dependency >
</dependencies >
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
@Configuration
@EnableAsync
public class AsyncConfig {}
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
public class AsyncService {
@Async
public void asyncMethod () {
System.out.println("异步方法执行:" + Thread.currentThread().getName());
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class AsyncController {
@Autowired
private AsyncService asyncService;
@GetMapping("/async")
public String asyncMethod () {
System.out.println("主线程执行:" + Thread.currentThread().getName());
asyncService.asyncMethod();
return "异步方法调用成功" ;
}
}
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class AsyncApplicationTests {
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate restTemplate;
@Test
void contextLoads () {}
@Test
void testAsyncMethod () {
String response = restTemplate.getForObject("http://localhost:" + port + "/async" , String.class);
assertThat(response).contains("异步方法调用成功" );
}
}
✅ 结论 :使用@Async 注解是指使用 Spring Boot 异步通信的基本方法之一,作用是实现异步通信、提高应用程序的性能。
21.6.2 使用 CompletableFuture 定义 :使用 CompletableFuture 是指使用 Spring Boot 异步通信的基本方法之一。
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@RestController
public class CompletableFutureController {
@GetMapping("/completableFuture")
public String completableFuture () throws ExecutionException, InterruptedException {
System.out.println("主线程执行:" + Thread.currentThread().getName());
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("异步方法执行:" + Thread.currentThread().getName());
});
future.get();
return "CompletableFuture 调用成功" ;
}
}
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class CompletableFutureApplicationTests {
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate restTemplate;
@Test
void contextLoads () {}
@Test
void testCompletableFuture () {
String response = restTemplate.getForObject("http://localhost:" + port + "/completableFuture" , String.class);
assertThat(response).contains("CompletableFuture 调用成功" );
}
}
✅ 结论 :使用 CompletableFuture 是指使用 Spring Boot 异步通信的基本方法之一,作用是实现异步通信、提高应用程序的性能。
21.7 Spring Boot 的实际应用场景 在实际开发中,Spring Boot 消息队列与异步通信的应用场景非常广泛,如:
实现用户注册的异步处理。
实现订单的异步处理。
实现邮件发送的异步处理。
实现日志的异步处理。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
@EnableAsync
public class UserRegistrationApplication {
public static void main (String[] args) {
SpringApplication.run(UserRegistrationApplication.class, args);
}
}
@Service
class UserRegistrationService {
@Async
public void sendWelcomeEmail (String email) {
System.out.println("发送欢迎邮件:" + email);
try {
Thread.sleep(2000 );
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("邮件发送成功:" + email);
}
}
@RestController
class UserRegistrationController {
@Autowired
private UserRegistrationService userRegistrationService;
@GetMapping("/register")
public String registerUser (String email) {
System.out.println("用户注册:" + email);
userRegistrationService.sendWelcomeEmail(email);
return "用户注册成功" ;
}
}
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class UserRegistrationApplicationTests {
@LocalServerPort
private int port;
@Autowired
private TestRestTemplate restTemplate;
@Test
void contextLoads () {}
@Test
void testRegisterUser () {
String email = "[email protected] " ;
String response = restTemplate.getForObject("http://localhost:" + port + "/register?email=" + email, String.class);
assertThat(response).contains("用户注册成功" );
}
}
用户注册:test @example.com
发送欢迎邮件:test @example.com
邮件发送成功:test @example.com
✅ 结论 :在实际开发中,Spring Boot 消息队列与异步通信的应用场景非常广泛,需要根据实际问题选择合适的异步通信方法。
总结 本章我们学习了 Spring Boot 消息队列与异步通信,包括消息队列的定义与特点、Spring Boot 与 ActiveMQ 的集成、Spring Boot 与 RabbitMQ 的集成、Spring Boot 与 Kafka 的集成、Spring Boot 异步通信的基本方法、Spring Boot 的实际应用场景,学会了在实际开发中处理消息队列与异步通信问题。其中,消息队列的定义与特点、Spring Boot 与 ActiveMQ 的集成、Spring Boot 与 RabbitMQ 的集成、Spring Boot 与 Kafka 的集成、Spring Boot 异步通信的基本方法、Spring Boot 的实际应用场景是本章的重点内容。从下一章开始,我们将学习 Spring Boot 的其他组件、微服务等内容。
相关免费在线工具 Keycode 信息 查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
Escape 与 Native 编解码 JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
JavaScript / HTML 格式化 使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
JavaScript 压缩与混淆 Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online