Skip to content

Commit

Permalink
refactor: remove batch processing in kafka listener
Browse files Browse the repository at this point in the history
  • Loading branch information
JiwonKKang committed Dec 6, 2024
1 parent acc0562 commit 737a94a
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 36 deletions.
1 change: 1 addition & 0 deletions gateway-service/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ RUN mkdir -p ${MODULE_NAME}/build/extracted && (java -Djarmode=layertools -jar $

FROM eclipse-temurin:17-jdk
VOLUME /tmp

RUN ln -sf /usr/share/zoneinfo/Asia/Seoul /etc/localtime
ENV TZ=Asia/Seoul

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,32 @@ private val log = KotlinLogging.logger {}

@Component
class CDCEventListener(
private val CDCEventHandlerFactory: CDCEventHandlerFactory
private val cdcEventHandlerFactory: CDCEventHandlerFactory
) {

@KafkaListener(topics = ["\${kafka.topic.cdc_events}"] , groupId = "\${kafka.consumer-group.cdc_events}")
fun receive(records: List<ConsumerRecord<String, DebeziumEvent>>, acknowledgment: Acknowledgment) {
val sortedRecords: List<ConsumerRecord<String, DebeziumEvent>> = records.stream()
.filter { shouldProcessEvent(it.value().payload) }
.sorted(Comparator.comparing { r -> r.value().payload.date })
.toList()
@KafkaListener(topics = ["\${kafka.topic.cdc_events}"], groupId = "\${kafka.consumer-group.cdc_events}")
fun receive(record: ConsumerRecord<String, DebeziumEvent>, acknowledgment: Acknowledgment) {
val payload = record.value().payload

log.info { "${sortedRecords.size} 개의 이벤트를 처리 요청" }
if (isReadOperation(payload)) {
log.info { "처리하지 않는 이벤트: ${payload.operation}" }
acknowledgment.acknowledge()
return
}

log.info { "${payload.source["table"]} - ${payload.operation} 이벤트를 ${record.topic()} 토픽에서 처리 요청" }

sortedRecords.forEach { record ->
log.info {
"${record.value().payload.operation} 이벤트를 ${record.topic()} 토픽에 처리 요청"
}
CDCEventHandlerFactory.getHandler(getTableName(record)).process(record.value())
try {
val tableName = getTableName(record)
cdcEventHandlerFactory.getHandler(tableName).process(record.value())
acknowledgment.acknowledge()
} catch (ex: Exception) {
log.error(ex) { "이벤트 처리 중 오류 발생: ${record.value()}" }
}
acknowledgment.acknowledge()
}

private fun shouldProcessEvent(payload: DebeziumEvent.DebeziumEventPayload): Boolean {
// READ 이벤트 처리하지 않음
return payload.operation != DebeziumEvent.DebeziumEventPayloadOperation.READ
private fun isReadOperation(payload: DebeziumEvent.DebeziumEventPayload): Boolean {
return payload.operation == DebeziumEvent.DebeziumEventPayloadOperation.READ
}

private fun getTableName(record: ConsumerRecord<String, DebeziumEvent>): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.springframework.stereotype.Component
@Component
class QuizCDCEventHandler(
mapper: ObjectMapper,
private val quizSynchronizer: QuizSynchronizer,
private val quizDocumentSynchronizer: QuizDocumentSynchronizer,
) : AbstractSimpleEventHandler<QuizCDCEvent>(mapper), EventHandler {

init {
Expand All @@ -19,14 +19,14 @@ class QuizCDCEventHandler(

final override fun initActions() {
actions.put(DebeziumEvent.DebeziumEventPayloadOperation.CREATE) { _, after ->
after?.let { quizSynchronizer.createQuiz(it) }
after?.let { quizDocumentSynchronizer.createQuiz(it) }
}

actions.put(DebeziumEvent.DebeziumEventPayloadOperation.UPDATE) { _, after ->
after?.let { quizSynchronizer.updateQuiz(it) }
after?.let { quizDocumentSynchronizer.updateQuiz(it) }
}
actions.put(DebeziumEvent.DebeziumEventPayloadOperation.DELETE) { before, _ ->
before?.let { quizSynchronizer.removeQuiz(it.id) }
before?.let { quizDocumentSynchronizer.removeQuiz(it.id) }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.springframework.data.repository.findByIdOrNull
import org.springframework.stereotype.Component

@Component
class QuizSynchronizer(
class QuizDocumentSynchronizer(
private val quizElasticRepository: QuizElasticRepository,
private val quizTagJpaRepository: QuizTagJpaRepository
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ import org.springframework.stereotype.Component
@Component
class QuizOptionCDCEventHandler(
mapper: ObjectMapper,
private val quizSynchronizer: QuizSynchronizer
private val quizDocumentSynchronizer: QuizDocumentSynchronizer
) : AbstractSimpleEventHandler<QuizOptionCDCEvent>(mapper), EventHandler {
init {
initActions()
}

final override fun initActions() {
actions.put(DebeziumEvent.DebeziumEventPayloadOperation.CREATE) { _, after ->
after?.let { quizSynchronizer.addQuizOption(it) }
after?.let { quizDocumentSynchronizer.addQuizOption(it) }
}

actions.put(DebeziumEvent.DebeziumEventPayloadOperation.DELETE) { before, _ ->
before?.let { quizSynchronizer.removeQuizOption(it.quizId, it.optionNumber) }
before?.let { quizDocumentSynchronizer.removeQuizOption(it.quizId, it.optionNumber) }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.springframework.stereotype.Component
@Component
class QuizTagMappingCDCEventHandler(
mapper: ObjectMapper,
private val quizSynchronizer: QuizSynchronizer
private val quizDocumentSynchronizer: QuizDocumentSynchronizer
) : AbstractSimpleEventHandler<QuizTagMappingCDCEvent>(mapper), EventHandler {

init {
Expand All @@ -18,11 +18,11 @@ class QuizTagMappingCDCEventHandler(

override fun initActions() {
actions.put(DebeziumEvent.DebeziumEventPayloadOperation.CREATE) { _, after ->
after?.let { quizSynchronizer.addQuizTag(it) }
after?.let { quizDocumentSynchronizer.addQuizTag(it) }
}

actions.put(DebeziumEvent.DebeziumEventPayloadOperation.DELETE) { before, _ ->
before?.let { quizSynchronizer.removeQuizTag(it.quizId, it.tagId) }
before?.let { quizDocumentSynchronizer.removeQuizTag(it.quizId, it.tagId) }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,20 @@ class UserEventKafkaListener(
private val eventHandlerFactory: EventHandlerFactory,
) {

@KafkaListener(topics = ["#{'\${kafka.topic.user-outbox}'.split(',')}"], groupId = "\${kafka.consumer-group.quiz}")
fun receive(records: List<ConsumerRecord<String, Event>>, ack: Acknowledgment) {
records.forEach { record ->
val appEvent = record.value()
log.info { "User Outbox 이벤트를 받았습니다. $appEvent" }
@KafkaListener(
topics = ["#{'\${kafka.topic.user-outbox}'.split(',')}"],
groupId = "\${kafka.consumer-group.quiz}"
)
fun receive(record: ConsumerRecord<String, Event>, ack: Acknowledgment) {
val appEvent = record.value()
log.info { "User Outbox 이벤트를 받았습니다. $appEvent" }

try {
eventHandlerFactory.getEventHandler(appEvent.origin).process(appEvent)
ack.acknowledge()
} catch (ex: Exception) {
log.error(ex) { "이벤트 처리 중 오류 발생: $appEvent" }
// ack.acknowledge()를 호출하지 않음으로써 메시지를 재처리 가능하도록 유지
}
ack.acknowledge()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ kafka-consumer-config:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest
batch-listener: true
batch-listener: false
auto-startup: true
concurrency-level: 3
session-timeout-ms: 10000
Expand Down

0 comments on commit 737a94a

Please sign in to comment.