diff --git a/lib/resque/integration/ordered.rb b/lib/resque/integration/ordered.rb index df84bb6..aabc172 100644 --- a/lib/resque/integration/ordered.rb +++ b/lib/resque/integration/ordered.rb @@ -11,7 +11,7 @@ # unique { |company_id, param1| [company_id] } # ordered max_iterations: 10 # -# def self.execute(company_id, param1) +# def self.execute(meta, company_id, param1) # heavy_lifting_work # end # end @@ -35,19 +35,35 @@ module ClassMethods def enqueue_with_ordered(*args) meta = enqueue_without_ordered(*args) + ordered_meta = Resque::Plugins::Meta::Metadata.new('meta_id' => ordered_meta_id(args), 'job_class' => self) + ordered_meta.save + + args.unshift(ordered_meta.meta_id) encoded_args = Resque.encode(args) args_key = ordered_queue_key(meta.meta_id) + Resque.redis.rpush(args_key, encoded_args) Resque.redis.expire(args_key, ARGS_EXPIRATION) - meta + ordered_meta end def perform(meta_id, *) args_key = ordered_queue_key(meta_id) i = 1 while job_args = Resque.redis.lpop(args_key) - execute(*Resque.decode(job_args)) + job_args = Resque.decode(job_args) + ordered_meta = get_meta(job_args.shift) + ordered_meta.start! + + begin + execute(ordered_meta, *job_args) + rescue Exception + ordered_meta.fail! + raise + end + + ordered_meta.finish! i += 1 return continue if max_iterations && i > max_iterations @@ -61,6 +77,10 @@ def ordered_queue_size(meta_id) def ordered_queue_key(meta_id) "ordered:#{meta_id}" end + + def ordered_meta_id(args) + Digest::SHA1.hexdigest([Time.now.to_f, rand, self, args].join) + end end end end diff --git a/resque-integration.gemspec b/resque-integration.gemspec index d168bba..5837481 100644 --- a/resque-integration.gemspec +++ b/resque-integration.gemspec @@ -38,4 +38,5 @@ Gem::Specification.new do |gem| gem.add_development_dependency 'rspec', '~> 2.14' gem.add_development_dependency 'simplecov' gem.add_development_dependency 'mock_redis' + gem.add_development_dependency 'pry-debugger' end diff --git a/spec/resque/integration/ordered_spec.rb b/spec/resque/integration/ordered_spec.rb index ad16992..1145f0a 100644 --- a/spec/resque/integration/ordered_spec.rb +++ b/spec/resque/integration/ordered_spec.rb @@ -9,33 +9,43 @@ class TestJob end it "push args to separate queue" do - meta = TestJob.enqueue(1, 10) - TestJob.enqueue(1, 20) + ordered_meta1 = TestJob.enqueue(1, 10) + ordered_meta2 = TestJob.enqueue(1, 20) + + meta_id = TestJob.meta_id(1, 10) + args_key = TestJob.ordered_queue_key(meta_id) - args_key = TestJob.ordered_queue_key(meta.meta_id) expect(TestJob).to be_enqueued(1) - expect(TestJob.ordered_queue_size(meta.meta_id)).to eq 2 + expect(TestJob.ordered_queue_size(meta_id)).to eq 2 + + job_args = Resque.decode(Resque.redis.lpop(args_key)) + expect(job_args[0]).to eq ordered_meta1.meta_id + + job_args = Resque.decode(Resque.redis.lpop(args_key)) + expect(job_args[0]).to eq ordered_meta2.meta_id end it "execute jobs by each args" do - meta = TestJob.enqueue(1, 10) + TestJob.enqueue(1, 10) TestJob.enqueue(1, 20) - expect(TestJob).to receive(:execute).with(1, 10).ordered - expect(TestJob).to receive(:execute).with(1, 20).ordered + expect(TestJob).to receive(:execute).with(kind_of(Resque::Plugins::Meta::Metadata), 1, 10).ordered + expect(TestJob).to receive(:execute).with(kind_of(Resque::Plugins::Meta::Metadata), 1, 20).ordered - TestJob.perform(meta.meta_id) + meta_id = TestJob.meta_id(1, 10) + TestJob.perform(meta_id) end it "reenqueue job after max iterations reached" do - meta = TestJob.enqueue(1, 10) + TestJob.enqueue(1, 10) TestJob.enqueue(1, 20) TestJob.enqueue(1, 30) - expect(TestJob).to receive(:execute).with(1, 10).ordered - expect(TestJob).to receive(:execute).with(1, 20).ordered - expect(TestJob).to_not receive(:execute).with(1, 30).ordered + expect(TestJob).to receive(:execute).with(kind_of(Resque::Plugins::Meta::Metadata), 1, 10).ordered + expect(TestJob).to receive(:execute).with(kind_of(Resque::Plugins::Meta::Metadata), 1, 20).ordered + expect(TestJob).to_not receive(:execute).with(kind_of(Resque::Plugins::Meta::Metadata), 1, 30).ordered - TestJob.perform(meta.meta_id) + meta_id = TestJob.meta_id(1, 10) + TestJob.perform(meta_id) end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 14731f6..38311be 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,5 +1,6 @@ # coding: utf-8 require 'bundler/setup' +require 'pry-debugger' require 'rspec' require 'resque' require 'simplecov'