From 4a66666807c3a227d445cd7c59eec50f3422b36e Mon Sep 17 00:00:00 2001 From: Julien Chanaud Date: Thu, 21 Oct 2021 00:15:22 +0200 Subject: [PATCH] KafkaStream resource must validate against Topic and ConsumerGroup ACL (#122) * Change Stream ACL behavior * Added tests --- .../controllers/StreamController.java | 8 +- .../ns4kafka/services/StreamService.java | 13 ++- .../AccessControlEntryAsyncExecutor.java | 5 - .../controllers/StreamControllerTest.java | 16 +-- .../ns4kafka/services/StreamServiceTest.java | 105 +++++++++++++++++- 5 files changed, 123 insertions(+), 24 deletions(-) diff --git a/api/src/main/java/com/michelin/ns4kafka/controllers/StreamController.java b/api/src/main/java/com/michelin/ns4kafka/controllers/StreamController.java index a646fbb1..92c83c98 100644 --- a/api/src/main/java/com/michelin/ns4kafka/controllers/StreamController.java +++ b/api/src/main/java/com/michelin/ns4kafka/controllers/StreamController.java @@ -42,9 +42,9 @@ HttpResponse apply(String namespace,@Body @Valid KafkaStream stream Namespace ns = getNamespace(namespace); //Creation of the correct ACLs - if (!streamService.isNamespaceOwnerOfStream(namespace, stream.getMetadata().getName())) { + if (!streamService.isNamespaceOwnerOfKafkaStream(ns, stream.getMetadata().getName())) { throw new ResourceValidationException(List.of("Invalid value " + stream.getMetadata().getName() - + " for name: Namespace not OWNER of this stream"), "Stream", stream.getMetadata().getName()); + + " for name: Namespace not OWNER of underlying Topic prefix and Group prefix"), "Stream", stream.getMetadata().getName()); } //Augment the Stream stream.getMetadata().setCreationTimestamp(Date.from(Instant.now())); @@ -77,9 +77,9 @@ HttpResponse apply(String namespace,@Body @Valid KafkaStream stream HttpResponse delete(String namespace,String stream, @QueryValue(defaultValue = "false") boolean dryrun){ Namespace ns = getNamespace(namespace); - if (!streamService.isNamespaceOwnerOfStream(namespace, stream)) { + if (!streamService.isNamespaceOwnerOfKafkaStream(ns, stream)) { throw new ResourceValidationException(List.of("Invalid value " + stream - + " for name: Namespace not OWNER of this stream"), "Stream", stream); + + " for name: Namespace not OWNER of underlying Topic prefix and Group prefix"), "Stream", stream); } // exists ? Optional optionalStream = streamService.findByName(ns, stream); diff --git a/api/src/main/java/com/michelin/ns4kafka/services/StreamService.java b/api/src/main/java/com/michelin/ns4kafka/services/StreamService.java index 25ef4352..232dfab2 100644 --- a/api/src/main/java/com/michelin/ns4kafka/services/StreamService.java +++ b/api/src/main/java/com/michelin/ns4kafka/services/StreamService.java @@ -29,8 +29,17 @@ public Optional findByName(Namespace namespace, String stream) { .findFirst(); } - public boolean isNamespaceOwnerOfStream(String namespace, String stream) { - return accessControlEntryService.isNamespaceOwnerOfResource(namespace, AccessControlEntry.ResourceType.TOPIC, stream); + public boolean isNamespaceOwnerOfKafkaStream(Namespace namespace, String resource) { + // KafkaStream Ownership is determined by both Topic and Group ownership on PREFIXED resource, + // this is because KafkaStream application.id is a consumergroup but also a prefix for internal topic names + return accessControlEntryService.findAllGrantedToNamespace(namespace) + .stream() + .filter(accessControlEntry -> accessControlEntry.getSpec().getPermission() == AccessControlEntry.Permission.OWNER) + .filter(accessControlEntry -> accessControlEntry.getSpec().getResourcePatternType() == AccessControlEntry.ResourcePatternType.PREFIXED) + .filter(accessControlEntry -> resource.startsWith(accessControlEntry.getSpec().getResource())) + .map(accessControlEntry -> accessControlEntry.getSpec().getResourceType()) + .collect(Collectors.toList()) + .containsAll(List.of(AccessControlEntry.ResourceType.TOPIC, AccessControlEntry.ResourceType.GROUP)); } public KafkaStream create(KafkaStream stream) { diff --git a/api/src/main/java/com/michelin/ns4kafka/services/executors/AccessControlEntryAsyncExecutor.java b/api/src/main/java/com/michelin/ns4kafka/services/executors/AccessControlEntryAsyncExecutor.java index ebd817e0..f69aa332 100644 --- a/api/src/main/java/com/michelin/ns4kafka/services/executors/AccessControlEntryAsyncExecutor.java +++ b/api/src/main/java/com/michelin/ns4kafka/services/executors/AccessControlEntryAsyncExecutor.java @@ -206,11 +206,6 @@ private List buildAclBindingsFromKafkaStream(KafkaStream stream, Str new AclBinding( new ResourcePattern(ResourceType.TOPIC, stream.getMetadata().getName(), PatternType.PREFIXED), new org.apache.kafka.common.acl.AccessControlEntry("User:" + kafkaUser, "*", AclOperation.DELETE, AclPermissionType.ALLOW) - ), - // READ on application.id group - new AclBinding( - new ResourcePattern(ResourceType.GROUP, stream.getMetadata().getName(), PatternType.PREFIXED), - new org.apache.kafka.common.acl.AccessControlEntry("User:" + kafkaUser, "*", AclOperation.READ, AclPermissionType.ALLOW) ) ); } diff --git a/api/src/test/java/com/michelin/ns4kafka/controllers/StreamControllerTest.java b/api/src/test/java/com/michelin/ns4kafka/controllers/StreamControllerTest.java index b76b5ca8..d6252275 100644 --- a/api/src/test/java/com/michelin/ns4kafka/controllers/StreamControllerTest.java +++ b/api/src/test/java/com/michelin/ns4kafka/controllers/StreamControllerTest.java @@ -146,7 +146,7 @@ void createStreamSuccess() { Mockito.when(namespaceService.findByName("test")) .thenReturn(Optional.of(ns)); - Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1")) + Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) .thenReturn(true); when(streamService.findByName(ns, "test_stream1")) @@ -182,7 +182,7 @@ void createStreamSuccessDryRun() { Mockito.when(namespaceService.findByName("test")) .thenReturn(Optional.of(ns)); - Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1")) + Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) .thenReturn(true); when(streamService.findByName(ns, "test_stream1")) @@ -213,7 +213,7 @@ void updateStreamUnchanged() { Mockito.when(namespaceService.findByName("test")) .thenReturn(Optional.of(ns)); - Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1")) + Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) .thenReturn(true); when(streamService.findByName(ns, "test_stream1")) @@ -244,7 +244,7 @@ void createStreamValidationError() { Mockito.when(namespaceService.findByName("test")) .thenReturn(Optional.of(ns)); - Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1")) + Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) .thenReturn(false); ResourceValidationException actual = Assertions.assertThrows(ResourceValidationException.class, () -> streamController.apply("test", stream1, false)); Mockito.verify(streamService, never()).create(any()); @@ -268,7 +268,7 @@ void deleteStreamSuccess() { Mockito.when(namespaceService.findByName("test")) .thenReturn(Optional.of(ns)); - Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1")) + Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) .thenReturn(true); when(streamService.findByName(ns, "test_stream1")) @@ -300,7 +300,7 @@ void deleteStreamSuccessDryRun() { Mockito.when(namespaceService.findByName("test")) .thenReturn(Optional.of(ns)); - Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1")) + Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) .thenReturn(true); when(streamService.findByName(ns, "test_stream1")) @@ -329,7 +329,7 @@ void deleteStreamNotFound() { Mockito.when(namespaceService.findByName("test")) .thenReturn(Optional.of(ns)); - Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1")) + Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) .thenReturn(true); when(streamService.findByName(ns, "test_stream1")) @@ -352,7 +352,7 @@ void deleteStreamNotOwner() { Mockito.when(namespaceService.findByName("test")) .thenReturn(Optional.of(ns)); - Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1")) + Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) .thenReturn(false); ResourceValidationException actual = Assertions.assertThrows(ResourceValidationException.class, () -> streamController.delete("test", "test_stream1", false)); diff --git a/api/src/test/java/com/michelin/ns4kafka/services/StreamServiceTest.java b/api/src/test/java/com/michelin/ns4kafka/services/StreamServiceTest.java index 564dc119..06eaed4b 100644 --- a/api/src/test/java/com/michelin/ns4kafka/services/StreamServiceTest.java +++ b/api/src/test/java/com/michelin/ns4kafka/services/StreamServiceTest.java @@ -1,14 +1,10 @@ package com.michelin.ns4kafka.services; -import static org.mockito.Mockito.when; - -import java.util.List; - +import com.michelin.ns4kafka.models.AccessControlEntry; import com.michelin.ns4kafka.models.KafkaStream; import com.michelin.ns4kafka.models.Namespace; import com.michelin.ns4kafka.models.ObjectMeta; import com.michelin.ns4kafka.repositories.StreamRepository; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -16,6 +12,10 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.util.List; + +import static org.mockito.Mockito.when; + @ExtendWith(MockitoExtension.class) public class StreamServiceTest { @@ -140,4 +140,99 @@ void findByNameEmpty() { var actual = streamService.findByName(ns, "test_stream2"); Assertions.assertTrue(actual.isEmpty()); } + + @Test + void isNamespaceOwnerOfKafkaStream() { + Namespace ns = Namespace.builder() + .metadata(ObjectMeta.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + AccessControlEntry ace1 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.OWNER) + .resource("test.") + .grantedTo("test") + .build() + ) + .build(); + AccessControlEntry ace2 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.GROUP) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.OWNER) + .resource("test.") + .grantedTo("test") + .build() + ) + .build(); + AccessControlEntry ace3 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.OWNER) + .resource("test.") + .grantedTo("test") + .build() + ) + .build(); + AccessControlEntry ace4 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .permission(AccessControlEntry.Permission.OWNER) + .resource("test-bis.") + .grantedTo("test") + .build() + ) + .build(); + AccessControlEntry ace5 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.GROUP) + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .permission(AccessControlEntry.Permission.OWNER) + .resource("test-bis.") + .grantedTo("test") + .build() + ) + .build(); + AccessControlEntry ace6 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.GROUP) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.OWNER) + .resource("test-ter.") + .grantedTo("test") + .build() + ) + .build(); + AccessControlEntry ace7 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.OWNER) + .resource("test-qua.") + .grantedTo("test") + .build() + ) + .build(); + + when(accessControlEntryService.findAllGrantedToNamespace(ns)) + .thenReturn(List.of(ace1, ace2, ace3, ace4, ace5, ace6, ace7)); + + Assertions.assertTrue( + streamService.isNamespaceOwnerOfKafkaStream(ns, "test.stream")); + Assertions.assertFalse( + streamService.isNamespaceOwnerOfKafkaStream(ns, "test-bis.stream"),"ACL are LITERAL"); + Assertions.assertFalse( + streamService.isNamespaceOwnerOfKafkaStream(ns, "test-ter.stream"), "Topic ACL missing"); + Assertions.assertFalse( + streamService.isNamespaceOwnerOfKafkaStream(ns, "test-qua.stream"),"Group ACL missing"); + Assertions.assertFalse( + streamService.isNamespaceOwnerOfKafkaStream(ns, "test-nop.stream"),"No ACL"); + } }