Skip to content

Commit

Permalink
feature: independent meta for ordered jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
bibendi committed Sep 14, 2015
1 parent bc27294 commit 333cf82
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 16 deletions.
26 changes: 23 additions & 3 deletions lib/resque/integration/ordered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# unique { |company_id, param1| [company_id] }
# ordered max_iterations: 10
#
# def self.execute(company_id, param1)
# def self.execute(meta, company_id, param1)
# heavy_lifting_work
# end
# end
Expand All @@ -35,19 +35,35 @@ module ClassMethods
def enqueue_with_ordered(*args)
meta = enqueue_without_ordered(*args)

ordered_meta = Resque::Plugins::Meta::Metadata.new('meta_id' => ordered_meta_id(args), 'job_class' => self)
ordered_meta.save

args.unshift(ordered_meta.meta_id)
encoded_args = Resque.encode(args)
args_key = ordered_queue_key(meta.meta_id)

Resque.redis.rpush(args_key, encoded_args)
Resque.redis.expire(args_key, ARGS_EXPIRATION)

meta
ordered_meta
end

def perform(meta_id, *)
args_key = ordered_queue_key(meta_id)
i = 1
while job_args = Resque.redis.lpop(args_key)
execute(*Resque.decode(job_args))
job_args = Resque.decode(job_args)
ordered_meta = get_meta(job_args.shift)
ordered_meta.start!

begin
execute(ordered_meta, *job_args)
rescue Exception
ordered_meta.fail!
raise
end

ordered_meta.finish!

i += 1
return continue if max_iterations && i > max_iterations
Expand All @@ -61,6 +77,10 @@ def ordered_queue_size(meta_id)
def ordered_queue_key(meta_id)
"ordered:#{meta_id}"
end

def ordered_meta_id(args)
Digest::SHA1.hexdigest([Time.now.to_f, rand, self, args].join)
end
end
end
end
Expand Down
1 change: 1 addition & 0 deletions resque-integration.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ Gem::Specification.new do |gem|
gem.add_development_dependency 'rspec', '~> 2.14'
gem.add_development_dependency 'simplecov'
gem.add_development_dependency 'mock_redis'
gem.add_development_dependency 'pry-debugger'
end
36 changes: 23 additions & 13 deletions spec/resque/integration/ordered_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,43 @@ class TestJob
end

it "push args to separate queue" do
meta = TestJob.enqueue(1, 10)
TestJob.enqueue(1, 20)
ordered_meta1 = TestJob.enqueue(1, 10)
ordered_meta2 = TestJob.enqueue(1, 20)

meta_id = TestJob.meta_id(1, 10)
args_key = TestJob.ordered_queue_key(meta_id)

args_key = TestJob.ordered_queue_key(meta.meta_id)
expect(TestJob).to be_enqueued(1)
expect(TestJob.ordered_queue_size(meta.meta_id)).to eq 2
expect(TestJob.ordered_queue_size(meta_id)).to eq 2

job_args = Resque.decode(Resque.redis.lpop(args_key))
expect(job_args[0]).to eq ordered_meta1.meta_id

job_args = Resque.decode(Resque.redis.lpop(args_key))
expect(job_args[0]).to eq ordered_meta2.meta_id
end

it "execute jobs by each args" do
meta = TestJob.enqueue(1, 10)
TestJob.enqueue(1, 10)
TestJob.enqueue(1, 20)

expect(TestJob).to receive(:execute).with(1, 10).ordered
expect(TestJob).to receive(:execute).with(1, 20).ordered
expect(TestJob).to receive(:execute).with(kind_of(Resque::Plugins::Meta::Metadata), 1, 10).ordered
expect(TestJob).to receive(:execute).with(kind_of(Resque::Plugins::Meta::Metadata), 1, 20).ordered

TestJob.perform(meta.meta_id)
meta_id = TestJob.meta_id(1, 10)
TestJob.perform(meta_id)
end

it "reenqueue job after max iterations reached" do
meta = TestJob.enqueue(1, 10)
TestJob.enqueue(1, 10)
TestJob.enqueue(1, 20)
TestJob.enqueue(1, 30)

expect(TestJob).to receive(:execute).with(1, 10).ordered
expect(TestJob).to receive(:execute).with(1, 20).ordered
expect(TestJob).to_not receive(:execute).with(1, 30).ordered
expect(TestJob).to receive(:execute).with(kind_of(Resque::Plugins::Meta::Metadata), 1, 10).ordered
expect(TestJob).to receive(:execute).with(kind_of(Resque::Plugins::Meta::Metadata), 1, 20).ordered
expect(TestJob).to_not receive(:execute).with(kind_of(Resque::Plugins::Meta::Metadata), 1, 30).ordered

TestJob.perform(meta.meta_id)
meta_id = TestJob.meta_id(1, 10)
TestJob.perform(meta_id)
end
end
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# coding: utf-8
require 'bundler/setup'
require 'pry-debugger'
require 'rspec'
require 'resque'
require 'simplecov'
Expand Down

0 comments on commit 333cf82

Please sign in to comment.