Skip to content

Commit

Permalink
refactor: cleanup ingest Lambda and CDK (#50)
Browse files Browse the repository at this point in the history
* chore: cleanup CDK queues and environment variables

* chore: move pkg to internal, refactor package structure

* chore: remove unused plus addressing extract function

* chore: rename stateless-stack file

* refactor: final Go cleanup

* chore: add comments throughout existing codebase
  • Loading branch information
p5 authored Aug 15, 2024
1 parent f923b05 commit 7e9ca5e
Show file tree
Hide file tree
Showing 42 changed files with 758 additions and 1,156 deletions.
2 changes: 1 addition & 1 deletion services/ingest-service/functions/enqueue-email/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package main

type Config struct {
ReportStorageBucketName string `env:"INGEST_STORAGE_BUCKET_NAME"`
RawEmailQueueURL string `env:"RAW_EMAIL_QUEUE_URL"`
NextStageQueueURL string `env:"NEXT_STAGE_QUEUE_URL"`
}
57 changes: 57 additions & 0 deletions services/ingest-service/functions/enqueue-email/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
"strings"

"github.com/aws/aws-lambda-go/events"
"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/aws"
"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/config"
"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/errors"
"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/models"
)

func handler(ctx context.Context, sesEvent events.SimpleEmailEvent) error {
config, err := config.NewConfig[Config]()
if err != nil {
return errors.NewLambdaError(500, fmt.Sprintf("error loading configuration: %v", err))
}

awsClient, err := aws.NewAWSClient(ctx)
if err != nil {
return errors.NewLambdaError(500, fmt.Sprintf("error creating AWS client: %v", err))
}

for _, record := range sesEvent.Records {
if err := processEmail(ctx, awsClient, config, record.SES.Mail); err != nil {
log.Printf("Error processing email with MessageID %s: %v", record.SES.Mail.MessageID, err)
// Optionally: continue processing other emails, or return the error
return errors.NewLambdaError(500, fmt.Sprintf("error processing email with MessageID %s: %v", record.SES.Mail.MessageID, err))
}
}

return nil
}

// processEmail processes an individual SES email message and adds it to the SQS queue for further processing downstream.
func processEmail(ctx context.Context, awsClient *aws.AWSClient, config *Config, mail events.SimpleEmailMessage) error {
tenantID := strings.Split(mail.Destination[0], "@")[0]
messageJSON, err := json.Marshal(models.IngestMessage{
TenantID: tenantID,
RawS3ObjectPath: fmt.Sprintf("raw/%s", mail.MessageID),
MessageTimestamp: fmt.Sprintf("%d", mail.Timestamp.Unix()),
MessageID: mail.MessageID,
})
if err != nil {
return errors.NewLambdaError(500, fmt.Sprintf("error marshalling message: %v", err))
}

if err := awsClient.SQSPublishMessage(ctx, config.NextStageQueueURL, string(messageJSON)); err != nil {
return errors.NewLambdaError(500, fmt.Sprintf("error publishing message to SQS: %v", err))
}

return nil
}
56 changes: 3 additions & 53 deletions services/ingest-service/functions/enqueue-email/main.go
Original file line number Diff line number Diff line change
@@ -1,74 +1,24 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
"os"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/rsturla/dmarc-monitor/services/ingest-service/pkg/aws"
"github.com/rsturla/dmarc-monitor/services/ingest-service/pkg/aws/awslocal"
"github.com/rsturla/dmarc-monitor/services/ingest-service/pkg/config"
"github.com/rsturla/dmarc-monitor/services/ingest-service/pkg/message"
"github.com/rsturla/dmarc-monitor/services/ingest-service/pkg/models"
"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/aws/awslocal"
)

func main() {
if os.Getenv("AWS_LAMBDA_RUNTIME_API") == "" {
event, ctx, err := awslocal.CreateLocalEvent[events.SimpleEmailEvent]("./sample-events/SQSEvent.json")
if err != nil {
log.Printf("Error creating local event: %v\n", err)
log.Fatalf("Error creating local event: %v\n", err)
}
if err := handler(ctx, event); err != nil {
log.Printf("Error processing local event: %v\n", err)
log.Fatalf("Error processing local event: %v\n", err)
}
} else {
lambda.Start(handler)
}
}

func handler(ctx context.Context, sesEvent events.SimpleEmailEvent) error {
config, err := config.NewConfig[Config]()
if err != nil {
return fmt.Errorf("error loading configuration: %w", err)
}

awsClient, err := aws.NewAWSClient(ctx)
if err != nil {
return fmt.Errorf("error creating AWS client: %w", err)
}

for _, record := range sesEvent.Records {
if err := processEmail(ctx, awsClient, config, record.SES.Mail); err != nil {
return fmt.Errorf("error processing email: %w", err)
}
}
log.Print("Email processing complete.")
return nil
}

func processEmail(ctx context.Context, awsClient *aws.AWSClient, config *Config, mail events.SimpleEmailMessage) error {
recipientTag, err := message.ExtractPlusAddress(mail.Destination[0])
if err != nil {
return fmt.Errorf("error extracting tag from recipient email address: %w", err)
}

messageJSON, err := json.Marshal(models.IngestSQSMessage{
TenantID: recipientTag,
S3ObjectPath: fmt.Sprintf("%s%s", "raw/", mail.MessageID),
Timestamp: fmt.Sprintf("%d", mail.Timestamp.Unix()),
MessageID: mail.MessageID,
})
if err != nil {
return fmt.Errorf("error marshalling message to JSON: %w", err)
}

if err := awsClient.PublishSQSMessage(ctx, config.RawEmailQueueURL, string(messageJSON)); err != nil {
return fmt.Errorf("error publishing message to SQS: %w", err)
}

return nil
}
71 changes: 71 additions & 0 deletions services/ingest-service/functions/extract-attachment/attachment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package main

import (
"context"
"encoding/json"
"fmt"
"io"
"time"

"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/aws"
"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/compress"
"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/email/message"
"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/errors"
"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/models"
)

// processEmailAttachment processes an individual SES email attachment by decompressing
// it, saving it to the S3 bucket, and publishing a message to the next stage SQS queue.
func processEmailAttachment(ctx context.Context, attachment *message.Attachment, awsClient *aws.AWSClient, config *Config, sqsMessage *models.IngestMessage) error {
data, err := getAttachmentData(attachment)
if err != nil {
return errors.NewLambdaError(500, fmt.Sprintf("error getting attachment data: %v", err))
}

attachmentS3ObjectPath, err := saveReport(ctx, awsClient, config, sqsMessage.MessageID, sqsMessage.TenantID, data)
if err != nil {
return errors.NewLambdaError(500, fmt.Sprintf("error saving report to S3: %v", err))
}

messageJSON, err := json.Marshal(models.IngestMessage{
TenantID: sqsMessage.TenantID,
RawS3ObjectPath: sqsMessage.RawS3ObjectPath,
AttachmentS3ObjectPath: attachmentS3ObjectPath,
MessageTimestamp: sqsMessage.MessageTimestamp,
MessageID: sqsMessage.MessageID,
})
if err != nil {
return errors.NewLambdaError(500, fmt.Sprintf("error marshalling message: %v", err))
}

if err := awsClient.SQSPublishMessage(ctx, config.NextStageQueueURL, string(messageJSON)); err != nil {
return errors.NewLambdaError(500, fmt.Sprintf("error publishing message to SQS: %v", err))
}
return nil
}

// getAttachmentData reads the attachment data, decompresses it, and returns the uncompressed data.
func getAttachmentData(attachment *message.Attachment) ([]byte, error) {
data, err := io.ReadAll(attachment.Data)
if err != nil {
return nil, errors.NewLambdaError(500, fmt.Sprintf("error reading attachment data: %v", err))
}

uncompressed, err := compress.Decompress(data, attachment.ContentType)
if err != nil {
return nil, errors.NewLambdaError(500, fmt.Sprintf("error decompressing attachment data: %v", err))
}

return uncompressed, nil
}

// saveReport saves the report data to the S3 bucket and returns the S3 key.
func saveReport(ctx context.Context, awsClient *aws.AWSClient, config *Config, messageID string, tenantID string, data []byte) (string, error) {
s3Key := fmt.Sprintf("reports/%s/%s/%s.xml", tenantID, time.Now().Format("2006/01/02"), messageID)
contentType := "application/xml"
if err := awsClient.S3PutObject(ctx, config.ReportStorageBucketName, s3Key, contentType, data); err != nil {
return "", errors.NewLambdaError(500, fmt.Sprintf("error saving report to S3: %v", err))
}

return s3Key, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package main

type Config struct {
ReportStorageBucketName string `env:"INGEST_STORAGE_BUCKET_NAME"`
ReportQueueURL string `env:"INGEST_QUEUE_URL"`
NextStageQueueURL string `env:"NEXT_STAGE_QUEUE_URL"`
}
73 changes: 73 additions & 0 deletions services/ingest-service/functions/extract-attachment/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package main

import (
"bytes"
"context"
"fmt"
"log"

"github.com/aws/aws-lambda-go/events"
"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/aws"
"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/config"
"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/email/message"
"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/errors"
"github.com/rsturla/dmarc-monitor/services/ingest-service/internal/models"
)

func handler(ctx context.Context, sqsEvent events.SQSEvent) error {
config, err := config.NewConfig[Config]()
if err != nil {
return errors.NewLambdaError(500, fmt.Sprintf("error loading configuration: %v", err))
}

awsClient, err := aws.NewAWSClient(ctx)
if err != nil {
return errors.NewLambdaError(500, fmt.Sprintf("error creating AWS client: %v", err))
}

for _, record := range sqsEvent.Records {
if err := processRecord(ctx, awsClient, config, record); err != nil {
log.Printf("Error processing SQS message with MessageID %s: %v", record.MessageId, err)
return errors.NewLambdaError(500, fmt.Sprintf("error processing message with SQS MessageID %s: %v", record.MessageId, err))
}
}

return nil
}

// processRecord processes an individual SQS record and extracts the attachment into the S3 bucket
func processRecord(ctx context.Context, awsClient *aws.AWSClient, config *Config, record events.SQSMessage) error {
var sqsMessage models.IngestMessage
if err := aws.ParseSQSMessage(record.Body, &sqsMessage); err != nil {
return errors.NewLambdaError(500, fmt.Sprintf("error unmarshalling message: %v", err))
}

rawEmail, err := getRawEmail(ctx, awsClient, config, sqsMessage.MessageID)
if err != nil {
return err
}

email, err := message.ParseMail(bytes.NewReader(rawEmail))
if err != nil {
return errors.NewLambdaError(500, fmt.Sprintf("error parsing email: %v", err))
}

for _, attachment := range email.Attachments {
// Save the report to the S3 bucket - under the reports/<message> key
if err := processEmailAttachment(ctx, &attachment, awsClient, config, &sqsMessage); err != nil {
return err
}
}

return nil
}

// getRawEmail retrieves the raw email from S3
func getRawEmail(ctx context.Context, awsClient *aws.AWSClient, config *Config, messageID string) ([]byte, error) {
body, err := awsClient.S3GetObject(ctx, config.ReportStorageBucketName, messageID)
if err != nil {
return nil, errors.NewLambdaError(500, fmt.Sprintf("error getting raw email from S3: %v", err))
}

return body, nil
}
Loading

0 comments on commit 7e9ca5e

Please sign in to comment.