CUMULUS-3757: Update granule upsert logic to allow updating collection info #3872
Changes from 38 commits
@@ -9,12 +9,15 @@ import {
import { RecordDoesNotExist } from '@cumulus/errors';
import Logger from '@cumulus/logger';

import { ApiGranule } from '@cumulus/types/api/granules';
import { CollectionPgModel } from '../models/collection';
import { GranulePgModel } from '../models/granule';
import { GranulesExecutionsPgModel } from '../models/granules-executions';
import { PostgresGranule, PostgresGranuleRecord } from '../types/granule';
import { GranuleWithProviderAndCollectionInfo } from '../types/query';
import { UpdatedAtRange } from '../types/record';
import { translateApiGranuleToPostgresGranule } from '../translate/granules';
import { translateApiFiletoPostgresFile } from '../translate/file';
const { deprecate } = require('@cumulus/common/util');

const { TableNames } = require('../tables');

@@ -354,3 +357,41 @@ export const getGranulesByGranuleId = async (
  .where({ granule_id: granuleId });
  return records;
};

/**
 * Update granules' PG records and their files' PG records based on a collection move
 *
 * @param {Knex | Knex.Transaction} knexOrTransaction - DB client or transaction
 * @param {Array<ApiGranule>} granules - list of API granules to update, carrying their
 *   post-move collectionId and file locations
 * @returns {Promise<void>}
 */
export const updateGranuleAndFiles = async (

Review comment: This takes an input array of multiples, so it should probably be …

  knexOrTransaction: Knex | Knex.Transaction,
  granules: Array<ApiGranule>

Review comment: Consider ApiGranuleRecord (these shouldn't be new records, right?)

Reply: What's the difference? They're not new records (they're the same granules with the updated files/collectionId based on the move); I assumed ApiGranule was fine. I'll change it, but I'm just wondering.

): Promise<void> => {
  const {
    granules: granulesTable,
    files: filesTable,
  } = TableNames;
  await Promise.all(granules.map(async (granule) => {

Review comment: Question: if the map of granules is 10k, is it an acceptable usage scenario that, when it fails on granule 1,001, 1k granules were moved and 9k failed, or do we want it to move 9,999 of them and fail only the one with a metadata/connection/whatever issue? Apologies if that should be obvious; I may be lacking context. Edit: reviewed the tests - the re-run/idempotent intent is probably fine here, but a doc annotation for @error is probably warranted in the header.

Reply: I'm not exactly sure on this; I'd assume this:

    const pgGranule = await translateApiGranuleToPostgresGranule({

Review comment: Theory/Nit: Should the DB package be taking API granules as input for anything other than translation, generally? I see we're doing it for fields in other methods, so I'm not resting on package dogma, but seeing "take a set of API granules and translate" in the method makes me wonder if we should be offloading that concurrency to the calling method and just take a set of updates to incoming Knex objects.

Reply: Not really sure; that does sound like it would make it better. Does that entail passing the translation method into the function, changes-wise?

Reply: Ohhh, you're saying instead of sending API granules to the function, send them already translated to PG. OK, that doesn't seem like any more work, except that Ethan's task would need to do that 🤔

      dynamoRecord: granule,
      knexOrTransaction,
    });
    await knexOrTransaction(granulesTable).where('granule_id', '=', pgGranule.granule_id).update(

Review comment: I think we need to enforce a transaction here, not just have it be possible that it's a transaction object. We don't want partial granule/file record updates.

Review comment: @Nnaga1 I'd read up on https://knexjs.org/guide/transactions.html / take a look at what we're doing in the API logic for https://github.com/nasa/cumulus/blob/CUMULUS-3757-move-granule/packages/api/lib/writeRecords/write-granules.js#L488

Review comment: To be clear, I'm suggesting that we commit all granule/file updates together (and roll back on failure) in a transaction, instead of making updates serially in a way that doesn't roll back if any of them fail.

      pgGranule
    );

    if (granule.files) {
      await Promise.all(granule.files.map(async (file) => {
        const pgFile = translateApiFiletoPostgresFile({ ...file, granuleId: pgGranule.granule_id });

        await knexOrTransaction(filesTable).where('file_name', '=', String(pgFile.file_name)).update(
          pgFile
        );
      }));
    }
  }));
};
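Two of the review threads above converge on a single reshaping of this helper: have the caller pass records already translated to the Postgres shape, and commit every granule/file update inside one transaction (with the error behavior documented in the header) so a failure rolls the whole batch back. The following is only a rough sketch of that shape, not code from this PR; the function name, the GranuleWithFiles input type, and the '../types/file' import path are assumptions.

// Sketch only: assumes PostgresFile is exported from '../types/file', mirroring the
// granule types import above.
import { Knex } from 'knex';

import { PostgresGranule } from '../types/granule';
import { PostgresFile } from '../types/file';

const { TableNames } = require('../tables');

// Hypothetical input shape: each granule paired with its already-translated files,
// so this helper does no API-to-Postgres translation itself.
type GranuleWithFiles = {
  granule: PostgresGranule,
  files: PostgresFile[],
};

/**
 * Update granule and file rows for a collection move inside one transaction.
 *
 * @throws {Error} if any update fails; the transaction rolls back, so no partial
 *   granule/file updates are committed.
 */
export const updateGranulesAndFilesTransactionally = async (
  knex: Knex,
  updates: GranuleWithFiles[]
): Promise<void> => {
  const { granules: granulesTable, files: filesTable } = TableNames;
  await knex.transaction(async (trx) => {
    for (const { granule, files } of updates) {
      // Overwrite the granule row with its post-move values
      await trx(granulesTable)
        .where({ granule_id: granule.granule_id })
        .update(granule);
      // Then update each of the granule's file rows on the same transaction
      for (const file of files) {
        await trx(filesTable)
          .where({ file_name: file.file_name })
          .update(file);
      }
    }
  });
};

Keeping the loop serial avoids interleaving queries on the single transaction connection; either way, the whole batch commits or rolls back together.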
@@ -0,0 +1,220 @@
/* eslint-disable no-await-in-loop */
const test = require('ava');
const cryptoRandomString = require('crypto-random-string');
const range = require('lodash/range');

const { constructCollectionId } = require('@cumulus/message/Collections');
const {
  CollectionPgModel,
  GranulePgModel,
  FilePgModel,
  generateLocalTestDb,
  destroyLocalTestDb,
  fakeCollectionRecordFactory,
  fakeFileRecordFactory,
  fakeGranuleRecordFactory,
  getUniqueGranuleByGranuleId,
  migrationDir,
  updateGranuleAndFiles,
  translatePostgresGranuleResultToApiGranule,
  translatePostgresCollectionToApiCollection,
} = require('../../dist');

const testDbName = `granule_${cryptoRandomString({ length: 10 })}`;

// this function is used to simulate granule records post-collection-move for database updates
const simulateGranuleUpdate = async (knex, granules, collection, collectionId, collectionId2) => {
  const movedGranules = [];
  for (const granule of granules) {
    const postMoveApiGranule = await translatePostgresGranuleResultToApiGranule(knex, {
      ...granule,
      collectionName: collection.name,
      collectionVersion: collection.version,
    });
    postMoveApiGranule.collectionId = collectionId2;
    postMoveApiGranule.updatedAt = Date.now();
    postMoveApiGranule.lastUpdateDateTime = new Date().toISOString();
    for (const apiFile of postMoveApiGranule.files) {
      apiFile.bucket = apiFile.bucket.replace(collectionId, collectionId2);
      apiFile.key = apiFile.key.replace(collectionId, collectionId2);
      //apiFile.path = apiFile.path.replace(t.context.collectionId, t.context.collectionId2);
      apiFile.updatedAt = Date.now();
    }
    movedGranules.push(postMoveApiGranule);
  }
  return movedGranules;
};

test.before(async (t) => {
  const { knexAdmin, knex } = await generateLocalTestDb(
    testDbName,
    migrationDir
  );
  t.context.knexAdmin = knexAdmin;
  t.context.knex = knex;

  t.context.granulePgModel = new GranulePgModel();
  t.context.collectionPgModel = new CollectionPgModel();
  t.context.filePgModel = new FilePgModel();

  // set up 2 collections
  t.context.collection = fakeCollectionRecordFactory();
  t.context.collection2 = fakeCollectionRecordFactory();
  t.context.collectionId = constructCollectionId(
    t.context.collection.name,
    t.context.collection.version
  );
  t.context.collectionId2 = constructCollectionId(
    t.context.collection2.name,
    t.context.collection2.version
  );
  const collectionResponse = await t.context.collectionPgModel.create(
    t.context.knex,
    t.context.collection
  );
  const collectionResponse2 = await t.context.collectionPgModel.create(
    t.context.knex,
    t.context.collection2
  );
  t.context.collectionCumulusId = collectionResponse[0].cumulus_id;
  t.context.collectionCumulusId2 = collectionResponse2[0].cumulus_id;
  t.context.apiCollection1 = translatePostgresCollectionToApiCollection(collectionResponse[0]);
  t.context.apiCollection2 = translatePostgresCollectionToApiCollection(collectionResponse2[0]);

  // create 10 granules in one collection, 0 in the other
  t.context.granuleIds = range(10).map((num) => 'granuleId___' + num);

  t.context.granulePgModel = new GranulePgModel();
  t.context.granules = range(10).map((num) => fakeGranuleRecordFactory({
    granule_id: t.context.granuleIds[num],
    collection_cumulus_id: t.context.collectionCumulusId,
    cumulus_id: num,
  }));
  t.context.pgGranules = await t.context.granulePgModel.insert(
    knex,
    t.context.granules
  );

  t.context.movedGranules = [];

  t.context.files = [];
  // create fake files for each of the ten granules (3 per granule)
  for (const pgGranule of t.context.granules) {
    t.context.files.push(
      fakeFileRecordFactory({
        granule_cumulus_id: pgGranule.cumulus_id,
        file_name: pgGranule.granule_id + '.hdf',
        updated_at: new Date().toISOString(),
        bucket: t.context.collectionId + '--bucket',
        key: t.context.collectionId + pgGranule.granule_id + '/key-hdf.pem',
        path: t.context.collectionId + '/' + pgGranule.granule_id,
      }),
      fakeFileRecordFactory({
        granule_cumulus_id: pgGranule.cumulus_id,
        file_name: pgGranule.granule_id + '.txt',
        updated_at: new Date().toISOString(),
        bucket: t.context.collectionId + '--bucket',
        key: t.context.collectionId + pgGranule.granule_id + '/key-txt.pem',
        path: t.context.collectionId + '/' + pgGranule.granule_id,
      }),
      fakeFileRecordFactory({
        granule_cumulus_id: pgGranule.cumulus_id,
        file_name: pgGranule.granule_id + '.cmr',
        updated_at: new Date().toISOString(),
        bucket: t.context.collectionId + '--bucket',
        key: t.context.collectionId + pgGranule.granule_id + '/key-cmr.pem',
        path: t.context.collectionId + '/' + pgGranule.granule_id,
      })
    );
  }

  t.context.pgFiles = await t.context.filePgModel.insert(knex, t.context.files);
  // update 1/2 of the granules to be moved to the new collection
  t.context.movedGranules.push(await simulateGranuleUpdate(knex, t.context.granules.slice(0, 5),
    t.context.collection, t.context.collectionId, t.context.collectionId2));

  // the other half will be unmoved but translated to an apiGranule
  t.context.movedGranules.push(await simulateGranuleUpdate(knex, t.context.granules.slice(5),
    t.context.collection, t.context.collectionId, t.context.collectionId));

  t.context.movedGranules = t.context.movedGranules.flat();
});

test.after.always(async (t) => {
  await destroyLocalTestDb({
    ...t.context,
    testDbName,
  });
});

test.serial('updateGranuleAndFiles successfully updates a partial list of granules based on the collectionId change', async (t) => {
  const {
    granuleIds,
    granulePgModel,
    movedGranules,
    collectionId2,
    collectionId,
    collection,
    collection2,
    knex,
  } = t.context;
  await updateGranuleAndFiles(knex, movedGranules);

  const returnedGranules = await Promise.all(granuleIds.map((id) =>
    getUniqueGranuleByGranuleId(knex, id, granulePgModel)));

  for (const granule of returnedGranules) {
    const testCollection = granule.cumulus_id >= 5 ? collection : collection2;
    const testCollectionId = granule.cumulus_id >= 5 ? collectionId : collectionId2;
    const apiGranule = await translatePostgresGranuleResultToApiGranule(knex, {
      ...granule,
      collectionName: testCollection.name,
      collectionVersion: testCollection.version,
    });
    // the movedGranules param only has 1/2 of the granules to be moved to collection 2
    // here we can check based on the granule's cumulus id which collection it should be a part of
    t.true(apiGranule.collectionId === testCollectionId);
    for (const file of apiGranule.files) {
      t.true(file.key.includes(testCollectionId));
      t.true(file.bucket.includes(testCollectionId));
    }
  }
});

test.serial('updateGranuleAndFiles successfully updates a complete list of granules, 1/2 of which have already been moved', async (t) => {

Review comment: I think we need a more complete set of units testing all granule updates, given the method isn't written to update only the files and the collection ID, but all granule fields. If that's not intentional and this method is intended to be limited to updating file locations and the collectionId, we should probably make the method more defensive somehow.

Reply: So Ethan's task would send the target_granules to me, which are the granule records post move-collections, which is just a complete granule record, so I can do this:

Review comment: @Nnaga1 to some degree that depends on what the intent is. As written, I don't believe this method enforces the ticket context (only update collection and file paths). If the intent is to provide a method that takes an array of granule objects and updates them using only translation conventions, ignoring API write business logic, then we should test that broader case. If the intent is that this helper is designed to just update those specific fields, we should update it to do only that and test it. This is important, as this method is an exposed package method and creates a user contract.

  const {
    granuleIds,
    granulePgModel,
    granules,
    movedGranules,
    collectionId2,
    collectionId,
    collection,
    collection2,
    knex,
  } = t.context;
  // the remaining granules of movedGranules in collection 1 will need to be updated to collection 2
  movedGranules.splice(-5);
  movedGranules.push(await simulateGranuleUpdate(knex, granules.slice(5), collection,
    collectionId, collectionId2));

  const testPostMoveApiGranules = movedGranules.flat();
  await updateGranuleAndFiles(knex, testPostMoveApiGranules);

  const returnedGranules = await Promise.all(granuleIds.map((id) =>
    getUniqueGranuleByGranuleId(knex, id, granulePgModel)));

  for (const granule of returnedGranules) {
    const apiGranule = await translatePostgresGranuleResultToApiGranule(knex, {
      ...granule,
      collectionName: collection2.name,
      collectionVersion: collection2.version,
    });
    // now every granule should be part of collection 2
    t.true(apiGranule.collectionId === collectionId2);
    for (const file of apiGranule.files) {
      t.true(file.key.includes(collectionId2));
      t.true(file.bucket.includes(collectionId2));
    }
  }
});
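If the narrower contract discussed in the thread above is the intent (only the collection pointer and file locations change on a move), a more defensive variant could update just those columns instead of overwriting every granule field. The following is a rough sketch under that assumption; the function name, the PostgresFileRecord import, and the exact column lists are illustrative, not part of this PR.

// Sketch only: restricts the update to the columns a collection move should touch.
import { Knex } from 'knex';

import { PostgresGranuleRecord } from '../types/granule';
import { PostgresFileRecord } from '../types/file';

const { TableNames } = require('../tables');

export const updateGranuleCollectionAndFileLocations = async (
  knex: Knex,
  granule: PostgresGranuleRecord,
  files: PostgresFileRecord[]
): Promise<void> => {
  const { granules: granulesTable, files: filesTable } = TableNames;
  await knex.transaction(async (trx) => {
    // Only the collection pointer (and timestamp) on the granule row...
    await trx(granulesTable)
      .where({ granule_id: granule.granule_id })
      .update({
        collection_cumulus_id: granule.collection_cumulus_id,
        updated_at: granule.updated_at,
      });
    // ...and only the location columns on each of its file rows.
    for (const file of files) {
      await trx(filesTable)
        .where({ file_name: file.file_name })
        .update({
          bucket: file.bucket,
          key: file.key,
          path: file.path,
          updated_at: file.updated_at,
        });
    }
  });
};

Limiting the update set like this makes the exposed package method match the ticket's scope, so callers cannot accidentally rewrite unrelated granule fields through it.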
Review comment: This method looks like its purpose is to create a bulk granule update method that circumvents the existing granule write logic. That probably isn't specific to collections. If we are intending that it be specific to moving a collection, I'm assuming the need to rewrite the entire object is due to the intent to move the files as well, but not to try to parallelize writeGranuleFromApi, because the business logic should be irrelevant to this use case?

Reply: "This method looks like its purpose is to create a bulk granule update method that circumvents the existing granule write logic": this would be the intention of this method, to avoid changing core API functionality and instead do this. "If we are intending that it be specific to moving a collection": this part I'm not sure about; I intended it to be used for Ethan's task, but if there are potential applications elsewhere then it can probably be re-used, I'd assume.
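For context on the intended use, the tests above show the calling pattern end to end: build post-move API granules (new collectionId plus rewritten file buckets/keys), then hand the whole set to updateGranuleAndFiles in one call. A minimal caller-side sketch, assuming updateGranuleAndFiles is exported from @cumulus/db as the tests imply and with a hypothetical wrapper name:

// Illustrative usage only; the caller is assumed to have already rewritten
// collectionId, file.bucket, and file.key on each granule before this call.
import { Knex } from 'knex';
import { ApiGranule } from '@cumulus/types/api/granules';
import { updateGranuleAndFiles } from '@cumulus/db';

export const applyCollectionMove = async (
  knex: Knex,
  postMoveGranules: ApiGranule[]
): Promise<void> => {
  // One call persists the new collection info and file locations for every granule.
  await updateGranuleAndFiles(knex, postMoveGranules);
};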