반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

 

마지막 기능인 채팅 기능을 개발하던 중, 당연히 kafka를 써야겠다고 생각하고 있었지만... 그냥 갑자기 쓰기 싫어졌다.

우선 기존 MSA 환경에서 채팅 서버에서 kafka를 쓰는 이유는 다음과 같다.

(MSA 환경이라고 말하는게 맞나...? 그냥 해당 서버가 여러개이고 로드밸런서로 연결된 상황)

 

채팅 서버가 하나가 아니기 때문에, 위와 같은 상황에서는 A가 B에게 메시지를 전송할 수 없다.

따라서 서버 간에 이벤트(메시지 전송)을 주고 받기 위해, Kafka를 사용하는 것이다.

 

 

해당 상황에서 kafka는 다음과 같은 역할을 한다.

이렇게해서 모든 chat server에 메시지가 전달되고, B와 연결되어 있는 서버에서 메시지를 전송하게 되는 것이다.

 

근데, 이 상황이 마음에 들지는 않았다.

어차피 채팅을 저장하기 위해 MongoDB에 저장도 해야하는데, 이벤트를 전송하기 위해 kafka도 연결하니 메시지 전송 하나로 외부 리소스에 2번 접근하는 것이기 때문이다.

 

이 방법을 해결하기 위해 MongoDB의 changeStream을 사용하고자 한다.

ReactiveMongoTemplate을 사용했으며, MongoTemplate에서는 이 기능이 있는지는 잘 모르겠다.

 

 

우선 구현해보도록 하자.

        reactiveMongoTemplate.changeStream(ChatDocument::class.java)
            .listen()
            .doOnNext{
                item ->
                val chatDocument = item.body
                val operationType = item.operationType

                if(chatDocument != null && operationType == OperationType.INSERT) {
                    simpMessageSendingOperations.convertAndSend(
                        "/sub/${chatDocument.groupId}", messageDocumentToDto(chatDocument)
                    )
                }
            }.subscribe()

 

이렇게 changeStream({변화를 subscribe할 Document}).listen()

이 메서드로 해당 collection에 데이터의 삽입, 수정, 삭제 이벤트를 구독할 수 있다.

이러한 이벤트를 doOnNext로 처리하는 것이다.

 

우리는 채팅이 생성되는 Insert 이벤트만 사용할 것이다.

그렇기 때문에 listen의 Return에서 operationType을 가져오고 조건문을 사용해 INSERT인 경우에만 웹소켓에 메시지를 전송하도록 한다.

 

조건문 안의 코드는 Stomp 라이브러리를 사용해 사용자에게 메시지를 전송하는 부분이다.

다음 포스팅의 내용으로 다루도록 하겠다.

 

이제 2개의 서버를 다른 포트로 실행하고, 각각의 서버에 연결한 메시지가 정상적으로 오는지 확인해보자.

 

테스트는 아래의 페이지를 사용했다.

https://jiangxy.github.io/websocket-debug-tool/

 

WebSocket Debug Tool

 

jiangxy.github.io

stomp로 테스트가 가능하기 때문이다.

 

스프링 서버를 12044, 12045 포트에서 실행 해두었다.

 

그리고 이렇게 각각 같은 채팅방에 구독을 한 후, 메시지를 전송해보니(페이지가 화면을 줄이니 깨진다...)

이렇게 정상적으로 모두에게 전송되는 것을 볼 수 있었다.(토큰을 사용해서 중간 부분은 모자이크로 처리했다.)

 

이렇게 하니 Kafka없이도 MongoDB만 사용해 채팅서버를 구현할 수 있었다.

반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

 

우선 에러가 발생한 코드는 다음과 같다.

    override fun findByUserId(userId: ObjectId): Mono<PersonalScheduleDocument> {
        return reactiveRedisTemplate.opsForValue()
            .get("$redisPrefix$userId")
            .map{
                objectMapper.readValue(it, PersonalScheduleDocument::class.java)
            }
            .switchIfEmpty(
                personalScheduleMongoRepository.findByUserId(userId)
            )
            .publishOn(Schedulers.boundedElastic())
            .doOnSuccess {
                if (it != null)
                    reactiveRedisTemplate.opsForValue()
                        .set(
                            "$redisPrefix$userId",
                            objectMapper.writeValueAsString(it),
                            personalScheduleRedisTime
                            ).block()
            }
    }

 

이렇게 Redis를 먼저 조회하고, 만약 Redis에 값이 없으면 MongoDB를 조회하는 코드이다.

해당 코드를 이용하여 테스트를 하고, Redis에 값이 있으면 MongoDB를 호출하지 않을 것이라고 생각했다.

 

@Test
        @DisplayName("Redis에 조회하려는 값이 있는 경우")
        fun findByUserIdAndRedisHaveResultReturnSuccess(){
            //given
            val userId = ObjectId.get()
            val personalSchedule = PersonalScheduleDocument(
                userId = userId,
            )
            val personalScheduleToString = objectMapper.writeValueAsString(personalSchedule)
            `when`(reactiveRedisTemplate.opsForValue())
                .thenReturn(reactiveValueOperations)

            `when`(reactiveValueOperations.get("$redisPrefix$userId"))
                .thenReturn(Mono.just(personalScheduleToString))

            `when`(personalScheduleMongoRepository.findByUserId(userId))
                .thenReturn(Mono.just(personalSchedule))

            `when`(reactiveValueOperations.set(
                "$redisPrefix$userId",
                personalScheduleToString,
                personalScheduleRedisTime
            )).thenReturn(Mono.just(true))

            //when
            StepVerifier.create(personalScheduleRepositoryImpl.findByUserId(userId))
                .expectNextMatches { it.userId == personalSchedule.userId }.verifyComplete()

            //then

            verify(personalScheduleMongoRepository, times(0)).findByUserId(userId)


        }

 

이렇게 하고 테스트를 해보았는데

이렇게 MongoDB의 findByUserId가 호출되었다고 나온다.

 

내가 코드를 작성한 것이라고 생각해서 계속 수정해 보았지만, 고칠 수 없었고 구글링을 통해 답을 얻을 수 있었다.

 

그냥 원래 그렇다고 한다...

switchIfEmpty가 지금은 데이터베이스에 접근하고 있기 때문에 접근을 최대한 하지 않아야 한다.

 

그렇기 때문에 Mono.defer를 이용하여 호출을 최대한 미루기로 했다.(defer를 왜 사용하는지 잘 몰랐는데... 여기에서 사용하는구나)

 

override fun findByUserId(userId: ObjectId): Mono<PersonalScheduleDocument> {
        return reactiveRedisTemplate.opsForValue()
            .get("$redisPrefix$userId")
            .map{
                objectMapper.readValue(it, PersonalScheduleDocument::class.java)
            }
            .switchIfEmpty(
                Mono.defer{personalScheduleMongoRepository.findByUserId(userId) }
            )
            .publishOn(Schedulers.boundedElastic())
            .doOnSuccess {
                if (it != null)
                    reactiveRedisTemplate.opsForValue()
                        .set(
                            "$redisPrefix$userId",
                            objectMapper.writeValueAsString(it),
                            personalScheduleRedisTime
                            ).block()
            }
    }

 

이렇게 해당 코드를 Mono.defer{personalScheduleMongoRepository.findByUserId(userId)}로 감싸고 테스트를 해보니

 

이렇게 테스트가 잘 통과하는 것을 볼 수 있었다.

 

switchIfEmpty 다 수정하러 가야겠다...

반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

MSA에서 요청을 받는 가장 상위 layer인 Gateway에 CircuitBreaker를 적용해보려 한다.

 

우선 CircuitBreaker에 대해 알아보자.

말 그대로 회로 차단기의 역할을 한다.

 

이런 서버가 있다고 생각해보자.

server B의 처리량에 제한이 있기 때문에 1초에 11번 이상 요청하면 server B에서 지연이 생기다가 에러가 반환될 것이다.

이런 상황을 막기 위해 server B로의 요청을 막고 default로 지정된 응답을 반환하는 것이다.

 

 

이렇게 CircuitBreaker는 요청하는 외부서버에서 일정량의 에러가 발생하면 해당 서버의 응답을 차단하고 default 응답을 반환하며, 조건이 충족되면 다시 해당 서버로 응답을 보내는 등, 서버에서의 장애 전파를 막는다.

 

Circuit Breaker는 아래와 같이 3가지의 상태가 존재한다.

  • closed: 해당 서버와 정상적으로 요청을 주고 받을 수 있는 상태
  • open: 해당 서버와 아예 요청을 주고 받을 수 없는 상태
  • half_open: 해당 서버와 응답을 주고 받을 수는 있지만, 불안정한 상태로 실패율을 측정해 close, open으로 변경될 상태

 

각 상태는 위와 같은 방식으로 상호작용한다.

 

실패율과 성공율은 sliding window라는 큐에 저장된 성공과 실패를 확인해, 계산한다.

 

이제 적용해보도록 하자.

 

이렇게 설계되어 있는 시스템에서 Gateway에 Circuit Breaker를 설정해보려 한다.

 

우선 Gateway에 해당 라이브러리를 추가해준다.

implementation("org.springframework.cloud:spring-cloud-starter-circuitbreaker-reactor-resilience4j")

 

그러고는 yml 파일에 다음과 같이 설정해준다. (@Bean으로 설정하는 방법도 있지만, 더 간단한 yml에 설정을 해보려 한다)

 

resilience4j:
  circuitbreaker:
    instances:
      user:
        sliding-window-size: 10
        failure-rate-threshold: 50
        minimum-number-of-calls: 4
        automatic-transition-from-open-to-half-open-enabled: true
        permitted-number-of-calls-in-half-open-state: 4
        wait-duration-in-open-state: 5s

      schedule:
        sliding-window-size: 10
        failure-rate-threshold: 50
        minimum-number-of-calls: 4
        automatic-transition-from-open-to-half-open-enabled: true
        permitted-number-of-calls-in-half-open-state: 4
        wait-duration-in-open-state: 5s
  timelimiter:
    instances:
      user:
        timeout-duration: 3s

      schedule:
        timeout-duration: 3s

 

(sliding-window-size): sliding window의 큐 사이즈

(failure-rate-threshold): 실패율이 해당 비율을 넘으면 회로가 차단

(minimun-number-of-calls): 성공률과 실패율을 계산 할 때는 큐에 최소 n개의 요소가 존재해야 함

(automatic-transition-from-open-to-half-open-enabled): 회로가 지정된 시간 이후에 자동으로 open에서 half_open으로 전환

(permitted-number-of-calls-in-half-open-state): half_open 상태에서 받을 요청의 개수

(wait-duration-in-open-state): open 상태에서 대기할 시간

 

timelimiter에서는 각 서버로부터의 응답이 설정한 시간을 넘으면 에러로 판단하고 지정된 응답을 반환한다는 것이다.

 

이렇게 CircuitBreaker들을 설정을 했으며, 이제 각 라우터에 적용해보자.

spring:
  cloud:
    gateway:
      routes:
        - id: user-security
          uri: ${USER_URI}
          predicates:
            - Path=/api/v1/friend/**
          filters:
            - AuthorizationFilter
            - name: CircuitBreaker
              args:
                name: user
                fallbackUri: forward:/fallback/user

 

해당 라우터에 필터로 CircuitBreaker라는 이름으로 필터를 등록해준다.

args의 name으로 CircuitBreaker의 이름을 지정하고, fallbackUri를 통해 지정된 default 응답을 설정한다.

 

이렇게 만들었으니 테스트를 해보자.

    @GetMapping("/CircuitBreaker-test")
    suspend fun test(@RequestParam flag: Int): ResponseEntity<HttpStatus>{
        
        if (flag == 1)
            Mono.delay(Duration.ofSeconds(10)).awaitSingle()

        return ResponseEntity.ok().build()
    }

 

이렇게 flag를 1로 주면 10초의 지연이 발생하기에, timelimter의 시간인 3초를 넘어 실패로 간주하고 지정된 응답을 반환하도록 만든 api가 있다.

 

 

이렇게 1로 요청을 해보니, 3초가 지나 지정된 응답으로 503과 문자열이 나오는 것을 볼 수 있었다.

그럼 이제 실패율을 50% 넘겨보고, 그 후 요청을 해보며 어떤 응답이 나오는지 보자.

 

빠르게 3번을 1로 요청해보며, minimum number의 4번에서 실패율을 50%를 넘겨보니

0을 요청해도 바로 실패가 반환하는 것을 볼 수 있었다.

 

 

지정한 시간인 5초 이후에 다시 요청해보자.

이번에는 같은 값으로 요청했음에도 200이 나오는 것을 볼 수 있었다.

 

이렇게 Gateway에 기본적인 Circuit breaker를 설정해보았다.

아직 Gateway를 제외하면 서버 간에 통신이 존재하지 않아서, 이 정도만 설정을 하겠지만 중요한 기술이니 앞으로도 계속 공부해두도록 해야겠다.

반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

해당 프로젝트에 transaction을 걸어보려 한다.

 

우선 transaction 설정을 하는 이유부터 말해보려 한다.

 

프로젝트 중 다음과 같은 코드가 있다.

    override fun disconnectFriend(id: ObjectId, friendId: ObjectId): Mono<UserDocument> {
        return friendService.removeFriend(id, friendId)
            .flatMap {
                friendService.removeFriend(friendId, id)
            }

id에 있는 friend를 삭제하고, 그 다음으로 friend에 있는 id를 삭제하는 것이다.

서로의 친구 목록에서 서로를 삭제하는 메서드이다.

 

다음과 같은 상황을 가정해보자.

위의 코드가 성공한 다음에, 아래에서는 에러가 발생한 상황이다.

각각 userA, userB라고 하면 userA의 친구목록에는 userB가 없지만 userB의 친구목록에는 userA가 있는 모순된 상황이 생긴다.

이러면 데이터베이스의 무결성이 깨지게 되기 때문에, 아래 코드에서 error가 생기면 위의 코드를 Rollback 해줘야 한다.

 

보통 이런 경우에서 기존의 Spring MVC에서는 @transactional을 사용하게 된다.

하지만 찾아보니 webflux 환경에서는 다른 스레드에서 작동한다는 특성 때문에 @transactional이 적용되지 않는다고 하여 알아보고, 방법을 찾아보려 한다.

 

  • 1. 중간에 Error를 발생시키고 데이터베이스 모니터링 해보기

위의 코드를 그대로 사용하고, 첫번째 removeFriend는 성공 두번째 removeFriend는 Exception을 발생시켰다.

이런 경우에는 예상대로, 한쪽의 친구만 삭제되고 한쪽은 남아있게 될까?

 

removeFriend를 다음과 같이 작성하여, 짝수번 요청에서는 에러를 발생시켰다.

private var flag = false
override fun removeFriend(userId: ObjectId, friendId: ObjectId): Mono<UserDocument> {
        return userRepository.findById(userId)
            .flatMap {
                flag = !flag
                if(flag){
                    it.removeFriend(friendId)
                }
                else
                    Mono.error(FriendException(ErrorCode.NOT_FRIEND))
            }
            .flatMap{
                userRepository.save(it)
            }.onErrorMap{
                when(it){
                    is CantRequestToMeException -> FriendException(ErrorCode.CANT_REQUEST_TO_ME)
                    is NotFriendException -> FriendException(ErrorCode.NOT_FRIEND)
                    else -> it
                }
            }
    }

 

현재 데이터베이는 아래와 같은 상태이다.

서로 각자의 ObjectId를 가지고 있다.

 

Swagger는 발생한 에러대로 친구가 아니라는 에러가 발생했으며

 

데이터베이스를 보면 한쪽의 친구 요청만 끊어진 것을 볼 수 있다.

 

자 이제 여기에 transaction을 적용해보자.

 

  • 2. TransactionalOperator를 적용

org.springframework.transaction.reactive 라이브러리에서 제공하는 TransactionalOperator를 이용하는 것이다.

 

우선 다음과 같이 Config 파일을 만들어주고

@Configuration
class TransactionalOperatorConfig {

    @Bean
    fun transactionalOperator(transactionManager: ReactiveMongoTransactionManager): TransactionalOperator {
        return TransactionalOperator.create(transactionManager)
    }

    @Bean
    fun reactiveMongoTransactionManager(dbFactory: ReactiveMongoDatabaseFactory): ReactiveMongoTransactionManager {
        return ReactiveMongoTransactionManager(dbFactory)
    }
}

여기서 중요한 부분은 해당 데이터베이스가 replica 세팅이 되어 있어야 한다는 것이다.

 

사용한 mongodb의 docker-compose 파일은 아래에 작성해두도록 하겠다.

해당 도커의 컨테이너에서 쉘에 들어가

rs.initiate()

이거만 입력해주면 설정이 끝난다.

 

이렇게 설정을 해두고

transaction을 설정하고 싶은 부분에 다음과 같이 작성해주면 된다.

transactionalOperator.transactional(Mono or Flux)

 

나는 해당 코드를 다음과같이 작성했다.

return transactionalOperator.transactional(friendService.removeFriend(id, friendId)
            .flatMap {
                friendService.removeFriend(friendId, id)
            })

이렇게 removeFriend의 2개의 함수가 하나의 transaction 안에 들어왔다

 

이제 한 번 테스트를 해보자.

(데이터베이스를 재설정해서 방금과 id가 바뀌었다.)

 

이게 데이터베이스이고 다시 에러가 발생하는 API를 요청해보았다.

 

일단 당연히 응답은 에러이다.

 

데이터베이스도 확인을 해보니

다음과 같이 transaction이 적용된 것을 볼 수 있었다.

 

해당 방법을 이용하여 프로젝트에 transaction을 적용하려 한다.

 

  • 3. @Transactional annotation 사용

자 그러면 진짜 webflux에서 transcationl annotation이 작동하지 않을까?

어차피 코드를 만들어본 김에 테스트 해보려 한다.

 

코드를 이렇게 만들어서 테스트 해보았다.

    @Transactional
    override fun disconnectFriend(id: ObjectId, friendId: ObjectId): Mono<UserDocument> {
//        return transactionalOperator.transactional(friendService.removeFriend(id, friendId)
//            .flatMap {
//                friendService.removeFriend(friendId, id)
//            })
        return friendService.removeFriend(id, friendId)
            .flatMap {
                friendService.removeFriend(friendId, id)
            }

 

현재 데이터베이스는 아래와 같다.(사실 위와 같다..)

 

자 이제 서버를 재실행하고 다시 에러가 발생하는 API를 요청해보자.

 

일단 당연하게 응답은 같다.

 

자 이제 데이터베이스를 확인해보자.

이렇게... transaction 적용이 된 것을 볼 수 있다...하하

 

그래도 프로젝트에서는 2번 방법을 사용할 것 같다.

값을 비동기로 반환하기에... 하지만 왜 영어를 쓰시는 분들이 webflux에서 @transactional을 사용하지 말라는지는 더 알아봐야 할 거 같다...

 

이렇게 오랫동안 고민만 했던 transaction을 프로젝트에 적용할 수 있었다.

반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

이번에는 앱이 실행 중이지 않더라도 실시간 알림을 받는 방법이다.

(FCM을 사용하며, 다른 방법은 솔직히 생각해보지 않았다...)

 

이번에도 Webflux와 MSA 환경이며, 시스템 구성도도 저번과 같다.

 

우선 GCP 콘솔에서 프로젝트를 만들어주고, 프론트에 말을 해준 뒤 기본 설정들을 부탁한다.

그리고 테스트 할 수 있도록 deviceToken 하나를 받아왔다.

 

우선 이 토큰이 맞는지 FCM 페이지에서 테스트를 해보자.

이 화면에서 테스트 메시지를 보낼 수 있고, 파란색의 테스트 메시지 전송 버튼을 누르면 디바이스 토큰을 입력한 뒤 메시지를 보낼 수 있다.

 

우선 테스트 메시지를 전송해보니, 해당 기기에 알림이 잘 오는 것을 확인했다.

그럼 이제 스프링에서 알림을 보내보도록 하자.

 

 

프로젝트 설정 페이지에서 공개키를 가져와 스프링에 설정해준다.

 

저렇게 예시가 작성되어 있으며, 나도 저 예시를 따라서 인증 정보를 가져올 수 있도록 했다.

 

우선 필요한 라이브러리들을 불러온다.

implementation("com.google.firebase:firebase-admin:9.4.2")

 

자바였으면 @PostConstruct였나...? 를 사용하겠지만, 코틀린을 사용하기 때문에 Init을 사용해 서버스 클래스를 생성하자마자 인증 정보를 불러오고 firebase app을 초기화해주었다.

    init{
        val serviceAccount = FileInputStream(credentials)
        val options = FirebaseOptions.builder()
            .setCredentials(GoogleCredentials.fromStream(serviceAccount))
            .build()

        FirebaseApp.initializeApp(options)
    }

 

이제 firebase로 요청하는 코드인데, 라이브러리를 제공하기 때문에 어렵지 않다.

notification을 작성 -> message를 작성 -> 작성한 message를 FirebaseMessaging 클래스로 요청이다.

 

            val notification = Notification.builder()
                .setTitle(title)
                .setBody(content)
                .build()

            val message = Message.builder()
                .setNotification(notification)
                .setToken(deviceToken)
                .build()
                
            FirebaseMessaging.getInstance().send(message)

 

이렇게 작성하여 요청하면 되고, 나는 코루틴을 사용하여

    override fun pushNotification(deviceToken: String, title: String, content: String) {
        CoroutineScope(Dispatchers.IO).launch {
            val notification = Notification.builder()
                .setTitle(title)
                .setBody(content)
                .build()

            val message = Message.builder()
                .setNotification(notification)
                .setToken(deviceToken)
                .build()
            FirebaseMessaging.getInstance().send(message)
        }
    }

 

이렇게 service 클래스를 작성했다.

아래는 작성한 전체 service 코드이다.

@Service
class FCMServiceImpl(
    @Value("\${FCM.CREDENTIALS}")
    private val credentials: String
): FCMService {

    init{
        val serviceAccount = FileInputStream(credentials)
        val options = FirebaseOptions.builder()
            .setCredentials(GoogleCredentials.fromStream(serviceAccount))
            .build()

        FirebaseApp.initializeApp(options)
    }

    override fun pushNotification(deviceToken: String, title: String, content: String) {
        CoroutineScope(Dispatchers.IO).launch {
            val notification = Notification.builder()
                .setTitle(title)
                .setBody(content)
                .build()

            val message = Message.builder()
                .setNotification(notification)
                .setToken(deviceToken)
                .build()
            FirebaseMessaging.getInstance().send(message)
        }
    }
}

 

이제 kafka listener를 사용해 해당 서비스를 요청하도록 만들어야 한다.

해당 서비스는 우선 SSE로 요청을 시도하고, 연결된 기기가 없다면 FCM을 사용해 이벤트를 전송하도록 만들었다.

 

    @KafkaListener(topics = ["FRIEND_REQUEST_TOPIC"], groupId = "seungkyu")
    fun requestFriend(message: String){
        val event = EventEnums.REQUEST_FRIEND_EVENT
        val friendRequestEventDto = objectMapper.readValue(message, FriendRequestEventDto::class.java)
        val isSSE = notificationService.publishNotification(
            id = friendRequestEventDto.receiverId,
            sseDao = SSEDao(EventEnums.REQUEST_FRIEND_EVENT, friendRequestEventDto.sender)
        )
        if (!isSSE && friendRequestEventDto.deviceToken != null){
            fcmService.pushNotification(
                deviceToken = friendRequestEventDto.deviceToken,
                title = event.eventTitle,
                content = "${friendRequestEventDto.sender}${event.eventContent}"
            )
        }
    }

 

kafka listener로 이벤트를 받고, sink가 연결되어 있지 않으며 device token이 있는 상황이라면 해당 device token을 사용해 FCM service로 요청해 앱 푸쉬 알림을 전송한다.

 

이렇게 작성하고 친구요청으로 테스트를 해보았는데, 아래와 같이 잘 가는 것을 볼 수 있었다.

 

이렇게 앱 푸쉬 서비스를 개발해보았다.

개인적으로는 SSE보다 더 쉬운 것 같았다.

반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

앱을 실행 중인 상태에서는 FCM이 아닌 SSE로 실시간 알림을 받을 수 있도록 개발을 해보려고 한다.

(사실 다른 곳들은 앱이 켜진 상태에서도 FCM을 사용하여 알림을 받도록 하는 것 같았지만, 공부를 위한 프로젝트이기 때문에 SSE와 FCM을 병행해보려 한다.)

 

우선 Webflux와 MSA 환경에서 개발했다.

시스템 구성도는 다음과 같다.

 

kafka를 활용하여 각 서버에서 발생한 이벤트들을 notification server에서 listen 할 수 있도록 했고, notification에서 SSE api를 만들어 이벤트들을 실시간으로 전송할 수 있도록 했다.

 

우선 SSE Service부터 개발해보자

private val sinkMap = ConcurrentHashMap<String, Many<SSEDao>>()

 

이 sink를 활용하여 구독하고 있는 client에게 이벤트를 전송한다.

 

우선 이 sink에 구독하는 서비스이다.

    override fun subscribeNotification(id: String): Flux<ServerSentEvent<SSEDto>> {
        sinkMap[id] = Sinks.many().unicast().onBackpressureBuffer()
        return sinkMap[id]!!.asFlux()
            .doOnCancel { sinkMap.remove(id) }
            .map{
                sseDao ->
                ServerSentEvent
                    .builder(
                        SSEDto(
                            sender = sseDao.sender
                        )
                    )
                    .event(sseDao.event.eventValue.toString())
                    .build()
            }
    }

 

우선 id를 통해 해당 메서드로 구독 요청이 들어오면

sinkMap에 해당 id로 concurrentHashMap에 등록을 해준다.

 

그러고 return은 거기서 asFlux이며 거기에서 map을 통해 이벤트가 올 때마다 메시지를 전송해주게 된다.

나는 SSEDto라는 data class를 만들어서 거기에 전송할 정보들을 작성했다.

해당 함수의 return type은 Flux<ServerSentEvent<Any>>이며 flux에서 내려오는 값을 map으로 변환하여 ServerSentEvent 타입으로 넘겨주면 된다.

 

해당 클라이언트와 연결이 끊어지면, 해당 hashMap에서도 삭제해야 하기 때문에 .doOnCancel로 연결이 끊어지면 수행할 작업에 hashMap에서 삭제하는 코드를 적어준다.

 

이제 해당 flux로 이벤트를 전송하는 메서드이다.

    override fun publishNotification(id: String, sseDao: SSEDao): Boolean {
        val sink = sinkMap[id]
        return if (sink != null) {
            sink.tryEmitNext(sseDao)
            true
        } else {
            false
        }
    }

이렇게 id를 사용해 해당 sinkMap에서 값을 찾고, 그 값으로 .tryEmitNext 메서드를 호출하여 이벤트를 전송하면 된다.

나는 만약 연결이 없다면 푸쉬알림을 줄 수 있도록, Boolean으로 return을 만들어주었다.

 

이제 해당 SSE에 구독하는 controller이다.

@RestController
@RequestMapping("/api/v1/notification")
class NotificationController(
    private val notificationService: NotificationService
) {

    @GetMapping(produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun subscribeNotifications(
        @Parameter(hidden = true) @RequestHeader("X-VP-UserId") userId: String,
    ): Flux<ServerSentEvent<SSEDto>>{
        return notificationService.subscribeNotification(userId)
    }

}

이렇게 produce에 MediaType.TEXT_EVENT_STREAM_VALUE를 명시해준다.

사용자를 식별 할 수 있도록 id(여기서는 gateway를 통해 filter 된 아이디가 전달)를 받아, return type으로 Flux<ServerSentEvent<Any>>만 넘겨주면 된다.

 

이렇게 하면 사용자의 구독을 받고, 이벤트가 발생하면 넘겨줄 수도 있게 되었다.

 

이제 이벤트를 받아오는 kafkaListener까지 완성해보자.

 

    @KafkaListener(topics = ["FRIEND_REQUEST_TOPIC"], groupId = "seungkyu")
    fun requestFriend(message: String){
        val event = EventEnums.REQUEST_FRIEND_EVENT
        val friendRequestEventDto = objectMapper.readValue(message, FriendRequestEventDto::class.java)
        notificationService.publishNotification(
            id = friendRequestEventDto.receiverId,
            sseDao = SSEDao(EventEnums.REQUEST_FRIEND_EVENT, friendRequestEventDto.sender)
        )
    }

 

event로는 수신해야 하는 아이디와, 상호작용을 하는 상대방의 name이 넘어온다.

 

해당 listener에 sse service(notificationService)를 주입하고, 만들었던 publishNotification 함수를 사용해 해당 사용자에게 이벤트를 전송해준다.

 

아래는 완성한 service 코드이다.

@Service
class NotificationServiceImpl: NotificationService {

    private val sinkMap = ConcurrentHashMap<String, Many<SSEDao>>()

    override fun subscribeNotification(id: String): Flux<ServerSentEvent<SSEDto>> {
        sinkMap[id] = Sinks.many().unicast().onBackpressureBuffer()
        return sinkMap[id]!!.asFlux()
            .doOnCancel { sinkMap.remove(id) }
            .map{
                sseDao ->
                ServerSentEvent
                    .builder(
                        SSEDto(
                            sender = sseDao.sender
                        )
                    )
                    .event(sseDao.event.eventValue.toString())
                    .build()
            }
    }

    override fun publishNotification(id: String, sseDao: SSEDao): Boolean {
        val sink = sinkMap[id]
        return if (sink != null) {
            sink.tryEmitNext(sseDao)
            true
        } else {
            false
        }
    }
}

 

이거 다음으로는 앱이 켜지지 않은 상태에서도 알림을 보낼 수 있도록 FCM을 활용한 푸쉬알림을 구현해보려 한다.

반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

이번 프로젝트는 Gateway를 사용하기 때문에 각각의 server에서 security를 설정하지 않고, Gateway에서 Jwt 토큰을 확인하여 헤더를 변경해 각 서버로 재요청하도록 설계해보았다.

 

그렇게 작성한 GatewayFilter는 다음과 같다.

@Component
class AuthorizationFilter(
    private val jwtTokenProvider: JwtTokenProvider
) : AbstractGatewayFilterFactory<AuthorizationFilter.Config>(Config::class.java) {

    class Config

    override fun apply(config: Config): GatewayFilter {
        return GatewayFilter { exchange, chain ->
            val authHeader = exchange.request.headers["Authorization"]?.firstOrNull()

            if (!isValidAuthorizationHeader(authHeader)) {
                return@GatewayFilter forbid(exchange)
            }

            val token = authHeader!!.removePrefix("Bearer ")

            if (!jwtTokenProvider.isAccessToken(token)) {
                return@GatewayFilter forbid(exchange)
            }

            try {
                val userId = jwtTokenProvider.getUserId(token)
                val nextReq = exchange.request.mutate()
                    .header("X-VP-UserId", userId)
                    .build()

                chain.filter(exchange.mutate().request(nextReq).build())
            } catch (jwtException: JwtException) {
                forbid(exchange)
            }
        }
    }

    private fun isValidAuthorizationHeader(authHeader: String?): Boolean {
        return authHeader != null && authHeader.startsWith("Bearer ")
    }

    private fun forbid(exchange: ServerWebExchange): Mono<Void> {
        exchange.response.statusCode = HttpStatus.FORBIDDEN
        return exchange.response.setComplete()
    }
}

 

헤더를 확인하여 헤더가 비어 있거나, Bearer 로 시작하지 않으면 403을 반환한다.

토큰들이 유효하지 않은 경우에도 403을 반환한다.

 

이렇게 filter를 만들고 보니 테스트를 해보고 싶어졌다.

 

하지만 이 apply 함수를 어떻게 테스트하는지 몰랐고, 찾아보던 중 MockServerHttpRequestMockServerWebExchange를 발견했다.

 

이거로 ServerHttpRequest, ServerWebExchange를 생성하고 이걸 넘겨주면 되는 것이다.

 

    @Test
    @DisplayName("Token이 Bearer로 시작하지 않는 경우")
    fun authorizationIsNotStartWithBearerReturnForbidden(){
        //given
        val request = MockServerHttpRequest.get("/")
            .header(HttpHeaders.AUTHORIZATION, "NOT BEARER!!")

        val exchange = MockServerWebExchange.from(request)
        val filter = authorizationFilter.apply(AuthorizationFilter.Config())

        //when
        StepVerifier.create(filter.filter(exchange) { _ -> Mono.empty() })
            .verifyComplete()

        //then
        Assertions.assertEquals(HttpStatus.FORBIDDEN, exchange.response.statusCode)
    }

 

우선 가장 먼저 토큰이 없는 경우이다.

Authorization 헤더가 비어있기 때문에 바로 403이 반환된다.

 

    @Test
    @DisplayName("토큰이 만료된 경우")
    fun tokenIsExpiredReturnForbidden(){
        //given
        val token = "i am token!"
        val request = MockServerHttpRequest.get("/")
            .header(HttpHeaders.AUTHORIZATION, "Bearer $token")

        val exchange = MockServerWebExchange.from(request)
        val filter = authorizationFilter.apply(AuthorizationFilter.Config())

        `when`(jwtTokenProvider.getUserId(token))
            .thenThrow(ExpiredJwtException::class.java)

        //when
        StepVerifier.create(filter.filter(exchange) { _ -> Mono.empty() })
            .verifyComplete()

        //then
        Assertions.assertEquals(HttpStatus.FORBIDDEN, exchange.response.statusCode)
    }

 

MockServerHttpRequest에 원하는 헤더를 넣을 수 있다.

어차피 어떤 토큰이든지 mocking한 jwtTokenProvider에서 원하는 값을 리턴하기 때문에 아무 값이나 넣어주었다.

 

Jwt를 Parse 할 때 발생하는 에러는 ExpiredJwtException, MalformedJwtException, SignatureException 이기 때문에 이 에러들을 mocking으로 부터 발생시켜 각각의 Response를 테스트 할 수 있었다.

반응형

Junit에서 redis를 @MockBean으로 만들어서 사용하던 중 에러가 발생했다.

사용한 코드는 다음과 같다.

 

        @Test
        @DisplayName("유효한 코드를 입력시 true를 반환")
        fun checkValidCodeReturnTrue(){
            //given
            val email = "${UUID.randomUUID()}@test.com"
            val code = UUID.randomUUID().toString()

            //when
            `when`(reactiveRedisTemplate.opsForValue().get("$redisKeyPrefix:$email"))
                .thenReturn(Mono.just(code))

            //then
            StepVerifier.create(emailService.checkValidEmail(email, code))
                .expectNext(true)
                .verifyComplete()
        }

 

Redis를 확인해보고 해당 인증번호가 있는지 확인하는 테스트코드이다.

 

여기서 다음과 같은 에러가 발생했다.

Cannot invoke "org.springframework.data.redis.core.ReactiveValueOperations.get(Object)" because the return value of "org.springframework.data.redis.core.ReactiveRedisTemplate.opsForValue()" is null

 

 

reactiveRedisTemplate은 opsForValue 메서드를 호출하면 ReactiveValueOperations를 반환하는데, 이 객체가 null이라는 것이다.

redis를 mocking하고 메서드를 호출 할 때는 바로 get을 호출하는 게 아니라, 그 중간에 ReactiveValueOperations도 Mocking하고 여기서 get을 호출해야 하는 것이다.

 

        `when`(reactiveRedisTemplate.opsForValue())
            .thenReturn(reactiveValueOperations)

        `when`(reactiveValueOperations.get("$redisKeyPrefix:$email"))
            .thenReturn(Mono.just(code))

 

 

이렇게 2단계로 when을 나누어주는 방법으로 테스트에 성공했다.

반응형

Kafka를 사용해서 event를 주고 받는 코드를 작성하다, publish는 되지만 consume이 되지 않는 문제가 발생했다.

 

메시지가 자세하게 나오지 않아

logging:
  level:
    org.springframework.kafka: DEBUG
    org.apache.kafka: DEBUG

 

이렇게 kafka 관련해서 더 자세한 로그를 출력하도록 하고, 로그를 확인하고 있으니

 

org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.

 

이런 에러가 발생하고 있었다.

 

이 전에도 kafka를 계속 테스트 해보았는데, kafka의 docker-compose 문제라고 생각했고 이 에러메시지를 구글에 검색해보았다.

 

https://stackoverflow.com/questions/40316862/the-group-coordinator-is-not-available-kafka

 

The group coordinator is not available-Kafka

When I am write a topic to kafka,there is an error:Offset commit failed: 2016-10-29 14:52:56.387 INFO [nioEventLoopGroup-3-1][org.apache.kafka.common.utils.AppInfoParser$AppInfo:82] - Kafka versio...

stackoverflow.com

 

그러니 이런 자료를 찾을 수 있었는데, 나와 같은 문제인 것 같았다.

 

답글에 있는대로 하나의 kafka만 사용하면 이런 에러가 발생 할 수 있다고 했다.

현재 

services:
  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper:7.2.6
    ports:
      - '32181:32181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - vp

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka:7.2.6
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://kafka:9092
    networks:
      - vp

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "9000:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:22181

    networks:
      - vp

networks:
  vp:
    driver: bridge
    name: vp
    external: true

 

이렇게 하나의 kafka만 사용하고 있었고, 답글대로

KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

 

이런 옵션을 추가해주었고

 

docker container를 다시 실행주었다.

 

그러고 이벤트를 발생시키니

@Component
class ImageEventListener(
    private val imageService: ImageService
) {

    @KafkaListener(topics = ["IMAGE_DELETE_IMAGE_TOPIC"], groupId = "seungkyu")
    fun deleteImageListener(fileName: String){
        println("Deleting image $fileName")
        imageService.deleteImage(fileName)
    }
}

 

이렇게 작성한 이벤트가 실행되는 것을 볼 수 있었다.

 

kafka 연결 문제에만 3일 정도 쓴 거 같은데... 역시 구글링하면 답이 나오기는 한다....해결해서 다행이다 ㅠㅠ

반응형

이번에 Kafka를 공부하며 서버 간에 Kafka 통신을 할 수 있게 되었고, 이것을 이용하여 사용자의 정보를 수정할 때마다 사용자에게 메일로 알림을 발송하는 시스템을 만들어보려 한다.

 

시스템 구조는 위와 같다.

 

User Server에서 직접 이메일을 전송하는 방법도 있겠지만, Email 발송 자체가 시간이 오래 걸리는 작업이며 사용자의 정보만 다루는 User Server에 있어야 하는 기능은 아니기 때문에 이런 식으로 분리를 한 것이다.

 

우선 dependency는 아래와 같다.

    implementation("io.projectreactor.kafka:reactor-kafka:1.3.19")
    implementation("org.springframework.kafka:spring-kafka:3.0.9")
    implementation("org.springframework.boot:spring-boot-starter-webflux")

Spring Webflux를 사용하기 때문에 webflux, kafka를 사용하기 위해 kafka를 implementation 해준다.

 

그러고는 Email Service를 다음과 같이 작성했다.

@Service
class EmailService(
    private val javaMailSender: JavaMailSender,
) {

    fun sendEmail(title: String, content: String, address: String) {

        val message = SimpleMailMessage()

        message.setTo(address)
        message.subject = title
        message.text = content

        javaMailSender.send(message)
    }
}

그냥 간단하게 이메일의 내용 작성해서 보내는 서비스이다.

 

이렇게 정상적으로 메일이 오는 것을 볼 수 있다.

 

그럼 이제 유저 서버를 만들고 유저 정보를 수정하면 Kafka에 이벤트를 publish하는 서버를 만들어보자.

 

우선 Producer 쪽의 Kafka 설정이다.

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: seungkyu

yml에는 이렇게 bootsrap-server를 명시해주고

 

@Configuration
class KafkaConfig {

    @Bean
    fun reactiveKafkaProducerTemplate(
        kafkaProperties: KafkaProperties
    ): ReactiveKafkaProducerTemplate<String, String> {

        val props = HashMap<String, Any>()

        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaProperties.bootstrapServers
        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java

        return ReactiveKafkaProducerTemplate(
            SenderOptions.create(props)
        )
    }

}

 

이렇게 reactiveKafkaProducerTemplate Bean을 만들어준다.

KafkaProperties를 parameter로 받으면 yml에 작성된 kafka의 정보를 불러오게 된다.

 

이것을 바탕으로 reactive kafka를 만들어주었다.

@Service
class UserService(
    private val reactiveKafkaProducerTemplate: ReactiveKafkaProducerTemplate<String, String>,
    private val objectMapper: ObjectMapper
) {

    fun changeUserInfo(userInfo: UserInfo){
        reactiveKafkaProducerTemplate.send("change-user-info", objectMapper.writeValueAsString(userInfo))
            .subscribe()
    }
}

 

데이터베이스에서 수정하는 부분은 빼고, 이렇게 chage-user-info 토픽으로 변경된 유저의 정보를 보내주었다.

 

이제 kafka listener를 만들어서 해당 event가 올 때마다 메일을 보내주면 된다.

 

다시 메일 서버로 가서

@Component
class UserKafkaListener(
    private val objectMapper: ObjectMapper,
    private val emailService: EmailService
) {

    data class UserInfo(
        val name: String,
        val email: String,
        val age: Int
    )

    @KafkaListener(topics = ["change-user-info"])
    fun changeUserListener(data: String){
        val userInfo = objectMapper.readValue(data, UserInfo::class.java)
        emailService.sendEmail(
            address = userInfo.email,
            title = "유저 정보가 변경되었습니다.",
            content = "유저 정보가 변경되었습니다. 본인이 아니라면 신고해주시기 바랍니다."
        )
    }
}

 

이렇게 Listener를 작성해준다.

@Component안에 @KafkaListener(topics = ["구독하고 싶은 토픽"])을 적어주면 된다.

당연하게 publish 하는 topic과 같은 topic이어야 한다.

 

지금은 문자열로 데이터를 주고받지만, JsonSerializer등을 사용하면 Json으로 바로 데이터를 주고 받을 수 있다.

 

방금 작성했던 user.http로 다시 사용자 정보를 수정해보면

이렇게 메일이 정상적으로 오는 것을 볼 수 있다.

+ Recent posts