From c3f3b6a4ce6b6a7e11da44fc98ccd7a8e6e57876 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Mon, 14 Oct 2024 16:38:19 +0300 Subject: [PATCH] IGNITE-23417 Fix race between compaction and KeyValueStorage#range (#4550) --- .../metastorage/server/KeyValueStorage.java | 6 +- .../persistence/RocksDbKeyValueStorage.java | 21 ++++-- ...AbstractCompactionKeyValueStorageTest.java | 69 +++++++++++++++++++ .../server/SimpleInMemoryKeyValueStorage.java | 23 +++++-- 4 files changed, 106 insertions(+), 13 deletions(-) diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java index ccbaa1d580f..92432367e02 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java @@ -287,7 +287,8 @@ boolean invoke( /** * Returns cursor by latest entries which correspond to the given keys range. * - *

Cursor will iterate over a snapshot of keys and their revisions at the time the method was invoked.

+ *

Cursor will iterate over a snapshot of keys and their revisions at the time the method was invoked. Also, each key will appear + * at most once, with its most recent revision.

* *

Never throws {@link CompactedException} as well as cursor methods.

* @@ -299,7 +300,8 @@ boolean invoke( /** * Returns cursor by entries which correspond to the given keys range and bounded by revision number. * - *

Cursor will iterate over a snapshot of keys and their revisions at the time the method was invoked.

+ *

Cursor will iterate over a snapshot of keys and their revisions at the time the method was invoked. Also, each key will appear + * at most once, with its greatest revision that is less than or equal to {@code revUpperBound}.

* *

Cursor methods never throw {@link CompactedException}.

* diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java index 5e792e495f4..bf743489dee 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java @@ -111,6 +111,7 @@ import org.apache.ignite.internal.rocksdb.RocksUtils; import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager; import org.apache.ignite.internal.thread.NamedThreadFactory; +import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.internal.util.IgniteUtils; @@ -1621,12 +1622,20 @@ private boolean isTombstoneForCompaction(byte[] key, long revision) { } private Value getValueForOperation(byte[] key, long revision) { + Value value = getValueForOperationNullable(key, revision); + + assert value != null : "key=" + toUtf8String(key) + ", revision=" + revision; + + return value; + } + + private @Nullable Value getValueForOperationNullable(byte[] key, long revision) { try { byte[] valueBytes = data.get(keyToRocksKey(revision, key)); assert valueBytes != null && valueBytes.length != 0 : "key=" + toUtf8String(key) + ", revision=" + revision; - return bytesToValue(valueBytes); + return ArrayUtils.nullOrEmpty(valueBytes) ? 
null : bytesToValue(valueBytes); } catch (RocksDBException e) { throw new MetaStorageException( OP_EXECUTION_ERR, @@ -1651,6 +1660,8 @@ private Cursor doRange(byte[] keyFrom, byte @Nullable [] keyTo, long revU iterator.seek(keyFrom); + long compactionRevisionBeforeCreateCursor = compactionRevision; + return new RocksIteratorAdapter<>(iterator) { /** Cached entry used to filter "empty" values. */ private @Nullable Entry next; @@ -1702,10 +1713,12 @@ protected Entry decodeEntry(byte[] key, byte[] keyRevisionsBytes) { } long revision = keyRevisions[maxRevisionIndex]; + Value value = getValueForOperationNullable(key, revision); - // According to the compaction algorithm, we will start it locally on a new compaction revision only when all cursors are - // completed strictly before it. Therefore, during normal operation, we should not get an error here. - Value value = getValueForOperation(key, revision); + // Value may be null if the compaction has removed it in parallel. + if (value == null || (revision <= compactionRevisionBeforeCreateCursor && value.tombstone())) { + return EntryImpl.empty(key); + } return EntryImpl.toEntry(key, revision, value); } diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java index 66324d84ce7..956382ca5bc 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java @@ -19,6 +19,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.metastorage.dsl.Operations.noop; import static 
org.apache.ignite.internal.metastorage.dsl.Operations.ops; import static org.apache.ignite.internal.metastorage.dsl.Operations.put; @@ -722,6 +723,70 @@ void testRangeAndCompaction() { } } + /** + * Tests {@link KeyValueStorage#range(byte[], byte[])} and {@link KeyValueStorage#range(byte[], byte[], long)} for the case when + * cursors should or should not return the last element after compaction. For {@link #FOO_KEY} and {@link #BAR_KEY}, the last revisions + * are 5, a regular entry and a tombstone. Keys with their revisions are added in {@link #setUp()}. + * + *

Consider the situation:

+ * + */ + @Test + void testRangeAndCompactionForCaseReadLastEntries() { + storage.setCompactionRevision(5); + + try ( + Cursor rangeFooKeyCursorLatest = storage.range(FOO_KEY, storage.nextKey(FOO_KEY)); + Cursor rangeFooKeyCursorBounded = storage.range(FOO_KEY, storage.nextKey(FOO_KEY), 6); + Cursor rangeBarKeyCursorLatest = storage.range(BAR_KEY, storage.nextKey(BAR_KEY)); + Cursor rangeBarKeyCursorBounded = storage.range(BAR_KEY, storage.nextKey(BAR_KEY), 6) + ) { + // Must see the latest revision of the FOO_KEY as it will not be removed from storage by the compaction. + assertEquals(List.of(5L), collectRevisions(rangeFooKeyCursorLatest)); + assertEquals(List.of(5L), collectRevisions(rangeFooKeyCursorBounded)); + + // Must not see the latest revision of the BAR_KEY, as it will have to be removed from the storage by the compaction. + assertEquals(List.of(), collectRevisions(rangeBarKeyCursorLatest)); + assertEquals(List.of(), collectRevisions(rangeBarKeyCursorBounded)); + } + } + + /** + * Tests {@link KeyValueStorage#range(byte[], byte[])} and {@link KeyValueStorage#range(byte[], byte[], long)} for the case when they + * were invoked on a revision (for example, on revision {@code 5}) before invoking + * {@link KeyValueStorage#setCompactionRevision(long) KeyValueStorage.setCompactionRevision(5)} but before invoking + * {@link KeyValueStorage#compact(long) KeyValueStorage.compact(5)}. Such cursors should return entries since nothing should be + * removed yet until they are completed. Keys are chosen for convenience. Keys with their revisions are added in {@link #setUp()}. 
+ */ + @Test + void testRangeAfterSetCompactionRevisionButBeforeStartCompaction() { + try ( + Cursor rangeFooKeyCursorLatest = storage.range(FOO_KEY, storage.nextKey(FOO_KEY)); + Cursor rangeFooKeyCursorBounded = storage.range(FOO_KEY, storage.nextKey(FOO_KEY), 5); + Cursor rangeBarKeyCursorLatest = storage.range(BAR_KEY, storage.nextKey(BAR_KEY)); + Cursor rangeBarKeyCursorBounded = storage.range(BAR_KEY, storage.nextKey(BAR_KEY), 5) + ) { + storage.setCompactionRevision(5); + + assertEquals(List.of(5L), collectRevisions(rangeFooKeyCursorLatest)); + assertEquals(List.of(5L), collectRevisions(rangeFooKeyCursorBounded)); + + assertEquals(List.of(5L), collectRevisions(rangeBarKeyCursorLatest)); + assertEquals(List.of(5L), collectRevisions(rangeBarKeyCursorBounded)); + } + } + private List collectRevisions(byte[] key) { var revisions = new ArrayList(); @@ -736,6 +801,10 @@ private List collectRevisions(byte[] key) { return revisions; } + private List collectRevisions(Cursor cursor) { + return cursor.stream().map(Entry::revision).collect(toList()); + } + private static byte[] fromString(String s) { return s.getBytes(UTF_8); } diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java index 7d379dedb68..fad0672cb5e 100644 --- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java +++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java @@ -918,17 +918,21 @@ private boolean isTombstoneForCompaction(byte[] key, long revision) { } private Value getValue(byte[] key, long revision) { - NavigableMap valueByKey = revsIdx.get(revision); - - assert valueByKey != null : "key=" + toUtf8String(key) + ", revision=" + revision; - - Value value = 
valueByKey.get(key); + Value value = getValueNullable(key, revision); assert value != null : "key=" + toUtf8String(key) + ", revision=" + revision; return value; } + private @Nullable Value getValueNullable(byte[] key, long revision) { + NavigableMap valueByKey = revsIdx.get(revision); + + assert valueByKey != null : "key=" + toUtf8String(key) + ", revision=" + revision; + + return valueByKey.get(key); + } + private Cursor doRange(byte[] keyFrom, byte @Nullable [] keyTo, long revUpperBound) { assert revUpperBound >= 0 : revUpperBound; @@ -943,14 +947,19 @@ private Cursor doRange(byte[] keyFrom, byte @Nullable [] keyTo, long revU byte[] key = e.getKey(); long[] keyRevisions = toLongArray(e.getValue()); - int maxRevisionIndex = KeyValueStorageUtils.maxRevisionIndex(keyRevisions, revUpperBound); + int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound); if (maxRevisionIndex == NOT_FOUND) { return EntryImpl.empty(key); } long revision = keyRevisions[maxRevisionIndex]; - Value value = getValue(key, revision); + Value value = getValueNullable(key, revision); + + // Value may be null if the compaction has removed it in parallel. + if (value == null || (revision <= compactionRevision && value.tombstone())) { + return EntryImpl.empty(key); + } return EntryImpl.toEntry(key, revision, value); })