Skip to content

Commit

Permalink
- do indexation of consumed ranges
Browse files Browse the repository at this point in the history
  • Loading branch information
psmagin committed Aug 1, 2024
1 parent 65bc576 commit a372753
Show file tree
Hide file tree
Showing 21 changed files with 421 additions and 17 deletions.
1 change: 1 addition & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ services:
DB_HOST: ${DB_HOST}
DB_DATABASE: ${DB_DATABASE}
DB_PASSWORD: ${DB_PASSWORD}
OKAPI_URL: http://api-mock:8080
JAVA_OPTIONS: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:${DEBUG_PORT}"

api-mock:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected static <T> DefaultKafkaConsumerFactory<String, T> getConsumerFactory(J

public enum SearchTopic implements FolioKafkaTopic {
CONSORTIUM_INSTANCE("search.consortium.instance"),
REINDEX_RANGE_INDEX("search.reindex.range.index");
REINDEX_RANGE_INDEX("search.reindex.range-index");

private final String topicName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,48 @@
import static org.folio.search.configuration.kafka.KafkaConfiguration.SearchTopic.REINDEX_RANGE_INDEX;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.folio.search.model.event.ReindexRangeIndexEvent;
import org.folio.spring.tools.kafka.FolioKafkaProperties;
import org.folio.spring.tools.kafka.FolioMessageProducer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.util.backoff.FixedBackOff;

@Log4j2
@Configuration
@RequiredArgsConstructor
public class ReindexRangeIndexEventKafkaConfiguration extends KafkaConfiguration {

private final KafkaProperties kafkaProperties;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ReindexRangeIndexEvent> rangeIndexListenerContainerFactory(
CommonErrorHandler commonErrorHandler) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, ReindexRangeIndexEvent>();
var deserializer = new JsonDeserializer<>(ReindexRangeIndexEvent.class, false);
factory.setConsumerFactory(getConsumerFactory(deserializer, kafkaProperties));
factory.setCommonErrorHandler(commonErrorHandler);
return factory;
}

@Bean
public CommonErrorHandler errorHandler(FolioKafkaProperties kafkaProperties) {
var backOff = new FixedBackOff(kafkaProperties.getRetryIntervalMs(), kafkaProperties.getRetryDeliveryAttempts());
return new DefaultErrorHandler((consumerRecord, exception) -> {
var message = new ParameterizedMessage("Error occurred while processing event=[{}], topic=[{}]",
consumerRecord.value(), consumerRecord.topic());
log.error(message, exception);
}, backOff);
}

@Bean
public KafkaTemplate<String, ReindexRangeIndexEvent> rangeIndexKafkaTemplate() {
return new KafkaTemplate<>(getProducerFactory(kafkaProperties));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.folio.search.exception;

public class ReindexException extends RuntimeException {

public ReindexException(String errorMessage) {
super(errorMessage);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.folio.search.integration;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.folio.search.model.event.ReindexRangeIndexEvent;
import org.folio.search.service.consortium.ConsortiumTenantExecutor;
import org.folio.search.service.reindex.ReindexService;
import org.folio.search.utils.KafkaConstants;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Log4j2
@Component
@RequiredArgsConstructor
public class ReindexKafkaListener {

private final ReindexService reindexService;
private final ConsortiumTenantExecutor executionService;
private final SystemUserScopedExecutionService systemUserScopedExecutionService;

@KafkaListener(
id = KafkaConstants.REINDEX_RANGE_INDEX_LISTENER_ID,
containerFactory = "rangeIndexListenerContainerFactory",
topicPattern = "#{folioKafkaProperties.listener['reindex-range-index'].topicPattern}",
groupId = "#{folioKafkaProperties.listener['reindex-range-index'].groupId}",
concurrency = "#{folioKafkaProperties.listener['reindex-range-index'].concurrency}")
public void handleInstanceEvents(ReindexRangeIndexEvent event) {
systemUserScopedExecutionService.executeSystemUserScoped(event.getTenant(),
() -> executionService.execute(() -> reindexService.process(event)));
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
package org.folio.search.model.event;

import java.util.UUID;
import lombok.Data;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.spring.tools.kafka.BaseKafkaMessage;

@Data
public class ReindexRangeIndexEvent implements BaseKafkaMessage {

private UUID id;
private ReindexEntityType entityType;
private int offset;
private int limit;

private String tenant;
private String ts;
}

//{
// "id": "8b00efbd-07e0-48c1-a691-d259d9c67e37",
// "entityType": "SUBJECT",
// "offset": 0,
// "limit": 10,
// "tenant": "test_tenant"
//}

Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
package org.folio.search.service.reindex;

import static java.util.function.Function.identity;
import static org.apache.commons.collections4.MapUtils.getString;
import static org.folio.search.utils.SearchUtils.CONTRIBUTOR_RESOURCE;
import static org.folio.search.utils.SearchUtils.ID_FIELD;
import static org.folio.search.utils.SearchUtils.INSTANCE_CLASSIFICATION_RESOURCE;
import static org.folio.search.utils.SearchUtils.INSTANCE_RESOURCE;
import static org.folio.search.utils.SearchUtils.INSTANCE_SUBJECT_RESOURCE;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.model.event.ReindexRangeIndexEvent;
import org.folio.search.model.reindex.UploadRangeEntity;
import org.folio.search.model.types.ReindexEntityType;
Expand All @@ -16,6 +24,13 @@
@Service
public class ReindexRangeIndexService {

private static final Map<ReindexEntityType, String> RESOURCE_NAME_MAP = Map.of(
ReindexEntityType.INSTANCE, INSTANCE_RESOURCE,
ReindexEntityType.SUBJECT, INSTANCE_SUBJECT_RESOURCE,
ReindexEntityType.CLASSIFICATION, INSTANCE_CLASSIFICATION_RESOURCE,
ReindexEntityType.CONTRIBUTOR, CONTRIBUTOR_RESOURCE
);

private final Map<ReindexEntityType, ReindexJdbcRepository> repositories;
private final FolioMessageProducer<ReindexRangeIndexEvent> indexRangeEventProducer;

Expand All @@ -33,10 +48,23 @@ public void prepareAndSendIndexRanges(ReindexEntityType entityType) {
indexRangeEventProducer.sendMessages(prepareEvents(uploadRanges));
}

public Collection<ResourceEvent> fetchRecordRange(ReindexRangeIndexEvent rangeIndexEvent) {
var entityType = rangeIndexEvent.getEntityType();
var repository = repositories.get(entityType);
var recordMaps = repository.fetchBy(rangeIndexEvent.getLimit(), rangeIndexEvent.getOffset());
return recordMaps.stream()
.map(map -> new ResourceEvent().id(getString(map, ID_FIELD))
.resourceName(RESOURCE_NAME_MAP.get(entityType))
._new(map)
.tenant(rangeIndexEvent.getTenant()))
.toList();
}

private List<ReindexRangeIndexEvent> prepareEvents(List<UploadRangeEntity> uploadRanges) {
return uploadRanges.stream()
.map(range -> {
var event = new ReindexRangeIndexEvent();
event.setId(range.getId());
event.setEntityType(range.getEntityType());
event.setOffset(range.getOffset());
event.setLimit(range.getLimit());
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/org/folio/search/service/reindex/ReindexService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.folio.search.service.reindex;

import java.util.Collection;
import lombok.RequiredArgsConstructor;
import org.folio.search.domain.dto.FolioIndexOperationResponse;
import org.folio.search.exception.ReindexException;
import org.folio.search.model.event.ReindexRangeIndexEvent;
import org.folio.search.repository.PrimaryResourceRepository;
import org.folio.search.service.converter.MultiTenantSearchDocumentConverter;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class ReindexService {

private final ReindexRangeIndexService rangeIndexService;
private final PrimaryResourceRepository elasticRepository;
private final MultiTenantSearchDocumentConverter documentConverter;

public boolean process(ReindexRangeIndexEvent event) {
var resourceEvents = rangeIndexService.fetchRecordRange(event);
var documents = documentConverter.convert(resourceEvents).values().stream().flatMap(Collection::stream).toList();
var folioIndexOperationResponse = elasticRepository.indexResources(documents);
if (folioIndexOperationResponse.getStatus() == FolioIndexOperationResponse.StatusEnum.ERROR) {
throw new ReindexException(folioIndexOperationResponse.getErrorMessage());
}
return true;
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,44 @@
package org.folio.search.service.reindex.jdbc;

import java.util.HashMap;
import java.util.Map;
import org.folio.search.configuration.properties.ReindexConfigurationProperties;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.utils.JsonConverter;
import org.folio.spring.FolioExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;

@Repository
public class ClassificationJdbcRepository extends ReindexJdbcRepository {

protected ClassificationJdbcRepository(JdbcTemplate jdbcTemplate, FolioExecutionContext context,
protected ClassificationJdbcRepository(JdbcTemplate jdbcTemplate, JsonConverter jsonConverter,
FolioExecutionContext context,
ReindexConfigurationProperties reindexConfigurationProperties) {
super(jdbcTemplate, context, reindexConfigurationProperties);
super(jdbcTemplate, jsonConverter, context, reindexConfigurationProperties);
}

@Override
public ReindexEntityType entityType() {
return ReindexEntityType.CLASSIFICATION;
}

@Override
protected RowMapper<Map<String, Object>> rowToMapMapper() {
return (rs, rowNum) -> {
Map<String, Object> subject = new HashMap<>();
subject.put("id", rs.getString("id"));
subject.put("number", rs.getString("number"));
subject.put("typeId", rs.getString("type_id"));

var maps = jsonConverter.fromJsonToListOfMaps(rs.getString("instances"));
subject.put("instances", maps);

return subject;
};
}

@Override
protected String entityTable() {
return "classification";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,45 @@
package org.folio.search.service.reindex.jdbc;

import java.util.HashMap;
import java.util.Map;
import org.folio.search.configuration.properties.ReindexConfigurationProperties;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.utils.JsonConverter;
import org.folio.spring.FolioExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;

@Repository
public class ContributorJdbcRepository extends ReindexJdbcRepository {

protected ContributorJdbcRepository(JdbcTemplate jdbcTemplate, FolioExecutionContext context,
protected ContributorJdbcRepository(JdbcTemplate jdbcTemplate, JsonConverter jsonConverter,
FolioExecutionContext context,
ReindexConfigurationProperties reindexConfigurationProperties) {
super(jdbcTemplate, context, reindexConfigurationProperties);
super(jdbcTemplate, jsonConverter, context, reindexConfigurationProperties);
}

@Override
public ReindexEntityType entityType() {
return ReindexEntityType.CONTRIBUTOR;
}

@Override
protected RowMapper<Map<String, Object>> rowToMapMapper() {
return (rs, rowNum) -> {
Map<String, Object> subject = new HashMap<>();
subject.put("id", rs.getString("id"));
subject.put("name", rs.getString("name"));
subject.put("contributorNameTypeId", rs.getString("contributor_name_type_id"));
subject.put("authorityId", rs.getString("authority_id"));

var maps = jsonConverter.fromJsonToListOfMaps(rs.getString("instances"));
subject.put("instances", maps);

return subject;
};
}

@Override
protected String entityTable() {
return "contributor";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,54 @@
package org.folio.search.service.reindex.jdbc;

import static org.folio.search.utils.JdbcUtils.getFullTableName;

import java.util.Map;
import org.folio.search.configuration.properties.ReindexConfigurationProperties;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.utils.JsonConverter;
import org.folio.spring.FolioExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;

@Repository
public class InstanceJdbcRepository extends ReindexJdbcRepository {

protected InstanceJdbcRepository(JdbcTemplate jdbcTemplate, FolioExecutionContext context,
protected InstanceJdbcRepository(JdbcTemplate jdbcTemplate, JsonConverter jsonConverter,
FolioExecutionContext context,
ReindexConfigurationProperties reindexConfigurationProperties) {
super(jdbcTemplate, context, reindexConfigurationProperties);
super(jdbcTemplate, jsonConverter, context, reindexConfigurationProperties);
}

@Override
public ReindexEntityType entityType() {
return ReindexEntityType.INSTANCE;
}

@Override
protected String getFetchBySql() {
return """
SELECT i.instance_json
|| jsonb_build_object('tenantId', i.tenant_id)
|| jsonb_build_object('shared', i.shared)
|| jsonb_build_object('isBoundWith', i.is_bound_with)
|| jsonb_build_object('holdings', jsonb_agg(h.holding_json || jsonb_build_object('tenantId', h.tenant_id)))
|| jsonb_build_object('items', jsonb_agg(it.item_json || jsonb_build_object('tenantId', it.tenant_id))) as json
FROM %s i
JOIN %s h on h.instance_id = i.id
JOIN %s it on it.holding_id = h.id
GROUP BY i.id LIMIT ? OFFSET ?;
"""
.formatted(getFullTableName(context, entityTable()),
getFullTableName(context, "holding"),
getFullTableName(context, "item"));
}

@Override
protected RowMapper<Map<String, Object>> rowToMapMapper() {
return (rs, rowNum) -> jsonConverter.fromJsonToMap(rs.getString("json"));
}

@Override
protected String entityTable() {
return "instance";
Expand Down
Loading

0 comments on commit a372753

Please sign in to comment.