Skip to content

Commit

Permalink
Support for different lock modes (#88)
Browse files Browse the repository at this point in the history
Addresses #39
  • Loading branch information
rkrage authored Nov 20, 2023
1 parent 14a48df commit c4b0d4a
Show file tree
Hide file tree
Showing 8 changed files with 695 additions and 48 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -424,14 +424,22 @@ safe_partman_reapply_privileges :table

#### safely\_acquire\_lock\_for\_table

Safely acquire a lock for a table.
Safely acquire an access exclusive lock for a table.

```ruby
safely_acquire_lock_for_table(:table) do
...
end
```

Safely acquire a lock for a table in a different mode.

```ruby
safely_acquire_lock_for_table(:table, mode: :share) do
...
end
```

Note:

We enforce that only one table (or a table and its partitions) can be locked at a time.
Expand Down
1 change: 1 addition & 0 deletions lib/pg_ha_migrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def self.configure
require "pg_ha_migrations/blocking_database_transactions"
require "pg_ha_migrations/blocking_database_transactions_reporter"
require "pg_ha_migrations/partman_config"
require "pg_ha_migrations/lock_mode"
require "pg_ha_migrations/unsafe_statements"
require "pg_ha_migrations/safe_statements"
require "pg_ha_migrations/dependent_objects_checks"
Expand Down
4 changes: 2 additions & 2 deletions lib/pg_ha_migrations/blocking_database_transactions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class BlockingDatabaseTransactions
def initialize(*args)
super

self.tables_with_locks = tables_with_locks.map { |name, schema| Table.new(name, schema) }.select(&:present?)
self.tables_with_locks = tables_with_locks.map { |args| Table.new(*args) }.select(&:present?)
end

def description
Expand Down Expand Up @@ -48,7 +48,7 @@ def self.find_blocking_transactions(minimum_transaction_age = "0 seconds")
psa.#{query_column} as current_query,
psa.state,
clock_timestamp() - psa.xact_start AS transaction_age,
array_agg(distinct array[c.relname, ns.nspname]) AS tables_with_locks
array_agg(distinct array[c.relname, ns.nspname, l.mode]) AS tables_with_locks
FROM pg_stat_activity psa -- Cluster wide
LEFT JOIN pg_locks l ON (psa.#{pid_column} = l.pid) -- Cluster wide
LEFT JOIN pg_class c ON ( -- Database wide
Expand Down
100 changes: 100 additions & 0 deletions lib/pg_ha_migrations/lock_mode.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
module PgHaMigrations
class LockMode
include Comparable

MODE_CONFLICTS = ActiveSupport::OrderedHash.new

MODE_CONFLICTS[:access_share] = %i[
access_exclusive
]

MODE_CONFLICTS[:row_share] = %i[
exclusive
access_exclusive
]

MODE_CONFLICTS[:row_exclusive] = %i[
share
share_row_exclusive
exclusive
access_exclusive
]

MODE_CONFLICTS[:share_update_exclusive] = %i[
share_update_exclusive
share
share_row_exclusive
exclusive
access_exclusive
]

MODE_CONFLICTS[:share] = %i[
row_exclusive
share_update_exclusive
share_row_exclusive
exclusive
access_exclusive
]

MODE_CONFLICTS[:share_row_exclusive] = %i[
row_exclusive
share_update_exclusive
share
share_row_exclusive
exclusive
access_exclusive
]

MODE_CONFLICTS[:exclusive] = %i[
row_share
row_exclusive
share_update_exclusive
share
share_row_exclusive
exclusive
access_exclusive
]

MODE_CONFLICTS[:access_exclusive] = %i[
access_share
row_share
row_exclusive
share_update_exclusive
share
share_row_exclusive
exclusive
access_exclusive
]

attr_reader :mode

delegate :to_s, to: :mode

def initialize(mode)
@mode = mode
.to_s
.underscore
.delete_suffix("_lock")
.to_sym

if !MODE_CONFLICTS.keys.include?(@mode)
raise ArgumentError, "Unrecognized lock mode #{@mode.inspect}. Valid modes: #{MODE_CONFLICTS.keys}"
end
end

def to_sql
mode
.to_s
.upcase
.gsub("_", " ")
end

def <=>(other)
MODE_CONFLICTS.keys.index(mode) <=> MODE_CONFLICTS.keys.index(other.mode)
end

def conflicts_with?(other)
MODE_CONFLICTS[mode].include?(other.mode)
end
end
end
28 changes: 23 additions & 5 deletions lib/pg_ha_migrations/relation.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
module PgHaMigrations
Relation = Struct.new(:name, :schema) do
Relation = Struct.new(:name, :schema, :mode) do
def self.connection
ActiveRecord::Base.connection
end

delegate :inspect, to: :name
delegate :connection, to: :class

def initialize(name, schema, mode=nil)
super(name, schema)

self.mode = LockMode.new(mode) if mode.present?
end

def conflicts_with?(other)
self == other && (
mode.nil? || other.mode.nil? || mode.conflicts_with?(other.mode)
)
end

def fully_qualified_name
@fully_qualified_name ||= [
PG::Connection.quote_ident(schema),
Expand All @@ -17,10 +29,14 @@ def fully_qualified_name
def present?
name.present? && schema.present?
end

def ==(other)
other.is_a?(Relation) && name == other.name && schema == other.schema
end
end

class Table < Relation
def self.from_table_name(table)
def self.from_table_name(table, mode=nil)
pg_name = ActiveRecord::ConnectionAdapters::PostgreSQL::Utils.extract_schema_qualified_name(table.to_s)

schema_conditional = if pg_name.schema
Expand All @@ -39,7 +55,7 @@ def self.from_table_name(table)

raise UndefinedTableError, "Table #{pg_name.quoted} does not exist#{" in search path" unless pg_name.schema}" unless schema.present?

new(pg_name.identifier, schema)
new(pg_name.identifier, schema, mode)
end

def natively_partitioned?
Expand All @@ -53,9 +69,9 @@ def natively_partitioned?
SQL
end

def partitions(include_sub_partitions: false)
def partitions(include_sub_partitions: false, include_self: false)
tables = connection.structs_from_sql(self.class, <<~SQL)
SELECT child.relname AS name, child_ns.nspname AS schema
SELECT child.relname AS name, child_ns.nspname AS schema, NULLIF('#{mode}', '') AS mode
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
Expand All @@ -73,6 +89,8 @@ def partitions(include_sub_partitions: false)
tables.concat(sub_partitions)
end

tables.prepend(self) if include_self

tables
end
end
Expand Down
33 changes: 18 additions & 15 deletions lib/pg_ha_migrations/safe_statements.rb
Original file line number Diff line number Diff line change
Expand Up @@ -519,34 +519,37 @@ def exec_migration(conn, direction)
super(conn, direction)
end

def safely_acquire_lock_for_table(table, &block)
nested_lock = Thread.current[__method__]
def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
nested_target_table = Thread.current[__method__]

_check_postgres_adapter!

table = PgHaMigrations::Table.from_table_name(table)
target_table = PgHaMigrations::Table.from_table_name(table, mode)

# Disallow nested locks unless targeting the same table
if nested_lock && nested_lock != table
raise PgHaMigrations::InvalidMigrationError, "Nested lock detected! Cannot acquire lock on #{table.fully_qualified_name} while #{nested_lock.fully_qualified_name} is locked."
if nested_target_table
if nested_target_table != target_table
raise PgHaMigrations::InvalidMigrationError, "Nested lock detected! Cannot acquire lock on #{target_table.fully_qualified_name} while #{nested_target_table.fully_qualified_name} is locked."
elsif nested_target_table.mode < target_table.mode
raise PgHaMigrations::InvalidMigrationError, "Lock escalation detected! Cannot change lock level from :#{nested_target_table.mode} to :#{target_table.mode} for #{target_table.fully_qualified_name}."
end
else
Thread.current[__method__] = table
Thread.current[__method__] = target_table
end

target_tables = [table]

# Locking a partitioned table will also lock child tables (including sub-partitions),
# so we need to check for blocking queries on those tables as well
target_tables.concat(table.partitions(include_sub_partitions: true))
target_tables = target_table.partitions(include_sub_partitions: true, include_self: true)

successfully_acquired_lock = false

until successfully_acquired_lock
while (
blocking_transactions = PgHaMigrations::BlockingDatabaseTransactions.find_blocking_transactions("#{PgHaMigrations::LOCK_TIMEOUT_SECONDS} seconds")
blocking_transactions.any? do |query|
query.tables_with_locks.any? do |table|
target_tables.include?(table)
query.tables_with_locks.any? do |locked_table|
target_tables.any? do |target_table|
target_table.conflicts_with?(locked_table)
end
end
end
)
Expand All @@ -561,13 +564,13 @@ def safely_acquire_lock_for_table(table, &block)
adjust_timeout_method = connection.postgresql_version >= 9_03_00 ? :adjust_lock_timeout : :adjust_statement_timeout
begin
method(adjust_timeout_method).call(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do
connection.execute("LOCK #{table.fully_qualified_name};")
connection.execute("LOCK #{target_table.fully_qualified_name} IN #{target_table.mode.to_sql} MODE;")
end
successfully_acquired_lock = true
rescue ActiveRecord::StatementInvalid => e
if e.message =~ /PG::LockNotAvailable.+ lock timeout/ || e.message =~ /PG::QueryCanceled.+ statement timeout/
sleep_seconds = PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER * PgHaMigrations::LOCK_TIMEOUT_SECONDS
say "Timed out trying to acquire an exclusive lock on the #{table.fully_qualified_name} table."
say "Timed out trying to acquire #{target_table.mode.to_sql} lock on the #{target_table.fully_qualified_name} table."
say "Sleeping for #{sleep_seconds}s to allow potentially queued up queries to finish before continuing."
sleep(sleep_seconds)

Expand All @@ -583,7 +586,7 @@ def safely_acquire_lock_for_table(table, &block)
end
end
ensure
Thread.current[__method__] = nil unless nested_lock
Thread.current[__method__] = nil unless nested_target_table
end

def adjust_lock_timeout(timeout_seconds = PgHaMigrations::LOCK_TIMEOUT_SECONDS, &block)
Expand Down
Loading

0 comments on commit c4b0d4a

Please sign in to comment.