diff --git a/README.md b/README.md index bfbba6e..e2862e7 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ User can use below available API methods that can be called on a spark dataframe * convertJSONStringToStruct * flattenSchema * renameColumns -* replaceEmptyStringsWithNulls +* replaceStringInColumnName * splitColumn ## Documentation diff --git a/build.sbt b/build.sbt index 16daa26..be0ebca 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ ThisBuild / scalaVersion := "3.3.0" ThisBuild / organization := "com.clairvoyant.data.scalaxy" -ThisBuild / version := "1.0.0" +ThisBuild / version := "1.1.0" ThisBuild / resolvers ++= Seq( "DataScalaxyTestUtil Repo" at "https://maven.pkg.github.com/teamclairvoyant/data-scalaxy-test-util" diff --git a/src/main/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicits.scala b/src/main/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicits.scala index b236d07..140345c 100644 --- a/src/main/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicits.scala +++ b/src/main/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicits.scala @@ -88,6 +88,61 @@ object DataFrameTransformerImplicits { } } + private def applyChangeNameFunctionRecursively( + schema: StructType, + changeNameFunction: String => String + ): StructType = + StructType { + schema.flatMap { + case sf @ StructField( + name, + _ @ArrayType(arrayNestedType: StructType, containsNull), + nullable, + metadata + ) => + StructType { + Seq( + sf.copy( + changeNameFunction(name), + ArrayType( + applyChangeNameFunctionRecursively(arrayNestedType, changeNameFunction), + containsNull + ), + nullable, + metadata + ) + ) + } + + case sf @ StructField( + name, + structType: StructType, + nullable, + metadata + ) => + StructType { + Seq( + sf.copy( + changeNameFunction(name), + applyChangeNameFunctionRecursively(structType, changeNameFunction), + nullable, + metadata + ) + ) + } + + case sf @ StructField( + name, + _, + _, + _ + ) => + StructType { + Seq(sf.copy(name = changeNameFunction(name))) + } + } + } + // --- PUBLIC METHODS --- // /** @@ -455,11 +510,41 @@ object DataFrameTransformerImplicits { ) /** - * Replaces all occurrences of empty strings with nulls + * Replaces a specific text in column name with the another text + * @param columnName + * The column name to be modified + * @param pattern + * The sequence of characters or text to be replaced + * @param replacement + * The target text to replace with + * @param replaceRecursively + * Flag to determine if operation needs to be performed at root level only or at nested level * @return - * DataFrame with empty strings being replaced by nulls in column values + * A dataframe with the column name modified */ - def replaceEmptyStringsWithNulls: DataFrame = df.na.replace(df.columns, Map("" -> null)) + def replaceStringInColumnName( + columnName: String, + pattern: String, + replacement: String, + replaceRecursively: Boolean + ): DataFrame = + val replaceStringInColumnNameFunction = + (colName: String) => + if (colName == columnName) + colName.replace(pattern, replacement) + else + colName + + if (replaceRecursively) + df.sparkSession.createDataFrame( + rowRDD = df.rdd, + schema = applyChangeNameFunctionRecursively( + schema = df.schema, + changeNameFunction = replaceStringInColumnNameFunction + ) + ) + else + df.withColumnRenamed(columnName, replaceStringInColumnNameFunction(columnName)) /** * Creates new columns using the value of another column that is a delimiter separated string. diff --git a/src/test/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicitsSpec.scala b/src/test/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicitsSpec.scala index f9d000a..7ed1128 100644 --- a/src/test/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicitsSpec.scala +++ b/src/test/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicitsSpec.scala @@ -1082,25 +1082,89 @@ class DataFrameTransformerImplicitsSpec extends DataFrameReader with DataFrameMa actualDF should matchExpectedDataFrame(expectedDF) } - "replaceEmptyStringsWithNulls()" should "replace all empty strings with nulls" in { + "replaceStringInColumnName() - with replaceRecursively as false" should "modify the column name" in { val df = readJSONFromText( """ |{ - | "col_A": "", - | "col_B": "val_B", - | "col_C": "" + | "col_A": 5, + | "col_B": 4, + | "col_D": { + | "col_B": 6 + | }, + | "col_F": [ + | { + | "col_B": 4.356343 + | } + | ] |} |""".stripMargin ) - val actualDF = df.replaceEmptyStringsWithNulls + val actualDF = df.replaceStringInColumnName( + columnName = "col_B", + pattern = "_B", + replacement = "_B_test", + replaceRecursively = false + ) val expectedDF = readJSONFromText( """ |{ - | "col_A": null, - | "col_B": "val_B", - | "col_C": null + | "col_A": 5, + | "col_B_test": 4, + | "col_D": { + | "col_B": 6 + | }, + | "col_F": [ + | { + | "col_B": 4.356343 + | } + | ] + |} + |""".stripMargin + ) + + actualDF should matchExpectedDataFrame(expectedDF) + } + + "replaceStringInColumnName() - with replaceRecursively as true" should "modify the column name" in { + val df = readJSONFromText( + """ + |{ + | "col_A": 5, + | "col_B": 4, + | "col_D": { + | "col_B": 6 + | }, + | "col_F": [ + | { + | "col_B": 4.356343 + | } + | ] + |} + |""".stripMargin + ) + + val actualDF = df.replaceStringInColumnName( + columnName = "col_B", + pattern = "_B", + replacement = "_B_test", + replaceRecursively = true + ) + + val expectedDF = readJSONFromText( + """ + |{ + | "col_A": 5, + | "col_B_test": 4, + | "col_D": { + | "col_B_test": 6 + | }, + | "col_F": [ + | { + | "col_B_test": 4.356343 + | } + | ] |} |""".stripMargin )