Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

only update etcd when preempted #633

Merged
merged 5 commits into from
Oct 10, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 34 additions & 26 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func NewJobController(restConfig *rest.Config, mcadConfig *config.MCADConfigurat
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *arbv1.AppWrapper:
klog.V(10).Infof("[Informer] Filter Name=%s Namespace=%s Version=%s Local=%t FilterIgnore=%t Sender=%s &qj=%p qj=%+v", t.Name, t.Namespace, t.ResourceVersion, t.Status.Local, t.Status.FilterIgnore, t.Status.Sender, t, t)
klog.V(10).Infof("[Informer] Filter Name=%s Namespace=%s Version=%s Local=%t FilterIgnore=%t Sender=%s ", t.Name, t.Namespace, t.ResourceVersion, t.Status.Local, t.Status.FilterIgnore, t.Status.Sender)
// todo: This is a current workaround for duplicate message bug.
// if t.Status.Local == true { // ignore duplicate message from cache
// return false
Expand Down Expand Up @@ -345,6 +345,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
newjob.Status.CanRun = false
newjob.Status.FilterIgnore = true // update QueueJobState only
cleanAppWrapper := false
generatedCondition := false
// If dispatch deadline is exceeded no matter what the state of AW, kill the job and set status as Failed.
if (newjob.Status.State == arbv1.AppWrapperStateActive) && (newjob.Spec.SchedSpec.DispatchDuration.Limit > 0) {
if newjob.Spec.SchedSpec.DispatchDuration.Overrun {
Expand All @@ -370,6 +371,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
}
// cannot use cleanup AW, since it puts AW back in running state
qjm.qjqueue.AddUnschedulableIfNotPresent(updateNewJob)
generatedCondition = true

}
}
Expand Down Expand Up @@ -412,7 +414,8 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
}

updateNewJob = newjob.DeepCopy()
} else {
generatedCondition = true
} else if newjob.Status.Running == 0 && newjob.Status.Succeeded == 0 && newjob.Status.State == arbv1.AppWrapperStateActive {
asm582 marked this conversation as resolved.
Show resolved Hide resolved
// If pods failed scheduling generate new preempt condition
message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(newjob.Status.PendingPodConditions), newjob.Status.Running)
index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "PodsFailedScheduling")
Expand All @@ -427,22 +430,25 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
}

updateNewJob = newjob.DeepCopy()
generatedCondition = true
}

err = qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning")
if err != nil {
klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", newjob.Namespace, newjob.Name, err)
return
}
if generatedCondition {
err = qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning")
if err != nil {
klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", newjob.Namespace, newjob.Name, err)
return
}

if cleanAppWrapper {
klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", newjob.Namespace, newjob.Name)
go qjm.Cleanup(ctx, updateNewJob)
} else {
// Only back-off AWs that are in state running and not in state Failed
if updateNewJob.Status.State != arbv1.AppWrapperStateFailed {
klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name)
qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message))
if cleanAppWrapper {
klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded.", newjob.Namespace, newjob.Name)
go qjm.Cleanup(ctx, updateNewJob)
} else {
// Only back-off AWs that are in state running and not in state Failed
if updateNewJob.Status.State != arbv1.AppWrapperStateFailed {
klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", newjob.Namespace, newjob.Name)
qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message))
}
}
}
}
Expand Down Expand Up @@ -1362,7 +1368,7 @@ func (cc *XController) updateStatusInEtcd(ctx context.Context, currentAppwrapper
func (cc *XController) updateStatusInEtcdWithRetry(ctx context.Context, source *arbv1.AppWrapper, caller string) error {
klog.V(4).Infof("[updateStatusInEtcdWithMergeFunction] trying to update '%s/%s' version '%s' called by '%s'", source.Namespace, source.Name, source.ResourceVersion, caller)
source.Status.Sender = "before " + caller // set Sender string to indicate code location
updateStatusRetrierRetrier := retrier.New(retrier.ExponentialBackoff(10, 100*time.Millisecond), &EtcdErrorClassifier{})
updateStatusRetrierRetrier := retrier.New(retrier.ExponentialBackoff(1, 100*time.Millisecond), &EtcdErrorClassifier{})
updateStatusRetrierRetrier.SetJitter(0.05)
updatedAW := source.DeepCopy()
err := updateStatusRetrierRetrier.RunCtx(ctx, func(localContext context.Context) error {
Expand Down Expand Up @@ -1559,10 +1565,10 @@ func (cc *XController) addQueueJob(obj interface{}) {
firstTime := metav1.NowMicro()
qj, ok := obj.(*arbv1.AppWrapper)
if !ok {
klog.Errorf("[Informer-addQJ] object is not AppWrapper. object=%+v", obj)
klog.Errorf("[Informer-addQJ] object is not AppWrapper.")
return
}
klog.V(6).Infof("[Informer-addQJ] %s/%s &qj=%p qj=%+v", qj.Namespace, qj.Name, qj, qj)
klog.V(6).Infof("[Informer-addQJ] %s/%s", qj.Namespace, qj.Name)
if qj.Status.QueueJobState == "" {
qj.Status.ControllerFirstTimestamp = firstTime
qj.Status.SystemPriority = float64(qj.Spec.Priority)
Expand Down Expand Up @@ -1597,18 +1603,19 @@ func (cc *XController) addQueueJob(obj interface{}) {
// updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate
// on stale AWs. This has potential to improve performance at scale.
if hasCompletionStatus {
requeueInterval := 5 * time.Second
requeueIntervalForCompletionStatus := 5 * time.Second
key, err := cache.MetaNamespaceKeyFunc(qj)
if err != nil {
klog.Warningf("[Informer-addQJ] Error getting AW %s/%s from cache cannot determine completion status", qj.Namespace, qj.Name)
// TODO: should we return from this loop?
}
go func() {
for {
time.Sleep(requeueInterval)
time.Sleep(requeueIntervalForCompletionStatus)
latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key)
if err != nil && !exists {
klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache", qj.Namespace, qj.Name)
if err != nil || !exists {
klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache,stopping check for completion status", qj.Namespace, qj.Name)
break
} else {
var latestAw *arbv1.AppWrapper
if latestObj != nil {
Expand Down Expand Up @@ -1643,8 +1650,9 @@ func (cc *XController) addQueueJob(obj interface{}) {
for {
time.Sleep(requeueInterval)
latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key)
if err != nil && !exists {
klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache", qj.Namespace, qj.Name)
if err != nil || !exists {
klog.Warningf("[Informer-addQJ] Recent copy of AW %s/%s not found in cache, stopping check for minScheduling", qj.Namespace, qj.Name)
break
} else {
var latestAw *arbv1.AppWrapper
if latestObj != nil {
Expand Down Expand Up @@ -1752,9 +1760,9 @@ func (cc *XController) enqueue(obj interface{}) error {

err := cc.eventQueue.Add(qj) // add to FIFO queue if not in, update object & keep position if already in FIFO queue
if err != nil {
klog.Errorf("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds &qj=%p Version=%s Status=%+v err=%#v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status, err)
klog.Errorf("[enqueue] Fail to enqueue %s/%s to eventQueue, ignore. *Delay=%.6f seconds Version=%s Status=%+v err=%#v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status, err)
} else {
klog.V(10).Infof("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue &qj=%p Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status)
klog.V(10).Infof("[enqueue] %s/%s *Delay=%.6f seconds eventQueue.Add_byEnqueue Version=%s Status=%+v", qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.ResourceVersion, qj.Status)
}
return err
}
Expand Down