Skip to content

Commit

Permalink
refactor(server): cron jobs and agenda cron service cleaned up
Browse files Browse the repository at this point in the history
  • Loading branch information
ADRFranklin committed Jun 8, 2024
1 parent 954bc26 commit 4e3026b
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 44 deletions.
4 changes: 2 additions & 2 deletions server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { MongooseDatabaseConnection } from '@/infrastructure/database/connection
import logger from '@/infrastructure/logger/logger';
import { Master } from '@/infrastructure/worker/master';
import { CacheService } from '@/services/cache/cache-service';
import { runCron } from '@/services/cron';
import { AgendaCronService } from '@/services/cron/agenda-cron';
import { MigrationService } from '@/services/migration/migration';
import { SettingsService } from '@/services/settings/settings';

Expand Down Expand Up @@ -112,7 +112,7 @@ async function doWorker() {
}

if (settings.initialSetup && process.env.job) {
await runCron();
await getFromContainer(AgendaCronService).bootstrap();
}
}

Expand Down
5 changes: 4 additions & 1 deletion server/src/infrastructure/container/constants.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
/**
* Enum representing the container tags used for dependency injection.
*/
export enum ContainerTags {
// HTTP
HTTP_API_V1 = 'HTTP_API_V1',
HTTP_API_V2 = 'HTTP_API_V2',
// CRON
CRON_JOB = 'CRON_JOB',
AGENDA_CRON_JOB = 'AGENDA_CRON_JOB',
}
30 changes: 27 additions & 3 deletions server/src/services/cron/agenda-cron.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import { Agenda } from '@hokify/agenda';

import Bluebird from 'bluebird';
import { Service } from 'diod';

import { ContainerTags } from '@/infrastructure/container/constants';
import {
findTaggedServiceIdentifiers,
getFromContainer,
} from '@/infrastructure/container/container';
import { Logger } from '@/infrastructure/logger/logger';
import { CronService } from './service';
import { Jobber } from '@/services/cron/job';

import { CronService } from './cron-service';
import { CronJobProcessor, CronOptions, JobCronName } from './types';

/**
Expand All @@ -11,7 +19,7 @@ import { CronJobProcessor, CronOptions, JobCronName } from './types';
@Service()
export class AgendaCronService extends CronService {
constructor(
activeJobs: JobCronName[],
activeJobs: JobCronName[],
private agenda: Agenda,
logger: Logger,
) {
Expand Down Expand Up @@ -50,6 +58,21 @@ export class AgendaCronService extends CronService {
await this.agenda.every(interval, jobName);
}

/**
* Registers the cron jobs by finding the tagged service identifiers and
* calling the `register` method on each job.
* @returns A promise that resolves when all jobs have been registered.
*/
async registerJobs(): Promise<void> {
const jobIdentifiers = findTaggedServiceIdentifiers(
ContainerTags.AGENDA_CRON_JOB,
);
await Bluebird.mapSeries(jobIdentifiers, async (identifier) => {
const job = getFromContainer<Jobber>(identifier);
await job.register();
});
}

/**
* Removes a cron job from the Agenda scheduler.
* @param jobName - The name of the job to remove.
Expand All @@ -62,6 +85,7 @@ export class AgendaCronService extends CronService {
* Initializes the Agenda scheduler.
*/
protected async initialise() {
await this.registerJobs();
await this.agenda.start();
}

Expand Down
3 changes: 3 additions & 0 deletions server/src/services/cron/constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { CronOptions } from './types';

/**
* Default cron options.
*/
export const DefaultCronOptions: CronOptions = {
lockLimit: 1,
lockLifetime: 1000 * 60 * 10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,19 @@ export abstract class CronService {
*/
async bootstrap(): Promise<void> {
if (!this.activeJobs || !this.activeJobs.length) {
this.logger.trace('no active jobs found');
this.logger.trace('No active jobs found');
} else {
this.logger.trace(
'found following active jobs:',
'Found following active jobs:',
this.activeJobs.join(', '),
);
}

try {
await this.initialise();
this.logger.debug('cron service initialised');
this.logger.debug('Cron service initialised');
} catch (err) {
this.logger.error(err, 'failed to initialise cron service');
this.logger.error(err, 'Failed to initialise cron service');
}
}

Expand All @@ -77,10 +77,15 @@ export abstract class CronService {
try {
await this.shutdown();
} catch (err) {
this.logger.error(err, 'failed to terminate cron service');
this.logger.error(err, 'Failed to terminate cron service');
}
}

/**
* Checks if a job with the specified name is active.
* @param jobName - The name of the job to check.
* @returns A boolean indicating whether the job is active or not.
*/
private isActiveJob(jobName: JobCronName): boolean {
return this.activeJobs.includes(jobName);
}
Expand All @@ -99,7 +104,7 @@ export abstract class CronService {
options: CronOptions,
): Promise<void> {
if (!this.isActiveJob(jobName)) {
this.logger.trace(`job '${jobName}' is not active`);
this.logger.trace(`Job '${jobName}' is not active`);
return;
}

Expand All @@ -122,20 +127,20 @@ export abstract class CronService {
});
const finishedAt = new Date();
_this.logger.info(
`job '${jobName}' processed in ${
`Job '${jobName}' processed in ${
finishedAt.getTime() - startedAt.getTime()
}ms`,
);
} catch (err) {
_this.logger.error(err, `failed to process job '${jobName}'`);
_this.logger.error(err, `Failed to process job '${jobName}'`);
}
},
interval,
mergedOptions,
);
this.logger.debug(`job '${jobName}' added`);
this.logger.debug(`Job '${jobName}' added`);
} catch (err) {
this.logger.error(err, `failed to add job '${jobName}'`);
this.logger.error(err, `Failed to add job '${jobName}'`);
}
}

Expand All @@ -145,15 +150,15 @@ export abstract class CronService {
*/
async remove(jobName: JobCronName): Promise<void> {
if (!this.isActiveJob(jobName)) {
this.logger.trace(`job '${jobName}' is not active`);
this.logger.trace(`Job '${jobName}' is not active`);
return;
}

try {
await this.removeJob(jobName);
this.logger.info(`job '${jobName}' removed`);
this.logger.info(`Job '${jobName}' removed`);
} catch (err) {
this.logger.error(err, `failed to remove job '${jobName}'`);
this.logger.error(err, `Failed to remove job '${jobName}'`);
}
}
}
32 changes: 7 additions & 25 deletions server/src/services/cron/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
import { Agenda } from '@hokify/agenda';
import Bluebird from 'bluebird';
import { ContainerBuilder } from 'diod';

import { DATABASE_URL } from '@/infrastructure/config/env';
import { ContainerTags } from '@/infrastructure/container/constants';
import {
findTaggedServiceIdentifiers,
getFromContainer,
} from '@/infrastructure/container/container';
import { Logger } from '@/infrastructure/logger/logger';
import { Jobber } from '@/services/cron/job';
import { ClearCacheJob } from '@/services/cron/jobs/clear-cache';
import { ContentScanJob } from '@/services/cron/jobs/content-scan';
import { LibraryScanJob } from '@/services/cron/jobs/library-scan';
Expand All @@ -24,27 +18,27 @@ export default (builder: ContainerBuilder) => {
builder
.registerAndUse(LibraryScanJob)
.asSingleton()
.addTag(ContainerTags.CRON_JOB);
.addTag(ContainerTags.AGENDA_CRON_JOB);
builder
.registerAndUse(ContentScanJob)
.asSingleton()
.addTag(ContainerTags.CRON_JOB);
.addTag(ContainerTags.AGENDA_CRON_JOB);
builder
.registerAndUse(UsersScanJob)
.asSingleton()
.addTag(ContainerTags.CRON_JOB);
.addTag(ContainerTags.AGENDA_CRON_JOB);
builder
.registerAndUse(QuotaResetJob)
.asSingleton()
.addTag(ContainerTags.CRON_JOB);
.addTag(ContainerTags.AGENDA_CRON_JOB);
builder
.registerAndUse(ResourceCacheJob)
.asSingleton()
.addTag(ContainerTags.CRON_JOB);
.addTag(ContainerTags.AGENDA_CRON_JOB);
builder
.registerAndUse(ClearCacheJob)
.asSingleton()
.addTag(ContainerTags.CRON_JOB);
.addTag(ContainerTags.AGENDA_CRON_JOB);
builder
.register(AgendaCronService)
.useFactory(
Expand All @@ -54,7 +48,7 @@ export default (builder: ContainerBuilder) => {
new Agenda({
db: { address: DATABASE_URL, collection: 'jobs' },
processEvery: '2 minutes',
maxConcurrency: 1,
maxConcurrency: 2,
defaultConcurrency: 1,
defaultLockLifetime: 1000 * 60 * 10,
ensureIndex: true,
Expand All @@ -64,15 +58,3 @@ export default (builder: ContainerBuilder) => {
)
.asSingleton();
};

/**
* Runs the cron jobs by registering and bootstrapping them.
* @returns A promise that resolves when the cron jobs are successfully registered and bootstrapped.
*/
export async function runCron() {
const cronService = getFromContainer<AgendaCronService>(AgendaCronService);
const tags = findTaggedServiceIdentifiers<Jobber>(ContainerTags.CRON_JOB);
const jobs = tags.map((tag) => getFromContainer<Jobber>(tag));
await Bluebird.map(jobs, async (job) => job.register());
return cronService.bootstrap();
}

0 comments on commit 4e3026b

Please sign in to comment.