Add Kafka escapeHeaders documentation (#4332)
* Add Kafka escapeHeaders documentation

Signed-off-by: Anton Troshin <[email protected]>

* update the escapeHeaders setting docs

Signed-off-by: Anton Troshin <[email protected]>

* review fixes

Signed-off-by: Anton Troshin <[email protected]>

---------

Signed-off-by: Anton Troshin <[email protected]>
Co-authored-by: Yaron Schneider <[email protected]>
antontroshin and yaron2 authored Sep 6, 2024
1 parent 581fabb commit 0e22a69
Showing 2 changed files with 40 additions and 0 deletions.
@@ -63,6 +63,8 @@ spec:
    value: true
  - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available.
    value: 5m
  - name: escapeHeaders # Optional.
    value: false
```
## Spec metadata fields
@@ -99,6 +101,7 @@ spec:
| `consumerFetchDefault` | N | Input/Output | The default number of message bytes to fetch from the broker in each request. Default is `"1048576"` bytes. | `"2097152"` |
| `heartbeatInterval` | N | Input | The interval between heartbeats to the consumer coordinator. At most, the value should be set to 1/3 of the `sessionTimeout` value. Defaults to `"3s"`. | `"5s"` |
| `sessionTimeout` | N | Input | The timeout used to detect client failures when using Kafka’s group management facility. If the broker fails to receive any heartbeats from the consumer before the expiration of this session timeout, then the consumer is removed and initiates a rebalance. Defaults to `"10s"`. | `"20s"` |
| `escapeHeaders` | N | Input | Enables URL escaping of the message header values received by the consumer. Allows receiving content with special characters that are usually not allowed in HTTP headers. Default is `false`. | `true` |

#### Note
The metadata `version` must be set to `1.0.0` when using Azure EventHubs with Kafka.
@@ -63,6 +63,8 @@ spec:
    value: true
  - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available.
    value: 5m
  - name: escapeHeaders # Optional.
    value: false

```

@@ -112,6 +114,7 @@ spec:
| consumerFetchDefault | N | The default number of message bytes to fetch from the broker in each request. Default is `"1048576"` bytes. | `"2097152"` |
| heartbeatInterval | N | The interval between heartbeats to the consumer coordinator. At most, the value should be set to 1/3 of the `sessionTimeout` value. Defaults to `"3s"`. | `"5s"` |
| sessionTimeout | N | The timeout used to detect client failures when using Kafka’s group management facility. If the broker fails to receive any heartbeats from the consumer before the expiration of this session timeout, then the consumer is removed and initiates a rebalance. Defaults to `"10s"`. | `"20s"` |
| escapeHeaders | N | Enables URL escaping of the message header values received by the consumer. Allows receiving content with special characters that are usually not allowed in HTTP headers. Default is `false`. | `true` |

The `secretKeyRef` above is referencing a [kubernetes secrets store]({{< ref kubernetes-secret-store.md >}}) to access the tls information. Visit [here]({{< ref setup-secret-store.md >}}) to learn more about how to configure a secret store component.

@@ -485,6 +488,39 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla
}'
```

## Receiving message headers with special characters

A consumer application may need to receive message headers that contain special characters, which can cause HTTP protocol validation errors.
HTTP header values must conform to the HTTP specification, which disallows some characters. [Learn more about the protocols](https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2).
In this case, you can enable the `escapeHeaders` configuration setting, which uses URL escaping to encode header values on the consumer side.
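
To illustrate why escaping helps, the following Python sketch checks a header value against a conservative "printable ASCII plus tab" rule and shows its URL-escaped form. The helper `is_safe_header_value` and the sample value are hypothetical, and query-style escaping (a space becomes `+`) is an assumption made for this illustration, not a statement of how the component is implemented.

```python
import re
from urllib.parse import quote_plus

# Conservative approximation of an acceptable HTTP header value:
# horizontal tab plus printable ASCII. Real HTTP stacks may differ in detail.
_SAFE_HEADER_VALUE = re.compile(r"[\t\x20-\x7e]*")


def is_safe_header_value(value: str) -> bool:
    """Return True if every character is a tab or printable ASCII."""
    return _SAFE_HEADER_VALUE.fullmatch(value) is not None


# Hypothetical Kafka header value with a line break and a non-ASCII character.
raw = "line one\nline two: café"

# Assumption: query-style URL escaping, where a space is encoded as "+".
escaped = quote_plus(raw)

print(is_safe_header_value(raw))      # the raw value fails the check
print(is_safe_header_value(escaped))  # the escaped value passes
print(escaped)
```

The escaped form contains only ASCII letters, digits, `+`, and `%XX` sequences, so it passes header validation regardless of what the original bytes were.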

{{% alert title="Note" color="primary" %}}
When this setting is enabled, the received message header values are URL escaped, and you need to URL "un-escape" them to get the original values.
{{% /alert %}}

Set `escapeHeaders` to `true` to URL escape the header values.

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-escape-headers
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "none"
  - name: escapeHeaders
    value: "true"
```
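
With `escapeHeaders` enabled, the subscriber has to reverse the escaping before using a header value. The Python sketch below is one minimal way to do that; the function `unescape_headers`, the header name `note`, and the sample value are hypothetical. It assumes query-style escaping, where `+` represents a space; if your values turn out to use plain percent-encoding, `urllib.parse.unquote` would be the closer match.

```python
from urllib.parse import unquote_plus


def unescape_headers(headers: dict[str, str]) -> dict[str, str]:
    """Return a copy of the headers with every value URL un-escaped.

    Assumption: values were escaped query-style ("+" stands for a space).
    """
    return {name: unquote_plus(value) for name, value in headers.items()}


# Hypothetical headers as a subscriber might receive them.
received = {"note": "line+one%0Aline+two%3A+caf%C3%A9"}

original = unescape_headers(received)
print(repr(original["note"]))
```

Un-escape each value exactly once. Decoding a value that was never escaped, or decoding twice, can corrupt values that legitimately contain `%` or `+`.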

## Avro Schema Registry serialization/deserialization
You can configure pub/sub to publish or consume data encoded using [Avro binary serialization](https://avro.apache.org/docs/), leveraging an [Apache Schema Registry](https://developer.confluent.io/courses/apache-kafka/schema-registry/) (for example, [Confluent Schema Registry](https://developer.confluent.io/courses/apache-kafka/schema-registry/), [Apicurio](https://www.apicur.io/registry/)).

@@ -597,6 +633,7 @@ To run Kafka on Kubernetes, you can use any Kafka operator, such as [Strimzi](ht

{{< /tabs >}}


## Related links
- [Basic schema for a Dapr component]({{< ref component-schema >}})
- Read [this guide]({{< ref "howto-publish-subscribe.md##step-1-setup-the-pubsub-component" >}}) for instructions on configuring pub/sub components