diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java index cf2337e9ab..51c804960e 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java @@ -13,12 +13,17 @@ */ package io.streamnative.pulsar.handlers.kop.security.auth; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration; import io.streamnative.pulsar.handlers.kop.security.KafkaPrincipal; +import java.time.Duration; import java.util.Objects; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.common.naming.NamespaceName; @@ -39,6 +44,19 @@ public class SimpleAclAuthorizer implements Authorizer { private final AuthorizationService authorizationService; private final boolean forceCheckGroupId; + // Cache the authorization results to avoid authorizing PRODUCE or FETCH requests each time. + // key is (topic, role) + private final LoadingCache, Boolean> produceCache = Caffeine.newBuilder() + .maximumSize(10000) + .expireAfterWrite(Duration.ofMinutes(5)) + .refreshAfterWrite(Duration.ofMinutes(1)) + .build(__ -> null); + // key is (topic, role, group) + private final LoadingCache, Boolean> fetchCache = Caffeine.newBuilder() + .maximumSize(10000) + .expireAfterWrite(Duration.ofMinutes(5)) + .refreshAfterWrite(Duration.ofMinutes(1)) + .build(__ -> null); public SimpleAclAuthorizer(PulsarService pulsarService, KafkaServiceConfiguration config) { this.pulsarService = pulsarService; @@ -151,7 +169,16 @@ public CompletableFuture canGetTopicList(KafkaPrincipal principal, Reso public CompletableFuture canProduceAsync(KafkaPrincipal principal, Resource resource) { checkResourceType(resource, ResourceType.TOPIC); TopicName topicName = TopicName.get(resource.getName()); - return authorizationService.canProduceAsync(topicName, principal.getName(), principal.getAuthenticationData()); + final Pair key = Pair.of(topicName, principal.getName()); + final Boolean authorized = produceCache.get(key); + if (authorized != null) { + return CompletableFuture.completedFuture(authorized); + } + return authorizationService.canProduceAsync(topicName, principal.getName(), principal.getAuthenticationData()) + .thenApply(__ -> { + produceCache.put(key, __); + return __; + }); } @Override @@ -161,8 +188,17 @@ public CompletableFuture canConsumeAsync(KafkaPrincipal principal, Reso if (forceCheckGroupId && StringUtils.isBlank(principal.getGroupId())) { return CompletableFuture.completedFuture(false); } + final Triple key = Triple.of(topicName, principal.getName(), principal.getGroupId()); + final Boolean authorized = fetchCache.get(key); + if (authorized != null) { + return CompletableFuture.completedFuture(authorized); + } return authorizationService.canConsumeAsync( - topicName, principal.getName(), principal.getAuthenticationData(), principal.getGroupId()); + topicName, principal.getName(), principal.getAuthenticationData(), principal.getGroupId()) + .thenApply(__ -> { + fetchCache.put(key, __); + return __; + }); } @Override diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java index edeada2ce6..e6aa506721 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTest.java @@ -13,130 +13,25 @@ */ package io.streamnative.pulsar.handlers.kop.security.auth; -import static org.mockito.Mockito.spy; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - -import com.google.common.collect.Sets; -import io.jsonwebtoken.SignatureAlgorithm; -import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase; -import java.time.Duration; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import javax.crypto.SecretKey; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.PartitionInfo; -import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; -import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -/** - * Unit test for Authorization with `entryFormat=pulsar`. - */ -public class KafkaAuthorizationMockTest extends KopProtocolHandlerTestBase { - - protected static final String TENANT = "KafkaAuthorizationTest"; - protected static final String NAMESPACE = "ns1"; - private static final SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); - - protected static final String ADMIN_USER = "pass.pass"; +public class KafkaAuthorizationMockTest extends KafkaAuthorizationMockTestBase { @BeforeClass - @Override - protected void setup() throws Exception { - Properties properties = new Properties(); - properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey)); - - String adminToken = AuthTokenUtils.createToken(secretKey, ADMIN_USER, Optional.empty()); - - conf.setSaslAllowedMechanisms(Sets.newHashSet("PLAIN")); - conf.setKafkaMetadataTenant("internal"); - conf.setKafkaMetadataNamespace("__kafka"); - conf.setKafkaTenant(TENANT); - conf.setKafkaNamespace(NAMESPACE); - - conf.setClusterName(super.configClusterName); - conf.setAuthorizationEnabled(true); - conf.setAuthenticationEnabled(true); - conf.setAuthorizationAllowWildcardsMatching(true); - conf.setAuthorizationProvider(KafkaMockAuthorizationProvider.class.getName()); - conf.setAuthenticationProviders( - Sets.newHashSet(AuthenticationProviderToken.class.getName())); - conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); - conf.setBrokerClientAuthenticationParameters("token:" + adminToken); - conf.setProperties(properties); - - super.internalSetup(); + public void setup() throws Exception { + super.setup(); } - @AfterClass - @Override - protected void cleanup() throws Exception { - super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) - .authentication(this.conf.getBrokerClientAuthenticationPlugin(), - this.conf.getBrokerClientAuthenticationParameters()).build()); + @AfterClass(alwaysRun = true) + public void cleanup() throws Exception { + super.cleanup(); } - @Override - protected void createAdmin() throws Exception { - super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) - .authentication(this.conf.getBrokerClientAuthenticationPlugin(), - this.conf.getBrokerClientAuthenticationParameters()).build()); - } - - - @Test(timeOut = 30 * 1000) + @Test(timeOut = 30000) public void testSuperUserProduceAndConsume() throws PulsarAdminException { - String superUserToken = AuthTokenUtils.createToken(secretKey, "pass.pass", Optional.empty()); - String topic = "testSuperUserProduceAndConsumeTopic"; - String fullNewTopicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" + topic; - KProducer kProducer = new KProducer(topic, false, "localhost", getKafkaBrokerPort(), - TENANT + "/" + NAMESPACE, "token:" + superUserToken); - int totalMsgs = 10; - String messageStrPrefix = topic + "_message_"; - - for (int i = 0; i < totalMsgs; i++) { - String messageStr = messageStrPrefix + i; - kProducer.getProducer().send(new ProducerRecord<>(topic, i, messageStr)); - } - KConsumer kConsumer = new KConsumer(topic, "localhost", getKafkaBrokerPort(), false, - TENANT + "/" + NAMESPACE, "token:" + superUserToken, "DemoKafkaOnPulsarConsumer"); - kConsumer.getConsumer().subscribe(Collections.singleton(topic)); - - int i = 0; - while (i < totalMsgs) { - ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofSeconds(1)); - for (ConsumerRecord record : records) { - Integer key = record.key(); - assertEquals(messageStrPrefix + key.toString(), record.value()); - i++; - } - } - assertEquals(i, totalMsgs); - - // no more records - ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofMillis(200)); - assertTrue(records.isEmpty()); - - // ensure that we can list the topic - Map> result = kConsumer.getConsumer().listTopics(Duration.ofSeconds(1)); - assertEquals(result.size(), 1); - assertTrue(result.containsKey(topic), - "list of topics " + result.keySet() + " does not contains " + topic); - - // Cleanup - kProducer.close(); - kConsumer.close(); - admin.topics().deletePartitionedTopic(fullNewTopicName); + super.testSuperUserProduceAndConsume(); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTestBase.java new file mode 100644 index 0000000000..032c538e77 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/KafkaAuthorizationMockTestBase.java @@ -0,0 +1,136 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.security.auth; + +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.google.common.collect.Sets; +import io.jsonwebtoken.SignatureAlgorithm; +import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import javax.crypto.SecretKey; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.PartitionInfo; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; + +/** + * Unit test for Authorization with `entryFormat=pulsar`. + */ +public class KafkaAuthorizationMockTestBase extends KopProtocolHandlerTestBase { + + protected static final String TENANT = "KafkaAuthorizationTest"; + protected static final String NAMESPACE = "ns1"; + protected static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + + protected static final String ADMIN_USER = "pass.pass"; + protected String authorizationProviderClassName = KafkaMockAuthorizationProvider.class.getName(); + + @Override + protected void setup() throws Exception { + Properties properties = new Properties(); + properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); + + String adminToken = AuthTokenUtils.createToken(SECRET_KEY, ADMIN_USER, Optional.empty()); + + conf.setSaslAllowedMechanisms(Sets.newHashSet("PLAIN")); + conf.setKafkaMetadataTenant("internal"); + conf.setKafkaMetadataNamespace("__kafka"); + conf.setKafkaTenant(TENANT); + conf.setKafkaNamespace(NAMESPACE); + + conf.setClusterName(super.configClusterName); + conf.setAuthorizationEnabled(true); + conf.setAuthenticationEnabled(true); + conf.setAuthorizationAllowWildcardsMatching(true); + conf.setAuthorizationProvider(authorizationProviderClassName); + conf.setAuthenticationProviders( + Sets.newHashSet(AuthenticationProviderToken.class.getName())); + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + conf.setBrokerClientAuthenticationParameters("token:" + adminToken); + conf.setProperties(properties); + + super.internalSetup(); + } + + @Override + protected void cleanup() throws Exception { + super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(this.conf.getBrokerClientAuthenticationPlugin(), + this.conf.getBrokerClientAuthenticationParameters()).build()); + } + + @Override + protected void createAdmin() throws Exception { + super.admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(this.conf.getBrokerClientAuthenticationPlugin(), + this.conf.getBrokerClientAuthenticationParameters()).build()); + } + + public void testSuperUserProduceAndConsume() throws PulsarAdminException { + String superUserToken = AuthTokenUtils.createToken(SECRET_KEY, "pass.pass", Optional.empty()); + String topic = "testSuperUserProduceAndConsumeTopic"; + String fullNewTopicName = "persistent://" + TENANT + "/" + NAMESPACE + "/" + topic; + KProducer kProducer = new KProducer(topic, false, "localhost", getKafkaBrokerPort(), + TENANT + "/" + NAMESPACE, "token:" + superUserToken); + int totalMsgs = 10; + String messageStrPrefix = topic + "_message_"; + + for (int i = 0; i < totalMsgs; i++) { + String messageStr = messageStrPrefix + i; + kProducer.getProducer().send(new ProducerRecord<>(topic, i, messageStr)); + } + KConsumer kConsumer = new KConsumer(topic, "localhost", getKafkaBrokerPort(), false, + TENANT + "/" + NAMESPACE, "token:" + superUserToken, "DemoKafkaOnPulsarConsumer"); + kConsumer.getConsumer().subscribe(Collections.singleton(topic)); + + int i = 0; + while (i < totalMsgs) { + ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + Integer key = record.key(); + assertEquals(messageStrPrefix + key.toString(), record.value()); + i++; + } + } + assertEquals(i, totalMsgs); + + // no more records + ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofMillis(200)); + assertTrue(records.isEmpty()); + + // ensure that we can list the topic + Map> result = kConsumer.getConsumer().listTopics(Duration.ofSeconds(1)); + assertEquals(result.size(), 1); + assertTrue(result.containsKey(topic), + "list of topics " + result.keySet() + " does not contains " + topic); + + // Cleanup + kProducer.close(); + kConsumer.close(); + admin.topics().deletePartitionedTopic(fullNewTopicName); + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SlowAuthorizationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SlowAuthorizationTest.java new file mode 100644 index 0000000000..c1b1f9202f --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/security/auth/SlowAuthorizationTest.java @@ -0,0 +1,112 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.security.auth; + +import java.time.Duration; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.common.naming.TopicName; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +public class SlowAuthorizationTest extends KafkaAuthorizationMockTestBase { + + @BeforeClass + public void setup() throws Exception { + super.authorizationProviderClassName = SlowMockAuthorizationProvider.class.getName(); + super.setup(); + } + + @AfterClass + public void cleanup() throws Exception { + super.cleanup(); + } + + @Test(timeOut = 60000) + public void testManyMessages() throws Exception { + String superUserToken = AuthTokenUtils.createToken(SECRET_KEY, "normal-user", Optional.empty()); + final String topic = "test-many-messages"; + @Cleanup + final KProducer kProducer = new KProducer(topic, false, "localhost", getKafkaBrokerPort(), + TENANT + "/" + NAMESPACE, "token:" + superUserToken); + long start = System.currentTimeMillis(); + log.info("Before send"); + for (int i = 0; i < 1000; i++) { + kProducer.getProducer().send(new ProducerRecord(topic, "msg-" + i)).get(); + } + log.info("After send ({} ms)", System.currentTimeMillis() - start); + @Cleanup + KConsumer kConsumer = new KConsumer(topic, "localhost", getKafkaBrokerPort(), false, + TENANT + "/" + NAMESPACE, "token:" + superUserToken, "DemoKafkaOnPulsarConsumer"); + kConsumer.getConsumer().subscribe(Collections.singleton(topic)); + int i = 0; + start = System.currentTimeMillis(); + log.info("Before poll"); + while (i < 1000) { + final ConsumerRecords records = kConsumer.getConsumer().poll(Duration.ofSeconds(1)); + i += records.count(); + } + log.info("After poll ({} ms)", System.currentTimeMillis() - start); + } + + public static class SlowMockAuthorizationProvider extends KafkaMockAuthorizationProvider { + + @Override + public CompletableFuture isSuperUser(String role, ServiceConfiguration serviceConfiguration) { + return CompletableFuture.completedFuture(role.equals("pass.pass")); + } + + @Override + public CompletableFuture isSuperUser(String role, AuthenticationDataSource authenticationData, + ServiceConfiguration serviceConfiguration) { + return CompletableFuture.completedFuture(role.equals("pass.pass")); + } + + @Override + public CompletableFuture canProduceAsync(TopicName topicName, String role, + AuthenticationDataSource authenticationData) { + return authorizeSlowly(); + } + + @Override + public CompletableFuture canConsumeAsync( + TopicName topicName, String role, AuthenticationDataSource authenticationData, String subscription) { + return authorizeSlowly(); + } + + private static CompletableFuture authorizeSlowly() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return CompletableFuture.completedFuture(true); + } + + @Override + CompletableFuture roleAuthorizedAsync(String role) { + return CompletableFuture.completedFuture(true); + } + } +}