spring---transaction(1)---源代码分析(事务的拦截器TransactionInterceptor)

发布时间:2022-08-19T17:07:19 Java
  • 先了解一下spring的事务。分为分明式事务管理和注解式事务管理,对于前期的事务,spring会通过扫描拦截对于事务的方法进行增强(以后讲解)。
  • 若果目标方法存在事务,spring产出的bean会是一个代理对象(cglib或者jdk)。
  • 本问讨论的是spring拦截到事务,对于事务的增强处理。

 

spring自己的一系列接口设计

    •  PlatformTransactionManager 事务管理器
    •  TransactionDefinition 事务定义
    •  TransactionStatus 事务状态

 

 

TranctionInterceptor之前了解

  • 看过spring源码的同学一定都会找spring tx的入口就是在TxAdviceBeanDefinitionParser这里将解析tx的配置,生成TransactionInterceptor对象,这个也就是一个普通的切面类,只要符合AOP规则的调用都会进入此切面。
  • ransactionInterceptor支撑着整个事务功能的架构,逻辑还是相对复杂的,那么现在我们切入正题来分析此拦截器是如何实现事务特性的。
  • spring在处理事务的aop增强是,主要调用了return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);

 

TranctionInterceptor

首先看TranctionInterceptor(位于spring-tx-*.jar中的org.springframework.transaction.interceptor)的结构:

继承类TransactionAspectSupport:其实对其进行了增强(模板方法模式)

实现接口MethodInterceptor:方法拦截器,执行代理类的目标方法,会触发invoke方法执行

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
    @Override
    //实现了MethodInterceptor的invoke方法
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        //获取目标类
     Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
     //父类TransactionAspectSupport的模板方法
        return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
            @Override
       //InvocationCallback接口的回调方法
            public Object proceedWithInvocation() throws Throwable {
          //执行目标方法
                return invocation.proceed();
            }
        });
    }
}

 

重点分析 抽象类TransactionAspectSupport(基类)的invokeWithinTransaction方法

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
   //protected修饰,不允许其他包和无关类调用
    protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
        // 获取对应事务属性.如果事务属性为空(则目标方法不存在事务)
        final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
     // 根据事务的属性获取beanFactory中的PlatformTransactionManager(spring事务管理器的顶级接口),一般这里或者的是DataSourceTransactiuonManager
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
     // 目标方法唯一标识(类.方法,如service.UserServiceImpl.save)
        final String joinpointIdentification = methodIdentification(method, targetClass);
     //如果txAttr为空或者tm 属于非CallbackPreferringPlatformTransactionManager,执行目标增强     ①
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            //看是否有必要创建一个事务,根据事务传播行为,做出相应的判断
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
          //回调方法执行,执行目标方法(原有的业务逻辑)
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // 异常回滚
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
          //清除信息
                cleanupTransactionInfo(txInfo);
            }
        //提交事务
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }
     //编程式事务处理(CallbackPreferringPlatformTransactionManager) 不做重点分析
        else {
            try {
                Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
                        new TransactionCallback<Object>() {
                            @Override
                            public Object doInTransaction(TransactionStatus status) {
                                TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                                try {
                                    return invocation.proceedWithInvocation();
                                }
                                catch (Throwable ex) {
                                    if (txAttr.rollbackOn(ex)) {
                                        // A RuntimeException: will lead to a rollback.
                                        if (ex instanceof RuntimeException) {
                                            throw (RuntimeException) ex;
                                        }
                                        else {
                                            throw new ThrowableHolderException(ex);
                                        }
                                    }
                                    else {
                                        // A normal return value: will lead to a commit.
                                        return new ThrowableHolder(ex);
                                    }
                                }
                                finally {
                                    cleanupTransactionInfo(txInfo);
                                }
                            }
                        });

                // Check result: It might indicate a Throwable to rethrow.
                if (result instanceof ThrowableHolder) {
                    throw ((ThrowableHolder) result).getThrowable();
                }
                else {
                    return result;
                }
            }
            catch (ThrowableHolderException ex) {
                throw ex.getCause();
            }
        }
    }
}
  • ①不同的事务处理方式使用不同的逻辑。对于声明式事务的处理与编程式事务的处理,第一点区别在于事务属性上,因为编程式的事务处理是不需要有事务属性的,第二点区别就是在TransactionManager上,CallbackPreferringPlatformTransactionManager实现PlatformTransactionManager接口,暴露出一个方法用于执行事务处理中的回调。所以,这两种方式都可以用作事务处理方式的判断。

 

重点分析createTransactionIfNecessary方法,它会判断是否存在事务,根据事务的传播属性。做出不同的处理,也是做了一层包装,核心是通过TransactionStatus来判断事务的属性。

通过持有的PlatformTransactionManager来获取TransactionStatus

AbstractPlatformTransactionManager.java(spring中存在很多模板方法,对于 Abstract开头的封装的抽象类,基本都有模板方法,且为final修饰

    @Override
    public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
     //这里其实主要就是调用PlatformTransactionManager的getTransactionf方法来获取TransactionStatus来开启一个事务:
        Object transaction = doGetTransaction();
        // Cache debug flag to avoid repeated checks.
        boolean debugEnabled = logger.isDebugEnabled();
        if (definition == null) {
            // Use defaults if no transaction definition given.
            definition = new DefaultTransactionDefinition();
        }
     //这个判断很重要,是否已经存在的一个transaction
        if (isExistingTransaction(transaction)) {
       //如果是存在的将进行一些处理
            // Existing transaction found -> check propagation behavior to find out how to behave.
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // Check definition settings for new transaction.
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
     //如果是PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED这三种类型将开启一个新的事务
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
          //开启新事物
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException ex) {
                resume(null, suspendedResources);
                throw ex;
            }
            catch (Error err) {
                resume(null, suspendedResources);
                throw err;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }

 

 

这段代码比较长也是比较核心的一段代码,让我们来慢慢分析,首先这里将执行doGetTransaction方法来获取一个transaction,和dobegin方法如何开启一个事务

AbstractPlatformTransactionManager并没有给出doGetTransaction的具体实现。而是由子类实现。

我们以分析实现类DataSourceTransactionManager的具体方法。

 

public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {
    private DataSource dataSource;

    @Override
  //这段代码中主要是根据this.dataSource来获取ConnectionHolder,这个ConnectionHolder是放在TransactionSynchronizationManager的ThreadLocal中持有的,如果是第一次来获取,肯定得到是null。
    protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
     //这一行代码中TransactionSynchronizationManager很重要,是对connection的核心获取、持有、删除等
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
     //这里不论获取到或者获取不到都将此设置newConnectionHolder为false
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }

 

接着代码往下将执行到isExistingTransaction(transaction),这里主要是依据下面代码判断:

    @Override
    protected boolean isExistingTransaction(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
     //如果是第一次开启事务这里必然是false,否则将返回true。
        return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive());
    }

我们这里先讨论第一次进入的情况,也就是false的时候将继续往下执行到了判断事务Propagation的时候了,如果Propagation为:ROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED中的一个将开启一个新事物,new一个新的DefaultTransactionStatus ,并且newTransaction设置为true,这个状态很重要,因为后面的不论回滚、提交都是根据这个属性来判断是否在这个TransactionStatus上来进行。

接着将执行doBegin方法:

    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
            if (txObject.getConnectionHolder() == null ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
          //从dataSource中获取一个Connection
                Connection newCon = this.dataSource.getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
          //为当前Transaction设置ConnectionHolder,并且设置newConnectionHolder为true
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
       //这里主要是根据definition对connection进行一些设置
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);

            // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
            // so we don't want to do it unnecessarily (for example if we've explicitly
            // configured the connection pool to set it already).
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
          //开启事务,设置autoCommit为false
                con.setAutoCommit(false);
            }
       //这里设置transactionActive为true,还记得签名判断是否存在的transaction吧?就是根据这个
            txObject.getConnectionHolder().setTransactionActive(true);

            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }

            // Bind the session holder to the thread.
            if (txObject.isNewConnectionHolder()) {
         //这里将当前的connection放入TransactionSynchronizationManager中持有,如果下次调用可以判断为已有的事务
                TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
            }
        }
        catch (Throwable ex) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, this.dataSource);
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

TransactionSynchronizationManager中持有,记得前面doGetTransaction方法吧,如果同一个线程,再此进入执行的话就会获取到同一个ConnectionHolder,在后面的isExistingTransaction方法也可以判定为是已有的transaction。

 

 

接下来将执行prepareSynchronization方法,主要是对TransactionSynchronizationManager的一系列设置。然后将返回上层代码执行prepareTransactionInfo方法

TransactionAspectSupport.java

    protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
            TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {

        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
        if (txAttr != null) {
            // We need a transaction for this method
            if (logger.isTraceEnabled()) {
                logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            // The transaction manager will flag an error if an incompatible tx already exists
            txInfo.newTransactionStatus(status);
        }
        else {
            // The TransactionInfo.hasTransaction() method will return
            // false. We created it only to preserve the integrity of
            // the ThreadLocal stack maintained in this class.
            if (logger.isTraceEnabled())
                logger.trace("Don't need to create transaction for [" + joinpointIdentification +
                        "]: This method isn't transactional.");
        }

        // We always bind the TransactionInfo to the thread, even if we didn't create
        // a new transaction here. This guarantees that the TransactionInfo stack
        // will be managed correctly even if no transaction was created by this aspect.
        txInfo.bindToThread();
        return txInfo;
    }

这里其实比较简单主要生成一个TransactionInfo并绑定到当前线程的ThreadLocal

        private void bindToThread() {
            // Expose current TransactionStatus, preserving any existing TransactionStatus
            // for restoration after this transaction is complete.
            this.oldTransactionInfo = transactionInfoHolder.get();
            transactionInfoHolder.set(this);
        }

然后再返回到上层代码,接着就是执行相应的逻辑代码了

retVal = invocation.proceed();

 

执行过程的finally代码块将执行cleanupTransactionInfo(txInfo);

    protected void cleanupTransactionInfo(TransactionInfo txInfo) {
        if (txInfo != null) {
       //这里就是将txInfo进行重置工作,让它恢复到前一个状态。
            txInfo.restoreThreadLocalStatus();
        }
    }

 

 

然后就是提交操作(commitTransactionAfterReturning)或者是回滚操作(completeTransactionAfterThrowing)了。这里就拿提交操作来为例来说明,回滚操作类似:

    protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
        if (txInfo != null && txInfo.hasTransaction()) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
    }

实际就是执行的processCommit方法

    private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;
            try {
                prepareForCommit(status);
                triggerBeforeCommit(status);
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                boolean globalRollbackOnly = false;
                if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                    globalRollbackOnly = status.isGlobalRollbackOnly();
                }
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Releasing transaction savepoint");
                    }
                    status.releaseHeldSavepoint();
                }
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction commit");
                    }
                    doCommit(status);
                }
                // Throw UnexpectedRollbackException if we have a global rollback-only
                // marker but still didn't get a corresponding exception from commit.
                if (globalRollbackOnly) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            }
            catch (UnexpectedRollbackException ex) {
                // can only be caused by doCommit
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            }
            catch (TransactionException ex) {
                // can only be caused by doCommit
                if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                }
                else {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            }
            catch (RuntimeException ex) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, ex);
                throw ex;
            }
            catch (Error err) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, err);
                throw err;
            }

            // Trigger afterCommit callbacks, with an exception thrown there
            // propagated to callers but the transaction still considered as committed.
            try {
                triggerAfterCommit(status);
            }
            finally {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }

        }
        finally {
            cleanupAfterCompletion(status);
        }
    }

首先将执行一些提交前的准备工作,这里将进行是否有savepoint判断status.hasSavepoint(),如果有的话将进行释放savePoint,即getConnectionHolderForSavepoint().getConnection().releaseSavepoint((Savepoint) savepoint);

 

接着就判断是否是新的transaction:status.isNewTransaction(),如果是的话将执行 doCommit(status);

    @Override
    protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Committing JDBC transaction on Connection [" + con + "]");
        }
        try {
       //其实也就是调用了Connection的commit()方法。
            con.commit();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }
    }

 

 

最后无论成功与否都将调用finally块中的cleanupAfterCompletion(status)

    private void cleanupAfterCompletion(DefaultTransactionStatus status) {
        status.setCompleted();
        if (status.isNewSynchronization()) {
       ////TransactionSynchronizationManager清理工作
            TransactionSynchronizationManager.clear();
        }
        if (status.isNewTransaction()) {
       //这个比较重要(重点分析)
            doCleanupAfterCompletion(status.getTransaction());
        }
        if (status.getSuspendedResources() != null) {
            if (status.isDebug()) {
                logger.debug("Resuming suspended transaction after completion of inner transaction");
            }
            resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
        }
    }

 

首先对TransactionSynchronizationManager进行一系列清理工作,然后就将执行doCleanupAfterCompletion方法:

    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

        // Remove the connection holder from the thread, if exposed.
        if (txObject.isNewConnectionHolder()) {
        ////从TransactionSynchronizationManager中解绑相应的connectionHolder
            TransactionSynchronizationManager.unbindResource(this.dataSource);
        }

        // Reset connection.
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
            if (txObject.isMustRestoreAutoCommit()) {
         //对获取到的Connection进行一些还原
                con.setAutoCommit(true);
            }
            DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
        }
        catch (Throwable ex) {
            logger.debug("Could not reset JDBC Connection after transaction", ex);
        }

        if (txObject.isNewConnectionHolder()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
            }
            ////如果是newConnection将这个链接关闭,如果是连接池将还给连接池
            DataSourceUtils.releaseConnection(con, this.dataSource);
        }
     //这里将这只transactionActive为false
        txObject.getConnectionHolder().clear();
    }

其实就是将TransactionSynchronizationManager中持有的connectionHolder释放,并且还原当前Connection 的状态,然后将对当前的transaction进行清理包括设置transactionActive为false等。

至此整个spring的事务过程也就结束了。

 

原文链接:https://www.cnblogs.com/chihirotan/p/6739748.html