Skip to content

Commit

Permalink
feat(reindex): Implement Reindex status Endpoint (#641)
Browse files Browse the repository at this point in the history
* feat(reindex): Implement Reindex status Endpoint

- Implement DB schema
- Implement status update trigger on processed items updates
- Implement endpoint

Implements: MSEARCH-797
  • Loading branch information
viacheslavkol authored Aug 15, 2024
1 parent 60647a5 commit baee716
Show file tree
Hide file tree
Showing 20 changed files with 603 additions and 11 deletions.
16 changes: 15 additions & 1 deletion descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"provides": [
{
"id": "indices",
"version": "0.7",
"version": "0.8",
"handlers": [
{
"methods": [
Expand Down Expand Up @@ -74,6 +74,15 @@
"modulePermissions": [
"user-tenants.collection.get"
]
},
{
"methods": [
"GET"
],
"pathPattern": "/search/index/instance-records/reindex/status",
"permissionsRequired": [
"search.index.reindex.status.get"
]
}
]
},
Expand Down Expand Up @@ -693,6 +702,11 @@
"displayName": "Search - starts inventory reindex operation",
"description": "Starts inventory reindex operation"
},
{
"permissionName": "search.index.reindex.status.get",
"displayName": "Search - returns reindex status for entities",
"description": "Returns reindex status for entities"
},
{
"permissionName": "search.facets.collection.get",
"displayName": "Search - returns facets for a query for given filter options by record type",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import org.folio.search.domain.dto.FolioIndexOperationResponse;
import org.folio.search.domain.dto.ReindexJob;
import org.folio.search.domain.dto.ReindexRequest;
import org.folio.search.domain.dto.ReindexStatusItem;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.domain.dto.UpdateIndexDynamicSettingsRequest;
import org.folio.search.domain.dto.UpdateMappingsRequest;
import org.folio.search.rest.resource.IndexManagementApi;
import org.folio.search.service.IndexService;
import org.folio.search.service.ResourceService;
import org.folio.search.service.reindex.ReindexRangeIndexService;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RequestMapping;
Expand All @@ -30,6 +32,7 @@
public class IndexManagementController implements IndexManagementApi {

private final IndexService indexService;
private final ReindexRangeIndexService reindexRangeService;
private final ResourceService resourceService;

@Override
Expand Down Expand Up @@ -59,4 +62,9 @@ public ResponseEntity<FolioIndexOperationResponse> updateIndexDynamicSettings(
public ResponseEntity<FolioIndexOperationResponse> updateMappings(String tenantId, UpdateMappingsRequest request) {
return ResponseEntity.ok(indexService.updateMappings(request.getResourceName(), tenantId));
}

@Override
public ResponseEntity<List<ReindexStatusItem>> getReindexStatus(String tenantId) {
return ResponseEntity.ok(reindexRangeService.getReindexStatuses(tenantId));
}
}
11 changes: 11 additions & 0 deletions src/main/java/org/folio/search/converter/ReindexStatusMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.folio.search.converter;

import org.folio.search.domain.dto.ReindexStatusItem;
import org.folio.search.model.reindex.ReindexStatusEntity;
import org.mapstruct.Mapper;

@Mapper(componentModel = "spring")
public interface ReindexStatusMapper {

ReindexStatusItem convert(ReindexStatusEntity entity);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.folio.search.model.reindex;

import java.sql.Timestamp;
import lombok.Data;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.model.types.ReindexStatus;

@Data
public class ReindexStatusEntity {

public static final String REINDEX_STATUS_TABLE = "reindex_status";
public static final String ENTITY_TYPE_COLUMN = "entity_type";
public static final String STATUS_COLUMN = "status";
public static final String TOTAL_MERGE_RANGES_COLUMN = "total_merge_ranges";
public static final String PROCESSED_MERGE_RANGES_COLUMN = "processed_merge_ranges";
public static final String TOTAL_UPLOAD_RANGES_COLUMN = "total_upload_ranges";
public static final String PROCESSED_UPLOAD_RANGES_COLUMN = "processed_upload_ranges";
public static final String START_TIME_MERGE_COLUMN = "start_time_merge";
public static final String END_TIME_MERGE_COLUMN = "end_time_merge";
public static final String START_TIME_UPLOAD_COLUMN = "start_time_upload";
public static final String END_TIME_UPLOAD_COLUMN = "end_time_upload";

private final ReindexEntityType entityType;
private final ReindexStatus status;
private int totalMergeRanges;
private int processedMergeRanges;
private int totalUploadRanges;
private int processedUploadRanges;
private Timestamp startTimeMerge;
private Timestamp endTimeMerge;
private Timestamp startTimeUpload;
private Timestamp endTimeUpload;

}
19 changes: 19 additions & 0 deletions src/main/java/org/folio/search/model/types/ReindexStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.folio.search.model.types;

import lombok.Getter;

@Getter
public enum ReindexStatus {

MERGE_IN_PROGRESS("Merge In Progress"),
MERGE_COMPLETED("Merge Completed"),
UPLOAD_IN_PROGRESS("Upload In Progress"),
UPLOAD_COMPLETED("Upload Completed"),
UPLOAD_FAILED("Upload Failed");

private final String value;

ReindexStatus(String value) {
this.value = value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ public boolean process(ReindexRangeIndexEvent event) {
var folioIndexOperationResponse = elasticRepository.indexResources(documents);
rangeIndexService.updateFinishDate(event);
if (folioIndexOperationResponse.getStatus() == FolioIndexOperationResponse.StatusEnum.ERROR) {
// TODO MSEARCH-797 - update status as failed indicating upload has failed
rangeIndexService.setReindexUploadFailed(event.getEntityType());
throw new ReindexException(folioIndexOperationResponse.getErrorMessage());
}

rangeIndexService.addProcessedUploadRanges(event.getEntityType(), documents.size());
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,26 @@
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.folio.search.converter.ReindexStatusMapper;
import org.folio.search.domain.dto.ReindexStatusItem;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.exception.RequestValidationException;
import org.folio.search.model.event.ReindexRangeIndexEvent;
import org.folio.search.model.reindex.UploadRangeEntity;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.service.consortium.ConsortiumTenantService;
import org.folio.search.service.reindex.jdbc.ReindexJdbcRepository;
import org.folio.search.service.reindex.jdbc.ReindexStatusRepository;
import org.folio.spring.integration.XOkapiHeaders;
import org.folio.spring.tools.kafka.FolioMessageProducer;
import org.springframework.stereotype.Service;

@Service
public class ReindexRangeIndexService {

static final String REQUEST_NOT_ALLOWED_MSG =
"The request not allowed for member tenant of consortium environment";

private static final Map<ReindexEntityType, String> RESOURCE_NAME_MAP = Map.of(
ReindexEntityType.INSTANCE, INSTANCE_RESOURCE,
ReindexEntityType.SUBJECT, INSTANCE_SUBJECT_RESOURCE,
Expand All @@ -35,11 +44,19 @@ public class ReindexRangeIndexService {

private final Map<ReindexEntityType, ReindexJdbcRepository> repositories;
private final FolioMessageProducer<ReindexRangeIndexEvent> indexRangeEventProducer;
private final ReindexStatusRepository statusRepository;
private final ReindexStatusMapper reindexStatusMapper;
private final ConsortiumTenantService tenantService;

public ReindexRangeIndexService(List<ReindexJdbcRepository> repositories,
FolioMessageProducer<ReindexRangeIndexEvent> indexRangeEventProducer) {
FolioMessageProducer<ReindexRangeIndexEvent> indexRangeEventProducer,
ReindexStatusRepository statusRepository, ReindexStatusMapper reindexStatusMapper,
ConsortiumTenantService tenantService) {
this.repositories = repositories.stream().collect(Collectors.toMap(ReindexJdbcRepository::entityType, identity()));
this.indexRangeEventProducer = indexRangeEventProducer;
this.statusRepository = statusRepository;
this.reindexStatusMapper = reindexStatusMapper;
this.tenantService = tenantService;
}

public void prepareAndSendIndexRanges(ReindexEntityType entityType) {
Expand Down Expand Up @@ -67,6 +84,29 @@ public void updateFinishDate(ReindexRangeIndexEvent event) {
repository.setIndexRangeFinishDate(event.getId(), Timestamp.from(Instant.now()));
}

public List<ReindexStatusItem> getReindexStatuses(String tenantId) {
var centralTenant = tenantService.getCentralTenant(tenantId);
if (centralTenant.isPresent() && !centralTenant.get().equals(tenantId)) {
throw new RequestValidationException(REQUEST_NOT_ALLOWED_MSG, XOkapiHeaders.TENANT, tenantId);
}

var statuses = statusRepository.getReindexStatuses();

return statuses.stream().map(reindexStatusMapper::convert).toList();
}

public void setReindexUploadFailed(ReindexEntityType entityType) {
statusRepository.setReindexUploadFailed(entityType);
}

public void addProcessedMergeRanges(ReindexEntityType entityType, int processedMergeRanges) {
statusRepository.addReindexCounts(entityType, processedMergeRanges, 0);
}

public void addProcessedUploadRanges(ReindexEntityType entityType, int processedUploadRanges) {
statusRepository.addReindexCounts(entityType, 0, processedUploadRanges);
}

private List<ReindexRangeIndexEvent> prepareEvents(List<UploadRangeEntity> uploadRanges) {
return uploadRanges.stream()
.map(range -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.folio.search.service.reindex.jdbc;

import static org.folio.search.model.reindex.ReindexStatusEntity.END_TIME_MERGE_COLUMN;
import static org.folio.search.model.reindex.ReindexStatusEntity.END_TIME_UPLOAD_COLUMN;
import static org.folio.search.model.reindex.ReindexStatusEntity.PROCESSED_MERGE_RANGES_COLUMN;
import static org.folio.search.model.reindex.ReindexStatusEntity.PROCESSED_UPLOAD_RANGES_COLUMN;
import static org.folio.search.model.reindex.ReindexStatusEntity.REINDEX_STATUS_TABLE;
import static org.folio.search.model.reindex.ReindexStatusEntity.START_TIME_MERGE_COLUMN;
import static org.folio.search.model.reindex.ReindexStatusEntity.START_TIME_UPLOAD_COLUMN;
import static org.folio.search.model.reindex.ReindexStatusEntity.STATUS_COLUMN;
import static org.folio.search.model.reindex.ReindexStatusEntity.TOTAL_MERGE_RANGES_COLUMN;
import static org.folio.search.model.reindex.ReindexStatusEntity.TOTAL_UPLOAD_RANGES_COLUMN;
import static org.folio.search.utils.JdbcUtils.getFullTableName;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.folio.search.model.reindex.ReindexStatusEntity;
import org.folio.search.model.types.ReindexEntityType;
import org.folio.search.model.types.ReindexStatus;
import org.folio.spring.FolioExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;

@Repository
@RequiredArgsConstructor
public class ReindexStatusRepository {

private static final String SELECT_REINDEX_STATUS_BY_REINDEX_ID_SQL = "SELECT * FROM %s;";
private static final String UPDATE_REINDEX_STATUS_SQL = """
UPDATE %s
SET status = ?, %s = ?
WHERE entity_type = ?;
""";
private static final String UPDATE_REINDEX_COUNTS_SQL = """
UPDATE %s
SET processed_merge_ranges = processed_merge_ranges + ?,
processed_upload_ranges = processed_upload_ranges + ?
WHERE entity_type = ?;
""";

private final FolioExecutionContext context;
private final JdbcTemplate jdbcTemplate;


public List<ReindexStatusEntity> getReindexStatuses() {
var fullTableName = getFullTableName(context, REINDEX_STATUS_TABLE);
var sql = SELECT_REINDEX_STATUS_BY_REINDEX_ID_SQL.formatted(fullTableName);
return jdbcTemplate.query(sql, reindexStatusRowMapper());
}

public void setReindexUploadFailed(ReindexEntityType entityType) {
var fullTableName = getFullTableName(context, REINDEX_STATUS_TABLE);
var sql = UPDATE_REINDEX_STATUS_SQL.formatted(fullTableName, END_TIME_UPLOAD_COLUMN);

jdbcTemplate.update(sql, ReindexStatus.UPLOAD_FAILED.name(), Timestamp.from(Instant.now()), entityType.name());
}

public void addReindexCounts(ReindexEntityType entityType, int processedMergeRanges,
int processedUploadRanges) {
var fullTableName = getFullTableName(context, REINDEX_STATUS_TABLE);
var sql = UPDATE_REINDEX_COUNTS_SQL.formatted(fullTableName);

jdbcTemplate.update(sql, processedMergeRanges, processedUploadRanges, entityType.name());
}

private RowMapper<ReindexStatusEntity> reindexStatusRowMapper() {
return (rs, rowNum) -> {
var reindexStatus = new ReindexStatusEntity(
ReindexEntityType.valueOf(rs.getString(ReindexStatusEntity.ENTITY_TYPE_COLUMN)),
ReindexStatus.valueOf(rs.getString(STATUS_COLUMN))
);
reindexStatus.setTotalMergeRanges(rs.getInt(TOTAL_MERGE_RANGES_COLUMN));
reindexStatus.setProcessedMergeRanges(rs.getInt(PROCESSED_MERGE_RANGES_COLUMN));
reindexStatus.setTotalUploadRanges(rs.getInt(TOTAL_UPLOAD_RANGES_COLUMN));
reindexStatus.setProcessedUploadRanges(rs.getInt(PROCESSED_UPLOAD_RANGES_COLUMN));
reindexStatus.setStartTimeMerge(rs.getTimestamp(START_TIME_MERGE_COLUMN));
reindexStatus.setEndTimeMerge(rs.getTimestamp(END_TIME_MERGE_COLUMN));
reindexStatus.setStartTimeUpload(rs.getTimestamp(START_TIME_UPLOAD_COLUMN));
reindexStatus.setEndTimeUpload(rs.getTimestamp(END_TIME_UPLOAD_COLUMN));
return reindexStatus;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,33 @@
</createTable>
</changeSet>

<changeSet id="MSEARCH-797@@create-reindex_status-table" author="Viacheslav_Kolesnyk">
<preConditions>
<not>
<tableExists tableName="reindex_status"/>
</not>
</preConditions>

<comment>Create reindex_status table</comment>

<createTable tableName="reindex_status">
<column name="entity_type" type="VARCHAR(30)">
<constraints nullable="false" primaryKey="true" primaryKeyName="pk_reindex_status"/>
</column>
<column name="status" type="VARCHAR(20)">
<constraints nullable="false"/>
</column>
<column name="total_merge_ranges" type="int eger"/>
<column name="processed_merge_ranges" type="integer"/>
<column name="total_upload_ranges" type="integer"/>
<column name="processed_upload_ranges" type="integer"/>
<column name="start_time_merge" type="DATETIME"/>
<column name="end_time_merge" type="DATETIME"/>
<column name="start_time_upload" type="DATETIME"/>
<column name="end_time_upload" type="DATETIME"/>
</createTable>
</changeSet>

<changeSet id="MSEARCH-793@@create-instance_view" author="Mukhiddin_Yusupov">
<preConditions onFail="MARK_RAN">
<and>
Expand Down Expand Up @@ -263,4 +290,11 @@
</preConditions>
<sqlFile path="delete-instance-trigger.sql" relativeToChangelogFile="true" splitStatements="false"/>
</changeSet>

<changeSet id="MSEARCH-797@@create-reindex_status-trigger" author="Viacheslav_Kolesnyk">
<preConditions onFail="MARK_RAN">
<tableExists tableName="reindex_status"/>
</preConditions>
<sqlFile path="update-reindex-status-trigger.sql" relativeToChangelogFile="true" splitStatements="false"/>
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
CREATE OR REPLACE FUNCTION update_reindex_status_trigger()
RETURNS TRIGGER AS $$
BEGIN
-- update status and end time for merge
IF OLD.status = 'MERGE_IN_PROGRESS' and NEW.total_merge_ranges = NEW.processed_merge_ranges
THEN NEW.status = 'MERGE_COMPLETED'; NEW.end_time_merge = current_timestamp;
ELSE
-- update status and end time for upload
IF OLD.status = 'UPLOAD_IN_PROGRESS' and NEW.total_upload_ranges = NEW.processed_upload_ranges
THEN NEW.status = 'UPLOAD_COMPLETED'; NEW.end_time_upload = current_timestamp;
END IF;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS reindex_status_updated_trigger ON reindex_status CASCADE;
CREATE TRIGGER reindex_status_updated_trigger
BEFORE UPDATE
ON reindex_status
FOR EACH ROW
EXECUTE FUNCTION update_reindex_status_trigger();
Loading

0 comments on commit baee716

Please sign in to comment.