https://github.com/Seungkyu-Han/micro_service_webflux
GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리
Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux
github.com
이제 debezium으로 발행한 이벤트를 소비 해보도록 하자.
오늘도 공식문서를 참고하여 글을 작성한다.
https://debezium.io/documentation/reference/stable/connectors/mongodb.html
Debezium connector for MongoDB :: Debezium Documentation
A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value
debezium.io
우선 해당 컬렉션에서 변경사항이 생기면
이렇게 kafka에 이벤트가 전송되어야 한다.
그리고 앞으로 서버에서 직접 kafka로 publish 하는 게 아니기 때문에 잠깐 publish를 주석처리해 두자.
이렇게 일단 서버에서 직접 발행이 되지 못하도록 설정해두었다.
그리고 일단 listener를 통해
@KafkaListener(id = "\${kafka.consumer.payment-consumer-group-id}",
topics = ["\${kafka.topic.payment-request}"])
fun receive(
@Payload value: String,
@Header(KafkaHeaders.RECEIVED_KEY) keys: String,
@Header(KafkaHeaders.RECEIVED_PARTITION) partitions: Int,
@Header(KafkaHeaders.OFFSET) offsets: Long
) {
logger.info("Received $value from $partitions partitions offsets $offsets")
}
이렇게하고 데이터베이스를 변경해보았더니, 다음과 같은 문자열이 출력되었다.
이제 이거를 json으로 만들어보자.
ObjectMapper를 사용해서 json으로 만들어주었다.
https://jsonformatter.org/#google_vignette
Best JSON Formatter and JSON Validator: Online JSON Formatter
Online JSON Formatter / Beautifier and JSON Validator will format JSON data, and helps to validate, convert JSON to XML, JSON to CSV. Save and Share JSON
jsonformatter.org
여기에 json을 붙여넣으면 알아서 이쁘게 만들어준다.
여기서 payload를 잘봐야 한다.
우선 op가 연산의 종류를 말해준다.
C가 insert, D가 Delete, U가 Update이다.
우리는 여기서 생성되었음을 감지하는 C 일 때만 함수를 실행해야 하기에, 해당 타입일 때만 동작하게 해준다.
여기서 우선 필요한 값들을 추출하고, 기존에 호출하던 함수로 연결을 해주었다.
@KafkaListener(id = "\${kafka.consumer.payment-consumer-group-id}",
topics = ["\${kafka.topic.payment-request}"])
fun receive(
@Payload value: String,
@Header(KafkaHeaders.RECEIVED_KEY) keys: String,
@Header(KafkaHeaders.RECEIVED_PARTITION) partitions: Int,
@Header(KafkaHeaders.OFFSET) offsets: Long
) {
val cdcJson = objectMapper.readTree(value)
if(cdcJson["payload"]["op"].asText() == "c"){
val paymentRequestJson = objectMapper.readTree(cdcJson["payload"]["after"].asText())
val paymentRequestDto = PaymentRequestDto(
id = paymentRequestJson["_id"]["\$oid"].asText(),
customerId = paymentRequestJson["payload"]["customerId"]["\$oid"].asText(),
price = paymentRequestJson["payload"]["price"]["\$numberLong"].asLong(),
createdAt= LocalDateTime.ofEpochSecond(paymentRequestJson["createdAt"]["\$date"].asLong() / 1000, 0, ZoneOffset.UTC),
paymentOrderStatus = PaymentOrderStatus.valueOf(paymentRequestJson["payload"]["paymentOrderStatus"].asText()),
)
logger.info("paymentRequestDto $paymentRequestDto")
if(paymentRequestDto.paymentOrderStatus == PaymentOrderStatus.PENDING){
logger.info("주문 {}의 결제가 진행 중입니다", paymentRequestDto.id)
paymentRequestMessageListener.completePayment(paymentRequestDto)
}else{
logger.info("주문 {}의 결제가 취소 중입니다", paymentRequestDto.id)
paymentRequestMessageListener.cancelPayment(paymentRequestDto)
}.subscribe()
}
}
json에서 값을 가져와서 그대로 함수를 호출해주었다.
이렇게 바꿔준 후 Order 서버에서 새로운 주문을 요청해보았더니
Order 서버의 Payment_outbox를 감지해서 Payment 서버에서 이러한 이벤트가 다시 Order 서버로 발행되었다.
이렇게 데이터베이스에서의 변경을 감지하고 이벤트를 전송하는 Debezium을 사용해보았다.
이렇게 만들기는 했지만, 사실 Json을 사용하는 것보다 Avro model을 사용하는 것이 더 성능이 좋다고 한다.(kakao에서 저장공간의 최적화를 위해 Avro를 사용한다고 한다.)
Avro로 시도를 해보았지만, 너무 어려워서 실패했다.
그리고 전체 서비스를 Debezium outbox로 교체한 것은 아니었다.
이렇게 MSA에서 필수적으로 사용하는 패턴들만 사용을 해보고, 후에 Toge-do 앱 리펙토링 할 때 반영해보려고 한다.
'MSA' 카테고리의 다른 글
MSA에 CDC 적용을 위한 Debezium topic 생성 (0) | 2025.03.03 |
---|---|
MSA에 CDC 적용을 위한 Debezium 도커 설정 (0) | 2025.03.02 |
MSA에 CQRS 패턴 적용하기 (0) | 2025.03.01 |
MSA에 Outbox 패턴 적용하기 (0) | 2025.02.28 |
MSA에 SAGA 패턴 적용하기 (0) | 2025.02.24 |