Skip to content

Commit

Permalink
Move IdConverter inside router phase1 (linkedin#2886)
Browse files Browse the repository at this point in the history
* Move IdConverter inside router

* pass in idConverter to router

* Add blobPath in router.putBlob

---------

Co-authored-by: Sophie Guo <[email protected]>
  • Loading branch information
SophieGuo410 and Sophie Guo authored Sep 11, 2024
1 parent 570f9cf commit 9f410bc
Show file tree
Hide file tree
Showing 24 changed files with 166 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public Future<GetBlobResult> getBlob(String blobId, GetBlobOptions options, Call

@Override
public Future<String> putBlob(BlobProperties blobProperties, byte[] userMetadata, ReadableStreamChannel channel,
PutBlobOptions options, Callback<String> callback, QuotaChargeCallback quotaChargeCallback) {
PutBlobOptions options, Callback<String> callback, QuotaChargeCallback quotaChargeCallback, String blobPath) {
lock.lock();
try {
FutureResult<String> future = new FutureResult<>();
Expand Down
19 changes: 19 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/config/RouterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -761,11 +761,30 @@ public class RouterConfig {
@Default("false")
public final boolean routerParanoidDurabilityEnabled;

/**
* This is set in frontendConfig until id converter been fully migrate to router.
*/
public final String idConverterFactory;

/**
* This is set in frontendConfig until id converter been fully migrate to router.
*/
public final String idSigningServiceFactory;

/**
* This is set in frontendConfig until id converter been fully migrate to router.
*/
public final String namedBlobDbFactory;

/**
* Create a RouterConfig instance.
* @param verifiableProperties the properties map to refer to.
*/
public RouterConfig(VerifiableProperties verifiableProperties) {
FrontendConfig frontendConfig = new FrontendConfig(verifiableProperties);
idConverterFactory = frontendConfig.idConverterFactory;
idSigningServiceFactory = frontendConfig.idSigningServiceFactory;
namedBlobDbFactory = frontendConfig.namedBlobDbFactory;
routerBlobMetadataCacheId =
verifiableProperties.getString(ROUTER_BLOB_METADATA_CACHE_ID, "routerBlobMetadataCache");
routerMaxNumMetadataCacheEntries =
Expand Down
21 changes: 12 additions & 9 deletions ambry-api/src/main/java/com/github/ambry/frontend/IdConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.github.ambry.commons.CallbackUtils;
import com.github.ambry.messageformat.BlobInfo;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.rest.RestRequest;
import com.github.ambry.commons.Callback;
import java.io.Closeable;
Expand Down Expand Up @@ -53,24 +54,26 @@ default CompletableFuture<String> convert(RestRequest restRequest, String input)

/**
* Converts an ID.
* @param restRequest {@link RestRequest} representing the request.
* @param input the ID that needs to be converted.
* @param blobInfo the {@link BlobInfo} for an uploaded blob. Can be null for non-upload use cases.
* @param callback the {@link Callback} to invoke once the converted ID is available. Can be null.
*
* @param restRequest {@link RestRequest} representing the request.
* @param input the ID that needs to be converted.
* @param blobProperties the {@link BlobProperties} for an uploaded blob. Can be null for non-upload use cases.
* @param callback the {@link Callback} to invoke once the converted ID is available. Can be null.
* @return a {@link Future} that will eventually contain the converted ID.
*/
default Future<String> convert(RestRequest restRequest, String input, BlobInfo blobInfo, Callback<String> callback) {
default Future<String> convert(RestRequest restRequest, String input, BlobProperties blobProperties, Callback<String> callback) {
return convert(restRequest, input, callback);
}

/**
* Converts an ID.
* @param restRequest {@link RestRequest} representing the request.
* @param input the ID that needs to be converted.
* @param blobInfo the {@link BlobInfo} for an uploaded blob. Can be null for non-upload use cases.
*
* @param restRequest {@link RestRequest} representing the request.
* @param input the ID that needs to be converted.
* @param blobProperties the {@link BlobProperties} for an uploaded blob. Can be null for non-upload use cases.
* @return a {@link CompletableFuture} that will eventually contain the converted ID.
*/
default CompletableFuture<String> convert(RestRequest restRequest, String input, BlobInfo blobInfo) {
default CompletableFuture<String> convert(RestRequest restRequest, String input, BlobProperties blobProperties) {
return convert(restRequest, input);
}
}
19 changes: 11 additions & 8 deletions ambry-api/src/main/java/com/github/ambry/router/Router.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,20 @@ Future<GetBlobResult> getBlob(String blobId, GetBlobOptions options, Callback<Ge

/**
* Requests for a new blob to be put asynchronously and invokes the {@link Callback} when the request completes.
* @param blobProperties The properties of the blob. Note that the size specified in the properties is ignored. The
* channel is consumed fully, and the size of the blob is the number of bytes read from it.
* @param userMetadata Optional user metadata about the blob. This can be null.
* @param channel The {@link ReadableStreamChannel} that contains the content of the blob.
* @param options The {@link PutBlobOptions} associated with the request. This cannot be null.
* @param callback The {@link Callback} which will be invoked on the completion of the request .
*
* @param blobProperties The properties of the blob. Note that the size specified in the properties is ignored.
* The channel is consumed fully, and the size of the blob is the number of bytes read from
* it.
* @param userMetadata Optional user metadata about the blob. This can be null.
* @param channel The {@link ReadableStreamChannel} that contains the content of the blob.
* @param options The {@link PutBlobOptions} associated with the request. This cannot be null.
* @param callback The {@link Callback} which will be invoked on the completion of the request .
* @param quotaChargeCallback Listener interface to charge quota cost for the operation.
* @param blobPath The name of the blob path for named blob based upload.
* @return A future that would contain the BlobId eventually.
*/
Future<String> putBlob(BlobProperties blobProperties, byte[] userMetadata, ReadableStreamChannel channel,
PutBlobOptions options, Callback<String> callback, QuotaChargeCallback quotaChargeCallback);
PutBlobOptions options, Callback<String> callback, QuotaChargeCallback quotaChargeCallback, String blobPath);

/**
* Requests for a new metadata blob to be put asynchronously and invokes the {@link Callback} when the request
Expand Down Expand Up @@ -192,7 +195,7 @@ default CompletableFuture<String> stitchBlob(BlobProperties blobProperties, byte
default CompletableFuture<String> putBlob(BlobProperties blobProperties, byte[] userMetadata,
ReadableStreamChannel channel, PutBlobOptions options) {
CompletableFuture<String> future = new CompletableFuture<>();
putBlob(blobProperties, userMetadata, channel, options, CallbackUtils.fromCompletableFuture(future), null);
putBlob(blobProperties, userMetadata, channel, options, CallbackUtils.fromCompletableFuture(future), null, null);
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void handlePost(RestRequest restRequest, RestResponseChannel restResponse
BlobProperties blobProperties = RestUtils.buildBlobProperties(restRequest.getArgs());
byte[] usermetadata = RestUtils.buildUserMetadata(restRequest.getArgs());
router.putBlob(blobProperties, usermetadata, restRequest, new PutBlobOptionsBuilder().build(),
new MockPostCallback(this, restRequest, restResponseChannel, blobProperties), null);
new MockPostCallback(this, restRequest, restResponseChannel, blobProperties), null, null);
} catch (RestServiceException e) {
handleResponse(restRequest, restResponseChannel, null, e);
}
Expand Down Expand Up @@ -159,7 +159,7 @@ public void handlePut(RestRequest restRequest, RestResponseChannel restResponseC
BlobProperties blobProperties = RestUtils.buildBlobProperties(restRequest.getArgs());
byte[] usermetadata = RestUtils.buildUserMetadata(restRequest.getArgs());
router.putBlob(blobProperties, usermetadata, restRequest, new PutBlobOptionsBuilder().build(),
new MockPostCallback(this, restRequest, restResponseChannel, blobProperties), null);
new MockPostCallback(this, restRequest, restResponseChannel, blobProperties), null, null);
} catch (RestServiceException e) {
handleResponse(restRequest, restResponseChannel, null, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.github.ambry.config.FrontendConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.frontend.s3.S3BaseHandler;
import com.github.ambry.frontend.s3.S3MultipartUploadHandler;
import com.github.ambry.messageformat.BlobInfo;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.named.DeleteResult;
Expand Down Expand Up @@ -89,19 +88,18 @@ public Future<String> convert(RestRequest restRequest, String input, Callback<St
}

/**
* {@inheritDoc}
* On {@link RestMethod#POST}, adds a leading slash to indicate that the ID represents the path of the resource
* created.
* On any other {@link RestMethod}, removes the leading slash in order to convert the path into an ID that the
* {@link com.github.ambry.router.Router} will understand.
* @param restRequest {@link RestRequest} representing the request.
* @param input the ID that needs to be converted.
* @param blobInfo the {@link BlobInfo} for an uploaded blob. This will be used for named blob PUT requests.
* @param callback the {@link Callback} to invoke once the converted ID is available. Can be null.
* {@inheritDoc} On {@link RestMethod#POST}, adds a leading slash to indicate that the ID represents the path of the
* resource created. On any other {@link RestMethod}, removes the leading slash in order to convert the path into an
* ID that the {@link com.github.ambry.router.Router} will understand.
*
* @param restRequest {@link RestRequest} representing the request.
* @param input the ID that needs to be converted.
* @param blobProperties the {@link BlobInfo} for an uploaded blob. This will be used for named blob PUT requests.
* @param callback the {@link Callback} to invoke once the converted ID is available. Can be null.
* @return a {@link Future} that will eventually contain the converted ID.
*/
@Override
public Future<String> convert(RestRequest restRequest, String input, BlobInfo blobInfo, Callback<String> callback) {
public Future<String> convert(RestRequest restRequest, String input, BlobProperties blobProperties, Callback<String> callback) {
final CompletableFuture<String> future = new CompletableFuture<>();
String convertedId = null;
Exception exception = null;
Expand All @@ -115,7 +113,7 @@ public Future<String> convert(RestRequest restRequest, String input, BlobInfo bl
// TODO [S3] Add ID conversion for S3 POST requests (coming during multipart uploads) as well
convertedId = "/" + signIdIfRequired(restRequest, input);
} else {
CallbackUtils.callCallbackAfter(convertId(input, restRequest, blobInfo),
CallbackUtils.callCallbackAfter(convertId(input, restRequest, blobProperties),
(id, e) -> completeConversion(id, e, future, callback));
}
} catch (Exception e) {
Expand Down Expand Up @@ -153,13 +151,14 @@ private <T> void completeConversion(T conversionResult, Exception exception, Com
/**
* Convert the input ID to the requested output. If it's the named blob request, return the blobId from NameBlobDb,
* otherwise return the input with leading slash and extension be stripped.
* @param input the input blob ID.
* @param restRequest the {@link RestRequest} to set arguments in.
* @param blobInfo the {@link BlobInfo} for an uploaded blob. This will be used for named blob PUT requests.
*
* @param input the input blob ID.
* @param restRequest the {@link RestRequest} to set arguments in.
* @param blobProperties the {@link BlobProperties} for an uploaded blob. This will be used for named blob PUT requests.
* @return the {@link CompletionStage} that will be completed with the converted ID
* @throws RestServiceException
*/
private CompletionStage<String> convertId(String input, RestRequest restRequest, BlobInfo blobInfo)
private CompletionStage<String> convertId(String input, RestRequest restRequest, BlobProperties blobProperties)
throws RestServiceException {
CompletionStage<String> conversionFuture;
LOGGER.debug("input for convertId : " + input);
Expand All @@ -185,17 +184,16 @@ private CompletionStage<String> convertId(String input, RestRequest restRequest,
namedBlobPath.getBlobName(), getOption).thenApply(NamedBlobRecord::getBlobId);
}
} else if (isNamedBlobPutRequest(restRequest) || isS3MultipartUploadCompleteRequest(restRequest)) {
Objects.requireNonNull(blobInfo, "blobInfo cannot be null.");
Objects.requireNonNull(blobProperties, "blobProperties cannot be null.");
NamedBlobPath namedBlobPath = NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs());
String blobId = RestUtils.stripSlashAndExtensionFromId(input);
BlobProperties properties = blobInfo.getBlobProperties();
long expirationTimeMs =
Utils.addSecondsToEpochTime(properties.getCreationTimeInMs(), properties.getTimeToLiveInSeconds());
Utils.addSecondsToEpochTime(blobProperties.getCreationTimeInMs(), blobProperties.getTimeToLiveInSeconds());
// Please note that the modified_ts column in DB will be auto-populated by the DB server.
NamedBlobRecord record = new NamedBlobRecord(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), blobId, expirationTimeMs, 0, properties.getBlobSize());
namedBlobPath.getBlobName(), blobId, expirationTimeMs, 0, blobProperties.getBlobSize());
NamedBlobState state = NamedBlobState.READY;
if (properties.getTimeToLiveInSeconds() == Utils.Infinite_Time) {
if (blobProperties.getTimeToLiveInSeconds() == Utils.Infinite_Time) {
// Set named blob state as 'IN_PROGRESS', will set the state to be 'READY' in the ttlUpdate success callback: routerTtlUpdateCallback
state = NamedBlobState.IN_PROGRESS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) {
}
PutBlobOptions options = getPutBlobOptionsFromRequest();
router.putBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), restRequest, options,
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true),
RestUtils.getRequestPath(restRequest).getOperationOrBlobId(false));
}
}, uri, LOGGER, deleteDatasetCallback);
}
Expand All @@ -250,7 +251,7 @@ private Callback<String> routerPutBlobCallback(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.putRouterPutBlobMetrics, blobId -> {
restResponseChannel.setHeader(RestUtils.Headers.BLOB_SIZE, restRequest.getBlobBytesReceived());
blobInfo.getBlobProperties().setBlobSize(restRequest.getBlobBytesReceived());
idConverter.convert(restRequest, blobId, blobInfo, idConverterCallback(blobInfo, blobId));
idConverter.convert(restRequest, blobId, blobInfo.getBlobProperties(), idConverterCallback(blobInfo, blobId));
}, uri, LOGGER, deleteDatasetCallback);
}

Expand Down Expand Up @@ -284,7 +285,7 @@ private Callback<String> routerStitchBlobCallback(BlobInfo blobInfo,
// The actual blob size is now present in the instance of BlobProperties passed to the router.stitchBlob().
// Update it in the BlobInfo so that IdConverter can add it to the named blob DB
blobInfo.getBlobProperties().setBlobSize(propertiesPassedInRouterUpload.getBlobSize());
idConverter.convert(restRequest, blobId, blobInfo, idConverterCallback(blobInfo, blobId));
idConverter.convert(restRequest, blobId, blobInfo.getBlobProperties(), idConverterCallback(blobInfo, blobId));
}, uri, LOGGER, deleteDatasetCallback);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) {
} else {
PutBlobOptions options = getPutBlobOptionsFromRequest();
router.putBlob(blobInfo.getBlobProperties(), blobInfo.getUserMetadata(), restRequest, options,
routerPutBlobCallback(blobInfo),
QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true),
null);
}
}, uri, LOGGER, finalCallback);
}
Expand Down Expand Up @@ -225,7 +225,7 @@ private Callback<Long> fetchStitchRequestBodyCallback(RetainingAsyncWritableChan
*/
private Callback<String> routerStitchBlobCallback(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.postRouterStitchBlobMetrics,
blobId -> idConverter.convert(restRequest, blobId, blobInfo, idConverterCallback(blobInfo)), uri, LOGGER,
blobId -> idConverter.convert(restRequest, blobId, blobInfo.getBlobProperties(), idConverterCallback(blobInfo)), uri, LOGGER,
finalCallback);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ private Callback<String> routerStitchBlobCallback(BlobInfo blobInfo,
// The actual blob size is now present in the instance of BlobProperties passed to the router.stitchBlob().
// Update it in the BlobInfo so that IdConverter can add it to the named blob DB
blobInfo.getBlobProperties().setBlobSize(propertiesPassedInRouterUpload.getBlobSize());
idConverter.convert(restRequest, blobId, blobInfo, idConverterCallback(blobInfo, blobId));
idConverter.convert(restRequest, blobId, blobInfo.getBlobProperties(), idConverterCallback(blobInfo, blobId));
}, uri, LOGGER, finalCallback);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) {
} else {
PutBlobOptions options = getPutBlobOptionsFromRequest();
router.putBlob(blobInfo.getBlobProperties(), blobInfo.getUserMetadata(), restRequest, options,
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true),
null);
}
}, uri, logger, finalCallback);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ private void testConversionForNamedBlob(IdConverter idConverter, RestMethod rest
}
IdConversionCallback callback = new IdConversionCallback();
assertEquals("Converted ID does not match expected (Future)", expectedOutput,
idConverter.convert(restRequest, input, blobInfo, callback).get(5, TimeUnit.SECONDS));
idConverter.convert(restRequest, input, blobInfo.getBlobProperties(), callback).get(5, TimeUnit.SECONDS));
assertEquals("Converted ID does not match expected (Callback)", expectedOutput, callback.result);
assertTrue("Named Blob Put request should have named blob version in its internal keys",
restRequest.getArgs().containsKey(RestUtils.InternalKeys.NAMED_BLOB_VERSION));
Expand Down
Loading

0 comments on commit 9f410bc

Please sign in to comment.