diff --git a/app/models/solid_queue/process/prunable.rb b/app/models/solid_queue/process/prunable.rb index 408b4915..85341d1d 100644 --- a/app/models/solid_queue/process/prunable.rb +++ b/app/models/solid_queue/process/prunable.rb @@ -10,9 +10,9 @@ module Prunable end class_methods do - def prune + def prune(excluding: nil) SolidQueue.instrument :prune_processes, size: 0 do |payload| - prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch| + prunable.excluding(excluding).non_blocking_lock.find_in_batches(batch_size: 50) do |batch| payload[:size] += batch.size batch.each(&:prune) diff --git a/lib/solid_queue/processes/base.rb b/lib/solid_queue/processes/base.rb index 7575fe41..6069a90d 100644 --- a/lib/solid_queue/processes/base.rb +++ b/lib/solid_queue/processes/base.rb @@ -10,6 +10,7 @@ class Base def initialize(*) @name = generate_name + @stopped = false end def kind @@ -28,10 +29,18 @@ def metadata {} end + def stop + @stopped = true + end + private def generate_name [ kind.downcase, SecureRandom.hex(10) ].join("-") end + + def stopped? + @stopped + end end end end diff --git a/lib/solid_queue/processes/registrable.rb b/lib/solid_queue/processes/registrable.rb index cdd8fbcf..24941238 100644 --- a/lib/solid_queue/processes/registrable.rb +++ b/lib/solid_queue/processes/registrable.rb @@ -29,11 +29,11 @@ def register end def deregister - process.deregister if registered? + process&.deregister end def registered? - process&.persisted? + process.present? end def launch_heartbeat @@ -53,7 +53,10 @@ def stop_heartbeat end def heartbeat - process.heartbeat + process.with_lock { process.heartbeat } + rescue ActiveRecord::RecordNotFound + self.process = nil + wake_up end end end diff --git a/lib/solid_queue/processes/runnable.rb b/lib/solid_queue/processes/runnable.rb index d66ebb2b..2c11cf3d 100644 --- a/lib/solid_queue/processes/runnable.rb +++ b/lib/solid_queue/processes/runnable.rb @@ -17,9 +17,9 @@ def start end def stop - @stopped = true - wake_up + super + wake_up @thread&.join end @@ -33,8 +33,6 @@ def mode def boot SolidQueue.instrument(:start_process, process: self) do run_callbacks(:boot) do - @stopped = false - if running_as_fork? register_signal_handlers set_procline @@ -43,18 +41,18 @@ def boot end end + def run + raise NotImplementedError + end + def shutting_down? - stopped? || (running_as_fork? && supervisor_went_away?) || finished? + stopped? || (running_as_fork? && supervisor_went_away?) || finished? || !registered? end def run raise NotImplementedError end - def stopped? - @stopped - end - def finished? running_inline? && all_work_completed? end diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 3b492831..9ef736e4 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -37,7 +37,7 @@ def start end def stop - @stopped = true + super run_stop_hooks end @@ -47,7 +47,6 @@ def stop def boot SolidQueue.instrument(:start_process, process: self) do run_callbacks(:boot) do - @stopped = false sync_std_streams end end @@ -87,10 +86,6 @@ def start_process(configured_process) forks[pid] = process_instance end - def stopped? - @stopped - end - def set_procline procline "supervising #{supervised_processes.join(", ")}" end diff --git a/lib/solid_queue/supervisor/maintenance.rb b/lib/solid_queue/supervisor/maintenance.rb index 2aa8ef04..1b6b5204 100644 --- a/lib/solid_queue/supervisor/maintenance.rb +++ b/lib/solid_queue/supervisor/maintenance.rb @@ -24,7 +24,7 @@ def stop_maintenance_task end def prune_dead_processes - wrap_in_app_executor { SolidQueue::Process.prune } + wrap_in_app_executor { SolidQueue::Process.prune(excluding: process) } end def fail_orphaned_executions diff --git a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb index 28288717..d4919070 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/supervisor_test.rb @@ -121,6 +121,28 @@ class SupervisorTest < ActiveSupport::TestCase end end + test "prune processes with expired heartbeats" do + pruned = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-42") + + # Simulate expired heartbeats + SolidQueue::Process.update_all(last_heartbeat_at: 10.minutes.ago) + + not_pruned = SolidQueue::Process.register(kind: "Worker", pid: 44, name: "worker-44") + + assert_equal 2, SolidQueue::Process.count + + pid = run_supervisor_as_fork(load_configuration_from: { workers: [ { queues: :background } ] }) + wait_for_registered_processes(4) + + terminate_process(pid) + + skip_active_record_query_cache do + assert_equal 1, SolidQueue::Process.count + assert_nil SolidQueue::Process.find_by(id: pruned.id) + assert SolidQueue::Process.find_by(id: not_pruned.id).present? + end + end + private def assert_registered_workers(supervisor_pid: nil, count: 1) assert_registered_processes(kind: "Worker", count: count, supervisor_pid: supervisor_pid) diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 786c2b8f..3523e4a1 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -133,6 +133,27 @@ class WorkerTest < ActiveSupport::TestCase assert_equal 5, JobResult.where(queue_name: :background, status: "completed", value: :immediate).count end + test "terminate on heartbeat when unregistered" do + old_heartbeat_interval, SolidQueue.process_heartbeat_interval = SolidQueue.process_heartbeat_interval, 1.second + + @worker.start + wait_for_registered_processes(1, timeout: 1.second) + + assert_not @worker.pool.shutdown? + + process = SolidQueue::Process.first + assert_equal "Worker", process.kind + + process.deregister + + # And now just wait until the worker tries to heartbeat and realises + # it needs to stop + wait_while_with_timeout(2) { !@worker.pool.shutdown? } + assert @worker.pool.shutdown? + ensure + SolidQueue.process_heartbeat_interval = old_heartbeat_interval + end + private def with_polling(silence:) old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence