diff --git a/README.md b/README.md index bf52c90..4b63bec 100644 --- a/README.md +++ b/README.md @@ -260,6 +260,18 @@ class ResqueJobTest end ``` +При необходимости, можно добиться уникальности упорядоченных джобов, указав параметры в опции `unique` + +```ruby +class UniqueOrderedJob + include Resque::Integration + + unique { |company_id, param1| [company_id] } + ordered max_iterations: 10, unique: ->(_company_id, param1) { [param1] } + ... +end +``` + ## Contributing 1. Fork it ( https://github.com/abak-press/resque-integration/fork ) diff --git a/lib/resque/integration.rb b/lib/resque/integration.rb index 181cf8e..e7c0775 100644 --- a/lib/resque/integration.rb +++ b/lib/resque/integration.rb @@ -113,6 +113,7 @@ def ordered(options = {}) extend Ordered self.max_iterations = options.fetch(:max_iterations, 20) + self.uniqueness = Ordered::Uniqueness.new(&options[:unique]) if options.key?(:unique) end end end # module Integration diff --git a/lib/resque/integration/ordered.rb b/lib/resque/integration/ordered.rb index 62f05f6..92c8ef8 100644 --- a/lib/resque/integration/ordered.rb +++ b/lib/resque/integration/ordered.rb @@ -16,6 +16,14 @@ # end # end # +# class UniqueTestJob +# include Resque::Integration +# +# unique { |company_id, param1| [company_id] } +# ordered max_iterations: 10, unique: ->(_company_id, param1) { [param1] } +# ... +# end +# module Resque module Integration module Ordered @@ -25,19 +33,58 @@ def self.extended(base) base.extend ClassMethods base.singleton_class.class_eval do - attr_accessor :max_iterations + attr_accessor :max_iterations, :uniqueness alias_method_chain :enqueue, :ordered end end + class Uniqueness + def initialize(&block) + @unique_block = block + end + + def key(meta_id) + "ordered:unique:#{meta_id}" + end + + def remove(meta_id, args) + Resque.redis.hdel(key(meta_id), encoded_unique_args(args)) + end + + def size(meta_id) + Resque.redis.hlen(key(meta_id)).to_i + end + + def encoded_unique_args(args) + Resque.encode(@unique_block.call(*args)) + end + + def ordered_meta_id(meta_id, args) + Resque.redis.hget(key(meta_id), encoded_unique_args(args)) + end + + def set(meta_id, args, ordered_meta_id) + unique_key = key(meta_id) + + if Resque.redis.hset(unique_key, encoded_unique_args(args), ordered_meta_id) + Resque.redis.expire(unique_key, ARGS_EXPIRATION) + end + end + end + module ClassMethods def enqueue_with_ordered(*args) meta = enqueue_without_ordered(*args) + if uniqueness && ordered_meta_id = uniqueness.ordered_meta_id(meta.meta_id, args) + return get_meta(ordered_meta_id) + end + ordered_meta = Resque::Plugins::Meta::Metadata.new('meta_id' => ordered_meta_id(args), 'job_class' => self) ordered_meta.save + uniqueness.set(meta.meta_id, args, ordered_meta.meta_id) if uniqueness args.unshift(ordered_meta.meta_id) encoded_args = Resque.encode(args) args_key = ordered_queue_key(meta.meta_id) @@ -61,6 +108,8 @@ def perform(meta_id, *) rescue Exception ordered_meta.fail! raise + ensure + uniqueness.remove(meta_id, job_args) if uniqueness end ordered_meta.finish! diff --git a/spec/resque/integration/ordered_spec.rb b/spec/resque/integration/ordered_spec.rb index c7a0c1a..6265a93 100644 --- a/spec/resque/integration/ordered_spec.rb +++ b/spec/resque/integration/ordered_spec.rb @@ -8,6 +8,13 @@ class TestJob ordered max_iterations: 2 end + class UniqueTestJob + include Resque::Integration + + unique { |company_id, param1| [company_id] } + ordered max_iterations: 2, unique: ->(_company_id, param1) { [param1] } + end + it "push args to separate queue" do ordered_meta1 = TestJob.enqueue(1, 10) ordered_meta2 = TestJob.enqueue(1, 20) @@ -54,4 +61,27 @@ class TestJob TestJob.perform(meta_id) expect(TestJob.ordered_queue_size(meta_id)).to eq 2 end + + context 'uniqueness' do + it 'perform with unique args only once' do + UniqueTestJob.enqueue(1, 10) + UniqueTestJob.enqueue(1, 20) + UniqueTestJob.enqueue(1, 10) + + expect(UniqueTestJob).to receive(:execute).once.with(kind_of(Resque::Plugins::Meta::Metadata), 1, 10).ordered + expect(UniqueTestJob).to receive(:execute).once.with(kind_of(Resque::Plugins::Meta::Metadata), 1, 20).ordered + expect(UniqueTestJob).to_not receive(:continue) + + meta_id = UniqueTestJob.meta_id(1, 10) + UniqueTestJob.perform(meta_id) + expect(UniqueTestJob.ordered_queue_size(meta_id)).to eq 0 + expect(UniqueTestJob.uniqueness.size(meta_id)).to eq 0 + end + + it 'enqueue unique jobs with equal meta' do + meta = UniqueTestJob.enqueue(1, 10) + expect(meta.meta_id).to eq UniqueTestJob.enqueue(1, 10).meta_id + expect(meta.meta_id).to_not eq UniqueTestJob.enqueue(1, 20).meta_id + end + end end