Skip to content

Commit

Permalink
Merge pull request #1861 from OriginTrail/v6/develop
Browse files Browse the repository at this point in the history
OriginTrail 6.0.0-beta.1.32 Testnet Prerelease
  • Loading branch information
kotlarmilos authored Mar 30, 2022
2 parents 6ce9639 + 4e17877 commit 82fa959
Show file tree
Hide file tree
Showing 9 changed files with 409 additions and 291 deletions.
75 changes: 62 additions & 13 deletions external/libp2p-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ class Libp2pService {

initializationObject.peerId = this.config.peerId;
this.workerPool = this.config.workerPool;
this.limiter = new InMemoryRateLimiter({
interval: constants.NETWORK_API_RATE_LIMIT_TIME_WINDOW_MILLS,
maxInInterval: constants.NETWORK_API_RATE_LIMIT_MAX_NUMBER,
});
this._initializeRateLimiters();

Libp2p.create(initializationObject).then((node) => {
this.node = node;
Expand All @@ -94,6 +91,25 @@ class Libp2pService {
});
}

_initializeRateLimiters() {
const basicRateLimiter = new InMemoryRateLimiter({
interval: constants.NETWORK_API_RATE_LIMIT.TIME_WINDOW_MILLS,
maxInInterval: constants.NETWORK_API_RATE_LIMIT.MAX_NUMBER,
});

const spamDetection = new InMemoryRateLimiter({
interval: constants.NETWORK_API_SPAM_DETECTION.TIME_WINDOW_MILLS,
maxInInterval: constants.NETWORK_API_SPAM_DETECTION.MAX_NUMBER,
});

this.rateLimiter = {
basicRateLimiter,
spamDetection,
}

this.blackList = {};
}

_initializeNodeListeners() {
this.node.on('peer:discovery', (peer) => {
this._onPeerDiscovery(peer);
Expand All @@ -111,14 +127,16 @@ class Libp2pService {
this.logger.debug(`Node ${this.node.peerId._idB58String} connected to ${connection.remotePeer.toB58String()}`);
}

async findNodes(key, limit) {
async findNodes(key, protocol) {
const encodedKey = new TextEncoder().encode(key);
// Creates a DHT ID by hashing a given Uint8Array
const id = (await sha256.digest(encodedKey)).digest;
const nodes = this.node._dht.peerRouting.getClosestPeers(id);
const result = new Set();
for await (const node of nodes) {
result.add(node);
if(this.node.peerStore.peers.get(node._idB58String).protocols.includes(protocol)){
result.add(node);
}
}
this.logger.info(`Found ${result.size} nodes`);

Expand Down Expand Up @@ -165,14 +183,13 @@ class Libp2pService {
this.node.handle(eventName, async (handlerProps) => {
const {stream} = handlerProps;
let timestamp = Date.now();
const blocked = await this.limiter.limit(handlerProps.connection.remotePeer.toB58String());
if(blocked) {
const remotePeerId = handlerProps.connection.remotePeer._idB58String;
if(await this.limitRequest(remotePeerId)) {
const preparedBlockedResponse = await this.prepareForSending(constants.NETWORK_RESPONSES.BLOCKED);
await pipe(
[preparedBlockedResponse],
stream
);
this.logger.info(`Blocking request from ${handlerProps.connection.remotePeer._idB58String}. Max number of requests exceeded.`);
return;
}
let data = await pipe(
Expand All @@ -188,10 +205,10 @@ class Libp2pService {
)
try {
data = await this.workerPool.exec('JSONParse', [data.toString()]);
this.logger.info(`Receiving message from ${handlerProps.connection.remotePeer._idB58String} to ${this.config.id}: event=${eventName};`);
this.logger.info(`Receiving message from ${remotePeerId} to ${this.config.id}: event=${eventName};`);
if (!async) {
const result = await handler(data);
this.logger.info(`Sending response from ${this.config.id} to ${handlerProps.connection.remotePeer._idB58String}: event=${eventName};`);
this.logger.info(`Sending response from ${this.config.id} to ${remotePeerId}: event=${eventName};`);
const preparedData = await this.prepareForSending(result);
await pipe(
[Buffer.from(preparedData)],
Expand All @@ -204,12 +221,12 @@ class Libp2pService {
stream
)

this.logger.info(`Sending response from ${this.config.id} to ${handlerProps.connection.remotePeer._idB58String}: event=${eventName};`);
this.logger.info(`Sending response from ${this.config.id} to ${remotePeerId}: event=${eventName};`);
const result = await handler(data);
if (Date.now() <= timestamp + timeout) {
await this.sendMessage(`${eventName}/result`, result, handlerProps.connection.remotePeer);
} else {
this.logger.warn(`Too late to send response from ${this.config.id} to ${handlerProps.connection.remotePeer._idB58String}: event=${eventName};`);
this.logger.warn(`Too late to send response from ${this.config.id} to ${remotePeerId}: event=${eventName};`);
}
}
} catch (e) {
Expand Down Expand Up @@ -265,6 +282,38 @@ class Libp2pService {
return false;
}

async limitRequest(remotePeerId) {
if(this.blackList[remotePeerId]){
const remainingMinutes = Math.floor(
constants.NETWORK_API_BLACK_LIST_TIME_WINDOW_MINUTES -
(Date.now() - this.blackList[remotePeerId]) / (1000 * 60)
);

if(remainingMinutes > 0) {
this.logger.info(`Blocking request from ${remotePeerId}. Node is blacklisted for ${remainingMinutes} minutes.`);

return true;
} else {
delete this.blackList[remotePeerId]
}
}

if(await this.rateLimiter.spamDetection.limit(remotePeerId)) {
this.blackList[remotePeerId] = Date.now();
this.logger.info(
`Blocking request from ${remotePeerId}. Spammer detected and blacklisted for ${constants.NETWORK_API_BLACK_LIST_TIME_WINDOW_MINUTES} minutes.`
);

return true;
} else if (await this.rateLimiter.basicRateLimiter.limit(remotePeerId)) {
this.logger.info(`Blocking request from ${remotePeerId}. Max number of requests exceeded.`);

return true;
}

return false;
}

async restartService() {
this.logger.info('Restrating libp2p service...');
// TODO: reinitialize service
Expand Down
1 change: 1 addition & 0 deletions modules/command/publish/send-assertion-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class SendAssertionCommand extends Command {
);
const foundNodes = await this.networkService.findNodes(
keyword,
constants.NETWORK_PROTOCOLS.STORE,
this.config.replicationFactor,
);
if (foundNodes.length < this.config.replicationFactor) {
Expand Down
68 changes: 41 additions & 27 deletions modules/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,46 +12,47 @@ exports.DID = 'DID';
exports.MAX_FILE_SIZE = 2621440;

/**
* @constant {number} SERVICE_API_RATE_LIMIT_TIME_WINDOW_MILLS
* - Express rate limit time window in milliseconds
* @constant {object} SERVICE_API_RATE_LIMIT
* - Express rate limit configuration constants
*/
exports.SERVICE_API_RATE_LIMIT_TIME_WINDOW_MILLS = 1 * 60 * 1000;

/**
* @constant {number} SERVICE_API_RATE_LIMIT_MAX_NUMBER
* - Express rate limit max number of requests allowed in the specified time window
*/
exports.SERVICE_API_RATE_LIMIT_MAX_NUMBER = 10;

/**
* @constant {number} SERVICE_API_SLOW_DOWN_TIME_WINDOW_MILLS
* - Express slow down time window in milliseconds
*/
exports.SERVICE_API_SLOW_DOWN_TIME_WINDOW_MILLS = 1 * 60 * 1000;
exports.SERVICE_API_RATE_LIMIT = {
TIME_WINDOW_MILLS: 1 * 60 * 1000,
MAX_NUMBER: 10,
};

/**
* @constant {number} SERVICE_API_SLOW_DOWN_DELAY_AFTER
* - Express slow down number of seconds after which it starts delaying requests
* @constant {object} SERVICE_API_SLOW_DOWN
* - Express slow down configuration constants
*/
exports.SERVICE_API_SLOW_DOWN_DELAY_AFTER = 5;
exports.SERVICE_API_SLOW_DOWN = {
TIME_WINDOW_MILLS: 1 * 60 * 1000,
DELAY_AFTER_SECONDS: 5,
DELAY_MILLS: 3 * 1000,
};

/**
* @constant {number} SERVICE_API_SLOW_DOWN_DELAY_MILLS
* - Express slow down delay between requests in milliseconds
* @constant {object} NETWORK_API_RATE_LIMIT
* - Network (Libp2p) rate limiter configuration constants
*/
exports.SERVICE_API_SLOW_DOWN_DELAY_MILLS = 3 * 1000;
exports.NETWORK_API_RATE_LIMIT = {
TIME_WINDOW_MILLS: 1 * 60 * 1000,
MAX_NUMBER: this.SERVICE_API_RATE_LIMIT.MAX_NUMBER,
};

/**
* @constant {number} NETWORK_API_RATE_LIMIT_TIME_WINDOW_MILLS
* - Network (Libp2p) rate limit time window in milliseconds
* @constant {object} NETWORK_API_SPAM_DETECTION
* - Network (Libp2p) spam detection rate limiter configuration constants
*/
exports.NETWORK_API_RATE_LIMIT_TIME_WINDOW_MILLS = 1 * 60 * 1000;
exports.NETWORK_API_SPAM_DETECTION = {
TIME_WINDOW_MILLS: 1 * 60 * 1000,
MAX_NUMBER: 20,
};

/**
* @constant {number} NETWORK_API_RATE_LIMIT_MAX_NUMBER
* - Network (Libp2p) rate limit max number of requests allowed in the specified time window
* @constant {object} NETWORK_API_BLACK_LIST_TIME_WINDOW_MINUTES
* - Network (Libp2p) black list time window in minutes
*/
exports.NETWORK_API_RATE_LIMIT_MAX_NUMBER = 10;
exports.NETWORK_API_BLACK_LIST_TIME_WINDOW_MINUTES = 60;

/**
* @constant {number} DID_PREFIX
Expand Down Expand Up @@ -191,6 +192,19 @@ exports.STRINGIFIED_NETWORK_RESPONSES = {
error: '"error"',
};

/**
* @constant {object} STRINGIFIED_NETWORK_RESPONSES -
* Stringified types of known network responses
*/
exports.NETWORK_PROTOCOLS = {
STORE: '/store/1.0.0',
RESOLVE: '/resolve/1.0.0',
SEARCH: '/search/1.0.0',
SEARCH_RESULT: '/search/result/1.0.0',
SEARCH_ASSERTIONS: '/search/assertions/1.0.0',
SEARCH_ASSERTIONS_RESULT: '/search/assertions/result/1.0.0',
};

/**
* @constant {object} ERROR_TYPE -
* Types of errors supported
Expand Down
Loading

0 comments on commit 82fa959

Please sign in to comment.