diff --git a/modules/command/dh/dh-replication-import-command.js b/modules/command/dh/dh-replication-import-command.js index 61fd8bba0..55d5a6a39 100644 --- a/modules/command/dh/dh-replication-import-command.js +++ b/modules/command/dh/dh-replication-import-command.js @@ -1,10 +1,12 @@ const bytes = require('utf8-length'); const fs = require('fs'); +const { sha3_256 } = require('js-sha3'); const Command = require('../command'); const Encryption = require('../../RSAEncryption'); const Utilities = require('../../Utilities'); const Models = require('../../../models/index'); -const { fork } = require('child_process'); +const ImportUtilities = require('../../ImportUtilities'); +const OtJsonUtilities = require('../../OtJsonUtilities'); /** * Imports data for replication @@ -45,140 +47,61 @@ class DhReplicationImportCommand extends Command { encColor, dcIdentity, } = command.data; + const { otJson, permissionedData } + = JSON.parse(fs.readFileSync(documentPath, { encoding: 'utf-8' })); - const blockchainRootHash = await this.blockchain.getRootHash(dataSetId); - const forked = fork('modules/worker/validate-received-replication-worker.js'); - this.logger.info('Validation of received replication started.'); - forked.send(JSON.stringify({ - documentPath, - litigationPublicKey, - offerId, - encColor, - dataSetId, - blockchainRootHash, - litigationRootHash, - })); - - forked.on('message', async (response) => { - if (response.error) { - throw new Error(response.error); - } else { - const { - dataHash, - decryptedDataset, - permissionedData, - encryptedMap, - decryptedGraphRootHash, - } = response; - this.logger.info('Replication data is valid. Replication data import started.'); - this.permissionedDataService.attachPermissionedDataToGraph( - decryptedDataset['@graph'], - permissionedData, - ); - - const holdingData = await Models.holding_data.findOne({ - where: { - offer_id: offerId, - data_set_id: dataSetId, - color: encColor, - source_wallet: dcWallet, - }, - }); - - if (holdingData == null) { - // Store holding information and generate keys for eventual data replication. - const newHoldingEntry = { - data_set_id: dataSetId, - source_wallet: dcWallet, - litigation_public_key: litigationPublicKey, - litigation_root_hash: litigationRootHash, - distribution_public_key: distributionPublicKey, - distribution_private_key: distributionPrivateKey, - distribution_epk: distributionEpk, - transaction_hash: transactionHash, - color: encColor, - offer_id: offerId, - }; - await Models.holding_data.create(newHoldingEntry); - } - - const dataInfo = await Models.data_info.findOne({ - where: { - data_set_id: dataSetId, - }, - }); - await this.permissionedDataService.addDataSellerForPermissionedData( - dataSetId, - dcIdentity, - 0, - dcNodeId, - decryptedDataset['@graph'], - ); - - const importResult = await this.importService.importFile({ - document: decryptedDataset, - encryptedMap, - }); - this.logger.info('Replication data import finalized.'); - fs.unlinkSync(documentPath); - - if (importResult.error) { - throw Error(importResult.error); - } - - if (dataInfo == null) { - const dataSize = bytes(JSON.stringify(decryptedDataset)); - await Models.data_info.create({ - data_set_id: dataSetId, - total_documents: decryptedDataset['@graph'].length, - root_hash: decryptedGraphRootHash, - // TODO: add field data_provider_id: 'Perutnina Ptuj ERC...' - // otjson.datasetHeader.dataProvider || 'Unknown' - // TODO: add field data_provider_id_type: 'ERC725' || 'Unknown' - // TODO: add field data_creator_id: otjson.datasetHeader.dataCreator - // TODO: add field data_creator_id_type: 'ERC725' || 'Unknown' - data_provider_wallet: dcWallet, // TODO: rename to data_creator_wallet - import_timestamp: new Date(), - otjson_size_in_bytes: dataSize, - data_hash: dataHash, - origin: 'HOLDING', - }); - } - this.logger.important(`[DH] Replication finished for offer_id ${offerId}`); - - const toSign = [ - Utilities.denormalizeHex(offerId), - Utilities.denormalizeHex(this.config.erc725Identity)]; - const messageSignature = Encryption.signMessage( - this.web3, - toSign, - Utilities.normalizeHex(this.config.node_private_key), - ); - - const replicationFinishedMessage = { + let decryptedDataset; + let encryptedMap; + let decryptedGraphRootHash; + + this.decryptAndSortDataset(otJson, litigationPublicKey, offerId, encColor) + .then((result) => { + decryptedDataset = result.decDataset; + encryptedMap = result.encMap; + }) + .then(() => this.validateDatasetId(dataSetId, decryptedDataset)) + .then(() => this.validateRootHash(decryptedDataset, dataSetId, otJson)) + .then((result) => { + decryptedGraphRootHash = result; + }) + .then(() => this.validateLitigationRootHash(otJson, litigationRootHash)) + .then(() => this.updatePermissionedData( + decryptedDataset, + permissionedData, + dataSetId, + dcIdentity, + dcNodeId, + )) + .then(() => this.updateHoldingData( + offerId, + dataSetId, + encColor, + dcWallet, + litigationPublicKey, + litigationRootHash, + distributionPublicKey, + distributionPrivateKey, + distributionEpk, + transactionHash, + )) + .then(() => this.importDataset(decryptedDataset, encryptedMap, documentPath)) + .then(() => this.updateDataInfo( + dataSetId, + decryptedDataset, + otJson, + decryptedGraphRootHash, + dcWallet, + )) + .then(() => this.sendReplicationFinishedMessage(offerId, dcNodeId)) + .then(() => this.updateBidData(offerId)) + .then(() => this.commandExecutor.add({ + name: 'dhOfferFinalizedCommand', + deadline_at: Date.now() + (60 * 60 * 1000), // One hour. + period: 10 * 1000, + data: { offerId, - dhIdentity: this.config.erc725Identity, - messageSignature: messageSignature.signature, - wallet: this.config.node_wallet, - }; - - await this.transport.replicationFinished(replicationFinishedMessage, dcNodeId); - const bid = await Models.bids.findOne({ where: { offer_id: offerId } }); - bid.status = 'REPLICATED'; - await bid.save({ fields: ['status'] }); - - this.logger.info(`Sent replication finished message for offer_id ${offerId} to node ${dcNodeId}`); - await this.commandExecutor.add({ - name: 'dhOfferFinalizedCommand', - deadline_at: Date.now() + (60 * 60 * 1000), // One hour. - period: 10 * 1000, - data: { - offerId, - }, - }); - } - forked.kill(); - }); + }, + })); return Command.empty(); } @@ -199,6 +122,181 @@ class DhReplicationImportCommand extends Command { return Command.empty(); } + async decryptAndSortDataset(otJson, litigationPublicKey, offerId, encColor) { + this.logger.trace('Decrypting received dataset.'); + const replication = ImportUtilities.decryptDataset( + otJson, + litigationPublicKey, + offerId, + encColor, + ); + const tempSortedDataset = OtJsonUtilities + .prepareDatasetForNewReplication(replication.decryptedDataset); + if (tempSortedDataset) { + replication.decryptedDataset = tempSortedDataset; + } + + return { + decDataset: replication.decryptedDataset, + encMap: replication.encryptedMap, + }; + } + + async validateDatasetId(dataSetId, decryptedDataset) { + this.logger.trace('Validating received dataset ID.'); + const calculatedDataSetId = ImportUtilities.calculateGraphPublicHash(decryptedDataset); + + if (dataSetId !== calculatedDataSetId) { + throw new Error(`Calculated data set ID ${calculatedDataSetId} differs from DC data set ID ${dataSetId}`); + } + } + + async validateRootHash(decryptedDataset, dataSetId, otJson) { + this.logger.trace('Validating root hash.'); + const decryptedGraphRootHash = ImportUtilities.calculateDatasetRootHash(decryptedDataset); + const blockchainRootHash = await this.blockchain.getRootHash(dataSetId); + if (decryptedGraphRootHash !== blockchainRootHash) { + throw Error(`Calculated root hash ${decryptedGraphRootHash} differs from Blockchain root hash ${blockchainRootHash}`); + } + const originalRootHash = otJson.datasetHeader.dataIntegrity.proofs[0].proofValue; + if (decryptedGraphRootHash !== originalRootHash) { + throw Error(`Calculated root hash ${decryptedGraphRootHash} differs from document root hash ${originalRootHash}`); + } + return decryptedGraphRootHash; + } + + async validateLitigationRootHash(otJson, litigationRootHash) { + this.logger.trace('Validating litigation hash.'); + let sortedDataset = + OtJsonUtilities.prepareDatasetForGeneratingLitigationProof(otJson); + if (!sortedDataset) { + sortedDataset = otJson; + } + const encryptedGraphRootHash = this.challengeService.getLitigationRootHash(sortedDataset['@graph']); + if (encryptedGraphRootHash !== litigationRootHash) { + throw Error(`Calculated distribution hash ${encryptedGraphRootHash} differs from DC distribution hash ${litigationRootHash}`); + } + } + + async updatePermissionedData( + decryptedDataset, + permissionedData, + dataSetId, + dcIdentity, + dcNodeId, + ) { + this.permissionedDataService.attachPermissionedDataToGraph( + decryptedDataset['@graph'], + permissionedData, + ); + + await this.permissionedDataService.addDataSellerForPermissionedData( + dataSetId, + dcIdentity, + 0, + dcNodeId, + decryptedDataset['@graph'], + ); + } + + async updateHoldingData( + offerId, + dataSetId, + encColor, + dcWallet, + litigationPublicKey, + litigationRootHash, + distributionPublicKey, + distributionPrivateKey, + distributionEpk, + transactionHash, + ) { + const holdingData = await Models.holding_data.findOne({ + where: { + offer_id: offerId, + data_set_id: dataSetId, + color: encColor, + source_wallet: dcWallet, + }, + }); + + if (holdingData == null) { + // Store holding information and generate keys for eventual data replication. + const newHoldingEntry = { + data_set_id: dataSetId, + source_wallet: dcWallet, + litigation_public_key: litigationPublicKey, + litigation_root_hash: litigationRootHash, + distribution_public_key: distributionPublicKey, + distribution_private_key: distributionPrivateKey, + distribution_epk: distributionEpk, + transaction_hash: transactionHash, + color: encColor, + offer_id: offerId, + }; + await Models.holding_data.create(newHoldingEntry); + } + } + + async importDataset(decryptedDataset, encryptedMap, documentPath) { + const importResult = await this.importService.importFile({ + document: decryptedDataset, + encryptedMap, + }); + + fs.unlinkSync(documentPath); + + if (importResult.error) { + throw Error(importResult.error); + } + } + + async updateDataInfo(dataSetId, decryptedDataset, otJson, decryptedGraphRootHash, dcWallet) { + const dataInfo = await Models.data_info.findOne({ + where: { + data_set_id: dataSetId, + }, + }); + + if (dataInfo == null) { + const dataSize = bytes(JSON.stringify(decryptedDataset)); + const dataHash = Utilities.normalizeHex(sha3_256(`${otJson}`)); + await Models.data_info.create({ + data_set_id: dataSetId, + total_documents: decryptedDataset['@graph'].length, + root_hash: decryptedGraphRootHash, + data_provider_wallet: dcWallet, + import_timestamp: new Date(), + otjson_size_in_bytes: dataSize, + data_hash: dataHash, + origin: 'HOLDING', + }); + } + } + + async sendReplicationFinishedMessage(offerId, dcNodeId) { + const toSign = [ + Utilities.denormalizeHex(offerId), + Utilities.denormalizeHex(this.config.erc725Identity)]; + const messageSignature = Encryption + .signMessage(this.web3, toSign, Utilities.normalizeHex(this.config.node_private_key)); + + const replicationFinishedMessage = { + offerId, + dhIdentity: this.config.erc725Identity, + messageSignature: messageSignature.signature, + wallet: this.config.node_wallet, + }; + + await this.transport.replicationFinished(replicationFinishedMessage, dcNodeId); + } + + async updateBidData(offerId) { + const bid = await Models.bids.findOne({ where: { offer_id: offerId } }); + bid.status = 'REPLICATED'; + await bid.save({ fields: ['status'] }); + } + /** * Builds default * @param map diff --git a/modules/worker/validate-received-replication-worker.js b/modules/worker/validate-received-replication-worker.js deleted file mode 100644 index 5f407b715..000000000 --- a/modules/worker/validate-received-replication-worker.js +++ /dev/null @@ -1,67 +0,0 @@ -const fs = require('fs'); -const { sha3_256 } = require('js-sha3'); -const ImportUtilities = require('../ImportUtilities'); -const OtJsonUtilities = require('../OtJsonUtilities'); -const ChallengeService = require('../service/challenge-service'); -const Utilities = require('../Utilities'); - -process.on('message', async (data) => { - const { - documentPath, - litigationPublicKey, - offerId, - encColor, - dataSetId, - blockchainRootHash, - litigationRootHash, - } = JSON.parse(data); - try { - const { otJson, permissionedData } - = JSON.parse(fs.readFileSync(documentPath, { encoding: 'utf-8' })); - const dataHash = Utilities.normalizeHex(sha3_256(`${otJson}`)); - const replication = - await ImportUtilities.decryptDataset(otJson, litigationPublicKey, offerId, encColor); - - let { decryptedDataset } = replication; - const { encryptedMap } = replication; - - const tempSortedDataset = OtJsonUtilities.prepareDatasetForNewReplication(decryptedDataset); - if (tempSortedDataset) { - decryptedDataset = tempSortedDataset; - } - const calculatedDataSetId = - await ImportUtilities.calculateGraphPublicHash(decryptedDataset); - - if (dataSetId !== calculatedDataSetId) { - throw new Error(`Calculated data set ID ${calculatedDataSetId} differs from DC data set ID ${dataSetId}`); - } - - const decryptedGraphRootHash = ImportUtilities.calculateDatasetRootHash(decryptedDataset); - - if (decryptedGraphRootHash !== blockchainRootHash) { - throw Error(`Calculated root hash ${decryptedGraphRootHash} differs from Blockchain root hash ${blockchainRootHash}`); - } - - let sortedDataset = - OtJsonUtilities.prepareDatasetForGeneratingLitigationProof(otJson); - if (!sortedDataset) { - sortedDataset = otJson; - } - this.challengeService = new ChallengeService(); - const encryptedGraphRootHash = this.challengeService.getLitigationRootHash(sortedDataset['@graph']); - - if (encryptedGraphRootHash !== litigationRootHash) { - throw Error(`Calculated distribution hash ${encryptedGraphRootHash} differs from DC distribution hash ${litigationRootHash}`); - } - - const originalRootHash = otJson.datasetHeader.dataIntegrity.proofs[0].proofValue; - if (decryptedGraphRootHash !== originalRootHash) { - throw Error(`Calculated root hash ${decryptedGraphRootHash} differs from document root hash ${originalRootHash}`); - } - process.send({ - dataHash, decryptedDataset, permissionedData, encryptedMap, decryptedGraphRootHash, - }); - } catch (error) { - process.send({ error: `${error.message}` }); - } -});