diff --git a/server/src/app.ts b/server/src/app.ts index 91a075cb..1153e775 100644 --- a/server/src/app.ts +++ b/server/src/app.ts @@ -20,7 +20,7 @@ import { MongooseDatabaseConnection } from '@/infrastructure/database/connection import logger from '@/infrastructure/logger/logger'; import { Master } from '@/infrastructure/worker/master'; import { CacheService } from '@/services/cache/cache-service'; -import { runCron } from '@/services/cron'; +import { AgendaCronService } from '@/services/cron/agenda-cron'; import { MigrationService } from '@/services/migration/migration'; import { SettingsService } from '@/services/settings/settings'; @@ -112,7 +112,7 @@ async function doWorker() { } if (settings.initialSetup && process.env.job) { - await runCron(); + await getFromContainer(AgendaCronService).bootstrap(); } } diff --git a/server/src/infrastructure/container/constants.ts b/server/src/infrastructure/container/constants.ts index 08221a3b..65fa643d 100644 --- a/server/src/infrastructure/container/constants.ts +++ b/server/src/infrastructure/container/constants.ts @@ -1,7 +1,10 @@ +/** + * Enum representing the container tags used for dependency injection. + */ export enum ContainerTags { // HTTP HTTP_API_V1 = 'HTTP_API_V1', HTTP_API_V2 = 'HTTP_API_V2', // CRON - CRON_JOB = 'CRON_JOB', + AGENDA_CRON_JOB = 'AGENDA_CRON_JOB', } diff --git a/server/src/services/cron/agenda-cron.ts b/server/src/services/cron/agenda-cron.ts index 20bd8e31..ea99f8aa 100644 --- a/server/src/services/cron/agenda-cron.ts +++ b/server/src/services/cron/agenda-cron.ts @@ -1,8 +1,16 @@ import { Agenda } from '@hokify/agenda'; - +import Bluebird from 'bluebird'; import { Service } from 'diod'; + +import { ContainerTags } from '@/infrastructure/container/constants'; +import { + findTaggedServiceIdentifiers, + getFromContainer, +} from '@/infrastructure/container/container'; import { Logger } from '@/infrastructure/logger/logger'; -import { CronService } from './service'; +import { Jobber } from '@/services/cron/job'; + +import { CronService } from './cron-service'; import { CronJobProcessor, CronOptions, JobCronName } from './types'; /** @@ -11,7 +19,7 @@ import { CronJobProcessor, CronOptions, JobCronName } from './types'; @Service() export class AgendaCronService extends CronService { constructor( - activeJobs: JobCronName[], + activeJobs: JobCronName[], private agenda: Agenda, logger: Logger, ) { @@ -50,6 +58,21 @@ export class AgendaCronService extends CronService { await this.agenda.every(interval, jobName); } + /** + * Registers the cron jobs by finding the tagged service identifiers and + * calling the `register` method on each job. + * @returns A promise that resolves when all jobs have been registered. + */ + async registerJobs(): Promise { + const jobIdentifiers = findTaggedServiceIdentifiers( + ContainerTags.AGENDA_CRON_JOB, + ); + await Bluebird.mapSeries(jobIdentifiers, async (identifier) => { + const job = getFromContainer(identifier); + await job.register(); + }); + } + /** * Removes a cron job from the Agenda scheduler. * @param jobName - The name of the job to remove. @@ -62,6 +85,7 @@ export class AgendaCronService extends CronService { * Initializes the Agenda scheduler. */ protected async initialise() { + await this.registerJobs(); await this.agenda.start(); } diff --git a/server/src/services/cron/constants.ts b/server/src/services/cron/constants.ts index fab4daff..0a70ac88 100644 --- a/server/src/services/cron/constants.ts +++ b/server/src/services/cron/constants.ts @@ -1,5 +1,8 @@ import { CronOptions } from './types'; +/** + * Default cron options. + */ export const DefaultCronOptions: CronOptions = { lockLimit: 1, lockLifetime: 1000 * 60 * 10, diff --git a/server/src/services/cron/service.ts b/server/src/services/cron/cron-service.ts similarity index 78% rename from server/src/services/cron/service.ts rename to server/src/services/cron/cron-service.ts index e7d4751d..fad92028 100644 --- a/server/src/services/cron/service.ts +++ b/server/src/services/cron/cron-service.ts @@ -54,19 +54,19 @@ export abstract class CronService { */ async bootstrap(): Promise { if (!this.activeJobs || !this.activeJobs.length) { - this.logger.trace('no active jobs found'); + this.logger.trace('No active jobs found'); } else { this.logger.trace( - 'found following active jobs:', + 'Found following active jobs:', this.activeJobs.join(', '), ); } try { await this.initialise(); - this.logger.debug('cron service initialised'); + this.logger.debug('Cron service initialised'); } catch (err) { - this.logger.error(err, 'failed to initialise cron service'); + this.logger.error(err, 'Failed to initialise cron service'); } } @@ -77,10 +77,15 @@ export abstract class CronService { try { await this.shutdown(); } catch (err) { - this.logger.error(err, 'failed to terminate cron service'); + this.logger.error(err, 'Failed to terminate cron service'); } } + /** + * Checks if a job with the specified name is active. + * @param jobName - The name of the job to check. + * @returns A boolean indicating whether the job is active or not. + */ private isActiveJob(jobName: JobCronName): boolean { return this.activeJobs.includes(jobName); } @@ -99,7 +104,7 @@ export abstract class CronService { options: CronOptions, ): Promise { if (!this.isActiveJob(jobName)) { - this.logger.trace(`job '${jobName}' is not active`); + this.logger.trace(`Job '${jobName}' is not active`); return; } @@ -122,20 +127,20 @@ export abstract class CronService { }); const finishedAt = new Date(); _this.logger.info( - `job '${jobName}' processed in ${ + `Job '${jobName}' processed in ${ finishedAt.getTime() - startedAt.getTime() }ms`, ); } catch (err) { - _this.logger.error(err, `failed to process job '${jobName}'`); + _this.logger.error(err, `Failed to process job '${jobName}'`); } }, interval, mergedOptions, ); - this.logger.debug(`job '${jobName}' added`); + this.logger.debug(`Job '${jobName}' added`); } catch (err) { - this.logger.error(err, `failed to add job '${jobName}'`); + this.logger.error(err, `Failed to add job '${jobName}'`); } } @@ -145,15 +150,15 @@ export abstract class CronService { */ async remove(jobName: JobCronName): Promise { if (!this.isActiveJob(jobName)) { - this.logger.trace(`job '${jobName}' is not active`); + this.logger.trace(`Job '${jobName}' is not active`); return; } try { await this.removeJob(jobName); - this.logger.info(`job '${jobName}' removed`); + this.logger.info(`Job '${jobName}' removed`); } catch (err) { - this.logger.error(err, `failed to remove job '${jobName}'`); + this.logger.error(err, `Failed to remove job '${jobName}'`); } } } diff --git a/server/src/services/cron/index.ts b/server/src/services/cron/index.ts index 19e237d9..2dcf106d 100644 --- a/server/src/services/cron/index.ts +++ b/server/src/services/cron/index.ts @@ -1,15 +1,9 @@ import { Agenda } from '@hokify/agenda'; -import Bluebird from 'bluebird'; import { ContainerBuilder } from 'diod'; import { DATABASE_URL } from '@/infrastructure/config/env'; import { ContainerTags } from '@/infrastructure/container/constants'; -import { - findTaggedServiceIdentifiers, - getFromContainer, -} from '@/infrastructure/container/container'; import { Logger } from '@/infrastructure/logger/logger'; -import { Jobber } from '@/services/cron/job'; import { ClearCacheJob } from '@/services/cron/jobs/clear-cache'; import { ContentScanJob } from '@/services/cron/jobs/content-scan'; import { LibraryScanJob } from '@/services/cron/jobs/library-scan'; @@ -24,27 +18,27 @@ export default (builder: ContainerBuilder) => { builder .registerAndUse(LibraryScanJob) .asSingleton() - .addTag(ContainerTags.CRON_JOB); + .addTag(ContainerTags.AGENDA_CRON_JOB); builder .registerAndUse(ContentScanJob) .asSingleton() - .addTag(ContainerTags.CRON_JOB); + .addTag(ContainerTags.AGENDA_CRON_JOB); builder .registerAndUse(UsersScanJob) .asSingleton() - .addTag(ContainerTags.CRON_JOB); + .addTag(ContainerTags.AGENDA_CRON_JOB); builder .registerAndUse(QuotaResetJob) .asSingleton() - .addTag(ContainerTags.CRON_JOB); + .addTag(ContainerTags.AGENDA_CRON_JOB); builder .registerAndUse(ResourceCacheJob) .asSingleton() - .addTag(ContainerTags.CRON_JOB); + .addTag(ContainerTags.AGENDA_CRON_JOB); builder .registerAndUse(ClearCacheJob) .asSingleton() - .addTag(ContainerTags.CRON_JOB); + .addTag(ContainerTags.AGENDA_CRON_JOB); builder .register(AgendaCronService) .useFactory( @@ -54,7 +48,7 @@ export default (builder: ContainerBuilder) => { new Agenda({ db: { address: DATABASE_URL, collection: 'jobs' }, processEvery: '2 minutes', - maxConcurrency: 1, + maxConcurrency: 2, defaultConcurrency: 1, defaultLockLifetime: 1000 * 60 * 10, ensureIndex: true, @@ -64,15 +58,3 @@ export default (builder: ContainerBuilder) => { ) .asSingleton(); }; - -/** - * Runs the cron jobs by registering and bootstrapping them. - * @returns A promise that resolves when the cron jobs are successfully registered and bootstrapped. - */ -export async function runCron() { - const cronService = getFromContainer(AgendaCronService); - const tags = findTaggedServiceIdentifiers(ContainerTags.CRON_JOB); - const jobs = tags.map((tag) => getFromContainer(tag)); - await Bluebird.map(jobs, async (job) => job.register()); - return cronService.bootstrap(); -}