Skip to content

Commit

Permalink
Merge pull request #3158 from OriginTrail/v6/prerelease/mainnet
Browse files Browse the repository at this point in the history
OriginTrail Mainnet Release v6.3.0
  • Loading branch information
djordjekovac authored Apr 26, 2024
2 parents 830f4cc + 4842b6c commit 4ad7024
Show file tree
Hide file tree
Showing 13 changed files with 385 additions and 125 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "origintrail_node",
"version": "6.2.4",
"version": "6.3.0",
"description": "OTNode V6",
"main": "index.js",
"type": "module",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
/* eslint-disable no-await-in-loop */
import { setTimeout as sleep } from 'timers/promises';
import Command from '../../command.js';
import {
CONTENT_ASSET_HASH_FUNCTION_ID,
EXPECTED_TRANSACTION_ERRORS,
GET_ASSERTION_IDS_MAX_RETRY_COUNT,
GET_ASSERTION_IDS_RETRY_DELAY_IN_SECONDS,
GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE,
GET_LATEST_SERVICE_AGREEMENT_EXCLUDE_LATEST_TOKEN_ID,
GET_LATEST_SERVICE_AGREEMENT_FREQUENCY_MILLS,
SERVICE_AGREEMENT_SOURCES,
} from '../../../constants/constants.js';

class BlockchainGetLatestServiceAgreement extends Command {
constructor(ctx) {
super(ctx);
this.repositoryModuleManager = ctx.repositoryModuleManager;
this.blockchainModuleManager = ctx.blockchainModuleManager;
this.serviceAgreementService = ctx.serviceAgreementService;
this.ualService = ctx.ualService;
}

/**
* Executes command and produces one or more events
* @param command
*/
async execute(command) {
const { blockchain } = command.data;

const assetStorageContractAddresses =
this.blockchainModuleManager.getAssetStorageContractAddresses(blockchain);

const results = await Promise.all(
assetStorageContractAddresses.map((contract) =>
this.updateAgreementDataForAssetContract(
contract,
blockchain,
command.data[contract],
),
),
);

results.forEach((result) => {
if (result) {
// eslint-disable-next-line no-param-reassign
command.data[result.contract] = result.lastProcessedTokenId;
this.logger.debug(
`Get latest service agreement: updating last processed token id: ${result.lastProcessedTokenId} for blockchain ${blockchain}`,
);
}
});

return Command.repeat();
}

async updateAgreementDataForAssetContract(contract, blockchain, lastProcessedTokenId) {
this.logger.info(
`Get latest service agreement: Starting get latest service agreement command, last processed token id: ${lastProcessedTokenId} for blockchain: ${blockchain}`,
);
let latestBlockchainTokenId;
try {
latestBlockchainTokenId =
Number(await this.blockchainModuleManager.getLatestTokenId(blockchain, contract)) -
GET_LATEST_SERVICE_AGREEMENT_EXCLUDE_LATEST_TOKEN_ID;
} catch (error) {
if (error.message.includes(EXPECTED_TRANSACTION_ERRORS.NO_MINTED_ASSETS)) {
this.logger.info(
`Get latest service agreement: No minted assets on blockchain: ${blockchain}`,
);
return;
}
this.logger.error(
`Unable to process agreement data for asset contract ${contract}. Error: ${error}`,
);
return;
}

const latestDbTokenId =
lastProcessedTokenId ??
(await this.repositoryModuleManager.getLatestServiceAgreementTokenId(blockchain)) ??
latestBlockchainTokenId;

if (latestBlockchainTokenId < latestDbTokenId) {
this.logger.debug(
`Get latest service agreement: No new agreements found on blockchain: ${blockchain}.`,
);
return {
contract,
lastProcessedTokenId: latestDbTokenId,
};
}

if (latestBlockchainTokenId < latestDbTokenId) {
this.logger.debug(
`Get latest service agreement: No new agreements found on blockchain: ${blockchain}.`,
);
return;
}

this.logger.debug(
`Get latest service agreement: Latest token id on chain: ${latestBlockchainTokenId}, latest token id in database: ${latestDbTokenId} on blockchain: ${blockchain}`,
);

let tokenIdDifference = latestBlockchainTokenId - latestDbTokenId;
let getAgreementDataPromise = [];
for (
let tokenIdToBeFetched = latestDbTokenId + 1;
tokenIdToBeFetched <= latestBlockchainTokenId;
tokenIdToBeFetched += 1
) {
getAgreementDataPromise.push(
this.getAgreementDataForToken(tokenIdToBeFetched, blockchain, contract),
);
if (
getAgreementDataPromise.length === tokenIdDifference ||
getAgreementDataPromise.length === GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE
) {
const missingAgreements = await Promise.all(getAgreementDataPromise);

await this.repositoryModuleManager.bulkCreateServiceAgreementRecords(
missingAgreements.filter((agreement) => agreement != null),
);
getAgreementDataPromise = [];
tokenIdDifference -= GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE;
}
}
if (latestBlockchainTokenId - latestDbTokenId > 0) {
this.logger.debug(
`Get latest service agreement: Successfully fetched ${
latestBlockchainTokenId - latestDbTokenId
} on blockchain: ${blockchain}`,
);
}
return {
contract,
lastProcessedTokenId: latestBlockchainTokenId,
};
}

async getAgreementDataForToken(
tokenId,
blockchain,
contract,
hashFunctionId = CONTENT_ASSET_HASH_FUNCTION_ID,
) {
try {
if (await this.repositoryModuleManager.serviceAgreementExists(blockchain, tokenId)) {
this.logger.debug(
`Get latest service agreement: data exists in repository for token id: ${tokenId} on blockchain: ${blockchain}`,
);
return;
}
this.logger.debug(
`Get latest service agreement: Getting agreement data for token id: ${tokenId} on blockchain: ${blockchain}`,
);
let assertionIds = [];
let retryCount = 0;

while (assertionIds.length === 0) {
if (retryCount === GET_ASSERTION_IDS_MAX_RETRY_COUNT) {
throw Error(
`Get latest service agreement: Unable to get assertion ids for token id: ${tokenId} on blockchain: ${blockchain}`,
);
}
this.logger.debug(
`Get latest service agreement: getting assertion ids retry ${retryCount} for token id: ${tokenId} on blockchain: ${blockchain}`,
);
assertionIds = await this.blockchainModuleManager.getAssertionIds(
blockchain,
contract,
tokenId,
);
retryCount += 1;
await sleep(GET_ASSERTION_IDS_RETRY_DELAY_IN_SECONDS * 1000);
}

const keyword = await this.ualService.calculateLocationKeyword(
blockchain,
contract,
tokenId,
assertionIds[0],
);
const agreementId = await this.serviceAgreementService.generateId(
blockchain,
contract,
tokenId,
keyword,
hashFunctionId,
);
const agreementData = await this.blockchainModuleManager.getAgreementData(
blockchain,
agreementId,
);

if (!agreementData) {
throw Error(
`Get latest service agreement: Unable to fetch agreement data while processing asset created event for agreement id: ${agreementId}, blockchain id: ${blockchain}`,
);
}

const latestStateIndex = assertionIds.length - 1;

return {
blockchainId: blockchain,
assetStorageContractAddress: contract,
tokenId,
agreementId,
startTime: agreementData.startTime,
epochsNumber: agreementData.epochsNumber,
epochLength: agreementData.epochLength,
scoreFunctionId: agreementData.scoreFunctionId,
stateIndex: latestStateIndex,
assertionId: assertionIds[latestStateIndex],
hashFunctionId,
keyword,
proofWindowOffsetPerc: agreementData.proofWindowOffsetPerc,
dataSource: SERVICE_AGREEMENT_SOURCES.NODE,
};
} catch (error) {
this.logger.error(error.message);
}
}

/**
* Recover system from failure
* @param error
*/
async recover() {
return Command.repeat();
}

/**
* Builds default command
* @param map
* @returns {{add, data: *, delay: *, deadline: *}}
*/
default(map) {
const command = {
name: 'blockchainGetLatestServiceAgreement',
data: {},
period: GET_LATEST_SERVICE_AGREEMENT_FREQUENCY_MILLS,
transactional: false,
};
Object.assign(command, map);
return command;
}
}

export default BlockchainGetLatestServiceAgreement;
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import Command from '../../command.js';
import { GET_LATEST_SERVICE_AGREEMENT_FREQUENCY_MILLS } from '../../../constants/constants.js';

class GetLatestServiceAgreement extends Command {
constructor(ctx) {
super(ctx);
this.commandExecutor = ctx.commandExecutor;
this.shardingTableService = ctx.shardingTableService;
this.repositoryModuleManager = ctx.repositoryModuleManager;
this.blockchainModuleManager = ctx.blockchainModuleManager;
}

/**
* Executes command and produces one or more events
* @param command
*/
async execute() {
const operationId = this.operationIdService.generateId();

this.logger.info(
`Get latest service agreement: Starting get latest service agreement command for operation id: ${operationId}`,
);

await this.commandExecutor.delete('blockchainGetLatestServiceAgreement');

await Promise.all(
this.blockchainModuleManager.getImplementationNames().map(async (blockchain) => {
const commandData = {
blockchain,
operationId,
};

return this.commandExecutor.add({
name: 'blockchainGetLatestServiceAgreement',
data: commandData,
period: GET_LATEST_SERVICE_AGREEMENT_FREQUENCY_MILLS,
});
}),
);

return Command.empty();
}

/**
* Recover system from failure
* @param command
* @param error
*/
async recover(command) {
this.logger.warn(`Failed to execute ${command.name}. Error: ${command.message}`);

return Command.repeat();
}

/**
* Builds default command
* @param map
* @returns {{add, data: *, delay: *, deadline: *}}
*/
default(map) {
const command = {
name: 'getLatestServiceAgreement',
data: {},
transactional: false,
};
Object.assign(command, map);
return command;
}
}

export default GetLatestServiceAgreement;
Original file line number Diff line number Diff line change
Expand Up @@ -493,12 +493,9 @@ class BlockchainEpochCheckCommand extends Command {

/**
* Recover system from failure
* @param command
* @param error
*/
async recover(command) {
this.logger.warn(`Failed to execute ${command.name}. Error: ${command.message}`);

async recover() {
return Command.repeat();
}

Expand Down
Loading

0 comments on commit 4ad7024

Please sign in to comment.