Skip to content

Commit

Permalink
fix: improve query performance for get archived workflow in controlle…
Browse files Browse the repository at this point in the history
…r during estimation. Fixes #13382

Signed-off-by: linzhengen <[email protected]>
  • Loading branch information
linzhengen committed Jul 26, 2024
1 parent 637ffce commit 45fce7d
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 23 deletions.
31 changes: 31 additions & 0 deletions persist/sqldb/mocks/WorkflowArchive.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions persist/sqldb/null_workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/labels"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
sutils "github.com/argoproj/argo-workflows/v3/server/utils"
)
Expand Down Expand Up @@ -32,6 +34,10 @@ func (r *nullWorkflowArchive) GetWorkflow(string, string, string) (*wfv1.Workflo
return nil, fmt.Errorf("getting archived workflows not supported")
}

func (r *nullWorkflowArchive) GetWorkflowForEstimator(namespace string, requirements []labels.Requirement) (*wfv1.Workflow, error) {
return nil, fmt.Errorf("getting archived workflow for estimator not supported")
}

func (r *nullWorkflowArchive) DeleteWorkflow(string) error {
return fmt.Errorf("deleting archived workflows not supported")
}
Expand Down
50 changes: 50 additions & 0 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/labels"

log "github.com/sirupsen/logrus"
"github.com/upper/db/v4"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -69,6 +71,7 @@ type WorkflowArchive interface {
ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error)
CountWorkflows(options sutils.ListOptions) (int64, error)
GetWorkflow(uid string, namespace string, name string) (*wfv1.Workflow, error)
GetWorkflowForEstimator(namespace string, requirements []labels.Requirement) (*wfv1.Workflow, error)
DeleteWorkflow(uid string) error
DeleteExpiredWorkflows(ttl time.Duration) error
IsEnabled() bool
Expand Down Expand Up @@ -289,6 +292,13 @@ func namePrefixClause(namePrefix string) db.Cond {
return db.Cond{}
}

func phaseEqual(phase string) db.Cond {
if phase != "" {
return db.Cond{"phase": phase}
}
return db.Cond{}
}

func (r *workflowArchive) GetWorkflow(uid string, namespace string, name string) (*wfv1.Workflow, error) {
var err error
archivedWf := &archivedWorkflowRecord{}
Expand Down Expand Up @@ -377,6 +387,46 @@ func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error {
return nil
}

func (r *workflowArchive) GetWorkflowForEstimator(namespace string, requirements []labels.Requirement) (*wfv1.Workflow, error) {
selector := r.session.SQL().
Select("name", "namespace", "uid", "startedat", "finishedat").
From(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID()).
And(phaseEqual(string(wfv1.NodeSucceeded)))

selector, err := BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, sutils.ListOptions{
Namespace: namespace,
LabelRequirements: requirements,
Limit: 1,
Offset: 0,
}, false)
if err != nil {
return nil, err
}

var awf archivedWorkflowMetadata
err = selector.One(&awf)
if err != nil {
return nil, err
}

return &wfv1.Workflow{
ObjectMeta: v1.ObjectMeta{
Name: awf.Name,
Namespace: awf.Namespace,
UID: types.UID(awf.UID),
Labels: map[string]string{
common.LabelKeyWorkflowArchivingStatus: "Persisted",
},
},
Status: wfv1.WorkflowStatus{
StartedAt: v1.Time{Time: awf.StartedAt},
FinishedAt: v1.Time{Time: awf.FinishedAt},
},
}, nil

}

func selectArchivedWorkflowQuery(t dbType) (*db.RawExpr, error) {
switch t {
case MySQL:
Expand Down
17 changes: 4 additions & 13 deletions workflow/controller/estimation/estimator_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/argoproj/argo-workflows/v3/persist/sqldb"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/server/utils"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
"github.com/argoproj/argo-workflows/v3/workflow/hydrator"
Expand Down Expand Up @@ -72,23 +71,15 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) {
return &estimator{wf, newestWf}, nil
}
// we failed to find a base-line in the live set, so we now look in the archive
requirements, err := labels.ParseToRequirements(common.LabelKeyPhase + "=" + string(wfv1.NodeSucceeded) + "," + labelName + "=" + labelValue)
requirements, err := labels.ParseToRequirements(labelName + "=" + labelValue)
if err != nil {
return defaultEstimator, fmt.Errorf("failed to parse selector to requirements: %v", err)
}
workflows, err := f.wfArchive.ListWorkflows(
utils.ListOptions{
Namespace: wf.Namespace,
LabelRequirements: requirements,
Limit: 1,
Offset: 0,
})
baselineWF, err := f.wfArchive.GetWorkflowForEstimator(wf.Namespace, requirements)
if err != nil {
return defaultEstimator, fmt.Errorf("failed to list archived workflows: %v", err)
}
if len(workflows) > 0 {
return &estimator{wf, &workflows[0]}, nil
return defaultEstimator, fmt.Errorf("failed to get archived workflows: %v", err)
}
return &estimator{wf, baselineWF}, nil
}
}
return defaultEstimator, nil
Expand Down
13 changes: 3 additions & 10 deletions workflow/controller/estimation/estimator_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

sqldbmocks "github.com/argoproj/argo-workflows/v3/persist/sqldb/mocks"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/server/utils"
testutil "github.com/argoproj/argo-workflows/v3/test/util"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
Expand Down Expand Up @@ -51,17 +50,11 @@ metadata:
workflows.argoproj.io/phase: Succeeded
`), wfFailed)
wfArchive := &sqldbmocks.WorkflowArchive{}
r, err := labels.ParseToRequirements("workflows.argoproj.io/phase=Succeeded,workflows.argoproj.io/workflow-template=my-archived-wftmpl")
r, err := labels.ParseToRequirements("workflows.argoproj.io/workflow-template=my-archived-wftmpl")
assert.NoError(t, err)
wfArchive.On("ListWorkflows", utils.ListOptions{
Namespace: "my-ns",
LabelRequirements: r,
Limit: 1,
}).Return(wfv1.Workflows{
*testutil.MustUnmarshalWorkflow(`
wfArchive.On("GetWorkflowForEstimator", "my-ns", r).Return(testutil.MustUnmarshalWorkflow(`
metadata:
name: my-archived-wftmpl-baseline`),
}, nil)
name: my-archived-wftmpl-baseline`), nil)
f := NewEstimatorFactory(informer, hydratorfake.Always, wfArchive)
t.Run("None", func(t *testing.T) {
p, err := f.NewEstimator(&wfv1.Workflow{})
Expand Down

0 comments on commit 45fce7d

Please sign in to comment.