diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedFromSegmentIdsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedFromSegmentIdsAction.java index 67f7ae6e1317..4f54131dee6f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedFromSegmentIdsAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedFromSegmentIdsAction.java @@ -73,12 +73,6 @@ public UpgradedFromSegmentsResponse perform(Task task, TaskActionToolbox toolbox ); } - @Override - public boolean isAudited() - { - return false; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedToSegmentIdsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedToSegmentIdsAction.java index 412c9604d114..256a663a1a47 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedToSegmentIdsAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUpgradedToSegmentIdsAction.java @@ -78,12 +78,6 @@ public UpgradedToSegmentsResponse perform(Task task, TaskActionToolbox toolbox) ); } - @Override - public boolean isAudited() - { - return false; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java index 6d23fcec327e..c3e498aa18f7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapter.java @@ -25,6 +25,7 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.metadata.TaskLookup; import org.joda.time.Interval; import java.util.Comparator; @@ -32,16 +33,16 @@ public class IndexerMetadataStorageAdapter { - private final TaskStorageQueryAdapter taskStorageQueryAdapter; + private final TaskStorage taskStorage; private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; @Inject public IndexerMetadataStorageAdapter( - TaskStorageQueryAdapter taskStorageQueryAdapter, + TaskStorage taskStorage, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator ) { - this.taskStorageQueryAdapter = taskStorageQueryAdapter; + this.taskStorage = taskStorage; this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator; } @@ -49,8 +50,8 @@ public int deletePendingSegments(String dataSource, Interval deleteInterval) { // Find the earliest active task created for the specified datasource; if one exists, // check if its interval overlaps with the delete interval. - final Optional> earliestActiveTaskOptional = taskStorageQueryAdapter - .getActiveTaskInfo(dataSource) + final Optional> earliestActiveTaskOptional = taskStorage + .getTaskInfos(TaskLookup.activeTasksOnly(), dataSource) .stream() .min(Comparator.comparing(TaskInfo::getCreatedTime)); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java similarity index 81% rename from indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java rename to indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java index 8a454f5a231b..29ca16f5aa95 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -27,7 +27,6 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.TaskLookup; -import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.joda.time.Interval; @@ -36,25 +35,23 @@ import java.util.Map; /** - * Wraps a {@link TaskStorage}, providing a useful collection of read-only methods. + * Provides read-only methods to fetch information related to tasks. + * This class may serve information that is cached in memory in {@link TaskQueue} + * or {@link TaskLockbox}. If not present in memory, then the underlying + * {@link TaskStorage} is queried. */ -public class TaskStorageQueryAdapter +public class TaskQueryTool { private final TaskStorage storage; private final TaskLockbox taskLockbox; - private final Optional taskQueue; + private final TaskMaster taskMaster; @Inject - public TaskStorageQueryAdapter(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster) + public TaskQueryTool(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster) { this.storage = storage; this.taskLockbox = taskLockbox; - this.taskQueue = taskMaster.getTaskQueue(); - } - - public List getActiveTasks() - { - return storage.getActiveTasks(); + this.taskMaster = taskMaster; } /** @@ -85,7 +82,7 @@ public Map> getLockedIntervals(Map minTa public List> getActiveTaskInfo(@Nullable String dataSource) { return storage.getTaskInfos( - ActiveTaskLookup.getInstance(), + TaskLookup.activeTasksOnly(), dataSource ); } @@ -98,20 +95,21 @@ public List getTaskStatusPlusList( return storage.getTaskStatusPlusList(taskLookups, dataSource); } - public Optional getTask(final String taskid) + public Optional getTask(final String taskId) { + final Optional taskQueue = taskMaster.getTaskQueue(); if (taskQueue.isPresent()) { - Optional activeTask = taskQueue.get().getActiveTask(taskid); + Optional activeTask = taskQueue.get().getActiveTask(taskId); if (activeTask.isPresent()) { return activeTask; } } - return storage.getTask(taskid); + return storage.getTask(taskId); } - public Optional getStatus(final String taskid) + public Optional getStatus(final String taskId) { - return storage.getStatus(taskid); + return storage.getStatus(taskId); } @Nullable diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 07155f00462d..f945cc95c1e0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -47,10 +47,10 @@ import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.TaskQueue; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; -import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter; import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; @@ -125,7 +125,7 @@ public class OverlordResource private static final Logger log = new Logger(OverlordResource.class); private final TaskMaster taskMaster; - private final TaskStorageQueryAdapter taskStorageQueryAdapter; + private final TaskQueryTool taskQueryTool; private final IndexerMetadataStorageAdapter indexerMetadataStorageAdapter; private final TaskLogStreamer taskLogStreamer; private final JacksonConfigManager configManager; @@ -162,7 +162,7 @@ private static TaskStateLookup fromString(@Nullable String state) @Inject public OverlordResource( TaskMaster taskMaster, - TaskStorageQueryAdapter taskStorageQueryAdapter, + TaskQueryTool taskQueryTool, IndexerMetadataStorageAdapter indexerMetadataStorageAdapter, TaskLogStreamer taskLogStreamer, JacksonConfigManager configManager, @@ -174,7 +174,7 @@ public OverlordResource( ) { this.taskMaster = taskMaster; - this.taskStorageQueryAdapter = taskStorageQueryAdapter; + this.taskQueryTool = taskQueryTool; this.indexerMetadataStorageAdapter = indexerMetadataStorageAdapter; this.taskLogStreamer = taskLogStreamer; this.configManager = configManager; @@ -284,7 +284,7 @@ public Response getDatasourceLockedIntervals(Map minTaskPriorit } // Build the response - return Response.ok(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)).build(); + return Response.ok(taskQueryTool.getLockedIntervals(minTaskPriority)).build(); } @POST @@ -298,7 +298,7 @@ public Response getDatasourceLockedIntervalsV2(List lockFilter } // Build the response - return Response.ok(taskStorageQueryAdapter.getLockedIntervals(lockFilterPolicies)).build(); + return Response.ok(taskQueryTool.getLockedIntervals(lockFilterPolicies)).build(); } @GET @@ -309,7 +309,7 @@ public Response getTaskPayload(@PathParam("taskid") String taskid) { final TaskPayloadResponse response = new TaskPayloadResponse( taskid, - taskStorageQueryAdapter.getTask(taskid).orNull() + taskQueryTool.getTask(taskid).orNull() ); final Response.Status status = response.getPayload() == null @@ -325,7 +325,7 @@ public Response getTaskPayload(@PathParam("taskid") String taskid) @ResourceFilters(TaskResourceFilter.class) public Response getTaskStatus(@PathParam("taskid") String taskid) { - final TaskInfo taskInfo = taskStorageQueryAdapter.getTaskInfo(taskid); + final TaskInfo taskInfo = taskQueryTool.getTaskInfo(taskid); TaskStatusResponse response = null; if (taskInfo != null) { @@ -440,7 +440,7 @@ public Response shutdownTasksForDataSource(@PathParam("dataSource") final String @Override public Response apply(TaskQueue taskQueue) { - final List> tasks = taskStorageQueryAdapter.getActiveTaskInfo(dataSource); + final List> tasks = taskQueryTool.getActiveTaskInfo(dataSource); if (tasks.isEmpty()) { return Response.status(Status.NOT_FOUND).build(); } else { @@ -471,7 +471,7 @@ public Response getMultipleTaskStatuses(Set taskIds) if (taskQueue.isPresent()) { optional = taskQueue.get().getTaskStatus(taskId); } else { - optional = taskStorageQueryAdapter.getStatus(taskId); + optional = taskQueryTool.getStatus(taskId); } if (optional.isPresent()) { result.put(taskId, optional.get()); @@ -866,7 +866,7 @@ private Stream getTaskStatusPlusList( throw new IAE("Unknown state: [%s]", state); } - final Stream taskStatusPlusStream = taskStorageQueryAdapter.getTaskStatusPlusList( + final Stream taskStatusPlusStream = taskQueryTool.getTaskStatusPlusList( taskLookups, dataSource ).stream(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java index ad1f526eda64..a9f66ce30e72 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java @@ -26,7 +26,7 @@ import com.sun.jersey.spi.container.ContainerRequest; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter; +import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.http.security.AbstractResourceFilter; import org.apache.druid.server.security.Access; @@ -49,16 +49,16 @@ */ public class TaskResourceFilter extends AbstractResourceFilter { - private final TaskStorageQueryAdapter taskStorageQueryAdapter; + private final TaskQueryTool taskQueryTool; @Inject public TaskResourceFilter( - TaskStorageQueryAdapter taskStorageQueryAdapter, + TaskQueryTool taskQueryTool, AuthorizerMapper authorizerMapper ) { super(authorizerMapper); - this.taskStorageQueryAdapter = taskStorageQueryAdapter; + this.taskQueryTool = taskQueryTool; } @Override @@ -76,7 +76,7 @@ public ContainerRequest filter(ContainerRequest request) IdUtils.validateId("taskId", taskId); - Optional taskOptional = taskStorageQueryAdapter.getTask(taskId); + Optional taskOptional = taskQueryTool.getTask(taskId); if (!taskOptional.isPresent()) { throw new WebApplicationException( Response.status(Response.Status.NOT_FOUND) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java index 633d861410d8..5226c9d735b8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.metadata.TaskLookup; import org.easymock.EasyMock; import org.hamcrest.MatcherAssert; import org.joda.time.Interval; @@ -39,7 +40,7 @@ public class IndexerMetadataStorageAdapterTest { - private TaskStorageQueryAdapter taskStorageQueryAdapter; + private TaskStorage taskStorage; private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter; @@ -47,9 +48,9 @@ public class IndexerMetadataStorageAdapterTest public void setup() { indexerMetadataStorageCoordinator = EasyMock.strictMock(IndexerMetadataStorageCoordinator.class); - taskStorageQueryAdapter = EasyMock.strictMock(TaskStorageQueryAdapter.class); + taskStorage = EasyMock.strictMock(TaskStorage.class); indexerMetadataStorageAdapter = new IndexerMetadataStorageAdapter( - taskStorageQueryAdapter, + taskStorage, indexerMetadataStorageCoordinator ); } @@ -73,7 +74,7 @@ public void testDeletePendingSegments() NoopTask.create() ) ); - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos); + EasyMock.expect(taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "dataSource")).andReturn(taskInfos); final Interval deleteInterval = Intervals.of("2017-01-01/2017-12-01"); EasyMock @@ -84,7 +85,7 @@ public void testDeletePendingSegments() ) ) .andReturn(10); - EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator); + EasyMock.replay(taskStorage, indexerMetadataStorageCoordinator); Assert.assertEquals(10, indexerMetadataStorageAdapter.deletePendingSegments("dataSource", deleteInterval)); } @@ -109,7 +110,8 @@ public void testDeletePendingSegmentsOfOneOverlappingRunningTask() ) ); - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos); + EasyMock.expect(taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "dataSource")) + .andReturn(taskInfos); final Interval deleteInterval = Intervals.of("2017-01-01/2017-12-01"); EasyMock @@ -120,7 +122,7 @@ public void testDeletePendingSegmentsOfOneOverlappingRunningTask() ) ) .andReturn(10); - EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator); + EasyMock.replay(taskStorage, indexerMetadataStorageCoordinator); MatcherAssert.assertThat( Assert.assertThrows( @@ -155,7 +157,8 @@ public void testDeletePendingSegmentsOfMultipleOverlappingRunningTasks() ) ); - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos); + EasyMock.expect(taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "dataSource")) + .andReturn(taskInfos); final Interval deleteInterval = Intervals.of("2017-01-01/2018-12-01"); EasyMock @@ -166,7 +169,7 @@ public void testDeletePendingSegmentsOfMultipleOverlappingRunningTasks() ) ) .andReturn(10); - EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator); + EasyMock.replay(taskStorage, indexerMetadataStorageCoordinator); MatcherAssert.assertThat( Assert.assertThrows( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 4384be25bc1d..7b1209e79291 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -234,7 +234,7 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2) private final String taskStorageType; private ObjectMapper mapper; - private TaskStorageQueryAdapter tsqa = null; + private TaskQueryTool tsqa = null; private TaskStorage taskStorage = null; private TaskLockbox taskLockbox = null; private TaskQueue taskQueue = null; @@ -477,7 +477,7 @@ private TaskStorage setUpTaskStorage() TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes(); EasyMock.replay(taskMaster); - tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster); + tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster); return taskStorage; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 3af672a665b5..4f2c3a38794f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -42,10 +42,10 @@ import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.TaskQueue; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; -import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter; import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; import org.apache.druid.indexing.overlord.autoscaling.AutoScaler; @@ -106,7 +106,7 @@ public class OverlordResourceTest private JacksonConfigManager configManager; private ProvisioningStrategy provisioningStrategy; private AuthConfig authConfig; - private TaskStorageQueryAdapter taskStorageQueryAdapter; + private TaskQueryTool taskQueryTool; private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter; private HttpServletRequest req; private TaskRunner taskRunner; @@ -126,7 +126,7 @@ public void setUp() provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class); authConfig = EasyMock.createMock(AuthConfig.class); taskMaster = EasyMock.createStrictMock(TaskMaster.class); - taskStorageQueryAdapter = EasyMock.createStrictMock(TaskStorageQueryAdapter.class); + taskQueryTool = EasyMock.createStrictMock(TaskQueryTool.class); indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class); req = EasyMock.createStrictMock(HttpServletRequest.class); workerTaskRunnerQueryAdapter = EasyMock.createStrictMock(WorkerTaskRunnerQueryAdapter.class); @@ -171,7 +171,7 @@ public Access authorize(AuthenticationResult authenticationResult, Resource reso overlordResource = new OverlordResource( taskMaster, - taskStorageQueryAdapter, + taskQueryTool, indexerMetadataStorageAdapter, null, configManager, @@ -189,7 +189,7 @@ public void tearDown() EasyMock.verify( taskRunner, taskMaster, - taskStorageQueryAdapter, + taskQueryTool, indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter, @@ -202,7 +202,7 @@ private void replayAll() EasyMock.replay( taskRunner, taskMaster, - taskStorageQueryAdapter, + taskQueryTool, indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter, @@ -247,7 +247,7 @@ public void testSecuredGetWaitingTask() { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskStatusPlusList( + taskQueryTool.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) @@ -282,7 +282,7 @@ public void testSecuredGetCompleteTasks() List tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); EasyMock.expect( - taskStorageQueryAdapter.getTaskStatusPlusList( + taskQueryTool.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null) ).andStubReturn( ImmutableList.of( @@ -292,6 +292,7 @@ public void testSecuredGetCompleteTasks() ) ); replayAll(); + List responseObjects = (List) overlordResource .getCompleteTasks(null, req).getEntity(); @@ -312,7 +313,7 @@ public void testSecuredGetRunningTasks() ) ); EasyMock.expect( - taskStorageQueryAdapter.getTaskStatusPlusList( + taskQueryTool.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) @@ -339,7 +340,7 @@ public void testGetTasks() { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskStatusPlusList( + taskQueryTool.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance(), @@ -367,6 +368,7 @@ public void testGetTasks() ).atLeastOnce(); replayAll(); + List responseObjects = (List) overlordResource .getTasks(null, null, null, null, null, req) .getEntity(); @@ -379,7 +381,7 @@ public void testGetTasksFilterDataSource() expectAuthorizationTokenCheck(); //completed tasks EasyMock.expect( - taskStorageQueryAdapter.getTaskStatusPlusList( + taskQueryTool.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -421,7 +423,7 @@ public void testGetTasksFilterWaitingState() expectAuthorizationTokenCheck(); //active tasks EasyMock.expect( - taskStorageQueryAdapter.getTaskStatusPlusList( + taskQueryTool.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance() @@ -445,6 +447,7 @@ public void testGetTasksFilterWaitingState() ); replayAll(); + List responseObjects = (List) overlordResource .getTasks( "waiting", @@ -463,7 +466,7 @@ public void testGetTasksFilterRunningState() { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskStatusPlusList( + taskQueryTool.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance() @@ -513,7 +516,7 @@ public void testGetTasksFilterPendingState() ) ); EasyMock.expect( - taskStorageQueryAdapter.getTaskStatusPlusList( + taskQueryTool.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) @@ -547,7 +550,7 @@ public void testGetTasksFilterCompleteState() { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskStatusPlusList( + taskQueryTool.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null ) @@ -559,6 +562,7 @@ public void testGetTasksFilterCompleteState() ) ); replayAll(); + List responseObjects = (List) overlordResource .getTasks("complete", null, null, null, null, req) .getEntity(); @@ -573,7 +577,7 @@ public void testGetTasksFilterCompleteStateWithInterval() expectAuthorizationTokenCheck(); Duration duration = new Period("PT86400S").toStandardDuration(); EasyMock.expect( - taskStorageQueryAdapter.getTaskStatusPlusList( + taskQueryTool.getTaskStatusPlusList( EasyMock.anyObject(), EasyMock.anyObject() ) @@ -586,6 +590,7 @@ public void testGetTasksFilterCompleteStateWithInterval() ); replayAll(); + String interval = "2010-01-01_P1D"; List responseObjects = (List) overlordResource .getTasks("complete", null, interval, null, null, req) @@ -604,7 +609,7 @@ public void testGetTasksRequiresDatasourceRead() // Setup mocks to return completed, active, known, pending and running tasks EasyMock.expect( - taskStorageQueryAdapter.getTaskStatusPlusList( + taskQueryTool.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -653,7 +658,7 @@ public void testGetTasksFilterByTaskTypeRequiresDatasourceRead() // Setup mocks to return completed, active, known, pending and running tasks EasyMock.expect( - taskStorageQueryAdapter.getTaskStatusPlusList( + taskQueryTool.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -711,7 +716,7 @@ public void testGetCompleteTasksOfAllDatasources() { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskStatusPlusList( + taskQueryTool.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null) @@ -726,6 +731,7 @@ public void testGetCompleteTasksOfAllDatasources() ) ); replayAll(); + List responseObjects = (List) overlordResource .getTasks("complete", null, null, null, null, req) .getEntity(); @@ -738,6 +744,7 @@ public void testGetCompleteTasksOfAllDatasources() public void testGetTasksNegativeState() { replayAll(); + Object responseObject = overlordResource .getTasks("blah", "ds_test", null, null, null, req) .getEntity(); @@ -755,6 +762,7 @@ public void testSecuredTaskPost() EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false); replayAll(); + Task task = NoopTask.create(); overlordResource.taskPost(task, req); } @@ -936,10 +944,10 @@ public void testGetTaskPayload() throws Exception // This should be fixed in https://github.com/apache/druid/issues/6685. // expectAuthorizationTokenCheck(); final NoopTask task = NoopTask.create(); - EasyMock.expect(taskStorageQueryAdapter.getTask("mytask")) + EasyMock.expect(taskQueryTool.getTask("mytask")) .andReturn(Optional.of(task)); - EasyMock.expect(taskStorageQueryAdapter.getTask("othertask")) + EasyMock.expect(taskQueryTool.getTask("othertask")) .andReturn(Optional.absent()); replayAll(); @@ -970,7 +978,7 @@ public void testGetTaskStatus() throws Exception final String taskId = task.getId(); final TaskStatus status = TaskStatus.running(taskId); - EasyMock.expect(taskStorageQueryAdapter.getTaskInfo(taskId)) + EasyMock.expect(taskQueryTool.getTaskInfo(taskId)) .andReturn(new TaskInfo( task.getId(), DateTimes.of("2018-01-01"), @@ -979,7 +987,7 @@ public void testGetTaskStatus() throws Exception task )); - EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("othertask")) + EasyMock.expect(taskQueryTool.getTaskInfo("othertask")) .andReturn(null); EasyMock.>expect(taskRunner.getKnownTasks()) @@ -1034,7 +1042,7 @@ public void testGetLockedIntervals() throws Exception ) ); - EasyMock.expect(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)) + EasyMock.expect(taskQueryTool.getLockedIntervals(minTaskPriority)) .andReturn(expectedLockedIntervals); replayAll(); @@ -1104,7 +1112,7 @@ public void testShutdownAllTasks() EasyMock.expect(taskMaster.getTaskQueue()).andReturn( Optional.of(mockQueue) ).anyTimes(); - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("datasource")).andStubReturn(ImmutableList.of( + EasyMock.expect(taskQueryTool.getActiveTaskInfo("datasource")).andStubReturn(ImmutableList.of( new TaskInfo<>( "id_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -1140,7 +1148,7 @@ public void testShutdownAllTasksForNonExistingDataSource() final TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class); EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(EasyMock.anyString())).andReturn(Collections.emptyList()); + EasyMock.expect(taskQueryTool.getActiveTaskInfo(EasyMock.anyString())).andReturn(Collections.emptyList()); replayAll(); final Response response = overlordResource.shutdownTasksForDataSource("notExisting"); @@ -1232,6 +1240,7 @@ public void testGetTotalWorkerCapacityWithUnknown() EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); replayAll(); + final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); @@ -1247,6 +1256,7 @@ public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfi EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); replayAll(); + final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); @@ -1263,6 +1273,7 @@ public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigu EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); replayAll(); + final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); @@ -1471,15 +1482,16 @@ public void testGetMultipleTaskStatuses_absentTaskQueue() { replayAll(); - TaskStorageQueryAdapter taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class); - EasyMock.expect(taskStorageQueryAdapter.getStatus("task")) + TaskQueryTool taskQueryTool = EasyMock.createMock(TaskQueryTool.class); + EasyMock.expect(taskQueryTool.getStatus("task")) .andReturn(Optional.of(TaskStatus.running("task"))); + TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()); - EasyMock.replay(taskMaster, taskStorageQueryAdapter); + EasyMock.replay(taskMaster, taskQueryTool); OverlordResource overlordResource = new OverlordResource( taskMaster, - taskStorageQueryAdapter, + taskQueryTool, null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index aab3639b2e63..9bbf2e9fc8a5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -52,12 +52,12 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerFactory; import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskStorage; -import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; @@ -269,14 +269,14 @@ public void testOverlordRun() throws Exception Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort()); Assert.assertEquals(Optional.absent(), taskMaster.getRedirectLocation()); - final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster); + final TaskQueryTool taskQueryTool = new TaskQueryTool(taskStorage, taskLockbox, taskMaster); final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null); // Test Overlord resource stuff AuditManager auditManager = EasyMock.createNiceMock(AuditManager.class); overlordResource = new OverlordResource( taskMaster, - taskStorageQueryAdapter, - new IndexerMetadataStorageAdapter(taskStorageQueryAdapter, null), + taskQueryTool, + new IndexerMetadataStorageAdapter(taskStorage, null), null, null, auditManager, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java index 3b775ca94915..a25c6aec6215 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java @@ -26,7 +26,7 @@ import com.sun.jersey.spi.container.ResourceFilter; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter; +import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.http.OverlordResource; import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; @@ -60,7 +60,7 @@ public static Collection data() return ImmutableList.copyOf( Iterables.concat( getRequestPaths(OverlordResource.class, ImmutableList.of( - TaskStorageQueryAdapter.class, + TaskQueryTool.class, AuthorizerMapper.class ) ), @@ -84,7 +84,7 @@ public static Collection data() private static boolean mockedOnceTsqa; private static boolean mockedOnceSM; - private TaskStorageQueryAdapter tsqa; + private TaskQueryTool tsqa; private SupervisorManager supervisorManager; public OverlordSecurityResourceFilterTest( @@ -107,7 +107,7 @@ public void setUp() // Since we are creating the mocked tsqa object only once and getting that object from Guice here therefore // if the mockedOnce check is not done then we will call EasyMock.expect and EasyMock.replay on the mocked object // multiple times and it will throw exceptions - tsqa = injector.getInstance(TaskStorageQueryAdapter.class); + tsqa = injector.getInstance(TaskQueryTool.class); EasyMock.expect(tsqa.getTask(EasyMock.anyString())).andReturn(Optional.of(noopTask)).anyTimes(); EasyMock.replay(tsqa); mockedOnceTsqa = true; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilterTest.java index 59abcaacdf80..8a7684b5ccd0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilterTest.java @@ -21,7 +21,7 @@ import com.google.common.base.Optional; import com.sun.jersey.spi.container.ContainerRequest; -import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter; +import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.server.security.AuthorizerMapper; import org.easymock.EasyMock; @@ -42,7 +42,7 @@ public class TaskResourceFilterTest { private AuthorizerMapper authorizerMapper; - private TaskStorageQueryAdapter taskStorageQueryAdapter; + private TaskQueryTool taskQueryTool; private ContainerRequest containerRequest; private TaskResourceFilter resourceFilter; @@ -50,9 +50,9 @@ public class TaskResourceFilterTest public void setup() { authorizerMapper = EasyMock.createMock(AuthorizerMapper.class); - taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class); + taskQueryTool = EasyMock.createMock(TaskQueryTool.class); containerRequest = EasyMock.createMock(ContainerRequest.class); - resourceFilter = new TaskResourceFilter(taskStorageQueryAdapter, authorizerMapper); + resourceFilter = new TaskResourceFilter(taskQueryTool, authorizerMapper); } @Test @@ -68,11 +68,11 @@ public void testTaskNotFound() expect(supervisorSpec.getDataSources()) .andReturn(Collections.singletonList(taskId)) .anyTimes(); - expect(taskStorageQueryAdapter.getTask(taskId)) + expect(taskQueryTool.getTask(taskId)) .andReturn(Optional.absent()) .atLeastOnce(); EasyMock.replay(containerRequest); - EasyMock.replay(taskStorageQueryAdapter); + EasyMock.replay(taskQueryTool); WebApplicationException expected = null; try { @@ -84,7 +84,7 @@ public void testTaskNotFound() Assert.assertNotNull(expected); Assert.assertEquals(expected.getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); EasyMock.verify(containerRequest); - EasyMock.verify(taskStorageQueryAdapter); + EasyMock.verify(taskQueryTool); } private List getPathSegments(String path) diff --git a/processing/src/main/java/org/apache/druid/metadata/TaskLookup.java b/processing/src/main/java/org/apache/druid/metadata/TaskLookup.java index 75bbb32431c4..6bd4280c5a3d 100644 --- a/processing/src/main/java/org/apache/druid/metadata/TaskLookup.java +++ b/processing/src/main/java/org/apache/druid/metadata/TaskLookup.java @@ -43,6 +43,11 @@ enum TaskLookupType COMPLETE } + static TaskLookup activeTasksOnly() + { + return ActiveTaskLookup.getInstance(); + } + /** * Whether this lookup is guaranteed to not return any tasks. */ diff --git a/processing/src/test/java/org/apache/druid/metadata/TaskLookupTest.java b/processing/src/test/java/org/apache/druid/metadata/TaskLookupTest.java index 76d90737a5f5..fe372558b64d 100644 --- a/processing/src/test/java/org/apache/druid/metadata/TaskLookupTest.java +++ b/processing/src/test/java/org/apache/druid/metadata/TaskLookupTest.java @@ -130,7 +130,7 @@ public void testSingleton() @Test public void testGetType() { - Assert.assertEquals(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance().getType()); + Assert.assertEquals(TaskLookupType.ACTIVE, TaskLookup.activeTasksOnly().getType()); } } } diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index da4b7e02714e..4dd23f2faf2a 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -74,9 +74,9 @@ import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.TaskRunnerFactory; import org.apache.druid.indexing.overlord.TaskStorage; -import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter; import org.apache.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerProvisioningConfig; import org.apache.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig; @@ -229,7 +229,7 @@ public void configure(Binder binder) binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class); binder.bind(TaskActionToolbox.class).in(LazySingleton.class); binder.bind(TaskLockbox.class).in(LazySingleton.class); - binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class); + binder.bind(TaskQueryTool.class).in(LazySingleton.class); binder.bind(IndexerMetadataStorageAdapter.class).in(LazySingleton.class); binder.bind(SupervisorManager.class).in(LazySingleton.class);