Skip to content

Commit

Permalink
Add failure entity for incomplete events
Browse files Browse the repository at this point in the history
With this commit, failure entities are attached to incomplete
events as derived contexts.

In some cases, creating failure entities requires more information than the bad rows contain,
so it isn't possible to create failure entities from bad rows directly.
For this purpose, we created wrapper classes that carry the extra information needed for failure entities.
  • Loading branch information
spenes committed Apr 5, 2024
1 parent 9b068b3 commit c4ab89b
Show file tree
Hide file tree
Showing 10 changed files with 1,395 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ package com.snowplowanalytics.snowplow.enrich.common.enrichments

import cats.data.NonEmptyList

import io.circe.syntax._

import com.snowplowanalytics.snowplow.badrows.FailureDetails

import com.snowplowanalytics.iglu.client.ClientError.ValidationError
Expand Down Expand Up @@ -133,12 +135,19 @@ object AtomicFields {
AtomicFields(withLimits)
}

def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): FailureDetails.SchemaViolation = {
def errorsToSchemaViolation(errors: NonEmptyList[ValidatorReport]): Failure.SchemaViolation = {
val clientError = ValidationError(ValidatorError.InvalidData(errors), None)

FailureDetails.SchemaViolation.IgluError(
AtomicFields.atomicSchema,
clientError
val failureData = errors.toList.flatMap(e => e.path.map(p => p := e.keyword)).toMap.asJson

Failure.SchemaViolation(
schemaViolation = FailureDetails.SchemaViolation.IgluError(
AtomicFields.atomicSchema,
clientError
),
// Source atomic field and actual value of the field should be already on the ValidatorReport list
source = "atomic_field",
data = failureData
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import cats.implicits._

import com.snowplowanalytics.iglu.client.validator.ValidatorReport

import com.snowplowanalytics.snowplow.badrows.FailureDetails

import com.snowplowanalytics.snowplow.enrich.common.enrichments.AtomicFields.LimitedAtomicField
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent

Expand All @@ -37,7 +35,7 @@ object AtomicFieldsLengthValidator {
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): IorT[F, FailureDetails.SchemaViolation, Unit] =
): IorT[F, Failure.SchemaViolation, Unit] =
IorT {
atomicFields.value
.map(validateField(event, _).toValidatedNel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.badrows._
import com.snowplowanalytics.snowplow.badrows.{FailureDetails, Payload, Processor}
import com.snowplowanalytics.snowplow.badrows.{Failure => BadRowFailure}

import com.snowplowanalytics.snowplow.enrich.common.{EtlPipeline, QueryStringParameters, RawEventParameters}
import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
Expand Down Expand Up @@ -71,52 +71,109 @@ object EnrichmentManager {
atomicFields: AtomicFields,
emitIncomplete: Boolean
): IorT[F, BadRow, EnrichedEvent] = {
val iorT: IorT[F, NonEmptyList[BadRow], EnrichedEvent] = for {
enriched <- IorT.pure[F, NonEmptyList[BadRow]](new EnrichedEvent)
extractResult <- mapAndValidateInput(
raw,
enriched,
etlTstamp,
processor,
client,
registryLookup
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
// Next 2 lines remove the invalid contexts and the invalid unstructured event from the event.
// This should be done after the bad row was created and only if emitIncomplete is enabled.
_ = {
enriched.contexts = ME.formatContexts(extractResult.contexts).orNull
enriched.unstruct_event = ME.formatUnstructEvent(extractResult.unstructEvent).orNull
}
enrichmentsContexts <- runEnrichments(
registry,
processor,
raw,
enriched,
extractResult.contexts,
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
_ <- validateEnriched(
enriched,
raw,
enrichmentsContexts,
extractResult.validationInfoContexts,
client,
processor,
registryLookup,
featureFlags.acceptInvalid,
invalidCount,
atomicFields
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
} yield enriched
def enrich(enriched: EnrichedEvent): IorT[F, NonEmptyList[NonEmptyList[Failure]], List[SelfDescribingData[Json]]] =
for {
extractResult <- mapAndValidateInput(
raw,
enriched,
etlTstamp,
processor,
client,
registryLookup
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
// Next 2 lines remove the invalid contexts and the invalid unstructured event from the event.
// This should be done after the bad row was created and only if emitIncomplete is enabled.
_ = {
enriched.contexts = ME.formatContexts(extractResult.contexts).orNull
enriched.unstruct_event = ME.formatUnstructEvent(extractResult.unstructEvent).orNull
}
enrichmentsContexts <- runEnrichments(
registry,
raw,
enriched,
extractResult.contexts,
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
validContexts <- validateEnriched(
enriched,
enrichmentsContexts,
client,
registryLookup,
featureFlags.acceptInvalid,
invalidCount,
atomicFields
)
.leftMap(NonEmptyList.one)
.possiblyExitingEarly(emitIncomplete)
derivedContexts = validContexts ::: extractResult.validationInfoContexts
} yield derivedContexts

// Derived contexts are set last because we want to include failure entities
// in the derived contexts as well, and failure entities are only available
// at the end of the enrichment process.
IorT(
for {
enrichedEvent <- Sync[F].delay(new EnrichedEvent)
enrichmentResult <- enrich(enrichedEvent).value
now = Instant.now()
_ = setDerivedContexts(enrichedEvent, enrichmentResult, now, processor)
result = enrichmentResult
.leftMap { fe =>
createBadRow(
fe,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
now,
processor
)
}
.map(_ => enrichedEvent)
} yield result
)
}

/**
 * Builds the bad row for an event that failed during enrichment.
 *
 * Only the first failure list of `fe` is used. The type of that list's first
 * element decides which kind of bad row is built, and only the failures of
 * that same type found in the rest of the list are added to it; failures of
 * the other type are left out.
 *
 * @param fe failures collected during enrichment, grouped in lists
 * @param pe partially enriched event placed in the bad row payload
 * @param re raw event placed in the bad row payload
 * @param timestamp timestamp set on the bad row failure
 * @param processor processor set on the bad row
 * @return a schema violations bad row or an enrichment failures bad row
 */
private def createBadRow(
  fe: NonEmptyList[NonEmptyList[Failure]],
  pe: Payload.PartiallyEnrichedEvent,
  re: Payload.RawEvent,
  timestamp: Instant,
  processor: Processor
): BadRow = {
  // Both kinds of bad row carry the same payload
  val payload = Payload.EnrichmentPayload(pe, re)
  val failures = fe.head

  failures.head match {
    case first: Failure.SchemaViolation =>
      val others = failures.tail.collect { case sv: Failure.SchemaViolation => sv.schemaViolation }
      val details = NonEmptyList(first.schemaViolation, others)
      BadRow.SchemaViolations(processor, BadRowFailure.SchemaViolations(timestamp, details), payload)
    case first: Failure.EnrichmentFailure =>
      val others = failures.tail.collect { case ef: Failure.EnrichmentFailure => ef.enrichmentFailure }
      val details = NonEmptyList(first.enrichmentFailure, others)
      BadRow.EnrichmentFailures(processor, BadRowFailure.EnrichmentFailures(timestamp, details), payload)
  }
}

iorT.leftMap(_.head)
/**
 * Sets `derived_contexts` on the enriched event, mutating it in place.
 *
 * Every failure in `enrichmentResult` is converted to a self-describing JSON
 * with `toSDJ`. When the result holds both failures and contexts, the
 * converted failures are placed before the contexts. The field is only
 * assigned when `ME.formatContexts` returns a value for the resulting list.
 *
 * @param enriched enriched event whose `derived_contexts` is assigned
 * @param enrichmentResult failures and/or derived contexts produced by the enrichment
 * @param timestamp timestamp passed to `toSDJ` for each failure
 * @param processor processor passed to `toSDJ` for each failure
 */
def setDerivedContexts(
  enriched: EnrichedEvent,
  enrichmentResult: Ior[NonEmptyList[NonEmptyList[Failure]], List[SelfDescribingData[Json]]],
  timestamp: Instant,
  processor: Processor
): Unit = {
  def failureEntities(failures: NonEmptyList[NonEmptyList[Failure]]) =
    failures.flatten.toList.map(_.toSDJ(timestamp, processor))

  val allContexts = enrichmentResult match {
    case Ior.Left(failures) => failureEntities(failures)
    case Ior.Right(contexts) => contexts
    case Ior.Both(failures, contexts) => failureEntities(failures) ::: contexts
  }

  ME.formatContexts(allContexts).foreach(enriched.derived_contexts = _)
}

private def mapAndValidateInput[F[_]: Sync](
Expand All @@ -126,23 +183,15 @@ object EnrichmentManager {
processor: Processor,
client: IgluCirceClient[F],
registryLookup: RegistryLookup[F]
): IorT[F, BadRow, IgluUtils.EventExtractResult] = {
val iorT = for {
): IorT[F, NonEmptyList[Failure], IgluUtils.EventExtractResult] =
for {
_ <- setupEnrichedEvent[F](raw, enrichedEvent, etlTstamp, processor)
.leftMap(NonEmptyList.one)
extract <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, registryLookup)
extract <- IgluUtils
.extractAndValidateInputJsons(enrichedEvent, client, registryLookup)
.leftMap { l: NonEmptyList[Failure] => l }
} yield extract

iorT.leftMap { violations =>
buildSchemaViolationsBadRow(
violations,
EnrichedEvent.toPartiallyEnrichedEvent(enrichedEvent),
RawEvent.toRawEvent(raw),
processor
)
}
}

/**
* Run all the enrichments
* @param enriched /!\ MUTABLE enriched event, mutated IN-PLACE /!\
Expand All @@ -151,13 +200,12 @@ object EnrichmentManager {
*/
private def runEnrichments[F[_]: Monad](
registry: EnrichmentRegistry[F],
processor: Processor,
raw: RawEvent,
enriched: EnrichedEvent,
inputContexts: List[SelfDescribingData[Json]],
unstructEvent: Option[SelfDescribingData[Json]],
legacyOrder: Boolean
): IorT[F, BadRow, List[SelfDescribingData[Json]]] =
): IorT[F, NonEmptyList[Failure], List[SelfDescribingData[Json]]] =
IorT {
accState(registry, raw, inputContexts, unstructEvent, legacyOrder)
.runS(Accumulation(enriched, Nil, Nil))
Expand All @@ -166,12 +214,7 @@ object EnrichmentManager {
failures.toNel match {
case Some(nel) =>
Ior.both(
buildEnrichmentFailuresBadRow(
nel,
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
),
nel.map(Failure.EnrichmentFailure),
contexts
)
case None =>
Expand All @@ -182,33 +225,19 @@ object EnrichmentManager {

private def validateEnriched[F[_]: Clock: Monad](
enriched: EnrichedEvent,
raw: RawEvent,
enrichmentsContexts: List[SelfDescribingData[Json]],
validationInfoContexts: List[SelfDescribingData[Json]],
client: IgluCirceClient[F],
processor: Processor,
registryLookup: RegistryLookup[F],
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): IorT[F, BadRow, Unit] = {
val iorT = for {
): IorT[F, NonEmptyList[Failure], List[SelfDescribingData[Json]]] =
for {
validContexts <- IgluUtils.validateEnrichmentsContexts[F](client, enrichmentsContexts, registryLookup)
_ = ME.formatContexts(validContexts ::: validationInfoContexts).foreach(enriched.derived_contexts = _)
_ <- AtomicFieldsLengthValidator
.validate[F](enriched, acceptInvalid, invalidCount, atomicFields)
.leftMap(NonEmptyList.one)
} yield ()

iorT.leftMap { violations =>
buildSchemaViolationsBadRow(
violations,
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
)
}
}
.leftMap { v: Failure => NonEmptyList.one(v) }
} yield validContexts

private[enrichments] case class Accumulation(
event: EnrichedEvent,
Expand Down Expand Up @@ -336,7 +365,7 @@ object EnrichmentManager {
e: EnrichedEvent,
etlTstamp: DateTime,
processor: Processor
): IorT[F, FailureDetails.SchemaViolation, Unit] =
): IorT[F, Failure.SchemaViolation, Unit] =
IorT {
Sync[F].delay {
e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter
Expand Down Expand Up @@ -847,30 +876,6 @@ object EnrichmentManager {
}
}

/**
 * Wraps schema violations into a schema violations bad row.
 *
 * The failure is timestamped with the current instant at the time of the call.
 *
 * @param vs schema violations to report
 * @param pee partially enriched event placed in the bad row payload
 * @param re raw event placed in the bad row payload
 * @param processor processor set on the bad row
 * @return the schema violations bad row
 */
private def buildSchemaViolationsBadRow(
  vs: NonEmptyList[FailureDetails.SchemaViolation],
  pee: Payload.PartiallyEnrichedEvent,
  re: Payload.RawEvent,
  processor: Processor
): BadRow.SchemaViolations = {
  val failure = Failure.SchemaViolations(Instant.now(), vs)
  val payload = Payload.EnrichmentPayload(pee, re)
  BadRow.SchemaViolations(processor, failure, payload)
}

/**
 * Wraps enrichment failures into an enrichment failures bad row.
 *
 * The failure is timestamped with the current instant at the time of the call.
 *
 * @param fs enrichment failures to report
 * @param pee partially enriched event placed in the bad row payload
 * @param re raw event placed in the bad row payload
 * @param processor processor set on the bad row
 * @return the enrichment failures bad row
 */
private def buildEnrichmentFailuresBadRow(
  fs: NonEmptyList[FailureDetails.EnrichmentFailure],
  pee: Payload.PartiallyEnrichedEvent,
  re: Payload.RawEvent,
  processor: Processor
) = {
  val failure = Failure.EnrichmentFailures(Instant.now(), fs)
  val payload = Payload.EnrichmentPayload(pee, re)
  BadRow.EnrichmentFailures(processor, failure, payload)
}

private implicit class IorTOps[F[_], A, B](val iorT: IorT[F, A, B]) extends AnyVal {

/** If the incomplete events feature is disabled, then convert a Both to a Left, so we don't waste time with next steps */
Expand Down
Loading

0 comments on commit c4ab89b

Please sign in to comment.