From 197fe5ddbf5a5ae29b930562308fed21eac85b66 Mon Sep 17 00:00:00 2001
From: Kashif Faraz
Date: Sun, 21 Jul 2024 20:06:59 +0530
Subject: [PATCH] Handle success and failure

---
 .../compact/CompactionSchedulerImpl.java    |   4 +-
 .../compact/CompactionStatusTracker.java    | 136 ++++++++++++++----
 .../coordinator/duty/CompactSegments.java   |   1 +
 .../http/CompactionConfigUpdateRequest.java |   5 +
 .../org/apache/druid/cli/CliOverlord.java   |   6 +-
 5 files changed, 118 insertions(+), 34 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSchedulerImpl.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSchedulerImpl.java
index c700a46696d8..21683bf6daa3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSchedulerImpl.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSchedulerImpl.java
@@ -78,7 +78,7 @@
  * - [x] route compaction status API to overlord if scheduler is enabled
  * - [x] skip run on coordinator if scheduler is enabled
  * - [x] task state listener
- * - [ ] handle success and failure inside DatasourceQueue
+ * - [x] handle success and failure inside CompactionStatusTracker
  * - [x] make policy serializable
  * - [ ] handle priority datasource in policy
  * - [ ] add another policy
@@ -261,8 +261,6 @@ private synchronized void processCompactionQueue(
       CoordinatorCompactionConfig currentConfig
   )
   {
-    statusTracker.onCompactionConfigUpdated(currentConfig);
-
     DataSourcesSnapshot dataSourcesSnapshot = segmentManager.getSnapshotOfDataSourcesWithAllUsedSegments();
     final CoordinatorRunStats stats = new CoordinatorRunStats();

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatusTracker.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatusTracker.java
index 3bf5bc65d4e3..88118987baf0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatusTracker.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatusTracker.java
@@ -28,7 +28,6 @@
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.joda.time.Interval;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -44,7 +43,8 @@ public class CompactionStatusTracker
   private static final Logger log = new Logger(CompactionStatusTracker.class);
 
   private final ObjectMapper objectMapper;
-  private final Map<String, Set<Interval>> datasourceToRecentlySubmittedIntervals = new HashMap<>();
+  private final Map<String, DatasourceStatus> datasourceStatuses = new HashMap<>();
+  private final Map<String, ClientCompactionTaskQuery> submittedTaskIdToPayload = new HashMap<>();
 
   @Inject
   public CompactionStatusTracker(
@@ -59,9 +59,9 @@ public CompactionStatus computeCompactionStatus(
       DataSourceCompactionConfig config
   )
   {
-    final CompactionStatus status = CompactionStatus.compute(candidate, config, objectMapper);
-    if (status.isComplete()) {
-      return status;
+    final CompactionStatus compactionStatus = CompactionStatus.compute(candidate, config, objectMapper);
+    if (compactionStatus.isComplete()) {
+      return compactionStatus;
     }
 
     final long inputSegmentSize = config.getInputSegmentSizeBytes();
@@ -72,16 +72,30 @@ public CompactionStatus computeCompactionStatus(
       );
     }
 
-    final Set<Interval> recentlySubmittedIntervals
-        = datasourceToRecentlySubmittedIntervals.getOrDefault(config.getDataSource(), Collections.emptySet());
-    if (recentlySubmittedIntervals.contains(candidate.getUmbrellaInterval())) {
-      return CompactionStatus.skipped(
-          "Interval[%s] has been recently submitted for compaction",
-          candidate.getUmbrellaInterval()
-      );
+    final Interval compactionInterval = candidate.getUmbrellaInterval();
+
+    final IntervalStatus intervalStatus
+        = datasourceStatuses.getOrDefault(config.getDataSource(), DatasourceStatus.EMPTY)
+                            .getIntervalStatuses()
+                            .get(compactionInterval);
+
+    if (intervalStatus == null) {
+      return compactionStatus;
     }
 
-    return status;
+    switch (intervalStatus.state) {
+      case TASK_SUBMITTED:
+      case COMPACTED:
+      case FAILED_ALL_RETRIES:
+        return CompactionStatus.skipped(
+            "Interval[%s] was recently submitted for compaction and has state[%s].",
+            compactionInterval, intervalStatus.state
+        );
+      default:
+        break;
+    }
+
+    return compactionStatus;
   }
 
   public void onCompactionConfigUpdated(CoordinatorCompactionConfig compactionConfig)
@@ -94,10 +108,10 @@ public void onCompactionConfigUpdated(CoordinatorCompactionConf
     }
 
     // Clean up state for datasources where compaction has been freshly disabled
-    final Set<String> allDatasources = new HashSet<>(datasourceToRecentlySubmittedIntervals.keySet());
+    final Set<String> allDatasources = new HashSet<>(datasourceStatuses.keySet());
     allDatasources.forEach(datasource -> {
       if (!compactionEnabledDatasources.contains(datasource)) {
-        datasourceToRecentlySubmittedIntervals.remove(datasource);
+        datasourceStatuses.remove(datasource);
       }
     });
   }
@@ -107,27 +121,97 @@ public void onTaskSubmitted(
       SegmentsToCompact candidateSegments
   )
   {
-    datasourceToRecentlySubmittedIntervals
-        .computeIfAbsent(taskPayload.getDataSource(), ds -> new HashSet<>())
-        .add(candidateSegments.getUmbrellaInterval());
+    final DatasourceStatus datasourceStatus = getOrComputeDatasourceStatus(taskPayload.getDataSource());
+
+    datasourceStatus.getIntervalStatuses().computeIfAbsent(
+        candidateSegments.getUmbrellaInterval(),
+        i -> new IntervalStatus(IntervalState.TASK_SUBMITTED, 0)
+    );
+
+    submittedTaskIdToPayload.put(taskPayload.getId(), taskPayload);
   }
 
   public void onTaskFinished(String taskId, TaskStatus taskStatus)
   {
-    log.info("Task[%s] has new status[%s].", taskId, taskStatus);
+    if (!taskStatus.isComplete()) {
+      return;
+    }
 
-    if (taskStatus.isFailure()) {
-      // TODO: retry logic and other stuff
+    final ClientCompactionTaskQuery taskPayload = submittedTaskIdToPayload.remove(taskId);
+    if (taskPayload == null) {
+      // Nothing to do since we don't know the corresponding datasource or interval
+      return;
     }
-
-    // Do not remove the interval of this task from recently submitted intervals
-    // as there might be some delay before the segments of this task are published
-    // and updated in the timeline. If compaction duty runs before the segments
-    // are published, it might re-submit the same task.
+    final DatasourceStatus datasourceStatus
+        = getOrComputeDatasourceStatus(taskPayload.getDataSource());
+    final Interval compactionInterval = taskPayload.getIoConfig().getInputSpec().getInterval();
+
+    final IntervalStatus lastKnownStatus = datasourceStatus.getIntervalStatuses().get(compactionInterval);
+
+    if (taskStatus.isSuccess()) {
+      // Keep skipping this interval for a while, as there might be some delay
+      // before the new segments are published and updated in the timeline
+      datasourceStatus.getIntervalStatuses().put(
+          compactionInterval,
+          new IntervalStatus(IntervalState.COMPACTED, 10)
+      );
+    } else if (lastKnownStatus == null || lastKnownStatus.state != IntervalState.FAILED) {
+      // This is the first failure for this interval (the last known state, if any, is TASK_SUBMITTED)
+      datasourceStatus.getIntervalStatuses().put(
+          compactionInterval,
+          new IntervalStatus(IntervalState.FAILED, 0)
+      );
+    } else if (++lastKnownStatus.retryCount > 10) {
+      // Failure retries have been exhausted
+      datasourceStatus.getIntervalStatuses().put(
+          compactionInterval,
+          new IntervalStatus(IntervalState.FAILED_ALL_RETRIES, 10)
+      );
+    }
   }
 
   public void reset()
   {
-    datasourceToRecentlySubmittedIntervals.clear();
+    datasourceStatuses.clear();
+  }
+
+  private DatasourceStatus getOrComputeDatasourceStatus(String datasource)
+  {
+    return datasourceStatuses.computeIfAbsent(datasource, ds -> new DatasourceStatus());
+  }
+
+  private static class DatasourceStatus
+  {
+    static final DatasourceStatus EMPTY = new DatasourceStatus();
+
+    final Map<Interval, IntervalStatus> intervalStatuses = new HashMap<>();
+
+    Map<Interval, IntervalStatus> getIntervalStatuses()
+    {
+      return intervalStatuses;
+    }
+  }
+
+  private static class IntervalStatus
+  {
+    final IntervalState state;
+
+    // Number of scheduler turns for which this interval should still be skipped;
+    // decremented via markSkipped() (not wired up yet in this patch)
+    int turnsToSkip;
+    int retryCount;
+
+    IntervalStatus(IntervalState state, int turnsToSkip)
+    {
+      this.state = state;
+      this.turnsToSkip = turnsToSkip;
+    }
+
+    void markSkipped()
+    {
+      this.turnsToSkip--;
+    }
+  }
+
+  private enum IntervalState
+  {
+    TASK_SUBMITTED, COMPACTED, FAILED, FAILED_ALL_RETRIES
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 8963e96fc455..cecc30672154 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -116,6 +116,7 @@ public OverlordClient getOverlordClient()
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   {
+    statusTracker.reset();
     run(
         params.getCoordinatorCompactionConfig(),
         params.getUsedSegmentsTimelinesPerDataSource(),
diff --git a/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java b/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java
index e74d3482e751..dc9d62a55f27 100644
--- a/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java
+++ b/server/src/main/java/org/apache/druid/server/http/CompactionConfigUpdateRequest.java
@@ -50,30 +50,35 @@ public CompactionConfigUpdateRequest(
     this.compactionPolicy = compactionPolicy;
   }
 
+  @Nullable
   @JsonProperty
   public Double getCompactionTaskSlotRatio()
   {
     return compactionTaskSlotRatio;
   }
 
+  @Nullable
   @JsonProperty
   public Integer getMaxCompactionTaskSlots()
   {
     return maxCompactionTaskSlots;
   }
 
+  @Nullable
   @JsonProperty
   public Boolean getUseAutoScaleSlots()
   {
     return useAutoScaleSlots;
   }
 
+  @Nullable
   @JsonProperty
   public CompactionEngine getCompactionEngine()
   {
     return compactionEngine;
   }
 
+  @Nullable
   @JsonProperty
   public CompactionSegmentSearchPolicy getCompactionPolicy()
   {
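
Note (illustration, not part of the patch): with every getter annotated @Nullable,
CompactionConfigUpdateRequest reads as a partial-update payload in which an omitted
JSON field simply deserializes to null. A minimal sketch of server-side merge logic
consistent with that reading follows; the class and method names are hypothetical,
and the assumption that null means "keep the current value" is mine, not the patch's.

    import javax.annotation.Nullable;

    class PartialUpdateSketch
    {
      // Prefer the explicitly requested value; otherwise keep the current one.
      static <T> T orCurrent(@Nullable T requested, T current)
      {
        return requested == null ? current : requested;
      }

      // Usage sketch when applying an update request to an existing config:
      // newRatio = orCurrent(request.getCompactionTaskSlotRatio(), currentRatio);
    }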
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index d08eb2c399f7..21d9f30c7d43 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -207,11 +207,7 @@ public void configure(Binder binder)
         binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8090);
         binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8290);
 
-        JsonConfigProvider.bind(
-            binder,
-            CentralizedDatasourceSchemaConfig.PROPERTY_PREFIX,
-            CentralizedDatasourceSchemaConfig.class
-        );
+        JsonConfigProvider.bind(binder, CentralizedDatasourceSchemaConfig.PROPERTY_PREFIX, CentralizedDatasourceSchemaConfig.class);
 
         binder.bind(SegmentsMetadataManager.class)
               .toProvider(SegmentsMetadataManagerProvider.class)
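
Note (illustration, not part of the patch): the CompactionStatusTracker changes above
amount to a small per-interval state machine. TASK_SUBMITTED, COMPACTED and
FAILED_ALL_RETRIES intervals are skipped by computeCompactionStatus(), while FAILED
intervals stay eligible for resubmission until the retry limit (10 in this patch) is
exhausted. The self-contained sketch below mirrors those transitions using plain
strings in place of Druid's Interval and task types; all names in it are hypothetical.

    import java.util.HashMap;
    import java.util.Map;

    class IntervalStateSketch
    {
      enum State { TASK_SUBMITTED, COMPACTED, FAILED, FAILED_ALL_RETRIES }

      static final int MAX_RETRIES = 10; // same limit as the patch

      final Map<String, State> states = new HashMap<>();
      final Map<String, Integer> retryCounts = new HashMap<>();

      // Mirrors onTaskSubmitted(): a FAILED interval keeps its state on resubmission,
      // just like the patch's computeIfAbsent().
      void onTaskSubmitted(String interval)
      {
        states.putIfAbsent(interval, State.TASK_SUBMITTED);
      }

      void onTaskFinished(String interval, boolean success)
      {
        if (success) {
          states.put(interval, State.COMPACTED);
        } else {
          final int retries = retryCounts.merge(interval, 1, Integer::sum);
          states.put(interval, retries > MAX_RETRIES ? State.FAILED_ALL_RETRIES : State.FAILED);
        }
      }

      // Mirrors computeCompactionStatus(): only unknown or FAILED intervals may be (re)submitted.
      boolean isEligible(String interval)
      {
        final State state = states.get(interval);
        return state == null || state == State.FAILED;
      }

      public static void main(String[] args)
      {
        IntervalStateSketch tracker = new IntervalStateSketch();
        tracker.onTaskSubmitted("2024-01-01/2024-01-02");        // -> TASK_SUBMITTED, skipped
        tracker.onTaskFinished("2024-01-01/2024-01-02", false);  // -> FAILED, eligible again
        System.out.println(tracker.isEligible("2024-01-01/2024-01-02")); // true
      }
    }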