Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BE: Chore: Polishing #553

Merged
merged 1 commit into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ public class AzureEntraLoginCallbackHandler implements AuthenticateCallbackHandl
private TokenRequestContext tokenRequestContext;

@Override
public void configure(
Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
public void configure(Map<String, ?> configs,
String mechanism,
List<AppConfigurationEntry> jaasConfigEntries) {
tokenRequestContext = buildTokenRequestContext(configs);
}

Expand All @@ -45,16 +46,17 @@ private TokenRequestContext buildTokenRequestContext(Map<String, ?> configs) {
return request;
}

@SuppressWarnings("unchecked")
private URI buildEventHubsServerUri(Map<String, ?> configs) {
final List<String> bootstrapServers = (List<String>) configs.get(BOOTSTRAP_SERVERS_CONFIG);

if (null == bootstrapServers) {
if (bootstrapServers == null) {
final String message = BOOTSTRAP_SERVERS_CONFIG + " is missing from the Kafka configuration.";
log.error(message);
throw new IllegalArgumentException(message);
}

if (bootstrapServers.size() != 1) {
if (bootstrapServers.size() > 1) {
final String message =
BOOTSTRAP_SERVERS_CONFIG
+ " contains multiple bootstrap servers. Only a single bootstrap server is supported.";
Expand All @@ -72,11 +74,10 @@ private String buildTokenAudience(URI uri) {
@Override
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (callback instanceof OAuthBearerTokenCallback oauthCallback) {
handleOAuthCallback(oauthCallback);
} else {
if (!(callback instanceof OAuthBearerTokenCallback oauthCallback)) {
throw new UnsupportedCallbackException(callback);
}
handleOAuthCallback(oauthCallback);
}
}

Expand All @@ -91,7 +92,7 @@ private void handleOAuthCallback(OAuthBearerTokenCallback oauthCallback) {
.block();

oauthCallback.token(token);
} catch (final RuntimeException e) {
} catch (RuntimeException e) {
final String message =
"Failed to acquire Azure token for Event Hub Authentication. "
+ "Please ensure valid Azure credentials are configured.";
Expand All @@ -104,7 +105,7 @@ public void close() {
// NOOP
}

void setTokenCredential(final TokenCredential tokenCredential) {
void setTokenCredential(TokenCredential tokenCredential) {
AzureEntraLoginCallbackHandler.tokenCredential = tokenCredential;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.nimbusds.jwt.JWTParser;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.SaslAuthenticationException;
Expand All @@ -14,7 +13,6 @@
public class AzureEntraOAuthBearerToken implements OAuthBearerToken {

private final AccessToken accessToken;

private final JWTClaimsSet claims;

public AzureEntraOAuthBearerToken(AccessToken accessToken) {
Expand Down Expand Up @@ -48,7 +46,9 @@ public Set<String> scope() {
// https://docs.microsoft.com/azure/active-directory/develop/access-tokens#payload-claims, the
// scp
// claim is a String which is presented as a space separated list.
return Arrays.stream(((String) claims.getClaim("scp")).split(" ")).collect(Collectors.toSet());
return Arrays
.stream(((String) claims.getClaim("scp")).split(" "))
.collect(Collectors.toSet());
}

@Override
Expand Down
Loading