Skip to content

Commit

Permalink
Merge pull request #1175 from Chia-Network/fix/reorg-negative-generat…
Browse files Browse the repository at this point in the history
…ions

Fix/reorg negative generations
  • Loading branch information
wwills2 authored Oct 3, 2024
2 parents 906e5e0 + a89fcdd commit 0d428de
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 24 deletions.
12 changes: 4 additions & 8 deletions src/config/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,18 @@ const persistanceFolder = `${chiaRoot}/cadt/${getDataModelVersion()}`;
const localQueryLogger = (query) => {
const queryString = query.split(/:\s(.+)/)[1];
const queryHash = createHash('md5').update(queryString).digest('hex');
logger.debug(`SQLite Sequelize [query hash: ${queryHash}]\n\t${query}`);
logger.silly(`SQLite Sequelize [query hash: ${queryHash}]\n\t${query}`);
};

const mirrorQueryLogger = (query) => {
const queryString = query.split(/:\s(.+)/)[1];
const queryHash = createHash('md5').update(queryString).digest('hex');
logger.debug(`Mirror DB Sequelize [query hash: ${queryHash}]\n\t${query}`);
logger.silly(`Mirror DB Sequelize [query hash: ${queryHash}]\n\t${query}`);
};

const appLogLevel = getConfig().APP.LOG_LEVEL;
const localLogging =
appLogLevel === 'silly' || appLogLevel === 'debug' ? localQueryLogger : false;
const mirrorLogging =
appLogLevel === 'silly' || appLogLevel === 'debug'
? mirrorQueryLogger
: false;
const localLogging = appLogLevel === 'silly' ? localQueryLogger : false;
const mirrorLogging = appLogLevel === 'silly' ? mirrorQueryLogger : false;

export default {
local: {
Expand Down
4 changes: 4 additions & 0 deletions src/datalayer/persistance.js
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,10 @@ const getSyncStatus = async (storeId) => {
// We just care that we got some response, not what the response is
if (Object.keys(data).includes('success')) {
return data;
} else {
logger.warn(
`datalayer '/get_sync_status' RPC failed to get sync status for ${storeId}`,
);
}

return false;
Expand Down
29 changes: 29 additions & 0 deletions src/middleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
import packageJson from '../package.json' assert { type: 'json' };
import datalayer from './datalayer';
import { Organization } from './models';
import { logger } from './config/logger.js';

const { CADT_API_KEY, READ_ONLY, IS_GOVERNANCE_BODY, USE_SIMULATOR } =
getConfig().APP;
Expand All @@ -40,6 +41,34 @@ app.use(
app.use(express.json());
app.use(bodyParser.urlencoded({ extended: false }));

// Request logger middleware
app.use((req, res, next) => {
logger.verbose(`Received request: ${req.method} ${req.originalUrl}`, {
method: req.method,
url: req.originalUrl,
headers: req.headers,
timestamp: new Date().toISOString(),
});

const startTime = Date.now();

res.on('finish', () => {
const duration = Date.now() - startTime;
logger.verbose(
`Processed request: ${req.method} ${req.originalUrl}, status: ${res.statusCode}, duration: ${duration}ms`,
{
method: req.method,
url: req.originalUrl,
status: res.statusCode,
responseTime: duration,
timestamp: new Date().toISOString(),
},
);
});

next();
});

// Common assertions on every endpoint
app.use(async function (req, res, next) {
try {
Expand Down
93 changes: 78 additions & 15 deletions src/tasks/sync-registries.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ const syncOrganizationAudit = async (organization) => {
logger.debug(`querying datalayer for ${organization.name} root history`);
const rootHistory = await datalayer.getRootHistory(organization.registryId);

if (!rootHistory.length) {
logger.info(
`No root history found for ${organization.name} (store ${organization.orgUid})`,
if (!rootHistory?.length) {
logger.warn(
`Could not find root history for ${organization.name} (orgUid ${organization.orgUid}) with timestamp ${currentGeneration.timestamp}, something is wrong and the sync for this organization will be paused until this is resolved.`,
);
return;
}
Expand Down Expand Up @@ -258,20 +258,15 @@ const syncOrganizationAudit = async (organization) => {
`1 Last processed index of ${organization.name}: ${lastProcessedIndex}`,
);

if (lastProcessedIndex > rootHistory.length) {
logger.error(
`Could not find root history for ${organization.name} (store ${organization.orgUid}) with timestamp ${currentGeneration.timestamp}, something is wrong and the sync for this organization will be paused until this is resolved.`,
);
}

const rootHistoryZeroBasedCount = rootHistory.length - 1;
const syncRemaining = rootHistoryZeroBasedCount - lastProcessedIndex;
const isSynced = syncRemaining === 0;
logger.debug(`2 the root history length for ${organization.name} is ${rootHistory.length}
and the last processed generation is ${lastProcessedIndex}`);
logger.debug(`2 the highest root history index is ${rootHistoryZeroBasedCount},
given this and the last processed index, the number of generations left to sync is ${syncRemaining}`);

logger.debug(
`2 the root history length for ${organization.name} is ${rootHistory.length} and the last processed generation is ${lastProcessedIndex}`,
);
logger.debug(
`2 the highest root history index is ${rootHistoryZeroBasedCount}, given this and the last processed index, the number of generations left to sync is ${syncRemaining}`,
);
logger.debug(
`updating organization model with new sync status for ${organization.name}`,
);
Expand All @@ -285,7 +280,7 @@ const syncOrganizationAudit = async (organization) => {

if (process.env.NODE_ENV !== 'test' && isSynced) {
logger.debug(
`3 Last processed index of ${organization.name}: ${lastProcessedIndex}`,
`${organization.name}: is synced. the last processed index is ${lastProcessedIndex} and the highest root history index is ${rootHistoryZeroBasedCount}`,
);
return;
}
Expand Down Expand Up @@ -315,6 +310,36 @@ const syncOrganizationAudit = async (organization) => {
organization.registryId,
);

if (
sync_status &&
sync_status?.generation &&
sync_status?.target_generation
) {
logger.debug(
`store ${organization.registryId} (${organization.name}) is currently at generation ${sync_status.generation} with a target generation of ${sync_status.target_generation}`,
);
} else {
logger.error(
`could not get datalayer sync status for store ${organization.registryId} (${organization.name}). pausing sync until sync status can be retrieved`,
);
return;
}

const orgRequiredResetDueToInvalidGenerationIndex =
await orgGenerationMismatchCheck(
organization.orgUid,
lastProcessedIndex,
sync_status.generation,
sync_status.target_generation,
);

if (orgRequiredResetDueToInvalidGenerationIndex) {
logger.info(
`${organization.name} was ahead of datalayer and needed to resync a few generations. trying again shortly...`,
);
return;
}

if (toBeProcessedIndex > sync_status.generation) {
const warningMsg = [
`No data found for ${organization.name} (store ${organization.orgUid}) in the current datalayer generation.`,
Expand Down Expand Up @@ -509,4 +534,42 @@ const syncOrganizationAudit = async (organization) => {
}
};

/**
* checks if an organization needs to be reset to a generation, and performs the reset. notifies the caller that the
* org was reset.
*
* datalayer store singletons can lose generation indexes due to blockchain reorgs. while the data is intact, in datalayer
* and cadt, this effectively makes the last synced cadt generation a 'future' generation, which causes problems.
*
* if the DL store is synced, and the cadt generation is higher than the DL generation, the org is resynced to 2 generations
* back from the highest DL generation
* @param orgUid
* @param cadtLastProcessedGeneration
* @param registryStoreGeneration
* @param registryStoreTargetGeneration
* @return {Promise<boolean>}
*/
const orgGenerationMismatchCheck = async (
orgUid,
cadtLastProcessedGeneration,
registryStoreGeneration,
registryStoreTargetGeneration,
) => {
const storeSynced = registryStoreGeneration === registryStoreTargetGeneration;
const lastProcessedGenerationIndexDoesNotExistInDatalayer =
cadtLastProcessedGeneration > registryStoreGeneration;

if (storeSynced && lastProcessedGenerationIndexDoesNotExistInDatalayer) {
const resetToGeneration = registryStoreGeneration - 2; // -2 to have a margin
logger.info(
`resetting org with orgUid ${orgUid} to generation ${resetToGeneration}`,
);

await Audit.resetToGeneration(resetToGeneration, orgUid);
return true;
} else {
return false;
}
};

export default job;
2 changes: 1 addition & 1 deletion src/utils/xls.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ function buildObjectXlsData({
columnsWithSpecialTreatment[name] == null ||
!columnsWithSpecialTreatment[name].includes(column)
? primaryKeyProp
: primaryKeyMap[column] ?? primaryKeyMap['default'];
: (primaryKeyMap[column] ?? primaryKeyMap['default']);

if (!Array.isArray(itemValue)) {
const primaryKeyValue =
Expand Down

0 comments on commit 0d428de

Please sign in to comment.