diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 1b9b022b09847..3c8a3f127a295 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -222,10 +222,10 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp // If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload. // Therefore, redirecting it to slow client. if ((uploadRequest.getWritePriority() == WritePriority.LOW - && blobStore.getLowPrioritySizeBasedBlockingQ().canProduce(uploadRequest.getContentLength()) == false) + && blobStore.getLowPrioritySizeBasedBlockingQ().isBelowCapacity(uploadRequest.getContentLength()) == false) || (uploadRequest.getWritePriority() != WritePriority.HIGH && uploadRequest.getWritePriority() != WritePriority.URGENT - && blobStore.getOtherPrioritySizeBasedBlockingQ().canProduce(uploadRequest.getContentLength()) == false)) { + && blobStore.getNormalPrioritySizeBasedBlockingQ().isBelowCapacity(uploadRequest.getContentLength()) == false)) { StreamContext streamContext = SocketAccess.doPrivileged( () -> writeContext.getStreamProvider(uploadRequest.getContentLength()) ); @@ -276,14 +276,16 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp () -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener) ) ); - } else { - blobStore.getOtherPrioritySizeBasedBlockingQ() + } else if (writeContext.getWritePriority() == WritePriority.NORMAL) { + blobStore.getNormalPrioritySizeBasedBlockingQ() .produce( new SizeBasedBlockingQ.Item( writeContext.getFileSize(), () -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener) ) ); + } else { + throw new IllegalStateException("Cannot perform upload for other priority types."); } } } catch (Exception e) { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java index e1a8fff87bc3e..414e6eeb9369b 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java @@ -95,7 +95,7 @@ class S3BlobStore implements BlobStore { private final AsyncExecutorContainer priorityExecutorBuilder; private final AsyncExecutorContainer normalExecutorBuilder; private final boolean multipartUploadEnabled; - private final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ; + private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; S3BlobStore( @@ -113,7 +113,7 @@ class S3BlobStore implements BlobStore { AsyncExecutorContainer urgentExecutorBuilder, AsyncExecutorContainer priorityExecutorBuilder, AsyncExecutorContainer normalExecutorBuilder, - SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ, + SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ, SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ ) { this.service = service; @@ -133,7 +133,7 @@ class S3BlobStore implements BlobStore { // Settings to initialize blobstore with. this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings()); this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings()); - this.otherPrioritySizeBasedBlockingQ = otherPrioritySizeBasedBlockingQ; + this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ; this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ; } @@ -191,8 +191,8 @@ public int getBulkDeletesSize() { return bulkDeletesSize; } - public SizeBasedBlockingQ getOtherPrioritySizeBasedBlockingQ() { - return otherPrioritySizeBasedBlockingQ; + public SizeBasedBlockingQ getNormalPrioritySizeBasedBlockingQ() { + return normalPrioritySizeBasedBlockingQ; } public SizeBasedBlockingQ getLowPrioritySizeBasedBlockingQ() { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index 3306062c653cd..269135a15d411 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -285,7 +285,7 @@ class S3Repository extends MeteredBlobStoreRepository { private final AsyncExecutorContainer priorityExecutorBuilder; private final AsyncExecutorContainer normalExecutorBuilder; private final Path pluginConfigPath; - private final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ; + private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; private volatile int bulkDeletesSize; @@ -303,7 +303,7 @@ class S3Repository extends MeteredBlobStoreRepository { final AsyncExecutorContainer normalExecutorBuilder, final S3AsyncService s3AsyncService, final boolean multipartUploadEnabled, - final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ, + final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ, final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ ) { this( @@ -319,7 +319,7 @@ class S3Repository extends MeteredBlobStoreRepository { s3AsyncService, multipartUploadEnabled, Path.of(""), - otherPrioritySizeBasedBlockingQ, + normalPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ ); } @@ -340,7 +340,7 @@ class S3Repository extends MeteredBlobStoreRepository { final S3AsyncService s3AsyncService, final boolean multipartUploadEnabled, Path pluginConfigPath, - final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ, + final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ, final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ ) { super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata)); @@ -352,7 +352,7 @@ class S3Repository extends MeteredBlobStoreRepository { this.urgentExecutorBuilder = urgentExecutorBuilder; this.priorityExecutorBuilder = priorityExecutorBuilder; this.normalExecutorBuilder = normalExecutorBuilder; - this.otherPrioritySizeBasedBlockingQ = otherPrioritySizeBasedBlockingQ; + this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ; this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ; validateRepositoryMetadata(metadata); @@ -417,7 +417,7 @@ protected S3BlobStore createBlobStore() { urgentExecutorBuilder, priorityExecutorBuilder, normalExecutorBuilder, - otherPrioritySizeBasedBlockingQ, + normalPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ ); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java index c4acb9ed67e22..b9065a52601c6 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java @@ -90,7 +90,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo private static final String FUTURE_COMPLETION = "future_completion"; private static final String STREAM_READER = "stream_reader"; private static final String LOW_TRANSFER_QUEUE_CONSUMER = "low_transfer_queue_consumer"; - private static final String OTHER_TRANSFER_QUEUE_CONSUMER = "other_transfer_queue_consumer"; + private static final String NORMAL_TRANSFER_QUEUE_CONSUMER = "normal_transfer_queue_consumer"; protected final S3Service service; private final S3AsyncService s3AsyncService; @@ -101,8 +101,8 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo private AsyncExecutorContainer priorityExecutorBuilder; private AsyncExecutorContainer normalExecutorBuilder; private ExecutorService lowTransferQConsumerService; - private ExecutorService otherTransferQConsumerService; - private SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ; + private ExecutorService normalTransferQConsumerService; + private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; private TransferSemaphoresHolder transferSemaphoresHolder; @@ -146,10 +146,10 @@ public List> getExecutorBuilders(Settings settings) { executorBuilders.add( new FixedExecutorBuilder( settings, - OTHER_TRANSFER_QUEUE_CONSUMER, - otherPriorityTransferQConsumers(settings), + NORMAL_TRANSFER_QUEUE_CONSUMER, + normalPriorityTransferQConsumers(settings), 10, - "thread_pool." + OTHER_TRANSFER_QUEUE_CONSUMER + "thread_pool." + NORMAL_TRANSFER_QUEUE_CONSUMER ) ); return executorBuilders; @@ -160,7 +160,7 @@ private int lowPriorityTransferQConsumers(Settings settings) { return Math.max(2, (int) (lowPriorityAllocation * S3Repository.S3_TRANSFER_QUEUE_CONSUMERS.get(settings))); } - private int otherPriorityTransferQConsumers(Settings settings) { + private int normalPriorityTransferQConsumers(Settings settings) { return S3Repository.S3_TRANSFER_QUEUE_CONSUMERS.get(settings); } @@ -231,12 +231,12 @@ public Collection createComponents( new AsyncTransferEventLoopGroup(normalEventLoopThreads) ); this.lowTransferQConsumerService = threadPool.executor(LOW_TRANSFER_QUEUE_CONSUMER); - this.otherTransferQConsumerService = threadPool.executor(OTHER_TRANSFER_QUEUE_CONSUMER); - int otherPriorityConsumers = otherPriorityTransferQConsumers(clusterService.getSettings()); - this.otherPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( - new ByteSizeValue(otherPriorityConsumers * 10L, ByteSizeUnit.GB), - otherTransferQConsumerService, - otherPriorityConsumers + this.normalTransferQConsumerService = threadPool.executor(NORMAL_TRANSFER_QUEUE_CONSUMER); + int normalPriorityConsumers = normalPriorityTransferQConsumers(clusterService.getSettings()); + this.normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( + new ByteSizeValue(normalPriorityConsumers * 10L, ByteSizeUnit.GB), + normalTransferQConsumerService, + normalPriorityConsumers ); int lowPriorityConsumers = lowPriorityTransferQConsumers(clusterService.getSettings()); LowPrioritySizeBasedBlockingQ lowPrioritySizeBasedBlockingQ = new LowPrioritySizeBasedBlockingQ( @@ -253,7 +253,7 @@ public Collection createComponents( TimeUnit.MINUTES ); - return CollectionUtils.arrayAsArrayList(this.otherPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ); + return CollectionUtils.arrayAsArrayList(this.normalPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ); } // New class because in core, components are injected via guice only by instance creation due to which @@ -292,7 +292,7 @@ protected S3Repository createRepository( s3AsyncService, S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()), configPath, - otherPrioritySizeBasedBlockingQ, + normalPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ ); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java index 289c92c14a8b9..ace8003d0fe55 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java @@ -69,6 +69,7 @@ public void produce(Item item) throws InterruptedException { if (item == null || item.size <= 0) { throw new IllegalStateException("Invalid item input to produce."); } + log.debug(() -> "Transfer queue event received of size: " + item.size + ". Current queue utilisation: " + currentSize.get()); if (currentSize.get() + item.size >= capacity.getBytes()) { throw new S3TransferRejectedException("S3 Transfer queue capacity reached"); @@ -96,8 +97,8 @@ public int getSize() { return queue.size(); } - public boolean canProduce(long contentLength) { - return (currentSize.get() + contentLength) < capacity.getBytes(); + public boolean isBelowCapacity(long contentLength) { + return contentLength < capacity.getBytes(); } protected static class Consumer extends Thread { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolder.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolder.java index aa3b6a78e2c4a..77c2a77fb5c12 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolder.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolder.java @@ -8,6 +8,8 @@ package org.opensearch.repositories.s3.async; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.common.blobstore.stream.write.WritePriority; import java.util.Objects; @@ -18,7 +20,7 @@ * Transfer semaphore holder for controlled transfer of data to remote. */ public class TransferSemaphoresHolder { - + private static final Logger log = LogManager.getLogger(TransferSemaphoresHolder.class); // For tests protected TypeSemaphore lowPrioritySemaphore; protected TypeSemaphore highPrioritySemaphore; @@ -88,6 +90,14 @@ public RequestContext createRequestContext() { * transfers. */ public TypeSemaphore acquirePermit(WritePriority writePriority, RequestContext requestContext) throws InterruptedException { + log.debug( + () -> "Acquire permit request for transfer type: " + + writePriority + + ". Available high priority permits: " + + highPrioritySemaphore.availablePermits() + + " and low priority permits: " + + lowPrioritySemaphore.availablePermits() + ); // Try acquiring low priority permit or high priority permit immediately if available. // Otherwise, we wait for low priority permit. if (Objects.requireNonNull(writePriority) == WritePriority.LOW) { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 4ec1baf634f52..d5f664b1f0608 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -100,7 +100,7 @@ public class S3BlobContainerMockClientTests extends OpenSearchTestCase implement private ScheduledExecutorService scheduler; private AsyncTransferEventLoopGroup transferNIOGroup; private S3BlobContainer blobContainer; - private SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ; + private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; static class MockS3AsyncService extends S3AsyncService { @@ -377,7 +377,7 @@ public void setUp() throws Exception { transferQueueConsumerService = Executors.newFixedThreadPool(20); scheduler = new Scheduler.SafeScheduledThreadPoolExecutor(1); transferNIOGroup = new AsyncTransferEventLoopGroup(1); - otherPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( + normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 10L, ByteSizeUnit.GB), transferQueueConsumerService, 10 @@ -387,7 +387,7 @@ public void setUp() throws Exception { transferQueueConsumerService, 5 ); - otherPrioritySizeBasedBlockingQ.start(); + normalPrioritySizeBasedBlockingQ.start(); lowPrioritySizeBasedBlockingQ.start(); blobContainer = createBlobContainer(); super.setUp(); @@ -401,7 +401,7 @@ public void tearDown() throws Exception { streamReaderService.shutdown(); remoteTransferRetry.shutdown(); transferQueueConsumerService.shutdown(); - otherPrioritySizeBasedBlockingQ.close(); + normalPrioritySizeBasedBlockingQ.close(); lowPrioritySizeBasedBlockingQ.close(); scheduler.shutdown(); transferNIOGroup.close(); @@ -448,7 +448,7 @@ private S3BlobStore createBlobStore() { asyncExecutorContainer, asyncExecutorContainer, asyncExecutorContainer, - otherPrioritySizeBasedBlockingQ, + normalPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ ); } @@ -651,7 +651,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize); when(blobStore.getLowPrioritySizeBasedBlockingQ()).thenReturn(sizeBasedBlockingQ); - when(blobStore.getOtherPrioritySizeBasedBlockingQ()).thenReturn(sizeBasedBlockingQ); + when(blobStore.getNormalPrioritySizeBasedBlockingQ()).thenReturn(sizeBasedBlockingQ); final boolean serverSideEncryption = randomBoolean(); when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index 95d807efa92cc..f2f87231e9620 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -122,7 +122,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes private ExecutorService transferQueueConsumerService; private ScheduledExecutorService scheduler; private AsyncTransferEventLoopGroup transferNIOGroup; - private SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ; + private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; @Before @@ -137,7 +137,7 @@ public void setUp() throws Exception { remoteTransferRetry = Executors.newFixedThreadPool(20); transferQueueConsumerService = Executors.newFixedThreadPool(2); scheduler = new ScheduledThreadPoolExecutor(1); - otherPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( + normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 5L, ByteSizeUnit.GB), transferQueueConsumerService, 2 @@ -147,7 +147,7 @@ public void setUp() throws Exception { transferQueueConsumerService, 2 ); - otherPrioritySizeBasedBlockingQ.start(); + normalPrioritySizeBasedBlockingQ.start(); lowPrioritySizeBasedBlockingQ.start(); // needed by S3AsyncService SocketAccess.doPrivileged(() -> System.setProperty("opensearch.path.conf", configPath().toString())); @@ -163,7 +163,7 @@ public void tearDown() throws Exception { remoteTransferRetry.shutdown(); transferQueueConsumerService.shutdown(); scheduler.shutdown(); - otherPrioritySizeBasedBlockingQ.close(); + normalPrioritySizeBasedBlockingQ.close(); lowPrioritySizeBasedBlockingQ.close(); IOUtils.close(transferNIOGroup); @@ -257,7 +257,7 @@ protected AsyncMultiStreamBlobContainer createBlobContainer( asyncExecutorContainer, asyncExecutorContainer, asyncExecutorContainer, - otherPrioritySizeBasedBlockingQ, + normalPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ ) ) { diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java index 9888612b444bc..4e8db0a3a8c69 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java @@ -14,8 +14,12 @@ * @opensearch.internal */ public enum WritePriority { + // Used for segment transfers during refresh, flush or merges NORMAL, + // Used for transfer of translog or ckp files. HIGH, + // Used for transfer of remote cluster state URGENT, + // All other background transfers such as in snapshot recovery, recovery from local store or index etc. LOW }