diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8aa9fc68839..43c4d95d768 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -30,6 +30,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 
 ### Added
 
+- **CUMULUS-3757**
+  - Added a database helper function to update a granule's and its files' PostgreSQL fields when moving granules across collections
 - **CUMULUS-3919**
   - Added terraform variables `disableSSL` and `rejectUnauthorized` to `tf-modules/cumulus-rds-tf` module.
diff --git a/packages/api-client/src/granules.ts b/packages/api-client/src/granules.ts
index 305a157defb..8441b16eea0 100644
--- a/packages/api-client/src/granules.ts
+++ b/packages/api-client/src/granules.ts
@@ -602,6 +602,40 @@ export const associateExecutionWithGranule = async (params: {
   });
 };
 
+/**
+ * Update granule records stored in cumulus
+ * PATCH /granules
+ *
+ * @param params - params
+ * @param params.prefix - the prefix configured for the stack
+ * @param params.body - body to pass the API lambda
+ * @param params.callback - async function to invoke the api lambda
+ *                          that takes a prefix / user payload. Defaults
+ *                          to cumulusApiClient.invokeApi to invoke the
+ *                          api lambda
+ * @returns - the response from the callback
+ */
+export const updateGranules = async (params: {
+  prefix: string,
+  body: ApiGranule[],
+  callback?: InvokeApiFunction
+}): Promise<unknown> => {
+  const { prefix, body, callback = invokeApi } = params;
+  return await callback({
+    prefix,
+    payload: {
+      httpMethod: 'PATCH',
+      resource: '/{proxy+}',
+      headers: {
+        'Content-Type': 'application/json',
+      },
+      path: '/granules/',
+      body: JSON.stringify(body),
+    },
+    expectedStatusCodes: 200,
+  });
+};
+
 /**
  * Bulk operations on granules stored in cumulus
  * POST /granules/bulk
diff --git a/packages/api/endpoints/granules.js b/packages/api/endpoints/granules.js
index f3b3ed47544..94db3e9ef14 100644
--- a/packages/api/endpoints/granules.js
+++ b/packages/api/endpoints/granules.js
@@ -25,6 +25,7 @@ const {
   translatePostgresCollectionToApiCollection,
   translatePostgresGranuleToApiGranule,
   getGranuleAndCollection,
+  updateGranulesAndFiles,
 } = require('@cumulus/db');
 
 const { deleteGranuleAndFiles } = require('../src/lib/granule-delete');
@@ -846,6 +847,34 @@ async function getByGranuleId(req, res) {
   return res.send({ ...result, recoveryStatus });
 }
 
+/**
+ * Update the PG (and, eventually, ES) records of a list of granules
+ * that have been moved across collections, e.g. by a move-collections task
+ *
+ * @param {Object} req - express request object
+ * @param {Object} res - express response object
+ * @returns {Promise<Object>} the promise of express response object
+ */
+async function updateGranulesAndFilesCollectionRecords(req, res) {
+  const {
+    knex = await getKnexClient(),
+  } = req.testContext || {};
+  const payload = req.body;
+  try {
+    await updateGranulesAndFiles(knex, payload);
+    // TODO: update ES
+  } catch (error) {
+    log.error(
+      'failed to update granules:',
+      error
+    );
+    return res.boom.badRequest(errorify(error));
+  }
+  return res.send({
+    message: 'Successfully updated granules',
+  });
+}
+
 async function bulkOperations(req, res) {
   const payload = req.body;
 
@@ -1009,6 +1038,7 @@ router.patch('/:granuleId', requireApiVersion(2), patchByGranuleId);
 router.patch('/:collectionId/:granuleId', requireApiVersion(2), patch);
 router.put('/:collectionId/:granuleId', requireApiVersion(2), put);
 
+router.patch('/', updateGranulesAndFilesCollectionRecords);
 router.post(
   '/bulk',
   validateBulkGranulesRequest,
@@ -1039,5 +1069,6 @@ module.exports = {
   put,
  patch,
  patchGranule,
+  updateGranulesAndFilesCollectionRecords,
  router,
};
diff --git a/packages/api/tests/endpoints/test-granules-updateGranulesAndFilesCollection.js b/packages/api/tests/endpoints/test-granules-updateGranulesAndFilesCollection.js
new file mode 100644
index 00000000000..3a9e01e47e3
--- /dev/null
+++ b/packages/api/tests/endpoints/test-granules-updateGranulesAndFilesCollection.js
@@ -0,0 +1,292 @@
+/* eslint-disable no-await-in-loop */
+const test = require('ava');
+const range = require('lodash/range');
+
+const cryptoRandomString = require('crypto-random-string');
+const {
+  CollectionPgModel,
+  destroyLocalTestDb,
+  fakeCollectionRecordFactory,
+  fakeFileRecordFactory,
+  fakeGranuleRecordFactory,
+  FilePgModel,
+  generateLocalTestDb,
+  getUniqueGranuleByGranuleId,
+  GranulePgModel,
+  localStackConnectionEnv,
+  migrationDir,
+  translatePostgresCollectionToApiCollection,
+  translatePostgresGranuleResultToApiGranule,
+} = require('@cumulus/db');
+const {
+  createBucket,
+  recursivelyDeleteS3Bucket,
+  s3PutObject,
+} = require('@cumulus/aws-client/S3');
+const { randomString, randomId } = require('@cumulus/common/test-utils');
+const { constructCollectionId } = require('@cumulus/message/Collections');
+const models = require('../../models');
+
+const { request } = require('../helpers/request');
+
+// auth test helpers
+const {
+  createFakeJwtAuthToken,
+  setAuthorizedOAuthUsers,
+} = require('../../lib/testUtils');
+
+const testDbName = `granules_${cryptoRandomString({ length: 10 })}`;
+
+let accessTokenModel;
+let jwtAuthToken;
+
+process.env.AccessTokensTable = randomId('token');
+process.env.stackName = randomId('stackname');
+process.env.system_bucket = randomId('system-bucket');
+process.env.TOKEN_SECRET = randomId('secret');
+process.env.backgroundQueueUrl = randomId('backgroundQueueUrl');
+
+// import the express app after setting the env variables
+const { app } = require('../../app');
+
+/**
+ * Simulate granule records post-collection-move for the database update tests
+ *
+ * @param {Knex | Knex.Transaction} knex - DB client or transaction
+ * @param {Array} granules - granule records to update
+ * @param {Object} collection - current collection of the granules, used for translation
+ * @param {string} collectionId - collectionId of the granules' current collection
+ * @param {string} collectionId2 - collectionId of the collection the granules are moving to
+ * @returns {Promise<Array>} - list of updated apiGranules (moved to the new collection)
+ */
+const simulateGranuleUpdate = async (knex, granules, collection, collectionId, collectionId2) => {
+  const movedGranules = [];
+  for (const granule of granules) {
+    const postMoveApiGranule = await translatePostgresGranuleResultToApiGranule(knex, {
+      ...granule,
+      collectionName: collection.name,
+      collectionVersion: collection.version,
+    });
+    postMoveApiGranule.collectionId = collectionId2;
+    postMoveApiGranule.updatedAt = Date.now();
+    postMoveApiGranule.lastUpdateDateTime = new Date().toISOString();
+    for (const apiFile of postMoveApiGranule.files) {
+      apiFile.bucket = apiFile.bucket.replace(collectionId, collectionId2);
+      apiFile.key = apiFile.key.replace(collectionId, collectionId2);
+      apiFile.updatedAt = Date.now();
+    }
+    movedGranules.push(postMoveApiGranule);
+  }
+  return movedGranules;
+};
+
+test.before(async (t) => {
+  const { knexAdmin, knex } = await generateLocalTestDb(
+    testDbName,
+    migrationDir
+  );
+  process.env.CMR_ENVIRONMENT = 'SIT';
+  process.env = {
+    ...process.env,
+    ...localStackConnectionEnv,
+    PG_DATABASE: testDbName,
+  };
+
+  // create a fake bucket
+  await createBucket(process.env.system_bucket);
+
+  // create a workflow template file
+  const tKey = `${process.env.stackName}/workflow_template.json`;
+  await s3PutObject({ Bucket: process.env.system_bucket, Key: tKey, Body: '{}' });
+
+  const username = randomString();
+  await setAuthorizedOAuthUsers([username]);
+
+  accessTokenModel = new models.AccessToken();
+  await accessTokenModel.createTable();
+
+  jwtAuthToken = await createFakeJwtAuthToken({ accessTokenModel, username });
+  t.context.knexAdmin = knexAdmin;
+  t.context.knex = knex;
+
+  t.context.granulePgModel = new GranulePgModel();
+  t.context.collectionPgModel = new CollectionPgModel();
+  t.context.filePgModel = new FilePgModel();
+
+  // set up 2 collections
+  t.context.collection = fakeCollectionRecordFactory();
+  t.context.collection2 = fakeCollectionRecordFactory();
+  t.context.collectionId = constructCollectionId(
+    t.context.collection.name,
+    t.context.collection.version
+  );
+  t.context.collectionId2 = constructCollectionId(
+    t.context.collection2.name,
+    t.context.collection2.version
+  );
+  const collectionResponse = await t.context.collectionPgModel.create(
+    t.context.knex,
+    t.context.collection
+  );
+  const collectionResponse2 = await t.context.collectionPgModel.create(
+    t.context.knex,
+    t.context.collection2
+  );
+  t.context.collectionCumulusId = collectionResponse[0].cumulus_id;
+  t.context.collectionCumulusId2 = collectionResponse2[0].cumulus_id;
+  t.context.apiCollection1 = translatePostgresCollectionToApiCollection(collectionResponse[0]);
+  t.context.apiCollection2 = translatePostgresCollectionToApiCollection(collectionResponse2[0]);
+
+  // create 10 granules in one collection, 0 in the other
+  t.context.granuleIds = range(10).map((num) => 'granuleId___' + num);
+
+  t.context.granules = range(10).map((num) => fakeGranuleRecordFactory({
+    granule_id: t.context.granuleIds[num],
+    collection_cumulus_id: t.context.collectionCumulusId,
+    cumulus_id: num,
+  }));
+  t.context.pgGranules = await t.context.granulePgModel.insert(
+    knex,
+    t.context.granules
+  );
+
+  t.context.movedGranules = [];
+
+  t.context.files = [];
+  // create fake files for each of the ten granules (3 per granule)
+  for (const pgGranule of t.context.granules) {
+    t.context.files.push(
+      fakeFileRecordFactory({
+        granule_cumulus_id: pgGranule.cumulus_id,
+        file_name: pgGranule.granule_id + '.hdf',
+        updated_at: new Date().toISOString(),
+        bucket: t.context.collectionId + '--bucket',
+        key: t.context.collectionId + pgGranule.granule_id + '/key-hdf.pem',
+        path: t.context.collectionId + '/' + pgGranule.granule_id,
+      }),
+      fakeFileRecordFactory({
+        granule_cumulus_id: pgGranule.cumulus_id,
+        file_name: pgGranule.granule_id + '.txt',
+        updated_at: new Date().toISOString(),
+        bucket: t.context.collectionId + '--bucket',
+        key: t.context.collectionId + pgGranule.granule_id + '/key-txt.pem',
+        path: t.context.collectionId + '/' + pgGranule.granule_id,
+      }),
+      fakeFileRecordFactory({
+        granule_cumulus_id: pgGranule.cumulus_id,
+        file_name: pgGranule.granule_id + '.cmr',
+        updated_at: new Date().toISOString(),
+        bucket: t.context.collectionId + '--bucket',
+        key: t.context.collectionId + pgGranule.granule_id + '/key-cmr.pem',
+        path: t.context.collectionId + '/' + pgGranule.granule_id,
+      })
+    );
+  }
+
+  t.context.pgFiles = await t.context.filePgModel.insert(knex, t.context.files);
+  // move half of the granules to the new collection
+  t.context.movedGranules.push(await simulateGranuleUpdate(knex, t.context.granules.slice(0, 5),
+    t.context.collection, t.context.collectionId, t.context.collectionId2));
+
+  // the other half stays unmoved but is translated to apiGranules
+  t.context.movedGranules.push(await simulateGranuleUpdate(knex, t.context.granules.slice(5),
+    t.context.collection, t.context.collectionId, t.context.collectionId));
+
+  t.context.movedGranules = t.context.movedGranules.flat();
+});
+
+test.after.always(async (t) => {
+  await accessTokenModel.deleteTable();
+  await recursivelyDeleteS3Bucket(process.env.system_bucket);
+  await destroyLocalTestDb({
+    knex: t.context.knex,
+    knexAdmin: t.context.knexAdmin,
+    testDbName,
+  });
+});
+
+test.serial('PATCH successfully updates a partial list of granules based on the collectionId change', async (t) => {
+  const {
+    granuleIds,
+    granulePgModel,
+    movedGranules,
+    collectionId2,
+    collectionId,
+    collection,
+    collection2,
+    knex,
+  } = t.context;
+  const response = await request(app)
+    .patch('/granules/')
+    .set('Accept', 'application/json')
+    .set('Authorization', `Bearer ${jwtAuthToken}`)
+    .send(movedGranules)
+    .expect(200);
+
+  const body = response.body;
+  t.true(body.message.includes('Successfully updated granules'));
+  const returnedGranules = await Promise.all(granuleIds.map((id) =>
+    getUniqueGranuleByGranuleId(knex, id, granulePgModel)));
+
+  for (const granule of returnedGranules) {
+    const testCollection = granule.cumulus_id >= 5 ? collection : collection2;
+    const testCollectionId = granule.cumulus_id >= 5 ? collectionId : collectionId2;
+    const apiGranule = await translatePostgresGranuleResultToApiGranule(knex, {
+      ...granule,
+      collectionName: testCollection.name,
+      collectionVersion: testCollection.version,
+    });
+    // the movedGranules param moved only half of the granules to collection 2;
+    // the granule's cumulus_id tells us which collection it should belong to
+    t.true(apiGranule.collectionId === testCollectionId);
+    for (const file of apiGranule.files) {
+      t.true(file.key.includes(testCollectionId));
+      t.true(file.bucket.includes(testCollectionId));
+    }
+  }
+});
+
+test.serial('PATCH successfully updates a complete list of granules, half of which have already been moved', async (t) => {
+  const {
+    granuleIds,
+    granulePgModel,
+    granules,
+    movedGranules,
+    collectionId2,
+    collectionId,
+    collection,
+    collection2,
+    knex,
+  } = t.context;
+  movedGranules.splice(-5);
+  movedGranules.push(await simulateGranuleUpdate(knex, granules.slice(5), collection,
+    collectionId, collectionId2));
+
+  const testPostMoveApiGranules = movedGranules.flat();
+  const response = await request(app)
+    .patch('/granules/')
+    .set('Accept', 'application/json')
+    .set('Authorization', `Bearer ${jwtAuthToken}`)
+    .send(testPostMoveApiGranules)
+    .expect(200);
+
+  const body = response.body;
+  t.true(body.message.includes('Successfully updated granules'));
+  const returnedGranules = await Promise.all(granuleIds.map((id) =>
+    getUniqueGranuleByGranuleId(knex, id, granulePgModel)));
+
+  for (const granule of returnedGranules) {
+    const apiGranule = await translatePostgresGranuleResultToApiGranule(knex, {
+      ...granule,
+      collectionName: collection2.name,
+      collectionVersion: collection2.version,
+    });
+    // now every granule should be part of collection 2
+    t.true(apiGranule.collectionId === collectionId2);
+    for (const file of apiGranule.files) {
+      t.true(file.key.includes(collectionId2));
+      t.true(file.bucket.includes(collectionId2));
+    }
+  }
+});
diff --git a/packages/db/src/index.ts b/packages/db/src/index.ts
index 3a417e174b5..1ab74257863 100644
--- a/packages/db/src/index.ts
+++ b/packages/db/src/index.ts
@@ -134,6 +134,7 @@ export {
  getGranulesByApiPropertiesQuery,
  getGranulesByGranuleId,
  getGranuleAndCollection,
+  updateGranulesAndFiles,
} from './lib/granule';
 
export {
diff --git a/packages/db/src/lib/granule.ts b/packages/db/src/lib/granule.ts
index b6e2ca273fb..aebd27f648d 100644
--- a/packages/db/src/lib/granule.ts
+++ b/packages/db/src/lib/granule.ts
@@ -6,19 +6,25 @@ import {
  deconstructCollectionId,
} from '@cumulus/message/Collections';
 
+import { ApiGranuleRecord } from '@cumulus/types/api/granules';
import { RecordDoesNotExist } from '@cumulus/errors';
import Logger from '@cumulus/logger';
 
+import { createRejectableTransaction } from '../database';
import { CollectionPgModel } from '../models/collection';
import { GranulePgModel } from '../models/granule';
import { GranulesExecutionsPgModel } from '../models/granules-executions';
import { PostgresGranule, PostgresGranuleRecord } from '../types/granule';
import { GranuleWithProviderAndCollectionInfo } from '../types/query';
import { UpdatedAtRange } from '../types/record';
+import { translateApiGranuleToPostgresGranule } from '../translate/granules';
+import { translateApiFiletoPostgresFile } from '../translate/file';
 
const { deprecate } = require('@cumulus/common/util');
const { TableNames } = require('../tables');
 
+const log = new Logger({ sender: '@cumulus/db/lib/granule' });
+
export const getGranuleCollectionId = async (
  knexOrTransaction: Knex,
  granule: { collection_cumulus_id: number }
@@ -354,3 +360,61 @@ export const getGranulesByGranuleId = async (
    .where({ granule_id: granuleId });
  return records;
};
+
+/**
+ * Update granules' PG records and their files' PG records after a collection move
+ *
+ * @param {Knex | Knex.Transaction} knexOrTransaction - DB client or transaction
+ * @param {Array<ApiGranuleRecord>} granules - updated ApiGranule records
+ * @returns {Promise<void>}
+ */
+export const updateGranulesAndFiles = async (
+  knexOrTransaction: Knex | Knex.Transaction,
+  granules: Array<ApiGranuleRecord>
+): Promise<void> => {
+  const {
+    granules: granulesTable,
+    files: filesTable,
+  } = TableNames;
+
+  try {
+    await createRejectableTransaction(knexOrTransaction, async (trx) => {
+      await Promise.all(granules.map(async (granule) => {
+        const pgGranule = await translateApiGranuleToPostgresGranule({
+          dynamoRecord: granule,
+          knexOrTransaction,
+        });
+        await trx(granulesTable).where('granule_id', '=', pgGranule.granule_id).update(
+          {
+            collection_cumulus_id: pgGranule.collection_cumulus_id,
+            updated_at: pgGranule.updated_at,
+            status: pgGranule.status,
+            last_update_date_time: pgGranule.last_update_date_time,
+          }
+        );
+        if (granule.files) {
+          await Promise.all(granule.files.map(async (file) => {
+            const pgFile = translateApiFiletoPostgresFile(
+              {
+                ...file,
+                granuleId: pgGranule.granule_id,
+              }
+            );
+
+            await trx(filesTable).where('file_name', '=', String(pgFile.file_name)).update(
+              {
+                updated_at: pgGranule.updated_at,
+                bucket: pgFile.bucket,
+                key: pgFile.key,
+                path: pgFile.path,
+              }
+            );
+          }));
+        }
+      }));
+    });
+  } catch (thrownError) {
+    log.error(`Write Granule and Files failed: ${JSON.stringify(thrownError)}`);
+    throw thrownError;
+  }
+};
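
For reviewers, a minimal usage sketch of the new `updateGranules` client helper, assuming a deployed stack. The stack prefix, collection, and file values below are hypothetical, mirroring the fixtures the tests build with `simulateGranuleUpdate`; a real payload would come from the output of a move-collections task.

```ts
// Sketch: push post-move granule records to the new PATCH /granules endpoint.
import { updateGranules } from '@cumulus/api-client/granules';
import { ApiGranule } from '@cumulus/types/api/granules';

// Hypothetical post-move payload: collectionId points at the destination
// collection and the file buckets/keys have already been rewritten to match.
const movedGranules: ApiGranule[] = [
  {
    granuleId: 'granuleId___0',
    collectionId: 'MOD09GQ___007', // destination collection
    status: 'completed',
    updatedAt: Date.now(),
    files: [
      {
        bucket: 'MOD09GQ___007--bucket',
        key: 'MOD09GQ___007/granuleId___0/key-hdf.pem',
        fileName: 'granuleId___0.hdf',
      },
    ],
  },
];

(async () => {
  // Issues PATCH /granules against the deployed API; resolves once the
  // granule and file rows have been updated in PG by updateGranulesAndFiles.
  await updateGranules({ prefix: 'my-cumulus-stack', body: movedGranules });
})();
```

Note the PG-side update is wrapped in `createRejectableTransaction`, so a failure on any granule or file rolls back the whole batch; the ES update remains a TODO in the endpoint.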