Skip to content

Commit

Permalink
Merge pull request #31 from basecamp/ensure-only-claimed-jobs-are-del…
Browse files Browse the repository at this point in the history
…eted

Delete ready_executions only when they've been claimed
  • Loading branch information
rosa authored Oct 30, 2023
2 parents d844a30 + 57bad40 commit 9cd9971
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 41 deletions.
14 changes: 9 additions & 5 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ def success?
end
end

CLAIM_ATTRIBUTES = %w[ job_id ]

class << self
def claim_batch(job_ids)
claimed_at = Time.current
rows = Array(job_ids).map { |id| { job_id: id, created_at: claimed_at } }
insert_all(rows) if rows.any?
def claiming(executions, &block)
job_data = Array(executions).collect { |execution| execution.attributes.slice(*CLAIM_ATTRIBUTES) }

SolidQueue.logger.info("[SolidQueue] Claimed #{rows.size} jobs at #{claimed_at}")
insert_all(job_data)
where(job_id: job_data.map { |data| data["job_id"]} ).tap do |claimed|
block.call(claimed)
SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs")
end
end

def release_all
Expand Down
33 changes: 12 additions & 21 deletions app/models/solid_queue/ready_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,35 @@ class ReadyExecution < Execution

class << self
def claim(queues, limit)
return [] unless limit > 0

candidate_job_ids = []

transaction do
candidate_job_ids = query_candidates(queues, limit)
lock(candidate_job_ids)
candidates = select_candidates(queues, limit)
lock(candidates)
end

claimed_executions_for(candidate_job_ids)
end

def queued_as(queues)
QueueParser.new(queues, self).scoped_relation
end

private
def query_candidates(queues, limit)
queued_as(queues).not_paused.ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED").pluck(:job_id)
def select_candidates(queues, limit)
queued_as(queues).not_paused.ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED")
end

def lock(job_ids)
return nil if job_ids.none?
SolidQueue::ClaimedExecution.claim_batch(job_ids)
where(job_id: job_ids).delete_all
end

def claimed_executions_for(job_ids)
return [] if job_ids.none?
def lock(candidates)
return [] if candidates.none?

SolidQueue::ClaimedExecution.where(job_id: job_ids)
SolidQueue::ClaimedExecution.claiming(candidates) do |claimed|
where(job_id: claimed.pluck(:job_id)).delete_all
end
end
end

def claim
transaction do
SolidQueue::ClaimedExecution.claim_batch(job_id)
delete
SolidQueue::ClaimedExecution.claiming(self) do |claimed|
delete if claimed.one?
end
end
end
end
Expand Down
8 changes: 8 additions & 0 deletions db/migrate/20231030164933_make_job_id_not_null.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class MakeJobIdNotNull < ActiveRecord::Migration[7.1]
def change
change_column :solid_queue_claimed_executions, :job_id, :bigint, null: false
change_column :solid_queue_failed_executions, :job_id, :bigint, null: false
change_column :solid_queue_ready_executions, :job_id, :bigint, null: false
change_column :solid_queue_scheduled_executions, :job_id, :bigint, null: false
end
end
10 changes: 5 additions & 5 deletions test/dummy/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.1].define(version: 2023_10_25_165946) do
ActiveRecord::Schema[7.1].define(version: 2023_10_30_164933) do
create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.string "queue_name"
t.string "status"
Expand All @@ -20,15 +20,15 @@
end

create_table "solid_queue_claimed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.bigint "job_id"
t.bigint "job_id", null: false
t.bigint "process_id"
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true
t.index ["process_id"], name: "index_solid_queue_claimed_executions_on_process_id"
end

create_table "solid_queue_failed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.bigint "job_id"
t.bigint "job_id", null: false
t.text "error"
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true
Expand Down Expand Up @@ -63,7 +63,7 @@
end

create_table "solid_queue_ready_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.bigint "job_id"
t.bigint "job_id", null: false
t.string "queue_name", null: false
t.integer "priority", default: 0, null: false
t.datetime "created_at", null: false
Expand All @@ -73,7 +73,7 @@
end

create_table "solid_queue_scheduled_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.bigint "job_id"
t.bigint "job_id", null: false
t.string "queue_name", null: false
t.integer "priority", default: 0, null: false
t.datetime "scheduled_at", null: false
Expand Down
2 changes: 1 addition & 1 deletion test/integration/processes_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
setup do
@pid = run_supervisor_as_fork

wait_for_registered_processes(3, timeout: 0.1.second)
wait_for_registered_processes(3, timeout: 0.2.second)
assert_registered_processes_for(:background, :default)
end

Expand Down
6 changes: 3 additions & 3 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ActiveSupport::TestCase
def wait_for_jobs_to_finish_for(timeout = 1.second)
Timeout.timeout(timeout) do
while SolidQueue::Job.where(finished_at: nil).any? do
sleep 0.25
sleep 0.05
end
end
rescue Timeout::Error
Expand All @@ -44,7 +44,7 @@ def run_supervisor_as_fork(**options)
def wait_for_registered_processes(count, timeout: 1.second)
Timeout.timeout(timeout) do
while SolidQueue::Process.count < count do
sleep 0.25
sleep 0.05
end
end
rescue Timeout::Error
Expand All @@ -69,7 +69,7 @@ def wait_for_process_termination_with_timeout(pid, timeout: 10, from_parent: tru
else
loop do
break unless process_exists?(pid)
sleep(0.1)
sleep 0.05
end
end
end
Expand Down
12 changes: 6 additions & 6 deletions test/unit/supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class SupervisorTest < ActiveSupport::TestCase

test "start in work mode (default)" do
pid = run_supervisor_as_fork
wait_for_registered_processes(2)
wait_for_registered_processes(0.3)

terminate_process(pid)

Expand All @@ -25,7 +25,7 @@ class SupervisorTest < ActiveSupport::TestCase

test "start in schedule mode" do
pid = run_supervisor_as_fork(mode: :schedule)
wait_for_registered_processes(1)
wait_for_registered_processes(0.3)

terminate_process(pid)

Expand All @@ -36,7 +36,7 @@ class SupervisorTest < ActiveSupport::TestCase
assert_not File.exist?(@pidfile)

pid = run_supervisor_as_fork(mode: :all)
wait_for_registered_processes(3)
wait_for_registered_processes(0.3)

assert File.exist?(@pidfile)
assert_equal pid, File.read(@pidfile).strip.to_i
Expand All @@ -50,7 +50,7 @@ class SupervisorTest < ActiveSupport::TestCase
File.write(@pidfile, ::Process.pid.to_s)

pid = run_supervisor_as_fork(mode: :all)
wait_for_registered_processes(3)
wait_for_registered_processes(0.3)

assert File.exist?(@pidfile)
assert_not_equal pid, File.read(@pidfile).strip.to_i
Expand All @@ -60,15 +60,15 @@ class SupervisorTest < ActiveSupport::TestCase

test "deletes previous pidfile if the owner is dead" do
pid = run_supervisor_as_fork(mode: :all)
wait_for_registered_processes(3)
wait_for_registered_processes(0.3)

terminate_process(pid, signal: :KILL)

assert File.exist?(@pidfile)
assert_equal pid, File.read(@pidfile).strip.to_i

pid = run_supervisor_as_fork(mode: :all)
wait_for_registered_processes(3)
wait_for_registered_processes(0.3)

assert File.exist?(@pidfile)
assert_equal pid, File.read(@pidfile).strip.to_i
Expand Down

0 comments on commit 9cd9971

Please sign in to comment.