From 431d2bdfd61ab999ae0e09f6eb6fc265da4fa504 Mon Sep 17 00:00:00 2001 From: csmig <33138761+csmig@users.noreply.github.com> Date: Thu, 13 Jun 2024 00:54:38 +0000 Subject: [PATCH] feat: schedule next scan after workers finish (#128) --- lib/scan.js | 74 ++++++++++++++++++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 26 deletions(-) diff --git a/lib/scan.js b/lib/scan.js index 0b96eea..35dc817 100644 --- a/lib/scan.js +++ b/lib/scan.js @@ -1,6 +1,7 @@ import { logger } from './logger.js' import { options } from './args.js' import { queue as parseQueue} from './parse.js' +import { cargoQueue } from './cargo.js' import { serializeError } from 'serialize-error' import fg from 'fast-glob' import lineByLine from 'n-readlines' @@ -10,12 +11,50 @@ import Alarm from './alarm.js' const component = 'scan' const historySet = new Set() // in memory history set let isWriteScheduled = false // flag to indicate if there is pending files to write to the history file -let timeoutId // id of the active setTimeout +let isParseQueueActive = false // flag to indicate if parseQueue has pending work +let isCargoQueueActive = false // flag to indicate if cargoQueue has pending work /** - * Utility function that calls initHistory() and startScanner() + * Schedules the next scan if we're not in oneShot operation and the queues are idle + */ +function tryScheduleNextScan() { + if (!isParseQueueActive && !isCargoQueueActive && !options.oneShot) { + scheduleNextScan() + } +} + +/** + * - Attaches handlers for task_queued and drain events on the queues + * - Sets the flags that track if the queues have pending work + * - on drain events, try to schedule the next scan + */ +function initQueueEvents() { + parseQueue.on('task_queued', () => { + logger.verbose({ component, message: `handling parseQueue event`, event: 'task_queued' }) + isParseQueueActive = true + }) + parseQueue.on('drain', () => { + logger.verbose({ component, message: `handling parseQueue event`, event: 'drained' }) + isParseQueueActive = false + tryScheduleNextScan() + }) + cargoQueue.on('task_queued', () => { + logger.verbose({ component, message: `handling cargoQueue event`, event: 'task_queued' }) + isCargoQueueActive = true + }) + cargoQueue.on('drain', () => { + logger.verbose({ component, message: `handling cargoQueue event`, event: 'drained' }) + isCargoQueueActive = false + tryScheduleNextScan() + }) +} + +/** + * Utility function that calls initQueueEvents(), initHistory() and startScanner() + * Attaches handlers for alarmRaised and alarmLowered */ function initScanner() { + initQueueEvents() initHistory() startScanner() Alarm.on('alarmRaised', onAlarmRaised) @@ -53,18 +92,12 @@ async function startScanner() { //Remove stale files: those in historySet but not found in the current scan removeStaleFiles(discoveredFiles) logger.info({ component, message: `scan ended`, path: options.path }) + // allow queue event handlers to execute before calling tryScheduleNextScan() + setTimeout(tryScheduleNextScan, 0) } catch (e) { logger.error({ component, error: serializeError(e) }) } - finally { - if (!options.oneShot) { - scheduleNextScan() - } - else { - logger.info({ component, message: `one-shot scan completed`, path: options.path }) - } - } } /** @@ -83,7 +116,7 @@ function removeStaleFiles(currentFilesSet){ * References options properties {path, scanInterval}. */ function scheduleNextScan() { - timeoutId = setTimeout(() => { + setTimeout(() => { startScanner().catch(e => { logger.error({ component, error: serializeError(e) }) }) @@ -97,19 +130,6 @@ function scheduleNextScan() { }) } -/** - * Cancels the next scan and logs. - * References options properties {path}. - */ -function cancelNextScan() { - clearTimeout(timeoutId) - logger.info({ - component, - message: `scan cancelled`, - path: options.path - }) -} - /** * Returns immediately if options.historyFile is falsy. * Initializes the history Set by reading it from a file and adding each line to the history set. @@ -330,7 +350,8 @@ function onAlarmRaised(alarmType) { message: `handling raised alarm`, alarmType }) - cancelNextScan() + parseQueue.pause() + cargoQueue.pause() } /** @@ -344,7 +365,8 @@ function onAlarmLowered(alarmType) { message: `handling lowered alarm`, alarmType }) - startScanner() + parseQueue.resume() + cargoQueue.resume() }