반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

앱을 실행 중인 상태에서는 FCM이 아닌 SSE로 실시간 알림을 받을 수 있도록 개발을 해보려고 한다.

(사실 다른 곳들은 앱이 켜진 상태에서도 FCM을 사용하여 알림을 받도록 하는 것 같았지만, 공부를 위한 프로젝트이기 때문에 SSE와 FCM을 병행해보려 한다.)

 

우선 Webflux와 MSA 환경에서 개발했다.

시스템 구성도는 다음과 같다.

 

kafka를 활용하여 각 서버에서 발생한 이벤트들을 notification server에서 listen 할 수 있도록 했고, notification에서 SSE api를 만들어 이벤트들을 실시간으로 전송할 수 있도록 했다.

 

우선 SSE Service부터 개발해보자

private val sinkMap = ConcurrentHashMap<String, Many<SSEDao>>()

 

이 sink를 활용하여 구독하고 있는 client에게 이벤트를 전송한다.

 

우선 이 sink에 구독하는 서비스이다.

    override fun subscribeNotification(id: String): Flux<ServerSentEvent<SSEDto>> {
        sinkMap[id] = Sinks.many().unicast().onBackpressureBuffer()
        return sinkMap[id]!!.asFlux()
            .doOnCancel { sinkMap.remove(id) }
            .map{
                sseDao ->
                ServerSentEvent
                    .builder(
                        SSEDto(
                            sender = sseDao.sender
                        )
                    )
                    .event(sseDao.event.eventValue.toString())
                    .build()
            }
    }

 

우선 id를 통해 해당 메서드로 구독 요청이 들어오면

sinkMap에 해당 id로 concurrentHashMap에 등록을 해준다.

 

그러고 return은 거기서 asFlux이며 거기에서 map을 통해 이벤트가 올 때마다 메시지를 전송해주게 된다.

나는 SSEDto라는 data class를 만들어서 거기에 전송할 정보들을 작성했다.

해당 함수의 return type은 Flux<ServerSentEvent<Any>>이며 flux에서 내려오는 값을 map으로 변환하여 ServerSentEvent 타입으로 넘겨주면 된다.

 

해당 클라이언트와 연결이 끊어지면, 해당 hashMap에서도 삭제해야 하기 때문에 .doOnCancel로 연결이 끊어지면 수행할 작업에 hashMap에서 삭제하는 코드를 적어준다.

 

이제 해당 flux로 이벤트를 전송하는 메서드이다.

    override fun publishNotification(id: String, sseDao: SSEDao): Boolean {
        val sink = sinkMap[id]
        return if (sink != null) {
            sink.tryEmitNext(sseDao)
            true
        } else {
            false
        }
    }

이렇게 id를 사용해 해당 sinkMap에서 값을 찾고, 그 값으로 .tryEmitNext 메서드를 호출하여 이벤트를 전송하면 된다.

나는 만약 연결이 없다면 푸쉬알림을 줄 수 있도록, Boolean으로 return을 만들어주었다.

 

이제 해당 SSE에 구독하는 controller이다.

@RestController
@RequestMapping("/api/v1/notification")
class NotificationController(
    private val notificationService: NotificationService
) {

    @GetMapping(produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun subscribeNotifications(
        @Parameter(hidden = true) @RequestHeader("X-VP-UserId") userId: String,
    ): Flux<ServerSentEvent<SSEDto>>{
        return notificationService.subscribeNotification(userId)
    }

}

이렇게 produce에 MediaType.TEXT_EVENT_STREAM_VALUE를 명시해준다.

사용자를 식별 할 수 있도록 id(여기서는 gateway를 통해 filter 된 아이디가 전달)를 받아, return type으로 Flux<ServerSentEvent<Any>>만 넘겨주면 된다.

 

이렇게 하면 사용자의 구독을 받고, 이벤트가 발생하면 넘겨줄 수도 있게 되었다.

 

이제 이벤트를 받아오는 kafkaListener까지 완성해보자.

 

    @KafkaListener(topics = ["FRIEND_REQUEST_TOPIC"], groupId = "seungkyu")
    fun requestFriend(message: String){
        val event = EventEnums.REQUEST_FRIEND_EVENT
        val friendRequestEventDto = objectMapper.readValue(message, FriendRequestEventDto::class.java)
        notificationService.publishNotification(
            id = friendRequestEventDto.receiverId,
            sseDao = SSEDao(EventEnums.REQUEST_FRIEND_EVENT, friendRequestEventDto.sender)
        )
    }

 

event로는 수신해야 하는 아이디와, 상호작용을 하는 상대방의 name이 넘어온다.

 

해당 listener에 sse service(notificationService)를 주입하고, 만들었던 publishNotification 함수를 사용해 해당 사용자에게 이벤트를 전송해준다.

 

아래는 완성한 service 코드이다.

@Service
class NotificationServiceImpl: NotificationService {

    private val sinkMap = ConcurrentHashMap<String, Many<SSEDao>>()

    override fun subscribeNotification(id: String): Flux<ServerSentEvent<SSEDto>> {
        sinkMap[id] = Sinks.many().unicast().onBackpressureBuffer()
        return sinkMap[id]!!.asFlux()
            .doOnCancel { sinkMap.remove(id) }
            .map{
                sseDao ->
                ServerSentEvent
                    .builder(
                        SSEDto(
                            sender = sseDao.sender
                        )
                    )
                    .event(sseDao.event.eventValue.toString())
                    .build()
            }
    }

    override fun publishNotification(id: String, sseDao: SSEDao): Boolean {
        val sink = sinkMap[id]
        return if (sink != null) {
            sink.tryEmitNext(sseDao)
            true
        } else {
            false
        }
    }
}

 

이거 다음으로는 앱이 켜지지 않은 상태에서도 알림을 보낼 수 있도록 FCM을 활용한 푸쉬알림을 구현해보려 한다.

+ Recent posts