diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
index 01c36b824ea6..a8b4c915868a 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
@@ -28,7 +28,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.nio.file.NoSuchFileException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
@@ -36,7+35,6 @@
 import java.util.Map;
 import java.util.Random;
 import org.apache.avro.generic.GenericData.Record;
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Parameter;
@@ -76,6 +74,7 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 @ExtendWith(ParameterizedTestExtension.class)
 public class TestDataFrameWrites extends ParameterizedAvroDataTest {
@@ -88,6 +87,8 @@ public static Collection parameters() {
 
   @Parameter private String format;
 
+  @TempDir private File location;
+
   private static SparkSession spark = null;
   private static JavaSparkContext sc = null;
 
@@ -140,47 +141,37 @@ public static void stopSpark() {
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
-    File location = createTableFolder();
-    Table table = createTable(schema, location);
-    writeAndValidateWithLocations(table, location, new File(location, "data"));
+    Table table = createTable(schema);
+    writeAndValidateWithLocations(table, new File(location, "data"));
   }
 
   @TestTemplate
   public void testWriteWithCustomDataLocation() throws IOException {
-    File location = createTableFolder();
     File tablePropertyDataLocation = temp.resolve("test-table-property-data-dir").toFile();
-    Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()), location);
+    Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()));
     table
         .updateProperties()
         .set(TableProperties.WRITE_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath())
         .commit();
-    writeAndValidateWithLocations(table, location, tablePropertyDataLocation);
-  }
-
-  private File createTableFolder() throws IOException {
-    File parent = temp.resolve("parquet").toFile();
-    File location = new File(parent, "test");
-    assertThat(location.mkdirs()).as("Mkdir should succeed").isTrue();
-    return location;
+    writeAndValidateWithLocations(table, tablePropertyDataLocation);
   }
 
-  private Table createTable(Schema schema, File location) {
+  private Table createTable(Schema schema) {
     HadoopTables tables = new HadoopTables(CONF);
     return tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
   }
 
-  private void writeAndValidateWithLocations(Table table, File location, File expectedDataDir)
-      throws IOException {
+  private void writeAndValidateWithLocations(Table table, File expectedDataDir) throws IOException {
     Schema tableSchema = table.schema(); // use the table schema because ids are reassigned
 
     table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
 
     Iterable<Record> expected = RandomData.generate(tableSchema, 100, 0L);
-    writeData(expected, tableSchema, location.toString());
+    writeData(expected, tableSchema);
 
     table.refresh();
 
-    List<Row> actual = readTable(location.toString());
+    List<Row> actual = readTable();
 
     Iterator<Record> expectedIter = expected.iterator();
     Iterator<Row> actualIter = actual.iterator();
@@ -204,21 +195,20 @@ private void writeAndValidateWithLocations(Table table, File location, File expe
                 .startsWith(expectedDataDir.getAbsolutePath()));
   }
 
-  private List<Row> readTable(String location) {
-    Dataset<Row> result = spark.read().format("iceberg").load(location);
+  private List<Row> readTable() {
+    Dataset<Row> result = spark.read().format("iceberg").load(location.toString());
 
     return result.collectAsList();
   }
 
-  private void writeData(Iterable<Record> records, Schema schema, String location)
-      throws IOException {
+  private void writeData(Iterable<Record> records, Schema schema) throws IOException {
     Dataset<Row> df = createDataset(records, schema);
     DataFrameWriter<?> writer = df.write().format("iceberg").mode("append");
-    writer.save(location);
+    writer.save(location.toString());
   }
 
-  private void writeDataWithFailOnPartition(
-      Iterable<Record> records, Schema schema, String location) throws IOException, SparkException {
+  private void writeDataWithFailOnPartition(Iterable<Record> records, Schema schema)
+      throws IOException, SparkException {
     final int numPartitions = 10;
     final int partitionToFail = new Random().nextInt(numPartitions);
     MapPartitionsFunction<Row, Row> failOnFirstPartitionFunc =
@@ -241,7 +231,7 @@ private void writeDataWithFailOnPartition(
     // Setting "check-nullability" option to "false" doesn't help as it fails at Spark analyzer.
     Dataset<Row> convertedDf = df.sqlContext().createDataFrame(df.rdd(), convert(schema));
     DataFrameWriter<?> writer = convertedDf.write().format("iceberg").mode("append");
-    writer.save(location);
+    writer.save(location.toString());
   }
 
   private Dataset<Row> createDataset(Iterable<Record> records, Schema schema) throws IOException {
@@ -287,7 +277,6 @@ public void testNullableWithWriteOption() throws IOException {
         .as("Spark 3 rejects writing nulls to a required column")
         .startsWith("2");
 
-    File location = temp.resolve("parquet").resolve("test").toFile();
     String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location);
     String targetPath = String.format("%s/nullable_poc/targetFolder/", location);
 
@@ -341,7 +330,6 @@ public void testNullableWithSparkSqlOption() throws IOException {
         .as("Spark 3 rejects writing nulls to a required column")
        .startsWith("2");
 
-    File location = temp.resolve("parquet").resolve("test").toFile();
     String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location);
     String targetPath = String.format("%s/nullable_poc/targetFolder/", location);
 
@@ -397,37 +385,28 @@ public void testNullableWithSparkSqlOption() throws IOException {
 
   @TestTemplate
   public void testFaultToleranceOnWrite() throws IOException {
-    File location = createTableFolder();
     Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields());
-    Table table = createTable(schema, location);
+    Table table = createTable(schema);
 
     Iterable<Record> records = RandomData.generate(schema, 100, 0L);
-    writeData(records, schema, location.toString());
+    writeData(records, schema);
 
     table.refresh();
 
     Snapshot snapshotBeforeFailingWrite = table.currentSnapshot();
-    List<Row> resultBeforeFailingWrite = readTable(location.toString());
+    List<Row> resultBeforeFailingWrite = readTable();
 
     Iterable<Record> records2 = RandomData.generate(schema, 100, 0L);
 
-    assertThatThrownBy(() -> writeDataWithFailOnPartition(records2, schema, location.toString()))
+    assertThatThrownBy(() -> writeDataWithFailOnPartition(records2, schema))
        .isInstanceOf(SparkException.class);
 
     table.refresh();
 
     Snapshot snapshotAfterFailingWrite = table.currentSnapshot();
-    List<Row> resultAfterFailingWrite = readTable(location.toString());
+    List<Row> resultAfterFailingWrite = readTable();
 
     assertThat(snapshotBeforeFailingWrite).isEqualTo(snapshotAfterFailingWrite);
     assertThat(resultBeforeFailingWrite).isEqualTo(resultAfterFailingWrite);
-
-    while (location.exists()) {
-      try {
-        FileUtils.deleteDirectory(location);
-      } catch (NoSuchFileException e) {
-        // ignore NoSuchFileException when a file is already deleted
-      }
-    }
   }
 }
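
For reviewers unfamiliar with the JUnit 5 temp-directory support this patch switches to, below is a minimal, self-contained sketch of the pattern. The class, test, and one-column schema are hypothetical illustrations and are not part of the patch; the real test derives its schemas from SUPPORTED_PRIMITIVES and uses the ParameterizedTestExtension shown in the diff.

// Minimal sketch (assumed names, not from this patch): JUnit 5 injects a fresh
// temporary directory into the @TempDir field before each test and deletes it
// afterwards, which is what makes the old createTableFolder() helper and the
// FileUtils.deleteDirectory() cleanup loop redundant.
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class TempDirUsageSketch {

  // Injected by JUnit 5; no manual mkdirs() or deleteDirectory() needed.
  @TempDir File location;

  @Test
  void createsTableInsideInjectedTempDir() {
    // Hypothetical one-column schema for illustration only.
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

    // Same call shape as the diff: create a Hadoop table rooted at the temp dir.
    HadoopTables tables = new HadoopTables(new Configuration());
    Table table = tables.create(schema, PartitionSpec.unpartitioned(), location.toString());

    Assertions.assertNotNull(table);
    // Hadoop tables keep their metadata under <location>/metadata.
    Assertions.assertTrue(new File(location, "metadata").exists());
  }
}

Because the temporary directory's lifecycle is now owned by JUnit, the patch can also drop the java.nio.file.NoSuchFileException and org.apache.commons.io.FileUtils imports along with the cleanup loop at the end of testFaultToleranceOnWrite().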