Skip to content

Commit

Permalink
Load default adapters schemas provided by Enrich
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Feb 16, 2024
1 parent 85dcf10 commit 0d64df1
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLoo
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.snowplow.collector.core.{Config => CollectorConfig}
import com.snowplowanalytics.snowplow.enrich.common.adapters.{CallrailSchemas, CloudfrontAccessLogSchemas, GoogleAnalyticsSchemas, HubspotSchemas, MailchimpSchemas, MailgunSchemas, MandrillSchemas, MarketoSchemas, OlarkSchemas, PagerdutySchemas, PingdomSchemas, SendgridSchemas, StatusGatorSchemas, UnbounceSchemas, UrbanAirshipSchemas, VeroSchemas, AdaptersSchemas => EnrichAdaptersSchemas}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.typesafe.config.{ConfigFactory, ConfigParseOptions, Config => TypesafeConfig}
import fs2.io.file.{Files, Path => FS2Path}
import io.circe.config.syntax.CirceConfigOps
import io.circe.generic.semiauto.deriveDecoder
import io.circe.syntax.EncoderOps
import io.circe.{Decoder, Json}
import org.typelevel.log4cats.Logger
Expand All @@ -38,14 +40,14 @@ object Configuration {

object Cli {
final case class Config(collector: Option[Path], iglu: Option[Path], outputEnrichedTsv: Boolean)

private val collector = Opts.option[Path]("collector-config", "Path to HOCON configuration (optional)", "c", "config.hocon").orNone
private val iglu = Opts.option[Path]("iglu", "Configuration file for Iglu Client", "i", "iglu.json").orNone
private val outputEnrichedTsv = Opts.flag("output-tsv", "Print events in TSV format to standard output", "t").orFalse
val config: Opts[Config] = (collector, iglu, outputEnrichedTsv).mapN(Config.apply)

val config: Opts[Config] = (collector, iglu, outputEnrichedTsv).mapN(Config.apply)
}


object EnvironmentVariables {
val igluRegistryUrl = "MICRO_IGLU_REGISTRY_URL"
Expand All @@ -54,14 +56,18 @@ object Configuration {
}

final case class DummySinkConfig()
type SinkConfig = DummySinkConfig

type SinkConfig = DummySinkConfig
implicit val dec: Decoder[DummySinkConfig] = Decoder.instance(_ => Right(DummySinkConfig()))

final case class MicroConfig(collector: CollectorConfig[SinkConfig],
iglu: IgluResources,
enrichmentsConfig: List[EnrichmentConf],
adaptersSchemas: AdaptersSchemas,
outputEnrichedTsv: Boolean)

final case class AdaptersSchemas(adaptersSchemas: EnrichAdaptersSchemas)

final case class IgluResources(resolver: Resolver[IO], client: IgluCirceClient[IO])

implicit private def logger: Logger[IO] = Slf4jLogger.getLogger[IO]
Expand All @@ -72,7 +78,8 @@ object Configuration {
collectorConfig <- loadCollectorConfig(cliConfig.collector)
igluResources <- loadIgluResources(cliConfig.iglu)
enrichmentsConfig <- loadEnrichmentConfig(igluResources.client)
} yield MicroConfig(collectorConfig, igluResources, enrichmentsConfig, cliConfig.outputEnrichedTsv)
adaptersSchemas <- loadAdaptersSchemas()
} yield MicroConfig(collectorConfig, igluResources, enrichmentsConfig, adaptersSchemas, cliConfig.outputEnrichedTsv)
}
}

Expand All @@ -99,20 +106,27 @@ object Configuration {
asJson <- loadEnrichmentsAsSDD(path, igluClient, fileType = ".json")
asHocon <- loadEnrichmentsAsSDD(path, igluClient, fileType = ".hocon")
asJSScripts <- loadJSScripts(path)
} yield asJson ::: asHocon ::: asJSScripts
} yield asJson ::: asHocon ::: asJSScripts
case None =>
EitherT.rightT[IO, String](List.empty)
}
}

private def loadAdaptersSchemas(): EitherT[IO, String, AdaptersSchemas] = {
val resolveOrder = (config: TypesafeConfig) => ConfigFactory.load(config)

//It's not configurable in micro, we load it from reference.conf provided by enrich
loadConfig[AdaptersSchemas](path = None, resolveOrder)
}

private def buildIgluResources(resolverConfig: ResolverConfig): EitherT[IO, String, IgluResources] =
for {
resolver <- Resolver.fromConfig[IO](resolverConfig).leftMap(_.show)
completeResolver = resolver.copy(repos = resolver.repos ++ readIgluExtraRegistry())
client <- EitherT.liftF(IgluCirceClient.fromResolver[IO](completeResolver, resolverConfig.cacheSize))
} yield IgluResources(resolver, client)

private def loadEnrichmentsAsSDD(enrichmentsDirectory: Path,
private def loadEnrichmentsAsSDD(enrichmentsDirectory: Path,
igluClient: IgluCirceClient[IO],
fileType: String): EitherT[IO, String, List[EnrichmentConf]] = {
listAvailableEnrichments(enrichmentsDirectory, fileType)
Expand All @@ -129,7 +143,7 @@ object Configuration {
}

private def buildJSConfig(script: FS2Path): IO[EnrichmentConf.JavascriptScriptConf] = {
val schemaKey = SchemaKey("com.snowplowanalytics.snowplow", "javascript_script_config", "jsonschema", SchemaVer.Full(1, 0, 0))
val schemaKey = SchemaKey("com.snowplowanalytics.snowplow", "javascript_script_config", "jsonschema", SchemaVer.Full(1, 0, 0))
Files[IO]
.readUtf8Lines(script)
.compile
Expand Down Expand Up @@ -195,7 +209,7 @@ object Configuration {

private def handleInputPath(path: Option[Path]): TypesafeConfig = {
path match {
case Some(definedPath) =>
case Some(definedPath) =>
//Fail when provided file doesn't exist
ConfigFactory.parseFile(definedPath.toFile, ConfigParseOptions.defaults().setAllowMissing(false))
case None => ConfigFactory.empty()
Expand All @@ -212,4 +226,41 @@ object Configuration {

implicit val resolverDecoder: Decoder[ResolverConfig] = Decoder.decodeJson.emap(json => Resolver.parseConfig(json).leftMap(_.show))

implicit val adaptersSchemasDecoder: Decoder[AdaptersSchemas] =
deriveDecoder[AdaptersSchemas]
implicit val enrichAdaptersSchemasDecoder: Decoder[EnrichAdaptersSchemas] =
deriveDecoder[EnrichAdaptersSchemas]
implicit val callrailSchemasDecoder: Decoder[CallrailSchemas] =
deriveDecoder[CallrailSchemas]
implicit val cloudfrontAccessLogSchemasDecoder: Decoder[CloudfrontAccessLogSchemas] =
deriveDecoder[CloudfrontAccessLogSchemas]
implicit val googleAnalyticsSchemasDecoder: Decoder[GoogleAnalyticsSchemas] =
deriveDecoder[GoogleAnalyticsSchemas]
implicit val hubspotSchemasDecoder: Decoder[HubspotSchemas] =
deriveDecoder[HubspotSchemas]
implicit val mailchimpSchemasDecoder: Decoder[MailchimpSchemas] =
deriveDecoder[MailchimpSchemas]
implicit val mailgunSchemasDecoder: Decoder[MailgunSchemas] =
deriveDecoder[MailgunSchemas]
implicit val mandrillSchemasDecoder: Decoder[MandrillSchemas] =
deriveDecoder[MandrillSchemas]
implicit val marketoSchemasDecoder: Decoder[MarketoSchemas] =
deriveDecoder[MarketoSchemas]
implicit val olarkSchemasDecoder: Decoder[OlarkSchemas] =
deriveDecoder[OlarkSchemas]
implicit val pagerdutySchemasDecoder: Decoder[PagerdutySchemas] =
deriveDecoder[PagerdutySchemas]
implicit val pingdomSchemasDecoder: Decoder[PingdomSchemas] =
deriveDecoder[PingdomSchemas]
implicit val sendgridSchemasDecoder: Decoder[SendgridSchemas] =
deriveDecoder[SendgridSchemas]
implicit val statusgatorSchemasDecoder: Decoder[StatusGatorSchemas] =
deriveDecoder[StatusGatorSchemas]
implicit val unbounceSchemasDecoder: Decoder[UnbounceSchemas] =
deriveDecoder[UnbounceSchemas]
implicit val urbanAirshipSchemasDecoder: Decoder[UrbanAirshipSchemas] =
deriveDecoder[UrbanAirshipSchemas]
implicit val veroSchemasDecoder: Decoder[VeroSchemas] =
deriveDecoder[VeroSchemas]

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLook
import com.snowplowanalytics.snowplow.badrows.Processor
import com.snowplowanalytics.snowplow.collector.core._
import com.snowplowanalytics.snowplow.collector.core.model.Sinks
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf}
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}
Expand Down Expand Up @@ -55,7 +56,7 @@ object Run {
sslContext <- Resource.eval(setupSSLContext())
enrichmentRegistry <- buildEnrichmentRegistry(config.enrichmentsConfig)
badProcessor = Processor(BuildInfo.name, BuildInfo.version)
adapterRegistry = MicroAdapterRegistry.create()
adapterRegistry = new AdapterRegistry[IO](Map.empty, config.adaptersSchemas.adaptersSchemas)
lookup = JavaNetRegistryLookup.ioLookupInstance[IO]
sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, adapterRegistry)
collectorService = new Service[IO](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLookup, Registry}
import com.snowplowanalytics.snowplow.badrows.Processor
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import org.specs2.mutable.SpecificationLike

Expand Down Expand Up @@ -166,7 +167,7 @@ class MemorySinkSpec extends CatsResource[IO, MemorySink] with SpecificationLike
igluClient <- IgluCirceClient.fromResolver[IO](Resolver(List(Registry.IgluCentral), None), 500)
enrichmentRegistry = new EnrichmentRegistry[IO]()
processor = Processor(BuildInfo.name, BuildInfo.version)
adapterRegistry = MicroAdapterRegistry.create()
adapterRegistry = new AdapterRegistry[IO](Map.empty, TestAdapterRegistry.adaptersSchemas)
lookup = JavaNetRegistryLookup.ioLookupInstance[IO]
} yield new MemorySink(igluClient, lookup, enrichmentRegistry, false, processor, adapterRegistry)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package com.snowplowanalytics.snowplow.micro

import cats.effect.IO
import com.snowplowanalytics.snowplow.enrich.common.adapters._

object MicroAdapterRegistry {
private val adaptersSchemas = AdaptersSchemas(
object TestAdapterRegistry {

val adaptersSchemas = AdaptersSchemas(
CallrailSchemas("iglu:com.callrail/call_complete/jsonschema/1-0-2"),
CloudfrontAccessLogSchemas(
"iglu:com.amazon.aws.cloudfront/wd_access_log/jsonschema/1-0-2",
Expand Down Expand Up @@ -151,7 +150,4 @@ object MicroAdapterRegistry {
)
)

def create(): AdapterRegistry[IO] = {
new AdapterRegistry[IO](Map.empty, adaptersSchemas)
}
}

0 comments on commit 0d64df1

Please sign in to comment.