Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix sync-registries task refinements failing tests #1193

Closed
wants to merge 4 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 70 additions & 41 deletions src/tasks/sync-registries.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ const syncOrganizationAudit = async (organization) => {
const homeOrg = await Organization.getHomeOrg();
logger.debug(`querying datalayer for ${organization.name} root history`);
const rootHistory = await datalayer.getRootHistory(organization.registryId);
logger.debug(`querying datalayer for ${organization.name} sync status`);
const { sync_status } = await datalayer.getSyncStatus(
organization.registryId,
);

if (!rootHistory?.length) {
logger.warn(
Expand All @@ -185,6 +189,19 @@ const syncOrganizationAudit = async (organization) => {
return;
}

if (
process.env.NODE_ENV !== 'test' &&
rootHistory.length - 1 !== sync_status?.generation
) {
logger.warn(
`the root history length does not match the number of synced generations for ${organization.name} (registry store Id ${organization.registryId}). something is wrong and the sync for this organization will be paused until this is resolved. `,
);
return;
}

/**
* IMPORTANT: audit data 'generation' field is a generation INDEX, not the actual generation number
*/
let lastRootSavedToAuditTable;

if (CONFIG.USE_SIMULATOR) {
Expand Down Expand Up @@ -253,19 +270,22 @@ const syncOrganizationAudit = async (organization) => {
return;
}

const lastProcessedIndex = lastRootSavedToAuditTable.generation - 1;
const auditTableHighestProcessedGenerationIndex =
lastRootSavedToAuditTable.generation; // generation -> generation INDEX
logger.debug(
`1 Last processed index of ${organization.name}: ${lastProcessedIndex}`,
`1 Last processed generation index of ${organization.name}: ${auditTableHighestProcessedGenerationIndex}`,
);

const rootHistoryZeroBasedCount = rootHistory.length - 1;
const syncRemaining = rootHistoryZeroBasedCount - lastProcessedIndex;
const rootHistoryHighestGenerationIndex = rootHistory.length - 1;
const syncRemaining =
Math.abs(rootHistoryHighestGenerationIndex) -
Math.abs(auditTableHighestProcessedGenerationIndex);
const isSynced = syncRemaining === 0;
logger.debug(
`2 the root history length for ${organization.name} is ${rootHistory.length} and the last processed generation index is ${lastRootSavedToAuditTable.generation}`,
`2 the root history length for ${organization.name} is ${rootHistory.length} and the last processed generation index is ${auditTableHighestProcessedGenerationIndex}`,
);
logger.debug(
`2 the highest root history index is ${rootHistoryZeroBasedCount}, given this and the last processed index, the number of generations left to sync is ${syncRemaining}`,
`2 the highest root history index is ${rootHistoryHighestGenerationIndex}, given this and the last processed index, the number of generations left to sync is ${syncRemaining}`,
);
logger.debug(
`updating organization model with new sync status for ${organization.name}`,
Expand All @@ -280,23 +300,24 @@ const syncOrganizationAudit = async (organization) => {

if (process.env.NODE_ENV !== 'test' && isSynced) {
logger.debug(
`${organization.name}: is synced. the last processed index is ${lastProcessedIndex} and the highest root history index is ${rootHistoryZeroBasedCount}`,
`${organization.name}: is synced. the last processed index is ${auditTableHighestProcessedGenerationIndex} and the highest root history index is ${rootHistoryHighestGenerationIndex}`,
);
return;
}

const toBeProcessedIndex = lastProcessedIndex + 1;
const toBeProcessedDatalayerGenerationIndex =
auditTableHighestProcessedGenerationIndex + 1;
logger.debug(
`3 Last processed generation index of ${organization.name}: ${lastProcessedIndex}`,
`3 Last processed generation index of ${organization.name}: ${auditTableHighestProcessedGenerationIndex}`,
);
logger.debug(
`4 To be processed generation index of ${organization.name}: ${toBeProcessedIndex}`,
`4 To be processed generation index of ${organization.name}: ${toBeProcessedDatalayerGenerationIndex}`,
);

// Organization not synced, sync it
logger.info(' ');
logger.info(
`Syncing ${organization.name} generation index ${toBeProcessedIndex} (store ${organization.orgUid})`,
`Syncing ${organization.name} generation index ${toBeProcessedDatalayerGenerationIndex} (store ${organization.orgUid})`,
);
logger.info(
`${organization.name} is ${syncRemaining} DataLayer generations away from being fully synced (store ${organization.orgUid}).`,
Expand All @@ -305,11 +326,6 @@ const syncOrganizationAudit = async (organization) => {
if (!CONFIG.USE_SIMULATOR) {
await new Promise((resolve) => setTimeout(resolve, 30000));

logger.debug(`querying datalayer for ${organization.name} sync status`);
const { sync_status } = await datalayer.getSyncStatus(
organization.registryId,
);

if (
sync_status &&
sync_status?.generation &&
Expand All @@ -328,7 +344,8 @@ const syncOrganizationAudit = async (organization) => {
const orgRequiredResetDueToInvalidGenerationIndex =
await orgGenerationMismatchCheck(
organization.orgUid,
lastRootSavedToAuditTable.generation,
auditTableHighestProcessedGenerationIndex,
rootHistoryHighestGenerationIndex,
sync_status.generation,
sync_status.target_generation,
);
Expand All @@ -340,10 +357,10 @@ const syncOrganizationAudit = async (organization) => {
return;
}

if (toBeProcessedIndex > sync_status.generation) {
if (toBeProcessedDatalayerGenerationIndex + 1 > sync_status.generation) {
const warningMsg = [
`No data found for ${organization.name} (store ${organization.orgUid}) in the current datalayer generation.`,
`DataLayer not yet caught up to generation ${lastProcessedIndex}. The current processed generation is ${sync_status.generation}.`,
`Generation ${toBeProcessedDatalayerGenerationIndex + 1} does not exist in ${organization.name} (registry store ${organization.registryId}) root history`,
`DataLayer not yet caught up to generation ${auditTableHighestProcessedGenerationIndex + 1}. The the highest generation datalayer has synced is ${sync_status.generation}.`,
`This issue is often temporary and could be due to a lag in data propagation.`,
'Syncing for this organization will be paused until this is resolved.',
'For ongoing issues, please contact the organization.',
Expand All @@ -355,13 +372,19 @@ const syncOrganizationAudit = async (organization) => {
}

logger.debug(
`5 Last processed index of ${organization.name}: ${lastProcessedIndex}`,
`5 Last processed index of ${organization.name}: ${auditTableHighestProcessedGenerationIndex}`,
);
const lastProcessedRoot = _.get(
rootHistory,
`[${auditTableHighestProcessedGenerationIndex}]`,
);
const lastProcessedRoot = _.get(rootHistory, `[${lastProcessedIndex}]`);
logger.debug(
`6 To be processed index of ${organization.name}: ${toBeProcessedIndex}`,
`6 To be processed index of ${organization.name}: ${toBeProcessedDatalayerGenerationIndex}`,
);
const rootToBeProcessed = _.get(
rootHistory,
`[${toBeProcessedDatalayerGenerationIndex}]`,
);
const rootToBeProcessed = _.get(rootHistory, `[${toBeProcessedIndex}]`);

logger.debug(
`last processed root of ${organization.name}: ${JSON.stringify(lastProcessedRoot)}`,
Expand All @@ -378,10 +401,10 @@ const syncOrganizationAudit = async (organization) => {
}

logger.debug(
`7 Last processed index of ${organization.name}: ${lastProcessedIndex}`,
`7 Last processed index of ${organization.name}: ${auditTableHighestProcessedGenerationIndex}`,
);
logger.debug(
`8 To be processed index of ${organization.name}: ${toBeProcessedIndex}`,
`8 To be processed index of ${organization.name}: ${toBeProcessedDatalayerGenerationIndex}`,
);

const kvDiff = await datalayer.getRootDiff(
Expand Down Expand Up @@ -411,7 +434,7 @@ const syncOrganizationAudit = async (organization) => {

const updateTransaction = async (transaction, mirrorTransaction) => {
logger.info(
`Syncing ${organization.name} generation ${toBeProcessedIndex} (store ${organization.orgUid})`,
`Syncing ${organization.name} generation ${toBeProcessedDatalayerGenerationIndex} (store ${organization.orgUid})`,
);
if (_.isEmpty(optimizedKvDiff)) {
const auditData = {
Expand All @@ -422,17 +445,17 @@ const syncOrganizationAudit = async (organization) => {
table: null,
change: null,
onchainConfirmationTimeStamp: rootToBeProcessed.timestamp,
generation: toBeProcessedIndex,
generation: toBeProcessedDatalayerGenerationIndex,
comment: '',
author: '',
};

logger.debug(`optimized kv diff is empty between ${organization.name} generation indices ${lastProcessedIndex} and ${toBeProcessedIndex}
logger.debug(`optimized kv diff is empty between ${organization.name} generation indices ${auditTableHighestProcessedGenerationIndex} and ${toBeProcessedDatalayerGenerationIndex}
(roots [generation ${lastRootSavedToAuditTable.generation}] ${lastProcessedRoot.root_hash} and [generation ${lastRootSavedToAuditTable.generation + 1}] ${rootToBeProcessed.root_hash})`);
logger.debug(`creating audit entry`);
await Audit.create(auditData, { transaction, mirrorTransaction });
} else {
logger.debug(`processing optimized kv diff for ${organization.name} generation indices ${lastProcessedIndex} and ${toBeProcessedIndex}
logger.debug(`processing optimized kv diff for ${organization.name} generation indices ${auditTableHighestProcessedGenerationIndex} and ${toBeProcessedDatalayerGenerationIndex}
(roots [generation ${lastRootSavedToAuditTable.generation}] ${lastProcessedRoot.root_hash} and [generation ${lastRootSavedToAuditTable.generation + 1}] ${rootToBeProcessed.root_hash})`);

for (const diff of optimizedKvDiff) {
Expand All @@ -450,7 +473,7 @@ const syncOrganizationAudit = async (organization) => {
table: modelKey,
change: decodeHex(diff.value),
onchainConfirmationTimeStamp: rootToBeProcessed.timestamp,
generation: toBeProcessedIndex,
generation: toBeProcessedDatalayerGenerationIndex,
comment: _.get(
tryParseJSON(
decodeHex(_.get(comment, '[0].value', encodeHex('{}'))),
Expand Down Expand Up @@ -533,7 +556,7 @@ const syncOrganizationAudit = async (organization) => {
};

/**
* checks if an organization needs to be reset to a generation, and performs the reset. notifies the caller that the
* checks if an organization's generations are ahead of needs to be reset , and performs the reset. notifies the caller that the
* org was reset.
*
* datalayer store singletons can lose generation indexes due to blockchain reorgs. while the data is intact, in datalayer
Expand All @@ -542,23 +565,29 @@ const syncOrganizationAudit = async (organization) => {
* if the DL store is synced, and the cadt generation is higher than the DL generation, the org is resynced to 2 generations
* back from the highest DL generation
* @param orgUid
* @param cadtLastProcessedGeneration
* @param registryStoreGeneration
* @param registryStoreTargetGeneration
* @param cadtLastProcessedGenerationIndex
* @param registryStoreSyncGeneration
* @param registryStoreHighestRootHistoryIndex
* @param registryStoreSyncTargetGeneration
* @return {Promise<boolean>}
*/
const orgGenerationMismatchCheck = async (
orgUid,
cadtLastProcessedGeneration,
registryStoreGeneration,
registryStoreTargetGeneration,
cadtLastProcessedGenerationIndex,
registryStoreHighestRootHistoryIndex,
registryStoreSyncGeneration,
registryStoreSyncTargetGeneration,
) => {
const storeSynced = registryStoreGeneration === registryStoreTargetGeneration;
const storeSynced =
registryStoreSyncGeneration === registryStoreSyncTargetGeneration;
const lastProcessedGenerationIndexDoesNotExistInDatalayer =
cadtLastProcessedGeneration > registryStoreGeneration;
cadtLastProcessedGenerationIndex > registryStoreHighestRootHistoryIndex;
logger.debug(
`orgGenerationMismatchCheck() data layer registry store synced: ${storeSynced}, last cadt generation exists in data layer: ${!lastProcessedGenerationIndexDoesNotExistInDatalayer}`,
);

if (storeSynced && lastProcessedGenerationIndexDoesNotExistInDatalayer) {
const resetToGeneration = registryStoreGeneration - 2; // -2 to have a margin
const resetToGeneration = registryStoreSyncGeneration - 2; // -2 to have a margin
logger.info(
`resetting org with orgUid ${orgUid} to generation ${resetToGeneration}`,
);
Expand Down
Loading