
Commit

fix: optimise workflow node status offload with compression and fallback and db stability

Signed-off-by: 刘达 <[email protected]>
刘达 committed Aug 8, 2024
1 parent b6d6403 commit 7577a94
Showing 11 changed files with 638 additions and 55 deletions.
2 changes: 2 additions & 0 deletions docs/environment-variables.md
@@ -13,6 +13,8 @@ This document outlines environment variables that can be used to customize behavior
| `ARGO_AGENT_TASK_WORKERS` | `int` | `16` | The number of task workers for the agent pod. |
| `ALL_POD_CHANGES_SIGNIFICANT` | `bool` | `false` | Whether to consider all pod changes as significant during pod reconciliation. |
| `ALWAYS_OFFLOAD_NODE_STATUS` | `bool` | `false` | Whether to always offload the node status. |
| `OFFLOAD_NODE_STATUS_ERROR_FALLBACK` | `bool` | `false` | Whether to fall back to saving the node status on the workflow object as normal when offloading fails. |
| `HYDRATION_FAILED_RETRY_DURATION` | `time.Duration` | `900s` | How long to wait before marking the workflow as `Error` when `OFFLOAD_NODE_STATUS_ERROR_FALLBACK` is enabled. |
| `ARCHIVED_WORKFLOW_GC_PERIOD` | `time.Duration` | `24h` | The periodicity for GC of archived workflows. |
| `ARGO_PPROF` | `bool` | `false` | Enable [`pprof`](https://go.dev/blog/pprof) endpoints |
| `ARGO_PROGRESS_PATCH_TICK_DURATION` | `time.Duration` | `1m` | How often self-reported progress is patched into the pod annotations, which determines how long it takes for the controller to pick up a progress change. Set to 0 to disable self-reporting progress. |
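For context, here is a minimal sketch of how a controller process might consume the two new variables. The call sites below are hypothetical (the controller-side wiring is not part of this commit); only the `util/env` helper is taken from this repository.

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/argoproj/argo-workflows/v3/util/env"
)

func main() {
	// Hypothetical wiring: read the fallback switch and the retry window.
	fallback := os.Getenv("OFFLOAD_NODE_STATUS_ERROR_FALLBACK") == "true"
	retry := env.LookupEnvDurationOr("HYDRATION_FAILED_RETRY_DURATION", 900*time.Second)
	fmt.Printf("fallback=%v retryWindow=%s\n", fallback, retry)
}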
2 changes: 1 addition & 1 deletion persist/sqldb/explosive_offload_node_status_repo.go
@@ -33,6 +33,6 @@ func (n *explosiveOffloadNodeStatusRepo) Delete(string, string) error {
return OffloadNotSupportedError
}

func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) (map[string][]string, error) {
func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) (map[string][]NodesRecord, error) {
return nil, OffloadNotSupportedError
}
9 changes: 9 additions & 0 deletions persist/sqldb/migrate.go
@@ -258,6 +258,15 @@ func (m migrate) Exec(ctx context.Context) (err error) {
// add indexes for list archived workflow performance. #8836
ansiSQLChange(`create index argo_archived_workflows_i4 on argo_archived_workflows (startedat)`),
ansiSQLChange(`create index argo_archived_workflows_labels_i1 on argo_archived_workflows_labels (name,value)`),
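// MySQL caps TEXT at 64 KiB, so the compressed payload needs LONGTEXT there; PostgreSQL's TEXT is unbounded.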
ternary(dbType == MySQL,
ansiSQLChange(`alter table `+m.tableName+` add column compressednodes longtext`),
ansiSQLChange(`alter table `+m.tableName+` add column compressednodes text`),
),
ansiSQLChange(`update ` + m.tableName + ` set compressednodes = '' where compressednodes is null`),
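// The auto-increment id gives Save a stable key for bulk-deleting pruned versions.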
ternary(dbType == MySQL,
ansiSQLChange(`alter table `+m.tableName+` add column id bigint auto_increment, add constraint id unique (id)`),
ansiSQLChange(`alter table `+m.tableName+` add column id bigserial, add constraint id unique (id)`),
),
} {
err := m.applyChange(changeSchemaVersion, change)
if err != nil {
8 changes: 4 additions & 4 deletions persist/sqldb/mocks/OffloadNodeStatusRepo.go

Generated mock file; diff not rendered.

122 changes: 98 additions & 24 deletions persist/sqldb/offload_node_status_repo.go
@@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"hash/fnv"
"os"
"strconv"
"strings"
"time"

@@ -12,9 +14,17 @@

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util/env"
"github.com/argoproj/argo-workflows/v3/util/file"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

const OffloadNodeStatusDisabled = "Workflow has offloaded nodes, but offloading has been disabled"
const (
OffloadNodeStatusDisabled = "Workflow has offloaded nodes, but offloading has been disabled"
tooLarge = "workflow is longer than maximum allowed size for sqldb."

envVarOffloadMaxSize = "OFFLOAD_NODE_STATUS_MAX_SIZE"
envVarCompressNodeStatus = "OFFLOAD_NODE_STATUS_COMPRESSED"
)

type UUIDVersion struct {
UID string `db:"uid"`
@@ -25,24 +35,44 @@ type OffloadNodeStatusRepo interface {
Save(uid, namespace string, nodes wfv1.Nodes) (string, error)
Get(uid, version string) (wfv1.Nodes, error)
List(namespace string) (map[UUIDVersion]wfv1.Nodes, error)
ListOldOffloads(namespace string) (map[string][]string, error)
ListOldOffloads(namespace string) (map[string][]NodesRecord, error)
Delete(uid, version string) error
IsEnabled() bool
}

func IsTooLargeError(err error) bool {
return err != nil && strings.HasPrefix(err.Error(), tooLarge)
}

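// getMaxWorkflowSize returns the offload size cap in bytes; OFFLOAD_NODE_STATUS_MAX_SIZE overrides the 3 MiB default.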
func getMaxWorkflowSize() int {
s, _ := strconv.Atoi(os.Getenv(envVarOffloadMaxSize))
if s == 0 {
s = 3 * 1024 * 1024
}
return s
}

func NewOffloadNodeStatusRepo(session db.Session, clusterName, tableName string) (OffloadNodeStatusRepo, error) {
// this environment variable allows you to make Argo Workflows delete offloaded data more or less aggressively,
// useful for testing
ttl := env.LookupEnvDurationOr("OFFLOAD_NODE_STATUS_TTL", 5*time.Minute)
log.WithField("ttl", ttl).Debug("Node status offloading config")
return &nodeOffloadRepo{session: session, clusterName: clusterName, tableName: tableName, ttl: ttl}, nil
ttl := env.LookupEnvDurationOr(common.EnvVarOffloadNodeStatusTTL, 5*time.Minute)
historyCount := env.LookupEnvIntOr(common.EnvVarOffloadNodeStatusHistoryCount, 12)
log.WithField("ttl", ttl).WithField("historyCount", historyCount).Debug("Node status offloading config")
return &nodeOffloadRepo{session: session, clusterName: clusterName, tableName: tableName, ttl: ttl, historyCount: historyCount}, nil
}

type nodesRecord struct {
ClusterName string `db:"clustername"`
UUIDVersion
Namespace string `db:"namespace"`
Nodes string `db:"nodes"`
Namespace string `db:"namespace"`
Nodes string `db:"nodes"`
CompressedNodes string `db:"compressednodes"`
}

type NodesRecord struct {
UUIDVersion
Id int64 `db:"id"`
UpdatedAt time.Time `db:"updatedat"`
}

type nodeOffloadRepo struct {
@@ -51,6 +81,8 @@ type nodeOffloadRepo struct {
tableName string
// time to live - at what ttl an offload becomes old
ttl time.Duration
// the number of old versions to retain; related to the retry count of `reapplyUpdate`
historyCount int
}

func (wdc *nodeOffloadRepo) IsEnabled() bool {
@@ -73,15 +105,26 @@ func (wdc *nodeOffloadRepo) Save(uid, namespace string, nodes wfv1.Nodes) (string, error) {
if err != nil {
return "", err
}

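// Unless OFFLOAD_NODE_STATUS_COMPRESSED is explicitly "false", persist only the compressed form and store the placeholder JSON "null" in the legacy nodes column.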
raw := marshalled
size := len(raw)
compressed := ""
if os.Getenv(envVarCompressNodeStatus) != "false" {
raw = "null"
compressed = file.CompressEncodeString(marshalled)
size = len(compressed)
}
if size > getMaxWorkflowSize() {
return "", fmt.Errorf("%s compressed size %d > maxSize %d", tooLarge, size, getMaxWorkflowSize())
}
record := &nodesRecord{
ClusterName: wdc.clusterName,
UUIDVersion: UUIDVersion{
UID: uid,
Version: version,
},
Namespace: namespace,
Nodes: marshalled,
Namespace: namespace,
Nodes: raw,
CompressedNodes: compressed,
}

logCtx := log.WithFields(log.Fields{"uid": uid, "version": version})
@@ -101,21 +144,38 @@ func (wdc *nodeOffloadRepo) Save(uid, namespace string, nodes wfv1.Nodes) (string, error) {
// This might fail, which is kind of fine (maybe a bug).
// It might not delete all records, which is also fine, as we always key on resource version.
// We also want to keep enough around so that we can service watches.
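// Pruning strategy: keep the newest historyCount versions (ordered by updatedat) and delete the rest by id.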
rs, err := wdc.session.SQL().
DeleteFrom(wdc.tableName).
var records []NodesRecord
err = wdc.session.SQL().
Select("id").
From(wdc.tableName).
Where(db.Cond{"clustername": wdc.clusterName}).
And(db.Cond{"uid": uid}).
And(db.Cond{"version <>": version}).
And(wdc.oldOffload()).
Exec()
OrderBy("updatedat desc").
Offset(wdc.historyCount).
All(&records)
if err != nil {
return "", err
}
rowsAffected, err := rs.RowsAffected()
if err != nil {
return "", err
if len(records) > 0 {
var ids []int64
for _, r := range records {
ids = append(ids, r.Id)
}
rs, err := wdc.session.SQL().
DeleteFrom(wdc.tableName).
Where(db.Cond{"id in": ids}).
Exec()
if err != nil {
return "", err
}
rowsAffected, err := rs.RowsAffected()
if err != nil {
return "", err
}
logCtx.WithField("rowsAffected", rowsAffected).Debug("Deleted offloaded nodes")
}
logCtx.WithField("rowsAffected", rowsAffected).Debug("Deleted offloaded nodes")
return version, nil
}

@@ -143,8 +203,15 @@ func (wdc *nodeOffloadRepo) Get(uid, version string) (wfv1.Nodes, error) {
if err != nil {
return nil, err
}
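// Prefer the compressed column when populated; otherwise fall back to the legacy plain-JSON nodes column.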
dbNodes := r.Nodes
if r.CompressedNodes != "" {
dbNodes, err = file.DecodeDecompressString(r.CompressedNodes)
if err != nil {
return nil, err
}
}
nodes := &wfv1.Nodes{}
err = json.Unmarshal([]byte(r.Nodes), nodes)
err = json.Unmarshal([]byte(dbNodes), nodes)
if err != nil {
return nil, err
}
@@ -166,8 +233,15 @@ func (wdc *nodeOffloadRepo) List(namespace string) (map[UUIDVersion]wfv1.Nodes, error) {

res := make(map[UUIDVersion]wfv1.Nodes)
for _, r := range records {
dbNodes := r.Nodes
if r.CompressedNodes != "" {
dbNodes, err = file.DecodeDecompressString(r.CompressedNodes)
if err != nil {
return nil, err
}
}
nodes := &wfv1.Nodes{}
err = json.Unmarshal([]byte(r.Nodes), nodes)
err = json.Unmarshal([]byte(dbNodes), nodes)
if err != nil {
return nil, err
}
@@ -177,11 +251,11 @@ func (wdc *nodeOffloadRepo) List(namespace string) (map[UUIDVersion]wfv1.Nodes, error) {
return res, nil
}

func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]string, error) {
func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]NodesRecord, error) {
log.WithFields(log.Fields{"namespace": namespace}).Debug("Listing old offloaded nodes")
var records []UUIDVersion
var records []NodesRecord
err := wdc.session.SQL().
Select("uid", "version").
Select("uid", "version", "updatedat").
From(wdc.tableName).
Where(db.Cond{"clustername": wdc.clusterName}).
And(namespaceEqual(namespace)).
@@ -190,9 +264,9 @@ func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]string, error) {
if err != nil {
return nil, err
}
x := make(map[string][]string)
x := make(map[string][]NodesRecord)
for _, r := range records {
x[r.UID] = append(x[r.UID], r.Version)
x[r.UID] = append(x[r.UID], r)
}
return x, nil
}
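To make the new behaviour concrete, here is a hypothetical round trip through the repository API. It is a sketch only: the db session setup is elided, and the uid, namespace, and cluster/table names are illustrative, not taken from this commit.

// Assumes: a configured upper/db session and this repo's sqldb package.
repo, err := sqldb.NewOffloadNodeStatusRepo(session, "default", "argo_workflows")
if err != nil {
	log.Fatal(err)
}

// Save compresses the node status (unless OFFLOAD_NODE_STATUS_COMPRESSED=false),
// rejects payloads over the configured max size, and prunes versions beyond the
// configured history count for this uid.
version, err := repo.Save("9f1c-example-uid", "argo", nodes)
if sqldb.IsTooLargeError(err) {
	// With OFFLOAD_NODE_STATUS_ERROR_FALLBACK=true, a caller could fall back to
	// keeping the node status on the workflow object itself (controller-side
	// behaviour, not shown in this diff).
}

// Get transparently decompresses when the compressednodes column is populated.
nodes, err = repo.Get("9f1c-example-uid", version)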
Diffs for the remaining changed files are not shown.
