diff --git a/lib/api/api.js b/lib/api/api.js
index bf0d9f65d4..71f0f700cf 100644
--- a/lib/api/api.js
+++ b/lib/api/api.js
@@ -146,6 +146,7 @@ const api = {
         const requestContexts = prepareRequestContexts(apiMethod, request,
             sourceBucket, sourceObject, sourceVersionId);
 
+        // Extract all the _apiMethods and store them in an array
         const apiMethods = requestContexts ? requestContexts.map(context => context._apiMethod) : [];
         // Attach the names to the current request
diff --git a/lib/api/apiUtils/authorization/prepareRequestContexts.js b/lib/api/apiUtils/authorization/prepareRequestContexts.js
index 9098c346fc..fa82b15cec 100644
--- a/lib/api/apiUtils/authorization/prepareRequestContexts.js
+++ b/lib/api/apiUtils/authorization/prepareRequestContexts.js
@@ -265,6 +265,15 @@ function prepareRequestContexts(apiMethod, request, sourceBucket,
         requestContexts.push(requestContext);
     }
 
+    if (apiMethod === 'completeMultipartUpload' || apiMethod === 'multipartDelete') {
+        // Request account quotas explicitly for MPU requests, to consider parts cleanup
+        // NOTE: we need quota for these, but it will be evaluated at the end of the API,
+        // once the parts have actually been deleted (not via standardMetadataValidateBucketAndObj)
+        requestContexts.forEach(context => {
+            context._needQuota = true; // eslint-disable-line no-param-reassign
+        });
+    }
+
     return requestContexts;
 }
diff --git a/lib/api/apiUtils/object/abortMultipartUpload.js b/lib/api/apiUtils/object/abortMultipartUpload.js
index 97eda8ff4e..be12659723 100644
--- a/lib/api/apiUtils/object/abortMultipartUpload.js
+++ b/lib/api/apiUtils/object/abortMultipartUpload.js
@@ -5,6 +5,7 @@ const { data } = require('../../../data/wrapper');
 const locationConstraintCheck = require('../object/locationConstraintCheck');
 const { standardMetadataValidateBucketAndObj } =
     require('../../../metadata/metadataUtils');
+const { validateQuotas } = require('../quotas/quotaUtils');
 const services = require('../../../services');
 const metadata = require('../../../metadata/wrapper');
@@ -138,7 +139,22 @@ function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
                     }
                     cb();
                 });
-            }, () => next(null, mpuBucket, storedParts, destBucket));
+            }, () => {
+                const length = storedParts.reduce((length, loc) => length + loc.value.Size, 0);
+                return validateQuotas(request, destBucket, request.accountQuotas,
+                    ['objectDelete'], 'objectDelete', -length, false, log, err => {
+                        if (err) {
+                            // Ignore error, as the data has been deleted already: only inflight count
+                            // has not been updated, and will be eventually consistent anyway
+                            log.warn('failed to update inflights', {
+                                method: 'abortMultipartUpload',
+                                locations,
+                                error: err,
+                            });
+                        }
+                        next(null, mpuBucket, storedParts, destBucket);
+                    });
+            });
         },
         function deleteShadowObjectMetadata(mpuBucket, storedParts, destBucket, next) {
             let splitter = constants.splitter;
diff --git a/lib/api/completeMultipartUpload.js b/lib/api/completeMultipartUpload.js
index 0b5fc339d1..c499dec0e3 100644
--- a/lib/api/completeMultipartUpload.js
+++ b/lib/api/completeMultipartUpload.js
@@ -22,6 +22,7 @@ const locationKeysHaveChanged
     = require('./apiUtils/object/locationKeysHaveChanged');
 const { setExpirationHeaders } = require('./apiUtils/object/expirationHeaders');
 const { validatePutVersionId } = require('./apiUtils/object/coldStorage');
+const { validateQuotas } = require('./apiUtils/quotas/quotaUtils');
 
 const versionIdUtils = versioning.VersionID;
@@ -213,12 +214,13 @@ function completeMultipartUpload(authInfo, request, log, callback) {
                 return next(err, destBucket);
             }
             const storedParts = result.Contents;
+            const totalMPUSize = storedParts.reduce((acc, part) => acc + part.value.Size, 0);
             return next(null, destBucket, objMD, mpuBucket, storedParts,
-                jsonList, storedMetadata, location, mpuOverviewKey);
+                jsonList, storedMetadata, location, mpuOverviewKey, totalMPUSize);
         });
     },
     function completeExternalMpu(destBucket, objMD, mpuBucket, storedParts,
-        jsonList, storedMetadata, location, mpuOverviewKey, next) {
+        jsonList, storedMetadata, location, mpuOverviewKey, totalMPUSize, next) {
         const mdInfo = { storedParts, mpuOverviewKey, splitter };
         const mpuInfo =
             { objectKey, uploadId, jsonList, bucketName, destBucket };
@@ -236,16 +238,17 @@ function completeMultipartUpload(authInfo, request, log, callback) {
             }
             // if mpu not handled externally, completeObjData will be null
             return next(null, destBucket, objMD, mpuBucket, storedParts,
-                jsonList, storedMetadata, completeObjData, mpuOverviewKey);
+                jsonList, storedMetadata, completeObjData, mpuOverviewKey,
+                totalMPUSize);
         });
     },
     function validateAndFilterParts(destBucket, objMD, mpuBucket, storedParts,
         jsonList, storedMetadata, completeObjData, mpuOverviewKey,
-        next) {
+        totalMPUSize, next) {
         if (completeObjData) {
             return next(null, destBucket, objMD, mpuBucket, storedParts,
                 jsonList, storedMetadata, completeObjData, mpuOverviewKey,
-                completeObjData.filteredPartsObj);
+                completeObjData.filteredPartsObj, totalMPUSize);
         }
         const filteredPartsObj = validateAndFilterMpuParts(storedParts,
             jsonList, mpuOverviewKey, splitter, log);
@@ -254,11 +257,11 @@ function completeMultipartUpload(authInfo, request, log, callback) {
         }
         return next(null, destBucket, objMD, mpuBucket, storedParts,
             jsonList, storedMetadata, completeObjData, mpuOverviewKey,
-            filteredPartsObj);
+            filteredPartsObj, totalMPUSize);
     },
     function processParts(destBucket, objMD, mpuBucket, storedParts,
         jsonList, storedMetadata, completeObjData, mpuOverviewKey,
-        filteredPartsObj, next) {
+        filteredPartsObj, totalMPUSize, next) {
         // if mpu was completed on backend that stored mpu MD externally,
         // skip MD processing steps
         if (completeObjData && skipMpuPartProcessing(completeObjData)) {
@@ -276,7 +279,7 @@ function completeMultipartUpload(authInfo, request, log, callback) {
             const calculatedSize = completeObjData.contentLength;
             return next(null, destBucket, objMD, mpuBucket, storedMetadata,
                 completeObjData.eTag, calculatedSize, dataLocations,
-                [mpuOverviewKey], null, completeObjData);
+                [mpuOverviewKey], null, completeObjData, totalMPUSize);
         }
 
         const partsInfo =
@@ -300,15 +303,15 @@ function completeMultipartUpload(authInfo, request, log, callback) {
             ];
             return next(null, destBucket, objMD, mpuBucket, storedMetadata,
                 aggregateETag, calculatedSize, dataLocations, keysToDelete,
-                extraPartLocations, completeObjData);
+                extraPartLocations, completeObjData, totalMPUSize);
         }
         return next(null, destBucket, objMD, mpuBucket, storedMetadata,
             aggregateETag, calculatedSize, dataLocations, keysToDelete,
-            extraPartLocations, null);
+            extraPartLocations, null, totalMPUSize);
     },
     function prepForStoring(destBucket, objMD, mpuBucket, storedMetadata,
         aggregateETag, calculatedSize, dataLocations, keysToDelete,
-        extraPartLocations, completeObjData, next) {
+        extraPartLocations, completeObjData, totalMPUSize, next) {
         const metaHeaders = {};
         const keysNotNeeded =
             ['initiator', 'partLocations', 'key',
@@ -321,6 +324,8 @@ function completeMultipartUpload(authInfo, request, log, callback) {
             metaHeaders[item] = storedMetadata[item];
         });
 
+        const droppedMPUSize = totalMPUSize - calculatedSize;
+
         const metaStoreParams = {
             authInfo,
             objectKey,
@@ -380,7 +385,7 @@ function completeMultipartUpload(authInfo, request, log, callback) {
                 return process.nextTick(() => next(null, destBucket, dataLocations,
                     metaStoreParams, mpuBucket, keysToDelete, aggregateETag,
                     objMD, extraPartLocations, pseudoCipherBundle,
-                    completeObjData, options));
+                    completeObjData, options, droppedMPUSize));
             }
 
             if (!destBucket.isVersioningEnabled() && objMD?.archive?.archiveInfo) {
@@ -403,13 +408,13 @@ function completeMultipartUpload(authInfo, request, log, callback) {
                 return next(null, destBucket, dataLocations, metaStoreParams,
                     mpuBucket, keysToDelete, aggregateETag, objMD,
                     extraPartLocations, pseudoCipherBundle,
-                    completeObjData, options);
+                    completeObjData, options, droppedMPUSize);
             });
         },
     function storeAsNewObj(destinationBucket, dataLocations, metaStoreParams,
         mpuBucket, keysToDelete, aggregateETag, objMD,
         extraPartLocations, pseudoCipherBundle,
-        completeObjData, options, next) {
+        completeObjData, options, droppedMPUSize, next) {
         const dataToDelete = options.dataToDelete;
         /* eslint-disable no-param-reassign */
         metaStoreParams.versionId = options.versionId;
@@ -461,7 +466,7 @@ function completeMultipartUpload(authInfo, request, log, callback) {
                     return next(null, mpuBucket, keysToDelete, aggregateETag,
                         extraPartLocations, destinationBucket,
                         // pass the original version ID as generatedVersionId
-                        objMD.versionId);
+                        objMD.versionId, droppedMPUSize);
                 }
             }
             return services.metadataStoreObject(destinationBucket.getName(),
@@ -499,31 +504,44 @@ function completeMultipartUpload(authInfo, request, log, callback) {
                     }
                     return next(null, mpuBucket, keysToDelete, aggregateETag,
                         extraPartLocations,
-                        destinationBucket, generatedVersionId);
+                        destinationBucket, generatedVersionId,
+                        droppedMPUSize);
                 });
             }
             return next(null, mpuBucket, keysToDelete, aggregateETag,
                 extraPartLocations, destinationBucket,
-                generatedVersionId);
+                generatedVersionId, droppedMPUSize);
         });
     },
     function deletePartsMetadata(mpuBucket, keysToDelete, aggregateETag,
-        extraPartLocations, destinationBucket, generatedVersionId, next) {
+        extraPartLocations, destinationBucket, generatedVersionId, droppedMPUSize, next) {
         services.batchDeleteObjectMetadata(mpuBucket.getName(),
             keysToDelete, log, err => next(err, extraPartLocations,
-                destinationBucket, aggregateETag, generatedVersionId));
+                destinationBucket, aggregateETag, generatedVersionId, droppedMPUSize));
     },
     function batchDeleteExtraParts(extraPartLocations, destinationBucket,
-        aggregateETag, generatedVersionId, next) {
+        aggregateETag, generatedVersionId, droppedMPUSize, next) {
         if (extraPartLocations && extraPartLocations.length > 0) {
-            return data.batchDelete(extraPartLocations, request.method,
-                null, log, err => {
-                    if (err) {
-                        return next(err);
-                    }
-                    return next(null, destinationBucket, aggregateETag,
-                        generatedVersionId);
+            return data.batchDelete(extraPartLocations, request.method, null, log, err => {
+                if (err) {
+                    return next(err);
+                }
+
+                return validateQuotas(request, destinationBucket, request.accountQuotas,
+                    ['objectDelete'], 'objectDelete', -droppedMPUSize, false, log, err => {
+                        if (err) {
+                            // Ignore error, as the data has been deleted already: only inflight count
+                            // has not been updated, and will be eventually consistent anyway
+                            log.warn('failed to update inflights', {
+                                method: 'completeMultipartUpload',
+                                extraPartLocations,
+                                error: err,
+                            });
+                        }
+                        return next(null, destinationBucket, aggregateETag,
+                            generatedVersionId);
                 });
+            });
         }
         return next(null, destinationBucket, aggregateETag,
             generatedVersionId);
diff --git a/tests/quota/awsNodeSdk.js b/tests/quota/awsNodeSdk.js
index f0420a853b..954932a227 100644
--- a/tests/quota/awsNodeSdk.js
+++ b/tests/quota/awsNodeSdk.js
@@ -883,4 +883,136 @@ function multiObjectDelete(bucket, keys, size, callback) {
             next => deleteBucket(bucket, next),
         ], done);
     });
+
+    it('should reduce inflights when completing MPU with fewer parts than uploaded', done => {
+        const bucket = 'quota-test-bucket-mpu1';
+        const key = 'quota-test-object';
+        const parts = 3;
+        const partSize = 5 * 1024 * 1024;
+        const totalSize = parts * partSize;
+        const usedParts = 2;
+        let uploadId = null;
+        const ETags = [];
+
+        if (!s3Config.isQuotaInflightEnabled()) {
+            return done();
+        }
+
+        return async.series([
+            next => createBucket(bucket, false, next),
+            next => sendRequest(putQuotaVerb, '127.0.0.1:8000', `/${bucket}/?quota=true`,
+                JSON.stringify({ quota: totalSize * 2 }), config)
+                .then(() => next()).catch(err => next(err)),
+            next => s3Client.createMultipartUpload({
+                Bucket: bucket,
+                Key: key,
+            }, (err, data) => {
+                if (err) {
+                    return next(err);
+                }
+                uploadId = data.UploadId;
+                return next();
+            }),
+            next => async.timesSeries(parts, (n, cb) => {
+                const uploadPartParams = {
+                    Bucket: bucket,
+                    Key: key,
+                    PartNumber: n + 1,
+                    UploadId: uploadId,
+                    Body: Buffer.alloc(partSize),
+                };
+                return s3Client.uploadPart(uploadPartParams, (err, data) => {
+                    if (err) {
+                        return cb(err);
+                    }
+                    ETags[n] = data.ETag;
+                    return cb();
+                });
+            }, next),
+            next => wait(inflightFlushFrequencyMS * 2, next),
+            next => {
+                // Verify all parts are counted in inflights
+                assert.strictEqual(scuba.getInflightsForBucket(bucket), totalSize);
+                return next();
+            },
+            next => {
+                // Complete with only first two parts
+                const params = {
+                    Bucket: bucket,
+                    Key: key,
+                    MultipartUpload: {
+                        Parts: Array.from({ length: usedParts }, (_, i) => ({
+                            ETag: ETags[i],
+                            PartNumber: i + 1,
+                        })),
+                    },
+                    UploadId: uploadId,
+                };
+                return s3Client.completeMultipartUpload(params, next);
+            },
+            next => wait(inflightFlushFrequencyMS * 2, () => next()),
+            next => {
+                // Verify inflights reduced by dropped part
+                const expectedInflights = usedParts * partSize;
+                assert.strictEqual(scuba.getInflightsForBucket(bucket), expectedInflights);
+                return next();
+            },
+            next => deleteObject(bucket, key, usedParts * partSize, next),
+            next => deleteBucket(bucket, next),
+        ], done);
+    });
+
+    it('should reduce inflights when aborting MPU', done => {
+        const bucket = 'quota-test-bucket-mpu2';
+        const key = 'quota-test-object';
+        const parts = 3;
+        const partSize = 5 * 1024 * 1024;
+        const totalSize = parts * partSize;
+        let uploadId = null;
+
+        if (!s3Config.isQuotaInflightEnabled()) {
+            return done();
+        }
+
+        return async.series([
+            next => createBucket(bucket, false, next),
+            next => sendRequest(putQuotaVerb, '127.0.0.1:8000', `/${bucket}/?quota=true`,
+                JSON.stringify({ quota: totalSize * 2 }), config)
+                .then(() => next()).catch(err => next(err)),
+            next => s3Client.createMultipartUpload({
+                Bucket: bucket,
+                Key: key,
+            }, (err, data) => {
+                if (err) {
+                    return next(err);
+                }
+                uploadId = data.UploadId;
+                return next();
+            }),
+            next => async.timesSeries(parts, (n, cb) => {
+                const uploadPartParams = {
+                    Bucket: bucket,
+                    Key: key,
+                    PartNumber: n + 1,
+                    UploadId: uploadId,
+                    Body: Buffer.alloc(partSize),
+                };
+                return s3Client.uploadPart(uploadPartParams, cb);
+            }, next),
+            next => wait(inflightFlushFrequencyMS * 2, next),
+            next => {
+                // Verify all parts are counted in inflights
+                assert.strictEqual(scuba.getInflightsForBucket(bucket), totalSize);
+                return next();
+            },
+            next => abortMPU(bucket, key, uploadId, totalSize, next),
+            next => wait(inflightFlushFrequencyMS * 2, next),
+            next => {
+                // Verify inflights reduced to zero after abort
+                assert.strictEqual(scuba.getInflightsForBucket(bucket), 0);
+                return next();
+            },
+            next => deleteBucket(bucket, next),
+        ], done);
+    });
 });
diff --git a/tests/unit/api/api.js b/tests/unit/api/api.js
new file mode 100644
index 0000000000..d556ce9add
--- /dev/null
+++ b/tests/unit/api/api.js
@@ -0,0 +1,113 @@
+const sinon = require('sinon');
+const { errors, auth } = require('arsenal');
+const api = require('../../../lib/api/api');
+const DummyRequest = require('../DummyRequest');
+const { default: AuthInfo } = require('arsenal/build/lib/auth/AuthInfo');
+const assert = require('assert');
+
+describe('api.callApiMethod', () => {
+    let sandbox;
+    let request;
+    let response;
+    let log;
+    let authServer;
+
+    beforeEach(() => {
+        sandbox = sinon.createSandbox();
+
+        request = new DummyRequest('my-obj');
+        request.query = {};
+        request.socket = {
+            remoteAddress: '127.0.0.1',
+        };
+
+        response = {
+            write: sandbox.stub(),
+            end: sandbox.stub(),
+        };
+
+        log = {
+            addDefaultFields: sandbox.stub(),
+            trace: sandbox.stub(),
+            error: sandbox.stub(),
+            debug: sandbox.stub(),
+        };
+
+        authServer = {
+            doAuth: sandbox.stub().callsArgWith(2, null, new AuthInfo({}), [{
+                isAllowed: true,
+                isImplicit: false,
+            }], null, {
+                accountQuota: 5000,
+            }),
+        };
+
+        sandbox.stub(auth, 'server').value(authServer);
+    });
+
+    afterEach(() => {
+        sandbox.restore();
+    });
+
+    it('should attach apiMethod to request', done => {
+        const testMethod = 'bucketGet';
+        api.callApiMethod(testMethod, request, response, log, () => {
+            assert.strictEqual(request.apiMethod, testMethod);
+            done();
+        });
+    });
+
+    it('should initialize finalizerHooks array', done => {
+        api.callApiMethod('bucketGet', request, response, log, () => {
+            assert.strictEqual(Array.isArray(request.finalizerHooks), true);
+            assert.strictEqual(request.finalizerHooks.length, 0);
+            done();
+        });
+    });
+
+    it('should handle auth server errors', done => {
+        authServer.doAuth.callsArgWith(2, errors.AccessDenied);
+
+        api.callApiMethod('bucketGet', request, response, log, err => {
+            assert(err.is.AccessDenied);
+            done();
+        });
+    });
+
+    it('should execute finalizer hooks after api method completion', done => {
+        let called = false;
+
+        sandbox.stub(api, 'objectPut').callsFake((userInfo, _request, streamingV4Params, log, cb) => {
+            request.finalizerHooks.push((err, _done) => {
+                called = true;
+                _done();
+            });
+            cb();
+        });
+        request.objectKey = 'testobject';
+        api.callApiMethod('objectPut', request, response, log, () => {
+            assert.strictEqual(called, true);
+            done();
+        });
+    });
+
+    it('should set _needQuota to true for completeMultipartUpload', done => {
+        authServer.doAuth.callsFake((req, log, cb, awsService, requestContexts) => {
+            assert.strictEqual(requestContexts[0]._needQuota, true);
+            done();
+        });
+        sandbox.stub(api, 'completeMultipartUpload').callsFake(
+            (userInfo, _request, streamingV4Params, log, cb) => cb);
+        api.callApiMethod('completeMultipartUpload', request, response, log);
+    });
+
+    it('should set _needQuota to true for multipartDelete', done => {
+        authServer.doAuth.callsFake((req, log, cb, awsService, requestContexts) => {
+            assert.strictEqual(requestContexts[0]._needQuota, true);
+            done();
+        });
+        sandbox.stub(api, 'multipartDelete').callsFake(
+            (userInfo, _request, streamingV4Params, log, cb) => cb);
+        api.callApiMethod('multipartDelete', request, response, log);
+    });
+});
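
Both new call sites release inflight bytes the same way. Below is a minimal sketch of that pattern, assuming the validateQuotas signature used in the patch (request, bucket, account quotas, action list, API method, byte delta, a boolean flag that is false at both call sites, logger, callback); the releaseInflights helper name is illustrative only and not part of the patch:

    // Hypothetical helper mirroring the pattern above: a negative byte count
    // asks the quota layer to release inflight bytes for already-deleted data.
    const { validateQuotas } = require('./apiUtils/quotas/quotaUtils');

    function releaseInflights(request, bucket, deletedBytes, log, cb) {
        return validateQuotas(request, bucket, request.accountQuotas,
            ['objectDelete'], 'objectDelete', -deletedBytes, false, log, err => {
                if (err) {
                    // The data is already gone, so a failed decrement is only
                    // logged; the inflight counter is eventually consistent.
                    log.warn('failed to update inflights', { error: err });
                }
                return cb();
            });
    }

abortMultipartUpload passes the sum of the stored parts' value.Size, while completeMultipartUpload passes droppedMPUSize (total uploaded bytes minus the bytes kept in the completed object), so only the unused parts are released.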