Repeater: PubSub ack extensions should match backoff delay
istreeter committed Apr 12, 2024
1 parent b1ff197 commit 191d9b3
Showing 3 changed files with 77 additions and 10 deletions.

@@ -100,8 +100,11 @@ object Flow {
         .as(Inserted.asInstanceOf[InsertStatus])
         .value
     } else {
-      Logger[F].debug(s"Event ${event.value.eventId}/${event.value.etlTstamp} is not ready yet. Nack") >>
-        event.nack.as(Retry.asInstanceOf[InsertStatus].asRight)
+      Logger[F]
+        .debug(
+          s"Event ${event.value.eventId}/${event.value.etlTstamp} is not ready yet. Ignoring it so PubSub re-sends it later."
+        )
+        .as(Retry.asInstanceOf[InsertStatus].asRight)
     }
   }
 } yield result
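
Context for the hunk above: a nacked message is redelivered by PubSub almost immediately, so the old code would get a not-yet-ready event straight back in a tight loop. The new code neither acks nor nacks, so redelivery only happens once the subscriber stops extending the ack deadline, which the PubSub change further down ties to the configured backoff. A minimal sketch of the contrast (hypothetical NotReadySketch object, not part of the commit; it assumes the permutive ConsumerRecord API already used in this diff):

import cats.effect.IO
import com.permutive.pubsub.consumer.ConsumerRecord
import com.snowplowanalytics.snowplow.storage.bigquery.repeater.EventContainer

object NotReadySketch {
  // Old behaviour: nack tells PubSub to re-send the message almost immediately,
  // so an event that is not ready yet comes straight back in a tight loop.
  def oldBehaviour(event: ConsumerRecord[IO, EventContainer]): IO[Unit] =
    event.nack

  // New behaviour: neither ack nor nack. The subscriber stops extending the ack
  // deadline after the configured backoff, and PubSub re-sends the message then.
  def newBehaviour(event: ConsumerRecord[IO, EventContainer]): IO[Unit] =
    IO.unit
}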

@@ -17,6 +17,7 @@ import com.snowplowanalytics.snowplow.badrows.Processor
 import cats.effect._
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
+import scala.concurrent.duration.DurationInt

object Repeater extends IOApp {

@@ -34,7 +35,8 @@ object Repeater extends IOApp {
           resources.env.projectId,
           resources.env.config.input.subscription,
           resources.uninsertable,
-          resources.env.gcpUserAgent
+          resources.env.gcpUserAgent,
+          command.backoffPeriod.seconds
         )
         .interruptWhen(resources.stop)
         .through[IO, Unit](Flow.sink(resources))
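
This hunk wires the repeater's CLI backoff setting through to the subscriber. Judging by the DurationInt import added above, command.backoffPeriod is a plain integer number of seconds, lifted to a FiniteDuration by the .seconds syntax. A small sketch (the value 900 is an assumed example, not from the commit):

import scala.concurrent.duration.{DurationInt, FiniteDuration}

object BackoffWiring extends App {
  val backoffPeriod: Int = 900 // assumed example value; the real one is parsed from the CLI
  val asDuration: FiniteDuration = backoffPeriod.seconds
  println(asDuration) // prints: 900 seconds
}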

@@ -12,18 +12,26 @@
  */
 package com.snowplowanalytics.snowplow.storage.bigquery.repeater.services

-import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload}
-import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater}
 import cats.effect._
 import cats.effect.std.Queue
 import cats.syntax.all._
-import fs2.Stream
-import org.typelevel.log4cats.Logger
+import org.threeten.bp.{Duration => ThreetenDuration}
 import com.google.pubsub.v1.PubsubMessage
+import com.google.api.gax.core.ExecutorProvider
+import com.google.api.gax.batching.FlowControlSettings
+import com.google.common.util.concurrent.{ForwardingListeningExecutorService, MoreExecutors}

 import com.permutive.pubsub.consumer.{ConsumerRecord, Model}
 import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig}
+import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Payload}
 import com.snowplowanalytics.snowplow.storage.bigquery.common.config.AllAppsConfig.GcpUserAgent
 import com.snowplowanalytics.snowplow.storage.bigquery.common.createGcpUserAgentHeader
+import fs2.Stream
+import org.typelevel.log4cats.Logger
+import com.snowplowanalytics.snowplow.storage.bigquery.repeater.{EventContainer, Repeater}
+
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import java.util.concurrent.{Callable, ScheduledExecutorService, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}

/** Module responsible for reading Pub/Sub */
object PubSub {
@@ -33,18 +41,72 @@ object PubSub {
     projectId: String,
     subscription: String,
     uninsertable: Queue[F, BadRow],
-    gcpUserAgent: GcpUserAgent
+    gcpUserAgent: GcpUserAgent,
+    backoffPeriod: FiniteDuration
   ): Stream[F, ConsumerRecord[F, EventContainer]] =
     PubsubGoogleConsumer.subscribe[F, EventContainer](
       Model.ProjectId(projectId),
       Model.Subscription(subscription),
       (msg, err, ack, _) => callback[F](msg, err, ack, uninsertable),
       PubsubGoogleConsumerConfig[F](
-        onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t"),
-        customizeSubscriber = Some(_.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent)))
+        onFailedTerminate = t => Logger[F].error(s"Terminating consumer due to $t"),
+        customizeSubscriber = Some {
+          _.setHeaderProvider(createGcpUserAgentHeader(gcpUserAgent))
+            .setMaxAckExtensionPeriod(convertDuration(backoffPeriod.min(1.hour)))
+            .setMinDurationPerAckExtension(convertDuration(backoffPeriod.min(600.seconds).minus(1.second)))
+            .setExecutorProvider {
+              new ExecutorProvider {
+                def shouldAutoClose: Boolean = true
+                def getExecutor: ScheduledExecutorService = scheduledExecutorService
+              }
+            }
+            .setFlowControlSettings {
+              // Switch off any flow control, because we handle it ourselves via fs2's backpressure
+              FlowControlSettings.getDefaultInstance
+            }
+        }
       )
     )
+
+  private def convertDuration(d: FiniteDuration): ThreetenDuration =
+    ThreetenDuration.ofMillis(d.toMillis)
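
The two clamps in customizeSubscriber keep the requested extensions inside what the client and service allow: the max ack-extension period is capped at 1 hour, and the per-extension minimum is capped by PubSub's 600-second maximum ack deadline, with one second subtracted presumably to stay strictly below it. A standalone sketch of the arithmetic (hypothetical AckExtensionMath object, not part of the commit):

import scala.concurrent.duration._

object AckExtensionMath extends App {
  def maxAckExtension(backoff: FiniteDuration): FiniteDuration =
    backoff.min(1.hour)

  def minPerAckExtension(backoff: FiniteDuration): FiniteDuration =
    backoff.min(600.seconds) - 1.second

  List(30.seconds, 15.minutes, 2.hours).foreach { backoff =>
    println(s"backoff=$backoff -> max=${maxAckExtension(backoff)}, minPerExtension=${minPerAckExtension(backoff)}")
  }
  // backoff=30 seconds -> max=30 seconds, minPerExtension=29 seconds
  // backoff=15 minutes -> max=15 minutes, minPerExtension=599 seconds
  // backoff=2 hours    -> max=1 hour,    minPerExtension=599 seconds
}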

+  def scheduledExecutorService: ScheduledExecutorService =
+    new ForwardingListeningExecutorService with ScheduledExecutorService {
+      val delegate = MoreExecutors.newDirectExecutorService
+      lazy val scheduler = new ScheduledThreadPoolExecutor(1) // I think this scheduler is never used, but it is implemented here for safety
+      override def schedule[V](
+        callable: Callable[V],
+        delay: Long,
+        unit: TimeUnit
+      ): ScheduledFuture[V] =
+        scheduler.schedule(callable, delay, unit)
+      override def schedule(
+        runnable: Runnable,
+        delay: Long,
+        unit: TimeUnit
+      ): ScheduledFuture[_] =
+        scheduler.schedule(runnable, delay, unit)
+      override def scheduleAtFixedRate(
+        runnable: Runnable,
+        initialDelay: Long,
+        period: Long,
+        unit: TimeUnit
+      ): ScheduledFuture[_] =
+        scheduler.scheduleAtFixedRate(runnable, initialDelay, period, unit)
+      override def scheduleWithFixedDelay(
+        runnable: Runnable,
+        initialDelay: Long,
+        delay: Long,
+        unit: TimeUnit
+      ): ScheduledFuture[_] =
+        scheduler.scheduleWithFixedDelay(runnable, initialDelay, delay, unit)
+      override def shutdown(): Unit = {
+        delegate.shutdown()
+        scheduler.shutdown()
+      }
+    }
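
The forwarding executor above delegates ordinary task submission to a Guava direct executor, so the subscriber processes messages synchronously on the calling thread; only the schedule* methods fall back to a real single-threaded scheduler, and flow control is left to fs2's backpressure as the comment in the config notes. A runnable illustration of the direct-executor semantics the delegate relies on (hypothetical DirectExecutorDemo object, not part of the commit):

import com.google.common.util.concurrent.MoreExecutors

object DirectExecutorDemo extends App {
  val exec   = MoreExecutors.newDirectExecutorService()
  val caller = Thread.currentThread().getName
  exec.submit(new Runnable {
    // Runs synchronously on the submitting thread; there is no hand-off to a pool
    def run(): Unit =
      println(s"task ran on ${Thread.currentThread().getName}; submitted from $caller")
  })
  exec.shutdown()
}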

private def callback[F[_]: Sync](msg: PubsubMessage, err: Throwable, ack: F[Unit], uninsertable: Queue[F, BadRow]) = {
val info = FailureDetails.LoaderRecoveryError.ParsingError(err.toString, Nil)
val failure = Failure.LoaderRecoveryFailure(info)
