Skip to content

Commit

Permalink
feat: Async job should manage tasks in a queue (closes #151)
Browse files Browse the repository at this point in the history
  • Loading branch information
claustres committed Jun 30, 2021
1 parent ffb6a0f commit cac15ed
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 22 deletions.
6 changes: 3 additions & 3 deletions src/hooks/hooks.geojson.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ export function convertToGeoJson (options = {}) {
// Lat, long, alt not required anymore
// Keep GeoJson properties but avoid geometry, etc. in case the target object is already a GeoJson feature
properties: Object.assign({}, _.get(object, 'properties'),
(options.keepGeometryProperties ?
_.omit(object, ['properties', 'geometry', 'type']) :
_.omit(object, [geometry, longitude, latitude, altitude, 'properties', 'geometry', 'type']))),
(options.keepGeometryProperties
? _.omit(object, ['properties', 'geometry', 'type'])
: _.omit(object, [geometry, longitude, latitude, altitude, 'properties', 'geometry', 'type']))),
geometry: (geo || {
type: 'Point',
coordinates: [lon, lat, alt]
Expand Down
2 changes: 1 addition & 1 deletion src/hooks/hooks.json.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ export function mergeJson (options = {}) {
json.forEach(object => {
// Find similar items
const items = _.filter(objects, item => {
if (typeof options.by == 'function') return (options.by(object) === options.by(item))
if (typeof options.by === 'function') return (options.by(object) === options.by(item))
else return _.get(object, options.by) === _.get(item, options.by)
})
// Then merge similar items
Expand Down
2 changes: 1 addition & 1 deletion src/hooks/hooks.mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ export function createMongoBucket (options = {}) {
else resolve(collection)
})
})

if (!bucket) {
debug('Creating the ' + bucketName + ' bucket')
bucket = new GridFSBucket(client.db, Object.assign({
Expand Down
2 changes: 1 addition & 1 deletion src/hooks/hooks.nwp.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export function generateNwpTasks (options) {
for (let timeOffset = elementLowerLimit; timeOffset <= elementUpperLimit; timeOffset += elementInterval) {
const forecastTime = runTime.clone().add({ seconds: timeOffset })
if (options.keepPastForecasts || !forecastTime.isBefore(lowerTime)) {
let task = Object.assign({
const task = Object.assign({
level,
runTime,
forecastTime,
Expand Down
4 changes: 2 additions & 2 deletions src/hooks/hooks.system.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export function untar (options = {}) {

export function runCommand (options = {}) {
async function run (item) {
let command = template(item, options.command)
const command = template(item, options.command)
// Could actually be a set of commands so we unify the way to handle both cases
let commands
if (!options.spawn) { // When using exec commands are strings
Expand All @@ -46,7 +46,7 @@ export function runCommand (options = {}) {
commands = (Array.isArray(command[0]) ? command : [command])
}
for (let i = 0; i < commands.length; i++) {
let command = commands[i]
const command = commands[i]
let result
debug('Running command', command)
if (options.spawn) {
Expand Down
37 changes: 25 additions & 12 deletions src/jobs/jobs.async.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,39 +47,52 @@ async function createJob (options = {}, store = null, tasks, id, taskTemplate) {
}
}
}
// Wait for a task to be run by keeping track of task in queue
const addToQueue = async (task, queue, taskResults) => {
queue.push(task)
const result = await task
const hrend = process.hrtime(hrstart)
const duration = (1000 * hrend[0] + hrend[1] / 1e6)
queue.splice(queue.indexOf(task), 1)
taskResults.push(result)
debug('Task ran', result)
debug(taskResults.length + ` tasks ran from start in ${duration} ms`)
// Check if timeout has been reached
if (options.timeout && (duration > options.timeout)) throw new Timeout('Job timeout reached')
return result
}

const workersLimit = options.workersLimit || 4
let i = 0
// The set of workers/tasks for current step
// We launch the workers in sequence, one step of the sequence contains a maximum number of workersLimit workers
let workers = []
let taskResults = []
const workers = []
const taskResults = []
while (i < tasks.length) {
const task = tasks[i]
const params = {}
if (store) params.store = store
// Add a worker to current step of the sequence
workers.push(runTask(task, params))
addToQueue(runTask(task, params), workers, taskResults)
// When we reach the worker limit wait until the step finishes and jump to next one
if ((workers.length >= workersLimit) ||
(i === tasks.length - 1)) {
try {
const results = await Promise.all(workers)
const hrend = process.hrtime(hrstart)
const duration = (1000 * hrend[0] + hrend[1] / 1e6)
taskResults = taskResults.concat(results)
debug(results.length + ' tasks ran', results)
debug(taskResults.length + ` tasks ran from start in ${duration} ms`)
// Check if timeout has been reached
if (options.timeout && (duration > options.timeout)) throw new Timeout('Job timeout reached')
await Promise.race(workers)
} catch (error) {
debug('Some tasks failed', error)
throw error
}
workers = []
}
i++
}
// Wait for the rest of the tasks to finish
try {
await Promise.all(workers)
} catch (error) {
debug('Some tasks failed', error)
throw error
}
return taskResults
}

Expand Down
4 changes: 2 additions & 2 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export function transformJsonObject (json, options) {
json = sift(options.filter, json)
}
// By default we perform transformation in place
if (!_.get(options, 'inPlace', true)) {
if (!_.get(options, 'inPlace', true)) {
json = _.cloneDeep(json)
}
// Iterate over path mapping
Expand Down Expand Up @@ -148,7 +148,7 @@ export function transformJsonObject (json, options) {
}

// Call a given function on each hook item
export function callOnHookItems(options = {}) {
export function callOnHookItems (options = {}) {
return (f) => {
// Default call mode is per item
const perItem = _.get(options, 'perItem', true)
Expand Down

0 comments on commit cac15ed

Please sign in to comment.