diff --git a/gateway-service/Dockerfile b/gateway-service/Dockerfile index 5c525af0..c5a46b3a 100644 --- a/gateway-service/Dockerfile +++ b/gateway-service/Dockerfile @@ -13,6 +13,7 @@ RUN mkdir -p ${MODULE_NAME}/build/extracted && (java -Djarmode=layertools -jar $ FROM eclipse-temurin:17-jdk VOLUME /tmp + RUN ln -sf /usr/share/zoneinfo/Asia/Seoul /etc/localtime ENV TZ=Asia/Seoul diff --git a/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/CDCEventListener.kt b/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/CDCEventListener.kt index accea97b..01cb8586 100644 --- a/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/CDCEventListener.kt +++ b/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/CDCEventListener.kt @@ -11,31 +11,32 @@ private val log = KotlinLogging.logger {} @Component class CDCEventListener( - private val CDCEventHandlerFactory: CDCEventHandlerFactory + private val cdcEventHandlerFactory: CDCEventHandlerFactory ) { - @KafkaListener(topics = ["\${kafka.topic.cdc_events}"] , groupId = "\${kafka.consumer-group.cdc_events}") - fun receive(records: List>, acknowledgment: Acknowledgment) { - val sortedRecords: List> = records.stream() - .filter { shouldProcessEvent(it.value().payload) } - .sorted(Comparator.comparing { r -> r.value().payload.date }) - .toList() + @KafkaListener(topics = ["\${kafka.topic.cdc_events}"], groupId = "\${kafka.consumer-group.cdc_events}") + fun receive(record: ConsumerRecord, acknowledgment: Acknowledgment) { + val payload = record.value().payload - log.info { "${sortedRecords.size} 개의 이벤트를 처리 요청" } + if (isReadOperation(payload)) { + log.info { "처리하지 않는 이벤트: ${payload.operation}" } + acknowledgment.acknowledge() + return + } + log.info { "${payload.source["table"]} - ${payload.operation} 이벤트를 ${record.topic()} 토픽에서 처리 요청" } - sortedRecords.forEach { record -> - log.info { - "${record.value().payload.operation} 이벤트를 ${record.topic()} 토픽에 처리 요청" - } - CDCEventHandlerFactory.getHandler(getTableName(record)).process(record.value()) + try { + val tableName = getTableName(record) + cdcEventHandlerFactory.getHandler(tableName).process(record.value()) + acknowledgment.acknowledge() + } catch (ex: Exception) { + log.error(ex) { "이벤트 처리 중 오류 발생: ${record.value()}" } } - acknowledgment.acknowledge() } - private fun shouldProcessEvent(payload: DebeziumEvent.DebeziumEventPayload): Boolean { - // READ 이벤트 처리하지 않음 - return payload.operation != DebeziumEvent.DebeziumEventPayloadOperation.READ + private fun isReadOperation(payload: DebeziumEvent.DebeziumEventPayload): Boolean { + return payload.operation == DebeziumEvent.DebeziumEventPayloadOperation.READ } private fun getTableName(record: ConsumerRecord): String { diff --git a/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizCDCEventHandler.kt b/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizCDCEventHandler.kt index c5c45da5..f10886bd 100644 --- a/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizCDCEventHandler.kt +++ b/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizCDCEventHandler.kt @@ -9,7 +9,7 @@ import org.springframework.stereotype.Component @Component class QuizCDCEventHandler( mapper: ObjectMapper, - private val quizSynchronizer: QuizSynchronizer, + private val quizDocumentSynchronizer: QuizDocumentSynchronizer, ) : AbstractSimpleEventHandler(mapper), EventHandler { init { @@ -19,14 +19,14 @@ class QuizCDCEventHandler( final override fun initActions() { actions.put(DebeziumEvent.DebeziumEventPayloadOperation.CREATE) { _, after -> - after?.let { quizSynchronizer.createQuiz(it) } + after?.let { quizDocumentSynchronizer.createQuiz(it) } } actions.put(DebeziumEvent.DebeziumEventPayloadOperation.UPDATE) { _, after -> - after?.let { quizSynchronizer.updateQuiz(it) } + after?.let { quizDocumentSynchronizer.updateQuiz(it) } } actions.put(DebeziumEvent.DebeziumEventPayloadOperation.DELETE) { before, _ -> - before?.let { quizSynchronizer.removeQuiz(it.id) } + before?.let { quizDocumentSynchronizer.removeQuiz(it.id) } } } } \ No newline at end of file diff --git a/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizSynchronizer.kt b/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizDocumentSynchronizer.kt similarity index 98% rename from quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizSynchronizer.kt rename to quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizDocumentSynchronizer.kt index 778a733d..0cc53745 100644 --- a/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizSynchronizer.kt +++ b/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizDocumentSynchronizer.kt @@ -9,7 +9,7 @@ import org.springframework.data.repository.findByIdOrNull import org.springframework.stereotype.Component @Component -class QuizSynchronizer( +class QuizDocumentSynchronizer( private val quizElasticRepository: QuizElasticRepository, private val quizTagJpaRepository: QuizTagJpaRepository ) { diff --git a/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizOptionCDCEventHandler.kt b/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizOptionCDCEventHandler.kt index 86e0c5c4..2ac10fee 100644 --- a/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizOptionCDCEventHandler.kt +++ b/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizOptionCDCEventHandler.kt @@ -9,7 +9,7 @@ import org.springframework.stereotype.Component @Component class QuizOptionCDCEventHandler( mapper: ObjectMapper, - private val quizSynchronizer: QuizSynchronizer + private val quizDocumentSynchronizer: QuizDocumentSynchronizer ) : AbstractSimpleEventHandler(mapper), EventHandler { init { initActions() @@ -17,11 +17,11 @@ class QuizOptionCDCEventHandler( final override fun initActions() { actions.put(DebeziumEvent.DebeziumEventPayloadOperation.CREATE) { _, after -> - after?.let { quizSynchronizer.addQuizOption(it) } + after?.let { quizDocumentSynchronizer.addQuizOption(it) } } actions.put(DebeziumEvent.DebeziumEventPayloadOperation.DELETE) { before, _ -> - before?.let { quizSynchronizer.removeQuizOption(it.quizId, it.optionNumber) } + before?.let { quizDocumentSynchronizer.removeQuizOption(it.quizId, it.optionNumber) } } } } \ No newline at end of file diff --git a/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizTagMappingCDCEventHandler.kt b/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizTagMappingCDCEventHandler.kt index e6b49c76..47710507 100644 --- a/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizTagMappingCDCEventHandler.kt +++ b/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/quizread/messaging/listener/QuizTagMappingCDCEventHandler.kt @@ -9,7 +9,7 @@ import org.springframework.stereotype.Component @Component class QuizTagMappingCDCEventHandler( mapper: ObjectMapper, - private val quizSynchronizer: QuizSynchronizer + private val quizDocumentSynchronizer: QuizDocumentSynchronizer ) : AbstractSimpleEventHandler(mapper), EventHandler { init { @@ -18,11 +18,11 @@ class QuizTagMappingCDCEventHandler( override fun initActions() { actions.put(DebeziumEvent.DebeziumEventPayloadOperation.CREATE) { _, after -> - after?.let { quizSynchronizer.addQuizTag(it) } + after?.let { quizDocumentSynchronizer.addQuizTag(it) } } actions.put(DebeziumEvent.DebeziumEventPayloadOperation.DELETE) { before, _ -> - before?.let { quizSynchronizer.removeQuizTag(it.quizId, it.tagId) } + before?.let { quizDocumentSynchronizer.removeQuizTag(it.quizId, it.tagId) } } } } \ No newline at end of file diff --git a/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/user/messaging/listener/UserEventKafkaListener.kt b/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/user/messaging/listener/UserEventKafkaListener.kt index 63aab58b..c192cd33 100644 --- a/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/user/messaging/listener/UserEventKafkaListener.kt +++ b/quiz-service/quiz-infra/src/main/kotlin/com/grepp/quizy/quiz/infra/user/messaging/listener/UserEventKafkaListener.kt @@ -13,13 +13,20 @@ class UserEventKafkaListener( private val eventHandlerFactory: EventHandlerFactory, ) { - @KafkaListener(topics = ["#{'\${kafka.topic.user-outbox}'.split(',')}"], groupId = "\${kafka.consumer-group.quiz}") - fun receive(records: List>, ack: Acknowledgment) { - records.forEach { record -> - val appEvent = record.value() - log.info { "User Outbox 이벤트를 받았습니다. $appEvent" } + @KafkaListener( + topics = ["#{'\${kafka.topic.user-outbox}'.split(',')}"], + groupId = "\${kafka.consumer-group.quiz}" + ) + fun receive(record: ConsumerRecord, ack: Acknowledgment) { + val appEvent = record.value() + log.info { "User Outbox 이벤트를 받았습니다. $appEvent" } + + try { eventHandlerFactory.getEventHandler(appEvent.origin).process(appEvent) + ack.acknowledge() + } catch (ex: Exception) { + log.error(ex) { "이벤트 처리 중 오류 발생: $appEvent" } + // ack.acknowledge()를 호출하지 않음으로써 메시지를 재처리 가능하도록 유지 } - ack.acknowledge() } -} \ No newline at end of file +} diff --git a/quiz-service/quiz-infra/src/main/resources/application-infra.yml b/quiz-service/quiz-infra/src/main/resources/application-infra.yml index 75374efe..99ee22de 100644 --- a/quiz-service/quiz-infra/src/main/resources/application-infra.yml +++ b/quiz-service/quiz-infra/src/main/resources/application-infra.yml @@ -59,7 +59,7 @@ kafka-consumer-config: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer auto-offset-reset: earliest - batch-listener: true + batch-listener: false auto-startup: true concurrency-level: 3 session-timeout-ms: 10000