Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore!: remove legacy patch pods fallback #12976

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ const (
AnnotationKeyRBACRule = workflow.WorkflowFullName + "/rbac-rule"
AnnotationKeyRBACRulePrecedence = workflow.WorkflowFullName + "/rbac-rule-precedence"

// AnnotationKeyOutputs is the pod metadata annotation key containing the container outputs
AnnotationKeyOutputs = workflow.WorkflowFullName + "/outputs"
// AnnotationKeyCronWfScheduledTime is the workflow metadata annotation key containing the time when the workflow
// was scheduled to run by CronWorkflow.
AnnotationKeyCronWfScheduledTime = workflow.WorkflowFullName + "/scheduled-time"
Expand All @@ -48,13 +46,6 @@ const (
// AnnotationKeyPodNameVersion stores the pod naming convention version
AnnotationKeyPodNameVersion = workflow.WorkflowFullName + "/pod-name-format"

// AnnotationKeyProgress is N/M progress for the node
AnnotationKeyProgress = workflow.WorkflowFullName + "/progress"

// AnnotationKeyReportOutputsCompleted is an annotation on a workflow pod indicating outputs have completed.
// Only used as a backup in case LabelKeyReportOutputsCompleted can't be added to WorkflowTaskResult.
AnnotationKeyReportOutputsCompleted = workflow.WorkflowFullName + "/report-outputs-completed"

// AnnotationKeyArtifactGCStrategy is listed as an annotation on the Artifact GC Pod to identify
// the strategy whose artifacts are being deleted
AnnotationKeyArtifactGCStrategy = workflow.WorkflowFullName + "/artifact-gc-strategy"
Expand Down
14 changes: 0 additions & 14 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,16 +444,6 @@ func listPods(woc *wfOperationCtx) (*apiv1.PodList, error) {

type with func(pod *apiv1.Pod)

func withOutputs(v interface{}) with {
switch x := v.(type) {
case string:
return withAnnotation(common.AnnotationKeyOutputs, x)
default:
return withOutputs(wfv1.MustMarshallJSON(x))
}
}
func withProgress(v string) with { return withAnnotation(common.AnnotationKeyProgress, v) }

func withExitCode(v int32) with {
return func(pod *apiv1.Pod) {
for _, c := range pod.Spec.Containers {
Expand All @@ -469,10 +459,6 @@ func withExitCode(v int32) with {
}
}

func withAnnotation(key, val string) with {
return func(pod *apiv1.Pod) { pod.Annotations[key] = val }
}

// createRunningPods creates the pods that are marked as running in a given test so that they can be accessed by the
// pod assessor
func createRunningPods(ctx context.Context, woc *wfOperationCtx) {
Expand Down
21 changes: 10 additions & 11 deletions workflow/controller/exit_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)
Expand All @@ -24,7 +23,7 @@ spec:
steps:
- - name: leafA
hooks:
exit:
exit:
template: exitContainer
arguments:
parameters:
Expand All @@ -33,7 +32,7 @@ spec:
template: whalesay
- - name: leafB
hooks:
exit:
exit:
template: exitContainer
arguments:
parameters:
Expand Down Expand Up @@ -92,7 +91,7 @@ spec:
tasks:
- name: leafA
hooks:
exit:
exit:
template: exitContainer
arguments:
parameters:
Expand All @@ -102,7 +101,7 @@ spec:
- name: leafB
dependencies: [leafA]
hooks:
exit:
exit:
template: exitContainer
arguments:
parameters:
Expand Down Expand Up @@ -160,7 +159,7 @@ spec:
steps:
- - name: leafA
hooks:
exit:
exit:
template: exitContainer
arguments:
artifacts:
Expand Down Expand Up @@ -236,7 +235,7 @@ spec:
tasks:
- name: leafA
hooks:
exit:
exit:
template: exitContainer
arguments:
artifacts:
Expand Down Expand Up @@ -314,7 +313,7 @@ spec:
template: whalesay
- - name: leafB
hooks:
exit:
exit:
template: exitContainer
arguments:
parameters:
Expand Down Expand Up @@ -356,7 +355,7 @@ func TestStepsTmplOnExit(t *testing.T) {
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withOutputs(wfv1.Outputs{Result: pointer.String("ok"), Parameters: []wfv1.Parameter{{}}}))
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc1 := newWorkflowOperationCtx(woc.wf, controller)
woc1.operate(ctx)
assert.Equal(t, wfv1.WorkflowRunning, woc1.wf.Status.Phase)
Expand Down Expand Up @@ -419,7 +418,7 @@ spec:
- name: leafB
dependencies: [leafA]
hooks:
exit:
exit:
template: exitContainer
arguments:
parameters:
Expand Down Expand Up @@ -461,7 +460,7 @@ func TestDAGOnExit(t *testing.T) {
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withOutputs(wfv1.Outputs{Parameters: []wfv1.Parameter{{}}}))
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc1 := newWorkflowOperationCtx(woc.wf, controller)
woc1.operate(ctx)
assert.Equal(t, wfv1.WorkflowRunning, woc1.wf.Status.Phase)
Expand Down
28 changes: 0 additions & 28 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1405,40 +1405,12 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
new.PodIP = pod.Status.PodIP
}

if x, ok := pod.Annotations[common.AnnotationKeyReportOutputsCompleted]; ok {
woc.log.Warn("workflow uses legacy/insecure pod patch, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/")
resultName := woc.nodeID(pod)
if x == "true" {
woc.wf.Status.MarkTaskResultComplete(resultName)
} else {
woc.wf.Status.MarkTaskResultIncomplete(resultName)
}
}

if x, ok := pod.Annotations[common.AnnotationKeyOutputs]; ok {
woc.log.Warn("workflow uses legacy/insecure pod patch, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/")
if new.Outputs == nil {
new.Outputs = &wfv1.Outputs{}
}
if err := json.Unmarshal([]byte(x), new.Outputs); err != nil {
new.Phase = wfv1.NodeError
new.Message = err.Error()
}
}

new.HostNodeName = pod.Spec.NodeName

if !new.Progress.IsValid() {
new.Progress = wfv1.ProgressDefault
}

if x, ok := pod.Annotations[common.AnnotationKeyProgress]; ok {
woc.log.Warn("workflow uses legacy/insecure pod patch, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/")
if p, ok := wfv1.ParseProgress(x); ok {
new.Progress = p
}
}

// We capture the exit-code after we look for the task-result.
// All other outputs are set by the executor, only the exit-code is set by the controller.
// By waiting, we avoid breaking the race-condition check.
Expand Down
11 changes: 5 additions & 6 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ spec:
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)

makePodsPhase(ctx, woc, apiv1.PodRunning, withProgress("50/100"))
makePodsPhase(ctx, woc, apiv1.PodRunning)
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)

Expand All @@ -301,7 +301,7 @@ spec:
pod := woc.wf.Status.Nodes.FindByDisplayName("pod")
assert.Equal(t, wfv1.Progress("50/100"), pod.Progress)

makePodsPhase(ctx, woc, apiv1.PodSucceeded, withProgress("100/100"))
makePodsPhase(ctx, woc, apiv1.PodSucceeded)
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)

Expand Down Expand Up @@ -6175,7 +6175,7 @@ func TestConfigMapCacheSaveOperate(t *testing.T) {

ctx := context.Background()
woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withExitCode(0), withOutputs(wfv1.MustMarshallJSON(sampleOutputs)))
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withExitCode(0))
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)

Expand Down Expand Up @@ -6465,7 +6465,7 @@ spec:
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)

// make all created pods as successful
makePodsPhase(ctx, woc, apiv1.PodSucceeded, withOutputs(`{"parameters": [{"name": "my-param"}]}`))
makePodsPhase(ctx, woc, apiv1.PodSucceeded)

// reconcile
woc = newWorkflowOperationCtx(woc.wf, controller)
Expand Down Expand Up @@ -8439,7 +8439,6 @@ func TestWFGlobalArtifactNil(t *testing.T) {
makePodsPhase(ctx, woc, apiv1.PodRunning)
woc.operate(ctx)
makePodsPhase(ctx, woc, apiv1.PodFailed, func(pod *apiv1.Pod) {
pod.Annotations[common.AnnotationKeyOutputs] = string("{\"parameters\":[{\"name\":\"hello-param\",\"valueFrom\":{\"path\":\"/tmp/hello_world.txt\"},\"globalName\":\"my-global-param\"}],\"artifacts\":[{\"name\":\"hello-art\",\"path\":\"/tmp/hello_world.txt\",\"globalName\":\"my-global-art\"}]}")
pod.Status.ContainerStatuses = []apiv1.ContainerStatus{
{
Name: "main",
Expand Down Expand Up @@ -10661,7 +10660,7 @@ spec:
script:
image: python:alpine3.6
command: [python]
env:
env:
- name: message
value: "{{inputs.parameters.message}}"
source: |
Expand Down
24 changes: 3 additions & 21 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,9 +799,7 @@ func (we *WorkflowExecutor) FinalizeOutput(ctx context.Context) {
common.LabelKeyReportOutputsCompleted: "true",
})
if apierr.IsForbidden(err) || apierr.IsNotFound(err) {
log.WithError(err).Warn("failed to patch task result, falling back to legacy/insecure pod patch, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/")
// Only added as a backup in case LabelKeyReportOutputsCompleted could not be set
err = we.AddAnnotation(ctx, common.AnnotationKeyReportOutputsCompleted, "true")
log.WithError(err).Warn("failed to patch task result, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/")
}
return err
})
Expand All @@ -820,9 +818,7 @@ func (we *WorkflowExecutor) InitializeOutput(ctx context.Context) {
}, errorsutil.IsTransientErr, func() error {
err := we.upsertTaskResult(ctx, wfv1.NodeResult{})
if apierr.IsForbidden(err) {
log.WithError(err).Warn("failed to patch task result, falling back to legacy/insecure pod patch, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/")
// Only added as a backup in case LabelKeyReportOutputsCompleted could not be set
err = we.AddAnnotation(ctx, common.AnnotationKeyReportOutputsCompleted, "false")
log.WithError(err).Warn("failed to patch task result, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/")
}
return err
})
Expand All @@ -848,22 +844,8 @@ func (we *WorkflowExecutor) reportResult(ctx context.Context, result wfv1.NodeRe
}, errorsutil.IsTransientErr, func() error {
err := we.upsertTaskResult(ctx, result)
if apierr.IsForbidden(err) {
log.WithError(err).Warn("failed to patch task result, falling back to legacy/insecure pod patch, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/")
if result.Outputs.HasOutputs() {
value, err := json.Marshal(result.Outputs)
if err != nil {
return err
}

return we.AddAnnotation(ctx, common.AnnotationKeyOutputs, string(value))
}
if result.Progress.IsValid() { // this may result in occasionally two patches
return we.AddAnnotation(ctx, common.AnnotationKeyProgress, string(result.Progress))
}
// Only added as a backup in case LabelKeyReportOutputsCompleted could not be set
return we.AddAnnotation(ctx, common.AnnotationKeyReportOutputsCompleted, "false")
log.WithError(err).Warn("failed to patch task result, see https://argo-workflows.readthedocs.io/en/latest/workflow-rbac/")
}

return err
})
}
Expand Down
Loading