diff --git a/package-lock.json b/package-lock.json index 3a18c9d491..1690bab2ef 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "6.3.0", + "version": "6.3.0+hotfix.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "6.3.0", + "version": "6.3.0+hotfix.1", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^2.4.3", diff --git a/package.json b/package.json index 57fad629e8..829f671121 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "6.3.0", + "version": "6.3.0+hotfix.1", "description": "OTNode V6", "main": "index.js", "type": "module", diff --git a/src/commands/common/get-latest-service-agreement/blockchain-get-latest-service-agreement.js b/src/commands/common/get-latest-service-agreement/blockchain-get-latest-service-agreement.js index 80ec73e83c..1782490af6 100644 --- a/src/commands/common/get-latest-service-agreement/blockchain-get-latest-service-agreement.js +++ b/src/commands/common/get-latest-service-agreement/blockchain-get-latest-service-agreement.js @@ -1,13 +1,17 @@ +/* eslint-disable no-await-in-loop */ +import { setTimeout as sleep } from 'timers/promises'; import Command from '../../command.js'; import { CONTENT_ASSET_HASH_FUNCTION_ID, EXPECTED_TRANSACTION_ERRORS, + GET_ASSERTION_IDS_MAX_RETRY_COUNT, + GET_ASSERTION_IDS_RETRY_DELAY_IN_SECONDS, + GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE, + GET_LATEST_SERVICE_AGREEMENT_EXCLUDE_LATEST_TOKEN_ID, GET_LATEST_SERVICE_AGREEMENT_FREQUENCY_MILLS, SERVICE_AGREEMENT_SOURCES, } from '../../../constants/constants.js'; -const BATCH_SIZE = 50; - class BlockchainGetLatestServiceAgreement extends Command { constructor(ctx) { super(ctx); @@ -27,25 +31,35 @@ class BlockchainGetLatestServiceAgreement extends Command { const assetStorageContractAddresses = this.blockchainModuleManager.getAssetStorageContractAddresses(blockchain); - await Promise.all( + const results = await Promise.all( assetStorageContractAddresses.map((contract) => - this.updateAgreementDataForAssetContract(contract, blockchain), + this.updateAgreementDataForAssetContract( + contract, + blockchain, + command.data[contract], + ), ), ); + results.forEach((result) => { + if (result) { + // eslint-disable-next-line no-param-reassign + command.data[result.contract] = result.lastProcessedTokenId; + } + }); + return Command.repeat(); } - async updateAgreementDataForAssetContract(contract, blockchain) { + async updateAgreementDataForAssetContract(contract, blockchain, lastProcessedTokenId) { this.logger.info( - `Get latest service agreement: Starting get latest service agreement command for blockchain: ${blockchain}`, + `Get latest service agreement: Starting get latest service agreement command, last processed token id: ${lastProcessedTokenId} for blockchain: ${blockchain}`, ); let latestBlockchainTokenId; try { - latestBlockchainTokenId = await this.blockchainModuleManager.getLatestTokenId( - blockchain, - contract, - ); + latestBlockchainTokenId = + Number(await this.blockchainModuleManager.getLatestTokenId(blockchain, contract)) - + GET_LATEST_SERVICE_AGREEMENT_EXCLUDE_LATEST_TOKEN_ID; } catch (error) { if (error.message.includes(EXPECTED_TRANSACTION_ERRORS.NO_MINTED_ASSETS)) { this.logger.info( @@ -53,11 +67,33 @@ class BlockchainGetLatestServiceAgreement extends Command { ); return; } - throw error; + this.logger.error( + `Unable to process agreement data for asset contract ${contract}. Error: ${error}`, + ); + return; } const latestDbTokenId = - (await this.repositoryModuleManager.getLatestServiceAgreementTokenId(blockchain)) ?? 0; + lastProcessedTokenId ?? + (await this.repositoryModuleManager.getLatestServiceAgreementTokenId(blockchain)) ?? + 0; + + if (latestBlockchainTokenId < latestDbTokenId) { + this.logger.debug( + `Get latest service agreement: No new agreements found on blockchain: ${blockchain}.`, + ); + return { + contract, + lastProcessedTokenId: latestDbTokenId, + }; + } + + if (latestBlockchainTokenId < latestDbTokenId) { + this.logger.debug( + `Get latest service agreement: No new agreements found on blockchain: ${blockchain}.`, + ); + return; + } this.logger.debug( `Get latest service agreement: Latest token id on chain: ${latestBlockchainTokenId}, latest token id in database: ${latestDbTokenId} on blockchain: ${blockchain}`, @@ -75,26 +111,28 @@ class BlockchainGetLatestServiceAgreement extends Command { ); if ( getAgreementDataPromise.length === tokenIdDifference || - getAgreementDataPromise.length === BATCH_SIZE + getAgreementDataPromise.length === GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE ) { - // eslint-disable-next-line no-await-in-loop const missingAgreements = await Promise.all(getAgreementDataPromise); - // eslint-disable-next-line no-await-in-loop await this.repositoryModuleManager.bulkCreateServiceAgreementRecords( missingAgreements.filter((agreement) => agreement != null), ); getAgreementDataPromise = []; - tokenIdDifference -= BATCH_SIZE; + tokenIdDifference -= GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE; } } - if (latestBlockchainTokenId - latestDbTokenId !== 0) { + if (latestBlockchainTokenId - latestDbTokenId > 0) { this.logger.debug( `Get latest service agreement: Successfully fetched ${ latestBlockchainTokenId - latestDbTokenId } on blockchain: ${blockchain}`, ); } + return { + contract, + lastProcessedTokenId: latestDbTokenId, + }; } async getAgreementDataForToken( @@ -103,56 +141,76 @@ class BlockchainGetLatestServiceAgreement extends Command { contract, hashFunctionId = CONTENT_ASSET_HASH_FUNCTION_ID, ) { - this.logger.debug( - `Get latest service agreement: Getting agreement data for token id: ${tokenId} on blockchain: ${blockchain}`, - ); - const assertionIds = await this.blockchainModuleManager.getAssertionIds( - blockchain, - contract, - tokenId, - ); - const keyword = await this.ualService.calculateLocationKeyword( - blockchain, - contract, - tokenId, - assertionIds[0], - ); - const agreementId = await this.serviceAgreementService.generateId( - blockchain, - contract, - tokenId, - keyword, - hashFunctionId, - ); - const agreementData = await this.blockchainModuleManager.getAgreementData( - blockchain, - agreementId, - ); + try { + this.logger.debug( + `Get latest service agreement: Getting agreement data for token id: ${tokenId} on blockchain: ${blockchain}`, + ); + let assertionIds = []; + let retryCount = 0; + + while (assertionIds.length === 0) { + if (retryCount === GET_ASSERTION_IDS_MAX_RETRY_COUNT) { + throw Error( + `Get latest service agreement: Unable to get assertion ids for token id: ${tokenId} on blockchain: ${blockchain}`, + ); + } + this.logger.debug( + `Get latest service agreement: getting assertion ids retry ${retryCount} for token id: ${tokenId} on blockchain: ${blockchain}`, + ); + assertionIds = await this.blockchainModuleManager.getAssertionIds( + blockchain, + contract, + tokenId, + ); + retryCount += 1; + await sleep(GET_ASSERTION_IDS_RETRY_DELAY_IN_SECONDS * 1000); + } - if (!agreementData) { - this.logger.warn( - `Unable to fetch agreement data while processing asset created event for agreement id: ${agreementId}, blockchain id: ${blockchain}`, + const keyword = await this.ualService.calculateLocationKeyword( + blockchain, + contract, + tokenId, + assertionIds[0], + ); + const agreementId = await this.serviceAgreementService.generateId( + blockchain, + contract, + tokenId, + keyword, + hashFunctionId, + ); + const agreementData = await this.blockchainModuleManager.getAgreementData( + blockchain, + agreementId, ); - } - const latestStateIndex = assertionIds.length - 1; + if (!agreementData) { + throw Error( + `Get latest service agreement: Unable to fetch agreement data while processing asset created event for agreement id: ${agreementId}, blockchain id: ${blockchain}`, + ); + } - return { - blockchainId: blockchain, - assetStorageContractAddress: contract, - tokenId, - agreementId, - startTime: agreementData.startTime, - epochsNumber: agreementData.epochsNumber, - epochLength: agreementData.epochLength, - scoreFunctionId: agreementData.scoreFunctionId, - stateIndex: latestStateIndex, - assertionId: assertionIds[latestStateIndex], - hashFunctionId, - keyword, - proofWindowOffsetPerc: agreementData.proofWindowOffsetPerc, - dataSource: SERVICE_AGREEMENT_SOURCES.NODE, - }; + const latestStateIndex = assertionIds.length - 1; + + return { + blockchainId: blockchain, + assetStorageContractAddress: contract, + tokenId, + agreementId, + startTime: agreementData.startTime, + epochsNumber: agreementData.epochsNumber, + epochLength: agreementData.epochLength, + scoreFunctionId: agreementData.scoreFunctionId, + stateIndex: latestStateIndex, + assertionId: assertionIds[latestStateIndex], + hashFunctionId, + keyword, + proofWindowOffsetPerc: agreementData.proofWindowOffsetPerc, + dataSource: SERVICE_AGREEMENT_SOURCES.NODE, + }; + } catch (error) { + this.logger.error(error.message); + } } /** diff --git a/src/constants/constants.js b/src/constants/constants.js index 66f7ad18a1..be2b6d2da2 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -548,6 +548,14 @@ export const ARCHIVE_UPDATE_RESPONSES_FOLDER = 'update_responses'; */ export const COMMAND_QUEUE_PARALLELISM = 100; +export const GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE = 50; + +export const GET_ASSERTION_IDS_MAX_RETRY_COUNT = 5; + +export const GET_ASSERTION_IDS_RETRY_DELAY_IN_SECONDS = 2; + +export const GET_LATEST_SERVICE_AGREEMENT_EXCLUDE_LATEST_TOKEN_ID = 1; + /** * @constant {object} HTTP_API_ROUTES - * HTTP API Routes with parameters diff --git a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js index 562f9d0617..19ffb462a3 100644 --- a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js @@ -85,7 +85,7 @@ class ServiceAgreementRepository { async bulkCreateServiceAgreementRecords(serviceAgreements) { return this.model.bulkCreate(serviceAgreements, { - validate: true, + ignoreDuplicates: true, }); }