From 2d8e0f28f36f83e091911cc7b36a57b10fd065cc Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 5 Aug 2023 13:08:23 +0530 Subject: [PATCH] Refactor: Cleanup coordinator duties for metadata cleanup (#14631) Changes - Add abstract class `MetadataCleanupDuty` - Make `KillAuditLogs`, `KillCompactionConfig`, etc extend `MetadataCleanupDuty` - Improve log and error messages - Cleanup tests - No functional change --- docs/development/modules.md | 4 +- .../test-groups/custom-coordinator-duties | 4 +- .../server/coordinator/DruidCoordinator.java | 1 - .../DruidCoordinatorRuntimeParams.java | 20 --- .../server/coordinator/duty/KillAuditLog.java | 57 ++----- .../duty/KillCompactionConfig.java | 160 ++++++++---------- .../duty/KillDatasourceMetadata.java | 92 ++++------ .../server/coordinator/duty/KillRules.java | 62 ++----- .../coordinator/duty/KillSupervisors.java | 61 ++----- .../duty/KillSupervisorsCustomDuty.java | 72 ++++---- .../coordinator/duty/MetadataCleanupDuty.java | 135 +++++++++++++++ .../druid/server/coordinator/stats/Stats.java | 14 ++ .../coordinator/BalanceSegmentsProfiler.java | 1 - .../coordinator/DruidCoordinatorTest.java | 6 +- .../coordinator/duty/KillAuditLogTest.java | 49 ++++-- .../duty/KillCompactionConfigTest.java | 88 ++++------ .../duty/KillDatasourceMetadataTest.java | 72 +++++--- .../coordinator/duty/KillRulesTest.java | 47 ++--- .../duty/KillSupervisorsCustomDutyTest.java | 63 +++++-- .../coordinator/duty/KillSupervisorsTest.java | 49 ++++-- 20 files changed, 547 insertions(+), 510 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/coordinator/duty/MetadataCleanupDuty.java diff --git a/docs/development/modules.md b/docs/development/modules.md index a0d2335194de..75f4bbbe5461 100644 --- a/docs/development/modules.md +++ b/docs/development/modules.md @@ -351,12 +351,12 @@ This config file adds the configs below to enable a custom coordinator duty. 
``` druid.coordinator.dutyGroups=["cleanupMetadata"] druid.coordinator.cleanupMetadata.duties=["killSupervisors"] -druid.coordinator.cleanupMetadata.duty.killSupervisors.retainDuration=PT0M +druid.coordinator.cleanupMetadata.duty.killSupervisors.durationToRetain=PT0M druid.coordinator.cleanupMetadata.period=PT10S ``` These configurations create a custom coordinator duty group called `cleanupMetadata` which runs a custom coordinator duty called `killSupervisors` every 10 seconds. -The custom coordinator duty `killSupervisors` also has a config called `retainDuration` which is set to 0 minute. +The custom coordinator duty `killSupervisors` also has a config called `durationToRetain` which is set to 0 minute. ### Routing data through a HTTP proxy for your extension diff --git a/integration-tests/docker/environment-configs/test-groups/custom-coordinator-duties b/integration-tests/docker/environment-configs/test-groups/custom-coordinator-duties index 5c75c4197d35..cea6370c2914 100644 --- a/integration-tests/docker/environment-configs/test-groups/custom-coordinator-duties +++ b/integration-tests/docker/environment-configs/test-groups/custom-coordinator-duties @@ -20,12 +20,12 @@ # If you are making a change in load list below, make the necessary changes in github actions too druid_extensions_loadList=["druid-kafka-indexing-service","mysql-metadata-storage","druid-s3-extensions","druid-basic-security","simple-client-sslcontext","druid-testing-tools","druid-lookups-cached-global","druid-histogram","druid-datasketches"] -druid_coordinator_period_metadataStoreManagementPeriod=PT1H +druid_coordinator_period_metadataStoreManagementPeriod=PT5S druid_sql_planner_authorizeSystemTablesDirectly=false #Testing kill supervisor custom coordinator duty druid_coordinator_kill_supervisor_on=false druid_coordinator_dutyGroups=["cleanupMetadata"] druid_coordinator_cleanupMetadata_duties=["killSupervisors"] -druid_coordinator_cleanupMetadata_duty_killSupervisors_retainDuration=PT0M 
+druid_coordinator_cleanupMetadata_duty_killSupervisors_durationToRetain=PT0M druid_coordinator_cleanupMetadata_period=PT10S diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index f8ea8ef24f35..272de190109f 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -699,7 +699,6 @@ public void run() .withSnapshotOfDataSourcesWithAllUsedSegments(dataSourcesSnapshot) .withDynamicConfigs(getDynamicConfigs()) .withCompactionConfig(getCompactionConfig()) - .withEmitter(emitter) .build(); log.info( "Initialized run params for group [%s] with [%,d] used segments in [%d] datasources.", diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index ed1c57dbb193..79173683fa40 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -23,7 +23,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import org.apache.druid.client.DataSourcesSnapshot; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.server.coordinator.balancer.BalancerStrategy; import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; @@ -68,7 +67,6 @@ private static TreeSet createUsedSegmentsSet(Iterable private final StrategicSegmentAssigner segmentAssigner; private final @Nullable TreeSet usedSegments; private final @Nullable DataSourcesSnapshot dataSourcesSnapshot; - private final ServiceEmitter emitter; private final 
CoordinatorDynamicConfig coordinatorDynamicConfig; private final CoordinatorCompactionConfig coordinatorCompactionConfig; private final SegmentLoadingConfig segmentLoadingConfig; @@ -83,7 +81,6 @@ private DruidCoordinatorRuntimeParams( StrategicSegmentAssigner segmentAssigner, @Nullable TreeSet usedSegments, @Nullable DataSourcesSnapshot dataSourcesSnapshot, - ServiceEmitter emitter, CoordinatorDynamicConfig coordinatorDynamicConfig, CoordinatorCompactionConfig coordinatorCompactionConfig, SegmentLoadingConfig segmentLoadingConfig, @@ -98,7 +95,6 @@ private DruidCoordinatorRuntimeParams( this.segmentAssigner = segmentAssigner; this.usedSegments = usedSegments; this.dataSourcesSnapshot = dataSourcesSnapshot; - this.emitter = emitter; this.coordinatorDynamicConfig = coordinatorDynamicConfig; this.coordinatorCompactionConfig = coordinatorCompactionConfig; this.segmentLoadingConfig = segmentLoadingConfig; @@ -145,11 +141,6 @@ public TreeSet getUsedSegments() return usedSegments; } - public ServiceEmitter getEmitter() - { - return emitter; - } - public CoordinatorDynamicConfig getCoordinatorDynamicConfig() { return coordinatorDynamicConfig; @@ -200,7 +191,6 @@ public Builder buildFromExisting() segmentAssigner, usedSegments, dataSourcesSnapshot, - emitter, coordinatorDynamicConfig, coordinatorCompactionConfig, segmentLoadingConfig, @@ -219,7 +209,6 @@ public static class Builder private StrategicSegmentAssigner segmentAssigner; private @Nullable TreeSet usedSegments; private @Nullable DataSourcesSnapshot dataSourcesSnapshot; - private ServiceEmitter emitter; private CoordinatorDynamicConfig coordinatorDynamicConfig; private CoordinatorCompactionConfig coordinatorCompactionConfig; private SegmentLoadingConfig segmentLoadingConfig; @@ -242,7 +231,6 @@ private Builder( StrategicSegmentAssigner segmentAssigner, @Nullable TreeSet usedSegments, @Nullable DataSourcesSnapshot dataSourcesSnapshot, - ServiceEmitter emitter, CoordinatorDynamicConfig coordinatorDynamicConfig, 
CoordinatorCompactionConfig coordinatorCompactionConfig, SegmentLoadingConfig segmentLoadingConfig, @@ -257,7 +245,6 @@ private Builder( this.segmentAssigner = segmentAssigner; this.usedSegments = usedSegments; this.dataSourcesSnapshot = dataSourcesSnapshot; - this.emitter = emitter; this.coordinatorDynamicConfig = coordinatorDynamicConfig; this.coordinatorCompactionConfig = coordinatorCompactionConfig; this.segmentLoadingConfig = segmentLoadingConfig; @@ -279,7 +266,6 @@ public DruidCoordinatorRuntimeParams build() segmentAssigner, usedSegments, dataSourcesSnapshot, - emitter, coordinatorDynamicConfig, coordinatorCompactionConfig, segmentLoadingConfig, @@ -369,12 +355,6 @@ public Builder withUsedSegmentsInTest(Collection usedSegments) return this; } - public Builder withEmitter(ServiceEmitter emitter) - { - this.emitter = emitter; - return this; - } - public Builder withDynamicConfigs(CoordinatorDynamicConfig configs) { this.coordinatorDynamicConfig = configs; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java index ac735c6594aa..754bec1d4fd5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java @@ -19,23 +19,14 @@ package org.apache.druid.server.coordinator.duty; -import com.google.common.base.Preconditions; import com.google.inject.Inject; import org.apache.druid.audit.AuditManager; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; -import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.stats.Stats; +import org.joda.time.DateTime; -public class 
KillAuditLog implements CoordinatorDuty +public class KillAuditLog extends MetadataCleanupDuty { - private static final Logger log = new Logger(KillAuditLog.class); - - private final long period; - private final long retainDuration; - private long lastKillTime = 0; - private final AuditManager auditManager; @Inject @@ -44,44 +35,20 @@ public KillAuditLog( DruidCoordinatorConfig config ) { - this.period = config.getCoordinatorAuditKillPeriod().getMillis(); - Preconditions.checkArgument( - this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(), - "coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod" - ); - this.retainDuration = config.getCoordinatorAuditKillDurationToRetain().getMillis(); - Preconditions.checkArgument(this.retainDuration >= 0, "coordinator audit kill retainDuration must be >= 0"); - Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator audit kill retainDuration cannot be greater than current time in ms"); - log.debug( - "Audit Kill Task scheduling enabled with period [%s], retainDuration [%s]", - this.period, - this.retainDuration + super( + "audit logs", + "druid.coordinator.kill.audit", + config.getCoordinatorAuditKillPeriod(), + config.getCoordinatorAuditKillDurationToRetain(), + Stats.Kill.AUDIT_LOGS, + config ); this.auditManager = auditManager; } @Override - public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime) { - long currentTimeMillis = System.currentTimeMillis(); - if ((lastKillTime + period) < currentTimeMillis) { - lastKillTime = currentTimeMillis; - long timestamp = currentTimeMillis - retainDuration; - try { - int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp); - ServiceEmitter emitter = params.getEmitter(); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "metadata/kill/audit/count", - auditRemoved - ) - ); - 
log.info("Finished running KillAuditLog duty. Removed %,d audit logs", auditRemoved); - } - catch (Exception e) { - log.error(e, "Failed to kill audit log"); - } - } - return params; + return auditManager.removeAuditLogsOlderThan(minCreatedTime.getMillis()); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java index 74645b51b4e7..4b92cfa9b9bb 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java @@ -19,7 +19,6 @@ package org.apache.druid.server.coordinator.duty; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import org.apache.druid.audit.AuditInfo; @@ -28,15 +27,15 @@ import org.apache.druid.java.util.RetryableException; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.MetadataStorageConnector; import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.SqlSegmentsMetadataManager; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; -import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.stats.Stats; +import org.joda.time.DateTime; +import org.joda.time.Duration; import java.util.Map; import java.util.Set; @@ -48,16 +47,11 @@ * Note that this will delete compaction configuration for inactive datasources * (datasource with no used 
and unused segments) immediately. */ -public class KillCompactionConfig implements CoordinatorDuty +public class KillCompactionConfig extends MetadataCleanupDuty { private static final Logger log = new Logger(KillCompactionConfig.class); private static final int UPDATE_NUM_RETRY = 5; - static final String COUNT_METRIC = "metadata/kill/compaction/count"; - - private final long period; - private long lastKillTime = 0; - private final JacksonConfigManager jacksonConfigManager; private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager; private final MetadataStorageConnector connector; @@ -72,99 +66,85 @@ public KillCompactionConfig( MetadataStorageTablesConfig connectorConfig ) { + super( + "compaction configs", + "druid.coordinator.kill.compaction", + config.getCoordinatorCompactionKillPeriod(), + Duration.millis(1), // Retain duration is ignored + Stats.Kill.COMPACTION_CONFIGS, + config + ); this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager; this.jacksonConfigManager = jacksonConfigManager; - this.period = config.getCoordinatorCompactionKillPeriod().getMillis(); this.connector = connector; this.connectorConfig = connectorConfig; - Preconditions.checkArgument( - this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(), - "Coordinator compaction configuration kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod" - ); - log.debug( - "Compaction Configuration Kill Task scheduling enabled with period [%s]", - this.period - ); } @Override - public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime) { - long currentTimeMillis = System.currentTimeMillis(); - if ((lastKillTime + period) < currentTimeMillis) { - lastKillTime = currentTimeMillis; - try { - RetryUtils.retry( - () -> { - final byte[] currentBytes = CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig); - final 
CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(jacksonConfigManager, currentBytes); - // If current compaction config is empty then there is nothing to do - if (CoordinatorCompactionConfig.empty().equals(current)) { - log.info( - "Finished running KillCompactionConfig duty. Nothing to do as compaction config is already empty."); - emitMetric(params.getEmitter(), 0); - return ConfigManager.SetResult.ok(); - } - - // Get all active datasources - // Note that we get all active datasources after getting compaction config to prevent race condition if new - // datasource and config are added. - Set activeDatasources = sqlSegmentsMetadataManager.retrieveAllDataSourceNames(); - final Map updated = current - .getCompactionConfigs() - .stream() - .filter(dataSourceCompactionConfig -> activeDatasources.contains(dataSourceCompactionConfig.getDataSource())) - .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity())); - - // Calculate number of compaction configs to remove for logging - int compactionConfigRemoved = current.getCompactionConfigs().size() - updated.size(); - - ConfigManager.SetResult result = jacksonConfigManager.set( - CoordinatorCompactionConfig.CONFIG_KEY, - currentBytes, - CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(updated.values())), - new AuditInfo( - "KillCompactionConfig", - "CoordinatorDuty for automatic deletion of compaction config", - "" - ) - ); - if (result.isOk()) { - log.info( - "Finished running KillCompactionConfig duty. 
Removed %,d compaction configs", - compactionConfigRemoved - ); - emitMetric(params.getEmitter(), compactionConfigRemoved); - } else if (result.isRetryable()) { - // Failed but is retryable - log.debug("Retrying KillCompactionConfig duty"); - throw new RetryableException(result.getException()); - } else { - // Failed and not retryable - log.error(result.getException(), "Failed to kill compaction configurations"); - emitMetric(params.getEmitter(), 0); - } - return result; - }, - e -> e instanceof RetryableException, - UPDATE_NUM_RETRY - ); - } - catch (Exception e) { - log.error(e, "Failed to kill compaction configurations"); - emitMetric(params.getEmitter(), 0); - } + try { + return RetryUtils.retry( + this::tryDeleteCompactionConfigs, + e -> e instanceof RetryableException, + UPDATE_NUM_RETRY + ); + } + catch (Exception e) { + log.error(e, "Failed to kill compaction configurations"); + return 0; } - return params; } - private void emitMetric(ServiceEmitter emitter, int compactionConfigRemoved) + /** + * Tries to delete compaction configs for inactive datasources and returns + * the number of compaction configs successfully removed. + */ + private int tryDeleteCompactionConfigs() throws RetryableException { - emitter.emit( - new ServiceMetricEvent.Builder().build( - COUNT_METRIC, - compactionConfigRemoved + final byte[] currentBytes = CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig); + final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig( + jacksonConfigManager, + currentBytes + ); + // If current compaction config is empty then there is nothing to do + if (CoordinatorCompactionConfig.empty().equals(current)) { + log.info("Nothing to do as compaction config is already empty."); + return 0; + } + + // Get all active datasources + // Note that we get all active datasources after getting compaction config to prevent race condition if new + // datasource and config are added. 
+ Set activeDatasources = sqlSegmentsMetadataManager.retrieveAllDataSourceNames(); + final Map updated = current + .getCompactionConfigs() + .stream() + .filter(dataSourceCompactionConfig -> activeDatasources.contains(dataSourceCompactionConfig.getDataSource())) + .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity())); + + // Calculate number of compaction configs removed + int compactionConfigRemoved = current.getCompactionConfigs().size() - updated.size(); + + ConfigManager.SetResult result = jacksonConfigManager.set( + CoordinatorCompactionConfig.CONFIG_KEY, + currentBytes, + CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(updated.values())), + new AuditInfo( + "KillCompactionConfig", + "CoordinatorDuty for automatic deletion of compaction config", + "" ) ); + + if (result.isOk()) { + return compactionConfigRemoved; + } else if (result.isRetryable()) { + log.debug("Retrying KillCompactionConfig duty"); + throw new RetryableException(result.getException()); + } else { + log.error(result.getException(), "Failed to kill compaction configurations"); + return 0; + } } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java index 9c9535b2502a..7fbf1b1deb56 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java @@ -19,17 +19,14 @@ package org.apache.druid.server.coordinator.duty; -import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.inject.Inject; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; -import org.apache.druid.java.util.common.logger.Logger; -import 
org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.MetadataSupervisorManager; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; -import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.stats.Stats; +import org.joda.time.DateTime; import java.util.Collection; import java.util.Map; @@ -42,14 +39,8 @@ * Note that this class relies on the supervisorSpec.getDataSources names to match with the * 'datasource' column of the datasource metadata table. */ -public class KillDatasourceMetadata implements CoordinatorDuty +public class KillDatasourceMetadata extends MetadataCleanupDuty { - private static final Logger log = new Logger(KillDatasourceMetadata.class); - - private final long period; - private final long retainDuration; - private long lastKillTime = 0; - private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; private final MetadataSupervisorManager metadataSupervisorManager; @@ -60,62 +51,37 @@ public KillDatasourceMetadata( MetadataSupervisorManager metadataSupervisorManager ) { + super( + "datasources", + "druid.coordinator.kill.datasource", + config.getCoordinatorDatasourceKillPeriod(), + config.getCoordinatorDatasourceKillDurationToRetain(), + Stats.Kill.DATASOURCES, + config + ); this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator; this.metadataSupervisorManager = metadataSupervisorManager; - this.period = config.getCoordinatorDatasourceKillPeriod().getMillis(); - Preconditions.checkArgument( - this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(), - "Coordinator datasource metadata kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod" - ); - this.retainDuration = config.getCoordinatorDatasourceKillDurationToRetain().getMillis(); - 
Preconditions.checkArgument(this.retainDuration >= 0, "Coordinator datasource metadata kill retainDuration must be >= 0"); - Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator datasource metadata kill retainDuration cannot be greater than current time in ms"); - log.debug( - "Datasource Metadata Kill Task scheduling enabled with period [%s], retainDuration [%s]", - this.period, - this.retainDuration - ); } @Override - public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime) { - long currentTimeMillis = System.currentTimeMillis(); - if ((lastKillTime + period) < currentTimeMillis) { - lastKillTime = currentTimeMillis; - long timestamp = currentTimeMillis - retainDuration; - try { - // Datasource metadata only exists for datasource with supervisor - // To determine if datasource metadata is still active, we check if the supervisor for that particular datasource - // is still active or not - Map allActiveSupervisor = metadataSupervisorManager.getLatestActiveOnly(); - Set allDatasourceWithActiveSupervisor = allActiveSupervisor.values() - .stream() - .map(supervisorSpec -> supervisorSpec.getDataSources()) - .flatMap(Collection::stream) - .filter(datasource -> !Strings.isNullOrEmpty(datasource)) - .collect(Collectors.toSet()); - // We exclude removing datasource metadata with active supervisor - int datasourceMetadataRemovedCount = indexerMetadataStorageCoordinator.removeDataSourceMetadataOlderThan( - timestamp, - allDatasourceWithActiveSupervisor - ); - ServiceEmitter emitter = params.getEmitter(); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "metadata/kill/datasource/count", - datasourceMetadataRemovedCount - ) - ); - log.info( - "Finished running KillDatasourceMetadata duty. 
Removed %,d datasource metadata", - datasourceMetadataRemovedCount - ); - } - catch (Exception e) { - log.error(e, "Failed to kill datasource metadata"); - } - } - return params; + // Datasource metadata only exists for datasource with supervisor + // To determine if datasource metadata is still active, we check if the supervisor for that particular datasource + // is still active or not + Map allActiveSupervisor = metadataSupervisorManager.getLatestActiveOnly(); + Set allDatasourceWithActiveSupervisor + = allActiveSupervisor.values() + .stream() + .map(SupervisorSpec::getDataSources) + .flatMap(Collection::stream) + .filter(datasource -> !Strings.isNullOrEmpty(datasource)) + .collect(Collectors.toSet()); + + // We exclude removing datasource metadata with active supervisor + return indexerMetadataStorageCoordinator.removeDataSourceMetadataOlderThan( + minCreatedTime.getMillis(), + allDatasourceWithActiveSupervisor + ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java index 50b1740e8643..40964959b308 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java @@ -19,64 +19,36 @@ package org.apache.druid.server.coordinator.duty; -import com.google.common.base.Preconditions; import com.google.inject.Inject; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; -import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.stats.Stats; +import org.joda.time.DateTime; -public class KillRules implements CoordinatorDuty 
+public class KillRules extends MetadataCleanupDuty { - private static final Logger log = new Logger(KillRules.class); - - private final long period; - private final long retainDuration; - private long lastKillTime = 0; + private final MetadataRuleManager metadataRuleManager; @Inject public KillRules( - DruidCoordinatorConfig config + DruidCoordinatorConfig config, + MetadataRuleManager metadataRuleManager ) { - this.period = config.getCoordinatorRuleKillPeriod().getMillis(); - Preconditions.checkArgument( - this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(), - "coordinator rule kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod" - ); - this.retainDuration = config.getCoordinatorRuleKillDurationToRetain().getMillis(); - Preconditions.checkArgument(this.retainDuration >= 0, "coordinator rule kill retainDuration must be >= 0"); - Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator rule kill retainDuration cannot be greater than current time in ms"); - log.debug( - "Rule Kill Task scheduling enabled with period [%s], retainDuration [%s]", - this.period, - this.retainDuration + super( + "rules", + "druid.coordinator.kill.rule", + config.getCoordinatorRuleKillPeriod(), + config.getCoordinatorRuleKillDurationToRetain(), + Stats.Kill.RULES, + config ); + this.metadataRuleManager = metadataRuleManager; } @Override - public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime) { - long currentTimeMillis = System.currentTimeMillis(); - if ((lastKillTime + period) < currentTimeMillis) { - lastKillTime = currentTimeMillis; - long timestamp = currentTimeMillis - retainDuration; - try { - int ruleRemoved = params.getDatabaseRuleManager().removeRulesForEmptyDatasourcesOlderThan(timestamp); - ServiceEmitter emitter = params.getEmitter(); - emitter.emit( - new ServiceMetricEvent.Builder().build( - 
"metadata/kill/rule/count", - ruleRemoved - ) - ); - log.info("Finished running KillRules duty. Removed %,d rule", ruleRemoved); - } - catch (Exception e) { - log.error(e, "Failed to kill rules metadata"); - } - } - return params; + return metadataRuleManager.removeRulesForEmptyDatasourcesOlderThan(minCreatedTime.getMillis()); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java index 3f87f5dee824..7f6c43d5ee0e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java @@ -19,26 +19,17 @@ package org.apache.druid.server.coordinator.duty; -import com.google.common.base.Preconditions; import com.google.inject.Inject; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.MetadataSupervisorManager; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; -import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.stats.Stats; +import org.joda.time.DateTime; /** - * CoordinatorDuty for automatic deletion of terminated supervisors from the supervisor table in metadata storage. + * Cleans up terminated supervisors from the supervisors table in metadata storage. 
*/ -public class KillSupervisors implements CoordinatorDuty +public class KillSupervisors extends MetadataCleanupDuty { - private static final Logger log = new Logger(KillSupervisors.class); - - private final long period; - private final long retainDuration; - private long lastKillTime = 0; - private final MetadataSupervisorManager metadataSupervisorManager; @Inject @@ -47,44 +38,20 @@ public KillSupervisors( MetadataSupervisorManager metadataSupervisorManager ) { - this.metadataSupervisorManager = metadataSupervisorManager; - this.period = config.getCoordinatorSupervisorKillPeriod().getMillis(); - Preconditions.checkArgument( - this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(), - "Coordinator supervisor kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod" - ); - this.retainDuration = config.getCoordinatorSupervisorKillDurationToRetain().getMillis(); - Preconditions.checkArgument(this.retainDuration >= 0, "Coordinator supervisor kill retainDuration must be >= 0"); - Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator supervisor kill retainDuration cannot be greater than current time in ms"); - log.debug( - "Supervisor Kill Task scheduling enabled with period [%s], retainDuration [%s]", - this.period, - this.retainDuration + super( + "supervisors", + "druid.coordinator.kill.supervisor", + config.getCoordinatorSupervisorKillPeriod(), + config.getCoordinatorSupervisorKillDurationToRetain(), + Stats.Kill.SUPERVISOR_SPECS, + config ); + this.metadataSupervisorManager = metadataSupervisorManager; } @Override - public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime) { - long currentTimeMillis = System.currentTimeMillis(); - if ((lastKillTime + period) < currentTimeMillis) { - lastKillTime = currentTimeMillis; - long timestamp = currentTimeMillis - retainDuration; - try { - int 
supervisorRemoved = metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(timestamp); - ServiceEmitter emitter = params.getEmitter(); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "metadata/kill/supervisor/count", - supervisorRemoved - ) - ); - log.info("Finished running KillSupervisors duty. Removed %,d supervisor specs", supervisorRemoved); - } - catch (Exception e) { - log.error(e, "Failed to kill terminated supervisor metadata"); - } - } - return params; + return metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(minCreatedTime.getMillis()); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDuty.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDuty.java index 7a5b1d6970ac..7d645967be4e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDuty.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDuty.java @@ -22,64 +22,58 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; +import org.apache.druid.guice.annotations.UnstableApi; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.MetadataSupervisorManager; -import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.DruidCoordinatorConfig; +import org.apache.druid.server.coordinator.stats.Stats; +import org.joda.time.DateTime; import org.joda.time.Duration; /** - * CoordinatorDuty for automatic deletion of terminated supervisors from the supervisor table in metadata storage. 
- * This class has the same purpose as {@link KillSupervisors} but uses a different configuration style as - * detailed in {@link CoordinatorCustomDuty}. This class primary purpose is as an example to demostrate the usuage - * of the {@link CoordinatorCustomDuty} {@link org.apache.druid.guice.annotations.ExtensionPoint} - * - * Production use case should still use {@link KillSupervisors}. In the future, we might migrate all metadata - * management coordinator duties to {@link CoordinatorCustomDuty} but until then this class will remains undocumented - * and should not be use in production. + * Example {@link CoordinatorCustomDuty} for automatic deletion of terminated + * supervisors from the metadata storage. This duty has the same implementation + * as {@link KillSupervisors} but uses a different configuration style as + * detailed in {@link CoordinatorCustomDuty}. + *

+ * This duty is only an example to demonstrate the usage of coordinator custom + * duties. All production clusters should continue using {@link KillSupervisors}. + *

+ * In the future, we might migrate all metadata management coordinator duties to + * {@link CoordinatorCustomDuty} but until then this class will remain undocumented. */ -public class KillSupervisorsCustomDuty implements CoordinatorCustomDuty +@UnstableApi +public class KillSupervisorsCustomDuty extends MetadataCleanupDuty implements CoordinatorCustomDuty { private static final Logger log = new Logger(KillSupervisorsCustomDuty.class); - private final Duration retainDuration; private final MetadataSupervisorManager metadataSupervisorManager; @JsonCreator public KillSupervisorsCustomDuty( - @JsonProperty("retainDuration") Duration retainDuration, - @JacksonInject MetadataSupervisorManager metadataSupervisorManager + @JsonProperty("durationToRetain") Duration retainDuration, + @JacksonInject MetadataSupervisorManager metadataSupervisorManager, + @JacksonInject DruidCoordinatorConfig coordinatorConfig ) { - this.metadataSupervisorManager = metadataSupervisorManager; - this.retainDuration = retainDuration; - Preconditions.checkArgument(this.retainDuration != null && this.retainDuration.getMillis() >= 0, "(Custom Duty) Coordinator supervisor kill retainDuration must be >= 0"); - log.info( - "Supervisor Kill Task scheduling enabled with retainDuration [%s]", - this.retainDuration + super( + "supervisors", + "KillSupervisorsCustomDuty", + // Use the same period as metadata store management so that validation passes + // Actual period of custom duties is configured by the user + coordinatorConfig.getCoordinatorMetadataStoreManagementPeriod(), + retainDuration, + Stats.Kill.SUPERVISOR_SPECS, + coordinatorConfig ); + this.metadataSupervisorManager = metadataSupervisorManager; + log.warn("This is only an example implementation of a custom duty and" + + " must not be used in production. 
Use KillSupervisors duty instead."); } @Override - public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime) { - long timestamp = System.currentTimeMillis() - retainDuration.getMillis(); - try { - int supervisorRemoved = metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(timestamp); - ServiceEmitter emitter = params.getEmitter(); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "metadata/kill/supervisor/count", - supervisorRemoved - ) - ); - log.info("Finished running KillSupervisors duty. Removed %,d supervisor specs", supervisorRemoved); - } - catch (Exception e) { - log.error(e, "Failed to kill terminated supervisor metadata"); - } - return params; + return metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(minCreatedTime.getMillis()); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/MetadataCleanupDuty.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/MetadataCleanupDuty.java new file mode 100644 index 000000000000..3206f1100379 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/MetadataCleanupDuty.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordinator.DruidCoordinatorConfig; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.stats.CoordinatorStat; +import org.joda.time.DateTime; +import org.joda.time.Duration; + +import javax.annotation.Nullable; + +/** + * Performs cleanup of stale metadata entries created before a configured retain duration. + *

+ * In every invocation of {@link #run}, the duty checks if the {@link #cleanupPeriod} + * has elapsed since the {@link #lastCleanupTime}. If it has, then the method + * {@link #cleanupEntriesCreatedBefore(DateTime)} is invoked. Otherwise, the duty + * completes immediately without making any changes. + */ +public abstract class MetadataCleanupDuty implements CoordinatorDuty +{ + private static final Logger log = new Logger(MetadataCleanupDuty.class); + + private final String propertyPrefix; + private final String entryType; + private final CoordinatorStat cleanupCountStat; + + private final Duration cleanupPeriod; + private final Duration retainDuration; + + private DateTime lastCleanupTime = DateTimes.utc(0); + + protected MetadataCleanupDuty( + String entryType, + String propertyPrefix, + Duration cleanupPeriod, + Duration retainDuration, + CoordinatorStat cleanupCountStat, + DruidCoordinatorConfig coordinatorConfig + ) + { + this.propertyPrefix = propertyPrefix; + this.entryType = entryType; + this.cleanupPeriod = cleanupPeriod; + this.retainDuration = retainDuration; + this.cleanupCountStat = cleanupCountStat; + + validatePeriod(cleanupPeriod, coordinatorConfig.getCoordinatorMetadataStoreManagementPeriod()); + validateRetainDuration(retainDuration); + + log.debug( + "Enabled cleanup of [%s] with period [%s] and durationToRetain [%s].", + entryType, cleanupPeriod, retainDuration + ); + } + + @Nullable + @Override + public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + { + final DateTime now = DateTimes.nowUtc(); + + // Perform cleanup only if cleanup period has elapsed + if (lastCleanupTime.plus(cleanupPeriod).isBefore(now)) { + lastCleanupTime = now; + + try { + DateTime minCreatedTime = now.minus(retainDuration); + int deletedEntries = cleanupEntriesCreatedBefore(minCreatedTime); + log.info("Removed [%,d] [%s] created before [%s].", deletedEntries, entryType, minCreatedTime); + + params.getCoordinatorStats().add(cleanupCountStat, 
deletedEntries); + } + catch (Exception e) { + log.error(e, "Failed to perform cleanup of [%s]", entryType); + } + } + + return params; + } + + /** + * Cleans up metadata entries created before the {@code minCreatedTime}. + *

+ * This method is not invoked if the {@link #cleanupPeriod} has not elapsed + * since the {@link #lastCleanupTime}. + * + * @return Number of deleted metadata entries + */ + protected abstract int cleanupEntriesCreatedBefore(DateTime minCreatedTime); + + private void validatePeriod(Duration period, Duration metadataManagementPeriod) + { + Preconditions.checkArgument( + period != null && period.getMillis() >= metadataManagementPeriod.getMillis(), + "[%s.period] must be greater than [druid.coordinator.period.metadataStoreManagementPeriod]", + propertyPrefix + ); + } + + private void validateRetainDuration(Duration retainDuration) + { + Preconditions.checkArgument( + retainDuration != null && retainDuration.getMillis() >= 0, + "[%s.durationToRetain] must be 0 milliseconds or higher", + propertyPrefix + ); + Preconditions.checkArgument( + retainDuration.getMillis() < System.currentTimeMillis(), + "[%s.durationToRetain] cannot be greater than current time in milliseconds", + propertyPrefix + ); + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java index 28f5c91049fb..ac37767327f9 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java @@ -129,6 +129,20 @@ public static class CoordinatorRun = CoordinatorStat.toDebugAndEmit("groupRunTime", "coordinator/global/time"); } + public static class Kill + { + public static final CoordinatorStat COMPACTION_CONFIGS + = CoordinatorStat.toDebugAndEmit("killedCompactConfigs", "metadata/kill/compaction/count"); + public static final CoordinatorStat SUPERVISOR_SPECS + = CoordinatorStat.toDebugAndEmit("killedSupervisorSpecs", "metadata/kill/supervisor/count"); + public static final CoordinatorStat RULES + = CoordinatorStat.toDebugAndEmit("killedRules", "metadata/kill/rule/count"); + public static final 
CoordinatorStat AUDIT_LOGS + = CoordinatorStat.toDebugAndEmit("killedAuditLogs", "metadata/kill/audit/count"); + public static final CoordinatorStat DATASOURCES + = CoordinatorStat.toDebugAndEmit("killedDatasources", "metadata/kill/datasource/count"); + } + public static class Balancer { public static final CoordinatorStat COMPUTATION_ERRORS = CoordinatorStat.toLogAndEmit( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java index 5f1e9dd7e856..d1df91269892 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java @@ -139,7 +139,6 @@ public void bigProfiler() .build() ) .withSegmentAssignerUsing(loadQueueManager) - .withEmitter(emitter) .withDatabaseRuleManager(manager) .build(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 15fc5f5ac919..979181619a71 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -690,7 +690,11 @@ public void testCompactSegmentsDutyWhenCustomDutyGroupEmpty() @Test public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupDoesNotContainsCompactSegments() { - CoordinatorCustomDutyGroup group = new CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), ImmutableList.of(new KillSupervisorsCustomDuty(new Duration("PT1S"), null))); + CoordinatorCustomDutyGroup group = new CoordinatorCustomDutyGroup( + "group1", + Duration.standardSeconds(1), + ImmutableList.of(new KillSupervisorsCustomDuty(new Duration("PT1S"), null, druidCoordinatorConfig)) + ); CoordinatorCustomDutyGroups customDutyGroups = new 
CoordinatorCustomDutyGroups(ImmutableSet.of(group)); coordinator = new DruidCoordinator( druidCoordinatorConfig, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java index 8480670160a8..bec2e3ed4bad 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java @@ -20,14 +20,14 @@ package org.apache.druid.server.coordinator.duty; import org.apache.druid.audit.AuditManager; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Stats; import org.joda.time.Duration; -import org.junit.Rule; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.ArgumentMatchers; import org.mockito.Mock; @@ -43,13 +43,15 @@ public class KillAuditLogTest @Mock private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams; - @Mock - private ServiceEmitter mockServiceEmitter; - - @Rule - public ExpectedException exception = ExpectedException.none(); - private KillAuditLog killAuditLog; + private CoordinatorRunStats runStats; + + @Before + public void setup() + { + runStats = new CoordinatorRunStats(); + Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats); + } @Test public void testRunSkipIfLastRunLessThanPeriod() @@ -69,7 +71,6 @@ public void testRunSkipIfLastRunLessThanPeriod() @Test public void testRunNotSkipIfLastRunMoreThanPeriod() 
{ - Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder() .withMetadataStoreManagementPeriod(new Duration("PT5s")) .withCoordianatorAuditKillPeriod(new Duration("PT6S")) @@ -80,7 +81,7 @@ public void testRunNotSkipIfLastRunMoreThanPeriod() killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig); killAuditLog.run(mockDruidCoordinatorRuntimeParams); Mockito.verify(mockAuditManager).removeAuditLogsOlderThan(ArgumentMatchers.anyLong()); - Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); + Assert.assertTrue(runStats.hasStat(Stats.Kill.AUDIT_LOGS)); } @Test @@ -93,9 +94,16 @@ public void testConstructorFailIfInvalidPeriod() .withCoordinatorKillMaxSegments(10) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); - exception.expect(IllegalArgumentException.class); - exception.expectMessage("coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"); - killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig); + + final IllegalArgumentException exception = Assert.assertThrows( + IllegalArgumentException.class, + () -> killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig) + ); + Assert.assertEquals( + "[druid.coordinator.kill.audit.period] must be greater than" + + " [druid.coordinator.period.metadataStoreManagementPeriod]", + exception.getMessage() + ); } @Test @@ -108,8 +116,13 @@ public void testConstructorFailIfInvalidRetainDuration() .withCoordinatorKillMaxSegments(10) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); - exception.expect(IllegalArgumentException.class); - exception.expectMessage("coordinator audit kill retainDuration must be >= 0"); - killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig); + final IllegalArgumentException exception = Assert.assertThrows( + 
IllegalArgumentException.class, + () -> killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig) + ); + Assert.assertEquals( + "[druid.coordinator.kill.audit.durationToRetain] must be 0 milliseconds or higher", + exception.getMessage() + ); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java index 916f3ed5e6b3..6d201aa9d6a2 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java @@ -25,8 +25,6 @@ import org.apache.druid.common.config.ConfigManager; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.metadata.MetadataStorageConnector; import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.SqlSegmentsMetadataManager; @@ -35,21 +33,19 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Stats; import org.joda.time.Duration; import org.joda.time.Period; import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; 
import org.mockito.junit.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; @RunWith(MockitoJUnitRunner.class) public class KillCompactionConfigTest @@ -57,9 +53,6 @@ public class KillCompactionConfigTest @Mock private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams; - @Mock - private ServiceEmitter mockServiceEmitter; - @Mock private SqlSegmentsMetadataManager mockSqlSegmentsMetadataManager; @@ -72,15 +65,15 @@ public class KillCompactionConfigTest @Mock private MetadataStorageTablesConfig mockConnectorConfig; - @Rule - public ExpectedException exception = ExpectedException.none(); - private KillCompactionConfig killCompactionConfig; + private CoordinatorRunStats runStats; @Before public void setup() { + runStats = new CoordinatorRunStats(); Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config"); + Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats); } @Test @@ -102,7 +95,7 @@ public void testRunSkipIfLastRunLessThanPeriod() killCompactionConfig.run(mockDruidCoordinatorRuntimeParams); Mockito.verifyNoInteractions(mockSqlSegmentsMetadataManager); Mockito.verifyNoInteractions(mockJacksonConfigManager); - Mockito.verifyNoInteractions(mockServiceEmitter); + Assert.assertEquals(0, runStats.rowCount()); } @Test @@ -114,14 +107,21 @@ public void testConstructorFailIfInvalidPeriod() .withCoordinatorKillMaxSegments(10) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Coordinator compaction configuration kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"); - killCompactionConfig = new KillCompactionConfig( - druidCoordinatorConfig, - mockSqlSegmentsMetadataManager, - mockJacksonConfigManager, - mockConnector, - mockConnectorConfig + + final IllegalArgumentException exception = Assert.assertThrows( + IllegalArgumentException.class, + () -> killCompactionConfig = new 
KillCompactionConfig( + druidCoordinatorConfig, + mockSqlSegmentsMetadataManager, + mockJacksonConfigManager, + mockConnector, + mockConnectorConfig + ) + ); + Assert.assertEquals( + "[druid.coordinator.kill.compaction.period] must be greater than" + + " [druid.coordinator.period.metadataStoreManagementPeriod]", + exception.getMessage() ); } @@ -129,7 +129,6 @@ public void testConstructorFailIfInvalidPeriod() @Test public void testRunDoNothingIfCurrentConfigIsEmpty() { - Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); // Set current compaction config to an empty compaction config Mockito.when(mockConnector.lookup( ArgumentMatchers.anyString(), @@ -158,10 +157,9 @@ public void testRunDoNothingIfCurrentConfigIsEmpty() ); killCompactionConfig.run(mockDruidCoordinatorRuntimeParams); Mockito.verifyNoInteractions(mockSqlSegmentsMetadataManager); - final ArgumentCaptor emittedEventCaptor = ArgumentCaptor.forClass(ServiceEventBuilder.class); - Mockito.verify(mockServiceEmitter).emit(emittedEventCaptor.capture()); - Assert.assertEquals(KillCompactionConfig.COUNT_METRIC, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("metric")); - Assert.assertEquals(0, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value")); + Assert.assertTrue(runStats.hasStat(Stats.Kill.COMPACTION_CONFIGS)); + Assert.assertEquals(0, runStats.get(Stats.Kill.COMPACTION_CONFIGS)); + Mockito.verify(mockJacksonConfigManager).convertByteToConfig( ArgumentMatchers.eq(null), ArgumentMatchers.eq(CoordinatorCompactionConfig.class), @@ -223,7 +221,6 @@ public void testRunRemoveInactiveDatasourceCompactionConfig() ArgumentMatchers.eq(CoordinatorCompactionConfig.class), ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) ).thenReturn(originalCurrentConfig); - Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); 
Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of(activeDatasourceName)); final ArgumentCaptor oldConfigCaptor = ArgumentCaptor.forClass(byte[].class); final ArgumentCaptor newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class); @@ -257,11 +254,7 @@ public void testRunRemoveInactiveDatasourceCompactionConfig() Assert.assertEquals(1, newConfigCaptor.getValue().getCompactionConfigs().size()); Assert.assertEquals(activeDatasourceConfig, newConfigCaptor.getValue().getCompactionConfigs().get(0)); - final ArgumentCaptor emittedEventCaptor = ArgumentCaptor.forClass(ServiceEventBuilder.class); - Mockito.verify(mockServiceEmitter).emit(emittedEventCaptor.capture()); - Assert.assertEquals(KillCompactionConfig.COUNT_METRIC, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("metric")); - // Should delete 1 config - Assert.assertEquals(1, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value")); + Assert.assertEquals(1, runStats.get(Stats.Kill.COMPACTION_CONFIGS)); Mockito.verify(mockJacksonConfigManager).convertByteToConfig( ArgumentMatchers.eq(originalCurrentConfigBytes), @@ -317,27 +310,20 @@ public void testRunRetryForRetryableException() ArgumentMatchers.eq(CoordinatorCompactionConfig.class), ArgumentMatchers.eq(CoordinatorCompactionConfig.empty())) ).thenReturn(originalCurrentConfig); - Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of()); Mockito.when(mockJacksonConfigManager.set( ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY), ArgumentMatchers.any(byte[].class), ArgumentMatchers.any(CoordinatorCompactionConfig.class), ArgumentMatchers.any()) - ).thenAnswer(new Answer() { - private int count = 0; - @Override - public Object answer(InvocationOnMock invocation) - { - if (count++ < 3) { - // Return fail result 
with RetryableException the first three call to updated set - return ConfigManager.SetResult.fail(new Exception(), true); - } else { - // Return success ok on the fourth call to set updated config - return ConfigManager.SetResult.ok(); - } - } - }); + ).thenReturn( + // Return fail result with RetryableException the first three calls to updated set + ConfigManager.SetResult.fail(new Exception(), true), + ConfigManager.SetResult.fail(new Exception(), true), + ConfigManager.SetResult.fail(new Exception(), true), + // Return success ok on the fourth call to set updated config + ConfigManager.SetResult.ok() + ); TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder() .withMetadataStoreManagementPeriod(new Duration("PT5S")) @@ -354,12 +340,8 @@ public Object answer(InvocationOnMock invocation) ); killCompactionConfig.run(mockDruidCoordinatorRuntimeParams); - // Verify and Assert - final ArgumentCaptor emittedEventCaptor = ArgumentCaptor.forClass(ServiceEventBuilder.class); - Mockito.verify(mockServiceEmitter).emit(emittedEventCaptor.capture()); - Assert.assertEquals(KillCompactionConfig.COUNT_METRIC, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("metric")); - // Should delete 1 config - Assert.assertEquals(1, emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value")); + // Verify that 1 config has been deleted + Assert.assertEquals(1, runStats.get(Stats.Kill.COMPACTION_CONFIGS)); // Should call convertByteToConfig and lookup (to refresh current compaction config) four times due to RetryableException when failed Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).convertByteToConfig( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java index 9e4960331550..222d5a715fba 100644 --- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java @@ -21,16 +21,15 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.metadata.MetadataSupervisorManager; -import org.apache.druid.metadata.TestSupervisorSpec; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Stats; import org.joda.time.Duration; -import org.junit.Rule; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.ArgumentMatchers; import org.mockito.Mock; @@ -49,16 +48,15 @@ public class KillDatasourceMetadataTest @Mock private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams; - @Mock - private TestSupervisorSpec mockKinesisSupervisorSpec; - - @Mock - private ServiceEmitter mockServiceEmitter; - - @Rule - public ExpectedException exception = ExpectedException.none(); - private KillDatasourceMetadata killDatasourceMetadata; + private CoordinatorRunStats runStats; + + @Before + public void setup() + { + runStats = new CoordinatorRunStats(); + Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats); + } @Test public void testRunSkipIfLastRunLessThanPeriod() @@ -79,7 +77,7 @@ public void testRunSkipIfLastRunLessThanPeriod() @Test public void testRunNotSkipIfLastRunMoreThanPeriod() { - 
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); + Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats); TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder() .withMetadataStoreManagementPeriod(new Duration("PT5S")) @@ -88,10 +86,14 @@ public void testRunNotSkipIfLastRunMoreThanPeriod() .withCoordinatorKillMaxSegments(10) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); - killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); + killDatasourceMetadata = new KillDatasourceMetadata( + druidCoordinatorConfig, + mockIndexerMetadataStorageCoordinator, + mockMetadataSupervisorManager + ); killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams); Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.anySet()); - Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); + Assert.assertTrue(runStats.hasStat(Stats.Kill.DATASOURCES)); } @Test @@ -104,9 +106,20 @@ public void testConstructorFailIfInvalidPeriod() .withCoordinatorKillMaxSegments(10) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Coordinator datasource metadata kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"); - killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); + + final IllegalArgumentException exception = Assert.assertThrows( + IllegalArgumentException.class, + () -> killDatasourceMetadata = new KillDatasourceMetadata( + druidCoordinatorConfig, + mockIndexerMetadataStorageCoordinator, + mockMetadataSupervisorManager + ) + ); + Assert.assertEquals( + 
"[druid.coordinator.kill.datasource.period] must be greater than" + + " [druid.coordinator.period.metadataStoreManagementPeriod]", + exception.getMessage() + ); } @Test @@ -119,16 +132,23 @@ public void testConstructorFailIfInvalidRetainDuration() .withCoordinatorKillMaxSegments(10) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Coordinator datasource metadata kill retainDuration must be >= 0"); - killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); + final IllegalArgumentException exception = Assert.assertThrows( + IllegalArgumentException.class, + () -> killDatasourceMetadata = new KillDatasourceMetadata( + druidCoordinatorConfig, + mockIndexerMetadataStorageCoordinator, + mockMetadataSupervisorManager + ) + ); + Assert.assertEquals( + "[druid.coordinator.kill.datasource.durationToRetain] must be 0 milliseconds or higher", + exception.getMessage() + ); } @Test public void testRunWithEmptyFilterExcludedDatasource() { - Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); - TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder() .withMetadataStoreManagementPeriod(new Duration("PT5S")) .withCoordinatorDatasourceKillPeriod(new Duration("PT6S")) @@ -139,6 +159,6 @@ public void testRunWithEmptyFilterExcludedDatasource() killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams); Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.eq(ImmutableSet.of())); - Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); + 
Assert.assertTrue(runStats.hasStat(Stats.Kill.DATASOURCES)); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java index 6d98cfe0b586..f1663dffda54 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java @@ -19,16 +19,15 @@ package org.apache.druid.server.coordinator.duty; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Stats; import org.joda.time.Duration; +import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.ArgumentMatchers; import org.mockito.Mock; @@ -44,18 +43,14 @@ public class KillRulesTest @Mock private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams; - @Mock - private ServiceEmitter mockServiceEmitter; - - @Rule - public ExpectedException exception = ExpectedException.none(); - private KillRules killRules; + private CoordinatorRunStats runStats; @Before public void setup() { - Mockito.when(mockDruidCoordinatorRuntimeParams.getDatabaseRuleManager()).thenReturn(mockRuleManager); + runStats = new CoordinatorRunStats(); + Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats); } @Test @@ -68,7 +63,7 @@ public void testRunSkipIfLastRunLessThanPeriod() .withCoordinatorKillMaxSegments(10) 
.withCoordinatorKillIgnoreDurationToRetain(false) .build(); - killRules = new KillRules(druidCoordinatorConfig); + killRules = new KillRules(druidCoordinatorConfig, mockRuleManager); killRules.run(mockDruidCoordinatorRuntimeParams); Mockito.verifyNoInteractions(mockRuleManager); } @@ -76,7 +71,6 @@ public void testRunSkipIfLastRunLessThanPeriod() @Test public void testRunNotSkipIfLastRunMoreThanPeriod() { - Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder() .withMetadataStoreManagementPeriod(new Duration("PT5S")) .withCoordinatorRuleKillPeriod(new Duration("PT6S")) @@ -84,10 +78,10 @@ public void testRunNotSkipIfLastRunMoreThanPeriod() .withCoordinatorKillMaxSegments(10) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); - killRules = new KillRules(druidCoordinatorConfig); + killRules = new KillRules(druidCoordinatorConfig, mockRuleManager); killRules.run(mockDruidCoordinatorRuntimeParams); Mockito.verify(mockRuleManager).removeRulesForEmptyDatasourcesOlderThan(ArgumentMatchers.anyLong()); - Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); + Assert.assertTrue(runStats.hasStat(Stats.Kill.RULES)); } @Test @@ -100,9 +94,15 @@ public void testConstructorFailIfInvalidPeriod() .withCoordinatorKillMaxSegments(10) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); - exception.expect(IllegalArgumentException.class); - exception.expectMessage("coordinator rule kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"); - killRules = new KillRules(druidCoordinatorConfig); + final IllegalArgumentException exception = Assert.assertThrows( + IllegalArgumentException.class, + () -> killRules = new KillRules(druidCoordinatorConfig, mockRuleManager) + ); + Assert.assertEquals( + "[druid.coordinator.kill.rule.period] must be greater than" + + " 
[druid.coordinator.period.metadataStoreManagementPeriod]", + exception.getMessage() + ); } @Test @@ -115,8 +115,13 @@ public void testConstructorFailIfInvalidRetainDuration() .withCoordinatorKillMaxSegments(10) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); - exception.expect(IllegalArgumentException.class); - exception.expectMessage("coordinator rule kill retainDuration must be >= 0"); - killRules = new KillRules(druidCoordinatorConfig); + final IllegalArgumentException exception = Assert.assertThrows( + IllegalArgumentException.class, + () -> killRules = new KillRules(druidCoordinatorConfig, mockRuleManager) + ); + Assert.assertEquals( + "[druid.coordinator.kill.rule.durationToRetain] must be 0 milliseconds or higher", + exception.getMessage() + ); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDutyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDutyTest.java index a07c4f676a3d..e93e04f1b38c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDutyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDutyTest.java @@ -19,15 +19,15 @@ package org.apache.druid.server.coordinator.duty; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Stats; import org.joda.time.Duration; import org.junit.Assert; -import org.junit.Rule; +import org.junit.Before; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import 
org.mockito.ArgumentMatchers; import org.mockito.Mock; @@ -44,43 +44,70 @@ public class KillSupervisorsCustomDutyTest private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams; @Mock - private ServiceEmitter mockServiceEmitter; - - @Rule - public ExpectedException exception = ExpectedException.none(); + private DruidCoordinatorConfig coordinatorConfig; private KillSupervisorsCustomDuty killSupervisors; + @Before + public void setup() + { + Mockito.when(coordinatorConfig.getCoordinatorMetadataStoreManagementPeriod()) + .thenReturn(new Duration(3600 * 1000)); + } + @Test public void testConstructorFailIfRetainDurationNull() { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("(Custom Duty) Coordinator supervisor kill retainDuration must be >= 0"); - killSupervisors = new KillSupervisorsCustomDuty(null, mockMetadataSupervisorManager); + final IllegalArgumentException exception = Assert.assertThrows( + IllegalArgumentException.class, + () -> killSupervisors = new KillSupervisorsCustomDuty(null, mockMetadataSupervisorManager, coordinatorConfig) + ); + Assert.assertEquals( + "[KillSupervisorsCustomDuty.durationToRetain] must be 0 milliseconds or higher", + exception.getMessage() + ); } @Test public void testConstructorFailIfRetainDurationInvalid() { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("(Custom Duty) Coordinator supervisor kill retainDuration must be >= 0"); - killSupervisors = new KillSupervisorsCustomDuty(new Duration("PT-1s"), mockMetadataSupervisorManager); + final IllegalArgumentException exception = Assert.assertThrows( + IllegalArgumentException.class, + () -> killSupervisors = new KillSupervisorsCustomDuty( + new Duration("PT-1S"), + mockMetadataSupervisorManager, + coordinatorConfig + ) + ); + Assert.assertEquals( + "[KillSupervisorsCustomDuty.durationToRetain] must be 0 milliseconds or higher", + exception.getMessage() + ); } @Test public void testConstructorSuccess() { - 
killSupervisors = new KillSupervisorsCustomDuty(new Duration("PT1S"), mockMetadataSupervisorManager); + killSupervisors = new KillSupervisorsCustomDuty( + new Duration("PT1S"), + mockMetadataSupervisorManager, + coordinatorConfig + ); Assert.assertNotNull(killSupervisors); } @Test public void testRun() { - Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); - killSupervisors = new KillSupervisorsCustomDuty(new Duration("PT1S"), mockMetadataSupervisorManager); + final CoordinatorRunStats runStats = new CoordinatorRunStats(); + Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats); + killSupervisors = new KillSupervisorsCustomDuty( + new Duration("PT1S"), + mockMetadataSupervisorManager, + coordinatorConfig + ); killSupervisors.run(mockDruidCoordinatorRuntimeParams); Mockito.verify(mockMetadataSupervisorManager).removeTerminatedSupervisorsOlderThan(ArgumentMatchers.anyLong()); - Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); + Assert.assertTrue(runStats.hasStat(Stats.Kill.SUPERVISOR_SPECS)); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java index 0f7c2fff7e24..bbe92a2cd10f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java @@ -19,15 +19,15 @@ package org.apache.druid.server.coordinator.duty; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.metadata.MetadataSupervisorManager; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; +import 
org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Stats; import org.joda.time.Duration; -import org.junit.Rule; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.ArgumentMatchers; import org.mockito.Mock; @@ -43,13 +43,15 @@ public class KillSupervisorsTest @Mock private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams; - @Mock - private ServiceEmitter mockServiceEmitter; - - @Rule - public ExpectedException exception = ExpectedException.none(); - private KillSupervisors killSupervisors; + private CoordinatorRunStats runStats; + + @Before + public void setup() + { + runStats = new CoordinatorRunStats(); + Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats); + } @Test public void testRunSkipIfLastRunLessThanPeriod() @@ -69,7 +71,6 @@ public void testRunSkipIfLastRunLessThanPeriod() @Test public void testRunNotSkipIfLastRunMoreThanPeriod() { - Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder() .withMetadataStoreManagementPeriod(new Duration("PT5S")) .withCoordinatorSupervisorKillPeriod(new Duration("PT6S")) @@ -80,7 +81,7 @@ public void testRunNotSkipIfLastRunMoreThanPeriod() killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager); killSupervisors.run(mockDruidCoordinatorRuntimeParams); Mockito.verify(mockMetadataSupervisorManager).removeTerminatedSupervisorsOlderThan(ArgumentMatchers.anyLong()); - Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); + Assert.assertTrue(runStats.hasStat(Stats.Kill.SUPERVISOR_SPECS)); } @Test @@ -93,9 +94,15 @@ public void testConstructorFailIfInvalidPeriod() .withCoordinatorKillMaxSegments(10) 
.withCoordinatorKillIgnoreDurationToRetain(false) .build(); - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Coordinator supervisor kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"); - killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager); + final IllegalArgumentException exception = Assert.assertThrows( + IllegalArgumentException.class, + () -> killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager) + ); + Assert.assertEquals( + "[druid.coordinator.kill.supervisor.period] must be greater than" + + " [druid.coordinator.period.metadataStoreManagementPeriod]", + exception.getMessage() + ); } @Test @@ -108,8 +115,14 @@ public void testConstructorFailIfInvalidRetainDuration() .withCoordinatorKillMaxSegments(10) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Coordinator supervisor kill retainDuration must be >= 0"); - killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager); + + final IllegalArgumentException exception = Assert.assertThrows( + IllegalArgumentException.class, + () -> killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager) + ); + Assert.assertEquals( + "[druid.coordinator.kill.supervisor.durationToRetain] must be 0 milliseconds or higher", + exception.getMessage() + ); } }