Skip to content

Commit

Permalink
fix: improve archived api performance. Fixes argoproj#13382 argoproj#…
Browse files Browse the repository at this point in the history
…13295

Signed-off-by: linzhengen <[email protected]>
  • Loading branch information
linzhengen committed Jul 25, 2024
1 parent c16c5e4 commit 7dafc0d
Showing 1 changed file with 39 additions and 55 deletions.
94 changes: 39 additions & 55 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/upper/db/v4"
"google.golang.org/grpc/codes"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
sutils "github.com/argoproj/argo-workflows/v3/server/utils"
Expand Down Expand Up @@ -84,6 +82,22 @@ type workflowArchive struct {
dbType dbType
}

type workflow struct {
metav1.ObjectMeta `json:"metadata"`
Spec struct {
Suspend *bool `json:"suspend,omitempty"`
} `json:"spec"`
Status struct {
Phase wfv1.WorkflowPhase `json:"phase,omitempty"`
StartedAt metav1.Time `json:"startedAt,omitempty"`
FinishedAt metav1.Time `json:"finishedAt,omitempty"`
EstimatedDuration wfv1.EstimatedDuration `json:"estimatedDuration,omitempty"`
Progress wfv1.Progress `json:"progress,omitempty"`
Message string `json:"message,omitempty"`
ResourcesDuration wfv1.ResourcesDuration `json:"resourcesDuration,omitempty"`
} `json:"status,omitempty"`
}

func (r *workflowArchive) IsEnabled() bool {
return true
}
Expand Down Expand Up @@ -154,19 +168,13 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
}

func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) {
var archivedWfs []archivedWorkflowMetadata

selectQuery, err := selectArchivedWorkflowQuery(r.dbType)
if err != nil {
return nil, err
}

var archivedWfs []archivedWorkflowRecord
selector := r.session.SQL().
Select(selectQuery).
Select("workflow").
From(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID())

selector, err = BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, options, false)
selector, err := BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, options, false)
if err != nil {
return nil, err
}
Expand All @@ -178,48 +186,34 @@ func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workfl

wfs := make(wfv1.Workflows, len(archivedWfs))
for i, md := range archivedWfs {
labels := make(map[string]string)
if err := json.Unmarshal([]byte(md.Labels), &labels); err != nil {
return nil, err
}
// For backward compatibility, we should label workflow retrieved from DB as Persisted.
labels[common.LabelKeyWorkflowArchivingStatus] = "Persisted"

annotations := make(map[string]string)
if err := json.Unmarshal([]byte(md.Annotations), &annotations); err != nil {
return nil, err
}

t, err := time.Parse(time.RFC3339, md.CreationTimestamp)
var wf workflow
err = json.Unmarshal([]byte(md.Workflow), &wf)
if err != nil {
return nil, err
}

resourcesDuration := make(map[corev1.ResourceName]wfv1.ResourceDuration)
if err := json.Unmarshal([]byte(md.ResourcesDuration), &resourcesDuration); err != nil {
return nil, err
}
// For backward compatibility, we should label workflow retrieved from DB as Persisted.
wf.ObjectMeta.Labels[common.LabelKeyWorkflowArchivingStatus] = "Persisted"

wfs[i] = wfv1.Workflow{
ObjectMeta: v1.ObjectMeta{
Name: md.Name,
Namespace: md.Namespace,
UID: types.UID(md.UID),
CreationTimestamp: v1.Time{Time: t},
Labels: labels,
Annotations: annotations,
ObjectMeta: metav1.ObjectMeta{
Name: wf.Name,
Namespace: wf.Namespace,
UID: wf.UID,
CreationTimestamp: wf.CreationTimestamp,
Labels: wf.Labels,
Annotations: wf.Annotations,
},
Spec: wfv1.WorkflowSpec{
Suspend: md.Suspend,
Suspend: wf.Spec.Suspend,
},
Status: wfv1.WorkflowStatus{
Phase: md.Phase,
StartedAt: v1.Time{Time: md.StartedAt},
FinishedAt: v1.Time{Time: md.FinishedAt},
Progress: wfv1.Progress(md.Progress),
Message: md.Message,
EstimatedDuration: wfv1.EstimatedDuration(md.EstimatedDuration),
ResourcesDuration: resourcesDuration,
Phase: wf.Status.Phase,
StartedAt: wf.Status.StartedAt,
FinishedAt: wf.Status.FinishedAt,
Progress: wf.Status.Progress,
Message: wf.Status.Message,
EstimatedDuration: wf.Status.EstimatedDuration,
ResourcesDuration: wf.Status.ResourcesDuration,
},
}
}
Expand Down Expand Up @@ -376,13 +370,3 @@ func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error {
log.WithFields(log.Fields{"rowsAffected": rowsAffected}).Info("Deleted archived workflows")
return nil
}

func selectArchivedWorkflowQuery(t dbType) (*db.RawExpr, error) {
switch t {
case MySQL:
return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce(JSON_EXTRACT(workflow,'$.metadata.labels'), '{}') as labels,coalesce(JSON_EXTRACT(workflow,'$.metadata.annotations'), '{}') as annotations, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.progress')), '') as progress, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.metadata.creationTimestamp')), '') as creationtimestamp, JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.spec.suspend')) as suspend, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.message')), '') as message, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.estimatedDuration')), '0') as estimatedduration, coalesce(JSON_EXTRACT(workflow,'$.status.resourcesDuration'), '{}') as resourcesduration"), nil
case Postgres:
return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce((workflow::json)->'metadata'->>'labels', '{}') as labels, coalesce((workflow::json)->'metadata'->>'annotations', '{}') as annotations, coalesce((workflow::json)->'status'->>'progress', '') as progress, coalesce((workflow::json)->'metadata'->>'creationTimestamp', '') as creationtimestamp, (workflow::json)->'spec'->>'suspend' as suspend, coalesce((workflow::json)->'status'->>'message', '') as message, coalesce((workflow::json)->'status'->>'estimatedDuration', '0') as estimatedduration, coalesce((workflow::json)->'status'->>'resourcesDuration', '{}') as resourcesduration"), nil
}
return nil, fmt.Errorf("unsupported db type %s", t)
}

0 comments on commit 7dafc0d

Please sign in to comment.