From 344bde3e3ed50710a78c1816a56cae9e3a342356 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Thu, 28 Mar 2024 17:57:21 -0700 Subject: [PATCH 1/3] correctly set task execution phase for array node Signed-off-by: Paul Dittamo --- .../pkg/controller/nodes/array/handler.go | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 991a28c1c3..0adda16206 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -70,6 +70,11 @@ func (a *arrayNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExecut eventRecorder := newArrayEventRecorder(nCtx.EventsRecorder()) messageCollector := errorcollector.NewErrorMessageCollector() + + taskPhase := idlcore.TaskExecution_ABORTED + if arrayNodeState.Phase == v1alpha1.ArrayNodePhaseFailing { + taskPhase = idlcore.TaskExecution_FAILED + } switch arrayNodeState.Phase { case v1alpha1.ArrayNodePhaseExecuting, v1alpha1.ArrayNodePhaseFailing: for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { @@ -110,7 +115,7 @@ func (a *arrayNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExecut } // update state for subNodes - if err := eventRecorder.finalize(ctx, nCtx, idlcore.TaskExecution_ABORTED, 0, a.eventConfig); err != nil { + if err := eventRecorder.finalize(ctx, nCtx, taskPhase, 0, a.eventConfig); err != nil { logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) return err } @@ -407,6 +412,12 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu return handler.UnknownTransition, err } + // ensure task_execution set to failed + if err := eventRecorder.finalize(ctx, nCtx, idlcore.TaskExecution_FAILED, 0, a.eventConfig); err != nil { + logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) + return handler.UnknownTransition, err + } + // fail with reported error if one exists if arrayNodeState.Error != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(arrayNodeState.Error, nil)), nil @@ -505,7 +516,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } } case v1alpha1.NodeKindWorkflow: - // TODO - to support launchplans we will need to process the output interface variables here + // TODO - to support launchplans we will need to process the output interface variables here fallthrough default: logger.Warnf(ctx, "ArrayNode does not support pre-populating outputLiteral collections for node kind '%s'", arrayNode.GetSubNodeSpec().GetKind()) @@ -539,6 +550,12 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu return handler.UnknownTransition, err } + // ensure task_execution set to succeeded + if err := eventRecorder.finalize(ctx, nCtx, idlcore.TaskExecution_SUCCEEDED, 0, a.eventConfig); err != nil { + logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) + return handler.UnknownTransition, err + } + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess( &handler.ExecutionInfo{ OutputInfo: &handler.OutputInfo{ From 0e0d7c4254cf8cd920c6b881e28b6137eca05f51 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Fri, 23 Aug 2024 14:58:05 -0700 Subject: [PATCH 2/3] clean up Signed-off-by: Paul Dittamo --- .../pkg/controller/nodes/array/handler.go | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 500592627f..f45963e24c 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -120,11 +120,10 @@ func (a *arrayNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExecut // update state for subNodes if err := eventRecorder.finalize(ctx, nCtx, taskPhase, 0, a.eventConfig); err != nil { // a task event with abort phase is already emitted when handling ArrayNodePhaseFailing - if eventsErr.IsAlreadyExists(err) { - return nil + if !eventsErr.IsAlreadyExists(err) { + logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) + return err } - logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) - return err } return nil @@ -429,10 +428,12 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu return handler.UnknownTransition, err } - // ensure task_execution set to failed + // ensure task_execution set to failed - this should already be sent by the abort handler if err := eventRecorder.finalize(ctx, nCtx, idlcore.TaskExecution_FAILED, 0, a.eventConfig); err != nil { - logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) - return handler.UnknownTransition, err + if !eventsErr.IsAlreadyExists(err) { + logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) + return handler.UnknownTransition, err + } } // fail with reported error if one exists @@ -569,8 +570,10 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // ensure task_execution set to succeeded if err := eventRecorder.finalize(ctx, nCtx, idlcore.TaskExecution_SUCCEEDED, 0, a.eventConfig); err != nil { - logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) - return handler.UnknownTransition, err + if !eventsErr.IsAlreadyExists(err) { + logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) + return handler.UnknownTransition, err + } } return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess( From 8e4f4fae8d0e910110f92b6e1d029b97390efc41 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Mon, 13 Jan 2025 00:30:00 -0800 Subject: [PATCH 3/3] update unit tests Signed-off-by: Paul Dittamo --- .../pkg/controller/nodes/array/handler.go | 9 +-- .../controller/nodes/array/handler_test.go | 55 +++++++++++++------ 2 files changed, 40 insertions(+), 24 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 802d9ddd38..46227cd3b4 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -466,18 +466,11 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu nCtx.ExecutionContext().IncrementParallelism() } case v1alpha1.ArrayNodePhaseFailing: + // note: sub node eventing handled during Abort if err := a.Abort(ctx, nCtx, "ArrayNodeFailing"); err != nil { return handler.UnknownTransition, err } - // ensure task_execution set to failed - this should already be sent by the abort handler - if err := eventRecorder.finalize(ctx, nCtx, idlcore.TaskExecution_FAILED, 0, a.eventConfig); err != nil { - if !eventsErr.IsAlreadyExists(err) { - logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) - return handler.UnknownTransition, err - } - } - // fail with reported error if one exists if arrayNodeState.Error != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(arrayNodeState.Error, nil)), nil diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index ac0e4b45ad..eb9d468532 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -202,19 +202,6 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte func TestAbort(t *testing.T) { ctx := context.Background() - scope := promutils.NewTestScope() - dataStore, err := storage.NewDataStore(&storage.Config{ - Type: storage.TypeMemory, - }, scope) - assert.NoError(t, err) - - nodeHandler := &mocks.NodeHandler{} - nodeHandler.OnAbortMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) - nodeHandler.OnFinalizeMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) - - // initialize ArrayNodeHandler - arrayNodeHandler, err := createArrayNodeHandler(ctx, t, nodeHandler, dataStore, scope) - assert.NoError(t, err) tests := []struct { name string @@ -222,20 +209,49 @@ func TestAbort(t *testing.T) { subNodePhases []v1alpha1.NodePhase subNodeTaskPhases []core.Phase expectedExternalResourcePhases []idlcore.TaskExecution_Phase + arrayNodeState v1alpha1.ArrayNodePhase + expectedTaskExecutionPhase idlcore.TaskExecution_Phase }{ { - name: "Success", + name: "Aborted after failed", + inputMap: map[string][]int64{ + "foo": []int64{0, 1, 2}, + }, + subNodePhases: []v1alpha1.NodePhase{v1alpha1.NodePhaseSucceeded, v1alpha1.NodePhaseRunning, v1alpha1.NodePhaseNotYetStarted}, + subNodeTaskPhases: []core.Phase{core.PhaseSuccess, core.PhaseRunning, core.PhaseUndefined}, + expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_ABORTED}, + arrayNodeState: v1alpha1.ArrayNodePhaseFailing, + expectedTaskExecutionPhase: idlcore.TaskExecution_FAILED, + }, + { + name: "Aborted while running", inputMap: map[string][]int64{ "foo": []int64{0, 1, 2}, }, subNodePhases: []v1alpha1.NodePhase{v1alpha1.NodePhaseSucceeded, v1alpha1.NodePhaseRunning, v1alpha1.NodePhaseNotYetStarted}, subNodeTaskPhases: []core.Phase{core.PhaseSuccess, core.PhaseRunning, core.PhaseUndefined}, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_ABORTED}, + arrayNodeState: v1alpha1.ArrayNodePhaseExecuting, + expectedTaskExecutionPhase: idlcore.TaskExecution_ABORTED, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + scope := promutils.NewTestScope() + dataStore, err := storage.NewDataStore(&storage.Config{ + Type: storage.TypeMemory, + }, scope) + assert.NoError(t, err) + + nodeHandler := &mocks.NodeHandler{} + nodeHandler.OnAbortMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) + nodeHandler.OnFinalizeMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) + + // initialize ArrayNodeHandler + arrayNodeHandler, err := createArrayNodeHandler(ctx, t, nodeHandler, dataStore, scope) + assert.NoError(t, err) + // initialize universal variables literalMap := convertMapToArrayLiterals(test.inputMap) @@ -250,7 +266,7 @@ func TestAbort(t *testing.T) { // initialize ArrayNodeState arrayNodeState := &handler.ArrayNodeState{ - Phase: v1alpha1.ArrayNodePhaseFailing, + Phase: test.arrayNodeState, } for _, item := range []struct { arrayReference *bitarray.CompactArray @@ -279,12 +295,13 @@ func TestAbort(t *testing.T) { nCtx := createNodeExecutionContext(dataStore, eventRecorder, nil, literalMap, &arrayNodeSpec, arrayNodeState, 0, workflowMaxParallelism) // evaluate node - err := arrayNodeHandler.Abort(ctx, nCtx, "foo") + err = arrayNodeHandler.Abort(ctx, nCtx, "foo") assert.NoError(t, err) nodeHandler.AssertNumberOfCalls(t, "Abort", len(test.expectedExternalResourcePhases)) if len(test.expectedExternalResourcePhases) > 0 { assert.Equal(t, 1, len(eventRecorder.taskExecutionEvents)) + assert.Equal(t, test.expectedTaskExecutionPhase, eventRecorder.taskExecutionEvents[0].GetPhase()) externalResources := eventRecorder.taskExecutionEvents[0].GetMetadata().GetExternalResources() assert.Equal(t, len(test.expectedExternalResourcePhases), len(externalResources)) @@ -1296,6 +1313,9 @@ func TestHandleArrayNodePhaseSucceeding(t *testing.T) { assert.Equal(t, int64(*outputValue), collection.GetLiterals()[i].GetScalar().GetPrimitive().GetInteger()) } } + + assert.Equal(t, 1, len(eventRecorder.taskExecutionEvents)) + assert.Equal(t, idlcore.TaskExecution_SUCCEEDED, eventRecorder.taskExecutionEvents[0].GetPhase()) }) } } @@ -1374,6 +1394,9 @@ func TestHandleArrayNodePhaseFailing(t *testing.T) { assert.Equal(t, test.expectedArrayNodePhase, arrayNodeState.Phase) assert.Equal(t, test.expectedTransitionPhase, transition.Info().GetPhase()) nodeHandler.AssertNumberOfCalls(t, "Abort", test.expectedAbortCalls) + + assert.Equal(t, 1, len(eventRecorder.taskExecutionEvents)) + assert.Equal(t, idlcore.TaskExecution_FAILED, eventRecorder.taskExecutionEvents[0].GetPhase()) }) } }