diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 32667fcf1eb13..b1a7190b823ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -2435,7 +2435,8 @@ public void getRetention(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetRetention(applied, isGlobal)) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { @@ -2462,7 +2463,8 @@ public void setRetention(@Suspended final AsyncResponse asyncResponse, @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, @ApiParam(value = "Retention policies for the specified topic") RetentionPolicies retention) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetRetention(retention, isGlobal)) .thenRun(() -> { try { @@ -2498,7 +2500,8 @@ public void removeRetention(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.RETENTION, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemoveRetention(isGlobal)) .thenRun(() -> { log.info("[{}] Successfully remove retention: namespace={}, topic={}", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/MockedPulsarStandalone.java b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java similarity index 76% rename from pulsar-broker/src/test/java/org/apache/pulsar/security/tls/MockedPulsarStandalone.java rename to pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java index 1a7e806f0e698..2eebfec941efd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/MockedPulsarStandalone.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/security/MockedPulsarStandalone.java @@ -16,25 +16,36 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.security.tls; +package org.apache.pulsar.security; import static org.apache.pulsar.utils.ResourceUtils.getAbsolutePath; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import javax.crypto.SecretKey; import lombok.Getter; import lombok.SneakyThrows; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls; import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.util.ObjectMapperFactory; + public abstract class MockedPulsarStandalone implements AutoCloseable { @@ -60,6 +71,50 @@ public abstract class MockedPulsarStandalone implements AutoCloseable { serviceConfiguration.setExposeBundlesMetricsInPrometheus(true); } + + protected static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + + private static final String BROKER_INTERNAL_CLIENT_SUBJECT = "broker_internal"; + private static final String BROKER_INTERNAL_CLIENT_TOKEN = Jwts.builder() + .claim("sub", BROKER_INTERNAL_CLIENT_SUBJECT).signWith(SECRET_KEY).compact(); + protected static final String SUPER_USER_SUBJECT = "super-user"; + protected static final String SUPER_USER_TOKEN = Jwts.builder() + .claim("sub", SUPER_USER_SUBJECT).signWith(SECRET_KEY).compact(); + protected static final String NOBODY_SUBJECT = "nobody"; + protected static final String NOBODY_TOKEN = Jwts.builder() + .claim("sub", NOBODY_SUBJECT).signWith(SECRET_KEY).compact(); + + + @SneakyThrows + protected void loadTokenAuthentication() { + serviceConfiguration.setAuthenticationEnabled(true); + serviceConfiguration.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName())); + // internal client + serviceConfiguration.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + final Map brokerClientAuthParams = new HashMap<>(); + brokerClientAuthParams.put("token", BROKER_INTERNAL_CLIENT_TOKEN); + final String brokerClientAuthParamStr = MAPPER.writeValueAsString(brokerClientAuthParams); + serviceConfiguration.setBrokerClientAuthenticationParameters(brokerClientAuthParamStr); + + Properties properties = serviceConfiguration.getProperties(); + if (properties == null) { + properties = new Properties(); + serviceConfiguration.setProperties(properties); + } + properties.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); + + } + + + + protected void loadDefaultAuthorization() { + serviceConfiguration.setAuthorizationEnabled(true); + serviceConfiguration.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName()); + serviceConfiguration.setSuperUserRoles(Set.of(SUPER_USER_SUBJECT, BROKER_INTERNAL_CLIENT_SUBJECT)); + } + + + @SneakyThrows protected void loadECTlsCertificateWithFile() { serviceConfiguration.setTlsEnabled(true); @@ -176,4 +231,7 @@ public void close() throws Exception { protected static final String TLS_EC_KS_TRUSTED_STORE = getAbsolutePath("certificate-authority/ec/jks/ca.truststore.jks"); protected static final String TLS_EC_KS_TRUSTED_STORE_PASS = "rootpw"; + + + private static final ObjectMapper MAPPER = ObjectMapperFactory.getMapper().getObjectMapper(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/security/authz/DefaultAuthZWithPublicAPITest.java b/pulsar-broker/src/test/java/org/apache/pulsar/security/authz/DefaultAuthZWithPublicAPITest.java new file mode 100644 index 0000000000000..3b239dea9e052 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/security/authz/DefaultAuthZWithPublicAPITest.java @@ -0,0 +1,86 @@ +package org.apache.pulsar.security.authz; + +import io.jsonwebtoken.Jwts; +import java.util.Set; +import java.util.UUID; +import lombok.SneakyThrows; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.security.MockedPulsarStandalone; +import org.junit.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public final class DefaultAuthZWithPublicAPITest extends MockedPulsarStandalone { + + private static final String USER1_SUBJECT = "user1"; + private static final String USER1_TOKEN = Jwts.builder() + .claim("sub", USER1_SUBJECT).signWith(SECRET_KEY).compact(); + + private PulsarAdmin user1Admin; + + private PulsarAdmin superUserAdmin; + @SneakyThrows + @BeforeClass + public void before() { + loadTokenAuthentication(); + loadDefaultAuthorization(); + start(); + this.user1Admin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(USER1_TOKEN)) + .build(); + this.superUserAdmin =PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) + .build(); + } + + + @SneakyThrows + @AfterClass + public void after() { + close(); + } + + + + @SneakyThrows + @Test + public void testConsumeWithTopicPolicyRetention() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + + // grant consume permission to user 1, it can lookup and consume messages + superUserAdmin.namespaces().grantPermissionOnNamespace("public/default", + USER1_SUBJECT, Set.of(AuthAction.consume)); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + // the user 1 shouldn't touch retention policy + try { + user1Admin.topicPolicies().getRetention(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + final RetentionPolicies policies = new RetentionPolicies(1, 1); + user1Admin.topicPolicies().setRetention(topic, policies); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + + try { + user1Admin.topicPolicies().removeRetention(topic); + Assert.fail("unexpected behaviour"); + } catch (PulsarAdminException ex) { + Assert.assertTrue(ex instanceof PulsarAdminException.NotAuthorizedException); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java index 39d9b7326d104..b02b10f5996bf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECCertificateFileTest.java @@ -25,7 +25,7 @@ import java.util.UUID; import lombok.Cleanup; import lombok.SneakyThrows; -import org.apache.pulsar.security.tls.MockedPulsarStandalone; +import org.apache.pulsar.security.MockedPulsarStandalone; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java index e39ad67e4a9d1..c6ff16d4cc50d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/security/tls/ec/TlsWithECKeyStoreTest.java @@ -21,9 +21,13 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import lombok.Cleanup; import lombok.SneakyThrows; -import org.apache.pulsar.security.tls.MockedPulsarStandalone; +import org.apache.pulsar.security.MockedPulsarStandalone; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -35,10 +39,6 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; @Test