From 2b83e0553248ca7b931783978d04f43d5ec4671c Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Wed, 13 Mar 2024 02:37:40 +0800 Subject: [PATCH] Topic properties support for update and remove (#5981) (#6058) * Topic properties support for update and remove (#5911) (cherry picked from commit 6d7dfcf6e4533a7afc3d00d6a6843a0cb859ab33) (cherry picked from commit 6564e6e4bd110a24ccdea5c2234a425fd05d7340) (cherry picked from commit bc7ee1ac4b1bb1eac7b266a530e6a8ea027eaca7) (cherry picked from commit 403aafe1931fbcb38e2e93862a2f41859608d89b) * [fix][broker] managedLedger.getConfig().getProperties().putAll(properties) NPE (#20361) (#5932) (cherry picked from commit aa7decc5b75894e864f3c5962c5cffad255abf25) Co-authored-by: Heesung Sohn <103456639+heesung-sn@users.noreply.github.com> (cherry picked from commit 5f6ed1b5f72d658f80bc4f42899b04344111a30e) (cherry picked from commit 876513450fb93d2b080cee6ae17c65b99cbfc696) (cherry picked from commit f6e3643bf3333793df6bc58ad31bdcdd4bccf5ff) * Revert gpg and delete useless file (cherry picked from commit 2739f2c3be4f68f132d9a4cee5bf767a9da1378e) (cherry picked from commit 73b7c06cee6c66b5d3fb7d4d8c7ec7d35df47f21) (cherry picked from commit 2eb41396db4c15cf379c62560adb76945ecf1ff0) (cherry picked from commit 6bed83b78c38936a60c0396603009869480c2da0) --- .../PulsarAuthorizationProvider.java | 1 + .../admin/impl/PersistentTopicsBase.java | 112 ++++++++++++++++++ .../broker/admin/v2/PersistentTopics.java | 71 +++++++++++ .../pulsar/broker/service/BrokerService.java | 3 +- .../pulsar/broker/admin/AdminApi2Test.java | 93 +++++++++++++++ .../broker/admin/TopicPoliciesTest.java | 14 +++ .../apache/pulsar/client/admin/Topics.java | 36 ++++++ .../client/admin/internal/TopicsImpl.java | 28 +++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 11 ++ .../apache/pulsar/admin/cli/CmdTopics.java | 38 ++++++ .../common/policies/data/TopicOperation.java | 1 + 11 files changed, 407 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 2201ee3031c55..8aa678e9e9f0a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -630,6 +630,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, case COMPACT: case OFFLOAD: case UNLOAD: + case DELETE_METADATA: case ADD_BUNDLE_RANGE: case GET_BUNDLE_RANGE: case DELETE_BUNDLE_RANGE: diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index f18f30a860621..0972eeaff380f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -64,6 +64,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -654,6 +655,117 @@ private CompletableFuture> getPropertiesAsync() { }); } + protected CompletableFuture internalUpdatePropertiesAsync(boolean authoritative, + Map properties) { + if (properties == null || properties.isEmpty()) { + log.warn("[{}] [{}] properties is empty, ignore update", clientAppId(), topicName); + return CompletableFuture.completedFuture(null); + } + return validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PRODUCE)) + .thenCompose(__ -> { + if (topicName.isPartitioned()) { + return internalUpdateNonPartitionedTopicProperties(properties); + } else { + return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName) + .thenCompose(metadata -> { + if (metadata.partitions == 0) { + return internalUpdateNonPartitionedTopicProperties(properties); + } + return namespaceResources() + .getPartitionedTopicResources().updatePartitionedTopicAsync(topicName, + p -> new PartitionedTopicMetadata(p.partitions, + p.properties == null ? properties + : MapUtils.putAll(p.properties, properties.entrySet().toArray()))); + }); + } + }).thenAccept(__ -> + log.info("[{}] [{}] update properties success with properties {}", + clientAppId(), topicName, properties)); + } + + private CompletableFuture internalUpdateNonPartitionedTopicProperties(Map properties) { + CompletableFuture future = new CompletableFuture<>(); + pulsar().getBrokerService().getTopicIfExists(topicName.toString()) + .thenAccept(opt -> { + if (!opt.isPresent()) { + throw new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString())); + } + ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger(); + managedLedger.asyncSetProperties(properties, new AsyncCallbacks.UpdatePropertiesCallback() { + + @Override + public void updatePropertiesComplete(Map properties, Object ctx) { + if (managedLedger.getConfig().getProperties() == null) { + managedLedger.getConfig().setProperties(new HashMap<>()); + } + managedLedger.getConfig().getProperties().putAll(properties); + + future.complete(null); + } + + @Override + public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + }); + return future; + } + + protected CompletableFuture internalRemovePropertiesAsync(boolean authoritative, String key) { + return validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.DELETE_METADATA)) + .thenCompose(__ -> { + if (topicName.isPartitioned()) { + return internalRemoveNonPartitionedTopicProperties(key); + } else { + return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName) + .thenCompose(metadata -> { + if (metadata.partitions == 0) { + return internalRemoveNonPartitionedTopicProperties(key); + } + return namespaceResources() + .getPartitionedTopicResources().updatePartitionedTopicAsync(topicName, + p -> { + if (p.properties != null) { + p.properties.remove(key); + } + return new PartitionedTopicMetadata(p.partitions, p.properties); + }); + }); + } + }).thenAccept(__ -> + log.info("[{}] remove [{}] properties success with key {}", + clientAppId(), topicName, key)); + } + + private CompletableFuture internalRemoveNonPartitionedTopicProperties(String key) { + CompletableFuture future = new CompletableFuture<>(); + pulsar().getBrokerService().getTopicIfExists(topicName.toString()) + .thenAccept(opt -> { + if (!opt.isPresent()) { + throw new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString())); + } + ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger(); + managedLedger.asyncDeleteProperty(key, new AsyncCallbacks.UpdatePropertiesCallback() { + + @Override + public void updatePropertiesComplete(Map properties, Object ctx) { + future.complete(null); + } + + @Override + public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + }); + return future; + } + protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force, boolean deleteSchema) { validateTopicOwnershipAsync(topicName, authoritative) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index af5120fa9f91e..26889da82d417 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -915,6 +915,77 @@ public void getProperties( }); } + @PUT + @Path("/{tenant}/{namespace}/{topic}/properties") + @ApiOperation(value = "Update the properties on the given topic.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" + + "subscriber is not authorized to access this operation"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), + @ApiResponse(code = 405, message = "Method Not Allowed"), + @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") + }) + public void updateProperties( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Key value pair properties for the topic metadata") Map properties){ + validatePersistentTopicName(tenant, namespace, encodedTopic); + internalUpdatePropertiesAsync(authoritative, properties) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/properties") + @ApiOperation(value = "Remove the key in properties on the given topic.") + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Partitioned topic does not exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "Partitioned topic name is invalid"), + @ApiResponse(code = 500, message = "Internal server error") + }) + public void removeProperties( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("key") String key, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validatePersistentTopicName(tenant, namespace, encodedTopic); + internalRemovePropertiesAsync(authoritative, key) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to remove key {} in properties on topic {}", + clientAppId(), key, topicName, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + @DELETE @Path("/{tenant}/{namespace}/{topic}/partitions") @ApiOperation(value = "Delete a partitioned topic.", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6860ab57a688e..9e78bc8ac1c66 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1469,7 +1469,8 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName) .thenAccept(isActive -> { if (isActive) { - createPersistentTopic(topic, createIfMissing, topicFuture, properties, topicPolicies); + createPersistentTopic(topic, createIfMissing, topicFuture, + properties == null ? new HashMap<>() : properties, topicPolicies); } else { // namespace is being unloaded String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 5b43aa7145e81..d84fab7137f86 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -902,6 +902,99 @@ public void testCreateAndGetTopicProperties() throws Exception { Assert.assertEquals(properties22.get("key2"), "value2"); } + @Test + public void testUpdatePartitionedTopicProperties() throws Exception { + final String namespace = "prop-xyz/ns2"; + final String topicName = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties"; + final String topicNameTwo = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties2"; + admin.namespaces().createNamespace(namespace, 20); + + // create partitioned topic without properties + admin.topics().createPartitionedTopic(topicName, 2); + Map properties = admin.topics().getProperties(topicName); + Assert.assertNull(properties); + Map topicProperties = new HashMap<>(); + topicProperties.put("key1", "value1"); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.get("key1"), "value1"); + + // update with new key, old properties should keep + topicProperties = new HashMap<>(); + topicProperties.put("key2", "value2"); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.size(), 2); + Assert.assertEquals(properties.get("key1"), "value1"); + Assert.assertEquals(properties.get("key2"), "value2"); + + // override old values + topicProperties = new HashMap<>(); + topicProperties.put("key1", "value11"); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.size(), 2); + Assert.assertEquals(properties.get("key1"), "value11"); + Assert.assertEquals(properties.get("key2"), "value2"); + + // create topic without properties + admin.topics().createPartitionedTopic(topicNameTwo, 2); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertNull(properties); + // remove key of properties on this topic + admin.topics().removeProperties(topicNameTwo, "key1"); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertNull(properties); + Map topicProp = new HashMap<>(); + topicProp.put("key1", "value1"); + topicProp.put("key2", "value2"); + admin.topics().updateProperties(topicNameTwo, topicProp); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertEquals(properties, topicProp); + admin.topics().removeProperties(topicNameTwo, "key1"); + topicProp.remove("key1"); + properties = admin.topics().getProperties(topicNameTwo); + Assert.assertEquals(properties, topicProp); + } + + @Test + public void testUpdateNonPartitionedTopicProperties() throws Exception { + final String namespace = "prop-xyz/ns2"; + final String topicName = "persistent://" + namespace + "/testUpdateNonPartitionedTopicProperties"; + admin.namespaces().createNamespace(namespace, 20); + + // create non-partitioned topic with properties + Map topicProperties = new HashMap<>(); + topicProperties.put("key1", "value1"); + admin.topics().createNonPartitionedTopic(topicName, topicProperties); + Map properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.get("key1"), "value1"); + + // update with new key, old properties should keep + topicProperties = new HashMap<>(); + topicProperties.put("key2", "value2"); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.size(), 2); + Assert.assertEquals(properties.get("key1"), "value1"); + Assert.assertEquals(properties.get("key2"), "value2"); + + // override old values + topicProperties = new HashMap<>(); + topicProperties.put("key1", "value11"); + admin.topics().updateProperties(topicName, topicProperties); + properties = admin.topics().getProperties(topicName); + Assert.assertNotNull(properties); + Assert.assertEquals(properties.size(), 2); + Assert.assertEquals(properties.get("key1"), "value11"); + Assert.assertEquals(properties.get("key2"), "value2"); + } + @Test public void testNonPersistentTopics() throws Exception { final String namespace = "prop-xyz/ns2"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 376391e6db50f..201530e438254 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -32,6 +32,7 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -184,6 +185,19 @@ public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws Excep } + @Test + public void updatePropertiesForAutoCreatedTopicTest() throws Exception { + TopicName topicName = TopicName.get( + TopicDomain.persistent.value(), + NamespaceName.get(myNamespace), + "test-" + UUID.randomUUID() + ); + String testTopic = topicName.toString(); + Producer producer = pulsarClient.newProducer().topic(testTopic).create(); + HashMap properties = new HashMap<>(); + properties.put("backlogQuotaType", "message_age"); + admin.topics().updateProperties(testTopic, properties); + } @Test public void testSetSizeBasedBacklogQuota() throws Exception { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 73f9a199a1b12..be706d048d686 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -679,6 +679,42 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal */ CompletableFuture> getPropertiesAsync(String topic); + /** + * Update Topic Properties on a topic. + * The new properties will override the existing values, old properties in the topic will be keep if not override. + * @param topic + * @param properties + * @throws PulsarAdminException + */ + void updateProperties(String topic, Map properties) throws PulsarAdminException; + + /** + * Update Topic Properties on a topic. + * The new properties will override the existing values, old properties in the topic will be keep if not override. + * @param topic + * @param properties + * @return + */ + CompletableFuture updatePropertiesAsync(String topic, Map properties); + + /** + * Remove the key in properties on a topic. + * + * @param topic + * @param key + * @throws PulsarAdminException + */ + void removeProperties(String topic, String key) throws PulsarAdminException; + + /** + * Remove the key in properties on a topic asynchronously. + * + * @param topic + * @param key + * @return + */ + CompletableFuture removePropertiesAsync(String topic, String key); + /** * Delete a partitioned topic. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index dbdaffd9e5c5f..b4be27e0770a4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -474,11 +474,39 @@ public void failed(Throwable throwable) { return future; } + @Override + public void updateProperties(String topic, Map properties) throws PulsarAdminException { + sync(() -> updatePropertiesAsync(topic, properties)); + } + + @Override + public CompletableFuture updatePropertiesAsync(String topic, Map properties) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "properties"); + if (properties == null) { + properties = new HashMap<>(); + } + return asyncPutRequest(path, Entity.entity(properties, MediaType.APPLICATION_JSON)); + } + @Override public void deletePartitionedTopic(String topic) throws PulsarAdminException { deletePartitionedTopic(topic, false); } + @Override + public void removeProperties(String topic, String key) throws PulsarAdminException { + sync(() -> removePropertiesAsync(topic, key)); + } + + @Override + public CompletableFuture removePropertiesAsync(String topic, String key) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "properties") + .queryParam("key", key); + return asyncDeleteRequest(path); + } + @Override public CompletableFuture deletePartitionedTopicAsync(String topic) { return deletePartitionedTopicAsync(topic, false); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 8e7e339b536cc..ca4ba39a61b66 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1422,6 +1422,17 @@ public void topics() throws Exception { cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear")); verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>()); + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("update-properties persistent://myprop/clust/ns1/ds1 --property a=b -p x=y,z")); + props = new HashMap<>(); + props.put("a", "b"); + props.put("x", "y,z"); + verify(mockTopics).updateProperties("persistent://myprop/clust/ns1/ds1", props); + + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("remove-properties persistent://myprop/clust/ns1/ds1 --key a")); + verify(mockTopics).removeProperties("persistent://myprop/clust/ns1/ds1", "a"); + cmdTopics = new CmdTopics(() -> admin); props = new HashMap<>(); props.put("a", "b"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 056847c7583a1..eaa907726997d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -56,6 +56,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.cli.NoSplitter; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; @@ -119,6 +120,8 @@ public CmdTopics(Supplier admin) { jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd()); jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd()); jcommander.addCommand("get-properties", new GetPropertiesCmd()); + jcommander.addCommand("update-properties", new UpdateProperties()); + jcommander.addCommand("remove-properties", new RemoveProperties()); jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd()); jcommander.addCommand("peek-messages", new PeekMessages()); @@ -605,6 +608,41 @@ void run() throws Exception { } } + @Parameters(commandDescription = "Update the properties of on a topic") + private class UpdateProperties extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--property", "-p"}, description = "key value pair properties(-p a=b -p c=d)", + required = false, splitter = NoSplitter.class) + private java.util.List properties; + + @Override + void run() throws Exception { + String topic = validateTopicName(params); + Map map = parseListKeyValueMap(properties); + if (map == null) { + map = Collections.emptyMap(); + } + getTopics().updateProperties(topic, map); + } + } + + @Parameters(commandDescription = "Remove the key in properties of a topic") + private class RemoveProperties extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = {"--key", "-k"}, description = "The key to remove in the properties of topic") + private String key; + + @Override + void run() throws Exception { + String topic = validateTopicName(params); + getTopics().removeProperties(topic, key); + } + } + @Parameters(commandDescription = "Delete a partitioned topic. " + "It will also delete all the partitions of the topic if it exists.") private class DeletePartitionedCmd extends CliCommand { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java index d4de706e607b7..0184e0efb82d7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java @@ -50,6 +50,7 @@ public enum TopicOperation { GET_STATS, GET_METADATA, + DELETE_METADATA, GET_BACKLOG_SIZE, SET_REPLICATED_SUBSCRIPTION_STATUS,