Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw committed Nov 16, 2023
1 parent 6730797 commit 5d0c2b2
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions flytepropeller/pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,35 +174,36 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl
execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow())

// TODO: GetNodeExecutionStatus doesn't work. How do we get the error node status from CRD
failureNodeLookup := executors.NewFailureNodeLookup(errorNode.(*v1alpha1.NodeSpec), w.GetNode(v1alpha1.StartNodeID), w.GetExecutionStatus())
state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, failureNodeLookup, errorNode)
startNode, _ := w.GetNode(v1alpha1.StartNodeID)
failureNodeLookup := executors.NewFailureNodeLookup(errorNode.(*v1alpha1.NodeSpec), startNode, w.GetExecutionStatus())
state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, failureNodeLookup, failureNodeLookup, errorNode)
logger.Infof(ctx, "FailureNode [%v] finished with state [%v]", errorNode, state)
logger.Infof(ctx, "FailureNode [%v] finished with error [%v]", errorNode, err)
if err != nil {
logger.Infof(ctx, "test")
return StatusFailureNode(execErr), err
}

if state.HasFailed() {
logger.Infof(ctx, "test1 [%v]", state.Err)
switch state.NodePhase {
case interfaces.NodePhaseFailed:
return StatusFailed(state.Err), nil
}

if state.HasTimedOut() {
case interfaces.NodePhaseTimedOut:
return StatusFailed(&core.ExecutionError{
Kind: core.ExecutionError_USER,
Code: "TimedOut",
Message: "FailureNode Timed-out"}), nil
}

logger.Infof(ctx, "test2")

if state.PartiallyComplete() {
case interfaces.NodePhaseQueued:
fallthrough
case interfaces.NodePhaseRunning:
fallthrough
case interfaces.NodePhaseSuccess:
// Re-enqueue the workflow
c.enqueueWorkflow(w.GetK8sWorkflowID().String())
return StatusFailureNode(execErr), nil
}

logger.Infof(ctx, "test2")

// If the failure node finished executing, transition to failed.
return StatusFailed(execErr), nil
}
Expand Down

0 comments on commit 5d0c2b2

Please sign in to comment.