Skip to content

Commit

Permalink
Validate atomic fields lengths and JSON depth with Enrich defaults
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Nov 23, 2024
1 parent e8766d0 commit d9a624c
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 26 deletions.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object Dependencies {
object V {
// Snowplow
val snowplowStreamCollector = "3.2.0"
val snowplowCommonEnrich = "4.2.0"
val snowplowCommonEnrich = "5.1.4-rc2"
val http4sCirce = "0.23.23"

val decline = "2.4.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.snowplow.collector.core.{Config => CollectorConfig}
import com.snowplowanalytics.snowplow.enrich.common.adapters.{CallrailSchemas, CloudfrontAccessLogSchemas, GoogleAnalyticsSchemas, HubspotSchemas, MailchimpSchemas, MailgunSchemas, MandrillSchemas, MarketoSchemas, OlarkSchemas, PagerdutySchemas, PingdomSchemas, SendgridSchemas, StatusGatorSchemas, UnbounceSchemas, UrbanAirshipSchemas, VeroSchemas, AdaptersSchemas => EnrichAdaptersSchemas}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentRegistry}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.typesafe.config.{ConfigFactory, ConfigParseOptions, Config => TypesafeConfig}
import fs2.io.file.{Files, Path => FS2Path}
Expand Down Expand Up @@ -63,10 +63,15 @@ object Configuration {
final case class MicroConfig(collector: CollectorConfig[SinkConfig],
iglu: IgluResources,
enrichmentsConfig: List[EnrichmentConf],
adaptersSchemas: AdaptersSchemas,
enrichConfig: EnrichConfig,
outputEnrichedTsv: Boolean)

final case class AdaptersSchemas(adaptersSchemas: EnrichAdaptersSchemas)
final case class EnrichValidation(atomicFieldsLimits: AtomicFields)
final case class EnrichConfig(
adaptersSchemas: EnrichAdaptersSchemas,
maxJsonDepth: Int,
validation: EnrichValidation
)

final case class IgluResources(resolver: Resolver[IO], client: IgluCirceClient[IO])

Expand All @@ -76,10 +81,10 @@ object Configuration {
Cli.config.map { cliConfig =>
for {
collectorConfig <- loadCollectorConfig(cliConfig.collector)
igluResources <- loadIgluResources(cliConfig.iglu)
enrichConfig <- loadEnrichConfig()
igluResources <- loadIgluResources(cliConfig.iglu, enrichConfig.maxJsonDepth)
enrichmentsConfig <- loadEnrichmentConfig(igluResources.client)
adaptersSchemas <- loadAdaptersSchemas()
} yield MicroConfig(collectorConfig, igluResources, enrichmentsConfig, adaptersSchemas, cliConfig.outputEnrichedTsv)
} yield MicroConfig(collectorConfig, igluResources, enrichmentsConfig, enrichConfig, cliConfig.outputEnrichedTsv)
}
}

Expand All @@ -90,12 +95,12 @@ object Configuration {
loadConfig[CollectorConfig[SinkConfig]](path, resolveOrder)
}

private def loadIgluResources(path: Option[Path]): EitherT[IO, String, IgluResources] = {
private def loadIgluResources(path: Option[Path], maxJsonDepth: Int): EitherT[IO, String, IgluResources] = {
val resolveOrder = (config: TypesafeConfig) =>
config.withFallback(ConfigFactory.parseResources("default-iglu-resolver.conf"))

loadConfig[ResolverConfig](path, resolveOrder)
.flatMap(buildIgluResources)
.flatMap(resolverConfig => buildIgluResources(resolverConfig, maxJsonDepth))
}

private def loadEnrichmentConfig(igluClient: IgluCirceClient[IO]): EitherT[IO, String, List[EnrichmentConf]] = {
Expand All @@ -112,18 +117,18 @@ object Configuration {
}
}

private def loadAdaptersSchemas(): EitherT[IO, String, AdaptersSchemas] = {
def loadEnrichConfig(): EitherT[IO, String, EnrichConfig] = {
val resolveOrder = (config: TypesafeConfig) => ConfigFactory.load(config)

//It's not configurable in micro, we load it from reference.conf provided by enrich
loadConfig[AdaptersSchemas](path = None, resolveOrder)
loadConfig[EnrichConfig](path = None, resolveOrder)
}

private def buildIgluResources(resolverConfig: ResolverConfig): EitherT[IO, String, IgluResources] =
private def buildIgluResources(resolverConfig: ResolverConfig, maxJsonDepth: Int): EitherT[IO, String, IgluResources] =
for {
resolver <- Resolver.fromConfig[IO](resolverConfig).leftMap(_.show)
completeResolver = resolver.copy(repos = resolver.repos ++ readIgluExtraRegistry())
client <- EitherT.liftF(IgluCirceClient.fromResolver[IO](completeResolver, resolverConfig.cacheSize))
client <- EitherT.liftF(IgluCirceClient.fromResolver[IO](completeResolver, resolverConfig.cacheSize, maxJsonDepth))
} yield IgluResources(completeResolver, client)

private def loadEnrichmentsAsSDD(enrichmentsDirectory: Path,
Expand Down Expand Up @@ -226,8 +231,8 @@ object Configuration {

implicit val resolverDecoder: Decoder[ResolverConfig] = Decoder.decodeJson.emap(json => Resolver.parseConfig(json).leftMap(_.show))

implicit val adaptersSchemasDecoder: Decoder[AdaptersSchemas] =
deriveDecoder[AdaptersSchemas]
implicit val enrichConfigDecoder: Decoder[EnrichConfig] =
deriveDecoder[EnrichConfig]
implicit val enrichAdaptersSchemasDecoder: Decoder[EnrichAdaptersSchemas] =
deriveDecoder[EnrichAdaptersSchemas]
implicit val callrailSchemasDecoder: Decoder[CallrailSchemas] =
Expand Down Expand Up @@ -263,4 +268,18 @@ object Configuration {
implicit val veroSchemasDecoder: Decoder[VeroSchemas] =
deriveDecoder[VeroSchemas]

implicit val validationDecoder: Decoder[EnrichValidation] =
deriveDecoder[EnrichValidation]
implicit val atomicFieldsDecoder: Decoder[AtomicFields] = Decoder[Map[String, Int]].emap { fieldsLimits =>
val configuredFields = fieldsLimits.keys.toList
val supportedFields = AtomicFields.supportedFields.map(_.name)
val unsupportedFields = configuredFields.diff(supportedFields)

if (unsupportedFields.nonEmpty)
Left(s"""
|Configured atomic fields: ${unsupportedFields.mkString("[", ",", "]")} are not supported.
|Supported fields: ${supportedFields.mkString("[", ",", "]")}""".stripMargin)
else
Right(AtomicFields.from(fieldsLimits))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload, Process
import com.snowplowanalytics.snowplow.collector.core.Sink
import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
import com.snowplowanalytics.snowplow.enrich.common.adapters.{AdapterRegistry, RawEvent}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentManager, EnrichmentRegistry}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{EnrichmentManager, EnrichmentRegistry}
import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader
import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils
import io.circe.syntax._
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
import com.snowplowanalytics.snowplow.micro.Configuration.EnrichConfig

/** Sink of the collector that Snowplow Micro is.
* Contains the functions that are called for each tracking event sent
Expand All @@ -40,10 +41,12 @@ final class MemorySink(igluClient: IgluCirceClient[IO],
enrichmentRegistry: EnrichmentRegistry[IO],
outputEnrichedTsv: Boolean,
processor: Processor,
adapterRegistry: AdapterRegistry[IO]) extends Sink[IO] {
enrichConfig: EnrichConfig) extends Sink[IO] {
override val maxBytes = Int.MaxValue
private lazy val logger = LoggerFactory.getLogger("EventLog")

private val adapterRegistry = new AdapterRegistry[IO](Map.empty, enrichConfig.adaptersSchemas)

override def isHealthy: IO[Boolean] = IO.pure(true)

override def storeRawEvents(events: List[Array[Byte]], key: String): IO[Unit] = {
Expand Down Expand Up @@ -75,7 +78,7 @@ final class MemorySink(igluClient: IgluCirceClient[IO],
case Validated.Valid(maybePayload) =>
maybePayload match {
case Some(collectorPayload) =>
adapterRegistry.toRawEvents(collectorPayload, igluClient, processor, registryLookup).flatMap {
adapterRegistry.toRawEvents(collectorPayload, igluClient, processor, registryLookup, enrichConfig.maxJsonDepth).flatMap {
case Validated.Valid(rawEvents) =>
val partitionEvents = rawEvents.toList.foldLeftM((Nil, Nil): (List[GoodEvent], List[BadEvent])) {
case ((good, bad), rawEvent) =>
Expand Down Expand Up @@ -135,8 +138,11 @@ final class MemorySink(igluClient: IgluCirceClient[IO],
EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false),
IO.unit,
registryLookup,
AtomicFields.from(Map.empty)
enrichConfig.validation.atomicFieldsLimits,
emitIncomplete = false,
enrichConfig.maxJsonDepth
)
.toEither
.subflatMap { enriched =>
EventConverter.fromEnriched(enriched)
.leftMap { failure =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLook
import com.snowplowanalytics.snowplow.badrows.Processor
import com.snowplowanalytics.snowplow.collector.core._
import com.snowplowanalytics.snowplow.collector.core.model.Sinks
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf}
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}
Expand Down Expand Up @@ -56,9 +55,8 @@ object Run {
sslContext <- Resource.eval(setupSSLContext())
enrichmentRegistry <- buildEnrichmentRegistry(config.enrichmentsConfig)
badProcessor = Processor(BuildInfo.name, BuildInfo.version)
adapterRegistry = new AdapterRegistry[IO](Map.empty, config.adaptersSchemas.adaptersSchemas)
lookup = JavaNetRegistryLookup.ioLookupInstance[IO]
sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, adapterRegistry)
sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, config.enrichConfig)
collectorService = new Service[IO](
config.collector,
Sinks(sink, sink),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLookup, Registry}
import com.snowplowanalytics.snowplow.badrows.Processor
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import org.specs2.mutable.SpecificationLike

Expand Down Expand Up @@ -162,12 +161,12 @@ class MemorySinkSpec extends CatsResource[IO, MemorySink] with SpecificationLike

private def createSink(): IO[MemorySink] = {
for {
igluClient <- IgluCirceClient.fromResolver[IO](Resolver(List(Registry.IgluCentral), None), 500)
enrichConfig <- Configuration.loadEnrichConfig().value.map(_.getOrElse(throw new IllegalArgumentException("Can't read defaults from Enrich config")))
igluClient <- IgluCirceClient.fromResolver[IO](Resolver[IO](List(Registry.IgluCentral), None), 500, enrichConfig.maxJsonDepth)
enrichmentRegistry = new EnrichmentRegistry[IO]()
processor = Processor(BuildInfo.name, BuildInfo.version)
adapterRegistry = new AdapterRegistry[IO](Map.empty, TestAdapterRegistry.adaptersSchemas)
lookup = JavaNetRegistryLookup.ioLookupInstance[IO]
} yield new MemorySink(igluClient, lookup, enrichmentRegistry, false, processor, adapterRegistry)
} yield new MemorySink(igluClient, lookup, enrichmentRegistry, false, processor, enrichConfig)
}

}

0 comments on commit d9a624c

Please sign in to comment.