Skip to content

Commit

Permalink
[vcr-2.0] Delete blob eagerly (linkedin#2836)
Browse files Browse the repository at this point in the history
Implement immediate blob deletion from the cloud once the VCR receives a DEL message from the server via replication. This approach offers several advantages over the current method, in which, instead of deleting the blob outright, we add a delete-timestamp to the Azure blob metadata and compaction deletes the blob later based on this timestamp.

The benefits of the new approach include:

Proactively reclaiming space in Azure.
Reducing the number of network calls required to delete a blob; currently, it involves 2 network calls to attach a timestamp and another 2 network calls from compaction to remove the blob.
Decreasing the size of the response from the list-metadata API used by compaction to list all blobs in an Azure partition.
Based on customer usage patterns, few customers utilize the undelete feature. If the VCR receives an undelete request after a blob has been removed from the cloud, it will be re-uploaded to the cloud using the replication protocol.

This patch also removes some unused legacy code.
  • Loading branch information
snalli authored Jul 25, 2024
1 parent e0abcd8 commit 7eb76db
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -634,24 +634,15 @@ private boolean shouldUpload(MessageInfo messageInfo) {

@Override
public void delete(List<MessageInfo> infos) throws StoreException {
// TODO: Remove the duplicate code by calling deleteAsync() method.
checkStarted();
checkDuplicates(infos);

try {
for (MessageInfo msgInfo : infos) {
BlobId blobId = (BlobId) msgInfo.getStoreKey();
// If the cache has been updated by another thread, retry may be avoided
requestAgent.doWithRetries(() -> deleteIfNeeded(blobId, msgInfo.getOperationTimeMs(), msgInfo.getLifeVersion()),
"Delete", partitionId.toPathString());
for (MessageInfo msg : infos) {
cloudDestination.deleteBlob((BlobId) msg.getStoreKey(), msg.getOperationTimeMs(), msg.getLifeVersion(), null);
}
} catch (CloudStorageException ex) {
if (ex.getCause() instanceof StoreException) {
throw (StoreException) ex.getCause();
} catch (CloudStorageException cse) {
if (cse.getCause() instanceof StoreException) {
throw (StoreException) cse.getCause();
}
StoreErrorCodes errorCode =
(ex.getStatusCode() == STATUS_NOT_FOUND) ? StoreErrorCodes.ID_Not_Found : StoreErrorCodes.IOError;
throw new StoreException(ex, errorCode);
throw new StoreException(cse.getCause(), StoreErrorCodes.IOError);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.cloud.azure;

/**
* Policy that selects what is done to a blob in Azure when a DELETE message is received from server.
*/
public enum AzureBlobDeletePolicy {
/**
* Attach a delete-timestamp to azure-blob, but do not delete it.
* Let compaction job eventually delete the blob.
*/
EVENTUAL,
/**
* Just delete the blob when a DELETE message is received from server.
*/
IMMEDIATE
}
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,14 @@ static List<StorageAccountInfo> parseStorageAccountInfo(String storageAccountInf
public static final int DEFAULT_AZURE_MAX_TRIES = 4; // Defaults from RequestRetryOptions
@Config(AZURE_MAX_TRIES)
public final int azureMaxTries;

/** Config key that selects the {@link AzureBlobDeletePolicy} applied when a blob is deleted. */
public static final String AZURE_BLOB_DELETE_POLICY = "azure.blob.delete.policy";
/**
 * Policy used when {@link #AZURE_BLOB_DELETE_POLICY} is not set.
 * Declared static so it is a true class-level constant, like the other DEFAULT_* values in this class,
 * rather than a per-instance field.
 */
public static final AzureBlobDeletePolicy DEFAULT_AZURE_BLOB_DELETE_POLICY = AzureBlobDeletePolicy.EVENTUAL;
/** Delete policy read from {@link #AZURE_BLOB_DELETE_POLICY}. */
@Config(AZURE_BLOB_DELETE_POLICY)
public final AzureBlobDeletePolicy azureBlobDeletePolicy;

public AzureCloudConfig(VerifiableProperties verifiableProperties) {
azureBlobDeletePolicy = verifiableProperties.getEnum(AZURE_BLOB_DELETE_POLICY, AzureBlobDeletePolicy.class, DEFAULT_AZURE_BLOB_DELETE_POLICY);
// 5000 is the default size of Azure blob storage
azureBlobStorageMaxResultsPerPage = verifiableProperties.getInt(AZURE_BLOB_STORAGE_MAX_RESULTS_PER_PAGE, 5000);
azureStorageConnectionString = verifiableProperties.getString(AZURE_STORAGE_CONNECTION_STRING, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,64 +669,51 @@ CloudStorageException toCloudStorageException(String msg, Throwable t) {

@Override
public boolean deleteBlob(BlobId blobId, long deletionTime, short lifeVersion,
CloudUpdateValidator cloudUpdateValidator) throws CloudStorageException {
CloudUpdateValidator unused) throws CloudStorageException {
Timer.Context storageTimer = azureMetrics.blobUpdateDeleteTimeLatency.time();
AzureBlobLayoutStrategy.BlobLayout blobLayout = azureBlobLayoutStrategy.getDataBlobLayout(blobId);
String blobIdStr = blobLayout.blobFilePath;
Map<String, Object> newMetadata = new HashMap<>();
newMetadata.put(CloudBlobMetadata.FIELD_DELETION_TIME, String.valueOf(deletionTime));
newMetadata.put(CloudBlobMetadata.FIELD_LIFE_VERSION, lifeVersion);
if (azureCloudConfig.azureBlobDeletePolicy.equals(AzureBlobDeletePolicy.IMMEDIATE)) {
return eraseBlob(createOrGetBlobStore(blobLayout.containerName).getBlobClient(blobLayout.blobFilePath),
"CUSTOMER_DELETE_REQUEST");
}
// AzureBlobDeletePolicy.EVENTUAL
BlobProperties blobProperties = getBlobPropertiesCached(blobLayout);
Map<String, String> cloudMetadata = blobProperties.getMetadata();

// lifeVersion must always be present
short cloudlifeVersion = Short.parseShort(cloudMetadata.get(CloudBlobMetadata.FIELD_LIFE_VERSION));
try {
if (cloudUpdateValidator != null &&
!cloudUpdateValidator.validateUpdate(CloudBlobMetadata.fromMap(cloudMetadata), blobId, newMetadata)) {
// lifeVersion must always be present
short cloudlifeVersion = Short.parseShort(cloudMetadata.get(CloudBlobMetadata.FIELD_LIFE_VERSION));
if (cloudlifeVersion > lifeVersion) {
String error = String.format("Failed to update deleteTime of blob %s as it has a higher life version in cloud than replicated message: %s > %s",
blobIdStr, cloudlifeVersion, lifeVersion);
logger.trace(error);
throw AzureCloudDestination.toCloudStorageException(error, new StoreException(error, StoreErrorCodes.Life_Version_Conflict), null);
}
if (cloudlifeVersion > lifeVersion) {
String error = String.format("Failed to update deleteTime of blob %s as it has a higher life version in cloud than replicated message: %s > %s",
blobIdStr, cloudlifeVersion, lifeVersion);
logger.trace(error);
throw new StoreException(error, StoreErrorCodes.Life_Version_Conflict);
}
if (cloudlifeVersion == lifeVersion && cloudMetadata.containsKey(CloudBlobMetadata.FIELD_DELETION_TIME)) {
String error = String.format("Failed to update deleteTime of blob %s as it is marked for deletion in cloud", blobIdStr);
logger.trace(error);
throw AzureCloudDestination.toCloudStorageException(error, new StoreException(error, StoreErrorCodes.ID_Deleted), null);
throw new StoreException(error, StoreErrorCodes.ID_Deleted);
}
} catch (StoreException e) {
azureMetrics.blobUpdateDeleteTimeErrorCount.inc();
String error = String.format("Failed to update deleteTime of blob %s in Azure blob storage due to (%s)", blobLayout, e.getMessage());
throw AzureCloudDestination.toCloudStorageException(error, e, null);
}

newMetadata.forEach((k,v) -> cloudMetadata.put(k, String.valueOf(v)));
try {
Map<String, Object> newMetadata = new HashMap<>();
newMetadata.put(CloudBlobMetadata.FIELD_DELETION_TIME, String.valueOf(deletionTime));
newMetadata.put(CloudBlobMetadata.FIELD_LIFE_VERSION, lifeVersion);
newMetadata.forEach((k,v) -> cloudMetadata.put(k, String.valueOf(v)));
logger.trace("Updating deleteTime of blob {} in Azure blob storage ", blobLayout.blobFilePath);
Response<Void> response = updateBlobMetadata(blobLayout, blobProperties);
// Success rate is effective, success counter is ineffective because it just monotonically increases
azureMetrics.blobUpdateDeleteTimeSuccessRate.mark();
logger.trace("Successfully updated deleteTime of blob {} in Azure blob storage with statusCode = {}, etag = {}",
blobLayout.blobFilePath, response.getStatusCode(), response.getHeaders().get(HttpHeaderName.ETAG));
return true;
} catch (BlobStorageException bse) {
String error = String.format("Failed to update deleteTime of blob %s in Azure blob storage due to (%s)", blobLayout, bse.getMessage());
if (bse.getErrorCode() == BlobErrorCode.CONDITION_NOT_MET) {
/*
If we are here, it just means that two threads tried to delete concurrently. This is ok.
*/
logger.trace(error);
return true;
}
} catch (StoreException e) {
azureMetrics.blobUpdateDeleteTimeErrorCount.inc();
logger.error(error);
throw AzureCloudDestination.toCloudStorageException(error, bse, null);
String error = String.format("Failed to update deleteTime of blob %s in Azure blob storage due to (%s)", blobLayout, e.getMessage());
throw toCloudStorageException(error, e);
} catch (Throwable t) {
// Unknown error
azureMetrics.blobUpdateDeleteTimeErrorCount.inc();
String error = String.format("Failed to update deleteTime of blob %s in Azure blob storage due to (%s)", blobLayout, t.getMessage());
logger.error(error);
throw AzureCloudDestination.toCloudStorageException(error, t, null);
throw toCloudStorageException(error, t);
} finally {
storageTimer.stop();
} // try-catch
Expand Down Expand Up @@ -947,6 +934,36 @@ public FindResult findEntriesSince(String partitionPath, FindToken findToken, lo
throw new UnsupportedOperationException("findEntriesSince will not be implemented for AzureCloudDestinationSync");
}

/**
 * Erases a blob permanently, including all snapshots of it from Azure Storage.
 * The delete request is sent with an {@code If-Match: *} request condition and is bounded by
 * {@code cloudConfig.cloudRequestTimeout} milliseconds. Any exception thrown by the request is not caught here
 * and propagates to the caller; the latency timer is stopped in that case as well.
 * @param blobClient Client for the blob
 * @param eraseReason Reason to delete, used only in log messages
 * @return True if the delete request returned status 202 (Accepted), else false.
 */
protected boolean eraseBlob(BlobClient blobClient, String eraseReason) {
  Response<Void> response;
  Timer.Context storageTimer = azureMetrics.blobCompactionLatency.time();
  try {
    BlobRequestConditions blobRequestConditions = new BlobRequestConditions().setIfMatch("*");
    response = blobClient.deleteWithResponse(DeleteSnapshotsOptionType.INCLUDE,
        blobRequestConditions, Duration.ofMillis(cloudConfig.cloudRequestTimeout), Context.NONE);
  } finally {
    // Stop the timer even when the request throws, so the latency of failed calls is recorded too.
    storageTimer.stop();
  }

  int statusCode = response.getStatusCode();
  boolean erased = statusCode == HttpStatus.SC_ACCEPTED;
  if (erased) {
    logger.trace("[ERASE] Erased blob {}/{} from Azure blob storage, reason = {}, status = {}",
        blobClient.getContainerName(), blobClient.getBlobName(), eraseReason, statusCode);
    azureMetrics.blobCompactionSuccessRate.mark();
  } else {
    // Every other status is treated as a failure, including 404: a blob being deleted is expected to exist,
    // so a 404 points at a problem elsewhere in the code.
    // Just increment a counter and set an alert on it. No need to throw an error and fail the thread.
    azureMetrics.blobCompactionErrorCount.inc();
    logger.error("[ERASE] Failed to erase blob {}/{} from Azure blob storage, reason = {}, status {}",
        blobClient.getContainerName(), blobClient.getBlobName(), eraseReason, statusCode);
  }
  return erased;
}

/**
* Erases blobs from a given list of blobs in cloud
* @param blobItemList List of blobs in a container
Expand Down Expand Up @@ -993,24 +1010,7 @@ protected int eraseBlobs(List<BlobItem> blobItemList, BlobContainerClient blobCo
logger.trace("[DRY-RUN][COMPACT] Can erase blob {} from Azure blob storage because {}", blobItem.getName(), eraseReason);
numBlobsPurged += 1;
} else {
Timer.Context storageTimer = azureMetrics.blobCompactionLatency.time();
Response<Void> response = blobContainerClient.getBlobClient(blobItem.getName())
.deleteWithResponse(DeleteSnapshotsOptionType.INCLUDE, null, null, null);
storageTimer.stop();
switch (response.getStatusCode()) {
case HttpStatus.SC_ACCEPTED:
logger.trace("[COMPACT] Erased blob {} from Azure blob storage, reason = {}, status = {}", blobItem.getName(),
eraseReason, response.getStatusCode());
numBlobsPurged += 1;
azureMetrics.blobCompactionSuccessRate.mark();
break;
case HttpStatus.SC_NOT_FOUND:
default:
// Just increment a counter and set an alert on it. No need to throw an error and fail the thread.
azureMetrics.blobCompactionErrorCount.inc();
logger.error("[COMPACT] Failed to erase blob {} from Azure blob storage with status {}", blobItem.getName(),
response.getStatusCode());
}
numBlobsPurged += eraseBlob(blobContainerClient.getBlobClient(blobItem.getName()), eraseReason) ? 1 : 0;
}
} else {
logger.trace("[COMPACT] Cannot erase blob {} from Azure blob storage because condition not met: {}", blobItem.getName(), eraseReason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,17 +793,6 @@ public void testStoreDeletes() throws Exception {
any(CloudUpdateValidator.class));
}
verifyCacheHits(count * 4 + numPuts, count * 2);

// Test 5: Try to upload a set of blobs containing duplicates. This should fail.
List<MessageInfo> messageInfoList = new ArrayList<>(messageInfoMap.values());
messageInfoList.add(messageInfoList.get(messageInfoMap.values().size() - 1));
try {
store.delete(messageInfoList);
fail("delete must throw an exception for duplicates");
} catch (IllegalArgumentException iaex) {
assumeTrue(iaex.getMessage().startsWith("list contains duplicates"));
}
verifyCacheHits(count * 4 + numPuts, count * 2);
}

/** Test the CloudBlobStore updateTtl method. */
Expand Down

0 comments on commit 7eb76db

Please sign in to comment.