博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spring事务源码分析结合mybatis源码(一)
阅读量:5149 次
发布时间:2019-06-13

本文共 14249 字,大约阅读时间需要 47 分钟。

最近想提升,苦逼程序猿,想了想还是拿最熟悉,之前也一直想看但没看的spring源码来看吧,正好最近在弄事务这部分的东西,就看了下,同时写下随笔记录下,以备后查。

spring tx源码分析

这里只分析简单事务也就是DataSourceTransactionManager

首先肯定找入口了,看过spring源码的同学一定都会找spring tx的入口就是在TxAdviceBeanDefinitionParser这里将解析tx的配置,生成TransactionInterceptor对象,这个也就是一个普通的切面类,只要符合AOP规则的调用都会进入此切面。

在invoke方法中最重要的一段代码:这里主要分析一个新的事务的开始过程

 

Class
targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(invocation.getMethod(), targetClass);//获取配置的TransactionAttribute信息 final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(invocation.getMethod(), targetClass); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);//开启一个新的事务 Object retVal = null; try { retVal = invocation.proceed();//原有逻辑执行 } catch (Throwable ex) { completeTransactionAfterThrowing(txInfo, ex);//发生异常时候对异常的处理 throw ex; } finally { cleanupTransactionInfo(txInfo);//清理TransactionInfo信息 } commitTransactionAfterReturning(txInfo);//提交事务 return retVal;

 

首先开启事务,也就是调用createTransactionIfNecessary方法:

protected TransactionInfo createTransactionIfNecessary(            PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {        if (txAttr != null && txAttr.getName() == null) {            txAttr = new DelegatingTransactionAttribute(txAttr) {                @Override                public String getName() {                    return joinpointIdentification;                }            };        }        TransactionStatus status = null;        if (txAttr != null) {            if (tm != null) {                status = tm.getTransaction(txAttr);            }            else {                }            }        }        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);    }

这里其实主要就是调用PlatformTransactionManager的getTransactionf方法来获取TransactionStatus来开启一个事务:

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {        Object transaction = doGetTransaction();        if (definition == null) {            definition = new DefaultTransactionDefinition();        }        if (isExistingTransaction(transaction)) {
//这个判断很重要,是否已经存在的一个transaction return handleExistingTransaction(definition, transaction, debugEnabled);//如果是存在的将进行一些处理 } if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } //如果是PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED这三种类型将开启一个新的事务 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition);//开启新事物 prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }

这段代码比较长也是比较核心的一段代码,让我们来慢慢分析,首先这里将执行doGetTransaction方法来获取一个transaction

protected Object doGetTransaction() {        DataSourceTransactionObject txObject = new DataSourceTransactionObject();        txObject.setSavepointAllowed(isNestedTransactionAllowed());        ConnectionHolder conHolder =            (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);        //这一行代码中TransactionSynchronizationManager很重要,是对connection的核心获取、持有、删除等        txObject.setConnectionHolder(conHolder, false);        //这里不论获取到或者获取不到都将此设置newConnectionHolder为false        return txObject;    }

这段代码中主要是根据this.dataSource来获取ConnectionHolder,这个ConnectionHolder是放在TransactionSynchronizationManager的ThreadLocal中持有的,如果是第一次来获取,肯定得到是null。

接着代码往下将执行到isExistingTransaction(transaction),这里主要是依据下面代码判断:

txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive()

如果是第一次开启事务这里必然是false,否则将返回true。

我们这里先讨论第一次进入的情况,也就是false的时候将继续往下执行到了判断事务Propagation的时候了,如果Propagation为:ROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED中的一个将开启一个新事物,new一个新的DefaultTransactionStatus ,并且

newTransaction设置为true,这个状态很重要,因为后面的不论回滚、提交都是根据这个属性来判断是否在这个TransactionStatus上来进行。
接着将执行doBegin方法:
protected void doBegin(Object transaction, TransactionDefinition definition) {        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;        Connection con = null;        try {            if (txObject.getConnectionHolder() == null ||                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {                Connection newCon = this.dataSource.getConnection();//从dataSource中获取一个Connection                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);//为当前Transaction设置ConnectionHolder,并且设置newConnectionHolder为true            }            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);            con = txObject.getConnectionHolder().getConnection();            //这里主要是根据definition对connection进行一些设置            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);            txObject.setPreviousIsolationLevel(previousIsolationLevel);            if (con.getAutoCommit()) {
//开启事务,设置autoCommit为false txObject.setMustRestoreAutoCommit(true); con.setAutoCommit(false); } //这里设置transactionActive为true,还记得签名判断是否存在的transaction吧?就是根据这个 txObject.getConnectionHolder().setTransactionActive(true); int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } if (txObject.isNewConnectionHolder()) { //这里将当前的connection放入TransactionSynchronizationManager中持有,如果下次调用可以判断为已有的事务 TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); } } }

这里其实主要就是从dataSource中获取一个新的connection,形成一个ConnectionHolder,并且放入TransactionSynchronizationManager中持有,记得前面doGetTransaction方法吧,如果同一个线程,再此进入执行的话就会获取到同一个ConnectionHolder,在后面的isExistingTransaction方法也可以判定为是已有的transaction。

接下来将执行prepareSynchronization方法,主要是对TransactionSynchronizationManager的一系列设置。

然后将返回上层代码执行prepareTransactionInfo方法

protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,            TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);        if (txAttr != null) {            txInfo.newTransactionStatus(status);        }        txInfo.bindToThread();        return txInfo;    }

这里其实比较简单主要生成一个TransactionInfo并绑定到当前线程的ThreadLocal

private void bindToThread() {            this.oldTransactionInfo = transactionInfoHolder.get();            transactionInfoHolder.set(this);        }

形成了一个链表,具体啥用我也暂时没看到,唯一看到的就是通过TransactionAspectSupport.currentTransactionStatus()可以获取当前的transaction状态。

然后再返回到上层代码,接着就是执行相应的逻辑代码了

retVal = invocation.proceed();

执行过程的finally代码块将执行cleanupTransactionInfo(txInfo);

private void restoreThreadLocalStatus() {            transactionInfoHolder.set(this.oldTransactionInfo);        }

这里就是将txInfo进行重置工作,让它恢复到前一个状态。

然后就是提交操作(commitTransactionAfterReturning)或者是回滚操作(completeTransactionAfterThrowing)了。

这里就拿提交操作来为例来说明,回滚操作类似:

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {        if (txInfo != null && txInfo.hasTransaction()) {            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());        }    }

实际就是执行的processCommit方法

private void processCommit(DefaultTransactionStatus status) throws TransactionException {        try {            boolean beforeCompletionInvoked = false;            try {                prepareForCommit(status);                triggerBeforeCommit(status);                triggerBeforeCompletion(status);                beforeCompletionInvoked = true;                boolean globalRollbackOnly = false;                if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {                    globalRollbackOnly = status.isGlobalRollbackOnly();                }                if (status.hasSavepoint()) {                    status.releaseHeldSavepoint();                }                else if (status.isNewTransaction()) {                    doCommit(status);                }                if (globalRollbackOnly) {                    throw new UnexpectedRollbackException(                            "Transaction silently rolled back because it has been marked as rollback-only");                }            }            catch (UnexpectedRollbackException ex) {                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);                throw ex;            }            catch (TransactionException ex) {                if (isRollbackOnCommitFailure()) {                    doRollbackOnCommitException(status, ex);                }                else {                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);                }                throw ex;            }            catch (RuntimeException ex) {                if (!beforeCompletionInvoked) {                    triggerBeforeCompletion(status);                }                doRollbackOnCommitException(status, ex);                throw ex;            }            catch (Error err) {                if (!beforeCompletionInvoked) {                    triggerBeforeCompletion(status);                }                doRollbackOnCommitException(status, err);                throw err;            }            try {                triggerAfterCommit(status);            }            finally {                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);            }        }        finally {            cleanupAfterCompletion(status);        }    }

首先将执行一些提交前的准备工作,这里将进行是否有savepoint判断status.hasSavepoint(),如果有的话将进行释放savePoint,即getConnectionHolderForSavepoint().getConnection().releaseSavepoint((Savepoint) savepoint);

接着就判断是否是新的transaction:status.isNewTransaction(),如果是的话将执行 doCommit(status);

protected void doCommit(DefaultTransactionStatus status) {        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();        Connection con = txObject.getConnectionHolder().getConnection();        try {            con.commit();        }        catch (SQLException ex) {            throw new TransactionSystemException("Could not commit JDBC transaction", ex);        }    }

其实也就是调用了Connection的commit()方法。

最后无论成功与否都将调用finally块中的cleanupAfterCompletion(status)

private void cleanupAfterCompletion(DefaultTransactionStatus status) {        status.setCompleted();        if (status.isNewSynchronization()) {            TransactionSynchronizationManager.clear();//TransactionSynchronizationManager清理工作        }        if (status.isNewTransaction()) {            doCleanupAfterCompletion(status.getTransaction());//这个比较重要        }        if (status.getSuspendedResources() != null) {            resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());        }    }

首先对TransactionSynchronizationManager进行一系列清理工作,然后就将执行doCleanupAfterCompletion方法:

protected void doCleanupAfterCompletion(Object transaction) {        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;        if (txObject.isNewConnectionHolder()) {            //从TransactionSynchronizationManager中解绑相应的connectionHolder            TransactionSynchronizationManager.unbindResource(this.dataSource);        }        Connection con = txObject.getConnectionHolder().getConnection();        try {            //对获取到的Connection进行一些还原            if (txObject.isMustRestoreAutoCommit()) {                con.setAutoCommit(true);            }//对获取到的Connection进行一些还原            DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());        }        catch (Throwable ex) {        }        if (txObject.isNewConnectionHolder()) {            //如果是newConnection将这个链接关闭,如果是连接池将还给连接池            DataSourceUtils.releaseConnection(con, this.dataSource);        }        //这里将这只transactionActive为false        txObject.getConnectionHolder().clear();    }

其实就是将TransactionSynchronizationManager中持有的connectionHolder释放,并且还原当前Connection 的状态,然后将对当前的transaction进行清理包括设置transactionActive为false等。

至此整个spring的事务过程也就结束了。

两篇比较好的关于事务的博客:

http://www.iteye.com/topic/78674

http://www.cnblogs.com/yangy608/archive/2011/06/29/2093478.html

转载于:https://www.cnblogs.com/lcxdever/p/4570090.html

你可能感兴趣的文章
关联查询中的一对一查询。通过第一种方式也就是自动映射的方式查询所有订单信息,关联查询下单用户信息。(由于需要创建中间类,这种方法在实际开发中已经不再使用)...
查看>>
【编程思想】【设计模式】【行为模式Behavioral】备忘录模式Memento
查看>>
Oracle创建分区表
查看>>
Android深度探索第九章
查看>>
hdu 2393:Higher Math(计算几何,水题)
查看>>
windows下用tcc编译Lua
查看>>
execl导入导出
查看>>
phpcms和php格式化时间戳
查看>>
thinkphp的空控制器和空操作以及对应解决方法
查看>>
hadoop记录-MapReduce之如何处理失败的task(转载)
查看>>
TCP介绍
查看>>
POJ 2823 Sliding Window
查看>>
如何修改mysql数据库文件的路径
查看>>
CSS3笔记4
查看>>
约瑟夫问题
查看>>
Android的所有权限说明
查看>>
DOM节点类型
查看>>
C++ fstream文件操作
查看>>
初识socket
查看>>
Hive(二)hive的基本操作
查看>>