From 7eb76db8530382cb36536187803bf1de6fdf103c Mon Sep 17 00:00:00 2001 From: Sanketh Nalli Date: Thu, 25 Jul 2024 16:28:27 -0700 Subject: [PATCH] [vcr-2.0] Delete blob eagerly (#2836) Implementing immediate blob deletion from the cloud once the VCR receives a DEL message from the server via replication. This approach offers several advantages over our current method, where instead of deleting the blob outright, we currently add a delete-timestamp to Azure blob metadata. Compaction then deletes the blob later based on this timestamp. The benefits of the new approach include: Proactively reclaiming space in Azure. Reducing the number of network calls required to delete a blob; currently, it involves 2 network calls to attach a timestamp and another 2 network calls from compaction to remove the blob. Decreasing the size of the response from the list-metadata API used by compaction to list all blobs in an Azure partition. Based on customer usage patterns, few customers utilize the undelete feature. If the VCR receives an undelete request after a blob has been removed from the cloud, it will be re-uploaded to the cloud using the replication protocol. This patch also removes some unused legacy code --- .../github/ambry/cloud/CloudBlobStore.java | 21 +--- .../cloud/azure/AzureBlobDeletePolicy.java | 26 +++++ .../ambry/cloud/azure/AzureCloudConfig.java | 7 ++ .../azure/AzureCloudDestinationSync.java | 110 +++++++++--------- .../github/ambry/vcr/CloudBlobStoreTest.java | 11 -- 5 files changed, 94 insertions(+), 81 deletions(-) create mode 100644 ambry-cloud/src/main/java/com/github/ambry/cloud/azure/AzureBlobDeletePolicy.java diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudBlobStore.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudBlobStore.java index af7596951b..e6e603837e 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudBlobStore.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudBlobStore.java @@ -634,24 +634,15 @@ private boolean shouldUpload(MessageInfo messageInfo) { @Override public void delete(List infos) throws StoreException { - // TODO: Remove the duplicate code by calling deleteAsync() method. - checkStarted(); - checkDuplicates(infos); - try { - for (MessageInfo msgInfo : infos) { - BlobId blobId = (BlobId) msgInfo.getStoreKey(); - // If the cache has been updated by another thread, retry may be avoided - requestAgent.doWithRetries(() -> deleteIfNeeded(blobId, msgInfo.getOperationTimeMs(), msgInfo.getLifeVersion()), - "Delete", partitionId.toPathString()); + for (MessageInfo msg : infos) { + cloudDestination.deleteBlob((BlobId) msg.getStoreKey(), msg.getOperationTimeMs(), msg.getLifeVersion(), null); } - } catch (CloudStorageException ex) { - if (ex.getCause() instanceof StoreException) { - throw (StoreException) ex.getCause(); + } catch (CloudStorageException cse) { + if (cse.getCause() instanceof StoreException) { + throw (StoreException) cse.getCause(); } - StoreErrorCodes errorCode = - (ex.getStatusCode() == STATUS_NOT_FOUND) ? StoreErrorCodes.ID_Not_Found : StoreErrorCodes.IOError; - throw new StoreException(ex, errorCode); + throw new StoreException(cse.getCause(), StoreErrorCodes.IOError); } } diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/azure/AzureBlobDeletePolicy.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/azure/AzureBlobDeletePolicy.java new file mode 100644 index 0000000000..37e184230e --- /dev/null +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/azure/AzureBlobDeletePolicy.java @@ -0,0 +1,26 @@ +/** + * Copyright 2024 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.cloud.azure; + +public enum AzureBlobDeletePolicy { + /** + * Attach a delete-timestamp to azure-blob, but do not delete it. + * Let compaction job eventually delete the blob. + */ + EVENTUAL, + /** + * Just delete the blob when a DELETE message is received from server. + */ + IMMEDIATE +} diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/azure/AzureCloudConfig.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/azure/AzureCloudConfig.java index 0ab9c97dc9..097d2189d2 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/azure/AzureCloudConfig.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/azure/AzureCloudConfig.java @@ -463,7 +463,14 @@ static List parseStorageAccountInfo(String storageAccountInf public static final int DEFAULT_AZURE_MAX_TRIES = 4; // Defaults from RequestRetryOptions @Config(AZURE_MAX_TRIES) public final int azureMaxTries; + + public static final String AZURE_BLOB_DELETE_POLICY = "azure.blob.delete.policy"; + public final AzureBlobDeletePolicy DEFAULT_AZURE_BLOB_DELETE_POLICY = AzureBlobDeletePolicy.EVENTUAL; + @Config(AZURE_BLOB_DELETE_POLICY) + public final AzureBlobDeletePolicy azureBlobDeletePolicy; + public AzureCloudConfig(VerifiableProperties verifiableProperties) { + azureBlobDeletePolicy = verifiableProperties.getEnum(AZURE_BLOB_DELETE_POLICY, AzureBlobDeletePolicy.class, DEFAULT_AZURE_BLOB_DELETE_POLICY); // 5000 is the default size of Azure blob storage azureBlobStorageMaxResultsPerPage = verifiableProperties.getInt(AZURE_BLOB_STORAGE_MAX_RESULTS_PER_PAGE, 5000); azureStorageConnectionString = verifiableProperties.getString(AZURE_STORAGE_CONNECTION_STRING, ""); diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/azure/AzureCloudDestinationSync.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/azure/AzureCloudDestinationSync.java index ef3faee7c4..cbed08becb 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/azure/AzureCloudDestinationSync.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/azure/AzureCloudDestinationSync.java @@ -669,39 +669,35 @@ CloudStorageException toCloudStorageException(String msg, Throwable t) { @Override public boolean deleteBlob(BlobId blobId, long deletionTime, short lifeVersion, - CloudUpdateValidator cloudUpdateValidator) throws CloudStorageException { + CloudUpdateValidator unused) throws CloudStorageException { Timer.Context storageTimer = azureMetrics.blobUpdateDeleteTimeLatency.time(); AzureBlobLayoutStrategy.BlobLayout blobLayout = azureBlobLayoutStrategy.getDataBlobLayout(blobId); String blobIdStr = blobLayout.blobFilePath; - Map newMetadata = new HashMap<>(); - newMetadata.put(CloudBlobMetadata.FIELD_DELETION_TIME, String.valueOf(deletionTime)); - newMetadata.put(CloudBlobMetadata.FIELD_LIFE_VERSION, lifeVersion); + if (azureCloudConfig.azureBlobDeletePolicy.equals(AzureBlobDeletePolicy.IMMEDIATE)) { + return eraseBlob(createOrGetBlobStore(blobLayout.containerName).getBlobClient(blobLayout.blobFilePath), + "CUSTOMER_DELETE_REQUEST"); + } + // AzureBlobDeletePolicy.EVENTUAL BlobProperties blobProperties = getBlobPropertiesCached(blobLayout); Map cloudMetadata = blobProperties.getMetadata(); - + // lifeVersion must always be present + short cloudlifeVersion = Short.parseShort(cloudMetadata.get(CloudBlobMetadata.FIELD_LIFE_VERSION)); try { - if (cloudUpdateValidator != null && - !cloudUpdateValidator.validateUpdate(CloudBlobMetadata.fromMap(cloudMetadata), blobId, newMetadata)) { - // lifeVersion must always be present - short cloudlifeVersion = Short.parseShort(cloudMetadata.get(CloudBlobMetadata.FIELD_LIFE_VERSION)); - if (cloudlifeVersion > lifeVersion) { - String error = String.format("Failed to update deleteTime of blob %s as it has a higher life version in cloud than replicated message: %s > %s", - blobIdStr, cloudlifeVersion, lifeVersion); - logger.trace(error); - throw AzureCloudDestination.toCloudStorageException(error, new StoreException(error, StoreErrorCodes.Life_Version_Conflict), null); - } + if (cloudlifeVersion > lifeVersion) { + String error = String.format("Failed to update deleteTime of blob %s as it has a higher life version in cloud than replicated message: %s > %s", + blobIdStr, cloudlifeVersion, lifeVersion); + logger.trace(error); + throw new StoreException(error, StoreErrorCodes.Life_Version_Conflict); + } + if (cloudlifeVersion == lifeVersion && cloudMetadata.containsKey(CloudBlobMetadata.FIELD_DELETION_TIME)) { String error = String.format("Failed to update deleteTime of blob %s as it is marked for deletion in cloud", blobIdStr); logger.trace(error); - throw AzureCloudDestination.toCloudStorageException(error, new StoreException(error, StoreErrorCodes.ID_Deleted), null); + throw new StoreException(error, StoreErrorCodes.ID_Deleted); } - } catch (StoreException e) { - azureMetrics.blobUpdateDeleteTimeErrorCount.inc(); - String error = String.format("Failed to update deleteTime of blob %s in Azure blob storage due to (%s)", blobLayout, e.getMessage()); - throw AzureCloudDestination.toCloudStorageException(error, e, null); - } - - newMetadata.forEach((k,v) -> cloudMetadata.put(k, String.valueOf(v))); - try { + Map newMetadata = new HashMap<>(); + newMetadata.put(CloudBlobMetadata.FIELD_DELETION_TIME, String.valueOf(deletionTime)); + newMetadata.put(CloudBlobMetadata.FIELD_LIFE_VERSION, lifeVersion); + newMetadata.forEach((k,v) -> cloudMetadata.put(k, String.valueOf(v))); logger.trace("Updating deleteTime of blob {} in Azure blob storage ", blobLayout.blobFilePath); Response response = updateBlobMetadata(blobLayout, blobProperties); // Success rate is effective, success counter is ineffective because it just monotonically increases @@ -709,24 +705,15 @@ public boolean deleteBlob(BlobId blobId, long deletionTime, short lifeVersion, logger.trace("Successfully updated deleteTime of blob {} in Azure blob storage with statusCode = {}, etag = {}", blobLayout.blobFilePath, response.getStatusCode(), response.getHeaders().get(HttpHeaderName.ETAG)); return true; - } catch (BlobStorageException bse) { - String error = String.format("Failed to update deleteTime of blob %s in Azure blob storage due to (%s)", blobLayout, bse.getMessage()); - if (bse.getErrorCode() == BlobErrorCode.CONDITION_NOT_MET) { - /* - If we are here, it just means that two threads tried to delete concurrently. This is ok. - */ - logger.trace(error); - return true; - } + } catch (StoreException e) { azureMetrics.blobUpdateDeleteTimeErrorCount.inc(); - logger.error(error); - throw AzureCloudDestination.toCloudStorageException(error, bse, null); + String error = String.format("Failed to update deleteTime of blob %s in Azure blob storage due to (%s)", blobLayout, e.getMessage()); + throw toCloudStorageException(error, e); } catch (Throwable t) { - // Unknown error azureMetrics.blobUpdateDeleteTimeErrorCount.inc(); String error = String.format("Failed to update deleteTime of blob %s in Azure blob storage due to (%s)", blobLayout, t.getMessage()); logger.error(error); - throw AzureCloudDestination.toCloudStorageException(error, t, null); + throw toCloudStorageException(error, t); } finally { storageTimer.stop(); } // try-catch @@ -947,6 +934,36 @@ public FindResult findEntriesSince(String partitionPath, FindToken findToken, lo throw new UnsupportedOperationException("findEntriesSince will not be implemented for AzureCloudDestinationSync"); } + /** + * Erases a blob permanently, including all snapshots of it from Azure Storage. + * @param blobClient Client for the blob + * @param eraseReason Reason to delete + * @return True if blob deleted, else false. + */ + protected boolean eraseBlob(BlobClient blobClient, String eraseReason) { + Timer.Context storageTimer = azureMetrics.blobCompactionLatency.time(); + BlobRequestConditions blobRequestConditions = new BlobRequestConditions().setIfMatch("*"); + Response response = blobClient.deleteWithResponse(DeleteSnapshotsOptionType.INCLUDE, + blobRequestConditions, Duration.ofMillis(cloudConfig.cloudRequestTimeout), Context.NONE); + storageTimer.stop(); + switch (response.getStatusCode()) { + case HttpStatus.SC_ACCEPTED: + logger.trace("[ERASE] Erased blob {}/{} from Azure blob storage, reason = {}, status = {}", + blobClient.getContainerName(), blobClient.getBlobName(), eraseReason, response.getStatusCode()); + azureMetrics.blobCompactionSuccessRate.mark(); + break; + case HttpStatus.SC_NOT_FOUND: + // If you're trying to delete a blob, then it must exist. + // If it doesn't, then there is something wrong in the code. Go figure it out ! + default: + // Just increment a counter and set an alert on it. No need to throw an error and fail the thread. + azureMetrics.blobCompactionErrorCount.inc(); + logger.error("[ERASE] Failed to erase blob {}/{} from Azure blob storage, reason = {}, status {}", + blobClient.getContainerName(), blobClient.getBlobName(), eraseReason, response.getStatusCode()); + } + return response.getStatusCode() == HttpStatus.SC_ACCEPTED; + } + /** * Erases blobs from a given list of blobs in cloud * @param blobItemList List of blobs in a container @@ -993,24 +1010,7 @@ protected int eraseBlobs(List blobItemList, BlobContainerClient blobCo logger.trace("[DRY-RUN][COMPACT] Can erase blob {} from Azure blob storage because {}", blobItem.getName(), eraseReason); numBlobsPurged += 1; } else { - Timer.Context storageTimer = azureMetrics.blobCompactionLatency.time(); - Response response = blobContainerClient.getBlobClient(blobItem.getName()) - .deleteWithResponse(DeleteSnapshotsOptionType.INCLUDE, null, null, null); - storageTimer.stop(); - switch (response.getStatusCode()) { - case HttpStatus.SC_ACCEPTED: - logger.trace("[COMPACT] Erased blob {} from Azure blob storage, reason = {}, status = {}", blobItem.getName(), - eraseReason, response.getStatusCode()); - numBlobsPurged += 1; - azureMetrics.blobCompactionSuccessRate.mark(); - break; - case HttpStatus.SC_NOT_FOUND: - default: - // Just increment a counter and set an alert on it. No need to throw an error and fail the thread. - azureMetrics.blobCompactionErrorCount.inc(); - logger.error("[COMPACT] Failed to erase blob {} from Azure blob storage with status {}", blobItem.getName(), - response.getStatusCode()); - } + numBlobsPurged += eraseBlob(blobContainerClient.getBlobClient(blobItem.getName()), eraseReason) ? 1 : 0; } } else { logger.trace("[COMPACT] Cannot erase blob {} from Azure blob storage because condition not met: {}", blobItem.getName(), eraseReason); diff --git a/ambry-vcr/src/test/java/com/github/ambry/vcr/CloudBlobStoreTest.java b/ambry-vcr/src/test/java/com/github/ambry/vcr/CloudBlobStoreTest.java index beb1830612..55bf6c9f52 100644 --- a/ambry-vcr/src/test/java/com/github/ambry/vcr/CloudBlobStoreTest.java +++ b/ambry-vcr/src/test/java/com/github/ambry/vcr/CloudBlobStoreTest.java @@ -793,17 +793,6 @@ public void testStoreDeletes() throws Exception { any(CloudUpdateValidator.class)); } verifyCacheHits(count * 4 + numPuts, count * 2); - - // Test 5: Try to upload a set of blobs containing duplicates. This should fail. - List messageInfoList = new ArrayList<>(messageInfoMap.values()); - messageInfoList.add(messageInfoList.get(messageInfoMap.values().size() - 1)); - try { - store.delete(messageInfoList); - fail("delete must throw an exception for duplicates"); - } catch (IllegalArgumentException iaex) { - assumeTrue(iaex.getMessage().startsWith("list contains duplicates")); - } - verifyCacheHits(count * 4 + numPuts, count * 2); } /** Test the CloudBlobStore updateTtl method. */