Implement an in-process cron scheduler that accepts a job and executes it periodically.
node-cronelda is a simple, lightweight (zero-dependency) scheduler that runs jobs at provided intervals, similar to cron jobs. The scheduler can handle multiple synchronous and asynchronous job executions concurrently. node-cronelda works by spawning a child process that executes the provided jobs without blocking the main thread, and communicates with the parent process over IPC.
To use it, import the Scheduler class in your main code.
const Scheduler = require("./Scheduler");
Then create an object that represents the job you need to run, and add it to the Scheduler. The following snippet illustrates this.
const job = {
  name: "job 1",
  time: "1s",
  execution: () => {
    console.log("x");
  },
};
const scheduler = new Scheduler(); // Create an instance of Scheduler
scheduler.addJob(job); // Add the above job to the Scheduler
scheduler.start(); // Start executing the job
If you need to stop executing the jobs, just call the following method.
scheduler.stop()
- The client code initializes the Scheduler.
  const scheduler = new Scheduler();
- The client adds the jobs' data to the scheduler using either the addJob or the addBulkJobs method.
  scheduler.addJob(job); // or scheduler.addBulkJobs(jobs)
- The client calls the start method of the scheduler to start executing the jobs.
  scheduler.start()
- Upon calling the start method, the scheduler emits an event called scheduler-start. In response to this event, the Scheduler calls an auxiliary method called startScheduler.
  this.on("scheduler-start", () => {
    console.log(
      `SCHEDULER: ---Starting scheduler--- {${new Date().toLocaleString()}}`
    );
    this.startScheduler();
  });
- The startScheduler method spawns a child process that runs the daemon, which is responsible for executing and stopping the jobs in the scheduler.
  startScheduler() {
    if (!this._isRunning) {
      this._daemonProcess = fork(this._daemonPath);
      //...
- Then, the scheduler converts the execution functions of the jobs added in step 2 to strings, and sends them to the child process over IPC as a message of type get-jobs-data.
  const jobsToSend = this._jobs.map((job) => {
    job.execution = job.execution.toString();
    return job;
  });
  this._daemonProcess.send({
    type: "get-jobs-data",
    data: jobsToSend,
  });
- The daemon keeps listening for messages. When it receives a message of type get-jobs-data, it converts each job's execution string back into a function and adds the job to its _jobs map.
  //daemon.js
  const Job = require("./Job");
  let _jobs = new Map();
  process.on("message", (message) => {
    switch (message.type) {
      case "get-jobs-data":
        if (message.data) {
          try {
            message.data?.forEach((job) => {
              var execution = new Function("return " + job.execution)();
              _jobs.set(
                job.name,
                new Job(job.name, job.time, execution, job?.options)
              );
            });
          } catch (error) {
            process.send("get-jobs-error", error);
          }
        }
        break;
- Back to the startScheduler method in Scheduler.js. After sending the data to the daemon, the scheduler instructs the daemon to start executing the jobs.
  //...
  this._daemonProcess.send({
    type: "run-jobs",
  });
  this._daemonProcess.on("message", (message) => {
    if (message === "daemon-isRunning") {
      this._isRunning = true;
    }
  });
  //...
- The daemon receives the run-jobs message and starts running the jobs using the startDaemon() method. It then sends a message signalling that the daemon is running, so that the scheduler can update its status to running.
  case "run-jobs":
    if (_jobs.size > 0) {
      console.log("DAEMON: -----Starting Daemon-----");
      startDaemon();
      process.send("daemon-isRunning");
    }
    break;
The startDaemon() method calls the job.execute() method of each job received from the Scheduler.
  //daemon.js
  function startDaemon() {
    try {
      Array.from(_jobs.values()).forEach((job) => {
        job.execute();
      });
    } catch (error) {
      process.send("job-failed", error);
    }
  }
- When the client wants to stop the daemon from running, they must call the following method in their source code.
  scheduler.stop()
The stop() method emits an event called scheduler-stop, which calls the stopScheduler auxiliary method.
  this.on("scheduler-stop", () => {
    console.log("SCHEDULER: ---Stopping scheduler---");
    this.stopScheduler();
  });
The stopScheduler method sends a message to the daemon to stop running the jobs, then waits for a daemon-stopped reply before terminating the daemon process and setting the scheduler's running state to false.
  //Scheduler.js
  //...
  stopScheduler() {
    if (this.isRunning()) {
      this._daemonProcess.send({
        type: "stop-jobs",
      });
      this._daemonProcess.on("message", (message) => {
        if (message === "daemon-stopped") {
          this._daemonProcess.kill();
          this._isRunning = false;
          console.log(
            `SCHEDULER: ---Stopped scheduler--- {${new Date().toLocaleString()}}`
          );
        }
      });
    }
  }
- The daemon receives the stop-jobs message and calls the stopDaemon() method, which clears the interval ID of each job.
  //daemon.js
  //...
  case "stop-jobs":
    console.log("DAEMON: -----Stopping Daemon-----");
    stopDaemon();
    console.log("DAEMON: -----Stopped Daemon-----");
    process.send("daemon-stopped");
    break;
  //...
  function stopDaemon() {
    Array.from(_jobs.values()).forEach((job) => {
      job.stopJob();
    });
  }
  //...
The job.stopJob() method emits a stop-job event, whose listener clears the job's interval or timeout and sets the job's _intervalId to null.
  //Job.js
  //...
  this.on("stop-job", () => {
    clearInterval(this._intervalId);
    clearTimeout(this._intervalId);
    this._intervalId = null;
  });
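The start/stop exchange described in the steps above can be summarised as a small message handler. The sketch below simulates the daemon side in-process, with a plain callback standing in for process.send, so it runs without forking a child. The names createDaemonHandler, fakeJob, replies and calls are hypothetical and only mirror the excerpts; this is not the project's daemon.js.

```javascript
// Hypothetical in-process model of the daemon's message handling.
// `reply` stands in for process.send in a real child process.
function createDaemonHandler(reply) {
  const jobs = new Map();
  return function handle(message) {
    switch (message.type) {
      case "get-jobs-data":
        // Store every received job under its unique name.
        for (const job of message.data || []) jobs.set(job.name, job);
        break;
      case "run-jobs":
        // Only report "running" when there is something to run.
        if (jobs.size > 0) {
          jobs.forEach((job) => job.execute());
          reply("daemon-isRunning");
        }
        break;
      case "stop-jobs":
        jobs.forEach((job) => job.stopJob());
        reply("daemon-stopped");
        break;
    }
  };
}

// Simulated parent side: record replies instead of listening on IPC.
const replies = [];
const calls = [];
const handle = createDaemonHandler((message) => replies.push(message));
const fakeJob = {
  name: "job 1",
  execute: () => calls.push("execute"),
  stopJob: () => calls.push("stop"),
};

handle({ type: "get-jobs-data", data: [fakeJob] });
handle({ type: "run-jobs" });
handle({ type: "stop-jobs" });
console.log(replies); // [ 'daemon-isRunning', 'daemon-stopped' ]
```

In the real flow the scheduler only kills the child after it sees the daemon-stopped reply, which is why the handler replies after the jobs have been stopped.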
This represents a scheduled task with a name, an execution function, and a time interval.
- name: the unique name of the job.
- time: the time interval of the job (e.g. 1s, 30m, 1hr 25m, 2d). Limit: approximately 25 days.
- execution: the function that runs when the job is executed.
- options (optional): an object that accepts the property once; set it to true to run the job only once (default: false).
{
  name: "job 1",
  time: "5s",
  execution: () => {
    console.log("x");
  },
  options: {
    once: true,
  },
},
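To make the effect of the once option concrete, here is a minimal sketch of how a job like the one above could be scheduled: a one-shot job maps to setTimeout and a repeating job to setInterval. SketchJob and isScheduled are hypothetical names, and the interval is assumed to be already converted to milliseconds; this is not the project's Job.js.

```javascript
const { EventEmitter } = require("events");

// Hypothetical stand-in for a job, for illustration only.
class SketchJob extends EventEmitter {
  constructor(name, intervalMs, execution, options = {}) {
    super();
    this._name = name;
    this._intervalMs = intervalMs; // assumed to be parsed to ms already
    this._execution = execution;
    this._once = options.once === true; // default: repeat
    this._timerId = null;
    // Stopping clears whichever timer kind is pending.
    this.on("stop-job", () => {
      clearInterval(this._timerId);
      clearTimeout(this._timerId);
      this._timerId = null;
    });
  }

  // Schedule the job: one shot for `once`, repeating otherwise.
  execute() {
    const schedule = this._once ? setTimeout : setInterval;
    this._timerId = schedule(() => {
      if (this._once) this._timerId = null; // a one-shot timer is spent
      this._execution();
    }, this._intervalMs);
  }

  isScheduled() {
    return this._timerId !== null;
  }

  stopJob() {
    this.emit("stop-job");
  }
}

const sketchJob = new SketchJob("job 1", 5000, () => console.log("x"), {
  once: true,
});
sketchJob.execute();
sketchJob.stopJob(); // cancels the pending timer before it fires
```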
Parser.js contains a function that converts time expressions like 1h 10m into a single integer number of milliseconds, which can be passed to setTimeout or setInterval.
Currently, these are the allowed expressions:
1. "s" -> seconds
2. "m" -> minutes
3. "h" -> hours
4. "d" -> days
5. "w" -> weeks
6. "M" -> months
7. "y" -> years
You can write 1hr 10m 25s, which will be converted to an interval of 4225000 ms.
Note: due to the nature of setInterval() and setTimeout(), the maximum interval allowed is approximately 25 days (see the limitations section below).
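As a rough illustration of this conversion, the sketch below parses an expression into milliseconds and clamps the result to the timer limit. The function name parseTimeExpression and the unit table are assumptions for illustration and are not taken from Parser.js. Months and years are left out because their length in milliseconds is not stated here.

```javascript
// Largest delay accepted by setTimeout/setInterval (about 25 days).
const MAX_DELAY_MS = 2147483647;

// Milliseconds per unit; "hr" is accepted as an alias for "h" (assumption).
const UNIT_MS = {
  s: 1000,
  m: 60 * 1000,
  h: 60 * 60 * 1000,
  hr: 60 * 60 * 1000,
  d: 24 * 60 * 60 * 1000,
  w: 7 * 24 * 60 * 60 * 1000,
};

// Hypothetical parser: sums every "<number><unit>" pair in the expression.
function parseTimeExpression(expression) {
  const pattern = /(\d+)\s*(hr|[smhdw])/g;
  let total = 0;
  let matched = false;
  for (const [, amount, unit] of expression.matchAll(pattern)) {
    total += Number(amount) * UNIT_MS[unit];
    matched = true;
  }
  if (!matched) {
    throw new Error(`Invalid time expression: ${expression}`);
  }
  // Clamp to the maximum delay the timer functions accept.
  return Math.min(total, MAX_DELAY_MS);
}

console.log(parseTimeExpression("1hr 10m 25s")); // 4225000
console.log(parseTimeExpression("30d")); // 2147483647 (clamped)
```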
- I decided to delegate running jobs to another module called daemon, which runs in a child process whenever the Scheduler's start() method is called. Why did I go with this approach?
  - I faced a problem with clearing the timeouts of the tasks whenever I called scheduler.stop(): it kept waiting for the last task to finish its callback before terminating. A separate child process, by contrast, can be terminated directly.
  - In addition, this approach guarantees that the main thread will not be blocked by any CPU-intensive jobs.
- I decided to use the Node.js EventEmitter API to adopt a publisher-subscriber pattern, in which specific actions are performed in response to specific events: for example, starting the Scheduler, stopping the Scheduler, and executing the jobs in the daemon.
- When a Job object is sent from the Scheduler to the daemon child process, Node.js serializes the object internally, and JSON does not support serializing functions:
  "JSON doesn't support serializing functions (at least out of the box). You could convert the function to its string representation first (via function.toString()) and then re-create the function again in the child process. The problem though is you lose scope and context with this process, so your function really has to be standalone."
  This left me with the option of converting the execution method of the Job to a string, and converting it back to a function in the daemon process upon receiving the Job object.
- Adding process.stdout.write() to log the current time and the job's name before execution resulted in overlapping log output between the asynchronous jobs. The output was not always stable, since the ordering of callbacks in the event loop cannot be predicted. Instead, I emitted events before and after the execution of the job's task.
this.emit("start-executing");
exec = this._execution();
//....
this.emit("finished-executing");
And inside each event listener for the above events, I logged the time and the name of the job that signaled the event.
this.on("start-executing", () => {
  console.log(
    `[${new Date().toLocaleString()}] Job {${this.getName()}}: started executing `
  );
  this._isExecuting = true;
});
this.on("finished-executing", () => {
  console.log(
    `[${new Date().toLocaleString()}] Job {${this.getName()}}: finished executing `
  );
  this._isExecuting = false;
});
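The elided part between the two emits has to cope with both synchronous and asynchronous jobs. One possible way to do that is sketched below: the finish event fires immediately for a plain return value and only after settlement for a promise. runWithEvents is a hypothetical helper, not the project's code.

```javascript
const { EventEmitter } = require("events");

// Hypothetical helper: wraps a job function with start/finish events.
function runWithEvents(emitter, execution) {
  emitter.emit("start-executing");
  const finish = () => emitter.emit("finished-executing");
  const result = execution();
  if (result && typeof result.then === "function") {
    // Async job: signal completion once the promise settles.
    return Promise.resolve(result).finally(finish);
  }
  // Sync job: the work is already done.
  finish();
  return result;
}

const emitter = new EventEmitter();
const log = [];
emitter.on("start-executing", () => log.push("start"));
emitter.on("finished-executing", () => log.push("finish"));

runWithEvents(emitter, () => log.push("sync job"));
console.log(log); // [ 'start', 'sync job', 'finish' ]
```

Because all logging happens inside the listeners, each job's start and finish lines are written as whole console.log calls rather than as partial writes that can interleave.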
//index.js
const Scheduler = require("./Scheduler");
function main() {
  try {
    const job = {
      name: "job 1",
      time: "5s",
      execution: () => {
        console.log("job single run");
      },
      options: {
        once: true,
      },
    };
    const scheduler = new Scheduler();
    scheduler.addJob(job);
    scheduler.start();
  } catch (error) {
    console.error(error);
  }
}
main();
//index.js
const Scheduler = require("./Scheduler");
const jobsBulk = [
  {
    name: "job 1",
    time: "5s",
    execution: () => {
      console.log("x");
    },
    options: {
      once: true,
    },
  },
];
function main() {
  try {
    const scheduler = new Scheduler();
    scheduler.addBulkJobs(jobsBulk);
    scheduler.start();
    //Add `job 2` 9 seconds after starting the scheduler
    setTimeout(() => {
      scheduler.addJob({
        name: "job 2",
        time: "2s",
        execution: () => {
          return new Promise((resolve, reject) => {
            setTimeout(() => {
              console.log("hello world async after 2 seconds");
              resolve();
            }, 2000);
          });
        },
      });
    }, 9000);
  } catch (error) {
    console.error(error);
  }
}
main();
//index.js
const Scheduler = require("./Scheduler");
const jobsBulk = [
  {
    name: "job 1",
    time: "5s",
    execution: () => {
      console.log("hello world synchronous");
    },
    options: {
      once: false,
    },
  },
  {
    name: "job 3",
    time: "6s",
    execution: () => {
      return new Promise((resolve, reject) => {
        setTimeout(() => {
          console.log("hello world async after 2 seconds");
          resolve();
        }, 2000);
      });
    },
  },
];
function main() {
  try {
    const scheduler = new Scheduler();
    scheduler.addBulkJobs(jobsBulk);
    scheduler.start();
    //Add `job 2` 9 seconds after starting the scheduler
    setTimeout(() => {
      scheduler.addJob({
        name: "job 2",
        time: "2s",
        execution: () => {
          return new Promise((resolve, reject) => {
            setTimeout(() => {
              console.log("hello world async after 1 second");
              resolve();
            }, 1000);
          });
        },
      });
    }, 9000);
    //Shutdown after 15 seconds
    setTimeout(() => {
      scheduler.stop();
    }, 15000);
  } catch (error) {
    console.error(error);
  }
}
main();
- Support cron expressions in time, e.g. (* * * * * *).
- Support persisting the jobs' metadata either in local storage or in a database (MongoDB, Redis).
- Support configuring the Scheduler to run automatically when the server reboots (depends on the persistence item above).
- Add a CI/CD pipeline to run tests on pull requests.
- Support running the daemon in detached mode, which keeps running the jobs even if the parent process is terminated.
- The maximum interval that can be assigned to a job is the maximum delay accepted by setInterval or setTimeout, which is 2147483647 ms (approximately 25 days). If you add an interval longer than the maximum, it is automatically set to the maximum interval.
- When the Scheduler sends the jobs' data to the daemon, the execution function of each job is stringified. It therefore loses its this context, so the execution function of a job must be a standalone function for now.
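The second limitation can be reproduced in a few lines. The sketch below round-trips a job through toString() and new Function, the same technique shown in the daemon excerpt, and shows that a standalone function survives while a function that captures an outer variable does not. The helper names serializeJob, deserializeJob, roundTrip and makeClosureJob are hypothetical.

```javascript
// Turn the function into text so the job can be sent as JSON.
function serializeJob(job) {
  // Copy the job rather than mutating the caller's object.
  return { ...job, execution: job.execution.toString() };
}

// Rebuild the function from its source text on the receiving side.
function deserializeJob(data) {
  const execution = new Function("return " + data.execution)();
  return { ...data, execution };
}

// Simulate the trip across the process boundary with a JSON round trip.
function roundTrip(job) {
  return deserializeJob(JSON.parse(JSON.stringify(serializeJob(job))));
}

// Standalone: uses nothing from its surrounding scope.
const standalone = { name: "ok", time: "1s", execution: () => 1 + 1 };

// Not standalone: captures a local variable that is lost in transit.
function makeClosureJob() {
  const greetingPrefix = "hello";
  return {
    name: "closure",
    time: "1s",
    execution: () => greetingPrefix + " world",
  };
}

console.log(roundTrip(standalone).execution()); // 2

try {
  roundTrip(makeClosureJob()).execution();
} catch (error) {
  console.log(error.name); // ReferenceError
}
```

Anything a job needs (modules, constants, helpers) therefore has to be required or defined inside the execution function itself.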