From 7dafc0d70452c9c0dd96b8aa8fe4026153a87375 Mon Sep 17 00:00:00 2001 From: linzhengen Date: Wed, 24 Jul 2024 18:13:23 +0900 Subject: [PATCH] fix: improve archived api performance. Fixes #13382 #13295 Signed-off-by: linzhengen --- persist/sqldb/workflow_archive.go | 94 +++++++++++++------------------ 1 file changed, 39 insertions(+), 55 deletions(-) diff --git a/persist/sqldb/workflow_archive.go b/persist/sqldb/workflow_archive.go index 10411968bb5f..e82530489bb8 100644 --- a/persist/sqldb/workflow_archive.go +++ b/persist/sqldb/workflow_archive.go @@ -8,9 +8,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/upper/db/v4" "google.golang.org/grpc/codes" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" sutils "github.com/argoproj/argo-workflows/v3/server/utils" @@ -84,6 +82,22 @@ type workflowArchive struct { dbType dbType } +type workflow struct { + metav1.ObjectMeta `json:"metadata"` + Spec struct { + Suspend *bool `json:"suspend,omitempty"` + } `json:"spec"` + Status struct { + Phase wfv1.WorkflowPhase `json:"phase,omitempty"` + StartedAt metav1.Time `json:"startedAt,omitempty"` + FinishedAt metav1.Time `json:"finishedAt,omitempty"` + EstimatedDuration wfv1.EstimatedDuration `json:"estimatedDuration,omitempty"` + Progress wfv1.Progress `json:"progress,omitempty"` + Message string `json:"message,omitempty"` + ResourcesDuration wfv1.ResourcesDuration `json:"resourcesDuration,omitempty"` + } `json:"status,omitempty"` +} + func (r *workflowArchive) IsEnabled() bool { return true } @@ -154,19 +168,13 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error { } func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) { - var archivedWfs []archivedWorkflowMetadata - - selectQuery, err := selectArchivedWorkflowQuery(r.dbType) - if err != nil { - return nil, err - } - + var archivedWfs []archivedWorkflowRecord selector := r.session.SQL(). - Select(selectQuery). + Select("workflow"). From(archiveTableName). Where(r.clusterManagedNamespaceAndInstanceID()) - selector, err = BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, options, false) + selector, err := BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, options, false) if err != nil { return nil, err } @@ -178,48 +186,34 @@ func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workfl wfs := make(wfv1.Workflows, len(archivedWfs)) for i, md := range archivedWfs { - labels := make(map[string]string) - if err := json.Unmarshal([]byte(md.Labels), &labels); err != nil { - return nil, err - } - // For backward compatibility, we should label workflow retrieved from DB as Persisted. - labels[common.LabelKeyWorkflowArchivingStatus] = "Persisted" - - annotations := make(map[string]string) - if err := json.Unmarshal([]byte(md.Annotations), &annotations); err != nil { - return nil, err - } - - t, err := time.Parse(time.RFC3339, md.CreationTimestamp) + var wf workflow + err = json.Unmarshal([]byte(md.Workflow), &wf) if err != nil { return nil, err } - - resourcesDuration := make(map[corev1.ResourceName]wfv1.ResourceDuration) - if err := json.Unmarshal([]byte(md.ResourcesDuration), &resourcesDuration); err != nil { - return nil, err - } + // For backward compatibility, we should label workflow retrieved from DB as Persisted. + wf.ObjectMeta.Labels[common.LabelKeyWorkflowArchivingStatus] = "Persisted" wfs[i] = wfv1.Workflow{ - ObjectMeta: v1.ObjectMeta{ - Name: md.Name, - Namespace: md.Namespace, - UID: types.UID(md.UID), - CreationTimestamp: v1.Time{Time: t}, - Labels: labels, - Annotations: annotations, + ObjectMeta: metav1.ObjectMeta{ + Name: wf.Name, + Namespace: wf.Namespace, + UID: wf.UID, + CreationTimestamp: wf.CreationTimestamp, + Labels: wf.Labels, + Annotations: wf.Annotations, }, Spec: wfv1.WorkflowSpec{ - Suspend: md.Suspend, + Suspend: wf.Spec.Suspend, }, Status: wfv1.WorkflowStatus{ - Phase: md.Phase, - StartedAt: v1.Time{Time: md.StartedAt}, - FinishedAt: v1.Time{Time: md.FinishedAt}, - Progress: wfv1.Progress(md.Progress), - Message: md.Message, - EstimatedDuration: wfv1.EstimatedDuration(md.EstimatedDuration), - ResourcesDuration: resourcesDuration, + Phase: wf.Status.Phase, + StartedAt: wf.Status.StartedAt, + FinishedAt: wf.Status.FinishedAt, + Progress: wf.Status.Progress, + Message: wf.Status.Message, + EstimatedDuration: wf.Status.EstimatedDuration, + ResourcesDuration: wf.Status.ResourcesDuration, }, } } @@ -376,13 +370,3 @@ func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error { log.WithFields(log.Fields{"rowsAffected": rowsAffected}).Info("Deleted archived workflows") return nil } - -func selectArchivedWorkflowQuery(t dbType) (*db.RawExpr, error) { - switch t { - case MySQL: - return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce(JSON_EXTRACT(workflow,'$.metadata.labels'), '{}') as labels,coalesce(JSON_EXTRACT(workflow,'$.metadata.annotations'), '{}') as annotations, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.progress')), '') as progress, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.metadata.creationTimestamp')), '') as creationtimestamp, JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.spec.suspend')) as suspend, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.message')), '') as message, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.estimatedDuration')), '0') as estimatedduration, coalesce(JSON_EXTRACT(workflow,'$.status.resourcesDuration'), '{}') as resourcesduration"), nil - case Postgres: - return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce((workflow::json)->'metadata'->>'labels', '{}') as labels, coalesce((workflow::json)->'metadata'->>'annotations', '{}') as annotations, coalesce((workflow::json)->'status'->>'progress', '') as progress, coalesce((workflow::json)->'metadata'->>'creationTimestamp', '') as creationtimestamp, (workflow::json)->'spec'->>'suspend' as suspend, coalesce((workflow::json)->'status'->>'message', '') as message, coalesce((workflow::json)->'status'->>'estimatedDuration', '0') as estimatedduration, coalesce((workflow::json)->'status'->>'resourcesDuration', '{}') as resourcesduration"), nil - } - return nil, fmt.Errorf("unsupported db type %s", t) -}