반응형

면접에서 나왔던 질문이지만, 내가 대답하지 못했던 부분에 대해서 알아보려고 한다.

 

우선 트랜잭션이 무엇인지, 왜 필요한지는 이미 안다고 생각한다.

 

그리고 당연히 트랜잭션의 레벨이 높아질수록, 동시성이 떨어지기 때문에 성능이 떨어진다.

하지만 그만큼 고립시켜서 동작하기 때문에 일관성은 높아진다.

 

우선 크게

Read Uncommitted, Read Committed, Repeatable Read, Serializable

이렇게 4가지로 나뉘며, 오른쪽으로 갈수록 레벨이 높아진다.

 

  • Read Uncommitted

트랜잭션이 처리중이거나, 아직 commit되지 않은 데이터를 다른 트랜잭션이 읽는 것을 허용한다.

 

 

이렇게 그냥 다른 트랜잭션에서 처리를 하고 있더라도, 데이터를 그냥 조회해서 값을 가져온다.

하지만 조회 후, 데이터가 rollback 되어 버리면 부정확한 데이터를 가져오게 된다.

 

이런 Dirty Read가 발생할 수 있기 때문에, 정확도가 너무 낮아 표준에서 인정하지 않는 격리 수준이라고 한다.

 

  • Read Committed

커밋된 데이터만 조회할 수 있도록 한다.

그렇기에 다른 트랜잭션이 접근 할 수 없어 대기하게 된다.

그렇기에 commit 되기 전의 데이터를 읽는 Dirty Read는 발생하지 않게 된다.

 

하지만, 하나의 트랜잭션에서 다른 트랜잭션의 commit에 따라 데이터의 조회 결과가 달라지는 Non-Repeatable Read 문제가 생기게 된다.

그렇지만 자주 발생하는 문제는 아니며, 대부분의 기본 설정이 이 Read Commited라고 한다.

 

  • Repeatable Read

보통의 RDBMS는 변경 전의 데이터를 Undo 공간에 백업해둔다.

그렇기에 트랜잭션의 번호를 확인해, 자신보다 더 늦은 트랜잭션의 번호가 존재한다면 이 Undo 공간에서 데이터를 조회하게 된다.

그렇기 때문에, 다른 트랜잭션에 의해 변경되더라도 동일한 데이터를 조회할 수 있게 된다.

 

하지만 조회 트랜잭션동안 새로운 데이터가 추가되는 것을 감지하지 못하는 Phantom Read가 발생할 수도 있다고 한다.

그래도 이 Phantom Read는 트랜잭션의 번호를 확인해 무시하는 방법으로 해결 할 수 있으며, 현재의 RDBMS에서는 특정 상황이 아니라면 대부분 발생하지 않는 현상이기에 크게 신경쓰지는 않아도 된다고 한다.

 

  • Serializable

마지막은 가장 강력한 Serializable 레벨이다.

그냥 모든 트랜잭션을 순차적으로 실행시키기에 위에 소개했던 모든 정합성 관련 문제들이 발생하지 않는다.

하지만 동시성이 없어지기에 성능이 매우 떨어진다.

극단적으로 안정성이 필요한 경우가 아니라면 잘 사용하지 않는다고 한다.

반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리

Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

저번 포스팅에서 debezium 컨테이너를 생성했었다.

 

이제 이 debezium을 통해 kafka topic을 만들어보자.

 

늘 그렇듯, 공식문서를 보면서 한다.

https://debezium.io/documentation/reference/stable/connectors/mongodb.html

 

Debezium connector for MongoDB :: Debezium Documentation

A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value

debezium.io

 

 

해당 항목을 참고하여 Debezium으로 http 요청을 보낸다.

 

  1.  그냥 Debezium에서 사용할 이름이다. 토픽이랑 비슷하게 맞추어서 생성하면 된다.
  2. connector.class는 mongodb를 사용하기 때문에 저 내용 그대로 넣어주면 된다. 데이터베이스마다 다 다르니, 공식문서..를 찾아 들어가서 넣어주면 된다.
  3. Mongodb 연결 주소이다. 당연히 비밀번호와 계정이 있다면 넣어주어야 하고, replicaSet의 정보도 주어야 한다.
  4. 토픽 이름의 prefix이다. 저기에 지정한 prefix에 따라 토픽의 이름이 생성된다. prefix가 A이고, B 데이터베이스의 C 컬렉션이면 토픽의 이름은 A.B.C로 생성이 된다.
  5. 변화를 감지할 데이터베이스이다. B 데이터베이스의 C 컬렉션이면 이 곳에는 B.C로 넣어주면 된다.

 

일단, debezium을 통해 토픽을 작성하는데 필요한 필수정보는 끝났다.

만약 더 추가할 내용이 있다면 공식문서를 통해 추가하도록 하자.

 

이제 POST로 debezium에게 요청하면 된다.

예시를 보면

POST http://localhost:8083/connectors/
Content-Type: application/json

{
  "name": "order-payment-request-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.connection.string": "",
    "mongodb.authSource": "admin",
    "collection.include.list": "orders.payment_outboxes",
    "topic.prefix": "debezium",
    "tombstones.on.delete": "false"
  }
}

 

이렇게 요청하면 된다.

나는 Intellij Http를 통해 요청했다.

 

요청을 보내고 kafka-ui에 보면

connect-status 토픽에 

이런 식으로 무슨 내용이 와있다.

이러면 설정이 된것이다.

 

만약 모든 topic을 조회하고 싶으면

GET http://localhost:8083/connectors/

 

해당 uri로 요청하면 된다.

그러면 모든 debezium의 모든 토픽이 응답된다.

 

만약 토픽을 삭제하고 싶다면

DELETE http://localhost:8083/connectors/{이름}

 

여기로 DELETE 요청을 보내면 된다.

 

이렇게 내가 만든 이름으로 topic이 생성된 것을 볼 수 있다.

 

이번에는 여기까지만 하고, 다음에는 해당 토픽으로 publish 해보고 subscribe 해보도록 하자.

반응형

https://github.com/Seungkyu-Han/micro_service_webflux

 

GitHub - Seungkyu-Han/micro_service_webflux: Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리

Webflux 환경에서 MSA의 Saga, Outbox, CQRS, CDC를 연습해보기 위한 리포지토리입니다. - Seungkyu-Han/micro_service_webflux

github.com

 

우선 CDC는 change data capture라는 뜻으로, 기존의 outbox는 데이터베이스에서 start 상태를 조회해서 kafka로 publish 했다.

 

이 방법의 문제는 스케줄러에 의해, 해당 시간에만 전송이 이루어지기 때문에 오랜 시간이 지나야 동기화가 진행된다.

이 방법을 해결하기 위해 데이터베이스의 트랜잭션을 감지해, create update delete의 연산이 일어나면 감지해서 kafka로 이벤트를 전송하게 된다.

 

우선 참고한 기술블로그들을 소개하겠다.

https://techblog.woowahan.com/10000/

 

CDC 너두 할 수 있어(feat. B2B 알림 서비스에 Kafka CDC 적용하기) | 우아한형제들 기술블로그

"어 이거 CDC 적용하면 딱이겠는데요? 한번 CDC로 해보면 어때요?" B2B 알림서비스 기획 리뷰 도중 제안받은 의견입니다. 저는 이때까지만 해도 CDC가 무엇인지 잘 모르는 상태였지만, 저 의견 덕분

techblog.woowahan.com

 

우선 굉장히 어렵다...

 

솔직히 블로그를 작성하는 지금까지 완벽하게 성공하지는 못한 것 같다.

그래도 차근차근 글을 작성하며 익혀보도록 하겠다.

 

우선 docker로 debezium 컨테이너를 만들어보자.

 

사용한 debezium의 docker-compose.yml이다.

 

services:
  debezium:
    container_name: debezium
    image: debezium/connect:2.7.3.Final
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: "connect-config"
      OFFSET_STORAGE_TOPIC: "connect-offsets"
      STATUS_STORAGE_TOPIC: "connect-status"
      BOOTSTRAP_SERVERS: kafka:29092
      LOGGING_LEVEL: "DEBUG"
      CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE: avro
      KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    volumes:
      - "./debezium:/kafka/connect/debezium-connector-schemaregistry-7.2.6"
    networks:
      - seungkyu

networks:
  seungkyu:
    driver: bridge
    name: seungkyu
    external: true

 

네트워크를 kafka와 같은 네트워크로 설정하고, kafka의 주소를 BOOTSTRAP_SERVERS로 지정하면 된다.

 

avro를 사용하기 때문에 schema를 avro로 지정해주고, converter로 avroconverter로 지정을 해준다.

 

그리고 여기서는 debezium-connector-schemaregistry의 버전에 따라 jar 파일들을 넣어주어야 한다.

 

https://debezium.io/documentation/reference/stable/configuration/avro.html

 

Avro Serialization :: Debezium Documentation

Version: |

debezium.io

 

해당 주소에 가져와야 할 파일들이 적혀있다.

 

여기에 해당하는 jar 파일들을 버전에 맞추어 mount한 디렉토리에 저장해주면 된다.

 

또한, 이 중에서도

 

avro와 guava에 해당하는 jar 파일도 maven repository에서 찾아 디렉토리에 넣어주어야 한다.

 

그렇게 하면 일단, 시작!은 할 수 있게 된 것이다.

반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

이제 Stomp를 사용한 채팅 서버의 구현이다.

 

기존에 개발했던 채팅은 Websocket을 사용해서 소켓을 연결시켜두고, 거기에 Flux로 데이터를 전송해주었던 걸로 기억난다.

클라이언트로부터 데이터를 받을 수 있는 SSE 느낌으로 말이다.

 

이번에는 Stomp 라이브러리를 사용해 제대로 채팅을 구현해보려 한다.

 

Stomp는 우선 socket에 연결하고, subscribe하는 주소에 구독해 메시지를 받고 publish하는 주소로 메시지를 보낸다고 한다.

그래서 다른 방법으로는 테스트가 어렵고 저번에 작성했던 글의 방법으로 테스트해야 한다.

 

https://jiangxy.github.io/websocket-debug-tool/

 

WebSocket Debug Tool

 

jiangxy.github.io

 

해당 사이트로 설명을 해보자면

저렇게 url에 소켓을 연결하고 stomp subscribe에 서버에서 설정한 subscribe destination을 구독하면 해당 채팅방에서 생성된 메시지들이 실시간으로 전달된다.

사실 그냥 destication을 구독하는게 아니라 endpoint가 /subscribe면 뒤에 /subscribe/{chatGroupId} 이런 식으로 뒤에 채팅방의 정보가 있어야 한다.

 

마찬가지로 send destination을 통해 메시지를 보내며 이 때도 /publish/{chatGroupId}와 같이 채팅방의 정보가 추가되어야 한다.

헤더를 추가할 수도 있으며, 이거는 그냥 연결해두고 Rest API를 호출한다고 생각하면 편할것이다.

 

그럼 일단 Stomp를 사용하기 위해 추가해야 하는 라이브러리이다.

implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation ("org.springframework.boot:spring-boot-starter-websocket")

 

이렇게 websocket과 webflux만 추가해주면 된다고 한다.

나는 기존에 webflux를 사용하고 있어 websocket만 추가해주었다.

 

이제 stomp의 config다

@Configuration
@EnableWebSocketMessageBroker
class ChatConfig: WebSocketMessageBrokerConfigurer {

    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        registry.addEndpoint("/websocket/v1/chat")
            .setAllowedOriginPatterns("*")
            .withSockJS()
        registry.addEndpoint("/websocket/v1/chat")
            .setAllowedOriginPatterns("*")
    }

    override fun configureMessageBroker(registry: MessageBrokerRegistry) {
        registry.enableSimpleBroker("/sub")
        registry.setApplicationDestinationPrefixes("/pub")
    }
}

 

우선 이런 식으로 작성하고 하나씩 설명해보자면

 

registerStompEndpoints는 웹소켓의 endpoint를 설정한다.

registry.addEndpoint에 해당 웹소켓의 end point를 설정해준다.

만약 사용하는 서버가 localhost:8080이라면 end point는 ws://localhost:8080/websocket/v1/chat이 될 것 이다.

setAllowedOriginPatterns에는 해당 웹소켓에 접속 가능한 주소를 적는 것이다.

클라우드 서버에서 사용하기 때문에 "*"로 지정해주었다.

withSockJs는 자바스크립트 라이브러리 관련된 것이라고 하는데, 나는 일단 몰라서 모두 등록해두었다.

 

configureMessageBroker는 위에서 보았던 subscribe, publish로 사용할 destination을 등록해주면 된다.

 

이제 message controller를 작성해보자.

@RestController안에 @MessageMapping을 적어주면 된다.

    @MessageMapping("/{groupId}")
    fun publishChatMessage(
        @Parameter(hidden = true) @Header(HttpHeaders.AUTHORIZATION) accessToken: String,
        @DestinationVariable groupId: String,
        @RequestBody messageReqDto: MessageReqDto
    ): Mono<ResponseEntity<Void>> {
        return chatService.publishMessage(
            groupId = groupId,
            userId = getUserIdFromToken(accessToken),
            message = messageReqDto.message
        ).then(Mono.fromCallable { ResponseEntity(HttpStatus.OK) })
    }

 

이렇게 원하는 Header를 가져올 수도 있고, dto를 정의해서 원하는 json 형식으로 값을 가져올 수 있다.

@DestinationVariable로 groupId를 가져왔으며, 해당 정보는 MongoDB에 그대로 저장한다.

 

이제 서버에서 클라이언트로 메시지를 보내는 방법이다.

simpMessageSendingOperations.convertAndSend(
    "/sub/${chatDocument.groupId}", messageDocumentToDto(chatDocument)

 

그냥 간단하게 SimpMessageSendingOperations에 convertAndSend로 구독하고 있는 채팅방으로 메시지 dto를 전송하면 된다.

 

해당 이벤트를 가져오는 방법은 전 포스팅에서 MongoDB changeStream 부분을 찾아보면 된다.

 

테스트 결과도 전 포스팅의 사진으로 대체하도록 하겠다.

 

Stomp로 처음 채팅방을 만들어 보았는데, 채팅방 관리가 굉장히 쉬웠다.

WebSocket만 사용해서 구현하는 거보다 난이도가 더 쉬운 것 같다.

반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

 

마지막 기능인 채팅 기능을 개발하던 중, 당연히 kafka를 써야겠다고 생각하고 있었지만... 그냥 갑자기 쓰기 싫어졌다.

우선 기존 MSA 환경에서 채팅 서버에서 kafka를 쓰는 이유는 다음과 같다.

(MSA 환경이라고 말하는게 맞나...? 그냥 해당 서버가 여러개이고 로드밸런서로 연결된 상황)

 

채팅 서버가 하나가 아니기 때문에, 위와 같은 상황에서는 A가 B에게 메시지를 전송할 수 없다.

따라서 서버 간에 이벤트(메시지 전송)을 주고 받기 위해, Kafka를 사용하는 것이다.

 

 

해당 상황에서 kafka는 다음과 같은 역할을 한다.

이렇게해서 모든 chat server에 메시지가 전달되고, B와 연결되어 있는 서버에서 메시지를 전송하게 되는 것이다.

 

근데, 이 상황이 마음에 들지는 않았다.

어차피 채팅을 저장하기 위해 MongoDB에 저장도 해야하는데, 이벤트를 전송하기 위해 kafka도 연결하니 메시지 전송 하나로 외부 리소스에 2번 접근하는 것이기 때문이다.

 

이 방법을 해결하기 위해 MongoDB의 changeStream을 사용하고자 한다.

ReactiveMongoTemplate을 사용했으며, MongoTemplate에서는 이 기능이 있는지는 잘 모르겠다.

 

 

우선 구현해보도록 하자.

        reactiveMongoTemplate.changeStream(ChatDocument::class.java)
            .listen()
            .doOnNext{
                item ->
                val chatDocument = item.body
                val operationType = item.operationType

                if(chatDocument != null && operationType == OperationType.INSERT) {
                    simpMessageSendingOperations.convertAndSend(
                        "/sub/${chatDocument.groupId}", messageDocumentToDto(chatDocument)
                    )
                }
            }.subscribe()

 

이렇게 changeStream({변화를 subscribe할 Document}).listen()

이 메서드로 해당 collection에 데이터의 삽입, 수정, 삭제 이벤트를 구독할 수 있다.

이러한 이벤트를 doOnNext로 처리하는 것이다.

 

우리는 채팅이 생성되는 Insert 이벤트만 사용할 것이다.

그렇기 때문에 listen의 Return에서 operationType을 가져오고 조건문을 사용해 INSERT인 경우에만 웹소켓에 메시지를 전송하도록 한다.

 

조건문 안의 코드는 Stomp 라이브러리를 사용해 사용자에게 메시지를 전송하는 부분이다.

다음 포스팅의 내용으로 다루도록 하겠다.

 

이제 2개의 서버를 다른 포트로 실행하고, 각각의 서버에 연결한 메시지가 정상적으로 오는지 확인해보자.

 

테스트는 아래의 페이지를 사용했다.

https://jiangxy.github.io/websocket-debug-tool/

 

WebSocket Debug Tool

 

jiangxy.github.io

stomp로 테스트가 가능하기 때문이다.

 

스프링 서버를 12044, 12045 포트에서 실행 해두었다.

 

그리고 이렇게 각각 같은 채팅방에 구독을 한 후, 메시지를 전송해보니(페이지가 화면을 줄이니 깨진다...)

이렇게 정상적으로 모두에게 전송되는 것을 볼 수 있었다.(토큰을 사용해서 중간 부분은 모자이크로 처리했다.)

 

이렇게 하니 Kafka없이도 MongoDB만 사용해 채팅서버를 구현할 수 있었다.

반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

 

우선 에러가 발생한 코드는 다음과 같다.

    override fun findByUserId(userId: ObjectId): Mono<PersonalScheduleDocument> {
        return reactiveRedisTemplate.opsForValue()
            .get("$redisPrefix$userId")
            .map{
                objectMapper.readValue(it, PersonalScheduleDocument::class.java)
            }
            .switchIfEmpty(
                personalScheduleMongoRepository.findByUserId(userId)
            )
            .publishOn(Schedulers.boundedElastic())
            .doOnSuccess {
                if (it != null)
                    reactiveRedisTemplate.opsForValue()
                        .set(
                            "$redisPrefix$userId",
                            objectMapper.writeValueAsString(it),
                            personalScheduleRedisTime
                            ).block()
            }
    }

 

이렇게 Redis를 먼저 조회하고, 만약 Redis에 값이 없으면 MongoDB를 조회하는 코드이다.

해당 코드를 이용하여 테스트를 하고, Redis에 값이 있으면 MongoDB를 호출하지 않을 것이라고 생각했다.

 

@Test
        @DisplayName("Redis에 조회하려는 값이 있는 경우")
        fun findByUserIdAndRedisHaveResultReturnSuccess(){
            //given
            val userId = ObjectId.get()
            val personalSchedule = PersonalScheduleDocument(
                userId = userId,
            )
            val personalScheduleToString = objectMapper.writeValueAsString(personalSchedule)
            `when`(reactiveRedisTemplate.opsForValue())
                .thenReturn(reactiveValueOperations)

            `when`(reactiveValueOperations.get("$redisPrefix$userId"))
                .thenReturn(Mono.just(personalScheduleToString))

            `when`(personalScheduleMongoRepository.findByUserId(userId))
                .thenReturn(Mono.just(personalSchedule))

            `when`(reactiveValueOperations.set(
                "$redisPrefix$userId",
                personalScheduleToString,
                personalScheduleRedisTime
            )).thenReturn(Mono.just(true))

            //when
            StepVerifier.create(personalScheduleRepositoryImpl.findByUserId(userId))
                .expectNextMatches { it.userId == personalSchedule.userId }.verifyComplete()

            //then

            verify(personalScheduleMongoRepository, times(0)).findByUserId(userId)


        }

 

이렇게 하고 테스트를 해보았는데

이렇게 MongoDB의 findByUserId가 호출되었다고 나온다.

 

내가 코드를 작성한 것이라고 생각해서 계속 수정해 보았지만, 고칠 수 없었고 구글링을 통해 답을 얻을 수 있었다.

 

그냥 원래 그렇다고 한다...

switchIfEmpty가 지금은 데이터베이스에 접근하고 있기 때문에 접근을 최대한 하지 않아야 한다.

 

그렇기 때문에 Mono.defer를 이용하여 호출을 최대한 미루기로 했다.(defer를 왜 사용하는지 잘 몰랐는데... 여기에서 사용하는구나)

 

override fun findByUserId(userId: ObjectId): Mono<PersonalScheduleDocument> {
        return reactiveRedisTemplate.opsForValue()
            .get("$redisPrefix$userId")
            .map{
                objectMapper.readValue(it, PersonalScheduleDocument::class.java)
            }
            .switchIfEmpty(
                Mono.defer{personalScheduleMongoRepository.findByUserId(userId) }
            )
            .publishOn(Schedulers.boundedElastic())
            .doOnSuccess {
                if (it != null)
                    reactiveRedisTemplate.opsForValue()
                        .set(
                            "$redisPrefix$userId",
                            objectMapper.writeValueAsString(it),
                            personalScheduleRedisTime
                            ).block()
            }
    }

 

이렇게 해당 코드를 Mono.defer{personalScheduleMongoRepository.findByUserId(userId)}로 감싸고 테스트를 해보니

 

이렇게 테스트가 잘 통과하는 것을 볼 수 있었다.

 

switchIfEmpty 다 수정하러 가야겠다...

반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

해당 프로젝트에 transaction을 걸어보려 한다.

 

우선 transaction 설정을 하는 이유부터 말해보려 한다.

 

프로젝트 중 다음과 같은 코드가 있다.

    override fun disconnectFriend(id: ObjectId, friendId: ObjectId): Mono<UserDocument> {
        return friendService.removeFriend(id, friendId)
            .flatMap {
                friendService.removeFriend(friendId, id)
            }

id에 있는 friend를 삭제하고, 그 다음으로 friend에 있는 id를 삭제하는 것이다.

서로의 친구 목록에서 서로를 삭제하는 메서드이다.

 

다음과 같은 상황을 가정해보자.

위의 코드가 성공한 다음에, 아래에서는 에러가 발생한 상황이다.

각각 userA, userB라고 하면 userA의 친구목록에는 userB가 없지만 userB의 친구목록에는 userA가 있는 모순된 상황이 생긴다.

이러면 데이터베이스의 무결성이 깨지게 되기 때문에, 아래 코드에서 error가 생기면 위의 코드를 Rollback 해줘야 한다.

 

보통 이런 경우에서 기존의 Spring MVC에서는 @transactional을 사용하게 된다.

하지만 찾아보니 webflux 환경에서는 다른 스레드에서 작동한다는 특성 때문에 @transactional이 적용되지 않는다고 하여 알아보고, 방법을 찾아보려 한다.

 

  • 1. 중간에 Error를 발생시키고 데이터베이스 모니터링 해보기

위의 코드를 그대로 사용하고, 첫번째 removeFriend는 성공 두번째 removeFriend는 Exception을 발생시켰다.

이런 경우에는 예상대로, 한쪽의 친구만 삭제되고 한쪽은 남아있게 될까?

 

removeFriend를 다음과 같이 작성하여, 짝수번 요청에서는 에러를 발생시켰다.

private var flag = false
override fun removeFriend(userId: ObjectId, friendId: ObjectId): Mono<UserDocument> {
        return userRepository.findById(userId)
            .flatMap {
                flag = !flag
                if(flag){
                    it.removeFriend(friendId)
                }
                else
                    Mono.error(FriendException(ErrorCode.NOT_FRIEND))
            }
            .flatMap{
                userRepository.save(it)
            }.onErrorMap{
                when(it){
                    is CantRequestToMeException -> FriendException(ErrorCode.CANT_REQUEST_TO_ME)
                    is NotFriendException -> FriendException(ErrorCode.NOT_FRIEND)
                    else -> it
                }
            }
    }

 

현재 데이터베이는 아래와 같은 상태이다.

서로 각자의 ObjectId를 가지고 있다.

 

Swagger는 발생한 에러대로 친구가 아니라는 에러가 발생했으며

 

데이터베이스를 보면 한쪽의 친구 요청만 끊어진 것을 볼 수 있다.

 

자 이제 여기에 transaction을 적용해보자.

 

  • 2. TransactionalOperator를 적용

org.springframework.transaction.reactive 라이브러리에서 제공하는 TransactionalOperator를 이용하는 것이다.

 

우선 다음과 같이 Config 파일을 만들어주고

@Configuration
class TransactionalOperatorConfig {

    @Bean
    fun transactionalOperator(transactionManager: ReactiveMongoTransactionManager): TransactionalOperator {
        return TransactionalOperator.create(transactionManager)
    }

    @Bean
    fun reactiveMongoTransactionManager(dbFactory: ReactiveMongoDatabaseFactory): ReactiveMongoTransactionManager {
        return ReactiveMongoTransactionManager(dbFactory)
    }
}

여기서 중요한 부분은 해당 데이터베이스가 replica 세팅이 되어 있어야 한다는 것이다.

 

사용한 mongodb의 docker-compose 파일은 아래에 작성해두도록 하겠다.

해당 도커의 컨테이너에서 쉘에 들어가

rs.initiate()

이거만 입력해주면 설정이 끝난다.

 

이렇게 설정을 해두고

transaction을 설정하고 싶은 부분에 다음과 같이 작성해주면 된다.

transactionalOperator.transactional(Mono or Flux)

 

나는 해당 코드를 다음과같이 작성했다.

return transactionalOperator.transactional(friendService.removeFriend(id, friendId)
            .flatMap {
                friendService.removeFriend(friendId, id)
            })

이렇게 removeFriend의 2개의 함수가 하나의 transaction 안에 들어왔다

 

이제 한 번 테스트를 해보자.

(데이터베이스를 재설정해서 방금과 id가 바뀌었다.)

 

이게 데이터베이스이고 다시 에러가 발생하는 API를 요청해보았다.

 

일단 당연히 응답은 에러이다.

 

데이터베이스도 확인을 해보니

다음과 같이 transaction이 적용된 것을 볼 수 있었다.

 

해당 방법을 이용하여 프로젝트에 transaction을 적용하려 한다.

 

  • 3. @Transactional annotation 사용

자 그러면 진짜 webflux에서 transcationl annotation이 작동하지 않을까?

어차피 코드를 만들어본 김에 테스트 해보려 한다.

 

코드를 이렇게 만들어서 테스트 해보았다.

    @Transactional
    override fun disconnectFriend(id: ObjectId, friendId: ObjectId): Mono<UserDocument> {
//        return transactionalOperator.transactional(friendService.removeFriend(id, friendId)
//            .flatMap {
//                friendService.removeFriend(friendId, id)
//            })
        return friendService.removeFriend(id, friendId)
            .flatMap {
                friendService.removeFriend(friendId, id)
            }

 

현재 데이터베이스는 아래와 같다.(사실 위와 같다..)

 

자 이제 서버를 재실행하고 다시 에러가 발생하는 API를 요청해보자.

 

일단 당연하게 응답은 같다.

 

자 이제 데이터베이스를 확인해보자.

이렇게... transaction 적용이 된 것을 볼 수 있다...하하

 

그래도 프로젝트에서는 2번 방법을 사용할 것 같다.

값을 비동기로 반환하기에... 하지만 왜 영어를 쓰시는 분들이 webflux에서 @transactional을 사용하지 말라는지는 더 알아봐야 할 거 같다...

 

이렇게 오랫동안 고민만 했던 transaction을 프로젝트에 적용할 수 있었다.

반응형

최근에 Redis를 공부하면서, Redis가 가지는 장점인 "접근 속도가 빠르다"를 활용하여 데이터베이스의 캐시를 만들어보려 한다.

 

사실 많이 사용할 것 같지는 않지만, 그래도 WebFlux와 Redis에 대해 더 잘 이해할 수 있을 거 같아서 한 번 시도해보고 정리하려 한다.

 

    implementation("io.lettuce:lettuce-core:6.5.0.RELEASE")
    implementation("org.springframework.boot:spring-boot-starter-data-mongodb-reactive")
    implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")
    implementation("org.springframework.boot:spring-boot-starter-webflux")

 

우선 위의 의존성들을 추가 해 준 후

 

사용한 Entity와 Repository는 다음과 같이 작성했다.

Entity

@Document("user")
data class User(
    @Id
    var id: ObjectId? = null,
    var email: String,
    var name: String,
    @CreatedDate
    var createdAt: LocalDateTime? = null,
    @LastModifiedDate
    var updatedAt: LocalDateTime? = null,
)

 

Repository

interface UserRepository: ReactiveMongoRepository<User, ObjectId> {
}

 

간단하게 userId를 검색하면 이메일이 나오는 서버를 만들어 보았다.

 

    @GetMapping("/users/{id}")
    fun getUserEmail(@PathVariable id: String): Mono<String> {

        return Mono.just("result")
    }

 

이렇게 /users/{id}의 주소로 요청하면 값이 반환되도록 작성했다.

 

우선 가장 처음으로 만든 코드는

    return redisTemplate.opsForValue().get("users:$id")
            .map{
                it
            }.switchIfEmpty {
                userRepository.findById(ObjectId(id)).map{
                    it.email
                }
            }

 

이렇게 작성했다.

redis를 먼저 살펴 본 후에 있으면 그 값을, 없으면 다시 데이터베이스에 접근하여 값을 가져온다.

 

일단 테스트를 해보면

Redis에 데이터가 있는 경우는

 

이렇게 Redis에서, Redis의 값을 지우면

이렇게 Mongodb에서 값을 가져오게 된다.

 

근데 여기서 든 생각은, 비동기를 사용하기 때문에 동시에 접근해서 먼저 온 값을 넣는 방법은 없을까? 였다.

이렇게 사용하는 방법은 만약 cache miss가 일어난다면, 바로 데이터베이스에 접근하는 것보다 못하기 때문이다.

 

그래서 Mono.firstWithSignal을 사용했다.

firstWithSignal은 먼저 값이 나오는 Mono를 사용하는 메서드이다.

 

최종으로는 이렇게 수정했다.

    @GetMapping("/users/{id}")
    fun getUserEmail(@PathVariable id: String): Mono<String> {

        val cacheMono = redisTemplate.opsForValue().get("users:$id")
            .mapNotNull {
                it
            }

        val dbMono = userRepository.findById(ObjectId(id)).map{
            it.email
        }

        return Mono.firstWithSignal(cacheMono,dbMono)
    }

 

이번에도 테스트를 해보면

Redis에 값이 있으면

이렇게 Redis에서 값을 가져오고

 

Redis에 값이 없다면

이렇게 Mongodb에서 값을 가져오는 것을 볼 수 있다.

반응형

Spring WebFlux로 프로젝트를 진행하던 중 unique index가 필요해서 다음과 같이 지정했었다.

@Document(collection = "Category")
data class CategoryDocument(
    @Id
    val id: ObjectId? = null,
    @Indexed(unique = true, name = "category_name")
    var name: String,

)

이렇게 @Indexed(unqie=true)로 설정하면 해당 column에 unique constraint가 적용되어야 한다.

 

하지만 같은 이름으로 계속 저장을 해봐도, 에러가 생기지 않았다.

 

문제의 원인을 찾아보니, MongoDB에서는 @Indexed로 지정하는 것 뿐만 아니라

Spring에서 MongoDB에 이런 제약 조건을 설정 할 수 있도록 application.yml에 다음과 같은 옵션을 추가해야 한다고 한다.

 

spring:
  data:
    mongodb:
      auto-index-creation: false

 

이렇게 설정을 하고 MongoDB에서 

db.Category.getIndexes()의 쿼리를 날려보니

 

이런 식으로 unique가 설정된 것을 볼 수 있었다.

+ Recent posts