Skip to content

Commit

Permalink
Merge pull request #65 from bibendi/master
Browse files Browse the repository at this point in the history
feature: add shuffled queues on worker
  • Loading branch information
bibendi committed Sep 15, 2015
2 parents f3ac05d + 3ca76ed commit 1d58f9b
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 11 deletions.
30 changes: 21 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ $ rails generate resque:integration:install
class ResqueJobTest
include Resque::Integration

# это название очереди, в которой будет выполняться джою
# это название очереди, в которой будет выполняться джоб
queue :my_queue

# с помощью unique можно указать, что задача является уникальной, и какие аргументы определяют уникальность задачи.
Expand Down Expand Up @@ -125,7 +125,9 @@ workers:
stop_timeout: 5 # максимальное время, отпущенное воркеру для остановки/рестарта
env: # переменные окружение, специфичные для данного воркера
RUBY_HEAP_SLOTS_GROWTH_FACTOR: 0.5
'companies,images: 2 # совмещённая очередь, приоритет будет у companies
'companies,images': 2 # совмещённая очередь, приоритет будет у companies
'xls,yml':
shuffle: true # совмещённая очередь, приоритета не будет

# конфигурация failure-бэкэндов
failure:
Expand Down Expand Up @@ -225,7 +227,7 @@ Resque.enqueue(ImageProcessingJob, id=2)
```ruby
class ResqueJobTest
include Resque::Integration
retrys delay: 10, limit: 2
unique
end
Expand All @@ -240,13 +242,23 @@ workers:
jobs_per_fork: 10
```

## Gem Releasing
## Resque Ordered

Уникальный по каким-то параметрам джоб может выполняться в одно и тоже время только на одном из воркеров

```ruby
class ResqueJobTest
include Resque::Integration
unique { |company_id, param1| [company_id] }
ordered max_iterations: 20 # max_iterations - сколько раз запустится метод `execute` с аргументами в очереди,
# прежде чем джоб перепоставится

1. должен быть настроен git remote upstream и должны быть права на push
1. git checkout master
2. git pull upstream master
3. правим версию гема в файле VERSION в корне гема. (читаем правила версионирования http://semver.org/)
4. bundle exec rake release
def self.execute(ordered_meta, company_id, param1)
heavy_lifting_work
end
end
```

## Contributing

Expand Down
4 changes: 3 additions & 1 deletion lib/resque/integration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
require 'active_support/concern'

require 'resque/integration/hooks'
require 'resque/integration/engine'

require 'resque/scheduler'
require 'resque/scheduler/tasks'
Expand Down Expand Up @@ -53,6 +52,7 @@ module Integration
autoload :Unique, 'resque/integration/unique'
autoload :Ordered, 'resque/integration/ordered'
autoload :LogsRotator, 'resque/integration/logs_rotator'
autoload :Extensions, 'resque/integration/extensions'

extend ActiveSupport::Concern

Expand Down Expand Up @@ -117,3 +117,5 @@ def ordered(options = {})
end
end # module Integration
end # module Resque

require 'resque/integration/engine'
1 change: 1 addition & 0 deletions lib/resque/integration/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def env
env[:QUEUE] ||= queue
env[:JOBS_PER_FORK] ||= jobs_per_fork if jobs_per_fork
env[:MINUTES_PER_FORK] ||= minutes_per_fork if minutes_per_fork
env[:SHUFFLE] ||= 1 if shuffle

Hash[env.map { |k, v| [k, v.to_s] }]
end
Expand Down
4 changes: 4 additions & 0 deletions lib/resque/integration/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,9 @@ class Engine < Rails::Engine
end
end
end

initializer "resque-integration.extensions" do
::Resque::Worker.send :include, ::Resque::Integration::Extensions::Worker
end
end # class Engine
end # module Resque::Integration
7 changes: 7 additions & 0 deletions lib/resque/integration/extensions.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module Resque
module Integration
module Extensions
autoload :Worker, "resque/integration/extensions/worker"
end
end
end
23 changes: 23 additions & 0 deletions lib/resque/integration/extensions/worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
module Resque
module Integration
module Extensions
module Worker
extend ActiveSupport::Concern

included do
alias_method_chain :queues, :shuffle
end

def queues_with_shuffle
queues = queues_without_shuffle
shuffle? ? queues.shuffle : queues
end

def shuffle?
return @shuffle if defined?(@shuffle)
@shuffle = ENV.key?('SHUFFLE')
end
end
end
end
end
10 changes: 9 additions & 1 deletion spec/resque/integration/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,25 @@

describe '#env' do
let :config do
described_class::new(:default,
described_class.new(:default,
:count => 2,
:jobs_per_fork => 10,
:minutes_per_fork => 5,
:shuffle => true,
:env => {:VAR => 2})
end

subject { config.env }

its([:QUEUE]) { should eq 'default' }
its([:JOBS_PER_FORK]) { should eq '10' }
its([:MINUTES_PER_FORK]) { should eq '5' }
its([:SHUFFLE]) { should eq '1' }
its([:VAR]) { should eq '2' }

context "when shuffle is disabled" do
let(:config) { described_class.new(:default, shuffle: false) }
its([:SHUFFLE]) { should be_nil }
end
end
end

0 comments on commit 1d58f9b

Please sign in to comment.