From 8e5e54e87f6f86a2425a5c767e208767ba3df14c Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 10 Oct 2023 16:16:36 +0530 Subject: [PATCH] Use file length from metadata while downloading segments from remote store (#10399) Signed-off-by: Sachin Kale --- .../index/store/RemoteDirectory.java | 13 +++-- .../store/RemoteSegmentStoreDirectory.java | 3 +- .../index/store/RemoteDirectoryTests.java | 54 +++++++++---------- .../RemoteSegmentStoreDirectoryTests.java | 5 +- 4 files changed, 39 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index eb75c39532d71..70a88f6159764 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -39,7 +39,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -193,10 +192,14 @@ public IndexOutput createOutput(String name, IOContext context) { */ @Override public IndexInput openInput(String name, IOContext context) throws IOException { + return openInput(name, fileLength(name), context); + } + + public IndexInput openInput(String name, long fileLength, IOContext context) throws IOException { InputStream inputStream = null; try { inputStream = blobContainer.readBlob(name); - return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength(name)); + return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength); } catch (Exception e) { // Incase the RemoteIndexInput creation fails, close the input stream to avoid file handler leak. if (inputStream != null) { @@ -230,9 +233,9 @@ public void close() throws IOException { @Override public long fileLength(String name) throws IOException { // ToDo: Instead of calling remote store each time, keep a cache with segment metadata - Map metadata = blobContainer.listBlobsByPrefix(name); - if (metadata.containsKey(name)) { - return metadata.get(name).length(); + List metadata = blobContainer.listBlobsByPrefixInSortedOrder(name, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + if (metadata.size() == 1 && metadata.get(0).name().equals(name)) { + return metadata.get(0).length(); } throw new NoSuchFileException(name); } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index dc9706306b408..c0b302c68c0ea 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -434,8 +434,9 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti @Override public IndexInput openInput(String name, IOContext context) throws IOException { String remoteFilename = getExistingRemoteFilename(name); + long fileLength = fileLength(name); if (remoteFilename != null) { - return remoteDataDirectory.openInput(remoteFilename, context); + return remoteDataDirectory.openInput(remoteFilename, fileLength, context); } else { throw new NoSuchFileException(name); } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index 3740abc57b02d..9e38e1749d434 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -29,7 +29,6 @@ import java.nio.file.NoSuchFileException; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -41,11 +40,13 @@ import org.mockito.Mockito; +import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -204,13 +205,29 @@ public void testCreateOutput() { public void testOpenInput() throws IOException { InputStream mockInputStream = mock(InputStream.class); when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream); - Map fileInfo = new HashMap<>(); - fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100)); - when(blobContainer.listBlobsByPrefix("segment_1")).thenReturn(fileInfo); + + BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100); + + when(blobContainer.listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC)).thenReturn(List.of(blobMetadata)); IndexInput indexInput = remoteDirectory.openInput("segment_1", IOContext.DEFAULT); assertTrue(indexInput instanceof RemoteIndexInput); assertEquals(100, indexInput.length()); + verify(blobContainer).listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC); + } + + public void testOpenInputWithLength() throws IOException { + InputStream mockInputStream = mock(InputStream.class); + when(blobContainer.readBlob("segment_1")).thenReturn(mockInputStream); + + BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100); + + when(blobContainer.listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC)).thenReturn(List.of(blobMetadata)); + + IndexInput indexInput = remoteDirectory.openInput("segment_1", 100, IOContext.DEFAULT); + assertTrue(indexInput instanceof RemoteIndexInput); + assertEquals(100, indexInput.length()); + verify(blobContainer, times(0)).listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC); } public void testOpenInputIOException() throws IOException { @@ -228,9 +245,8 @@ public void testOpenInputNoSuchFileException() throws IOException { } public void testFileLength() throws IOException { - Map fileInfo = new HashMap<>(); - fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100)); - when(blobContainer.listBlobsByPrefix("segment_1")).thenReturn(fileInfo); + BlobMetadata blobMetadata = new PlainBlobMetadata("segment_1", 100); + when(blobContainer.listBlobsByPrefixInSortedOrder("segment_1", 1, LEXICOGRAPHIC)).thenReturn(List.of(blobMetadata)); assertEquals(100, remoteDirectory.fileLength("segment_1")); } @@ -246,13 +262,7 @@ public void testListFilesByPrefixInLexicographicOrder() throws IOException { LatchedActionListener> latchedActionListener = invocation.getArgument(3); latchedActionListener.onResponse(List.of(new PlainBlobMetadata("metadata_1", 1))); return null; - }).when(blobContainer) - .listBlobsByPrefixInSortedOrder( - eq("metadata"), - eq(1), - eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC), - any(ActionListener.class) - ); + }).when(blobContainer).listBlobsByPrefixInSortedOrder(eq("metadata"), eq(1), eq(LEXICOGRAPHIC), any(ActionListener.class)); assertEquals(List.of("metadata_1"), remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1)); } @@ -262,13 +272,7 @@ public void testListFilesByPrefixInLexicographicOrderEmpty() throws IOException LatchedActionListener> latchedActionListener = invocation.getArgument(3); latchedActionListener.onResponse(List.of()); return null; - }).when(blobContainer) - .listBlobsByPrefixInSortedOrder( - eq("metadata"), - eq(1), - eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC), - any(ActionListener.class) - ); + }).when(blobContainer).listBlobsByPrefixInSortedOrder(eq("metadata"), eq(1), eq(LEXICOGRAPHIC), any(ActionListener.class)); assertEquals(List.of(), remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1)); } @@ -278,13 +282,7 @@ public void testListFilesByPrefixInLexicographicOrderException() { LatchedActionListener> latchedActionListener = invocation.getArgument(3); latchedActionListener.onFailure(new IOException("Error")); return null; - }).when(blobContainer) - .listBlobsByPrefixInSortedOrder( - eq("metadata"), - eq(1), - eq(BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC), - any(ActionListener.class) - ); + }).when(blobContainer).listBlobsByPrefixInSortedOrder(eq("metadata"), eq(1), eq(LEXICOGRAPHIC), any(ActionListener.class)); assertThrows(IOException.class, () -> remoteDirectory.listFilesByPrefixInLexicographicOrder("metadata", 1)); } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 0f44d5c3b2f53..3e45699ef36eb 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -71,6 +71,7 @@ import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes; import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata; import static org.hamcrest.CoreMatchers.is; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.contains; import static org.mockito.Mockito.any; @@ -391,7 +392,7 @@ public void testOpenInput() throws IOException { remoteSegmentStoreDirectory.init(); IndexInput indexInput = mock(IndexInput.class); - when(remoteDataDirectory.openInput(startsWith("_0.si"), eq(IOContext.DEFAULT))).thenReturn(indexInput); + when(remoteDataDirectory.openInput(startsWith("_0.si"), anyLong(), eq(IOContext.DEFAULT))).thenReturn(indexInput); assertEquals(indexInput, remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT)); } @@ -404,7 +405,7 @@ public void testOpenInputException() throws IOException { populateMetadata(); remoteSegmentStoreDirectory.init(); - when(remoteDataDirectory.openInput(startsWith("_0.si"), eq(IOContext.DEFAULT))).thenThrow(new IOException("Error")); + when(remoteDataDirectory.openInput(startsWith("_0.si"), anyLong(), eq(IOContext.DEFAULT))).thenThrow(new IOException("Error")); assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT)); }