이번에 Kafka를 공부하며 서버 간에 Kafka 통신을 할 수 있게 되었고, 이것을 이용하여 사용자의 정보를 수정할 때마다 사용자에게 메일로 알림을 발송하는 시스템을 만들어보려 한다.
시스템 구조는 위와 같다.
User Server에서 직접 이메일을 전송하는 방법도 있겠지만, Email 발송 자체가 시간이 오래 걸리는 작업이며 사용자의 정보만 다루는 User Server에 있어야 하는 기능은 아니기 때문에 이런 식으로 분리를 한 것이다.
우선 dependency는 아래와 같다.
implementation("io.projectreactor.kafka:reactor-kafka:1.3.19")
implementation("org.springframework.kafka:spring-kafka:3.0.9")
implementation("org.springframework.boot:spring-boot-starter-webflux")
Spring Webflux를 사용하기 때문에 webflux, kafka를 사용하기 위해 kafka를 implementation 해준다.
그러고는 Email Service를 다음과 같이 작성했다.
@Service
class EmailService(
private val javaMailSender: JavaMailSender,
) {
fun sendEmail(title: String, content: String, address: String) {
val message = SimpleMailMessage()
message.setTo(address)
message.subject = title
message.text = content
javaMailSender.send(message)
}
}
그냥 간단하게 이메일의 내용 작성해서 보내는 서비스이다.
이렇게 정상적으로 메일이 오는 것을 볼 수 있다.
그럼 이제 유저 서버를 만들고 유저 정보를 수정하면 Kafka에 이벤트를 publish하는 서버를 만들어보자.
우선 Producer 쪽의 Kafka 설정이다.
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: seungkyu
yml에는 이렇게 bootsrap-server를 명시해주고
@Configuration
class KafkaConfig {
@Bean
fun reactiveKafkaProducerTemplate(
kafkaProperties: KafkaProperties
): ReactiveKafkaProducerTemplate<String, String> {
val props = HashMap<String, Any>()
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaProperties.bootstrapServers
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return ReactiveKafkaProducerTemplate(
SenderOptions.create(props)
)
}
}
이렇게 reactiveKafkaProducerTemplate Bean을 만들어준다.
KafkaProperties를 parameter로 받으면 yml에 작성된 kafka의 정보를 불러오게 된다.
이것을 바탕으로 reactive kafka를 만들어주었다.
@Service
class UserService(
private val reactiveKafkaProducerTemplate: ReactiveKafkaProducerTemplate<String, String>,
private val objectMapper: ObjectMapper
) {
fun changeUserInfo(userInfo: UserInfo){
reactiveKafkaProducerTemplate.send("change-user-info", objectMapper.writeValueAsString(userInfo))
.subscribe()
}
}
데이터베이스에서 수정하는 부분은 빼고, 이렇게 chage-user-info 토픽으로 변경된 유저의 정보를 보내주었다.
이제 kafka listener를 만들어서 해당 event가 올 때마다 메일을 보내주면 된다.
다시 메일 서버로 가서
@Component
class UserKafkaListener(
private val objectMapper: ObjectMapper,
private val emailService: EmailService
) {
data class UserInfo(
val name: String,
val email: String,
val age: Int
)
@KafkaListener(topics = ["change-user-info"])
fun changeUserListener(data: String){
val userInfo = objectMapper.readValue(data, UserInfo::class.java)
emailService.sendEmail(
address = userInfo.email,
title = "유저 정보가 변경되었습니다.",
content = "유저 정보가 변경되었습니다. 본인이 아니라면 신고해주시기 바랍니다."
)
}
}
이렇게 Listener를 작성해준다.
@Component안에 @KafkaListener(topics = ["구독하고 싶은 토픽"])을 적어주면 된다.
당연하게 publish 하는 topic과 같은 topic이어야 한다.
지금은 문자열로 데이터를 주고받지만, JsonSerializer등을 사용하면 Json으로 바로 데이터를 주고 받을 수 있다.
방금 작성했던 user.http로 다시 사용자 정보를 수정해보면
이렇게 메일이 정상적으로 오는 것을 볼 수 있다.
'토이 프로젝트' 카테고리의 다른 글
Junit에서 Redis mock 에러 (0) | 2024.12.07 |
---|---|
Kafka: The coordinator is not available. 에러 (0) | 2024.12.06 |
WebFlux에서 RedissonReactive를 사용한 동시성 이슈 해결 (0) | 2024.11.17 |
Redis Sentinel 설정 (0) | 2024.11.09 |
Redis의 pub-sub을 사용한 SSE notification 서버 만들기 (1) | 2024.11.09 |