diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java index 7481acb2ff..81a627cbbe 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java @@ -13,9 +13,6 @@ */ package io.streamnative.pulsar.handlers.kop.security.auth; - -import static com.google.common.base.Preconditions.checkArgument; - import io.streamnative.pulsar.handlers.kop.security.KafkaPrincipal; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; @@ -70,9 +67,7 @@ private CompletableFuture authorizeTenantPermission(KafkaPrincipal prin @Override public CompletableFuture canAccessTenantAsync(KafkaPrincipal principal, Resource resource) { - checkArgument(resource.getResourceType() == ResourceType.TENANT, - String.format("Expected resource type is TENANT, but have [%s]", resource.getResourceType())); - + checkResourceType(resource, ResourceType.TENANT); CompletableFuture canAccessFuture = new CompletableFuture<>(); authorizeTenantPermission(principal, resource).whenComplete((hasPermission, ex) -> { if (ex != null) { @@ -92,9 +87,7 @@ public CompletableFuture canAccessTenantAsync(KafkaPrincipal principal, @Override public CompletableFuture canCreateTopicAsync(KafkaPrincipal principal, Resource resource) { - checkArgument(resource.getResourceType() == ResourceType.TOPIC, - String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); - + checkResourceType(resource, ResourceType.TOPIC); TopicName topicName = TopicName.get(resource.getName()); return authorizationService.allowNamespaceOperationAsync( topicName.getNamespaceObject(), @@ -105,9 +98,7 @@ public CompletableFuture canCreateTopicAsync(KafkaPrincipal principal, @Override public CompletableFuture canDeleteTopicAsync(KafkaPrincipal principal, Resource resource) { - checkArgument(resource.getResourceType() == ResourceType.TOPIC, - String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); - + checkResourceType(resource, ResourceType.TOPIC); TopicName topicName = TopicName.get(resource.getName()); return authorizationService.allowNamespaceOperationAsync( topicName.getNamespaceObject(), @@ -118,9 +109,7 @@ public CompletableFuture canDeleteTopicAsync(KafkaPrincipal principal, @Override public CompletableFuture canAlterTopicAsync(KafkaPrincipal principal, Resource resource) { - checkArgument(resource.getResourceType() == ResourceType.TOPIC, - String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); - + checkResourceType(resource, ResourceType.TOPIC); TopicName topicName = TopicName.get(resource.getName()); return authorizationService.allowTopicPolicyOperationAsync( topicName, PolicyName.PARTITION, PolicyOperation.WRITE, @@ -129,9 +118,7 @@ public CompletableFuture canAlterTopicAsync(KafkaPrincipal principal, R @Override public CompletableFuture canManageTenantAsync(KafkaPrincipal principal, Resource resource) { - checkArgument(resource.getResourceType() == ResourceType.TOPIC, - String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); - + checkResourceType(resource, ResourceType.TOPIC); TopicName topicName = TopicName.get(resource.getName()); return authorizationService.allowTopicOperationAsync( topicName, TopicOperation.LOOKUP, principal.getName(), principal.getAuthenticationData()); @@ -139,16 +126,14 @@ public CompletableFuture canManageTenantAsync(KafkaPrincipal principal, @Override public CompletableFuture canLookupAsync(KafkaPrincipal principal, Resource resource) { - checkArgument(resource.getResourceType() == ResourceType.TOPIC, - String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); + checkResourceType(resource, ResourceType.TOPIC); TopicName topicName = TopicName.get(resource.getName()); return authorizationService.canLookupAsync(topicName, principal.getName(), principal.getAuthenticationData()); } @Override public CompletableFuture canGetTopicList(KafkaPrincipal principal, Resource resource) { - checkArgument(resource.getResourceType() == ResourceType.NAMESPACE, - String.format("Expected resource type is NAMESPACE, but have [%s]", resource.getResourceType())); + checkResourceType(resource, ResourceType.NAMESPACE); return authorizationService.allowNamespaceOperationAsync( NamespaceName.get(resource.getName()), NamespaceOperation.GET_TOPICS, @@ -158,16 +143,14 @@ public CompletableFuture canGetTopicList(KafkaPrincipal principal, Reso @Override public CompletableFuture canProduceAsync(KafkaPrincipal principal, Resource resource) { - checkArgument(resource.getResourceType() == ResourceType.TOPIC, - String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); + checkResourceType(resource, ResourceType.TOPIC); TopicName topicName = TopicName.get(resource.getName()); return authorizationService.canProduceAsync(topicName, principal.getName(), principal.getAuthenticationData()); } @Override public CompletableFuture canConsumeAsync(KafkaPrincipal principal, Resource resource) { - checkArgument(resource.getResourceType() == ResourceType.TOPIC, - String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType())); + checkResourceType(resource, ResourceType.TOPIC); TopicName topicName = TopicName.get(resource.getName()); return authorizationService.canConsumeAsync( topicName, principal.getName(), principal.getAuthenticationData(), ""); @@ -179,9 +162,16 @@ public CompletableFuture canConsumeAsync(KafkaPrincipal principal, Reso */ @Override public CompletableFuture canDeleteGroupAsync(KafkaPrincipal principal, Resource resource) { - checkArgument(resource.getResourceType() == ResourceType.GROUP, - String.format("Expected resource type is GROUP, but have [%s]", resource.getResourceType())); + checkResourceType(resource, ResourceType.GROUP); return authorizationService.isSuperUser(principal.getName(), principal.getAuthenticationData()); } + private void checkResourceType(Resource actual, ResourceType expected) { + if (actual.getResourceType() != expected) { + throw new IllegalArgumentException( + String.format("Expected resource type is [%s], but have [%s]", + expected, actual.getResourceType())); + } + } + }