From 962ac1de6d86201fc2372a36930c79636c80f094 Mon Sep 17 00:00:00 2001
From: Tyler Hendrickson <1851017+TylerHendrickson@users.noreply.github.com>
Date: Thu, 31 Oct 2024 10:42:57 -0700
Subject: [PATCH] Feature: Persist forecasted grant opportunities to DynamoDB
(#926)
* Add types with grantRecord interface
* Refactor handler to use grantRecord interface
* Simplify DynamoDB helpers
* Update custom metric names in Datadog
* Add S3 trigger for forecasted grant persistence
---
cmd/PersistGrantsGovXMLDB/dynamodb.go | 26 +--
cmd/PersistGrantsGovXMLDB/dynamodb_test.go | 48 ++--
cmd/PersistGrantsGovXMLDB/handler.go | 61 +++--
cmd/PersistGrantsGovXMLDB/handler_test.go | 257 ++++++++++++++++++++-
cmd/PersistGrantsGovXMLDB/types.go | 70 ++++++
cmd/PersistGrantsGovXMLDB/types_test.go | 54 +++++
terraform/datadog_dashboard.tf | 26 ++-
terraform/main.tf | 6 +
terraform/staging.tfvars | 12 +-
9 files changed, 481 insertions(+), 79 deletions(-)
create mode 100644 cmd/PersistGrantsGovXMLDB/types.go
create mode 100644 cmd/PersistGrantsGovXMLDB/types_test.go
diff --git a/cmd/PersistGrantsGovXMLDB/dynamodb.go b/cmd/PersistGrantsGovXMLDB/dynamodb.go
index 83c9808c..86684a04 100644
--- a/cmd/PersistGrantsGovXMLDB/dynamodb.go
+++ b/cmd/PersistGrantsGovXMLDB/dynamodb.go
@@ -4,7 +4,6 @@ import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
- "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
@@ -15,12 +14,8 @@ type DynamoDBUpdateItemAPI interface {
UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
}
-func UpdateDynamoDBItem(ctx context.Context, c DynamoDBUpdateItemAPI, table string, opp opportunity) error {
- key, err := buildKey(opp)
- if err != nil {
- return err
- }
- expr, err := buildUpdateExpression(opp)
+func UpdateDynamoDBItem(ctx context.Context, c DynamoDBUpdateItemAPI, table string, key, attrs map[string]types.AttributeValue) error {
+ expr, err := buildUpdateExpression(attrs)
if err != nil {
return err
}
@@ -36,24 +31,13 @@ func UpdateDynamoDBItem(ctx context.Context, c DynamoDBUpdateItemAPI, table stri
return err
}
-func buildKey(o opportunity) (map[string]types.AttributeValue, error) {
- oid, err := attributevalue.Marshal(o.OpportunityID)
-
- return map[string]types.AttributeValue{"grant_id": oid}, err
-}
-
-func buildUpdateExpression(o opportunity) (expression.Expression, error) {
- oppAttr, err := attributevalue.MarshalMap(o)
- if err != nil {
- return expression.Expression{}, err
- }
-
+func buildUpdateExpression(m map[string]types.AttributeValue) (expression.Expression, error) {
update := expression.UpdateBuilder{}
- for k, v := range oppAttr {
+ for k, v := range m {
update = update.Set(expression.Name(k), expression.Value(v))
}
update = awsHelpers.DDBSetRevisionForUpdate(update)
- condition, err := awsHelpers.DDBIfAnyValueChangedCondition(oppAttr)
+ condition, err := awsHelpers.DDBIfAnyValueChangedCondition(m)
if err != nil {
return expression.Expression{}, err
}
diff --git a/cmd/PersistGrantsGovXMLDB/dynamodb_test.go b/cmd/PersistGrantsGovXMLDB/dynamodb_test.go
index 90873892..39a43b73 100644
--- a/cmd/PersistGrantsGovXMLDB/dynamodb_test.go
+++ b/cmd/PersistGrantsGovXMLDB/dynamodb_test.go
@@ -4,13 +4,13 @@ import (
"context"
"fmt"
"testing"
- "time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/stretchr/testify/assert"
- grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov"
+ "github.com/stretchr/testify/require"
+ "github.com/usdigitalresponse/grants-ingest/internal/awsHelpers"
)
type mockUpdateItemAPI func(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
@@ -24,28 +24,29 @@ type mockDynamoDBUpdateItemAPI struct {
}
func TestUploadDynamoDBItem(t *testing.T) {
- now := time.Now()
testTableName := "test-table"
- testHashKey := map[string]types.AttributeValue{}
- testHashKey["grant_id"] = &types.AttributeValueMemberS{Value: "123456"}
testError := fmt.Errorf("oh no this is an error")
- testOpportunity := opportunity{
- OpportunityID: "123456",
- LastUpdatedDate: grantsgov.MMDDYYYYType(now.Format(grantsgov.TimeLayoutMMDDYYYYType)),
+ testItemAttrs := map[string]types.AttributeValue{
+ "someKey": &types.AttributeValueMemberS{Value: "is a key"},
+ "attrString": &types.AttributeValueMemberS{Value: "is a string"},
+ "attrBool": &types.AttributeValueMemberBOOL{Value: true},
}
+ testKey := map[string]types.AttributeValue{"someKey": testItemAttrs["someKey"]}
for _, tt := range []struct {
- name string
- client func(t *testing.T) DynamoDBUpdateItemAPI
- expErr error
+ name string
+ key, attrs map[string]types.AttributeValue
+ client func(t *testing.T) DynamoDBUpdateItemAPI
+ expErr error
}{
{
"UpdateItem successful",
+ testKey,
+ testItemAttrs,
func(t *testing.T) DynamoDBUpdateItemAPI {
return mockUpdateItemAPI(func(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) {
t.Helper()
- assert.Equal(t, aws.String(testTableName), params.TableName)
- assert.Equal(t, testHashKey, params.Key)
+ assert.Equal(t, testKey, params.Key)
return &dynamodb.UpdateItemOutput{}, nil
})
},
@@ -53,19 +54,36 @@ func TestUploadDynamoDBItem(t *testing.T) {
},
{
"UpdateItem returns error",
+ testKey,
+ testItemAttrs,
func(t *testing.T) DynamoDBUpdateItemAPI {
return mockUpdateItemAPI(func(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) {
t.Helper()
assert.Equal(t, aws.String(testTableName), params.TableName)
- assert.Equal(t, testHashKey, params.Key)
+ assert.Equal(t, testKey, params.Key)
return &dynamodb.UpdateItemOutput{}, testError
})
},
testError,
},
+ {
+ "Empty attribute map returns error",
+ testKey,
+ make(map[string]types.AttributeValue),
+ func(t *testing.T) DynamoDBUpdateItemAPI {
+ return mockUpdateItemAPI(func(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) {
+ t.Helper()
+ assert.Equal(t, aws.String(testTableName), params.TableName)
+ assert.Equal(t, testKey, params.Key)
+ require.Fail(t, "UpdateItem called unexpectedly")
+ return &dynamodb.UpdateItemOutput{}, nil
+ })
+ },
+ awsHelpers.ErrEmptyFields,
+ },
} {
t.Run(tt.name, func(t *testing.T) {
- err := UpdateDynamoDBItem(context.TODO(), tt.client(t), testTableName, testOpportunity)
+ err := UpdateDynamoDBItem(context.TODO(), tt.client(t), testTableName, tt.key, tt.attrs)
if tt.expErr != nil {
assert.EqualError(t, err, tt.expErr.Error())
} else {
diff --git a/cmd/PersistGrantsGovXMLDB/handler.go b/cmd/PersistGrantsGovXMLDB/handler.go
index 9c50fa29..0e15f9e2 100644
--- a/cmd/PersistGrantsGovXMLDB/handler.go
+++ b/cmd/PersistGrantsGovXMLDB/handler.go
@@ -12,17 +12,9 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/hashicorp/go-multierror"
"github.com/usdigitalresponse/grants-ingest/internal/log"
- grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)
-const (
- MB = int64(1024 * 1024)
- GRANT_OPPORTUNITY_XML_NAME = "OpportunitySynopsisDetail_1_0"
-)
-
-type opportunity grantsgov.OpportunitySynopsisDetail_1_0
-
// handleS3Event handles events representing S3 bucket notifications of type "ObjectCreated:*"
// for XML DB extracts saved from Grants.gov and split into separate files via the SplitGrantsGovXMLDB Lambda.
// The XML data from the source S3 object provided represents an individual grant opportunity.
@@ -40,7 +32,7 @@ func handleS3EventWithConfig(s3svc *s3.Client, dynamodbsvc DynamoDBUpdateItemAPI
defer span.Finish(tracer.WithError(err))
defer func() {
if err != nil {
- sendMetric("opportunity.failed", 1)
+ sendMetric("record.failed", 1)
}
}()
@@ -58,18 +50,13 @@ func handleS3EventWithConfig(s3svc *s3.Client, dynamodbsvc DynamoDBUpdateItemAPI
return err
}
- data, err := io.ReadAll(resp.Body)
+ record, err := decodeNextGrantRecord(resp.Body)
+ resp.Body.Close()
if err != nil {
- log.Error(logger, "Error reading source opportunity from S3", err)
- return err
- }
-
- var opp opportunity
- if err := xml.Unmarshal(data, &opp); err != nil {
- log.Error(logger, "Error parsing opportunity from XML", err)
+ log.Error(logger, "Error decoding S3 object XML to record", err)
return err
}
- return processOpportunity(ctx, dynamodbsvc, opp)
+ return processGrantRecord(ctx, dynamodbsvc, record)
})
}(record)
}
@@ -84,13 +71,41 @@ func handleS3EventWithConfig(s3svc *s3.Client, dynamodbsvc DynamoDBUpdateItemAPI
return nil
}
+// decodeNextGrantRecord reads XML from r until a grantRecord can be decoded or EOF is reached.
+// It stops reading and returns the grantRecord as soon as one is decoded.
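+// If reading a token fails (including io.EOF before any record is found), it returns a nil grantRecord and the error; if a matched element fails to unmarshal, it returns the partially-decoded record together with the error.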
+func decodeNextGrantRecord(r io.Reader) (grantRecord, error) {
+ d := xml.NewDecoder(r)
+ for {
+ token, err := d.Token()
+ if err != nil {
+ return nil, err
+ }
+ if se, ok := token.(xml.StartElement); ok {
+ if se.Name.Local == "OpportunitySynopsisDetail_1_0" {
+ var o opportunity
+ err := d.DecodeElement(&o, &se)
+ return o, err
+ }
+ if se.Name.Local == "OpportunityForecastDetail_1_0" {
+ f := forecast{}
+ err := d.DecodeElement(&f, &se)
+ return f, err
+ }
+ }
+ }
+}
+
// processOpportunity takes a single opportunity and uploads an XML representation of the
// opportunity to its configured DynamoDB table.
-func processOpportunity(ctx context.Context, svc DynamoDBUpdateItemAPI, opp opportunity) error {
- logger := log.With(logger,
- "opportunity_id", opp.OpportunityID, "opportunity_number", opp.OpportunityNumber)
+func processGrantRecord(ctx context.Context, svc DynamoDBUpdateItemAPI, rec grantRecord) error {
+ logger := rec.logWith(logger)
- if err := UpdateDynamoDBItem(ctx, svc, env.DestinationTable, opp); err != nil {
+ itemAttrs, err := rec.dynamoDBAttributeMap()
+ if err != nil {
+ return log.Errorf(logger, "Error marshaling grantRecord to DynamoDB attributes map", err)
+ }
+ if err := UpdateDynamoDBItem(ctx, svc, env.DestinationTable, rec.dynamoDBItemKey(), itemAttrs); err != nil {
var conditionalCheckErr *types.ConditionalCheckFailedException
if errors.As(err, &conditionalCheckErr) {
log.Warn(logger, "Grants.gov data already matches the target DynamoDB item",
@@ -101,6 +116,6 @@ func processOpportunity(ctx context.Context, svc DynamoDBUpdateItemAPI, opp oppo
}
log.Info(logger, "Successfully uploaded opportunity")
- sendMetric("opportunity.saved", 1)
+ sendMetric("record.saved", 1)
return nil
}
diff --git a/cmd/PersistGrantsGovXMLDB/handler_test.go b/cmd/PersistGrantsGovXMLDB/handler_test.go
index 71b51f37..87fd67b8 100644
--- a/cmd/PersistGrantsGovXMLDB/handler_test.go
+++ b/cmd/PersistGrantsGovXMLDB/handler_test.go
@@ -5,8 +5,10 @@ import (
"context"
"crypto/tls"
"fmt"
+ "io"
"net/http"
"net/http/httptest"
+ "strings"
"testing"
"text/template"
"time"
@@ -16,6 +18,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
+ "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/aws-sdk-go-v2/service/s3"
@@ -87,7 +90,7 @@ const SOURCE_OPPORTUNITY_TEMPLATE = `
Fun Grant
ABCD-1234
Some Category
- Clarinet
+ CA
My Funding Category
Meow meow meow
1234.567
@@ -112,6 +115,52 @@ const SOURCE_OPPORTUNITY_TEMPLATE = `
`
+const SOURCE_FORECAST_TEMPLATE = `
+
+ {{.OpportunityID}}
+ Fun Grant
+ ABCD-1234
+ D
+ CA
+ My Funding Category
+ 93.322
+ 12
+ 13
+ 22
+ This is some additional information on eligibility.
+ TEST-AC
+ Bureau of Testing
+ 09082029
+ {{.LastUpdatedDate}}
+ 09082029
+ 2029
+ 09082030
+ Electronically submitted applications must be submitted no later than 11:59 p.m., ET, on the listed application due date.
+ 09082035
+ 09082031
+ 0
+ 0
+ 60000
+ 10
+ Here is a description.
+ Forecast 1
+ No
+ 09092030
+ test@example.gov
+ Inquiries
+ Tester Person, Bureau of Testing, Office of Stuff <br/>
+ (555) 555-1234
+
+`
+
+var (
+ opportunityTemplate = template.Must(
+ template.New("xml").Delims("{{", "}}").Parse(SOURCE_OPPORTUNITY_TEMPLATE))
+ forecastTemplate = template.Must(
+ template.New("xml").Delims("{{", "}}").Parse(SOURCE_FORECAST_TEMPLATE),
+ )
+)
+
func TestLambdaInvocationScenarios(t *testing.T) {
t.Run("Missing source object", func(t *testing.T) {
setupLambdaEnvForTesting(t)
@@ -119,12 +168,9 @@ func TestLambdaInvocationScenarios(t *testing.T) {
sourceBucketName := "test-source-bucket"
s3Client, err := setupS3ForTesting(t, sourceBucketName)
require.NoError(t, err)
- sourceTemplate := template.Must(
- template.New("xml").Delims("{{", "}}").Parse(SOURCE_OPPORTUNITY_TEMPLATE),
- )
var sourceData bytes.Buffer
require.NoError(t, err)
- require.NoError(t, sourceTemplate.Execute(&sourceData, map[string]string{
+ require.NoError(t, opportunityTemplate.Execute(&sourceData, map[string]string{
"OpportunityID": "123456",
"LastUpdatedDate": "01022023",
}))
@@ -162,6 +208,37 @@ func TestLambdaInvocationScenarios(t *testing.T) {
}
})
+ t.Run("Empty XML stream", func(t *testing.T) {
+ setupLambdaEnvForTesting(t)
+
+ sourceBucketName := "test-source-bucket"
+ s3Client, err := setupS3ForTesting(t, sourceBucketName)
+ require.NoError(t, err)
+ require.NoError(t, err)
+ _, err = s3Client.PutObject(context.TODO(), &s3.PutObjectInput{
+ Bucket: aws.String(sourceBucketName),
+ Key: aws.String("123/123456/grants.gov/v2.xml"),
+ Body: bytes.NewReader([]byte{}),
+ })
+ require.NoError(t, err)
+ dynamodbClient := mockDynamoDBUpdateItemAPI{
+ mockUpdateItemAPI(func(context.Context, *dynamodb.UpdateItemInput, ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) {
+ t.Helper()
+ require.Fail(t, "UpdateItem called unexpectedly")
+ return nil, nil
+ }),
+ }
+ err = handleS3EventWithConfig(s3Client, dynamodbClient, context.TODO(), events.S3Event{
+ Records: []events.S3EventRecord{
+ {S3: events.S3Entity{
+ Bucket: events.S3Bucket{Name: sourceBucketName},
+ Object: events.S3Object{Key: "123/123456/grants.gov/v2.xml"},
+ }},
+ },
+ })
+ assert.ErrorIs(t, err, io.EOF)
+ })
+
t.Run("Context canceled during invocation", func(t *testing.T) {
setupLambdaEnvForTesting(t)
s3Client, err := setupS3ForTesting(t, "source-bucket")
@@ -195,9 +272,173 @@ func TestLambdaInvocationScenarios(t *testing.T) {
require.Fail(t, "Invocation error could not be interpreted as *multierror.Error")
}
})
+
+ t.Run("Decodes and persists", func(t *testing.T) {
+ for _, tt := range []struct {
+ grantRecordType string
+ xmlTemplate *template.Template
+ }{
+ {"opportunity", opportunityTemplate},
+ {"forecast", forecastTemplate},
+ } {
+ t.Run(tt.grantRecordType, func(t *testing.T) {
+
+ setupLambdaEnvForTesting(t)
+
+ sourceBucketName := "test-source-bucket"
+ s3Client, err := setupS3ForTesting(t, sourceBucketName)
+ require.NoError(t, err)
+ var sourceData bytes.Buffer
+ require.NoError(t, err)
+ require.NoError(t, tt.xmlTemplate.Execute(&sourceData, map[string]string{
+ "OpportunityID": "123456",
+ "LastUpdatedDate": "01022023",
+ }))
+ require.NoError(t, err)
+ _, err = s3Client.PutObject(context.TODO(), &s3.PutObjectInput{
+ Bucket: aws.String(sourceBucketName),
+ Key: aws.String("123/123456/grants.gov/v2.xml"),
+ Body: bytes.NewReader(sourceData.Bytes()),
+ })
+ require.NoError(t, err)
+ dynamodbClient := mockDynamoDBUpdateItemAPI{
+ mockUpdateItemAPI(func(ctx context.Context, params *dynamodb.UpdateItemInput, opts ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) {
+ t.Helper()
+
+ // Check expected item key
+ var itemKey string
+ assert.NoError(t, attributevalue.Unmarshal(params.Key["grant_id"], &itemKey))
+ assert.Equal(t, "123456", itemKey)
+
+ // Check expected `is_forecast` attribute value
+ attrValuePlaceholder := findDDBExpressionAttributePlaceholder(t,
+ "is_forecast", params.ExpressionAttributeNames)
+ attrValue := params.ExpressionAttributeValues[attrValuePlaceholder]
+ var itemIsForecast bool
+ assert.NoError(t, attributevalue.Unmarshal(attrValue, &itemIsForecast))
+ if tt.grantRecordType == "opportunity" {
+ assert.False(t, itemIsForecast)
+ } else if tt.grantRecordType == "forecast" {
+ assert.True(t, itemIsForecast)
+ } else {
+ require.Fail(t, "Cannot test an unrecognized grantRecord type",
+ "Expected grantRecord type %q or %q but received %q",
+ "opportunity", "forecast", tt.grantRecordType)
+ }
+ return nil, nil
+ }),
+ }
+ err = handleS3EventWithConfig(s3Client, dynamodbClient, context.TODO(), events.S3Event{
+ Records: []events.S3EventRecord{{S3: events.S3Entity{
+ Bucket: events.S3Bucket{Name: sourceBucketName},
+ Object: events.S3Object{Key: "123/123456/grants.gov/v2.xml"},
+ }}},
+ })
+ require.NoError(t, err)
+ })
+ }
+ })
+}
+
+// DynamoDB maps attribute names and corresponding values to placeholders.
+// e.g. the name placeholder `{"#13": "some_attribute_name"}` paired with the value placeholder `{":13": true}` means `some_attribute_name = true`.
+// Note that the placeholder numbers are assigned in arbitrary order.
+// This helper identifies the name placeholder that corresponds to the targetName attribute
+// and converts it to a value placeholder by replacing `#` with `:`.
+func findDDBExpressionAttributePlaceholder(t *testing.T, targetName string, expressionAttributeNames map[string]string) string {
+ t.Helper()
+ for namePlaceholder, attrName := range expressionAttributeNames {
+ if attrName == targetName {
+ return strings.Replace(namePlaceholder, "#", ":", 1)
+ }
+ }
+ require.Failf(t, "Failed to locate target attribute in DynamoDB expression attribute names mapping",
+ "Could not find %q in the following name placeholders map: %v",
+ targetName, expressionAttributeNames)
+ return ""
+}
+
+func TestDecodeNextGrantRecord(t *testing.T) {
+ getOpportunityXML := func(OpportunityID, LastUpdatedDate string) *bytes.Buffer {
+ t.Helper()
+ data := &bytes.Buffer{}
+ require.NoError(t, opportunityTemplate.Execute(data, map[string]string{
+ "OpportunityID": OpportunityID,
+ "LastUpdatedDate": LastUpdatedDate,
+ }), "Unexpected error generating opportunity XML data during test setup")
+ return data
+ }
+ getForecastXML := func(OpportunityID, LastUpdatedDate string) *bytes.Buffer {
+ t.Helper()
+ data := &bytes.Buffer{}
+ require.NoError(t, forecastTemplate.Execute(data, map[string]string{
+ "OpportunityID": OpportunityID,
+ "LastUpdatedDate": LastUpdatedDate,
+ }), "Unexpected error generating forecast XML data during test setup")
+ return data
+ }
+ testDateString := time.Now().Format(grantsgov.TimeLayoutMMDDYYYYType)
+
+ t.Run("Decodes opportunity grantRecord from XML", func(t *testing.T) {
+ o, err := decodeNextGrantRecord(getOpportunityXML("12345", testDateString))
+ assert.NoError(t, err)
+ if assert.IsType(t, opportunity{}, o) {
+ assert.Equal(t, "12345", string(o.(opportunity).OpportunityID),
+ "opportunity has unexpected OpportunityID")
+ }
+ })
+
+ t.Run("Decodes forecast grantRecord from XML", func(t *testing.T) {
+ f, err := decodeNextGrantRecord(getForecastXML("12345", testDateString))
+ assert.NoError(t, err)
+ if assert.IsType(t, forecast{}, f) {
+ assert.Equal(t, "12345", string(f.(forecast).OpportunityID),
+ "forecast has unexpected OpportunityID")
+ }
+ })
+
+ t.Run("Stops reading after decoding next grantRecord or EOF", func(t *testing.T) {
+ r := io.MultiReader(
+ getOpportunityXML("12345", testDateString),
+ getForecastXML("56789", testDateString),
+ getForecastXML("13579", testDateString),
+ getOpportunityXML("24680", testDateString),
+ )
+ o1, err := decodeNextGrantRecord(r)
+ assert.NoError(t, err)
+ if assert.IsType(t, opportunity{}, o1) {
+ assert.Equal(t, "12345", string(o1.(opportunity).OpportunityID),
+ "opportunity has unexpected OpportunityID")
+ }
+
+ f1, err := decodeNextGrantRecord(r)
+ assert.NoError(t, err)
+ if assert.IsType(t, forecast{}, f1) {
+ assert.Equal(t, "56789", string(f1.(forecast).OpportunityID),
+ "forecast has unexpected OpportunityID")
+ }
+
+ f2, err := decodeNextGrantRecord(r)
+ assert.NoError(t, err)
+ if assert.IsType(t, forecast{}, f2) {
+ assert.Equal(t, "13579", string(f2.(forecast).OpportunityID),
+ "forecast has unexpected OpportunityID")
+ }
+
+ o2, err := decodeNextGrantRecord(r)
+ assert.NoError(t, err)
+ if assert.IsType(t, opportunity{}, o2) {
+ assert.Equal(t, "24680", string(o2.(opportunity).OpportunityID),
+ "opportunity has unexpected OpportunityID")
+ }
+
+ finalRV, err := decodeNextGrantRecord(r)
+ assert.Nil(t, finalRV)
+ assert.ErrorIs(t, err, io.EOF)
+ })
}
-func TestProcessOpportunity(t *testing.T) {
+func TestProcessGrantRecord(t *testing.T) {
now := time.Now()
testOpportunity := opportunity{
OpportunityID: "123456",
@@ -212,7 +453,7 @@ func TestProcessOpportunity(t *testing.T) {
return nil, fmt.Errorf("some UpdateItem error")
}),
}
- err := processOpportunity(context.TODO(), dynamodbClient, testOpportunity)
+ err := processGrantRecord(context.TODO(), dynamodbClient, testOpportunity)
assert.ErrorContains(t, err, "Error uploading prepared grant opportunity to DynamoDB")
})
@@ -227,6 +468,6 @@ func TestProcessOpportunity(t *testing.T) {
return nil, err
}),
}
- assert.NoError(t, processOpportunity(context.TODO(), dynamodbClient, testOpportunity))
+ assert.NoError(t, processGrantRecord(context.TODO(), dynamodbClient, testOpportunity))
})
}
diff --git a/cmd/PersistGrantsGovXMLDB/types.go b/cmd/PersistGrantsGovXMLDB/types.go
new file mode 100644
index 00000000..57af0558
--- /dev/null
+++ b/cmd/PersistGrantsGovXMLDB/types.go
@@ -0,0 +1,70 @@
+package main
+
+import (
+ "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
+ ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
+ "github.com/usdigitalresponse/grants-ingest/internal/log"
+ grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov"
+)
+
+const (
+ GRANT_OPPORTUNITY_XML_NAME = "OpportunitySynopsisDetail_1_0"
+ GRANT_FORECAST_XML_NAME = "OpportunityForecastDetail_1_0"
+)
+
+type grantRecord interface {
+ logWith(log.Logger) log.Logger
+ dynamoDBItemKey() map[string]ddbtypes.AttributeValue
+ // Marshals the grantRecord contents to a map of DynamoDB item attributes,
+ // which should contain an additional `is_forecast` discriminator field that can be used
+ // to differentiate between `opportunity` and `forecast` source record types.
+ dynamoDBAttributeMap() (map[string]ddbtypes.AttributeValue, error)
+}
+
+type opportunity grantsgov.OpportunitySynopsisDetail_1_0
+
+func (o opportunity) logWith(logger log.Logger) log.Logger {
+ return log.With(logger,
+ "opportunity_id", o.OpportunityID,
+ "opportunity_number", o.OpportunityNumber,
+ "is_forecast", false,
+ )
+}
+
+func (o opportunity) dynamoDBItemKey() map[string]ddbtypes.AttributeValue {
+ return map[string]ddbtypes.AttributeValue{
+ "grant_id": &ddbtypes.AttributeValueMemberS{Value: string(o.OpportunityID)},
+ }
+}
+
+func (o opportunity) dynamoDBAttributeMap() (map[string]ddbtypes.AttributeValue, error) {
+ m, err := attributevalue.MarshalMap(o)
+ if m != nil {
+ m["is_forecast"] = &ddbtypes.AttributeValueMemberBOOL{Value: false}
+ }
+ return m, err
+}
+
+type forecast grantsgov.OpportunityForecastDetail_1_0
+
+func (f forecast) logWith(logger log.Logger) log.Logger {
+ return log.With(logger,
+ "opportunity_id", f.OpportunityID,
+ "opportunity_number", f.OpportunityNumber,
+ "is_forecast", true,
+ )
+}
+
+func (f forecast) dynamoDBItemKey() map[string]ddbtypes.AttributeValue {
+ return map[string]ddbtypes.AttributeValue{
+ "grant_id": &ddbtypes.AttributeValueMemberS{Value: string(f.OpportunityID)},
+ }
+}
+
+func (f forecast) dynamoDBAttributeMap() (map[string]ddbtypes.AttributeValue, error) {
+ m, err := attributevalue.MarshalMap(f)
+ if m != nil {
+ m["is_forecast"] = &ddbtypes.AttributeValueMemberBOOL{Value: true}
+ }
+ return m, err
+}
diff --git a/cmd/PersistGrantsGovXMLDB/types_test.go b/cmd/PersistGrantsGovXMLDB/types_test.go
new file mode 100644
index 00000000..3f20d460
--- /dev/null
+++ b/cmd/PersistGrantsGovXMLDB/types_test.go
@@ -0,0 +1,54 @@
+package main
+
+import (
+ "reflect"
+ "testing"
+
+ "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestGrantRecordToDDBConversions(t *testing.T) {
+ for _, tt := range []struct {
+ record grantRecord
+ expectedGrantIDItemKeyValue string
+ expectedIsForecastAttributeValue bool
+ }{
+ {
+ opportunity{OpportunityID: "1234", OpportunityNumber: "ABC-1234"},
+ "1234",
+ false,
+ },
+ {
+ forecast{OpportunityID: "9876", OpportunityNumber: "ZYX-9876"},
+ "9876",
+ true,
+ },
+ } {
+ t.Run(reflect.TypeOf(tt.record).Name(), func(t *testing.T) {
+ t.Run("grant_id item key", func(t *testing.T) {
+ itemKeyMap := tt.record.dynamoDBItemKey()
+ attr, exists := itemKeyMap["grant_id"]
+ require.True(t, exists, "Missing grant_id in DynamoDB item key structure")
+ var grantId string
+ require.NoError(t, attributevalue.Unmarshal(attr, &grantId),
+ "Unexpected error unmarshaling value for grant_id key")
+ assert.Equal(t, tt.expectedGrantIDItemKeyValue, grantId,
+ "Unexpected value for DynamoDB grant_id item key")
+ })
+
+ t.Run("is_forecast attribute", func(t *testing.T) {
+ attrMap, err := tt.record.dynamoDBAttributeMap()
+ require.NoError(t, err, "Unexpected error getting DDB attribute value map")
+ attr, exists := attrMap["is_forecast"]
+ require.True(t, exists, "Missing is_forecast attribute in DynamoDB item")
+ var isForecast bool
+ require.NoError(t, attributevalue.Unmarshal(attr, &isForecast),
+ "Unexpected error unmarshaling value for is_forecast attribute")
+ assert.Equal(t, tt.expectedIsForecastAttributeValue, isForecast,
+ "Unexpected value for DynamoDB is_forecast attribute")
+ })
+ })
+ }
+}
diff --git a/terraform/datadog_dashboard.tf b/terraform/datadog_dashboard.tf
index 75c7bd0b..a8ef83e8 100644
--- a/terraform/datadog_dashboard.tf
+++ b/terraform/datadog_dashboard.tf
@@ -978,10 +978,10 @@ resource "datadog_dashboard" "service_dashboard" {
}
}
- // Shows counts of opportunities saved vs failed
+ // Shows counts of records saved vs failed
widget {
timeseries_definition {
- title = "Opportunity Records Saved vs Failed"
+ title = "Grant Records Saved vs Failed"
show_legend = true
legend_layout = "horizontal"
legend_columns = ["sum"]
@@ -995,7 +995,7 @@ resource "datadog_dashboard" "service_dashboard" {
display_type = "bars"
formula {
- formula_expression = "records_saved"
+ formula_expression = "opportunities_saved + records_saved"
alias = "Saved"
style {
palette = "cool"
@@ -1003,14 +1003,21 @@ resource "datadog_dashboard" "service_dashboard" {
}
}
query {
+ // Legacy metric (before Forecasted grants were in the pipeline)
metric_query {
- name = "records_saved"
+ name = "opportunities_saved"
query = "sum:grants_ingest.PersistGrantsGovXMLDB.opportunity.saved{$env,$service,$version}.as_count()"
}
}
+ query {
+ metric_query {
+ name = "records_saved"
+ query = "sum:grants_ingest.PersistGrantsGovXMLDB.record.saved{$env,$service,$version}.as_count()"
+ }
+ }
formula {
- formula_expression = "records_failed"
+ formula_expression = "opportunities_failed + records_failed"
alias = "Failed"
style {
palette = "warm"
@@ -1018,11 +1025,18 @@ resource "datadog_dashboard" "service_dashboard" {
}
}
query {
+ // Legacy metric (before Forecasted grants were in the pipeline)
metric_query {
- name = "records_failed"
+ name = "opportunities_failed"
query = "sum:grants_ingest.PersistGrantsGovXMLDB.opportunity.failed{$env,$service,$version}.as_count()"
}
}
+ query {
+ metric_query {
+ name = "records_failed"
+ query = "sum:grants_ingest.PersistGrantsGovXMLDB.record.failed{$env,$service,$version}.as_count()"
+ }
+ }
}
}
widget_layout {
diff --git a/terraform/main.tf b/terraform/main.tf
index 3250df6e..e05bef11 100644
--- a/terraform/main.tf
+++ b/terraform/main.tf
@@ -450,6 +450,12 @@ resource "aws_s3_bucket_notification" "grant_prepared_data" {
filter_suffix = "/grants.gov/v2.OpportunitySynopsisDetail_1_0.xml"
}
+ lambda_function {
+ lambda_function_arn = module.PersistGrantsGovXMLDB.lambda_function_arn
+ events = ["s3:ObjectCreated:*"]
+ filter_suffix = "/grants.gov/v2.OpportunityForecastDetail_1_0.xml"
+ }
+
lambda_function {
lambda_function_arn = module.PersistFFISData.lambda_function_arn
events = ["s3:ObjectCreated:*"]
diff --git a/terraform/staging.tfvars b/terraform/staging.tfvars
index 2e19bcd4..a232ec0f 100644
--- a/terraform/staging.tfvars
+++ b/terraform/staging.tfvars
@@ -49,15 +49,15 @@ datadog_metrics_metadata = {
unit = "record"
}
- "PersistGrantsGovXMLDB.opportunity.saved" = {
- short_name = "Saved opportunities"
- description = "Count of opportunity records persisted to DynamoDB with Grants.gov data."
+ "PersistGrantsGovXMLDB.record.saved" = {
+ short_name = "Saved grant records"
+ description = "Count of grant records persisted to DynamoDB with Grants.gov data."
unit = "record"
}
- "PersistGrantsGovXMLDB.opportunity.failed" = {
- short_name = "Failed opportunities"
- description = "Count of opportunity records that failed to be persisted to DynamoDB with Grants.gov data."
+ "PersistGrantsGovXMLDB.record.failed" = {
+ short_name = "Failed grant records"
+ description = "Count of grant records that failed to be persisted to DynamoDB with Grants.gov data."
unit = "record"
}