반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지

Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

우선 CQRS 패턴을 적용하기 위해 

https://youtu.be/BnS6343GTkY?si=kRkeJSen4kr-3tO9

 

해당 영상을 참고했다.

 

사실 CQRS가 Command Query Responsibility Segregation로 그냥 단순히 서비스 로직과 쿼리를 분리해서 무슨 이점이 있을까? 라는 생각을 가지고 있었다.

 

하지만 우아한 형제들에서 이런 사용 사례들을 보고 나니 사용하는 이유를 좀 알 수 있을 것 같았다.

 

그리고 내가 이해한 것이 맞다면 아래와 같은 이유로 사용할 것이다.

 

이런 MSA 환경에서 주문을 수행하기 위해서는 고객의 정보를 조회해야 한다.

 

이런 상황에서 동기적으로 동작하기 위해서는 Customer 서버에 Http 요청을 보내야 하지만 이 과정에서 큰 오버헤드가 발생하고, Http 응답이 오기까지 Order 서버에서 block된다는 문제가 발생한다.

그렇다고 Order 서버에서 Customer Database에 접근하면 분리의 원칙을 위반하고 데이터베이스를 조회하는 과정에서 병목이 발생할 가능성이 있다.

 

그렇기에

그렇기에 Customer 서버에서 데이터의 변경요청이 오면 이벤트를 발행해 다른 서버들에게 알려주고, 다른 서버들은 그 중 필수적으로 필요한 부분만 조회가 빠른 데이터베이스에 저장해 조회하면서 사용하는 것이다.

 

그렇게되면 고객 정보의 Create, Update, Delete는 Customer 서버에서 일어나고 고객정보의 Read는 Order 서버에서 일어나게 된다.

이런 것을 CQRS라고 말한다고 생각한다.

 

구현은 Saga, Outbox에 비해 별로 어렵지 않았다.

기존의 데이터는 MongoDB에 저장하고 있었고 Order 서버에서도 고객의 정보를 추가적으로 저장해야 하는데, 내가 사용할 수 있는 조회가 가장 빠른 데이터베이스인 Redis를 사용했다.

 

    override fun createCustomer(createCustomerCommand: CreateCustomerCommand): Mono<CreateCustomerResponse> {
        val customer = Customer(
            id = CustomerId(ObjectId.get()),
            username = createCustomerCommand.username,
            email = createCustomerCommand.email
        )

        val customerCreatedEvent = customerDomainService.validateAndCreateCustomer(customer)

        return customerMessagePublisher.publish(customerCreatedEvent)
            .then(
                customerRepository.save(customer)
            ).thenReturn(CreateCustomerResponse(customer.id.toString(), customer.username, customer.email))
    }

 

이렇게 고객의 생성 명령이 실행되면 데이터베이스에 저장하며, 이벤트를 발행한다.

물론 이 과정 내에서도 Transaction 처리와 Outbox 패턴을 적용해야 하지만, 그 부분은 생략하도록 하겠다.

 

@KafkaListener(id = "\${kafka.consumer.customer-consumer-group-id}",
        topics = ["\${kafka.topic.customer-create}"])
    override fun receive(
        @Payload values: List<CustomerCreateAvroModel>,
        @Header(KafkaHeaders.RECEIVED_KEY) keys: List<String>,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partitions: List<Int>,
        @Header(KafkaHeaders.OFFSET) offsets: List<Long>
    ) {
        values.forEach{
            customerCreateMessageListener.createCredit(
                Credit(
                    customerId = CustomerId(id = ObjectId(it.id)),
                    totalCreditAmount = Money.ZERO
                )
            ).subscribe()
        }
    }

 

이벤트를 수신하는 부분은 이렇게 고객의 id를 받아서 결제 정보를 만들게 된다.

 

여기서 이벤트는 고객의 아이디, 이름, 이메일 모두 발행이 되었지만, 수신하는 부분에서는 고객의 id만 사용하게 된다.

 

결제 과정에서는 고객의 이름, 이메일을 사용하지 않기 때문에 굳이 저장할 필요가 없다.

최소 데이터 보관 원칙에 의해 꼭 필요한 column들만 저장을 하는 것이 좋다.

 

    override fun createCredit(credit: Credit): Mono<Void> {
        return creditRepository.save(credit).then()
    }
    override fun save(credit: Credit): Mono<Credit>{
        return reactiveRedisTemplate.opsForValue().set(
            "${redisPrefix}${credit.customerId.id}",
            objectMapper.writeValueAsString(creditToCreditEntity(credit))
        ).thenReturn(credit)
    }

 

그리고는 조회가 빠른 Redis에 저장을 해두고 주문이 발생하면, Customer 서버에 데이터를 요청하지 않고 빠르게 주문을 수행할 수 있도록 한다.

반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지

Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

기존의 Saga 패턴에서 더 강화된 내용이라고 생각하면 될 것이다.

 

Saga 패턴은 긴 트랜잭션을 짧은 트랜잭션으로 나누고 process와 rollback을 사용하여 하나씩 나아가는 구조였다.

 

여기서 이벤트를 전송하고, 변경된 내용을 데이터베이스에 저장하게 되는데 만약 이벤트를 전송하고 데이터베이스에서 에러가 발생해 이벤트만 전송되게 된다면 문제가 발생할 수 있다.

 

이런 문제를 해결하기 위해 사용하는 패턴이다.

방법은 먼저 데이터베이스에 변경사항들을 저장하고, 스케줄러를 사용해 한 번에 이벤트를 전송하는 것이다.

이 때 변경사항들은 기존의 데이터베이스가 아닌, 이벤트를 위한 별도의 저장공간을 만들게 된다.

이곳에 보낼 데이터를 미리 저장해두고 나중에 보내기 때문에 보낼 편지함(Outbox)패턴이라고 불리게 된다.

그리고 모든 사항이 완료된 Outbox 데이터들은 데이터베이스의 최적화를 위해 스케줄러를 사용해서 지속적으로 삭제해준다.

 

이러한 방법을 MSA에서 Outbox 패턴이라고 한다.

 

이렇게 별도의 데이터베이스에 저장해두고 한 번에 보내게되며, 실제 서비스에서는 1~2초의 간격으로 스케줄러를 실행한다고 한다.

 

우선 아래에는 직접 작성한 Outbox 패턴으로 설명해보겠다.

 

@Component
class PaymentOutboxScheduler(
    private val paymentOutboxHelper: PaymentOutboxHelper,
    private val paymentRequestMessagePublisher: PaymentRequestMessagePublisher
): OutboxScheduler {

    private val logger = LoggerFactory.getLogger(PaymentOutboxScheduler::class.java)

    @Transactional
    @Scheduled(fixedDelay = 10000, initialDelay = 10000)
    override fun processOutboxMessages() {
        logger.info("결제를 요청하는 스케줄러가 실행되었습니다.")
        paymentOutboxHelper.getPaymentOutboxMessageByOutboxStatusAndOrderStatus(
            OutboxStatus.STARTED,
            listOf(OrderStatus.PENDING, OrderStatus.CANCELLING)
        ).publishOn(Schedulers.boundedElastic()).map{
            paymentOutboxMessage: PaymentOutboxMessage ->
            if(paymentOutboxMessage.payload.orderStatus == OrderStatus.CANCELLING) {

                paymentOutboxMessage.payload.paymentOrderStatus = PaymentOrderStatus.CANCELLING
            }
            paymentRequestMessagePublisher.publish(
                paymentOutboxMessage = paymentOutboxMessage,
                callback = ::updateOutboxStatus
            ).subscribe()
        }.subscribe()
    }

    private fun updateOutboxStatus(paymentOutboxMessage: PaymentOutboxMessage, outboxStatus: OutboxStatus): Mono<Void> {
        paymentOutboxMessage.outboxStatus = outboxStatus
        return paymentOutboxHelper.save(paymentOutboxMessage).then()
    }
}

 

주문 서버에서 결제 서버로 결제를 요청하는 과정이다.

우선 주문이 발생하면 Outbox의 상태가 Start인 값들만 조회한다.

 

처음에 Outbox에 Start로 저장을 하기 때문에 한번도 전송된 적이 없는 데이터를 불러오는 것이다.

 

그렇게 조회된 모든 데이터를 모두 publisher로 전송을 하며, 전송을 하면 callback을 사용해 Outbox의 Status를 Complete로 변경해준다.

 

publisher의 내용이다.

override fun publish(
        paymentOutboxMessage: PaymentOutboxMessage,
        callback: (PaymentOutboxMessage, OutboxStatus) -> Mono<Void>
    ): Mono<Void> {
        return mono{

            val paymentEventPayload = paymentOutboxMessage.payload

            logger.info("{} 주문에 대한 이벤트 전송을 준비 중입니다.", paymentEventPayload.orderId.toString())

            val paymentRequestAvroModel = paymentEventPayloadToPaymentRequestAvroModel(paymentEventPayload)

            reactiveKafkaProducer.send(
                paymentRequestTopic,
                paymentEventPayload.orderId.toString(),
                paymentRequestAvroModel
            ).publishOn(Schedulers.boundedElastic()).map{
                callback(paymentOutboxMessage, OutboxStatus.COMPLETED).subscribe()
            }.doOnError{
                callback(paymentOutboxMessage, OutboxStatus.FAILED).subscribe()
            }.subscribe()

            logger.info("{}의 주문이 메시지 큐로 결제 요청을 위해 전송되었습니다", paymentEventPayload.orderId.toString())

        }.then()
    }

 

이렇게 해당 데이터를 model로 변환하여 kafka로 전송을 하고, 전송 상태의 여부에 따라 callback을 사용하여 outbox의 상태를 변환한다.

 

결제 서버의 내용은 작성하지 않겠지만, 결제 서버에서도 수신 받은 내용에 따라 내용을 처리하고 kafka로 전송할 데이터를 outbox에 저장해주면 된다.

 

이제 여기서 문제가 생기게 된다.

 

스케줄러를 사용하기 때문에 특정 시간에만 동기화가 이루어지게 되며, 해당 스케줄러가 동작하는 시간에만 CPU의 사용량이 늘어나게 된다.

 

이러한 방법을 해결하기 위해 마지막으로 CDC 패턴을 사용한다고 한다.

CDC와 관련된 내용은 CQRS 다음에 작성해보도록 하겠다.

반응형

프로젝트에서 DeepL API를 사용해 PDF를 번역하는 기능이 추가되었다.

https://developers.deepl.com/docs

 

Introduction | DeepL API Documentation

Learn more about the DeepL API's capabilities and common use cases.

developers.deepl.com

 

우선 DeepL의 공식문서이다.

처음에는 늘 그렇듯 curl을 사용해 파일을 보내고, 응답을 받을 것이라고 생각했지만 자바쪽으로 지원해주는 라이브러리가 있었다.

 

 

자바 라이브러리의 주소는 다음과 같다.

https://github.com/DeepLcom/deepl-java

 

GitHub - DeepLcom/deepl-java: Official Java library for the DeepL language translation API.

Official Java library for the DeepL language translation API. - DeepLcom/deepl-java

github.com

 

요즘에는 자료가 많이 없는 기능들을 사용하다보니, 이렇게 깃허브에 직접 찾아들어가 사용방법을 찾아보는 일이 많아졌다.

 

implementation("com.deepl.api:deepl-java:1.9.0")

 

현재 기준으로 가장 최신 버전인 해당 라이브러리를 추가해준다.

 

처음에는 해당 문서처럼 Translator를 생성하려고 했지만, 해당 기능은 Deprecated 되었다고 한다.

(그러면 문서 좀 수정해주지...)

 

지금은 DeepLClient 클래스를 생성하고, 해당 생성자에 DeepL의 API 키를 넘겨주면 된다.

 

DeepLClient(deepLKey)

 

해당 클래스에서도 translateDocument라는 함수를 사용할 것이다.

 

해당 함수를 오버로딩하고 있는 함수들은 다음과 같다.

    public DocumentStatus translateDocument(File inputFile, File outputFile, @Nullable String sourceLang, String targetLang, @Nullable DocumentTranslationOptions options) throws DocumentTranslationException, IOException {
        try {
            if (outputFile.exists()) {
                throw new IOException("File already exists at output path");
            } else {
                InputStream inputStream = new FileInputStream(inputFile);

                DocumentStatus var8;
                try {
                    OutputStream outputStream = new FileOutputStream(outputFile);

                    try {
                        var8 = this.translateDocument(inputStream, inputFile.getName(), outputStream, sourceLang, targetLang, options);
                    } catch (Throwable var12) {
                        try {
                            outputStream.close();
                        } catch (Throwable var11) {
                            var12.addSuppressed(var11);
                        }

                        throw var12;
                    }

                    outputStream.close();
                } catch (Throwable var13) {
                    try {
                        inputStream.close();
                    } catch (Throwable var10) {
                        var13.addSuppressed(var10);
                    }

                    throw var13;
                }

                inputStream.close();
                return var8;
            }
        } catch (Exception exception) {
            outputFile.delete();
            throw exception;
        }
    }

    public DocumentStatus translateDocument(File inputFile, File outputFile, @Nullable String sourceLang, String targetLang) throws DocumentTranslationException, IOException {
        return this.translateDocument((File)inputFile, (File)outputFile, (String)sourceLang, targetLang, (DocumentTranslationOptions)null);
    }

    public DocumentStatus translateDocument(InputStream inputStream, String fileName, OutputStream outputStream, @Nullable String sourceLang, String targetLang, @Nullable DocumentTranslationOptions options) throws DocumentTranslationException {
        DocumentHandle handle = null;

        try {
            handle = this.translateDocumentUpload(inputStream, fileName, sourceLang, targetLang, options);
            DocumentStatus status = this.translateDocumentWaitUntilDone(handle);
            this.translateDocumentDownload(handle, outputStream);
            return status;
        } catch (Exception exception) {
            throw new DocumentTranslationException("Error occurred during document translation: " + exception.getMessage(), exception, handle);
        }
    }

    public DocumentStatus translateDocument(InputStream inputFile, String fileName, OutputStream outputFile, @Nullable String sourceLang, String targetLang) throws DocumentTranslationException {
        return this.translateDocument(inputFile, fileName, outputFile, sourceLang, targetLang, (DocumentTranslationOptions)null);
    }

 

File을 넘기는 함수가 아닌 inpuStream을 넘기는 함수를 사용할 것이며, 사용 가능한 언어의 종류는 DeepL의 공식문서에 나와있다.

 

여기서 sourceLang은 Null이 가능하다.

현재 문서에 대한 정보를 주지 않아도, 번역을 해보니 sourceLang을 지정해 줄 때와 똑같은 결과가 나왔었다.

 

해당 함수를 사용하면 byteOutputStream이 나오게 된다.

해당 byteOutputStream을 byteArray로 바꾸어서 controller에서 응답해주면 된다.

@Service
class DeepLManager(
    @Value("\${deepL.key}")
    private val deepLKey: String
) {

    fun translateDocument(inputStream: InputStream, fileName: String, targetLang: LanguageEnum): ByteArray {
        ByteArrayOutputStream().use{
            byteArrayOutputStream ->
            DeepLClient(deepLKey).translateDocument(
                inputStream,
                fileName,
                byteArrayOutputStream,
                null,
                targetLang.targetLang,
            )
            return byteArrayOutputStream.toByteArray()
        }
    }

}

 

해당 서비스는 DeepL로 부터 번역된 ByteArray를 가져오는 서비스이고

그냥 응답받은 byteArrayOutputStream에서 toByteArray만 호출하면 ByteArray로 변환된다.

 

이렇게 응답받은 ByteArray을

return translateConnector.createTranslate(
            docsId, createTranslateReq
        ).map{
            ByteArrayOutputStream().use{

            }
            ResponseEntity.ok()
                .contentType(
                    MediaType.APPLICATION_OCTET_STREAM
                )
                .headers{
                    header ->
                    header.contentDisposition = ContentDisposition.builder("pdf")
                        .filename("${docsId}-${createTranslateReq.targetLang.name}.pdf")
                        .build()
                }
                .body(it)
        }

 

이런 식으로 controller에서 응답해주면 된다.

이렇게 하면 지정해준 파일의 이름으로 번역된 파일을 다운받을 수 있다.

 

+ 근데 이게 생각보다 돈이 많이 나오는 거 같다.

이렇게 만들고 몇번 요청을 한 후, PM한테 금액을 확인해달라고 부탁하니 벌써 5만원이 나왔다고 한다...

다들 조심해서 사용하는 게 좋을 것 같다.......

'틔움랩' 카테고리의 다른 글

MockBean deprecated와 대체  (0) 2025.01.28
Github action을 통한 Spring CI/CD 설정  (0) 2025.01.25

+ Recent posts