반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리

Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

우선 CDC는 change data capture라는 뜻으로, 기존의 outbox는 데이터베이스에서 start 상태를 조회해서 kafka로 publish 했다.

 

이 방법의 문제는 스케줄러에 의해, 해당 시간에만 전송이 이루어지기 때문에 오랜 시간이 지나야 동기화가 진행된다.

이 방법을 해결하기 위해 데이터베이스의 트랜잭션을 감지해, create update delete의 연산이 일어나면 감지해서 kafka로 이벤트를 전송하게 된다.

 

우선 참고한 기술블로그들을 소개하겠다.

https://techblog.woowahan.com/10000/

 

CDC 너두 할 수 있어(feat. B2B 알림 서비스에 Kafka CDC 적용하기) | 우아한형제들 기술블로그

"어 이거 CDC 적용하면 딱이겠는데요? 한번 CDC로 해보면 어때요?" B2B 알림서비스 기획 리뷰 도중 제안받은 의견입니다. 저는 이때까지만 해도 CDC가 무엇인지 잘 모르는 상태였지만, 저 의견 덕분

techblog.woowahan.com

 

우선 굉장히 어렵다...

 

솔직히 블로그를 작성하는 지금까지 완벽하게 성공하지는 못한 것 같다.

그래도 차근차근 글을 작성하며 익혀보도록 하겠다.

 

우선 docker로 debezium 컨테이너를 만들어보자.

 

사용한 debezium의 docker-compose.yml이다.

 

services:
  debezium:
    container_name: debezium
    image: debezium/connect:2.7.3.Final
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: "connect-config"
      OFFSET_STORAGE_TOPIC: "connect-offsets"
      STATUS_STORAGE_TOPIC: "connect-status"
      BOOTSTRAP_SERVERS: kafka:29092
      LOGGING_LEVEL: "DEBUG"
      CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE: avro
      KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    volumes:
      - "./debezium:/kafka/connect/debezium-connector-schemaregistry-7.2.6"
    networks:
      - seungkyu

networks:
  seungkyu:
    driver: bridge
    name: seungkyu
    external: true

 

네트워크를 kafka와 같은 네트워크로 설정하고, kafka의 주소를 BOOTSTRAP_SERVERS로 지정하면 된다.

 

avro를 사용하기 때문에 schema를 avro로 지정해주고, converter로 avroconverter로 지정을 해준다.

 

그리고 여기서는 debezium-connector-schemaregistry의 버전에 따라 jar 파일들을 넣어주어야 한다.

 

https://debezium.io/documentation/reference/stable/configuration/avro.html

 

Avro Serialization :: Debezium Documentation

Version: |

debezium.io

 

해당 주소에 가져와야 할 파일들이 적혀있다.

 

여기에 해당하는 jar 파일들을 버전에 맞추어 mount한 디렉토리에 저장해주면 된다.

 

또한, 이 중에서도

 

avro와 guava에 해당하는 jar 파일도 maven repository에서 찾아 디렉토리에 넣어주어야 한다.

 

그렇게 하면 일단, 시작!은 할 수 있게 된 것이다.

반응형

MSA에서 굉장히 많이 사용하는 Kafka를 사용하려고 한다.

 

kafka에서는 이벤트를 주고 받을 때, 넘기는 데이터를 class로 정의해야 하는데 이것을 avro를 사용해서 정의해보려고 한다.

 

avro를 사용해서 resource 폴더에 json으로 포멧을 작성하고, avro를 실행하면 java의 클래스로 파일들이 생성되게 된다.

 

우선 gradle에 avro를 추가하자

 

build.gradle.kts를 사용했다.

import com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask

plugins {
    kotlin("jvm") version "1.9.25"
    id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.apache.avro:avro:1.12.0")
}

tasks.test {
    useJUnitPlatform()
}

kotlin {
    jvmToolchain(17)
}

avro {
    setCreateSetters(false)
}

val generateAvro:TaskProvider<GenerateAvroJavaTask> = tasks.register("generateAvro", GenerateAvroJavaTask::class.java) {
    source("src/main/resources/avro")
    setOutputDir(file("src/main/java"))
    stringType.set("String")
    enableDecimalLogicalType = true
}

tasks.named("compileJava").configure {
    dependsOn(generateAvro)
}

 

source로 src/main/resources/avro를 지정해주었다.

해당 폴더에 avsc 파일을 생성한 후에 generateAvro를 gradlew로 실행해주면 된다.

 

avsc 파일의 예시이다.

{
    "namespace": "seungkyu.food.ordering.kafka.order.avro.model",
    "type": "record",
    "name": "PaymentRequestAvroModel",
    "fields": [
        {
            "name": "id",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "sagaId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "customerId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "orderId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "price",
            "type": {
                "type": "long",
                "logicalType": "long"
            }
        },
        {
            "name": "createdAt",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
        },
        {
            "name": "paymentOrderStatus",
            "type": {
                  "type": "enum",
                  "name": "PaymentOrderStatus",
                  "symbols": ["PENDING", "CANCELLED"]
               }
        }
    ]
}

 

우선 namespace는 생성할 파일의 위치이다.

name으로 해당 클래스의 이름을 지정해준다.

 

fields이다.

name으로 해당 클래스에서 파라미터의 이름을 지정해준다.

type으로 class에서 사용할 타입과, kafka에서 사용할 타입을 명시해준다.

 

이제 gradle로 실행해보자.

지정한 패키지에 해당 클래스들이 생성되었다.

 

들어가서 확인해보니, 이렇게 직접 수정하지 말라고 하는 안내도 있었다.

 

이렇게 생성한 class로 kafka 이벤트를 주고 받을 수 있게 되었다.

'MSA' 카테고리의 다른 글

MSA에 CQRS 패턴 적용하기  (0) 2025.03.01
MSA에 Outbox 패턴 적용하기  (0) 2025.02.28
MSA에 SAGA 패턴 적용하기  (0) 2025.02.24
DDD에서 Hexagonal Architecture로 변경하기  (0) 2025.02.20
DDD의 핵심 요소  (0) 2025.02.14

+ Recent posts