diff --git a/changelog/unreleased/extract-uploadsessionlister-interface.md b/changelog/unreleased/extract-uploadsessionlister-interface.md
new file mode 100644
index 0000000000..db393b2253
--- /dev/null
+++ b/changelog/unreleased/extract-uploadsessionlister-interface.md
@@ -0,0 +1,5 @@
+Enhancement: Introduce UploadSessionLister interface
+
+We introduced a new UploadSessionLister interface that allows better control of upload sessions. Upload sessions expose the processing state, and the list of currently ongoing upload sessions can be filtered and purged.
+
+https://github.com/cs3org/reva/pull/4375
diff --git a/pkg/rhttp/datatx/manager/tus/tus.go b/pkg/rhttp/datatx/manager/tus/tus.go
index f98ce8adb1..a46bd17739 100644
--- a/pkg/rhttp/datatx/manager/tus/tus.go
+++ b/pkg/rhttp/datatx/manager/tus/tus.go
@@ -23,13 +23,11 @@ import (
 	"log"
 	"net/http"
 	"path"
-	"path/filepath"
 	"time"
 
 	"github.com/pkg/errors"
 	tusd "github.com/tus/tusd/pkg/handler"
 
-	userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
 	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
 	"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net"
 	"github.com/cs3org/reva/v2/pkg/appctx"
@@ -40,8 +38,8 @@ import (
 	"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
 	"github.com/cs3org/reva/v2/pkg/storage"
 	"github.com/cs3org/reva/v2/pkg/storage/cache"
+	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload"
 	"github.com/cs3org/reva/v2/pkg/storagespace"
-	"github.com/cs3org/reva/v2/pkg/utils"
 	"github.com/mitchellh/mapstructure"
 )
 
@@ -103,33 +101,27 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
 		return nil, err
 	}
 
-	go func() {
-		for {
-			ev := <-handler.CompleteUploads
-			info := ev.Upload
-			spaceOwner := &userv1beta1.UserId{
-				OpaqueId: info.Storage["SpaceOwnerOrManager"],
-			}
-			owner := &userv1beta1.UserId{
-				Idp:      info.Storage["Idp"],
-				OpaqueId: info.Storage["UserId"],
-			}
-			ref := &provider.Reference{
-				ResourceId: &provider.ResourceId{
-					StorageId: info.MetaData["providerID"],
-					SpaceId:   info.Storage["SpaceRoot"],
-					OpaqueId:  info.Storage["SpaceRoot"],
-				},
-				Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])),
-			}
-			datatx.InvalidateCache(owner, ref, m.statCache)
-			if m.publisher != nil {
-				if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil {
-					appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
+	if _, ok := fs.(storage.UploadSessionLister); ok {
+		// We can currently only send updates if the fs is a decomposedfs, as we read very specific keys from the storage map of the tus info
+		go func() {
+			for {
+				ev := <-handler.CompleteUploads
+				// We should be able to get the upload progress with fs.GetUploadProgress, but currently tus will erase the info files,
+				// so we create a Progress instance here that is used to read the correct properties
+				up := upload.Progress{
+					Info: ev.Upload,
+				}
+				executant := up.Executant()
+				ref := up.Reference()
+				datatx.InvalidateCache(&executant, &ref, m.statCache)
+				if m.publisher != nil {
+					if err := datatx.EmitFileUploadedEvent(up.SpaceOwner(), &executant, &ref, m.publisher); err != nil {
+						appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
+					}
 				}
 			}
-		}
-	}()
+		}()
+	}
 
 	h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		method := r.Method
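
Note: the tus datatx manager above only wires up the CompleteUploads listener when the storage driver opts into the new interface. A minimal sketch of that capability check in isolation (supportsSessionListing is a hypothetical helper; storage.FS and storage.UploadSessionLister are the interfaces touched by this change):

    package example

    import "github.com/cs3org/reva/v2/pkg/storage"

    // supportsSessionListing reports whether a storage driver opts into listing
    // upload sessions, mirroring the type assertion in the tus handler above.
    func supportsSessionListing(fs storage.FS) bool {
    	_, ok := fs.(storage.UploadSessionLister)
    	return ok
    }
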
diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go
index bc73db0fdc..70ceb5134a 100644
--- a/pkg/storage/storage.go
+++ b/pkg/storage/storage.go
@@ -23,16 +23,10 @@ import (
 	"io"
 	"net/url"
 
-	tusd "github.com/tus/tusd/pkg/handler"
-
-	userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
 	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
 	registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1"
 )
 
-// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished
-type UploadFinishedFunc func(spaceOwner, owner *userpb.UserId, ref *provider.Reference)
-
 // FS is the interface to implement access to the storage.
 type FS interface {
 	GetHome(ctx context.Context) (string, error)
@@ -77,12 +71,6 @@ type FS interface {
 	DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) error
 }
 
-// UploadsManager defines the interface for FS implementations that allow for managing uploads
-type UploadsManager interface {
-	ListUploads() ([]tusd.FileInfo, error)
-	PurgeExpiredUploads(chan<- tusd.FileInfo) error
-}
-
 // Registry is the interface that storage registries implement
 // for discovering storage providers
 type Registry interface {
@@ -98,9 +86,3 @@ type PathWrapper interface {
 	Unwrap(ctx context.Context, rp string) (string, error)
 	Wrap(ctx context.Context, rp string) (string, error)
 }
-
-type UploadRequest struct {
-	Ref    *provider.Reference
-	Body   io.ReadCloser
-	Length int64
-}
diff --git a/pkg/storage/uploads.go b/pkg/storage/uploads.go
new file mode 100644
index 0000000000..87d26115bf
--- /dev/null
+++ b/pkg/storage/uploads.go
@@ -0,0 +1,86 @@
+// Copyright 2018-2021 CERN
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// In applying this license, CERN does not waive the privileges and immunities
+// granted to it by virtue of its status as an Intergovernmental Organization
+// or submit itself to any jurisdiction.
+
+package storage
+
+import (
+	"context"
+	"io"
+	"time"
+
+	userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
+	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
+	tusd "github.com/tus/tusd/pkg/handler"
+)
+
+// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished
+type UploadFinishedFunc func(spaceOwner, executant *userpb.UserId, ref *provider.Reference)
+
+// UploadRequest is used in FS.Upload() to carry required upload metadata
+type UploadRequest struct {
+	Ref    *provider.Reference
+	Body   io.ReadCloser
+	Length int64
+}
+
+// UploadsManager defines the interface for storage drivers that allow for managing uploads
+// Deprecated: No longer used. Storage drivers should implement the UploadSessionLister.
+type UploadsManager interface {
+	ListUploads() ([]tusd.FileInfo, error)
+	PurgeExpiredUploads(chan<- tusd.FileInfo) error
+}
+
+// UploadSessionLister defines the interface for FS implementations that allow listing and purging upload sessions
+type UploadSessionLister interface {
+	// ListUploadSessions returns the upload sessions matching the given filter
+	ListUploadSessions(ctx context.Context, filter UploadSessionFilter) ([]UploadSession, error)
+}
+
+// UploadSession is the interface that storage drivers need to return when listing upload sessions.
+type UploadSession interface {
+	// ID returns the upload id
+	ID() string
+	// Filename returns the filename of the file
+	Filename() string
+	// Size returns the size of the upload
+	Size() int64
+	// Offset returns the current offset
+	Offset() int64
+	// Reference returns a reference for the file being uploaded. May be absolute id based or relative to e.g. a space root
+	Reference() provider.Reference
+	// Executant returns the userid of the user that created the upload
+	Executant() userpb.UserId
+	// SpaceOwner returns the owner of a space if set. optional
+	SpaceOwner() *userpb.UserId
+	// Expires returns the time when the upload can no longer be used
+	Expires() time.Time
+
+	// IsProcessing returns true if postprocessing has not finished, yet
+	// The actual postprocessing state is tracked in the postprocessing service.
+	IsProcessing() bool
+
+	// Purge allows completely removing an upload. Should emit a PostprocessingFinished event with a Delete outcome
+	Purge(ctx context.Context) error
+}
+
+// UploadSessionFilter can be used to filter upload sessions
+type UploadSessionFilter struct {
+	ID         *string
+	Processing *bool
+	Expired    *bool
+}
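
For illustration only (not part of this change), a caller could combine UploadSessionLister, UploadSessionFilter and UploadSession.Purge roughly like this to clean up expired upload sessions; purgeExpiredSessions is a hypothetical helper and error handling is kept minimal:

    package example

    import (
    	"context"
    	"errors"

    	"github.com/cs3org/reva/v2/pkg/storage"
    )

    // purgeExpiredSessions is a hypothetical helper: it lists all expired upload
    // sessions of a storage driver and purges them one by one.
    func purgeExpiredSessions(ctx context.Context, fs storage.FS) error {
    	lister, ok := fs.(storage.UploadSessionLister)
    	if !ok {
    		return errors.New("storage driver does not support listing upload sessions")
    	}
    	expired := true
    	sessions, err := lister.ListUploadSessions(ctx, storage.UploadSessionFilter{Expired: &expired})
    	if err != nil {
    		return err
    	}
    	for _, session := range sessions {
    		if err := session.Purge(ctx); err != nil {
    			return err
    		}
    	}
    	return nil
    }
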
diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go
index 40cb53b073..59ef12a4e1 100644
--- a/pkg/storage/utils/decomposedfs/upload.go
+++ b/pkg/storage/utils/decomposedfs/upload.go
@@ -20,10 +20,10 @@ package decomposedfs
 
 import (
 	"context"
+	"fmt"
 	"os"
 	"path/filepath"
 	"regexp"
-	"strconv"
 	"strings"
 	"time"
 
@@ -96,14 +96,14 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
 			},
 			Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])),
 		}
-		owner, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx)
+		executant, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx)
 		if !ok {
 			return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context")
 		}
 		spaceOwner := &userpb.UserId{
 			OpaqueId: info.Storage["SpaceOwnerOrManager"],
 		}
-		uff(spaceOwner, owner.Id, uploadRef)
+		uff(spaceOwner, executant.Id, uploadRef)
 	}
 
 	ri := provider.ResourceInfo{
@@ -243,36 +243,42 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload,
 	return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
 }
 
-// ListUploads returns a list of all incomplete uploads
-func (fs *Decomposedfs) ListUploads() ([]tusd.FileInfo, error) {
-	return fs.uploadInfos(context.Background())
-}
-
-// PurgeExpiredUploads scans the fs for expired downloads and removes any leftovers
-func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) error {
-	infos, err := fs.uploadInfos(context.Background())
-	if err != nil {
-		return err
-	}
-
-	for _, info := range infos {
-		expires, err := strconv.Atoi(info.MetaData["expires"])
+// ListUploadSessions returns the upload sessions for the given filter
+func (fs *Decomposedfs) ListUploadSessions(ctx context.Context, filter storage.UploadSessionFilter) ([]storage.UploadSession, error) {
+	var sessions []storage.UploadSession
+	if filter.ID != nil && *filter.ID != "" {
+		session, err := fs.getUploadSession(ctx, filepath.Join(fs.o.Root, "uploads", *filter.ID+".info"))
+		if err != nil {
+			return nil, err
+		}
+		sessions = []storage.UploadSession{session}
+	} else {
+		var err error
+		sessions, err = fs.uploadSessions(ctx)
 		if err != nil {
+			return nil, err
+		}
+	}
+	filteredSessions := []storage.UploadSession{}
+	now := time.Now()
+	for _, session := range sessions {
+		if filter.Processing != nil && *filter.Processing != session.IsProcessing() {
 			continue
 		}
-		if int64(expires) < time.Now().Unix() {
-			purgedChan <- info
-			err = os.Remove(info.Storage["BinPath"])
-			if err != nil {
-				return err
-			}
-			err = os.Remove(filepath.Join(fs.o.Root, "uploads", info.ID+".info"))
-			if err != nil {
-				return err
+		if filter.Expired != nil {
+			if *filter.Expired {
+				if now.Before(session.Expires()) {
+					continue
+				}
+			} else {
+				if now.After(session.Expires()) {
+					continue
+				}
 			}
 		}
+		filteredSessions = append(filteredSessions, session)
 	}
-	return nil
+	return filteredSessions, nil
 }
 
 // AsTerminatableUpload returns a TerminatableUpload
@@ -296,28 +302,47 @@ func (fs *Decomposedfs) AsConcatableUpload(up tusd.Upload) tusd.ConcatableUpload {
 	return up.(*upload.Upload)
 }
 
-func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]tusd.FileInfo, error) {
-	infos := []tusd.FileInfo{}
+func (fs *Decomposedfs) uploadSessions(ctx context.Context) ([]storage.UploadSession, error) {
+	uploads := []storage.UploadSession{}
 	infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info"))
 	if err != nil {
 		return nil, err
 	}
 
 	for _, info := range infoFiles {
-		match := _idRegexp.FindStringSubmatch(info)
-		if match == nil || len(match) < 2 {
-			continue
-		}
-		up, err := fs.GetUpload(ctx, match[1])
-		if err != nil {
-			return nil, err
-		}
-		info, err := up.GetInfo(context.Background())
+		progress, err := fs.getUploadSession(ctx, info)
 		if err != nil {
-			return nil, err
+			appctx.GetLogger(ctx).Error().Interface("path", info).Msg("Decomposedfs: could not getUploadSession")
+			continue
 		}
-		infos = append(infos, info)
+		uploads = append(uploads, progress)
+	}
+	return uploads, nil
+}
+
+func (fs *Decomposedfs) getUploadSession(ctx context.Context, path string) (storage.UploadSession, error) {
+	match := _idRegexp.FindStringSubmatch(path)
+	if match == nil || len(match) < 2 {
+		return nil, fmt.Errorf("invalid upload path")
+	}
+	up, err := fs.GetUpload(ctx, match[1])
+	if err != nil {
+		return nil, err
+	}
+	info, err := up.GetInfo(context.Background())
+	if err != nil {
+		return nil, err
+	}
+	// the upload processing state is stored in the node; for decomposedfs the NodeId is always set by InitiateUpload
+	n, err := node.ReadNode(ctx, fs.lu, info.Storage["SpaceRoot"], info.Storage["NodeId"], true, nil, true)
+	if err != nil {
+		return nil, err
+	}
+	progress := upload.Progress{
+		Path:       path,
+		Info:       info,
+		Processing: n.IsProcessing(ctx),
 	}
-	return infos, nil
+	return progress, nil
 }
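
The filter fields are pointers so that a nil field means "do not filter on this property", which is how ListUploadSessions above treats them. A small sketch of constructing such a filter (boolPtr is a hypothetical convenience helper, not part of this change):

    package example

    import "github.com/cs3org/reva/v2/pkg/storage"

    // boolPtr is a hypothetical convenience helper, not part of this change.
    func boolPtr(b bool) *bool { return &b }

    // expiredFilter matches sessions that are expired and no longer in
    // postprocessing; leaving ID nil means "match any upload id".
    var expiredFilter = storage.UploadSessionFilter{
    	Expired:    boolPtr(true),
    	Processing: boolPtr(false),
    }
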
diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go
index 462716562e..a025c54ed7 100644
--- a/pkg/storage/utils/decomposedfs/upload/processing.go
+++ b/pkg/storage/utils/decomposedfs/upload/processing.go
@@ -21,6 +21,7 @@ package upload
 import (
 	"context"
 	"encoding/json"
+	stderrors "errors"
 	"fmt"
 	iofs "io/fs"
 	"os"
@@ -497,3 +498,85 @@ func lookupNode(ctx context.Context, spaceRoot *node.Node, path string, lu *look
 	}
 	return n, nil
 }
+
+// Progress adapts the persisted upload metadata for the UploadSessionLister interface
+type Progress struct {
+	Path       string
+	Info       tusd.FileInfo
+	Processing bool
+}
+
+// ID implements the storage.UploadSession interface
+func (p Progress) ID() string {
+	return p.Info.ID
+}
+
+// Filename implements the storage.UploadSession interface
+func (p Progress) Filename() string {
+	return p.Info.MetaData["filename"]
+}
+
+// Size implements the storage.UploadSession interface
+func (p Progress) Size() int64 {
+	return p.Info.Size
+}
+
+// Offset implements the storage.UploadSession interface
+func (p Progress) Offset() int64 {
+	return p.Info.Offset
+}
+
+// Reference implements the storage.UploadSession interface
+func (p Progress) Reference() provider.Reference {
+	return provider.Reference{
+		ResourceId: &provider.ResourceId{
+			StorageId: p.Info.MetaData["providerID"],
+			SpaceId:   p.Info.Storage["SpaceRoot"],
+			OpaqueId:  p.Info.Storage["NodeId"], // Node id is always set in InitiateUpload
+		},
+	}
+}
+
+// Executant implements the storage.UploadSession interface
+func (p Progress) Executant() userpb.UserId {
+	return userpb.UserId{
+		Idp:      p.Info.Storage["Idp"],
+		OpaqueId: p.Info.Storage["UserId"],
+		Type:     utils.UserTypeMap(p.Info.Storage["UserType"]),
+	}
+}
+
+// SpaceOwner implements the storage.UploadSession interface
+func (p Progress) SpaceOwner() *userpb.UserId {
+	return &userpb.UserId{
+		// idp and type do not seem to be consumed and the node currently only stores the user id anyway
+		OpaqueId: p.Info.Storage["SpaceOwnerOrManager"],
+	}
+}
+
+// Expires implements the storage.UploadSession interface
+func (p Progress) Expires() time.Time {
+	mt, _ := utils.MTimeToTime(p.Info.MetaData["expires"])
+	return mt
+}
+
+// IsProcessing implements the storage.UploadSession interface
+func (p Progress) IsProcessing() bool {
+	return p.Processing
+}
+
+// Purge implements the storage.UploadSession interface
+func (p Progress) Purge(ctx context.Context) error {
+	berr := os.Remove(p.Info.Storage["BinPath"])
+	if berr != nil {
+		appctx.GetLogger(ctx).Error().Str("id", p.Info.ID).Interface("path", p.Info.Storage["BinPath"]).Msg("Decomposedfs: could not purge bin path for upload session")
+	}
+
+	// remove upload metadata
+	merr := os.Remove(p.Path)
+	if merr != nil {
+		appctx.GetLogger(ctx).Error().Str("id", p.Info.ID).Interface("path", p.Path).Msg("Decomposedfs: could not purge metadata path for upload session")
+	}
+
+	return stderrors.Join(berr, merr)
+}
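
As the tus datatx manager earlier in this diff shows, a Progress value can wrap a bare tusd.FileInfo to read the executant and reference from the persisted upload metadata. A standalone sketch of that usage (all values are made up; the map keys are the ones the adapter methods read):

    package example

    import (
    	"fmt"

    	tusd "github.com/tus/tusd/pkg/handler"

    	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload"
    )

    // printUploadInfo wraps a tus FileInfo in the Progress adapter and reads the
    // derived properties; the metadata values below are made up for illustration.
    func printUploadInfo() {
    	info := tusd.FileInfo{
    		ID:   "example-upload-id",
    		Size: 1024,
    		MetaData: tusd.MetaData{
    			"filename":   "report.pdf",
    			"providerID": "example-provider-id",
    		},
    		Storage: map[string]string{
    			"Idp":       "https://idp.example.org",
    			"UserId":    "example-user-id",
    			"UserType":  "primary",
    			"SpaceRoot": "example-space-root-id",
    			"NodeId":    "example-node-id",
    		},
    	}

    	p := upload.Progress{Info: info}
    	ref := p.Reference()
    	executant := p.Executant()
    	fmt.Printf("%s uploaded by %s into node %s\n", p.Filename(), executant.GetOpaqueId(), ref.GetResourceId().GetOpaqueId())
    }
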