Skip to content

Commit

Permalink
MSEARCH-794: fix status setting query
Browse files Browse the repository at this point in the history
  • Loading branch information
mukhiddin-yusuf committed Aug 16, 2024
1 parent 1e55259 commit 09f8fe5
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void publishReindexRecordsRange(MergeRangeEntity rangeEntity) {
var to = rangeEntity.getUpperId().toString();
var recordsRange = new InventoryReindexRecordsClient.ReindexRecords(
rangeEntity.getId().toString(),
rangeEntity.getEntityType().name(),
rangeEntity.getEntityType().getType(),
new InventoryReindexRecordsClient.ReindexRecordsRange(from, to));

try {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/folio/search/model/client/CqlQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ public static CqlQuery greaterThan(CqlQueryParam param, String value) {
return fromTemplate("%s>(%s)", param.getCqlParam(), value);
}

public static CqlQuery sortBy(CqlQuery cqlQuery, CqlQueryParam param) {
return fromTemplate("%s sortBy %s", cqlQuery.query, param.getCqlParam());
}

private static CqlQuery fromTemplate(String format, Object... args) {
return new CqlQuery(String.format(format, args));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ private List<MergeRangeEntity> constructRecordMergeRanges(int recordsCount,

List<MergeRangeEntity> ranges = new ArrayList<>();
int pages = (int) Math.ceil((double) recordsCount / rangeSize);
var recordIds = inventoryService.fetchInventoryRecordIds(recordType, null, 0, rangeSize);
var query = CqlQuery.sortBy(new CqlQuery("cql.allRecords=1"), CqlQueryParam.ID);
var recordIds = inventoryService.fetchInventoryRecordIds(recordType, query, 0, rangeSize);
if (CollectionUtils.isEmpty(recordIds)) {
log.warn("There are no records to create merge ranges: [recordType: {}, recordsCount: {}, tenant: {}]",
recordType, recordsCount, tenantId);
Expand All @@ -95,8 +96,9 @@ private List<MergeRangeEntity> constructRecordMergeRanges(int recordsCount,
for (int i = 1; i < pages; i++) {
int offset = i * rangeSize;
int limit = Math.min(rangeSize, recordsCount - offset);
var query = CqlQuery.greaterThan(CqlQueryParam.ID, lowerId.toString());
recordIds = inventoryService.fetchInventoryRecordIds(recordType, query, offset, limit);
query = CqlQuery.greaterThan(CqlQueryParam.ID, lowerId.toString());
recordIds =
inventoryService.fetchInventoryRecordIds(recordType, CqlQuery.sortBy(query, CqlQueryParam.ID), offset, limit);
lowerId = recordIds.get(0);
upperId = recordIds.get(recordIds.size() - 1);
ranges.add(mergeEntity(UUID.randomUUID(), recordType, tenantId, lowerId, upperId, Timestamp.from(Instant.now())));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.folio.search.service.reindex.jdbc;

import static org.folio.search.model.reindex.ReindexStatusEntity.END_TIME_MERGE_COLUMN;
import static org.folio.search.model.reindex.ReindexStatusEntity.END_TIME_UPLOAD_COLUMN;
import static org.folio.search.model.reindex.ReindexStatusEntity.START_TIME_MERGE_COLUMN;
import static org.folio.search.model.reindex.ReindexStatusEntity.STATUS_COLUMN;
import static org.folio.search.model.reindex.ReindexStatusEntity.TOTAL_MERGE_RANGES_COLUMN;
import static org.folio.search.service.reindex.ReindexConstants.REINDEX_STATUS_TABLE;
import static org.folio.search.utils.JdbcUtils.getFullTableName;
Expand Down Expand Up @@ -37,7 +37,7 @@ public class ReindexStatusRepository {
private static final String UPDATE_FOR_ENTITIES_SQL = """
UPDATE %s
SET %s
WHERE entity_type in ?;
WHERE entity_type in (%s);
""";

private final FolioExecutionContext context;
Expand All @@ -57,21 +57,15 @@ public void setMergeReindexStarted(ReindexEntityType entityType, int totalMergeR
jdbcTemplate.update(sql, totalMergeRanges, Timestamp.from(Instant.now()), entityType.name());
}

public void setReindexUploadFailed(ReindexEntityType entityType) {
var fullTableName = getFullTableName(context, REINDEX_STATUS_TABLE);
var sql = UPDATE_SQL.formatted(fullTableName, "%s = ?".formatted(END_TIME_UPLOAD_COLUMN));

jdbcTemplate.update(sql, ReindexStatus.UPLOAD_FAILED.name(), Timestamp.from(Instant.now()), entityType.name());
}

public void setReindexMergeFailed(List<ReindexEntityType> entityTypes) {
var inTypes = entityTypes.stream()
.map(ReindexEntityType::name)
.collect(Collectors.joining(",", "(", ")"));
.map(entityType -> "'%s'".formatted(entityType.name()))
.collect(Collectors.joining(","));
var fullTableName = getFullTableName(context, REINDEX_STATUS_TABLE);
var sql = UPDATE_FOR_ENTITIES_SQL.formatted(fullTableName, "%s = ?".formatted(END_TIME_MERGE_COLUMN));
var sql = UPDATE_FOR_ENTITIES_SQL.formatted(
fullTableName, "%s = ?, %s = ?".formatted(STATUS_COLUMN, END_TIME_MERGE_COLUMN), inTypes);

jdbcTemplate.update(sql, ReindexStatus.MERGE_FAILED.name(), Timestamp.from(Instant.now()), inTypes);
jdbcTemplate.update(sql, ReindexStatus.MERGE_FAILED.name(), Timestamp.from(Instant.now()));
}

public void saveReindexStatusRecords(List<ReindexStatusEntity> statusRecords) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,4 +298,11 @@
</preConditions>
<sqlFile path="update-reindex-status-trigger.sql" relativeToChangelogFile="true" splitStatements="false"/>
</changeSet>

<changeSet id="MSEARCH-794@@create-reindex_status-trigger-fix" author="Mukhiddin_Yusupov">
<preConditions onFail="MARK_RAN">
<tableExists tableName="reindex_status"/>
</preConditions>
<sqlFile path="update-reindex-status-trigger-fix.sql" relativeToChangelogFile="true" splitStatements="false"/>
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
CREATE OR REPLACE FUNCTION update_reindex_status_trigger()
RETURNS TRIGGER AS $$
BEGIN
-- update status and end time for merge
IF OLD.status = 'MERGE_IN_PROGRESS' and NEW.total_merge_ranges = NEW.processed_merge_ranges
THEN NEW.status = 'MERGE_COMPLETED'; NEW.end_time_merge = current_timestamp;
ELSE
-- update status and end time for upload
IF OLD.status = 'UPLOAD_IN_PROGRESS' and NEW.total_upload_ranges = NEW.processed_upload_ranges
THEN NEW.status = 'UPLOAD_COMPLETED'; NEW.end_time_upload = current_timestamp;
END IF;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS reindex_status_updated_trigger ON reindex_status CASCADE;
CREATE TRIGGER reindex_status_updated_trigger
BEFORE UPDATE OF processed_merge_ranges, processed_upload_ranges
ON reindex_status
FOR EACH ROW
EXECUTE FUNCTION update_reindex_status_trigger();

0 comments on commit 09f8fe5

Please sign in to comment.