Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RePause All Partitions After Rebalance if user paused any and requested #363

Merged
merged 4 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/docs/asciidoc/api-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,9 @@ When group management is used, assignment listeners are invoked whenever partiti
to the consumer after a rebalance operation. When manual assignment is used, assignment listeners
are invoked when the consumer is started. Assignment listeners can be used to seek to particular offsets
in the assigned partitions so that messages are consumed from the specified offset.
When a user pauses topics/partitions before rebalancing, the behavior depends on the value of
`pauseAllAfterRebalance`. If it is set to `false`, the paused topics/partitions will remain paused after the rebalance.
However, if it is set to `true`, all assigned topics/partitions will be paused after the rebalance.

When group management is used, revocation listeners are invoked whenever partitions are revoked
from a consumer after a rebalance operation. When manual assignment is used, revocation listeners
Expand Down
111 changes: 85 additions & 26 deletions src/main/java/reactor/kafka/receiver/ImmutableReceiverOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class ImmutableReceiverOptions<K, V> implements ReceiverOptions<K, V> {
private final Pattern subscribePattern;
private final Supplier<Scheduler> schedulerSupplier;
private final ConsumerListener consumerListener;
private final boolean pauseAllAfterRebalance;

ImmutableReceiverOptions() {
this(new HashMap<>());
Expand Down Expand Up @@ -105,6 +106,7 @@ class ImmutableReceiverOptions<K, V> implements ReceiverOptions<K, V> {
this.properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
schedulerSupplier = Schedulers::immediate;
consumerListener = null;
pauseAllAfterRebalance = false;
}

ImmutableReceiverOptions(
Expand All @@ -127,7 +129,8 @@ class ImmutableReceiverOptions<K, V> implements ReceiverOptions<K, V> {
Collection<TopicPartition> partitions,
Pattern pattern,
Supplier<Scheduler> supplier,
ConsumerListener consumerListener
ConsumerListener consumerListener,
boolean pauseAllAfterRebalance
) {
this.properties = new HashMap<>(properties);
this.assignListeners = new ArrayList<>(assignListeners);
Expand All @@ -149,6 +152,7 @@ class ImmutableReceiverOptions<K, V> implements ReceiverOptions<K, V> {
this.subscribePattern = pattern;
this.schedulerSupplier = supplier;
this.consumerListener = consumerListener;
this.pauseAllAfterRebalance = pauseAllAfterRebalance;
}

@Override
Expand Down Expand Up @@ -189,7 +193,8 @@ public ReceiverOptions<K, V> consumerProperty(String name, Object newValue) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand All @@ -215,7 +220,8 @@ public ReceiverOptions<K, V> withKeyDeserializer(Deserializer<K> keyDeserializer
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -246,7 +252,8 @@ public ReceiverOptions<K, V> withValueDeserializer(Deserializer<V> valueDeserial
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -285,7 +292,8 @@ public ReceiverOptions<K, V> pollTimeout(Duration timeout) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -319,7 +327,8 @@ public ReceiverOptions<K, V> closeTimeout(Duration timeout) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -350,7 +359,8 @@ public ReceiverOptions<K, V> addAssignListener(Consumer<Collection<ReceiverParti
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -381,7 +391,8 @@ public ReceiverOptions<K, V> addRevokeListener(Consumer<Collection<ReceiverParti
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand All @@ -407,7 +418,8 @@ public ReceiverOptions<K, V> clearAssignListeners() {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand All @@ -433,7 +445,8 @@ public ReceiverOptions<K, V> clearRevokeListeners() {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -469,7 +482,8 @@ public ReceiverOptions<K, V> subscription(Collection<String> topics) {
null,
null,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand All @@ -495,7 +509,8 @@ public ReceiverOptions<K, V> subscription(Pattern pattern) {
null,
Objects.requireNonNull(pattern),
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand All @@ -521,7 +536,8 @@ public ReceiverOptions<K, V> assignment(Collection<TopicPartition> partitions) {
Objects.requireNonNull(partitions),
null,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -582,7 +598,8 @@ public ReceiverOptions<K, V> commitInterval(Duration commitInterval) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -616,7 +633,8 @@ public ReceiverOptions<K, V> commitBatchSize(int commitBatchSize) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -650,7 +668,8 @@ public ReceiverOptions<K, V> atmostOnceCommitAheadSize(int commitAheadSize) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -684,7 +703,8 @@ public ReceiverOptions<K, V> maxCommitAttempts(int maxAttempts) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -715,7 +735,8 @@ public ReceiverOptions<K, V> maxDeferredCommits(int maxDeferred) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -746,10 +767,43 @@ public ReceiverOptions<K, V> maxDelayRebalance(Duration maxDelay) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

@Override
public boolean pauseAllAfterRebalance() {
return this.pauseAllAfterRebalance;
}

@Override
public ReceiverOptions<K, V> pauseAllAfterRebalance(boolean pauseAll) {
return new ImmutableReceiverOptions<>(
properties,
assignListeners,
revokeListeners,
keyDeserializer,
valueDeserializer,
pollTimeout,
closeTimeout,
commitInterval,
commitBatchSize,
atmostOnceCommitAheadSize,
maxCommitAttempts,
commitRetryInterval,
maxDeferredCommits,
maxDelayRebalance,
commitIntervalDuringDelay,
subscribeTopics,
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener,
pauseAll
);
}

@Override
public long commitIntervalDuringDelay() {
return this.commitIntervalDuringDelay;
Expand All @@ -776,7 +830,8 @@ public ReceiverOptions<K, V> commitIntervalDuringDelay(long interval) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -807,7 +862,8 @@ public ReceiverOptions<K, V> schedulerSupplier(Supplier<Scheduler> schedulerSupp
assignTopicPartitions,
subscribePattern,
Objects.requireNonNull(schedulerSupplier),
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand All @@ -833,7 +889,8 @@ public ReceiverOptions<K, V> commitRetryInterval(Duration commitRetryInterval) {
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand All @@ -858,7 +915,8 @@ public ReceiverOptions<K, V> consumerListener(@Nullable ConsumerListener consume
assignTopicPartitions,
subscribePattern,
schedulerSupplier,
consumerListener
consumerListener,
pauseAllAfterRebalance
);
}

Expand Down Expand Up @@ -912,8 +970,8 @@ public int hashCode() {
subscribeTopics,
assignTopicPartitions,
subscribePattern,
consumerListener
);
consumerListener) +
(this.pauseAllAfterRebalance ? 1 : 0);
}

@Override
Expand All @@ -940,7 +998,8 @@ public boolean equals(Object object) {
&& Objects.equals(subscribeTopics, that.subscribeTopics)
&& Objects.equals(assignTopicPartitions, that.assignTopicPartitions)
&& Objects.equals(subscribePattern, that.subscribePattern)
&& Objects.equals(consumerListener, that.consumerListener);
&& Objects.equals(consumerListener, that.consumerListener)
&& pauseAllAfterRebalance == that.pauseAllAfterRebalance;
}
return false;
}
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/reactor/kafka/receiver/ReceiverOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,16 @@ default ReceiverOptions<K, V> maxDelayRebalance(Duration maxDelay) {
return this;
}

/**
* When true, pause all partitions on assignment after rebalance,
* if any partitions were paused by User before the rebalance. Default false
* @param pauseAll
* @return
*/
default ReceiverOptions<K, V> pauseAllAfterRebalance(boolean pauseAll) {
return this;
}

/**
* Set how often to commit offsets, in milliseconds, while a rebalance is being
* delayed. Default 100ms.
Expand Down Expand Up @@ -461,6 +471,16 @@ default Duration maxDelayRebalance() {
return Duration.ofSeconds(60);
}

/**
* When true, pause all partitions on assignment after rebalance,
* if any partitions were paused by User before the rebalance.
* Default false
* @return
*/
default boolean pauseAllAfterRebalance() {
return false;
}

/**
* Get how often to commit offsets, in milliseconds, while a rebalance is being
* delayed. Default 100ms.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,22 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
if (!pausedByUser.isEmpty()) {
List<TopicPartition> toRepause = new ArrayList<>();
Iterator<TopicPartition> iterator = pausedByUser.iterator();
while (iterator.hasNext()) {
TopicPartition next = iterator.next();
if (partitions.contains(next)) {
toRepause.add(next);
} else {
iterator.remove();
if (receiverOptions.pauseAllAfterRebalance()) {
// pause all partitions if any partitions are paused by user before rebalance
log.debug("User requested re-pausing all assignments");
toRepause.addAll(partitions);
pausedByUser.clear();
pausedByUser.addAll(partitions);

} else {
Iterator<TopicPartition> iterator = pausedByUser.iterator();
while (iterator.hasNext()) {
TopicPartition next = iterator.next();
if (partitions.contains(next)) {
toRepause.add(next);
} else {
iterator.remove();
}
}
}
if (!repausedAll && !toRepause.isEmpty()) {
Expand Down
Loading
Loading