Skip to content

Commit

Permalink
serializing the shard assignment for all, subscription operation exec…
Browse files Browse the repository at this point in the history
…ution and updates on subscription operation for a subscription
  • Loading branch information
kmrdhruv committed May 30, 2024
1 parent 094a279 commit 16211de
Show file tree
Hide file tree
Showing 48 changed files with 969 additions and 388 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.core.cluster.ConsumerApi;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.entities.cluster.ShardOperation;
import com.flipkart.varadhi.entities.VaradhiSubscription;
import com.flipkart.varadhi.entities.cluster.ShardState;
Expand Down Expand Up @@ -33,7 +34,22 @@ public CompletableFuture<Void> start(ShardOperation.StartData operation) {
}

@Override
public CompletableFuture<ShardStatus> getStatus(String subscriptionId, int shardId) {
// Stops consumption for the subscription carried by the given stop operation.
// Returns whatever future consumersManager.stopSubscription hands back.
public CompletableFuture<Void> stop(ShardOperation.StopData operation) {
    final String subscriptionName = operation.getSubscription().getName();
    // NOTE(review): the shard name is passed as an empty string; presumably a placeholder -- confirm.
    return consumersManager.stopSubscription(subscriptionName, "");
}

/**
 * Reports the status of a shard.
 *
 * <p>Neither argument is consulted: every call yields an already-completed future holding
 * {@link ShardState#UNKNOWN} with a fixed reason string.
 *
 * @param subscriptionId subscription being queried (currently unused)
 * @param shardId        shard being queried (currently unused)
 * @return a completed future carrying the fixed UNKNOWN status
 */
@Override
public CompletableFuture<ShardStatus> getShardStatus(String subscriptionId, int shardId) {
    ShardStatus notOwned = new ShardStatus(ShardState.UNKNOWN, "Not a owner of shard");
    return CompletableFuture.completedFuture(notOwned);
}

/**
 * Returns this consumer's info as reported by the consumers manager.
 *
 * @return an already-completed future wrapping {@code consumersManager.getInfo()}
 */
@Override
public CompletableFuture<ConsumerInfo> getConsumerInfo() {
    //TODO::Return assignments as well.
    ConsumerInfo info = consumersManager.getInfo();
    return CompletableFuture.completedFuture(info);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.flipkart.varadhi.entities.Endpoint;
import com.flipkart.varadhi.entities.Project;
import com.flipkart.varadhi.entities.StorageTopic;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.spi.services.TopicPartitions;

import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -40,4 +41,6 @@ CompletableFuture<Void> startSubscription(
ConsumerState getConsumerState(String subscription, String shardName);

// TODO: likely need status for the starting / stopping phases as well, as the above status only covers a running consumer.

ConsumerInfo getInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@
import com.flipkart.varadhi.consumer.ConsumerState;
import com.flipkart.varadhi.consumer.ConsumersManager;
import com.flipkart.varadhi.consumer.ConsumptionFailurePolicy;
import com.flipkart.varadhi.entities.ConsumptionPolicy;
import com.flipkart.varadhi.entities.Endpoint;
import com.flipkart.varadhi.entities.Project;
import com.flipkart.varadhi.entities.StorageTopic;
import com.flipkart.varadhi.entities.*;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.spi.services.TopicPartitions;

import java.util.concurrent.CompletableFuture;

public class ConsumersManagerImpl implements ConsumersManager {
private final ConsumerInfo consumerInfo;


public ConsumersManagerImpl() {
public ConsumersManagerImpl(ConsumerInfo consumerInfo) {
this.consumerInfo = consumerInfo;
}

@Override
Expand All @@ -28,7 +27,7 @@ public CompletableFuture<Void> startSubscription(

@Override
public CompletableFuture<Void> stopSubscription(String subscription, String shardName) {
return null;
return CompletableFuture.completedFuture(null);
}

@Override
Expand All @@ -45,4 +44,9 @@ public void resumeSubscription(String subscription, String shardName) {
// Stub implementation: ignores both arguments and always returns null.
public ConsumerState getConsumerState(String subscription, String shardName) {
return null;
}

/** Returns the value of the {@code consumerInfo} field held by this manager. */
@Override
public ConsumerInfo getInfo() {
return consumerInfo;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,29 +1,38 @@
package com.flipkart.varadhi.controller;

import com.flipkart.varadhi.entities.SubscriptionShards;
import com.flipkart.varadhi.entities.cluster.Assignment;
import com.flipkart.varadhi.entities.cluster.ConsumerNode;
import com.flipkart.varadhi.controller.impl.LeastAssignedStrategy;
import com.flipkart.varadhi.entities.SubscriptionUnitShard;
import com.flipkart.varadhi.entities.VaradhiSubscription;
import com.flipkart.varadhi.spi.db.AssignmentStore;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableBoolean;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
import java.util.stream.Collectors;

@Slf4j
public class ShardAssigner {
private final String metricPrefix = "controller.assigner";
private final AssignmentStrategy strategy;
private final Map<String, ConsumerNode> consumerNodes;
private final AssignmentStore assignmentStore;
private final ExecutorService executor;

public ShardAssigner(AssignmentStore assignmentStore) {
public ShardAssigner(AssignmentStore assignmentStore, MeterRegistry meterRegistry) {
this.strategy = new LeastAssignedStrategy();
this.consumerNodes = new ConcurrentHashMap<>();
this.assignmentStore = assignmentStore;
//TODO::ExecutorService should emit the metrics.
this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("assigner-%d").build());
}

public void addConsumerNodes(List<ConsumerNode> clusterConsumers) {
Expand All @@ -33,16 +42,60 @@ public void addConsumerNodes(List<ConsumerNode> clusterConsumers) {
});
}

public List<Assignment> assignShard(List<SubscriptionUnitShard> shards, VaradhiSubscription subscription) {
//TODO:: It need to ensure, assignment is not using stale values, specifically if they are running in parallel.
List<ConsumerNode> activeConsumers =
consumerNodes.values().stream().filter(c -> !c.isMarkedForDeletion()).collect(Collectors.toList());
log.info("AssignShards consumer nodes active:{} of total:{}", activeConsumers.size(), consumerNodes.size());
List<Assignment> assignments = strategy.assign(shards, subscription, activeConsumers);
assignmentStore.createAssignments(assignments);
return assignments;
/**
 * Assigns the given shards of a subscription to consumer nodes and persists the result.
 *
 * <p>The work runs as a task on this assigner's {@code executor}. Only consumer nodes that are
 * not marked for deletion are offered to the assignment strategy.
 *
 * <p>If computing or persisting the assignments fails, the capacity of every assignment collected
 * so far is released and the original exception is rethrown, so the returned future completes
 * exceptionally. Each release is attempted independently: a failure while releasing one
 * assignment neither prevents the remaining ones from being released nor replaces the original
 * exception (it is attached to it as a suppressed exception).
 *
 * @param shards       shards to be assigned
 * @param subscription subscription owning the shards
 * @return future completing with the created assignments
 */
public CompletableFuture<List<Assignment>> assignShard(
        List<SubscriptionUnitShard> shards, VaradhiSubscription subscription
) {
    return CompletableFuture.supplyAsync(() -> {
        List<ConsumerNode> activeConsumers =
                consumerNodes.values().stream().filter(c -> !c.isMarkedForDeletion()).collect(Collectors.toList());
        log.info(
                "AssignShards found consumer nodes active:{} of total:{}", activeConsumers.size(),
                consumerNodes.size()
        );

        List<Assignment> assignments = new ArrayList<>();
        try {
            assignments.addAll(strategy.assign(shards, subscription, activeConsumers));
            assignmentStore.createAssignments(assignments);
            return assignments;
        } catch (Exception e) {
            log.error("Failed while creating assignment, freeing up any allocation done. {}.", e.getMessage());
            // NOTE(review): if strategy.assign itself throws, 'assignments' is still empty, so any
            // capacity it allocated before failing is not released here -- confirm the strategy
            // cleans up after itself.
            for (Assignment assignment : assignments) {
                try {
                    freeAssignedCapacity(assignment, subscription);
                } catch (RuntimeException rollbackFailure) {
                    // Keep going so one bad assignment does not leak the capacity of the others,
                    // and keep the original failure as the primary exception.
                    log.error(
                            "Failed to free capacity for assignment {}. {}.", assignment,
                            rollbackFailure.getMessage()
                    );
                    e.addSuppressed(rollbackFailure);
                }
            }
            throw e;
        }
    }, executor);
}


/**
 * Deletes the given assignments from the assignment store and then releases the capacity they
 * held on their consumer nodes.
 *
 * <p>The work runs as a task on this assigner's {@code executor}. An assignment whose consumer
 * node is not known to this assigner is logged and skipped. Any exception is logged and
 * rethrown, so the returned future completes exceptionally.
 *
 * @param assignments  assignments to remove
 * @param subscription subscription the assignments belong to; supplies each shard's capacity request
 * @return future completing (with no value) once all assignments have been processed
 */
public CompletableFuture<Void> unAssignShard(List<Assignment> assignments, VaradhiSubscription subscription) {
    return CompletableFuture.runAsync(() -> {
        try {
            assignmentStore.deleteAssignments(assignments);
            for (Assignment assignment : assignments) {
                ConsumerNode owner = consumerNodes.get(assignment.getConsumerId());
                if (owner == null) {
                    log.error("Consumer node not found, for assignment {}. Ignoring unAssignShard", assignment);
                    continue;
                }
                SubscriptionShards subscriptionShards = subscription.getShards();
                owner.free(
                        assignment,
                        subscriptionShards.getShard(assignment.getShardId()).getCapacityRequest()
                );
            }
        } catch (Exception e) {
            log.error("Failed while unAssigning Shards. {}.", e.getMessage());
            throw e;
        }
    }, executor);
}

/**
 * Releases, on the consumer node named by the assignment, the capacity requested by the
 * assigned shard.
 *
 * <p>If the consumer node is not known to this assigner, the request is logged and ignored
 * instead of failing with a {@link NullPointerException}; this matches how
 * {@code unAssignShard} treats a missing node.
 *
 * @param assignment   assignment whose capacity should be released
 * @param subscription subscription owning the shard; supplies the shard's capacity request
 */
private void freeAssignedCapacity(Assignment assignment, VaradhiSubscription subscription) {
    SubscriptionUnitShard shard = subscription.getShards().getShard(assignment.getShardId());
    ConsumerNode consumerNode = consumerNodes.get(assignment.getConsumerId());
    if (null == consumerNode) {
        log.error("Consumer node not found, for assignment {}. Ignoring freeAssignedCapacity", assignment);
        return;
    }
    consumerNode.free(assignment, shard.getCapacityRequest());
}


/**
 * Fetches the assignments stored for a subscription.
 *
 * <p>This reads the assignment store directly on the calling thread; it is not submitted to
 * the assigner's executor.
 *
 * @param subscriptionName name of the subscription to look up
 * @return whatever {@code assignmentStore.getSubscriptionAssignments} returns for that name
 */
public List<Assignment> getSubscriptionAssignment(String subscriptionName) {
    List<Assignment> stored = assignmentStore.getSubscriptionAssignments(subscriptionName);
    return stored;
}
Expand All @@ -54,19 +107,6 @@ public void consumerNodeJoined(ConsumerNode consumerNode) {
}
}

private boolean addConsumerNode(ConsumerNode consumerNode) {
String consumerNodeId = consumerNode.getMemberInfo().hostname();
MutableBoolean added = new MutableBoolean(false);
consumerNodes.computeIfAbsent(consumerNodeId, k -> {
added.setTrue();
return consumerNode;
});
if (!added.booleanValue()) {
log.warn("ConsumerNode {} already exists. Not adding again.", consumerNodes.get(consumerNodeId));
}
return added.booleanValue();
}

public void consumerNodeLeft(String consumerNodeId) {
//TODO:: re-assign the shards (should this be triggered from here or from the controller?)
MutableBoolean marked = new MutableBoolean(false);
Expand All @@ -81,4 +121,18 @@ public void consumerNodeLeft(String consumerNodeId) {
log.warn("ConsumerNode {} not found.", consumerNodeId);
}
}

/**
 * Registers a consumer node under its hostname unless a node with that hostname is already
 * registered.
 *
 * @param consumerNode node to register
 * @return {@code true} if the node was added, {@code false} if an entry already existed
 *         (in which case a warning is logged and the existing entry is kept)
 */
private boolean addConsumerNode(ConsumerNode consumerNode) {
    String nodeId = consumerNode.getMemberInfo().hostname();
    // putIfAbsent returns null only when no mapping existed, i.e. when this call inserted the node.
    boolean inserted = consumerNodes.putIfAbsent(nodeId, consumerNode) == null;
    if (inserted) {
        return true;
    }
    log.warn("ConsumerNode {} already exists. Not adding again.", consumerNodes.get(nodeId));
    return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public List<Assignment> assign(
List<SubscriptionUnitShard> shards, VaradhiSubscription subscription, List<ConsumerNode> consumerNodes
) {
if (consumerNodes.isEmpty()) {
log.error("Shard Assignment Failure: No active consumer nodes.");
throw new CapacityException("No active consumer node for Subscription assignment.");
}
List<Assignment> assignments = new ArrayList<>();
Expand All @@ -44,7 +43,7 @@ public List<Assignment> assign(
}
Assignment assignment =
new Assignment(subscription.getName(), shard.getShardId(), consumerNode.getMemberInfo().hostname());
consumerNode.allocate(shard.getCapacityRequest());
consumerNode.allocate(assignment, shard.getCapacityRequest());
assignments.add(assignment);
consumers.add(consumerNode);
}
Expand Down
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies {
api(project(':spi'))
implementation("io.vertx:vertx-core:${vertx_version}")
implementation('com.fasterxml.jackson.core:jackson-databind')
implementation("com.google.guava:guava")

testFixturesImplementation(project(":common"))
testImplementation(project(':pulsar'))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.flipkart.varadhi.core.cluster;

import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.entities.cluster.ShardOperation;
import com.flipkart.varadhi.entities.cluster.ShardStatus;

Expand All @@ -8,5 +9,9 @@
public interface ConsumerApi {
CompletableFuture<Void> start(ShardOperation.StartData operation);

CompletableFuture<ShardStatus> getStatus(String subscriptionId, int shardId);
CompletableFuture<Void> stop(ShardOperation.StopData operation);

CompletableFuture<ShardStatus> getShardStatus(String subscriptionId, int shardId);

CompletableFuture<ConsumerInfo> getConsumerInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
public interface ControllerApi {
String ROUTE_CONTROLLER = "controller";

CompletableFuture<Void> startSubscription(SubscriptionOperation.StartData operation);
CompletableFuture<SubscriptionOperation> startSubscription(String subscriptionId, String requestedBy);

CompletableFuture<Void> stopSubscription(SubscriptionOperation.StopData operation);
CompletableFuture<SubscriptionOperation> stopSubscription(String subscriptionId, String requestedBy);

CompletableFuture<Void> update(ShardOperation.OpData operation);
}
Loading

0 comments on commit 16211de

Please sign in to comment.