Forceful deletion of remaining pods after cleanup #670

Draft · wants to merge 6 commits into base: main
13 changes: 13 additions & 0 deletions config/crd/bases/workload.codeflare.dev_appwrappers.yaml
@@ -161,6 +161,13 @@ spec:
check should happen, and if requeuing has reached its maximum
number of times.
properties:
forceDeletionTimeInSeconds:
default: 0
description: Enable forceful deletion of generic items and
pods with the AppWrapper label after specified seconds.
This may be necessary to prevent redeployment of generic
items that create pods that were not correctly deleted.
type: integer
growthType:
default: exponential
description: Growth strategy to increase the waiting time
@@ -192,6 +199,12 @@ spec:
description: Field to keep track of how many times a requeuing
event has been triggered.
type: integer
pauseTimeInSeconds:
default: 0
description: When a job is ready to be redispatched after
it has been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds'
before redispatching.
type: integer
timeInSeconds:
default: 300
description: Initial waiting time before requeuing conditions
13 changes: 13 additions & 0 deletions config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml
@@ -58,6 +58,13 @@ spec:
time. Values in this field control how often the pod check should
happen, and if requeuing has reached its maximum number of times.
properties:
forceDeletionTimeInSeconds:
default: 0
description: Enable forceful deletion of generic items and pods
with the AppWrapper label after specified seconds. This may
be necessary to prevent redeployment of generic items that create
pods that were not correctly deleted.
type: integer
growthType:
default: exponential
description: Growth strategy to increase the waiting time between
@@ -88,6 +95,12 @@ spec:
description: Field to keep track of how many times a requeuing
event has been triggered.
type: integer
pauseTimeInSeconds:
default: 0
description: When a job is ready to be redispatched after it has
been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds'
before redispatching.
type: integer
timeInSeconds:
default: 300
description: Initial waiting time before requeuing conditions
(Another copy of the AppWrapper CRD with the same changes; file path not shown)
@@ -161,6 +161,13 @@ spec:
check should happen, and if requeuing has reached its maximum
number of times.
properties:
forceDeletionTimeInSeconds:
default: 0
description: Enable forceful deletion of generic items and
pods with the AppWrapper label after specified seconds.
This may be necessary to prevent redeployment of generic
items that create pods that were not correctly deleted.
type: integer
growthType:
default: exponential
description: Growth strategy to increase the waiting time
@@ -192,6 +199,12 @@ spec:
description: Field to keep track of how many times a requeuing
event has been triggered.
type: integer
pauseTimeInSeconds:
default: 0
description: When a job is ready to be redispatched after
it has been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds'
before redispatching.
type: integer
timeInSeconds:
default: 300
description: Initial waiting time before requeuing conditions
(Another copy of the SchedulingSpec CRD with the same changes; file path not shown)
@@ -58,6 +58,13 @@ spec:
time. Values in this field control how often the pod check should
happen, and if requeuing has reached its maximum number of times.
properties:
forceDeletionTimeInSeconds:
default: 0
description: Enable forceful deletion of generic items and pods
with the AppWrapper label after specified seconds. This may
be necessary to prevent redeployment of generic items that create
pods that were not correctly deleted.
type: integer
growthType:
default: exponential
description: Growth strategy to increase the waiting time between
@@ -88,6 +95,12 @@ spec:
description: Field to keep track of how many times a requeuing
event has been triggered.
type: integer
pauseTimeInSeconds:
default: 0
description: When a job is ready to be redispatched after it has
been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds'
before redispatching.
type: integer
timeInSeconds:
default: 300
description: Initial waiting time before requeuing conditions
8 changes: 8 additions & 0 deletions pkg/apis/controller/v1beta1/schedulingspec.go
@@ -72,6 +72,14 @@ type RequeuingTemplate struct {
// items are stopped and removed from the cluster (AppWrapper remains deployed).
// +kubebuilder:default=0
MaxNumRequeuings int `json:"maxNumRequeuings,omitempty" protobuf:"bytes,6,rep,name=maxNumRequeuings"`
// Enable forceful deletion of generic items and pods with the AppWrapper label after specified seconds.
// This may be necessary to prevent redeployment of generic items that create pods that were not correctly deleted.
// +kubebuilder:default=0
ForceDeletionTimeInSeconds int `json:"forceDeletionTimeInSeconds,omitempty" protobuf:"bytes,7,rep,name=forceDeletionTimeInSeconds"`
// When a job is ready to be redispatched after it has been requeued due to preemption, MCAD will
// wait 'pauseTimeInSeconds' before redispatching.
// +kubebuilder:default=0
PauseTimeInSeconds int `json:"pauseTimeInSeconds,omitempty" protobuf:"bytes,8,rep,name=pauseTimeInSeconds"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
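For reference, here is a minimal sketch of how the two new knobs sit next to the existing MaxNumRequeuings field when a RequeuingTemplate is built in Go. The type and field names come from the diff above; the module path and the concrete values are assumptions for illustration (in an AppWrapper manifest the same settings appear in the scheduling spec's requeuing block as forceDeletionTimeInSeconds and pauseTimeInSeconds).

```go
// Sketch only: populating the new requeuing knobs. The import path assumes
// MCAD's module name; values are illustrative, not recommendations.
package main

import (
	"fmt"

	arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
)

func main() {
	requeuing := arbv1.RequeuingTemplate{
		MaxNumRequeuings:           3,   // stop requeuing after three attempts
		ForceDeletionTimeInSeconds: 570, // force-delete leftover pods 570s after cleanup begins
		PauseTimeInSeconds:         90,  // wait 90s after deletion before redispatching
	}
	fmt.Printf("%+v\n", requeuing)
}
```

Both fields default to 0, which keeps the previous behavior (no forced deletion and no redispatch delay).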
111 changes: 99 additions & 12 deletions pkg/controller/queuejob/queuejob_controller_ex.go
@@ -323,6 +323,18 @@ func NewJobController(restConfig *rest.Config, mcadConfig *config.MCADConfigurat
// we still need a thread for dispatch duration but minScheduling spec can definitely be moved to an informer
func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
ctx := context.Background()
// We need to update AW before analyzing it as a candidate for preemption
updateErr := qjm.UpdateQueueJobStatus(inspectAw)
if updateErr != nil {
klog.Warningf("[PreemptQueueJobs] update of pod count to AW %s/%s failed hence skipping preemption", inspectAw.Namespace, inspectAw.Name)
return
}
// Check if AppWrapper is waiting for deletion to be completed
if inspectAw.Status.QueueJobState == arbv1.AppWrapperCondDeleted {
qjm.enqueue(inspectAw)
return
}
// Check if AppWrapper should be preempted
aw := qjm.GetQueueJobEligibleForPreemption(inspectAw)
if aw != nil {
if aw.Status.State == arbv1.AppWrapperStateCompleted || aw.Status.State == arbv1.AppWrapperStateDeleted || aw.Status.State == arbv1.AppWrapperStateFailed {
@@ -336,12 +348,6 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
klog.Warningf("[PreemptQueueJobs] failed in retrieving a fresh copy of the app wrapper '%s/%s', err=%v. Will try to preempt on the next run.", aw.Namespace, aw.Name, err)
return
}
// we need to update AW before analyzing it as a candidate for preemption
updateErr := qjm.UpdateQueueJobStatus(newjob)
if updateErr != nil {
klog.Warningf("[PreemptQueueJobs] update of pod count to AW %s/%s failed hence skipping preemption", newjob.Namespace, newjob.Name)
return
}
newjob.Status.CanRun = false
newjob.Status.FilterIgnore = true // update QueueJobState only
cleanAppWrapper := false
@@ -372,7 +378,6 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
// cannot use cleanup AW, since it puts AW back in running state
qjm.qjqueue.AddUnschedulableIfNotPresent(updateNewJob)
generatedCondition = true

}
}

@@ -415,6 +420,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {

updateNewJob = newjob.DeepCopy()
generatedCondition = true

} else if newjob.Status.Running == 0 && newjob.Status.Succeeded == 0 && newjob.Status.State == arbv1.AppWrapperStateActive {
// If pods failed scheduling generate new preempt condition
message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(newjob.Status.PendingPodConditions), newjob.Status.Running)
@@ -1642,9 +1648,7 @@ func (cc *XController) addQueueJob(obj interface{}) {
cc.UpdateQueueJobs(latestAw)
klog.V(2).Infof("[Informer-addQJ] requeing AW %s/%s to determine completion status for AW", qj.Namespace, qj.Name)
}

}

}
}()
}
@@ -1883,23 +1887,84 @@ func (cc *XController) worker() {
// if there are running resources for this job then delete them because the job was put in
// pending state...

// If this the first time seeing this AW, no need to delete.
// If this is the first time seeing this AW, no need to delete.
stateLen := len(queuejob.Status.State)
if stateLen > 0 {
if stateLen > 0 && queuejob.Status.QueueJobState != arbv1.AppWrapperCondDeleted {
klog.V(2).Infof("[worker] Deleting resources for AppWrapper Job '%s/%s' because it was preempted, status.CanRun=%t, status.State=%s", queuejob.Namespace, queuejob.Name, queuejob.Status.CanRun, queuejob.Status.State)
err00 := cc.Cleanup(ctx, queuejob)
if err00 != nil {
klog.Errorf("[worker] Failed to delete resources for AppWrapper Job '%s/%s', err=%v", queuejob.Namespace, queuejob.Name, err00)
return err00
}
klog.V(2).Infof("[worker] Delete resources for AppWrapper Job '%s/%s' due to preemption was sucessfull, status.CanRun=%t, status.State=%s", queuejob.Namespace, queuejob.Name, queuejob.Status.CanRun, queuejob.Status.State)

if queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds > 0 || queuejob.Spec.SchedSpec.Requeuing.PauseTimeInSeconds > 0 {
// 1) Waiting for deletion of the AppWrapper to be complete before forcing the deletion of pods
// 2) Delaying redispatching with user specified wait time
var err error
newjob, err := cc.getAppWrapper(queuejob.Namespace, queuejob.Name, "[worker] get fresh AppWrapper")
if err != nil {
klog.Errorf("[worker] Failed getting a new AppWrapper: '%s/%s',Status=%+v, err=%+v.", queuejob.Namespace, queuejob.Name, queuejob.Status, err)
return err
}
newjob.Status.QueueJobState = arbv1.AppWrapperCondDeleted
cc.addOrUpdateCondition(newjob, arbv1.AppWrapperCondDeleted, v1.ConditionTrue, "AwaitingDeletion", "")
index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondDeleted, "AwaitingDeletion") // TODO: addOrUpdateCondition is NOT changing the transition time properly so need to do it here
newjob.Status.Conditions[index].LastTransitionMicroTime = metav1.NowMicro()
newjob.Status.Conditions[index].LastUpdateMicroTime = metav1.NowMicro()
newjob.Status.FilterIgnore = true
err = cc.updateStatusInEtcdWithRetry(ctx, newjob, "AwaitingDeletion")
if err != nil {
klog.Errorf("[worker] Error updating status 'Deleted' for AppWrapper: '%s/%s', status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err)
return err
}
return nil
}
} else if queuejob.Status.QueueJobState == arbv1.AppWrapperCondDeleted {
// Check if the 'AwaitingDeletion' condition exists
index := getIndexOfMatchedCondition(queuejob, arbv1.AppWrapperCondDeleted, "AwaitingDeletion")
if index < 0 {
klog.V(4).Infof("WARNING: [worker] Forced deletion condition was not added after 'Cleanup'. Silently ignoring forced cleanup.")
} else {
// Get current time to compare to
currentTime := time.Now()

// The AppWrapper was preempted and its objects were deleted. In case the deletion was not successful for all the items,
// MCAD will force delete any pods that remain in the system.
if queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds > 0 {
forceDeletionTime := queuejob.Status.Conditions[index].LastTransitionMicroTime.Add(time.Duration(queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds) * time.Second)

if currentTime.After(forceDeletionTime) {
if err := cc.ForcefulCleanup(ctx, queuejob); err != nil {
klog.V(5).Infof("[worker] Forced deletion of remaining live pods didn't work (Ending %s/%s). Retrying in the next cycle.", queuejob.Namespace, queuejob.Name)
return nil
}
} else {
klog.V(8).Infof("[worker] Waiting for 'ForceDeletionTimeInSeconds' seconds before requeueing job '%s/%s'.", queuejob.Namespace, queuejob.Name)
return nil
}
}

// When a job is ready to be redispatched after it has been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds' before redispatching
if queuejob.Spec.SchedSpec.Requeuing.PauseTimeInSeconds > 0 {
redispatchingTime := queuejob.Status.Conditions[index].LastTransitionMicroTime.Add(time.Duration(queuejob.Spec.SchedSpec.Requeuing.PauseTimeInSeconds) * time.Second)

if currentTime.After(redispatchingTime) {
klog.V(5).Infof("[worker] Ready to redispatch the AppWrapper (Ending %s/%s).", queuejob.Namespace, queuejob.Name)
} else {
klog.V(8).Infof("[worker] Waiting for 'PauseTimeInSeconds' seconds before redispatching job '%s/%s'.", queuejob.Namespace, queuejob.Name)
return nil
}
}
}
}

// Preparing job for redispatching
queuejob.Status.State = arbv1.AppWrapperStateEnqueued
queuejob.Status.QueueJobState = arbv1.AppWrapperCondQueueing
klog.V(10).Infof("[worker] before add to activeQ %s/%s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", queuejob.Namespace, queuejob.Name, cc.qjqueue.IfExistActiveQ(queuejob), cc.qjqueue.IfExistUnschedulableQ(queuejob), queuejob, queuejob.ResourceVersion, queuejob.Status)
index := getIndexOfMatchedCondition(queuejob, arbv1.AppWrapperCondQueueing, "AwaitingHeadOfLine")
if index < 0 {
queuejob.Status.QueueJobState = arbv1.AppWrapperCondQueueing
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondQueueing, v1.ConditionTrue, "AwaitingHeadOfLine", "")
queuejob.Status.Conditions = append(queuejob.Status.Conditions, cond)
} else {
@@ -2201,6 +2266,28 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper,
}

// Cleanup function
func (qjm *XController) ForcefulCleanup(ctx context.Context, appwrapper *arbv1.AppWrapper) error {
labelSelector := fmt.Sprintf("%s=%s", "appwrapper.mcad.ibm.com", appwrapper.Name)
pods, getPodsErr := qjm.clients.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})

if getPodsErr != nil {
klog.V(5).Infof("[ForcefulCleanup] Listing pods with label '%s' was not successful", labelSelector)
return getPodsErr
}

for _, pod := range pods.Items {
klog.V(3).Infof("[ForcefulCleanup] Forcibly deleting long-terminating pod='%s/%s'", pod.Namespace, pod.Name)
zero := int64(0)
delPodErr := qjm.clients.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero})
if delPodErr != nil {
klog.V(3).Infof("[ForcefulCleanup] Couldn't forcibly delete long-terminating pod='%s/%s'", pod.Namespace, pod.Name)
return delPodErr
}
}

return nil
}

func (cc *XController) Cleanup(ctx context.Context, appwrapper *arbv1.AppWrapper) error {
klog.V(3).Infof("[Cleanup] begin AppWrapper '%s/%s' Version=%s", appwrapper.Namespace, appwrapper.Name, appwrapper.ResourceVersion)
var err *multierror.Error
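ForcefulCleanup reduces to listing pods by the AppWrapper label and deleting each one with a zero-second grace period. The following self-contained sketch replays that pattern against client-go's fake clientset so the requested API calls can be inspected in isolation; the label key and selector format mirror the diff, while the pod and AppWrapper names are made up.

```go
// Sketch only: exercising the "list by AppWrapper label, then force-delete" pattern
// with the fake clientset. Names of the pod and AppWrapper are illustrative.
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// A pod still carrying the AppWrapper label, as left behind by an incomplete cleanup.
	leftover := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "stuck-pod",
			Namespace: "default",
			Labels:    map[string]string{"appwrapper.mcad.ibm.com": "my-appwrapper"},
		},
	}
	clientset := fake.NewSimpleClientset(leftover)

	// Same selection the controller builds: label key appwrapper.mcad.ibm.com, value = AppWrapper name.
	selector := fmt.Sprintf("%s=%s", "appwrapper.mcad.ibm.com", "my-appwrapper")
	pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		panic(err)
	}

	// Grace period of zero asks the API server to delete immediately, without graceful termination.
	zero := int64(0)
	for _, pod := range pods.Items {
		if err := clientset.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero}); err != nil {
			panic(err)
		}
		fmt.Printf("force-deleted %s/%s\n", pod.Namespace, pod.Name)
	}
}
```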