From 63352b0fb96b6231686023fb83d4c708bd27d430 Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Fri, 26 Apr 2024 18:55:25 -0700 Subject: [PATCH] solver: recursively add merge source jobs to target and ancestors Signed-off-by: Erik Sipsma --- solver/jobs.go | 51 +++++++++++++++++++++++--- solver/scheduler_test.go | 78 +++++++++++++++++++++++++++++++++------- 2 files changed, 111 insertions(+), 18 deletions(-) diff --git a/solver/jobs.go b/solver/jobs.go index 2ee5b09b8398..ea8a9a1b3b3e 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -176,11 +176,7 @@ func (s *state) setEdge(index Index, targetEdge *edge, targetState *state) { targetEdge.takeOwnership(e) if targetState != nil { - targetState.mu.Lock() - for j := range s.jobs { - targetState.jobs[j] = struct{}{} - } - targetState.mu.Unlock() + targetState.addJobs(s, map[*state]struct{}{}) if _, ok := targetState.allPw[s.mpw]; !ok { targetState.mpw.Add(s.mpw) @@ -189,6 +185,51 @@ func (s *state) setEdge(index Index, targetEdge *edge, targetState *state) { } } +// addJobs recursively adds jobs to state and all its ancestors. currently +// only used during edge merges to add jobs from the source of the merge to the +// target and its ancestors. +// requires that Solver.mu is read-locked and srcState.mu is locked +func (s *state) addJobs(srcState *state, memo map[*state]struct{}) { + if _, ok := memo[s]; ok { + return + } + memo[s] = struct{}{} + + s.mu.Lock() + defer s.mu.Unlock() + + for j := range srcState.jobs { + s.jobs[j] = struct{}{} + } + + for _, inputEdge := range s.vtx.Inputs() { + inputState, ok := s.solver.actives[inputEdge.Vertex.Digest()] + if !ok { + bklog.G(context.TODO()). + WithField("vertex_digest", inputEdge.Vertex.Digest()). + Error("input vertex not found during addJobs") + continue + } + inputState.addJobs(srcState, memo) + + // tricky case: if the inputState's edge was *already* merged we should + // also add jobs to the merged edge's state + mergedInputEdge := inputState.getEdge(inputEdge.Index) + if mergedInputEdge == nil || mergedInputEdge.edge.Vertex.Digest() == inputEdge.Vertex.Digest() { + // not merged + continue + } + mergedInputState, ok := s.solver.actives[mergedInputEdge.edge.Vertex.Digest()] + if !ok { + bklog.G(context.TODO()). + WithField("vertex_digest", mergedInputEdge.edge.Vertex.Digest()). + Error("merged input vertex not found during addJobs") + continue + } + mergedInputState.addJobs(srcState, memo) + } +} + func (s *state) combinedCacheManager() CacheManager { s.mu.Lock() cms := make([]CacheManager, 0, len(s.cache)+1) diff --git a/solver/scheduler_test.go b/solver/scheduler_test.go index 06f99207f830..46ef12de7635 100644 --- a/solver/scheduler_test.go +++ b/solver/scheduler_test.go @@ -3457,18 +3457,19 @@ func TestStaleEdgeMerge(t *testing.T) { }) defer s.Close() + depV0 := vtxConst(1, vtxOpt{name: "depV0"}) + depV1 := vtxConst(1, vtxOpt{name: "depV1"}) + depV2 := vtxConst(1, vtxOpt{name: "depV2"}) + // These should all end up edge merged v0 := vtxAdd(2, vtxOpt{name: "v0", inputs: []Edge{ - {Vertex: vtxConst(3, vtxOpt{})}, - {Vertex: vtxConst(4, vtxOpt{})}, + {Vertex: depV0}, }}) v1 := vtxAdd(2, vtxOpt{name: "v1", inputs: []Edge{ - {Vertex: vtxConst(3, vtxOpt{})}, - {Vertex: vtxConst(4, vtxOpt{})}, + {Vertex: depV1}, }}) v2 := vtxAdd(2, vtxOpt{name: "v2", inputs: []Edge{ - {Vertex: vtxConst(3, vtxOpt{})}, - {Vertex: vtxConst(4, vtxOpt{})}, + {Vertex: depV2}, }}) j0, err := s.NewJob("job0") @@ -3478,6 +3479,11 @@ func TestStaleEdgeMerge(t *testing.T) { require.NoError(t, err) require.NotNil(t, res) + require.Contains(t, s.actives, v0.Digest()) + require.Contains(t, s.actives[v0.Digest()].jobs, j0) + require.Contains(t, s.actives, depV0.Digest()) + require.Contains(t, s.actives[depV0.Digest()].jobs, j0) + // this edge should be merged with the one from j0 j1, err := s.NewJob("job1") require.NoError(t, err) @@ -3486,14 +3492,37 @@ func TestStaleEdgeMerge(t *testing.T) { require.NoError(t, err) require.NotNil(t, res) + require.Contains(t, s.actives, v0.Digest()) + require.Contains(t, s.actives[v0.Digest()].jobs, j0) + require.Contains(t, s.actives[v0.Digest()].jobs, j1) + require.Contains(t, s.actives, depV0.Digest()) + require.Contains(t, s.actives[depV0.Digest()].jobs, j0) + require.Contains(t, s.actives[depV0.Digest()].jobs, j1) + + require.Contains(t, s.actives, v1.Digest()) + require.NotContains(t, s.actives[v1.Digest()].jobs, j0) + require.Contains(t, s.actives[v1.Digest()].jobs, j1) + require.Contains(t, s.actives, depV1.Digest()) + require.NotContains(t, s.actives[depV1.Digest()].jobs, j0) + require.Contains(t, s.actives[depV1.Digest()].jobs, j1) + // discard j0, verify that v0 is still active and it's state contains j1 since j1's // edge was merged to v0's state require.NoError(t, j0.Discard()) + require.Contains(t, s.actives, v0.Digest()) - require.Contains(t, s.actives, v1.Digest()) require.NotContains(t, s.actives[v0.Digest()].jobs, j0) require.Contains(t, s.actives[v0.Digest()].jobs, j1) + require.Contains(t, s.actives, depV0.Digest()) + require.NotContains(t, s.actives[depV0.Digest()].jobs, j0) + require.Contains(t, s.actives[depV0.Digest()].jobs, j1) + + require.Contains(t, s.actives, v1.Digest()) + require.NotContains(t, s.actives[v1.Digest()].jobs, j0) require.Contains(t, s.actives[v1.Digest()].jobs, j1) + require.Contains(t, s.actives, depV1.Digest()) + require.NotContains(t, s.actives[depV1.Digest()].jobs, j0) + require.Contains(t, s.actives[depV1.Digest()].jobs, j1) // verify another job can still merge j2, err := s.NewJob("job2") @@ -3504,29 +3533,52 @@ func TestStaleEdgeMerge(t *testing.T) { require.NotNil(t, res) require.Contains(t, s.actives, v0.Digest()) - require.Contains(t, s.actives, v1.Digest()) - require.Contains(t, s.actives, v2.Digest()) - require.NotContains(t, s.actives[v0.Digest()].jobs, j0) require.Contains(t, s.actives[v0.Digest()].jobs, j1) require.Contains(t, s.actives[v0.Digest()].jobs, j2) + require.Contains(t, s.actives, depV0.Digest()) + require.Contains(t, s.actives[depV0.Digest()].jobs, j1) + require.Contains(t, s.actives[depV0.Digest()].jobs, j2) + + require.Contains(t, s.actives, v1.Digest()) require.Contains(t, s.actives[v1.Digest()].jobs, j1) + require.NotContains(t, s.actives[v1.Digest()].jobs, j2) + require.Contains(t, s.actives, depV1.Digest()) + require.Contains(t, s.actives[depV1.Digest()].jobs, j1) + require.NotContains(t, s.actives[depV1.Digest()].jobs, j2) + + require.Contains(t, s.actives, v2.Digest()) + require.NotContains(t, s.actives[v2.Digest()].jobs, j1) require.Contains(t, s.actives[v2.Digest()].jobs, j2) + require.Contains(t, s.actives, depV2.Digest()) + require.NotContains(t, s.actives[depV2.Digest()].jobs, j1) + require.Contains(t, s.actives[depV2.Digest()].jobs, j2) // discard j1, verify only referenced edges still exist require.NoError(t, j1.Discard()) + require.Contains(t, s.actives, v0.Digest()) - require.NotContains(t, s.actives, v1.Digest()) - require.Contains(t, s.actives, v2.Digest()) - require.NotContains(t, s.actives[v0.Digest()].jobs, j0) require.NotContains(t, s.actives[v0.Digest()].jobs, j1) require.Contains(t, s.actives[v0.Digest()].jobs, j2) + require.Contains(t, s.actives, depV0.Digest()) + require.NotContains(t, s.actives[depV0.Digest()].jobs, j1) + require.Contains(t, s.actives[depV0.Digest()].jobs, j2) + + require.NotContains(t, s.actives, v1.Digest()) + require.NotContains(t, s.actives, depV1.Digest()) + + require.Contains(t, s.actives, v2.Digest()) require.Contains(t, s.actives[v2.Digest()].jobs, j2) + require.Contains(t, s.actives, depV2.Digest()) + require.Contains(t, s.actives[depV2.Digest()].jobs, j2) // discard the last job and verify everything was removed now require.NoError(t, j2.Discard()) require.NotContains(t, s.actives, v0.Digest()) require.NotContains(t, s.actives, v1.Digest()) require.NotContains(t, s.actives, v2.Digest()) + require.NotContains(t, s.actives, depV0.Digest()) + require.NotContains(t, s.actives, depV1.Digest()) + require.NotContains(t, s.actives, depV2.Digest()) } func generateSubGraph(nodes int) (Edge, int) {