-
Notifications
You must be signed in to change notification settings - Fork 628
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix check topic partition change by load topic metadata from keys of consumerGroup.topicPartitionLength #1281
base: master
Are you sure you want to change the base?
Conversation
I've done some testing and I have been able to recreate the leave and rejoin but it doesn't seem to be big an issue. Doesn't this change also prevent the new topic from being assigned properly? |
Not familiar with kafka protocol, but my goal is to prevent unsubscribed topic become key of ConsumerGroup.topicPartitionLength object, so it seems more safe to do it around invoking of createTopicPartitionLength? |
Yes, you could filter the topic from there and it won't prevent the assignment of those new topics. I am not sure of the correct behavior. Do you believe the consumer leaders own assigned topics should prevent it from assigning topics and partitions it's not aware of in the group? |
234dae1
to
803555e
Compare
After some experiment, I think the best way to correct the check is to load all topics in |
I'm having the same problem, I want to have two different consumerGroups subscribed to two topics (one per each). Consumers start a rebalance loop, that is because you are requesting metadata of just the subscribed topics, and the function _checkTopicPartitionChange is comparing them with the metadata of the group, not just the consumer topics. @hyperlink do you have an alternative to get some ConsumerGroup for some topics, one per each with the current code? |
Previous behavior only check topics that this consumer is subscribed, if two consumer with same group id subscribed to different topic, group master cannot check topic partition change correctly, and rejoin the whole group each time such check is scheduled. This is because this.topicPartitionLength is initialized when joining group and contains all the topics that has been subscribed by this group, while this.topics may not contains all the topics. In such condition: ``` const topicOrPartitionsChanged = _.some(this.topicPartitionLength, function (numberOfPartitions, topic) { return numberOfPartitions !== _.get(metadata, `['${topic}'].length`, 0); }); ``` So `topicPartitionChanged` will always be caculated to be true.
803555e
to
55b4451
Compare
@hyperlink Could this PR be reviewed again if it could be merged? |
If two consumer group of same group id subscribed different topics,
Return value of ConsumerGroup._checkTopicPartitionChange will always
return true, resulting into leave and rejoin.
Actually I'm considered whether code could just be rewrite into
var topics = this.topics
,but it seems too risky.
P.S. The reason that two consumer group of same group id subscribed different topics
is that we're splitting our app into microservices, and we want to be able to upgrade
or rollback seamlessly.