diff --git a/api/src/main/java/org/apache/iceberg/RewriteJobOrder.java b/api/src/main/java/org/apache/iceberg/RewriteJobOrder.java
new file mode 100644
index 000000000000..3b47dff78296
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/RewriteJobOrder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Locale;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Enum of supported rewrite job order, it defines the order in which the file groups should be
+ * written.
+ * <ul>
+ *   <li>bytes-asc: rewrite the smallest job groups first.
+ *   <li>bytes-desc: rewrite the largest job groups first.
+ *   <li>files-asc: rewrite the job groups with the least files first.
+ *   <li>files-desc: rewrite the job groups with the most files first.
+ *   <li>none: rewrite job groups in the order they were planned (no specific ordering).
+ * </ul>
+ */
+public enum RewriteJobOrder {
+  BYTES_ASC("bytes-asc"), BYTES_DESC("bytes-desc"),
+  FILES_ASC("files-asc"), FILES_DESC("files-desc"), NONE("none");
+
+  private final String orderName;
+
+  RewriteJobOrder(String orderName) {
+    this.orderName = orderName;
+  }
+
+  public String orderName() {
+    return orderName;
+  }
+
+  public static RewriteJobOrder fromName(String orderName) {
+    Preconditions.checkArgument(orderName != null, "Rewrite job order name should not be null");
+    // Replace the hyphen in order name with underscore to map to the enum value. For example: bytes-asc to BYTES_ASC
+    return RewriteJobOrder.valueOf(orderName.replaceFirst("-", "_").toUpperCase(Locale.ENGLISH));
+  }
+}
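Note on the lookup above: fromName only swaps the single hyphen of the supported names for an underscore and then delegates to Enum.valueOf, so any unrecognized string fails fast with an IllegalArgumentException. A minimal sketch of the round trip, using only the enum defined above:

    // "bytes-asc" -> BYTES_ASC; orderName() gives the property-friendly form back.
    RewriteJobOrder order = RewriteJobOrder.fromName("bytes-asc");
    String name = order.orderName();        // "bytes-asc"
    RewriteJobOrder.fromName("foo");        // throws IllegalArgumentException from Enum.valueOf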

diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index aa5d49c7ab65..f00596fa46e8 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.actions;
 
 import java.util.List;
+import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
@@ -88,6 +89,21 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, RewriteDataFiles.Result> {
+  /**
+   * Forces the rewrite job order based on the value.
+   * <ul>
+   *   <li>If rewrite-job-order=bytes-asc, then rewrite the smallest job groups first.
+   *   <li>If rewrite-job-order=bytes-desc, then rewrite the largest job groups first.
+   *   <li>If rewrite-job-order=files-asc, then rewrite the job groups with the least files first.
+   *   <li>If rewrite-job-order=files-desc, then rewrite the job groups with the most files first.
+   *   <li>If rewrite-job-order=none, then rewrite job groups in the order they were planned.
+   * </ul>
+   * <p>
+   * Defaults to none.
+   */
+  String REWRITE_JOB_ORDER = "rewrite.job-order";
+  String REWRITE_JOB_ORDER_DEFAULT = RewriteJobOrder.NONE.orderName();
+
   /**
    * Choose BINPACK as a strategy for this rewrite operation
    * @return this for method chaining
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
index 1dd1476ba62b..99ec75772301 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
@@ -77,4 +77,12 @@ public String toString() {
         .add("numAddedFiles", addedFiles == null ? "Rewrite Incomplete" : Integer.toString(addedFiles.size()))
         .toString();
   }
+
+  public long sizeInBytes() {
+    return fileScanTasks.stream().mapToLong(FileScanTask::length).sum();
+  }
+
+  public int numFiles() {
+    return fileScanTasks.size();
+  }
 }
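A hedged usage sketch of the new property from the Spark action API (the SparkActions entry point and the table handle are assumed to already exist in the caller's context; only the option key and the enum come from this change):

    import org.apache.iceberg.RewriteJobOrder;
    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.spark.actions.SparkActions;

    // Compact the largest planned file groups first; everything else stays at bin-pack defaults.
    RewriteDataFiles.Result result = SparkActions.get()
        .rewriteDataFiles(table)
        .binPack()
        .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_DESC.orderName())
        .execute();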
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index c56c03ee2d1b..5350e729c8ea 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,6 +35,7 @@
 import java.util.stream.Stream;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -80,7 +82,8 @@ public class BaseRewriteDataFilesSparkAction
       PARTIAL_PROGRESS_ENABLED,
       PARTIAL_PROGRESS_MAX_COMMITS,
       TARGET_FILE_SIZE_BYTES,
-      USE_STARTING_SEQUENCE_NUMBER
+      USE_STARTING_SEQUENCE_NUMBER,
+      REWRITE_JOB_ORDER
   );
 
   private final Table table;
@@ -90,6 +93,7 @@ public class BaseRewriteDataFilesSparkAction
   private int maxCommits;
   private boolean partialProgressEnabled;
   private boolean useStartingSequenceNumber;
+  private RewriteJobOrder rewriteJobOrder;
   private RewriteStrategy strategy = null;
 
   protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
@@ -146,7 +150,6 @@ public RewriteDataFiles.Result execute() {
     }
 
     validateAndInitOptions();
-    strategy = strategy.options(options());
 
     Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition = planFileGroups(startingSnapshotId);
     RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
@@ -166,7 +169,7 @@ public RewriteDataFiles.Result execute() {
     }
   }
 
-  private Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
+  Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
     CloseableIterable<FileScanTask> fileScanTasks = table.newScan()
         .useSnapshot(startingSnapshotId)
         .filter(filter)
@@ -328,11 +331,9 @@ private Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
     return new BaseRewriteDataFilesResult(rewriteResults);
   }
 
-  private Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
-      Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
-
-    // Todo Add intelligence to the order in which we do rewrites instead of just using partition order
-    return fileGroupsByPartition.entrySet().stream()
+  Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
+      Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
+    Stream<RewriteFileGroup> rewriteFileGroupStream = fileGroupsByPartition.entrySet().stream()
         .flatMap(e -> {
           StructLike partition = e.getKey();
           List<List<FileScanTask>> fileGroups = e.getValue();
@@ -343,9 +344,26 @@ private Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
             return new RewriteFileGroup(info, tasks);
           });
         });
+
+    return rewriteFileGroupStream.sorted(rewriteGroupComparator());
   }
 
-  private void validateAndInitOptions() {
+  private Comparator<RewriteFileGroup> rewriteGroupComparator() {
+    switch (rewriteJobOrder) {
+      case BYTES_ASC:
+        return Comparator.comparing(RewriteFileGroup::sizeInBytes);
+      case BYTES_DESC:
+        return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
+      case FILES_ASC:
+        return Comparator.comparing(RewriteFileGroup::numFiles);
+      case FILES_DESC:
+        return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
+      default:
+        return (fileGroupOne, fileGroupTwo) -> 0;
+    }
+  }
+
+  void validateAndInitOptions() {
     Set<String> validOptions = Sets.newHashSet(strategy.validOptions());
     validOptions.addAll(VALID_OPTIONS);
@@ -356,6 +374,8 @@ private void validateAndInitOptions() {
         "Cannot use options %s, they are not supported by the action or the strategy %s",
         invalidKeys, strategy.name());
 
+    strategy = strategy.options(options());
+
     maxConcurrentFileGroupRewrites = PropertyUtil.propertyAsInt(options(),
         MAX_CONCURRENT_FILE_GROUP_REWRITES,
         MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT);
@@ -372,6 +392,10 @@ private void validateAndInitOptions() {
         USE_STARTING_SEQUENCE_NUMBER,
         USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
 
+    rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
+        REWRITE_JOB_ORDER,
+        REWRITE_JOB_ORDER_DEFAULT));
+
     Preconditions.checkArgument(maxConcurrentFileGroupRewrites >= 1,
         "Cannot set %s to %s, the value must be positive.",
         MAX_CONCURRENT_FILE_GROUP_REWRITES, maxConcurrentFileGroupRewrites);
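Two observations on the scheduling change above: Stream.sorted is a stateful operation, so every planned group is buffered before the first rewrite starts, and the default (none) branch returns a comparator that treats all groups as equal, which leaves the planned order untouched because the sort is stable. A small sketch of the same Comparator shape on plain numbers (the values are made up for illustration; only the pattern mirrors rewriteGroupComparator()):

    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    // BYTES_DESC pairs the size key with Comparator.reverseOrder(): largest first.
    List<Long> largestFirst = Stream.of(128L, 512L, 64L)
        .sorted(Comparator.comparing((Long size) -> size, Comparator.reverseOrder()))
        .collect(Collectors.toList());   // [512, 128, 64]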
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 0c39faf259c5..1d8695053123 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -41,6 +41,7 @@
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
@@ -76,6 +77,7 @@
 import org.apache.iceberg.spark.FileScanTaskSetManager;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Conversions;
@@ -807,6 +809,12 @@ public void testInvalidOptions() {
         () -> basicRewrite(table)
             .option("foobarity", "-5")
             .execute());
+
+    AssertHelpers.assertThrows("Cannot set rewrite.job-order to foo",
+        IllegalArgumentException.class,
+        () -> basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, "foo")
+            .execute());
   }
 
   @Test
@@ -1038,6 +1046,144 @@ public void testInvalidAPIUsage() {
         "Cannot set strategy",
         () -> actions().rewriteDataFiles(table).sort(SortOrder.unsorted()).binPack());
   }
+
+  @Test
+  public void testRewriteJobOrderBytesAsc() {
+    Table table = createTablePartitioned(4, 2);
+    writeRecords(1, SCALE, 1);
+    writeRecords(2, SCALE, 2);
+    writeRecords(3, SCALE, 3);
+    writeRecords(4, SCALE, 4);
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+    BaseRewriteDataFilesSparkAction basicRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .binPack();
+    List<Long> expected = toGroupStream(table, basicRewrite)
+        .mapToLong(RewriteFileGroup::sizeInBytes)
+        .boxed()
+        .collect(Collectors.toList());
+
+    BaseRewriteDataFilesSparkAction jobOrderRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_ASC.orderName())
+            .binPack();
+    List<Long> actual = toGroupStream(table, jobOrderRewrite)
+        .mapToLong(RewriteFileGroup::sizeInBytes)
+        .boxed()
+        .collect(Collectors.toList());
+
+    expected.sort(Comparator.naturalOrder());
+    Assert.assertEquals("Size in bytes order should be ascending", actual, expected);
+    Collections.reverse(expected);
+    Assert.assertNotEquals("Size in bytes order should not be descending", actual, expected);
+  }
+
+  @Test
+  public void testRewriteJobOrderBytesDesc() {
+    Table table = createTablePartitioned(4, 2);
+    writeRecords(1, SCALE, 1);
+    writeRecords(2, SCALE, 2);
+    writeRecords(3, SCALE, 3);
+    writeRecords(4, SCALE, 4);
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+    BaseRewriteDataFilesSparkAction basicRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .binPack();
+    List<Long> expected = toGroupStream(table, basicRewrite)
+        .mapToLong(RewriteFileGroup::sizeInBytes)
+        .boxed()
+        .collect(Collectors.toList());
+
+    BaseRewriteDataFilesSparkAction jobOrderRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_DESC.orderName())
+            .binPack();
+    List<Long> actual = toGroupStream(table, jobOrderRewrite)
+        .mapToLong(RewriteFileGroup::sizeInBytes)
+        .boxed()
+        .collect(Collectors.toList());
+
+    expected.sort(Comparator.reverseOrder());
+    Assert.assertEquals("Size in bytes order should be descending", actual, expected);
+    Collections.reverse(expected);
+    Assert.assertNotEquals("Size in bytes order should not be ascending", actual, expected);
+  }
+
+  @Test
+  public void testRewriteJobOrderFilesAsc() {
+    Table table = createTablePartitioned(4, 2);
+    writeRecords(1, SCALE, 1);
+    writeRecords(2, SCALE, 2);
+    writeRecords(3, SCALE, 3);
+    writeRecords(4, SCALE, 4);
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+    BaseRewriteDataFilesSparkAction basicRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .binPack();
+    List<Long> expected = toGroupStream(table, basicRewrite)
+        .mapToLong(RewriteFileGroup::numFiles)
+        .boxed()
+        .collect(Collectors.toList());
+
+    BaseRewriteDataFilesSparkAction jobOrderRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.FILES_ASC.orderName())
+            .binPack();
+    List<Long> actual = toGroupStream(table, jobOrderRewrite)
+        .mapToLong(RewriteFileGroup::numFiles)
+        .boxed()
+        .collect(Collectors.toList());
+
+    expected.sort(Comparator.naturalOrder());
+    Assert.assertEquals("Number of files order should be ascending", actual, expected);
+    Collections.reverse(expected);
+    Assert.assertNotEquals("Number of files order should not be descending", actual, expected);
+  }
+
+  @Test
+  public void testRewriteJobOrderFilesDesc() {
+    Table table = createTablePartitioned(4, 2);
+    writeRecords(1, SCALE, 1);
+    writeRecords(2, SCALE, 2);
+    writeRecords(3, SCALE, 3);
+    writeRecords(4, SCALE, 4);
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+    BaseRewriteDataFilesSparkAction basicRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .binPack();
+    List<Long> expected = toGroupStream(table, basicRewrite)
+        .mapToLong(RewriteFileGroup::numFiles)
+        .boxed()
+        .collect(Collectors.toList());
+
+    BaseRewriteDataFilesSparkAction jobOrderRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.FILES_DESC.orderName())
+            .binPack();
+    List<Long> actual = toGroupStream(table, jobOrderRewrite)
+        .mapToLong(RewriteFileGroup::numFiles)
+        .boxed()
+        .collect(Collectors.toList());
+
+    expected.sort(Comparator.reverseOrder());
+    Assert.assertEquals("Number of files order should be descending", actual, expected);
+    Collections.reverse(expected);
+    Assert.assertNotEquals("Number of files order should not be ascending", actual, expected);
+  }
+
+  private Stream<RewriteFileGroup> toGroupStream(Table table,
+      BaseRewriteDataFilesSparkAction rewrite) {
+    rewrite.validateAndInitOptions();
+    Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
+        rewrite.planFileGroups(table.currentSnapshot().snapshotId());
+
+    return rewrite.toGroupStream(
+        new RewriteExecutionContext(fileGroupsByPartition), fileGroupsByPartition);
+  }
+
   protected List<Object[]> currentData() {
     return rowsToJava(spark.read().format("iceberg").load(tableLocation)
         .sort("c1", "c2", "c3")