Skip to content

Commit

Permalink
fix: failing tests due to zero vs one based generation count mismatch
Browse files Browse the repository at this point in the history
  • Loading branch information
wwills2 committed Oct 10, 2024
1 parent 2e8b8f0 commit ca8d15c
Showing 1 changed file with 69 additions and 41 deletions.
110 changes: 69 additions & 41 deletions src/tasks/sync-registries.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ const syncOrganizationAudit = async (organization) => {
const homeOrg = await Organization.getHomeOrg();
logger.debug(`querying datalayer for ${organization.name} root history`);
const rootHistory = await datalayer.getRootHistory(organization.registryId);
logger.debug(`querying datalayer for ${organization.name} sync status`);
const { sync_status } = await datalayer.getSyncStatus(
organization.registryId,
);

if (!rootHistory?.length) {
logger.warn(
Expand All @@ -185,6 +189,19 @@ const syncOrganizationAudit = async (organization) => {
return;
}

if (
process.env.NODE_ENV !== 'test' &&
rootHistory.length - 1 !== sync_status?.generation
) {
logger.warn(
`the root history length does not match the number of synced generations for ${organization.name} (registry store Id ${organization.registryId}). something is wrong and the sync for this organization will be paused until this is resolved. `,
);
return;
}

/**
* IMPORTANT: audit data 'generation' field is a generation INDEX, not the actual generation number
*/
let lastRootSavedToAuditTable;

if (CONFIG.USE_SIMULATOR) {
Expand Down Expand Up @@ -253,20 +270,22 @@ const syncOrganizationAudit = async (organization) => {
return;
}

const lastProcessedIndex = lastRootSavedToAuditTable.generation - 1;
const auditTableHighestProcessedGenerationIndex =
lastRootSavedToAuditTable.generation; // generation -> generation INDEX
logger.debug(
`1 Last processed index of ${organization.name}: ${lastProcessedIndex}`,
`1 Last processed generation index of ${organization.name}: ${auditTableHighestProcessedGenerationIndex}`,
);

const rootHistoryZeroBasedCount = rootHistory.length - 1;
const rootHistoryHighestGenerationIndex = rootHistory.length - 1;
const syncRemaining =
Math.abs(rootHistoryZeroBasedCount) - Math.abs(lastProcessedIndex);
Math.abs(rootHistoryHighestGenerationIndex) -
Math.abs(auditTableHighestProcessedGenerationIndex);
const isSynced = syncRemaining === 0;
logger.debug(
`2 the root history length for ${organization.name} is ${rootHistory.length} and the last processed generation index is ${lastRootSavedToAuditTable.generation}`,
`2 the root history length for ${organization.name} is ${rootHistory.length} and the last processed generation index is ${auditTableHighestProcessedGenerationIndex}`,
);
logger.debug(
`2 the highest root history index is ${rootHistoryZeroBasedCount}, given this and the last processed index, the number of generations left to sync is ${syncRemaining}`,
`2 the highest root history index is ${rootHistoryHighestGenerationIndex}, given this and the last processed index, the number of generations left to sync is ${syncRemaining}`,
);
logger.debug(
`updating organization model with new sync status for ${organization.name}`,
Expand All @@ -281,23 +300,24 @@ const syncOrganizationAudit = async (organization) => {

if (process.env.NODE_ENV !== 'test' && isSynced) {
logger.debug(
`${organization.name}: is synced. the last processed index is ${lastProcessedIndex} and the highest root history index is ${rootHistoryZeroBasedCount}`,
`${organization.name}: is synced. the last processed index is ${auditTableHighestProcessedGenerationIndex} and the highest root history index is ${rootHistoryHighestGenerationIndex}`,
);
return;
}

const toBeProcessedIndex = lastProcessedIndex + 1;
const toBeProcessedDatalayerGenerationIndex =
auditTableHighestProcessedGenerationIndex + 1;
logger.debug(
`3 Last processed generation index of ${organization.name}: ${lastProcessedIndex}`,
`3 Last processed generation index of ${organization.name}: ${auditTableHighestProcessedGenerationIndex}`,
);
logger.debug(
`4 To be processed generation index of ${organization.name}: ${toBeProcessedIndex}`,
`4 To be processed generation index of ${organization.name}: ${toBeProcessedDatalayerGenerationIndex}`,
);

// Organization not synced, sync it
logger.info(' ');
logger.info(
`Syncing ${organization.name} generation index ${toBeProcessedIndex} (store ${organization.orgUid})`,
`Syncing ${organization.name} generation index ${toBeProcessedDatalayerGenerationIndex} (store ${organization.orgUid})`,
);
logger.info(
`${organization.name} is ${syncRemaining} DataLayer generations away from being fully synced (store ${organization.orgUid}).`,
Expand All @@ -306,11 +326,6 @@ const syncOrganizationAudit = async (organization) => {
if (!CONFIG.USE_SIMULATOR) {
await new Promise((resolve) => setTimeout(resolve, 30000));

logger.debug(`querying datalayer for ${organization.name} sync status`);
const { sync_status } = await datalayer.getSyncStatus(
organization.registryId,
);

if (
sync_status &&
sync_status?.generation &&
Expand All @@ -329,7 +344,8 @@ const syncOrganizationAudit = async (organization) => {
const orgRequiredResetDueToInvalidGenerationIndex =
await orgGenerationMismatchCheck(
organization.orgUid,
lastRootSavedToAuditTable.generation,
auditTableHighestProcessedGenerationIndex,
rootHistoryHighestGenerationIndex,
sync_status.generation,
sync_status.target_generation,
);
Expand All @@ -341,10 +357,10 @@ const syncOrganizationAudit = async (organization) => {
return;
}

if (toBeProcessedIndex > sync_status.generation) {
if (toBeProcessedDatalayerGenerationIndex + 1 > sync_status.generation) {
const warningMsg = [
`No data found for ${organization.name} (store ${organization.orgUid}) in the current datalayer generation.`,
`DataLayer not yet caught up to generation ${lastProcessedIndex}. The current processed generation is ${sync_status.generation}.`,
`Generation ${toBeProcessedDatalayerGenerationIndex + 1} does not exist in ${organization.name} (registry store ${organization.registryId}) root history`,
`DataLayer not yet caught up to generation ${auditTableHighestProcessedGenerationIndex + 1}. The the highest generation datalayer has synced is ${sync_status.generation}.`,
`This issue is often temporary and could be due to a lag in data propagation.`,
'Syncing for this organization will be paused until this is resolved.',
'For ongoing issues, please contact the organization.',
Expand All @@ -356,13 +372,19 @@ const syncOrganizationAudit = async (organization) => {
}

logger.debug(
`5 Last processed index of ${organization.name}: ${lastProcessedIndex}`,
`5 Last processed index of ${organization.name}: ${auditTableHighestProcessedGenerationIndex}`,
);
const lastProcessedRoot = _.get(
rootHistory,
`[${auditTableHighestProcessedGenerationIndex}]`,
);
const lastProcessedRoot = _.get(rootHistory, `[${lastProcessedIndex}]`);
logger.debug(
`6 To be processed index of ${organization.name}: ${toBeProcessedIndex}`,
`6 To be processed index of ${organization.name}: ${toBeProcessedDatalayerGenerationIndex}`,
);
const rootToBeProcessed = _.get(
rootHistory,
`[${toBeProcessedDatalayerGenerationIndex}]`,
);
const rootToBeProcessed = _.get(rootHistory, `[${toBeProcessedIndex}]`);

logger.debug(
`last processed root of ${organization.name}: ${JSON.stringify(lastProcessedRoot)}`,
Expand All @@ -379,10 +401,10 @@ const syncOrganizationAudit = async (organization) => {
}

logger.debug(
`7 Last processed index of ${organization.name}: ${lastProcessedIndex}`,
`7 Last processed index of ${organization.name}: ${auditTableHighestProcessedGenerationIndex}`,
);
logger.debug(
`8 To be processed index of ${organization.name}: ${toBeProcessedIndex}`,
`8 To be processed index of ${organization.name}: ${toBeProcessedDatalayerGenerationIndex}`,
);

const kvDiff = await datalayer.getRootDiff(
Expand Down Expand Up @@ -412,7 +434,7 @@ const syncOrganizationAudit = async (organization) => {

const updateTransaction = async (transaction, mirrorTransaction) => {
logger.info(
`Syncing ${organization.name} generation ${toBeProcessedIndex} (store ${organization.orgUid})`,
`Syncing ${organization.name} generation ${toBeProcessedDatalayerGenerationIndex} (store ${organization.orgUid})`,
);
if (_.isEmpty(optimizedKvDiff)) {
const auditData = {
Expand All @@ -423,17 +445,17 @@ const syncOrganizationAudit = async (organization) => {
table: null,
change: null,
onchainConfirmationTimeStamp: rootToBeProcessed.timestamp,
generation: toBeProcessedIndex,
generation: toBeProcessedDatalayerGenerationIndex,
comment: '',
author: '',
};

logger.debug(`optimized kv diff is empty between ${organization.name} generation indices ${lastProcessedIndex} and ${toBeProcessedIndex}
logger.debug(`optimized kv diff is empty between ${organization.name} generation indices ${auditTableHighestProcessedGenerationIndex} and ${toBeProcessedDatalayerGenerationIndex}
(roots [generation ${lastRootSavedToAuditTable.generation}] ${lastProcessedRoot.root_hash} and [generation ${lastRootSavedToAuditTable.generation + 1}] ${rootToBeProcessed.root_hash})`);
logger.debug(`creating audit entry`);
await Audit.create(auditData, { transaction, mirrorTransaction });
} else {
logger.debug(`processing optimized kv diff for ${organization.name} generation indices ${lastProcessedIndex} and ${toBeProcessedIndex}
logger.debug(`processing optimized kv diff for ${organization.name} generation indices ${auditTableHighestProcessedGenerationIndex} and ${toBeProcessedDatalayerGenerationIndex}
(roots [generation ${lastRootSavedToAuditTable.generation}] ${lastProcessedRoot.root_hash} and [generation ${lastRootSavedToAuditTable.generation + 1}] ${rootToBeProcessed.root_hash})`);

for (const diff of optimizedKvDiff) {
Expand All @@ -451,7 +473,7 @@ const syncOrganizationAudit = async (organization) => {
table: modelKey,
change: decodeHex(diff.value),
onchainConfirmationTimeStamp: rootToBeProcessed.timestamp,
generation: toBeProcessedIndex,
generation: toBeProcessedDatalayerGenerationIndex,
comment: _.get(
tryParseJSON(
decodeHex(_.get(comment, '[0].value', encodeHex('{}'))),
Expand Down Expand Up @@ -534,7 +556,7 @@ const syncOrganizationAudit = async (organization) => {
};

/**
* checks if an organization needs to be reset to a generation, and performs the reset. notifies the caller that the
* checks if an organization's generations are ahead of needs to be reset , and performs the reset. notifies the caller that the
* org was reset.
*
* datalayer store singletons can lose generation indexes due to blockchain reorgs. while the data is intact, in datalayer
Expand All @@ -543,23 +565,29 @@ const syncOrganizationAudit = async (organization) => {
* if the DL store is synced, and the cadt generation is higher than the DL generation, the org is resynced to 2 generations
* back from the highest DL generation
* @param orgUid
* @param cadtLastProcessedGeneration
* @param registryStoreGeneration
* @param registryStoreTargetGeneration
* @param cadtLastProcessedGenerationIndex
* @param registryStoreSyncGeneration
* @param registryStoreHighestRootHistoryIndex
* @param registryStoreSyncTargetGeneration
* @return {Promise<boolean>}
*/
const orgGenerationMismatchCheck = async (
orgUid,
cadtLastProcessedGeneration,
registryStoreGeneration,
registryStoreTargetGeneration,
cadtLastProcessedGenerationIndex,
registryStoreHighestRootHistoryIndex,
registryStoreSyncGeneration,
registryStoreSyncTargetGeneration,
) => {
const storeSynced = registryStoreGeneration === registryStoreTargetGeneration;
const storeSynced =
registryStoreSyncGeneration === registryStoreSyncTargetGeneration;
const lastProcessedGenerationIndexDoesNotExistInDatalayer =
cadtLastProcessedGeneration > registryStoreGeneration;
cadtLastProcessedGenerationIndex > registryStoreHighestRootHistoryIndex;
logger.debug(
`orgGenerationMismatchCheck() data layer registry store synced: ${storeSynced}, last cadt generation exists in data layer: ${!lastProcessedGenerationIndexDoesNotExistInDatalayer}`,
);

if (storeSynced && lastProcessedGenerationIndexDoesNotExistInDatalayer) {
const resetToGeneration = registryStoreGeneration - 2; // -2 to have a margin
const resetToGeneration = registryStoreSyncGeneration - 2; // -2 to have a margin
logger.info(
`resetting org with orgUid ${orgUid} to generation ${resetToGeneration}`,
);
Expand Down

0 comments on commit ca8d15c

Please sign in to comment.