diff --git a/ot-node.js b/ot-node.js index b401df164..31465afe5 100644 --- a/ot-node.js +++ b/ot-node.js @@ -113,6 +113,12 @@ class OTNode { this.logger, this.config, ); + + MigrationExecutor.executeServiceAgreementFixMigration( + this.container, + this.logger, + this.config, + ); } checkNodeVersion() { diff --git a/src/migration/migration-executor.js b/src/migration/migration-executor.js index 8e26b5c9a..850437259 100644 --- a/src/migration/migration-executor.js +++ b/src/migration/migration-executor.js @@ -21,6 +21,7 @@ import GetOldServiceAgreementsMigration from './get-old-service-agreements-migra import ServiceAgreementPruningMigration from './service-agreement-pruning-migration.js'; import RemoveDuplicateServiceAgreementMigration from './remove-duplicate-service-agreement-migration.js'; import DevnetNeuroPruningMigration from './devnet-neuro-pruning-migration.js'; +import ServiceAgreementFixMigration from './service-agreement-fix-migration.js'; class MigrationExecutor { static async executePullShardingTableMigration(container, logger, config) { @@ -529,6 +530,36 @@ class MigrationExecutor { } } + static async executeServiceAgreementFixMigration(container, logger, config) { + if ( + process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || + process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST + ) + return; + + const repositoryModuleManager = container.resolve('repositoryModuleManager'); + const blockchainModuleManager = container.resolve('blockchainModuleManager'); + const serviceAgreementService = container.resolve('serviceAgreementService'); + + const migration = new ServiceAgreementFixMigration( + 'serviceAgreementFixMigration', + logger, + config, + repositoryModuleManager, + blockchainModuleManager, + serviceAgreementService, + ); + if (!(await migration.migrationAlreadyExecuted())) { + try { + await migration.migrate(); + } catch (error) { + logger.error( + `Unable to execute service agreement fix migration. Error: ${error.message}`, + ); + } + } + } + static exitNode(code = 0) { process.exit(code); } diff --git a/src/migration/service-agreement-fix-migration.js b/src/migration/service-agreement-fix-migration.js new file mode 100644 index 000000000..1333b7088 --- /dev/null +++ b/src/migration/service-agreement-fix-migration.js @@ -0,0 +1,113 @@ +import BaseMigration from './base-migration.js'; +import { NODE_ENVIRONMENTS, CONTENT_ASSET_HASH_FUNCTION_ID } from '../constants/constants.js'; + +const NUMBER_OF_ASSETS_FROM_DB = 1_000_000; +const BATCH_FOR_RPC_CALLS = 25; + +class ServiceAgreementFixMigration extends BaseMigration { + constructor( + migrationName, + logger, + config, + repositoryModuleManager, + blockchainModuleManager, + serviceAgreementService, + ) { + super(migrationName, logger, config); + this.repositoryModuleManager = repositoryModuleManager; + this.blockchainModuleManager = blockchainModuleManager; + this.serviceAgreementService = serviceAgreementService; + } + + async executeMigration() { + let blockchainId; + switch (process.env.NODE_ENV) { + case NODE_ENVIRONMENTS.DEVNET: + blockchainId = 'otp:2160'; + break; + case NODE_ENVIRONMENTS.TESTNET: + blockchainId = 'otp:20430'; + break; + case NODE_ENVIRONMENTS.MAINENET: + default: + blockchainId = 'otp:2043'; + } + + // Get count of service agreement for neuroweb + const serviceAgreementCount = + await this.repositoryModuleManager.getCountOfServiceAgreementsByBlockchain( + blockchainId, + ); + + // In batches + const numberOfIteration = Math.ceil(serviceAgreementCount / NUMBER_OF_ASSETS_FROM_DB); + for (let i = 0; i < numberOfIteration; i += 1) { + let serviceAgreementToBeUpdated = []; + const serviceAgreementBatch = + // eslint-disable-next-line no-await-in-loop + await this.repositoryModuleManager.getServiceAgreementsByBlockchainInBatches( + blockchainId, + NUMBER_OF_ASSETS_FROM_DB, + i * NUMBER_OF_ASSETS_FROM_DB, + ); + for (let j = 0; j < serviceAgreementBatch.length; j += BATCH_FOR_RPC_CALLS) { + const currentBatch = serviceAgreementBatch.slice(j, j + BATCH_FOR_RPC_CALLS); + + const batchPromises = currentBatch.map((serviceAgreement) => + this.compareDataWithOnChainData(serviceAgreement), + ); + // eslint-disable-next-line no-await-in-loop + const results = await Promise.all(batchPromises); + + const mismatches = results.filter((result) => result !== null); + + if (mismatches.length > 0) { + this.logger.trace(`Mismatches found: ${mismatches.length}`); + serviceAgreementToBeUpdated = serviceAgreementToBeUpdated.concat(mismatches); + } + } + if (serviceAgreementToBeUpdated.length > 0) { + // eslint-disable-next-line no-await-in-loop + await this.repositoryModuleManager.updateAssertionIdServiceAgreement( + blockchainId, + serviceAgreementToBeUpdated, + ); + } + } + } + + async compareDataWithOnChainData(serviceAgreement) { + let assertionId; + try { + assertionId = await this.blockchainModuleManager.getAssertionIdByIndex( + serviceAgreement.blockchainId, + serviceAgreement.assetStorageContractAddress, + serviceAgreement.tokenId, + 0, + ); + } catch (error) { + this.logger.warn( + `Unable to fetch assertionIdfor token id: ${serviceAgreement.tokenId}`, + ); + return null; + } + + if (serviceAgreement.asssertionId !== assertionId) { + const serviceAgreementId = this.serviceAgreementService.generateId( + serviceAgreement.blockchainId, + serviceAgreement.assetStorageContractAddress, + serviceAgreement.tokenId, + assertionId, + CONTENT_ASSET_HASH_FUNCTION_ID, // 1 - sha256 + ); + return { + tokenId: serviceAgreement.tokenId, + assertionId, + serviceAgreementId, + }; + } + return null; + } +} + +export default ServiceAgreementFixMigration; diff --git a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js index 844a523dc..4cb738acb 100644 --- a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js @@ -331,6 +331,51 @@ class ServiceAgreementRepository { order: [['token_id']], }); } + + async getCountOfServiceAgreementsByBlockchain(blockchainId) { + return this.model.count({ + where: { + blockchainId, + }, + }); + } + + async getServiceAgreementsByBlockchainInBatches(blockchainId, batchSize, offset) { + return this.model.findAll({ + where: { blockchainId }, + limit: batchSize, + offset, + }); + } + + async updateAssertionIdServiceAgreement(blockchainId, serviceAgreementsToBeUpdated) { + const tokenIds = serviceAgreementsToBeUpdated.map( + (serviceAgreement) => serviceAgreement.tokenId, + ); + const assertionIdCase = serviceAgreementsToBeUpdated + .map(({ tokenId, assertionId }) => `WHEN token_id = ${tokenId} THEN '${assertionId}'`) + .join(' '); + + const serviceAgreementIdCase = serviceAgreementsToBeUpdated + .map( + ({ tokenId, serviceAgreementId }) => + `WHEN token_id = ${tokenId} THEN '${serviceAgreementId}'`, + ) + .join(' '); + + const query = ` + UPDATE service_agreement + SET + assertion_id = CASE ${assertionIdCase} ELSE assertion_id END, + agreement_id = CASE ${serviceAgreementIdCase} ELSE agreement_id END + WHERE blockchain_id = :blockchainId AND token_id IN (:tokenIds); + `; + + await this.sequelize.query(query, { + replacements: { blockchainId, tokenIds }, + type: Sequelize.QueryTypes.UPDATE, + }); + } } export default ServiceAgreementRepository; diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index eea9d4983..c3f7d2028 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -554,6 +554,27 @@ class RepositoryModuleManager extends BaseModuleManager { async getParanetsBlockchains() { return this.getRepository('paranet').getParanetsBlockchains(); } + + async getCountOfServiceAgreementsByBlockchain(blockchainId) { + return this.getRepository('service_agreement').getCountOfServiceAgreementsByBlockchain( + blockchainId, + ); + } + + async getServiceAgreementsByBlockchainInBatches(blockchainId, batchSize, offset) { + return this.getRepository('service_agreement').getServiceAgreementsByBlockchainInBatches( + blockchainId, + batchSize, + offset, + ); + } + + async updateAssertionIdServiceAgreement(blockchainId, serviceAgreementsToBeUpdated) { + return this.getRepository('service_agreement').updateAssertionIdServiceAgreement( + blockchainId, + serviceAgreementsToBeUpdated, + ); + } } export default RepositoryModuleManager;