반응형

저번에 작성한 글과 이어진다.

 

일단 WebFlux로 만들어는 놨고, 어느 서버가 더 성능이 좋은지 테스트를 해본 후 해당 서버로 교체를 하려 한다.

솔직히 WebFlux가 더 좋을거라고는 이미 알고 있지만, 그래도 대략적인 Throughput을 구해놓기 위해 측정해봤다.

 

사용한 서버들은 아래와 같다.

https://github.com/Seungkyu-Han/KMU-TOKTOK2_FASTAPI

 

GitHub - Seungkyu-Han/KMU-TOKTOK2_FASTAPI

Contribute to Seungkyu-Han/KMU-TOKTOK2_FASTAPI development by creating an account on GitHub.

github.com

https://github.com/Seungkyu-Han/KMU-TOKTOK2_WEBFLUX

 

GitHub - Seungkyu-Han/KMU-TOKTOK2_WEBFLUX: 크무톡톡2 chatgpt webflux 서버

크무톡톡2 chatgpt webflux 서버. Contribute to Seungkyu-Han/KMU-TOKTOK2_WEBFLUX development by creating an account on GitHub.

github.com

 

이 중 FastAPI는 폴링이기 때문에 성능이 더 안 좋을 것이다.

그렇기에 FastAPI에는 기대를 하지 않고 시작했다.

 

혹시 성능 테스트에 다른 부분이 영향을 미칠수도 있으니 Docker로 CPU 1개, Ram 1GB로 설정해두고 테스트했다.

설정하는 방법은 docker-compose.yml을 사용하여 이런 옵션을 추가해주면 된다.

deploy:
  resources:
    limits:
      cpus: '1'
      memory: 1G

 

JMeter는 다음과 같이 작성했다.

헤더에 Content-Type을 명시하고

 

300명의 유저를 1분 동안 접속시켜 보았다.

 

이렇게 설정을 하고 테스트를 해보았다.

우선 WebFlux

평균 응답 시간은 11초, 최대 응답 시간은 64초

ThroughPut은 3.2/sec라고 한다.

사실 64초나 걸리면 안되지만, 그래도 300명이 1분 동안 접속할 일은 없기 때문에 이정도로 만족하려 한다.

 

이번엔 FastAPI다.

같은 조건으로 테스트 해보았지만 서버가 바로 죽어버렸다...

그렇기에 유저 수를 300명에서 100명으로 줄여보았지만, 그래도 좋은 결과는 나오지 않았다.

 

사실 이 서버는 외부 서버에 요청해서 가져오는 것이기 때문에 시간 보다는 ThroughPut을 중점으로 보려 했지만, 거기에서도 너무 많이 차이가 나버렸다...

 

이번 결과로 서버는 FastAPI에서 WebFlux로 교체하게 되었다.

반응형

기존에 Fastapi로 빠르게 ChatGPT 서버를 만들어서 사용하고 있었다.

GPT Assistant는 curl를 통한 요청을 지원하고 node.js와 python은 라이브러리를 지원했기 때문이다.

그 당시에는 일단 빠르게 배포를 해야 했기에 일단 파이썬으로 구현하고, 다음 주부터는 사용자가 몰릴 예정이라고 하여 여유있는 기간에 Webflux로 서버를 교청하게 되었다.

 

https://platform.openai.com/docs/api-reference/runs/createThreadAndRun

 

여기 openai의 assistant 공식 문서를 참조하여 만들었다.

 

일단 사용한 DTO들이다.

 

Request DTO

data class ChatBotReq(
    val content: String,
    @JsonProperty("assistant_id")
    val assistantId: String
)

data class ChatGPTReq(
    @JsonProperty("assistant_id")
    val assistantId: String,
    val thread: Thread,
    val stream: Boolean
)

data class Message(
    val role: String,
    val content: String
)

data class Thread(
    val messages: List<Message>
)

 

여기서 ChatBotReq는 외부서버에서 해당 서버로 요청 할 때 사용하는 DTO이고, 그 아래의 3개는 openAi로 요청 할 때 사용하는 DTO이다.

 

 

Response DTO

data class ChatGPTRes(
    val id: String,
    @JsonProperty("object")
    val objectType: String,
    @JsonProperty("created_at")
    val createdAt: Long,
    @JsonProperty("assistant_id")
    val assistantId: String,
    @JsonProperty("thread_id")
    val threadId: String,
    @JsonProperty("run_id")
    val runId: String,
    val status: String,
    @JsonProperty("incomplete_details")
    val incompleteDetails: String?,
    @JsonProperty("incomplete_at")
    val incompleteAt: Long?,
    @JsonProperty("completed_at")
    val completedAt: Long,
    val role: String,
    val content: List<Content>,
    val attachments: List<Any>,
    val metadata: Map<String, Any>?
)

data class Content(
    val type: String,
    val text: Text
)

data class Text(
    val value: String,
    val annotations: List<Annotation>?
)

data class Annotation(
    val type: String,
    val text: String,
    @JsonProperty("start_index")
    val startIndex: Int,
    @JsonProperty("end_index")
    val endIndex: Int,
    @JsonProperty("file_citation")
    val fileCitation: FileCitation?
)

data class FileCitation(
    @JsonProperty("file_id")
    val fileId: String?
)

OpenAI로부터 오는 응답 DTO이다.

 

근데 사실 여기서는 문자열로 받아서, 그 문자열을 ObjectMapper를 사용하여 해당 클래스로 매핑할 예정이다.

 

그 다음은 Router이다.

@Configuration
class ChatBotRouter {

    @Bean
    fun chatBotRouterMapping(
        chatBotHandler: ChatBotHandler
    ) = coRouter {
        POST("/", chatBotHandler::post)
    }
}

 

코루틴을 사용하기 때문에 coRouter를 사용하였다.

 

이제 Service 부분이다.

@Component
class ChatBotHandler(
    @Value("\${OPEN_API_KEY}")
    private val openApiKey: String,
    private val objectMapper: ObjectMapper
) {

    private val openAIUrl = "https://api.openai.com/v1"
    private val errorMessage = "챗봇 요청 중 에러가 발생했습니다."

    private val createAndRunWebClient =
        WebClient.builder()
            .baseUrl("$openAIUrl/threads/runs")
            .defaultHeaders {
                it.set(HttpHeaders.AUTHORIZATION, "Bearer $openApiKey")
                it.set(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON_VALUE)
                it.set("OpenAI-Beta", "assistants=v2")
            }
            .build()

    suspend fun post(serverRequest: ServerRequest): ServerResponse {
        return withContext(Dispatchers.IO) {
            val chatBotReq = serverRequest.bodyToMono(ChatBotReq::class.java).awaitSingle()
            val response = createAndRunWebClient
                .post()
                .bodyValue(
                    ChatGPTReq(
                        assistantId = chatBotReq.assistantId,
                        thread = Thread(
                            messages = listOf(
                                Message(
                                    role = "user",
                                    content = chatBotReq.content
                                )
                            )
                        ),
                        stream = true
                    )
                )
                .retrieve()
                .bodyToMono(String::class.java)
                .awaitSingle()


            ServerResponse.ok().bodyValue(
                try{
                objectMapper.readValue(response.split("event: thread.message.completed")[1]
                    .split("event: thread.run.step.completed")[0]
                    .trimIndent()
                    .removePrefix("data: "), ChatGPTRes::class.java).content[0].text.value
                }catch (e: IndexOutOfBoundsException){
                    errorMessage
                }
            )
                .awaitSingle()
        }
    }
}

 

우선 openApiKey는 필요하다.

이거는 발급해오고, 깃허브에는 올리면 안되기 때문에 환경변수로 빼준다.

 

우선 공통된 부분에 대한 webClient를 작성해준다.

헤더에 이런 부분들이 들어가니 똑같이 넣어주면 된다.

 

이제 POST로 요청하고, body 부분을 작성해주는데 여기서 stream은 true로 해준다.

SSE로 구현을 하지는 않는다. 그렇다고 해서 stream을 false로 하게 된다면, 이 응답이 생길 때 까지 서버에 polling을 해야 하기에 그냥 openAI에서 받을 때는 SSE로 받고, 대신 그것을 Flux가 아닌 Mono로 모아주었다.

 

이렇게 온 응답을 문자열로 출력해보면

event: thread.message~~~~~

data: {}

 

이렇게 오게 되는데, 우리는 이 중에서 메시지가 모두 만들어진 event:thread.message.completed일 때를 가져오면 된다.

그렇기 때문에 이 문자열로 split을 하고, 그 다음에 있는 내용을 가져온다.

 

그러고는 event: thread.run.step.completed 뒤에 있는 내용도 필요가 없기 때문에 그 부분도 split으로 뒷부분을 잘라준다.

그러면 이제 우리가 원하는 data: {} 이런 형태가 남게 될 것이다.

 

혹시 모르니까 trim으로 공백을 잘라주고, 앞 부분의 'data: ' 부분도 removePrefix로 제거해준다.

{~~~} 이런 내용만 남게 될 테니, ObjectMapper를 사용해 위에 있는 Dto로 변환해주고, content의 value 안에 있는 문자열을 넘겨주면 된다.

 

인덱스를 가져오는 과정들에서 에러가 생길 수도 있으니 IndexOutOfBoundsException을 사용해 Error를 핸들링 해준다.

 

Intellij의 http를 사용하여 테스트를 해보니 잘 나오는 것을 볼 수 있다.

 

이렇게 잘 나오는 것을 볼 수 있다.

'크무톡톡 프로젝트' 카테고리의 다른 글

FastAPI VS WebFlux jemeter 성능 테스트  (0) 2024.10.23
JPA Paging vs JDBC 속도 비교  (0) 2024.08.10
SpringBoot 시간 설정  (0) 2024.08.09
JPA Soft Delete  (0) 2024.08.06
Docker를 활용한 Nginx로 Swagger proxy  (0) 2024.08.05

+ Recent posts