Skip to content

Commit

Permalink
Refactoring and start consumer integration. (#154)
Browse files Browse the repository at this point in the history
* Refactoring for
- Add active produce/consume topic/subscription to composite entities
- Move default topic/node capacity to config from hard coded values.
- moved varadhitopic factory/service to server module.
- refactoring related to start subscription in consumer
  • Loading branch information
kmrdhruv authored Jun 19, 2024
1 parent fc87a1e commit 0e14aec
Show file tree
Hide file tree
Showing 51 changed files with 397 additions and 370 deletions.
4 changes: 4 additions & 0 deletions common/src/main/java/com/flipkart/varadhi/Constants.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.flipkart.varadhi;

import com.flipkart.varadhi.entities.TopicCapacityPolicy;

public class Constants {
public static final int RANDOM_PARTITION_KEY_LENGTH = 5;
public static final String CONTEXT_KEY_BODY = "varadhi.body";
Expand All @@ -8,6 +10,8 @@ public class Constants {
// TODO: this header is only for testing and x_ convention may cause it to be sent to the destination during consume
public static final String USER_ID_HEADER = "x_user_id";

public static final TopicCapacityPolicy DefaultTopicCapacity = new TopicCapacityPolicy(100, 400, 2);

public static class PathParams {
public static final String PATH_PARAM_ORG = "org";
public static final String PATH_PARAM_TEAM = "team";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.core.cluster.ConsumerApi;
import com.flipkart.varadhi.entities.StorageSubscription;
import com.flipkart.varadhi.entities.StorageTopic;
import com.flipkart.varadhi.entities.SubscriptionUnitShard;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.entities.cluster.ShardOperation;
import com.flipkart.varadhi.entities.VaradhiSubscription;
Expand All @@ -20,16 +23,24 @@ public ConsumerApiMgr(ConsumersManager consumersManager) {

@Override
public CompletableFuture<Void> start(ShardOperation.StartData operation) {
log.info("Consumer: Starting shard {}", operation);
VaradhiSubscription subscription = operation.getSubscription();
SubscriptionUnitShard shard = operation.getShard();
StorageSubscription<StorageTopic> mainSub = shard.getMainSubscription().getSubscriptionToConsume();
ConsumptionFailurePolicy failurePolicy =
new ConsumptionFailurePolicy(subscription.getRetryPolicy(), shard.getRetrySubscription(),
shard.getDeadLetterSubscription()
);

return consumersManager.startSubscription(
null,
subscription.getProject(),
subscription.getName(),
"",
null,
operation.getShardId(),
mainSub,
subscription.isGrouped(),
subscription.getEndpoint(),
subscription.getConsumptionPolicy(),
null
failurePolicy
);
}

Expand All @@ -38,7 +49,7 @@ public CompletableFuture<Void> stop(ShardOperation.StopData operation) {
VaradhiSubscription subscription = operation.getSubscription();
return consumersManager.stopSubscription(
subscription.getName(),
""
operation.getShardId()
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.ConsumptionPolicy;
import com.flipkart.varadhi.entities.Endpoint;
import com.flipkart.varadhi.entities.Project;
import com.flipkart.varadhi.entities.StorageTopic;
import com.flipkart.varadhi.entities.*;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.entities.TopicPartitions;

import java.util.concurrent.CompletableFuture;

Expand All @@ -22,23 +18,23 @@ public interface ConsumersManager {
* @return
*/
CompletableFuture<Void> startSubscription(
Project project,
String project,
String subscription,
String shardName,
TopicPartitions<StorageTopic> topic,
int shardId,
StorageSubscription<StorageTopic> mainSubscription,
boolean grouped,
Endpoint endpoint,
ConsumptionPolicy consumptionPolicy,
ConsumptionFailurePolicy failurePolicy
);

CompletableFuture<Void> stopSubscription(String subscription, String shardName);
CompletableFuture<Void> stopSubscription(String subscription, int shardId);

void pauseSubscription(String subscription, String shardName);
void pauseSubscription(String subscription, int shardId);

void resumeSubscription(String subscription, String shardName);
void resumeSubscription(String subscription, int shardId);

ConsumerState getConsumerState(String subscription, String shardName);
ConsumerState getConsumerState(String subscription, int shardId);

// TODO likely need status on the starting / stopping as well; as the above status is for a running consumer..

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.RetryPolicy;
import com.flipkart.varadhi.entities.StorageTopic;
import com.flipkart.varadhi.entities.*;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

Expand All @@ -11,7 +10,7 @@ public class ConsumptionFailurePolicy {

private final RetryPolicy retryPolicy;

private final StorageRetryTopic retryTopic;
private final RetrySubscription retrySubscription;

private final StorageTopic deadLetterTopic;
private final InternalCompositeSubscription deadLetterSubscription;
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,30 @@ public ConsumersManagerImpl(ConsumerInfo consumerInfo) {

@Override
public CompletableFuture<Void> startSubscription(
Project project, String subscription, String shardName, TopicPartitions<StorageTopic> topic,
String project, String subscription, int shardId, StorageSubscription<StorageTopic> storageSubscription,
boolean grouped, Endpoint endpoint, ConsumptionPolicy consumptionPolicy,
ConsumptionFailurePolicy failurePolicy
) {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> stopSubscription(String subscription, String shardName) {
public CompletableFuture<Void> stopSubscription(String subscription, int shardId) {
return CompletableFuture.completedFuture(null);
}

@Override
public void pauseSubscription(String subscription, String shardName) {
public void pauseSubscription(String subscription, int shardId) {

}

@Override
public void resumeSubscription(String subscription, String shardName) {
public void resumeSubscription(String subscription, int shardId) {

}

@Override
public ConsumerState getConsumerState(String subscription, String shardName) {
public ConsumerState getConsumerState(String subscription, int shardId) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ private SubscriptionStatus getSubscriptionStatusFromShardStatus(
public CompletableFuture<SubscriptionOperation> startSubscription(
String subscriptionId, String requestedBy
) {
//TODO:: Fix it -assignment failure is not failing the start op. Task failure in the operation mgr queue.
VaradhiSubscription subscription = metaStore.getSubscription(subscriptionId);
return getSubscriptionStatus(subscription).exceptionally(t -> {
// If not temporary, then alternate needs to be provided to allow recovery from this.
Expand Down Expand Up @@ -111,7 +112,7 @@ private CompletableFuture<List<Assignment>> getOrCreateShardAssignment(VaradhiSu
return shardAssigner.assignShard(unAssigned, subscription);
} else {
log.info(
"{} Shards for Subscription {} are already assigned", assignedShards.size(),
"{} Shards for Subscription {} are already assigned.", assignedShards.size(),
subscription.getName()
);
return CompletableFuture.completedFuture(assignedShards);
Expand Down Expand Up @@ -250,7 +251,7 @@ private List<SubscriptionUnitShard> getSubscriptionShards(SubscriptionShards sha

@Override
public CompletableFuture<Void> update(ShardOperation.OpData opData) {
log.debug("Received update on shard operation: {}", opData);
log.info("Received update on shard operation: {}", opData);
try {
// Update is getting executed inline on dispatcher thread.
operationMgr.updateShardOp(opData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.flipkart.varadhi.spi.db.AssignmentStore;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableBoolean;

Expand Down Expand Up @@ -38,7 +37,7 @@ public ShardAssigner(AssignmentStore assignmentStore, MeterRegistry meterRegistr
public void addConsumerNodes(List<ConsumerNode> clusterConsumers) {
clusterConsumers.forEach(c -> {
addConsumerNode(c);
log.info("Added consumer node {}", c.getMemberInfo().hostname());
log.info("Added consumer node {}", c.getConsumerId());
});
}

Expand Down Expand Up @@ -103,7 +102,7 @@ public List<Assignment> getSubscriptionAssignment(String subscriptionName) {
public void consumerNodeJoined(ConsumerNode consumerNode) {
boolean added = addConsumerNode(consumerNode);
if (added) {
log.info("ConsumerNode {} joined.", consumerNode.getMemberInfo().hostname());
log.info("ConsumerNode {} joined.", consumerNode.getConsumerId());
}
}

Expand All @@ -123,7 +122,7 @@ public void consumerNodeLeft(String consumerNodeId) {
}

private boolean addConsumerNode(ConsumerNode consumerNode) {
String consumerNodeId = consumerNode.getMemberInfo().hostname();
String consumerNodeId = consumerNode.getConsumerId();
MutableBoolean added = new MutableBoolean(false);
consumerNodes.computeIfAbsent(consumerNodeId, k -> {
added.setTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public List<Assignment> assign(
throw new CapacityException("Not enough Resources for Subscription assignment.");
}
Assignment assignment =
new Assignment(subscription.getName(), shard.getShardId(), consumerNode.getMemberInfo().hostname());
new Assignment(subscription.getName(), shard.getShardId(), consumerNode.getConsumerId());
consumerNode.allocate(assignment, shard.getCapacityRequest());
assignments.add(assignment);
consumers.add(consumerNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ private void handleSubOpUpdate(
SubscriptionOperation.OpData updated = operation.getData();
subOps.compute(operation.getData().getSubscriptionId(), (subId, scheduledTasks) -> {
if (null != scheduledTasks && !scheduledTasks.isEmpty()) {

// process the update using provided handler.
// Update processing can take time, this will affect a subscription.
if (null != updateHandler) {
Expand All @@ -102,14 +101,17 @@ private void handleSubOpUpdate(
// Remove completed operation from the pending list and schedule next operation if available.
if (operation.completed()) {
scheduledTasks.removeFirst();
log.info("Completed SubOp({}) removed from the queue.", updated);
log.info("Completed SubOp({}) removed from the queue.", operation.getData());
if (scheduledTasks.isEmpty()) {
log.info("No more pending operation for {}.", subId);
return null;
} else {
OpTask waiting = scheduledTasks.peekFirst();
log.info("Pending SubOp({}) scheduled for execution.", waiting.operation.getData());
log.info("Next pending SubOp({}) scheduled for execution.", waiting.operation.getData());
executor.submit(waiting);
}
}else{
log.info("Pending SubOp({}) still in progress", operation.getData());
}
return scheduledTasks;
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,43 @@
package com.flipkart.varadhi.entities;

import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.Getter;

@Data
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@Getter
@AllArgsConstructor
public class InternalCompositeSubscription {
private final String subRegion;
private final InternalQueueType queueType;
private StorageSubscription<StorageTopic>[] storageSubscriptions;
private int produceIndex;
private int consumeIndex;


public static InternalCompositeSubscription of(
StorageSubscription<StorageTopic> storageSubscription, InternalQueueType queueType
) {
return new InternalCompositeSubscription(queueType, new StorageSubscription[]{storageSubscription}, 0, 0);
}

@JsonIgnore
public StorageTopic getTopicToProduce() {
if (queueType.getCategory() == InternalQueueCategory.MAIN) {
throw new IllegalArgumentException("Main Subscription does not have a topic to produce");
}
return storageSubscriptions[produceIndex].getTopicPartitions().getTopic();
}

@JsonIgnore
public StorageSubscription<StorageTopic> getSubscriptionToConsume() {
return storageSubscriptions[consumeIndex];
}

/**
* As of now only 1 is supported, but in future this can be an array where we can add more storage topics.
*/
private final StorageSubscription<StorageTopic> storageSubscription;
@JsonIgnore
public List<StorageSubscription<StorageTopic>> getActiveSubscriptions() {
return new ArrayList<>(Arrays.asList(storageSubscriptions));
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,39 @@
package com.flipkart.varadhi.entities;


import lombok.Data;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* A wrapper on the storage topic. In future this class will handle the adding additional storage topics for the purpose
* of increasing partition count without affecting ordering.
* This concept is internal and is never exposed to the user.
*/
@Data
@Getter
@AllArgsConstructor
public class InternalCompositeTopic {
private StorageTopic[] storageTopics;
private int produceIndex;
@Setter
private TopicState topicState;

private final String topicRegion;
public static InternalCompositeTopic of(StorageTopic storageTopic) {
return new InternalCompositeTopic(new StorageTopic[]{storageTopic}, 0, TopicState.Producing);
}

private final TopicState topicState;
@JsonIgnore
public StorageTopic getTopicToProduce() {
return storageTopics[produceIndex];
}

/**
* As of now only 1 is supported, but in future this can be an array where we can add more storage topics.
*/
private final StorageTopic storageTopic;
@JsonIgnore
public List<StorageTopic> getActiveTopics() {
return new ArrayList<>(Arrays.asList(storageTopics));
}
}
Loading

0 comments on commit 0e14aec

Please sign in to comment.