node-taskman is a Node.js implementation of the taskman worker.
Taskman is a worker system built on a queue and a hash in Redis, so workers can be controlled directly from the Redis console or from other languages. It supports FIFO and LIFO processing, an end date for the worker, popping several items at once, and more.
var taskman = require("node-taskman"), driver, queue, worker;
driver = new taskman.driver.RedisDriver();
queue = new taskman.Queue("test_queue", driver);
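// The action receives the popped data and must call callback() to trigger the next tick.
worker = new taskman.Worker(queue, driver, "test_worker", function(data, callback){
  console.log(data);
  callback();
});
// Push one item, then start the worker once the push is done.
queue.rpush("Hello World", function(){
  worker.start();
});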
Via npm:
$ npm install node-taskman
As a submodule of your project:
$ git submodule add http://github.com/neoziro/node-taskman.git node-taskman
$ git submodule update --init
Create a Redis driver, currently the only driver supported by taskman.
Available options:
port: The Redis port, default is 6379.
host: The Redis host, default is 127.0.0.1.
db: The database, default is 0. The Redis driver performs a select during initialization.
queuePrefix: The queue prefix, default is queue. If you name your queue "my_queue", the complete name will be "queue:my_queue".
workerPrefix: The worker prefix, default is worker. If you name your worker "my_worker", the complete name will be "worker:my_worker".
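The prefix rule can be sketched with a tiny helper. `completeName` is a hypothetical function written for illustration only; it is not part of node-taskman and simply mirrors the "prefix:name" convention described in the options.

```javascript
// Hypothetical helper (not part of node-taskman): joins a prefix and a
// name the way the queuePrefix and workerPrefix options are described.
function completeName(prefix, name) {
  return prefix + ":" + name;
}

console.log(completeName("queue", "my_queue"));   // queue:my_queue
console.log(completeName("worker", "my_worker")); // worker:my_worker
```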
Create a new queue, with a name and a driver.
name: The name of the queue.
driver: The driver to use.
var taskman = require("node-taskman"), driver, queue;
driver = new taskman.driver.RedisDriver();
queue = new taskman.Queue("my_queue", driver);
Push data at the end of the list.
data: The data to push; only strings are supported.
callback(err, res): Called when the command is done.
Push data at the beginning of the list.
data: The data to push; only strings are supported.
callback(err, res): Called when the command is done.
Remove and return the specified number of elements from the end of the queue. If you use rpush to add data to the list, the behavior is LIFO. You can pass a number to get several items at once; pipelining is used for better performance.
number: The number of items to get.
callback(err, res): Called when the command is done.
Remove and return the specified number of elements from the beginning of the queue. If you use rpush to add data to the list, the behavior is FIFO. You can pass a number to get several items at once; pipelining is used for better performance.
number: The number of items to get.
callback(err, res): Called when the command is done.
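To see why the side you pop from decides the ordering, here is a minimal in-memory sketch. It uses a plain array as a stand-in for the Redis list; the `rpush`, `rpop` and `lpop` functions below are local illustrations, not the node-taskman queue methods, and no Redis connection is involved.

```javascript
// In-memory stand-in for a Redis list, for illustration only.
// Index 0 is the beginning of the list, the last index is the end.
function rpush(list, value) { list.push(value); }                  // add at the end
function rpop(list) { return list.length ? list.pop() : null; }    // remove from the end
function lpop(list) { return list.length ? list.shift() : null; }  // remove from the beginning

// rpush + rpop: the last item pushed comes out first (LIFO).
var lifo = [];
["a", "b", "c"].forEach(function (v) { rpush(lifo, v); });
var lifoOrder = [rpop(lifo), rpop(lifo), rpop(lifo)];

// rpush + lpop: the first item pushed comes out first (FIFO).
var fifo = [];
["a", "b", "c"].forEach(function (v) { rpush(fifo, v); });
var fifoOrder = [lpop(fifo), lpop(fifo), lpop(fifo)];

console.log(lifoOrder); // [ 'c', 'b', 'a' ]
console.log(fifoOrder); // [ 'a', 'b', 'c' ]
```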
Count the number of elements in the queue.
callback(err, res): Called when the command is done.
Create a worker that takes items from a queue and executes an action.
queue: The Queue that will be processed.
driver: The driver used to store information about the worker.
name: The name of the worker, used to identify it.
action(data, callback, worker): A function called when data is popped from the queue.
  data: An array of values taken from the queue. The number of values is always the same; empty values are filled with null.
  callback: The callback to call to trigger the next tick of the worker.
  worker: The current worker instance.
options: Options.
  type: FIFO or LIFO.
  loopSleepTime: The time in seconds to sleep between two ticks, default 0.
  pauseSleepTime: The time in seconds to sleep between each tick in pause mode, default 5.
  waitingTimeout: The time in seconds to wait when the queue is empty, default 1.
  dataPerTick: The number of items popped from the queue at each tick, default 1.
  expire: Expire time in seconds, default 17280000 (200 days).
  unique: A key can't be duplicated in the queue, default false.
  timeout: A timeout for the pop's action, default 300000 (5 minutes).
var taskman = require("node-taskman"), driver, queue, worker;
driver = new taskman.driver.RedisDriver();
queue = new taskman.Queue("test_queue", driver);
// With dataPerTick: 2, the action receives an array of two values at each tick.
worker = new taskman.Worker(queue, driver, "test_worker", function(data, callback){
  console.log(data);
  callback();
}, {dataPerTick: 2});
worker.start();
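Because the data array is padded with null when the queue holds fewer items than requested, an action should be ready to skip empty entries. The padding can be modeled as follows; `takePadded` is a hypothetical helper working on a plain array, written only to illustrate the shape of the data, and it is not how node-taskman is implemented.

```javascript
// Hypothetical model of one worker tick: take up to `count` items
// from the front of an array and pad the result with null.
function takePadded(items, count) {
  var taken = items.splice(0, count);
  while (taken.length < count) {
    taken.push(null);
  }
  return taken;
}

var pending = ["job-1", "job-2", "job-3"];
var firstTick = takePadded(pending, 2);
var secondTick = takePadded(pending, 2);

console.log(firstTick);  // [ 'job-1', 'job-2' ]
console.log(secondTick); // [ 'job-3', null ]
```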
Start the worker and execute the callback once it has started.
callback(err, res): Called when the command is done.
Stop the worker and execute the callback once it has stopped.
callback(err, res): Called when the command is done.
Pause the worker and execute the callback once it is paused.
callback(err, res): Called when the command is done.
Set an end date for the worker. When the end date is reached, the worker dies.
date: The date in JSON format (new Date().toJSON()).
callback(err, res): Called when the command is done.
Change the loop sleep time.
time: Time in seconds.
callback(err, res): Called when the command is done.
Change the pause sleep time.
time: Time in seconds.
callback(err, res): Called when the command is done.
Change the number of items returned at each tick.
number: Number of items per tick.
callback(err, res): Called when the command is done.
Change the waiting timeout of the worker.
timeout: Timeout in seconds.
callback(err, res): Called when the command is done.
Change the action executed by the worker.
action: Action executed by the worker.
callback(err, res): Called when the command is done.
Change the type of the worker.
type: Type of the process, LIFO or FIFO.
callback(err, res): Called when the command is done.
Get the complete name of the worker.
return: A string that is the complete name of the worker.
Get all information about the worker.
callback(err, res): Called when the command is done.
The module includes a script named taskman-caller.js. With this script you can spawn a worker that calls a simple command.
$ node bin/taskman-caller.js -w "my_worker" -q "my_queue" -a "echo \"##data##\"" -s --output
With this single line you start a worker that echoes the data pushed to the queue.
Options:
  -a, --action          The script to execute, with ##data## as JSON data [required]
  --timeout             The timeout of the action in milliseconds
  -w, --worker          The name of the worker [required]
  -o, --worker-options  Options of the worker in JSON
  -q, --queue           The name of the queue [required]
  -p, --port            The Redis port
  -h, --host            The Redis host
  --database            The Redis database
  -s, --simple          Simple mode: only one item is popped, without an array
  --output              Transfer stdout from the action
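The exact substitution and escaping rules of taskman-caller.js are not documented here, so the following is only a guess at the idea behind the ##data## placeholder. `buildCommand` is a hypothetical helper, and encoding the data with `JSON.stringify` is an assumption based on the "as JSON data" wording of the --action option.

```javascript
// Hypothetical sketch (not the actual taskman-caller.js code): replace
// every ##data## placeholder in an action template with the JSON
// encoding of the popped data.
function buildCommand(template, data) {
  return template.split("##data##").join(JSON.stringify(data));
}

// Simple mode (-s): a single value instead of an array.
var simple = buildCommand("echo ##data##", "Hello World");
// Default mode: the popped items as an array.
var batch = buildCommand("echo ##data##", ["Hello", "World"]);

console.log(simple); // echo "Hello World"
console.log(batch);  // echo ["Hello","World"]
```

A real implementation would also need to handle shell quoting before running the command, which this sketch deliberately leaves out.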
All worker information is available in the Redis database. For example:
var taskman = require("node-taskman"), driver, queue, worker;
driver = new taskman.driver.RedisDriver();
queue = new taskman.Queue("test_queue", driver);
worker = new taskman.Worker(queue, driver, "test_worker", function(data, callback){
  console.log(data);
  callback();
});
queue.rpush("Hello World", function(){
  worker.start();
});
Once this script is running, we can connect to Redis and run some commands:
redis-cli
redis 127.0.0.1:6379> hgetall worker:test_queue:test_worker
1) "waiting_timeout"
2) "1"
3) "loop_sleep"
4) "0"
5) "pause_sleep_time"
6) "5"
7) "data_per_tick"
8) "1"
9) "action"
10) "function (data, callback){\n console.log(data);\n callback();\n}"
11) "type"
12) "FIFO"
13) "language"
14) "nodejs"
15) "process_id"
16) "23985"
17) "status"
18) "WAITING"
19) "status_changedate"
20) "2012-06-05T09:27:02.693Z"
21) "cpt_action"
22) "1"
23) "start_date"
24) "2012-06-05T09:26:42.653Z"
25) "end_date"
26) "2012-06-05T14:14:42.654Z"
Here we can see some information:
language: The language that executes the worker.
process_id: The PID of the worker.
status: The current status of the worker.
status_changedate: The date of the last status change.
cpt_action: The number of ticks.
Our parameters are stored there too, and we can change them:
redis 127.0.0.1:6379> hset worker:test_queue:test_worker loop_sleep 20
(integer) 0
So you can control your worker in real time!
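Since the worker state is a plain Redis hash, any program can read it. At the protocol level, HGETALL replies with a flat list of alternating fields and values, as in the redis-cli output above (some client libraries convert it to an object for you). A minimal sketch of that conversion; `replyToObject` is a hypothetical helper and the sample reply is a shortened copy of the output above.

```javascript
// Convert a flat [field, value, field, value, ...] reply, as printed by
// redis-cli for HGETALL, into a plain object.
function replyToObject(reply) {
  var result = {};
  for (var i = 0; i + 1 < reply.length; i += 2) {
    result[reply[i]] = reply[i + 1];
  }
  return result;
}

// Shortened sample taken from the hgetall output above.
var reply = ["waiting_timeout", "1", "loop_sleep", "0", "type", "FIFO", "status", "WAITING"];
var info = replyToObject(reply);

console.log(info.status);             // WAITING
console.log(Number(info.loop_sleep)); // 0
```

Note that every value comes back as a string, so numeric parameters such as loop_sleep need an explicit conversion.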
Soon :)
Copyright (c) 2012 Bergé Greg
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Written and maintained by Greg Bergé.
An original idea by David Desbouis.
Built and used at Le Monde.fr.