Skip to content

Commit

Permalink
Merge pull request #7 from teamclairvoyant/REST-144
Browse files Browse the repository at this point in the history
REST:144 - Add provision to pass schema to the 'ConvertJSONStringToStruct' transformation
rahulbhatia023 authored Oct 25, 2023
2 parents 60be735 + 90f6b27 commit d150992
Showing 4 changed files with 57 additions and 3 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@
.metals
.venv
.vscode
.bsp
.idea

project
target
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ ThisBuild / scalaVersion := "3.3.0"

ThisBuild / organization := "com.clairvoyant.data.scalaxy"

ThisBuild / version := "1.1.0"
ThisBuild / version := "1.2.0"

ThisBuild / resolvers ++= Seq(
"DataScalaxyTestUtil Repo" at "https://maven.pkg.github.com/teamclairvoyant/data-scalaxy-test-util"
Original file line number Diff line number Diff line change
@@ -448,18 +448,30 @@ object DataFrameTransformerImplicits {
*
* @param columnName
* Name of the column to be converted
* @param schemaDDL
* The Data Definition Language (DDL) for the column
* @return
* DataFrame with the column converted to struct type
*/
def convertJSONStringToStruct(
columnName: String
columnName: String,
schemaDDL: Option[String] = None
): DataFrame =
import df.sparkSession.implicits.*

val schema =
schemaDDL match {
case Some(schemaDDL) =>
DataType.fromDDL(schemaDDL)
case None =>
df.sparkSession.read.json(df.select(columnName).as[String]).schema
}

df.withColumn(
columnName,
from_json(
col(columnName),
df.sparkSession.read.json(df.select(columnName).as[String]).schema
schema
)
)

Original file line number Diff line number Diff line change
@@ -1024,6 +1024,46 @@ class DataFrameTransformerImplicitsSpec extends DataFrameReader with DataFrameMa
actualDF should matchExpectedDataFrame(expectedDF)
}

"convertJSONStringToStruct() - with columnName and schemaDDL" should "convert the specified column to Struct Type" in {
val df = readJSONFromText(
"""
|{
| "col_A": "val_A",
| "col_B": "{\"col_C\": \"val_C\",\"col_D\": 5}"
|}
|""".stripMargin
)

val actualDF = df.convertJSONStringToStruct(
columnName = "col_B",
schemaDDL = Some("col_C STRING, col_D STRING")
)

val expectedDF = readJSONFromText(
"""
|{
| "col_A": "val_A",
| "col_B": {
| "col_C": "val_C",
| "col_D": "5"
| }
|}
|""".stripMargin
)

actualDF.schema.fields
.filter(_.name == "col_B")
.head
.dataType shouldBe StructType(
List(
StructField("col_C", StringType),
StructField("col_D", StringType)
)
)

actualDF should matchExpectedDataFrame(expectedDF)
}

"flattenSchema()" should "flatten the dataframe" in {
val df = readJSONFromText(
"""

0 comments on commit d150992

Please sign in to comment.