Skip to content

Commit

Permalink
IGNITE-23307 Throw CompactedException when getting timestamp by revis…
Browse files Browse the repository at this point in the history
…ion and vice versa (#4504)
  • Loading branch information
tkalkirill authored Oct 4, 2024
1 parent 12e58d0 commit 5c0d950
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ private void recoverUpdates(OnUpdateHandler handler, long recoveryRevision, int

long revision = entry.revision();

handler.handle(update, metastore.timestampByRevision(revision), revision);
handler.handle(update, metastore.timestampByRevisionLocally(revision), revision);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.Flow.Subscriber;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.dsl.Condition;
Expand Down Expand Up @@ -124,10 +125,14 @@ public interface MetaStorageManager extends IgniteComponent {
* Looks up a timestamp by a revision. This should only be invoked if it is guaranteed that the
* revision is available in the local storage. This method always operates locally.
*
* <p>Requested revision is expected to be less than or equal to the current metastorage revision.</p>
*
* @param revision Revision by which to do a lookup.
* @return Timestamp corresponding to the revision.
* @throws IgniteInternalException with cause {@link NodeStoppingException} if the node is in the process of stopping.
* @throws CompactedException If the requested revision has been compacted.
*/
HybridTimestamp timestampByRevision(long revision);
HybridTimestamp timestampByRevisionLocally(long revision);

/**
* Retrieves entries for given keys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,8 @@ public void onError(Throwable e) {
Entry entry1 = metaStorageManager0.getLocally(key1, Long.MAX_VALUE);
Entry entry2 = metaStorageManager0.getLocally(key2, Long.MAX_VALUE);

assertThat(revToTs.get(entry1.revision()), is(metaStorageManager0.timestampByRevision(entry1.revision())));
assertThat(revToTs.get(entry2.revision()), is(metaStorageManager0.timestampByRevision(entry2.revision())));
assertThat(revToTs.get(entry1.revision()), is(metaStorageManager0.timestampByRevisionLocally(entry1.revision())));
assertThat(revToTs.get(entry2.revision()), is(metaStorageManager0.timestampByRevisionLocally(entry2.revision())));
}

private static class RevisionAndTimestamp {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -750,16 +750,8 @@ public Cursor<Entry> prefixLocally(ByteArray keyPrefix, long revUpperBound) {
}

@Override
public HybridTimestamp timestampByRevision(long revision) {
if (!busyLock.enterBusy()) {
throw new IgniteException(new NodeStoppingException());
}

try {
return storage.timestampByRevision(revision);
} finally {
busyLock.leaveBusy();
}
public HybridTimestamp timestampByRevisionLocally(long revision) {
return inBusyLock(busyLock, () -> storage.timestampByRevision(revision));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,19 +296,23 @@ boolean invoke(
/**
* Looks up a timestamp by a revision.
*
* <p>Requested revision is expected to be less than or equal to the current storage revision.</p>
*
* @param revision Revision by which to do a lookup.
* @return Timestamp corresponding to the revision.
* @throws CompactedException If the requested revision has been compacted.
*/
// TODO: IGNITE-23307 Figure out what to do after compaction
HybridTimestamp timestampByRevision(long revision);

/**
* Looks a revision lesser or equal to the timestamp.
*
* <p>It is possible to get the revision timestamp using method {@link Entry#timestamp}.</p>
*
* @param timestamp Timestamp by which to do a lookup.
* @return Revision lesser or equal to the timestamp or -1 if there is no such revision.
* @return Revision lesser or equal to the timestamp.
* @throws CompactedException If a revision could not be found by timestamp because it was already compacted.
*/
// TODO: IGNITE-23307 Figure out what to do after compaction
long revisionByTimestamp(HybridTimestamp timestamp);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,12 @@ public static void assertCompactionRevisionLessThanCurrent(long compactionRevisi
compactionRevision, revision
);
}

/** Asserts that the requested revision is less than or equal to the current storage revision. */
public static void assertRequestedRevisionLessThanOrEqualToCurrent(long requestedRevision, long revision) {
assert requestedRevision <= revision : String.format(
"Requested revision should be less than or equal to the current: [requested=%s, current=%s]",
requestedRevision, revision
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.ignite.internal.metastorage.server.persistence;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOTHING_TO_COMPACT_INDEX;
import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent;
import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertRequestedRevisionLessThanOrEqualToCurrent;
import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact;
import static org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
Expand Down Expand Up @@ -967,30 +969,10 @@ public void removeWatch(WatchListener listener) {
public void compact(long revision) {
assert revision >= 0;

try (RocksIterator iterator = index.newIterator()) {
iterator.seekToFirst();

while (iterator.isValid()) {
rwLock.writeLock().lock();

try (WriteBatch batch = new WriteBatch()) {
assertCompactionRevisionLessThanCurrent(revision, rev);

for (int i = 0; i < COMPACT_BATCH_SIZE && iterator.isValid(); i++, iterator.next()) {
if (stopCompaction.get()) {
return;
}

compactForKey(batch, iterator.key(), getAsLongs(iterator.value()), revision);
}

db.write(defaultWriteOptions, batch);
} finally {
rwLock.writeLock().unlock();
}
}
try {
compactKeys(revision);

checkIterator(iterator);
compactRevisionToTimestampAndViceVersa(revision);
} catch (Throwable t) {
throw new MetaStorageException(COMPACTION_ERR, "Error during compaction: " + revision, t);
}
Expand Down Expand Up @@ -1436,34 +1418,44 @@ private void replayUpdates(long lowerRevision, long upperRevision) {

@Override
public HybridTimestamp timestampByRevision(long revision) {
rwLock.readLock().lock();

try {
assertRequestedRevisionLessThanOrEqualToCurrent(revision, rev);

byte[] tsBytes = revisionToTs.get(longToBytes(revision));

assert tsBytes != null;
if (tsBytes == null) {
throw new CompactedException("Requested revision has already been compacted: " + revision);
}

return HybridTimestamp.hybridTimestamp(bytesToLong(tsBytes));
return hybridTimestamp(bytesToLong(tsBytes));
} catch (RocksDBException e) {
throw new MetaStorageException(OP_EXECUTION_ERR, e);
throw new MetaStorageException(OP_EXECUTION_ERR, "Error reading revision timestamp: " + revision, e);
} finally {
rwLock.readLock().unlock();
}
}

@Override
public long revisionByTimestamp(HybridTimestamp timestamp) {
byte[] tsBytes = hybridTsToArray(timestamp);
rwLock.readLock().lock();

// Find a revision with timestamp lesser or equal to the watermark.
// Find a revision with timestamp lesser or equal to the timestamp.
try (RocksIterator rocksIterator = tsToRevision.newIterator()) {
rocksIterator.seekForPrev(tsBytes);
rocksIterator.seekForPrev(hybridTsToArray(timestamp));

checkIterator(rocksIterator);

byte[] tsValue = rocksIterator.value();

if (tsValue.length == 0) {
return -1;
throw new CompactedException("Revisions less than or equal to the requested one are already compacted: " + timestamp);
}

return bytesToLong(tsValue);
} finally {
rwLock.readLock().unlock();
}
}

Expand Down Expand Up @@ -1618,6 +1610,71 @@ public long getCompactionRevision() {
}
}

private void compactKeys(long compactionRevision) throws RocksDBException {
compactInBatches(index, (it, batch) -> {
compactForKey(batch, it.key(), getAsLongs(it.value()), compactionRevision);

return true;
});
}

private void compactRevisionToTimestampAndViceVersa(long compactionRevision) throws RocksDBException {
compactInBatches(revisionToTs, (it, batch) -> {
long revision = bytesToLong(it.key());

if (revision > compactionRevision) {
return false;
}

revisionToTs.delete(batch, it.key());
tsToRevision.delete(batch, it.value());

return true;
});
}

@FunctionalInterface
private interface CompactionAction {
/**
* Performs compaction on the storage at the current iterator pointer. Returns {@code true} if it is necessary to continue
* iterating, {@link false} if it is necessary to finish with writing the last batch.
*/
boolean compact(RocksIterator it, WriteBatch batch) throws RocksDBException;
}

private void compactInBatches(ColumnFamily columnFamily, CompactionAction compactionAction) throws RocksDBException {
try (RocksIterator iterator = columnFamily.newIterator()) {
iterator.seekToFirst();

boolean continueIterating = true;

while (continueIterating && iterator.isValid()) {
rwLock.writeLock().lock();

try (WriteBatch batch = new WriteBatch()) {
assertCompactionRevisionLessThanCurrent(compactionRevision, rev);

for (int i = 0; i < COMPACT_BATCH_SIZE && iterator.isValid(); i++, iterator.next()) {
if (stopCompaction.get()) {
return;
}

if (!compactionAction.compact(iterator, batch)) {
continueIterating = false;

break;
}
}

db.write(defaultWriteOptions, batch);
} finally {
rwLock.writeLock().unlock();
}
}

checkIterator(iterator);
}
}

private boolean isTombstone(byte[] key, long revision) throws RocksDBException {
byte[] rocksKey = keyToRocksKey(revision, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,21 @@
import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
import org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type;
import org.apache.ignite.internal.testframework.WorkDirectory;
Expand Down Expand Up @@ -270,6 +274,58 @@ void testEntryOperationTimestampAfterCompaction() {
assertNull(storage.get(BAR_KEY).timestamp());
}

@Test
void testTimestampByRevisionAfterCompaction() {
storage.compact(1);

assertThrows(CompactedException.class, () -> storage.timestampByRevision(1));
assertDoesNotThrow(() -> storage.timestampByRevision(2));
assertDoesNotThrow(() -> storage.timestampByRevision(3));

storage.compact(2);

assertThrows(CompactedException.class, () -> storage.timestampByRevision(1));
assertThrows(CompactedException.class, () -> storage.timestampByRevision(2));
assertDoesNotThrow(() -> storage.timestampByRevision(3));

storage.compact(3);

assertThrows(CompactedException.class, () -> storage.timestampByRevision(1));
assertThrows(CompactedException.class, () -> storage.timestampByRevision(2));
assertThrows(CompactedException.class, () -> storage.timestampByRevision(3));
}

@Test
void testRevisionByTimestampAfterCompaction() {
HybridTimestamp timestamp1 = storage.timestampByRevision(1);
HybridTimestamp timestamp2 = storage.timestampByRevision(2);
HybridTimestamp timestamp3 = storage.timestampByRevision(3);

storage.compact(1);

assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp1));
assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp1.subtractPhysicalTime(1)));
assertDoesNotThrow(() -> storage.revisionByTimestamp(timestamp2));
assertDoesNotThrow(() -> storage.revisionByTimestamp(timestamp3));

storage.compact(2);

assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp1));
assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp1.subtractPhysicalTime(1)));
assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp2));
assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp2.subtractPhysicalTime(1)));
assertDoesNotThrow(() -> storage.revisionByTimestamp(timestamp3));

storage.compact(3);

assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp1));
assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp1.subtractPhysicalTime(1)));
assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp2));
assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp2.subtractPhysicalTime(1)));
assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp3));
assertThrows(CompactedException.class, () -> storage.revisionByTimestamp(timestamp3.subtractPhysicalTime(1)));
}

private List<Integer> collectRevisions(byte[] key) {
var revisions = new ArrayList<Integer>();

Expand Down
Loading

0 comments on commit 5c0d950

Please sign in to comment.