From 52f2da2f142a9cb0c17239283ab52c9fb563483f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mark=20Lis=C3=A9?= Date: Mon, 30 Oct 2023 11:43:24 -0700 Subject: [PATCH] 233: Hookup A&R to Data Register. (#267) * 233: Hookup A&R to Data Register. * Remove useless declaration. * Update const. * Bump readme. * Fix UT --------- Co-authored-by: Daniel Truong --- README.md | 1 - __tests__/global/data.json | 6 +- __tests__/global/settings.js | 4 +- __tests__/global/setup.js | 51 ++++++- __tests__/global/teardown.js | 7 +- __tests__/name-update.test.js | 150 ++++++++++++++++++++ lambda/dynamoUtil.js | 4 + lambda/nameUpdate/index.js | 256 ++++++++++++++++++++++++++++++++++ serverless.yml | 28 ++++ terraform/src/db.tf | 25 ++++ terraform/src/nameUpdate.tf | 48 +++++++ terraform/src/variables.tf | 8 ++ 12 files changed, 580 insertions(+), 8 deletions(-) create mode 100644 __tests__/name-update.test.js create mode 100644 lambda/nameUpdate/index.js create mode 100644 terraform/src/nameUpdate.tf diff --git a/README.md b/README.md index 209a361..173215b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ # BC Parks Attendance & Revenue - API ![Lifecycle:Maturing](https://img.shields.io/badge/Lifecycle-Maturing-007EC6) [![Maintainability Rating](https://sonarcloud.io/api/project_badges/measure?project=bcgov_bcparks-ar-api&metric=sqale_rating)](https://sonarcloud.io/summary/new_code?id=bcgov_bcparks-ar-api) - # Introduction This repository consists of the back end code for the BC Parks Attendance & Revenue system (A&R) API. A&R helps Park Operators, BC Parks, and the BC Government track important statistical information to help guide budget allowances and any maintenance that needs to be done to parks. diff --git a/__tests__/global/data.json b/__tests__/global/data.json index de4871a..3871bea 100644 --- a/__tests__/global/data.json +++ b/__tests__/global/data.json @@ -50,7 +50,7 @@ "Frontcountry Camping", "Day Use" ], - "orcs": 41, + "orcs": "41", "subAreaName": "Maple Bay" } ], @@ -63,7 +63,7 @@ "Day Use" ], "parkName": "Cultus Lake Park", - "orcs": 41, + "orcs": "41", "subAreaName": "Maple Bay", "roles": [ "sysadmin", @@ -77,7 +77,7 @@ "Day Use" ], "parkName": "Cultus Lake Park", - "orcs": 41, + "orcs": "41", "subAreaName": "Teapot Hill", "roles": [ "sysadmin", diff --git a/__tests__/global/settings.js b/__tests__/global/settings.js index 856e8f5..7d79337 100644 --- a/__tests__/global/settings.js +++ b/__tests__/global/settings.js @@ -2,10 +2,12 @@ const REGION = process.env.AWS_REGION || 'local-env'; const ENDPOINT = 'http://localhost:8000'; const TABLE_NAME = process.env.TABLE_NAME || 'ar-tests'; const CONFIG_TABLE_NAME = process.env.CONFIG_TABLE_NAME || 'ar-config'; +const NAME_CACHE_TABLE_NAME = process.env.NAME_CACHE_TABLE_NAME || 'name-cache'; module.exports = { REGION, ENDPOINT, TABLE_NAME, - CONFIG_TABLE_NAME + CONFIG_TABLE_NAME, + NAME_CACHE_TABLE_NAME }; diff --git a/__tests__/global/setup.js b/__tests__/global/setup.js index 8fc6e10..31fe878 100644 --- a/__tests__/global/setup.js +++ b/__tests__/global/setup.js @@ -1,6 +1,6 @@ const AWS = require('aws-sdk'); -const { REGION, ENDPOINT, TABLE_NAME, CONFIG_TABLE_NAME } = require('./settings'); +const { REGION, ENDPOINT, TABLE_NAME, CONFIG_TABLE_NAME, NAME_CACHE_TABLE_NAME } = require('./settings'); const { logger } = require('../../lambda/logger'); module.exports = async () => { @@ -12,7 +12,8 @@ module.exports = async () => { // TODO: This should pull in the JSON version of our serverless.yml! 
   try {
-    let res = await dynamoDb
+    console.log("Creating main table.");
+    await dynamoDb
       .createTable({
         TableName: TABLE_NAME,
         KeySchema: [
@@ -33,6 +34,51 @@ module.exports = async () => {
           {
             AttributeName: 'sk',
             AttributeType: 'S'
+          },
+          {
+            AttributeName: 'orcs',
+            AttributeType: 'S'
+          }
+        ],
+        ProvisionedThroughput: {
+          ReadCapacityUnits: 1,
+          WriteCapacityUnits: 1
+        },
+        GlobalSecondaryIndexes: [
+          {
+            IndexName: 'orcs-index',
+            KeySchema: [
+              {
+                AttributeName: 'orcs',
+                KeyType: 'HASH'
+              }
+            ],
+            Projection: {
+              ProjectionType: 'ALL'
+            },
+            ProvisionedThroughput: {
+              ReadCapacityUnits: 1,
+              WriteCapacityUnits: 1
+            }
+          }
+        ]
+      })
+      .promise();
+
+    console.log("Creating name-cache table.");
+    await dynamoDb
+      .createTable({
+        TableName: NAME_CACHE_TABLE_NAME,
+        KeySchema: [
+          {
+            AttributeName: 'pk',
+            KeyType: 'HASH'
+          }
+        ],
+        AttributeDefinitions: [
+          {
+            AttributeName: 'pk',
+            AttributeType: 'S'
           }
         ],
         ProvisionedThroughput: {
@@ -42,6 +88,7 @@ module.exports = async () => {
       })
       .promise();
 
+    console.log("Creating config table.");
     await dynamoDb
       .createTable({
         TableName: CONFIG_TABLE_NAME,
diff --git a/__tests__/global/teardown.js b/__tests__/global/teardown.js
index 218910c..ef76ac6 100644
--- a/__tests__/global/teardown.js
+++ b/__tests__/global/teardown.js
@@ -1,6 +1,6 @@
 const AWS = require('aws-sdk');
 
-const { REGION, ENDPOINT, TABLE_NAME, CONFIG_TABLE_NAME } = require('./settings');
+const { REGION, ENDPOINT, TABLE_NAME, CONFIG_TABLE_NAME, NAME_CACHE_TABLE_NAME } = require('./settings');
 const { logger } = require('../../lambda/logger');
 
 module.exports = async () => {
@@ -15,6 +15,11 @@ module.exports = async () => {
       TableName: TABLE_NAME
     })
     .promise();
+  await dynamoDb
+    .deleteTable({
+      TableName: NAME_CACHE_TABLE_NAME
+    })
+    .promise();
   await dynamoDb
     .deleteTable({
       TableName: CONFIG_TABLE_NAME
diff --git a/__tests__/name-update.test.js b/__tests__/name-update.test.js
new file mode 100644
index 0000000..8879eaf
--- /dev/null
+++ b/__tests__/name-update.test.js
@@ -0,0 +1,150 @@
+const AWS = require("aws-sdk");
+const { DocumentClient } = require("aws-sdk/clients/dynamodb");
+const { REGION, ENDPOINT, TABLE_NAME, NAME_CACHE_TABLE_NAME } = require("./global/settings");
+const docClient = new DocumentClient({
+  region: REGION,
+  endpoint: ENDPOINT,
+  convertEmptyValues: true,
+});
+async function setupDb() {
+  // Insert a document for the handler to find and update later.
+  await docClient
+    .put({
+      TableName: TABLE_NAME,
+      Item: {
+        "pk": "0673::Backcountry Cabins",
+        "sk": "201702",
+        "activity": "Backcountry Cabins",
+        "date": "201702",
+        "isLegacy": true,
+        "isLocked": true,
+        "lastUpdated": "2023-04-04T21:32:34.317Z",
+        "legacyData": {
+          "legacy_backcountryCabinsNetRevenue": 0,
+          "legacy_backcountryCabinsTotalAttendancePeople": 0
+        },
+        "orcs": "1",
+        "parkName": "Strathcona Park",
+        "subAreaId": "0673"
+      }
+    })
+    .promise();
+}
+
+describe("Name Update Tests", () => {
+  const OLD_ENV = process.env;
+  beforeEach(async () => {
+    jest.resetModules();
+    process.env = { ...OLD_ENV }; // Make a copy of the environment
+  });
+
+  afterEach(() => {
+    process.env = OLD_ENV; // Restore the old environment
+  });
+
+  beforeAll(async () => {
+    return await setupDb();
+  });
+
+  test("updateLocalCache", async () => {
+    const axios = require('axios');
+    jest.mock("axios");
+    axios.get.mockImplementation(
+      () => Promise.resolve({
+        status: 200,
+        data: {
+          data: {
+            items: [
+              {
+                updateDate: '2023-09-18T16:54:12.574Z',
+                displayName: 'Strathcona Park',
+                createDate: '2023-09-15T17:13:14.633Z',
+                status: 'current',
+                sk: 'Details',
+                pk: '1',
+                effectiveDate: '1911-03-01',
+                legalName: 'Strathcona Park',
+                phoneticName: ''
+              }
+            ]
+          }
+        }
+      })
+    );
+
+    const nameUpdateHandler = require("../lambda/nameUpdate/index");
+
+    // Cached document key
+    const CACHED_DOCUMENT = {
+      TableName: NAME_CACHE_TABLE_NAME,
+      Key: {
+        pk: "1",
+      },
+    };
+
+    // AR document key
+    const AR_DOCUMENT_KEY = {
+      pk: "0673::Backcountry Cabins",
+      sk: "201702"
+    };
+
+    // Ensure this doesn't exist yet.
+    const notFoundDoc = await docClient.get(CACHED_DOCUMENT).promise();
+    expect(notFoundDoc).toStrictEqual({});
+
+    // Call the handler; it will miss the cache.
+    await nameUpdateHandler.handler({}, null);
+
+    // Expect the cache to be updated.
+    const doc = await docClient.get(CACHED_DOCUMENT).promise();
+    expect(doc.Item.pk).toBe("1");
+
+    // Change the last cached item to be different in order to trigger a displayName
+    // change on the handler.
+    const params = {
+      TableName: NAME_CACHE_TABLE_NAME,
+      Key: { pk: '1' },
+      UpdateExpression: 'set displayName =:displayName',
+      ExpressionAttributeValues: {
+        ':displayName': 'some other park name'
+      }
+    };
+    await docClient.update(params).promise();
+    const cachedDocumentSet = await docClient.get(CACHED_DOCUMENT).promise();
+    expect(cachedDocumentSet.Item.displayName).toBe('some other park name');
+
+    // Also update the backcountry cabin record in the main table
+    const params2 = {
+      TableName: TABLE_NAME,
+      Key: AR_DOCUMENT_KEY,
+      UpdateExpression: 'set parkName =:parkName',
+      ExpressionAttributeValues: {
+        ':parkName': 'some other park name'
+      }
+    };
+    await docClient.update(params2).promise();
+    const arDocumentSetParkName = await docClient.get({
+      TableName: TABLE_NAME,
+      Key: AR_DOCUMENT_KEY,
+    }).promise();
+
+    expect(arDocumentSetParkName.Item.parkName).toBe('some other park name');
+
+    // Run the update
+    await nameUpdateHandler.handler({}, null);
+
+    // Fetch the cache and check that it has been updated
+    const cachedDocument = await docClient.get(CACHED_DOCUMENT).promise();
+
+    // Ensure it was updated
+    expect(cachedDocument.Item.displayName).toBe('Strathcona Park');
+
+    // Fetch the AR document and check that it has been updated
+    const arDocument = await docClient.get({
+      TableName: TABLE_NAME,
+      Key: AR_DOCUMENT_KEY,
+    }).promise();
+
+    expect(arDocument.Item.parkName).toBe('Strathcona Park');
+  });
+});
diff --git a/lambda/dynamoUtil.js b/lambda/dynamoUtil.js
index 4daae64..51bdd0a 100644
--- a/lambda/dynamoUtil.js
+++ b/lambda/dynamoUtil.js
@@ -2,6 +2,8 @@ const AWS = require("aws-sdk");
 const { logger } = require("./logger");
 
 const TABLE_NAME = process.env.TABLE_NAME || "ar-tests";
+const ORCS_INDEX = process.env.ORCS_INDEX || "orcs-index";
+const NAME_CACHE_TABLE_NAME = process.env.NAME_CACHE_TABLE_NAME || "name-cache";
 const CONFIG_TABLE_NAME = process.env.CONFIG_TABLE_NAME || "ar-config";
 const options = {
   region: "ca-central-1",
@@ -234,6 +236,8 @@ module.exports = {
   PASS_TYPE_EXPIRY_HOURS,
   FISCAL_YEAR_FINAL_MONTH,
   TABLE_NAME,
+  ORCS_INDEX,
+  NAME_CACHE_TABLE_NAME,
   dynamodb,
   runQuery,
   runScan,
diff --git a/lambda/nameUpdate/index.js b/lambda/nameUpdate/index.js
new file mode 100644
index 0000000..f45785c
--- /dev/null
+++ b/lambda/nameUpdate/index.js
@@ -0,0 +1,256 @@
+const axios = require('axios');
+const AWS = require("aws-sdk");
+const { runQuery, runScan, NAME_CACHE_TABLE_NAME, TABLE_NAME, ORCS_INDEX, dynamodb } = require("../dynamoUtil");
+const { logger } = require('../logger');
+const DATA_REGISTER_NAME_ENDPOINT = process.env.DATA_REGISTER_NAME_ENDPOINT || 'https://zloys5cfvf.execute-api.ca-central-1.amazonaws.com/api/parks/names?status=current';
+
+exports.handler = async (event, context) => {
+  logger.info('Name Update');
+  logger.debug(event, context);
+
+  try {
+    // Get the list of park names from the data register
+    const response = await getDataRegisterRecords();
+    const dataRegisterRecords = response.data?.data?.items;
+    logger.info("Data Register records size:", dataRegisterRecords.length);
+
+    // Get the last cached list, if any, from our DB
+    const cachedRecords = await getCachedData();
+    if (cachedRecords.length === 0) {
+      logger.info("No cached data");
+
+      // Update every A&R record whose orcs matches a data register entry.
+      logger.debug(dataRegisterRecords);
+      await updateAllRecords(dataRegisterRecords);
+
+      // Store the cache for next time.
+      logger.info("Storing data register records in the cache.");
+      await batchWriteCache(dataRegisterRecords);
+
+      return {};
+    }
+    logger.info("Cached data found.", cachedRecords.length);
+
+    // Determine which records have changed since the last run by comparing
+    // against the cached list, then apply any name changes required.
+    const { differences, newItems } = compareArraysByDisplayName(cachedRecords, dataRegisterRecords);
+
+    if (differences.length > 0) {
+      logger.info(`Differences found in property 'displayName'`);
+      for(const rec of differences) {
+        logger.debug(rec);
+        const foundObject = dataRegisterRecords.find(item => item.pk === rec.pk);
+        logger.debug(foundObject);
+
+        // Update all records that match this found object in the DR
+        await updateAllRecords([foundObject]);
+
+        // Update the local cache now that we have updated the records related to it
+        await updateLocalCache(foundObject);
+
+        // TODO: What to do when the data register record is repealed?
+      }
+    } else {
+      logger.info(`No differences found in property 'displayName'.`);
+    }
+
+    // New items are simply added to our cache.
+    if (newItems.length > 0) {
+      logger.info(`New items found`);
+      for (const rec of newItems) {
+        logger.debug(rec);
+        const foundObject = dataRegisterRecords.find(item => item.pk === rec.pk);
+        logger.debug(foundObject);
+
+        // Add the new item to our local cache.
+        await updateLocalCache(foundObject);
+      }
+    } else {
+      logger.info(`No new items found in the data register.`);
+    }
+  } catch (err) {
+    logger.error(JSON.stringify(err));
+  }
+  return {};
+};
+
+async function updateLocalCache(item) {
+  logger.info("Updating local cache");
+  logger.debug(item);
+  const putItem = {
+    TableName: NAME_CACHE_TABLE_NAME,
+    Item: AWS.DynamoDB.Converter.marshall(item)
+  };
+  await dynamodb.putItem(putItem).promise();
+  logger.info("Update complete");
+}
+
+async function batchWriteCache(records) {
+  logger.info(`writing ${Math.ceil(records.length/25)} batches for ${records.length} records`);
+  let batchCount = 0;
+  // Write the records in batches of 25 (the batchWriteItem limit).
+  let batch = { RequestItems: { [NAME_CACHE_TABLE_NAME]: [] } };
+  for(const record of records) {
+    logger.info("Writing cache");
+    logger.debug(record);
+    // logger.info(`Processing record:`, record)
+    batch.RequestItems[NAME_CACHE_TABLE_NAME].push({
+      PutRequest: {
+        Item: AWS.DynamoDB.Converter.marshall(record)
+      }
+    });
+    // Check if we should write the batch
+    if (batch.RequestItems[NAME_CACHE_TABLE_NAME].length === 25) {
+      batchCount++;
+      // Write the current batch and reset the batch
+      await dynamodb.batchWriteItem(batch).promise();
+      process.stdout.write(`.`);
+      batch.RequestItems[NAME_CACHE_TABLE_NAME] = [];
+    }
+  }
+
+  // Write out any remaining batch items.
+  if (batch.RequestItems[NAME_CACHE_TABLE_NAME].length > 0) {
+    batchCount++;
+    logger.info(`writing final batch #${batchCount}`);
+    await dynamodb.batchWriteItem(batch).promise();
+    logger.info(`Complete.`);
+  }
+}
+
+async function updateAllRecords(records) {
+  logger.info(`Processing ${records.length} data register records.`);
+  // Pre-build the update object once; Key and parkName are filled in per record.
+  let updateObj = {
+    TableName: TABLE_NAME,
+    Key: {
+      pk: { S: '' }, // Prefix with leading zeros for this system later
+      sk: { S: '' },
+    },
+    UpdateExpression: 'set parkName =:parkName',
+    ExpressionAttributeValues: {
+      ':parkName': { S: '' }
+    },
+    ReturnValues: 'NONE',
+  };
+  for(const record of records) {
+    logger.info("----------------------");
+    logger.debug(record);
+    updateObj.ExpressionAttributeValues[':parkName'].S = record.displayName;
+    // Each record is a separate protected area in the data register
+    logger.info(`Getting indexed record set for: ${record.pk}`);
+    const recordsToUpdate = await getIndexedRecordSet(record.pk);
+    logger.debug(recordsToUpdate);
+    logger.info(`Size: ${recordsToUpdate.length}`);
+    if (recordsToUpdate.length > 0) {
+      process.stdout.write(`Orcs: ${record.pk} (${recordsToUpdate.length} records)`);
+      // Update all the records
+      await updateRecords(recordsToUpdate, updateObj);
+    }
+  }
+}
+
+async function updateRecords(recordsToUpdate, updateObj) {
+  for(const record of recordsToUpdate) {
+    let params = JSON.parse(JSON.stringify(updateObj)); // Deep copy so each update gets its own Key
+    params.Key.pk.S = record.pk;
+    params.Key.sk.S = record.sk;
+
+    try {
+      process.stdout.write(`.`);
+      await dynamodb.updateItem(params).promise();
+    } catch (e) {
+      logger.info(e);
+      // TODO: Fall through, but record the error somehow?
+    }
+  }
+  logger.info(); // New line
+}
+
+async function getIndexedRecordSet(id) {
+  const queryObj = {
+    TableName: TABLE_NAME,
+    IndexName: ORCS_INDEX,
+    ExpressionAttributeValues: {
+      ':orcs': { S: id }
+    },
+    KeyConditionExpression: 'orcs =:orcs',
+  };
+
+  try {
+    return await runQuery(queryObj);
+  } catch (e) {
+    logger.error(e);
+    logger.error(JSON.stringify(e));
+    return [];
+  }
+}
+
+async function getCachedData() {
+  const queryObj = {
+    TableName: NAME_CACHE_TABLE_NAME
+  };
+
+  try {
+    return await runScan(queryObj);
+  } catch (e) {
+    logger.error(e);
+    logger.error(JSON.stringify(e));
+    return [];
+  }
+}
+
+async function getDataRegisterRecords() {
+  return await axios.get(encodeURI(DATA_REGISTER_NAME_ENDPOINT),
+    {
+      params: {
+        status: 'current'
+      },
+      headers: {
+        'Content-Type': 'application/json',
+        'Authorization': 'None',
+        'Accept': 'application/json'
+      }
+    });
+}
+
+// Compare two arrays of objects by the displayName property (matching on pk)
+function compareArraysByDisplayName(arr1, arr2) {
+  const differences = [];
+  const newItems = [];
+
+  // Iterate through arr1 and compare with items in arr2
+  for (let i = 0; i < arr1.length; i++) {
+    const item1 = arr1[i];
+    const item2 = arr2.find(item => item.pk === item1.pk);
+
+    if (item2 && item1.displayName !== item2.displayName) {
+      differences.push({
+        pk: item1.pk,
+        displayName: {
+          oldValue: item1.displayName,
+          newValue: item2.displayName
+        }
+      });
+    }
+  }
+
+  // Find items in arr2 that are not in arr1 and add them to newItems
+  for (let i = 0; i < arr2.length; i++) {
+    const item2 = arr2[i];
+    const item1 = arr1.find(item => item.pk === item2.pk);
+
+    if (!item1) {
+      newItems.push({
+        pk: item2.pk,
+        displayName: item2.displayName
+      });
+    }
+  }
+
+  return {
+    differences,
+    newItems
+  };
+}
+
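Note: compareArraysByDisplayName only reports renames and additions; a record that disappears from the register (e.g. a repealed park, per the TODO above) surfaces in neither list. A minimal sketch of its output shape, assuming the function were exported from lambda/nameUpdate/index.js (it is module-private in this patch) and using partly hypothetical park data:

    // Illustrative only — assumes compareArraysByDisplayName were exported.
    const { compareArraysByDisplayName } = require('./lambda/nameUpdate/index');

    const cached = [ // what the name-cache table held after the last run
      { pk: '1', displayName: 'Strathcona Park' },
      { pk: '41', displayName: 'Cultus Lake Park' }
    ];
    const register = [ // what the data register returns today
      { pk: '1', displayName: 'Strathcona Provincial Park' }, // renamed (hypothetical)
      { pk: '41', displayName: 'Cultus Lake Park' },          // unchanged -> ignored
      { pk: '2', displayName: 'Some New Park' }               // not yet cached (hypothetical)
    ];

    const { differences, newItems } = compareArraysByDisplayName(cached, register);
    // differences === [{ pk: '1',
    //                    displayName: { oldValue: 'Strathcona Park',
    //                                   newValue: 'Strathcona Provincial Park' } }]
    // newItems    === [{ pk: '2', displayName: 'Some New Park' }]

Each entry in differences drives one pass of updateAllRecords/updateLocalCache in the handler above; each entry in newItems is written straight to the cache.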
diff --git a/serverless.yml b/serverless.yml
index 4bec8d8..36b69ce 100644
--- a/serverless.yml
+++ b/serverless.yml
@@ -148,6 +148,12 @@ functions:
   cloudwatchAlarm:
     handler: lambda/cloudwatchAlarm/index.handler
 
+  ###########
+  # Name Update
+  ###########
+  nameUpdate:
+    handler: lambda/nameUpdate/index.handler
+
   ###########
   # Export
   ###########
@@ -246,3 +252,25 @@ resources:
           - AttributeName: sk
             KeyType: RANGE
         BillingMode: PAY_PER_REQUEST
+        GlobalSecondaryIndexes:
+          - IndexName: orcs-index
+            KeySchema:
+              - AttributeName: orcs
+                KeyType: HASH
+            Projection:
+              ProjectionType: ALL
+            ProvisionedThroughput:
+              ReadCapacityUnits: 1
+              WriteCapacityUnits: 1
+    nameCacheTable:
+      Type: "AWS::DynamoDB::Table"
+      DeletionPolicy: Retain
+      Properties:
+        TableName: nameCache
+        AttributeDefinitions:
+          - AttributeName: pk
+            AttributeType: S
+        KeySchema:
+          - AttributeName: pk
+            KeyType: HASH
+        BillingMode: PAY_PER_REQUEST
diff --git a/terraform/src/db.tf b/terraform/src/db.tf
index f3a9b7c..29147d5 100644
--- a/terraform/src/db.tf
+++ b/terraform/src/db.tf
@@ -1,3 +1,22 @@
+resource "aws_dynamodb_table" "ar_table_name_cache" {
+  name         = "${data.aws_ssm_parameter.db_name_cache.value}-${random_string.postfix.result}"
+  hash_key     = "pk"
+  billing_mode = "PAY_PER_REQUEST"
+
+  point_in_time_recovery {
+    enabled = false
+  }
+
+  tags = {
+    Name = "database-${random_string.postfix.result}"
+  }
+
+  attribute {
+    name = "pk"
+    type = "S"
+  }
+}
+
 resource "aws_dynamodb_table" "ar_table" {
   name     = "${data.aws_ssm_parameter.db_name.value}-${random_string.postfix.result}"
   hash_key = "pk"
@@ -21,6 +40,12 @@ resource "aws_dynamodb_table" "ar_table" {
     name = "sk"
     type = "S"
   }
+
+  global_secondary_index {
+    name            = "orcs-index"
+    hash_key        = "orcs"
+    projection_type = "ALL"
+  }
 }
 
 resource "aws_backup_vault" "backup_vault" {
diff --git a/terraform/src/nameUpdate.tf b/terraform/src/nameUpdate.tf
new file mode 100644
index 0000000..d7d643e
--- /dev/null
+++ b/terraform/src/nameUpdate.tf
@@ -0,0 +1,48 @@
+resource "aws_lambda_function" "name_update" {
+  function_name = "nameUpdate-${random_string.postfix.result}"
+
+  filename         = "artifacts/nameUpdate.zip"
+  source_code_hash = filebase64sha256("artifacts/nameUpdate.zip")
+
+  handler     = "lambda/nameUpdate/index.handler"
+  runtime     = "nodejs18.x"
+  memory_size = 512
+  timeout     = 300
+  publish     = "true"
+
+  environment {
+    variables = {
+      TABLE_NAME                  = aws_dynamodb_table.ar_table.name,
+      NAME_CACHE_TABLE_NAME       = aws_dynamodb_table.ar_table_name_cache.name,
+      DATA_REGISTER_NAME_ENDPOINT = data.aws_ssm_parameter.data_register_name_endpoint.value,
+      LOG_LEVEL                   = "debug"
+    }
+  }
+  role = aws_iam_role.databaseReadRole.arn
+}
+
+resource "aws_lambda_alias" "name_update_latest" {
+  name             = "latest"
+  function_name    = aws_lambda_function.name_update.function_name
+  function_version = aws_lambda_function.name_update.version
+}
+
+resource "aws_cloudwatch_event_rule" "name_update_every_midnight" {
+  name                = "name-update-every-midnight"
+  description         = "Executes nightly"
+  schedule_expression = "cron(0 0 * * ? *)"
+}
+
+resource "aws_cloudwatch_event_target" "name_update_every_midnight" {
+  rule      = aws_cloudwatch_event_rule.name_update_every_midnight.name
+  target_id = "name_update"
+  arn       = aws_lambda_function.name_update.arn
+}
+
+resource "aws_lambda_permission" "allow_cloudwatch_to_call_name_update" {
+  statement_id  = "AllowExecutionFromCloudWatch"
+  action        = "lambda:InvokeFunction"
+  function_name = aws_lambda_function.name_update.function_name
+  principal     = "events.amazonaws.com"
+  source_arn    = aws_cloudwatch_event_rule.name_update_every_midnight.arn
+}
diff --git a/terraform/src/variables.tf b/terraform/src/variables.tf
index c706aff..757e39d 100644
--- a/terraform/src/variables.tf
+++ b/terraform/src/variables.tf
@@ -15,6 +15,14 @@ data "aws_ssm_parameter" "db_name" {
   name = "/parks-ar-api/db-name"
 }
 
+data "aws_ssm_parameter" "db_name_cache" {
+  name = "/parks-ar-api/db-name-cache"
+}
+
+data "aws_ssm_parameter" "data_register_name_endpoint" {
+  name = "/parks-ar-api/data-register-name-endpoint"
+}
+
 data "aws_ssm_parameter" "s3_bucket_assets" {
   name = "/parks-ar-api/s3-bucket-assets"
 }
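Outside of the nightly EventBridge schedule, the handler can be smoke-tested against the same DynamoDB Local instance the test suite uses (localhost:8000, per __tests__/global/settings.js). A minimal sketch, assuming the tables from __tests__/global/setup.js exist and that dynamoUtil's DynamoDB client resolves to the local endpoint in this environment (the options object is only partially visible in this patch, so that wiring is an assumption):

    // Local smoke-run sketch — set env vars BEFORE requiring the handler,
    // since dynamoUtil reads process.env at module load time.
    process.env.TABLE_NAME = 'ar-tests';            // matches the test-suite default
    process.env.NAME_CACHE_TABLE_NAME = 'name-cache';
    // DATA_REGISTER_NAME_ENDPOINT falls back to the public data register URL
    // baked into lambda/nameUpdate/index.js if unset.

    const { handler } = require('./lambda/nameUpdate/index');

    handler({}, null)
      .then(() => console.log('nameUpdate run complete'))
      .catch((e) => console.error(e));

On the first run the cache scan comes back empty, so every A&R record with a matching orcs is rewritten and the full register list is batch-written to the cache; subsequent runs only touch records whose displayName differs from the cached copy.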