Skip to content

Commit

Permalink
Take superseding schema into account during validation (close #751)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed May 3, 2023
1 parent 7318ec5 commit 2b3fdc9
Show file tree
Hide file tree
Showing 19 changed files with 521 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,12 @@ class EventGenEtlPipelineSpec extends Specification with CatsIO {
.map(_.toJson(false))
.map(_.foldWith(folder))
.map(_.noSpaces)
.map(
_.replaceAll(
"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0",
"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"
)
)
.map(decode[IntermediateEvent])
.rethrow
.map(IntermediateEvent.pad)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import cats.data.Validated.{Invalid, Valid}

import io.circe.Json
import io.circe.syntax._
import io.circe.parser.{parse => jparse}

import org.apache.thrift.TSerializer

Expand Down Expand Up @@ -116,10 +117,18 @@ object BlackBoxTesting extends Specification with CatsIO {

private def checkEnriched(enriched: EnrichedEvent, expectedFields: Map[String, String]) = {
val asMap = getMap(enriched)
val r = expectedFields.map { case (k, v) => asMap.get(k) must beSome(v) }
val r = expectedFields.map {
case (k, v) if k == "unstruct_event" || k == "contexts" || k == "derived_contexts" =>
compareJsons(asMap.getOrElse(k, ""), v) must beTrue
case (k, v) =>
asMap.get(k) must beSome(v)
}
r.toList.reduce(_ and _)
}

private def compareJsons(j1: String, j2: String): Boolean =
j1 == j2 || jparse(j1).toOption.get == jparse(j2).toOption.get

private val enrichedFields = classOf[EnrichedEvent].getDeclaredFields()
enrichedFields.foreach(_.setAccessible(true))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ApiRequestEnrichmentSpec extends Specification with CatsIO {
)
val expected = Map(
"unstruct_event" -> json"""{"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow-website/signup_form_submitted/jsonschema/1-0-0","data":{"name":"Bob®","email":"[email protected]","company":"SP","eventsPerMonth":"< 1 million","serviceType":"unsure"}}}""".noSpaces,
"contexts" -> json"""{"data":[{"data":{"osType":"OSX","appleIdfa":"some_appleIdfa","openIdfa":"some_Idfa","carrier":"some_carrier","deviceModel":"large","osVersion":"3.0.0","appleIdfa":"some_appleIdfa","androidIdfa":"some_androidIdfa","deviceManufacturer":"Amstrad"},"schema":"iglu:com.snowplowanalytics.snowplow/mobile_context/jsonschema/1-0-0"},{"data":{"longitude":10,"bearing":50,"speed":16,"altitude":20,"altitudeAccuracy":0.3,"latitudeLongitudeAccuracy":0.5,"latitude":7},"schema":"iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-0-0"}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0"}""".noSpaces,
"contexts" -> json"""{"data":[{"data":{"osType":"OSX","appleIdfa":"some_appleIdfa","openIdfa":"some_Idfa","carrier":"some_carrier","deviceModel":"large","osVersion":"3.0.0","appleIdfa":"some_appleIdfa","androidIdfa":"some_androidIdfa","deviceManufacturer":"Amstrad"},"schema":"iglu:com.snowplowanalytics.snowplow/mobile_context/jsonschema/1-0-0"},{"data":{"longitude":10,"bearing":50,"speed":16,"altitude":20,"altitudeAccuracy":0.3,"latitudeLongitudeAccuracy":0.5,"latitude":7},"schema":"iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-0-0"}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"}""".noSpaces,
"derived_contexts" -> json"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.statusgator/status_change/jsonschema/1-0-0","data":{"serviceName": "sp-api-request-enrichment"}}]}""".noSpaces
)
BlackBoxTesting.runTest(input, expected, Some(ApiRequestEnrichmentSpec.conf))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class TransactionSpec extends Specification with CatsIO {
"txn_id" -> "28288",
"tr_country" -> "UK",
"tr_city" -> "London",
"contexts" -> json"""{"data":[{"schema":"iglu:com.snowplowanalytics.snowplow/uri_redirect/jsonschema/1-0-0","data":{"uri":"http://snowplowanalytics.com/"}}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0"}""".noSpaces
"contexts" -> json"""{"data":[{"schema":"iglu:com.snowplowanalytics.snowplow/uri_redirect/jsonschema/1-0-0","data":{"uri":"http://snowplowanalytics.com/"}}],"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1"}""".noSpaces
)
BlackBoxTesting.runTest(input, expected)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,21 @@ object EnrichmentManager {
): EitherT[F, BadRow, EnrichedEvent] =
for {
enriched <- EitherT.fromEither[F](setupEnrichedEvent(raw, etlTstamp, processor))
inputSDJs <- IgluUtils.extractAndValidateInputJsons(enriched, client, raw, processor)
(inputContexts, unstructEvent) = inputSDJs
extractResult <- IgluUtils.extractAndValidateInputJsons(enriched, client, raw, processor)
_ = {
ME.formatUnstructEvent(extractResult.unstructEvent).foreach(e => enriched.unstruct_event = e)
ME.formatContexts(extractResult.contexts).foreach(c => enriched.contexts = c)
}
enrichmentsContexts <- runEnrichments(
registry,
processor,
raw,
enriched,
inputContexts,
unstructEvent,
extractResult.contexts,
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder
)
_ <- EitherT.rightT[F, BadRow] {
if (enrichmentsContexts.nonEmpty)
enriched.derived_contexts = ME.formatDerivedContexts(enrichmentsContexts)
}
_ = ME.formatContexts(enrichmentsContexts ::: extractResult.validationInfoContexts).foreach(c => enriched.derived_contexts = c)
_ <- IgluUtils
.validateEnrichmentsContexts[F](client, enrichmentsContexts, raw, processor, enriched)
_ <- EitherT.rightT[F, BadRow](
Expand Down Expand Up @@ -665,9 +665,8 @@ object EnrichmentManager {
case (event, derivedContexts) =>
javascriptScript match {
case Some(jse) =>
if (derivedContexts.nonEmpty)
event.derived_contexts = ME.formatDerivedContexts(derivedContexts)
jse.process(event).leftMap(NonEmptyList.one(_))
ME.formatContexts(derivedContexts).foreach(c => event.derived_contexts = c)
jse.process(event).leftMap(NonEmptyList.one)
case None => Nil.asRight
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ object MiscEnrichments {
val ContextsSchema =
SchemaKey("com.snowplowanalytics.snowplow", "contexts", "jsonschema", SchemaVer.Full(1, 0, 1))

val UnstructEventSchema =
SchemaKey("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", SchemaVer.Full(1, 0, 0))

/**
* The version of this ETL. Appends this version to the supplied "host" ETL.
* @param processor The version of the host ETL running this library
Expand Down Expand Up @@ -82,6 +85,11 @@ object MiscEnrichments {
}

/** Turn a list of custom contexts into a self-describing JSON property */
def formatDerivedContexts(derivedContexts: List[SelfDescribingData[Json]]): String =
SelfDescribingData(ContextsSchema, Json.arr(derivedContexts.map(_.normalize): _*)).asString
def formatContexts(contexts: List[SelfDescribingData[Json]]): Option[String] =
if (contexts.isEmpty) None
else Some(SelfDescribingData(ContextsSchema, Json.arr(contexts.map(_.normalize): _*)).asString)

/** Turn a unstruct event into a self-describing JSON property */
def formatUnstructEvent(unstructEvent: Option[SelfDescribingData[Json]]): Option[String] =
unstructEvent.map(e => SelfDescribingData(UnstructEventSchema, e.normalize).asString)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ import cats.data.{EitherT, NonEmptyList, Validated, ValidatedNel}
import cats.effect.Clock
import cats.implicits._

import io.circe.Json
import io.circe._
import io.circe.syntax._
import io.circe.generic.semiauto._

import java.time.Instant

import com.snowplowanalytics.iglu.client.{ClientError, IgluCirceClient}
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.badrows._
Expand Down Expand Up @@ -61,7 +63,7 @@ object IgluUtils {
): EitherT[
F,
BadRow.SchemaViolations,
(List[SelfDescribingData[Json]], Option[SelfDescribingData[Json]])
EventExtractResult
] =
EitherT {
for {
Expand All @@ -71,7 +73,9 @@ object IgluUtils {
.map(_.toValidatedNel)
} yield (contexts, unstruct)
.mapN { (c, ue) =>
(c, ue)
val validationInfoContexts = (c.flatMap(_.validationInfo) ::: ue.flatMap(_.validationInfo).toList).distinct
.map(_.toSdj)
EventExtractResult(contexts = c.map(_.sdj), unstructEvent = ue.map(_.sdj), validationInfoContexts = validationInfoContexts)
}
.leftMap { schemaViolations =>
buildSchemaViolationsBadRow(
Expand All @@ -98,27 +102,17 @@ object IgluUtils {
client: IgluCirceClient[F],
field: String = "ue_properties",
criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", 1, 0)
): F[Validated[FailureDetails.SchemaViolation, Option[SelfDescribingData[Json]]]] =
): F[Validated[FailureDetails.SchemaViolation, Option[SdjExtractResult]]] =
(Option(enriched.unstruct_event) match {
case Some(rawUnstructEvent) =>
for {
// Validate input Json string and extract unstructured event
unstruct <- extractInputData(rawUnstructEvent, field, criterion, client)
// Parse Json unstructured event as SelfDescribingData[Json]
unstructSDJ <- SelfDescribingData
.parse(unstruct)
.leftMap(FailureDetails.SchemaViolation.NotIglu(unstruct, _))
.toEitherT[F]
// Check SelfDescribingData[Json] of unstructured event
_ <- check(client, unstructSDJ)
.leftMap {
case (schemaKey, clientError) =>
FailureDetails.SchemaViolation.IgluError(schemaKey, clientError)
}
.leftWiden[FailureDetails.SchemaViolation]
unstructSDJ <- parseAndValidateSDJ_sv(unstruct, client)
} yield unstructSDJ.some
case None =>
EitherT.rightT[F, FailureDetails.SchemaViolation](none[SelfDescribingData[Json]])
EitherT.rightT[F, FailureDetails.SchemaViolation](none[SdjExtractResult])
}).toValidated

/**
Expand All @@ -135,7 +129,7 @@ object IgluUtils {
client: IgluCirceClient[F],
field: String = "contexts",
criterion: SchemaCriterion = SchemaCriterion("com.snowplowanalytics.snowplow", "contexts", "jsonschema", 1, 0)
): F[ValidatedNel[FailureDetails.SchemaViolation, List[SelfDescribingData[Json]]]] =
): F[ValidatedNel[FailureDetails.SchemaViolation, List[SdjExtractResult]]] =
(Option(enriched.contexts) match {
case Some(rawContexts) =>
for {
Expand All @@ -153,7 +147,7 @@ object IgluUtils {
} yield contextsSDJ
case None =>
EitherT.rightT[F, NonEmptyList[FailureDetails.SchemaViolation]](
List.empty[SelfDescribingData[Json]]
List.empty[SdjExtractResult]
)
}).toValidated

Expand Down Expand Up @@ -238,10 +232,10 @@ object IgluUtils {
private def check[F[_]: Monad: RegistryLookup: Clock](
client: IgluCirceClient[F],
sdj: SelfDescribingData[Json]
): EitherT[F, (SchemaKey, ClientError), Unit] =
): EitherT[F, (SchemaKey, ClientError), Option[SchemaVer.Full]] =
client
.check(sdj)
.leftMap(clientErr => (sdj.schema, clientErr))
.leftMap((sdj.schema, _))

/** Check a list of SDJs and merge the Iglu errors */
private def checkList[F[_]: Monad: RegistryLookup: Clock](
Expand All @@ -259,20 +253,54 @@ object IgluUtils {
private def parseAndValidateSDJ_sv[F[_]: Monad: RegistryLookup: Clock]( // _sv for SchemaViolation
json: Json,
client: IgluCirceClient[F]
): EitherT[F, FailureDetails.SchemaViolation, SelfDescribingData[Json]] =
): EitherT[F, FailureDetails.SchemaViolation, SdjExtractResult] =
for {
sdj <- SelfDescribingData
.parse(json)
.leftMap(FailureDetails.SchemaViolation.NotIglu(json, _))
.toEitherT[F]
_ <- check(client, sdj)
.leftMap {
case (schemaKey, clientError) =>
FailureDetails.SchemaViolation
.IgluError(schemaKey, clientError): FailureDetails.SchemaViolation
supersedingSchema <- check(client, sdj)
.leftMap {
case (schemaKey, clientError) =>
FailureDetails.SchemaViolation
.IgluError(schemaKey, clientError): FailureDetails.SchemaViolation

}
} yield sdj
}
validationInfo = supersedingSchema.map(s => ValidationInfo(sdj.schema, s))
sdjUpdated = replaceSchemaVersion(sdj, validationInfo)
} yield SdjExtractResult(sdjUpdated, validationInfo)

private def replaceSchemaVersion(
sdj: SelfDescribingData[Json],
validationInfo: Option[ValidationInfo]
): SelfDescribingData[Json] =
validationInfo match {
case None => sdj
case Some(s) => sdj.copy(schema = sdj.schema.copy(version = s.validatedWith))
}

case class ValidationInfo(originalSchema: SchemaKey, validatedWith: SchemaVer.Full) {
def toSdj: SelfDescribingData[Json] =
SelfDescribingData(ValidationInfo.schemaKey, (this: ValidationInfo).asJson)
}

object ValidationInfo {
val schemaKey = SchemaKey("com.snowplowanalytics.iglu", "validation_info", "jsonschema", SchemaVer.Full(1, 0, 0))

implicit val schemaVerFullEncoder: Encoder[SchemaVer.Full] =
Encoder.encodeString.contramap(v => v.asString)

implicit val validationInfoEncoder: Encoder[ValidationInfo] =
deriveEncoder[ValidationInfo]
}

case class SdjExtractResult(sdj: SelfDescribingData[Json], validationInfo: Option[ValidationInfo])

case class EventExtractResult(
contexts: List[SelfDescribingData[Json]],
unstructEvent: Option[SelfDescribingData[Json]],
validationInfoContexts: List[SelfDescribingData[Json]]
)

/** Build `BadRow.SchemaViolations` from a list of `FailureDetails.SchemaViolation`s */
def buildSchemaViolationsBadRow(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Superseding schema example",
"$supersededBy": "1-0-1",
"self": {
"vendor": "com.acme",
"name": "superseding_schema",
"format": "jsonschema",
"version": "1-0-0"
},
"type": "object",
"properties": {
"field_a": {
"type": "string"
},
"field_b": {
"type": "string"
},
"field_c": {
"type": "string"
}
},
"required": ["field_a", "field_b"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Superseding schema example",
"self": {
"vendor": "com.acme",
"name": "superseding_schema",
"format": "jsonschema",
"version": "1-0-1"
},
"type": "object",
"properties": {
"field_a": {
"type": "string"
},
"field_b": {
"type": "string"
},
"field_c": {
"type": "string"
},
"field_d": {
"type": "string"
}
},
"required": ["field_a", "field_b"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Superseding schema example",
"$supersededBy": "2-0-1",
"self": {
"vendor": "com.acme",
"name": "superseding_schema",
"format": "jsonschema",
"version": "2-0-0"
},
"type": "object",
"properties": {
"field_e": {
"type": "string"
},
"field_f": {
"type": "string"
}
},
"required": ["field_e", "field_f"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Superseding schema example",
"self": {
"vendor": "com.acme",
"name": "superseding_schema",
"format": "jsonschema",
"version": "2-0-1"
},
"type": "object",
"properties": {
"field_e": {
"type": "string"
},
"field_f": {
"type": "string"
},
"field_g": {
"type": "string"
}
},
"required": ["field_e", "field_f"],
"additionalProperties": false
}
Loading

0 comments on commit 2b3fdc9

Please sign in to comment.