Skip to content

Commit

Permalink
Support dispatching blocked jobs directly from the UI
Browse files Browse the repository at this point in the history
Introduce the dispatch action to blocked jobs

Closes #37

Signed-off-by: Lex Cao <[email protected]>
  • Loading branch information
lexcao authored and rosa committed Mar 1, 2024
1 parent 6d7315b commit 6cf8371
Show file tree
Hide file tree
Showing 17 changed files with 97 additions and 15 deletions.
4 changes: 4 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,12 @@ PLATFORMS
aarch64-linux
arm-linux
arm64-darwin
arm64-darwin-21
arm64-darwin-22
x86-linux
x86_64-darwin
x86_64-darwin-22
x86_64-darwin-23
x86_64-linux

DEPENDENCIES
Expand Down
13 changes: 13 additions & 0 deletions app/controllers/mission_control/jobs/dispatches_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
class MissionControl::Jobs::DispatchesController < MissionControl::Jobs::ApplicationController
include MissionControl::Jobs::JobScoped

def create
@job.dispatch
redirect_to application_jobs_url(@application, :blocked), notice: "Dispatched job with id #{@job.job_id}"
end

private
def jobs_relation
ApplicationJob.jobs.blocked
end
end
2 changes: 1 addition & 1 deletion app/helpers/mission_control/jobs/jobs_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def failed_job_backtrace(job)
def attribute_names_for_job_status(status)
case status.to_s
when "failed" then [ "Error", "" ]
when "blocked" then [ "Queue", "Blocked by", "Block expiry" ]
when "blocked" then [ "Queue", "Blocked by", "Block expiry", "" ]
when "finished" then [ "Queue", "Finished" ]
when "scheduled" then [ "Queue", "Scheduled", "" ]
when "in_progress" then [ "Queue", "Run by", "Running for" ]
Expand Down
3 changes: 3 additions & 0 deletions app/views/mission_control/jobs/jobs/_title.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
<% if job.failed? %>
<%= render "mission_control/jobs/jobs/failed/actions", job: job %>
<% end %>
<% if job.blocked? %>
<%= render "mission_control/jobs/jobs/blocked/actions", job: job %>
<% end %>
</div>
</div>
</h1>
3 changes: 3 additions & 0 deletions app/views/mission_control/jobs/jobs/blocked/_actions.html.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<div class="buttons is-right">
<%= button_to "Dispatch", application_job_dispatch_path(@application, job.job_id), class: "button is-warning is-light mr-0" %>
</div>
3 changes: 3 additions & 0 deletions app/views/mission_control/jobs/jobs/blocked/_job.html.erb
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
<td><%= link_to job.queue_name, application_queue_path(@application, job.queue) %></td>
<td><div class="is-family-monospace is-size-7"><%= job.blocked_by %></div></td>
<td><%= bidirectional_time_distance_in_words_with_title(job.blocked_until) %></td>
<td class="pr-0">
<%= render "mission_control/jobs/jobs/blocked/actions", job: job %>
</td>
1 change: 1 addition & 0 deletions config/routes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
resources :jobs, only: :show do
resource :retry, only: :create
resource :discard, only: :create
resource :dispatch, only: :create

collection do
resource :bulk_retries, only: :create
Expand Down
9 changes: 3 additions & 6 deletions lib/active_job/executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ module ActiveJob::Executing
extend ActiveSupport::Concern

included do
attr_accessor :raw_data, :position, :finished_at, :blocked_by, :blocked_until, :worker_id, :started_at
attr_accessor :raw_data, :position, :finished_at, :blocked_by, :blocked_until, :worker_id, :started_at, :status
attr_reader :serialized_arguments
attr_writer :status

thread_cattr_accessor :current_queue_adapter
end
Expand All @@ -25,10 +24,8 @@ def discard
jobs_relation_for_discarding.discard_job(self)
end

def status
return @status if @status.present?

failed? ? :failed : :pending
def dispatch
ActiveJob.jobs.blocked.dispatch_job(self)
end

private
Expand Down
4 changes: 0 additions & 4 deletions lib/active_job/failed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,4 @@ module ActiveJob::Failed
included do
attr_accessor :last_execution_error, :failed_at
end

def failed?
last_execution_error.present?
end
end
6 changes: 6 additions & 0 deletions lib/active_job/job_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ def serialize
def perform_now
raise UnsupportedError, "A JobProxy doesn't support immediate execution, only enqueuing."
end

ActiveJob::JobsRelation::STATUSES.each do |status|
define_method "#{status}?" do
self.status == status
end
end
end
5 changes: 5 additions & 0 deletions lib/active_job/jobs_relation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ def discard_job(job)
queue_adapter.discard_job(job, self)
end

# Dispatch the provided job.
def dispatch_job(job)
queue_adapter.dispatch_job(job, self)
end

# Find a job by id.
#
# Returns nil when not found.
Expand Down
11 changes: 11 additions & 0 deletions lib/active_job/queue_adapters/solid_queue_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ def discard_job(job, jobs_relation)
find_solid_queue_job!(job.job_id, jobs_relation).discard
end

def dispatch_job(job, jobs_relation)
dispatch_immediately find_solid_queue_job!(job.job_id, jobs_relation)
end

def find_job(job_id, *)
if job = SolidQueue::Job.where(active_job_id: job_id).order(:id).last
deserialize_and_proxy_solid_queue_job job
Expand Down Expand Up @@ -146,6 +150,13 @@ def execution_error_from_solid_queue_job(solid_queue_job)
end
end

def dispatch_immediately(job)
SolidQueue::Job.transaction do
job.dispatch_bypassing_concurrency_limits
job.blocked_execution.destroy!
end
end

class RelationAdapter
STATUS_MAP = {
pending: :ready,
Expand Down
4 changes: 4 additions & 0 deletions lib/mission_control/jobs/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ def discard_job(job, jobs_relation)
raise_incompatible_adapter_error_from :discard_job
end

def dispatch_job(job, jobs_relation)
raise_incompatible_adapter_error_from :dispatch_job
end

def find_job(job_id, *)
raise_incompatible_adapter_error_from :find_job
end
Expand Down
19 changes: 19 additions & 0 deletions test/active_job/queue_adapters/adapter_testing/dispatch_jobs.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module ActiveJob::QueueAdapters::AdapterTesting::DispatchJobs
extend ActiveSupport::Testing::Declarative

test "dispatch blocked job immediately" do
10.times { |index| BlockingJob.perform_later(index * 0.1.seconds) }

# Given, there is one pending and the others are blocked
pending_jobs = ActiveJob.jobs.pending
assert_equal 1, pending_jobs.size
blocked_jobs = ActiveJob.jobs.blocked
assert_equal 9, blocked_jobs.size

blocked_jobs.each(&:dispatch)

# Then, all blocked jobs are pending
assert_empty blocked_jobs.reload
assert_equal 10, pending_jobs.reload.size
end
end
1 change: 1 addition & 0 deletions test/active_job/queue_adapters/solid_queue_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

class ActiveJob::QueueAdapters::SolidQueueTest < ActiveSupport::TestCase
include ActiveJob::QueueAdapters::AdapterTesting
include DispatchJobs

setup do
SolidQueue.logger = ActiveSupport::Logger.new(nil)
Expand Down
7 changes: 7 additions & 0 deletions test/dummy/app/jobs/blocking_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
class BlockingJob < ApplicationJob
limits_concurrency key: ->(*args) { "exclusive" }

def perform(pause = nil)
sleep(pause) if pause
end
end
17 changes: 13 additions & 4 deletions test/dummy/db/seeds.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,34 @@ def clean_database
end

class JobsLoader
attr_reader :application, :server, :failed_jobs_count, :regular_jobs_count, :finished_jobs_count
attr_reader :application, :server, :failed_jobs_count, :regular_jobs_count, :finished_jobs_count, :blocked_jobs_count

def initialize(application, server, failed_jobs_count: 100, regular_jobs_count: 50)
@application = application
@server = server
@failed_jobs_count = randomize(failed_jobs_count)
@regular_jobs_count = randomize(regular_jobs_count)
@finished_jobs_count = randomize(regular_jobs_count)
@blocked_jobs_count = randomize(regular_jobs_count)
end

def load
server.activating do
load_finished_jobs
load_failed_jobs
load_regular_jobs
load_blocked_jobs if server.queue_adapter.supported_statuses.include?(:blocked)
end
end

private
def load_failed_jobs
puts "Generating #{failed_jobs_count} failed jobs for #{application} - #{server}..."
failed_jobs_count.times { |index| enqueue_one_of FailingJob => index, FailingReloadedJob => index, FailingPostJob => [ Post.last, 1.year.ago ] }
dispatch_jobs
perform_jobs
end

def dispatch_jobs
def perform_jobs
case server.queue_adapter_name
when :resque
worker = Resque::Worker.new("*")
Expand All @@ -53,7 +55,7 @@ def load_finished_jobs
regular_jobs_count.times do |index|
enqueue_one_of DummyJob => index, DummyReloadedJob => index
end
dispatch_jobs
perform_jobs
end

def load_regular_jobs
Expand All @@ -63,6 +65,13 @@ def load_regular_jobs
end
end

def load_blocked_jobs
puts "Generating #{blocked_jobs_count} blocked jobs for #{application} - #{server}..."
blocked_jobs_count.times do |index|
enqueue_one_of BlockingJob => index
end
end

def with_random_queue(job_class)
random_queue = [ "background", "reports", "default", "realtime" ].sample
job_class.tap do
Expand Down

0 comments on commit 6cf8371

Please sign in to comment.