반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지

Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

기존의 Saga 패턴에서 더 강화된 내용이라고 생각하면 될 것이다.

 

Saga 패턴은 긴 트랜잭션을 짧은 트랜잭션으로 나누고 process와 rollback을 사용하여 하나씩 나아가는 구조였다.

 

여기서 이벤트를 전송하고, 변경된 내용을 데이터베이스에 저장하게 되는데 만약 이벤트를 전송하고 데이터베이스에서 에러가 발생해 이벤트만 전송되게 된다면 문제가 발생할 수 있다.

 

이런 문제를 해결하기 위해 사용하는 패턴이다.

방법은 먼저 데이터베이스에 변경사항들을 저장하고, 스케줄러를 사용해 한 번에 이벤트를 전송하는 것이다.

이 때 변경사항들은 기존의 데이터베이스가 아닌, 이벤트를 위한 별도의 저장공간을 만들게 된다.

이곳에 보낼 데이터를 미리 저장해두고 나중에 보내기 때문에 보낼 편지함(Outbox)패턴이라고 불리게 된다.

그리고 모든 사항이 완료된 Outbox 데이터들은 데이터베이스의 최적화를 위해 스케줄러를 사용해서 지속적으로 삭제해준다.

 

이러한 방법을 MSA에서 Outbox 패턴이라고 한다.

 

이렇게 별도의 데이터베이스에 저장해두고 한 번에 보내게되며, 실제 서비스에서는 1~2초의 간격으로 스케줄러를 실행한다고 한다.

 

우선 아래에는 직접 작성한 Outbox 패턴으로 설명해보겠다.

 

@Component
class PaymentOutboxScheduler(
    private val paymentOutboxHelper: PaymentOutboxHelper,
    private val paymentRequestMessagePublisher: PaymentRequestMessagePublisher
): OutboxScheduler {

    private val logger = LoggerFactory.getLogger(PaymentOutboxScheduler::class.java)

    @Transactional
    @Scheduled(fixedDelay = 10000, initialDelay = 10000)
    override fun processOutboxMessages() {
        logger.info("결제를 요청하는 스케줄러가 실행되었습니다.")
        paymentOutboxHelper.getPaymentOutboxMessageByOutboxStatusAndOrderStatus(
            OutboxStatus.STARTED,
            listOf(OrderStatus.PENDING, OrderStatus.CANCELLING)
        ).publishOn(Schedulers.boundedElastic()).map{
            paymentOutboxMessage: PaymentOutboxMessage ->
            if(paymentOutboxMessage.payload.orderStatus == OrderStatus.CANCELLING) {

                paymentOutboxMessage.payload.paymentOrderStatus = PaymentOrderStatus.CANCELLING
            }
            paymentRequestMessagePublisher.publish(
                paymentOutboxMessage = paymentOutboxMessage,
                callback = ::updateOutboxStatus
            ).subscribe()
        }.subscribe()
    }

    private fun updateOutboxStatus(paymentOutboxMessage: PaymentOutboxMessage, outboxStatus: OutboxStatus): Mono<Void> {
        paymentOutboxMessage.outboxStatus = outboxStatus
        return paymentOutboxHelper.save(paymentOutboxMessage).then()
    }
}

 

주문 서버에서 결제 서버로 결제를 요청하는 과정이다.

우선 주문이 발생하면 Outbox의 상태가 Start인 값들만 조회한다.

 

처음에 Outbox에 Start로 저장을 하기 때문에 한번도 전송된 적이 없는 데이터를 불러오는 것이다.

 

그렇게 조회된 모든 데이터를 모두 publisher로 전송을 하며, 전송을 하면 callback을 사용해 Outbox의 Status를 Complete로 변경해준다.

 

publisher의 내용이다.

override fun publish(
        paymentOutboxMessage: PaymentOutboxMessage,
        callback: (PaymentOutboxMessage, OutboxStatus) -> Mono<Void>
    ): Mono<Void> {
        return mono{

            val paymentEventPayload = paymentOutboxMessage.payload

            logger.info("{} 주문에 대한 이벤트 전송을 준비 중입니다.", paymentEventPayload.orderId.toString())

            val paymentRequestAvroModel = paymentEventPayloadToPaymentRequestAvroModel(paymentEventPayload)

            reactiveKafkaProducer.send(
                paymentRequestTopic,
                paymentEventPayload.orderId.toString(),
                paymentRequestAvroModel
            ).publishOn(Schedulers.boundedElastic()).map{
                callback(paymentOutboxMessage, OutboxStatus.COMPLETED).subscribe()
            }.doOnError{
                callback(paymentOutboxMessage, OutboxStatus.FAILED).subscribe()
            }.subscribe()

            logger.info("{}의 주문이 메시지 큐로 결제 요청을 위해 전송되었습니다", paymentEventPayload.orderId.toString())

        }.then()
    }

 

이렇게 해당 데이터를 model로 변환하여 kafka로 전송을 하고, 전송 상태의 여부에 따라 callback을 사용하여 outbox의 상태를 변환한다.

 

결제 서버의 내용은 작성하지 않겠지만, 결제 서버에서도 수신 받은 내용에 따라 내용을 처리하고 kafka로 전송할 데이터를 outbox에 저장해주면 된다.

 

이제 여기서 문제가 생기게 된다.

 

스케줄러를 사용하기 때문에 특정 시간에만 동기화가 이루어지게 되며, 해당 스케줄러가 동작하는 시간에만 CPU의 사용량이 늘어나게 된다.

 

이러한 방법을 해결하기 위해 마지막으로 CDC 패턴을 사용한다고 한다.

CDC와 관련된 내용은 CQRS 다음에 작성해보도록 하겠다.

반응형

팀원이 작성한 코드에서 성능 튜닝이 필요한 코드를 찾았다.

override fun patchAdmin(routineId: Int, newAdminId: Int, authentication: Authentication): ResponseEntity<CoBoResponseDto<CoBoResponseStatus>> {
        val user = userRepository.findById(authentication.name.toInt())
            .orElseThrow{throw NoSuchElementException("일치하는 사용자가 없습니다.")}

        val newAdmin = userRepository.findById(newAdminId)
            .orElseThrow{throw NoSuchElementException("일치하는 사용자가 없습니다.")}

        val routine = routineRepository.findById(routineId)
            .orElseThrow{throw NoSuchElementException("일치하는 루틴이 없습니다.")}

        if (routine.admin != user)
            throw IllegalAccessException("수정 권한이 없습니다.")

        if (!participationRepository.existsByUserAndRoutine(newAdmin, routine))
            throw NoSuchElementException("참여 정보가 없는 사용자입니다.")

        routine.admin = newAdmin
        routineRepository.save(routine)

        return CoBoResponse<CoBoResponseStatus>(CoBoResponseStatus.SUCCESS).getResponseEntity()
    }

 

해당 코드이다.

 

무려 데이터베이스에 5번 접근하게 된다.

마지막에 save하는 부분은 데이터베이스에서 데이터를 가져온 후 호출해야 하기 때문에 마지막에 실행해야 하지만, 위의 4개의 접근은 의존성이 존재하지 않기 때문에 4개를 각각 다른 쓰레드에서 비동기적으로 실행이 가능하다.

 

if (!participationRepository.existsByUserAndRoutine(newAdmin, routine))
            throw NoSuchElementException("참여 정보가 없는 사용자입니다.")

 

현재 이 코드는 의존성이 존재하지만, JPA가 아닌 QueryDsl을 사용하여 리펙토링해서 의존성을 없애도록 하였다.

 

participationRepository.existsByUserIdAndRoutineIdByQueryDsl(newAdminId, routineId)

 

이렇게 Entity가 아닌 Int 타입의 Id로 검색하도록 코드를 변경했다.

override fun existsByUserIdAndRoutineIdByQueryDsl(userId: Int, routineId: Int): Boolean {
        return jpaQueryFactory
            .select(participation)
            .from(participation)
            .leftJoin(participation.user)
            .where(
                participation.routine.id.eq(routineId),
                participation.user.kakaoId.eq(userId)
            ).fetchFirst() != null
    }

 

이 부분은 QueryDsl의 코드이다.

 

이렇게 저 코드에서도 비동기적으로 실행 할 수 있도록 의존성을 제거하였다.

 

이제 application 단계에서 코드들을 비동기적으로 처리하여 속도를 높여보자.

override fun patchAdmin(routineId: Int, newAdminId: Int, authentication: Authentication): ResponseEntity<CoBoResponseDto<CoBoResponseStatus>> {

        val userCompletableFuture = CompletableFuture.supplyAsync{
            userRepository.findById(authentication.name.toInt())
                .orElseThrow{throw NoSuchElementException("일치하는 사용자가 없습니다.")}
        }

        val newAdminFuture = CompletableFuture.supplyAsync {
            userRepository.findById(newAdminId)
                .orElseThrow{throw NoSuchElementException("일치하는 사용자가 없습니다.")}
        }

        val routineFuture = CompletableFuture.supplyAsync{
            routineRepository.findById(routineId)
                .orElseThrow{throw NoSuchElementException("일치하는 루틴이 없습니다.")}
        }

        val isParticipationFuture = CompletableFuture.supplyAsync{
            participationRepository.existsByUserIdAndRoutineIdByQueryDsl(newAdminId, routineId)
        }

        return CompletableFuture.allOf(userCompletableFuture, newAdminFuture, routineFuture, isParticipationFuture)
            .thenApplyAsync {

                val user = userCompletableFuture.get()
                val newAdmin = newAdminFuture.get()
                val routine = routineFuture.get()
                val isParticipation = isParticipationFuture.get()

                if (routine.admin != user)
                    throw IllegalAccessException("수정 권한이 없습니다.")

                if (!isParticipation)
                    throw NoSuchElementException("참여 정보가 없는 사용자입니다.")

                routine.admin = newAdmin
                routineRepository.save(routine)

                CoBoResponse<CoBoResponseStatus>(CoBoResponseStatus.SUCCESS).getResponseEntity()
            }.get()
    }

 

이렇게 코드를 변경하였다.

 

위의 4개를 CompletableFuture로 담아 비동기적으로 실행하였으며, 4개의 쓰레드가 모두 완료되었을 때를 allOf로 가져와 한 곳에서 모아 결과를 확인 한 후 다시 routineRepository에 저장하도록 하였다.

 

위의 4개의 접근은 한번에 실행하기 때문에 사실상 저 코드는 2개의 데이터베이스 접근의 시간과 비슷하게 튜닝이 되었을 것이다.

 

얼마나 속도가 좋아졌는지 궁금하여 로그로 시간을 찍어보았다.

 

아래 사진은 기존 코드의 실행시간이다.

 

아래 사진은 튜닝한 코드의 실행 시간이다.

 

데이터베이스의 접근 4개를 동시에 처리해서 그런지, 2배 이상의 성능 향상이 일어난 것을 볼 수 있다.

 

최근에 리액티브 프로그래밍에 재미를 느껴 따라해보았는데, 비동기가 왜 필요한지를 다시 한 번 느낄 수 있었던 것 같다.

'크무톡톡 프로젝트' 카테고리의 다른 글

네이버 Oauth 로그인, SpringBoot  (2) 2024.07.23
Nginx로 Swagger Proxy_pass  (1) 2024.07.22
Springboot와 DialogFlow 연동 - API  (0) 2024.01.17
SMTP 서버 구축  (0) 2024.01.04
EC2에 Java 17 설치  (1) 2023.12.29

+ Recent posts