Fix issues with MSQ Compaction (apache#17250)
The patch makes the following changes:
1. Fixes a bug causing compaction to fail on array, complex, and other non-primitive-type columns.
2. Updates the compaction status check to account for partition dimensions when comparing dimension ordering.
3. Ensures only string columns are specified as partition dimensions.
4. Ensures `rollup` is true if and only if metricsSpec is non-empty (items 3 and 4 are illustrated in the sketch below).
5. Ensures disjoint intervals aren't submitted for compaction.
6. Adds `compactionReason` to the compaction task context.
gargvishesh authored Oct 6, 2024
1 parent 7d9e6d3 commit 7e35e50
Showing 10 changed files with 317 additions and 60 deletions.
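A minimal standalone sketch of items 3 and 4 from the change list above (plain Java; the class, methods, and message strings below are hypothetical illustrations, not the Druid code this patch modifies):

import java.util.List;
import java.util.Map;

public class CompactionRuleSketch
{
  /** Returns an error string, or null when every partition dimension is string-typed (item 3). */
  static String checkPartitionDimensions(List<String> partitionDimensions, Map<String, String> columnTypes)
  {
    for (String dim : partitionDimensions) {
      String type = columnTypes.get(dim);
      if (!"string".equals(type)) {
        return "Non-string partition dimension[" + dim + "] of type[" + type + "] not supported";
      }
    }
    return null;
  }

  /** Returns an error string, or null when rollup and metricsSpec agree (item 4). */
  static String checkRollup(Boolean rollup, List<String> metricsSpec)
  {
    boolean hasMetrics = metricsSpec != null && !metricsSpec.isEmpty();
    if (rollup == null) {
      return null; // a null rollup is inferred later from the segments being compacted
    }
    if (rollup != hasMetrics) {
      return hasMetrics ? "metricsSpec requires rollup to be true" : "rollup=true requires a non-empty metricsSpec";
    }
    return null;
  }

  public static void main(String[] args)
  {
    Map<String, String> types = Map.of("string_dim", "string", "long_dim", "long");
    System.out.println(checkPartitionDimensions(List.of("long_dim"), types)); // rejected
    System.out.println(checkRollup(true, List.of()));                         // rejected
    System.out.println(checkRollup(true, List.of("sum_added")));              // accepted -> null
  }
}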
@@ -25,6 +25,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.inject.Injector;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.data.input.impl.DimensionSchema;
@@ -129,21 +130,35 @@ public MSQCompactionRunner(
* The following configs aren't supported:
* <ul>
* <li>partitionsSpec of type HashedPartitionsSpec.</li>
* <li>'range' partitionsSpec with non-string partition dimensions.</li>
* <li>maxTotalRows in DynamicPartitionsSpec.</li>
* <li>rollup in granularitySpec set to false when metricsSpec is specified or true when it's null.
* Null is treated as true if metricsSpec exist and false if empty.</li>
* <li>any metric is non-idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A != A.combiningFactory()'.</li>
* <li>Rollup without metricsSpec being specified or vice-versa.</li>
* <li>Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.</li>
* <li>Multiple disjoint intervals in compaction task</li>
* </ul>
*/
@Override
public CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalToDataSchemaMap
)
{
if (intervalToDataSchemaMap.size() > 1) {
// We currently cannot handle multiple intervals in the map, for several reasons -- one being that the worker IDs
// of the sequentially launched MSQControllerTasks would clash, since they are derived from the MSQControllerTask ID,
// which in turn is equal to the CompactionTask ID for each such task.
return CompactionConfigValidationResult.failure(
"MSQ: Disjoint compaction intervals[%s] not supported",
intervalToDataSchemaMap.keySet()
);
}
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
if (compactionTask.getTuningConfig() != null) {
validationResults.add(ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
compactionTask.getTuningConfig().getPartitionsSpec())
validationResults.add(
ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
compactionTask.getTuningConfig().getPartitionsSpec(),
Iterables.getOnlyElement(intervalToDataSchemaMap.values()).getDimensionsSpec().getDimensions()
)
);
}
if (compactionTask.getGranularitySpec() != null) {
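To make the worker-id reasoning in the comment above concrete, here is a toy model (hypothetical id scheme and names, not Druid's actual id format): when every controller launched for one compaction task reuses that task's id, the worker ids derived from it collide across intervals.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class WorkerIdClashSketch
{
  static String workerId(String controllerTaskId, int workerNumber)
  {
    return controllerTaskId + "-worker" + workerNumber;
  }

  public static void main(String[] args)
  {
    String compactionTaskId = "compact_wiki_2024-10-06";
    Set<String> seen = new HashSet<>();
    // One MSQ controller would be launched per disjoint interval, each reusing the compaction task's id.
    for (String interval : List.of("2017-01/2017-02", "2017-07/2017-08")) {
      String controllerTaskId = compactionTaskId;
      String id = workerId(controllerTaskId, 0);
      System.out.println(interval + " -> " + id + (seen.add(id) ? "" : "   <-- clashes with the previous interval"));
    }
  }
}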
@@ -300,7 +315,7 @@ private static RowSignature getRowSignature(DataSchema dataSchema)
rowSignatureBuilder.add(TIME_VIRTUAL_COLUMN, ColumnType.LONG);
}
for (DimensionSchema dimensionSchema : dataSchema.getDimensionsSpec().getDimensions()) {
rowSignatureBuilder.add(dimensionSchema.getName(), ColumnType.fromString(dimensionSchema.getTypeName()));
rowSignatureBuilder.add(dimensionSchema.getName(), dimensionSchema.getColumnType());
}
// There can be columns that are part of metricsSpec for a datasource.
for (AggregatorFactory aggregatorFactory : dataSchema.getAggregators()) {
@@ -416,7 +431,9 @@ private static boolean isGroupBy(DataSchema dataSchema)
{
if (dataSchema.getGranularitySpec() != null) {
// If rollup is true without any metrics, all columns are treated as dimensions and
// duplicate rows are removed in line with native compaction.
// duplicate rows are removed in line with native compaction. This case can only happen when rollup is
// specified as null in the compaction spec and is then inferred to be true by segment analysis. The metrics=null,
// rollup=true combination, in turn, can only have been recorded for natively ingested segments.
return dataSchema.getGranularitySpec().isRollup();
}
// If no rollup specified, decide based on whether metrics are present.
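The comments above boil down to the following decision; this is a hedged standalone restatement (a Boolean null stands in for "no granularitySpec"), not the method body from the diff:

public class RollupInferenceSketch
{
  // When a granularity spec is present its rollup flag decides whether compaction runs as a group-by;
  // otherwise group-by is used exactly when the schema carries metrics.
  static boolean isGroupBy(Boolean rollupFromGranularitySpec, int numAggregators)
  {
    if (rollupFromGranularitySpec != null) {
      return rollupFromGranularitySpec;
    }
    return numAggregators > 0;
  }

  public static void main(String[] args)
  {
    System.out.println(isGroupBy(null, 0));  // false -> plain re-partitioning compaction
    System.out.println(isGroupBy(null, 2));  // true  -> group-by compaction over the metrics
    System.out.println(isGroupBy(true, 0));  // true  -> rollup inferred true, all columns treated as dimensions
  }
}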
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
@@ -41,6 +42,7 @@
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
@@ -96,7 +98,6 @@ public class MSQCompactionRunnerTest
private static final int MAX_ROWS_PER_SEGMENT = 150000;
private static final GranularityType SEGMENT_GRANULARITY = GranularityType.HOUR;
private static final GranularityType QUERY_GRANULARITY = GranularityType.HOUR;
private static List<String> PARTITION_DIMENSIONS;

private static final StringDimensionSchema STRING_DIMENSION = new StringDimensionSchema("string_dim", null, false);
private static final StringDimensionSchema MV_STRING_DIMENSION = new StringDimensionSchema("mv_string_dim", null, null);
@@ -106,24 +107,49 @@ public class MSQCompactionRunnerTest
LONG_DIMENSION,
MV_STRING_DIMENSION
);
private static final Map<Interval, DataSchema> INTERVAL_DATASCHEMAS = ImmutableMap.of(
COMPACTION_INTERVAL,
new DataSchema.Builder()
.withDataSource(DATA_SOURCE)
.withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
.withDimensions(new DimensionsSpec(DIMENSIONS))
.build()
);
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final AggregatorFactory AGG1 = new CountAggregatorFactory("agg_0");
private static final AggregatorFactory AGG2 = new LongSumAggregatorFactory("sum_added", "sum_added");
private static final List<AggregatorFactory> AGGREGATORS = ImmutableList.of(AGG1, AGG2);
private static final MSQCompactionRunner MSQ_COMPACTION_RUNNER = new MSQCompactionRunner(JSON_MAPPER, TestExprMacroTable.INSTANCE, null);
private static final List<String> PARTITION_DIMENSIONS = Collections.singletonList(STRING_DIMENSION.getName());


@BeforeClass
public static void setupClass()
{
NullHandling.initializeForTests();
}

final StringDimensionSchema stringDimensionSchema = new StringDimensionSchema(
"string_dim",
@Test
public void testMultipleDisjointCompactionIntervalsAreInvalid()
{
Map<Interval, DataSchema> intervalDataschemas = new HashMap<>(INTERVAL_DATASCHEMAS);
intervalDataschemas.put(Intervals.of("2017-07-01/2018-01-01"), null);
CompactionTask compactionTask = createCompactionTask(
new HashedPartitionsSpec(3, null, ImmutableList.of("dummy")),
null,
Collections.emptyMap(),
null,
null
);

PARTITION_DIMENSIONS = Collections.singletonList(stringDimensionSchema.getName());
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
compactionTask,
intervalDataschemas
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
StringUtils.format("MSQ: Disjoint compaction intervals[%s] not supported", intervalDataschemas.keySet()),
validationResult.getReason()
);
}

@Test
@@ -136,11 +162,11 @@ public void testHashedPartitionsSpecIsInvalid()
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}

@Test
public void testDimensionRangePartitionsSpecIsValid()
public void testStringDimensionInRangePartitionsSpecIsValid()
{
CompactionTask compactionTask = createCompactionTask(
new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, PARTITION_DIMENSIONS, false),
@@ -149,7 +175,29 @@ public void testDimensionRangePartitionsSpecIsValid()
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}

@Test
public void testLongDimensionInRangePartitionsSpecIsInvalid()
{
List<String> longPartitionDimension = Collections.singletonList(LONG_DIMENSION.getName());
CompactionTask compactionTask = createCompactionTask(
new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, longPartitionDimension, false),
null,
Collections.emptyMap(),
null,
null
);

CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
INTERVAL_DATASCHEMAS
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: Non-string partition dimension[long_dim] of type[long] not supported with 'range' partition spec",
validationResult.getReason()
);
}

@Test
@@ -162,7 +210,7 @@ public void testMaxTotalRowsIsInvalid()
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}

@Test
@@ -175,7 +223,7 @@ public void testDynamicPartitionsSpecIsValid()
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}

@Test
@@ -188,7 +236,7 @@ public void testQueryGranularityAllIsValid()
new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}

@Test
@@ -201,7 +249,7 @@ public void testRollupFalseWithMetricsSpecIsInValid()
new ClientCompactionTaskGranularitySpec(null, null, false),
AGGREGATORS.toArray(new AggregatorFactory[0])
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}

@Test
@@ -214,7 +262,7 @@ public void testRollupTrueWithoutMetricsSpecIsInValid()
new ClientCompactionTaskGranularitySpec(null, null, true),
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}

@Test
@@ -227,13 +275,16 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
new DynamicPartitionsSpec(3, null),
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, null),
new ClientCompactionTaskGranularitySpec(null, null, true),
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
compactionTask,
INTERVAL_DATASCHEMAS
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: Non-idempotent aggregator[sum_added] not supported in 'metricsSpec'.",
"MSQ: Aggregator[sum_added] not supported in 'metricsSpec'",
validationResult.getReason()
);
}
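The test above exercises the combining-factory rule from the runner's javadoc. As a hedged illustration (it assumes the org.apache.druid:druid-processing module on the classpath; this snippet is not part of the patch), an aggregator passes only when it equals its own combining factory:

import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

public class CombiningFactoryCheckSketch
{
  static boolean combinesIntoItself(AggregatorFactory factory)
  {
    return factory.equals(factory.getCombiningFactory());
  }

  public static void main(String[] args)
  {
    // longSum over its own output column combines into itself, so it is acceptable for MSQ compaction.
    System.out.println(combinesIntoItself(new LongSumAggregatorFactory("sum_added", "sum_added"))); // true
    // longSum whose input column differs from its name does not, which is what the test above asserts.
    System.out.println(combinesIntoItself(new LongSumAggregatorFactory("sum_added", "added")));     // false
  }
}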
@@ -57,6 +57,9 @@ TaskStatus runCompactionTasks(
* Checks if the provided compaction config is supported by the runner.
* The same validation is done at {@link org.apache.druid.msq.indexing.MSQCompactionRunner#validateCompactionTask}
*/
CompactionConfigValidationResult validateCompactionTask(CompactionTask compactionTask);
CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalToDataSchemaMap
);

}
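A self-contained model of the updated contract (plain Java stand-ins for the Druid types, all names hypothetical): every runner now validates the task together with the per-interval data schemas resolved for it, and only the MSQ runner rejects more than one interval.

import java.util.Map;

public class RunnerContractSketch
{
  // Stand-ins for CompactionTask, DataSchema and CompactionConfigValidationResult.
  record Task(String id) {}
  record Schema(String dataSource) {}
  record Validation(boolean valid, String reason) {}

  interface Runner
  {
    Validation validate(Task task, Map<String, Schema> intervalToSchema);
  }

  // Modeled after NativeCompactionRunner: every config is accepted.
  static final Runner NATIVE = (task, schemas) -> new Validation(true, null);

  // Modeled after the MSQ runner's new disjoint-interval check.
  static final Runner MSQ = (task, schemas) ->
      schemas.size() > 1
      ? new Validation(false, "MSQ: Disjoint compaction intervals" + schemas.keySet() + " not supported")
      : new Validation(true, null);

  public static void main(String[] args)
  {
    Map<String, Schema> twoIntervals = Map.of(
        "2017-01-01/2017-02-01", new Schema("wiki"),
        "2017-07-01/2017-08-01", new Schema("wiki")
    );
    System.out.println(NATIVE.validate(new Task("compact_wiki"), twoIntervals).valid()); // true
    System.out.println(MSQ.validate(new Task("compact_wiki"), twoIntervals).valid());    // false
  }
}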
@@ -470,7 +470,10 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
);

registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(this);
CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(
this,
intervalDataSchemas
);
if (!supportsCompactionConfig.isValid()) {
throw InvalidInput.exception("Compaction spec not supported. Reason[%s].", supportsCompactionConfig.getReason());
}
@@ -85,7 +85,8 @@ public CurrentSubTaskHolder getCurrentSubTaskHolder()

@Override
public CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalDataSchemaMap
)
{
return CompactionConfigValidationResult.success();