From 58a35579bc6906e623f75c21a4a452e2f4ba1f3b Mon Sep 17 00:00:00 2001
From: Pedro Fialho
Date: Fri, 16 Sep 2022 01:44:57 +0000
Subject: [PATCH] Revert "Merge: [SYSINFRA-554] Deprecate nocacheextern flag (grailbio/grail!3658)" (grailbio/grail!5650)

Approved-by: Jim Clune

GitLab URL: https://gitlab.com/grailbio/grail/-/merge_requests/5650

fbshipit-source-id: cf457c2
---
 blob/mux.go                     |  36 ---
 flow/eval.go                    |  25 +-
 flow/eval_test.go               | 388 ++++++++++++--------------------
 repository/blobrepo/blobrepo.go |  14 ++
 runtime/runflags.go             |  21 +-
 sched/scheduler.go              |  96 +++-----
 sched/scheduler_test.go         |  16 +-
 test/testutil/repository.go     |  81 ++++---
 8 files changed, 277 insertions(+), 400 deletions(-)

diff --git a/blob/mux.go b/blob/mux.go
index f58e171b..6cc78ada 100644
--- a/blob/mux.go
+++ b/blob/mux.go
@@ -124,42 +124,6 @@ func (m Mux) CanTransfer(ctx context.Context, dsturl, srcurl string) (bool, erro
 	return true, nil
 }
 
-// NeedTransfer returns whether src needs to be transferred to the location
-// of dst. It expects both src and dst to be reference files, and it only
-// determines that a transfer is unnecessary if the objects have the same ETag
-// or ContentHash.
-func (m Mux) NeedTransfer(ctx context.Context, dst, src reflow.File) (bool, error) {
-	if src.Size != 0 && dst.Size != 0 && src.Size != dst.Size {
-		return true, nil
-	}
-	// An ETag mismatch doesn't necessarily mean that the objects have different
-	// contents. E.g. the ETag of an S3 object uploaded via multipart copy is
-	// not a digest of the object data (https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html).
-	if src.ETag != "" && src.ETag == dst.ETag {
-		return false, nil
-	}
-	// A zero ContentHash doesn't necessarily mean that the field is missing
-	// from the object's metadata.
-	if src.ContentHash.IsZero() {
-		var err error
-		src, err = m.File(ctx, src.Source)
-		if err != nil {
-			return false, err
-		}
-	}
-	if dst.ContentHash.IsZero() {
-		var err error
-		dst, err = m.File(ctx, dst.Source)
-		if err != nil {
-			return false, err
-		}
-	}
-	if !src.ContentHash.IsZero() && !dst.ContentHash.IsZero() {
-		return src.ContentHash != dst.ContentHash, nil
-	}
-	return true, nil
-}
-
 // Transfer transfers the contents of object in srcurl to dsturl.
 // errors.NotSupported is returned if the transfer is not possible.
 func (m Mux) Transfer(ctx context.Context, dsturl, srcurl string) error {
diff --git a/flow/eval.go b/flow/eval.go
index e4da23ab..a13ab1ac 100644
--- a/flow/eval.go
+++ b/flow/eval.go
@@ -142,10 +142,13 @@ type EvalConfig struct {
 	RunID taskdb.RunID
 
 	// CacheMode determines whether the evaluator reads from
 	// or writes to the cache. If CacheMode is nonzero, Assoc,
 	// Repository, and Transferer must be non-nil.
 	CacheMode infra2.CacheMode
 
+	// NoCacheExtern determines whether externs are cached.
+	NoCacheExtern bool
+
 	// RecomputeEmpty determines whether cached empty values
 	// are recomputed.
 	RecomputeEmpty bool
@@ -205,6 +208,11 @@ func (e EvalConfig) String() string {
 	}
 
 	var flags []string
+	if e.NoCacheExtern {
+		flags = append(flags, "nocacheextern")
+	} else {
+		flags = append(flags, "cacheextern")
+	}
 	if e.CacheMode == infra2.CacheOff {
 		flags = append(flags, "nocache")
 	} else {
@@ -838,6 +846,9 @@ func (e *Eval) returnFlow(f *Flow) {
 // incur extra computation, thus dirtying does not work when dirtying
 // nodes are hidden behind maps, continuations, or coercions.
 func (e *Eval) dirty(f *Flow) bool {
+	if !e.NoCacheExtern {
+		return false
+	}
 	if f.Op == Extern {
 		return true
 	}
@@ -884,7 +895,7 @@ func (e *Eval) todo(f *Flow, visited flowOnce, v *FlowVisitor) {
 		}
 	}
 	switch f.Op {
-	case Intern, Exec:
+	case Intern, Exec, Extern:
 		if !e.BottomUp && e.CacheMode.Reading() && !e.dirty(f) {
 			v.Push(f)
 			e.Mutate(f, NeedLookup)
@@ -1113,7 +1124,7 @@ func (e *Eval) CacheWrite(ctx context.Context, f *Flow) error {
 	ctx, endTrace = trace.Start(ctx, trace.Cache, f.Digest(), fmt.Sprintf("cache write %s", f.Digest().Short()))
 	defer endTrace()
 	switch f.Op {
-	case Intern, Exec:
+	case Intern, Extern, Exec:
 	default:
 		return nil
 	}
@@ -1126,6 +1137,12 @@ func (e *Eval) CacheWrite(ctx context.Context, f *Flow) error {
 	if f.Err != nil || f.State != Done {
 		return nil
 	}
+	if f.Op == Data {
+		return nil
+	}
+	if e.NoCacheExtern && f.Op == Extern {
+		return nil
+	}
 	keys := f.CacheKeys()
 	if len(keys) == 0 {
 		return nil
@@ -1235,7 +1252,7 @@ func (e *Eval) batchLookup(ctx context.Context, flows ...*Flow) {
 
 	batch := make(assoc.Batch)
 	for _, f := range flows {
-		if !e.valid(f) || !e.CacheMode.Reading() || f.Op == Extern || f == e.root {
+		if !e.valid(f) || !e.CacheMode.Reading() || e.NoCacheExtern && (f.Op == Extern || f == e.root) {
			e.lookupFailed(f)
 			continue
 		}
diff --git a/flow/eval_test.go b/flow/eval_test.go
index 711b8092..e8adcf17 100644
--- a/flow/eval_test.go
+++ b/flow/eval_test.go
@@ -256,50 +256,52 @@ func TestExecRetry(t *testing.T) {
 
 func TestCacheWrite(t *testing.T) {
 	for _, bottomup := range []bool{false, true} {
-		// The following is done in a func to defer context cancellations.
-		func() {
-			e, config, done := newTestScheduler()
-			defer done()
-			config.CacheMode = infra.CacheRead | infra.CacheWrite
-			config.BottomUp = bottomup
-
-			intern := op.Intern("internurl")
-			exec := op.Exec("image", "command", testutil.Resources, intern)
-			groupby := op.Groupby("(.*)", exec)
-			pullup := op.Pullup(groupby)
-
-			eval := flow.NewEval(pullup, config)
-			ctx, cancel := context.WithTimeout(context.Background(), timeout)
-			rc := testutil.EvalAsync(ctx, eval)
-			var (
-				internValue = testutil.WriteFiles(e.Repo, "ignored")
-				execValue   = testutil.WriteFiles(e.Repo, "a", "b", "c", "d")
-			)
-			e.Ok(ctx, intern, internValue)
-			e.Ok(ctx, exec, execValue)
-			r := <-rc
-			cancel()
-			if r.Err != nil {
-				t.Fatal(r.Err)
-			}
-			if got, want := r.Val, execValue; !got.Equal(want) {
-				t.Errorf("got %v, want %v", got, want)
-			}
-			if got, want := testutil.Exists(eval, intern.CacheKeys()...), true; got != want {
-				t.Errorf("got %v, want %v", got, want)
-			}
-			if got, want := testutil.Value(eval, exec.Digest()), execValue; !testutil.Exists(eval, exec.CacheKeys()...) || !got.Equal(want) {
-				t.Errorf("got %v, want %v", got, want)
-			}
-		}()
+		e, config, done := newTestScheduler()
+		defer done()
+		config.CacheMode = infra.CacheRead | infra.CacheWrite
+		config.BottomUp = bottomup
+
+		intern := op.Intern("internurl")
+		exec := op.Exec("image", "command", testutil.Resources, intern)
+		groupby := op.Groupby("(.*)", exec)
+		pullup := op.Pullup(groupby)
+
+		eval := flow.NewEval(pullup, config)
+		ctx, cancel := context.WithTimeout(context.Background(), timeout)
+		rc := testutil.EvalAsync(ctx, eval)
+		var (
+			internValue = testutil.WriteFiles(e.Repo, "ignored")
+			execValue   = testutil.WriteFiles(e.Repo, "a", "b", "c", "d")
+		)
+		e.Ok(ctx, intern, internValue)
+		e.Ok(ctx, exec, execValue)
+		r := <-rc
+		cancel()
+		if r.Err != nil {
+			t.Fatal(r.Err)
+		}
+		if got, want := r.Val, execValue; !got.Equal(want) {
+			t.Errorf("got %v, want %v", got, want)
+		}
+		if got, want := testutil.Exists(eval, intern.CacheKeys()...), true; got != want {
+			t.Errorf("got %v, want %v", got, want)
+		}
+		if got, want := testutil.Value(eval, exec.Digest()), execValue; !testutil.Exists(eval, exec.CacheKeys()...) || !got.Equal(want) {
+			t.Errorf("got %v, want %v", got, want)
+		}
 	}
 }
 
 func TestCacheLookupFilesetMigration(t *testing.T) {
 	*debug = true
 	intern := op.Intern("internurl")
-	exec := op.Exec("image", "command", testutil.Resources, intern)
-	extern := op.Extern("externurl", exec)
+	groupby := op.Groupby("(.*)", intern)
+	m := newMapper(func(f *flow.Flow) *flow.Flow {
+		return op.Exec("image", "command", testutil.Resources, f)
+	})
+	mapCollect := op.Map(m.mapFunc, groupby)
+	pullup := op.Pullup(mapCollect)
+	extern := op.Extern("externurl", pullup)
 
 	e, config, done := newTestScheduler()
 	defer done()
@@ -318,32 +320,30 @@ func TestCacheLookupFilesetMigration(t *testing.T) {
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
-	if err := eval.Assoc.Store(ctx, assoc.Fileset, exec.Digest(), v1Digest); err != nil {
+	if err := eval.Assoc.Store(ctx, assoc.Fileset, extern.Digest(), v1Digest); err != nil {
 		t.Fatal(err)
 	}
 	cancel()
 
 	ctx, cancel = context.WithTimeout(context.Background(), timeout)
 	rc := testutil.EvalAsync(ctx, eval)
-	e.Ok(ctx, extern, reflow.Fileset{})
 	r := <-rc
 	cancel()
 	if r.Err != nil {
 		t.Fatal(r.Err)
 	}
-	if !e.Equiv(extern) { // only extern is executed, eval will read v1 fileset format
-		t.Error("wrong set of expected flows")
+	if !e.Equiv() { //no flows to be executed, eval will read v1 fileset format
+		t.Error("did not expect any flows to be executed")
 	}
 	// expect eval to also writeback v2 format
-	fsWriteback := testutil.Value(eval, exec.Digest())
+	fsWriteback := testutil.Value(eval, extern.Digest())
 	if diff, nomatch := fsWriteback.Diff(fs); nomatch {
 		t.Errorf("expected writeback fs to match input fs but found following diff: %s", diff)
 	}
 }
 
 func TestCacheLookup(t *testing.T) {
-	// TopDown evaluation with cache entries for intern and execs.
 	intern := op.Intern("internurl")
 	groupby := op.Groupby("(.*)", intern)
 	m := newMapper(func(f *flow.Flow) *flow.Flow {
@@ -358,25 +358,18 @@ func TestCacheLookup(t *testing.T) {
 	config.CacheMode = infra.CacheRead | infra.CacheWrite
 	eval := flow.NewEval(extern, config)
-	// Write cache entries for intern and execs.
-	testutil.WriteCache(eval, intern.Digest(), "a", "b")
-	testutil.WriteCache(eval, m.mapFunc(flowFiles("a")).Digest(), "c")
-	testutil.WriteCache(eval, m.mapFunc(flowFiles("b")).Digest(), "d")
-
+	testutil.WriteCache(eval, extern.Digest())
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	rc := testutil.EvalAsync(ctx, eval)
-	// Prevent scheduler submission because externs are not cached.
-	e.Ok(ctx, extern, reflow.Fileset{})
 	r := <-rc
 	cancel()
 	if r.Err != nil {
 		t.Fatal(r.Err)
 	}
-	if !e.Equiv(extern) { // only the extern is executed.
-		t.Error("wrong set of expected flows")
+	if !e.Equiv() { //no flows to be executed
+		t.Error("did not expect any flows to be executed")
 	}
 
-	// TopDown evaluation with cache entry for intern.
 	e, config, done = newTestScheduler()
 	defer done()
 	config.CacheMode = infra.CacheRead | infra.CacheWrite
@@ -419,31 +412,25 @@ func TestCacheLookupWithAssertions(t *testing.T) {
 	defer done()
 	config.CacheMode = infra.CacheRead | infra.CacheWrite
 	config.CacheLookupTimeout = 100 * time.Millisecond
-	config.AssertionGenerator = newTestGenerator(map[string]string{"c1": "v1"})
+	config.AssertionGenerator = newTestGenerator(map[string]string{"c": "v1"})
 	config.Assert = reflow.AssertExact
 	eval := flow.NewEval(extern, config)
-	// Write cached results with the same values returned by the generator.
-	testutil.WriteCache(eval, intern.Digest(), "a1", "b1")
-	fs := testutil.WriteFiles(eval.Repository, "c1")
-	assertion := reflow.AssertionsFromEntry(
-		reflow.AssertionKey{Subject: "c1", Namespace: "blob"}, map[string]string{"etag": "v1"})
-	if err := fs.AddAssertions(assertion); err != nil {
-		t.Fatal(err)
-	}
-	testutil.WriteCacheFileset(eval, m.mapFunc(flowFiles("a1")).Digest(), fs)
-	testutil.WriteCacheFileset(eval, m.mapFunc(flowFiles("b1")).Digest(), fs)
+	// Write a cached result with same value returned by the generator.
+	fs := testutil.WriteFiles(eval.Repository, "c")
+	_ = fs.AddAssertions(reflow.AssertionsFromEntry(
+		reflow.AssertionKey{Subject: "c", Namespace: "blob"}, map[string]string{"etag": "v1"}))
+	testutil.WriteCacheFileset(eval, extern.Digest(), fs)
 
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	rc := testutil.EvalAsync(ctx, eval)
-	e.Ok(ctx, extern, reflow.Fileset{})
 	r := <-rc
 	cancel()
 	if r.Err != nil {
 		t.Fatal(r.Err)
 	}
-	if !e.Equiv(extern) {
-		t.Error("wrong set of expected flows")
+	if !e.Equiv() { //no flows to be executed
+		t.Error("did not expect any flows to be executed")
 	}
 
 	fmt.Println("finished first async eval")
@@ -452,40 +439,36 @@ func TestCacheLookupWithAssertions(t *testing.T) {
 	defer done()
 	config.CacheMode = infra.CacheRead | infra.CacheWrite
 	config.CacheLookupTimeout = 100 * time.Millisecond
-	config.AssertionGenerator = newTestGenerator(map[string]string{"c2": "v1"})
+	config.AssertionGenerator = newTestGenerator(map[string]string{"c": "v1"})
 	config.Assert = reflow.AssertExact
 	eval = flow.NewEval(extern, config)
-	testutil.WriteCache(eval, intern.Digest(), "a2", "b2")
+	testutil.WriteCache(eval, intern.Digest(), "a", "b")
 	// Write a cached result with different value returned by the generator.
-	fs = testutil.WriteFiles(eval.Repository, "c2")
-	assertion = reflow.AssertionsFromEntry(
-		reflow.AssertionKey{Subject: "c2", Namespace: "blob"}, map[string]string{"etag": "v2"})
-	if err := fs.AddAssertions(assertion); err != nil {
-		t.Fatal(err)
-	}
-	testutil.WriteCacheFileset(eval, m.mapFunc(flowFiles("a2")).Digest(), fs)
-	testutil.WriteCacheFileset(eval, m.mapFunc(flowFiles("b2")).Digest(), fs)
+	fs = testutil.WriteFiles(eval.Repository, "c")
+	_ = fs.AddAssertions(reflow.AssertionsFromEntry(
+		reflow.AssertionKey{Subject: "c", Namespace: "blob"}, map[string]string{"etag": "v2"}))
+	testutil.WriteCacheFileset(eval, extern.Digest(), fs)
 
 	ctx, cancel = context.WithTimeout(context.Background(), timeout)
 	rc = testutil.EvalAsync(ctx, eval)
 	for _, v := range []reflow.Fileset{
-		testutil.WriteFiles(e.Repo, "a2"),
-		testutil.WriteFiles(e.Repo, "b2"),
+		testutil.WriteFiles(e.Repo, "a"),
+		testutil.WriteFiles(e.Repo, "b"),
 	} {
 		v := v
 		f := m.mapFunc(&flow.Flow{Op: flow.Val, Value: values.T(v), State: flow.Done})
 		go e.Ok(ctx, f, v) // identity
 	}
-	testutil.WriteFiles(e.Repo, "c2")
-	e.Ok(ctx, extern, reflow.Fileset{})
+	testutil.WriteFiles(e.Repo, "c")
+	e.Ok(ctx, extern, fs)
 	r = <-rc
 	cancel()
 	if r.Err != nil {
 		t.Fatal(r.Err)
 	}
-	if !e.Equiv(extern, m.mapFunc(flowFiles("a2")), m.mapFunc(flowFiles("b2"))) {
+	if !e.Equiv(extern, m.mapFunc(flowFiles("a")), m.mapFunc(flowFiles("b"))) {
 		t.Error("wrong set of expected flows")
 	}
 
@@ -493,42 +476,38 @@ func TestCacheLookupWithAssertions(t *testing.T) {
 	defer done()
 	config.CacheMode = infra.CacheRead | infra.CacheWrite
 	config.CacheLookupTimeout = 100 * time.Millisecond
-	config.AssertionGenerator = newTestGenerator(map[string]string{"c3": "v1"})
+	config.AssertionGenerator = newTestGenerator(map[string]string{"c": "v1"})
 	config.Assert = reflow.AssertExact
 	eval = flow.NewEval(extern, config)
-	testutil.WriteCache(eval, intern.Digest(), "a3", "b3")
+	testutil.WriteCache(eval, intern.Digest(), "a", "b")
 	// Write a cached result with an assertion for which the generator will return an error.
-	fs = testutil.WriteFiles(eval.Repository, "c3")
-	assertion = reflow.AssertionsFromEntry(
-		reflow.AssertionKey{Subject: "c3", Namespace: "error"}, map[string]string{"etag": "v"})
-	if err := fs.AddAssertions(assertion); err != nil {
-		t.Fatal(err)
-	}
-	testutil.WriteCacheFileset(eval, m.mapFunc(flowFiles("a3")).Digest(), fs)
-	testutil.WriteCacheFileset(eval, m.mapFunc(flowFiles("b3")).Digest(), fs)
+	fs = testutil.WriteFiles(eval.Repository, "c")
+	_ = fs.AddAssertions(reflow.AssertionsFromEntry(
+		reflow.AssertionKey{Subject: "c", Namespace: "error"}, map[string]string{"etag": "v"}))
+	testutil.WriteCacheFileset(eval, extern.Digest(), fs)
 
 	ctx, cancel = context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 	rc = testutil.EvalAsync(ctx, eval)
 	for _, v := range []reflow.Fileset{
-		testutil.WriteFiles(e.Repo, "a3"),
-		testutil.WriteFiles(e.Repo, "b3"),
+		testutil.WriteFiles(e.Repo, "a"),
+		testutil.WriteFiles(e.Repo, "b"),
 	} {
 		v := v
 		f := m.mapFunc(&flow.Flow{Op: flow.Val, Value: values.T(v), State: flow.Done})
 		go e.Ok(ctx, f, v) // identity
 	}
-	testutil.WriteFiles(e.Repo, "c3")
+	testutil.WriteFiles(e.Repo, "c")
 	e.Ok(ctx, extern, fs)
 	r = <-rc
 	if r.Err != nil {
 		t.Fatal(r.Err)
 	}
-	if !e.Equiv(extern, m.mapFunc(flowFiles("a3")), m.mapFunc(flowFiles("b3"))) {
+	if !e.Equiv(extern, m.mapFunc(flowFiles("a")), m.mapFunc(flowFiles("b"))) {
 		t.Error("wrong set of expected flows")
 	}
 }
@@ -636,6 +615,7 @@ func TestCacheLookupBottomupPhysical(t *testing.T) {
 	internFiles, execFiles := "a:same_contents", "same_exec_result"
 	testutil.WriteCache(eval, internA.Digest(), internFiles)
 	testutil.WriteFile(e.Repo, execFiles)
+	testutil.WriteCache(eval, extern.Digest(), "extern_result")
 
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
@@ -653,12 +633,11 @@ func TestCacheLookupBottomupPhysical(t *testing.T) {
 	time.Sleep(200 * time.Millisecond)
 	// Now define internB's result (same as internA)
 	e.Ok(ctx, internB, testutil.WriteFiles(e.Repo, internFiles))
-	e.Ok(ctx, extern, reflow.Fileset{})
 	r := <-rc
 	if r.Err != nil {
 		t.Fatal(r.Err)
 	}
-	if !e.Equiv(extern, execA, internB) {
+	if !e.Equiv(execA, internB) {
 		t.Error("wrong set of expected flows")
 	}
 }
@@ -693,25 +672,16 @@ func TestCacheLookupBottomupWithAssertions(t *testing.T) {
 	fsA, fsB, fsC := testutil.Files("a"), testutil.Files("b"), testutil.Files("c")
 	// "a" has "v1" and will get "v1" from the generator, so the cache hit will be accepted.
-	assertionA := reflow.AssertionsFromEntry(
-		reflow.AssertionKey{Subject: "a", Namespace: "blob"}, map[string]string{"etag": "v1"})
-	if err := fsA.AddAssertions(assertionA); err != nil {
-		t.Fatal(err)
-	}
+	_ = fsA.AddAssertions(reflow.AssertionsFromEntry(
+		reflow.AssertionKey{Subject: "a", Namespace: "blob"}, map[string]string{"etag": "v1"}))
 	testutil.WriteCacheFileset(eval, m.mapFunc(flowFiles("a")).Digest(), fsA)
 	// "b" has "v2" but will get "v1" from the generator, so the cache hit will be rejected.
-	assertionB := reflow.AssertionsFromEntry(
-		reflow.AssertionKey{Subject: "b", Namespace: "blob"}, map[string]string{"etag": "v2"})
-	if err := fsB.AddAssertions(assertionB); err != nil {
-		t.Fatal(err)
-	}
+	_ = fsB.AddAssertions(reflow.AssertionsFromEntry(
+		reflow.AssertionKey{Subject: "b", Namespace: "blob"}, map[string]string{"etag": "v2"}))
 	testutil.WriteCacheFileset(eval, m.mapFunc(flowFiles("b")).Digest(), fsB)
 	// "c" has "error" in the namespace so the generator will error out, so the cache hit will be rejected.
-	assertionC := reflow.AssertionsFromEntry(
-		reflow.AssertionKey{Subject: "c", Namespace: "error"}, map[string]string{"etag": "v"})
-	if err := fsC.AddAssertions(assertionC); err != nil {
-		t.Fatal(err)
-	}
+	_ = fsC.AddAssertions(reflow.AssertionsFromEntry(
+		reflow.AssertionKey{Subject: "c", Namespace: "error"}, map[string]string{"etag": "v"}))
 	testutil.WriteCacheFileset(eval, m.mapFunc(flowFiles("c")).Digest(), fsC)
 
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
@@ -753,21 +723,12 @@ func TestCacheLookupBottomupWithAssertExact(t *testing.T) {
 	fsA, fsB, fsC := testutil.Files("a"), testutil.Files("b"), testutil.Files("c")
 	// All three will be cache-hits.
-	assertionA := reflow.AssertionsFromEntry(
-		reflow.AssertionKey{Subject: "a", Namespace: "blob"}, map[string]string{"etag": "va"})
-	if err := fsA.AddAssertions(assertionA); err != nil {
-		t.Fatal(err)
-	}
-	assertionB := reflow.AssertionsFromEntry(
-		reflow.AssertionKey{Subject: "b", Namespace: "blob"}, map[string]string{"etag": "vb"})
-	if err := fsB.AddAssertions(assertionB); err != nil {
-		t.Fatal(err)
-	}
-	assertionC := reflow.AssertionsFromEntry(
-		reflow.AssertionKey{Subject: "c", Namespace: "blob"}, map[string]string{"etag": "vc"})
-	if err := fsC.AddAssertions(assertionC); err != nil {
-		t.Fatal(err)
-	}
+	_ = fsA.AddAssertions(reflow.AssertionsFromEntry(
+		reflow.AssertionKey{Subject: "a", Namespace: "blob"}, map[string]string{"etag": "va"}))
+	_ = fsB.AddAssertions(reflow.AssertionsFromEntry(
+		reflow.AssertionKey{Subject: "b", Namespace: "blob"}, map[string]string{"etag": "vb"}))
+	_ = fsC.AddAssertions(reflow.AssertionsFromEntry(
+		reflow.AssertionKey{Subject: "c", Namespace: "blob"}, map[string]string{"etag": "vc"}))
 	testutil.WriteCacheFileset(eval, mapFunc(flowFiles("a")).Digest(), fsA)
 	testutil.WriteCacheFileset(eval, mapFunc(flowFiles("b")).Digest(), fsB)
 	testutil.WriteCacheFileset(eval, mapFunc(flowFiles("c")).Digest(), fsC)
@@ -786,7 +747,6 @@ func TestCacheLookupBottomupWithAssertExact(t *testing.T) {
 }
 
 func TestCacheLookupBottomupWithAssertNever(t *testing.T) {
-	// TopDown evaluation with filesets containing invalid assertions.
 	intern := op.Intern("internurl")
 	groupby := op.Groupby("(.*)", intern)
 	m := newMapper(func(f *flow.Flow) *flow.Flow {
@@ -804,35 +764,23 @@ func TestCacheLookupBottomupWithAssertNever(t *testing.T) {
 	config.Assert = reflow.AssertNever
 	eval := flow.NewEval(extern, config)
 
-	testutil.WriteCache(eval, intern.Digest(), "a", "b")
-	fsA := testutil.Files("a")
-	assertionA := reflow.AssertionsFromEntry(
-		reflow.AssertionKey{Subject: "a", Namespace: "blob"}, map[string]string{"etag": "invalid"})
-	if err := fsA.AddAssertions(assertionA); err != nil {
-		t.Fatal(err)
-	}
-	testutil.WriteCacheFileset(eval, m.mapFunc(flowFiles("a")).Digest(), fsA)
-	fsB := testutil.Files("b")
-	assertionB := reflow.AssertionsFromEntry(
-		reflow.AssertionKey{Subject: "b", Namespace: "blob"}, map[string]string{"etag": "invalid"})
-	if err := fsB.AddAssertions(assertionB); err != nil {
-		t.Fatal(err)
-	}
-	testutil.WriteCacheFileset(eval, m.mapFunc(flowFiles("b")).Digest(), fsB)
+	fs := testutil.Files("e")
+	_ = fs.AddAssertions(reflow.AssertionsFromEntry(
+		reflow.AssertionKey{Subject: "e", Namespace: "blob"}, map[string]string{"etag": "invalid"}))
+	testutil.WriteCacheFileset(eval, extern.Digest(), fs)
+	testutil.WriteCache(eval, extern.Digest())
 
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	rc := testutil.EvalAsync(ctx, eval)
-	e.Ok(ctx, extern, reflow.Fileset{})
 	r := <-rc
 	cancel()
 	if r.Err != nil {
 		t.Fatal(r.Err)
 	}
-	if !e.Equiv(extern) {
-		t.Error("wrong set of expected flows")
+	if !e.Equiv() { //no flows to be executed
+		t.Error("did not expect any flows to be executed")
 	}
 
-	// BottomUp evaluation with filesets containing invalid assertions.
 	e, config, done = newTestScheduler()
 	defer done()
 	config.CacheMode = infra.CacheRead | infra.CacheWrite
@@ -853,35 +801,27 @@ func TestCacheLookupBottomupWithAssertNever(t *testing.T) {
 	fsA, fsB, fsC := testutil.Files("a"), testutil.Files("b"), testutil.Files("c")
 	// All three will be cache-hits.
-	assertionA = reflow.AssertionsFromEntry(
-		reflow.AssertionKey{Subject: "a", Namespace: "blob"}, map[string]string{"etag": "invalid"})
-	if err := fsA.AddAssertions(assertionA); err != nil {
-		t.Fatal(err)
-	}
-	assertionB = reflow.AssertionsFromEntry(
-		reflow.AssertionKey{Subject: "b", Namespace: "blob"}, map[string]string{"etag": "invalid"})
-	if err := fsB.AddAssertions(assertionB); err != nil {
-		t.Fatal(err)
-	}
-	assertionC := reflow.AssertionsFromEntry(
-		reflow.AssertionKey{Subject: "c", Namespace: "blob"}, map[string]string{"etag": "invalid"})
-	if err := fsC.AddAssertions(assertionC); err != nil {
-		t.Fatal(err)
-	}
+	_ = fsA.AddAssertions(reflow.AssertionsFromEntry(
+		reflow.AssertionKey{Subject: "a", Namespace: "blob"}, map[string]string{"etag": "invalid"}))
+	_ = fsB.AddAssertions(reflow.AssertionsFromEntry(
+		reflow.AssertionKey{Subject: "b", Namespace: "blob"}, map[string]string{"etag": "invalid"}))
+	_ = fsC.AddAssertions(reflow.AssertionsFromEntry(
+		reflow.AssertionKey{Subject: "c", Namespace: "blob"}, map[string]string{"etag": "invalid"}))
 	testutil.WriteCacheFileset(eval, m.mapFunc(flowFiles("a")).Digest(), fsA)
 	testutil.WriteCacheFileset(eval, m.mapFunc(flowFiles("b")).Digest(), fsB)
 	testutil.WriteCacheFileset(eval, m.mapFunc(flowFiles("c")).Digest(), fsC)
+	// extern also has a cached result.
+	testutil.WriteCache(eval, extern.Digest())
 
 	ctx, cancel = context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 	rc = testutil.EvalAsync(ctx, eval)
-	e.Ok(ctx, extern, reflow.Fileset{})
 	r = <-rc
 	if r.Err != nil {
 		t.Fatal(r.Err)
 	}
-	if !e.Equiv(extern) {
-		t.Error("wrong set of expected flows")
+	if !e.Equiv() { //no flows to be executed
+		t.Error("did not expect any flows to be executed")
 	}
 }
 
@@ -914,89 +854,43 @@ func TestCacheLookupMissing(t *testing.T) {
 	}
 }
 
-func TestExtern(t *testing.T) {
+func TestNoCacheExtern(t *testing.T) {
 	for _, bottomup := range []bool{false, true} {
-		// The following is done in a func to defer context cancellations.
-		func() {
-			intern := op.Intern("internurl")
-			groupby := op.Groupby("(.*)", intern)
-			m := newMapper(func(f *flow.Flow) *flow.Flow {
-				return op.Exec("image", "command", testutil.Resources, f)
-			})
-			mapCollect := op.Map(m.mapFunc, groupby)
-			pullup := op.Pullup(mapCollect)
-			extern := op.Extern("externurl", pullup)
-
-			e, config, done := newTestScheduler()
-			defer done()
-			config.CacheMode = infra.CacheRead | infra.CacheWrite
-			config.BottomUp = bottomup
-			eval := flow.NewEval(extern, config)
-
-			testutil.WriteCache(eval, intern.Digest(), "a", "b")
-			ctx, cancel := context.WithTimeout(context.Background(), timeout)
-			defer cancel()
-			rc := testutil.EvalAsync(ctx, eval)
-			for _, v := range []reflow.Fileset{
-				testutil.WriteFiles(e.Repo, "a"),
-				testutil.WriteFiles(e.Repo, "b"),
-			} {
-				f := &flow.Flow{Op: flow.Val, Value: values.T(v), State: flow.Done}
-				go e.Ok(ctx, m.mapFunc(f), v)
-			}
+		intern := op.Intern("internurl")
+		groupby := op.Groupby("(.*)", intern)
+		m := newMapper(func(f *flow.Flow) *flow.Flow {
+			return op.Exec("image", "command", testutil.Resources, f)
+		})
+		mapCollect := op.Map(m.mapFunc, groupby)
+		pullup := op.Pullup(mapCollect)
+		extern := op.Extern("externurl", pullup)
+
+		e, config, done := newTestScheduler()
+		defer done()
+		config.CacheMode = infra.CacheRead | infra.CacheWrite
+		config.BottomUp = bottomup
+		config.NoCacheExtern = true
+		eval := flow.NewEval(extern, config)
+
+		testutil.WriteCache(eval, intern.Digest(), "a", "b")
+		ctx, cancel := context.WithTimeout(context.Background(), timeout)
+		defer cancel()
+		rc := testutil.EvalAsync(ctx, eval)
+		for _, v := range []reflow.Fileset{
+			testutil.WriteFiles(e.Repo, "a"),
+			testutil.WriteFiles(e.Repo, "b"),
+		} {
+			f := &flow.Flow{Op: flow.Val, Value: values.T(v), State: flow.Done}
+			go e.Ok(ctx, m.mapFunc(f), v)
+		}
 
-			e.Ok(ctx, extern, reflow.Fileset{})
-			r := <-rc
-			if r.Err != nil {
-				t.Fatal(r.Err)
-			}
-			if !e.Equiv(extern, m.mapFunc(flowFiles("a")), m.mapFunc(flowFiles("b"))) {
-				t.Error("wrong set of expected flows")
-			}
-			// Assert that we don't write cache entries for externs.
-			want := errors.E(errors.NotExist)
-			if _, _, err := eval.Assoc.Get(ctx, assoc.Fileset, extern.Digest()); !errors.Match(want, err) {
-				t.Fatal("did not expect to write a cache entry for extern")
-			}
-			if _, _, err := eval.Assoc.Get(ctx, assoc.FilesetV2, extern.Digest()); !errors.Match(want, err) {
-				t.Fatal("did not expect to write a cache entry for extern")
-			}
-		}()
-	}
-}
+		e.Ok(ctx, extern, reflow.Fileset{})
+		r := <-rc
+		if r.Err != nil {
+			t.Fatal(r.Err)
+		}
 
-func TestNoCacheExternDeprecation(t *testing.T) {
-	// Assert that extern cache entries written before deprecating the
-	// nocacheextern flag are not used in new evaluations.
-	for _, bottomup := range []bool{false, true} {
-		// The following is done in a func to defer context cancellations.
-		func() {
-			intern := op.Intern("internurl")
-			exec := op.Exec("image", "command", testutil.Resources, intern)
-			extern := op.Extern("externurl", exec)
-
-			e, config, done := newTestScheduler()
-			defer done()
-			config.CacheMode = infra.CacheRead | infra.CacheWrite
-			config.BottomUp = bottomup
-			eval := flow.NewEval(extern, config)
-
-			testutil.WriteCache(eval, intern.Digest(), "a")
-			testutil.WriteCache(eval, exec.Digest())
-			testutil.WriteCache(eval, extern.Digest()) // existing cache entry should not be used.
-
-			ctx, cancel := context.WithTimeout(context.Background(), timeout)
-			defer cancel()
-			rc := testutil.EvalAsync(ctx, eval)
-			e.Ok(ctx, extern, reflow.Fileset{})
-			r := <-rc
-			if r.Err != nil {
-				t.Fatal(r.Err)
-			}
-			if !e.Equiv(extern) {
-				t.Error("wrong set of expected flows")
-			}
-		}()
+		// TODO(swami): Add assertion that the 'extern' flow result isn't cached.
 	}
 }
 
diff --git a/repository/blobrepo/blobrepo.go b/repository/blobrepo/blobrepo.go
index 4c8d4fe2..ae162911 100644
--- a/repository/blobrepo/blobrepo.go
+++ b/repository/blobrepo/blobrepo.go
@@ -63,6 +63,20 @@ func (r *Repository) Stat(ctx context.Context, id digest.Digest) (reflow.File, e
 	return file, err
 }
 
+// Location returns the location of this object.
+func (r *Repository) Location(ctx context.Context, id digest.Digest) (string, error) {
+	id, err := r.resolve(ctx, id)
+	if err != nil {
+		return "", err
+	}
+	var src string
+	file, err := r.Bucket.File(ctx, path.Join(r.Prefix, objectsPath, id.String()))
+	if err == nil {
+		src = file.Source
+	}
+	return src, err
+}
+
 // Get retrieves an object from the repository.
 func (r *Repository) Get(ctx context.Context, id digest.Digest) (io.ReadCloser, error) {
 	id, err := r.resolve(ctx, id)
diff --git a/runtime/runflags.go b/runtime/runflags.go
index 18da3ad5..9ff9905f 100644
--- a/runtime/runflags.go
+++ b/runtime/runflags.go
@@ -18,11 +18,9 @@ type FlagName string
 const (
 	Unknown FlagName = "unknown"
 	// CommonRunFlags flag names
-	FlagNameAssert       FlagName = "assert"
-	FlagNameEvalStrategy FlagName = "eval"
-	FlagNameInvalidate   FlagName = "invalidate"
-	// Deprecated: externs are no longer cached.
-	// TODO(pfialho): remove flag.
+	FlagNameAssert          FlagName = "assert"
+	FlagNameEvalStrategy    FlagName = "eval"
+	FlagNameInvalidate      FlagName = "invalidate"
 	FlagNameNoCacheExtern   FlagName = "nocacheextern"
 	FlagNamePostUseChecksum FlagName = "postusechecksum"
 	FlagNameRecomputeEmpty  FlagName = "recomputeempty"
@@ -41,6 +39,8 @@ type CommonRunFlags struct {
 	EvalStrategy string
 	// Invalidate is a regular expression for node identifiers that should be invalidated.
 	Invalidate string
+	// NoCacheExtern indicates if extern operations should be written to cache.
+	NoCacheExtern bool
 	// PostUseChecksum indicates whether input filesets are checksummed after use.
 	PostUseChecksum bool
 	// RecomputeEmpty indicates if cache results with empty filesets be automatically recomputed.
@@ -104,11 +104,13 @@ Reach out to the reflow team for more information.`)
 		flags.StringVar(&r.Invalidate, prefix+"invalidate", "", "regular expression for node identifiers that should be invalidated")
 	}
 	if names == nil || names[FlagNameNoCacheExtern] {
-		var vacuum bool
-		flags.BoolVar(&vacuum, prefix+string(FlagNameNoCacheExtern), false, `don't cache extern ops
+		// TODO(pboyapalli): [SYSINFRA-554] modify extern caching so that we can drop the nocacheextern flag
+		flags.BoolVar(&r.NoCacheExtern, prefix+string(FlagNameNoCacheExtern), false, `don't cache extern ops
 
-DEPRECATED. This flag may still be set, but the program will always behave as if
-"nocacheextern=true" because externs are no longer cached.`)
+For all extern operations, "nocacheextern=true" acts like "cache=off" and
+"nocacheextern=false" acts like "cache=readwrite". With "nocacheextern=true",
+extern operations that have a cache hit will result in the fileset being copied
+to the extern URL again.`)
 	}
 	if names == nil || names[FlagNamePostUseChecksum] {
 		flags.BoolVar(&r.PostUseChecksum, prefix+string(FlagNamePostUseChecksum), false, `checksum exec input files after use
@@ -150,6 +152,7 @@ func (r *CommonRunFlags) Configure(c *flow.EvalConfig) (err error) {
 	if c.Assert, err = asserter(r.Assert); err != nil {
 		return err
 	}
+	c.NoCacheExtern = r.NoCacheExtern
 	c.RecomputeEmpty = r.RecomputeEmpty
 	c.BottomUp = r.EvalStrategy == "bottomup"
 	c.PostUseChecksum = r.PostUseChecksum
diff --git a/sched/scheduler.go b/sched/scheduler.go
index 82637db4..520bba5d 100644
--- a/sched/scheduler.go
+++ b/sched/scheduler.go
@@ -62,6 +62,11 @@ type Cluster interface {
 	CanAllocate(reflow.Resources) (bool, error)
 }
 
+// blobLocator defines an interface for locating blobs.
+type blobLocator interface {
+	Location(ctx context.Context, id digest.Digest) (string, error)
+}
+
 // A Scheduler is responsible for managing a set of tasks and allocs,
 // assigning (and reassigning) tasks to appropriate allocs. Scheduler
 // can manage large numbers of tasks and allocs efficiently.
@@ -792,77 +797,48 @@ func (s *Scheduler) doDirectTransfer(ctx context.Context, task *Task, taskLogger
 		return errors.E(errors.Precondition,
 			errors.Errorf("unexpected args (must be 1, but was %d): %v", len(task.Config.Args), task.Config.Args))
 	}
+	// Check if the task's repository supports blobLocator.
+	fileLocator, ok := task.Repository.(blobLocator)
+	if !ok {
+		return errors.E(errors.NotSupported, errors.New("scheduler repository does not support locating blobs"))
+	}
 	// Check if the destination is a blob store.
 	if _, _, err := s.Mux.Bucket(ctx, task.Config.URL); err != nil {
 		return err
 	}
-	// Get existing keys at destination path.
+
 	extUrl := strings.TrimSuffix(task.Config.URL, "/")
-	const withMetadata = false
-	scan, err := s.Mux.Scan(ctx, extUrl, withMetadata)
-	if err != nil {
-		return err
-	}
-	existing := make(map[string]reflow.File)
-	for scan.Scan(ctx) {
-		f := scan.File()
-		existing[f.Source] = f
-	}
-	if err = scan.Err(); err != nil {
-		return err
-	}
-	// Determine which files need to be transferred.
 	fs := task.Config.Args[0].Fileset.Pullup()
-	task.mu.Lock()
-	task.Result.Fileset.Map = make(map[string]reflow.File, len(fs.Map))
-	task.mu.Unlock()
+
 	var transfers []directTransfer
-	g, gctx := errgroup.WithContext(ctx)
 	for k, v := range fs.Map {
-		filename, srcFile := k, v
-		g.Go(func() error {
-			// A resolved file does not contain the metadata needed for the
-			// Mux.*Transfer methods.
-			var srcRef reflow.File
-			if !srcFile.IsRef() {
-				var err error
-				srcRef, err = task.Repository.Stat(gctx, srcFile.ID)
-				if err != nil {
-					return err
-				}
-			} else {
-				srcRef = srcFile
-			}
-			dstUrl := extUrl + "/" + filename
-			if filename == "." {
-				dstUrl = extUrl
-			}
-			if ok, err := s.Mux.CanTransfer(gctx, dstUrl, srcRef.Source); !ok {
-				return errors.E(fmt.Sprintf("scheduler cannot direct transfer: %s -> %s", srcRef.Source, dstUrl), err)
-			}
-			if dstRef, ok := existing[dstUrl]; ok {
-				need, err := s.Mux.NeedTransfer(gctx, dstRef, srcRef)
-				if err != nil {
-					taskLogger.Errorf("scheduler cannot determine if direct transfer is needed: %s -> %s: %v", srcRef.Source, dstUrl, err)
-				} else if !need {
-					// Skip transfer.
-					task.mu.Lock()
-					task.Result.Fileset.Map[filename] = srcFile
-					task.mu.Unlock()
-					return nil
-				}
-			}
-			task.mu.Lock()
-			transfers = append(transfers, directTransfer{filename, srcFile, srcRef.Source, dstUrl})
-			task.mu.Unlock()
-			return nil
-		})
-	}
-	err = g.Wait()
-	if err != nil {
-		return err
+		filename, file := k, v
+		var srcUrl string
+		if !file.IsRef() {
+			// resolved file
+			if src, err := fileLocator.Location(ctx, file.ID); err != nil {
+				return err
+			} else {
+				srcUrl = src
+			}
+		} else {
+			// reference file
+			srcUrl = file.Source
+		}
+		dstUrl := extUrl + "/" + filename
+		if filename == "." {
+			dstUrl = extUrl
+		}
+		if ok, err := s.Mux.CanTransfer(ctx, dstUrl, srcUrl); !ok {
+			return errors.E(fmt.Sprintf("scheduler cannot direct transfer: %s -> %s", srcUrl, dstUrl), err)
+		}
+		transfers = append(transfers, directTransfer{filename, file, srcUrl, dstUrl})
 	}
+
+	task.mu.Lock()
+	task.Result.Fileset.Map = map[string]reflow.File{}
+	task.mu.Unlock()
+
 	var (
 		failed          []directTransfer
 		stalledAttempts int
 	)
 	const maxStalledAttempts = 3
 	for stalledAttempts <= maxStalledAttempts && len(transfers) > 0 {
-		g, gctx = errgroup.WithContext(ctx)
+		g, gctx := errgroup.WithContext(ctx)
 		for _, t := range transfers {
 			taskLogger.Debugf("%d transfers remaining, stalled attempts: %d/%d", len(transfers), stalledAttempts, maxStalledAttempts)
 			t := t
diff --git a/sched/scheduler_test.go b/sched/scheduler_test.go
index 1dadf65d..f27e2ba7 100644
--- a/sched/scheduler_test.go
+++ b/sched/scheduler_test.go
@@ -666,12 +666,12 @@ func TestSchedulerDirectTransfer(t *testing.T) {
 	scheduler.Mux = blob.Mux{"test": blb}
 	defer shutdown()
 	ctx := context.Background()
-	repo := testutil.NewInmemoryRepository("")
+	repo := testutil.NewInmemoryLocatorRepository()
 	in := utiltest.RandomFileset(repo)
 	expectExists(t, repo, in)
 	for _, f := range in.Files() {
 		loc := fmt.Sprintf("test://bucketin/objects/%s", f.ID)
-		_ = repo.SetLocation(ctx, f.ID, loc)
+		repo.SetLocation(f.ID, loc)
 		rc, _ := repo.Get(ctx, f.ID)
 		_ = scheduler.Mux.Put(ctx, loc, f.Size, rc, "")
 	}
@@ -720,12 +720,12 @@ func TestSchedulerDirectTransferRetryableErrorsProgress(t *testing.T) {
 	scheduler.Mux = blob.Mux{"test": blb}
 	defer shutdown()
 	ctx := context.Background()
-	repo := testutil.NewInmemoryRepository("")
+	repo := testutil.NewInmemoryLocatorRepository()
 	in := utiltest.RandomFileset(repo)
 	expectExists(t, repo, in)
 	for _, f := range in.Files() {
 		loc := fmt.Sprintf("test://bucketin/objects/%s", f.ID)
-		_ = repo.SetLocation(ctx, f.ID, loc)
+		repo.SetLocation(f.ID, loc)
 		rc, _ := repo.Get(ctx, f.ID)
 		_ = scheduler.Mux.Put(ctx, loc, f.Size, rc, "")
 	}
@@ -762,12 +762,12 @@ func TestSchedulerDirectTransferRetryableErrorsStalled(t *testing.T) {
 	scheduler.Mux = blob.Mux{"test": blb}
 	defer shutdown()
 	ctx := context.Background()
-	repo := testutil.NewInmemoryRepository("")
+	repo := testutil.NewInmemoryLocatorRepository()
 	in := utiltest.RandomFileset(repo)
 	expectExists(t, repo, in)
 	for _, f := range in.Files() {
 		loc := fmt.Sprintf("test://bucketin/objects/%s", f.ID)
-		_ = repo.SetLocation(ctx, f.ID, loc)
+		repo.SetLocation(f.ID, loc)
 		rc, _ := repo.Get(ctx, f.ID)
 		_ = scheduler.Mux.Put(ctx, loc, f.Size, rc, "")
 	}
@@ -801,7 +801,7 @@ func TestSchedulerDirectTransfer_noLocator(t *testing.T) {
 }
 
 func TestSchedulerDirectTransfer_unresolvedFile(t *testing.T) {
-	repo := testutil.NewInmemoryRepository("")
+	repo := testutil.NewInmemoryLocatorRepository()
 	scheduler, _, shutdown := newTestScheduler(t)
 	blb1, blb2 := testblob.New("test"), testblob.New("test2")
 	scheduler.Mux = blob.Mux{"test": blb1, "test2": blb2}
@@ -811,7 +811,7 @@ func TestSchedulerDirectTransfer_unresolvedFile(t *testing.T) {
 	ctx := context.Background()
 	for _, f := range in.Files() {
 		loc := fmt.Sprintf("test://bucketin/objects/%s", f.ID)
-		_ = repo.SetLocation(ctx, f.ID, loc)
+		repo.SetLocation(f.ID, loc)
 		rc, _ := repo.Get(ctx, f.ID)
 		_ = scheduler.Mux.Put(ctx, loc, f.Size, rc, "")
 	}
diff --git a/test/testutil/repository.go b/test/testutil/repository.go
index 588a3642..ce6d9e55 100644
--- a/test/testutil/repository.go
+++ b/test/testutil/repository.go
@@ -26,15 +26,10 @@ import (
 
 //go:generate stringer -type=RepositoryCallKind
 
-type FileWithContents struct {
-	reflow.File
-	contents []byte
-}
-
 // InmemoryRepository is an in-memory repository used for testing.
 type InmemoryRepository struct {
 	mu    sync.Mutex
-	files map[digest.Digest]FileWithContents
+	files map[digest.Digest][]byte
 	url   *url.URL
 }
@@ -63,7 +58,7 @@ func NewInmemoryRepository(name string) *InmemoryRepository {
 		return nil
 	}
 	repo := &InmemoryRepository{
-		files: map[digest.Digest]FileWithContents{},
+		files: map[digest.Digest][]byte{},
 		url:   url,
 	}
 	repos[name] = repo
@@ -76,16 +71,16 @@ func GetInMemoryRepository(repo *url.URL) *InmemoryRepository {
 	return repos[repo.Host]
 }
 
-func (r *InmemoryRepository) get(k digest.Digest) (FileWithContents, bool) {
+func (r *InmemoryRepository) get(k digest.Digest) []byte {
 	r.mu.Lock()
-	b, ok := r.files[k]
+	b := r.files[k]
 	r.mu.Unlock()
-	return b, ok
+	return b
 }
 
-func (r *InmemoryRepository) set(k digest.Digest, f FileWithContents) {
+func (r *InmemoryRepository) set(k digest.Digest, b []byte) {
 	r.mu.Lock()
-	r.files[k] = f
+	r.files[k] = b
 	r.mu.Unlock()
 }
@@ -98,31 +93,20 @@ func (r *InmemoryRepository) Delete(_ context.Context, id digest.Digest) {
 
 // Stat returns metadata for the blob named by id.
 func (r *InmemoryRepository) Stat(_ context.Context, id digest.Digest) (reflow.File, error) {
-	f, ok := r.get(id)
-	if !ok {
+	b := r.get(id)
+	if b == nil {
 		return reflow.File{}, errors.E("stat", id, errors.NotExist)
 	}
-	return f.File, nil
-}
-
-// SetLocation returns metadata for the blob named by id.
-func (r *InmemoryRepository) SetLocation(_ context.Context, id digest.Digest, loc string) error {
-	f, ok := r.get(id)
-	if !ok {
-		return errors.E("stat", id, errors.NotExist)
-	}
-	f.File.Source = loc
-	r.set(id, f)
-	return nil
+	return reflow.File{ID: id, Size: int64(len(b))}, nil
 }
 
 // Get returns the blob named by id.
 func (r *InmemoryRepository) Get(_ context.Context, id digest.Digest) (io.ReadCloser, error) {
-	b, ok := r.get(id)
-	if !ok {
+	b := r.get(id)
+	if b == nil {
 		return nil, errors.E("get", id, errors.NotExist)
 	}
-	return ioutil.NopCloser(bytes.NewReader(b.contents)), nil
+	return ioutil.NopCloser(bytes.NewReader(b)), nil
 }
 
 // Put installs the blob rd and returns its digest.
@@ -132,8 +116,7 @@ func (r *InmemoryRepository) Put(_ context.Context, rd io.Reader) (digest.Digest
 		return digest.Digest{}, err
 	}
 	id := reflow.Digester.FromBytes(b)
-	f := FileWithContents{reflow.File{ID: id, Size: int64(len(b))}, b}
-	r.set(id, f)
+	r.set(id, b)
 	return id, nil
 }
@@ -172,11 +155,7 @@ func (r *InmemoryRepository) URL() *url.URL {
 }
 
 func (r *InmemoryRepository) RawFiles() map[digest.Digest][]byte {
-	m := make(map[digest.Digest][]byte, len(r.files))
-	for k, v := range r.files {
-		m[k] = v.contents
-	}
-	return m
+	return r.files
 }
 
 func (r *InmemoryRepository) Copy() *InmemoryRepository {
@@ -199,6 +178,36 @@ func (r *InmemoryRepository) Vacuum(ctx context.Context, repo *InmemoryRepositor
 	repo.files = nil
 }
 
+// InmemoryLocatorRepository is an in-memory repository used for testing which also implements scheduler.blobLocator.
+type InmemoryLocatorRepository struct {
+	*InmemoryRepository
+	locations map[digest.Digest]string
+}
+
+// NewInmemoryLocatorRepository returns a new repository that stores objects
+// in memory.
+func NewInmemoryLocatorRepository() *InmemoryLocatorRepository {
+	return &InmemoryLocatorRepository{
+		InmemoryRepository: NewInmemoryRepository(""),
+		locations:          map[digest.Digest]string{},
+	}
+}
+
+func (r *InmemoryLocatorRepository) SetLocation(k digest.Digest, loc string) {
+	r.mu.Lock()
+	r.locations[k] = loc
+	r.mu.Unlock()
+}
+
+// Location implements scheduler.blobLocator
+func (r *InmemoryLocatorRepository) Location(ctx context.Context, id digest.Digest) (string, error) {
+	loc := r.locations[id]
+	if loc == "" {
+		return "", errors.E(errors.NotExist, fmt.Errorf("unknown %v", id))
+	}
+	return loc, nil
+}
+
 // RepositoryCallKind indicates the type of repository call.
 type RepositoryCallKind int
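
The most subtle piece of this revert is the scheduler's direct-transfer path: rather than scanning the destination bucket and comparing ETags, doDirectTransfer again asks the repository where each blob lives, discovering that capability through a type assertion against the unexported blobLocator interface. The following is a minimal, self-contained sketch of that optional-capability pattern; only Location's shape and the type-assertion step mirror the patch, while digestT, locatorRepo, and main are hypothetical stand-ins added so the sketch compiles on its own.

package main

import (
	"context"
	"errors"
	"fmt"
)

// digestT stands in for digest.Digest to keep the sketch self-contained.
type digestT string

// blobLocator mirrors the unexported interface restored in sched/scheduler.go.
type blobLocator interface {
	Location(ctx context.Context, id digestT) (string, error)
}

// locatorRepo plays the role of testutil.InmemoryLocatorRepository.
type locatorRepo struct {
	locations map[digestT]string
}

func (r *locatorRepo) Location(_ context.Context, id digestT) (string, error) {
	loc, ok := r.locations[id]
	if !ok {
		return "", errors.New("unknown digest: " + string(id))
	}
	return loc, nil
}

func main() {
	// The scheduler holds its repository as an interface value, so it must
	// type-assert to discover the optional Location capability.
	var repo interface{} = &locatorRepo{
		locations: map[digestT]string{"abc": "test://bucketin/objects/abc"},
	}
	fileLocator, ok := repo.(blobLocator)
	if !ok {
		// Mirrors the errors.NotSupported branch in doDirectTransfer.
		fmt.Println("repository does not support locating blobs; no direct transfer")
		return
	}
	src, err := fileLocator.Location(context.Background(), "abc")
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	fmt.Println("direct transfer source:", src)
}

Keeping blobLocator unexported and probing for it with a type assertion lets repository implementations opt in without widening the shared Repository interface, which is why the tests above can simply swap InmemoryRepository for InmemoryLocatorRepository.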
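
For completeness, the byte-keyed in-memory repository that the last file reverts to can be exercised as below. This is a hypothetical snippet, assuming repo is a *testutil.InmemoryRepository and ctx a context.Context; the behavior described in the comments comes from the Put, Get, and Stat bodies in the patch.

	id, _ := repo.Put(ctx, bytes.NewReader([]byte("hello"))) // digest of the bytes, via reflow.Digester.FromBytes
	rc, _ := repo.Get(ctx, id)                               // ioutil.NopCloser over the same stored bytes
	f, _ := repo.Stat(ctx, id)                               // reflow.File{ID: id, Size: 5}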