Skip to content

Commit

Permalink
Move some Runner's methods to ProcessRegistration
Browse files Browse the repository at this point in the history
As I want to use these to register the supervisor as well, and Runner is not
something that can be easily adapted to the supervisor.

Also: extend existing tests to assert process metadata.
  • Loading branch information
rosa committed Nov 2, 2023
1 parent 3539250 commit d3f0538
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 18 deletions.
13 changes: 0 additions & 13 deletions lib/solid_queue/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,8 @@ module Runner
extend ActiveSupport::Concern

included do
include ActiveSupport::Callbacks
define_callbacks :start, :run, :shutdown

include AppExecutor, Procline
include ProcessRegistration, Interruptible

attr_accessor :supervisor_pid
end

def start(mode: :supervised)
Expand Down Expand Up @@ -109,13 +104,5 @@ def all_work_completed?
def running_inline?
mode.inline?
end

def hostname
@hostname ||= Socket.gethostname
end

def process_pid
@pid ||= ::Process.pid
end
end
end
13 changes: 13 additions & 0 deletions lib/solid_queue/runner/process_registration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ module SolidQueue::Runner::ProcessRegistration
extend ActiveSupport::Concern

included do
include ActiveSupport::Callbacks
define_callbacks :start, :run, :shutdown

set_callback :start, :before, :register
set_callback :start, :before, :start_heartbeat

set_callback :run, :after, -> { stop unless registered? }

set_callback :shutdown, :before, :stop_heartbeat
set_callback :shutdown, :after, :deregister

attr_accessor :supervisor_pid
end

def inspect
Expand Down Expand Up @@ -46,6 +51,14 @@ def heartbeat
process.heartbeat
end

def hostname
@hostname ||= Socket.gethostname
end

def process_pid
@pid ||= ::Process.pid
end

def metadata
{ kind: self.class.name.demodulize, hostname: hostname, pid: process_pid, supervisor_pid: supervisor_pid }
end
Expand Down
12 changes: 7 additions & 5 deletions test/integration/processes_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
@pid = run_supervisor_as_fork

wait_for_registered_processes(3, timeout: 0.2.second)
assert_registered_processes_for(:background, :default)
assert_registered_workers_for(:background, :default)
end

teardown do
Expand Down Expand Up @@ -76,7 +76,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
assert_job_status(pause, :claimed)

# Processes didn't have a chance to deregister either
assert_registered_processes_for(:background, :default)
assert_registered_workers_for(:background, :default)
end

test "term supervisor while there are jobs in-flight" do
Expand Down Expand Up @@ -127,7 +127,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
assert_job_status(pause, :claimed)

# The process running the long job couldn't deregister, the other did
assert_registered_processes_for(:background)
assert_registered_workers_for(:background)
end

test "process some jobs that raise errors" do
Expand Down Expand Up @@ -177,7 +177,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase

terminate_supervisor
# TODO: change this to clean termination when replacing a worker also deregisters its process ID
assert_registered_processes_for(:background)
assert_registered_workers_for(:background)
end

private
Expand All @@ -198,10 +198,12 @@ def assert_clean_termination
assert_no_claimed_jobs
end

def assert_registered_processes_for(*queues)
def assert_registered_workers_for(*queues)
skip_active_record_query_cache do
registered_queues = SolidQueue::Process.all.map { |process| process.metadata["queues"] }.compact
assert_equal queues.map(&:to_s).sort, registered_queues.sort
assert_equal [ "Worker" ], SolidQueue::Process.all.map { |process| process.metadata["kind"] }.uniq
assert_equal [ @pid ], SolidQueue::Process.all.map { |process| process.metadata["supervisor_pid"] }.uniq
end
end

Expand Down
24 changes: 24 additions & 0 deletions test/unit/supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class SupervisorTest < ActiveSupport::TestCase
test "start in work mode (default)" do
pid = run_supervisor_as_fork
wait_for_registered_processes(0.3)
assert_registered_workers(2, supervisor_pid: pid)

terminate_process(pid)

Expand All @@ -26,6 +27,7 @@ class SupervisorTest < ActiveSupport::TestCase
test "start in schedule mode" do
pid = run_supervisor_as_fork(mode: :schedule)
wait_for_registered_processes(0.3)
assert_registered_scheduler(supervisor_pid: pid)

terminate_process(pid)

Expand Down Expand Up @@ -76,4 +78,26 @@ class SupervisorTest < ActiveSupport::TestCase

terminate_process(pid)
end

private
def assert_registered_workers(count, **metadata)
skip_active_record_query_cache do
assert_equal count, SolidQueue::Process.count

SolidQueue::Process.all.each do |process|
assert_equal "Worker", process.metadata["kind"]
assert metadata < process.metadata.symbolize_keys
end
end
end

def assert_registered_scheduler(**metadata)
skip_active_record_query_cache do
assert_equal 1, SolidQueue::Process.count
scheduler = SolidQueue::Process.first

assert_equal "Scheduler", scheduler.metadata["kind"]
assert metadata < scheduler.metadata.symbolize_keys
end
end
end

0 comments on commit d3f0538

Please sign in to comment.