An extensible, thread-safe job/message queueing system that uses MongoDB as the persistent storage engine.
- Fully Configurable
- Distributed
- Atomic Locking
- Thread Safe
- Priority Support
- Worker Timeout Support
- Fully Tested
- Production Ready
- Simple API
When instantiating Mongo::Queue, several options control how it behaves. You can configure where the data is stored, how long a message can remain locked, how many times a job will be attempted, and more. The default configuration options are shown below and can be adjusted to your taste.
    db = Mongo::Connection.new
    options = {
      :database   => 'mongo_queue',
      :collection => 'mongo_queue',
      :timeout    => 300, # number of seconds that an item will remain locked
      :attempts   => 3    # number of attempts to lock and complete the item before ignoring it
    }
    queue = Mongo::Queue.new(db, options)
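As a rough illustration of how overrides and defaults might combine, the sketch below merges a partial options hash over the default values documented above. `DEFAULT_CONFIG` and `build_config` are hypothetical names used only for this example; the library's own internals may differ.

```ruby
# Hypothetical defaults, copied from the values documented above.
DEFAULT_CONFIG = {
  :database   => 'mongo_queue',
  :collection => 'mongo_queue',
  :timeout    => 300, # seconds an item stays locked
  :attempts   => 3    # tries before the item is ignored
}.freeze

# Return a new hash in which caller-supplied options override the defaults.
# (Illustrative helper, not part of Mongo::Queue.)
def build_config(options = {})
  DEFAULT_CONFIG.merge(options)
end

config = build_config(:timeout => 60, :collection => 'emails')
puts config.inspect
```

With this approach only the keys you pass are changed, so a short options hash is enough when most defaults already suit you.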
A job is inserted as a hash into the document store. Mongo::Queue reserves several keys for its own bookkeeping, so avoid using the following fields for your own data in queue messages:
- locked_at
- locked_by
- attempts
- timeout
- time
- priority
- last_error
    job = {
      :site     => 'http://example.com/',
      :foo      => 'More Stuff',
      :priority => 10
    }
    queue.insert(job)
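One way to catch accidental clashes before inserting is to compare a job's keys against the reserved list. The sketch below is a hypothetical helper, not part of Mongo::Queue. Because the insert example above sets :priority itself, the helper assumes that key is allowed by default; pass an empty `allow` list to check it as well.

```ruby
# Keys the text above lists as reserved for the queue's own bookkeeping.
RESERVED_KEYS = [:locked_at, :locked_by, :attempts, :timeout,
                 :time, :priority, :last_error].freeze

# Return the reserved keys present in a job hash, ignoring any in `allow`.
# Both this helper and the `allow` default are assumptions for illustration.
def reserved_keys_in(job, allow = [:priority])
  job.keys.map(&:to_sym) & (RESERVED_KEYS - allow)
end

job = { :site => 'http://example.com/', :priority => 10, :attempts => 99 }
clashes = reserved_keys_in(job)
warn "job uses reserved keys: #{clashes.inspect}" unless clashes.empty?
```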
Generally, when using a message queue, you will want to iterate over job items until all jobs have been completed. This can be done within a loop, and can easily be threaded. Make sure that you generate a unique string which identifies the worker throughout its lifetime.
    require 'digest/md5'
    require 'socket' # needed for Socket.gethostname
    require 'rubygems'
    require 'mongo'
    require 'mongo_queue'

    db = Mongo::Connection.new('localhost')
    queue = Mongo::Queue.new(db)

    # unique identifier for this worker
    process_id = Digest::MD5.hexdigest("#{Socket.gethostname}-#{Process.pid}-#{Thread.current}")

    queue.cleanup! # remove expired locks

    while (doc = queue.lock_next(process_id))
      begin
        do_some_work(doc)
        queue.complete(doc, process_id)
        sleep(1)
      rescue StandardError => e
        queue.error(doc, e.message)
      end
    end
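The threaded variant mentioned above could look roughly like the following sketch. So that it runs without a MongoDB server, it uses `InMemoryQueue`, a hypothetical stand-in that models only the `lock_next`, `complete` and `error` calls from the worker loop; it is not the library's implementation. `worker_id` and `run_workers` are likewise illustrative names, and doubling `job[:n]` stands in for real work.

```ruby
require 'digest/md5'
require 'socket'

# Hypothetical in-memory stand-in for Mongo::Queue (assumption for this sketch).
class InMemoryQueue
  def initialize(jobs)
    @jobs  = jobs.map { |job| job.merge(:locked_by => nil, :done => false) }
    @mutex = Mutex.new
  end

  # Hand the next unlocked, unfinished job to exactly one worker.
  def lock_next(worker_id)
    @mutex.synchronize do
      job = @jobs.find { |j| j[:locked_by].nil? && !j[:done] }
      job[:locked_by] = worker_id if job
      job
    end
  end

  # Mark a job finished, but only for the worker that holds the lock.
  def complete(job, worker_id)
    @mutex.synchronize { job[:done] = true if job[:locked_by] == worker_id }
  end

  # Record the failure; the job stays locked, so this sketch never retries it.
  def error(job, message)
    @mutex.synchronize { job[:last_error] = message }
  end
end

# One identifier per thread: host, process and thread together.
def worker_id
  Digest::MD5.hexdigest("#{Socket.gethostname}-#{Process.pid}-#{Thread.current.object_id}")
end

# Start `count` worker threads and collect [worker_id, result] pairs.
def run_workers(queue, count)
  results = Queue.new # Ruby's built-in thread-safe queue, used as a collector
  threads = Array.new(count) do
    Thread.new do
      id = worker_id
      while (job = queue.lock_next(id))
        begin
          results << [id, job[:n] * 2] # placeholder for real work
          queue.complete(job, id)
        rescue StandardError => e
          queue.error(job, e.message)
        end
      end
    end
  end
  threads.each(&:join)
  Array.new(results.size) { results.pop }
end

queue = InMemoryQueue.new((1..10).map { |n| { :n => n } })
results = run_workers(queue, 3)
puts "processed #{results.size} jobs"
```

Each thread computes its own identifier, so two threads in the same process never share a lock owner.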
There is an additional stat method available on the queue object which provides a hash with some information that may be useful when reviewing the current contents of the queue.

    queue.stat # => {:locked => 34, :available => 34536, :errors => 0, :total => 34570}
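To make the four counters concrete, the sketch below computes a summary of the same shape from a plain array of job hashes. The classification rules are assumptions made for illustration (they are not confirmed by the text above), and `summarize` is a hypothetical helper rather than the library's method.

```ruby
# Summarise job hashes in the same shape as the stat output above.
# Assumed rules, for illustration only:
#   locked    - the job has a :locked_by value
#   errors    - the job has used up max_attempts
#   available - the job is unlocked and still under max_attempts
def summarize(jobs, max_attempts = 3)
  locked    = jobs.count { |j| !j[:locked_by].nil? }
  errors    = jobs.count { |j| j[:attempts].to_i >= max_attempts }
  available = jobs.count { |j| j[:locked_by].nil? && j[:attempts].to_i < max_attempts }
  { :locked => locked, :available => available, :errors => errors, :total => jobs.size }
end

jobs = [
  { :site => 'a', :locked_by => 'worker-1', :attempts => 1 },
  { :site => 'b', :locked_by => nil,        :attempts => 0 },
  { :site => 'c', :locked_by => nil,        :attempts => 3 }
]
puts summarize(jobs).inspect
```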
Copyright © 2010 Josh Martin. See LICENSE for details.