Skip to content

Commit

Permalink
feat: unique args in ordered jobs
Browse files Browse the repository at this point in the history
Closes SERVICES-577
  • Loading branch information
Korotaev Danil committed Oct 6, 2015
1 parent cd1b41d commit 272ede7
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 1 deletion.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,18 @@ class ResqueJobTest
end
```

При необходимости, можно добиться уникальности упорядоченных джобов, указав параметры в опции `unique`

```ruby
class UniqueOrderedJob
include Resque::Integration

unique { |company_id, param1| [company_id] }
ordered max_iterations: 10, unique: ->(_company_id, param1) { [param1] }
...
end
```

## Contributing

1. Fork it ( https://github.com/abak-press/resque-integration/fork )
Expand Down
1 change: 1 addition & 0 deletions lib/resque/integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def ordered(options = {})
extend Ordered

self.max_iterations = options.fetch(:max_iterations, 20)
self.uniqueness = Ordered::Uniqueness.new(&options[:unique]) if options.key?(:unique)
end
end
end # module Integration
Expand Down
51 changes: 50 additions & 1 deletion lib/resque/integration/ordered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@
# end
# end
#
# class UniqueTestJob
# include Resque::Integration
#
# unique { |company_id, param1| [company_id] }
# ordered max_iterations: 10, unique: ->(_company_id, param1) { [param1] }
# ...
# end
#
module Resque
module Integration
module Ordered
Expand All @@ -25,19 +33,58 @@ def self.extended(base)
base.extend ClassMethods

base.singleton_class.class_eval do
attr_accessor :max_iterations
attr_accessor :max_iterations, :uniqueness

alias_method_chain :enqueue, :ordered
end
end

class Uniqueness
def initialize(&block)
@unique_block = block
end

def key(meta_id)
"ordered:unique:#{meta_id}"
end

def remove(meta_id, args)
Resque.redis.hdel(key(meta_id), encoded_unique_args(args))
end

def size(meta_id)
Resque.redis.hlen(key(meta_id)).to_i
end

def encoded_unique_args(args)
Resque.encode(@unique_block.call(*args))
end

def ordered_meta_id(meta_id, args)
Resque.redis.hget(key(meta_id), encoded_unique_args(args))
end

def set(meta_id, args, ordered_meta_id)
unique_key = key(meta_id)

if Resque.redis.hset(unique_key, encoded_unique_args(args), ordered_meta_id)
Resque.redis.expire(unique_key, ARGS_EXPIRATION)
end
end
end

module ClassMethods
def enqueue_with_ordered(*args)
meta = enqueue_without_ordered(*args)

if uniqueness && ordered_meta_id = uniqueness.ordered_meta_id(meta.meta_id, args)
return get_meta(ordered_meta_id)
end

ordered_meta = Resque::Plugins::Meta::Metadata.new('meta_id' => ordered_meta_id(args), 'job_class' => self)
ordered_meta.save

uniqueness.set(meta.meta_id, args, ordered_meta.meta_id) if uniqueness
args.unshift(ordered_meta.meta_id)
encoded_args = Resque.encode(args)
args_key = ordered_queue_key(meta.meta_id)
Expand All @@ -61,6 +108,8 @@ def perform(meta_id, *)
rescue Exception
ordered_meta.fail!
raise
ensure
uniqueness.remove(meta_id, job_args) if uniqueness
end

ordered_meta.finish!
Expand Down
30 changes: 30 additions & 0 deletions spec/resque/integration/ordered_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ class TestJob
ordered max_iterations: 2
end

class UniqueTestJob
include Resque::Integration

unique { |company_id, param1| [company_id] }
ordered max_iterations: 2, unique: ->(_company_id, param1) { [param1] }
end

it "push args to separate queue" do
ordered_meta1 = TestJob.enqueue(1, 10)
ordered_meta2 = TestJob.enqueue(1, 20)
Expand Down Expand Up @@ -54,4 +61,27 @@ class TestJob
TestJob.perform(meta_id)
expect(TestJob.ordered_queue_size(meta_id)).to eq 2
end

context 'uniqueness' do
it 'perform with unique args only once' do
UniqueTestJob.enqueue(1, 10)
UniqueTestJob.enqueue(1, 20)
UniqueTestJob.enqueue(1, 10)

expect(UniqueTestJob).to receive(:execute).once.with(kind_of(Resque::Plugins::Meta::Metadata), 1, 10).ordered
expect(UniqueTestJob).to receive(:execute).once.with(kind_of(Resque::Plugins::Meta::Metadata), 1, 20).ordered
expect(UniqueTestJob).to_not receive(:continue)

meta_id = UniqueTestJob.meta_id(1, 10)
UniqueTestJob.perform(meta_id)
expect(UniqueTestJob.ordered_queue_size(meta_id)).to eq 0
expect(UniqueTestJob.uniqueness.size(meta_id)).to eq 0
end

it 'enqueue unique jobs with equal meta' do
meta = UniqueTestJob.enqueue(1, 10)
expect(meta.meta_id).to eq UniqueTestJob.enqueue(1, 10).meta_id
expect(meta.meta_id).to_not eq UniqueTestJob.enqueue(1, 20).meta_id
end
end
end

0 comments on commit 272ede7

Please sign in to comment.