详解RabbitMQ高级特性之事务

目录
事务
RabbitMQ是基于AMQP协议实现的, 该协议实现了事务机制, 因此RabbitMQ也⽀持事务机制. Spring AMQP也提供了对事务相关的操作. RabbitMQ事务允许开发者确保消息的发送和接收是原⼦性的, 要么全部成功, 要么全部失败.
添加配置
spring: application: name: rabbit-extensions-demo rabbitmq: addresses: amqp://study:[email protected]:5672/extension
常量类
public class Constants { //事务 public static final String TRANS_QUEUE = "trans.queue"; }声明队列
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import rabbitextensionsdemo.constant.Constants; @Configuration public class RabbitMQConfig { @Bean("transQueue") public Queue transQueue(){ return QueueBuilder.durable(Constants.TRANS_QUEUE).build(); } }设置RabbitTemplate
import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitTemplateConfig { @Bean("transRabbitTemplate") public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setChannelTransacted(true); //开启事务 return rabbitTemplate; } }编写生产消息代码1
@Transactional @RequestMapping("/trans") public String trans(){ transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE, "trans test 1..."); transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE, "trans test 2..."); return "消息发送成功"; }观察效果


此时我们可以看到,两条消息都进入了队列。
编写生产消息代码2
@Transactional @RequestMapping("/trans") public String trans(){ transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE, "trans test 1..."); int num = 5/0; transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE, "trans test 2..."); return "消息发送成功"; }观察效果



此时我们发现,抛异常了,但是队列中居然有1条消息,怎么回事?我们不是开启事务了吗?
原因是因为我们没有配置事务管理器
配置事务管理器
import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitTemplateConfig { @Bean("transRabbitTemplate") public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setChannelTransacted(true); //开启事务 return rabbitTemplate; } @Bean public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){ return new RabbitTransactionManager(connectionFactory); } }观察效果



此时我们可以看到,生产消息时抛异常了,因为开启了事务,所以生产的消息没有入队列,且结果符合预期。