Skip to content

Commit

Permalink
Fix bug in TaskStorageQueryAdapter (apache#16750)
Browse files Browse the repository at this point in the history
Changes:
- Do not hold a reference to `TaskQueue` in `TaskStorageQueryAdapter`
- Use `TaskStorage` instead of `TaskStorageQueryAdapter` in `IndexerMetadataStorageAdapter`
- Rename `TaskStorageQueryAdapter` to `TaskQueryTool`
- Fix newly added task actions `RetrieveUpgradedFromSegmentIds` and `RetrieveUpgradedToSegmentIds`
by removing `isAudited` method.
  • Loading branch information
kfaraz authored Jul 17, 2024
1 parent 40ef9fc commit 89066b7
Show file tree
Hide file tree
Showing 15 changed files with 116 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,6 @@ public UpgradedFromSegmentsResponse perform(Task task, TaskActionToolbox toolbox
);
}

@Override
public boolean isAudited()
{
return false;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,6 @@ public UpgradedToSegmentsResponse perform(Task task, TaskActionToolbox toolbox)
);
}

@Override
public boolean isAudited()
{
return false;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,33 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.metadata.TaskLookup;
import org.joda.time.Interval;

import java.util.Comparator;
import java.util.Optional;

public class IndexerMetadataStorageAdapter
{
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskStorage taskStorage;
private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;

@Inject
public IndexerMetadataStorageAdapter(
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskStorage taskStorage,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator
)
{
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskStorage = taskStorage;
this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
}

public int deletePendingSegments(String dataSource, Interval deleteInterval)
{
// Find the earliest active task created for the specified datasource; if one exists,
// check if its interval overlaps with the delete interval.
final Optional<TaskInfo<Task, TaskStatus>> earliestActiveTaskOptional = taskStorageQueryAdapter
.getActiveTaskInfo(dataSource)
final Optional<TaskInfo<Task, TaskStatus>> earliestActiveTaskOptional = taskStorage
.getTaskInfos(TaskLookup.activeTasksOnly(), dataSource)
.stream()
.min(Comparator.comparing(TaskInfo::getCreatedTime));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.Interval;

Expand All @@ -36,25 +35,23 @@
import java.util.Map;

/**
* Wraps a {@link TaskStorage}, providing a useful collection of read-only methods.
* Provides read-only methods to fetch information related to tasks.
* This class may serve information that is cached in memory in {@link TaskQueue}
* or {@link TaskLockbox}. If not present in memory, then the underlying
* {@link TaskStorage} is queried.
*/
public class TaskStorageQueryAdapter
public class TaskQueryTool
{
private final TaskStorage storage;
private final TaskLockbox taskLockbox;
private final Optional<TaskQueue> taskQueue;
private final TaskMaster taskMaster;

@Inject
public TaskStorageQueryAdapter(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster)
public TaskQueryTool(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster)
{
this.storage = storage;
this.taskLockbox = taskLockbox;
this.taskQueue = taskMaster.getTaskQueue();
}

public List<Task> getActiveTasks()
{
return storage.getActiveTasks();
this.taskMaster = taskMaster;
}

/**
Expand Down Expand Up @@ -85,7 +82,7 @@ public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTa
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
{
return storage.getTaskInfos(
ActiveTaskLookup.getInstance(),
TaskLookup.activeTasksOnly(),
dataSource
);
}
Expand All @@ -98,20 +95,21 @@ public List<TaskStatusPlus> getTaskStatusPlusList(
return storage.getTaskStatusPlusList(taskLookups, dataSource);
}

public Optional<Task> getTask(final String taskid)
public Optional<Task> getTask(final String taskId)
{
final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
if (taskQueue.isPresent()) {
Optional<Task> activeTask = taskQueue.get().getActiveTask(taskid);
Optional<Task> activeTask = taskQueue.get().getActiveTask(taskId);
if (activeTask.isPresent()) {
return activeTask;
}
}
return storage.getTask(taskid);
return storage.getTask(taskId);
}

public Optional<TaskStatus> getStatus(final String taskid)
public Optional<TaskStatus> getStatus(final String taskId)
{
return storage.getStatus(taskid);
return storage.getStatus(taskId);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
Expand Down Expand Up @@ -125,7 +125,7 @@ public class OverlordResource
private static final Logger log = new Logger(OverlordResource.class);

private final TaskMaster taskMaster;
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskQueryTool taskQueryTool;
private final IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
private final TaskLogStreamer taskLogStreamer;
private final JacksonConfigManager configManager;
Expand Down Expand Up @@ -162,7 +162,7 @@ private static TaskStateLookup fromString(@Nullable String state)
@Inject
public OverlordResource(
TaskMaster taskMaster,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskQueryTool taskQueryTool,
IndexerMetadataStorageAdapter indexerMetadataStorageAdapter,
TaskLogStreamer taskLogStreamer,
JacksonConfigManager configManager,
Expand All @@ -174,7 +174,7 @@ public OverlordResource(
)
{
this.taskMaster = taskMaster;
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskQueryTool = taskQueryTool;
this.indexerMetadataStorageAdapter = indexerMetadataStorageAdapter;
this.taskLogStreamer = taskLogStreamer;
this.configManager = configManager;
Expand Down Expand Up @@ -284,7 +284,7 @@ public Response getDatasourceLockedIntervals(Map<String, Integer> minTaskPriorit
}

// Build the response
return Response.ok(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)).build();
return Response.ok(taskQueryTool.getLockedIntervals(minTaskPriority)).build();
}

@POST
Expand All @@ -298,7 +298,7 @@ public Response getDatasourceLockedIntervalsV2(List<LockFilterPolicy> lockFilter
}

// Build the response
return Response.ok(taskStorageQueryAdapter.getLockedIntervals(lockFilterPolicies)).build();
return Response.ok(taskQueryTool.getLockedIntervals(lockFilterPolicies)).build();
}

@GET
Expand All @@ -309,7 +309,7 @@ public Response getTaskPayload(@PathParam("taskid") String taskid)
{
final TaskPayloadResponse response = new TaskPayloadResponse(
taskid,
taskStorageQueryAdapter.getTask(taskid).orNull()
taskQueryTool.getTask(taskid).orNull()
);

final Response.Status status = response.getPayload() == null
Expand All @@ -325,7 +325,7 @@ public Response getTaskPayload(@PathParam("taskid") String taskid)
@ResourceFilters(TaskResourceFilter.class)
public Response getTaskStatus(@PathParam("taskid") String taskid)
{
final TaskInfo<Task, TaskStatus> taskInfo = taskStorageQueryAdapter.getTaskInfo(taskid);
final TaskInfo<Task, TaskStatus> taskInfo = taskQueryTool.getTaskInfo(taskid);
TaskStatusResponse response = null;

if (taskInfo != null) {
Expand Down Expand Up @@ -440,7 +440,7 @@ public Response shutdownTasksForDataSource(@PathParam("dataSource") final String
@Override
public Response apply(TaskQueue taskQueue)
{
final List<TaskInfo<Task, TaskStatus>> tasks = taskStorageQueryAdapter.getActiveTaskInfo(dataSource);
final List<TaskInfo<Task, TaskStatus>> tasks = taskQueryTool.getActiveTaskInfo(dataSource);
if (tasks.isEmpty()) {
return Response.status(Status.NOT_FOUND).build();
} else {
Expand Down Expand Up @@ -471,7 +471,7 @@ public Response getMultipleTaskStatuses(Set<String> taskIds)
if (taskQueue.isPresent()) {
optional = taskQueue.get().getTaskStatus(taskId);
} else {
optional = taskStorageQueryAdapter.getStatus(taskId);
optional = taskQueryTool.getStatus(taskId);
}
if (optional.isPresent()) {
result.put(taskId, optional.get());
Expand Down Expand Up @@ -866,7 +866,7 @@ private Stream<TaskStatusPlus> getTaskStatusPlusList(
throw new IAE("Unknown state: [%s]", state);
}

final Stream<TaskStatusPlus> taskStatusPlusStream = taskStorageQueryAdapter.getTaskStatusPlusList(
final Stream<TaskStatusPlus> taskStatusPlusStream = taskQueryTool.getTaskStatusPlusList(
taskLookups,
dataSource
).stream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.sun.jersey.spi.container.ContainerRequest;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.http.security.AbstractResourceFilter;
import org.apache.druid.server.security.Access;
Expand All @@ -49,16 +49,16 @@
*/
public class TaskResourceFilter extends AbstractResourceFilter
{
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskQueryTool taskQueryTool;

@Inject
public TaskResourceFilter(
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskQueryTool taskQueryTool,
AuthorizerMapper authorizerMapper
)
{
super(authorizerMapper);
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskQueryTool = taskQueryTool;
}

@Override
Expand All @@ -76,7 +76,7 @@ public ContainerRequest filter(ContainerRequest request)

IdUtils.validateId("taskId", taskId);

Optional<Task> taskOptional = taskStorageQueryAdapter.getTask(taskId);
Optional<Task> taskOptional = taskQueryTool.getTask(taskId);
if (!taskOptional.isPresent()) {
throw new WebApplicationException(
Response.status(Response.Status.NOT_FOUND)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.TaskLookup;
import org.easymock.EasyMock;
import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
Expand All @@ -39,17 +40,17 @@

public class IndexerMetadataStorageAdapterTest
{
private TaskStorageQueryAdapter taskStorageQueryAdapter;
private TaskStorage taskStorage;
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;

@Before
public void setup()
{
indexerMetadataStorageCoordinator = EasyMock.strictMock(IndexerMetadataStorageCoordinator.class);
taskStorageQueryAdapter = EasyMock.strictMock(TaskStorageQueryAdapter.class);
taskStorage = EasyMock.strictMock(TaskStorage.class);
indexerMetadataStorageAdapter = new IndexerMetadataStorageAdapter(
taskStorageQueryAdapter,
taskStorage,
indexerMetadataStorageCoordinator
);
}
Expand All @@ -73,7 +74,7 @@ public void testDeletePendingSegments()
NoopTask.create()
)
);
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);
EasyMock.expect(taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "dataSource")).andReturn(taskInfos);

final Interval deleteInterval = Intervals.of("2017-01-01/2017-12-01");
EasyMock
Expand All @@ -84,7 +85,7 @@ public void testDeletePendingSegments()
)
)
.andReturn(10);
EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator);
EasyMock.replay(taskStorage, indexerMetadataStorageCoordinator);

Assert.assertEquals(10, indexerMetadataStorageAdapter.deletePendingSegments("dataSource", deleteInterval));
}
Expand All @@ -109,7 +110,8 @@ public void testDeletePendingSegmentsOfOneOverlappingRunningTask()
)
);

EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);
EasyMock.expect(taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "dataSource"))
.andReturn(taskInfos);

final Interval deleteInterval = Intervals.of("2017-01-01/2017-12-01");
EasyMock
Expand All @@ -120,7 +122,7 @@ public void testDeletePendingSegmentsOfOneOverlappingRunningTask()
)
)
.andReturn(10);
EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator);
EasyMock.replay(taskStorage, indexerMetadataStorageCoordinator);

MatcherAssert.assertThat(
Assert.assertThrows(
Expand Down Expand Up @@ -155,7 +157,8 @@ public void testDeletePendingSegmentsOfMultipleOverlappingRunningTasks()
)
);

EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);
EasyMock.expect(taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "dataSource"))
.andReturn(taskInfos);

final Interval deleteInterval = Intervals.of("2017-01-01/2018-12-01");
EasyMock
Expand All @@ -166,7 +169,7 @@ public void testDeletePendingSegmentsOfMultipleOverlappingRunningTasks()
)
)
.andReturn(10);
EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator);
EasyMock.replay(taskStorage, indexerMetadataStorageCoordinator);

MatcherAssert.assertThat(
Assert.assertThrows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2)
private final String taskStorageType;

private ObjectMapper mapper;
private TaskStorageQueryAdapter tsqa = null;
private TaskQueryTool tsqa = null;
private TaskStorage taskStorage = null;
private TaskLockbox taskLockbox = null;
private TaskQueue taskQueue = null;
Expand Down Expand Up @@ -477,7 +477,7 @@ private TaskStorage setUpTaskStorage()
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster);
tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster);
tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster);
return taskStorage;
}

Expand Down
Loading

0 comments on commit 89066b7

Please sign in to comment.