Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Topic properties support for update and remove (#5981) #6058

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
case COMPACT:
case OFFLOAD:
case UNLOAD:
case DELETE_METADATA:
case ADD_BUNDLE_RANGE:
case GET_BUNDLE_RANGE:
case DELETE_BUNDLE_RANGE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -654,6 +655,117 @@ private CompletableFuture<Map<String, String>> getPropertiesAsync() {
});
}

protected CompletableFuture<Void> internalUpdatePropertiesAsync(boolean authoritative,
Map<String, String> properties) {
if (properties == null || properties.isEmpty()) {
log.warn("[{}] [{}] properties is empty, ignore update", clientAppId(), topicName);
return CompletableFuture.completedFuture(null);
}
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PRODUCE))
.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return internalUpdateNonPartitionedTopicProperties(properties);
} else {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(metadata -> {
if (metadata.partitions == 0) {
return internalUpdateNonPartitionedTopicProperties(properties);
}
return namespaceResources()
.getPartitionedTopicResources().updatePartitionedTopicAsync(topicName,
p -> new PartitionedTopicMetadata(p.partitions,
p.properties == null ? properties
: MapUtils.putAll(p.properties, properties.entrySet().toArray())));
});
}
}).thenAccept(__ ->
log.info("[{}] [{}] update properties success with properties {}",
clientAppId(), topicName, properties));
}

private CompletableFuture<Void> internalUpdateNonPartitionedTopicProperties(Map<String, String> properties) {
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenAccept(opt -> {
if (!opt.isPresent()) {
throw new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
}
ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
managedLedger.asyncSetProperties(properties, new AsyncCallbacks.UpdatePropertiesCallback() {

@Override
public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
if (managedLedger.getConfig().getProperties() == null) {
managedLedger.getConfig().setProperties(new HashMap<>());
}
managedLedger.getConfig().getProperties().putAll(properties);

future.complete(null);
}

@Override
public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
});
return future;
}

protected CompletableFuture<Void> internalRemovePropertiesAsync(boolean authoritative, String key) {
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.DELETE_METADATA))
.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return internalRemoveNonPartitionedTopicProperties(key);
} else {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(metadata -> {
if (metadata.partitions == 0) {
return internalRemoveNonPartitionedTopicProperties(key);
}
return namespaceResources()
.getPartitionedTopicResources().updatePartitionedTopicAsync(topicName,
p -> {
if (p.properties != null) {
p.properties.remove(key);
}
return new PartitionedTopicMetadata(p.partitions, p.properties);
});
});
}
}).thenAccept(__ ->
log.info("[{}] remove [{}] properties success with key {}",
clientAppId(), topicName, key));
}

private CompletableFuture<Void> internalRemoveNonPartitionedTopicProperties(String key) {
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenAccept(opt -> {
if (!opt.isPresent()) {
throw new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
}
ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
managedLedger.asyncDeleteProperty(key, new AsyncCallbacks.UpdatePropertiesCallback() {

@Override
public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
future.complete(null);
}

@Override
public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
});
return future;
}

protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative,
boolean force, boolean deleteSchema) {
validateTopicOwnershipAsync(topicName, authoritative)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,77 @@ public void getProperties(
});
}

@PUT
@Path("/{tenant}/{namespace}/{topic}/properties")
@ApiOperation(value = "Update the properties on the given topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
@ApiResponse(code = 405, message = "Method Not Allowed"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void updateProperties(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Key value pair properties for the topic metadata") Map<String, String> properties){
validatePersistentTopicName(tenant, namespace, encodedTopic);
internalUpdatePropertiesAsync(authoritative, properties)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/properties")
@ApiOperation(value = "Remove the key in properties on the given topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Partitioned topic does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
})
public void removeProperties(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("key") String key,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validatePersistentTopicName(tenant, namespace, encodedTopic);
internalRemovePropertiesAsync(authoritative, key)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to remove key {} in properties on topic {}",
clientAppId(), key, topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Delete a partitioned topic.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1469,7 +1469,8 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean
pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
.thenAccept(isActive -> {
if (isActive) {
createPersistentTopic(topic, createIfMissing, topicFuture, properties, topicPolicies);
createPersistentTopic(topic, createIfMissing, topicFuture,
properties == null ? new HashMap<>() : properties, topicPolicies);
} else {
// namespace is being unloaded
String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,99 @@ public void testCreateAndGetTopicProperties() throws Exception {
Assert.assertEquals(properties22.get("key2"), "value2");
}

@Test
public void testUpdatePartitionedTopicProperties() throws Exception {
final String namespace = "prop-xyz/ns2";
final String topicName = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties";
final String topicNameTwo = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties2";
admin.namespaces().createNamespace(namespace, 20);

// create partitioned topic without properties
admin.topics().createPartitionedTopic(topicName, 2);
Map<String, String> properties = admin.topics().getProperties(topicName);
Assert.assertNull(properties);
Map<String, String> topicProperties = new HashMap<>();
topicProperties.put("key1", "value1");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.get("key1"), "value1");

// update with new key, old properties should keep
topicProperties = new HashMap<>();
topicProperties.put("key2", "value2");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value1");
Assert.assertEquals(properties.get("key2"), "value2");

// override old values
topicProperties = new HashMap<>();
topicProperties.put("key1", "value11");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value11");
Assert.assertEquals(properties.get("key2"), "value2");

// create topic without properties
admin.topics().createPartitionedTopic(topicNameTwo, 2);
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertNull(properties);
// remove key of properties on this topic
admin.topics().removeProperties(topicNameTwo, "key1");
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertNull(properties);
Map<String, String> topicProp = new HashMap<>();
topicProp.put("key1", "value1");
topicProp.put("key2", "value2");
admin.topics().updateProperties(topicNameTwo, topicProp);
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertEquals(properties, topicProp);
admin.topics().removeProperties(topicNameTwo, "key1");
topicProp.remove("key1");
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertEquals(properties, topicProp);
}

@Test
public void testUpdateNonPartitionedTopicProperties() throws Exception {
final String namespace = "prop-xyz/ns2";
final String topicName = "persistent://" + namespace + "/testUpdateNonPartitionedTopicProperties";
admin.namespaces().createNamespace(namespace, 20);

// create non-partitioned topic with properties
Map<String, String> topicProperties = new HashMap<>();
topicProperties.put("key1", "value1");
admin.topics().createNonPartitionedTopic(topicName, topicProperties);
Map<String, String> properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.get("key1"), "value1");

// update with new key, old properties should keep
topicProperties = new HashMap<>();
topicProperties.put("key2", "value2");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value1");
Assert.assertEquals(properties.get("key2"), "value2");

// override old values
topicProperties = new HashMap<>();
topicProperties.put("key1", "value11");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value11");
Assert.assertEquals(properties.get("key2"), "value2");
}

@Test
public void testNonPersistentTopics() throws Exception {
final String namespace = "prop-xyz/ns2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -184,6 +185,19 @@ public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws Excep
}


@Test
public void updatePropertiesForAutoCreatedTopicTest() throws Exception {
TopicName topicName = TopicName.get(
TopicDomain.persistent.value(),
NamespaceName.get(myNamespace),
"test-" + UUID.randomUUID()
);
String testTopic = topicName.toString();
Producer<byte[]> producer = pulsarClient.newProducer().topic(testTopic).create();
HashMap<String, String> properties = new HashMap<>();
properties.put("backlogQuotaType", "message_age");
admin.topics().updateProperties(testTopic, properties);
}
@Test
public void testSetSizeBasedBacklogQuota() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,42 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal
*/
CompletableFuture<Map<String, String>> getPropertiesAsync(String topic);

/**
* Update Topic Properties on a topic.
* The new properties will override the existing values, old properties in the topic will be keep if not override.
* @param topic
* @param properties
* @throws PulsarAdminException
*/
void updateProperties(String topic, Map<String, String> properties) throws PulsarAdminException;

/**
* Update Topic Properties on a topic.
* The new properties will override the existing values, old properties in the topic will be keep if not override.
* @param topic
* @param properties
* @return
*/
CompletableFuture<Void> updatePropertiesAsync(String topic, Map<String, String> properties);

/**
* Remove the key in properties on a topic.
*
* @param topic
* @param key
* @throws PulsarAdminException
*/
void removeProperties(String topic, String key) throws PulsarAdminException;

/**
* Remove the key in properties on a topic asynchronously.
*
* @param topic
* @param key
* @return
*/
CompletableFuture<Void> removePropertiesAsync(String topic, String key);

/**
* Delete a partitioned topic.
* <p/>
Expand Down
Loading
Loading