Skip to content

Commit

Permalink
Stop runnable processes when registered process record disappears
Browse files Browse the repository at this point in the history
If, for some reason, the process failed a heartbeat and the supervisor
pruned it, we shouldn't continue running. Just stop as if we had
received a signal.

This could be used in the future from Mission Control to stop a worker.
  • Loading branch information
rosa committed Sep 9, 2024
1 parent 73df529 commit 8744877
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 4 deletions.
9 changes: 6 additions & 3 deletions lib/solid_queue/processes/registrable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ def register
end

def deregister
process.deregister if registered?
process&.deregister
end

def registered?
process&.persisted?
process.present?
end

def launch_heartbeat
Expand All @@ -53,7 +53,10 @@ def stop_heartbeat
end

def heartbeat
process.heartbeat
process.reload.heartbeat
rescue ActiveRecord::RecordNotFound
self.process = nil
wake_up
end
end
end
6 changes: 5 additions & 1 deletion lib/solid_queue/processes/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ def boot
end
end

def run
raise NotImplementedError
end

def shutting_down?
stopped? || (running_as_fork? && supervisor_went_away?) || finished?
stopped? || (running_as_fork? && supervisor_went_away?) || finished? || !registered?
end

def run
Expand Down
21 changes: 21 additions & 0 deletions test/unit/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,27 @@ class WorkerTest < ActiveSupport::TestCase
assert_equal 5, JobResult.where(queue_name: :background, status: "completed", value: :immediate).count
end

test "terminate on heartbeat when unregistered" do
old_heartbeat_interval, SolidQueue.process_heartbeat_interval = SolidQueue.process_heartbeat_interval, 1.second

@worker.start
wait_for_registered_processes(1, timeout: 1.second)

assert_not @worker.pool.shutdown?

process = SolidQueue::Process.first
assert_equal "Worker", process.kind

process.deregister

# And now just wait until the worker tries to heartbeat and realises
# it needs to stop
wait_while_with_timeout(2) { !@worker.pool.shutdown? }
assert @worker.pool.shutdown?
ensure
SolidQueue.process_heartbeat_interval = old_heartbeat_interval
end

private
def with_polling(silence:)
old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence
Expand Down

0 comments on commit 8744877

Please sign in to comment.