Skip to content

Commit

Permalink
- test
Browse files Browse the repository at this point in the history
  • Loading branch information
psmagin committed Sep 2, 2024
1 parent 737ee73 commit 9f64d97
Show file tree
Hide file tree
Showing 20 changed files with 186 additions and 313 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -404,10 +404,10 @@
<sourceDirectory>${project.build.sourceDirectory}</sourceDirectory>
<sourceDirectory>${project.build.testSourceDirectory}</sourceDirectory>
</sourceDirectories>
<failsOnError>true</failsOnError>
<failsOnError>false</failsOnError>
<violationSeverity>warning</violationSeverity>
<failOnViolation>true</failOnViolation>
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>false</failOnViolation>
<logViolationsToConsole>false</logViolationsToConsole>
<configLocation>checkstyle/checkstyle.xml</configLocation>
<cacheFile>${basedir}/target/cachefile</cacheFile>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,32 +129,6 @@ public void handleSubjectEvents(List<ConsumerRecord<String, ResourceEvent>> cons
indexResources(batch, resourceService::indexResources);
}

/**
* Handles consortium instance events and indexes them using event body.
*
* @param consumerRecords - list of consumer records from Apache Kafka to process.
*/
@KafkaListener(
id = KafkaConstants.CONSORTIUM_INSTANCE_LISTENER_ID,
containerFactory = "consortiumListenerContainerFactory",
groupId = "#{folioKafkaProperties.listener['consortium-instance'].groupId}",
concurrency = "#{folioKafkaProperties.listener['consortium-instance'].concurrency}",
topicPattern = "#{folioKafkaProperties.listener['consortium-instance'].topicPattern}")
public void handleConsortiumInstanceEvents(List<ConsumerRecord<String, ConsortiumInstanceEvent>> consumerRecords) {
log.info("Processing consortium instance events from Kafka [number of events: {}]", consumerRecords.size());
var batch = consumerRecords.stream()
.map(ConsumerRecord::value)
.toList();

var batchByTenant = batch.stream().collect(Collectors.groupingBy(ConsortiumInstanceEvent::getTenant));

batchByTenant.forEach((tenant, resourceEvents) -> executionService.executeSystemUserScoped(tenant, () -> {
folioMessageBatchProcessor.consumeBatchWithFallback(batch, KAFKA_RETRY_TEMPLATE_NAME,
resourceService::indexConsortiumInstances, KafkaMessageListener::logFailedConsortiumEvent);
return null;
}));
}

@KafkaListener(
id = KafkaConstants.CLASSIFICATION_TYPE_LISTENER_ID,
containerFactory = "resourceListenerContainerFactory",
Expand Down
62 changes: 7 additions & 55 deletions src/main/java/org/folio/search/service/ResourceService.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.folio.search.model.types.IndexActionType.DELETE;
import static org.folio.search.model.types.IndexActionType.INDEX;
import static org.folio.search.utils.LogUtils.collectionToLogMsg;
Expand All @@ -20,30 +19,23 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.folio.search.domain.dto.FolioIndexOperationResponse;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.domain.dto.ResourceEventType;
import org.folio.search.integration.KafkaMessageProducer;
import org.folio.search.integration.ResourceFetchService;
import org.folio.search.model.event.ConsortiumInstanceEvent;
import org.folio.search.model.index.SearchDocumentBody;
import org.folio.search.model.metadata.ResourceDescription;
import org.folio.search.model.metadata.ResourceIndexingConfiguration;
import org.folio.search.model.types.IndexActionType;
import org.folio.search.model.types.ResourceType;
import org.folio.search.repository.IndexNameProvider;
import org.folio.search.repository.IndexRepository;
import org.folio.search.repository.PrimaryResourceRepository;
import org.folio.search.repository.ResourceRepository;
import org.folio.search.service.consortium.ConsortiumInstanceService;
import org.folio.search.service.consortium.ConsortiumTenantExecutor;
import org.folio.search.service.consortium.ConsortiumTenantService;
import org.folio.search.service.converter.MultiTenantSearchDocumentConverter;
import org.folio.search.service.converter.preprocessor.InstanceEventPreProcessor;
import org.folio.search.service.metadata.ResourceDescriptionService;
Expand All @@ -58,16 +50,12 @@ public class ResourceService {
private static final String INSTANCE_ID_FIELD = "instanceId";

private final KafkaMessageProducer messageProducer;
private final IndexRepository indexRepository;
private final ResourceFetchService resourceFetchService;
private final PrimaryResourceRepository primaryResourceRepository;
private final ResourceDescriptionService resourceDescriptionService;
private final MultiTenantSearchDocumentConverter multiTenantSearchDocumentConverter;
private final Map<String, ResourceRepository> resourceRepositoryBeans;
private final ConsortiumTenantService consortiumTenantService;
private final ConsortiumTenantExecutor consortiumTenantExecutor;
private final ConsortiumInstanceService consortiumInstanceService;
private final IndexNameProvider indexNameProvider;
private final InstanceEventPreProcessor instanceEventPreProcessor;

/**
Expand Down Expand Up @@ -118,36 +106,6 @@ public FolioIndexOperationResponse indexInstancesById(List<ResourceEvent> resour
return bulkIndexResponse;
}

public FolioIndexOperationResponse indexConsortiumInstances(List<ConsortiumInstanceEvent> consortiumInstances) {
if (CollectionUtils.isEmpty(consortiumInstances)) {
return getSuccessIndexOperationResponse();
}

var validConsortiumInstances = consortiumInstances.stream()
.filter(event -> consortiumTenantService.getCentralTenant(event.getTenant()).isPresent())
.distinct()
.toList();

if (log.isDebugEnabled()) {
var invalidInstances = ListUtils.subtract(consortiumInstances, validConsortiumInstances);
log.debug("Skip indexing consortium instances [{}]", invalidInstances);
}

var centralTenant = consortiumTenantService.getCentralTenant(validConsortiumInstances.get(0).getTenant())
.orElseThrow(() -> new IllegalStateException("Central tenant must exist"));

var instanceIds = validConsortiumInstances.stream().map(ConsortiumInstanceEvent::getInstanceId).collect(toSet());

return consortiumTenantExecutor.execute(centralTenant, () -> {
var resourceEvents = consortiumInstanceService.fetchInstances(instanceIds);
var indexDocuments = multiTenantSearchDocumentConverter.convert(resourceEvents);
var bulkIndexResponse = indexSearchDocuments(indexDocuments);
log.info("Records indexed to central index [requests: {}{}]",
getNumberOfRequests(indexDocuments), getErrorMessage(bulkIndexResponse));
return bulkIndexResponse;
});
}

private List<ResourceEvent> getEventsToIndex(List<ResourceEvent> events) {
return events;
}
Expand All @@ -159,33 +117,27 @@ private Map<String, List<SearchDocumentBody>> processIndexInstanceEvents(List<Re
messageProducer.prepareAndSendContributorEvents(fetchedInstances);
messageProducer.prepareAndSendSubjectEvents(fetchedInstances);

var list = preProcessEvents(fetchedInstances, consortiumInstanceService::saveInstances);
return multiTenantSearchDocumentConverter.convert(list);
// var list = preProcessEvents(fetchedInstances);
return multiTenantSearchDocumentConverter.convert(fetchedInstances);
}

private List<ResourceEvent> preProcessEvents(List<ResourceEvent> instanceEvents,
UnaryOperator<List<ResourceEvent>> consortiumFunc) {
private List<ResourceEvent> preProcessEvents(List<ResourceEvent> instanceEvents) {
if (instanceEvents == null) {
instanceEvents = Collections.emptyList();
}
var list = instanceEvents.stream()

return instanceEvents.stream()
.map(event -> consortiumTenantExecutor.execute(() -> instanceEventPreProcessor.preProcess(event)))
.filter(Objects::nonNull)
.flatMap(List::stream)
.collect(toList());

var eventsToIndex = consortiumFunc.apply(instanceEvents);
if (eventsToIndex != null) {
list.addAll(eventsToIndex);
}
return list;
}

private Map<String, List<SearchDocumentBody>> processDeleteInstanceEvents(List<ResourceEvent> deleteEvents) {
messageProducer.prepareAndSendContributorEvents(deleteEvents);
messageProducer.prepareAndSendSubjectEvents(deleteEvents);
var list = preProcessEvents(deleteEvents, consortiumInstanceService::deleteInstances);
return multiTenantSearchDocumentConverter.convert(list);
// var list = preProcessEvents(deleteEvents);
return multiTenantSearchDocumentConverter.convert(deleteEvents);
}

private FolioIndexOperationResponse indexSearchDocuments(Map<String, List<SearchDocumentBody>> eventsByResource) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package org.folio.search.service.consortium;

import static java.util.Collections.emptyList;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.commons.lang3.StringUtils.SPACE;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.wrap;
import static org.folio.search.utils.JdbcUtils.getFullTableName;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.folio.search.model.Pair;
import org.folio.search.model.service.ConsortiumSearchContext;
Expand All @@ -22,54 +19,48 @@
public class ConsortiumSearchQueryBuilder {

static final String CONSORTIUM_INSTANCE_TABLE_NAME = "consortium_instance";
static final String HOLDING_TABLE_NAME = "holding";
static final String ITEM_TABLE_NAME = "item";

public static final Map<ResourceType, String> CONSORTIUM_TABLES = Map.of(
ResourceType.INSTANCE, CONSORTIUM_INSTANCE_TABLE_NAME,
ResourceType.HOLDINGS, CONSORTIUM_INSTANCE_TABLE_NAME,
ResourceType.ITEM, CONSORTIUM_INSTANCE_TABLE_NAME
ResourceType.HOLDINGS, HOLDING_TABLE_NAME,
ResourceType.ITEM, ITEM_TABLE_NAME
);
private static final Map<ResourceType, List<String>> RESOURCE_FIELDS = Map.of(
ResourceType.HOLDINGS,
List.of("id", "hrid", "callNumberPrefix", "callNumber", "callNumberSuffix",
List.of("hrid", "callNumberPrefix", "callNumber", "callNumberSuffix",
"copyNumber", "permanentLocationId", "discoverySuppress"),
ResourceType.ITEM,
List.of("id", "hrid", "holdingsRecordId", "barcode")
List.of("hrid", "holdingsRecordId", "barcode")
);

private static final Map<ResourceType, Map<String, String>> RESOURCE_FILTER_DATABASE_NAME = Map.of(
ResourceType.HOLDINGS, Map.of("instanceId", "instance_id", "tenantId", "tenant_id"),
ResourceType.ITEM, Map.of("instanceId", "instance_id", "tenantId", "tenant_id")
ResourceType.ITEM, Map.of("instanceId", "instance_id", "tenantId", "tenant_id", "holdingsRecordId", "holding_id")
);

private static final Map<ResourceType, List<String>> RESOURCE_JSONB_FILTERS = Map.of(
ResourceType.ITEM, List.of("holdingsRecordId")
private static final Map<String, String> COLUMN_CASTS = Map.of(
"instance_id", "uuid",
"holding_id", "uuid"
);

private static final Map<ResourceType, String> RESOURCE_COLLECTION_NAME = Map.of(
ResourceType.HOLDINGS, "holdings",
ResourceType.ITEM, "items"
);
private final ConsortiumSearchContext searchContext;
private final ResourceType resourceType;
private final List<Pair<String, String>> filters;
private final List<Pair<String, String>> jsonbFilters;

public ConsortiumSearchQueryBuilder(ConsortiumSearchContext searchContext) {
this.searchContext = searchContext;
this.resourceType = searchContext.getResourceType();
this.filters = prepareFilters(resourceType, emptyList(), RESOURCE_JSONB_FILTERS.get(resourceType));
this.jsonbFilters = prepareFilters(resourceType, RESOURCE_JSONB_FILTERS.get(resourceType),
RESOURCE_FILTER_DATABASE_NAME.get(resourceType).values());
this.filters = prepareFilters(resourceType);
}

public String buildSelectQuery(FolioExecutionContext context) {
var fullTableName = getFullTableName(context, CONSORTIUM_TABLES.get(resourceType));
var resourceCollection = RESOURCE_COLLECTION_NAME.get(resourceType);
String subQuery = "SELECT instance_id, tenant_id, json_array_elements(json -> '" + resourceCollection + "') "
+ "as " + resourceCollection + " FROM " + fullTableName + SPACE + getWhereClause(filters, null);
String query = "SELECT i.instance_id as instanceId, i.tenant_id as tenantId,"
+ getSelectors("i." + resourceCollection, RESOURCE_FIELDS.get(resourceType))
+ " FROM (" + subQuery + ") i"
+ getWhereClause(jsonbFilters, "i." + resourceCollection)
String query = "SELECT i.id as id, i.instance_id as instanceId, i.tenant_id as tenantId,"
+ getSelectors("i.json", RESOURCE_FIELDS.get(resourceType))
+ " FROM " + fullTableName + " i"
+ getWhereClause(filters, null)
+ getOrderByClause()
+ getLimitClause()
+ getOffsetClause();
Expand All @@ -78,16 +69,13 @@ public String buildSelectQuery(FolioExecutionContext context) {

public String buildCountQuery(FolioExecutionContext context) {
var fullTableName = getFullTableName(context, CONSORTIUM_TABLES.get(resourceType));
var resourceCollection = RESOURCE_COLLECTION_NAME.get(resourceType);
String subQuery = "SELECT instance_id, tenant_id, json_array_elements(json -> '" + resourceCollection + "') "
+ "as " + resourceCollection + " FROM " + fullTableName + SPACE + getWhereClause(filters, null);
String query = "SELECT count(*) FROM (" + subQuery + ") i"
+ getWhereClause(jsonbFilters, "i." + resourceCollection);
String query = "SELECT count(*) FROM " + fullTableName + " i"
+ getWhereClause(filters, null);
return StringUtils.normalizeSpace(query);
}

public Object[] getQueryArguments() {
return Stream.concat(filters.stream(), jsonbFilters.stream())
return filters.stream()
.map(Pair::getSecond)
.toArray();
}
Expand Down Expand Up @@ -136,13 +124,16 @@ private String getWhereClause(List<Pair<String, String>> filters, String source)
var conditionsClause = filters.stream()
.map(filter -> (StringUtils.isNotBlank(source)
? getJsonSelector(source, filter.getFirst())
: filter.getFirst()) + " = ?")
: filter.getFirst()) + " = ?" + getCast(filter.getFirst()))
.collect(Collectors.joining(" AND "));
return conditionsClause.isBlank() ? conditionsClause : wrapped("WHERE " + conditionsClause);
}

private List<Pair<String, String>> prepareFilters(ResourceType resourceType,
List<String> includeFilters, Collection<String> excludeFilters) {
private String getCast(String column) {
return Optional.ofNullable(COLUMN_CASTS.get(column)).map(cast -> "::" + cast).orElse(EMPTY);
}

private List<Pair<String, String>> prepareFilters(ResourceType resourceType) {
var mappedFilterNames = RESOURCE_FILTER_DATABASE_NAME.get(resourceType);
return searchContext.getFilters().stream()
.map(filter -> {
Expand All @@ -151,15 +142,6 @@ private List<Pair<String, String>> prepareFilters(ResourceType resourceType,
}
return filter;
})
.filter(filter -> {
if (CollectionUtils.isNotEmpty(includeFilters)) {
return includeFilters.contains(filter.getFirst());
}
if (CollectionUtils.isNotEmpty(excludeFilters)) {
return !excludeFilters.contains(filter.getFirst());
}
return true;
})
.toList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
public class HoldingRepository extends MergeRangeRepository {

private static final String INSERT_SQL = """
INSERT INTO %s (id, tenant_id, instance_id, holding_json)
INSERT INTO %s (id, tenant_id, instance_id, json)
VALUES (?::uuid, ?, ?::uuid, ?::jsonb)
ON CONFLICT (id, tenant_id)
DO UPDATE SET
instance_id = EXCLUDED.instance_id,
holding_json = EXCLUDED.holding_json;
json = EXCLUDED.json;
""";

protected HoldingRepository(JdbcTemplate jdbcTemplate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
public class ItemRepository extends MergeRangeRepository {

private static final String INSERT_SQL = """
INSERT INTO %s (id, tenant_id, instance_id, holding_id, item_json)
INSERT INTO %s (id, tenant_id, instance_id, holding_id, json)
VALUES (?::uuid, ?, ?::uuid, ?::uuid, ?::jsonb)
ON CONFLICT (id, tenant_id)
DO UPDATE SET
instance_id = EXCLUDED.instance_id,
holding_id = EXCLUDED.holding_id,
item_json = EXCLUDED.item_json;
json = EXCLUDED.json;
""";

protected ItemRepository(JdbcTemplate jdbcTemplate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
public class MergeInstanceRepository extends MergeRangeRepository {

private static final String INSERT_SQL = """
INSERT INTO %s (id, tenant_id, shared, is_bound_with, instance_json)
INSERT INTO %s (id, tenant_id, shared, is_bound_with, json)
VALUES (?::uuid, ?, ?, ?, ?::jsonb)
ON CONFLICT (id)
DO UPDATE SET shared = EXCLUDED.shared,
is_bound_with = EXCLUDED.is_bound_with,
instance_json = EXCLUDED.instance_json;
json = EXCLUDED.json;
""";

private final ConsortiumTenantProvider consortiumTenantProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
public class UploadInstanceRepository extends UploadRangeRepository {

private static final String SELECT_SQL_TEMPLATE = """
SELECT i.instance_json
SELECT i.json
|| jsonb_build_object('tenantId', i.tenant_id)
|| jsonb_build_object('shared', i.shared)
|| jsonb_build_object('isBoundWith', i.is_bound_with)
|| jsonb_build_object('holdings', COALESCE(jsonb_agg(DISTINCT h.holding_json || jsonb_build_object('tenantId', h.tenant_id)) FILTER (WHERE h.holding_json IS NOT NULL), '[]'::jsonb))
|| jsonb_build_object('items', COALESCE(jsonb_agg(it.item_json || jsonb_build_object('tenantId', it.tenant_id)) FILTER (WHERE it.item_json IS NOT NULL), '[]'::jsonb)) as json
|| jsonb_build_object('holdings', COALESCE(jsonb_agg(DISTINCT h.json || jsonb_build_object('tenantId', h.tenant_id)) FILTER (WHERE h.json IS NOT NULL), '[]'::jsonb))
|| jsonb_build_object('items', COALESCE(jsonb_agg(it.json || jsonb_build_object('tenantId', it.tenant_id)) FILTER (WHERE it.json IS NOT NULL), '[]'::jsonb)) as json
FROM %s i
LEFT JOIN %s h on h.instance_id = i.id
LEFT JOIN %s it on it.holding_id = h.id
Expand Down
Loading

0 comments on commit 9f64d97

Please sign in to comment.