Spark: Add option to introduce ordering of RewriteFileGroup (apache#4377)

Allows users to specify the order in which RewriteFileGroups should be processed.
rajarshisarkar authored Apr 15, 2022
1 parent 993666d commit d1476c6
Showing 5 changed files with 258 additions and 9 deletions.
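As a quick usage sketch (not part of the diff below): the new option is set like any other rewrite action option. SparkActions, the binPack strategy, and the table variable here are assumptions for illustration, not something this commit prescribes.

import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;

// Hypothetical usage: rewrite the largest file groups first.
// Assumes table is an Iceberg Table loaded elsewhere (e.g. from a Catalog)
// and that a Spark session is active.
RewriteDataFiles.Result result = SparkActions.get()
    .rewriteDataFiles(table)
    .binPack()
    .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_DESC.orderName())
    .execute();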
55 changes: 55 additions & 0 deletions api/src/main/java/org/apache/iceberg/RewriteJobOrder.java
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.util.Locale;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* Enum of supported rewrite job order; it defines the order in which the file groups should be
* rewritten.
* <p><ul>
* <li> bytes-asc: rewrite the smallest job groups first.
* <li> bytes-desc: rewrite the largest job groups first.
* <li> files-asc: rewrite the job groups with the least files first.
* <li> files-desc: rewrite the job groups with the most files first.
* <li> none: rewrite job groups in the order they were planned (no specific ordering).
* </ul><p>
*/
public enum RewriteJobOrder {
BYTES_ASC("bytes-asc"), BYTES_DESC("bytes-desc"),
FILES_ASC("files-asc"), FILES_DESC("files-desc"), NONE("none");

private final String orderName;

RewriteJobOrder(String orderName) {
this.orderName = orderName;
}

public String orderName() {
return orderName;
}

public static RewriteJobOrder fromName(String orderName) {
Preconditions.checkArgument(orderName != null, "Rewrite job order name should not be null");
// Replace the hyphen in order name with underscore to map to the enum value. For example: bytes-asc to BYTES_ASC
return RewriteJobOrder.valueOf(orderName.replaceFirst("-", "_").toUpperCase(Locale.ENGLISH));
}
}
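A minimal sketch of the name-to-enum mapping, assuming only the enum above; the wrapper class and main method are illustrative and not part of the commit.

import org.apache.iceberg.RewriteJobOrder;

public class RewriteJobOrderExample {
  public static void main(String[] args) {
    // "bytes-asc" maps to BYTES_ASC: the hyphen is replaced with an underscore
    // and the name is upper-cased before Enum.valueOf is applied.
    RewriteJobOrder order = RewriteJobOrder.fromName("bytes-asc");
    System.out.println(order);             // BYTES_ASC
    System.out.println(order.orderName()); // bytes-asc

    // An unrecognized name fails with an IllegalArgumentException from valueOf.
    // RewriteJobOrder.fromName("foo");
  }
}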
16 changes: 16 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.actions;

import java.util.List;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;
@@ -88,6 +89,21 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, Rewri
String USE_STARTING_SEQUENCE_NUMBER = "use-starting-sequence-number";
boolean USE_STARTING_SEQUENCE_NUMBER_DEFAULT = true;

/**
* Forces the rewrite job order based on the value.
* <p><ul>
* <li> If rewrite.job-order=bytes-asc, then rewrite the smallest job groups first.
* <li> If rewrite.job-order=bytes-desc, then rewrite the largest job groups first.
* <li> If rewrite.job-order=files-asc, then rewrite the job groups with the least files first.
* <li> If rewrite.job-order=files-desc, then rewrite the job groups with the most files first.
* <li> If rewrite.job-order=none, then rewrite job groups in the order they were planned (no
* specific ordering).
* </ul><p>
* Defaults to none.
*/
String REWRITE_JOB_ORDER = "rewrite.job-order";
String REWRITE_JOB_ORDER_DEFAULT = RewriteJobOrder.NONE.orderName();

/**
* Choose BINPACK as a strategy for this rewrite operation
* @return this for method chaining
@@ -77,4 +77,12 @@ public String toString() {
.add("numAddedFiles", addedFiles == null ? "Rewrite Incomplete" : Integer.toString(addedFiles.size()))
.toString();
}

public long sizeInBytes() {
return fileScanTasks.stream().mapToLong(FileScanTask::length).sum();
}

public int numFiles() {
return fileScanTasks.size();
}
}
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.math.RoundingMode;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,6 +35,7 @@
import java.util.stream.Stream;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -80,7 +82,8 @@ public class BaseRewriteDataFilesSparkAction
PARTIAL_PROGRESS_ENABLED,
PARTIAL_PROGRESS_MAX_COMMITS,
TARGET_FILE_SIZE_BYTES,
USE_STARTING_SEQUENCE_NUMBER
USE_STARTING_SEQUENCE_NUMBER,
REWRITE_JOB_ORDER
);

private final Table table;
@@ -90,6 +93,7 @@ public class BaseRewriteDataFilesSparkAction
private int maxCommits;
private boolean partialProgressEnabled;
private boolean useStartingSequenceNumber;
private RewriteJobOrder rewriteJobOrder;
private RewriteStrategy strategy = null;

protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
@@ -146,7 +150,6 @@ public RewriteDataFiles.Result execute() {
}

validateAndInitOptions();
strategy = strategy.options(options());

Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition = planFileGroups(startingSnapshotId);
RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
@@ -166,7 +169,7 @@ public RewriteDataFiles.Result execute() {
}
}

private Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
CloseableIterable<FileScanTask> fileScanTasks = table.newScan()
.useSnapshot(startingSnapshotId)
.filter(filter)
@@ -328,11 +331,9 @@ private Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream<
return new BaseRewriteDataFilesResult(rewriteResults);
}

private Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {

// Todo Add intelligence to the order in which we do rewrites instead of just using partition order
return fileGroupsByPartition.entrySet().stream()
Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
Stream<RewriteFileGroup> rewriteFileGroupStream = fileGroupsByPartition.entrySet().stream()
.flatMap(e -> {
StructLike partition = e.getKey();
List<List<FileScanTask>> fileGroups = e.getValue();
@@ -343,9 +344,26 @@ private Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
return new RewriteFileGroup(info, tasks);
});
});

return rewriteFileGroupStream.sorted(rewriteGroupComparator());
}

private void validateAndInitOptions() {
private Comparator<RewriteFileGroup> rewriteGroupComparator() {
switch (rewriteJobOrder) {
case BYTES_ASC:
return Comparator.comparing(RewriteFileGroup::sizeInBytes);
case BYTES_DESC:
return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
case FILES_ASC:
return Comparator.comparing(RewriteFileGroup::numFiles);
case FILES_DESC:
return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
default:
return (fileGroupOne, fileGroupTwo) -> 0;
}
}

void validateAndInitOptions() {
Set<String> validOptions = Sets.newHashSet(strategy.validOptions());
validOptions.addAll(VALID_OPTIONS);

@@ -356,6 +374,8 @@ private void validateAndInitOptions() {
"Cannot use options %s, they are not supported by the action or the strategy %s",
invalidKeys, strategy.name());

strategy = strategy.options(options());

maxConcurrentFileGroupRewrites = PropertyUtil.propertyAsInt(options(),
MAX_CONCURRENT_FILE_GROUP_REWRITES,
MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT);
@@ -372,6 +392,10 @@ private void validateAndInitOptions() {
USE_STARTING_SEQUENCE_NUMBER,
USE_STARTING_SEQUENCE_NUMBER_DEFAULT);

rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
REWRITE_JOB_ORDER,
REWRITE_JOB_ORDER_DEFAULT));

Preconditions.checkArgument(maxConcurrentFileGroupRewrites >= 1,
"Cannot set %s to %s, the value must be positive.",
MAX_CONCURRENT_FILE_GROUP_REWRITES, maxConcurrentFileGroupRewrites);
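To make the comparator's effect concrete, here is a small self-contained sketch with a stand-in group type; RewriteFileGroup itself wraps FileScanTasks, so the Group class and the numbers below are purely illustrative.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class JobOrderComparatorSketch {

  // Illustrative stand-in for RewriteFileGroup; only the two ordering keys matter here.
  static class Group {
    final long sizeInBytes;
    final int numFiles;

    Group(long sizeInBytes, int numFiles) {
      this.sizeInBytes = sizeInBytes;
      this.numFiles = numFiles;
    }

    long sizeInBytes() { return sizeInBytes; }
    int numFiles() { return numFiles; }
  }

  public static void main(String[] args) {
    List<Group> groups = new ArrayList<>();
    groups.add(new Group(300L, 3));
    groups.add(new Group(100L, 10));
    groups.add(new Group(200L, 1));

    // BYTES_DESC: largest groups first (the reverseOrder() branch above).
    groups.sort(Comparator.comparing(Group::sizeInBytes, Comparator.reverseOrder()));
    System.out.println(groups.stream().map(Group::sizeInBytes).collect(Collectors.toList())); // [300, 200, 100]

    // FILES_ASC: groups with the fewest files first.
    groups.sort(Comparator.comparing(Group::numFiles));
    System.out.println(groups.stream().map(Group::numFiles).collect(Collectors.toList())); // [1, 3, 10]
  }
}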
@@ -41,6 +41,7 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
Expand Down Expand Up @@ -76,6 +77,7 @@
import org.apache.iceberg.spark.FileScanTaskSetManager;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
@@ -807,6 +809,12 @@ public void testInvalidOptions() {
() -> basicRewrite(table)
.option("foobarity", "-5")
.execute());

AssertHelpers.assertThrows("Cannot set rewrite.job-order to foo",
IllegalArgumentException.class,
() -> basicRewrite(table)
.option(RewriteDataFiles.REWRITE_JOB_ORDER, "foo")
.execute());
}

@Test
Expand Down Expand Up @@ -1038,6 +1046,144 @@ public void testInvalidAPIUsage() {
"Cannot set strategy", () -> actions().rewriteDataFiles(table).sort(SortOrder.unsorted()).binPack());
}

@Test
public void testRewriteJobOrderBytesAsc() {
Table table = createTablePartitioned(4, 2);
writeRecords(1, SCALE, 1);
writeRecords(2, SCALE, 2);
writeRecords(3, SCALE, 3);
writeRecords(4, SCALE, 4);
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();

BaseRewriteDataFilesSparkAction basicRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.binPack();
List<Long> expected = toGroupStream(table, basicRewrite)
.mapToLong(RewriteFileGroup::sizeInBytes)
.boxed()
.collect(Collectors.toList());

BaseRewriteDataFilesSparkAction jobOrderRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_ASC.orderName())
.binPack();
List<Long> actual = toGroupStream(table, jobOrderRewrite)
.mapToLong(RewriteFileGroup::sizeInBytes)
.boxed()
.collect(Collectors.toList());

expected.sort(Comparator.naturalOrder());
Assert.assertEquals("Size in bytes order should be ascending", actual, expected);
Collections.reverse(expected);
Assert.assertNotEquals("Size in bytes order should not be descending", actual, expected);
}

@Test
public void testRewriteJobOrderBytesDesc() {
Table table = createTablePartitioned(4, 2);
writeRecords(1, SCALE, 1);
writeRecords(2, SCALE, 2);
writeRecords(3, SCALE, 3);
writeRecords(4, SCALE, 4);
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();

BaseRewriteDataFilesSparkAction basicRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.binPack();
List<Long> expected = toGroupStream(table, basicRewrite)
.mapToLong(RewriteFileGroup::sizeInBytes)
.boxed()
.collect(Collectors.toList());

BaseRewriteDataFilesSparkAction jobOrderRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_DESC.orderName())
.binPack();
List<Long> actual = toGroupStream(table, jobOrderRewrite)
.mapToLong(RewriteFileGroup::sizeInBytes)
.boxed()
.collect(Collectors.toList());

expected.sort(Comparator.reverseOrder());
Assert.assertEquals("Size in bytes order should be descending", actual, expected);
Collections.reverse(expected);
Assert.assertNotEquals("Size in bytes order should not be ascending", actual, expected);
}

@Test
public void testRewriteJobOrderFilesAsc() {
Table table = createTablePartitioned(4, 2);
writeRecords(1, SCALE, 1);
writeRecords(2, SCALE, 2);
writeRecords(3, SCALE, 3);
writeRecords(4, SCALE, 4);
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();

BaseRewriteDataFilesSparkAction basicRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.binPack();
List<Long> expected = toGroupStream(table, basicRewrite)
.mapToLong(RewriteFileGroup::numFiles)
.boxed()
.collect(Collectors.toList());

BaseRewriteDataFilesSparkAction jobOrderRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.FILES_ASC.orderName())
.binPack();
List<Long> actual = toGroupStream(table, jobOrderRewrite)
.mapToLong(RewriteFileGroup::numFiles)
.boxed()
.collect(Collectors.toList());

expected.sort(Comparator.naturalOrder());
Assert.assertEquals("Number of files order should be ascending", actual, expected);
Collections.reverse(expected);
Assert.assertNotEquals("Number of files order should not be descending", actual, expected);
}

@Test
public void testRewriteJobOrderFilesDesc() {
Table table = createTablePartitioned(4, 2);
writeRecords(1, SCALE, 1);
writeRecords(2, SCALE, 2);
writeRecords(3, SCALE, 3);
writeRecords(4, SCALE, 4);
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();

BaseRewriteDataFilesSparkAction basicRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.binPack();
List<Long> expected = toGroupStream(table, basicRewrite)
.mapToLong(RewriteFileGroup::numFiles)
.boxed()
.collect(Collectors.toList());

BaseRewriteDataFilesSparkAction jobOrderRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.FILES_DESC.orderName())
.binPack();
List<Long> actual = toGroupStream(table, jobOrderRewrite)
.mapToLong(RewriteFileGroup::numFiles)
.boxed()
.collect(Collectors.toList());

expected.sort(Comparator.reverseOrder());
Assert.assertEquals("Number of files order should be descending", actual, expected);
Collections.reverse(expected);
Assert.assertNotEquals("Number of files order should not be ascending", actual, expected);
}

private Stream<RewriteFileGroup> toGroupStream(Table table,
BaseRewriteDataFilesSparkAction rewrite) {
rewrite.validateAndInitOptions();
Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
rewrite.planFileGroups(table.currentSnapshot().snapshotId());

return rewrite.toGroupStream(
new RewriteExecutionContext(fileGroupsByPartition), fileGroupsByPartition);
}

protected List<Object[]> currentData() {
return rowsToJava(spark.read().format("iceberg").load(tableLocation)
.sort("c1", "c2", "c3")