From 3de3a99c2e117070a87ac61ed617d9211ed10363 Mon Sep 17 00:00:00 2001 From: "ted.shim" <79561371+tedshim@users.noreply.github.com> Date: Wed, 14 Feb 2024 13:44:53 +0900 Subject: [PATCH] [Improve][Connector File][HdfsFile] Improved to create configuration with text content in addition to hdfs-site.xml path. (#6) --- .../file/hdfs/source/BaseHdfsFileSource.java | 4 ++++ .../seatunnel/file/config/BaseSourceConfig.java | 6 ++++++ .../seatunnel/file/config/HadoopConf.java | 14 ++++++++++++++ 3 files changed, 24 insertions(+) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java index 57d2ceca6eb..17d174ccc34 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java @@ -64,6 +64,10 @@ public void prepare(Config pluginConfig) throws PrepareFailException { hadoopConf.setHdfsSitePath( pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key())); } + if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_XML_PROPERTIES.key())) { + hadoopConf.setHdfsSiteXmlProperties( + pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_XML_PROPERTIES.key())); + } if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())) { hadoopConf.setKerberosPrincipal( pluginConfig.getString(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java index 7b1e32d1dad..08fec2fc055 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java @@ -77,6 +77,12 @@ public class BaseSourceConfig { .noDefaultValue() .withDescription("The path of hdfs-site.xml"); + public static final Option HDFS_SITE_XML_PROPERTIES = + Options.key("hdfs_site_xml_properties") + .stringType() + .noDefaultValue() + .withDescription("Full content of hdfs-site.xml; can be used instead of path"); + public static final Option KERBEROS_PRINCIPAL = Options.key("kerberos_principal") .stringType() diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java index 197e70e0488..5d1d0edea59 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java @@ -21,8 +21,13 @@ import org.apache.hadoop.fs.Path; import lombok.Data; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.Serializable; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -33,6 +38,7 @@ public class HadoopConf implements Serializable { protected Map extraOptions = new HashMap<>(); protected String hdfsNameKey; protected String hdfsSitePath; + protected String hdfsSiteXmlProperties; protected String kerberosPrincipal; protected String kerberosKeytabPath; @@ -55,5 +61,13 @@ public void setExtraOptionsForConfiguration(Configuration configuration) { if (hdfsSitePath != null) { configuration.addResource(new Path(hdfsSitePath)); } + if (hdfsSiteXmlProperties != null) { + try (ByteArrayInputStream inputStream = + new ByteArrayInputStream(hdfsSiteXmlProperties.getBytes(StandardCharsets.UTF_8))) { + configuration.addResource(inputStream); + } catch (IOException e) { + throw new FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED, e.getMessage()); + } + } } }