From 802b5de6521e32f7238a011d2a722f95bf353d6c Mon Sep 17 00:00:00 2001 From: Jeremy Evans Date: Thu, 21 Sep 2023 09:32:55 -0700 Subject: [PATCH] Add paged_datasets to paged_update_delete plugin This extracts the dataset part of paged_update and adds it as a separate method, as it may be useful for things other than updates. While here, do not increment the offset, so that with a page size of 1, it yields datasets that each contain 1 row, instead of 2 rows. --- lib/sequel/plugins/paged_update_delete.rb | 66 +++++++--- spec/extensions/paged_update_delete_spec.rb | 126 ++++++++++++++++---- spec/integration/plugin_test.rb | 31 +++++ 3 files changed, 182 insertions(+), 41 deletions(-) diff --git a/lib/sequel/plugins/paged_update_delete.rb b/lib/sequel/plugins/paged_update_delete.rb index ba6bc304b..aab4347f9 100644 --- a/lib/sequel/plugins/paged_update_delete.rb +++ b/lib/sequel/plugins/paged_update_delete.rb @@ -6,7 +6,8 @@ module Plugins # +paged_delete+ dataset methods. These behave similarly to # the default +update+ and +delete+ dataset methods, except # that the update or deletion is done in potentially multiple - # queries. For a large table, this prevents the change from + # queries (by default, affecting 1000 rows per query). + # For a large table, this prevents the change from # locking the table for a long period of time. # # Because the point of this is to prevent locking tables for @@ -50,9 +51,22 @@ module Plugins # # SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 12345)) ORDER BY id LIMIT 1 OFFSET 4 # # UPDATE albums SET updated_at = CURRENT_TIMESTAMP WHERE ((name <= 'M') AND (id >= 12345)) # - # You should avoid using +paged_update+ with updates that - # modify the primary key, as such usage is not supported by - # this plugin. 
+ # The plugin also adds a +paged_datasets+ method that will yield + # separate datasets limited in size that in total handle all + # rows in the receiver: + # + # Album.where{name <= 'M'}.paged_datasets{|ds| puts ds.sql} + # # Runs: SELECT id FROM albums WHERE (name <= 'M') ORDER BY id LIMIT 1 OFFSET 1000 + # # Prints: SELECT * FROM albums WHERE ((name <= 'M') AND (id < 1002)) + # # Runs: SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 1002)) ORDER BY id LIMIT 1 OFFSET 1000 + # # Prints: SELECT * FROM albums WHERE ((name <= 'M') AND (id < 2002) AND (id >= 1002)) + # # ... + # # Runs: SELECT id FROM albums WHERE ((name <= 'M') AND (id >= 10002)) ORDER BY id LIMIT 1 OFFSET 1000 + # # Prints: SELECT * FROM albums WHERE ((name <= 'M') AND (id >= 10002)) + # + # You should avoid using +paged_update+ or +paged_datasets+ + # with updates that modify the primary key, as such usage is + # not supported by this plugin. # # This plugin only supports models with scalar primary keys. # @@ -66,10 +80,36 @@ module Plugins # Album.plugin :paged_update_delete module PagedUpdateDelete module ClassMethods - Plugins.def_dataset_methods(self, [:paged_delete, :paged_update, :paged_update_delete_size]) + Plugins.def_dataset_methods(self, [:paged_datasets, :paged_delete, :paged_update, :paged_update_delete_size]) end module DatasetMethods + # Yield datasets for subsets of the receiver that are limited + # to no more than 1000 rows (you can configure the number of + # rows using paged_update_delete_size). 
+ def paged_datasets + unless defined?(yield) + return enum_for(:paged_datasets) + end + + pk = _paged_update_delete_pk(:paged_update) + base_offset_ds = offset_ds = _paged_update_delete_offset_ds + first = nil + + while last = offset_ds.get(pk) + ds = where(pk < last) + ds = ds.where(pk >= first) if first + yield ds + first = last + offset_ds = base_offset_ds.where(pk >= first) + end + + ds = self + ds = ds.where(pk >= first) if first + yield ds + nil + end + # Delete all rows of the dataset using using multiple queries so that # no more than 1000 rows are deleted at a time (you can configure the # number of rows using paged_update_delete_size). @@ -88,21 +128,11 @@ def paged_delete # number of rows using paged_update_delete_size). All arguments are # passed to Dataset#update. def paged_update(*args) - pk = _paged_update_delete_pk(:paged_update) rows_updated = 0 - base_offset_ds = offset_ds = _paged_update_delete_offset_ds - first = nil - - while last = offset_ds.get(pk) - ds = where(pk < last) - ds = ds.where(pk >= first) if first + paged_datasets do |ds| rows_updated += ds.update(*args) - first = last - offset_ds = base_offset_ds.where(pk >= first) end - ds = self - ds = ds.where(pk >= first) if first - rows_updated + ds.update(*args) + rows_updated end # Set the number of rows to update or delete per query when using @@ -133,7 +163,7 @@ def _paged_update_delete_pk(meth) # to get the upper limit for the next UPDATE or DELETE query. 
def _paged_update_delete_offset_ds offset = @opts[:paged_updated_delete_rows] || 1000 - _force_primary_key_order.offset(offset+1) + _force_primary_key_order.offset(offset) end end end diff --git a/spec/extensions/paged_update_delete_spec.rb b/spec/extensions/paged_update_delete_spec.rb index 4d7db4c63..cd1784af0 100644 --- a/spec/extensions/paged_update_delete_spec.rb +++ b/spec/extensions/paged_update_delete_spec.rb @@ -14,11 +14,11 @@ it "#paged_delete should delete using multiple queries" do @ds.paged_delete.must_equal 2002 @db.sqls.must_equal [ - "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000", "DELETE FROM albums WHERE (id < 1002)", - "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000", "DELETE FROM albums WHERE (id < 2002)", - "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000", "DELETE FROM albums" ] end @@ -26,21 +26,51 @@ it "#paged_update should update using multiple queries" do @ds.paged_update(:x=>1).must_equal 2002 @db.sqls.must_equal [ - "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000", "UPDATE albums SET x = 1 WHERE (id < 1002)", - "SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1000", "UPDATE albums SET x = 1 WHERE ((id < 2002) AND (id >= 1002))", - "SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1000", "UPDATE albums SET x = 1 WHERE (id >= 2002)" ] end + it "#paged_datasets should yield multiple datasets making up dataset" do + sqls = [] + @ds.paged_datasets{|ds| sqls << ds.sql} + sqls.must_equal [ + "SELECT * FROM albums WHERE (id < 1002)", + "SELECT * FROM albums WHERE ((id < 2002) AND (id >= 
1002))", + "SELECT * FROM albums WHERE (id >= 2002)" + ] + @db.sqls.must_equal [ + "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000", + "SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1000", + "SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1000", + ] + end + + it "#paged_datasets should support returning enum" do + enum = @ds.paged_datasets + enum.must_be_kind_of Enumerator + enum.map(&:sql).must_equal [ + "SELECT * FROM albums WHERE (id < 1002)", + "SELECT * FROM albums WHERE ((id < 2002) AND (id >= 1002))", + "SELECT * FROM albums WHERE (id >= 2002)" + ] + @db.sqls.must_equal [ + "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000", + "SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1000", + "SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1000", + ] + end + it "#paged_delete should handle case where number of rows is less than page size" do @db.fetch = [] @db.numrows = [2] @ds.paged_delete.must_equal 2 @db.sqls.must_equal [ - "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000", "DELETE FROM albums" ] end @@ -50,19 +80,25 @@ @db.numrows = [2] @ds.paged_update(:x=>1).must_equal 2 @db.sqls.must_equal [ - "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000", "UPDATE albums SET x = 1" ] end + it "#paged_datasets should handle case where number of rows is less than page size" do + @db.fetch = [] + @ds.paged_datasets.map(&:sql).must_equal ['SELECT * FROM albums'] + @db.sqls.must_equal ["SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000"] + end + it "#paged_delete should respect existing filters" do @ds.where{x > 3}.paged_delete.must_equal 2002 @db.sqls.must_equal [ - "SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1000", "DELETE FROM albums WHERE ((x > 3) AND (id < 
1002))", - "SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1000", "DELETE FROM albums WHERE ((x > 3) AND (id < 2002))", - "SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1000", "DELETE FROM albums WHERE (x > 3)" ] end @@ -70,18 +106,32 @@ it "#paged_update should respect existing filters" do @ds.where{x > 3}.paged_update(:x=>1).must_equal 2002 @db.sqls.must_equal [ - "SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1000", "UPDATE albums SET x = 1 WHERE ((x > 3) AND (id < 1002))", - "SELECT id FROM albums WHERE ((x > 3) AND (id >= 1002)) ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums WHERE ((x > 3) AND (id >= 1002)) ORDER BY id LIMIT 1 OFFSET 1000", "UPDATE albums SET x = 1 WHERE ((x > 3) AND (id < 2002) AND (id >= 1002))", - "SELECT id FROM albums WHERE ((x > 3) AND (id >= 2002)) ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums WHERE ((x > 3) AND (id >= 2002)) ORDER BY id LIMIT 1 OFFSET 1000", "UPDATE albums SET x = 1 WHERE ((x > 3) AND (id >= 2002))" ] end + it "#paged_datasets should respect existing filters" do + @ds.where{x > 3}.paged_datasets.map(&:sql).must_equal [ + "SELECT * FROM albums WHERE ((x > 3) AND (id < 1002))", + "SELECT * FROM albums WHERE ((x > 3) AND (id < 2002) AND (id >= 1002))", + "SELECT * FROM albums WHERE ((x > 3) AND (id >= 2002))" + ] + + @db.sqls.must_equal [ + "SELECT id FROM albums WHERE (x > 3) ORDER BY id LIMIT 1 OFFSET 1000", + "SELECT id FROM albums WHERE ((x > 3) AND (id >= 1002)) ORDER BY id LIMIT 1 OFFSET 1000", + "SELECT id FROM albums WHERE ((x > 3) AND (id >= 2002)) ORDER BY id LIMIT 1 OFFSET 1000", + ] + end + it "#paged_update_delete_size should set the page size for paged_update" do @db.numrows = [4, 4, 2] - 
@ds.paged_update_delete_size(3).paged_delete.must_equal 10 + @ds.paged_update_delete_size(4).paged_delete.must_equal 10 @db.sqls.must_equal [ "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4", "DELETE FROM albums WHERE (id < 1002)", @@ -94,7 +144,7 @@ it "#paged_update_delete_size should set the page size for paged_delete" do @db.numrows = [4, 4, 2] - @ds.paged_update_delete_size(3).paged_update(:x=>1).must_equal 10 + @ds.paged_update_delete_size(4).paged_update(:x=>1).must_equal 10 @db.sqls.must_equal [ "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4", "UPDATE albums SET x = 1 WHERE (id < 1002)", @@ -105,6 +155,19 @@ ] end + it "#paged_update_delete_size should set the page size for paged_datasets" do + @db.numrows = [4, 4, 2] + @ds.paged_update_delete_size(4).paged_datasets.map(&:sql).must_equal [ + "SELECT * FROM albums WHERE (id < 1002)", + "SELECT * FROM albums WHERE ((id < 2002) AND (id >= 1002))", + "SELECT * FROM albums WHERE (id >= 2002)" + ] + @db.sqls.must_equal [ + "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4", + "SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 4", + "SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 4", + ] + end it "should raise error for invalid size passed to paged_update_delete_size" do proc{@ds.paged_update_delete_size(0)}.must_raise Sequel::Error proc{@ds.paged_update_delete_size(-1)}.must_raise Sequel::Error @@ -113,33 +176,37 @@ it "should raise error for dataset with limit" do proc{@ds.limit(1).paged_delete}.must_raise Sequel::Error proc{@ds.limit(1).paged_update(:x=>1)}.must_raise Sequel::Error + proc{@ds.limit(1).paged_datasets{}}.must_raise Sequel::Error end it "should raise error for dataset with offset" do proc{@ds.offset(1).paged_delete}.must_raise Sequel::Error proc{@ds.offset(1).paged_update(:x=>1)}.must_raise Sequel::Error + proc{@ds.offset(1).paged_datasets{}}.must_raise Sequel::Error end it "should raise error for model with composite primary key" do 
@c.set_primary_key [:id, :x] proc{@c.dataset.paged_delete}.must_raise Sequel::Error proc{@c.dataset.paged_update(:x=>1)}.must_raise Sequel::Error + proc{@c.dataset.paged_datasets{}}.must_raise Sequel::Error end it "should raise error for model with no primary key" do @c.no_primary_key proc{@c.dataset.paged_delete}.must_raise Sequel::Error proc{@c.dataset.paged_update(:x=>1)}.must_raise Sequel::Error + proc{@c.dataset.paged_datasets{}}.must_raise Sequel::Error end it "should offer paged_delete class method" do @c.paged_delete.must_equal 2002 @db.sqls.must_equal [ - "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000", "DELETE FROM albums WHERE (id < 1002)", - "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000", "DELETE FROM albums WHERE (id < 2002)", - "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000", "DELETE FROM albums" ] end @@ -147,18 +214,31 @@ it "should offer paged_update class method" do @c.paged_update(:x=>1).must_equal 2002 @db.sqls.must_equal [ - "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000", "UPDATE albums SET x = 1 WHERE (id < 1002)", - "SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1000", "UPDATE albums SET x = 1 WHERE ((id < 2002) AND (id >= 1002))", - "SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1001", + "SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1000", "UPDATE albums SET x = 1 WHERE (id >= 2002)" ] end + it "should offer paged_datasets class method" do + @c.paged_datasets.map(&:sql).must_equal [ + "SELECT * FROM albums WHERE (id < 1002)", + "SELECT * FROM albums WHERE ((id < 2002) AND (id >= 1002))", + "SELECT * FROM albums WHERE (id >= 2002)" + ] 
+ @db.sqls.must_equal [ + "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 1000", + "SELECT id FROM albums WHERE (id >= 1002) ORDER BY id LIMIT 1 OFFSET 1000", + "SELECT id FROM albums WHERE (id >= 2002) ORDER BY id LIMIT 1 OFFSET 1000", + ] + end + it "should offer paged_update_delete_size class method" do @db.numrows = [4, 4, 2] - @c.paged_update_delete_size(3).paged_delete.must_equal 10 + @c.paged_update_delete_size(4).paged_delete.must_equal 10 @db.sqls.must_equal [ "SELECT id FROM albums ORDER BY id LIMIT 1 OFFSET 4", "DELETE FROM albums WHERE (id < 1002)", diff --git a/spec/integration/plugin_test.rb b/spec/integration/plugin_test.rb index 52666a6cc..8ce5d8bbf 100644 --- a/spec/integration/plugin_test.rb +++ b/spec/integration/plugin_test.rb @@ -3062,6 +3062,21 @@ def set(k, v, ttl) self[k] = v end @model.select_order_map([:id, :o]).must_equal expected end + it "Model#paged_datasets should work on unfiltered dataset" do + final_counts = [1, 2, 1, 10, 1, 2, 1, 100, 100] + @sizes.zip(final_counts).each do |rows, expected_fc| + @db.transaction(:rollback=>:always) do + counts = [] + @model.paged_update_delete_size(rows).paged_datasets{|ds| counts << ds.count} + counts.pop.must_equal expected_fc + counts.each{|c| c.must_equal rows} + end + end + counts = [] + @model.paged_datasets{|ds| counts << ds.count} + counts.must_equal [100] + end + it "Model#paged_delete should work on filtered dataset" do ds = @model.where{id < 50} @sizes.each do |rows| @@ -3092,4 +3107,20 @@ def set(k, v, ttl) self[k] = v end ds.select_order_map([:id, :o]).must_equal ds_expected other.select_order_map([:id, :o]).must_equal other_expected end + + it "Model#paged_datasets should work on filtered dataset" do + ds = @model.where{id < 50} + final_counts = [1, 1, 1, 9, 5, 49, 49, 49, 49] + @sizes.zip(final_counts).each do |rows, expected_fc| + @db.transaction(:rollback=>:always) do + counts = [] + ds.paged_update_delete_size(rows).paged_datasets{|ds| counts << ds.count} + counts.pop.must_equal 
expected_fc + counts.each{|c| c.must_equal rows} + end + end + counts = [] + ds.paged_datasets{|ds| counts << ds.count} + counts.must_equal [49] + end end