From fd395246491b07b720eccd82438ae2c3b0615e7a Mon Sep 17 00:00:00 2001 From: Luc Claustres Date: Wed, 25 Sep 2024 18:17:43 +0200 Subject: [PATCH] fix: Healthcheck endpoint does not raise error when skipping too much jobs (closes #280) --- healthcheck.js | 2 + lib/cli.js | 21 +++-- lib/healthcheck.js | 36 +++++---- test/cli.test.js | 178 +++++++++++++++++++---------------------- test/jobs.cron.test.js | 100 +++++++++++++++++++++++ 5 files changed, 221 insertions(+), 116 deletions(-) create mode 100644 test/jobs.cron.test.js diff --git a/healthcheck.js b/healthcheck.js index 20ee727a..54fcb059 100644 --- a/healthcheck.js +++ b/healthcheck.js @@ -20,5 +20,7 @@ try { await healthcheck(program) process.exit(0) } catch (error) { + // Healthcheck is already writing to stderr + // console.error(error) process.exit(1) } diff --git a/lib/cli.js b/lib/cli.js index c445d51b..f35b4219 100644 --- a/lib/cli.js +++ b/lib/cli.js @@ -80,7 +80,7 @@ export async function createApp (job, options = {}) { // Add a healthcheck for cron jobs app.get(apiPrefix + '/healthcheck', (req, res, next) => { if (Healthcheck.error) { - res.status(500).json(_.pick(Healthcheck, ['jobId', 'error.code', 'error.message'])) + res.status(500).json(_.pick(Healthcheck, ['jobId', 'nbSkippedJobs', 'isRunning', 'error.code', 'error.message'])) } else { res.status(200).json(_.omit(Healthcheck, ['error'])) } @@ -152,9 +152,7 @@ export async function runJob (job, options = {}) { const nbTotalTasks = (Healthcheck.nbSuccessfulTasks + Healthcheck.nbFailedTasks) // Job with 0 tasks is always succesful Healthcheck.successRate = (nbTotalTasks > 0 ? Healthcheck.nbSuccessfulTasks / nbTotalTasks : 1) - // Check if run has to be considered as successful - await healthcheck(options) - return Promise.resolve(tasks) + return tasks } catch (error) { console.error(error.message) Healthcheck.isRunning = false @@ -166,15 +164,24 @@ export async function runJob (job, options = {}) { let cronJob if (options.cron) { console.log('Scheduling job with cron pattern ' + options.cron) + Healthcheck.nbSkippedJobs = 0 cronJob = new CronJob(options.cron, async () => { // If last job has not yet finished skip this call as we are late if (!Healthcheck.isRunning) { - Healthcheck.nbSkippedJobs = 0 await runJobWithOptions() + Healthcheck.nbSkippedJobs = 0 } else { console.log('Skipping scheduled job as previous one is not yet finished') Healthcheck.nbSkippedJobs++ } + try { + // Check if run has to be considered as successful + await healthcheck(options) + } catch (error) { + // Healthcheck is already writing to stderr + //console.error(error) + Healthcheck.error = error + } }) // In case the server is forced to exit stop the job as well server.on('close', () => { @@ -195,13 +202,13 @@ export async function runJob (job, options = {}) { } export async function run (job, options = {}) { - await createApp(job, options) + const server = await createApp(job, options) const tasks = await runJob(job, options) if (!options.cron) { // When not running job continuously stop the server as well await server.close() } - return tasks + return (tasks ? tasks : server) } export async function processOptions () { diff --git a/lib/healthcheck.js b/lib/healthcheck.js index 01554615..347625b0 100644 --- a/lib/healthcheck.js +++ b/lib/healthcheck.js @@ -17,13 +17,19 @@ export const Healthcheck = { nbSkippedJobs: 0, // Number of times the scheduled job has been skipped due to on-going one error: null // Indicating error if job has erroned for cron jobs /* Undefined by default + nbSkippedJobs: 0, // Number of time last job was not yet finished when cron ran nbFailedTasks: 0, // Number of failed/success tasks of last run for fault-tolerant jobs nbSuccessfulTasks: 0, successRate: 0 // Ratio of previous values */ } -export class HealthcheckError extends Error {} +export class HealthcheckError extends Error { + constructor(data) { + super(data.message) + this.code = data.code + } +} function readFromLog () { try { @@ -45,7 +51,7 @@ function writeToLog (data) { } } -function publishToConsole (data, compilers, pretext, stream = 'error') { +export function publishToConsole (data, compilers, pretext, stream = 'error') { try { if (stream === 'error') console.error(pretext, compilers.message(data)) else console.log(pretext, compilers.message(data)) @@ -55,7 +61,7 @@ function publishToConsole (data, compilers, pretext, stream = 'error') { } } -async function publishToSlack (slackWebhook, data, compilers, posttext = '', color = 'danger') { +export async function publishToSlack (slackWebhook, data, compilers, posttext = '', color = 'danger') { if (!slackWebhook) return try { const message = compilers.message(data) @@ -102,20 +108,17 @@ export async function healthcheck (options) { const response = await utils.promisify(request.get)(endpoint) const data = JSON.parse(response.body) if (options.debug) { + console.log('Current healthcheck status from service', response.statusCode) console.log('Current healthcheck output read from service', data) console.log('Previous healthcheck output read from log', previousHealthcheck) } - if (response.statusCode === 200) { - // Fault-tolerant jobs always return 200, we use more criteria to check for health status - if (_.has(data, 'successRate') && (data.successRate < options.successRate)) { - data.error = { code: 'HEALTHCHECK_SUCCESS_RATE', message: `Insufficient success rate (${data.successRate.toFixed(2)})` } - } - if (data.nbSkippedJobs >= options.nbSkippedJobs) { - data.error = { code: 'HEALTHCHECK_SKIPPED_JOBS', message: `Too much skipped jobs (${data.nbSkippedJobs})` } - } - if ((options.maxDuration > 0) && (data.duration > options.maxDuration)) { - data.error = { code: 'HEALTHCHECK_DURATION', message: `Too much slow execution (${data.duration}s)` } - } + // Fault-tolerant jobs always return 200, we use more criteria to check for health status + if (_.has(data, 'successRate') && (data.successRate < options.successRate)) { + data.error = { code: 'HEALTHCHECK_SUCCESS_RATE', message: `Insufficient success rate (${data.successRate.toFixed(2)})` } + } else if (data.nbSkippedJobs >= options.nbSkippedJobs) { + data.error = { code: 'HEALTHCHECK_SKIPPED_JOBS', message: `Too much skipped jobs (${data.nbSkippedJobs})` } + } else if ((options.maxDuration > 0) && (data.duration > options.maxDuration)) { + data.error = { code: 'HEALTHCHECK_DURATION', message: `Too much slow execution (${data.duration}s)` } } writeToLog(data) // Add env available for templates @@ -126,7 +129,7 @@ export async function healthcheck (options) { publishToConsole(data, compilers, '[ALERT]', 'error') await publishToSlack(options.slackWebhook, data, compilers, '', 'danger') } - throw new HealthcheckError(data.error.message) + throw new HealthcheckError(data.error) } else { // Only notify on closing errors if (previousError) { @@ -136,6 +139,9 @@ export async function healthcheck (options) { } } } catch (error) { + if (options.debug) { + console.log('Current healthcheck raised error', error) + } // Give feedback for any error raised by the healthcheck process if (!(error instanceof HealthcheckError)) { // Set jobId variable/error available in context so that templates will not fail diff --git a/test/cli.test.js b/test/cli.test.js index 09ad3916..d1f7c066 100644 --- a/test/cli.test.js +++ b/test/cli.test.js @@ -47,6 +47,7 @@ describe('krawler:cli', () => { expect(fs.existsSync(path.join(outputPath, 'RJTT-30-18000-2-1.tif'))).beFalse() expect(fs.existsSync(path.join(outputPath, 'RJTT-30-18000-2-1.tif.csv'))).beTrue() } catch (error) { + console.log(error) assert.fail('Healthcheck should not fail') } }) @@ -101,110 +102,99 @@ describe('krawler:cli', () => { // Let enough time to process .timeout(15000) - it('runs as CRON using CLI with continuous healthcheck', (done) => { + it('runs as CRON using CLI with continuous healthcheck', async () => { // Clean previous test output fs.removeSync(path.join(outputPath, 'RJTT-30-18000-2-1.tif.csv')) // Setup the app - cli(jobfile, { + appServer = await cli(jobfile, { mode: 'setup', sync: 'mongodb://127.0.0.1:27017/krawler-test', port: 3030, cron: '*/10 * * * * *', messageTemplate: process.env.MESSAGE_TEMPLATE, - debug: true, + debug: false, slackWebhook: process.env.SLACK_WEBHOOK_URL }) - .then(server => { - appServer = server - // Clean any previous healthcheck log - fs.removeSync(path.join(__dirname, '..', 'healthcheck.log')) - const app = getApp() - // Add hook to know how many times the job will run - const jobService = app.service('jobs') - let runCount = 0 - jobService.hooks({ - after: { - create: (hook) => { - runCount++ - // First run is fine, second one raises an error - if (runCount === 1) return hook - else throw new Error('Test Error') - } - } - }) - // Check for event emission - let eventCount = 0 - app.on('krawler', event => { - if ((event.name === 'task-done') || (event.name === 'job-done')) eventCount++ - }) - // Only run as we already setup the app - cli(jobfile, { mode: 'runJob', port: 3030, cron: '*/10 * * * * *', run: true, messageTemplate: process.env.MESSAGE_TEMPLATE, debug: true, slackWebhook: process.env.SLACK_WEBHOOK_URL }) - .then(async () => { - // As it runs every 10 seconds wait until it should have ran at least once again - const seconds = Math.floor(moment().seconds()) - const remainingSecondsForNextRun = 11 - seconds % 10 - setTimeout(async () => { - try { - expect(runCount).to.equal(2) // 2 runs - const response = await utils.promisify(request.get)('http://localhost:3030/healthcheck') - console.log(response.body) - expect(response.statusCode).to.equal(500) - const { error, stdout, stderr } = await runCommand('node ' + path.join(__dirname, '..', 'healthcheck.js')) - expect(error).toExist() - expect(stdout).to.equal('') - expect(stderr.includes('[ALERT]')).beTrue() - const healthcheckLog = fs.readJsonSync(path.join(__dirname, '..', 'healthcheck.log')) - const healthcheck = JSON.parse(response.body) - console.log(healthcheck) - expect(healthcheck).to.deep.equal(healthcheckLog) - expect(healthcheck.isRunning).beUndefined() - expect(healthcheck.duration).beUndefined() - expect(healthcheck.nbSkippedJobs).beUndefined() - expect(healthcheck.nbFailedTasks).beUndefined() - expect(healthcheck.nbSuccessfulTasks).beUndefined() - expect(healthcheck.successRate).beUndefined() - expect(healthcheck.error).toExist() - expect(healthcheck.error.message).toExist() - expect(healthcheck.error.message).to.equal('Test Error') - expect(eventCount).to.equal(4) // 4 events - collection = client.db.collection('krawler-events') - const taskEvents = await collection.find({ event: 'task-done' }).toArray() - expect(taskEvents.length).to.equal(2) - const jobEvents = await collection.find({ event: 'job-done' }).toArray() - expect(jobEvents.length).to.equal(2) - server.close() - appServer = null - done() - } catch (error) { - console.log(error) - done(error) - } - }, remainingSecondsForNextRun * 1000) - expect(runCount).to.equal(1) // First run - const response = await utils.promisify(request.get)('http://localhost:3030/healthcheck') - expect(response.statusCode).to.equal(200) - const { error, stdout, stderr } = await runCommand('node ' + path.join(__dirname, '..', 'healthcheck.js')) - expect(error).to.equal(null) - expect(stdout).to.equal('') - expect(stderr).to.equal('') - const healthcheckLog = fs.readJsonSync(path.join(__dirname, '..', 'healthcheck.log')) - const healthcheck = JSON.parse(response.body) - expect(healthcheck).to.deep.equal(healthcheckLog) - expect(healthcheck.isRunning).toExist() - expect(healthcheck.nbSkippedJobs).toExist() - expect(healthcheck.error).beUndefined() - expect(healthcheck.nbFailedTasks).to.equal(0) - expect(healthcheck.nbSuccessfulTasks).to.equal(1) - expect(healthcheck.successRate).to.equal(1) - expect(healthcheck.state).toExist() - expect(eventCount).to.equal(2) // 2 events - collection = client.db.collection('krawler-events') - const taskEvents = await collection.find({ event: 'task-done' }).toArray() - expect(taskEvents.length).to.equal(1) - const jobEvents = await collection.find({ event: 'job-done' }).toArray() - expect(jobEvents.length).to.equal(1) - }) - }) + // Clean any previous healthcheck log + fs.removeSync(path.join(__dirname, '..', 'healthcheck.log')) + const app = getApp() + // Add hook to know how many times the job will run + const jobService = app.service('jobs') + let runCount = 0 + jobService.hooks({ + after: { + create: (hook) => { + runCount++ + // First run is fine, second one raises an error + if (runCount === 1) return hook + else throw new Error('Test Error') + } + } + }) + // Check for event emission + let eventCount = 0 + app.on('krawler', event => { + if ((event.name === 'task-done') || (event.name === 'job-done')) eventCount++ + }) + // Only run as we already setup the app + await cli(jobfile, { mode: 'runJob', port: 3030, cron: '*/10 * * * * *', run: true, messageTemplate: process.env.MESSAGE_TEMPLATE, debug: false, slackWebhook: process.env.SLACK_WEBHOOK_URL }) + expect(runCount).to.equal(1) // First run + const response = await utils.promisify(request.get)('http://localhost:3030/healthcheck') + // console.log(response.body) + expect(response.statusCode).to.equal(200) + const healthcheck = JSON.parse(response.body) + // console.log(healthcheck) + const { error } = await runCommand('node ' + path.join(__dirname, '..', 'healthcheck.js')) + expect(error).beNull() + const healthcheckLog = fs.readJsonSync(path.join(__dirname, '..', 'healthcheck.log')) + expect(healthcheck).to.deep.equal(healthcheckLog) + expect(healthcheck.isRunning).to.equal(false) + expect(healthcheck.nbSkippedJobs).to.equal(0) + expect(healthcheck.error).beUndefined() + expect(healthcheck.nbFailedTasks).to.equal(0) + expect(healthcheck.nbSuccessfulTasks).to.equal(1) + expect(healthcheck.successRate).to.equal(1) + expect(healthcheck.state).toExist() + expect(eventCount).to.equal(2) // 2 events + collection = client.db.collection('krawler-events') + const taskEvents = await collection.find({ event: 'task-done' }).toArray() + expect(taskEvents.length).to.equal(1) + const jobEvents = await collection.find({ event: 'job-done' }).toArray() + expect(jobEvents.length).to.equal(1) + // As it runs every 10 seconds wait until it should have ran at least once again + const seconds = Math.floor(moment().seconds()) + const remainingSecondsForNextRun = 10 - seconds % 10 + await utils.promisify(setTimeout)((1 + remainingSecondsForNextRun) * 1000) + try { + expect(runCount).to.be.at.least(2) // 2 runs + const response = await utils.promisify(request.get)('http://localhost:3030/healthcheck') + // console.log(response.body) + expect(response.statusCode).to.equal(500) + const healthcheck = JSON.parse(response.body) + // console.log(healthcheck) + const { error } = await runCommand('node ' + path.join(__dirname, '..', 'healthcheck.js')) + expect(error).toExist() + const healthcheckLog = fs.readJsonSync(path.join(__dirname, '..', 'healthcheck.log')) + expect(healthcheck).to.deep.equal(healthcheckLog) + expect(healthcheck.isRunning).to.equal(false) + expect(healthcheck.duration).beUndefined() + expect(healthcheck.nbSkippedJobs).to.equal(0) + expect(healthcheck.nbFailedTasks).beUndefined() + expect(healthcheck.nbSuccessfulTasks).beUndefined() + expect(healthcheck.successRate).beUndefined() + expect(healthcheck.error).toExist() + expect(healthcheck.error.message).toExist() + expect(healthcheck.error.message).to.equal('Test Error') + expect(eventCount).to.be.at.least(4) // 4 events + collection = client.db.collection('krawler-events') + const taskEvents = await collection.find({ event: 'task-done' }).toArray() + expect(taskEvents.length).to.be.at.least(2) + const jobEvents = await collection.find({ event: 'job-done' }).toArray() + expect(jobEvents.length).to.be.at.least(2) + } catch (error) { + console.log(error) + throw error + } }) // Let enough time to process .timeout(15000) diff --git a/test/jobs.cron.test.js b/test/jobs.cron.test.js new file mode 100644 index 00000000..e917e84c --- /dev/null +++ b/test/jobs.cron.test.js @@ -0,0 +1,100 @@ +import chai from 'chai' +import chailint from 'chai-lint' +import path, { dirname } from 'path' +import request from 'request' +import utils from 'util' +import moment from 'moment' +import { cli } from '../lib/index.js' +import { fileURLToPath } from 'url' + +const __filename = fileURLToPath(import.meta.url) +const __dirname = dirname(__filename) +const { util, expect } = chai + +describe('krawler:jobs:cron', () => { + let appServer + + before(async () => { + chailint(chai, util) + }) + + it('skipped job as CRON raises error', async () => { + let count = 0 + appServer = await cli({ + id: 'job', + tasks: [{ id: 'task', type: 'http', store: 'job-store', options: { url: 'https://www.google.com' } }], + hooks: { + tasks: { + before: { + apply: { + function: async () => { + if (count === 0) { + await utils.promisify(setTimeout)(10000) + count++ + } + } + } + } + }, + jobs: { + before: { + createStores: { + id: 'job-store', + type: 'fs', + options: { path: path.join(__dirname, 'output') } + } + } + } + } + }, { + port: 3030, + cron: '*/5 * * * * *', + nbSkippedJobs: 1, + messageTemplate: process.env.MESSAGE_TEMPLATE, + debug: false, + slackWebhook: process.env.SLACK_WEBHOOK_URL + }) + // As it runs every 5 seconds wait until it should have ran at least twice + const seconds = Math.floor(moment().seconds()) + const remainingSecondsForNextRun = 5 - seconds % 5 + await utils.promisify(setTimeout)((6 + remainingSecondsForNextRun) * 1000) + // Check for error with healthcheck + { + const response = await utils.promisify(request.get)('http://localhost:3030/healthcheck') + const healthcheck = JSON.parse(response.body) + // console.log(healthcheck) + expect(response.statusCode).to.equal(500) + expect(healthcheck.isRunning).beTrue() + expect(healthcheck.duration).beUndefined() + expect(healthcheck.nbSkippedJobs).to.be.at.least(1) + expect(healthcheck.nbFailedTasks).beUndefined() + expect(healthcheck.nbSuccessfulTasks).beUndefined() + expect(healthcheck.successRate).beUndefined() + expect(healthcheck.error).toExist() + expect(healthcheck.error.message).toExist() + expect(healthcheck.error.message.includes('Too much skipped jobs')).beTrue() + } + await utils.promisify(setTimeout)(5000) + // Now it should have finished + { + const response = await utils.promisify(request.get)('http://localhost:3030/healthcheck') + const healthcheck = JSON.parse(response.body) + // console.log(healthcheck) + expect(response.statusCode).to.equal(200) + expect(healthcheck.isRunning).beFalse() + expect(healthcheck.duration).toExist() + expect(healthcheck.nbSkippedJobs).to.equal(0) + expect(healthcheck.nbFailedTasks).to.equal(0) + expect(healthcheck.nbSuccessfulTasks).to.equal(1) + expect(healthcheck.successRate).to.equal(1) + expect(healthcheck.error).beUndefined() + } + }) + // Let enough time to process + .timeout(15000) + + // Cleanup + after(async () => { + if (appServer) await appServer.close() + }) +})