Spark 3.5: Fix flaky test due to temp directory not empty during delete #11470

Merged 2 commits on Nov 5, 2024
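The change below replaces hand-rolled temp-folder handling (a `createTableFolder()` helper plus a `FileUtils.deleteDirectory` retry loop that could run while the directory was not yet empty) with a JUnit 5 `@TempDir` field, so each test gets a fresh directory that the framework deletes afterwards. A minimal sketch of that pattern, using a hypothetical test class and file names rather than the Iceberg test itself, assuming a JUnit 5 version that allows a private `@TempDir` field (as the diff does):

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.nio.file.Files;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

// Hypothetical illustration of the @TempDir pattern adopted in this PR.
class TempDirPatternTest {

  // JUnit injects a fresh directory before each test and deletes it,
  // including its contents, after the test finishes, so no manual cleanup loop is needed.
  @TempDir private File location;

  @Test
  void writesUnderManagedTempDir() throws Exception {
    File data = new File(location, "data");
    assertThat(data.mkdirs()).isTrue();
    Files.writeString(new File(data, "part-00000.txt").toPath(), "example");
    // Intentionally no FileUtils.deleteDirectory(location) here.
  }
}
```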
@@ -28,15 +28,13 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.nio.file.NoSuchFileException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import org.apache.avro.generic.GenericData.Record;
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Parameter;
@@ -76,6 +74,7 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;

 @ExtendWith(ParameterizedTestExtension.class)
 public class TestDataFrameWrites extends ParameterizedAvroDataTest {
@@ -88,6 +87,8 @@ public static Collection<String> parameters() {

   @Parameter private String format;

+  @TempDir private File location;
+
Review comment (Member): Good idea to use @TempDir here.


   private static SparkSession spark = null;
   private static JavaSparkContext sc = null;

@@ -140,47 +141,37 @@ public static void stopSpark() {

   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
-    File location = createTableFolder();
-    Table table = createTable(schema, location);
-    writeAndValidateWithLocations(table, location, new File(location, "data"));
+    Table table = createTable(schema);
+    writeAndValidateWithLocations(table, new File(location, "data"));
   }

   @TestTemplate
   public void testWriteWithCustomDataLocation() throws IOException {
-    File location = createTableFolder();
     File tablePropertyDataLocation = temp.resolve("test-table-property-data-dir").toFile();
-    Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()), location);
+    Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()));
     table
         .updateProperties()
         .set(TableProperties.WRITE_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath())
         .commit();
-    writeAndValidateWithLocations(table, location, tablePropertyDataLocation);
-  }
-
-  private File createTableFolder() throws IOException {
-    File parent = temp.resolve("parquet").toFile();
-    File location = new File(parent, "test");
-    assertThat(location.mkdirs()).as("Mkdir should succeed").isTrue();
-    return location;
+    writeAndValidateWithLocations(table, tablePropertyDataLocation);
   }

-  private Table createTable(Schema schema, File location) {
+  private Table createTable(Schema schema) {
     HadoopTables tables = new HadoopTables(CONF);
     return tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
   }

-  private void writeAndValidateWithLocations(Table table, File location, File expectedDataDir)
-      throws IOException {
+  private void writeAndValidateWithLocations(Table table, File expectedDataDir) throws IOException {
     Schema tableSchema = table.schema(); // use the table schema because ids are reassigned

     table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();

     Iterable<Record> expected = RandomData.generate(tableSchema, 100, 0L);
-    writeData(expected, tableSchema, location.toString());
+    writeData(expected, tableSchema);

     table.refresh();

-    List<Row> actual = readTable(location.toString());
+    List<Row> actual = readTable();

     Iterator<Record> expectedIter = expected.iterator();
     Iterator<Row> actualIter = actual.iterator();
@@ -204,21 +195,20 @@ private void writeAndValidateWithLocations(Table table, File location, File expe
                 .startsWith(expectedDataDir.getAbsolutePath()));
   }

-  private List<Row> readTable(String location) {
-    Dataset<Row> result = spark.read().format("iceberg").load(location);
+  private List<Row> readTable() {
+    Dataset<Row> result = spark.read().format("iceberg").load(location.toString());

     return result.collectAsList();
   }

-  private void writeData(Iterable<Record> records, Schema schema, String location)
-      throws IOException {
+  private void writeData(Iterable<Record> records, Schema schema) throws IOException {
     Dataset<Row> df = createDataset(records, schema);
     DataFrameWriter<?> writer = df.write().format("iceberg").mode("append");
-    writer.save(location);
+    writer.save(location.toString());
   }

-  private void writeDataWithFailOnPartition(
-      Iterable<Record> records, Schema schema, String location) throws IOException, SparkException {
+  private void writeDataWithFailOnPartition(Iterable<Record> records, Schema schema)
+      throws IOException, SparkException {
     final int numPartitions = 10;
     final int partitionToFail = new Random().nextInt(numPartitions);
     MapPartitionsFunction<Row, Row> failOnFirstPartitionFunc =
@@ -241,7 +231,7 @@ private void writeDataWithFailOnPartition(
     // Setting "check-nullability" option to "false" doesn't help as it fails at Spark analyzer.
     Dataset<Row> convertedDf = df.sqlContext().createDataFrame(df.rdd(), convert(schema));
     DataFrameWriter<?> writer = convertedDf.write().format("iceberg").mode("append");
-    writer.save(location);
+    writer.save(location.toString());
   }

   private Dataset<Row> createDataset(Iterable<Record> records, Schema schema) throws IOException {
@@ -287,7 +277,6 @@ public void testNullableWithWriteOption() throws IOException {
         .as("Spark 3 rejects writing nulls to a required column")
         .startsWith("2");

-    File location = temp.resolve("parquet").resolve("test").toFile();
     String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location);
     String targetPath = String.format("%s/nullable_poc/targetFolder/", location);

@@ -341,7 +330,6 @@ public void testNullableWithSparkSqlOption() throws IOException {
         .as("Spark 3 rejects writing nulls to a required column")
         .startsWith("2");

-    File location = temp.resolve("parquet").resolve("test").toFile();
     String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location);
     String targetPath = String.format("%s/nullable_poc/targetFolder/", location);

@@ -397,37 +385,28 @@ public void testNullableWithSparkSqlOption() throws IOException {

   @TestTemplate
   public void testFaultToleranceOnWrite() throws IOException {
-    File location = createTableFolder();
     Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields());
-    Table table = createTable(schema, location);
+    Table table = createTable(schema);

     Iterable<Record> records = RandomData.generate(schema, 100, 0L);
-    writeData(records, schema, location.toString());
+    writeData(records, schema);

     table.refresh();

     Snapshot snapshotBeforeFailingWrite = table.currentSnapshot();
-    List<Row> resultBeforeFailingWrite = readTable(location.toString());
+    List<Row> resultBeforeFailingWrite = readTable();

     Iterable<Record> records2 = RandomData.generate(schema, 100, 0L);

-    assertThatThrownBy(() -> writeDataWithFailOnPartition(records2, schema, location.toString()))
+    assertThatThrownBy(() -> writeDataWithFailOnPartition(records2, schema))
         .isInstanceOf(SparkException.class);

     table.refresh();

     Snapshot snapshotAfterFailingWrite = table.currentSnapshot();
-    List<Row> resultAfterFailingWrite = readTable(location.toString());
+    List<Row> resultAfterFailingWrite = readTable();

     assertThat(snapshotBeforeFailingWrite).isEqualTo(snapshotAfterFailingWrite);
     assertThat(resultBeforeFailingWrite).isEqualTo(resultAfterFailingWrite);
-
-    while (location.exists()) {
-      try {
-        FileUtils.deleteDirectory(location);
-      } catch (NoSuchFileException e) {
-        // ignore NoSuchFileException when a file is already deleted
-      }
-    }
   }
 }