Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch Support #142

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Draft
61 changes: 61 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,67 @@ class ApplicationMailer < ActionMailer::Base
Rails.error.report(exception)
raise exception
end
```

## Batch jobs

SolidQueue offers support for batching jobs. This allows you to track progress of a set of jobs,
and optionally trigger callbacks based on their status. It supports the following:

- Relating jobs to a batch, to track their status
- Three available callbacks to fire:
- `on_finish`: Fired when all jobs have finished, including retries. Fires even when some jobs have failed.
- `on_success`: Fired when all jobs have succeeded, including retries. Will not fire if any jobs have failed, but will fire if jobs have been discarded using `discard_on`
- `on_failure`: Fired the _first_ time a job fails, after all retries are exhausted.
- If a job is part of a batch, it can enqueue more jobs for that batch using `batch#enqueue`
- Batches can be nested within other batches, creating a hierarchy. Outer batches will not finish until all nested batches have finished.

```rb
class SleepyJob < ApplicationJob
def perform(seconds_to_sleep)
Rails.logger.info "Feeling #{seconds_to_sleep} seconds sleepy..."
sleep seconds_to_sleep
end
end

class MultiStepJob < ApplicationJob
def perform
batch.enqueue do
SleepyJob.perform_later(5)
# Because of this nested batch, the top-level batch won't finish until the inner,
# 10 second job finishes
# Both jobs will still run simultaneously
SolidQueue::JobBatch.enqueue do
SleepyJob.perform_later(10)
end
end
end
end

class BatchFinishJob < ApplicationJob
def perform(batch) # batch is always the default first argument
Rails.logger.info "Good job finishing all jobs"
end
end

class BatchSuccessJob < ApplicationJob
def perform(batch) # batch is always the default first argument
Rails.logger.info "Good job finishing all jobs, and all of them worked!"
end
end

class BatchFailureJob < ApplicationJob
def perform(batch) # batch is always the default first argument
Rails.logger.info "At least one job failed, sorry!"
end
end

SolidQueue::JobBatch.enqueue(
on_finish: BatchFinishJob,
on_success: BatchSuccessJob,
on_failure: BatchFailureJob
) do
5.times.map { |i| SleepyJob.perform_later(i) }
end
```

Expand Down
2 changes: 2 additions & 0 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ def perform
else
failed_with(result.error)
end

job.job_batch.touch(:changed_at, :last_changed_at) if job.batch_id.present?
ensure
job.unblock_next_blocked_job
end
Expand Down
6 changes: 5 additions & 1 deletion app/models/solid_queue/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ class EnqueueError < StandardError; end

serialize :arguments, coder: JSON

belongs_to :job_batch, foreign_key: :batch_id, optional: true

class << self
def enqueue_all(active_jobs)
active_jobs_by_job_id = active_jobs.index_by(&:job_id)
Expand Down Expand Up @@ -53,14 +55,16 @@ def create_all_from_active_jobs(active_jobs)
end

def attributes_from_active_job(active_job)
active_job.batch_id = JobBatch.current_batch_id || active_job.batch_id
{
queue_name: active_job.queue_name || DEFAULT_QUEUE_NAME,
active_job_id: active_job.job_id,
priority: active_job.priority || DEFAULT_PRIORITY,
scheduled_at: active_job.scheduled_at,
class_name: active_job.class.name,
arguments: active_job.serialize,
concurrency_key: active_job.concurrency_key
concurrency_key: active_job.concurrency_key,
batch_id: active_job.batch_id
}
end
end
Expand Down
2 changes: 1 addition & 1 deletion app/models/solid_queue/job/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def dispatch_bypassing_concurrency_limits
end

def finished!
if SolidQueue.preserve_finished_jobs?
if SolidQueue.preserve_finished_jobs? || batch_id.present?
touch(:finished_at)
else
destroy!
Expand Down
166 changes: 166 additions & 0 deletions app/models/solid_queue/job_batch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# frozen_string_literal: true

module SolidQueue
class JobBatch < Record
belongs_to :job, foreign_key: :job_id, optional: true
belongs_to :parent_job_batch, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch", optional: true
has_many :jobs, foreign_key: :batch_id
has_many :children, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::JobBatch"

serialize :on_finish_active_job, coder: JSON
serialize :on_success_active_job, coder: JSON
serialize :on_failure_active_job, coder: JSON

scope :incomplete, -> {
where(finished_at: nil).where("changed_at IS NOT NULL OR last_changed_at < ?", 1.hour.ago)
}
scope :finished, -> { where.not(finished_at: nil) }

class << self
def current_batch_id
ActiveSupport::IsolatedExecutionState[:current_batch_id]
end

def enqueue(attributes = {})
job_batch = nil
transaction do
job_batch = create!(batch_attributes(attributes))
wrap_in_batch_context(job_batch.id) do
yield job_batch
end
end

job_batch
end

def dispatch_finished_batches
incomplete.order(:id).pluck(:id).each do |id|
transaction do
where(id: id).includes(:children, :jobs).non_blocking_lock.each(&:finish)
end
end
end

def wrap_in_batch_context(batch_id)
previous_batch_id = current_batch_id.presence || nil
ActiveSupport::IsolatedExecutionState[:current_batch_id] = batch_id
yield
ensure
ActiveSupport::IsolatedExecutionState[:current_batch_id] = previous_batch_id
end

private

def batch_attributes(attributes)
on_finish_klass = attributes.delete(:on_finish)
on_success_klass = attributes.delete(:on_success)
on_failure_klass = attributes.delete(:on_failure)

if on_finish_klass.present?
attributes[:on_finish_active_job] = as_active_job(on_finish_klass).serialize
end

if on_success_klass.present?
attributes[:on_success_active_job] = as_active_job(on_success_klass).serialize
end

if on_failure_klass.present?
attributes[:on_failure_active_job] = as_active_job(on_failure_klass).serialize
end

attributes[:parent_job_batch_id] = current_batch_id if current_batch_id.present?
# Set it initially, so we check the batch even if there are no jobs
attributes[:changed_at] = Time.zone.now
attributes[:last_changed_at] = Time.zone.now

attributes
end

def as_active_job(active_job_klass)
active_job_klass.is_a?(ActiveJob::Base) ? active_job_klass : active_job_klass.new
end
end

# Instance-level enqueue
def enqueue(attributes = {})
raise "You cannot enqueue a batch that is already finished" if finished?

transaction do
self.class.wrap_in_batch_context(id) do
yield self
end
end

self
end

def finished?
finished_at.present?
end

def finish
return if finished?
reset_changed_at

all_jobs_succeeded = true
attrs = {}
jobs.find_each do |next_job|
# SolidQueue does treats `discard_on` differently than failures. The job will report as being :finished,
# and there is no record of the failure.
# GoodJob would report a discard as an error. It's possible we should do that in the future?
if fire_failure_job?(next_job)
perform_completion_job(:on_failure_active_job, attrs)
update!(attrs)
end

status = next_job.status
all_jobs_succeeded = all_jobs_succeeded && status != :failed
return unless status.in?([ :finished, :failed ])
end

children.find_each do |child|
return unless child.finished?
end

if on_finish_active_job.present?
perform_completion_job(:on_finish_active_job, attrs)
end

if on_success_active_job.present? && all_jobs_succeeded
perform_completion_job(:on_success_active_job, attrs)
end

transaction do
parent_job_batch.touch(:changed_at, :last_changed_at) if parent_job_batch_id.present?
update!({ finished_at: Time.zone.now }.merge(attrs))
end
end

private

def fire_failure_job?(job)
return false if on_failure_active_job.blank? || job.failed_execution.blank?
job = ActiveJob::Base.deserialize(on_failure_active_job)
job.provider_job_id.blank?
end

def perform_completion_job(job_field, attrs)
active_job = ActiveJob::Base.deserialize(send(job_field))
active_job.send(:deserialize_arguments_if_needed)
active_job.arguments = [ self ] + Array.wrap(active_job.arguments)
self.class.wrap_in_batch_context(parent_job_batch_id || self.class.current_batch_id) do
ActiveJob.perform_all_later([ active_job ])
end
active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id
attrs[job_field] = active_job.serialize
end

def reset_changed_at
if changed_at.blank? && last_changed_at.present?
update_columns(last_changed_at: Time.zone.now) # wait another hour before we check again
else
update_columns(changed_at: nil) # clear out changed_at so we ignore this until the next job finishes
end
end
end
end
21 changes: 21 additions & 0 deletions db/migrate/20240131013203_create_solid_queue_batch_table.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
class CreateSolidQueueBatchTable < ActiveRecord::Migration[7.1]
def change
create_table :solid_queue_job_batches do |t|
t.references :parent_job_batch, index: true # FIXME: foreign key
t.text :on_finish_active_job
t.text :on_success_active_job
t.text :on_failure_active_job
t.datetime :finished_at
t.datetime :changed_at
t.datetime :last_changed_at
t.timestamps

t.index [ :finished_at ]
t.index [ :changed_at ]
t.index [ :last_changed_at ]
end

add_reference :solid_queue_jobs, :batch, index: true
add_foreign_key :solid_queue_jobs, :solid_queue_job_batches, column: :batch_id, on_delete: :cascade
end
end
26 changes: 26 additions & 0 deletions lib/active_job/job_batch_id.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

# Inspired by active_job/core.rb docs
# https://github.com/rails/rails/blob/1c2529b9a6ba5a1eff58be0d0373d7d9d401015b/activejob/lib/active_job/core.rb#L136
module ActiveJob
module JobBatchId
extend ActiveSupport::Concern

included do
attr_accessor :batch_id
end

def serialize
super.merge("batch_id" => batch_id)
end

def deserialize(job_data)
super
self.batch_id = job_data["batch_id"]
end

def batch
@batch ||= SolidQueue::JobBatch.find_by(id: batch_id)
end
end
end
1 change: 1 addition & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

require "active_job"
require "active_job/queue_adapters"
require "active_job/job_batch_id"

require "active_support"
require "active_support/core_ext/numeric/time"
Expand Down
1 change: 1 addition & 0 deletions lib/solid_queue/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def poll
def dispatch_next_batch
with_polling_volume do
ScheduledExecution.dispatch_next_batch(batch_size)
SolidQueue::JobBatch.dispatch_finished_batches
end
end

Expand Down
1 change: 1 addition & 0 deletions lib/solid_queue/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class Engine < ::Rails::Engine
initializer "solid_queue.active_job.extensions" do
ActiveSupport.on_load :active_job do
include ActiveJob::ConcurrencyControls
include ActiveJob::JobBatchId
end
end
end
Expand Down
7 changes: 7 additions & 0 deletions test/dummy/app/jobs/batch_completion_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
class BatchCompletionJob < ApplicationJob
queue_as :background

def perform(batch)
Rails.logger.info "#{batch.jobs.size} jobs completed!"
end
end
10 changes: 10 additions & 0 deletions test/dummy/app/jobs/sleepy_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class SleepyJob < ApplicationJob
queue_as :background

retry_on Exception, wait: 30.seconds, attempts: 5

def perform(seconds_to_sleep)
Rails.logger.info "Feeling #{seconds_to_sleep} seconds sleepy..."
sleep seconds_to_sleep
end
end
Loading
Loading