diff --git a/lib/cli.js b/lib/cli.js index 5abc70f6..c445d51b 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -126,50 +126,51 @@ export function getServer () { return server } -export function runJob (job, options = {}) { +export async function runJob (job, options = {}) { // Function to effectively run the job - function runJobWithOptions () { + async function runJobWithOptions () { console.log(`Launching job ${job.id} at ${(new Date()).toISOString()}, please wait...`) console.time('Running time') Healthcheck.jobId = job.id Healthcheck.isRunning = true const hrstart = process.hrtime() - return app.service('jobs').create(job) - .then(tasks => { - console.log('Job terminated, ' + tasks.length + ' tasks ran') - const hrend = process.hrtime(hrstart) - console.timeEnd('Running time') - Healthcheck.isRunning = false - Healthcheck.duration = hrend[0] + (hrend[1] / 1e9) - Healthcheck.error = null - // Compute the error ratio for fault-tolerant jobs - Healthcheck.nbFailedTasks = 0 - Healthcheck.nbSuccessfulTasks = 0 - tasks.forEach(task => { - if (task.error) Healthcheck.nbFailedTasks++ - else Healthcheck.nbSuccessfulTasks++ - }) - const nbTotalTasks = (Healthcheck.nbSuccessfulTasks + Healthcheck.nbFailedTasks) - // Job with 0 tasks is always succesful - Healthcheck.successRate = (nbTotalTasks > 0 ? Healthcheck.nbSuccessfulTasks / nbTotalTasks : 1) - return Promise.resolve(tasks) - }) - .catch(error => { - console.error(error.message) - Healthcheck.isRunning = false - Healthcheck.error = error + try { + const tasks = await app.service('jobs').create(job) + console.log('Job terminated, ' + tasks.length + ' tasks ran') + const hrend = process.hrtime(hrstart) + console.timeEnd('Running time') + Healthcheck.isRunning = false + Healthcheck.duration = hrend[0] + (hrend[1] / 1e9) + Healthcheck.error = null + // Compute the error ratio for fault-tolerant jobs + Healthcheck.nbFailedTasks = 0 + Healthcheck.nbSuccessfulTasks = 0 + tasks.forEach(task => { + if (task.error) Healthcheck.nbFailedTasks++ + else Healthcheck.nbSuccessfulTasks++ }) + const nbTotalTasks = (Healthcheck.nbSuccessfulTasks + Healthcheck.nbFailedTasks) + // Job with 0 tasks is always succesful + Healthcheck.successRate = (nbTotalTasks > 0 ? Healthcheck.nbSuccessfulTasks / nbTotalTasks : 1) + // Check if run has to be considered as successful + await healthcheck(options) + return Promise.resolve(tasks) + } catch (error) { + console.error(error.message) + Healthcheck.isRunning = false + Healthcheck.error = error + } } // Setup CRON job if required let cronJob if (options.cron) { console.log('Scheduling job with cron pattern ' + options.cron) - cronJob = new CronJob(options.cron, () => { + cronJob = new CronJob(options.cron, async () => { // If last job has not yet finished skip this call as we are late if (!Healthcheck.isRunning) { Healthcheck.nbSkippedJobs = 0 - runJobWithOptions() + await runJobWithOptions() } else { console.log('Skipping scheduled job as previous one is not yet finished') Healthcheck.nbSkippedJobs++ @@ -182,32 +183,25 @@ export function runJob (job, options = {}) { }) } // Run job + let tasks if (cronJob) { cronJob.start() // Force run on start ? - if (options.run) return runJobWithOptions() + if (options.run) tasks = await runJobWithOptions() } else { - return runJobWithOptions() + tasks = await runJobWithOptions() } + return tasks } export async function run (job, options = {}) { await createApp(job, options) const tasks = await runJob(job, options) - // Check if run has to be considered as successful - // If CRON the healthcheck command should be used instead if (!options.cron) { - let healthcheckError - try { - await healthcheck(options) - } catch (error) { - healthcheckError = error - } // When not running job continuously stop the server as well await server.close() - if (healthcheckError) throw healthcheckError - return tasks } + return tasks } export async function processOptions () { @@ -230,6 +224,7 @@ export async function processOptions () { .option('-s, --sync [uri]', 'Activate sync module with given connection URI') .option('-sr, --success-rate [rate]', 'Change the success rate for fault-tolerant jobs to be considered as successful (defaults to 1)', process.env.SUCCESS_RATE ? Number(process.env.SUCCESS_RATE) : 1) .option('-md, --max-duration [duration]', 'Change the maximum run duration in seconds for fault-tolerant jobs to be considered as failed (defaults to unset)', process.env.MAX_DURATION ? Number(process.env.MAX_DURATION) : -1) + .option('-nsj, --nb-skipped-jobs [nb]', 'Change the number of skipped runs for fault-tolerant jobs to be considered as failed (defaults to 3)', process.env.NB_SKIPPED_JOBS ? Number(process.env.NB_SKIPPED_JOBS) : 3) .option('-sw, --slack-webhook [url]', 'Slack webhook URL to post messages on failure', process.env.SLACK_WEBHOOK_URL) .option('-mt, --message-template [template]', 'Message template used on failure', process.env.MESSAGE_TEMPLATE || 'Job <%= jobId %>: <%= error.message %>') .option('-lt, --link-template [template]', 'Link template used on failure', process.env.LINK_TEMPLATE || '') diff --git a/lib/healthcheck.js b/lib/healthcheck.js index e33283aa..01554615 100644 --- a/lib/healthcheck.js +++ b/lib/healthcheck.js @@ -61,7 +61,6 @@ async function publishToSlack (slackWebhook, data, compilers, posttext = '', col const message = compilers.message(data) const link = compilers.link(data) const text = link ? `<${link}|${message}${posttext}>` : `${message}${posttext}` - await utils.promisify(request.post)({ url: slackWebhook, body: JSON.stringify({ diff --git a/test/cli.test.js b/test/cli.test.js index 246f821d..09ad3916 100644 --- a/test/cli.test.js +++ b/test/cli.test.js @@ -39,7 +39,7 @@ describe('krawler:cli', () => { it('runs successfully once using CLI', async () => { try { - const tasks = await cli(jobfile, { port: 3030, messageTemplate: 'Job <%= jobId %>: <%= error.message %>', debug: true }) + const tasks = await cli(jobfile, { port: 3030, messageTemplate: process.env.MESSAGE_TEMPLATE, debug: true, slackWebhook: process.env.SLACK_WEBHOOK_URL }) // All other features should have been tested independently // so we just test here the CLI run correctly expect(tasks.length).to.equal(1) @@ -55,7 +55,7 @@ describe('krawler:cli', () => { it('runs unsuccessfully once using CLI', async () => { try { - await cli(jobfile, { port: 3030, maxDuration: 0, messageTemplate: 'Job <%= jobId %>: <%= error.message %>', debug: true }) + await cli(jobfile, { port: 3030, maxDuration: 0, messageTemplate: process.env.MESSAGE_TEMPLATE, debug: true, slackWebhook: process.env.SLACK_WEBHOOK_URL }) assert.fail('Healthcheck should fail') } catch (error) { // Check intermediate products have been erased and final product are here @@ -70,7 +70,7 @@ describe('krawler:cli', () => { try { // Clean previous test output fs.removeSync(path.join(outputPath, 'RJTT-30-18000-2-1.tif.csv')) - appServer = await cli(jobfile, { mode: 'setup', api: true, apiPrefix: '/api', port: 3030 }) + appServer = await cli(jobfile, { mode: 'setup', api: true, apiPrefix: '/api', port: 3030, messageTemplate: process.env.MESSAGE_TEMPLATE, debug: true, slackWebhook: process.env.SLACK_WEBHOOK_URL }) // Submit a job to be run const response = await utils.promisify(request.post)({ url: 'http://localhost:3030/api/jobs', @@ -108,7 +108,11 @@ describe('krawler:cli', () => { cli(jobfile, { mode: 'setup', sync: 'mongodb://127.0.0.1:27017/krawler-test', - cron: '*/10 * * * * *' + port: 3030, + cron: '*/10 * * * * *', + messageTemplate: process.env.MESSAGE_TEMPLATE, + debug: true, + slackWebhook: process.env.SLACK_WEBHOOK_URL }) .then(server => { appServer = server @@ -124,7 +128,7 @@ describe('krawler:cli', () => { runCount++ // First run is fine, second one raises an error if (runCount === 1) return hook - else throw new Error('Error') + else throw new Error('Test Error') } } }) @@ -134,7 +138,7 @@ describe('krawler:cli', () => { if ((event.name === 'task-done') || (event.name === 'job-done')) eventCount++ }) // Only run as we already setup the app - cli(jobfile, { mode: 'runJob', cron: '*/10 * * * * *', run: true }) + cli(jobfile, { mode: 'runJob', port: 3030, cron: '*/10 * * * * *', run: true, messageTemplate: process.env.MESSAGE_TEMPLATE, debug: true, slackWebhook: process.env.SLACK_WEBHOOK_URL }) .then(async () => { // As it runs every 10 seconds wait until it should have ran at least once again const seconds = Math.floor(moment().seconds()) @@ -161,7 +165,7 @@ describe('krawler:cli', () => { expect(healthcheck.successRate).beUndefined() expect(healthcheck.error).toExist() expect(healthcheck.error.message).toExist() - expect(healthcheck.error.message).to.equal('Error') + expect(healthcheck.error.message).to.equal('Test Error') expect(eventCount).to.equal(4) // 4 events collection = client.db.collection('krawler-events') const taskEvents = await collection.find({ event: 'task-done' }).toArray()