Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle scenarios where registered processes disappear outside regular flow #337

Merged
merged 6 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/models/solid_queue/process/prunable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ module Prunable
end

class_methods do
def prune
def prune(excluding: nil)
SolidQueue.instrument :prune_processes, size: 0 do |payload|
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
prunable.excluding(excluding).non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
payload[:size] += batch.size

batch.each(&:prune)
Expand Down
9 changes: 9 additions & 0 deletions lib/solid_queue/processes/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class Base

# Sets up a new process object: gives it a generated name and marks it as
# not stopped. Any positional arguments are accepted and ignored.
def initialize(*)
@name = generate_name
@stopped = false
end

def kind
Expand All @@ -28,10 +29,18 @@ def metadata
{}
end

# Flags this process as stopped by setting @stopped to true.
def stop
@stopped = true
end

private
# Builds a name of the form "<downcased kind>-<20 random hex characters>".
def generate_name
  prefix = kind.downcase
  # SecureRandom.hex(10) yields 10 random bytes rendered as 20 hex digits.
  suffix = SecureRandom.hex(10)

  "#{prefix}-#{suffix}"
end

# Returns the @stopped flag (false after #initialize, true once #stop ran).
def stopped?
@stopped
end
end
end
end
9 changes: 6 additions & 3 deletions lib/solid_queue/processes/registrable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ def register
end

# Delegates to the process record's own deregister.
# NOTE(review): this span comes from a rendered diff, so it appears to show
# both the pre-change line (guarded by registered?) and its replacement
# (safe navigation on process) - confirm which one is live in the real file.
def deregister
process.deregister if registered?
process&.deregister
end

# Tells whether this instance currently holds a process record.
# NOTE(review): rendered-diff span - `process&.persisted?` looks like the
# removed line and `process.present?` like the added one; confirm against
# the repository.
def registered?
process&.persisted?
process.present?
end

def launch_heartbeat
Expand All @@ -53,7 +53,10 @@ def stop_heartbeat
end

# Sends a heartbeat through the process record. If ActiveRecord::RecordNotFound
# is raised, the local process reference is cleared and wake_up is called.
# NOTE(review): rendered-diff span - the bare `process.heartbeat` looks like
# the pre-change line and the with_lock version like its replacement; confirm.
def heartbeat
process.heartbeat
# presumably with_lock reloads the row and is what raises RecordNotFound
# when the record was deleted elsewhere - verify against ActiveRecord docs
process.with_lock { process.heartbeat }
rescue ActiveRecord::RecordNotFound
self.process = nil
wake_up
end
end
end
16 changes: 7 additions & 9 deletions lib/solid_queue/processes/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ def start
end

# Stops the runnable: marks it as stopped, wakes it up and joins its thread
# when there is one.
# NOTE(review): rendered-diff span mixing removed and added lines
# (`@stopped = true` vs `super`, and wake_up listed twice) - confirm the
# final shape in the real file.
def stop
@stopped = true
wake_up
super

wake_up
@thread&.join
end

Expand All @@ -33,8 +33,6 @@ def mode
def boot
SolidQueue.instrument(:start_process, process: self) do
run_callbacks(:boot) do
@stopped = false

if running_as_fork?
register_signal_handlers
set_procline
Expand All @@ -43,18 +41,18 @@ def boot
end
end

# Always raises NotImplementedError; presumably meant to be overridden.
# NOTE(review): the same definition appears again a few lines below in this
# rendered diff - presumably the method was only moved; confirm.
def run
raise NotImplementedError
end

# Tells whether the process should wind down: it was stopped, it runs as a
# fork and its supervisor went away, or it finished its work.
# NOTE(review): rendered-diff span - two versions of the condition are shown;
# the second one additionally treats "no longer registered" as shutting down.
# Confirm which line is live.
def shutting_down?
stopped? || (running_as_fork? && supervisor_went_away?) || finished?
stopped? || (running_as_fork? && supervisor_went_away?) || finished? || !registered?
end

# Always raises NotImplementedError; presumably meant to be overridden.
# NOTE(review): an identical definition appears a few lines above in this
# rendered diff - presumably this is the method's new position; confirm.
def run
raise NotImplementedError
end

# Returns the @stopped flag.
# NOTE(review): this looks like a method removed by the change (an identical
# stopped? is shown under lib/solid_queue/processes/base.rb) - confirm.
def stopped?
@stopped
end

# True only when running inline and all work has been completed.
def finished?
running_inline? && all_work_completed?
end
Expand Down
7 changes: 1 addition & 6 deletions lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def start
end

# Stops the supervisor and then runs the stop hooks.
# NOTE(review): rendered-diff span - `@stopped = true` looks like the removed
# line and `super` like its replacement; confirm.
def stop
@stopped = true
super
run_stop_hooks
end

Expand All @@ -47,7 +47,6 @@ def stop
def boot
SolidQueue.instrument(:start_process, process: self) do
run_callbacks(:boot) do
@stopped = false
sync_std_streams
end
end
Expand Down Expand Up @@ -87,10 +86,6 @@ def start_process(configured_process)
forks[pid] = process_instance
end

# Returns the @stopped flag.
# NOTE(review): this looks like a method removed by the change (an identical
# stopped? is shown under lib/solid_queue/processes/base.rb) - confirm.
def stopped?
@stopped
end

# Hands procline a "supervising ..." description listing the supervised
# processes, comma-separated.
def set_procline
  supervised = supervised_processes.join(", ")
  procline "supervising #{supervised}"
end
Expand Down
2 changes: 1 addition & 1 deletion lib/solid_queue/supervisor/maintenance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def stop_maintenance_task
end

# Runs SolidQueue::Process.prune inside wrap_in_app_executor.
# NOTE(review): rendered-diff span - two versions of the call are shown; the
# second passes `excluding: process`, presumably so the caller's own record
# is never pruned. Confirm which line is live.
def prune_dead_processes
wrap_in_app_executor { SolidQueue::Process.prune }
wrap_in_app_executor { SolidQueue::Process.prune(excluding: process) }
end

def fail_orphaned_executions
Expand Down
22 changes: 22 additions & 0 deletions test/unit/supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,28 @@ class SupervisorTest < ActiveSupport::TestCase
end
end

# Checks that a process record with an expired heartbeat is gone after a
# supervisor run, while a freshly registered one is kept.
test "prune processes with expired heartbeats" do
pruned = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-42")

# Simulate expired heartbeats
SolidQueue::Process.update_all(last_heartbeat_at: 10.minutes.ago)

# Registered after the update_all above, so its heartbeat is not backdated.
not_pruned = SolidQueue::Process.register(kind: "Worker", pid: 44, name: "worker-44")

assert_equal 2, SolidQueue::Process.count

pid = run_supervisor_as_fork(load_configuration_from: { workers: [ { queues: :background } ] })
# presumably 4 = the two records above plus the supervisor and its one
# configured worker - TODO confirm
wait_for_registered_processes(4)

terminate_process(pid)

skip_active_record_query_cache do
# Only the record with the fresh heartbeat is expected to remain.
assert_equal 1, SolidQueue::Process.count
assert_nil SolidQueue::Process.find_by(id: pruned.id)
assert SolidQueue::Process.find_by(id: not_pruned.id).present?
end
end

private
def assert_registered_workers(supervisor_pid: nil, count: 1)
assert_registered_processes(kind: "Worker", count: count, supervisor_pid: supervisor_pid)
Expand Down
21 changes: 21 additions & 0 deletions test/unit/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,27 @@ class WorkerTest < ActiveSupport::TestCase
assert_equal 5, JobResult.where(queue_name: :background, status: "completed", value: :immediate).count
end

# Checks that a started worker shuts its pool down after its process record
# is deregistered from outside the worker.
test "terminate on heartbeat when unregistered" do
  # Use a 1-second heartbeat interval for this test; the previous value is
  # restored in the ensure clause.
  old_heartbeat_interval, SolidQueue.process_heartbeat_interval = SolidQueue.process_heartbeat_interval, 1.second

  @worker.start
  wait_for_registered_processes(1, timeout: 1.second)

  assert_not @worker.pool.shutdown?

  process = SolidQueue::Process.first
  assert_equal "Worker", process.kind

  # Deregister the record directly, without going through the worker.
  process.deregister

  # And now just wait until the worker tries to heartbeat and realises
  # it needs to stop
  wait_while_with_timeout(2) { !@worker.pool.shutdown? }
  assert @worker.pool.shutdown?
ensure
  SolidQueue.process_heartbeat_interval = old_heartbeat_interval
end

private
def with_polling(silence:)
old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence
Expand Down