From 0d64df12369a7728c8ede203a7578c54cf88fc5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?= Date: Fri, 16 Feb 2024 14:56:47 +0100 Subject: [PATCH] Load default adapters schemas provided by Enrich --- .../Configuration.scala | 71 ++++++++++++++++--- .../Run.scala | 3 +- .../MemorySinkSpec.scala | 3 +- .../TestAdapterRegistry.scala} | 10 +-- 4 files changed, 68 insertions(+), 19 deletions(-) rename src/{main/scala/com.snowplowanalytics.snowplow.micro/MicroAdapterRegistry.scala => test/scala/com.snowplowanalytics.snowplow.micro/TestAdapterRegistry.scala} (97%) diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala index d09566f..3395afa 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Configuration.scala @@ -21,11 +21,13 @@ import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLoo import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} import com.snowplowanalytics.snowplow.collector.core.{Config => CollectorConfig} +import com.snowplowanalytics.snowplow.enrich.common.adapters.{CallrailSchemas, CloudfrontAccessLogSchemas, GoogleAnalyticsSchemas, HubspotSchemas, MailchimpSchemas, MailgunSchemas, MandrillSchemas, MarketoSchemas, OlarkSchemas, PagerdutySchemas, PingdomSchemas, SendgridSchemas, StatusGatorSchemas, UnbounceSchemas, UrbanAirshipSchemas, VeroSchemas, AdaptersSchemas => EnrichAdaptersSchemas} import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf import com.typesafe.config.{ConfigFactory, ConfigParseOptions, Config => TypesafeConfig} import fs2.io.file.{Files, Path => FS2Path} import io.circe.config.syntax.CirceConfigOps +import io.circe.generic.semiauto.deriveDecoder import io.circe.syntax.EncoderOps import io.circe.{Decoder, Json} import org.typelevel.log4cats.Logger @@ -38,14 +40,14 @@ object Configuration { object Cli { final case class Config(collector: Option[Path], iglu: Option[Path], outputEnrichedTsv: Boolean) - + private val collector = Opts.option[Path]("collector-config", "Path to HOCON configuration (optional)", "c", "config.hocon").orNone private val iglu = Opts.option[Path]("iglu", "Configuration file for Iglu Client", "i", "iglu.json").orNone private val outputEnrichedTsv = Opts.flag("output-tsv", "Print events in TSV format to standard output", "t").orFalse - - val config: Opts[Config] = (collector, iglu, outputEnrichedTsv).mapN(Config.apply) + + val config: Opts[Config] = (collector, iglu, outputEnrichedTsv).mapN(Config.apply) } - + object EnvironmentVariables { val igluRegistryUrl = "MICRO_IGLU_REGISTRY_URL" @@ -54,14 +56,18 @@ object Configuration { } final case class DummySinkConfig() - type SinkConfig = DummySinkConfig + + type SinkConfig = DummySinkConfig implicit val dec: Decoder[DummySinkConfig] = Decoder.instance(_ => Right(DummySinkConfig())) final case class MicroConfig(collector: CollectorConfig[SinkConfig], iglu: IgluResources, enrichmentsConfig: List[EnrichmentConf], + adaptersSchemas: AdaptersSchemas, outputEnrichedTsv: Boolean) + final case class AdaptersSchemas(adaptersSchemas: EnrichAdaptersSchemas) + final case class IgluResources(resolver: Resolver[IO], client: IgluCirceClient[IO]) implicit private def logger: Logger[IO] = Slf4jLogger.getLogger[IO] @@ -72,7 +78,8 @@ object Configuration { collectorConfig <- loadCollectorConfig(cliConfig.collector) igluResources <- loadIgluResources(cliConfig.iglu) enrichmentsConfig <- loadEnrichmentConfig(igluResources.client) - } yield MicroConfig(collectorConfig, igluResources, enrichmentsConfig, cliConfig.outputEnrichedTsv) + adaptersSchemas <- loadAdaptersSchemas() + } yield MicroConfig(collectorConfig, igluResources, enrichmentsConfig, adaptersSchemas, cliConfig.outputEnrichedTsv) } } @@ -99,12 +106,19 @@ object Configuration { asJson <- loadEnrichmentsAsSDD(path, igluClient, fileType = ".json") asHocon <- loadEnrichmentsAsSDD(path, igluClient, fileType = ".hocon") asJSScripts <- loadJSScripts(path) - } yield asJson ::: asHocon ::: asJSScripts + } yield asJson ::: asHocon ::: asJSScripts case None => EitherT.rightT[IO, String](List.empty) } } + private def loadAdaptersSchemas(): EitherT[IO, String, AdaptersSchemas] = { + val resolveOrder = (config: TypesafeConfig) => ConfigFactory.load(config) + + //It's not configurable in micro, we load it from reference.conf provided by enrich + loadConfig[AdaptersSchemas](path = None, resolveOrder) + } + private def buildIgluResources(resolverConfig: ResolverConfig): EitherT[IO, String, IgluResources] = for { resolver <- Resolver.fromConfig[IO](resolverConfig).leftMap(_.show) @@ -112,7 +126,7 @@ object Configuration { client <- EitherT.liftF(IgluCirceClient.fromResolver[IO](completeResolver, resolverConfig.cacheSize)) } yield IgluResources(resolver, client) - private def loadEnrichmentsAsSDD(enrichmentsDirectory: Path, + private def loadEnrichmentsAsSDD(enrichmentsDirectory: Path, igluClient: IgluCirceClient[IO], fileType: String): EitherT[IO, String, List[EnrichmentConf]] = { listAvailableEnrichments(enrichmentsDirectory, fileType) @@ -129,7 +143,7 @@ object Configuration { } private def buildJSConfig(script: FS2Path): IO[EnrichmentConf.JavascriptScriptConf] = { - val schemaKey = SchemaKey("com.snowplowanalytics.snowplow", "javascript_script_config", "jsonschema", SchemaVer.Full(1, 0, 0)) + val schemaKey = SchemaKey("com.snowplowanalytics.snowplow", "javascript_script_config", "jsonschema", SchemaVer.Full(1, 0, 0)) Files[IO] .readUtf8Lines(script) .compile @@ -195,7 +209,7 @@ object Configuration { private def handleInputPath(path: Option[Path]): TypesafeConfig = { path match { - case Some(definedPath) => + case Some(definedPath) => //Fail when provided file doesn't exist ConfigFactory.parseFile(definedPath.toFile, ConfigParseOptions.defaults().setAllowMissing(false)) case None => ConfigFactory.empty() @@ -212,4 +226,41 @@ object Configuration { implicit val resolverDecoder: Decoder[ResolverConfig] = Decoder.decodeJson.emap(json => Resolver.parseConfig(json).leftMap(_.show)) + implicit val adaptersSchemasDecoder: Decoder[AdaptersSchemas] = + deriveDecoder[AdaptersSchemas] + implicit val enrichAdaptersSchemasDecoder: Decoder[EnrichAdaptersSchemas] = + deriveDecoder[EnrichAdaptersSchemas] + implicit val callrailSchemasDecoder: Decoder[CallrailSchemas] = + deriveDecoder[CallrailSchemas] + implicit val cloudfrontAccessLogSchemasDecoder: Decoder[CloudfrontAccessLogSchemas] = + deriveDecoder[CloudfrontAccessLogSchemas] + implicit val googleAnalyticsSchemasDecoder: Decoder[GoogleAnalyticsSchemas] = + deriveDecoder[GoogleAnalyticsSchemas] + implicit val hubspotSchemasDecoder: Decoder[HubspotSchemas] = + deriveDecoder[HubspotSchemas] + implicit val mailchimpSchemasDecoder: Decoder[MailchimpSchemas] = + deriveDecoder[MailchimpSchemas] + implicit val mailgunSchemasDecoder: Decoder[MailgunSchemas] = + deriveDecoder[MailgunSchemas] + implicit val mandrillSchemasDecoder: Decoder[MandrillSchemas] = + deriveDecoder[MandrillSchemas] + implicit val marketoSchemasDecoder: Decoder[MarketoSchemas] = + deriveDecoder[MarketoSchemas] + implicit val olarkSchemasDecoder: Decoder[OlarkSchemas] = + deriveDecoder[OlarkSchemas] + implicit val pagerdutySchemasDecoder: Decoder[PagerdutySchemas] = + deriveDecoder[PagerdutySchemas] + implicit val pingdomSchemasDecoder: Decoder[PingdomSchemas] = + deriveDecoder[PingdomSchemas] + implicit val sendgridSchemasDecoder: Decoder[SendgridSchemas] = + deriveDecoder[SendgridSchemas] + implicit val statusgatorSchemasDecoder: Decoder[StatusGatorSchemas] = + deriveDecoder[StatusGatorSchemas] + implicit val unbounceSchemasDecoder: Decoder[UnbounceSchemas] = + deriveDecoder[UnbounceSchemas] + implicit val urbanAirshipSchemasDecoder: Decoder[UrbanAirshipSchemas] = + deriveDecoder[UrbanAirshipSchemas] + implicit val veroSchemasDecoder: Decoder[VeroSchemas] = + deriveDecoder[VeroSchemas] + } diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala index c6ac1e2..e335027 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala @@ -18,6 +18,7 @@ import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLook import com.snowplowanalytics.snowplow.badrows.Processor import com.snowplowanalytics.snowplow.collector.core._ import com.snowplowanalytics.snowplow.collector.core.model.Sinks +import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf} import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution} @@ -55,7 +56,7 @@ object Run { sslContext <- Resource.eval(setupSSLContext()) enrichmentRegistry <- buildEnrichmentRegistry(config.enrichmentsConfig) badProcessor = Processor(BuildInfo.name, BuildInfo.version) - adapterRegistry = MicroAdapterRegistry.create() + adapterRegistry = new AdapterRegistry[IO](Map.empty, config.adaptersSchemas.adaptersSchemas) lookup = JavaNetRegistryLookup.ioLookupInstance[IO] sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, adapterRegistry) collectorService = new Service[IO]( diff --git a/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala b/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala index dfaa100..d500be0 100644 --- a/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala +++ b/src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala @@ -18,6 +18,7 @@ import com.snowplowanalytics.iglu.client.IgluCirceClient import com.snowplowanalytics.iglu.client.resolver.Resolver import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLookup, Registry} import com.snowplowanalytics.snowplow.badrows.Processor +import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry import org.specs2.mutable.SpecificationLike @@ -166,7 +167,7 @@ class MemorySinkSpec extends CatsResource[IO, MemorySink] with SpecificationLike igluClient <- IgluCirceClient.fromResolver[IO](Resolver(List(Registry.IgluCentral), None), 500) enrichmentRegistry = new EnrichmentRegistry[IO]() processor = Processor(BuildInfo.name, BuildInfo.version) - adapterRegistry = MicroAdapterRegistry.create() + adapterRegistry = new AdapterRegistry[IO](Map.empty, TestAdapterRegistry.adaptersSchemas) lookup = JavaNetRegistryLookup.ioLookupInstance[IO] } yield new MemorySink(igluClient, lookup, enrichmentRegistry, false, processor, adapterRegistry) } diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroAdapterRegistry.scala b/src/test/scala/com.snowplowanalytics.snowplow.micro/TestAdapterRegistry.scala similarity index 97% rename from src/main/scala/com.snowplowanalytics.snowplow.micro/MicroAdapterRegistry.scala rename to src/test/scala/com.snowplowanalytics.snowplow.micro/TestAdapterRegistry.scala index a4c9c5f..cc675e4 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroAdapterRegistry.scala +++ b/src/test/scala/com.snowplowanalytics.snowplow.micro/TestAdapterRegistry.scala @@ -1,11 +1,10 @@ package com.snowplowanalytics.snowplow.micro -import cats.effect.IO import com.snowplowanalytics.snowplow.enrich.common.adapters._ -object MicroAdapterRegistry { - - private val adaptersSchemas = AdaptersSchemas( +object TestAdapterRegistry { + + val adaptersSchemas = AdaptersSchemas( CallrailSchemas("iglu:com.callrail/call_complete/jsonschema/1-0-2"), CloudfrontAccessLogSchemas( "iglu:com.amazon.aws.cloudfront/wd_access_log/jsonschema/1-0-2", @@ -151,7 +150,4 @@ object MicroAdapterRegistry { ) ) - def create(): AdapterRegistry[IO] = { - new AdapterRegistry[IO](Map.empty, adaptersSchemas) - } }