事务
RabbitMQ 是基于 AMQP 协议实现的,该协议实现了事务机制,因此 RabbitMQ 也支持事务机制。Spring AMQP 也提供了对事务相关的操作。RabbitMQ 事务允许开发者确保消息的发送和接收是原⼦性的,要么全部成功,要么全部失败。
添加配置
spring:
application:
name: rabbit-extensions-demo
rabbitmq:
addresses: amqp://study:[email protected]:5672/extension
常量类
public class Constants {
// 事务队列名称
public static final String TRANS_QUEUE = "trans.queue";
}
声明队列
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;
@Configuration
public class RabbitMQConfig {
@Bean("transQueue")
public Queue transQueue() {
return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
}
}
设置 RabbitTemplate
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitTemplateConfig {
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true); // 开启事务
return rabbitTemplate;
}
}
编写生产消息代码 1
@Transactional
@RequestMapping("/trans")
public String trans() {
transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 1...");
transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 2...");
return "消息发送成功";
}
观察效果


此时我们可以看到,两条消息都进入了队列。
编写生产消息代码 2
@Transactional
@RequestMapping("/trans")
public String trans() {
transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 1...");
int num = 5 / 0; // 模拟异常
transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, "trans test 2...");
return "消息发送成功";
}
观察效果



此时我们发现,抛异常了,但是队列中居然有 1 条消息,怎么回事?我们不是开启事务了吗?
原因是因为我们没有配置事务管理器。
配置事务管理器
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitTemplateConfig {
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true); // 开启事务
return rabbitTemplate;
}
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
观察效果



此时我们可以看到,生产消息时抛异常了,因为开启了事务,所以生产的消息没有入队列,且结果符合预期。


