Skip to content

Commit

Permalink
Refactor: Cleanup coordinator duties for metadata cleanup (apache#14631)
Browse files Browse the repository at this point in the history
Changes
- Add abstract class `MetadataCleanupDuty`
- Make `KillAuditLog`, `KillCompactionConfig`, etc. extend `MetadataCleanupDuty`
- Improve log and error messages
- Clean up tests
- No functional change
  • Loading branch information
kfaraz authored Aug 5, 2023
1 parent 62ddeaf commit 2d8e0f2
Show file tree
Hide file tree
Showing 20 changed files with 547 additions and 510 deletions.
4 changes: 2 additions & 2 deletions docs/development/modules.md
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,12 @@ This config file adds the configs below to enable a custom coordinator duty.
```
druid.coordinator.dutyGroups=["cleanupMetadata"]
druid.coordinator.cleanupMetadata.duties=["killSupervisors"]
druid.coordinator.cleanupMetadata.duty.killSupervisors.retainDuration=PT0M
druid.coordinator.cleanupMetadata.duty.killSupervisors.durationToRetain=PT0M
druid.coordinator.cleanupMetadata.period=PT10S
```

These configurations create a custom coordinator duty group called `cleanupMetadata` which runs a custom coordinator duty called `killSupervisors` every 10 seconds.
The custom coordinator duty `killSupervisors` also has a config called `retainDuration` which is set to 0 minutes.
The custom coordinator duty `killSupervisors` also has a config called `durationToRetain` which is set to 0 minutes.

### Routing data through a HTTP proxy for your extension

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
# If you are making a change in load list below, make the necessary changes in github actions too
druid_extensions_loadList=["druid-kafka-indexing-service","mysql-metadata-storage","druid-s3-extensions","druid-basic-security","simple-client-sslcontext","druid-testing-tools","druid-lookups-cached-global","druid-histogram","druid-datasketches"]

druid_coordinator_period_metadataStoreManagementPeriod=PT1H
druid_coordinator_period_metadataStoreManagementPeriod=PT5S
druid_sql_planner_authorizeSystemTablesDirectly=false

#Testing kill supervisor custom coordinator duty
druid_coordinator_kill_supervisor_on=false
druid_coordinator_dutyGroups=["cleanupMetadata"]
druid_coordinator_cleanupMetadata_duties=["killSupervisors"]
druid_coordinator_cleanupMetadata_duty_killSupervisors_retainDuration=PT0M
druid_coordinator_cleanupMetadata_duty_killSupervisors_durationToRetain=PT0M
druid_coordinator_cleanupMetadata_period=PT10S
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,6 @@ public void run()
.withSnapshotOfDataSourcesWithAllUsedSegments(dataSourcesSnapshot)
.withDynamicConfigs(getDynamicConfigs())
.withCompactionConfig(getCompactionConfig())
.withEmitter(emitter)
.build();
log.info(
"Initialized run params for group [%s] with [%,d] used segments in [%d] datasources.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
Expand Down Expand Up @@ -68,7 +67,6 @@ private static TreeSet<DataSegment> createUsedSegmentsSet(Iterable<DataSegment>
private final StrategicSegmentAssigner segmentAssigner;
private final @Nullable TreeSet<DataSegment> usedSegments;
private final @Nullable DataSourcesSnapshot dataSourcesSnapshot;
private final ServiceEmitter emitter;
private final CoordinatorDynamicConfig coordinatorDynamicConfig;
private final CoordinatorCompactionConfig coordinatorCompactionConfig;
private final SegmentLoadingConfig segmentLoadingConfig;
Expand All @@ -83,7 +81,6 @@ private DruidCoordinatorRuntimeParams(
StrategicSegmentAssigner segmentAssigner,
@Nullable TreeSet<DataSegment> usedSegments,
@Nullable DataSourcesSnapshot dataSourcesSnapshot,
ServiceEmitter emitter,
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorCompactionConfig coordinatorCompactionConfig,
SegmentLoadingConfig segmentLoadingConfig,
Expand All @@ -98,7 +95,6 @@ private DruidCoordinatorRuntimeParams(
this.segmentAssigner = segmentAssigner;
this.usedSegments = usedSegments;
this.dataSourcesSnapshot = dataSourcesSnapshot;
this.emitter = emitter;
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.coordinatorCompactionConfig = coordinatorCompactionConfig;
this.segmentLoadingConfig = segmentLoadingConfig;
Expand Down Expand Up @@ -145,11 +141,6 @@ public TreeSet<DataSegment> getUsedSegments()
return usedSegments;
}

public ServiceEmitter getEmitter()
{
return emitter;
}

public CoordinatorDynamicConfig getCoordinatorDynamicConfig()
{
return coordinatorDynamicConfig;
Expand Down Expand Up @@ -200,7 +191,6 @@ public Builder buildFromExisting()
segmentAssigner,
usedSegments,
dataSourcesSnapshot,
emitter,
coordinatorDynamicConfig,
coordinatorCompactionConfig,
segmentLoadingConfig,
Expand All @@ -219,7 +209,6 @@ public static class Builder
private StrategicSegmentAssigner segmentAssigner;
private @Nullable TreeSet<DataSegment> usedSegments;
private @Nullable DataSourcesSnapshot dataSourcesSnapshot;
private ServiceEmitter emitter;
private CoordinatorDynamicConfig coordinatorDynamicConfig;
private CoordinatorCompactionConfig coordinatorCompactionConfig;
private SegmentLoadingConfig segmentLoadingConfig;
Expand All @@ -242,7 +231,6 @@ private Builder(
StrategicSegmentAssigner segmentAssigner,
@Nullable TreeSet<DataSegment> usedSegments,
@Nullable DataSourcesSnapshot dataSourcesSnapshot,
ServiceEmitter emitter,
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorCompactionConfig coordinatorCompactionConfig,
SegmentLoadingConfig segmentLoadingConfig,
Expand All @@ -257,7 +245,6 @@ private Builder(
this.segmentAssigner = segmentAssigner;
this.usedSegments = usedSegments;
this.dataSourcesSnapshot = dataSourcesSnapshot;
this.emitter = emitter;
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.coordinatorCompactionConfig = coordinatorCompactionConfig;
this.segmentLoadingConfig = segmentLoadingConfig;
Expand All @@ -279,7 +266,6 @@ public DruidCoordinatorRuntimeParams build()
segmentAssigner,
usedSegments,
dataSourcesSnapshot,
emitter,
coordinatorDynamicConfig,
coordinatorCompactionConfig,
segmentLoadingConfig,
Expand Down Expand Up @@ -369,12 +355,6 @@ public Builder withUsedSegmentsInTest(Collection<DataSegment> usedSegments)
return this;
}

public Builder withEmitter(ServiceEmitter emitter)
{
this.emitter = emitter;
return this;
}

public Builder withDynamicConfigs(CoordinatorDynamicConfig configs)
{
this.coordinatorDynamicConfig = configs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,14 @@

package org.apache.druid.server.coordinator.duty;

import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.stats.Stats;
import org.joda.time.DateTime;

public class KillAuditLog implements CoordinatorDuty
public class KillAuditLog extends MetadataCleanupDuty
{
private static final Logger log = new Logger(KillAuditLog.class);

private final long period;
private final long retainDuration;
private long lastKillTime = 0;

private final AuditManager auditManager;

@Inject
Expand All @@ -44,44 +35,20 @@ public KillAuditLog(
DruidCoordinatorConfig config
)
{
this.period = config.getCoordinatorAuditKillPeriod().getMillis();
Preconditions.checkArgument(
this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
"coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"
);
this.retainDuration = config.getCoordinatorAuditKillDurationToRetain().getMillis();
Preconditions.checkArgument(this.retainDuration >= 0, "coordinator audit kill retainDuration must be >= 0");
Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator audit kill retainDuration cannot be greater than current time in ms");
log.debug(
"Audit Kill Task scheduling enabled with period [%s], retainDuration [%s]",
this.period,
this.retainDuration
super(
"audit logs",
"druid.coordinator.kill.audit",
config.getCoordinatorAuditKillPeriod(),
config.getCoordinatorAuditKillDurationToRetain(),
Stats.Kill.AUDIT_LOGS,
config
);
this.auditManager = auditManager;
}

@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
{
long currentTimeMillis = System.currentTimeMillis();
if ((lastKillTime + period) < currentTimeMillis) {
lastKillTime = currentTimeMillis;
long timestamp = currentTimeMillis - retainDuration;
try {
int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp);
ServiceEmitter emitter = params.getEmitter();
emitter.emit(
new ServiceMetricEvent.Builder().build(
"metadata/kill/audit/count",
auditRemoved
)
);
log.info("Finished running KillAuditLog duty. Removed %,d audit logs", auditRemoved);
}
catch (Exception e) {
log.error(e, "Failed to kill audit log");
}
}
return params;
return auditManager.removeAuditLogsOlderThan(minCreatedTime.getMillis());
}
}
Loading

0 comments on commit 2d8e0f2

Please sign in to comment.