From 1daa167af63888442d09c20463fda9df512f6a6e Mon Sep 17 00:00:00 2001 From: Michail Merkushin Date: Thu, 10 Sep 2015 16:09:34 +0500 Subject: [PATCH] feature: add ordered jobs https://jira.railsc.ru/browse/PC4-15507 --- lib/resque/integration.rb | 12 ++++- lib/resque/integration/ordered.rb | 67 +++++++++++++++++++++++++ spec/resque/integration/ordered_spec.rb | 41 +++++++++++++++ spec/spec_helper.rb | 8 ++- 4 files changed, 126 insertions(+), 2 deletions(-) create mode 100644 lib/resque/integration/ordered.rb create mode 100644 spec/resque/integration/ordered_spec.rb diff --git a/lib/resque/integration.rb b/lib/resque/integration.rb index 08b91f3..b42a6ad 100644 --- a/lib/resque/integration.rb +++ b/lib/resque/integration.rb @@ -51,6 +51,7 @@ module Integration autoload :Configuration, 'resque/integration/configuration' autoload :Continuous, 'resque/integration/continuous' autoload :Unique, 'resque/integration/unique' + autoload :Ordered, 'resque/integration/ordered' autoload :LogsRotator, 'resque/integration/logs_rotator' extend ActiveSupport::Concern @@ -72,7 +73,7 @@ def queue(name = nil) end # Mark Job as unique and set given +callback+ or +block+ as Unique Arguments procedure - def unique(callback=nil, &block) + def unique(callback = nil, &block) extend Unique lock_on(&(callback || block)) @@ -104,6 +105,15 @@ def retrys(options = {}) @retry_limit = options.fetch(:limit, 2) @retry_delay = options.fetch(:delay, 60) end + + # Mark Job as ordered + def ordered(options = {}) + unique unless unique? + continuous + extend Ordered + + self.max_iterations = options.fetch(:max_iterations, 20) + end end end # module Integration end # module Resque diff --git a/lib/resque/integration/ordered.rb b/lib/resque/integration/ordered.rb new file mode 100644 index 0000000..df84bb6 --- /dev/null +++ b/lib/resque/integration/ordered.rb @@ -0,0 +1,67 @@ +# Ordered Job +# +# Ensures that only one job for a given queue +# will be running on any worker at a given time +# +# Examples: +# +# class TestJob +# include Resque::Integration +# +# unique { |company_id, param1| [company_id] } +# ordered max_iterations: 10 +# +# def self.execute(company_id, param1) +# heavy_lifting_work +# end +# end +# +module Resque + module Integration + module Ordered + ARGS_EXPIRATION = 1.week + + def self.extended(base) + base.extend ClassMethods + + base.singleton_class.class_eval do + attr_accessor :max_iterations + + alias_method_chain :enqueue, :ordered + end + end + + module ClassMethods + def enqueue_with_ordered(*args) + meta = enqueue_without_ordered(*args) + + encoded_args = Resque.encode(args) + args_key = ordered_queue_key(meta.meta_id) + Resque.redis.rpush(args_key, encoded_args) + Resque.redis.expire(args_key, ARGS_EXPIRATION) + + meta + end + + def perform(meta_id, *) + args_key = ordered_queue_key(meta_id) + i = 1 + while job_args = Resque.redis.lpop(args_key) + execute(*Resque.decode(job_args)) + + i += 1 + return continue if max_iterations && i > max_iterations + end + end + + def ordered_queue_size(meta_id) + Resque.redis.llen(ordered_queue_key(meta_id)).to_i + end + + def ordered_queue_key(meta_id) + "ordered:#{meta_id}" + end + end + end + end +end diff --git a/spec/resque/integration/ordered_spec.rb b/spec/resque/integration/ordered_spec.rb new file mode 100644 index 0000000..ad16992 --- /dev/null +++ b/spec/resque/integration/ordered_spec.rb @@ -0,0 +1,41 @@ +require "spec_helper" + +describe Resque::Integration::Ordered do + class TestJob + include Resque::Integration + + unique { |company_id, param1| [company_id] } + ordered max_iterations: 2 + end + + it "push args to separate queue" do + meta = TestJob.enqueue(1, 10) + TestJob.enqueue(1, 20) + + args_key = TestJob.ordered_queue_key(meta.meta_id) + expect(TestJob).to be_enqueued(1) + expect(TestJob.ordered_queue_size(meta.meta_id)).to eq 2 + end + + it "execute jobs by each args" do + meta = TestJob.enqueue(1, 10) + TestJob.enqueue(1, 20) + + expect(TestJob).to receive(:execute).with(1, 10).ordered + expect(TestJob).to receive(:execute).with(1, 20).ordered + + TestJob.perform(meta.meta_id) + end + + it "reenqueue job after max iterations reached" do + meta = TestJob.enqueue(1, 10) + TestJob.enqueue(1, 20) + TestJob.enqueue(1, 30) + + expect(TestJob).to receive(:execute).with(1, 10).ordered + expect(TestJob).to receive(:execute).with(1, 20).ordered + expect(TestJob).to_not receive(:execute).with(1, 30).ordered + + TestJob.perform(meta.meta_id) + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 8d5d6b4..14731f6 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -9,4 +9,10 @@ SimpleCov.start -require 'resque/integration' \ No newline at end of file +require 'resque/integration' + +RSpec.configure do |config| + config.before do + Resque.redis.flushdb + end +end