diff --git a/.eslintignore b/.eslintignore index c95db6dce80..33f0e28918b 100644 --- a/.eslintignore +++ b/.eslintignore @@ -36,3 +36,5 @@ _book/** /packages/object-store/*.js /packages/lzards-api-client/*.d.ts /packages/lzards-api-client/*.js +/tasks/move-granule-collections/*.js +/tasks/move-granule-collections/*.d.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e1d09cbd68..4200356d362 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). ### Added +- **CUMULUS-3757** + - Added a database helper function to assist with updating a granule and its files PG fields when moving the granule(s) across collections - **CUMULUS-3919** - Added terraform variables `disableSSL` and `rejectUnauthorized` to `tf-modules/cumulus-rds-tf` module. diff --git a/docs/tasks.md b/docs/tasks.md index e0a9b7a4f34..081b488fc65 100644 --- a/docs/tasks.md +++ b/docs/tasks.md @@ -80,6 +80,15 @@ Move granule files from staging to final location --- +### [@cumulus/move-granule-collections](https://github.com/nasa/cumulus/tree/master/tasks/move-granule-collections) + +Move granule files and records in cumulus datastores from one collection to another + +- Schemas: See this module's [schema definitions](https://github.com/nasa/cumulus/tree/master/tasks/move-granule-collections/schemas). 
+- Resources: [npm](https://npmjs.com/package/@cumulus/move-granule-collections) | [source](https://github.com/nasa/cumulus) | [web](https://github.com/nasa/cumulus/tree/master/tasks/move-granule-collections) + +--- + ### [@cumulus/orca-copy-to-archive-adapter](https://github.com/nasa/cumulus/tree/master/tasks/orca-copy-to-archive-adapter) Adapter to invoke orca copy-to-archive lambda diff --git a/example/cumulus-tf/ecs_move_granule_collections.asl.json b/example/cumulus-tf/ecs_move_granule_collections.asl.json new file mode 100644 index 00000000000..1f2730a3af2 --- /dev/null +++ b/example/cumulus-tf/ecs_move_granule_collections.asl.json @@ -0,0 +1,30 @@ +{ + "Comment": "Moves granules across collections", + "StartAt": "EcsTaskMoveGranuleCollections", + "States": { + "EcsTaskMoveGranuleCollections": { + "Parameters": { + "cma": { + "event.$": "$", + "task_config": { + "buckets": "{$.meta.buckets}", + "provider": "{$.meta.provider}", + "collection": "{$.meta.collection}" + } + } + }, + "Type": "Task", + "Resource": "${ecs_task_move_granule_collections}", + "TimeoutSeconds": 60, + "Retry": [ + { + "ErrorEquals": [ + "States.Timeout" + ], + "MaxAttempts": 1 + } + ], + "End": true + } + } +} diff --git a/example/cumulus-tf/ecs_move_granule_collections.tf b/example/cumulus-tf/ecs_move_granule_collections.tf new file mode 100644 index 00000000000..9dfc0165ab6 --- /dev/null +++ b/example/cumulus-tf/ecs_move_granule_collections.tf @@ -0,0 +1,17 @@ +module "ecs_move_granule_collections" { + source = "../../tf-modules/workflow" + + prefix = var.prefix + name = "ECSMoveGranuleCollectionsWorkflow" + workflow_config = module.cumulus.workflow_config + system_bucket = var.system_bucket + tags = local.tags + + + state_machine_definition = templatefile( + "${path.module}/ecs_move_granule_collections.asl.json", + { + ecs_task_move_granule_collections: data.terraform_remote_state.ingest.outputs.move_granule_collections_ecs_task_id + } + ) +} diff --git a/example/cumulus-tf/main.tf 
b/example/cumulus-tf/main.tf index 172ac87e34f..da4fc073e91 100644 --- a/example/cumulus-tf/main.tf +++ b/example/cumulus-tf/main.tf @@ -53,6 +53,11 @@ data "terraform_remote_state" "data_persistence" { config = var.data_persistence_remote_state_config workspace = terraform.workspace } +data "terraform_remote_state" "ingest" { + backend = "s3" + config = var.ingest_remote_state_config + workspace = terraform.workspace +} data "aws_lambda_function" "sts_credentials" { function_name = "gsfc-ngap-sh-s3-sts-get-keys" diff --git a/example/cumulus-tf/variables.tf b/example/cumulus-tf/variables.tf index 938c83a1193..ebc8d127538 100644 --- a/example/cumulus-tf/variables.tf +++ b/example/cumulus-tf/variables.tf @@ -106,6 +106,11 @@ variable "data_persistence_remote_state_config" { type = object({ bucket = string, key = string, region = string }) } +variable "ingest_remote_state_config" { + type = object({ bucket = string, key = string, region = string }) +} + + variable "s3_replicator_config" { type = object({ source_bucket = string, source_prefix = string, target_bucket = string, target_prefix = string, target_region = optional(string) }) default = null diff --git a/example/spec/parallel/moveGranuleCollections/MoveGranuleCollectionWorkflowSpec.js b/example/spec/parallel/moveGranuleCollections/MoveGranuleCollectionWorkflowSpec.js new file mode 100644 index 00000000000..ee543fa4823 --- /dev/null +++ b/example/spec/parallel/moveGranuleCollections/MoveGranuleCollectionWorkflowSpec.js @@ -0,0 +1,61 @@ +const { deleteExecution } = require('@cumulus/api-client/executions'); +const { ActivityStep } = require('@cumulus/integration-tests/sfnStep'); +const { getExecution } = require('@cumulus/api-client/executions'); + +const { buildAndExecuteWorkflow } = require('../../helpers/workflowUtils'); +const { loadConfig } = require('../../helpers/testUtils'); +const { waitForApiStatus } = require('../../helpers/apiUtils'); + +const activityStep = new ActivityStep(); + +describe('The 
MoveGranuleCollection workflow using ECS', () => { + let workflowExecution; + let config; + + beforeAll(async () => { + config = await loadConfig(); + + workflowExecution = await buildAndExecuteWorkflow( + config.stackName, + config.bucket, + 'ECSMoveGranuleCollections' + ); + }); + + afterAll(async () => { + await deleteExecution({ prefix: config.stackName, executionArn: workflowExecution.executionArn }); + }); + + it('executes successfully', () => { + expect(workflowExecution.status).toEqual('completed'); + }); + + describe('the HelloWorld ECS', () => { + let activityOutput; + + beforeAll(async () => { + activityOutput = await activityStep.getStepOutput( + workflowExecution.executionArn, + 'EcsTaskHelloWorld' + ); + }); + + it('output is Hello World', () => { + expect(activityOutput.payload).toEqual({ hello: 'Hello World' }); + }); + }); + + describe('the reporting lambda has received the cloudwatch stepfunction event and', () => { + it('the execution record is added to the PostgreSQL database', async () => { + const record = await waitForApiStatus( + getExecution, + { + prefix: config.stackName, + arn: workflowExecution.executionArn, + }, + 'completed' + ); + expect(record.status).toEqual('completed'); + }); + }); +}); diff --git a/example/spec/parallel/moveGranuleCollections/MoveGranuleCollectionsSpec.js b/example/spec/parallel/moveGranuleCollections/MoveGranuleCollectionsSpec.js new file mode 100644 index 00000000000..fff27013f2a --- /dev/null +++ b/example/spec/parallel/moveGranuleCollections/MoveGranuleCollectionsSpec.js @@ -0,0 +1,296 @@ +'use strict'; + +const { InvokeCommand } = require('@aws-sdk/client-lambda'); +const { lambda } = require('@cumulus/aws-client/services'); +const { + promiseS3Upload, + deleteS3Object, +} = require('@cumulus/aws-client/S3'); +const { waitForListObjectsV2ResultCount } = require('@cumulus/integration-tests'); +const { + granules, + collections, +} = require('@cumulus/api-client'); + +const path = require('path'); +const fs = 
require('fs'); +const { v4: uuidv4 } = require('uuid'); +const { loadConfig } = require('../../helpers/testUtils'); +const { constructCollectionId } = require('../../../../packages/message/Collections'); +describe('when moveGranulesCollection is called', () => { + let stackName; + const sourceUrlPrefix = `source_path/${uuidv4()}`; + const targetUrlPrefix = `target_path/${uuidv4()}`; + const originalCollection = { + files: [ + { + regex: '^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$', + sampleFileName: 'MOD11A1.A2017200.h19v04.006.2017201090724.hdf', + bucket: 'protected', + }, + { + regex: '^BROWSE\\.MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$', + sampleFileName: 'BROWSE.MOD11A1.A2017200.h19v04.006.2017201090724.hdf', + bucket: 'private', + }, + { + regex: '^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf\\.met$', + sampleFileName: 'MOD11A1.A2017200.h19v04.006.2017201090724.hdf.met', + bucket: 'private', + }, + { + regex: '^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.cmr\\.xml$', + sampleFileName: 'MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml', + bucket: 'protected', + }, + { + regex: '^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_2\\.jpg$', + sampleFileName: 'MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg', + bucket: 'public', + }, + { + regex: '^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_1\\.jpg$', + sampleFileName: 'MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg', + bucket: 'private', + }, + ], + url_path: targetUrlPrefix, + name: 'MOD11A1', + granuleIdExtraction: '(MOD11A1\\.(.*))\\.hdf', + granuleId: '^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$', + dataType: 'MOD11A1', + process: 'modis', + version: '006', + sampleFileName: 'MOD11A1.A2017200.h19v04.006.2017201090724.hdf', + id: 'MOD11A1', + }; + const targetCollection = { + files: [ + { + regex: '^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$', + sampleFileName: 'MOD11A1.A2017200.h19v04.006.2017201090724.hdf', + bucket: 'protected', + }, + { + regex: 
'^BROWSE\\.MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$', + sampleFileName: 'BROWSE.MOD11A1.A2017200.h19v04.006.2017201090724.hdf', + bucket: 'private', + }, + { + regex: '^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf\\.met$', + sampleFileName: 'MOD11A1.A2017200.h19v04.006.2017201090724.hdf.met', + bucket: 'private', + }, + { + regex: '^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.cmr\\.xml$', + sampleFileName: 'MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml', + bucket: 'public', + }, + { + regex: '^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_2\\.jpg$', + sampleFileName: 'MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg', + bucket: 'public', + }, + { + regex: '^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_1\\.jpg$', + sampleFileName: 'MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg', + bucket: 'public', + url_path: `${targetUrlPrefix}/jpg/example2/`, + }, + ], + url_path: targetUrlPrefix, + name: 'MOD11A2', + granuleIdExtraction: '(MOD11A1\\.(.*))\\.hdf', + granuleId: '^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$', + dataType: 'MOD11A2', + process: 'modis', + version: '006', + sampleFileName: 'MOD11A1.A2017200.h19v04.006.2017201090724.hdf', + id: 'MOD11A2', + }; + const processGranule = { + status: 'completed', + collectionId: 'MOD11A1___006', + granuleId: 'MOD11A1.A2017200.h19v04.006.2017201090724', + files: [ + { + key: `${sourceUrlPrefix}/MOD11A1.A2017200.h19v04.006.2017201090724.hdf`, + bucket: 'cumulus-test-sandbox-protected', + type: 'data', + fileName: 'MOD11A1.A2017200.h19v04.006.2017201090724.hdf', + }, + { + key: `${sourceUrlPrefix}/MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg`, + bucket: 'cumulus-test-sandbox-private', + type: 'browse', + fileName: 'MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg', + }, + { + key: `${sourceUrlPrefix}/MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg`, + bucket: 'cumulus-test-sandbox-public', + type: 'browse', + fileName: 'MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg', + }, + { + key: 
`${sourceUrlPrefix}/MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml`, + bucket: 'cumulus-test-sandbox-protected', + type: 'metadata', + fileName: 'MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml', + }, + ], + }; + // let systemBucket; + beforeAll(async () => { + const config = await loadConfig(); + stackName = config.stackName; + }); + + describe('under normal circumstances', () => { + let beforeAllFailed; + let finalFiles; + afterAll(async () => { + await Promise.all(finalFiles.map((fileObj) => deleteS3Object( + fileObj.bucket, + fileObj.key + ))); + }); + beforeAll(async () => { + finalFiles = [ + { + bucket: 'cumulus-test-sandbox-protected', + key: `${targetUrlPrefix}/MOD11A1.A2017200.h19v04.006.2017201090724.hdf`, + }, + { + bucket: 'cumulus-test-sandbox-public', + key: `${targetUrlPrefix}/jpg/example2/MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg`, + }, + { + bucket: 'cumulus-test-sandbox-public', + key: `${targetUrlPrefix}/MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg`, + }, + { + bucket: 'cumulus-test-sandbox-public', + key: `${targetUrlPrefix}/MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml`, + }, + ]; + + const payload = { + meta: { + collection: targetCollection, + buckets: { + internal: { + type: 'cumulus-test-sandbox-internal', + }, + private: { + name: 'cumulus-test-sandbox-private', + type: 'private', + }, + protected: { + name: 'cumulus-test-sandbox-protected', + type: 'protected', + }, + public: { + name: 'cumulus-test-sandbox-public', + type: 'public', + }, + }, + }, + config: { + buckets: '{$.meta.buckets}', + distribution_endpoint: 'https://something.api.us-east-1.amazonaws.com/', + collection: '{$.meta.collection}', + }, + input: { + granules: [ + processGranule, + ], + }, + }; + //upload to cumulus + try { + try { + await collections.createCollection({ + prefix: stackName, + collection: originalCollection, + }); + } catch { + console.log(`collection ${constructCollectionId(originalCollection.name, originalCollection.version)} already 
exists`); + } + try { + await collections.createCollection({ + prefix: stackName, + collection: targetCollection, + }); + } catch { + console.log(`collection ${constructCollectionId(targetCollection.name, targetCollection.version)} already exists`); + } + try { + await granules.createGranule({ + prefix: stackName, + body: processGranule, + }); + } catch { + console.log(`granule ${processGranule.granuleId} already exists`); + } + await Promise.all(processGranule.files.map(async (file) => { + let body; + if (file.type === 'metadata') { + body = fs.createReadStream(path.join(__dirname, 'data/meta.xml')); + } else { + body = file.key; + } + await promiseS3Upload({ + params: { + Bucket: file.bucket, + Key: file.key, + Body: body, + }, + }); + })); + const { $metadata } = await lambda().send(new InvokeCommand({ + FunctionName: `${stackName}-MoveGranuleCollections`, + InvocationType: 'RequestResponse', + Payload: JSON.stringify({ + cma: { + meta: payload.meta, + task_config: payload.config, + event: { + payload: payload.input, + }, + }, + }), + })); + console.log($metadata.httpStatusCode); + if ($metadata.httpStatusCode >= 400) { + console.log(`lambda invocation to set up failed, code ${$metadata.httpStatusCode}`); + beforeAllFailed = true; + } + + await Promise.all(finalFiles.map((file) => expectAsync( + waitForListObjectsV2ResultCount({ + bucket: file.bucket, + prefix: file.key, + desiredCount: 1, + interval: 5 * 1000, + timeout: 30 * 1000, + }) + ).toBeResolved())); + } catch (error) { + console.log(`files do not appear to have been moved: error: ${error}`); + beforeAllFailed = false; + } + }); + it('moves the granule data in s3', () => { + if (beforeAllFailed) fail('beforeAllFailed'); + }); + it('updates the granule data in cumulus', async () => { + if (beforeAllFailed) fail('beforeAllFailed'); + const cumulusGranule = await granules.getGranule({ + prefix: stackName, + granuleId: processGranule.granuleId, + }); + 
expect(cumulusGranule.granuleId).toEqual(processGranule.granuleId); + expect(cumulusGranule.collectionId).toEqual(constructCollectionId(targetCollection.name, targetCollection.version)); + }); + }); +}); diff --git a/example/spec/parallel/moveGranuleCollections/data/meta.xml b/example/spec/parallel/moveGranuleCollections/data/meta.xml new file mode 100644 index 00000000000..8c2a07bf41d --- /dev/null +++ b/example/spec/parallel/moveGranuleCollections/data/meta.xml @@ -0,0 +1,210 @@ + + + MOD11A1.A2017200.h19v04.006.2017201090724 + 2017-11-20T23:02:40.055807 + 2017-11-20T23:02:40.055814 + + MOD11A1 + 006 + + + further update is anticipated + reprocessed + MOD11A1.A2017200.h19v04.006.2017201090724.hdf + BOTH + 2015-07-02T16:47:38.000Z + 6.4.4AS + + + 6.4.11 + + + + 2003-02-19T00:00:00Z + 2003-02-19T23:59:59Z + + + + + + + + + -70.004161028804404 + -0.004166666666662 + + + -60.004177439215297 + -0.004166666666662 + + + -60.929844150213498 + -9.995833333333330 + + + -71.084093041393103 + -9.995833333333330 + + + + + + + + + MOD 1KM L3 LST + + 0 + 0 + 0 + 92 + + + Passed + No automatic quality assessment is performed in the PGE. 
+ Not Investigated + See http://landweb.nascom.nasa.gov/cgi-bin/QA_WWW/qaFlagPage.cgi?sat + + + + + + Terra + + + MODIS + + + MODIS + + + + + + + + + QAFRACTIONGOODQUALITY + + 0.0285983 + + + + QAPERCENTNOTPRODUCEDOTHER + + 0 + + + + CLOUD_CONTAMINATED_LST_SCREENED + + YES + + + + VERTICALTILENUMBER + + 09 + + + + QAFRACTIONOTHERQUALITY + + 0.0543250 + + + + QAPERCENTGOODQUALITY + + 3 + + + + HORIZONTALTILENUMBER + + 11 + + + + QAFRACTIONNOTPRODUCEDCLOUD + + 0.9170767 + + + + TileID + + 51011009 + + + + QAFRACTIONNOTPRODUCEDOTHER + + 0.0000000 + + + + identifier_product_doi + + 10.5067/MODIS/MOD11A1.006 + + + + N_GRAN_POINTERS + + 28 + + + + QAPERCENTNOTPRODUCEDCLOUD + + 92 + + + + identifier_product_doi_authority + + http://dx.doi.org + + + + QAPERCENTOTHERQUALITY + + 5 + + + + + MOD03.A2003050.0315.006.2012268032410.hdf + MOD021KM.A2003050.0315.006.2014220090715.hdf + MOD35_L2.A2003050.0315.006.2014320160924.hdf + MOD07_L2.A2003050.0315.006.2014320161105.hdf + MOD03.A2003050.1355.006.2012268034643.hdf + + + 11 + 09 + MODIS Tile SIN + + + + https://fvk4vim143.execute-api.us-east-1.amazonaws.com/dev/MOD11A1.A2017200.h19v04.006.2017201090724.hdf + Download MOD11A1.A2017200.h19v04.006.2017201090724.hdf + + + http://cumulus-test-sandbox-public.s3.amazonaws.com/MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg + Download MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg + + + http://cumulus-test-sandbox-public.s3.amazonaws.com/MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg + Download MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg + + + https://fvk4vim143.execute-api.us-east-1.amazonaws.com/dev/MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml + Download MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml + + + true + true + 92 + diff --git a/package.json b/package.json index 149e9ae5efb..52d234e3437 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "bootstrap-no-build-no-scripts": "lerna bootstrap --no-ci --force-local --ignore-scripts", 
"bootstrap-no-build-no-concurrency": "lerna bootstrap --no-ci --force-local --concurrency 1", "bootstrap-no-build-quiet": "lerna bootstrap --no-ci --force-local --loglevel=error", - "ci:bootstrap": "lerna bootstrap --no-ci --force-local --ignore-scripts && lerna run prepublish", + "ci:bootstrap": "lerna bootstrap --no-ci --force-local && lerna run prepublish", "ci:bootstrap-no-scripts": "lerna bootstrap --no-ci --force-local --ignore-scripts", "ci:bootstrap-no-scripts-quiet": "lerna bootstrap --no-ci --force-local --ignore-scripts --loglevel=error", "update": "lerna version --exact --force-publish --no-git-tag-version --no-push", diff --git a/packages/api-client/src/granules.ts b/packages/api-client/src/granules.ts index 305a157defb..2cbd9c7936b 100644 --- a/packages/api-client/src/granules.ts +++ b/packages/api-client/src/granules.ts @@ -602,6 +602,40 @@ export const associateExecutionWithGranule = async (params: { }); }; +/** + * Update granules to new details in cumulus + * POST /granules/bulk + * + * @param params - params + * @param params.prefix - the prefix configured for the stack + * @param params.body - body to pass the API lambda + * @param params.callback - async function to invoke the api lambda + * that takes a prefix / user payload. 
Defaults + * to cumulusApiClient.invokeApifunction to invoke the + * api lambda + * @returns - the response from the callback + */ +export const updateGranules = async (params: { + prefix: string, + body: ApiGranule[], + callback?: InvokeApiFunction +}): Promise<unknown> => { + const { prefix, body, callback = invokeApi } = params; + return await callback({ + prefix: prefix, + payload: { + httpMethod: 'PATCH', + resource: '/{proxy+}', + headers: { + 'Content-Type': 'application/json', + }, + path: '/granules/', + body: JSON.stringify(body), + }, + expectedStatusCodes: 200, + }); +}; + /** * Bulk operations on granules stored in cumulus * POST /granules/bulk diff --git a/packages/api/endpoints/granules.js b/packages/api/endpoints/granules.js index f3b3ed47544..94db3e9ef14 100644 --- a/packages/api/endpoints/granules.js +++ b/packages/api/endpoints/granules.js @@ -25,6 +25,7 @@ const { translatePostgresCollectionToApiCollection, translatePostgresGranuleToApiGranule, getGranuleAndCollection, + updateGranulesAndFiles, } = require('@cumulus/db'); const { deleteGranuleAndFiles } = require('../src/lib/granule-delete'); @@ -846,6 +847,34 @@ async function getByGranuleId(req, res) { return res.send({ ...result, recoveryStatus }); } +/** + * Based on a move-collections-task, will update a list of + * moved granules' records in PG and ES + * + * @param {Object} req - express request object + * @param {Object} res - express response object + * @returns {Promise} the promise of express response object + */ +async function updateGranulesAndFilesCollectionRecords(req, res) { + const { + knex = await getKnexClient(), + } = req.testContext || {}; + const payload = req.body; + try { + await updateGranulesAndFiles(knex, payload); + // update ES + } catch (error) { + log.error( + 'failed to update granules:', + error + ); + return res.boom.badRequest(errorify(error)); + } + return res.send({ + message: 'Successfully updated granules', + }); +} + async function bulkOperations(req, res) { const
payload = req.body; @@ -1009,6 +1038,7 @@ router.patch('/:granuleId', requireApiVersion(2), patchByGranuleId); router.patch('/:collectionId/:granuleId', requireApiVersion(2), patch); router.put('/:collectionId/:granuleId', requireApiVersion(2), put); +router.patch('/', updateGranulesAndFilesCollectionRecords); router.post( '/bulk', validateBulkGranulesRequest, @@ -1039,5 +1069,6 @@ module.exports = { put, patch, patchGranule, + updateGranulesAndFilesCollectionRecords, router, }; diff --git a/packages/api/tests/endpoints/test-granules-updateGranulesAndFilesCollection.js b/packages/api/tests/endpoints/test-granules-updateGranulesAndFilesCollection.js new file mode 100644 index 00000000000..3a9e01e47e3 --- /dev/null +++ b/packages/api/tests/endpoints/test-granules-updateGranulesAndFilesCollection.js @@ -0,0 +1,292 @@ +/* eslint-disable no-await-in-loop */ +const test = require('ava'); +const range = require('lodash/range'); + +const cryptoRandomString = require('crypto-random-string'); +const { + CollectionPgModel, + destroyLocalTestDb, + fakeCollectionRecordFactory, + fakeFileRecordFactory, + fakeGranuleRecordFactory, + FilePgModel, + generateLocalTestDb, + getUniqueGranuleByGranuleId, + GranulePgModel, + localStackConnectionEnv, + migrationDir, + translatePostgresCollectionToApiCollection, + translatePostgresGranuleResultToApiGranule, +} = require('@cumulus/db'); +const { + createBucket, + recursivelyDeleteS3Bucket, + s3PutObject, +} = require('@cumulus/aws-client/S3'); +const { randomString, randomId } = require('@cumulus/common/test-utils'); +const { constructCollectionId } = require('@cumulus/message/Collections'); +const models = require('../../models'); + +const { request } = require('../helpers/request'); + +// Dynamo mock data factories +const { + createFakeJwtAuthToken, + setAuthorizedOAuthUsers, +} = require('../../lib/testUtils'); + +const testDbName = `granules_${cryptoRandomString({ length: 10 })}`; + +let accessTokenModel; +let jwtAuthToken; + 
+process.env.AccessTokensTable = randomId('token'); +process.env.stackName = randomId('stackname'); +process.env.system_bucket = randomId('system-bucket'); +process.env.TOKEN_SECRET = randomId('secret'); +process.env.backgroundQueueUrl = randomId('backgroundQueueUrl'); + +// import the express app after setting the env variables +const { app } = require('../../app'); + +/** + * Simulate granule records post-collection-move for database updates test + * + * @param {Knex | Knex.Transaction} knexOrTransaction - DB client or transaction + * @param {Array} [granules] - granule records to update + * @param {Object} [collection] - current collection of granules used for translation + * @param {string} [collectionId] - collectionId of current granules + * @param {string} [collectionId2] - collectionId of collection that the granule is moving to + * @returns {Array} - list of updated apiGranules (moved to new collection) + */ +const simulateGranuleUpdate = async (knex, granules, collection, collectionId, collectionId2) => { + const movedGranules = []; + for (const granule of granules) { + const postMoveApiGranule = await translatePostgresGranuleResultToApiGranule(knex, { + ...granule, + collectionName: collection.name, + collectionVersion: collection.version, + }); + postMoveApiGranule.collectionId = collectionId2; + postMoveApiGranule.updatedAt = Date.now(); + postMoveApiGranule.lastUpdateDateTime = new Date().toISOString(); + for (const apiFile of postMoveApiGranule.files) { + apiFile.bucket = apiFile.bucket.replace(collectionId, collectionId2); + apiFile.key = apiFile.key.replace(collectionId, collectionId2); + apiFile.updatedAt = Date.now(); + } + movedGranules.push(postMoveApiGranule); + } + return movedGranules; +}; + +test.before(async (t) => { + const { knexAdmin, knex } = await generateLocalTestDb( + testDbName, + migrationDir + ); + process.env.CMR_ENVIRONMENT = 'SIT'; + process.env = { + ...process.env, + ...localStackConnectionEnv, + PG_DATABASE: testDbName, + 
}; + + // create a fake bucket + await createBucket(process.env.system_bucket); + + // create a workflow template file + const tKey = `${process.env.stackName}/workflow_template.json`; + await s3PutObject({ Bucket: process.env.system_bucket, Key: tKey, Body: '{}' }); + + const username = randomString(); + await setAuthorizedOAuthUsers([username]); + + accessTokenModel = new models.AccessToken(); + await accessTokenModel.createTable(); + + jwtAuthToken = await createFakeJwtAuthToken({ accessTokenModel, username }); + t.context.knexAdmin = knexAdmin; + t.context.knex = knex; + + t.context.granulePgModel = new GranulePgModel(); + t.context.collectionPgModel = new CollectionPgModel(); + t.context.filePgModel = new FilePgModel(); + + // set up 2 collections + t.context.collection = fakeCollectionRecordFactory(); + t.context.collection2 = fakeCollectionRecordFactory(); + t.context.collectionId = constructCollectionId( + t.context.collection.name, + t.context.collection.version + ); + t.context.collectionId2 = constructCollectionId( + t.context.collection2.name, + t.context.collection2.version + ); + const collectionResponse = await t.context.collectionPgModel.create( + t.context.knex, + t.context.collection + ); + const collectionResponse2 = await t.context.collectionPgModel.create( + t.context.knex, + t.context.collection2 + ); + t.context.collectionCumulusId = collectionResponse[0].cumulus_id; + t.context.collectionCumulusId2 = collectionResponse2[0].cumulus_id; + t.context.apiCollection1 = translatePostgresCollectionToApiCollection(collectionResponse[0]); + t.context.apiCollection2 = translatePostgresCollectionToApiCollection(collectionResponse2[0]); + + // create 10 granules in one collection, 0 in the other + t.context.granuleIds = range(10).map((num) => 'granuleId___' + num); + + t.context.granulePgModel = new GranulePgModel(); + t.context.granules = range(10).map((num) => fakeGranuleRecordFactory({ + granule_id: t.context.granuleIds[num], + collection_cumulus_id: 
t.context.collectionCumulusId, + cumulus_id: num, + })); + t.context.pgGranules = await t.context.granulePgModel.insert( + knex, + t.context.granules + ); + + t.context.movedGranules = []; + + t.context.files = []; + // create fake files for each of the ten granules (3 per granule) + for (const pgGranule of t.context.granules) { + t.context.files.push( + fakeFileRecordFactory({ + granule_cumulus_id: pgGranule.cumulus_id, + file_name: pgGranule.granule_id + '.hdf', + updated_at: new Date().toISOString(), + bucket: t.context.collectionId + '--bucket', + key: t.context.collectionId + pgGranule.granule_id + '/key-hdf.pem', + path: t.context.collectionId + '/' + pgGranule.granule_id, + }), + fakeFileRecordFactory({ + granule_cumulus_id: pgGranule.cumulus_id, + file_name: pgGranule.granule_id + '.txt', + updated_at: new Date().toISOString(), + bucket: t.context.collectionId + '--bucket', + key: t.context.collectionId + pgGranule.granule_id + '/key-txt.pem', + path: t.context.collectionId + '/' + pgGranule.granule_id, + }), + fakeFileRecordFactory({ + granule_cumulus_id: pgGranule.cumulus_id, + file_name: pgGranule.granule_id + '.cmr', + updated_at: new Date().toISOString(), + bucket: t.context.collectionId + '--bucket', + key: t.context.collectionId + pgGranule.granule_id + '/key-cmr.pem', + path: t.context.collectionId + '/' + pgGranule.granule_id, + }) + ); + } + + t.context.pgFiles = await t.context.filePgModel.insert(knex, t.context.files); + // update 1/2 of the granules to be moved to the new collection + t.context.movedGranules.push(await simulateGranuleUpdate(knex, t.context.granules.slice(0, 5), + t.context.collection, t.context.collectionId, t.context.collectionId2)); + + // the other half will be unmoved but translated to an apiGranule + t.context.movedGranules.push(await simulateGranuleUpdate(knex, t.context.granules.slice(5), + t.context.collection, t.context.collectionId, t.context.collectionId)); + + t.context.movedGranules = 
t.context.movedGranules.flat(); +}); + +test.after.always(async (t) => { + await accessTokenModel.deleteTable(); + await recursivelyDeleteS3Bucket(process.env.system_bucket); + await destroyLocalTestDb({ + knex: t.context.knex, + knexAdmin: t.context.knexAdmin, + testDbName, + }); +}); + +test.serial('PATCH successfully updates a partial list of granules based on the collectionId change ', async (t) => { + const { + granuleIds, + granulePgModel, + movedGranules, + collectionId2, + collectionId, + collection, + collection2, + knex, + } = t.context; + const response = await request(app) + .patch('/granules/') + .set('Accept', 'application/json') + .set('Authorization', `Bearer ${jwtAuthToken}`) + .send(movedGranules) + .expect(200); + + const body = response.body; + t.true(body.message.includes('Successfully updated granules')); + const returnedGranules = await Promise.all(granuleIds.map((id) => + getUniqueGranuleByGranuleId(knex, id, granulePgModel))); + + for (const granule of returnedGranules) { + const testCollection = granule.cumulus_id >= 5 ? collection : collection2; + const testCollectionId = granule.cumulus_id >= 5 ? 
collectionId : collectionId2; + const apiGranule = await translatePostgresGranuleResultToApiGranule(knex, { + ...granule, + collectionName: testCollection.name, + collectionVersion: testCollection.version, + }); + // the movedGranules param only has 1/2 of the granules to be moved to collection 2 + // here we can check based on the granule's cumulus id which collection it should be a part of + t.true(apiGranule.collectionId === testCollectionId); + for (const file of apiGranule.files) { + t.true(file.key.includes(testCollectionId)); + t.true(file.bucket.includes(testCollectionId)); + } + } +}); + +test.serial('PATCH successfully updates a complete list of granules, 1/2 of which have already been moved', async (t) => { + const { + granuleIds, + granulePgModel, + granules, + movedGranules, + collectionId2, + collectionId, + collection, + collection2, + knex, + } = t.context; + movedGranules.splice(-5); + movedGranules.push(await simulateGranuleUpdate(knex, granules.slice(5), collection, + collectionId, collectionId2)); + + const testPostMoveApiGranules = movedGranules.flat(); + const response = await request(app) + .patch('/granules/') + .set('Accept', 'application/json') + .set('Authorization', `Bearer ${jwtAuthToken}`) + .send(testPostMoveApiGranules) + .expect(200); + + const body = response.body; + t.true(body.message.includes('Successfully updated granules')); + const returnedGranules = await Promise.all(granuleIds.map((id) => + getUniqueGranuleByGranuleId(knex, id, granulePgModel))); + + for (const granule of returnedGranules) { + const apiGranule = await translatePostgresGranuleResultToApiGranule(knex, { + ...granule, + collectionName: collection2.name, + collectionVersion: collection2.version, + }); + // now every granule should be part of collection 2 + t.true(apiGranule.collectionId === collectionId2); + for (const file of apiGranule.files) { + t.true(file.key.includes(collectionId2)); + t.true(file.bucket.includes(collectionId2)); + } + } +}); diff --git 
a/packages/aws-client/src/S3.ts b/packages/aws-client/src/S3.ts index effd620a375..4255ab8ca8c 100644 --- a/packages/aws-client/src/S3.ts +++ b/packages/aws-client/src/S3.ts @@ -185,11 +185,11 @@ export const headObject = ( /** * Test if an object exists in S3 * - * @param {Object} params - same params as https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#headObject-property - * @returns {Promise} a Promise that will resolve to a boolean indicating + * @param params - same params as https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#headObject-property + * @returns a Promise that will resolve to a boolean indicating * if the object exists */ -export const s3ObjectExists = (params: { Bucket: string, Key: string }) => +export const s3ObjectExists = (params: { Bucket: string, Key: string }): Promise => headObject(params.Bucket, params.Key) .then(() => true) .catch((error) => { @@ -1122,16 +1122,6 @@ export const multipartCopyObject = async ( /** * Move an S3 object to another location in S3 - * - * @param {Object} params - * @param {string} params.sourceBucket - * @param {string} params.sourceKey - * @param {string} params.destinationBucket - * @param {string} params.destinationKey - * @param {string} [params.ACL] - an [S3 Canned ACL](https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl) - * @param {boolean} [params.copyTags=false] - * @param {number} [params.chunkSize] - chunk size of the S3 multipart uploads - * @returns {Promise} */ export const moveObject = async ( params: { diff --git a/packages/cmrjs/src/cmr-utils.js b/packages/cmrjs/src/cmr-utils.js index 59fa5ff81b6..fef0625b4fe 100644 --- a/packages/cmrjs/src/cmr-utils.js +++ b/packages/cmrjs/src/cmr-utils.js @@ -32,7 +32,9 @@ const { xmlParseOptions, ummVersionToMetadataFormat, } = require('./utils'); - +/** + * @typedef {import('@cumulus/cmr-client/CMR').CMRConstructorParams} CMRConstructorParams + */ const log = new Logger({ sender: 
'@cumulus/cmrjs/src/cmr-utils' }); function getS3KeyOfFile(file) { @@ -125,6 +127,10 @@ function granuleToCmrFileObject({ granuleId, files = [] }, filterFunc = isCMRFil }); } +/** + * @typedef {import('./types').CMRFile} CMRFile + */ + /** * Reduce granule object array to CMR files array * @@ -132,7 +138,7 @@ function granuleToCmrFileObject({ granuleId, files = [] }, filterFunc = isCMRFil * @param {Function} filterFunc - function to determine if the given file object is a CMR file; defaults to `isCMRFile` * - * @returns {Array} - CMR file object array: { etag, bucket, key, granuleId } + * @returns {Array} - CMR file object array: { etag, bucket, key, granuleId } */ function granulesToCmrFileObjects(granules, filterFunc = isCMRFile) { return granules.flatMap((granule) => granuleToCmrFileObject(granule, filterFunc)); @@ -218,18 +224,16 @@ async function publishUMMGJSON2CMR(cmrFile, cmrClient, revisionId) { * @param {string} cmrRevisionId - Optional CMR Revision ID * if not provided, CMR username and password are used to get a cmr token */ -async function publish2CMR(cmrPublishObject, creds, cmrRevisionId) { +function publish2CMR(cmrPublishObject, creds, cmrRevisionId) { const cmrClient = new CMR(creds); const cmrFileName = getFilename(cmrPublishObject); - // choose xml or json and do the things. 
if (isECHO10Filename(cmrFileName)) { - return await publishECHO10XML2CMR(cmrPublishObject, cmrClient, cmrRevisionId); + return publishECHO10XML2CMR(cmrPublishObject, cmrClient, cmrRevisionId); } if (isUMMGFilename(cmrFileName)) { - return await publishUMMGJSON2CMR(cmrPublishObject, cmrClient, cmrRevisionId); + return publishUMMGJSON2CMR(cmrPublishObject, cmrClient, cmrRevisionId); } - throw new Error(`invalid cmrPublishObject passed to publis2CMR ${JSON.stringify(cmrPublishObject)}`); } @@ -749,7 +753,7 @@ async function updateUMMGMetadata({ * @param {string} cmrConfig.certificate - Launchpad certificate * @param {string} cmrConfig.username - EDL username * @param {string} cmrConfig.passwordSecretName - CMR password secret name - * @returns {Promise} object to create CMR instance - contains the + * @returns {Promise} object to create CMR instance - contains the * provider, clientId, and either launchpad token or EDL username and * password */ diff --git a/packages/cmrjs/src/types.ts b/packages/cmrjs/src/types.ts new file mode 100644 index 00000000000..4986e3bf5d4 --- /dev/null +++ b/packages/cmrjs/src/types.ts @@ -0,0 +1,6 @@ +export interface CMRFile { + key: string + bucket: string +} + +export interface ISOFile extends CMRFile { } diff --git a/packages/db/src/index.ts b/packages/db/src/index.ts index 3a417e174b5..1ab74257863 100644 --- a/packages/db/src/index.ts +++ b/packages/db/src/index.ts @@ -134,6 +134,7 @@ export { getGranulesByApiPropertiesQuery, getGranulesByGranuleId, getGranuleAndCollection, + updateGranulesAndFiles, } from './lib/granule'; export { diff --git a/packages/db/src/lib/granule.ts b/packages/db/src/lib/granule.ts index b6e2ca273fb..aebd27f648d 100644 --- a/packages/db/src/lib/granule.ts +++ b/packages/db/src/lib/granule.ts @@ -6,19 +6,25 @@ import { deconstructCollectionId, } from '@cumulus/message/Collections'; +import { ApiGranuleRecord } from '@cumulus/types/api/granules'; import { RecordDoesNotExist } from '@cumulus/errors'; import 
Logger from '@cumulus/logger'; +import { createRejectableTransaction } from '../database'; import { CollectionPgModel } from '../models/collection'; import { GranulePgModel } from '../models/granule'; import { GranulesExecutionsPgModel } from '../models/granules-executions'; import { PostgresGranule, PostgresGranuleRecord } from '../types/granule'; import { GranuleWithProviderAndCollectionInfo } from '../types/query'; import { UpdatedAtRange } from '../types/record'; +import { translateApiGranuleToPostgresGranule } from '../translate/granules'; +import { translateApiFiletoPostgresFile } from '../translate/file'; const { deprecate } = require('@cumulus/common/util'); const { TableNames } = require('../tables'); +const log = new Logger({ sender: '@cumulus/api/lib/writeRecords/write-granules' }); + export const getGranuleCollectionId = async ( knexOrTransaction: Knex, granule: { collection_cumulus_id: number } @@ -354,3 +360,61 @@ export const getGranulesByGranuleId = async ( .where({ granule_id: granuleId }); return records; }; + +/** + * Change a granules' PG record and its files' PG record based on collection move + * + * @param {Knex | Knex.Transaction} knexOrTransaction - DB client or transaction + * @param {Array} [granules] - updated ApiGranule records + * @returns {Promise} + */ +export const updateGranulesAndFiles = async ( + knexOrTransaction: Knex | Knex.Transaction, + granules: Array +):Promise => { + const { + granules: granulesTable, + files: filesTable, + } = TableNames; + + try { + await createRejectableTransaction(knexOrTransaction, async (trx) => { + await Promise.all(granules.map(async (granule) => { + const pgGranule = await translateApiGranuleToPostgresGranule({ + dynamoRecord: granule, + knexOrTransaction, + }); + await trx(granulesTable).where('granule_id', '=', pgGranule.granule_id).update( + { + collection_cumulus_id: pgGranule.collection_cumulus_id, + updated_at: pgGranule.updated_at, + status: pgGranule.status, + last_update_date_time: 
pgGranule.last_update_date_time, + } + ); + if (granule.files) { + await Promise.all(granule.files.map(async (file) => { + const pgFile = translateApiFiletoPostgresFile( + { + ...file, + granuleId: pgGranule.granule_id, + } + ); + + await trx(filesTable).where('file_name', '=', String(pgFile.file_name)).update( + { + updated_at: pgGranule.updated_at, + bucket: pgFile.bucket, + key: pgFile.key, + path: pgFile.path, + } + ); + })); + } + })); + }); + } catch (thrownError) { + log.error(`Write Granule and Files failed: ${JSON.stringify(thrownError)}`); + throw thrownError; + } +}; diff --git a/packages/errors/src/index.ts b/packages/errors/src/index.ts index 6ce1d9eba8a..2846669d185 100644 --- a/packages/errors/src/index.ts +++ b/packages/errors/src/index.ts @@ -148,6 +148,8 @@ export const MissingRequiredEnvVar = createErrorType('MissingRequiredEnvVar'); export const MissingRequiredEnvVarError = createErrorType('MissingRequiredEnvVarError'); +export const MissingS3FileError = createErrorType('MissingS3FileError'); + export const PDRParsingError = createErrorType('PDRParsingError'); export const ProviderNotFound = createErrorType('ProviderNotFound'); diff --git a/tasks/move-granule-collections/.babelrc b/tasks/move-granule-collections/.babelrc new file mode 100644 index 00000000000..1e3ca31c340 --- /dev/null +++ b/tasks/move-granule-collections/.babelrc @@ -0,0 +1,15 @@ +{ + "comments": false, + "sourceMaps": "both", + "sourceType": "unambiguous", + "presets": [ + ["@babel/preset-env", { + "targets": { + "node": "20.12.2" + } + }] + ], + "plugins": [ + "source-map-support" + ] +} diff --git a/tasks/move-granule-collections/.gitignore b/tasks/move-granule-collections/.gitignore new file mode 100644 index 00000000000..f47c80a2fb8 --- /dev/null +++ b/tasks/move-granule-collections/.gitignore @@ -0,0 +1,4 @@ +/*.js +/*.d.ts +lambda.zip +webpack \ No newline at end of file diff --git a/tasks/move-granule-collections/.npmignore b/tasks/move-granule-collections/.npmignore 
new file mode 100644 index 00000000000..c5aca0ae906 --- /dev/null +++ b/tasks/move-granule-collections/.npmignore @@ -0,0 +1,2 @@ +/nyc.config.js +/tests/ diff --git a/tasks/move-granule-collections/.nycrc.json b/tasks/move-granule-collections/.nycrc.json new file mode 100644 index 00000000000..f0e2078d7e6 --- /dev/null +++ b/tasks/move-granule-collections/.nycrc.json @@ -0,0 +1,10 @@ +{ + "extends": "../../nyc.config.js", + "include": [ + "*.js" + ], + "statements": 92.0, + "functions": 92.0, + "branches": 78.0, + "lines": 91.0 +} \ No newline at end of file diff --git a/tasks/move-granule-collections/README.md b/tasks/move-granule-collections/README.md new file mode 100644 index 00000000000..7a751452b5e --- /dev/null +++ b/tasks/move-granule-collections/README.md @@ -0,0 +1,45 @@ +# @cumulus/move-granule-collections + +This lambda function moves granules between collections in s3 and in postgres + +## Message Configuration + +For more information on configuring a Cumulus Message Adapter task, see [the Cumulus workflow input/output documentation](https://nasa.github.io/cumulus/docs/workflows/input_output). + +### Config + +Config object fields: + +| field name | type | default | values | description +| ---------- | ---- | ------- | ------ | ----------- + +| buckets | object | (required) | | Object specifying AWS S3 buckets used by this task +| collection | object | (required) | | The cumulus-api collection object +| distribution_endpoint | string | (required) | | The API distribution endpoint +| s3MultipartChunksizeMb | number | | | S3 multipart upload chunk size in MB. If none is specified, the default `default_s3_multipart_chunksize_mb` is used. 
+ +### Input + +Input object fields: + +| field name | type | default | description +| ---------- | ---- | ------- | ----------- +| granules | array\<object\> | (required) | List of granule objects + +### Output + +Output object fields: + +| field name | type | default | description +| ---------- | ---- | ------- | ----------- +| granules | array\<object\> | N/A | List of granule objects with updated S3 location information + +## About Cumulus + +Cumulus is a cloud-based data ingest, archive, distribution and management prototype for NASA's future Earth science data streams. + +[Cumulus Documentation](https://nasa.github.io/cumulus) + +## Contributing + +To make a contribution, please [see our contributing guidelines](https://github.com/nasa/cumulus/blob/master/CONTRIBUTING.md). diff --git a/tasks/move-granule-collections/bin/package.sh b/tasks/move-granule-collections/bin/package.sh new file mode 100755 index 00000000000..e968404c371 --- /dev/null +++ b/tasks/move-granule-collections/bin/package.sh @@ -0,0 +1,16 @@ +#!/bin/sh + +set -e + +rm -rf dist/webpack + +npm run webpack + +cp -R schemas dist/webpack + +( + set -e + + cd dist/webpack + rm -f lambda.zip && node ../../../../bin/zip.js lambda.zip index.js schemas +) diff --git a/tasks/move-granule-collections/package.json b/tasks/move-granule-collections/package.json new file mode 100644 index 00000000000..dab878bb506 --- /dev/null +++ b/tasks/move-granule-collections/package.json @@ -0,0 +1,60 @@ +{ + "name": "@cumulus/move-granule-collections", + "version": "19.1.0", + "description": "Move granule files and postgres records to a new collection", + "main": "dist/index.js", + "directories": { + "test": "tests" + }, + "homepage": "https://github.com/nasa/cumulus/tree/master/tasks/move-granule-collections", + "repository": { + "type": "git", + "url": "https://github.com/nasa/cumulus", + "directory": "tasks/move-granule-collections" + }, + "engines": { + "node": ">=20.12.2" + }, + "scripts": { + "clean": "rm -rf dist", + "package": 
"./bin/package.sh", + "test": "../../node_modules/.bin/ava", + "test:ci": "../../scripts/run_package_ci_unit.sh", + "test:coverage": "../../node_modules/.bin/nyc npm test", + "prepare": "npm run tsc", + "tsc": "../../node_modules/.bin/tsc", + "tsc:listEmittedFiles": "../../node_modules/.bin/tsc --listEmittedFiles", + "watch-test": "../../node_modules/.bin/tsc-watch --onsuccess 'npm test'", + "webpack": "../../node_modules/.bin/webpack", + "coverage": "python ../../scripts/coverage_handler/coverage.py" + }, + "ava": { + "files": [ + "tests/*.js" + ], + "failFast": true, + "serial": true, + "verbose": true, + "timeout": "15m" + }, + "author": "Cumulus Authors", + "license": "Apache-2.0", + "dependencies": { + "@cumulus/api-client": "19.1.0", + "@cumulus/aws-client": "19.1.0", + "@cumulus/cmr-client": "19.1.0", + "@cumulus/cmrjs": "19.1.0", + "@cumulus/common": "19.1.0", + "@cumulus/cumulus-message-adapter-js": "2.2.0", + "@cumulus/distribution-utils": "19.1.0", + "@cumulus/errors": "19.1.0", + "@cumulus/ingest": "19.1.0", + "@cumulus/message": "19.1.0", + "@cumulus/types": "19.1.0", + "lodash": "^4.17.21" + }, + "devDependencies": { + "@cumulus/db": "^19.1.0", + "@cumulus/schemas": "19.1.0" + } +} diff --git a/tasks/move-granule-collections/schemas/config.json b/tasks/move-granule-collections/schemas/config.json new file mode 100644 index 00000000000..bbf6fd6de7f --- /dev/null +++ b/tasks/move-granule-collections/schemas/config.json @@ -0,0 +1,75 @@ +{ + "title": "MoveGranuleCollectionsConfig", + "description": "Describes the config used by the move-granule-collections task", + "type": "object", + "required": [ + "distribution_endpoint", + "collection", + "buckets" + ], + "properties": { + "buckets": { + "type": "object", + "description": "aws s3 buckets used by this task", + "patternProperties": { + "\\S*": { + "description": "bucket configuration for the key'd bucket", + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "name of 
the S3 bucket" + }, + "type": { + "type": "string", + "description": "the type of bucket - i.e. internal, public, private, protected" + } + } + } + } + }, + "distribution_endpoint": { + "type": "string", + "description": "The api distribution endpoint" + }, + "collection": { + "type": "object", + "description": "collection to move granules over to", + "required": [ + "files", + "name" + ], + "properties": { + "name": { "type": "string" }, + "process": { "type": "string" }, + "url_path": { "type": "string" }, + "duplicateHandling": { "type": "string" }, + "files": { + "type": "array", + "items": { + "type": "object", + "required": ["regex", "bucket"], + "properties": { + "regex": { "type": "string" }, + "bucket": { "type": "string" }, + "url_path": { "type": "string" } + } + } + }, + "meta": { + "title": "Optional Metadata for the Collection", + "type": "object", + "additionalProperties": true, + "granuleMetadataFileExtension": { + "description": "Set to the file extension for task to look for the granule metadata information. If none is specified, the granule CMR metadata or ISO metadata file is used", + "type": "string" + } + } + } + }, + "s3MultipartChunksizeMb": { + "type": ["number", "null"], + "description": "S3 multipart upload chunk size in MB. If none is specified, the default default_s3_multipart_chunksize_mb is used." 
+ } + } +} diff --git a/tasks/move-granule-collections/schemas/input.json b/tasks/move-granule-collections/schemas/input.json new file mode 100644 index 00000000000..1ee9e8e74ff --- /dev/null +++ b/tasks/move-granule-collections/schemas/input.json @@ -0,0 +1,71 @@ +{ + "title": "MoveGranuleCollectionsInput", + "description": "Describes the input expected by the move-granule-collections task", + "type": "object", + "required": [ + "granules" + ], + "properties": { + "granules": { + "type": "array", + "description": "Array of all granules", + "items": { + "type": "object", + "required": [ + "granuleId", + "files" + ], + "properties": { + "granuleId": { + "type": "string" + }, + "files": { + "type": "array", + "items": { + "additionalProperties": false, + "type": "object", + "required": [ + "bucket", + "key" + ], + "properties": { + "bucket": { + "description": "Bucket where file is archived in S3", + "type": "string" + }, + "checksum": { + "description": "Checksum value for file", + "type": "string" + }, + "checksumType": { + "description": "Type of checksum (e.g. md5, sha256, etc)", + "type": "string" + }, + "fileName": { + "description": "Name of file (e.g. file.txt)", + "type": "string" + }, + "key": { + "description": "S3 Key for archived file", + "type": "string" + }, + "size": { + "description": "Size of file (in bytes)", + "type": "number" + }, + "source": { + "description": "Source URI of the file from origin system (e.g. S3, FTP, HTTP)", + "type": "string" + }, + "type": { + "description": "Type of file (e.g. 
data, metadata, browse)", + "type": "string" + } + } + } + } + } + } + } + } +} \ No newline at end of file diff --git a/tasks/move-granule-collections/schemas/input.json.template b/tasks/move-granule-collections/schemas/input.json.template new file mode 100644 index 00000000000..1951a2119f7 --- /dev/null +++ b/tasks/move-granule-collections/schemas/input.json.template @@ -0,0 +1,24 @@ +{ + "title": "MoveGranuleCollectionsInput", + "description": "Describes the input expected by the move-granule-collections task", + "type": "object", + "required": [ + "granules" + ], + "properties": { + "granules": { + "type": "array", + "description": "Array of all granules", + "items": { + "type": "object", + "required": ["granuleId", "files"], + "properties": { + "granuleId": { + "type": "string" + }, + "files": "{{files}}" + } + } + } + } +} diff --git a/tasks/move-granule-collections/schemas/output.json b/tasks/move-granule-collections/schemas/output.json new file mode 100644 index 00000000000..61c5c5097b9 --- /dev/null +++ b/tasks/move-granule-collections/schemas/output.json @@ -0,0 +1,68 @@ +{ + "title": "MoveGranuleCollectionsOutput", + "description": "Describes the output produced by the move-granule-collections task", + "type": "object", + "properties": { + "granules": { + "type": "array", + "description": "Array of all granules", + "items": { + "type": "object", + "required": [ + "granuleId", + "files" + ], + "properties": { + "granuleId": { + "type": "string" + }, + "files": { + "type": "array", + "items": { + "additionalProperties": false, + "type": "object", + "required": [ + "bucket", + "key" + ], + "properties": { + "bucket": { + "description": "Bucket where file is archived in S3", + "type": "string" + }, + "checksum": { + "description": "Checksum value for file", + "type": "string" + }, + "checksumType": { + "description": "Type of checksum (e.g. md5, sha256, etc)", + "type": "string" + }, + "fileName": { + "description": "Name of file (e.g. 
file.txt)", + "type": "string" + }, + "key": { + "description": "S3 Key for archived file", + "type": "string" + }, + "size": { + "description": "Size of file (in bytes)", + "type": "number" + }, + "source": { + "description": "Source URI of the file from origin system (e.g. S3, FTP, HTTP)", + "type": "string" + }, + "type": { + "description": "Type of file (e.g. data, metadata, browse)", + "type": "string" + } + } + } + } + } + } + } + } +} \ No newline at end of file diff --git a/tasks/move-granule-collections/schemas/output.json.template b/tasks/move-granule-collections/schemas/output.json.template new file mode 100644 index 00000000000..65e494d1527 --- /dev/null +++ b/tasks/move-granule-collections/schemas/output.json.template @@ -0,0 +1,21 @@ +{ + "title": "MoveGranuleCollectionsOutput", + "description": "Describes the output produced by the move-granule-collections task", + "type": "object", + "properties": { + "granules": { + "type": "array", + "description": "Array of all granules", + "items": { + "type": "object", + "required": ["granuleId", "files"], + "properties": { + "granuleId": { + "type": "string" + }, + "files": "{{files}}" + } + } + } + } +} diff --git a/tasks/move-granule-collections/src/index.ts b/tasks/move-granule-collections/src/index.ts new file mode 100644 index 00000000000..ad68faa8bd1 --- /dev/null +++ b/tasks/move-granule-collections/src/index.ts @@ -0,0 +1,393 @@ +'use strict'; + +import { Context } from 'aws-lambda'; +import { runCumulusTask } from '@cumulus/cumulus-message-adapter-js'; +import get from 'lodash/get'; +import keyBy from 'lodash/keyBy'; +import cloneDeep from 'lodash/cloneDeep'; +import zip from 'lodash/zip'; +// eslint-disable-next-line lodash/import-scope +import { Dictionary } from 'lodash'; +import path from 'path'; +import { MissingS3FileError, DuplicateFile, InvalidArgument } from '@cumulus/errors'; +import { S3 } from '@cumulus/aws-client'; +import { CMR } from '@cumulus/cmr-client'; +import { + unversionFilename, +} from 
'@cumulus/ingest/granule'; +import { + isCMRFile, + isISOFile, + metadataObjectFromCMRFile, + granulesToCmrFileObjects, +} from '@cumulus/cmrjs'; +import { BucketsConfig } from '@cumulus/common'; +import { urlPathTemplate } from '@cumulus/ingest/url-path-template'; +import { isFileExtensionMatched } from '@cumulus/message/utils'; +import { constructCollectionId } from '@cumulus/message/Collections'; +import { log } from '@cumulus/common'; +import { ApiGranule, DuplicateHandling } from '@cumulus/types'; +import { ApiFile } from '@cumulus/types/api/files'; +import { AssertionError } from 'assert'; +import { CumulusMessage } from '@cumulus/types/message'; +import { CMRFile } from '@cumulus/cmrjs/types'; +import { CollectionFile } from '@cumulus/types'; +import { BucketsConfigObject } from '@cumulus/common/types'; +import { getCmrSettings } from '@cumulus/cmrjs/cmr-utils'; +import { CMRConstructorParams } from '@cumulus/cmr-client/CMR'; +import { s3CopyObject } from '@cumulus/aws-client/S3'; +import { updateGranules } from '@cumulus/api-client/granules'; +import { getRequiredEnvVar } from '@cumulus/common/env'; + +const MB = 1024 * 1024; + +interface EventConfig { + collection: { + meta: { + granuleMetadataFileExtension: string, + }, + url_path?: string, + files: Array, + duplicateHandling?: DuplicateHandling, + name: string, + version: string, + }, + buckets: BucketsConfigObject, + s3MultipartChunksizeMb?: number, +} + +interface MoveGranuleCollectionsEvent { + config: EventConfig, + cumulus_config?: { + cumulus_context?: { + forceDuplicateOverwrite?: boolean, + } + }, + input: { + granules: Array, + } +} + +interface ValidApiFile extends ApiFile { + bucket: string, + key: string +} + +/** + * Validates the file matched only one collection.file and has a valid bucket + * config. 
+ */ +function identifyFileMatch( + bucketsConfig: BucketsConfig, + fileName: string, + fileSpecs: Array +): CollectionFile { + const collectionRegexes = fileSpecs.map((spec) => spec.regex); + const matches = fileSpecs.filter( + ((collectionFile) => unversionFilename(fileName).match(collectionFile.regex)) + ); + if (matches.length > 1) { + throw new InvalidArgument(`File (${fileName}) matched more than one of ${JSON.stringify(collectionRegexes)}.`); + } + if (matches.length === 0) { + throw new InvalidArgument(`File (${fileName}) did not match any of ${JSON.stringify(collectionRegexes)}`); + } + const [match] = matches; + if (!bucketsConfig.keyExists(match.bucket)) { + throw new InvalidArgument(`Collection config specifies a bucket key of ${match.bucket}, ` + + `but the configured bucket keys are: ${Object.keys(bucketsConfig).join(', ')}`); + } + return match; +} + +function apiFileIsValid(file: Omit): file is ValidApiFile { + if (file.bucket === undefined || file.key === undefined) { + return false; + } + return true; +} + +function isCMRMetadataFile(file: ApiFile | Omit): boolean { + return file.type === 'metadata'; +} + +function moveRequested( + sourceFile: ValidApiFile, + targetFile: ValidApiFile +): boolean { + return !((sourceFile.key === targetFile.key) && (sourceFile.bucket === targetFile.bucket)); +} + +async function s3MoveNeeded( + sourceFile: ValidApiFile, + targetFile: ValidApiFile, + isMetadataFile: boolean +): Promise { + if (!moveRequested(sourceFile, targetFile)) { + return false; + } + + const targetExists = await S3.s3ObjectExists({ Bucket: targetFile.bucket, Key: targetFile.key }); + /** + * cmrmetadata file must skip duplicate behavior since it will + * intentionally be duplicated during first move. It should be impossible to get here with + * target and *not* source existing, but this is not a normal duplicate situation. 
+ * this is because old cmrfile won't be deleted until after postgres record is updated + */ + if (isMetadataFile) { + if (targetExists) { + return false; + } + return true; + } + const sourceExists = await S3.s3ObjectExists({ Bucket: sourceFile.bucket, Key: sourceFile.key }); + if (targetExists && sourceExists) { + // TODO should this use duplicateHandling? + throw new DuplicateFile(`target location ${{ Bucket: targetFile.bucket, Key: targetFile.key }} already occupied`); + } + if (sourceExists) { + return true; + } + if (targetExists) { + return false; + } + throw new MissingS3FileError(`source location ${{ Bucket: targetFile.bucket, Key: targetFile.key }} doesn't exist`); +} + +async function moveGranulesInS3( + sourceGranules: Array, + targetGranules: Array, + s3MultipartChunksizeMb?: number +): Promise { + await Promise.all( + zip(sourceGranules, targetGranules).map(async ([sourceGranule, targetGranule]) => { + if (sourceGranule?.files === undefined || targetGranule?.files === undefined) { + return null; + } + return Promise.all(zip(sourceGranule.files, targetGranule.files) + .map(async ([sourceFile, targetFile]) => { + if (!(sourceFile && targetFile)) { + return; + } + if (!apiFileIsValid(sourceFile) || !apiFileIsValid(targetFile)) { + throw new AssertionError({ message: '' }); + } + const isMetadataFile = isCMRMetadataFile(targetFile); + if (!await s3MoveNeeded(sourceFile, targetFile, isMetadataFile)) { + return; + } + if (isMetadataFile) { + await s3CopyObject({ + Bucket: targetFile.bucket, + Key: targetFile.key, + CopySource: `${sourceFile.bucket}/${sourceFile.key}`, + }); + } else { + await S3.moveObject({ + sourceBucket: sourceFile.bucket, + sourceKey: sourceFile.key, + destinationBucket: targetFile.bucket, + destinationKey: targetFile.key, + chunkSize: s3MultipartChunksizeMb, + }); + } + })); + }) + ); +} + +async function moveGranulesInCumulusDatastores( + targetGranules: Array +): Promise { + await updateGranules({ + prefix: 
getRequiredEnvVar('stackName'), + body: targetGranules, + }); +} + +async function cleanupCMRMetadataFiles( + sourceGranules: Array, + targetGranules: Array +) { + await Promise.all( + zip(sourceGranules, targetGranules).map(async ([sourceGranule, targetGranule]) => { + if (sourceGranule?.files === undefined || targetGranule?.files === undefined) { + return null; + } + return Promise.all(zip(sourceGranule.files, targetGranule.files) + .map(async ([sourceFile, targetFile]) => { + if (!(sourceFile && targetFile)) { + return; + } + if (!isCMRMetadataFile(targetFile)) { + return; + } + if (!apiFileIsValid(sourceFile) || !apiFileIsValid(targetFile)) { + throw new AssertionError({ message: '' }); + } + if ( + moveRequested(sourceFile, targetFile) && + await S3.s3ObjectExists({ Bucket: sourceFile.bucket, Key: sourceFile.key }) + ) { + await S3.deleteS3Object(sourceFile.bucket, sourceFile.key); + } + })); + }) + ); +} + +/** + * Move all files in a collection of granules from staging location to final location, + * and update granule files to include renamed files if any. 
+ */ +async function moveFilesForAllGranules( + sourceGranules: Array, + targetGranules: Array, + s3MultipartChunksizeMb?: number +): Promise { + /** + * in order to parse targetGranules, specifically url_path for the files + * in the case that the collection is declared with a templated url_path, + * we must be able to find the cmrMetadata files in their original location until all is done + */ + // move all non-cmrMetadata files and copy all cmrmetadata files + await moveGranulesInS3(sourceGranules, targetGranules, s3MultipartChunksizeMb); + // update postgres (or other cumulus datastores if applicable) + await moveGranulesInCumulusDatastores( + targetGranules + ); + // because cmrMetadata files were *copied* and not deleted, delete them now + await cleanupCMRMetadataFiles(sourceGranules, targetGranules); +} + +function updateFileMetadata( + file: Omit, + granule: ApiGranule, + config: EventConfig, + cmrMetadata: Object, + cmrFileNames: Array +): Omit { + if (file.key === undefined) { + throw new AssertionError({ message: 'damn' }); + } + const bucketsConfig = new BucketsConfig(config.buckets); + + const fileName = path.basename(file.key); + const cmrFileTypeObject: { type?: string } = {}; + if (cmrFileNames.includes(fileName) && !file.type) { + cmrFileTypeObject.type = 'metadata'; + } + + const match = identifyFileMatch(bucketsConfig, fileName, config.collection.files); + const URLPathTemplate = match.url_path || config.collection.url_path || ''; + const urlPath = urlPathTemplate(URLPathTemplate, { + file, + granule: granule, + cmrMetadata, + }); + const updatedBucket = bucketsConfig.nameByKey(match.bucket); + const updatedKey = S3.s3Join(urlPath, fileName); + const output = { + ...cloneDeep(file), + ...cmrFileTypeObject, // Add type if the file is a CMR file + bucket: updatedBucket, + key: updatedKey, + }; + return output; +} + +async function getCMRMetadata(cmrFile: CMRFile, granuleId: string): Promise { + try { + return 
metadataObjectFromCMRFile(`s3://${cmrFile.bucket}/${cmrFile.key}`); + } catch { + const cmrSettings: CMRConstructorParams = await getCmrSettings(); + const cmr = new CMR(cmrSettings); + const [granulesOutput] = await cmr.searchGranules({ granuleId }) as Array; + return granulesOutput; + } +} + +async function updateGranuleMetadata( + granule: ApiGranule, + config: EventConfig, + cmrFiles: { [key: string]: CMRFile }, + cmrFileNames: Array +): Promise { + const cmrFile = get(cmrFiles, granule.granuleId, null); + + const cmrMetadata = cmrFile ? + await getCMRMetadata(cmrFile, granule.granuleId) : + {}; + const newFiles = granule.files?.map( + (file) => updateFileMetadata(file, granule, config, cmrMetadata, cmrFileNames) + ); + return { + ...cloneDeep(granule), + files: newFiles, + collectionId: constructCollectionId(config.collection.name, config.collection.version), + }; +} + +async function buildTargetGranules( + granules: Array, + config: EventConfig, + cmrFiles: { [key: string]: CMRFile } +): Promise> { + const cmrFileNames = Object.values(cmrFiles).map((f) => path.basename(f.key)); + + return await Promise.all(granules.map( + async (granule) => updateGranuleMetadata(granule, config, cmrFiles, cmrFileNames) + )); +} + +async function moveGranules(event: MoveGranuleCollectionsEvent): Promise { + const config = event.config; + const s3MultipartChunksizeMb = config.s3MultipartChunksizeMb + ? config.s3MultipartChunksizeMb : Number(process.env.default_s3_multipart_chunksize_mb); + + const chunkSize = s3MultipartChunksizeMb ? 
s3MultipartChunksizeMb * MB : undefined; + const granuleMetadataFileExtension: string = get( + config, + 'collection.meta.granuleMetadataFileExtension' + ); + + log.debug(`moveGranules config: s3MultipartChunksizeMb: ${s3MultipartChunksizeMb}, ` + + `granuleMetadataFileExtension ${granuleMetadataFileExtension}`); + + const granulesInput = event.input.granules; + + let filterFunc; + if (granuleMetadataFileExtension) { + filterFunc = (fileobject: ApiFile) => isFileExtensionMatched( + fileobject, + granuleMetadataFileExtension + ); + } else { + filterFunc = (fileobject: ApiFile) => isCMRFile(fileobject) || isISOFile(fileobject); + } + const cmrFiles: Array = granulesToCmrFileObjects(granulesInput, filterFunc); + const cmrFilesByGranuleId: Dictionary = keyBy(cmrFiles, 'granuleId'); + + const targetGranules = await buildTargetGranules( + granulesInput, config, cmrFilesByGranuleId + ); + + // Move files from staging location to final location + await moveFilesForAllGranules( + granulesInput, targetGranules, chunkSize + ); + + return { + granules: targetGranules, + }; +} + +/** + * Lambda handler + */ +async function handler(event: CumulusMessage, context: Context): Promise { + return await runCumulusTask(moveGranules, event, context); +} + +exports.handler = handler; +exports.moveGranules = moveGranules; diff --git a/tasks/move-granule-collections/tests/data/meta.xml b/tasks/move-granule-collections/tests/data/meta.xml new file mode 100644 index 00000000000..8c2a07bf41d --- /dev/null +++ b/tasks/move-granule-collections/tests/data/meta.xml @@ -0,0 +1,210 @@ + + + MOD11A1.A2017200.h19v04.006.2017201090724 + 2017-11-20T23:02:40.055807 + 2017-11-20T23:02:40.055814 + + MOD11A1 + 006 + + + further update is anticipated + reprocessed + MOD11A1.A2017200.h19v04.006.2017201090724.hdf + BOTH + 2015-07-02T16:47:38.000Z + 6.4.4AS + + + 6.4.11 + + + + 2003-02-19T00:00:00Z + 2003-02-19T23:59:59Z + + + + + + + + + -70.004161028804404 + -0.004166666666662 + + + -60.004177439215297 + 
-0.004166666666662 + + + -60.929844150213498 + -9.995833333333330 + + + -71.084093041393103 + -9.995833333333330 + + + + + + + + + MOD 1KM L3 LST + + 0 + 0 + 0 + 92 + + + Passed + No automatic quality assessment is performed in the PGE. + Not Investigated + See http://landweb.nascom.nasa.gov/cgi-bin/QA_WWW/qaFlagPage.cgi?sat + + + + + + Terra + + + MODIS + + + MODIS + + + + + + + + + QAFRACTIONGOODQUALITY + + 0.0285983 + + + + QAPERCENTNOTPRODUCEDOTHER + + 0 + + + + CLOUD_CONTAMINATED_LST_SCREENED + + YES + + + + VERTICALTILENUMBER + + 09 + + + + QAFRACTIONOTHERQUALITY + + 0.0543250 + + + + QAPERCENTGOODQUALITY + + 3 + + + + HORIZONTALTILENUMBER + + 11 + + + + QAFRACTIONNOTPRODUCEDCLOUD + + 0.9170767 + + + + TileID + + 51011009 + + + + QAFRACTIONNOTPRODUCEDOTHER + + 0.0000000 + + + + identifier_product_doi + + 10.5067/MODIS/MOD11A1.006 + + + + N_GRAN_POINTERS + + 28 + + + + QAPERCENTNOTPRODUCEDCLOUD + + 92 + + + + identifier_product_doi_authority + + http://dx.doi.org + + + + QAPERCENTOTHERQUALITY + + 5 + + + + + MOD03.A2003050.0315.006.2012268032410.hdf + MOD021KM.A2003050.0315.006.2014220090715.hdf + MOD35_L2.A2003050.0315.006.2014320160924.hdf + MOD07_L2.A2003050.0315.006.2014320161105.hdf + MOD03.A2003050.1355.006.2012268034643.hdf + + + 11 + 09 + MODIS Tile SIN + + + + https://fvk4vim143.execute-api.us-east-1.amazonaws.com/dev/MOD11A1.A2017200.h19v04.006.2017201090724.hdf + Download MOD11A1.A2017200.h19v04.006.2017201090724.hdf + + + http://cumulus-test-sandbox-public.s3.amazonaws.com/MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg + Download MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg + + + http://cumulus-test-sandbox-public.s3.amazonaws.com/MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg + Download MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg + + + https://fvk4vim143.execute-api.us-east-1.amazonaws.com/dev/MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml + Download MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml + + + true + true + 92 + diff --git 
a/tasks/move-granule-collections/tests/data/new_collection_base.json b/tasks/move-granule-collections/tests/data/new_collection_base.json new file mode 100644 index 00000000000..861e8351f5e --- /dev/null +++ b/tasks/move-granule-collections/tests/data/new_collection_base.json @@ -0,0 +1,44 @@ +{ + "files": [ + { + "regex": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724.hdf", + "bucket": "protected" + }, + { + "regex": "^BROWSE\\.MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$", + "sampleFileName": "BROWSE.MOD11A1.A2017200.h19v04.006.2017201090724.hdf", + "bucket": "private" + }, + { + "regex": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf\\.met$", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724.hdf.met", + "bucket": "private" + }, + { + "regex": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.cmr\\.xml$", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml", + "bucket": "public" + }, + { + "regex": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_2\\.jpg$", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg", + "bucket": "public" + }, + { + "regex": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_1\\.jpg$", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg", + "bucket": "public", + "url_path": "jpg/example2/" + } + ], + "url_path": "example2/{extractYear(cmrMetadata.Granule.Temporal.RangeDateTime.BeginningDateTime)}/", + "name": "MOD11A2", + "granuleIdExtraction": "(MOD11A1\\.(.*))\\.hdf", + "granuleId": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$", + "dataType": "MOD11A2", + "process": "modis", + "version": "006", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724.hdf", + "id": "MOD11A2" + } \ No newline at end of file diff --git a/tasks/move-granule-collections/tests/data/no_move_collection.json b/tasks/move-granule-collections/tests/data/no_move_collection.json new file mode 100644 
index 00000000000..cff6758ae88 --- /dev/null +++ b/tasks/move-granule-collections/tests/data/no_move_collection.json @@ -0,0 +1,32 @@ +{ + "files": [ + { + "regex": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724.hdf", + "bucket": "protected" + }, + { + "regex": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_1\\.jpg$", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg", + "bucket": "private" + }, + { + "regex": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_2\\.jpg$", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg", + "bucket": "public" + }, + { + "regex": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.cmr\\.xml$", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml", + "bucket": "protected" + } + ], + "url_path": "file-staging/subdir/", + "name": "MOD11A2", + "granuleIdExtraction": "(MOD11A1\\.(.*))\\.hdf", + "granuleId": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$", "dataType": "MOD11A2", + "process": "modis", + "version": "006", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724.hdf", + "id": "MOD11A2" + } \ No newline at end of file diff --git a/tasks/move-granule-collections/tests/data/original_collection.json b/tasks/move-granule-collections/tests/data/original_collection.json new file mode 100644 index 00000000000..e56c226bb22 --- /dev/null +++ b/tasks/move-granule-collections/tests/data/original_collection.json @@ -0,0 +1,33 @@ +{ + "files": [ + { + "regex": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.hdf$", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724.hdf", + "bucket": "protected" + }, + { + "regex": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_1\\.jpg$", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg", + "bucket": "private" + }, + { + "regex": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}_2\\.jpg$", + "sampleFileName": 
"MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg", + "bucket": "public" + }, + { + "regex": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}\\.cmr\\.xml$", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml", + "bucket": "protected" + } + ], + "url_path": "file-staging/subdir/", + "name": "MOD11A1", + "granuleIdExtraction": "(MOD11A1\\.(.*))\\.hdf", + "granuleId": "^MOD11A1\\.A[\\d]{7}\\.[\\S]{6}\\.006.[\\d]{13}$", + "dataType": "MOD11A1", + "process": "modis", + "version": "006", + "sampleFileName": "MOD11A1.A2017200.h19v04.006.2017201090724.hdf", + "id": "MOD11A1" + } \ No newline at end of file diff --git a/tasks/move-granule-collections/tests/data/payload.json b/tasks/move-granule-collections/tests/data/payload.json new file mode 100644 index 00000000000..b0193b5b294 --- /dev/null +++ b/tasks/move-granule-collections/tests/data/payload.json @@ -0,0 +1,56 @@ +{ + "config": { + "bucket": "cumulus-internal", + "buckets": { + "internal": { + "name": "cumulus-internal", + "type": "internal" + }, + "private": { + "name": "cumulus-private", + "type": "private" + }, + "protected": { + "name": "cumulus-protected", + "type": "protected" + }, + "public": { + "name": "cumulus-public", + "type": "public" + } + }, + "distribution_endpoint": "https://something.api.us-east-1.amazonaws.com/" + }, + "input": { + "granules": [ + { + "status": "completed", + "collectionId": "MOD11A1___006", + "granuleId": "MOD11A1.A2017200.h19v04.006.2017201090724", + "files": [ + + { + "key": "file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724.hdf", + "bucket": "replaceme-protected", + "type": "data" + }, + { + "key": "file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg", + "bucket": "replaceme-private", + "type": "browse" + }, + { + "key": "file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg", + "bucket": "replaceme-public", + "type": "browse" + }, + { + "key": "file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml", 
+ "bucket": "replaceme-protected", + "type": "metadata" + } + ] + } + ] + } +} diff --git a/tasks/move-granule-collections/tests/data/payload_partly_moved.json b/tasks/move-granule-collections/tests/data/payload_partly_moved.json new file mode 100644 index 00000000000..0106acadc58 --- /dev/null +++ b/tasks/move-granule-collections/tests/data/payload_partly_moved.json @@ -0,0 +1,55 @@ +{ + "config": { + "bucket": "cumulus-internal", + "buckets": { + "internal": { + "name": "cumulus-internal", + "type": "internal" + }, + "private": { + "name": "cumulus-private", + "type": "private" + }, + "protected": { + "name": "cumulus-protected", + "type": "protected" + }, + "public": { + "name": "cumulus-public", + "type": "public" + } + }, + "distribution_endpoint": "https://something.api.us-east-1.amazonaws.com/" + }, + "input": { + "granules": [ + { + "status": "completed", + "granuleId": "MOD11A1.A2017200.h19v04.006.2017201090724", + "files": [ + + { + "key": "file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724.hdf", + "bucket": "replaceme-protected", + "type": "data" + }, + { + "key": "jpg/example2/MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg", + "bucket": "replaceme-public", + "type": "browse" + }, + { + "key": "file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg", + "bucket": "replaceme-public", + "type": "browse" + }, + { + "key": "file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml", + "bucket": "replaceme-protected", + "type": "metadata" + } + ] + } + ] + } +} diff --git a/tasks/move-granule-collections/tests/test-index.js b/tasks/move-granule-collections/tests/test-index.js new file mode 100644 index 00000000000..9b44ceb3a93 --- /dev/null +++ b/tasks/move-granule-collections/tests/test-index.js @@ -0,0 +1,396 @@ +'use strict'; + +const fs = require('fs'); + +const proxyquire = require('proxyquire'); +const path = require('path'); +const test = require('ava'); +const cryptoRandomString = require('crypto-random-string'); 
+const { s3 } = require('@cumulus/aws-client/services'); +const { + buildS3Uri, + recursivelyDeleteS3Bucket, + putJsonS3Object, + s3ObjectExists, + promiseS3Upload, + parseS3Uri, +} = require('@cumulus/aws-client/S3'); +const { + randomId, validateOutput, +} = require('@cumulus/common/test-utils'); +const { + localStackConnectionEnv, + CollectionPgModel, + GranulePgModel, + translateApiCollectionToPostgresCollection, + translateApiGranuleToPostgresGranule, + migrationDir, + generateLocalTestDb, + destroyLocalTestDb, +} = require('@cumulus/db'); +const { getDistributionBucketMapKey } = require('@cumulus/distribution-utils'); +const { isECHO10Filename, isISOFilename } = require('@cumulus/cmrjs/cmr-utils'); +const { updateGranulesAndFiles } = require('@cumulus/db'); + +let moveGranules; +async function uploadFiles(files) { + await Promise.all(files.map((file) => { + let body; + if (isECHO10Filename(file)) { + body = fs.createReadStream('tests/data/meta.xml'); + } else if (isISOFilename(file)) { + body = fs.createReadStream('tests/data/meta.iso.xml'); + } else { + body = parseS3Uri(file).Key; + } + return promiseS3Upload({ + params: { + Bucket: parseS3Uri(file).Bucket, + Key: parseS3Uri(file).Key, + Body: body, + }, + }); + })); +} + +async function setupPGData(granules, targetCollection, knex) { + const granuleModel = new GranulePgModel(); + const collectionModel = new CollectionPgModel(); + const collectionPath = path.join(__dirname, 'data', 'original_collection.json'); + const sourceCollection = JSON.parse(fs.readFileSync(collectionPath)); + const pgRecords = {}; + await collectionModel.create( + knex, + translateApiCollectionToPostgresCollection(sourceCollection) + ); + [pgRecords.targetCollection] = await collectionModel.create( + knex, + translateApiCollectionToPostgresCollection(targetCollection) + ); + pgRecords.granules = await granuleModel.insert( + knex, + await Promise.all(granules.map(async (g) => ( + await translateApiGranuleToPostgresGranule({ 
dynamoRecord: g, knexOrTransaction: knex }) + ))), + ['cumulus_id', 'granule_id'] + ); + return pgRecords; +} + +function granulesToFileURIs(granules) { + const files = granules.reduce((arr, g) => arr.concat(g.files), []); + return files.map((file) => buildS3Uri(file.bucket, file.key)); +} + +function buildPayload(t, collection) { + const newPayload = t.context.payload; + newPayload.config.collection = collection; + newPayload.config.bucket = t.context.stagingBucket; + newPayload.config.buckets.internal.name = t.context.stagingBucket; + newPayload.config.buckets.public.name = t.context.publicBucket; + newPayload.config.buckets.private.name = t.context.privateBucket; + newPayload.config.buckets.protected.name = t.context.protectedBucket; + newPayload.input.granules.forEach((granule) => { + granule.files?.forEach( + (file) => { + file.fileName = file.key.split('/').pop(); + } + ); + }); + return newPayload; +} + +test.beforeEach(async (t) => { + const testDbName = `move-granule-collections${cryptoRandomString({ length: 10 })}`; + const { knexAdmin, knex } = await generateLocalTestDb( + testDbName, + migrationDir + ); + moveGranules = proxyquire( + '../dist/src', + { + '@cumulus/api-client/granules': { + updateGranules: (params) => ( + updateGranulesAndFiles(knex, params.body) + ), + }, + } + ).moveGranules; + t.context.knexAdmin = knexAdmin; + t.context.knex = knex; + t.context.publicBucket = randomId('public'); + t.context.protectedBucket = randomId('protected'); + t.context.privateBucket = randomId('private'); + t.context.systemBucket = randomId('system'); + t.context.stackName = 'moveGranulesTestStack'; + const bucketMapping = { + public: t.context.publicBucket, + protected: t.context.protectedBucket, + private: t.context.privateBucket, + + }; + t.context.bucketMapping = bucketMapping; + await Promise.all([ + s3().createBucket({ Bucket: t.context.publicBucket }), + s3().createBucket({ Bucket: t.context.protectedBucket }), + s3().createBucket({ Bucket: 
t.context.privateBucket }), + s3().createBucket({ Bucket: t.context.systemBucket }), + ]); + process.env = { + ...process.env, + ...localStackConnectionEnv, + PG_DATABASE: testDbName, + }; + process.env.system_bucket = t.context.systemBucket; + process.env.stackName = t.context.stackName; + putJsonS3Object( + t.context.systemBucket, + getDistributionBucketMapKey(t.context.stackName), + { + [t.context.publicBucket]: t.context.publicBucket, + [t.context.privateBucket]: t.context.privateBucket, + [t.context.protectedBucket]: t.context.protectedBucket, + [t.context.systemBucket]: t.context.systemBucket, + } + ); +}); + +test.afterEach.always(async (t) => { + await recursivelyDeleteS3Bucket(t.context.publicBucket); + await recursivelyDeleteS3Bucket(t.context.protectedBucket); + await recursivelyDeleteS3Bucket(t.context.systemBucket); + await destroyLocalTestDb({ + knex: t.context.knex, + knexAdmin: t.context.knexAdmin, + tesetDbName: t.context.testDbName, + }); +}); + +test.serial('Should move files to final location and update pg data', async (t) => { + const payloadPath = path.join(__dirname, 'data', 'payload.json'); + const rawPayload = fs.readFileSync(payloadPath, 'utf8') + .replaceAll('replaceme-public', t.context.bucketMapping.public) + .replaceAll('replaceme-private', t.context.bucketMapping.private) + .replaceAll('replaceme-protected', t.context.bucketMapping.protected); + t.context.payload = JSON.parse(rawPayload); + const filesToUpload = granulesToFileURIs( + t.context.payload.input.granules + ); + const collectionPath = path.join(__dirname, 'data', 'new_collection_base.json'); + const collection = JSON.parse(fs.readFileSync(collectionPath)); + const newPayload = buildPayload(t, collection); + await uploadFiles(filesToUpload, t.context.bucketMapping); + const pgRecords = await setupPGData(newPayload.input.granules, collection, t.context.knex); + const output = await moveGranules(newPayload); + await validateOutput(t, output); + t.true(await s3ObjectExists({ + 
Bucket: t.context.protectedBucket, + Key: 'example2/2003/MOD11A1.A2017200.h19v04.006.2017201090724.hdf', + })); + t.true(await s3ObjectExists({ + Bucket: t.context.publicBucket, + Key: 'jpg/example2/MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg', + })); + t.true(await s3ObjectExists({ + Bucket: t.context.publicBucket, + Key: 'example2/2003/MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg', + })); + t.true(await s3ObjectExists({ + Bucket: t.context.publicBucket, + Key: 'example2/2003/MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml', + })); + const granuleModel = new GranulePgModel(); + const finalPgGranule = await granuleModel.get(t.context.knex, { + cumulus_id: pgRecords.granules[0].cumulus_id, + }); + t.true(finalPgGranule.granule_id === pgRecords.granules[0].granule_id); + t.true(finalPgGranule.collection_cumulus_id === pgRecords.targetCollection.cumulus_id); +}); + +test('handles partially moved files', async (t) => { + const payloadPath = path.join(__dirname, 'data', 'payload.json'); + const rawPayload = fs.readFileSync(payloadPath, 'utf8') + .replaceAll('replaceme-public', t.context.publicBucket) + .replaceAll('replaceme-private', t.context.privateBucket) + .replaceAll('replaceme-protected', t.context.protectedBucket); + t.context.payload = JSON.parse(rawPayload); + + // a starting granule state that disagrees with the payload as some have already been moved + const startingState = [{ + files: [ + { + key: 'file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724.hdf', + bucket: t.context.protectedBucket, + type: 'data', + }, + { + key: 'jpg/example2/MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg', + bucket: t.context.publicBucket, + type: 'browse', + }, + { + key: 'file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg', + bucket: t.context.publicBucket, + type: 'browse', + }, + { + key: 'example2/2003/MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml', + bucket: t.context.publicBucket, + type: 'metadata ', + }, + { + key: 
'file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml', + bucket: t.context.protectedBucket, + type: 'metadata', + }, + ], + }]; + const filesToUpload = granulesToFileURIs( + startingState + ); + const collectionPath = path.join(__dirname, 'data', 'new_collection_base.json'); + const collection = JSON.parse(fs.readFileSync(collectionPath)); + const newPayload = buildPayload(t, collection); + + const pgRecords = await setupPGData(newPayload.input.granules, collection, t.context.knex); + await uploadFiles(filesToUpload, t.context.bucketMapping); + + const output = await moveGranules(newPayload); + await validateOutput(t, output); + t.true(await s3ObjectExists({ + Bucket: t.context.protectedBucket, + Key: 'example2/2003/MOD11A1.A2017200.h19v04.006.2017201090724.hdf', + })); + t.true(await s3ObjectExists({ + Bucket: t.context.publicBucket, + Key: 'jpg/example2/MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg', + })); + t.true(await s3ObjectExists({ + Bucket: t.context.publicBucket, + Key: 'example2/2003/MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg', + })); + t.true(await s3ObjectExists({ + Bucket: t.context.publicBucket, + Key: 'example2/2003/MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml', + })); + const granuleModel = new GranulePgModel(); + const finalPgGranule = await granuleModel.get(t.context.knex, { + cumulus_id: pgRecords.granules[0].cumulus_id, + }); + t.true(finalPgGranule.granule_id === pgRecords.granules[0].granule_id); + t.true(finalPgGranule.collection_cumulus_id === pgRecords.targetCollection.cumulus_id); +}); + +test.serial('handles files that are pre-moved and misplaced w/r to postgres', async (t) => { + const payloadPath = path.join(__dirname, 'data', 'payload.json'); + const rawPayload = fs.readFileSync(payloadPath, 'utf8') + .replaceAll('replaceme-public', t.context.bucketMapping.public) + .replaceAll('replaceme-private', t.context.bucketMapping.private) + .replaceAll('replaceme-protected', t.context.bucketMapping.protected); + 
t.context.payload = JSON.parse(rawPayload); + const startingState = [{ + files: [ + { + key: 'example2/2003/MOD11A1.A2017200.h19v04.006.2017201090724.hdf', + bucket: t.context.protectedBucket, + type: 'data', + }, + { + key: 'jpg/example2/MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg', + bucket: t.context.publicBucket, + type: 'browse', + }, + { + key: 'example2/2003/MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg', + bucket: t.context.publicBucket, + type: 'browse', + }, + { + key: 'example2/2003/MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml', + bucket: t.context.publicBucket, + type: 'metadata ', + }, + { + key: 'file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml', + bucket: t.context.bucketMapping.protected, + type: 'metadata', + }, + ], + }]; + const filesToUpload = granulesToFileURIs( + startingState + ); + const collectionPath = path.join(__dirname, 'data', 'new_collection_base.json'); + const collection = JSON.parse(fs.readFileSync(collectionPath)); + const newPayload = buildPayload(t, collection); + + await uploadFiles(filesToUpload, t.context.bucketMapping); + const pgRecords = await setupPGData(newPayload.input.granules, collection, t.context.knex); + const output = await moveGranules(newPayload); + await validateOutput(t, output); + t.true(await s3ObjectExists({ + Bucket: t.context.protectedBucket, + Key: 'example2/2003/MOD11A1.A2017200.h19v04.006.2017201090724.hdf', + })); + t.true(await s3ObjectExists({ + Bucket: t.context.publicBucket, + Key: 'jpg/example2/MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg', + })); + t.true(await s3ObjectExists({ + Bucket: t.context.publicBucket, + Key: 'example2/2003/MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg', + })); + t.true(await s3ObjectExists({ + Bucket: t.context.publicBucket, + Key: 'example2/2003/MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml', + })); + const granuleModel = new GranulePgModel(); + const finalPgGranule = await granuleModel.get(t.context.knex, { + cumulus_id: 
pgRecords.granules[0].cumulus_id, + }); + t.true(finalPgGranule.granule_id === pgRecords.granules[0].granule_id); + t.true(finalPgGranule.collection_cumulus_id === pgRecords.targetCollection.cumulus_id); +}); + +test.serial('handles files that need no move', async (t) => { + const payloadPath = path.join(__dirname, 'data', 'payload.json'); + const rawPayload = fs.readFileSync(payloadPath, 'utf8') + .replaceAll('replaceme-public', t.context.bucketMapping.public) + .replaceAll('replaceme-private', t.context.bucketMapping.private) + .replaceAll('replaceme-protected', t.context.bucketMapping.protected); + t.context.payload = JSON.parse(rawPayload); + const filesToUpload = granulesToFileURIs( + t.context.payload.input.granules + ); + const collectionPath = path.join(__dirname, 'data', 'no_move_collection.json'); + const collection = JSON.parse(fs.readFileSync(collectionPath)); + const newPayload = buildPayload(t, collection); + await uploadFiles(filesToUpload, t.context.bucketMapping); + const pgRecords = await setupPGData(newPayload.input.granules, collection, t.context.knex); + + const output = await moveGranules(newPayload); + await validateOutput(t, output); + t.true(await s3ObjectExists({ + Bucket: t.context.protectedBucket, + Key: 'file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724.hdf', + })); + t.true(await s3ObjectExists({ + Bucket: t.context.privateBucket, + Key: 'file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724_1.jpg', + })); + t.true(await s3ObjectExists({ + Bucket: t.context.publicBucket, + Key: 'file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724_2.jpg', + })); + t.true(await s3ObjectExists({ + Bucket: t.context.protectedBucket, + Key: 'file-staging/subdir/MOD11A1.A2017200.h19v04.006.2017201090724.cmr.xml', + })); + + const granuleModel = new GranulePgModel(); + const finalPgGranule = await granuleModel.get(t.context.knex, { + cumulus_id: pgRecords.granules[0].cumulus_id, + }); + t.true(finalPgGranule.granule_id === 
pgRecords.granules[0].granule_id); + t.true(finalPgGranule.collection_cumulus_id === pgRecords.targetCollection.cumulus_id); +}); diff --git a/tasks/move-granule-collections/tsconfig.json b/tasks/move-granule-collections/tsconfig.json new file mode 100644 index 00000000000..ee77bce5f9b --- /dev/null +++ b/tasks/move-granule-collections/tsconfig.json @@ -0,0 +1,15 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "target": "es2021", + "lib": [ + "DOM", + "es2021" + ], + "outDir": "dist/src" + }, + "include": [ + "src/" + ], + "exclude": [], +} diff --git a/tasks/move-granule-collections/webpack.config.js b/tasks/move-granule-collections/webpack.config.js new file mode 100644 index 00000000000..918367a02e7 --- /dev/null +++ b/tasks/move-granule-collections/webpack.config.js @@ -0,0 +1,44 @@ +const path = require('path'); +// path to module root +const root = path.resolve(__dirname); + +module.exports = { + mode: process.env.PRODUCTION ? 'production' : 'development', + entry: './dist/src/index.js', + output: { + chunkFormat: false, + libraryTarget: 'commonjs2', + filename: 'index.js', + path: path.resolve(__dirname, 'dist', 'webpack'), + devtoolModuleFilenameTemplate: (info) => { + const relativePath = path.relative(root, info.absoluteResourcePath) + return `webpack://${relativePath}`; + } + }, + externals: [ + /@aws-sdk\//, + 'electron', + {'formidable': 'url'}, + // See https://github.com/knex/knex/issues/1128 re: webpack configuration + { + 'better-sqlite3': 'better-sqlite3', + sqlite3: 'sqlite3', + mysql2: 'mysql2', + mariasql: 'mariasql', + mysql: 'mysql', + mssql: 'mssql', + oracle: 'oracle', + 'strong-oracle': 'strong-oracle', + oracledb: 'oracledb', + pg: 'pg', + 'pg-query-stream': 'pg-query-stream', + tedious: 'tedious' + } + ], + + devtool: 'inline-source-map', + target: 'node', + optimization: { + nodeEnv: false + } +}; diff --git a/tasks/move-granules/package.json b/tasks/move-granules/package.json index 253d189224d..aa837b03d5d 100644 --- 
a/tasks/move-granules/package.json +++ b/tasks/move-granules/package.json @@ -52,6 +52,7 @@ "lodash": "^4.17.21" }, "devDependencies": { + "@cumulus/db": "19.1.0", "@cumulus/schemas": "19.1.0" } } diff --git a/tf-modules/cumulus/ingest.tf b/tf-modules/cumulus/ingest.tf index 733e2c3d54c..f151b715368 100644 --- a/tf-modules/cumulus/ingest.tf +++ b/tf-modules/cumulus/ingest.tf @@ -67,4 +67,5 @@ module "ingest" { # Cloudwatch log retention config cloudwatch_log_retention_periods = var.cloudwatch_log_retention_periods default_log_retention_days = var.default_log_retention_days + ecs_cluster_arn = aws_ecs_cluster.default.arn } diff --git a/tf-modules/cumulus/outputs.tf b/tf-modules/cumulus/outputs.tf index 4d1d1c61967..fa3d60d4e3b 100644 --- a/tf-modules/cumulus/outputs.tf +++ b/tf-modules/cumulus/outputs.tf @@ -83,6 +83,11 @@ output "move_granules_task" { value = module.ingest.move_granules_task } + +output "move_granule_collections_task" { + value = module.ingest.move_granule_collections_task +} + output "orca_copy_to_archive_adapter_task" { value = module.ingest.orca_copy_to_archive_adapter_task } @@ -181,3 +186,4 @@ output "ecs_cluster_name" { output "start_sf_queue_url" { value = module.ingest.start_sf_queue_url } + diff --git a/tf-modules/ingest/move-granule-collections-task.tf b/tf-modules/ingest/move-granule-collections-task.tf new file mode 100644 index 00000000000..1b597b89b36 --- /dev/null +++ b/tf-modules/ingest/move-granule-collections-task.tf @@ -0,0 +1,68 @@ +resource "aws_lambda_function" "move_granule_collections_task" { + function_name = "${var.prefix}-MoveGranuleCollections" + filename = "${path.module}/../../tasks/move-granule-collections/dist/webpack/lambda.zip" + source_code_hash = filebase64sha256("${path.module}/../../tasks/move-granule-collections/dist/webpack/lambda.zip") + handler = "index.handler" + role = var.lambda_processing_role_arn + runtime = "nodejs20.x" + timeout = lookup(var.lambda_timeouts, "MoveGranuleCollections", 300) + 
memory_size = lookup(var.lambda_memory_sizes, "MoveGranuleCollections", 1024) + + layers = [var.cumulus_message_adapter_lambda_layer_version_arn] + + environment { + variables = { + CMR_ENVIRONMENT = var.cmr_environment + CMR_HOST = var.cmr_custom_host + CUMULUS_MESSAGE_ADAPTER_DIR = "/opt/" + default_s3_multipart_chunksize_mb = var.default_s3_multipart_chunksize_mb + stackName = var.prefix + system_bucket = var.system_bucket + } + } + + dynamic "vpc_config" { + for_each = length(var.lambda_subnet_ids) == 0 ? [] : [1] + content { + subnet_ids = var.lambda_subnet_ids + security_group_ids = [ + aws_security_group.no_ingress_all_egress[0].id + ] + } + } + + tags = var.tags +} + +resource "aws_sfn_activity" "move_granule_collections_ecs_task" { + name = "${var.prefix}-EcsTaskMoveGranuleCollections" + tags = var.tags +} + +data "aws_ecr_repository" "ecs_task_image" { + name = "cumulus-ecs-task" +} + +module "move_granule_collections_service" { + source = "../cumulus_ecs_service" + prefix = var.prefix + name = "MoveGranuleCollections" + cluster_arn = var.ecs_cluster_arn + image = "${data.aws_ecr_repository.ecs_task_image.repository_url}:${var.ecs_task_image_version}" + + desired_count = 1 + cpu = 400 + memory_reservation = 700 + + default_log_retention_days = var.default_log_retention_days + cloudwatch_log_retention_periods = var.cloudwatch_log_retention_periods + + environment = { + AWS_DEFAULT_REGION = data.aws_region.current.name + } + command = [ + "cumulus-ecs-task", + "--activityArn", + aws_sfn_activity.move_granule_collections_ecs_task.id, + ] +} \ No newline at end of file diff --git a/tf-modules/ingest/outputs.tf b/tf-modules/ingest/outputs.tf index f4f2928f6de..3e3b39e279e 100644 --- a/tf-modules/ingest/outputs.tf +++ b/tf-modules/ingest/outputs.tf @@ -211,3 +211,14 @@ output "update_granules_cmr_metadata_file_links_task" { last_modified_date = aws_lambda_function.update_granules_cmr_metadata_file_links_task.last_modified } } + +output 
"move_granule_collections_task" { + value = { + task_arn = aws_lambda_function.move_granule_collections_task.arn + last_modified_date = aws_lambda_function.move_granule_collections_task.last_modified + } +} + +output "move_granule_collections_ecs_task_id" { + value = aws_sfn_activity.move_granule_collections_ecs_task.id +} \ No newline at end of file diff --git a/tf-modules/ingest/variables.tf b/tf-modules/ingest/variables.tf index a21df893325..ea180814d8f 100644 --- a/tf-modules/ingest/variables.tf +++ b/tf-modules/ingest/variables.tf @@ -217,3 +217,13 @@ variable "sqs_message_consumer_watcher_time_limit" { polling SQS. This value should be adjusted in conjunction with sqs_message_consumer_watcher_message_limit. EOF } + +variable "ecs_task_image_version" { + description = "docker image version to use for Cumulus hello world task" + type = string + default = "2.1.0" +} + +variable "ecs_cluster_arn" { + type = string +}