반응형

MSA에서 굉장히 많이 사용하는 Kafka를 사용하려고 한다.

 

kafka에서는 이벤트를 주고 받을 때, 넘기는 데이터를 class로 정의해야 하는데 이것을 avro를 사용해서 정의해보려고 한다.

 

avro를 사용해서 resource 폴더에 json으로 포멧을 작성하고, avro를 실행하면 java의 클래스로 파일들이 생성되게 된다.

 

우선 gradle에 avro를 추가하자

 

build.gradle.kts를 사용했다.

import com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask

plugins {
    kotlin("jvm") version "1.9.25"
    id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.apache.avro:avro:1.12.0")
}

tasks.test {
    useJUnitPlatform()
}

kotlin {
    jvmToolchain(17)
}

avro {
    setCreateSetters(false)
}

val generateAvro:TaskProvider<GenerateAvroJavaTask> = tasks.register("generateAvro", GenerateAvroJavaTask::class.java) {
    source("src/main/resources/avro")
    setOutputDir(file("src/main/java"))
    stringType.set("String")
    enableDecimalLogicalType = true
}

tasks.named("compileJava").configure {
    dependsOn(generateAvro)
}

 

source로 src/main/resources/avro를 지정해주었다.

해당 폴더에 avsc 파일을 생성한 후에 generateAvro를 gradlew로 실행해주면 된다.

 

avsc 파일의 예시이다.

{
    "namespace": "seungkyu.food.ordering.kafka.order.avro.model",
    "type": "record",
    "name": "PaymentRequestAvroModel",
    "fields": [
        {
            "name": "id",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "sagaId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "customerId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "orderId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "price",
            "type": {
                "type": "long",
                "logicalType": "long"
            }
        },
        {
            "name": "createdAt",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
        },
        {
            "name": "paymentOrderStatus",
            "type": {
                  "type": "enum",
                  "name": "PaymentOrderStatus",
                  "symbols": ["PENDING", "CANCELLED"]
               }
        }
    ]
}

 

우선 namespace는 생성할 파일의 위치이다.

name으로 해당 클래스의 이름을 지정해준다.

 

fields이다.

name으로 해당 클래스에서 파라미터의 이름을 지정해준다.

type으로 class에서 사용할 타입과, kafka에서 사용할 타입을 명시해준다.

 

이제 gradle로 실행해보자.

지정한 패키지에 해당 클래스들이 생성되었다.

 

들어가서 확인해보니, 이렇게 직접 수정하지 말라고 하는 안내도 있었다.

 

이렇게 생성한 class로 kafka 이벤트를 주고 받을 수 있게 되었다.

'MSA' 카테고리의 다른 글

MSA에 CQRS 패턴 적용하기  (0) 2025.03.01
MSA에 Outbox 패턴 적용하기  (0) 2025.02.28
MSA에 SAGA 패턴 적용하기  (0) 2025.02.24
DDD에서 Hexagonal Architecture로 변경하기  (0) 2025.02.20
DDD의 핵심 요소  (0) 2025.02.14
반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

얼마 전에 게임회사의 면접을 보게 되었다.

해당 면접에서 Heap 영역에서의 메모리 단편화를 막기 위해서는 어떤 방법이 좋을까요?라는 질문이 있었다.

 

그 당시에는 답변을 하지 못했지만, 지금와서 생각해보면 이 ObjectPool을 적용하는 것이 답일 것 같다.

 

잊지 않기 위해 현재 진행하고 있는 프로젝트에 ObjectPool을 적용해보려 한다.

 

직접 만들어야 하나... 생각하고 있었는데

다행히 스프링에서 해당 라이브러리가 존재한다.

 

일단 Object Pool은 객체가 필요할 때, 매번 생성하는 것이 아니라 관리하고 있는 pool에서 가져오는 것을 말한다.

이렇게 Pool에서 객체를 가져오게 되면, 우선 객체를 생성하는 데 필요한 비용이 감소한다.

객체를 생성할 때, 메모리를 할당하고 객체를 생성하는 과정은 시간이 굉장히 오래 걸리기 때문에 이 과정을 아낄 수 있다.

또한, 불연속적인 Heap 메모리에 계속 할당하지 않기 때문에 메모리의 단편화가 줄어들게 된다.

찾아보니 게임업계에서 굉장히 많이 사용하는 방법인 것 같다.

 

바로 적용을 해보도록 하자.

 

    //OBJECT POOL
    implementation("org.apache.commons:commons-pool2:2.12.1")

 

해당 라이브러리를 추가해준다.

 

java에서 Object Pool을 관리해주는 라이브러리이다.

 

그 다음에는 Pool에서 관리할 객체를 넣어주도록 하자.

나는 UserDocument를 Pool에서 관리하도록 할 것이다.

 

@Component
class UserPoolComponent: BasePooledObjectFactory<UserDocument>() {

    private var a = 0

    override fun create(): UserDocument {
        a += 1
        return UserDocument(
            name = a.toString(),
            oauth = Oauth()
        )
    }

    override fun wrap(userDocument: UserDocument): PooledObject<UserDocument> {
        return DefaultPooledObject(userDocument)
    }
}

 

우선 create 메서드를 오버라이딩해서 미리 저장해둘 객체들을 만들어준다.

이 create 메서드를 통해 객체를 생성해서 pool에 저장해두게 된다.

 

wrap 메서드를 통해 해당 객체를 관리하도록 하는데, 이것도 DefaultPooledObject를 통해 라이브러리를 사용해 객체를 관리할 수 있다.

wrap을 통해 해당 객체의 대여상태, 유휴상태등을 저장하고 관리하게 된다.

 

그리고는

val userPool = GenericObjectPool(userPoolComponent, GenericObjectPoolConfig<UserDocument>().apply{
        maxTotal = 10
        minIdle = 5
    })

해당 component로 pool을 만들어서 사용한다.

 

해당 pool에서 borrowObject를 통해 객체를 대여하고, returnObject를 통해 객체를 반납한다.

당연하게도 대여한 후에 반납을 꼭 해줘야, 다음 사용자가 사용이 가능하다.

 

그리고 중요한 부분은, 반납하는 순간은 해당 함수의 종료 시점이 아니라 해당 함수를 호출한 함수가 해당 객체의 사용이 끝날 때이다.

호출당한 함수에서 반납을 해버리면, 동시성 문제가 발생 할 수 있다.

 

일단 테스트를 진행해보도록 하자.

 

 

        @Test
        @DisplayName("Object Pool 테스트")
        fun createUserObjectPoolTest(){
            //given

            for (i in 1..10){
                val a = userService.userPool.borrowObject()
                println(a)
            }

            //when

            //then
        }

 

이렇게 10개를 반납하지 않고, 호출해보았다.

객체들의 이름은 알기 쉽도록 1부터 10까지 차례로 생성해보았다.

 

이렇게 반납을 하지 않으니, 계속 객체풀에서 다음 객체가 대여되어 다른 이름을 가지는 것을 볼 수 있다.

 

        @Test
        @DisplayName("Object Pool 테스트")
        fun createUserObjectPoolTest(){
            //given

            for (i in 1..10){
                val a = userService.userPool.borrowObject()
                println(a)
                userService.userPool.returnObject(a)
            }

            //when

            //then
        }

 

이렇게 반납을 하면서 출력을 해보니,

 

하나의 객체만을 사용해서 출력이 되는 것을 볼 수 있다.

 

Object Pool이 객체 생성에 대한 비용을 아낄 수는 있지만, 해당 객체의 사용 종료 시점을 추적해야 하는 등 생각보다 사용이 어렵다.

동시성 문제를 피하기 위해서는 정확하게 추적이 가능할 때만 사용하도록 하자.

반응형

https://github.com/Seungkyu-Han/Toge-do-backend

 

GitHub - Seungkyu-Han/Toge-do-backend: Toge-do 앱의 백엔드 리포지토리입니다

Toge-do 앱의 백엔드 리포지토리입니다. Contribute to Seungkyu-Han/Toge-do-backend development by creating an account on GitHub.

github.com

 

해당 프로젝트 중에 아래와 같은 data class를 object mapper를 사용하여 읽은 후 redis에 String으로 저장하는 코드가 있었다.

package vp.togedo.model.documents.personalSchedule

import org.bson.types.ObjectId
import vp.togedo.model.exception.personalSchedule.PersonalScheduleEndTimeBeforeStartTimeException
import vp.togedo.model.exception.personalSchedule.PersonalScheduleTimeIsNotRangeException

data class PersonalScheduleElement(
    val id: ObjectId = ObjectId.get(),
    val startTime: String,
    val endTime: String,
    val name: String,
    val color: String
){
    /**
     * 해당 개인 스케줄 요소의 시작 시간이 종료 시간보다 앞인지 확인
     * @return true
     * @throws PersonalScheduleEndTimeBeforeStartTimeException 종료 시간이 시작 시간보다 앞에 있음
     */
    private fun isStartTimeBefore(): Boolean{
        if(startTime.length != endTime.length ||
            startTime > endTime)
            throw PersonalScheduleEndTimeBeforeStartTimeException()
        return true
    }

    /**
     * 해당 스케줄의 시간이 범위 내에 있는지 확인
     * @param startTimeRange 시작 범위
     * @param endTimeRange 종료 범위
     * @return true
     * @throws PersonalScheduleTimeIsNotRangeException 유효한 시간 범위가 아님
     */
    private fun isTimeRange(
        startTimeRange: String,
        endTimeRange: String): Boolean{
        if(startTime.length != startTimeRange.length ||
            endTime.length != endTimeRange.length ||
            startTime !in startTimeRange..endTimeRange ||
            endTime !in startTimeRange..endTimeRange){
            throw PersonalScheduleTimeIsNotRangeException()
        }
        return true
    }

    /**
     * 유동 스케줄의 시간이 유효한지 확인
     * @return true
     * @throws PersonalScheduleTimeIsNotRangeException 유효한 시간 범위가 아님
     * @throws PersonalScheduleEndTimeBeforeStartTimeException 종료 시간이 시작 시간보다 앞에 있음
     */
    fun isValidTimeForFlexibleSchedule(): Boolean{
        return isStartTimeBefore() &&
                //00(년)_01(월)_01(일)_00(시)_00(분) ~ 99(년)_12(월)_31(일)_23(시)_59(분)
                isTimeRange(
                    startTimeRange = "0001010000",
                    endTimeRange = "9912312359",)
    }

    /**
     * 고정 스케줄의 시간이 유효한지 확인
     * @return true
     * @throws PersonalScheduleTimeIsNotRangeException 유효한 시간 범위가 아님
     * @throws PersonalScheduleEndTimeBeforeStartTimeException 종료 시간이 시작 시간보다 앞에 있음
     */
    fun isValidTimeForFixedSchedule(): Boolean{
        return isStartTimeBefore() &&
                //1(요일)_00(시)_00(분) ~ 7(요일)_23(시)_59(분)
                isTimeRange(
                    startTimeRange = "10000",
                    endTimeRange = "72359")
    }

}

 

그리고 여기에서 고정 일정을 추가하기 위해 isValidFimeForFixedSchedule만 호출을 하고 저장을 하는데, 계속 isValidTimeForFixedSchedule이 호출되어 Exception이 발생했다.

 

디버깅을 아무리 해보아도, 해당 API에서는 Fix 검증 코드만 사용하고 있었기에 버그를 잡기가 굉장히 어려웠다.

 

그러던 중 해당 에러가, redis 저장과정에서 생긴 다는 것을 알게 되었고 object mapper의 writeValueAsString에서 에러가 발생했다.

 

아니....왜 직렬화만 하라니까 함수를 호출하지?라고 생각을 하며 이유를 찾아보고 있었는데 함수의 이름 때문이었다.

 

직렬화 과정에서 함수 앞에 "is"가 붙어있으면 호출하여 해당 함수로 조건검사를 시도한다는 것이었다.

 

지금 함수명들이 IsValidTime이기 때문에 해당 함수들로 조건검사를 했던 것이다.

 

이 방법을 알고 있지 않았기 때문에 직접 조건검사를 하고 추가를 해주었고, 지금은 이 조건검사를 해제해야 했다.

 

@JsonIgnore

 

함수들에 이 annotation을 붙여주면 된다.

직렬화를 하지 않겠다는 것이다.

 

다음에는 굳이 ignore가 아닌, 직렬화를 조건검사를 한 후에 할 수도 있겠다라는 생각이 들었다.

+ Recent posts