반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리

Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

이제 debezium으로 발행한 이벤트를 소비 해보도록 하자.

 

오늘도 공식문서를 참고하여 글을 작성한다.

https://debezium.io/documentation/reference/stable/connectors/mongodb.html

 

Debezium connector for MongoDB :: Debezium Documentation

A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value

debezium.io

 

우선 해당 컬렉션에서 변경사항이 생기면

 

이렇게 kafka에 이벤트가 전송되어야 한다.

 

그리고 앞으로 서버에서 직접 kafka로 publish 하는 게 아니기 때문에 잠깐 publish를 주석처리해 두자.

 

이렇게 일단 서버에서 직접 발행이 되지 못하도록 설정해두었다.

 

그리고 일단 listener를 통해 

    @KafkaListener(id = "\${kafka.consumer.payment-consumer-group-id}",
        topics = ["\${kafka.topic.payment-request}"])
    fun receive(
        @Payload value: String,
        @Header(KafkaHeaders.RECEIVED_KEY) keys: String,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partitions: Int,
        @Header(KafkaHeaders.OFFSET) offsets: Long
    ) {

        logger.info("Received $value from $partitions partitions offsets $offsets")
    }

 

 

이렇게하고 데이터베이스를 변경해보았더니, 다음과 같은 문자열이 출력되었다.

 

이제 이거를 json으로 만들어보자.

 

ObjectMapper를 사용해서 json으로 만들어주었다.

https://jsonformatter.org/#google_vignette

 

Best JSON Formatter and JSON Validator: Online JSON Formatter

Online JSON Formatter / Beautifier and JSON Validator will format JSON data, and helps to validate, convert JSON to XML, JSON to CSV. Save and Share JSON

jsonformatter.org

여기에 json을 붙여넣으면 알아서 이쁘게 만들어준다.

 

여기서 payload를 잘봐야 한다.

우선 op가 연산의 종류를 말해준다.

C가 insert, D가 Delete, U가 Update이다.

우리는 여기서 생성되었음을 감지하는 C 일 때만 함수를 실행해야 하기에, 해당 타입일 때만 동작하게 해준다.

 

여기서 우선 필요한 값들을 추출하고, 기존에 호출하던 함수로 연결을 해주었다.

 

@KafkaListener(id = "\${kafka.consumer.payment-consumer-group-id}",
        topics = ["\${kafka.topic.payment-request}"])
    fun receive(
        @Payload value: String,
        @Header(KafkaHeaders.RECEIVED_KEY) keys: String,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partitions: Int,
        @Header(KafkaHeaders.OFFSET) offsets: Long
    ) {

        val cdcJson = objectMapper.readTree(value)

        if(cdcJson["payload"]["op"].asText() == "c"){

            val paymentRequestJson = objectMapper.readTree(cdcJson["payload"]["after"].asText())

            val paymentRequestDto = PaymentRequestDto(
                id = paymentRequestJson["_id"]["\$oid"].asText(),
                customerId = paymentRequestJson["payload"]["customerId"]["\$oid"].asText(),
                price = paymentRequestJson["payload"]["price"]["\$numberLong"].asLong(),
                createdAt= LocalDateTime.ofEpochSecond(paymentRequestJson["createdAt"]["\$date"].asLong() / 1000, 0, ZoneOffset.UTC),
                paymentOrderStatus = PaymentOrderStatus.valueOf(paymentRequestJson["payload"]["paymentOrderStatus"].asText()),
            )

            logger.info("paymentRequestDto $paymentRequestDto")

            if(paymentRequestDto.paymentOrderStatus == PaymentOrderStatus.PENDING){
                logger.info("주문 {}의 결제가 진행 중입니다", paymentRequestDto.id)
                paymentRequestMessageListener.completePayment(paymentRequestDto)
            }else{
                logger.info("주문 {}의 결제가 취소 중입니다", paymentRequestDto.id)
                paymentRequestMessageListener.cancelPayment(paymentRequestDto)
            }.subscribe()
        }
    }

 

json에서 값을 가져와서 그대로 함수를 호출해주었다.

 

이렇게 바꿔준 후 Order 서버에서 새로운 주문을 요청해보았더니

 

Order 서버의 Payment_outbox를 감지해서 Payment 서버에서 이러한 이벤트가 다시 Order 서버로 발행되었다.

 

이렇게 데이터베이스에서의 변경을 감지하고 이벤트를 전송하는 Debezium을 사용해보았다.

 

이렇게 만들기는 했지만, 사실 Json을 사용하는 것보다 Avro model을 사용하는 것이 더 성능이 좋다고 한다.(kakao에서 저장공간의 최적화를 위해 Avro를 사용한다고 한다.)

 

Avro로 시도를 해보았지만, 너무 어려워서 실패했다.

그리고 전체 서비스를 Debezium outbox로 교체한 것은 아니었다.

 

이렇게 MSA에서 필수적으로 사용하는 패턴들만 사용을 해보고, 후에 Toge-do 앱 리펙토링 할 때 반영해보려고 한다.

반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지

Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

우선 CQRS 패턴을 적용하기 위해 

https://youtu.be/BnS6343GTkY?si=kRkeJSen4kr-3tO9

 

해당 영상을 참고했다.

 

사실 CQRS가 Command Query Responsibility Segregation로 그냥 단순히 서비스 로직과 쿼리를 분리해서 무슨 이점이 있을까? 라는 생각을 가지고 있었다.

 

하지만 우아한 형제들에서 이런 사용 사례들을 보고 나니 사용하는 이유를 좀 알 수 있을 것 같았다.

 

그리고 내가 이해한 것이 맞다면 아래와 같은 이유로 사용할 것이다.

 

이런 MSA 환경에서 주문을 수행하기 위해서는 고객의 정보를 조회해야 한다.

 

이런 상황에서 동기적으로 동작하기 위해서는 Customer 서버에 Http 요청을 보내야 하지만 이 과정에서 큰 오버헤드가 발생하고, Http 응답이 오기까지 Order 서버에서 block된다는 문제가 발생한다.

그렇다고 Order 서버에서 Customer Database에 접근하면 분리의 원칙을 위반하고 데이터베이스를 조회하는 과정에서 병목이 발생할 가능성이 있다.

 

그렇기에

그렇기에 Customer 서버에서 데이터의 변경요청이 오면 이벤트를 발행해 다른 서버들에게 알려주고, 다른 서버들은 그 중 필수적으로 필요한 부분만 조회가 빠른 데이터베이스에 저장해 조회하면서 사용하는 것이다.

 

그렇게되면 고객 정보의 Create, Update, Delete는 Customer 서버에서 일어나고 고객정보의 Read는 Order 서버에서 일어나게 된다.

이런 것을 CQRS라고 말한다고 생각한다.

 

구현은 Saga, Outbox에 비해 별로 어렵지 않았다.

기존의 데이터는 MongoDB에 저장하고 있었고 Order 서버에서도 고객의 정보를 추가적으로 저장해야 하는데, 내가 사용할 수 있는 조회가 가장 빠른 데이터베이스인 Redis를 사용했다.

 

    override fun createCustomer(createCustomerCommand: CreateCustomerCommand): Mono<CreateCustomerResponse> {
        val customer = Customer(
            id = CustomerId(ObjectId.get()),
            username = createCustomerCommand.username,
            email = createCustomerCommand.email
        )

        val customerCreatedEvent = customerDomainService.validateAndCreateCustomer(customer)

        return customerMessagePublisher.publish(customerCreatedEvent)
            .then(
                customerRepository.save(customer)
            ).thenReturn(CreateCustomerResponse(customer.id.toString(), customer.username, customer.email))
    }

 

이렇게 고객의 생성 명령이 실행되면 데이터베이스에 저장하며, 이벤트를 발행한다.

물론 이 과정 내에서도 Transaction 처리와 Outbox 패턴을 적용해야 하지만, 그 부분은 생략하도록 하겠다.

 

@KafkaListener(id = "\${kafka.consumer.customer-consumer-group-id}",
        topics = ["\${kafka.topic.customer-create}"])
    override fun receive(
        @Payload values: List<CustomerCreateAvroModel>,
        @Header(KafkaHeaders.RECEIVED_KEY) keys: List<String>,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partitions: List<Int>,
        @Header(KafkaHeaders.OFFSET) offsets: List<Long>
    ) {
        values.forEach{
            customerCreateMessageListener.createCredit(
                Credit(
                    customerId = CustomerId(id = ObjectId(it.id)),
                    totalCreditAmount = Money.ZERO
                )
            ).subscribe()
        }
    }

 

이벤트를 수신하는 부분은 이렇게 고객의 id를 받아서 결제 정보를 만들게 된다.

 

여기서 이벤트는 고객의 아이디, 이름, 이메일 모두 발행이 되었지만, 수신하는 부분에서는 고객의 id만 사용하게 된다.

 

결제 과정에서는 고객의 이름, 이메일을 사용하지 않기 때문에 굳이 저장할 필요가 없다.

최소 데이터 보관 원칙에 의해 꼭 필요한 column들만 저장을 하는 것이 좋다.

 

    override fun createCredit(credit: Credit): Mono<Void> {
        return creditRepository.save(credit).then()
    }
    override fun save(credit: Credit): Mono<Credit>{
        return reactiveRedisTemplate.opsForValue().set(
            "${redisPrefix}${credit.customerId.id}",
            objectMapper.writeValueAsString(creditToCreditEntity(credit))
        ).thenReturn(credit)
    }

 

그리고는 조회가 빠른 Redis에 저장을 해두고 주문이 발생하면, Customer 서버에 데이터를 요청하지 않고 빠르게 주문을 수행할 수 있도록 한다.

+ Recent posts