Skip to content

Commit

Permalink
Controller - Operation handling -- WIP (#146)
Browse files Browse the repository at this point in the history
Serializing the shard assignment for all, subscription operation execution, and updates on subscription operation for a subscription.
  • Loading branch information
kmrdhruv authored Jun 10, 2024
1 parent 16e60ef commit d77831a
Show file tree
Hide file tree
Showing 48 changed files with 969 additions and 388 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.core.cluster.ConsumerApi;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.entities.cluster.ShardOperation;
import com.flipkart.varadhi.entities.VaradhiSubscription;
import com.flipkart.varadhi.entities.cluster.ShardState;
Expand Down Expand Up @@ -33,7 +34,22 @@ public CompletableFuture<Void> start(ShardOperation.StartData operation) {
}

@Override
public CompletableFuture<ShardStatus> getStatus(String subscriptionId, int shardId) {
public CompletableFuture<Void> stop(ShardOperation.StopData operation) {
    // Delegates the stop request to the consumers manager, keyed by the subscription's name.
    // NOTE(review): the shard name is passed as an empty string here, not derived from
    // the operation -- confirm whether this is an intentional placeholder.
    String subscriptionName = operation.getSubscription().getName();
    return consumersManager.stopSubscription(subscriptionName, "");
}

/**
 * Reports the status of a shard on this consumer.
 *
 * <p>The current implementation does not consult either argument: it always answers with
 * an already-completed future carrying {@code ShardState.UNKNOWN}.
 *
 * @param subscriptionId subscription the shard belongs to (currently unused)
 * @param shardId        id of the shard (currently unused)
 * @return a completed future holding the UNKNOWN status
 */
@Override
public CompletableFuture<ShardStatus> getShardStatus(String subscriptionId, int shardId) {
    ShardStatus unknownStatus = new ShardStatus(ShardState.UNKNOWN, "Not a owner of shard");
    return CompletableFuture.completedFuture(unknownStatus);
}

/**
 * Returns the consumer info reported by the consumers manager, wrapped in an
 * already-completed future.
 *
 * @return a completed future holding {@code consumersManager.getInfo()}
 */
@Override
public CompletableFuture<ConsumerInfo> getConsumerInfo() {
    //TODO::Return assignments as well.
    ConsumerInfo info = consumersManager.getInfo();
    return CompletableFuture.completedFuture(info);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.flipkart.varadhi.entities.Endpoint;
import com.flipkart.varadhi.entities.Project;
import com.flipkart.varadhi.entities.StorageTopic;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.spi.services.TopicPartitions;

import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -40,4 +41,6 @@ CompletableFuture<Void> startSubscription(
ConsumerState getConsumerState(String subscription, String shardName);

// TODO likely need status on the starting / stopping as well; as the above status is for a running consumer..

/**
 * Returns the {@link ConsumerInfo} exposed by this manager.
 * NOTE(review): the exact contents of the info (e.g. whether it reflects live assignments)
 * are not visible from this interface -- confirm against the implementation.
 */
ConsumerInfo getInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@
import com.flipkart.varadhi.consumer.ConsumerState;
import com.flipkart.varadhi.consumer.ConsumersManager;
import com.flipkart.varadhi.consumer.ConsumptionFailurePolicy;
import com.flipkart.varadhi.entities.ConsumptionPolicy;
import com.flipkart.varadhi.entities.Endpoint;
import com.flipkart.varadhi.entities.Project;
import com.flipkart.varadhi.entities.StorageTopic;
import com.flipkart.varadhi.entities.*;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.spi.services.TopicPartitions;

import java.util.concurrent.CompletableFuture;

public class ConsumersManagerImpl implements ConsumersManager {
private final ConsumerInfo consumerInfo;


public ConsumersManagerImpl() {
public ConsumersManagerImpl(ConsumerInfo consumerInfo) {
// Keeps the supplied ConsumerInfo as-is; the reference is stored without copying or validation.
this.consumerInfo = consumerInfo;
}

@Override
Expand All @@ -28,7 +27,7 @@ public CompletableFuture<Void> startSubscription(

@Override
public CompletableFuture<Void> stopSubscription(String subscription, String shardName) {
return null;
return CompletableFuture.completedFuture(null);
}

@Override
Expand All @@ -45,4 +44,9 @@ public void resumeSubscription(String subscription, String shardName) {
public ConsumerState getConsumerState(String subscription, String shardName) {
return null;
}

/**
 * Returns the {@link ConsumerInfo} instance this manager was constructed with.
 */
@Override
public ConsumerInfo getInfo() {
    return this.consumerInfo;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,29 +1,38 @@
package com.flipkart.varadhi.controller;

import com.flipkart.varadhi.entities.SubscriptionShards;
import com.flipkart.varadhi.entities.cluster.Assignment;
import com.flipkart.varadhi.entities.cluster.ConsumerNode;
import com.flipkart.varadhi.controller.impl.LeastAssignedStrategy;
import com.flipkart.varadhi.entities.SubscriptionUnitShard;
import com.flipkart.varadhi.entities.VaradhiSubscription;
import com.flipkart.varadhi.spi.db.AssignmentStore;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableBoolean;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
import java.util.stream.Collectors;

@Slf4j
public class ShardAssigner {
private final String metricPrefix = "controller.assigner";
private final AssignmentStrategy strategy;
private final Map<String, ConsumerNode> consumerNodes;
private final AssignmentStore assignmentStore;
private final ExecutorService executor;

public ShardAssigner(AssignmentStore assignmentStore) {
public ShardAssigner(AssignmentStore assignmentStore, MeterRegistry meterRegistry) {
// Assignment decisions are delegated to a LeastAssignedStrategy instance.
this.strategy = new LeastAssignedStrategy();
// Known consumer nodes are tracked in a concurrent map (keyed by the id used in addConsumerNode).
this.consumerNodes = new ConcurrentHashMap<>();
this.assignmentStore = assignmentStore;
// NOTE(review): meterRegistry is accepted but not used in this constructor yet; the executor
// below is created without metrics instrumentation (see the TODO).
//TODO::ExecutorService should emit the metrics.
// Single-threaded executor with threads named "assigner-%d"; assignShard/unAssignShard submit
// their work to it, so those tasks run one at a time.
this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("assigner-%d").build());
}

public void addConsumerNodes(List<ConsumerNode> clusterConsumers) {
Expand All @@ -33,16 +42,60 @@ public void addConsumerNodes(List<ConsumerNode> clusterConsumers) {
});
}

public List<Assignment> assignShard(List<SubscriptionUnitShard> shards, VaradhiSubscription subscription) {
//TODO:: It need to ensure, assignment is not using stale values, specifically if they are running in parallel.
List<ConsumerNode> activeConsumers =
consumerNodes.values().stream().filter(c -> !c.isMarkedForDeletion()).collect(Collectors.toList());
log.info("AssignShards consumer nodes active:{} of total:{}", activeConsumers.size(), consumerNodes.size());
List<Assignment> assignments = strategy.assign(shards, subscription, activeConsumers);
assignmentStore.createAssignments(assignments);
return assignments;
/**
 * Assigns the given shards of a subscription to the currently active consumer nodes and
 * persists the resulting assignments.
 *
 * <p>The work is submitted to this assigner's executor. Consumer nodes marked for deletion
 * are excluded from the candidate set. If computing or persisting the assignments fails,
 * capacity recorded for the assignments collected so far is released and the original
 * exception is rethrown (so the returned future completes exceptionally).
 *
 * @param shards       shards that need a consumer node
 * @param subscription subscription owning the shards
 * @return future completing with the created assignments
 */
public CompletableFuture<List<Assignment>> assignShard(
    List<SubscriptionUnitShard> shards, VaradhiSubscription subscription
) {
    return CompletableFuture.supplyAsync(() -> {
        List<ConsumerNode> activeConsumers =
            consumerNodes.values().stream().filter(c -> !c.isMarkedForDeletion()).collect(Collectors.toList());
        log.info(
            "AssignShards found consumer nodes active:{} of total:{}", activeConsumers.size(),
            consumerNodes.size()
        );

        List<Assignment> assignments = new ArrayList<>();
        try {
            assignments.addAll(strategy.assign(shards, subscription, activeConsumers));
            assignmentStore.createAssignments(assignments);
            return assignments;
        } catch (Exception e) {
            // Pass the throwable as the trailing argument so the stack trace is logged too.
            log.error("Failed while creating assignment, freeing up any allocation done. {}.", e.getMessage(), e);
            for (Assignment assignment : assignments) {
                try {
                    freeAssignedCapacity(assignment, subscription);
                } catch (RuntimeException cleanupFailure) {
                    // A rollback problem must not hide the root cause; keep freeing the rest.
                    e.addSuppressed(cleanupFailure);
                }
            }
            throw e;
        }
    }, executor);
}


/**
 * Deletes the given assignments from the store and releases the capacity they hold on
 * their consumer nodes.
 *
 * <p>The work is submitted to this assigner's executor. Assignments whose consumer node
 * is no longer known are logged and skipped. Any failure is logged and rethrown, so the
 * returned future completes exceptionally.
 *
 * @param assignments  assignments to remove
 * @param subscription subscription owning the assigned shards
 * @return future completing (with no value) once the assignments are removed
 */
public CompletableFuture<Void> unAssignShard(List<Assignment> assignments, VaradhiSubscription subscription) {
    return CompletableFuture.runAsync(() -> {
        try {
            assignmentStore.deleteAssignments(assignments);
            for (Assignment a : assignments) {
                ConsumerNode consumerNode = consumerNodes.get(a.getConsumerId());
                if (consumerNode == null) {
                    log.error("Consumer node not found, for assignment {}. Ignoring unAssignShard", a);
                    continue;
                }
                SubscriptionShards shards = subscription.getShards();
                consumerNode.free(a, shards.getShard(a.getShardId()).getCapacityRequest());
            }
        } catch (Exception e) {
            // Pass the throwable as the trailing argument so the stack trace is logged too.
            log.error("Failed while unAssigning Shards. {}.", e.getMessage(), e);
            throw e;
        }
    }, executor);
}

/**
 * Releases the capacity that the given assignment holds on its consumer node.
 *
 * <p>If the consumer node is not present in {@code consumerNodes}, the assignment is logged
 * and skipped instead of failing with a NullPointerException, matching how
 * {@code unAssignShard} treats unknown nodes.
 *
 * @param assignment   assignment whose capacity should be released
 * @param subscription subscription owning the assigned shard
 */
private void freeAssignedCapacity(Assignment assignment, VaradhiSubscription subscription) {
    ConsumerNode consumerNode = consumerNodes.get(assignment.getConsumerId());
    if (consumerNode == null) {
        log.error("Consumer node not found, for assignment {}. Ignoring freeAssignedCapacity", assignment);
        return;
    }
    SubscriptionUnitShard shard = subscription.getShards().getShard(assignment.getShardId());
    consumerNode.free(assignment, shard.getCapacityRequest());
}


/**
 * Fetches the assignments recorded in the assignment store for a subscription.
 *
 * @param subscriptionName name of the subscription to look up
 * @return whatever the assignment store returns for that subscription
 */
public List<Assignment> getSubscriptionAssignment(String subscriptionName) {
    List<Assignment> stored = assignmentStore.getSubscriptionAssignments(subscriptionName);
    return stored;
}
Expand All @@ -54,19 +107,6 @@ public void consumerNodeJoined(ConsumerNode consumerNode) {
}
}

private boolean addConsumerNode(ConsumerNode consumerNode) {
String consumerNodeId = consumerNode.getMemberInfo().hostname();
MutableBoolean added = new MutableBoolean(false);
consumerNodes.computeIfAbsent(consumerNodeId, k -> {
added.setTrue();
return consumerNode;
});
if (!added.booleanValue()) {
log.warn("ConsumerNode {} already exists. Not adding again.", consumerNodes.get(consumerNodeId));
}
return added.booleanValue();
}

public void consumerNodeLeft(String consumerNodeId) {
//TODO:: re-assign the shards (should this be trigger from here or from the controller) ?
MutableBoolean marked = new MutableBoolean(false);
Expand All @@ -81,4 +121,18 @@ public void consumerNodeLeft(String consumerNodeId) {
log.warn("ConsumerNode {} not found.", consumerNodeId);
}
}

/**
 * Registers a consumer node under its member hostname unless one is already registered.
 *
 * <p>Uses {@code putIfAbsent} so the "was it added" answer and the already-registered node
 * come from the same map operation, rather than from a side effect inside a mapping
 * function followed by a second lookup.
 *
 * @param consumerNode node to register
 * @return {@code true} if the node was added, {@code false} if the id was already present
 */
private boolean addConsumerNode(ConsumerNode consumerNode) {
    String consumerNodeId = consumerNode.getMemberInfo().hostname();
    ConsumerNode existing = consumerNodes.putIfAbsent(consumerNodeId, consumerNode);
    if (existing != null) {
        log.warn("ConsumerNode {} already exists. Not adding again.", existing);
        return false;
    }
    return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public List<Assignment> assign(
List<SubscriptionUnitShard> shards, VaradhiSubscription subscription, List<ConsumerNode> consumerNodes
) {
if (consumerNodes.isEmpty()) {
log.error("Shard Assignment Failure: No active consumer nodes.");
throw new CapacityException("No active consumer node for Subscription assignment.");
}
List<Assignment> assignments = new ArrayList<>();
Expand All @@ -44,7 +43,7 @@ public List<Assignment> assign(
}
Assignment assignment =
new Assignment(subscription.getName(), shard.getShardId(), consumerNode.getMemberInfo().hostname());
consumerNode.allocate(shard.getCapacityRequest());
consumerNode.allocate(assignment, shard.getCapacityRequest());
assignments.add(assignment);
consumers.add(consumerNode);
}
Expand Down
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies {
api(project(':spi'))
implementation("io.vertx:vertx-core:${vertx_version}")
implementation('com.fasterxml.jackson.core:jackson-databind')
implementation("com.google.guava:guava")

testFixturesImplementation(project(":common"))
testImplementation(project(':pulsar'))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.flipkart.varadhi.core.cluster;

import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.entities.cluster.ShardOperation;
import com.flipkart.varadhi.entities.cluster.ShardStatus;

Expand All @@ -8,5 +9,9 @@
/**
 * Asynchronous operations offered by a consumer; every method returns a
 * {@link CompletableFuture} rather than blocking.
 */
public interface ConsumerApi {
// Handles a shard start operation; the returned future carries no value.
CompletableFuture<Void> start(ShardOperation.StartData operation);

CompletableFuture<ShardStatus> getStatus(String subscriptionId, int shardId);
// Handles a shard stop operation; the returned future carries no value.
CompletableFuture<Void> stop(ShardOperation.StopData operation);

// Returns the status of the shard identified by subscription id and shard id.
CompletableFuture<ShardStatus> getShardStatus(String subscriptionId, int shardId);

// Returns the ConsumerInfo describing this consumer.
CompletableFuture<ConsumerInfo> getConsumerInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
/**
 * Asynchronous operations offered by the controller; every method returns a
 * {@link CompletableFuture} rather than blocking.
 */
public interface ControllerApi {
// NOTE(review): presumably the route/address name under which controller requests are
// dispatched -- confirm against the messaging layer.
String ROUTE_CONTROLLER = "controller";

CompletableFuture<Void> startSubscription(SubscriptionOperation.StartData operation);
// Requests a start of the given subscription on behalf of requestedBy; the future yields
// the SubscriptionOperation associated with the request.
CompletableFuture<SubscriptionOperation> startSubscription(String subscriptionId, String requestedBy);

CompletableFuture<Void> stopSubscription(SubscriptionOperation.StopData operation);
// Requests a stop of the given subscription on behalf of requestedBy; the future yields
// the SubscriptionOperation associated with the request.
CompletableFuture<SubscriptionOperation> stopSubscription(String subscriptionId, String requestedBy);

// Delivers a shard operation update to the controller; the returned future carries no value.
CompletableFuture<Void> update(ShardOperation.OpData operation);
}
Loading

0 comments on commit d77831a

Please sign in to comment.