반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

이제 Stomp를 사용한 채팅 서버의 구현이다.

 

기존에 개발했던 채팅은 Websocket을 사용해서 소켓을 연결시켜두고, 거기에 Flux로 데이터를 전송해주었던 걸로 기억난다.

클라이언트로부터 데이터를 받을 수 있는 SSE 느낌으로 말이다.

 

이번에는 Stomp 라이브러리를 사용해 제대로 채팅을 구현해보려 한다.

 

Stomp는 우선 socket에 연결하고, subscribe하는 주소에 구독해 메시지를 받고 publish하는 주소로 메시지를 보낸다고 한다.

그래서 다른 방법으로는 테스트가 어렵고 저번에 작성했던 글의 방법으로 테스트해야 한다.

 

https://jiangxy.github.io/websocket-debug-tool/

 

WebSocket Debug Tool

 

jiangxy.github.io

 

해당 사이트로 설명을 해보자면

저렇게 url에 소켓을 연결하고 stomp subscribe에 서버에서 설정한 subscribe destination을 구독하면 해당 채팅방에서 생성된 메시지들이 실시간으로 전달된다.

사실 그냥 destication을 구독하는게 아니라 endpoint가 /subscribe면 뒤에 /subscribe/{chatGroupId} 이런 식으로 뒤에 채팅방의 정보가 있어야 한다.

 

마찬가지로 send destination을 통해 메시지를 보내며 이 때도 /publish/{chatGroupId}와 같이 채팅방의 정보가 추가되어야 한다.

헤더를 추가할 수도 있으며, 이거는 그냥 연결해두고 Rest API를 호출한다고 생각하면 편할것이다.

 

그럼 일단 Stomp를 사용하기 위해 추가해야 하는 라이브러리이다.

implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation ("org.springframework.boot:spring-boot-starter-websocket")

 

이렇게 websocket과 webflux만 추가해주면 된다고 한다.

나는 기존에 webflux를 사용하고 있어 websocket만 추가해주었다.

 

이제 stomp의 config다

@Configuration
@EnableWebSocketMessageBroker
class ChatConfig: WebSocketMessageBrokerConfigurer {

    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        registry.addEndpoint("/websocket/v1/chat")
            .setAllowedOriginPatterns("*")
            .withSockJS()
        registry.addEndpoint("/websocket/v1/chat")
            .setAllowedOriginPatterns("*")
    }

    override fun configureMessageBroker(registry: MessageBrokerRegistry) {
        registry.enableSimpleBroker("/sub")
        registry.setApplicationDestinationPrefixes("/pub")
    }
}

 

우선 이런 식으로 작성하고 하나씩 설명해보자면

 

registerStompEndpoints는 웹소켓의 endpoint를 설정한다.

registry.addEndpoint에 해당 웹소켓의 end point를 설정해준다.

만약 사용하는 서버가 localhost:8080이라면 end point는 ws://localhost:8080/websocket/v1/chat이 될 것 이다.

setAllowedOriginPatterns에는 해당 웹소켓에 접속 가능한 주소를 적는 것이다.

클라우드 서버에서 사용하기 때문에 "*"로 지정해주었다.

withSockJs는 자바스크립트 라이브러리 관련된 것이라고 하는데, 나는 일단 몰라서 모두 등록해두었다.

 

configureMessageBroker는 위에서 보았던 subscribe, publish로 사용할 destination을 등록해주면 된다.

 

이제 message controller를 작성해보자.

@RestController안에 @MessageMapping을 적어주면 된다.

    @MessageMapping("/{groupId}")
    fun publishChatMessage(
        @Parameter(hidden = true) @Header(HttpHeaders.AUTHORIZATION) accessToken: String,
        @DestinationVariable groupId: String,
        @RequestBody messageReqDto: MessageReqDto
    ): Mono<ResponseEntity<Void>> {
        return chatService.publishMessage(
            groupId = groupId,
            userId = getUserIdFromToken(accessToken),
            message = messageReqDto.message
        ).then(Mono.fromCallable { ResponseEntity(HttpStatus.OK) })
    }

 

이렇게 원하는 Header를 가져올 수도 있고, dto를 정의해서 원하는 json 형식으로 값을 가져올 수 있다.

@DestinationVariable로 groupId를 가져왔으며, 해당 정보는 MongoDB에 그대로 저장한다.

 

이제 서버에서 클라이언트로 메시지를 보내는 방법이다.

simpMessageSendingOperations.convertAndSend(
    "/sub/${chatDocument.groupId}", messageDocumentToDto(chatDocument)

 

그냥 간단하게 SimpMessageSendingOperations에 convertAndSend로 구독하고 있는 채팅방으로 메시지 dto를 전송하면 된다.

 

해당 이벤트를 가져오는 방법은 전 포스팅에서 MongoDB changeStream 부분을 찾아보면 된다.

 

테스트 결과도 전 포스팅의 사진으로 대체하도록 하겠다.

 

Stomp로 처음 채팅방을 만들어 보았는데, 채팅방 관리가 굉장히 쉬웠다.

WebSocket만 사용해서 구현하는 거보다 난이도가 더 쉬운 것 같다.

반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

 

마지막 기능인 채팅 기능을 개발하던 중, 당연히 kafka를 써야겠다고 생각하고 있었지만... 그냥 갑자기 쓰기 싫어졌다.

우선 기존 MSA 환경에서 채팅 서버에서 kafka를 쓰는 이유는 다음과 같다.

(MSA 환경이라고 말하는게 맞나...? 그냥 해당 서버가 여러개이고 로드밸런서로 연결된 상황)

 

채팅 서버가 하나가 아니기 때문에, 위와 같은 상황에서는 A가 B에게 메시지를 전송할 수 없다.

따라서 서버 간에 이벤트(메시지 전송)을 주고 받기 위해, Kafka를 사용하는 것이다.

 

 

해당 상황에서 kafka는 다음과 같은 역할을 한다.

이렇게해서 모든 chat server에 메시지가 전달되고, B와 연결되어 있는 서버에서 메시지를 전송하게 되는 것이다.

 

근데, 이 상황이 마음에 들지는 않았다.

어차피 채팅을 저장하기 위해 MongoDB에 저장도 해야하는데, 이벤트를 전송하기 위해 kafka도 연결하니 메시지 전송 하나로 외부 리소스에 2번 접근하는 것이기 때문이다.

 

이 방법을 해결하기 위해 MongoDB의 changeStream을 사용하고자 한다.

ReactiveMongoTemplate을 사용했으며, MongoTemplate에서는 이 기능이 있는지는 잘 모르겠다.

 

 

우선 구현해보도록 하자.

        reactiveMongoTemplate.changeStream(ChatDocument::class.java)
            .listen()
            .doOnNext{
                item ->
                val chatDocument = item.body
                val operationType = item.operationType

                if(chatDocument != null && operationType == OperationType.INSERT) {
                    simpMessageSendingOperations.convertAndSend(
                        "/sub/${chatDocument.groupId}", messageDocumentToDto(chatDocument)
                    )
                }
            }.subscribe()

 

이렇게 changeStream({변화를 subscribe할 Document}).listen()

이 메서드로 해당 collection에 데이터의 삽입, 수정, 삭제 이벤트를 구독할 수 있다.

이러한 이벤트를 doOnNext로 처리하는 것이다.

 

우리는 채팅이 생성되는 Insert 이벤트만 사용할 것이다.

그렇기 때문에 listen의 Return에서 operationType을 가져오고 조건문을 사용해 INSERT인 경우에만 웹소켓에 메시지를 전송하도록 한다.

 

조건문 안의 코드는 Stomp 라이브러리를 사용해 사용자에게 메시지를 전송하는 부분이다.

다음 포스팅의 내용으로 다루도록 하겠다.

 

이제 2개의 서버를 다른 포트로 실행하고, 각각의 서버에 연결한 메시지가 정상적으로 오는지 확인해보자.

 

테스트는 아래의 페이지를 사용했다.

https://jiangxy.github.io/websocket-debug-tool/

 

WebSocket Debug Tool

 

jiangxy.github.io

stomp로 테스트가 가능하기 때문이다.

 

스프링 서버를 12044, 12045 포트에서 실행 해두었다.

 

그리고 이렇게 각각 같은 채팅방에 구독을 한 후, 메시지를 전송해보니(페이지가 화면을 줄이니 깨진다...)

이렇게 정상적으로 모두에게 전송되는 것을 볼 수 있었다.(토큰을 사용해서 중간 부분은 모자이크로 처리했다.)

 

이렇게 하니 Kafka없이도 MongoDB만 사용해 채팅서버를 구현할 수 있었다.

반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

 

우선 에러가 발생한 코드는 다음과 같다.

    override fun findByUserId(userId: ObjectId): Mono<PersonalScheduleDocument> {
        return reactiveRedisTemplate.opsForValue()
            .get("$redisPrefix$userId")
            .map{
                objectMapper.readValue(it, PersonalScheduleDocument::class.java)
            }
            .switchIfEmpty(
                personalScheduleMongoRepository.findByUserId(userId)
            )
            .publishOn(Schedulers.boundedElastic())
            .doOnSuccess {
                if (it != null)
                    reactiveRedisTemplate.opsForValue()
                        .set(
                            "$redisPrefix$userId",
                            objectMapper.writeValueAsString(it),
                            personalScheduleRedisTime
                            ).block()
            }
    }

 

이렇게 Redis를 먼저 조회하고, 만약 Redis에 값이 없으면 MongoDB를 조회하는 코드이다.

해당 코드를 이용하여 테스트를 하고, Redis에 값이 있으면 MongoDB를 호출하지 않을 것이라고 생각했다.

 

@Test
        @DisplayName("Redis에 조회하려는 값이 있는 경우")
        fun findByUserIdAndRedisHaveResultReturnSuccess(){
            //given
            val userId = ObjectId.get()
            val personalSchedule = PersonalScheduleDocument(
                userId = userId,
            )
            val personalScheduleToString = objectMapper.writeValueAsString(personalSchedule)
            `when`(reactiveRedisTemplate.opsForValue())
                .thenReturn(reactiveValueOperations)

            `when`(reactiveValueOperations.get("$redisPrefix$userId"))
                .thenReturn(Mono.just(personalScheduleToString))

            `when`(personalScheduleMongoRepository.findByUserId(userId))
                .thenReturn(Mono.just(personalSchedule))

            `when`(reactiveValueOperations.set(
                "$redisPrefix$userId",
                personalScheduleToString,
                personalScheduleRedisTime
            )).thenReturn(Mono.just(true))

            //when
            StepVerifier.create(personalScheduleRepositoryImpl.findByUserId(userId))
                .expectNextMatches { it.userId == personalSchedule.userId }.verifyComplete()

            //then

            verify(personalScheduleMongoRepository, times(0)).findByUserId(userId)


        }

 

이렇게 하고 테스트를 해보았는데

이렇게 MongoDB의 findByUserId가 호출되었다고 나온다.

 

내가 코드를 작성한 것이라고 생각해서 계속 수정해 보았지만, 고칠 수 없었고 구글링을 통해 답을 얻을 수 있었다.

 

그냥 원래 그렇다고 한다...

switchIfEmpty가 지금은 데이터베이스에 접근하고 있기 때문에 접근을 최대한 하지 않아야 한다.

 

그렇기 때문에 Mono.defer를 이용하여 호출을 최대한 미루기로 했다.(defer를 왜 사용하는지 잘 몰랐는데... 여기에서 사용하는구나)

 

override fun findByUserId(userId: ObjectId): Mono<PersonalScheduleDocument> {
        return reactiveRedisTemplate.opsForValue()
            .get("$redisPrefix$userId")
            .map{
                objectMapper.readValue(it, PersonalScheduleDocument::class.java)
            }
            .switchIfEmpty(
                Mono.defer{personalScheduleMongoRepository.findByUserId(userId) }
            )
            .publishOn(Schedulers.boundedElastic())
            .doOnSuccess {
                if (it != null)
                    reactiveRedisTemplate.opsForValue()
                        .set(
                            "$redisPrefix$userId",
                            objectMapper.writeValueAsString(it),
                            personalScheduleRedisTime
                            ).block()
            }
    }

 

이렇게 해당 코드를 Mono.defer{personalScheduleMongoRepository.findByUserId(userId)}로 감싸고 테스트를 해보니

 

이렇게 테스트가 잘 통과하는 것을 볼 수 있었다.

 

switchIfEmpty 다 수정하러 가야겠다...

+ Recent posts