Skip to content

Commit

Permalink
collector-kafka: authenticate with Event Hubs using OAuth2 via MSI (close #401)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Dec 13, 2023
1 parent 2b4cd29 commit 10796aa
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 20 deletions.
6 changes: 6 additions & 0 deletions kafka/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ collector {
retries = 10
maxBytes = 1000000
buffer = ${collector.streams.buffer}
// Extra Kafka producer properties. These select SASL_SSL with the OAUTHBEARER
// mechanism and name AzureAuthenticationCallbackHandler as the login callback
// handler class.
producerConf = {
"security.protocol" = "SASL_SSL"
"sasl.mechanism" = "OAUTHBEARER"
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
"sasl.login.callback.handler.class": "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
}
}

//Legacy style
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Copyright (c) 2013-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.collectors.scalastream
package sinks

import java.net.URI
import java.{lang, util}

import javax.security.auth.callback.Callback
import javax.security.auth.callback.UnsupportedCallbackException
import javax.security.auth.login.AppConfigurationEntry

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback

import com.microsoft.azure.credentials.MSICredentials

import com.nimbusds.jwt.JWTParser

/**
 * Kafka login callback handler that answers `OAuthBearerTokenCallback`s with an
 * access token obtained through [[MSICredentials]].
 *
 * The resource the token is requested for is derived from the first entry of the
 * client's `bootstrap.servers` setting, as `https://<host>`.
 */
class AzureAuthenticationCallbackHandler extends AuthenticateCallbackHandler {

  /** Credentials used to request access tokens. */
  val credentials: MSICredentials = new MSICredentials()

  /** Resource URI (`https://<host>`) tokens are requested for; empty until `configure` has run. */
  var sbUri: String = ""

  /**
   * Derives the token resource URI from the configured bootstrap servers.
   *
   * @param configs           client configuration; must contain `bootstrap.servers`
   * @param saslMechanism     unused
   * @param jaasConfigEntries unused
   * @throws IllegalArgumentException if `bootstrap.servers` is missing, empty, or has no parsable host
   */
  override def configure(
    configs: util.Map[String, _],
    saslMechanism: String,
    jaasConfigEntries: util.List[AppConfigurationEntry]
  ): Unit = {
    val serversKey = ProducerConfig.BOOTSTRAP_SERVERS_CONFIG

    val rawServers = Option[Any](configs.get(serversKey))
      .map(_.toString)
      .getOrElse(throw new IllegalArgumentException(s"$serversKey must be set to authenticate with Azure"))

    // The value may arrive as a stringified list ("[host1:9093, host2:9093]") or as a
    // comma-separated string. Only one host is needed to identify the resource, so use
    // the first non-empty entry.
    val firstServer = rawServers
      .replaceAll("\\[|\\]", "")
      .split(",")
      .map(_.trim)
      .find(_.nonEmpty)
      .getOrElse(throw new IllegalArgumentException(s"$serversKey does not contain any server"))

    val uri = URI.create("https://" + firstServer)
    val host = Option(uri.getHost)
      .getOrElse(throw new IllegalArgumentException(s"Cannot extract a host from bootstrap server '$firstServer'"))

    this.sbUri = s"${uri.getScheme}://$host"
  }

  /**
   * Supplies a newly requested token to every `OAuthBearerTokenCallback`.
   *
   * @throws UnsupportedCallbackException for any other kind of callback
   */
  override def handle(callbacks: Array[Callback]): Unit =
    callbacks.foreach {
      case callback: OAuthBearerTokenCallback =>
        callback.token(getOAuthBearerToken())
      case callback =>
        throw new UnsupportedCallbackException(callback)
    }

  /**
   * Requests an access token for `sbUri` and wraps it as an `OAuthBearerToken`.
   *
   * The expiry is read from the JWT `exp` claim and the principal name from the `sub` claim.
   *
   * @throws java.io.IOException if the token carries no expiration time
   */
  def getOAuthBearerToken(): OAuthBearerToken = {
    val accessToken = credentials.getToken(sbUri)
    val claims      = JWTParser.parse(accessToken).getJWTClaimsSet

    // Fail here with a clear message rather than with a NullPointerException
    // the first time lifetimeMs() is called.
    val expiresAtMs: Long = Option(claims.getExpirationTime)
      .map(_.getTime)
      .getOrElse(throw new java.io.IOException("Access token does not contain an expiration time"))

    new OAuthBearerToken {
      override def value(): String = accessToken

      // Expiry as epoch milliseconds, taken from the `exp` claim.
      override def lifetimeMs(): Long = expiresAtMs

      // An empty set instead of null: no scope information is extracted from the token.
      override def scope(): util.Set[String] = util.Collections.emptySet[String]()

      // Subject claim of the token; still null when the token has no `sub` claim.
      override def principalName(): String = claims.getSubject

      override def startTimeMs(): lang.Long = null
    }
  }

  /** Nothing to release. */
  override def close(): Unit = ()
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,17 @@ object KafkaConfigSpec {
timeLimit = 5000
),
config = KafkaSinkConfig(
maxBytes = 1000000,
brokers = "localhost:9092,another.host:9092",
retries = 10,
producerConf = None
maxBytes = 1000000,
brokers = "localhost:9092,another.host:9092",
retries = 10,
producerConf = Some(
Map(
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "OAUTHBEARER",
"sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"sasl.login.callback.handler.class" -> "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
)
)
)
),
bad = Config.Sink(
Expand All @@ -119,10 +126,17 @@ object KafkaConfigSpec {
timeLimit = 5000
),
config = KafkaSinkConfig(
maxBytes = 1000000,
brokers = "localhost:9092,another.host:9092",
retries = 10,
producerConf = None
maxBytes = 1000000,
brokers = "localhost:9092,another.host:9092",
retries = 10,
producerConf = Some(
Map(
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "OAUTHBEARER",
"sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
"sasl.login.callback.handler.class" -> "com.snowplowanalytics.snowplow.collectors.scalastream.sinks.AzureAuthenticationCallbackHandler"
)
)
)
),
useIpAddressAsPartitionKey = false
Expand Down
3 changes: 2 additions & 1 deletion project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ object BuildSettings {
libraryDependencies ++= Seq(
Dependencies.Libraries.kafkaClients,
Dependencies.Libraries.mskAuth,

Dependencies.Libraries.azureAuth,

// integration tests dependencies
Dependencies.Libraries.IntegrationTests.specs2,
Dependencies.Libraries.IntegrationTests.specs2CE
Expand Down
24 changes: 13 additions & 11 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ object Dependencies {
val testcontainers = "0.40.10"
val thrift = "0.15.0" // force this version to mitigate security vulnerabilities
val tracker = "2.0.0"
val azureAuth = "1.7.14"
}

object Libraries {
Expand All @@ -58,17 +59,18 @@ object Dependencies {
val trackerCore = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.tracker

//sinks
val fs2PubSub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2PubSub
val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % V.jackson
val kafkaClients = "org.apache.kafka" % "kafka-clients" % V.kafka
val kinesis = "com.amazonaws" % "aws-java-sdk-kinesis" % V.awsSdk
val log4j = "org.apache.logging.log4j" % "log4j-core" % V.log4j
val mskAuth = "software.amazon.msk" % "aws-msk-iam-auth" % V.mskAuth % Runtime // Enables AWS MSK IAM authentication https://github.com/snowplow/stream-collector/pull/214
val nettyAll = "io.netty" % "netty-all" % V.nettyAll
val nsqClient = "com.snowplowanalytics" % "nsq-java-client" % V.nsqClient
val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub
val sqs = "com.amazonaws" % "aws-java-sdk-sqs" % V.awsSdk
val sts = "com.amazonaws" % "aws-java-sdk-sts" % V.awsSdk % Runtime // Enables web token authentication https://github.com/snowplow/stream-collector/issues/169
val fs2PubSub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2PubSub
val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % V.jackson
val kafkaClients = "org.apache.kafka" % "kafka-clients" % V.kafka
val kinesis = "com.amazonaws" % "aws-java-sdk-kinesis" % V.awsSdk
val log4j = "org.apache.logging.log4j" % "log4j-core" % V.log4j
val mskAuth = "software.amazon.msk" % "aws-msk-iam-auth" % V.mskAuth % Runtime // Enables AWS MSK IAM authentication https://github.com/snowplow/stream-collector/pull/214
val nettyAll = "io.netty" % "netty-all" % V.nettyAll
val nsqClient = "com.snowplowanalytics" % "nsq-java-client" % V.nsqClient
val pubsub = "com.google.cloud" % "google-cloud-pubsub" % V.pubsub
val sqs = "com.amazonaws" % "aws-java-sdk-sqs" % V.awsSdk
val sts = "com.amazonaws" % "aws-java-sdk-sts" % V.awsSdk % Runtime // Enables web token authentication https://github.com/snowplow/stream-collector/issues/169
val azureAuth = "com.microsoft.azure" % "azure-client-authentication" % V.azureAuth

//common unit tests
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
Expand Down

0 comments on commit 10796aa

Please sign in to comment.