Skip to content

Commit

Permalink
BugFix: call listener.onFailure on failure to pin the timestamp
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale committed Oct 9, 2024
1 parent acf209f commit 8854340
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ protected Settings nodeSettings(int nodeOrdinal) {

ActionListener<Void> noOpActionListener = new ActionListener<>() {
@Override
public void onResponse(Void unused) {}
public void onResponse(Void unused) {
// do nothing
}

@Override
public void onFailure(Exception e) {}
public void onFailure(Exception e) {
fail();
}
};

public void testTimestampPinUnpin() throws Exception {
Expand All @@ -61,11 +65,6 @@ public void testTimestampPinUnpin() throws Exception {
assertEquals(-1L, lastFetchTimestamp);
assertEquals(Set.of(), pinnedTimestampWithFetchTimestamp.v2());

assertThrows(
IllegalArgumentException.class,
() -> remoteStorePinnedTimestampService.pinTimestamp(1234L, "ss1", noOpActionListener)
);

long timestamp1 = System.currentTimeMillis() + 30000L;
long timestamp2 = System.currentTimeMillis() + 60000L;
long timestamp3 = System.currentTimeMillis() + 900000L;
Expand Down Expand Up @@ -197,6 +196,97 @@ public void onFailure(Exception e) {
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}

public void testPinExceptionsOlderTimestamp() throws InterruptedException {
prepareCluster(1, 1, INDEX_NAME, 0, 2);
ensureGreen(INDEX_NAME);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

CountDownLatch latch = new CountDownLatch(1);
remoteStorePinnedTimestampService.pinTimestamp(1234L, "ss1", new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
// We expect onFailure to be called
fail();
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof IllegalArgumentException);
}
}, latch));

latch.await();
}

// This test fails as we can't control actual upload of pinned timestamp file. We ideally need a BlobStoreRepository
// which can control the speed of upload.
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/16246")
public void testPinExceptionsRemoteStoreCallTakeTime() throws InterruptedException {
prepareCluster(1, 1, INDEX_NAME, 0, 2);
ensureGreen(INDEX_NAME);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueNanos(50000));

CountDownLatch latch = new CountDownLatch(1);
long timestampToBePinned = System.currentTimeMillis();
remoteStorePinnedTimestampService.pinTimestamp(timestampToBePinned, "ss1", new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
// We expect onFailure to be called
fail();
}

@Override
public void onFailure(Exception e) {
logger.error(e.getMessage());
assertTrue(e instanceof RuntimeException);
assertTrue(e.getMessage().contains("Timestamp pinning took"));

// Check if the timestamp was unpinned
remoteStorePinnedTimestampService.forceSyncPinnedTimestamps();
assertFalse(RemoteStorePinnedTimestampService.getPinnedTimestamps().v2().contains(timestampToBePinned));
}
}, latch));

latch.await();
}

public void testUnpinException() throws InterruptedException {
prepareCluster(1, 1, INDEX_NAME, 0, 2);
ensureGreen(INDEX_NAME);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

CountDownLatch latch = new CountDownLatch(1);
remoteStorePinnedTimestampService.unpinTimestamp(1234L, "dummy-entity", new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
// We expect onFailure to be called
fail();
}

@Override
public void onFailure(Exception e) {
logger.error(e.getMessage());
assertTrue(e instanceof IllegalArgumentException);
}
}, latch));

latch.await();
}

public void testLastSuccessfulFetchOfPinnedTimestampsPresentInNodeStats() throws Exception {
logger.info("Starting up cluster manager");
logger.info("cluster.remote_store.pinned_timestamps.enabled set to true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,16 @@ public static Map<String, Set<Long>> fetchPinnedTimestamps(Settings settings, Re
* @throws IllegalArgumentException If the timestamp is less than the current time minus one second
*/
public void pinTimestamp(long timestamp, String pinningEntity, ActionListener<Void> listener) {
// If a caller uses current system time to pin the timestamp, following check will almost always fail.
// So, we allow pinning timestamp in the past upto some buffer
long lookbackIntervalInMills = RemoteStoreSettings.getPinnedTimestampsLookbackInterval().millis();
if (timestamp < (System.currentTimeMillis() - lookbackIntervalInMills)) {
throw new IllegalArgumentException(
"Timestamp to be pinned is less than current timestamp - value of cluster.remote_store.pinned_timestamps.lookback_interval"
);
}
long startTime = System.nanoTime();
try {
// If a caller uses current system time to pin the timestamp, following check will almost always fail.
// So, we allow pinning timestamp in the past upto some buffer
long lookbackIntervalInMills = RemoteStoreSettings.getPinnedTimestampsLookbackInterval().millis();
if (timestamp < (System.currentTimeMillis() - lookbackIntervalInMills)) {
throw new IllegalArgumentException(
"Timestamp to be pinned is less than current timestamp - value of cluster.remote_store.pinned_timestamps.lookback_interval"
);
}
long startTime = System.nanoTime();
logger.debug("Pinning timestamp = {} against entity = {}", timestamp, pinningEntity);
blobContainer.writeBlob(getBlobName(timestamp, pinningEntity), new ByteArrayInputStream(new byte[0]), 0, true);
long elapsedTime = System.nanoTime() - startTime;
Expand All @@ -155,7 +155,7 @@ public void pinTimestamp(long timestamp, String pinningEntity, ActionListener<Vo
} else {
listener.onResponse(null);
}
} catch (IOException e) {
} catch (Exception e) {
listener.onFailure(e);
}
}
Expand Down Expand Up @@ -198,7 +198,7 @@ public void cloneTimestamp(long timestamp, String existingPinningEntity, String
logger.error(errorMessage);
listener.onFailure(new IllegalArgumentException(errorMessage));
}
} catch (IOException e) {
} catch (Exception e) {
listener.onFailure(e);
}
}
Expand Down Expand Up @@ -249,7 +249,7 @@ public void unpinTimestamp(long timestamp, String pinningEntity, ActionListener<
logger.error(errorMessage);
listener.onFailure(new IllegalArgumentException(errorMessage));
}
} catch (IOException e) {
} catch (Exception e) {
listener.onFailure(e);
}
}
Expand Down

0 comments on commit 8854340

Please sign in to comment.