Skip to content

Commit

Permalink
fix: Healthcheck endpoint does not raise errors like the healthcheck …
Browse files Browse the repository at this point in the history
…command (closes #279)
  • Loading branch information
claustres committed Jul 5, 2024
1 parent 936a6fd commit bec2b88
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 48 deletions.
75 changes: 35 additions & 40 deletions lib/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,50 +126,51 @@ export function getServer () {
return server
}

export function runJob (job, options = {}) {
export async function runJob (job, options = {}) {
// Function to effectively run the job
function runJobWithOptions () {
async function runJobWithOptions () {
console.log(`Launching job ${job.id} at ${(new Date()).toISOString()}, please wait...`)
console.time('Running time')
Healthcheck.jobId = job.id
Healthcheck.isRunning = true
const hrstart = process.hrtime()
return app.service('jobs').create(job)
.then(tasks => {
console.log('Job terminated, ' + tasks.length + ' tasks ran')
const hrend = process.hrtime(hrstart)
console.timeEnd('Running time')
Healthcheck.isRunning = false
Healthcheck.duration = hrend[0] + (hrend[1] / 1e9)
Healthcheck.error = null
// Compute the error ratio for fault-tolerant jobs
Healthcheck.nbFailedTasks = 0
Healthcheck.nbSuccessfulTasks = 0
tasks.forEach(task => {
if (task.error) Healthcheck.nbFailedTasks++
else Healthcheck.nbSuccessfulTasks++
})
const nbTotalTasks = (Healthcheck.nbSuccessfulTasks + Healthcheck.nbFailedTasks)
// Job with 0 tasks is always succesful
Healthcheck.successRate = (nbTotalTasks > 0 ? Healthcheck.nbSuccessfulTasks / nbTotalTasks : 1)
return Promise.resolve(tasks)
})
.catch(error => {
console.error(error.message)
Healthcheck.isRunning = false
Healthcheck.error = error
try {
const tasks = await app.service('jobs').create(job)
console.log('Job terminated, ' + tasks.length + ' tasks ran')
const hrend = process.hrtime(hrstart)
console.timeEnd('Running time')
Healthcheck.isRunning = false
Healthcheck.duration = hrend[0] + (hrend[1] / 1e9)
Healthcheck.error = null
// Compute the error ratio for fault-tolerant jobs
Healthcheck.nbFailedTasks = 0
Healthcheck.nbSuccessfulTasks = 0
tasks.forEach(task => {
if (task.error) Healthcheck.nbFailedTasks++
else Healthcheck.nbSuccessfulTasks++
})
const nbTotalTasks = (Healthcheck.nbSuccessfulTasks + Healthcheck.nbFailedTasks)
// Job with 0 tasks is always succesful
Healthcheck.successRate = (nbTotalTasks > 0 ? Healthcheck.nbSuccessfulTasks / nbTotalTasks : 1)
// Check if run has to be considered as successful
await healthcheck(options)
return Promise.resolve(tasks)
} catch (error) {
console.error(error.message)
Healthcheck.isRunning = false
Healthcheck.error = error
}
}

// Setup CRON job if required
let cronJob
if (options.cron) {
console.log('Scheduling job with cron pattern ' + options.cron)
cronJob = new CronJob(options.cron, () => {
cronJob = new CronJob(options.cron, async () => {
// If last job has not yet finished skip this call as we are late
if (!Healthcheck.isRunning) {
Healthcheck.nbSkippedJobs = 0
runJobWithOptions()
await runJobWithOptions()
} else {
console.log('Skipping scheduled job as previous one is not yet finished')
Healthcheck.nbSkippedJobs++
Expand All @@ -182,32 +183,25 @@ export function runJob (job, options = {}) {
})
}
// Run job
let tasks
if (cronJob) {
cronJob.start()
// Force run on start ?
if (options.run) return runJobWithOptions()
if (options.run) tasks = await runJobWithOptions()
} else {
return runJobWithOptions()
tasks = await runJobWithOptions()
}
return tasks
}

export async function run (job, options = {}) {
await createApp(job, options)
const tasks = await runJob(job, options)
// Check if run has to be considered as successful
// If CRON the healthcheck command should be used instead
if (!options.cron) {
let healthcheckError
try {
await healthcheck(options)
} catch (error) {
healthcheckError = error
}
// When not running job continuously stop the server as well
await server.close()
if (healthcheckError) throw healthcheckError
return tasks
}
return tasks
}

export async function processOptions () {
Expand All @@ -230,6 +224,7 @@ export async function processOptions () {
.option('-s, --sync [uri]', 'Activate sync module with given connection URI')
.option('-sr, --success-rate [rate]', 'Change the success rate for fault-tolerant jobs to be considered as successful (defaults to 1)', process.env.SUCCESS_RATE ? Number(process.env.SUCCESS_RATE) : 1)
.option('-md, --max-duration [duration]', 'Change the maximum run duration in seconds for fault-tolerant jobs to be considered as failed (defaults to unset)', process.env.MAX_DURATION ? Number(process.env.MAX_DURATION) : -1)
.option('-nsj, --nb-skipped-jobs [nb]', 'Change the number of skipped runs for fault-tolerant jobs to be considered as failed (defaults to 3)', process.env.NB_SKIPPED_JOBS ? Number(process.env.NB_SKIPPED_JOBS) : 3)
.option('-sw, --slack-webhook [url]', 'Slack webhook URL to post messages on failure', process.env.SLACK_WEBHOOK_URL)
.option('-mt, --message-template [template]', 'Message template used on failure', process.env.MESSAGE_TEMPLATE || 'Job <%= jobId %>: <%= error.message %>')
.option('-lt, --link-template [template]', 'Link template used on failure', process.env.LINK_TEMPLATE || '')
Expand Down
1 change: 0 additions & 1 deletion lib/healthcheck.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ async function publishToSlack (slackWebhook, data, compilers, posttext = '', col
const message = compilers.message(data)
const link = compilers.link(data)
const text = link ? `<${link}|${message}${posttext}>` : `${message}${posttext}`

await utils.promisify(request.post)({
url: slackWebhook,
body: JSON.stringify({
Expand Down
18 changes: 11 additions & 7 deletions test/cli.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ describe('krawler:cli', () => {

it('runs successfully once using CLI', async () => {
try {
const tasks = await cli(jobfile, { port: 3030, messageTemplate: 'Job <%= jobId %>: <%= error.message %>', debug: true })
const tasks = await cli(jobfile, { port: 3030, messageTemplate: process.env.MESSAGE_TEMPLATE, debug: true, slackWebhook: process.env.SLACK_WEBHOOK_URL })
// All other features should have been tested independently
// so we just test here the CLI run correctly
expect(tasks.length).to.equal(1)
Expand All @@ -55,7 +55,7 @@ describe('krawler:cli', () => {

it('runs unsuccessfully once using CLI', async () => {
try {
await cli(jobfile, { port: 3030, maxDuration: 0, messageTemplate: 'Job <%= jobId %>: <%= error.message %>', debug: true })
await cli(jobfile, { port: 3030, maxDuration: 0, messageTemplate: process.env.MESSAGE_TEMPLATE, debug: true, slackWebhook: process.env.SLACK_WEBHOOK_URL })
assert.fail('Healthcheck should fail')
} catch (error) {
// Check intermediate products have been erased and final product are here
Expand All @@ -70,7 +70,7 @@ describe('krawler:cli', () => {
try {
// Clean previous test output
fs.removeSync(path.join(outputPath, 'RJTT-30-18000-2-1.tif.csv'))
appServer = await cli(jobfile, { mode: 'setup', api: true, apiPrefix: '/api', port: 3030 })
appServer = await cli(jobfile, { mode: 'setup', api: true, apiPrefix: '/api', port: 3030, messageTemplate: process.env.MESSAGE_TEMPLATE, debug: true, slackWebhook: process.env.SLACK_WEBHOOK_URL })
// Submit a job to be run
const response = await utils.promisify(request.post)({
url: 'http://localhost:3030/api/jobs',
Expand Down Expand Up @@ -108,7 +108,11 @@ describe('krawler:cli', () => {
cli(jobfile, {
mode: 'setup',
sync: 'mongodb://127.0.0.1:27017/krawler-test',
cron: '*/10 * * * * *'
port: 3030,
cron: '*/10 * * * * *',
messageTemplate: process.env.MESSAGE_TEMPLATE,
debug: true,
slackWebhook: process.env.SLACK_WEBHOOK_URL
})
.then(server => {
appServer = server
Expand All @@ -124,7 +128,7 @@ describe('krawler:cli', () => {
runCount++
// First run is fine, second one raises an error
if (runCount === 1) return hook
else throw new Error('Error')
else throw new Error('Test Error')
}
}
})
Expand All @@ -134,7 +138,7 @@ describe('krawler:cli', () => {
if ((event.name === 'task-done') || (event.name === 'job-done')) eventCount++
})
// Only run as we already setup the app
cli(jobfile, { mode: 'runJob', cron: '*/10 * * * * *', run: true })
cli(jobfile, { mode: 'runJob', port: 3030, cron: '*/10 * * * * *', run: true, messageTemplate: process.env.MESSAGE_TEMPLATE, debug: true, slackWebhook: process.env.SLACK_WEBHOOK_URL })
.then(async () => {
// As it runs every 10 seconds wait until it should have ran at least once again
const seconds = Math.floor(moment().seconds())
Expand All @@ -161,7 +165,7 @@ describe('krawler:cli', () => {
expect(healthcheck.successRate).beUndefined()
expect(healthcheck.error).toExist()
expect(healthcheck.error.message).toExist()
expect(healthcheck.error.message).to.equal('Error')
expect(healthcheck.error.message).to.equal('Test Error')
expect(eventCount).to.equal(4) // 4 events
collection = client.db.collection('krawler-events')
const taskEvents = await collection.find({ event: 'task-done' }).toArray()
Expand Down

0 comments on commit bec2b88

Please sign in to comment.