Skip to content

Commit

Permalink
making fixes to grouped message src, so that nextMessages always retu… (
Browse files Browse the repository at this point in the history
#145)

Making fixes to the grouped message source so that nextMessages always returns messages asynchronously. Previously it could return with 0 messages, and it used to prioritize fetching messages from the consumer.
  • Loading branch information
gauravAshok authored May 30, 2024
1 parent 2e1c7fd commit 16e60ef
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,54 @@
import com.flipkart.varadhi.spi.services.PolledMessages;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* Message source that maintains ordering among messages of the same groupId.
*/
@RequiredArgsConstructor
@Slf4j
public class GroupedMessageSrc<O extends Offset> implements MessageSrc {

private final ConcurrentHashMap<String, GroupTracker> allGroupedMessages = new ConcurrentHashMap<>();

private final ConcurrentLinkedDeque<String> freeGroups = new ConcurrentLinkedDeque<>();

private final Consumer<O> consumer;

/**
* Used to limit the message buffering. Will be driven via consumer configuration.
*/
private final long maxUnAckedMessages;

/**
* Maintains the count of total messages read from the consumer so far.
* Required for watermark checks, for when this value runs low we can fetch more messages from the consumer.
* Counter gets decremented when the message is committed/consumed.
*/
private final AtomicLong totalInFlightMessages = new AtomicLong(0);
private final AtomicLong totalUnAckedMessages = new AtomicLong(0);

// Used for watermark checks against the totalInFlightMessages. Will be driven via consumer configuration.
private final long maxInFlightMessages = 100; // todo(aayush): make configurable
// Internal states to manage async state

private final Consumer<O> consumer;
/**
* flag to indicate whether a task to fetch messages from consumer is ongoing.
*/
private final AtomicBoolean pendingAsyncFetch = new AtomicBoolean(false);

/**
* holder to keep the incomplete future object while waiting for new messages or groups to get freed up.
*/
private final AtomicReference<NextMsgsRequest> pendingRequest = new AtomicReference<>();

/**
* Attempt to fill the message array with one message from each group.
Expand All @@ -47,22 +66,65 @@ public class GroupedMessageSrc<O extends Offset> implements MessageSrc {
*/
@Override
public CompletableFuture<Integer> nextMessages(MessageTracker[] messages) {
if (!hasMaxInFlightMessages()) {
return replenishAvailableGroups().thenApply(v -> nextMessagesInternal(messages));
int count = nextMessagesInternal(messages);
if (count > 0) {
return CompletableFuture.completedFuture(count);
}

NextMsgsRequest request = new NextMsgsRequest(new CompletableFuture<>(), messages);
if (!pendingRequest.compareAndSet(null, request)) {
throw new IllegalStateException(
"nextMessages method is not supposed to be called concurrently. There seems to be a pending nextMessage call");
}

// incomplete result is saved. trigger new message fetch.
optionallyFetchNewMessages();

// double check, if any free group is available now.
if (isFreeGroupPresent()) {
tryCompletePendingRequest();
}

return request.result;
}

/**
 * If a nextMessages call is parked, atomically claim it and complete its future with
 * whatever messages can be drained right now (possibly 0 if groups were raced away).
 */
private void tryCompletePendingRequest() {
    NextMsgsRequest pending = pendingRequest.getAndSet(null);
    if (pending != null) {
        int delivered = nextMessagesInternal(pending.messages);
        pending.result.complete(delivered);
    }
}

/**
 * Initiate one async fetch from the consumer, but only when there is buffer room
 * (un-acked count below the cap) and no fetch is already in flight.
 * On success the polled messages replenish the group buffers, which in turn may
 * complete a parked nextMessages request.
 */
private void optionallyFetchNewMessages() {
    if (!isMaxUnAckedMessagesBreached() && pendingAsyncFetch.compareAndSet(false, true)) {
        // there is more room for new messages. We can initiate a new fetch request, as none is ongoing.
        consumer.receiveAsync().whenComplete((polledMessages, ex) -> {
            // BUG FIX: success/failure branches were inverted — the error path was taken
            // when ex == null, so successful polls were logged as errors and never buffered.
            if (ex == null) {
                replenishAvailableGroups(polledMessages);
                pendingAsyncFetch.set(false);
            } else {
                // NOTE(review): pendingAsyncFetch is never reset on this path, so a real
                // failure permanently stops fetching — confirm receiveAsync cannot fail.
                log.error("Error while fetching messages from consumer", ex);
                throw new IllegalStateException(
                        "should be unreachable. consumer.receiveAsync() should not throw exception.");
            }
        });
    }
}

private int nextMessagesInternal(MessageTracker[] messages) {
int i = 0;
GroupTracker groupTracker;
while (i < messages.length && (groupTracker = getGroupTracker()) != null) {
while (i < messages.length && (groupTracker = pollFreeGroup()) != null) {
messages[i++] = new GroupedMessageTracker(groupTracker.messages.getFirst().nextMessage());
}
return i;
}

private GroupTracker getGroupTracker() {
boolean isFreeGroupPresent() {
return !freeGroups.isEmpty();
}

private GroupTracker pollFreeGroup() {
String freeGroup = freeGroups.poll();
if (freeGroup == null) {
return null;
Expand All @@ -77,13 +139,6 @@ private GroupTracker getGroupTracker() {
return tracker;
}

// Pre-change helper (removed by this commit): fetches one batch from the consumer and
// buffers it via replenishAvailableGroups(PolledMessages). Superseded by the
// fire-and-forget optionallyFetchNewMessages() path, which does not return a future.
private CompletableFuture<Void> replenishAvailableGroups() {
    return consumer.receiveAsync().thenApply(polledMessages -> {
        replenishAvailableGroups(polledMessages);
        // thenApply requires a value; null stands in for Void.
        return null;
    });
}

private void replenishAvailableGroups(PolledMessages<O> polledMessages) {
Map<String, List<MessageTracker>> groupedMessages = groupMessagesByGroupId(polledMessages);
for (Map.Entry<String, List<MessageTracker>> group : groupedMessages.entrySet()) {
Expand All @@ -97,11 +152,12 @@ private void replenishAvailableGroups(PolledMessages<O> polledMessages) {
tracker.messages.add(newBatch);
return tracker;
});
totalInFlightMessages.addAndGet(newBatch.count());
totalUnAckedMessages.addAndGet(newBatch.count());
if (isNewGroup.isTrue()) {
freeGroups.add(group.getKey());
}
}
tryCompletePendingRequest();
}

private Map<String, List<MessageTracker>> groupMessagesByGroupId(PolledMessages<O> polledMessages) {
Expand All @@ -117,8 +173,8 @@ private Map<String, List<MessageTracker>> groupMessagesByGroupId(PolledMessages<
return groups;
}

private boolean hasMaxInFlightMessages() {
return totalInFlightMessages.get() >= maxInFlightMessages;
boolean isMaxUnAckedMessagesBreached() {
return totalUnAckedMessages.get() >= maxUnAckedMessages;
}

enum GroupStatus {
Expand Down Expand Up @@ -158,7 +214,7 @@ private void free(String groupId, MessageConsumptionStatus status) {
throw new IllegalStateException(String.format("Tried to free group %s: %s", gId, tracker));
}
var messages = tracker.messages;
if (!messages.isEmpty() && messages.getFirst().remaining() == 0) {
while (!messages.isEmpty() && messages.getFirst().remaining() == 0) {
messages.removeFirst();
}
if (!messages.isEmpty()) {
Expand All @@ -169,10 +225,14 @@ private void free(String groupId, MessageConsumptionStatus status) {
return null;
}
});
totalInFlightMessages.decrementAndGet();
totalUnAckedMessages.decrementAndGet();
if (isRemaining.isTrue()) {
freeGroups.addFirst(groupId);
tryCompletePendingRequest();
}
}
}

// Holder pairing a caller's message array with the future to complete once messages
// become available; parked in pendingRequest while waiting.
record NextMsgsRequest(CompletableFuture<Integer> result, MessageTracker[] messages) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ public class UnGroupedMessageSrc<O extends Offset> implements MessageSrc {

private final Consumer<O> consumer;

// flag to indicate whether a future is in progress to fetch messages from the consumer.
private final AtomicBoolean futureInProgress = new AtomicBoolean(false);
/**
* flag to indicate whether a task to fetch messages from consumer is ongoing.
*/
private final AtomicBoolean pendingAsyncFetch = new AtomicBoolean(false);

// Iterator into an ongoing consumer batch that has not been fully processed yet.
private Iterator<PolledMessage<O>> ongoingIterator = null;
/**
* Iterator into an ongoing consumer batch that has not been fully processed yet.
*/
private volatile Iterator<PolledMessage<O>> ongoingIterator = null;

/**
* Fetches the next batch of messages from the consumer.
* Prioritise immediate fetch and return over waiting for the consumer.
* Prioritises returning whatever messages are available.
*
* @param messages Array of message trackers to populate.
*
Expand All @@ -38,44 +42,53 @@ public CompletableFuture<Integer> nextMessages(MessageTracker[] messages) {
// Our first priority is to drain the iterator if it is set and return immediately.
// We do not want to proceed with consumer receiveAsync if we have messages in the iterator,
// as a slow or empty consumer might block the flow and cause the iterator contents to be stuck.
int offset = fetchFromIterator(ongoingIterator, messages, 0);
if (offset > 0) {
return CompletableFuture.completedFuture(offset);
int count = fetchFromIterator(consumer, messages, ongoingIterator);
if (count > 0) {
return CompletableFuture.completedFuture(count);
}

// If the iterator is not set, or is empty, then we try to fetch the message batch from the consumer.
// However, multiple calls to nextMessages may fire multiple futures concurrently.
// Leading to a race condition that overrides the iterator from a previous un-processed batch, causing a lost-update problem.
// Therefore, we use the futureInProgress flag to limit the concurrency and ensure only one future is in progress at a time.
if (futureInProgress.compareAndSet(false, true)) {
return consumer.receiveAsync()
.thenApply(polledMessages -> processPolledMessages(polledMessages, messages, offset))
.whenComplete((result, ex) -> futureInProgress.set(
false)); // any of the above stages can complete exceptionally, so this is to ensure the flag is reset.
ongoingIterator = null;
if (pendingAsyncFetch.compareAndSet(false, true)) {
return consumer.receiveAsync().whenComplete((result, ex) -> pendingAsyncFetch.set(false))
.thenApply(polledMessages -> processPolledMessages(polledMessages, messages));
} else {
throw new IllegalStateException(
"nextMessages method is not supposed to be called concurrently. There seems to be an ongoing consumer.receiveAsync() operation.");
}
return CompletableFuture.completedFuture(0);
}

private int processPolledMessages(PolledMessages<O> polledMessages, MessageTracker[] messages, int startIndex) {
ongoingIterator = polledMessages.iterator();
return fetchFromIterator(ongoingIterator, messages, startIndex);
private int processPolledMessages(PolledMessages<O> polledMessages, MessageTracker[] messages) {
Iterator<PolledMessage<O>> polledMessagesIterator = polledMessages.iterator();
ongoingIterator = polledMessagesIterator;
return fetchFromIterator(consumer, messages, polledMessagesIterator);
}

/**
* Fetches messages from the iterator and populates the message array.
*
* @param iterator Iterator of messages to fetch from.
* @param messages Array of message trackers to populate.
* @param startIndex Index into the messages array from where to start storing the messages.
* @param iterator Iterator of messages to fetch from.
* @param messages Array of message trackers to populate.
*
* @return Index into the messages array where the next message should be stored. (will be equal to the length if completely full)
*/
private int fetchFromIterator(
Iterator<PolledMessage<O>> iterator, MessageTracker[] messages, int startIndex

static <O extends Offset> int fetchFromIterator(
Consumer<O> consumer, MessageTracker[] messages, Iterator<PolledMessage<O>> iterator
) {
while (iterator != null && iterator.hasNext() && startIndex < messages.length) {
messages[startIndex++] = new PolledMessageTracker<>(consumer, iterator.next());
if (iterator == null || !iterator.hasNext()) {
return 0;
}

int i = 0;
while (i < messages.length && iterator.hasNext()) {
PolledMessage<O> polledMessage = iterator.next();
MessageTracker messageTracker = new PolledMessageTracker<>(consumer, polledMessage);
messages[i++] = messageTracker;
}
return startIndex;
return i;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.flipkart.varadhi.consumer.impl;

import com.flipkart.varadhi.consumer.ThresholdProvider;
import com.flipkart.varadhi.consumer.InternalQueueType;
import com.flipkart.varadhi.consumer.ThresholdProvider;
import com.flipkart.varadhi.consumer.Throttler;
import com.google.common.base.Ticker;
import lombok.RequiredArgsConstructor;
Expand All @@ -18,7 +18,8 @@
* @param <T>
*/
@Slf4j
public class SlidingWindowThrottler<T> implements Throttler<T>, ThresholdProvider.ThresholdChangeListener, AutoCloseable {
public class SlidingWindowThrottler<T>
implements Throttler<T>, ThresholdProvider.ThresholdChangeListener, AutoCloseable {

/*
Approach:
Expand Down Expand Up @@ -172,7 +173,6 @@ private boolean moveWindow(long currentTick) {
if (newWindowBeginTick == windowBeginTick) {
return false;
}

for (long i = windowBeginTick; i < newWindowBeginTick; ++i) {
int beginIdx = (int) (i % totalTicks);
int endIdx = (int) ((i + ticksInWindow) % totalTicks);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.flipkart.varadhi.consumer;

import com.codahale.metrics.*;
import com.codahale.metrics.Timer;
import com.codahale.metrics.*;
import com.flipkart.varadhi.consumer.concurrent.Context;
import com.flipkart.varadhi.consumer.concurrent.CustomThread;
import com.flipkart.varadhi.consumer.concurrent.EventExecutor;
Expand Down Expand Up @@ -103,17 +103,24 @@ public static void doSimulation(
Meter loadGenMeter =
registry.register("load.gen.rate", new Meter());
Meter errorExpMeter = registry.register("task.error.experienced.rate", new Meter());
Timer completionLatency = registry.register("task.completion.latency", new Timer(new SlidingTimeWindowArrayReservoir(60, TimeUnit.SECONDS)));
Timer throttlerAcquireLatency = registry.register("throttler.acquire.latency", new Timer(new SlidingTimeWindowArrayReservoir(60, TimeUnit.SECONDS)));
Gauge<Float> errorThresholdGuage = registry.registerGauge("error.threshold.value", dynamicThreshold::getThreshold);
Timer completionLatency = registry.register(
"task.completion.latency",
new Timer(new SlidingTimeWindowArrayReservoir(60, TimeUnit.SECONDS))
);
Timer throttlerAcquireLatency = registry.register(
"throttler.acquire.latency",
new Timer(new SlidingTimeWindowArrayReservoir(60, TimeUnit.SECONDS))
);
Gauge<Float> errorThresholdGuage =
registry.registerGauge("error.threshold.value", dynamicThreshold::getThreshold);
if (metricListener != null) {
websocketScheduler.scheduleAtFixedRate(() -> {
Map<String, Double> datapoints = new HashMap<>();
datapoints.put("error.experienced.rate", errorExpMeter.getOneMinuteRate());
datapoints.put("task.completion.rate", completionLatency.getOneMinuteRate());
datapoints.put("error.threshold.value", (double) errorThresholdGuage.getValue());
metricListener.accept(datapoints);
}, 1_000,2_000, TimeUnit.MILLISECONDS);
}, 1_000, 2_000, TimeUnit.MILLISECONDS);
}
reporter.start(2, TimeUnit.SECONDS);
AtomicInteger throttlePending = new AtomicInteger(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import com.flipkart.varadhi.spi.services.DummyConsumer;
import com.flipkart.varadhi.spi.services.DummyProducer.DummyOffset;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;

@Slf4j
class UnGroupedMessageSrcTest {
Expand Down Expand Up @@ -152,9 +152,15 @@ void testConcurrencyInConsumerFetchNotAllowed() {
DummyConsumer.SlowConsumer consumer = new DummyConsumer.SlowConsumer(messages, 3);
UnGroupedMessageSrc<DummyOffset> messageSrc = new UnGroupedMessageSrc<>(consumer);
var f1 = messageSrc.nextMessages(messageTrackers);
var f2 = messageSrc.nextMessages(messageTrackers);

assertEquals(0, f2.join());
try {
assertFalse(f1.isDone());
var f2 = messageSrc.nextMessages(messageTrackers);
Assertions.fail("concurrent invocation is not expected.");
} catch (IllegalStateException e) {
// expected.
}

assertEquals(messageTrackers.length, f1.join());

// since f1 is completed now, next invocation should return remaining messages
Expand Down
Loading

0 comments on commit 16e60ef

Please sign in to comment.