From 0e14aec9809ab501fc95ac4f45503fd8f65ec92e Mon Sep 17 00:00:00 2001 From: kmrdhruv Date: Wed, 19 Jun 2024 10:25:38 +0530 Subject: [PATCH] Refactoring and start consumer integration. (#154) * Refactoring for - Add active produce/consume topic/subscription to composite entities - Move default topic/node capacity to config from hard coded values. - moved varadhitopic factory/service to server module. - refactoring related to start subscription in consumer --- .../java/com/flipkart/varadhi/Constants.java | 4 ++ .../varadhi/consumer/ConsumerApiMgr.java | 21 ++++-- .../varadhi/consumer/ConsumersManager.java | 20 +++--- .../consumer/ConsumptionFailurePolicy.java | 7 +- .../varadhi/consumer/StorageRetryTopic.java | 28 -------- .../consumer/impl/ConsumersManagerImpl.java | 10 +-- .../varadhi/controller/ControllerApiMgr.java | 5 +- .../varadhi/controller/ShardAssigner.java | 7 +- .../impl/LeastAssignedStrategy.java | 2 +- .../varadhi/core/cluster/OperationMgr.java | 8 ++- .../InternalCompositeSubscription.java | 42 +++++++++-- .../entities/InternalCompositeTopic.java | 33 ++++++--- .../varadhi/entities/RetrySubscription.java | 4 +- .../entities/SubscriptionUnitShard.java | 3 +- .../varadhi/entities/TopicCapacityPolicy.java | 11 ++- .../varadhi/entities/TopicResource.java | 15 ++-- .../flipkart/varadhi/entities/TopicState.java | 2 + .../varadhi/entities/VaradhiTopic.java | 19 ++--- .../entities/cluster/ConsumerInfo.java | 5 +- .../entities/cluster/ConsumerNode.java | 16 ++--- .../varadhi/entities/cluster/MemberInfo.java | 2 +- .../entities/cluster/MemberResources.java | 15 ---- .../entities/cluster/NodeCapacity.java | 16 ++--- .../varadhi/entities/VaradhiTopicTest.java | 19 ++--- .../produce/services/ProducerService.java | 14 ++-- .../services/ProducerServiceTests.java | 46 +++++++----- .../varadhi/pulsar/PulsarTopicService.java | 4 ++ .../pulsar/PulsarProducerFactoryTest.java | 4 +- .../pulsar/PulsarStackProviderTest.java | 4 +- .../pulsar/entities/PulsarProducerTest.java | 3 +- .../services/PulsarTopicServiceTest.java | 12 ++-- .../pulsar/PulsarTopicServiceTest.java | 13 ++-- .../flipkart/varadhi/VaradhiApplication.java | 40 +++++------ .../flipkart/varadhi/config/MemberConfig.java | 3 +- .../flipkart/varadhi/config/RestOptions.java | 3 + .../services}/VaradhiTopicService.java | 26 ++----- .../varadhi/utils/ShardProvisioner.java | 70 +++++++++++-------- .../utils/VaradhiSubscriptionFactory.java | 29 ++++---- .../varadhi/utils}/VaradhiTopicFactory.java | 19 ++--- .../verticles/consumer/ConsumerVerticle.java | 8 +-- .../webserver/WebServerVerticle.java | 24 +++++-- .../web/v1/admin/SubscriptionHandlers.java | 11 ++- .../varadhi/web/v1/admin/TopicHandlers.java | 8 +-- server/src/main/resources/configuration.yml | 8 ++- .../services/SubscriptionServiceTest.java | 34 +++++---- .../services}/VaradhiTopicServiceTest.java | 24 +++---- .../utils}/VaradhiTopicFactoryTest.java | 16 ++--- .../web/admin/SubscriptionHandlersTest.java | 2 +- .../varadhi/web/admin/TopicHandlersTest.java | 24 ++++--- .../src/test/resources/testConfiguration.yml | 1 - .../java/com/flipkart/varadhi/TopicTests.java | 3 +- 51 files changed, 397 insertions(+), 370 deletions(-) delete mode 100644 consumer/src/main/java/com/flipkart/varadhi/consumer/StorageRetryTopic.java delete mode 100644 entities/src/main/java/com/flipkart/varadhi/entities/cluster/MemberResources.java rename {core/src/main/java/com/flipkart/varadhi/core => server/src/main/java/com/flipkart/varadhi/services}/VaradhiTopicService.java (72%) rename {core/src/main/java/com/flipkart/varadhi/core => server/src/main/java/com/flipkart/varadhi/utils}/VaradhiTopicFactory.java (65%) rename {core/src/test/java/com/flipkart/varadhi/core => server/src/test/java/com/flipkart/varadhi/services}/VaradhiTopicServiceTest.java (94%) rename {core/src/test/java/com/flipkart/varadhi/core => server/src/test/java/com/flipkart/varadhi/utils}/VaradhiTopicFactoryTest.java (84%) diff --git a/common/src/main/java/com/flipkart/varadhi/Constants.java b/common/src/main/java/com/flipkart/varadhi/Constants.java index b5b32eab..79773248 100644 --- a/common/src/main/java/com/flipkart/varadhi/Constants.java +++ b/common/src/main/java/com/flipkart/varadhi/Constants.java @@ -1,5 +1,7 @@ package com.flipkart.varadhi; +import com.flipkart.varadhi.entities.TopicCapacityPolicy; + public class Constants { public static final int RANDOM_PARTITION_KEY_LENGTH = 5; public static final String CONTEXT_KEY_BODY = "varadhi.body"; @@ -8,6 +10,8 @@ public class Constants { // TODO: this header is only for testing and x_ convention may cause it to be sent to the destination during consume public static final String USER_ID_HEADER = "x_user_id"; + public static final TopicCapacityPolicy DefaultTopicCapacity = new TopicCapacityPolicy(100, 400, 2); + public static class PathParams { public static final String PATH_PARAM_ORG = "org"; public static final String PATH_PARAM_TEAM = "team"; diff --git a/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumerApiMgr.java b/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumerApiMgr.java index 9da16c6c..caff7c8f 100644 --- a/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumerApiMgr.java +++ b/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumerApiMgr.java @@ -1,6 +1,9 @@ package com.flipkart.varadhi.consumer; import com.flipkart.varadhi.core.cluster.ConsumerApi; +import com.flipkart.varadhi.entities.StorageSubscription; +import com.flipkart.varadhi.entities.StorageTopic; +import com.flipkart.varadhi.entities.SubscriptionUnitShard; import com.flipkart.varadhi.entities.cluster.ConsumerInfo; import com.flipkart.varadhi.entities.cluster.ShardOperation; import com.flipkart.varadhi.entities.VaradhiSubscription; @@ -20,16 +23,24 @@ public ConsumerApiMgr(ConsumersManager consumersManager) { @Override public CompletableFuture start(ShardOperation.StartData operation) { + log.info("Consumer: Starting shard {}", operation); VaradhiSubscription subscription = operation.getSubscription(); + SubscriptionUnitShard shard = operation.getShard(); + StorageSubscription mainSub = shard.getMainSubscription().getSubscriptionToConsume(); + ConsumptionFailurePolicy failurePolicy = + new ConsumptionFailurePolicy(subscription.getRetryPolicy(), shard.getRetrySubscription(), + shard.getDeadLetterSubscription() + ); + return consumersManager.startSubscription( - null, + subscription.getProject(), subscription.getName(), - "", - null, + operation.getShardId(), + mainSub, subscription.isGrouped(), subscription.getEndpoint(), subscription.getConsumptionPolicy(), - null + failurePolicy ); } @@ -38,7 +49,7 @@ public CompletableFuture stop(ShardOperation.StopData operation) { VaradhiSubscription subscription = operation.getSubscription(); return consumersManager.stopSubscription( subscription.getName(), - "" + operation.getShardId() ); } diff --git a/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumersManager.java b/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumersManager.java index fbef111d..9906fbb3 100644 --- a/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumersManager.java +++ b/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumersManager.java @@ -1,11 +1,7 @@ package com.flipkart.varadhi.consumer; -import com.flipkart.varadhi.entities.ConsumptionPolicy; -import com.flipkart.varadhi.entities.Endpoint; -import com.flipkart.varadhi.entities.Project; -import com.flipkart.varadhi.entities.StorageTopic; +import com.flipkart.varadhi.entities.*; import com.flipkart.varadhi.entities.cluster.ConsumerInfo; -import com.flipkart.varadhi.entities.TopicPartitions; import java.util.concurrent.CompletableFuture; @@ -22,23 +18,23 @@ public interface ConsumersManager { * @return */ CompletableFuture startSubscription( - Project project, + String project, String subscription, - String shardName, - TopicPartitions topic, + int shardId, + StorageSubscription mainSubscription, boolean grouped, Endpoint endpoint, ConsumptionPolicy consumptionPolicy, ConsumptionFailurePolicy failurePolicy ); - CompletableFuture stopSubscription(String subscription, String shardName); + CompletableFuture stopSubscription(String subscription, int shardId); - void pauseSubscription(String subscription, String shardName); + void pauseSubscription(String subscription, int shardId); - void resumeSubscription(String subscription, String shardName); + void resumeSubscription(String subscription, int shardId); - ConsumerState getConsumerState(String subscription, String shardName); + ConsumerState getConsumerState(String subscription, int shardId); // TODO likely need status on the starting / stopping as well; as the above status is for a running consumer.. diff --git a/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumptionFailurePolicy.java b/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumptionFailurePolicy.java index 992b280c..5f0deb97 100644 --- a/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumptionFailurePolicy.java +++ b/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumptionFailurePolicy.java @@ -1,7 +1,6 @@ package com.flipkart.varadhi.consumer; -import com.flipkart.varadhi.entities.RetryPolicy; -import com.flipkart.varadhi.entities.StorageTopic; +import com.flipkart.varadhi.entities.*; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -11,7 +10,7 @@ public class ConsumptionFailurePolicy { private final RetryPolicy retryPolicy; - private final StorageRetryTopic retryTopic; + private final RetrySubscription retrySubscription; - private final StorageTopic deadLetterTopic; + private final InternalCompositeSubscription deadLetterSubscription; } diff --git a/consumer/src/main/java/com/flipkart/varadhi/consumer/StorageRetryTopic.java b/consumer/src/main/java/com/flipkart/varadhi/consumer/StorageRetryTopic.java deleted file mode 100644 index c625fe9d..00000000 --- a/consumer/src/main/java/com/flipkart/varadhi/consumer/StorageRetryTopic.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.flipkart.varadhi.consumer; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.flipkart.varadhi.entities.StorageTopic; - -public class StorageRetryTopic { - - private final StorageTopic[] retryTopics; - - public StorageRetryTopic(StorageTopic[] retryTopics) { - this.retryTopics = retryTopics; - } - - /** - * @param retryCount 1-based retry count - * - * @return the storage topic for the given retry count - */ - @JsonIgnore - public StorageTopic getTopicForRetry(int retryCount) { - return retryTopics[retryCount - 1]; - } - - @JsonIgnore - public int getMaxRetryCount() { - return retryTopics.length; - } -} diff --git a/consumer/src/main/java/com/flipkart/varadhi/consumer/impl/ConsumersManagerImpl.java b/consumer/src/main/java/com/flipkart/varadhi/consumer/impl/ConsumersManagerImpl.java index 203dd5aa..c3ac7772 100644 --- a/consumer/src/main/java/com/flipkart/varadhi/consumer/impl/ConsumersManagerImpl.java +++ b/consumer/src/main/java/com/flipkart/varadhi/consumer/impl/ConsumersManagerImpl.java @@ -18,7 +18,7 @@ public ConsumersManagerImpl(ConsumerInfo consumerInfo) { @Override public CompletableFuture startSubscription( - Project project, String subscription, String shardName, TopicPartitions topic, + String project, String subscription, int shardId, StorageSubscription storageSubscription, boolean grouped, Endpoint endpoint, ConsumptionPolicy consumptionPolicy, ConsumptionFailurePolicy failurePolicy ) { @@ -26,22 +26,22 @@ public CompletableFuture startSubscription( } @Override - public CompletableFuture stopSubscription(String subscription, String shardName) { + public CompletableFuture stopSubscription(String subscription, int shardId) { return CompletableFuture.completedFuture(null); } @Override - public void pauseSubscription(String subscription, String shardName) { + public void pauseSubscription(String subscription, int shardId) { } @Override - public void resumeSubscription(String subscription, String shardName) { + public void resumeSubscription(String subscription, int shardId) { } @Override - public ConsumerState getConsumerState(String subscription, String shardName) { + public ConsumerState getConsumerState(String subscription, int shardId) { return null; } diff --git a/controller/src/main/java/com/flipkart/varadhi/controller/ControllerApiMgr.java b/controller/src/main/java/com/flipkart/varadhi/controller/ControllerApiMgr.java index 2006d19a..7864b5ab 100644 --- a/controller/src/main/java/com/flipkart/varadhi/controller/ControllerApiMgr.java +++ b/controller/src/main/java/com/flipkart/varadhi/controller/ControllerApiMgr.java @@ -84,6 +84,7 @@ private SubscriptionStatus getSubscriptionStatusFromShardStatus( public CompletableFuture startSubscription( String subscriptionId, String requestedBy ) { + //TODO:: Fix it -assignment failure is not failing the start op. Task failure in the operation mgr queue. VaradhiSubscription subscription = metaStore.getSubscription(subscriptionId); return getSubscriptionStatus(subscription).exceptionally(t -> { // If not temporary, then alternate needs to be provided to allow recovery from this. @@ -111,7 +112,7 @@ private CompletableFuture> getOrCreateShardAssignment(VaradhiSu return shardAssigner.assignShard(unAssigned, subscription); } else { log.info( - "{} Shards for Subscription {} are already assigned", assignedShards.size(), + "{} Shards for Subscription {} are already assigned.", assignedShards.size(), subscription.getName() ); return CompletableFuture.completedFuture(assignedShards); @@ -250,7 +251,7 @@ private List getSubscriptionShards(SubscriptionShards sha @Override public CompletableFuture update(ShardOperation.OpData opData) { - log.debug("Received update on shard operation: {}", opData); + log.info("Received update on shard operation: {}", opData); try { // Update is getting executed inline on dispatcher thread. operationMgr.updateShardOp(opData); diff --git a/controller/src/main/java/com/flipkart/varadhi/controller/ShardAssigner.java b/controller/src/main/java/com/flipkart/varadhi/controller/ShardAssigner.java index 4f8f3f43..359d9ce3 100644 --- a/controller/src/main/java/com/flipkart/varadhi/controller/ShardAssigner.java +++ b/controller/src/main/java/com/flipkart/varadhi/controller/ShardAssigner.java @@ -9,7 +9,6 @@ import com.flipkart.varadhi.spi.db.AssignmentStore; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.mutable.MutableBoolean; @@ -38,7 +37,7 @@ public ShardAssigner(AssignmentStore assignmentStore, MeterRegistry meterRegistr public void addConsumerNodes(List clusterConsumers) { clusterConsumers.forEach(c -> { addConsumerNode(c); - log.info("Added consumer node {}", c.getMemberInfo().hostname()); + log.info("Added consumer node {}", c.getConsumerId()); }); } @@ -103,7 +102,7 @@ public List getSubscriptionAssignment(String subscriptionName) { public void consumerNodeJoined(ConsumerNode consumerNode) { boolean added = addConsumerNode(consumerNode); if (added) { - log.info("ConsumerNode {} joined.", consumerNode.getMemberInfo().hostname()); + log.info("ConsumerNode {} joined.", consumerNode.getConsumerId()); } } @@ -123,7 +122,7 @@ public void consumerNodeLeft(String consumerNodeId) { } private boolean addConsumerNode(ConsumerNode consumerNode) { - String consumerNodeId = consumerNode.getMemberInfo().hostname(); + String consumerNodeId = consumerNode.getConsumerId(); MutableBoolean added = new MutableBoolean(false); consumerNodes.computeIfAbsent(consumerNodeId, k -> { added.setTrue(); diff --git a/controller/src/main/java/com/flipkart/varadhi/controller/impl/LeastAssignedStrategy.java b/controller/src/main/java/com/flipkart/varadhi/controller/impl/LeastAssignedStrategy.java index 26500651..8f67f516 100644 --- a/controller/src/main/java/com/flipkart/varadhi/controller/impl/LeastAssignedStrategy.java +++ b/controller/src/main/java/com/flipkart/varadhi/controller/impl/LeastAssignedStrategy.java @@ -42,7 +42,7 @@ public List assign( throw new CapacityException("Not enough Resources for Subscription assignment."); } Assignment assignment = - new Assignment(subscription.getName(), shard.getShardId(), consumerNode.getMemberInfo().hostname()); + new Assignment(subscription.getName(), shard.getShardId(), consumerNode.getConsumerId()); consumerNode.allocate(assignment, shard.getCapacityRequest()); assignments.add(assignment); consumers.add(consumerNode); diff --git a/core/src/main/java/com/flipkart/varadhi/core/cluster/OperationMgr.java b/core/src/main/java/com/flipkart/varadhi/core/cluster/OperationMgr.java index 51a52641..fee2e69c 100644 --- a/core/src/main/java/com/flipkart/varadhi/core/cluster/OperationMgr.java +++ b/core/src/main/java/com/flipkart/varadhi/core/cluster/OperationMgr.java @@ -84,7 +84,6 @@ private void handleSubOpUpdate( SubscriptionOperation.OpData updated = operation.getData(); subOps.compute(operation.getData().getSubscriptionId(), (subId, scheduledTasks) -> { if (null != scheduledTasks && !scheduledTasks.isEmpty()) { - // process the update using provided handler. // Update processing can take time, this will affect a subscription. if (null != updateHandler) { @@ -102,14 +101,17 @@ private void handleSubOpUpdate( // Remove completed operation from the pending list and schedule next operation if available. if (operation.completed()) { scheduledTasks.removeFirst(); - log.info("Completed SubOp({}) removed from the queue.", updated); + log.info("Completed SubOp({}) removed from the queue.", operation.getData()); if (scheduledTasks.isEmpty()) { + log.info("No more pending operation for {}.", subId); return null; } else { OpTask waiting = scheduledTasks.peekFirst(); - log.info("Pending SubOp({}) scheduled for execution.", waiting.operation.getData()); + log.info("Next pending SubOp({}) scheduled for execution.", waiting.operation.getData()); executor.submit(waiting); } + }else{ + log.info("Pending SubOp({}) still in progress", operation.getData()); } return scheduledTasks; } else { diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/InternalCompositeSubscription.java b/entities/src/main/java/com/flipkart/varadhi/entities/InternalCompositeSubscription.java index 5a9f4ddd..fd02a5c9 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/InternalCompositeSubscription.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/InternalCompositeSubscription.java @@ -1,15 +1,43 @@ package com.flipkart.varadhi.entities; import com.fasterxml.jackson.annotation.JsonIgnore; -import lombok.Data; +import lombok.AllArgsConstructor; +import lombok.Getter; -@Data +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +@Getter +@AllArgsConstructor public class InternalCompositeSubscription { - private final String subRegion; private final InternalQueueType queueType; + private StorageSubscription[] storageSubscriptions; + private int produceIndex; + private int consumeIndex; + + + public static InternalCompositeSubscription of( + StorageSubscription storageSubscription, InternalQueueType queueType + ) { + return new InternalCompositeSubscription(queueType, new StorageSubscription[]{storageSubscription}, 0, 0); + } + + @JsonIgnore + public StorageTopic getTopicToProduce() { + if (queueType.getCategory() == InternalQueueCategory.MAIN) { + throw new IllegalArgumentException("Main Subscription does not have a topic to produce"); + } + return storageSubscriptions[produceIndex].getTopicPartitions().getTopic(); + } + + @JsonIgnore + public StorageSubscription getSubscriptionToConsume() { + return storageSubscriptions[consumeIndex]; + } - /** - * As of now only 1 is supported, but in future this can be an array where we can add more storage topics. - */ - private final StorageSubscription storageSubscription; + @JsonIgnore + public List> getActiveSubscriptions() { + return new ArrayList<>(Arrays.asList(storageSubscriptions)); + } } diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/InternalCompositeTopic.java b/entities/src/main/java/com/flipkart/varadhi/entities/InternalCompositeTopic.java index abced429..65a9b486 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/InternalCompositeTopic.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/InternalCompositeTopic.java @@ -1,22 +1,39 @@ package com.flipkart.varadhi.entities; -import lombok.Data; +import com.fasterxml.jackson.annotation.JsonIgnore; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; /** * A wrapper on the storage topic. In future this class will handle the adding additional storage topics for the purpose * of increasing partition count without affecting ordering. * This concept is internal and is never exposed to the user. */ -@Data +@Getter +@AllArgsConstructor public class InternalCompositeTopic { + private StorageTopic[] storageTopics; + private int produceIndex; + @Setter + private TopicState topicState; - private final String topicRegion; + public static InternalCompositeTopic of(StorageTopic storageTopic) { + return new InternalCompositeTopic(new StorageTopic[]{storageTopic}, 0, TopicState.Producing); + } - private final TopicState topicState; + @JsonIgnore + public StorageTopic getTopicToProduce() { + return storageTopics[produceIndex]; + } - /** - * As of now only 1 is supported, but in future this can be an array where we can add more storage topics. - */ - private final StorageTopic storageTopic; + @JsonIgnore + public List getActiveTopics() { + return new ArrayList<>(Arrays.asList(storageTopics)); + } } diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/RetrySubscription.java b/entities/src/main/java/com/flipkart/varadhi/entities/RetrySubscription.java index 56b8315a..081e120e 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/RetrySubscription.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/RetrySubscription.java @@ -16,8 +16,8 @@ public class RetrySubscription { */ @JsonIgnore - public StorageSubscription getStorageSubscriptionForRetry(int retryCount) { - return retrySubscriptions[retryCount - 1].getStorageSubscription(); + public InternalCompositeSubscription getSubscriptionForRetry(int retryCount) { + return retrySubscriptions[retryCount - 1]; } @JsonIgnore diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/SubscriptionUnitShard.java b/entities/src/main/java/com/flipkart/varadhi/entities/SubscriptionUnitShard.java index 09eb93d5..6a08e705 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/SubscriptionUnitShard.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/SubscriptionUnitShard.java @@ -4,7 +4,8 @@ @Getter public class SubscriptionUnitShard extends SubscriptionShards { - + //TODO::Add a notion of regions to either Shard or VaradhiSubscription + // and bring operation down to regional level e.g. start/stop/assignment. private final int shardId; private final TopicCapacityPolicy capacityRequest; private final InternalCompositeSubscription mainSubscription; diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/TopicCapacityPolicy.java b/entities/src/main/java/com/flipkart/varadhi/entities/TopicCapacityPolicy.java index db4e9727..c50a5ceb 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/TopicCapacityPolicy.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/TopicCapacityPolicy.java @@ -2,11 +2,13 @@ import com.flipkart.varadhi.entities.cluster.NodeCapacity; import lombok.Data; +import lombok.NoArgsConstructor; @Data +@NoArgsConstructor public class TopicCapacityPolicy implements Comparable { - private int throughputKBps; private int qps; + private int throughputKBps; private int readFanOut; public TopicCapacityPolicy(int qps, int throughputKBps, int readFanOut) { @@ -14,11 +16,6 @@ public TopicCapacityPolicy(int qps, int throughputKBps, int readFanOut) { this.throughputKBps = throughputKBps; this.readFanOut = readFanOut; } - - public static TopicCapacityPolicy getDefault() { - return new TopicCapacityPolicy(100, 400, 2); - } - public TopicCapacityPolicy from(double factor, int readFanOut) { int qps = (int)Math.ceil((double) this.qps * factor); int kbps = (int)Math.ceil((double) throughputKBps * factor); @@ -32,6 +29,6 @@ public int compareTo(NodeCapacity o) { @Override public String toString() { - return String.format("%d KBps %d qps", throughputKBps, qps); + return String.format("%dKBps, %d Qps, %d readFanOut", throughputKBps, qps, readFanOut); } } diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/TopicResource.java b/entities/src/main/java/com/flipkart/varadhi/entities/TopicResource.java index 7122adb8..6aed226c 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/TopicResource.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/TopicResource.java @@ -1,17 +1,18 @@ package com.flipkart.varadhi.entities; import lombok.EqualsAndHashCode; -import lombok.Value; +import lombok.Getter; +import lombok.Setter; -@Value +@Getter @EqualsAndHashCode(callSuper = true) @ValidateResource(message = "Invalid Topic name. Check naming constraints.", max = 64) public class TopicResource extends VersionedEntity implements Validatable { private static final String RESOURCE_TYPE_NAME = "TopicResource"; - - String project; - boolean grouped; - TopicCapacityPolicy capacity; + private final String project; + private final boolean grouped; + @Setter + private TopicCapacityPolicy capacity; public TopicResource( String name, @@ -26,7 +27,7 @@ public TopicResource( this.capacity = capacity; } - public static TopicResource of(VaradhiTopic varadhiTopic) { + public static TopicResource from(VaradhiTopic varadhiTopic) { String[] topicResourceInfo = varadhiTopic.getName().split(NAME_SEPARATOR_REGEX); return new TopicResource( topicResourceInfo[1], diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/TopicState.java b/entities/src/main/java/com/flipkart/varadhi/entities/TopicState.java index 770d8eb1..b606675e 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/TopicState.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/TopicState.java @@ -4,6 +4,8 @@ @Getter public enum TopicState { + // TODO:: Storage topic should be only Producing & Replicating + // TODO:: Blocked/Throttled are VaradhiTopic state. Producing(true, ProduceStatus.Success), Blocked(false, ProduceStatus.Blocked), Throttled(false, ProduceStatus.Throttled), diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/VaradhiTopic.java b/entities/src/main/java/com/flipkart/varadhi/entities/VaradhiTopic.java index 1ed30449..cc935202 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/VaradhiTopic.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/VaradhiTopic.java @@ -25,20 +25,16 @@ private VaradhiTopic( super(name, version); this.grouped = grouped; this.capacity = capacity; - this.internalTopics = null == internalTopics ? new HashMap<>() : internalTopics; + this.internalTopics = internalTopics; } public static VaradhiTopic of(TopicResource topicResource) { - TopicCapacityPolicy capacity = topicResource.getCapacity(); - if (null == capacity) { - capacity = fetchDefaultCapacity(); - } return new VaradhiTopic( buildTopicName(topicResource.getProject(), topicResource.getName()), INITIAL_VERSION, topicResource.isGrouped(), - capacity, - null + topicResource.getCapacity(), + new HashMap<>() ); } @@ -46,8 +42,8 @@ public static String buildTopicName(String projectName, String topicName) { return String.join(NAME_SEPARATOR, projectName, topicName); } - public void addInternalTopic(InternalCompositeTopic internalTopic) { - this.internalTopics.put(internalTopic.getTopicRegion(), internalTopic); + public void addInternalTopic(String region, InternalCompositeTopic internalTopic) { + this.internalTopics.put(region, internalTopic); } @JsonIgnore @@ -58,9 +54,4 @@ public String getProjectName() { public InternalCompositeTopic getProduceTopicForRegion(String region) { return internalTopics.get(region); } - - private static TopicCapacityPolicy fetchDefaultCapacity() { - //TODO:: make default capacity config based instead of hard coding. - return TopicCapacityPolicy.getDefault(); - } } diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerInfo.java b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerInfo.java index 41a2ae44..c3f8c47f 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerInfo.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerInfo.java @@ -1,17 +1,16 @@ package com.flipkart.varadhi.entities.cluster; - import lombok.AllArgsConstructor; import lombok.Getter; @Getter @AllArgsConstructor public class ConsumerInfo { + // Consumer Info as maintained by the consumer node itself. private String consumerId; private NodeCapacity available; public static ConsumerInfo from(MemberInfo memberInfo) { - return new ConsumerInfo( - memberInfo.hostname(), new NodeCapacity(1000, memberInfo.capacity().getNetworkMBps())); + return new ConsumerInfo(memberInfo.hostname(), memberInfo.provisionedCapacity().clone()); } } diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerNode.java b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerNode.java index a0303644..80b2d49d 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerNode.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerNode.java @@ -11,16 +11,17 @@ @Getter public class ConsumerNode { + // Consumer Node info as viewed by Controller public static Comparator NodeComparator = comparing(o -> o.available); - private final MemberInfo memberInfo; + private final String consumerId; private final NodeCapacity available; - private boolean markedForDeletion; private final Map assignments; + private boolean markedForDeletion; public ConsumerNode(MemberInfo memberInfo) { - this.memberInfo = memberInfo; + this.consumerId = memberInfo.hostname(); this.markedForDeletion = false; - this.available = new NodeCapacity(1000, memberInfo.capacity().getNetworkMBps() * 1000); + this.available = memberInfo.provisionedCapacity().clone(); this.assignments = new HashMap<>(); } @@ -32,17 +33,14 @@ public void updateWithConsumerInfo(ConsumerInfo consumerInfo) { available.setMaxThroughputKBps(consumerInfo.getAvailable().getMaxThroughputKBps()); } - public String getConsumerId() { - return memberInfo.hostname(); - } - public synchronized void allocate(Assignment a, TopicCapacityPolicy requests) { if (null == assignments.putIfAbsent(a.getName(), a)) { available.setMaxThroughputKBps(available.getMaxThroughputKBps() - requests.getThroughputKBps()); } } + public synchronized void free(Assignment a, TopicCapacityPolicy requests) { - if (null != assignments.remove(a.getName())){ + if (null != assignments.remove(a.getName())) { available.setMaxThroughputKBps(available.getMaxThroughputKBps() + requests.getThroughputKBps()); } } diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/MemberInfo.java b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/MemberInfo.java index 12845cc8..b7551d28 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/MemberInfo.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/MemberInfo.java @@ -5,7 +5,7 @@ public record MemberInfo( String hostname, int port, ComponentKind[] roles, - MemberResources capacity + NodeCapacity provisionedCapacity ) { public boolean hasRole(ComponentKind role) { for (ComponentKind r : roles) { diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/MemberResources.java b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/MemberResources.java deleted file mode 100644 index 056a0379..00000000 --- a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/MemberResources.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.flipkart.varadhi.entities.cluster; - -import lombok.Data; - -@Data -public class MemberResources { - private int cpuCount; - private int networkMBps; - - public MemberResources(int cpuCount, int networkMBps) { - this.cpuCount = cpuCount; - this.networkMBps = networkMBps; - } - -} diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/NodeCapacity.java b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/NodeCapacity.java index 7b08dc9b..49e2eac6 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/NodeCapacity.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/NodeCapacity.java @@ -4,20 +4,16 @@ @Data public class NodeCapacity implements Comparable { + private int maxQps; private int maxThroughputKBps; - private int maxQPS; - public NodeCapacity(int maxQPS, int maxThroughputKBps) { - this.maxQPS = maxQPS; + public NodeCapacity(int maxQps, int maxThroughputKBps) { + this.maxQps = maxQps; this.maxThroughputKBps = maxThroughputKBps; } - public static NodeCapacity getDefault() { - return new NodeCapacity(100, 100); - } - - public NodeCapacity from(double factor) { - return new NodeCapacity((int)((double)maxQPS * factor), (int)((double)maxThroughputKBps * factor)); + public NodeCapacity clone() { + return new NodeCapacity(maxQps, maxThroughputKBps); } @Override @@ -27,6 +23,6 @@ public int compareTo(NodeCapacity o) { @Override public String toString() { - return String.format("%d KBps %d qps", maxThroughputKBps, maxQPS); + return String.format("%d KBps %d qps", maxThroughputKBps, maxQps); } } diff --git a/entities/src/test/java/com/flipkart/varadhi/entities/VaradhiTopicTest.java b/entities/src/test/java/com/flipkart/varadhi/entities/VaradhiTopicTest.java index 687412d0..67dfe231 100644 --- a/entities/src/test/java/com/flipkart/varadhi/entities/VaradhiTopicTest.java +++ b/entities/src/test/java/com/flipkart/varadhi/entities/VaradhiTopicTest.java @@ -11,6 +11,7 @@ class VaradhiTopicTest { private static final String projectName = "project1"; private static final String topicName = "topic1"; + private static final TopicCapacityPolicy topicCapacity = new TopicCapacityPolicy(100, 400, 2); @Test void buildTopicName() { @@ -23,22 +24,15 @@ void buildTopicName() { void addAndGetInternalTopic() { VaradhiTopic varadhiTopic = VaradhiTopic.of(new TopicResource(topicName, 1, projectName, false, null)); StorageTopic st = new DummyStorageTopic(varadhiTopic.getName(), 0); - InternalCompositeTopic internalTopic = new InternalCompositeTopic("region1", TopicState.Producing, st); - - varadhiTopic.addInternalTopic(internalTopic); - assertEquals( - internalTopic.getTopicRegion(), - varadhiTopic.getProduceTopicForRegion(internalTopic.getTopicRegion()).getTopicRegion() - ); + varadhiTopic.addInternalTopic("region1", InternalCompositeTopic.of(st)); + assertEquals(st.getName(), varadhiTopic.getProduceTopicForRegion("region1").getTopicToProduce().getName()); } @Test void getTopicResource() { VaradhiTopic varadhiTopic = VaradhiTopic.of( - new TopicResource(topicName, INITIAL_VERSION, projectName, false, TopicCapacityPolicy.getDefault())); - - TopicResource topicResource = TopicResource.of(varadhiTopic); - + new TopicResource(topicName, INITIAL_VERSION, projectName, false, topicCapacity)); + TopicResource topicResource = TopicResource.from(varadhiTopic); assertEquals(topicName, topicResource.getName()); assertEquals(INITIAL_VERSION, topicResource.getVersion()); assertEquals(projectName, topicResource.getProject()); @@ -48,7 +42,8 @@ void getTopicResource() { @EqualsAndHashCode(callSuper = true) public static class DummyStorageTopic extends StorageTopic { public DummyStorageTopic(String name, int version) { - super(name, version, TopicCapacityPolicy.getDefault()); + super(name, version, topicCapacity); + } } } diff --git a/messaging/src/main/java/com/flipkart/varadhi/produce/services/ProducerService.java b/messaging/src/main/java/com/flipkart/varadhi/produce/services/ProducerService.java index 93b9c798..1f67925a 100644 --- a/messaging/src/main/java/com/flipkart/varadhi/produce/services/ProducerService.java +++ b/messaging/src/main/java/com/flipkart/varadhi/produce/services/ProducerService.java @@ -2,7 +2,6 @@ import com.flipkart.varadhi.Result; import com.flipkart.varadhi.VaradhiCache; -import com.flipkart.varadhi.core.VaradhiTopicService; import com.flipkart.varadhi.entities.*; import com.flipkart.varadhi.exceptions.ProduceException; import com.flipkart.varadhi.exceptions.ResourceNotFoundException; @@ -11,7 +10,6 @@ import com.flipkart.varadhi.produce.config.ProducerOptions; import com.flipkart.varadhi.produce.otel.ProducerMetricsEmitter; import com.flipkart.varadhi.spi.services.Producer; -import com.flipkart.varadhi.spi.services.ProducerFactory; import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; @@ -28,14 +26,14 @@ public class ProducerService { public ProducerService( String produceRegion, ProducerOptions producerOptions, - ProducerFactory producerFactory, - VaradhiTopicService varadhiTopicService, + Function producerProvider, + Function topicProvider, MeterRegistry meterRegistry ) { this.internalTopicCache = - setupTopicCache(producerOptions.getTopicCacheBuilderSpec(), varadhiTopicService::get, meterRegistry); + setupTopicCache(producerOptions.getTopicCacheBuilderSpec(), topicProvider, meterRegistry); this.producerCache = - setupProducerCache(producerOptions.getProducerCacheBuilderSpec(), producerFactory::newProducer, + setupProducerCache(producerOptions.getProducerCacheBuilderSpec(), producerProvider, meterRegistry ); this.produceRegion = produceRegion; @@ -89,9 +87,9 @@ public CompletableFuture produceToTopic( return CompletableFuture.completedFuture( ProduceResult.ofNonProducingTopic(message.getMessageId(), internalTopic.getTopicState())); } - Producer producer = producerCache.get(internalTopic.getStorageTopic()); + Producer producer = producerCache.get(internalTopic.getTopicToProduce()); return produceToStorageProducer( - producer, metricsEmitter, internalTopic.getStorageTopic().getName(), message).thenApply(result -> + producer, metricsEmitter, internalTopic.getTopicToProduce().getName(), message).thenApply(result -> ProduceResult.of(message.getMessageId(), result)); } catch (VaradhiException e) { throw e; diff --git a/messaging/src/test/java/com/flipkart/varadhi/services/ProducerServiceTests.java b/messaging/src/test/java/com/flipkart/varadhi/services/ProducerServiceTests.java index 3329a1ac..16e76655 100644 --- a/messaging/src/test/java/com/flipkart/varadhi/services/ProducerServiceTests.java +++ b/messaging/src/test/java/com/flipkart/varadhi/services/ProducerServiceTests.java @@ -1,6 +1,6 @@ package com.flipkart.varadhi.services; -import com.flipkart.varadhi.core.VaradhiTopicService; +import com.flipkart.varadhi.Constants; import com.flipkart.varadhi.entities.*; import com.flipkart.varadhi.exceptions.ProduceException; import com.flipkart.varadhi.exceptions.ResourceNotFoundException; @@ -37,7 +37,7 @@ public class ProducerServiceTests { ProducerService service; ProducerFactory producerFactory; MeterRegistry meterRegistry; - VaradhiTopicService topicService; + TopicProvider topicProvider; Producer producer; Random random; String topic = "topic1"; @@ -47,9 +47,12 @@ public class ProducerServiceTests { @BeforeEach public void preTest() { producerFactory = mock(ProducerFactory.class); - topicService = mock(VaradhiTopicService.class); + topicProvider = mock(TopicProvider.class); meterRegistry = new OtlpMeterRegistry(); - service = new ProducerService(region, new ProducerOptions(), producerFactory, topicService, meterRegistry); + + service = new ProducerService(region, new ProducerOptions(), producerFactory::newProducer, topicProvider::get, + meterRegistry + ); random = new Random(); producer = spy(new DummyProducer(JsonMapper.getMapper())); @@ -60,7 +63,7 @@ public void testProduceMessage() throws InterruptedException { ProducerMetricsEmitter emitter = getMetricEmitter(topic, project, region); Message msg1 = getMessage(0, 1, null, 10); VaradhiTopic vt = getTopic(topic, project, region); - doReturn(vt).when(topicService).get(vt.getName()); + doReturn(vt).when(topicProvider).get(vt.getName()); doReturn(producer).when(producerFactory).newProducer(any()); CompletableFuture result = service.produceToTopic(msg1, VaradhiTopic.buildTopicName(project.getName(), topic), emitter); @@ -76,7 +79,7 @@ public void testProduceMessage() throws InterruptedException { Assertions.assertNull(rc.throwable); verify(producer, times(1)).produceAsync(msg2); verify(producerFactory, times(1)).newProducer(any()); - verify(topicService, times(1)).get(vt.getName()); + verify(topicProvider, times(1)).get(vt.getName()); } @Test @@ -84,7 +87,7 @@ public void testProduceWhenProduceAsyncThrows() { ProducerMetricsEmitter emitter = mock(ProducerMetricsEmitter.class); Message msg1 = getMessage(0, 1, null, 10); VaradhiTopic vt = getTopic(topic, project, region); - doReturn(vt).when(topicService).get(vt.getName()); + doReturn(vt).when(topicProvider).get(vt.getName()); doReturn(producer).when(producerFactory).newProducer(any()); doThrow(new RuntimeException("Some random error.")).when(producer).produceAsync(msg1); // This is testing Producer.ProduceAsync(), throwing an exception which is handled in produce service. @@ -103,7 +106,7 @@ public void testProduceToNonExistingTopic() { Message msg1 = getMessage(0, 1, null, 0); VaradhiTopic vt = getTopic(topic, project, region); doReturn(producer).when(producerFactory).newProducer(any()); - doThrow(new ResourceNotFoundException("Topic doesn't exists.")).when(topicService).get(vt.getName()); + doThrow(new ResourceNotFoundException("Topic doesn't exists.")).when(topicProvider).get(vt.getName()); Assertions.assertThrows( ResourceNotFoundException.class, () -> service.produceToTopic(msg1, VaradhiTopic.buildTopicName(project.getName(), topic), emitter) @@ -117,7 +120,7 @@ public void testProduceWithUnknownExceptionInGetTopic() { Message msg1 = getMessage(0, 1, null, 0); VaradhiTopic vt = getTopic(topic, project, region); doReturn(producer).when(producerFactory).newProducer(any()); - doThrow(new RuntimeException("Unknown error.")).when(topicService).get(vt.getName()); + doThrow(new RuntimeException("Unknown error.")).when(topicProvider).get(vt.getName()); ProduceException e = Assertions.assertThrows( ProduceException.class, () -> service.produceToTopic(msg1, VaradhiTopic.buildTopicName(project.getName(), topic), emitter) @@ -160,7 +163,7 @@ public void produceNotAllowedTopicState( ProducerMetricsEmitter emitter = getMetricEmitter(topic, project, region); Message msg1 = getMessage(0, 1, null, 0); VaradhiTopic vt = getTopic(topicState, topic, project, region); - doReturn(vt).when(topicService).get(vt.getName()); + doReturn(vt).when(topicProvider).get(vt.getName()); doReturn(producer).when(producerFactory).newProducer(any()); CompletableFuture result = service.produceToTopic(msg1, VaradhiTopic.buildTopicName(project.getName(), topic), emitter); @@ -177,7 +180,7 @@ public void testProduceWithUnknownExceptionInGetProducer() { ProducerMetricsEmitter emitter = getMetricEmitter(topic, project, region); Message msg1 = getMessage(0, 1, null, 0); VaradhiTopic vt = getTopic(topic, project, region); - doReturn(vt).when(topicService).get(vt.getName()); + doReturn(vt).when(topicProvider).get(vt.getName()); doThrow(new RuntimeException("Unknown Error.")).when(producerFactory).newProducer(any()); ProduceException pe = Assertions.assertThrows( ProduceException.class, @@ -192,7 +195,7 @@ public void testProduceWithKnownExceptionInGetProducer() { ProducerMetricsEmitter emitter = getMetricEmitter(topic, project, region); Message msg1 = getMessage(0, 1, null, 0); VaradhiTopic vt = getTopic(topic, project, region); - doReturn(vt).when(topicService).get(vt.getName()); + doReturn(vt).when(topicProvider).get(vt.getName()); doThrow(new RuntimeException("Topic doesn't exists.")).when(producerFactory).newProducer(any()); RuntimeException re = Assertions.assertThrows( RuntimeException.class, @@ -208,7 +211,7 @@ public void testProduceWithProducerFailure() throws InterruptedException { ProducerMetricsEmitter emitter = getMetricEmitter(topic, project, region); Message msg1 = getMessage(0, 1, UnsupportedOperationException.class.getName(), 0); VaradhiTopic vt = getTopic(topic, project, region); - doReturn(vt).when(topicService).get(vt.getName()); + doReturn(vt).when(topicProvider).get(vt.getName()); doReturn(producer).when(producerFactory).newProducer(any()); CompletableFuture result = @@ -225,14 +228,13 @@ public void testProduceWithProducerFailure() throws InterruptedException { verify(producerFactory, times(1)).newProducer(any()); } - @Test public void testMetricEmitFailureNotIgnored() throws InterruptedException { ProducerMetricsEmitter emitter = mock(ProducerMetricsEmitter.class); doThrow(new RuntimeException("Failed to send metric.")).when(emitter).emit(anyBoolean(), anyLong()); Message msg1 = getMessage(0, 1, null, 10); VaradhiTopic vt = getTopic(topic, project, region); - doReturn(vt).when(topicService).get(vt.getName()); + doReturn(vt).when(topicProvider).get(vt.getName()); doReturn(producer).when(producerFactory).newProducer(any()); CompletableFuture result = service.produceToTopic(msg1, VaradhiTopic.buildTopicName(project.getName(), topic), emitter); @@ -252,7 +254,9 @@ public VaradhiTopic getTopic(String name, Project project, String region) { public VaradhiTopic getTopic(TopicState state, String name, Project project, String region) { VaradhiTopic topic = VaradhiTopic.of(new TopicResource(name, 0, project.getName(), false, null)); StorageTopic st = new DummyStorageTopic(topic.getName(), 0); - topic.addInternalTopic(new InternalCompositeTopic(region, state, st)); + InternalCompositeTopic ict = InternalCompositeTopic.of(st); + ict.setTopicState(state); + topic.addInternalTopic(region, ict); return topic; } @@ -280,7 +284,7 @@ public ProducerMetricsEmitter getMetricEmitter(String topic, Project project, St produceAttributes.put(TAG_PROJECT, project.getName()); produceAttributes.put(TAG_TOPIC, topic); produceAttributes.put(TAG_IDENTITY, ANONYMOUS_IDENTITY); - produceAttributes.put(TAG_REMOTEHOST, "remotehost"); + produceAttributes.put(TAG_REMOTEHOST, "remoteHost"); return new ProducerMetricsEmitterImpl(meterRegistry, 0, produceAttributes); } @@ -302,6 +306,12 @@ ResultCapture getResult(CompletableFuture future) throws Interrup return rc; } + public static class TopicProvider { + public VaradhiTopic get(String topicName) { + return null; + } + } + static class ResultCapture { ProduceResult produceResult; Throwable throwable; @@ -309,7 +319,7 @@ static class ResultCapture { public static class DummyStorageTopic extends StorageTopic { public DummyStorageTopic(String name, int version) { - super(name, version, TopicCapacityPolicy.getDefault()); + super(name, version, Constants.DefaultTopicCapacity); } } } diff --git a/pulsar/src/main/java/com/flipkart/varadhi/pulsar/PulsarTopicService.java b/pulsar/src/main/java/com/flipkart/varadhi/pulsar/PulsarTopicService.java index 73aafd63..e528657e 100644 --- a/pulsar/src/main/java/com/flipkart/varadhi/pulsar/PulsarTopicService.java +++ b/pulsar/src/main/java/com/flipkart/varadhi/pulsar/PulsarTopicService.java @@ -96,6 +96,10 @@ public void delete(String topicName, Project project) { clientProvider.getAdminClient().topics().deletePartitionedTopic(topicName, false, false); log.debug("Deleted the pulsar topic:{}", topicName); } catch (PulsarAdminException e) { + if (e instanceof PulsarAdminException.NotFoundException) { + log.warn("Pulsar topic {} not found, skipping delete.", topicName); + return; + } throw new MessagingException(e); } } diff --git a/pulsar/src/test/java/com/flipkart/varadhi/pulsar/PulsarProducerFactoryTest.java b/pulsar/src/test/java/com/flipkart/varadhi/pulsar/PulsarProducerFactoryTest.java index a0ad42d8..c5c7637e 100644 --- a/pulsar/src/test/java/com/flipkart/varadhi/pulsar/PulsarProducerFactoryTest.java +++ b/pulsar/src/test/java/com/flipkart/varadhi/pulsar/PulsarProducerFactoryTest.java @@ -1,6 +1,6 @@ package com.flipkart.varadhi.pulsar; -import com.flipkart.varadhi.entities.TopicCapacityPolicy; +import com.flipkart.varadhi.Constants; import com.flipkart.varadhi.exceptions.ProduceException; import com.flipkart.varadhi.pulsar.entities.PulsarStorageTopic; import com.flipkart.varadhi.pulsar.producer.PulsarProducerFactory; @@ -32,7 +32,7 @@ public void preTest() throws IOException { "pulsarAdminOptions:\n serviceHttpUrl: \"http://127.0.0.1:8081\"\npulsarClientOptions:\n serviceUrl: \"http://127.0.0.1:8081\"\n"; Path configFile = tempDir.resolve("pulsarConfig.yaml"); Files.write(configFile, yamlContent.getBytes()); - topic = PulsarStorageTopic.from("testTopic", 1, TopicCapacityPolicy.getDefault()); + topic = PulsarStorageTopic.from("testTopic", 1, Constants.DefaultTopicCapacity); pClient = mock(PulsarClient.class); builder = mock(ProducerBuilder.class); org.apache.pulsar.client.api.Producer producer = mock(org.apache.pulsar.client.api.Producer.class); diff --git a/pulsar/src/test/java/com/flipkart/varadhi/pulsar/PulsarStackProviderTest.java b/pulsar/src/test/java/com/flipkart/varadhi/pulsar/PulsarStackProviderTest.java index 13d34fe6..da79b639 100644 --- a/pulsar/src/test/java/com/flipkart/varadhi/pulsar/PulsarStackProviderTest.java +++ b/pulsar/src/test/java/com/flipkart/varadhi/pulsar/PulsarStackProviderTest.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.flipkart.varadhi.Constants; import com.flipkart.varadhi.entities.InternalQueueCategory; import com.flipkart.varadhi.entities.Project; import com.flipkart.varadhi.entities.TopicCapacityPolicy; @@ -74,8 +75,7 @@ public void testGetStorageTopicFactory_NotInitialized() { @Test public void testGetStorageTopicFactory_Initialized() { String topicName = "foobar"; - - TopicCapacityPolicy capacity = TopicCapacityPolicy.getDefault(); + TopicCapacityPolicy capacity = Constants.DefaultTopicCapacity; InternalQueueCategory topicCategory = InternalQueueCategory.MAIN; pulsarStackProvider.init(messagingStackOptions, objectMapper); StorageTopicFactory storageTopicFactory = pulsarStackProvider.getStorageTopicFactory(); diff --git a/pulsar/src/test/java/com/flipkart/varadhi/pulsar/entities/PulsarProducerTest.java b/pulsar/src/test/java/com/flipkart/varadhi/pulsar/entities/PulsarProducerTest.java index 01d33078..3fa8ccd6 100644 --- a/pulsar/src/test/java/com/flipkart/varadhi/pulsar/entities/PulsarProducerTest.java +++ b/pulsar/src/test/java/com/flipkart/varadhi/pulsar/entities/PulsarProducerTest.java @@ -1,5 +1,6 @@ package com.flipkart.varadhi.pulsar.entities; +import com.flipkart.varadhi.Constants; import com.flipkart.varadhi.entities.*; import com.flipkart.varadhi.entities.Message; import com.flipkart.varadhi.pulsar.config.ProducerOptions; @@ -48,7 +49,7 @@ public void preTest() throws PulsarClientException { messageBuilder = spy(new TypedMessageBuilderImpl(producer, Schema.BYTES)); doReturn(messageBuilder).when(producer).newMessage(); - policy = TopicCapacityPolicy.getDefault(); + policy = Constants.DefaultTopicCapacity; topic = PulsarStorageTopic.from("one.two.three.four", 1, policy); doReturn(topic.getName()).when(producer).getTopic(); diff --git a/pulsar/src/test/java/com/flipkart/varadhi/pulsar/services/PulsarTopicServiceTest.java b/pulsar/src/test/java/com/flipkart/varadhi/pulsar/services/PulsarTopicServiceTest.java index 71c84b4e..0777c215 100644 --- a/pulsar/src/test/java/com/flipkart/varadhi/pulsar/services/PulsarTopicServiceTest.java +++ b/pulsar/src/test/java/com/flipkart/varadhi/pulsar/services/PulsarTopicServiceTest.java @@ -1,7 +1,7 @@ package com.flipkart.varadhi.pulsar.services; +import com.flipkart.varadhi.Constants; import com.flipkart.varadhi.entities.Project; -import com.flipkart.varadhi.entities.TopicCapacityPolicy; import com.flipkart.varadhi.pulsar.ClientProvider; import com.flipkart.varadhi.pulsar.PulsarTopicService; import com.flipkart.varadhi.pulsar.config.PulsarConfig; @@ -10,7 +10,6 @@ import com.flipkart.varadhi.pulsar.util.TopicPlanner; import com.flipkart.varadhi.spi.services.MessagingException; import org.apache.pulsar.client.admin.*; -import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -47,7 +46,7 @@ public void setUp() { @Test public void testCreate() throws PulsarAdminException { - PulsarStorageTopic topic = PulsarStorageTopic.from(TEST_TOPIC, 1, TopicCapacityPolicy.getDefault()); + PulsarStorageTopic topic = PulsarStorageTopic.from(TEST_TOPIC, 1, Constants.DefaultTopicCapacity); doThrow(new PulsarAdminException.NotFoundException(new RuntimeException(""), "topic not found", 409)).when( topics).getPartitionedTopicMetadata(topic.getName()); doNothing().when(topics).createPartitionedTopic(anyString(), eq(1)); @@ -57,7 +56,7 @@ public void testCreate() throws PulsarAdminException { @Test public void testCreate_PulsarAdminException() throws PulsarAdminException { - PulsarStorageTopic topic = PulsarStorageTopic.from(TEST_TOPIC, 1, TopicCapacityPolicy.getDefault()); + PulsarStorageTopic topic = PulsarStorageTopic.from(TEST_TOPIC, 1, Constants.DefaultTopicCapacity); doThrow(new PulsarAdminException.NotFoundException(new RuntimeException(""), "topic not found", 409)).when( topics).getPartitionedTopicMetadata(topic.getName()); doThrow(PulsarAdminException.class).when(topics).createPartitionedTopic(anyString(), eq(1)); @@ -67,8 +66,7 @@ public void testCreate_PulsarAdminException() throws PulsarAdminException { @Test public void testCreate_ConflictException() throws PulsarAdminException { - PulsarStorageTopic topic = PulsarStorageTopic.from(TEST_TOPIC, 1, TopicCapacityPolicy.getDefault()); - + PulsarStorageTopic topic = PulsarStorageTopic.from(TEST_TOPIC, 1, Constants.DefaultTopicCapacity); doThrow(new PulsarAdminException.NotFoundException(new RuntimeException(""), "topic not found", 409)).when( topics).getPartitionedTopicMetadata(topic.getName()); doThrow(PulsarAdminException.class).when(topics).createPartitionedTopic(anyString(), eq(1)); @@ -88,7 +86,7 @@ public void testCreate_NewTenantNamespace() throws PulsarAdminException { String newTenant = "testTenantNew"; Project projectNew = new Project("projectNew", INITIAL_VERSION, "", "public", newTenant); String newNamespace = EntityHelper.getNamespace(newTenant, projectNew.getName()); - PulsarStorageTopic topic = PulsarStorageTopic.from(TEST_TOPIC, 1, TopicCapacityPolicy.getDefault()); + PulsarStorageTopic topic = PulsarStorageTopic.from(TEST_TOPIC, 1, Constants.DefaultTopicCapacity); doThrow(new PulsarAdminException.NotFoundException(new RuntimeException(""), "topic not found", 409)).when( topics).getPartitionedTopicMetadata(topic.getName()); doNothing().when(topics).createPartitionedTopic(anyString(), eq(1)); diff --git a/pulsar/src/testE2E/java/com/flipkart/varadhi/pulsar/PulsarTopicServiceTest.java b/pulsar/src/testE2E/java/com/flipkart/varadhi/pulsar/PulsarTopicServiceTest.java index 65433450..874bf230 100644 --- a/pulsar/src/testE2E/java/com/flipkart/varadhi/pulsar/PulsarTopicServiceTest.java +++ b/pulsar/src/testE2E/java/com/flipkart/varadhi/pulsar/PulsarTopicServiceTest.java @@ -1,7 +1,7 @@ package com.flipkart.varadhi.pulsar; +import com.flipkart.varadhi.Constants; import com.flipkart.varadhi.entities.Project; -import com.flipkart.varadhi.entities.TopicCapacityPolicy; import com.flipkart.varadhi.pulsar.entities.PulsarStorageTopic; import com.flipkart.varadhi.pulsar.util.EntityHelper; import com.flipkart.varadhi.pulsar.util.TopicPlanner; @@ -36,7 +36,7 @@ public void init() throws PulsarAdminException { @Test public void testCreateTopic() throws PulsarAdminException { String topicFQDN = getRandomTopicFQDN(); - PulsarStorageTopic pt = PulsarStorageTopic.from(topicFQDN, 1, TopicCapacityPolicy.getDefault()); + PulsarStorageTopic pt = PulsarStorageTopic.from(topicFQDN, 1, Constants.DefaultTopicCapacity); topicService.create(pt, project); validateTopicExists(topicFQDN); } @@ -44,7 +44,7 @@ public void testCreateTopic() throws PulsarAdminException { @Test public void testDuplicateTopicWithSameConfigAllowed() { String topicFQDN = getRandomTopicFQDN(); - PulsarStorageTopic pt = PulsarStorageTopic.from(topicFQDN, 1, TopicCapacityPolicy.getDefault()); + PulsarStorageTopic pt = PulsarStorageTopic.from(topicFQDN, 1, Constants.DefaultTopicCapacity); topicService.create(pt, project); topicService.create(pt, project); } @@ -52,8 +52,8 @@ public void testDuplicateTopicWithSameConfigAllowed() { @Test public void testDuplicateTopicWithDifferentConfigNotAllowed() { String topicFQDN = getRandomTopicFQDN(); - PulsarStorageTopic pt1 = PulsarStorageTopic.from(topicFQDN, 2, TopicCapacityPolicy.getDefault()); - PulsarStorageTopic pt2 = PulsarStorageTopic.from(topicFQDN, 1, TopicCapacityPolicy.getDefault()); + PulsarStorageTopic pt1 = PulsarStorageTopic.from(topicFQDN, 2, Constants.DefaultTopicCapacity); + PulsarStorageTopic pt2 = PulsarStorageTopic.from(topicFQDN, 1, Constants.DefaultTopicCapacity); topicService.create(pt1, project); MessagingException m = Assertions.assertThrows(MessagingException.class, () -> topicService.create(pt2, project)); @@ -67,9 +67,8 @@ public void testCreate_NewTenantNamespace() throws PulsarAdminException { String newNamespace = EntityHelper.getNamespace(newTenant, projectNew.getName()); String topicFQDN = getRandomTopicFQDN(); - PulsarStorageTopic pt = PulsarStorageTopic.from(topicFQDN, 1, TopicCapacityPolicy.getDefault()); + PulsarStorageTopic pt = PulsarStorageTopic.from(topicFQDN, 1, Constants.DefaultTopicCapacity); topicService.create(pt, projectNew); - validateTopicExists(topicFQDN); validateTenantExists(newTenant); validateNamespaceExists(newTenant, newNamespace); diff --git a/server/src/main/java/com/flipkart/varadhi/VaradhiApplication.java b/server/src/main/java/com/flipkart/varadhi/VaradhiApplication.java index d35e354a..aaddeffc 100644 --- a/server/src/main/java/com/flipkart/varadhi/VaradhiApplication.java +++ b/server/src/main/java/com/flipkart/varadhi/VaradhiApplication.java @@ -1,6 +1,6 @@ package com.flipkart.varadhi; -import com.flipkart.varadhi.entities.cluster.MemberResources; +import com.flipkart.varadhi.entities.cluster.NodeCapacity; import com.flipkart.varadhi.utils.JsonMapper; import com.flipkart.varadhi.verticles.consumer.ConsumerVerticle; import com.flipkart.varadhi.verticles.webserver.WebServerVerticle; @@ -32,6 +32,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.Map; import java.util.function.Function; @@ -39,21 +40,17 @@ @Slf4j public class VaradhiApplication { - private static MemberInfo memberInfo; - public static void main(String[] args) { try { - String host = HostUtils.getHostName(); - int port = 0; - log.info("VaradhiApplication Starting on {}.", host); + log.info("Starting VaradhiApplication"); AppConfiguration configuration = readConfiguration(args); - MemberConfig mConfig = configuration.getMember(); - memberInfo = new MemberInfo(host, port, mConfig.getRoles(), getMemberResources(mConfig)); + MemberInfo memberInfo = getMemberInfo(configuration.getMember()); CoreServices services = new CoreServices(configuration); - VaradhiZkClusterManager clusterManager = getClusterManager(configuration, host); - Map verticles = getComponentVerticles(configuration, services, clusterManager); - createClusteredVertx(configuration, clusterManager, services, host).compose(vertx -> + VaradhiZkClusterManager clusterManager = getClusterManager(configuration, memberInfo.hostname()); + Map verticles = + getComponentVerticles(configuration, services, clusterManager, memberInfo); + createClusteredVertx(configuration, clusterManager, services, memberInfo).compose(vertx -> Future.all(verticles.entrySet().stream() .map(es -> vertx.deployVerticle(es.getValue()).onComplete(ar -> { if (ar.succeeded()) { @@ -63,9 +60,9 @@ public static void main(String[] args) { } })).collect(Collectors.toList())) ) - .onSuccess(ar -> log.info("VaradhiApplication Started on {}.", host)) + .onSuccess(ar -> log.info("VaradhiApplication Started on {}.", memberInfo.hostname())) .onFailure(t -> { - log.error("VaradhiApplication on host {} failed to start. {} ", host, t); + log.error("VaradhiApplication on host {} failed to start. {} ", memberInfo.hostname(), t); log.error("Closing the application."); System.exit(-1); }); @@ -77,9 +74,11 @@ public static void main(String[] args) { // TODO: check need for shutdown hook } - private static MemberResources getMemberResources(MemberConfig memberConfig) { - //TODO:: need to get auto detect, and provide override via config . - return new MemberResources(memberConfig.getCpuCount(), memberConfig.getNetworkMBps()); + private static MemberInfo getMemberInfo(MemberConfig memberConfig) throws UnknownHostException { + String host = HostUtils.getHostName(); + int networkKBps = memberConfig.getNetworkMBps() * 1000; + NodeCapacity provisionedCapacity = new NodeCapacity(memberConfig.getMaxQps(), networkKBps); + return new MemberInfo(host, memberConfig.getClusterPort(), memberConfig.getRoles(), provisionedCapacity); } private static VaradhiZkClusterManager getClusterManager(AppConfiguration config, String host) { @@ -91,12 +90,12 @@ private static VaradhiZkClusterManager getClusterManager(AppConfiguration config } private static Future createClusteredVertx( - AppConfiguration config, ClusterManager clusterManager, CoreServices services, String host + AppConfiguration config, ClusterManager clusterManager, CoreServices services, MemberInfo memberInfo ) { int port = 0; JsonObject memberInfoJson = new JsonObject(JsonMapper.jsonSerialize(memberInfo)); EventBusOptions eventBusOptions = new EventBusOptions() - .setHost(host) + .setHost(memberInfo.hostname()) .setPort(port) .setClusterNodeMetadata(memberInfoJson); @@ -147,9 +146,10 @@ public static AppConfiguration readConfigFromFile(String filePath) throws Invali } private static Map getComponentVerticles( - AppConfiguration config, CoreServices coreServices, VaradhiClusterManager clusterManager + AppConfiguration config, CoreServices coreServices, VaradhiClusterManager clusterManager, + MemberInfo memberInfo ) { - return Arrays.stream(config.getMember().getRoles()).distinct() + return Arrays.stream(memberInfo.roles()).distinct() .collect(Collectors.toMap(Function.identity(), kind -> switch (kind) { case Server -> new WebServerVerticle(config, coreServices, clusterManager); case Controller -> new ControllerVerticle(coreServices, clusterManager); diff --git a/server/src/main/java/com/flipkart/varadhi/config/MemberConfig.java b/server/src/main/java/com/flipkart/varadhi/config/MemberConfig.java index cfe1c4a0..4975973b 100644 --- a/server/src/main/java/com/flipkart/varadhi/config/MemberConfig.java +++ b/server/src/main/java/com/flipkart/varadhi/config/MemberConfig.java @@ -6,6 +6,7 @@ @Data public class MemberConfig { private ComponentKind[] roles; - private int cpuCount; + private int clusterPort; + private int maxQps; private int networkMBps; } diff --git a/server/src/main/java/com/flipkart/varadhi/config/RestOptions.java b/server/src/main/java/com/flipkart/varadhi/config/RestOptions.java index d7cf35cd..cc984bb5 100644 --- a/server/src/main/java/com/flipkart/varadhi/config/RestOptions.java +++ b/server/src/main/java/com/flipkart/varadhi/config/RestOptions.java @@ -1,5 +1,7 @@ package com.flipkart.varadhi.config; +import com.flipkart.varadhi.Constants; +import com.flipkart.varadhi.entities.TopicCapacityPolicy; import jakarta.validation.constraints.NotBlank; import jakarta.validation.constraints.NotNull; import lombok.Data; @@ -12,6 +14,7 @@ public class RestOptions { private String deployedRegion; @NotNull private String projectCacheBuilderSpec = "expireAfterWrite=3600s"; + private TopicCapacityPolicy defaultTopicCapacity = Constants.DefaultTopicCapacity; private boolean traceRequestEnabled = true; private int payloadSizeMax = PAYLOAD_SIZE_MAX; private int headersAllowedMax = HEADERS_ALLOWED_MAX; diff --git a/core/src/main/java/com/flipkart/varadhi/core/VaradhiTopicService.java b/server/src/main/java/com/flipkart/varadhi/services/VaradhiTopicService.java similarity index 72% rename from core/src/main/java/com/flipkart/varadhi/core/VaradhiTopicService.java rename to server/src/main/java/com/flipkart/varadhi/services/VaradhiTopicService.java index a85c258a..172ba0d8 100644 --- a/core/src/main/java/com/flipkart/varadhi/core/VaradhiTopicService.java +++ b/server/src/main/java/com/flipkart/varadhi/services/VaradhiTopicService.java @@ -1,4 +1,4 @@ -package com.flipkart.varadhi.core; +package com.flipkart.varadhi.services; import com.flipkart.varadhi.entities.Project; import com.flipkart.varadhi.entities.StorageTopic; @@ -28,14 +28,10 @@ public VaradhiTopicService( public void create(VaradhiTopic varadhiTopic, Project project) { log.info("Creating Varadhi topic {}", varadhiTopic.getName()); - varadhiTopic.getInternalTopics().forEach((kind, internalTopic) -> - { - StorageTopic storageTopic = internalTopic.getStorageTopic(); - // StorageTopicService.create() to ensure if pre-existing topi can be re-used. - // i.e. topic creation at storage level need to be idempotent. - topicService.create(storageTopic, project); - } - ); + // StorageTopicService.create() to ensure if pre-existing topic can be re-used. + // i.e. topic creation at storage level need to be idempotent. + varadhiTopic.getInternalTopics().forEach((region, internalTopic) -> internalTopic.getActiveTopics() + .forEach(storageTopic -> topicService.create(storageTopic, project))); metaStore.createTopic(varadhiTopic); } @@ -51,16 +47,8 @@ public void delete(String varadhiTopicName) { String projectName = varadhiTopic.getProjectName(); Project project = metaStore.getProject(projectName); validateDelete(varadhiTopicName); - varadhiTopic.getInternalTopics().forEach((kind, internalTopic) -> - { - StorageTopic storageTopic = internalTopic.getStorageTopic(); - if (topicService.exists(storageTopic.getName())) { - topicService.delete(storageTopic.getName(), project); - } else { - log.warn("Specified StorageTopic({}) does not exist.", storageTopic.getName()); - } - } - ); + varadhiTopic.getInternalTopics().forEach((region, internalTopic) -> internalTopic.getActiveTopics() + .forEach(storageTopic -> topicService.delete(storageTopic.getName(), project))); metaStore.deleteTopic(varadhiTopic.getName()); } diff --git a/server/src/main/java/com/flipkart/varadhi/utils/ShardProvisioner.java b/server/src/main/java/com/flipkart/varadhi/utils/ShardProvisioner.java index 71daeab4..a04f2e1f 100644 --- a/server/src/main/java/com/flipkart/varadhi/utils/ShardProvisioner.java +++ b/server/src/main/java/com/flipkart/varadhi/utils/ShardProvisioner.java @@ -5,6 +5,8 @@ import com.flipkart.varadhi.spi.services.StorageTopicService; import lombok.extern.slf4j.Slf4j; +import java.util.List; + @Slf4j public class ShardProvisioner { StorageTopicService storageTopicService; @@ -36,32 +38,25 @@ public void deProvision(VaradhiSubscription varadhiSub, Project project) { private void provisionShard(String subscriptionName, SubscriptionUnitShard shard, Project project) { // provision main sub for shard. - provisionStorageSubscription(shard.getMainSubscription().getStorageSubscription(), project); - + provisionCompositeSubscription(shard.getMainSubscription(), project, false); RetrySubscription retrySub = shard.getRetrySubscription(); for (int rqIndex = 0; rqIndex < retrySub.getMaxRetryCount(); rqIndex++) { - int retryAttempt = rqIndex + 1; - StorageSubscription retryStorageSub = retrySub.getStorageSubscriptionForRetry(retryAttempt); - StorageTopic retryStorageTopic = retryStorageSub.getStorageTopic(); - provisionStorageTopic(retryStorageTopic, project); - provisionStorageSubscription(retryStorageSub, project); + provisionCompositeSubscription(retrySub.getSubscriptionForRetry(rqIndex + 1), project, true); } - InternalCompositeSubscription dltSub = shard.getDeadLetterSubscription(); - StorageSubscription dltStorageSub = dltSub.getStorageSubscription(); - StorageTopic dltStorageTopic = dltStorageSub.getStorageTopic(); - provisionStorageTopic(dltStorageTopic, project); - - provisionStorageSubscription(dltStorageSub, project); + provisionCompositeSubscription(shard.getDeadLetterSubscription(), project, true); log.info("Provisioned the Subscription: {}, Shard:{}", subscriptionName, shard.getShardId()); } - private void provisionStorageTopic(StorageTopic storageTopic, Project project) { - if (storageTopicService.exists(storageTopic.getName())) { - log.info("StorageTopic:{} already exists, re-using it.", storageTopic.getName()); - } else { - storageTopicService.create(storageTopic, project); - log.info("storageTopic:{} provisioned.", storageTopic.getName()); - } + private void provisionCompositeSubscription( + InternalCompositeSubscription compositeSubscription, Project project, boolean provisionTopics + ) { + List> storageSubs = compositeSubscription.getActiveSubscriptions(); + storageSubs.forEach(storageSub -> { + if (provisionTopics) { + provisionStorageTopic(storageSub.getStorageTopic(), project); + } + provisionStorageSubscription(storageSub, project); + }); } private void provisionStorageSubscription(StorageSubscription storageSub, Project project) { @@ -74,25 +69,38 @@ private void provisionStorageSubscription(StorageSubscription stor } } - private void deProvisionShard(String subscriptionName, SubscriptionUnitShard shard, Project project) { - InternalCompositeSubscription dltSub = shard.getDeadLetterSubscription(); - StorageSubscription dltStorageSub = dltSub.getStorageSubscription(); - StorageTopic dltStorageTopic = dltStorageSub.getTopicPartitions().getTopic(); - deProvisionStorageSubscription(dltStorageSub, project); - deProvisionStorageTopic(dltStorageTopic, project); + private void provisionStorageTopic(StorageTopic storageTopic, Project project) { + if (storageTopicService.exists(storageTopic.getName())) { + log.info("StorageTopic:{} already exists, re-using it.", storageTopic.getName()); + } else { + storageTopicService.create(storageTopic, project); + log.info("storageTopic:{} provisioned.", storageTopic.getName()); + } + } + private void deProvisionShard(String subscriptionName, SubscriptionUnitShard shard, Project project) { + deProvisionCompositeSubscription(shard.getDeadLetterSubscription(), project, true); RetrySubscription retrySub = shard.getRetrySubscription(); for (int rqIndex = 0; rqIndex < retrySub.getMaxRetryCount(); rqIndex++) { int rqAttempt = rqIndex + 1; - StorageSubscription retryStorageSub = retrySub.getStorageSubscriptionForRetry(rqAttempt); - StorageTopic retryStorageTopic = retryStorageSub.getTopicPartitions().getTopic(); - deProvisionStorageSubscription(retryStorageSub, project); - deProvisionStorageTopic(retryStorageTopic, project); + deProvisionCompositeSubscription(retrySub.getSubscriptionForRetry(rqAttempt), project, true); } - deProvisionStorageSubscription(shard.getMainSubscription().getStorageSubscription(), project); + deProvisionCompositeSubscription(shard.getMainSubscription(), project, false); log.info("deProvisioned the Subscription: {}, Shard:{}", subscriptionName, shard.getShardId()); } + private void deProvisionCompositeSubscription( + InternalCompositeSubscription compositeSubscription, Project project, boolean deProvisionTopics + ) { + List> storageSubs = compositeSubscription.getActiveSubscriptions(); + storageSubs.forEach(storageSub -> { + deProvisionStorageSubscription(storageSub, project); + if (deProvisionTopics) { + deProvisionStorageTopic(storageSub.getStorageTopic(), project); + } + }); + } + private void deProvisionStorageTopic(StorageTopic storageTopic, Project project) { if (!storageTopicService.exists(storageTopic.getName())) { log.info("StorageTopic:{} not found, skipping delete.", storageTopic.getName()); diff --git a/server/src/main/java/com/flipkart/varadhi/utils/VaradhiSubscriptionFactory.java b/server/src/main/java/com/flipkart/varadhi/utils/VaradhiSubscriptionFactory.java index 3a83cfd7..dfd56098 100644 --- a/server/src/main/java/com/flipkart/varadhi/utils/VaradhiSubscriptionFactory.java +++ b/server/src/main/java/com/flipkart/varadhi/utils/VaradhiSubscriptionFactory.java @@ -18,6 +18,7 @@ public final class VaradhiSubscriptionFactory { private static final String SUB_QUALIFIER = "is"; private static final String TOPIC_QUALIFIER = "it"; private static final String SHARD_QUALIFIER = "shard"; + private static final int READ_FAN_OUT_FOR_INTERNAL_QUEUE = 1; private static final int MAX_RETRY_COUNT = 3; private final String deployedRegion; private final StorageSubscriptionFactory, StorageTopic> subscriptionFactory; @@ -56,7 +57,7 @@ public VaradhiSubscription get(SubscriptionResource subscriptionResource, Projec private SubscriptionShards getSubscriptionShards( String subName, VaradhiTopic topic, Project subProject, ConsumptionPolicy consumptionPolicy ) { - StorageTopic subscribedStorageTopic = topic.getProduceTopicForRegion(deployedRegion).getStorageTopic(); + StorageTopic subscribedStorageTopic = topic.getProduceTopicForRegion(deployedRegion).getTopicToProduce(); List> topicPartitions = topicService.shardTopic(subscribedStorageTopic, InternalQueueCategory.MAIN); int numShards = topicPartitions.size(); @@ -67,9 +68,10 @@ private SubscriptionShards getSubscriptionShards( } else { Map subShards = new HashMap<>(); for (int shardId = 0; shardId < numShards; shardId++) { - subShards.put(shardId, getShard(subName, shardId, topicPartitions.get(shardId), shardCapacity, subProject, - consumptionPolicy - )); + subShards.put( + shardId, getShard(subName, shardId, topicPartitions.get(shardId), shardCapacity, subProject, + consumptionPolicy + )); } return new SubscriptionMultiShard(subShards); } @@ -83,6 +85,8 @@ private SubscriptionUnitShard getShard( String subName, int shardId, TopicPartitions shardTopicPartition, TopicCapacityPolicy capacity, Project subProject, ConsumptionPolicy consumptionPolicy ) { + //TODO::Take care of region. + //TODO::Storage Topic/Subscription names needs to be indexed with in Composite topic/subscription. InternalCompositeSubscription shardMainSub = getShardMainSub(subName, shardId, shardTopicPartition, subProject); RetrySubscription retrySub = getRetrySub(subName, shardId, subProject, capacity, consumptionPolicy); @@ -96,7 +100,7 @@ private InternalCompositeSubscription getShardMainSub( ) { String shardSubName = getShardMainSubName(subscriptionName, shardId); StorageSubscription ss = subscriptionFactory.get(shardSubName, shardTopicPartition, project); - return new InternalCompositeSubscription(deployedRegion, new InternalQueueType.Main(), ss); + return InternalCompositeSubscription.of(ss, new InternalQueueType.Main()); } private String getShardMainSubName(String subscriptionName, int shardId) { @@ -141,21 +145,18 @@ private InternalCompositeSubscription getDltSub( } private InternalCompositeSubscription getInternalSub( - String subscriptionName, - int shardId, - InternalQueueType queueType, - int queueIndex, - Project project, TopicCapacityPolicy capacity, ConsumptionPolicy consumptionPolicy + String subscriptionName, int shardId, InternalQueueType queueType, int queueIndex, Project project, + TopicCapacityPolicy capacity, ConsumptionPolicy consumptionPolicy ) { - //TODO::handle cases where retry and dlt topic might be on different projects. + // TODO::handle cases where retry and dlt topic might be on different projects. String itSubName = getInternalSubName(subscriptionName, shardId, queueType.getCategory(), queueIndex); String itTopicName = getInternalTopicName(subscriptionName, shardId, queueType.getCategory(), queueIndex); - TopicCapacityPolicy errCapacity = capacity.from(consumptionPolicy.getMaxErrorThreshold(), 1); + TopicCapacityPolicy errCapacity = + capacity.from(consumptionPolicy.getMaxErrorThreshold(), READ_FAN_OUT_FOR_INTERNAL_QUEUE); StorageTopic st = topicFactory.getTopic(itTopicName, project, errCapacity, queueType.getCategory()); TopicPartitions tp = TopicPartitions.byTopic(st); StorageSubscription ss = subscriptionFactory.get(itSubName, tp, project); - - return new InternalCompositeSubscription(deployedRegion, queueType, ss); + return InternalCompositeSubscription.of(ss, queueType); } private String getInternalSubName( diff --git a/core/src/main/java/com/flipkart/varadhi/core/VaradhiTopicFactory.java b/server/src/main/java/com/flipkart/varadhi/utils/VaradhiTopicFactory.java similarity index 65% rename from core/src/main/java/com/flipkart/varadhi/core/VaradhiTopicFactory.java rename to server/src/main/java/com/flipkart/varadhi/utils/VaradhiTopicFactory.java index b5af7a2a..98b312d6 100644 --- a/core/src/main/java/com/flipkart/varadhi/core/VaradhiTopicFactory.java +++ b/server/src/main/java/com/flipkart/varadhi/utils/VaradhiTopicFactory.java @@ -1,4 +1,4 @@ -package com.flipkart.varadhi.core; +package com.flipkart.varadhi.utils; import com.flipkart.varadhi.entities.*; @@ -7,18 +7,26 @@ public class VaradhiTopicFactory { private final StorageTopicFactory topicFactory; + private final TopicCapacityPolicy defaultTopicCapacityPolicy; //TODO:: This is currently used to provide default value for primary region for the topic being created. //This should come from TopicResource a part of Regional/HA/BCP-DR policy. Since those are not available //use deploymentRegion as global single primary topic region as a workaround. private final String deploymentRegion; - public VaradhiTopicFactory(StorageTopicFactory topicFactory, String deploymentRegion) { + public VaradhiTopicFactory( + StorageTopicFactory topicFactory, String deploymentRegion, + TopicCapacityPolicy defaultTopicCapacityPolicy + ) { this.topicFactory = topicFactory; + this.defaultTopicCapacityPolicy = defaultTopicCapacityPolicy; this.deploymentRegion = deploymentRegion; } public VaradhiTopic get(Project project, TopicResource topicResource) { + if (null == topicResource.getCapacity()) { + topicResource.setCapacity(defaultTopicCapacityPolicy); + } VaradhiTopic vt = VaradhiTopic.of(topicResource); planDeployment(project, vt); return vt; @@ -28,11 +36,6 @@ private void planDeployment(Project project, VaradhiTopic varadhiTopic) { StorageTopic storageTopic = topicFactory.getTopic( varadhiTopic.getName(), project, varadhiTopic.getCapacity(), InternalQueueCategory.MAIN); - InternalCompositeTopic internalTopic = new InternalCompositeTopic( - deploymentRegion, - TopicState.Producing, - storageTopic - ); - varadhiTopic.addInternalTopic(internalTopic); + varadhiTopic.addInternalTopic(deploymentRegion, InternalCompositeTopic.of(storageTopic)); } } diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerVerticle.java b/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerVerticle.java index 56adbc84..a8b3aba5 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerVerticle.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerVerticle.java @@ -15,18 +15,18 @@ public class ConsumerVerticle extends AbstractVerticle { private final VaradhiClusterManager clusterManager; - private final MemberInfo memberInfo; + private final ConsumerInfo consumerInfo; public ConsumerVerticle(MemberInfo memberInfo, VaradhiClusterManager clusterManager) { this.clusterManager = clusterManager; - this.memberInfo = memberInfo; + this.consumerInfo = ConsumerInfo.from(memberInfo); } @Override public void start(Promise startPromise) { MessageRouter messageRouter = clusterManager.getRouter(vertx); MessageExchange messageExchange = clusterManager.getExchange(vertx); - ConsumersManager consumersManager = new ConsumersManagerImpl(ConsumerInfo.from(memberInfo)); + ConsumersManager consumersManager = new ConsumersManagerImpl(consumerInfo); ControllerClient controllerClient = new ControllerClient(messageExchange); ConsumerApiMgr consumerApiManager = new ConsumerApiMgr(consumersManager); @@ -41,7 +41,7 @@ public void stop(Promise stopPromise) { } private void setupApiHandlers(MessageRouter messageRouter, ConsumerApiHandler handler) { - String consumerId = memberInfo.hostname(); + String consumerId = consumerInfo.getConsumerId(); messageRouter.sendHandler(consumerId, "start", handler::start); messageRouter.sendHandler(consumerId, "stop", handler::stop); messageRouter.requestHandler(consumerId, "status", handler::status); diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java index 183d9311..375b067e 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java @@ -1,14 +1,19 @@ package com.flipkart.varadhi.verticles.webserver; +import com.flipkart.varadhi.Constants; import com.flipkart.varadhi.CoreServices; import com.flipkart.varadhi.auth.DefaultAuthorizationProvider; import com.flipkart.varadhi.cluster.VaradhiClusterManager; +import com.flipkart.varadhi.entities.StorageTopic; +import com.flipkart.varadhi.entities.TopicCapacityPolicy; +import com.flipkart.varadhi.entities.VaradhiTopic; +import com.flipkart.varadhi.spi.services.Producer; import com.flipkart.varadhi.utils.ShardProvisioner; import com.flipkart.varadhi.utils.VaradhiSubscriptionFactory; import com.flipkart.varadhi.verticles.controller.ControllerClient; import com.flipkart.varadhi.config.AppConfiguration; -import com.flipkart.varadhi.core.VaradhiTopicFactory; -import com.flipkart.varadhi.core.VaradhiTopicService; +import com.flipkart.varadhi.utils.VaradhiTopicFactory; +import com.flipkart.varadhi.services.VaradhiTopicService; import com.flipkart.varadhi.core.cluster.ControllerApi; import com.flipkart.varadhi.produce.otel.ProducerMetricHandler; import com.flipkart.varadhi.produce.services.ProducerService; @@ -36,13 +41,12 @@ import lombok.extern.slf4j.Slf4j; import java.util.*; +import java.util.function.Function; @Slf4j @ExtensionMethod({Extensions.RoutingContextExtension.class}) public class WebServerVerticle extends AbstractVerticle { - - private final String deployedRegion; private final Map routeBehaviourConfigurators = new HashMap<>(); private final AppConfiguration configuration; private final VaradhiClusterManager clusterManager; @@ -60,7 +64,6 @@ public class WebServerVerticle extends AbstractVerticle { public WebServerVerticle( AppConfiguration configuration, CoreServices services, VaradhiClusterManager clusterManager ) { - this.deployedRegion = configuration.getRestOptions().getDeployedRegion(); this.configuration = configuration; this.clusterManager = clusterManager; this.messagingStackProvider = services.getMessagingStackProvider(); @@ -184,8 +187,11 @@ private List getIamPolicyRoutes() { private List getAdminApiRoutes() { List routes = new ArrayList<>(); + TopicCapacityPolicy defaultTopicCapacity = configuration.getRestOptions().getDefaultTopicCapacity(); + String deployedRegion = configuration.getRestOptions().getDeployedRegion(); VaradhiTopicFactory varadhiTopicFactory = - new VaradhiTopicFactory(messagingStackProvider.getStorageTopicFactory(), deployedRegion); + new VaradhiTopicFactory( + messagingStackProvider.getStorageTopicFactory(), deployedRegion, defaultTopicCapacity); VaradhiSubscriptionFactory subscriptionFactory = new VaradhiSubscriptionFactory(messagingStackProvider.getStorageTopicService(), messagingStackProvider.getSubscriptionFactory(), @@ -211,9 +217,13 @@ private List getManagementEntitiesApiRoutes() { } private List getProduceApiRoutes() { + String deployedRegion = configuration.getRestOptions().getDeployedRegion(); HeaderValidationHandler headerValidator = new HeaderValidationHandler(configuration.getRestOptions()); + Function topicProvider = varadhiTopicService::get; + Function producerProvider = messagingStackProvider.getProducerFactory()::newProducer; + ProducerService producerService = new ProducerService(deployedRegion, configuration.getProducerOptions(), - messagingStackProvider.getProducerFactory(), varadhiTopicService, meterRegistry + producerProvider, topicProvider, meterRegistry ); ProducerMetricHandler producerMetricsHandler = new ProducerMetricHandler(configuration.getProducerOptions().isMetricEnabled(), meterRegistry); diff --git a/server/src/main/java/com/flipkart/varadhi/web/v1/admin/SubscriptionHandlers.java b/server/src/main/java/com/flipkart/varadhi/web/v1/admin/SubscriptionHandlers.java index 683b1170..7894eed0 100644 --- a/server/src/main/java/com/flipkart/varadhi/web/v1/admin/SubscriptionHandlers.java +++ b/server/src/main/java/com/flipkart/varadhi/web/v1/admin/SubscriptionHandlers.java @@ -1,6 +1,6 @@ package com.flipkart.varadhi.web.v1.admin; -import com.flipkart.varadhi.core.VaradhiTopicService; +import com.flipkart.varadhi.services.VaradhiTopicService; import com.flipkart.varadhi.entities.*; import com.flipkart.varadhi.services.ProjectService; import com.flipkart.varadhi.services.SubscriptionService; @@ -140,8 +140,10 @@ public void create(RoutingContext ctx) { public void update(RoutingContext ctx) { SubscriptionResource subscription = getValidSubscriptionResource(ctx); //TODO::Evaluate separating these into individual update APIs. + //Fix:: Update is allowed, though no change in the subscription, this can be avoided. executeAsyncRequest( - ctx, () -> subscriptionService.updateSubscription(subscription.getSubscriptionInternalName(), subscription.getVersion(), + ctx, () -> subscriptionService.updateSubscription(subscription.getSubscriptionInternalName(), + subscription.getVersion(), subscription.getDescription(), subscription.isGrouped(), subscription.getEndpoint(), subscription.getRetryPolicy(), subscription.getConsumptionPolicy(), ctx.getIdentityOrDefault() ).thenApply(SubscriptionResource::from)); @@ -150,7 +152,10 @@ public void update(RoutingContext ctx) { public void delete(RoutingContext ctx) { String projectName = ctx.pathParam(PATH_PARAM_PROJECT); Project subProject = projectService.getCachedProject(projectName); - executeAsyncRequest(ctx, () -> subscriptionService.deleteSubscription(getSubscriptionName(ctx), subProject, ctx.getIdentityOrDefault())); + executeAsyncRequest( + ctx, () -> subscriptionService.deleteSubscription(getSubscriptionName(ctx), subProject, + ctx.getIdentityOrDefault() + )); } public void start(RoutingContext ctx) { diff --git a/server/src/main/java/com/flipkart/varadhi/web/v1/admin/TopicHandlers.java b/server/src/main/java/com/flipkart/varadhi/web/v1/admin/TopicHandlers.java index 05aa4260..6b948884 100644 --- a/server/src/main/java/com/flipkart/varadhi/web/v1/admin/TopicHandlers.java +++ b/server/src/main/java/com/flipkart/varadhi/web/v1/admin/TopicHandlers.java @@ -1,7 +1,7 @@ package com.flipkart.varadhi.web.v1.admin; -import com.flipkart.varadhi.core.VaradhiTopicFactory; -import com.flipkart.varadhi.core.VaradhiTopicService; +import com.flipkart.varadhi.utils.VaradhiTopicFactory; +import com.flipkart.varadhi.services.VaradhiTopicService; import com.flipkart.varadhi.entities.*; import com.flipkart.varadhi.exceptions.DuplicateResourceException; import com.flipkart.varadhi.services.ProjectService; @@ -87,7 +87,7 @@ public ResourceHierarchy getHierarchy(RoutingContext ctx, boolean hasBody) { public void get(RoutingContext ctx) { String varadhiTopicName = getVaradhiTopicName(ctx); VaradhiTopic varadhiTopic = varadhiTopicService.get(varadhiTopicName); - TopicResource topicResource = TopicResource.of(varadhiTopic); + TopicResource topicResource = TopicResource.from(varadhiTopic); ctx.endApiWithResponse(topicResource); } @@ -110,7 +110,7 @@ public void create(RoutingContext ctx) { } VaradhiTopic vt = varadhiTopicFactory.get(project, topicResource); varadhiTopicService.create(vt, project); - ctx.endApiWithResponse(topicResource); + ctx.endApiWithResponse(TopicResource.from(vt)); } public void delete(RoutingContext ctx) { diff --git a/server/src/main/resources/configuration.yml b/server/src/main/resources/configuration.yml index 3b296614..2506d3f0 100755 --- a/server/src/main/resources/configuration.yml +++ b/server/src/main/resources/configuration.yml @@ -1,8 +1,8 @@ member: roles : ["Server", "Controller", "Consumer"] - cpuCount: 1 - networkMBps: 100 + maxQps: 5000 + networkMBps: 30 vertxOptions: eventLoopPoolSize: 1 @@ -28,6 +28,10 @@ httpServerOptions: tracingPolicy: "ALWAYS" restOptions: + defaultTopicCapacity: + throughputKBps: 400 + qps: 100 + readFanOut: 2 deployedRegion: "default" defaultOrg: "default" defaultTeam: "public" diff --git a/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java b/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java index ae72e715..88f753f5 100644 --- a/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java +++ b/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java @@ -1,6 +1,7 @@ package com.flipkart.varadhi.services; import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.flipkart.varadhi.Constants; import com.flipkart.varadhi.core.cluster.ControllerApi; import com.flipkart.varadhi.db.VaradhiMetaStore; import com.flipkart.varadhi.entities.*; @@ -119,7 +120,7 @@ void testSubscriptionEntitiesSerDe() { } catch (MalformedURLException e) { throw new RuntimeException(e); } - TopicCapacityPolicy capacity = TopicCapacityPolicy.getDefault(); + TopicCapacityPolicy capacity = Constants.DefaultTopicCapacity; String region = "default"; TopicPlanner planner = new TopicPlanner(new PulsarConfig()); @@ -131,12 +132,7 @@ void testSubscriptionEntitiesSerDe() { VaradhiTopic vt = VaradhiTopic.of(tr); StorageTopic storageTopic = ptf.getTopic(vt.getName(), o1t1p2, capacity, InternalQueueCategory.MAIN); - InternalCompositeTopic internalTopic = new InternalCompositeTopic( - region, - TopicState.Producing, - storageTopic - ); - vt.addInternalTopic(internalTopic); + vt.addInternalTopic(region, InternalCompositeTopic.of(storageTopic)); SubscriptionResource subRes = new SubscriptionResource( "sub12", @@ -340,17 +336,19 @@ void deleteSubscriptionRemovesSubscription(VertxTestContext ctx) { CompletableFuture status = CompletableFuture.completedFuture(new SubscriptionStatus(name, SubscriptionState.STOPPED)); doReturn(status).when(controllerApi).getSubscriptionStatus(name, requestedBy); - Future.fromCompletionStage(subscriptionService.deleteSubscription(name, o1t1p1, requestedBy)).onComplete(ctx.succeeding( - v -> { - Exception exception = assertThrows(ResourceNotFoundException.class, - () -> subscriptionService.getSubscription(name) - ); - String expectedMessage = "Subscription(%s) not found.".formatted(name); - String actualMessage = exception.getMessage(); - assertEquals(expectedMessage, actualMessage); - checkpoint.flag(); - } - )); + Future.fromCompletionStage(subscriptionService.deleteSubscription(name, o1t1p1, requestedBy)) + .onComplete(ctx.succeeding( + v -> { + Exception exception = assertThrows( + ResourceNotFoundException.class, + () -> subscriptionService.getSubscription(name) + ); + String expectedMessage = "Subscription(%s) not found.".formatted(name); + String actualMessage = exception.getMessage(); + assertEquals(expectedMessage, actualMessage); + checkpoint.flag(); + } + )); } private CompletableFuture updateSubscription(VaradhiSubscription to) { diff --git a/core/src/test/java/com/flipkart/varadhi/core/VaradhiTopicServiceTest.java b/server/src/test/java/com/flipkart/varadhi/services/VaradhiTopicServiceTest.java similarity index 94% rename from core/src/test/java/com/flipkart/varadhi/core/VaradhiTopicServiceTest.java rename to server/src/test/java/com/flipkart/varadhi/services/VaradhiTopicServiceTest.java index 3b2401fa..0a4b7386 100644 --- a/core/src/test/java/com/flipkart/varadhi/core/VaradhiTopicServiceTest.java +++ b/server/src/test/java/com/flipkart/varadhi/services/VaradhiTopicServiceTest.java @@ -1,11 +1,13 @@ -package com.flipkart.varadhi.core; +package com.flipkart.varadhi.services; +import com.flipkart.varadhi.Constants; import com.flipkart.varadhi.entities.*; import com.flipkart.varadhi.exceptions.VaradhiException; import com.flipkart.varadhi.pulsar.entities.PulsarStorageTopic; import com.flipkart.varadhi.spi.db.MetaStore; import com.flipkart.varadhi.spi.services.StorageTopicFactory; import com.flipkart.varadhi.spi.services.StorageTopicService; +import com.flipkart.varadhi.utils.VaradhiTopicFactory; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -34,13 +36,13 @@ public void setUp() { storageTopicService = mock(StorageTopicService.class); metaStore = mock(MetaStore.class); storageTopicFactory = mock(StorageTopicFactory.class); - varadhiTopicFactory = spy(new VaradhiTopicFactory(storageTopicFactory, region)); + varadhiTopicFactory = spy(new VaradhiTopicFactory(storageTopicFactory, region, Constants.DefaultTopicCapacity)); varadhiTopicService = new VaradhiTopicService(storageTopicService, metaStore); project = new Project("default", INITIAL_VERSION, "", "public", "public"); vTopicName = String.format("%s.%s", project.getName(), topicName); String pTopicName = String.format("persistent://%s/%s/%s", project.getOrg(), project.getName(), vTopicName); - capacityPolicy = TopicCapacityPolicy.getDefault(); + capacityPolicy = Constants.DefaultTopicCapacity; PulsarStorageTopic pTopic = PulsarStorageTopic.from(pTopicName, 1, capacityPolicy); Mockito.doReturn(pTopic).when(storageTopicFactory) .getTopic(vTopicName, project, capacityPolicy, InternalQueueCategory.MAIN); @@ -52,7 +54,7 @@ public void createVaradhiTopic() { VaradhiTopic varadhiTopic = varadhiTopicFactory.get(project, topicResource); varadhiTopicService.create(varadhiTopic, project); verify(metaStore, times(1)).createTopic(varadhiTopic); - StorageTopic st = varadhiTopic.getProduceTopicForRegion(region).getStorageTopic(); + StorageTopic st = varadhiTopic.getProduceTopicForRegion(region).getTopicToProduce(); verify(storageTopicService, times(1)).create(st, project); verify(storageTopicFactory, times(1)).getTopic(vTopicName, project, capacityPolicy, InternalQueueCategory.MAIN); } @@ -61,7 +63,7 @@ public void createVaradhiTopic() { public void createVaradhiTopicWhenMetaStoreFails() { TopicResource topicResource = getTopicResource(topicName, project); VaradhiTopic varadhiTopic = varadhiTopicFactory.get(project, topicResource); - StorageTopic st = varadhiTopic.getProduceTopicForRegion(region).getStorageTopic(); + StorageTopic st = varadhiTopic.getProduceTopicForRegion(region).getTopicToProduce(); doThrow(new VaradhiException("Some error")).when(metaStore).createTopic(varadhiTopic); Exception exception = Assertions.assertThrows(VaradhiException.class, () -> varadhiTopicService.create( @@ -77,7 +79,7 @@ public void createVaradhiTopicWhenMetaStoreFails() { public void createVaradhiTopicWhenStorageTopicServiceFails() { TopicResource topicResource = getTopicResource(topicName, project); VaradhiTopic varadhiTopic = varadhiTopicFactory.get(project, topicResource); - StorageTopic st = varadhiTopic.getProduceTopicForRegion(region).getStorageTopic(); + StorageTopic st = varadhiTopic.getProduceTopicForRegion(region).getTopicToProduce(); doThrow(new VaradhiException("Some error")).when(storageTopicService).create(st, project); Exception exception = Assertions.assertThrows(VaradhiException.class, () -> varadhiTopicService.create( @@ -93,7 +95,7 @@ public void createVaradhiTopicWhenStorageTopicServiceFails() { public void deleteVaradhiTopicSuccessfully() { TopicResource topicResource = getTopicResource(topicName, project); VaradhiTopic varadhiTopic = varadhiTopicFactory.get(project, topicResource); - StorageTopic st = varadhiTopic.getProduceTopicForRegion(region).getStorageTopic(); + StorageTopic st = varadhiTopic.getProduceTopicForRegion(region).getTopicToProduce(); when(storageTopicService.exists(st.getName())).thenReturn(true); when(metaStore.getTopic(varadhiTopic.getName())).thenReturn(varadhiTopic); when(metaStore.getProject(project.getName())).thenReturn(project); @@ -108,14 +110,12 @@ public void deleteVaradhiTopicSuccessfully() { public void deleteVaradhiTopicWhenStorageTopicDoesNotExist() { TopicResource topicResource = getTopicResource(topicName, project); VaradhiTopic varadhiTopic = varadhiTopicFactory.get(project, topicResource); - StorageTopic st = varadhiTopic.getProduceTopicForRegion(region).getStorageTopic(); + StorageTopic st = varadhiTopic.getProduceTopicForRegion(region).getTopicToProduce(); when(storageTopicService.exists(st.getName())).thenReturn(false); when(metaStore.getTopic(varadhiTopic.getName())).thenReturn(varadhiTopic); when(metaStore.getProject(project.getName())).thenReturn(project); - varadhiTopicService.delete(varadhiTopic.getName()); - - verify(storageTopicService, times(0)).delete(st.getName(), project); + verify(storageTopicService, times(1)).delete(st.getName(), project); verify(metaStore, times(1)).deleteTopic(varadhiTopic.getName()); } @@ -123,7 +123,7 @@ public void deleteVaradhiTopicWhenStorageTopicDoesNotExist() { public void deleteVaradhiTopicWhenMetaStoreFails() { TopicResource topicResource = getTopicResource(topicName, project); VaradhiTopic varadhiTopic = varadhiTopicFactory.get(project, topicResource); - StorageTopic st = varadhiTopic.getProduceTopicForRegion(region).getStorageTopic(); + StorageTopic st = varadhiTopic.getProduceTopicForRegion(region).getTopicToProduce(); when(storageTopicService.exists(st.getName())).thenReturn(true); when(metaStore.getTopic(varadhiTopic.getName())).thenReturn(varadhiTopic); when(metaStore.getProject(project.getName())).thenReturn(project); diff --git a/core/src/test/java/com/flipkart/varadhi/core/VaradhiTopicFactoryTest.java b/server/src/test/java/com/flipkart/varadhi/utils/VaradhiTopicFactoryTest.java similarity index 84% rename from core/src/test/java/com/flipkart/varadhi/core/VaradhiTopicFactoryTest.java rename to server/src/test/java/com/flipkart/varadhi/utils/VaradhiTopicFactoryTest.java index 926b5501..20c6f108 100644 --- a/core/src/test/java/com/flipkart/varadhi/core/VaradhiTopicFactoryTest.java +++ b/server/src/test/java/com/flipkart/varadhi/utils/VaradhiTopicFactoryTest.java @@ -1,5 +1,6 @@ -package com.flipkart.varadhi.core; +package com.flipkart.varadhi.utils; +import com.flipkart.varadhi.Constants; import com.flipkart.varadhi.entities.*; import com.flipkart.varadhi.pulsar.entities.PulsarStorageTopic; import com.flipkart.varadhi.spi.services.StorageTopicFactory; @@ -21,12 +22,12 @@ public class VaradhiTopicFactoryTest { @BeforeEach public void setUp() { storageTopicFactory = mock(StorageTopicFactory.class); - varadhiTopicFactory = new VaradhiTopicFactory(storageTopicFactory, region); + varadhiTopicFactory = new VaradhiTopicFactory(storageTopicFactory, region, Constants.DefaultTopicCapacity); project = new Project("default", INITIAL_VERSION, "", "public", "public"); vTopicName = String.format("%s.%s", project.getName(), topicName); String pTopicName = String.format("persistent://%s/%s", project.getOrg(), vTopicName); - TopicCapacityPolicy capacityPolicy = TopicCapacityPolicy.getDefault(); + TopicCapacityPolicy capacityPolicy = Constants.DefaultTopicCapacity; PulsarStorageTopic pTopic = PulsarStorageTopic.from(pTopicName, 1, capacityPolicy); doReturn(pTopic).when(storageTopicFactory) .getTopic(vTopicName, project, capacityPolicy, InternalQueueCategory.MAIN); @@ -34,7 +35,7 @@ public void setUp() { @Test public void getTopic() { - TopicCapacityPolicy capacityPolicy = TopicCapacityPolicy.getDefault(); + TopicCapacityPolicy capacityPolicy = Constants.DefaultTopicCapacity; TopicResource topicResource = new TopicResource( topicName, 1, @@ -45,16 +46,15 @@ public void getTopic() { VaradhiTopic varadhiTopic = varadhiTopicFactory.get(project, topicResource); Assertions.assertNotNull(varadhiTopic); InternalCompositeTopic it = varadhiTopic.getProduceTopicForRegion(region); - StorageTopic st = it.getStorageTopic(); + StorageTopic st = it.getTopicToProduce(); Assertions.assertEquals(it.getTopicState(), TopicState.Producing); - Assertions.assertEquals(it.getTopicRegion(), region); Assertions.assertNotNull(st); verify(storageTopicFactory, times(1)).getTopic(vTopicName, project, capacityPolicy, InternalQueueCategory.MAIN); } @Test public void getTopicWithDefaultCapacity() { - TopicCapacityPolicy capacityPolicy = TopicCapacityPolicy.getDefault(); + TopicCapacityPolicy capacityPolicy = Constants.DefaultTopicCapacity; TopicResource topicResource = new TopicResource( topicName, 1, @@ -64,7 +64,7 @@ public void getTopicWithDefaultCapacity() { ); VaradhiTopic varadhiTopic = varadhiTopicFactory.get(project, topicResource); InternalCompositeTopic it = varadhiTopic.getProduceTopicForRegion(region); - PulsarStorageTopic pt = (PulsarStorageTopic) it.getStorageTopic(); + PulsarStorageTopic pt = (PulsarStorageTopic) it.getTopicToProduce(); Assertions.assertEquals(capacityPolicy.getThroughputKBps(), pt.getCapacity().getThroughputKBps()); Assertions.assertEquals(capacityPolicy.getQps(), pt.getCapacity().getQps()); } diff --git a/server/src/test/java/com/flipkart/varadhi/web/admin/SubscriptionHandlersTest.java b/server/src/test/java/com/flipkart/varadhi/web/admin/SubscriptionHandlersTest.java index b6a535cd..3880ca8e 100644 --- a/server/src/test/java/com/flipkart/varadhi/web/admin/SubscriptionHandlersTest.java +++ b/server/src/test/java/com/flipkart/varadhi/web/admin/SubscriptionHandlersTest.java @@ -1,6 +1,6 @@ package com.flipkart.varadhi.web.admin; -import com.flipkart.varadhi.core.VaradhiTopicService; +import com.flipkart.varadhi.services.VaradhiTopicService; import com.flipkart.varadhi.entities.*; import com.flipkart.varadhi.exceptions.ResourceNotFoundException; import com.flipkart.varadhi.services.ProjectService; diff --git a/server/src/test/java/com/flipkart/varadhi/web/admin/TopicHandlersTest.java b/server/src/test/java/com/flipkart/varadhi/web/admin/TopicHandlersTest.java index 72828d7c..6001d3b4 100644 --- a/server/src/test/java/com/flipkart/varadhi/web/admin/TopicHandlersTest.java +++ b/server/src/test/java/com/flipkart/varadhi/web/admin/TopicHandlersTest.java @@ -1,7 +1,8 @@ package com.flipkart.varadhi.web.admin; -import com.flipkart.varadhi.core.VaradhiTopicFactory; -import com.flipkart.varadhi.core.VaradhiTopicService; +import com.flipkart.varadhi.Constants; +import com.flipkart.varadhi.utils.VaradhiTopicFactory; +import com.flipkart.varadhi.services.VaradhiTopicService; import com.flipkart.varadhi.entities.*; import com.flipkart.varadhi.services.ProjectService; import com.flipkart.varadhi.web.RequestTelemetryConfigurator; @@ -27,6 +28,10 @@ import static org.mockito.Mockito.*; public class TopicHandlersTest extends WebTestBase { + private final String topicName = "topic1"; + private final String team1 = "team1"; + private final String org1 = "org1"; + private final Project project = new Project("project1", 0, "", team1, org1); TopicHandlers topicHandlers; VaradhiTopicService varadhiTopicService; VaradhiTopicFactory varadhiTopicFactory; @@ -34,10 +39,6 @@ public class TopicHandlersTest extends WebTestBase { RequestTelemetryConfigurator requestTelemetryConfigurator; SpanProvider spanProvider; Span span; - private final String topicName = "topic1"; - private final String team1 = "team1"; - private final String org1 = "org1"; - private final Project project = new Project("project1", 0, "", team1, org1); @BeforeEach public void PreTest() throws InterruptedException { @@ -66,9 +67,11 @@ public void PreTest() throws InterruptedException { setupFailureHandler(routeCreate); Route routeGet = router.get("/projects/:project/topics/:topic").handler(wrapBlocking(topicHandlers::get)); setupFailureHandler(routeGet); - Route routeListAll = router.get("/projects/:project/topics").handler(bodyHandler).handler(wrapBlocking(topicHandlers::listTopics)); + Route routeListAll = router.get("/projects/:project/topics").handler(bodyHandler) + .handler(wrapBlocking(topicHandlers::listTopics)); setupFailureHandler(routeListAll); - Route routeDelete = router.delete("/projects/:project/topics/:topic").handler(wrapBlocking(topicHandlers::delete)); + Route routeDelete = + router.delete("/projects/:project/topics/:topic").handler(wrapBlocking(topicHandlers::delete)); setupFailureHandler(routeDelete); } @@ -81,7 +84,8 @@ public void PostTest() throws InterruptedException { public void testTopicCreate() throws InterruptedException { HttpRequest request = createRequest(HttpMethod.POST, getTopicsUrl(project)); TopicResource topicResource = getTopicResource(topicName, project); - + VaradhiTopic vt = VaradhiTopic.of(topicResource); + doReturn(vt).when(varadhiTopicFactory).get(project, topicResource); TopicResource t1Created = sendRequestWithBody(request, topicResource, TopicResource.class); Assertions.assertEquals(topicResource.getProject(), t1Created.getProject()); verify(spanProvider, times(1)).addSpan(eq(REQUEST_SPAN_NAME)); @@ -129,7 +133,7 @@ private TopicResource getTopicResource(String topicName, Project project) { 1, project.getName(), true, - TopicCapacityPolicy.getDefault() + Constants.DefaultTopicCapacity ); } diff --git a/server/src/test/resources/testConfiguration.yml b/server/src/test/resources/testConfiguration.yml index e5f24eea..89d5d145 100644 --- a/server/src/test/resources/testConfiguration.yml +++ b/server/src/test/resources/testConfiguration.yml @@ -1,6 +1,5 @@ member: roles : ["Server"] - cpuCount: 1 networkMBps: 100 deliveryOptions: diff --git a/server/src/testE2E/java/com/flipkart/varadhi/TopicTests.java b/server/src/testE2E/java/com/flipkart/varadhi/TopicTests.java index f93a4226..96d31e98 100644 --- a/server/src/testE2E/java/com/flipkart/varadhi/TopicTests.java +++ b/server/src/testE2E/java/com/flipkart/varadhi/TopicTests.java @@ -58,8 +58,7 @@ public void createTopic() { Assertions.assertEquals(topic.getName(), r.getName()); Assertions.assertEquals(topic.getProject(), r.getProject()); Assertions.assertEquals(topic.isGrouped(), r.isGrouped()); - Assertions.assertNull(r.getCapacity()); - //TODO::fix this. + Assertions.assertNotNull(r.getCapacity()); String errorDuplicateTopic = String.format( "Specified Topic(%s) already exists.",