Skip to content

Commit

Permalink
Add timeout (close #825)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Oct 27, 2023
1 parent 29f2672 commit 1dcb0e1
Show file tree
Hide file tree
Showing 33 changed files with 558 additions and 210 deletions.
8 changes: 8 additions & 0 deletions config/config.file.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,12 @@
# Try to base64-decode the event if the initial Thrift deserialization fails
"tryBase64Decoding": false
}

# Timeouts for the processing of events
"timeouts": {
# If UA parser enrichment takes longer than this, an EnrichmentFailures bad row is emitted
"uaParserEnrichment": "1 minute"
# If enriching an event (includes validating) takes longer than this, an EnrichmentFailures bad row is emitted
"enriching": "5 minutes"
}
}
8 changes: 8 additions & 0 deletions config/config.kafka.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,12 @@
"pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784"
}
}

# Timeouts for the processing of events
"timeouts": {
# If UA parser enrichment takes longer than this, an EnrichmentFailures bad row is emitted
"uaParserEnrichment": "1 minute"
# If enriching an event (includes validating) takes longer than this, an EnrichmentFailures bad row is emitted
"enriching": "5 minutes"
}
}
8 changes: 8 additions & 0 deletions config/config.kinesis.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -361,4 +361,12 @@
"pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784"
}
}

# Timeouts for the processing of events
"timeouts": {
# If UA parser enrichment takes longer than this, an EnrichmentFailures bad row is emitted
"uaParserEnrichment": "1 minute"
# If enriching an event (includes validating) takes longer than this, an EnrichmentFailures bad row is emitted
"enriching": "5 minutes"
}
}
8 changes: 8 additions & 0 deletions config/config.nsq.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,12 @@
"pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784"
}
}

# Timeouts for the processing of events
"timeouts": {
# If UA parser enrichment takes longer than this, an EnrichmentFailures bad row is emitted
"uaParserEnrichment": "1 minute"
# If enriching an event (includes validating) takes longer than this, an EnrichmentFailures bad row is emitted
"enriching": "5 minutes"
}
}
8 changes: 8 additions & 0 deletions config/config.pubsub.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,12 @@
"pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784"
}
}

# Timeouts for the processing of events
"timeouts": {
# If UA parser enrichment takes longer than this, an EnrichmentFailures bad row is emitted
"uaParserEnrichment": "1 minute"
# If enriching an event (includes validating) takes longer than this, an EnrichmentFailures bad row is emitted
"enriching": "5 minutes"
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 Snowplow Analytics Ltd. All rights reserved.
* Copyright (c) 2020-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
Expand Down Expand Up @@ -40,13 +40,16 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Processor, Payload => BadRowPayload}

import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.loaders.{CollectorPayload, ThriftLoader}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{EnrichmentRegistry, Timeouts}
import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.FeatureFlags

object Enrich {
Expand All @@ -73,7 +76,8 @@ object Enrich {
env.sentry,
env.processor,
env.featureFlags,
env.metrics.invalidCount
env.metrics.invalidCount,
env.timeouts
)
}

Expand Down Expand Up @@ -112,14 +116,15 @@ object Enrich {
* Enrich a single `CollectorPayload` to get list of bad rows and/or enriched events
* @return enriched event or bad row, along with the collector timestamp
*/
def enrichWith[F[_]: Clock: ContextShift: RegistryLookup: Sync](
def enrichWith[F[_]: Clock: Concurrent: ContextShift: RegistryLookup: Timer](
enrichRegistry: F[EnrichmentRegistry[F]],
adapterRegistry: AdapterRegistry[F],
igluClient: IgluCirceClient[F],
sentry: Option[SentryClient],
processor: Processor,
featureFlags: FeatureFlags,
invalidCount: F[Unit]
invalidCount: F[Unit],
timeouts: Timeouts
)(
row: Array[Byte]
): F[Result] = {
Expand All @@ -130,16 +135,18 @@ object Enrich {
for {
etlTstamp <- Clock[F].realTime(TimeUnit.MILLISECONDS).map(millis => new DateTime(millis))
registry <- enrichRegistry
enriched <- EtlPipeline.processEvents[F](
adapterRegistry,
registry,
igluClient,
processor,
etlTstamp,
payload,
FeatureFlags.toCommon(featureFlags),
invalidCount
)
enriched <- EtlPipeline
.processEvents[F](
adapterRegistry,
registry,
igluClient,
processor,
etlTstamp,
payload,
FeatureFlags.toCommon(featureFlags),
invalidCount,
timeouts
)
} yield (enriched, collectorTstamp)

result.handleErrorWith(sendToSentry[F](row, sentry, processor, collectorTstamp))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 Snowplow Analytics Ltd. All rights reserved.
* Copyright (c) 2020-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
Expand All @@ -12,13 +12,16 @@
*/
package com.snowplowanalytics.snowplow.enrich.common.fs2

import java.util.concurrent.TimeoutException

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

import cats.Show
import cats.data.EitherT
import cats.implicits._

import cats.effect.{Async, Blocker, Clock, ConcurrentEffect, ContextShift, ExitCase, Resource, Sync, Timer}
import cats.effect.{Async, Blocker, Clock, Concurrent, ConcurrentEffect, ContextShift, ExitCase, Resource, Sync, Timer}
import cats.effect.concurrent.{Ref, Semaphore}

import fs2.Stream
Expand All @@ -27,21 +30,24 @@ import _root_.io.sentry.{Sentry, SentryClient}

import org.http4s.client.{Client => Http4sClient}
import org.http4s.Status

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import com.snowplowanalytics.iglu.client.IgluCirceClient
import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, RegistryLookup}

import com.snowplowanalytics.snowplow.badrows.Processor

import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.RemoteAdapter
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{EnrichmentRegistry, Timeouts}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{ConfigFile, ParsedConfigs}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input.Kinesis
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{
Cloud,
Concurrency,
Expand All @@ -53,11 +59,6 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{Clients, Metrics}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.experimental.Metadata

import scala.concurrent.ExecutionContext
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input.Kinesis

import java.util.concurrent.TimeoutException

/**
* All allocated resources, configs and mutable variables necessary for running Enrich process
* Also responsible for initial assets downloading (during `assetsState` initialisation)
Expand Down Expand Up @@ -93,6 +94,7 @@ import java.util.concurrent.TimeoutException
* @param region region in the cloud where enrich runs
* @param cloud cloud where enrich runs (AWS or GCP)
* @param featureFlags Feature flags for the current version of enrich, liable to change in future versions.
* @param timeouts Configured timeouts for the enriching step
* @tparam A type emitted by the source (e.g. `ConsumerRecord` for PubSub).
* getPayload must be defined for this type, as well as checkpointing
*/
Expand Down Expand Up @@ -125,7 +127,8 @@ final case class Environment[F[_], A](
streamsSettings: Environment.StreamsSettings,
region: Option[String],
cloud: Option[Cloud],
featureFlags: FeatureFlags
featureFlags: FeatureFlags,
timeouts: Timeouts
)

object Environment {
Expand All @@ -134,7 +137,7 @@ object Environment {
Slf4jLogger.getLogger[F]

/** Registry with all allocated clients (MaxMind, IAB etc) and their original configs */
final case class Enrichments[F[_]: Async: Clock: ContextShift](
final case class Enrichments[F[_]: Concurrent: Clock: ContextShift: Timer](
registry: EnrichmentRegistry[F],
configs: List[EnrichmentConf],
httpClient: HttpClient[F]
Expand All @@ -146,7 +149,7 @@ object Environment {
}

object Enrichments {
def make[F[_]: Async: Clock: ContextShift](
def make[F[_]: Clock: Concurrent: ContextShift: Timer](
configs: List[EnrichmentConf],
blocker: Blocker,
shifter: ShiftExecution[F],
Expand All @@ -159,7 +162,7 @@ object Environment {
} yield ref
}

def buildRegistry[F[_]: Async: Clock: ContextShift](
def buildRegistry[F[_]: Clock: Concurrent: ContextShift: Timer](
configs: List[EnrichmentConf],
blocker: Blocker,
shifter: ShiftExecution[F],
Expand Down Expand Up @@ -187,7 +190,8 @@ object Environment {
maxRecordSize: Int,
cloud: Option[Cloud],
getRegion: => Option[String],
featureFlags: FeatureFlags
featureFlags: FeatureFlags,
timeouts: Timeouts
): Resource[F, Environment[F, A]] = {
val file = parsedConfigs.configFile
for {
Expand Down Expand Up @@ -239,7 +243,8 @@ object Environment {
StreamsSettings(file.concurrency, maxRecordSize),
getRegionFromConfig(file).orElse(getRegion),
cloud,
featureFlags
featureFlags,
timeouts
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023 Snowplow Analytics Ltd. All rights reserved.
* Copyright (c) 2021-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
Expand Down Expand Up @@ -98,7 +98,8 @@ object Run {
maxRecordSize,
cloud,
getRegion,
file.featureFlags
file.featureFlags,
file.timeouts
)
runEnvironment[F, Array[Byte]](env)
case input =>
Expand Down Expand Up @@ -128,7 +129,8 @@ object Run {
maxRecordSize,
cloud,
getRegion,
file.featureFlags
file.featureFlags,
file.timeouts
)
runEnvironment[F, A](env)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import _root_.io.circe.generic.extras.semiauto.{deriveConfiguredDecoder, deriveC

import com.typesafe.config.{ConfigFactory, Config => TSConfig}

import com.snowplowanalytics.snowplow.enrich.common.enrichments.Timeouts
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdaptersSchemas
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io._

Expand All @@ -38,6 +39,7 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io._
* @param featureFlags to activate/deactivate enrich features
* @param experimental configuration for experimental features
* @param adaptersSchemas configuration for adapters
* @param timeouts configuration for the timeouts of the enriching step
*/
final case class ConfigFile(
input: Input,
Expand All @@ -49,7 +51,8 @@ final case class ConfigFile(
telemetry: Telemetry,
featureFlags: FeatureFlags,
experimental: Option[Experimental],
adaptersSchemas: AdaptersSchemas
adaptersSchemas: AdaptersSchemas,
timeouts: Timeouts
)

object ConfigFile {
Expand All @@ -59,18 +62,24 @@ object ConfigFile {
implicit val finiteDurationEncoder: Encoder[FiniteDuration] =
implicitly[Encoder[String]].contramap(_.toString)

implicit val timeoutsDecoder: Decoder[Timeouts] =
deriveConfiguredDecoder[Timeouts]
implicit val timeoutsEncoder: Encoder[Timeouts] =
deriveConfiguredEncoder[Timeouts]

implicit val configFileDecoder: Decoder[ConfigFile] =
deriveConfiguredDecoder[ConfigFile].emap {
case ConfigFile(_, _, _, Some(aup), _, _, _, _, _, _) if aup._1 <= 0L =>
case ConfigFile(_, _, _, Some(aup), _, _, _, _, _, _, _) if aup._1 <= 0L =>
"assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype
// Remove pii output if streamName is empty (only streamName is checked here, not region)
case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _, _) if output.streamName.isEmpty =>
case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _, _, _) if output.streamName.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _)), bad), _, _, _, _, _, _, _, _) if t.isEmpty =>
case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _) if t.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
// Remove pii output if topic empty
case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _, _) if topicName.isEmpty =>
case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _, _, _)
if topicName.isEmpty =>
c.copy(output = Outputs(good, None, bad)).asRight
case other => other.asRight
}
Expand Down
Loading

0 comments on commit 1dcb0e1

Please sign in to comment.