Skip to content

Commit

Permalink
Clean up stale entries from upgradeSegments table (apache#15637)
Browse files Browse the repository at this point in the history
* Clean up stale entries from upgradeSegments table
  • Loading branch information
AmatyaAvadhanula authored Jan 17, 2024
1 parent fc06f2d commit a26defd
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,14 @@ public void remove(final Task task)
try {
try {
log.info("Removing task[%s] from activeTasks", task.getId());
if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) {
final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
log.info(
"Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.",
upgradeSegmentsDeleted,
task.getId()
);
}
unlockAll(task);
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1919,6 +1919,36 @@ public void testCleanUpLocksAfterSegmentAllocationFailure()
Assert.assertTrue(testLockbox.getAllLocks().isEmpty());
}

@Test
public void testUpgradeSegmentsCleanupOnUnlock()
{
final Task replaceTask = NoopTask.create();
final Task appendTask = NoopTask.create();
final IndexerSQLMetadataStorageCoordinator coordinator
= EasyMock.createMock(IndexerSQLMetadataStorageCoordinator.class);
// Only the replaceTask should attempt a delete on the upgradeSegments table
EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once();
EasyMock.replay(coordinator);

final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator);

taskLockbox.add(replaceTask);
taskLockbox.tryLock(
replaceTask,
new TimeChunkLockRequest(TaskLockType.REPLACE, replaceTask, Intervals.of("2024/2025"), "v0")
);

taskLockbox.add(appendTask);
taskLockbox.tryLock(
appendTask,
new TimeChunkLockRequest(TaskLockType.APPEND, appendTask, Intervals.of("2024/2025"), "v0")
);

taskLockbox.remove(replaceTask);
taskLockbox.remove(appendTask);

EasyMock.verify(coordinator);
}

private class TaskLockboxValidator
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@ public DataSegment retrieveSegmentForId(final String id, boolean includeUnused)
return null;
}

@Override
public int deleteUpgradeSegmentsForTask(final String taskId)
{
throw new UnsupportedOperationException();
}

public Set<DataSegment> getPublished()
{
return ImmutableSet.copyOf(published);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,4 +443,11 @@ SegmentPublishResult commitMetadataOnly(
* @return DataSegment used segment corresponding to given id
*/
DataSegment retrieveSegmentForId(String id, boolean includeUnused);

/**
* Delete entries from the upgrade segments table after the corresponding replace task has ended
* @param taskId - id of the task with replace locks
* @return number of deleted entries from the metadata store
*/
int deleteUpgradeSegmentsForTask(String taskId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2707,6 +2707,22 @@ public DataSegment retrieveSegmentForId(final String id, boolean includeUnused)
);
}

@Override
public int deleteUpgradeSegmentsForTask(final String taskId)
{
return connector.getDBI().inTransaction(
(handle, status) -> handle
.createStatement(
StringUtils.format(
"DELETE FROM %s WHERE task_id = :task_id",
dbTables.getUpgradeSegmentsTable()
)
)
.bind("task_id", taskId)
.execute()
);
}

private static class PendingSegmentsRecord
{
private final String sequenceName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,37 @@ public void testRetrieveSegmentForId()
Assert.assertEquals(defaultSegment, coordinator.retrieveSegmentForId(defaultSegment.getId().toString(), true));
}

@Test
public void testCleanUpgradeSegmentsTableForTask()
{
final String taskToClean = "taskToClean";
final ReplaceTaskLock replaceLockToClean = new ReplaceTaskLock(
taskToClean,
Intervals.of("2023-01-01/2023-02-01"),
"2023-03-01"
);
DataSegment segmentToClean0 = createSegment(
Intervals.of("2023-01-01/2023-02-01"),
"2023-02-01",
new NumberedShardSpec(0, 0)
);
DataSegment segmentToClean1 = createSegment(
Intervals.of("2023-01-01/2023-01-02"),
"2023-01-02",
new NumberedShardSpec(0, 0)
);
insertIntoUpgradeSegmentsTable(
ImmutableMap.of(segmentToClean0, replaceLockToClean, segmentToClean1, replaceLockToClean)
);

// Unrelated task should not result in clean up
Assert.assertEquals(0, coordinator.deleteUpgradeSegmentsForTask("someRandomTask"));
// The two segment entries are deleted
Assert.assertEquals(2, coordinator.deleteUpgradeSegmentsForTask(taskToClean));
// Nothing further to delete
Assert.assertEquals(0, coordinator.deleteUpgradeSegmentsForTask(taskToClean));
}

@Test
public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOException
{
Expand Down

0 comments on commit a26defd

Please sign in to comment.