Skip to content

Commit

Permalink
adding first barebone impl for varadhi consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
gauravAshok committed Jun 20, 2024
1 parent 0e14aec commit d779255
Show file tree
Hide file tree
Showing 22 changed files with 598 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

public interface ConcurrencyControl<T> {

Collection<CompletableFuture<T>> enqueueTasks(InternalQueueType type, Collection<Supplier<CompletableFuture<T>>> tasks);
// TODO: maybe evaluate per task enqueue for CC as well.
Collection<CompletableFuture<T>> enqueueTasks(
InternalQueueType type, Collection<Supplier<CompletableFuture<T>>> tasks
);

void executePendingTasks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface ConsumersManager {
* details.
* `shardName` identifies the different shards within the subscription.
*
* @return
* @return Future that will be completed when the consumer is started & ready to consume messages.
*/
CompletableFuture<Void> startSubscription(
String project,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.Message;
import com.flipkart.varadhi.entities.Offset;
import com.flipkart.varadhi.spi.services.Consumer;
import com.flipkart.varadhi.spi.services.PolledMessage;
Expand All @@ -22,8 +21,8 @@
/**
* Message source that maintains ordering among messages of the same groupId.
*/
@RequiredArgsConstructor
@Slf4j
@RequiredArgsConstructor
public class GroupedMessageSrc<O extends Offset> implements MessageSrc {

private final ConcurrentHashMap<String, GroupTracker> allGroupedMessages = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -91,6 +90,7 @@ public CompletableFuture<Integer> nextMessages(MessageTracker[] messages) {
private void tryCompletePendingRequest() {
NextMsgsRequest request;
if ((request = pendingRequest.getAndSet(null)) != null) {
// TODO: does it need to be done on the context?
request.result.complete(nextMessagesInternal(request.messages));
}
}
Expand Down Expand Up @@ -196,7 +196,7 @@ private class GroupedMessageTracker implements MessageTracker {
private final MessageTracker messageTracker;

@Override
public Message getMessage() {
public PolledMessage<? extends Offset> getMessage() {
return messageTracker.getMessage();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.Message;
import com.flipkart.varadhi.entities.Offset;
import com.flipkart.varadhi.spi.services.PolledMessage;

public interface MessageTracker {

Message getMessage();
PolledMessage<? extends Offset> getMessage();

default String getGroupId() {
Message msg = getMessage();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.Message;
import com.flipkart.varadhi.entities.Offset;
import com.flipkart.varadhi.spi.services.Consumer;
import com.flipkart.varadhi.spi.services.PolledMessage;
Expand All @@ -18,7 +17,7 @@ public PolledMessageTracker(Consumer<O> committer, PolledMessage<O> message) {
}

@Override
public Message getMessage() {
public PolledMessage<O> getMessage() {
return message;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;

@Getter
@RequiredArgsConstructor
public class Context {

private static final FastThreadLocal<Context> currentThreadCtx = new FastThreadLocal<>();

private final EventExecutor executor;
@Getter(lombok.AccessLevel.PACKAGE)
final EventExecutor executor;

public interface Task extends Runnable {
Context getContext();
}

/**
* Update the current thread's context to this context.
*/
public void updateCurrentThreadContext() {
Thread currentThread = Thread.currentThread();
if (currentThread instanceof CustomThread) {
Expand All @@ -25,12 +28,48 @@ public void updateCurrentThreadContext() {
}
}

public static Context getCurrentTheadContext() {
public static Context getCurrentThreadContext() {
Thread currentThread = Thread.currentThread();
if (currentThread instanceof CustomThread) {
return ((CustomThread) currentThread).getContext();
} else {
return currentThreadCtx.get();
}
}

public boolean isInContext() {
return Thread.currentThread() == executor.getThread();
}

public Task wrap(Runnable runnable) {
return new Task() {
@Override
public Context getContext() {
return Context.this;
}

@Override
public void run() {
runnable.run();
}
};
}

/**
* It makes sure that the task is run on the thread bound to the context. If already on the context thread, it
* runs the task directly.
*
* @param runnable
*/
public void executeOnContext(Runnable runnable) {
if (isInContext()) {
runnable.run();
} else {
executor.execute(wrap(runnable));
}
}

public void execute(Task task) {
executor.execute(task);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,28 @@ EventExecutorGroup getParent() {
return parent;
}

CustomThread getThread() {
return thread;
}

@Override
public void execute(Runnable command) {
// TODO: review all cases here. As of now, here we are assuming that all tasks are running from custom threads,
// which is most likely false.

if (command instanceof Context.Task) {
EventExecutor boundExecutor = ((Context.Task) command).getContext().getExecutor();
if (command instanceof Context.Task task) {

// TODO: maybe move this check to context.execute()
EventExecutor boundExecutor = task.getContext().getExecutor();
if (boundExecutor != null && boundExecutor != this) {
throw new IllegalStateException(
"task is tied to an executor:" + boundExecutor + ", but is being executed on:" + this);
}
taskQueue.add((Context.Task) command);
taskQueue.add(task);
return;
}

taskQueue.add(new WrappedTask(Context.getCurrentTheadContext(), command));
taskQueue.add(new WrappedTask(Context.getCurrentThreadContext(), command));
}

public void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -126,6 +129,8 @@ public int getPendingCount() {
* Otherwise, pending tasks may sit idle forever.
* 3. If there is no task running at the moment to schedule any pending task, then we should schedule it regardless.
*
* This method can run on any arbitrary thread.
*
* @param result
* @param ex
*/
Expand All @@ -141,7 +146,7 @@ private void onTaskCompletion(T result, Throwable ex) {
}

if (scheduleRequired) {
context.getExecutor().execute(() -> {
context.executeOnContext(() -> {
executePendingTasks();
schedulePendingTaskCounter.decrementAndGet();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@
import com.flipkart.varadhi.consumer.ConsumerState;
import com.flipkart.varadhi.consumer.ConsumersManager;
import com.flipkart.varadhi.consumer.ConsumptionFailurePolicy;
import com.flipkart.varadhi.entities.*;
import com.flipkart.varadhi.consumer.VaradhiConsumer;
import com.flipkart.varadhi.entities.ConsumptionPolicy;
import com.flipkart.varadhi.entities.Endpoint;
import com.flipkart.varadhi.entities.StorageSubscription;
import com.flipkart.varadhi.entities.StorageTopic;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.entities.TopicPartitions;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ConsumersManagerImpl implements ConsumersManager {
private final ConsumerInfo consumerInfo;
private final Map<ShardId, ConsumerHolder> consumers = new ConcurrentHashMap<>();

public ConsumersManagerImpl(ConsumerInfo consumerInfo) {
this.consumerInfo = consumerInfo;
Expand All @@ -22,7 +28,15 @@ public CompletableFuture<Void> startSubscription(
boolean grouped, Endpoint endpoint, ConsumptionPolicy consumptionPolicy,
ConsumptionFailurePolicy failurePolicy
) {
return CompletableFuture.completedFuture(null);
ShardId id = new ShardId(subscription, shardId);
ConsumerHolder prev = consumers.putIfAbsent(id, new ConsumerHolder());
if (prev != null) {
throw new IllegalArgumentException("Consumer already exists for " + id);
}
ConsumerHolder newConsumer = consumers.get(id);
newConsumer.consumer = null;

return null;
}

@Override
Expand All @@ -49,4 +63,11 @@ public ConsumerState getConsumerState(String subscription, int shardId) {
public ConsumerInfo getInfo() {
return consumerInfo;
}

record ShardId(String subscription, int shardId) {
}

static class ConsumerHolder {
private VaradhiConsumer consumer;
}
}
Loading

0 comments on commit d779255

Please sign in to comment.