Skip to content

Commit

Permalink
Merge pull request #1425 from OriginTrail/prerelease/testnet
Browse files Browse the repository at this point in the history
Hotfix: Dh replication command is not executed in child process (#1423)
  • Loading branch information
djordjekovac authored Jan 4, 2021
2 parents 79af0bf + 237a72d commit 6dd9567
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 200 deletions.
364 changes: 231 additions & 133 deletions modules/command/dh/dh-replication-import-command.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
const bytes = require('utf8-length');
const fs = require('fs');
const { sha3_256 } = require('js-sha3');
const Command = require('../command');
const Encryption = require('../../RSAEncryption');
const Utilities = require('../../Utilities');
const Models = require('../../../models/index');
const { fork } = require('child_process');
const ImportUtilities = require('../../ImportUtilities');
const OtJsonUtilities = require('../../OtJsonUtilities');

/**
* Imports data for replication
Expand Down Expand Up @@ -45,140 +47,61 @@ class DhReplicationImportCommand extends Command {
encColor,
dcIdentity,
} = command.data;
const { otJson, permissionedData }
= JSON.parse(fs.readFileSync(documentPath, { encoding: 'utf-8' }));

const blockchainRootHash = await this.blockchain.getRootHash(dataSetId);
const forked = fork('modules/worker/validate-received-replication-worker.js');
this.logger.info('Validation of received replication started.');
forked.send(JSON.stringify({
documentPath,
litigationPublicKey,
offerId,
encColor,
dataSetId,
blockchainRootHash,
litigationRootHash,
}));

forked.on('message', async (response) => {
if (response.error) {
throw new Error(response.error);
} else {
const {
dataHash,
decryptedDataset,
permissionedData,
encryptedMap,
decryptedGraphRootHash,
} = response;
this.logger.info('Replication data is valid. Replication data import started.');
this.permissionedDataService.attachPermissionedDataToGraph(
decryptedDataset['@graph'],
permissionedData,
);

const holdingData = await Models.holding_data.findOne({
where: {
offer_id: offerId,
data_set_id: dataSetId,
color: encColor,
source_wallet: dcWallet,
},
});

if (holdingData == null) {
// Store holding information and generate keys for eventual data replication.
const newHoldingEntry = {
data_set_id: dataSetId,
source_wallet: dcWallet,
litigation_public_key: litigationPublicKey,
litigation_root_hash: litigationRootHash,
distribution_public_key: distributionPublicKey,
distribution_private_key: distributionPrivateKey,
distribution_epk: distributionEpk,
transaction_hash: transactionHash,
color: encColor,
offer_id: offerId,
};
await Models.holding_data.create(newHoldingEntry);
}

const dataInfo = await Models.data_info.findOne({
where: {
data_set_id: dataSetId,
},
});
await this.permissionedDataService.addDataSellerForPermissionedData(
dataSetId,
dcIdentity,
0,
dcNodeId,
decryptedDataset['@graph'],
);

const importResult = await this.importService.importFile({
document: decryptedDataset,
encryptedMap,
});
this.logger.info('Replication data import finalized.');
fs.unlinkSync(documentPath);

if (importResult.error) {
throw Error(importResult.error);
}

if (dataInfo == null) {
const dataSize = bytes(JSON.stringify(decryptedDataset));
await Models.data_info.create({
data_set_id: dataSetId,
total_documents: decryptedDataset['@graph'].length,
root_hash: decryptedGraphRootHash,
// TODO: add field data_provider_id: 'Perutnina Ptuj ERC...'
// otjson.datasetHeader.dataProvider || 'Unknown'
// TODO: add field data_provider_id_type: 'ERC725' || 'Unknown'
// TODO: add field data_creator_id: otjson.datasetHeader.dataCreator
// TODO: add field data_creator_id_type: 'ERC725' || 'Unknown'
data_provider_wallet: dcWallet, // TODO: rename to data_creator_wallet
import_timestamp: new Date(),
otjson_size_in_bytes: dataSize,
data_hash: dataHash,
origin: 'HOLDING',
});
}
this.logger.important(`[DH] Replication finished for offer_id ${offerId}`);

const toSign = [
Utilities.denormalizeHex(offerId),
Utilities.denormalizeHex(this.config.erc725Identity)];
const messageSignature = Encryption.signMessage(
this.web3,
toSign,
Utilities.normalizeHex(this.config.node_private_key),
);

const replicationFinishedMessage = {
let decryptedDataset;
let encryptedMap;
let decryptedGraphRootHash;

this.decryptAndSortDataset(otJson, litigationPublicKey, offerId, encColor)
.then((result) => {
decryptedDataset = result.decDataset;
encryptedMap = result.encMap;
})
.then(() => this.validateDatasetId(dataSetId, decryptedDataset))
.then(() => this.validateRootHash(decryptedDataset, dataSetId, otJson))
.then((result) => {
decryptedGraphRootHash = result;
})
.then(() => this.validateLitigationRootHash(otJson, litigationRootHash))
.then(() => this.updatePermissionedData(
decryptedDataset,
permissionedData,
dataSetId,
dcIdentity,
dcNodeId,
))
.then(() => this.updateHoldingData(
offerId,
dataSetId,
encColor,
dcWallet,
litigationPublicKey,
litigationRootHash,
distributionPublicKey,
distributionPrivateKey,
distributionEpk,
transactionHash,
))
.then(() => this.importDataset(decryptedDataset, encryptedMap, documentPath))
.then(() => this.updateDataInfo(
dataSetId,
decryptedDataset,
otJson,
decryptedGraphRootHash,
dcWallet,
))
.then(() => this.sendReplicationFinishedMessage(offerId, dcNodeId))
.then(() => this.updateBidData(offerId))
.then(() => this.commandExecutor.add({
name: 'dhOfferFinalizedCommand',
deadline_at: Date.now() + (60 * 60 * 1000), // One hour.
period: 10 * 1000,
data: {
offerId,
dhIdentity: this.config.erc725Identity,
messageSignature: messageSignature.signature,
wallet: this.config.node_wallet,
};

await this.transport.replicationFinished(replicationFinishedMessage, dcNodeId);
const bid = await Models.bids.findOne({ where: { offer_id: offerId } });
bid.status = 'REPLICATED';
await bid.save({ fields: ['status'] });

this.logger.info(`Sent replication finished message for offer_id ${offerId} to node ${dcNodeId}`);
await this.commandExecutor.add({
name: 'dhOfferFinalizedCommand',
deadline_at: Date.now() + (60 * 60 * 1000), // One hour.
period: 10 * 1000,
data: {
offerId,
},
});
}
forked.kill();
});
},
}));
return Command.empty();
}

Expand All @@ -199,6 +122,181 @@ class DhReplicationImportCommand extends Command {
return Command.empty();
}

async decryptAndSortDataset(otJson, litigationPublicKey, offerId, encColor) {
this.logger.trace('Decrypting received dataset.');
const replication = ImportUtilities.decryptDataset(
otJson,
litigationPublicKey,
offerId,
encColor,
);
const tempSortedDataset = OtJsonUtilities
.prepareDatasetForNewReplication(replication.decryptedDataset);
if (tempSortedDataset) {
replication.decryptedDataset = tempSortedDataset;
}

return {
decDataset: replication.decryptedDataset,
encMap: replication.encryptedMap,
};
}

async validateDatasetId(dataSetId, decryptedDataset) {
this.logger.trace('Validating received dataset ID.');
const calculatedDataSetId = ImportUtilities.calculateGraphPublicHash(decryptedDataset);

if (dataSetId !== calculatedDataSetId) {
throw new Error(`Calculated data set ID ${calculatedDataSetId} differs from DC data set ID ${dataSetId}`);
}
}

async validateRootHash(decryptedDataset, dataSetId, otJson) {
this.logger.trace('Validating root hash.');
const decryptedGraphRootHash = ImportUtilities.calculateDatasetRootHash(decryptedDataset);
const blockchainRootHash = await this.blockchain.getRootHash(dataSetId);
if (decryptedGraphRootHash !== blockchainRootHash) {
throw Error(`Calculated root hash ${decryptedGraphRootHash} differs from Blockchain root hash ${blockchainRootHash}`);
}
const originalRootHash = otJson.datasetHeader.dataIntegrity.proofs[0].proofValue;
if (decryptedGraphRootHash !== originalRootHash) {
throw Error(`Calculated root hash ${decryptedGraphRootHash} differs from document root hash ${originalRootHash}`);
}
return decryptedGraphRootHash;
}

async validateLitigationRootHash(otJson, litigationRootHash) {
this.logger.trace('Validating litigation hash.');
let sortedDataset =
OtJsonUtilities.prepareDatasetForGeneratingLitigationProof(otJson);
if (!sortedDataset) {
sortedDataset = otJson;
}
const encryptedGraphRootHash = this.challengeService.getLitigationRootHash(sortedDataset['@graph']);
if (encryptedGraphRootHash !== litigationRootHash) {
throw Error(`Calculated distribution hash ${encryptedGraphRootHash} differs from DC distribution hash ${litigationRootHash}`);
}
}

async updatePermissionedData(
decryptedDataset,
permissionedData,
dataSetId,
dcIdentity,
dcNodeId,
) {
this.permissionedDataService.attachPermissionedDataToGraph(
decryptedDataset['@graph'],
permissionedData,
);

await this.permissionedDataService.addDataSellerForPermissionedData(
dataSetId,
dcIdentity,
0,
dcNodeId,
decryptedDataset['@graph'],
);
}

async updateHoldingData(
offerId,
dataSetId,
encColor,
dcWallet,
litigationPublicKey,
litigationRootHash,
distributionPublicKey,
distributionPrivateKey,
distributionEpk,
transactionHash,
) {
const holdingData = await Models.holding_data.findOne({
where: {
offer_id: offerId,
data_set_id: dataSetId,
color: encColor,
source_wallet: dcWallet,
},
});

if (holdingData == null) {
// Store holding information and generate keys for eventual data replication.
const newHoldingEntry = {
data_set_id: dataSetId,
source_wallet: dcWallet,
litigation_public_key: litigationPublicKey,
litigation_root_hash: litigationRootHash,
distribution_public_key: distributionPublicKey,
distribution_private_key: distributionPrivateKey,
distribution_epk: distributionEpk,
transaction_hash: transactionHash,
color: encColor,
offer_id: offerId,
};
await Models.holding_data.create(newHoldingEntry);
}
}

async importDataset(decryptedDataset, encryptedMap, documentPath) {
const importResult = await this.importService.importFile({
document: decryptedDataset,
encryptedMap,
});

fs.unlinkSync(documentPath);

if (importResult.error) {
throw Error(importResult.error);
}
}

async updateDataInfo(dataSetId, decryptedDataset, otJson, decryptedGraphRootHash, dcWallet) {
const dataInfo = await Models.data_info.findOne({
where: {
data_set_id: dataSetId,
},
});

if (dataInfo == null) {
const dataSize = bytes(JSON.stringify(decryptedDataset));
const dataHash = Utilities.normalizeHex(sha3_256(`${otJson}`));
await Models.data_info.create({
data_set_id: dataSetId,
total_documents: decryptedDataset['@graph'].length,
root_hash: decryptedGraphRootHash,
data_provider_wallet: dcWallet,
import_timestamp: new Date(),
otjson_size_in_bytes: dataSize,
data_hash: dataHash,
origin: 'HOLDING',
});
}
}

async sendReplicationFinishedMessage(offerId, dcNodeId) {
const toSign = [
Utilities.denormalizeHex(offerId),
Utilities.denormalizeHex(this.config.erc725Identity)];
const messageSignature = Encryption
.signMessage(this.web3, toSign, Utilities.normalizeHex(this.config.node_private_key));

const replicationFinishedMessage = {
offerId,
dhIdentity: this.config.erc725Identity,
messageSignature: messageSignature.signature,
wallet: this.config.node_wallet,
};

await this.transport.replicationFinished(replicationFinishedMessage, dcNodeId);
}

async updateBidData(offerId) {
const bid = await Models.bids.findOne({ where: { offer_id: offerId } });
bid.status = 'REPLICATED';
await bid.save({ fields: ['status'] });
}

/**
* Builds default
* @param map
Expand Down
Loading

0 comments on commit 6dd9567

Please sign in to comment.