반응형

Redis의 pub-sub 구조를 공부하면서, 이런 비동기 pub-sub 구조로 별도의 서버에서 알림을 보내고, 받는 서버를 만들 수 있을 거 같다는 생각이 들었다.

 

Redis와 SSE를 더 연습하기 위해 바로 만들어보았다.

 

우선 Redis의 Pub-Sub은 비동기 message queue 방식으로

이렇게 Redis에 publish하면 subscribe하고 있는 서버들에게 데이터를 전송하는 구조이다.

하지만 redis에 있는 데이터는 남아있지 않고 바로 휘발되기에 때에 따라서는 Kafka와 같은 message queue 구조가 더 적합할 수도 있다.

이렇게 Redis를 사용하면 send server에서 직접적으로 receive server들에게 데이터를 전송할 필요가 없기 때문에, 느슨한 연결관계를 만들어 줄 수 있기에 만약 receive server를 추가하더라도 send server에서는 변경할 필요 없이 해당 receive server가 redis를 subscribe하기만 하면 된다.

우선 각각의 send server, receive server를 20000, 20001 포트로 구성했다.

 

사용한 dependencies들은 다음과 같다.

    implementation("org.springframework.boot:spring-boot-starter-webflux:3.3.5")

    //COROUTINE
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-rx3")

    implementation("io.lettuce:lettuce-core:6.5.0.RELEASE")

    compileOnly("org.projectlombok:lombok:1.18.34")
    annotationProcessor("org.projectlombok:lombok:1.18.34")
    implementation("org.springframework.boot:spring-boot-starter-data-redis")

 

  • Publish Server

우선 상대적으로 쉬운 publish server부터 만들어보겠다.

 

아래와 같이 redis 정보를 입력해주고

spring:
  data:
    redis:
      port: 12042

server:
  port: 20000

 

ReactiveRedisTemplate에 convertAndSend 함수를 사용하여 해당 채널로 메시지를 전송할 수 있다.

 


@RestController
class SendController(
    private val redisTemplate: ReactiveRedisTemplate<String, String>,
    private val objectMapper: ObjectMapper
) {

    data class MessageDto(
        val message: String,
        val user: String,
        val chatRoom: String
    )

    @PostMapping("/send")
    suspend fun sendMessage(@RequestBody messageDto: MessageDto) {
        redisTemplate.convertAndSend(
            "users:message",
                objectMapper.writeValueAsString(messageDto)
        ).subscribe()
    }
}

 

Post로 메시지를 받고, "users:message"라는 채널로 해당 메시지를 직렬화하여 전송한다.

Publish Server에는 이렇게 전송만하면 된다.

 

  • Subscribe Server

이제 어려운 Subsribe server를 만들어보자.

우선 Redis의 해당 채널을 구독해야 한다.

해당 채널의 정보는 Configuration에 작성한다.

 

@Configuration
class RedisConfig(
    private val redisConnectionFactory: RedisConnectionFactory,
    private val messageListenerService: MessageListenerService
) {

    @Bean
    fun redisMessageListenerContainer(): RedisMessageListenerContainer {
        val container = RedisMessageListenerContainer()
        container.connectionFactory = redisConnectionFactory
        container.addMessageListener(messageListenerService,
            ChannelTopic("users:message")
        )
        return container
    }
}

 

이런 식으로 채널을 구독한다.

addMessageListener를 더 작성하여, 2개 이상의 채널을 구독할 수도 있다.

 

이제 MessageListener를 상속받은 MessageListenerService를 구현해야 한다.

 

우선 이렇게만 작성해보고, 채널에서 메시지가 오는지 확인해본다.

@Service
@Slf4j
class MessageListenerService(
    private val objectMapper: ObjectMapper
): MessageListener {

    companion object {
        private val log = LoggerFactory.getLogger(MessageListenerService::class.java)
    }

    override fun onMessage(message: Message, pattern: ByteArray?) {
        log.info("Received a message: {}, from channel : {}", message.body.toString(Charsets.UTF_8), message.channel.toString(Charsets.UTF_8))
    }
}

 

이렇게 작성하고 PublishServer에서 POST 요청을 해보니

 

이런 식으로 메시지 큐에서 값을 잘 전달받는 것을 볼 수 있었다.

 

이제 SSE를 요청할 Controller를 만들어보자

@RestController
class ReceiveController(
    private val messageListenerService: MessageListenerService
) {

    @GetMapping("/receive", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun receive(@RequestParam chatRoom: String): Flux<ServerSentEvent<String>>{
        return messageListenerService.registerSink(chatRoom)
            .map{
                ServerSentEvent.builder<String>()
                    .data(it)
                    .comment("This is notification")
                    .build()
        }
    }
}

 

이렇게 messageListenerService를 주입받는 컨트롤러를 만든다.

Header를 TEXT_EVENT_STREAM_VALUE로 지정해서 SSE를 명시해준다.

그리고는 sink를 등록하고, 거기서부터 받는 메시지를 계속 ServerSentEvent로 보내준다.

 

다시 MessageListenerService로 돌아가서 수정해보자.

@Service
@Slf4j
class MessageListenerService(
    private val objectMapper: ObjectMapper
): MessageListener {

    companion object {
        private val log = LoggerFactory.getLogger(MessageListenerService::class.java)
    }
    private val sinksMap = mutableMapOf<String, Sinks.Many<String>>()

    data class MessageDto(
        val message: String,
        val user: String,
        val chatRoom: String
    )

    override fun onMessage(message: Message, pattern: ByteArray?) {
        log.info("Received a message: {}, from channel : {}", message.body.toString(Charsets.UTF_8), message.channel.toString(Charsets.UTF_8))
        val messageDto = objectMapper.readValue(message.body, MessageDto::class.java)
        sinksMap[messageDto.chatRoom]?.tryEmitNext(messageDto.message)
    }

    fun registerSink(chatRoom: String): Flux<String> {
        val sink = sinksMap.computeIfAbsent(chatRoom){
            Sinks.many().unicast().onBackpressureBuffer()
        }

        return sink.asFlux()
            .doOnCancel{
                log.info("Connection is closed")
                sinksMap.remove(chatRoom)
            }
    }
}

 

registerSink에서는 map을 사용해 요청한 chatRoom으로 sink를 등록한다.

doOnCancel을 사용해 연결이 끊어지면 map에서 삭제되도록 해두었다.

 

onMessage에서는 수정하여, 메시지 큐에서 메시지가 도착하면 chatRoom을 찾아 메시지를 전송해준다.

 

이렇게 모든 코드를 완성했다.

intellij http client를 사용하여 테스트 해보도록 하자.

 

receive 요청

GET http://localhost:20001/receive?chatRoom=1

 

send 요청

POST http://localhost:20000/send
Content-Type: application/json

{
  "message": "hello",
  "user": "seungkyu",
  "chatRoom": "1"
}

 

이렇게 작성하고, receive부터 한 후 메시지를 보내보았다.

 

결과로는

이렇게 잘 나오는 것을 볼 수 있었다.

 

send에서 chatRoom을 2로 바꾸면, chatRoom에 해당하는 sink가 없기 때문에 Event가 오지 않는 것도 볼 수 있었다.

+ Recent posts