Skip to content

Commit

Permalink
- trigger upload
Browse files Browse the repository at this point in the history
  • Loading branch information
psmagin committed Sep 5, 2024
1 parent 0a3981a commit 43fc509
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package org.folio.search.service.reindex;

import java.util.Arrays;
import java.util.Collection;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.logging.log4j.message.FormattedMessage;
import org.folio.search.domain.dto.FolioIndexOperationResponse;
import org.folio.search.domain.dto.ReindexUploadDto;
import org.folio.search.exception.ReindexException;
import org.folio.search.model.event.ReindexRangeIndexEvent;
import org.folio.search.model.event.ReindexRecordsEvent;
import org.folio.search.repository.PrimaryResourceRepository;
import org.folio.search.service.converter.MultiTenantSearchDocumentConverter;
import org.folio.spring.FolioExecutionContext;
import org.springframework.stereotype.Service;

@Log4j2
Expand All @@ -21,7 +24,9 @@ public class ReindexOrchestrationService {
private final ReindexMergeRangeIndexService mergeRangeService;
private final ReindexStatusService reindexStatusService;
private final PrimaryResourceRepository elasticRepository;
private final ReindexService reindexService;
private final MultiTenantSearchDocumentConverter documentConverter;
private final FolioExecutionContext context;

public boolean process(ReindexRangeIndexEvent event) {
log.info("process:: ReindexRangeIndexEvent [id: {}, tenantId: {}, entityType: {}, offset: {}, limit: {}, ts: {}]",
Expand Down Expand Up @@ -52,11 +57,16 @@ public boolean process(ReindexRecordsEvent event) {
reindexStatusService.addProcessedMergeRanges(entityType, 1);
} catch (Exception ex) {
log.error(new FormattedMessage("process:: ReindexRecordsEvent indexing error [rangeId: {}, error: {}]",
event.getRangeId(), ex.getMessage()), ex);
event.getRangeId(), ex.getMessage()), ex);
reindexStatusService.updateReindexMergeFailed();
} finally {
log.info("process:: ReindexRecordsEvent processed [rangeId: {}]", event.getRangeId());
log.info("process:: ReindexRecordsEvent processed [rangeId: {}, recordType: {}]",
event.getRangeId(), event.getRecordType());
mergeRangeService.updateFinishDate(entityType, event.getRangeId());
if (reindexStatusService.isMergeCompleted()) {
reindexService.submitUploadReindex(context.getTenantId(),
Arrays.asList(ReindexUploadDto.EntityTypesEnum.values()));
}
}

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ReindexService(ConsortiumTenantService consortiumService,
}

public CompletableFuture<Void> submitFullReindex(String tenantId) {
log.info("initFullReindex:: for [tenantId: {}]", tenantId);
log.info("submitFullReindex:: for [tenantId: {}]", tenantId);

validateTenant(tenantId);

Expand Down Expand Up @@ -91,13 +91,13 @@ public CompletableFuture<Void> submitFullReindex(String tenantId) {
return unused;
});

log.info("initFullReindex:: submitted [tenantId: {}]", tenantId);
log.info("submitFullReindex:: submitted [tenantId: {}]", tenantId);
return future;
}

public CompletableFuture<Void> submitUploadReindex(String tenantId,
List<ReindexUploadDto.EntityTypesEnum> entityTypesDto) {
log.info("submitting reindex upload process");
log.info("submitUploadReindex:: for [tenantId: {}]", tenantId);
var reindexEntityTypes = entityTypeMapper.convert(entityTypesDto);
validateUploadReindex(tenantId, reindexEntityTypes);

Expand All @@ -120,7 +120,7 @@ public CompletableFuture<Void> submitUploadReindex(String tenantId,
futures.add(future);
}

log.info("reindex upload process submitted");
log.info("submitUploadReindex:: submitted [tenantId: {}]", tenantId);
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,14 @@ public void updateReindexMergeStarted(ReindexEntityType entityType, int totalMer
}

public void updateReindexUploadStarted(ReindexEntityType entityType, int totalUploadRanges) {
log.info("updateReindexUploadStarted:: for [entityType: {}, totalMergeRanges: {}]", entityType, totalUploadRanges);
statusRepository.setUploadReindexStarted(entityType, totalUploadRanges);
}

public boolean isMergeCompleted() {
return statusRepository.isMergeCompleted();
}

private List<ReindexStatusEntity> constructNewStatusRecords(Set<ReindexEntityType> entityTypes,
ReindexStatus status) {
return entityTypes.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class ReindexStatusRepository {

private static final String QUERY_TWO_COLUMNS_PLACEHOLDER = "%s = ?, %s = ?";

private static final String SELECT_MERGE_STATUS_SQL = "SELECT check_merge_completed_status()";

private final FolioExecutionContext context;
private final JdbcTemplate jdbcTemplate;

Expand Down Expand Up @@ -140,6 +142,10 @@ public void saveReindexStatusRecords(List<ReindexStatusEntity> statusRecords) {
});
}

public boolean isMergeCompleted() {
return Boolean.TRUE.equals(jdbcTemplate.queryForObject(SELECT_MERGE_STATUS_SQL, Boolean.class));
}

private RowMapper<ReindexStatusEntity> reindexStatusRowMapper() {
return (rs, rowNum) -> {
var reindexStatus = new ReindexStatusEntity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@
<tableExists tableName="contributor"/>
</and>
</preConditions>
<sqlFile path="create-instance-trigger.sql" relativeToChangelogFile="true" splitStatements="false"/>
<sqlFile path="sql/create-instance-trigger.sql" relativeToChangelogFile="true" splitStatements="false"/>
</changeSet>

<changeSet id="MSEARCH-793@@create-instance_deleted_trigger" author="Mukhiddin_Yusupov">
Expand All @@ -365,13 +365,17 @@
<tableExists tableName="contributor"/>
</and>
</preConditions>
<sqlFile path="delete-instance-trigger.sql" relativeToChangelogFile="true" splitStatements="false"/>
<sqlFile path="sql/delete-instance-trigger.sql" relativeToChangelogFile="true" splitStatements="false"/>
</changeSet>

<changeSet id="MSEARCH-797@@create-reindex_status-trigger" author="Viacheslav_Kolesnyk">
<preConditions onFail="MARK_RAN">
<tableExists tableName="reindex_status"/>
</preConditions>
<sqlFile path="update-reindex-status-trigger.sql" relativeToChangelogFile="true" splitStatements="false"/>
<sqlFile path="sql/update-reindex-status-trigger.sql" relativeToChangelogFile="true" splitStatements="false"/>
</changeSet>

<changeSet id="MSEARCH-821@@create-merge_status_function-trigger" author="pavlo_smahin">
<sqlFile path="sql/check-merge-completed-status-function.sql" relativeToChangelogFile="true" splitStatements="false"/>
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE OR REPLACE FUNCTION check_merge_completed_status()
RETURNS BOOLEAN AS
$$
DECLARE
result BOOLEAN;
BEGIN
SELECT count(1) = 3
INTO result
FROM reindex_status
WHERE entity_type IN ('ITEM', 'INSTANCE', 'HOLDINGS')
AND status = 'MERGE_COMPLETED';

RETURN result;
END;
$$ LANGUAGE plpgsql;
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
CREATE OR REPLACE FUNCTION update_reindex_status_trigger()
RETURNS TRIGGER AS
$$
BEGIN
-- update status and end time for merge
IF OLD.status = 'MERGE_IN_PROGRESS' and NEW.total_merge_ranges = NEW.processed_merge_ranges
THEN
NEW.status = 'MERGE_COMPLETED';
NEW.end_time_merge = current_timestamp;
ELSE
-- update status and end time for upload
IF OLD.status = 'UPLOAD_IN_PROGRESS' and NEW.total_upload_ranges = NEW.processed_upload_ranges
THEN
NEW.status = 'UPLOAD_COMPLETED';
NEW.end_time_upload = current_timestamp;
END IF;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS reindex_status_updated_trigger ON reindex_status CASCADE;
CREATE TRIGGER reindex_status_updated_trigger
BEFORE UPDATE OF processed_merge_ranges, processed_upload_ranges
ON reindex_status
FOR EACH ROW
EXECUTE FUNCTION update_reindex_status_trigger();

This file was deleted.

0 comments on commit 43fc509

Please sign in to comment.