From 2bec841fa887509991efcf6b7dcab99b5906c1fe Mon Sep 17 00:00:00 2001 From: Leonardus Windiono Date: Tue, 20 Aug 2024 14:55:14 +0700 Subject: [PATCH 1/2] feat(v4): expose Kafka header --- eventstream.go | 38 +++++++++++--- kafka.go | 27 +++++++++- kafka_integration_test.go | 108 ++++++++++++++++++++++++++++++++++++++ validation.go | 24 +++++---- 4 files changed, 176 insertions(+), 21 deletions(-) diff --git a/eventstream.go b/eventstream.go index c68e4d8..abad43f 100644 --- a/eventstream.go +++ b/eventstream.go @@ -139,6 +139,12 @@ type SecurityConfig struct { SASLPassword string } +// Header represents a single entry in a list of record headers. +type Header struct { + Key string + Value []byte +} + // PublishBuilder defines the structure of message which is sent through message broker type PublishBuilder struct { id string @@ -164,6 +170,7 @@ type PublishBuilder struct { additionalFields map[string]interface{} key string payload map[string]interface{} + headers []Header ctx context.Context timeout time.Duration } @@ -314,6 +321,12 @@ func (p *PublishBuilder) Payload(payload map[string]interface{}) *PublishBuilder return p } +// Headers are event headers that will be published +func (p *PublishBuilder) Headers(headers []Header) *PublishBuilder { + p.headers = headers + return p +} + // Context define client context when publish event. // default: context.Background() func (p *PublishBuilder) Context(ctx context.Context) *PublishBuilder { @@ -335,14 +348,15 @@ func (p *PublishBuilder) Timeout(timeout time.Duration) *PublishBuilder { // SubscribeBuilder defines the structure of message which is sent through message broker type SubscribeBuilder struct { - topic string - groupID string - groupInstanceID string - offset int64 - callback func(ctx context.Context, event *Event, err error) error - eventName string - ctx context.Context - callbackRaw func(ctx context.Context, msgValue []byte, err error) error + topic string + groupID string + groupInstanceID string + offset int64 + callback func(ctx context.Context, event *Event, err error) error + eventName string + ctx context.Context + callbackRaw func(ctx context.Context, msgValue []byte, err error) error + callbackWithHeaders func(ctx context.Context, msgValue []byte, headers []Header, err error) error // flag to send error message to DLQ sendErrorDLQ bool // flag to use async commit consumer @@ -403,6 +417,14 @@ func (s *SubscribeBuilder) CallbackRaw( return s } +// CallbackWithHeaders callback that receives the undecoded payload and headers +func (s *SubscribeBuilder) CallbackWithHeaders( + f func(ctx context.Context, msgValue []byte, headers []Header, err error) error, +) *SubscribeBuilder { + s.callbackWithHeaders = f + return s +} + // Context define client context when subscribe event. // default: context.Background() func (s *SubscribeBuilder) Context(ctx context.Context) *SubscribeBuilder { diff --git a/kafka.go b/kafka.go index c3823f4..079e08f 100644 --- a/kafka.go +++ b/kafka.go @@ -529,8 +529,9 @@ func ConstructEvent(publishBuilder *PublishBuilder) (*kafka.Message, *Event, err } return &kafka.Message{ - Key: []byte(key), - Value: eventBytes, + Key: []byte(key), + Value: eventBytes, + Headers: toKafkaHeaders(publishBuilder.headers), }, event, nil } @@ -668,6 +669,9 @@ func (client *KafkaClient) Register(subscribeBuilder *SubscribeBuilder) error { if subscribeBuilder.callbackRaw != nil { err = subscribeBuilder.callbackRaw(subscribeBuilder.ctx, nil, subscribeBuilder.ctx.Err()) } + if subscribeBuilder.callbackWithHeaders != nil { + err = subscribeBuilder.callbackWithHeaders(subscribeBuilder.ctx, nil, nil, subscribeBuilder.ctx.Err()) + } loggerFields.Warn("triggered an external context cancellation. Cancelling the subscription") @@ -924,6 +928,9 @@ func (client *KafkaClient) getWriter(config *kafka.ConfigMap) (*kafka.Producer, // processMessage process a message from kafka func (client *KafkaClient) processMessage(subscribeBuilder *SubscribeBuilder, message *kafka.Message, topic string) error { + if subscribeBuilder.callbackWithHeaders != nil { + return subscribeBuilder.callbackWithHeaders(subscribeBuilder.ctx, message.Value, toEventHeaders(message.Headers), nil) + } if subscribeBuilder.callbackRaw != nil { return subscribeBuilder.callbackRaw(subscribeBuilder.ctx, message.Value, nil) } @@ -985,6 +992,22 @@ func unmarshal(message *kafka.Message) (*Event, error) { return event, nil } +func toEventHeaders(kafkaHeaders []kafka.Header) []Header { + var eventHeaders []Header + for _, header := range kafkaHeaders { + eventHeaders = append(eventHeaders, Header(header)) + } + return eventHeaders +} + +func toKafkaHeaders(eventHeaders []Header) []kafka.Header { + var kafkaHeaders []kafka.Header + for _, header := range eventHeaders { + kafkaHeaders = append(kafkaHeaders, kafka.Header(header)) + } + return kafkaHeaders +} + // runCallback run callback function when receive an event func (client *KafkaClient) runCallback( subscribeBuilder *SubscribeBuilder, diff --git a/kafka_integration_test.go b/kafka_integration_test.go index 1d65d90..436f124 100644 --- a/kafka_integration_test.go +++ b/kafka_integration_test.go @@ -1060,3 +1060,111 @@ func callerFuncName() string { } return "UnknownFunction" } + +// nolint:funlen +func TestKafkaPubSubWithHeaders(t *testing.T) { + t.Parallel() + + ctx, done := context.WithTimeout(context.Background(), time.Duration(timeoutTest)*time.Second) + defer done() + + doneChan := make(chan bool, 1) + + client := createKafkaClient(t) + + topicName := constructTopicTest() + + var mockPayload = make(map[string]interface{}) + mockPayload[testPayload] = "payloadContent" + + mockAdditionalFields := map[string]interface{}{ + "summary": "user:_failed", + } + + mockEvent := &Event{ + EventName: "testEventWithHeaders", + Namespace: "event", + ClientID: "b2a64320e9b041168089d3a01a194e6d", + TraceID: "e9c33e1c61bd4779953a55154c40944a", + SpanContext: "test-span-context", + UserID: "0455788f721f4d1185ca7b9df13a3483", + EventID: 3, // nolint:gomnd + EventType: 301, // nolint:gomnd + EventLevel: 3, //nolint:gomnd + ServiceName: "test", + ClientIDs: []string{"b1393b6444a44ebaba4156799fb7e2d1"}, + TargetUserIDs: []string{"fef576c71b8f44b1a78bcd74ac43ec27", "629ff6f7d0c34feca3f4056855416da2"}, + TargetNamespace: "publisher", + Privacy: true, + AdditionalFields: mockAdditionalFields, + Version: defaultVersion, + Payload: mockPayload, + } + + expectedHeaders := []Header{ + { + Key: "sequenceID", + Value: []byte(`{"abaafa8de77e4743a085ee10f41ddac5":20,"85ff6ae1361b4286aa8fc72ec9bf1e81":11,"38be0532113341adbc67691c14297ff2":5}`), + }, + } + + groupID := generateID() + err := client.Register( + NewSubscribe(). + Topic(topicName). + EventName(mockEvent.EventName). + GroupID(groupID). + Offset(0). + Context(ctx). + CallbackWithHeaders(func(ctx context.Context, _ []byte, headers []Header, err error) error { + if ctx.Err() != nil { + return ctx.Err() + } + + if err != nil { + return err + } + + require.NotEmpty(t, headers) + require.Equal(t, expectedHeaders, headers) + + doneChan <- true + + return nil + })) + require.NoError(t, err) + + err = client.Publish( + NewPublish(). + Topic(topicName). + EventName(mockEvent.EventName). + Namespace(mockEvent.Namespace). + ClientID(mockEvent.ClientID). + UserID(mockEvent.UserID). + SessionID(mockEvent.SessionID). + TraceID(mockEvent.TraceID). + SpanContext(mockEvent.SpanContext). + EventID(mockEvent.EventID). + EventType(mockEvent.EventType). + EventLevel(mockEvent.EventLevel). + ServiceName(mockEvent.ServiceName). + ClientIDs(mockEvent.ClientIDs). + TargetUserIDs(mockEvent.TargetUserIDs). + TargetNamespace(mockEvent.TargetNamespace). + Privacy(mockEvent.Privacy). + AdditionalFields(mockEvent.AdditionalFields). + Context(context.Background()). + Timeout(10 * time.Second). + Payload(mockPayload). + Headers(expectedHeaders)) + require.NoError(t, err) + + for { + select { + case <-doneChan: + return + case <-ctx.Done(): + assert.FailNow(t, errorTimeout) + } + } +} diff --git a/validation.go b/validation.go index ed57b54..62ca7c9 100644 --- a/validation.go +++ b/validation.go @@ -142,17 +142,19 @@ func validateSubscribeEvent(subscribeBuilder *SubscribeBuilder) error { } subscribeEvent := struct { - Topic string `valid:"required"` - EventName string - GroupID string - Callback func(ctx context.Context, event *Event, err error) error - CallbackRaw func(ctx context.Context, msg []byte, err error) error + Topic string `valid:"required"` + EventName string + GroupID string + Callback func(ctx context.Context, event *Event, err error) error + CallbackRaw func(ctx context.Context, msg []byte, err error) error + CallbackWithHeaders func(ctx context.Context, msg []byte, headers []Header, err error) error }{ - Topic: subscribeBuilder.topic, - EventName: subscribeBuilder.eventName, - GroupID: subscribeBuilder.groupID, - Callback: subscribeBuilder.callback, - CallbackRaw: subscribeBuilder.callbackRaw, + Topic: subscribeBuilder.topic, + EventName: subscribeBuilder.eventName, + GroupID: subscribeBuilder.groupID, + Callback: subscribeBuilder.callback, + CallbackRaw: subscribeBuilder.callbackRaw, + CallbackWithHeaders: subscribeBuilder.callbackWithHeaders, } _, err := validator.ValidateStruct(subscribeEvent) @@ -164,7 +166,7 @@ func validateSubscribeEvent(subscribeBuilder *SubscribeBuilder) error { return err } - if subscribeEvent.Callback == nil && subscribeEvent.CallbackRaw == nil { + if subscribeEvent.Callback == nil && subscribeEvent.CallbackRaw == nil && subscribeEvent.CallbackWithHeaders == nil { return errInvalidCallback } From f43185b85985bb67c6acba310ce4d0e5a6dd948b Mon Sep 17 00:00:00 2001 From: Leonardus Windiono Date: Thu, 22 Aug 2024 14:20:53 +0700 Subject: [PATCH 2/2] feat(v4): expose event metadata --- eventstream.go | 11 +++++++++-- kafka.go | 15 +++++++++++++-- kafka_integration_test.go | 7 +++++-- validation.go | 2 +- 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/eventstream.go b/eventstream.go index abad43f..adaf81a 100644 --- a/eventstream.go +++ b/eventstream.go @@ -54,6 +54,13 @@ const ( ErrorLevel = "error" ) +// EventMetadata defines the structure of event metadata +type EventMetadata struct { + Partition int `json:",omitempty"` + Offset int64 `json:",omitempty"` + Key string `json:",omitempty"` +} + // Event defines the structure of event type Event struct { ID string `json:"id,omitempty"` @@ -356,7 +363,7 @@ type SubscribeBuilder struct { eventName string ctx context.Context callbackRaw func(ctx context.Context, msgValue []byte, err error) error - callbackWithHeaders func(ctx context.Context, msgValue []byte, headers []Header, err error) error + callbackWithHeaders func(ctx context.Context, msgValue []byte, headers []Header, metadata *EventMetadata, err error) error // flag to send error message to DLQ sendErrorDLQ bool // flag to use async commit consumer @@ -419,7 +426,7 @@ func (s *SubscribeBuilder) CallbackRaw( // CallbackWithHeaders callback that receives the undecoded payload and headers func (s *SubscribeBuilder) CallbackWithHeaders( - f func(ctx context.Context, msgValue []byte, headers []Header, err error) error, + f func(ctx context.Context, msgValue []byte, headers []Header, metadata *EventMetadata, err error) error, ) *SubscribeBuilder { s.callbackWithHeaders = f return s diff --git a/kafka.go b/kafka.go index 079e08f..726fce5 100644 --- a/kafka.go +++ b/kafka.go @@ -670,7 +670,7 @@ func (client *KafkaClient) Register(subscribeBuilder *SubscribeBuilder) error { err = subscribeBuilder.callbackRaw(subscribeBuilder.ctx, nil, subscribeBuilder.ctx.Err()) } if subscribeBuilder.callbackWithHeaders != nil { - err = subscribeBuilder.callbackWithHeaders(subscribeBuilder.ctx, nil, nil, subscribeBuilder.ctx.Err()) + err = subscribeBuilder.callbackWithHeaders(subscribeBuilder.ctx, nil, nil, nil, subscribeBuilder.ctx.Err()) } loggerFields.Warn("triggered an external context cancellation. Cancelling the subscription") @@ -929,7 +929,7 @@ func (client *KafkaClient) getWriter(config *kafka.ConfigMap) (*kafka.Producer, // processMessage process a message from kafka func (client *KafkaClient) processMessage(subscribeBuilder *SubscribeBuilder, message *kafka.Message, topic string) error { if subscribeBuilder.callbackWithHeaders != nil { - return subscribeBuilder.callbackWithHeaders(subscribeBuilder.ctx, message.Value, toEventHeaders(message.Headers), nil) + return subscribeBuilder.callbackWithHeaders(subscribeBuilder.ctx, message.Value, toEventHeaders(message.Headers), getEventMetadata(message), nil) } if subscribeBuilder.callbackRaw != nil { return subscribeBuilder.callbackRaw(subscribeBuilder.ctx, message.Value, nil) @@ -1000,6 +1000,17 @@ func toEventHeaders(kafkaHeaders []kafka.Header) []Header { return eventHeaders } +func getEventMetadata(message *kafka.Message) *EventMetadata { + if message == nil { + return nil + } + return &EventMetadata{ + Partition: int(message.TopicPartition.Partition), + Offset: int64(message.TopicPartition.Offset), + Key: string(message.Key), + } +} + func toKafkaHeaders(eventHeaders []Header) []kafka.Header { var kafkaHeaders []kafka.Header for _, header := range eventHeaders { diff --git a/kafka_integration_test.go b/kafka_integration_test.go index 436f124..7d73cb0 100644 --- a/kafka_integration_test.go +++ b/kafka_integration_test.go @@ -1116,7 +1116,7 @@ func TestKafkaPubSubWithHeaders(t *testing.T) { GroupID(groupID). Offset(0). Context(ctx). - CallbackWithHeaders(func(ctx context.Context, _ []byte, headers []Header, err error) error { + CallbackWithHeaders(func(ctx context.Context, _ []byte, headers []Header, metadata *EventMetadata, err error) error { if ctx.Err() != nil { return ctx.Err() } @@ -1127,6 +1127,8 @@ func TestKafkaPubSubWithHeaders(t *testing.T) { require.NotEmpty(t, headers) require.Equal(t, expectedHeaders, headers) + require.NotNil(t, metadata) + require.Equal(t, "somekey", metadata.Key) doneChan <- true @@ -1156,7 +1158,8 @@ func TestKafkaPubSubWithHeaders(t *testing.T) { Context(context.Background()). Timeout(10 * time.Second). Payload(mockPayload). - Headers(expectedHeaders)) + Headers(expectedHeaders). + Key("somekey")) require.NoError(t, err) for { diff --git a/validation.go b/validation.go index 62ca7c9..844a6c2 100644 --- a/validation.go +++ b/validation.go @@ -147,7 +147,7 @@ func validateSubscribeEvent(subscribeBuilder *SubscribeBuilder) error { GroupID string Callback func(ctx context.Context, event *Event, err error) error CallbackRaw func(ctx context.Context, msg []byte, err error) error - CallbackWithHeaders func(ctx context.Context, msg []byte, headers []Header, err error) error + CallbackWithHeaders func(ctx context.Context, msg []byte, headers []Header, metadata *EventMetadata, err error) error }{ Topic: subscribeBuilder.topic, EventName: subscribeBuilder.eventName,