Skip to content

Commit

Permalink
Feature: Persist forecasted grant opportunities to DynamoDB (#926)
Browse files Browse the repository at this point in the history
* Add types with grantRecord interface

* Refactor handler to use grantRecord interface

* Simplify DynamoDB helpers

* Update custom metric names in Datadog

* Add S3 trigger for forecasted grant persistence
  • Loading branch information
TylerHendrickson authored Oct 31, 2024
1 parent 36684e8 commit 962ac1d
Show file tree
Hide file tree
Showing 9 changed files with 481 additions and 79 deletions.
26 changes: 5 additions & 21 deletions cmd/PersistGrantsGovXMLDB/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
Expand All @@ -15,12 +14,8 @@ type DynamoDBUpdateItemAPI interface {
UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
}

func UpdateDynamoDBItem(ctx context.Context, c DynamoDBUpdateItemAPI, table string, opp opportunity) error {
key, err := buildKey(opp)
if err != nil {
return err
}
expr, err := buildUpdateExpression(opp)
func UpdateDynamoDBItem(ctx context.Context, c DynamoDBUpdateItemAPI, table string, key, attrs map[string]types.AttributeValue) error {
expr, err := buildUpdateExpression(attrs)
if err != nil {
return err
}
Expand All @@ -36,24 +31,13 @@ func UpdateDynamoDBItem(ctx context.Context, c DynamoDBUpdateItemAPI, table stri
return err
}

// buildKey returns the DynamoDB item key for o: a map holding a single
// "grant_id" attribute marshaled from o.OpportunityID.
// If marshaling fails, it returns a nil map and the marshaling error so that
// callers never receive a key containing an unset attribute value.
func buildKey(o opportunity) (map[string]types.AttributeValue, error) {
	oid, err := attributevalue.Marshal(o.OpportunityID)
	if err != nil {
		return nil, err
	}

	return map[string]types.AttributeValue{"grant_id": oid}, nil
}

func buildUpdateExpression(o opportunity) (expression.Expression, error) {
oppAttr, err := attributevalue.MarshalMap(o)
if err != nil {
return expression.Expression{}, err
}

func buildUpdateExpression(m map[string]types.AttributeValue) (expression.Expression, error) {
update := expression.UpdateBuilder{}
for k, v := range oppAttr {
for k, v := range m {
update = update.Set(expression.Name(k), expression.Value(v))
}
update = awsHelpers.DDBSetRevisionForUpdate(update)
condition, err := awsHelpers.DDBIfAnyValueChangedCondition(oppAttr)
condition, err := awsHelpers.DDBIfAnyValueChangedCondition(m)
if err != nil {
return expression.Expression{}, err
}
Expand Down
48 changes: 33 additions & 15 deletions cmd/PersistGrantsGovXMLDB/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/stretchr/testify/assert"
grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov"
"github.com/stretchr/testify/require"
"github.com/usdigitalresponse/grants-ingest/internal/awsHelpers"
)

type mockUpdateItemAPI func(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
Expand All @@ -24,48 +24,66 @@ type mockDynamoDBUpdateItemAPI struct {
}

func TestUploadDynamoDBItem(t *testing.T) {
now := time.Now()
testTableName := "test-table"
testHashKey := map[string]types.AttributeValue{}
testHashKey["grant_id"] = &types.AttributeValueMemberS{Value: "123456"}
testError := fmt.Errorf("oh no this is an error")
testOpportunity := opportunity{
OpportunityID: "123456",
LastUpdatedDate: grantsgov.MMDDYYYYType(now.Format(grantsgov.TimeLayoutMMDDYYYYType)),
testItemAttrs := map[string]types.AttributeValue{
"someKey": &types.AttributeValueMemberS{Value: "is a key"},
"attrString": &types.AttributeValueMemberS{Value: "is a string"},
"attrBool": &types.AttributeValueMemberBOOL{Value: true},
}
testKey := map[string]types.AttributeValue{"someKey": testItemAttrs["someKey"]}

for _, tt := range []struct {
name string
client func(t *testing.T) DynamoDBUpdateItemAPI
expErr error
name string
key, attrs map[string]types.AttributeValue
client func(t *testing.T) DynamoDBUpdateItemAPI
expErr error
}{
{
"UpdateItem successful",
testKey,
testItemAttrs,
func(t *testing.T) DynamoDBUpdateItemAPI {
return mockUpdateItemAPI(func(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) {
t.Helper()
assert.Equal(t, aws.String(testTableName), params.TableName)
assert.Equal(t, testHashKey, params.Key)
assert.Equal(t, testKey, params.Key)
return &dynamodb.UpdateItemOutput{}, nil
})
},
nil,
},
{
"UpdateItem returns error",
testKey,
testItemAttrs,
func(t *testing.T) DynamoDBUpdateItemAPI {
return mockUpdateItemAPI(func(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) {
t.Helper()
assert.Equal(t, aws.String(testTableName), params.TableName)
assert.Equal(t, testHashKey, params.Key)
assert.Equal(t, testKey, params.Key)
return &dynamodb.UpdateItemOutput{}, testError
})
},
testError,
},
{
"Empty attribute map returns error",
testKey,
make(map[string]types.AttributeValue),
func(t *testing.T) DynamoDBUpdateItemAPI {
return mockUpdateItemAPI(func(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) {
t.Helper()
assert.Equal(t, aws.String(testTableName), params.TableName)
assert.Equal(t, testKey, params.Key)
require.Fail(t, "UpdateItem called unexpectedly")
return &dynamodb.UpdateItemOutput{}, nil
})
},
awsHelpers.ErrEmptyFields,
},
} {
t.Run(tt.name, func(t *testing.T) {
err := UpdateDynamoDBItem(context.TODO(), tt.client(t), testTableName, testOpportunity)
err := UpdateDynamoDBItem(context.TODO(), tt.client(t), testTableName, tt.key, tt.attrs)
if tt.expErr != nil {
assert.EqualError(t, err, tt.expErr.Error())
} else {
Expand Down
61 changes: 38 additions & 23 deletions cmd/PersistGrantsGovXMLDB/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,9 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/hashicorp/go-multierror"
"github.com/usdigitalresponse/grants-ingest/internal/log"
grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

const (
MB = int64(1024 * 1024)
GRANT_OPPORTUNITY_XML_NAME = "OpportunitySynopsisDetail_1_0"
)

type opportunity grantsgov.OpportunitySynopsisDetail_1_0

// handleS3Event handles events representing S3 bucket notifications of type "ObjectCreated:*"
// for XML DB extracts saved from Grants.gov and split into separate files via the SplitGrantsGovXMLDB Lambda.
// The XML data from the source S3 object provided represents an individual grant opportunity.
Expand All @@ -40,7 +32,7 @@ func handleS3EventWithConfig(s3svc *s3.Client, dynamodbsvc DynamoDBUpdateItemAPI
defer span.Finish(tracer.WithError(err))
defer func() {
if err != nil {
sendMetric("opportunity.failed", 1)
sendMetric("record.failed", 1)
}
}()

Expand All @@ -58,18 +50,13 @@ func handleS3EventWithConfig(s3svc *s3.Client, dynamodbsvc DynamoDBUpdateItemAPI
return err
}

data, err := io.ReadAll(resp.Body)
record, err := decodeNextGrantRecord(resp.Body)
resp.Body.Close()
if err != nil {
log.Error(logger, "Error reading source opportunity from S3", err)
return err
}

var opp opportunity
if err := xml.Unmarshal(data, &opp); err != nil {
log.Error(logger, "Error parsing opportunity from XML", err)
log.Error(logger, "Error decoding S3 object XML to record", err)
return err
}
return processOpportunity(ctx, dynamodbsvc, opp)
return processGrantRecord(ctx, dynamodbsvc, record)
})
}(record)
}
Expand All @@ -84,13 +71,41 @@ func handleS3EventWithConfig(s3svc *s3.Client, dynamodbsvc DynamoDBUpdateItemAPI
return nil
}

// decodeNextGrantRecord reads XML tokens from r until it encounters the start of a
// supported grant record element, then decodes that element and returns the record.
//
// Supported elements are "OpportunitySynopsisDetail_1_0" (decoded as an opportunity)
// and "OpportunityForecastDetail_1_0" (decoded as a forecast); every other token is
// skipped. Reading stops as soon as one record has been decoded.
//
// If reading a token or decoding the element fails, it returns a nil grantRecord and
// the encountered error. When r is exhausted before a supported element is found,
// the returned error is the one reported by the XML decoder (io.EOF at end of input).
func decodeNextGrantRecord(r io.Reader) (grantRecord, error) {
	d := xml.NewDecoder(r)
	for {
		token, err := d.Token()
		if err != nil {
			return nil, err
		}

		se, ok := token.(xml.StartElement)
		if !ok {
			continue
		}

		switch se.Name.Local {
		case "OpportunitySynopsisDetail_1_0":
			var o opportunity
			if err := d.DecodeElement(&o, &se); err != nil {
				// Return a literal nil so callers never see a partially decoded record.
				return nil, err
			}
			return o, nil
		case "OpportunityForecastDetail_1_0":
			var f forecast
			if err := d.DecodeElement(&f, &se); err != nil {
				// Return a literal nil so callers never see a partially decoded record.
				return nil, err
			}
			return f, nil
		}
	}
}

// processGrantRecord takes a single grantRecord, marshals it to a DynamoDB attribute map,
// and persists it as an item in the configured destination DynamoDB table.
func processOpportunity(ctx context.Context, svc DynamoDBUpdateItemAPI, opp opportunity) error {
logger := log.With(logger,
"opportunity_id", opp.OpportunityID, "opportunity_number", opp.OpportunityNumber)
func processGrantRecord(ctx context.Context, svc DynamoDBUpdateItemAPI, rec grantRecord) error {
logger := rec.logWith(logger)

if err := UpdateDynamoDBItem(ctx, svc, env.DestinationTable, opp); err != nil {
itemAttrs, err := rec.dynamoDBAttributeMap()
if err != nil {
return log.Errorf(logger, "Error marshaling grantRecord to DynamoDB attributes map", err)
}
if err := UpdateDynamoDBItem(ctx, svc, env.DestinationTable, rec.dynamoDBItemKey(), itemAttrs); err != nil {
var conditionalCheckErr *types.ConditionalCheckFailedException
if errors.As(err, &conditionalCheckErr) {
log.Warn(logger, "Grants.gov data already matches the target DynamoDB item",
Expand All @@ -101,6 +116,6 @@ func processOpportunity(ctx context.Context, svc DynamoDBUpdateItemAPI, opp oppo
}

log.Info(logger, "Successfully uploaded opportunity")
sendMetric("opportunity.saved", 1)
sendMetric("record.saved", 1)
return nil
}
Loading

0 comments on commit 962ac1d

Please sign in to comment.