Skip to content

Commit

Permalink
Merge branch 'master' into MSEARCH-906
Browse files Browse the repository at this point in the history
  • Loading branch information
viacheslavkol authored Dec 27, 2024
2 parents 907833c + f32d6af commit 4b53e12
Show file tree
Hide file tree
Showing 41 changed files with 812 additions and 832 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Move Instance sub-entities population from database trigger to code ([MSEARCH-887](https://folio-org.atlassian.net/browse/MSEARCH-887))
* Update reindex merge failed status only for failed entity type ([MSEARCH-909](https://folio-org.atlassian.net/browse/MSEARCH-909))
* Extend reindex range tables with status, fail_cause columns ([MSEARCH-870](https://folio-org.atlassian.net/browse/MSEARCH-870))
* Implement scheduled indexing for instance sub-resources ([MSEARCH-922](https://folio-org.atlassian.net/browse/MSEARCH-922))
* Implement endpoint to run merge reindex stage only for failed ranges ([MSEARCH-906](https://folio-org.atlassian.net/browse/MSEARCH-906))

### Bug fixes
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l
| REINDEX_MERGE_RANGE_PUBLISHER_RETRY_INTERVAL_MS | 1000 | The retry interval in ms for reindex merge range request publishing. |
| REINDEX_MERGE_RANGE_PUBLISHER_RETRY_ATTEMPTS | 5 | The maximum number of retries for reindex merge range request publishing. |
| MAX_SEARCH_BATCH_REQUEST_IDS_COUNT | 20_000 | Defines maximum batch request IDs count for searching consolidated items/holdings in consortium |
| INSTANCE_CHILDREN_INDEX_DELAY_MS | 60000 | Defines the delay (in ms) for the scheduler that indexes subjects/contributors/classifications in the background |

The module uses system user to communicate with other modules from Kafka consumers.
For production deployments you MUST specify the password for this system user via env variable:
Expand Down
35 changes: 10 additions & 25 deletions descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -1054,36 +1054,11 @@
"value": "",
"description": "Replication factor for `search.reindex.range-index` topic."
},
{
"name": "KAFKA_INDEX_SUB_RESOURCE_TOPIC_PARTITIONS",
"value": "50",
"description": "Amount of partitions for `search.index.sub-resource` topic."
},
{
"name": "KAFKA_INDEX_SUB_RESOURCE_TOPIC_REPLICATION_FACTOR",
"value": "",
"description": "Replication factor for `search.index.sub-resource` topic."
},
{
"name": "KAFKA_REINDEX_RECORDS_CONCURRENCY",
"value": "2",
"description": "Custom number of kafka concurrent threads for `inventory.reindex-records` message consuming."
},
{
"name": "KAFKA_INDEX_SUB_RESOURCE_CONCURRENCY",
"value": "2",
"description": "Custom number of kafka concurrent threads for `search.index.sub-resource` message consuming."
},
{
"name": "KAFKA_CONSUMER_INDEX_SUB_RESOURCE_MAX_POLL_RECORDS",
"value": "200",
"description": "Maximum number of records returned in a single call to poll() for instance sub-resource events."
},
{
"name": "KAFKA_CONSUMER_INDEX_SUB_RESOURCE_MAX_POLL_INTERVAL_MS",
"value": "600000",
"description": "Maximum processing time allowed for consumer on instance sub-resource events."
},
{
"name": "KAFKA_CONSUMER_MAX_POLL_RECORDS",
"value": "200",
Expand Down Expand Up @@ -1161,6 +1136,16 @@
"value": "smile",
"description": "Format for passing data to elasticsearch (json/smile)"
},
{
"name": "INSTANCE_CHILDREN_INDEX_DELAY_MS",
"value": "60000",
"description": "Scheduler delay for indexing subjects/contributors/classifications"
},
{
"name": "INSTANCE_CHILDREN_INDEX_BATCH_SIZE",
"value": "500",
"description": "Batch size for indexing subjects/contributors/classifications"
},
{
"name": "INITIAL_LANGUAGES",
"value": "eng",
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/folio/search/SearchApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
* Folio search application.
*/
@EnableCaching
@EnableScheduling
@EnableFeignClients
@SpringBootApplication
public class SearchApplication {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ public static class IndexingSettings {
* Data format to use for passing data to elasticsearch.
*/
private IndexingDataFormat dataFormat;

/**
* Scheduler delay (in milliseconds) for indexing subjects/contributors/classifications.
*/
private long instanceChildrenIndexDelayMs;

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.folio.search.configuration.properties;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Configuration properties bound from the {@code folio.system} prefix and registered as a Spring component.
 *
 * <p>Lombok generates the accessors ({@code @Data}), a no-argument constructor and a static factory
 * named {@code of} that takes the fields in declaration order ({@code schemaName}, {@code changeLog}).
 * Keep the field order stable, since reordering changes the factory's parameter order.</p>
 */
@Data
@Component
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
@ConfigurationProperties(prefix = "folio.system")
public class SystemProperties {

// NOTE(review): presumably the database schema name used by the module — confirm against usages.
private String schemaName;
// NOTE(review): presumably the location of the database change log — confirm against usages.
private String changeLog;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.logging.log4j.message.FormattedMessage;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.model.event.SubResourceEvent;
import org.folio.search.model.types.ResourceType;
import org.folio.search.service.ResourceService;
import org.folio.search.service.config.ConfigSynchronizationService;
Expand Down Expand Up @@ -70,31 +69,6 @@ public void handleInstanceEvents(List<ConsumerRecord<String, ResourceEvent>> con

}

/**
 * Consumes a batch of instance sub-resource events and indexes them, processing the events of each
 * tenant in that tenant's system-user scope.
 *
 * @param consumerRecords - list of consumer records from Apache Kafka to process.
 */
@KafkaListener(
  id = KafkaConstants.INSTANCE_SUB_RESOURCE_LISTENER_ID,
  containerFactory = "subResourceListenerContainerFactory",
  topicPattern = "#{folioKafkaProperties.listener['index-sub-resource'].topicPattern}",
  groupId = "#{folioKafkaProperties.listener['index-sub-resource'].groupId}",
  concurrency = "#{folioKafkaProperties.listener['index-sub-resource'].concurrency}")
public void handleInstanceChildrenEvents(List<ConsumerRecord<String, SubResourceEvent>> consumerRecords) {
  log.info("Processing instance sub resource events from kafka events [number of events: {}]",
    consumerRecords.size());

  // Unwrap the Kafka records and bucket the payloads by owning tenant in a single pass.
  var eventsByTenant = consumerRecords.stream()
    .map(ConsumerRecord::value)
    .collect(Collectors.groupingBy(ResourceEvent::getTenant));

  for (var tenantEntry : eventsByTenant.entrySet()) {
    var tenantId = tenantEntry.getKey();
    var tenantEvents = tenantEntry.getValue();
    executionService.executeSystemUserScoped(tenantId, () -> {
      folioMessageBatchProcessor.consumeBatchWithFallback(tenantEvents, KAFKA_RETRY_TEMPLATE_NAME,
        resourceService::indexInstanceSubResources, KafkaMessageListener::logFailedEvent);
      return null;
    });
  }
}

/**
* Handles authority record events and indexes them using event body.
*
Expand Down

This file was deleted.

3 changes: 3 additions & 0 deletions src/main/java/org/folio/search/model/entity/TenantEntity.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package org.folio.search.model.entity;

/**
 * Immutable record describing a tenant.
 *
 * @param id        tenant identifier
 * @param centralId NOTE(review): presumably the id of the consortium central tenant — confirm against usages
 * @param active    NOTE(review): presumably whether the tenant is currently enabled — confirm against usages
 */
public record TenantEntity(String id, String centralId, boolean active) { }
Original file line number Diff line number Diff line change
@@ -1,22 +1,13 @@
package org.folio.search.service;

import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.StringUtils.startsWith;
import static org.folio.search.utils.SearchConverterUtils.getResourceSource;
import static org.folio.search.utils.SearchConverterUtils.isUpdateEventForResourceSharing;
import static org.folio.search.utils.SearchUtils.SOURCE_CONSORTIUM_PREFIX;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.domain.dto.ResourceEventType;
import org.folio.search.model.event.SubResourceEvent;
import org.folio.search.service.consortium.ConsortiumTenantProvider;
import org.folio.search.service.converter.preprocessor.extractor.ChildResourceExtractor;
import org.folio.spring.tools.kafka.FolioMessageProducer;
import org.springframework.stereotype.Component;

/**
Expand All @@ -28,63 +19,9 @@
@RequiredArgsConstructor
public class InstanceChildrenResourceService {

private final FolioMessageProducer<SubResourceEvent> messageProducer;
private final List<ChildResourceExtractor> resourceExtractors;
private final ConsortiumTenantProvider consortiumTenantProvider;

/**
 * Publishes a {@link SubResourceEvent} built from the given instance event when its child
 * resources need processing; otherwise only logs that nothing was sent.
 *
 * <p>Each outcome is logged once at debug level. Parameterized logging defers formatting of the
 * event until the level is enabled, so no explicit {@code isDebugEnabled()} guard is needed.</p>
 *
 * @param event instance resource event to inspect
 */
public void sendChildrenEvent(ResourceEvent event) {
  if (needsChildrenEvent(event)) {
    var childEvent = SubResourceEvent.fromResourceEvent(event);
    log.debug("sendChildrenEvent::Sending event for instance child entities processing [{}]", event);
    messageProducer.sendMessages(singletonList(childEvent));
  } else {
    log.debug("sendChildrenEvent::Not sending event for instance child entities processing [{}]", event);
  }
}

/**
 * Decides whether a child event has to be published for the given instance event.
 *
 * @param event instance resource event to inspect
 * @return {@code true} for a resource-sharing update when any extractor reports no child resource
 *   changes, or for an event whose source does not start with {@code SOURCE_CONSORTIUM_PREFIX} when
 *   any extractor reports child resource changes; {@code false} otherwise
 */
private boolean needsChildrenEvent(ResourceEvent event) {
  if (isUpdateEventForResourceSharing(event)) {
    // NOTE(review): the negation is kept exactly as in the original condition — confirm it is intended.
    return resourceExtractors.stream()
      .anyMatch(extractor -> !extractor.hasChildResourceChanges(event));
  }
  if (startsWith(getResourceSource(event), SOURCE_CONSORTIUM_PREFIX)) {
    return false;
  }
  return resourceExtractors.stream()
    .anyMatch(extractor -> extractor.hasChildResourceChanges(event));
}

/**
 * Builds the child resource events for the given instance event by delegating to every
 * registered {@link ChildResourceExtractor}.
 *
 * <p>A resource-sharing update uses {@code prepareEventsOnSharing}. An event whose source starts
 * with {@code SOURCE_CONSORTIUM_PREFIX} yields no child events. Any other event uses
 * {@code prepareEvents}. Log messages carry the method name as prefix and are emitted once each.</p>
 *
 * @param event instance resource event to extract children from
 * @return child events collected from all extractors, in extractor order; empty when none apply
 */
public List<ResourceEvent> extractChildren(ResourceEvent event) {
  log.debug("extractChildren::Starting instance children event processing [{}]", event);

  var events = new LinkedList<ResourceEvent>();

  if (isUpdateEventForResourceSharing(event)) {
    for (var resourceExtractor : resourceExtractors) {
      events.addAll(resourceExtractor.prepareEventsOnSharing(event));
    }
  } else if (startsWith(getResourceSource(event), SOURCE_CONSORTIUM_PREFIX)) {
    log.debug(
      "extractChildren::Finished instance children event processing. No additional action for shadow instance.");
    return events;
  } else {
    for (var resourceExtractor : resourceExtractors) {
      events.addAll(resourceExtractor.prepareEvents(event));
    }
  }

  log.debug("extractChildren::Finished instance children event processing. Events after: [{}]", events);
  return events;
}

public void persistChildren(String tenantId, List<ResourceEvent> events) {
var shared = consortiumTenantProvider.isCentralTenant(tenantId);
resourceExtractors.forEach(resourceExtractor -> resourceExtractor.persistChildren(shared, events));
Expand Down
21 changes: 0 additions & 21 deletions src/main/java/org/folio/search/service/ResourceService.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import static org.folio.search.utils.SearchUtils.getNumberOfRequests;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -28,7 +27,6 @@
import org.folio.search.domain.dto.FolioIndexOperationResponse;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.domain.dto.ResourceEventType;
import org.folio.search.model.event.SubResourceEvent;
import org.folio.search.model.index.SearchDocumentBody;
import org.folio.search.model.metadata.ResourceDescription;
import org.folio.search.model.metadata.ResourceIndexingConfiguration;
Expand Down Expand Up @@ -79,16 +77,6 @@ public FolioIndexOperationResponse indexResources(List<ResourceEvent> resourceEv
return bulkIndexResponse;
}

/**
 * Extracts the child resource events of every given sub-resource event, drops duplicates and
 * indexes the remaining events.
 *
 * @param events instance sub-resource events to process
 * @return the response produced by {@code indexResources} for the extracted child events
 */
public FolioIndexOperationResponse indexInstanceSubResources(List<SubResourceEvent> events) {
  var distinctChildEvents = events.stream()
    .flatMap(subResourceEvent -> instanceChildrenResourceService.extractChildren(subResourceEvent).stream())
    .distinct()
    .toList();
  return indexResources(distinctChildEvents);
}

/**
* Index list of resource id event to elasticsearch.
*
Expand Down Expand Up @@ -121,19 +109,10 @@ private Map<String, List<SearchDocumentBody>> processIndexInstanceEvents(List<Re
.filter(Objects::nonNull)
.toList();

preProcessEvents(fetchedInstances);
return searchDocumentConverter.convert(fetchedInstances);
}

/**
 * Passes every instance event to {@code sendChildrenEvent}, running each call through the
 * consortium tenant executor.
 *
 * @param instanceEvents instance events to pre-process
 */
private void preProcessEvents(List<ResourceEvent> instanceEvents) {
  for (var instanceEvent : instanceEvents) {
    consortiumTenantExecutor.run(
      () -> instanceChildrenResourceService.sendChildrenEvent(instanceEvent));
  }
}

/**
 * Pre-processes the delete events (when a list was supplied) and converts them to search documents.
 *
 * @param deleteEvents delete events to process; may be {@code null}
 * @return the result of converting {@code deleteEvents} with the search document converter
 */
private Map<String, List<SearchDocumentBody>> processDeleteInstanceEvents(List<ResourceEvent> deleteEvents) {
  // Skip pre-processing when no list was supplied; the converter still receives the same argument.
  if (Objects.nonNull(deleteEvents)) {
    preProcessEvents(deleteEvents);
  }
  var convertedDocuments = searchDocumentConverter.convert(deleteEvents);
  return convertedDocuments;
}

Expand Down
Loading

0 comments on commit 4b53e12

Please sign in to comment.