Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrage committed Jun 4, 2024
1 parent 3eab753 commit c93a56b
Showing 1 changed file with 44 additions and 19 deletions.
63 changes: 44 additions & 19 deletions lib/pg_ha_migrations/safe_statements.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ def safe_added_columns_without_default_value
@safe_added_columns_without_default_value ||= []
end

# This variable is used to track nested lock acquisition.
# Each element is an array of table objects with their lock modes.
# The order of the array represents the current call stack,
# where the most recent method call is the last element.
def safely_acquire_lock_for_table_history
@safely_acquire_lock_for_table_history ||= []
end

def safe_create_table(table, options={}, &block)
if options[:force]
raise PgHaMigrations::UnsafeMigrationError.new(":force is NOT SAFE! Explicitly call unsafe_drop_table first if you want to recreate an existing table")
Expand Down Expand Up @@ -528,35 +536,52 @@ def exec_migration(conn, direction)
super(conn, direction)
end

def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
nested_target_table = Thread.current[__method__]
def safely_acquire_lock_for_table(*tables, mode: :access_exclusive, &block)
# So this variable is always available in the ensure block
successfully_acquired_lock = false

_check_postgres_adapter!

target_table = PgHaMigrations::Table.from_table_name(table, mode)
target_tables = tables.map do |target_table|
PgHaMigrations::Table.from_table_name(target_table, mode)
end.sort_by(&:fully_qualified_name)

if nested_target_table
if nested_target_table != target_table
raise PgHaMigrations::InvalidMigrationError, "Nested lock detected! Cannot acquire lock on #{target_table.fully_qualified_name} while #{nested_target_table.fully_qualified_name} is locked."
elsif nested_target_table.mode < target_table.mode
raise PgHaMigrations::InvalidMigrationError, "Lock escalation detected! Cannot change lock level from :#{nested_target_table.mode} to :#{target_table.mode} for #{target_table.fully_qualified_name}."
# Lock mode is the same across supplied tables so just grab the first.
# This variable is useful for error handling and messaging.
target_mode = target_tables.first.mode

# Grab the latest locked tables from the call stack.
# This will be nil if we are not in a nested context.
nested_target_tables = safely_acquire_lock_for_table_history.last

if nested_target_tables
if target_tables != nested_target_tables
raise PgHaMigrations::InvalidMigrationError,
"Nested lock detected! Cannot acquire lock on #{target_tables.map(&:fully_qualified_name).join(", ")} " \
"while #{nested_target_tables.map(&:fully_qualified_name).join(", ")} is locked."
end

nested_target_mode = nested_target_tables.first.mode

if nested_target_mode < target_mode
raise PgHaMigrations::InvalidMigrationError,
"Lock escalation detected! Cannot change lock level from :#{nested_target_mode} " \
"to :#{target_mode} for #{target_tables.map(&:fully_qualified_name).join(", ")}."
end
else
Thread.current[__method__] = target_table
end

# Locking a partitioned table will also lock child tables (including sub-partitions),
# so we need to check for blocking queries on those tables as well
target_tables = target_table.partitions(include_sub_partitions: true, include_self: true)

successfully_acquired_lock = false
target_tables_for_blocking_transactions = target_tables.flat_map do |target_table|
target_table.partitions(include_sub_partitions: true, include_self: true)
end

until successfully_acquired_lock
while (
blocking_transactions = PgHaMigrations::BlockingDatabaseTransactions.find_blocking_transactions("#{PgHaMigrations::LOCK_TIMEOUT_SECONDS} seconds")
blocking_transactions.any? do |query|
query.tables_with_locks.any? do |locked_table|
target_tables.any? do |target_table|
target_tables_for_blocking_transactions.any? do |target_table|
target_table.conflicts_with?(locked_table)
end
end
Expand All @@ -570,16 +595,15 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
end

connection.transaction do
adjust_timeout_method = connection.postgresql_version >= 9_03_00 ? :adjust_lock_timeout : :adjust_statement_timeout
begin
method(adjust_timeout_method).call(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do
connection.execute("LOCK #{target_table.fully_qualified_name} IN #{target_table.mode.to_sql} MODE;")
adjust_statement_timeout(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do
connection.execute("LOCK #{target_tables.map(&:fully_qualified_name).join(", ")} IN #{target_mode.to_sql} MODE;")
end
successfully_acquired_lock = true
rescue ActiveRecord::StatementInvalid => e
if e.message =~ /PG::LockNotAvailable.+ lock timeout/ || e.message =~ /PG::QueryCanceled.+ statement timeout/
sleep_seconds = PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER * PgHaMigrations::LOCK_TIMEOUT_SECONDS
say "Timed out trying to acquire #{target_table.mode.to_sql} lock on the #{target_table.fully_qualified_name} table."
say "Timed out trying to acquire #{target_mode.to_sql} lock on #{target_tables.map(&:fully_qualified_name).join(", ")}."
say "Sleeping for #{sleep_seconds}s to allow potentially queued up queries to finish before continuing."
sleep(sleep_seconds)

Expand All @@ -590,12 +614,13 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
end

if successfully_acquired_lock
safely_acquire_lock_for_table_history.push(target_tables)
block.call
end
end
end
ensure
Thread.current[__method__] = nil unless nested_target_table
safely_acquire_lock_for_table_history.pop if successfully_acquired_lock
end

def adjust_lock_timeout(timeout_seconds = PgHaMigrations::LOCK_TIMEOUT_SECONDS, &block)
Expand Down

0 comments on commit c93a56b

Please sign in to comment.