반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리

Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

이제 debezium으로 발행한 이벤트를 소비 해보도록 하자.

 

오늘도 공식문서를 참고하여 글을 작성한다.

https://debezium.io/documentation/reference/stable/connectors/mongodb.html

 

Debezium connector for MongoDB :: Debezium Documentation

A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value

debezium.io

 

우선 해당 컬렉션에서 변경사항이 생기면

 

이렇게 kafka에 이벤트가 전송되어야 한다.

 

그리고 앞으로 서버에서 직접 kafka로 publish 하는 게 아니기 때문에 잠깐 publish를 주석처리해 두자.

 

이렇게 일단 서버에서 직접 발행이 되지 못하도록 설정해두었다.

 

그리고 일단 listener를 통해 

    @KafkaListener(id = "\${kafka.consumer.payment-consumer-group-id}",
        topics = ["\${kafka.topic.payment-request}"])
    fun receive(
        @Payload value: String,
        @Header(KafkaHeaders.RECEIVED_KEY) keys: String,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partitions: Int,
        @Header(KafkaHeaders.OFFSET) offsets: Long
    ) {

        logger.info("Received $value from $partitions partitions offsets $offsets")
    }

 

 

이렇게하고 데이터베이스를 변경해보았더니, 다음과 같은 문자열이 출력되었다.

 

이제 이거를 json으로 만들어보자.

 

ObjectMapper를 사용해서 json으로 만들어주었다.

https://jsonformatter.org/#google_vignette

 

Best JSON Formatter and JSON Validator: Online JSON Formatter

Online JSON Formatter / Beautifier and JSON Validator will format JSON data, and helps to validate, convert JSON to XML, JSON to CSV. Save and Share JSON

jsonformatter.org

여기에 json을 붙여넣으면 알아서 이쁘게 만들어준다.

 

여기서 payload를 잘봐야 한다.

우선 op가 연산의 종류를 말해준다.

C가 insert, D가 Delete, U가 Update이다.

우리는 여기서 생성되었음을 감지하는 C 일 때만 함수를 실행해야 하기에, 해당 타입일 때만 동작하게 해준다.

 

여기서 우선 필요한 값들을 추출하고, 기존에 호출하던 함수로 연결을 해주었다.

 

@KafkaListener(id = "\${kafka.consumer.payment-consumer-group-id}",
        topics = ["\${kafka.topic.payment-request}"])
    fun receive(
        @Payload value: String,
        @Header(KafkaHeaders.RECEIVED_KEY) keys: String,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partitions: Int,
        @Header(KafkaHeaders.OFFSET) offsets: Long
    ) {

        val cdcJson = objectMapper.readTree(value)

        if(cdcJson["payload"]["op"].asText() == "c"){

            val paymentRequestJson = objectMapper.readTree(cdcJson["payload"]["after"].asText())

            val paymentRequestDto = PaymentRequestDto(
                id = paymentRequestJson["_id"]["\$oid"].asText(),
                customerId = paymentRequestJson["payload"]["customerId"]["\$oid"].asText(),
                price = paymentRequestJson["payload"]["price"]["\$numberLong"].asLong(),
                createdAt= LocalDateTime.ofEpochSecond(paymentRequestJson["createdAt"]["\$date"].asLong() / 1000, 0, ZoneOffset.UTC),
                paymentOrderStatus = PaymentOrderStatus.valueOf(paymentRequestJson["payload"]["paymentOrderStatus"].asText()),
            )

            logger.info("paymentRequestDto $paymentRequestDto")

            if(paymentRequestDto.paymentOrderStatus == PaymentOrderStatus.PENDING){
                logger.info("주문 {}의 결제가 진행 중입니다", paymentRequestDto.id)
                paymentRequestMessageListener.completePayment(paymentRequestDto)
            }else{
                logger.info("주문 {}의 결제가 취소 중입니다", paymentRequestDto.id)
                paymentRequestMessageListener.cancelPayment(paymentRequestDto)
            }.subscribe()
        }
    }

 

json에서 값을 가져와서 그대로 함수를 호출해주었다.

 

이렇게 바꿔준 후 Order 서버에서 새로운 주문을 요청해보았더니

 

Order 서버의 Payment_outbox를 감지해서 Payment 서버에서 이러한 이벤트가 다시 Order 서버로 발행되었다.

 

이렇게 데이터베이스에서의 변경을 감지하고 이벤트를 전송하는 Debezium을 사용해보았다.

 

이렇게 만들기는 했지만, 사실 Json을 사용하는 것보다 Avro model을 사용하는 것이 더 성능이 좋다고 한다.(kakao에서 저장공간의 최적화를 위해 Avro를 사용한다고 한다.)

 

Avro로 시도를 해보았지만, 너무 어려워서 실패했다.

그리고 전체 서비스를 Debezium outbox로 교체한 것은 아니었다.

 

이렇게 MSA에서 필수적으로 사용하는 패턴들만 사용을 해보고, 후에 Toge-do 앱 리펙토링 할 때 반영해보려고 한다.

반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리

Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

저번 포스팅에서 debezium 컨테이너를 생성했었다.

 

이제 이 debezium을 통해 kafka topic을 만들어보자.

 

늘 그렇듯, 공식문서를 보면서 한다.

https://debezium.io/documentation/reference/stable/connectors/mongodb.html

 

Debezium connector for MongoDB :: Debezium Documentation

A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value

debezium.io

 

 

해당 항목을 참고하여 Debezium으로 http 요청을 보낸다.

 

  1.  그냥 Debezium에서 사용할 이름이다. 토픽이랑 비슷하게 맞추어서 생성하면 된다.
  2. connector.class는 mongodb를 사용하기 때문에 저 내용 그대로 넣어주면 된다. 데이터베이스마다 다 다르니, 공식문서..를 찾아 들어가서 넣어주면 된다.
  3. Mongodb 연결 주소이다. 당연히 비밀번호와 계정이 있다면 넣어주어야 하고, replicaSet의 정보도 주어야 한다.
  4. 토픽 이름의 prefix이다. 저기에 지정한 prefix에 따라 토픽의 이름이 생성된다. prefix가 A이고, B 데이터베이스의 C 컬렉션이면 토픽의 이름은 A.B.C로 생성이 된다.
  5. 변화를 감지할 데이터베이스이다. B 데이터베이스의 C 컬렉션이면 이 곳에는 B.C로 넣어주면 된다.

 

일단, debezium을 통해 토픽을 작성하는데 필요한 필수정보는 끝났다.

만약 더 추가할 내용이 있다면 공식문서를 통해 추가하도록 하자.

 

이제 POST로 debezium에게 요청하면 된다.

예시를 보면

POST http://localhost:8083/connectors/
Content-Type: application/json

{
  "name": "order-payment-request-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.connection.string": "",
    "mongodb.authSource": "admin",
    "collection.include.list": "orders.payment_outboxes",
    "topic.prefix": "debezium",
    "tombstones.on.delete": "false"
  }
}

 

이렇게 요청하면 된다.

나는 Intellij Http를 통해 요청했다.

 

요청을 보내고 kafka-ui에 보면

connect-status 토픽에 

이런 식으로 무슨 내용이 와있다.

이러면 설정이 된것이다.

 

만약 모든 topic을 조회하고 싶으면

GET http://localhost:8083/connectors/

 

해당 uri로 요청하면 된다.

그러면 모든 debezium의 모든 토픽이 응답된다.

 

만약 토픽을 삭제하고 싶다면

DELETE http://localhost:8083/connectors/{이름}

 

여기로 DELETE 요청을 보내면 된다.

 

이렇게 내가 만든 이름으로 topic이 생성된 것을 볼 수 있다.

 

이번에는 여기까지만 하고, 다음에는 해당 토픽으로 publish 해보고 subscribe 해보도록 하자.

반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지

Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

기존의 Saga 패턴에서 더 강화된 내용이라고 생각하면 될 것이다.

 

Saga 패턴은 긴 트랜잭션을 짧은 트랜잭션으로 나누고 process와 rollback을 사용하여 하나씩 나아가는 구조였다.

 

여기서 이벤트를 전송하고, 변경된 내용을 데이터베이스에 저장하게 되는데 만약 이벤트를 전송하고 데이터베이스에서 에러가 발생해 이벤트만 전송되게 된다면 문제가 발생할 수 있다.

 

이런 문제를 해결하기 위해 사용하는 패턴이다.

방법은 먼저 데이터베이스에 변경사항들을 저장하고, 스케줄러를 사용해 한 번에 이벤트를 전송하는 것이다.

이 때 변경사항들은 기존의 데이터베이스가 아닌, 이벤트를 위한 별도의 저장공간을 만들게 된다.

이곳에 보낼 데이터를 미리 저장해두고 나중에 보내기 때문에 보낼 편지함(Outbox)패턴이라고 불리게 된다.

그리고 모든 사항이 완료된 Outbox 데이터들은 데이터베이스의 최적화를 위해 스케줄러를 사용해서 지속적으로 삭제해준다.

 

이러한 방법을 MSA에서 Outbox 패턴이라고 한다.

 

이렇게 별도의 데이터베이스에 저장해두고 한 번에 보내게되며, 실제 서비스에서는 1~2초의 간격으로 스케줄러를 실행한다고 한다.

 

우선 아래에는 직접 작성한 Outbox 패턴으로 설명해보겠다.

 

@Component
class PaymentOutboxScheduler(
    private val paymentOutboxHelper: PaymentOutboxHelper,
    private val paymentRequestMessagePublisher: PaymentRequestMessagePublisher
): OutboxScheduler {

    private val logger = LoggerFactory.getLogger(PaymentOutboxScheduler::class.java)

    @Transactional
    @Scheduled(fixedDelay = 10000, initialDelay = 10000)
    override fun processOutboxMessages() {
        logger.info("결제를 요청하는 스케줄러가 실행되었습니다.")
        paymentOutboxHelper.getPaymentOutboxMessageByOutboxStatusAndOrderStatus(
            OutboxStatus.STARTED,
            listOf(OrderStatus.PENDING, OrderStatus.CANCELLING)
        ).publishOn(Schedulers.boundedElastic()).map{
            paymentOutboxMessage: PaymentOutboxMessage ->
            if(paymentOutboxMessage.payload.orderStatus == OrderStatus.CANCELLING) {

                paymentOutboxMessage.payload.paymentOrderStatus = PaymentOrderStatus.CANCELLING
            }
            paymentRequestMessagePublisher.publish(
                paymentOutboxMessage = paymentOutboxMessage,
                callback = ::updateOutboxStatus
            ).subscribe()
        }.subscribe()
    }

    private fun updateOutboxStatus(paymentOutboxMessage: PaymentOutboxMessage, outboxStatus: OutboxStatus): Mono<Void> {
        paymentOutboxMessage.outboxStatus = outboxStatus
        return paymentOutboxHelper.save(paymentOutboxMessage).then()
    }
}

 

주문 서버에서 결제 서버로 결제를 요청하는 과정이다.

우선 주문이 발생하면 Outbox의 상태가 Start인 값들만 조회한다.

 

처음에 Outbox에 Start로 저장을 하기 때문에 한번도 전송된 적이 없는 데이터를 불러오는 것이다.

 

그렇게 조회된 모든 데이터를 모두 publisher로 전송을 하며, 전송을 하면 callback을 사용해 Outbox의 Status를 Complete로 변경해준다.

 

publisher의 내용이다.

override fun publish(
        paymentOutboxMessage: PaymentOutboxMessage,
        callback: (PaymentOutboxMessage, OutboxStatus) -> Mono<Void>
    ): Mono<Void> {
        return mono{

            val paymentEventPayload = paymentOutboxMessage.payload

            logger.info("{} 주문에 대한 이벤트 전송을 준비 중입니다.", paymentEventPayload.orderId.toString())

            val paymentRequestAvroModel = paymentEventPayloadToPaymentRequestAvroModel(paymentEventPayload)

            reactiveKafkaProducer.send(
                paymentRequestTopic,
                paymentEventPayload.orderId.toString(),
                paymentRequestAvroModel
            ).publishOn(Schedulers.boundedElastic()).map{
                callback(paymentOutboxMessage, OutboxStatus.COMPLETED).subscribe()
            }.doOnError{
                callback(paymentOutboxMessage, OutboxStatus.FAILED).subscribe()
            }.subscribe()

            logger.info("{}의 주문이 메시지 큐로 결제 요청을 위해 전송되었습니다", paymentEventPayload.orderId.toString())

        }.then()
    }

 

이렇게 해당 데이터를 model로 변환하여 kafka로 전송을 하고, 전송 상태의 여부에 따라 callback을 사용하여 outbox의 상태를 변환한다.

 

결제 서버의 내용은 작성하지 않겠지만, 결제 서버에서도 수신 받은 내용에 따라 내용을 처리하고 kafka로 전송할 데이터를 outbox에 저장해주면 된다.

 

이제 여기서 문제가 생기게 된다.

 

스케줄러를 사용하기 때문에 특정 시간에만 동기화가 이루어지게 되며, 해당 스케줄러가 동작하는 시간에만 CPU의 사용량이 늘어나게 된다.

 

이러한 방법을 해결하기 위해 마지막으로 CDC 패턴을 사용한다고 한다.

CDC와 관련된 내용은 CQRS 다음에 작성해보도록 하겠다.

+ Recent posts