Skip to content

Commit

Permalink
Merge pull request #1812 from OriginTrail/v6/prerelease/testnet
Browse files Browse the repository at this point in the history
OriginTrail 6.0.0-beta.1.29 Testnet Release
  • Loading branch information
NZT48 authored Mar 9, 2022
2 parents 91f72a2 + 3f5c70e commit 99b1ff7
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 167 deletions.
23 changes: 1 addition & 22 deletions external/blazegraph-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ class BlazegraphService {
}

async resolve(uri) {
let isAsset = false;
const query = `PREFIX schema: <http://schema.org/>
CONSTRUCT { ?s ?p ?o }
WHERE {
Expand All @@ -112,26 +111,6 @@ class BlazegraphService {
}`;
let nquads = await this.construct(query);

if (!nquads.length) {
const query = `PREFIX schema: <http://schema.org/>
CONSTRUCT { ?s ?p ?o }
WHERE {
GRAPH ?g { ?s ?p ?o }
{
SELECT ?ng
WHERE {
?ng schema:hasUALs "${uri}" ;
schema:hasTimestamp ?timestamp .
}
ORDER BY DESC(?timestamp)
LIMIT 1
}
FILTER (?g = ?ng) .
}`;
nquads = await this.construct(query);
isAsset = true;
}

if (nquads.length) {
nquads = nquads.toString();
nquads = nquads.split('\n');
Expand All @@ -140,7 +119,7 @@ class BlazegraphService {
} else {
nquads = null;
}
return { nquads, isAsset };
return nquads;
}

async transformBlankNodes(nquads) {
Expand Down
26 changes: 2 additions & 24 deletions external/graphdb-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ class GraphdbService {
}

async resolve(uri) {
let isAsset = false;
const query = `PREFIX schema: <http://schema.org/>
CONSTRUCT { ?s ?p ?o }
WHERE {
Expand All @@ -127,26 +126,6 @@ class GraphdbService {
}`;
let nquads = await this.construct(query);

if (!nquads.length) {
const query = `PREFIX schema: <http://schema.org/>
CONSTRUCT { ?s ?p ?o }
WHERE {
GRAPH ?g { ?s ?p ?o }
{
SELECT ?ng
WHERE {
?ng schema:hasUALs "${uri}" ;
schema:hasTimestamp ?timestamp .
}
ORDER BY DESC(?timestamp)
LIMIT 1
}
FILTER (?g = ?ng) .
}`;
nquads = await this.construct(query);
isAsset = true;
}

if (nquads.length) {
nquads = nquads.toString();
nquads = nquads.replace(/_:genid(.){37}/gm, '_:$1');
Expand All @@ -155,10 +134,9 @@ class GraphdbService {
} else {
nquads = null;
}
return { nquads, isAsset };
return nquads;
}


async assertionsByAsset(uri) {
const query = `PREFIX schema: <http://schema.org/>
SELECT ?assertionId ?issuer ?timestamp
Expand All @@ -168,7 +146,7 @@ class GraphdbService {
schema:hasIssuer ?issuer .
}
ORDER BY DESC(?timestamp)`;
let result = await this.execute(query);
const result = await this.execute(query);

return JSON.parse(result).results.bindings;
}
Expand Down
8 changes: 4 additions & 4 deletions external/libp2p-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const initializationObject = {
dht: KadDHT,
},
dialer: {
dialTimeout: 1e3,
dialTimeout: 2e3,
},
config: {
dht: {
Expand Down Expand Up @@ -180,7 +180,7 @@ class Libp2pService {
)
} else {
await pipe(
['ack'],
[constants.NETWORK_RESPONSES.ACK],
stream
)

Expand All @@ -199,7 +199,7 @@ class Libp2pService {
Event_name: constants.ERROR_TYPE.LIBP2P_HANDLE_MSG_ERROR,
});
await pipe(
['ack'],
[constants.NETWORK_RESPONSES.ACK],
stream
)

Expand All @@ -224,7 +224,7 @@ class Libp2pService {
},
)

if(response.toString() === 'ack') {
if(response.toString() === constants.NETWORK_RESPONSES.ACK) {
return null;
}

Expand Down
33 changes: 28 additions & 5 deletions modules/command/publish/send-assertion-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,34 @@ class SendAssertionCommand extends Command {
}
nodes = [...new Set(nodes)];

for (const node of nodes) {
this.publishService.store({ id: assertion.id, nquads: nquads }, node).catch((e) => {
this.handleError(handlerId, e, `Error while sending data with assertion id ${assertion.id} to node ${node._idB58String}. Error message: ${e.message}. ${e.stack}`);
});
}
const storePromises = nodes.map((node) => this.publishService
.store({ id: assertion.id, nquads }, node)
.then((response) => {
if (!response) {
this.logger.error({
msg: `Error while sending data with assertion id ${assertion.id} to node ${node._idB58String} - receiving node didn't stored the assertion.`,
Operation_name: 'Error',
Event_name: constants.ERROR_TYPE.SEND_ASSERTION_ERROR,
Id_operation: handlerId,
});
} else if (response === 'busy') {
this.logger.error({
msg: `Error while sending data with assertion id ${assertion.id} to node ${node._idB58String} - receiving node is busy to store.`,
Operation_name: 'Error',
Event_name: constants.ERROR_TYPE.SEND_ASSERTION_ERROR,
Id_operation: handlerId,
});
}
})
.catch((e) => {
this.handleError(
handlerId,
e,
`Error while sending data with assertion id ${assertion.id} to node ${node._idB58String}. Error message: ${e.message}. ${e.stack}`,
);
}));

await Promise.all(storePromises);

await Models.handler_ids.update(
{
Expand Down
42 changes: 42 additions & 0 deletions modules/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,42 @@ exports.TRIPLE_STORE_CONNECT_MAX_RETRIES = 10;
*/
exports.TRIPLE_STORE_CONNECT_RETRY_FREQUENCY = 10; // 10 seconds

/**
* @constant {number} TRIPLE_STORE_QUEUE_LIMIT
* - Triple store queue limit
*/
exports.TRIPLE_STORE_QUEUE_LIMIT = 5000;

/**
* @constant {number} BLOCKCHAIN_QUEUE_LIMIT
* - Blockchain queue limit
*/
exports.BLOCKCHAIN_QUEUE_LIMIT = 25000;

/**
* @constant {number} RESOLVE_MAX_TIME_MILLIS
* - Maximum time for resolve operation
*/
exports.RESOLVE_MAX_TIME_MILLIS = 15 * 1000;

/**
* @constant {number} STORE_MAX_RETRIES
* - Maximum number of retries
*/
exports.STORE_MAX_RETRIES = 3;

/**
* @constant {number} STORE_BUSY_REPEAT_INTERVAL_IN_MILLS
* - Wait interval between retries for sending store requests
*/
exports.STORE_BUSY_REPEAT_INTERVAL_IN_MILLS = 2 * 1000;

/**
* @constant {number} HANDLE_STORE_BUSINESS_LIMIT
* - Max number of operations in triple store queue that indicate business
*/
exports.HANDLE_STORE_BUSINESS_LIMIT = 20;

/**
* @constant {object} TRIPLE_STORE_IMPLEMENTATION -
* Names of available triple store implementations
Expand All @@ -78,6 +108,17 @@ exports.TRIPLE_STORE_IMPLEMENTATION = {
GRAPHDB: 'GraphDB',
};

/**
* @constant {object} NETWORK_RESPONSES -
* Types of known network responses
*/
exports.NETWORK_RESPONSES = {
TRUE: true,
FALSE: false,
ACK: 'ack',
BUSY: 'busy',
};

/**
* @constant {object} ERROR_TYPE -
* Types of errors supported
Expand All @@ -98,6 +139,7 @@ exports.ERROR_TYPE = {
PROOFS_ROUTE_ERROR: 'ProofsRouteError',
RESULTS_ROUTE_ERROR: 'ResultsRouteError',
NODE_INFO_ROUTE_ERROR: 'NodeInfoRouteError',
HANDLE_STORE_ERROR: 'HandleStoreError',
EXTRACT_METADATA_ERROR: 'ExtractMetadataError',
TRIPLE_STORE_UNAVAILABLE_ERROR: 'TripleStoreUnavailableError',
TRIPLE_STORE_INSERT_ERROR: 'TripleStoreInsertError',
Expand Down
26 changes: 15 additions & 11 deletions modules/controller/rpc-controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const path = require('path');
const { v1: uuidv1, v4: uuidv4 } = require('uuid');
const sortedStringify = require('json-stable-stringify');
const validator = require('validator');
const slowDown = require('express-slow-down');
const Models = require('../../models/index');
const constants = require('../constants');
const pjson = require('../../package.json');
Expand Down Expand Up @@ -74,15 +75,21 @@ class RpcController {

this.app.use((error, req, res, next) => {
if (error instanceof IpDeniedError) {
return res.status(401).send('Access denied')
return res.status(401).send('Access denied');
}
return next();
});

this.app.use((req, res, next) => {
this.logger.info(`${req.method}: ${req.url} request received`);
return next();
})
});

this.app.use(slowDown({
windowMs: 1 * 60 * 1000, // 1 minute
delayAfter: 30, // allow 30 requests per 1 minute, then...
delayMs: 2 * 1000, // begin adding 2s of delay per request above 30;
}));
}

async initializeErrorMiddleware() {
Expand Down Expand Up @@ -203,9 +210,8 @@ class RpcController {
isAsset = true;
id = assertionId;
}
const result = await this.dataService.resolve(id, true);
if (result && result.nquads) {
let {nquads} = result;
const nquads = await this.dataService.resolve(id, true);
if (nquads) {
let assertion = await this.dataService.createAssertion(nquads);
assertion.jsonld.metadata = JSON.parse(sortedStringify(assertion.jsonld.metadata))
assertion.jsonld.data = JSON.parse(sortedStringify(await this.dataService.fromNQuads(assertion.jsonld.data, assertion.jsonld.metadata.type)))
Expand Down Expand Up @@ -235,9 +241,8 @@ class RpcController {
nodes = [...new Set(nodes)];
for (const node of nodes) {
try {
const result = await this.queryService.resolve(id, req.query.load, isAsset, node);
if (result) {
const {assertion} = result;
const assertion = await this.queryService.resolve(id, req.query.load, isAsset, node);
if (assertion) {
assertion.jsonld.metadata = JSON.parse(sortedStringify(assertion.jsonld.metadata))
assertion.jsonld.data = JSON.parse(sortedStringify(await this.dataService.fromNQuads(assertion.jsonld.data, assertion.jsonld.metadata.type)))
response.push(isAsset ? {
Expand Down Expand Up @@ -590,9 +595,8 @@ class RpcController {
assertions = await this.dataService.findAssertions(reqNquads);
}
for (const assertionId of assertions) {
const content = await this.dataService.resolve(assertionId);
if (content) {
const rawNquads = content.nquads ? content.nquads : content.rdf;
const rawNquads = await this.dataService.resolve(assertionId);
if (rawNquads) {
const { nquads } = await this.dataService.createAssertion(rawNquads);
const proofs = await this.validationService.getProofs(nquads, reqNquads);
result.push({ assertionId, proofs });
Expand Down
Loading

0 comments on commit 99b1ff7

Please sign in to comment.