Skip to content

Commit

Permalink
Move checkedNodes check out of queuing algorithms (#9189)
Browse files Browse the repository at this point in the history
  • Loading branch information
chencs authored Sep 3, 2024
1 parent 41edf3f commit ef8d30a
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 23 deletions.
8 changes: 7 additions & 1 deletion pkg/scheduler/queue/multi_queuing_algorithm_tree_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,13 @@ func (n *Node) dequeue() (QueuePath, any) {
// we can't dequeue a value from an empty node; return early
return path, nil
}
dequeueNode, checkedAllNodes = n.queuingAlgorithm.dequeueSelectNode(n)
if n.isLeaf() {
checkedAllNodes = n.childrenChecked == 1
} else {
checkedAllNodes = n.childrenChecked == len(n.queueMap)
}

dequeueNode = n.queuingAlgorithm.dequeueSelectNode(n)
switch dequeueNode {
case n:
if n.isLeaf() {
Expand Down
14 changes: 6 additions & 8 deletions pkg/scheduler/queue/tenant_querier_assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,14 +445,12 @@ func (tqa *tenantQuerierAssignments) shuffleTenantQueriers(tenantID TenantID, sc
//
// Note that because we use the shared tenantIDOrder and tenantOrderIndex to manage the queue, we functionally
// ignore each Node's individual queueOrder and queuePosition.
func (tqa *tenantQuerierAssignments) dequeueSelectNode(node *Node) (*Node, bool) {
func (tqa *tenantQuerierAssignments) dequeueSelectNode(node *Node) *Node {
// can't get a tenant if no querier set
if tqa.currentQuerier == "" {
return nil, true
return nil
}

checkedAllNodes := node.childrenChecked == len(node.queueMap)

// advance queue position for dequeue
tqa.tenantOrderIndex++
if tqa.tenantOrderIndex >= len(tqa.tenantIDOrder) {
Expand All @@ -461,7 +459,7 @@ func (tqa *tenantQuerierAssignments) dequeueSelectNode(node *Node) (*Node, bool)

// no children or local queue reached
if len(node.queueMap) == 0 || tqa.tenantOrderIndex == localQueueIndex {
return node, checkedAllNodes
return node
}

checkIndex := tqa.tenantOrderIndex
Expand All @@ -487,18 +485,18 @@ func (tqa *tenantQuerierAssignments) dequeueSelectNode(node *Node) (*Node, bool)
// if the tenant-querier set is nil, any querier can serve this tenant
if tqa.tenantQuerierIDs[tenantID] == nil {
tqa.tenantOrderIndex = checkIndex
return node.queueMap[tenantName], checkedAllNodes
return node.queueMap[tenantName]
}
// otherwise, check if the querier is assigned to this tenant
if tenantQuerierSet, ok := tqa.tenantQuerierIDs[tenantID]; ok {
if _, ok := tenantQuerierSet[tqa.currentQuerier]; ok {
tqa.tenantOrderIndex = checkIndex
return node.queueMap[tenantName], checkedAllNodes
return node.queueMap[tenantName]
}
}
checkIndex++
}
return nil, checkedAllNodes
return nil
}

// dequeueUpdateState deletes the dequeued-from node from the following locations if it is empty:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ func (qa *QuerierWorkerQueuePriorityAlgo) wrapCurrentNodeOrderIndex(increment bo
}
}

func (qa *QuerierWorkerQueuePriorityAlgo) checkedAllNodes(n *Node) bool {
return n.childrenChecked == len(n.queueMap)
}

func (qa *QuerierWorkerQueuePriorityAlgo) addChildNode(parent, child *Node) {
// add child node to its parent's queueMap
parent.queueMap[child.Name()] = child
Expand All @@ -115,13 +111,12 @@ func (qa *QuerierWorkerQueuePriorityAlgo) addChildNode(parent, child *Node) {
qa.nodeCounts[child.Name()]++
}

func (qa *QuerierWorkerQueuePriorityAlgo) dequeueSelectNode(node *Node) (*Node, bool) {
func (qa *QuerierWorkerQueuePriorityAlgo) dequeueSelectNode(node *Node) *Node {
currentNodeName := qa.nodeOrder[qa.currentNodeOrderIndex]
checkedAllNodes := qa.checkedAllNodes(node)
if childNode, ok := node.queueMap[currentNodeName]; ok {
return childNode, checkedAllNodes
return childNode
}
return nil, checkedAllNodes
return nil
}

func (qa *QuerierWorkerQueuePriorityAlgo) dequeueUpdateState(node *Node, dequeuedFrom *Node) {
Expand Down
11 changes: 5 additions & 6 deletions pkg/scheduler/queue/tree_queueing_algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type QueuingAlgorithm interface {
// The Tree uses the bool returned to determine when no dequeue-able value can be found at this child. If all
// children in the subtree have been checked, but no dequeue-able value is found, the Tree traverses to the next
// child in the given node's order. dequeueSelectNode does *not* update any Node fields.
dequeueSelectNode(node *Node) (*Node, bool)
dequeueSelectNode(node *Node) *Node

// dequeueUpdateState is called after we have finished dequeuing from a node. When a child is left empty after the
// dequeue operation, dequeueUpdateState should perform cleanup by deleting that child from the Node and update
Expand Down Expand Up @@ -58,17 +58,16 @@ func (rrs *roundRobinState) addChildNode(parent, child *Node) {

// dequeueSelectNode returns the node at the node's queuePosition. queuePosition represents the position of
// the next node to dequeue from, and is incremented in dequeueUpdateState.
func (rrs *roundRobinState) dequeueSelectNode(node *Node) (*Node, bool) {
checkedAllNodes := node.childrenChecked == len(node.queueOrder)+1 // must check local queue as well
func (rrs *roundRobinState) dequeueSelectNode(node *Node) *Node {
if node.queuePosition == localQueueIndex {
return node, checkedAllNodes
return node
}

currentNodeName := node.queueOrder[node.queuePosition]
if node, ok := node.queueMap[currentNodeName]; ok {
return node, checkedAllNodes
return node
}
return nil, checkedAllNodes
return nil
}

// dequeueUpdateState does the following:
Expand Down

0 comments on commit ef8d30a

Please sign in to comment.