基于 RocketMQ 的事务消息与分布式最终一致性
原理简介
- RocketMQ 提供了类似 X/Open XA 的分布式事务功能,通过 MQ 的事务消息能达到分布式事务的最终一致。
- 发送方在业务执行开始会先向消息队列中投递'半消息'。半消息即暂时不会真正投递的消息。当发送方(即生产者)将消息成功发送给了 MQ 服务端且并未将该消息的二次确认结果返回,此时消息状态是'暂时不可投递'状态(可以认为是状态未知)。该状态下的消息即半消息。
- 如果出现网络闪断、生产者应用重启等原因导致事务消息二次确认丢失,MQ 服务端会通过扫描发现某条消息长期处于'半消息'状态,MQ 服务端会主动向生产者查询该消息的最终状态是处于 Commit(消息提交) 还是 Rollback(消息回滚)。这个过程称为消息回查。
有了上述的概念,我们详细解释一下事务消息交互的过程:
- 首先,MQ 发送方向 MQ 服务(即 RocketMQ 的 Broker)发送半消息。
- MQ 服务端会将消息做持久化处理,并发送 ACK 确认消息已经发送成功。
- MQ 发送方执行本地事务。
- MQ 发送方根据本地事务执行的结果向 MQ 服务提交二次确认:如果本地事务执行成功,则提交消息状态为 Commit,否则为 Rollback。MQ 服务端收到 Commit 状态的消息将消息标记为可投递状态,订阅方最终会收到该条消息。如果收到的是 Rollback,最终 MQ 服务端会删除该条半消息,订阅方不会接收到这条消息。
- 如果出现网络闪断、应用重启等情况,第 4 阶段提交的二次确认最终并未能到达 MQ 服务端,一定时间之后,MQ 服务端会对此消息发起回查操作,确认发送方本地事务的执行状态。
- 发送方需要实现服务回查逻辑供 MQ 服务端进行回调。当发送方收到回查后,需要检查对应消息的本地事务执行的最终结果,此处也需要根据本地事务的成功或失败返回 Commit 或者 Rollback,即再次提交消息状态的二次确认,MQ 服务端仍会按照步骤 4 对该半消息进行操作。
注意 1-4 为事务消息的发送过程,5-6 为事务消息的回查过程。
如何使用
此处引用官网的 demo 进行简单说明。
事务状态
RocketMQ 定义了三种事务状态:
TransactionStatus.CommitTransaction:消息提交,当消息状态为 CommitTransaction,表示允许消费者允许消费当前消息。TransactionStatus.RollbackTransaction:消息回滚,表示 MQ 服务端将会删除当前半消息,不允许消费者消费。TransactionStatus.Unknown:中间状态,表示 MQ 服务需要发起回查操作,检测当前发送方本地事务的执行状态。
发送事务消息
(1)创建事务消息生产者
使用 TransactionMQProducer 创建消息发送客户端。并指定一个唯一的生产者组 producerGroup,当执行完本地事务,需要返回给 MQ 服务端执行结果,返回上面的三种事务状态。CommitTransaction、RollbackTransaction、Unknown。
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
();
(, , , TimeUnit.SECONDS, <Runnable>(), () {
Thread {
(r);
thread.setName();
thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = []{, , , , };
( ; i < ; i++) {
{
(, tags[i % tags.length], + i,
( + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendMessageInTransaction(msg, );
System.out.printf(, sendResult);
Thread.sleep();
} (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
( ; i < ; i++) {
Thread.sleep();
}
producer.shutdown();
}
}


