From 9f410bce9899cc95947bb4404e297d6537bbb65d Mon Sep 17 00:00:00 2001 From: sopguo Date: Tue, 10 Sep 2024 21:24:19 -0700 Subject: [PATCH] Move IdConverter inside router phase1 (#2886) * Move IdConverter inside router * pass in idConverter to router * Add blobPath in router.putBlob --------- Co-authored-by: Sophie Guo --- .../com/github/ambry/account/MockRouter.java | 2 +- .../com/github/ambry/config/RouterConfig.java | 19 +++++++ .../github/ambry/frontend/IdConverter.java | 21 ++++---- .../java/com/github/ambry/router/Router.java | 19 ++++--- .../ambry/rest/MockRestRequestService.java | 4 +- .../frontend/AmbryIdConverterFactory.java | 40 +++++++-------- .../ambry/frontend/NamedBlobPutHandler.java | 7 +-- .../ambry/frontend/PostBlobHandler.java | 6 +-- .../s3/S3MultipartCompleteUploadHandler.java | 2 +- .../s3/S3MultipartUploadPartHandler.java | 3 +- .../frontend/AmbryIdConverterFactoryTest.java | 2 +- .../FrontendRestRequestServiceTest.java | 21 ++++---- .../frontend/NamedBlobPutHandlerTest.java | 14 ++--- .../ambry/frontend/TtlUpdateHandlerTest.java | 2 +- .../ambry/router/NonBlockingRouter.java | 51 +++++++++++-------- .../router/NonBlockingRouterFactory.java | 23 ++++++++- .../ambry/router/NonBlockingQuotaTest.java | 12 ++--- .../router/NonBlockingRouterTestBase.java | 2 +- .../github/ambry/router/PutManagerTest.java | 2 +- .../server/RouterServerTestFramework.java | 2 +- .../github/ambry/router/InMemoryRouter.java | 2 +- .../github/ambry/server/ServerTestUtil.java | 2 +- .../tools/admin/ConcurrencyTestTool.java | 2 +- .../ambry/tools/perf/rest/PerfRouter.java | 12 +++-- 24 files changed, 166 insertions(+), 106 deletions(-) diff --git a/ambry-account/src/test/java/com/github/ambry/account/MockRouter.java b/ambry-account/src/test/java/com/github/ambry/account/MockRouter.java index 9b4cc6745b..450edbfac3 100644 --- a/ambry-account/src/test/java/com/github/ambry/account/MockRouter.java +++ b/ambry-account/src/test/java/com/github/ambry/account/MockRouter.java @@ -100,7 +100,7 @@ public Future getBlob(String blobId, GetBlobOptions options, Call @Override public Future putBlob(BlobProperties blobProperties, byte[] userMetadata, ReadableStreamChannel channel, - PutBlobOptions options, Callback callback, QuotaChargeCallback quotaChargeCallback) { + PutBlobOptions options, Callback callback, QuotaChargeCallback quotaChargeCallback, String blobPath) { lock.lock(); try { FutureResult future = new FutureResult<>(); diff --git a/ambry-api/src/main/java/com/github/ambry/config/RouterConfig.java b/ambry-api/src/main/java/com/github/ambry/config/RouterConfig.java index 9f77d771a4..a6f4d46145 100644 --- a/ambry-api/src/main/java/com/github/ambry/config/RouterConfig.java +++ b/ambry-api/src/main/java/com/github/ambry/config/RouterConfig.java @@ -761,11 +761,30 @@ public class RouterConfig { @Default("false") public final boolean routerParanoidDurabilityEnabled; + /** + * This is set in frontendConfig until id converter been fully migrate to router. + */ + public final String idConverterFactory; + + /** + * This is set in frontendConfig until id converter been fully migrate to router. + */ + public final String idSigningServiceFactory; + + /** + * This is set in frontendConfig until id converter been fully migrate to router. + */ + public final String namedBlobDbFactory; + /** * Create a RouterConfig instance. * @param verifiableProperties the properties map to refer to. */ public RouterConfig(VerifiableProperties verifiableProperties) { + FrontendConfig frontendConfig = new FrontendConfig(verifiableProperties); + idConverterFactory = frontendConfig.idConverterFactory; + idSigningServiceFactory = frontendConfig.idSigningServiceFactory; + namedBlobDbFactory = frontendConfig.namedBlobDbFactory; routerBlobMetadataCacheId = verifiableProperties.getString(ROUTER_BLOB_METADATA_CACHE_ID, "routerBlobMetadataCache"); routerMaxNumMetadataCacheEntries = diff --git a/ambry-api/src/main/java/com/github/ambry/frontend/IdConverter.java b/ambry-api/src/main/java/com/github/ambry/frontend/IdConverter.java index 4d5a7ea128..9729ae55ae 100644 --- a/ambry-api/src/main/java/com/github/ambry/frontend/IdConverter.java +++ b/ambry-api/src/main/java/com/github/ambry/frontend/IdConverter.java @@ -15,6 +15,7 @@ import com.github.ambry.commons.CallbackUtils; import com.github.ambry.messageformat.BlobInfo; +import com.github.ambry.messageformat.BlobProperties; import com.github.ambry.rest.RestRequest; import com.github.ambry.commons.Callback; import java.io.Closeable; @@ -53,24 +54,26 @@ default CompletableFuture convert(RestRequest restRequest, String input) /** * Converts an ID. - * @param restRequest {@link RestRequest} representing the request. - * @param input the ID that needs to be converted. - * @param blobInfo the {@link BlobInfo} for an uploaded blob. Can be null for non-upload use cases. - * @param callback the {@link Callback} to invoke once the converted ID is available. Can be null. + * + * @param restRequest {@link RestRequest} representing the request. + * @param input the ID that needs to be converted. + * @param blobProperties the {@link BlobProperties} for an uploaded blob. Can be null for non-upload use cases. + * @param callback the {@link Callback} to invoke once the converted ID is available. Can be null. * @return a {@link Future} that will eventually contain the converted ID. */ - default Future convert(RestRequest restRequest, String input, BlobInfo blobInfo, Callback callback) { + default Future convert(RestRequest restRequest, String input, BlobProperties blobProperties, Callback callback) { return convert(restRequest, input, callback); } /** * Converts an ID. - * @param restRequest {@link RestRequest} representing the request. - * @param input the ID that needs to be converted. - * @param blobInfo the {@link BlobInfo} for an uploaded blob. Can be null for non-upload use cases. + * + * @param restRequest {@link RestRequest} representing the request. + * @param input the ID that needs to be converted. + * @param blobProperties the {@link BlobProperties} for an uploaded blob. Can be null for non-upload use cases. * @return a {@link CompletableFuture} that will eventually contain the converted ID. */ - default CompletableFuture convert(RestRequest restRequest, String input, BlobInfo blobInfo) { + default CompletableFuture convert(RestRequest restRequest, String input, BlobProperties blobProperties) { return convert(restRequest, input); } } diff --git a/ambry-api/src/main/java/com/github/ambry/router/Router.java b/ambry-api/src/main/java/com/github/ambry/router/Router.java index c23ac1b3d2..d7d141f392 100644 --- a/ambry-api/src/main/java/com/github/ambry/router/Router.java +++ b/ambry-api/src/main/java/com/github/ambry/router/Router.java @@ -47,17 +47,20 @@ Future getBlob(String blobId, GetBlobOptions options, Callback putBlob(BlobProperties blobProperties, byte[] userMetadata, ReadableStreamChannel channel, - PutBlobOptions options, Callback callback, QuotaChargeCallback quotaChargeCallback); + PutBlobOptions options, Callback callback, QuotaChargeCallback quotaChargeCallback, String blobPath); /** * Requests for a new metadata blob to be put asynchronously and invokes the {@link Callback} when the request @@ -192,7 +195,7 @@ default CompletableFuture stitchBlob(BlobProperties blobProperties, byte default CompletableFuture putBlob(BlobProperties blobProperties, byte[] userMetadata, ReadableStreamChannel channel, PutBlobOptions options) { CompletableFuture future = new CompletableFuture<>(); - putBlob(blobProperties, userMetadata, channel, options, CallbackUtils.fromCompletableFuture(future), null); + putBlob(blobProperties, userMetadata, channel, options, CallbackUtils.fromCompletableFuture(future), null, null); return future; } diff --git a/ambry-api/src/test/java/com/github/ambry/rest/MockRestRequestService.java b/ambry-api/src/test/java/com/github/ambry/rest/MockRestRequestService.java index dd6698fb65..4185f129b7 100644 --- a/ambry-api/src/test/java/com/github/ambry/rest/MockRestRequestService.java +++ b/ambry-api/src/test/java/com/github/ambry/rest/MockRestRequestService.java @@ -128,7 +128,7 @@ public void handlePost(RestRequest restRequest, RestResponseChannel restResponse BlobProperties blobProperties = RestUtils.buildBlobProperties(restRequest.getArgs()); byte[] usermetadata = RestUtils.buildUserMetadata(restRequest.getArgs()); router.putBlob(blobProperties, usermetadata, restRequest, new PutBlobOptionsBuilder().build(), - new MockPostCallback(this, restRequest, restResponseChannel, blobProperties), null); + new MockPostCallback(this, restRequest, restResponseChannel, blobProperties), null, null); } catch (RestServiceException e) { handleResponse(restRequest, restResponseChannel, null, e); } @@ -159,7 +159,7 @@ public void handlePut(RestRequest restRequest, RestResponseChannel restResponseC BlobProperties blobProperties = RestUtils.buildBlobProperties(restRequest.getArgs()); byte[] usermetadata = RestUtils.buildUserMetadata(restRequest.getArgs()); router.putBlob(blobProperties, usermetadata, restRequest, new PutBlobOptionsBuilder().build(), - new MockPostCallback(this, restRequest, restResponseChannel, blobProperties), null); + new MockPostCallback(this, restRequest, restResponseChannel, blobProperties), null, null); } catch (RestServiceException e) { handleResponse(restRequest, restResponseChannel, null, e); } diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryIdConverterFactory.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryIdConverterFactory.java index 76e421e3f3..a53a393b3d 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryIdConverterFactory.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryIdConverterFactory.java @@ -19,7 +19,6 @@ import com.github.ambry.config.FrontendConfig; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.frontend.s3.S3BaseHandler; -import com.github.ambry.frontend.s3.S3MultipartUploadHandler; import com.github.ambry.messageformat.BlobInfo; import com.github.ambry.messageformat.BlobProperties; import com.github.ambry.named.DeleteResult; @@ -89,19 +88,18 @@ public Future convert(RestRequest restRequest, String input, Callback convert(RestRequest restRequest, String input, BlobInfo blobInfo, Callback callback) { + public Future convert(RestRequest restRequest, String input, BlobProperties blobProperties, Callback callback) { final CompletableFuture future = new CompletableFuture<>(); String convertedId = null; Exception exception = null; @@ -115,7 +113,7 @@ public Future convert(RestRequest restRequest, String input, BlobInfo bl // TODO [S3] Add ID conversion for S3 POST requests (coming during multipart uploads) as well convertedId = "/" + signIdIfRequired(restRequest, input); } else { - CallbackUtils.callCallbackAfter(convertId(input, restRequest, blobInfo), + CallbackUtils.callCallbackAfter(convertId(input, restRequest, blobProperties), (id, e) -> completeConversion(id, e, future, callback)); } } catch (Exception e) { @@ -153,13 +151,14 @@ private void completeConversion(T conversionResult, Exception exception, Com /** * Convert the input ID to the requested output. If it's the named blob request, return the blobId from NameBlobDb, * otherwise return the input with leading slash and extension be stripped. - * @param input the input blob ID. - * @param restRequest the {@link RestRequest} to set arguments in. - * @param blobInfo the {@link BlobInfo} for an uploaded blob. This will be used for named blob PUT requests. + * + * @param input the input blob ID. + * @param restRequest the {@link RestRequest} to set arguments in. + * @param blobProperties the {@link BlobProperties} for an uploaded blob. This will be used for named blob PUT requests. * @return the {@link CompletionStage} that will be completed with the converted ID * @throws RestServiceException */ - private CompletionStage convertId(String input, RestRequest restRequest, BlobInfo blobInfo) + private CompletionStage convertId(String input, RestRequest restRequest, BlobProperties blobProperties) throws RestServiceException { CompletionStage conversionFuture; LOGGER.debug("input for convertId : " + input); @@ -185,17 +184,16 @@ private CompletionStage convertId(String input, RestRequest restRequest, namedBlobPath.getBlobName(), getOption).thenApply(NamedBlobRecord::getBlobId); } } else if (isNamedBlobPutRequest(restRequest) || isS3MultipartUploadCompleteRequest(restRequest)) { - Objects.requireNonNull(blobInfo, "blobInfo cannot be null."); + Objects.requireNonNull(blobProperties, "blobProperties cannot be null."); NamedBlobPath namedBlobPath = NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs()); String blobId = RestUtils.stripSlashAndExtensionFromId(input); - BlobProperties properties = blobInfo.getBlobProperties(); long expirationTimeMs = - Utils.addSecondsToEpochTime(properties.getCreationTimeInMs(), properties.getTimeToLiveInSeconds()); + Utils.addSecondsToEpochTime(blobProperties.getCreationTimeInMs(), blobProperties.getTimeToLiveInSeconds()); // Please note that the modified_ts column in DB will be auto-populated by the DB server. NamedBlobRecord record = new NamedBlobRecord(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(), - namedBlobPath.getBlobName(), blobId, expirationTimeMs, 0, properties.getBlobSize()); + namedBlobPath.getBlobName(), blobId, expirationTimeMs, 0, blobProperties.getBlobSize()); NamedBlobState state = NamedBlobState.READY; - if (properties.getTimeToLiveInSeconds() == Utils.Infinite_Time) { + if (blobProperties.getTimeToLiveInSeconds() == Utils.Infinite_Time) { // Set named blob state as 'IN_PROGRESS', will set the state to be 'READY' in the ttlUpdate success callback: routerTtlUpdateCallback state = NamedBlobState.IN_PROGRESS; } diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPutHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPutHandler.java index 2c802fdf9b..c3704ecd2b 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPutHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPutHandler.java @@ -235,7 +235,8 @@ private Callback securityPostProcessRequestCallback(BlobInfo blobInfo) { } PutBlobOptions options = getPutBlobOptionsFromRequest(); router.putBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), restRequest, options, - routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true)); + routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true), + RestUtils.getRequestPath(restRequest).getOperationOrBlobId(false)); } }, uri, LOGGER, deleteDatasetCallback); } @@ -250,7 +251,7 @@ private Callback routerPutBlobCallback(BlobInfo blobInfo) { return buildCallback(frontendMetrics.putRouterPutBlobMetrics, blobId -> { restResponseChannel.setHeader(RestUtils.Headers.BLOB_SIZE, restRequest.getBlobBytesReceived()); blobInfo.getBlobProperties().setBlobSize(restRequest.getBlobBytesReceived()); - idConverter.convert(restRequest, blobId, blobInfo, idConverterCallback(blobInfo, blobId)); + idConverter.convert(restRequest, blobId, blobInfo.getBlobProperties(), idConverterCallback(blobInfo, blobId)); }, uri, LOGGER, deleteDatasetCallback); } @@ -284,7 +285,7 @@ private Callback routerStitchBlobCallback(BlobInfo blobInfo, // The actual blob size is now present in the instance of BlobProperties passed to the router.stitchBlob(). // Update it in the BlobInfo so that IdConverter can add it to the named blob DB blobInfo.getBlobProperties().setBlobSize(propertiesPassedInRouterUpload.getBlobSize()); - idConverter.convert(restRequest, blobId, blobInfo, idConverterCallback(blobInfo, blobId)); + idConverter.convert(restRequest, blobId, blobInfo.getBlobProperties(), idConverterCallback(blobInfo, blobId)); }, uri, LOGGER, deleteDatasetCallback); } diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/PostBlobHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/PostBlobHandler.java index 6ff63da1de..d3ab33d0fb 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/PostBlobHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/PostBlobHandler.java @@ -195,8 +195,8 @@ private Callback securityPostProcessRequestCallback(BlobInfo blobInfo) { } else { PutBlobOptions options = getPutBlobOptionsFromRequest(); router.putBlob(blobInfo.getBlobProperties(), blobInfo.getUserMetadata(), restRequest, options, - routerPutBlobCallback(blobInfo), - QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true)); + routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true), + null); } }, uri, LOGGER, finalCallback); } @@ -225,7 +225,7 @@ private Callback fetchStitchRequestBodyCallback(RetainingAsyncWritableChan */ private Callback routerStitchBlobCallback(BlobInfo blobInfo) { return buildCallback(frontendMetrics.postRouterStitchBlobMetrics, - blobId -> idConverter.convert(restRequest, blobId, blobInfo, idConverterCallback(blobInfo)), uri, LOGGER, + blobId -> idConverter.convert(restRequest, blobId, blobInfo.getBlobProperties(), idConverterCallback(blobInfo)), uri, LOGGER, finalCallback); } diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3MultipartCompleteUploadHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3MultipartCompleteUploadHandler.java index e269206374..96f13b9e11 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3MultipartCompleteUploadHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3MultipartCompleteUploadHandler.java @@ -239,7 +239,7 @@ private Callback routerStitchBlobCallback(BlobInfo blobInfo, // The actual blob size is now present in the instance of BlobProperties passed to the router.stitchBlob(). // Update it in the BlobInfo so that IdConverter can add it to the named blob DB blobInfo.getBlobProperties().setBlobSize(propertiesPassedInRouterUpload.getBlobSize()); - idConverter.convert(restRequest, blobId, blobInfo, idConverterCallback(blobInfo, blobId)); + idConverter.convert(restRequest, blobId, blobInfo.getBlobProperties(), idConverterCallback(blobInfo, blobId)); }, uri, LOGGER, finalCallback); } diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3MultipartUploadPartHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3MultipartUploadPartHandler.java index 3888271e77..4ff5d292a8 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3MultipartUploadPartHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3MultipartUploadPartHandler.java @@ -173,7 +173,8 @@ private Callback securityPostProcessRequestCallback(BlobInfo blobInfo) { } else { PutBlobOptions options = getPutBlobOptionsFromRequest(); router.putBlob(blobInfo.getBlobProperties(), blobInfo.getUserMetadata(), restRequest, options, - routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true)); + routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true), + null); } }, uri, logger, finalCallback); } diff --git a/ambry-frontend/src/test/java/com/github/ambry/frontend/AmbryIdConverterFactoryTest.java b/ambry-frontend/src/test/java/com/github/ambry/frontend/AmbryIdConverterFactoryTest.java index cceaf70f27..66043bcdc1 100644 --- a/ambry-frontend/src/test/java/com/github/ambry/frontend/AmbryIdConverterFactoryTest.java +++ b/ambry-frontend/src/test/java/com/github/ambry/frontend/AmbryIdConverterFactoryTest.java @@ -254,7 +254,7 @@ private void testConversionForNamedBlob(IdConverter idConverter, RestMethod rest } IdConversionCallback callback = new IdConversionCallback(); assertEquals("Converted ID does not match expected (Future)", expectedOutput, - idConverter.convert(restRequest, input, blobInfo, callback).get(5, TimeUnit.SECONDS)); + idConverter.convert(restRequest, input, blobInfo.getBlobProperties(), callback).get(5, TimeUnit.SECONDS)); assertEquals("Converted ID does not match expected (Callback)", expectedOutput, callback.result); assertTrue("Named Blob Put request should have named blob version in its internal keys", restRequest.getArgs().containsKey(RestUtils.InternalKeys.NAMED_BLOB_VERSION)); diff --git a/ambry-frontend/src/test/java/com/github/ambry/frontend/FrontendRestRequestServiceTest.java b/ambry-frontend/src/test/java/com/github/ambry/frontend/FrontendRestRequestServiceTest.java index ba5ba4e960..2914eb8787 100644 --- a/ambry-frontend/src/test/java/com/github/ambry/frontend/FrontendRestRequestServiceTest.java +++ b/ambry-frontend/src/test/java/com/github/ambry/frontend/FrontendRestRequestServiceTest.java @@ -742,7 +742,7 @@ public void testDatasetVersionFallback() throws Exception { // add dataset versio and mock router.putBlob failed. Router mockRouter = mock(Router.class); - when(mockRouter.putBlob(any(), any(), any(), any(), any(), any())).thenThrow(new RuntimeException()); + when(mockRouter.putBlob(any(), any(), any(), any(), any(), any(), any())).thenThrow(new RuntimeException()); frontendRestRequestService = new FrontendRestRequestService(frontendConfig, frontendMetrics, mockRouter, clusterMap, idConverterFactory, securityServiceFactory, urlSigningService, idSigningService, namedBlobDb, accountService, @@ -4205,7 +4205,7 @@ class FrontendTestIdConverterFactory implements IdConverterFactory { String translation = null; boolean returnInputIfTranslationNull = false; volatile String lastInput = null; - volatile BlobInfo lastBlobInfo = null; + volatile BlobProperties lastBlobProperties = null; volatile String lastConvertedId = null; @Override @@ -4222,7 +4222,7 @@ public Future convert(RestRequest restRequest, String input, Callback convert(RestRequest restRequest, String input, BlobInfo blobInfo, Callback callback) { + public Future convert(RestRequest restRequest, String input, BlobProperties blobProperties, Callback callback) { if (!isOpen) { throw new IllegalStateException("IdConverter closed"); } @@ -4230,7 +4230,7 @@ public Future convert(RestRequest restRequest, String input, BlobInfo bl && RestUtils.getRequestPath(restRequest).matchesOperation(Operations.NAMED_BLOB)) { restRequest.setArg(RestUtils.InternalKeys.NAMED_BLOB_VERSION, -1L); } - return completeOperation(input, blobInfo, callback); + return completeOperation(input, blobProperties, callback); } @Override @@ -4240,14 +4240,15 @@ public void close() { /** * Completes the operation by creating and invoking a {@link Future} and invoking the {@code callback} if non-null. - * @param input the original input ID received - * @param blobInfo the blob info received. - * @param callback the {@link Callback} to invoke. Can be null. + * + * @param input the original input ID received + * @param blobProperties the blob info received. + * @param callback the {@link Callback} to invoke. Can be null. * @return the created {@link Future}. */ - private Future completeOperation(String input, BlobInfo blobInfo, Callback callback) { + private Future completeOperation(String input, BlobProperties blobProperties, Callback callback) { lastInput = input; - lastBlobInfo = blobInfo; + lastBlobProperties = blobProperties; if (exceptionToThrow != null) { throw exceptionToThrow; } @@ -4410,7 +4411,7 @@ public Future getBlob(String blobId, GetBlobOptions options, Call @Override public Future putBlob(BlobProperties blobProperties, byte[] usermetadata, ReadableStreamChannel channel, - PutBlobOptions options, Callback callback, QuotaChargeCallback quotaChargeCallback) { + PutBlobOptions options, Callback callback, QuotaChargeCallback quotaChargeCallback, String blobPath) { return completeOperation(TestUtils.getRandomString(10), callback, OpType.PutBlob); } diff --git a/ambry-frontend/src/test/java/com/github/ambry/frontend/NamedBlobPutHandlerTest.java b/ambry-frontend/src/test/java/com/github/ambry/frontend/NamedBlobPutHandlerTest.java index 759d190353..0cac1072cf 100644 --- a/ambry-frontend/src/test/java/com/github/ambry/frontend/NamedBlobPutHandlerTest.java +++ b/ambry-frontend/src/test/java/com/github/ambry/frontend/NamedBlobPutHandlerTest.java @@ -184,7 +184,7 @@ public void updateNamedBlobAllowedTest() throws Exception { assertEquals("Unexpected blob content stored", ByteBuffer.wrap(content), blob.getBlob()); assertEquals("Unexpected TTL in blob", TestUtils.TTL_SECS, blob.getBlobProperties().getTimeToLiveInSeconds()); assertEquals("Unexpected TTL in named blob DB", TestUtils.TTL_SECS, - idConverterFactory.lastBlobInfo.getBlobProperties().getTimeToLiveInSeconds()); + idConverterFactory.lastBlobProperties.getTimeToLiveInSeconds()); assertEquals("Unexpected response status", restResponseChannel.getStatus(), ResponseStatus.Ok); } @@ -440,7 +440,7 @@ private void stitchDatasetVersionAndVerify(byte[] requestBody, List e RestResponseChannel restResponseChannel = new MockRestResponseChannel(); FutureResult future = new FutureResult<>(); idConverterFactory.lastInput = null; - idConverterFactory.lastBlobInfo = null; + idConverterFactory.lastBlobProperties = null; idConverterFactory.lastConvertedId = null; namedBlobPutHandler.handle(request, restResponseChannel, future::done); future.get(TIMEOUT_SECS, TimeUnit.SECONDS); @@ -457,7 +457,7 @@ private void stitchDatasetVersionAndVerify(byte[] requestBody, List e assertEquals("Unexpected blob size", Long.toString(getStitchedBlobSize(expectedStitchedChunks)), restResponseChannel.getHeader(RestUtils.Headers.BLOB_SIZE)); assertEquals("Unexpected TTL in named blob DB", -1, - idConverterFactory.lastBlobInfo.getBlobProperties().getTimeToLiveInSeconds()); + idConverterFactory.lastBlobProperties.getTimeToLiveInSeconds()); assertEquals("Unexpected TTL in blob", -1, blob.getBlobProperties().getTimeToLiveInSeconds()); } } @@ -482,7 +482,7 @@ private void stitchBlobAndVerify(byte[] requestBody, List expectedSti RestResponseChannel restResponseChannel = new MockRestResponseChannel(); FutureResult future = new FutureResult<>(); idConverterFactory.lastInput = null; - idConverterFactory.lastBlobInfo = null; + idConverterFactory.lastBlobProperties = null; idConverterFactory.lastConvertedId = null; namedBlobPutHandler.handle(request, restResponseChannel, future::done); if (errorChecker == null) { @@ -501,7 +501,7 @@ private void stitchBlobAndVerify(byte[] requestBody, List expectedSti assertEquals("Unexpected blob size", Long.toString(getStitchedBlobSize(expectedStitchedChunks)), restResponseChannel.getHeader(RestUtils.Headers.BLOB_SIZE)); assertEquals("Unexpected TTL in named blob DB", ttl, - idConverterFactory.lastBlobInfo.getBlobProperties().getTimeToLiveInSeconds()); + idConverterFactory.lastBlobProperties.getTimeToLiveInSeconds()); assertEquals("Unexpected TTL in blob", ttl, blob.getBlobProperties().getTimeToLiveInSeconds()); } else { TestUtils.assertException(ExecutionException.class, () -> future.get(TIMEOUT_SECS, TimeUnit.SECONDS), @@ -591,7 +591,7 @@ private void putBlobAndVerify(ThrowingConsumer errorChecker, RestResponseChannel restResponseChannel = new MockRestResponseChannel(); FutureResult future = new FutureResult<>(); idConverterFactory.lastInput = null; - idConverterFactory.lastBlobInfo = null; + idConverterFactory.lastBlobProperties = null; idConverterFactory.lastConvertedId = null; namedBlobPutHandler.handle(request, restResponseChannel, future::done); if (errorChecker == null) { @@ -602,7 +602,7 @@ private void putBlobAndVerify(ThrowingConsumer errorChecker, assertEquals("Unexpected blob content stored", ByteBuffer.wrap(content), blob.getBlob()); assertEquals("Unexpected TTL in blob", ttl, blob.getBlobProperties().getTimeToLiveInSeconds()); assertEquals("Unexpected TTL in named blob DB", ttl, - idConverterFactory.lastBlobInfo.getBlobProperties().getTimeToLiveInSeconds()); + idConverterFactory.lastBlobProperties.getTimeToLiveInSeconds()); assertEquals("Unexpected response status", restResponseChannel.getStatus(), ResponseStatus.Ok); } else { TestUtils.assertException(ExecutionException.class, () -> future.get(TIMEOUT_SECS, TimeUnit.SECONDS), diff --git a/ambry-frontend/src/test/java/com/github/ambry/frontend/TtlUpdateHandlerTest.java b/ambry-frontend/src/test/java/com/github/ambry/frontend/TtlUpdateHandlerTest.java index e64b45babe..76a4d28414 100644 --- a/ambry-frontend/src/test/java/com/github/ambry/frontend/TtlUpdateHandlerTest.java +++ b/ambry-frontend/src/test/java/com/github/ambry/frontend/TtlUpdateHandlerTest.java @@ -93,7 +93,7 @@ public TtlUpdateHandlerTest() throws Exception { accountAndContainerInjector, metrics, CLUSTER_MAP, QuotaTestUtils.createDummyQuotaManager(), null, null); ReadableStreamChannel channel = new ByteBufferReadableStreamChannel(ByteBuffer.wrap(BLOB_DATA)); blobId = router.putBlob(BLOB_PROPERTIES, new byte[0], channel, new PutBlobOptionsBuilder().build(), null, - QuotaTestUtils.createTestQuotaChargeCallback(QuotaMethod.WRITE)).get(1, TimeUnit.SECONDS); + QuotaTestUtils.createTestQuotaChargeCallback(QuotaMethod.WRITE), null).get(1, TimeUnit.SECONDS); idConverterFactory.translation = blobId; } diff --git a/ambry-router/src/main/java/com/github/ambry/router/NonBlockingRouter.java b/ambry-router/src/main/java/com/github/ambry/router/NonBlockingRouter.java index ef23dabc36..7cf2cc1e3a 100644 --- a/ambry-router/src/main/java/com/github/ambry/router/NonBlockingRouter.java +++ b/ambry-router/src/main/java/com/github/ambry/router/NonBlockingRouter.java @@ -22,6 +22,8 @@ import com.github.ambry.commons.CallbackUtils; import com.github.ambry.commons.ResponseHandler; import com.github.ambry.config.RouterConfig; +import com.github.ambry.frontend.IdConverter; +import com.github.ambry.frontend.IdConverterFactory; import com.github.ambry.messageformat.BlobInfo; import com.github.ambry.messageformat.BlobProperties; import com.github.ambry.network.NetworkClient; @@ -77,32 +79,36 @@ public class NonBlockingRouter implements Router { public final AtomicInteger currentOperationsCount = new AtomicInteger(0); private RepairRequestsDb repairRequestsDb = null; + private IdConverter idConverter = null; /** * Constructs a NonBlockingRouter. - * @param routerConfig the configs for the router. - * @oaran repairRequestsDbFactory the {@link RepairRequestsDbFactory} use to create the {@link RepairRequestsDb} - * @param routerMetrics the metrics for the router. - * @param networkClientFactory the {@link NetworkClientFactory} used by the {@link OperationController} to create - * instances of {@link NetworkClient}. - * @param notificationSystem the notification system to use to notify about blob creations and deletions. - * @param clusterMap the cluster map for the cluster. - * @param kms {@link KeyManagementService} to assist in fetching container keys for encryption or decryption - * @param cryptoService {@link CryptoService} to assist in encryption or decryption - * @param cryptoJobHandler {@link CryptoJobHandler} to assist in the execution of crypto jobs - * @param accountService the {@link AccountService} to use. - * @param time the time instance. + * + * @param routerConfig the configs for the router. + * @param routerMetrics the metrics for the router. + * @param networkClientFactory the {@link NetworkClientFactory} used by the {@link OperationController} to create + * instances of {@link NetworkClient}. + * @param notificationSystem the notification system to use to notify about blob creations and deletions. + * @param clusterMap the cluster map for the cluster. + * @param kms {@link KeyManagementService} to assist in fetching container keys for encryption or + * decryption + * @param cryptoService {@link CryptoService} to assist in encryption or decryption + * @param cryptoJobHandler {@link CryptoJobHandler} to assist in the execution of crypto jobs + * @param accountService the {@link AccountService} to use. + * @param time the time instance. * @param defaultPartitionClass the default partition class to choose partitions from (if none is found in the * container config). Can be {@code null} if no affinity is required for the puts for * which the container contains no partition class hints. - * @throws IOException if the OperationController could not be successfully created. + * @param idConverterFactory the {@link IdConverterFactory} used to create the {@link IdConverter} + * @throws IOException if the OperationController could not be successfully created. * @throws ReflectiveOperationException if the OperationController could not be successfully created. + * @oaran repairRequestsDbFactory the {@link RepairRequestsDbFactory} use to create the {@link RepairRequestsDb} */ public NonBlockingRouter(RouterConfig routerConfig, RepairRequestsDbFactory repairRequestsDbFactory, NonBlockingRouterMetrics routerMetrics, NetworkClientFactory networkClientFactory, NotificationSystem notificationSystem, ClusterMap clusterMap, KeyManagementService kms, CryptoService cryptoService, CryptoJobHandler cryptoJobHandler, AccountService accountService, Time time, - String defaultPartitionClass, AmbryCache blobMetadataCache) throws IOException, ReflectiveOperationException { + String defaultPartitionClass, AmbryCache blobMetadataCache, IdConverterFactory idConverterFactory) throws IOException, ReflectiveOperationException { this.routerConfig = routerConfig; this.routerMetrics = routerMetrics; ResponseHandler responseHandler = new ResponseHandler(clusterMap); @@ -148,6 +154,9 @@ public NonBlockingRouter(RouterConfig routerConfig, RepairRequestsDbFactory repa logger.error("RepairRequests: Cannot connect to the RepairRequestsDB. ", e); } } + if (idConverterFactory != null) { + idConverter = idConverterFactory.getIdConverter(); + } } /** @@ -175,7 +184,7 @@ public NonBlockingRouter(RouterConfig routerConfig, NonBlockingRouterMetrics rou AccountService accountService, Time time, String defaultPartitionClass, AmbryCache blobMetadataCache) throws IOException, ReflectiveOperationException { this(routerConfig, null, routerMetrics, networkClientFactory, notificationSystem, clusterMap, kms, cryptoService, - cryptoJobHandler, accountService, time, defaultPartitionClass, blobMetadataCache); + cryptoJobHandler, accountService, time, defaultPartitionClass, blobMetadataCache, null); } /** @@ -338,17 +347,19 @@ public Future getBlob(String blobIdStr, GetBlobOptions options, f /** * Requests for a new blob to be put asynchronously and invokes the {@link Callback} when the request completes. + * * @param blobProperties The properties of the blob. Note that the size specified in the properties is ignored. The * channel is consumed fully, and the size of the blob is the number of bytes read from it. - * @param userMetadata Optional user metadata about the blob. This can be null. - * @param channel The {@link ReadableStreamChannel} that contains the content of the blob. - * @param options The {@link PutBlobOptions} associated with the request. This cannot be null. - * @param callback The {@link Callback} which will be invoked on the completion of the request . + * @param userMetadata Optional user metadata about the blob. This can be null. + * @param channel The {@link ReadableStreamChannel} that contains the content of the blob. + * @param options The {@link PutBlobOptions} associated with the request. This cannot be null. + * @param callback The {@link Callback} which will be invoked on the completion of the request . + * @param blobPath The name of the blob path for named blob based upload. * @return A future that would contain the BlobId eventually. */ @Override public Future putBlob(BlobProperties blobProperties, byte[] userMetadata, ReadableStreamChannel channel, - PutBlobOptions options, Callback callback, QuotaChargeCallback quotaChargeCallback) { + PutBlobOptions options, Callback callback, QuotaChargeCallback quotaChargeCallback, String blobPath) { if (blobProperties == null || channel == null || options == null) { throw new IllegalArgumentException("blobProperties, channel, or options must not be null"); } diff --git a/ambry-router/src/main/java/com/github/ambry/router/NonBlockingRouterFactory.java b/ambry-router/src/main/java/com/github/ambry/router/NonBlockingRouterFactory.java index ff53e63127..f42f46535e 100644 --- a/ambry-router/src/main/java/com/github/ambry/router/NonBlockingRouterFactory.java +++ b/ambry-router/src/main/java/com/github/ambry/router/NonBlockingRouterFactory.java @@ -23,6 +23,11 @@ import com.github.ambry.config.NetworkConfig; import com.github.ambry.config.RouterConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.frontend.IdConverterFactory; +import com.github.ambry.frontend.IdSigningService; +import com.github.ambry.frontend.IdSigningServiceFactory; +import com.github.ambry.named.NamedBlobDb; +import com.github.ambry.named.NamedBlobDbFactory; import com.github.ambry.network.NetworkClientFactory; import com.github.ambry.network.NetworkMetrics; import com.github.ambry.network.SocketNetworkClientFactory; @@ -143,9 +148,25 @@ public Router getRouter() { logger.error("Failed to create RepairRequestsDbFactory", e); } } + IdConverterFactory idConverterFactory = null; + if (routerConfig.idConverterFactory != null) { + try { + IdSigningService idSigningService = + Utils.getObj(routerConfig.idSigningServiceFactory, verifiableProperties, + clusterMap.getMetricRegistry()).getIdSigningService(); + NamedBlobDb namedBlobDb = Utils.isNullOrEmpty(routerConfig.namedBlobDbFactory) ? null + : Utils.getObj(routerConfig.namedBlobDbFactory, verifiableProperties, + clusterMap.getMetricRegistry(), accountService).getNamedBlobDb(); + idConverterFactory = + Utils.getObj(routerConfig.idConverterFactory, verifiableProperties, clusterMap.getMetricRegistry(), + idSigningService, namedBlobDb); + } catch (Exception e) { + logger.error("Failed to create idConverterFactory"); + } + } return new NonBlockingRouter(routerConfig, repairRequestsDbFactory, routerMetrics, networkClientFactory, notificationSystem, clusterMap, kms, cryptoService, cryptoJobHandler, accountService, time, - defaultPartitionClass, blobMetadataCache); + defaultPartitionClass, blobMetadataCache, idConverterFactory); } catch (IOException | ReflectiveOperationException e) { throw new IllegalStateException("Error instantiating NonBlocking Router ", e); } diff --git a/ambry-router/src/test/java/com/github/ambry/router/NonBlockingQuotaTest.java b/ambry-router/src/test/java/com/github/ambry/router/NonBlockingQuotaTest.java index e0d968f518..ef0eab48f0 100644 --- a/ambry-router/src/test/java/com/github/ambry/router/NonBlockingQuotaTest.java +++ b/ambry-router/src/test/java/com/github/ambry/router/NonBlockingQuotaTest.java @@ -181,7 +181,7 @@ public QuotaConfig getQuotaConfig() { setOperationParams(blobSize, TTL_SECS); String compositeBlobId = router.putBlob(putBlobProperties, putUserMetadata, putChannel, PutBlobOptions.DEFAULT, null, - quotaChargeCallback).get(); + quotaChargeCallback, null).get(); expectedChargeCallbackCount += blobSize; assertEquals(expectedChargeCallbackCount, listenerCalledCount.get()); RetainingAsyncWritableChannel retainingAsyncWritableChannel = new RetainingAsyncWritableChannel(); @@ -201,7 +201,7 @@ public QuotaConfig getQuotaConfig() { for (int i = 0; i < 2; i++) { setOperationParams(); String blobId = router.putBlob(putBlobProperties, putUserMetadata, putChannel, PutBlobOptions.DEFAULT, null, - quotaChargeCallback).get(); + quotaChargeCallback, null).get(); assertEquals(expectedChargeCallbackCount += PUT_CONTENT_SIZE, listenerCalledCount.get()); logger.info("Put blob {}", blobId); blobIds.add(blobId); @@ -242,7 +242,7 @@ public QuotaConfig getQuotaConfig() { for (int i = 0; i < stitchedBlobCount; i++) { setOperationParams(); String blobId = router.putBlob(putBlobProperties, putUserMetadata, putChannel, putOptionsForChunkedUpload, null, - quotaChargeCallback).get(); + quotaChargeCallback, null).get(); assertEquals(expectedChargeCallbackCount += PUT_CONTENT_SIZE, listenerCalledCount.get()); logger.info("Put blob {}", blobId); blobIds.add(blobId); @@ -337,7 +337,7 @@ public QuotaConfig getQuotaConfig() { setOperationParams(blobSize, TTL_SECS); String compositeBlobId = router.putBlob(putBlobProperties, putUserMetadata, putChannel, PutBlobOptions.DEFAULT, null, - quotaChargeCallback).get(); + quotaChargeCallback, null).get(); expectedChargeCallbackCount += blobSize; assertEquals(expectedChargeCallbackCount, listenerCalledCount.get()); RetainingAsyncWritableChannel retainingAsyncWritableChannel = new RetainingAsyncWritableChannel(); @@ -393,7 +393,7 @@ public void testRouterWithDefaultQuotaCallback() throws Exception { setOperationParams(blobSize, TTL_SECS); String compositeBlobId = router.putBlob(putBlobProperties, putUserMetadata, putChannel, PutBlobOptions.DEFAULT, null, - quotaChargeCallback).get(); + quotaChargeCallback, null).get(); assertEquals(0, listenerCalledCount.get()); RetainingAsyncWritableChannel retainingAsyncWritableChannel = new RetainingAsyncWritableChannel(); router.getBlob(compositeBlobId, new GetBlobOptionsBuilder().build(), null, quotaChargeCallback) @@ -450,7 +450,7 @@ public void testRouterWithPreProcessQuotaCallback() throws Exception { quotaSource.getCuUsage().put(String.valueOf(account.getId()), new CapacityUnit()); String compositeBlobId = router.putBlob(putBlobProperties, putUserMetadata, putChannel, PutBlobOptions.DEFAULT, null, - quotaChargeCallback).get(); + quotaChargeCallback, null).get(); CapacityUnit quotaUsage = quotaSource.getCuUsage().get(String.valueOf(account.getId())); Assert.assertEquals(8, quotaUsage.getWcu()); Assert.assertEquals(0, quotaUsage.getRcu()); diff --git a/ambry-router/src/test/java/com/github/ambry/router/NonBlockingRouterTestBase.java b/ambry-router/src/test/java/com/github/ambry/router/NonBlockingRouterTestBase.java index 2c6aa66f4c..4f5fba34cf 100644 --- a/ambry-router/src/test/java/com/github/ambry/router/NonBlockingRouterTestBase.java +++ b/ambry-router/src/test/java/com/github/ambry/router/NonBlockingRouterTestBase.java @@ -231,7 +231,7 @@ protected void setRouter(Properties props, MockServerLayout serverLayout, Notifi router = new NonBlockingRouter(routerConfig, factory, routerMetrics, new MockNetworkClientFactory(verifiableProperties, mockSelectorState, MAX_PORTS_PLAIN_TEXT, MAX_PORTS_SSL, CHECKOUT_TIMEOUT_MS, serverLayout, mockTime), notificationSystem, mockClusterMap, kms, cryptoService, - cryptoJobHandler, accountService, mockTime, MockClusterMap.DEFAULT_PARTITION_CLASS, null); + cryptoJobHandler, accountService, mockTime, MockClusterMap.DEFAULT_PARTITION_CLASS, null, null); } protected void setRouterWithMetadataCache(Properties props, AmbryCacheStats ambryCacheStats) throws Exception { diff --git a/ambry-router/src/test/java/com/github/ambry/router/PutManagerTest.java b/ambry-router/src/test/java/com/github/ambry/router/PutManagerTest.java index 76eba99ebd..678c897e19 100644 --- a/ambry-router/src/test/java/com/github/ambry/router/PutManagerTest.java +++ b/ambry-router/src/test/java/com/github/ambry/router/PutManagerTest.java @@ -405,7 +405,7 @@ public void testBadCallback() throws Exception { router.putBlob(req.putBlobProperties, req.putUserMetadata, putChannel, req.options, (result, exception) -> { callbackCalled.countDown(); throw new RuntimeException("Throwing an exception in the user callback"); - }, QuotaTestUtils.createTestQuotaChargeCallback()); + }, QuotaTestUtils.createTestQuotaChargeCallback(), null); submitPutsAndAssertSuccess(false); //future.get() for operation with bad callback should still succeed future.get(); diff --git a/ambry-server/src/integration-test/java/com/github/ambry/server/RouterServerTestFramework.java b/ambry-server/src/integration-test/java/com/github/ambry/server/RouterServerTestFramework.java index 5f2ac0b16e..809f18e0c1 100644 --- a/ambry-server/src/integration-test/java/com/github/ambry/server/RouterServerTestFramework.java +++ b/ambry-server/src/integration-test/java/com/github/ambry/server/RouterServerTestFramework.java @@ -299,7 +299,7 @@ void action(String result) { }; Future future = router.putBlob(opChain.properties, opChain.userMetadata, putChannel, new PutBlobOptionsBuilder().build(), - callback, quotaChargeCallback); + callback, quotaChargeCallback, null); TestFuture testFuture = new TestFuture(future, genLabel("putBlob", false), opChain) { @Override void check() throws Exception { diff --git a/ambry-test-utils/src/main/java/com/github/ambry/router/InMemoryRouter.java b/ambry-test-utils/src/main/java/com/github/ambry/router/InMemoryRouter.java index 64ee466b34..d91c578cfd 100644 --- a/ambry-test-utils/src/main/java/com/github/ambry/router/InMemoryRouter.java +++ b/ambry-test-utils/src/main/java/com/github/ambry/router/InMemoryRouter.java @@ -259,7 +259,7 @@ public Future getBlob(String blobId, GetBlobOptions options, Call @Override public Future putBlob(BlobProperties blobProperties, byte[] usermetadata, ReadableStreamChannel channel, - PutBlobOptions options, Callback callback, QuotaChargeCallback quotaChargeCallback) { + PutBlobOptions options, Callback callback, QuotaChargeCallback quotaChargeCallback, String blobPath) { FutureResult futureResult = new FutureResult<>(); if (!handlePrechecks(futureResult, callback)) { return futureResult; diff --git a/ambry-test-utils/src/main/java/com/github/ambry/server/ServerTestUtil.java b/ambry-test-utils/src/main/java/com/github/ambry/server/ServerTestUtil.java index 6987662191..4d9d32d290 100644 --- a/ambry-test-utils/src/main/java/com/github/ambry/server/ServerTestUtil.java +++ b/ambry-test-utils/src/main/java/com/github/ambry/server/ServerTestUtil.java @@ -1492,7 +1492,7 @@ public void onCompletion(String result, Exception exception) { } callbackLatch.countDown(); } - }, QUOTA_CHARGE_EVENT_LISTENER); + }, QUOTA_CHARGE_EVENT_LISTENER, null); putFutures.add(future); } for (Future future : putFutures) { diff --git a/ambry-tools/src/main/java/com/github/ambry/tools/admin/ConcurrencyTestTool.java b/ambry-tools/src/main/java/com/github/ambry/tools/admin/ConcurrencyTestTool.java index e4a9865b68..1384d3e6ab 100644 --- a/ambry-tools/src/main/java/com/github/ambry/tools/admin/ConcurrencyTestTool.java +++ b/ambry-tools/src/main/java/com/github/ambry/tools/admin/ConcurrencyTestTool.java @@ -674,7 +674,7 @@ public void onCompletion(String result, Exception exception) { callback.onCompletion(toReturn, exceptionToReturn); } } - }, QUOTA_CHARGE_EVENT_LISTENER); + }, QUOTA_CHARGE_EVENT_LISTENER, null); } catch (Exception e) { futureResult.done(null, e); if (callback != null) { diff --git a/ambry-tools/src/main/java/com/github/ambry/tools/perf/rest/PerfRouter.java b/ambry-tools/src/main/java/com/github/ambry/tools/perf/rest/PerfRouter.java index e81493e8c6..80d1c9c559 100644 --- a/ambry-tools/src/main/java/com/github/ambry/tools/perf/rest/PerfRouter.java +++ b/ambry-tools/src/main/java/com/github/ambry/tools/perf/rest/PerfRouter.java @@ -102,16 +102,18 @@ public Future getBlob(String blobId, GetBlobOptions options, Call /** * Consumes the data in {@code channel} and simply throws it away. {@code blobProperties} and {@code usermetadata} are * ignored. + * * @param blobProperties The properties of the blob. - * @param usermetadata Optional user metadata about the blob. This can be null. - * @param channel The {@link ReadableStreamChannel} that contains the content of the blob. - * @param options the {@link PutBlobOptions} for the blob. - * @param callback the {@link Callback} to invoke on operation completion. + * @param usermetadata Optional user metadata about the blob. This can be null. + * @param channel The {@link ReadableStreamChannel} that contains the content of the blob. + * @param options the {@link PutBlobOptions} for the blob. + * @param callback the {@link Callback} to invoke on operation completion. + * @param blobPath The name of the blob path for named blob based upload. * @return a {@link Future} that will contain a (dummy) blob id. */ @Override public Future putBlob(BlobProperties blobProperties, byte[] usermetadata, final ReadableStreamChannel channel, - PutBlobOptions options, final Callback callback, QuotaChargeCallback quotaChargeCallback) { + PutBlobOptions options, final Callback callback, QuotaChargeCallback quotaChargeCallback, String blobPath) { logger.trace("Received putBlob call"); final FutureResult futureResult = new FutureResult(); if (!routerOpen) {