diff --git a/external/libp2p-service.js b/external/libp2p-service.js index 10b57b9c2..375c26312 100644 --- a/external/libp2p-service.js +++ b/external/libp2p-service.js @@ -71,10 +71,7 @@ class Libp2pService { initializationObject.peerId = this.config.peerId; this.workerPool = this.config.workerPool; - this.limiter = new InMemoryRateLimiter({ - interval: constants.NETWORK_API_RATE_LIMIT_TIME_WINDOW_MILLS, - maxInInterval: constants.NETWORK_API_RATE_LIMIT_MAX_NUMBER, - }); + this._initializeRateLimiters(); Libp2p.create(initializationObject).then((node) => { this.node = node; @@ -94,6 +91,25 @@ class Libp2pService { }); } + _initializeRateLimiters() { + const basicRateLimiter = new InMemoryRateLimiter({ + interval: constants.NETWORK_API_RATE_LIMIT.TIME_WINDOW_MILLS, + maxInInterval: constants.NETWORK_API_RATE_LIMIT.MAX_NUMBER, + }); + + const spamDetection = new InMemoryRateLimiter({ + interval: constants.NETWORK_API_SPAM_DETECTION.TIME_WINDOW_MILLS, + maxInInterval: constants.NETWORK_API_SPAM_DETECTION.MAX_NUMBER, + }); + + this.rateLimiter = { + basicRateLimiter, + spamDetection, + } + + this.blackList = {}; + } + _initializeNodeListeners() { this.node.on('peer:discovery', (peer) => { this._onPeerDiscovery(peer); @@ -111,14 +127,16 @@ class Libp2pService { this.logger.debug(`Node ${this.node.peerId._idB58String} connected to ${connection.remotePeer.toB58String()}`); } - async findNodes(key, limit) { + async findNodes(key, protocol) { const encodedKey = new TextEncoder().encode(key); // Creates a DHT ID by hashing a given Uint8Array const id = (await sha256.digest(encodedKey)).digest; const nodes = this.node._dht.peerRouting.getClosestPeers(id); const result = new Set(); for await (const node of nodes) { - result.add(node); + if(this.node.peerStore.peers.get(node._idB58String).protocols.includes(protocol)){ + result.add(node); + } } this.logger.info(`Found ${result.size} nodes`); @@ -165,14 +183,13 @@ class Libp2pService { this.node.handle(eventName, async (handlerProps) => { const {stream} = handlerProps; let timestamp = Date.now(); - const blocked = await this.limiter.limit(handlerProps.connection.remotePeer.toB58String()); - if(blocked) { + const remotePeerId = handlerProps.connection.remotePeer._idB58String; + if(await this.limitRequest(remotePeerId)) { const preparedBlockedResponse = await this.prepareForSending(constants.NETWORK_RESPONSES.BLOCKED); await pipe( [preparedBlockedResponse], stream ); - this.logger.info(`Blocking request from ${handlerProps.connection.remotePeer._idB58String}. Max number of requests exceeded.`); return; } let data = await pipe( @@ -188,10 +205,10 @@ class Libp2pService { ) try { data = await this.workerPool.exec('JSONParse', [data.toString()]); - this.logger.info(`Receiving message from ${handlerProps.connection.remotePeer._idB58String} to ${this.config.id}: event=${eventName};`); + this.logger.info(`Receiving message from ${remotePeerId} to ${this.config.id}: event=${eventName};`); if (!async) { const result = await handler(data); - this.logger.info(`Sending response from ${this.config.id} to ${handlerProps.connection.remotePeer._idB58String}: event=${eventName};`); + this.logger.info(`Sending response from ${this.config.id} to ${remotePeerId}: event=${eventName};`); const preparedData = await this.prepareForSending(result); await pipe( [Buffer.from(preparedData)], @@ -204,12 +221,12 @@ class Libp2pService { stream ) - this.logger.info(`Sending response from ${this.config.id} to ${handlerProps.connection.remotePeer._idB58String}: event=${eventName};`); + this.logger.info(`Sending response from ${this.config.id} to ${remotePeerId}: event=${eventName};`); const result = await handler(data); if (Date.now() <= timestamp + timeout) { await this.sendMessage(`${eventName}/result`, result, handlerProps.connection.remotePeer); } else { - this.logger.warn(`Too late to send response from ${this.config.id} to ${handlerProps.connection.remotePeer._idB58String}: event=${eventName};`); + this.logger.warn(`Too late to send response from ${this.config.id} to ${remotePeerId}: event=${eventName};`); } } } catch (e) { @@ -265,6 +282,38 @@ class Libp2pService { return false; } + async limitRequest(remotePeerId) { + if(this.blackList[remotePeerId]){ + const remainingMinutes = Math.floor( + constants.NETWORK_API_BLACK_LIST_TIME_WINDOW_MINUTES - + (Date.now() - this.blackList[remotePeerId]) / (1000 * 60) + ); + + if(remainingMinutes > 0) { + this.logger.info(`Blocking request from ${remotePeerId}. Node is blacklisted for ${remainingMinutes} minutes.`); + + return true; + } else { + delete this.blackList[remotePeerId] + } + } + + if(await this.rateLimiter.spamDetection.limit(remotePeerId)) { + this.blackList[remotePeerId] = Date.now(); + this.logger.info( + `Blocking request from ${remotePeerId}. Spammer detected and blacklisted for ${constants.NETWORK_API_BLACK_LIST_TIME_WINDOW_MINUTES} minutes.` + ); + + return true; + } else if (await this.rateLimiter.basicRateLimiter.limit(remotePeerId)) { + this.logger.info(`Blocking request from ${remotePeerId}. Max number of requests exceeded.`); + + return true; + } + + return false; + } + async restartService() { this.logger.info('Restrating libp2p service...'); // TODO: reinitialize service diff --git a/modules/command/publish/send-assertion-command.js b/modules/command/publish/send-assertion-command.js index 4a0ea35fa..e7c581ea5 100644 --- a/modules/command/publish/send-assertion-command.js +++ b/modules/command/publish/send-assertion-command.js @@ -38,6 +38,7 @@ class SendAssertionCommand extends Command { ); const foundNodes = await this.networkService.findNodes( keyword, + constants.NETWORK_PROTOCOLS.STORE, this.config.replicationFactor, ); if (foundNodes.length < this.config.replicationFactor) { diff --git a/modules/constants.js b/modules/constants.js index d578daba5..31fbd0bff 100644 --- a/modules/constants.js +++ b/modules/constants.js @@ -12,46 +12,47 @@ exports.DID = 'DID'; exports.MAX_FILE_SIZE = 2621440; /** - * @constant {number} SERVICE_API_RATE_LIMIT_TIME_WINDOW_MILLS - * - Express rate limit time window in milliseconds + * @constant {object} SERVICE_API_RATE_LIMIT + * - Express rate limit configuration constants */ -exports.SERVICE_API_RATE_LIMIT_TIME_WINDOW_MILLS = 1 * 60 * 1000; - -/** - * @constant {number} SERVICE_API_RATE_LIMIT_MAX_NUMBER - * - Express rate limit max number of requests allowed in the specified time window - */ -exports.SERVICE_API_RATE_LIMIT_MAX_NUMBER = 10; - -/** - * @constant {number} SERVICE_API_SLOW_DOWN_TIME_WINDOW_MILLS - * - Express slow down time window in milliseconds - */ -exports.SERVICE_API_SLOW_DOWN_TIME_WINDOW_MILLS = 1 * 60 * 1000; +exports.SERVICE_API_RATE_LIMIT = { + TIME_WINDOW_MILLS: 1 * 60 * 1000, + MAX_NUMBER: 10, +}; /** - * @constant {number} SERVICE_API_SLOW_DOWN_DELAY_AFTER - * - Express slow down number of seconds after which it starts delaying requests + * @constant {object} SERVICE_API_SLOW_DOWN + * - Express slow down configuration constants */ -exports.SERVICE_API_SLOW_DOWN_DELAY_AFTER = 5; +exports.SERVICE_API_SLOW_DOWN = { + TIME_WINDOW_MILLS: 1 * 60 * 1000, + DELAY_AFTER_SECONDS: 5, + DELAY_MILLS: 3 * 1000, +}; /** - * @constant {number} SERVICE_API_SLOW_DOWN_DELAY_MILLS - * - Express slow down delay between requests in milliseconds + * @constant {object} NETWORK_API_RATE_LIMIT + * - Network (Libp2p) rate limiter configuration constants */ -exports.SERVICE_API_SLOW_DOWN_DELAY_MILLS = 3 * 1000; +exports.NETWORK_API_RATE_LIMIT = { + TIME_WINDOW_MILLS: 1 * 60 * 1000, + MAX_NUMBER: this.SERVICE_API_RATE_LIMIT.MAX_NUMBER, +}; /** - * @constant {number} NETWORK_API_RATE_LIMIT_TIME_WINDOW_MILLS - * - Network (Libp2p) rate limit time window in milliseconds + * @constant {object} NETWORK_API_SPAM_DETECTION + * - Network (Libp2p) spam detection rate limiter configuration constants */ -exports.NETWORK_API_RATE_LIMIT_TIME_WINDOW_MILLS = 1 * 60 * 1000; +exports.NETWORK_API_SPAM_DETECTION = { + TIME_WINDOW_MILLS: 1 * 60 * 1000, + MAX_NUMBER: 20, +}; /** - * @constant {number} NETWORK_API_RATE_LIMIT_MAX_NUMBER - * - Network (Libp2p) rate limit max number of requests allowed in the specified time window + * @constant {object} NETWORK_API_BLACK_LIST_TIME_WINDOW_MINUTES + * - Network (Libp2p) black list time window in minutes */ -exports.NETWORK_API_RATE_LIMIT_MAX_NUMBER = 10; +exports.NETWORK_API_BLACK_LIST_TIME_WINDOW_MINUTES = 60; /** * @constant {number} DID_PREFIX @@ -191,6 +192,19 @@ exports.STRINGIFIED_NETWORK_RESPONSES = { error: '"error"', }; +/** + * @constant {object} STRINGIFIED_NETWORK_RESPONSES - + * Stringified types of known network responses + */ +exports.NETWORK_PROTOCOLS = { + STORE: '/store/1.0.0', + RESOLVE: '/resolve/1.0.0', + SEARCH: '/search/1.0.0', + SEARCH_RESULT: '/search/result/1.0.0', + SEARCH_ASSERTIONS: '/search/assertions/1.0.0', + SEARCH_ASSERTIONS_RESULT: '/search/assertions/result/1.0.0', +}; + /** * @constant {object} ERROR_TYPE - * Types of errors supported diff --git a/modules/controller/rpc-controller.js b/modules/controller/rpc-controller.js index fd66295e8..a8f4895fd 100644 --- a/modules/controller/rpc-controller.js +++ b/modules/controller/rpc-controller.js @@ -126,9 +126,9 @@ class RpcController { initializeRateLimitMiddleware() { this.rateLimitMiddleware = rateLimit({ - windowMs: constants.SERVICE_API_RATE_LIMIT_TIME_WINDOW_MILLS, - max: constants.SERVICE_API_RATE_LIMIT_MAX_NUMBER, - message: `Too many requests sent, maximum number of requests per minute is ${constants.SERVICE_API_RATE_LIMIT_MAX_NUMBER}`, + windowMs: constants.SERVICE_API_RATE_LIMIT.TIME_WINDOW_MILLS, + max: constants.SERVICE_API_RATE_LIMIT.MAX_NUMBER, + message: `Too many requests sent, maximum number of requests per minute is ${constants.SERVICE_API_RATE_LIMIT.MAX_NUMBER}`, standardHeaders: true, // Return rate limit info in the `RateLimit-*` headers legacyHeaders: false, // Disable the `X-RateLimit-*` headers }); @@ -136,32 +136,52 @@ class RpcController { initializeSlowDownMiddleWare() { this.slowDownMiddleware = slowDown({ - windowMs: constants.SERVICE_API_SLOW_DOWN_TIME_WINDOW_MILLS, - delayAfter: constants.SERVICE_API_SLOW_DOWN_DELAY_AFTER, - delayMs: constants.SERVICE_API_SLOW_DOWN_DELAY_MILLS, + windowMs: constants.SERVICE_API_SLOW_DOWN.TIME_WINDOW_MILLS, + delayAfter: constants.SERVICE_API_SLOW_DOWN.DELAY_AFTER_SECONDS, + delayMs: constants.SERVICE_API_SLOW_DOWN.DELAY_MILLS, }); } initializeNetworkApi() { this.logger.info(`Network API module enabled on port ${this.config.network.port}`); - this.networkService.handleMessage('/store', (result) => this.publishService.handleStore(result)); + this.networkService.handleMessage( + constants.NETWORK_PROTOCOLS.STORE, + (result) => this.publishService.handleStore(result), + ); - this.networkService.handleMessage('/resolve', (result) => this.queryService.handleResolve(result)); + this.networkService.handleMessage( + constants.NETWORK_PROTOCOLS.RESOLVE, + (result) => this.queryService.handleResolve(result), + ); - this.networkService.handleMessage('/search', (result) => this.queryService.handleSearch(result), { - async: true, - timeout: 60e3, - }); - - this.networkService.handleMessage('/search/result', (result) => this.queryService.handleSearchResult(result)); - - this.networkService.handleMessage('/search/assertions', (result) => this.queryService.handleSearchAssertions(result), { - async: true, - timeout: 60e3, - }); + this.networkService.handleMessage( + constants.NETWORK_PROTOCOLS.SEARCH, + (result) => this.queryService.handleSearch(result), + { + async: true, + timeout: constants.NETWORK_HANDLER_TIMEOUT, + }, + ); + + this.networkService.handleMessage( + constants.NETWORK_PROTOCOLS.SEARCH_RESULT, + (result) => this.queryService.handleSearchResult(result), + ); + + this.networkService.handleMessage( + constants.NETWORK_PROTOCOLS.SEARCH_ASSERTIONS, + (result) => this.queryService.handleSearchAssertions(result), + { + async: true, + timeout: constants.NETWORK_HANDLER_TIMEOUT, + }, + ); - this.networkService.handleMessage('/search/assertions/result', (result) => this.queryService.handleSearchAssertionsResult(result)); + this.networkService.handleMessage( + constants.NETWORK_PROTOCOLS.SEARCH_ASSERTIONS_RESULT, + (result) => this.queryService.handleSearchAssertionsResult(result), + ); // this.networkService.handleMessage('/query', (result) // => this.queryService.handleQuery(result)); @@ -171,13 +191,13 @@ class RpcController { this.logger.info(`Service API module enabled, server running on port ${this.config.rpcPort}`); this.app.post('/publish', this.rateLimitMiddleware, this.slowDownMiddleware, async (req, res, next) => { - await this.publish(req, res, next, {isAsset: false}); + await this.publish(req, res, next, { isAsset: false }); }); - + this.app.post('/provision', this.rateLimitMiddleware, this.slowDownMiddleware, async (req, res, next) => { - await this.publish(req, res, next, {isAsset: true, ual: null}); + await this.publish(req, res, next, { isAsset: true, ual: null }); }); - + this.app.post('/update', this.rateLimitMiddleware, this.slowDownMiddleware, async (req, res, next) => { if (!req.body.ual) { return next({ @@ -188,247 +208,260 @@ class RpcController { await this.publish(req, res, next, { isAsset: true, ual: req.body.ual }); }); - this.app.get('/resolve', this.rateLimitMiddleware, this.slowDownMiddleware, async (req, res, next) => { - const operationId = uuidv1(); - this.logger.emit({ - msg: 'Started measuring execution of resolve command', - Event_name: 'resolve_start', - Operation_name: 'resolve', - Id_operation: operationId, - }); - - this.logger.emit({ - msg: 'Started measuring execution of resolve init', - Event_name: 'resolve_init_start', - Operation_name: 'resolve_init', - Id_operation: operationId, - }); + this.app.get( + constants.NETWORK_PROTOCOLS.STORE, + this.rateLimitMiddleware, + this.slowDownMiddleware, + async (req, res, next) => { + const operationId = uuidv1(); + this.logger.emit({ + msg: 'Started measuring execution of resolve command', + Event_name: 'resolve_start', + Operation_name: 'resolve', + Id_operation: operationId, + }); - if (!req.query.ids) { - return next({ code: 400, message: 'Param ids is required.' }); - } + this.logger.emit({ + msg: 'Started measuring execution of resolve init', + Event_name: 'resolve_init_start', + Operation_name: 'resolve_init', + Id_operation: operationId, + }); - if (req.query.load === undefined) { - req.query.load = false; - } + if (!req.query.ids) { + return next({ code: 400, message: 'Param ids is required.' }); + } - this.logger.emit({ - msg: 'Finished measuring execution of resolve init', - Event_name: 'resolve_init_end', - Operation_name: 'resolve_init', - Id_operation: operationId, - }); + if (req.query.load === undefined) { + req.query.load = false; + } - let handlerId = null; - try { - const inserted_object = await Models.handler_ids.create({ - status: 'PENDING', - }); - handlerId = inserted_object.dataValues.handler_id; - res.status(202).send({ - handler_id: handlerId, + this.logger.emit({ + msg: 'Finished measuring execution of resolve init', + Event_name: 'resolve_init_end', + Operation_name: 'resolve_init', + Id_operation: operationId, }); - let ids = [req.query.ids]; - if (req.query.ids instanceof Array) { - ids = [...new Set(req.query.ids)]; - } - this.logger.info(`Resolve for ${ids} with handler id ${handlerId} initiated.`); - const response = []; - - for (let id of ids) { - let isAsset = false; - const { assertionId } = await this.blockchainService.getAssetProofs(id); - if (assertionId) { - isAsset = true; - id = assertionId; - } - this.logger.emit({ - msg: id, - Event_name: 'resolve_assertion_id', - Operation_name: 'resolve_assertion_id', - Id_operation: operationId, - }); - this.logger.emit({ - msg: 'Started measuring execution of resolve local', - Event_name: 'resolve_local_start', - Operation_name: 'resolve_local', - Id_operation: operationId, + let handlerId = null; + try { + const inserted_object = await Models.handler_ids.create({ + status: 'PENDING', }); - - const nquads = await this.dataService.resolve(id, true); - - this.logger.emit({ - msg: 'Finished measuring execution of resolve local', - Event_name: 'resolve_local_end', - Operation_name: 'resolve_local', - Id_operation: operationId, + handlerId = inserted_object.dataValues.handler_id; + res.status(202).send({ + handler_id: handlerId, }); - if (nquads) { + let ids = [req.query.ids]; + if (req.query.ids instanceof Array) { + ids = [...new Set(req.query.ids)]; + } + this.logger.info(`Resolve for ${ids} with handler id ${handlerId} initiated.`); + const response = []; + + for (let id of ids) { + let isAsset = false; + const { assertionId } = await this.blockchainService.getAssetProofs(id); + if (assertionId) { + isAsset = true; + id = assertionId; + } + this.logger.emit({ + msg: id, + Event_name: 'resolve_assertion_id', + Operation_name: 'resolve_assertion_id', + Id_operation: operationId, + }); this.logger.emit({ - msg: 'Started measuring execution of create assertion from nquads', - Event_name: 'resolve_create_assertion_from_nquads_start', - Operation_name: 'resolve_create_assertion_from_nquads', + msg: 'Started measuring execution of resolve local', + Event_name: 'resolve_local_start', + Operation_name: 'resolve_local', Id_operation: operationId, }); - const assertion = await this.dataService.createAssertion(nquads); + const nquads = await this.dataService.resolve(id, true); this.logger.emit({ - msg: 'Finished measuring execution of create assertion from nquads', - Event_name: 'resolve_create_assertion_from_nquads_end', - Operation_name: 'resolve_create_assertion_from_nquads', + msg: 'Finished measuring execution of resolve local', + Event_name: 'resolve_local_end', + Operation_name: 'resolve_local', Id_operation: operationId, }); - assertion.jsonld.metadata = JSON.parse( - sortedStringify(assertion.jsonld.metadata), - ); - assertion.jsonld.data = JSON.parse( - sortedStringify( - await this.dataService.fromNQuads( - assertion.jsonld.data, - assertion.jsonld.metadata.type, - ), - ), - ); - response.push(isAsset ? { - type: 'asset', - id: assertion.jsonld.metadata.UALs[0], - result: { - assertions: await this.dataService.assertionsByAsset( - assertion.jsonld.metadata.UALs[0], + if (nquads) { + this.logger.emit({ + msg: 'Started measuring execution of create assertion from nquads', + Event_name: 'resolve_create_assertion_from_nquads_start', + Operation_name: 'resolve_create_assertion_from_nquads', + Id_operation: operationId, + }); + + const assertion = await this.dataService.createAssertion(nquads); + + this.logger.emit({ + msg: 'Finished measuring execution of create assertion from nquads', + Event_name: 'resolve_create_assertion_from_nquads_end', + Operation_name: 'resolve_create_assertion_from_nquads', + Id_operation: operationId, + }); + + assertion.jsonld.metadata = JSON.parse( + sortedStringify(assertion.jsonld.metadata), + ); + assertion.jsonld.data = JSON.parse( + sortedStringify( + await this.dataService.fromNQuads( + assertion.jsonld.data, + assertion.jsonld.metadata.type, + ), ), - metadata: { - type: assertion.jsonld.metadata.type, - issuer: assertion.jsonld.metadata.issuer, - latestState: assertion.jsonld.metadata.timestamp, + ); + response.push(isAsset ? { + type: 'asset', + id: assertion.jsonld.metadata.UALs[0], + result: { + assertions: await this.dataService.assertionsByAsset( + assertion.jsonld.metadata.UALs[0], + ), + metadata: { + type: assertion.jsonld.metadata.type, + issuer: assertion.jsonld.metadata.issuer, + latestState: assertion.jsonld.metadata.timestamp, + }, + data: assertion.jsonld.data, }, - data: assertion.jsonld.data, - }, - } : { - type: 'assertion', - id, - assertion: assertion.jsonld, - }); - response.push(isAsset ? { - type: 'asset', - id: assertion.jsonld.metadata.UALs[0], - result: { - assertions: await this.dataService.assertionsByAsset( - assertion.jsonld.metadata.UALs[0], - ), - metadata: { - type: assertion.jsonld.metadata.type, - issuer: assertion.jsonld.metadata.issuer, - latestState: assertion.jsonld.metadata.timestamp, + } : { + type: 'assertion', + id, + assertion: assertion.jsonld, + }); + response.push(isAsset ? { + type: 'asset', + id: assertion.jsonld.metadata.UALs[0], + result: { + assertions: await this.dataService.assertionsByAsset( + assertion.jsonld.metadata.UALs[0], + ), + metadata: { + type: assertion.jsonld.metadata.type, + issuer: assertion.jsonld.metadata.issuer, + latestState: assertion.jsonld.metadata.timestamp, + }, + data: assertion.jsonld.data, }, - data: assertion.jsonld.data, - }, - } : { - type: 'assertion', - id, - assertion: assertion.jsonld, - }); - } else { - this.logger.info(`Searching for closest ${this.config.replicationFactor} node(s) for keyword ${id}`); - let nodes = await this.networkService.findNodes(id, this.config.replicationFactor); - if (nodes.length < this.config.replicationFactor) { - this.logger.warn(`Found only ${nodes.length} node(s) for keyword ${id}`); - } - for (const node of nodes) { - try { - const assertion = await this.queryService.resolve( - id, req.query.load, isAsset, node, operationId, - ); - if (assertion) { - assertion.jsonld.metadata = JSON.parse( - sortedStringify(assertion.jsonld.metadata), + } : { + type: 'assertion', + id, + assertion: assertion.jsonld, + }); + } else { + this.logger.info(`Searching for closest ${this.config.replicationFactor} node(s) for keyword ${id}`); + const nodes = await this.networkService.findNodes( + id, + constants.NETWORK_PROTOCOLS.STORE, + this.config.replicationFactor, + ); + if (nodes.length < this.config.replicationFactor) { + this.logger.warn(`Found only ${nodes.length} node(s) for keyword ${id}`); + } + for (const node of nodes) { + try { + const assertion = await this.queryService.resolve( + id, req.query.load, isAsset, node, operationId, ); - assertion.jsonld.data = JSON.parse( - sortedStringify( - await this.dataService.fromNQuads( - assertion.jsonld.data, - assertion.jsonld.metadata.type, + if (assertion) { + assertion.jsonld.metadata = JSON.parse( + sortedStringify(assertion.jsonld.metadata), + ); + assertion.jsonld.data = JSON.parse( + sortedStringify( + await this.dataService.fromNQuads( + assertion.jsonld.data, + assertion.jsonld.metadata.type, + ), ), - ), - ); - response.push(isAsset ? { - type: 'asset', - id: assertion.jsonld.metadata.UALs[0], - result: { - metadata: { - type: assertion.jsonld.metadata.type, - issuer: assertion.jsonld.metadata.issuer, - latestState: assertion.jsonld.metadata.timestamp, + ); + response.push(isAsset ? { + type: 'asset', + id: assertion.jsonld.metadata.UALs[0], + result: { + metadata: { + type: assertion.jsonld.metadata.type, + issuer: assertion.jsonld.metadata.issuer, + latestState: assertion + .jsonld.metadata.timestamp, + }, + data: assertion.jsonld.data, }, - data: assertion.jsonld.data, - }, - } : { - type: 'assertion', - id, - assertion: assertion.jsonld, + } : { + type: 'assertion', + id, + assertion: assertion.jsonld, + }); + break; + } + } catch (e) { + this.logger.error({ + msg: `Error while resolving data from another node: ${e.message}. ${e.stack}`, + Event_name: constants.ERROR_TYPE.RESOLVE_ROUTE_ERROR, + Event_value1: e.message, + Id_operation: operationId, }); - break; } - } catch (e) { - this.logger.error({ - msg: `Error while resolving data from another node: ${e.message}. ${e.stack}`, - Event_name: constants.ERROR_TYPE.RESOLVE_ROUTE_ERROR, - Event_value1: e.message, - Id_operation: operationId, - }); } } } - } - const handlerIdCachePath = this.fileService.getHandlerIdCachePath(); + const handlerIdCachePath = this.fileService.getHandlerIdCachePath(); - this.logger.emit({ - msg: 'Started measuring execution of resolve save assertion', - Event_name: 'resolve_save_assertion_start', - Operation_name: 'resolve_save_assertion', - Id_operation: operationId, - }); + this.logger.emit({ + msg: 'Started measuring execution of resolve save assertion', + Event_name: 'resolve_save_assertion_start', + Operation_name: 'resolve_save_assertion', + Id_operation: operationId, + }); - await this.fileService - .writeContentsToFile(handlerIdCachePath, handlerId, JSON.stringify(response)); + await this.fileService.writeContentsToFile( + handlerIdCachePath, + handlerId, + JSON.stringify(response), + ); - this.logger.emit({ - msg: 'Finished measuring execution of resolve save assertion', - Event_name: 'resolve_save_assertion_end', - Operation_name: 'resolve_save_assertion', - Id_operation: operationId, - }); + this.logger.emit({ + msg: 'Finished measuring execution of resolve save assertion', + Event_name: 'resolve_save_assertion_end', + Operation_name: 'resolve_save_assertion', + Id_operation: operationId, + }); - await Models.handler_ids.update( - { - status: 'COMPLETED', - }, { - where: { - handler_id: handlerId, + await Models.handler_ids.update( + { + status: 'COMPLETED', + }, { + where: { + handler_id: handlerId, + }, }, - }, - ); + ); - this.logger.emit({ - msg: 'Finished measuring execution of resolve command', - Event_name: 'resolve_end', - Operation_name: 'resolve', - Id_operation: operationId, - }); - } catch (e) { - this.logger.error({ - msg: `Unexpected error at resolve route: ${e.message}. ${e.stack}`, - Event_name: constants.ERROR_TYPE.RESOLVE_ROUTE_ERROR, - Event_value1: e.message, - Id_operation: operationId, - }); - this.updateFailedHandlerId(handlerId, e, next); - } - }); + this.logger.emit({ + msg: 'Finished measuring execution of resolve command', + Event_name: 'resolve_end', + Operation_name: 'resolve', + Id_operation: operationId, + }); + } catch (e) { + this.logger.error({ + msg: `Unexpected error at resolve route: ${e.message}. ${e.stack}`, + Event_name: constants.ERROR_TYPE.RESOLVE_ROUTE_ERROR, + Event_value1: e.message, + Id_operation: operationId, + }); + this.updateFailedHandlerId(handlerId, e, next); + } + }, + ); this.app.get('/assertions::search', this.rateLimitMiddleware, this.slowDownMiddleware, async (req, res, next) => { if (!req.query.query || req.params.search !== 'search') { @@ -486,7 +519,7 @@ class RpcController { ); this.logger.info(`Searching for closest ${this.config.replicationFactor} node(s) for keyword ${query}`); - let nodes = await this.networkService.findNodes(query, this.config.replicationFactor); + let nodes = await this.networkService.findNodes(query, '/assertions::search', this.config.replicationFactor); if (nodes.length < this.config.replicationFactor) { this.logger.warn(`Found only ${nodes.length} node(s) for keyword ${query}`); } @@ -497,7 +530,7 @@ class RpcController { options: { limit, prefix }, handlerId, }, node)); - await Promise.all(searchPromises); + await Promise.allSettled(searchPromises); } catch (e) { this.logger.error({ msg: `Unexpected error at search assertions route: ${e.message}. ${e.stack}`, @@ -581,6 +614,7 @@ class RpcController { this.logger.info(`Searching for closest ${this.config.replicationFactor} node(s) for keyword ${query}`); nodes = await this.networkService.findNodes( query, + '/entities::search', this.config.replicationFactor, ); if (nodes.length < this.config.replicationFactor) { @@ -609,7 +643,7 @@ class RpcController { limit, handlerId, }, node)); - await Promise.all(searchPromises); + await Promise.allSettled(searchPromises); } catch (e) { this.logger.error({ msg: `Unexpected error at search entities route: ${e.message}. ${e.stack}`, diff --git a/modules/service/network-service.js b/modules/service/network-service.js index e8e40c9a7..e922c6526 100644 --- a/modules/service/network-service.js +++ b/modules/service/network-service.js @@ -32,7 +32,7 @@ class NetworkService { * @param {Number} limit * @returns Promise{Iterable} */ - async findNodes(key, limit) { + async findNodes(key, protocol, limit) { const Id_operation = uuidv1(); this.logger.emit({ msg: 'Started measuring execution of find nodes', Event_name: 'find_nodes_start', Operation_name: 'find_nodes', Id_operation, @@ -40,14 +40,14 @@ class NetworkService { this.logger.emit({ msg: 'Started measuring execution of kad find nodes', Event_name: 'kad_find_nodes_start', Operation_name: 'find_nodes', Id_operation, }); - const nodes = await this.implementation.findNodes(key, limit); + const nodes = await this.implementation.findNodes(key, protocol); this.logger.emit({ msg: 'Finished measuring execution of kad find nodes ', Event_name: 'kad_find_nodes_end', Operation_name: 'find_nodes', Id_operation, }); this.logger.emit({ msg: 'Started measuring execution of rank nodes', Event_name: 'rank_nodes_start', Operation_name: 'find_nodes', Id_operation, }); - const rankedNodes = await this.rankingService.rank(nodes, key, this.config.replicationFactor, ['kad-identity']); + const rankedNodes = await this.rankingService.rank(nodes, key, limit, ['kad-identity']); this.logger.emit({ msg: 'Finished measuring execution of rank nodes', Event_name: 'rank_nodes_end', Operation_name: 'find_nodes', Id_operation, }); diff --git a/modules/service/publish-service.js b/modules/service/publish-service.js index 0cccee6f9..993efc420 100644 --- a/modules/service/publish-service.js +++ b/modules/service/publish-service.js @@ -130,14 +130,22 @@ class PublishService { async store(assertion, node) { // await this.networkService.store(node, topic, {}); let retries = 0; - let response = await this.networkService.sendMessage('/store', assertion, node); + let response = await this.networkService.sendMessage( + constants.NETWORK_PROTOCOLS.STORE, + assertion, + node, + ); while ( response === constants.NETWORK_RESPONSES.BUSY && retries < constants.STORE_MAX_RETRIES ) { retries += 1; await sleep.sleep(constants.STORE_BUSY_REPEAT_INTERVAL_IN_MILLS); - response = await this.networkService.sendMessage('/store', assertion, node); + response = await this.networkService.sendMessage( + constants.NETWORK_PROTOCOLS.STORE, + assertion, + node, + ); } return response; diff --git a/modules/service/query-service.js b/modules/service/query-service.js index e71af18df..bd8848467 100644 --- a/modules/service/query-service.js +++ b/modules/service/query-service.js @@ -17,7 +17,11 @@ class QueryService { resolve(null); }, constants.RESOLVE_MAX_TIME_MILLIS); - const result = await this.networkService.sendMessage('/resolve', id, node); + const result = await this.networkService.sendMessage( + constants.NETWORK_PROTOCOLS.STORE, + id, + node, + ); clearTimeout(timer); resolve(result); }); @@ -119,7 +123,11 @@ class QueryService { } async search(data, node) { - const result = await this.networkService.sendMessage('/search', data, node); + const result = await this.networkService.sendMessage( + constants.NETWORK_PROTOCOLS.SEARCH, + data, + node, + ); return result; } @@ -162,6 +170,10 @@ class QueryService { } async handleSearchResult(request) { + if (request === constants.NETWORK_RESPONSES.BUSY) { + return false; + } + // TODO: add mutex const operationId = uuidv1(); this.logger.emit({ @@ -173,10 +185,6 @@ class QueryService { const { handlerId, response } = request; - if (response === constants.NETWORK_RESPONSES.BUSY) { - return false; - } - const documentPath = this.fileService.getHandlerIdDocumentPath(handlerId); const handlerData = await this.fileService.loadJsonFromFile(documentPath); @@ -235,7 +243,11 @@ class QueryService { } async searchAssertions(data, node) { - const result = await this.networkService.sendMessage('/search/assertions', data, node); + const result = await this.networkService.sendMessage( + constants.NETWORK_PROTOCOLS.SEARCH_ASSERTIONS, + data, + node, + ); return result; } @@ -265,6 +277,10 @@ class QueryService { } async handleSearchAssertionsResult(request) { + if (request === constants.NETWORK_RESPONSES.BUSY) { + return false; + } + // TODO: add mutex const operationId = uuidv1(); this.logger.emit({ @@ -275,10 +291,6 @@ class QueryService { }); const { handlerId, response } = request; - if (response === constants.NETWORK_RESPONSES.BUSY) { - return false; - } - const documentPath = this.fileService.getHandlerIdDocumentPath(handlerId); const handlerData = await this.fileService.loadJsonFromFile(documentPath); if (response !== undefined && response.length && handlerData) { diff --git a/package-lock.json b/package-lock.json index 3c622d36d..9e07c9cae 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "6.0.0-beta.1.31", + "version": "6.0.0-beta.1.32", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "6.0.0-beta.1.31", + "version": "6.0.0-beta.1.32", "license": "ISC", "dependencies": { "app-root-path": "^3.0.0", diff --git a/package.json b/package.json index daaa43132..2c284514a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "6.0.0-beta.1.31", + "version": "6.0.0-beta.1.32", "description": "OTNode v6 Beta 1", "main": "index.js", "scripts": {