Handle success and failure
kfaraz committed Jul 21, 2024
1 parent 6a09178 commit 197fe5d
Showing 5 changed files with 118 additions and 34 deletions.
@@ -78,7 +78,7 @@
* - [x] route compaction status API to overlord if scheduler is enabled
* - [x] skip run on coordinator if scheduler is enabled
* - [x] task state listener
-* - [ ] handle success and failure inside DatasourceQueue
+* - [x] handle success and failure inside CompactionStatusTracker
* - [x] make policy serializable
* - [ ] handle priority datasource in policy
* - [ ] add another policy
@@ -261,8 +261,6 @@ private synchronized void processCompactionQueue(
CoordinatorCompactionConfig currentConfig
)
{
-statusTracker.onCompactionConfigUpdated(currentConfig);
-
DataSourcesSnapshot dataSourcesSnapshot
= segmentManager.getSnapshotOfDataSourcesWithAllUsedSegments();
final CoordinatorRunStats stats = new CoordinatorRunStats();
@@ -28,7 +28,6 @@
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.joda.time.Interval;

-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -44,7 +43,8 @@ public class CompactionStatusTracker
private static final Logger log = new Logger(CompactionStatusTracker.class);

private final ObjectMapper objectMapper;
-private final Map<String, Set<Interval>> datasourceToRecentlySubmittedIntervals = new HashMap<>();
+private final Map<String, DatasourceStatus> datasourceStatuses = new HashMap<>();
+private final Map<String, ClientCompactionTaskQuery> submittedTaskIdToPayload = new HashMap<>();

@Inject
public CompactionStatusTracker(
@@ -59,9 +59,9 @@ public CompactionStatus computeCompactionStatus(
DataSourceCompactionConfig config
)
{
-final CompactionStatus status = CompactionStatus.compute(candidate, config, objectMapper);
-if (status.isComplete()) {
-return status;
+final CompactionStatus compactionStatus = CompactionStatus.compute(candidate, config, objectMapper);
+if (compactionStatus.isComplete()) {
+return compactionStatus;
}

final long inputSegmentSize = config.getInputSegmentSizeBytes();
@@ -72,16 +72,30 @@
);
}

-final Set<Interval> recentlySubmittedIntervals
-= datasourceToRecentlySubmittedIntervals.getOrDefault(config.getDataSource(), Collections.emptySet());
-if (recentlySubmittedIntervals.contains(candidate.getUmbrellaInterval())) {
-return CompactionStatus.skipped(
-"Interval[%s] has been recently submitted for compaction",
-candidate.getUmbrellaInterval()
-);
-}
-
-return status;
+final Interval compactionInterval = candidate.getUmbrellaInterval();
+
+final IntervalStatus intervalStatus
+= datasourceStatuses.getOrDefault(config.getDataSource(), DatasourceStatus.EMPTY)
+.getIntervalStatuses()
+.get(compactionInterval);
+
+if (intervalStatus == null) {
+return compactionStatus;
+}
+
+switch (intervalStatus.state) {
+case TASK_SUBMITTED:
+case COMPACTED:
+case FAILED_ALL_RETRIES:
+return CompactionStatus.skipped(
+"Interval[%s] was recently submitted for compaction and has state[%s].",
+compactionInterval, intervalStatus.state
+);
+default:
+break;
+}
+
+return compactionStatus;
}

public void onCompactionConfigUpdated(CoordinatorCompactionConfig compactionConfig)
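Editor's note: the skip logic in this hunk only short-circuits when the interval already has an in-flight or terminal state; a FAILED interval with retries left is still eligible for resubmission. The following standalone sketch (plain Java with hypothetical names, not Druid's types) illustrates just that decision:

import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the skip decision made in computeCompactionStatus above.
// A plain String stands in for Druid's Interval, and the map mirrors the
// per-interval statuses kept by the tracker.
public class SkipDecisionSketch
{
  enum IntervalState { TASK_SUBMITTED, COMPACTED, FAILED, FAILED_ALL_RETRIES }

  private final Map<String, IntervalState> intervalStates = new HashMap<>();

  boolean shouldSkip(String interval)
  {
    final IntervalState state = intervalStates.get(interval);
    if (state == null) {
      return false; // never submitted before, eligible for compaction
    }
    switch (state) {
      case TASK_SUBMITTED:      // a task is already running for this interval
      case COMPACTED:           // already compacted, wait for the timeline to catch up
      case FAILED_ALL_RETRIES:  // retries exhausted, stop resubmitting
        return true;
      default:
        return false;           // FAILED with retries left, try again
    }
  }

  public static void main(String[] args)
  {
    final SkipDecisionSketch sketch = new SkipDecisionSketch();
    sketch.intervalStates.put("2024-01-01/2024-01-02", IntervalState.TASK_SUBMITTED);
    System.out.println(sketch.shouldSkip("2024-01-01/2024-01-02")); // true
    System.out.println(sketch.shouldSkip("2024-01-02/2024-01-03")); // false
  }
}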
@@ -94,10 +108,10 @@ public void onCompactionConfigUpdated(CoordinatorCompactionConfig compactionConfig)
}

// Clean up state for datasources where compaction has been freshly disabled
-final Set<String> allDatasources = new HashSet<>(datasourceToRecentlySubmittedIntervals.keySet());
+final Set<String> allDatasources = new HashSet<>(datasourceStatuses.keySet());
allDatasources.forEach(datasource -> {
if (!compactionEnabledDatasources.contains(datasource)) {
-datasourceToRecentlySubmittedIntervals.remove(datasource);
+datasourceStatuses.remove(datasource);
}
});
}
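Editor's note: the cleanup above copies the tracked datasource names into a new set so the map can be modified while iterating. A minimal standalone sketch of the same behaviour, using plain String placeholders for the tracked statuses, is shown here; Collection.removeIf achieves the same result without the copy:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch of the cleanup in onCompactionConfigUpdated: drop tracked state for
// datasources whose compaction has been disabled. Equivalent to the loop above,
// but uses removeIf on the key-set view instead of copying the key set first.
public class DisabledDatasourceCleanupSketch
{
  public static void main(String[] args)
  {
    final Map<String, String> datasourceStatuses = new HashMap<>();
    datasourceStatuses.put("wiki", "status-A");
    datasourceStatuses.put("logs", "status-B");

    final Set<String> compactionEnabledDatasources = Set.of("wiki");

    datasourceStatuses.keySet().removeIf(ds -> !compactionEnabledDatasources.contains(ds));

    System.out.println(datasourceStatuses.keySet()); // prints [wiki]
  }
}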
@@ -107,27 +121,97 @@ public void onTaskSubmitted(
SegmentsToCompact candidateSegments
)
{
-datasourceToRecentlySubmittedIntervals
-.computeIfAbsent(taskPayload.getDataSource(), ds -> new HashSet<>())
-.add(candidateSegments.getUmbrellaInterval());
+final DatasourceStatus datasourceStatus = getOrComputeDatasourceStatus(taskPayload.getDataSource());
+
+datasourceStatus.getIntervalStatuses().computeIfAbsent(
+candidateSegments.getUmbrellaInterval(),
+i -> new IntervalStatus(IntervalState.TASK_SUBMITTED, 0)
+);
+
+submittedTaskIdToPayload.put(taskPayload.getId(), taskPayload);
}

public void onTaskFinished(String taskId, TaskStatus taskStatus)
{
log.info("Task[%s] has new status[%s].", taskId, taskStatus);
+if (!taskStatus.isComplete()) {
+return;
+}
+
-if (taskStatus.isFailure()) {
-// TODO: retry logic and other stuff
-}
-
-// Do not remove the interval of this task from recently submitted intervals
-// as there might be some delay before the segments of this task are published
-// and updated in the timeline. If compaction duty runs before the segments
-// are published, it might re-submit the same task.
+final ClientCompactionTaskQuery taskPayload = submittedTaskIdToPayload.remove(taskId);
+if (taskPayload == null) {
+// Nothing to do since we don't know the corresponding datasource or interval
+return;
+}
+
+final DatasourceStatus datasourceStatus
+= getOrComputeDatasourceStatus(taskPayload.getDataSource());
+final Interval compactionInterval = taskPayload.getIoConfig().getInputSpec().getInterval();
+
+final IntervalStatus lastKnownStatus = datasourceStatus.getIntervalStatuses().get(compactionInterval);
+
+if (taskStatus.isSuccess()) {
+datasourceStatus.intervalStatus.put(
+compactionInterval,
+new IntervalStatus(IntervalState.COMPACTED, 10)
+);
+} else if (lastKnownStatus == null) {
+// This is the first failure
+datasourceStatus.intervalStatus.put(
+compactionInterval,
+new IntervalStatus(IntervalState.FAILED, 0)
+);
+} else if (lastKnownStatus.state == IntervalState.FAILED
+&& ++lastKnownStatus.retryCount > 10) {
+// Failure retries have been exhausted
+datasourceStatus.intervalStatus.put(
+compactionInterval,
+new IntervalStatus(IntervalState.FAILED_ALL_RETRIES, 10)
+);
+}
}

public void reset()
{
-datasourceToRecentlySubmittedIntervals.clear();
+datasourceStatuses.clear();
}

private DatasourceStatus getOrComputeDatasourceStatus(String datasource)
{
return datasourceStatuses.computeIfAbsent(datasource, ds -> new DatasourceStatus());
}

private static class DatasourceStatus
{
static final DatasourceStatus EMPTY = new DatasourceStatus();

final Map<Interval, IntervalStatus> intervalStatus = new HashMap<>();

Map<Interval, IntervalStatus> getIntervalStatuses()
{
return intervalStatus;
}
}

private static class IntervalStatus
{
final IntervalState state;
int turnsToSkip;
int retryCount;

IntervalStatus(IntervalState state, int turnsToSkip)
{
this.state = state;
this.turnsToSkip = turnsToSkip;
}

void markSkipped()
{
this.turnsToSkip--;
}
}

private enum IntervalState
{
TASK_SUBMITTED, COMPACTED, FAILED, FAILED_ALL_RETRIES
}
}
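Editor's note: to make the new success/failure handling easier to follow, here is a self-contained sketch of the state transitions performed by onTaskSubmitted and onTaskFinished above. The class name, the String interval key, and the boolean outcome are illustrative stand-ins for Druid's datasource/Interval/TaskStatus types; the retry limit of 10 mirrors the value hard-coded in the commit.

import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the tracker's state machine: TASK_SUBMITTED on submit,
// COMPACTED on success, FAILED on a first known failure, FAILED_ALL_RETRIES
// once repeated failures exceed the retry limit.
public class TaskOutcomeTrackerSketch
{
  enum IntervalState { TASK_SUBMITTED, COMPACTED, FAILED, FAILED_ALL_RETRIES }

  static class IntervalStatus
  {
    final IntervalState state;
    int retryCount;

    IntervalStatus(IntervalState state)
    {
      this.state = state;
    }
  }

  static final int MAX_RETRIES = 10;

  final Map<String, IntervalStatus> intervalStatuses = new HashMap<>();

  void onTaskSubmitted(String interval)
  {
    intervalStatuses.putIfAbsent(interval, new IntervalStatus(IntervalState.TASK_SUBMITTED));
  }

  void onTaskFinished(String interval, boolean success)
  {
    final IntervalStatus lastKnownStatus = intervalStatuses.get(interval);
    if (success) {
      // task succeeded, mark the interval as compacted
      intervalStatuses.put(interval, new IntervalStatus(IntervalState.COMPACTED));
    } else if (lastKnownStatus == null) {
      // no recorded status for this interval, record the failure
      intervalStatuses.put(interval, new IntervalStatus(IntervalState.FAILED));
    } else if (lastKnownStatus.state == IntervalState.FAILED && ++lastKnownStatus.retryCount > MAX_RETRIES) {
      // repeated failures; give up once the retry count exceeds the limit
      intervalStatuses.put(interval, new IntervalStatus(IntervalState.FAILED_ALL_RETRIES));
    }
  }

  public static void main(String[] args)
  {
    final TaskOutcomeTrackerSketch tracker = new TaskOutcomeTrackerSketch();
    tracker.onTaskSubmitted("2024-01-01/2024-01-02");
    tracker.onTaskFinished("2024-01-01/2024-01-02", true);
    System.out.println(tracker.intervalStatuses.get("2024-01-01/2024-01-02").state); // COMPACTED
  }
}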
@@ -116,6 +116,7 @@ public OverlordClient getOverlordClient()
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
statusTracker.reset();
run(
params.getCoordinatorCompactionConfig(),
params.getUsedSegmentsTimelinesPerDataSource(),
@@ -50,30 +50,35 @@ public CompactionConfigUpdateRequest(
this.compactionPolicy = compactionPolicy;
}

@Nullable
@JsonProperty
public Double getCompactionTaskSlotRatio()
{
return compactionTaskSlotRatio;
}

@Nullable
@JsonProperty
public Integer getMaxCompactionTaskSlots()
{
return maxCompactionTaskSlots;
}

@Nullable
@JsonProperty
public Boolean getUseAutoScaleSlots()
{
return useAutoScaleSlots;
}

@Nullable
@JsonProperty
public CompactionEngine getCompactionEngine()
{
return compactionEngine;
}

@Nullable
@JsonProperty
public CompactionSegmentSearchPolicy getCompactionPolicy()
{
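Editor's note: the @Nullable annotations added above document that a compaction config update request may carry only a subset of fields; anything the caller omits deserializes to null, presumably so the handler applies only the supplied values. A small Jackson sketch of that behaviour, using an illustrative PartialUpdate class rather than the actual CompactionConfigUpdateRequest:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.Nullable;

// Sketch: fields missing from a partial-update JSON payload come back as null,
// which is why the getters are annotated @Nullable.
public class NullableUpdateFieldsSketch
{
  static class PartialUpdate
  {
    @Nullable
    private final Double compactionTaskSlotRatio;
    @Nullable
    private final Integer maxCompactionTaskSlots;

    @JsonCreator
    PartialUpdate(
        @JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio,
        @JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots
    )
    {
      this.compactionTaskSlotRatio = compactionTaskSlotRatio;
      this.maxCompactionTaskSlots = maxCompactionTaskSlots;
    }
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    final PartialUpdate update = mapper.readValue("{\"maxCompactionTaskSlots\": 5}", PartialUpdate.class);
    System.out.println(update.maxCompactionTaskSlots);  // 5
    System.out.println(update.compactionTaskSlotRatio); // null, field was omitted
  }
}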
6 changes: 1 addition & 5 deletions services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -207,11 +207,7 @@ public void configure(Binder binder)
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8090);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8290);

-JsonConfigProvider.bind(
-binder,
-CentralizedDatasourceSchemaConfig.PROPERTY_PREFIX,
-CentralizedDatasourceSchemaConfig.class
-);
+JsonConfigProvider.bind(binder, CentralizedDatasourceSchemaConfig.PROPERTY_PREFIX, CentralizedDatasourceSchemaConfig.class);

binder.bind(SegmentsMetadataManager.class)
.toProvider(SegmentsMetadataManagerProvider.class)
