반응형

스프링을 사용하다보면 @Transactional을 굉장히 많이 사용한다.

 

에러가 발생하면 transaction을 rollback 시켜주기 때문에 사용한다라고는 알고 있지만, 어떠한 방식으로 동작하는지는 제대로 알아보지 않은 것 같다.

 

우선 블로그들과 spring framework의 github를 참고했다.

 

@Transactional을 사용하면 interceptor가 중간에 가져와서 invokeWithinTransaction가 실행되도록 한다.

 

안에서 작성된 코드는 이정도 되는 것 같다.

 

https://github.com/spring-projects/spring-framework/blob/main/spring-tx/src/main/java/org/springframework/transaction/annotation/Transactional.java

protected @Nullable Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
			final InvocationCallback invocation) throws Throwable {

		// If the transaction attribute is null, the method is non-transactional.
		TransactionAttributeSource tas = getTransactionAttributeSource();
		final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
		final TransactionManager tm = determineTransactionManager(txAttr, targetClass);

		if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager rtm) {
			boolean isSuspendingFunction = KotlinDetector.isSuspendingFunction(method);
			boolean hasSuspendingFlowReturnType = isSuspendingFunction &&
					COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName());

			ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
				Class<?> reactiveType =
						(isSuspendingFunction ? (hasSuspendingFlowReturnType ? Flux.class : Mono.class) : method.getReturnType());
				ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(reactiveType);
				if (adapter == null) {
					throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type [" +
							method.getReturnType() + "] with specified transaction manager: " + tm);
				}
				return new ReactiveTransactionSupport(adapter);
			});

			return txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, rtm);
		}

		PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
		final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

		if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager cpptm)) {
			// Standard transaction demarcation with getTransaction and commit/rollback calls.
			TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

			Object retVal;
			try {
				// This is an around advice: Invoke the next interceptor in the chain.
				// This will normally result in a target object being invoked.
				retVal = invocation.proceedWithInvocation();
			}
			catch (Throwable ex) {
				// target invocation exception
				completeTransactionAfterThrowing(txInfo, ex);
				throw ex;
			}
			finally {
				cleanupTransactionInfo(txInfo);
			}

			if (retVal != null && txAttr != null) {
				TransactionStatus status = txInfo.getTransactionStatus();
				if (status != null) {
					if (retVal instanceof Future<?> future && future.isDone()) {
						try {
							future.get();
						}
						catch (ExecutionException ex) {
							Throwable cause = ex.getCause();
							Assert.state(cause != null, "Cause must not be null");
							if (txAttr.rollbackOn(cause)) {
								status.setRollbackOnly();
							}
						}
						catch (InterruptedException ex) {
							Thread.currentThread().interrupt();
						}
					}
					else if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
						// Set rollback-only in case of Vavr failure matching our rollback rules...
						retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
					}
				}
			}

			commitTransactionAfterReturning(txInfo);
			return retVal;
		}

		else {
			Object result;
			final ThrowableHolder throwableHolder = new ThrowableHolder();

			// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
			try {
				result = cpptm.execute(txAttr, status -> {
					TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
					try {
						Object retVal = invocation.proceedWithInvocation();
						if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
							// Set rollback-only in case of Vavr failure matching our rollback rules...
							retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
						}
						return retVal;
					}
					catch (Throwable ex) {
						if (txAttr.rollbackOn(ex)) {
							// A RuntimeException: will lead to a rollback.
							if (ex instanceof RuntimeException runtimeException) {
								throw runtimeException;
							}
							else {
								throw new ThrowableHolderException(ex);
							}
						}
						else {
							// A normal return value: will lead to a commit.
							throwableHolder.throwable = ex;
							return null;
						}
					}
					finally {
						cleanupTransactionInfo(txInfo);
					}
				});
			}
			catch (ThrowableHolderException ex) {
				throw ex.getCause();
			}
			catch (TransactionSystemException ex2) {
				if (throwableHolder.throwable != null) {
					logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
					ex2.initApplicationException(throwableHolder.throwable);
				}
				throw ex2;
			}
			catch (Throwable ex2) {
				if (throwableHolder.throwable != null) {
					logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
				}
				throw ex2;
			}

			// Check result state: It might indicate a Throwable to rethrow.
			if (throwableHolder.throwable != null) {
				throw throwableHolder.throwable;
			}
			return result;
		}
	}

 

우선 트랜잭션의 속성과 매니저를 가져온다.

TransactionAttributeSource tas = getTransactionAttributeSource();
TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
TransactionManager tm = determineTransactionManager(txAttr, targetClass);

 

특별한 설정이 없다면, 트랜잭션을 처리한다.

PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
...
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager cpptm)) {

}

 

아래의 메서드로 트랜잭션이 진행된다.

 

  • 트랜잭션 시작 (createTransactionIfNecessary)
  • 비즈니스 로직 실행 (invocation.proceedWithInvocation())
  • 예외 발생 시 롤백 (completeTransactionAfterThrowing)
  • 정상 종료 시 커밋 (commitTransactionAfterReturning)

 

아래의 메서드로 에러 발생시 롤백이 예약된다.

status.setRollbackOnly();

 

 

 

만약, 해당 트랜잭션이 중첩이 되어 있다면

호출한 트랜잭션으로 전파된다.

if (throwableHolder.throwable != null) {
    throw throwableHolder.throwable;
}
return result;

 

간단하게 정리하자면

 

  • Spring이 트랜잭션 어노테이션이 붙은 메서드를 감지
  • 그 메서드를 프록시(Proxy) 객체로 감쌉니다
  • 실제 메서드를 호출할 때, 프록시가 트랜잭션 관련 로직을 앞뒤로 끼워 넣습니다

이런식으로 트랜잭션이 처리된다.

 

반응형

면접에서 나왔던 질문이지만, 내가 대답하지 못했던 부분에 대해서 알아보려고 한다.

 

우선 트랜잭션이 무엇인지, 왜 필요한지는 이미 안다고 생각한다.

 

그리고 당연히 트랜잭션의 레벨이 높아질수록, 동시성이 떨어지기 때문에 성능이 떨어진다.

하지만 그만큼 고립시켜서 동작하기 때문에 일관성은 높아진다.

 

우선 크게

Read Uncommitted, Read Committed, Repeatable Read, Serializable

이렇게 4가지로 나뉘며, 오른쪽으로 갈수록 레벨이 높아진다.

 

  • Read Uncommitted

트랜잭션이 처리중이거나, 아직 commit되지 않은 데이터를 다른 트랜잭션이 읽는 것을 허용한다.

 

 

이렇게 그냥 다른 트랜잭션에서 처리를 하고 있더라도, 데이터를 그냥 조회해서 값을 가져온다.

하지만 조회 후, 데이터가 rollback 되어 버리면 부정확한 데이터를 가져오게 된다.

 

이런 Dirty Read가 발생할 수 있기 때문에, 정확도가 너무 낮아 표준에서 인정하지 않는 격리 수준이라고 한다.

 

  • Read Committed

커밋된 데이터만 조회할 수 있도록 한다.

그렇기에 다른 트랜잭션이 접근 할 수 없어 대기하게 된다.

그렇기에 commit 되기 전의 데이터를 읽는 Dirty Read는 발생하지 않게 된다.

 

하지만, 하나의 트랜잭션에서 다른 트랜잭션의 commit에 따라 데이터의 조회 결과가 달라지는 Non-Repeatable Read 문제가 생기게 된다.

그렇지만 자주 발생하는 문제는 아니며, 대부분의 기본 설정이 이 Read Commited라고 한다.

 

  • Repeatable Read

보통의 RDBMS는 변경 전의 데이터를 Undo 공간에 백업해둔다.

그렇기에 트랜잭션의 번호를 확인해, 자신보다 더 늦은 트랜잭션의 번호가 존재한다면 이 Undo 공간에서 데이터를 조회하게 된다.

그렇기 때문에, 다른 트랜잭션에 의해 변경되더라도 동일한 데이터를 조회할 수 있게 된다.

 

하지만 조회 트랜잭션동안 새로운 데이터가 추가되는 것을 감지하지 못하는 Phantom Read가 발생할 수도 있다고 한다.

그래도 이 Phantom Read는 트랜잭션의 번호를 확인해 무시하는 방법으로 해결 할 수 있으며, 현재의 RDBMS에서는 특정 상황이 아니라면 대부분 발생하지 않는 현상이기에 크게 신경쓰지는 않아도 된다고 한다.

 

  • Serializable

마지막은 가장 강력한 Serializable 레벨이다.

그냥 모든 트랜잭션을 순차적으로 실행시키기에 위에 소개했던 모든 정합성 관련 문제들이 발생하지 않는다.

하지만 동시성이 없어지기에 성능이 매우 떨어진다.

극단적으로 안정성이 필요한 경우가 아니라면 잘 사용하지 않는다고 한다.

반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리

Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

이제 debezium으로 발행한 이벤트를 소비 해보도록 하자.

 

오늘도 공식문서를 참고하여 글을 작성한다.

https://debezium.io/documentation/reference/stable/connectors/mongodb.html

 

Debezium connector for MongoDB :: Debezium Documentation

A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value

debezium.io

 

우선 해당 컬렉션에서 변경사항이 생기면

 

이렇게 kafka에 이벤트가 전송되어야 한다.

 

그리고 앞으로 서버에서 직접 kafka로 publish 하는 게 아니기 때문에 잠깐 publish를 주석처리해 두자.

 

이렇게 일단 서버에서 직접 발행이 되지 못하도록 설정해두었다.

 

그리고 일단 listener를 통해 

    @KafkaListener(id = "\${kafka.consumer.payment-consumer-group-id}",
        topics = ["\${kafka.topic.payment-request}"])
    fun receive(
        @Payload value: String,
        @Header(KafkaHeaders.RECEIVED_KEY) keys: String,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partitions: Int,
        @Header(KafkaHeaders.OFFSET) offsets: Long
    ) {

        logger.info("Received $value from $partitions partitions offsets $offsets")
    }

 

 

이렇게하고 데이터베이스를 변경해보았더니, 다음과 같은 문자열이 출력되었다.

 

이제 이거를 json으로 만들어보자.

 

ObjectMapper를 사용해서 json으로 만들어주었다.

https://jsonformatter.org/#google_vignette

 

Best JSON Formatter and JSON Validator: Online JSON Formatter

Online JSON Formatter / Beautifier and JSON Validator will format JSON data, and helps to validate, convert JSON to XML, JSON to CSV. Save and Share JSON

jsonformatter.org

여기에 json을 붙여넣으면 알아서 이쁘게 만들어준다.

 

여기서 payload를 잘봐야 한다.

우선 op가 연산의 종류를 말해준다.

C가 insert, D가 Delete, U가 Update이다.

우리는 여기서 생성되었음을 감지하는 C 일 때만 함수를 실행해야 하기에, 해당 타입일 때만 동작하게 해준다.

 

여기서 우선 필요한 값들을 추출하고, 기존에 호출하던 함수로 연결을 해주었다.

 

@KafkaListener(id = "\${kafka.consumer.payment-consumer-group-id}",
        topics = ["\${kafka.topic.payment-request}"])
    fun receive(
        @Payload value: String,
        @Header(KafkaHeaders.RECEIVED_KEY) keys: String,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partitions: Int,
        @Header(KafkaHeaders.OFFSET) offsets: Long
    ) {

        val cdcJson = objectMapper.readTree(value)

        if(cdcJson["payload"]["op"].asText() == "c"){

            val paymentRequestJson = objectMapper.readTree(cdcJson["payload"]["after"].asText())

            val paymentRequestDto = PaymentRequestDto(
                id = paymentRequestJson["_id"]["\$oid"].asText(),
                customerId = paymentRequestJson["payload"]["customerId"]["\$oid"].asText(),
                price = paymentRequestJson["payload"]["price"]["\$numberLong"].asLong(),
                createdAt= LocalDateTime.ofEpochSecond(paymentRequestJson["createdAt"]["\$date"].asLong() / 1000, 0, ZoneOffset.UTC),
                paymentOrderStatus = PaymentOrderStatus.valueOf(paymentRequestJson["payload"]["paymentOrderStatus"].asText()),
            )

            logger.info("paymentRequestDto $paymentRequestDto")

            if(paymentRequestDto.paymentOrderStatus == PaymentOrderStatus.PENDING){
                logger.info("주문 {}의 결제가 진행 중입니다", paymentRequestDto.id)
                paymentRequestMessageListener.completePayment(paymentRequestDto)
            }else{
                logger.info("주문 {}의 결제가 취소 중입니다", paymentRequestDto.id)
                paymentRequestMessageListener.cancelPayment(paymentRequestDto)
            }.subscribe()
        }
    }

 

json에서 값을 가져와서 그대로 함수를 호출해주었다.

 

이렇게 바꿔준 후 Order 서버에서 새로운 주문을 요청해보았더니

 

Order 서버의 Payment_outbox를 감지해서 Payment 서버에서 이러한 이벤트가 다시 Order 서버로 발행되었다.

 

이렇게 데이터베이스에서의 변경을 감지하고 이벤트를 전송하는 Debezium을 사용해보았다.

 

이렇게 만들기는 했지만, 사실 Json을 사용하는 것보다 Avro model을 사용하는 것이 더 성능이 좋다고 한다.(kakao에서 저장공간의 최적화를 위해 Avro를 사용한다고 한다.)

 

Avro로 시도를 해보았지만, 너무 어려워서 실패했다.

그리고 전체 서비스를 Debezium outbox로 교체한 것은 아니었다.

 

이렇게 MSA에서 필수적으로 사용하는 패턴들만 사용을 해보고, 후에 Toge-do 앱 리펙토링 할 때 반영해보려고 한다.

+ Recent posts