From 0587a113e16993be17b9c1a4a6678b7a88c4d52c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Wed, 29 Nov 2023 14:51:19 +0100 Subject: [PATCH 1/6] extract upload manager interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/rhttp/datatx/manager/tus/tus.go | 49 +++++------ pkg/storage/storage.go | 37 +++++++-- pkg/storage/utils/decomposedfs/upload.go | 83 ++++++++++--------- .../utils/decomposedfs/upload/processing.go | 55 ++++++++++++ 4 files changed, 154 insertions(+), 70 deletions(-) diff --git a/pkg/rhttp/datatx/manager/tus/tus.go b/pkg/rhttp/datatx/manager/tus/tus.go index f98ce8adb1..e868569a31 100644 --- a/pkg/rhttp/datatx/manager/tus/tus.go +++ b/pkg/rhttp/datatx/manager/tus/tus.go @@ -23,13 +23,11 @@ import ( "log" "net/http" "path" - "path/filepath" "time" "github.com/pkg/errors" tusd "github.com/tus/tusd/pkg/handler" - userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net" "github.com/cs3org/reva/v2/pkg/appctx" @@ -40,8 +38,8 @@ import ( "github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/cache" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload" "github.com/cs3org/reva/v2/pkg/storagespace" - "github.com/cs3org/reva/v2/pkg/utils" "github.com/mitchellh/mapstructure" ) @@ -103,33 +101,27 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { return nil, err } - go func() { - for { - ev := <-handler.CompleteUploads - info := ev.Upload - spaceOwner := &userv1beta1.UserId{ - OpaqueId: info.Storage["SpaceOwnerOrManager"], - } - owner := &userv1beta1.UserId{ - Idp: info.Storage["Idp"], - OpaqueId: info.Storage["UserId"], - } - ref := &provider.Reference{ - ResourceId: 
&provider.ResourceId{ - StorageId: info.MetaData["providerID"], - SpaceId: info.Storage["SpaceRoot"], - OpaqueId: info.Storage["SpaceRoot"], - }, - Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), - } - datatx.InvalidateCache(owner, ref, m.statCache) - if m.publisher != nil { - if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil { - appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event") + if _, ok := fs.(storage.UploadsManager); ok { + // TODO we can currently only send updates if the fs is decomposedfs as we read very specific keys from the storage map of the tus info + go func() { + for { + ev := <-handler.CompleteUploads + // TODO we should be able to get the upload progress with fs.GetUploadProgress, but currently tus will erase the info files + // so we create a Progress instance here that is used to read the correct properties + up := upload.Progress{ + Info: ev.Upload, + } + executant := up.Executant() + ref := up.Reference() + datatx.InvalidateCache(&executant, &ref, m.statCache) + if m.publisher != nil { + if err := datatx.EmitFileUploadedEvent(up.SpaceOwner(), &executant, &ref, m.publisher); err != nil { + appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event") + } } } - } - }() + }() + } h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { method := r.Method @@ -202,6 +194,7 @@ func setHeaders(fs storage.FS, w http.ResponseWriter, r *http.Request) { } expires := info.MetaData["expires"] if expires != "" { + // FIXME currently info.MetaData["expires"] is an int ... 
but it MUST be RFC 7231 datetime format, see https://tus.io/protocols/resumable-upload#upload-expires w.Header().Set(net.HeaderTusUploadExpires, expires) } resourceid := provider.ResourceId{ diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index bc73db0fdc..2494b88d5d 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -22,8 +22,7 @@ import ( "context" "io" "net/url" - - tusd "github.com/tus/tusd/pkg/handler" + "time" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" @@ -31,7 +30,7 @@ import ( ) // UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished -type UploadFinishedFunc func(spaceOwner, owner *userpb.UserId, ref *provider.Reference) +type UploadFinishedFunc func(spaceOwner, executant *userpb.UserId, ref *provider.Reference) // FS is the interface to implement access to the storage. type FS interface { @@ -79,8 +78,36 @@ type FS interface { // UploadsManager defines the interface for FS implementations that allow for managing uploads type UploadsManager interface { - ListUploads() ([]tusd.FileInfo, error) - PurgeExpiredUploads(chan<- tusd.FileInfo) error + // ListUploads returns a list of all currently known uploads + // TODO and their processing state + ListUploads() ([]UploadProgress, error) + // PurgeExpiredUploads purges expired uploads + // TODO skip uploads in progress + PurgeExpiredUploads(chan<- UploadProgress) error + // GetUploadProgress returns the upload progress + GetUploadProgress(ctx context.Context, uploadID string) (UploadProgress, error) +} + +type UploadProgress interface { + // ID returns the upload id + ID() string + // Filename returns the filename of the file + Filename() string + // Size returns the size of the upload + Size() int64 + // Offset returns the current offset + Offset() int64 + // Reference returns a reference for the file being uploaded. 
May be absolute id based or relative to e.g. a space root + Reference() provider.Reference + // Executant returns the userid of the user that created the upload + Executant() userpb.UserId + // SpaceOwner returns the owner of a space if set. optional + SpaceOwner() *userpb.UserId + // Expires returns the time when the upload cen no longer be used + Expires() time.Time + + // Purge allows completely removing an upload. Should emit a PostprocessingFinished event with a Delete outcome + Purge() error } // Registry is the interface that storage registries implement diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 40cb53b073..260d5a2dd0 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -20,10 +20,10 @@ package decomposedfs import ( "context" + "fmt" "os" "path/filepath" "regexp" - "strconv" "strings" "time" @@ -92,18 +92,17 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u ResourceId: &provider.ResourceId{ StorageId: info.MetaData["providerID"], SpaceId: info.Storage["SpaceRoot"], - OpaqueId: info.Storage["SpaceRoot"], + OpaqueId: info.Storage["NodeId"], }, - Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), } - owner, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx) + executant, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx) if !ok { return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context") } spaceOwner := &userpb.UserId{ OpaqueId: info.Storage["SpaceOwnerOrManager"], } - uff(spaceOwner, owner.Id, uploadRef) + uff(spaceOwner, executant.Id, uploadRef) } ri := provider.ResourceInfo{ @@ -243,33 +242,30 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) } +// GetUploadProgress returns the metadata for the given upload id 
+func (fs *Decomposedfs) GetUploadProgress(ctx context.Context, uploadID string) (storage.UploadProgress, error) { + return fs.getUploadProgress(ctx, filepath.Join(fs.o.Root, "uploads", uploadID+".info")) +} + // ListUploads returns a list of all incomplete uploads -func (fs *Decomposedfs) ListUploads() ([]tusd.FileInfo, error) { +func (fs *Decomposedfs) ListUploads() ([]storage.UploadProgress, error) { return fs.uploadInfos(context.Background()) } // PurgeExpiredUploads scans the fs for expired downloads and removes any leftovers -func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) error { - infos, err := fs.uploadInfos(context.Background()) +func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- storage.UploadProgress) error { + uploads, err := fs.uploadInfos(context.Background()) if err != nil { return err } - for _, info := range infos { - expires, err := strconv.Atoi(info.MetaData["expires"]) - if err != nil { - continue - } - if int64(expires) < time.Now().Unix() { - purgedChan <- info - err = os.Remove(info.Storage["BinPath"]) - if err != nil { - return err - } - err = os.Remove(filepath.Join(fs.o.Root, "uploads", info.ID+".info")) - if err != nil { - return err - } + for _, upload := range uploads { + if time.Now().After(upload.Expires()) { + // TODO check postprocessing state + purgedChan <- upload + + _ = upload.Purge() + // TODO use a channel to return errors } } return nil @@ -296,28 +292,41 @@ func (fs *Decomposedfs) AsConcatableUpload(up tusd.Upload) tusd.ConcatableUpload return up.(*upload.Upload) } -func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]tusd.FileInfo, error) { - infos := []tusd.FileInfo{} +func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]storage.UploadProgress, error) { + uploads := []storage.UploadProgress{} infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info")) if err != nil { return nil, err } for _, info := range infoFiles { - match := 
_idRegexp.FindStringSubmatch(info) - if match == nil || len(match) < 2 { - continue - } - up, err := fs.GetUpload(ctx, match[1]) - if err != nil { - return nil, err - } - info, err := up.GetInfo(context.Background()) + progress, err := fs.getUploadProgress(ctx, info) if err != nil { - return nil, err + // Log error? + continue } - infos = append(infos, info) + uploads = append(uploads, progress) + } + return uploads, nil +} + +func (fs *Decomposedfs) getUploadProgress(ctx context.Context, path string) (storage.UploadProgress, error) { + match := _idRegexp.FindStringSubmatch(path) + if match == nil || len(match) < 2 { + return nil, fmt.Errorf("invalid upload path") + } + up, err := fs.GetUpload(ctx, match[1]) + if err != nil { + return nil, err + } + info, err := up.GetInfo(context.Background()) + if err != nil { + return nil, err + } + progress := upload.Progress{ + Path: path, + Info: info, } - return infos, nil + return progress, nil } diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go index 462716562e..6a430a364e 100644 --- a/pkg/storage/utils/decomposedfs/upload/processing.go +++ b/pkg/storage/utils/decomposedfs/upload/processing.go @@ -497,3 +497,58 @@ func lookupNode(ctx context.Context, spaceRoot *node.Node, path string, lu *look } return n, nil } + +type Progress struct { + Path string + Info tusd.FileInfo +} + +func (p Progress) ID() string { + return p.Info.ID +} +func (p Progress) Filename() string { + return p.Info.MetaData["filename"] +} +func (p Progress) Size() int64 { + return p.Info.Size +} +func (p Progress) Offset() int64 { + return p.Info.Offset +} +func (p Progress) Reference() provider.Reference { + return provider.Reference{ + ResourceId: &provider.ResourceId{ + StorageId: p.Info.MetaData["providerID"], + SpaceId: p.Info.Storage["SpaceRoot"], + OpaqueId: p.Info.Storage["NodeId"], // Node id is always set in Initiate Upload + }, + } +} +func (p Progress) Executant() 
userpb.UserId { + return userpb.UserId{ + Idp: p.Info.Storage["Idp"], + OpaqueId: p.Info.Storage["UserId"], + Type: utils.UserTypeMap(p.Info.Storage["UserType"]), + } +} +func (p Progress) SpaceOwner() *userpb.UserId { + return &userpb.UserId{ + OpaqueId: p.Info.Storage["SpaceOwnerOrManager"], + // TODO idp and type? + } +} +func (p Progress) Expires() time.Time { + mt, _ := utils.MTimeToTime(p.Info.MetaData["expires"]) + return mt +} + +func (p Progress) Purge() error { + // TODO we should use the upload id to look up the tus upload and Terminate() that + err := os.Remove(p.Info.Storage["BinPath"]) + if err != nil { + return err + } + + // remove upload metadata + return os.Remove(p.Path) +} From 87f5191a80e7718aa34b3efc65d0f84c05d6c76b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 30 Nov 2023 14:14:41 +0100 Subject: [PATCH 2/6] iterate interface, add filter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/storage.go | 45 ----------- pkg/storage/uploads.go | 79 +++++++++++++++++++ pkg/storage/utils/decomposedfs/upload.go | 71 ++++++++++------- .../utils/decomposedfs/upload/processing.go | 17 +++- 4 files changed, 138 insertions(+), 74 deletions(-) create mode 100644 pkg/storage/uploads.go diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 2494b88d5d..70ceb5134a 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -22,16 +22,11 @@ import ( "context" "io" "net/url" - "time" - userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1" ) -// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished -type UploadFinishedFunc func(spaceOwner, executant *userpb.UserId, ref *provider.Reference) - // FS is the interface to 
implement access to the storage. type FS interface { GetHome(ctx context.Context) (string, error) @@ -76,40 +71,6 @@ type FS interface { DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) error } -// UploadsManager defines the interface for FS implementations that allow for managing uploads -type UploadsManager interface { - // ListUploads returns a list of all currently known uploads - // TODO and their processing state - ListUploads() ([]UploadProgress, error) - // PurgeExpiredUploads purges expired uploads - // TODO skip uploads in progress - PurgeExpiredUploads(chan<- UploadProgress) error - // GetUploadProgress returns the upload progress - GetUploadProgress(ctx context.Context, uploadID string) (UploadProgress, error) -} - -type UploadProgress interface { - // ID returns the upload id - ID() string - // Filename returns the filename of the file - Filename() string - // Size returns the size of the upload - Size() int64 - // Offset returns the current offset - Offset() int64 - // Reference returns a reference for the file being uploaded. May be absolute id based or relative to e.g. a space root - Reference() provider.Reference - // Executant returns the userid of the user that created the upload - Executant() userpb.UserId - // SpaceOwner returns the owner of a space if set. optional - SpaceOwner() *userpb.UserId - // Expires returns the time when the upload cen no longer be used - Expires() time.Time - - // Purge allows completely removing an upload. 
Should emit a PostprocessingFinished event with a Delete outcome - Purge() error -} - // Registry is the interface that storage registries implement // for discovering storage providers type Registry interface { @@ -125,9 +86,3 @@ type PathWrapper interface { Unwrap(ctx context.Context, rp string) (string, error) Wrap(ctx context.Context, rp string) (string, error) } - -type UploadRequest struct { - Ref *provider.Reference - Body io.ReadCloser - Length int64 -} diff --git a/pkg/storage/uploads.go b/pkg/storage/uploads.go new file mode 100644 index 0000000000..4fcc69a7d2 --- /dev/null +++ b/pkg/storage/uploads.go @@ -0,0 +1,79 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+ +package storage + +import ( + "context" + "io" + "time" + + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" +) + +// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished +type UploadFinishedFunc func(spaceOwner, executant *userpb.UserId, ref *provider.Reference) + +type UploadRequest struct { + Ref *provider.Reference + Body io.ReadCloser + Length int64 +} + +// UploadsManager defines the interface for FS implementations that allow for managing uploads +type UploadsManager interface { + // GetUploadProgress returns the upload progress + ListUploadSessions(ctx context.Context, filter UploadSessionFilter) ([]UploadSession, error) +} + +type UploadSession interface { + // ID returns the upload id + ID() string + // Filename returns the filename of the file + Filename() string + // Size returns the size of the upload + Size() int64 + // Offset returns the current offset + Offset() int64 + // Reference returns a reference for the file being uploaded. May be absolute id based or relative to e.g. a space root + Reference() provider.Reference + // Executant returns the userid of the user that created the upload + Executant() userpb.UserId + // SpaceOwner returns the owner of a space if set. optional + SpaceOwner() *userpb.UserId + // Expires returns the time when the upload can no longer be used + Expires() time.Time + + // IsProcessing returns true if postprocessing has not finished, yet + // The actual postprocessing state is tracked in the postprocessing service. + IsProcessing() bool + // MalwareDescription returns the scan result returned by the scanner + MalwareDescription() string + // MalwareScanTime returns the timestamp the upload was scanned. Default time means the item has not been scanned + MalwareScanTime() time.Time + + // Purge allows completely removing an upload. 
Should emit a PostprocessingFinished event with a Delete outcome + Purge() error +} + +type UploadSessionFilter struct { + Id *string + Processing *bool + Expired *bool +} diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 260d5a2dd0..8af40e4add 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -243,32 +243,42 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, } // GetUploadProgress returns the metadata for the given upload id -func (fs *Decomposedfs) GetUploadProgress(ctx context.Context, uploadID string) (storage.UploadProgress, error) { - return fs.getUploadProgress(ctx, filepath.Join(fs.o.Root, "uploads", uploadID+".info")) -} - -// ListUploads returns a list of all incomplete uploads -func (fs *Decomposedfs) ListUploads() ([]storage.UploadProgress, error) { - return fs.uploadInfos(context.Background()) -} - -// PurgeExpiredUploads scans the fs for expired downloads and removes any leftovers -func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- storage.UploadProgress) error { - uploads, err := fs.uploadInfos(context.Background()) - if err != nil { - return err +func (fs *Decomposedfs) ListUploadSessions(ctx context.Context, filter storage.UploadSessionFilter) ([]storage.UploadSession, error) { + var sessions []storage.UploadSession + if filter.Id != nil && *filter.Id != "" { + session, err := fs.getUploadProgress(ctx, filepath.Join(fs.o.Root, "uploads", *filter.Id+".info")) + if err != nil { + return nil, err + } + sessions = []storage.UploadSession{session} + } else { + var err error + sessions, err = fs.uploadSessions(ctx) + if err != nil { + return nil, err + } } + filteredSessions := []storage.UploadSession{} + now := time.Now() + for _, session := range sessions { + if filter.Processing != nil && *filter.Processing != session.IsProcessing() { + continue + } + if filter.Expired != nil { + if *filter.Expired { + 
if now.Before(session.Expires()) { + continue + } + } else { + if now.After(session.Expires()) { + continue + } - for _, upload := range uploads { - if time.Now().After(upload.Expires()) { - // TODO check postprocessing state - purgedChan <- upload - - _ = upload.Purge() - // TODO use a channel to return errors + } } + filteredSessions = append(filteredSessions, session) } - return nil + return filteredSessions, nil } // AsTerminatableUpload returns a TerminatableUpload @@ -292,8 +302,8 @@ func (fs *Decomposedfs) AsConcatableUpload(up tusd.Upload) tusd.ConcatableUpload return up.(*upload.Upload) } -func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]storage.UploadProgress, error) { - uploads := []storage.UploadProgress{} +func (fs *Decomposedfs) uploadSessions(ctx context.Context) ([]storage.UploadSession, error) { + uploads := []storage.UploadSession{} infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info")) if err != nil { return nil, err @@ -311,7 +321,7 @@ func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]storage.UploadProgre return uploads, nil } -func (fs *Decomposedfs) getUploadProgress(ctx context.Context, path string) (storage.UploadProgress, error) { +func (fs *Decomposedfs) getUploadProgress(ctx context.Context, path string) (storage.UploadSession, error) { match := _idRegexp.FindStringSubmatch(path) if match == nil || len(match) < 2 { return nil, fmt.Errorf("invalid upload path") @@ -324,9 +334,16 @@ func (fs *Decomposedfs) getUploadProgress(ctx context.Context, path string) (sto if err != nil { return nil, err } + // upload processing state is stored in the node, for decomposedfs the NodeId is always set by InitiateUpload + n, err := node.ReadNode(ctx, fs.lu, info.Storage["SpaceRoot"], info.Storage["NodeId"], true, nil, true) + if err != nil { + return nil, err + } progress := upload.Progress{ - Path: path, - Info: info, + Path: path, + Info: info, + Processing: n.IsProcessing(ctx), } + _, progress.ScanStatus, 
progress.ScanTime = n.ScanData(ctx) return progress, nil } diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go index 6a430a364e..71ac7eaa9a 100644 --- a/pkg/storage/utils/decomposedfs/upload/processing.go +++ b/pkg/storage/utils/decomposedfs/upload/processing.go @@ -499,8 +499,11 @@ func lookupNode(ctx context.Context, spaceRoot *node.Node, path string, lu *look } type Progress struct { - Path string - Info tusd.FileInfo + Path string + Info tusd.FileInfo + Processing bool + ScanStatus string + ScanTime time.Time } func (p Progress) ID() string { @@ -542,6 +545,16 @@ func (p Progress) Expires() time.Time { return mt } +func (p Progress) IsProcessing() bool { + return p.Processing +} +func (p Progress) MalwareDescription() string { + return p.ScanStatus +} +func (p Progress) MalwareScanTime() time.Time { + return p.ScanTime +} + func (p Progress) Purge() error { // TODO we should use the upload id to look up the tus upload and Terminate() that err := os.Remove(p.Info.Storage["BinPath"]) From 8743cdaa40a28f1cea16347195cc86b5259798a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 30 Nov 2023 15:25:28 +0100 Subject: [PATCH 3/6] incorporate feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/rhttp/datatx/manager/tus/tus.go | 7 +++---- pkg/storage/uploads.go | 18 +++++++++++------- pkg/storage/utils/decomposedfs/upload.go | 12 +++++------- .../utils/decomposedfs/upload/processing.go | 15 +++------------ 4 files changed, 22 insertions(+), 30 deletions(-) diff --git a/pkg/rhttp/datatx/manager/tus/tus.go b/pkg/rhttp/datatx/manager/tus/tus.go index e868569a31..a46bd17739 100644 --- a/pkg/rhttp/datatx/manager/tus/tus.go +++ b/pkg/rhttp/datatx/manager/tus/tus.go @@ -101,12 +101,12 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { return nil, err } - if _, ok := 
fs.(storage.UploadsManager); ok { - // TODO we can currently only send updates if the fs is decomposedfs as we read very specific keys from the storage map of the tus info + if _, ok := fs.(storage.UploadSessionLister); ok { + // We can currently only send updates if the fs is decomposedfs as we read very specific keys from the storage map of the tus info go func() { for { ev := <-handler.CompleteUploads - // TODO we should be able to get the upload progress with fs.GetUploadProgress, but currently tus will erase the info files + // We should be able to get the upload progress with fs.GetUploadProgress, but currently tus will erase the info files // so we create a Progress instance here that is used to read the correct properties up := upload.Progress{ Info: ev.Upload, @@ -194,7 +194,6 @@ func setHeaders(fs storage.FS, w http.ResponseWriter, r *http.Request) { } expires := info.MetaData["expires"] if expires != "" { - // FIXME currently info.MetaData["expires"] is an int ... but it MUST be RFC 7231 datetime format, see https://tus.io/protocols/resumable-upload#upload-expires w.Header().Set(net.HeaderTusUploadExpires, expires) } resourceid := provider.ResourceId{ diff --git a/pkg/storage/uploads.go b/pkg/storage/uploads.go index 4fcc69a7d2..2408caf4c8 100644 --- a/pkg/storage/uploads.go +++ b/pkg/storage/uploads.go @@ -25,6 +25,7 @@ import ( userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + tusd "github.com/tus/tusd/pkg/handler" ) // UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished @@ -36,8 +37,15 @@ type UploadRequest struct { Length int64 } -// UploadsManager defines the interface for FS implementations that allow for managing uploads +// UploadsManager defines the interface for storage drivers that allow for managing uploads +// Deprecated: No longer used. Storage drivers should implement the UploadSessionLister. 
type UploadsManager interface { + ListUploads() ([]tusd.FileInfo, error) + PurgeExpiredUploads(chan<- tusd.FileInfo) error +} + +// UploadSessionLister defines the interface for FS implementations that allow listing and purging upload sessions +type UploadSessionLister interface { // GetUploadProgress returns the upload progress ListUploadSessions(ctx context.Context, filter UploadSessionFilter) ([]UploadSession, error) } @@ -63,17 +71,13 @@ type UploadSession interface { // IsProcessing returns true if postprocessing has not finished, yet // The actual postprocessing state is tracked in the postprocessing service. IsProcessing() bool - // MalwareDescription returns the scan result returned by the scanner - MalwareDescription() string - // MalwareScanTime returns the timestamp the upload was scanned. Default time means the item has not been scanned - MalwareScanTime() time.Time // Purge allows completely removing an upload. Should emit a PostprocessingFinished event with a Delete outcome - Purge() error + Purge(ctx context.Context) error } type UploadSessionFilter struct { - Id *string + ID *string Processing *bool Expired *bool } diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 8af40e4add..e4ce6a4ccb 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -245,8 +245,8 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, // GetUploadProgress returns the metadata for the given upload id func (fs *Decomposedfs) ListUploadSessions(ctx context.Context, filter storage.UploadSessionFilter) ([]storage.UploadSession, error) { var sessions []storage.UploadSession - if filter.Id != nil && *filter.Id != "" { - session, err := fs.getUploadProgress(ctx, filepath.Join(fs.o.Root, "uploads", *filter.Id+".info")) + if filter.ID != nil && *filter.ID != "" { + session, err := fs.getUploadSession(ctx, filepath.Join(fs.o.Root, "uploads", 
*filter.ID+".info")) if err != nil { return nil, err } @@ -273,7 +273,6 @@ func (fs *Decomposedfs) ListUploadSessions(ctx context.Context, filter storage.U if now.After(session.Expires()) { continue } - } } filteredSessions = append(filteredSessions, session) @@ -310,9 +309,9 @@ func (fs *Decomposedfs) uploadSessions(ctx context.Context) ([]storage.UploadSes } for _, info := range infoFiles { - progress, err := fs.getUploadProgress(ctx, info) + progress, err := fs.getUploadSession(ctx, info) if err != nil { - // Log error? + appctx.GetLogger(ctx).Error().Interface("path", info).Msg("Decomposedfs: could not getUploadSession") continue } @@ -321,7 +320,7 @@ func (fs *Decomposedfs) uploadSessions(ctx context.Context) ([]storage.UploadSes return uploads, nil } -func (fs *Decomposedfs) getUploadProgress(ctx context.Context, path string) (storage.UploadSession, error) { +func (fs *Decomposedfs) getUploadSession(ctx context.Context, path string) (storage.UploadSession, error) { match := _idRegexp.FindStringSubmatch(path) if match == nil || len(match) < 2 { return nil, fmt.Errorf("invalid upload path") @@ -344,6 +343,5 @@ func (fs *Decomposedfs) getUploadProgress(ctx context.Context, path string) (sto Info: info, Processing: n.IsProcessing(ctx), } - _, progress.ScanStatus, progress.ScanTime = n.ScanData(ctx) return progress, nil } diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go index 71ac7eaa9a..8b64ae5f7f 100644 --- a/pkg/storage/utils/decomposedfs/upload/processing.go +++ b/pkg/storage/utils/decomposedfs/upload/processing.go @@ -502,8 +502,6 @@ type Progress struct { Path string Info tusd.FileInfo Processing bool - ScanStatus string - ScanTime time.Time } func (p Progress) ID() string { @@ -523,7 +521,7 @@ func (p Progress) Reference() provider.Reference { ResourceId: &provider.ResourceId{ StorageId: p.Info.MetaData["providerID"], SpaceId: p.Info.Storage["SpaceRoot"], - OpaqueId: 
p.Info.Storage["NodeId"], // Node id is always set in Initiate Upload + OpaqueId: p.Info.Storage["NodeId"], // Node id is always set in InitiateUpload }, } } @@ -536,8 +534,8 @@ func (p Progress) Executant() userpb.UserId { } func (p Progress) SpaceOwner() *userpb.UserId { return &userpb.UserId{ + // idp and type do not seem to be consumed and the node currently only stores the user id anyway OpaqueId: p.Info.Storage["SpaceOwnerOrManager"], - // TODO idp and type? } } func (p Progress) Expires() time.Time { @@ -548,15 +546,8 @@ func (p Progress) Expires() time.Time { func (p Progress) IsProcessing() bool { return p.Processing } -func (p Progress) MalwareDescription() string { - return p.ScanStatus -} -func (p Progress) MalwareScanTime() time.Time { - return p.ScanTime -} -func (p Progress) Purge() error { - // TODO we should use the upload id to look up the tus upload and Terminate() that +func (p Progress) Purge(ctx context.Context) error { err := os.Remove(p.Info.Storage["BinPath"]) if err != nil { return err From f37fe4280b3384f425ee247ec4f8e251e6520083 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 30 Nov 2023 16:17:33 +0100 Subject: [PATCH 4/6] add changelog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- .../unreleased/extract-uploadsessionlister-interface.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/unreleased/extract-uploadsessionlister-interface.md diff --git a/changelog/unreleased/extract-uploadsessionlister-interface.md b/changelog/unreleased/extract-uploadsessionlister-interface.md new file mode 100644 index 0000000000..db393b2253 --- /dev/null +++ b/changelog/unreleased/extract-uploadsessionlister-interface.md @@ -0,0 +1,5 @@ +Enhancement: Introduce UploadSessionLister interface + +We introduced a new UploadSessionLister interface that allows better control of upload sessions. 
Upload sessions include the processing state and can be used to filter and purge the list of currently ongoing upload sessions. + +https://github.com/cs3org/reva/pull/4375 From f4e06ef570aff45d2efd364741cb9734e6717e5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 30 Nov 2023 17:07:34 +0100 Subject: [PATCH 5/6] make jklinter happy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/uploads.go | 5 ++- pkg/storage/utils/decomposedfs/upload.go | 2 +- .../utils/decomposedfs/upload/processing.go | 32 ++++++++++++++++--- 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/pkg/storage/uploads.go b/pkg/storage/uploads.go index 2408caf4c8..87d26115bf 100644 --- a/pkg/storage/uploads.go +++ b/pkg/storage/uploads.go @@ -31,6 +31,7 @@ import ( // UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished type UploadFinishedFunc func(spaceOwner, executant *userpb.UserId, ref *provider.Reference) +// UploadRequest is used in FS.Upload() to carry required upload metadata type UploadRequest struct { Ref *provider.Reference Body io.ReadCloser @@ -46,10 +47,11 @@ type UploadsManager interface { // UploadSessionLister defines the interface for FS implementations that allow listing and purging upload sessions type UploadSessionLister interface { - // GetUploadProgress returns the upload progress + // ListUploadSessions returns the upload sessions matching the given filter ListUploadSessions(ctx context.Context, filter UploadSessionFilter) ([]UploadSession, error) } +// UploadSession is the interface that storage drivers need to return when listing upload sessions. 
type UploadSession interface { // ID returns the upload id ID() string @@ -76,6 +78,7 @@ type UploadSession interface { Purge(ctx context.Context) error } +// UploadSessionFilter can be used to filter upload sessions type UploadSessionFilter struct { ID *string Processing *bool diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index e4ce6a4ccb..055f901ccc 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -242,7 +242,7 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens) } -// GetUploadProgress returns the metadata for the given upload id +// ListUploadSessions returns the upload sessions for the given filter func (fs *Decomposedfs) ListUploadSessions(ctx context.Context, filter storage.UploadSessionFilter) ([]storage.UploadSession, error) { var sessions []storage.UploadSession if filter.ID != nil && *filter.ID != "" { diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go index 8b64ae5f7f..a025c54ed7 100644 --- a/pkg/storage/utils/decomposedfs/upload/processing.go +++ b/pkg/storage/utils/decomposedfs/upload/processing.go @@ -21,6 +21,7 @@ package upload import ( "context" "encoding/json" + stderrors "errors" "fmt" iofs "io/fs" "os" @@ -498,24 +499,34 @@ func lookupNode(ctx context.Context, spaceRoot *node.Node, path string, lu *look return n, nil } +// Progress adapts the persisted upload metadata for the UploadSessionLister interface type Progress struct { Path string Info tusd.FileInfo Processing bool } +// ID implements the storage.UploadSession interface func (p Progress) ID() string { return p.Info.ID } + +// Filename implements the storage.UploadSession interface func (p Progress) Filename() string { return p.Info.MetaData["filename"] } + +// Size implements the 
storage.UploadSession interface func (p Progress) Size() int64 { return p.Info.Size } + +// Offset implements the storage.UploadSession interface func (p Progress) Offset() int64 { return p.Info.Offset } + +// Reference implements the storage.UploadSession interface func (p Progress) Reference() provider.Reference { return provider.Reference{ ResourceId: &provider.ResourceId{ @@ -525,6 +536,8 @@ func (p Progress) Reference() provider.Reference { }, } } + +// Executant implements the storage.UploadSession interface func (p Progress) Executant() userpb.UserId { return userpb.UserId{ Idp: p.Info.Storage["Idp"], @@ -532,27 +545,38 @@ func (p Progress) Executant() userpb.UserId { Type: utils.UserTypeMap(p.Info.Storage["UserType"]), } } + +// SpaceOwner implements the storage.UploadSession interface func (p Progress) SpaceOwner() *userpb.UserId { return &userpb.UserId{ // idp and type do not seem to be consumed and the node currently only stores the user id anyway OpaqueId: p.Info.Storage["SpaceOwnerOrManager"], } } + +// Expires implements the storage.UploadSession interface func (p Progress) Expires() time.Time { mt, _ := utils.MTimeToTime(p.Info.MetaData["expires"]) return mt } +// IsProcessing implements the storage.UploadSession interface func (p Progress) IsProcessing() bool { return p.Processing } +// Purge implements the storage.UploadSession interface func (p Progress) Purge(ctx context.Context) error { - err := os.Remove(p.Info.Storage["BinPath"]) - if err != nil { - return err + berr := os.Remove(p.Info.Storage["BinPath"]) + if berr != nil { + appctx.GetLogger(ctx).Error().Str("id", p.Info.ID).Interface("path", p.Info.Storage["BinPath"]).Msg("Decomposedfs: could not purge bin path for upload session") } // remove upload metadata - return os.Remove(p.Path) + merr := os.Remove(p.Path) + if merr != nil { + appctx.GetLogger(ctx).Error().Str("id", p.Info.ID).Interface("path", p.Path).Msg("Decomposedfs: could not purge metadata path for upload session") + } + + 
return stderrors.Join(berr, merr) } From 4db6635be3e49f939d8ea4b715be84602d7f1c5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 1 Dec 2023 11:38:17 +0100 Subject: [PATCH 6/6] restore sending path in events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- pkg/storage/utils/decomposedfs/upload.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 055f901ccc..59ef12a4e1 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -92,8 +92,9 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u ResourceId: &provider.ResourceId{ StorageId: info.MetaData["providerID"], SpaceId: info.Storage["SpaceRoot"], - OpaqueId: info.Storage["NodeId"], + OpaqueId: info.Storage["SpaceRoot"], }, + Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])), } executant, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx) if !ok {