diff --git a/catalyst-uploader.go b/catalyst-uploader.go
index ca95efd..9eb9842 100644
--- a/catalyst-uploader.go
+++ b/catalyst-uploader.go
@@ -39,6 +39,7 @@ func run() int {
 	verbosity := fs.String("v", "", "Log verbosity. {4|5|6}")
 	timeout := fs.Duration("t", 30*time.Second, "Upload timeout")
 	storageFallbackURLs := CommaMapFlag(fs, "storage-fallback-urls", `Comma-separated map of primary to backup storage URLs. If a file fails uploading to one of the primary storages (detected by prefix), it will fallback to the corresponding backup URL after having the prefix replaced`)
+	segTimeout := fs.Duration("segment-timeout", 5*time.Minute, "Segment write timeout")
 
 	defaultConfigFile := "/etc/livepeer/catalyst_uploader.conf"
 	if _, err := os.Stat(defaultConfigFile); os.IsNotExist(err) {
@@ -99,7 +100,8 @@ func run() int {
 		return 1
 	}
 
-	out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs)
+	start := time.Now()
+	out, err := core.Upload(os.Stdin, uri, WaitBetweenWrites, *timeout, *storageFallbackURLs, *segTimeout)
 	if err != nil {
 		glog.Errorf("Uploader failed for %s: %s", uri.Redacted(), err)
 		return 1
@@ -109,7 +111,7 @@ func run() int {
 	if out != nil {
 		respHeaders = out.UploaderResponseHeaders
 	}
-	glog.Infof("Uploader succeeded for %s. storageRequestID=%s Etag=%s", uri.Redacted(), respHeaders.Get("X-Amz-Request-Id"), respHeaders.Get("Etag"))
+	glog.Infof("Uploader succeeded for %s. storageRequestID=%s Etag=%s timeTaken=%vms", uri.Redacted(), respHeaders.Get("X-Amz-Request-Id"), respHeaders.Get("Etag"), time.Since(start).Milliseconds())
 	// success, write uploaded file details to stdout
 	if glog.V(5) {
 		err = json.NewEncoder(stdout).Encode(map[string]string{"uri": uri.Redacted()})
diff --git a/core/uploader.go b/core/uploader.go
index 0bb8e9d..d0e9746 100644
--- a/core/uploader.go
+++ b/core/uploader.go
@@ -13,11 +13,10 @@ import (
 	"strings"
 	"time"
 
-	"golang.org/x/sync/errgroup"
-
 	"github.com/cenkalti/backoff/v4"
 	"github.com/golang/glog"
 	"github.com/livepeer/go-tools/drivers"
+	"golang.org/x/sync/errgroup"
 )
 
 type ByteCounter struct {
@@ -29,40 +28,57 @@ func (bc *ByteCounter) Write(p []byte) (n int, err error) {
 	return n, nil
 }
 
-func newExponentialBackOffExecutor() *backoff.ExponentialBackOff {
+func newExponentialBackOffExecutor(initial, max, totalMax time.Duration) *backoff.ExponentialBackOff {
 	backOff := backoff.NewExponentialBackOff()
-	backOff.InitialInterval = 30 * time.Second
-	backOff.MaxInterval = 2 * time.Minute
-	backOff.MaxElapsedTime = 0 // don't impose a timeout as part of the retries
+	backOff.InitialInterval = initial
+	backOff.MaxInterval = max
+	backOff.MaxElapsedTime = totalMax
 
 	backOff.Reset()
 	return backOff
 }
 
+func NoRetries() backoff.BackOff {
+	return &backoff.StopBackOff{}
+}
+
 func UploadRetryBackoff() backoff.BackOff {
-	return backoff.WithMaxRetries(newExponentialBackOffExecutor(), 2)
+	return newExponentialBackOffExecutor(30*time.Second, 4*time.Minute, 15*time.Minute)
 }
 
-const segmentWriteTimeout = 5 * time.Minute
+func SingleRequestRetryBackoff() backoff.BackOff {
+	return newExponentialBackOffExecutor(5*time.Second, 10*time.Second, 30*time.Second)
+}
 
 var expiryField = map[string]string{
 	"Object-Expires": "+168h", // Objects will be deleted after 7 days
 }
 
-func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string) (*drivers.SaveDataOutput, error) {
-	if strings.HasSuffix(outputURI.Path, ".ts") || strings.HasSuffix(outputURI.Path, ".mp4") {
+func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout time.Duration, storageFallbackURLs map[string]string, segTimeout time.Duration) (*drivers.SaveDataOutput, error) {
+	ext := filepath.Ext(outputURI.Path)
+	inputFile, err := os.CreateTemp("", "upload-*"+ext)
+	if err != nil {
+		return nil, fmt.Errorf("failed to write to temp file: %w", err)
+	}
+	inputFileName := inputFile.Name()
+	defer os.Remove(inputFileName)
+
+	if ext == ".ts" || ext == ".mp4" {
 		// For segments we just write them in one go here and return early.
 		// (Otherwise the incremental write logic below caused issues with clipping since it results in partial segments being written.)
-		fileContents, err := io.ReadAll(input)
+		_, err = io.Copy(inputFile, input)
 		if err != nil {
-			return nil, fmt.Errorf("failed to read file")
+			return nil, fmt.Errorf("failed to write to temp file: %w", err)
+		}
+		if err := inputFile.Close(); err != nil {
+			return nil, fmt.Errorf("failed to close input file: %w", err)
 		}
-		out, bytesWritten, err := uploadFileWithBackup(outputURI, fileContents, nil, segmentWriteTimeout, true, storageFallbackURLs)
+		out, bytesWritten, err := uploadFileWithBackup(outputURI, inputFileName, nil, segTimeout, true, storageFallbackURLs)
 		if err != nil {
 			return nil, fmt.Errorf("failed to upload video %s: (%d bytes) %w", outputURI.Redacted(), bytesWritten, err)
 		}
 
-		if err = extractThumb(outputURI, fileContents, storageFallbackURLs); err != nil {
+		if err = extractThumb(outputURI, inputFileName, storageFallbackURLs); err != nil {
 			glog.Errorf("extracting thumbnail failed for %s: %v", outputURI.Redacted(), err)
 		}
 		return out, nil
@@ -70,8 +86,11 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
 
 	// For the manifest files we want a very short cache ttl as the files are updating every few seconds
 	fields := &drivers.FileProperties{CacheControl: "max-age=1"}
-	var fileContents []byte
 	var lastWrite = time.Now()
+	// Keep the file handle closed while we wait for input data
+	if err := inputFile.Close(); err != nil {
+		return nil, fmt.Errorf("failed to close input file: %w", err)
+	}
 
 	scanner := bufio.NewScanner(input)
 
@@ -89,11 +108,21 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
 
 	for scanner.Scan() {
 		b := scanner.Bytes()
-		fileContents = append(fileContents, b...)
+
+		inputFile, err = os.OpenFile(inputFileName, os.O_WRONLY|os.O_APPEND, 0644)
+		if err != nil {
+			return nil, fmt.Errorf("failed to open file: %w", err)
+		}
+		if _, err := inputFile.Write(b); err != nil {
+			return nil, fmt.Errorf("failed to append to input file: %w", err)
+		}
+		if err := inputFile.Close(); err != nil {
+			return nil, fmt.Errorf("failed to close input file: %w", err)
+		}
 
 		// Only write the latest version of the data that's been piped in if enough time has elapsed since the last write
 		if lastWrite.Add(waitBetweenWrites).Before(time.Now()) {
-			if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, false, storageFallbackURLs); err != nil {
+			if _, _, err := uploadFileWithBackup(outputURI, inputFileName, fields, writeTimeout, false, storageFallbackURLs); err != nil {
 				// Just log this error, since it'll effectively be retried after the next interval
 				glog.Errorf("Failed to write: %v", err)
 			} else {
@@ -107,7 +136,7 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
 	}
 
 	// We have to do this final write, otherwise there might be final data that's arrived since the last periodic write
-	if _, _, err := uploadFileWithBackup(outputURI, fileContents, fields, writeTimeout, true, storageFallbackURLs); err != nil {
+	if _, _, err := uploadFileWithBackup(outputURI, inputFileName, fields, writeTimeout, false, storageFallbackURLs); err != nil {
 		// Don't ignore this error, since there won't be any further attempts to write
 		return nil, fmt.Errorf("failed to write final save: %w", err)
 	}
@@ -115,20 +144,32 @@ func Upload(input io.Reader, outputURI *url.URL, waitBetweenWrites, writeTimeout
 	return nil, nil
 }
 
-func uploadFileWithBackup(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageFallbackURLs map[string]string) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
-	out, bytesWritten, primaryErr := uploadFile(outputURI, fileContents, fields, writeTimeout, withRetries)
-	if primaryErr == nil {
-		return out, bytesWritten, nil
+func uploadFileWithBackup(outputURI *url.URL, fileName string, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool, storageFallbackURLs map[string]string) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
+	retryPolicy := NoRetries()
+	if withRetries {
+		retryPolicy = UploadRetryBackoff()
 	}
+	err = backoff.Retry(func() error {
+		var primaryErr error
+		out, bytesWritten, primaryErr = uploadFile(outputURI, fileName, fields, writeTimeout, withRetries)
+		if primaryErr == nil {
+			return nil
+		}
 
-	backupURI, err := buildBackupURI(outputURI, storageFallbackURLs)
-	if err != nil {
-		glog.Errorf("failed to build backup URL: %v", err)
-		return nil, 0, primaryErr
-	}
+		backupURI, err := buildBackupURI(outputURI, storageFallbackURLs)
+		if err != nil {
+			glog.Errorf("failed to build backup URL: %v", err)
+			return primaryErr
+		}
+		glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr)
 
-	glog.Warningf("Primary upload failed, uploading to backupURL=%s primaryErr=%q", backupURI.Redacted(), primaryErr)
-	return uploadFile(backupURI, fileContents, fields, writeTimeout, withRetries)
+		out, bytesWritten, err = uploadFile(backupURI, fileName, fields, writeTimeout, withRetries)
+		if err == nil {
+			return nil
+		}
+		return fmt.Errorf("upload file errors: primary: %w; backup: %w", primaryErr, err)
+	}, retryPolicy)
+	return out, bytesWritten, err
 }
 
 func buildBackupURI(outputURI *url.URL, storageFallbackURLs map[string]string) (*url.URL, error) {
@@ -142,7 +183,7 @@ func buildBackupURI(outputURI *url.URL, storageFallbackURLs map[string]string) (
 	return nil, fmt.Errorf("no backup URL found for %s", outputURI.Redacted())
 }
 
-func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
+func uploadFile(outputURI *url.URL, fileName string, fields *drivers.FileProperties, writeTimeout time.Duration, withRetries bool) (out *drivers.SaveDataOutput, bytesWritten int64, err error) {
 	outputStr := outputURI.String()
 	// While we wait for storj to implement an easier method for global object deletion we are hacking something
 	// here to allow us to have recording objects deleted after 7 days.
@@ -161,14 +202,20 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro
 	}
 	session := driver.NewSession("")
 
-	var retryPolicy backoff.BackOff = &backoff.StopBackOff{} // no retries by default
+	retryPolicy := NoRetries()
 	if withRetries {
-		retryPolicy = UploadRetryBackoff()
+		retryPolicy = SingleRequestRetryBackoff()
 	}
 	err = backoff.Retry(func() error {
+		file, err := os.Open(fileName)
+		if err != nil {
+			return fmt.Errorf("failed to open file: %w", err)
+		}
+		defer file.Close()
+
 		// To count how many bytes we are trying to read then write (upload) to s3 storage
 		byteCounter := &ByteCounter{}
-		teeReader := io.TeeReader(bytes.NewReader(fileContents), byteCounter)
+		teeReader := io.TeeReader(file, byteCounter)
 
 		out, err = session.SaveData(context.Background(), "", teeReader, fields, writeTimeout)
 		bytesWritten = byteCounter.Count
@@ -182,20 +229,16 @@ func uploadFile(outputURI *url.URL, fileContents []byte, fields *drivers.FilePro
 	return out, bytesWritten, err
 }
 
-func extractThumb(outputURI *url.URL, segment []byte, storageFallbackURLs map[string]string) error {
+func extractThumb(outputURI *url.URL, segmentFileName string, storageFallbackURLs map[string]string) error {
 	tmpDir, err := os.MkdirTemp(os.TempDir(), "thumb-*")
 	if err != nil {
 		return fmt.Errorf("temp file creation failed: %w", err)
 	}
 	defer os.RemoveAll(tmpDir)
 	outFile := filepath.Join(tmpDir, "out.png")
-	inFile := filepath.Join(tmpDir, filepath.Base(outputURI.Path))
-	if err = os.WriteFile(inFile, segment, 0644); err != nil {
-		return fmt.Errorf("failed to write input file: %w", err)
-	}
 
 	args := []string{
-		"-i", inFile,
+		"-i", segmentFileName,
 		"-ss", "00:00:00",
 		"-vframes", "1",
 		"-vf", "scale=640:360:force_original_aspect_ratio=decrease",
@@ -217,16 +260,6 @@ func extractThumb(outputURI *url.URL, segment []byte, storageFallbackURLs map[st
 		return fmt.Errorf("ffmpeg failed[%s] [%s]: %w", outputBuf.String(), stdErr.String(), err)
 	}
 
-	f, err := os.Open(outFile)
-	if err != nil {
-		return fmt.Errorf("opening file failed: %w", err)
-	}
-	defer f.Close()
-	thumbData, err := io.ReadAll(f)
-	if err != nil {
-		return fmt.Errorf("failed to read file: %w", err)
-	}
-
 	// two thumbs, one at session level, the other at stream level
 	thumbURLs := []*url.URL{outputURI.JoinPath("../latest.png"), outputURI.JoinPath("../../../latest.png")}
 	fields := &drivers.FileProperties{CacheControl: "max-age=5"}
@@ -235,7 +268,7 @@ func extractThumb(outputURI *url.URL, segment []byte, storageFallbackURLs map[st
 	for _, thumbURL := range thumbURLs {
 		thumbURL := thumbURL
 		errGroup.Go(func() error {
-			_, _, err = uploadFileWithBackup(thumbURL, thumbData, fields, 10*time.Second, true, storageFallbackURLs)
+			_, _, err = uploadFileWithBackup(thumbURL, outFile, fields, 10*time.Second, true, storageFallbackURLs)
 			if err != nil {
 				return fmt.Errorf("saving thumbnail failed: %w", err)
 			}
diff --git a/core/uploader_test.go b/core/uploader_test.go
index 649528b..9717b57 100644
--- a/core/uploader_test.go
+++ b/core/uploader_test.go
@@ -36,7 +36,7 @@ func TestItWritesSlowInputIncrementally(t *testing.T) {
 	go func() {
 		u, err := url.Parse(outputFile.Name())
 		require.NoError(t, err)
-		_, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil)
+		_, err = Upload(slowReader, u, 100*time.Millisecond, time.Second, nil, time.Minute)
 		require.NoError(t, err, "")
 	}()
 
@@ -67,6 +67,9 @@ func TestUploadFileWithBackup(t *testing.T) {
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
 
+	testFile := filepath.Join(dir, "input.txt")
+	require.NoError(t, os.WriteFile(testFile, []byte("test"), 0644))
+
 	fakeStorage := "s3+https://fake.service.livepeer.com/bucket/"
 	backupStorage := filepath.Join(dir, "backup") + "/"
 	fakeOutput := fakeStorage + "hls/123/file.txt"
@@ -75,7 +78,7 @@ func TestUploadFileWithBackup(t *testing.T) {
 	storageFallbackURLs := map[string]string{
 		fakeStorage: "file://" + backupStorage,
 	}
-	out, written, err := uploadFileWithBackup(mustParseURL(fakeOutput), []byte("test"), nil, 0, false, storageFallbackURLs)
+	out, written, err := uploadFileWithBackup(mustParseURL(fakeOutput), testFile, nil, 0, false, storageFallbackURLs)
 	require.NoError(t, err)
 	require.Equal(t, expectedOutFile, out.URL)
 	require.Equal(t, int64(4), written)