From 5319a292a4cbb5f4cc6c9e0466898af58c300a1f Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 26 Oct 2023 12:46:32 -0700 Subject: [PATCH] init failure node Signed-off-by: Kevin Su --- charts/flyte-core/values-eks.yaml | 2 +- .../apis/flyteworkflow/v1alpha1/workflow.go | 2 ++ .../pkg/compiler/transformers/k8s/node.go | 4 ++++ .../pkg/controller/executors/node_lookup.go | 2 ++ .../pkg/controller/nodes/executor.go | 3 ++- .../pkg/controller/nodes/node_exec_context.go | 1 + .../pkg/controller/workflow/executor.go | 24 +++++++++++++++---- 7 files changed, 32 insertions(+), 6 deletions(-) diff --git a/charts/flyte-core/values-eks.yaml b/charts/flyte-core/values-eks.yaml index 7b7ca446f5..d5769b5640 100644 --- a/charts/flyte-core/values-eks.yaml +++ b/charts/flyte-core/values-eks.yaml @@ -251,7 +251,7 @@ configmap: propeller: resourcemanager: type: noop - # Note: By default resource manager is disable for propeller, Please use `type: redis` to enaable + # Note: By default resource manager is disabled for propeller, Please use `type: redis` to enable # type: redis # resourceMaxQuota: 10000 # redis: diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go index 225a49ac3f..d86ac02340 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "github.com/golang/protobuf/jsonpb" "github.com/pkg/errors" @@ -331,6 +332,7 @@ func (in *WorkflowSpec) GetOutputs() *OutputVarMap { } func (in *WorkflowSpec) GetNode(nodeID NodeID) (ExecutableNode, bool) { + fmt.Print("Getting node ", nodeID, " from ", in.Nodes) n, ok := in.Nodes[nodeID] return n, ok } diff --git a/flytepropeller/pkg/compiler/transformers/k8s/node.go b/flytepropeller/pkg/compiler/transformers/k8s/node.go index 2ac06ebd89..26de17f346 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/node.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/node.go @@ -1,6 +1,8 @@ package k8s import ( + "context" + "github.com/flyteorg/flyte/flytestdlib/logger" "strings" "github.com/go-test/deep" @@ -33,7 +35,9 @@ func buildNodeSpec(n *core.Node, tasks []*core.CompiledTask, errs errors.Compile if n.GetTaskNode() != nil { taskID := n.GetTaskNode().GetReferenceId().String() // TODO: Use task index for quick lookup + logger.Info(context.Background(), "kevin Looking up task", "taskID", taskID) for _, t := range tasks { + logger.Infof(context.Background(), "kevin Comparing %v with %v", t.Template.Id.String(), taskID) if t.Template.Id.String() == taskID { task = t.Template break diff --git a/flytepropeller/pkg/controller/executors/node_lookup.go b/flytepropeller/pkg/controller/executors/node_lookup.go index 0a714ab4e7..2e82f5920e 100644 --- a/flytepropeller/pkg/controller/executors/node_lookup.go +++ b/flytepropeller/pkg/controller/executors/node_lookup.go @@ -2,6 +2,7 @@ package executors import ( "context" + "fmt" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) @@ -46,6 +47,7 @@ type staticNodeLookup struct { } func (s staticNodeLookup) GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) { + fmt.Print("staticNodeLookup.GetNode") n, ok := s.nodes[nodeID] return n, ok } diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index b9d727db7e..bd96edcd5f 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -180,6 +180,7 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo currentNodeCtx := contextutils.WithNodeID(ctx, currentNode.GetID()) nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID()) nodePhase := nodeStatus.GetPhase() + logger.Infof(currentNodeCtx, "Handling node [%v] Status [%v]", currentNode.GetID(), nodePhase.String()) if canHandleNode(nodePhase) { // TODO Follow up Pull Request, @@ -989,7 +990,7 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor if np != nodeStatus.GetPhase() { // assert np == Queued! - logger.Infof(ctx, "Change in node state detected from [%s] -> [%s]", nodeStatus.GetPhase().String(), np.String()) + logger.Infof(ctx, "Change in node state detected1 from [%s] -> [%s]", nodeStatus.GetPhase().String(), np.String()) p = p.WithOccuredAt(occurredAt) nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(), diff --git a/flytepropeller/pkg/controller/nodes/node_exec_context.go b/flytepropeller/pkg/controller/nodes/node_exec_context.go index f42f8b0324..d3029f954f 100644 --- a/flytepropeller/pkg/controller/nodes/node_exec_context.go +++ b/flytepropeller/pkg/controller/nodes/node_exec_context.go @@ -261,6 +261,7 @@ func isAboveInterruptibleFailureThreshold(numFailures uint32, maxAttempts uint32 func (c *nodeExecutor) BuildNodeExecutionContext(ctx context.Context, executionContext executors.ExecutionContext, nl executors.NodeLookup, currentNodeID v1alpha1.NodeID) (interfaces.NodeExecutionContext, error) { + fmt.Printf("-------------- BuildNodeExecutionContext for node [%v] in execution [%v]\n", currentNodeID, executionContext.GetID()) n, ok := nl.GetNode(currentNodeID) if !ok { return nil, fmt.Errorf("failed to find node with ID [%s] in execution [%s]", currentNodeID, executionContext.GetID()) diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index 13957606d1..d92c0cd939 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -170,13 +170,22 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.FlyteWorkflow) (Status, error) { execErr := executionErrorOrDefault(w.GetExecutionStatus().GetExecutionError(), w.GetExecutionStatus().GetMessage()) errorNode := w.GetOnFailureNode() + logger.Infof(ctx, "Handling FailureNode [%v]", errorNode) execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow()) - state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, errorNode) + + // TODO: GetNodeExecutionStatus doesn't work. How do we get the error node status from CRD + status := w.GetExecutionStatus().GetNodeExecutionStatus(ctx, errorNode.GetID()) + failureNodeLookup := executors.NewFailureNodeLookup(errorNode.(*v1alpha1.NodeSpec), status) + state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, failureNodeLookup, errorNode) + logger.Infof(ctx, "FailureNode [%v] finished with state [%v]", errorNode, state) + logger.Infof(ctx, "FailureNode [%v] finished with error [%v]", errorNode, err) if err != nil { + logger.Infof(ctx, "test") return StatusFailureNode(execErr), err } if state.HasFailed() { + logger.Infof(ctx, "test1 [%v]", state.Err) return StatusFailed(state.Err), nil } @@ -187,6 +196,8 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl Message: "FailureNode Timed-out"}), nil } + logger.Infof(ctx, "test2") + if state.PartiallyComplete() { // Re-enqueue the workflow c.enqueueWorkflow(w.GetK8sWorkflowID().String()) @@ -220,6 +231,7 @@ func (c *workflowExecutor) handleFailingWorkflow(ctx context.Context, w *v1alpha } errorNode := w.GetOnFailureNode() + logger.Infof(ctx, "Handling xxx FailureNode [%v]", errorNode) if errorNode != nil { return StatusFailureNode(execErr), nil } @@ -282,13 +294,17 @@ func (c *workflowExecutor) TransitionToPhase(ctx context.Context, execID *core.W wfEvent.Phase = core.WorkflowExecution_RUNNING wStatus.UpdatePhase(v1alpha1.WorkflowPhaseRunning, "Workflow Started", nil) wfEvent.OccurredAt = utils.GetProtoTime(wStatus.GetStartedAt()) - case v1alpha1.WorkflowPhaseHandlingFailureNode: - fallthrough case v1alpha1.WorkflowPhaseFailing: wfEvent.Phase = core.WorkflowExecution_FAILING wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError) wStatus.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "", wfEvent.GetError()) wfEvent.OccurredAt = utils.GetProtoTime(nil) + case v1alpha1.WorkflowPhaseHandlingFailureNode: + // TODO: Add core.WorkflowPhaseHandlingFailureNode to proto + wfEvent.Phase = core.WorkflowExecution_FAILING + wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError) + wStatus.UpdatePhase(v1alpha1.WorkflowPhaseHandlingFailureNode, "", wfEvent.GetError()) + wfEvent.OccurredAt = utils.GetProtoTime(nil) case v1alpha1.WorkflowPhaseFailed: wfEvent.Phase = core.WorkflowExecution_FAILED wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError) @@ -422,7 +438,7 @@ func (c *workflowExecutor) HandleFlyteWorkflow(ctx context.Context, w *v1alpha1. case v1alpha1.WorkflowPhaseHandlingFailureNode: newStatus, err := c.handleFailureNode(ctx, w) if err != nil { - return err + return errors.Errorf("failed to handle failure node for workflow [%s], err: [%s]", w.ID, err.Error()) } failureErr := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus) // Ignore ExecutionNotFound and IncompatibleCluster errors to allow graceful failure