diff --git a/NEWS.md b/NEWS.md
index f00daa9e8..825dad28f 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -8,6 +8,9 @@
### Features
* Move Instance sub-entities population from database trigger to code ([MSEARCH-887](https://folio-org.atlassian.net/browse/MSEARCH-887))
+* Update reindex merge failed status only for failed entity type ([MSEARCH-909](https://folio-org.atlassian.net/browse/MSEARCH-909))
+* Extend reindex range tables with status, fail_cause columns ([MSEARCH-870](https://folio-org.atlassian.net/browse/MSEARCH-870))
+* Implement scheduled indexing for instance sub-resources ([MSEARCH-922](https://folio-org.atlassian.net/browse/MSEARCH-922))
* Call Numbers Browse: Implement Database Structure and Logic for Managing Call Numbers ([MSEARCH-862](https://folio-org.atlassian.net/browse/MSEARCH-862))
* Call Numbers Browse: Implement Call Number Browse Config ([MSEARCH-863](https://folio-org.atlassian.net/browse/MSEARCH-863))
* Call Numbers Browse: Implement Indexing and Re-indexing Mechanisms for Call-Numbers ([MSEARCH-864](https://folio-org.atlassian.net/browse/MSEARCH-864))
@@ -21,6 +24,7 @@
* Fix old browse config returned on get after upsert ([MSEARCH-897](https://folio-org.atlassian.net/browse/MSEARCH-897))
* Fix generation of IDs ranges in Reindex Upload for Subject, Classification and Contributor ([MSEARCH-907](https://folio-org.atlassian.net/browse/MSEARCH-907))
* Remove browse config caching ([MSEARCH-897](https://folio-org.atlassian.net/browse/MSEARCH-897))
+* Fix the "Invalid reference" appears after updating ownership ([MSEARCH-915](https://folio-org.atlassian.net/browse/MSEARCH-915))
### Tech Dept
* Description ([ISSUE](https://folio-org.atlassian.net/browse/ISSUE))
diff --git a/README.md b/README.md
index 4b43af31c..23df34ac7 100644
--- a/README.md
+++ b/README.md
@@ -275,6 +275,7 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l
| REINDEX_MERGE_RANGE_PUBLISHER_RETRY_INTERVAL_MS | 1000 | The retry interval in ms for reindex merge range request publishing. |
| REINDEX_MERGE_RANGE_PUBLISHER_RETRY_ATTEMPTS | 5 | The maximum number of retries for reindex merge range request publishing. |
| MAX_SEARCH_BATCH_REQUEST_IDS_COUNT | 20_000 | Defines maximum batch request IDs count for searching consolidated items/holdings in consortium |
+| INSTANCE_CHILDREN_INDEX_DELAY_MS | 60000 | Defines the delay for scheduler that indexes subjects/contributors/classifications in a background |
The module uses system user to communicate with other modules from Kafka consumers.
For production deployments you MUST specify the password for this system user via env variable:
diff --git a/descriptors/ModuleDescriptor-template.json b/descriptors/ModuleDescriptor-template.json
index d2149cd99..a6becf5a1 100644
--- a/descriptors/ModuleDescriptor-template.json
+++ b/descriptors/ModuleDescriptor-template.json
@@ -1054,36 +1054,11 @@
"value": "",
"description": "Replication factor for `search.reindex.range-index` topic."
},
- {
- "name": "KAFKA_INDEX_SUB_RESOURCE_TOPIC_PARTITIONS",
- "value": "50",
- "description": "Amount of partitions for `search.index.sub-resource` topic."
- },
- {
- "name": "KAFKA_INDEX_SUB_RESOURCE_TOPIC_REPLICATION_FACTOR",
- "value": "",
- "description": "Replication factor for `search.index.sub-resource` topic."
- },
{
"name": "KAFKA_REINDEX_RECORDS_CONCURRENCY",
"value": "2",
"description": "Custom number of kafka concurrent threads for `inventory.reindex-records` message consuming."
},
- {
- "name": "KAFKA_INDEX_SUB_RESOURCE_CONCURRENCY",
- "value": "2",
- "description": "Custom number of kafka concurrent threads for `search.index.sub-resource` message consuming."
- },
- {
- "name": "KAFKA_CONSUMER_INDEX_SUB_RESOURCE_MAX_POLL_RECORDS",
- "value": "200",
- "description": "Maximum number of records returned in a single call to poll() for instance sub-resource events."
- },
- {
- "name": "KAFKA_CONSUMER_INDEX_SUB_RESOURCE_MAX_POLL_INTERVAL_MS",
- "value": "600000",
- "description": "Maximum processing time allowed for consumer on instance sub-resource events."
- },
{
"name": "KAFKA_CONSUMER_MAX_POLL_RECORDS",
"value": "200",
@@ -1161,6 +1136,16 @@
"value": "smile",
"description": "Format for passing data to elasticsearch (json/smile)"
},
+ {
+ "name": "INSTANCE_CHILDREN_INDEX_DELAY_MS",
+ "value": "60000",
+ "description": "Scheduler delay for indexing subjects/contributors/classifications"
+ },
+ {
+ "name": "INSTANCE_CHILDREN_INDEX_BATCH_SIZE",
+ "value": "500",
+ "description": "Batch size for indexing subjects/contributors/classifications"
+ },
{
"name": "INITIAL_LANGUAGES",
"value": "eng",
diff --git a/pom.xml b/pom.xml
index a0fe732c5..bce00a108 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.boot
spring-boot-starter-parent
- 3.3.5
+ 3.4.0
@@ -39,17 +39,17 @@
1.8.0-SNAPSHOT
35.3.0
2.18.0
- 1.6.2
- 2.17.0
+ 1.6.3
+ 2.18.0
4.4
2.9.5
- 1.18.34
+ 1.18.36
0.2.0
0.8.3
- 1.0.0
- 7.9.0
+ 1.0.1
+ 7.10.0
1.0.1
3.6.0
3.4.0
@@ -57,9 +57,9 @@
3.6.0
3.13.0
3.5.2
- 3.5.1
+ 3.5.2
3.1.1
- 10.20.0
+ 10.21.0
diff --git a/src/main/java/org/folio/search/SearchApplication.java b/src/main/java/org/folio/search/SearchApplication.java
index 988a76e63..ff189a559 100644
--- a/src/main/java/org/folio/search/SearchApplication.java
+++ b/src/main/java/org/folio/search/SearchApplication.java
@@ -4,11 +4,13 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cloud.openfeign.EnableFeignClients;
+import org.springframework.scheduling.annotation.EnableScheduling;
/**
* Folio search application.
*/
@EnableCaching
+@EnableScheduling
@EnableFeignClients
@SpringBootApplication
public class SearchApplication {
diff --git a/src/main/java/org/folio/search/configuration/kafka/SubResourceKafkaConfiguration.java b/src/main/java/org/folio/search/configuration/kafka/SubResourceKafkaConfiguration.java
deleted file mode 100644
index e69b1b586..000000000
--- a/src/main/java/org/folio/search/configuration/kafka/SubResourceKafkaConfiguration.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.folio.search.configuration.kafka;
-
-import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
-import static org.folio.search.configuration.kafka.KafkaConfiguration.SearchTopic.INDEX_SUB_RESOURCE;
-
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import org.folio.search.model.event.SubResourceEvent;
-import org.folio.spring.tools.kafka.FolioMessageProducer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-
-@Configuration
-@RequiredArgsConstructor
-public class SubResourceKafkaConfiguration extends KafkaConfiguration {
-
- private final KafkaProperties kafkaProperties;
-
- @Bean
- public ConcurrentKafkaListenerContainerFactory subResourceListenerContainerFactory(
- @Value("#{folioKafkaProperties.listener['index-sub-resource'].maxPollRecords}") Integer maxPollRecords,
- @Value("#{folioKafkaProperties.listener['index-sub-resource'].maxPollIntervalMs}") Integer maxPollIntervalMs) {
- var factory = new ConcurrentKafkaListenerContainerFactory();
- factory.setBatchListener(true);
- var deserializer = new JsonDeserializer<>(SubResourceEvent.class, false);
- var overrideProperties = Map.of(MAX_POLL_RECORDS_CONFIG, maxPollRecords,
- MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
- factory.setConsumerFactory(getConsumerFactory(deserializer, kafkaProperties, overrideProperties));
- return factory;
- }
-
- @Bean
- public KafkaTemplate subResourceKafkaTemplate() {
- return new KafkaTemplate<>(getProducerFactory(kafkaProperties));
- }
-
- @Bean
- public FolioMessageProducer subResourceMessageProducer() {
- return new FolioMessageProducer<>(subResourceKafkaTemplate(), INDEX_SUB_RESOURCE);
- }
-}
diff --git a/src/main/java/org/folio/search/configuration/properties/SearchConfigurationProperties.java b/src/main/java/org/folio/search/configuration/properties/SearchConfigurationProperties.java
index 919dde3e3..4218594f4 100644
--- a/src/main/java/org/folio/search/configuration/properties/SearchConfigurationProperties.java
+++ b/src/main/java/org/folio/search/configuration/properties/SearchConfigurationProperties.java
@@ -69,6 +69,12 @@ public static class IndexingSettings {
* Data format to use for passing data to elasticsearch.
*/
private IndexingDataFormat dataFormat;
+
+ /**
+ * Scheduler delay for indexing subjects/contributors/classifications.
+ * */
+ private long instanceChildrenIndexDelayMs;
+
}
}
diff --git a/src/main/java/org/folio/search/configuration/properties/SystemProperties.java b/src/main/java/org/folio/search/configuration/properties/SystemProperties.java
new file mode 100644
index 000000000..e94bda503
--- /dev/null
+++ b/src/main/java/org/folio/search/configuration/properties/SystemProperties.java
@@ -0,0 +1,18 @@
+package org.folio.search.configuration.properties;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@Data
+@Component
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+@ConfigurationProperties(prefix = "folio.system")
+public class SystemProperties {
+
+ private String schemaName;
+ private String changeLog;
+}
diff --git a/src/main/java/org/folio/search/integration/message/KafkaMessageListener.java b/src/main/java/org/folio/search/integration/message/KafkaMessageListener.java
index 7ff89c605..ebcaeb7da 100644
--- a/src/main/java/org/folio/search/integration/message/KafkaMessageListener.java
+++ b/src/main/java/org/folio/search/integration/message/KafkaMessageListener.java
@@ -24,7 +24,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.logging.log4j.message.FormattedMessage;
import org.folio.search.domain.dto.ResourceEvent;
-import org.folio.search.model.event.SubResourceEvent;
import org.folio.search.model.types.ResourceType;
import org.folio.search.service.ResourceService;
import org.folio.search.service.config.ConfigSynchronizationService;
@@ -70,31 +69,6 @@ public void handleInstanceEvents(List> con
}
- /**
- * Handles instance sub-resource events and indexes them after extracting from instance resource..
- *
- * @param consumerRecords - list of consumer records from Apache Kafka to process.
- */
- @KafkaListener(
- id = KafkaConstants.INSTANCE_SUB_RESOURCE_LISTENER_ID,
- containerFactory = "subResourceListenerContainerFactory",
- topicPattern = "#{folioKafkaProperties.listener['index-sub-resource'].topicPattern}",
- groupId = "#{folioKafkaProperties.listener['index-sub-resource'].groupId}",
- concurrency = "#{folioKafkaProperties.listener['index-sub-resource'].concurrency}")
- public void handleInstanceChildrenEvents(List> consumerRecords) {
- log.info("Processing instance sub resource events from kafka events [number of events: {}]",
- consumerRecords.size());
- var batch = consumerRecords.stream()
- .map(ConsumerRecord::value)
- .toList();
- var batchByTenant = batch.stream().collect(Collectors.groupingBy(ResourceEvent::getTenant));
- batchByTenant.forEach((tenant, resourceEvents) -> executionService.executeSystemUserScoped(tenant, () -> {
- folioMessageBatchProcessor.consumeBatchWithFallback(resourceEvents, KAFKA_RETRY_TEMPLATE_NAME,
- resourceService::indexInstanceSubResources, KafkaMessageListener::logFailedEvent);
- return null;
- }));
- }
-
/**
* Handles authority record events and indexes them using event body.
*
diff --git a/src/main/java/org/folio/search/integration/message/interceptor/PopulateInstanceBatchInterceptor.java b/src/main/java/org/folio/search/integration/message/interceptor/PopulateInstanceBatchInterceptor.java
index a0454f427..a35c0a964 100644
--- a/src/main/java/org/folio/search/integration/message/interceptor/PopulateInstanceBatchInterceptor.java
+++ b/src/main/java/org/folio/search/integration/message/interceptor/PopulateInstanceBatchInterceptor.java
@@ -116,7 +116,7 @@ private void process(String tenant, List batch) {
.map(ResourceEvent::getId)
.toList();
if (!idsToDrop.isEmpty()) {
- repository.deleteEntities(idsToDrop);
+ deleteEntities(tenant, recordCollection.getKey(), repository, idsToDrop);
}
var noShadowCopiesInstanceEvents = recordByOperation.values().stream().flatMap(Collection::stream).toList();
@@ -127,6 +127,14 @@ private void process(String tenant, List batch) {
}
}
+ private void deleteEntities(String tenant, String resourceType, MergeRangeRepository repository, List ids) {
+ if (ResourceType.HOLDINGS.getName().equals(resourceType) || ResourceType.ITEM.getName().equals(resourceType)) {
+ repository.deleteEntitiesForTenant(ids, tenant);
+ } else {
+ repository.deleteEntities(ids);
+ }
+ }
+
private boolean isInstanceEvent(ResourceEvent event) {
var resourceName = event.getResourceName();
return ResourceType.INSTANCE.getName().equals(resourceName)
diff --git a/src/main/java/org/folio/search/model/entity/InstanceClassificationEntityAgg.java b/src/main/java/org/folio/search/model/entity/InstanceClassificationEntityAgg.java
deleted file mode 100644
index 423dcf5fe..000000000
--- a/src/main/java/org/folio/search/model/entity/InstanceClassificationEntityAgg.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.folio.search.model.entity;
-
-import java.util.Set;
-import org.folio.search.model.index.InstanceSubResource;
-
-public record InstanceClassificationEntityAgg(
- String id,
- String typeId,
- String number,
- Set instances
-) { }
diff --git a/src/main/java/org/folio/search/model/entity/TenantEntity.java b/src/main/java/org/folio/search/model/entity/TenantEntity.java
new file mode 100644
index 000000000..1eadb8bd1
--- /dev/null
+++ b/src/main/java/org/folio/search/model/entity/TenantEntity.java
@@ -0,0 +1,3 @@
+package org.folio.search.model.entity;
+
+public record TenantEntity(String id, String centralId, boolean active) { }
diff --git a/src/main/java/org/folio/search/model/reindex/MergeRangeEntity.java b/src/main/java/org/folio/search/model/reindex/MergeRangeEntity.java
index 856aad1b5..d095ce687 100644
--- a/src/main/java/org/folio/search/model/reindex/MergeRangeEntity.java
+++ b/src/main/java/org/folio/search/model/reindex/MergeRangeEntity.java
@@ -4,6 +4,7 @@
import java.util.UUID;
import lombok.Data;
import org.folio.search.model.types.ReindexEntityType;
+import org.folio.search.model.types.ReindexRangeStatus;
@Data
public class MergeRangeEntity {
@@ -15,6 +16,8 @@ public class MergeRangeEntity {
public static final String RANGE_UPPER_COLUMN = "upper";
public static final String CREATED_AT_COLUMN = "created_at";
public static final String FINISHED_AT_COLUMN = "finished_at";
+ public static final String STATUS_COLUMN = "status";
+ public static final String FAIL_CAUSE_COLUMN = "fail_cause";
private final UUID id;
private final ReindexEntityType entityType;
@@ -23,5 +26,7 @@ public class MergeRangeEntity {
private final String upperId;
private final Timestamp createdAt;
private Timestamp finishedAt;
+ private final ReindexRangeStatus status;
+ private final String failCause;
}
diff --git a/src/main/java/org/folio/search/model/reindex/UploadRangeEntity.java b/src/main/java/org/folio/search/model/reindex/UploadRangeEntity.java
index df264ee4b..7fcaef797 100644
--- a/src/main/java/org/folio/search/model/reindex/UploadRangeEntity.java
+++ b/src/main/java/org/folio/search/model/reindex/UploadRangeEntity.java
@@ -4,6 +4,7 @@
import java.util.UUID;
import lombok.Data;
import org.folio.search.model.types.ReindexEntityType;
+import org.folio.search.model.types.ReindexRangeStatus;
@Data
public class UploadRangeEntity {
@@ -14,6 +15,8 @@ public class UploadRangeEntity {
public static final String UPPER_BOUND_COLUMN = "upper";
public static final String CREATED_AT_COLUMN = "created_at";
public static final String FINISHED_AT_COLUMN = "finished_at";
+ public static final String STATUS_COLUMN = "status";
+ public static final String FAIL_CAUSE_COLUMN = "fail_cause";
private final UUID id;
private final ReindexEntityType entityType;
@@ -21,5 +24,7 @@ public class UploadRangeEntity {
private final String upper;
private final Timestamp createdAt;
private Timestamp finishedAt;
+ private final ReindexRangeStatus status;
+ private final String failCause;
}
diff --git a/src/main/java/org/folio/search/model/service/ResultList.java b/src/main/java/org/folio/search/model/service/ResultList.java
index 4928a26ba..1c1c34529 100644
--- a/src/main/java/org/folio/search/model/service/ResultList.java
+++ b/src/main/java/org/folio/search/model/service/ResultList.java
@@ -23,6 +23,8 @@ public class ResultList {
/**
* Paged result data.
*/
+ @JsonAlias({"classificationTypes", "identifierTypes", "alternativeTitleTypes", "callNumberTypes", "locations",
+ "loccamps", "loclibs", "locinsts"})
private List result = Collections.emptyList();
// The `key` is required per contract
diff --git a/src/main/java/org/folio/search/model/types/ReindexRangeStatus.java b/src/main/java/org/folio/search/model/types/ReindexRangeStatus.java
new file mode 100644
index 000000000..1f9279ad9
--- /dev/null
+++ b/src/main/java/org/folio/search/model/types/ReindexRangeStatus.java
@@ -0,0 +1,28 @@
+package org.folio.search.model.types;
+
+import lombok.Getter;
+
+@Getter
+public enum ReindexRangeStatus {
+ SUCCESS("Success"),
+ FAIL("Fail");
+
+ private final String value;
+
+ ReindexRangeStatus(String value) {
+ this.value = value;
+ }
+
+ public static ReindexRangeStatus valueOfNullable(String value) {
+ if (value == null) {
+ return null;
+ }
+
+ for (ReindexRangeStatus b : ReindexRangeStatus.values()) {
+ if (b.name().equalsIgnoreCase(value)) {
+ return b;
+ }
+ }
+ throw new IllegalArgumentException("Unexpected value '" + value + "'");
+ }
+}
diff --git a/src/main/java/org/folio/search/service/InstanceChildrenResourceService.java b/src/main/java/org/folio/search/service/InstanceChildrenResourceService.java
index 56b43f5d5..c44deedbd 100644
--- a/src/main/java/org/folio/search/service/InstanceChildrenResourceService.java
+++ b/src/main/java/org/folio/search/service/InstanceChildrenResourceService.java
@@ -1,24 +1,14 @@
package org.folio.search.service;
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-import static org.apache.commons.lang3.StringUtils.startsWith;
-import static org.folio.search.utils.SearchConverterUtils.getResourceSource;
-import static org.folio.search.utils.SearchConverterUtils.isUpdateEventForResourceSharing;
-import static org.folio.search.utils.SearchUtils.SOURCE_CONSORTIUM_PREFIX;
-
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.log4j.Log4j2;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.domain.dto.ResourceEventType;
-import org.folio.search.model.event.SubResourceEvent;
import org.folio.search.model.types.ResourceType;
import org.folio.search.service.consortium.ConsortiumTenantProvider;
import org.folio.search.service.converter.preprocessor.extractor.ChildResourceExtractor;
-import org.folio.spring.tools.kafka.FolioMessageProducer;
import org.springframework.stereotype.Component;
/**
@@ -29,83 +19,16 @@
@Component
public class InstanceChildrenResourceService {
- private final FolioMessageProducer messageProducer;
private final Map> resourceExtractors;
private final ConsortiumTenantProvider consortiumTenantProvider;
- public InstanceChildrenResourceService(FolioMessageProducer messageProducer,
- List resourceExtractors,
+ public InstanceChildrenResourceService(List resourceExtractors,
ConsortiumTenantProvider consortiumTenantProvider) {
- this.messageProducer = messageProducer;
this.resourceExtractors = resourceExtractors.stream()
.collect(Collectors.groupingBy(ChildResourceExtractor::resourceType));
this.consortiumTenantProvider = consortiumTenantProvider;
}
- public void sendChildrenEvent(ResourceEvent event) {
- var resourceType = ResourceType.byName(event.getResourceName());
- var extractors = resourceExtractors.get(resourceType);
- if (extractors == null) {
- return;
- }
- var needChildrenEvent = false;
- if (isUpdateEventForResourceSharing(event)) {
- needChildrenEvent = extractors.stream()
- .anyMatch(extractor -> !extractor.hasChildResourceChanges(event));
- } else if (!startsWith(getResourceSource(event), SOURCE_CONSORTIUM_PREFIX)) {
- needChildrenEvent = extractors.stream()
- .anyMatch(extractor -> extractor.hasChildResourceChanges(event));
- }
-
- if (needChildrenEvent) {
- var childEvent = SubResourceEvent.fromResourceEvent(event);
- log.debug("sendChildrenEvent::Sending event for instance child entities processing");
- if (log.isDebugEnabled()) {
- log.debug("sendChildrenEvent::Sending event for instance child entities processing [{}]", event);
- }
- messageProducer.sendMessages(singletonList(childEvent));
- } else {
- log.debug("sendChildrenEvent::Not sending event for instance child entities processing");
- if (log.isDebugEnabled()) {
- log.debug("sendChildrenEvent::Not sending event for instance child entities processing [{}]", event);
- }
- }
- }
-
- public List extractChildren(ResourceEvent event) {
- log.debug("processChildren::Starting instance children event processing");
- if (log.isDebugEnabled()) {
- log.debug("processChildren::Starting instance children event processing [{}]", event);
- }
-
- var resourceType = ResourceType.byName(event.getResourceName());
- var extractors = resourceExtractors.get(resourceType);
- if (extractors == null) {
- return emptyList();
- }
-
- var events = new LinkedList();
-
- if (isUpdateEventForResourceSharing(event)) {
- for (var resourceExtractor : extractors) {
- events.addAll(resourceExtractor.prepareEventsOnSharing(event));
- }
- } else if (startsWith(getResourceSource(event), SOURCE_CONSORTIUM_PREFIX)) {
- log.debug(
- "processChildren::Finished instance children event processing. No additional action for shadow instance.");
- return events;
- } else {
- for (var resourceExtractor : extractors) {
- events.addAll(resourceExtractor.prepareEvents(event));
- }
- }
-
- if (log.isDebugEnabled()) {
- log.debug("processChildren::Finished instance children event processing. Events after: [{}], ", events);
- }
- return events;
- }
-
public void persistChildren(String tenantId, ResourceType resourceType, List events) {
var extractors = resourceExtractors.get(resourceType);
if (extractors == null) {
diff --git a/src/main/java/org/folio/search/service/ResourceService.java b/src/main/java/org/folio/search/service/ResourceService.java
index 1124d54fc..cc1787e60 100644
--- a/src/main/java/org/folio/search/service/ResourceService.java
+++ b/src/main/java/org/folio/search/service/ResourceService.java
@@ -14,7 +14,6 @@
import static org.folio.search.utils.SearchUtils.getNumberOfRequests;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -28,7 +27,6 @@
import org.folio.search.domain.dto.FolioIndexOperationResponse;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.domain.dto.ResourceEventType;
-import org.folio.search.model.event.SubResourceEvent;
import org.folio.search.model.index.SearchDocumentBody;
import org.folio.search.model.metadata.ResourceDescription;
import org.folio.search.model.metadata.ResourceIndexingConfiguration;
@@ -79,16 +77,6 @@ public FolioIndexOperationResponse indexResources(List resourceEv
return bulkIndexResponse;
}
- public FolioIndexOperationResponse indexInstanceSubResources(List events) {
- var childEvents = events.stream()
- .map(instanceChildrenResourceService::extractChildren)
- .flatMap(Collection::stream)
- .distinct()
- .toList();
-
- return indexResources(childEvents);
- }
-
/**
* Index list of resource id event to elasticsearch.
*
@@ -125,15 +113,7 @@ private Map> processIndexInstanceEvents(List instanceEvents) {
- instanceEvents.forEach(event -> consortiumTenantExecutor.run(
- () -> instanceChildrenResourceService.sendChildrenEvent(event)));
- }
-
private Map> processDeleteInstanceEvents(List deleteEvents) {
- if (deleteEvents != null) {
- preProcessEvents(deleteEvents);
- }
return searchDocumentConverter.convert(deleteEvents);
}
diff --git a/src/main/java/org/folio/search/service/ScheduledInstanceSubResourcesService.java b/src/main/java/org/folio/search/service/ScheduledInstanceSubResourcesService.java
new file mode 100644
index 000000000..1c5f9c5d9
--- /dev/null
+++ b/src/main/java/org/folio/search/service/ScheduledInstanceSubResourcesService.java
@@ -0,0 +1,93 @@
+package org.folio.search.service;
+
+import static java.util.function.UnaryOperator.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.collections4.MapUtils.getString;
+import static org.folio.search.utils.SearchUtils.ID_FIELD;
+
+import java.util.List;
+import java.util.Map;
+import lombok.extern.log4j.Log4j2;
+import org.folio.search.domain.dto.ResourceEvent;
+import org.folio.search.domain.dto.ResourceEventType;
+import org.folio.search.model.types.ReindexEntityType;
+import org.folio.search.service.reindex.ReindexConstants;
+import org.folio.search.service.reindex.jdbc.InstanceChildResourceRepository;
+import org.folio.search.service.reindex.jdbc.SubResourceResult;
+import org.folio.search.service.reindex.jdbc.SubResourcesLockRepository;
+import org.folio.search.service.reindex.jdbc.TenantRepository;
+import org.folio.search.service.reindex.jdbc.UploadRangeRepository;
+import org.folio.spring.service.SystemUserScopedExecutionService;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+@Log4j2
+@Service
+public class ScheduledInstanceSubResourcesService {
+
+ private final ResourceService resourceService;
+ private final TenantRepository tenantRepository;
+ private final Map repositories;
+ private final SubResourcesLockRepository subResourcesLockRepository;
+ private final SystemUserScopedExecutionService executionService;
+
+ public ScheduledInstanceSubResourcesService(ResourceService resourceService,
+ TenantRepository tenantRepository,
+ List repositories,
+ SubResourcesLockRepository subResourcesLockRepository,
+ SystemUserScopedExecutionService executionService) {
+ this.resourceService = resourceService;
+ this.tenantRepository = tenantRepository;
+ this.repositories = repositories.stream()
+ .filter(InstanceChildResourceRepository.class::isInstance)
+ .collect(toMap(UploadRangeRepository::entityType, identity()));
+ this.subResourcesLockRepository = subResourcesLockRepository;
+ this.executionService = executionService;
+ }
+
+ @Scheduled(fixedDelayString = "#{searchConfigurationProperties.indexing.instanceChildrenIndexDelayMs}")
+ public void persistChildren() {
+ log.info("persistChildren::Starting instance children processing");
+ tenantRepository.fetchDataTenantIds()
+ .forEach(tenant -> executionService.executeSystemUserScoped(tenant, () -> {
+ var entityTypes = repositories.keySet();
+ for (var entityType : entityTypes) {
+ var timestamp = subResourcesLockRepository.lockSubResource(entityType, tenant);
+ if (timestamp.isPresent()) {
+ SubResourceResult result = null;
+ try {
+ result = repositories.get(entityType).fetchByTimestamp(tenant, timestamp.get());
+ if (result.hasRecords()) {
+ var events = map(result.records(), entityType, tenant);
+ resourceService.indexResources(events);
+ }
+ } catch (Exception e) {
+ log.error("persistChildren::Error processing instance children", e);
+ } finally {
+ var lastUpdatedDate = result == null || result.lastUpdateDate() == null
+ ? timestamp.get()
+ : result.lastUpdateDate();
+ subResourcesLockRepository.unlockSubResource(entityType, lastUpdatedDate, tenant);
+ }
+ }
+ }
+ return null;
+ }));
+
+ log.debug("persistChildren::Finished instance children processing");
+ }
+
+ public List map(List