Feature/kakfa exporter - kafka key by TraceID #25909
…ey-by-traceid # Conflicts: # exporter/kafkaexporter/go.mod
Thank you for making the changes. My only concern is that we are now splitting the traces by default, which is a change in behaviour.
Ideally this should be controlled by a configuration field, something like:
key_data: [none, traceid] # OneOf none Trace ID
Otherwise, you will need to:
- Add a change log entry
- Add some tests around the changes
@MovieStoreGuy I've updated the PR with
This PR was marked stale due to lack of activity. It will be closed in 14 days.
@MovieStoreGuy, @pavolloffay can you please review?
This PR was marked stale due to lack of activity. It will be closed in 14 days.
The idea is good, and I've left comments for things that need to be added.
I am not entirely convinced by the approach. The way I envision this, there would be a new type named KeyedEncoder that extends the existing encoding types.
I wonder if it would be possible to do something like:
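// Sketch only: assumes base.Marshal returns the encoded payload bytes
// and that getKey derives the message key from a split trace.
type KeyedTracesMarshaler struct {
	base  TraceMarshaler
	keyBy KeyByFunc
}

func (keyed KeyedTracesMarshaler) Marshal(in ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
	var msgs []*sarama.ProducerMessage
	for _, trace := range keyed.keyBy(in) {
		msg, err := keyed.base.Marshal(trace, topic)
		if err != nil {
			return nil, err
		}
		msgs = append(msgs,
			&sarama.ProducerMessage{
				Topic: topic,
				Value: sarama.ByteEncoder(msg),
				Key:   sarama.ByteEncoder(keyed.getKey(trace)),
			},
		)
	}
	return msgs, nil
}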
This would allow you to wrap any existing TraceMarshaler and still implement the expected interface without having to extend it.
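The wrapping idea above can be tried out in isolation. The following is a minimal sketch using only the standard library: `Message`, `Batch`, `Marshaler`, `plainMarshaler` and `keyedMarshaler` are hypothetical stand-ins for the sarama and pdata types, not the exporter's real API. It assumes the base marshaler returns ready-made messages and that the input has already been split per trace.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a hypothetical stand-in for sarama.ProducerMessage.
type Message struct {
	Topic string
	Key   []byte
	Value []byte
}

// Batch is a hypothetical stand-in for ptrace.Traces holding a single trace.
type Batch struct {
	TraceID string
	Spans   []string
}

// Marshaler mirrors the general shape of a traces marshaler interface.
type Marshaler interface {
	Marshal(in []Batch, topic string) ([]Message, error)
}

// plainMarshaler emits one unkeyed message for everything it is given.
type plainMarshaler struct{}

func (plainMarshaler) Marshal(in []Batch, topic string) ([]Message, error) {
	if len(in) == 0 {
		return nil, errors.New("nothing to marshal")
	}
	return []Message{{Topic: topic, Value: []byte(fmt.Sprint(in))}}, nil
}

// keyedMarshaler wraps any Marshaler and sets a key on each message the
// wrapped marshaler produces, while satisfying the same interface.
type keyedMarshaler struct {
	base  Marshaler
	keyOf func(Batch) []byte
}

func (k keyedMarshaler) Marshal(in []Batch, topic string) ([]Message, error) {
	var out []Message
	for _, b := range in {
		// Delegate encoding to the wrapped marshaler, one trace at a time.
		msgs, err := k.base.Marshal([]Batch{b}, topic)
		if err != nil {
			return nil, err
		}
		for i := range msgs {
			msgs[i].Key = k.keyOf(b)
		}
		out = append(out, msgs...)
	}
	return out, nil
}

func main() {
	m := keyedMarshaler{
		base:  plainMarshaler{},
		keyOf: func(b Batch) []byte { return []byte(b.TraceID) },
	}
	msgs, err := m.Marshal([]Batch{{TraceID: "aa"}, {TraceID: "bb"}}, "spans")
	if err != nil {
		panic(err)
	}
	for _, msg := range msgs {
		fmt.Printf("topic=%s key=%s\n", msg.Topic, msg.Key)
	}
}
```

Because the wrapper only touches the `Key` field, any encoding could in principle be combined with any keying strategy without a dedicated type per combination.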
@@ -28,6 +28,9 @@ type Config struct {
	// Encoding of messages (default "otlp_proto")
	Encoding string `mapstructure:"encoding"`

	// KeyData of messages (default "none")
	KeyData string `mapstructure:"key_data"`
I would say that "" is also the same as none
Please add this to the Validate method as well.
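As a rough illustration of the two comments above (treating "" as none, and checking the field during validation), here is a minimal sketch. The `Config` type below is a trimmed-down stand-in, the constants and the `effectiveKeyData` helper are hypothetical names, and the accepted spelling `traceid` follows the earlier suggestion rather than the PR's `traceID`.

```go
package main

import "fmt"

// Hypothetical constants for the accepted key_data values.
const (
	keyDataNone    = "none"
	keyDataTraceID = "traceid"
)

// Config is a trimmed-down stand-in for the exporter configuration.
type Config struct {
	Encoding string
	KeyData  string
}

// effectiveKeyData treats an unset value the same as "none".
func (c Config) effectiveKeyData() string {
	if c.KeyData == "" {
		return keyDataNone
	}
	return c.KeyData
}

// Validate rejects unknown key_data values early, so the user sees a
// configuration error instead of a failure at exporter creation time.
func (c Config) Validate() error {
	switch c.effectiveKeyData() {
	case keyDataNone, keyDataTraceID:
		return nil
	default:
		return fmt.Errorf("key_data: unsupported value %q, expected %q or %q",
			c.KeyData, keyDataNone, keyDataTraceID)
	}
}

func main() {
	for _, v := range []string{"", "none", "traceid", "spanid"} {
		fmt.Printf("%q -> %v\n", v, Config{KeyData: v}.Validate())
	}
}
```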
@@ -118,6 +120,14 @@ type kafkaExporterFactory struct {
	logsMarshalers map[string]LogsMarshaler
}

func KeyOfTracerMarshaller(marshaler TracesMarshaler) string {
This function's name doesn't make its intent clear, and it should not be exported.
Could I ask you to rename it and add some comments to help explain its usage?
The key used to be only the Encoding (see line 48); now the key is a combination of Encoding and KeyData.
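One way to make that combined lookup key explicit is a small unexported struct used as the map key, instead of a concatenated string. This is only a sketch of the idea: `marshalerKey`, `registryKey`, `buildRegistry` and `stubMarshaler` are hypothetical names, and the PR itself returns a string from its helper.

```go
package main

import "fmt"

// marshalerKey identifies a marshaler by encoding and key strategy together.
type marshalerKey struct {
	encoding string
	keyData  string
}

// tracesMarshaler lists only the two methods needed for the lookup.
type tracesMarshaler interface {
	Encoding() string
	KeyData() string
}

// stubMarshaler is a placeholder implementation for the example.
type stubMarshaler struct{ encoding, keyData string }

func (s stubMarshaler) Encoding() string { return s.encoding }
func (s stubMarshaler) KeyData() string  { return s.keyData }

// registryKey builds the lookup key for a marshaler. It is unexported
// because only the factory needs it.
func registryKey(m tracesMarshaler) marshalerKey {
	return marshalerKey{encoding: m.Encoding(), keyData: m.KeyData()}
}

// buildRegistry indexes marshalers by their combined key.
func buildRegistry(ms ...tracesMarshaler) map[marshalerKey]tracesMarshaler {
	reg := make(map[marshalerKey]tracesMarshaler, len(ms))
	for _, m := range ms {
		reg[registryKey(m)] = m
	}
	return reg
}

func main() {
	reg := buildRegistry(
		stubMarshaler{"otlp_proto", "none"},
		stubMarshaler{"otlp_proto", "traceID"},
	)
	_, ok := reg[marshalerKey{"otlp_proto", "traceID"}]
	fmt.Println(len(reg), ok)
}
```

A struct key avoids any ambiguity that a separator character could introduce in a concatenated string, at the cost of changing the map's key type.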
	if marshaler == nil {
		if config.KeyData != "none" && config.KeyData != "traceID" {
Ideally this should be done within config.Validate as well, to make it easier for users to understand the issue.
I wanted to make the fewest changes needed, and the validation mechanism already lived in that place, so I kept it here.
I'll try to move it to config.Validate as suggested.
type pdataTracesMarshalerByTraceId pdataTracesMarshaler

func (p pdataTracesMarshalerByTraceId) Marshal(td ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) {
	var messages []*sarama.ProducerMessage

	for _, tracesById := range batchpersignal.SplitTraces(td) {
		bts, err := p.marshaler.MarshalTraces(tracesById)
		if err != nil {
			return nil, err
		}
		var traceID = tracesById.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
		key := traceutil.TraceIDToHexOrEmptyString(traceID)
		messages = append(messages, &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.ByteEncoder(bts),
			Key:   sarama.ByteEncoder(key),
		})
	}
	return messages, nil
}

func (p pdataTracesMarshalerByTraceId) Encoding() string {
	return p.encoding
}

func (p pdataTracesMarshalerByTraceId) KeyData() string {
	return "traceID"
}

func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string) TracesMarshaler {
	return pdataTracesMarshaler{
		marshaler: marshaler,
		encoding:  encoding,
	}
}

func newPdataTracesMarshalerByTraceId(marshaler ptrace.Marshaler, encoding string) TracesMarshaler {
	return pdataTracesMarshalerByTraceId{
		marshaler: marshaler,
		encoding:  encoding,
	}
Please move these new code contributions to a new file to make them easier to maintain.
This PR was marked stale due to lack of activity. It will be closed in 14 days.
Closed as inactive. Feel free to reopen if this PR is still being worked on.
Hey! My company needs this feature. @arik-dig, can I take over and apply the maintainers' comments?
Hi @MarcinGinszt, Sure, please help and take over 🙏
Hi @arik-dig, @MarcinGinszt! My company is also looking for this functionality 😆, and fairly urgently. As such, I have been working on a new branch here with an updated implementation and some more robust unit tests. It still needs a bit of cleanup, but I'm planning on opening a PR soon. Regards.
Linking out the open PR to resolve the same issue: #29660
Description:
The Kafka exporter has been modified so that it publishes Kafka messages keyed by TraceID. This allows the Kafka topic to be partitioned and consumed concurrently.
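To illustrate why a TraceID key helps with partitioning, the sketch below maps a key to a partition by hashing it and taking the result modulo the partition count. It is a simplified, assumed model of hash partitioning and not necessarily the exact algorithm of any particular Kafka client; `partitionFor` is a hypothetical helper. The point is only that equal keys always land on the same partition, so all messages for one trace reach the same consumer.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to a partition index in
// [0, numPartitions). numPartitions must be greater than zero.
func partitionFor(key string, numPartitions uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum32() % numPartitions
}

func main() {
	// Illustrative hex strings standing in for trace IDs.
	traceIDs := []string{
		"4bf92f3577b34da6a3ce929d0e0e4736",
		"0af7651916cd43dd8448eb211c80319c",
		"4bf92f3577b34da6a3ce929d0e0e4736", // same trace as the first entry
	}
	for _, id := range traceIDs {
		fmt.Printf("%s -> partition %d\n", id, partitionFor(id, 6))
	}
}
```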
Link to tracking Issue:
relates to issue #12318
Testing:
I've installed the modified version and can see the key being filled in as expected (checked via Kafka UI).
Here are the entries BEFORE the change (the key does not exist):
Documentation:
Did not touch it