Skip to content

Commit

Permalink
Merge pull request #3649 from lowang-bh/fixEnableErrCache
Browse files Browse the repository at this point in the history
don't enable error cache if task role spec is empty
  • Loading branch information
volcano-sh-bot authored Sep 18, 2024
2 parents 9e31b06 + 205eec5 commit 0843c0d
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 11 deletions.
15 changes: 13 additions & 2 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@ import (

// Action is the allocate action's state: a scheduler session pointer plus the
// switch that controls the predicate error cache.
type Action struct {
// session holds a *framework.Session; where it is assigned is outside this view.
session *framework.Session
// enablePredicateErrorCache is handed to PredicateNodes as its error-cache flag.
// New initializes it to true and parseArguments may overwrite it from the
// action's configured arguments.
enablePredicateErrorCache bool
}

func New() *Action {
return &Action{}
return &Action{
enablePredicateErrorCache: true, // default to enable it
}
}

func (alloc *Action) Name() string {
Expand All @@ -43,10 +47,17 @@ func (alloc *Action) Name() string {

// Initialize is a no-op: the body is empty, so the allocate action performs no setup here.
func (alloc *Action) Initialize() {}

// parseArguments looks up the arguments configured for this action in the
// session's configurations and loads the conf.EnablePredicateErrCacheKey
// setting into alloc.enablePredicateErrorCache.
//
// NOTE(review): presumably GetBool leaves the field untouched when the key is
// absent, so the value set by New (or by an earlier call) survives — confirm.
func (alloc *Action) parseArguments(ssn *framework.Session) {
	actionArgs := framework.GetArgOfActionFromConf(ssn.Configurations, alloc.Name())
	actionArgs.GetBool(&alloc.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
}

func (alloc *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Allocate ...")
defer klog.V(5).Infof("Leaving Allocate ...")

alloc.parseArguments(ssn)

// the allocation for pod may have many stages
// 1. pick a queue named Q (using ssn.QueueOrderFn)
// 2. pick a job named J from Q (using ssn.JobOrderFn)
Expand Down Expand Up @@ -204,7 +215,7 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
break
}

predicateNodes, fitErrors := ph.PredicateNodes(task, allNodes, alloc.predicate, true)
predicateNodes, fitErrors := ph.PredicateNodes(task, allNodes, alloc.predicate, alloc.enablePredicateErrorCache)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
// Assume that all left tasks are allocatable, but can not meet gang-scheduling min member,
Expand Down
11 changes: 11 additions & 0 deletions pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"

"github.com/agiledragon/gomonkey/v2"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -53,6 +54,16 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

func TestParseArgs(t *testing.T) {
test := uthelper.TestCommonStruct{Name: "set cache false"}

action := New()
test.RegisterSession(nil, []conf.Configuration{{Name: action.Name(),
Arguments: map[string]interface{}{conf.EnablePredicateErrCacheKey: false}}})
test.Run([]framework.Action{action})
assert.False(t, action.enablePredicateErrorCache)
}

func TestAllocate(t *testing.T) {
plugins := map[string]framework.PluginBuilder{
drf.PluginName: drf.New,
Expand Down
18 changes: 15 additions & 3 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@ import (
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/metrics"
"volcano.sh/volcano/pkg/scheduler/util"
)

type Action struct{}
type Action struct {
enablePredicateErrorCache bool
}

func New() *Action {
return &Action{}
return &Action{
enablePredicateErrorCache: true, // default to enable it
}
}

func (backfill *Action) Name() string {
Expand All @@ -39,10 +44,17 @@ func (backfill *Action) Name() string {

// Initialize is a no-op: the body is empty, so the backfill action performs no setup here.
func (backfill *Action) Initialize() {}

// parseArguments looks up the arguments configured for this action in the
// session's configurations and loads the conf.EnablePredicateErrCacheKey
// setting into backfill.enablePredicateErrorCache.
//
// NOTE(review): presumably GetBool leaves the field untouched when the key is
// absent, so the value set by New (or by an earlier call) survives — confirm.
func (backfill *Action) parseArguments(ssn *framework.Session) {
	actionArgs := framework.GetArgOfActionFromConf(ssn.Configurations, backfill.Name())
	actionArgs.GetBool(&backfill.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
}

func (backfill *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Backfill ...")
defer klog.V(5).Infof("Leaving Backfill ...")

backfill.parseArguments(ssn)

predicateFunc := ssn.PredicateForAllocateAction

// TODO (k82cn): When backfill, it's also need to balance between Queues.
Expand All @@ -62,7 +74,7 @@ func (backfill *Action) Execute(ssn *framework.Session) {
break
}

predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicateFunc, true)
predicateNodes, fitErrors := ph.PredicateNodes(task, ssn.NodeList, predicateFunc, backfill.enablePredicateErrorCache)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
break
Expand Down
24 changes: 18 additions & 6 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@ import (
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/metrics"
"volcano.sh/volcano/pkg/scheduler/util"
)

type Action struct{}
type Action struct {
enablePredicateErrorCache bool
}

func New() *Action {
return &Action{}
return &Action{
enablePredicateErrorCache: true,
}
}

func (pmpt *Action) Name() string {
Expand All @@ -39,10 +44,17 @@ func (pmpt *Action) Name() string {

// Initialize is a no-op: the body is empty, so the preempt action performs no setup here.
func (pmpt *Action) Initialize() {}

// parseArguments looks up the arguments configured for this action in the
// session's configurations and loads the conf.EnablePredicateErrCacheKey
// setting into pmpt.enablePredicateErrorCache.
//
// NOTE(review): presumably GetBool leaves the field untouched when the key is
// absent, so the value set by New (or by an earlier call) survives — confirm.
func (pmpt *Action) parseArguments(ssn *framework.Session) {
	actionArgs := framework.GetArgOfActionFromConf(ssn.Configurations, pmpt.Name())
	actionArgs.GetBool(&pmpt.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
}

func (pmpt *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Preempt ...")
defer klog.V(5).Infof("Leaving Preempt ...")

pmpt.parseArguments(ssn)

preemptorsMap := map[api.QueueID]*util.PriorityQueue{}
preemptorTasks := map[api.JobID]*util.PriorityQueue{}

Expand Down Expand Up @@ -116,7 +128,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) {

preemptor := preemptorTasks[preemptorJob.UID].Pop().(*api.TaskInfo)

assigned, err = preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
assigned, err = pmpt.preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
// Ignore non running task.
if !api.PreemptableStatus(task.Status) {
return false
Expand Down Expand Up @@ -176,7 +188,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
preemptor := preemptorTasks[job.UID].Pop().(*api.TaskInfo)

stmt := framework.NewStatement(ssn)
assigned, err := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
assigned, err := pmpt.preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
// Ignore non running task.
if !api.PreemptableStatus(task.Status) {
return false
Expand Down Expand Up @@ -212,7 +224,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) {

// UnInitialize is a no-op: the body is empty, so the preempt action performs no teardown here.
func (pmpt *Action) UnInitialize() {}

func preempt(
func (pmpt *Action) preempt(
ssn *framework.Session,
stmt *framework.Statement,
preemptor *api.TaskInfo,
Expand All @@ -228,7 +240,7 @@ func preempt(
predicateFn := ssn.PredicateForPreemptAction
// we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action
allNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(preemptor)
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true)
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, pmpt.enablePredicateErrorCache)

nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

Expand Down
6 changes: 6 additions & 0 deletions pkg/scheduler/conf/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package conf

// EnablePredicateErrCacheKey is the action-argument key under which the
// boolean switch for the predicate error cache is configured.
const EnablePredicateErrCacheKey = "predicateErrorCacheEnable"
7 changes: 7 additions & 0 deletions pkg/scheduler/util/predicate_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI
var errorLock sync.RWMutex
fe := api.NewFitErrors()

// Disable the error cache when the task's TaskRole is empty: every pod with an empty TaskRole
// shares the same taskGroupID, so one pod's cached predicate failure would make all the other pods fail too.
// See issue: https://github.com/volcano-sh/volcano/issues/3527
if len(task.TaskRole) == 0 {
enableErrorCache = false
}

allNodes := len(nodes)
if allNodes == 0 {
return make([]*api.NodeInfo, 0), fe
Expand Down

0 comments on commit 0843c0d

Please sign in to comment.