From 37e72de1398d29674fbe3358eddc1e43dd1d50bc Mon Sep 17 00:00:00 2001
From: Jay Edgar
Date: Fri, 12 Feb 2016 14:37:12 -0800
Subject: [PATCH] Add retry option when receiving kBusy on first row after snapshot

Summary: When contention on a row causes a kBusy error, and the error occurs
on the first row read since we created the snapshot, we can release the
snapshot (and iterator) and retry instead of returning an unnecessary
deadlock error.

Test Plan: MTR

Reviewers: spetrunia, yoshinorim, hermanlee4

Reviewed By: hermanlee4

Subscribers: spetrunia, webscalesql-eng

Differential Revision: https://reviews.facebook.net/D54231
---
 .../include/rocksdb_concurrent_delete.inc          |  53 +++
 mysql-test/suite/rocksdb/r/deadlock.result         |  19 +
 .../suite/rocksdb/r/delete_before_lock.result      |   5 +-
 .../r/rocksdb_concurrent_delete.result             |  62 ++-
 mysql-test/suite/rocksdb/t/deadlock.test           |  21 +-
 .../suite/rocksdb/t/delete_before_lock.test        |   1 -
 .../rocksdb/t/rocksdb_concurrent_delete.test       |  50 +--
 storage/rocksdb/ha_rocksdb.cc                      | 369 ++++++++++--------
 storage/rocksdb/ha_rocksdb.h                       |  16 +-
 9 files changed, 375 insertions(+), 221 deletions(-)
 create mode 100644 mysql-test/suite/rocksdb/include/rocksdb_concurrent_delete.inc

diff --git a/mysql-test/suite/rocksdb/include/rocksdb_concurrent_delete.inc b/mysql-test/suite/rocksdb/include/rocksdb_concurrent_delete.inc
new file mode 100644
index 000000000000..71e713226d75
--- /dev/null
+++ b/mysql-test/suite/rocksdb/include/rocksdb_concurrent_delete.inc
@@ -0,0 +1,53 @@
+# Usage:
+#
+# let $order = ASC; # or DESC
+# let $comment = "rev:cf2"; # or ""
+# --source suite/rocksdb/include/rocksdb_concurrent_delete.inc
+
+let $first_row = -1; # Error: this should never be used
+if ($order == 'ASC')
+{
+  let $first_row = 1;
+}
+if ($order == 'DESC')
+{
+  let $first_row = 3;
+}
+
+connect (con, localhost, root,,);
+connection default;
+
+--disable_warnings
+SET debug_sync='RESET';
+DROP TABLE IF EXISTS t1;
+--enable_warnings
+
+eval CREATE TABLE t1 (pk INT PRIMARY KEY COMMENT $comment, a INT);
+INSERT INTO t1 VALUES(1,1), (2,2), (3,3);
+
+# This will cause the SELECT to block after finding the first row, but
+# before locking and reading it.
+connection con;
+SET debug_sync='rocksdb_concurrent_delete SIGNAL parked WAIT_FOR go';
+send_eval SELECT * FROM t1 order by t1.pk $order FOR UPDATE;
+
+# While that connection is waiting, delete the first row (the one con
+# is about to lock and read).
+connection default;
+SET debug_sync='now WAIT_FOR parked';
+eval DELETE FROM t1 WHERE pk = $first_row;
+
+# Signal the waiting select to continue
+SET debug_sync='now SIGNAL go';
+
+# Now get the results from the select. The first entry (1,1) (or (3,3) when
+# using reverse ordering) should be missing. 
Prior to the fix the SELECT +# would have returned: "1815: Internal error: NotFound:" +connection con; +reap; + +# Cleanup +connection default; +disconnect con; +set debug_sync='RESET'; +drop table t1; diff --git a/mysql-test/suite/rocksdb/r/deadlock.result b/mysql-test/suite/rocksdb/r/deadlock.result index 4ca34e3936a4..3e2f5709ca05 100644 --- a/mysql-test/suite/rocksdb/r/deadlock.result +++ b/mysql-test/suite/rocksdb/r/deadlock.result @@ -6,6 +6,7 @@ DROP DATABASE IF EXISTS mysqlslap; CREATE DATABASE mysqlslap; USE mysqlslap; CREATE TABLE t1(id1 BIGINT, id2 BIGINT, count INT, PRIMARY KEY(id1, id2), KEY(id2)) ENGINE=rocksdb; +CREATE TABLE t1rev(id1 BIGINT, id2 BIGINT, count INT, PRIMARY KEY(id1, id2) COMMENT "rev:cf2", KEY(id2) COMMENT "rev:cf2") ENGINE=rocksdb; SET @save = @@global.rocksdb_lock_wait_timeout; SET GLOBAL rocksdb_lock_wait_timeout = 60; SELECT count from t1; @@ -14,5 +15,23 @@ count SELECT count from t1; count 100000 +SELECT count from t1; +count +150000 +SELECT count from t1; +count +200000 +SELECT count from t1rev; +count +50000 +SELECT count from t1rev; +count +100000 +SELECT count from t1rev; +count +150000 +SELECT count from t1rev; +count +200000 SET GLOBAL rocksdb_lock_wait_timeout = @save; DROP DATABASE mysqlslap; diff --git a/mysql-test/suite/rocksdb/r/delete_before_lock.result b/mysql-test/suite/rocksdb/r/delete_before_lock.result index a9f325590934..402ef539ffd4 100644 --- a/mysql-test/suite/rocksdb/r/delete_before_lock.result +++ b/mysql-test/suite/rocksdb/r/delete_before_lock.result @@ -7,10 +7,9 @@ update t1 set value=100 where id1=1; set debug_sync='now WAIT_FOR parked'; delete from t1 where id1=1 and id2=1; set debug_sync='now SIGNAL go'; -ERROR 40001: Deadlock found when trying to get lock; try restarting transaction select * from t1 where id1=1 for update; id1 id2 value -1 2 1 -1 3 1 +1 2 100 +1 3 100 set debug_sync='RESET'; drop table t1; diff --git a/mysql-test/suite/rocksdb/r/rocksdb_concurrent_delete.result b/mysql-test/suite/rocksdb/r/rocksdb_concurrent_delete.result index b2f013bab2fb..9d6d368c6868 100644 --- a/mysql-test/suite/rocksdb/r/rocksdb_concurrent_delete.result +++ b/mysql-test/suite/rocksdb/r/rocksdb_concurrent_delete.result @@ -1,12 +1,56 @@ +SET debug_sync='RESET'; +DROP TABLE IF EXISTS t1; +CREATE TABLE t1 (pk INT PRIMARY KEY COMMENT "", a INT); +INSERT INTO t1 VALUES(1,1), (2,2), (3,3); +SET debug_sync='rocksdb_concurrent_delete SIGNAL parked WAIT_FOR go'; +SELECT * FROM t1 order by t1.pk ASC FOR UPDATE; +SET debug_sync='now WAIT_FOR parked'; +DELETE FROM t1 WHERE pk = 1; +SET debug_sync='now SIGNAL go'; +pk a +2 2 +3 3 set debug_sync='RESET'; -drop table if exists t1; -create table t1 (pk int primary key, a int); -insert into t1 values(1,1), (2,2), (3,3); -set debug_sync='rocksdb_concurrent_delete SIGNAL parked WAIT_FOR go'; -select * from t1 for update; -set debug_sync='now WAIT_FOR parked'; -delete from t1 where pk = 1; -set debug_sync='now SIGNAL go'; -ERROR 40001: Deadlock found when trying to get lock; try restarting transaction +drop table t1; +SET debug_sync='RESET'; +DROP TABLE IF EXISTS t1; +CREATE TABLE t1 (pk INT PRIMARY KEY COMMENT "", a INT); +INSERT INTO t1 VALUES(1,1), (2,2), (3,3); +SET debug_sync='rocksdb_concurrent_delete SIGNAL parked WAIT_FOR go'; +SELECT * FROM t1 order by t1.pk DESC FOR UPDATE; +SET debug_sync='now WAIT_FOR parked'; +DELETE FROM t1 WHERE pk = 3; +SET debug_sync='now SIGNAL go'; +pk a +2 2 +1 1 +set debug_sync='RESET'; +drop table t1; +SET debug_sync='RESET'; +DROP TABLE IF EXISTS t1; +CREATE TABLE 
t1 (pk INT PRIMARY KEY COMMENT "rev:cf2", a INT); +INSERT INTO t1 VALUES(1,1), (2,2), (3,3); +SET debug_sync='rocksdb_concurrent_delete SIGNAL parked WAIT_FOR go'; +SELECT * FROM t1 order by t1.pk ASC FOR UPDATE; +SET debug_sync='now WAIT_FOR parked'; +DELETE FROM t1 WHERE pk = 1; +SET debug_sync='now SIGNAL go'; +pk a +2 2 +3 3 +set debug_sync='RESET'; +drop table t1; +SET debug_sync='RESET'; +DROP TABLE IF EXISTS t1; +CREATE TABLE t1 (pk INT PRIMARY KEY COMMENT "rev:cf2", a INT); +INSERT INTO t1 VALUES(1,1), (2,2), (3,3); +SET debug_sync='rocksdb_concurrent_delete SIGNAL parked WAIT_FOR go'; +SELECT * FROM t1 order by t1.pk DESC FOR UPDATE; +SET debug_sync='now WAIT_FOR parked'; +DELETE FROM t1 WHERE pk = 3; +SET debug_sync='now SIGNAL go'; +pk a +2 2 +1 1 set debug_sync='RESET'; drop table t1; diff --git a/mysql-test/suite/rocksdb/t/deadlock.test b/mysql-test/suite/rocksdb/t/deadlock.test index e5bd6a773446..3be7fda9952a 100644 --- a/mysql-test/suite/rocksdb/t/deadlock.test +++ b/mysql-test/suite/rocksdb/t/deadlock.test @@ -14,6 +14,7 @@ DROP DATABASE IF EXISTS mysqlslap; CREATE DATABASE mysqlslap; USE mysqlslap; CREATE TABLE t1(id1 BIGINT, id2 BIGINT, count INT, PRIMARY KEY(id1, id2), KEY(id2)) ENGINE=rocksdb; +CREATE TABLE t1rev(id1 BIGINT, id2 BIGINT, count INT, PRIMARY KEY(id1, id2) COMMENT "rev:cf2", KEY(id2) COMMENT "rev:cf2") ENGINE=rocksdb; SET @save = @@global.rocksdb_lock_wait_timeout; SET GLOBAL rocksdb_lock_wait_timeout = 60; @@ -22,12 +23,20 @@ SET GLOBAL rocksdb_lock_wait_timeout = 60; SELECT count from t1; --exec $MYSQL_SLAP --silent --concurrency=50 --number-of-queries=50000 --query="UPDATE t1 SET count=count+1 WHERE id1=1 AND id2=1" SELECT count from t1; -## Disable these for now -## TODO(jkedgar) add retry ability so these tests can be enabled again -##--exec $MYSQL_SLAP --silent --concurrency=50 --number-of-queries=50000 --query="UPDATE t1 SET count=count+1 WHERE id2=1" -##SELECT count from t1; -##--exec $MYSQL_SLAP --silent --concurrency=50 --number-of-queries=50000 --query="UPDATE t1 SET count=count+1" -##SELECT count from t1; +--exec $MYSQL_SLAP --silent --concurrency=50 --number-of-queries=50000 --query="UPDATE t1 SET count=count+1 WHERE id2=1" +SELECT count from t1; +--exec $MYSQL_SLAP --silent --concurrency=50 --number-of-queries=50000 --query="UPDATE t1 SET count=count+1" +SELECT count from t1; + +# Same tests on a table with reverse orderings +--exec $MYSQL_SLAP --silent --concurrency=50 --number-of-queries=50000 --query="INSERT INTO t1rev VALUES(1, 1, 1) ON DUPLICATE KEY UPDATE count=count+1" +SELECT count from t1rev; +--exec $MYSQL_SLAP --silent --concurrency=50 --number-of-queries=50000 --query="UPDATE t1rev SET count=count+1 WHERE id1=1 AND id2=1" +SELECT count from t1rev; +--exec $MYSQL_SLAP --silent --concurrency=50 --number-of-queries=50000 --query="UPDATE t1rev SET count=count+1 WHERE id2=1" +SELECT count from t1rev; +--exec $MYSQL_SLAP --silent --concurrency=50 --number-of-queries=50000 --query="UPDATE t1rev SET count=count+1" +SELECT count from t1rev; SET GLOBAL rocksdb_lock_wait_timeout = @save; diff --git a/mysql-test/suite/rocksdb/t/delete_before_lock.test b/mysql-test/suite/rocksdb/t/delete_before_lock.test index cd958ee3c34e..93a9d1adaf94 100644 --- a/mysql-test/suite/rocksdb/t/delete_before_lock.test +++ b/mysql-test/suite/rocksdb/t/delete_before_lock.test @@ -26,7 +26,6 @@ delete from t1 where id1=1 and id2=1; set debug_sync='now SIGNAL go'; connection con; ---error ER_LOCK_DEADLOCK reap; select * from t1 where id1=1 for update; diff --git 
a/mysql-test/suite/rocksdb/t/rocksdb_concurrent_delete.test b/mysql-test/suite/rocksdb/t/rocksdb_concurrent_delete.test index 459ffa08e0d1..ecb4b2a3609d 100644 --- a/mysql-test/suite/rocksdb/t/rocksdb_concurrent_delete.test +++ b/mysql-test/suite/rocksdb/t/rocksdb_concurrent_delete.test @@ -7,44 +7,18 @@ # deleted before the GetForUpdate() call could occur. When this happened # a nearly useless error was being returned. -connect (con, localhost, root,,); -connection default; +let $order=ASC; +let $comment=""; +--source suite/rocksdb/include/rocksdb_concurrent_delete.inc ---disable_warnings -set debug_sync='RESET'; -drop table if exists t1; ---enable_warnings +let $order=DESC; +let $comment=""; +--source suite/rocksdb/include/rocksdb_concurrent_delete.inc -create table t1 (pk int primary key, a int); -insert into t1 values(1,1), (2,2), (3,3); +let $order=ASC; +let $comment="rev:cf2"; +--source suite/rocksdb/include/rocksdb_concurrent_delete.inc -# This will cause the select to block after finding the first row, but -# before locking and reading it. -connection con; -set debug_sync='rocksdb_concurrent_delete SIGNAL parked WAIT_FOR go'; -send select * from t1 for update; - -# While that connection is waiting, delete the first row (the one con -# is about to lock and read -connection default; -set debug_sync='now WAIT_FOR parked'; -delete from t1 where pk = 1; - -# Signal the waiting select to continue -set debug_sync='now SIGNAL go'; - -# Now get the results from the select. The first entry (1,1) should -# be missing. Prior to the fix the select would have returned: -# "1815: Internal error: NotFound:" -connection con; -# As of https://reviews.facebook.net/D54129 this will get a deadlock error -# TODO(jkedgar): Once retrying is available this should no longer return -# an error ---error ER_LOCK_DEADLOCK -reap; - -# Cleanup -connection default; -disconnect con; -set debug_sync='RESET'; -drop table t1; +let $order=DESC; +let $comment="rev:cf2"; +--source suite/rocksdb/include/rocksdb_concurrent_delete.inc diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 57b1b8ee914e..bfe2f1fb69d4 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -1502,6 +1502,11 @@ class Rdb_transaction txn->ClearSnapshot(); } + bool has_snapshot() + { + return read_opts.snapshot != nullptr; + } + /* Flush the data accumulated so far. This assumes we're doing a bulk insert. @@ -2832,8 +2837,7 @@ void ha_rocksdb::load_auto_incr_value() (Why don't we use index_init/index_end? class handler defines index_init as private, for some reason). */ - delete scan_it; - scan_it= nullptr; + release_scan_iterator(); } /* Get PK value from table->record[0]. */ @@ -2886,11 +2890,7 @@ int ha_rocksdb::load_hidden_pk_value() table->status= save_table_status; active_index= save_active_index; - if (scan_it) - { - delete scan_it; - scan_it= nullptr; - } + release_scan_iterator(); return 0; } @@ -4304,7 +4304,6 @@ int ha_rocksdb::read_row_from_primary_key(uchar *buf) if (lock_rows) { /* We need to put a lock and re-read */ - // TODO(jkedgar): what if we find that the row is gone? Need a re-read? rc= get_row_by_rowid(buf, pk_packed_tuple, pk_size); } else @@ -4643,39 +4642,55 @@ int ha_rocksdb::index_read_map_impl(uchar *buf, const uchar *key, if (find_flag == HA_READ_KEY_EXACT && my_count_bits(keypart_map) == kd->get_m_key_parts()) use_all_keys= true; - /* - This will open the iterator and position it at a record that's equal or - greater than the lookup tuple. 
- */ - setup_index_scan(kd, &slice, use_all_keys, is_ascending(kd, find_flag), - eq_cond_len); - - /* - Once we are positioned on from above, move to the position we really - want: See storage/rocksdb/rocksdb-range-access.txt - */ - bool move_forward; - rc = position_to_correct_key(kd, find_flag, using_full_key, key, keypart_map, - slice, &move_forward); - if (rc) + Rdb_transaction *trx= get_or_create_trx(table->in_use); + bool is_new_snapshot= !trx->has_snapshot(); + // Loop as long as we get a deadlock error AND we end up creating the + // snapshot here (i.e. it did not exist prior to this) + for (;;) { - table->status= STATUS_NOT_FOUND; /* This status is returned on any error */ - io_perf_end_and_record(); - DBUG_RETURN(rc); - } + /* + This will open the iterator and position it at a record that's equal or + greater than the lookup tuple. + */ + setup_scan_iterator(kd, &slice, use_all_keys, is_ascending(kd, find_flag), + eq_cond_len); - skip_scan_it_next_call= FALSE; + /* + Once we are positioned on from above, move to the position we really + want: See storage/rocksdb/rocksdb-range-access.txt + */ + bool move_forward; + rc = position_to_correct_key(kd, find_flag, using_full_key, key, + keypart_map, slice, &move_forward); - /* - Now get the data for the row into 'buf'. If we were using a primary key - then we have all the rows we need. For a secondary key we now need to - lookup the primary key. - */ - if (active_index == table->s->primary_key) - rc= read_row_from_primary_key(buf); - else - rc= read_row_from_secondary_key(buf, kd, move_forward); + if (rc) + { + /* This status is returned on any error */ + table->status= STATUS_NOT_FOUND; + io_perf_end_and_record(); + DBUG_RETURN(rc); + } + + skip_scan_it_next_call= FALSE; + + /* + Now get the data for the row into 'buf'. If we were using a primary key + then we have all the rows we need. For a secondary key we now need to + lookup the primary key. + */ + if (active_index == table->s->primary_key) + rc= read_row_from_primary_key(buf); + else + rc= read_row_from_secondary_key(buf, kd, move_forward); + + if (rc != HA_ERR_LOCK_DEADLOCK || !is_new_snapshot) + break; /* Exit the loop */ + + // release the snapshot and iterator so they will be regenerated + trx->release_snapshot(); + release_scan_iterator(); + } if (rc) { @@ -5230,12 +5245,25 @@ int ha_rocksdb::index_first_intern(uchar *buf) rocksdb::Slice index_key((const char*)key, key_size); - setup_index_scan(key_descr[active_index], &index_key, false, - !key_descr[active_index]->is_reverse_cf, - RDBSE_KEYDEF::INDEX_NUMBER_SIZE); - skip_scan_it_next_call= TRUE; + Rdb_transaction *trx= get_or_create_trx(table->in_use); + bool is_new_snapshot= !trx->has_snapshot(); + // Loop as long as we get a deadlock error AND we end up creating the + // snapshot here (i.e. 
it did not exist prior to this) + for (;;) + { + setup_scan_iterator(key_descr[active_index], &index_key, false, + !key_descr[active_index]->is_reverse_cf, + RDBSE_KEYDEF::INDEX_NUMBER_SIZE); + skip_scan_it_next_call= TRUE; + + rc= index_next_with_direction(buf, true); + if (rc != HA_ERR_LOCK_DEADLOCK || !is_new_snapshot) + break; // exit the loop - rc= index_next_with_direction(buf, true); + // release the snapshot and iterator so they will be regenerated + trx->release_snapshot(); + release_scan_iterator(); + } if (!rc) { @@ -5305,39 +5333,53 @@ int ha_rocksdb::index_last_intern(uchar *buf) rocksdb::Slice index_key((const char*)key, key_size); - setup_index_scan(key_descr[active_index], &index_key, false, - key_descr[active_index]->is_reverse_cf, - RDBSE_KEYDEF::INDEX_NUMBER_SIZE); - - if (!scan_it->Valid()) - { - /* - RocksDB keyspace has no records that follow our index. It could be, our - index is the last in the keyspace. Navigate to last record in the - keyspace. - */ - scan_it->SeekToLast(); - } - else + Rdb_transaction *trx= get_or_create_trx(table->in_use); + bool is_new_snapshot= !trx->has_snapshot(); + // Loop as long as we get a deadlock error AND we end up creating the + // snapshot here (i.e. it did not exist prior to this) + for (;;) { - /* - Iterator is pointing at some record right after the end of the index. - Move one record back. - */ - scan_it->Prev(); - } - skip_scan_it_next_call= FALSE; + setup_scan_iterator(key_descr[active_index], &index_key, false, + key_descr[active_index]->is_reverse_cf, + RDBSE_KEYDEF::INDEX_NUMBER_SIZE); - if (is_pk(active_index, table, tbl_def)) - { - skip_scan_it_next_call= true; - rc= rnd_next_with_direction(buf, false); - } - else - { - rc= find_icp_matching_index_rec(false /*move_forward*/, buf); - if (!rc) - rc= secondary_index_read(active_index, buf); + if (!scan_it->Valid()) + { + /* + RocksDB keyspace has no records that follow our index. It could be, our + index is the last in the keyspace. Navigate to last record in the + keyspace. + */ + scan_it->SeekToLast(); + } + else + { + /* + Iterator is pointing at some record right after the end of the index. + Move one record back. 
+ */ + scan_it->Prev(); + } + skip_scan_it_next_call= FALSE; + + if (is_pk(active_index, table, tbl_def)) + { + skip_scan_it_next_call= true; + rc= rnd_next_with_direction(buf, false); + } + else + { + rc= find_icp_matching_index_rec(false /*move_forward*/, buf); + if (!rc) + rc= secondary_index_read(active_index, buf); + } + + if (rc != HA_ERR_LOCK_DEADLOCK || !is_new_snapshot) + break; /* exit the loop */ + + // release the snapshot and iterator so they will be regenerated + trx->release_snapshot(); + release_scan_iterator(); } if (!rc) @@ -5869,11 +5911,11 @@ int ha_rocksdb::update_write_row(const uchar *old_data, Open a cursor and position it at the passed record */ -void ha_rocksdb::setup_index_scan(RDBSE_KEYDEF *keydef, - rocksdb::Slice *slice, - const bool use_all_keys, - const bool is_ascending, - const uint eq_cond_len) +void ha_rocksdb::setup_scan_iterator(RDBSE_KEYDEF *keydef, + rocksdb::Slice *slice, + const bool use_all_keys, + const bool is_ascending, + const uint eq_cond_len) { DBUG_ASSERT(slice->size() >= eq_cond_len); Rdb_transaction *trx= get_or_create_trx(table->in_use); @@ -5885,21 +5927,20 @@ void ha_rocksdb::setup_index_scan(RDBSE_KEYDEF *keydef, skip_bloom= false; /* - In some cases, setup_index_scan() is called multiple times from + In some cases, setup_scan_iterator() is called multiple times from the same query but bloom filter can not always be used. Suppose the following query example. id2 is VARCHAR(30) and PRIMARY KEY (id1, id2). select count(*) from t2 WHERE id1=100 and id2 IN ('00000000000000000000', '100'); - In this case, setup_index_scan() is called twice, the first time is for + In this case, setup_scan_iterator() is called twice, the first time is for (id1, id2)=(100, '00000000000000000000') and the second time is for (100, '100'). If prefix bloom filter length is 24 bytes, prefix bloom filter can be used for the first condition but not for the second condition. If bloom filter condition is changed, currently it is necessary to destroy and re-create Iterator. */ - if (scan_it && scan_it_skips_bloom != skip_bloom) + if (scan_it_skips_bloom != skip_bloom) { - delete scan_it; - scan_it= nullptr; + release_scan_iterator(); } /* @@ -5919,46 +5960,68 @@ void ha_rocksdb::setup_index_scan(RDBSE_KEYDEF *keydef, scan_it->Seek(*slice); } +void ha_rocksdb::setup_iterator_for_rnd_scan() +{ + uint key_size; + + if (pk_descr->is_reverse_cf) + pk_descr->get_supremum_key(pk_packed_tuple, &key_size); + else + pk_descr->get_infimum_key(pk_packed_tuple, &key_size); + + rocksdb::Slice table_key((const char*)pk_packed_tuple, key_size); + + setup_scan_iterator(pk_descr, &table_key); + skip_scan_it_next_call= true; +} int ha_rocksdb::rnd_init(bool scan) { DBUG_ENTER("ha_rocksdb::rnd_init"); io_perf_start(); + + Rdb_transaction *trx= get_or_create_trx(table->in_use); if (scan) { - uint key_size; - if (pk_descr->is_reverse_cf) - pk_descr->get_supremum_key(pk_packed_tuple, &key_size); - else - pk_descr->get_infimum_key(pk_packed_tuple, &key_size); - - rocksdb::Slice table_key((const char*)pk_packed_tuple, key_size); - - setup_index_scan(pk_descr, &table_key); - skip_scan_it_next_call= true; + rnd_scan_is_new_snapshot= !trx->has_snapshot(); + setup_iterator_for_rnd_scan(); } else { /* We don't need any preparations for rnd_pos() calls. */ } - io_perf_end_and_record(); - - Rdb_transaction *trx= get_or_create_trx(table->in_use); // If lock_rows is on then we will be doing a GetForUpdate when accessing // the index, so don't acquire the snapshot right away. 
Otherwise acquire // the snapshot immediately. trx->acquire_snapshot(!lock_rows); + io_perf_end_and_record(); + DBUG_RETURN(0); } int ha_rocksdb::rnd_next(uchar *buf) { + int rc; io_perf_start(); - int rc= rnd_next_with_direction(buf, true); + for (;;) + { + rc= rnd_next_with_direction(buf, true); + if (rc != HA_ERR_LOCK_DEADLOCK || !rnd_scan_is_new_snapshot) + break; /* exit the loop */ + + // release the snapshot and iterator and then regenerate them + Rdb_transaction *trx= get_or_create_trx(table->in_use); + trx->release_snapshot(); + release_scan_iterator(); + setup_iterator_for_rnd_scan(); + } + + rnd_scan_is_new_snapshot= false; + io_perf_end_and_record(); if (rc == HA_ERR_KEY_NOT_FOUND) rc = HA_ERR_END_OF_FILE; @@ -5991,78 +6054,64 @@ int ha_rocksdb::rnd_next_with_direction(uchar *buf, bool move_forward) DBUG_RETURN(HA_ERR_END_OF_FILE); } - bool retry; - do + if (skip_scan_it_next_call) + skip_scan_it_next_call= false; + else { - retry = false; - if (skip_scan_it_next_call) - skip_scan_it_next_call= false; + if (move_forward) + scan_it->Next(); /* this call cannot fail */ else + scan_it->Prev(); /* this call cannot fail */ + } + + if (scan_it->Valid()) + { + /* check if we're out of this table */ + rocksdb::Slice key= scan_it->key(); + if (!pk_descr->covers_key(key)) { - if (move_forward) - scan_it->Next(); /* this call cannot fail */ - else - scan_it->Prev(); /* this call cannot fail */ + rc= HA_ERR_END_OF_FILE; + table->status= STATUS_NOT_FOUND; } - - if (scan_it->Valid()) + else { - /* check if we're out of this table */ - rocksdb::Slice key= scan_it->key(); - if (!pk_descr->covers_key(key)) - { - rc= HA_ERR_END_OF_FILE; - table->status= STATUS_NOT_FOUND; - } - else + if (lock_rows) { - if (lock_rows) - { - /* - Lock the row we've just read. - - Now we call GetForUpdate which will 1) Take a lock and 2) Will fail - if the row was deleted since the snapshot was taken. - */ - Rdb_transaction *trx= get_or_create_trx(table->in_use); - DEBUG_SYNC(ha_thd(), "rocksdb_concurrent_delete"); - rocksdb::Status s= trx->GetForUpdate(pk_descr->get_cf(), key, - &retrieved_record); - if (s.IsNotFound()) - { - // The current row was deleted between when we saw it - // (via Next/Prev) and when we locked and read it. - // Move on to the next row instead. - retry = true; - continue; - } - - if (!s.ok()) - { - DBUG_RETURN(trx->set_status_error(table->in_use, s, pk_descr, - tbl_def)); - } + /* + Lock the row we've just read. - // If we called GetForUpdate() use the value from that call not from - // the iterator as it may be stale since we don't have a snapshot - // when lock_rows is true. - last_rowkey.copy(key.data(), key.size(), &my_charset_bin); - rc= convert_record_from_storage_format(&key, buf); - } - else + Now we call GetForUpdate which will 1) Take a lock and 2) Will fail + if the row was deleted since the snapshot was taken. 
+ */ + Rdb_transaction *trx= get_or_create_trx(table->in_use); + DEBUG_SYNC(ha_thd(), "rocksdb_concurrent_delete"); + rocksdb::Status s= trx->GetForUpdate(pk_descr->get_cf(), key, + &retrieved_record); + if (!s.ok()) { - // Use the value from the iterator - rocksdb::Slice value= scan_it->value(); - last_rowkey.copy(key.data(), key.size(), &my_charset_bin); - rc= convert_record_from_storage_format(&key, &value, buf); + DBUG_RETURN(trx->set_status_error(table->in_use, s, pk_descr, + tbl_def)); } - table->status= 0; + // If we called GetForUpdate() use the value from that call not from + // the iterator as it may be stale since we don't have a snapshot + // when lock_rows is true. + last_rowkey.copy(key.data(), key.size(), &my_charset_bin); + rc= convert_record_from_storage_format(&key, buf); + } + else + { + // Use the value from the iterator + rocksdb::Slice value= scan_it->value(); + last_rowkey.copy(key.data(), key.size(), &my_charset_bin); + rc= convert_record_from_storage_format(&key, &value, buf); } + + table->status= 0; } - else - rc= HA_ERR_END_OF_FILE; - } while (retry); + } + else + rc= HA_ERR_END_OF_FILE; if (!rc) { stats.rows_read++; @@ -6078,8 +6127,7 @@ int ha_rocksdb::rnd_end() { DBUG_ENTER("ha_rocksdb::rnd_end"); - delete scan_it; - scan_it= nullptr; + release_scan_iterator(); DBUG_RETURN(0); } @@ -6100,8 +6148,7 @@ int ha_rocksdb::index_init(uint idx, bool sorted) int ha_rocksdb::index_end() { - delete scan_it; - scan_it= nullptr; + release_scan_iterator(); active_index= MAX_KEY; in_range_check_pushed_down= FALSE; diff --git a/storage/rocksdb/ha_rocksdb.h b/storage/rocksdb/ha_rocksdb.h index 00e62607432c..1add0ce493ab 100644 --- a/storage/rocksdb/ha_rocksdb.h +++ b/storage/rocksdb/ha_rocksdb.h @@ -365,6 +365,9 @@ class ha_rocksdb: public handler bool skip_scan_it_next_call; + /* TRUE means we are accessing the first row after a snapshot was created */ + bool rnd_scan_is_new_snapshot; + /* Valid inside UPDATE statements, when updated_indexes_is_valid=true: @@ -383,14 +386,21 @@ class ha_rocksdb: public handler int create_key_defs(TABLE *table_arg, const char *db_table, uint len, HA_CREATE_INFO *create_info); int secondary_index_read(int keyno, uchar *buf); - void setup_index_scan(RDBSE_KEYDEF *keydef, rocksdb::Slice *slice) + void setup_iterator_for_rnd_scan(); + void setup_scan_iterator(RDBSE_KEYDEF *keydef, rocksdb::Slice *slice) { - setup_index_scan(keydef, slice, false, false, 0); + setup_scan_iterator(keydef, slice, false, false, 0); } bool is_ascending(RDBSE_KEYDEF *keydef, enum ha_rkey_function find_flag); - void setup_index_scan(RDBSE_KEYDEF *keydef, rocksdb::Slice *slice, + void setup_scan_iterator(RDBSE_KEYDEF *keydef, rocksdb::Slice *slice, const bool use_all_keys, const bool is_ascending, const uint eq_cond_len); + void release_scan_iterator(void) + { + delete scan_it; + scan_it= nullptr; + } + int get_row_by_rowid(uchar *buf, const char *pk_tuple, uint pk_tuple_size); int get_row_by_rowid(uchar *buf, const uchar *pk_tuple, uint pk_tuple_size) {