Skip to content

Commit

Permalink
fix: optimise offload nodes clean-up logic
Browse files Browse the repository at this point in the history
Signed-off-by: 刘达 <[email protected]>
  • Loading branch information
刘达 committed Jul 7, 2024
1 parent 5cdf7d7 commit b5c8645
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 56 deletions.
4 changes: 2 additions & 2 deletions persist/sqldb/explosive_offload_node_status_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (n *explosiveOffloadNodeStatusRepo) IsEnabled() bool {
return false
}

func (n *explosiveOffloadNodeStatusRepo) Save(string, string, wfv1.Nodes) (string, error) {
func (n *explosiveOffloadNodeStatusRepo) Save(string, string, int64, wfv1.Nodes) (string, error) {
return "", OffloadNotSupportedError
}

Expand All @@ -33,6 +33,6 @@ func (n *explosiveOffloadNodeStatusRepo) Delete(string, string) error {
return OffloadNotSupportedError
}

func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) (map[string][]string, error) {
func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) (map[string][]NodesRecord, error) {
return nil, OffloadNotSupportedError
}
2 changes: 2 additions & 0 deletions persist/sqldb/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ func (m migrate) Exec(ctx context.Context) (err error) {
ansiSQLChange(`alter table argo_archived_workflows add column compressednodes text`),
),
ansiSQLChange(`update argo_archived_workflows set compressednodes = '' where compressednodes is null`),
ansiSQLChange(`alter table ` + m.tableName + ` add column generation bigint`),
ansiSQLChange(`update ` + m.tableName + ` set generation = 0 where generation is null`),
} {
err := m.applyChange(changeSchemaVersion, change)
if err != nil {
Expand Down
20 changes: 10 additions & 10 deletions persist/sqldb/mocks/OffloadNodeStatusRepo.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 15 additions & 33 deletions persist/sqldb/offload_node_status_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ type UUIDVersion struct {
}

type OffloadNodeStatusRepo interface {
Save(uid, namespace string, nodes wfv1.Nodes) (string, error)
Save(uid, namespace string, generation int64, nodes wfv1.Nodes) (string, error)
Get(uid, version string) (wfv1.Nodes, error)
List(namespace string) (map[UUIDVersion]wfv1.Nodes, error)
ListOldOffloads(namespace string) (map[string][]string, error)
ListOldOffloads(namespace string) (map[string][]NodesRecord, error)
Delete(uid, version string) error
IsEnabled() bool
}
Expand All @@ -39,12 +39,13 @@ func NewOffloadNodeStatusRepo(session db.Session, clusterName, tableName string)
return &nodeOffloadRepo{session: session, clusterName: clusterName, tableName: tableName, ttl: ttl}, nil
}

type nodesRecord struct {
type NodesRecord struct {
ClusterName string `db:"clustername"`
UUIDVersion
Namespace string `db:"namespace"`
Nodes string `db:"nodes"`
CompressedNodes string `db:"compressednodes"`
Generation int64 `db:"generation"`
}

type nodeOffloadRepo struct {
Expand All @@ -70,14 +71,14 @@ func nodeStatusVersion(s wfv1.Nodes) (string, string, error) {
return string(marshalled), fmt.Sprintf("fnv:%v", h.Sum32()), nil
}

func (wdc *nodeOffloadRepo) Save(uid, namespace string, nodes wfv1.Nodes) (string, error) {
func (wdc *nodeOffloadRepo) Save(uid, namespace string, generation int64, nodes wfv1.Nodes) (string, error) {
marshalled, version, err := nodeStatusVersion(nodes)
if err != nil {
return "", err
}
compressed := file.CompressEncodeString(marshalled)

record := &nodesRecord{
record := &NodesRecord{
ClusterName: wdc.clusterName,
UUIDVersion: UUIDVersion{
UID: uid,
Expand All @@ -86,6 +87,7 @@ func (wdc *nodeOffloadRepo) Save(uid, namespace string, nodes wfv1.Nodes) (strin
Namespace: namespace,
Nodes: "null",
CompressedNodes: compressed,
Generation: generation,
}

logCtx := log.WithFields(log.Fields{"uid": uid, "version": version})
Expand All @@ -99,27 +101,7 @@ func (wdc *nodeOffloadRepo) Save(uid, namespace string, nodes wfv1.Nodes) (strin
}
logCtx.WithField("err", err).Info("Ignoring duplicate key error")
}

logCtx.Debug("Nodes offloaded, cleaning up old offloads")

// This might fail, which kind of fine (maybe a bug).
// It might not delete all records, which is also fine, as we always key on resource version.
// We also want to keep enough around so that we can service watches.
rs, err := wdc.session.SQL().
DeleteFrom(wdc.tableName).
Where(db.Cond{"clustername": wdc.clusterName}).
And(db.Cond{"uid": uid}).
And(db.Cond{"version <>": version}).
And(wdc.oldOffload()).
Exec()
if err != nil {
return "", err
}
rowsAffected, err := rs.RowsAffected()
if err != nil {
return "", err
}
logCtx.WithField("rowsAffected", rowsAffected).Debug("Deleted offloaded nodes")
// old versions will be cleaned-up by periodic gc
return version, nil
}

Expand All @@ -137,7 +119,7 @@ func isDuplicateKeyError(err error) bool {

func (wdc *nodeOffloadRepo) Get(uid, version string) (wfv1.Nodes, error) {
log.WithFields(log.Fields{"uid": uid, "version": version}).Debug("Getting offloaded nodes")
r := &nodesRecord{}
r := &NodesRecord{}
err := wdc.session.SQL().
SelectFrom(wdc.tableName).
Where(db.Cond{"clustername": wdc.clusterName}).
Expand All @@ -164,7 +146,7 @@ func (wdc *nodeOffloadRepo) Get(uid, version string) (wfv1.Nodes, error) {

func (wdc *nodeOffloadRepo) List(namespace string) (map[UUIDVersion]wfv1.Nodes, error) {
log.WithFields(log.Fields{"namespace": namespace}).Debug("Listing offloaded nodes")
var records []nodesRecord
var records []NodesRecord
err := wdc.session.SQL().
Select("uid", "version", "nodes").
From(wdc.tableName).
Expand Down Expand Up @@ -195,11 +177,11 @@ func (wdc *nodeOffloadRepo) List(namespace string) (map[UUIDVersion]wfv1.Nodes,
return res, nil
}

func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]string, error) {
func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]NodesRecord, error) {
log.WithFields(log.Fields{"namespace": namespace}).Debug("Listing old offloaded nodes")
var records []UUIDVersion
var records []NodesRecord
err := wdc.session.SQL().
Select("uid", "version").
Select("uid", "version", "generation").
From(wdc.tableName).
Where(db.Cond{"clustername": wdc.clusterName}).
And(namespaceEqual(namespace)).
Expand All @@ -208,9 +190,9 @@ func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]stri
if err != nil {
return nil, err
}
x := make(map[string][]string)
x := make(map[string][]NodesRecord)
for _, r := range records {
x[r.UID] = append(x[r.UID], r.Version)
x[r.UID] = append(x[r.UID], r)
}
return x, nil
}
Expand Down
2 changes: 1 addition & 1 deletion server/workflowarchive/archived_workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
_, err = w.offloadNodeStatusRepo.Save(string(result.UID), wf.Namespace, offloadedNodes)
_, err = w.offloadNodeStatusRepo.Save(string(result.UID), wf.Namespace, result.Generation, offloadedNodes)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
Expand Down
9 changes: 5 additions & 4 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ func (wfc *WorkflowController) workflowGarbageCollector(stopCh <-chan struct{})
}
}

func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versions []string) error {
func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versions []sqldb.NodesRecord) error {
workflows, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.UIDIndex, uid)
if err != nil {
return err
Expand Down Expand Up @@ -736,11 +736,12 @@ func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versi
return fmt.Errorf("expected no more than 1 workflow, got %d", l)
}
for _, version := range versions {
// skip delete if offload is live
if wf != nil && wf.Status.OffloadNodeStatusVersion == version {
// skip delete if offload is live or generation is greater or equal generation in cache.
// Generation will be increased every update, see k8s.io/apiextensions-apiserver/pkg/registry/customresource/strategy.go
if wf != nil && (wf.Status.OffloadNodeStatusVersion == version.Version || version.Generation >= wf.Generation) {
continue
}
if err := wfc.offloadNodeStatusRepo.Delete(uid, version); err != nil {
if err := wfc.offloadNodeStatusRepo.Delete(uid, version.Version); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator_persist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

func getMockDBCtx(expectedError error, largeWfSupport bool) (*mocks.OffloadNodeStatusRepo, hydrator.Interface) {
mockDBRepo := &mocks.OffloadNodeStatusRepo{}
mockDBRepo.On("Save", mock.Anything, mock.Anything, mock.Anything).Return("my-offloaded-version", expectedError)
mockDBRepo.On("Save", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("my-offloaded-version", expectedError)
mockDBRepo.On("Get", mock.Anything, mock.Anything).Return(wfv1.Nodes{"my-node": wfv1.NodeStatus{}}, nil)
mockDBRepo.On("IsEnabled").Return(largeWfSupport)
return mockDBRepo, hydrator.New(mockDBRepo)
Expand Down
2 changes: 1 addition & 1 deletion workflow/hydrator/hydrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (h hydrator) Dehydrate(wf *wfv1.Workflow) error {
}
offloadErr := waitutil.Backoff(writeRetry, func() (bool, error) {
var offloadErr error
offloadVersion, offloadErr = h.offloadNodeStatusRepo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes)
offloadVersion, offloadErr = h.offloadNodeStatusRepo.Save(string(wf.UID), wf.Namespace, wf.Generation, wf.Status.Nodes)
return !errorsutil.IsTransientErr(offloadErr), offloadErr
})
if offloadErr != nil {
Expand Down
8 changes: 4 additions & 4 deletions workflow/hydrator/hydrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ func TestHydrator(t *testing.T) {
})
t.Run("Offload", func(t *testing.T) {
offloadNodeStatusRepo := &sqldbmocks.OffloadNodeStatusRepo{}
offloadNodeStatusRepo.On("Save", "my-uid", "my-ns", mock.Anything).Return("my-offload-version", nil)
offloadNodeStatusRepo.On("Save", "my-uid", "my-ns", int64(1), mock.Anything).Return("my-offload-version", nil)
hydrator := New(offloadNodeStatusRepo)
wf := &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{UID: "my-uid", Namespace: "my-ns"},
ObjectMeta: metav1.ObjectMeta{UID: "my-uid", Namespace: "my-ns", Generation: 1},
Spec: wfv1.WorkflowSpec{Entrypoint: "main"},
Status: wfv1.WorkflowStatus{Nodes: wfv1.Nodes{"foo": wfv1.NodeStatus{}, "bar": wfv1.NodeStatus{}, "baz": wfv1.NodeStatus{}, "qux": wfv1.NodeStatus{}}},
}
Expand All @@ -72,10 +72,10 @@ func TestHydrator(t *testing.T) {
})
t.Run("WorkflowTooLargeButOffloadNotSupported", func(t *testing.T) {
offloadNodeStatusRepo := &sqldbmocks.OffloadNodeStatusRepo{}
offloadNodeStatusRepo.On("Save", "my-uid", "my-ns", mock.Anything).Return("my-offload-version", sqldb.OffloadNotSupportedError)
offloadNodeStatusRepo.On("Save", "my-uid", "my-ns", int64(1), mock.Anything).Return("my-offload-version", sqldb.OffloadNotSupportedError)
hydrator := New(offloadNodeStatusRepo)
wf := &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{UID: "my-uid", Namespace: "my-ns"},
ObjectMeta: metav1.ObjectMeta{UID: "my-uid", Namespace: "my-ns", Generation: 1},
Spec: wfv1.WorkflowSpec{Entrypoint: "main"},
Status: wfv1.WorkflowStatus{Nodes: wfv1.Nodes{"foo": wfv1.NodeStatus{}, "bar": wfv1.NodeStatus{}, "baz": wfv1.NodeStatus{}, "qux": wfv1.NodeStatus{}}},
}
Expand Down

0 comments on commit b5c8645

Please sign in to comment.