Transactions in Spring

by 봉즙 2023. 10. 12.

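Programmatic transactions

Transaction handling is written directly in the source code.

Because the begin/commit/rollback logic has to be written in every method that needs a transaction, this approach is flexible but hard to maintain.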
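As a rough illustration of the programmatic style, the sketch below uses a hypothetical in-memory `SimpleTx` class (not a Spring API) to show how every method has to carry its own begin/commit/rollback calls. In real Spring code this role would typically be played by `TransactionTemplate` or a `PlatformTransactionManager`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a transactional resource; not a Spring class.
class SimpleTx {
    private final List<String> committed = new ArrayList<>();
    private List<String> pending;

    void begin() { pending = new ArrayList<>(); }
    void write(String row) { pending.add(row); }
    void commit() { committed.addAll(pending); pending = null; }
    void rollback() { pending = null; }
    List<String> committedRows() { return committed; }
}

class ProgrammaticTxDemo {
    // Every method that needs a transaction repeats this boilerplate.
    static void saveOrder(SimpleTx tx, String order, boolean fail) {
        tx.begin();
        try {
            tx.write(order);
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
            tx.commit();
        } catch (RuntimeException ex) {
            tx.rollback();
            throw ex;
        }
    }

    public static void main(String[] args) {
        SimpleTx tx = new SimpleTx();
        saveOrder(tx, "order-1", false);
        try {
            saveOrder(tx, "order-2", true);
        } catch (IllegalStateException expected) {
            // The second write was discarded by rollback().
        }
        System.out.println(tx.committedRows()); // prints [order-1]
    }
}
```

The try/catch block is the part that declarative transactions move out of the business method and into an interceptor.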

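Declarative transactions

Transaction managers extend AbstractPlatformTransactionManager, so transactions are applied in one consistent way. This is the approach used most of the time.

Spring implements @Transactional through an AOP proxy, here CglibAopProxy. Let's walk through the flow.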

class CglibAopProxy implements AopProxy, Serializable {
    private static class DynamicAdvisedInterceptor implements MethodInterceptor, Serializable {
        @Override
        @Nullable
        public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
            Object oldProxy = null;
            boolean setProxyContext = false;
            Object target = null;
            TargetSource targetSource = this.advised.getTargetSource();
            try {
                if (this.advised.exposeProxy) {
                    // Make invocation available if necessary.
                    oldProxy = AopContext.setCurrentProxy(proxy);
                    setProxyContext = true;
                }
                // Get as late as possible to minimize the time we "own" the target, in case it comes from a pool...
                target = targetSource.getTarget();
                Class<?> targetClass = (target != null ? target.getClass() : null);
                List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
                Object retVal;
                // Check whether we only have one InvokerInterceptor: that is,
                // no real advice, but just reflective invocation of the target.
                if (chain.isEmpty()) {
                    // We can skip creating a MethodInvocation: just invoke the target directly.
                    // Note that the final invoker must be an InvokerInterceptor, so we know
                    // it does nothing but a reflective operation on the target, and no hot
                    // swapping or fancy proxying.
                    Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
                    retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
                } else {
                    // We need to create a method invocation...
                    retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
                }
                return processReturnType(proxy, target, method, retVal);
            } finally {
                if (target != null && !targetSource.isStatic()) {
                    targetSource.releaseTarget(target);
                }
                if (setProxyContext) {
                    // Restore old proxy.
                    AopContext.setCurrentProxy(oldProxy);
                }
            }
        }
    }

    private static class CglibMethodInvocation extends ReflectiveMethodInvocation {
        @Override
        @Nullable
        public Object proceed() throws Throwable {
            try {
                return super.proceed();
            } catch (RuntimeException ex) {
                throw ex;
            } catch (Exception ex) {
                if (ReflectionUtils.declaresException(getMethod(), ex.getClass()) ||
                        KotlinDetector.isKotlinType(getMethod().getDeclaringClass())) {
                    // Propagate original exception if declared on the target method
                    // (with callers expecting it). Always propagate it for Kotlin code
                    // since checked exceptions do not have to be explicitly declared there.
                    throw ex;
                } else {
                    // Checked exception thrown in the interceptor but not declared on the
                    // target method signature -> apply an UndeclaredThrowableException,
                    // aligned with standard JDK dynamic proxy behavior.
                    throw new UndeclaredThrowableException(ex);
                }
            }
        }
    }
}

When a method annotated with @Transactional is called, the call is intercepted by CglibAopProxy.DynamicAdvisedInterceptor.intercept(), as shown above.
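The idea of a proxy intercepting an annotated method can be sketched with the JDK's own `java.lang.reflect.Proxy`. Note that this is a JDK dynamic proxy, not CGLIB, chosen only because it ships with the standard library. The `@Tx` annotation, `OrderService`, and the logged "begin"/"commit" strings are hypothetical names for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker annotation standing in for @Transactional.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Tx {}

interface OrderService {
    @Tx
    void place(String order);

    String ping();
}

class ProxyInterceptDemo {
    static final List<String> LOG = new ArrayList<>();

    // Wraps the target so that calls to @Tx methods are surrounded by begin/commit.
    static OrderService wrap(OrderService target) {
        InvocationHandler handler = (proxy, method, args) -> {
            boolean transactional = method.isAnnotationPresent(Tx.class);
            if (transactional) LOG.add("begin");
            try {
                Object result = method.invoke(target, args);
                if (transactional) LOG.add("commit");
                return result;
            } catch (InvocationTargetException ex) {
                if (transactional) LOG.add("rollback");
                throw ex.getCause();
            }
        };
        return (OrderService) Proxy.newProxyInstance(
                OrderService.class.getClassLoader(),
                new Class<?>[] {OrderService.class},
                handler);
    }

    public static void main(String[] args) {
        OrderService service = wrap(new OrderService() {
            @Override public void place(String order) { LOG.add("place " + order); }
            @Override public String ping() { return "pong"; }
        });
        service.place("order-1");
        service.ping(); // not annotated, so nothing is logged
        System.out.println(LOG); // prints [begin, place order-1, commit]
    }
}
```

The caller only ever sees the proxy, which is why the business method itself contains no transaction code.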

public class ReflectiveMethodInvocation implements ProxyMethodInvocation, Cloneable {
    @Override
    @Nullable
    public Object proceed() throws Throwable {
        // We start with an index of -1 and increment early.
        if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
            return invokeJoinpoint();
        }

        Object interceptorOrInterceptionAdvice =
                this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
        if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher dm) {
            // Evaluate dynamic method matcher here: static part will already have
            // been evaluated and found to match.
            Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
            if (dm.matcher().matches(this.method, targetClass, this.arguments)) {
                return dm.interceptor().invoke(this);
            }
            else {
                // Dynamic matching failed.
                // Skip this interceptor and invoke the next in the chain.
                return proceed();
            }
        }
        else {
            // It's an interceptor, so we just invoke it: The pointcut will have
            // been evaluated statically before this object was constructed.
            return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
        }
    }
}

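proceed() first checks whether the interceptor index has reached the end of the list; if so, it invokes the join point, that is, the target method.
If the next entry carries a dynamic method matcher, the matcher is evaluated against the method and arguments. On a match, that interceptor is invoked; otherwise the entry is skipped and proceed() moves on to the next one in the chain.
If the entry is a plain interceptor, it is invoked directly to apply the advice.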
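The index-based chain walk can be sketched in plain Java. This is a simplified model under stated assumptions: `SimpleInterceptor` and `SimpleInvocation` are hypothetical names, and the dynamic-matcher branch is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified counterpart of a method interceptor.
interface SimpleInterceptor {
    Object invoke(SimpleInvocation invocation);
}

class SimpleInvocation {
    private final List<SimpleInterceptor> chain;
    private final Supplier<Object> target;
    // Starts at -1 and is incremented before each lookup.
    private int currentIndex = -1;

    SimpleInvocation(List<SimpleInterceptor> chain, Supplier<Object> target) {
        this.chain = chain;
        this.target = target;
    }

    Object proceed() {
        if (currentIndex == chain.size() - 1) {
            // End of the chain: call the target method itself.
            return target.get();
        }
        return chain.get(++currentIndex).invoke(this);
    }
}

class ChainDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleInterceptor logging = inv -> {
            log.add("log:before");
            Object result = inv.proceed();
            log.add("log:after");
            return result;
        };
        SimpleInterceptor tx = inv -> {
            log.add("tx:begin");
            Object result = inv.proceed();
            log.add("tx:commit");
            return result;
        };
        Object result = new SimpleInvocation(List.of(logging, tx), () -> {
            log.add("target");
            return "done";
        }).proceed();
        log.add("result=" + result);
        return log;
    }

    public static void main(String[] args) {
        // prints [log:before, tx:begin, target, tx:commit, log:after, result=done]
        System.out.println(run());
    }
}
```

Each interceptor calls proceed() on the same invocation object, so the calls nest: the first interceptor in the list is the outermost wrapper.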

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
    @Override
    @Nullable
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Work out the target class: may be {@code null}.
        // The TransactionAttributeSource should be passed the target class
        // as well as the method, which may be from an interface.
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

        // Adapt to TransactionAspectSupport's invokeWithinTransaction...
        return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
            @Override
            @Nullable
            public Object proceedWithInvocation() throws Throwable {
                return invocation.proceed();
            }
            @Override
            public Object getTarget() {
                return invocation.getThis();
            }
            @Override
            public Object[] getArguments() {
                return invocation.getArguments();
            }
        });
    }

}

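invokeWithinTransaction() coordinates the method invocation inside a transaction.

The actual method execution is wrapped in a CoroutinesInvocationCallback, whose proceedWithInvocation() simply continues the interceptor chain.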

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    /**
     * General delegate for around-advice-based subclasses, delegating to several other template
     * methods on this class. Able to handle {@link CallbackPreferringPlatformTransactionManager}
     * as well as regular {@link PlatformTransactionManager} implementations and
     * {@link ReactiveTransactionManager} implementations for reactive return types.
     * @param method the Method being invoked
     * @param targetClass the target class that we're invoking the method on
     * @param invocation the callback to use for proceeding with the target invocation
     * @return the return value of the method, if any
     * @throws Throwable propagated from the target invocation
     */
    @Nullable
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
            final InvocationCallback invocation) throws Throwable {

        // If the transaction attribute is null, the method is non-transactional.
        TransactionAttributeSource tas = getTransactionAttributeSource();
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        final TransactionManager tm = determineTransactionManager(txAttr);

        if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager rtm) {
            boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
            boolean hasSuspendingFlowReturnType = isSuspendingFunction &&
                    COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());
            if (isSuspendingFunction && !(invocation instanceof CoroutinesInvocationCallback)) {
                throw new IllegalStateException("Coroutines invocation not supported: " + method);
            }
            CoroutinesInvocationCallback corInv = (isSuspendingFunction ? (CoroutinesInvocationCallback) invocation : null);

            ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
                Class<?> reactiveType =
                        (isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType());
                ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(reactiveType);
                if (adapter == null) {
                    throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                            method.getReturnType());
                }
                return new ReactiveTransactionSupport(adapter);
            });

            InvocationCallback callback = invocation;
            if (corInv != null) {
                callback = () -> KotlinDelegate.invokeSuspendingFunction(method, corInv);
            }
            Object result = txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, rtm);
            if (corInv != null) {
                Publisher<?> pr = (Publisher<?>) result;
                return (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow(pr) :
                        KotlinDelegate.awaitSingleOrNull(pr, corInv.getContinuation()));
            }
            return result;
        }

        PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager cpptm)) {
            // Standard transaction demarcation with getTransaction and commit/rollback calls.
            TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

            Object retVal;
            try {
                // This is an around advice: Invoke the next interceptor in the chain.
                // This will normally result in a target object being invoked.
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // target invocation exception
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                cleanupTransactionInfo(txInfo);
            }

            if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                // Set rollback-only in case of Vavr failure matching our rollback rules...
                TransactionStatus status = txInfo.getTransactionStatus();
                if (status != null && txAttr != null) {
                    retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                }
            }

            commitTransactionAfterReturning(txInfo);
            return retVal;
        }

        else {
            Object result;
            final ThrowableHolder throwableHolder = new ThrowableHolder();

            // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
            try {
                result = cpptm.execute(txAttr, status -> {
                    TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
                    try {
                        Object retVal = invocation.proceedWithInvocation();
                        if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                            // Set rollback-only in case of Vavr failure matching our rollback rules...
                            retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                        }
                        return retVal;
                    }
                    catch (Throwable ex) {
                        if (txAttr.rollbackOn(ex)) {
                            // A RuntimeException: will lead to a rollback.
                            if (ex instanceof RuntimeException runtimeException) {
                                throw runtimeException;
                            }
                            else {
                                throw new ThrowableHolderException(ex);
                            }
                        }
                        else {
                            // A normal return value: will lead to a commit.
                            throwableHolder.throwable = ex;
                            return null;
                        }
                    }
                    finally {
                        cleanupTransactionInfo(txInfo);
                    }
                });
            }
            catch (ThrowableHolderException ex) {
                throw ex.getCause();
            }
            catch (TransactionSystemException ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                    ex2.initApplicationException(throwableHolder.throwable);
                }
                throw ex2;
            }
            catch (Throwable ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                }
                throw ex2;
            }

            // Check result state: It might indicate a Throwable to rethrow.
            if (throwableHolder.throwable != null) {
                throw throwableHolder.throwable;
            }
            return result;
        }
    }
}

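It looks up the TransactionAttribute and determines the TransactionManager.

If the manager is a ReactiveTransactionManager, the reactive path is taken, which also handles Kotlin coroutine (suspending) function calls.

With a regular PlatformTransactionManager, the standard path is taken: the transaction is obtained, the method is invoked, and commit or rollback is handled around it.

With a CallbackPreferringPlatformTransactionManager, a transaction callback is passed in and the method is invoked inside that callback.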
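The standard path (begin, proceed, then roll back or commit) can be sketched as follows. This is a minimal model, not Spring's code: the event strings and the `attempt` helper are hypothetical, and `rollbackOn` encodes the assumption that only unchecked exceptions and errors trigger a rollback, which matches Spring's default rollback rules.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class AroundAdviceDemo {
    static final List<String> EVENTS = new ArrayList<>();

    // Assumed default rule: roll back on unchecked exceptions and errors only.
    static boolean rollbackOn(Throwable ex) {
        return ex instanceof RuntimeException || ex instanceof Error;
    }

    static <T> T invokeWithinTransaction(Callable<T> invocation) throws Exception {
        EVENTS.add("begin");
        T result;
        try {
            // Around advice: continue with the rest of the chain / the target.
            result = invocation.call();
        } catch (Throwable ex) {
            // Complete the transaction according to the rollback rule, then rethrow.
            EVENTS.add(rollbackOn(ex) ? "rollback" : "commit");
            throw ex;
        }
        EVENTS.add("commit");
        return result;
    }

    // Hypothetical helper that records which exception escaped, if any.
    static void attempt(Callable<Object> invocation) {
        try {
            invokeWithinTransaction(invocation);
        } catch (Exception ex) {
            EVENTS.add("thrown:" + ex.getClass().getSimpleName());
        }
    }

    static List<String> demo() {
        EVENTS.clear();
        attempt(() -> "ok");
        attempt(() -> { throw new IllegalStateException("unchecked"); });
        attempt(() -> { throw new IOException("checked"); });
        return new ArrayList<>(EVENTS);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the checked IOException still propagates to the caller, but the transaction is committed rather than rolled back, which is a common surprise with the default rules.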

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    /**
     * Create a transaction if necessary based on the given TransactionAttribute.
     * <p>Allows callers to perform custom TransactionAttribute lookups through
     * the TransactionAttributeSource.
     * @param txAttr the TransactionAttribute (may be {@code null})
     * @param joinpointIdentification the fully qualified method name
     * (used for monitoring and logging purposes)
     * @return a TransactionInfo object, whether a transaction was created.
     * The {@code hasTransaction()} method on TransactionInfo can be used to
     * tell if there was a transaction created.
     * @see #getTransactionAttributeSource()
     */
    @SuppressWarnings("serial")
    protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
                                                           @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

        // If no name specified, apply method identification as transaction name.
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }

        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                status = tm.getTransaction(txAttr);
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }

}

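Using the PlatformTransactionManager and the TransactionAttribute, it obtains a transaction if one is needed and builds the TransactionInfo. If the attribute has no name, the method identification is used as the transaction name.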

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    /**
     * This implementation handles propagation behavior. Delegates to
     * {@code doGetTransaction}, {@code isExistingTransaction}
     * and {@code doBegin}.
     * @see #doGetTransaction
     * @see #isExistingTransaction
     * @see #doBegin
     */
    @Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException {

        // Use defaults if no transaction definition given.
        TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

        Object transaction = doGetTransaction();
        boolean debugEnabled = logger.isDebugEnabled();

        if (isExistingTransaction(transaction)) {
            // Existing transaction found -> check propagation behavior to find out how to behave.
            return handleExistingTransaction(def, transaction, debugEnabled);
        }

        // Check definition settings for new transaction.
        if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
            }
            try {
                return startTransaction(def, transaction, debugEnabled, suspendedResources);
            }
            catch (RuntimeException | Error ex) {
                resume(null, suspendedResources);
                throw ex;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + def);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
        }
    }

    private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
                                               boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        doBegin(transaction, definition);
        prepareSynchronization(status, definition);
        return status;
    }
}

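The transaction is defined by the given TransactionDefinition, or by the defaults if none was given.
doGetTransaction() is called to obtain the current transaction object. If a transaction already exists, the status is returned according to the propagation behavior. If none exists, MANDATORY throws an exception, REQUIRED, REQUIRES_NEW, and NESTED start a new transaction, and the remaining propagation values produce an "empty" transaction with no actual transaction behind it.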
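The branch taken when no transaction exists yet can be condensed into a small decision function. This is a sketch only: the `Propagation` enum and the returned strings are hypothetical, and the existing-transaction branch (handleExistingTransaction) is not modeled.

```java
class PropagationDemo {
    // Hypothetical enum mirroring the propagation names used in the text.
    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    // Models only the "no existing transaction" branch of getTransaction().
    static String decideWithoutExistingTransaction(Propagation propagation) {
        switch (propagation) {
            case MANDATORY:
                throw new IllegalStateException(
                        "No existing transaction found for propagation 'mandatory'");
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return "start new transaction";
            default:
                // SUPPORTS, NOT_SUPPORTED, NEVER
                return "run without an actual transaction";
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            try {
                System.out.println(p + " -> " + decideWithoutExistingTransaction(p));
            } catch (IllegalStateException ex) {
                System.out.println(p + " -> error: " + ex.getMessage());
            }
        }
    }
}
```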

public class JpaTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, BeanFactoryAware, InitializingBean {
    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        JpaTransactionObject txObject = (JpaTransactionObject) transaction;

        if (txObject.hasConnectionHolder() && !txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            throw new IllegalTransactionStateException(
                    "Pre-bound JDBC Connection found! JpaTransactionManager does not support " +
                    "running within DataSourceTransactionManager if told to manage the DataSource itself. " +
                    "It is recommended to use a single JpaTransactionManager for all transactions " +
                    "on a single DataSource, no matter whether JPA or JDBC access.");
        }

        try {
            if (!txObject.hasEntityManagerHolder() ||
                    txObject.getEntityManagerHolder().isSynchronizedWithTransaction()) {
                EntityManager newEm = createEntityManagerForTransaction();
                if (logger.isDebugEnabled()) {
                    logger.debug("Opened new EntityManager [" + newEm + "] for JPA transaction");
                }
                txObject.setEntityManagerHolder(new EntityManagerHolder(newEm), true);
            }

            EntityManager em = txObject.getEntityManagerHolder().getEntityManager();

            // Delegate to JpaDialect for actual transaction begin.
            int timeoutToUse = determineTimeout(definition);
            Object transactionData = getJpaDialect().beginTransaction(em,
                    new JpaTransactionDefinition(definition, timeoutToUse, txObject.isNewEntityManagerHolder()));
            txObject.setTransactionData(transactionData);
            txObject.setReadOnly(definition.isReadOnly());

            // Register transaction timeout.
            if (timeoutToUse != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getEntityManagerHolder().setTimeoutInSeconds(timeoutToUse);
            }

            // Register the JPA EntityManager's JDBC Connection for the DataSource, if set.
            if (getDataSource() != null) {
                ConnectionHandle conHandle = getJpaDialect().getJdbcConnection(em, definition.isReadOnly());
                if (conHandle != null) {
                    ConnectionHolder conHolder = new ConnectionHolder(conHandle);
                    if (timeoutToUse != TransactionDefinition.TIMEOUT_DEFAULT) {
                        conHolder.setTimeoutInSeconds(timeoutToUse);
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("Exposing JPA transaction as JDBC [" + conHandle + "]");
                    }
                    TransactionSynchronizationManager.bindResource(getDataSource(), conHolder);
                    txObject.setConnectionHolder(conHolder);
                }
                else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Not exposing JPA transaction [" + em + "] as JDBC transaction because " +
                                "JpaDialect [" + getJpaDialect() + "] does not support JDBC Connection retrieval");
                    }
                }
            }

            // Bind the entity manager holder to the thread.
            if (txObject.isNewEntityManagerHolder()) {
                TransactionSynchronizationManager.bindResource(
                        obtainEntityManagerFactory(), txObject.getEntityManagerHolder());
            }
            txObject.getEntityManagerHolder().setSynchronizedWithTransaction(true);
        }

        catch (TransactionException ex) {
            closeEntityManagerAfterFailedBegin(txObject);
            throw ex;
        }
        catch (Throwable ex) {
            closeEntityManagerAfterFailedBegin(txObject);
            throw new CannotCreateTransactionException("Could not open JPA EntityManager for transaction", ex);
        }
    }
}

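If a connection holder is already present, doBegin() checks that it is synchronized with the transaction and throws an IllegalTransactionStateException if it is not.
If there is no EntityManager holder, or the existing one is already synchronized with a transaction, a new EntityManager is created and set on the transaction object inside a new holder.
The actual transaction is started through the JpaDialect.
If a DataSource is set, the JDBC connection holder is bound for that DataSource.
The EntityManager holder is bound to the current thread and marked as synchronized with the transaction.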
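Binding a holder "to the current thread" can be pictured as a per-thread map keyed by the resource factory. The sketch below is a simplified, hypothetical model built on `ThreadLocal`; `ThreadBoundResources` and its methods are illustrative names, not Spring's TransactionSynchronizationManager API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified registry of per-thread resources.
class ThreadBoundResources {
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bind(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound for key " + key);
        }
    }

    static Object get(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbind(Object key) {
        return RESOURCES.get().remove(key);
    }
}

class ThreadBoundDemo {
    // Looks the key up from a different thread, which has its own empty map.
    static Object lookupFromOtherThread(Object key) {
        Object[] seen = new Object[1];
        Thread other = new Thread(() -> seen[0] = ThreadBoundResources.get(key));
        other.start();
        try {
            other.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        Object factoryKey = new Object();
        ThreadBoundResources.bind(factoryKey, "entity-manager-holder");
        System.out.println(ThreadBoundResources.get(factoryKey)); // prints entity-manager-holder
        System.out.println(lookupFromOtherThread(factoryKey));    // prints null
        ThreadBoundResources.unbind(factoryKey);
    }
}
```

Because the binding lives on the thread, code further down the same call stack can find the same holder without it being passed as a parameter, while other threads see nothing.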

 

 

https://github.com/hanbong5938/practice/tree/master/tx/src
