Merge pull request #4375 from butonic/exctract-uploadmetadata-interface
extract UploadSessionLister interface
butonic authored Dec 1, 2023
2 parents 96036e1 + 4db6635 commit a389ddc
Showing 6 changed files with 260 additions and 87 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/extract-uploadsessionlister-interface.md
@@ -0,0 +1,5 @@
Enhancement: Introduce UploadSessionLister interface

We introduced a new UploadSessionLister interface that allows better control of upload sessions. Upload sessions include the processing state and can be filtered and purged from the list of currently ongoing upload sessions.

https://github.com/cs3org/reva/pull/4375
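
As a rough usage sketch (not part of this change; the helper name, package name, and the way the FS instance is obtained are assumptions), a caller could list the upload sessions that have already expired and purge them through the new interface:

package maintenance // hypothetical package, for illustration only

import (
	"context"
	"fmt"

	"github.com/cs3org/reva/v2/pkg/storage"
)

// purgeExpired is a hypothetical helper. It checks whether the storage driver
// implements the new UploadSessionLister interface, lists the sessions that
// have already expired and purges them.
func purgeExpired(ctx context.Context, fs storage.FS) error {
	lister, ok := fs.(storage.UploadSessionLister)
	if !ok {
		return nil // this driver does not support listing upload sessions
	}
	expired := true
	sessions, err := lister.ListUploadSessions(ctx, storage.UploadSessionFilter{Expired: &expired})
	if err != nil {
		return err
	}
	for _, s := range sessions {
		fmt.Printf("purging upload %s (%s)\n", s.ID(), s.Filename())
		if err := s.Purge(ctx); err != nil {
			return err
		}
	}
	return nil
}
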
48 changes: 20 additions & 28 deletions pkg/rhttp/datatx/manager/tus/tus.go
@@ -23,13 +23,11 @@ import (
"log"
"net/http"
"path"
"path/filepath"
"time"

"github.com/pkg/errors"
tusd "github.com/tus/tusd/pkg/handler"

userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net"
"github.com/cs3org/reva/v2/pkg/appctx"
@@ -40,8 +38,8 @@ import (
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/cache"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/mitchellh/mapstructure"
)

@@ -103,33 +101,27 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
return nil, err
}

go func() {
for {
ev := <-handler.CompleteUploads
info := ev.Upload
spaceOwner := &userv1beta1.UserId{
OpaqueId: info.Storage["SpaceOwnerOrManager"],
}
owner := &userv1beta1.UserId{
Idp: info.Storage["Idp"],
OpaqueId: info.Storage["UserId"],
}
ref := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: info.MetaData["providerID"],
SpaceId: info.Storage["SpaceRoot"],
OpaqueId: info.Storage["SpaceRoot"],
},
Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])),
}
datatx.InvalidateCache(owner, ref, m.statCache)
if m.publisher != nil {
if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
if _, ok := fs.(storage.UploadSessionLister); ok {
// We can currently only send updates if the fs is a decomposedfs, as we read very specific keys from the storage map of the tus info
go func() {
for {
ev := <-handler.CompleteUploads
// We should be able to get the upload progress with fs.GetUploadProgress, but currently tus will erase the info files
// so we create a Progress instance here that is used to read the correct properties
up := upload.Progress{
Info: ev.Upload,
}
executant := up.Executant()
ref := up.Reference()
datatx.InvalidateCache(&executant, &ref, m.statCache)
if m.publisher != nil {
if err := datatx.EmitFileUploadedEvent(up.SpaceOwner(), &executant, &ref, m.publisher); err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
}
}
}
}
}()
}()
}

h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
method := r.Method
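The gating on storage.UploadSessionLister above exists because the completed-upload handling depends on decomposedfs-specific keys in the tus upload info. As a sketch only (not the actual upload.Progress implementation), this is roughly what Executant() and Reference() have to derive, based on the keys the removed inline code read:

package sketch // hypothetical package, for illustration only

import (
	"path/filepath"

	userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
	"github.com/cs3org/reva/v2/pkg/utils"
	tusd "github.com/tus/tusd/pkg/handler"
)

// executantFromInfo mirrors what the removed inline code read from the
// decomposedfs-specific Storage map of the tus upload info.
func executantFromInfo(info tusd.FileInfo) *userv1beta1.UserId {
	return &userv1beta1.UserId{
		Idp:      info.Storage["Idp"],
		OpaqueId: info.Storage["UserId"],
	}
}

// referenceFromInfo rebuilds the CS3 reference the removed inline code
// constructed from the MetaData and Storage maps.
func referenceFromInfo(info tusd.FileInfo) *provider.Reference {
	return &provider.Reference{
		ResourceId: &provider.ResourceId{
			StorageId: info.MetaData["providerID"],
			SpaceId:   info.Storage["SpaceRoot"],
			OpaqueId:  info.Storage["SpaceRoot"],
		},
		Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])),
	}
}
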
18 changes: 0 additions & 18 deletions pkg/storage/storage.go
@@ -23,16 +23,10 @@ import (
"io"
"net/url"

tusd "github.com/tus/tusd/pkg/handler"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1"
)

// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished
type UploadFinishedFunc func(spaceOwner, owner *userpb.UserId, ref *provider.Reference)

// FS is the interface to implement access to the storage.
type FS interface {
GetHome(ctx context.Context) (string, error)
@@ -77,12 +71,6 @@ type FS interface {
DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) error
}

// UploadsManager defines the interface for FS implementations that allow for managing uploads
type UploadsManager interface {
ListUploads() ([]tusd.FileInfo, error)
PurgeExpiredUploads(chan<- tusd.FileInfo) error
}

// Registry is the interface that storage registries implement
// for discovering storage providers
type Registry interface {
@@ -98,9 +86,3 @@ type PathWrapper interface {
Unwrap(ctx context.Context, rp string) (string, error)
Wrap(ctx context.Context, rp string) (string, error)
}

type UploadRequest struct {
Ref *provider.Reference
Body io.ReadCloser
Length int64
}
86 changes: 86 additions & 0 deletions pkg/storage/uploads.go
@@ -0,0 +1,86 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package storage

import (
"context"
"io"
"time"

userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
tusd "github.com/tus/tusd/pkg/handler"
)

// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished
type UploadFinishedFunc func(spaceOwner, executant *userpb.UserId, ref *provider.Reference)

// UploadRequest is used in FS.Upload() to carry required upload metadata
type UploadRequest struct {
Ref *provider.Reference
Body io.ReadCloser
Length int64
}

// UploadsManager defines the interface for storage drivers that allow for managing uploads
// Deprecated: No longer used. Storage drivers should implement the UploadSessionLister.
type UploadsManager interface {
ListUploads() ([]tusd.FileInfo, error)
PurgeExpiredUploads(chan<- tusd.FileInfo) error
}

// UploadSessionLister defines the interface for FS implementations that allow listing and purging upload sessions
type UploadSessionLister interface {
// ListUploadSessions returns the upload sessions matching the given filter
ListUploadSessions(ctx context.Context, filter UploadSessionFilter) ([]UploadSession, error)
}

// UploadSession is the interface that storage drivers need to return when listing upload sessions.
type UploadSession interface {
// ID returns the upload id
ID() string
// Filename returns the filename of the file
Filename() string
// Size returns the size of the upload
Size() int64
// Offset returns the current offset
Offset() int64
// Reference returns a reference for the file being uploaded. May be absolute id based or relative to e.g. a space root
Reference() provider.Reference
// Executant returns the userid of the user that created the upload
Executant() userpb.UserId
// SpaceOwner returns the owner of a space if set. Optional.
SpaceOwner() *userpb.UserId
// Expires returns the time when the upload can no longer be used
Expires() time.Time

// IsProcessing returns true if postprocessing has not finished yet
// The actual postprocessing state is tracked in the postprocessing service.
IsProcessing() bool

// Purge allows completely removing an upload. Should emit a PostprocessingFinished event with a Delete outcome
Purge(ctx context.Context) error
}

// UploadSessionFilter can be used to filter upload sessions
type UploadSessionFilter struct {
ID *string
Processing *bool
Expired *bool
}
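
A minimal sketch of what a storage driver would return to satisfy UploadSession; the struct and its fields are hypothetical and assume the same package and imports as this file, only the method set is dictated by the interface above:

// session is a hypothetical driver-side value; only the method set below is
// required by the UploadSession interface.
type session struct {
	id, filename string
	size, offset int64
	ref          provider.Reference
	executant    userpb.UserId
	spaceOwner   *userpb.UserId
	expires      time.Time
	processing   bool
	purge        func(ctx context.Context) error // driver-specific cleanup
}

func (s session) ID() string                      { return s.id }
func (s session) Filename() string                { return s.filename }
func (s session) Size() int64                     { return s.size }
func (s session) Offset() int64                   { return s.offset }
func (s session) Reference() provider.Reference   { return s.ref }
func (s session) Executant() userpb.UserId        { return s.executant }
func (s session) SpaceOwner() *userpb.UserId      { return s.spaceOwner }
func (s session) Expires() time.Time              { return s.expires }
func (s session) IsProcessing() bool              { return s.processing }
func (s session) Purge(ctx context.Context) error { return s.purge(ctx) }

// compile-time assertion that the sketch implements the interface
var _ UploadSession = session{}
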
107 changes: 66 additions & 41 deletions pkg/storage/utils/decomposedfs/upload.go
@@ -20,10 +20,10 @@ package decomposedfs

import (
"context"
"fmt"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"

@@ -96,14 +96,14 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
},
Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])),
}
owner, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx)
executant, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx)
if !ok {
return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context")
}
spaceOwner := &userpb.UserId{
OpaqueId: info.Storage["SpaceOwnerOrManager"],
}
uff(spaceOwner, owner.Id, uploadRef)
uff(spaceOwner, executant.Id, uploadRef)
}

ri := provider.ResourceInfo{
@@ -243,36 +243,42 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload,
return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
}

// ListUploads returns a list of all incomplete uploads
func (fs *Decomposedfs) ListUploads() ([]tusd.FileInfo, error) {
return fs.uploadInfos(context.Background())
}

// PurgeExpiredUploads scans the fs for expired downloads and removes any leftovers
func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) error {
infos, err := fs.uploadInfos(context.Background())
if err != nil {
return err
}

for _, info := range infos {
expires, err := strconv.Atoi(info.MetaData["expires"])
// ListUploadSessions returns the upload sessions for the given filter
func (fs *Decomposedfs) ListUploadSessions(ctx context.Context, filter storage.UploadSessionFilter) ([]storage.UploadSession, error) {
var sessions []storage.UploadSession
if filter.ID != nil && *filter.ID != "" {
session, err := fs.getUploadSession(ctx, filepath.Join(fs.o.Root, "uploads", *filter.ID+".info"))
if err != nil {
return nil, err
}
sessions = []storage.UploadSession{session}
} else {
var err error
sessions, err = fs.uploadSessions(ctx)
if err != nil {
return nil, err
}
}
filteredSessions := []storage.UploadSession{}
now := time.Now()
for _, session := range sessions {
if filter.Processing != nil && *filter.Processing != session.IsProcessing() {
continue
}
if int64(expires) < time.Now().Unix() {
purgedChan <- info
err = os.Remove(info.Storage["BinPath"])
if err != nil {
return err
}
err = os.Remove(filepath.Join(fs.o.Root, "uploads", info.ID+".info"))
if err != nil {
return err
if filter.Expired != nil {
if *filter.Expired {
if now.Before(session.Expires()) {
continue
}
} else {
if now.After(session.Expires()) {
continue
}
}
}
filteredSessions = append(filteredSessions, session)
}
return nil
return filteredSessions, nil
}

// AsTerminatableUpload returns a TerminatableUpload
@@ -296,28 +302,47 @@ func (fs *Decomposedfs) AsConcatableUpload(up tusd.Upload) tusd.ConcatableUpload
return up.(*upload.Upload)
}

func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]tusd.FileInfo, error) {
infos := []tusd.FileInfo{}
func (fs *Decomposedfs) uploadSessions(ctx context.Context) ([]storage.UploadSession, error) {
uploads := []storage.UploadSession{}
infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info"))
if err != nil {
return nil, err
}

for _, info := range infoFiles {
match := _idRegexp.FindStringSubmatch(info)
if match == nil || len(match) < 2 {
continue
}
up, err := fs.GetUpload(ctx, match[1])
if err != nil {
return nil, err
}
info, err := up.GetInfo(context.Background())
progress, err := fs.getUploadSession(ctx, info)
if err != nil {
return nil, err
appctx.GetLogger(ctx).Error().Interface("path", info).Msg("Decomposedfs: could not getUploadSession")
continue
}

infos = append(infos, info)
uploads = append(uploads, progress)
}
return uploads, nil
}

func (fs *Decomposedfs) getUploadSession(ctx context.Context, path string) (storage.UploadSession, error) {
match := _idRegexp.FindStringSubmatch(path)
if match == nil || len(match) < 2 {
return nil, fmt.Errorf("invalid upload path")
}
up, err := fs.GetUpload(ctx, match[1])
if err != nil {
return nil, err
}
info, err := up.GetInfo(context.Background())
if err != nil {
return nil, err
}
// upload processing state is stored in the node, for decomposedfs the NodeId is always set by InitiateUpload
n, err := node.ReadNode(ctx, fs.lu, info.Storage["SpaceRoot"], info.Storage["NodeId"], true, nil, true)
if err != nil {
return nil, err
}
progress := upload.Progress{
Path: path,
Info: info,
Processing: n.IsProcessing(ctx),
}
return infos, nil
return progress, nil
}
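
As a short usage sketch against the implementation above (the function and package name are hypothetical): passing an ID filter short-circuits to a single .info file, while the Processing and Expired filters are applied afterwards to whatever was collected.

package sketch // hypothetical package, for illustration only

import (
	"context"
	"fmt"
	"time"

	"github.com/cs3org/reva/v2/pkg/storage"
)

// printSessionState looks up a single upload session by its id and reports
// its progress and postprocessing state.
func printSessionState(ctx context.Context, lister storage.UploadSessionLister, id string) error {
	sessions, err := lister.ListUploadSessions(ctx, storage.UploadSessionFilter{ID: &id})
	if err != nil {
		return err
	}
	for _, s := range sessions {
		fmt.Printf("%s: %d/%d bytes, processing=%v, expires=%s\n",
			s.Filename(), s.Offset(), s.Size(), s.IsProcessing(), s.Expires().Format(time.RFC3339))
	}
	return nil
}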