Skip to content

Commit

Permalink
Merge pull request #29 from martin-krcmar/three-queueus
Browse files Browse the repository at this point in the history
Three queueus
  • Loading branch information
waldez authored Jun 5, 2018
2 parents 8ff45c0 + 5ab32f0 commit 29f2d22
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 5 deletions.
3 changes: 2 additions & 1 deletion appmixer-lib.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module.exports = {
object: require('./util/object'),
component: require('./util/component'),
flow: require('./util/flow'),
PagingAggregator: require('./util/paging-aggregator')
PagingAggregator: require('./util/paging-aggregator'),
promise: require('./util/promise')
}
};
2 changes: 1 addition & 1 deletion util/component.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
'use strict'
'use strict';

const metrohash128 = require('metrohash').metrohash128;
const objectUtil = require('./object');
Expand Down
51 changes: 48 additions & 3 deletions util/flow.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
'use strict'
'use strict';

/**
* Returns generator, which loops over unique input links
Expand Down Expand Up @@ -43,9 +43,54 @@ function* allInputLinks(source, checkUniqueIdPort = true) {
}
}

/**
* Returns generator, which loops over all output links from a component's output port.
* @param {Flow} flow
* @param {string} cid - component Id
* @param {string} outputPort
* @return {Iterator<*>}
*/
function* allOutputLinks(flow, cid, outputPort) {

const descriptor = flow.getFlowDescriptor();
for (let componentId in descriptor) {
if (!descriptor.hasOwnProperty(componentId)) {
continue;
}

let source = descriptor[componentId].source;
for (let inputPort in source) {
if (!source.hasOwnProperty(inputPort)) {
continue;
}

let sourceInput = source[inputPort];
for (let sourceComponentId in sourceInput) {
if (!sourceInput.hasOwnProperty(sourceComponentId)) {
continue;
}

if (sourceComponentId !== cid) {
continue;
}

const ports = Array.isArray(sourceInput[sourceComponentId]) ?
sourceInput[sourceComponentId] : [sourceInput[sourceComponentId]];

if (ports.indexOf(outputPort) !== -1) {
yield {
componentId,
inputPort
};
}
}
}
}
}

module.exports = {
ComponentDescriptor: {
allInputLinks
}
allInputLinks,
allOutputLinks
}
};
95 changes: 95 additions & 0 deletions util/promise.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
'use strict';
const Promise = require('bluebird');
const check = require('check-types');

/**
* This is a variation of classic Promise.all function. Where Promise.all is fail fast -
* when the first promise fails, the rest is not executed.
* In this case all promises are executed and result array is returned.
* @param {Array} promises
* @return {Promise<*>}
* @throws Error
*/
module.exports.allNoErr = async promises => {

if (!Array.isArray(promises)) {
throw new Error('Promises is not an array.');
}
return Promise.all(promises.map(promise => promise.catch(e => e)));
};

/**
* Similar to the previous function. But in this case when one of the promises fail error
* is thrown. The first error found will be throws. So it's like classic Promise.all but
* all promises are finished.
* @param {Array} promises
* @return {Promise<*>}
* @throws Error
*/
module.exports.all = async promises => {

if (!Array.isArray(promises)) {
throw new Error('Promises is not an array.');
}
return Promise.all(promises.map(promise => promise.catch(e => e)))
.then(results => {
for (let result of results) {
if (result instanceof Error) {
throw result;
}
}
return results;
});
};

/**
* Like Promise.map but again - let all promises finish and then throw an error if one of
* the promises failed. Return result of each promise in an array otherwise.
* @param {Object} object
* @param {function} callback
* @return {Promise<Array>}
* @throws Error
*/
module.exports.mapKeys = async (object, callback) => {

check.assert.object(object, 'Invalid object.');
check.assert.function(callback, 'Invalid callback function');

let results = [];
return Promise.map(Object.keys(object), async key => {
return Promise.resolve(callback(key)).reflect();
}).each(inspection => {
if (!inspection.isFulfilled()) {
throw inspection.reason();
}
results.push(inspection.value());
}).then(() => {
return results;
});
};

/**
* Like Promise.map but again - let all promises finish and then throw an error if one of
* the promises failed. Return result of each promise in an array otherwise.
* @param {Object} object
* @param {function} callback
* @return {Promise<Array>}
* @throws Error
*/
module.exports.mapProperties = async (object, callback) => {

check.assert.object(object, 'Invalid object.');
check.assert.function(callback, 'Invalid callback function');

let results = [];
return Promise.map(Object.keys(object), async key => {
return Promise.resolve(callback(object[key])).reflect();
}).each(inspection => {
if (!inspection.isFulfilled()) {
throw inspection.reason();
}
results.push(inspection.value());
}).then(() => {
return results;
});
};

0 comments on commit 29f2d22

Please sign in to comment.