반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리

Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

저번 포스팅에서 debezium 컨테이너를 생성했었다.

 

이제 이 debezium을 통해 kafka topic을 만들어보자.

 

늘 그렇듯, 공식문서를 보면서 한다.

https://debezium.io/documentation/reference/stable/connectors/mongodb.html

 

Debezium connector for MongoDB :: Debezium Documentation

A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value

debezium.io

 

 

해당 항목을 참고하여 Debezium으로 http 요청을 보낸다.

 

  1.  그냥 Debezium에서 사용할 이름이다. 토픽이랑 비슷하게 맞추어서 생성하면 된다.
  2. connector.class는 mongodb를 사용하기 때문에 저 내용 그대로 넣어주면 된다. 데이터베이스마다 다 다르니, 공식문서..를 찾아 들어가서 넣어주면 된다.
  3. Mongodb 연결 주소이다. 당연히 비밀번호와 계정이 있다면 넣어주어야 하고, replicaSet의 정보도 주어야 한다.
  4. 토픽 이름의 prefix이다. 저기에 지정한 prefix에 따라 토픽의 이름이 생성된다. prefix가 A이고, B 데이터베이스의 C 컬렉션이면 토픽의 이름은 A.B.C로 생성이 된다.
  5. 변화를 감지할 데이터베이스이다. B 데이터베이스의 C 컬렉션이면 이 곳에는 B.C로 넣어주면 된다.

 

일단, debezium을 통해 토픽을 작성하는데 필요한 필수정보는 끝났다.

만약 더 추가할 내용이 있다면 공식문서를 통해 추가하도록 하자.

 

이제 POST로 debezium에게 요청하면 된다.

예시를 보면

POST http://localhost:8083/connectors/
Content-Type: application/json

{
  "name": "order-payment-request-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.connection.string": "",
    "mongodb.authSource": "admin",
    "collection.include.list": "orders.payment_outboxes",
    "topic.prefix": "debezium",
    "tombstones.on.delete": "false"
  }
}

 

이렇게 요청하면 된다.

나는 Intellij Http를 통해 요청했다.

 

요청을 보내고 kafka-ui에 보면

connect-status 토픽에 

이런 식으로 무슨 내용이 와있다.

이러면 설정이 된것이다.

 

만약 모든 topic을 조회하고 싶으면

GET http://localhost:8083/connectors/

 

해당 uri로 요청하면 된다.

그러면 모든 debezium의 모든 토픽이 응답된다.

 

만약 토픽을 삭제하고 싶다면

DELETE http://localhost:8083/connectors/{이름}

 

여기로 DELETE 요청을 보내면 된다.

 

이렇게 내가 만든 이름으로 topic이 생성된 것을 볼 수 있다.

 

이번에는 여기까지만 하고, 다음에는 해당 토픽으로 publish 해보고 subscribe 해보도록 하자.

반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리

Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

우선 CDC는 change data capture라는 뜻으로, 기존의 outbox는 데이터베이스에서 start 상태를 조회해서 kafka로 publish 했다.

 

이 방법의 문제는 스케줄러에 의해, 해당 시간에만 전송이 이루어지기 때문에 오랜 시간이 지나야 동기화가 진행된다.

이 방법을 해결하기 위해 데이터베이스의 트랜잭션을 감지해, create update delete의 연산이 일어나면 감지해서 kafka로 이벤트를 전송하게 된다.

 

우선 참고한 기술블로그들을 소개하겠다.

https://techblog.woowahan.com/10000/

 

CDC 너두 할 수 있어(feat. B2B 알림 서비스에 Kafka CDC 적용하기) | 우아한형제들 기술블로그

"어 이거 CDC 적용하면 딱이겠는데요? 한번 CDC로 해보면 어때요?" B2B 알림서비스 기획 리뷰 도중 제안받은 의견입니다. 저는 이때까지만 해도 CDC가 무엇인지 잘 모르는 상태였지만, 저 의견 덕분

techblog.woowahan.com

 

우선 굉장히 어렵다...

 

솔직히 블로그를 작성하는 지금까지 완벽하게 성공하지는 못한 것 같다.

그래도 차근차근 글을 작성하며 익혀보도록 하겠다.

 

우선 docker로 debezium 컨테이너를 만들어보자.

 

사용한 debezium의 docker-compose.yml이다.

 

services:
  debezium:
    container_name: debezium
    image: debezium/connect:2.7.3.Final
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: "connect-config"
      OFFSET_STORAGE_TOPIC: "connect-offsets"
      STATUS_STORAGE_TOPIC: "connect-status"
      BOOTSTRAP_SERVERS: kafka:29092
      LOGGING_LEVEL: "DEBUG"
      CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE: avro
      KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    volumes:
      - "./debezium:/kafka/connect/debezium-connector-schemaregistry-7.2.6"
    networks:
      - seungkyu

networks:
  seungkyu:
    driver: bridge
    name: seungkyu
    external: true

 

네트워크를 kafka와 같은 네트워크로 설정하고, kafka의 주소를 BOOTSTRAP_SERVERS로 지정하면 된다.

 

avro를 사용하기 때문에 schema를 avro로 지정해주고, converter로 avroconverter로 지정을 해준다.

 

그리고 여기서는 debezium-connector-schemaregistry의 버전에 따라 jar 파일들을 넣어주어야 한다.

 

https://debezium.io/documentation/reference/stable/configuration/avro.html

 

Avro Serialization :: Debezium Documentation

Version: |

debezium.io

 

해당 주소에 가져와야 할 파일들이 적혀있다.

 

여기에 해당하는 jar 파일들을 버전에 맞추어 mount한 디렉토리에 저장해주면 된다.

 

또한, 이 중에서도

 

avro와 guava에 해당하는 jar 파일도 maven repository에서 찾아 디렉토리에 넣어주어야 한다.

 

그렇게 하면 일단, 시작!은 할 수 있게 된 것이다.

반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지

Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

우선 CQRS 패턴을 적용하기 위해 

https://youtu.be/BnS6343GTkY?si=kRkeJSen4kr-3tO9

 

해당 영상을 참고했다.

 

사실 CQRS가 Command Query Responsibility Segregation로 그냥 단순히 서비스 로직과 쿼리를 분리해서 무슨 이점이 있을까? 라는 생각을 가지고 있었다.

 

하지만 우아한 형제들에서 이런 사용 사례들을 보고 나니 사용하는 이유를 좀 알 수 있을 것 같았다.

 

그리고 내가 이해한 것이 맞다면 아래와 같은 이유로 사용할 것이다.

 

이런 MSA 환경에서 주문을 수행하기 위해서는 고객의 정보를 조회해야 한다.

 

이런 상황에서 동기적으로 동작하기 위해서는 Customer 서버에 Http 요청을 보내야 하지만 이 과정에서 큰 오버헤드가 발생하고, Http 응답이 오기까지 Order 서버에서 block된다는 문제가 발생한다.

그렇다고 Order 서버에서 Customer Database에 접근하면 분리의 원칙을 위반하고 데이터베이스를 조회하는 과정에서 병목이 발생할 가능성이 있다.

 

그렇기에

그렇기에 Customer 서버에서 데이터의 변경요청이 오면 이벤트를 발행해 다른 서버들에게 알려주고, 다른 서버들은 그 중 필수적으로 필요한 부분만 조회가 빠른 데이터베이스에 저장해 조회하면서 사용하는 것이다.

 

그렇게되면 고객 정보의 Create, Update, Delete는 Customer 서버에서 일어나고 고객정보의 Read는 Order 서버에서 일어나게 된다.

이런 것을 CQRS라고 말한다고 생각한다.

 

구현은 Saga, Outbox에 비해 별로 어렵지 않았다.

기존의 데이터는 MongoDB에 저장하고 있었고 Order 서버에서도 고객의 정보를 추가적으로 저장해야 하는데, 내가 사용할 수 있는 조회가 가장 빠른 데이터베이스인 Redis를 사용했다.

 

    override fun createCustomer(createCustomerCommand: CreateCustomerCommand): Mono<CreateCustomerResponse> {
        val customer = Customer(
            id = CustomerId(ObjectId.get()),
            username = createCustomerCommand.username,
            email = createCustomerCommand.email
        )

        val customerCreatedEvent = customerDomainService.validateAndCreateCustomer(customer)

        return customerMessagePublisher.publish(customerCreatedEvent)
            .then(
                customerRepository.save(customer)
            ).thenReturn(CreateCustomerResponse(customer.id.toString(), customer.username, customer.email))
    }

 

이렇게 고객의 생성 명령이 실행되면 데이터베이스에 저장하며, 이벤트를 발행한다.

물론 이 과정 내에서도 Transaction 처리와 Outbox 패턴을 적용해야 하지만, 그 부분은 생략하도록 하겠다.

 

@KafkaListener(id = "\${kafka.consumer.customer-consumer-group-id}",
        topics = ["\${kafka.topic.customer-create}"])
    override fun receive(
        @Payload values: List<CustomerCreateAvroModel>,
        @Header(KafkaHeaders.RECEIVED_KEY) keys: List<String>,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partitions: List<Int>,
        @Header(KafkaHeaders.OFFSET) offsets: List<Long>
    ) {
        values.forEach{
            customerCreateMessageListener.createCredit(
                Credit(
                    customerId = CustomerId(id = ObjectId(it.id)),
                    totalCreditAmount = Money.ZERO
                )
            ).subscribe()
        }
    }

 

이벤트를 수신하는 부분은 이렇게 고객의 id를 받아서 결제 정보를 만들게 된다.

 

여기서 이벤트는 고객의 아이디, 이름, 이메일 모두 발행이 되었지만, 수신하는 부분에서는 고객의 id만 사용하게 된다.

 

결제 과정에서는 고객의 이름, 이메일을 사용하지 않기 때문에 굳이 저장할 필요가 없다.

최소 데이터 보관 원칙에 의해 꼭 필요한 column들만 저장을 하는 것이 좋다.

 

    override fun createCredit(credit: Credit): Mono<Void> {
        return creditRepository.save(credit).then()
    }
    override fun save(credit: Credit): Mono<Credit>{
        return reactiveRedisTemplate.opsForValue().set(
            "${redisPrefix}${credit.customerId.id}",
            objectMapper.writeValueAsString(creditToCreditEntity(credit))
        ).thenReturn(credit)
    }

 

그리고는 조회가 빠른 Redis에 저장을 해두고 주문이 발생하면, Customer 서버에 데이터를 요청하지 않고 빠르게 주문을 수행할 수 있도록 한다.

+ Recent posts