From d9a624c2fe81391b59846892d7f0d70a3980c34e Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Thu, 21 Nov 2024 21:30:18 +0100 Subject: [PATCH] Validate atomic fields lengths and JSON depth with Enrich defaults --- project/Dependencies.scala | 2 +- .../Configuration.scala | 47 +++++++++++++------ .../MemorySink.scala | 14 ++++-- .../Run.scala | 4 +- .../MemorySinkSpec.scala | 7 ++- 5 files changed, 48 insertions(+), 26 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index bcfcb6d..4a99fb5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,7 +17,7 @@ object Dependencies { object V { // Snowplow val snowplowStreamCollector = "3.2.0" - val snowplowCommonEnrich = "4.2.0" + val snowplowCommonEnrich = "5.1.4-rc2" val http4sCirce = "0.23.23" val decline = "2.4.1" diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala index 090ed34..5f81ae5 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala @@ -22,7 +22,7 @@ import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.snowplow.collector.core.{Config => CollectorConfig} import com.snowplowanalytics.snowplow.enrich.common.adapters.{CallrailSchemas, CloudfrontAccessLogSchemas, GoogleAnalyticsSchemas, HubspotSchemas, MailchimpSchemas, MailgunSchemas, MandrillSchemas, MarketoSchemas, OlarkSchemas, PagerdutySchemas, PingdomSchemas, SendgridSchemas, StatusGatorSchemas, UnbounceSchemas, UrbanAirshipSchemas, VeroSchemas, AdaptersSchemas => EnrichAdaptersSchemas} -import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry +import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentRegistry} import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf import com.typesafe.config.{ConfigFactory, ConfigParseOptions, Config => TypesafeConfig} import fs2.io.file.{Files, Path => FS2Path} @@ -63,10 +63,15 @@ object Configuration { final case class MicroConfig(collector: CollectorConfig[SinkConfig], iglu: IgluResources, enrichmentsConfig: List[EnrichmentConf], - adaptersSchemas: AdaptersSchemas, + enrichConfig: EnrichConfig, outputEnrichedTsv: Boolean) - final case class AdaptersSchemas(adaptersSchemas: EnrichAdaptersSchemas) + final case class EnrichValidation(atomicFieldsLimits: AtomicFields) + final case class EnrichConfig( + adaptersSchemas: EnrichAdaptersSchemas, + maxJsonDepth: Int, + validation: EnrichValidation + ) final case class IgluResources(resolver: Resolver[IO], client: IgluCirceClient[IO]) @@ -76,10 +81,10 @@ object Configuration { Cli.config.map { cliConfig => for { collectorConfig <- loadCollectorConfig(cliConfig.collector) - igluResources <- loadIgluResources(cliConfig.iglu) + enrichConfig <- loadEnrichConfig() + igluResources <- loadIgluResources(cliConfig.iglu, enrichConfig.maxJsonDepth) enrichmentsConfig <- loadEnrichmentConfig(igluResources.client) - adaptersSchemas <- loadAdaptersSchemas() - } yield MicroConfig(collectorConfig, igluResources, enrichmentsConfig, adaptersSchemas, cliConfig.outputEnrichedTsv) + } yield MicroConfig(collectorConfig, igluResources, enrichmentsConfig, enrichConfig, cliConfig.outputEnrichedTsv) } } @@ -90,12 +95,12 @@ object Configuration { loadConfig[CollectorConfig[SinkConfig]](path, resolveOrder) } - private def loadIgluResources(path: Option[Path]): EitherT[IO, String, IgluResources] = { + private def loadIgluResources(path: Option[Path], maxJsonDepth: Int): EitherT[IO, String, IgluResources] = { val resolveOrder = (config: TypesafeConfig) => config.withFallback(ConfigFactory.parseResources("default-iglu-resolver.conf")) loadConfig[ResolverConfig](path, resolveOrder) - .flatMap(buildIgluResources) + .flatMap(resolverConfig => buildIgluResources(resolverConfig, maxJsonDepth)) } private def loadEnrichmentConfig(igluClient: IgluCirceClient[IO]): EitherT[IO, String, List[EnrichmentConf]] = { @@ -112,18 +117,18 @@ object Configuration { } } - private def loadAdaptersSchemas(): EitherT[IO, String, AdaptersSchemas] = { + def loadEnrichConfig(): EitherT[IO, String, EnrichConfig] = { val resolveOrder = (config: TypesafeConfig) => ConfigFactory.load(config) //It's not configurable in micro, we load it from reference.conf provided by enrich - loadConfig[AdaptersSchemas](path = None, resolveOrder) + loadConfig[EnrichConfig](path = None, resolveOrder) } - private def buildIgluResources(resolverConfig: ResolverConfig): EitherT[IO, String, IgluResources] = + private def buildIgluResources(resolverConfig: ResolverConfig, maxJsonDepth: Int): EitherT[IO, String, IgluResources] = for { resolver <- Resolver.fromConfig[IO](resolverConfig).leftMap(_.show) completeResolver = resolver.copy(repos = resolver.repos ++ readIgluExtraRegistry()) - client <- EitherT.liftF(IgluCirceClient.fromResolver[IO](completeResolver, resolverConfig.cacheSize)) + client <- EitherT.liftF(IgluCirceClient.fromResolver[IO](completeResolver, resolverConfig.cacheSize, maxJsonDepth)) } yield IgluResources(completeResolver, client) private def loadEnrichmentsAsSDD(enrichmentsDirectory: Path, @@ -226,8 +231,8 @@ object Configuration { implicit val resolverDecoder: Decoder[ResolverConfig] = Decoder.decodeJson.emap(json => Resolver.parseConfig(json).leftMap(_.show)) - implicit val adaptersSchemasDecoder: Decoder[AdaptersSchemas] = - deriveDecoder[AdaptersSchemas] + implicit val enrichConfigDecoder: Decoder[EnrichConfig] = + deriveDecoder[EnrichConfig] implicit val enrichAdaptersSchemasDecoder: Decoder[EnrichAdaptersSchemas] = deriveDecoder[EnrichAdaptersSchemas] implicit val callrailSchemasDecoder: Decoder[CallrailSchemas] = @@ -263,4 +268,18 @@ object Configuration { implicit val veroSchemasDecoder: Decoder[VeroSchemas] = deriveDecoder[VeroSchemas] + implicit val validationDecoder: Decoder[EnrichValidation] = + deriveDecoder[EnrichValidation] + implicit val atomicFieldsDecoder: Decoder[AtomicFields] = Decoder[Map[String, Int]].emap { fieldsLimits => + val configuredFields = fieldsLimits.keys.toList + val supportedFields = AtomicFields.supportedFields.map(_.name) + val unsupportedFields = configuredFields.diff(supportedFields) + + if (unsupportedFields.nonEmpty) + Left(s""" + |Configured atomic fields: ${unsupportedFields.mkString("[", ",", "]")} are not supported. + |Supported fields: ${supportedFields.mkString("[", ",", "]")}""".stripMargin) + else + Right(AtomicFields.from(fieldsLimits)) + } } diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala index a0506a9..4311e06 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala @@ -21,12 +21,13 @@ import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload, Process import com.snowplowanalytics.snowplow.collector.core.Sink import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline import com.snowplowanalytics.snowplow.enrich.common.adapters.{AdapterRegistry, RawEvent} -import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentManager, EnrichmentRegistry} +import com.snowplowanalytics.snowplow.enrich.common.enrichments.{EnrichmentManager, EnrichmentRegistry} import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils import io.circe.syntax._ import org.joda.time.DateTime import org.slf4j.LoggerFactory +import com.snowplowanalytics.snowplow.micro.Configuration.EnrichConfig /** Sink of the collector that Snowplow Micro is. * Contains the functions that are called for each tracking event sent @@ -40,10 +41,12 @@ final class MemorySink(igluClient: IgluCirceClient[IO], enrichmentRegistry: EnrichmentRegistry[IO], outputEnrichedTsv: Boolean, processor: Processor, - adapterRegistry: AdapterRegistry[IO]) extends Sink[IO] { + enrichConfig: EnrichConfig) extends Sink[IO] { override val maxBytes = Int.MaxValue private lazy val logger = LoggerFactory.getLogger("EventLog") + private val adapterRegistry = new AdapterRegistry[IO](Map.empty, enrichConfig.adaptersSchemas) + override def isHealthy: IO[Boolean] = IO.pure(true) override def storeRawEvents(events: List[Array[Byte]], key: String): IO[Unit] = { @@ -75,7 +78,7 @@ final class MemorySink(igluClient: IgluCirceClient[IO], case Validated.Valid(maybePayload) => maybePayload match { case Some(collectorPayload) => - adapterRegistry.toRawEvents(collectorPayload, igluClient, processor, registryLookup).flatMap { + adapterRegistry.toRawEvents(collectorPayload, igluClient, processor, registryLookup, enrichConfig.maxJsonDepth).flatMap { case Validated.Valid(rawEvents) => val partitionEvents = rawEvents.toList.foldLeftM((Nil, Nil): (List[GoodEvent], List[BadEvent])) { case ((good, bad), rawEvent) => @@ -135,8 +138,11 @@ final class MemorySink(igluClient: IgluCirceClient[IO], EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false), IO.unit, registryLookup, - AtomicFields.from(Map.empty) + enrichConfig.validation.atomicFieldsLimits, + emitIncomplete = false, + enrichConfig.maxJsonDepth ) + .toEither .subflatMap { enriched => EventConverter.fromEnriched(enriched) .leftMap { failure => diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala index 96949a4..bd46f88 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala @@ -18,7 +18,6 @@ import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLook import com.snowplowanalytics.snowplow.badrows.Processor import com.snowplowanalytics.snowplow.collector.core._ import com.snowplowanalytics.snowplow.collector.core.model.Sinks -import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf} import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution} @@ -56,9 +55,8 @@ object Run { sslContext <- Resource.eval(setupSSLContext()) enrichmentRegistry <- buildEnrichmentRegistry(config.enrichmentsConfig) badProcessor = Processor(BuildInfo.name, BuildInfo.version) - adapterRegistry = new AdapterRegistry[IO](Map.empty, config.adaptersSchemas.adaptersSchemas) lookup = JavaNetRegistryLookup.ioLookupInstance[IO] - sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, adapterRegistry) + sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, config.enrichConfig) collectorService = new Service[IO]( config.collector, Sinks(sink, sink), diff --git a/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala b/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala index a69b4e7..9e08400 100644 --- a/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala +++ b/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala @@ -16,7 +16,6 @@ import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.Resolver import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLookup, Registry} import com.snowplowanalytics.snowplow.badrows.Processor -import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry import org.specs2.mutable.SpecificationLike @@ -162,12 +161,12 @@ class MemorySinkSpec extends CatsResource[IO, MemorySink] with SpecificationLike private def createSink(): IO[MemorySink] = { for { - igluClient <- IgluCirceClient.fromResolver[IO](Resolver(List(Registry.IgluCentral), None), 500) + enrichConfig <- Configuration.loadEnrichConfig().value.map(_.getOrElse(throw new IllegalArgumentException("Can't read defaults from Enrich config"))) + igluClient <- IgluCirceClient.fromResolver[IO](Resolver[IO](List(Registry.IgluCentral), None), 500, enrichConfig.maxJsonDepth) enrichmentRegistry = new EnrichmentRegistry[IO]() processor = Processor(BuildInfo.name, BuildInfo.version) - adapterRegistry = new AdapterRegistry[IO](Map.empty, TestAdapterRegistry.adaptersSchemas) lookup = JavaNetRegistryLookup.ioLookupInstance[IO] - } yield new MemorySink(igluClient, lookup, enrichmentRegistry, false, processor, adapterRegistry) + } yield new MemorySink(igluClient, lookup, enrichmentRegistry, false, processor, enrichConfig) } }