Skip to content

Commit

Permalink
IGNITE-23417 Fix race between compaction and KeyValueStorage#range (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tkalkirill authored Oct 14, 2024
1 parent ba808e4 commit c3f3b6a
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ boolean invoke(
/**
* Returns cursor by latest entries which correspond to the given keys range.
*
* <p>Cursor will iterate over a snapshot of keys and their revisions at the time the method was invoked.</p>
* <p>Cursor will iterate over a snapshot of keys and their revisions at the time the method was invoked. Also, each entry will be the
* only one with the most recent revision.</p>
*
* <p>Never throws {@link CompactedException} as well as cursor methods.</p>
*
Expand All @@ -299,7 +300,8 @@ boolean invoke(
/**
* Returns cursor by entries which correspond to the given keys range and bounded by revision number.
*
* <p>Cursor will iterate over a snapshot of keys and their revisions at the time the method was invoked.</p>
* <p>Cursor will iterate over a snapshot of keys and their revisions at the time the method was invoked. And also each record will be
* one and with a revision less than or equal to the {@code revUpperBound}.</p>
*
* <p>Cursor methods never throw {@link CompactedException}.</p>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;
Expand Down Expand Up @@ -1621,12 +1622,20 @@ private boolean isTombstoneForCompaction(byte[] key, long revision) {
}

private Value getValueForOperation(byte[] key, long revision) {
Value value = getValueForOperationNullable(key, revision);

assert value != null : "key=" + toUtf8String(key) + ", revision=" + revision;

return value;
}

private @Nullable Value getValueForOperationNullable(byte[] key, long revision) {
try {
byte[] valueBytes = data.get(keyToRocksKey(revision, key));

assert valueBytes != null && valueBytes.length != 0 : "key=" + toUtf8String(key) + ", revision=" + revision;

return bytesToValue(valueBytes);
return ArrayUtils.nullOrEmpty(valueBytes) ? null : bytesToValue(valueBytes);
} catch (RocksDBException e) {
throw new MetaStorageException(
OP_EXECUTION_ERR,
Expand All @@ -1651,6 +1660,8 @@ private Cursor<Entry> doRange(byte[] keyFrom, byte @Nullable [] keyTo, long revU

iterator.seek(keyFrom);

long compactionRevisionBeforeCreateCursor = compactionRevision;

return new RocksIteratorAdapter<>(iterator) {
/** Cached entry used to filter "empty" values. */
private @Nullable Entry next;
Expand Down Expand Up @@ -1702,10 +1713,12 @@ protected Entry decodeEntry(byte[] key, byte[] keyRevisionsBytes) {
}

long revision = keyRevisions[maxRevisionIndex];
Value value = getValueForOperationNullable(key, revision);

// According to the compaction algorithm, we will start it locally on a new compaction revision only when all cursors are
// completed strictly before it. Therefore, during normal operation, we should not get an error here.
Value value = getValueForOperation(key, revision);
// Value may be null if the compaction has removed it in parallel.
if (value == null || (revision <= compactionRevisionBeforeCreateCursor && value.tombstone())) {
return EntryImpl.empty(key);
}

return EntryImpl.toEntry(key, revision, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
Expand Down Expand Up @@ -722,6 +723,70 @@ void testRangeAndCompaction() {
}
}

/**
* Tests {@link KeyValueStorage#range(byte[], byte[])} and {@link KeyValueStorage#range(byte[], byte[], long)} for the case when
* cursors should or should not return the last element after compaction. For {@link #FOO_KEY} and {@link #BAR_KEY}, the last revisions
* are 5, a regular entry and a tombstone. Keys with their revisions are added in {@link #setUp()}.
*
* <p>Consider the situation:</p>
* <ul>
* <li>Made {@link KeyValueStorage#setCompactionRevision(long) KeyValueStorage.setCompactionRevision(5)}.</li>
* <li>Waited for all cursors to end with revision {@code 5} or less.</li>
* <li>Made {@link KeyValueStorage#compact(long) KeyValueStorage.compact(5)}.</li>
* <li>Invoke {@link KeyValueStorage#range} for last revision and revision {@code 6} for {@link #FOO_KEY} and {@link #BAR_KEY}.
* <ul>
* <li>For {@link #FOO_KEY}, we need to return a entry with revision {@code 5}, since it will not be removed from the storage
* after compaction.</li>
* <li>For {@link #BAR_KEY}, we should not return anything, since the key will be deleted after compaction.</li>
* </ul>
* </li>
* </ul>
*/
@Test
void testRangeAndCompactionForCaseReadLastEntries() {
storage.setCompactionRevision(5);

try (
Cursor<Entry> rangeFooKeyCursorLatest = storage.range(FOO_KEY, storage.nextKey(FOO_KEY));
Cursor<Entry> rangeFooKeyCursorBounded = storage.range(FOO_KEY, storage.nextKey(FOO_KEY), 6);
Cursor<Entry> rangeBarKeyCursorLatest = storage.range(BAR_KEY, storage.nextKey(BAR_KEY));
Cursor<Entry> rangeBarKeyCursorBounded = storage.range(BAR_KEY, storage.nextKey(BAR_KEY), 6)
) {
// Must see the latest revision of the FOO_KEY as it will not be removed from storage by the compaction.
assertEquals(List.of(5L), collectRevisions(rangeFooKeyCursorLatest));
assertEquals(List.of(5L), collectRevisions(rangeFooKeyCursorBounded));

// Must not see the latest revision of the BAR_KEY, as it will have to be removed from the storage by the compaction.
assertEquals(List.of(), collectRevisions(rangeBarKeyCursorLatest));
assertEquals(List.of(), collectRevisions(rangeBarKeyCursorBounded));
}
}

/**
* Tests {@link KeyValueStorage#range(byte[], byte[])} and {@link KeyValueStorage#range(byte[], byte[], long)} for the case when they
* were invoked on a revision (for example, on revision {@code 5}) before invoking
* {@link KeyValueStorage#setCompactionRevision(long) KeyValueStorage.setCompactionRevision(5)} but before invoking
* {@link KeyValueStorage#compact(long) KeyValueStorage.compact(5)}. Such cursors should return entries since nothing should be
* removed yet until they are completed. Keys are chosen for convenience. Keys with their revisions are added in {@link #setUp()}.
*/
@Test
void testRangeAfterSetCompactionRevisionButBeforeStartCompaction() {
try (
Cursor<Entry> rangeFooKeyCursorLatest = storage.range(FOO_KEY, storage.nextKey(FOO_KEY));
Cursor<Entry> rangeFooKeyCursorBounded = storage.range(FOO_KEY, storage.nextKey(FOO_KEY), 5);
Cursor<Entry> rangeBarKeyCursorLatest = storage.range(BAR_KEY, storage.nextKey(BAR_KEY));
Cursor<Entry> rangeBarKeyCursorBounded = storage.range(BAR_KEY, storage.nextKey(BAR_KEY), 5)
) {
storage.setCompactionRevision(5);

assertEquals(List.of(5L), collectRevisions(rangeFooKeyCursorLatest));
assertEquals(List.of(5L), collectRevisions(rangeFooKeyCursorBounded));

assertEquals(List.of(5L), collectRevisions(rangeBarKeyCursorLatest));
assertEquals(List.of(5L), collectRevisions(rangeBarKeyCursorBounded));
}
}

private List<Integer> collectRevisions(byte[] key) {
var revisions = new ArrayList<Integer>();

Expand All @@ -736,6 +801,10 @@ private List<Integer> collectRevisions(byte[] key) {
return revisions;
}

private List<Long> collectRevisions(Cursor<Entry> cursor) {
return cursor.stream().map(Entry::revision).collect(toList());
}

private static byte[] fromString(String s) {
return s.getBytes(UTF_8);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,17 +918,21 @@ private boolean isTombstoneForCompaction(byte[] key, long revision) {
}

private Value getValue(byte[] key, long revision) {
NavigableMap<byte[], Value> valueByKey = revsIdx.get(revision);

assert valueByKey != null : "key=" + toUtf8String(key) + ", revision=" + revision;

Value value = valueByKey.get(key);
Value value = getValueNullable(key, revision);

assert value != null : "key=" + toUtf8String(key) + ", revision=" + revision;

return value;
}

private @Nullable Value getValueNullable(byte[] key, long revision) {
NavigableMap<byte[], Value> valueByKey = revsIdx.get(revision);

assert valueByKey != null : "key=" + toUtf8String(key) + ", revision=" + revision;

return valueByKey.get(key);
}

private Cursor<Entry> doRange(byte[] keyFrom, byte @Nullable [] keyTo, long revUpperBound) {
assert revUpperBound >= 0 : revUpperBound;

Expand All @@ -943,14 +947,19 @@ private Cursor<Entry> doRange(byte[] keyFrom, byte @Nullable [] keyTo, long revU
byte[] key = e.getKey();
long[] keyRevisions = toLongArray(e.getValue());

int maxRevisionIndex = KeyValueStorageUtils.maxRevisionIndex(keyRevisions, revUpperBound);
int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound);

if (maxRevisionIndex == NOT_FOUND) {
return EntryImpl.empty(key);
}

long revision = keyRevisions[maxRevisionIndex];
Value value = getValue(key, revision);
Value value = getValueNullable(key, revision);

// Value may be null if the compaction has removed it in parallel.
if (value == null || (revision <= compactionRevision && value.tombstone())) {
return EntryImpl.empty(key);
}

return EntryImpl.toEntry(key, revision, value);
})
Expand Down

0 comments on commit c3f3b6a

Please sign in to comment.