Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

uploader: Improve upload retry policy #61

Merged
merged 7 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions catalyst-uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func run() int {
verbosity := fs.String("v", "", "Log verbosity. {4|5|6}")
timeout := fs.Duration("t", 30*time.Second, "Upload timeout")
storageFallbackURLs := CommaMapFlag(fs, "storage-fallback-urls", `Comma-separated map of primary to backup storage URLs. If a file fails uploading to one of the primary storages (detected by prefix), it will fallback to the corresponding backup URL after having the prefix replaced`)
segTimeout := fs.Duration("segment-timeout", 5*time.Minute, "Segment write timeout")

defaultConfigFile := "/etc/livepeer/catalyst_uploader.conf"
if _, err := os.Stat(defaultConfigFile); os.IsNotExist(err) {
Expand Down Expand Up @@ -99,7 +100,8 @@ func run() int {
return 1
}

out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs)
start := time.Now()
out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs, *segTimeout)
if err != nil {
glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err)
return 1
Expand All @@ -109,7 +111,7 @@ func run() int {
if out != nil {
respHeaders = out.UploaderResponseHeaders
}
glog.Infof("Uploader succeeded for %s. storageRequestID=%s Etag=%s", uri.Redacted(), respHeaders.Get("X-Amz-Request-Id"), respHeaders.Get("Etag"))
glog.Infof("Uploader succeeded for %s. storageRequestID=%s Etag=%s timeTaken=%vms", uri.Redacted(), respHeaders.Get("X-Amz-Request-Id"), respHeaders.Get("Etag"), time.Since(start).Milliseconds())
// success, write uploaded file details to stdout
if glog.V(5) {
err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri.Redacted()})
Expand Down
133 changes: 83 additions & 50 deletions core/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import (
"strings"
"time"

"golang.org/x/sync/errgroup"

"github.com/cenkalti/backoff/v4"
"github.com/golang/glog"
"github.com/livepeer/go-tools/drivers"
"golang.org/x/sync/errgroup"
)

type ByteCounter struct {
Expand All @@ -29,49 +28,69 @@ func (bc *ByteCounter) Write(p []byte) (n int, err error) {
return n, nil
}

func newExponentialBackOffExecutor() *backoff.ExponentialBackOff {
func newExponentialBackOffExecutor(initial, max, totalMax time.Duration) *backoff.ExponentialBackOff {
backOff := backoff.NewExponentialBackOff()
backOff.InitialInterval = 30 * time.Second
backOff.MaxInterval = 2 * time.Minute
backOff.MaxElapsedTime = 0 // don't impose a timeout as part of the retries
backOff.InitialInterval = initial
backOff.MaxInterval = max
backOff.MaxElapsedTime = totalMax
backOff.Reset()
return backOff
}

// NoRetries returns a retry policy that performs no retries at all:
// the first failure is treated as final.
func NoRetries() backoff.BackOff {
	return new(backoff.StopBackOff)
}

func UploadRetryBackoff() backoff.BackOff {
return backoff.WithMaxRetries(newExponentialBackOffExecutor(), 2)
return newExponentialBackOffExecutor(30*time.Second, 4*time.Minute, 15*time.Minute)
}

const segmentWriteTimeout = 5 * time.Minute
// SingleRequestRetryBackoff returns the retry policy applied to a single
// upload request: fast exponential retries starting at 5s, capped at 10s
// per interval, giving up after 30s of total elapsed time.
func SingleRequestRetryBackoff() backoff.BackOff {
	const (
		initialInterval = 5 * time.Second
		maxInterval     = 10 * time.Second
		maxElapsed      = 30 * time.Second
	)
	return newExponentialBackOffExecutor(initialInterval, maxInterval, maxElapsed)
}

// expiryField is attached as extra object metadata on uploads so that
// recording objects are automatically deleted by the storage provider
// after 7 days (168h). See the note in uploadFile about the storj
// object-deletion workaround.
var expiryField = map[string]string{
"Object-Expires": "+168h", // Objects will be deleted after 7 days
}

func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string) (*drivers.SaveDataOutput, error) {
if strings.HasSuffix(outputURI.Path, ".ts") || strings.HasSuffix(outputURI.Path, ".mp4") {
func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string, segTimeout time.Duration) (*drivers.SaveDataOutput, error) {
ext := filepath.Ext(outputURI.Path)
inputFile, err := os.CreateTemp("", "upload-*"+ext)
if err != nil {
return nil, fmt.Errorf("failed to write to temp file: %w", err)
}
inputFileName := inputFile.Name()
defer os.Remove(inputFileName)

if ext == ".ts" || ext == ".mp4" {
// For segments we just write them in one go here and return early.
// (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.)
fileContents, err := io.ReadAll(input)
_, err = io.Copy(inputFile, input)
if err != nil {
return nil, fmt.Errorf("failed to read file")
return nil, fmt.Errorf("failed to write to temp file: %w", err)
}
if err := inputFile.Close(); err != nil {
return nil, fmt.Errorf("failed to close input file: %w", err)
}

out, bytesWritten, err := uploadFileWithBackup(outputURI, fileContents, nil, segmentWriteTimeout, true, storageFallbackURLs)
out, bytesWritten, err := uploadFileWithBackup(outputURI, inputFileName, nil, segTimeout, true, storageFallbackURLs)
if err != nil {
return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err)
}

if err = extractThumb(outputURI, fileContents, storageFallbackURLs); err != nil {
if err = extractThumb(outputURI, inputFileName, storageFallbackURLs); err != nil {
glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err)
}
return out, nil
}

// For the manifest files we want a very short cache ttl as the files are updating every few seconds
fields := &drivers.FileProperties{CacheControl: "max-age=1"}
var fileContents []byte
var lastWrite = time.Now()
// Keep the file handle closed while we wait for input data
if err := inputFile.Close(); err != nil {
return nil, fmt.Errorf("failed to close input file: %w", err)
}

scanner := bufio.NewScanner(input)

Expand All @@ -89,11 +108,21 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout

for scanner.Scan() {
b := scanner.Bytes()
fileContents = append(fileContents, b...)

inputFile, err = os.OpenFile(inputFileName, os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return nil, fmt.Errorf("failed to open file: %w", err)
}
if _, err := inputFile.Write(b); err != nil {
return nil, fmt.Errorf("failed to append to input file: %w", err)
}
if err := inputFile.Close(); err != nil {
return nil, fmt.Errorf("failed to close input file: %w", err)
}

// Only write the latest version of the data that's been piped in if enough time has elapsed since the last write
if lastWrite.Add(waitBetweenWrites).Before(time.Now()) {
if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, false, storageFallbackURLs); err != nil {
if _, _, err := uploadFileWithBackup(outputURI, inputFileName, fields, writeTimeout, false, storageFallbackURLs); err != nil {
// Just log this error, since it'll effectively be retried after the next interval
glog.Errorf("Failed to write: %v", err)
} else {
Expand All @@ -107,28 +136,40 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
}

// We have to do this final write, otherwise there might be final data that's arrived since the last periodic write
if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, true, storageFallbackURLs); err != nil {
if _, _, err := uploadFileWithBackup(outputURI, inputFileName, fields, writeTimeout, false, storageFallbackURLs); err != nil {
// Don't ignore this error, since there won't be any further attempts to write
return nil, fmt.Errorf("failed to write final save: %w", err)
}
glog.Infof("Completed writing %s to storage", outputURI.Redacted())
return nil, nil
}

func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageFallbackURLs map[string]string) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
out, bytesWritten, primaryErr := uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries)
if primaryErr == nil {
return out, bytesWritten, nil
func uploadFileWithBackup(outputURI *url.URL, fileName string, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageFallbackURLs map[string]string) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
retryPolicy := NoRetries()
if withRetries {
retryPolicy = UploadRetryBackoff()
}
err = backoff.Retry(func() error {
var primaryErr error
out, bytesWritten, primaryErr = uploadFile(outputURI, fileName, fields, writeTimeout, withRetries)
if primaryErr == nil {
return nil
}

backupURI, err := buildBackupURI(outputURI, storageFallbackURLs)
if err != nil {
glog.Errorf("failed to build backup URL: %v", err)
return nil, 0, primaryErr
}
backupURI, err := buildBackupURI(outputURI, storageFallbackURLs)
if err != nil {
glog.Errorf("failed to build backup URL: %v", err)
return primaryErr
}
glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr)

glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr)
return uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries)
out, bytesWritten, err = uploadFile(backupURI, fileName, fields, writeTimeout, withRetries)
if err == nil {
return nil
}
return fmt.Errorf("upload file errors: primary: %w; backup: %w", primaryErr, err)
}, retryPolicy)
return out, bytesWritten, err
}

func buildBackupURI(outputURI *url.URL, storageFallbackURLs map[string]string) (*url.URL, error) {
Expand All @@ -142,7 +183,7 @@ func buildBackupURI(outputURI *url.URL, storageFallbackURLs map[string]string) (
return nil, fmt.Errorf("no backup URL found for %s", outputURI.Redacted())
}

func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
func uploadFile(outputURI *url.URL, fileName string, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
outputStr := outputURI.String()
// While we wait for storj to implement an easier method for global object deletion we are hacking something
// here to allow us to have recording objects deleted after 7 days.
Expand All @@ -161,14 +202,20 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro
}
session := driver.NewSession("")

var retryPolicy backoff.BackOff = &backoff.StopBackOff{} // no retries by default
retryPolicy := NoRetries()
if withRetries {
retryPolicy = UploadRetryBackoff()
retryPolicy = SingleRequestRetryBackoff()
}
err = backoff.Retry(func() error {
file, err := os.Open(fileName)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()

// To count how many bytes we are trying to read then write (upload) to s3 storage
byteCounter := &ByteCounter{}
teeReader := io.TeeReader(bytes.NewReader(fileContents), byteCounter)
teeReader := io.TeeReader(file, byteCounter)

out, err = session.SaveData(context.Background(), "", teeReader, fields, writeTimeout)
bytesWritten = byteCounter.Count
Expand All @@ -182,20 +229,16 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro
return out, bytesWritten, err
}

func extractThumb(outputURI *url.URL, segment []byte, storageFallbackURLs map[string]string) error {
func extractThumb(outputURI *url.URL, segmentFileName string, storageFallbackURLs map[string]string) error {
tmpDir, err := os.MkdirTemp(os.TempDir(), "thumb-*")
if err != nil {
return fmt.Errorf("temp file creation failed: %w", err)
}
defer os.RemoveAll(tmpDir)
outFile := filepath.Join(tmpDir, "out.png")
inFile := filepath.Join(tmpDir, filepath.Base(outputURI.Path))
if err = os.WriteFile(inFile, segment, 0644); err != nil {
return fmt.Errorf("failed to write input file: %w", err)
}

args := []string{
"-i", inFile,
"-i", segmentFileName,
"-ss", "00:00:00",
"-vframes", "1",
"-vf", "scale=640:360:force_original_aspect_ratio=decrease",
Expand All @@ -217,16 +260,6 @@ func extractThumb(outputURI *url.URL, segment []byte, storageFallbackURLs map[st
return fmt.Errorf("ffmpeg failed[%s] [%s]: %w", outputBuf.String(), stdErr.String(), err)
}

f, err := os.Open(outFile)
if err != nil {
return fmt.Errorf("opening file failed: %w", err)
}
defer f.Close()
thumbData, err := io.ReadAll(f)
if err != nil {
return fmt.Errorf("failed to read file: %w", err)
}

// two thumbs, one at session level, the other at stream level
thumbURLs := []*url.URL{outputURI.JoinPath("../latest.png"), outputURI.JoinPath("../../../latest.png")}
fields := &drivers.FileProperties{CacheControl: "max-age=5"}
Expand All @@ -235,7 +268,7 @@ func extractThumb(outputURI *url.URL, segment []byte, storageFallbackURLs map[st
for _, thumbURL := range thumbURLs {
thumbURL := thumbURL
errGroup.Go(func() error {
_, _, err = uploadFileWithBackup(thumbURL, thumbData, fields, 10*time.Second, true, storageFallbackURLs)
_, _, err = uploadFileWithBackup(thumbURL, outFile, fields, 10*time.Second, true, storageFallbackURLs)
if err != nil {
return fmt.Errorf("saving thumbnail failed: %w", err)
}
Expand Down
7 changes: 5 additions & 2 deletions core/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestItWritesSlowInputIncrementally(t *testing.T) {
go func() {
u, err := url.Parse(outputFile.Name())
require.NoError(t, err)
_, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil)
_, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, time.Minute)
require.NoError(t, err, "")
}()

Expand Down Expand Up @@ -67,6 +67,9 @@ func TestUploadFileWithBackup(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

testFile := filepath.Join(dir, "input.txt")
require.NoError(t, os.WriteFile(testFile, []byte("test"), 0644))

fakeStorage := "s3+https://fake.service.livepeer.com/bucket/"
backupStorage := filepath.Join(dir, "backup") + "/"
fakeOutput := fakeStorage + "hls/123/file.txt"
Expand All @@ -75,7 +78,7 @@ func TestUploadFileWithBackup(t *testing.T) {
storageFallbackURLs := map[string]string{
fakeStorage: "file://" + backupStorage,
}
out, written, err := uploadFileWithBackup(mustParseURL(fakeOutput), []byte("test"), nil, 0, false, storageFallbackURLs)
out, written, err := uploadFileWithBackup(mustParseURL(fakeOutput), testFile, nil, 0, false, storageFallbackURLs)
require.NoError(t, err)
require.Equal(t, expectedOutFile, out.URL)
require.Equal(t, int64(4), written)
Expand Down
Loading