diff --git a/docs/environment-variables.md b/docs/environment-variables.md index c9a56c852f6b..e7b44e289cc5 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -8,52 +8,55 @@ This document outlines environment variables that can be used to customize behav ## Controller -| Name | Type | Default | Description | -|------------------------------------------|---------------------|---------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `ARGO_AGENT_TASK_WORKERS` | `int` | `16` | The number of task workers for the agent pod. | -| `ALL_POD_CHANGES_SIGNIFICANT` | `bool` | `false` | Whether to consider all pod changes as significant during pod reconciliation. | -| `ALWAYS_OFFLOAD_NODE_STATUS` | `bool` | `false` | Whether to always offload the node status. | -| `ARCHIVED_WORKFLOW_GC_PERIOD` | `time.Duration` | `24h` | The periodicity for GC of archived workflows. | -| `ARGO_PPROF` | `bool` | `false` | Enable [`pprof`](https://go.dev/blog/pprof) endpoints | -| `ARGO_PROGRESS_PATCH_TICK_DURATION` | `time.Duration` | `1m` | How often self reported progress is patched into the pod annotations which means how long it takes until the controller picks up the progress change. Set to 0 to disable self reporting progress. | -| `ARGO_PROGRESS_FILE_TICK_DURATION` | `time.Duration` | `3s` | How often the progress file is read by the executor. Set to 0 to disable self reporting progress. | -| `ARGO_REMOVE_PVC_PROTECTION_FINALIZER` | `bool` | `true` | Remove the `kubernetes.io/pvc-protection` finalizer from persistent volume claims (PVC) after marking PVCs created for the workflow for deletion, so deleted is not blocked until the pods are deleted. [#6629](https://github.com/argoproj/argo-workflows/issues/6629) | -| `ARGO_TRACE` | `string` | `` | Whether to enable tracing statements in Argo components. | -| `ARGO_AGENT_PATCH_RATE` | `time.Duration` | `DEFAULT_REQUEUE_TIME` | Rate that the Argo Agent will patch the workflow task-set. | -| `ARGO_AGENT_CPU_LIMIT` | `resource.Quantity` | `100m` | CPU resource limit for the agent. | -| `ARGO_AGENT_MEMORY_LIMIT` | `resource.Quantity` | `256m` | Memory resource limit for the agent. | -| `ARGO_POD_STATUS_CAPTURE_FINALIZER` | `bool` | `false` | The finalizer blocks the deletion of pods until the controller captures their status. -| `BUBBLE_ENTRY_TEMPLATE_ERR` | `bool` | `true` | Whether to bubble up template errors to workflow. | -| `CACHE_GC_PERIOD` | `time.Duration` | `0s` | How often to perform memoization cache GC, which is disabled by default and can be enabled by providing a non-zero duration. | -| `CACHE_GC_AFTER_NOT_HIT_DURATION` | `time.Duration` | `30s` | When a memoization cache has not been hit after this duration, it will be deleted. | -| `CRON_SYNC_PERIOD` | `time.Duration` | `10s` | How often to sync cron workflows. | -| `DEFAULT_REQUEUE_TIME` | `time.Duration` | `10s` | The re-queue time for the rate limiter of the workflow queue. | -| `DISABLE_MAX_RECURSION` | `bool` | `false` | Set to true to disable the recursion preventer, which will stop a workflow running which has called into a child template 100 times | -| `EXPRESSION_TEMPLATES` | `bool` | `true` | Escape hatch to disable expression templates. 
| -| `EVENT_AGGREGATION_WITH_ANNOTATIONS` | `bool` | `false` | Whether event annotations will be used when aggregating events. | -| `GZIP_IMPLEMENTATION` | `string` | `PGZip` | The implementation of compression/decompression. Currently only "`PGZip`" and "`GZip`" are supported. | -| `INFORMER_WRITE_BACK` | `bool` | `true` | Whether to write back to informer instead of catching up. | -| `HEALTHZ_AGE` | `time.Duration` | `5m` | How old a un-reconciled workflow is to report unhealthy. | -| `INDEX_WORKFLOW_SEMAPHORE_KEYS` | `bool` | `true` | Whether or not to index semaphores. | -| `LEADER_ELECTION_IDENTITY` | `string` | Controller's `metadata.name` | The ID used for workflow controllers to elect a leader. | -| `LEADER_ELECTION_DISABLE` | `bool` | `false` | Whether leader election should be disabled. | -| `LEADER_ELECTION_LEASE_DURATION` | `time.Duration` | `15s` | The duration that non-leader candidates will wait to force acquire leadership. | -| `LEADER_ELECTION_RENEW_DEADLINE` | `time.Duration` | `10s` | The duration that the acting master will retry refreshing leadership before giving up. | -| `LEADER_ELECTION_RETRY_PERIOD` | `time.Duration` | `5s` | The duration that the leader election clients should wait between tries of actions. | -| `MAX_OPERATION_TIME` | `time.Duration` | `30s` | The maximum time a workflow operation is allowed to run for before re-queuing the workflow onto the work queue. | -| `OFFLOAD_NODE_STATUS_TTL` | `time.Duration` | `5m` | The TTL to delete the offloaded node status. Currently only used for testing. | -| `OPERATION_DURATION_METRIC_BUCKET_COUNT` | `int` | `6` | The number of buckets to collect the metric for the operation duration. | -| `POD_NAMES` | `string` | `v2` | Whether to have pod names contain the template name (v2) or be the node id (v1) - should be set the same for Argo Server. | -| `RECENTLY_STARTED_POD_DURATION` | `time.Duration` | `10s` | The duration of a pod before the pod is considered to be recently started. | -| `RETRY_BACKOFF_DURATION` | `time.Duration` | `10ms` | The retry back-off duration when retrying API calls. | -| `RETRY_BACKOFF_FACTOR` | `float` | `2.0` | The retry back-off factor when retrying API calls. | -| `RETRY_BACKOFF_STEPS` | `int` | `5` | The retry back-off steps when retrying API calls. | -| `RETRY_HOST_NAME_LABEL_KEY` | `string` | `kubernetes.io/hostname` | The label key for host name used when retrying templates. | -| `TRANSIENT_ERROR_PATTERN` | `string` | `""` | The regular expression that represents additional patterns for transient errors. | -| `WF_DEL_PROPAGATION_POLICY` | `string` | `""` | The deletion propagation policy for workflows. | -| `WORKFLOW_GC_PERIOD` | `time.Duration` | `5m` | The periodicity for GC of workflows. | -| `SEMAPHORE_NOTIFY_DELAY` | `time.Duration` | `1s` | Tuning Delay when notifying semaphore waiters about availability in the semaphore | -| `WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS` | `bool` | `true` | Whether to watch the Controller's ConfigMap and semaphore ConfigMaps for run-time changes. When disabled, the Controller will only read these ConfigMaps once and will have to be manually restarted to pick up new changes. 
| +| Name | Type | Default | Description | +|------------------------------------------|---------------------|------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `ARGO_AGENT_TASK_WORKERS` | `int` | `16` | The number of task workers for the agent pod. | +| `ALL_POD_CHANGES_SIGNIFICANT` | `bool` | `false` | Whether to consider all pod changes as significant during pod reconciliation. | +| `ALWAYS_OFFLOAD_NODE_STATUS` | `bool` | `false` | Whether to always offload the node status. | +| `OFFLOAD_NODE_STATUS_ERROR_FALLBACK` | `bool` | `false` | Control if we can save node status to workflow as normal when offload failed. | +| `HYDRATION_FAILED_RETRY_DURATION` | `time.Duration` | `900s` | The time that we wait before mark workflow as Error when fallback enabled. | +| `ARCHIVED_WORKFLOW_GC_PERIOD` | `time.Duration` | `24h` | The periodicity for GC of archived workflows. | +| `ARGO_PPROF` | `bool` | `false` | Enable [`pprof`](https://go.dev/blog/pprof) endpoints | +| `ARGO_PROGRESS_PATCH_TICK_DURATION` | `time.Duration` | `1m` | How often self reported progress is patched into the pod annotations which means how long it takes until the controller picks up the progress change. Set to 0 to disable self reporting progress. | +| `ARGO_PROGRESS_FILE_TICK_DURATION` | `time.Duration` | `3s` | How often the progress file is read by the executor. Set to 0 to disable self reporting progress. | +| `ARGO_REMOVE_PVC_PROTECTION_FINALIZER` | `bool` | `true` | Remove the `kubernetes.io/pvc-protection` finalizer from persistent volume claims (PVC) after marking PVCs created for the workflow for deletion, so deleted is not blocked until the pods are deleted. [#6629](https://github.com/argoproj/argo-workflows/issues/6629) | +| `ARGO_TRACE` | `string` | `` | Whether to enable tracing statements in Argo components. | +| `ARGO_AGENT_PATCH_RATE` | `time.Duration` | `DEFAULT_REQUEUE_TIME` | Rate that the Argo Agent will patch the workflow task-set. | +| `ARGO_AGENT_CPU_LIMIT` | `resource.Quantity` | `100m` | CPU resource limit for the agent. | +| `ARGO_AGENT_MEMORY_LIMIT` | `resource.Quantity` | `256m` | Memory resource limit for the agent. | +| `ARGO_POD_STATUS_CAPTURE_FINALIZER` | `bool` | `false` | The finalizer blocks the deletion of pods until the controller captures their status. +| `BUBBLE_ENTRY_TEMPLATE_ERR` | `bool` | `true` | Whether to bubble up template errors to workflow. | +| `CACHE_GC_PERIOD` | `time.Duration` | `0s` | How often to perform memoization cache GC, which is disabled by default and can be enabled by providing a non-zero duration. | +| `CACHE_GC_AFTER_NOT_HIT_DURATION` | `time.Duration` | `30s` | When a memoization cache has not been hit after this duration, it will be deleted. | +| `CRON_SYNC_PERIOD` | `time.Duration` | `10s` | How often to sync cron workflows. | +| `DEFAULT_REQUEUE_TIME` | `time.Duration` | `10s` | The re-queue time for the rate limiter of the workflow queue. | +| `DISABLE_MAX_RECURSION` | `bool` | `false` | Set to true to disable the recursion preventer, which will stop a workflow running which has called into a child template 100 times | +| `EXPRESSION_TEMPLATES` | `bool` | `true` | Escape hatch to disable expression templates. 
| +| `EVENT_AGGREGATION_WITH_ANNOTATIONS` | `bool` | `false` | Whether event annotations will be used when aggregating events. | +| `GZIP_IMPLEMENTATION` | `string` | `PGZip` | The implementation of compression/decompression. Currently only "`PGZip`" and "`GZip`" are supported. | +| `INFORMER_WRITE_BACK` | `bool` | `true` | Whether to write back to informer instead of catching up. | +| `HEALTHZ_AGE` | `time.Duration` | `5m` | How old a un-reconciled workflow is to report unhealthy. | +| `INDEX_WORKFLOW_SEMAPHORE_KEYS` | `bool` | `true` | Whether or not to index semaphores. | +| `LEADER_ELECTION_IDENTITY` | `string` | Controller's `metadata.name` | The ID used for workflow controllers to elect a leader. | +| `LEADER_ELECTION_DISABLE` | `bool` | `false` | Whether leader election should be disabled. | +| `LEADER_ELECTION_LEASE_DURATION` | `time.Duration` | `15s` | The duration that non-leader candidates will wait to force acquire leadership. | +| `LEADER_ELECTION_RENEW_DEADLINE` | `time.Duration` | `10s` | The duration that the acting master will retry refreshing leadership before giving up. | +| `LEADER_ELECTION_RETRY_PERIOD` | `time.Duration` | `5s` | The duration that the leader election clients should wait between tries of actions. | +| `MAX_OPERATION_TIME` | `time.Duration` | `30s` | The maximum time a workflow operation is allowed to run for before re-queuing the workflow onto the work queue. | +| `OFFLOAD_NODE_STATUS_TTL` | `time.Duration` | `5m` | The TTL to delete the offloaded node status. Currently only used for testing. | +| `OPERATION_DURATION_METRIC_BUCKET_COUNT` | `int` | `6` | The number of buckets to collect the metric for the operation duration. | +| `POD_NAMES` | `string` | `v2` | Whether to have pod names contain the template name (v2) or be the node id (v1) - should be set the same for Argo Server. | +| `RECENTLY_STARTED_POD_DURATION` | `time.Duration` | `10s` | The duration of a pod before the pod is considered to be recently started. | +| `RETRY_BACKOFF_DURATION` | `time.Duration` | `10ms` | The retry back-off duration when retrying API calls. | +| `RETRY_BACKOFF_FACTOR` | `float` | `2.0` | The retry back-off factor when retrying API calls. | +| `RETRY_BACKOFF_STEPS` | `int` | `5` | The retry back-off steps when retrying API calls. | +| `RETRY_HOST_NAME_LABEL_KEY` | `string` | `kubernetes.io/hostname` | The label key for host name used when retrying templates. | +| `TRANSIENT_ERROR_PATTERN` | `string` | `""` | The regular expression that represents additional patterns for transient errors. | +| `WF_DEL_PROPAGATION_POLICY` | `string` | `""` | The deletion propagation policy for workflows. | +| `WORKFLOW_GC_PERIOD` | `time.Duration` | `5m` | The periodicity for GC of workflows. | +| `WORKFLOW_GC_MAX_WORKER` | `int` | `10` | The worker count for GC of workflows. | +| `SEMAPHORE_NOTIFY_DELAY` | `time.Duration` | `1s` | Tuning Delay when notifying semaphore waiters about availability in the semaphore | +| `WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS` | `bool` | `true` | Whether to watch the Controller's ConfigMap and semaphore ConfigMaps for run-time changes. When disabled, the Controller will only read these ConfigMaps once and will have to be manually restarted to pick up new changes. | CLI parameters of the Controller can be specified as environment variables with the `ARGO_` prefix. 
For example: diff --git a/go.mod b/go.mod index 300f9200f110..bb8902f4684d 100644 --- a/go.mod +++ b/go.mod @@ -213,7 +213,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect - github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/hashicorp/go-uuid v1.0.3 github.com/hashicorp/hcl v1.0.0 // indirect github.com/huandu/xstrings v1.3.3 // indirect github.com/imdario/mergo v0.3.15 // indirect diff --git a/persist/sqldb/explosive_offload_node_status_repo.go b/persist/sqldb/explosive_offload_node_status_repo.go index d9e1816af2cb..fa5c7dcd6b2a 100644 --- a/persist/sqldb/explosive_offload_node_status_repo.go +++ b/persist/sqldb/explosive_offload_node_status_repo.go @@ -33,6 +33,6 @@ func (n *explosiveOffloadNodeStatusRepo) Delete(string, string) error { return OffloadNotSupportedError } -func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) (map[string][]string, error) { +func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) (map[string][]NodesRecord, error) { return nil, OffloadNotSupportedError } diff --git a/persist/sqldb/migrate.go b/persist/sqldb/migrate.go index 45c908cc87de..1453720e00cc 100644 --- a/persist/sqldb/migrate.go +++ b/persist/sqldb/migrate.go @@ -258,6 +258,15 @@ func (m migrate) Exec(ctx context.Context) (err error) { // add indexes for list archived workflow performance. #8836 ansiSQLChange(`create index argo_archived_workflows_i4 on argo_archived_workflows (startedat)`), ansiSQLChange(`create index argo_archived_workflows_labels_i1 on argo_archived_workflows_labels (name,value)`), + ternary(dbType == MySQL, + ansiSQLChange(`alter table `+m.tableName+` add column compressednodes longtext`), + ansiSQLChange(`alter table `+m.tableName+` add column compressednodes text`), + ), + ansiSQLChange(`update ` + m.tableName + ` set compressednodes = '' where compressednodes is null`), + ternary(dbType == MySQL, + ansiSQLChange(`alter table `+m.tableName+` add column id bigint auto_increment, add constraint id unique (id)`), + ansiSQLChange(`alter table `+m.tableName+` add column id bigserial, add constraint id unique (id)`), + ), } { err := m.applyChange(changeSchemaVersion, change) if err != nil { diff --git a/persist/sqldb/mocks/OffloadNodeStatusRepo.go b/persist/sqldb/mocks/OffloadNodeStatusRepo.go index 47e038504d7b..72be2efe9914 100644 --- a/persist/sqldb/mocks/OffloadNodeStatusRepo.go +++ b/persist/sqldb/mocks/OffloadNodeStatusRepo.go @@ -89,15 +89,15 @@ func (_m *OffloadNodeStatusRepo) List(namespace string) (map[sqldb.UUIDVersion]v } // ListOldOffloads provides a mock function with given fields: namespace -func (_m *OffloadNodeStatusRepo) ListOldOffloads(namespace string) (map[string][]string, error) { +func (_m *OffloadNodeStatusRepo) ListOldOffloads(namespace string) (map[string][]sqldb.NodesRecord, error) { ret := _m.Called(namespace) - var r0 map[string][]string - if rf, ok := ret.Get(0).(func(string) map[string][]string); ok { + var r0 map[string][]sqldb.NodesRecord + if rf, ok := ret.Get(0).(func(string) map[string][]sqldb.NodesRecord); ok { r0 = rf(namespace) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string][]string) + r0 = ret.Get(0).(map[string][]sqldb.NodesRecord) } } diff --git a/persist/sqldb/offload_node_status_repo.go b/persist/sqldb/offload_node_status_repo.go index 64fe982e9f2b..0077767ae5b1 100644 --- a/persist/sqldb/offload_node_status_repo.go +++ 
b/persist/sqldb/offload_node_status_repo.go @@ -4,6 +4,8 @@ import ( "encoding/json" "fmt" "hash/fnv" + "os" + "strconv" "strings" "time" @@ -12,9 +14,17 @@ import ( wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/util/env" + "github.com/argoproj/argo-workflows/v3/util/file" + "github.com/argoproj/argo-workflows/v3/workflow/common" ) -const OffloadNodeStatusDisabled = "Workflow has offloaded nodes, but offloading has been disabled" +const ( + OffloadNodeStatusDisabled = "Workflow has offloaded nodes, but offloading has been disabled" + tooLarge = "workflow is longer than maximum allowed size for sqldb." + + envVarOffloadMaxSize = "OFFLOAD_NODE_STATUS_MAX_SIZE" + envVarCompressNodeStatus = "OFFLOAD_NODE_STATUS_COMPRESSED" +) type UUIDVersion struct { UID string `db:"uid"` @@ -25,24 +35,44 @@ type OffloadNodeStatusRepo interface { Save(uid, namespace string, nodes wfv1.Nodes) (string, error) Get(uid, version string) (wfv1.Nodes, error) List(namespace string) (map[UUIDVersion]wfv1.Nodes, error) - ListOldOffloads(namespace string) (map[string][]string, error) + ListOldOffloads(namespace string) (map[string][]NodesRecord, error) Delete(uid, version string) error IsEnabled() bool } +func IsTooLargeError(err error) bool { + return err != nil && strings.HasPrefix(err.Error(), tooLarge) +} + +func getMaxWorkflowSize() int { + s, _ := strconv.Atoi(os.Getenv(envVarOffloadMaxSize)) + if s == 0 { + s = 3 * 1024 * 1024 + } + return s +} + func NewOffloadNodeStatusRepo(session db.Session, clusterName, tableName string) (OffloadNodeStatusRepo, error) { // this environment variable allows you to make Argo Workflows delete offloaded data more or less aggressively, // useful for testing - ttl := env.LookupEnvDurationOr("OFFLOAD_NODE_STATUS_TTL", 5*time.Minute) - log.WithField("ttl", ttl).Debug("Node status offloading config") - return &nodeOffloadRepo{session: session, clusterName: clusterName, tableName: tableName, ttl: ttl}, nil + ttl := env.LookupEnvDurationOr(common.EnvVarOffloadNodeStatusTTL, 5*time.Minute) + historyCount := env.LookupEnvIntOr(common.EnvVarOffloadNodeStatusHistoryCount, 12) + log.WithField("ttl", ttl).WithField("historyCount", historyCount).Debug("Node status offloading config") + return &nodeOffloadRepo{session: session, clusterName: clusterName, tableName: tableName, ttl: ttl, historyCount: historyCount}, nil } type nodesRecord struct { ClusterName string `db:"clustername"` UUIDVersion - Namespace string `db:"namespace"` - Nodes string `db:"nodes"` + Namespace string `db:"namespace"` + Nodes string `db:"nodes"` + CompressedNodes string `db:"compressednodes"` +} + +type NodesRecord struct { + UUIDVersion + Id int64 `db:"id"` + UpdatedAt time.Time `db:"updatedat"` } type nodeOffloadRepo struct { @@ -51,6 +81,8 @@ type nodeOffloadRepo struct { tableName string // time to live - at what ttl an offload becomes old ttl time.Duration + // this number is related to retry number of `reapplyUpdate` + historyCount int } func (wdc *nodeOffloadRepo) IsEnabled() bool { @@ -73,15 +105,26 @@ func (wdc *nodeOffloadRepo) Save(uid, namespace string, nodes wfv1.Nodes) (strin if err != nil { return "", err } - + raw := marshalled + size := len(raw) + compressed := "" + if os.Getenv(envVarCompressNodeStatus) != "false" { + raw = "null" + compressed = file.CompressEncodeString(marshalled) + size = len(compressed) + } + if size > getMaxWorkflowSize() { + return "", fmt.Errorf("%s compressed size %d > maxSize %d", tooLarge, size, 
getMaxWorkflowSize()) + } record := &nodesRecord{ ClusterName: wdc.clusterName, UUIDVersion: UUIDVersion{ UID: uid, Version: version, }, - Namespace: namespace, - Nodes: marshalled, + Namespace: namespace, + Nodes: raw, + CompressedNodes: compressed, } logCtx := log.WithFields(log.Fields{"uid": uid, "version": version}) @@ -101,21 +144,38 @@ func (wdc *nodeOffloadRepo) Save(uid, namespace string, nodes wfv1.Nodes) (strin // This might fail, which kind of fine (maybe a bug). // It might not delete all records, which is also fine, as we always key on resource version. // We also want to keep enough around so that we can service watches. - rs, err := wdc.session.SQL(). - DeleteFrom(wdc.tableName). + var records []NodesRecord + err = wdc.session.SQL(). + Select("id"). + From(wdc.tableName). Where(db.Cond{"clustername": wdc.clusterName}). And(db.Cond{"uid": uid}). And(db.Cond{"version <>": version}). And(wdc.oldOffload()). - Exec() + OrderBy("updatedat desc"). + Offset(wdc.historyCount). + All(&records) if err != nil { return "", err } - rowsAffected, err := rs.RowsAffected() - if err != nil { - return "", err + if len(records) > 0 { + var ids []int64 + for _, r := range records { + ids = append(ids, r.Id) + } + rs, err := wdc.session.SQL(). + DeleteFrom(wdc.tableName). + Where(db.Cond{"id in": ids}). + Exec() + if err != nil { + return "", err + } + rowsAffected, err := rs.RowsAffected() + if err != nil { + return "", err + } + logCtx.WithField("rowsAffected", rowsAffected).Debug("Deleted offloaded nodes") } - logCtx.WithField("rowsAffected", rowsAffected).Debug("Deleted offloaded nodes") return version, nil } @@ -143,8 +203,15 @@ func (wdc *nodeOffloadRepo) Get(uid, version string) (wfv1.Nodes, error) { if err != nil { return nil, err } + dbNodes := r.Nodes + if r.CompressedNodes != "" { + dbNodes, err = file.DecodeDecompressString(r.CompressedNodes) + if err != nil { + return nil, err + } + } nodes := &wfv1.Nodes{} - err = json.Unmarshal([]byte(r.Nodes), nodes) + err = json.Unmarshal([]byte(dbNodes), nodes) if err != nil { return nil, err } @@ -166,8 +233,15 @@ func (wdc *nodeOffloadRepo) List(namespace string) (map[UUIDVersion]wfv1.Nodes, res := make(map[UUIDVersion]wfv1.Nodes) for _, r := range records { + dbNodes := r.Nodes + if r.CompressedNodes != "" { + dbNodes, err = file.DecodeDecompressString(r.CompressedNodes) + if err != nil { + return nil, err + } + } nodes := &wfv1.Nodes{} - err = json.Unmarshal([]byte(r.Nodes), nodes) + err = json.Unmarshal([]byte(dbNodes), nodes) if err != nil { return nil, err } @@ -177,11 +251,11 @@ func (wdc *nodeOffloadRepo) List(namespace string) (map[UUIDVersion]wfv1.Nodes, return res, nil } -func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]string, error) { +func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]NodesRecord, error) { log.WithFields(log.Fields{"namespace": namespace}).Debug("Listing old offloaded nodes") - var records []UUIDVersion + var records []NodesRecord err := wdc.session.SQL(). - Select("uid", "version"). + Select("uid", "version", "updatedat"). From(wdc.tableName). Where(db.Cond{"clustername": wdc.clusterName}). And(namespaceEqual(namespace)). 
@@ -190,9 +264,9 @@ func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]stri if err != nil { return nil, err } - x := make(map[string][]string) + x := make(map[string][]NodesRecord) for _, r := range records { - x[r.UID] = append(x[r.UID], r.Version) + x[r.UID] = append(x[r.UID], r) } return x, nil } diff --git a/test/stress/mysql/main.go b/test/stress/mysql/main.go new file mode 100644 index 000000000000..83c9b0e8ee1e --- /dev/null +++ b/test/stress/mysql/main.go @@ -0,0 +1,357 @@ +package main + +import ( + "context" + "flag" + "os" + "os/signal" + "strconv" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + "github.com/argoproj/pkg/rand" + "github.com/hashicorp/go-uuid" + log "github.com/sirupsen/logrus" + "github.com/upper/db/v4" + "github.com/upper/db/v4/adapter/mysql" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + runtimeutil "k8s.io/apimachinery/pkg/util/runtime" + + "github.com/argoproj/argo-workflows/v3/persist/sqldb" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + envutil "github.com/argoproj/argo-workflows/v3/util/env" + "github.com/argoproj/argo-workflows/v3/util/instanceid" +) + +type archivedRecord struct { + StartedAt time.Time `db:"startedat"` +} + +func getFirstArchivedRecord(session db.Session) (*archivedRecord, error) { + var record *archivedRecord + err := session.SQL().Select("startedat"). + From("argo_archived_workflows"). + OrderBy("startedat asc"). + Limit(1). + One(&record) + if err != nil { + log.Warnf("Get first archived workflow error: %s", err) + } + return record, err +} + +func getLastArchivedRecord(session db.Session) (*archivedRecord, error) { + var record *archivedRecord + err := session.SQL().Select("startedat"). + From("argo_archived_workflows"). + OrderBy("startedat desc"). + Limit(1). 
+ One(&record) + if err != nil { + log.Warnf("Get last archived workflow error: %s", err) + } + return record, err +} + +func getArchivedWorkflowsCount(session db.Session) (int, error) { + var count *int + rows, err := session.SQL().Query("select count(*) count from argo_archived_workflows") + if err != nil { + log.Warnf("Get archived workflow count error: %s", err) + return 0, err + } + defer func() { + rows.Close() + }() + if rows.Next() { + err = rows.Scan(&count) + if err != nil { + log.Warnf("Scan archived workflow count error: %s", err) + return 0, err + } + return *count, nil + } + return 0, rows.Err() +} + +// Related environment variables: +// - WORKFLOW_GC_PERIOD +// - OFFLOAD_NODE_STATUS_TTL +func main() { + var dbUri string + var parallel int + var duration time.Duration + var archiveCount int + var archiveCleanSize int + var nodeStatusSize int + var archiveNodeStatusSize int + var archiveWorkers int + + flag.StringVar(&dbUri, "db-uri", "root:root@tcp(localhost:3306)/argoperftest", "Database connection URL") + flag.IntVar(&parallel, "parallel", 100, "Concurrent worker count") + flag.DurationVar(&duration, "duration", 15*time.Minute, "Test time duration") + flag.IntVar(&archiveCount, "archive-count", 100000, "Number of archived workflows to create") + flag.IntVar(&archiveCleanSize, "archive-clean-size", 10000, "Number of archived workflows to clean per batch") + flag.IntVar(&nodeStatusSize, "node-status-size", 40*1024, "Size in bytes of the node status used for offloaded workflows") + flag.IntVar(&archiveNodeStatusSize, "archive-node-status-size", 200*1024, "Size in bytes of the node status used for archived workflows") + flag.IntVar(&archiveWorkers, "archive-workers", 10, "Number of workers creating archived workflows") + flag.Parse() + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + conn, err := mysql.ParseURL(dbUri) + if err != nil { + log.Fatal(err) + } + + session, err := mysql.Open(conn) + if err != nil { + log.Fatal(err) + } + defer func() { + session.Close() + }() + session.SetMaxOpenConns(5000) + session.SetMaxIdleConns(150) + session.SetConnMaxLifetime(60 * time.Second) + + // this is needed to make MySQL run in a Golang-compatible UTF-8 character set.
+ _, err = session.SQL().Exec("SET NAMES 'utf8mb4'") + if err != nil { + log.Fatal(err) + } + _, err = session.SQL().Exec("SET CHARACTER SET utf8mb4") + if err != nil { + log.Fatal(err) + } + + err = sqldb.NewMigrate(session, "local", "argo_workflows").Exec(ctx) + if err != nil { + log.Fatal(err) + } + + instanceIDService := instanceid.NewService("") + archive := sqldb.NewWorkflowArchive(session, "local", "argo-managed", instanceIDService) + + repo, err := sqldb.NewOffloadNodeStatusRepo(session, "local", "argo_workflows") + if err != nil { + log.Fatal(err) + } + + nodeStatus, err := rand.RandString(nodeStatusSize) + if err != nil { + log.Fatal(err) + } + + archiveNodeStatus, err := rand.RandString(archiveNodeStatusSize) + if err != nil { + log.Fatal(err) + } + + firstStartTime := time.Date(2024, time.January, 1, 00, 0, 0, 0, time.UTC) + lastStartTime := time.Date(2024, time.January, 1, 00, 0, 0, 0, time.UTC) + if record, err := getLastArchivedRecord(session); err == nil { + lastStartTime = record.StartedAt + } + log.Infof("Last started workflow time is %s", lastStartTime) + + if count, err := getArchivedWorkflowsCount(session); err == nil { + log.Infof("There are %d archived workflows currently", count) + } + + wg := sync.WaitGroup{} + createdArchiveCount := atomic.Int64{} + createArchive := func(start, end int) { + for i := start; i < end; i++ { + name, err := rand.RandString(16) + if err != nil { + log.Warnf("generate workflow name error: %s", err) + continue + } + uid, err := uuid.GenerateUUID() + if err != nil { + log.Warnf("generate uuid error: %s", err) + continue + } + offTime := time.Duration(i) * time.Second + wf := wfv1.Workflow{ + ObjectMeta: v1.ObjectMeta{ + UID: types.UID(uid), + Name: name, + Namespace: "argo-managed", + Labels: map[string]string{}, + }, + Status: wfv1.WorkflowStatus{ + Nodes: map[string]wfv1.NodeStatus{ + "n1": { + Message: archiveNodeStatus, + }, + }, + Phase: wfv1.WorkflowSucceeded, + StartedAt: v1.NewTime(lastStartTime.Add(offTime)), + FinishedAt: v1.NewTime(lastStartTime.Add(offTime)), + }, + } + err = archive.ArchiveWorkflow(&wf) + if err != nil { + log.Warnf("archive workflow error: %s", err) + if strings.Contains(err.Error(), "try restarting transaction") { + i-- + } else { + continue + } + } else { + createdArchiveCount.Add(1) + } + } + wg.Done() + } + workerBatch := archiveCount / archiveWorkers + for i := 0; i < archiveWorkers; i++ { + log.Info("Creating archive worker...") + go createArchive(i*workerBatch, (i+1)*workerBatch) + wg.Add(1) + } + stopPrintCount := make(chan struct{}) + go func(c chan struct{}) { + t := time.NewTicker(time.Second) + for { + select { + case <-c: + return + case <-t.C: + log.Infof("Created archives count is %d", createdArchiveCount.Load()) + } + } + }(stopPrintCount) + wg.Wait() + stopPrintCount <- struct{}{} + log.Info("Create archives done") + + if record, err := getFirstArchivedRecord(session); err == nil { + firstStartTime = record.StartedAt + } + log.Infof("First started workflow time is %s", firstStartTime) + + totalUpdate := atomic.Int64{} + calculateRate := func(startTime time.Time) float64 { + endTime := time.Now() + second := endTime.Sub(startTime).Seconds() + rate := float64(totalUpdate.Load()) / second + return rate + } + + reconcileWf := func() { + uid, err := uuid.GenerateUUID() + if err != nil { + log.Fatal(err) + } + i := int64(0) + version := "" + for { + select { + case <-ctx.Done(): + return + default: + i++ + if version != "" { + if _, err = repo.Get(uid, version); err != nil { + log.Error("get", err) 
+ } + } + time.Sleep(500 * time.Millisecond) + nodes := map[string]wfv1.NodeStatus{ + "n1": { + Message: nodeStatus + strconv.FormatInt(i, 10), + }, + } + version, err = repo.Save(uid, "argo-managed", nodes) + if err != nil { + log.Error("save", err) + } + time.Sleep(500 * time.Millisecond) + totalUpdate.Add(1) + } + } + } + + workflowGarbageCollector := func() { + defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) + + periodicity := envutil.LookupEnvDurationOr("WORKFLOW_GC_PERIOD", 5*time.Minute) + log.WithField("periodicity", periodicity).Info("Performing periodic GC") + t := time.NewTicker(periodicity) + for { + select { + case <-ctx.Done(): + return + case <-t.C: + log.Info("Performing periodic workflow GC") + oldRecords, err := repo.ListOldOffloads("argo-managed") + if err != nil { + log.Error(err) + } + log.WithField("len_wfs", len(oldRecords)).Info("Deleting old offloads that are not live") + for uid, versions := range oldRecords { + for _, version := range versions { + // skip delete if offload is live + //log.Info("deleting......") + if err := repo.Delete(uid, version.Version); err != nil { + log.Error("delete", err) + } + } + } + log.Info("Workflow GC finished") + } + } + } + + go workflowGarbageCollector() + + for i := 0; i < parallel; i++ { + go reconcileWf() + } + + stopCh := make(chan os.Signal, 1) + signal.Notify(stopCh, os.Interrupt, syscall.SIGTERM) + + stopTimer := time.NewTimer(duration) + cleanDuration := 3 * time.Minute + cleanTicker := time.NewTicker(cleanDuration) + nthCleanBatch := 0 + startTime := time.Now() + + for { + select { + case <-cleanTicker.C: + log.Infof("Average rate is: %f", calculateRate(startTime)) + + cleanTicker.Stop() + totalUpdate.Store(0) + startTime = time.Now() + nthCleanBatch += 1 + archiveTTL := time.Now().UTC().Sub(firstStartTime.Add(time.Duration(archiveCleanSize*nthCleanBatch) * time.Second)) + err = archive.DeleteExpiredWorkflows(archiveTTL) + if err != nil { + log.Warnf("Clean up expired archive workflows error: %s", err) + } + log.Infof("Cleaning %d archives cost %s", archiveCleanSize, time.Since(startTime)) + cleanTicker.Reset(cleanDuration) + log.Infof("Average rate when cleaning archives is: %f", calculateRate(startTime)) + + totalUpdate.Store(0) + startTime = time.Now() + case <-stopTimer.C: + cancel() + return + case <-stopCh: + cancel() + return + } + } +} diff --git a/workflow/common/common.go b/workflow/common/common.go index b4f174263eaa..dbeacac44291 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -58,6 +58,8 @@ const ( // AnnotationKeyArtifactGCStrategy is listed as an annotation on the Artifact GC Pod to identify // the strategy whose artifacts are being deleted AnnotationKeyArtifactGCStrategy = workflow.WorkflowFullName + "/artifact-gc-strategy" + // AnnotationKeyHydrationFailedTime is the time that when hydrate failed, it will be cleared after a successful hydration + AnnotationKeyHydrationFailedTime = workflow.WorkflowFullName + "/hydration-failed-time" // LabelKeyControllerInstanceID is the label the controller will carry forward to workflows/pod labels // for the purposes of workflow segregation @@ -163,6 +165,14 @@ const ( EnvVarProgressFile = "ARGO_PROGRESS_FILE" // EnvVarDefaultRequeueTime is the default requeue time for Workflow Informers. 
For more info, see rate_limiters.go EnvVarDefaultRequeueTime = "DEFAULT_REQUEUE_TIME" + // EnvVarOffloadNodeStatusErrorFallback controls whether the node status is saved back into the workflow as usual when offloading fails + EnvVarOffloadNodeStatusErrorFallback = "OFFLOAD_NODE_STATUS_ERROR_FALLBACK" + // EnvVarOffloadNodeStatusHistoryCount is the number of offloaded node status records to keep for a live workflow + EnvVarOffloadNodeStatusHistoryCount = "OFFLOAD_NODE_STATUS_HISTORY_COUNT" + // EnvVarOffloadNodeStatusTTL is the TTL of offloaded node status records + EnvVarOffloadNodeStatusTTL = "OFFLOAD_NODE_STATUS_TTL" + // EnvVarHydrationFailedRetryDuration is how long hydration is retried before the workflow is marked as errored + EnvVarHydrationFailedRetryDuration = "HYDRATION_FAILED_RETRY_DURATION" // EnvAgentTaskWorkers is the number of task workers for the agent pod EnvAgentTaskWorkers = "ARGO_AGENT_TASK_WORKERS" // EnvAgentPatchRate is the rate that the Argo Agent will patch the Workflow TaskSet diff --git a/workflow/controller/controller.go index 7dfce1a960b4..90bb68c9f99d 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "os" + "sort" "strconv" gosync "sync" "syscall" @@ -682,10 +683,16 @@ func (wfc *WorkflowController) workflowGarbageCollector(stopCh <-chan struct{}) continue } log.WithField("len_wfs", len(oldRecords)).Info("Deleting old offloads that are not live") + maxGoroutines := env.LookupEnvIntOr("WORKFLOW_GC_MAX_WORKER", 10) + guard := make(chan struct{}, maxGoroutines) for uid, versions := range oldRecords { - if err := wfc.deleteOffloadedNodesForWorkflow(uid, versions); err != nil { - log.WithError(err).WithField("uid", uid).Error("Failed to delete old offloaded nodes") - } + guard <- struct{}{} + go func(gUid string, gVersions []sqldb.NodesRecord) { + if err := wfc.deleteOffloadedNodesForWorkflow(gUid, gVersions); err != nil { + log.WithError(err).WithField("uid", gUid).Error("Failed to delete old offloaded nodes") + } + <-guard + }(uid, versions) } log.Info("Workflow GC finished") } @@ -693,7 +700,7 @@ func (wfc *WorkflowController) workflowGarbageCollector(stopCh <-chan struct{}) } } -func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versions []string) error { +func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versions []sqldb.NodesRecord) error { workflows, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.UIDIndex, uid) if err != nil { return err @@ -735,12 +742,33 @@ func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versi default: return fmt.Errorf("expected no more than 1 workflow, got %d", l) } + var updatedAt time.Time + if wf != nil { + sort.Slice(versions, func(i, j int) bool { + return versions[i].UpdatedAt.After(versions[j].UpdatedAt) + }) + historyCount := env.LookupEnvIntOr(common.EnvVarOffloadNodeStatusHistoryCount, 12) + if len(versions) > historyCount { + versions = versions[historyCount:] + } else { + // Skip deleting offloaded versions for a live workflow if the record count is within the configured history count + return nil + } + ttl := env.LookupEnvDurationOr(common.EnvVarOffloadNodeStatusTTL, 5*time.Minute) + for _, v := range versions { + if v.Version == wf.Status.OffloadNodeStatusVersion { + updatedAt = v.UpdatedAt.Add(-ttl) + break + } + } + } for _, version := range versions { // skip delete if offload is live - if wf != nil && wf.Status.OffloadNodeStatusVersion == version { + if wf != nil && (version.Version
== wf.Status.OffloadNodeStatusVersion || !updatedAt.IsZero() && !version.UpdatedAt.Before(updatedAt)) { continue } - if err := wfc.offloadNodeStatusRepo.Delete(uid, version); err != nil { + log.Debugf("Deleting offload version, uid %s, version %s", uid, version.Version) + if err := wfc.offloadNodeStatusRepo.Delete(uid, version.Version); err != nil { return err } } @@ -789,6 +817,44 @@ func (wfc *WorkflowController) runWorker() { } } +func (wfc *WorkflowController) hydrateWorkflow(woc *wfOperationCtx) (bool, error) { + err := wfc.hydrator.Hydrate(woc.wf) + if err != nil { + if os.Getenv(common.EnvVarOffloadNodeStatusErrorFallback) != "true" { + return false, err + } + woc.log.Warnf("hydration failed and will retry, error: %v", err) + if woc.wf.Annotations == nil { + woc.wf.Annotations = make(map[string]string) + } + failTimeStr, exist := woc.wf.Annotations[common.AnnotationKeyHydrationFailedTime] + if exist { + failTime, err := time.Parse(time.RFC3339, failTimeStr) + if err != nil { + return false, fmt.Errorf("parse hydration failed time error: %w", err) + } + retryDuration := env.LookupEnvDurationOr(common.EnvVarHydrationFailedRetryDuration, 900*time.Second) + if failTime.Add(retryDuration).Before(time.Now()) { + return false, fmt.Errorf("retry hydration timeout after waiting %s", retryDuration.String()) + } else { + return false, nil + } + } else { + woc.wf.Annotations[common.AnnotationKeyHydrationFailedTime] = time.Now().Format(time.RFC3339) + woc.updated = true + return false, nil + } + } + if woc.wf.Annotations != nil && woc.wf.Annotations[common.AnnotationKeyHydrationFailedTime] != "" { + delete(woc.wf.Annotations, common.AnnotationKeyHydrationFailedTime) + woc.updated = true + } + if !wfc.hydrator.IsHydrated(woc.orig) { + wfc.hydrator.HydrateWithNodes(woc.orig, woc.wf.Status.Nodes.DeepCopy()) + } + return true, nil +} + // processNextItem is the worker logic for handling workflow updates func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { key, quit := wfc.wfQueue.Get() @@ -853,14 +919,18 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { wfc.throttler.Remove(key.(string)) } }() - - err = wfc.hydrator.Hydrate(woc.wf) + hydrated, err := wfc.hydrateWorkflow(woc) if err != nil { - woc.log.Errorf("hydration failed: %v", err) + woc.log.Errorf("Hydrate workflow with uid %s version %s failed: %v", woc.wf.UID, woc.wf.Status.OffloadNodeStatusVersion, err) woc.markWorkflowError(ctx, err) woc.persistUpdates(ctx) return true } + if !hydrated { + woc.persistUpdates(ctx) + wfc.wfQueue.AddRateLimited(fmt.Sprintf("%s/%s", woc.wf.GetNamespace(), woc.wf.GetName())) + return true + } startTime := time.Now() woc.operate(ctx) wfc.metrics.OperationCompleted(time.Since(startTime).Seconds()) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 84279172351a..5315fd9d8b27 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -29,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/validation" "k8s.io/client-go/tools/cache" @@ -37,6 +38,7 @@ import ( "sigs.k8s.io/yaml" "github.com/argoproj/argo-workflows/v3/errors" + "github.com/argoproj/argo-workflows/v3/persist/sqldb" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow" wfv1 
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1" @@ -738,10 +740,12 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) { wfClient := woc.controller.wfclientset.ArgoprojV1alpha1().Workflows(woc.wf.ObjectMeta.Namespace) // try and compress nodes if needed nodes := woc.wf.Status.Nodes - err := woc.controller.hydrator.Dehydrate(woc.wf) - if err != nil { - woc.log.Warnf("Failed to dehydrate: %v", err) - woc.markWorkflowError(ctx, err) + dehydrateErr := woc.controller.hydrator.Dehydrate(woc.wf) + if dehydrateErr != nil { + woc.log.Warnf("Failed to dehydrate: %v", dehydrateErr) + if !woc.wf.Status.Fulfilled() { + woc.markWorkflowError(ctx, dehydrateErr) + } } // Release all acquired lock for completed workflow @@ -752,11 +756,16 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) { } // Remove completed taskset status before update workflow. - err = woc.removeCompletedTaskSetStatus(ctx) + err := woc.removeCompletedTaskSetStatus(ctx) if err != nil { woc.log.WithError(err).Warn("error updating taskset") } + if sqldb.IsTooLargeError(dehydrateErr) { + woc.persistWorkflowSizeLimitErr(ctx, wfClient, dehydrateErr) + return + } + wf, err := wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{}) if err != nil { woc.log.Warnf("Error updating workflow: %v %s", err, apierr.ReasonForError(err)) @@ -852,9 +861,28 @@ func (woc *wfOperationCtx) writeBackToInformer() error { // persistWorkflowSizeLimitErr will fail a the workflow with an error when we hit the resource size limit // See https://github.com/argoproj/argo-workflows/issues/913 func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(ctx context.Context, wfClient v1alpha1.WorkflowInterface, err error) { + if woc.orig.Status.Fulfilled() { + return + } woc.wf = woc.orig.DeepCopy() woc.markWorkflowError(ctx, err) - _, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{}) + oldData, err := json.Marshal(woc.orig) + if err != nil { + woc.log.Warnf("Error marshal woc.orig with error: %v", err) + return + } + newData, err := json.Marshal(woc.wf) + if err != nil { + woc.log.Warnf("Error marshal woc.wf with error: %v", err) + return + } + patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + woc.log.Warnf("Error create merge patch of woc.orig and woc.wf with error: %v", err) + return + } + woc.log.Debugf("Persist workflow size limit error, merge patch: %s", string(patchBytes)) + _, err = wfClient.Patch(ctx, woc.wf.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}) if err != nil { woc.log.Warnf("Error updating workflow with size error: %v", err) } @@ -868,10 +896,6 @@ func (woc *wfOperationCtx) reapplyUpdate(ctx context.Context, wfClient v1alpha1. 
if woc.orig.ResourceVersion != woc.wf.ResourceVersion { woc.log.Panic("cannot re-apply update with mismatched resource versions") } - err := woc.controller.hydrator.Hydrate(woc.orig) - if err != nil { - return nil, err - } // First generate the patch oldData, err := json.Marshal(woc.orig) if err != nil { diff --git a/workflow/hydrator/hydrator.go b/workflow/hydrator/hydrator.go index c5b922c72191..2d041ca7e2bb 100644 --- a/workflow/hydrator/hydrator.go +++ b/workflow/hydrator/hydrator.go @@ -1,6 +1,7 @@ package hydrator import ( + "encoding/json" "fmt" "os" "time" @@ -11,7 +12,9 @@ import ( "github.com/argoproj/argo-workflows/v3/persist/sqldb" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" errorsutil "github.com/argoproj/argo-workflows/v3/util/errors" + "github.com/argoproj/argo-workflows/v3/util/file" waitutil "github.com/argoproj/argo-workflows/v3/util/wait" + "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/packer" ) @@ -31,6 +34,7 @@ func New(offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo) Interface { } var alwaysOffloadNodeStatus = os.Getenv("ALWAYS_OFFLOAD_NODE_STATUS") == "true" +var offloadNodeStatusErrorFallback = os.Getenv(common.EnvVarOffloadNodeStatusErrorFallback) == "true" func init() { log.WithField("alwaysOffloadNodeStatus", alwaysOffloadNodeStatus).Debug("Hydrator config") @@ -115,6 +119,21 @@ func (h hydrator) Dehydrate(wf *wfv1.Workflow) error { return !errorsutil.IsTransientErr(offloadErr), offloadErr }) if offloadErr != nil { + if sqldb.IsTooLargeError(offloadErr) { + return offloadErr + } + if offloadNodeStatusErrorFallback { + log.Warnf("%sTried to offload but encountered error: %s", errMsg, offloadErr.Error()) + var nodeContent []byte + nodeContent, err = json.Marshal(wf.Status.Nodes) + if err != nil { + return err + } + wf.Status.CompressedNodes = file.CompressEncodeString(string(nodeContent)) + wf.Status.Nodes = nil + wf.Status.OffloadNodeStatusVersion = "" + return nil + } return fmt.Errorf("%sTried to offload but encountered error: %s", errMsg, offloadErr.Error()) } wf.Status.Nodes = nil diff --git a/workflow/packer/packer.go b/workflow/packer/packer.go index a140b8682ce3..47da6fb44729 100644 --- a/workflow/packer/packer.go +++ b/workflow/packer/packer.go @@ -11,10 +11,19 @@ import ( "github.com/argoproj/argo-workflows/v3/util/file" ) -const envVarName = "MAX_WORKFLOW_SIZE" +const envVarMaxWorkflowSize = "MAX_WORKFLOW_SIZE" +const envVarMaxNodeStatusSize = "MAX_NODE_STATUS_SIZE" func getMaxWorkflowSize() int { - s, _ := strconv.Atoi(os.Getenv(envVarName)) + s, _ := strconv.Atoi(os.Getenv(envVarMaxWorkflowSize)) + if s == 0 { + s = 1024 * 1024 + } + return s +} + +func getMaxNodeStatusSize() int { + s, _ := strconv.Atoi(os.Getenv(envVarMaxNodeStatusSize)) if s == 0 { s = 1024 * 1024 } @@ -22,8 +31,8 @@ func getMaxWorkflowSize() int { } func SetMaxWorkflowSize(s int) func() { - _ = os.Setenv(envVarName, strconv.Itoa(s)) - return func() { _ = os.Unsetenv(envVarName) } + _ = os.Setenv(envVarMaxWorkflowSize, strconv.Itoa(s)) + return func() { _ = os.Unsetenv(envVarMaxWorkflowSize) } } func DecompressWorkflow(wf *wfv1.Workflow) error { @@ -48,7 +57,20 @@ func getSize(wf *wfv1.Workflow) (int, error) { return len(nodeContent), nil } +// getNodeStatusSize return the workflow node status json string size +func getNodeStatusSize(wf *wfv1.Workflow) (int, error) { + nodeContent, err := json.Marshal(wf.Status.Nodes) + if err != nil { + return 0, err + } + return len(nodeContent) + 
len(wf.Status.CompressedNodes), nil +} + func IsLargeWorkflow(wf *wfv1.Workflow) (bool, error) { + nodesSize, err := getNodeStatusSize(wf) + if nodesSize > getMaxNodeStatusSize() { + return true, err + } size, err := getSize(wf) return size > getMaxWorkflowSize(), err } @@ -86,13 +108,18 @@ func compressWorkflow(wf *wfv1.Workflow) error { return err } if large { - compressedSize, err := getSize(wf) wf.Status.CompressedNodes = "" wf.Status.Nodes = nodes + compressedSize, err := getSize(wf) + if err != nil { + return err + } + nodesSize, err := getNodeStatusSize(wf) if err != nil { return err } - return fmt.Errorf("%s compressed size %d > maxSize %d", tooLarge, compressedSize, getMaxWorkflowSize()) + return fmt.Errorf("%s compressed size %d > maxSize %d or node status size %d > maxNodeStatusSize %d", + tooLarge, compressedSize, getMaxWorkflowSize(), nodesSize, getMaxNodeStatusSize()) } return nil }
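
For reference, a minimal standalone sketch of the fallback round trip introduced in the hydrator change above: when offloading to the database fails (and the error is not a too-large error) with OFFLOAD_NODE_STATUS_ERROR_FALLBACK=true, the node status is JSON-marshalled and compress-encoded into the workflow's compressed-nodes field, then decoded again on hydration. This is illustrative only; it reuses the util/file helpers exactly as they are called in this diff, and demoNodes is a made-up value.

package main

import (
	"encoding/json"
	"fmt"

	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
	"github.com/argoproj/argo-workflows/v3/util/file"
)

func main() {
	// demoNodes stands in for wf.Status.Nodes.
	demoNodes := wfv1.Nodes{"n1": wfv1.NodeStatus{Message: "hello"}}

	// Dehydrate fallback: marshal the nodes, then compress-encode them into
	// wf.Status.CompressedNodes instead of offloading them to the database.
	raw, err := json.Marshal(demoNodes)
	if err != nil {
		panic(err)
	}
	compressed := file.CompressEncodeString(string(raw))

	// Hydrate: decode-decompress and unmarshal back into the node map.
	decoded, err := file.DecodeDecompressString(compressed)
	if err != nil {
		panic(err)
	}
	restored := wfv1.Nodes{}
	if err := json.Unmarshal([]byte(decoded), &restored); err != nil {
		panic(err)
	}
	fmt.Println(restored["n1"].Message) // prints "hello"
}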