Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: various fixes for non-minio s3 endpoints #526

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions pkg/boot/alldone.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package boot

import (
"context"
"fmt"
"os"
"strings"

"lunchpail.io/pkg/be"
Expand All @@ -22,21 +24,12 @@ func waitForAllDone(ctx context.Context, backend be.Backend, run queue.RunContex
run.Bucket = client.RunContext.Bucket
defer client.Stop()

alldone := run.AsFile(queue.AllDoneMarker)
objc, errc := client.Listen(run.Bucket, alldone, "", false)
if err := client.WaitTillExists(run.Bucket, run.AsFile(queue.AllDoneMarker)); err != nil {
return err
}

for {
select {
case <-objc:
return nil
case <-ctx.Done():
return nil
case err := <-errc:
if err == nil || strings.Contains(err.Error(), "EOF") || strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "Connection closed") {
return nil
} else {
return err
}
}
if opts.Verbose {
fmt.Fprintln(os.Stderr, "Got all done. Cleaning up", run.Step)
}
return nil
}
3 changes: 2 additions & 1 deletion pkg/boot/failures.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package boot

import (
"context"
"errors"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -35,7 +36,7 @@ func lookForTaskFailures(ctx context.Context, backend be.Backend, run queue.RunC
case err := <-errc:
if err == nil || strings.Contains(err.Error(), "EOF") || strings.Contains(err.Error(), "connection refused") {
done = true
} else {
} else if !errors.Is(err, s3.ListenNotSupportedError) {
fmt.Fprintln(os.Stderr, err)
}
case object := <-objc:
Expand Down
51 changes: 34 additions & 17 deletions pkg/boot/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"os/signal"
"strings"

"lunchpail.io/pkg/be"
"lunchpail.io/pkg/build"
Expand Down Expand Up @@ -95,6 +96,7 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption
// Wait for a SIGINT
for {
select {
case <-cancellable.Done():
case <-sigint:
gotSigInt = true

Expand Down Expand Up @@ -127,20 +129,38 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption
// We need to chain the isRunning channel to our 0-2 consumers
// below. This is because golang channels are not multicast.
isRunning := make(chan llir.Context) // is the job ready for business?
isRunning5 := make(chan llir.Context)
isRunning6 := make(chan llir.Context)
needsCatAndRedirect := len(opts.Inputs) > 0 || ir.Context.Run.Step > 0
go func() {
ctx := <-isRunning
isRunning5 <- ctx
isRunning5 <- ctx
isRunning6 <- ctx
isRunning6 <- ctx
isRunning6 <- ctx
if opts.Executable != "" {
isRunning5 <- ctx
isRunning6 <- ctx
}
if needsCatAndRedirect {
isRunning5 <- ctx
isRunning6 <- ctx
}
if opts.Watch {
isRunning5 <- ctx
isRunning6 <- ctx
}
}()

alldone := make(chan struct{})
var errorFromAllDone error
go func() {
ctx := <-isRunning6
if ctx.Run.Step == 0 || isFinalStep(ctx) {
errorFromAllDone = waitForAllDone(cancellable, backend, ctx.Run, *opts.BuildOptions.Log)
if errorFromAllDone != nil && strings.Contains(errorFromAllDone.Error(), "connection refused") {
// Then Minio went away on its own. That's probably ok.
errorFromAllDone = nil
}
alldone <- struct{}{}
cancel()
} else {
alldone <- struct{}{}
}
}()

Expand All @@ -150,7 +170,7 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption
// Behave like `cat inputs | ... > outputs`
go func() {
// wait for the run to be ready for us to enqueue
<-isRunning5
<-isRunning6

defer func() { redirectDone <- struct{}{} }()
if err := catAndRedirect(cancellable, opts.Inputs, backend, ir, *opts.BuildOptions.Log); err != nil {
Expand All @@ -164,21 +184,21 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption
if opts.Watch {
verbose := opts.BuildOptions.Log.Verbose
go func() {
<-isRunning5
<-isRunning6
go watchLogs(cancellable, backend, ir, logsDone, WatchOptions{Verbose: verbose})
go watchUtilization(cancellable, backend, ir, WatchOptions{Verbose: verbose})
}()
}

go func() {
if err := handlePipelineStdout(<-isRunning5); err != nil {
if err := handlePipelineStdout(<-isRunning6); err != nil {
fmt.Fprintln(os.Stderr, err)
}
}()

var errorFromTask error
go func() {
<-isRunning5
<-isRunning6
if err := lookForTaskFailures(cancellable, backend, ir.Context.Run, *opts.BuildOptions.Log); err != nil {
errorFromTask = err
// fail fast? cancel()
Expand All @@ -189,9 +209,9 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption
if opts.Executable != "" {
go func() {
// wait for the run to be ready for us to enqueue
<-isRunning5
<-isRunning6

if err := s3.UploadFiles(ctx, backend, ir.Context.Run, []upload.Upload{upload.Upload{LocalPath: opts.Executable, TargetDir: ir.Context.Run.AsFile(q.Blobs)}}, *opts.BuildOptions.Log); err != nil {
if err := s3.UploadFiles(cancellable, backend, ir.Context.Run, []upload.Upload{upload.Upload{LocalPath: opts.Executable, TargetDir: ir.Context.Run.AsFile(q.Blobs)}}, *opts.BuildOptions.Log); err != nil {
fmt.Fprintln(os.Stderr, err)
}
}()
Expand All @@ -204,15 +224,12 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption
err := backend.Down(cancellable, ir, opts.BuildOptions)
}()*/

<-alldone

if needsCatAndRedirect {
<-redirectDone
}

var errorFromAllDone error
if ir.Context.Run.Step == 0 {
errorFromAllDone = waitForAllDone(cancellable, backend, ir.Context.Run, *opts.BuildOptions.Log)
}

// Note the use of `select` to implement a non-blocking send
select {
case submissionComplete <- struct{}{}:
Expand Down
17 changes: 10 additions & 7 deletions pkg/observe/queuestreamer/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package queuestreamer

import (
"context"
"errors"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -48,7 +49,7 @@ func StreamModel(ctx context.Context, s3 s3.S3Client, run queue.RunContext, mode
}

func isFatal(err error) bool {
return err != nil && strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "unexpected EOF")
return err != nil && (strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "unexpected EOF"))
}

func once(ctx context.Context, c client, modelChan chan Model, doneChan chan struct{}, opts StreamOptions) error {
Expand Down Expand Up @@ -80,10 +81,12 @@ func once(ctx context.Context, c client, modelChan chan Model, doneChan chan str
return err
}

fmt.Fprintf(os.Stderr, "Queue streamer got push notification error: %v\n", err)
if err != nil && !errors.Is(err, s3.ListenNotSupportedError) {
fmt.Fprintf(os.Stderr, "Queue streamer got push notification error: %v\n", err)

// sleep for a bit
time.Sleep(time.Duration(opts.PollingInterval) * time.Second)
// sleep for a bit
time.Sleep(time.Duration(opts.PollingInterval) * time.Second)
}

case obj := <-objects:
if c.LogOptions.Verbose {
Expand All @@ -96,9 +99,9 @@ func once(ctx context.Context, c client, modelChan chan Model, doneChan chan str
}
return nil
}
}

// fetch and parse model
modelChan <- c.fetchModel(opts.AnyStep)
// fetch and parse model
modelChan <- c.fetchModel(opts.AnyStep)
}
}
}
3 changes: 2 additions & 1 deletion pkg/runtime/builtins/redirect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package builtins

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -57,7 +58,7 @@ func RedirectTo(ctx context.Context, client s3.S3Client, run queue.RunContext, f
case err := <-outboxErrs:
if err == nil || strings.Contains(err.Error(), "EOF") {
done = true
} else {
} else if !errors.Is(err, s3.ListenNotSupportedError) {
fmt.Fprintln(os.Stderr, err)
}
case object := <-outboxObjects:
Expand Down
4 changes: 3 additions & 1 deletion pkg/runtime/queue/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type S3Client struct {
client *minio.Client
endpoint string
Paths filepaths
ak string
sk string
}

type S3ClientStop struct {
Expand Down Expand Up @@ -65,7 +67,7 @@ func NewS3ClientFromOptions(ctx context.Context, opts S3ClientOptions) (S3Client
return S3Client{}, err
}

return S3Client{ctx, client, opts.Endpoint, paths}, nil
return S3Client{ctx, client, opts.Endpoint, paths, opts.AccessKeyID, opts.SecretAccessKey}, nil
}

// Client for a given run in the given backend
Expand Down
7 changes: 6 additions & 1 deletion pkg/runtime/queue/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ func (s3 S3Client) BucketExists(bucket string) (bool, error) {
return yup, nil
}

func (s3 S3Client) isBucketAlreadyExistsError(bucket string, err error) bool {
return strings.Contains(err.Error(), "Your previous request to create the named bucket succeeded and you already own it") || // Minio
strings.Contains(err.Error(), "Container "+bucket+" exists") // IBM Cloud Object Storage
}

func (s3 S3Client) Mkdirp(bucket string) error {
exists, err := s3.BucketExists(bucket)
if err != nil {
Expand All @@ -262,7 +267,7 @@ func (s3 S3Client) Mkdirp(bucket string) error {

if !exists {
if err := s3.client.MakeBucket(s3.context, bucket, minio.MakeBucketOptions{}); err != nil {
if !strings.Contains(err.Error(), "Your previous request to create the named bucket succeeded and you already own it") {
if !s3.isBucketAlreadyExistsError(bucket, err) {
// bucket already exists error
return err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/runtime/queue/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ func (s3 S3Client) copyInDir(bucket string, spec upload.Upload, opts build.LogOp

func (s3 S3Client) copyInFile(bucket, localPath string, spec upload.Upload, opts build.LogOptions) error {
for i := range 10 {
select {
case <-s3.context.Done():
return nil
default:
}

var dst string
switch spec.TargetDir {
case "":
Expand Down
Loading
Loading