Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task: added delete task handler #160

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions task/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package task

import (
"fmt"
"time"

"github.com/golang/glog"
"github.com/livepeer/go-api-client"
"github.com/livepeer/livepeer-data/pkg/data"
"golang.org/x/sync/errgroup"
)

func TaskDelete(tctx *TaskContext) (*TaskHandlerOutput, error) {

var (
ctx = tctx.Context
asset = tctx.InputAsset
osSess = tctx.outputOS
)

directory := asset.PlaybackID

files, err := osSess.ListFiles(ctx, directory, "/")

gioelecerati marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
glog.Errorf("Error listing files in directory %v", directory)
gioelecerati marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 27 in task/delete.go

View check run for this annotation

Codecov / codecov/patch

task/delete.go#L13-L27

Added lines #L13 - L27 were not covered by tests

totalFiles := files.Files()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not total files, it's only 1 page! You have to handle pagination. e.g.: https://github.com/livepeer/livepeer-infra/pull/1214/files#diff-71a8aa1d4b064228d72ee7fbf0ba12affc84118bac0b08da9ad06125cd5f86deR178 (you dont need the rate limiting)


accumulator := NewAccumulator()
gioelecerati marked this conversation as resolved.
Show resolved Hide resolved
tctx.Progress.TrackCount(accumulator.Size, uint64(len(totalFiles)), 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a problem here due to pagination. The total is not really the total if we do the pagination concurrently with the deletion. One alternative, only to get a right progress number, would be listing all the files beforehand accumulating them in memory until there are no more pages, then start the deletion process. It will make the deletion take a little longer if there are too many pages, but I think it may be interesting anyway.

Otherwise could try something smarter... using the asset size or duration and approximating the number of files... but idk, sounds like overengineering so i'd prefer either the pre-listing idea, having progress go back and forth once there are multiple pages, or even not have any progress at all.

WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: maybe the ideal long term solution might be to store the total size of the asset files in the asset object, when they are created or modified. Then here we could just measure the progress based on that total size. Not a change for this PR tho as it involves a lot other code paths.


const maxRetries = 3
const retryInterval = time.Second

eg := errgroup.Group{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to set the limit of concurrent routines.


for _, file := range totalFiles {
filename := file.Name
eg.Go(func() error {
var err error
for i := 0; i < maxRetries; i++ {
err = osSess.DeleteFile(ctx, filename)
if err == nil {
accumulator.Accumulate(1)
return nil
}
glog.Errorf("Error deleting file %v: %v (retrying...)", filename, err)
time.Sleep(retryInterval)

Check warning on line 50 in task/delete.go

View check run for this annotation

Codecov / codecov/patch

task/delete.go#L29-L50

Added lines #L29 - L50 were not covered by tests
}
return fmt.Errorf("failed to delete file %v after %d retries", filename, maxRetries)

Check warning on line 52 in task/delete.go

View check run for this annotation

Codecov / codecov/patch

task/delete.go#L52

Added line #L52 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you return an error the whole error group will cancel. You should always return nil and only log the errors instead (if you want to delete as much as possible before failing)

})
}

if err := eg.Wait(); err != nil {
glog.Errorf("Error deleting files: %v", err)
}

Check warning on line 58 in task/delete.go

View check run for this annotation

Codecov / codecov/patch

task/delete.go#L56-L58

Added lines #L56 - L58 were not covered by tests

if ipfs := asset.AssetSpec.Storage.IPFS; ipfs != nil {
err = UnpinFromIpfs(*tctx, ipfs.CID, "cid")
if err != nil {
glog.Errorf("Error unpinning from IPFS %v", ipfs.CID)
}
err = UnpinFromIpfs(*tctx, ipfs.NFTMetadata.CID, "nftMetadataCid")
if err != nil {
glog.Errorf("Error unpinning metadata from IPFS %v", ipfs.NFTMetadata.CID)
}

Check warning on line 68 in task/delete.go

View check run for this annotation

Codecov / codecov/patch

task/delete.go#L60-L68

Added lines #L60 - L68 were not covered by tests
Comment on lines +56 to +68
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, thinking further about it, if you never return errors, this task will always succeed and we will never even retry deleting the files. In order to simplify this whole logic (since you have some complexity missing like pagination), I'd suggest you to just return errors for now.

If you do want the failures to be as much partial successes as possible, you can accumulate the intermediate errors you get on a local slice here, and in the end you create a single error in case you found any errors during the process. I think that could be premature optimization for now tho, so I'd rather invest more in the additional retry logics you implemented here for file deletion, and could delete for IPFS unpinning as well.

Can even create a generic helper function to retry another function a few times :D
Geppetto gave me this:

func retry(maxRetries int, f func() error) error {
    var err error

    for i := 0; i <= maxRetries; i++ {
        if err = f(); err == nil {
            return nil
        }
    }

    return fmt.Errorf("after %d attempts, last error: %s", maxRetries, err)
}

WDYT?

}

return &TaskHandlerOutput{
TaskOutput: &data.TaskOutput{},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably still create the Delete output field (empty object), otherwise we have some code in the API that can explode if the task type output is missing.

}, nil

Check warning on line 73 in task/delete.go

View check run for this annotation

Codecov / codecov/patch

task/delete.go#L71-L73

Added lines #L71 - L73 were not covered by tests
}

func UnpinFromIpfs(tctx TaskContext, cid string, filter string) error {
assets, _, err := tctx.lapi.ListAssets(api.ListOptions{
Filters: map[string]interface{}{
filter: cid,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Weird go syntax, it looks like the field is called filter lol

},
AllUsers: true,
})

if err != nil {
return err
}

Check warning on line 86 in task/delete.go

View check run for this annotation

Codecov / codecov/patch

task/delete.go#L76-L86

Added lines #L76 - L86 were not covered by tests

if len(assets) == 1 {
return tctx.ipfs.Unpin(tctx.Context, cid)
}

Check warning on line 90 in task/delete.go

View check run for this annotation

Codecov / codecov/patch

task/delete.go#L88-L90

Added lines #L88 - L90 were not covered by tests

return nil

Check warning on line 92 in task/delete.go

View check run for this annotation

Codecov / codecov/patch

task/delete.go#L92

Added line #L92 was not covered by tests
}
1 change: 1 addition & 0 deletions task/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var (
"upload": TaskUpload,
"export": TaskExport,
"export-data": TaskExportData,
"delete": TaskDelete,
"transcode-file": TaskTranscodeFile,
"clip": TaskClip,
}
Expand Down
Loading