Kafka module: query each broker all the partitions it is a leader for #16556
Conversation
jenkins, test this again please
jenkins, test this again please
jenkins, test this again please (Jenkins couldn't trigger jobs again...)
metricbeat/module/kafka/broker.go
Outdated
for _, topic := range topics {
	for _, partition := range topic.Partitions {
		broker, err := b.client.Leader(topic.Name, partition.ID)
The leader id is available in partition.Leader. We could use this id for the grouping of partitions per broker. If we continue using b.client.Leader() we have to remember to Close() the returned broker. Maybe we can use b.client.Brokers() to look up the broker by id.
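A minimal sketch of that idea, assuming sarama's client API (names like brokersByID are illustrative, not code from this PR):

```go
// Resolve each broker once by id via client.Brokers(), instead of calling
// client.Leader() per partition (sketch only).
brokersByID := make(map[int32]*sarama.Broker)
for _, broker := range b.client.Brokers() {
	brokersByID[broker.ID()] = broker
}
// Partitions grouped by partition.Leader can then be looked up with
// brokersByID[partition.Leader] instead of b.client.Leader(topic.Name, partition.ID).
```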
Using the method b.client.Leader(topic, partition) will always return the most up-to-date leader (there might be a metadata update in the background, right?). The method b.client.Brokers() returns brokers without opening connections to them. To establish a connection, I would need a configuration structure: broker.Open(conf *Config). Leader() handles it on its own.
Regarding Close(), I think you're right.
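To illustrate the trade-off, a sketch assuming sarama's Broker.Open and Client.Config behave as described above (not code from this PR):

```go
// client.Brokers() returns broker handles without opening connections,
// so a *sarama.Config is needed to connect each one:
for _, broker := range client.Brokers() {
	if err := broker.Open(client.Config()); err != nil && err != sarama.ErrAlreadyConnected {
		return err
	}
}

// client.Leader() manages the connection on its own, but, as noted above,
// the returned broker should be closed when we are done with it:
leader, err := client.Leader(topicName, partitionID)
if err != nil {
	return err
}
defer leader.Close()
```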
There can still be a problem with this approach: here we are calling Leader for each topic and partition, which may open too many connections to brokers, and we may be leaking connections because we only keep track of one connection per broker in leaderBrokers.

> Using the method b.client.Leader(topic, partition) will always return the most up-to-date leader (there might be a metadata update in the background, right?).

This is right, but between this moment and the moment we make the offsets request there can still be some metadata change. If we want to solve this for good (not sure if it is worth it), we would need to handle leadership errors (the second option in #13380).
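For reference, handling a leadership error could look roughly like this (a sketch only; errLeadershipChanged is a hypothetical sentinel, and the retry itself would still live in the caller):

```go
var errLeadershipChanged = errors.New("partition leadership changed, retry with refreshed metadata")

// checkLeadership refreshes metadata when the queried broker is no longer the
// leader, so the caller can retry against the new leader (sketch only).
func checkLeadership(client sarama.Client, resp *sarama.OffsetResponse, topic string, partition int32) error {
	block := resp.GetBlock(topic, partition)
	if block != nil && block.Err == sarama.ErrNotLeaderForPartition {
		if err := client.RefreshMetadata(topic); err != nil {
			return err
		}
		return errLeadershipChanged
	}
	return nil
}
```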
Answered below.
metricbeat/module/kafka/broker.go
Outdated
if _, ok := leaderTopicPartition[broker.ID()]; !ok {
	leaderTopicPartition[broker.ID()] = map[string]int32{}
}
leaderTopicPartition[broker.ID()][topic.Name] = partition.ID
Is it safe to assume that there cannot be two partitions for the same topic in the same broker? (Probably, but I am not sure about that)
There might be a case in which the number of Kafka brokers is lower than the number of partitions of the same topic, so I would rather keep this map as is.
As discussed offline, we may need to list multiple partitions for the same topic in the same broker.
Fixed.
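The fix groups a list of (topic, partition) pairs per leader id, as the later diff in this PR shows. Roughly (field names here are illustrative):

```go
type topicPartition struct {
	topic     string
	partition int32
}

// groupByLeader collects all (topic, partition) pairs per leader broker id, so
// several partitions of the same topic can map to the same broker (sketch only).
func groupByLeader(topics []*sarama.TopicMetadata) map[int32][]topicPartition {
	leaderTopicPartition := make(map[int32][]topicPartition)
	for _, topic := range topics {
		for _, partition := range topic.Partitions {
			leaderTopicPartition[partition.Leader] = append(
				leaderTopicPartition[partition.Leader],
				topicPartition{topic: topic.Name, partition: partition.ID},
			)
		}
	}
	return leaderTopicPartition
}
```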
metricbeat/module/kafka/broker.go
Outdated
}

block := resp.GetBlock(topic, partition)
if len(block.Offsets) == 0 {
Should we also check block.Err in any case?
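Something along these lines, assuming the block's Err field is a sarama.KError (a sketch, not the PR's actual code):

```go
block := resp.GetBlock(topic, partition)
if block == nil {
	return fmt.Errorf("no offsets block for %v:%v in response", topic, partition)
}
if block.Err != sarama.ErrNoError {
	// Surface broker-side errors instead of silently treating them as "no offsets".
	return fmt.Errorf("failed to query offsets for %v:%v: %v", topic, partition, block.Err)
}
if len(block.Offsets) == 0 {
	return nil // nothing to report for this partition
}
```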
	continue
} else if newestPartitionOffsets.Err != nil {
	msg := fmt.Errorf("failed to query kafka partition (%v:%v) newest offsets: %v",
		topic.Name, partition.ID, newestPartitionOffsets.Err)
Nit. Extract this common logic for the oldest and newest offsets into a method?
Extracted.
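For example, a helper along these lines (only a sketch; the PR's actual method and the PartitionOffsets fields may differ):

```go
// checkPartitionOffsets turns a broker-side error for one partition into a
// descriptive error, shared by the oldest and newest offset paths (sketch only).
func checkPartitionOffsets(topic string, partition int32, kind string, offsets PartitionOffsets) error {
	if offsets.Err != nil {
		return fmt.Errorf("failed to query kafka partition (%v:%v) %v offsets: %v",
			topic, partition, kind, offsets.Err)
	}
	return nil
}
```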
jenkins, test this again please
metricbeat/module/kafka/broker.go
Outdated
	req.AddBlock(topic, partition, time, 1)
}

resp, err := leaderBrokers[leader].GetAvailableOffsets(req)
Nit, and probably an unneeded optimization 😄: we could parallelize requests per leader.
Fixed.
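A sketch of how the per-leader requests could run concurrently, reusing the queryBrokerForPartitionOffsets method added later in this PR (the channel/WaitGroup plumbing here is illustrative and needs a sync import):

```go
var wg sync.WaitGroup
results := make(chan map[string]map[int32]PartitionOffsets, len(leaderTopicPartition))

for brokerID, topicPartitions := range leaderTopicPartition {
	wg.Add(1)
	go func(brokerID int32, topicPartitions []topicPartition) {
		defer wg.Done()
		// One offsets request per leader, executed concurrently.
		results <- b.queryBrokerForPartitionOffsets(brokerID, topicPartitions, time)
	}(brokerID, topicPartitions)
}

wg.Wait()
close(results)

for offsets := range results {
	_ = offsets // merge the per-broker results here
}
```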
If you consider this solution incomplete or vulnerable, please don't hesitate to close this PR. Here I focused on the first option, but maybe the other one might be better.
for _, partition := range topic.Partitions {
	if _, ok := leaderTopicPartition[partition.Leader]; !ok {
		leaderTopicPartition[partition.Leader] = []topicPartition{}
	}
Nit. This initialization is not needed; append will initialize it if needed.
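A minimal, self-contained example of that behavior (not code from this PR):

```go
package main

import "fmt"

func main() {
	// append works on the nil slice returned for a missing map key,
	// so no explicit initialization of byLeader[1] is needed.
	byLeader := make(map[int32][]string)
	byLeader[1] = append(byLeader[1], "topic-a/0")
	fmt.Println(byLeader) // map[1:[topic-a/0]]
}
```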
resp, err := broker.GetAvailableOffsets(req)
if err != nil {
	err = fmt.Errorf("get available offsets failed by leader (ID: %d): %v", brokerID, err)
Nit. We could use errors.Wrap to add context to these errors and the others added in this PR.
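For instance, with github.com/pkg/errors (a sketch of the suggestion, not the PR's final code):

```go
resp, err := broker.GetAvailableOffsets(req)
if err != nil {
	// errors.Wrapf keeps the original error and prepends the context message.
	return nil, errors.Wrapf(err, "get available offsets failed by leader (ID: %d)", brokerID)
}
```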
}

func (b *Broker) queryBrokerForPartitionOffsets(brokerID int32, topicPartitions []topicPartition, time int64) map[string]map[int32]PartitionOffsets {
	req := new(sarama.OffsetRequest)
Something I am missing here is that in the previous implementation we were making a request per replica (the request done in b.PartitionOffset() was also modified with req.SetReplicaID()). Are we missing the offsets of replicas now? 🤔
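For context, the per-replica request described above would look roughly like this (a sketch only, assuming sarama's OffsetRequest.SetReplicaID as mentioned in this thread):

```go
req := new(sarama.OffsetRequest)
// Ask for the offsets as a specific replica; -1 queries as a regular
// client, i.e. against the leader (see the reply below).
req.SetReplicaID(replicaID)
req.AddBlock(topic, partition, sarama.OffsetNewest, 1)
resp, err := broker.GetAvailableOffsets(req)
```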
It seems that this may work (replicaID = -1 means the leader), but I believe this is getting too complex.
We had an offline discussion about this issue. It's definitely too complex compared to the gain we may have here. Also, there is still a problem with stale metadata, which apparently can be solved only by retries. Resolving.
This PR addresses the issue reported in #13380.
Briefly - for the partition metricset:
Meta-issue: #14852