From 5c0d950a05485adc8934704163ff2d7a55423205 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Fri, 4 Oct 2024 15:38:38 +0300 Subject: [PATCH] IGNITE-23307 Throw CompactedException when getting timestamp by revision and vice versa (#4504) --- .../catalog/storage/UpdateLogImpl.java | 2 +- .../metastorage/MetaStorageManager.java | 7 +- .../impl/ItMetaStorageWatchTest.java | 4 +- .../impl/MetaStorageManagerImpl.java | 12 +- .../metastorage/server/KeyValueStorage.java | 10 +- .../server/KeyValueStorageUtils.java | 8 ++ .../persistence/RocksDbKeyValueStorage.java | 117 +++++++++++++----- ...AbstractCompactionKeyValueStorageTest.java | 56 +++++++++ .../BasicOperationsKeyValueStorageTest.java | 70 +++++------ .../server/SimpleInMemoryKeyValueStorage.java | 38 +++++- .../disaster/ManualGroupUpdateRequest.java | 2 +- 11 files changed, 236 insertions(+), 90 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java index 524b91c0bf6..81760cba24f 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java @@ -264,7 +264,7 @@ private void recoverUpdates(OnUpdateHandler handler, long recoveryRevision, int long revision = entry.revision(); - handler.handle(update, metastore.timestampByRevision(revision), revision); + handler.handle(update, metastore.timestampByRevisionLocally(revision), revision); } } diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java index 09970056ca2..053bbeb61da 100644 --- a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java +++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.Flow.Subscriber; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.ByteArray; +import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.dsl.Condition; @@ -124,10 +125,14 @@ public interface MetaStorageManager extends IgniteComponent { * Looks up a timestamp by a revision. This should only be invoked if it is guaranteed that the * revision is available in the local storage. This method always operates locally. * + *

<p>Requested revision is expected to be less than or equal to the current metastorage revision.</p>

+ * * @param revision Revision by which to do a lookup. * @return Timestamp corresponding to the revision. + * @throws IgniteInternalException with cause {@link NodeStoppingException} if the node is in the process of stopping. + * @throws CompactedException If the requested revision has been compacted. */ - HybridTimestamp timestampByRevision(long revision); + HybridTimestamp timestampByRevisionLocally(long revision); /** * Retrieves entries for given keys. diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java index a1ca164e3d1..4d1873e40f2 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java @@ -520,8 +520,8 @@ public void onError(Throwable e) { Entry entry1 = metaStorageManager0.getLocally(key1, Long.MAX_VALUE); Entry entry2 = metaStorageManager0.getLocally(key2, Long.MAX_VALUE); - assertThat(revToTs.get(entry1.revision()), is(metaStorageManager0.timestampByRevision(entry1.revision()))); - assertThat(revToTs.get(entry2.revision()), is(metaStorageManager0.timestampByRevision(entry2.revision()))); + assertThat(revToTs.get(entry1.revision()), is(metaStorageManager0.timestampByRevisionLocally(entry1.revision()))); + assertThat(revToTs.get(entry2.revision()), is(metaStorageManager0.timestampByRevisionLocally(entry2.revision()))); } private static class RevisionAndTimestamp { diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index 0eda1c3c529..b07ce360c7c 100644 --- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -750,16 +750,8 @@ public Cursor prefixLocally(ByteArray keyPrefix, long revUpperBound) { } @Override - public HybridTimestamp timestampByRevision(long revision) { - if (!busyLock.enterBusy()) { - throw new IgniteException(new NodeStoppingException()); - } - - try { - return storage.timestampByRevision(revision); - } finally { - busyLock.leaveBusy(); - } + public HybridTimestamp timestampByRevisionLocally(long revision) { + return inBusyLock(busyLock, () -> storage.timestampByRevision(revision)); } @Override diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java index d3cf0168754..37481cd00c6 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java @@ -296,19 +296,23 @@ boolean invoke( /** * Looks up a timestamp by a revision. * + *

<p>Requested revision is expected to be less than or equal to the current storage revision.</p>

+ * * @param revision Revision by which to do a lookup. * @return Timestamp corresponding to the revision. + * @throws CompactedException If the requested revision has been compacted. */ - // TODO: IGNITE-23307 Figure out what to do after compaction HybridTimestamp timestampByRevision(long revision); /** * Looks a revision lesser or equal to the timestamp. * + *

<p>It is possible to get the revision timestamp using method {@link Entry#timestamp}.</p>

+ * * @param timestamp Timestamp by which to do a lookup. - * @return Revision lesser or equal to the timestamp or -1 if there is no such revision. + * @return Revision lesser or equal to the timestamp. + * @throws CompactedException If a revision could not be found by timestamp because it was already compacted. */ - // TODO: IGNITE-23307 Figure out what to do after compaction long revisionByTimestamp(HybridTimestamp timestamp); /** diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java index c49e3d427fe..e5b97cbb4dc 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java @@ -72,4 +72,12 @@ public static void assertCompactionRevisionLessThanCurrent(long compactionRevisi compactionRevision, revision ); } + + /** Asserts that the requested revision is less than or equal to the current storage revision. 
*/ + public static void assertRequestedRevisionLessThanOrEqualToCurrent(long requestedRevision, long revision) { + assert requestedRevision <= revision : String.format( + "Requested revision should be less than or equal to the current: [requested=%s, current=%s]", + requestedRevision, revision + ); + } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java index 24150794fbb..4888ec09bda 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java @@ -18,8 +18,10 @@ package org.apache.ignite.internal.metastorage.server.persistence; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOTHING_TO_COMPACT_INDEX; import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent; +import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertRequestedRevisionLessThanOrEqualToCurrent; import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact; import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String; import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE; @@ -967,30 +969,10 @@ public void removeWatch(WatchListener listener) { public void compact(long revision) { assert revision >= 0; - try (RocksIterator iterator = index.newIterator()) { - iterator.seekToFirst(); - - while (iterator.isValid()) { - rwLock.writeLock().lock(); - - try (WriteBatch batch 
= new WriteBatch()) { - assertCompactionRevisionLessThanCurrent(revision, rev); - - for (int i = 0; i < COMPACT_BATCH_SIZE && iterator.isValid(); i++, iterator.next()) { - if (stopCompaction.get()) { - return; - } - - compactForKey(batch, iterator.key(), getAsLongs(iterator.value()), revision); - } - - db.write(defaultWriteOptions, batch); - } finally { - rwLock.writeLock().unlock(); - } - } + try { + compactKeys(revision); - checkIterator(iterator); + compactRevisionToTimestampAndViceVersa(revision); } catch (Throwable t) { throw new MetaStorageException(COMPACTION_ERR, "Error during compaction: " + revision, t); } @@ -1436,34 +1418,44 @@ private void replayUpdates(long lowerRevision, long upperRevision) { @Override public HybridTimestamp timestampByRevision(long revision) { + rwLock.readLock().lock(); + try { + assertRequestedRevisionLessThanOrEqualToCurrent(revision, rev); + byte[] tsBytes = revisionToTs.get(longToBytes(revision)); - assert tsBytes != null; + if (tsBytes == null) { + throw new CompactedException("Requested revision has already been compacted: " + revision); + } - return HybridTimestamp.hybridTimestamp(bytesToLong(tsBytes)); + return hybridTimestamp(bytesToLong(tsBytes)); } catch (RocksDBException e) { - throw new MetaStorageException(OP_EXECUTION_ERR, e); + throw new MetaStorageException(OP_EXECUTION_ERR, "Error reading revision timestamp: " + revision, e); + } finally { + rwLock.readLock().unlock(); } } @Override public long revisionByTimestamp(HybridTimestamp timestamp) { - byte[] tsBytes = hybridTsToArray(timestamp); + rwLock.readLock().lock(); - // Find a revision with timestamp lesser or equal to the watermark. + // Find a revision with timestamp lesser or equal to the timestamp. 
try (RocksIterator rocksIterator = tsToRevision.newIterator()) { - rocksIterator.seekForPrev(tsBytes); + rocksIterator.seekForPrev(hybridTsToArray(timestamp)); checkIterator(rocksIterator); byte[] tsValue = rocksIterator.value(); if (tsValue.length == 0) { - return -1; + throw new CompactedException("Revisions less than or equal to the requested one are already compacted: " + timestamp); } return bytesToLong(tsValue); + } finally { + rwLock.readLock().unlock(); } } @@ -1618,6 +1610,71 @@ public long getCompactionRevision() { } } + private void compactKeys(long compactionRevision) throws RocksDBException { + compactInBatches(index, (it, batch) -> { + compactForKey(batch, it.key(), getAsLongs(it.value()), compactionRevision); + + return true; + }); + } + + private void compactRevisionToTimestampAndViceVersa(long compactionRevision) throws RocksDBException { + compactInBatches(revisionToTs, (it, batch) -> { + long revision = bytesToLong(it.key()); + + if (revision > compactionRevision) { + return false; + } + + revisionToTs.delete(batch, it.key()); + tsToRevision.delete(batch, it.value()); + + return true; + }); + } + + @FunctionalInterface + private interface CompactionAction { + /** + * Performs compaction on the storage at the current iterator pointer. Returns {@code true} if it is necessary to continue + * iterating, {@link false} if it is necessary to finish with writing the last batch. 
+ */ + boolean compact(RocksIterator it, WriteBatch batch) throws RocksDBException; + } + + private void compactInBatches(ColumnFamily columnFamily, CompactionAction compactionAction) throws RocksDBException { + try (RocksIterator iterator = columnFamily.newIterator()) { + iterator.seekToFirst(); + + boolean continueIterating = true; + + while (continueIterating && iterator.isValid()) { + rwLock.writeLock().lock(); + + try (WriteBatch batch = new WriteBatch()) { + assertCompactionRevisionLessThanCurrent(compactionRevision, rev); + + for (int i = 0; i < COMPACT_BATCH_SIZE && iterator.isValid(); i++, iterator.next()) { + if (stopCompaction.get()) { + return; + } + + if (!compactionAction.compact(iterator, batch)) { + continueIterating = false; + + break; + } + } + + db.write(defaultWriteOptions, batch); + } finally { + rwLock.writeLock().unlock(); + } + } + + checkIterator(iterator); + } + } private boolean isTombstone(byte[] key, long revision) throws RocksDBException { byte[] rocksKey = keyToRocksKey(revision, key); diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java index 6a6c6c2a0f4..097f534eb92 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java @@ -24,8 +24,10 @@ import static org.apache.ignite.internal.metastorage.dsl.Operations.remove; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.nio.file.Path; import java.util.ArrayList; @@ -33,8 +35,10 @@ import java.util.UUID; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.exceptions.CompactedException; import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator; import org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type; import org.apache.ignite.internal.testframework.WorkDirectory; @@ -270,6 +274,58 @@ void testEntryOperationTimestampAfterCompaction() { assertNull(storage.get(BAR_KEY).timestamp()); } + @Test + void testTimestampByRevisionAfterCompaction() { + storage.compact(1); + + assertThrows(CompactedException.class, () -> storage.timestampByRevision(1)); + assertDoesNotThrow(() -> storage.timestampByRevision(2)); + assertDoesNotThrow(() -> storage.timestampByRevision(3)); + + storage.compact(2); + + assertThrows(CompactedException.class, () -> storage.timestampByRevision(1)); + assertThrows(CompactedException.class, () -> storage.timestampByRevision(2)); + assertDoesNotThrow(() -> storage.timestampByRevision(3)); + + storage.compact(3); + + assertThrows(CompactedException.class, () -> storage.timestampByRevision(1)); + assertThrows(CompactedException.class, () -> storage.timestampByRevision(2)); + assertThrows(CompactedException.class, () -> storage.timestampByRevision(3)); + } + + @Test + void testRevisionByTimestampAfterCompaction() { + HybridTimestamp timestamp1 = storage.timestampByRevision(1); + HybridTimestamp timestamp2 = storage.timestampByRevision(2); + HybridTimestamp timestamp3 = storage.timestampByRevision(3); + + storage.compact(1); + + assertThrows(CompactedException.class, () -> 
storage.revisionByTimestamp(timestamp1)); + assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp1.subtractPhysicalTime(1))); + assertDoesNotThrow(() -> storage.revisionByTimestamp(timestamp2)); + assertDoesNotThrow(() -> storage.revisionByTimestamp(timestamp3)); + + storage.compact(2); + + assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp1)); + assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp1.subtractPhysicalTime(1))); + assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp2)); + assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp2.subtractPhysicalTime(1))); + assertDoesNotThrow(() -> storage.revisionByTimestamp(timestamp3)); + + storage.compact(3); + + assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp1)); + assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp1.subtractPhysicalTime(1))); + assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp2)); + assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp2.subtractPhysicalTime(1))); + assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp3)); + assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp3.subtractPhysicalTime(1))); + } + private List collectRevisions(byte[] key) { var revisions = new ArrayList(); diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java index 5709bc2d9cd..5c8f864ba98 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java +++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java @@ -1951,59 +1951,55 @@ void testWatchErrorHandling() { @Test public void testRevisionByTimestamp() { - // Verify that in case of empty storage -1 will be returned. - assertEquals(-1, storage.revisionByTimestamp(MIN_VALUE)); - assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(2))); - assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(5))); - assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(7))); - assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(10))); - assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(12))); - assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(15))); - assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(17))); - assertEquals(-1, storage.revisionByTimestamp(MAX_VALUE)); - // Populate storage with some data in order to have following revision to timestamp mapping: // 1 -> 5 // 2 -> 10 // 3 -> 15 - { - storage.put(key(1), keyValue(1, 1), hybridTimestamp(5)); - assertEquals(1, storage.revision()); + storage.put(key(1), keyValue(1, 1), hybridTimestamp(5)); + assertEquals(1, storage.revision()); - storage.put(key(1), keyValue(1, 1), hybridTimestamp(10)); - assertEquals(2, storage.revision()); + storage.put(key(1), keyValue(1, 1), hybridTimestamp(10)); + assertEquals(2, storage.revision()); - storage.put(key(2), keyValue(2, 2), hybridTimestamp(15)); - assertEquals(3, storage.revision()); - } + storage.put(key(2), keyValue(2, 2), hybridTimestamp(15)); + assertEquals(3, storage.revision()); // Check revisionByTimestamp() - { - assertEquals(-1, storage.revisionByTimestamp(MIN_VALUE)); + // Exact matching 1 -> 5 + assertEquals(1, storage.revisionByTimestamp(hybridTimestamp(5))); + + // There's no revision associated with 7, so closest left one is expected. 
+ assertEquals(1, storage.revisionByTimestamp(hybridTimestamp(7))); + + // Exact matching 2 -> 10 + assertEquals(2, storage.revisionByTimestamp(hybridTimestamp(10))); - // There's no revision associated with 2, so closest left one is expected. - assertEquals(-1, storage.revisionByTimestamp(hybridTimestamp(2))); + // There's no revision associated with 12, so closest left one is expected. + assertEquals(2, storage.revisionByTimestamp(hybridTimestamp(12))); - // Exact matching 1 -> 5 - assertEquals(1, storage.revisionByTimestamp(hybridTimestamp(5))); + // Exact matching 3 -> 15 + assertEquals(3, storage.revisionByTimestamp(hybridTimestamp(15))); - // There's no revision associated with 7, so closest left one is expected. - assertEquals(1, storage.revisionByTimestamp(hybridTimestamp(7))); + // There's no revision associated with 17, so closest left one is expected. + assertEquals(3, storage.revisionByTimestamp(hybridTimestamp(17))); - // Exact matching 2 -> 10 - assertEquals(2, storage.revisionByTimestamp(hybridTimestamp(10))); + assertEquals(3, storage.revisionByTimestamp(MAX_VALUE)); + } - // There's no revision associated with 12, so closest left one is expected. - assertEquals(2, storage.revisionByTimestamp(hybridTimestamp(12))); + @Test + void testTimestampByRevision() { + byte[] key = key(0); + byte[] value = keyValue(0, 0); - // Exact matching 3 -> 15 - assertEquals(3, storage.revisionByTimestamp(hybridTimestamp(15))); + HybridTimestamp timestamp0 = hybridTimestamp(10L); + HybridTimestamp timestamp1 = hybridTimestamp(20L); - // There's no revision associated with 17, so closest left one is expected. 
- assertEquals(3, storage.revisionByTimestamp(hybridTimestamp(17))); + storage.put(key, value, timestamp0); + assertEquals(timestamp0, storage.timestampByRevision(1)); - assertEquals(3, storage.revisionByTimestamp(MAX_VALUE)); - } + storage.put(key, value, timestamp1); + assertEquals(timestamp0, storage.timestampByRevision(1)); + assertEquals(timestamp1, storage.timestampByRevision(2)); } @Test diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java index 6ba48f9213f..2e217606116 100644 --- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java +++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java @@ -24,6 +24,7 @@ import static java.util.stream.Collectors.toMap; import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOTHING_TO_COMPACT_INDEX; import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent; +import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertRequestedRevisionLessThanOrEqualToCurrent; import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact; import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String; import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE; @@ -44,10 +45,10 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.Objects; import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; @@ -94,7 +95,11 @@ public class 
SimpleInMemoryKeyValueStorage implements KeyValueStorage { /** Timestamp to revision mapping. */ private final NavigableMap tsToRevMap = new TreeMap<>(); - /** Revision to timestamp mapping. */ + /** + * Revision to timestamp mapping. + * + *

<p>Guarded by {@link #mux}.</p>

+ */ private final Map revToTsMap = new HashMap<>(); /** @@ -425,8 +430,18 @@ public Cursor range(byte[] keyFrom, byte @Nullable [] keyTo, long revUppe @Override public HybridTimestamp timestampByRevision(long revision) { + assert revision >= 0; + synchronized (mux) { - return Objects.requireNonNull(revToTsMap.get(revision), "Revision " + revision + " not found"); + assertRequestedRevisionLessThanOrEqualToCurrent(revision, rev); + + HybridTimestamp timestamp = revToTsMap.get(revision); + + if (timestamp == null) { + throw new CompactedException("Requested revision has already been compacted: " + revision); + } + + return timestamp; } } @@ -436,8 +451,7 @@ public long revisionByTimestamp(HybridTimestamp timestamp) { Map.Entry revisionEntry = tsToRevMap.floorEntry(timestamp.longValue()); if (revisionEntry == null) { - // Nothing to compact yet. - return -1; + throw new CompactedException("Revisions less than or equal to the requested one are already compacted: " + timestamp); } return revisionEntry.getValue(); @@ -554,6 +568,20 @@ public void compact(long revision) { compactForKey(entry.getKey(), toLongArray(entry.getValue()), revision); } } + + synchronized (mux) { + for (Iterator> it = revToTsMap.entrySet().iterator(); it.hasNext(); ) { + Map.Entry e = it.next(); + + if (e.getKey() <= revision) { + it.remove(); + + tsToRevMap.remove(e.getValue().longValue()); + } else { + break; + } + } + } } @Override diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java index 693777f6e50..9f4058ac5b5 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java @@ -127,7 +127,7 @@ public Set partitionIds() { @Override 
public CompletableFuture handle(DisasterRecoveryManager disasterRecoveryManager, long msRevision) { - HybridTimestamp msSafeTime = disasterRecoveryManager.metaStorageManager.timestampByRevision(msRevision); + HybridTimestamp msSafeTime = disasterRecoveryManager.metaStorageManager.timestampByRevisionLocally(msRevision); int catalogVersion = disasterRecoveryManager.catalogManager.activeCatalogVersion(msSafeTime.longValue());