This repository has been archived by the owner on Oct 6, 2023. It is now read-only.

Merge pull request #43 from everpeace/add-event-threshold-too-small
Publish a pod event when the pod's resource requests exceed the throttle's threshold, to warn users
everpeace authored Jan 12, 2022
2 parents 09c1310 + 0949469 commit a11b2fd
Showing 10 changed files with 135 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -181,4 +181,4 @@ e2e: fmt lint e2e-setup
e2e-debug: fmt lint e2e-setup
GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT=$(E2E_GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT) \
GOMEGA_DEFAULT_CONSISTENTLY_DURATION=$(E2E_GOMEGA_DEFAULT_CONSISTENTLY_DURATION) \
dlv test --headless --listen=0.0.0.0:2345 --api-version=2 --log ./test/integration -- --kubeconfig=$(E2E_KIND_KUBECNOFIG)
dlv test --headless --listen=0.0.0.0:2345 --api-version=2 --log ./test/integration -- --kubeconfig=$(E2E_KIND_KUBECNOFIG) --pause-image=$(E2E_PAUSE_IMAGE)
12 changes: 8 additions & 4 deletions pkg/apis/schedule/v1alpha1/clusterthrottle_types.go
@@ -28,15 +28,19 @@ type ClusterThrottleSpec struct {
}

func (thr ClusterThrottle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
if thr.Status.Throttled.IsThrottledFor(pod) {
return CheckThrottleStatusActive
}

threshold := thr.Spec.Threshold
if !thr.Status.CalculatedThreshold.CalculatedAt.Time.IsZero() {
threshold = thr.Status.CalculatedThreshold.Threshold
}

if threshold.IsThrottled(ResourceAmountOfPod(pod), false).IsThrottledFor(pod) {
return CheckThrottleStatusPodRequestsExceedsThreshold
}

if thr.Status.Throttled.IsThrottledFor(pod) {
return CheckThrottleStatusActive
}

alreadyUsed := ResourceAmount{}.Add(thr.Status.Used).Add(reservedResourceAmount)
if threshold.IsThrottled(alreadyUsed, isThrottledOnEqual).IsThrottledFor(pod) {
return CheckThrottleStatusActive
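For context (not part of this commit's diff): a minimal sketch of how a caller could branch on the statuses returned by the reordered CheckThrottledFor. The import path and the helper name classifyPod are assumptions; because the pod-requests check now runs before the active check, a pod whose own requests exceed the threshold is reported as such even while the throttle is active.

package example

import (
	corev1 "k8s.io/api/core/v1"

	// Import path assumed from the repository layout shown in this diff.
	schedulev1alpha1 "github.com/everpeace/kube-throttler/pkg/apis/schedule/v1alpha1"
)

// classifyPod is a hypothetical helper illustrating the outcomes a caller can
// now distinguish for a single ClusterThrottle.
func classifyPod(thr schedulev1alpha1.ClusterThrottle, pod *corev1.Pod, reserved schedulev1alpha1.ResourceAmount) string {
	switch thr.CheckThrottledFor(pod, reserved, false) {
	case schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold:
		return "pod can never fit under this throttle; warn the user"
	case schedulev1alpha1.CheckThrottleStatusActive:
		return "throttle is active; keep the pod pending and retry"
	default:
		return "schedulable with respect to this throttle"
	}
}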
19 changes: 12 additions & 7 deletions pkg/apis/schedule/v1alpha1/throttle_types.go
@@ -119,21 +119,26 @@ type ThrottleStatus struct {
type CheckThrottleStatus string

var (
CheckThrottleStatusNotThrottled CheckThrottleStatus = "not-throttled"
CheckThrottleStatusActive CheckThrottleStatus = "active"
CheckThrottleStatusInsufficient CheckThrottleStatus = "insufficient"
CheckThrottleStatusNotThrottled CheckThrottleStatus = "not-throttled"
CheckThrottleStatusActive CheckThrottleStatus = "active"
CheckThrottleStatusInsufficient CheckThrottleStatus = "insufficient"
CheckThrottleStatusPodRequestsExceedsThreshold CheckThrottleStatus = "pod-requests-exceeds-threshold"
)

func (thr Throttle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
if thr.Status.Throttled.IsThrottledFor(pod) {
return CheckThrottleStatusActive
}

threshold := thr.Spec.Threshold
if !thr.Status.CalculatedThreshold.CalculatedAt.Time.IsZero() {
threshold = thr.Status.CalculatedThreshold.Threshold
}

if threshold.IsThrottled(ResourceAmountOfPod(pod), false).IsThrottledFor(pod) {
return CheckThrottleStatusPodRequestsExceedsThreshold
}

if thr.Status.Throttled.IsThrottledFor(pod) {
return CheckThrottleStatusActive
}

alreadyUsed := ResourceAmount{}.Add(thr.Status.Used).Add(reservedResourceAmount)
if threshold.IsThrottled(alreadyUsed, true).IsThrottledFor(pod) {
return CheckThrottleStatusActive
18 changes: 15 additions & 3 deletions pkg/controllers/clusterthrottle_controller.go
@@ -377,14 +377,24 @@ func (c *ClusterThrottleController) UnReserveOnClusterThrottle(pod *v1.Pod, thr
return removed
}

func (c *ClusterThrottleController) CheckThrottled(pod *v1.Pod, isThrottledOnEqual bool) ([]schedulev1alpha1.ClusterThrottle, []schedulev1alpha1.ClusterThrottle, []schedulev1alpha1.ClusterThrottle, error) {
func (c *ClusterThrottleController) CheckThrottled(
pod *v1.Pod,
isThrottledOnEqual bool,
) (
[]schedulev1alpha1.ClusterThrottle,
[]schedulev1alpha1.ClusterThrottle,
[]schedulev1alpha1.ClusterThrottle,
[]schedulev1alpha1.ClusterThrottle,
error,
) {
throttles, err := c.affectedClusterThrottles(pod)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
affected := []schedulev1alpha1.ClusterThrottle{}
alreadyThrottled := []schedulev1alpha1.ClusterThrottle{}
insufficient := []schedulev1alpha1.ClusterThrottle{}
podRequestsExceedsThreshold := []schedulev1alpha1.ClusterThrottle{}
for _, thr := range throttles {
affected = append(affected, *thr)
reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name})
@@ -409,9 +419,11 @@ func (c *ClusterThrottleController) CheckThrottled(pod *v1.Pod, isThrottledOnEqu
alreadyThrottled = append(alreadyThrottled, *thr)
case schedulev1alpha1.CheckThrottleStatusInsufficient:
insufficient = append(insufficient, *thr)
case schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold:
podRequestsExceedsThreshold = append(podRequestsExceedsThreshold, *thr)
}
}
return alreadyThrottled, insufficient, affected, nil
return alreadyThrottled, insufficient, podRequestsExceedsThreshold, affected, nil
}

func (c *ClusterThrottleController) setupEventHandler() {
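For context (not part of the diff): a minimal sketch, written as if it lived in pkg/controllers, of consuming the widened five-value return of CheckThrottled; the plugin's PreFilter shown later does this for real, and the same change applies to ThrottleController below. The function name checkPodAgainstClusterThrottles is hypothetical.

package controllers

import (
	v1 "k8s.io/api/core/v1"
)

func checkPodAgainstClusterThrottles(c *ClusterThrottleController, pod *v1.Pod) error {
	alreadyThrottled, insufficient, podRequestsExceedsThreshold, affected, err := c.CheckThrottled(pod, false)
	if err != nil {
		return err
	}
	_ = affected // every ClusterThrottle selecting this pod, regardless of its state
	for _, thr := range podRequestsExceedsThreshold {
		// Throttles that can never admit the pod because its requests alone
		// exceed their (possibly calculated) thresholds.
		_ = thr
	}
	_, _ = alreadyThrottled, insufficient
	return nil
}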
19 changes: 16 additions & 3 deletions pkg/controllers/throttle_controller.go
@@ -348,14 +348,25 @@ func (c *ThrottleController) UnReserveOnThrottle(pod *v1.Pod, thr *schedulev1alp
return removed
}

func (c *ThrottleController) CheckThrottled(pod *v1.Pod, isThrottledOnEqual bool) ([]schedulev1alpha1.Throttle, []schedulev1alpha1.Throttle, []schedulev1alpha1.Throttle, error) {
func (c *ThrottleController) CheckThrottled(
pod *v1.Pod,
isThrottledOnEqual bool,
) (
[]schedulev1alpha1.Throttle,
[]schedulev1alpha1.Throttle,
[]schedulev1alpha1.Throttle,
[]schedulev1alpha1.Throttle,
error,
) {
throttles, err := c.affectedThrottles(pod)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
affected := []schedulev1alpha1.Throttle{}
alreadyThrottled := []schedulev1alpha1.Throttle{}
insufficient := []schedulev1alpha1.Throttle{}
podRequestsExceedsThreshold := []schedulev1alpha1.Throttle{}

for _, thr := range throttles {
affected = append(affected, *thr)
reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name})
@@ -380,9 +391,11 @@ func (c *ThrottleController) CheckThrottled(pod *v1.Pod, isThrottledOnEqual bool
alreadyThrottled = append(alreadyThrottled, *thr)
case schedulev1alpha1.CheckThrottleStatusInsufficient:
insufficient = append(insufficient, *thr)
case schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold:
podRequestsExceedsThreshold = append(podRequestsExceedsThreshold, *thr)
}
}
return alreadyThrottled, insufficient, affected, nil
return alreadyThrottled, insufficient, podRequestsExceedsThreshold, affected, nil
}

func (c *ThrottleController) setupEventHandler() {
36 changes: 31 additions & 5 deletions pkg/scheduler_plugin/plugin.go
@@ -150,29 +150,55 @@ func (pl *KubeThrottler) PreFilter(
state *framework.CycleState,
pod *v1.Pod,
) *framework.Status {
thrActive, thrInsufficient, thrAffected, err := pl.throttleCtr.CheckThrottled(pod, false)
thrActive, thrInsufficient, thrPodRequestsExceeds, thrAffected, err := pl.throttleCtr.CheckThrottled(pod, false)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
klog.V(2).InfoS("PreFilter: throttle check result",
"Pod", pod.Namespace+"/"+pod.Name,
"#ActiveThrottles", len(thrActive), "#InsufficientThrottles", len(thrInsufficient), "#AffectedThrottles", len(thrAffected),
"#ActiveThrottles", len(thrActive),
"#InsufficientThrottles", len(thrInsufficient),
"#PodRequestsExceedsThresholdThrottles", len(thrPodRequestsExceeds),
"#AffectedThrottles", len(thrAffected),
)

clthrActive, clthrInsufficient, clThrAffected, err := pl.clusterThrottleCtr.CheckThrottled(pod, false)
clthrActive, clthrInsufficient, clthrPodRequestsExceeds, clThrAffected, err := pl.clusterThrottleCtr.CheckThrottled(pod, false)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
klog.V(2).InfoS("PreFilter: clusterthrottle check result",
"Pod", pod.Namespace+"/"+pod.Name,
"#ActiveClusterThrottles", len(clthrActive), "#InsufficientClusterThrottles", len(clthrInsufficient), "#AffectedClusterThrottles", len(clThrAffected),
"#ActiveClusterThrottles", len(clthrActive),
"#InsufficientClusterThrottles", len(clthrInsufficient),
"#PodRequestsExceedsThresholdClusterThrottles", len(clthrPodRequestsExceeds),
"#AffectedClusterThrottles", len(clThrAffected),
)

if len(thrActive)+len(thrInsufficient)+len(clthrActive)+len(clthrInsufficient) == 0 {
if len(thrActive)+len(thrInsufficient)+len(thrPodRequestsExceeds)+
len(clthrActive)+len(clthrInsufficient)+len(clthrPodRequestsExceeds) == 0 {
return framework.NewStatus(framework.Success)
}

reasons := []string{}
if len(clthrPodRequestsExceeds) > 0 {
reasons = append(reasons, fmt.Sprintf("clusterthrottle[%s]=%s", schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold, strings.Join(clusterThrottleNames(clthrPodRequestsExceeds), ",")))
}
if len(thrPodRequestsExceeds) > 0 {
reasons = append(reasons, fmt.Sprintf("throttle[%s]=%s", schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold, strings.Join(throttleNames(thrPodRequestsExceeds), ",")))
}
if len(clthrPodRequestsExceeds)+len(thrPodRequestsExceeds) > 0 {
pl.fh.EventRecorder().Eventf(
pod, nil,
v1.EventTypeWarning,
"ResourceRequestsExceedsThrottleThreshold",
pl.Name(),
"It won't be scheduled unless decreasing resource requests or increasing ClusterThrottle/Throttle threshold because its resource requests exceeds their thresholds: %s",
strings.Join(
append(clusterThrottleNames(clthrPodRequestsExceeds), throttleNames(thrPodRequestsExceeds)...),
",",
),
)
}
if len(clthrActive) > 0 {
reasons = append(reasons, fmt.Sprintf("clusterthrottle[%s]=%s", schedulev1alpha1.CheckThrottleStatusActive, strings.Join(clusterThrottleNames(clthrActive), ",")))
}
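For context (not part of the diff): a minimal client-go sketch of how a user could look up the warning event recorded by PreFilter above; the integration-test helper added at the bottom of this commit asserts the same thing. The clientset, namespace, and pod name are assumptions.

package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listThrottleWarnings prints events whose reason matches the one emitted above.
func listThrottleWarnings(ctx context.Context, clientset kubernetes.Interface, ns, podName string) error {
	events, err := clientset.CoreV1().Events(ns).List(ctx, metav1.ListOptions{
		FieldSelector: fmt.Sprintf("involvedObject.name=%s,reason=ResourceRequestsExceedsThrottleThreshold", podName),
	})
	if err != nil {
		return err
	}
	for _, e := range events.Items {
		// The message lists the throttles whose thresholds the pod's requests exceed.
		fmt.Printf("%s: %s\n", e.Reason, e.Message)
	}
	return nil
}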
19 changes: 19 additions & 0 deletions test/integration/clusterthrottle_test.go
@@ -142,6 +142,25 @@ var _ = Describe("Clusterthrottle Test", func() {
Consistently(PodIsNotScheduled(ctx, DefaultNs, pod2.Name)).Should(Succeed())
})
})
Context("ResourceRequest (pod-requests-exceeds-threshold)", func() {
var pod *corev1.Pod
BeforeEach(func() {
pod = MustCreatePod(ctx, MakePod(DefaultNs, "pod", "1.1").Label(throttleKey, throttleName).Obj())
})
It("should not schedule pod", func() {
Eventually(AsyncAll(
WakeupBackoffPod(ctx),
ClusterThottleHasStatus(
ctx, thr.Name,
ClthrOpts.WithCalculatedThreshold(thr.Spec.Threshold),
ClthrOpts.WithPodThrottled(false), ClthrOpts.WithCpuThrottled(false),
),
MustPodFailedScheduling(ctx, DefaultNs, pod.Name, v1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold),
MustPodResourceRequestsExceedsThrottleThreshold(ctx, DefaultNs, pod.Name, thr.Namespace+"/"+thr.Name),
)).Should(Succeed())
Consistently(PodIsNotScheduled(ctx, DefaultNs, pod.Name)).Should(Succeed())
})
})
})

When("Many pods are created at once", func() {
1 change: 1 addition & 0 deletions test/integration/integration_suite_test.go
@@ -67,6 +67,7 @@ func TestIntegration(t *testing.T) {
}

var _ = BeforeSuite(func() {
flag.Parse()
Expect(kubeConfigPath).NotTo(BeEmpty())
Expect(pauseImage).NotTo(BeEmpty())
restConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
19 changes: 19 additions & 0 deletions test/integration/throttle_test.go
@@ -143,6 +143,25 @@ var _ = Describe("Throttle Test", func() {
Consistently(PodIsNotScheduled(ctx, DefaultNs, pod2.Name)).Should(Succeed())
})
})
Context("ResourceRequest (pod-requests-exceeds-threshold)", func() {
var pod *corev1.Pod
BeforeEach(func() {
pod = MustCreatePod(ctx, MakePod(DefaultNs, "pod", "1.1").Label(throttleKey, throttleName).Obj())
})
It("should not schedule pod", func() {
Eventually(AsyncAll(
WakeupBackoffPod(ctx),
ThrottleHasStatus(
ctx, DefaultNs, thr.Name,
ThOpts.WithCalculatedThreshold(thr.Spec.Threshold),
ThOpts.WithPodThrottled(false), ThOpts.WithCpuThrottled(false),
),
MustPodFailedScheduling(ctx, DefaultNs, pod.Name, v1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold),
MustPodResourceRequestsExceedsThrottleThreshold(ctx, DefaultNs, pod.Name, thr.Namespace+"/"+thr.Name),
)).Should(Succeed())
Consistently(PodIsNotScheduled(ctx, DefaultNs, pod.Name)).Should(Succeed())
})
})
})

When("Many pods are created at once", func() {
13 changes: 13 additions & 0 deletions test/integration/util_pod_test.go
@@ -78,6 +78,19 @@ func MustPodFailedScheduling(ctx context.Context, ns, n string, throttleStatus v
}
}

func MustPodResourceRequestsExceedsThrottleThreshold(ctx context.Context, ns, n, throttleNamespacedName string) func(g Gomega) {
return func(g Gomega) {
events, err := k8sCli.CoreV1().Events(ns).List(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("involvedObject.name=%s,reason=ResourceRequestsExceedsThrottleThreshold", n),
})
g.Expect(err).NotTo(HaveOccurred())
g.Expect(events.Items).Should(ContainElement(WithTransform(
func(e corev1.Event) string { return e.Message },
ContainSubstring(throttleNamespacedName),
)))
}
}

func PodIsScheduled(ctx context.Context, namespace, name string) func(g Gomega) {
return func(g Gomega) {
got, err := k8sCli.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})