Skip to content

Commit

Permalink
KafkaStream resource must validate against Topic and ConsumerGroup ACL (
Browse files Browse the repository at this point in the history
#122)

* Change Stream ACL behavior

* Added tests
  • Loading branch information
twobeeb authored Oct 20, 2021
1 parent 69cbdff commit 4a66666
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ HttpResponse<KafkaStream> apply(String namespace,@Body @Valid KafkaStream stream
Namespace ns = getNamespace(namespace);

//Creation of the correct ACLs
if (!streamService.isNamespaceOwnerOfStream(namespace, stream.getMetadata().getName())) {
if (!streamService.isNamespaceOwnerOfKafkaStream(ns, stream.getMetadata().getName())) {
throw new ResourceValidationException(List.of("Invalid value " + stream.getMetadata().getName()
+ " for name: Namespace not OWNER of this stream"), "Stream", stream.getMetadata().getName());
+ " for name: Namespace not OWNER of underlying Topic prefix and Group prefix"), "Stream", stream.getMetadata().getName());
}
//Augment the Stream
stream.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
Expand Down Expand Up @@ -77,9 +77,9 @@ HttpResponse<KafkaStream> apply(String namespace,@Body @Valid KafkaStream stream
HttpResponse delete(String namespace,String stream, @QueryValue(defaultValue = "false") boolean dryrun){

Namespace ns = getNamespace(namespace);
if (!streamService.isNamespaceOwnerOfStream(namespace, stream)) {
if (!streamService.isNamespaceOwnerOfKafkaStream(ns, stream)) {
throw new ResourceValidationException(List.of("Invalid value " + stream
+ " for name: Namespace not OWNER of this stream"), "Stream", stream);
+ " for name: Namespace not OWNER of underlying Topic prefix and Group prefix"), "Stream", stream);
}
// exists ?
Optional<KafkaStream> optionalStream = streamService.findByName(ns, stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,17 @@ public Optional<KafkaStream> findByName(Namespace namespace, String stream) {
.findFirst();
}

public boolean isNamespaceOwnerOfStream(String namespace, String stream) {
return accessControlEntryService.isNamespaceOwnerOfResource(namespace, AccessControlEntry.ResourceType.TOPIC, stream);
public boolean isNamespaceOwnerOfKafkaStream(Namespace namespace, String resource) {
// KafkaStream Ownership is determined by both Topic and Group ownership on PREFIXED resource,
// this is because KafkaStream application.id is a consumergroup but also a prefix for internal topic names
return accessControlEntryService.findAllGrantedToNamespace(namespace)
.stream()
.filter(accessControlEntry -> accessControlEntry.getSpec().getPermission() == AccessControlEntry.Permission.OWNER)
.filter(accessControlEntry -> accessControlEntry.getSpec().getResourcePatternType() == AccessControlEntry.ResourcePatternType.PREFIXED)
.filter(accessControlEntry -> resource.startsWith(accessControlEntry.getSpec().getResource()))
.map(accessControlEntry -> accessControlEntry.getSpec().getResourceType())
.collect(Collectors.toList())
.containsAll(List.of(AccessControlEntry.ResourceType.TOPIC, AccessControlEntry.ResourceType.GROUP));
}

public KafkaStream create(KafkaStream stream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,6 @@ private List<AclBinding> buildAclBindingsFromKafkaStream(KafkaStream stream, Str
new AclBinding(
new ResourcePattern(ResourceType.TOPIC, stream.getMetadata().getName(), PatternType.PREFIXED),
new org.apache.kafka.common.acl.AccessControlEntry("User:" + kafkaUser, "*", AclOperation.DELETE, AclPermissionType.ALLOW)
),
// READ on application.id group
new AclBinding(
new ResourcePattern(ResourceType.GROUP, stream.getMetadata().getName(), PatternType.PREFIXED),
new org.apache.kafka.common.acl.AccessControlEntry("User:" + kafkaUser, "*", AclOperation.READ, AclPermissionType.ALLOW)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void createStreamSuccess() {
Mockito.when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));

Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1"))
Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1"))
.thenReturn(true);

when(streamService.findByName(ns, "test_stream1"))
Expand Down Expand Up @@ -182,7 +182,7 @@ void createStreamSuccessDryRun() {
Mockito.when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));

Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1"))
Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1"))
.thenReturn(true);

when(streamService.findByName(ns, "test_stream1"))
Expand Down Expand Up @@ -213,7 +213,7 @@ void updateStreamUnchanged() {
Mockito.when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));

Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1"))
Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1"))
.thenReturn(true);

when(streamService.findByName(ns, "test_stream1"))
Expand Down Expand Up @@ -244,7 +244,7 @@ void createStreamValidationError() {
Mockito.when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));

Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1"))
Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1"))
.thenReturn(false);
ResourceValidationException actual = Assertions.assertThrows(ResourceValidationException.class, () -> streamController.apply("test", stream1, false));
Mockito.verify(streamService, never()).create(any());
Expand All @@ -268,7 +268,7 @@ void deleteStreamSuccess() {
Mockito.when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));

Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1"))
Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1"))
.thenReturn(true);

when(streamService.findByName(ns, "test_stream1"))
Expand Down Expand Up @@ -300,7 +300,7 @@ void deleteStreamSuccessDryRun() {
Mockito.when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));

Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1"))
Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1"))
.thenReturn(true);

when(streamService.findByName(ns, "test_stream1"))
Expand Down Expand Up @@ -329,7 +329,7 @@ void deleteStreamNotFound() {
Mockito.when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));

Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1"))
Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1"))
.thenReturn(true);

when(streamService.findByName(ns, "test_stream1"))
Expand All @@ -352,7 +352,7 @@ void deleteStreamNotOwner() {
Mockito.when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));

Mockito.when(streamService.isNamespaceOwnerOfStream("test", "test_stream1"))
Mockito.when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1"))
.thenReturn(false);

ResourceValidationException actual = Assertions.assertThrows(ResourceValidationException.class, () -> streamController.delete("test", "test_stream1", false));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package com.michelin.ns4kafka.services;

import static org.mockito.Mockito.when;

import java.util.List;

import com.michelin.ns4kafka.models.AccessControlEntry;
import com.michelin.ns4kafka.models.KafkaStream;
import com.michelin.ns4kafka.models.Namespace;
import com.michelin.ns4kafka.models.ObjectMeta;
import com.michelin.ns4kafka.repositories.StreamRepository;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.List;

import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class StreamServiceTest {

Expand Down Expand Up @@ -140,4 +140,99 @@ void findByNameEmpty() {
var actual = streamService.findByName(ns, "test_stream2");
Assertions.assertTrue(actual.isEmpty());
}

@Test
void isNamespaceOwnerOfKafkaStream() {
Namespace ns = Namespace.builder()
.metadata(ObjectMeta.builder()
.name("test")
.cluster("local")
.build())
.build();

AccessControlEntry ace1 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.TOPIC)
.resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED)
.permission(AccessControlEntry.Permission.OWNER)
.resource("test.")
.grantedTo("test")
.build()
)
.build();
AccessControlEntry ace2 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.GROUP)
.resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED)
.permission(AccessControlEntry.Permission.OWNER)
.resource("test.")
.grantedTo("test")
.build()
)
.build();
AccessControlEntry ace3 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.CONNECT)
.resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED)
.permission(AccessControlEntry.Permission.OWNER)
.resource("test.")
.grantedTo("test")
.build()
)
.build();
AccessControlEntry ace4 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.TOPIC)
.resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL)
.permission(AccessControlEntry.Permission.OWNER)
.resource("test-bis.")
.grantedTo("test")
.build()
)
.build();
AccessControlEntry ace5 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.GROUP)
.resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL)
.permission(AccessControlEntry.Permission.OWNER)
.resource("test-bis.")
.grantedTo("test")
.build()
)
.build();
AccessControlEntry ace6 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.GROUP)
.resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED)
.permission(AccessControlEntry.Permission.OWNER)
.resource("test-ter.")
.grantedTo("test")
.build()
)
.build();
AccessControlEntry ace7 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.TOPIC)
.resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED)
.permission(AccessControlEntry.Permission.OWNER)
.resource("test-qua.")
.grantedTo("test")
.build()
)
.build();

when(accessControlEntryService.findAllGrantedToNamespace(ns))
.thenReturn(List.of(ace1, ace2, ace3, ace4, ace5, ace6, ace7));

Assertions.assertTrue(
streamService.isNamespaceOwnerOfKafkaStream(ns, "test.stream"));
Assertions.assertFalse(
streamService.isNamespaceOwnerOfKafkaStream(ns, "test-bis.stream"),"ACL are LITERAL");
Assertions.assertFalse(
streamService.isNamespaceOwnerOfKafkaStream(ns, "test-ter.stream"), "Topic ACL missing");
Assertions.assertFalse(
streamService.isNamespaceOwnerOfKafkaStream(ns, "test-qua.stream"),"Group ACL missing");
Assertions.assertFalse(
streamService.isNamespaceOwnerOfKafkaStream(ns, "test-nop.stream"),"No ACL");
}
}

0 comments on commit 4a66666

Please sign in to comment.