Core: lazily load default Hadoop Configuration to avoid NPE with HadoopFileIO because FileIOParser doesn't serialize Hadoop configuration (#10926)

Co-authored-by: Eduard Tudenhoefner <[email protected]>
stevenzwu and nastra authored Oct 17, 2024
1 parent 3def1f4 commit f4ffe13
Showing 2 changed files with 66 additions and 7 deletions.
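
In short: FileIOParser serializes only the FileIO implementation class and its properties map, so a HadoopFileIO rebuilt from JSON comes back without any Hadoop Configuration. Before this change, the first call that needed the configuration (such as newInputFile) threw a NullPointerException. A minimal sketch of that failure mode, using only classes that appear in this commit (the path literal is illustrative):

import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileIOParser;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class FileIOParserRoundTrip {
  public static void main(String[] args) throws Exception {
    HadoopFileIO io = new HadoopFileIO();
    io.initialize(ImmutableMap.of("properties-bar", "2"));

    // Only the impl class name and the properties map survive this round trip;
    // the Hadoop Configuration is not part of the JSON.
    String json = FileIOParser.toJson(io);

    try (FileIO deserialized = FileIOParser.fromJson(json)) {
      // Before this commit: NullPointerException, because the internal
      // hadoopConf supplier was never set. After: getConf() lazily creates
      // a default Configuration on first use.
      deserialized.newInputFile("/tmp/test.parquet");
    }
  }
}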
24 changes: 17 additions & 7 deletions core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -74,7 +74,7 @@ public HadoopFileIO(SerializableSupplier<Configuration> hadoopConf) {
   }
 
   public Configuration conf() {
-    return hadoopConf.get();
+    return getConf();
   }
 
   @Override
@@ -84,23 +84,23 @@ public void initialize(Map<String, String> props) {
 
   @Override
   public InputFile newInputFile(String path) {
-    return HadoopInputFile.fromLocation(path, hadoopConf.get());
+    return HadoopInputFile.fromLocation(path, getConf());
   }
 
   @Override
   public InputFile newInputFile(String path, long length) {
-    return HadoopInputFile.fromLocation(path, length, hadoopConf.get());
+    return HadoopInputFile.fromLocation(path, length, getConf());
   }
 
   @Override
   public OutputFile newOutputFile(String path) {
-    return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get());
+    return HadoopOutputFile.fromPath(new Path(path), getConf());
   }
 
   @Override
   public void deleteFile(String path) {
     Path toDelete = new Path(path);
-    FileSystem fs = Util.getFs(toDelete, hadoopConf.get());
+    FileSystem fs = Util.getFs(toDelete, getConf());
     try {
       fs.delete(toDelete, false /* not recursive */);
     } catch (IOException e) {
@@ -120,6 +120,16 @@ public void setConf(Configuration conf) {
 
   @Override
   public Configuration getConf() {
+    // Create a default hadoopConf as it is required for the object to be valid.
+    // E.g. newInputFile would throw NPE with getConf() otherwise.
+    if (hadoopConf == null) {
+      synchronized (this) {
+        if (hadoopConf == null) {
+          this.hadoopConf = new SerializableConfiguration(new Configuration())::get;
+        }
+      }
+    }
+
     return hadoopConf.get();
   }
 

@@ -132,7 +142,7 @@ public void serializeConfWith(
   @Override
   public Iterable<FileInfo> listPrefix(String prefix) {
     Path prefixToList = new Path(prefix);
-    FileSystem fs = Util.getFs(prefixToList, hadoopConf.get());
+    FileSystem fs = Util.getFs(prefixToList, getConf());
 
     return () -> {
       try {
@@ -154,7 +164,7 @@ public Iterable<FileInfo> listPrefix(String prefix) {
   @Override
   public void deletePrefix(String prefix) {
     Path prefixToDelete = new Path(prefix);
-    FileSystem fs = Util.getFs(prefixToDelete, hadoopConf.get());
+    FileSystem fs = Util.getFs(prefixToDelete, getConf());
 
     try {
       fs.delete(prefixToDelete, true /* recursive */);
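
Two details in the getConf() change are worth noting. First, the default supplier is built as new SerializableConfiguration(new Configuration())::get because Hadoop's Configuration is not java.io.Serializable, while HadoopFileIO is kept serializable so engines can ship it to workers. Second, the null check uses double-checked locking. A generic sketch of that pattern follows; this standalone LazyConf class is illustrative, not Iceberg code, and note that the classic pattern relies on the field being volatile for safe publication under the Java Memory Model (the diff alone doesn't show how Iceberg declares its field):

import java.util.function.Supplier;

class LazyConf<T> {
  // volatile is what makes the unsynchronized first check safe to read
  private volatile Supplier<T> supplier;
  private final Supplier<T> fallback;

  LazyConf(Supplier<T> fallback) {
    this.fallback = fallback;
  }

  T get() {
    if (supplier == null) {            // fast path: no lock once initialized
      synchronized (this) {
        if (supplier == null) {        // re-check under the lock
          supplier = fallback;
        }
      }
    }
    return supplier.get();
  }
}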
49 changes: 49 additions & 0 deletions core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
@@ -24,6 +24,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.nio.file.Files;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -36,6 +37,7 @@
 import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileIOParser;
 import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -176,6 +178,52 @@ public void testResolvingFileIOLoad() {
     assertThat(result).isInstanceOf(HadoopFileIO.class);
   }
 
+  @Test
+  public void testJsonParserWithoutHadoopConf() throws Exception {
+    this.hadoopFileIO = new HadoopFileIO();
+
+    hadoopFileIO.initialize(ImmutableMap.of("properties-bar", "2"));
+    assertThat(hadoopFileIO.properties().get("properties-bar")).isEqualTo("2");
+
+    testJsonParser(hadoopFileIO, tempDir);
+  }
+
+  @Test
+  public void testJsonParserWithHadoopConf() throws Exception {
+    this.hadoopFileIO = new HadoopFileIO();
+
+    Configuration hadoopConf = new Configuration();
+    hadoopConf.setInt("hadoop-conf-foo", 1);
+    hadoopFileIO.setConf(hadoopConf);
+    assertThat(hadoopFileIO.conf().get("hadoop-conf-foo")).isNotNull();
+
+    hadoopFileIO.initialize(ImmutableMap.of("properties-bar", "2"));
+    assertThat(hadoopFileIO.properties().get("properties-bar")).isEqualTo("2");
+
+    testJsonParser(hadoopFileIO, tempDir);
+  }
+
+  private static void testJsonParser(HadoopFileIO hadoopFileIO, File tempDir) throws Exception {
+    String json = FileIOParser.toJson(hadoopFileIO);
+    try (FileIO deserialized = FileIOParser.fromJson(json)) {
+      assertThat(deserialized).isInstanceOf(HadoopFileIO.class);
+      HadoopFileIO deserializedHadoopFileIO = (HadoopFileIO) deserialized;
+
+      // properties are carried over during serialization and deserialization
+      assertThat(deserializedHadoopFileIO.properties()).isEqualTo(hadoopFileIO.properties());
+
+      // FileIOParser doesn't serialize and deserialize the Hadoop configuration,
+      // so config "hadoop-conf-foo" is not restored in the deserialized object.
+      assertThat(deserializedHadoopFileIO.conf().get("hadoop-conf-foo")).isNull();
+
+      // make sure the deserialized IO can create an input file
+      String inputFilePath =
+          Files.createTempDirectory(tempDir.toPath(), "junit").toFile().getAbsolutePath()
+              + "/test.parquet";
+      deserializedHadoopFileIO.newInputFile(inputFilePath);
+    }
+  }
+
   private List<Path> createRandomFiles(Path parent, int count) {
     Vector<Path> paths = new Vector<>();
     random
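
The tests above hinge on what FileIOParser actually writes. For the second test, the serialized form is roughly the following (the exact field names are an assumption from FileIOParser's format, not shown in this diff):

{
  "io-impl": "org.apache.iceberg.hadoop.HadoopFileIO",
  "properties": {
    "properties-bar": "2"
  }
}

Nothing in that payload carries the hadoop-conf-foo setting, which is why the deserialized IO must be able to fall back to a lazily created default Configuration.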
