Skip to content

Commit

Permalink
Add Kafka metadata opt-in setting to escapeHeaders (#3524)
Browse files Browse the repository at this point in the history
Signed-off-by: Anton Troshin <[email protected]>
  • Loading branch information
antontroshin authored Aug 30, 2024
1 parent 753d09e commit 27dfbaa
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 15 deletions.
10 changes: 9 additions & 1 deletion bindings/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -331,4 +331,12 @@ metadata:
description: |
The TTL for schema caching when publishing a message with latest schema available.
example: '"5m"'
default: '"5m"'
default: '"5m"'
- name: escapeHeaders
type: bool
required: false
description: |
Enables URL escaping of the message header values.
It allows sending headers with special characters that are usually not allowed in HTTP headers.
example: "true"
default: "false"
18 changes: 13 additions & 5 deletions common/component/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession,

for i, message := range messages {
if message != nil {
metadata := GetEventMetadata(message)
metadata := GetEventMetadata(message, consumer.k.escapeHeaders)
handlerConfig, err := consumer.k.GetTopicHandlerConfig(message.Topic)
if err != nil {
return err
Expand Down Expand Up @@ -193,7 +193,7 @@ func (consumer *consumer) doCallback(session sarama.ConsumerGroupSession, messag
Topic: message.Topic,
Data: messageVal,
}
event.Metadata = GetEventMetadata(message)
event.Metadata = GetEventMetadata(message, consumer.k.escapeHeaders)

err = handlerConfig.Handler(session.Context(), &event)
if err == nil {
Expand All @@ -202,18 +202,26 @@ func (consumer *consumer) doCallback(session sarama.ConsumerGroupSession, messag
return err
}

func GetEventMetadata(message *sarama.ConsumerMessage) map[string]string {
func GetEventMetadata(message *sarama.ConsumerMessage, escapeHeaders bool) map[string]string {
if message != nil {
metadata := make(map[string]string, len(message.Headers)+5)
if message.Key != nil {
metadata[keyMetadataKey] = url.QueryEscape(string(message.Key))
if escapeHeaders {
metadata[keyMetadataKey] = url.QueryEscape(string(message.Key))
} else {
metadata[keyMetadataKey] = string(message.Key)
}
}
metadata[offsetMetadataKey] = strconv.FormatInt(message.Offset, 10)
metadata[topicMetadataKey] = message.Topic
metadata[timestampMetadataKey] = strconv.FormatInt(message.Timestamp.UnixMilli(), 10)
metadata[partitionMetadataKey] = strconv.FormatInt(int64(message.Partition), 10)
for _, header := range message.Headers {
metadata[string(header.Key)] = url.QueryEscape(string(header.Value))
if escapeHeaders {
metadata[string(header.Key)] = url.QueryEscape(string(header.Value))
} else {
metadata[string(header.Key)] = string(header.Value)
}
}
return metadata
}
Expand Down
2 changes: 2 additions & 0 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Kafka struct {
saslPassword string
initialOffset int64
config *sarama.Config
escapeHeaders bool

cg sarama.ConsumerGroup
subscribeTopics TopicHandlerConfig
Expand Down Expand Up @@ -135,6 +136,7 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
k.consumerGroup = meta.ConsumerGroup
k.initialOffset = meta.internalInitialOffset
k.authType = meta.AuthType
k.escapeHeaders = meta.EscapeHeaders

config := sarama.NewConfig()
config.Version = meta.internalVersion
Expand Down
2 changes: 2 additions & 0 deletions common/component/kafka/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type KafkaMetadata struct {
HeartbeatInterval time.Duration `mapstructure:"heartbeatInterval"`
SessionTimeout time.Duration `mapstructure:"sessionTimeout"`
Version string `mapstructure:"version"`
EscapeHeaders bool `mapstructure:"escapeHeaders"`
internalVersion sarama.KafkaVersion `mapstructure:"-"`
internalOidcExtensions map[string]string `mapstructure:"-"`

Expand Down Expand Up @@ -162,6 +163,7 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
ClientConnectionKeepAliveInterval: defaultClientConnectionKeepAliveInterval,
HeartbeatInterval: 3 * time.Second,
SessionTimeout: 10 * time.Second,
EscapeHeaders: false,
}

err := metadata.DecodeMetadata(meta, &m)
Expand Down
41 changes: 33 additions & 8 deletions common/component/kafka/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func TestGetEventMetadata(t *testing.T) {
m := sarama.ConsumerMessage{
Headers: nil, Timestamp: ts, Key: []byte("MyKey"), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
}
act := GetEventMetadata(&m)
act := GetEventMetadata(&m, false)
require.Len(t, act, 5)
require.Equal(t, strconv.FormatInt(ts.UnixMilli(), 10), act["__timestamp"])
require.Equal(t, "MyKey", act["__key"])
Expand All @@ -530,7 +530,7 @@ func TestGetEventMetadata(t *testing.T) {
m := sarama.ConsumerMessage{
Headers: headers, Timestamp: ts, Key: []byte("MyKey"), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
}
act := GetEventMetadata(&m)
act := GetEventMetadata(&m, false)
require.Len(t, act, 7)
require.Equal(t, strconv.FormatInt(ts.UnixMilli(), 10), act["__timestamp"])
require.Equal(t, "MyKey", act["__key"])
Expand All @@ -545,7 +545,7 @@ func TestGetEventMetadata(t *testing.T) {
m := sarama.ConsumerMessage{
Headers: nil, Timestamp: ts, Key: nil, Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
}
act := GetEventMetadata(&m)
act := GetEventMetadata(&m, false)
require.Len(t, act, 4)
require.Equal(t, strconv.FormatInt(ts.UnixMilli(), 10), act["__timestamp"])
require.Equal(t, "0", act["__partition"])
Expand All @@ -554,22 +554,32 @@ func TestGetEventMetadata(t *testing.T) {
})

t.Run("null message", func(t *testing.T) {
act := GetEventMetadata(nil)
act := GetEventMetadata(nil, false)
require.Nil(t, act)
})

t.Run("key with invalid value escaped", func(t *testing.T) {
t.Run("key with invalid value escapeHeaders true", func(t *testing.T) {
keyValue := "key1\xFF"
escapedKeyValue := url.QueryEscape(keyValue)

m := sarama.ConsumerMessage{
Headers: nil, Timestamp: ts, Key: []byte(keyValue), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
}
act := GetEventMetadata(&m)
act := GetEventMetadata(&m, true)
require.Equal(t, escapedKeyValue, act[keyMetadataKey])
})

t.Run("header with invalid value escaped", func(t *testing.T) {
t.Run("key with invalid value escapeHeaders false", func(t *testing.T) {
keyValue := "key1\xFF"

m := sarama.ConsumerMessage{
Headers: nil, Timestamp: ts, Key: []byte(keyValue), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
}
act := GetEventMetadata(&m, false)
require.Equal(t, keyValue, act[keyMetadataKey])
})

t.Run("header with invalid value escapeHeaders true", func(t *testing.T) {
headerKey := "key1"
headerValue := "value1\xFF"
escapedHeaderValue := url.QueryEscape(headerValue)
Expand All @@ -580,8 +590,23 @@ func TestGetEventMetadata(t *testing.T) {
m := sarama.ConsumerMessage{
Headers: headers, Timestamp: ts, Key: []byte("MyKey"), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
}
act := GetEventMetadata(&m)
act := GetEventMetadata(&m, true)
require.Len(t, act, 6)
require.Equal(t, escapedHeaderValue, act[headerKey])
})

t.Run("header with invalid value escapeHeaders false", func(t *testing.T) {
headerKey := "key1"
headerValue := "value1\xFF"

headers := []*sarama.RecordHeader{
{Key: []byte(headerKey), Value: []byte(headerValue)},
}
m := sarama.ConsumerMessage{
Headers: headers, Timestamp: ts, Key: []byte("MyKey"), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
}
act := GetEventMetadata(&m, false)
require.Len(t, act, 6)
require.Equal(t, headerValue, act[headerKey])
})
}
10 changes: 9 additions & 1 deletion pubsub/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,12 @@ metadata:
description: |
The TTL for schema caching when publishing a message with latest schema available.
example: '"5m"'
default: '"5m"'
default: '"5m"'
- name: escapeHeaders
type: bool
required: false
description: |
Enables URL escaping of the message header values.
It allows sending headers with special characters that are usually not allowed in HTTP headers.
example: "true"
default: "false"

0 comments on commit 27dfbaa

Please sign in to comment.