diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java index 57d2ceca6eb..17d174ccc34 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java @@ -64,6 +64,10 @@ public void prepare(Config pluginConfig) throws PrepareFailException { hadoopConf.setHdfsSitePath( pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key())); } + if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_XML_PROPERTIES.key())) { + hadoopConf.setHdfsSiteXmlProperties( + pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_XML_PROPERTIES.key())); + } if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())) { hadoopConf.setKerberosPrincipal( pluginConfig.getString(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java index 7b1e32d1dad..08fec2fc055 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java @@ -77,6 +77,12 @@ public class BaseSourceConfig { .noDefaultValue() .withDescription("The path of hdfs-site.xml"); + public static final Option HDFS_SITE_XML_PROPERTIES = + Options.key("hdfs_site_xml_properties") + .stringType() + .noDefaultValue() + .withDescription("Full content of hdfs-site.xml; can be used instead of path"); + public static final Option KERBEROS_PRINCIPAL = Options.key("kerberos_principal") .stringType() diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java index 197e70e0488..5d1d0edea59 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java @@ -21,8 +21,13 @@ import org.apache.hadoop.fs.Path; import lombok.Data; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.Serializable; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -33,6 +38,7 @@ public class HadoopConf implements Serializable { protected Map extraOptions = new HashMap<>(); protected String hdfsNameKey; protected String hdfsSitePath; + protected String hdfsSiteXmlProperties; protected String kerberosPrincipal; protected String kerberosKeytabPath; @@ -55,5 +61,13 @@ public void setExtraOptionsForConfiguration(Configuration configuration) { if (hdfsSitePath != null) { configuration.addResource(new Path(hdfsSitePath)); } + if (hdfsSiteXmlProperties != null) { + try (ByteArrayInputStream inputStream = + new ByteArrayInputStream(hdfsSiteXmlProperties.getBytes(StandardCharsets.UTF_8))) { + configuration.addResource(inputStream); + } catch (IOException e) { + throw new FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED, e.getMessage()); + } + } } }