Skip to content

Commit

Permalink
fix: Add user info to suspended node when resuming
Browse files Browse the repository at this point in the history
Signed-off-by: toyamagu-2021 <[email protected]>
  • Loading branch information
toyamagu-2021 committed Sep 6, 2023
1 parent 94fbb3b commit 6791c6a
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 21 deletions.
2 changes: 1 addition & 1 deletion test/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1705,7 +1705,7 @@ spec:
}
nodeStatus = status.Nodes.FindByDisplayName("approve")
if assert.NotNil(t, nodeStatus) {
assert.Equal(t, "Test message", nodeStatus.Message)
assert.Equal(t, "Test message; Resumed by: map[User:system:serviceaccount:argo:argo-server]", nodeStatus.Message)
}
})
}
Expand Down
18 changes: 18 additions & 0 deletions workflow/creator/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,21 @@ func dnsFriendly(s string) string {
value = regexp.MustCompile("[^a-z0-9A-Z]*$").ReplaceAllString(value, "")
return value
}

func UserInfoMap(ctx context.Context) map[string]string {
claims := auth.GetClaims(ctx)
if claims == nil {
return nil
}
res := map[string]string{}
if claims.Subject != "" {
res["User"] = claims.Subject
}
if claims.Email != "" {
res["Email"] = claims.Email
}
if claims.PreferredUsername != "" {
res["PreferredUsername"] = claims.PreferredUsername
}
return res
}
17 changes: 17 additions & 0 deletions workflow/creator/creator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,20 @@ func TestLabel(t *testing.T) {
}
})
}

func TestUserInfoMap(t *testing.T) {
t.Run("NotEmpty", func(t *testing.T) {
ctx := context.WithValue(context.TODO(), auth.ClaimsKey,
&types.Claims{Claims: jwt.Claims{Subject: strings.Repeat("x", 63) + "y"}, Email: "my@email", PreferredUsername: "username"})
uim := UserInfoMap(ctx)
assert.Equal(t, map[string]string{
"User": strings.Repeat("x", 63) + "y",
"Email": "my@email",
"PreferredUsername": "username",
}, uim)
})
t.Run("Empty", func(t *testing.T) {
uim := UserInfoMap(context.TODO())
assert.Nil(t, uim)
})
}
11 changes: 10 additions & 1 deletion workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,13 @@ func SuspendWorkflow(ctx context.Context, wfIf v1alpha1.WorkflowInterface, workf
// ResumeWorkflow resumes a workflow by setting spec.suspend to nil and any suspended nodes to Successful.
// Retries conflict errors
func ResumeWorkflow(ctx context.Context, wfIf v1alpha1.WorkflowInterface, hydrator hydrator.Interface, workflowName string, nodeFieldSelector string) error {
uiMsg := ""
uim := creator.UserInfoMap(ctx)
if uim != nil {
uiMsg = fmt.Sprintf("Resumed by: %v", uim)
}
if len(nodeFieldSelector) > 0 {
return updateSuspendedNode(ctx, wfIf, hydrator, workflowName, nodeFieldSelector, SetOperationValues{Phase: wfv1.NodeSucceeded})
return updateSuspendedNode(ctx, wfIf, hydrator, workflowName, nodeFieldSelector, SetOperationValues{Phase: wfv1.NodeSucceeded, Message: uiMsg})
} else {
err := waitutil.Backoff(retry.DefaultRetry, func() (bool, error) {
wf, err := wfIf.Get(ctx, workflowName, metav1.GetOptions{})
Expand Down Expand Up @@ -415,6 +420,10 @@ func ResumeWorkflow(ctx context.Context, wfIf v1alpha1.WorkflowInterface, hydrat
}
}
node.Phase = wfv1.NodeSucceeded
if node.Message != "" {
uiMsg = node.Message + "; " + uiMsg
}
node.Message = uiMsg
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
wf.Status.Nodes.Set(nodeID, node)
workflowUpdated = true
Expand Down
75 changes: 56 additions & 19 deletions workflow/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package util

import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"

Expand All @@ -22,6 +24,7 @@ import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
argofake "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/creator"
hydratorfake "github.com/argoproj/argo-workflows/v3/workflow/hydrator/fake"
)

Expand Down Expand Up @@ -266,30 +269,64 @@ status:
`

func TestResumeWorkflowByNodeName(t *testing.T) {
wfIf := argofake.NewSimpleClientset().ArgoprojV1alpha1().Workflows("")
origWf := wfv1.MustUnmarshalWorkflow(suspendedWf)
t.Run("Withought user info", func(t *testing.T) {
wfIf := argofake.NewSimpleClientset().ArgoprojV1alpha1().Workflows("")
origWf := wfv1.MustUnmarshalWorkflow(suspendedWf)

ctx := context.Background()
_, err := wfIf.Create(ctx, origWf, metav1.CreateOptions{})
assert.NoError(t, err)
ctx := context.Background()
_, err := wfIf.Create(ctx, origWf, metav1.CreateOptions{})
assert.NoError(t, err)

// will return error as displayName does not match any nodes
err = ResumeWorkflow(ctx, wfIf, hydratorfake.Noop, "suspend", "displayName=nonexistant")
assert.Error(t, err)
// will return error as displayName does not match any nodes
err = ResumeWorkflow(ctx, wfIf, hydratorfake.Noop, "suspend", "displayName=nonexistant")
assert.Error(t, err)

// displayName didn't match suspend node so should still be running
wf, err := wfIf.Get(ctx, "suspend", metav1.GetOptions{})
assert.NoError(t, err)
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes.FindByDisplayName("approve").Phase)
// displayName didn't match suspend node so should still be running
wf, err := wfIf.Get(ctx, "suspend", metav1.GetOptions{})
assert.NoError(t, err)
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes.FindByDisplayName("approve").Phase)

err = ResumeWorkflow(ctx, wfIf, hydratorfake.Noop, "suspend", "displayName=approve")
assert.NoError(t, err)
err = ResumeWorkflow(ctx, wfIf, hydratorfake.Noop, "suspend", "displayName=approve")
assert.NoError(t, err)

// displayName matched node so has succeeded
wf, err = wfIf.Get(ctx, "suspend", metav1.GetOptions{})
if assert.NoError(t, err) {
assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes.FindByDisplayName("approve").Phase)
}
// displayName matched node so has succeeded
wf, err = wfIf.Get(ctx, "suspend", metav1.GetOptions{})
if assert.NoError(t, err) {
assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes.FindByDisplayName("approve").Phase)
assert.Equal(t, "", wf.Status.Nodes.FindByDisplayName("approve").Message)
}
})

t.Run("With user info", func(t *testing.T) {
wfIf := argofake.NewSimpleClientset().ArgoprojV1alpha1().Workflows("")
origWf := wfv1.MustUnmarshalWorkflow(suspendedWf)

ctx := context.WithValue(context.TODO(), auth.ClaimsKey,
&types.Claims{Claims: jwt.Claims{Subject: strings.Repeat("x", 63) + "y"}, Email: "my@email", PreferredUsername: "username"})
uim := creator.UserInfoMap(ctx)

_, err := wfIf.Create(ctx, origWf, metav1.CreateOptions{})
assert.NoError(t, err)

// will return error as displayName does not match any nodes
err = ResumeWorkflow(ctx, wfIf, hydratorfake.Noop, "suspend", "displayName=nonexistant")
assert.Error(t, err)

// displayName didn't match suspend node so should still be running
wf, err := wfIf.Get(ctx, "suspend", metav1.GetOptions{})
assert.NoError(t, err)
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes.FindByDisplayName("approve").Phase)

err = ResumeWorkflow(ctx, wfIf, hydratorfake.Noop, "suspend", "displayName=approve")
assert.NoError(t, err)

// displayName matched node so has succeeded
wf, err = wfIf.Get(ctx, "suspend", metav1.GetOptions{})
if assert.NoError(t, err) {
assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes.FindByDisplayName("approve").Phase)
assert.Equal(t, fmt.Sprintf("Resumed by: %v", uim), wf.Status.Nodes.FindByDisplayName("approve").Message)
}
})
}

func TestStopWorkflowByNodeName(t *testing.T) {
Expand Down

0 comments on commit 6791c6a

Please sign in to comment.