Skip to content

Commit

Permalink
Refactor. Start of new test.
Browse files Browse the repository at this point in the history
  • Loading branch information
PhillHenry committed Dec 6, 2023
1 parent 1421e51 commit 2d3fefa
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package uk.co.odinconsultants
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.DataFrame
import org.scalatest.GivenWhenThen
import org.scalatest.matchers.should.Matchers._
import uk.co.odinconsultants.documentation_utils.SQLUtils.createTableSQL
import uk.co.odinconsultants.documentation_utils.{Datum, SpecPretifier, TableNameFixture}
import uk.co.odinconsultants.documentation_utils.{SpecPretifier, TableNameFixture}

import java.io.ByteArrayOutputStream

Expand All @@ -19,7 +18,7 @@ class ChangeDataFlowSpec extends SpecPretifier with GivenWhenThen with TableName
val pkCol: String = "id"
val condition: String = s"$tableName.$pkCol = $sinkTable.$pkCol"
"be created and populated" in new SimpleSparkFixture {
givenCDFTable(tableName, spark)
Given(aCDFTable(tableName, spark))

When(s"we write ${data.length} rows to $tableName")
appendData(tableName)
Expand Down Expand Up @@ -72,23 +71,6 @@ class ChangeDataFlowSpec extends SpecPretifier with GivenWhenThen with TableName
}
new String(out.toByteArray)
}

/** Creates `tableName` as a Delta table with Change Data Feed (CDF) enabled,
  * records the DDL in the ScalaTest `Given(...)` narrative, and executes it.
  *
  * @param tableName name of the table to create
  * @param spark     session used to run the DDL
  * @return the (empty) DataFrame produced by executing the CREATE TABLE statement
  */
def givenCDFTable(tableName: String, spark: SparkSession): DataFrame = {
  // delta.enableChangeDataFeed = true turns on CDF so row-level changes can be queried later.
  val createCDF: String =
    s"${createTableSQLUsingDelta(tableName)} TBLPROPERTIES (delta.enableChangeDataFeed = true)"
  Given(s"a table created with the SQL: ${formatSQL(createCDF)}")
  spark.sqlContext.sql(createCDF)
}

/** Runs `DESCRIBE HISTORY` on `tableName`, returning the Delta table's
  * transaction history (one row per commit) as a DataFrame.
  */
def describeHistory(
  tableName: String,
  spark: SparkSession,
): DataFrame =
  spark.sqlContext.sql(s"DESCRIBE HISTORY $tableName")

/** Builds the CREATE TABLE DDL for `tableName` with a schema derived from [[Datum]]
  * (via `createTableSQL`), forcing the Delta storage format with `USING DELTA`.
  */
def createTableSQLUsingDelta(tableName: String): String =
  s"""${createTableSQL(tableName, classOf[Datum])}
  |USING DELTA""".stripMargin
}

object ChangeDataFlowSpec {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package uk.co.odinconsultants

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.GivenWhenThen
import org.scalatest.matchers.should.Matchers._
import uk.co.odinconsultants.documentation_utils.SQLUtils.createTableSQL
import uk.co.odinconsultants.documentation_utils.{Datum, SpecPretifier, TableNameFixture}
import uk.co.odinconsultants.documentation_utils.{SpecPretifier, TableNameFixture}

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
Expand All @@ -20,7 +19,7 @@ class ChangeDataFlowStreamingSpec extends SpecPretifier with GivenWhenThen with

"A dataset that is updated" should {
"write its deltas to another table as a stream" in new SimpleSparkFixture {
givenCDFTable(tableName, spark)
Given(aCDFTable(tableName, spark))
override def num_rows: Int = 100
val streamSinkTable = "streamsink"
val sinkSQL = createTableSQLUsingDelta(streamSinkTable)
Expand Down Expand Up @@ -77,15 +76,5 @@ class ChangeDataFlowStreamingSpec extends SpecPretifier with GivenWhenThen with
}
}

/** Creates `tableName` as a Delta table with Change Data Feed (CDF) enabled,
  * records the DDL in the ScalaTest `Given(...)` narrative, and executes it.
  *
  * @param tableName name of the table to create
  * @param spark     session used to run the DDL
  * @return the (empty) DataFrame produced by executing the CREATE TABLE statement
  */
def givenCDFTable(tableName: String, spark: SparkSession): DataFrame = {
  // delta.enableChangeDataFeed = true turns on CDF so the sink stream can read the deltas.
  val createCDF: String =
    s"${createTableSQLUsingDelta(tableName)} TBLPROPERTIES (delta.enableChangeDataFeed = true)"
  Given(s"a table created with the SQL: ${formatSQL(createCDF)}")
  spark.sqlContext.sql(createCDF)
}

/** Builds the CREATE TABLE DDL for `tableName` with a schema derived from [[Datum]]
  * (via `createTableSQL`), forcing the Delta storage format with `USING DELTA`.
  */
def createTableSQLUsingDelta(tableName: String): String =
  s"""${createTableSQL(tableName, classOf[Datum])}
  |USING DELTA""".stripMargin
}

15 changes: 15 additions & 0 deletions modules/core/src/test/scala/uk/co/odinconsultants/CrudSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package uk.co.odinconsultants
import org.scalatest.GivenWhenThen
import uk.co.odinconsultants.documentation_utils.{SpecPretifier, TableNameFixture}

/** Smoke test for basic CRUD against a Delta table: currently covers the "create" step. */
class CrudSpec extends SpecPretifier with GivenWhenThen with TableNameFixture {

  "A Delta table" should {
    "be created and populated" in new SimpleSparkFixture {
      // DDL comes from the shared fixture so the schema matches the other specs.
      val sinkSQL = createTableSQLUsingDelta(tableName)
      // "the SQL: " phrasing kept consistent with the Given messages used by the CDF specs.
      Given(s"a table created with the SQL: ${formatSQL(sinkSQL)}")
      spark.sqlContext.sql(sinkSQL)
    }
  }

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
package uk.co.odinconsultants
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{DataFrame, SparkSession}
import uk.co.odinconsultants.SparkUtils.tmpDir
import uk.co.odinconsultants.documentation_utils.SimpleFixture
import uk.co.odinconsultants.documentation_utils.SQLUtils.createTableSQL
import uk.co.odinconsultants.documentation_utils.{Datum, SimpleFixture, SpecFormats}

trait SimpleSparkFixture extends SimpleFixture {
/** Shared Spark test fixture: owns the session and the helpers the specs use to
  * create, populate and inspect Delta tables.
  */
trait SimpleSparkFixture extends SimpleFixture with SpecFormats {

  val spark: SparkSession = SparkUtils.sparkSession

  /** Filesystem location under the temp dir holding the given table's data files. */
  def dataDir(tableName: String): String = s"$tmpDir/$tableName/data"

  /** Appends the fixture's `data` rows to `tableName` via the DataFrameWriterV2 API. */
  def appendData(tableName: String): Unit =
    spark.createDataFrame(data).writeTo(tableName).append()

  /** Creates `tableName` as a Delta table with Change Data Feed enabled and returns a
    * human-readable description of the DDL, suitable for ScalaTest's `Given(...)`.
    */
  def aCDFTable(tableName: String, spark: SparkSession): String = {
    val ddl =
      s"${createTableSQLUsingDelta(tableName)} TBLPROPERTIES (delta.enableChangeDataFeed = true)"
    spark.sqlContext.sql(ddl)
    s"a table created with the SQL: ${formatSQL(ddl)}"
  }

  /** Runs `DESCRIBE HISTORY` on `tableName`, exposing the Delta transaction log
    * (one row per commit) as a DataFrame.
    */
  def describeHistory(
    tableName: String,
    spark: SparkSession,
  ): DataFrame =
    spark.sqlContext.sql(s"DESCRIBE HISTORY $tableName")

  /** CREATE TABLE DDL for `tableName` with a schema derived from [[Datum]], using Delta. */
  def createTableSQLUsingDelta(tableName: String): String =
    s"""${createTableSQL(tableName, classOf[Datum])}
       |USING DELTA""".stripMargin

}

0 comments on commit 2d3fefa

Please sign in to comment.