From f39a82f5e326f48cd50df6212c9faf1a92265c4b Mon Sep 17 00:00:00 2001 From: rahulbhatia023 Date: Tue, 29 Aug 2023 10:10:53 -0400 Subject: [PATCH] Added API: convertJSONStringToStruct --- README.md | 3 +- .../DataFrameTransformerImplicits.scala | 26 ++++++++++-- .../DataFrameTransformerImplicitsSpec.scala | 41 ++++++++++++++++++- 3 files changed, 64 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 9ea6d78..bfbba6e 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,8 @@ User can use below available API methods that can be called on a spark dataframe * castFromToDataTypes * castNestedColumn * changeCaseOfColumnNames -* convertArrayOfStructToArrayOfString +* convertArrayOfStructToArrayOfJSONString +* convertJSONStringToStruct * flattenSchema * renameColumns * replaceEmptyStringsWithNulls diff --git a/src/main/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicits.scala b/src/main/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicits.scala index ba99471..b236d07 100644 --- a/src/main/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicits.scala +++ b/src/main/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicits.scala @@ -375,12 +375,12 @@ object DataFrameTransformerImplicits { } /** - * Converts the columns of array of struct type to array of string type + * Converts the columns of array of struct type to array of json string type * * @return - * DataFrame with the columns of array of struct type converted to array of string type + * DataFrame with the columns of array of struct type converted to array of json string type */ - def convertArrayOfStructToArrayOfString: DataFrame = + def convertArrayOfStructToArrayOfJSONString: DataFrame = df.schema.fields .filter(_.dataType.sql.toLowerCase().startsWith("array to_json(column))) } + /** + * Converts the column with JSON string as value to struct type + * + * @param columnName + * Name of the column to be converted + * @return + * DataFrame with the column converted to struct type + */ + def convertJSONStringToStruct( + columnName: String + ): DataFrame = + import df.sparkSession.implicits.* + df.withColumn( + columnName, + from_json( + col(columnName), + df.sparkSession.read.json(df.select(columnName).as[String]).schema + ) + ) + /** * Flattens the schema of the dataframe. If any of the column is of StructType or is nested, this transformation * removes the nested structure and represent each nested attribute at a root level. diff --git a/src/test/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicitsSpec.scala b/src/test/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicitsSpec.scala index 8e259ef..f9d000a 100644 --- a/src/test/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicitsSpec.scala +++ b/src/test/scala/com/clairvoyant/data/scalaxy/transformer/DataFrameTransformerImplicitsSpec.scala @@ -936,7 +936,7 @@ class DataFrameTransformerImplicitsSpec extends DataFrameReader with DataFrameMa actualDF should matchExpectedDataFrame(expectedDF) } - "convertArrayOfStructToArrayOfString()" should "convert all columns of array of struct type to array of string type" in { + "convertArrayOfStructToArrayOfJSONString()" should "convert all columns of array of struct type to array of string type" in { val df = readJSONFromText( """ |{ @@ -966,7 +966,7 @@ class DataFrameTransformerImplicitsSpec extends DataFrameReader with DataFrameMa ) ) - val actualDF = df.convertArrayOfStructToArrayOfString + val actualDF = df.convertArrayOfStructToArrayOfJSONString val expectedDF = readJSONFromText( """ @@ -987,6 +987,43 @@ class DataFrameTransformerImplicitsSpec extends DataFrameReader with DataFrameMa actualDF should matchExpectedDataFrame(expectedDF) } + "convertJSONStringToStruct() - with columnName" should "convert the specified column to Struct Type" in { + val df = readJSONFromText( + """ + |{ + | "col_A": "{\"col_B\":\"val_B1\",\"col_C\":\"val_C1\"}" + |} + |""".stripMargin + ) + + val actualDF = df.convertJSONStringToStruct( + columnName = "col_A" + ) + + val expectedDF = readJSONFromText( + """ + |{ + | "col_A": { + | "col_B": "val_B1", + | "col_C": "val_C1" + | } + |} + |""".stripMargin + ) + + actualDF.schema.fields + .filter(_.name == "col_A") + .head + .dataType shouldBe StructType( + List( + StructField("col_B", StringType), + StructField("col_C", StringType) + ) + ) + + actualDF should matchExpectedDataFrame(expectedDF) + } + "flattenSchema()" should "flatten the dataframe" in { val df = readJSONFromText( """