Job queue on MongoDB. It uses atomic writes to grab jobs and exposes a generator-friendly API.
Requirements:
- Node.js >= 4
- MongoDB >= 2.6
const yajob = require('yajob');
const mails = yajob('localhost/queuedb')
    .tag('mails');
mails.put({
    from: '[email protected]',
    to: '[email protected]',
    body: 'Wow!'
});
// => Promise
for (var mail of yield mails.take(100)) {
    yield sendmail(mail);
}
Processed jobs are removed from the queue when the for-loop finishes or is interrupted
(either by break or by an exception).
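The removal-on-exit behaviour can be pictured with a plain generator and a `finally` block. The following is an in-memory sketch only, not yajob's actual implementation; `takeBatch`, `store` and `processed` are hypothetical names, and it assumes the job being handled at the moment of a `break` counts as processed.

```javascript
// In-memory illustration: an array stands in for the MongoDB collection.
function* takeBatch(store, count) {
  const batch = store.slice(0, count); // jobs handed out in this batch
  const processed = [];
  try {
    for (const job of batch) {
      processed.push(job);
      yield job;
    }
  } finally {
    // Reached on normal completion, on `break`, and on an exception
    // thrown inside the consuming for-loop.
    for (const job of processed) {
      store.splice(store.indexOf(job), 1);
    }
  }
}

const store = ['a', 'b', 'c'];
for (const job of takeBatch(store, 3)) {
  if (job === 'b') break; // stop early; 'c' was never handed out
}
console.log(store);
```

A `for...of` loop calls the generator's `return()` when it exits early, which is what lets the cleanup in `finally` run without any explicit call from the consumer.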
In some cases you will need to skip a taken job. To do this, pass false into the
generator:
const jobs = yield mails.take(100);
const job = jobs.next().value;
if (job === 'Ohnoez') {
    jobs.next(false); // Returns Ohnoez to the queue and moves on to the next job
}
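How a value passed to `next()` reaches the generator can be sketched in memory as follows. This is an illustration under assumptions, not yajob's source: `takeSkippable`, `inbox` and `finished` are hypothetical names, and an array stands in for the collection.

```javascript
// In-memory illustration: the value given to next() becomes the result
// of the `yield` expression inside the generator.
function* takeSkippable(inbox, count) {
  const batch = inbox.slice(0, count);
  const finished = [];
  try {
    for (const job of batch) {
      const keep = yield job;
      // Only an explicit `false` marks the job as skipped.
      if (keep !== false) finished.push(job);
    }
  } finally {
    // Skipped jobs are not in `finished`, so they stay in the queue.
    for (const job of finished) {
      inbox.splice(inbox.indexOf(job), 1);
    }
  }
}

const inbox = ['Hello', 'Ohnoez', 'Bye'];
const batchIterator = takeSkippable(inbox, 3);
let step = batchIterator.next();
while (!step.done) {
  step = step.value === 'Ohnoez' ? batchIterator.next(false) : batchIterator.next();
}
console.log(inbox);
```

Note that `next(false)` is called on the generator itself, and that the same call both reports the skip and fetches the following job.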
By default, all jobs have priority 0. You can specify a sort order for the queue,
which determines the order in which jobs are taken:
const important = queue.tag('mail').sort({priority: -1});
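To make the effect of a `{priority: -1}` rule concrete, here is a small in-memory sketch of a MongoDB-style sort spec, where `-1` means descending and `1` ascending. `sortJobs` and the sample jobs are hypothetical; yajob delegates the actual ordering to MongoDB.

```javascript
// Hypothetical helper: orders plain objects by a MongoDB-style sort spec.
function sortJobs(jobs, spec) {
  const keys = Object.keys(spec);
  return jobs.slice().sort((a, b) => {
    for (const key of keys) {
      if (a[key] !== b[key]) {
        return (a[key] < b[key] ? -1 : 1) * spec[key];
      }
    }
    return 0;
  });
}

const pending = [
  {id: 'newsletter', priority: 0},
  {id: 'password-reset', priority: 10},
  {id: 'digest', priority: 0}
];

const ordered = sortJobs(pending, {priority: -1});
console.log(ordered.map(job => job.id));
```

With a descending priority rule, the job with the highest priority is handed out first, and jobs that keep the default priority of 0 follow.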
Returns an instance of the queue, which stores its data in MongoDB.
Type: String
MongoDB URI string.
Type: Object
MongoDB MongoClient.connect options.
Adds a job to the queue. Returns a Promise.
Type: Object / Array
Data that will be attached to the job. If attrs is an Array, every Object in attrs is treated as a new job.
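The Object / Array distinction can be sketched as a small normalisation step. This is only an illustration: `toJobs` and the document shape (`attrs`, `status`, `priority`) are assumptions made for the example, not yajob's stored format.

```javascript
// Hypothetical helper: turns the `attrs` argument into a list of job documents.
function toJobs(attrs) {
  const list = Array.isArray(attrs) ? attrs : [attrs];
  return list.map(item => ({
    attrs: item,
    status: 0,   // assumed: 0 marks a newly added job
    priority: 0  // default priority
  }));
}

const single = toJobs({to: 'someone', body: 'Wow!'});
const many = toJobs([{body: 'first'}, {body: 'second'}]);
console.log(single.length, many.length);
```

Passing an Array is therefore a way to enqueue several jobs with one call rather than one job whose payload is a list.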
Adds a job to the queue only if no job with the same attrs is already queued. Returns a Promise. If the job is not unique, the promise is resolved with {result: {ok: 0, n: 0}}.
Type: Object
Data that will be attached to the job.
Type: Object
- schedule - Date when the job should become available to take
- priority - Number that represents the priority of the job
Returns a Promise that resolves to a Generator, which emits jobs one by one.
After all jobs are taken from the batch, they are considered done and are removed from the queue.
Type: Number
Default: 1
Maximum number of jobs to take from one batch request.
Removes jobs that match attrs from the queue. Returns a Promise.
Closes connections to MongoDB.
Default: default
Sets the name of the MongoDB collection that will be used to save and get jobs.
Sets the delay for jobs that are not scheduled. That is, every job added without the schedule option will be scheduled at Date() + delay.
If a job fails, delay is also used to set its new schedule to Date() + delay.
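The scheduling rule amounts to a single date calculation. A minimal sketch, assuming the delay is given in milliseconds; `availableAt` is a hypothetical helper, and the current time is passed in so the result is predictable.

```javascript
// Hypothetical helper: when does a job become available to take?
function availableAt(options, delay, now) {
  if (options && options.schedule instanceof Date) {
    return options.schedule; // an explicit schedule wins over the delay
  }
  return new Date(now.getTime() + delay);
}

const now = new Date(0); // fixed clock for the example
const delayed = availableAt({}, 5000, now);
const explicit = availableAt({schedule: new Date(42)}, 5000, now);
console.log(delayed.getTime(), explicit.getTime());
```

The same calculation applies to a failed job: its next attempt becomes available one delay after the failure.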
Default: Infinity
Sets the maximum number of tries for a job before the failed status is assigned.
The delay between tries is set by the delay(milliseconds) method.
Sets the sort order rule for take. Use this when you need to get jobs sorted by priority.
- 0 - new job that was just added to the queue
- 1 - taken job that was assigned to a takenBy worker
- 2 - failed job that has more attempts than allowed
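How the three status codes relate to the tries limit and the delay can be sketched as a pure function. This is an assumption-laden illustration rather than yajob's code: `STATUS`, `afterFailedAttempt`, and the `attempts` and `scheduledAt` field names are hypothetical, as is the choice to count the attempt at the moment it fails.

```javascript
// Status codes as listed above.
const STATUS = {NEW: 0, TAKEN: 1, FAILED: 2};

// Hypothetical helper: the next state of a job after one failed attempt.
function afterFailedAttempt(job, trys, delay, now) {
  const attempts = job.attempts + 1;
  if (attempts > trys) {
    // More attempts than allowed: the job is marked as failed.
    return Object.assign({}, job, {attempts: attempts, status: STATUS.FAILED});
  }
  // Otherwise the job goes back to the queue and is rescheduled after the delay.
  return Object.assign({}, job, {
    attempts: attempts,
    status: STATUS.NEW,
    scheduledAt: new Date(now.getTime() + delay)
  });
}

const clock = new Date(0);
const retried = afterFailedAttempt({attempts: 0, status: STATUS.TAKEN}, 2, 1000, clock);
const exhausted = afterFailedAttempt({attempts: 2, status: STATUS.TAKEN}, 2, 1000, clock);
console.log(retried.status, exhausted.status);
```

With the default limit of Infinity the first branch is never taken, so a job would be retried indefinitely.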
MIT © Vsevolod Strukchinsky