Skip to content

Commit

Permalink
Merge pull request #9 from teamclairvoyant/staging
Browse files Browse the repository at this point in the history
Staging
  • Loading branch information
rahulbhatia023 authored Oct 24, 2023
2 parents 0932393 + 60be735 commit 985df62
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 13 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ User can use below available API methods that can be called on a spark dataframe
* convertJSONStringToStruct
* flattenSchema
* renameColumns
* replaceEmptyStringsWithNulls
* replaceStringInColumnName
* splitColumn

## Documentation
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ ThisBuild / scalaVersion := "3.3.0"

ThisBuild / organization := "com.clairvoyant.data.scalaxy"

ThisBuild / version := "1.0.0"
ThisBuild / version := "1.1.0"

ThisBuild / resolvers ++= Seq(
"DataScalaxyTestUtil Repo" at "https://maven.pkg.github.com/teamclairvoyant/data-scalaxy-test-util"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,61 @@ object DataFrameTransformerImplicits {
}
}

/**
 * Applies a renaming function to every field name in a schema, descending into nested structs.
 *
 * Nested structs are reached through struct-typed fields, through array element types (including
 * arrays of arrays) and through map key/value types, so that no nested struct field is skipped.
 * Only field names change: data types, nullability, `containsNull` / `valueContainsNull` flags and
 * field metadata are carried over unchanged.
 *
 * @param schema
 *   The schema whose field names are to be rewritten
 * @param changeNameFunction
 *   Function that maps an existing field name to its new name
 * @return
 *   A new schema with the same shape as the input and with the renamed fields
 */
private def applyChangeNameFunctionRecursively(
    schema: StructType,
    changeNameFunction: String => String
): StructType = {
  // Local import keeps this block self-contained regardless of the file-level import list.
  import org.apache.spark.sql.types.{DataType, MapType}

  // Walks a data type and rewrites the field names of every struct found inside it.
  def renameWithin(dataType: DataType): DataType =
    dataType match {
      case nestedStruct: StructType =>
        applyChangeNameFunctionRecursively(nestedStruct, changeNameFunction)

      case ArrayType(elementType, containsNull) =>
        // Recursing on the element type also covers arrays of arrays of structs.
        ArrayType(renameWithin(elementType), containsNull)

      case MapType(keyType, valueType, valueContainsNull) =>
        MapType(renameWithin(keyType), renameWithin(valueType), valueContainsNull)

      case leafType =>
        leafType
    }

  StructType(
    schema.map { field =>
      field.copy(
        name = changeNameFunction(field.name),
        dataType = renameWithin(field.dataType)
      )
    }
  )
}

// --- PUBLIC METHODS --- //

/**
Expand Down Expand Up @@ -455,11 +510,41 @@ object DataFrameTransformerImplicits {
)

/**
* Replaces all occurrences of empty strings with nulls
* Replaces a specific text in the column name with another text
* @param columnName
* The column name to be modified
* @param pattern
* The sequence of characters or text to be replaced
* @param replacement
* The target text to replace with
* @param replaceRecursively
* Flag that determines whether the replacement is applied to the root level column only (false) or also to nested fields with the same name (true)
* @return
* DataFrame with empty strings being replaced by nulls in column values
* A dataframe with the column name modified
*/
// Calls df.na.replace over every column listed in df.columns with the mapping "" -> null.
def replaceEmptyStringsWithNulls: DataFrame = df.na.replace(df.columns, Map("" -> null))
/**
 * Rewrites the name of the column called `columnName` by replacing each occurrence of `pattern`
 * in that name with `replacement`.
 *
 * @param columnName
 *   Name of the column (or nested field) to be renamed
 * @param pattern
 *   The literal text to look for inside the name
 * @param replacement
 *   The text that takes the place of `pattern`
 * @param replaceRecursively
 *   When false only the root level column is renamed; when true the whole schema is walked and
 *   every field whose name equals `columnName` is renamed
 * @return
 *   A dataframe with the modified column name(s)
 */
def replaceStringInColumnName(
    columnName: String,
    pattern: String,
    replacement: String,
    replaceRecursively: Boolean
): DataFrame = {
  // Names other than columnName pass through untouched.
  def renameIfTargeted(candidate: String): String =
    if (candidate != columnName) candidate
    else candidate.replace(pattern, replacement)

  if (!replaceRecursively)
    df.withColumnRenamed(columnName, renameIfTargeted(columnName))
  else {
    val rows = df.rdd
    val renamedSchema =
      applyChangeNameFunctionRecursively(
        schema = df.schema,
        changeNameFunction = renameIfTargeted
      )

    // Rebuild the dataframe from the same rows under the renamed schema.
    df.sparkSession.createDataFrame(rowRDD = rows, schema = renamedSchema)
  }
}

/**
* Creates new columns using the value of another column that is a delimiter separated string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1082,25 +1082,89 @@ class DataFrameTransformerImplicitsSpec extends DataFrameReader with DataFrameMa
actualDF should matchExpectedDataFrame(expectedDF)
}

"replaceEmptyStringsWithNulls()" should "replace all empty strings with nulls" in {
"replaceStringInColumnName() - with replaceRecursively as false" should "modify the column name" in {
val df = readJSONFromText(
"""
|{
| "col_A": "",
| "col_B": "val_B",
| "col_C": ""
| "col_A": 5,
| "col_B": 4,
| "col_D": {
| "col_B": 6
| },
| "col_F": [
| {
| "col_B": 4.356343
| }
| ]
|}
|""".stripMargin
)

val actualDF = df.replaceEmptyStringsWithNulls
val actualDF = df.replaceStringInColumnName(
columnName = "col_B",
pattern = "_B",
replacement = "_B_test",
replaceRecursively = false
)

val expectedDF = readJSONFromText(
"""
|{
| "col_A": null,
| "col_B": "val_B",
| "col_C": null
| "col_A": 5,
| "col_B_test": 4,
| "col_D": {
| "col_B": 6
| },
| "col_F": [
| {
| "col_B": 4.356343
| }
| ]
|}
|""".stripMargin
)

actualDF should matchExpectedDataFrame(expectedDF)
}

"replaceStringInColumnName() - with replaceRecursively as true" should "modify the column name" in {
val df = readJSONFromText(
"""
|{
| "col_A": 5,
| "col_B": 4,
| "col_D": {
| "col_B": 6
| },
| "col_F": [
| {
| "col_B": 4.356343
| }
| ]
|}
|""".stripMargin
)

val actualDF = df.replaceStringInColumnName(
columnName = "col_B",
pattern = "_B",
replacement = "_B_test",
replaceRecursively = true
)

val expectedDF = readJSONFromText(
"""
|{
| "col_A": 5,
| "col_B_test": 4,
| "col_D": {
| "col_B_test": 6
| },
| "col_F": [
| {
| "col_B_test": 4.356343
| }
| ]
|}
|""".stripMargin
)
Expand Down

0 comments on commit 985df62

Please sign in to comment.