Skip to content

Commit

Permalink
Merge pull request #6 from teamclairvoyant/staging
Browse files Browse the repository at this point in the history
Added API: convertJSONStringToStruct
  • Loading branch information
rahulbhatia023 authored Aug 29, 2023
2 parents 08492af + f39a82f commit 0932393
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 6 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ User can use below available API methods that can be called on a spark dataframe
* castFromToDataTypes
* castNestedColumn
* changeCaseOfColumnNames
* convertArrayOfStructToArrayOfString
* convertArrayOfStructToArrayOfJSONString
* convertJSONStringToStruct
* flattenSchema
* renameColumns
* replaceEmptyStringsWithNulls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,19 +375,39 @@ object DataFrameTransformerImplicits {
}

/**
* Converts the columns of array of struct type to array of string type
* Converts the columns of array of struct type to array of json string type
*
* @return
* DataFrame with the columns of array of struct type converted to array of string type
* DataFrame with the columns of array of struct type converted to array of json string type
*/
def convertArrayOfStructToArrayOfString: DataFrame =
def convertArrayOfStructToArrayOfJSONString: DataFrame =
// Inspect every top-level field and keep the names of those whose SQL type string,
// lower-cased, starts with "array<struct".
df.schema.fields
.filter(_.dataType.sql.toLowerCase().startsWith("array<struct"))
.map(_.name)
// Thread the DataFrame through the selected names: each matching column is overwritten
// under its own name with every array element passed through to_json.
.foldLeft(df) { (dataFrame, fieldName) =>
dataFrame.withColumn(fieldName, transform(col(fieldName), column => to_json(column)))
}

/**
* Converts the column with JSON string as value to struct type
*
* @param columnName
* Name of the column to be converted
* @return
* DataFrame with the column converted to struct type
*/
def convertJSONStringToStruct(
    columnName: String
): DataFrame =
  import df.sparkSession.implicits.*

  // View the target column as a dataset of raw strings and let Spark's JSON reader
  // derive the schema of the documents it contains.
  val jsonValues = df.select(columnName).as[String]
  val inferredSchema = df.sparkSession.read.json(jsonValues).schema

  // Replace the string column, under the same name, with its parsed form.
  df.withColumn(columnName, from_json(col(columnName), inferredSchema))

/**
* Flattens the schema of the dataframe. If any of the columns is of StructType or is nested, this transformation
* removes the nested structure and represents each nested attribute at the root level.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,7 @@ class DataFrameTransformerImplicitsSpec extends DataFrameReader with DataFrameMa
actualDF should matchExpectedDataFrame(expectedDF)
}

"convertArrayOfStructToArrayOfString()" should "convert all columns of array of struct type to array of string type" in {
"convertArrayOfStructToArrayOfJSONString()" should "convert all columns of array of struct type to array of string type" in {
val df = readJSONFromText(
"""
|{
Expand Down Expand Up @@ -966,7 +966,7 @@ class DataFrameTransformerImplicitsSpec extends DataFrameReader with DataFrameMa
)
)

val actualDF = df.convertArrayOfStructToArrayOfString
val actualDF = df.convertArrayOfStructToArrayOfJSONString

val expectedDF = readJSONFromText(
"""
Expand All @@ -987,6 +987,43 @@ class DataFrameTransformerImplicitsSpec extends DataFrameReader with DataFrameMa
actualDF should matchExpectedDataFrame(expectedDF)
}

"convertJSONStringToStruct() - with columnName" should "convert the specified column to Struct Type" in {
  // Input: col_A carries a JSON document serialised as a plain string value.
  val df = readJSONFromText(
    """
      |{
      | "col_A": "{\"col_B\":\"val_B1\",\"col_C\":\"val_C1\"}"
      |}
      |""".stripMargin
  )

  val actualDF = df.convertJSONStringToStruct(
    columnName = "col_A"
  )

  // Expected: the same document, but with col_A as a nested object rather than a string.
  val expectedDF = readJSONFromText(
    """
      |{
      | "col_A": {
      | "col_B": "val_B1",
      | "col_C": "val_C1"
      | }
      |}
      |""".stripMargin
  )

  // Look the field up by name on the schema instead of filter(...).head: if col_A is
  // ever missing, the lookup fails with an error that names the field rather than a
  // bare empty-collection error.
  actualDF.schema("col_A").dataType shouldBe StructType(
    List(
      StructField("col_B", StringType),
      StructField("col_C", StringType)
    )
  )

  actualDF should matchExpectedDataFrame(expectedDF)
}

"flattenSchema()" should "flatten the dataframe" in {
val df = readJSONFromText(
"""
Expand Down

0 comments on commit 0932393

Please sign in to comment.