Skip to content

Commit

Permalink
fix: proper implementation for allow dequeue job (#106)
Browse files Browse the repository at this point in the history
- Remove meta, meta should be only in unique job
- Save priority value in args for allowing dequeue job

https://jira.railsc.ru/browse/USERS-421
  • Loading branch information
artofhuman authored Jun 23, 2017
1 parent 2d6bcef commit 77168c9
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 36 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ test/version_tmp
tmp
.idea
.ruby-version
spec/internal/log/*.log
5 changes: 5 additions & 0 deletions lib/resque/integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ def unique?
false
end

# Public: job used priority queues
def priority?
false
end

# extend resque-retry
#
# options - Hash
Expand Down
1 change: 1 addition & 0 deletions lib/resque/integration/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class Engine < Rails::Engine

initializer "resque-integration.extensions" do
::Resque::Worker.send :include, ::Resque::Integration::Extensions::Worker
::Resque::Job.singleton_class.prepend(::Resque::Integration::Extensions::Job)
end
end # class Engine
end # module Resque::Integration
1 change: 1 addition & 0 deletions lib/resque/integration/extensions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module Resque
module Integration
module Extensions
autoload :Worker, "resque/integration/extensions/worker"
autoload :Job, "resque/integration/extensions/job"
end
end
end
17 changes: 17 additions & 0 deletions lib/resque/integration/extensions/job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module Resque
module Integration
module Extensions
# Public: extension for proper determine queue
# when destroy job with priority
module Job
def destroy(queue, klass, *args)
if klass.respond_to?(:priority?) && klass.priority?
queue = klass.priority_queue(args.last)
end

super
end
end
end
end
end
75 changes: 51 additions & 24 deletions lib/resque/integration/priority.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,56 +8,83 @@ module Integration
# include Resque::Integration::Priority
#
# queue :foo
#
# def self.execute(*args)
# meta = get_meta
#
# heavy_lifting_work do
# meta[:count] += 1
# end
# end
# end
#
# MyJob.enqueue(1, another_param: 2, queue_priority: high) # enqueue job to :foo_high queue
# MyJob.enqueue(1, another_param: 2, queue_priority: low) # enqueue job to :foo_low queue
# MyJob.enqueue_with_priority(:high, 1, another_param: 2) # enqueue job to :foo_high queue
# MyJob.enqueue_with_priority(:low, 1, another_param: 2) # enqueue job to :foo_low queue
#
# class MyUniqueJob
# include Resque::Integration
# include Resque::Integration::Priority
#
# queue :foo
# unique
#
# def self.execute(*args)
# meta = get_meta
#
# heavy_lifting_work do
# meta[:count] += 1
# end
# end
# end
module Priority
def self.included(base)
base.extend(Resque::Plugins::Meta)
base.extend(ClassMethods)
base.singleton_class.prepend(Enqueue)
end

module ClassMethods
def priority?
true
end
end

module Enqueue
# Public: enqueue job with normal priority
#
# Example:
# MyJob.enqueue(1)
def enqueue(*args)
priority = args.last.delete(:queue_priority) { :normal }.to_sym

priority_queue = priority == :normal ? queue : "#{queue}_#{priority}".to_sym
enqueue_with_priority(:normal, *args)
end

# Public: dequeue job with priority
#
# Example:
# MyJob.dequeue(:high, 1)
def dequeue(priority, *args)
if unique?
enqueue_to(priority_queue, *args)
super(*args, priority)
else
Resque::Plugins::Meta::Metadata.new('meta_id' => meta_id(args), 'job_class' => to_s).tap do |meta|
meta.save
Resque.enqueue_to(priority_queue, self, meta.meta_id, *args)
end
Resque.dequeue(self, *args, priority)
end
end

def perform(meta_id, *args)
@meta_id = meta_id
# Public: enqueue job to priority queue
#
# Example:
# MyJob.enqueue_with_priority(:high, 1)
def enqueue_with_priority(priority, *args)
queue = priority_queue(priority)

if unique?
enqueue_to(queue, *args, priority)
else
Resque.enqueue_to(queue, self, *args, priority)
end
end

execute(*args)
def priority_queue(priority)
priority.to_sym == :normal ? queue : "#{queue}_#{priority}".to_sym
end

def meta
get_meta(@meta_id)
def perform(*args, _priority)
if unique?
super(*args)
else
execute(*args)
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/resque/integration/unique.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def lock_on(&block)
# LockID should be independent from MetaID
# @api private
def lock(meta_id, *args)
args = [*args[0..-2], args.last.with_indifferent_access] if args.last.is_a?(Hash)
args = args.map { |i| i.is_a?(Hash) ? i.with_indifferent_access : i }

"lock:#{name}-#{Digest::SHA1.hexdigest(obj_to_string(lock_on[*args]))}"
end
Expand Down
1 change: 1 addition & 0 deletions resque-integration.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ Gem::Specification.new do |gem|
gem.add_development_dependency 'simplecov'
gem.add_development_dependency 'mock_redis'
gem.add_development_dependency 'timecop'
gem.add_development_dependency 'combustion'
gem.add_development_dependency 'pry-byebug'
end
42 changes: 31 additions & 11 deletions spec/resque/integration/priority_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ class UniqueJobWithPriority
include Resque::Integration::Priority

queue :foo
unique
unique do |id, params|
[id, params["param"]]
end

def self.execute(id, params)
end
end

describe '#enqueue' do
it 'enqueue to priority queue' do
meta = JobWithPriority.enqueue(1, param: 'one', queue_priority: :high)

expect(meta.meta_id).to be
JobWithPriority.enqueue_with_priority(:high, 1, param: 'one')

expect(Resque.size(:foo)).to eq(0)
expect(Resque.size(:foo_low)).to eq(0)
Expand All @@ -42,8 +42,8 @@ def self.execute(id, params)
end

it 'enqueue only one job' do
meta1 = UniqueJobWithPriority.enqueue(1, param: 'one', queue_priority: :high)
meta2 = UniqueJobWithPriority.enqueue(1, param: 'one', queue_priority: :high)
meta1 = UniqueJobWithPriority.enqueue_with_priority(:high, 1, param: 'one')
meta2 = UniqueJobWithPriority.enqueue_with_priority(:high, 1, param: 'one')

expect(meta1.meta_id).to eq(meta2.meta_id)

Expand All @@ -53,22 +53,42 @@ def self.execute(id, params)
end
end

describe '#dequeue' do
it 'dequeue simple job with high priority' do
JobWithPriority.enqueue_with_priority(:high, 1, param: 'one')
JobWithPriority.enqueue_with_priority(:high, 2, param: 'two')
expect(Resque.size(:foo_high)).to eq(2)

JobWithPriority.dequeue(:high, 1, param: 'one')
expect(Resque.size(:foo_high)).to eq(1)
end

it 'dequeue unique job with high priority' do
UniqueJobWithPriority.enqueue_with_priority(:high, 1, param: 'one')
UniqueJobWithPriority.enqueue_with_priority(:high, 2, param: 'two')
expect(Resque.size(:foo_high)).to eq(2)

UniqueJobWithPriority.dequeue(:high, 1, param: 'one')
expect(Resque.size(:foo_high)).to eq(1)
end
end

describe '#perform' do
include_context 'resque inline'

it 'executes job' do
expect(JobWithPriority).to receive(:execute).with(1, 'param' => 'one').once.and_call_original
expect(JobWithPriority).to receive(:execute).with(2, 'param' => 'two').once.and_call_original

JobWithPriority.enqueue(1, param: 'one', queue_priority: :high)
JobWithPriority.enqueue(2, param: 'two', queue_priority: :low)
JobWithPriority.enqueue_with_priority(:high, 1, param: 'one')
JobWithPriority.enqueue_with_priority(:high, 2, param: 'two')
end

it 'executes job' do
expect(UniqueJobWithPriority).to receive(:execute).with(1, 'param' => 'one').twice.and_call_original
expect(UniqueJobWithPriority).to receive(:execute).twice.and_call_original

UniqueJobWithPriority.enqueue(1, param: 'one', queue_priority: :high)
UniqueJobWithPriority.enqueue(1, param: 'one', queue_priority: :high)
UniqueJobWithPriority.enqueue_with_priority(:high, 1, param: 'one')
UniqueJobWithPriority.enqueue_with_priority(:high, 1, param: 'one')
end
end
end
4 changes: 4 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
require 'simplecov'
require 'mock_redis'
require 'timecop'
require 'pry-byebug'
require 'combustion'

Resque.redis = MockRedis.new

SimpleCov.start

require 'resque/integration'

Combustion.initialize!

Dir["./spec/shared/**/*.rb"].each(&method(:require))

RSpec.configure do |config|
Expand Down

0 comments on commit 77168c9

Please sign in to comment.