반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리

Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

이제 debezium으로 발행한 이벤트를 소비 해보도록 하자.

 

오늘도 공식문서를 참고하여 글을 작성한다.

https://debezium.io/documentation/reference/stable/connectors/mongodb.html

 

Debezium connector for MongoDB :: Debezium Documentation

A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value

debezium.io

 

우선 해당 컬렉션에서 변경사항이 생기면

 

이렇게 kafka에 이벤트가 전송되어야 한다.

 

그리고 앞으로 서버에서 직접 kafka로 publish 하는 게 아니기 때문에 잠깐 publish를 주석처리해 두자.

 

이렇게 일단 서버에서 직접 발행이 되지 못하도록 설정해두었다.

 

그리고 일단 listener를 통해 

    @KafkaListener(id = "\${kafka.consumer.payment-consumer-group-id}",
        topics = ["\${kafka.topic.payment-request}"])
    fun receive(
        @Payload value: String,
        @Header(KafkaHeaders.RECEIVED_KEY) keys: String,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partitions: Int,
        @Header(KafkaHeaders.OFFSET) offsets: Long
    ) {

        logger.info("Received $value from $partitions partitions offsets $offsets")
    }

 

 

이렇게하고 데이터베이스를 변경해보았더니, 다음과 같은 문자열이 출력되었다.

 

이제 이거를 json으로 만들어보자.

 

ObjectMapper를 사용해서 json으로 만들어주었다.

https://jsonformatter.org/#google_vignette

 

Best JSON Formatter and JSON Validator: Online JSON Formatter

Online JSON Formatter / Beautifier and JSON Validator will format JSON data, and helps to validate, convert JSON to XML, JSON to CSV. Save and Share JSON

jsonformatter.org

여기에 json을 붙여넣으면 알아서 이쁘게 만들어준다.

 

여기서 payload를 잘봐야 한다.

우선 op가 연산의 종류를 말해준다.

C가 insert, D가 Delete, U가 Update이다.

우리는 여기서 생성되었음을 감지하는 C 일 때만 함수를 실행해야 하기에, 해당 타입일 때만 동작하게 해준다.

 

여기서 우선 필요한 값들을 추출하고, 기존에 호출하던 함수로 연결을 해주었다.

 

@KafkaListener(id = "\${kafka.consumer.payment-consumer-group-id}",
        topics = ["\${kafka.topic.payment-request}"])
    fun receive(
        @Payload value: String,
        @Header(KafkaHeaders.RECEIVED_KEY) keys: String,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partitions: Int,
        @Header(KafkaHeaders.OFFSET) offsets: Long
    ) {

        val cdcJson = objectMapper.readTree(value)

        if(cdcJson["payload"]["op"].asText() == "c"){

            val paymentRequestJson = objectMapper.readTree(cdcJson["payload"]["after"].asText())

            val paymentRequestDto = PaymentRequestDto(
                id = paymentRequestJson["_id"]["\$oid"].asText(),
                customerId = paymentRequestJson["payload"]["customerId"]["\$oid"].asText(),
                price = paymentRequestJson["payload"]["price"]["\$numberLong"].asLong(),
                createdAt= LocalDateTime.ofEpochSecond(paymentRequestJson["createdAt"]["\$date"].asLong() / 1000, 0, ZoneOffset.UTC),
                paymentOrderStatus = PaymentOrderStatus.valueOf(paymentRequestJson["payload"]["paymentOrderStatus"].asText()),
            )

            logger.info("paymentRequestDto $paymentRequestDto")

            if(paymentRequestDto.paymentOrderStatus == PaymentOrderStatus.PENDING){
                logger.info("주문 {}의 결제가 진행 중입니다", paymentRequestDto.id)
                paymentRequestMessageListener.completePayment(paymentRequestDto)
            }else{
                logger.info("주문 {}의 결제가 취소 중입니다", paymentRequestDto.id)
                paymentRequestMessageListener.cancelPayment(paymentRequestDto)
            }.subscribe()
        }
    }

 

json에서 값을 가져와서 그대로 함수를 호출해주었다.

 

이렇게 바꿔준 후 Order 서버에서 새로운 주문을 요청해보았더니

 

Order 서버의 Payment_outbox를 감지해서 Payment 서버에서 이러한 이벤트가 다시 Order 서버로 발행되었다.

 

이렇게 데이터베이스에서의 변경을 감지하고 이벤트를 전송하는 Debezium을 사용해보았다.

 

이렇게 만들기는 했지만, 사실 Json을 사용하는 것보다 Avro model을 사용하는 것이 더 성능이 좋다고 한다.(kakao에서 저장공간의 최적화를 위해 Avro를 사용한다고 한다.)

 

Avro로 시도를 해보았지만, 너무 어려워서 실패했다.

그리고 전체 서비스를 Debezium outbox로 교체한 것은 아니었다.

 

이렇게 MSA에서 필수적으로 사용하는 패턴들만 사용을 해보고, 후에 Toge-do 앱 리펙토링 할 때 반영해보려고 한다.

반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리

Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

저번 포스팅에서 debezium 컨테이너를 생성했었다.

 

이제 이 debezium을 통해 kafka topic을 만들어보자.

 

늘 그렇듯, 공식문서를 보면서 한다.

https://debezium.io/documentation/reference/stable/connectors/mongodb.html

 

Debezium connector for MongoDB :: Debezium Documentation

A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value

debezium.io

 

 

해당 항목을 참고하여 Debezium으로 http 요청을 보낸다.

 

  1.  그냥 Debezium에서 사용할 이름이다. 토픽이랑 비슷하게 맞추어서 생성하면 된다.
  2. connector.class는 mongodb를 사용하기 때문에 저 내용 그대로 넣어주면 된다. 데이터베이스마다 다 다르니, 공식문서..를 찾아 들어가서 넣어주면 된다.
  3. Mongodb 연결 주소이다. 당연히 비밀번호와 계정이 있다면 넣어주어야 하고, replicaSet의 정보도 주어야 한다.
  4. 토픽 이름의 prefix이다. 저기에 지정한 prefix에 따라 토픽의 이름이 생성된다. prefix가 A이고, B 데이터베이스의 C 컬렉션이면 토픽의 이름은 A.B.C로 생성이 된다.
  5. 변화를 감지할 데이터베이스이다. B 데이터베이스의 C 컬렉션이면 이 곳에는 B.C로 넣어주면 된다.

 

일단, debezium을 통해 토픽을 작성하는데 필요한 필수정보는 끝났다.

만약 더 추가할 내용이 있다면 공식문서를 통해 추가하도록 하자.

 

이제 POST로 debezium에게 요청하면 된다.

예시를 보면

POST http://localhost:8083/connectors/
Content-Type: application/json

{
  "name": "order-payment-request-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.connection.string": "",
    "mongodb.authSource": "admin",
    "collection.include.list": "orders.payment_outboxes",
    "topic.prefix": "debezium",
    "tombstones.on.delete": "false"
  }
}

 

이렇게 요청하면 된다.

나는 Intellij Http를 통해 요청했다.

 

요청을 보내고 kafka-ui에 보면

connect-status 토픽에 

이런 식으로 무슨 내용이 와있다.

이러면 설정이 된것이다.

 

만약 모든 topic을 조회하고 싶으면

GET http://localhost:8083/connectors/

 

해당 uri로 요청하면 된다.

그러면 모든 debezium의 모든 토픽이 응답된다.

 

만약 토픽을 삭제하고 싶다면

DELETE http://localhost:8083/connectors/{이름}

 

여기로 DELETE 요청을 보내면 된다.

 

이렇게 내가 만든 이름으로 topic이 생성된 것을 볼 수 있다.

 

이번에는 여기까지만 하고, 다음에는 해당 토픽으로 publish 해보고 subscribe 해보도록 하자.

반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리

Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

우선 CDC는 change data capture라는 뜻으로, 기존의 outbox는 데이터베이스에서 start 상태를 조회해서 kafka로 publish 했다.

 

이 방법의 문제는 스케줄러에 의해, 해당 시간에만 전송이 이루어지기 때문에 오랜 시간이 지나야 동기화가 진행된다.

이 방법을 해결하기 위해 데이터베이스의 트랜잭션을 감지해, create update delete의 연산이 일어나면 감지해서 kafka로 이벤트를 전송하게 된다.

 

우선 참고한 기술블로그들을 소개하겠다.

https://techblog.woowahan.com/10000/

 

CDC 너두 할 수 있어(feat. B2B 알림 서비스에 Kafka CDC 적용하기) | 우아한형제들 기술블로그

"어 이거 CDC 적용하면 딱이겠는데요? 한번 CDC로 해보면 어때요?" B2B 알림서비스 기획 리뷰 도중 제안받은 의견입니다. 저는 이때까지만 해도 CDC가 무엇인지 잘 모르는 상태였지만, 저 의견 덕분

techblog.woowahan.com

 

우선 굉장히 어렵다...

 

솔직히 블로그를 작성하는 지금까지 완벽하게 성공하지는 못한 것 같다.

그래도 차근차근 글을 작성하며 익혀보도록 하겠다.

 

우선 docker로 debezium 컨테이너를 만들어보자.

 

사용한 debezium의 docker-compose.yml이다.

 

services:
  debezium:
    container_name: debezium
    image: debezium/connect:2.7.3.Final
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: "connect-config"
      OFFSET_STORAGE_TOPIC: "connect-offsets"
      STATUS_STORAGE_TOPIC: "connect-status"
      BOOTSTRAP_SERVERS: kafka:29092
      LOGGING_LEVEL: "DEBUG"
      CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE: avro
      KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    volumes:
      - "./debezium:/kafka/connect/debezium-connector-schemaregistry-7.2.6"
    networks:
      - seungkyu

networks:
  seungkyu:
    driver: bridge
    name: seungkyu
    external: true

 

네트워크를 kafka와 같은 네트워크로 설정하고, kafka의 주소를 BOOTSTRAP_SERVERS로 지정하면 된다.

 

avro를 사용하기 때문에 schema를 avro로 지정해주고, converter로 avroconverter로 지정을 해준다.

 

그리고 여기서는 debezium-connector-schemaregistry의 버전에 따라 jar 파일들을 넣어주어야 한다.

 

https://debezium.io/documentation/reference/stable/configuration/avro.html

 

Avro Serialization :: Debezium Documentation

Version: |

debezium.io

 

해당 주소에 가져와야 할 파일들이 적혀있다.

 

여기에 해당하는 jar 파일들을 버전에 맞추어 mount한 디렉토리에 저장해주면 된다.

 

또한, 이 중에서도

 

avro와 guava에 해당하는 jar 파일도 maven repository에서 찾아 디렉토리에 넣어주어야 한다.

 

그렇게 하면 일단, 시작!은 할 수 있게 된 것이다.

반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지

Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

우선 CQRS 패턴을 적용하기 위해 

https://youtu.be/BnS6343GTkY?si=kRkeJSen4kr-3tO9

 

해당 영상을 참고했다.

 

사실 CQRS가 Command Query Responsibility Segregation로 그냥 단순히 서비스 로직과 쿼리를 분리해서 무슨 이점이 있을까? 라는 생각을 가지고 있었다.

 

하지만 우아한 형제들에서 이런 사용 사례들을 보고 나니 사용하는 이유를 좀 알 수 있을 것 같았다.

 

그리고 내가 이해한 것이 맞다면 아래와 같은 이유로 사용할 것이다.

 

이런 MSA 환경에서 주문을 수행하기 위해서는 고객의 정보를 조회해야 한다.

 

이런 상황에서 동기적으로 동작하기 위해서는 Customer 서버에 Http 요청을 보내야 하지만 이 과정에서 큰 오버헤드가 발생하고, Http 응답이 오기까지 Order 서버에서 block된다는 문제가 발생한다.

그렇다고 Order 서버에서 Customer Database에 접근하면 분리의 원칙을 위반하고 데이터베이스를 조회하는 과정에서 병목이 발생할 가능성이 있다.

 

그렇기에

그렇기에 Customer 서버에서 데이터의 변경요청이 오면 이벤트를 발행해 다른 서버들에게 알려주고, 다른 서버들은 그 중 필수적으로 필요한 부분만 조회가 빠른 데이터베이스에 저장해 조회하면서 사용하는 것이다.

 

그렇게되면 고객 정보의 Create, Update, Delete는 Customer 서버에서 일어나고 고객정보의 Read는 Order 서버에서 일어나게 된다.

이런 것을 CQRS라고 말한다고 생각한다.

 

구현은 Saga, Outbox에 비해 별로 어렵지 않았다.

기존의 데이터는 MongoDB에 저장하고 있었고 Order 서버에서도 고객의 정보를 추가적으로 저장해야 하는데, 내가 사용할 수 있는 조회가 가장 빠른 데이터베이스인 Redis를 사용했다.

 

    override fun createCustomer(createCustomerCommand: CreateCustomerCommand): Mono<CreateCustomerResponse> {
        val customer = Customer(
            id = CustomerId(ObjectId.get()),
            username = createCustomerCommand.username,
            email = createCustomerCommand.email
        )

        val customerCreatedEvent = customerDomainService.validateAndCreateCustomer(customer)

        return customerMessagePublisher.publish(customerCreatedEvent)
            .then(
                customerRepository.save(customer)
            ).thenReturn(CreateCustomerResponse(customer.id.toString(), customer.username, customer.email))
    }

 

이렇게 고객의 생성 명령이 실행되면 데이터베이스에 저장하며, 이벤트를 발행한다.

물론 이 과정 내에서도 Transaction 처리와 Outbox 패턴을 적용해야 하지만, 그 부분은 생략하도록 하겠다.

 

@KafkaListener(id = "\${kafka.consumer.customer-consumer-group-id}",
        topics = ["\${kafka.topic.customer-create}"])
    override fun receive(
        @Payload values: List<CustomerCreateAvroModel>,
        @Header(KafkaHeaders.RECEIVED_KEY) keys: List<String>,
        @Header(KafkaHeaders.RECEIVED_PARTITION) partitions: List<Int>,
        @Header(KafkaHeaders.OFFSET) offsets: List<Long>
    ) {
        values.forEach{
            customerCreateMessageListener.createCredit(
                Credit(
                    customerId = CustomerId(id = ObjectId(it.id)),
                    totalCreditAmount = Money.ZERO
                )
            ).subscribe()
        }
    }

 

이벤트를 수신하는 부분은 이렇게 고객의 id를 받아서 결제 정보를 만들게 된다.

 

여기서 이벤트는 고객의 아이디, 이름, 이메일 모두 발행이 되었지만, 수신하는 부분에서는 고객의 id만 사용하게 된다.

 

결제 과정에서는 고객의 이름, 이메일을 사용하지 않기 때문에 굳이 저장할 필요가 없다.

최소 데이터 보관 원칙에 의해 꼭 필요한 column들만 저장을 하는 것이 좋다.

 

    override fun createCredit(credit: Credit): Mono<Void> {
        return creditRepository.save(credit).then()
    }
    override fun save(credit: Credit): Mono<Credit>{
        return reactiveRedisTemplate.opsForValue().set(
            "${redisPrefix}${credit.customerId.id}",
            objectMapper.writeValueAsString(creditToCreditEntity(credit))
        ).thenReturn(credit)
    }

 

그리고는 조회가 빠른 Redis에 저장을 해두고 주문이 발생하면, Customer 서버에 데이터를 요청하지 않고 빠르게 주문을 수행할 수 있도록 한다.

반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지

Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

기존의 Saga 패턴에서 더 강화된 내용이라고 생각하면 될 것이다.

 

Saga 패턴은 긴 트랜잭션을 짧은 트랜잭션으로 나누고 process와 rollback을 사용하여 하나씩 나아가는 구조였다.

 

여기서 이벤트를 전송하고, 변경된 내용을 데이터베이스에 저장하게 되는데 만약 이벤트를 전송하고 데이터베이스에서 에러가 발생해 이벤트만 전송되게 된다면 문제가 발생할 수 있다.

 

이런 문제를 해결하기 위해 사용하는 패턴이다.

방법은 먼저 데이터베이스에 변경사항들을 저장하고, 스케줄러를 사용해 한 번에 이벤트를 전송하는 것이다.

이 때 변경사항들은 기존의 데이터베이스가 아닌, 이벤트를 위한 별도의 저장공간을 만들게 된다.

이곳에 보낼 데이터를 미리 저장해두고 나중에 보내기 때문에 보낼 편지함(Outbox)패턴이라고 불리게 된다.

그리고 모든 사항이 완료된 Outbox 데이터들은 데이터베이스의 최적화를 위해 스케줄러를 사용해서 지속적으로 삭제해준다.

 

이러한 방법을 MSA에서 Outbox 패턴이라고 한다.

 

이렇게 별도의 데이터베이스에 저장해두고 한 번에 보내게되며, 실제 서비스에서는 1~2초의 간격으로 스케줄러를 실행한다고 한다.

 

우선 아래에는 직접 작성한 Outbox 패턴으로 설명해보겠다.

 

@Component
class PaymentOutboxScheduler(
    private val paymentOutboxHelper: PaymentOutboxHelper,
    private val paymentRequestMessagePublisher: PaymentRequestMessagePublisher
): OutboxScheduler {

    private val logger = LoggerFactory.getLogger(PaymentOutboxScheduler::class.java)

    @Transactional
    @Scheduled(fixedDelay = 10000, initialDelay = 10000)
    override fun processOutboxMessages() {
        logger.info("결제를 요청하는 스케줄러가 실행되었습니다.")
        paymentOutboxHelper.getPaymentOutboxMessageByOutboxStatusAndOrderStatus(
            OutboxStatus.STARTED,
            listOf(OrderStatus.PENDING, OrderStatus.CANCELLING)
        ).publishOn(Schedulers.boundedElastic()).map{
            paymentOutboxMessage: PaymentOutboxMessage ->
            if(paymentOutboxMessage.payload.orderStatus == OrderStatus.CANCELLING) {

                paymentOutboxMessage.payload.paymentOrderStatus = PaymentOrderStatus.CANCELLING
            }
            paymentRequestMessagePublisher.publish(
                paymentOutboxMessage = paymentOutboxMessage,
                callback = ::updateOutboxStatus
            ).subscribe()
        }.subscribe()
    }

    private fun updateOutboxStatus(paymentOutboxMessage: PaymentOutboxMessage, outboxStatus: OutboxStatus): Mono<Void> {
        paymentOutboxMessage.outboxStatus = outboxStatus
        return paymentOutboxHelper.save(paymentOutboxMessage).then()
    }
}

 

주문 서버에서 결제 서버로 결제를 요청하는 과정이다.

우선 주문이 발생하면 Outbox의 상태가 Start인 값들만 조회한다.

 

처음에 Outbox에 Start로 저장을 하기 때문에 한번도 전송된 적이 없는 데이터를 불러오는 것이다.

 

그렇게 조회된 모든 데이터를 모두 publisher로 전송을 하며, 전송을 하면 callback을 사용해 Outbox의 Status를 Complete로 변경해준다.

 

publisher의 내용이다.

override fun publish(
        paymentOutboxMessage: PaymentOutboxMessage,
        callback: (PaymentOutboxMessage, OutboxStatus) -> Mono<Void>
    ): Mono<Void> {
        return mono{

            val paymentEventPayload = paymentOutboxMessage.payload

            logger.info("{} 주문에 대한 이벤트 전송을 준비 중입니다.", paymentEventPayload.orderId.toString())

            val paymentRequestAvroModel = paymentEventPayloadToPaymentRequestAvroModel(paymentEventPayload)

            reactiveKafkaProducer.send(
                paymentRequestTopic,
                paymentEventPayload.orderId.toString(),
                paymentRequestAvroModel
            ).publishOn(Schedulers.boundedElastic()).map{
                callback(paymentOutboxMessage, OutboxStatus.COMPLETED).subscribe()
            }.doOnError{
                callback(paymentOutboxMessage, OutboxStatus.FAILED).subscribe()
            }.subscribe()

            logger.info("{}의 주문이 메시지 큐로 결제 요청을 위해 전송되었습니다", paymentEventPayload.orderId.toString())

        }.then()
    }

 

이렇게 해당 데이터를 model로 변환하여 kafka로 전송을 하고, 전송 상태의 여부에 따라 callback을 사용하여 outbox의 상태를 변환한다.

 

결제 서버의 내용은 작성하지 않겠지만, 결제 서버에서도 수신 받은 내용에 따라 내용을 처리하고 kafka로 전송할 데이터를 outbox에 저장해주면 된다.

 

이제 여기서 문제가 생기게 된다.

 

스케줄러를 사용하기 때문에 특정 시간에만 동기화가 이루어지게 되며, 해당 스케줄러가 동작하는 시간에만 CPU의 사용량이 늘어나게 된다.

 

이러한 방법을 해결하기 위해 마지막으로 CDC 패턴을 사용한다고 한다.

CDC와 관련된 내용은 CQRS 다음에 작성해보도록 하겠다.

반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지

Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

MSA에서 제일 중요한 부분 중 하나라고 생각한다.

현재 개발 중인 서버의 구조이다.

 

  1. 클라이언트가 주문을 하면, 주문 서버의 API를 사용해 주문을 하게 된다.
  2. 주문 서버만을 사용해 주문을 완료할 수 없으니, 우선 고객의 잔액을 확인하기 위해 payment 서버로 메시지를 보내게 된다.
  3. 결제 서버는 금액과 잔액을 확인한 후에, 결제가 가능하다면 금액을 뺀 잔액을 저장하고 주문 서버로 응답 메시지를 보내게 된다.
  4. 주문 서버는 결제가 가능하다면, 식당 서버로 메시지를 보내 해당 식당의 상태와 메뉴를 확인한다.
  5. 식당 서버는 해당 메시지를 확인 한 후 그에 맞는 메시지를 주문 서버에게 응답한다.
  6. 해당 메시지를 확인 한 후, 결과를 데이터베이스에 저장한다.

 

이런 과정을 통해 결제가 진행된다.

늘 그렇듯 이런 과정에 transaction 처리를 해야한다.

하지만 다른 서버간에 메시지를 보내는 과정에서 어떻게 transaction 처리를 할 수 있을까?

이럴 때 사용하는 것이 saga 패턴이다.

 

Saga 패턴마이크로서비스 아키텍처에서 분산 트랜잭션을 관리하는 방법 중 하나다.

각 서비스가 개별적으로 트랜잭션을 수행하고, 트랜잭션 간 일관성을 유지하기 위해 보상 작업(rollback)이나 이벤트 체인을 활용하는 방식을 말한다.

 

즉 해당 서버로부터 응답을 받고, 그 결과에 따라 작업을 process 할지 rollback 할지 결정하는 것이다.

 

우선 현재 서비스를 분석해보면, 다음과 같은 실패가 존재한다.

  1. 결제 과정에서 잔액이 부족해 결제를 실패하는 경우
  2. 결제는 성공했지만, 식당에서 문제가 있어 주문이 실패하는 경우

1번은 잔액을 보존하고, 주문 서버로 실패 이벤트만 전송하면 되지만 2번은 아니다.

2번은 식당 서버에서 실패 이벤트를 받으면, 결제 서버로도 실패 이벤트를 전송해 잔액을 복구해야 한다.

 

우선 서비스에 적용을 해보도록 하자.

Saga의 적용을 위해 아래와 같은 인터페이스를 생성한다.

interface SagaStep<T, SuccessEvent: DomainEvent<*>, FailEvent: DomainEvent<*>> {

    fun process(data: T): Mono<SuccessEvent>
    fun rollback(data: T): Mono<FailEvent>
}

여기서 DomainEvent는 common 모듈에서 각각의 이벤트를 위해 상속받아 구현한다.

 

결제와 관련된 Saga는 다음과 같이 구현한다.

@Component
class OrderPaymentSaga(
    private val orderDomainService: OrderDomainService,
    private val orderRepository: OrderRepository
): SagaStep<PaymentResponse, OrderPaidEvent, EmptyEvent> {

    private val logger = LoggerFactory.getLogger(OrderPaymentSaga::class.java)

    override fun process(data: PaymentResponse): Mono<OrderPaidEvent> {
        logger.info("주문 {}의 상태를 결제완료로 변경합니다", data.id)
        return orderRepository.findById(ObjectId(data.id))
            .flatMap{
                val orderPaidEvent = orderDomainService.payOrder(order = it)
                orderRepository.save(it)
                    .thenReturn(orderPaidEvent)
            }.doOnNext{
                logger.info("주문 {}의 상태가 결제완료로 변경되어 저장되었습니다.", it.order.orderId.id)
            }
    }

    override fun rollback(data: PaymentResponse): Mono<EmptyEvent> {
        logger.info("주문 {}의 상태를 취소로 변경합니다.", data.id)
        return orderRepository.findById(ObjectId(data.id))
            .flatMap {
                orderDomainService.cancelOrder(order = it)
                orderRepository.save(it)
                    .thenReturn(EmptyEvent())
            }.doOnNext{
                logger.info("주문 {}의 상태가 취소로 변경되어 저장되었습니다.", data.id)
            }
    }
}

 

주문이 실패한다면, 해당 주문만 실패로 데이터베이스에 저장해준다.

주문이 성공한다면, 해당 주문을 결제성공으로 데이터베이스에 저장하고 식당 서버로 승인 요청 이벤트를 전송한다.

 

승인과 관련된 Saga이다.

@Component
class RestaurantApprovalSaga(
    private val orderDomainService: OrderDomainService,
    private val orderRepository: OrderRepository,
    private val orderCancelledPaymentRequestMessagePublisher: OrderCancelledPaymentRequestMessagePublisher
): SagaStep<RestaurantApprovalResponse, EmptyEvent, OrderCancelledEvent> {

    private val logger = LoggerFactory.getLogger(RestaurantApprovalSaga::class.java)

    @Transactional
    override fun process(data: RestaurantApprovalResponse): Mono<EmptyEvent> {
        logger.info("{} 주문이 승인 완료되었습니다", data.id)

        return orderRepository.findById(ObjectId(data.id))
            .flatMap {
                orderDomainService.approveOrder(order = it)
                logger.info("바뀐거: $it")
                orderRepository.save(it)
                    .thenReturn(EmptyEvent())
            }.doOnNext{
                logger.info("{} 주문이 승인 완료되어 저장되었습니다", data.id)
            }
    }

    @Transactional
    override fun rollback(data: RestaurantApprovalResponse): Mono<OrderCancelledEvent> {
        logger.info("{} 주문이 미승인 되었습니다", data.id)
        return orderRepository.findById(ObjectId(data.id))
            .flatMap {
                val orderCancelledEvent = orderDomainService.cancelOrderPayment(it)
                orderRepository.save(it)
                    .thenReturn(orderCancelledEvent)
            }.doOnNext{
                logger.info("{} 주문이 취소 중 상태로 저장되었습니다", it.order.orderId.id.toString())
            }.doOnNext{
                orderCancelledPaymentRequestMessagePublisher.publish(it)
            }.doOnNext {
                logger.info("{} 주문의 결제 취소 이벤트를 전송했습니다.", it.order.orderId.id.toString())
            }
    }
}

 

주문이 승인되었다면, 해당 주문만 승인 완료로 데이터베이스에 저장해주면 된다. (이미 결제 서버에서 결제는 완료되었기 때문에)

주문이 실패했다면, 결제 서버의 잔액을 복구시켜야 하기 때문에 결제서버로 실패 이벤트를 전송해준다.

 

이런 식으로 메시지를 받을 때마다, process 할건지 rollback 할건지 결정해서 다음 프로세스를 진행하면 된다.

그리고 당연히 메시지를 받고 보내는 과정까지의 해당 서버는 transaction 하게 동작해야 한다.

 

enum class OrderStatus {
    PENDING, PAID, APPROVED, CANCELLING, CANCELLED
}

 

주문의 상태는 다음과 같다.

PENDING: 주문이 막 생성된 상태

PAID: 결제가 완료된 상태 (Order -> Payment -> Order로 성공 이벤트를 응답받음)

APPROVED: 주문이 승인된 상태 (Order -> Payment -> Order -> Restaurant -> Order로 성공 이벤트를 응답받음)

CANCELLING: 승인이 취소된 상태 (Order -> Payment -> Order -> Restaurant -> Order로 실패 이벤트를 응답받음)

CANCELLED: 모든 취소가 완료된 상태

 

해당 상태들을 데이터베이스에 저장해가며, 장기 transaction을 메시지를 응답받고 보내는 과정으로 분해하여 transaction을 적용하며 saga패턴을 적용하면 된다.

 

해당 서버 내에서 transaction을 적용하는 것은 어렵지 않겠지만, 이런 프로세스를 이해 할 수 있도록 saga 패턴을 적용할 서버를 제대로 분석하는 것이 필요하다고 생각한다.

'MSA' 카테고리의 다른 글

MSA에 CQRS 패턴 적용하기  (0) 2025.03.01
MSA에 Outbox 패턴 적용하기  (0) 2025.02.28
DDD에서 Hexagonal Architecture로 변경하기  (0) 2025.02.20
Spring + Kafka에서 avro 사용하기  (1) 2025.02.15
DDD의 핵심 요소  (0) 2025.02.14
반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지

Webflux 환경에서 MSA의 Saga, Outbox, CQRS를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

기존에 사용하던 Architecture는 보통 DDD였다.

물론 DDD도 굉장히 좋은 설계이지만, 다음과 같은 문제가 있었다.

 

 

보통 이렇게 개발이 되는데, 여기서 가장 핵심인 부분은 Domain Layer이다.

다른 Layer에 영향을 최대한 받지 않고, 독립적으로 존재할 수 있어야 가장 좋을 것이다.

하지만 지금은 Data Layer에 종속이 되어 있으며, Data Layer가 변경이 될 때마다 Domain Layer도 수정이 되어야 한다는 문제가 생긴다.

 

그렇기 때문에 이런 구조를 다음과 같이 변경해보려고 한다.

Domain Layer에서 사용할 Data Layer의 인터페이스만 만들어두고, 해당 인터페이스는 Data Layer에서 하는 것이다.

Domain Layer에서 사용하는 인터페이스를 포트라고 하고, 해당 인터페이스를 구현하는 객체를 어뎁터라고 한다.

 

이렇게 Domain Layer에서 포트를 만들고, Data Layer에서 어뎁터로 연결하는 방식을 사용하여 Domain Layer의 독립성을 높이는 방법이 Hexagonal Architecture이다.

 

 

이런 방법으로 다른 Layer들이 Domain Layer에 의존하도록 한다.

 

예를 들어보자면, 결제와 관련된 서비스를 개발하고 있다.

 

여기서 가장 독립적인 모듈은 domain인 payment-domain이다.

이런 식으로 포트를 만든다.

여기서 포트는 입력으로 들어오는 포트와 나는 출력 포트가 있다.

 

입력으로 들어오는 포트는 domain layer에서 기존처럼 개발 할 수 있다.

어차피 상위 layer에서 해당 포트를 사용하는 구조이기 때문이다.

 

출력으로 나가는 포트는 여기서 구현하는 것이 아닌, 인터페이스만 만들어두고 해당 인터페이스를 사용해서 domain layer를 개발한다.

이런 식으로 인터페이스만 만들어 사용한다.

 

해당 인터페이스의 구현은 외부 persistence 모듈이다.

 

이런 식으로 어뎁터 클래스를 만들고

dependency injection을 통해 외부에서 구현한 repository를 주입해준다.

message와 관련한 kafka도 데이터베이스와 같다.

 

이런 식으로 설계한다면, domain layer는 어떤 모듈도 의존하지 않는 가장 독립적인 상태로 개발이 가능하다.

'MSA' 카테고리의 다른 글

MSA에 CQRS 패턴 적용하기  (0) 2025.03.01
MSA에 Outbox 패턴 적용하기  (0) 2025.02.28
MSA에 SAGA 패턴 적용하기  (0) 2025.02.24
Spring + Kafka에서 avro 사용하기  (1) 2025.02.15
DDD의 핵심 요소  (0) 2025.02.14
반응형

MSA에서 굉장히 많이 사용하는 Kafka를 사용하려고 한다.

 

kafka에서는 이벤트를 주고 받을 때, 넘기는 데이터를 class로 정의해야 하는데 이것을 avro를 사용해서 정의해보려고 한다.

 

avro를 사용해서 resource 폴더에 json으로 포멧을 작성하고, avro를 실행하면 java의 클래스로 파일들이 생성되게 된다.

 

우선 gradle에 avro를 추가하자

 

build.gradle.kts를 사용했다.

import com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask

plugins {
    kotlin("jvm") version "1.9.25"
    id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.apache.avro:avro:1.12.0")
}

tasks.test {
    useJUnitPlatform()
}

kotlin {
    jvmToolchain(17)
}

avro {
    setCreateSetters(false)
}

val generateAvro:TaskProvider<GenerateAvroJavaTask> = tasks.register("generateAvro", GenerateAvroJavaTask::class.java) {
    source("src/main/resources/avro")
    setOutputDir(file("src/main/java"))
    stringType.set("String")
    enableDecimalLogicalType = true
}

tasks.named("compileJava").configure {
    dependsOn(generateAvro)
}

 

source로 src/main/resources/avro를 지정해주었다.

해당 폴더에 avsc 파일을 생성한 후에 generateAvro를 gradlew로 실행해주면 된다.

 

avsc 파일의 예시이다.

{
    "namespace": "seungkyu.food.ordering.kafka.order.avro.model",
    "type": "record",
    "name": "PaymentRequestAvroModel",
    "fields": [
        {
            "name": "id",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "sagaId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "customerId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "orderId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "price",
            "type": {
                "type": "long",
                "logicalType": "long"
            }
        },
        {
            "name": "createdAt",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
        },
        {
            "name": "paymentOrderStatus",
            "type": {
                  "type": "enum",
                  "name": "PaymentOrderStatus",
                  "symbols": ["PENDING", "CANCELLED"]
               }
        }
    ]
}

 

우선 namespace는 생성할 파일의 위치이다.

name으로 해당 클래스의 이름을 지정해준다.

 

fields이다.

name으로 해당 클래스에서 파라미터의 이름을 지정해준다.

type으로 class에서 사용할 타입과, kafka에서 사용할 타입을 명시해준다.

 

이제 gradle로 실행해보자.

지정한 패키지에 해당 클래스들이 생성되었다.

 

들어가서 확인해보니, 이렇게 직접 수정하지 말라고 하는 안내도 있었다.

 

이렇게 생성한 class로 kafka 이벤트를 주고 받을 수 있게 되었다.

'MSA' 카테고리의 다른 글

MSA에 CQRS 패턴 적용하기  (0) 2025.03.01
MSA에 Outbox 패턴 적용하기  (0) 2025.02.28
MSA에 SAGA 패턴 적용하기  (0) 2025.02.24
DDD에서 Hexagonal Architecture로 변경하기  (0) 2025.02.20
DDD의 핵심 요소  (0) 2025.02.14
반응형

MSA를 알기 위해서는 우선 DDD를 먼저 알아야 한다고 한다.

DDD는 Domain Driven Design으로 도메인이라는 영역?, 집합?을 기준으로 아키텍처를 설계하는 것을 말한다.

 

DDD의 핵심 요소로는 Entity, Value Object, Aggregate, Aggregate Root, Domain Event, Domain Service, Application Service가 있으며 하나씩 알아보도록 할 것이다.

 

  • Entity

고유한 식별자로 구분되는 도메인 객체를 말한다.

객체의 Equal을 구분하는 방법은 파라미터, 참조를 확인하는 게 아니라 고유한 ID가 같은지 확인한다.

내부의 값들은 변경이 가능하며, 이 Entity 자체에 로직을 추가하여 사용한다.

data class User(
    val id: UUID,
    var name: String,
    var email: String
) {
    fun updateEmail(email: String) {
        this.email = email
    }
}

 

이런 식으로 id를 활용하여 동일성을 판단하고, updateEmail과 같은 함수들을 포함한다.

  • Value Object(값 객체)

값 자체로 동등성을 비교하는, id가 없는 객체이다.

값들을 전체 비교하여 동일한 객체로 간주한다.

내부의 값은 변경할 수 없으며, 필요하다면 객체를 새로 생성하여 사용해야 한다.

 

data class Address(
    val city: String,
    val street: String,
    val postalCode: String
)

 

이렇게 Id가 존재하지 않으며, parameter들만 동일하다면 동일한 객체이다.

 

  • Aggregate

관련된 Entity와 값 객체를 묶어서 일관성을 유지하는 군집을 말한다.

하나의 트랜잭션 단위이며, Aggregate Root를 사용해서만 외부 접근이 가능하다.

  • Aggregate Root

Aggregate 내의 최상위 Entity이며, Aggregate Root를 통해서만 Aggregate에 접근이 가능하다.

data class Order(
    val id: String,
    val customerId: String,
    var items: MutableList<OrderItem>,
    var status: OrderStatus
) {
    fun addItem(item: OrderItem) {
        items.add(item)
    }

    fun completeOrder() {
        status = OrderStatus.COMPLETED
    }
}

data class OrderItem(
    val productId: String,
    var quantity: Int,
    val price: Double
)

 

여기서 Order가 Aggregate Root, OrderItem이 Aggregate이며, OrderItem에 직접 접근하지 않고, Order의 메서드를 통해서만 접근하는 것을 볼 수 있다.

 

  • Domain Event

도메인 내에서 비즈니스적으로 중요한 이벤트를 명시하는 객체이다.

도메인 모델에서 비즈니스 이벤트를 명확하게 표현한다.

이벤트 발행 후 비동기로 데이터를 처리가능하다.

data class OrderCompletedEvent(
    val orderId: String,
    val completedAt: LocalDateTime
)

 

  • Domain Service

비즈니스 규칙을 처리하는 서비스이다.

Entity나 Value Object로 표현하기 어려운 비즈니스 로직을 담당하며, stateless 서비스이다.

class DiscountService {
    fun calculateDiscount(order: Order): Double {
        return if (order.items.size >= 5) 0.1 else 0.0
    }
}

 

Order에 넣기 애매한 비즈니스 규칙이기에 도메인 서비스로 분리한다.

도메인 엔티티와 상호작용하는 것을 볼 수 있다.

 

  • Application Service

비즈니스 흐름을 관리하는 서비스

유즈케이스를 중심으로 작업의 흐름을 제어하며, 도메인 서비스와 도메인 엔티티등을 호출해 작업을 조합한다.

상위 계층에서 외부 API 호출을 담당하며, 트랜잭션과 이벤트 발행의 책임이 있다.

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val discountService: DiscountService
) {
    @Transactional
    fun createOrder(customerId: String, items: List<OrderItem>): Order {
        val order = Order(
            id = UUID.randomUUID().toString(),
            customerId = customerId,
            items = items.toMutableList(),
            status = OrderStatus.PENDING
        )
        // 할인 적용 (도메인 서비스 사용)
        val discount = discountService.calculateDiscount(order)
        if (discount > 0) {
            println("할인 적용: $discount")
        }

        // 주문 저장
        return orderRepository.save(order)
    }
}

 

도메인 서비스와 애플리케이션 서비스가 좀 헷갈리지만, 두 서비스는 호출하는 layer가 다른 것을 볼 수 있다.

'MSA' 카테고리의 다른 글

MSA에 CQRS 패턴 적용하기  (0) 2025.03.01
MSA에 Outbox 패턴 적용하기  (0) 2025.02.28
MSA에 SAGA 패턴 적용하기  (0) 2025.02.24
DDD에서 Hexagonal Architecture로 변경하기  (0) 2025.02.20
Spring + Kafka에서 avro 사용하기  (1) 2025.02.15

+ Recent posts