From e9d8865f4a3f51be2ba417f50b5c514e09649980 Mon Sep 17 00:00:00 2001 From: Ryan Krage Date: Mon, 3 Jun 2024 15:49:45 +0000 Subject: [PATCH] Safely locking multiple tables - Add test for swallowed postgres exceptions - Add test for skipping blocking query check for nested locks - Add tests for multi-table locking --- lib/pg_ha_migrations/safe_statements.rb | 82 ++++++--- spec/safe_statements_spec.rb | 215 +++++++++++++++++++++--- 2 files changed, 248 insertions(+), 49 deletions(-) diff --git a/lib/pg_ha_migrations/safe_statements.rb b/lib/pg_ha_migrations/safe_statements.rb index 0a6ef04..c15e9d5 100644 --- a/lib/pg_ha_migrations/safe_statements.rb +++ b/lib/pg_ha_migrations/safe_statements.rb @@ -3,6 +3,14 @@ def safe_added_columns_without_default_value @safe_added_columns_without_default_value ||= [] end + # This variable is used to track nested lock acquisition. + # Each element is an array of table objects with their lock modes. + # The order of the array represents the current call stack, + # where the most recent method call is the last element. + def safely_acquire_lock_for_table_history + @safely_acquire_lock_for_table_history ||= [] + end + def safe_create_table(table, options={}, &block) if options[:force] raise PgHaMigrations::UnsafeMigrationError.new(":force is NOT SAFE! Explicitly call unsafe_drop_table first if you want to recreate an existing table") @@ -528,40 +536,64 @@ def exec_migration(conn, direction) super(conn, direction) end - def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block) - nested_target_table = Thread.current[__method__] + def safely_acquire_lock_for_table(*tables, mode: :access_exclusive, &block) + # So this variable is always available in the ensure block + successfully_acquired_lock = false _check_postgres_adapter! - target_table = PgHaMigrations::Table.from_table_name(table, mode) + target_tables = tables.map do |target_table| + PgHaMigrations::Table.from_table_name(target_table, mode) + end.sort_by(&:fully_qualified_name) + + # Lock mode is the same across supplied tables so just grab the first. + # This variable is useful for error handling and messaging. + target_mode = target_tables.first.mode - if nested_target_table - if nested_target_table != target_table - raise PgHaMigrations::InvalidMigrationError, "Nested lock detected! Cannot acquire lock on #{target_table.fully_qualified_name} while #{nested_target_table.fully_qualified_name} is locked." - elsif nested_target_table.mode < target_table.mode - raise PgHaMigrations::InvalidMigrationError, "Lock escalation detected! Cannot change lock level from :#{nested_target_table.mode} to :#{target_table.mode} for #{target_table.fully_qualified_name}." + # Grab the latest locked tables from the call stack. + # This will be nil if we are not in a nested context. + nested_target_tables = safely_acquire_lock_for_table_history.last + + if nested_target_tables + if target_tables != nested_target_tables + raise PgHaMigrations::InvalidMigrationError, + "Nested lock detected! Cannot acquire lock on #{target_tables.map(&:fully_qualified_name).join(", ")} " \ + "while #{nested_target_tables.map(&:fully_qualified_name).join(", ")} is locked." end - else - Thread.current[__method__] = target_table - end - # Locking a partitioned table will also lock child tables (including sub-partitions), - # so we need to check for blocking queries on those tables as well - target_tables = target_table.partitions(include_sub_partitions: true, include_self: true) + nested_target_mode = nested_target_tables.first.mode - successfully_acquired_lock = false + if nested_target_mode < target_mode + raise PgHaMigrations::InvalidMigrationError, + "Lock escalation detected! Cannot change lock level from :#{nested_target_mode} " \ + "to :#{target_mode} for #{target_tables.map(&:fully_qualified_name).join(", ")}." + end + end until successfully_acquired_lock - while ( + loop do + # If in a nested context and all of the above checks have passed, + # we have already acquired the lock so this check is unnecessary. + # In fact, it could actually cause a deadlock if a blocking query + # was executed shortly after the initial lock acquisition. + break if nested_target_tables + blocking_transactions = PgHaMigrations::BlockingDatabaseTransactions.find_blocking_transactions("#{PgHaMigrations::LOCK_TIMEOUT_SECONDS} seconds") - blocking_transactions.any? do |query| + + # Locking a partitioned table will also lock child tables (including sub-partitions), + # so we need to check for blocking queries on those tables as well + target_tables_for_blocking_transactions = target_tables.flat_map do |target_table| + target_table.partitions(include_sub_partitions: true, include_self: true) + end + + break unless blocking_transactions.any? do |query| query.tables_with_locks.any? do |locked_table| - target_tables.any? do |target_table| + target_tables_for_blocking_transactions.any? do |target_table| target_table.conflicts_with?(locked_table) end end end - ) + say "Waiting on blocking transactions:" blocking_transactions.each do |blocking_transaction| say blocking_transaction.description @@ -570,16 +602,15 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block) end connection.transaction do - adjust_timeout_method = connection.postgresql_version >= 9_03_00 ? :adjust_lock_timeout : :adjust_statement_timeout begin - method(adjust_timeout_method).call(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do - connection.execute("LOCK #{target_table.fully_qualified_name} IN #{target_table.mode.to_sql} MODE;") + adjust_statement_timeout(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do + connection.execute("LOCK #{target_tables.map(&:fully_qualified_name).join(", ")} IN #{target_mode.to_sql} MODE;") end successfully_acquired_lock = true rescue ActiveRecord::StatementInvalid => e - if e.message =~ /PG::LockNotAvailable.+ lock timeout/ || e.message =~ /PG::QueryCanceled.+ statement timeout/ + if e.message =~ /PG::QueryCanceled.+ statement timeout/ sleep_seconds = PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER * PgHaMigrations::LOCK_TIMEOUT_SECONDS - say "Timed out trying to acquire #{target_table.mode.to_sql} lock on the #{target_table.fully_qualified_name} table." + say "Timed out trying to acquire #{target_mode.to_sql} lock on #{target_tables.map(&:fully_qualified_name).join(", ")}." say "Sleeping for #{sleep_seconds}s to allow potentially queued up queries to finish before continuing." sleep(sleep_seconds) @@ -590,12 +621,13 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block) end if successfully_acquired_lock + safely_acquire_lock_for_table_history.push(target_tables) block.call end end end ensure - Thread.current[__method__] = nil unless nested_target_table + safely_acquire_lock_for_table_history.pop if successfully_acquired_lock end def adjust_lock_timeout(timeout_seconds = PgHaMigrations::LOCK_TIMEOUT_SECONDS, &block) diff --git a/spec/safe_statements_spec.rb b/spec/safe_statements_spec.rb index 50b1073..206cc07 100644 --- a/spec/safe_statements_spec.rb +++ b/spec/safe_statements_spec.rb @@ -3793,14 +3793,15 @@ def up let(:alternate_connection_pool) do ActiveRecord::ConnectionAdapters::ConnectionPool.new(pool_config) end - let(:alternate_connection) do - alternate_connection_pool.connection - end + + let(:alternate_connection) { alternate_connection_pool.connection } + let(:alternate_connection_2) { alternate_connection_pool.connection } let(:migration) { Class.new(migration_klass).new } before(:each) do ActiveRecord::Base.connection.execute(<<~SQL) CREATE TABLE #{table_name}(pk SERIAL, i INTEGER); + CREATE TABLE #{table_name}_2(pk SERIAL, i INTEGER); CREATE SCHEMA partman; CREATE EXTENSION pg_partman SCHEMA partman; SQL @@ -3829,6 +3830,28 @@ def up end end + it "acquires exclusive locks by default when multiple tables provided" do + migration.safely_acquire_lock_for_table(table_name, "bogus_table_2") do + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) + + expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table_2", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) + end + end + it "acquires a lock in a different mode when provided" do migration.safely_acquire_lock_for_table(table_name, mode: :share) do expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( @@ -3842,6 +3865,28 @@ def up end end + it "acquires locks in a different mode when multiple tables and mode provided" do + migration.safely_acquire_lock_for_table(table_name, "bogus_table_2", mode: :share_row_exclusive) do + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "ShareRowExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) + + expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table_2", + lock_type: "ShareRowExclusiveLock", + granted: true, + pid: kind_of(Integer), + ) + ) + end + end + it "raises error when invalid lock mode provided" do expect do migration.safely_acquire_lock_for_table(table_name, mode: :garbage) {} @@ -3851,7 +3896,7 @@ def up ) end - it "releases the lock (even after an exception)" do + it "releases the lock even after an exception" do begin migration.safely_acquire_lock_for_table(table_name) do raise "bogus error" @@ -3862,6 +3907,32 @@ def up expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty end + it "releases the lock even after a swallowed postgres exception" do + migration.safely_acquire_lock_for_table(table_name) do + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + + begin + migration.connection.execute("SELECT * FROM garbage") + rescue + end + + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + + expect do + migration.connection.execute("SELECT * FROM bogus_table") + end.to raise_error(ActiveRecord::StatementInvalid, /PG::InFailedSqlTransaction/) + end + + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + end + it "waits to acquire a lock if the table is already blocked" do block_call_count = 0 expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions).exactly(3).times do |*args| @@ -3883,6 +3954,36 @@ def up end end + it "times out the lock query after LOCK_TIMEOUT_SECONDS when multiple tables provided" do + stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) + stub_const("PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER", 0) + allow(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions).and_return([]) + allow(migration.connection).to receive(:execute).and_call_original + + expect(migration.connection).to receive(:execute).with("LOCK \"public\".\"bogus_table\", \"public\".\"bogus_table_2\" IN ACCESS EXCLUSIVE MODE;") + .at_least(3) + .times + + begin + query_thread = Thread.new do + alternate_connection.execute("BEGIN; LOCK bogus_table_2;") + sleep 3 + alternate_connection.execute("ROLLBACK") + end + + sleep 0.5 + + migration.suppress_messages do + migration.safely_acquire_lock_for_table(table_name, "bogus_table_2") do + expect(locks_for_table(table_name, connection: alternate_connection_2)).not_to be_empty + expect(locks_for_table("bogus_table_2", connection: alternate_connection_2)).not_to be_empty + end + end + ensure + query_thread.join + end + end + it "does not wait to acquire a lock if the table has an existing but non-conflicting lock" do stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) @@ -4303,6 +4404,53 @@ def up expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty end + it "allows re-entrancy when multiple tables provided" do + migration.safely_acquire_lock_for_table(table_name, "bogus_table_2") do + # The ordering of the args is intentional here to ensure + # the array sorting and equality logic works as intended + migration.safely_acquire_lock_for_table("bogus_table_2", table_name) do + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + + expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table_2", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + end + + expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + + expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to contain_exactly( + having_attributes( + table: "bogus_table_2", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + ) + end + + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to be_empty + end + it "allows re-entrancy when inner lock is a lower level" do migration.safely_acquire_lock_for_table(table_name) do migration.safely_acquire_lock_for_table(table_name, mode: :exclusive) do @@ -4353,6 +4501,45 @@ def up expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty end + it "skips blocking query check for nested lock acquisition" do + stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1) + + query_thread = nil + + expect(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions) + .once + .and_call_original + + begin + migration.safely_acquire_lock_for_table(table_name) do + query_thread = Thread.new { alternate_connection.execute("SELECT * FROM bogus_table") } + + sleep 2 + + migration.safely_acquire_lock_for_table(table_name) do + expect(locks_for_table(table_name, connection: alternate_connection_2)).to contain_exactly( + having_attributes( + table: "bogus_table", + lock_type: "AccessExclusiveLock", + granted: true, + pid: kind_of(Integer), + ), + having_attributes( + table: "bogus_table", + lock_type: "AccessShareLock", + granted: false, + pid: kind_of(Integer), + ), + ) + end + end + ensure + query_thread.join if query_thread + end + + expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty + end + it "does not allow re-entrancy when lock escalation detected" do expect do migration.safely_acquire_lock_for_table(table_name, mode: :share) do @@ -4405,26 +4592,6 @@ def up expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty end - - it "uses statement_timeout instead of lock_timeout when on Postgres 9.1" do - allow(ActiveRecord::Base.connection).to receive(:postgresql_version).and_wrap_original do |m, *args| - if caller.detect { |line| line =~ /lib\/pg_ha_migrations\/blocking_database_transactions\.rb/ } - # The long-running transactions check needs to know the actual - # Postgres version to use the proper columns, so we don't want - # to mock any calls from it. - m.call(*args) - else - 9_01_12 - end - end - - expect do - migration.safely_acquire_lock_for_table(table_name) do - expect(locks_for_table(table_name, connection: alternate_connection)).not_to be_empty - end - expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty - end.not_to make_database_queries(matching: /lock_timeout/i) - end end end