Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(v4): expose Kafka headers #81

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 37 additions & 8 deletions eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ const (
ErrorLevel = "error"
)

// EventMetadata defines the structure of event metadata
type EventMetadata struct {
Partition int `json:",omitempty"`
Offset int64 `json:",omitempty"`
Key string `json:",omitempty"`
}

// Event defines the structure of event
type Event struct {
ID string `json:"id,omitempty"`
Expand Down Expand Up @@ -139,6 +146,12 @@ type SecurityConfig struct {
SASLPassword string
}

// Header represents a single entry in a list of record headers.
type Header struct {
Key string
Value []byte
}

// PublishBuilder defines the structure of message which is sent through message broker
type PublishBuilder struct {
id string
Expand All @@ -164,6 +177,7 @@ type PublishBuilder struct {
additionalFields map[string]interface{}
key string
payload map[string]interface{}
headers []Header
ctx context.Context
timeout time.Duration
}
Expand Down Expand Up @@ -314,6 +328,12 @@ func (p *PublishBuilder) Payload(payload map[string]interface{}) *PublishBuilder
return p
}

// Headers are event headers that will be published
func (p *PublishBuilder) Headers(headers []Header) *PublishBuilder {
p.headers = headers
return p
}

// Context define client context when publish event.
// default: context.Background()
func (p *PublishBuilder) Context(ctx context.Context) *PublishBuilder {
Expand All @@ -335,14 +355,15 @@ func (p *PublishBuilder) Timeout(timeout time.Duration) *PublishBuilder {

// SubscribeBuilder defines the structure of message which is sent through message broker
type SubscribeBuilder struct {
topic string
groupID string
groupInstanceID string
offset int64
callback func(ctx context.Context, event *Event, err error) error
eventName string
ctx context.Context
callbackRaw func(ctx context.Context, msgValue []byte, err error) error
topic string
groupID string
groupInstanceID string
offset int64
callback func(ctx context.Context, event *Event, err error) error
eventName string
ctx context.Context
callbackRaw func(ctx context.Context, msgValue []byte, err error) error
callbackWithHeaders func(ctx context.Context, msgValue []byte, headers []Header, metadata *EventMetadata, err error) error
// flag to send error message to DLQ
sendErrorDLQ bool
// flag to use async commit consumer
Expand Down Expand Up @@ -403,6 +424,14 @@ func (s *SubscribeBuilder) CallbackRaw(
return s
}

// CallbackWithHeaders callback that receives the undecoded payload and headers
func (s *SubscribeBuilder) CallbackWithHeaders(
f func(ctx context.Context, msgValue []byte, headers []Header, metadata *EventMetadata, err error) error,
) *SubscribeBuilder {
s.callbackWithHeaders = f
return s
}

// Context define client context when subscribe event.
// default: context.Background()
func (s *SubscribeBuilder) Context(ctx context.Context) *SubscribeBuilder {
Expand Down
38 changes: 36 additions & 2 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,9 @@ func ConstructEvent(publishBuilder *PublishBuilder) (*kafka.Message, *Event, err
}

return &kafka.Message{
Key: []byte(key),
Value: eventBytes,
Key: []byte(key),
Value: eventBytes,
Headers: toKafkaHeaders(publishBuilder.headers),
}, event, nil
}

Expand Down Expand Up @@ -668,6 +669,9 @@ func (client *KafkaClient) Register(subscribeBuilder *SubscribeBuilder) error {
if subscribeBuilder.callbackRaw != nil {
err = subscribeBuilder.callbackRaw(subscribeBuilder.ctx, nil, subscribeBuilder.ctx.Err())
}
if subscribeBuilder.callbackWithHeaders != nil {
err = subscribeBuilder.callbackWithHeaders(subscribeBuilder.ctx, nil, nil, nil, subscribeBuilder.ctx.Err())
}

loggerFields.Warn("triggered an external context cancellation. Cancelling the subscription")

Expand Down Expand Up @@ -924,6 +928,9 @@ func (client *KafkaClient) getWriter(config *kafka.ConfigMap) (*kafka.Producer,

// processMessage process a message from kafka
func (client *KafkaClient) processMessage(subscribeBuilder *SubscribeBuilder, message *kafka.Message, topic string) error {
if subscribeBuilder.callbackWithHeaders != nil {
return subscribeBuilder.callbackWithHeaders(subscribeBuilder.ctx, message.Value, toEventHeaders(message.Headers), getEventMetadata(message), nil)
}
if subscribeBuilder.callbackRaw != nil {
return subscribeBuilder.callbackRaw(subscribeBuilder.ctx, message.Value, nil)
}
Expand Down Expand Up @@ -985,6 +992,33 @@ func unmarshal(message *kafka.Message) (*Event, error) {
return event, nil
}

func toEventHeaders(kafkaHeaders []kafka.Header) []Header {
var eventHeaders []Header
for _, header := range kafkaHeaders {
eventHeaders = append(eventHeaders, Header(header))
}
return eventHeaders
}

func getEventMetadata(message *kafka.Message) *EventMetadata {
if message == nil {
return nil
}
return &EventMetadata{
Partition: int(message.TopicPartition.Partition),
Offset: int64(message.TopicPartition.Offset),
Key: string(message.Key),
}
}

func toKafkaHeaders(eventHeaders []Header) []kafka.Header {
var kafkaHeaders []kafka.Header
for _, header := range eventHeaders {
kafkaHeaders = append(kafkaHeaders, kafka.Header(header))
}
return kafkaHeaders
}

// runCallback run callback function when receive an event
func (client *KafkaClient) runCallback(
subscribeBuilder *SubscribeBuilder,
Expand Down
111 changes: 111 additions & 0 deletions kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,3 +1060,114 @@ func callerFuncName() string {
}
return "UnknownFunction"
}

// nolint:funlen
func TestKafkaPubSubWithHeaders(t *testing.T) {
t.Parallel()

ctx, done := context.WithTimeout(context.Background(), time.Duration(timeoutTest)*time.Second)
defer done()

doneChan := make(chan bool, 1)

client := createKafkaClient(t)

topicName := constructTopicTest()

var mockPayload = make(map[string]interface{})
mockPayload[testPayload] = "payloadContent"

mockAdditionalFields := map[string]interface{}{
"summary": "user:_failed",
}

mockEvent := &Event{
EventName: "testEventWithHeaders",
Namespace: "event",
ClientID: "b2a64320e9b041168089d3a01a194e6d",
TraceID: "e9c33e1c61bd4779953a55154c40944a",
SpanContext: "test-span-context",
UserID: "0455788f721f4d1185ca7b9df13a3483",
EventID: 3, // nolint:gomnd
EventType: 301, // nolint:gomnd
EventLevel: 3, //nolint:gomnd
ServiceName: "test",
ClientIDs: []string{"b1393b6444a44ebaba4156799fb7e2d1"},
TargetUserIDs: []string{"fef576c71b8f44b1a78bcd74ac43ec27", "629ff6f7d0c34feca3f4056855416da2"},
TargetNamespace: "publisher",
Privacy: true,
AdditionalFields: mockAdditionalFields,
Version: defaultVersion,
Payload: mockPayload,
}

expectedHeaders := []Header{
{
Key: "sequenceID",
Value: []byte(`{"abaafa8de77e4743a085ee10f41ddac5":20,"85ff6ae1361b4286aa8fc72ec9bf1e81":11,"38be0532113341adbc67691c14297ff2":5}`),
},
}

groupID := generateID()
err := client.Register(
NewSubscribe().
Topic(topicName).
EventName(mockEvent.EventName).
GroupID(groupID).
Offset(0).
Context(ctx).
CallbackWithHeaders(func(ctx context.Context, _ []byte, headers []Header, metadata *EventMetadata, err error) error {
if ctx.Err() != nil {
return ctx.Err()
}

if err != nil {
return err
}

require.NotEmpty(t, headers)
require.Equal(t, expectedHeaders, headers)
require.NotNil(t, metadata)
require.Equal(t, "somekey", metadata.Key)

doneChan <- true

return nil
}))
require.NoError(t, err)

err = client.Publish(
NewPublish().
Topic(topicName).
EventName(mockEvent.EventName).
Namespace(mockEvent.Namespace).
ClientID(mockEvent.ClientID).
UserID(mockEvent.UserID).
SessionID(mockEvent.SessionID).
TraceID(mockEvent.TraceID).
SpanContext(mockEvent.SpanContext).
EventID(mockEvent.EventID).
EventType(mockEvent.EventType).
EventLevel(mockEvent.EventLevel).
ServiceName(mockEvent.ServiceName).
ClientIDs(mockEvent.ClientIDs).
TargetUserIDs(mockEvent.TargetUserIDs).
TargetNamespace(mockEvent.TargetNamespace).
Privacy(mockEvent.Privacy).
AdditionalFields(mockEvent.AdditionalFields).
Context(context.Background()).
Timeout(10 * time.Second).
Payload(mockPayload).
Headers(expectedHeaders).
Key("somekey"))
require.NoError(t, err)

for {
select {
case <-doneChan:
return
case <-ctx.Done():
assert.FailNow(t, errorTimeout)
}
}
}
24 changes: 13 additions & 11 deletions validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,19 @@ func validateSubscribeEvent(subscribeBuilder *SubscribeBuilder) error {
}

subscribeEvent := struct {
Topic string `valid:"required"`
EventName string
GroupID string
Callback func(ctx context.Context, event *Event, err error) error
CallbackRaw func(ctx context.Context, msg []byte, err error) error
Topic string `valid:"required"`
EventName string
GroupID string
Callback func(ctx context.Context, event *Event, err error) error
CallbackRaw func(ctx context.Context, msg []byte, err error) error
CallbackWithHeaders func(ctx context.Context, msg []byte, headers []Header, metadata *EventMetadata, err error) error
}{
Topic: subscribeBuilder.topic,
EventName: subscribeBuilder.eventName,
GroupID: subscribeBuilder.groupID,
Callback: subscribeBuilder.callback,
CallbackRaw: subscribeBuilder.callbackRaw,
Topic: subscribeBuilder.topic,
EventName: subscribeBuilder.eventName,
GroupID: subscribeBuilder.groupID,
Callback: subscribeBuilder.callback,
CallbackRaw: subscribeBuilder.callbackRaw,
CallbackWithHeaders: subscribeBuilder.callbackWithHeaders,
}

_, err := validator.ValidateStruct(subscribeEvent)
Expand All @@ -164,7 +166,7 @@ func validateSubscribeEvent(subscribeBuilder *SubscribeBuilder) error {
return err
}

if subscribeEvent.Callback == nil && subscribeEvent.CallbackRaw == nil {
if subscribeEvent.Callback == nil && subscribeEvent.CallbackRaw == nil && subscribeEvent.CallbackWithHeaders == nil {
return errInvalidCallback
}

Expand Down
Loading