Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CUMULUS-3842: Delete DynamoDB ReconciliationReportsTable #3857

Open
wants to merge 8 commits into
base: feature/es-phase-2
Choose a base branch
from
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ aws lambda invoke --function-name $PREFIX-ReconciliationReportMigration $OUTFILE
- **CUMULUS-3718**
- Updated `reconciliation_reports` list api endpoint and added `ReconciliationReportSearch` class to query postgres
- Added `reconciliationReports` type to stats endpoint, so `aggregate` query will work for reconciliation reports
- **CUMULUS-3842**
- Removed the `reconciliationReports` DynamoDB table
- **CUMULUS-3859**
- Updated `@cumulus/api/bin/serveUtils` to no longer add records to ElasticSearch

Expand Down
21 changes: 0 additions & 21 deletions packages/api/bin/serve.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ const {
const { constructCollectionId } = require('@cumulus/message/Collections');

const { bootstrapElasticSearch } = require('@cumulus/es-client/bootstrap');

const { ReconciliationReport } = require('../models');

const testUtils = require('../lib/testUtils');
const serveUtils = require('./serveUtils');
const {
Expand Down Expand Up @@ -249,24 +246,6 @@ async function serveApi(user, stackName = localStackName, reseed = true) {

checkEnvVariablesAreSet(requiredEnvVars);

// Create reconciliation report table
const reconciliationReportTableName = `${stackName}-ReconciliationReportsTable`;
process.env.ReconciliationReportsTable = reconciliationReportTableName;
const reconciliationReportModel = new ReconciliationReport({
tableName: reconciliationReportTableName,
stackName: process.env.stackName,
systemBucket: process.env.system_bucket,
});
try {
await reconciliationReportModel.createTable();
} catch (error) {
if (error && error.name && error.name === 'ResourceInUseException') {
console.log(`${reconciliationReportTableName} is already created`);
} else {
throw error;
}
}

await prepareServices(stackName, process.env.system_bucket);
await populateBucket(process.env.system_bucket, stackName);
if (reseed) {
Expand Down
1 change: 0 additions & 1 deletion packages/api/endpoints/elasticsearch.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ async function indexFromDatabase(req, res) {
operationType: 'ES Index',
payload: {
indexName,
reconciliationReportsTable: process.env.ReconciliationReportsTable,
esHost: process.env.ES_HOST,
esRequestConcurrency: esRequestConcurrency || process.env.ES_CONCURRENCY,
postgresResultPageSize,
Expand Down
67 changes: 7 additions & 60 deletions packages/api/lambdas/index-from-database.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const isNil = require('lodash/isNil');
const pLimit = require('p-limit');

const DynamoDbSearchQueue = require('@cumulus/aws-client/DynamoDbSearchQueue');
const log = require('@cumulus/common/log');

const { getEsClient } = require('@cumulus/es-client/search');
Expand All @@ -13,6 +12,7 @@ const {
AsyncOperationPgModel,
GranulePgModel,
ProviderPgModel,
ReconciliationReportPgModel,
RulePgModel,
PdrPgModel,
getKnexClient,
Expand All @@ -22,6 +22,7 @@ const {
translatePostgresGranuleToApiGranule,
translatePostgresProviderToApiProvider,
translatePostgresPdrToApiPdr,
translatePostgresReconReportToApiReconReport,
translatePostgresRuleToApiRule,
} = require('@cumulus/db');
const indexer = require('@cumulus/es-client/indexer');
Expand Down Expand Up @@ -60,62 +61,6 @@ const getEsRequestConcurrency = (event) => {
return 10;
};

/**
 * Legacy method used for indexing Reconciliation Reports only.
 *
 * Scans the DynamoDB reconciliation-reports table page by page and indexes
 * each record into Elasticsearch via `indexFn`, with concurrency bounded by
 * `limitEsRequests`.
 *
 * @param {Object} params
 * @param {Object} params.esClient - Elasticsearch client passed through to `indexFn`
 * @param {string} params.tableName - DynamoDB table name to scan
 * @param {string} params.esIndex - target Elasticsearch index
 * @param {Function} params.indexFn - indexes one record: (esClient, item, esIndex)
 * @param {Function} params.limitEsRequests - p-limit style concurrency limiter
 * @returns {Promise<number|boolean>} count of successfully indexed records, or
 *   `true` when a scan page comes back empty (pre-existing early-return shape,
 *   kept for backward compatibility)
 */
async function indexReconciliationReports({
  esClient,
  tableName,
  esIndex,
  indexFn,
  limitEsRequests,
}) {
  const scanQueue = new DynamoDbSearchQueue({
    TableName: tableName,
  });

  let itemsComplete = false;
  let totalItemsIndexed = 0;

  /* eslint-disable no-await-in-loop */
  while (itemsComplete === false) {
    await scanQueue.fetchItems();

    // fetchItems appends a trailing null once the scan is exhausted
    itemsComplete = scanQueue.items[scanQueue.items.length - 1] === null;

    if (itemsComplete) {
      // pop the null item off
      scanQueue.items.pop();
    }

    if (scanQueue.items.length === 0) {
      log.info(`No records to index for ${tableName}`);
      return true;
    }

    log.info(`Attempting to index ${scanQueue.items.length} records from ${tableName}`);

    const input = scanQueue.items.map(
      (item) => limitEsRequests(
        async () => {
          try {
            return await indexFn(esClient, item, esIndex);
          } catch (error) {
            log.error(`Error indexing record ${JSON.stringify(item)}, error: ${error}`);
            return false;
          }
        }
      )
    );
    const results = await Promise.all(input);
    const successfulResults = results.filter((result) => result !== false);
    // FIX: the original did `totalItemsIndexed += successfulResults`, adding the
    // array itself — "number + array" coerces to string concatenation and
    // corrupts the running count. Count the successes instead.
    totalItemsIndexed += successfulResults.length;

    log.info(`Completed index of ${successfulResults.length} records from ${tableName}`);
  }
  /* eslint-enable no-await-in-loop */

  return totalItemsIndexed;
}

/**
* indexModel - Index a postgres RDS table's contents to ElasticSearch
*
Expand Down Expand Up @@ -199,7 +144,6 @@ async function indexFromDatabase(event) {
const {
indexName: esIndex,
esHost = process.env.ES_HOST,
reconciliationReportsTable = process.env.ReconciliationReportsTable,
postgresResultPageSize,
postgresConnectionPoolSize,
} = event;
Expand Down Expand Up @@ -286,12 +230,15 @@ async function indexFromDatabase(event) {
knex,
pageSize,
}),
indexReconciliationReports({
indexModel({
esClient,
tableName: reconciliationReportsTable,
esIndex,
indexFn: indexer.indexReconciliationReport,
limitEsRequests,
postgresModel: new ReconciliationReportPgModel(),
translationFunction: translatePostgresReconReportToApiReconReport,
knex,
pageSize,
}),
indexModel({
esClient,
Expand Down
2 changes: 1 addition & 1 deletion packages/api/models/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ const ReconciliationReport = require('./reconciliation-reports');

module.exports = {
AccessToken,
ReconciliationReport,
Manager,
ReconciliationReport,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I remove this model and its tests as well? It's not needed anymore, right? 🤔

};
2 changes: 0 additions & 2 deletions packages/api/tests/endpoints/test-elasticsearch.js
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,6 @@ test.serial('Reindex from database - startAsyncOperation is called with expected
const indexName = randomString();
const processEnv = { ...process.env };
process.env.ES_HOST = 'fakeEsHost';
process.env.ReconciliationReportsTable = 'fakeReportsTable';

const asyncOperationsStub = sinon.stub(startAsyncOperation, 'invokeStartAsyncOperationLambda');
const payload = {
Expand All @@ -572,7 +571,6 @@ test.serial('Reindex from database - startAsyncOperation is called with expected
t.deepEqual(asyncOperationsStub.getCall(0).args[0].payload, {
...payload,
esHost: process.env.ES_HOST,
reconciliationReportsTable: process.env.ReconciliationReportsTable,
});
} finally {
process.env = processEnv;
Expand Down
43 changes: 12 additions & 31 deletions packages/api/tests/lambdas/test-index-from-database.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,59 +21,39 @@ const {
fakeCollectionRecordFactory,
fakeExecutionRecordFactory,
fakeGranuleRecordFactory,
fakeReconciliationReportRecordFactory,
fakePdrRecordFactory,
fakeProviderRecordFactory,
generateLocalTestDb,
GranulePgModel,
migrationDir,
ReconciliationReportPgModel,
PdrPgModel,
ProviderPgModel,
translatePostgresCollectionToApiCollection,
translatePostgresExecutionToApiExecution,
translatePostgresGranuleToApiGranule,
translatePostgresReconReportToApiReconReport,
translatePostgresPdrToApiPdr,
translatePostgresProviderToApiProvider,
} = require('@cumulus/db');

const {
fakeReconciliationReportFactory,
} = require('../../lib/testUtils');

const models = require('../../models');
const indexFromDatabase = require('../../lambdas/index-from-database');
const {
getWorkflowList,
} = require('../../lib/testUtils');

const workflowList = getWorkflowList();
process.env.ReconciliationReportsTable = randomString();
const reconciliationReportModel = new models.ReconciliationReport();

// create all the variables needed across this test
process.env.system_bucket = randomString();
process.env.stackName = randomString();

const reconciliationReportsTable = process.env.ReconciliationReportsTable;

/**
 * Strip the given keys from every record and return the stripped records
 * sorted ascending by `sortKey`.
 *
 * @param {Array<Object>} records - records to normalize for comparison
 * @param {Array<string>} omitKeys - keys to drop from each record
 * @param {string} sortKey - key the stripped records are ordered by
 * @returns {Array<Object>} new array of stripped records in `sortKey` order
 */
function sortAndFilter(records, omitKeys, sortKey) {
  const stripped = records.map((record) => omit(record, omitKeys));
  stripped.sort((first, second) => (first[sortKey] > second[sortKey] ? 1 : -1));
  return stripped;
}

/**
 * Build `numItems` fake records with `factory` and persist each one through
 * `model.create`, strictly one at a time (hence the await-in-loop override).
 *
 * @param {number} numItems - how many records to create
 * @param {Function} factory - record factory, invoked with `factoryParams`
 * @param {Object} model - model exposing an async `create(item)` method
 * @param {Object} [factoryParams={}] - parameters forwarded to `factory`
 * @returns {Promise<Array<Object>>} the created records, in creation order
 */
async function addFakeDynamoData(numItems, factory, model, factoryParams = {}) {
  const records = [];

  /* eslint-disable no-await-in-loop */
  while (records.length < numItems) {
    const record = factory(factoryParams);
    records.push(record);
    await model.create(record);
  }
  /* eslint-enable no-await-in-loop */

  return records;
}

async function addFakeData(knex, numItems, factory, model, factoryParams = {}) {
const items = [];
for (let i = 0; i < numItems; i += 1) {
Expand All @@ -92,7 +72,6 @@ test.before(async (t) => {
t.context.esIndices = [];

await awsServices.s3().createBucket({ Bucket: process.env.system_bucket });
await reconciliationReportModel.createTable();

const wKey = `${process.env.stackName}/workflows/${workflowList[0].name}.json`;
const tKey = `${process.env.stackName}/workflow_template.json`;
Expand Down Expand Up @@ -220,7 +199,6 @@ test('No error is thrown if nothing is in the database', async (t) => {

await t.notThrowsAsync(() => indexFromDatabase.indexFromDatabase({
indexName: esAlias,
reconciliationReportsTable,
knex,
}));
});
Expand Down Expand Up @@ -272,10 +250,12 @@ test.serial('Lambda successfully indexes records of all types', async (t) => {
...dateObject,
});

const fakeReconciliationReportRecords = await addFakeDynamoData(
const fakeReconciliationReportRecords = await addFakeData(
knex,
numItems,
fakeReconciliationReportFactory,
reconciliationReportModel
fakeReconciliationReportRecordFactory,
new ReconciliationReportPgModel(),
dateObject
);

await indexFromDatabase.handler({
Expand Down Expand Up @@ -309,6 +289,9 @@ test.serial('Lambda successfully indexes records of all types', async (t) => {
knexOrTransaction: knex,
}))
);
const reconciliationReportResults = await Promise.all(
fakeReconciliationReportRecords.map((r) => translatePostgresReconReportToApiReconReport(r))
);
const pdrResults = await Promise.all(
fakePdrRecords.map((r) => translatePostgresPdrToApiPdr(r, knex))
);
Expand Down Expand Up @@ -346,7 +329,7 @@ test.serial('Lambda successfully indexes records of all types', async (t) => {

t.deepEqual(
sortAndFilter(searchResults[5].results, ['timestamp'], 'name'),
sortAndFilter(fakeReconciliationReportRecords, ['timestamp'], 'name')
sortAndFilter(reconciliationReportResults, ['timestamp'], 'name')
);
});

Expand Down Expand Up @@ -386,7 +369,6 @@ test.serial('failure in indexing record of specific type should not prevent inde
try {
await indexFromDatabase.handler({
indexName: esAlias,
reconciliationReportsTable,
knex,
});

Expand Down Expand Up @@ -463,7 +445,6 @@ test.serial(
try {
await indexFromDatabase.handler({
indexName: esAlias,
reconciliationReportsTable,
knex,
});

Expand Down
2 changes: 0 additions & 2 deletions tf-modules/archive/api.tf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ resource "aws_ssm_parameter" "dynamo_table_names" {
type = "String"
value = jsonencode({
AccessTokensTable = var.dynamo_tables.access_tokens.name
ReconciliationReportsTable = var.dynamo_tables.reconciliation_reports.name
})
}
locals {
Expand All @@ -13,7 +12,6 @@ locals {
api_redirect_uri = "${local.api_uri}token"
dynamo_table_namestring = jsonencode({
AccessTokensTable = var.dynamo_tables.access_tokens.name
ReconciliationReportsTable = var.dynamo_tables.reconciliation_reports.name
})
api_env_variables = {
auth_mode = "public"
Expand Down
1 change: 0 additions & 1 deletion tf-modules/archive/index_from_database.tf
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ resource "aws_lambda_function" "index_from_database" {
databaseCredentialSecretArn = var.rds_user_access_secret_arn
ES_CONCURRENCY = var.es_request_concurrency
ES_HOST = var.elasticsearch_hostname
ReconciliationReportsTable = var.dynamo_tables.reconciliation_reports.name
stackName = var.prefix
}
}
Expand Down
25 changes: 0 additions & 25 deletions tf-modules/data-persistence/dynamo.tf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ locals {
enable_point_in_time_table_names = [for x in var.enable_point_in_time_tables : "${var.prefix}-${x}"]
table_names = {
access_tokens_table = "${var.prefix}-AccessTokensTable"
reconciliation_reports_table = "${var.prefix}-ReconciliationReportsTable"
semaphores_table = "${var.prefix}-SemaphoresTable"
}
}
Expand Down Expand Up @@ -34,30 +33,6 @@ resource "aws_dynamodb_table" "access_tokens_table" {
tags = var.tags
}

# DynamoDB table for reconciliation report records; one item per report,
# keyed by the report "name" attribute (on-demand billing, no provisioned
# capacity).
resource "aws_dynamodb_table" "reconciliation_reports_table" {
name = local.table_names.reconciliation_reports_table
billing_mode = "PAY_PER_REQUEST"
hash_key = "name"
# Stream emits both old and new item images on every write — presumably
# consumed elsewhere in the stack; confirm before disabling.
stream_enabled = true
stream_view_type = "NEW_AND_OLD_IMAGES"

attribute {
name = "name"
type = "S"
}

# PITR is opt-in per table: enabled only if this table's name appears in the
# prefixed enable_point_in_time_tables list.
point_in_time_recovery {
enabled = contains(local.enable_point_in_time_table_names, local.table_names.reconciliation_reports_table)
}

# Protect production data: refuse destroy plans and ignore renames so a
# changed name never forces a replace of the table.
lifecycle {
prevent_destroy = true
ignore_changes = [ name ]
}

tags = var.tags
}

resource "aws_dynamodb_table" "semaphores_table" {
name = local.table_names.semaphores_table
billing_mode = "PAY_PER_REQUEST"
Expand Down
4 changes: 0 additions & 4 deletions tf-modules/data-persistence/outputs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ output "dynamo_tables" {
name = aws_dynamodb_table.access_tokens_table.name,
arn = aws_dynamodb_table.access_tokens_table.arn
}
reconciliation_reports = {
name = aws_dynamodb_table.reconciliation_reports_table.name
arn = aws_dynamodb_table.reconciliation_reports_table.arn
}
semaphores = {
name = aws_dynamodb_table.semaphores_table.name
arn = aws_dynamodb_table.semaphores_table.arn
Expand Down