From fe2d47fefd590a50c09247b144e1e3c49d7289df Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sun, 8 Sep 2024 21:23:44 +0200 Subject: [PATCH 1/6] Prevent supervisor from pruning itself For example, when coming back from being suspended and having its heartbeat expired. --- app/models/solid_queue/process/prunable.rb | 4 ++-- lib/solid_queue/supervisor/maintenance.rb | 2 +- test/unit/supervisor_test.rb | 22 ++++++++++++++++++++++ 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/app/models/solid_queue/process/prunable.rb b/app/models/solid_queue/process/prunable.rb index 408b4915..85341d1d 100644 --- a/app/models/solid_queue/process/prunable.rb +++ b/app/models/solid_queue/process/prunable.rb @@ -10,9 +10,9 @@ module Prunable end class_methods do - def prune + def prune(excluding: nil) SolidQueue.instrument :prune_processes, size: 0 do |payload| - prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch| + prunable.excluding(excluding).non_blocking_lock.find_in_batches(batch_size: 50) do |batch| payload[:size] += batch.size batch.each(&:prune) diff --git a/lib/solid_queue/supervisor/maintenance.rb b/lib/solid_queue/supervisor/maintenance.rb index 2aa8ef04..1b6b5204 100644 --- a/lib/solid_queue/supervisor/maintenance.rb +++ b/lib/solid_queue/supervisor/maintenance.rb @@ -24,7 +24,7 @@ def stop_maintenance_task end def prune_dead_processes - wrap_in_app_executor { SolidQueue::Process.prune } + wrap_in_app_executor { SolidQueue::Process.prune(excluding: process) } end def fail_orphaned_executions diff --git a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb index 28288717..d4919070 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/supervisor_test.rb @@ -121,6 +121,28 @@ class SupervisorTest < ActiveSupport::TestCase end end + test "prune processes with expired heartbeats" do + pruned = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-42") + + # Simulate expired heartbeats + SolidQueue::Process.update_all(last_heartbeat_at: 10.minutes.ago) + + not_pruned = SolidQueue::Process.register(kind: "Worker", pid: 44, name: "worker-44") + + assert_equal 2, SolidQueue::Process.count + + pid = run_supervisor_as_fork(load_configuration_from: { workers: [ { queues: :background } ] }) + wait_for_registered_processes(4) + + terminate_process(pid) + + skip_active_record_query_cache do + assert_equal 1, SolidQueue::Process.count + assert_nil SolidQueue::Process.find_by(id: pruned.id) + assert SolidQueue::Process.find_by(id: not_pruned.id).present? + end + end + private def assert_registered_workers(supervisor_pid: nil, count: 1) assert_registered_processes(kind: "Worker", count: count, supervisor_pid: supervisor_pid) From f8898187060e670b716b2f6f5986f1698551f5c9 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sun, 8 Sep 2024 21:58:12 +0200 Subject: [PATCH 2/6] Move `#stop` and `#stopped?` to `Processes:Base` As these are all common between all processes. --- lib/solid_queue/processes/base.rb | 9 +++++++++ lib/solid_queue/processes/runnable.rb | 10 ++-------- lib/solid_queue/supervisor.rb | 9 +++------ 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/lib/solid_queue/processes/base.rb b/lib/solid_queue/processes/base.rb index 7575fe41..6069a90d 100644 --- a/lib/solid_queue/processes/base.rb +++ b/lib/solid_queue/processes/base.rb @@ -10,6 +10,7 @@ class Base def initialize(*) @name = generate_name + @stopped = false end def kind @@ -28,10 +29,18 @@ def metadata {} end + def stop + @stopped = true + end + private def generate_name [ kind.downcase, SecureRandom.hex(10) ].join("-") end + + def stopped? + @stopped + end end end end diff --git a/lib/solid_queue/processes/runnable.rb b/lib/solid_queue/processes/runnable.rb index d66ebb2b..474a1e0f 100644 --- a/lib/solid_queue/processes/runnable.rb +++ b/lib/solid_queue/processes/runnable.rb @@ -17,9 +17,9 @@ def start end def stop - @stopped = true - wake_up + super + wake_up @thread&.join end @@ -33,8 +33,6 @@ def mode def boot SolidQueue.instrument(:start_process, process: self) do run_callbacks(:boot) do - @stopped = false - if running_as_fork? register_signal_handlers set_procline @@ -51,10 +49,6 @@ def run raise NotImplementedError end - def stopped? - @stopped - end - def finished? running_inline? && all_work_completed? end diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 3b492831..0960a2e0 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -31,13 +31,15 @@ def start run_start_hooks start_processes + + launch_heartbeat launch_maintenance_task supervise end def stop - @stopped = true + super run_stop_hooks end @@ -47,7 +49,6 @@ def stop def boot SolidQueue.instrument(:start_process, process: self) do run_callbacks(:boot) do - @stopped = false sync_std_streams end end @@ -87,10 +88,6 @@ def start_process(configured_process) forks[pid] = process_instance end - def stopped? - @stopped - end - def set_procline procline "supervising #{supervised_processes.join(", ")}" end From fdf463beb1ce1e18c8d406eb6a280b0142e43bfd Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sun, 8 Sep 2024 22:39:48 +0200 Subject: [PATCH 3/6] Stop runnable processes when registered process record disappears If, for some reason, the process failed a heartbeat and the supervisor pruned it, we shouldn't continue running. Just stop as if we had received a signal. This could be used in the future from Mission Control to stop a worker. --- lib/solid_queue/processes/registrable.rb | 9 ++++++--- lib/solid_queue/processes/runnable.rb | 6 +++++- test/unit/worker_test.rb | 21 +++++++++++++++++++++ 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/lib/solid_queue/processes/registrable.rb b/lib/solid_queue/processes/registrable.rb index cdd8fbcf..9eddf20e 100644 --- a/lib/solid_queue/processes/registrable.rb +++ b/lib/solid_queue/processes/registrable.rb @@ -29,11 +29,11 @@ def register end def deregister - process.deregister if registered? + process&.deregister end def registered? - process&.persisted? + process.present? end def launch_heartbeat @@ -53,7 +53,10 @@ def stop_heartbeat end def heartbeat - process.heartbeat + process.reload.heartbeat + rescue ActiveRecord::RecordNotFound + self.process = nil + wake_up end end end diff --git a/lib/solid_queue/processes/runnable.rb b/lib/solid_queue/processes/runnable.rb index 474a1e0f..2c11cf3d 100644 --- a/lib/solid_queue/processes/runnable.rb +++ b/lib/solid_queue/processes/runnable.rb @@ -41,8 +41,12 @@ def boot end end + def run + raise NotImplementedError + end + def shutting_down? - stopped? || (running_as_fork? && supervisor_went_away?) || finished? + stopped? || (running_as_fork? && supervisor_went_away?) || finished? || !registered? end def run diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 786c2b8f..3523e4a1 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -133,6 +133,27 @@ class WorkerTest < ActiveSupport::TestCase assert_equal 5, JobResult.where(queue_name: :background, status: "completed", value: :immediate).count end + test "terminate on heartbeat when unregistered" do + old_heartbeat_interval, SolidQueue.process_heartbeat_interval = SolidQueue.process_heartbeat_interval, 1.second + + @worker.start + wait_for_registered_processes(1, timeout: 1.second) + + assert_not @worker.pool.shutdown? + + process = SolidQueue::Process.first + assert_equal "Worker", process.kind + + process.deregister + + # And now just wait until the worker tries to heartbeat and realises + # it needs to stop + wait_while_with_timeout(2) { !@worker.pool.shutdown? } + assert @worker.pool.shutdown? + ensure + SolidQueue.process_heartbeat_interval = old_heartbeat_interval + end + private def with_polling(silence:) old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence From d58c230d5d06ea10aec63a1d434be6f956d56bf5 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 9 Sep 2024 17:19:15 +0200 Subject: [PATCH 4/6] Lock the process record before heartbeating To guard against race conditions of the record being deleted precisely then. --- lib/solid_queue/processes/registrable.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/solid_queue/processes/registrable.rb b/lib/solid_queue/processes/registrable.rb index 9eddf20e..084e8faf 100644 --- a/lib/solid_queue/processes/registrable.rb +++ b/lib/solid_queue/processes/registrable.rb @@ -53,7 +53,7 @@ def stop_heartbeat end def heartbeat - process.reload.heartbeat + process.with_lock(&:heartbeat) rescue ActiveRecord::RecordNotFound self.process = nil wake_up From 6d7bc6f5320b4cdd239ee34c4e68b1fba0644cf2 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Tue, 10 Sep 2024 19:31:13 +0200 Subject: [PATCH 5/6] Fix block for `with_lock`, that doesn't yield the receiver Thanks to @npezza93 for catching this ^_^U --- lib/solid_queue/processes/registrable.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/solid_queue/processes/registrable.rb b/lib/solid_queue/processes/registrable.rb index 084e8faf..24941238 100644 --- a/lib/solid_queue/processes/registrable.rb +++ b/lib/solid_queue/processes/registrable.rb @@ -53,7 +53,7 @@ def stop_heartbeat end def heartbeat - process.with_lock(&:heartbeat) + process.with_lock { process.heartbeat } rescue ActiveRecord::RecordNotFound self.process = nil wake_up From 8957e00c35bcf120c83e9da6a89dbb3608ff30e1 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Wed, 11 Sep 2024 14:01:32 +0200 Subject: [PATCH 6/6] Remove duplicate heartbeat launch for the supervisor Left-over from somethign I was rewriting. --- lib/solid_queue/supervisor.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 0960a2e0..9ef736e4 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -31,8 +31,6 @@ def start run_start_hooks start_processes - - launch_heartbeat launch_maintenance_task supervise