diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java index 5457e04286c9..dc19e33282cf 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java @@ -54,19 +54,21 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.ExpressionPostAggregator; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; -import org.apache.druid.query.expression.TimestampFloorExprMacro; -import org.apache.druid.query.expression.TimestampParseExprMacro; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.indexing.CombinedDataSchema; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; @@ -82,6 +84,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -92,12 +95,14 @@ public class MSQCompactionRunner implements CompactionRunner private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL; private final ObjectMapper jsonMapper; + private final ExprMacroTable exprMacroTable; private final Injector injector; // Needed as output column name while grouping in the scenario of: // a) no query granularity -- to specify an output name for the time dimension column since __time is a reserved name. // b) custom query granularity -- to create a virtual column containing the rounded-off row timestamp. // In both cases, the new column is converted back to __time later using columnMappings. 
public static final String TIME_VIRTUAL_COLUMN = "__vTime"; + public static final String ARRAY_VIRTUAL_COLUMN_PREFIX = "__vArray_"; @JsonIgnore private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder( @@ -108,9 +113,14 @@ public class MSQCompactionRunner implements CompactionRunner @JsonCreator - public MSQCompactionRunner(@JacksonInject ObjectMapper jsonMapper, @JacksonInject Injector injector) + public MSQCompactionRunner( + @JacksonInject final ObjectMapper jsonMapper, + @JacksonInject final ExprMacroTable exprMacroTable, + @JacksonInject final Injector injector + ) { this.jsonMapper = jsonMapper; + this.exprMacroTable = exprMacroTable; this.injector = injector; } @@ -192,11 +202,12 @@ public List createMsqControllerTasks( Query query; Interval interval = intervalDataSchema.getKey(); DataSchema dataSchema = intervalDataSchema.getValue(); + Map inputColToVirtualCol = getVirtualColumns(dataSchema, interval); if (isGroupBy(dataSchema)) { - query = buildGroupByQuery(compactionTask, interval, dataSchema); + query = buildGroupByQuery(compactionTask, interval, dataSchema, inputColToVirtualCol); } else { - query = buildScanQuery(compactionTask, interval, dataSchema); + query = buildScanQuery(compactionTask, interval, dataSchema, inputColToVirtualCol); } QueryContext compactionTaskContext = new QueryContext(compactionTask.getContext()); @@ -308,7 +319,10 @@ private static RowSignature getRowSignature(DataSchema dataSchema) return rowSignatureBuilder.build(); } - private static List getAggregateDimensions(DataSchema dataSchema) + private static List getAggregateDimensions( + DataSchema dataSchema, + Map inputColToVirtualCol + ) { List dimensionSpecs = new ArrayList<>(); @@ -319,14 +333,22 @@ private static List getAggregateDimensions(DataSchema dataSchema) // The changed granularity would result in a new virtual column that needs to be aggregated upon. dimensionSpecs.add(new DefaultDimensionSpec(TIME_VIRTUAL_COLUMN, TIME_VIRTUAL_COLUMN, ColumnType.LONG)); } - - dimensionSpecs.addAll(dataSchema.getDimensionsSpec().getDimensions().stream() - .map(dim -> new DefaultDimensionSpec( - dim.getName(), - dim.getName(), - dim.getColumnType() - )) - .collect(Collectors.toList())); + // If virtual columns are created from dimensions, replace dimension columns names with virtual column names. 
+ dimensionSpecs.addAll( + dataSchema.getDimensionsSpec().getDimensions().stream() + .map(dim -> { + String dimension = dim.getName(); + ColumnType colType = dim.getColumnType(); + if (inputColToVirtualCol.containsKey(dim.getName())) { + VirtualColumn virtualColumn = inputColToVirtualCol.get(dimension); + dimension = virtualColumn.getOutputName(); + if (virtualColumn instanceof ExpressionVirtualColumn) { + colType = ((ExpressionVirtualColumn) virtualColumn).getOutputType(); + } + } + return new DefaultDimensionSpec(dimension, dimension, colType); + }) + .collect(Collectors.toList())); return dimensionSpecs; } @@ -365,13 +387,19 @@ private static List getOrderBySpec(PartitionsSpec partitionSp return Collections.emptyList(); } - private static Query buildScanQuery(CompactionTask compactionTask, Interval interval, DataSchema dataSchema) + private static Query buildScanQuery( + CompactionTask compactionTask, + Interval interval, + DataSchema dataSchema, + Map inputColToVirtualCol + ) { RowSignature rowSignature = getRowSignature(dataSchema); + VirtualColumns virtualColumns = VirtualColumns.create(new ArrayList<>(inputColToVirtualCol.values())); Druids.ScanQueryBuilder scanQueryBuilder = new Druids.ScanQueryBuilder() .dataSource(dataSchema.getDataSource()) .columns(rowSignature.getColumnNames()) - .virtualColumns(getVirtualColumns(dataSchema, interval)) + .virtualColumns(virtualColumns) .columnTypes(rowSignature.getColumnTypes()) .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval))) .filters(dataSchema.getTransformSpec().getFilter()) @@ -416,51 +444,115 @@ private static boolean isQueryGranularityEmptyOrNone(DataSchema dataSchema) } /** - * Creates a virtual timestamp column to create a new __time field according to the provided queryGranularity, as - * queryGranularity field itself is mandated to be ALL in MSQControllerTask. + * Conditionally creates below virtual columns + *
+ * <ul>
+ * <li> timestamp column (for custom queryGranularity): converts __time field in line with the provided
+ * queryGranularity, since the queryGranularity field itself in MSQControllerTask is mandated to be ALL.</li>
+ * <li> mv_to_array columns (for group-by queries): temporary columns that convert MVD columns to array to enable
+ * grouping on them without unnesting.</li>
+ * </ul>
*/ - private static VirtualColumns getVirtualColumns(DataSchema dataSchema, Interval interval) + private Map getVirtualColumns(DataSchema dataSchema, Interval interval) { - if (isQueryGranularityEmptyOrNone(dataSchema)) { - return VirtualColumns.EMPTY; + Map inputColToVirtualCol = new HashMap<>(); + if (!isQueryGranularityEmptyOrNone(dataSchema)) { + // Round-off time field according to provided queryGranularity + String timeVirtualColumnExpr; + if (dataSchema.getGranularitySpec() + .getQueryGranularity() + .equals(Granularities.ALL)) { + // For ALL query granularity, all records in a segment are assigned the interval start timestamp of the segment. + // It's the same behaviour in native compaction. + timeVirtualColumnExpr = StringUtils.format("timestamp_parse('%s')", interval.getStart()); + } else { + PeriodGranularity periodQueryGranularity = (PeriodGranularity) dataSchema.getGranularitySpec() + .getQueryGranularity(); + // Round off the __time column according to the required granularity. + timeVirtualColumnExpr = + StringUtils.format( + "timestamp_floor(\"%s\", '%s')", + ColumnHolder.TIME_COLUMN_NAME, + periodQueryGranularity.getPeriod().toString() + ); + } + inputColToVirtualCol.put(ColumnHolder.TIME_COLUMN_NAME, new ExpressionVirtualColumn( + TIME_VIRTUAL_COLUMN, + timeVirtualColumnExpr, + ColumnType.LONG, + exprMacroTable + )); } - String virtualColumnExpr; - if (dataSchema.getGranularitySpec() - .getQueryGranularity() - .equals(Granularities.ALL)) { - // For ALL query granularity, all records in a segment are assigned the interval start timestamp of the segment. - // It's the same behaviour in native compaction. - virtualColumnExpr = StringUtils.format("timestamp_parse('%s')", interval.getStart()); - } else { - PeriodGranularity periodQueryGranularity = (PeriodGranularity) dataSchema.getGranularitySpec() - .getQueryGranularity(); - // Round of the __time column according to the required granularity. - virtualColumnExpr = - StringUtils.format( - "timestamp_floor(\"%s\", '%s')", - ColumnHolder.TIME_COLUMN_NAME, - periodQueryGranularity.getPeriod().toString() - ); + if (isGroupBy(dataSchema)) { + // Convert MVDs to arrays for grouping to avoid unnest, assuming all string cols to be MVDs. + Set multiValuedColumns = dataSchema.getDimensionsSpec() + .getDimensions() + .stream() + .filter(dim -> dim.getColumnType().equals(ColumnType.STRING)) + .map(DimensionSchema::getName) + .collect(Collectors.toSet()); + if (dataSchema instanceof CombinedDataSchema && + ((CombinedDataSchema) dataSchema).getMultiValuedDimensions() != null) { + // Filter actual MVDs from schema info. 
+ Set multiValuedColumnsFromSchema = + ((CombinedDataSchema) dataSchema).getMultiValuedDimensions(); + multiValuedColumns = multiValuedColumns.stream() + .filter(multiValuedColumnsFromSchema::contains) + .collect(Collectors.toSet()); + } + + for (String dim : multiValuedColumns) { + String virtualColumnExpr = StringUtils.format("mv_to_array(\"%s\")", dim); + inputColToVirtualCol.put( + dim, + new ExpressionVirtualColumn( + ARRAY_VIRTUAL_COLUMN_PREFIX + dim, + virtualColumnExpr, + ColumnType.STRING_ARRAY, + exprMacroTable + ) + ); + } } - return VirtualColumns.create(new ExpressionVirtualColumn( - TIME_VIRTUAL_COLUMN, - virtualColumnExpr, - ColumnType.LONG, - new ExprMacroTable(ImmutableList.of(new TimestampFloorExprMacro(), new TimestampParseExprMacro())) - )); + return inputColToVirtualCol; } - private static Query buildGroupByQuery(CompactionTask compactionTask, Interval interval, DataSchema dataSchema) + private Query buildGroupByQuery( + CompactionTask compactionTask, + Interval interval, + DataSchema dataSchema, + Map inputColToVirtualCol + ) { DimFilter dimFilter = dataSchema.getTransformSpec().getFilter(); + VirtualColumns virtualColumns = VirtualColumns.create(new ArrayList<>(inputColToVirtualCol.values())); + + // Convert MVDs converted to arrays back to MVDs, with the same name as the input column. + // This is safe since input column names no longer exist at post-aggregation stage. + List postAggregators = + inputColToVirtualCol.entrySet() + .stream() + .filter(entry -> !entry.getKey().equals(ColumnHolder.TIME_COLUMN_NAME)) + .map( + entry -> + new ExpressionPostAggregator( + entry.getKey(), + StringUtils.format("array_to_mv(\"%s\")", entry.getValue().getOutputName()), + null, + ColumnType.STRING, + exprMacroTable + ) + ) + .collect(Collectors.toList()); + GroupByQuery.Builder builder = new GroupByQuery.Builder() .setDataSource(new TableDataSource(compactionTask.getDataSource())) - .setVirtualColumns(getVirtualColumns(dataSchema, interval)) + .setVirtualColumns(virtualColumns) .setDimFilter(dimFilter) .setGranularity(new AllGranularity()) - .setDimensions(getAggregateDimensions(dataSchema)) + .setDimensions(getAggregateDimensions(dataSchema, inputColToVirtualCol)) .setAggregatorSpecs(Arrays.asList(dataSchema.getAggregators())) + .setPostAggregatorSpecs(postAggregators) .setContext(compactionTask.getContext()) .setInterval(interval); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java index 09c5ae477182..34092d061b2a 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java @@ -20,7 +20,6 @@ package org.apache.druid.msq.indexing; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -44,7 +43,6 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.GranularityType; -import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; import 
org.apache.druid.msq.kernel.WorkerAssignmentStrategy; import org.apache.druid.msq.util.MultiStageQueryContext; @@ -52,19 +50,27 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; -import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.data.CompressionFactory; import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; +import org.apache.druid.segment.indexing.CombinedDataSchema; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; import org.joda.time.Interval; import org.junit.Assert; import org.junit.BeforeClass; @@ -73,8 +79,10 @@ import javax.annotation.Nullable; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -83,20 +91,25 @@ public class MSQCompactionRunnerTest private static final String DATA_SOURCE = "dataSource"; private static final Interval COMPACTION_INTERVAL = Intervals.of("2017-01-01/2017-07-01"); - private static final String TIMESTAMP_COLUMN = "timestamp"; + private static final String TIMESTAMP_COLUMN = ColumnHolder.TIME_COLUMN_NAME; private static final int TARGET_ROWS_PER_SEGMENT = 100000; private static final GranularityType SEGMENT_GRANULARITY = GranularityType.HOUR; private static final GranularityType QUERY_GRANULARITY = GranularityType.HOUR; private static List PARTITION_DIMENSIONS; - private static final StringDimensionSchema STRING_DIMENSION = new StringDimensionSchema("string_dim", null, null); + private static final StringDimensionSchema STRING_DIMENSION = new StringDimensionSchema("string_dim", null, false); + private static final StringDimensionSchema MV_STRING_DIMENSION = new StringDimensionSchema("mv_string_dim", null, null); private static final LongDimensionSchema LONG_DIMENSION = new LongDimensionSchema("long_dim"); - private static final List DIMENSIONS = ImmutableList.of(STRING_DIMENSION, LONG_DIMENSION); + private static final List DIMENSIONS = ImmutableList.of( + STRING_DIMENSION, + LONG_DIMENSION, + MV_STRING_DIMENSION + ); private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); private static final AggregatorFactory AGG1 = new CountAggregatorFactory("agg_0"); private static final AggregatorFactory AGG2 = new LongSumAggregatorFactory("sum_added", "sum_added"); private static final List AGGREGATORS = ImmutableList.of(AGG1, AGG2); - private static 
final MSQCompactionRunner MSQ_COMPACTION_RUNNER = new MSQCompactionRunner(JSON_MAPPER, null); + private static final MSQCompactionRunner MSQ_COMPACTION_RUNNER = new MSQCompactionRunner(JSON_MAPPER, TestExprMacroTable.INSTANCE, null); @BeforeClass public static void setupClass() @@ -110,11 +123,6 @@ public static void setupClass() ); PARTITION_DIMENSIONS = Collections.singletonList(stringDimensionSchema.getName()); - - JSON_MAPPER.setInjectableValues(new InjectableValues.Std().addValue( - ExprMacroTable.class, - LookupEnabledTestExprMacroTable.INSTANCE - )); } @Test @@ -296,7 +304,10 @@ public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingExce actualMSQSpec.getDestination() ); - Assert.assertEquals(dimFilter, actualMSQSpec.getQuery().getFilter()); + Assert.assertTrue(actualMSQSpec.getQuery() instanceof ScanQuery); + ScanQuery scanQuery = (ScanQuery) actualMSQSpec.getQuery(); + + Assert.assertEquals(dimFilter, scanQuery.getFilter()); Assert.assertEquals( JSON_MAPPER.writeValueAsString(SEGMENT_GRANULARITY.toString()), msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY) @@ -305,7 +316,7 @@ public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingExce Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy()); Assert.assertEquals( PARTITION_DIMENSIONS.stream().map(OrderBy::ascending).collect(Collectors.toList()), - ((ScanQuery) actualMSQSpec.getQuery()).getOrderBys() + scanQuery.getOrderBys() ); } @@ -322,7 +333,10 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess null ); - DataSchema dataSchema = new DataSchema( + Set multiValuedDimensions = new HashSet<>(); + multiValuedDimensions.add(MV_STRING_DIMENSION.getName()); + + CombinedDataSchema dataSchema = new CombinedDataSchema( DATA_SOURCE, new TimestampSpec(TIMESTAMP_COLUMN, null, null), new DimensionsSpec(DIMENSIONS), @@ -332,7 +346,8 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess QUERY_GRANULARITY.getDefaultGranularity(), Collections.singletonList(COMPACTION_INTERVAL) ), - new TransformSpec(dimFilter, Collections.emptyList()) + new TransformSpec(dimFilter, Collections.emptyList()), + multiValuedDimensions ); @@ -367,7 +382,10 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess actualMSQSpec.getDestination() ); - Assert.assertEquals(dimFilter, actualMSQSpec.getQuery().getFilter()); + Assert.assertTrue(actualMSQSpec.getQuery() instanceof GroupByQuery); + GroupByQuery groupByQuery = (GroupByQuery) actualMSQSpec.getQuery(); + + Assert.assertEquals(dimFilter, groupByQuery.getFilter()); Assert.assertEquals( JSON_MAPPER.writeValueAsString(SEGMENT_GRANULARITY.toString()), msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY) @@ -377,6 +395,31 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY) ); Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy()); + + + // Since only MV_STRING_DIMENSION is indicated to be MVD by the CombinedSchema, conversion to array should happen + // only for that column. 
+ List expectedDimensionSpec = DIMENSIONS.stream() + .filter(dim -> !MV_STRING_DIMENSION.getName() + .equals(dim.getName())) + .map(dim -> new DefaultDimensionSpec( + dim.getName(), + dim.getName(), + dim.getColumnType() + )) + .collect( + Collectors.toList()); + expectedDimensionSpec.add( + new DefaultDimensionSpec(MSQCompactionRunner.TIME_VIRTUAL_COLUMN, + MSQCompactionRunner.TIME_VIRTUAL_COLUMN, + ColumnType.LONG) + ); + String mvToArrayStringDim = MSQCompactionRunner.ARRAY_VIRTUAL_COLUMN_PREFIX + MV_STRING_DIMENSION.getName(); + expectedDimensionSpec.add(new DefaultDimensionSpec(mvToArrayStringDim, mvToArrayStringDim, ColumnType.STRING_ARRAY)); + MatcherAssert.assertThat( + expectedDimensionSpec, + Matchers.containsInAnyOrder(groupByQuery.getDimensions().toArray(new DimensionSpec[0])) + ); } private CompactionTask createCompactionTask( @@ -408,7 +451,7 @@ private CompactionTask createCompactionTask( .transformSpec(transformSpec) .granularitySpec(granularitySpec) .metricsSpec(metricsSpec) - .compactionRunner(new MSQCompactionRunner(JSON_MAPPER, null)) + .compactionRunner(MSQ_COMPACTION_RUNNER) .context(context); return builder.build(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index ab7736b047e1..73c8a35405c4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -77,8 +77,10 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.incremental.AppendableIndexSpec; +import org.apache.druid.segment.indexing.CombinedDataSchema; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.indexing.granularity.GranularitySpec; @@ -462,7 +464,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception transformSpec, metricsSpec, granularitySpec, - getMetricBuilder() + getMetricBuilder(), + !(compactionRunner instanceof NativeCompactionRunner) ); registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder()); @@ -488,7 +491,8 @@ static Map createDataSchemasForIntervals( @Nullable final ClientCompactionTaskTransformSpec transformSpec, @Nullable final AggregatorFactory[] metricsSpec, @Nullable final ClientCompactionTaskGranularitySpec granularitySpec, - final ServiceMetricEvent.Builder metricBuilder + final ServiceMetricEvent.Builder metricBuilder, + boolean needMultiValuedColumns ) throws IOException { final Iterable timelineSegments = retrieveRelevantTimelineHolders( @@ -552,7 +556,8 @@ static Map createDataSchemasForIntervals( metricsSpec, granularitySpec == null ? 
new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, null, null) - : granularitySpec.withSegmentGranularity(segmentGranularityToUse) + : granularitySpec.withSegmentGranularity(segmentGranularityToUse), + needMultiValuedColumns ); intervalDataSchemaMap.put(interval, dataSchema); } @@ -577,7 +582,8 @@ static Map createDataSchemasForIntervals( dimensionsSpec, transformSpec, metricsSpec, - granularitySpec + granularitySpec, + needMultiValuedColumns ); return Collections.singletonMap(segmentProvider.interval, dataSchema); } @@ -607,7 +613,8 @@ private static DataSchema createDataSchema( @Nullable DimensionsSpec dimensionsSpec, @Nullable ClientCompactionTaskTransformSpec transformSpec, @Nullable AggregatorFactory[] metricsSpec, - @Nonnull ClientCompactionTaskGranularitySpec granularitySpec + @Nonnull ClientCompactionTaskGranularitySpec granularitySpec, + boolean needMultiValuedColumns ) { // Check index metadata & decide which values to propagate (i.e. carry over) for rollup & queryGranularity @@ -616,7 +623,8 @@ private static DataSchema createDataSchema( granularitySpec.isRollup() == null, granularitySpec.getQueryGranularity() == null, dimensionsSpec == null, - metricsSpec == null + metricsSpec == null, + needMultiValuedColumns ); final Stopwatch stopwatch = Stopwatch.createStarted(); @@ -668,13 +676,14 @@ private static DataSchema createDataSchema( finalMetricsSpec = metricsSpec; } - return new DataSchema( + return new CombinedDataSchema( dataSource, new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null), finalDimensionsSpec, finalMetricsSpec, uniformGranularitySpec, - transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null) + transformSpec == null ? null : new TransformSpec(transformSpec.getFilter(), null), + existingSegmentAnalyzer.getMultiValuedDimensions() ); } @@ -748,6 +757,7 @@ static class ExistingSegmentAnalyzer private final boolean needQueryGranularity; private final boolean needDimensionsSpec; private final boolean needMetricsSpec; + private final boolean needMultiValuedDimensions; // For processRollup: private boolean rollup = true; @@ -761,13 +771,15 @@ static class ExistingSegmentAnalyzer // For processMetricsSpec: private final Set> aggregatorFactoryLists = new HashSet<>(); + private Set multiValuedDimensions; ExistingSegmentAnalyzer( final Iterable>>> segmentsIterable, final boolean needRollup, final boolean needQueryGranularity, final boolean needDimensionsSpec, - final boolean needMetricsSpec + final boolean needMetricsSpec, + final boolean needMultiValuedDimensions ) { this.segmentsIterable = segmentsIterable; @@ -775,15 +787,26 @@ static class ExistingSegmentAnalyzer this.needQueryGranularity = needQueryGranularity; this.needDimensionsSpec = needDimensionsSpec; this.needMetricsSpec = needMetricsSpec; + this.needMultiValuedDimensions = needMultiValuedDimensions; + } + + private boolean shouldFetchSegments() + { + // Don't fetch segments for just needMultiValueDimensions + return needRollup || needQueryGranularity || needDimensionsSpec || needMetricsSpec; } public void fetchAndProcessIfNeeded() { - if (!needRollup && !needQueryGranularity && !needDimensionsSpec && !needMetricsSpec) { + if (!shouldFetchSegments()) { // Nothing to do; short-circuit and don't fetch segments. 
return; } + if (needMultiValuedDimensions) { + multiValuedDimensions = new HashSet<>(); + } + final List>>> segments = sortSegmentsListNewestFirst(); for (Pair>> segmentPair : segments) { @@ -804,6 +827,7 @@ public void fetchAndProcessIfNeeded() processQueryGranularity(index); processDimensionsSpec(index); processMetricsSpec(index); + processMultiValuedDimensions(index); } } } @@ -890,6 +914,11 @@ public AggregatorFactory[] getMetricsSpec() return mergedAggregators; } + public Set getMultiValuedDimensions() + { + return multiValuedDimensions; + } + /** * Sort {@link #segmentsIterable} in order, such that we look at later segments prior to earlier ones. Useful when * analyzing dimensions, as it allows us to take the latest value we see, and therefore prefer types from more @@ -983,6 +1012,24 @@ private void processMetricsSpec(final QueryableIndex index) } } + private void processMultiValuedDimensions(final QueryableIndex index) + { + if (!needMultiValuedDimensions) { + return; + } + for (String dimension : index.getAvailableDimensions()) { + if (isMultiValuedDimension(index, dimension)) { + multiValuedDimensions.add(dimension); + } + } + } + + private boolean isMultiValuedDimension(final QueryableIndex index, final String col) + { + ColumnCapabilities columnCapabilities = index.getColumnCapabilities(col); + return columnCapabilities != null && columnCapabilities.hasMultipleValues().isTrue(); + } + static Granularity compareWithCurrent(Granularity queryGranularity, Granularity current) { if (queryGranularity == null && current != null) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 84a1457ad914..c1bf649980f6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -116,6 +116,7 @@ import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.BatchIOConfig; +import org.apache.druid.segment.indexing.CombinedDataSchema; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; @@ -749,7 +750,8 @@ public void testCreateIngestionSchema() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -810,7 +812,8 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -872,7 +875,8 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -935,7 +939,8 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1005,7 +1010,8 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List 
ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1055,7 +1061,8 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException null, customMetricsSpec, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1098,7 +1105,8 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1148,7 +1156,8 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); NativeCompactionRunner.createIngestionSpecs( @@ -1178,7 +1187,8 @@ public void testMissingMetadata() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); NativeCompactionRunner.createIngestionSpecs( @@ -1219,7 +1229,8 @@ public void testSegmentGranularityAndNullQueryGranularity() throws IOException null, null, new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null, null), - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1263,7 +1274,8 @@ public void testQueryGranularityAndNullSegmentGranularity() throws IOException null, null, new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null), null), - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( dataSchemasForIntervals, @@ -1308,7 +1320,8 @@ public void testQueryGranularityAndSegmentGranularityNonNull() throws IOExceptio new PeriodGranularity(Period.months(3), null, null), null ), - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1355,7 +1368,8 @@ public void testNullGranularitySpec() throws IOException null, null, null, - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1400,7 +1414,8 @@ public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity null, null, new ClientCompactionTaskGranularitySpec(null, null, null), - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1445,7 +1460,8 @@ public void testGranularitySpecWithNotNullRollup() null, null, new ClientCompactionTaskGranularitySpec(null, null, true), - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1475,7 +1491,8 @@ public void testGranularitySpecWithNullRollup() null, null, new ClientCompactionTaskGranularitySpec(null, null, null), - METRIC_BUILDER + METRIC_BUILDER, + false ); final List ingestionSpecs = NativeCompactionRunner.createIngestionSpecs( @@ -1495,6 +1512,27 @@ public void testGranularitySpecWithNullRollup() } } + @Test + public void testMultiValuedDimensionsProcessing() + throws IOException + { + final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( + toolbox, + LockGranularity.TIME_CHUNK, + new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), + null, + null, + null, + new ClientCompactionTaskGranularitySpec(null, null, null), + METRIC_BUILDER, + true + ); + for (DataSchema dataSchema : dataSchemasForIntervals.values()) { + Assert.assertTrue(dataSchema 
instanceof CombinedDataSchema); + Assert.assertTrue(((CombinedDataSchema) dataSchema).getMultiValuedDimensions().isEmpty()); + } + } + @Test public void testChooseFinestGranularityWithNulls() { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 31a6bccffc72..e95d09bd5082 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -526,7 +526,7 @@ public void testAutoCompactionPreservesCreateBitmapIndexInDimensionSchema(Compac try (final Closeable ignored = unloader(fullDatasourceName)) { final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); intervalsBeforeCompaction.sort(null); - // 4 segments across 2 days (4 total)... + // 4 segments across 2 days (4 total) verifySegmentsCount(4); verifyQuery(INDEX_QUERIES_RESOURCE); @@ -548,13 +548,59 @@ public void testAutoCompactionPreservesCreateBitmapIndexInDimensionSchema(Compac true, engine ); - //...compacted into 1 segment for the entire year. + // Compacted into 1 segment for the entire year. forceTriggerAutoCompaction(1); verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); verifySegmentsCompactedDimensionSchema(dimensionSchemas); } } + @Test(dataProvider = "engine") + public void testAutoCompactionRollsUpMultiValueDimensionsWithoutUnnest(CompactionEngine engine) throws Exception + { + loadData(INDEX_TASK); + try (final Closeable ignored = unloader(fullDatasourceName)) { + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); + intervalsBeforeCompaction.sort(null); + // 4 segments across 2 days (4 total) + verifySegmentsCount(4); + verifyQuery(INDEX_QUERIES_RESOURCE); + + LOG.info("Auto compaction test with YEAR segment granularity, DAY query granularity, dropExisting is true"); + + List dimensionSchemas = ImmutableList.of( + new StringDimensionSchema("language", null, true), + new StringDimensionSchema("tags", DimensionSchema.MultiValueHandling.SORTED_ARRAY, true) + ); + + submitCompactionConfig( + MAX_ROWS_PER_SEGMENT_COMPACTED, + NO_SKIP_OFFSET, + new UserCompactionTaskGranularityConfig(Granularities.YEAR, Granularities.DAY, true), + new UserCompactionTaskDimensionsConfig(dimensionSchemas), + null, + new AggregatorFactory[] {new LongSumAggregatorFactory("added", "added")}, + true, + engine + ); + // Compacted into 1 segment for the entire year. + forceTriggerAutoCompaction(1); + Map queryAndResultFields = ImmutableMap.of( + "%%FIELD_TO_QUERY%%", + "added", + "%%EXPECTED_COUNT_RESULT%%", + 1, + "%%EXPECTED_SCAN_RESULT%%", + ImmutableList.of( + ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516))) + ) + ); + verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields); + verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); + verifySegmentsCompactedDimensionSchema(dimensionSchemas); + } + } + @Test public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception { @@ -636,10 +682,11 @@ public void testAutoCompactionDutyCanUpdateCompactionConfig(CompactionEngine eng final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null); submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, null, null, null, false, engine); - // 2 segments published per day after compaction. 
- forceTriggerAutoCompaction(4); + // 3 segments for both 2013-08-31 and 2013-09-01. (Note that numShards guarantees max shards but not exact + // number of final shards, since some shards may end up empty.) + forceTriggerAutoCompaction(6); verifyQuery(INDEX_QUERIES_RESOURCE); - verifySegmentsCompacted(hashedPartitionsSpec, 4); + verifySegmentsCompacted(hashedPartitionsSpec, 6); checkCompactionIntervals(intervalsBeforeCompaction); } diff --git a/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data1.json b/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data1.json index 169796cd7468..30fef106aafb 100644 --- a/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data1.json +++ b/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data1.json @@ -1,3 +1,3 @@ -{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143} -{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330} -{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111} \ No newline at end of file +{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "tags": ["t1", "t2"], "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143} +{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "tags": ["t1", "t2"], "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330} +{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "tags": ["t3", "t4", "t5"], "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111} \ No newline at end of file diff --git a/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data2.json b/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data2.json index 62e270113d51..d13b6a37bbcc 100644 --- a/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data2.json +++ b/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data2.json @@ -1,3 +1,3 @@ -{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", 
"user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900} +{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "tags": ["t6"], "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900} {"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9} -{"timestamp": "2013-09-01T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143} \ No newline at end of file +{"timestamp": "2013-09-01T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "tags": ["t1", "t2"], "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143} \ No newline at end of file diff --git a/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data3.json b/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data3.json index 28e0762f84cf..a8f5c2ec292e 100644 --- a/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data3.json +++ b/integration-tests/src/test/resources/data/batch_index/json/wikipedia_index_data3.json @@ -1,4 +1,4 @@ -{"timestamp": "2013-09-01T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330} -{"timestamp": "2013-09-01T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111} -{"timestamp": "2013-09-01T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900} +{"timestamp": "2013-09-01T03:32:45Z", "page": "Striker Eureka", "language" : "en", "tags": ["t1", "t2"], "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330} +{"timestamp": "2013-09-01T07:11:21Z", "page": 
"Cherno Alpha", "language" : "ru", "tags": ["t3", "t4", "t5"], "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111} +{"timestamp": "2013-09-01T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "tags": ["t6"], "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900} {"timestamp": "2013-09-01T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9} \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json index 00bf7721f2b7..00005c7b65aa 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json @@ -11,6 +11,7 @@ "dimensions": [ "page", {"type": "string", "name": "language", "createBitmapIndex": false}, + "tags", "user", "unpatrolled", "newPage", diff --git a/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java new file mode 100644 index 000000000000..14deba7725af --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.indexing; + +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.indexing.granularity.GranularitySpec; +import org.apache.druid.segment.transform.TransformSpec; + +import javax.annotation.Nullable; +import java.util.Set; + +/** + * Class representing the combined DataSchema of a set of segments, currently used only by Compaction. 
+ */ +public class CombinedDataSchema extends DataSchema +{ + private final Set multiValuedDimensions; + + public CombinedDataSchema( + String dataSource, + @Nullable TimestampSpec timestampSpec, + @Nullable DimensionsSpec dimensionsSpec, + AggregatorFactory[] aggregators, + GranularitySpec granularitySpec, + TransformSpec transformSpec, + @Nullable Set multiValuedDimensions + ) + { + super( + dataSource, + timestampSpec, + dimensionsSpec, + aggregators, + granularitySpec, + transformSpec, + null, + null + ); + this.multiValuedDimensions = multiValuedDimensions; + } + + @Nullable + public Set getMultiValuedDimensions() + { + return multiValuedDimensions; + } + +} diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java index 32b2a3830fdf..90297dd4af9d 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java @@ -779,4 +779,26 @@ public void testWithDimensionSpec() Assert.assertSame(oldSchema.getParserMap(), newSchema.getParserMap()); } + + @Test + public void testCombinedDataSchemaSetsMultiValuedColumnsInfo() + { + Set multiValuedDimensions = ImmutableSet.of("dimA"); + + CombinedDataSchema schema = new CombinedDataSchema( + IdUtilsTest.VALID_ID_CHARS, + new TimestampSpec("time", "auto", null), + DimensionsSpec.builder() + .setDimensions( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimA", "dimB", "metric1")) + ) + .setDimensionExclusions(ImmutableList.of("dimC")) + .build(), + null, + new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))), + null, + multiValuedDimensions + ); + Assert.assertEquals(ImmutableSet.of("dimA"), schema.getMultiValuedDimensions()); + } }
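
For illustration, the MVD handling in this patch amounts to an mv_to_array/array_to_mv round trip around the group-by. Below is a minimal, self-contained sketch of that round trip; it is not part of the patch, the column name "tags" and the virtual-column name "__vArray_tags" are hypothetical, and the constructors simply mirror the ones MSQCompactionRunner uses above.

// Sketch only: groups on a multi-value string dimension without unnesting, using the
// same expressions MSQCompactionRunner generates above. Names here are illustrative.
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;

public class MvdGroupingSketch
{
  public static void main(String[] args)
  {
    // mv_to_array/array_to_mv are built-in expression functions, so an empty macro table is enough here.
    ExprMacroTable macroTable = ExprMacroTable.nil();

    // Before grouping: expose the MVD as an ARRAY<STRING> virtual column so each row
    // groups on its complete value list rather than being exploded per value.
    ExpressionVirtualColumn toArray = new ExpressionVirtualColumn(
        "__vArray_tags",
        "mv_to_array(\"tags\")",
        ColumnType.STRING_ARRAY,
        macroTable
    );

    // After grouping: fold the grouped array back into an MVD under the original column
    // name, which is safe because the input column no longer exists post-aggregation.
    ExpressionPostAggregator toMvd = new ExpressionPostAggregator(
        "tags",
        "array_to_mv(\"__vArray_tags\")",
        null,
        ColumnType.STRING,
        macroTable
    );

    System.out.println(toArray.getOutputName() + " -> " + toMvd.getName());
  }
}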