跳到主要内容 SpringBoot 手动开启数据库事务的几种实现方式 | 极客日志
Java java
SpringBoot 手动开启数据库事务的几种实现方式 本文介绍了 SpringBoot 中手动开启数据库事务的四种方式,重点讲解了基于 PlatformTransactionManager 的手动管理方案。通过注入 PlatformTransactionManager 获取 TransactionStatus,控制事务的开启、提交与回滚。源码分析揭示了 JdbcTransactionManager 如何通过 DataSourceTransactionManager 将 Connection 自动提交设为 false,并利用 TransactionSynchronizationManager 绑定线程资源。此外,还探讨了 MyBatis SqlSession 与当前线程共享事务的原理,以及一级缓存导致查询结果不一致的问题及解决方案(使用 @Options 刷新缓存)。
概要
某些情况下我们可能需要手动开启事务,比如由多个业务组合的功能,其中某一段业务报错我们需要进行回滚操作,或者是使用数据库事务实现分布式锁。那么该如何开启事务呢。
开启事务
方式一 :使用@Transactional 注解,Spring 会自动帮我们管理事务,包括开启事务、提交事务、回滚事务。
方式二 :从数据源 DataSource 中获取一个 Connection,DataSource 是自动装配的,SpringBoot 默认使用的是 HikariDataSource。将 Connection 自动提交设置为 false,用此 Connection 执行业务 SQL,然后提交事务、回滚事务。
方式三 :借助 Spring 中的事务管理器 PlatformTransactionManager 来开启事务。
方式四 :使用 TransactionTemplate(自动注入即可),调用其 execute 方法来执行业务逻辑。
PlatformTransactionManager
方式一是自动管理事务。方式二虽然能手动管理事务,但实际操作起来不太优雅。方式四本质上还是方式三只不过把开启事务、提交事务、回滚事务做了封装,通过 lambda 函数回调执行我们的业务,可以认为还是自动管理了事务,这里重点介绍方式三。
先看一段代码和运行效果
@Resource
private UserMapper userMapper;
@Resource
private PlatformTransactionManager transactionManager;
@GetMapping(value = "/transaction", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> transaction () throws InterruptedException {
TransactionStatus transaction = null ;
try {
transaction = transactionManager.getTransaction(new DefaultTransactionDefinition ());
User user = userMapper.findById(1L );
System.out.println("更新前:" + user);
user.setAge(28 );
userMapper.updateById(user);
Thread otherThread = new Thread (() -> {
userMapper.findById( );
System.out.println( + newUser);
});
otherThread.start();
otherThread.join();
userMapper.findById( );
System.out.println( + newUser);
} {
(transaction != ) {
transactionManager.commit(transaction);
(() -> {
userMapper.findById( );
System.out.println( + user);
});
otherThread.start();
otherThread.join();
}
}
ResponseEntity.ok( );
}
微信扫一扫,关注极客日志 微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具 Keycode 信息 查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
Escape 与 Native 编解码 JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
JavaScript / HTML 格式化 使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
JavaScript 压缩与混淆 Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
User
newUser
=
1L
"新线程获取更新后的值:"
User
newUser
=
1L
"更新后:"
finally
if
null
Thread
otherThread
=
new
Thread
User
user
=
1L
"新线程获取事务提交后的值:"
return
"transaction"
这里使用 mybatis 来作为持久层框架,PlatformTransactionManager 系统已经自动装配,这里直接注入就可以使用。从运行效果来看手动开启的事务是生效的。
上面的测试代码是开启了一个新线程来观察事务开启后的效果,由于是新线程必然和当前线程是不会共享事务。但是这种写法需要额外的线程来操作,下面是用 mybatis 的 SqlSessionFactory 来开启一个新的 SqlSession 和当前线程不共享事务。
@Resource
private UserMapper userMapper;
@Resource
private PlatformTransactionManager transactionManager;
@Resource
private SqlSessionFactory sqlSessionFactory;
@GetMapping(value = "/transaction2", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> transaction2 () {
TransactionStatus transaction = null ;
DefaultSqlSession sqlSession = null ;
try {
transaction = transactionManager.getTransaction(new DefaultTransactionDefinition ());
User user = userMapper.findById(1L );
System.out.println("更新前:" + user);
user.setAge(28 );
userMapper.updateById(user);
Configuration configuration = sqlSessionFactory.getConfiguration();
sqlSession = new DefaultSqlSession (
configuration,
configuration.newExecutor(new JdbcTransaction (configuration.getEnvironment().getDataSource().getConnection()), ExecutorType.SIMPLE),
true
);
User user2 = sqlSession.getMapper(UserMapper.class).findById(1L );
System.out.println("新 SqlSession 获取更新后的值:" + user2);
User newUser = userMapper.findById(1L );
System.out.println("更新后:" + newUser);
} catch (SQLException e) {
throw new RuntimeException (e);
} finally {
if (transaction != null ) {
transactionManager.commit(transaction);
if (sqlSession != null ) {
User newUser = sqlSession.getMapper(UserMapper.class).findById(1L );
System.out.println("新 SqlSession 获取事务提交后的值:" + newUser);
sqlSession.close();
}
}
}
return ResponseEntity.ok("transaction2" );
}
从运行结果来看,事务提交后新 sqlSession 获取的 age 应该为 28,但仍然是 18。这是因为同一个 sqlSession 执行了相同的查询 sql 语句时,后续的查询会从缓存中拿值,我们需要在相应的 mapper 方法上加上@Options 注解每次查询前会清空缓然后走数据库查询。
@Options(flushCache = Options.FlushCachePolicy.TRUE)
@Select("select * from user where id = #{id}")
User findById (Long id) ;
技术细节
PlatformTransactionManager 是如何实现手动管理事务的
PlatformTransactionManager 的实现是 JdbcTransactionManager,参考 DataSourceTransactionManagerAutoConfiguration 自动装配类。如果引入了其他事务框架,如 spring-boot-starter-data-jpa,那么 PlatformTransactionManager 实现会是 JpaTransactionManager,可以参考 HibernateJpaAutoConfiguration 自动装配类。不管是 JdbcTransactionManager 还是 JpaTransactionManager 在开启事务时做的相关操作都是类似的,都是从数据源中获取到一个新的 Connection 后将其自动提交设置为 false。
transaction = transactionManager.getTransaction(new DefaultTransactionDefinition ());
getTransaction 方法在其抽象类 AbstractPlatformTransactionManager 中,源码如下
public final TransactionStatus getTransaction (@Nullable TransactionDefinition definition) throws TransactionException {
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException ("No existing transaction found for transaction marked with propagation 'mandatory'" );
} else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null );
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
return startTransaction(def, transaction, false , debugEnabled, suspendedResources);
} catch (RuntimeException | Error ex) {
resume(null , suspendedResources);
throw ex;
}
} else {
}
}
private TransactionStatus startTransaction (TransactionDefinition definition, Object transaction, boolean nested, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = new TransactionStatus (definition, transaction, true , newSynchronization, nested, debugEnabled, suspendedResources);
this .transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));
try {
} catch (RuntimeException | Error ex) {
this .transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex));
throw ex;
}
prepareSynchronization(status, definition);
this .transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null ));
return status;
}
doBegin 是抽象方法,其实现在 JdbcTransactionManager 的父类 DataSourceTransactionManager 中实现。
protected void doBegin (Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null ;
try {
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction" );
}
if (definition.isReadOnly()) {
checkDefaultReadOnly(newCon);
}
txObject.setConnectionHolder(newConnectionHolder(newCon), true );
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true );
con = txObject.getConnectionHolder().getConnection();
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true );
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit" );
}
con.setAutoCommit(false );
}
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable ex) {
}
}
源码中可以看到新获取的连接其自动提交被设置为 false 这样就能实现手动提交事务了。且新的连接被 TransactionSynchronizationManager(事务同步器)绑定到当前线程中,事务同步器在绑定数据时是用 ThreadLocal 来实现的,方便后续线程能直接拿到绑定的数据库连接。
当使用 mybatis 的 mapper 接口或者 sqlSession 查询以及更新数据时,是如何共享事务的。
mapper 接口会变成一个代理对象(是一个 MapperFactoryBean 属于工厂 Bean),sql 的执行是交给代理对象中封装的 sqlSession 来完成操作。sqlSession 在执行 sql 语句时最终会交给 Executor。
Executor 中会有个事务字段 transaction 是一个接口。在 Spring 环境下它的实现是 SpringManagedTransaction。Executor 执行 sql 语句时会从 transaction 中获取一个数据连接。
public Connection getConnection () throws SQLException {
if (this .connection == null ) {
openConnection();
}
return this .connection;
}
private void openConnection () throws SQLException {
this .connection = DataSourceUtils.getConnection(this .dataSource);
this .autoCommit = this .connection.getAutoCommit();
this .isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this .connection, this .dataSource);
LOGGER.debug(() -> "JDBC Connection [" + this .connection + "] will" + (this .isConnectionTransactional ? " : " : " not " ) + "be managed by Spring" );
}
可以看到连接的获取是通过工具类 DataSourceUtils 来操作完成的,这个是 spring jdbc 中所提供的工具类。
public static Connection getConnection (DataSource dataSource) throws CannotGetJdbcConnectionException {
try {
return doGetConnection(dataSource);
} catch (SQLException ex) {
throw new CannotGetJdbcConnectionException ("Failed to obtain JDBC Connection" , ex);
} catch (IllegalStateException ex) {
throw new CannotGetJdbcConnectionException ("Failed to obtain JDBC Connection" , ex);
}
}
public static Connection doGetConnection (DataSource dataSource) throws SQLException {
Assert.notNull(dataSource, "No DataSource specified" );
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
logger.debug("Fetching resumed JDBC Connection from DataSource" );
conHolder.setConnection(fetchConnection(dataSource));
}
return conHolder.getConnection();
}
return con;
}
由于前面开启事务时已经给当前线程绑定了一个 ConnectionHolder,这里就直接接能获取到,这样就实现了同一个线程中数据库连接的共享。最后提交事务时是交给 doCommit 方法完成的。
protected void doCommit (DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]" );
}
try {
con.commit();
} catch (SQLException ex) {
throw translateException("JDBC commit" , ex);
}
}
事务提交之后从数据源中拿到的 Connection 自动提交要恢复为 true。JdbcTransactionManager 的操作是在父类 DataSourceTransactionManager 的 doCleanupAfterCompletion 方法中完成的。
protected void doCleanupAfterCompletion (Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
Connection con = txObject.getConnectionHolder().getConnection();
try {
if (txObject.isMustRestoreAutoCommit()) {
con.setAutoCommit(true );
}
DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel(), (txObject.isReadOnly() && !isDefaultReadOnly()));
} catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction" , ex);
}
}
总结 为何 mybatis 的 sqlSession 在执行同一个查询 sql 语句时后续会从缓存中拿值。前面说到 sql 语句的执行会交给 Executor,其查询方法如下。
public <E> List<E> query (MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query" ).object(ms.getId());
if (closed) {
throw new ExecutorException ("Executor was closed." );
}
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
queryStack++;
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null ;
if (list != null ) {
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
if (queryStack == 0 ) {
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
deferredLoads.clear();
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
clearLocalCache();
}
}
return list;
}
mybatis 在扫描 mapper 接口时会默认让查询语句的刷新缓存为都为 false,这里其实就是 mybatis 的一级缓存,属于会话级别。当指定 Options 注解且其 flushCache 为 true 时会设置查询语句要刷新缓存,如果是使用 xml 写 sql 语句,相应的 select 标签上指定 flushCache 属性为 true。