Skip to content

Commit

Permalink
fix: linting
Browse files Browse the repository at this point in the history
Signed-off-by: isubasinghe <[email protected]>
  • Loading branch information
isubasinghe committed Oct 14, 2024
1 parent 4abd0f4 commit 41c2644
Showing 1 changed file with 7 additions and 79 deletions.
86 changes: 7 additions & 79 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,48 +788,6 @@ func deletePodNodeDuringRetryWorkflow(wf *wfv1.Workflow, node wfv1.NodeStatus, d
return deletedPods, podsToDelete
}

func containsNode(nodes []string, node string) bool {
for _, e := range nodes {
if e == node {
return true
}
}
return false
}

func isGroupNode(node wfv1.NodeStatus) bool {
return node.Type == wfv1.NodeTypeDAG || node.Type == wfv1.NodeTypeTaskGroup || node.Type == wfv1.NodeTypeStepGroup || node.Type == wfv1.NodeTypeSteps
}

func resetConnectedParentGroupNodes(oldWF *wfv1.Workflow, newWF *wfv1.Workflow, currentNode wfv1.NodeStatus, resetParentGroupNodes []string) (*wfv1.Workflow, []string) {
currentNodeID := currentNode.ID
for {
currentNode, err := oldWF.Status.Nodes.Get(currentNodeID)
if err != nil {
log.Panicf("dying due to inability to obtain node for %s, panicking", currentNodeID)
}
if !containsNode(resetParentGroupNodes, currentNodeID) {
newWF.Status.Nodes.Set(currentNodeID, resetNode(*currentNode.DeepCopy()))
resetParentGroupNodes = append(resetParentGroupNodes, currentNodeID)
log.Debugf("Reset connected group node %s", currentNode.Name)
}
if currentNode.BoundaryID != "" && currentNode.BoundaryID != oldWF.ObjectMeta.Name {
parentNode, err := oldWF.Status.Nodes.Get(currentNode.BoundaryID)
if err != nil {
log.Panicf("unable to obtain node for %s, panicking", currentNode.BoundaryID)
}
if isGroupNode(*parentNode) {
currentNodeID = parentNode.ID
} else {
break
}
} else {
break
}
}
return newWF, resetParentGroupNodes
}

func createNewRetryWorkflow(wf *wfv1.Workflow, parameters []string) (*wfv1.Workflow, error) {
newWF := wf.DeepCopy()

Expand Down Expand Up @@ -985,39 +943,39 @@ func getChildren(n *node) map[string]bool {
type resetFn func(string, bool)
type deleteFn func(string, bool)

func consumeTill(n *node, nodeType wfv1.NodeType, resetFunc resetFn, addToID bool) (*node, error) {
func consumeTill(n *node, nodeType wfv1.NodeType, resetFunc resetFn) (*node, error) {
curr := n
for {
if curr == nil {
return nil, fmt.Errorf("expected type %s but ran out of nodes to explore", nodeType)
}

if curr.n.Type == nodeType {
resetFunc(curr.n.ID, addToID)
resetFunc(curr.n.ID, true)
return curr, nil
}
curr = curr.parent
}
}

func consumeStepGroup(n *node, resetFunc resetFn) (*node, error) {
return consumeTill(n, wfv1.NodeTypeStepGroup, resetFunc, true)
return consumeTill(n, wfv1.NodeTypeStepGroup, resetFunc)
}

func consumeSteps(n *node, resetFunc resetFn) (*node, error) {
return consumeTill(n, wfv1.NodeTypeSteps, resetFunc, true)
return consumeTill(n, wfv1.NodeTypeSteps, resetFunc)
}
func consumeTaskGroup(n *node, resetFunc resetFn) (*node, error) {
return consumeTill(n, wfv1.NodeTypeTaskGroup, resetFunc, true)
return consumeTill(n, wfv1.NodeTypeTaskGroup, resetFunc)
}

func consumeDAG(n *node, resetFunc resetFn) (*node, error) {
return consumeTill(n, wfv1.NodeTypeDAG, resetFunc, true)
return consumeTill(n, wfv1.NodeTypeDAG, resetFunc)
}

func consumePod(n *node, resetFunc resetFn, addToDelete deleteFn) (*node, error) {
// this sets to reset but resets are overriden by deletes in the final FormulateRetryWorkflow logic.
curr, err := consumeTill(n, wfv1.NodeTypePod, resetFunc, true)
curr, err := consumeTill(n, wfv1.NodeTypePod, resetFunc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1401,36 +1359,6 @@ func getNodeIDsToResetNoChildren(restartSuccessful bool, nodeFieldSelector strin

return nodeIDsToReset, nil
}
func getNodeIDsToReset(restartSuccessful bool, nodeFieldSelector string, nodes wfv1.Nodes) (map[string]bool, error) {
nodeIDsToReset := make(map[string]bool)
if !restartSuccessful || len(nodeFieldSelector) == 0 {
return nodeIDsToReset, nil
}

selector, err := fields.ParseSelector(nodeFieldSelector)
if err != nil {
return nil, err
} else {
for _, node := range nodes {
if SelectorMatchesNode(selector, node) {
// traverse all children of the node
var queue []string
queue = append(queue, node.ID)

for len(queue) > 0 {
childNode := queue[0]
// if the child isn't already in nodeIDsToReset then we add it and traverse its children
if _, present := nodeIDsToReset[childNode]; !present {
nodeIDsToReset[childNode] = true
queue = append(queue, nodes[childNode].Children...)
}
queue = queue[1:]
}
}
}
}
return nodeIDsToReset, nil
}

var errSuspendedCompletedWorkflow = errors.Errorf(errors.CodeBadRequest, "cannot suspend completed workflows")

Expand Down

0 comments on commit 41c2644

Please sign in to comment.