diff --git a/NEWS.md b/NEWS.md index ef1af70ad..fe07d7b81 100644 --- a/NEWS.md +++ b/NEWS.md @@ -10,6 +10,7 @@ * Move Instance sub-entities population from database trigger to code ([MSEARCH-887](https://folio-org.atlassian.net/browse/MSEARCH-887)) * Update reindex merge failed status only for failed entity type ([MSEARCH-909](https://folio-org.atlassian.net/browse/MSEARCH-909)) * Extend reindex range tables with status, fail_cause columns ([MSEARCH-870](https://folio-org.atlassian.net/browse/MSEARCH-870)) +* Implement scheduled indexing for instance sub-resources ([MSEARCH-922](https://folio-org.atlassian.net/browse/MSEARCH-922)) * Implement endpoint to run merge reindex stage only for failed ranges ([MSEARCH-906](https://folio-org.atlassian.net/browse/MSEARCH-906)) ### Bug fixes diff --git a/README.md b/README.md index 6422f2eba..98c11611b 100644 --- a/README.md +++ b/README.md @@ -274,6 +274,7 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l | REINDEX_MERGE_RANGE_PUBLISHER_RETRY_INTERVAL_MS | 1000 | The retry interval in ms for reindex merge range request publishing. | | REINDEX_MERGE_RANGE_PUBLISHER_RETRY_ATTEMPTS | 5 | The maximum number of retries for reindex merge range request publishing. | | MAX_SEARCH_BATCH_REQUEST_IDS_COUNT | 20_000 | Defines maximum batch request IDs count for searching consolidated items/holdings in consortium | +| INSTANCE_CHILDREN_INDEX_DELAY_MS | 60000 | Defines the delay for scheduler that indexes subjects/contributors/classifications in a background | The module uses system user to communicate with other modules from Kafka consumers. For production deployments you MUST specify the password for this system user via env variable: diff --git a/descriptors/ModuleDescriptor-template.json b/descriptors/ModuleDescriptor-template.json index db10b2a4a..8743cf62d 100644 --- a/descriptors/ModuleDescriptor-template.json +++ b/descriptors/ModuleDescriptor-template.json @@ -1054,36 +1054,11 @@ "value": "", "description": "Replication factor for `search.reindex.range-index` topic." }, - { - "name": "KAFKA_INDEX_SUB_RESOURCE_TOPIC_PARTITIONS", - "value": "50", - "description": "Amount of partitions for `search.index.sub-resource` topic." - }, - { - "name": "KAFKA_INDEX_SUB_RESOURCE_TOPIC_REPLICATION_FACTOR", - "value": "", - "description": "Replication factor for `search.index.sub-resource` topic." - }, { "name": "KAFKA_REINDEX_RECORDS_CONCURRENCY", "value": "2", "description": "Custom number of kafka concurrent threads for `inventory.reindex-records` message consuming." }, - { - "name": "KAFKA_INDEX_SUB_RESOURCE_CONCURRENCY", - "value": "2", - "description": "Custom number of kafka concurrent threads for `search.index.sub-resource` message consuming." - }, - { - "name": "KAFKA_CONSUMER_INDEX_SUB_RESOURCE_MAX_POLL_RECORDS", - "value": "200", - "description": "Maximum number of records returned in a single call to poll() for instance sub-resource events." - }, - { - "name": "KAFKA_CONSUMER_INDEX_SUB_RESOURCE_MAX_POLL_INTERVAL_MS", - "value": "600000", - "description": "Maximum processing time allowed for consumer on instance sub-resource events." - }, { "name": "KAFKA_CONSUMER_MAX_POLL_RECORDS", "value": "200", @@ -1161,6 +1136,16 @@ "value": "smile", "description": "Format for passing data to elasticsearch (json/smile)" }, + { + "name": "INSTANCE_CHILDREN_INDEX_DELAY_MS", + "value": "60000", + "description": "Scheduler delay for indexing subjects/contributors/classifications" + }, + { + "name": "INSTANCE_CHILDREN_INDEX_BATCH_SIZE", + "value": "500", + "description": "Batch size for indexing subjects/contributors/classifications" + }, { "name": "INITIAL_LANGUAGES", "value": "eng", diff --git a/src/main/java/org/folio/search/SearchApplication.java b/src/main/java/org/folio/search/SearchApplication.java index 988a76e63..ff189a559 100644 --- a/src/main/java/org/folio/search/SearchApplication.java +++ b/src/main/java/org/folio/search/SearchApplication.java @@ -4,11 +4,13 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCaching; import org.springframework.cloud.openfeign.EnableFeignClients; +import org.springframework.scheduling.annotation.EnableScheduling; /** * Folio search application. */ @EnableCaching +@EnableScheduling @EnableFeignClients @SpringBootApplication public class SearchApplication { diff --git a/src/main/java/org/folio/search/configuration/kafka/SubResourceKafkaConfiguration.java b/src/main/java/org/folio/search/configuration/kafka/SubResourceKafkaConfiguration.java deleted file mode 100644 index e69b1b586..000000000 --- a/src/main/java/org/folio/search/configuration/kafka/SubResourceKafkaConfiguration.java +++ /dev/null @@ -1,47 +0,0 @@ -package org.folio.search.configuration.kafka; - -import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG; -import static org.folio.search.configuration.kafka.KafkaConfiguration.SearchTopic.INDEX_SUB_RESOURCE; - -import java.util.Map; -import lombok.RequiredArgsConstructor; -import org.folio.search.model.event.SubResourceEvent; -import org.folio.spring.tools.kafka.FolioMessageProducer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.serializer.JsonDeserializer; - -@Configuration -@RequiredArgsConstructor -public class SubResourceKafkaConfiguration extends KafkaConfiguration { - - private final KafkaProperties kafkaProperties; - - @Bean - public ConcurrentKafkaListenerContainerFactory subResourceListenerContainerFactory( - @Value("#{folioKafkaProperties.listener['index-sub-resource'].maxPollRecords}") Integer maxPollRecords, - @Value("#{folioKafkaProperties.listener['index-sub-resource'].maxPollIntervalMs}") Integer maxPollIntervalMs) { - var factory = new ConcurrentKafkaListenerContainerFactory(); - factory.setBatchListener(true); - var deserializer = new JsonDeserializer<>(SubResourceEvent.class, false); - var overrideProperties = Map.of(MAX_POLL_RECORDS_CONFIG, maxPollRecords, - MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs); - factory.setConsumerFactory(getConsumerFactory(deserializer, kafkaProperties, overrideProperties)); - return factory; - } - - @Bean - public KafkaTemplate subResourceKafkaTemplate() { - return new KafkaTemplate<>(getProducerFactory(kafkaProperties)); - } - - @Bean - public FolioMessageProducer subResourceMessageProducer() { - return new FolioMessageProducer<>(subResourceKafkaTemplate(), INDEX_SUB_RESOURCE); - } -} diff --git a/src/main/java/org/folio/search/configuration/properties/SearchConfigurationProperties.java b/src/main/java/org/folio/search/configuration/properties/SearchConfigurationProperties.java index 919dde3e3..4218594f4 100644 --- a/src/main/java/org/folio/search/configuration/properties/SearchConfigurationProperties.java +++ b/src/main/java/org/folio/search/configuration/properties/SearchConfigurationProperties.java @@ -69,6 +69,12 @@ public static class IndexingSettings { * Data format to use for passing data to elasticsearch. */ private IndexingDataFormat dataFormat; + + /** + * Scheduler delay for indexing subjects/contributors/classifications. + * */ + private long instanceChildrenIndexDelayMs; + } } diff --git a/src/main/java/org/folio/search/configuration/properties/SystemProperties.java b/src/main/java/org/folio/search/configuration/properties/SystemProperties.java new file mode 100644 index 000000000..e94bda503 --- /dev/null +++ b/src/main/java/org/folio/search/configuration/properties/SystemProperties.java @@ -0,0 +1,18 @@ +package org.folio.search.configuration.properties; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +@ConfigurationProperties(prefix = "folio.system") +public class SystemProperties { + + private String schemaName; + private String changeLog; +} diff --git a/src/main/java/org/folio/search/integration/message/KafkaMessageListener.java b/src/main/java/org/folio/search/integration/message/KafkaMessageListener.java index 4e0bfbf28..48d7add1a 100644 --- a/src/main/java/org/folio/search/integration/message/KafkaMessageListener.java +++ b/src/main/java/org/folio/search/integration/message/KafkaMessageListener.java @@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.logging.log4j.message.FormattedMessage; import org.folio.search.domain.dto.ResourceEvent; -import org.folio.search.model.event.SubResourceEvent; import org.folio.search.model.types.ResourceType; import org.folio.search.service.ResourceService; import org.folio.search.service.config.ConfigSynchronizationService; @@ -70,31 +69,6 @@ public void handleInstanceEvents(List> con } - /** - * Handles instance sub-resource events and indexes them after extracting from instance resource.. - * - * @param consumerRecords - list of consumer records from Apache Kafka to process. - */ - @KafkaListener( - id = KafkaConstants.INSTANCE_SUB_RESOURCE_LISTENER_ID, - containerFactory = "subResourceListenerContainerFactory", - topicPattern = "#{folioKafkaProperties.listener['index-sub-resource'].topicPattern}", - groupId = "#{folioKafkaProperties.listener['index-sub-resource'].groupId}", - concurrency = "#{folioKafkaProperties.listener['index-sub-resource'].concurrency}") - public void handleInstanceChildrenEvents(List> consumerRecords) { - log.info("Processing instance sub resource events from kafka events [number of events: {}]", - consumerRecords.size()); - var batch = consumerRecords.stream() - .map(ConsumerRecord::value) - .toList(); - var batchByTenant = batch.stream().collect(Collectors.groupingBy(ResourceEvent::getTenant)); - batchByTenant.forEach((tenant, resourceEvents) -> executionService.executeSystemUserScoped(tenant, () -> { - folioMessageBatchProcessor.consumeBatchWithFallback(resourceEvents, KAFKA_RETRY_TEMPLATE_NAME, - resourceService::indexInstanceSubResources, KafkaMessageListener::logFailedEvent); - return null; - })); - } - /** * Handles authority record events and indexes them using event body. * diff --git a/src/main/java/org/folio/search/model/entity/InstanceClassificationEntityAgg.java b/src/main/java/org/folio/search/model/entity/InstanceClassificationEntityAgg.java deleted file mode 100644 index 423dcf5fe..000000000 --- a/src/main/java/org/folio/search/model/entity/InstanceClassificationEntityAgg.java +++ /dev/null @@ -1,11 +0,0 @@ -package org.folio.search.model.entity; - -import java.util.Set; -import org.folio.search.model.index.InstanceSubResource; - -public record InstanceClassificationEntityAgg( - String id, - String typeId, - String number, - Set instances -) { } diff --git a/src/main/java/org/folio/search/model/entity/TenantEntity.java b/src/main/java/org/folio/search/model/entity/TenantEntity.java new file mode 100644 index 000000000..1eadb8bd1 --- /dev/null +++ b/src/main/java/org/folio/search/model/entity/TenantEntity.java @@ -0,0 +1,3 @@ +package org.folio.search.model.entity; + +public record TenantEntity(String id, String centralId, boolean active) { } diff --git a/src/main/java/org/folio/search/service/InstanceChildrenResourceService.java b/src/main/java/org/folio/search/service/InstanceChildrenResourceService.java index a5c960602..230325808 100644 --- a/src/main/java/org/folio/search/service/InstanceChildrenResourceService.java +++ b/src/main/java/org/folio/search/service/InstanceChildrenResourceService.java @@ -1,22 +1,13 @@ package org.folio.search.service; -import static java.util.Collections.singletonList; -import static org.apache.commons.lang3.StringUtils.startsWith; -import static org.folio.search.utils.SearchConverterUtils.getResourceSource; -import static org.folio.search.utils.SearchConverterUtils.isUpdateEventForResourceSharing; -import static org.folio.search.utils.SearchUtils.SOURCE_CONSORTIUM_PREFIX; - -import java.util.LinkedList; import java.util.List; import java.util.Map; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.folio.search.domain.dto.ResourceEvent; import org.folio.search.domain.dto.ResourceEventType; -import org.folio.search.model.event.SubResourceEvent; import org.folio.search.service.consortium.ConsortiumTenantProvider; import org.folio.search.service.converter.preprocessor.extractor.ChildResourceExtractor; -import org.folio.spring.tools.kafka.FolioMessageProducer; import org.springframework.stereotype.Component; /** @@ -28,63 +19,9 @@ @RequiredArgsConstructor public class InstanceChildrenResourceService { - private final FolioMessageProducer messageProducer; private final List resourceExtractors; private final ConsortiumTenantProvider consortiumTenantProvider; - public void sendChildrenEvent(ResourceEvent event) { - var needChildrenEvent = false; - if (isUpdateEventForResourceSharing(event)) { - needChildrenEvent = resourceExtractors.stream() - .anyMatch(extractor -> !extractor.hasChildResourceChanges(event)); - } else if (!startsWith(getResourceSource(event), SOURCE_CONSORTIUM_PREFIX)) { - needChildrenEvent = resourceExtractors.stream() - .anyMatch(extractor -> extractor.hasChildResourceChanges(event)); - } - - if (needChildrenEvent) { - var childEvent = SubResourceEvent.fromResourceEvent(event); - log.debug("sendChildrenEvent::Sending event for instance child entities processing"); - if (log.isDebugEnabled()) { - log.debug("sendChildrenEvent::Sending event for instance child entities processing [{}]", event); - } - messageProducer.sendMessages(singletonList(childEvent)); - } else { - log.debug("sendChildrenEvent::Not sending event for instance child entities processing"); - if (log.isDebugEnabled()) { - log.debug("sendChildrenEvent::Not sending event for instance child entities processing [{}]", event); - } - } - } - - public List extractChildren(ResourceEvent event) { - log.debug("processChildren::Starting instance children event processing"); - if (log.isDebugEnabled()) { - log.debug("processChildren::Starting instance children event processing [{}]", event); - } - - var events = new LinkedList(); - - if (isUpdateEventForResourceSharing(event)) { - for (var resourceExtractor : resourceExtractors) { - events.addAll(resourceExtractor.prepareEventsOnSharing(event)); - } - } else if (startsWith(getResourceSource(event), SOURCE_CONSORTIUM_PREFIX)) { - log.debug( - "processChildren::Finished instance children event processing. No additional action for shadow instance."); - return events; - } else { - for (var resourceExtractor : resourceExtractors) { - events.addAll(resourceExtractor.prepareEvents(event)); - } - } - - if (log.isDebugEnabled()) { - log.debug("processChildren::Finished instance children event processing. Events after: [{}], ", events); - } - return events; - } - public void persistChildren(String tenantId, List events) { var shared = consortiumTenantProvider.isCentralTenant(tenantId); resourceExtractors.forEach(resourceExtractor -> resourceExtractor.persistChildren(shared, events)); diff --git a/src/main/java/org/folio/search/service/ResourceService.java b/src/main/java/org/folio/search/service/ResourceService.java index c1b40ac67..f76a543bf 100644 --- a/src/main/java/org/folio/search/service/ResourceService.java +++ b/src/main/java/org/folio/search/service/ResourceService.java @@ -14,7 +14,6 @@ import static org.folio.search.utils.SearchUtils.getNumberOfRequests; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -28,7 +27,6 @@ import org.folio.search.domain.dto.FolioIndexOperationResponse; import org.folio.search.domain.dto.ResourceEvent; import org.folio.search.domain.dto.ResourceEventType; -import org.folio.search.model.event.SubResourceEvent; import org.folio.search.model.index.SearchDocumentBody; import org.folio.search.model.metadata.ResourceDescription; import org.folio.search.model.metadata.ResourceIndexingConfiguration; @@ -79,16 +77,6 @@ public FolioIndexOperationResponse indexResources(List resourceEv return bulkIndexResponse; } - public FolioIndexOperationResponse indexInstanceSubResources(List events) { - var childEvents = events.stream() - .map(instanceChildrenResourceService::extractChildren) - .flatMap(Collection::stream) - .distinct() - .toList(); - - return indexResources(childEvents); - } - /** * Index list of resource id event to elasticsearch. * @@ -121,19 +109,10 @@ private Map> processIndexInstanceEvents(List instanceEvents) { - instanceEvents.forEach(event -> consortiumTenantExecutor.run( - () -> instanceChildrenResourceService.sendChildrenEvent(event))); - } - private Map> processDeleteInstanceEvents(List deleteEvents) { - if (deleteEvents != null) { - preProcessEvents(deleteEvents); - } return searchDocumentConverter.convert(deleteEvents); } diff --git a/src/main/java/org/folio/search/service/ScheduledInstanceSubResourcesService.java b/src/main/java/org/folio/search/service/ScheduledInstanceSubResourcesService.java new file mode 100644 index 000000000..1c5f9c5d9 --- /dev/null +++ b/src/main/java/org/folio/search/service/ScheduledInstanceSubResourcesService.java @@ -0,0 +1,93 @@ +package org.folio.search.service; + +import static java.util.function.UnaryOperator.identity; +import static java.util.stream.Collectors.toMap; +import static org.apache.commons.collections4.MapUtils.getString; +import static org.folio.search.utils.SearchUtils.ID_FIELD; + +import java.util.List; +import java.util.Map; +import lombok.extern.log4j.Log4j2; +import org.folio.search.domain.dto.ResourceEvent; +import org.folio.search.domain.dto.ResourceEventType; +import org.folio.search.model.types.ReindexEntityType; +import org.folio.search.service.reindex.ReindexConstants; +import org.folio.search.service.reindex.jdbc.InstanceChildResourceRepository; +import org.folio.search.service.reindex.jdbc.SubResourceResult; +import org.folio.search.service.reindex.jdbc.SubResourcesLockRepository; +import org.folio.search.service.reindex.jdbc.TenantRepository; +import org.folio.search.service.reindex.jdbc.UploadRangeRepository; +import org.folio.spring.service.SystemUserScopedExecutionService; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +@Log4j2 +@Service +public class ScheduledInstanceSubResourcesService { + + private final ResourceService resourceService; + private final TenantRepository tenantRepository; + private final Map repositories; + private final SubResourcesLockRepository subResourcesLockRepository; + private final SystemUserScopedExecutionService executionService; + + public ScheduledInstanceSubResourcesService(ResourceService resourceService, + TenantRepository tenantRepository, + List repositories, + SubResourcesLockRepository subResourcesLockRepository, + SystemUserScopedExecutionService executionService) { + this.resourceService = resourceService; + this.tenantRepository = tenantRepository; + this.repositories = repositories.stream() + .filter(InstanceChildResourceRepository.class::isInstance) + .collect(toMap(UploadRangeRepository::entityType, identity())); + this.subResourcesLockRepository = subResourcesLockRepository; + this.executionService = executionService; + } + + @Scheduled(fixedDelayString = "#{searchConfigurationProperties.indexing.instanceChildrenIndexDelayMs}") + public void persistChildren() { + log.info("persistChildren::Starting instance children processing"); + tenantRepository.fetchDataTenantIds() + .forEach(tenant -> executionService.executeSystemUserScoped(tenant, () -> { + var entityTypes = repositories.keySet(); + for (var entityType : entityTypes) { + var timestamp = subResourcesLockRepository.lockSubResource(entityType, tenant); + if (timestamp.isPresent()) { + SubResourceResult result = null; + try { + result = repositories.get(entityType).fetchByTimestamp(tenant, timestamp.get()); + if (result.hasRecords()) { + var events = map(result.records(), entityType, tenant); + resourceService.indexResources(events); + } + } catch (Exception e) { + log.error("persistChildren::Error processing instance children", e); + } finally { + var lastUpdatedDate = result == null || result.lastUpdateDate() == null + ? timestamp.get() + : result.lastUpdateDate(); + subResourcesLockRepository.unlockSubResource(entityType, lastUpdatedDate, tenant); + } + } + } + return null; + })); + + log.debug("persistChildren::Finished instance children processing"); + } + + public List map(List> recordMaps, ReindexEntityType entityType, String tenant) { + return recordMaps.stream() + .map(map -> { + var instancesEmpty = map.get("instances") == null; + return new ResourceEvent().id(getString(map, ID_FIELD)) + .type(instancesEmpty ? ResourceEventType.DELETE : ResourceEventType.CREATE) + .resourceName(ReindexConstants.RESOURCE_NAME_MAP.get(entityType).getName()) + ._new(instancesEmpty ? null : map) + .tenant(tenant); + }) + .toList(); + } + +} diff --git a/src/main/java/org/folio/search/service/converter/preprocessor/extractor/ChildResourceExtractor.java b/src/main/java/org/folio/search/service/converter/preprocessor/extractor/ChildResourceExtractor.java index ebde8786f..a25eef868 100644 --- a/src/main/java/org/folio/search/service/converter/preprocessor/extractor/ChildResourceExtractor.java +++ b/src/main/java/org/folio/search/service/converter/preprocessor/extractor/ChildResourceExtractor.java @@ -15,19 +15,12 @@ import org.folio.search.domain.dto.ResourceEvent; import org.folio.search.domain.dto.ResourceEventType; import org.folio.search.service.reindex.jdbc.InstanceChildResourceRepository; -import org.springframework.transaction.annotation.Transactional; @RequiredArgsConstructor public abstract class ChildResourceExtractor { private final InstanceChildResourceRepository repository; - public abstract List prepareEvents(ResourceEvent resource); - - public abstract List prepareEventsOnSharing(ResourceEvent resource); - - public abstract boolean hasChildResourceChanges(ResourceEvent event); - protected abstract List> constructRelations(boolean shared, ResourceEvent event, List> entities); @@ -35,7 +28,6 @@ protected abstract List> constructRelations(boolean shared, protected abstract String childrenFieldName(); - @Transactional public void persistChildren(boolean shared, List events) { var instanceIdsForDeletion = events.stream() .filter(event -> event.getType() != ResourceEventType.CREATE && event.getType() != ResourceEventType.REINDEX) diff --git a/src/main/java/org/folio/search/service/converter/preprocessor/extractor/impl/ClassificationResourceExtractor.java b/src/main/java/org/folio/search/service/converter/preprocessor/extractor/impl/ClassificationResourceExtractor.java index 4fa63a050..b22066e22 100644 --- a/src/main/java/org/folio/search/service/converter/preprocessor/extractor/impl/ClassificationResourceExtractor.java +++ b/src/main/java/org/folio/search/service/converter/preprocessor/extractor/impl/ClassificationResourceExtractor.java @@ -1,118 +1,28 @@ package org.folio.search.service.converter.preprocessor.extractor.impl; -import static java.util.Collections.emptyList; import static org.apache.commons.lang3.StringUtils.EMPTY; -import static org.apache.commons.lang3.StringUtils.defaultIfBlank; -import static org.apache.commons.lang3.StringUtils.truncate; -import static org.folio.search.utils.CollectionUtils.subtract; -import static org.folio.search.utils.SearchConverterUtils.getNewAsMap; -import static org.folio.search.utils.SearchConverterUtils.getOldAsMap; import static org.folio.search.utils.SearchUtils.CLASSIFICATIONS_FIELD; import static org.folio.search.utils.SearchUtils.CLASSIFICATION_NUMBER_FIELD; import static org.folio.search.utils.SearchUtils.CLASSIFICATION_TYPE_FIELD; import static org.folio.search.utils.SearchUtils.prepareForExpectedFormat; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; import lombok.extern.log4j.Log4j2; -import org.apache.commons.collections4.MapUtils; import org.folio.search.domain.dto.ResourceEvent; -import org.folio.search.domain.dto.ResourceEventType; -import org.folio.search.domain.dto.TenantConfiguredFeature; -import org.folio.search.model.entity.InstanceClassificationEntityAgg; -import org.folio.search.model.index.ClassificationResource; -import org.folio.search.model.types.ResourceType; -import org.folio.search.service.FeatureConfigService; import org.folio.search.service.converter.preprocessor.extractor.ChildResourceExtractor; import org.folio.search.service.reindex.jdbc.ClassificationRepository; -import org.folio.search.utils.CollectionUtils; -import org.folio.search.utils.JsonConverter; import org.folio.search.utils.ShaUtils; -import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Component; @Log4j2 @Component public class ClassificationResourceExtractor extends ChildResourceExtractor { - private final JsonConverter jsonConverter; - private final FeatureConfigService featureConfigService; - private final ClassificationRepository repository; - - public ClassificationResourceExtractor(ClassificationRepository repository, JsonConverter jsonConverter, - FeatureConfigService featureConfigService) { + public ClassificationResourceExtractor(ClassificationRepository repository) { super(repository); - this.jsonConverter = jsonConverter; - this.featureConfigService = featureConfigService; - this.repository = repository; - } - - @Override - public List prepareEvents(ResourceEvent event) { - if (!featureConfigService.isEnabled(TenantConfiguredFeature.BROWSE_CLASSIFICATIONS)) { - return emptyList(); - } - - var oldClassifications = getChildResources(getOldAsMap(event)); - var newClassifications = getChildResources(getNewAsMap(event)); - - if (oldClassifications.equals(newClassifications)) { - return emptyList(); - } - var tenant = event.getTenant(); - var classificationsForCreate = subtract(newClassifications, oldClassifications); - var classificationsForDelete = subtract(oldClassifications, newClassifications); - - var idsForCreate = toIds(classificationsForCreate); - var idsForDelete = toIds(classificationsForDelete); - - List idsForFetch = new ArrayList<>(); - idsForFetch.addAll(idsForCreate); - idsForFetch.addAll(idsForDelete); - - var entityAggList = repository.fetchByIds(idsForFetch); - var list = getResourceEventsForDeletion(idsForDelete, entityAggList, tenant); - - var list1 = entityAggList.stream() - .map(entities -> toResourceEvent(entities, tenant)) - .toList(); - return CollectionUtils.mergeSafelyToList(list, list1); - } - - @Override - public List prepareEventsOnSharing(ResourceEvent event) { - if (!featureConfigService.isEnabled(TenantConfiguredFeature.BROWSE_CLASSIFICATIONS)) { - return emptyList(); - } - - var classifications = getChildResources(getOldAsMap(event)); - - if (!classifications.equals(getChildResources(getNewAsMap(event)))) { - log.warn("Classifications are different on Update for instance sharing"); - return emptyList(); - } - - var tenant = event.getTenant(); - - var entitiesForDelete = toIds(classifications); - var entityAggList = repository.fetchByIds(entitiesForDelete); - - return entityAggList.stream() - .map(entities -> toResourceEvent(entities, tenant)) - .toList(); - } - - @Override - public boolean hasChildResourceChanges(ResourceEvent event) { - var oldClassifications = getChildResources(getOldAsMap(event)); - var newClassifications = getChildResources(getNewAsMap(event)); - - return !oldClassifications.equals(newClassifications); } @Override @@ -128,6 +38,9 @@ protected List> constructRelations(boolean shared, ResourceE @Override protected Map constructEntity(Map entityProperties) { + if (entityProperties == null) { + return null; + } var classificationNumber = prepareForExpectedFormat(entityProperties.get(CLASSIFICATION_NUMBER_FIELD), 50); if (classificationNumber.isEmpty()) { return null; @@ -147,54 +60,4 @@ protected Map constructEntity(Map entityProperti protected String childrenFieldName() { return CLASSIFICATIONS_FIELD; } - - private List getResourceEventsForDeletion(List idsForDelete, - List entityAggList, - String tenant) { - var notFoundEntitiesForDelete = new ArrayList<>(idsForDelete); - var iterator = notFoundEntitiesForDelete.iterator(); - while (iterator.hasNext()) { - var classification = iterator.next(); - for (InstanceClassificationEntityAgg agg : entityAggList) { - if (agg.id().equals(classification)) { - iterator.remove(); - } - } - } - - return notFoundEntitiesForDelete.stream() - .map(classificationId -> toResourceDeleteEvent(classificationId, tenant)) - .toList(); - } - - private ResourceEvent toResourceDeleteEvent(String id, String tenant) { - return new ResourceEvent() - .id(id) - .tenant(tenant) - .resourceName(ResourceType.INSTANCE_CLASSIFICATION.getName()) - .type(ResourceEventType.DELETE); - } - - private ResourceEvent toResourceEvent(InstanceClassificationEntityAgg source, String tenant) { - var id = source.id(); - var resource = new ClassificationResource(id, source.typeId(), source.number(), source.instances()); - return new ResourceEvent() - .id(id) - .tenant(tenant) - .resourceName(ResourceType.INSTANCE_CLASSIFICATION.getName()) - .type(ResourceEventType.UPDATE) - ._new(jsonConverter.convertToMap(resource)); - } - - private String getClassificationId(String number, String typeId) { - return ShaUtils.sha(truncate(number.replace("\\", "\\\\"), 50), typeId); - } - - @NotNull - private List toIds(Set> subtract) { - return subtract.stream() - .map(map -> getClassificationId(defaultIfBlank(MapUtils.getString(map, CLASSIFICATION_NUMBER_FIELD), ""), - MapUtils.getString(map, CLASSIFICATION_TYPE_FIELD))) - .collect(Collectors.toCollection(ArrayList::new)); - } } diff --git a/src/main/java/org/folio/search/service/converter/preprocessor/extractor/impl/ContributorResourceExtractor.java b/src/main/java/org/folio/search/service/converter/preprocessor/extractor/impl/ContributorResourceExtractor.java index 5c0b201f4..9e6682b98 100644 --- a/src/main/java/org/folio/search/service/converter/preprocessor/extractor/impl/ContributorResourceExtractor.java +++ b/src/main/java/org/folio/search/service/converter/preprocessor/extractor/impl/ContributorResourceExtractor.java @@ -1,106 +1,28 @@ package org.folio.search.service.converter.preprocessor.extractor.impl; -import static java.util.Collections.emptyList; import static org.apache.commons.lang3.StringUtils.EMPTY; -import static org.apache.commons.lang3.StringUtils.defaultIfBlank; -import static org.apache.commons.lang3.StringUtils.truncate; -import static org.folio.search.utils.CollectionUtils.subtract; -import static org.folio.search.utils.SearchConverterUtils.getNewAsMap; -import static org.folio.search.utils.SearchConverterUtils.getOldAsMap; import static org.folio.search.utils.SearchUtils.AUTHORITY_ID_FIELD; import static org.folio.search.utils.SearchUtils.CONTRIBUTORS_FIELD; import static org.folio.search.utils.SearchUtils.CONTRIBUTOR_TYPE_FIELD; import static org.folio.search.utils.SearchUtils.prepareForExpectedFormat; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; import lombok.extern.log4j.Log4j2; -import org.apache.commons.collections4.MapUtils; import org.folio.search.domain.dto.ResourceEvent; -import org.folio.search.domain.dto.ResourceEventType; -import org.folio.search.model.entity.InstanceContributorEntityAgg; -import org.folio.search.model.index.ContributorResource; -import org.folio.search.model.types.ResourceType; import org.folio.search.service.converter.preprocessor.extractor.ChildResourceExtractor; import org.folio.search.service.reindex.jdbc.ContributorRepository; -import org.folio.search.utils.CollectionUtils; -import org.folio.search.utils.JsonConverter; import org.folio.search.utils.ShaUtils; -import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Component; @Log4j2 @Component public class ContributorResourceExtractor extends ChildResourceExtractor { - private final JsonConverter jsonConverter; - private final ContributorRepository repository; - - public ContributorResourceExtractor(ContributorRepository repository, JsonConverter jsonConverter) { + public ContributorResourceExtractor(ContributorRepository repository) { super(repository); - this.jsonConverter = jsonConverter; - this.repository = repository; - } - - @Override - public List prepareEvents(ResourceEvent event) { - var oldEntities = getChildResources(getOldAsMap(event)); - var newEntities = getChildResources(getNewAsMap(event)); - - if (oldEntities.equals(newEntities)) { - return emptyList(); - } - - var tenant = event.getTenant(); - var entitiesForCreate = subtract(newEntities, oldEntities); - var entitiesForDelete = subtract(oldEntities, newEntities); - - var idsForCreate = toIds(entitiesForCreate); - var idsForDelete = toIds(entitiesForDelete); - - List idsForFetch = new ArrayList<>(); - idsForFetch.addAll(idsForCreate); - idsForFetch.addAll(idsForDelete); - - var entityAggList = repository.fetchByIds(idsForFetch); - var list = getResourceEventsForDeletion(idsForDelete, entityAggList, tenant); - - var list1 = entityAggList.stream() - .map(entities -> toResourceEvent(entities, tenant)) - .toList(); - return CollectionUtils.mergeSafelyToList(list, list1); - } - - @Override - public List prepareEventsOnSharing(ResourceEvent event) { - var entities = getChildResources(getOldAsMap(event)); - - if (!entities.equals(getChildResources(getNewAsMap(event)))) { - log.warn("Contributors are different on Update for instance sharing"); - return emptyList(); - } - - var tenant = event.getTenant(); - - var ids = toIds(entities); - var entityAggList = repository.fetchByIds(ids); - - return entityAggList.stream() - .map(e -> toResourceEvent(e, tenant)) - .toList(); - } - - @Override - public boolean hasChildResourceChanges(ResourceEvent event) { - var oldContributors = getChildResources(getOldAsMap(event)); - var newContributors = getChildResources(getNewAsMap(event)); - - return !oldContributors.equals(newContributors); } @Override @@ -117,6 +39,9 @@ protected List> constructRelations(boolean shared, ResourceE @Override protected Map constructEntity(Map entityProperties) { + if (entityProperties == null) { + return null; + } var contributorName = prepareForExpectedFormat(entityProperties.get("name"), 255); if (contributorName.isBlank()) { return null; @@ -140,57 +65,4 @@ protected Map constructEntity(Map entityProperti protected String childrenFieldName() { return CONTRIBUTORS_FIELD; } - - private List getResourceEventsForDeletion(List idsForDelete, - List entityAggList, - String tenant) { - var notFoundEntitiesForDelete = new ArrayList<>(idsForDelete); - var iterator = notFoundEntitiesForDelete.iterator(); - while (iterator.hasNext()) { - var entityId = iterator.next(); - for (InstanceContributorEntityAgg agg : entityAggList) { - if (agg.id().equals(entityId)) { - iterator.remove(); - } - } - } - - return notFoundEntitiesForDelete.stream() - .map(classificationId -> toResourceDeleteEvent(classificationId, tenant)) - .toList(); - } - - private ResourceEvent toResourceDeleteEvent(String id, String tenant) { - return new ResourceEvent() - .id(id) - .tenant(tenant) - .resourceName(ResourceType.INSTANCE_CONTRIBUTOR.getName()) - .type(ResourceEventType.DELETE); - } - - private ResourceEvent toResourceEvent(InstanceContributorEntityAgg source, String tenant) { - var id = source.id(); - var resource = new ContributorResource(id, source.name(), source.nameTypeId(), source.authorityId(), - source.instances()); - return new ResourceEvent() - .id(id) - .tenant(tenant) - .resourceName(ResourceType.INSTANCE_CONTRIBUTOR.getName()) - .type(ResourceEventType.UPDATE) - ._new(jsonConverter.convertToMap(resource)); - } - - private String getEntityId(String name, String typeId, String authorityId) { - return ShaUtils.sha(truncate(name.replace("\\", "\\\\"), 255), - typeId, authorityId); - } - - @NotNull - private List toIds(Set> subtract) { - return subtract.stream() - .map(map -> getEntityId(defaultIfBlank(MapUtils.getString(map, "name"), ""), - MapUtils.getString(map, "contributorNameTypeId"), - MapUtils.getString(map, AUTHORITY_ID_FIELD))) - .collect(Collectors.toCollection(ArrayList::new)); - } } diff --git a/src/main/java/org/folio/search/service/converter/preprocessor/extractor/impl/SubjectResourceExtractor.java b/src/main/java/org/folio/search/service/converter/preprocessor/extractor/impl/SubjectResourceExtractor.java index e239b6a41..1279a0262 100644 --- a/src/main/java/org/folio/search/service/converter/preprocessor/extractor/impl/SubjectResourceExtractor.java +++ b/src/main/java/org/folio/search/service/converter/preprocessor/extractor/impl/SubjectResourceExtractor.java @@ -1,12 +1,6 @@ package org.folio.search.service.converter.preprocessor.extractor.impl; -import static java.util.Collections.emptyList; import static org.apache.commons.lang3.StringUtils.EMPTY; -import static org.apache.commons.lang3.StringUtils.defaultIfBlank; -import static org.apache.commons.lang3.StringUtils.truncate; -import static org.folio.search.utils.CollectionUtils.subtract; -import static org.folio.search.utils.SearchConverterUtils.getNewAsMap; -import static org.folio.search.utils.SearchConverterUtils.getOldAsMap; import static org.folio.search.utils.SearchUtils.AUTHORITY_ID_FIELD; import static org.folio.search.utils.SearchUtils.SUBJECTS_FIELD; import static org.folio.search.utils.SearchUtils.SUBJECT_SOURCE_ID_FIELD; @@ -14,95 +8,23 @@ import static org.folio.search.utils.SearchUtils.SUBJECT_VALUE_FIELD; import static org.folio.search.utils.SearchUtils.prepareForExpectedFormat; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; import lombok.extern.log4j.Log4j2; -import org.apache.commons.collections4.MapUtils; import org.folio.search.domain.dto.ResourceEvent; -import org.folio.search.domain.dto.ResourceEventType; -import org.folio.search.model.entity.InstanceSubjectEntityAgg; -import org.folio.search.model.index.SubjectResource; -import org.folio.search.model.types.ResourceType; import org.folio.search.service.converter.preprocessor.extractor.ChildResourceExtractor; import org.folio.search.service.reindex.jdbc.SubjectRepository; -import org.folio.search.utils.CollectionUtils; -import org.folio.search.utils.JsonConverter; import org.folio.search.utils.ShaUtils; -import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Component; @Log4j2 @Component public class SubjectResourceExtractor extends ChildResourceExtractor { - private final JsonConverter jsonConverter; - private final SubjectRepository repository; - - public SubjectResourceExtractor(SubjectRepository repository, JsonConverter jsonConverter) { + public SubjectResourceExtractor(SubjectRepository repository) { super(repository); - this.repository = repository; - this.jsonConverter = jsonConverter; - } - - @Override - public List prepareEvents(ResourceEvent event) { - var oldSubjects = getChildResources(getOldAsMap(event)); - var newSubjects = getChildResources(getNewAsMap(event)); - - if (oldSubjects.equals(newSubjects)) { - return emptyList(); - } - - var tenant = event.getTenant(); - var subjectsForCreate = subtract(newSubjects, oldSubjects); - var subjectsForDelete = subtract(oldSubjects, newSubjects); - - var idsForCreate = toIds(subjectsForCreate); - var idsForDelete = toIds(subjectsForDelete); - - List idsForFetch = new ArrayList<>(); - idsForFetch.addAll(idsForCreate); - idsForFetch.addAll(idsForDelete); - - var entityAggList = repository.fetchByIds(idsForFetch); - var list = getResourceEventsForDeletion(idsForDelete, entityAggList, tenant); - - var list1 = entityAggList.stream() - .map(entities -> toResourceEvent(entities, tenant)) - .toList(); - return CollectionUtils.mergeSafelyToList(list, list1); - } - - @Override - public List prepareEventsOnSharing(ResourceEvent event) { - var subjects = getChildResources(getOldAsMap(event)); - - if (!subjects.equals(getChildResources(getNewAsMap(event)))) { - log.warn("Subjects are different on Update for instance sharing"); - return emptyList(); - } - - var tenant = event.getTenant(); - - var ids = toIds(subjects); - var entityAggList = repository.fetchByIds(ids); - - return entityAggList.stream() - .map(entities -> toResourceEvent(entities, tenant)) - .toList(); - } - - @Override - public boolean hasChildResourceChanges(ResourceEvent event) { - var oldSubjects = getChildResources(getOldAsMap(event)); - var newSubjects = getChildResources(getNewAsMap(event)); - - return !oldSubjects.equals(newSubjects); } @Override @@ -118,6 +40,9 @@ protected List> constructRelations(boolean shared, ResourceE @Override protected Map constructEntity(Map entityProperties) { + if (entityProperties == null) { + return null; + } var subjectValue = prepareForExpectedFormat(entityProperties.get(SUBJECT_VALUE_FIELD), 255); if (subjectValue.isEmpty()) { return null; @@ -142,58 +67,4 @@ protected Map constructEntity(Map entityProperti protected String childrenFieldName() { return SUBJECTS_FIELD; } - - private List getResourceEventsForDeletion(List idsForDelete, - List entityAggList, - String tenant) { - var notFoundEntitiesForDelete = new ArrayList<>(idsForDelete); - var iterator = notFoundEntitiesForDelete.iterator(); - while (iterator.hasNext()) { - var entityId = iterator.next(); - for (InstanceSubjectEntityAgg agg : entityAggList) { - if (agg.id().equals(entityId)) { - iterator.remove(); - } - } - } - - return notFoundEntitiesForDelete.stream() - .map(classificationId -> toResourceDeleteEvent(classificationId, tenant)) - .toList(); - } - - private ResourceEvent toResourceDeleteEvent(String id, String tenant) { - return new ResourceEvent() - .id(id) - .tenant(tenant) - .resourceName(ResourceType.INSTANCE_SUBJECT.getName()) - .type(ResourceEventType.DELETE); - } - - private ResourceEvent toResourceEvent(InstanceSubjectEntityAgg source, String tenant) { - var id = source.id(); - var resource = new SubjectResource(id, source.value(), source.authorityId(), source.sourceId(), source.typeId(), - source.instances()); - return new ResourceEvent() - .id(id) - .tenant(tenant) - .resourceName(ResourceType.INSTANCE_SUBJECT.getName()) - .type(ResourceEventType.UPDATE) - ._new(jsonConverter.convertToMap(resource)); - } - - private String getEntityId(String number, String authorityId, String sourceId, String typeId) { - return ShaUtils.sha(truncate(number.replace("\\", "\\\\"), 255), authorityId, sourceId, - typeId); - } - - @NotNull - private List toIds(Set> subtract) { - return subtract.stream() - .map(map -> getEntityId(defaultIfBlank(MapUtils.getString(map, SUBJECT_VALUE_FIELD), ""), - MapUtils.getString(map, AUTHORITY_ID_FIELD), - MapUtils.getString(map, SUBJECT_SOURCE_ID_FIELD), - MapUtils.getString(map, SUBJECT_TYPE_ID_FIELD))) - .collect(Collectors.toCollection(ArrayList::new)); - } } diff --git a/src/main/java/org/folio/search/service/reindex/jdbc/ClassificationRepository.java b/src/main/java/org/folio/search/service/reindex/jdbc/ClassificationRepository.java index 60f01d291..355f14ad6 100644 --- a/src/main/java/org/folio/search/service/reindex/jdbc/ClassificationRepository.java +++ b/src/main/java/org/folio/search/service/reindex/jdbc/ClassificationRepository.java @@ -1,23 +1,22 @@ package org.folio.search.service.reindex.jdbc; -import static org.folio.search.utils.JdbcUtils.getParamPlaceholder; import static org.folio.search.utils.JdbcUtils.getParamPlaceholderForUuid; +import static org.folio.search.utils.SearchUtils.CLASSIFICATION_NUMBER_ENTITY_FIELD; import static org.folio.search.utils.SearchUtils.CLASSIFICATION_NUMBER_FIELD; import static org.folio.search.utils.SearchUtils.CLASSIFICATION_TYPE_FIELD; +import static org.folio.search.utils.SearchUtils.SUB_RESOURCE_INSTANCES_FIELD; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.Collections; +import java.sql.Timestamp; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import lombok.extern.log4j.Log4j2; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.collections4.ListUtils; import org.folio.search.configuration.properties.ReindexConfigurationProperties; -import org.folio.search.model.entity.InstanceClassificationEntityAgg; import org.folio.search.model.types.ReindexEntityType; import org.folio.search.service.reindex.ReindexConstants; import org.folio.search.utils.JdbcUtils; @@ -68,15 +67,64 @@ public class ClassificationRepository extends UploadRangeRepository implements I c.id; """; + private static final String SELECT_BY_UPDATED_QUERY = """ + WITH cte AS (SELECT c.id, + c.number, + c.type_id, + c.last_updated_date + FROM %1$s.classification c + WHERE last_updated_date > ? + ORDER BY last_updated_date + ) + SELECT c.id, + c.number, + c.type_id, + c.last_updated_date, + json_agg( + CASE + WHEN sub.instance_count IS NULL THEN NULL + ELSE + json_build_object( + 'count', sub.instance_count, + 'shared', sub.shared, + 'tenantId', sub.tenant_id + ) + END + ) AS instances + FROM cte c + LEFT JOIN + (SELECT cte.id, + ins.tenant_id, + ins.shared, + count(1) AS instance_count + FROM %1$s.instance_classification ins + INNER JOIN cte ON ins.classification_id = cte.id + GROUP BY cte.id, + ins.tenant_id, + ins.shared) sub ON c.id = sub.id + GROUP BY c.id, + c.number, + c.type_id, + c.last_updated_date + ORDER BY last_updated_date ASC; + """; + private static final String DELETE_QUERY = """ - DELETE - FROM %s.instance_classification - WHERE instance_id IN (%s); + WITH deleted_ids as ( + DELETE + FROM %1$s.instance_classification + WHERE instance_id IN (%2$s) + RETURNING classification_id + ) + UPDATE %1$s.classification + SET last_updated_date = CURRENT_TIMESTAMP + WHERE id IN (SELECT * FROM deleted_ids); """; + private static final String INSERT_ENTITIES_SQL = """ INSERT INTO %s.classification (id, number, type_id) VALUES (?, ?, ?) - ON CONFLICT DO NOTHING; + ON CONFLICT (id) DO UPDATE SET last_updated_date = CURRENT_TIMESTAMP; """; private static final String INSERT_RELATIONS_SQL = """ INSERT INTO %s.instance_classification (instance_id, classification_id, tenant_id, shared) @@ -86,8 +134,6 @@ public class ClassificationRepository extends UploadRangeRepository implements I private static final String ID_RANGE_INS_WHERE_CLAUSE = "ins.classification_id >= ? AND ins.classification_id <= ?"; private static final String ID_RANGE_CLAS_WHERE_CLAUSE = "c.id >= ? AND c.id <= ?"; - private static final String IDS_INS_WHERE_CLAUSE = "ins.classification_id IN (%1$s)"; - private static final String IDS_CLAS_WHERE_CLAUSE = "c.id IN (%1$s)"; protected ClassificationRepository(JdbcTemplate jdbcTemplate, JsonConverter jsonConverter, @@ -111,22 +157,20 @@ protected Optional subEntityTable() { return Optional.of(ReindexConstants.INSTANCE_CLASSIFICATION_TABLE); } - public List fetchByIds(List ids) { - if (CollectionUtils.isEmpty(ids)) { - return Collections.emptyList(); - } - var sql = SELECT_QUERY.formatted(JdbcUtils.getSchemaName(context), - IDS_INS_WHERE_CLAUSE.formatted(getParamPlaceholder(ids.size())), - IDS_CLAS_WHERE_CLAUSE.formatted(getParamPlaceholder(ids.size()))); - return jdbcTemplate.query(sql, instanceClassificationAggRowMapper(), ListUtils.union(ids, ids).toArray()); - } - @Override public List> fetchByIdRange(String lower, String upper) { var sql = getFetchBySql(); return jdbcTemplate.query(sql, rowToMapMapper(), lower, upper, lower, upper); } + @Override + public SubResourceResult fetchByTimestamp(String tenant, Timestamp timestamp) { + var sql = SELECT_BY_UPDATED_QUERY.formatted(JdbcUtils.getSchemaName(tenant, context.getFolioModuleMetadata())); + var records = jdbcTemplate.query(sql, rowToMapMapper2(), timestamp); + var lastUpdateDate = records.isEmpty() ? null : records.get(records.size() - 1).get(LAST_UPDATED_DATE_FIELD); + return new SubResourceResult(records, (Timestamp) lastUpdateDate); + } + @Override protected String getFetchBySql() { return SELECT_QUERY.formatted(JdbcUtils.getSchemaName(context), @@ -139,11 +183,30 @@ protected RowMapper> rowToMapMapper() { return (rs, rowNum) -> { Map classification = new HashMap<>(); classification.put("id", getId(rs)); - classification.put("number", getNumber(rs)); + classification.put(CLASSIFICATION_NUMBER_ENTITY_FIELD, getNumber(rs)); + classification.put("typeId", getTypeId(rs)); + + var maps = jsonConverter.fromJsonToListOfMaps(getInstances(rs)).stream().filter(Objects::nonNull).toList(); + if (!maps.isEmpty()) { + classification.put(SUB_RESOURCE_INSTANCES_FIELD, maps); + } + + return classification; + }; + } + + protected RowMapper> rowToMapMapper2() { + return (rs, rowNum) -> { + Map classification = new HashMap<>(); + classification.put("id", getId(rs)); + classification.put(CLASSIFICATION_NUMBER_ENTITY_FIELD, getNumber(rs)); classification.put("typeId", getTypeId(rs)); + classification.put(LAST_UPDATED_DATE_FIELD, rs.getTimestamp("last_updated_date")); - var maps = jsonConverter.fromJsonToListOfMaps(getInstances(rs)); - classification.put("instances", maps); + var maps = jsonConverter.fromJsonToListOfMaps(getInstances(rs)).stream().filter(Objects::nonNull).toList(); + if (!maps.isEmpty()) { + classification.put(SUB_RESOURCE_INSTANCES_FIELD, maps); + } return classification; }; @@ -191,15 +254,6 @@ public void saveAll(Set> entities, List> } } - private RowMapper instanceClassificationAggRowMapper() { - return (rs, rowNum) -> new InstanceClassificationEntityAgg( - getId(rs), - getTypeId(rs), - getNumber(rs), - parseInstanceSubResources(getInstances(rs)) - ); - } - private String getId(ResultSet rs) throws SQLException { return rs.getString("id"); } @@ -209,10 +263,10 @@ private String getTypeId(ResultSet rs) throws SQLException { } private String getNumber(ResultSet rs) throws SQLException { - return rs.getString("number"); + return rs.getString(CLASSIFICATION_NUMBER_ENTITY_FIELD); } private String getInstances(ResultSet rs) throws SQLException { - return rs.getString("instances"); + return rs.getString(SUB_RESOURCE_INSTANCES_FIELD); } } diff --git a/src/main/java/org/folio/search/service/reindex/jdbc/ContributorRepository.java b/src/main/java/org/folio/search/service/reindex/jdbc/ContributorRepository.java index a014e3860..68c87c6eb 100644 --- a/src/main/java/org/folio/search/service/reindex/jdbc/ContributorRepository.java +++ b/src/main/java/org/folio/search/service/reindex/jdbc/ContributorRepository.java @@ -4,13 +4,16 @@ import static org.folio.search.utils.JdbcUtils.getParamPlaceholderForUuid; import static org.folio.search.utils.SearchUtils.AUTHORITY_ID_FIELD; import static org.folio.search.utils.SearchUtils.CONTRIBUTOR_TYPE_FIELD; +import static org.folio.search.utils.SearchUtils.SUB_RESOURCE_INSTANCES_FIELD; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import lombok.extern.log4j.Log4j2; @@ -71,15 +74,70 @@ public class ContributorRepository extends UploadRangeRepository implements Inst c.id; """; + private static final String SELECT_BY_UPDATED_QUERY = """ + WITH cte AS (SELECT id, + name, + name_type_id, + authority_id, + last_updated_date + FROM %1$s.contributor + WHERE last_updated_date > ? + ORDER BY last_updated_date + ) + SELECT c.id, + c.name, + c.name_type_id, + c.authority_id, + c.last_updated_date, + json_agg( + CASE + WHEN sub.instance_count IS NULL THEN NULL + ELSE json_build_object( + 'count', sub.instance_count, + 'typeId', sub.type_ids, + 'shared', sub.shared, + 'tenantId', sub.tenant_id + ) + END + ) AS instances + FROM cte c + LEFT JOIN + (SELECT cte.id, + ins.tenant_id, + ins.shared, + array_agg(DISTINCT ins.type_id) FILTER (WHERE ins.type_id <> '') AS type_ids, + count(DISTINCT ins.instance_id) AS instance_count + FROM %1$s.instance_contributor ins + INNER JOIN cte + ON ins.contributor_id = cte.id + GROUP BY cte.id, + ins.tenant_id, + ins.shared) sub ON c.id = sub.id + GROUP BY c.id, + c.name, + c.name_type_id, + c.authority_id, + c.last_updated_date + ORDER BY last_updated_date ASC; + """; + + private static final String DELETE_QUERY = """ - DELETE - FROM %s.instance_contributor - WHERE instance_id IN (%s); + WITH deleted_ids as ( + DELETE + FROM %1$s.instance_contributor + WHERE instance_id IN (%2$s) + RETURNING contributor_id + ) + UPDATE %1$s.contributor + SET last_updated_date = CURRENT_TIMESTAMP + WHERE id IN (SELECT * FROM deleted_ids); """; + private static final String INSERT_ENTITIES_SQL = """ INSERT INTO %s.contributor (id, name, name_type_id, authority_id) VALUES (?, ?, ?, ?) - ON CONFLICT DO NOTHING; + ON CONFLICT (id) DO UPDATE SET last_updated_date = CURRENT_TIMESTAMP; """; private static final String INSERT_RELATIONS_SQL = """ INSERT INTO %s.instance_contributor (instance_id, contributor_id, type_id, tenant_id, shared) @@ -130,6 +188,14 @@ public List> fetchByIdRange(String lower, String upper) { return jdbcTemplate.query(sql, rowToMapMapper(), lower, upper, lower, upper); } + @Override + public SubResourceResult fetchByTimestamp(String tenant, Timestamp timestamp) { + var sql = SELECT_BY_UPDATED_QUERY.formatted(JdbcUtils.getSchemaName(tenant, context.getFolioModuleMetadata())); + var records = jdbcTemplate.query(sql, rowToMapMapper2(), timestamp); + var lastUpdateDate = records.isEmpty() ? null : records.get(records.size() - 1).get(LAST_UPDATED_DATE_FIELD); + return new SubResourceResult(records, (Timestamp) lastUpdateDate); + } + @Override protected String getFetchBySql() { return SELECT_QUERY.formatted(JdbcUtils.getSchemaName(context), @@ -145,8 +211,28 @@ protected RowMapper> rowToMapMapper() { contributor.put("contributorNameTypeId", getNameTypeId(rs)); contributor.put(AUTHORITY_ID_FIELD, getAuthorityId(rs)); - var maps = jsonConverter.fromJsonToListOfMaps(getInstances(rs)); - contributor.put("instances", maps); + var maps = jsonConverter.fromJsonToListOfMaps(getInstances(rs)).stream().filter(Objects::nonNull).toList(); + if (!maps.isEmpty()) { + contributor.put(SUB_RESOURCE_INSTANCES_FIELD, maps); + } + + return contributor; + }; + } + + protected RowMapper> rowToMapMapper2() { + return (rs, rowNum) -> { + Map contributor = new HashMap<>(); + contributor.put("id", getId(rs)); + contributor.put("name", getName(rs)); + contributor.put("contributorNameTypeId", getNameTypeId(rs)); + contributor.put(LAST_UPDATED_DATE_FIELD, rs.getTimestamp("last_updated_date")); + contributor.put(AUTHORITY_ID_FIELD, getAuthorityId(rs)); + + var maps = jsonConverter.fromJsonToListOfMaps(getInstances(rs)).stream().filter(Objects::nonNull).toList(); + if (!maps.isEmpty()) { + contributor.put(SUB_RESOURCE_INSTANCES_FIELD, maps); + } return contributor; }; @@ -223,7 +309,7 @@ private String getAuthorityId(ResultSet rs) throws SQLException { } private String getInstances(ResultSet rs) throws SQLException { - return rs.getString("instances"); + return rs.getString(SUB_RESOURCE_INSTANCES_FIELD); } } diff --git a/src/main/java/org/folio/search/service/reindex/jdbc/InstanceChildResourceRepository.java b/src/main/java/org/folio/search/service/reindex/jdbc/InstanceChildResourceRepository.java index b550551d2..da8118073 100644 --- a/src/main/java/org/folio/search/service/reindex/jdbc/InstanceChildResourceRepository.java +++ b/src/main/java/org/folio/search/service/reindex/jdbc/InstanceChildResourceRepository.java @@ -3,10 +3,13 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.folio.search.model.types.ReindexEntityType; public interface InstanceChildResourceRepository { void deleteByInstanceIds(List instanceIds); void saveAll(Set> entities, List> entityRelations); + + ReindexEntityType entityType(); } diff --git a/src/main/java/org/folio/search/service/reindex/jdbc/SubResourceResult.java b/src/main/java/org/folio/search/service/reindex/jdbc/SubResourceResult.java new file mode 100644 index 000000000..751e7a65c --- /dev/null +++ b/src/main/java/org/folio/search/service/reindex/jdbc/SubResourceResult.java @@ -0,0 +1,12 @@ +package org.folio.search.service.reindex.jdbc; + +import java.sql.Timestamp; +import java.util.List; +import java.util.Map; + +public record SubResourceResult(List> records, Timestamp lastUpdateDate) { + + public boolean hasRecords() { + return !records.isEmpty(); + } +} diff --git a/src/main/java/org/folio/search/service/reindex/jdbc/SubResourcesLockRepository.java b/src/main/java/org/folio/search/service/reindex/jdbc/SubResourcesLockRepository.java new file mode 100644 index 000000000..8c9c44bff --- /dev/null +++ b/src/main/java/org/folio/search/service/reindex/jdbc/SubResourcesLockRepository.java @@ -0,0 +1,51 @@ +package org.folio.search.service.reindex.jdbc; + +import static org.folio.search.utils.JdbcUtils.getSchemaName; + +import java.sql.Timestamp; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.folio.search.model.types.ReindexEntityType; +import org.folio.spring.FolioModuleMetadata; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +@Repository +@RequiredArgsConstructor +public class SubResourcesLockRepository { + + private static final String LOCK_SUB_RESOURCE_SQL = """ + UPDATE %s.sub_resources_lock + SET locked_flag = TRUE + WHERE entity_type = ? AND locked_flag = FALSE + RETURNING last_updated_date + """; + + private static final String UNLOCK_SUB_RESOURCE_SQL = """ + UPDATE %s.sub_resources_lock + SET locked_flag = FALSE, last_updated_date = ? + WHERE entity_type = ? + """; + + private final JdbcTemplate jdbcTemplate; + private final FolioModuleMetadata moduleMetadata; + + public Optional lockSubResource(ReindexEntityType entityType, String tenantId) { + var formattedSql = formatSqlWithSchema(LOCK_SUB_RESOURCE_SQL, tenantId); + return jdbcTemplate.query( + formattedSql, + rs -> rs.next() ? Optional.of(rs.getTimestamp(1)) : Optional.empty(), + entityType.getType() + ); + } + + public void unlockSubResource(ReindexEntityType entityType, Timestamp lastUpdatedDate, String tenantId) { + + var formattedSql = formatSqlWithSchema(UNLOCK_SUB_RESOURCE_SQL, tenantId); + jdbcTemplate.update(formattedSql, lastUpdatedDate, entityType.getType()); + } + + private String formatSqlWithSchema(String sqlTemplate, String tenantId) { + return sqlTemplate.formatted(getSchemaName(tenantId, moduleMetadata)); + } +} diff --git a/src/main/java/org/folio/search/service/reindex/jdbc/SubjectRepository.java b/src/main/java/org/folio/search/service/reindex/jdbc/SubjectRepository.java index 94ce56560..fdaaa951d 100644 --- a/src/main/java/org/folio/search/service/reindex/jdbc/SubjectRepository.java +++ b/src/main/java/org/folio/search/service/reindex/jdbc/SubjectRepository.java @@ -6,13 +6,16 @@ import static org.folio.search.utils.SearchUtils.SUBJECT_SOURCE_ID_FIELD; import static org.folio.search.utils.SearchUtils.SUBJECT_TYPE_ID_FIELD; import static org.folio.search.utils.SearchUtils.SUBJECT_VALUE_FIELD; +import static org.folio.search.utils.SearchUtils.SUB_RESOURCE_INSTANCES_FIELD; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import lombok.extern.log4j.Log4j2; @@ -72,15 +75,69 @@ public class SubjectRepository extends UploadRangeRepository implements Instance s.id; """; + private static final String SELECT_BY_UPDATED_QUERY = """ + WITH cte AS (SELECT s.id, + s.value, + s.authority_id, + s.source_id, + s.type_id, + s.last_updated_date + FROM %1$s.subject s + WHERE last_updated_date > ? + ORDER BY last_updated_date + ) + SELECT s.id, + s.value, + s.authority_id, + s.source_id, + s.type_id, + s.last_updated_date, + json_agg( + CASE + WHEN sub.instance_count IS NULL THEN NULL + ELSE json_build_object( + 'count', sub.instance_count, + 'shared', sub.shared, + 'tenantId', sub.tenant_id + ) + END + ) AS instances + FROM cte s + LEFT JOIN + (SELECT cte.id, + ins.tenant_id, + ins.shared, + count(1) AS instance_count + FROM %1$s.instance_subject ins + INNER JOIN cte ON ins.subject_id = cte.id + GROUP BY cte.id, + ins.tenant_id, + ins.shared) sub ON s.id = sub.id + GROUP BY s.id, + s.value, + s.authority_id, + s.source_id, + s.type_id, + s.last_updated_date + ORDER BY last_updated_date ASC; + """; + private static final String DELETE_QUERY = """ - DELETE - FROM %s.instance_subject - WHERE instance_id IN (%s); + WITH deleted_ids as ( + DELETE + FROM %1$s.instance_subject + WHERE instance_id IN (%2$s) + RETURNING subject_id + ) + UPDATE %1$s.subject + SET last_updated_date = CURRENT_TIMESTAMP + WHERE id IN (SELECT * FROM deleted_ids); """; + private static final String INSERT_ENTITIES_SQL = """ INSERT INTO %s.subject (id, value, authority_id, source_id, type_id) VALUES (?, ?, ?, ?, ?) - ON CONFLICT DO NOTHING; + ON CONFLICT (id) DO UPDATE SET last_updated_date = CURRENT_TIMESTAMP; """; private static final String INSERT_RELATIONS_SQL = """ INSERT INTO %s.instance_subject (instance_id, subject_id, tenant_id, shared) @@ -93,7 +150,6 @@ public class SubjectRepository extends UploadRangeRepository implements Instance private static final String IDS_INS_WHERE_CLAUSE = "ins.subject_id IN (%1$s)"; private static final String IDS_SUB_WHERE_CLAUSE = "s.id IN (%1$s)"; - protected SubjectRepository(JdbcTemplate jdbcTemplate, JsonConverter jsonConverter, FolioExecutionContext context, @@ -126,13 +182,19 @@ public List fetchByIds(List ids) { return jdbcTemplate.query(sql, instanceAggRowMapper(), ListUtils.union(ids, ids).toArray()); } - @Override public List> fetchByIdRange(String lower, String upper) { var sql = getFetchBySql(); return jdbcTemplate.query(sql, rowToMapMapper(), lower, upper, lower, upper); } + @Override + public SubResourceResult fetchByTimestamp(String tenant, Timestamp timestamp) { + var sql = SELECT_BY_UPDATED_QUERY.formatted(JdbcUtils.getSchemaName(tenant, context.getFolioModuleMetadata())); + var records = jdbcTemplate.query(sql, rowToMapMapper2(), timestamp); + var lastUpdateDate = records.isEmpty() ? null : records.get(records.size() - 1).get(LAST_UPDATED_DATE_FIELD); + return new SubResourceResult(records, (Timestamp) lastUpdateDate); + } @Override protected String getFetchBySql() { @@ -151,8 +213,29 @@ protected RowMapper> rowToMapMapper() { subject.put("sourceId", getSourceId(rs)); subject.put("typeId", getTypeId(rs)); - var maps = jsonConverter.fromJsonToListOfMaps(getInstances(rs)); - subject.put("instances", maps); + var maps = jsonConverter.fromJsonToListOfMaps(getInstances(rs)).stream().filter(Objects::nonNull).toList(); + if (!maps.isEmpty()) { + subject.put(SUB_RESOURCE_INSTANCES_FIELD, maps); + } + + return subject; + }; + } + + protected RowMapper> rowToMapMapper2() { + return (rs, rowNum) -> { + Map subject = new HashMap<>(); + subject.put("id", getId(rs)); + subject.put(SUBJECT_VALUE_FIELD, getValue(rs)); + subject.put(AUTHORITY_ID_FIELD, getAuthorityId(rs)); + subject.put("sourceId", getSourceId(rs)); + subject.put("typeId", getTypeId(rs)); + subject.put(LAST_UPDATED_DATE_FIELD, rs.getTimestamp("last_updated_date")); + + var maps = jsonConverter.fromJsonToListOfMaps(getInstances(rs)).stream().filter(Objects::nonNull).toList(); + if (!maps.isEmpty()) { + subject.put(SUB_RESOURCE_INSTANCES_FIELD, maps); + } return subject; }; @@ -234,6 +317,6 @@ private String getTypeId(ResultSet rs) throws SQLException { } private String getInstances(ResultSet rs) throws SQLException { - return rs.getString("instances"); + return rs.getString(SUB_RESOURCE_INSTANCES_FIELD); } } diff --git a/src/main/java/org/folio/search/service/reindex/jdbc/TenantRepository.java b/src/main/java/org/folio/search/service/reindex/jdbc/TenantRepository.java new file mode 100644 index 000000000..270810e5e --- /dev/null +++ b/src/main/java/org/folio/search/service/reindex/jdbc/TenantRepository.java @@ -0,0 +1,37 @@ +package org.folio.search.service.reindex.jdbc; + +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.folio.search.configuration.properties.SystemProperties; +import org.folio.search.model.entity.TenantEntity; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +@Repository +@RequiredArgsConstructor +public class TenantRepository { + + private final JdbcTemplate jdbcTemplate; + private final SystemProperties systemProperties; + + public static final String INSERT_QUERY = """ + INSERT INTO %s.known_tenant (id, central_id, active) + VALUES (?, ?, ?) ON CONFLICT (id) DO UPDATE SET active = ?; + """; + public static final String FETCH_QUERY = """ + SELECT id FROM %s.known_tenant + WHERE active = TRUE AND central_id IS NULL; + """; + + public void saveTenant(TenantEntity tenantEntity) { + String query = INSERT_QUERY.formatted(systemProperties.getSchemaName()); + jdbcTemplate.update(query, tenantEntity.id(), tenantEntity.centralId(), tenantEntity.active(), + tenantEntity.active()); + } + + public List fetchDataTenantIds() { + String query = FETCH_QUERY.formatted(systemProperties.getSchemaName()); + return jdbcTemplate.query(query, (rs, rowNum) -> rs.getString("id")); + } + +} diff --git a/src/main/java/org/folio/search/service/reindex/jdbc/UploadInstanceRepository.java b/src/main/java/org/folio/search/service/reindex/jdbc/UploadInstanceRepository.java index 9506bb0f0..7bc3a33cb 100644 --- a/src/main/java/org/folio/search/service/reindex/jdbc/UploadInstanceRepository.java +++ b/src/main/java/org/folio/search/service/reindex/jdbc/UploadInstanceRepository.java @@ -2,6 +2,7 @@ import static org.folio.search.utils.JdbcUtils.getFullTableName; +import java.sql.Timestamp; import java.util.Collections; import java.util.List; import java.util.Map; @@ -42,6 +43,11 @@ protected UploadInstanceRepository(JdbcTemplate jdbcTemplate, JsonConverter json super(jdbcTemplate, jsonConverter, context, reindexConfig); } + @Override + public SubResourceResult fetchByTimestamp(String tenant, Timestamp timestamp) { + return null; + } + @Override public ReindexEntityType entityType() { return ReindexEntityType.INSTANCE; diff --git a/src/main/java/org/folio/search/service/reindex/jdbc/UploadRangeRepository.java b/src/main/java/org/folio/search/service/reindex/jdbc/UploadRangeRepository.java index 20c616ce1..1123458a2 100644 --- a/src/main/java/org/folio/search/service/reindex/jdbc/UploadRangeRepository.java +++ b/src/main/java/org/folio/search/service/reindex/jdbc/UploadRangeRepository.java @@ -33,7 +33,7 @@ public abstract class UploadRangeRepository extends ReindexJdbcRepository { protected static final String SELECT_RECORD_SQL = "SELECT * from %s WHERE id >= ? AND id <= ?;"; - + protected static final String LAST_UPDATED_DATE_FIELD = "lastUpdatedDate"; private static final String UPSERT_UPLOAD_RANGE_SQL = """ INSERT INTO %s (id, entity_type, lower, upper, created_at, finished_at) VALUES (?, ?, ?, ?, ?, ?) @@ -69,6 +69,8 @@ public List> fetchByIdRange(String lower, String upper) { return jdbcTemplate.query(sql, rowToMapMapper(), lower, upper); } + public abstract SubResourceResult fetchByTimestamp(String tenant, Timestamp timestamp); + protected String getFetchBySql() { return SELECT_RECORD_SQL.formatted(getFullTableName(context, entityTable())); } @@ -88,6 +90,11 @@ protected Set parseInstanceSubResources(String instancesJso } } + protected List createRanges() { + var uploadRangeLevel = reindexConfig.getUploadRangeLevel(); + return RangeGenerator.createHexRanges(uploadRangeLevel); + } + private RowMapper uploadRangeRowMapper() { return (rs, rowNum) -> { var uploadRange = new UploadRangeEntity( @@ -116,11 +123,6 @@ private List prepareAndSaveUploadRanges() { return ranges; } - protected List createRanges() { - var uploadRangeLevel = reindexConfig.getUploadRangeLevel(); - return RangeGenerator.createHexRanges(uploadRangeLevel); - } - private void upsertUploadRanges(List uploadRanges) { var fullTableName = getFullTableName(context, UPLOAD_RANGE_TABLE); jdbcTemplate.batchUpdate(UPSERT_UPLOAD_RANGE_SQL.formatted(fullTableName), uploadRanges, BATCH_OPERATION_SIZE, diff --git a/src/main/java/org/folio/search/service/SearchTenantService.java b/src/main/java/org/folio/search/service/system/SearchTenantService.java similarity index 91% rename from src/main/java/org/folio/search/service/SearchTenantService.java rename to src/main/java/org/folio/search/service/system/SearchTenantService.java index 682176182..4ab665a14 100644 --- a/src/main/java/org/folio/search/service/SearchTenantService.java +++ b/src/main/java/org/folio/search/service/system/SearchTenantService.java @@ -1,4 +1,4 @@ -package org.folio.search.service; +package org.folio.search.service.system; import static java.lang.Boolean.parseBoolean; @@ -9,11 +9,14 @@ import org.folio.search.configuration.properties.SearchConfigurationProperties; import org.folio.search.domain.dto.LanguageConfig; import org.folio.search.domain.dto.ReindexRequest; +import org.folio.search.model.entity.TenantEntity; import org.folio.search.model.types.ReindexEntityType; +import org.folio.search.service.IndexService; import org.folio.search.service.browse.CallNumberBrowseRangeService; import org.folio.search.service.consortium.LanguageConfigServiceDecorator; import org.folio.search.service.metadata.ResourceDescriptionService; import org.folio.search.service.reindex.ReindexService; +import org.folio.search.service.reindex.jdbc.TenantRepository; import org.folio.spring.FolioExecutionContext; import org.folio.spring.liquibase.FolioSpringLiquibase; import org.folio.spring.service.PrepareSystemUserService; @@ -41,6 +44,7 @@ public class SearchTenantService extends TenantService { private final ResourceDescriptionService resourceDescriptionService; private final CallNumberBrowseRangeService callNumberBrowseRangeService; private final SearchConfigurationProperties searchConfigurationProperties; + private final TenantRepository tenantRepository; public SearchTenantService(JdbcTemplate jdbcTemplate, FolioExecutionContext context, FolioSpringLiquibase folioSpringLiquibase, KafkaAdminService kafkaAdminService, @@ -49,7 +53,8 @@ public SearchTenantService(JdbcTemplate jdbcTemplate, FolioExecutionContext cont LanguageConfigServiceDecorator languageConfigService, CallNumberBrowseRangeService callNumberBrowseRangeService, ResourceDescriptionService resourceDescriptionService, - SearchConfigurationProperties searchConfigurationProperties) { + SearchConfigurationProperties searchConfigurationProperties, + TenantRepository tenantRepository) { super(jdbcTemplate, context, folioSpringLiquibase); this.kafkaAdminService = kafkaAdminService; this.indexService = indexService; @@ -59,6 +64,7 @@ public SearchTenantService(JdbcTemplate jdbcTemplate, FolioExecutionContext cont this.callNumberBrowseRangeService = callNumberBrowseRangeService; this.resourceDescriptionService = resourceDescriptionService; this.searchConfigurationProperties = searchConfigurationProperties; + this.tenantRepository = tenantRepository; } /** @@ -84,7 +90,10 @@ public SearchTenantService(JdbcTemplate jdbcTemplate, FolioExecutionContext cont public synchronized void createOrUpdateTenant(TenantAttributes tenantAttributes) { var tenantId = context.getTenantId(); var centralTenant = centralTenant(tenantId, tenantAttributes); - if (tenantId.equals(centralTenant)) { + var isCentral = tenantId.equals(centralTenant); + var tenantEntity = new TenantEntity(tenantId, isCentral ? null : centralTenant, true); + tenantRepository.saveTenant(tenantEntity); + if (isCentral) { super.createOrUpdateTenant(tenantAttributes); } else { log.info("Not executing full tenant init for not central tenant {}.", tenantId); @@ -104,7 +113,10 @@ public synchronized void createOrUpdateTenant(TenantAttributes tenantAttributes) public void deleteTenant(TenantAttributes tenantAttributes) { var tenantId = context.getTenantId(); var centralTenant = centralTenant(tenantId, tenantAttributes); - if (tenantId.equals(centralTenant)) { + var isCentral = tenantId.equals(centralTenant); + var tenantEntity = new TenantEntity(tenantId, isCentral ? null : centralTenant, false); + tenantRepository.saveTenant(tenantEntity); + if (isCentral) { super.deleteTenant(tenantAttributes); } else { log.info("Not executing full tenant destroy for not central tenant {}.", tenantId); diff --git a/src/main/java/org/folio/search/service/system/SystemSchemaInitializer.java b/src/main/java/org/folio/search/service/system/SystemSchemaInitializer.java new file mode 100644 index 000000000..72b743529 --- /dev/null +++ b/src/main/java/org/folio/search/service/system/SystemSchemaInitializer.java @@ -0,0 +1,40 @@ +package org.folio.search.service.system; + +import liquibase.exception.LiquibaseException; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.folio.search.configuration.properties.SystemProperties; +import org.folio.spring.liquibase.FolioSpringLiquibase; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.boot.autoconfigure.liquibase.LiquibaseProperties; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +@Log4j2 +public class SystemSchemaInitializer implements InitializingBean { + + private final LiquibaseProperties liquibaseProperties; + private final FolioSpringLiquibase folioSpringLiquibase; + private final SystemProperties systemProperties; + + /** + * Performs database update using {@link FolioSpringLiquibase} and then returns previous configuration for this bean. + * + * @throws LiquibaseException - if liquibase update failed. + */ + @Override + public void afterPropertiesSet() throws LiquibaseException { + log.info("Starting liquibase update for system"); + + folioSpringLiquibase.setChangeLog(systemProperties.getChangeLog()); + folioSpringLiquibase.setDefaultSchema(systemProperties.getSchemaName()); + + folioSpringLiquibase.performLiquibaseUpdate(); + + folioSpringLiquibase.setChangeLog(liquibaseProperties.getChangeLog()); + folioSpringLiquibase.setDefaultSchema(liquibaseProperties.getDefaultSchema()); + + log.info("Completed liquibase update for system"); + } +} diff --git a/src/main/java/org/folio/search/utils/JdbcUtils.java b/src/main/java/org/folio/search/utils/JdbcUtils.java index 95ce07e3c..2c09f5f13 100644 --- a/src/main/java/org/folio/search/utils/JdbcUtils.java +++ b/src/main/java/org/folio/search/utils/JdbcUtils.java @@ -4,6 +4,7 @@ import lombok.experimental.UtilityClass; import org.folio.spring.FolioExecutionContext; +import org.folio.spring.FolioModuleMetadata; import org.springframework.jdbc.core.JdbcTemplate; @UtilityClass @@ -11,6 +12,10 @@ public class JdbcUtils { private static final String TRUNCATE_TABLE_SQL = "TRUNCATE TABLE %s;"; + public static String getSchemaName(String tenantId, FolioModuleMetadata folioModuleMetadata) { + return folioModuleMetadata.getDBSchemaName(tenantId); + } + public static String getSchemaName(FolioExecutionContext context) { return context.getFolioModuleMetadata().getDBSchemaName(context.getTenantId()); } diff --git a/src/main/java/org/folio/search/utils/SearchConverterUtils.java b/src/main/java/org/folio/search/utils/SearchConverterUtils.java index 224609365..317dfbfa4 100644 --- a/src/main/java/org/folio/search/utils/SearchConverterUtils.java +++ b/src/main/java/org/folio/search/utils/SearchConverterUtils.java @@ -2,11 +2,7 @@ import static java.util.Collections.emptyMap; import static org.apache.commons.collections4.MapUtils.getString; -import static org.apache.commons.lang3.StringUtils.removeStart; -import static org.apache.commons.lang3.StringUtils.startsWith; -import static org.folio.search.domain.dto.ResourceEventType.UPDATE; import static org.folio.search.utils.SearchUtils.ID_FIELD; -import static org.folio.search.utils.SearchUtils.SOURCE_CONSORTIUM_PREFIX; import static org.folio.search.utils.SearchUtils.SOURCE_FIELD; import java.util.Arrays; @@ -186,13 +182,6 @@ public static void copyEntityFields(Map source, Map + + + + diff --git a/src/main/resources/changelog/changes/v4.0/create-reindex-entity-tables.xml b/src/main/resources/changelog/changes/v4.0/create-reindex-entity-tables.xml index b347ea50d..e4eb33821 100644 --- a/src/main/resources/changelog/changes/v4.0/create-reindex-entity-tables.xml +++ b/src/main/resources/changelog/changes/v4.0/create-reindex-entity-tables.xml @@ -8,7 +8,8 @@ Enable pgcrypto extension - CREATE EXTENSION IF NOT EXISTS pgcrypto SCHEMA public; + CREATE + EXTENSION IF NOT EXISTS pgcrypto SCHEMA public; @@ -192,7 +193,7 @@ - + @@ -374,7 +375,8 @@ - + @@ -385,5 +387,125 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Create sub_resources_locks table + + + + + + + + + + + + + + + + + + + + Fill sub_resources_lock table with initial records + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/main/resources/changelog/changes/v4.0/sql/update-reindex-status-trigger-v2.sql b/src/main/resources/changelog/changes/v4.0/sql/update-reindex-status-trigger-v2.sql new file mode 100644 index 000000000..fc2b996a1 --- /dev/null +++ b/src/main/resources/changelog/changes/v4.0/sql/update-reindex-status-trigger-v2.sql @@ -0,0 +1,36 @@ +CREATE OR REPLACE FUNCTION update_reindex_status_trigger() + RETURNS TRIGGER AS +$$ +BEGIN + -- update status and end time for merge + IF OLD.status = 'MERGE_IN_PROGRESS' and NEW.total_merge_ranges = NEW.processed_merge_ranges + THEN + NEW.status = 'MERGE_COMPLETED'; + NEW.end_time_merge = current_timestamp; + ELSE + -- update status and end time for upload + IF OLD.status = 'UPLOAD_IN_PROGRESS' and NEW.total_upload_ranges = NEW.processed_upload_ranges + THEN + NEW.status = 'UPLOAD_COMPLETED'; + NEW.end_time_upload = current_timestamp; + UPDATE sub_resources_lock + SET last_updated_date = current_timestamp, locked_flag = FALSE + WHERE entity_type = lower(OLD.entity_type); + END IF; + END IF; + IF NEW.status = 'MERGE_IN_PROGRESS' AND NEW.entity_type = 'INSTANCE' THEN + UPDATE sub_resources_lock SET locked_flag = TRUE; + END IF; + IF NEW.status = 'UPLOAD_IN_PROGRESS' THEN + UPDATE sub_resources_lock SET locked_flag = TRUE WHERE entity_type = lower(NEW.entity_type); + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS reindex_status_updated_trigger ON reindex_status CASCADE; +CREATE TRIGGER reindex_status_updated_trigger + BEFORE UPDATE OF processed_merge_ranges, processed_upload_ranges + ON reindex_status + FOR EACH ROW +EXECUTE FUNCTION update_reindex_status_trigger(); diff --git a/src/main/resources/changelog/system/initial-schema.xml b/src/main/resources/changelog/system/initial-schema.xml new file mode 100644 index 000000000..9a2f3dc17 --- /dev/null +++ b/src/main/resources/changelog/system/initial-schema.xml @@ -0,0 +1,19 @@ + + + + + CREATE SCHEMA IF NOT EXISTS mod_search__system; + + + + + + + + + + + diff --git a/src/test/java/org/folio/search/integration/KafkaMessageListenerIT.java b/src/test/java/org/folio/search/integration/KafkaMessageListenerIT.java index 6e1f98cc3..ae095186e 100644 --- a/src/test/java/org/folio/search/integration/KafkaMessageListenerIT.java +++ b/src/test/java/org/folio/search/integration/KafkaMessageListenerIT.java @@ -10,7 +10,6 @@ import static org.folio.search.utils.KafkaConstants.EVENT_LISTENER_ID; import static org.folio.search.utils.SearchResponseHelper.getSuccessIndexOperationResponse; import static org.folio.search.utils.TestConstants.TENANT_ID; -import static org.folio.search.utils.TestConstants.instanceSubResourceTopic; import static org.folio.search.utils.TestConstants.inventoryAuthorityTopic; import static org.folio.search.utils.TestConstants.inventoryBoundWithTopic; import static org.folio.search.utils.TestConstants.inventoryInstanceTopic; @@ -35,7 +34,6 @@ import org.folio.search.configuration.RetryTemplateConfiguration; import org.folio.search.configuration.kafka.InstanceResourceEventKafkaConfiguration; import org.folio.search.configuration.kafka.ResourceEventKafkaConfiguration; -import org.folio.search.configuration.kafka.SubResourceKafkaConfiguration; import org.folio.search.configuration.properties.ReindexConfigurationProperties; import org.folio.search.configuration.properties.StreamIdsProperties; import org.folio.search.domain.dto.ResourceEvent; @@ -44,7 +42,6 @@ import org.folio.search.integration.message.FolioMessageBatchProcessor; import org.folio.search.integration.message.KafkaMessageListener; import org.folio.search.integration.message.interceptor.ResourceEventBatchInterceptor; -import org.folio.search.model.event.SubResourceEvent; import org.folio.search.model.types.ResourceType; import org.folio.search.service.ResourceService; import org.folio.search.service.config.ConfigSynchronizationService; @@ -66,13 +63,13 @@ import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.retry.annotation.EnableRetry; +import org.springframework.test.context.bean.override.mockito.MockitoBean; @Log4j2 @EnableKafka @@ -99,14 +96,12 @@ class KafkaMessageListenerIT { @Autowired private KafkaTemplate resourceKafkaTemplate; @Autowired - private KafkaTemplate subResourceKafkaTemplate; - @Autowired private FolioKafkaProperties kafkaProperties; - @MockBean + @MockitoBean private ResourceService resourceService; - @MockBean + @MockitoBean private SystemUserScopedExecutionService executionService; - @MockBean + @MockitoBean private ConfigSynchronizationService configSynchronizationService; @Autowired @@ -143,14 +138,6 @@ void handleInstanceEvents_positive_boundWithEvent() { verify(resourceService).indexInstancesById(List.of(expectedEvent))); } - @Test - void handleInstanceSubResourceEvents_positive() { - var expectedEvent = SubResourceEvent.fromResourceEvent(instanceEvent()); - subResourceKafkaTemplate.send(instanceSubResourceTopic(), INSTANCE_ID, expectedEvent); - await().atMost(ONE_MINUTE).pollInterval(ONE_HUNDRED_MILLISECONDS).untilAsserted(() -> - verify(resourceService).indexInstanceSubResources(List.of(expectedEvent))); - } - @Test void handleInstanceEvents_negative_tenantIndexNotInitialized() throws Exception { var idEvent = instanceEvent(); @@ -249,7 +236,6 @@ private static ResourceEvent authorityEvent(String id) { @EnableRetry(proxyTargetClass = true) @Import({ InstanceResourceEventKafkaConfiguration.class, ResourceEventKafkaConfiguration.class, - SubResourceKafkaConfiguration.class, KafkaAutoConfiguration.class, FolioMessageBatchProcessor.class, KafkaAdminService.class, LocalFileProvider.class, JsonConverter.class, JacksonAutoConfiguration.class, RetryTemplateConfiguration.class, ResourceEventBatchInterceptor.class diff --git a/src/test/java/org/folio/search/service/InstanceChildrenResourceServiceTest.java b/src/test/java/org/folio/search/service/InstanceChildrenResourceServiceTest.java index 972fbc0c2..2cfe4f1e1 100644 --- a/src/test/java/org/folio/search/service/InstanceChildrenResourceServiceTest.java +++ b/src/test/java/org/folio/search/service/InstanceChildrenResourceServiceTest.java @@ -1,14 +1,7 @@ package org.folio.search.service; -import static java.util.Collections.singletonList; -import static org.assertj.core.api.Assertions.assertThat; -import static org.folio.search.utils.SearchUtils.SOURCE_CONSORTIUM_PREFIX; -import static org.folio.search.utils.SearchUtils.SOURCE_FIELD; import static org.folio.search.utils.TestConstants.TENANT_ID; -import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import java.util.List; @@ -16,29 +9,23 @@ import java.util.UUID; import org.folio.search.domain.dto.ResourceEvent; import org.folio.search.domain.dto.ResourceEventType; -import org.folio.search.model.event.SubResourceEvent; import org.folio.search.service.consortium.ConsortiumTenantProvider; import org.folio.search.service.converter.preprocessor.extractor.ChildResourceExtractor; import org.folio.search.service.converter.preprocessor.extractor.impl.ClassificationResourceExtractor; import org.folio.search.service.converter.preprocessor.extractor.impl.ContributorResourceExtractor; import org.folio.search.service.converter.preprocessor.extractor.impl.SubjectResourceExtractor; import org.folio.spring.testing.type.UnitTest; -import org.folio.spring.tools.kafka.FolioMessageProducer; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @UnitTest @ExtendWith(MockitoExtension.class) class InstanceChildrenResourceServiceTest { - @Mock - private FolioMessageProducer messageProducer; @Mock private ConsortiumTenantProvider consortiumTenantProvider; @Mock @@ -55,94 +42,7 @@ class InstanceChildrenResourceServiceTest { void setUp() { this.resourceExtractors = List.of(classificationResourceExtractor, contributorResourceExtractor, subjectResourceExtractor); - service = new InstanceChildrenResourceService(messageProducer, resourceExtractors, consortiumTenantProvider); - } - - @ParameterizedTest - @ValueSource(ints = {0, 1, 2}) - void sendChildrenEvent(int extractorIndex) { - var event = new ResourceEvent() - ._new(Map.of(SOURCE_FIELD, "MARC")); - var expectedEvent = SubResourceEvent.fromResourceEvent(event); - when(resourceExtractors.get(extractorIndex).hasChildResourceChanges(event)).thenReturn(true); - - service.sendChildrenEvent(event); - - verify(messageProducer, times(1)).sendMessages(singletonList(expectedEvent)); - } - - @ParameterizedTest - @ValueSource(ints = {0, 1, 2}) - void sendChildrenEvent_resourceSharing(int extractorIndex) { - var event = resourceSharingEvent(); - var expectedEvent = SubResourceEvent.fromResourceEvent(event); - for (int i = 0; i < resourceExtractors.size(); i++) { - if (i != extractorIndex) { - lenient().when(resourceExtractors.get(extractorIndex).hasChildResourceChanges(event)).thenReturn(true); - } - } - - service.sendChildrenEvent(event); - - verify(messageProducer, times(1)).sendMessages(singletonList(expectedEvent)); - } - - @ParameterizedTest - @ValueSource(strings = {"MARC", "CONSORTIUM_MARC"}) - void sendChildrenEvent_noEvent(String source) { - var event = new ResourceEvent() - ._new(Map.of(SOURCE_FIELD, source)); - resourceExtractors.forEach(resourceExtractor -> - when(resourceExtractor.hasChildResourceChanges(event)).thenReturn(false)); - - service.sendChildrenEvent(event); - - verifyNoInteractions(messageProducer); - } - - @Test - void sendChildrenEvent_resourceSharing_noEvent() { - var event = resourceSharingEvent(); - resourceExtractors.forEach(resourceExtractor -> - when(resourceExtractor.hasChildResourceChanges(event)).thenReturn(true)); - - service.sendChildrenEvent(event); - - verifyNoInteractions(messageProducer); - } - - @Test - void extractChildren() { - var event = new ResourceEvent(); - resourceExtractors.forEach(resourceExtractor -> - when(resourceExtractor.prepareEvents(event)).thenReturn(List.of(new ResourceEvent(), new ResourceEvent()))); - - var result = service.extractChildren(event); - - assertThat(result).hasSize(6); - } - - @Test - void extractChildren_resourceSharing() { - var event = resourceSharingEvent(); - resourceExtractors.forEach(resourceExtractor -> - when(resourceExtractor.prepareEventsOnSharing(event)) - .thenReturn(List.of(new ResourceEvent(), new ResourceEvent()))); - - var result = service.extractChildren(event); - - assertThat(result).hasSize(6); - } - - @Test - void extractChildren_shadowInstance() { - var event = new ResourceEvent() - ._new(Map.of(SOURCE_FIELD, SOURCE_CONSORTIUM_PREFIX + "MARC")); - - var result = service.extractChildren(event); - - assertThat(result).isEmpty(); - resourceExtractors.forEach(Mockito::verifyNoInteractions); + service = new InstanceChildrenResourceService(resourceExtractors, consortiumTenantProvider); } @ParameterizedTest @@ -173,11 +73,4 @@ void persistChildrenOnReindex(boolean shared) { resourceExtractors.forEach(resourceExtractor -> verify(resourceExtractor).persistChildren(shared, expectedEvents)); } - - private ResourceEvent resourceSharingEvent() { - return new ResourceEvent() - .type(ResourceEventType.UPDATE) - ._new(Map.of(SOURCE_FIELD, SOURCE_CONSORTIUM_PREFIX + "MARC")) - .old(Map.of(SOURCE_FIELD, "MARC")); - } } diff --git a/src/test/java/org/folio/search/service/SearchTenantServiceTest.java b/src/test/java/org/folio/search/service/system/SearchTenantServiceTest.java similarity index 94% rename from src/test/java/org/folio/search/service/SearchTenantServiceTest.java rename to src/test/java/org/folio/search/service/system/SearchTenantServiceTest.java index 453abd39d..3fcce098e 100644 --- a/src/test/java/org/folio/search/service/SearchTenantServiceTest.java +++ b/src/test/java/org/folio/search/service/system/SearchTenantServiceTest.java @@ -1,4 +1,4 @@ -package org.folio.search.service; +package org.folio.search.service.system; import static org.folio.search.model.types.ResourceType.INSTANCE_SUBJECT; import static org.folio.search.model.types.ResourceType.UNKNOWN; @@ -20,9 +20,12 @@ import java.util.Set; import org.folio.search.configuration.properties.SearchConfigurationProperties; import org.folio.search.domain.dto.LanguageConfig; +import org.folio.search.model.entity.TenantEntity; +import org.folio.search.service.IndexService; import org.folio.search.service.browse.CallNumberBrowseRangeService; import org.folio.search.service.consortium.LanguageConfigServiceDecorator; import org.folio.search.service.metadata.ResourceDescriptionService; +import org.folio.search.service.reindex.jdbc.TenantRepository; import org.folio.spring.FolioExecutionContext; import org.folio.spring.FolioModuleMetadata; import org.folio.spring.liquibase.FolioSpringLiquibase; @@ -76,6 +79,8 @@ public String getDBSchemaName(String tenantId) { private FolioSpringLiquibase folioSpringLiquibase; @Mock private JdbcTemplate jdbcTemplate; + @Mock + private TenantRepository tenantRepository; @Test void createOrUpdateTenant_positive() { @@ -89,6 +94,7 @@ void createOrUpdateTenant_positive() { searchTenantService.createOrUpdateTenant(tenantAttributes()); + verify(tenantRepository).saveTenant(new TenantEntity(TENANT_ID, null, true)); verify(languageConfigService).create(new LanguageConfig().code("eng")); verify(indexService).createIndexIfNotExist(UNKNOWN, TENANT_ID); verify(indexService, never()).reindexInventory(TENANT_ID, null); @@ -105,6 +111,7 @@ void createOrUpdateTenant_positive_onlyKafkaAndSystemUserWhenConsortiumMemberTen searchTenantService.createOrUpdateTenant(tenantAttributes().addParametersItem(centralTenantParameter())); + verify(tenantRepository).saveTenant(new TenantEntity(TENANT_ID, CENTRAL_TENANT_ID, true)); verifyNoInteractions(languageConfigService); verifyNoInteractions(indexService); verify(kafkaAdminService).createTopics(TENANT_ID); @@ -192,6 +199,7 @@ void deleteTenant_positive() { searchTenantService.deleteTenant(tenantAttributes()); + verify(tenantRepository).saveTenant(new TenantEntity(TENANT_ID, null, false)); verify(jdbcTemplate).execute(anyString()); verify(callNumberBrowseRangeService).evictRangeCache(TENANT_ID); verify(indexService).dropIndex(UNKNOWN, TENANT_ID); @@ -204,6 +212,7 @@ void deleteTenant_positive_onlyDeleteKafkaTopicsWhenConsortiumMemberTenant() { searchTenantService.deleteTenant(tenantAttributes().addParametersItem(centralTenantParameter())); + verify(tenantRepository).saveTenant(new TenantEntity(TENANT_ID, CENTRAL_TENANT_ID, false)); verify(kafkaAdminService).deleteTopics(TENANT_ID); verifyNoInteractions(jdbcTemplate); verifyNoInteractions(callNumberBrowseRangeService); diff --git a/src/test/java/org/folio/search/utils/TestConstants.java b/src/test/java/org/folio/search/utils/TestConstants.java index a4944ea9b..4f1b15291 100644 --- a/src/test/java/org/folio/search/utils/TestConstants.java +++ b/src/test/java/org/folio/search/utils/TestConstants.java @@ -1,6 +1,5 @@ package org.folio.search.utils; -import static org.folio.search.configuration.kafka.KafkaConfiguration.SearchTopic.INDEX_SUB_RESOURCE; import static org.folio.search.utils.TestUtils.randomId; import static org.folio.spring.config.properties.FolioEnvironment.getFolioEnvName; @@ -64,10 +63,6 @@ public static String inventoryInstanceTopic(String tenantId) { return getTopicName(tenantId, INVENTORY_INSTANCE_TOPIC); } - public static String instanceSubResourceTopic() { - return getTopicName(TENANT_ID, INDEX_SUB_RESOURCE.topicName()); - } - public static String reindexRangeIndexTopic(String tenantId) { return getTopicName(tenantId, REINDEX_RANGE_INDEX_TOPIC); } diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index 3b3adbdc9..a9a01a4cc 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -44,6 +44,9 @@ server.port: 8081 folio: environment: folio-test + system: + schemaName: mod_search__system + change-log: classpath:changelog/changelog-system.xml search-config: initial-languages: eng,fre,ita,spa,ger max-search-batch-request-ids-count: ${MAX_SEARCH_BATCH_REQUEST_IDS_COUNT:500} @@ -54,6 +57,8 @@ folio: browse-classifications: true indexing: data-format: ${INDEXING_DATA_FORMAT:json} + instance-children-index-delay-ms: 2000 + instance-children-index-batch-size: 500 reindex: location-batch-size: ${REINDEX_LOCATION_BATCH_SIZE:2} query: @@ -123,9 +128,6 @@ folio: - name: inventory.reindex-records numPartitions: 1 replicationFactor: 1 - - name: search.index.sub-resource - numPartitions: 1 - replicationFactor: 1 listener: events: concurrency: 2 @@ -161,12 +163,6 @@ folio: concurrency: 1 topic-pattern: (${folio.environment}\.)(.*\.)inventory\.reindex-records group-id: ${folio.environment}-mod-search-reindex-records-group - index-sub-resource: - concurrency: 2 - topic-pattern: (${folio.environment}\.)(.*\.)search\.index\.sub-resource - group-id: ${folio.environment}-mod-search-index-sub-resource-group - max-poll-records: ${KAFKA_CONSUMER_INDEX_SUB_RESOURCE_MAX_POLL_RECORDS:5} - max-poll-interval-ms: ${KAFKA_CONSUMER_INDEX_SUB_RESOURCE_MAX_POLL_INTERVAL_MS:60000} okapiUrl: ${okapi.url} logging: request: