diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index efb110809da60..9c67085f37515 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1051,43 +1051,49 @@ public CompletableFuture> getTopic(final TopicName topicName, bo } final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent); if (isPersistentTopic) { - final CompletableFuture> topicPoliciesFuture = - getTopicPoliciesBypassSystemTopic(topicName); - return topicPoliciesFuture.exceptionally(ex -> { - final Throwable rc = FutureUtil.unwrapCompletionException(ex); - final String errorInfo = String.format("Topic creation encountered an exception by initialize" - + " topic policies service. topic_name=%s error_message=%s", topicName, rc.getMessage()); - log.error(errorInfo, rc); - throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); - }).thenCompose(optionalTopicPolicies -> { - final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null); - return topics.computeIfAbsent(topicName.toString(), (tpName) -> { - if (topicName.isPartitioned()) { - final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); - return fetchPartitionedTopicMetadataAsync(topicNameEntity) - .thenCompose((metadata) -> { - // Allow crate non-partitioned persistent topic that name includes `partition` - if (metadata.partitions == 0 - || topicName.getPartitionIndex() < metadata.partitions) { - return loadOrCreatePersistentTopic(tpName, createIfMissing, - properties, topicPolicies); - } - final String errorMsg = - String.format("Illegal topic partition name %s with max allowed " - + "%d partitions", topicName, metadata.partitions); - log.warn(errorMsg); - return FutureUtil - .failedFuture(new BrokerServiceException.NotAllowedException(errorMsg)); - }); - } - return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies); - }).thenCompose(optionalTopic -> { - if (!optionalTopic.isPresent() && createIfMissing) { - log.warn("[{}] Try to recreate the topic with createIfMissing=true " - + "but the returned topic is empty", topicName); - return getTopic(topicName, createIfMissing, properties); - } - return CompletableFuture.completedFuture(optionalTopic); + return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName) + .thenCompose(exists -> { + if (!exists && !createIfMissing) { + return CompletableFuture.completedFuture(Optional.empty()); + } + return getTopicPoliciesBypassSystemTopic(topicName).exceptionally(ex -> { + final Throwable rc = FutureUtil.unwrapCompletionException(ex); + final String errorInfo = String.format("Topic creation encountered an exception by initialize" + + " topic policies service. topic_name=%s error_message=%s", topicName, + rc.getMessage()); + log.error(errorInfo, rc); + throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); + }).thenCompose(optionalTopicPolicies -> { + final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null); + return topics.computeIfAbsent(topicName.toString(), (tpName) -> { + if (topicName.isPartitioned()) { + final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); + return fetchPartitionedTopicMetadataAsync(topicNameEntity) + .thenCompose((metadata) -> { + // Allow crate non-partitioned persistent topic that name includes + // `partition` + if (metadata.partitions == 0 + || topicName.getPartitionIndex() < metadata.partitions) { + return loadOrCreatePersistentTopic(tpName, createIfMissing, + properties, topicPolicies); + } + final String errorMsg = + String.format("Illegal topic partition name %s with max allowed " + + "%d partitions", topicName, metadata.partitions); + log.warn(errorMsg); + return FutureUtil.failedFuture( + new BrokerServiceException.NotAllowedException(errorMsg)); + }); + } + return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies); + }).thenCompose(optionalTopic -> { + if (!optionalTopic.isPresent() && createIfMissing) { + log.warn("[{}] Try to recreate the topic with createIfMissing=true " + + "but the returned topic is empty", topicName); + return getTopic(topicName, createIfMissing, properties); + } + return CompletableFuture.completedFuture(optionalTopic); + }); }); }); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java index e6459bbf74c31..ceb3c1d0d9335 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java @@ -126,7 +126,7 @@ public void testEvents(String topicTypePersistence, String topicTypePartitioned, boolean forceDelete) throws Exception { String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); - createTopicAndVerifyEvents(topicTypePartitioned, topicName); + createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName); events.clear(); if (topicTypePartitioned.equals("partitioned")) { @@ -150,7 +150,7 @@ public void testEventsWithUnload(String topicTypePersistence, String topicTypePa boolean forceDelete) throws Exception { String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); - createTopicAndVerifyEvents(topicTypePartitioned, topicName); + createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName); events.clear(); admin.topics().unload(topicName); @@ -182,7 +182,7 @@ public void testEventsActiveSub(String topicTypePersistence, String topicTypePar boolean forceDelete) throws Exception { String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); - createTopicAndVerifyEvents(topicTypePartitioned, topicName); + createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName); Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe(); Producer producer = pulsarClient.newProducer().topic(topicName).create(); @@ -238,7 +238,7 @@ public void testEventsActiveSub(String topicTypePersistence, String topicTypePar public void testTopicAutoGC(String topicTypePersistence, String topicTypePartitioned) throws Exception { String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); - createTopicAndVerifyEvents(topicTypePartitioned, topicName); + createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName); admin.namespaces().setInactiveTopicPolicies(namespace, new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true)); @@ -262,25 +262,21 @@ public void testTopicAutoGC(String topicTypePersistence, String topicTypePartiti ); } - private void createTopicAndVerifyEvents(String topicTypePartitioned, String topicName) throws Exception { + private void createTopicAndVerifyEvents(String topicDomain, String topicTypePartitioned, String topicName) throws Exception { final String[] expectedEvents; - if (topicTypePartitioned.equals("partitioned")) { - topicNameToWatch = topicName + "-partition-1"; - admin.topics().createPartitionedTopic(topicName, 2); - triggerPartitionsCreation(topicName); - + if (topicDomain.equalsIgnoreCase("persistent") || topicTypePartitioned.equals("partitioned")) { expectedEvents = new String[]{ "LOAD__BEFORE", "CREATE__BEFORE", "CREATE__SUCCESS", "LOAD__SUCCESS" }; - } else { - topicNameToWatch = topicName; - admin.topics().createNonPartitionedTopic(topicName); - expectedEvents = new String[]{ + // Before https://github.com/apache/pulsar/pull/21995, Pulsar will skip create topic if the topic + // was already exists, and the action "check topic exists" will try to load Managed ledger, + // the check triggers two exrtra events: [LOAD__BEFORE, LOAD__FAILURE]. + // #21995 fixed this wrong behavior, so remove these two events. "LOAD__BEFORE", "LOAD__FAILURE", "LOAD__BEFORE", @@ -288,7 +284,14 @@ private void createTopicAndVerifyEvents(String topicTypePartitioned, String topi "CREATE__SUCCESS", "LOAD__SUCCESS" }; - + } + if (topicTypePartitioned.equals("partitioned")) { + topicNameToWatch = topicName + "-partition-1"; + admin.topics().createPartitionedTopic(topicName, 2); + triggerPartitionsCreation(topicName); + } else { + topicNameToWatch = topicName; + admin.topics().createNonPartitionedTopic(topicName); } Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index f124a06845ff9..7cf84673be86a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -3417,4 +3417,32 @@ private void testAnalyzeSubscriptionBacklogNotCauseStuck() throws Exception { producer.close(); admin.topics().delete(topic); } + + @Test + public void testGetStatsIfPartitionNotExists() throws Exception { + // create topic. + final String partitionedTp = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp"); + admin.topics().createPartitionedTopic(partitionedTp, 1); + TopicName partition0 = TopicName.get(partitionedTp).getPartition(0); + boolean topicExists1 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent(); + assertTrue(topicExists1); + // Verify topics-stats works. + TopicStats topicStats = admin.topics().getStats(partition0.toString()); + assertNotNull(topicStats); + + // Delete partition and call topic-stats again. + admin.topics().delete(partition0.toString()); + boolean topicExists2 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent(); + assertFalse(topicExists2); + // Verify: respond 404. + try { + admin.topics().getStats(partition0.toString()); + fail("Should respond 404 after the partition was deleted"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Topic partitions were not yet created")); + } + + // cleanup. + admin.topics().deletePartitionedTopic(partitionedTp); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index c9138beee52d1..a75ae78cef393 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -149,10 +149,11 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() .sendTimeout(1, TimeUnit.SECONDS) .topic(topic) .create()) { - } catch (PulsarClientException.LookupException expected) { - String msg = "Namespace bundle for topic (%s) not served by this instance"; + } catch (PulsarClientException.TopicDoesNotExistException expected) { + // Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false, + // so the "TopicDoesNotExistException" is expected. log.info("Expected error", expected); - assertTrue(expected.getMessage().contains(String.format(msg, topic)) + assertTrue(expected.getMessage().contains(topic) || expected.getMessage().contains(topicPoliciesServiceInitException)); } @@ -160,10 +161,11 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() .topic(topic) .subscriptionName("test") .subscribe()) { - } catch (PulsarClientException.LookupException expected) { - String msg = "Namespace bundle for topic (%s) not served by this instance"; + } catch (PulsarClientException.TopicDoesNotExistException expected) { + // Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false, + // so the "TopicDoesNotExistException" is expected. log.info("Expected error", expected); - assertTrue(expected.getMessage().contains(String.format(msg, topic)) + assertTrue(expected.getMessage().contains(topic) || expected.getMessage().contains(topicPoliciesServiceInitException)); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 6f60a13fd4894..fe84aeb1df792 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -295,7 +295,8 @@ public void testPersistentPartitionedTopicUnload() throws Exception { assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName)); pulsar.getBrokerService().getTopicIfExists(topicName).get(); - assertTrue(pulsar.getBrokerService().getTopics().containsKey(topicName)); + // The map topics should only contain partitions, does not contain partitioned topic. + assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName)); // ref of partitioned-topic name should be empty assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());