Skip to content

Commit

Permalink
feat: schedule next scan after workers finish (#128)
Browse files Browse the repository at this point in the history
  • Loading branch information
csmig authored Jun 13, 2024
1 parent 24efe12 commit 431d2bd
Showing 1 changed file with 48 additions and 26 deletions.
74 changes: 48 additions & 26 deletions lib/scan.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { logger } from './logger.js'
import { options } from './args.js'
import { queue as parseQueue} from './parse.js'
import { cargoQueue } from './cargo.js'
import { serializeError } from 'serialize-error'
import fg from 'fast-glob'
import lineByLine from 'n-readlines'
Expand All @@ -10,12 +11,50 @@ import Alarm from './alarm.js'
const component = 'scan'
const historySet = new Set() // in memory history set
let isWriteScheduled = false // flag to indicate if there is pending files to write to the history file
let timeoutId // id of the active setTimeout
let isParseQueueActive = false // flag to indicate if parseQueue has pending work
let isCargoQueueActive = false // flag to indicate if cargoQueue has pending work

/**
* Utility function that calls initHistory() and startScanner()
* Schedules the next scan if we're not in oneShot operation and the queues are idle
*/
function tryScheduleNextScan() {
if (!isParseQueueActive && !isCargoQueueActive && !options.oneShot) {
scheduleNextScan()
}
}

/**
* - Attaches handlers for task_queued and drain events on the queues
* - Sets the flags that track if the queues have pending work
* - on drain events, try to schedule the next scan
*/
function initQueueEvents() {
parseQueue.on('task_queued', () => {
logger.verbose({ component, message: `handling parseQueue event`, event: 'task_queued' })
isParseQueueActive = true
})
parseQueue.on('drain', () => {
logger.verbose({ component, message: `handling parseQueue event`, event: 'drained' })
isParseQueueActive = false
tryScheduleNextScan()
})
cargoQueue.on('task_queued', () => {
logger.verbose({ component, message: `handling cargoQueue event`, event: 'task_queued' })
isCargoQueueActive = true
})
cargoQueue.on('drain', () => {
logger.verbose({ component, message: `handling cargoQueue event`, event: 'drained' })
isCargoQueueActive = false
tryScheduleNextScan()
})
}

/**
* Utility function that calls initQueueEvents(), initHistory() and startScanner()
* Attaches handlers for alarmRaised and alarmLowered
*/
function initScanner() {
initQueueEvents()
initHistory()
startScanner()
Alarm.on('alarmRaised', onAlarmRaised)
Expand Down Expand Up @@ -53,18 +92,12 @@ async function startScanner() {
//Remove stale files: those in historySet but not found in the current scan
removeStaleFiles(discoveredFiles)
logger.info({ component, message: `scan ended`, path: options.path })
// allow queue event handlers to execute before calling tryScheduleNextScan()
setTimeout(tryScheduleNextScan, 0)
}
catch (e) {
logger.error({ component, error: serializeError(e) })
}
finally {
if (!options.oneShot) {
scheduleNextScan()
}
else {
logger.info({ component, message: `one-shot scan completed`, path: options.path })
}
}
}

/**
Expand All @@ -83,7 +116,7 @@ function removeStaleFiles(currentFilesSet){
* References options properties {path, scanInterval}.
*/
function scheduleNextScan() {
timeoutId = setTimeout(() => {
setTimeout(() => {
startScanner().catch(e => {
logger.error({ component, error: serializeError(e) })
})
Expand All @@ -97,19 +130,6 @@ function scheduleNextScan() {
})
}

/**
* Cancels the next scan and logs.
* References options properties {path}.
*/
function cancelNextScan() {
clearTimeout(timeoutId)
logger.info({
component,
message: `scan cancelled`,
path: options.path
})
}

/**
* Returns immediately if options.historyFile is falsy.
* Initializes the history Set by reading it from a file and adding each line to the history set.
Expand Down Expand Up @@ -330,7 +350,8 @@ function onAlarmRaised(alarmType) {
message: `handling raised alarm`,
alarmType
})
cancelNextScan()
parseQueue.pause()
cargoQueue.pause()
}

/**
Expand All @@ -344,7 +365,8 @@ function onAlarmLowered(alarmType) {
message: `handling lowered alarm`,
alarmType
})
startScanner()
parseQueue.resume()
cargoQueue.resume()
}


Expand Down

0 comments on commit 431d2bd

Please sign in to comment.