From cee41b41051fc82be8908bbc6782c14b88192f3c Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Tue, 15 Oct 2024 19:48:53 +0200 Subject: [PATCH] apply review comments --- sns/marshaler.go | 15 ++++++++------- sns/subscriber.go | 7 +++++-- sqs/marshaler.go | 2 +- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/sns/marshaler.go b/sns/marshaler.go index 9777309..0abedab 100644 --- a/sns/marshaler.go +++ b/sns/marshaler.go @@ -1,6 +1,7 @@ package sns import ( + "github.com/ThreeDotsLabs/watermill-amazonsqs/sqs" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" @@ -8,9 +9,6 @@ import ( "github.com/ThreeDotsLabs/watermill/message" ) -// todo: check if it can be renamed -const UUIDAttribute = "UUID" - type Marshaler interface { Marshal(topicArn TopicArn, msg *message.Message) *sns.PublishInput } @@ -21,9 +19,8 @@ func (d DefaultMarshalerUnmarshaler) Marshal(topicArn TopicArn, msg *message.Mes // client side uuid // there is a deduplication id that can be use for // fifo queues - // todo: check how it works attributes, deduplicationId, groupId := metadataToAttributes(msg.Metadata) - attributes[UUIDAttribute] = types.MessageAttributeValue{ + attributes[sqs.UUIDAttribute] = types.MessageAttributeValue{ StringValue: aws.String(msg.UUID), DataType: aws.String("String"), } @@ -42,11 +39,11 @@ func metadataToAttributes(meta message.Metadata) (map[string]types.MessageAttrib var deduplicationId, groupId *string for k, v := range meta { // SNS has special attributes for deduplication and group id - if k == "MessageDeduplicationId" { + if k == MessageDeduplicationIdMetadataField { deduplicationId = aws.String(v) continue } - if k == "MessageGroupId" { + if k == MessageGroupIdMetadataField { groupId = aws.String(v) continue } @@ -58,3 +55,7 @@ func metadataToAttributes(meta message.Metadata) (map[string]types.MessageAttrib return attributes, deduplicationId, groupId } + +const MessageDeduplicationIdMetadataField = "MessageDeduplicationId" + +const MessageGroupIdMetadataField = "MessageGroupId" diff --git a/sns/subscriber.go b/sns/subscriber.go index c56f53d..27f7442 100644 --- a/sns/subscriber.go +++ b/sns/subscriber.go @@ -118,7 +118,7 @@ func (s *Subscriber) SubscribeInitializeWithContext(ctx context.Context, topic s } if !s.config.DoNotSetQueueAccessPolicy { - if err := s.setSqsQuePolicy(ctx, *sqsQueueArn, snsTopicArn, *sqsURL); err != nil { + if err := s.setSqsQueuePolicy(ctx, *sqsQueueArn, snsTopicArn, *sqsURL); err != nil { return fmt.Errorf("cannot set queue access policy for topic %s: %w", snsTopicArn, err) } } @@ -147,12 +147,15 @@ func (s *Subscriber) SubscribeInitializeWithContext(ctx context.Context, topic s return nil } -func (s *Subscriber) setSqsQuePolicy(ctx context.Context, sqsQueueArn sqs.QueueArn, snsTopicArn TopicArn, sqsURL sqs.QueueURL) error { +func (s *Subscriber) setSqsQueuePolicy(ctx context.Context, sqsQueueArn sqs.QueueArn, snsTopicArn TopicArn, sqsURL sqs.QueueURL) error { policy, err := s.config.GenerateQueueAccessPolicy(ctx, GenerateQueueAccessPolicyParams{ SqsQueueArn: sqsQueueArn, SnsTopicArn: snsTopicArn, SqsURL: sqsURL, }) + if err != nil { + return fmt.Errorf("cannot generate queue access policy: %w", err) + } policyJSON, err := json.Marshal(policy) if err != nil { diff --git a/sqs/marshaler.go b/sqs/marshaler.go index 6090a0d..2a5d1d4 100644 --- a/sqs/marshaler.go +++ b/sqs/marshaler.go @@ -7,7 +7,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message" ) -const UUIDAttribute = "UUID" +const UUIDAttribute = "_watermill_message_uuid" type Marshaler interface { Marshal(msg *message.Message) (*types.Message, error)