Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support erasing inflights in CompleteMPU and AbortMPU #5719

Open
wants to merge 5 commits into
base: development/8.8
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions lib/api/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@ const api = {

const requestContexts = prepareRequestContexts(apiMethod, request,
sourceBucket, sourceObject, sourceVersionId);

if (apiMethod === 'completeMultipartUpload' || apiMethod === 'multipartDelete') {
// Request account quotas explicitly for MPU requests, to consider parts cleanup
// NOTE: we need quota for these, but it will be evaluated at the end of the API,
// once the parts have actually been deleted (not via standardMetadataValidateBucketAndObj)
requestContexts.forEach(context => {
context._needQuota = true; // eslint-disable-line no-param-reassign
});
}
williamlardier marked this conversation as resolved.
Show resolved Hide resolved

// Extract all the _apiMethods and store them in an array
const apiMethods = requestContexts ? requestContexts.map(context => context._apiMethod) : [];
// Attach the names to the current request
Expand Down
18 changes: 17 additions & 1 deletion lib/api/apiUtils/object/abortMultipartUpload.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const { data } = require('../../../data/wrapper');
const locationConstraintCheck = require('../object/locationConstraintCheck');
const { standardMetadataValidateBucketAndObj } =
require('../../../metadata/metadataUtils');
const { validateQuotas } = require('../quotas/quotaUtils');
const services = require('../../../services');
const metadata = require('../../../metadata/wrapper');

Expand Down Expand Up @@ -138,7 +139,22 @@ function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
}
cb();
});
}, () => next(null, mpuBucket, storedParts, destBucket));
}, () => {
const length = storedParts.reduce((length, loc) => length + loc.value.Size, 0);
return validateQuotas(request, destBucket, request.accountQuotas,
['objectDelete'], 'objectDelete', -length, false, log, err => {
if (err) {
// Ignore error, as the data has been deleted already: only inflight count
// has not been updated, and will be eventually consistent anyway
log.warn('failed to update inflights', {
method: 'abortMultipartUpload',
locations,
error: err,
});
}
next(null, mpuBucket, storedParts, destBucket);
});
});
},
function deleteShadowObjectMetadata(mpuBucket, storedParts, destBucket, next) {
let splitter = constants.splitter;
Expand Down
72 changes: 45 additions & 27 deletions lib/api/completeMultipartUpload.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const locationKeysHaveChanged
= require('./apiUtils/object/locationKeysHaveChanged');
const { setExpirationHeaders } = require('./apiUtils/object/expirationHeaders');
const { validatePutVersionId } = require('./apiUtils/object/coldStorage');
const { validateQuotas } = require('./apiUtils/quotas/quotaUtils');

const versionIdUtils = versioning.VersionID;

Expand Down Expand Up @@ -213,12 +214,13 @@ function completeMultipartUpload(authInfo, request, log, callback) {
return next(err, destBucket);
}
const storedParts = result.Contents;
const totalMPUSize = storedParts.reduce((acc, part) => acc + part.value.Size, 0);
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, location, mpuOverviewKey);
jsonList, storedMetadata, location, mpuOverviewKey, totalMPUSize);
});
},
function completeExternalMpu(destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, location, mpuOverviewKey, next) {
jsonList, storedMetadata, location, mpuOverviewKey, totalMPUSize, next) {
const mdInfo = { storedParts, mpuOverviewKey, splitter };
const mpuInfo =
{ objectKey, uploadId, jsonList, bucketName, destBucket };
Expand All @@ -236,16 +238,17 @@ function completeMultipartUpload(authInfo, request, log, callback) {
}
// if mpu not handled externally, completeObjData will be null
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey);
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
totalMPUSize);
});
},
function validateAndFilterParts(destBucket, objMD, mpuBucket,
storedParts, jsonList, storedMetadata, completeObjData, mpuOverviewKey,
next) {
totalMPUSize, next) {
if (completeObjData) {
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
completeObjData.filteredPartsObj);
completeObjData.filteredPartsObj, totalMPUSize);
}
const filteredPartsObj = validateAndFilterMpuParts(storedParts,
jsonList, mpuOverviewKey, splitter, log);
Expand All @@ -254,11 +257,11 @@ function completeMultipartUpload(authInfo, request, log, callback) {
}
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
filteredPartsObj);
filteredPartsObj, totalMPUSize);
},
function processParts(destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
filteredPartsObj, next) {
filteredPartsObj, totalMPUSize, next) {
// if mpu was completed on backend that stored mpu MD externally,
// skip MD processing steps
if (completeObjData && skipMpuPartProcessing(completeObjData)) {
Expand All @@ -276,7 +279,7 @@ function completeMultipartUpload(authInfo, request, log, callback) {
const calculatedSize = completeObjData.contentLength;
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
completeObjData.eTag, calculatedSize, dataLocations,
[mpuOverviewKey], null, completeObjData);
[mpuOverviewKey], null, completeObjData, totalMPUSize);
}

const partsInfo =
Expand All @@ -300,15 +303,15 @@ function completeMultipartUpload(authInfo, request, log, callback) {
];
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, completeObjData);
extraPartLocations, completeObjData, totalMPUSize);
}
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, null);
extraPartLocations, null, totalMPUSize);
},
function prepForStoring(destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, completeObjData, next) {
extraPartLocations, completeObjData, totalMPUSize, next) {
const metaHeaders = {};
const keysNotNeeded =
['initiator', 'partLocations', 'key',
Expand All @@ -321,6 +324,8 @@ function completeMultipartUpload(authInfo, request, log, callback) {
metaHeaders[item] = storedMetadata[item];
});

const droppedMPUSize = totalMPUSize - calculatedSize;

const metaStoreParams = {
authInfo,
objectKey,
Expand Down Expand Up @@ -380,7 +385,7 @@ function completeMultipartUpload(authInfo, request, log, callback) {
return process.nextTick(() => next(null, destBucket, dataLocations,
metaStoreParams, mpuBucket, keysToDelete, aggregateETag,
objMD, extraPartLocations, pseudoCipherBundle,
completeObjData, options));
completeObjData, options, droppedMPUSize));
}

if (!destBucket.isVersioningEnabled() && objMD?.archive?.archiveInfo) {
Expand All @@ -403,13 +408,13 @@ function completeMultipartUpload(authInfo, request, log, callback) {
return next(null, destBucket, dataLocations,
metaStoreParams, mpuBucket, keysToDelete, aggregateETag,
objMD, extraPartLocations, pseudoCipherBundle,
completeObjData, options);
completeObjData, options, droppedMPUSize);
});
},
function storeAsNewObj(destinationBucket, dataLocations,
metaStoreParams, mpuBucket, keysToDelete, aggregateETag, objMD,
extraPartLocations, pseudoCipherBundle,
completeObjData, options, next) {
completeObjData, options, droppedMPUSize, next) {
const dataToDelete = options.dataToDelete;
/* eslint-disable no-param-reassign */
metaStoreParams.versionId = options.versionId;
Expand Down Expand Up @@ -461,7 +466,7 @@ function completeMultipartUpload(authInfo, request, log, callback) {
return next(null, mpuBucket, keysToDelete, aggregateETag,
extraPartLocations, destinationBucket,
// pass the original version ID as generatedVersionId
objMD.versionId);
objMD.versionId, droppedMPUSize);
}
}
return services.metadataStoreObject(destinationBucket.getName(),
Expand Down Expand Up @@ -499,31 +504,44 @@ function completeMultipartUpload(authInfo, request, log, callback) {
}
return next(null, mpuBucket, keysToDelete,
aggregateETag, extraPartLocations,
destinationBucket, generatedVersionId);
destinationBucket, generatedVersionId,
droppedMPUSize);
});
}
return next(null, mpuBucket, keysToDelete, aggregateETag,
extraPartLocations, destinationBucket,
generatedVersionId);
generatedVersionId, droppedMPUSize);
});
},
function deletePartsMetadata(mpuBucket, keysToDelete, aggregateETag,
extraPartLocations, destinationBucket, generatedVersionId, next) {
extraPartLocations, destinationBucket, generatedVersionId, droppedMPUSize, next) {
services.batchDeleteObjectMetadata(mpuBucket.getName(),
keysToDelete, log, err => next(err, extraPartLocations,
destinationBucket, aggregateETag, generatedVersionId));
destinationBucket, aggregateETag, generatedVersionId, droppedMPUSize));
},
function batchDeleteExtraParts(extraPartLocations, destinationBucket,
aggregateETag, generatedVersionId, next) {
aggregateETag, generatedVersionId, droppedMPUSize, next) {
if (extraPartLocations && extraPartLocations.length > 0) {
return data.batchDelete(extraPartLocations, request.method,
null, log, err => {
if (err) {
return next(err);
}
return next(null, destinationBucket, aggregateETag,
generatedVersionId);
return data.batchDelete(extraPartLocations, request.method, null, log, err => {
if (err) {
return next(err);
}

francoisferrand marked this conversation as resolved.
Show resolved Hide resolved
return validateQuotas(request, destinationBucket, request.accountQuotas,
['objectDelete'], 'objectDelete', -droppedMPUSize, false, log, err => {
if (err) {
// Ignore error, as the data has been deleted already: only inflight count
// has not been updated, and will be eventually consistent anyway
log.warn('failed to update inflights', {
method: 'completeMultipartUpload',
extraPartLocations,
error: err,
});
}
return next(null, destinationBucket, aggregateETag,
generatedVersionId);
});
});
}
return next(null, destinationBucket, aggregateETag,
generatedVersionId);
Expand Down
132 changes: 132 additions & 0 deletions tests/quota/awsNodeSdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -883,4 +883,136 @@ function multiObjectDelete(bucket, keys, size, callback) {
next => deleteBucket(bucket, next),
], done);
});

it('should reduce inflights when completing MPU with fewer parts than uploaded', done => {
const bucket = 'quota-test-bucket-mpu1';
const key = 'quota-test-object';
const parts = 3;
const partSize = 5 * 1024 * 1024;
const totalSize = parts * partSize;
const usedParts = 2;
let uploadId = null;
const ETags = [];

if (!s3Config.isQuotaInflightEnabled()) {
return done();
}

return async.series([
next => createBucket(bucket, false, next),
next => sendRequest(putQuotaVerb, '127.0.0.1:8000', `/${bucket}/?quota=true`,
JSON.stringify({ quota: totalSize * 2 }), config)
.then(() => next()).catch(err => next(err)),
next => s3Client.createMultipartUpload({
Bucket: bucket,
Key: key,
}, (err, data) => {
if (err) {
return next(err);
}
uploadId = data.UploadId;
return next();
}),
next => async.timesSeries(parts, (n, cb) => {
const uploadPartParams = {
Bucket: bucket,
Key: key,
PartNumber: n + 1,
UploadId: uploadId,
Body: Buffer.alloc(partSize),
};
return s3Client.uploadPart(uploadPartParams, (err, data) => {
if (err) {
return cb(err);
}
ETags[n] = data.ETag;
return cb();
});
}, next),
next => wait(inflightFlushFrequencyMS * 2, next),
next => {
// Verify all parts are counted in inflights
assert.strictEqual(scuba.getInflightsForBucket(bucket), totalSize);
return next();
},
next => {
// Complete with only first two parts
const params = {
Bucket: bucket,
Key: key,
MultipartUpload: {
Parts: Array.from({ length: usedParts }, (_, i) => ({
ETag: ETags[i],
PartNumber: i + 1,
})),
},
UploadId: uploadId,
};
return s3Client.completeMultipartUpload(params, next);
},
next => wait(inflightFlushFrequencyMS * 2, () => next()),
next => {
// Verify inflights reduced by dropped part
const expectedInflights = usedParts * partSize;
assert.strictEqual(scuba.getInflightsForBucket(bucket), expectedInflights);
return next();
},
next => deleteObject(bucket, key, usedParts * partSize, next),
next => deleteBucket(bucket, next),
], done);
});

it('should reduce inflights when aborting MPU', done => {
const bucket = 'quota-test-bucket-mpu2';
const key = 'quota-test-object';
const parts = 3;
const partSize = 5 * 1024 * 1024;
const totalSize = parts * partSize;
let uploadId = null;

if (!s3Config.isQuotaInflightEnabled()) {
return done();
}

return async.series([
next => createBucket(bucket, false, next),
next => sendRequest(putQuotaVerb, '127.0.0.1:8000', `/${bucket}/?quota=true`,
JSON.stringify({ quota: totalSize * 2 }), config)
.then(() => next()).catch(err => next(err)),
next => s3Client.createMultipartUpload({
Bucket: bucket,
Key: key,
}, (err, data) => {
if (err) {
return next(err);
}
uploadId = data.UploadId;
return next();
}),
next => async.timesSeries(parts, (n, cb) => {
const uploadPartParams = {
Bucket: bucket,
Key: key,
PartNumber: n + 1,
UploadId: uploadId,
Body: Buffer.alloc(partSize),
};
return s3Client.uploadPart(uploadPartParams, cb);
}, next),
next => wait(inflightFlushFrequencyMS * 2, next),
next => {
// Verify all parts are counted in inflights
assert.strictEqual(scuba.getInflightsForBucket(bucket), totalSize);
return next();
},
next => abortMPU(bucket, key, uploadId, totalSize, next),
next => wait(inflightFlushFrequencyMS * 2, next),
next => {
// Verify inflights reduced to zero after abort
assert.strictEqual(scuba.getInflightsForBucket(bucket), 0);
return next();
},
next => deleteBucket(bucket, next),
], done);
});
});
Loading
Loading