Skip to content

Commit

Permalink
Merge pull request #4699 from kobergj/ListBlobstores
Browse files Browse the repository at this point in the history
Export Path method in blobstores
  • Loading branch information
kobergj authored May 31, 2024
2 parents 11d7ad4 + e5e3f3d commit 0a7d957
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 40 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/add-path-method-to-blobstore.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Add a Path method to blobstore

Add a method to get the path of a blob to the ocis and s3ng blobstores.

https://github.com/cs3org/reva/pull/4699
53 changes: 25 additions & 28 deletions pkg/storage/fs/ocis/blobstore/blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
"github.com/pkg/errors"
)

// ErrBlobIDEmpty is returned when the BlobID is empty
var ErrBlobIDEmpty = fmt.Errorf("blobstore: BlobID is empty")

// Blobstore provides an interface to an filesystem based blobstore
type Blobstore struct {
root string
Expand All @@ -51,10 +54,12 @@ func New(root string) (*Blobstore, error) {

// Upload stores some data in the blobstore under the given key
func (bs *Blobstore) Upload(node *node.Node, source string) error {
dest, err := bs.path(node)
if err != nil {
return err
if node.BlobID == "" {
return ErrBlobIDEmpty
}

dest := bs.Path(node)

// ensure parent path exists
if err := os.MkdirAll(filepath.Dir(dest), 0700); err != nil {
return errors.Wrap(err, "Decomposedfs: oCIS blobstore: error creating parent folders for blob")
Expand Down Expand Up @@ -87,10 +92,11 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error {

// Download retrieves a blob from the blobstore for reading
func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) {
dest, err := bs.path(node)
if err != nil {
return nil, err
if node.BlobID == "" {
return nil, ErrBlobIDEmpty
}

dest := bs.Path(node)
file, err := os.Open(dest)
if err != nil {
return nil, errors.Wrapf(err, "could not read blob '%s'", dest)
Expand All @@ -100,48 +106,39 @@ func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) {

// Delete deletes a blob from the blobstore
func (bs *Blobstore) Delete(node *node.Node) error {
dest, err := bs.path(node)
if err != nil {
return err
if node.BlobID == "" {
return ErrBlobIDEmpty
}
dest := bs.Path(node)
if err := utils.RemoveItem(dest); err != nil {
return errors.Wrapf(err, "could not delete blob '%s'", dest)
}
return nil
}

// List lists all blobs in the Blobstore
func (bs *Blobstore) List() ([]string, error) {
func (bs *Blobstore) List() ([]*node.Node, error) {
dirs, err := filepath.Glob(filepath.Join(bs.root, "spaces", "*", "*", "blobs", "*", "*", "*", "*", "*"))
if err != nil {
return nil, err
}
blobids := make([]string, 0, len(dirs))
blobids := make([]*node.Node, 0, len(dirs))
for _, d := range dirs {
seps := strings.Split(d, "/")
var b string
var now bool
for _, s := range seps {
if now {
b += s
}
if s == "blobs" {
now = true
}
}
blobids = append(blobids, b)
_, s, _ := strings.Cut(d, "spaces")
spaceraw, blobraw, _ := strings.Cut(s, "blobs")
blobids = append(blobids, &node.Node{
SpaceID: strings.ReplaceAll(spaceraw, "/", ""),
BlobID: strings.ReplaceAll(blobraw, "/", ""),
})
}
return blobids, nil
}

func (bs *Blobstore) path(node *node.Node) (string, error) {
if node.BlobID == "" {
return "", fmt.Errorf("blobstore: BlobID is empty")
}
func (bs *Blobstore) Path(node *node.Node) string {
return filepath.Join(
bs.root,
filepath.Clean(filepath.Join(
"/", "spaces", lookup.Pathify(node.SpaceID, 1, 2), "blobs", lookup.Pathify(node.BlobID, 4, 2)),
),
), nil
)
}
27 changes: 15 additions & 12 deletions pkg/storage/fs/s3ng/blobstore/blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error {
}
defer reader.Close()

_, err = bs.client.PutObject(context.Background(), bs.bucket, bs.path(node), reader, node.Blobsize, minio.PutObjectOptions{
_, err = bs.client.PutObject(context.Background(), bs.bucket, bs.Path(node), reader, node.Blobsize, minio.PutObjectOptions{
ContentType: "application/octet-stream",
SendContentMd5: bs.defaultPutOptions.SendContentMd5,
ConcurrentStreamParts: bs.defaultPutOptions.ConcurrentStreamParts,
Expand All @@ -95,21 +95,21 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error {
})

if err != nil {
return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", bs.path(node), bs.bucket)
return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", bs.Path(node), bs.bucket)
}
return nil
}

// Download retrieves a blob from the blobstore for reading
func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) {
reader, err := bs.client.GetObject(context.Background(), bs.bucket, bs.path(node), minio.GetObjectOptions{})
reader, err := bs.client.GetObject(context.Background(), bs.bucket, bs.Path(node), minio.GetObjectOptions{})
if err != nil {
return nil, errors.Wrapf(err, "could not download object '%s' from bucket '%s'", bs.path(node), bs.bucket)
return nil, errors.Wrapf(err, "could not download object '%s' from bucket '%s'", bs.Path(node), bs.bucket)
}

stat, err := reader.Stat()
if err != nil {
return nil, errors.Wrapf(err, "blob path: %s", bs.path(node))
return nil, errors.Wrapf(err, "blob path: %s", bs.Path(node))
}

if node.Blobsize != stat.Size {
Expand All @@ -121,31 +121,34 @@ func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) {

// Delete deletes a blob from the blobstore
func (bs *Blobstore) Delete(node *node.Node) error {
err := bs.client.RemoveObject(context.Background(), bs.bucket, bs.path(node), minio.RemoveObjectOptions{})
err := bs.client.RemoveObject(context.Background(), bs.bucket, bs.Path(node), minio.RemoveObjectOptions{})
if err != nil {
return errors.Wrapf(err, "could not delete object '%s' from bucket '%s'", bs.path(node), bs.bucket)
return errors.Wrapf(err, "could not delete object '%s' from bucket '%s'", bs.Path(node), bs.bucket)
}
return nil
}

// List lists all blobs in the Blobstore
func (bs *Blobstore) List() ([]string, error) {
func (bs *Blobstore) List() ([]*node.Node, error) {
ch := bs.client.ListObjects(context.Background(), bs.bucket, minio.ListObjectsOptions{Recursive: true})

var err error
ids := make([]string, 0)
ids := make([]*node.Node, 0)
for oi := range ch {
if oi.Err != nil {
err = oi.Err
continue
}
_, blobid, _ := strings.Cut(oi.Key, "/")
ids = append(ids, strings.ReplaceAll(blobid, "/", ""))
spaceid, blobid, _ := strings.Cut(oi.Key, "/")
ids = append(ids, &node.Node{
SpaceID: strings.ReplaceAll(spaceid, "/", ""),
BlobID: strings.ReplaceAll(blobid, "/", ""),
})
}
return ids, err
}

func (bs *Blobstore) path(node *node.Node) string {
func (bs *Blobstore) Path(node *node.Node) string {
// https://aws.amazon.com/de/premiumsupport/knowledge-center/s3-prefix-nested-folders-difference/
// Prefixes are used to partion a bucket. A prefix is everything except the filename.
// For a file `BucketName/foo/bar/lorem.ipsum`, `BucketName/foo/bar/` is the prefix.
Expand Down

0 comments on commit 0a7d957

Please sign in to comment.