From 5050bb6f3f591d5c10feccd288d988bda03ecf07 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Sat, 4 Dec 2021 20:41:23 +0000 Subject: [PATCH] PubSubSink use timeouts to set unhealthy sink (close #191) --- .../sinks/GooglePubSubSink.scala | 56 ++++++++++++++++--- 1 file changed, 47 insertions(+), 9 deletions(-) diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala index 4450985cf..964f7884c 100644 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/GooglePubSubSink.scala @@ -14,6 +14,8 @@ package com.snowplowanalytics.snowplow.collectors.scalastream package sinks import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicReference +import java.time.Instant import com.google.api.core.{ApiFutureCallback, ApiFutures} import com.google.api.gax.batching.BatchingSettings import com.google.api.gax.retrying.RetrySettings @@ -33,16 +35,32 @@ import com.snowplowanalytics.snowplow.collectors.scalastream.model._ /** * Google PubSub Sink for the Scala Stream Collector */ -class GooglePubSubSink private (publisher: Publisher, topicName: String, throttler: Sink.Throttler) - extends Sink.Throttled(throttler) { +class GooglePubSubSink private ( + publisher: Publisher, + topicName: String, + throttler: Sink.Throttler, + healthTimeout: Duration +) extends Sink.Throttled(throttler) { + import GooglePubSubSink.OutageMonitor + private val logExecutor = Executors.newSingleThreadExecutor() // maximum size of a pubsub message is 10MB override val MaxBytes: Int = 10000000 + private val outage = new AtomicReference(OutageMonitor(true, None)) + // Is the collector detecting an outage downstream - @volatile private var outage: Boolean = false - override def isHealthy: Boolean = !outage + override def isHealthy: Boolean = { + val cutoff = Instant.now.minusMillis(healthTimeout.toMillis) + outage.get match { + case OutageMonitor(_, Some(outstanding)) if outstanding.isBefore(cutoff) => + // No message has been recently published, but one is overdue + false + case OutageMonitor(healthy, _) => + healthy + } + } /** * Store raw events in the PubSub topic @@ -52,19 +70,17 @@ class GooglePubSubSink private (publisher: Publisher, topicName: String, throttl override def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit = { if (events.nonEmpty) log.debug(s"Writing ${events.size} Thrift records to Google PubSub topic $topicName.") - events.foreach { event => + val futures = events.map { event => val future = publisher.publish(eventToPubsubMessage(event)) ApiFutures.addCallback( future, new ApiFutureCallback[String]() { override def onSuccess(messageId: String): Unit = { - outage = false log.debug(s"Successfully published event with id $messageId to $topicName.") onComplete(event.size.toLong) } override def onFailure(throwable: Throwable): Unit = { - outage = true throwable match { case apiEx: ApiException => log.error( @@ -72,11 +88,29 @@ class GooglePubSubSink private (publisher: Publisher, topicName: String, throttl ) case t => log.error(s"Publishing message to $topicName failed with ${t.getMessage}.") } + outage.set(OutageMonitor(false, None)) onComplete(event.size.toLong) } }, logExecutor ) + future + } + + if (events.nonEmpty) { + outage.getAndUpdate { + case OutageMonitor(healthy, None) => OutageMonitor(healthy, Some(Instant.now)) + case other => other + } + ApiFutures.addCallback( + ApiFutures.allAsList(futures.asJava), + new ApiFutureCallback[java.util.List[String]]() { + override def onSuccess(i: java.util.List[String]): Unit = + outage.set(OutageMonitor(true, None)) + override def onFailure(throwable: Throwable): Unit = () + }, + logExecutor + ) } } @@ -106,7 +140,7 @@ object GooglePubSubSink { if (b) ().asRight else new IllegalArgumentException(s"Google PubSub topic $topicName doesn't exist").asLeft } else ().asRight - } yield new GooglePubSubSink(publisher, topicName, throttler) + } yield new GooglePubSubSink(publisher, topicName, throttler, InitialRpcTimeout) private val UserAgent = s"snowplow/stream-collector-${generated.BuildInfo.version}" @@ -138,6 +172,8 @@ object GooglePubSubSink { .setDelayThreshold(Duration.ofMillis(bufferConfig.timeLimit)) .build() + val InitialRpcTimeout = Duration.ofMillis(10000) + /** Defaults are used for the rpc configuration, see Publisher.java */ private def retrySettings(backoffPolicy: GooglePubSubBackoffPolicyConfig): RetrySettings = RetrySettings @@ -146,7 +182,7 @@ object GooglePubSubSink { .setMaxRetryDelay(Duration.ofMillis(backoffPolicy.maxBackoff)) .setRetryDelayMultiplier(backoffPolicy.multiplier) .setTotalTimeout(Duration.ofMillis(backoffPolicy.totalBackoff)) - .setInitialRpcTimeout(Duration.ofSeconds(10)) + .setInitialRpcTimeout(InitialRpcTimeout) .setRpcTimeoutMultiplier(2) .setMaxRpcTimeout(Duration.ofSeconds(10)) .build() @@ -161,4 +197,6 @@ object GooglePubSubSink { exists = topics.map(_.getName).exists(_.contains(topicName)) _ <- Either.catchNonFatal(topicAdminClient.close()) } yield exists + + private case class OutageMonitor(healthy: Boolean, outstanding: Option[Instant]) }