WIP: Modular: Re-implements streaming store as handler
DougReeder committed Mar 11, 2024
1 parent 0d0a798 commit de89258
Showing 8 changed files with 862 additions and 81 deletions.
209 changes: 209 additions & 0 deletions lib/routes/S3Handler.js
@@ -0,0 +1,209 @@
/* streaming storage to an S3-compatible service */

/* eslint-env node */
/* eslint-disable camelcase */
const express = require('express');
const { posix } = require('node:path');
const { HeadObjectCommand, S3Client, DeleteObjectCommand, GetObjectCommand, PutObjectCommand } = require('@aws-sdk/client-s3');
const normalizeETag = require('../util/normalizeETag');
const ParameterError = require('../util/ParameterError');
const { dirname, basename } = require('path');
const { createHash } = require('node:crypto');
const YAML = require('yaml');
const TimeoutError = require('../util/timeoutError');
const { Upload } = require('@aws-sdk/lib-storage');
const { pipeline } = require('node:stream/promises');

const PUT_TIMEOUT = 24 * 60 * 60 * 1000;
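// = 24 hours: how long putBlob (below) waits for an upload to finish before reporting a TimeoutError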
// const AUTH_PREFIX = 'remoteStorageAuth';
// const AUTHENTICATION_LOCAL_PASSWORD = 'authenticationLocalPassword';
// const USER_METADATA = 'userMetadata';
const FILE_PREFIX = 'remoteStorageBlob';
const EMPTY_DIRECTORY = { '@context': 'http://remotestorage.io/spec/folder-description', items: {} };
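// Folder documents map item names to metadata; a trailing slash marks a subfolder.
// Illustrative shape (as maintained by the DELETE handler below; values invented):
// { "@context": "http://remotestorage.io/spec/folder-description",
//   "items": { "photo.jpg": { "ETag": "\"abc123\"" }, "albums/": { "ETag": "\"def456\"" } } }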

module.exports = function (endPoint = 'play.min.io', accessKey = 'Q3AM3UQ867SPQQA43P2F', secretKey = 'zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG', region = 'us-east-1') {
const sslEnabled = !/\blocalhost\b|\b127\.0\.0\.1\b|\b10\.0\.0\.2\b/.test(endPoint); // dots escaped so they don't match arbitrary characters
if (!endPoint.startsWith('http')) {
endPoint = (sslEnabled ? 'https://' : 'http://') + endPoint;
}
// if (!/:\d{1,5}\/?$/.test(endPoint)) {
// endPoint += ':9000';
// }

const s3client = new S3Client({
forcePathStyle: true,
region,
endpoint: endPoint,
sslEnabled,
credentials: {
accessKeyId: accessKey,
secretAccessKey: secretKey,
Version: 1
}
// logger: getLogger(),
});

const router = express.Router();
router.get('/:username/*',
async function (req, res, next) {
try {
const bucketName = req.params.username.toLowerCase();
const isDirectoryRequest = req.url.endsWith('/');
const s3Path = posix.join(FILE_PREFIX, req.url.slice(1 + bucketName.length));
let getParam;
if (req.get('If-None-Match')) {
getParam = { Bucket: bucketName, Key: s3Path, IfNoneMatch: req.get('If-None-Match') };
} else if (req.get('If-Match')) {
getParam = { Bucket: bucketName, Key: s3Path, IfMatch: req.get('If-Match') };
} else { // unconditional
getParam = { Bucket: bucketName, Key: s3Path };
}

const { Body, ETag, ContentType, ContentLength } = await s3client.send(new GetObjectCommand(getParam));
const isDirectory = ContentType === 'application/x.remotestorage-ld+json';
const contentType = isDirectory ? 'application/ld+json' : ContentType;
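// XOR: a trailing-slash (folder) request must resolve to a folder document, and a plain request to a file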
if (isDirectoryRequest ^ isDirectory) {
return res.status(409).end(); // Conflict
// return { status: 409, readStream: null, contentType, contentLength: null, ETag: null }; // Conflict
} else {
res.status(200).set('Content-Length', ContentLength).set('Content-Type', contentType).set('ETag', normalizeETag(ETag));
return pipeline(Body, res);
}
} catch (err) {
if (['NotFound', 'NoSuchKey'].includes(err.name)) {
return res.status(404).end(); // Not Found
// return next(Object.assign(new Error(`No file exists at path “${req.blobPath}”`), { status: 404 }));
} else if (err.name === 'PreconditionFailed') {
return res.status(412).end();
// return { status: 412 };
} else if (err.name === 'NotModified' || err.$metadata?.httpStatusCode === 304 || err.name === '304') {
return res.status(304).end();
} else {
return next(Object.assign(err, { status: 502 }));
}
}
}
);

router.delete('/:username/*',
async function (req, res, next) {
try {
const bucketName = req.params.username.toLowerCase();
const s3Path = posix.join(FILE_PREFIX, req.url.slice(1 + bucketName.length));
let currentETag;
try {
const headResponse = await s3client.send(new HeadObjectCommand({ Bucket: bucketName, Key: s3Path }));
if (headResponse.ContentType === 'application/x.remotestorage-ld+json') {
return res.status(409).end();
}
currentETag = normalizeETag(headResponse.ETag);

if (req.get('If-Match') && req.get('If-Match') !== currentETag) {
return res.status(412).end();
} else if (req.get('If-None-Match') && req.get('If-None-Match') === currentETag) {
return res.status(412).end();
}
/* const { DeleteMarker, VersionId } = */ await s3client.send(new DeleteObjectCommand({ Bucket: bucketName, Key: s3Path }));
} catch (err) {
if (['NotFound', 'NoSuchKey'].includes(err.name)) {
return res.status(404).end();
} else if (err.$metadata?.httpStatusCode === 400 || err.name === '400' || /\bBucket\b/.test(err.message)) {
return next(Object.assign(new ParameterError('A parameter value is bad', { cause: err }), { status: 400 }));
} else {
return next(Object.assign(err, { status: 502 }));
}
}

// updates all ancestor directories
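// Walks up from the deleted blob: rewrites each parent folder document to drop or update
// the entry, recomputes its ETag, and deletes any folder document left empty.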
let itemETag = null;
let itemPath = s3Path;
do {
let directory;
try {
directory = await readJson(bucketName, dirname(itemPath));
} catch (err) {
if (!['NotFound', 'NoSuchKey'].includes(err.name)) { return next(Object.assign(err, { status: 502 })); }
}
if (!(directory?.items instanceof Object)) {
directory = structuredClone(EMPTY_DIRECTORY);
// TODO: scan for existing blobs
}
if (typeof itemETag === 'string') { // item is folder
if (itemETag.length > 0) {
directory.items[basename(itemPath) + '/'] = { ETag: itemETag };
} else {
delete directory.items[basename(itemPath) + '/'];
}
} else {
delete directory.items[basename(itemPath)];
}
if (Object.keys(directory.items).length > 0) {
const dirJSON = JSON.stringify(directory);
await putBlob(bucketName, dirname(itemPath), 'application/x.remotestorage-ld+json', Buffer.byteLength(dirJSON), dirJSON); // content length in bytes, not code units

if (dirname(itemPath) !== FILE_PREFIX) {
// calculates ETag for the folder
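// (MD5 over the concatenated child ETags, quotes and weak prefixes stripped,
// so a folder's ETag changes whenever any descendant changes)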
const hash = createHash('md5');
for (const itemMeta of Object.values(directory.items)) {
hash.update(itemMeta?.ETag?.replace(/^W\/"|^"|"$/g, '') || '');
}
itemETag = '"' + hash.digest('hex') + '"';
}
} else { // that was the last blob in the folder, so delete the folder
/* const { DeleteMarker, VersionId } = */ await s3client.send(new DeleteObjectCommand({ Bucket: bucketName, Key: dirname(itemPath) }));
itemETag = '';
}

itemPath = dirname(itemPath);
} while (itemPath.length > FILE_PREFIX.length);

if (currentETag) {
res.set('ETag', normalizeETag(currentETag));
}
res.status(204).end();
} catch (err) {
if (err.$metadata?.httpStatusCode === 400 || err.name === '400') {
return next(Object.assign(new ParameterError('A parameter value is bad', { cause: err }), { status: 400 }));
} else {
return next(Object.assign(err, { status: 502 }));
}
}
}
);

async function putBlob (bucketName, s3Path, contentType, contentLength, contentStream) {
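// Objects up to 500 MB go up as a single PUT, raced against a timeout;
// larger ones use a multipart upload via @aws-sdk/lib-storage.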
if (contentLength <= 500_000_000) {
const putPrms = s3client.send(new PutObjectCommand(
{ Bucket: bucketName, Key: s3Path, Body: contentStream, ContentType: contentType, ContentLength: contentLength }));
const timeoutPrms = new Promise((_resolve, reject) =>
setTimeout(reject, PUT_TIMEOUT, new TimeoutError(`PUT of ${contentLength / 1_000_000} MB to ${bucketName} ${s3Path} took more than ${Math.round(PUT_TIMEOUT / 60_000)} minutes`)));
const putResponse = await Promise.race([putPrms, timeoutPrms]);
return normalizeETag(putResponse.ETag);
} else {
const parallelUpload = new Upload({
client: s3client,
params: { Bucket: bucketName, Key: s3Path, Body: contentStream, ContentType: contentType, ContentLength: contentLength }
});

parallelUpload.on('httpUploadProgress', (progress) => {
console.debug(bucketName, s3Path, `part ${progress.part} ${progress.loaded} / ${progress.total} bytes`);
});

return normalizeETag((await parallelUpload.done()).ETag);
}
}

async function readYaml (bucketName, s3Path) { // eslint-disable-line no-unused-vars
const { Body } = await s3client.send(new GetObjectCommand({ Bucket: bucketName, Key: s3Path }));
const string = (await Body.setEncoding('utf-8').toArray()).join(''); // concatenate all chunks — large documents may arrive in more than one
return YAML.parse(string);
}

async function readJson (bucketName, s3Path) {
const { Body } = await s3client.send(new GetObjectCommand({ Bucket: bucketName, Key: s3Path }));
const string = (await Body.setEncoding('utf-8').toArray()).join(''); // concatenate all chunks — large documents may arrive in more than one
return JSON.parse(string);
}

return router;
};
101 changes: 29 additions & 72 deletions lib/streaming_stores/S3.js
@@ -9,7 +9,7 @@ const {
PutObjectCommand,
CreateBucketCommand,
DeleteBucketCommand,
GetObjectCommand, PutBucketVersioningCommand, DeleteObjectsCommand, ListObjectVersionsCommand,
GetObjectCommand, PutBucketVersioningCommand, ListObjectVersionsCommand,
DeleteObjectCommand, HeadObjectCommand
} = require('@aws-sdk/client-s3');
const { Upload } = require('@aws-sdk/lib-storage');
@@ -36,9 +36,9 @@ class S3 {
if (!endPoint.startsWith('http')) {
endPoint = (sslEnabled ? 'https://' : 'http://') + endPoint;
}
if (!/:\d{1,5}\/?$/.test(endPoint)) {
endPoint += ':9000';
}
// if (!/:\d{1,5}\/?$/.test(endPoint)) {
// endPoint += ':9000';
// }

this.#S3Client = new S3Client({
forcePathStyle: true,
@@ -107,90 +107,47 @@ class S3 {
* @returns {Promise<number>} number of files deleted
*/
async deleteUser (username) {
return new Promise((resolve, reject) => {
const DELETE_GROUP_SIZE = 100;
const objectVersions = [];
let numRequested = 0; let numResolved = 0; let isReceiveComplete = false;

const removeObjectVersions = async () => {
let group;
try {
if (objectVersions.length > 0) {
group = objectVersions.slice(0);
objectVersions.length = 0;
numRequested += group.length;
const { Errors } = await this.#S3Client.send(new DeleteObjectsCommand({ Bucket: username, Delete: { Objects: group } }));
numResolved += group.length;
if (Errors?.length > 0) {
getLogger().error('errors deleting object versions:', YAML.stringify(Errors));
}
}
} catch (err) {
if (err.name === 'NoSuchBucket') {
resolve(numResolved);
} else if (err.name === 'NotImplemented') { // OpenIO
getLogger().warning('while deleting object versions: ' + err);
for (const objectVersion of group) {
const { Errors } = await this.#S3Client.send(new DeleteObjectCommand({ Bucket: username, Key: objectVersion.Key, VersionId: objectVersion.VersionId }));
if (Errors?.length > 0) {
getLogger().error('errors deleting object version:', YAML.stringify(Errors));
}
++numResolved;
}
} else {
reject(Object.assign(new Error('while deleting object versions: ' + err), { cause: err }));
}
}
try {
if (isReceiveComplete && numResolved === numRequested) {
// will fail if any object versions remain
await this.#S3Client.send(new DeleteBucketCommand({ Bucket: username }));
resolve(numResolved);
}
} catch (err) {
if (err.name === 'NoSuchBucket') {
resolve(numResolved);
} else {
reject(new Error('while deleting bucket: ' + err));
await new Promise((resolve, reject) => {
const bucketName = username.toLowerCase();

const deleteItems = async items => {
for (const item of items) {
try {
/* const { DeleteMarker } = */ await this.#S3Client.send(new DeleteObjectCommand({ Bucket: bucketName, Key: item.Key, VersionId: item.VersionId }));
// console.log(`deleted ${item.Key} ${DeleteMarker}`);
} catch (err) {
console.warn('while deleting', bucketName, item.Key, item.VersionId, err);
}
}
};

const removeObjectVersionsAndBucket = async err => {
const pageObjectVersions = async (KeyMarker) => {
try {
isReceiveComplete = true;
await removeObjectVersions();
if (err) {
reject(err);
}
} catch (err2) {
reject(err || err2);
}
};
const { Versions, DeleteMarkers, IsTruncated, NextKeyMarker } = await this.#S3Client.send(new ListObjectVersionsCommand({ Bucket: bucketName, ...(KeyMarker ? { KeyMarker } : null) }));

let keyMarker = null;
const pageVersions = async () => {
try {
const { Versions, IsTruncated, NextKeyMarker } = await this.#S3Client.send(new ListObjectVersionsCommand({ Bucket: username, KeyMarker: keyMarker /*, MaxKeys: DELETE_GROUP_SIZE */ }));
keyMarker = NextKeyMarker;
objectVersions.push(...Versions);
isReceiveComplete = !IsTruncated;
if (objectVersions.length >= DELETE_GROUP_SIZE || !IsTruncated) {
await removeObjectVersions();
if (typeof Versions?.[Symbol.iterator] === 'function') {
await deleteItems(Versions);
}
if (typeof DeleteMarkers?.[Symbol.iterator] === 'function') {
await deleteItems(DeleteMarkers);
}

if (IsTruncated) {
return pageVersions();
return pageObjectVersions(NextKeyMarker).catch(reject);
} else {
await this.#S3Client.send(new DeleteBucketCommand({ Bucket: bucketName }));
resolve();
}
} catch (err) {
if (err.name === 'NoSuchBucket') {
resolve(numResolved);
resolve();
} else {
return removeObjectVersionsAndBucket(err);
reject(err);
}
}
};

pageVersions();
pageObjectVersions(undefined).catch(reject);
});
}

8 changes: 6 additions & 2 deletions notes/S3 streaming store.md
@@ -4,7 +4,11 @@ Streaming Stores can only be used with the modular server.

You should be able to connect to any S3-compatible service that supports versioning. Tested services include:

Tested implementations:
Tested working implementations:

* AWS S3

Tested working with caveats:

* OpenIO

@@ -13,7 +17,7 @@ Incompatible implementations:
* min.io (both self-hosted and cloud)


Configure the store by passing to the constructor the endpoint (host name, and port if not 9000), access key (admin user name) and secret key (password). (If you don't pass any arguments, S3 will use the public account on `play.min.io`, where the files can be **read, altered and deleted** by anyone in the world.) If you're using a AWS and a region other than `us-east-1`, include that as a fourth argument. You can provide these however you like, but typically they are stored in these environment variables:
Configure the store by passing to the constructor the endpoint (host name, plus port if not 9000), access key (admin user name) and secret key (password). (If you don't pass any arguments, S3 will use the public account on `play.min.io`, where the files can be **read, altered and deleted** by anyone in the world. It's also listed above as incompatible.) If you're using AWS and a region other than `us-east-1`, include that as a fourth argument. You can provide these however you like, but typically they are stored in these environment variables:

* S3_ENDPOINT
* S3_ACCESS_KEY
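
As an illustration (not part of this commit), wiring the new handler into an Express app might look like the sketch below — the module path, the mount path, and the `S3_SECRET_KEY` and `S3_REGION` variable names are assumptions:

```javascript
const express = require('express');
// factory added by this commit; require path assumed
const s3Handler = require('./lib/routes/S3Handler');

const app = express();
app.use('/storage', s3Handler(
  process.env.S3_ENDPOINT, // host name, plus port if not 9000
  process.env.S3_ACCESS_KEY, // admin user name
  process.env.S3_SECRET_KEY, // password (variable name assumed)
  process.env.S3_REGION || 'us-east-1' // region only matters for AWS
));
app.listen(8000);
```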