diff --git a/Gemfile.lock b/Gemfile.lock index 38d2f7e6..ffb2453e 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -330,8 +330,12 @@ PLATFORMS aarch64-linux arm-linux arm64-darwin + arm64-darwin-21 + arm64-darwin-22 x86-linux x86_64-darwin + x86_64-darwin-22 + x86_64-darwin-23 x86_64-linux DEPENDENCIES diff --git a/app/controllers/mission_control/jobs/dispatches_controller.rb b/app/controllers/mission_control/jobs/dispatches_controller.rb new file mode 100644 index 00000000..8ca94669 --- /dev/null +++ b/app/controllers/mission_control/jobs/dispatches_controller.rb @@ -0,0 +1,13 @@ +class MissionControl::Jobs::DispatchesController < MissionControl::Jobs::ApplicationController + include MissionControl::Jobs::JobScoped + + def create + @job.dispatch + redirect_to application_jobs_url(@application, :blocked), notice: "Dispatched job with id #{@job.job_id}" + end + + private + def jobs_relation + ApplicationJob.jobs.blocked + end +end diff --git a/app/helpers/mission_control/jobs/jobs_helper.rb b/app/helpers/mission_control/jobs/jobs_helper.rb index 8736866b..68cec842 100644 --- a/app/helpers/mission_control/jobs/jobs_helper.rb +++ b/app/helpers/mission_control/jobs/jobs_helper.rb @@ -18,7 +18,7 @@ def failed_job_backtrace(job) def attribute_names_for_job_status(status) case status.to_s when "failed" then [ "Error", "" ] - when "blocked" then [ "Queue", "Blocked by", "Block expiry" ] + when "blocked" then [ "Queue", "Blocked by", "Block expiry", "" ] when "finished" then [ "Queue", "Finished" ] when "scheduled" then [ "Queue", "Scheduled", "" ] when "in_progress" then [ "Queue", "Run by", "Running for" ] diff --git a/app/views/mission_control/jobs/jobs/_title.html.erb b/app/views/mission_control/jobs/jobs/_title.html.erb index e2b8209f..ed974e47 100644 --- a/app/views/mission_control/jobs/jobs/_title.html.erb +++ b/app/views/mission_control/jobs/jobs/_title.html.erb @@ -8,6 +8,9 @@ <% if job.failed? %> <%= render "mission_control/jobs/jobs/failed/actions", job: job %> <% end %> + <% if job.blocked? %> + <%= render "mission_control/jobs/jobs/blocked/actions", job: job %> + <% end %> diff --git a/app/views/mission_control/jobs/jobs/blocked/_actions.html.erb b/app/views/mission_control/jobs/jobs/blocked/_actions.html.erb new file mode 100644 index 00000000..9afb0f86 --- /dev/null +++ b/app/views/mission_control/jobs/jobs/blocked/_actions.html.erb @@ -0,0 +1,3 @@ +
+ <%= button_to "Dispatch", application_job_dispatch_path(@application, job.job_id), class: "button is-warning is-light mr-0" %> +
diff --git a/app/views/mission_control/jobs/jobs/blocked/_job.html.erb b/app/views/mission_control/jobs/jobs/blocked/_job.html.erb index c4c1088b..8135c73f 100644 --- a/app/views/mission_control/jobs/jobs/blocked/_job.html.erb +++ b/app/views/mission_control/jobs/jobs/blocked/_job.html.erb @@ -1,3 +1,6 @@ <%= link_to job.queue_name, application_queue_path(@application, job.queue) %>
<%= job.blocked_by %>
<%= bidirectional_time_distance_in_words_with_title(job.blocked_until) %> + + <%= render "mission_control/jobs/jobs/blocked/actions", job: job %> + diff --git a/config/routes.rb b/config/routes.rb index 0a04ba44..8b2057c3 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -9,6 +9,7 @@ resources :jobs, only: :show do resource :retry, only: :create resource :discard, only: :create + resource :dispatch, only: :create collection do resource :bulk_retries, only: :create diff --git a/lib/active_job/executing.rb b/lib/active_job/executing.rb index e27ab15f..d8f7e24f 100644 --- a/lib/active_job/executing.rb +++ b/lib/active_job/executing.rb @@ -4,9 +4,8 @@ module ActiveJob::Executing extend ActiveSupport::Concern included do - attr_accessor :raw_data, :position, :finished_at, :blocked_by, :blocked_until, :worker_id, :started_at + attr_accessor :raw_data, :position, :finished_at, :blocked_by, :blocked_until, :worker_id, :started_at, :status attr_reader :serialized_arguments - attr_writer :status thread_cattr_accessor :current_queue_adapter end @@ -25,10 +24,8 @@ def discard jobs_relation_for_discarding.discard_job(self) end - def status - return @status if @status.present? - - failed? ? :failed : :pending + def dispatch + ActiveJob.jobs.blocked.dispatch_job(self) end private diff --git a/lib/active_job/failed.rb b/lib/active_job/failed.rb index 80049977..b1817365 100644 --- a/lib/active_job/failed.rb +++ b/lib/active_job/failed.rb @@ -4,8 +4,4 @@ module ActiveJob::Failed included do attr_accessor :last_execution_error, :failed_at end - - def failed? - last_execution_error.present? - end end diff --git a/lib/active_job/job_proxy.rb b/lib/active_job/job_proxy.rb index 7441db32..623415fb 100644 --- a/lib/active_job/job_proxy.rb +++ b/lib/active_job/job_proxy.rb @@ -23,4 +23,10 @@ def serialize def perform_now raise UnsupportedError, "A JobProxy doesn't support immediate execution, only enqueuing." end + + ActiveJob::JobsRelation::STATUSES.each do |status| + define_method "#{status}?" do + self.status == status + end + end end diff --git a/lib/active_job/jobs_relation.rb b/lib/active_job/jobs_relation.rb index 09a78ffe..5764c479 100644 --- a/lib/active_job/jobs_relation.rb +++ b/lib/active_job/jobs_relation.rb @@ -147,6 +147,11 @@ def discard_job(job) queue_adapter.discard_job(job, self) end + # Dispatch the provided job. + def dispatch_job(job) + queue_adapter.dispatch_job(job, self) + end + # Find a job by id. # # Returns nil when not found. diff --git a/lib/active_job/queue_adapters/solid_queue_ext.rb b/lib/active_job/queue_adapters/solid_queue_ext.rb index ff62faab..4e2ef7b9 100644 --- a/lib/active_job/queue_adapters/solid_queue_ext.rb +++ b/lib/active_job/queue_adapters/solid_queue_ext.rb @@ -82,6 +82,10 @@ def discard_job(job, jobs_relation) find_solid_queue_job!(job.job_id, jobs_relation).discard end + def dispatch_job(job, jobs_relation) + dispatch_immediately find_solid_queue_job!(job.job_id, jobs_relation) + end + def find_job(job_id, *) if job = SolidQueue::Job.where(active_job_id: job_id).order(:id).last deserialize_and_proxy_solid_queue_job job @@ -146,6 +150,13 @@ def execution_error_from_solid_queue_job(solid_queue_job) end end + def dispatch_immediately(job) + SolidQueue::Job.transaction do + job.dispatch_bypassing_concurrency_limits + job.blocked_execution.destroy! + end + end + class RelationAdapter STATUS_MAP = { pending: :ready, diff --git a/lib/mission_control/jobs/adapter.rb b/lib/mission_control/jobs/adapter.rb index 7ab5ac2f..278b5c94 100644 --- a/lib/mission_control/jobs/adapter.rb +++ b/lib/mission_control/jobs/adapter.rb @@ -102,6 +102,10 @@ def discard_job(job, jobs_relation) raise_incompatible_adapter_error_from :discard_job end + def dispatch_job(job, jobs_relation) + raise_incompatible_adapter_error_from :dispatch_job + end + def find_job(job_id, *) raise_incompatible_adapter_error_from :find_job end diff --git a/test/active_job/queue_adapters/adapter_testing/dispatch_jobs.rb b/test/active_job/queue_adapters/adapter_testing/dispatch_jobs.rb new file mode 100644 index 00000000..22c4b20b --- /dev/null +++ b/test/active_job/queue_adapters/adapter_testing/dispatch_jobs.rb @@ -0,0 +1,19 @@ +module ActiveJob::QueueAdapters::AdapterTesting::DispatchJobs + extend ActiveSupport::Testing::Declarative + + test "dispatch blocked job immediately" do + 10.times { |index| BlockingJob.perform_later(index * 0.1.seconds) } + + # Given, there is one pending and the others are blocked + pending_jobs = ActiveJob.jobs.pending + assert_equal 1, pending_jobs.size + blocked_jobs = ActiveJob.jobs.blocked + assert_equal 9, blocked_jobs.size + + blocked_jobs.each(&:dispatch) + + # Then, all blocked jobs are pending + assert_empty blocked_jobs.reload + assert_equal 10, pending_jobs.reload.size + end +end diff --git a/test/active_job/queue_adapters/solid_queue_test.rb b/test/active_job/queue_adapters/solid_queue_test.rb index ff62a28b..c5d1036e 100644 --- a/test/active_job/queue_adapters/solid_queue_test.rb +++ b/test/active_job/queue_adapters/solid_queue_test.rb @@ -2,6 +2,7 @@ class ActiveJob::QueueAdapters::SolidQueueTest < ActiveSupport::TestCase include ActiveJob::QueueAdapters::AdapterTesting + include DispatchJobs setup do SolidQueue.logger = ActiveSupport::Logger.new(nil) diff --git a/test/dummy/app/jobs/blocking_job.rb b/test/dummy/app/jobs/blocking_job.rb new file mode 100644 index 00000000..046be28e --- /dev/null +++ b/test/dummy/app/jobs/blocking_job.rb @@ -0,0 +1,7 @@ +class BlockingJob < ApplicationJob + limits_concurrency key: ->(*args) { "exclusive" } + + def perform(pause = nil) + sleep(pause) if pause + end +end diff --git a/test/dummy/db/seeds.rb b/test/dummy/db/seeds.rb index d642f200..36cae4a0 100644 --- a/test/dummy/db/seeds.rb +++ b/test/dummy/db/seeds.rb @@ -9,7 +9,7 @@ def clean_database end class JobsLoader - attr_reader :application, :server, :failed_jobs_count, :regular_jobs_count, :finished_jobs_count + attr_reader :application, :server, :failed_jobs_count, :regular_jobs_count, :finished_jobs_count, :blocked_jobs_count def initialize(application, server, failed_jobs_count: 100, regular_jobs_count: 50) @application = application @@ -17,6 +17,7 @@ def initialize(application, server, failed_jobs_count: 100, regular_jobs_count: @failed_jobs_count = randomize(failed_jobs_count) @regular_jobs_count = randomize(regular_jobs_count) @finished_jobs_count = randomize(regular_jobs_count) + @blocked_jobs_count = randomize(regular_jobs_count) end def load @@ -24,6 +25,7 @@ def load load_finished_jobs load_failed_jobs load_regular_jobs + load_blocked_jobs if server.queue_adapter.supported_statuses.include?(:blocked) end end @@ -31,10 +33,10 @@ def load def load_failed_jobs puts "Generating #{failed_jobs_count} failed jobs for #{application} - #{server}..." failed_jobs_count.times { |index| enqueue_one_of FailingJob => index, FailingReloadedJob => index, FailingPostJob => [ Post.last, 1.year.ago ] } - dispatch_jobs + perform_jobs end - def dispatch_jobs + def perform_jobs case server.queue_adapter_name when :resque worker = Resque::Worker.new("*") @@ -53,7 +55,7 @@ def load_finished_jobs regular_jobs_count.times do |index| enqueue_one_of DummyJob => index, DummyReloadedJob => index end - dispatch_jobs + perform_jobs end def load_regular_jobs @@ -63,6 +65,13 @@ def load_regular_jobs end end + def load_blocked_jobs + puts "Generating #{blocked_jobs_count} blocked jobs for #{application} - #{server}..." + blocked_jobs_count.times do |index| + enqueue_one_of BlockingJob => index + end + end + def with_random_queue(job_class) random_queue = [ "background", "reports", "default", "realtime" ].sample job_class.tap do