
Commit

- fix test
psmagin committed Sep 10, 2024
1 parent 1b3f8cb commit befa195
Showing 2 changed files with 2 additions and 5 deletions.
@@ -2,7 +2,6 @@
 
 import lombok.RequiredArgsConstructor;
 import org.folio.search.domain.dto.ResourceEvent;
-import org.folio.search.integration.message.interceptor.DeleteAllRecordFilterStrategy;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -19,19 +18,17 @@ public class InstanceResourceEventKafkaConfiguration extends KafkaConfiguration
 
   /**
    * Creates and configures {@link ConcurrentKafkaListenerContainerFactory} as Spring bean
-   * for consuming resource events from Apache Kafka.
+   * for consuming resource instance related events from Kafka.
    *
    * @return {@link ConcurrentKafkaListenerContainerFactory} object as Spring bean.
    */
   @Bean
   public ConcurrentKafkaListenerContainerFactory<String, ResourceEvent> instanceResourceListenerContainerFactory(
-      DeleteAllRecordFilterStrategy deleteAllRecordFilterStrategy,
       BatchInterceptor<String, ResourceEvent>[] batchInterceptors) {
     var factory = new ConcurrentKafkaListenerContainerFactory<String, ResourceEvent>();
     factory.setBatchListener(true);
     var deserializer = new JsonDeserializer<>(ResourceEvent.class, false);
     factory.setConsumerFactory(getConsumerFactory(deserializer, kafkaProperties));
-    factory.setRecordFilterStrategy(deleteAllRecordFilterStrategy);
     factory.setBatchInterceptor(new CompositeBatchInterceptor<>(batchInterceptors));
     return factory;
   }
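For context, the removed DeleteAllRecordFilterStrategy was registered on this factory as a Spring Kafka RecordFilterStrategy, which the listener container uses to discard records before they reach the listener method. The sketch below only illustrates that interface under assumptions; the class name ExampleRecordFilterStrategy and its filtering condition are hypothetical and do not reflect the actual FOLIO implementation.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.folio.search.domain.dto.ResourceEvent;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

// Hypothetical sketch of a record filter strategy in the style of the removed
// DeleteAllRecordFilterStrategy; the real FOLIO class may differ.
public class ExampleRecordFilterStrategy implements RecordFilterStrategy<String, ResourceEvent> {

  @Override
  public boolean filter(ConsumerRecord<String, ResourceEvent> consumerRecord) {
    // Returning true tells the listener container to drop the record
    // before it is handed to the @KafkaListener method.
    return consumerRecord.value() == null;
  }
}

With the strategy removed from the factory, every record in the batch now reaches handleInstanceEvents unfiltered.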
@@ -58,7 +58,7 @@ public class KafkaMessageListener {
     groupId = "#{folioKafkaProperties.listener['events'].groupId}",
     concurrency = "#{folioKafkaProperties.listener['events'].concurrency}")
   public void handleInstanceEvents(List<ConsumerRecord<String, ResourceEvent>> consumerRecords) {
-    log.info("Processing instance ids from kafka events [number of events: {}]", consumerRecords.size());
+    log.info("Processing instance related events from kafka events [number of events: {}]", consumerRecords.size());
     var batch = getInstanceResourceEvents(consumerRecords);
     var batchByTenant = batch.stream().collect(Collectors.groupingBy(ResourceEvent::getTenant));
     batchByTenant.forEach((tenant, resourceEvents) -> executionService.executeSystemUserScoped(tenant, () -> {
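The body of handleInstanceEvents groups the incoming batch by tenant before handing each group to the system-user-scoped execution service. A minimal, self-contained sketch of that grouping step follows; the Event record is a stand-in for ResourceEvent (which exposes getTenant()), and the tenant and id values are made up for illustration.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TenantGroupingSketch {

  // Stand-in for org.folio.search.domain.dto.ResourceEvent, used only for this sketch.
  record Event(String tenant, String id) {}

  public static void main(String[] args) {
    var batch = List.of(
        new Event("tenant_a", "instance-1"),
        new Event("tenant_b", "instance-2"),
        new Event("tenant_a", "instance-3"));

    // Same grouping pattern as handleInstanceEvents: one list of events per tenant,
    // so each tenant's events can be processed in its own system-user scope.
    Map<String, List<Event>> batchByTenant = batch.stream()
        .collect(Collectors.groupingBy(Event::tenant));

    batchByTenant.forEach((tenant, events) ->
        System.out.printf("tenant=%s events=%d%n", tenant, events.size()));
  }
}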
