Skip to content

Commit

Permalink
Validate atomic fields lengths with Enrich defaults
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Nov 21, 2024
1 parent e8766d0 commit cda9213
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 13 deletions.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object Dependencies {
object V {
// Snowplow
val snowplowStreamCollector = "3.2.0"
val snowplowCommonEnrich = "4.2.0"
val snowplowCommonEnrich = "5.0.0-1-1e8b01b4-20241119-1547-SNAPSHOT"
val http4sCirce = "0.23.23"

val decline = "2.4.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLoo
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.snowplow.collector.core.{Config => CollectorConfig}
import com.snowplowanalytics.snowplow.enrich.common.adapters.{CallrailSchemas, CloudfrontAccessLogSchemas, GoogleAnalyticsSchemas, HubspotSchemas, MailchimpSchemas, MailgunSchemas, MandrillSchemas, MarketoSchemas, OlarkSchemas, PagerdutySchemas, PingdomSchemas, SendgridSchemas, StatusGatorSchemas, UnbounceSchemas, UrbanAirshipSchemas, VeroSchemas, AdaptersSchemas => EnrichAdaptersSchemas}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.adapters.{AdaptersSchemas => EnrichAdaptersSchemas, CallrailSchemas, CloudfrontAccessLogSchemas, GoogleAnalyticsSchemas, HubspotSchemas, MailchimpSchemas, MailgunSchemas, MandrillSchemas, MarketoSchemas, OlarkSchemas, PagerdutySchemas, PingdomSchemas, SendgridSchemas, StatusGatorSchemas, UnbounceSchemas, UrbanAirshipSchemas, VeroSchemas}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, EnrichmentRegistry}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.typesafe.config.{ConfigFactory, ConfigParseOptions, Config => TypesafeConfig}
import fs2.io.file.{Files, Path => FS2Path}
Expand Down Expand Up @@ -63,10 +63,14 @@ object Configuration {
final case class MicroConfig(collector: CollectorConfig[SinkConfig],
iglu: IgluResources,
enrichmentsConfig: List[EnrichmentConf],
adaptersSchemas: AdaptersSchemas,
enrichConfig: EnrichConfig,
outputEnrichedTsv: Boolean)

final case class AdaptersSchemas(adaptersSchemas: EnrichAdaptersSchemas)
case class EnrichValidation(atomicFieldsLimits: AtomicFields)
final case class EnrichConfig(
adaptersSchemas: EnrichAdaptersSchemas,
validation: EnrichValidation
)

final case class IgluResources(resolver: Resolver[IO], client: IgluCirceClient[IO])

Expand Down Expand Up @@ -112,11 +116,11 @@ object Configuration {
}
}

private def loadAdaptersSchemas(): EitherT[IO, String, AdaptersSchemas] = {
private def loadAdaptersSchemas(): EitherT[IO, String, EnrichConfig] = {
val resolveOrder = (config: TypesafeConfig) => ConfigFactory.load(config)

//It's not configurable in micro, we load it from reference.conf provided by enrich
loadConfig[AdaptersSchemas](path = None, resolveOrder)
loadConfig[EnrichConfig](path = None, resolveOrder)
}

private def buildIgluResources(resolverConfig: ResolverConfig): EitherT[IO, String, IgluResources] =
Expand Down Expand Up @@ -226,8 +230,8 @@ object Configuration {

implicit val resolverDecoder: Decoder[ResolverConfig] = Decoder.decodeJson.emap(json => Resolver.parseConfig(json).leftMap(_.show))

implicit val adaptersSchemasDecoder: Decoder[AdaptersSchemas] =
deriveDecoder[AdaptersSchemas]
implicit val enrichConfigDecoder: Decoder[EnrichConfig] =
deriveDecoder[EnrichConfig]
implicit val enrichAdaptersSchemasDecoder: Decoder[EnrichAdaptersSchemas] =
deriveDecoder[EnrichAdaptersSchemas]
implicit val callrailSchemasDecoder: Decoder[CallrailSchemas] =
Expand Down Expand Up @@ -263,4 +267,18 @@ object Configuration {
implicit val veroSchemasDecoder: Decoder[VeroSchemas] =
deriveDecoder[VeroSchemas]

implicit val validationDecoder: Decoder[EnrichValidation] =
deriveDecoder[EnrichValidation]
implicit val atomicFieldsDecoder: Decoder[AtomicFields] = Decoder[Map[String, Int]].emap { fieldsLimits =>
val configuredFields = fieldsLimits.keys.toList
val supportedFields = AtomicFields.supportedFields.map(_.name)
val unsupportedFields = configuredFields.diff(supportedFields)

if (unsupportedFields.nonEmpty)
Left(s"""
|Configured atomic fields: ${unsupportedFields.mkString("[", ",", "]")} are not supported.
|Supported fields: ${supportedFields.mkString("[", ",", "]")}""".stripMargin)
else
Right(AtomicFields.from(fieldsLimits))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ final class MemorySink(igluClient: IgluCirceClient[IO],
enrichmentRegistry: EnrichmentRegistry[IO],
outputEnrichedTsv: Boolean,
processor: Processor,
adapterRegistry: AdapterRegistry[IO]) extends Sink[IO] {
adapterRegistry: AdapterRegistry[IO],
atomicFieldsLengths: AtomicFields) extends Sink[IO] {
override val maxBytes = Int.MaxValue
private lazy val logger = LoggerFactory.getLogger("EventLog")

Expand Down Expand Up @@ -135,8 +136,10 @@ final class MemorySink(igluClient: IgluCirceClient[IO],
EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false),
IO.unit,
registryLookup,
AtomicFields.from(Map.empty)
atomicFieldsLengths,
emitIncomplete = false
)
.toEither
.subflatMap { enriched =>
EventConverter.fromEnriched(enriched)
.leftMap { failure =>
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ object Run {
sslContext <- Resource.eval(setupSSLContext())
enrichmentRegistry <- buildEnrichmentRegistry(config.enrichmentsConfig)
badProcessor = Processor(BuildInfo.name, BuildInfo.version)
adapterRegistry = new AdapterRegistry[IO](Map.empty, config.adaptersSchemas.adaptersSchemas)
adapterRegistry = new AdapterRegistry[IO](Map.empty, config.enrichConfig.adaptersSchemas)
lookup = JavaNetRegistryLookup.ioLookupInstance[IO]
sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, adapterRegistry)
sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, adapterRegistry, config.enrichConfig.validation.atomicFieldsLimits)
collectorService = new Service[IO](
config.collector,
Sinks(sink, sink),
Expand Down

0 comments on commit cda9213

Please sign in to comment.