New stream method with the full rebalance information #876

Open
LMnet opened this issue Feb 21, 2022 · 3 comments

@LMnet
Member

LMnet commented Feb 21, 2022

After #844, partitionsMapStream is no longer a "Stream where each element contains a current assignment", as the Scaladoc previously described it.

If we want to retain that functionality, it looks like we need to add a new stream method like this:

def detailedDataStream: Stream[F, FullAssignmentInfo[F, K, V]]

case class FullAssignmentInfo[F[_], K, V](
  newlyAssignedPartitions: Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]],
  revokedPartitions: SortedSet[TopicPartition],
  retainedPartitions: SortedSet[TopicPartition],
)

Naming is not final, just my thoughts.

@LMnet LMnet added the enhancement New feature or request label Feb 21, 2022
@LMnet LMnet added this to the v3.0.0 milestone Mar 12, 2022
@LMnet LMnet mentioned this issue Mar 12, 2022
@bplommer
Member

I think this is a great idea. We could initially make it a private method and implement the other methods in terms of it, so we have time to iron out any issues before exposing it in the public API. Then when we expose it we can provide default implementations for the other methods in terms of it on KafkaConsume, making it much easier to mock KafkaConsume for testing.
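To illustrate the mocking benefit described above, here is a minimal sketch with one abstract method and the other methods derived from it. It is not the fs2-kafka API: `ConsumeSketch` is a hypothetical trait, `List` stands in for an fs2 `Stream`, the type parameter `S` stands in for the per-partition record stream, `TopicPartition` is a local stand-in for the Kafka class, and the way the derived methods are defined is an assumption.

```scala
import scala.collection.immutable.SortedSet

object DefaultMethodsSketch {
  // Local stand-in for org.apache.kafka.common.TopicPartition (keeps the sketch dependency-free).
  final case class TopicPartition(topic: String, partition: Int)
  object TopicPartition {
    implicit val ordering: Ordering[TopicPartition] =
      Ordering.by(tp => (tp.topic, tp.partition))
  }

  // S stands in for Stream[F, CommittableConsumerRecord[F, K, V]].
  final case class FullAssignmentInfo[S](
    newlyAssignedPartitions: Map[TopicPartition, S],
    revokedPartitions: SortedSet[TopicPartition],
    retainedPartitions: SortedSet[TopicPartition]
  )

  // Hypothetical trait: List plays the role of fs2 Stream.
  trait ConsumeSketch[S] {
    // The only abstract member.
    def detailedDataStream: List[FullAssignmentInfo[S]]

    // Assumed default: one map of newly assigned partitions per element.
    def partitionsMapStream: List[Map[TopicPartition, S]] =
      detailedDataStream.map(_.newlyAssignedPartitions)

    // Assumed default: every per-partition stream, in assignment order.
    def partitionedStream: List[S] =
      partitionsMapStream.flatMap(_.values)
  }

  val p0: TopicPartition = TopicPartition("topic", 0)
  val p1: TopicPartition = TopicPartition("topic", 1)
  private val none: SortedSet[TopicPartition] = SortedSet.empty[TopicPartition]

  // A test double only has to supply detailedDataStream.
  val mock: ConsumeSketch[String] = new ConsumeSketch[String] {
    val detailedDataStream: List[FullAssignmentInfo[String]] = List(
      FullAssignmentInfo(Map(p0 -> "s0"), none, none),
      FullAssignmentInfo(Map(p1 -> "s1"), none, SortedSet(p0))
    )
  }

  def main(args: Array[String]): Unit = {
    println(mock.partitionsMapStream)
    println(mock.partitionedStream)
  }
}
```

The point of the design is that a test double overrides a single member and inherits consistent behaviour for everything else.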

@bplommer
Member

Having played with this a bit, I think the stream should instead emit an ADT with subtypes Assigned(newlyAssignedPartitions: Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]) and Revoked(revokedPartitions: SortedSet[TopicPartition]). That matches the shape of the information we get from the Java Kafka client, and a stream of FullAssignmentInfo can be generated in user code via a suitable fold.
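A rough sketch of the fold mentioned above, under stated assumptions: `RebalanceEvent`, `step` and `currentAssignment` are hypothetical names, `TopicPartition` is a local stand-in for the Kafka class, the type parameter `S` stands in for `Stream[F, CommittableConsumerRecord[F, K, V]]`, and a `List` with `scanLeft` stands in for the fs2 stream (with fs2 this would presumably be `scan`). It also assumes that "retained" means the partitions held before the event that were neither revoked nor re-assigned by it.

```scala
import scala.collection.immutable.SortedSet

object RebalanceFoldSketch {
  // Local stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)
  object TopicPartition {
    implicit val ordering: Ordering[TopicPartition] =
      Ordering.by(tp => (tp.topic, tp.partition))
  }

  // Hypothetical event ADT mirroring the two rebalance callbacks.
  sealed trait RebalanceEvent[S]
  object RebalanceEvent {
    final case class Assigned[S](newlyAssignedPartitions: Map[TopicPartition, S])
        extends RebalanceEvent[S]
    final case class Revoked[S](revokedPartitions: SortedSet[TopicPartition])
        extends RebalanceEvent[S]
  }

  final case class FullAssignmentInfo[S](
    newlyAssignedPartitions: Map[TopicPartition, S],
    revokedPartitions: SortedSet[TopicPartition],
    retainedPartitions: SortedSet[TopicPartition]
  ) {
    // Everything held after this event: retained plus newly assigned.
    def currentAssignment: SortedSet[TopicPartition] =
      retainedPartitions ++ newlyAssignedPartitions.keySet
  }
  object FullAssignmentInfo {
    def empty[S]: FullAssignmentInfo[S] =
      FullAssignmentInfo(
        Map.empty[TopicPartition, S],
        SortedSet.empty[TopicPartition],
        SortedSet.empty[TopicPartition]
      )
  }

  // One fold step: derive the full picture from the previous state and one event.
  def step[S](prev: FullAssignmentInfo[S], event: RebalanceEvent[S]): FullAssignmentInfo[S] =
    event match {
      case RebalanceEvent.Assigned(assigned) =>
        FullAssignmentInfo(
          assigned,
          SortedSet.empty[TopicPartition],
          prev.currentAssignment -- assigned.keySet
        )
      case RebalanceEvent.Revoked(revoked) =>
        FullAssignmentInfo(
          Map.empty[TopicPartition, S],
          revoked,
          prev.currentAssignment -- revoked
        )
    }

  val p0: TopicPartition = TopicPartition("topic", 0)
  val p1: TopicPartition = TopicPartition("topic", 1)
  val p2: TopicPartition = TopicPartition("topic", 2)

  val events: List[RebalanceEvent[String]] = List(
    RebalanceEvent.Assigned(Map(p0 -> "s0", p1 -> "s1")),
    RebalanceEvent.Revoked(SortedSet(p1)),
    RebalanceEvent.Assigned(Map(p2 -> "s2"))
  )

  // scanLeft also emits the initial state, so drop it with tail.
  val infos: List[FullAssignmentInfo[String]] =
    events.scanLeft(FullAssignmentInfo.empty[String])(step).tail

  def main(args: Array[String]): Unit = infos.foreach(println)
}
```

Under these assumptions the retained partitions fall out of the running state, so they can be derived in user code without being carried in the emitted events.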

@LMnet
Member Author

LMnet commented Mar 27, 2022

But what about retained partitions? We could provide this information too.

@aartigao aartigao modified the milestones: v3.0.0, v4.0.0 Oct 28, 2023