Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: compress node status when offloading to database. Fixes #13290 #13313

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 49 additions & 46 deletions docs/environment-variables.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/go-uuid v1.0.3
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/huandu/xstrings v1.3.3 // indirect
github.com/imdario/mergo v0.3.15 // indirect
Expand Down
2 changes: 1 addition & 1 deletion persist/sqldb/explosive_offload_node_status_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ func (n *explosiveOffloadNodeStatusRepo) Delete(string, string) error {
return OffloadNotSupportedError
}

func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) (map[string][]string, error) {
// ListOldOffloads always returns OffloadNotSupportedError: this repo is the
// stand-in used when node status offloading is disabled, so every operation fails.
func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) (map[string][]NodesRecord, error) {
	return nil, OffloadNotSupportedError
}
9 changes: 9 additions & 0 deletions persist/sqldb/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,15 @@ func (m migrate) Exec(ctx context.Context) (err error) {
// add indexes for list archived workflow performance. #8836
ansiSQLChange(`create index argo_archived_workflows_i4 on argo_archived_workflows (startedat)`),
ansiSQLChange(`create index argo_archived_workflows_labels_i1 on argo_archived_workflows_labels (name,value)`),
ternary(dbType == MySQL,
ansiSQLChange(`alter table `+m.tableName+` add column compressednodes longtext`),
ansiSQLChange(`alter table `+m.tableName+` add column compressednodes text`),
),
ansiSQLChange(`update ` + m.tableName + ` set compressednodes = '' where compressednodes is null`),
ternary(dbType == MySQL,
ansiSQLChange(`alter table `+m.tableName+` add column id bigint auto_increment, add constraint id unique (id)`),
ansiSQLChange(`alter table `+m.tableName+` add column id bigserial, add constraint id unique (id)`),
),
} {
err := m.applyChange(changeSchemaVersion, change)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions persist/sqldb/mocks/OffloadNodeStatusRepo.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

122 changes: 98 additions & 24 deletions persist/sqldb/offload_node_status_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"hash/fnv"
"os"
"strconv"
"strings"
"time"

Expand All @@ -12,9 +14,17 @@ import (

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util/env"
"github.com/argoproj/argo-workflows/v3/util/file"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

const OffloadNodeStatusDisabled = "Workflow has offloaded nodes, but offloading has been disabled"
const (
	// OffloadNodeStatusDisabled is the user-facing message returned when a
	// workflow has offloaded node status but offloading has been disabled.
	OffloadNodeStatusDisabled = "Workflow has offloaded nodes, but offloading has been disabled"
	// tooLarge prefixes errors produced when the (possibly compressed) node
	// status exceeds the maximum allowed size; matched by IsTooLargeError.
	tooLarge = "workflow is longer than maximum allowed size for sqldb."

	// envVarOffloadMaxSize overrides the maximum offload size in bytes
	// (default 3 MiB; see getMaxWorkflowSize).
	envVarOffloadMaxSize = "OFFLOAD_NODE_STATUS_MAX_SIZE"
	// envVarCompressNodeStatus disables compression when set to "false";
	// any other value (including unset) leaves compression on.
	envVarCompressNodeStatus = "OFFLOAD_NODE_STATUS_COMPRESSED"
)

type UUIDVersion struct {
UID string `db:"uid"`
Expand All @@ -25,24 +35,44 @@ type OffloadNodeStatusRepo interface {
Save(uid, namespace string, nodes wfv1.Nodes) (string, error)
Get(uid, version string) (wfv1.Nodes, error)
List(namespace string) (map[UUIDVersion]wfv1.Nodes, error)
ListOldOffloads(namespace string) (map[string][]string, error)
ListOldOffloads(namespace string) (map[string][]NodesRecord, error)
Delete(uid, version string) error
IsEnabled() bool
}

// IsTooLargeError reports whether err was produced because a workflow's
// offloaded node status exceeded the maximum allowed size for the database.
func IsTooLargeError(err error) bool {
	if err == nil {
		return false
	}
	return strings.HasPrefix(err.Error(), tooLarge)
}

// getMaxWorkflowSize returns the maximum allowed size, in bytes, of an
// offloaded (and possibly compressed) node status record.
// It reads OFFLOAD_NODE_STATUS_MAX_SIZE; unset, unparsable, or non-positive
// values all fall back to the 3 MiB default. Previously a negative value
// slipped past the `s == 0` check and caused every workflow to be rejected
// as too large.
func getMaxWorkflowSize() int {
	// strconv.Atoi returns 0 on error, so malformed values also take the
	// default below; the error is deliberately discarded.
	s, _ := strconv.Atoi(os.Getenv(envVarOffloadMaxSize))
	if s <= 0 {
		s = 3 * 1024 * 1024
	}
	return s
}

func NewOffloadNodeStatusRepo(session db.Session, clusterName, tableName string) (OffloadNodeStatusRepo, error) {
// this environment variable allows you to make Argo Workflows delete offloaded data more or less aggressively,
// useful for testing
ttl := env.LookupEnvDurationOr("OFFLOAD_NODE_STATUS_TTL", 5*time.Minute)
log.WithField("ttl", ttl).Debug("Node status offloading config")
return &nodeOffloadRepo{session: session, clusterName: clusterName, tableName: tableName, ttl: ttl}, nil
ttl := env.LookupEnvDurationOr(common.EnvVarOffloadNodeStatusTTL, 5*time.Minute)
historyCount := env.LookupEnvIntOr(common.EnvVarOffloadNodeStatusHistoryCount, 12)
log.WithField("ttl", ttl).WithField("historyCount", historyCount).Debug("Node status offloading config")
return &nodeOffloadRepo{session: session, clusterName: clusterName, tableName: tableName, ttl: ttl, historyCount: historyCount}, nil
}

type nodesRecord struct {
ClusterName string `db:"clustername"`
UUIDVersion
Namespace string `db:"namespace"`
Nodes string `db:"nodes"`
Namespace string `db:"namespace"`
Nodes string `db:"nodes"`
CompressedNodes string `db:"compressednodes"`
}

// NodesRecord identifies one offloaded node-status row. It is returned by
// ListOldOffloads so that callers can delete stale rows by primary key.
type NodesRecord struct {
	UUIDVersion
	// Id is the auto-increment primary key added by the schema migration
	// (NOTE(review): exported as `Id` rather than idiomatic `ID`; renaming
	// would break existing callers).
	Id int64 `db:"id"`
	// UpdatedAt is when the row was last written; presumably used with the
	// TTL to decide which offloads are old — confirm against oldOffload().
	UpdatedAt time.Time `db:"updatedat"`
}

type nodeOffloadRepo struct {
Expand All @@ -51,6 +81,8 @@ type nodeOffloadRepo struct {
tableName string
// time to live - at what ttl an offload becomes old
ttl time.Duration
// this number is related to retry number of `reapplyUpdate`
historyCount int
}

func (wdc *nodeOffloadRepo) IsEnabled() bool {
Expand All @@ -73,15 +105,26 @@ func (wdc *nodeOffloadRepo) Save(uid, namespace string, nodes wfv1.Nodes) (strin
if err != nil {
return "", err
}

raw := marshalled
size := len(raw)
compressed := ""
if os.Getenv(envVarCompressNodeStatus) != "false" {
raw = "null"
compressed = file.CompressEncodeString(marshalled)
size = len(compressed)
}
if size > getMaxWorkflowSize() {
return "", fmt.Errorf("%s compressed size %d > maxSize %d", tooLarge, size, getMaxWorkflowSize())
}
record := &nodesRecord{
ClusterName: wdc.clusterName,
UUIDVersion: UUIDVersion{
UID: uid,
Version: version,
},
Namespace: namespace,
Nodes: marshalled,
Namespace: namespace,
Nodes: raw,
CompressedNodes: compressed,
}

logCtx := log.WithFields(log.Fields{"uid": uid, "version": version})
Expand All @@ -101,21 +144,38 @@ func (wdc *nodeOffloadRepo) Save(uid, namespace string, nodes wfv1.Nodes) (strin
// This might fail, which kind of fine (maybe a bug).
// It might not delete all records, which is also fine, as we always key on resource version.
// We also want to keep enough around so that we can service watches.
rs, err := wdc.session.SQL().
DeleteFrom(wdc.tableName).
var records []NodesRecord
err = wdc.session.SQL().
Select("id").
From(wdc.tableName).
Where(db.Cond{"clustername": wdc.clusterName}).
And(db.Cond{"uid": uid}).
And(db.Cond{"version <>": version}).
And(wdc.oldOffload()).
Exec()
OrderBy("updatedat desc").
Offset(wdc.historyCount).
All(&records)
if err != nil {
return "", err
}
rowsAffected, err := rs.RowsAffected()
if err != nil {
return "", err
if len(records) > 0 {
var ids []int64
for _, r := range records {
ids = append(ids, r.Id)
}
rs, err := wdc.session.SQL().
DeleteFrom(wdc.tableName).
Where(db.Cond{"id in": ids}).
Exec()
if err != nil {
return "", err
}
rowsAffected, err := rs.RowsAffected()
if err != nil {
return "", err
}
logCtx.WithField("rowsAffected", rowsAffected).Debug("Deleted offloaded nodes")
}
logCtx.WithField("rowsAffected", rowsAffected).Debug("Deleted offloaded nodes")
return version, nil
}

Expand Down Expand Up @@ -143,8 +203,15 @@ func (wdc *nodeOffloadRepo) Get(uid, version string) (wfv1.Nodes, error) {
if err != nil {
return nil, err
}
dbNodes := r.Nodes
if r.CompressedNodes != "" {
dbNodes, err = file.DecodeDecompressString(r.CompressedNodes)
if err != nil {
return nil, err
}
}
nodes := &wfv1.Nodes{}
err = json.Unmarshal([]byte(r.Nodes), nodes)
err = json.Unmarshal([]byte(dbNodes), nodes)
if err != nil {
return nil, err
}
Expand All @@ -166,8 +233,15 @@ func (wdc *nodeOffloadRepo) List(namespace string) (map[UUIDVersion]wfv1.Nodes,

res := make(map[UUIDVersion]wfv1.Nodes)
for _, r := range records {
dbNodes := r.Nodes
if r.CompressedNodes != "" {
dbNodes, err = file.DecodeDecompressString(r.CompressedNodes)
if err != nil {
return nil, err
}
}
nodes := &wfv1.Nodes{}
err = json.Unmarshal([]byte(r.Nodes), nodes)
err = json.Unmarshal([]byte(dbNodes), nodes)
if err != nil {
return nil, err
}
Expand All @@ -177,11 +251,11 @@ func (wdc *nodeOffloadRepo) List(namespace string) (map[UUIDVersion]wfv1.Nodes,
return res, nil
}

func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]string, error) {
func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]NodesRecord, error) {
log.WithFields(log.Fields{"namespace": namespace}).Debug("Listing old offloaded nodes")
var records []UUIDVersion
var records []NodesRecord
err := wdc.session.SQL().
Select("uid", "version").
Select("uid", "version", "updatedat").
From(wdc.tableName).
Where(db.Cond{"clustername": wdc.clusterName}).
And(namespaceEqual(namespace)).
Expand All @@ -190,9 +264,9 @@ func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]stri
if err != nil {
return nil, err
}
x := make(map[string][]string)
x := make(map[string][]NodesRecord)
for _, r := range records {
x[r.UID] = append(x[r.UID], r.Version)
x[r.UID] = append(x[r.UID], r)
}
return x, nil
}
Expand Down
Loading
Loading