Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#615 - allow specifying a listener for partition assignment changes #1305

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

nachogiljaldo
Copy link

@nachogiljaldo nachogiljaldo commented Jul 2, 2024

Closes #615

Adds a listener to send re-assignments to a listener. After this, it is possible to possible to provide a listener (optional) to have consumers be notified of changes in partition assignments for the current consumer.

This is useful for debugging as well as potentially other things such as dropping in-flight events for partitions that are not owned anymore.

@nachogiljaldo
Copy link
Author

@achille-roussel I saw you commented on #615 . Is this something you could review?

@nachogiljaldo nachogiljaldo marked this pull request as ready for review July 2, 2024 22:12
@isaacd9
Copy link

isaacd9 commented Jul 3, 2024

I don't think adding this is a good idea. Instead, you should probably use the ConsumerGroup API to listen for a new generation and construct a new reader.

@nachogiljaldo
Copy link
Author

Thanks for the feedback @isaacd9 ! Could you elaborate?

I had a quick look at the NewReader. Also it seems to use run which is private, so it's not possible to do it that way without duplicating even more code? Same for getTopics() while building the ConsumerGroupConfig.

Maybe, instead, a modification safe version of ConsumerGroup should be exposed (that only has Next but not Close) in the Reader? That allows to preserve the encapsulation of the logic to create Readers which IMHO seems important and avoids unnecessary duplication on all clients that would need this.

Something like:

type ConsumerGroupGenerationListener interface {
  Next(ctx context.Context) (*Generation, error)
}

...

func (r *Reader) ConsumerGroupGenerationListener(ctx context.Context) (ConsumerGroupGenerationListener, error) {
	if !r.useConsumerGroup() {
		return nil, errOnlyAvailableWithGroup
	}
        return r.consumerGroupGenerationListener, nil
}

@nachogiljaldo
Copy link
Author

@petedannemann , sorry for the direct ping but you seem to have reviewed some of the last PRs. I wonder if this is something you could provide feedback on the approach (or the alternatives provided by Isaac or my counter proposal).

@nachogiljaldo
Copy link
Author

heya @erikdw / @jkoelker apologies for the direct ping, but this has been laying around for a while and I would need some guidance (as you seem to have approved the last merged PRs)

If you're indeed maintainers of this library, are you ok with this approach? Would you prefer the one outlined in #1305 (comment) or a different one? Happy to take on those suggestions

@jkoelker
Copy link
Member

@nachogiljaldo I suspect this has sat for while as the commit messages and pr description don't give very much context as to why its necessary. I clicked through the issue and after reading the discussion there, I understand that there is a desire to notify other portions of the code when group reassignment takes place.

I'm not familiar with this code or kafka implementation details, so as to use a ConsumerGroup or this implementation, I can't say, and wouldn't feel comfortable reviewing this.

Code wise, I'm not sure sorting the list is necessary and would just delay the re-assignment if a listener doesn't need it sorted, if a listener needs it sorted, I'd assume it would sort it itself.

@nachogiljaldo nachogiljaldo changed the title #615 - listener for assignments #615 - allow specifying a listener for partition assignment changes Sep 11, 2024
@nachogiljaldo
Copy link
Author

@jkoelker thanks for the prompt reply!

Fair points, I updated the PR description and first commit message to be more informative, I hope that makes sense.

Also removed the sorting as you suggested.

As per the specifics of the implementation, since you said you're not familiar with the code or kafka, would you know who could help providing feedback on the approach / review this PR?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

A way to retrieve what partitions where consumer is signed
3 participants