From 7e35e50052ee1b4f4d65222e0d5c4883e9fa26da Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Sun, 6 Oct 2024 21:48:26 +0530 Subject: [PATCH] Fix issues with MSQ Compaction (#17250) The patch makes the following changes: 1. Fixes a bug causing compaction to fail on array, complex, and other non-primitive-type columns 2. Updates compaction status check to be conscious of partition dimensions when comparing dimension ordering. 3. Ensures only string columns are specified as partition dimensions 4. Ensures `rollup` is true if and only if metricsSpec is non-empty 5. Ensures disjoint intervals aren't submitted for compaction 6. Adds `compactionReason` to compaction task context. --- .../msq/indexing/MSQCompactionRunner.java | 33 ++++++-- .../msq/indexing/MSQCompactionRunnerTest.java | 83 +++++++++++++++---- .../common/task/CompactionRunner.java | 5 +- .../indexing/common/task/CompactionTask.java | 5 +- .../common/task/NativeCompactionRunner.java | 3 +- .../indexing/ClientCompactionRunnerInfo.java | 54 ++++++++---- .../server/compaction/CompactionStatus.java | 44 ++++++++-- .../coordinator/duty/CompactSegments.java | 5 ++ .../ClientCompactionRunnerInfoTest.java | 68 ++++++++++++--- .../NewestSegmentFirstPolicyTest.java | 77 +++++++++++++++++ 10 files changed, 317 insertions(+), 60 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java index 417fdb60d0f3..e20188d58294 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.inject.Injector; import org.apache.druid.client.indexing.ClientCompactionRunnerInfo; import org.apache.druid.data.input.impl.DimensionSchema; @@ -129,21 +130,35 @@ public MSQCompactionRunner( * The following configs aren't supported: * */ @Override public CompactionConfigValidationResult validateCompactionTask( - CompactionTask compactionTask + CompactionTask compactionTask, + Map intervalToDataSchemaMap ) { + if (intervalToDataSchemaMap.size() > 1) { + // We are currently not able to handle multiple intervals in the map for multiple reasons, one of them being that + // the subsequent worker ids clash -- since they are derived from MSQControllerTask ID which in turn is equal to + // CompactionTask ID for each sequentially launched MSQControllerTask. 
+ return CompactionConfigValidationResult.failure( + "MSQ: Disjoint compaction intervals[%s] not supported", + intervalToDataSchemaMap.keySet() + ); + } List validationResults = new ArrayList<>(); if (compactionTask.getTuningConfig() != null) { - validationResults.add(ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ( - compactionTask.getTuningConfig().getPartitionsSpec()) + validationResults.add( + ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ( + compactionTask.getTuningConfig().getPartitionsSpec(), + Iterables.getOnlyElement(intervalToDataSchemaMap.values()).getDimensionsSpec().getDimensions() + ) ); } if (compactionTask.getGranularitySpec() != null) { @@ -300,7 +315,7 @@ private static RowSignature getRowSignature(DataSchema dataSchema) rowSignatureBuilder.add(TIME_VIRTUAL_COLUMN, ColumnType.LONG); } for (DimensionSchema dimensionSchema : dataSchema.getDimensionsSpec().getDimensions()) { - rowSignatureBuilder.add(dimensionSchema.getName(), ColumnType.fromString(dimensionSchema.getTypeName())); + rowSignatureBuilder.add(dimensionSchema.getName(), dimensionSchema.getColumnType()); } // There can be columns that are part of metricsSpec for a datasource. for (AggregatorFactory aggregatorFactory : dataSchema.getAggregators()) { @@ -416,7 +431,9 @@ private static boolean isGroupBy(DataSchema dataSchema) { if (dataSchema.getGranularitySpec() != null) { // If rollup is true without any metrics, all columns are treated as dimensions and - // duplicate rows are removed in line with native compaction. + // duplicate rows are removed in line with native compaction. This case can only happen if the rollup is + // specified as null in the compaction spec and is then inferred to be true by segment analysis. metrics=null and + // rollup=true combination in turn can only have been recorded for natively ingested segments. return dataSchema.getGranularitySpec().isRollup(); } // If no rollup specified, decide based on whether metrics are present. 
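Note on change (4) in the summary above: the updated rollup check in ClientCompactionRunnerInfo.validateRollupForMSQ (further down in this patch) collapses the old pair of one-sided checks into a single biconditional -- rollup must be true exactly when metricsSpec is non-empty. A minimal standalone sketch of that rule follows; the class and method names are hypothetical and not part of the patch:

// Illustrative paraphrase of the rollup/metricsSpec rule this patch enforces for MSQ
// compaction; RollupRuleSketch and isValidRollupCombination are hypothetical names.
public class RollupRuleSketch
{
  // Valid iff metricsSpec is non-empty exactly when rollup is explicitly true.
  static boolean isValidRollupCombination(String[] metricsSpec, Boolean isRollup)
  {
    boolean hasMetrics = metricsSpec != null && metricsSpec.length > 0;
    return hasMetrics == Boolean.TRUE.equals(isRollup);
  }

  public static void main(String[] args)
  {
    System.out.println(isValidRollupCombination(new String[]{"sum_added"}, true));  // valid
    System.out.println(isValidRollupCombination(new String[]{"sum_added"}, false)); // invalid: metrics without rollup
    System.out.println(isValidRollupCombination(new String[]{"sum_added"}, null));  // invalid: rollup left null
    System.out.println(isValidRollupCombination(null, true));                       // invalid: rollup without metrics
    System.out.println(isValidRollupCombination(null, null));                       // valid: neither specified
  }
}

This mirrors the validation exercised by the new and updated tests in ClientCompactionRunnerInfoTest and MSQCompactionRunnerTest later in this patch.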
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java index 15b12be15753..0b5395d727fe 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec; import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec; @@ -41,6 +42,7 @@ import org.apache.druid.indexing.common.task.TuningConfigBuilder; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; @@ -96,7 +98,6 @@ public class MSQCompactionRunnerTest private static final int MAX_ROWS_PER_SEGMENT = 150000; private static final GranularityType SEGMENT_GRANULARITY = GranularityType.HOUR; private static final GranularityType QUERY_GRANULARITY = GranularityType.HOUR; - private static List PARTITION_DIMENSIONS; private static final StringDimensionSchema STRING_DIMENSION = new StringDimensionSchema("string_dim", null, false); private static final StringDimensionSchema MV_STRING_DIMENSION = new StringDimensionSchema("mv_string_dim", null, null); @@ -106,24 +107,49 @@ public class MSQCompactionRunnerTest LONG_DIMENSION, MV_STRING_DIMENSION ); + private static final Map INTERVAL_DATASCHEMAS = ImmutableMap.of( + COMPACTION_INTERVAL, + new DataSchema.Builder() + .withDataSource(DATA_SOURCE) + .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null)) + .withDimensions(new DimensionsSpec(DIMENSIONS)) + .build() + ); private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); private static final AggregatorFactory AGG1 = new CountAggregatorFactory("agg_0"); private static final AggregatorFactory AGG2 = new LongSumAggregatorFactory("sum_added", "sum_added"); private static final List AGGREGATORS = ImmutableList.of(AGG1, AGG2); private static final MSQCompactionRunner MSQ_COMPACTION_RUNNER = new MSQCompactionRunner(JSON_MAPPER, TestExprMacroTable.INSTANCE, null); + private static final List PARTITION_DIMENSIONS = Collections.singletonList(STRING_DIMENSION.getName()); + @BeforeClass public static void setupClass() { NullHandling.initializeForTests(); + } - final StringDimensionSchema stringDimensionSchema = new StringDimensionSchema( - "string_dim", + @Test + public void testMultipleDisjointCompactionIntervalsAreInvalid() + { + Map intervalDataschemas = new HashMap<>(INTERVAL_DATASCHEMAS); + intervalDataschemas.put(Intervals.of("2017-07-01/2018-01-01"), null); + CompactionTask compactionTask = createCompactionTask( + new HashedPartitionsSpec(3, null, ImmutableList.of("dummy")), + null, + Collections.emptyMap(), null, null ); - - PARTITION_DIMENSIONS = Collections.singletonList(stringDimensionSchema.getName()); + 
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask( + compactionTask, + intervalDataschemas + ); + Assert.assertFalse(validationResult.isValid()); + Assert.assertEquals( + StringUtils.format("MSQ: Disjoint compaction intervals[%s] not supported", intervalDataschemas.keySet()), + validationResult.getReason() + ); } @Test @@ -136,11 +162,11 @@ public void testHashedPartitionsSpecIsInvalid() null, null ); - Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid()); } @Test - public void testDimensionRangePartitionsSpecIsValid() + public void testStringDimensionInRangePartitionsSpecIsValid() { CompactionTask compactionTask = createCompactionTask( new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, PARTITION_DIMENSIONS, false), @@ -149,7 +175,29 @@ public void testDimensionRangePartitionsSpecIsValid() null, null ); - Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid()); + } + + @Test + public void testLongDimensionInRangePartitionsSpecIsInvalid() + { + List longPartitionDimension = Collections.singletonList(LONG_DIMENSION.getName()); + CompactionTask compactionTask = createCompactionTask( + new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, longPartitionDimension, false), + null, + Collections.emptyMap(), + null, + null + ); + + CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, + INTERVAL_DATASCHEMAS + ); + Assert.assertFalse(validationResult.isValid()); + Assert.assertEquals( + "MSQ: Non-string partition dimension[long_dim] of type[long] not supported with 'range' partition spec", + validationResult.getReason() + ); } @Test @@ -162,7 +210,7 @@ public void testMaxTotalRowsIsInvalid() null, null ); - Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid()); } @Test @@ -175,7 +223,7 @@ public void testDynamicPartitionsSpecIsValid() null, null ); - Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid()); } @Test @@ -188,7 +236,7 @@ public void testQueryGranularityAllIsValid() new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null), null ); - Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid()); } @Test @@ -201,7 +249,7 @@ public void testRollupFalseWithMetricsSpecIsInValid() new ClientCompactionTaskGranularitySpec(null, null, false), AGGREGATORS.toArray(new AggregatorFactory[0]) ); - Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid()); } @Test @@ -214,7 +262,7 @@ public void testRollupTrueWithoutMetricsSpecIsInValid() new ClientCompactionTaskGranularitySpec(null, null, true), null ); - 
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid()); + Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid()); } @Test @@ -227,13 +275,16 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInValid() new DynamicPartitionsSpec(3, null), null, Collections.emptyMap(), - new ClientCompactionTaskGranularitySpec(null, null, null), + new ClientCompactionTaskGranularitySpec(null, null, true), new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)} ); - CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask); + CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask( + compactionTask, + INTERVAL_DATASCHEMAS + ); Assert.assertFalse(validationResult.isValid()); Assert.assertEquals( - "MSQ: Non-idempotent aggregator[sum_added] not supported in 'metricsSpec'.", + "MSQ: Aggregator[sum_added] not supported in 'metricsSpec'", validationResult.getReason() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java index 8d30a60d04e6..0abaeed8eb27 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java @@ -57,6 +57,9 @@ TaskStatus runCompactionTasks( * Checks if the provided compaction config is supported by the runner. * The same validation is done at {@link org.apache.druid.msq.indexing.MSQCompactionRunner#validateCompactionTask} */ - CompactionConfigValidationResult validateCompactionTask(CompactionTask compactionTask); + CompactionConfigValidationResult validateCompactionTask( + CompactionTask compactionTask, + Map intervalToDataSchemaMap + ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index b3c01d79f98b..4594fc1e9b25 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -470,7 +470,10 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception ); registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder()); - CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(this); + CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask( + this, + intervalDataSchemas + ); if (!supportsCompactionConfig.isValid()) { throw InvalidInput.exception("Compaction spec not supported. 
Reason[%s].", supportsCompactionConfig.getReason()); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java index 2074d14f0f90..541f24fe0889 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java @@ -85,7 +85,8 @@ public CurrentSubTaskHolder getCurrentSubTaskHolder() @Override public CompactionConfigValidationResult validateCompactionTask( - CompactionTask compactionTask + CompactionTask compactionTask, + Map intervalDataSchemaMap ) { return CompactionConfigValidationResult.success(); diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java index 806b35e94819..f6a009afe1ce 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java @@ -21,12 +21,14 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.query.QueryContext; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -36,6 +38,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; /** @@ -102,16 +107,20 @@ public static CompactionConfigValidationResult validateCompactionConfig( * Checks if the provided compaction config is supported by MSQ. The following configs aren't supported: *
   * <ul>
   * <li>partitionsSpec of type HashedParititionsSpec.</li>
+  * <li>'range' partitionsSpec with non-string partition dimensions.</li>
   * <li>maxTotalRows in DynamicPartitionsSpec.</li>
-  * <li>rollup in granularitySpec set to false when metricsSpec is specified or true when it's empty.</li>
-  * <li>any metric is non-idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A != A.combiningFactory()'.</li>
+  * <li>Rollup without metricsSpec being specified or vice-versa.</li>
+  * <li>Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.</li>
   * </ul>
*/ private static CompactionConfigValidationResult compactionConfigSupportedByMSQEngine(DataSourceCompactionConfig newConfig) { List validationResults = new ArrayList<>(); if (newConfig.getTuningConfig() != null) { - validationResults.add(validatePartitionsSpecForMSQ(newConfig.getTuningConfig().getPartitionsSpec())); + validationResults.add(validatePartitionsSpecForMSQ( + newConfig.getTuningConfig().getPartitionsSpec(), + newConfig.getDimensionsSpec() == null ? null : newConfig.getDimensionsSpec().getDimensions() + )); } if (newConfig.getGranularitySpec() != null) { validationResults.add(validateRollupForMSQ( @@ -128,9 +137,13 @@ private static CompactionConfigValidationResult compactionConfigSupportedByMSQEn } /** - * Validate that partitionSpec is either 'dynamic` or 'range', and if 'dynamic', ensure 'maxTotalRows' is null. + * Validate that partitionSpec is either 'dynamic` or 'range'. If 'dynamic', ensure 'maxTotalRows' is null. If range + * ensure all partition columns are of string type. */ - public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(PartitionsSpec partitionsSpec) + public static CompactionConfigValidationResult validatePartitionsSpecForMSQ( + @Nullable PartitionsSpec partitionsSpec, + @Nullable List dimensionSchemas + ) { if (!(partitionsSpec instanceof DimensionRangePartitionsSpec || partitionsSpec instanceof DynamicPartitionsSpec)) { @@ -146,11 +159,28 @@ public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(Part "MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning" ); } + if (partitionsSpec instanceof DimensionRangePartitionsSpec && dimensionSchemas != null) { + Map dimensionSchemaMap = dimensionSchemas.stream().collect( + Collectors.toMap(DimensionSchema::getName, Function.identity()) + ); + Optional nonStringDimension = ((DimensionRangePartitionsSpec) partitionsSpec) + .getPartitionDimensions() + .stream() + .filter(dim -> !ColumnType.STRING.equals(dimensionSchemaMap.get(dim).getColumnType())) + .findAny(); + if (nonStringDimension.isPresent()) { + return CompactionConfigValidationResult.failure( + "MSQ: Non-string partition dimension[%s] of type[%s] not supported with 'range' partition spec", + nonStringDimension.get(), + dimensionSchemaMap.get(nonStringDimension.get()).getTypeName() + ); + } + } return CompactionConfigValidationResult.success(); } /** - * Validate rollup in granularitySpec is set to true when metricsSpec is specified and false if it's null. + * Validate rollup in granularitySpec is set to true iff metricsSpec is specified. * If rollup set to null, all existing segments are analyzed, and it's set to true iff all segments have rollup * set to true. 
*/ @@ -159,13 +189,9 @@ public static CompactionConfigValidationResult validateRollupForMSQ( @Nullable Boolean isRollup ) { - if (metricsSpec != null && metricsSpec.length != 0 && isRollup != null && !isRollup) { - return CompactionConfigValidationResult.failure( - "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is specified" - ); - } else if ((metricsSpec == null || metricsSpec.length == 0) && isRollup != null && isRollup) { + if ((metricsSpec != null && metricsSpec.length > 0) != Boolean.TRUE.equals(isRollup)) { return CompactionConfigValidationResult.failure( - "MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null" + "MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified" ); } return CompactionConfigValidationResult.success(); @@ -190,7 +216,7 @@ public static CompactionConfigValidationResult validateMaxNumTasksForMSQ(Map CompactionConfigValidationResult.failure( - "MSQ: Non-idempotent aggregator[%s] not supported in 'metricsSpec'.", + "MSQ: Aggregator[%s] not supported in 'metricsSpec'", aggregatorFactory.getName() ) ).orElse(CompactionConfigValidationResult.success()); diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java index 2bc6d251f06b..fd53ed38c257 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java @@ -25,7 +25,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec; import org.apache.druid.common.config.Configs; -import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; @@ -44,6 +44,7 @@ import java.util.Arrays; import java.util.List; import java.util.function.Function; +import java.util.stream.Collectors; /** * Represents the status of compaction for a given {@link CompactionCandidate}. @@ -230,6 +231,21 @@ static PartitionsSpec findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuni } } + private static List getNonPartitioningDimensions( + @Nullable final List dimensionSchemas, + @Nullable final PartitionsSpec partitionsSpec + ) + { + if (dimensionSchemas == null || !(partitionsSpec instanceof DimensionRangePartitionsSpec)) { + return dimensionSchemas; + } + + final List partitionsDimensions = ((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions(); + return dimensionSchemas.stream() + .filter(dim -> !partitionsDimensions.contains(dim.getName())) + .collect(Collectors.toList()); + } + /** * Converts to have only the effective maxRowsPerSegment to avoid false positives when targetRowsPerSegment is set but * effectively translates to the same maxRowsPerSegment. @@ -389,18 +405,34 @@ private CompactionStatus queryGranularityIsUpToDate() } } + /** + * Removes partition dimensions before comparison, since they are placed in front of the sort order -- + * which can create a mismatch between expected and actual order of dimensions. Partition dimensions are separately + * covered in {@link Evaluator#partitionsSpecIsUpToDate()} check. 
+ */ private CompactionStatus dimensionsSpecIsUpToDate() { if (compactionConfig.getDimensionsSpec() == null) { return COMPLETE; } else { - final DimensionsSpec existingDimensionsSpec = lastCompactionState.getDimensionsSpec(); - return CompactionStatus.completeIfEqual( - "dimensionsSpec", + List existingDimensions = getNonPartitioningDimensions( + lastCompactionState.getDimensionsSpec() == null + ? null + : lastCompactionState.getDimensionsSpec().getDimensions(), + lastCompactionState.getPartitionsSpec() + ); + List configuredDimensions = getNonPartitioningDimensions( compactionConfig.getDimensionsSpec().getDimensions(), - existingDimensionsSpec == null ? null : existingDimensionsSpec.getDimensions(), - String::valueOf + compactionConfig.getTuningConfig() == null ? null : compactionConfig.getTuningConfig().getPartitionsSpec() ); + { + return CompactionStatus.completeIfEqual( + "dimensionsSpec", + configuredDimensions, + existingDimensions, + String::valueOf + ); + } } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index b347a57dcb6c..035286692bf5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -86,6 +86,7 @@ public class CompactSegments implements CoordinatorCustomDuty * Must be the same as org.apache.druid.indexing.common.task.Tasks.STORE_COMPACTION_STATE_KEY */ public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState"; + private static final String COMPACTION_REASON_KEY = "compactionReason"; private static final Logger LOG = new Logger(CompactSegments.class); @@ -567,6 +568,10 @@ private int submitCompactionTasks( slotsRequiredForCurrentTask = findMaxNumTaskSlotsUsedByOneNativeCompactionTask(config.getTuningConfig()); } + if (entry.getCurrentStatus() != null) { + autoCompactionContext.put(COMPACTION_REASON_KEY, entry.getCurrentStatus().getReason()); + } + final String taskId = compactSegments( entry, config.getTaskPriority(), diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java index 011a4640da37..b1f065422805 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java @@ -21,6 +21,8 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.SegmentsSplitHintSpec; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; @@ -36,6 +38,7 @@ import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; import org.joda.time.Duration; @@ -45,6 +48,7 @@ 
import javax.annotation.Nullable; import java.util.Collections; +import java.util.List; import java.util.Map; public class ClientCompactionRunnerInfoTest @@ -56,6 +60,7 @@ public void testMSQEngineWithHashedPartitionsSpecIsInvalid() new HashedPartitionsSpec(100, null, null), Collections.emptyMap(), null, + null, null ); CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig( @@ -76,6 +81,7 @@ public void testMSQEngineWithMaxTotalRowsIsInvalid() new DynamicPartitionsSpec(100, 100L), Collections.emptyMap(), null, + null, null ); CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig( @@ -96,6 +102,7 @@ public void testMSQEngineWithDynamicPartitionsSpecIsValid() new DynamicPartitionsSpec(100, null), Collections.emptyMap(), null, + null, null ); Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE) @@ -103,18 +110,40 @@ public void testMSQEngineWithDynamicPartitionsSpecIsValid() } @Test - public void testMSQEngineWithDimensionRangePartitionsSpecIsValid() + public void testMSQEngineWithStringDimensionsInRangePartitionsSpecIsValid() { DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig( new DimensionRangePartitionsSpec(100, null, ImmutableList.of("partitionDim"), false), Collections.emptyMap(), null, + null, null ); Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE) .isValid()); } + @Test + public void testMSQEngineWithLongDimensionsInRangePartitionsSpecIsValid() + { + DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig( + new DimensionRangePartitionsSpec(100, null, ImmutableList.of("partitionDim"), false), + Collections.emptyMap(), + null, + null, + ImmutableList.of(new LongDimensionSchema("partitionDim")) + ); + CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig( + compactionConfig, + CompactionEngine.NATIVE + ); + Assert.assertFalse(validationResult.isValid()); + Assert.assertEquals( + "MSQ: Non-string partition dimension[partitionDim] of type[long] not supported with 'range' partition spec", + validationResult.getReason() + ); + } + @Test public void testMSQEngineWithQueryGranularityAllIsValid() { @@ -122,6 +151,7 @@ public void testMSQEngineWithQueryGranularityAllIsValid() new DynamicPartitionsSpec(3, null), Collections.emptyMap(), new UserCompactionTaskGranularityConfig(Granularities.ALL, Granularities.ALL, false), + null, null ); Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE) @@ -135,7 +165,8 @@ public void testMSQEngineWithRollupFalseWithMetricsSpecIsInvalid() new DynamicPartitionsSpec(3, null), Collections.emptyMap(), new UserCompactionTaskGranularityConfig(null, null, false), - new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")} + new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")}, + null ); CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig( compactionConfig, @@ -143,7 +174,7 @@ public void testMSQEngineWithRollupFalseWithMetricsSpecIsInvalid() ); Assert.assertFalse(validationResult.isValid()); Assert.assertEquals( - "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is specified", + "MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified", validationResult.getReason() ); } @@ -155,6 +186,7 
@@ public void testMSQEngineWithRollupTrueWithoutMetricsSpecIsInvalid() new DynamicPartitionsSpec(3, null), Collections.emptyMap(), new UserCompactionTaskGranularityConfig(null, null, true), + null, null ); CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig( @@ -163,7 +195,7 @@ public void testMSQEngineWithRollupTrueWithoutMetricsSpecIsInvalid() ); Assert.assertFalse(validationResult.isValid()); Assert.assertEquals( - "MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null", + "MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified", validationResult.getReason() ); } @@ -177,8 +209,9 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid() DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig( new DynamicPartitionsSpec(3, null), Collections.emptyMap(), - new UserCompactionTaskGranularityConfig(null, null, null), - new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)} + new UserCompactionTaskGranularityConfig(null, null, true), + new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}, + null ); CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig( compactionConfig, @@ -186,29 +219,38 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid() ); Assert.assertFalse(validationResult.isValid()); Assert.assertEquals( - "MSQ: Non-idempotent aggregator[sum_added] not supported in 'metricsSpec'.", + "MSQ: Aggregator[sum_added] not supported in 'metricsSpec'", validationResult.getReason() ); } @Test - public void testMSQEngineWithRollupNullWithMetricsSpecIsValid() + public void testMSQEngineWithRollupNullWithMetricsSpecIsInvalid() { DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig( new DynamicPartitionsSpec(3, null), Collections.emptyMap(), new UserCompactionTaskGranularityConfig(null, null, null), - new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")} + new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")}, + null + ); + CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig( + compactionConfig, + CompactionEngine.NATIVE + ); + Assert.assertFalse(validationResult.isValid()); + Assert.assertEquals( + "MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified", + validationResult.getReason() ); - Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE) - .isValid()); } private static DataSourceCompactionConfig createMSQCompactionConfig( PartitionsSpec partitionsSpec, Map context, @Nullable UserCompactionTaskGranularityConfig granularitySpec, - @Nullable AggregatorFactory[] metricsSpec + @Nullable AggregatorFactory[] metricsSpec, + List dimensions ) { final DataSourceCompactionConfig config = new DataSourceCompactionConfig( @@ -219,7 +261,7 @@ private static DataSourceCompactionConfig createMSQCompactionConfig( new Period(3600), createTuningConfig(partitionsSpec), granularitySpec, - null, + new UserCompactionTaskDimensionsConfig(dimensions), metricsSpec, null, null, diff --git a/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java index 7580582685b0..5659a0ff5bfc 100644 --- 
a/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java @@ -28,6 +28,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; @@ -1137,6 +1138,82 @@ public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentDim Assert.assertFalse(iterator.hasNext()); } + @Test + public void testIteratorDoesNotReturnsSegmentsWhenPartitionDimensionsPrefixed() + { + // Same indexSpec as what is set in the auto compaction config + Map indexSpec = IndexSpec.DEFAULT.asMap(mapper); + // Set range partitions spec with dimensions ["dim2", "dim4"] -- the same as what is set in the auto compaction config + PartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec( + null, + Integer.MAX_VALUE, + ImmutableList.of("dim2", "dim4"), + false + ); + + // Create segments that were compacted (CompactionState != null) and have + // Dimensions=["dim2", "dim4", "dim3", "dim1"] with ["dim2", "dim4"] as partition dimensions for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, + // Dimensions=["dim2", "dim4", "dim1", "dim3"] with ["dim2", "dim4"] as partition dimensions for interval 2017-10-02T00:00:00/2017-10-03T00:00:00, + final SegmentTimeline timeline = createTimeline( + createSegments() + .startingAt("2017-10-01") + .withNumPartitions(4) + .withCompactionState( + new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim2", "dim4", "dim3", "dim1"))), null, null, indexSpec, null) + ), + createSegments() + .startingAt("2017-10-02") + .withNumPartitions(4) + .withCompactionState( + new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim2", "dim4", "dim1", "dim3"))), null, null, indexSpec, null) + ) + ); + + // Auto compaction config sets Dimensions=["dim1", "dim2", "dim3", "dim4"] and partition dimensions as ["dim2", "dim4"] + CompactionSegmentIterator iterator = createIterator( + configBuilder().withDimensionsSpec( + new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "dim4"))) + ) + .withTuningConfig( + new UserCompactionTaskQueryTuningConfig( + null, + null, + null, + 1000L, + null, + partitionsSpec, + IndexSpec.DEFAULT, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ) + ) + .build(), + timeline + ); + // We should get only interval 2017-10-01T00:00:00/2017-10-02T00:00:00 since 2017-10-02T00:00:00/2017-10-03T00:00:00 + // has dimension order as expected post reordering of partition dimensions. 
+ Assert.assertTrue(iterator.hasNext()); + List expectedSegmentsToCompact = new ArrayList<>( + timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE) + ); + Assert.assertEquals( + ImmutableSet.copyOf(expectedSegmentsToCompact), + ImmutableSet.copyOf(iterator.next().getSegments()) + ); + // No more + Assert.assertFalse(iterator.hasNext()); + } + @Test public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentFilter() throws Exception {