From 5da3e0a31be255d93fb4a8eca63c775820959d4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E8=BE=BE?= Date: Fri, 9 Aug 2024 01:26:42 +0800 Subject: [PATCH] fix: optimise workflow node status offload with compression and fallback and db stability, fix CI errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 刘达 --- docs/environment-variables.md | 97 ++++++++++++++++++----------------- go.mod | 2 +- test/stress/mysql/main.go | 37 ++++++++----- 3 files changed, 73 insertions(+), 63 deletions(-) diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 94c14c798939..e7b44e289cc5 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -8,54 +8,55 @@ This document outlines environment variables that can be used to customize behav ## Controller -| Name | Type | Default | Description | -|------------------------------------------|---------------------|---------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `ARGO_AGENT_TASK_WORKERS` | `int` | `16` | The number of task workers for the agent pod. | -| `ALL_POD_CHANGES_SIGNIFICANT` | `bool` | `false` | Whether to consider all pod changes as significant during pod reconciliation. | -| `ALWAYS_OFFLOAD_NODE_STATUS` | `bool` | `false` | Whether to always offload the node status. | -| `OFFLOAD_NODE_STATUS_ERROR_FALLBACK` | `bool` | `false` | Control if we can save node status to workflow as normal when offload failed. | -| `HYDRATION_FAILED_RETRY_DURATION` | `time.Duration` | `900s` | The time that we wait before mark workflow as Error when OFFLOAD_NODE_STATUS_ERROR_FALLBACK enabled. | -| `ARCHIVED_WORKFLOW_GC_PERIOD` | `time.Duration` | `24h` | The periodicity for GC of archived workflows. | -| `ARGO_PPROF` | `bool` | `false` | Enable [`pprof`](https://go.dev/blog/pprof) endpoints | -| `ARGO_PROGRESS_PATCH_TICK_DURATION` | `time.Duration` | `1m` | How often self reported progress is patched into the pod annotations which means how long it takes until the controller picks up the progress change. Set to 0 to disable self reporting progress. | -| `ARGO_PROGRESS_FILE_TICK_DURATION` | `time.Duration` | `3s` | How often the progress file is read by the executor. Set to 0 to disable self reporting progress. | -| `ARGO_REMOVE_PVC_PROTECTION_FINALIZER` | `bool` | `true` | Remove the `kubernetes.io/pvc-protection` finalizer from persistent volume claims (PVC) after marking PVCs created for the workflow for deletion, so deleted is not blocked until the pods are deleted. [#6629](https://github.com/argoproj/argo-workflows/issues/6629) | -| `ARGO_TRACE` | `string` | `` | Whether to enable tracing statements in Argo components. | -| `ARGO_AGENT_PATCH_RATE` | `time.Duration` | `DEFAULT_REQUEUE_TIME` | Rate that the Argo Agent will patch the workflow task-set. | -| `ARGO_AGENT_CPU_LIMIT` | `resource.Quantity` | `100m` | CPU resource limit for the agent. | -| `ARGO_AGENT_MEMORY_LIMIT` | `resource.Quantity` | `256m` | Memory resource limit for the agent. | -| `ARGO_POD_STATUS_CAPTURE_FINALIZER` | `bool` | `false` | The finalizer blocks the deletion of pods until the controller captures their status. -| `BUBBLE_ENTRY_TEMPLATE_ERR` | `bool` | `true` | Whether to bubble up template errors to workflow. | -| `CACHE_GC_PERIOD` | `time.Duration` | `0s` | How often to perform memoization cache GC, which is disabled by default and can be enabled by providing a non-zero duration. | -| `CACHE_GC_AFTER_NOT_HIT_DURATION` | `time.Duration` | `30s` | When a memoization cache has not been hit after this duration, it will be deleted. | -| `CRON_SYNC_PERIOD` | `time.Duration` | `10s` | How often to sync cron workflows. | -| `DEFAULT_REQUEUE_TIME` | `time.Duration` | `10s` | The re-queue time for the rate limiter of the workflow queue. | -| `DISABLE_MAX_RECURSION` | `bool` | `false` | Set to true to disable the recursion preventer, which will stop a workflow running which has called into a child template 100 times | -| `EXPRESSION_TEMPLATES` | `bool` | `true` | Escape hatch to disable expression templates. | -| `EVENT_AGGREGATION_WITH_ANNOTATIONS` | `bool` | `false` | Whether event annotations will be used when aggregating events. | -| `GZIP_IMPLEMENTATION` | `string` | `PGZip` | The implementation of compression/decompression. Currently only "`PGZip`" and "`GZip`" are supported. | -| `INFORMER_WRITE_BACK` | `bool` | `true` | Whether to write back to informer instead of catching up. | -| `HEALTHZ_AGE` | `time.Duration` | `5m` | How old a un-reconciled workflow is to report unhealthy. | -| `INDEX_WORKFLOW_SEMAPHORE_KEYS` | `bool` | `true` | Whether or not to index semaphores. | -| `LEADER_ELECTION_IDENTITY` | `string` | Controller's `metadata.name` | The ID used for workflow controllers to elect a leader. | -| `LEADER_ELECTION_DISABLE` | `bool` | `false` | Whether leader election should be disabled. | -| `LEADER_ELECTION_LEASE_DURATION` | `time.Duration` | `15s` | The duration that non-leader candidates will wait to force acquire leadership. | -| `LEADER_ELECTION_RENEW_DEADLINE` | `time.Duration` | `10s` | The duration that the acting master will retry refreshing leadership before giving up. | -| `LEADER_ELECTION_RETRY_PERIOD` | `time.Duration` | `5s` | The duration that the leader election clients should wait between tries of actions. | -| `MAX_OPERATION_TIME` | `time.Duration` | `30s` | The maximum time a workflow operation is allowed to run for before re-queuing the workflow onto the work queue. | -| `OFFLOAD_NODE_STATUS_TTL` | `time.Duration` | `5m` | The TTL to delete the offloaded node status. Currently only used for testing. | -| `OPERATION_DURATION_METRIC_BUCKET_COUNT` | `int` | `6` | The number of buckets to collect the metric for the operation duration. | -| `POD_NAMES` | `string` | `v2` | Whether to have pod names contain the template name (v2) or be the node id (v1) - should be set the same for Argo Server. | -| `RECENTLY_STARTED_POD_DURATION` | `time.Duration` | `10s` | The duration of a pod before the pod is considered to be recently started. | -| `RETRY_BACKOFF_DURATION` | `time.Duration` | `10ms` | The retry back-off duration when retrying API calls. | -| `RETRY_BACKOFF_FACTOR` | `float` | `2.0` | The retry back-off factor when retrying API calls. | -| `RETRY_BACKOFF_STEPS` | `int` | `5` | The retry back-off steps when retrying API calls. | -| `RETRY_HOST_NAME_LABEL_KEY` | `string` | `kubernetes.io/hostname` | The label key for host name used when retrying templates. | -| `TRANSIENT_ERROR_PATTERN` | `string` | `""` | The regular expression that represents additional patterns for transient errors. | -| `WF_DEL_PROPAGATION_POLICY` | `string` | `""` | The deletion propagation policy for workflows. | -| `WORKFLOW_GC_PERIOD` | `time.Duration` | `5m` | The periodicity for GC of workflows. | -| `SEMAPHORE_NOTIFY_DELAY` | `time.Duration` | `1s` | Tuning Delay when notifying semaphore waiters about availability in the semaphore | -| `WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS` | `bool` | `true` | Whether to watch the Controller's ConfigMap and semaphore ConfigMaps for run-time changes. When disabled, the Controller will only read these ConfigMaps once and will have to be manually restarted to pick up new changes. | +| Name | Type | Default | Description | +|------------------------------------------|---------------------|------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `ARGO_AGENT_TASK_WORKERS` | `int` | `16` | The number of task workers for the agent pod. | +| `ALL_POD_CHANGES_SIGNIFICANT` | `bool` | `false` | Whether to consider all pod changes as significant during pod reconciliation. | +| `ALWAYS_OFFLOAD_NODE_STATUS` | `bool` | `false` | Whether to always offload the node status. | +| `OFFLOAD_NODE_STATUS_ERROR_FALLBACK` | `bool` | `false` | Control if we can save node status to workflow as normal when offload failed. | +| `HYDRATION_FAILED_RETRY_DURATION` | `time.Duration` | `900s` | The time that we wait before mark workflow as Error when fallback enabled. | +| `ARCHIVED_WORKFLOW_GC_PERIOD` | `time.Duration` | `24h` | The periodicity for GC of archived workflows. | +| `ARGO_PPROF` | `bool` | `false` | Enable [`pprof`](https://go.dev/blog/pprof) endpoints | +| `ARGO_PROGRESS_PATCH_TICK_DURATION` | `time.Duration` | `1m` | How often self reported progress is patched into the pod annotations which means how long it takes until the controller picks up the progress change. Set to 0 to disable self reporting progress. | +| `ARGO_PROGRESS_FILE_TICK_DURATION` | `time.Duration` | `3s` | How often the progress file is read by the executor. Set to 0 to disable self reporting progress. | +| `ARGO_REMOVE_PVC_PROTECTION_FINALIZER` | `bool` | `true` | Remove the `kubernetes.io/pvc-protection` finalizer from persistent volume claims (PVC) after marking PVCs created for the workflow for deletion, so deleted is not blocked until the pods are deleted. [#6629](https://github.com/argoproj/argo-workflows/issues/6629) | +| `ARGO_TRACE` | `string` | `` | Whether to enable tracing statements in Argo components. | +| `ARGO_AGENT_PATCH_RATE` | `time.Duration` | `DEFAULT_REQUEUE_TIME` | Rate that the Argo Agent will patch the workflow task-set. | +| `ARGO_AGENT_CPU_LIMIT` | `resource.Quantity` | `100m` | CPU resource limit for the agent. | +| `ARGO_AGENT_MEMORY_LIMIT` | `resource.Quantity` | `256m` | Memory resource limit for the agent. | +| `ARGO_POD_STATUS_CAPTURE_FINALIZER` | `bool` | `false` | The finalizer blocks the deletion of pods until the controller captures their status. +| `BUBBLE_ENTRY_TEMPLATE_ERR` | `bool` | `true` | Whether to bubble up template errors to workflow. | +| `CACHE_GC_PERIOD` | `time.Duration` | `0s` | How often to perform memoization cache GC, which is disabled by default and can be enabled by providing a non-zero duration. | +| `CACHE_GC_AFTER_NOT_HIT_DURATION` | `time.Duration` | `30s` | When a memoization cache has not been hit after this duration, it will be deleted. | +| `CRON_SYNC_PERIOD` | `time.Duration` | `10s` | How often to sync cron workflows. | +| `DEFAULT_REQUEUE_TIME` | `time.Duration` | `10s` | The re-queue time for the rate limiter of the workflow queue. | +| `DISABLE_MAX_RECURSION` | `bool` | `false` | Set to true to disable the recursion preventer, which will stop a workflow running which has called into a child template 100 times | +| `EXPRESSION_TEMPLATES` | `bool` | `true` | Escape hatch to disable expression templates. | +| `EVENT_AGGREGATION_WITH_ANNOTATIONS` | `bool` | `false` | Whether event annotations will be used when aggregating events. | +| `GZIP_IMPLEMENTATION` | `string` | `PGZip` | The implementation of compression/decompression. Currently only "`PGZip`" and "`GZip`" are supported. | +| `INFORMER_WRITE_BACK` | `bool` | `true` | Whether to write back to informer instead of catching up. | +| `HEALTHZ_AGE` | `time.Duration` | `5m` | How old a un-reconciled workflow is to report unhealthy. | +| `INDEX_WORKFLOW_SEMAPHORE_KEYS` | `bool` | `true` | Whether or not to index semaphores. | +| `LEADER_ELECTION_IDENTITY` | `string` | Controller's `metadata.name` | The ID used for workflow controllers to elect a leader. | +| `LEADER_ELECTION_DISABLE` | `bool` | `false` | Whether leader election should be disabled. | +| `LEADER_ELECTION_LEASE_DURATION` | `time.Duration` | `15s` | The duration that non-leader candidates will wait to force acquire leadership. | +| `LEADER_ELECTION_RENEW_DEADLINE` | `time.Duration` | `10s` | The duration that the acting master will retry refreshing leadership before giving up. | +| `LEADER_ELECTION_RETRY_PERIOD` | `time.Duration` | `5s` | The duration that the leader election clients should wait between tries of actions. | +| `MAX_OPERATION_TIME` | `time.Duration` | `30s` | The maximum time a workflow operation is allowed to run for before re-queuing the workflow onto the work queue. | +| `OFFLOAD_NODE_STATUS_TTL` | `time.Duration` | `5m` | The TTL to delete the offloaded node status. Currently only used for testing. | +| `OPERATION_DURATION_METRIC_BUCKET_COUNT` | `int` | `6` | The number of buckets to collect the metric for the operation duration. | +| `POD_NAMES` | `string` | `v2` | Whether to have pod names contain the template name (v2) or be the node id (v1) - should be set the same for Argo Server. | +| `RECENTLY_STARTED_POD_DURATION` | `time.Duration` | `10s` | The duration of a pod before the pod is considered to be recently started. | +| `RETRY_BACKOFF_DURATION` | `time.Duration` | `10ms` | The retry back-off duration when retrying API calls. | +| `RETRY_BACKOFF_FACTOR` | `float` | `2.0` | The retry back-off factor when retrying API calls. | +| `RETRY_BACKOFF_STEPS` | `int` | `5` | The retry back-off steps when retrying API calls. | +| `RETRY_HOST_NAME_LABEL_KEY` | `string` | `kubernetes.io/hostname` | The label key for host name used when retrying templates. | +| `TRANSIENT_ERROR_PATTERN` | `string` | `""` | The regular expression that represents additional patterns for transient errors. | +| `WF_DEL_PROPAGATION_POLICY` | `string` | `""` | The deletion propagation policy for workflows. | +| `WORKFLOW_GC_PERIOD` | `time.Duration` | `5m` | The periodicity for GC of workflows. | +| `WORKFLOW_GC_MAX_WORKER` | `int` | `10` | The worker count for GC of workflows. | +| `SEMAPHORE_NOTIFY_DELAY` | `time.Duration` | `1s` | Tuning Delay when notifying semaphore waiters about availability in the semaphore | +| `WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS` | `bool` | `true` | Whether to watch the Controller's ConfigMap and semaphore ConfigMaps for run-time changes. When disabled, the Controller will only read these ConfigMaps once and will have to be manually restarted to pick up new changes. | CLI parameters of the Controller can be specified as environment variables with the `ARGO_` prefix. For example: diff --git a/go.mod b/go.mod index 300f9200f110..bb8902f4684d 100644 --- a/go.mod +++ b/go.mod @@ -213,7 +213,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect - github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/hashicorp/go-uuid v1.0.3 github.com/hashicorp/hcl v1.0.0 // indirect github.com/huandu/xstrings v1.3.3 // indirect github.com/imdario/mergo v0.3.15 // indirect diff --git a/test/stress/mysql/main.go b/test/stress/mysql/main.go index eacf9811dfb0..83c9b0e8ee1e 100644 --- a/test/stress/mysql/main.go +++ b/test/stress/mysql/main.go @@ -3,18 +3,6 @@ package main import ( "context" "flag" - "github.com/argoproj/argo-workflows/v3/persist/sqldb" - wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" - envutil "github.com/argoproj/argo-workflows/v3/util/env" - "github.com/argoproj/argo-workflows/v3/util/instanceid" - "github.com/argoproj/pkg/rand" - "github.com/hashicorp/go-uuid" - log "github.com/sirupsen/logrus" - "github.com/upper/db/v4" - "github.com/upper/db/v4/adapter/mysql" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - runtimeutil "k8s.io/apimachinery/pkg/util/runtime" "os" "os/signal" "strconv" @@ -23,6 +11,20 @@ import ( "sync/atomic" "syscall" "time" + + "github.com/argoproj/pkg/rand" + "github.com/hashicorp/go-uuid" + log "github.com/sirupsen/logrus" + "github.com/upper/db/v4" + "github.com/upper/db/v4/adapter/mysql" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + runtimeutil "k8s.io/apimachinery/pkg/util/runtime" + + "github.com/argoproj/argo-workflows/v3/persist/sqldb" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + envutil "github.com/argoproj/argo-workflows/v3/util/env" + "github.com/argoproj/argo-workflows/v3/util/instanceid" ) type archivedRecord struct { @@ -62,6 +64,9 @@ func getArchivedWorkflowsCount(session db.Session) (int, error) { log.Warnf("Get archived workflow count error: %s", err) return 0, err } + defer func() { + rows.Close() + }() if rows.Next() { err = rows.Scan(&count) if err != nil { @@ -108,6 +113,9 @@ func main() { if err != nil { log.Fatal(err) } + defer func() { + session.Close() + }() session.SetMaxOpenConns(5000) session.SetMaxIdleConns(150) session.SetConnMaxLifetime(60 * time.Second) @@ -176,6 +184,7 @@ func main() { UID: types.UID(uid), Name: name, Namespace: "argo-managed", + Labels: map[string]string{}, }, Status: wfv1.WorkflowStatus{ Nodes: map[string]wfv1.NodeStatus{ @@ -308,7 +317,7 @@ func main() { go reconcileWf() } - stopCh := make(chan os.Signal) + stopCh := make(chan os.Signal, 1) signal.Notify(stopCh, os.Interrupt, syscall.SIGTERM) stopTimer := time.NewTimer(duration) @@ -331,7 +340,7 @@ func main() { if err != nil { log.Warnf("Clean up expired archive workflows error: %s", err) } - log.Infof("Cleaning %d archives cost %s", archiveCleanSize, time.Now().Sub(startTime)) + log.Infof("Cleaning %d archives cost %s", archiveCleanSize, time.Since(startTime)) cleanTicker.Reset(cleanDuration) log.Infof("Average rate when cleaning archives is: %f", calculateRate(startTime))