Skip to content

Commit

Permalink
Merge pull request #5 from teamclairvoyant/staging
Browse files Browse the repository at this point in the history
Added API: convertArrayOfStructToArrayOfString
rahulbhatia023 authored Aug 28, 2023
2 parents 555545a + 3e44f5c commit 08492af
Showing 3 changed files with 81 additions and 17 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ User can use below available API methods that can be called on a spark dataframe
* castFromToDataTypes
* castNestedColumn
* changeCaseOfColumnNames
* convertArrayOfStructToArrayOfString
* flattenSchema
* renameColumns
* replaceEmptyStringsWithNulls
Original file line number Diff line number Diff line change
@@ -91,7 +91,7 @@ object DataFrameTransformerImplicits {
// --- PUBLIC METHODS --- //

/**
* It lets the user add a new column with a literal value of the desired data type
* Adds a new column with a literal value of the desired data type
* @param columnName
* Name of the new column to be added
* @param columnValue
@@ -119,7 +119,7 @@ object DataFrameTransformerImplicits {
addColumn(df, columnName, columnValue, columnDataType)

/**
* It lets the user add a new column with an expression value of the desired data type
* Adds a new column with an expression value of the desired data type
*
* @param columnName
* Name of the new column to be added
@@ -148,7 +148,7 @@ object DataFrameTransformerImplicits {
addColumnWithExpression(df, columnName, columnExpression, columnDataType)

/**
* It lets the user add a desired prefix to column names
* Adds a desired prefix to column names
* @param prefix
* The desired prefix to be added to column names
* @param columnNames
@@ -167,7 +167,7 @@ object DataFrameTransformerImplicits {
)

/**
* It lets the user add a desired suffix to column names
* Adds a desired suffix to column names
*
* @param suffix
* The desired suffix to be added to column names
@@ -187,7 +187,7 @@ object DataFrameTransformerImplicits {
)

/**
* It lets the user cast the data type of multiple columns to the desired different types at once
* Casts the data type of multiple columns to the desired different types at once
* @param columnDataTypeMapper
* Mapping of column names to its corresponding desired data types
* @return
@@ -205,8 +205,7 @@ object DataFrameTransformerImplicits {
)

/**
* It lets the user cast the data type of multiple columns to the desired different types at once based on the
* prefix of the columns
* Casts the data type of multiple columns to the desired different types at once based on the prefix of the columns
* @param prefix
* Prefix string based on which given columns to be selected to cast them to the desired data type
* @param dataType
@@ -226,8 +225,7 @@ object DataFrameTransformerImplicits {
)

/**
* It lets the user cast the data type of multiple columns to the desired different types at once based on the
* suffix of the columns
* Casts the data type of multiple columns to the desired different types at once based on the suffix of the columns
*
* @param suffix
* Suffix string based on which given columns to be selected to cast them to the desired data type
@@ -248,7 +246,7 @@ object DataFrameTransformerImplicits {
)

/**
* It lets users cast all columns having X data type to a different Y data type
* Casts all columns having X data type to a different Y data type
*
* @param dataTypeMapper
* Defines the mapping of source data type and target data type
@@ -323,7 +321,7 @@ object DataFrameTransformerImplicits {
}

/**
* It lets the user cast the data type of any nested or struct type column from one type to another
* Casts the data type of any nested or struct type column from one type to another
* @param columnName
* The name of the nested column
* @param schemaDDL
@@ -337,7 +335,7 @@ object DataFrameTransformerImplicits {
): DataFrame = df.withColumn(columnName, from_json(to_json(col(columnName)), DataType.fromDDL(schemaDDL)))

/**
* It lets the user change the case of the column names
* Changes the case of column names
* @param sourceCaseType
* The original case type
* @param targetCaseType
@@ -377,8 +375,22 @@ object DataFrameTransformerImplicits {
}

/**
* It lets the user flatten the schema of the dataframe. If any of the column is of StructType or is nested, this
* transformation removes the nested structure and represent each nested attribute at a root level.
* Converts the columns of array of struct type to array of string type
*
* @return
* DataFrame with the columns of array of struct type converted to array of string type
*/
def convertArrayOfStructToArrayOfString: DataFrame =
df.schema.fields
.filter(_.dataType.sql.toLowerCase().startsWith("array<struct"))
.map(_.name)
.foldLeft(df) { (dataFrame, fieldName) =>
dataFrame.withColumn(fieldName, transform(col(fieldName), column => to_json(column)))
}

/**
* Flattens the schema of the dataframe. If any of the column is of StructType or is nested, this transformation
* removes the nested structure and represent each nested attribute at a root level.
* @return
* DataFrame with the flattened schema
*/
@@ -405,7 +417,7 @@ object DataFrameTransformerImplicits {
}

/**
* It lets the user rename one or multiple columns at once
* Renames one or multiple columns at once
* @param renameColumnMapper
* Defines the mapping of the existing and desired column name
* @return
@@ -423,14 +435,14 @@ object DataFrameTransformerImplicits {
)

/**
* It lets users replace all occurrences of empty strings with nulls
* Replaces all occurrences of empty strings with nulls
* @return
* DataFrame with empty strings being replaced by nulls in column values
*/
def replaceEmptyStringsWithNulls: DataFrame = df.na.replace(df.columns, Map("" -> null))

/**
* It lets user create new columns using the value of another column that is a delimiter separated string.
* Creates new columns using the value of another column that is a delimiter separated string.
* @param fromColumn
* Name of the source column having delimiter separated string as a value from which new columns need to be
* created
Original file line number Diff line number Diff line change
@@ -936,6 +936,57 @@ class DataFrameTransformerImplicitsSpec extends DataFrameReader with DataFrameMa
actualDF should matchExpectedDataFrame(expectedDF)
}

"convertArrayOfStructToArrayOfString()" should "convert all columns of array of struct type to array of string type" in {
val df = readJSONFromText(
"""
|{
| "col_A": [
| {
| "col_B": "val_B1",
| "col_C": "val_C1"
| },
| {
| "col_B": "val_B2",
| "col_C": "val_C2"
| }
| ]
|}
|""".stripMargin
)

df.schema.fields
.filter(_.name == "col_A")
.head
.dataType shouldBe ArrayType(
StructType(
List(
StructField("col_B", StringType),
StructField("col_C", StringType)
)
)
)

val actualDF = df.convertArrayOfStructToArrayOfString

val expectedDF = readJSONFromText(
"""
|{
| "col_A": [
| "{\"col_B\":\"val_B1\",\"col_C\":\"val_C1\"}",
| "{\"col_B\":\"val_B2\",\"col_C\":\"val_C2\"}"
| ]
|}
|""".stripMargin
)

actualDF.schema.fields
.filter(_.name == "col_A")
.head
.dataType shouldBe ArrayType(StringType)

actualDF should matchExpectedDataFrame(expectedDF)
}

"flattenSchema()" should "flatten the dataframe" in {
val df = readJSONFromText(
"""

0 comments on commit 08492af

Please sign in to comment.