diff --git a/presto-hive-common/src/main/java/com/facebook/presto/hive/BaseHiveTableLayoutHandle.java b/presto-hive-common/src/main/java/com/facebook/presto/hive/BaseHiveTableLayoutHandle.java index 264df0281689..3068d87e7b4c 100644 --- a/presto-hive-common/src/main/java/com/facebook/presto/hive/BaseHiveTableLayoutHandle.java +++ b/presto-hive-common/src/main/java/com/facebook/presto/hive/BaseHiveTableLayoutHandle.java @@ -37,7 +37,7 @@ public class BaseHiveTableLayoutHandle private final TupleDomain partitionColumnPredicate; // coordinator-only properties - private final Optional> partitions; + private final Optional partitions; public BaseHiveTableLayoutHandle( List partitionColumns, @@ -45,7 +45,7 @@ public BaseHiveTableLayoutHandle( RowExpression remainingPredicate, boolean pushdownFilterEnabled, TupleDomain partitionColumnPredicate, - Optional> partitions) + Optional partitions) { this.partitionColumns = ImmutableList.copyOf(requireNonNull(partitionColumns, "partitionColumns is null")); this.domainPredicate = requireNonNull(domainPredicate, "domainPredicate is null"); @@ -91,7 +91,7 @@ public TupleDomain getPartitionColumnPredicate() * @return list of partitions if available, {@code Optional.empty()} if dropped */ @JsonIgnore - public Optional> getPartitions() + public Optional getPartitions() { return partitions; } diff --git a/presto-hive-common/src/main/java/com/facebook/presto/hive/LazyLoadedPartitions.java b/presto-hive-common/src/main/java/com/facebook/presto/hive/LazyLoadedPartitions.java new file mode 100644 index 000000000000..19862f877885 --- /dev/null +++ b/presto-hive-common/src/main/java/com/facebook/presto/hive/LazyLoadedPartitions.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.presto.spi.LazyIterable; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class LazyLoadedPartitions +{ + private boolean fullyLoaded; + private PartitionLoader partitionLoader; + private List partitions; + + public LazyLoadedPartitions(List partitions) + { + this.partitions = requireNonNull(partitions, "partitions is null"); + this.fullyLoaded = true; + } + + public LazyLoadedPartitions(PartitionLoader partitionLoader) + { + this.partitionLoader = requireNonNull(partitionLoader, "partitionLoader is null"); + } + + public void setMaxPartitionThreshold(int maxPartitionThreshold) + { + if (this.partitionLoader != null) { + this.partitionLoader.setMaxPartitionThreshold(maxPartitionThreshold); + } + } + + public List getFullyLoadedPartitions() + { + tryFullyLoad(); + return this.partitions; + } + + /** + * This method may return an iterable with lazy loading + * */ + public LazyIterable getPartitionsIterable() + { + return new LazyPartitionsIterable(this); + } + + public boolean isEmpty() + { + if (this.fullyLoaded) { + return this.partitions.isEmpty(); + } + else { + return this.partitionLoader.isEmpty(); + } + } + + private void tryFullyLoad() + { + if (!this.fullyLoaded) { + synchronized (this) { + if (!this.fullyLoaded) { + this.partitions = this.partitionLoader.loadPartitions(); + this.fullyLoaded = true; + this.partitionLoader = null; + } + } + } + } + + public interface PartitionLoader + { + List loadPartitions(); + + boolean isEmpty(); + + void setMaxPartitionThreshold(int maxPartitionThreshold); + } +} diff --git a/presto-hive-common/src/main/java/com/facebook/presto/hive/LazyPartitionsIterable.java b/presto-hive-common/src/main/java/com/facebook/presto/hive/LazyPartitionsIterable.java new file mode 100644 index 000000000000..3cfbeb8d6862 --- /dev/null +++ b/presto-hive-common/src/main/java/com/facebook/presto/hive/LazyPartitionsIterable.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.presto.spi.LazyIterable; +import com.google.common.collect.AbstractIterator; + +import java.util.Iterator; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +class LazyPartitionsIterable + implements LazyIterable +{ + private final LazyLoadedPartitions partitions; + + public LazyPartitionsIterable(LazyLoadedPartitions partitions) + { + this.partitions = requireNonNull(partitions, "partitions is null"); + } + + @Override + public Iterator iterator() + { + return new LazyIterator(partitions); + } + + @Override + public void setMaxIterableCount(int maxIterableCount) + { + partitions.setMaxPartitionThreshold(maxIterableCount); + } + + private static class LazyIterator + extends AbstractIterator + { + private final LazyLoadedPartitions lazyPartitions; + private List partitions; + private int position = -1; + + private LazyIterator(LazyLoadedPartitions lazyPartitions) + { + this.lazyPartitions = lazyPartitions; + } + + @Override + protected HivePartition computeNext() + { + if (partitions == null) { + partitions = lazyPartitions.getFullyLoadedPartitions(); + } + + position++; + if (position >= partitions.size()) { + return endOfData(); + } + return partitions.get(position); + } + } +} diff --git a/presto-hive-common/src/main/java/com/facebook/presto/hive/MetadataUtils.java b/presto-hive-common/src/main/java/com/facebook/presto/hive/MetadataUtils.java index 361709b7e496..bb5b3754f6a5 100644 --- a/presto-hive-common/src/main/java/com/facebook/presto/hive/MetadataUtils.java +++ b/presto-hive-common/src/main/java/com/facebook/presto/hive/MetadataUtils.java @@ -20,6 +20,7 @@ import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.DiscretePredicates; +import com.facebook.presto.spi.LazyIterable; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.relation.RowExpression; @@ -27,9 +28,10 @@ import com.facebook.presto.spi.relation.SpecialFormExpression; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -51,14 +53,35 @@ public final class MetadataUtils { private MetadataUtils() {} - public static Optional getDiscretePredicates(List partitionColumns, List partitions) + public static Optional getDiscretePredicates(List partitionColumns, LazyLoadedPartitions partitions) { Optional discretePredicates = Optional.empty(); - if (!partitionColumns.isEmpty() && !(partitions.size() == 1 && partitions.get(0).getPartitionId().equals(UNPARTITIONED_ID))) { + if (!partitionColumns.isEmpty()) { // Do not create tuple domains for every partition at the same time! // There can be a huge number of partitions so use an iterable so // all domains do not need to be in memory at the same time. - Iterable> partitionDomains = Iterables.transform(partitions, (hivePartition) -> TupleDomain.fromFixedValues(hivePartition.getKeys())); + LazyIterable partitionIterable = partitions.getPartitionsIterable(); + Iterable> partitionDomains = new LazyIterable>() + { + @Override + public void setMaxIterableCount(int maxIterableCount) + { + partitionIterable.setMaxIterableCount(maxIterableCount); + } + + @Override + public Iterator> iterator() + { + return Iterators.transform(partitionIterable.iterator(), (hivePartition) -> { + if (hivePartition.getPartitionId().equals(UNPARTITIONED_ID)) { + return TupleDomain.all(); + } + else { + return TupleDomain.fromFixedValues(hivePartition.getKeys()); + } + }); + } + }; discretePredicates = Optional.of(new DiscretePredicates(partitionColumns, partitionDomains)); } return discretePredicates; diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index 95a082c4ea0f..ee92eb0092e1 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -779,7 +779,7 @@ public Optional getInfo(ConnectorTableLayoutHandle layoutHandle) HiveTableLayoutHandle tableLayoutHandle = (HiveTableLayoutHandle) layoutHandle; if (tableLayoutHandle.getPartitions().isPresent()) { return Optional.of(new HiveInputInfo( - tableLayoutHandle.getPartitions().get().stream() + tableLayoutHandle.getPartitions().get().getFullyLoadedPartitions().stream() .map(hivePartition -> hivePartition.getPartitionId().getPartitionName()) .collect(toList()), false)); @@ -2553,13 +2553,13 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl private List getOrComputePartitions(HiveTableLayoutHandle layoutHandle, ConnectorSession session, ConnectorTableHandle tableHandle) { if (layoutHandle.getPartitions().isPresent()) { - return layoutHandle.getPartitions().get(); + return layoutHandle.getPartitions().get().getFullyLoadedPartitions(); } else { TupleDomain partitionColumnPredicate = layoutHandle.getPartitionColumnPredicate(); Predicate> predicate = convertToPredicate(partitionColumnPredicate); List tableLayoutResults = getTableLayouts(session, tableHandle, new Constraint<>(partitionColumnPredicate, predicate), Optional.empty()); - return ((HiveTableLayoutHandle) Iterables.getOnlyElement(tableLayoutResults).getTableLayout().getHandle()).getPartitions().get(); + return ((HiveTableLayoutHandle) Iterables.getOnlyElement(tableLayoutResults).getTableLayout().getHandle()).getPartitions().get().getFullyLoadedPartitions(); } } @@ -2677,7 +2677,7 @@ public List getTableLayouts(ConnectorSession session .setRemainingPredicate(TRUE_CONSTANT) .setPredicateColumns(predicateColumns) .setPartitionColumnPredicate(hivePartitionResult.getEnforcedConstraint()) - .setPartitions(hivePartitionResult.getPartitions()) + .setPartitions(new LazyLoadedPartitions(hivePartitionResult.getPartitions())) .setBucketHandle(hiveBucketHandle) .setBucketFilter(hivePartitionResult.getBucketFilter()) .setPushdownFilterEnabled(false) @@ -2724,7 +2724,7 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa { HiveTableLayoutHandle hiveLayoutHandle = (HiveTableLayoutHandle) layoutHandle; List partitionColumns = ImmutableList.copyOf(hiveLayoutHandle.getPartitionColumns()); - List partitions = hiveLayoutHandle.getPartitions().get(); + LazyLoadedPartitions partitions = hiveLayoutHandle.getPartitions().get(); Optional discretePredicates = getDiscretePredicates(partitionColumns, partitions); @@ -2784,7 +2784,7 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa if (hiveLayoutHandle.isPushdownFilterEnabled()) { Map predicateColumns = hiveLayoutHandle.getPredicateColumns().entrySet() .stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - predicate = getPredicate(hiveLayoutHandle, partitionColumns, partitions, predicateColumns); + predicate = getPredicate(hiveLayoutHandle, partitionColumns, partitions.getFullyLoadedPartitions(), predicateColumns); // capture subfields from domainPredicate to add to remainingPredicate // so those filters don't get lost @@ -2794,7 +2794,7 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa subfieldPredicate = getSubfieldPredicate(session, hiveLayoutHandle, columnTypes, functionResolution, rowExpressionService); } else { - predicate = createPredicate(partitionColumns, partitions); + predicate = createPredicate(partitionColumns, partitions.getFullyLoadedPartitions()); subfieldPredicate = TRUE_CONSTANT; } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java index 62e620bee3e2..55852c16006b 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java @@ -259,7 +259,7 @@ public ConnectorSplitSource getSplits( } // get partitions - List partitions = layout.getPartitions() + List partitions = layout.getPartitions().map(LazyLoadedPartitions::getFullyLoadedPartitions) .orElseThrow(() -> new PrestoException(GENERIC_INTERNAL_ERROR, "Layout does not contain partitions")); // short circuit if we don't have any partitions diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveTableLayoutHandle.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveTableLayoutHandle.java index 9b882b768507..0fdcaf32dc11 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveTableLayoutHandle.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveTableLayoutHandle.java @@ -65,7 +65,7 @@ public class HiveTableLayoutHandle private final boolean footerStatsUnreliable; // coordinator-only properties - private final Optional> partitions; + private final Optional partitions; private final Optional hiveTableHandle; /** @@ -133,7 +133,7 @@ protected HiveTableLayoutHandle( Optional> requestedColumns, boolean partialAggregationsPushedDown, boolean appendRowNumberEnabled, - Optional> partitions, + Optional partitions, boolean footerStatsUnreliable, Optional hiveTableHandle) { @@ -300,7 +300,7 @@ private TupleDomain getConstraint(PlanCanonicalizationStrategy can // Constants are only removed from point checks, and not range checks. Example: // `x = 1` is equivalent to `x = 1000` // `x > 1` is NOT equivalent to `x > 1000` - TupleDomain constraint = createPredicate(ImmutableList.copyOf(getPartitionColumns()), partitions.get()); + TupleDomain constraint = createPredicate(ImmutableList.copyOf(getPartitionColumns()), partitions.get().getFullyLoadedPartitions()); constraint = getDomainPredicate() .transform(subfield -> subfield.getPath().isEmpty() ? subfield.getRootName() : null) .transform(getPredicateColumns()::get) @@ -393,7 +393,7 @@ public static class Builder private boolean appendRowNumberEnabled; private boolean footerStatsUnreliable; - private Optional> partitions; + private Optional partitions; private Optional hiveTableHandle = Optional.empty(); public Builder setSchemaTableName(SchemaTableName schemaTableName) @@ -492,13 +492,13 @@ public Builder setAppendRowNumberEnabled(boolean appendRowNumberEnabled) return this; } - public Builder setPartitions(List partitions) + public Builder setPartitions(LazyLoadedPartitions partitions) { requireNonNull(partitions, "partitions is null"); return setPartitions(Optional.of(partitions)); } - public Builder setPartitions(Optional> partitions) + public Builder setPartitions(Optional partitions) { requireNonNull(partitions, "partitions is null"); this.partitions = partitions; diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/rule/HiveFilterPushdown.java b/presto-hive/src/main/java/com/facebook/presto/hive/rule/HiveFilterPushdown.java index 8d73f18ac48f..bcc495e6d7c8 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/rule/HiveFilterPushdown.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/rule/HiveFilterPushdown.java @@ -25,6 +25,7 @@ import com.facebook.presto.hive.HiveTableHandle; import com.facebook.presto.hive.HiveTableLayoutHandle; import com.facebook.presto.hive.HiveTransactionManager; +import com.facebook.presto.hive.LazyLoadedPartitions; import com.facebook.presto.hive.metastore.Column; import com.facebook.presto.hive.metastore.MetastoreContext; import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore; @@ -195,7 +196,7 @@ public ConnectorPushdownFilterResult getConnectorPushdownFilterResult( .setRemainingPredicate(remainingExpressions.getRemainingExpression()) .setPredicateColumns(predicateColumns) .setPartitionColumnPredicate(hivePartitionResult.getEnforcedConstraint()) - .setPartitions(hivePartitionResult.getPartitions()) + .setPartitions(new LazyLoadedPartitions(hivePartitionResult.getPartitions())) .setBucketHandle(hivePartitionResult.getBucketHandle()) .setBucketFilter(hivePartitionResult.getBucketFilter()) .setPushdownFilterEnabled(true) diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java index 6d36b4863cf3..c28b0f919789 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java @@ -838,7 +838,7 @@ protected void setupHive(String connectorId, String databaseName, String timeZon .setRemainingPredicate(TRUE_CONSTANT) .setPredicateColumns(ImmutableMap.of()) .setPartitionColumnPredicate(TupleDomain.all()) - .setPartitions(ImmutableList.of(new HivePartition(invalidTable, new PartitionNameWithVersion("unknown", Optional.empty()), ImmutableMap.of()))) + .setPartitions(new LazyLoadedPartitions(ImmutableList.of(new HivePartition(invalidTable, new PartitionNameWithVersion("unknown", Optional.empty()), ImmutableMap.of())))) .setBucketHandle(Optional.empty()) .setBucketFilter(Optional.empty()) .setPushdownFilterEnabled(false) @@ -910,7 +910,7 @@ protected void setupHive(String connectorId, String databaseName, String timeZon .setRemainingPredicate(TRUE_CONSTANT) .setPredicateColumns(ImmutableMap.of(dsColumn.getName(), dsColumn)) .setPartitionColumnPredicate(tupleDomain) - .setPartitions(partitions).setBucketHandle(Optional.empty()) + .setPartitions(new LazyLoadedPartitions(partitions)).setBucketHandle(Optional.empty()) .setBucketFilter(Optional.empty()) .setPushdownFilterEnabled(false) .setLayoutString("layout") @@ -957,7 +957,7 @@ protected void setupHive(String connectorId, String databaseName, String timeZon .setRemainingPredicate(TRUE_CONSTANT) .setPredicateColumns(ImmutableMap.of()) .setPartitionColumnPredicate(TupleDomain.all()) - .setPartitions(unpartitionedPartitions) + .setPartitions(new LazyLoadedPartitions(unpartitionedPartitions)) .setBucketHandle(Optional.empty()) .setBucketFilter(Optional.empty()) .setPushdownFilterEnabled(false) @@ -1625,7 +1625,7 @@ protected void assertExpectedTableLayoutHandle(ConnectorTableLayoutHandle actual assertInstanceOf(expectedTableLayoutHandle, HiveTableLayoutHandle.class); HiveTableLayoutHandle actual = (HiveTableLayoutHandle) actualTableLayoutHandle; HiveTableLayoutHandle expected = (HiveTableLayoutHandle) expectedTableLayoutHandle; - assertExpectedPartitions(actual.getPartitions().get(), expected.getPartitions().get()); + assertExpectedPartitions(actual.getPartitions().get().getFullyLoadedPartitions(), expected.getPartitions().get().getFullyLoadedPartitions()); } protected void assertExpectedPartitions(List actualPartitions, Iterable expectedPartitions) @@ -5322,7 +5322,7 @@ protected static List getAllSplits(ConnectorSplitSource splitSou protected List getAllPartitions(ConnectorTableLayoutHandle layoutHandle) { - return ((HiveTableLayoutHandle) layoutHandle).getPartitions() + return ((HiveTableLayoutHandle) layoutHandle).getPartitions().map(LazyLoadedPartitions::getFullyLoadedPartitions) .orElseThrow(() -> new AssertionError("layout has no partitions")); } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java index a692ba3d9ee4..068e0041b82e 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java @@ -469,7 +469,7 @@ private void createTable(MetastoreContext metastoreContext, SchemaTableName tabl // verify the data List tableLayoutResults = metadata.getTableLayouts(session, hiveTableHandle, Constraint.alwaysTrue(), Optional.empty()); HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) getOnlyElement(tableLayoutResults).getTableLayout().getHandle(); - assertEquals(layoutHandle.getPartitions().get().size(), 1); + assertEquals(layoutHandle.getPartitions().get().getFullyLoadedPartitions().size(), 1); ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, layoutHandle, SPLIT_SCHEDULING_CONTEXT); ConnectorSplit split = getOnlyElement(getAllSplits(splitSource)); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java index 815b60088109..07ca1acd053e 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java @@ -212,7 +212,7 @@ protected QueryRunner createQueryRunner() private List getPartitions(HiveTableLayoutHandle tableLayoutHandle) { - return tableLayoutHandle.getPartitions().get(); + return tableLayoutHandle.getPartitions().get().getFullyLoadedPartitions(); } @Test diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitManager.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitManager.java index bc4954831cac..bb6b0a31f583 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitManager.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveSplitManager.java @@ -581,7 +581,7 @@ private void assertRedundantColumnDomains(Range predicateRange, PartitionStatist .setRemainingPredicate(TRUE_CONSTANT) .setPredicateColumns(ImmutableMap.of(partitionColumn.getName(), partitionColumn, columnHandle.getName(), columnHandle)) .setPartitionColumnPredicate(queryTupleDomain) - .setPartitions(partitions) + .setPartitions(new LazyLoadedPartitions(partitions)) .setBucketHandle(Optional.empty()) .setBucketFilter(Optional.empty()) .setPushdownFilterEnabled(false) @@ -725,7 +725,7 @@ public void testEncryptionInformation() .setRemainingPredicate(TRUE_CONSTANT) .setPredicateColumns(ImmutableMap.of()) .setPartitionColumnPredicate(TupleDomain.all()) - .setPartitions(partitions) + .setPartitions(new LazyLoadedPartitions(partitions)) .setBucketHandle(Optional.empty()) .setBucketFilter(Optional.empty()) .setPushdownFilterEnabled(false) diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveTableLayoutHandle.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveTableLayoutHandle.java index 54355447c971..25b5cc55fa6b 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveTableLayoutHandle.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveTableLayoutHandle.java @@ -60,7 +60,7 @@ public void testIsAppendRowNumberEnabled() Optional bucketFilter = Optional.empty(); Optional> requestedColumns = Optional.empty(); SchemaTableName schemaTableName = SchemaTableName.valueOf("schema.TableName"); - Optional> partitions = Optional.empty(); + Optional partitions = Optional.empty(); Optional hiveTableHandle = Optional.empty(); HiveTableLayoutHandle handle = new HiveTableLayoutHandle( schemaTableName, diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index c920f22e6f6b..466fabbb579b 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -24,6 +24,7 @@ import com.facebook.presto.common.type.VarcharType; import com.facebook.presto.hive.HivePartition; import com.facebook.presto.hive.HiveWrittenPartitions; +import com.facebook.presto.hive.LazyLoadedPartitions; import com.facebook.presto.hive.NodeVersion; import com.facebook.presto.iceberg.changelog.ChangelogUtil; import com.facebook.presto.iceberg.statistics.StatisticsFileCache; @@ -243,10 +244,10 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint( TupleDomain partitionColumnPredicate = TupleDomain.withColumnDomains(Maps.filterKeys(constraint.getSummary().getDomains().get(), Predicates.in(partitionColumns))); Optional> requestedColumns = desiredColumns.map(columns -> columns.stream().map(column -> (IcebergColumnHandle) column).collect(toImmutableSet())); - List partitions; + LazyLoadedPartitions partitions; if (handle.getIcebergTableName().getTableType() == CHANGELOG || handle.getIcebergTableName().getTableType() == EQUALITY_DELETES) { - partitions = ImmutableList.of(new HivePartition(handle.getSchemaTableName())); + partitions = new LazyLoadedPartitions(ImmutableList.of(new HivePartition(handle.getSchemaTableName()))); } else { partitions = getPartitions( @@ -268,7 +269,7 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint( .setRequestedColumns(requestedColumns) .setPushdownFilterEnabled(isPushdownFilterEnabled(session)) .setPartitionColumnPredicate(partitionColumnPredicate.simplify()) - .setPartitions(Optional.ofNullable(partitions.size() == 0 ? null : partitions)) + .setPartitions(Optional.ofNullable(partitions.isEmpty() ? null : partitions)) .setTable(handle) .build()); return new ConnectorTableLayoutResult(layout, constraint.getSummary()); @@ -297,7 +298,7 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa Table icebergTable = getIcebergTable(session, tableHandle.getSchemaTableName()); validateTableMode(session, icebergTable); List partitionColumns = ImmutableList.copyOf(icebergTableLayoutHandle.getPartitionColumns()); - Optional> partitions = icebergTableLayoutHandle.getPartitions(); + Optional partitions = icebergTableLayoutHandle.getPartitions(); Optional discretePredicates = partitions.flatMap(parts -> getDiscretePredicates(partitionColumns, parts)); if (!isPushdownFilterEnabled(session)) { return new ConnectorTableLayout( @@ -313,7 +314,7 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa Map predicateColumns = icebergTableLayoutHandle.getPredicateColumns().entrySet() .stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - Optional> predicate = partitions.map(parts -> getPredicate(icebergTableLayoutHandle, partitionColumns, parts, predicateColumns)); + Optional> predicate = partitions.map(parts -> getPredicate(icebergTableLayoutHandle, partitionColumns, parts.getFullyLoadedPartitions(), predicateColumns)); // capture subfields from domainPredicate to add to remainingPredicate // so those filters don't get lost Map columnTypes = getColumns(icebergTable.schema(), icebergTable.spec(), typeManager).stream() diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPartitionLoader.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPartitionLoader.java new file mode 100644 index 000000000000..55cec62e7b52 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPartitionLoader.java @@ -0,0 +1,213 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.common.predicate.Domain; +import com.facebook.presto.common.predicate.NullableValue; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.hive.HivePartition; +import com.facebook.presto.hive.LazyLoadedPartitions.PartitionLoader; +import com.facebook.presto.hive.PartitionNameWithVersion; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.Constraint; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static com.facebook.presto.common.Utils.checkArgument; +import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; +import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat; +import static com.facebook.presto.iceberg.IcebergUtil.getIdentityPartitions; +import static com.facebook.presto.iceberg.IcebergUtil.getNonMetadataColumnConstraints; +import static com.facebook.presto.iceberg.IcebergUtil.parsePartitionValue; +import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName; +import static com.facebook.presto.iceberg.TypeConverter.toPrestoType; +import static org.apache.iceberg.types.Type.TypeID.BINARY; +import static org.apache.iceberg.types.Type.TypeID.FIXED; + +public class IcebergPartitionLoader + implements PartitionLoader +{ + TypeManager typeManager; + FileFormat fileFormat; + ConnectorTableHandle tableHandle; + Constraint constraint; + List partitionColumns; + int maxPartitionThreshold = -1; + final boolean isEmptyTable; + final TableScan tableScan; + public IcebergPartitionLoader( + TypeManager typeManager, + ConnectorTableHandle tableHandle, + Table icebergTable, + Constraint constraint, + List partitionColumns) + { + this.typeManager = typeManager; + this.partitionColumns = partitionColumns; + this.fileFormat = getFileFormat(icebergTable); + this.tableHandle = tableHandle; + this.constraint = constraint; + IcebergTableName name = ((IcebergTableHandle) tableHandle).getIcebergTableName(); + // Empty iceberg table would cause `snapshotId` not present + Optional snapshotId = resolveSnapshotIdByName(icebergTable, name); + if (!snapshotId.isPresent()) { + this.isEmptyTable = true; + this.tableScan = null; + } + else { + this.isEmptyTable = false; + this.tableScan = icebergTable.newScan() + .filter(toIcebergExpression(getNonMetadataColumnConstraints(constraint + .getSummary() + .simplify()))) + .useSnapshot(snapshotId.get()); + } + } + + @Override + public synchronized List loadPartitions() + { + if (isEmptyTable) { + return ImmutableList.of(); + } + + // When `maxPartitionThreshold == 0`, skip the optimization directly + if (maxPartitionThreshold == 0) { + return ImmutableList.of(new HivePartition(((IcebergTableHandle) tableHandle).getSchemaTableName())); + } + + Set partitions = new HashSet<>(); + try (CloseableIterable fileScanTasks = tableScan.planFiles()) { + for (FileScanTask fileScanTask : fileScanTasks) { + // If exists delete files, skip the metadata optimization based on partition values as they might become incorrect + if (!fileScanTask.deletes().isEmpty()) { + return ImmutableList.of(new HivePartition(((IcebergTableHandle) tableHandle).getSchemaTableName())); + } + + // If threshold is set explicitly greater than 0, and partitions number exceeds the threshold, skip the optimization as well + if (maxPartitionThreshold > 0 && partitions.size() >= maxPartitionThreshold) { + return ImmutableList.of(new HivePartition(((IcebergTableHandle) tableHandle).getSchemaTableName())); + } + StructLike partition = fileScanTask.file().partition(); + PartitionSpec spec = fileScanTask.spec(); + Map fieldToIndex = getIdentityPartitions(spec); + ImmutableMap.Builder builder = ImmutableMap.builder(); + + fieldToIndex.forEach((field, index) -> { + int id = field.sourceId(); + org.apache.iceberg.types.Type type = spec.schema().findType(id); + Class javaClass = type.typeId().javaClass(); + Object value = partition.get(index, javaClass); + String partitionStringValue; + + if (value == null) { + partitionStringValue = null; + } + else { + if (type.typeId() == FIXED || type.typeId() == BINARY) { + partitionStringValue = Base64.getEncoder().encodeToString(((ByteBuffer) value).array()); + } + else { + partitionStringValue = value.toString(); + } + } + + NullableValue partitionValue = parsePartitionValue(fileFormat, partitionStringValue, toPrestoType(type, typeManager), partition.toString()); + Optional column = partitionColumns.stream() + .filter(icebergColumnHandle -> Objects.equals(icebergColumnHandle.getId(), field.sourceId())) + .findAny(); + + if (column.isPresent()) { + builder.put(column.get(), partitionValue); + } + }); + + Map values = builder.build(); + HivePartition newPartition = new HivePartition( + ((IcebergTableHandle) tableHandle).getSchemaTableName(), + new PartitionNameWithVersion(partition.toString(), Optional.empty()), + values); + + boolean isIncludePartition = true; + Map domains = constraint.getSummary().getDomains().get(); + for (IcebergColumnHandle column : partitionColumns) { + NullableValue value = newPartition.getKeys().get(column); + Domain allowedDomain = domains.get(column); + if (allowedDomain != null && !allowedDomain.includesNullableValue(value.getValue())) { + isIncludePartition = false; + break; + } + } + + if (constraint.predicate().isPresent() && !constraint.predicate().get().test(newPartition.getKeys())) { + isIncludePartition = false; + } + + if (isIncludePartition) { + partitions.add(newPartition); + } + } + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + return new ArrayList<>(partitions); + } + + @Override + public synchronized boolean isEmpty() + { + if (isEmptyTable) { + return true; + } + + try (CloseableIterable fileScanTasks = tableScan.planFiles(); + CloseableIterator iterator = fileScanTasks.iterator()) { + return !iterator.hasNext(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void setMaxPartitionThreshold(int maxPartitionThreshold) + { + checkArgument(maxPartitionThreshold > -1, "maxPartitionThreshold must greater than -1"); + if (this.maxPartitionThreshold == -1) { + this.maxPartitionThreshold = maxPartitionThreshold; + } + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableLayoutHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableLayoutHandle.java index 3b1a7c669181..3e00a6c126cf 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableLayoutHandle.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableLayoutHandle.java @@ -17,7 +17,7 @@ import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.hive.BaseHiveColumnHandle; import com.facebook.presto.hive.BaseHiveTableLayoutHandle; -import com.facebook.presto.hive.HivePartition; +import com.facebook.presto.hive.LazyLoadedPartitions; import com.facebook.presto.hive.metastore.Column; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.relation.RowExpression; @@ -77,7 +77,7 @@ public IcebergTableLayoutHandle( Optional> requestedColumns, boolean pushdownFilterEnabled, TupleDomain partitionColumnPredicate, - Optional> partitions, + Optional partitions, IcebergTableHandle table) { super( @@ -171,7 +171,7 @@ public static class Builder private Optional> requestedColumns; private boolean pushdownFilterEnabled; private TupleDomain partitionColumnPredicate; - private Optional> partitions; + private Optional partitions; private IcebergTableHandle table; public Builder setPartitionColumns(List partitionColumns) @@ -222,7 +222,7 @@ public Builder setPartitionColumnPredicate(TupleDomain partitionCo return this; } - public Builder setPartitions(Optional> partitions) + public Builder setPartitions(Optional partitions) { this.partitions = partitions; return this; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index 73f251f7f76d..fb5e2bdee972 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -28,10 +28,9 @@ import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; import com.facebook.presto.hive.HiveColumnConverterProvider; -import com.facebook.presto.hive.HivePartition; import com.facebook.presto.hive.HivePartitionKey; import com.facebook.presto.hive.HiveType; -import com.facebook.presto.hive.PartitionNameWithVersion; +import com.facebook.presto.hive.LazyLoadedPartitions; import com.facebook.presto.hive.metastore.Column; import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; import com.facebook.presto.hive.metastore.MetastoreContext; @@ -80,7 +79,6 @@ import org.apache.iceberg.util.SnapshotUtil; import java.io.IOException; -import java.io.UncheckedIOException; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; @@ -141,7 +139,6 @@ import static com.facebook.presto.iceberg.IcebergTableProperties.getCommitRetries; import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion; import static com.facebook.presto.iceberg.TypeConverter.toIcebergType; -import static com.facebook.presto.iceberg.TypeConverter.toPrestoType; import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; @@ -514,7 +511,7 @@ private static void verifyPartitionTypeSupported(FileFormat fileFormat, String p } } - private static NullableValue parsePartitionValue( + protected static NullableValue parsePartitionValue( FileFormat fileFormat, String partitionStringValue, Type prestoType, @@ -558,100 +555,20 @@ else if (constraint.getKey() == DATA_SEQUENCE_NUMBER_COLUMN_HANDLE) { return matches; } - public static List getPartitions( + public static LazyLoadedPartitions getPartitions( TypeManager typeManager, ConnectorTableHandle tableHandle, Table icebergTable, Constraint constraint, List partitionColumns) { - IcebergTableName name = ((IcebergTableHandle) tableHandle).getIcebergTableName(); - FileFormat fileFormat = getFileFormat(icebergTable); - // Empty iceberg table would cause `snapshotId` not present - Optional snapshotId = resolveSnapshotIdByName(icebergTable, name); - if (!snapshotId.isPresent()) { - return ImmutableList.of(); - } - - TableScan tableScan = icebergTable.newScan() - .filter(toIcebergExpression(getNonMetadataColumnConstraints(constraint - .getSummary() - .simplify()))) - .useSnapshot(snapshotId.get()); - - Set partitions = new HashSet<>(); - - try (CloseableIterable fileScanTasks = tableScan.planFiles()) { - for (FileScanTask fileScanTask : fileScanTasks) { - // If exists delete files, skip the metadata optimization based on partition values as they might become incorrect - if (!fileScanTask.deletes().isEmpty()) { - return ImmutableList.of(new HivePartition(((IcebergTableHandle) tableHandle).getSchemaTableName())); - } - StructLike partition = fileScanTask.file().partition(); - PartitionSpec spec = fileScanTask.spec(); - Map fieldToIndex = getIdentityPartitions(spec); - ImmutableMap.Builder builder = ImmutableMap.builder(); - - fieldToIndex.forEach((field, index) -> { - int id = field.sourceId(); - org.apache.iceberg.types.Type type = spec.schema().findType(id); - Class javaClass = type.typeId().javaClass(); - Object value = partition.get(index, javaClass); - String partitionStringValue; - - if (value == null) { - partitionStringValue = null; - } - else { - if (type.typeId() == FIXED || type.typeId() == BINARY) { - partitionStringValue = Base64.getEncoder().encodeToString(((ByteBuffer) value).array()); - } - else { - partitionStringValue = value.toString(); - } - } - - NullableValue partitionValue = parsePartitionValue(fileFormat, partitionStringValue, toPrestoType(type, typeManager), partition.toString()); - Optional column = partitionColumns.stream() - .filter(icebergColumnHandle -> Objects.equals(icebergColumnHandle.getId(), field.sourceId())) - .findAny(); - - if (column.isPresent()) { - builder.put(column.get(), partitionValue); - } - }); - - Map values = builder.build(); - HivePartition newPartition = new HivePartition( - ((IcebergTableHandle) tableHandle).getSchemaTableName(), - new PartitionNameWithVersion(partition.toString(), Optional.empty()), - values); - - boolean isIncludePartition = true; - Map domains = constraint.getSummary().getDomains().get(); - for (IcebergColumnHandle column : partitionColumns) { - NullableValue value = newPartition.getKeys().get(column); - Domain allowedDomain = domains.get(column); - if (allowedDomain != null && !allowedDomain.includesNullableValue(value.getValue())) { - isIncludePartition = false; - break; - } - } - - if (constraint.predicate().isPresent() && !constraint.predicate().get().test(newPartition.getKeys())) { - isIncludePartition = false; - } - - if (isIncludePartition) { - partitions.add(newPartition); - } - } - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - - return new ArrayList<>(partitions); + return new LazyLoadedPartitions( + new IcebergPartitionLoader( + typeManager, + tableHandle, + icebergTable, + constraint, + partitionColumns)); } public static Optional tryGetSchema(Table table) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergFilterPushdown.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergFilterPushdown.java index 38f09363270b..b6250687bce6 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergFilterPushdown.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergFilterPushdown.java @@ -16,7 +16,7 @@ import com.facebook.presto.common.Subfield; import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.type.TypeManager; -import com.facebook.presto.hive.HivePartition; +import com.facebook.presto.hive.LazyLoadedPartitions; import com.facebook.presto.hive.rule.BaseSubfieldExtractionRewriter; import com.facebook.presto.iceberg.IcebergAbstractMetadata; import com.facebook.presto.iceberg.IcebergColumnHandle; @@ -155,7 +155,7 @@ protected ConnectorPushdownFilterResult getConnectorPushdownFilterResult( TupleDomain partitionColumnPredicate = TupleDomain.withColumnDomains(Maps.filterKeys( constraint.getSummary().getDomains().get(), Predicates.in(partitionColumns))); - List partitions = getPartitions( + LazyLoadedPartitions partitions = getPartitions( typeManager, tableHandle, icebergTable, @@ -174,7 +174,7 @@ protected ConnectorPushdownFilterResult getConnectorPushdownFilterResult( .setRequestedColumns(requestedColumns) .setPushdownFilterEnabled(true) .setPartitionColumnPredicate(partitionColumnPredicate) - .setPartitions(Optional.ofNullable(partitions.size() == 0 ? null : partitions)) + .setPartitions(Optional.ofNullable(partitions.isEmpty() ? null : partitions)) .setTable((IcebergTableHandle) tableHandle) .build()), remainingExpressions.getDynamicFilterExpression()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergMetadataOptimizer.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergMetadataOptimizer.java index e390f45f05fb..77ab55076836 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergMetadataOptimizer.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergMetadataOptimizer.java @@ -203,8 +203,18 @@ public PlanNode visitAggregation(AggregationNode node, RewriteContext cont DiscretePredicates discretePredicates = layout.getDiscretePredicates().get(); + // When reducible optimization is possible, we tend to load all partitions + // Or we load partitions that do not exceed the threshold + if (!isReducible(node, inputs)) { + discretePredicates.setPartitionThreshold(rowsForMetadataOptimizationThreshold); + } + + if (discretePredicates.isSingleAllDomain()) { + return context.defaultRewrite(node); + } + // the optimization is only valid if there is no filter on non-partition columns - if (layout.getPredicate().getColumnDomains().isPresent()) { + if (!discretePredicates.isEmpty() && layout.getPredicate().getColumnDomains().isPresent()) { List predicateColumns = layout.getPredicate().getColumnDomains().get().stream() .map(ColumnDomain::getColumn) .collect(toImmutableList()); @@ -231,9 +241,9 @@ public PlanNode visitAggregation(AggregationNode node, RewriteContext cont } // When `rowsForMetadataOptimizationThreshold == 0`, or partitions number exceeds the threshold, skip the optimization - if (rowsForMetadataOptimizationThreshold == 0 || Iterables.size(discretePredicates.getPredicates()) > rowsForMetadataOptimizationThreshold) { + /*if (rowsForMetadataOptimizationThreshold == 0 || Iterables.size(discretePredicates.getPredicates()) > rowsForMetadataOptimizationThreshold) { return context.defaultRewrite(node); - } + }*/ ImmutableList.Builder> rowsBuilder = ImmutableList.builder(); for (TupleDomain domain : discretePredicates.getPredicates()) { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java index af5aef01101f..16d2af85f470 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java @@ -90,6 +90,7 @@ import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner; import static com.facebook.presto.iceberg.IcebergSessionProperties.PARQUET_DEREFERENCE_PUSHDOWN_ENABLED; import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED; +import static com.facebook.presto.iceberg.IcebergSessionProperties.ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD; import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled; import static com.facebook.presto.parquet.ParquetTypeUtils.pushdownColumnNameForSubfield; import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.AND; @@ -199,6 +200,74 @@ public void testMetadataQueryOptimizer(boolean enabled) } } + @Test(dataProvider = "push_down_filter_enabled") + public void testMetadataQueryOptimizerWithMaxPartitionThreshold(boolean enabled) + + { + QueryRunner queryRunner = getQueryRunner(); + try { + queryRunner.execute("create table metadata_optimize_with_threshold(v1 int, v2 varchar, a int, b varchar)" + + " with(partitioning = ARRAY['a', 'b'])"); + // insert data into 4 different partitions + queryRunner.execute("insert into metadata_optimize_with_threshold values" + + " (1, '1001', 1, '1001')," + + " (2, '1002', 2, '1001')," + + " (3, '1003', 3, '1002')," + + " (4, '1004', 4, '1002')"); + + // Perform metadata optimization when the number of partitions does not exceed the threshold + Session sessionWithThresholdBigger = getSessionWithOptimizeMetadataQueriesAndThreshold(enabled, 4); + assertQuery(sessionWithThresholdBigger, "select b, max(a), min(a) from metadata_optimize_with_threshold group by b", + "values('1001', 2, 1), ('1002', 4, 3)"); + assertPlan(sessionWithThresholdBigger, "select b, max(a), min(a) from metadata_optimize_with_threshold group by b", + anyTree(values( + ImmutableList.of("a", "b"), + ImmutableList.of( + ImmutableList.of(new LongLiteral("1"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("2"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("3"), new StringLiteral("1002")), + ImmutableList.of(new LongLiteral("4"), new StringLiteral("1002")))))); + + assertQuery(sessionWithThresholdBigger, "select distinct a, b from metadata_optimize_with_threshold", + "values(1, '1001'), (2, '1001'), (3, '1002'), (4, '1002')"); + assertPlan(sessionWithThresholdBigger, "select distinct a, b from metadata_optimize_with_threshold", + anyTree(values( + ImmutableList.of("a", "b"), + ImmutableList.of( + ImmutableList.of(new LongLiteral("1"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("2"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("3"), new StringLiteral("1002")), + ImmutableList.of(new LongLiteral("4"), new StringLiteral("1002")))))); + + // Do not perform metadata optimization when the number of partitions exceeds the threshold + Session sessionWithThresholdSmaller = getSessionWithOptimizeMetadataQueriesAndThreshold(false, 3); + assertQuery(sessionWithThresholdSmaller, "select b, max(a), min(a) from metadata_optimize_with_threshold group by b", + "values('1001', 2, 1), ('1002', 4, 3)"); + assertPlan(sessionWithThresholdSmaller, "select b, max(a), min(a) from metadata_optimize_with_threshold group by b", + anyTree(strictTableScan("metadata_optimize_with_threshold", identityMap("a", "b")))); + + assertQuery(sessionWithThresholdSmaller, "select distinct a, b from metadata_optimize_with_threshold", + "values(1, '1001'), (2, '1001'), (3, '1002'), (4, '1002')"); + assertPlan(sessionWithThresholdSmaller, "select distinct a, b from metadata_optimize_with_threshold", + anyTree(strictTableScan("metadata_optimize_with_threshold", identityMap("a", "b")))); + + // Perform further reducible optimization regardless of whether the number of partitions exceeds the threshold + assertQuery(sessionWithThresholdBigger, "select min(a), max(b) from metadata_optimize_with_threshold", "values(1, '1002')"); + assertPlan(sessionWithThresholdBigger, "select min(a), max(b) from metadata_optimize_with_threshold", + anyNot(AggregationNode.class, strictProject( + ImmutableMap.of("a", expression("1"), "b", expression("1002")), + anyTree(values())))); + assertQuery(sessionWithThresholdSmaller, "select min(a), max(b) from metadata_optimize_with_threshold", "values(1, '1002')"); + assertPlan(sessionWithThresholdSmaller, "select min(a), max(b) from metadata_optimize_with_threshold", + anyNot(AggregationNode.class, strictProject( + ImmutableMap.of("a", expression("1"), "b", expression("1002")), + anyTree(values())))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS metadata_optimize_with_threshold"); + } + } + @Test(dataProvider = "push_down_filter_enabled") public void testMetadataQueryOptimizerOnPartitionEvolution(boolean enabled) { @@ -2306,6 +2375,14 @@ protected Session getSessionWithOptimizeMetadataQueries(boolean enabled) .build(); } + protected Session getSessionWithOptimizeMetadataQueriesAndThreshold(boolean enabled, int rowsForMetadataOptimizationThreshold) + { + return Session.builder(super.getSession()) + .setCatalogSessionProperty(ICEBERG_CATALOG, PUSHDOWN_FILTER_ENABLED, String.valueOf(enabled)) + .setCatalogSessionProperty(ICEBERG_CATALOG, ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD, String.valueOf(rowsForMetadataOptimizationThreshold)) + .build(); + } + private static Set toSubfields(String... subfieldPaths) { return Arrays.stream(subfieldPaths) diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/DiscretePredicates.java b/presto-spi/src/main/java/com/facebook/presto/spi/DiscretePredicates.java index bf0aa6e41cf4..c48148c44eb5 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/DiscretePredicates.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/DiscretePredicates.java @@ -16,6 +16,7 @@ import com.facebook.presto.common.predicate.TupleDomain; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import static java.util.Collections.unmodifiableList; @@ -46,4 +47,28 @@ public Iterable> getPredicates() { return predicates; } + + public void setPartitionThreshold(int partitionThreshold) + { + if (predicates instanceof LazyIterable) { + ((LazyIterable) predicates).setMaxIterableCount(partitionThreshold); + } + } + + public boolean isSingleAllDomain() + { + Iterator> predicateIterator = this.predicates.iterator(); + if (predicateIterator.hasNext()) { + TupleDomain domain = predicateIterator.next(); + if (domain.equals(TupleDomain.all()) && !predicateIterator.hasNext()) { + return true; + } + } + return false; + } + + public boolean isEmpty() + { + return !this.predicates.iterator().hasNext(); + } } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/LazyIterable.java b/presto-spi/src/main/java/com/facebook/presto/spi/LazyIterable.java new file mode 100644 index 000000000000..073bfbef4a1d --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/LazyIterable.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi; + +public interface LazyIterable + extends Iterable +{ + void setMaxIterableCount(int maxIterableCount); +}