diff --git a/src/tasks/sync-registries.js b/src/tasks/sync-registries.js index d02c0e52..5780d8b7 100644 --- a/src/tasks/sync-registries.js +++ b/src/tasks/sync-registries.js @@ -177,6 +177,10 @@ const syncOrganizationAudit = async (organization) => { const homeOrg = await Organization.getHomeOrg(); logger.debug(`querying datalayer for ${organization.name} root history`); const rootHistory = await datalayer.getRootHistory(organization.registryId); + logger.debug(`querying datalayer for ${organization.name} sync status`); + const { sync_status } = await datalayer.getSyncStatus( + organization.registryId, + ); if (!rootHistory?.length) { logger.warn( @@ -185,6 +189,19 @@ const syncOrganizationAudit = async (organization) => { return; } + if ( + process.env.NODE_ENV !== 'test' && + rootHistory.length - 1 !== sync_status?.generation + ) { + logger.warn( + `the root history length does not match the number of synced generations for ${organization.name} (registry store Id ${organization.registryId}). something is wrong and the sync for this organization will be paused until this is resolved. `, + ); + return; + } + + /** + * IMPORTANT: audit data 'generation' field is a generation INDEX, not the actual generation number + */ let lastRootSavedToAuditTable; if (CONFIG.USE_SIMULATOR) { @@ -253,19 +270,22 @@ const syncOrganizationAudit = async (organization) => { return; } - const lastProcessedIndex = lastRootSavedToAuditTable.generation - 1; + const auditTableHighestProcessedGenerationIndex = + lastRootSavedToAuditTable.generation; // generation -> generation INDEX logger.debug( - `1 Last processed index of ${organization.name}: ${lastProcessedIndex}`, + `1 Last processed generation index of ${organization.name}: ${auditTableHighestProcessedGenerationIndex}`, ); - const rootHistoryZeroBasedCount = rootHistory.length - 1; - const syncRemaining = rootHistoryZeroBasedCount - lastProcessedIndex; + const rootHistoryHighestGenerationIndex = rootHistory.length - 1; + const syncRemaining = + Math.abs(rootHistoryHighestGenerationIndex) - + Math.abs(auditTableHighestProcessedGenerationIndex); const isSynced = syncRemaining === 0; logger.debug( - `2 the root history length for ${organization.name} is ${rootHistory.length} and the last processed generation index is ${lastRootSavedToAuditTable.generation}`, + `2 the root history length for ${organization.name} is ${rootHistory.length} and the last processed generation index is ${auditTableHighestProcessedGenerationIndex}`, ); logger.debug( - `2 the highest root history index is ${rootHistoryZeroBasedCount}, given this and the last processed index, the number of generations left to sync is ${syncRemaining}`, + `2 the highest root history index is ${rootHistoryHighestGenerationIndex}, given this and the last processed index, the number of generations left to sync is ${syncRemaining}`, ); logger.debug( `updating organization model with new sync status for ${organization.name}`, @@ -280,23 +300,24 @@ const syncOrganizationAudit = async (organization) => { if (process.env.NODE_ENV !== 'test' && isSynced) { logger.debug( - `${organization.name}: is synced. the last processed index is ${lastProcessedIndex} and the highest root history index is ${rootHistoryZeroBasedCount}`, + `${organization.name}: is synced. the last processed index is ${auditTableHighestProcessedGenerationIndex} and the highest root history index is ${rootHistoryHighestGenerationIndex}`, ); return; } - const toBeProcessedIndex = lastProcessedIndex + 1; + const toBeProcessedDatalayerGenerationIndex = + auditTableHighestProcessedGenerationIndex + 1; logger.debug( - `3 Last processed generation index of ${organization.name}: ${lastProcessedIndex}`, + `3 Last processed generation index of ${organization.name}: ${auditTableHighestProcessedGenerationIndex}`, ); logger.debug( - `4 To be processed generation index of ${organization.name}: ${toBeProcessedIndex}`, + `4 To be processed generation index of ${organization.name}: ${toBeProcessedDatalayerGenerationIndex}`, ); // Organization not synced, sync it logger.info(' '); logger.info( - `Syncing ${organization.name} generation index ${toBeProcessedIndex} (store ${organization.orgUid})`, + `Syncing ${organization.name} generation index ${toBeProcessedDatalayerGenerationIndex} (store ${organization.orgUid})`, ); logger.info( `${organization.name} is ${syncRemaining} DataLayer generations away from being fully synced (store ${organization.orgUid}).`, @@ -305,11 +326,6 @@ const syncOrganizationAudit = async (organization) => { if (!CONFIG.USE_SIMULATOR) { await new Promise((resolve) => setTimeout(resolve, 30000)); - logger.debug(`querying datalayer for ${organization.name} sync status`); - const { sync_status } = await datalayer.getSyncStatus( - organization.registryId, - ); - if ( sync_status && sync_status?.generation && @@ -328,7 +344,8 @@ const syncOrganizationAudit = async (organization) => { const orgRequiredResetDueToInvalidGenerationIndex = await orgGenerationMismatchCheck( organization.orgUid, - lastRootSavedToAuditTable.generation, + auditTableHighestProcessedGenerationIndex, + rootHistoryHighestGenerationIndex, sync_status.generation, sync_status.target_generation, ); @@ -340,10 +357,10 @@ const syncOrganizationAudit = async (organization) => { return; } - if (toBeProcessedIndex > sync_status.generation) { + if (toBeProcessedDatalayerGenerationIndex + 1 > sync_status.generation) { const warningMsg = [ - `No data found for ${organization.name} (store ${organization.orgUid}) in the current datalayer generation.`, - `DataLayer not yet caught up to generation ${lastProcessedIndex}. The current processed generation is ${sync_status.generation}.`, + `Generation ${toBeProcessedDatalayerGenerationIndex + 1} does not exist in ${organization.name} (registry store ${organization.registryId}) root history`, + `DataLayer not yet caught up to generation ${auditTableHighestProcessedGenerationIndex + 1}. The the highest generation datalayer has synced is ${sync_status.generation}.`, `This issue is often temporary and could be due to a lag in data propagation.`, 'Syncing for this organization will be paused until this is resolved.', 'For ongoing issues, please contact the organization.', @@ -355,13 +372,19 @@ const syncOrganizationAudit = async (organization) => { } logger.debug( - `5 Last processed index of ${organization.name}: ${lastProcessedIndex}`, + `5 Last processed index of ${organization.name}: ${auditTableHighestProcessedGenerationIndex}`, + ); + const lastProcessedRoot = _.get( + rootHistory, + `[${auditTableHighestProcessedGenerationIndex}]`, ); - const lastProcessedRoot = _.get(rootHistory, `[${lastProcessedIndex}]`); logger.debug( - `6 To be processed index of ${organization.name}: ${toBeProcessedIndex}`, + `6 To be processed index of ${organization.name}: ${toBeProcessedDatalayerGenerationIndex}`, + ); + const rootToBeProcessed = _.get( + rootHistory, + `[${toBeProcessedDatalayerGenerationIndex}]`, ); - const rootToBeProcessed = _.get(rootHistory, `[${toBeProcessedIndex}]`); logger.debug( `last processed root of ${organization.name}: ${JSON.stringify(lastProcessedRoot)}`, @@ -378,10 +401,10 @@ const syncOrganizationAudit = async (organization) => { } logger.debug( - `7 Last processed index of ${organization.name}: ${lastProcessedIndex}`, + `7 Last processed index of ${organization.name}: ${auditTableHighestProcessedGenerationIndex}`, ); logger.debug( - `8 To be processed index of ${organization.name}: ${toBeProcessedIndex}`, + `8 To be processed index of ${organization.name}: ${toBeProcessedDatalayerGenerationIndex}`, ); const kvDiff = await datalayer.getRootDiff( @@ -411,7 +434,7 @@ const syncOrganizationAudit = async (organization) => { const updateTransaction = async (transaction, mirrorTransaction) => { logger.info( - `Syncing ${organization.name} generation ${toBeProcessedIndex} (store ${organization.orgUid})`, + `Syncing ${organization.name} generation ${toBeProcessedDatalayerGenerationIndex} (store ${organization.orgUid})`, ); if (_.isEmpty(optimizedKvDiff)) { const auditData = { @@ -422,17 +445,17 @@ const syncOrganizationAudit = async (organization) => { table: null, change: null, onchainConfirmationTimeStamp: rootToBeProcessed.timestamp, - generation: toBeProcessedIndex, + generation: toBeProcessedDatalayerGenerationIndex, comment: '', author: '', }; - logger.debug(`optimized kv diff is empty between ${organization.name} generation indices ${lastProcessedIndex} and ${toBeProcessedIndex} + logger.debug(`optimized kv diff is empty between ${organization.name} generation indices ${auditTableHighestProcessedGenerationIndex} and ${toBeProcessedDatalayerGenerationIndex} (roots [generation ${lastRootSavedToAuditTable.generation}] ${lastProcessedRoot.root_hash} and [generation ${lastRootSavedToAuditTable.generation + 1}] ${rootToBeProcessed.root_hash})`); logger.debug(`creating audit entry`); await Audit.create(auditData, { transaction, mirrorTransaction }); } else { - logger.debug(`processing optimized kv diff for ${organization.name} generation indices ${lastProcessedIndex} and ${toBeProcessedIndex} + logger.debug(`processing optimized kv diff for ${organization.name} generation indices ${auditTableHighestProcessedGenerationIndex} and ${toBeProcessedDatalayerGenerationIndex} (roots [generation ${lastRootSavedToAuditTable.generation}] ${lastProcessedRoot.root_hash} and [generation ${lastRootSavedToAuditTable.generation + 1}] ${rootToBeProcessed.root_hash})`); for (const diff of optimizedKvDiff) { @@ -450,7 +473,7 @@ const syncOrganizationAudit = async (organization) => { table: modelKey, change: decodeHex(diff.value), onchainConfirmationTimeStamp: rootToBeProcessed.timestamp, - generation: toBeProcessedIndex, + generation: toBeProcessedDatalayerGenerationIndex, comment: _.get( tryParseJSON( decodeHex(_.get(comment, '[0].value', encodeHex('{}'))), @@ -533,7 +556,7 @@ const syncOrganizationAudit = async (organization) => { }; /** - * checks if an organization needs to be reset to a generation, and performs the reset. notifies the caller that the + * checks if an organization's generations are ahead of needs to be reset , and performs the reset. notifies the caller that the * org was reset. * * datalayer store singletons can lose generation indexes due to blockchain reorgs. while the data is intact, in datalayer @@ -542,23 +565,29 @@ const syncOrganizationAudit = async (organization) => { * if the DL store is synced, and the cadt generation is higher than the DL generation, the org is resynced to 2 generations * back from the highest DL generation * @param orgUid - * @param cadtLastProcessedGeneration - * @param registryStoreGeneration - * @param registryStoreTargetGeneration + * @param cadtLastProcessedGenerationIndex + * @param registryStoreSyncGeneration + * @param registryStoreHighestRootHistoryIndex + * @param registryStoreSyncTargetGeneration * @return {Promise} */ const orgGenerationMismatchCheck = async ( orgUid, - cadtLastProcessedGeneration, - registryStoreGeneration, - registryStoreTargetGeneration, + cadtLastProcessedGenerationIndex, + registryStoreHighestRootHistoryIndex, + registryStoreSyncGeneration, + registryStoreSyncTargetGeneration, ) => { - const storeSynced = registryStoreGeneration === registryStoreTargetGeneration; + const storeSynced = + registryStoreSyncGeneration === registryStoreSyncTargetGeneration; const lastProcessedGenerationIndexDoesNotExistInDatalayer = - cadtLastProcessedGeneration > registryStoreGeneration; + cadtLastProcessedGenerationIndex > registryStoreHighestRootHistoryIndex; + logger.debug( + `orgGenerationMismatchCheck() data layer registry store synced: ${storeSynced}, last cadt generation exists in data layer: ${!lastProcessedGenerationIndexDoesNotExistInDatalayer}`, + ); if (storeSynced && lastProcessedGenerationIndexDoesNotExistInDatalayer) { - const resetToGeneration = registryStoreGeneration - 2; // -2 to have a margin + const resetToGeneration = registryStoreSyncGeneration - 2; // -2 to have a margin logger.info( `resetting org with orgUid ${orgUid} to generation ${resetToGeneration}`, );