Skip to content

Commit

Permalink
collector-kafka: authenticate with Event Hubs using OAuth2 (close #401)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes authored and peel committed Feb 22, 2024
1 parent aee5608 commit 3d24905
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 25 deletions.
8 changes: 8 additions & 0 deletions kafka/src/it/resources/collector.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@ collector {
name = ${TOPIC_GOOD}
brokers = ${BROKER}
maxBytes = ${MAX_BYTES}
producerConf = {
"security.protocol" = "PLAINTEXT"
"sasl.mechanism" = "GSSAPI"
}
}
bad {
name = ${TOPIC_BAD}
brokers = ${BROKER}
maxBytes = ${MAX_BYTES}
producerConf = {
"security.protocol" = "PLAINTEXT"
"sasl.mechanism" = "GSSAPI"
}
}
}
}
5 changes: 5 additions & 0 deletions kafka/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ collector {
retries = 10
maxBytes = 1000000
buffer = ${collector.streams.buffer}
producerConf = {
"security.protocol" = "SASL_SSL"
"sasl.mechanism" = "OAUTHBEARER"
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
}
}

//Legacy style
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@ object KafkaCollector extends App[KafkaSinkConfig](BuildInfo) {

override def mkSinks(config: Config.Streams[KafkaSinkConfig]): Resource[IO, Sinks[IO]] =
for {
good <- KafkaSink.create[IO](config.good)
bad <- KafkaSink.create[IO](config.bad)
good <- KafkaSink.create[IO](
config.good,
classOf[GoodAzureAuthenticationCallbackHandler].getName
)
bad <- KafkaSink.create[IO](
config.bad,
classOf[BadAzureAuthenticationCallbackHandler].getName
)
} yield Sinks(good, bad)

override def telemetryInfo(config: Config.Streams[KafkaSinkConfig]): IO[Telemetry.TelemetryInfo] =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/**
* Copyright (c) 2013-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/
package com.snowplowanalytics.snowplow.collectors.scalastream
package sinks

import java.net.URI
import java.{lang, util}

import javax.security.auth.callback.Callback
import javax.security.auth.callback.UnsupportedCallbackException
import javax.security.auth.login.AppConfigurationEntry

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback

import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.core.credential.TokenRequestContext

import com.nimbusds.jwt.JWTParser

// We need separate instances of callback handler with good and bad sink because
// they need different tokens to authenticate. However we are only giving class name to
// Kafka and it initializes the class itself and if we pass same class name for both sinks,
// Kafka initializes and uses only one instance of the callback handler. To create two
// separate instances, we created two different classes and pass their names to respective
// sink's properties. With this way, both sinks will have their own callback handler instance.
class GoodAzureAuthenticationCallbackHandler extends AzureAuthenticationCallbackHandler

class BadAzureAuthenticationCallbackHandler extends AzureAuthenticationCallbackHandler

class AzureAuthenticationCallbackHandler extends AuthenticateCallbackHandler {

val credentials = new DefaultAzureCredentialBuilder().build()

var sbUri: String = ""

override def configure(
configs: util.Map[String, _],
saslMechanism: String,
jaasConfigEntries: util.List[AppConfigurationEntry]
): Unit = {
val bootstrapServer =
configs
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
.toString
.replaceAll("\\[|\\]", "")
.split(",")
.toList
.headOption match {
case Some(s) => s
case None => throw new Exception("Empty bootstrap servers list")
}
val uri = URI.create("https://" + bootstrapServer)
// Workload identity works with '.default' scope
this.sbUri = s"${uri.getScheme}://${uri.getHost}/.default"
}

override def handle(callbacks: Array[Callback]): Unit =
callbacks.foreach {
case callback: OAuthBearerTokenCallback =>
val token = getOAuthBearerToken()
callback.token(token)
case callback => throw new UnsupportedCallbackException(callback)
}

def getOAuthBearerToken(): OAuthBearerToken = {
val reqContext = new TokenRequestContext()
reqContext.addScopes(sbUri)
val accessToken = credentials.getTokenSync(reqContext).getToken
val jwt = JWTParser.parse(accessToken)
val claims = jwt.getJWTClaimsSet

new OAuthBearerToken {
override def value(): String = accessToken

override def lifetimeMs(): Long = claims.getExpirationTime.getTime

override def scope(): util.Set[String] = null

override def principalName(): String = null

override def startTimeMs(): lang.Long = null
}
}

override def close(): Unit = ()
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ class KafkaSink[F[_]: Sync](
object KafkaSink {

def create[F[_]: Sync](
sinkConfig: Config.Sink[KafkaSinkConfig]
sinkConfig: Config.Sink[KafkaSinkConfig],
authCallbackClass: String
): Resource[F, KafkaSink[F]] =
for {
kafkaProducer <- createProducer(sinkConfig.config, sinkConfig.buffer)
kafkaProducer <- createProducer(sinkConfig.config, sinkConfig.buffer, authCallbackClass)
kafkaSink = new KafkaSink(sinkConfig.config.maxBytes, kafkaProducer, sinkConfig.name)
} yield kafkaSink

Expand All @@ -77,7 +78,8 @@ object KafkaSink {
*/
private def createProducer[F[_]: Sync](
kafkaConfig: KafkaSinkConfig,
bufferConfig: Config.Buffer
bufferConfig: Config.Buffer,
authCallbackClass: String
): Resource[F, KafkaProducer[String, Array[Byte]]] = {
val acquire = Sync[F].delay {
val props = new Properties()
Expand All @@ -88,6 +90,7 @@ object KafkaSink {
props.setProperty("linger.ms", bufferConfig.timeLimit.toString)
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.setProperty("sasl.login.callback.handler.class", authCallbackClass)

// Can't use `putAll` in JDK 11 because of https://github.com/scala/bug/issues/10418
kafkaConfig.producerConf.getOrElse(Map()).foreach { case (k, v) => props.setProperty(k, v) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,16 @@ object KafkaConfigSpec {
timeLimit = 5000
),
config = KafkaSinkConfig(
maxBytes = 1000000,
brokers = "localhost:9092,another.host:9092",
retries = 10,
producerConf = None
maxBytes = 1000000,
brokers = "localhost:9092,another.host:9092",
retries = 10,
producerConf = Some(
Map(
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "OAUTHBEARER",
"sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
)
)
)
),
bad = Config.Sink(
Expand All @@ -135,10 +141,16 @@ object KafkaConfigSpec {
timeLimit = 5000
),
config = KafkaSinkConfig(
maxBytes = 1000000,
brokers = "localhost:9092,another.host:9092",
retries = 10,
producerConf = None
maxBytes = 1000000,
brokers = "localhost:9092,another.host:9092",
retries = 10,
producerConf = Some(
Map(
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "OAUTHBEARER",
"sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
)
)
)
),
useIpAddressAsPartitionKey = false
Expand Down
3 changes: 2 additions & 1 deletion project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ object BuildSettings {
libraryDependencies ++= Seq(
Dependencies.Libraries.kafkaClients,
Dependencies.Libraries.mskAuth,

Dependencies.Libraries.azureIdentity,

// integration tests dependencies
Dependencies.Libraries.IntegrationTests.specs2,
Dependencies.Libraries.IntegrationTests.specs2CE
Expand Down
24 changes: 13 additions & 11 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ object Dependencies {
val thrift = "0.15.0" // force this version to mitigate security vulnerabilities
val tracker = "2.0.0"
val dataDog4s = "0.32.0"
val azureIdentity = "1.11.0"
}

object Libraries {
Expand All @@ -63,17 +64,18 @@ object Dependencies {
val datadogStatsd = "com.avast.cloud" %% "datadog4s-statsd" % V.dataDog4s

//sinks
val fs2PubSub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2PubSub
val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % V.jackson
val kafkaClients = "org.apache.kafka" % "kafka-clients" % V.kafka
val kinesis = "com.amazonaws" % "aws-java-sdk-kinesis" % V.awsSdk
val log4j = "org.apache.logging.log4j" % "log4j-core" % V.log4j
val mskAuth = "software.amazon.msk" % "aws-msk-iam-auth" % V.mskAuth % Runtime // Enables AWS MSK IAM authentication https://github.com/snowplow/stream-collector/pull/214
val nettyAll = "io.netty" % "netty-all" % V.nettyAll
val nsqClient = "com.snowplowanalytics" % "nsq-java-client" % V.nsqClient
val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub
val sqs = "com.amazonaws" % "aws-java-sdk-sqs" % V.awsSdk
val sts = "com.amazonaws" % "aws-java-sdk-sts" % V.awsSdk % Runtime // Enables web token authentication https://github.com/snowplow/stream-collector/issues/169
val fs2PubSub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2PubSub
val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % V.jackson
val kafkaClients = "org.apache.kafka" % "kafka-clients" % V.kafka
val kinesis = "com.amazonaws" % "aws-java-sdk-kinesis" % V.awsSdk
val log4j = "org.apache.logging.log4j" % "log4j-core" % V.log4j
val mskAuth = "software.amazon.msk" % "aws-msk-iam-auth" % V.mskAuth % Runtime // Enables AWS MSK IAM authentication https://github.com/snowplow/stream-collector/pull/214
val nettyAll = "io.netty" % "netty-all" % V.nettyAll
val nsqClient = "com.snowplowanalytics" % "nsq-java-client" % V.nsqClient
val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub
val sqs = "com.amazonaws" % "aws-java-sdk-sqs" % V.awsSdk
val sts = "com.amazonaws" % "aws-java-sdk-sts" % V.awsSdk % Runtime // Enables web token authentication https://github.com/snowplow/stream-collector/issues/169
val azureIdentity = "com.azure" % "azure-identity" % V.azureIdentity

//common unit tests
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
Expand Down

0 comments on commit 3d24905

Please sign in to comment.