Skip to content

Commit

Permalink
PubSubSink use timeouts to set unhealthy sink (close #191)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Dec 6, 2021
1 parent b7bcfe3 commit 5050bb6
Showing 1 changed file with 47 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ package com.snowplowanalytics.snowplow.collectors.scalastream
package sinks

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference
import java.time.Instant
import com.google.api.core.{ApiFutureCallback, ApiFutures}
import com.google.api.gax.batching.BatchingSettings
import com.google.api.gax.retrying.RetrySettings
Expand All @@ -33,16 +35,32 @@ import com.snowplowanalytics.snowplow.collectors.scalastream.model._
/**
* Google PubSub Sink for the Scala Stream Collector
*/
class GooglePubSubSink private (publisher: Publisher, topicName: String, throttler: Sink.Throttler)
extends Sink.Throttled(throttler) {
class GooglePubSubSink private (
publisher: Publisher,
topicName: String,
throttler: Sink.Throttler,
healthTimeout: Duration
) extends Sink.Throttled(throttler) {
import GooglePubSubSink.OutageMonitor

private val logExecutor = Executors.newSingleThreadExecutor()

// maximum size of a pubsub message is 10MB
override val MaxBytes: Int = 10000000

private val outage = new AtomicReference(OutageMonitor(true, None))

// Is the collector detecting an outage downstream
@volatile private var outage: Boolean = false
override def isHealthy: Boolean = !outage
override def isHealthy: Boolean = {
val cutoff = Instant.now.minusMillis(healthTimeout.toMillis)
outage.get match {
case OutageMonitor(_, Some(outstanding)) if outstanding.isBefore(cutoff) =>
// No message has been recently published, but one is overdue
false
case OutageMonitor(healthy, _) =>
healthy
}
}

/**
* Store raw events in the PubSub topic
Expand All @@ -52,31 +70,47 @@ class GooglePubSubSink private (publisher: Publisher, topicName: String, throttl
override def storeRawEventsThrottled(events: List[Array[Byte]], key: String): Unit = {
if (events.nonEmpty)
log.debug(s"Writing ${events.size} Thrift records to Google PubSub topic $topicName.")
events.foreach { event =>
val futures = events.map { event =>
val future = publisher.publish(eventToPubsubMessage(event))
ApiFutures.addCallback(
future,
new ApiFutureCallback[String]() {
override def onSuccess(messageId: String): Unit = {
outage = false
log.debug(s"Successfully published event with id $messageId to $topicName.")
onComplete(event.size.toLong)
}

override def onFailure(throwable: Throwable): Unit = {
outage = true
throwable match {
case apiEx: ApiException =>
log.error(
s"Publishing message to $topicName failed with code ${apiEx.getStatusCode}: ${apiEx.getMessage} This error is retryable: ${apiEx.isRetryable}."
)
case t => log.error(s"Publishing message to $topicName failed with ${t.getMessage}.")
}
outage.set(OutageMonitor(false, None))
onComplete(event.size.toLong)
}
},
logExecutor
)
future
}

if (events.nonEmpty) {
outage.getAndUpdate {
case OutageMonitor(healthy, None) => OutageMonitor(healthy, Some(Instant.now))
case other => other
}
ApiFutures.addCallback(
ApiFutures.allAsList(futures.asJava),
new ApiFutureCallback[java.util.List[String]]() {
override def onSuccess(i: java.util.List[String]): Unit =
outage.set(OutageMonitor(true, None))
override def onFailure(throwable: Throwable): Unit = ()
},
logExecutor
)
}
}

Expand Down Expand Up @@ -106,7 +140,7 @@ object GooglePubSubSink {
if (b) ().asRight
else new IllegalArgumentException(s"Google PubSub topic $topicName doesn't exist").asLeft
} else ().asRight
} yield new GooglePubSubSink(publisher, topicName, throttler)
} yield new GooglePubSubSink(publisher, topicName, throttler, InitialRpcTimeout)

private val UserAgent = s"snowplow/stream-collector-${generated.BuildInfo.version}"

Expand Down Expand Up @@ -138,6 +172,8 @@ object GooglePubSubSink {
.setDelayThreshold(Duration.ofMillis(bufferConfig.timeLimit))
.build()

val InitialRpcTimeout = Duration.ofMillis(10000)

/** Defaults are used for the rpc configuration, see Publisher.java */
private def retrySettings(backoffPolicy: GooglePubSubBackoffPolicyConfig): RetrySettings =
RetrySettings
Expand All @@ -146,7 +182,7 @@ object GooglePubSubSink {
.setMaxRetryDelay(Duration.ofMillis(backoffPolicy.maxBackoff))
.setRetryDelayMultiplier(backoffPolicy.multiplier)
.setTotalTimeout(Duration.ofMillis(backoffPolicy.totalBackoff))
.setInitialRpcTimeout(Duration.ofSeconds(10))
.setInitialRpcTimeout(InitialRpcTimeout)
.setRpcTimeoutMultiplier(2)
.setMaxRpcTimeout(Duration.ofSeconds(10))
.build()
Expand All @@ -161,4 +197,6 @@ object GooglePubSubSink {
exists = topics.map(_.getName).exists(_.contains(topicName))
_ <- Either.catchNonFatal(topicAdminClient.close())
} yield exists

private case class OutageMonitor(healthy: Boolean, outstanding: Option[Instant])
}

0 comments on commit 5050bb6

Please sign in to comment.