https://github.com/Seungkyu-Han/Toge-do-backend
GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다
Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.
github.com
마지막 기능인 채팅 기능을 개발하던 중, 당연히 kafka를 써야겠다고 생각하고 있었지만... 그냥 갑자기 쓰기 싫어졌다.
우선 기존 MSA 환경에서 채팅 서버에서 kafka를 쓰는 이유는 다음과 같다.
(MSA 환경이라고 말하는게 맞나...? 그냥 해당 서버가 여러개이고 로드밸런서로 연결된 상황)
채팅 서버가 하나가 아니기 때문에, 위와 같은 상황에서는 A가 B에게 메시지를 전송할 수 없다.
따라서 서버 간에 이벤트(메시지 전송)을 주고 받기 위해, Kafka를 사용하는 것이다.
해당 상황에서 kafka는 다음과 같은 역할을 한다.
이렇게해서 모든 chat server에 메시지가 전달되고, B와 연결되어 있는 서버에서 메시지를 전송하게 되는 것이다.
근데, 이 상황이 마음에 들지는 않았다.
어차피 채팅을 저장하기 위해 MongoDB에 저장도 해야하는데, 이벤트를 전송하기 위해 kafka도 연결하니 메시지 전송 하나로 외부 리소스에 2번 접근하는 것이기 때문이다.
이 방법을 해결하기 위해 MongoDB의 changeStream을 사용하고자 한다.
ReactiveMongoTemplate을 사용했으며, MongoTemplate에서는 이 기능이 있는지는 잘 모르겠다.
우선 구현해보도록 하자.
reactiveMongoTemplate.changeStream(ChatDocument::class.java)
.listen()
.doOnNext{
item ->
val chatDocument = item.body
val operationType = item.operationType
if(chatDocument != null && operationType == OperationType.INSERT) {
simpMessageSendingOperations.convertAndSend(
"/sub/${chatDocument.groupId}", messageDocumentToDto(chatDocument)
)
}
}.subscribe()
이렇게 changeStream({변화를 subscribe할 Document}).listen()
이 메서드로 해당 collection에 데이터의 삽입, 수정, 삭제 이벤트를 구독할 수 있다.
이러한 이벤트를 doOnNext로 처리하는 것이다.
우리는 채팅이 생성되는 Insert 이벤트만 사용할 것이다.
그렇기 때문에 listen의 Return에서 operationType을 가져오고 조건문을 사용해 INSERT인 경우에만 웹소켓에 메시지를 전송하도록 한다.
조건문 안의 코드는 Stomp 라이브러리를 사용해 사용자에게 메시지를 전송하는 부분이다.
다음 포스팅의 내용으로 다루도록 하겠다.
이제 2개의 서버를 다른 포트로 실행하고, 각각의 서버에 연결한 메시지가 정상적으로 오는지 확인해보자.
테스트는 아래의 페이지를 사용했다.
https://jiangxy.github.io/websocket-debug-tool/
WebSocket Debug Tool
jiangxy.github.io
stomp로 테스트가 가능하기 때문이다.
스프링 서버를 12044, 12045 포트에서 실행 해두었다.
그리고 이렇게 각각 같은 채팅방에 구독을 한 후, 메시지를 전송해보니(페이지가 화면을 줄이니 깨진다...)
이렇게 정상적으로 모두에게 전송되는 것을 볼 수 있었다.(토큰을 사용해서 중간 부분은 모자이크로 처리했다.)
이렇게 하니 Kafka없이도 MongoDB만 사용해 채팅서버를 구현할 수 있었다.
'토이 프로젝트' 카테고리의 다른 글
직렬화 과정에서 함수 조건 검사 (0) | 2025.01.13 |
---|---|
Spring Webflux + Stomp를 사용해 채팅 구현하기 (0) | 2025.01.05 |
WebFlux SwitchIfEmpty가 항상 시작되는 에러 (2) | 2024.12.23 |
MSA Gateway에 CircuitBreaker 적용 (0) | 2024.12.16 |
Webflux에 transaction 적용 (0) | 2024.12.13 |